From 8cfd4595f6a936640543287226fb20d9a2ba2c7f Mon Sep 17 00:00:00 2001 From: Alexander Damian Date: Mon, 31 Aug 2020 21:05:10 -0400 Subject: [PATCH] Call flush termination callbacks from sync_produce --- include/cppkafka/utils/buffered_producer.h | 62 ++++++++++++++++++---- 1 file changed, 51 insertions(+), 11 deletions(-) diff --git a/include/cppkafka/utils/buffered_producer.h b/include/cppkafka/utils/buffered_producer.h index 3f5424f..69577fb 100644 --- a/include/cppkafka/utils/buffered_producer.h +++ b/include/cppkafka/utils/buffered_producer.h @@ -691,6 +691,7 @@ private: void do_add_message(BuilderType&& builder, QueueKind queue_kind, FlushAction flush_action); template void produce_message(BuilderType&& builder); + bool sync_produce(const MessageBuilder& builder, std::chrono::milliseconds timeout, bool throw_on_error); Configuration prepare_configuration(Configuration config); void on_delivery_report(const Message& message); template @@ -787,12 +788,19 @@ void BufferedProducer::produce(const MessageBuilder& buil template void BufferedProducer::sync_produce(const MessageBuilder& builder) { - sync_produce(builder, infinite_timeout); + sync_produce(builder, infinite_timeout, true); } template bool BufferedProducer::sync_produce(const MessageBuilder& builder, std::chrono::milliseconds timeout) { + return sync_produce(builder, infinite_timeout, true); +} + +template +bool BufferedProducer::sync_produce(const MessageBuilder& builder, + std::chrono::milliseconds timeout, + bool throw_on_error) { if (enable_message_retries_) { //Adding a retry tracker requires copying the builder since //we cannot modify the original instance. Cloning is a fast operation @@ -802,12 +810,32 @@ bool BufferedProducer::sync_produce(const MessageBuilder& // produce until we succeed or we reach max retry limit auto endTime = std::chrono::steady_clock::now() + timeout; do { - tracker->prepare_to_retry(); - produce_message(builder_clone); - //Wait w/o timeout since we must get the ack to avoid a race condition. - //Otherwise retry_again() will block as the producer won't get flushed - //and the delivery callback will never be invoked. - wait_for_current_thread_acks(); + try { + tracker->prepare_to_retry(); + produce_message(builder_clone); + //Wait w/o timeout since we must get the ack to avoid a race condition. + //Otherwise retry_again() will block as the producer won't get flushed + //and the delivery callback will never be invoked. + wait_for_current_thread_acks(); + } + catch (const HandleException& ex) { + // If we have a flush failure callback and it returns true, we retry producing this message later + CallbackInvoker callback("flush failure", flush_failure_callback_, &producer_); + if (!callback || callback(builder, ex.get_error())) { + if (tracker && tracker->has_retries_left()) { + tracker->decrement_retries(); + continue; + } + } + ++total_messages_dropped_; + // Call the flush termination callback + CallbackInvoker("flush termination", flush_termination_callback_, &producer_) + (builder, ex.get_error()); + if (throw_on_error) { + throw; + } + break; + } } while (tracker->retry_again() && ((timeout == infinite_timeout) || @@ -816,10 +844,22 @@ bool BufferedProducer::sync_produce(const MessageBuilder& } else { // produce once - produce_message(builder); - wait_for_current_thread_acks(timeout); - return !ack_monitor_.has_current_thread_pending_acks(); + try { + produce_message(builder); + wait_for_current_thread_acks(timeout); + return !ack_monitor_.has_current_thread_pending_acks(); + } + catch (const HandleException& ex) { + ++total_messages_dropped_; + // Call the flush termination callback + CallbackInvoker("flush termination", flush_termination_callback_, &producer_) + (builder, ex.get_error()); + if (throw_on_error) { + throw; + } + } } + return false; } template @@ -851,7 +891,7 @@ bool BufferedProducer::flush(std::chrono::milliseconds ti if (preserve_order) { //When preserving order, we must ensure that each message //gets delivered before producing the next one. - sync_produce(flush_queue.front(), timeout); + sync_produce(flush_queue.front(), timeout, false); } else { //Produce as fast as possible w/o waiting. If one or more