diff --git a/include/cppkafka/utils/buffered_producer.h b/include/cppkafka/utils/buffered_producer.h index d3620eb..16a5c36 100644 --- a/include/cppkafka/utils/buffered_producer.h +++ b/include/cppkafka/utils/buffered_producer.h @@ -110,8 +110,21 @@ public: /** * Callback to indicate a message failed to be flushed + * + * If this callback returns true, the message will be re-enqueued and flushed again later subject + * to the maximum number of retries set. If this callback returns false or if the number or retries + * reaches zero, the FlushTerminationCallback will be called. */ using FlushFailureCallback = std::function; + + /** + * Callback to indicate a message was dropped after multiple flush attempts or when the retry count + * reaches zero. + * + * The application can use this callback to track delivery failure of messages similar to the + * ProduceFailureCallback. + */ + using FlushTerminationCallback = std::function; /** * \brief Constructs a buffered producer using the provided configuration @@ -360,12 +373,12 @@ public: void set_produce_success_callback(ProduceSuccessCallback callback); /** - * \brief Sets the local message produce failure callback + * \brief Sets the local flush failure callback * * This callback will be called when local message production fails during a flush() operation. * Failure errors are typically payload too large, unknown topic or unknown partition. * Note that if the callback returns false, the message will be dropped from the buffer, - * otherwise it will be re-enqueued for later retry. + * otherwise it will be re-enqueued for later retry subject to the message retry count. * * \param callback * @@ -373,6 +386,18 @@ public: */ void set_flush_failure_callback(FlushFailureCallback callback); + /** + * \brief Sets the local flush termination callback + * + * This callback will be called when local message production fails during a flush() operation after + * all previous flush attempts have failed. The message will be dropped after this callback. + * + * \param callback + * + * \warning Do not call any method on the BufferedProducer while inside this callback + */ + void set_flush_termination_callback(FlushTerminationCallback callback); + struct TestParameters { bool force_delivery_error_; bool force_produce_error_; @@ -445,6 +470,7 @@ private: ProduceSuccessCallback produce_success_callback_; ProduceFailureCallback produce_failure_callback_; FlushFailureCallback flush_failure_callback_; + FlushTerminationCallback flush_termination_callback_; ssize_t max_buffer_size_{-1}; FlushMethod flush_method_{FlushMethod::Sync}; std::atomic pending_acks_{0}; @@ -755,6 +781,11 @@ void BufferedProducer::set_flush_failure_callback(FlushFa flush_failure_callback_ = std::move(callback); } +template +void BufferedProducer::set_flush_termination_callback(FlushTerminationCallback callback) { + flush_termination_callback_ = std::move(callback); +} + template template void BufferedProducer::produce_message(BuilderType&& builder) { @@ -802,6 +833,9 @@ void BufferedProducer::async_produce(BuilderType&& builde } } ++total_messages_dropped_; + // Call the flush termination callback + CallbackInvoker("flush termination", flush_termination_callback_, &producer_) + (builder, ex.get_error()); if (throw_on_error) { throw; }