mirror of
				https://github.com/Telecominfraproject/wlan-cloud-lib-cppkafka.git
				synced 2025-11-04 04:27:48 +00:00 
			
		
		
		
	Merge pull request #153 from demin80/hi-priority-queue-fix
Added a high-priority queue to BufferedProducer to avoid message re-ordering
This commit is contained in:
		@@ -482,8 +482,9 @@ protected:
 | 
				
			|||||||
#endif
 | 
					#endif
 | 
				
			||||||
    
 | 
					    
 | 
				
			||||||
private:
 | 
					private:
 | 
				
			||||||
    enum class MessagePriority { Low, High };
 | 
					 | 
				
			||||||
    enum class SenderType { Sync, Async };
 | 
					    enum class SenderType { Sync, Async };
 | 
				
			||||||
 | 
					    enum class QueueKind { Retry, Regular };
 | 
				
			||||||
 | 
					    enum class FlushAction { DontFlush, DoFlush };
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    template <typename T>
 | 
					    template <typename T>
 | 
				
			||||||
    struct CounterGuard{
 | 
					    struct CounterGuard{
 | 
				
			||||||
@@ -519,18 +520,21 @@ private:
 | 
				
			|||||||
        return nullptr;
 | 
					        return nullptr;
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
    template <typename BuilderType>
 | 
					    template <typename BuilderType>
 | 
				
			||||||
    void do_add_message(BuilderType&& builder, MessagePriority priority, bool do_flush);
 | 
					    void do_add_message(BuilderType&& builder, QueueKind queue_kind, FlushAction flush_action);
 | 
				
			||||||
    template <typename BuilderType>
 | 
					    template <typename BuilderType>
 | 
				
			||||||
    void produce_message(BuilderType&& builder);
 | 
					    void produce_message(BuilderType&& builder);
 | 
				
			||||||
    Configuration prepare_configuration(Configuration config);
 | 
					    Configuration prepare_configuration(Configuration config);
 | 
				
			||||||
    void on_delivery_report(const Message& message);
 | 
					    void on_delivery_report(const Message& message);
 | 
				
			||||||
    template <typename BuilderType>
 | 
					    template <typename BuilderType>
 | 
				
			||||||
    void async_produce(BuilderType&& message, bool throw_on_error);
 | 
					    void async_produce(BuilderType&& message, bool throw_on_error);
 | 
				
			||||||
 | 
					    static void swap_queues(QueueType & queue1, QueueType & queue2, std::mutex & mutex);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    // Members
 | 
					    // Members
 | 
				
			||||||
    Producer producer_;
 | 
					    Producer producer_;
 | 
				
			||||||
    QueueType messages_;
 | 
					    QueueType messages_;
 | 
				
			||||||
 | 
					    QueueType retry_messages_;
 | 
				
			||||||
    mutable std::mutex mutex_;
 | 
					    mutable std::mutex mutex_;
 | 
				
			||||||
 | 
					    mutable std::mutex retry_mutex_;
 | 
				
			||||||
    ProduceSuccessCallback produce_success_callback_;
 | 
					    ProduceSuccessCallback produce_success_callback_;
 | 
				
			||||||
    ProduceFailureCallback produce_failure_callback_;
 | 
					    ProduceFailureCallback produce_failure_callback_;
 | 
				
			||||||
    ProduceTerminationCallback produce_termination_callback_;
 | 
					    ProduceTerminationCallback produce_termination_callback_;
 | 
				
			||||||
@@ -565,7 +569,8 @@ template <typename BufferType, typename Allocator>
 | 
				
			|||||||
BufferedProducer<BufferType, Allocator>::BufferedProducer(Configuration config,
 | 
					BufferedProducer<BufferType, Allocator>::BufferedProducer(Configuration config,
 | 
				
			||||||
                                                          const Allocator& alloc)
 | 
					                                                          const Allocator& alloc)
 | 
				
			||||||
: producer_(prepare_configuration(std::move(config))),
 | 
					: producer_(prepare_configuration(std::move(config))),
 | 
				
			||||||
  messages_(alloc) {
 | 
					  messages_(alloc),
 | 
				
			||||||
 | 
					  retry_messages_(alloc) {
 | 
				
			||||||
    producer_.set_payload_policy(get_default_payload_policy<BufferType>());
 | 
					    producer_.set_payload_policy(get_default_payload_policy<BufferType>());
 | 
				
			||||||
#ifdef KAFKA_TEST_INSTANCE
 | 
					#ifdef KAFKA_TEST_INSTANCE
 | 
				
			||||||
    test_params_ = nullptr;
 | 
					    test_params_ = nullptr;
 | 
				
			||||||
@@ -580,7 +585,7 @@ void BufferedProducer<BufferType, Allocator>::add_message(const MessageBuilder&
 | 
				
			|||||||
template <typename BufferType, typename Allocator>
 | 
					template <typename BufferType, typename Allocator>
 | 
				
			||||||
void BufferedProducer<BufferType, Allocator>::add_message(Builder builder) {
 | 
					void BufferedProducer<BufferType, Allocator>::add_message(Builder builder) {
 | 
				
			||||||
    add_tracker(SenderType::Async, builder);
 | 
					    add_tracker(SenderType::Async, builder);
 | 
				
			||||||
    do_add_message(move(builder), MessagePriority::Low, true);
 | 
					    do_add_message(move(builder), QueueKind::Regular, FlushAction::DoFlush);
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
template <typename BufferType, typename Allocator>
 | 
					template <typename BufferType, typename Allocator>
 | 
				
			||||||
@@ -624,30 +629,36 @@ void BufferedProducer<BufferType, Allocator>::produce(const Message& message) {
 | 
				
			|||||||
template <typename BufferType, typename Allocator>
 | 
					template <typename BufferType, typename Allocator>
 | 
				
			||||||
void BufferedProducer<BufferType, Allocator>::async_flush() {
 | 
					void BufferedProducer<BufferType, Allocator>::async_flush() {
 | 
				
			||||||
    CounterGuard<size_t> counter_guard(flushes_in_progress_);
 | 
					    CounterGuard<size_t> counter_guard(flushes_in_progress_);
 | 
				
			||||||
    QueueType flush_queue; // flush from temporary queue
 | 
					    auto queue_flusher = [this](QueueType& queue, std::mutex & mutex)->void
 | 
				
			||||||
    {
 | 
					    {
 | 
				
			||||||
        std::lock_guard<std::mutex> lock(mutex_);
 | 
					        QueueType flush_queue; // flush from temporary queue
 | 
				
			||||||
        std::swap(messages_, flush_queue);
 | 
					        swap_queues(queue, flush_queue, mutex);
 | 
				
			||||||
    }
 | 
					        
 | 
				
			||||||
        while (!flush_queue.empty()) {
 | 
					        while (!flush_queue.empty()) {
 | 
				
			||||||
            async_produce(std::move(flush_queue.front()), false);
 | 
					            async_produce(std::move(flush_queue.front()), false);
 | 
				
			||||||
            flush_queue.pop_front();
 | 
					            flush_queue.pop_front();
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
 | 
					    };
 | 
				
			||||||
 | 
					    queue_flusher(retry_messages_, retry_mutex_);
 | 
				
			||||||
 | 
					    queue_flusher(messages_, mutex_);
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
template <typename BufferType, typename Allocator>
 | 
					template <typename BufferType, typename Allocator>
 | 
				
			||||||
void BufferedProducer<BufferType, Allocator>::flush(bool preserve_order) {
 | 
					void BufferedProducer<BufferType, Allocator>::flush(bool preserve_order) {
 | 
				
			||||||
    if (preserve_order) {
 | 
					    if (preserve_order) {
 | 
				
			||||||
        CounterGuard<size_t> counter_guard(flushes_in_progress_);
 | 
					        CounterGuard<size_t> counter_guard(flushes_in_progress_);
 | 
				
			||||||
        QueueType flush_queue; // flush from temporary queue
 | 
					        auto queue_flusher = [this](QueueType& queue, std::mutex & mutex)->void
 | 
				
			||||||
        {
 | 
					        {
 | 
				
			||||||
            std::lock_guard<std::mutex> lock(mutex_);
 | 
					            QueueType flush_queue; // flush from temporary queue
 | 
				
			||||||
            std::swap(messages_, flush_queue);
 | 
					            swap_queues(queue, flush_queue, mutex);
 | 
				
			||||||
        }
 | 
					
 | 
				
			||||||
            while (!flush_queue.empty()) {
 | 
					            while (!flush_queue.empty()) {
 | 
				
			||||||
                sync_produce(flush_queue.front());
 | 
					                sync_produce(flush_queue.front());
 | 
				
			||||||
                flush_queue.pop_front();
 | 
					                flush_queue.pop_front();
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
 | 
					        };
 | 
				
			||||||
 | 
					        queue_flusher(retry_messages_, retry_mutex_);
 | 
				
			||||||
 | 
					        queue_flusher(messages_, mutex_);
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
    else {
 | 
					    else {
 | 
				
			||||||
        async_flush();
 | 
					        async_flush();
 | 
				
			||||||
@@ -661,25 +672,42 @@ bool BufferedProducer<BufferType, Allocator>::flush(std::chrono::milliseconds ti
 | 
				
			|||||||
    if (preserve_order) {
 | 
					    if (preserve_order) {
 | 
				
			||||||
        CounterGuard<size_t> counter_guard(flushes_in_progress_);
 | 
					        CounterGuard<size_t> counter_guard(flushes_in_progress_);
 | 
				
			||||||
        QueueType flush_queue; // flush from temporary queue
 | 
					        QueueType flush_queue; // flush from temporary queue
 | 
				
			||||||
 | 
					        swap_queues(messages_, flush_queue, mutex_);
 | 
				
			||||||
 | 
					        QueueType retry_flush_queue; // flush from temporary retry queue
 | 
				
			||||||
 | 
					        swap_queues(retry_messages_, retry_flush_queue, retry_mutex_);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        auto queue_flusher = [this](QueueType& queue)->bool
 | 
				
			||||||
        {
 | 
					        {
 | 
				
			||||||
            std::lock_guard<std::mutex> lock(mutex_);
 | 
					            if (!queue.empty()) {
 | 
				
			||||||
            std::swap(messages_, flush_queue);
 | 
					                sync_produce(queue.front());
 | 
				
			||||||
 | 
					                queue.pop_front();
 | 
				
			||||||
 | 
					                return true;
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
 | 
					            return false;
 | 
				
			||||||
 | 
					        };
 | 
				
			||||||
        auto remaining = timeout;
 | 
					        auto remaining = timeout;
 | 
				
			||||||
        auto start_time = std::chrono::high_resolution_clock::now();
 | 
					        auto start_time = std::chrono::high_resolution_clock::now();
 | 
				
			||||||
        do {
 | 
					        do {
 | 
				
			||||||
            sync_produce(flush_queue.front());
 | 
					            if (!queue_flusher(retry_flush_queue) && !queue_flusher(flush_queue)) {
 | 
				
			||||||
            flush_queue.pop_front();
 | 
					                break;
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
            // calculate remaining time
 | 
					            // calculate remaining time
 | 
				
			||||||
            remaining = timeout - std::chrono::duration_cast<std::chrono::milliseconds>
 | 
					            remaining = timeout - std::chrono::duration_cast<std::chrono::milliseconds>
 | 
				
			||||||
                (std::chrono::high_resolution_clock::now() - start_time);
 | 
					                (std::chrono::high_resolution_clock::now() - start_time);
 | 
				
			||||||
        } while (!flush_queue.empty() && (remaining.count() > 0));
 | 
					        } while (remaining.count() > 0);
 | 
				
			||||||
        
 | 
					        
 | 
				
			||||||
        // Re-enqueue remaining messages in original order
 | 
					        // Re-enqueue remaining messages in original order
 | 
				
			||||||
        if (!flush_queue.empty()) {
 | 
					        auto re_enqueuer = [this](QueueType& src_queue, QueueType& dst_queue, std::mutex & mutex)->void
 | 
				
			||||||
            std::lock_guard<std::mutex> lock(mutex_);
 | 
					        {
 | 
				
			||||||
            messages_.insert(messages_.begin(), std::make_move_iterator(flush_queue.begin()), std::make_move_iterator(flush_queue.end()));
 | 
					            if (!src_queue.empty()) {
 | 
				
			||||||
 | 
					                std::lock_guard<std::mutex> lock(mutex);
 | 
				
			||||||
 | 
					                dst_queue.insert(dst_queue.begin(),
 | 
				
			||||||
 | 
					                                 std::make_move_iterator(src_queue.begin()),
 | 
				
			||||||
 | 
					                                 std::make_move_iterator(src_queue.end()));
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
 | 
					        };
 | 
				
			||||||
 | 
					        re_enqueuer(retry_flush_queue, retry_messages_, retry_mutex_);
 | 
				
			||||||
 | 
					        re_enqueuer(flush_queue, messages_, mutex_);
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
    else {
 | 
					    else {
 | 
				
			||||||
        async_flush();
 | 
					        async_flush();
 | 
				
			||||||
@@ -732,14 +760,15 @@ bool BufferedProducer<BufferType, Allocator>::wait_for_acks(std::chrono::millise
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
template <typename BufferType, typename Allocator>
 | 
					template <typename BufferType, typename Allocator>
 | 
				
			||||||
void BufferedProducer<BufferType, Allocator>::clear() {
 | 
					void BufferedProducer<BufferType, Allocator>::clear() {
 | 
				
			||||||
    std::lock_guard<std::mutex> lock(mutex_);
 | 
					 | 
				
			||||||
    QueueType tmp;
 | 
					    QueueType tmp;
 | 
				
			||||||
    std::swap(tmp, messages_);
 | 
					    swap_queues(messages_, tmp, mutex_);
 | 
				
			||||||
 | 
					    QueueType retry_tmp;
 | 
				
			||||||
 | 
					    swap_queues(retry_messages_, retry_tmp, retry_mutex_);
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
template <typename BufferType, typename Allocator>
 | 
					template <typename BufferType, typename Allocator>
 | 
				
			||||||
size_t BufferedProducer<BufferType, Allocator>::get_buffer_size() const {
 | 
					size_t BufferedProducer<BufferType, Allocator>::get_buffer_size() const {
 | 
				
			||||||
    return messages_.size();
 | 
					    return messages_.size() + retry_messages_.size();
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
template <typename BufferType, typename Allocator>
 | 
					template <typename BufferType, typename Allocator>
 | 
				
			||||||
@@ -769,18 +798,20 @@ BufferedProducer<BufferType, Allocator>::get_flush_method() const {
 | 
				
			|||||||
template <typename BufferType, typename Allocator>
 | 
					template <typename BufferType, typename Allocator>
 | 
				
			||||||
template <typename BuilderType>
 | 
					template <typename BuilderType>
 | 
				
			||||||
void BufferedProducer<BufferType, Allocator>::do_add_message(BuilderType&& builder,
 | 
					void BufferedProducer<BufferType, Allocator>::do_add_message(BuilderType&& builder,
 | 
				
			||||||
                                                             MessagePriority priority,
 | 
					                                                             QueueKind queue_kind,
 | 
				
			||||||
                                                             bool do_flush) {
 | 
					                                                             FlushAction flush_action) {
 | 
				
			||||||
    {
 | 
					    if (queue_kind == QueueKind::Retry) {
 | 
				
			||||||
        std::lock_guard<std::mutex> lock(mutex_);
 | 
					        std::lock_guard<std::mutex> lock(retry_mutex_);
 | 
				
			||||||
        if (priority == MessagePriority::High) {
 | 
					        retry_messages_.emplace_back(std::forward<BuilderType>(builder));
 | 
				
			||||||
            messages_.emplace_front(std::forward<BuilderType>(builder));
 | 
					 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
    else {
 | 
					    else {
 | 
				
			||||||
 | 
					        std::lock_guard<std::mutex> lock(mutex_);
 | 
				
			||||||
        messages_.emplace_back(std::forward<BuilderType>(builder));
 | 
					        messages_.emplace_back(std::forward<BuilderType>(builder));
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
    }
 | 
					
 | 
				
			||||||
    if (do_flush && (max_buffer_size_ >= 0) && (max_buffer_size_ <= (ssize_t)messages_.size())) {
 | 
					    // Flush the queues only if a regular message is added. Retry messages may be added
 | 
				
			||||||
 | 
					    // from rdkafka callbacks, and flush/async_flush is a user-level call
 | 
				
			||||||
 | 
					    if (queue_kind == QueueKind::Regular && flush_action == FlushAction::DoFlush && (max_buffer_size_ >= 0) && (max_buffer_size_ <= get_buffer_size())) {
 | 
				
			||||||
        if (flush_method_ == FlushMethod::Sync) {
 | 
					        if (flush_method_ == FlushMethod::Sync) {
 | 
				
			||||||
            flush();
 | 
					            flush();
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
@@ -928,7 +959,7 @@ void BufferedProducer<BufferType, Allocator>::async_produce(BuilderType&& builde
 | 
				
			|||||||
            TrackerPtr tracker = std::static_pointer_cast<Tracker>(builder.internal());
 | 
					            TrackerPtr tracker = std::static_pointer_cast<Tracker>(builder.internal());
 | 
				
			||||||
            if (tracker && tracker->num_retries_ > 0) {
 | 
					            if (tracker && tracker->num_retries_ > 0) {
 | 
				
			||||||
                --tracker->num_retries_;
 | 
					                --tracker->num_retries_;
 | 
				
			||||||
                do_add_message(std::forward<BuilderType>(builder), MessagePriority::High, false);
 | 
					                do_add_message(std::forward<BuilderType>(builder), QueueKind::Retry, FlushAction::DontFlush);
 | 
				
			||||||
                return;
 | 
					                return;
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
@@ -967,7 +998,7 @@ void BufferedProducer<BufferType, Allocator>::on_delivery_report(const Message&
 | 
				
			|||||||
                --tracker->num_retries_;
 | 
					                --tracker->num_retries_;
 | 
				
			||||||
                if (tracker->sender_ == SenderType::Async) {
 | 
					                if (tracker->sender_ == SenderType::Async) {
 | 
				
			||||||
                    // Re-enqueue for later retransmission with higher priority (i.e. front of the queue)
 | 
					                    // Re-enqueue for later retransmission with higher priority (i.e. front of the queue)
 | 
				
			||||||
                    do_add_message(Builder(message), MessagePriority::High, false);
 | 
					                    do_add_message(Builder(message), QueueKind::Retry, FlushAction::DontFlush);
 | 
				
			||||||
                }
 | 
					                }
 | 
				
			||||||
                should_retry = true;
 | 
					                should_retry = true;
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
@@ -999,6 +1030,13 @@ void BufferedProducer<BufferType, Allocator>::on_delivery_report(const Message&
 | 
				
			|||||||
    }
 | 
					    }
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					template <typename BufferType, typename Allocator>
 | 
				
			||||||
 | 
					void BufferedProducer<BufferType, Allocator>::swap_queues(BufferedProducer<BufferType, Allocator>::QueueType & queue1, BufferedProducer<BufferType, Allocator>::QueueType & queue2, std::mutex & mutex)
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
					    std::lock_guard<std::mutex> lock(mutex);
 | 
				
			||||||
 | 
					    std::swap(queue1, queue2);
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
} // cppkafka
 | 
					} // cppkafka
 | 
				
			||||||
 | 
					
 | 
				
			||||||
#endif // CPPKAFKA_BUFFERED_PRODUCER_H
 | 
					#endif // CPPKAFKA_BUFFERED_PRODUCER_H
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user