From eb46b8808ebd0eac466a2cd6c20db26f7641b347 Mon Sep 17 00:00:00 2001
From: Alex Damian
Date: Mon, 18 Jun 2018 17:46:31 -0400
Subject: [PATCH] Bug fixes for sync flush and add_tracker (#91)

* fixes for sync flush and also add_tracker

* added flag for flush
---
 include/cppkafka/utils/buffered_producer.h | 76 +++++++++++++++++-----
 tests/producer_test.cpp                    | 15 ++++-
 2 files changed, 73 insertions(+), 18 deletions(-)

diff --git a/include/cppkafka/utils/buffered_producer.h b/include/cppkafka/utils/buffered_producer.h
index 496a9c9..cd802a5 100644
--- a/include/cppkafka/utils/buffered_producer.h
+++ b/include/cppkafka/utils/buffered_producer.h
@@ -185,20 +185,28 @@ public:
      * This will send all messages and keep waiting until all of them are acknowledged (this is
      * done by calling wait_for_acks).
      *
+     * \param preserve_order If set to true, each message in the queue is flushed only after the
+     *                       ack for the previous message has been received. This may degrade
+     *                       performance since messages are sent one at a time; flush calls
+     *                       sync_produce() on each message in the buffer. If set to false, all
+     *                       messages are flushed in one batch before waiting for acks, but
+     *                       messages may be reordered if the librdkafka setting
+     *                       'message.send.max.retries' is greater than 0.
+     *
      * \remark Although it is possible to call flush from multiple threads concurrently, better
      *         performance is achieved when called from the same thread or when serialized
      *         with respect to other threads.
      */
-    void flush();
+    void flush(bool preserve_order = false);
 
     /**
      * \brief Flushes the buffered messages and waits up to 'timeout'
      *
      * \param timeout The maximum time to wait until all acks are received
      *
+     * \param preserve_order True to preserve message ordering, false otherwise. See flush()
+     *                       above for more details.
+     *
      * \return True if the operation completes and all acks have been received.
      */
-    bool flush(std::chrono::milliseconds timeout);
+    bool flush(std::chrono::milliseconds timeout, bool preserve_order = false);
 
     /**
      * Waits for produced message's acknowledgements from the brokers
@@ -404,13 +412,15 @@ private:
     };
 
     using TrackerPtr = std::shared_ptr<Tracker>;
 
+    // Returns the existing tracker or creates a new one
     template <typename BuilderType>
-    TrackerPtr add_tracker(BuilderType& builder) {
-        if (has_internal_data_ && !builder.internal()) {
-            // Add message tracker only if it hasn't been added before
-            TrackerPtr tracker = std::make_shared<Tracker>(SenderType::Async, max_number_retries_);
-            builder.internal(tracker);
-            return tracker;
+    TrackerPtr add_tracker(SenderType sender, BuilderType& builder) {
+        if (has_internal_data_) {
+            if (!builder.internal()) {
+                // Add message tracker only if it hasn't been added before
+                builder.internal(std::make_shared<Tracker>(sender, max_number_retries_));
+            }
+            return std::static_pointer_cast<Tracker>(builder.internal());
         }
         return nullptr;
     }
@@ -469,7 +479,7 @@ void BufferedProducer<BufferType>::add_message(const MessageBuilder& builder) {
 
 template <typename BufferType>
 void BufferedProducer<BufferType>::add_message(Builder builder) {
-    add_tracker(builder);
+    add_tracker(SenderType::Async, builder);
     do_add_message(move(builder), MessagePriority::Low, true);
 }
 
@@ -477,7 +487,7 @@ template <typename BufferType>
 void BufferedProducer<BufferType>::produce(const MessageBuilder& builder) {
     if (has_internal_data_) {
         MessageBuilder builder_clone(builder.clone());
-        add_tracker(builder_clone);
+        add_tracker(SenderType::Async, builder_clone);
         async_produce(builder_clone, true);
     }
     else {
@@ -489,7 +499,7 @@ template <typename BufferType>
 void BufferedProducer<BufferType>::sync_produce(const MessageBuilder& builder) {
     if (has_internal_data_) {
         MessageBuilder builder_clone(builder.clone());
-        TrackerPtr tracker = add_tracker(builder_clone);
+        TrackerPtr tracker = add_tracker(SenderType::Sync, builder_clone);
         // produce until we succeed or we reach max retry limit
         std::future<bool> should_retry;
         do {
@@ -526,15 +536,47 @@ void BufferedProducer<BufferType>::async_flush() {
 }
 
 template <typename BufferType>
-void BufferedProducer<BufferType>::flush() {
-    async_flush();
-    wait_for_acks();
+void BufferedProducer<BufferType>::flush(bool preserve_order) {
+    if (preserve_order) {
+        CounterGuard<int> counter_guard(flushes_in_progress_);
+        QueueType flush_queue; // flush from temporary queue
+        {
+            std::lock_guard<std::mutex> lock(mutex_);
+            std::swap(messages_, flush_queue);
+        }
+        while (!flush_queue.empty()) {
+            sync_produce(flush_queue.front());
+            flush_queue.pop_front();
+        }
+    }
+    else {
+        async_flush();
+        wait_for_acks();
+    }
 }
 
 template <typename BufferType>
-bool BufferedProducer<BufferType>::flush(std::chrono::milliseconds timeout) {
-    async_flush();
-    return wait_for_acks(timeout);
+bool BufferedProducer<BufferType>::flush(std::chrono::milliseconds timeout,
+                                         bool preserve_order) {
+    if (preserve_order) {
+        CounterGuard<int> counter_guard(flushes_in_progress_);
+        QueueType flush_queue; // flush from temporary queue
+        {
+            std::lock_guard<std::mutex> lock(mutex_);
+            std::swap(messages_, flush_queue);
+        }
+        auto start_time = std::chrono::high_resolution_clock::now();
+        while (!flush_queue.empty() &&
+               (std::chrono::duration_cast<std::chrono::milliseconds>
+               (std::chrono::high_resolution_clock::now() - start_time) < timeout)) {
+            sync_produce(flush_queue.front());
+            flush_queue.pop_front();
+        }
+        // true only if every message was flushed before the timeout expired
+        return flush_queue.empty();
+    }
+    else {
+        async_flush();
+        return wait_for_acks(timeout);
+    }
 }
 
 template <typename BufferType>
diff --git a/tests/producer_test.cpp b/tests/producer_test.cpp
index d3128fc..841c98a 100644
--- a/tests/producer_test.cpp
+++ b/tests/producer_test.cpp
@@ -79,6 +79,19 @@ void flusher_run(BufferedProducer<string>& producer,
     producer.flush();
 }
 
+void async_flusher_run(BufferedProducer<string>& producer,
+                       int& exit_flag,
+                       int num_flush) {
+    while (!exit_flag) {
+        if (producer.get_buffer_size() >= (size_t)num_flush) {
+            producer.async_flush();
+        }
+        this_thread::sleep_for(milliseconds(10));
+    }
+    producer.async_flush();
+    producer.wait_for_acks();
+}
+
 void clear_run(BufferedProducer<string>& producer, condition_variable& clear) {
     mutex m;
@@ -377,7 +390,7 @@ TEST_CASE("replay async messages with errors", "[producer][buffered_producer][as
     ErrorProducer producer(make_producer_config(), BufferedProducer<string>::TestParameters{false, true});
     producer.set_max_number_retries(num_retries);
-    thread flusher_thread(flusher_run, ref(producer), ref(exit_flag), 0);
+    thread flusher_thread(async_flusher_run, ref(producer), ref(exit_flag), 0);
     string payload = "Hello world";
     producer.produce(MessageBuilder(KAFKA_TOPICS[0]).payload(payload));
     this_thread::sleep_for(milliseconds(2000));
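
Usage note (not part of the patch): a minimal sketch of the new preserve_order flag,
assuming a reachable broker; the broker address and topic name below are placeholders.

    #include <cppkafka/utils/buffered_producer.h>
    #include <chrono>
    #include <string>

    using namespace cppkafka;

    int main() {
        // Placeholder broker address; adjust for your environment.
        BufferedProducer<std::string> producer(Configuration{
            { "metadata.broker.list", "127.0.0.1:9092" }
        });

        // Buffer two messages for a placeholder topic.
        producer.add_message(producer.make_builder("test_topic").payload("message 1"));
        producer.add_message(producer.make_builder("test_topic").payload("message 2"));

        // Sync-produce each buffered message in order: the next message is only
        // sent after the previous ack arrives, so ordering survives retries.
        producer.flush(true);

        // Timed overload without ordering: flush the whole batch at once and
        // wait up to 5 seconds for all acks; returns true if all were acked.
        producer.add_message(producer.make_builder("test_topic").payload("message 3"));
        bool all_acked = producer.flush(std::chrono::milliseconds(5000), false);
        return all_acked ? 0 : 1;
    }

With preserve_order = true, flush trades throughput for ordering, matching the
per-message sync_produce() retry semantics this patch introduces.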