diff --git a/include/cppkafka/message_builder.h b/include/cppkafka/message_builder.h index 05474cb..7820c19 100644 --- a/include/cppkafka/message_builder.h +++ b/include/cppkafka/message_builder.h @@ -108,11 +108,21 @@ public: */ const BufferType& key() const; + /** + * Gets the message's key + */ + BufferType& key(); + /** * Gets the message's payload */ const BufferType& payload() const; + /** + * Gets the message's payload + */ + BufferType& payload(); + /** * Gets the message's user data pointer */ @@ -183,11 +193,21 @@ const T& BasicMessageBuilder::key() const { return key_; } +template +T& BasicMessageBuilder::key() { + return key_; +} + template const T& BasicMessageBuilder::payload() const { return payload_; } +template +T& BasicMessageBuilder::payload() { + return payload_; +} + template void* BasicMessageBuilder::user_data() const { return user_data_; diff --git a/include/cppkafka/utils/buffered_producer.h b/include/cppkafka/utils/buffered_producer.h index 8fce20c..ceed67a 100644 --- a/include/cppkafka/utils/buffered_producer.h +++ b/include/cppkafka/utils/buffered_producer.h @@ -38,6 +38,18 @@ public: */ void add_message(const MessageBuilder& builder); + /** + * \brief Adds a message to the producer's buffer. + * + * The message won't be sent until flush is called. + * + * Using this overload, you can avoid copies and construct your builder using the type + * you are actually using in this buffered producer. + * + * \param builder The builder that contains the message to be added + */ + void add_message(Builder builder); + /** * \brief Flushes the buffered messages. * @@ -54,10 +66,17 @@ public: * Gets the Producer object */ const Producer& get_producer() const; + + /** + * Simple helper to construct a builder object + */ + Builder make_builder(const Topic& topic); private: // Pick the most appropriate index type depending on the platform we're using using IndexType = std::conditional::type; + template + void do_add_message(BuilderType&& builder); const Topic& get_topic(const std::string& topic); void produce_message(IndexType index, Builder& message); Configuration prepare_configuration(Configuration config); @@ -79,13 +98,12 @@ BufferedProducer::BufferedProducer(Configuration config) template void BufferedProducer::add_message(const MessageBuilder& builder) { - Builder local_builder(get_topic(builder.topic().get_name())); - local_builder.partition(builder.partition()); - local_builder.key(builder.key()); - local_builder.payload(builder.payload()); + do_add_message(builder); +} - IndexType index = messages_.size(); - messages_.emplace(index, std::move(local_builder)); +template +void BufferedProducer::add_message(Builder builder) { + do_add_message(move(builder)); } template @@ -105,6 +123,18 @@ void BufferedProducer::flush() { } } +template +template +void BufferedProducer::do_add_message(BuilderType&& builder) { + Builder local_builder(get_topic(builder.topic().get_name())); + local_builder.partition(builder.partition()); + local_builder.key(std::move(builder.key())); + local_builder.payload(std::move(builder.payload())); + + IndexType index = messages_.size(); + messages_.emplace(index, std::move(local_builder)); +} + template Producer& BufferedProducer::get_producer() { return producer_; @@ -115,6 +145,12 @@ const Producer& BufferedProducer::get_producer() const { return producer_; } +template +typename BufferedProducer::Builder +BufferedProducer::make_builder(const Topic& topic) { + return Builder(topic); +} + template const Topic& BufferedProducer::get_topic(const std::string& topic) { auto iter = topic_mapping_.find(topic); diff --git a/tests/producer_test.cpp b/tests/producer_test.cpp index c8df3e2..d8929fc 100644 --- a/tests/producer_test.cpp +++ b/tests/producer_test.cpp @@ -294,7 +294,7 @@ TEST_F(ProducerTest, BufferedProducer) { string key = "such key"; Topic topic = producer.get_producer().get_topic(KAFKA_TOPIC); producer.add_message(MessageBuilder(topic).partition(partition).key(key).payload(payload)); - producer.add_message(MessageBuilder(topic).partition(partition).payload(payload)); + producer.add_message(producer.make_builder(topic).partition(partition).payload(payload)); producer.flush(); runner.try_join();