mirror of
https://github.com/Telecominfraproject/wlan-cloud-lib-cppkafka.git
synced 2025-11-01 02:57:53 +00:00
Added queue full notification
This commit is contained in:
@@ -87,8 +87,15 @@ template <typename BufferType,
|
||||
typename Allocator = std::allocator<ConcreteMessageBuilder<BufferType>>>
|
||||
class CPPKAFKA_API BufferedProducer {
|
||||
public:
|
||||
enum class FlushMethod { Sync, ///< Empty the buffer and wait for acks from the broker
|
||||
Async }; ///< Empty the buffer and don't wait for acks
|
||||
enum class FlushMethod {
|
||||
Sync, ///< Empty the buffer and wait for acks from the broker.
|
||||
Async ///< Empty the buffer and don't wait for acks.
|
||||
};
|
||||
enum class QueueFullNotification {
|
||||
None, ///< Don't notify
|
||||
EdgeTriggered, ///< Notify once. Application must call queue_full_trigger_reset() to enable again.
|
||||
EachOccurence ///< Notify on each occurence.
|
||||
};
|
||||
/**
|
||||
* Concrete builder
|
||||
*/
|
||||
@@ -359,6 +366,25 @@ public:
|
||||
*/
|
||||
Builder make_builder(std::string topic);
|
||||
|
||||
/**
|
||||
* Set the type of notification when RD_KAFKA_RESP_ERR__QUEUE_FULL is received.
|
||||
*
|
||||
* This will call the error callback for this producer. By default this is set to QueueFullNotification::None.
|
||||
*/
|
||||
void set_queue_full_notification(QueueFullNotification notification);
|
||||
|
||||
/**
|
||||
* Get the queue full notification type.
|
||||
*/
|
||||
QueueFullNotification get_queue_full_notification() const;
|
||||
|
||||
/**
|
||||
* Reset the queue full notification trigger.
|
||||
*
|
||||
* This function has no effect unless QueueFullNotification == EdgeTriggered.
|
||||
*/
|
||||
void queue_full_trigger_reset();
|
||||
|
||||
/**
|
||||
* \brief Sets the message produce failure callback
|
||||
*
|
||||
@@ -505,6 +531,8 @@ private:
|
||||
std::atomic<size_t> total_messages_dropped_{0};
|
||||
int max_number_retries_{0};
|
||||
bool has_internal_data_{false};
|
||||
QueueFullNotification queue_full_notification_{QueueFullNotification::None};
|
||||
bool queue_full_trigger_{true};
|
||||
#ifdef KAFKA_TEST_INSTANCE
|
||||
TestParameters* test_params_;
|
||||
#endif
|
||||
@@ -798,6 +826,22 @@ BufferedProducer<BufferType, Allocator>::make_builder(std::string topic) {
|
||||
return Builder(std::move(topic));
|
||||
}
|
||||
|
||||
template <typename BufferType, typename Allocator>
|
||||
void BufferedProducer<BufferType, Allocator>::set_queue_full_notification(QueueFullNotification notification) {
|
||||
queue_full_notification_ = notification;
|
||||
}
|
||||
|
||||
template <typename BufferType, typename Allocator>
|
||||
typename BufferedProducer<BufferType, Allocator>::QueueFullNotification
|
||||
BufferedProducer<BufferType, Allocator>::get_queue_full_notification() const {
|
||||
return queue_full_notification_;
|
||||
}
|
||||
|
||||
template <typename BufferType, typename Allocator>
|
||||
void BufferedProducer<BufferType, Allocator>::queue_full_trigger_reset() {
|
||||
queue_full_trigger_ = true;
|
||||
}
|
||||
|
||||
template <typename BufferType, typename Allocator>
|
||||
void BufferedProducer<BufferType, Allocator>::set_produce_failure_callback(ProduceFailureCallback callback) {
|
||||
produce_failure_callback_ = std::move(callback);
|
||||
@@ -827,6 +871,9 @@ template <typename BufferType, typename Allocator>
|
||||
template <typename BuilderType>
|
||||
void BufferedProducer<BufferType, Allocator>::produce_message(BuilderType&& builder) {
|
||||
using builder_type = typename std::decay<BuilderType>::type;
|
||||
bool queue_full_notify = (queue_full_notification_ == QueueFullNotification::None) ? false :
|
||||
(queue_full_notification_ == QueueFullNotification::EdgeTriggered) ?
|
||||
queue_full_trigger_ : true;
|
||||
while (true) {
|
||||
try {
|
||||
MessageInternalGuard<builder_type> internal_guard(const_cast<builder_type&>(builder));
|
||||
@@ -840,6 +887,13 @@ void BufferedProducer<BufferType, Allocator>::produce_message(BuilderType&& buil
|
||||
if (ex.get_error() == RD_KAFKA_RESP_ERR__QUEUE_FULL) {
|
||||
// If the output queue is full, then just poll
|
||||
producer_.poll();
|
||||
// Notify application so it can slow-down production
|
||||
if (queue_full_notify) {
|
||||
queue_full_notify = queue_full_trigger_ = false; //clear trigger and local state
|
||||
CallbackInvoker<Configuration::ErrorCallback>
|
||||
("error", get_producer().get_configuration().get_error_callback(), &get_producer())
|
||||
(get_producer(), static_cast<int>(ex.get_error().get_error()), ex.what());
|
||||
}
|
||||
}
|
||||
else {
|
||||
throw;
|
||||
|
||||
Reference in New Issue
Block a user