Added similar logic for ProduceTerminationCallback

This commit is contained in:
accelerated
2018-12-13 11:48:34 -05:00
parent 0b9b7bab11
commit 8dd5428c49

View File

@@ -104,15 +104,26 @@ public:
* Callback to indicate a message failed to be produced by the broker.
*
* The returned bool indicates whether the BufferedProducer should try to produce
* the message again after each failure.
* the message again after each failure, subject to the maximum number of retries set. If this callback
* is not set or returns false or if the number of retries reaches zero, the ProduceTerminationCallback
* will be called.
*/
using ProduceFailureCallback = std::function<bool(const Message&)>;
/**
* Callback to indicate a message failed to be produced by the broker and was dropped.
*
* The application can use this callback to track delivery failure of messages similar to the
* FlushTerminationCallback. If the application is only interested in message dropped events,
* then ProduceFailureCallback should not be set.
*/
using ProduceTerminationCallback = std::function<void(const Message&)>;
/**
* Callback to indicate a message failed to be flushed
*
* If this callback returns true, the message will be re-enqueued and flushed again later subject
* to the maximum number of retries set. If this callback returns false or if the number or retries
* to the maximum number of retries set. If this callback is not set or returns false or if the number of retries
* reaches zero, the FlushTerminationCallback will be called.
*/
using FlushFailureCallback = std::function<bool(const MessageBuilder&, Error error)>;
@@ -122,7 +133,8 @@ public:
* reaches zero.
*
* The application can use this callback to track delivery failure of messages similar to the
* ProduceFailureCallback.
* ProduceTerminationCallback. If the application is only interested in message dropped events,
* then FlushFailureCallback should not be set.
*/
using FlushTerminationCallback = std::function<void(const MessageBuilder&, Error error)>;
@@ -356,13 +368,24 @@ public:
*
* \param callback The callback to be set
*
* \remark It is *highly* recommended to set this callback as your message may be produced
* indefinitely if there's a remote error.
*
* \warning Do not call any method on the BufferedProducer while inside this callback.
*/
void set_produce_failure_callback(ProduceFailureCallback callback);
/**
* \brief Sets the message produce termination callback
*
* This will be called when the delivery report callback is executed for a message having
* an error and after all retries have expired and the message is dropped.
*
* \param callback The callback to be set
*
* \remark If the application only tracks dropped messages, the set_produce_failure_callback() should not be set.
*
* \warning Do not call any method on the BufferedProducer while inside this callback.
*/
void set_produce_termination_callback(ProduceTerminationCallback callback);
/**
* \brief Sets the successful delivery callback
*
@@ -394,6 +417,8 @@ public:
*
* \param callback
*
* \remark If the application only tracks dropped messages, the set_flush_failure_callback() should not be set.
*
* \warning Do not call any method on the BufferedProducer while inside this callback
*/
void set_flush_termination_callback(FlushTerminationCallback callback);
@@ -469,6 +494,7 @@ private:
mutable std::mutex mutex_;
ProduceSuccessCallback produce_success_callback_;
ProduceFailureCallback produce_failure_callback_;
ProduceTerminationCallback produce_termination_callback_;
FlushFailureCallback flush_failure_callback_;
FlushTerminationCallback flush_termination_callback_;
ssize_t max_buffer_size_{-1};
@@ -771,6 +797,11 @@ void BufferedProducer<BufferType, Allocator>::set_produce_failure_callback(Produ
produce_failure_callback_ = std::move(callback);
}
template <typename BufferType, typename Allocator>
void BufferedProducer<BufferType, Allocator>::set_produce_termination_callback(ProduceTerminationCallback callback) {
produce_termination_callback_ = std::move(callback);
}
template <typename BufferType, typename Allocator>
void BufferedProducer<BufferType, Allocator>::set_produce_success_callback(ProduceSuccessCallback callback) {
produce_success_callback_ = std::move(callback);
@@ -873,10 +904,14 @@ void BufferedProducer<BufferType, Allocator>::on_delivery_report(const Message&
}
else {
++total_messages_dropped_;
CallbackInvoker<ProduceTerminationCallback>
("produce termination", produce_termination_callback_, &producer_)(message);
}
}
else {
++total_messages_dropped_;
CallbackInvoker<ProduceTerminationCallback>
("produce termination", produce_termination_callback_, &producer_)(message);
}
}
else {