From aab182c4ea6c748bc6bbe92d9af4aa85199f925e Mon Sep 17 00:00:00 2001 From: Matias Fontanini Date: Sun, 23 Apr 2017 11:23:37 -0700 Subject: [PATCH] Allow producing messages without buffering them in buffered producer --- include/cppkafka/utils/buffered_producer.h | 42 +++++++++++++++++++++- tests/producer_test.cpp | 12 ++++--- 2 files changed, 49 insertions(+), 5 deletions(-) diff --git a/include/cppkafka/utils/buffered_producer.h b/include/cppkafka/utils/buffered_producer.h index e9bf63f..2cec099 100644 --- a/include/cppkafka/utils/buffered_producer.h +++ b/include/cppkafka/utils/buffered_producer.h @@ -14,6 +14,18 @@ namespace cppkafka { +/** + * \brief Allows producing messages and waiting for them to be acknowledged by kafka brokers + * + * This class allows buffering messages and flushing them synchronously while also allowing + * to produce them just as you would using the Producer class. + * + * When calling either flush or wait_for_acks, the buffered producer will block until all + * produced messages (either in a buffer or non buffered way) are acknowledged by the kafka + * brokers. + * + * This class is not thread safe. + */ template class BufferedProducer { public: @@ -55,13 +67,29 @@ public: */ void add_message(Builder builder); + /** + * \brief Produces a message without buffering it + * + * The message will still be tracked so that a call to flush or wait_for_acks will actually + * wait for it to be acknowledged. + * + * \param builder The builder that contains the message to be produced + */ + void produce(const MessageBuilder& builder); + /** * \brief Flushes the buffered messages. * - * This will send all messages and keep waiting until all of them are acknowledged. + * This will send all messages and keep waiting until all of them are acknowledged (this is + * done by calling wait_for_acks). */ void flush(); + /** + * Waits for produced message's acknowledgements from the brokers + */ + void wait_for_acks(); + /** * Gets the Producer object */ @@ -120,6 +148,12 @@ void BufferedProducer::add_message(Builder builder) { do_add_message(move(builder)); } +template +void BufferedProducer::produce(const MessageBuilder& builder) { + expected_acks_++; + produce_message(builder); +} + template void BufferedProducer::flush() { while (!messages_.empty()) { @@ -127,6 +161,11 @@ void BufferedProducer::flush() { messages_.pop(); } + wait_for_acks(); +} + +template +void BufferedProducer::wait_for_acks() { messages_acked_ = 0; while (messages_acked_ != expected_acks_) { try { @@ -142,6 +181,7 @@ void BufferedProducer::flush() { } } } + expected_acks_ = 0; } template diff --git a/tests/producer_test.cpp b/tests/producer_test.cpp index d3595f7..be0c4c6 100644 --- a/tests/producer_test.cpp +++ b/tests/producer_test.cpp @@ -225,7 +225,7 @@ TEST_F(ProducerTest, BufferedProducer) { // Create a consumer and assign this topic/partition Consumer consumer(make_consumer_config()); consumer.assign({ TopicPartition(KAFKA_TOPIC, partition) }); - ConsumerRunner runner(consumer, 2, 1); + ConsumerRunner runner(consumer, 3, 1); // Now create a buffered producer and produce two messages BufferedProducer producer(make_producer_config()); @@ -236,17 +236,21 @@ TEST_F(ProducerTest, BufferedProducer) { .payload(payload)); producer.add_message(producer.make_builder(KAFKA_TOPIC).partition(partition).payload(payload)); producer.flush(); + producer.produce(MessageBuilder(KAFKA_TOPIC).partition(partition).payload(payload)); + producer.wait_for_acks(); runner.try_join(); const auto& messages = runner.get_messages(); - ASSERT_EQ(2, messages.size()); + ASSERT_EQ(3, messages.size()); const auto& message = messages[0]; - EXPECT_EQ(Buffer(payload), message.get_payload()); EXPECT_EQ(Buffer(key), message.get_key()); EXPECT_EQ(KAFKA_TOPIC, message.get_topic()); EXPECT_EQ(partition, message.get_partition()); EXPECT_FALSE(message.get_error()); EXPECT_FALSE(messages[1].get_key()); - EXPECT_EQ(Buffer(payload), messages[1].get_payload()); + EXPECT_FALSE(messages[2].get_key()); + for (const auto& message : messages) { + EXPECT_EQ(Buffer(payload), message.get_payload()); + } }