From 97229ebfd910406748f7d7f9532d5b9fdf430d3a Mon Sep 17 00:00:00 2001
From: demin80
Date: Mon, 7 Jan 2019 14:39:09 -0500
Subject: [PATCH 1/4] Added a high-priority queue to BufferedProducer to avoid
 message re-ordering

---
 include/cppkafka/utils/buffered_producer.h | 61 +++++++++++++++++-----
 1 file changed, 49 insertions(+), 12 deletions(-)

diff --git a/include/cppkafka/utils/buffered_producer.h b/include/cppkafka/utils/buffered_producer.h
index a241e7b..b37d22d 100644
--- a/include/cppkafka/utils/buffered_producer.h
+++ b/include/cppkafka/utils/buffered_producer.h
@@ -530,6 +530,7 @@ private:
     // Members
     Producer producer_;
     QueueType messages_;
+    QueueType hi_pri_messages_;
     mutable std::mutex mutex_;
     ProduceSuccessCallback produce_success_callback_;
     ProduceFailureCallback produce_failure_callback_;
@@ -565,7 +566,8 @@ template <typename BufferType, typename Allocator>
 BufferedProducer<BufferType, Allocator>::BufferedProducer(Configuration config,
                                                           const Allocator& alloc)
 : producer_(prepare_configuration(std::move(config))),
-  messages_(alloc) {
+  messages_(alloc),
+  hi_pri_messages_(alloc) {
     producer_.set_payload_policy(get_default_payload_policy());
 #ifdef KAFKA_TEST_INSTANCE
     test_params_ = nullptr;
@@ -625,10 +627,16 @@ template <typename BufferType, typename Allocator>
 void BufferedProducer<BufferType, Allocator>::async_flush() {
     CounterGuard<int> counter_guard(flushes_in_progress_);
     QueueType flush_queue; // flush from temporary queue
+    QueueType hi_pri_flush_queue; // flush from hi-priority temporary queue
     {
         std::lock_guard<std::mutex> lock(mutex_);
+        std::swap(hi_pri_messages_, hi_pri_flush_queue);
         std::swap(messages_, flush_queue);
     }
+    while (!hi_pri_flush_queue.empty()) {
+        async_produce(std::move(hi_pri_flush_queue.front()), false);
+        hi_pri_flush_queue.pop_front();
+    }
     while (!flush_queue.empty()) {
         async_produce(std::move(flush_queue.front()), false);
         flush_queue.pop_front();
@@ -640,10 +648,16 @@ void BufferedProducer<BufferType, Allocator>::flush(bool preserve_order) {
     if (preserve_order) {
         CounterGuard<int> counter_guard(flushes_in_progress_);
         QueueType flush_queue; // flush from temporary queue
+        QueueType hi_pri_flush_queue; // flush from hi-priority temporary queue
         {
             std::lock_guard<std::mutex> lock(mutex_);
+            std::swap(hi_pri_messages_, hi_pri_flush_queue);
             std::swap(messages_, flush_queue);
         }
+        while (!hi_pri_flush_queue.empty()) {
+            sync_produce(hi_pri_flush_queue.front());
+            hi_pri_flush_queue.pop_front();
+        }
         while (!flush_queue.empty()) {
             sync_produce(flush_queue.front());
             flush_queue.pop_front();
@@ -661,25 +675,45 @@ bool BufferedProducer<BufferType, Allocator>::flush(std::chrono::milliseconds ti
     if (preserve_order) {
         CounterGuard<int> counter_guard(flushes_in_progress_);
         QueueType flush_queue; // flush from temporary queue
+        QueueType hi_pri_flush_queue; // flush from hi-priority temporary queue
         {
             std::lock_guard<std::mutex> lock(mutex_);
+            std::swap(hi_pri_messages_, hi_pri_flush_queue);
             std::swap(messages_, flush_queue);
         }
         auto remaining = timeout;
-        auto start_time = std::chrono::high_resolution_clock::now();
+        auto start_time = std::chrono::high_resolution_clock::now(); 
         do {
-            sync_produce(flush_queue.front());
-            flush_queue.pop_front();
+            if (!hi_pri_flush_queue.empty()) {
+                sync_produce(hi_pri_flush_queue.front());
+                hi_pri_flush_queue.pop_front();
+            }
+            else if (!flush_queue.empty()) {
+                sync_produce(flush_queue.front());
+                flush_queue.pop_front();
+            }
+            else {
+                break;
+            }
             // calculate remaining time
             remaining = timeout - std::chrono::duration_cast<std::chrono::milliseconds>
                 (std::chrono::high_resolution_clock::now() - start_time);
-        } while (!flush_queue.empty() && (remaining.count() > 0));
+        } while (remaining.count() > 0);
 
         // Re-enqueue remaining messages in original order
-        if (!flush_queue.empty()) {
+        if (!hi_pri_flush_queue.empty() || !flush_queue.empty()) {
             std::lock_guard<std::mutex> lock(mutex_);
-            messages_.insert(messages_.begin(), std::make_move_iterator(flush_queue.begin()), std::make_move_iterator(flush_queue.end()));
-        }
+            if (!hi_pri_flush_queue.empty()) {
+                hi_pri_messages_.insert(hi_pri_messages_.begin(),
+                                        std::make_move_iterator(hi_pri_flush_queue.begin()),
+                                        std::make_move_iterator(hi_pri_flush_queue.end()));
+            }
+            if (!flush_queue.empty()) {
+                messages_.insert(messages_.begin(),
+                                 std::make_move_iterator(flush_queue.begin()),
+                                 std::make_move_iterator(flush_queue.end()));
+            }
+        }
     }
     else {
         async_flush();
@@ -735,11 +769,13 @@ void BufferedProducer<BufferType, Allocator>::clear() {
     std::lock_guard<std::mutex> lock(mutex_);
     QueueType tmp;
     std::swap(tmp, messages_);
+    QueueType hi_pri_tmp;
+    std::swap(hi_pri_tmp, hi_pri_messages_);
 }
 
 template <typename BufferType, typename Allocator>
 size_t BufferedProducer<BufferType, Allocator>::get_buffer_size() const {
-    return messages_.size();
+    return messages_.size() + hi_pri_messages_.size();
 }
 
 template <typename BufferType, typename Allocator>
@@ -774,20 +810,21 @@ void BufferedProducer<BufferType, Allocator>::do_add_message(BuilderType&& build
     {
         std::lock_guard<std::mutex> lock(mutex_);
         if (priority == MessagePriority::High) {
-            messages_.emplace_front(std::forward<BuilderType>(builder));
+            hi_pri_messages_.emplace_back(std::forward<BuilderType>(builder));
        }
         else {
             messages_.emplace_back(std::forward<BuilderType>(builder));
         }
     }
-    if (do_flush && (max_buffer_size_ >= 0) && (max_buffer_size_ <= (ssize_t)messages_.size())) {
+
+    if (priority == MessagePriority::Low && do_flush && (max_buffer_size_ >= 0) && (max_buffer_size_ <= get_buffer_size())) {
         if (flush_method_ == FlushMethod::Sync) {
             flush();
         }
         else {
             async_flush();
         }
-    }
+    } 
 }

From 00370c981d79299d9ef0d6fbca52da068f2731ea Mon Sep 17 00:00:00 2001
From: demin80
Date: Mon, 7 Jan 2019 14:42:32 -0500
Subject: [PATCH 2/4] Fixed spacing issues

---
 include/cppkafka/utils/buffered_producer.h | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/include/cppkafka/utils/buffered_producer.h b/include/cppkafka/utils/buffered_producer.h
index b37d22d..5e41040 100644
--- a/include/cppkafka/utils/buffered_producer.h
+++ b/include/cppkafka/utils/buffered_producer.h
@@ -682,7 +682,7 @@ bool BufferedProducer<BufferType, Allocator>::flush(std::chrono::milliseconds ti
             std::swap(messages_, flush_queue);
         }
         auto remaining = timeout;
-        auto start_time = std::chrono::high_resolution_clock::now(); 
+        auto start_time = std::chrono::high_resolution_clock::now();
         do {
             if (!hi_pri_flush_queue.empty()) {
                 sync_produce(hi_pri_flush_queue.front());
@@ -824,7 +824,7 @@ void BufferedProducer<BufferType, Allocator>::do_add_message(BuilderType&& build
         else {
             async_flush();
         }
-    } 
+    }
 }

From 71c4e0214370dcccf3cc63a853e98d39a44fbf9d Mon Sep 17 00:00:00 2001
From: demin80
Date: Tue, 8 Jan 2019 13:48:26 -0500
Subject: [PATCH 3/4] Revised the implementation based on the reviewers'
 response

---
 include/cppkafka/utils/buffered_producer.h | 130 ++++++++++-----------
 1 file changed, 64 insertions(+), 66 deletions(-)

diff --git a/include/cppkafka/utils/buffered_producer.h b/include/cppkafka/utils/buffered_producer.h
index 5e41040..9b288a3 100644
--- a/include/cppkafka/utils/buffered_producer.h
+++ b/include/cppkafka/utils/buffered_producer.h
@@ -482,7 +482,6 @@ protected:
 #endif
 
 private:
-    enum class MessagePriority { Low, High };
     enum class SenderType { Sync, Async };
 
     template <typename T>
@@ -519,18 +518,18 @@ private:
         return nullptr;
     }
     template <typename BuilderType>
-    void do_add_message(BuilderType&& builder, MessagePriority priority, bool do_flush);
+    void do_add_message(BuilderType&& builder, bool is_retry, bool do_flush);
     template <typename BuilderType>
     void produce_message(BuilderType&& builder);
     Configuration prepare_configuration(Configuration config);
     void on_delivery_report(const Message& message);
     template <typename BuilderType>
     void async_produce(BuilderType&& message, bool throw_on_error);
-    
+
     // Members
     Producer producer_;
     QueueType messages_;
-    QueueType hi_pri_messages_;
+    QueueType retry_messages_;
     mutable std::mutex mutex_;
     ProduceSuccessCallback produce_success_callback_;
     ProduceFailureCallback produce_failure_callback_;
@@ -567,7 +566,7 @@ BufferedProducer<BufferType, Allocator>::BufferedProducer(Configuration config,
                                                           const Allocator& alloc)
 : producer_(prepare_configuration(std::move(config))),
   messages_(alloc),
-  hi_pri_messages_(alloc) {
+  retry_messages_(alloc) {
     producer_.set_payload_policy(get_default_payload_policy());
 #ifdef KAFKA_TEST_INSTANCE
     test_params_ = nullptr;
@@ -582,7 +581,7 @@ void BufferedProducer<BufferType, Allocator>::add_message(const MessageBuilder&
 template <typename BufferType, typename Allocator>
 void BufferedProducer<BufferType, Allocator>::add_message(Builder builder) {
     add_tracker(SenderType::Async, builder);
-    do_add_message(move(builder), MessagePriority::Low, true);
+    do_add_message(move(builder), false, true);
 }
 
 template <typename BufferType, typename Allocator>
@@ -626,42 +625,40 @@ void BufferedProducer<BufferType, Allocator>::produce(const Message& message) {
 template <typename BufferType, typename Allocator>
 void BufferedProducer<BufferType, Allocator>::async_flush() {
     CounterGuard<int> counter_guard(flushes_in_progress_);
-    QueueType flush_queue; // flush from temporary queue
-    QueueType hi_pri_flush_queue; // flush from hi-priority temporary queue
+    auto queue_flusher = [this](QueueType& queue)->void
     {
-        std::lock_guard<std::mutex> lock(mutex_);
-        std::swap(hi_pri_messages_, hi_pri_flush_queue);
-        std::swap(messages_, flush_queue);
-    }
-    while (!hi_pri_flush_queue.empty()) {
-        async_produce(std::move(hi_pri_flush_queue.front()), false);
-        hi_pri_flush_queue.pop_front();
-    }
-    while (!flush_queue.empty()) {
-        async_produce(std::move(flush_queue.front()), false);
-        flush_queue.pop_front();
-    }
+        QueueType flush_queue; // flush from temporary queue
+        {
+            std::lock_guard<std::mutex> lock(mutex_);
+            std::swap(queue, flush_queue);
+        }
+        while (!flush_queue.empty()) {
+            async_produce(std::move(flush_queue.front()), false);
+            flush_queue.pop_front();
+        }
+    };
+    queue_flusher(retry_messages_);
+    queue_flusher(messages_);
 }
 
 template <typename BufferType, typename Allocator>
 void BufferedProducer<BufferType, Allocator>::flush(bool preserve_order) {
     if (preserve_order) {
         CounterGuard<int> counter_guard(flushes_in_progress_);
-        QueueType flush_queue; // flush from temporary queue
-        QueueType hi_pri_flush_queue; // flush from hi-priority temporary queue
+        auto queue_flusher = [this](QueueType& queue)->void
         {
-            std::lock_guard<std::mutex> lock(mutex_);
-            std::swap(hi_pri_messages_, hi_pri_flush_queue);
-            std::swap(messages_, flush_queue);
-        }
-        while (!hi_pri_flush_queue.empty()) {
-            sync_produce(hi_pri_flush_queue.front());
-            hi_pri_flush_queue.pop_front();
-        }
-        while (!flush_queue.empty()) {
-            sync_produce(flush_queue.front());
-            flush_queue.pop_front();
-        }
+            QueueType flush_queue; // flush from temporary queue
+            {
+                std::lock_guard<std::mutex> lock(mutex_);
+                std::swap(queue, flush_queue);
+            }
+            while (!flush_queue.empty()) {
+                sync_produce(flush_queue.front());
+                flush_queue.pop_front();
+            }
+        };
+        queue_flusher(retry_messages_);
+        queue_flusher(messages_);
     }
     else {
         async_flush();
@@ -675,24 +672,25 @@ bool BufferedProducer<BufferType, Allocator>::flush(std::chrono::milliseconds ti
     if (preserve_order) {
         CounterGuard<int> counter_guard(flushes_in_progress_);
         QueueType flush_queue; // flush from temporary queue
-        QueueType hi_pri_flush_queue; // flush from hi-priority temporary queue
+        QueueType retry_flush_queue; // flush from temporary retry queue
         {
             std::lock_guard<std::mutex> lock(mutex_);
-            std::swap(hi_pri_messages_, hi_pri_flush_queue);
+            std::swap(retry_messages_, retry_flush_queue);
             std::swap(messages_, flush_queue);
         }
+        auto queue_flusher = [this](QueueType& queue)->bool
+        {
+            if (!queue.empty()) {
+                sync_produce(queue.front());
+                queue.pop_front();
+                return true;
+            }
+            return false;
+        };
         auto remaining = timeout;
         auto start_time = std::chrono::high_resolution_clock::now();
         do {
-            if (!hi_pri_flush_queue.empty()) {
-                sync_produce(hi_pri_flush_queue.front());
-                hi_pri_flush_queue.pop_front();
-            }
-            else if (!flush_queue.empty()) {
-                sync_produce(flush_queue.front());
-                flush_queue.pop_front();
-            }
-            else {
+            if (!queue_flusher(retry_flush_queue) && !queue_flusher(flush_queue)) {
                 break;
             }
             // calculate remaining time
@@ -701,19 +699,17 @@ bool BufferedProducer<BufferType, Allocator>::flush(std::chrono::milliseconds ti
         } while (remaining.count() > 0);
 
         // Re-enqueue remaining messages in original order
-        if (!hi_pri_flush_queue.empty() || !flush_queue.empty()) {
-            std::lock_guard<std::mutex> lock(mutex_);
-            if (!hi_pri_flush_queue.empty()) {
-                hi_pri_messages_.insert(hi_pri_messages_.begin(),
-                                        std::make_move_iterator(hi_pri_flush_queue.begin()),
-                                        std::make_move_iterator(hi_pri_flush_queue.end()));
+        auto re_enqueuer = [this](QueueType& src_queue, QueueType& dst_queue)->void
+        {
+            if (!src_queue.empty()) {
+                std::lock_guard<std::mutex> lock(mutex_);
+                dst_queue.insert(dst_queue.begin(),
+                                 std::make_move_iterator(src_queue.begin()),
+                                 std::make_move_iterator(src_queue.end()));
             }
-            if (!flush_queue.empty()) {
-                messages_.insert(messages_.begin(),
-                                 std::make_move_iterator(flush_queue.begin()),
-                                 std::make_move_iterator(flush_queue.end()));
-            }
-        }
+        };
+        re_enqueuer(retry_flush_queue, retry_messages_);
+        re_enqueuer(flush_queue, messages_);
     }
     else {
         async_flush();
@@ -769,13 +765,13 @@ void BufferedProducer<BufferType, Allocator>::clear() {
     std::lock_guard<std::mutex> lock(mutex_);
     QueueType tmp;
     std::swap(tmp, messages_);
-    QueueType hi_pri_tmp;
-    std::swap(hi_pri_tmp, hi_pri_messages_);
+    QueueType retry_tmp;
+    std::swap(retry_tmp, retry_messages_);
 }
 
 template <typename BufferType, typename Allocator>
 size_t BufferedProducer<BufferType, Allocator>::get_buffer_size() const {
-    return messages_.size() + hi_pri_messages_.size();
+    return messages_.size() + retry_messages_.size();
 }
 
 template <typename BufferType, typename Allocator>
@@ -805,19 +801,21 @@ BufferedProducer<BufferType, Allocator>::get_flush_method() const {
 template <typename BufferType, typename Allocator>
 template <typename BuilderType>
 void BufferedProducer<BufferType, Allocator>::do_add_message(BuilderType&& builder,
-                                                             MessagePriority priority,
+                                                             bool is_retry,
                                                              bool do_flush) {
     {
         std::lock_guard<std::mutex> lock(mutex_);
-        if (priority == MessagePriority::High) {
-            hi_pri_messages_.emplace_back(std::forward<BuilderType>(builder));
+        if (is_retry) {
+            retry_messages_.emplace_back(std::forward<BuilderType>(builder));
         }
         else {
             messages_.emplace_back(std::forward<BuilderType>(builder));
         }
     }
-    if (priority == MessagePriority::Low && do_flush && (max_buffer_size_ >= 0) && (max_buffer_size_ <= get_buffer_size())) {
+    // Flush the queues only if a regular message is added. Retry messages may be added
+    // from rdkafka callbacks, and flush/async_flush is a user-level call
+    if (!is_retry && do_flush && (max_buffer_size_ >= 0) && (max_buffer_size_ <= get_buffer_size())) {
         if (flush_method_ == FlushMethod::Sync) {
             flush();
         }
@@ -965,7 +963,7 @@ void BufferedProducer<BufferType, Allocator>::async_produce(BuilderType&& builde
         TrackerPtr tracker = std::static_pointer_cast<Tracker>(builder.internal());
         if (tracker && tracker->num_retries_ > 0) {
             --tracker->num_retries_;
-            do_add_message(std::forward<BuilderType>(builder), MessagePriority::High, false);
+            do_add_message(std::forward<BuilderType>(builder), true, false);
             return;
         }
     }
@@ -1004,7 +1002,7 @@ void BufferedProducer<BufferType, Allocator>::on_delivery_report(const Message&
         --tracker->num_retries_;
         if (tracker->sender_ == SenderType::Async) {
             // Re-enqueue for later retransmission with higher priority (i.e. front of the queue)
-            do_add_message(Builder(message), MessagePriority::High, false);
+            do_add_message(Builder(message), true, false);
         }
         should_retry = true;
     }

From 93c2edf6babfdb7f865d55722e68f75edb474948 Mon Sep 17 00:00:00 2001
From: demin80
Date: Thu, 10 Jan 2019 14:37:46 -0500
Subject: [PATCH 4/4] refactored by adding retry_mutex_ and replacing bools
 with enums; fixed formatting issues

---
 include/cppkafka/utils/buffered_producer.h | 89 +++++++++++-----------
 1 file changed, 46 insertions(+), 43 deletions(-)

diff --git a/include/cppkafka/utils/buffered_producer.h b/include/cppkafka/utils/buffered_producer.h
index 9b288a3..0ee3f93 100644
--- a/include/cppkafka/utils/buffered_producer.h
+++ b/include/cppkafka/utils/buffered_producer.h
@@ -483,7 +483,9 @@ protected:
 
 private:
     enum class SenderType { Sync, Async };
-    
+    enum class QueueKind { Retry, Regular };
+    enum class FlushAction { DontFlush, DoFlush };
+
     template <typename T>
     struct CounterGuard{
         CounterGuard(std::atomic<T>& counter) : counter_(counter) { ++counter_; }
@@ -518,19 +520,21 @@ private:
         return nullptr;
     }
     template <typename BuilderType>
-    void do_add_message(BuilderType&& builder, bool is_retry, bool do_flush);
+    void do_add_message(BuilderType&& builder, QueueKind queue_kind, FlushAction flush_action);
     template <typename BuilderType>
     void produce_message(BuilderType&& builder);
     Configuration prepare_configuration(Configuration config);
     void on_delivery_report(const Message& message);
     template <typename BuilderType>
     void async_produce(BuilderType&& message, bool throw_on_error);
+    static void swap_queues(QueueType & queue1, QueueType & queue2, std::mutex & mutex);
 
     // Members
     Producer producer_;
     QueueType messages_;
     QueueType retry_messages_;
     mutable std::mutex mutex_;
+    mutable std::mutex retry_mutex_;
     ProduceSuccessCallback produce_success_callback_;
     ProduceFailureCallback produce_failure_callback_;
     ProduceTerminationCallback produce_termination_callback_;
@@ -581,7 +585,7 @@ void BufferedProducer<BufferType, Allocator>::add_message(const MessageBuilder&
 template <typename BufferType, typename Allocator>
 void BufferedProducer<BufferType, Allocator>::add_message(Builder builder) {
     add_tracker(SenderType::Async, builder);
-    do_add_message(move(builder), false, true);
+    do_add_message(move(builder), QueueKind::Regular, FlushAction::DoFlush);
 }
 
 template <typename BufferType, typename Allocator>
@@ -625,40 +629,36 @@ void BufferedProducer<BufferType, Allocator>::produce(const Message& message) {
 template <typename BufferType, typename Allocator>
 void BufferedProducer<BufferType, Allocator>::async_flush() {
     CounterGuard<int> counter_guard(flushes_in_progress_);
-    auto queue_flusher = [this](QueueType& queue)->void
+    auto queue_flusher = [this](QueueType& queue, std::mutex & mutex)->void
     {
         QueueType flush_queue; // flush from temporary queue
-        {
-            std::lock_guard<std::mutex> lock(mutex_);
-            std::swap(queue, flush_queue);
-        }
+        swap_queues(queue, flush_queue, mutex);
+
         while (!flush_queue.empty()) {
             async_produce(std::move(flush_queue.front()), false);
             flush_queue.pop_front();
         }
     };
-    queue_flusher(retry_messages_);
-    queue_flusher(messages_);
+    queue_flusher(retry_messages_, retry_mutex_);
+    queue_flusher(messages_, mutex_);
 }
 
 template <typename BufferType, typename Allocator>
 void BufferedProducer<BufferType, Allocator>::flush(bool preserve_order) {
     if (preserve_order) {
         CounterGuard<int> counter_guard(flushes_in_progress_);
-        auto queue_flusher = [this](QueueType& queue)->void
+        auto queue_flusher = [this](QueueType& queue, std::mutex & mutex)->void
         {
             QueueType flush_queue; // flush from temporary queue
-            {
-                std::lock_guard<std::mutex> lock(mutex_);
-                std::swap(queue, flush_queue);
-            }
+            swap_queues(queue, flush_queue, mutex);
+
             while (!flush_queue.empty()) {
-                sync_produce(flush_queue.front());
-                flush_queue.pop_front();
+                sync_produce(flush_queue.front());
+                flush_queue.pop_front();
             }
         };
-        queue_flusher(retry_messages_);
-        queue_flusher(messages_);
+        queue_flusher(retry_messages_, retry_mutex_);
+        queue_flusher(messages_, mutex_);
     }
     else {
         async_flush();
@@ -672,12 +672,10 @@ bool BufferedProducer<BufferType, Allocator>::flush(std::chrono::milliseconds ti
     if (preserve_order) {
         CounterGuard<int> counter_guard(flushes_in_progress_);
         QueueType flush_queue; // flush from temporary queue
+        swap_queues(messages_, flush_queue, mutex_);
         QueueType retry_flush_queue; // flush from temporary retry queue
-        {
-            std::lock_guard<std::mutex> lock(mutex_);
-            std::swap(retry_messages_, retry_flush_queue);
-            std::swap(messages_, flush_queue);
-        }
+        swap_queues(retry_messages_, retry_flush_queue, retry_mutex_);
+
         auto queue_flusher = [this](QueueType& queue)->bool
         {
             if (!queue.empty()) {
@@ -699,17 +697,17 @@ bool BufferedProducer<BufferType, Allocator>::flush(std::chrono::milliseconds ti
         } while (remaining.count() > 0);
 
         // Re-enqueue remaining messages in original order
-        auto re_enqueuer = [this](QueueType& src_queue, QueueType& dst_queue)->void
+        auto re_enqueuer = [this](QueueType& src_queue, QueueType& dst_queue, std::mutex & mutex)->void
         {
             if (!src_queue.empty()) {
-                std::lock_guard<std::mutex> lock(mutex_);
+                std::lock_guard<std::mutex> lock(mutex);
                 dst_queue.insert(dst_queue.begin(),
                                  std::make_move_iterator(src_queue.begin()),
                                  std::make_move_iterator(src_queue.end()));
             }
         };
-        re_enqueuer(retry_flush_queue, retry_messages_);
-        re_enqueuer(flush_queue, messages_);
+        re_enqueuer(retry_flush_queue, retry_messages_, retry_mutex_);
+        re_enqueuer(flush_queue, messages_, mutex_);
     }
     else {
         async_flush();
@@ -762,11 +760,10 @@ bool BufferedProducer<BufferType, Allocator>::wait_for_acks(std::chrono::millise
 
 template <typename BufferType, typename Allocator>
 void BufferedProducer<BufferType, Allocator>::clear() {
-    std::lock_guard<std::mutex> lock(mutex_);
     QueueType tmp;
-    std::swap(tmp, messages_);
+    swap_queues(messages_, tmp, mutex_);
     QueueType retry_tmp;
-    std::swap(retry_tmp, retry_messages_);
+    swap_queues(retry_messages_, retry_tmp, retry_mutex_);
 }
 
 template <typename BufferType, typename Allocator>
@@ -798,21 +798,20 @@ BufferedProducer<BufferType, Allocator>::get_flush_method() const {
 template <typename BufferType, typename Allocator>
 template <typename BuilderType>
 void BufferedProducer<BufferType, Allocator>::do_add_message(BuilderType&& builder,
-                                                             bool is_retry,
-                                                             bool do_flush) {
-    {
+                                                             QueueKind queue_kind,
+                                                             FlushAction flush_action) {
+    if (queue_kind == QueueKind::Retry) {
+        std::lock_guard<std::mutex> lock(retry_mutex_);
+        retry_messages_.emplace_back(std::forward<BuilderType>(builder));
+    }
+    else {
         std::lock_guard<std::mutex> lock(mutex_);
-        if (is_retry) {
-            retry_messages_.emplace_back(std::forward<BuilderType>(builder));
-        }
-        else {
-            messages_.emplace_back(std::forward<BuilderType>(builder));
-        }
+        messages_.emplace_back(std::forward<BuilderType>(builder));
     }
+
     // Flush the queues only if a regular message is added. Retry messages may be added
     // from rdkafka callbacks, and flush/async_flush is a user-level call
-    if (!is_retry && do_flush && (max_buffer_size_ >= 0) && (max_buffer_size_ <= get_buffer_size())) {
+    if (queue_kind == QueueKind::Regular && flush_action == FlushAction::DoFlush && (max_buffer_size_ >= 0) && (max_buffer_size_ <= get_buffer_size())) {
         if (flush_method_ == FlushMethod::Sync) {
             flush();
         }
@@ -963,7 +959,7 @@ void BufferedProducer<BufferType, Allocator>::async_produce(BuilderType&& builde
         TrackerPtr tracker = std::static_pointer_cast<Tracker>(builder.internal());
         if (tracker && tracker->num_retries_ > 0) {
             --tracker->num_retries_;
-            do_add_message(std::forward<BuilderType>(builder), true, false);
+            do_add_message(std::forward<BuilderType>(builder), QueueKind::Retry, FlushAction::DontFlush);
             return;
         }
     }
@@ -1002,7 +998,7 @@ void BufferedProducer<BufferType, Allocator>::on_delivery_report(const Message&
         --tracker->num_retries_;
         if (tracker->sender_ == SenderType::Async) {
             // Re-enqueue for later retransmission with higher priority (i.e. front of the queue)
-            do_add_message(Builder(message), true, false);
+            do_add_message(Builder(message), QueueKind::Retry, FlushAction::DontFlush);
         }
         should_retry = true;
     }
@@ -1034,6 +1030,13 @@ void BufferedProducer<BufferType, Allocator>::on_delivery_report(const Message&
     }
 }
 
+template <typename BufferType, typename Allocator>
+void BufferedProducer<BufferType, Allocator>::swap_queues(BufferedProducer<BufferType, Allocator>::QueueType & queue1, BufferedProducer<BufferType, Allocator>::QueueType & queue2, std::mutex & mutex)
+{
+    std::lock_guard<std::mutex> lock(mutex);
+    std::swap(queue1, queue2);
+}
+
 } // cppkafka
 
 #endif // CPPKAFKA_BUFFERED_PRODUCER_H
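
Editor's note for reviewers: below is a minimal caller-side sketch of how the retry queue introduced by this series behaves. It is not part of the patches; the broker address, topic name, and payloads are placeholders, and it assumes the BufferedProducer APIs already present in buffered_producer.h (set_max_number_retries, set_produce_failure_callback, add_message, flush) with PATCH 4/4 applied.

#include <iostream>
#include <string>
#include <cppkafka/utils/buffered_producer.h>

using namespace cppkafka;

int main() {
    // Placeholder broker address -- adjust for your environment.
    Configuration config = {
        { "metadata.broker.list", "127.0.0.1:9092" }
    };
    BufferedProducer<std::string> producer(config);

    // A failed delivery may be re-queued (into retry_messages_) up to 3 times.
    producer.set_max_number_retries(3);

    // Returning true from the failure callback requests a retry; the retry is
    // enqueued via do_add_message(..., QueueKind::Retry, FlushAction::DontFlush).
    producer.set_produce_failure_callback([](const Message& msg) {
        std::cerr << "delivery failed: " << msg.get_error().to_string() << "\n";
        return true;
    });

    std::string first = "first payload";
    std::string second = "second payload";
    producer.add_message(MessageBuilder("test_topic").partition(0).payload(first));
    producer.add_message(MessageBuilder("test_topic").partition(0).payload(second));

    // flush() drains retry_messages_ before messages_, so a retried message is
    // produced ahead of newer ones rather than being appended behind them.
    producer.flush();
    return 0;
}

The design point the series settles on: retries live in a dedicated queue guarded by its own mutex (retry_mutex_) and are flushed first, instead of being pushed onto the front of the main queue with emplace_front, which is what previously re-ordered buffered messages.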