diff --git a/include/cppkafka/utils/buffered_producer.h b/include/cppkafka/utils/buffered_producer.h index 70c1f5e..496a9c9 100644 --- a/include/cppkafka/utils/buffered_producer.h +++ b/include/cppkafka/utils/buffered_producer.h @@ -190,11 +190,27 @@ public: * with respect to other threads. */ void flush(); + + /** + * \brief Flushes the buffered messages and waits up to 'timeout' + * + * \param timeout The maximum time to wait until all acks are received + * + * \return True if the operation completes and all acks have been received. + */ + bool flush(std::chrono::milliseconds timeout); /** * Waits for produced message's acknowledgements from the brokers */ void wait_for_acks(); + + /** + * Waits for produced message's acknowledgements from the brokers up to 'timeout'. + * + * \return True if the operation completes and all acks have been received. + */ + bool wait_for_acks(std::chrono::milliseconds timeout); /** * Clears any buffered messages @@ -515,6 +531,12 @@ void BufferedProducer::flush() { wait_for_acks(); } +template +bool BufferedProducer::flush(std::chrono::milliseconds timeout) { + async_flush(); + return wait_for_acks(timeout); +} + template void BufferedProducer::wait_for_acks() { while (pending_acks_ > 0) { @@ -533,6 +555,31 @@ void BufferedProducer::wait_for_acks() { } } +template +bool BufferedProducer::wait_for_acks(std::chrono::milliseconds timeout) { + auto remaining = timeout; + auto start_time = std::chrono::high_resolution_clock::now(); + while ((pending_acks_ > 0) && (remaining.count() > 0)) { + try { + producer_.flush(remaining); + } + catch (const HandleException& ex) { + // If we just hit the timeout, keep going, otherwise re-throw + if (ex.get_error() == RD_KAFKA_RESP_ERR__TIMED_OUT) { + // There is no time remaining + return (pending_acks_ == 0); + } + else { + throw; + } + } + // calculate remaining time + remaining = timeout - std::chrono::duration_cast + (std::chrono::high_resolution_clock::now() - start_time); + } + return (pending_acks_ == 0); +} + template void BufferedProducer::clear() { std::lock_guard lock(mutex_);