From 97229ebfd910406748f7d7f9532d5b9fdf430d3a Mon Sep 17 00:00:00 2001 From: demin80 Date: Mon, 7 Jan 2019 14:39:09 -0500 Subject: [PATCH] Added a high-priority queue to BufferedProducer to avoid message re-ordering --- include/cppkafka/utils/buffered_producer.h | 61 +++++++++++++++++----- 1 file changed, 49 insertions(+), 12 deletions(-) diff --git a/include/cppkafka/utils/buffered_producer.h b/include/cppkafka/utils/buffered_producer.h index a241e7b..b37d22d 100644 --- a/include/cppkafka/utils/buffered_producer.h +++ b/include/cppkafka/utils/buffered_producer.h @@ -530,6 +530,7 @@ private: // Members Producer producer_; QueueType messages_; + QueueType hi_pri_messages_; mutable std::mutex mutex_; ProduceSuccessCallback produce_success_callback_; ProduceFailureCallback produce_failure_callback_; @@ -565,7 +566,8 @@ template BufferedProducer::BufferedProducer(Configuration config, const Allocator& alloc) : producer_(prepare_configuration(std::move(config))), - messages_(alloc) { + messages_(alloc), + hi_pri_messages_(alloc) { producer_.set_payload_policy(get_default_payload_policy()); #ifdef KAFKA_TEST_INSTANCE test_params_ = nullptr; @@ -625,10 +627,16 @@ template void BufferedProducer::async_flush() { CounterGuard counter_guard(flushes_in_progress_); QueueType flush_queue; // flush from temporary queue + QueueType hi_pri_flush_queue; // flush from hi-priority temporary queue { std::lock_guard lock(mutex_); + std::swap(hi_pri_messages_, hi_pri_flush_queue); std::swap(messages_, flush_queue); } + while (!hi_pri_flush_queue.empty()) { + async_produce(std::move(hi_pri_flush_queue.front()), false); + hi_pri_flush_queue.pop_front(); + } while (!flush_queue.empty()) { async_produce(std::move(flush_queue.front()), false); flush_queue.pop_front(); @@ -640,10 +648,16 @@ void BufferedProducer::flush(bool preserve_order) { if (preserve_order) { CounterGuard counter_guard(flushes_in_progress_); QueueType flush_queue; // flush from temporary queue + QueueType hi_pri_flush_queue; // flush from hi-priority temporary queue { std::lock_guard lock(mutex_); + std::swap(hi_pri_messages_, hi_pri_flush_queue); std::swap(messages_, flush_queue); } + while (!hi_pri_flush_queue.empty()) { + sync_produce(hi_pri_flush_queue.front()); + hi_pri_flush_queue.pop_front(); + } while (!flush_queue.empty()) { sync_produce(flush_queue.front()); flush_queue.pop_front(); @@ -661,25 +675,45 @@ bool BufferedProducer::flush(std::chrono::milliseconds ti if (preserve_order) { CounterGuard counter_guard(flushes_in_progress_); QueueType flush_queue; // flush from temporary queue + QueueType hi_pri_flush_queue; // flush from hi-priority temporary queue { std::lock_guard lock(mutex_); + std::swap(hi_pri_messages_, hi_pri_flush_queue); std::swap(messages_, flush_queue); } auto remaining = timeout; - auto start_time = std::chrono::high_resolution_clock::now(); + auto start_time = std::chrono::high_resolution_clock::now(); do { - sync_produce(flush_queue.front()); - flush_queue.pop_front(); + if (!hi_pri_flush_queue.empty()) { + sync_produce(hi_pri_flush_queue.front()); + hi_pri_flush_queue.pop_front(); + } + else if (!flush_queue.empty()) { + sync_produce(flush_queue.front()); + flush_queue.pop_front(); + } + else { + break; + } // calculate remaining time remaining = timeout - std::chrono::duration_cast (std::chrono::high_resolution_clock::now() - start_time); - } while (!flush_queue.empty() && (remaining.count() > 0)); + } while (remaining.count() > 0); // Re-enqueue remaining messages in original order - if (!flush_queue.empty()) { + if (!hi_pri_flush_queue.empty() || !flush_queue.empty()) { std::lock_guard lock(mutex_); - messages_.insert(messages_.begin(), std::make_move_iterator(flush_queue.begin()), std::make_move_iterator(flush_queue.end())); - } + if (!!hi_pri_flush_queue.empty()) { + hi_pri_messages_.insert(hi_pri_messages_.begin(), + std::make_move_iterator(hi_pri_flush_queue.begin()), + std::make_move_iterator(hi_pri_flush_queue.end())); + } + if (!flush_queue.empty()) { + messages_.insert(messages_.begin(), + std::make_move_iterator(flush_queue.begin()), + std::make_move_iterator(flush_queue.end())); + } + } } else { async_flush(); @@ -735,11 +769,13 @@ void BufferedProducer::clear() { std::lock_guard lock(mutex_); QueueType tmp; std::swap(tmp, messages_); + QueueType hi_pri_tmp; + std::swap(hi_pri_tmp, hi_pri_messages_); } template size_t BufferedProducer::get_buffer_size() const { - return messages_.size(); + return messages_.size() + hi_pri_messages_.size(); } template @@ -774,20 +810,21 @@ void BufferedProducer::do_add_message(BuilderType&& build { std::lock_guard lock(mutex_); if (priority == MessagePriority::High) { - messages_.emplace_front(std::forward(builder)); + hi_pri_messages_.emplace_back(std::forward(builder)); } else { messages_.emplace_back(std::forward(builder)); } } - if (do_flush && (max_buffer_size_ >= 0) && (max_buffer_size_ <= (ssize_t)messages_.size())) { + + if (priority == MessagePriority::Low && do_flush && (max_buffer_size_ >= 0) && (max_buffer_size_ <= get_buffer_size())) { if (flush_method_ == FlushMethod::Sync) { flush(); } else { async_flush(); } - } + } } template