mirror of
				https://github.com/Telecominfraproject/wlan-cloud-lib-cppkafka.git
				synced 2025-10-30 18:17:58 +00:00 
			
		
		
		
	Proper implementation of flush() with timeout
This commit is contained in:
		| @@ -206,6 +206,11 @@ public: | ||||
|      */ | ||||
|     void sync_produce(const MessageBuilder& builder); | ||||
|      | ||||
|     /** | ||||
|      * \brief Same as sync_produce but waits up to 'timeout' for acks to be received. | ||||
|      */ | ||||
|     void sync_produce(const MessageBuilder& builder, std::chrono::milliseconds timeout); | ||||
|  | ||||
|     /** | ||||
|      * \brief Produces a message asynchronously without buffering it | ||||
|      * | ||||
| @@ -481,7 +486,7 @@ protected: | ||||
|         return nullptr; | ||||
|     } | ||||
| #endif | ||||
|      | ||||
|  | ||||
| private: | ||||
|     enum class SenderType { Sync, Async }; | ||||
|     enum class QueueKind { Retry, Regular }; | ||||
| @@ -523,6 +528,7 @@ private: | ||||
|             if (sender_ == SenderType::Sync) { | ||||
|                 return retry_promise_.get_future().get(); | ||||
|             } | ||||
|             return false; | ||||
|         } | ||||
|         // Signal the synchronous producer if the message should be retried or not. | ||||
|         // Called from inside on_delivery_report(). For synchronous producers only. | ||||
| @@ -663,6 +669,12 @@ void BufferedProducer<BufferType, Allocator>::produce(const MessageBuilder& buil | ||||
|  | ||||
| template <typename BufferType, typename Allocator> | ||||
| void BufferedProducer<BufferType, Allocator>::sync_produce(const MessageBuilder& builder) { | ||||
|     sync_produce(builder, producer_.get_timeout()); | ||||
| } | ||||
|  | ||||
| template <typename BufferType, typename Allocator> | ||||
| void BufferedProducer<BufferType, Allocator>::sync_produce(const MessageBuilder& builder, | ||||
|                                                            std::chrono::milliseconds timeout) { | ||||
|     if (enable_message_retries_) { | ||||
|         //Adding a retry tracker requires copying the builder since | ||||
|         //we cannot modify the original instance. Cloning is a fast operation | ||||
| @@ -673,14 +685,14 @@ void BufferedProducer<BufferType, Allocator>::sync_produce(const MessageBuilder& | ||||
|         do { | ||||
|             tracker->prepare_to_retry(); | ||||
|             produce_message(builder_clone); | ||||
|             wait_for_acks(); | ||||
|             wait_for_acks(timeout); | ||||
|         } | ||||
|         while (tracker->retry_again()); | ||||
|     } | ||||
|     else { | ||||
|         // produce once | ||||
|         produce_message(builder); | ||||
|         wait_for_acks(); | ||||
|         wait_for_acks(timeout); | ||||
|     } | ||||
| } | ||||
|  | ||||
| @@ -714,15 +726,21 @@ void BufferedProducer<BufferType, Allocator>::async_flush() { | ||||
|  | ||||
| template <typename BufferType, typename Allocator> | ||||
| void BufferedProducer<BufferType, Allocator>::flush(bool preserve_order) { | ||||
|     flush(producer_.get_timeout(), preserve_order); | ||||
| } | ||||
|  | ||||
| template <typename BufferType, typename Allocator> | ||||
| bool BufferedProducer<BufferType, Allocator>::flush(std::chrono::milliseconds timeout, | ||||
|                                                     bool preserve_order) { | ||||
|     if (preserve_order) { | ||||
|         CounterGuard<size_t> counter_guard(flushes_in_progress_); | ||||
|         auto queue_flusher = [this](QueueType& queue, std::mutex & mutex)->void | ||||
|         auto queue_flusher = [&timeout, this](QueueType& queue, std::mutex & mutex)->void | ||||
|         { | ||||
|             QueueType flush_queue; // flush from temporary queue | ||||
|             swap_queues(queue, flush_queue, mutex); | ||||
|             //Produce one message at a time and wait for acks until queue is empty | ||||
|             while (!flush_queue.empty()) { | ||||
|                 sync_produce(flush_queue.front()); | ||||
|                 sync_produce(flush_queue.front(), timeout); | ||||
|                 flush_queue.pop_front(); | ||||
|             } | ||||
|         }; | ||||
| @@ -734,61 +752,9 @@ void BufferedProducer<BufferType, Allocator>::flush(bool preserve_order) { | ||||
|     else { | ||||
|         //Produce all messages at once then wait for acks. | ||||
|         async_flush(); | ||||
|         wait_for_acks(); | ||||
|     } | ||||
| } | ||||
|  | ||||
| template <typename BufferType, typename Allocator> | ||||
| bool BufferedProducer<BufferType, Allocator>::flush(std::chrono::milliseconds timeout, | ||||
|                                                     bool preserve_order) { | ||||
|     if (preserve_order) { | ||||
|         CounterGuard<size_t> counter_guard(flushes_in_progress_); | ||||
|         QueueType flush_queue; // flush from temporary queue | ||||
|         swap_queues(messages_, flush_queue, mutex_); | ||||
|         QueueType retry_flush_queue; // flush from temporary retry queue | ||||
|         swap_queues(retry_messages_, retry_flush_queue, retry_mutex_); | ||||
|  | ||||
|         auto queue_flusher = [this](QueueType& queue)->bool | ||||
|         { | ||||
|             //Produce one message at a time and wait for acks | ||||
|             if (!queue.empty()) { | ||||
|                 sync_produce(queue.front()); | ||||
|                 queue.pop_front(); | ||||
|                 return true; | ||||
|             } | ||||
|             return false; | ||||
|         }; | ||||
|         auto remaining = timeout; | ||||
|         auto start_time = std::chrono::high_resolution_clock::now(); | ||||
|         do { | ||||
|             if (!queue_flusher(retry_flush_queue) && !queue_flusher(flush_queue)) { | ||||
|                 break; | ||||
|             } | ||||
|             // calculate remaining time | ||||
|             remaining = timeout - std::chrono::duration_cast<std::chrono::milliseconds> | ||||
|                 (std::chrono::high_resolution_clock::now() - start_time); | ||||
|         } while (remaining.count() > 0); | ||||
|          | ||||
|         // When timeout has expired, any remaining messages must be re-enqueue in their | ||||
|         // original order so they can be flushed later. | ||||
|         auto re_enqueuer = [this](QueueType& src_queue, QueueType& dst_queue, std::mutex & mutex)->bool | ||||
|         { | ||||
|             if (!src_queue.empty()) { | ||||
|                 std::lock_guard<std::mutex> lock(mutex); | ||||
|                 dst_queue.insert(dst_queue.begin(), | ||||
|                                  std::make_move_iterator(src_queue.begin()), | ||||
|                                  std::make_move_iterator(src_queue.end())); | ||||
|                 return true; | ||||
|             } | ||||
|             return false; | ||||
|         }; | ||||
|         return !re_enqueuer(retry_flush_queue, retry_messages_, retry_mutex_) && | ||||
|                !re_enqueuer(flush_queue, messages_, mutex_); | ||||
|     } | ||||
|     else { | ||||
|         async_flush(); | ||||
|         return wait_for_acks(timeout); | ||||
|         wait_for_acks(timeout); | ||||
|     } | ||||
|     return pending_acks_ == 0; | ||||
| } | ||||
|  | ||||
| template <typename BufferType, typename Allocator> | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Alexander Damian
					Alexander Damian