diff --git a/include/cppkafka/message_builder.h b/include/cppkafka/message_builder.h index 6df9f49..a28f5f9 100644 --- a/include/cppkafka/message_builder.h +++ b/include/cppkafka/message_builder.h @@ -30,6 +30,7 @@ #ifndef CPPKAFKA_MESSAGE_BUILDER_H #define CPPKAFKA_MESSAGE_BUILDER_H +#include #include "buffer.h" #include "topic.h" #include "macros.h" @@ -49,6 +50,21 @@ public: */ BasicMessageBuilder(std::string topic); + /** + * \brief Construct a message builder from another one that uses a different buffer type + * + * Note that this can only be used if BufferType can be constructed from an OtherBufferType + * + * \param rhs The message builder to be constructed from + */ + template + BasicMessageBuilder(const BasicMessageBuilder& rhs); + + BasicMessageBuilder(BasicMessageBuilder&&) = default; + BasicMessageBuilder(const BasicMessageBuilder&) = default; + BasicMessageBuilder& operator=(BasicMessageBuilder&&) = default; + BasicMessageBuilder& operator=(const BasicMessageBuilder&) = default; + /** * Sets the topic in which this message will be produced * @@ -91,6 +107,13 @@ public: */ Concrete& payload(BufferType&& value); + /** + * Sets the message's timestamp + * + * \param value The timestamp to be used + */ + Concrete& timestamp(std::chrono::milliseconds value); + /** * Sets the message's user data pointer * @@ -128,17 +151,24 @@ public: */ BufferType& payload(); + /** + * Gets the message's timestamp + */ + std::chrono::milliseconds timestamp() const; + /** * Gets the message's user data pointer */ void* user_data() const; private: void construct_buffer(BufferType& lhs, const BufferType& rhs); + Concrete& get_concrete(); std::string topic_; int partition_{-1}; BufferType key_; BufferType payload_; + std::chrono::milliseconds timestamp_{0}; void* user_data_; }; @@ -147,46 +177,61 @@ BasicMessageBuilder::BasicMessageBuilder(std::string topic) : topic_(std::move(topic)) { } +template +template +BasicMessageBuilder::BasicMessageBuilder(const BasicMessageBuilder& rhs) +: topic_(rhs.topic()), partition_(rhs.partition()), timestamp_(rhs.timestamp()), + user_data_(rhs.user_data()) { + get_concrete().construct_buffer(key_, rhs.key()); + get_concrete().construct_buffer(payload_, rhs.payload()); +} + template C& BasicMessageBuilder::topic(std::string value) { topic_ = std::move(value); - return static_cast(*this); + return get_concrete(); } template C& BasicMessageBuilder::partition(int value) { partition_ = value; - return static_cast(*this); + return get_concrete(); } template C& BasicMessageBuilder::key(const T& value) { - static_cast(*this).construct_buffer(key_, value); - return static_cast(*this); + get_concrete().construct_buffer(key_, value); + return get_concrete(); } template C& BasicMessageBuilder::key(T&& value) { key_ = std::move(value); - return static_cast(*this); + return get_concrete(); } template C& BasicMessageBuilder::payload(const T& value) { - static_cast(*this).construct_buffer(payload_, value); - return static_cast(*this); + get_concrete().construct_buffer(payload_, value); + return get_concrete(); } template C& BasicMessageBuilder::payload(T&& value) { payload_ = std::move(value); - return static_cast(*this); + return get_concrete(); +} + +template +C& BasicMessageBuilder::timestamp(std::chrono::milliseconds value) { + timestamp_ = value; + return get_concrete(); } template C& BasicMessageBuilder::user_data(void* value) { user_data_ = value; - return static_cast(*this); + return get_concrete(); } template @@ -219,6 +264,11 @@ T& BasicMessageBuilder::payload() { return payload_; } +template +std::chrono::milliseconds BasicMessageBuilder::timestamp() const { + return timestamp_; +} + template void* BasicMessageBuilder::user_data() const { return user_data_; @@ -229,6 +279,11 @@ void BasicMessageBuilder::construct_buffer(T& lhs, const T& rhs) { lhs = rhs; } +template +C& BasicMessageBuilder::get_concrete() { + return static_cast(*this); +} + /** * \brief Message builder class * @@ -250,6 +305,11 @@ public: void construct_buffer(Buffer& lhs, const Buffer& rhs) { lhs = Buffer(rhs.get_data(), rhs.get_size()); } + + template + void construct_buffer(Buffer& lhs, const T& rhs) { + lhs = Buffer(rhs); + } }; /** diff --git a/include/cppkafka/utils/buffered_producer.h b/include/cppkafka/utils/buffered_producer.h index d7b9413..0c99d6a 100644 --- a/include/cppkafka/utils/buffered_producer.h +++ b/include/cppkafka/utils/buffered_producer.h @@ -124,13 +124,8 @@ void BufferedProducer::flush() { template template void BufferedProducer::do_add_message(BuilderType&& builder) { - Builder local_builder(builder.topic()); - local_builder.partition(builder.partition()) - .key(std::move(builder.key())) - .payload(std::move(builder.payload())); - IndexType index = messages_.size(); - messages_.emplace(index, std::move(local_builder)); + messages_.emplace(index, std::move(builder)); } template @@ -152,11 +147,8 @@ BufferedProducer::make_builder(std::string topic) { template void BufferedProducer::produce_message(IndexType index, Builder& builder) { bool sent = false; - MessageBuilder local_builder(builder.topic()); - local_builder.partition(builder.partition()) - .key(builder.key()) - .payload(builder.payload()) - .user_data(reinterpret_cast(index)); + MessageBuilder local_builder = builder; + local_builder.user_data(reinterpret_cast(index)); while (!sent) { try { producer_.produce(local_builder); diff --git a/src/producer.cpp b/src/producer.cpp index a7fc643..7e4e8d2 100644 --- a/src/producer.cpp +++ b/src/producer.cpp @@ -70,8 +70,9 @@ void Producer::produce(const MessageBuilder& builder) { RD_KAFKA_V_TOPIC(builder.topic().data()), RD_KAFKA_V_PARTITION(builder.partition()), RD_KAFKA_V_MSGFLAGS(policy), - RD_KAFKA_V_VALUE((void*)payload.get_data(), payload.get_size()), + RD_KAFKA_V_TIMESTAMP(builder.timestamp().count()), RD_KAFKA_V_KEY((void*)key.get_data(), key.get_size()), + RD_KAFKA_V_VALUE((void*)payload.get_data(), payload.get_size()), RD_KAFKA_V_OPAQUE(builder.user_data()), RD_KAFKA_V_END); check_error(result);