From c615664f122d45cfbd360a720ebb4f7b5a6518ec Mon Sep 17 00:00:00 2001 From: Matias Fontanini Date: Sun, 19 Jun 2016 21:00:25 -0700 Subject: [PATCH] Add Message::get_timestamp --- include/cppkafka/message.h | 41 +++++++++++++++++++++++++++++++++++++- src/message.cpp | 29 ++++++++++++++++++++++++++- tests/producer_test.cpp | 2 ++ 3 files changed, 70 insertions(+), 2 deletions(-) diff --git a/include/cppkafka/message.h b/include/cppkafka/message.h index 4048617..c910f5e 100644 --- a/include/cppkafka/message.h +++ b/include/cppkafka/message.h @@ -32,6 +32,7 @@ #include #include +#include #include #include "buffer.h" #include "topic.h" @@ -39,6 +40,8 @@ namespace cppkafka { +class MessageTimestamp; + /** * \brief Thin wrapper over a rdkafka message handle * @@ -126,7 +129,14 @@ public: * This should only be used on messages produced by a Producer that were set a private data * attribute */ - void* private_data() const; + void* get_private_data() const; + + /** + * \brief Gets this Message's timestamp + * + * If calling rd_kafka_message_timestamp returns -1, then boost::none_t will be returned. + */ + boost::optional get_timestamp() const; /** * Indicates whether this message is valid (not null) @@ -150,6 +160,35 @@ private: Buffer key_; }; +class CPPKAFKA_API MessageTimestamp { +public: + /** + * The timestamp type + */ + enum TimestampType { + CREATE_TIME = RD_KAFKA_TIMESTAMP_CREATE_TIME, + LOG_APPEND_TIME = RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME + }; + + /** + * Constructs a timestamp object + */ + MessageTimestamp(int64_t timestamp, TimestampType type); + + /** + * Gets the timestamp value + */ + int64_t get_timestamp() const; + + /** + * Gets the timestamp type + */ + TimestampType get_type() const; +private: + int64_t timestamp_; + TimestampType type_; +}; + } // cppkafka #endif // CPPKAFKA_MESSAGE_H diff --git a/src/message.cpp b/src/message.cpp index f73e604..07e453f 100644 --- a/src/message.cpp +++ b/src/message.cpp @@ -31,6 +31,9 @@ using std::string; +using boost::optional; +using boost::none_t; + namespace cppkafka { void dummy_deleter(rd_kafka_message_t*) { @@ -99,10 +102,19 @@ int64_t Message::get_offset() const { return handle_->offset; } -void* Message::private_data() const { +void* Message::get_private_data() const { return handle_->_private; } +optional Message::get_timestamp() const { + rd_kafka_timestamp_type_t type = RD_KAFKA_TIMESTAMP_NOT_AVAILABLE; + int64_t timestamp = rd_kafka_message_timestamp(handle_.get(), &type); + if (timestamp == -1 || type == RD_KAFKA_TIMESTAMP_NOT_AVAILABLE) { + return none_t(); + } + return MessageTimestamp(timestamp, static_cast(type)); +} + Message::operator bool() const { return handle_ != nullptr; } @@ -111,4 +123,19 @@ rd_kafka_message_t* Message::get_handle() const { return handle_.get(); } +// MessageTimestamp + +MessageTimestamp::MessageTimestamp(int64_t timestamp, TimestampType type) +: timestamp_(timestamp), type_(type) { + +} + +int64_t MessageTimestamp::get_timestamp() const { + return timestamp_; +} + +MessageTimestamp::TimestampType MessageTimestamp::get_type() const { + return type_; +} + } // cppkafka diff --git a/tests/producer_test.cpp b/tests/producer_test.cpp index 3871257..93f2bf5 100644 --- a/tests/producer_test.cpp +++ b/tests/producer_test.cpp @@ -156,6 +156,8 @@ TEST_F(ProducerTest, OneMessageUsingKey) { EXPECT_EQ(KAFKA_TOPIC, message.get_topic()); EXPECT_EQ(partition, message.get_partition()); EXPECT_EQ(0, message.get_error()); + // NOTE: if this line fails, then you're using kafka 0.10+ and that's okay + EXPECT_FALSE(message.get_timestamp()); } TEST_F(ProducerTest, MultipleMessagesUnassignedPartitions) {