From 57268e666c1ba6203de92ebf15606e45d2d9f261 Mon Sep 17 00:00:00 2001 From: Alex Damian Date: Thu, 25 Oct 2018 10:39:22 -0400 Subject: [PATCH] Added time_point overloads for creating timestamps. (#128) * Added time_point overloads for creating timestamps. * aliased std::chrono types --- include/cppkafka/message.h | 11 +++++++++-- include/cppkafka/message_builder.h | 21 +++++++++++++++++++-- src/message.cpp | 10 +++++++++- tests/producer_test.cpp | 6 ++++-- 4 files changed, 41 insertions(+), 7 deletions(-) diff --git a/include/cppkafka/message.h b/include/cppkafka/message.h index 75d053e..ad9a980 100644 --- a/include/cppkafka/message.h +++ b/include/cppkafka/message.h @@ -240,12 +240,19 @@ public: }; /** - * Constructs a timestamp object + * Constructs a timestamp object using a 'duration'. */ MessageTimestamp(std::chrono::milliseconds timestamp, TimestampType type); + + /** + * Constructs a timestamp object using a 'time_point'. + */ + template + MessageTimestamp(std::chrono::time_point timestamp, TimestampType type); /** - * Gets the timestamp value + * Gets the timestamp value. If the timestamp was created with a 'time_point', + * the duration represents the number of milliseconds since epoch. */ std::chrono::milliseconds get_timestamp() const; diff --git a/include/cppkafka/message_builder.h b/include/cppkafka/message_builder.h index 6f8f123..df216d9 100644 --- a/include/cppkafka/message_builder.h +++ b/include/cppkafka/message_builder.h @@ -128,11 +128,19 @@ public: Concrete& payload(BufferType&& value); /** - * Sets the message's timestamp + * Sets the message's timestamp with a 'duration' * * \param value The timestamp to be used */ Concrete& timestamp(std::chrono::milliseconds value); + + /** + * Sets the message's timestamp with a 'time_point'. + * + * \param value The timestamp to be used + */ + template + Concrete& timestamp(std::chrono::time_point value); /** * Sets the message's user data pointer @@ -184,7 +192,8 @@ public: BufferType& payload(); /** - * Gets the message's timestamp + * Gets the message's timestamp as a duration. If the timestamp was created with a 'time_point', + * the duration represents the number of milliseconds since epoch. */ std::chrono::milliseconds timestamp() const; @@ -295,6 +304,14 @@ C& BasicMessageBuilder::timestamp(std::chrono::milliseconds value) { return get_concrete(); } +template +template +C& BasicMessageBuilder::timestamp(std::chrono::time_point value) +{ + timestamp_ = std::chrono::duration_cast(value.time_since_epoch()); + return get_concrete(); +} + template C& BasicMessageBuilder::user_data(void* value) { user_data_ = value; diff --git a/src/message.cpp b/src/message.cpp index 798642d..77c0989 100644 --- a/src/message.cpp +++ b/src/message.cpp @@ -87,10 +87,18 @@ Message& Message::load_internal() { // MessageTimestamp MessageTimestamp::MessageTimestamp(milliseconds timestamp, TimestampType type) -: timestamp_(timestamp), type_(type) { +: timestamp_(timestamp), + type_(type) { } +template +MessageTimestamp::MessageTimestamp(std::chrono::time_point timestamp, TimestampType type) +: timestamp_(std::chrono::duration_cast(timestamp.time_since_epoch())), + type_(type) { + +} + milliseconds MessageTimestamp::get_timestamp() const { return timestamp_; } diff --git a/tests/producer_test.cpp b/tests/producer_test.cpp index 693954c..64f70b8 100644 --- a/tests/producer_test.cpp +++ b/tests/producer_test.cpp @@ -24,6 +24,8 @@ using std::condition_variable; using std::chrono::system_clock; using std::chrono::seconds; using std::chrono::milliseconds; +using std::chrono::time_point; +using std::chrono::duration_cast; using std::ref; using namespace cppkafka; @@ -164,7 +166,7 @@ TEST_CASE("simple production", "[producer]") { SECTION("message with key") { const string payload = "Hello world! 2"; const string key = "such key"; - const milliseconds timestamp{15}; + auto timestamp = system_clock::now(); Producer producer(config); producer.produce(MessageBuilder(KAFKA_TOPICS[0]).partition(partition) .key(key) @@ -181,7 +183,7 @@ TEST_CASE("simple production", "[producer]") { CHECK(message.get_partition() == partition); CHECK(!!message.get_error() == false); REQUIRE(!!message.get_timestamp() == true); - CHECK(message.get_timestamp()->get_timestamp() == timestamp); + CHECK(message.get_timestamp()->get_timestamp() == duration_cast(timestamp.time_since_epoch())); } #if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION)