From 52822fdb613ac1e74724778b2dd250e9aba58a42 Mon Sep 17 00:00:00 2001 From: Matias Fontanini Date: Sat, 10 Jun 2017 19:15:53 -0700 Subject: [PATCH] Move some small functions into header files --- include/cppkafka/message.h | 52 ++++++++++++++---- include/cppkafka/utils/consumer_dispatcher.h | 4 +- src/message.cpp | 55 -------------------- src/utils/consumer_dispatcher.cpp | 8 --- 4 files changed, 43 insertions(+), 76 deletions(-) diff --git a/include/cppkafka/message.h b/include/cppkafka/message.h index 18fa1a2..42354fd 100644 --- a/include/cppkafka/message.h +++ b/include/cppkafka/message.h @@ -82,37 +82,51 @@ public: /** * Gets the error attribute */ - Error get_error() const; + Error get_error() const { + return handle_->err; + } /** * Utility function to check for get_error() == RD_KAFKA_RESP_ERR__PARTITION_EOF */ - bool is_eof() const; + bool is_eof() const { + return get_error() == RD_KAFKA_RESP_ERR__PARTITION_EOF; + } /** * Gets the topic that this message belongs to */ - std::string get_topic() const; + std::string get_topic() const { + return rd_kafka_topic_name(handle_->rkt); + } /** * Gets the partition that this message belongs to */ - int get_partition() const; + int get_partition() const { + return handle_->partition; + } /** * Gets the message's payload */ - const Buffer& get_payload() const; + const Buffer& get_payload() const { + return payload_; + } /** * Gets the message's key */ - const Buffer& get_key() const; + const Buffer& get_key() const { + return key_; + } /** * Gets the message offset */ - int64_t get_offset() const; + int64_t get_offset() const { + return handle_->offset; + } /** * \brief Gets the private data. @@ -120,24 +134,30 @@ public: * This should only be used on messages produced by a Producer that were set a private data * attribute */ - void* get_private_data() const; + void* get_private_data() const { + return handle_->_private; + } /** * \brief Gets this Message's timestamp * * If calling rd_kafka_message_timestamp returns -1, then boost::none_t will be returned. */ - boost::optional get_timestamp() const; + inline boost::optional get_timestamp() const; /** * Indicates whether this message is valid (not null) */ - explicit operator bool() const; + explicit operator bool() const { + return handle_ != nullptr; + } /** * Gets the rdkafka message handle */ - rd_kafka_message_t* get_handle() const; + rd_kafka_message_t* get_handle() const { + return handle_.get(); + } private: using HandlePtr = std::unique_ptr; @@ -183,6 +203,16 @@ private: TimestampType type_; }; +boost::optional Message::get_timestamp() const { + rd_kafka_timestamp_type_t type = RD_KAFKA_TIMESTAMP_NOT_AVAILABLE; + int64_t timestamp = rd_kafka_message_timestamp(handle_.get(), &type); + if (timestamp == -1 || type == RD_KAFKA_TIMESTAMP_NOT_AVAILABLE) { + return {}; + } + return MessageTimestamp(std::chrono::milliseconds(timestamp), + static_cast(type)); +} + } // cppkafka #endif // CPPKAFKA_MESSAGE_H diff --git a/include/cppkafka/utils/consumer_dispatcher.h b/include/cppkafka/utils/consumer_dispatcher.h index 0b1925d..d0c8012 100644 --- a/include/cppkafka/utils/consumer_dispatcher.h +++ b/include/cppkafka/utils/consumer_dispatcher.h @@ -109,8 +109,8 @@ private: using OnTimeoutArgs = std::tuple; static void handle_error(Error error); - static void handle_eof(EndOfFile, const TopicPartition& topic_partition); - static void handle_timeout(Timeout); + static void handle_eof(EndOfFile, const TopicPartition& /*topic_partition*/) { } + static void handle_timeout(Timeout) { } // Traits and template helpers diff --git a/src/message.cpp b/src/message.cpp index 605613a..a9236bc 100644 --- a/src/message.cpp +++ b/src/message.cpp @@ -29,13 +29,8 @@ #include "message.h" -using std::string; - using std::chrono::milliseconds; -using boost::optional; -using boost::none_t; - namespace cppkafka { void dummy_deleter(rd_kafka_message_t*) { @@ -68,56 +63,6 @@ Message::Message(HandlePtr handle) } -Error Message::get_error() const { - return handle_->err; -} - -bool Message::is_eof() const { - return get_error() == RD_KAFKA_RESP_ERR__PARTITION_EOF; -} - -int Message::get_partition() const { - return handle_->partition; -} - -string Message::get_topic() const { - return rd_kafka_topic_name(handle_->rkt); -} - -const Buffer& Message::get_payload() const { - return payload_; -} - -const Buffer& Message::get_key() const { - return key_; -} - -int64_t Message::get_offset() const { - return handle_->offset; -} - -void* Message::get_private_data() const { - return handle_->_private; -} - -optional Message::get_timestamp() const { - rd_kafka_timestamp_type_t type = RD_KAFKA_TIMESTAMP_NOT_AVAILABLE; - int64_t timestamp = rd_kafka_message_timestamp(handle_.get(), &type); - if (timestamp == -1 || type == RD_KAFKA_TIMESTAMP_NOT_AVAILABLE) { - return {}; - } - return MessageTimestamp(milliseconds(timestamp), - static_cast(type)); -} - -Message::operator bool() const { - return handle_ != nullptr; -} - -rd_kafka_message_t* Message::get_handle() const { - return handle_.get(); -} - // MessageTimestamp MessageTimestamp::MessageTimestamp(milliseconds timestamp, TimestampType type) diff --git a/src/utils/consumer_dispatcher.cpp b/src/utils/consumer_dispatcher.cpp index 2e2ef9e..d084127 100644 --- a/src/utils/consumer_dispatcher.cpp +++ b/src/utils/consumer_dispatcher.cpp @@ -44,12 +44,4 @@ void ConsumerDispatcher::handle_error(Error error) { throw ConsumerException(error); } -void ConsumerDispatcher::handle_eof(EndOfFile, const TopicPartition& /*topic_partition*/) { - -} - -void ConsumerDispatcher::handle_timeout(Timeout) { - -} - } // cppkafka