mirror of
https://github.com/Telecominfraproject/wlan-cloud-lib-cppkafka.git
synced 2025-11-03 03:58:04 +00:00
Added queue full notify callback
This commit is contained in:
@@ -92,9 +92,9 @@ public:
|
|||||||
Async ///< Empty the buffer and don't wait for acks.
|
Async ///< Empty the buffer and don't wait for acks.
|
||||||
};
|
};
|
||||||
enum class QueueFullNotification {
|
enum class QueueFullNotification {
|
||||||
None, ///< Don't notify
|
None, ///< Don't notify (default).
|
||||||
EdgeTriggered, ///< Notify once. Application must call queue_full_trigger_reset() to enable again.
|
OncePerMessage, ///< Notify once per message.
|
||||||
EachOccurence ///< Notify on each occurence.
|
EachOccurence ///< Notify on each occurence.
|
||||||
};
|
};
|
||||||
/**
|
/**
|
||||||
* Concrete builder
|
* Concrete builder
|
||||||
@@ -144,6 +144,14 @@ public:
|
|||||||
* then FlushFailureCallback should not be set.
|
* then FlushFailureCallback should not be set.
|
||||||
*/
|
*/
|
||||||
using FlushTerminationCallback = std::function<void(const MessageBuilder&, Error error)>;
|
using FlushTerminationCallback = std::function<void(const MessageBuilder&, Error error)>;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Callback to indicate a queue full error was received when producing.
|
||||||
|
*
|
||||||
|
* The MessageBuilder instance represents the message which triggered the error. This callback will be called
|
||||||
|
* according to the set_queue_full_notification() setting.
|
||||||
|
*/
|
||||||
|
using QueueFullCallback = std::function<void(const MessageBuilder&)>;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* \brief Constructs a buffered producer using the provided configuration
|
* \brief Constructs a buffered producer using the provided configuration
|
||||||
@@ -377,13 +385,6 @@ public:
|
|||||||
* Get the queue full notification type.
|
* Get the queue full notification type.
|
||||||
*/
|
*/
|
||||||
QueueFullNotification get_queue_full_notification() const;
|
QueueFullNotification get_queue_full_notification() const;
|
||||||
|
|
||||||
/**
|
|
||||||
* Reset the queue full notification trigger.
|
|
||||||
*
|
|
||||||
* This function has no effect unless QueueFullNotification == EdgeTriggered.
|
|
||||||
*/
|
|
||||||
void queue_full_trigger_reset();
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* \brief Sets the message produce failure callback
|
* \brief Sets the message produce failure callback
|
||||||
@@ -449,6 +450,18 @@ public:
|
|||||||
*/
|
*/
|
||||||
void set_flush_termination_callback(FlushTerminationCallback callback);
|
void set_flush_termination_callback(FlushTerminationCallback callback);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* \brief Sets the local queue full error callback
|
||||||
|
*
|
||||||
|
* This callback will be called when local message production fails during a produce() operation according to the
|
||||||
|
* set_queue_full_notification() setting.
|
||||||
|
*
|
||||||
|
* \param callback
|
||||||
|
*
|
||||||
|
* \warning Do not call any method on the BufferedProducer while inside this callback
|
||||||
|
*/
|
||||||
|
void set_queue_full_callback(QueueFullCallback callback);
|
||||||
|
|
||||||
struct TestParameters {
|
struct TestParameters {
|
||||||
bool force_delivery_error_;
|
bool force_delivery_error_;
|
||||||
bool force_produce_error_;
|
bool force_produce_error_;
|
||||||
@@ -523,6 +536,7 @@ private:
|
|||||||
ProduceTerminationCallback produce_termination_callback_;
|
ProduceTerminationCallback produce_termination_callback_;
|
||||||
FlushFailureCallback flush_failure_callback_;
|
FlushFailureCallback flush_failure_callback_;
|
||||||
FlushTerminationCallback flush_termination_callback_;
|
FlushTerminationCallback flush_termination_callback_;
|
||||||
|
QueueFullCallback queue_full_callback_;
|
||||||
ssize_t max_buffer_size_{-1};
|
ssize_t max_buffer_size_{-1};
|
||||||
FlushMethod flush_method_{FlushMethod::Sync};
|
FlushMethod flush_method_{FlushMethod::Sync};
|
||||||
std::atomic<size_t> pending_acks_{0};
|
std::atomic<size_t> pending_acks_{0};
|
||||||
@@ -532,7 +546,6 @@ private:
|
|||||||
int max_number_retries_{0};
|
int max_number_retries_{0};
|
||||||
bool has_internal_data_{false};
|
bool has_internal_data_{false};
|
||||||
QueueFullNotification queue_full_notification_{QueueFullNotification::None};
|
QueueFullNotification queue_full_notification_{QueueFullNotification::None};
|
||||||
bool queue_full_trigger_{true};
|
|
||||||
#ifdef KAFKA_TEST_INSTANCE
|
#ifdef KAFKA_TEST_INSTANCE
|
||||||
TestParameters* test_params_;
|
TestParameters* test_params_;
|
||||||
#endif
|
#endif
|
||||||
@@ -837,11 +850,6 @@ BufferedProducer<BufferType, Allocator>::get_queue_full_notification() const {
|
|||||||
return queue_full_notification_;
|
return queue_full_notification_;
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename BufferType, typename Allocator>
|
|
||||||
void BufferedProducer<BufferType, Allocator>::queue_full_trigger_reset() {
|
|
||||||
queue_full_trigger_ = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
template <typename BufferType, typename Allocator>
|
template <typename BufferType, typename Allocator>
|
||||||
void BufferedProducer<BufferType, Allocator>::set_produce_failure_callback(ProduceFailureCallback callback) {
|
void BufferedProducer<BufferType, Allocator>::set_produce_failure_callback(ProduceFailureCallback callback) {
|
||||||
produce_failure_callback_ = std::move(callback);
|
produce_failure_callback_ = std::move(callback);
|
||||||
@@ -867,13 +875,16 @@ void BufferedProducer<BufferType, Allocator>::set_flush_termination_callback(Flu
|
|||||||
flush_termination_callback_ = std::move(callback);
|
flush_termination_callback_ = std::move(callback);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
template <typename BufferType, typename Allocator>
|
||||||
|
void BufferedProducer<BufferType, Allocator>::set_queue_full_callback(QueueFullCallback callback) {
|
||||||
|
queue_full_callback_ = std::move(callback);
|
||||||
|
}
|
||||||
|
|
||||||
template <typename BufferType, typename Allocator>
|
template <typename BufferType, typename Allocator>
|
||||||
template <typename BuilderType>
|
template <typename BuilderType>
|
||||||
void BufferedProducer<BufferType, Allocator>::produce_message(BuilderType&& builder) {
|
void BufferedProducer<BufferType, Allocator>::produce_message(BuilderType&& builder) {
|
||||||
using builder_type = typename std::decay<BuilderType>::type;
|
using builder_type = typename std::decay<BuilderType>::type;
|
||||||
bool queue_full_notify = (queue_full_notification_ == QueueFullNotification::None) ? false :
|
bool queue_full_notify = queue_full_notification_ != QueueFullNotification::None;
|
||||||
(queue_full_notification_ == QueueFullNotification::EdgeTriggered) ?
|
|
||||||
queue_full_trigger_ : true;
|
|
||||||
while (true) {
|
while (true) {
|
||||||
try {
|
try {
|
||||||
MessageInternalGuard<builder_type> internal_guard(const_cast<builder_type&>(builder));
|
MessageInternalGuard<builder_type> internal_guard(const_cast<builder_type&>(builder));
|
||||||
@@ -889,10 +900,8 @@ void BufferedProducer<BufferType, Allocator>::produce_message(BuilderType&& buil
|
|||||||
producer_.poll();
|
producer_.poll();
|
||||||
// Notify application so it can slow-down production
|
// Notify application so it can slow-down production
|
||||||
if (queue_full_notify) {
|
if (queue_full_notify) {
|
||||||
queue_full_notify = queue_full_trigger_ = false; //clear trigger and local state
|
queue_full_notify = queue_full_notification_ == QueueFullNotification::EachOccurence;
|
||||||
CallbackInvoker<Configuration::ErrorCallback>
|
CallbackInvoker<QueueFullCallback>("queue full", queue_full_callback_, &producer_)(builder);
|
||||||
("error", get_producer().get_configuration().get_error_callback(), &get_producer())
|
|
||||||
(get_producer(), static_cast<int>(ex.get_error().get_error()), ex.what());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
|
|||||||
Reference in New Issue
Block a user