diff --git a/include/cppkafka/utils/buffered_producer.h b/include/cppkafka/utils/buffered_producer.h index 16a5c36..06a2de6 100644 --- a/include/cppkafka/utils/buffered_producer.h +++ b/include/cppkafka/utils/buffered_producer.h @@ -104,15 +104,26 @@ public: * Callback to indicate a message failed to be produced by the broker. * * The returned bool indicates whether the BufferedProducer should try to produce - * the message again after each failure. + * the message again after each failure, subject to the maximum number of retries set. If this callback + * is not set or returns false or if the number of retries reaches zero, the ProduceTerminationCallback + * will be called. */ using ProduceFailureCallback = std::function; + /** + * Callback to indicate a message failed to be produced by the broker and was dropped. + * + * The application can use this callback to track delivery failure of messages similar to the + * FlushTerminationCallback. If the application is only interested in message dropped events, + * then ProduceFailureCallback should not be set. + */ + using ProduceTerminationCallback = std::function; + /** * Callback to indicate a message failed to be flushed * * If this callback returns true, the message will be re-enqueued and flushed again later subject - * to the maximum number of retries set. If this callback returns false or if the number or retries + * to the maximum number of retries set. If this callback is not set or returns false or if the number of retries * reaches zero, the FlushTerminationCallback will be called. */ using FlushFailureCallback = std::function; @@ -122,7 +133,8 @@ public: * reaches zero. * * The application can use this callback to track delivery failure of messages similar to the - * ProduceFailureCallback. + * ProduceTerminationCallback. If the application is only interested in message dropped events, + * then FlushFailureCallback should not be set. */ using FlushTerminationCallback = std::function; @@ -356,13 +368,24 @@ public: * * \param callback The callback to be set * - * \remark It is *highly* recommended to set this callback as your message may be produced - * indefinitely if there's a remote error. - * * \warning Do not call any method on the BufferedProducer while inside this callback. */ void set_produce_failure_callback(ProduceFailureCallback callback); + /** + * \brief Sets the message produce termination callback + * + * This will be called when the delivery report callback is executed for a message having + * an error and after all retries have expired and the message is dropped. + * + * \param callback The callback to be set + * + * \remark If the application only tracks dropped messages, the set_produce_failure_callback() should not be set. + * + * \warning Do not call any method on the BufferedProducer while inside this callback. + */ + void set_produce_termination_callback(ProduceTerminationCallback callback); + /** * \brief Sets the successful delivery callback * @@ -394,6 +417,8 @@ public: * * \param callback * + * \remark If the application only tracks dropped messages, the set_flush_failure_callback() should not be set. + * * \warning Do not call any method on the BufferedProducer while inside this callback */ void set_flush_termination_callback(FlushTerminationCallback callback); @@ -469,6 +494,7 @@ private: mutable std::mutex mutex_; ProduceSuccessCallback produce_success_callback_; ProduceFailureCallback produce_failure_callback_; + ProduceTerminationCallback produce_termination_callback_; FlushFailureCallback flush_failure_callback_; FlushTerminationCallback flush_termination_callback_; ssize_t max_buffer_size_{-1}; @@ -771,6 +797,11 @@ void BufferedProducer::set_produce_failure_callback(Produ produce_failure_callback_ = std::move(callback); } +template +void BufferedProducer::set_produce_termination_callback(ProduceTerminationCallback callback) { + produce_termination_callback_ = std::move(callback); +} + template void BufferedProducer::set_produce_success_callback(ProduceSuccessCallback callback) { produce_success_callback_ = std::move(callback); @@ -873,10 +904,14 @@ void BufferedProducer::on_delivery_report(const Message& } else { ++total_messages_dropped_; + CallbackInvoker + ("produce termination", produce_termination_callback_, &producer_)(message); } } else { ++total_messages_dropped_; + CallbackInvoker + ("produce termination", produce_termination_callback_, &producer_)(message); } } else {