diff --git a/include/cppkafka/utils/buffered_producer.h b/include/cppkafka/utils/buffered_producer.h index 678df0d..b01700d 100644 --- a/include/cppkafka/utils/buffered_producer.h +++ b/include/cppkafka/utils/buffered_producer.h @@ -206,6 +206,11 @@ public: */ void sync_produce(const MessageBuilder& builder); + /** + * \brief Same as sync_produce but waits up to 'timeout' for acks to be received. + */ + void sync_produce(const MessageBuilder& builder, std::chrono::milliseconds timeout); + /** * \brief Produces a message asynchronously without buffering it * @@ -481,7 +486,7 @@ protected: return nullptr; } #endif - + private: enum class SenderType { Sync, Async }; enum class QueueKind { Retry, Regular }; @@ -523,6 +528,7 @@ private: if (sender_ == SenderType::Sync) { return retry_promise_.get_future().get(); } + return false; } // Signal the synchronous producer if the message should be retried or not. // Called from inside on_delivery_report(). For synchronous producers only. @@ -663,6 +669,12 @@ void BufferedProducer::produce(const MessageBuilder& buil template void BufferedProducer::sync_produce(const MessageBuilder& builder) { + sync_produce(builder, producer_.get_timeout()); +} + +template +void BufferedProducer::sync_produce(const MessageBuilder& builder, + std::chrono::milliseconds timeout) { if (enable_message_retries_) { //Adding a retry tracker requires copying the builder since //we cannot modify the original instance. Cloning is a fast operation @@ -673,14 +685,14 @@ void BufferedProducer::sync_produce(const MessageBuilder& do { tracker->prepare_to_retry(); produce_message(builder_clone); - wait_for_acks(); + wait_for_acks(timeout); } while (tracker->retry_again()); } else { // produce once produce_message(builder); - wait_for_acks(); + wait_for_acks(timeout); } } @@ -714,15 +726,21 @@ void BufferedProducer::async_flush() { template void BufferedProducer::flush(bool preserve_order) { + flush(producer_.get_timeout(), preserve_order); +} + +template +bool BufferedProducer::flush(std::chrono::milliseconds timeout, + bool preserve_order) { if (preserve_order) { CounterGuard counter_guard(flushes_in_progress_); - auto queue_flusher = [this](QueueType& queue, std::mutex & mutex)->void + auto queue_flusher = [&timeout, this](QueueType& queue, std::mutex & mutex)->void { QueueType flush_queue; // flush from temporary queue swap_queues(queue, flush_queue, mutex); //Produce one message at a time and wait for acks until queue is empty while (!flush_queue.empty()) { - sync_produce(flush_queue.front()); + sync_produce(flush_queue.front(), timeout); flush_queue.pop_front(); } }; @@ -734,61 +752,9 @@ void BufferedProducer::flush(bool preserve_order) { else { //Produce all messages at once then wait for acks. async_flush(); - wait_for_acks(); - } -} - -template -bool BufferedProducer::flush(std::chrono::milliseconds timeout, - bool preserve_order) { - if (preserve_order) { - CounterGuard counter_guard(flushes_in_progress_); - QueueType flush_queue; // flush from temporary queue - swap_queues(messages_, flush_queue, mutex_); - QueueType retry_flush_queue; // flush from temporary retry queue - swap_queues(retry_messages_, retry_flush_queue, retry_mutex_); - - auto queue_flusher = [this](QueueType& queue)->bool - { - //Produce one message at a time and wait for acks - if (!queue.empty()) { - sync_produce(queue.front()); - queue.pop_front(); - return true; - } - return false; - }; - auto remaining = timeout; - auto start_time = std::chrono::high_resolution_clock::now(); - do { - if (!queue_flusher(retry_flush_queue) && !queue_flusher(flush_queue)) { - break; - } - // calculate remaining time - remaining = timeout - std::chrono::duration_cast - (std::chrono::high_resolution_clock::now() - start_time); - } while (remaining.count() > 0); - - // When timeout has expired, any remaining messages must be re-enqueue in their - // original order so they can be flushed later. - auto re_enqueuer = [this](QueueType& src_queue, QueueType& dst_queue, std::mutex & mutex)->bool - { - if (!src_queue.empty()) { - std::lock_guard lock(mutex); - dst_queue.insert(dst_queue.begin(), - std::make_move_iterator(src_queue.begin()), - std::make_move_iterator(src_queue.end())); - return true; - } - return false; - }; - return !re_enqueuer(retry_flush_queue, retry_messages_, retry_mutex_) && - !re_enqueuer(flush_queue, messages_, mutex_); - } - else { - async_flush(); - return wait_for_acks(timeout); + wait_for_acks(timeout); } + return pending_acks_ == 0; } template