Added flush termination callback

This commit is contained in:
accelerated
2018-12-13 10:43:29 -05:00
parent ab002fe119
commit 0b9b7bab11

View File

@@ -110,8 +110,21 @@ public:
/** /**
* Callback to indicate a message failed to be flushed * Callback to indicate a message failed to be flushed
*
* If this callback returns true, the message will be re-enqueued and flushed again later subject
* to the maximum number of retries set. If this callback returns false or if the number or retries
* reaches zero, the FlushTerminationCallback will be called.
*/ */
using FlushFailureCallback = std::function<bool(const MessageBuilder&, Error error)>; using FlushFailureCallback = std::function<bool(const MessageBuilder&, Error error)>;
/**
* Callback to indicate a message was dropped after multiple flush attempts or when the retry count
* reaches zero.
*
* The application can use this callback to track delivery failure of messages similar to the
* ProduceFailureCallback.
*/
using FlushTerminationCallback = std::function<void(const MessageBuilder&, Error error)>;
/** /**
* \brief Constructs a buffered producer using the provided configuration * \brief Constructs a buffered producer using the provided configuration
@@ -360,12 +373,12 @@ public:
void set_produce_success_callback(ProduceSuccessCallback callback); void set_produce_success_callback(ProduceSuccessCallback callback);
/** /**
* \brief Sets the local message produce failure callback * \brief Sets the local flush failure callback
* *
* This callback will be called when local message production fails during a flush() operation. * This callback will be called when local message production fails during a flush() operation.
* Failure errors are typically payload too large, unknown topic or unknown partition. * Failure errors are typically payload too large, unknown topic or unknown partition.
* Note that if the callback returns false, the message will be dropped from the buffer, * Note that if the callback returns false, the message will be dropped from the buffer,
* otherwise it will be re-enqueued for later retry. * otherwise it will be re-enqueued for later retry subject to the message retry count.
* *
* \param callback * \param callback
* *
@@ -373,6 +386,18 @@ public:
*/ */
void set_flush_failure_callback(FlushFailureCallback callback); void set_flush_failure_callback(FlushFailureCallback callback);
/**
* \brief Sets the local flush termination callback
*
* This callback will be called when local message production fails during a flush() operation after
* all previous flush attempts have failed. The message will be dropped after this callback.
*
* \param callback
*
* \warning Do not call any method on the BufferedProducer while inside this callback
*/
void set_flush_termination_callback(FlushTerminationCallback callback);
struct TestParameters { struct TestParameters {
bool force_delivery_error_; bool force_delivery_error_;
bool force_produce_error_; bool force_produce_error_;
@@ -445,6 +470,7 @@ private:
ProduceSuccessCallback produce_success_callback_; ProduceSuccessCallback produce_success_callback_;
ProduceFailureCallback produce_failure_callback_; ProduceFailureCallback produce_failure_callback_;
FlushFailureCallback flush_failure_callback_; FlushFailureCallback flush_failure_callback_;
FlushTerminationCallback flush_termination_callback_;
ssize_t max_buffer_size_{-1}; ssize_t max_buffer_size_{-1};
FlushMethod flush_method_{FlushMethod::Sync}; FlushMethod flush_method_{FlushMethod::Sync};
std::atomic<size_t> pending_acks_{0}; std::atomic<size_t> pending_acks_{0};
@@ -755,6 +781,11 @@ void BufferedProducer<BufferType, Allocator>::set_flush_failure_callback(FlushFa
flush_failure_callback_ = std::move(callback); flush_failure_callback_ = std::move(callback);
} }
template <typename BufferType, typename Allocator>
void BufferedProducer<BufferType, Allocator>::set_flush_termination_callback(FlushTerminationCallback callback) {
flush_termination_callback_ = std::move(callback);
}
template <typename BufferType, typename Allocator> template <typename BufferType, typename Allocator>
template <typename BuilderType> template <typename BuilderType>
void BufferedProducer<BufferType, Allocator>::produce_message(BuilderType&& builder) { void BufferedProducer<BufferType, Allocator>::produce_message(BuilderType&& builder) {
@@ -802,6 +833,9 @@ void BufferedProducer<BufferType, Allocator>::async_produce(BuilderType&& builde
} }
} }
++total_messages_dropped_; ++total_messages_dropped_;
// Call the flush termination callback
CallbackInvoker<FlushTerminationCallback>("flush termination", flush_termination_callback_, &producer_)
(builder, ex.get_error());
if (throw_on_error) { if (throw_on_error) {
throw; throw;
} }