Allow producing messages without buffering them in buffered producer

This commit is contained in:
Matias Fontanini
2017-04-23 11:23:37 -07:00
parent 35cf6fd0bf
commit aab182c4ea
2 changed files with 49 additions and 5 deletions

View File

@@ -14,6 +14,18 @@
namespace cppkafka { namespace cppkafka {
/**
* \brief Allows producing messages and waiting for them to be acknowledged by kafka brokers
*
* This class allows buffering messages and flushing them synchronously while also allowing
* to produce them just as you would using the Producer class.
*
* When calling either flush or wait_for_acks, the buffered producer will block until all
* produced messages (either in a buffer or non buffered way) are acknowledged by the kafka
* brokers.
*
* This class is not thread safe.
*/
template <typename BufferType> template <typename BufferType>
class BufferedProducer { class BufferedProducer {
public: public:
@@ -55,13 +67,29 @@ public:
*/ */
void add_message(Builder builder); void add_message(Builder builder);
/**
* \brief Produces a message without buffering it
*
* The message will still be tracked so that a call to flush or wait_for_acks will actually
* wait for it to be acknowledged.
*
* \param builder The builder that contains the message to be produced
*/
void produce(const MessageBuilder& builder);
/** /**
* \brief Flushes the buffered messages. * \brief Flushes the buffered messages.
* *
* This will send all messages and keep waiting until all of them are acknowledged. * This will send all messages and keep waiting until all of them are acknowledged (this is
* done by calling wait_for_acks).
*/ */
void flush(); void flush();
/**
* Waits for produced message's acknowledgements from the brokers
*/
void wait_for_acks();
/** /**
* Gets the Producer object * Gets the Producer object
*/ */
@@ -120,6 +148,12 @@ void BufferedProducer<BufferType>::add_message(Builder builder) {
do_add_message(move(builder)); do_add_message(move(builder));
} }
template <typename BufferType>
void BufferedProducer<BufferType>::produce(const MessageBuilder& builder) {
expected_acks_++;
produce_message(builder);
}
template <typename BufferType> template <typename BufferType>
void BufferedProducer<BufferType>::flush() { void BufferedProducer<BufferType>::flush() {
while (!messages_.empty()) { while (!messages_.empty()) {
@@ -127,6 +161,11 @@ void BufferedProducer<BufferType>::flush() {
messages_.pop(); messages_.pop();
} }
wait_for_acks();
}
template <typename BufferType>
void BufferedProducer<BufferType>::wait_for_acks() {
messages_acked_ = 0; messages_acked_ = 0;
while (messages_acked_ != expected_acks_) { while (messages_acked_ != expected_acks_) {
try { try {
@@ -142,6 +181,7 @@ void BufferedProducer<BufferType>::flush() {
} }
} }
} }
expected_acks_ = 0;
} }
template <typename BufferType> template <typename BufferType>

View File

@@ -225,7 +225,7 @@ TEST_F(ProducerTest, BufferedProducer) {
// Create a consumer and assign this topic/partition // Create a consumer and assign this topic/partition
Consumer consumer(make_consumer_config()); Consumer consumer(make_consumer_config());
consumer.assign({ TopicPartition(KAFKA_TOPIC, partition) }); consumer.assign({ TopicPartition(KAFKA_TOPIC, partition) });
ConsumerRunner runner(consumer, 2, 1); ConsumerRunner runner(consumer, 3, 1);
// Now create a buffered producer and produce two messages // Now create a buffered producer and produce two messages
BufferedProducer<string> producer(make_producer_config()); BufferedProducer<string> producer(make_producer_config());
@@ -236,17 +236,21 @@ TEST_F(ProducerTest, BufferedProducer) {
.payload(payload)); .payload(payload));
producer.add_message(producer.make_builder(KAFKA_TOPIC).partition(partition).payload(payload)); producer.add_message(producer.make_builder(KAFKA_TOPIC).partition(partition).payload(payload));
producer.flush(); producer.flush();
producer.produce(MessageBuilder(KAFKA_TOPIC).partition(partition).payload(payload));
producer.wait_for_acks();
runner.try_join(); runner.try_join();
const auto& messages = runner.get_messages(); const auto& messages = runner.get_messages();
ASSERT_EQ(2, messages.size()); ASSERT_EQ(3, messages.size());
const auto& message = messages[0]; const auto& message = messages[0];
EXPECT_EQ(Buffer(payload), message.get_payload());
EXPECT_EQ(Buffer(key), message.get_key()); EXPECT_EQ(Buffer(key), message.get_key());
EXPECT_EQ(KAFKA_TOPIC, message.get_topic()); EXPECT_EQ(KAFKA_TOPIC, message.get_topic());
EXPECT_EQ(partition, message.get_partition()); EXPECT_EQ(partition, message.get_partition());
EXPECT_FALSE(message.get_error()); EXPECT_FALSE(message.get_error());
EXPECT_FALSE(messages[1].get_key()); EXPECT_FALSE(messages[1].get_key());
EXPECT_EQ(Buffer(payload), messages[1].get_payload()); EXPECT_FALSE(messages[2].get_key());
for (const auto& message : messages) {
EXPECT_EQ(Buffer(payload), message.get_payload());
}
} }