diff --git a/include/cppkafka/utils/buffered_producer.h b/include/cppkafka/utils/buffered_producer.h
index 5e41040..9b288a3 100644
--- a/include/cppkafka/utils/buffered_producer.h
+++ b/include/cppkafka/utils/buffered_producer.h
@@ -482,7 +482,6 @@ protected:
 #endif
     
 private:
-    enum class MessagePriority { Low, High };
     enum class SenderType { Sync, Async };
     
     template <typename T>
@@ -519,18 +518,18 @@ private:
         return nullptr;
     }
     template <typename BuilderType>
-    void do_add_message(BuilderType&& builder, MessagePriority priority, bool do_flush);
+    void do_add_message(BuilderType&& builder, bool is_retry, bool do_flush);
     template <typename BuilderType>
     void produce_message(BuilderType&& builder);
     Configuration prepare_configuration(Configuration config);
     void on_delivery_report(const Message& message);
     template <typename BuilderType>
     void async_produce(BuilderType&& message, bool throw_on_error);
-    
+
     // Members
     Producer producer_;
     QueueType messages_;
-    QueueType hi_pri_messages_;
+    QueueType retry_messages_;
     mutable std::mutex mutex_;
     ProduceSuccessCallback produce_success_callback_;
     ProduceFailureCallback produce_failure_callback_;
@@ -567,7 +566,7 @@ BufferedProducer<BufferType, Allocator>::BufferedProducer(Configuration config,
                                                           const Allocator& alloc)
 : producer_(prepare_configuration(std::move(config))),
   messages_(alloc),
-  hi_pri_messages_(alloc) {
+  retry_messages_(alloc) {
     producer_.set_payload_policy(get_default_payload_policy());
 #ifdef KAFKA_TEST_INSTANCE
     test_params_ = nullptr;
@@ -582,7 +581,7 @@ void BufferedProducer<BufferType, Allocator>::add_message(const MessageBuilder&
 template <typename BufferType, typename Allocator>
 void BufferedProducer<BufferType, Allocator>::add_message(Builder builder) {
     add_tracker(SenderType::Async, builder);
-    do_add_message(move(builder), MessagePriority::Low, true);
+    do_add_message(move(builder), false, true);
 }
 
 template <typename BufferType, typename Allocator>
@@ -626,42 +625,40 @@ void BufferedProducer<BufferType, Allocator>::produce(const Message& message) {
 template <typename BufferType, typename Allocator>
 void BufferedProducer<BufferType, Allocator>::async_flush() {
     CounterGuard<int> counter_guard(flushes_in_progress_);
-    QueueType flush_queue; // flush from temporary queue
-    QueueType hi_pri_flush_queue; // flush from hi-priority temporary queue
+    auto queue_flusher = [this](QueueType& queue)->void
     {
-        std::lock_guard<std::mutex> lock(mutex_);
-        std::swap(hi_pri_messages_, hi_pri_flush_queue);
-        std::swap(messages_, flush_queue);
-    }
-    while (!hi_pri_flush_queue.empty()) {
-        async_produce(std::move(hi_pri_flush_queue.front()), false);
-        hi_pri_flush_queue.pop_front();
-    }
-    while (!flush_queue.empty()) {
-        async_produce(std::move(flush_queue.front()), false);
-        flush_queue.pop_front();
-    }
+        QueueType flush_queue; // flush from temporary queue
+        {
+            std::lock_guard<std::mutex> lock(mutex_);
+            std::swap(queue, flush_queue);
+        }
+        while (!flush_queue.empty()) {
+            async_produce(std::move(flush_queue.front()), false);
+            flush_queue.pop_front();
+        }
+    };
+    queue_flusher(retry_messages_);
+    queue_flusher(messages_);
 }
 
 template <typename BufferType, typename Allocator>
 void BufferedProducer<BufferType, Allocator>::flush(bool preserve_order) {
     if (preserve_order) {
         CounterGuard<int> counter_guard(flushes_in_progress_);
-        QueueType flush_queue; // flush from temporary queue
-        QueueType hi_pri_flush_queue; // flush from hi-priority temporary queue
+        auto queue_flusher = [this](QueueType& queue)->void
         {
-            std::lock_guard<std::mutex> lock(mutex_);
-            std::swap(hi_pri_messages_, hi_pri_flush_queue);
-            std::swap(messages_, flush_queue);
-        }
-        while (!hi_pri_flush_queue.empty()) {
-            sync_produce(hi_pri_flush_queue.front());
-            hi_pri_flush_queue.pop_front();
-        }
-        while (!flush_queue.empty()) {
-            sync_produce(flush_queue.front());
-            flush_queue.pop_front();
-        }
+            QueueType flush_queue; // flush from temporary queue
+            {
+                std::lock_guard<std::mutex> lock(mutex_);
+                std::swap(queue, flush_queue);
+            }
+            while (!flush_queue.empty()) {
+                sync_produce(flush_queue.front());
+                flush_queue.pop_front();
+            }
+        };
+        queue_flusher(retry_messages_);
+        queue_flusher(messages_);
     }
     else {
         async_flush();
@@ -675,24 +672,25 @@ bool BufferedProducer<BufferType, Allocator>::flush(std::chrono::milliseconds ti
     if (preserve_order) {
         CounterGuard<int> counter_guard(flushes_in_progress_);
         QueueType flush_queue; // flush from temporary queue
-        QueueType hi_pri_flush_queue; // flush from hi-priority temporary queue
+        QueueType retry_flush_queue; // flush from temporary retry queue
         {
             std::lock_guard<std::mutex> lock(mutex_);
-            std::swap(hi_pri_messages_, hi_pri_flush_queue);
+            std::swap(retry_messages_, retry_flush_queue);
             std::swap(messages_, flush_queue);
         }
+        auto queue_flusher = [this](QueueType& queue)->bool
+        {
+            if (!queue.empty()) {
+                sync_produce(queue.front());
+                queue.pop_front();
+                return true;
+            }
+            return false;
+        };
         auto remaining = timeout;
         auto start_time = std::chrono::high_resolution_clock::now();
         do {
-            if (!hi_pri_flush_queue.empty()) {
-                sync_produce(hi_pri_flush_queue.front());
-                hi_pri_flush_queue.pop_front();
-            }
-            else if (!flush_queue.empty()) {
-                sync_produce(flush_queue.front());
-                flush_queue.pop_front();
-            }
-            else {
+            if (!queue_flusher(retry_flush_queue) && !queue_flusher(flush_queue)) {
                 break;
             }
             // calculate remaining time
@@ -701,19 +699,17 @@ bool BufferedProducer<BufferType, Allocator>::flush(std::chrono::milliseconds ti
         } while (remaining.count() > 0);
         
         // Re-enqueue remaining messages in original order
-        if (!hi_pri_flush_queue.empty() || !flush_queue.empty()) {
-            std::lock_guard<std::mutex> lock(mutex_);
-            if (!hi_pri_flush_queue.empty()) {
-                hi_pri_messages_.insert(hi_pri_messages_.begin(),
-                                        std::make_move_iterator(hi_pri_flush_queue.begin()),
-                                        std::make_move_iterator(hi_pri_flush_queue.end()));
+        auto re_enqueuer = [this](QueueType& src_queue, QueueType& dst_queue)->void
+        {
+            if (!src_queue.empty()) {
+                std::lock_guard<std::mutex> lock(mutex_);
+                dst_queue.insert(dst_queue.begin(),
+                                 std::make_move_iterator(src_queue.begin()),
+                                 std::make_move_iterator(src_queue.end()));
             }
-            if (!flush_queue.empty()) {
-                messages_.insert(messages_.begin(),
-                                 std::make_move_iterator(flush_queue.begin()),
-                                 std::make_move_iterator(flush_queue.end()));
-            }
-        }
+        };
+        re_enqueuer(retry_flush_queue, retry_messages_);
+        re_enqueuer(flush_queue, messages_);
     }
     else {
         async_flush();
@@ -769,13 +765,13 @@ void BufferedProducer<BufferType, Allocator>::clear() {
     std::lock_guard<std::mutex> lock(mutex_);
     QueueType tmp;
     std::swap(tmp, messages_);
-    QueueType hi_pri_tmp;
-    std::swap(hi_pri_tmp, hi_pri_messages_);
+    QueueType retry_tmp;
+    std::swap(retry_tmp, retry_messages_);
 }
 
 template <typename BufferType, typename Allocator>
 size_t BufferedProducer<BufferType, Allocator>::get_buffer_size() const {
-    return messages_.size() + hi_pri_messages_.size();
+    return messages_.size() + retry_messages_.size();
 }
 
 template <typename BufferType, typename Allocator>
@@ -805,19 +801,21 @@ BufferedProducer<BufferType, Allocator>::get_flush_method() const {
 template <typename BufferType, typename Allocator>
 template <typename BuilderType>
 void BufferedProducer<BufferType, Allocator>::do_add_message(BuilderType&& builder,
-                                                             MessagePriority priority,
+                                                             bool is_retry,
                                                              bool do_flush) {
     {
         std::lock_guard<std::mutex> lock(mutex_);
-        if (priority == MessagePriority::High) {
-            hi_pri_messages_.emplace_back(std::forward<BuilderType>(builder));
+        if (is_retry) {
+            retry_messages_.emplace_back(std::forward<BuilderType>(builder));
         }
         else {
             messages_.emplace_back(std::forward<BuilderType>(builder));
         }
     }
     
-    if (priority == MessagePriority::Low && do_flush && (max_buffer_size_ >= 0) && (max_buffer_size_ <= get_buffer_size())) {
+    // Flush the queues only if a regular message is added. Retry messages may be added
+    // from rdkafka callbacks, and flush/async_flush is a user-level call
+    if (!is_retry && do_flush && (max_buffer_size_ >= 0) && (max_buffer_size_ <= get_buffer_size())) {
         if (flush_method_ == FlushMethod::Sync) {
             flush();
         }
@@ -965,7 +963,7 @@ void BufferedProducer<BufferType, Allocator>::async_produce(BuilderType&& builde
             TrackerPtr tracker = std::static_pointer_cast<Tracker>(builder.internal());
             if (tracker && tracker->num_retries_ > 0) {
                 --tracker->num_retries_;
-                do_add_message(std::forward<BuilderType>(builder), MessagePriority::High, false);
+                do_add_message(std::forward<BuilderType>(builder), true, false);
                 return;
             }
         }
@@ -1004,7 +1002,7 @@ void BufferedProducer<BufferType, Allocator>::on_delivery_report(const Message&
                 --tracker->num_retries_;
                 if (tracker->sender_ == SenderType::Async) {
                     // Re-enqueue for later retransmission with higher priority (i.e. front of the queue)
-                    do_add_message(Builder(message), MessagePriority::High, false);
+                    do_add_message(Builder(message), true, false);
                 }
                 should_retry = true;
             }
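
Usage note (illustrative, not part of the patch): the sketch below shows how the retry path touched by this change is exercised through the public BufferedProducer API. The broker address, topic name and retry count are placeholder values, and the set_max_number_retries()/failure-callback wiring is assumed to correspond to the Tracker::num_retries_ machinery referenced in the hunks above.

#include <cppkafka/utils/buffered_producer.h>
#include <string>

using namespace cppkafka;

int main() {
    Configuration config = {
        { "metadata.broker.list", "localhost:9092" }   // placeholder broker
    };
    BufferedProducer<std::string> producer(config);

    // Each tracked message carries a retry budget; a failed delivery is re-enqueued
    // into retry_messages_ (at most 3 times here) instead of the regular queue.
    producer.set_max_number_retries(3);

    // Returning true from the failure callback allows the retry path in
    // on_delivery_report() above; returning false drops the message.
    producer.set_produce_failure_callback([](const Message& /*msg*/) {
        return true;
    });

    std::string payload = "hello";
    MessageBuilder builder("my_topic");                // placeholder topic
    builder.partition(0).payload(payload);
    producer.add_message(builder);                     // regular queue (is_retry == false)

    // flush() drains retry_messages_ before messages_, as in the queue_flusher lambdas above.
    producer.flush();
    return producer.get_buffer_size() == 0 ? 0 : 1;
}

With this change a message that fails delivery goes into a dedicated retry queue rather than a generic high-priority queue, and every flush path drains the retry queue before the regular one.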