diff --git a/include/cppkafka/utils/buffered_producer.h b/include/cppkafka/utils/buffered_producer.h index b01700d..1905345 100644 --- a/include/cppkafka/utils/buffered_producer.h +++ b/include/cppkafka/utils/buffered_producer.h @@ -703,25 +703,7 @@ void BufferedProducer::produce(const Message& message) { template void BufferedProducer::async_flush() { - CounterGuard counter_guard(flushes_in_progress_); - auto queue_flusher = [this](QueueType& queue, std::mutex & mutex)->void - { - QueueType flush_queue; // flush from temporary queue - swap_queues(queue, flush_queue, mutex); - - while (!flush_queue.empty()) { - async_produce(std::move(flush_queue.front()), false); - flush_queue.pop_front(); - } - }; - //Produce retry queue first since these messages were produced first - queue_flusher(retry_messages_, retry_mutex_); - //Produce recently enqueued messages - queue_flusher(messages_, mutex_); - // Flush the producer but don't wait. It is necessary to poll - // the producer at least once during this operation because - // async_produce() will not. - wait_for_acks(std::chrono::milliseconds(0)); + flush(std::chrono::milliseconds::zero(), false); } template @@ -732,26 +714,34 @@ void BufferedProducer::flush(bool preserve_order) { template bool BufferedProducer::flush(std::chrono::milliseconds timeout, bool preserve_order) { - if (preserve_order) { - CounterGuard counter_guard(flushes_in_progress_); - auto queue_flusher = [&timeout, this](QueueType& queue, std::mutex & mutex)->void - { - QueueType flush_queue; // flush from temporary queue - swap_queues(queue, flush_queue, mutex); - //Produce one message at a time and wait for acks until queue is empty - while (!flush_queue.empty()) { + CounterGuard counter_guard(flushes_in_progress_); + auto queue_flusher = [timeout, preserve_order, this] + (QueueType& queue, std::mutex & mutex)->void + { + QueueType flush_queue; // flush from temporary queue + swap_queues(queue, flush_queue, mutex); + //Produce one message at a time and wait for acks until queue is empty + while (!flush_queue.empty()) { + if (preserve_order) { + //When preserving order, we must ensure that each message + //gets delivered before producing the next one. sync_produce(flush_queue.front(), timeout); - flush_queue.pop_front(); } - }; - //Produce retry queue first since these messages were produced first - queue_flusher(retry_messages_, retry_mutex_); - //Produce recently enqueued messages - queue_flusher(messages_, mutex_); - } - else { - //Produce all messages at once then wait for acks. - async_flush(); + else { + //Produce as fast as possible w/o waiting. If one or more + //messages fail, they will be re-enqueued for retry + //on the next flush cycle, which causes re-ordering. + async_produce(flush_queue.front(), false); + } + flush_queue.pop_front(); + } + }; + //Produce retry queue first since these messages were produced first. + queue_flusher(retry_messages_, retry_mutex_); + //Produce recently enqueued messages + queue_flusher(messages_, mutex_); + if (!preserve_order) { + //Wait for acks from the messages produced above via async_produce wait_for_acks(timeout); } return pending_acks_ == 0; @@ -759,20 +749,7 @@ bool BufferedProducer::flush(std::chrono::milliseconds ti template void BufferedProducer::wait_for_acks() { - while (pending_acks_ > 0) { - try { - producer_.flush(); - } - catch (const HandleException& ex) { - // If we just hit the timeout, keep going, otherwise re-throw - if (ex.get_error() == RD_KAFKA_RESP_ERR__TIMED_OUT) { - continue; - } - else { - throw; - } - } - } + wait_for_acks(producer_.get_timeout()); } template