diff --git a/include/cppkafka/utils/buffered_producer.h b/include/cppkafka/utils/buffered_producer.h index fc8cadc..d3620eb 100644 --- a/include/cppkafka/utils/buffered_producer.h +++ b/include/cppkafka/utils/buffered_producer.h @@ -564,7 +564,7 @@ void BufferedProducer::flush(bool preserve_order) { template bool BufferedProducer::flush(std::chrono::milliseconds timeout, - bool preserve_order) { + bool preserve_order) { if (preserve_order) { CounterGuard counter_guard(flushes_in_progress_); QueueType flush_queue; // flush from temporary queue @@ -572,13 +572,15 @@ bool BufferedProducer::flush(std::chrono::milliseconds ti std::lock_guard lock(mutex_); std::swap(messages_, flush_queue); } + auto remaining = timeout; auto start_time = std::chrono::high_resolution_clock::now(); - while (!flush_queue.empty() && - (std::chrono::duration_cast - (std::chrono::high_resolution_clock::now() - start_time) < timeout)) { + do { sync_produce(flush_queue.front()); flush_queue.pop_front(); - } + // calculate remaining time + remaining = timeout - std::chrono::duration_cast + (std::chrono::high_resolution_clock::now() - start_time); + } while (!flush_queue.empty() && (remaining.count() > 0)); } else { async_flush(); @@ -608,7 +610,7 @@ template bool BufferedProducer::wait_for_acks(std::chrono::milliseconds timeout) { auto remaining = timeout; auto start_time = std::chrono::high_resolution_clock::now(); - while ((pending_acks_ > 0) && (remaining.count() > 0)) { + do { try { producer_.flush(remaining); } @@ -625,7 +627,7 @@ bool BufferedProducer::wait_for_acks(std::chrono::millise // calculate remaining time remaining = timeout - std::chrono::duration_cast (std::chrono::high_resolution_clock::now() - start_time); - } + } while ((pending_acks_ > 0) && (remaining.count() > 0)); return (pending_acks_ == 0); }