Added time_point overloads for creating timestamps. (#128)

* Added time_point overloads for creating timestamps.

* aliased std::chrono types
This commit is contained in:
Alex Damian
2018-10-25 10:39:22 -04:00
committed by Matias Fontanini
parent ad9a1e4a49
commit 57268e666c
4 changed files with 41 additions and 7 deletions

View File

@@ -240,12 +240,19 @@ public:
}; };
/** /**
* Constructs a timestamp object * Constructs a timestamp object using a 'duration'.
*/ */
MessageTimestamp(std::chrono::milliseconds timestamp, TimestampType type); MessageTimestamp(std::chrono::milliseconds timestamp, TimestampType type);
/**
* Constructs a timestamp object using a 'time_point'.
*/
template <typename Clock, typename Duration = typename Clock::duration>
MessageTimestamp(std::chrono::time_point<Clock, Duration> timestamp, TimestampType type);
/** /**
* Gets the timestamp value * Gets the timestamp value. If the timestamp was created with a 'time_point',
* the duration represents the number of milliseconds since epoch.
*/ */
std::chrono::milliseconds get_timestamp() const; std::chrono::milliseconds get_timestamp() const;

View File

@@ -128,11 +128,19 @@ public:
Concrete& payload(BufferType&& value); Concrete& payload(BufferType&& value);
/** /**
* Sets the message's timestamp * Sets the message's timestamp with a 'duration'
* *
* \param value The timestamp to be used * \param value The timestamp to be used
*/ */
Concrete& timestamp(std::chrono::milliseconds value); Concrete& timestamp(std::chrono::milliseconds value);
/**
* Sets the message's timestamp with a 'time_point'.
*
* \param value The timestamp to be used
*/
template <typename Clock, typename Duration = typename Clock::duration>
Concrete& timestamp(std::chrono::time_point<Clock, Duration> value);
/** /**
* Sets the message's user data pointer * Sets the message's user data pointer
@@ -184,7 +192,8 @@ public:
BufferType& payload(); BufferType& payload();
/** /**
* Gets the message's timestamp * Gets the message's timestamp as a duration. If the timestamp was created with a 'time_point',
* the duration represents the number of milliseconds since epoch.
*/ */
std::chrono::milliseconds timestamp() const; std::chrono::milliseconds timestamp() const;
@@ -295,6 +304,14 @@ C& BasicMessageBuilder<T, C>::timestamp(std::chrono::milliseconds value) {
return get_concrete(); return get_concrete();
} }
template <typename T, typename C>
template <typename Clock, typename Duration>
C& BasicMessageBuilder<T, C>::timestamp(std::chrono::time_point<Clock, Duration> value)
{
timestamp_ = std::chrono::duration_cast<std::chrono::milliseconds>(value.time_since_epoch());
return get_concrete();
}
template <typename T, typename C> template <typename T, typename C>
C& BasicMessageBuilder<T, C>::user_data(void* value) { C& BasicMessageBuilder<T, C>::user_data(void* value) {
user_data_ = value; user_data_ = value;

View File

@@ -87,10 +87,18 @@ Message& Message::load_internal() {
// MessageTimestamp // MessageTimestamp
MessageTimestamp::MessageTimestamp(milliseconds timestamp, TimestampType type) MessageTimestamp::MessageTimestamp(milliseconds timestamp, TimestampType type)
: timestamp_(timestamp), type_(type) { : timestamp_(timestamp),
type_(type) {
} }
template <typename Clock, typename Duration>
MessageTimestamp::MessageTimestamp(std::chrono::time_point<Clock, Duration> timestamp, TimestampType type)
: timestamp_(std::chrono::duration_cast<std::chrono::milliseconds>(timestamp.time_since_epoch())),
type_(type) {
}
milliseconds MessageTimestamp::get_timestamp() const { milliseconds MessageTimestamp::get_timestamp() const {
return timestamp_; return timestamp_;
} }

View File

@@ -24,6 +24,8 @@ using std::condition_variable;
using std::chrono::system_clock; using std::chrono::system_clock;
using std::chrono::seconds; using std::chrono::seconds;
using std::chrono::milliseconds; using std::chrono::milliseconds;
using std::chrono::time_point;
using std::chrono::duration_cast;
using std::ref; using std::ref;
using namespace cppkafka; using namespace cppkafka;
@@ -164,7 +166,7 @@ TEST_CASE("simple production", "[producer]") {
SECTION("message with key") { SECTION("message with key") {
const string payload = "Hello world! 2"; const string payload = "Hello world! 2";
const string key = "such key"; const string key = "such key";
const milliseconds timestamp{15}; auto timestamp = system_clock::now();
Producer producer(config); Producer producer(config);
producer.produce(MessageBuilder(KAFKA_TOPICS[0]).partition(partition) producer.produce(MessageBuilder(KAFKA_TOPICS[0]).partition(partition)
.key(key) .key(key)
@@ -181,7 +183,7 @@ TEST_CASE("simple production", "[producer]") {
CHECK(message.get_partition() == partition); CHECK(message.get_partition() == partition);
CHECK(!!message.get_error() == false); CHECK(!!message.get_error() == false);
REQUIRE(!!message.get_timestamp() == true); REQUIRE(!!message.get_timestamp() == true);
CHECK(message.get_timestamp()->get_timestamp() == timestamp); CHECK(message.get_timestamp()->get_timestamp() == duration_cast<milliseconds>(timestamp.time_since_epoch()));
} }
#if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION) #if (RD_KAFKA_VERSION >= RD_KAFKA_HEADERS_SUPPORT_VERSION)