mirror of
				https://github.com/Telecominfraproject/wlan-cloud-lib-cppkafka.git
				synced 2025-10-31 02:27:46 +00:00 
			
		
		
		
	Added implementation for thread-aware ack monitoring
This commit is contained in:
		| @@ -34,12 +34,11 @@ | ||||
| #include <deque> | ||||
| #include <cstdint> | ||||
| #include <algorithm> | ||||
| #include <unordered_set> | ||||
| #include <unordered_map> | ||||
| #include <map> | ||||
| #include <mutex> | ||||
| #include <atomic> | ||||
| #include <future> | ||||
| #include <thread> | ||||
| #include <boost/optional.hpp> | ||||
| #include "../producer.h" | ||||
| #include "../detail/callback_invoker.h" | ||||
| @@ -53,8 +52,9 @@ namespace cppkafka { | ||||
|  * This class allows buffering messages and flushing them synchronously while also allowing | ||||
|  * to produce them just as you would using the Producer class. | ||||
|  * | ||||
|  * When calling either flush or wait_for_acks, the buffered producer will block until all | ||||
|  * produced messages (either buffered or sent directly) are acknowledged by the kafka brokers. | ||||
|  * When calling either flush or wait_for_acks/wait_for_current_thread_acks, the buffered producer | ||||
|  * will block until all produced messages (either buffered or sent directly) are acknowledged | ||||
|  * by the kafka brokers. | ||||
|  * | ||||
|  * When producing messages, this class will handle cases where the producer's queue is full so it | ||||
|  * will poll until the production is successful. | ||||
| @@ -185,8 +185,9 @@ public: | ||||
|     /** | ||||
|      * \brief Produces a message asynchronously without buffering it | ||||
|      * | ||||
|      * The message will still be tracked so that a call to flush or wait_for_acks will actually | ||||
|      * wait for it to be acknowledged. | ||||
|      * The message will still be tracked so that a call to flush or | ||||
|      * wait_for_acks/wait_for_current_thread_acks will actually wait for it | ||||
|      * to be acknowledged. | ||||
|      * | ||||
|      * \param builder The builder that contains the message to be produced | ||||
|      * | ||||
| @@ -220,8 +221,9 @@ public: | ||||
|     /** | ||||
|      * \brief Produces a message asynchronously without buffering it | ||||
|      * | ||||
|      * The message will still be tracked so that a call to flush or wait_for_acks will actually | ||||
|      * wait for it to be acknowledged. | ||||
|      * The message will still be tracked so that a call to flush or | ||||
|      * wait_for_acks/wait_for_current_thread_acks will actually wait for it | ||||
|      * to be acknowledged. | ||||
|      * | ||||
|      * \param message The message to be produced | ||||
|      * | ||||
| @@ -241,7 +243,7 @@ public: | ||||
|      * \brief Flushes the buffered messages. | ||||
|      * | ||||
|      * This will send all messages and keep waiting until all of them are acknowledged (this is | ||||
|      * done by calling wait_for_acks). | ||||
|      * done by calling wait_for_acks/wait_for_current_thread_acks). | ||||
|      * | ||||
|      * \param preserve_order If set to True, each message in the queue will be flushed only when the previous | ||||
|      *                       message ack is received. This may result in performance degradation as messages | ||||
| @@ -267,16 +269,30 @@ public: | ||||
|     bool flush(std::chrono::milliseconds timeout, bool preserve_order = false); | ||||
|  | ||||
|     /** | ||||
|      * Waits for produced message's acknowledgements from the brokers | ||||
|      * \brief Waits for produced message's acknowledgements from the brokers | ||||
|      */ | ||||
|     void wait_for_acks(); | ||||
|      | ||||
|     /** | ||||
|      * Waits for produced message's acknowledgements from the brokers up to 'timeout'. | ||||
|      * \brief Waits for acknowledgements from brokers for messages produced | ||||
|      *        on the current thread only | ||||
|      */ | ||||
|     void wait_for_current_thread_acks(); | ||||
|      | ||||
|     /** | ||||
|      * \brief Waits for produced message's acknowledgements from the brokers up to 'timeout'. | ||||
|      * | ||||
|      * \return True if the operation completes and all acks have been received. | ||||
|      */ | ||||
|     bool wait_for_acks(std::chrono::milliseconds timeout); | ||||
|      | ||||
|     /** | ||||
|      * \brief Waits for acknowledgements from brokers for messages produced | ||||
|      *        on the current thread only. Times out after 'timeout' milliseconds. | ||||
|      * | ||||
|      * \return True if the operation completes and all acks have been received. | ||||
|      */ | ||||
|     bool wait_for_current_thread_acks(std::chrono::milliseconds timeout); | ||||
|  | ||||
|     /** | ||||
|      * Clears any buffered messages | ||||
| @@ -327,12 +343,20 @@ public: | ||||
|     FlushMethod get_flush_method() const; | ||||
|      | ||||
|     /** | ||||
|      * \brief Get the number of messages not yet acked by the broker | ||||
|      * \brief Get the number of messages not yet acked by the broker. | ||||
|      * | ||||
|      * \return The number of messages | ||||
|      */ | ||||
|     size_t get_pending_acks() const; | ||||
|      | ||||
|     /** | ||||
|      * \brief Get the number of pending acks for messages produces on the | ||||
|      *        current thread only. | ||||
|      * | ||||
|      * \return The number of messages | ||||
|      */ | ||||
|     size_t get_current_thread_pending_acks() const; | ||||
|      | ||||
|     /** | ||||
|      * \brief Get the total number of messages successfully produced since the beginning | ||||
|      * | ||||
| @@ -358,9 +382,10 @@ public: | ||||
|     size_t get_flushes_in_progress() const; | ||||
|      | ||||
|     /** | ||||
|      * \brief Sets the maximum number of retries per message until giving up | ||||
|      * \brief Sets the maximum number of retries per message until giving up. Default is 5. | ||||
|      * | ||||
|      * Default is 5 | ||||
|      * \remark Is it recommended to set the RdKafka option message.send.max.retries=0 | ||||
|      *         to prevent re-ordering of messages inside RdKafka. | ||||
|      */ | ||||
|     void set_max_number_retries(size_t max_number_retries); | ||||
|      | ||||
| @@ -495,8 +520,9 @@ protected: | ||||
|  | ||||
| private: | ||||
|     enum class SenderType { Sync, Async }; | ||||
|     enum class QueueKind { Retry, Regular }; | ||||
|     enum class QueueKind { Retry, Produce }; | ||||
|     enum class FlushAction { DontFlush, DoFlush }; | ||||
|     enum class Threads { All, Current }; | ||||
|  | ||||
|     // Simple RAII type which increments a counter on construction and | ||||
|     // decrements it on destruction, meant to be used as reference counting. | ||||
| @@ -569,6 +595,79 @@ private: | ||||
|     }; | ||||
|     using TrackerPtr = std::shared_ptr<Tracker>; | ||||
|      | ||||
|     // The AckMonitor is responsible for properly counting the | ||||
|     // outstanding unacknowledged messages for each thread as well | ||||
|     // as the total acks. Counting acks on a per-thread basis is | ||||
|     // critical in a multi-threaded producer since we don't want one | ||||
|     // producer having to wait for all concurrent pending acks. Each | ||||
|     // producer should only wait for his own acks. | ||||
|     struct AckMonitor | ||||
|     { | ||||
|         // Increments the number of sent acks | ||||
|         void increment_pending_acks() { | ||||
|             while (!flag_.test_and_set()) { | ||||
|                 //save the last ack number for this thread so we only | ||||
|                 //wait up to this number. | ||||
|                 last_ack_[std::this_thread::get_id()] = ++sent_acks_; | ||||
|                 flag_.clear(); | ||||
|                 break; | ||||
|             } | ||||
|         } | ||||
|         // Increments the number of received acks, | ||||
|         // reducing the total pending acks. | ||||
|         void decrement_pending_acks() { | ||||
|             while (!flag_.test_and_set()) { | ||||
|                 ++received_acks_; | ||||
|                 flag_.clear(); | ||||
|                 break; | ||||
|             } | ||||
|         } | ||||
|         // Returns true if there are any pending acks overall. | ||||
|         bool has_pending_acks() const { | ||||
|             return get_pending_acks() > 0; | ||||
|         } | ||||
|         // Returns true if there are any pending acks on this thread. | ||||
|         bool has_current_thread_pending_acks() const { | ||||
|             return get_current_thread_pending_acks() > 0; | ||||
|         } | ||||
|         // Returns total pending acks. This is the difference between | ||||
|         // total produced and total received. | ||||
|         ssize_t get_pending_acks() const { | ||||
|             ssize_t rc = 0; | ||||
|             while (!flag_.test_and_set()) { | ||||
|                 rc = get_pending_acks_impl(); | ||||
|                 flag_.clear(); | ||||
|                 break; | ||||
|             } | ||||
|             return rc; | ||||
|         } | ||||
|         // Returns the total pending acks for this thread | ||||
|         ssize_t get_current_thread_pending_acks() const { | ||||
|             ssize_t rc = 0; | ||||
|             while (!flag_.test_and_set()) { | ||||
|                 rc = get_current_thread_pending_acks_impl(); | ||||
|                 flag_.clear(); | ||||
|                 break; | ||||
|             } | ||||
|             return rc; | ||||
|         } | ||||
|     private: | ||||
|         ssize_t get_pending_acks_impl() const { | ||||
|             return (sent_acks_ - received_acks_); | ||||
|         } | ||||
|         ssize_t get_current_thread_pending_acks_impl() const { | ||||
|             auto it  = last_ack_.find(std::this_thread::get_id()); | ||||
|             if (it != last_ack_.end()) { | ||||
|                 return (it->second > received_acks_) ? it->second - received_acks_ : 0; | ||||
|             } | ||||
|             return 0; | ||||
|         } | ||||
|         mutable std::atomic_flag flag_{0}; | ||||
|         ssize_t sent_acks_{0}; | ||||
|         ssize_t received_acks_{0}; | ||||
|         std::map<std::thread::id, ssize_t> last_ack_; //last ack number expected for this thread | ||||
|     }; | ||||
|      | ||||
|     // Returns existing tracker or creates new one | ||||
|     template <typename BuilderType> | ||||
|     TrackerPtr add_tracker(SenderType sender, BuilderType& builder) { | ||||
| @@ -597,6 +696,7 @@ private: | ||||
|     template <typename BuilderType> | ||||
|     void async_produce(BuilderType&& message, bool throw_on_error); | ||||
|     static void swap_queues(QueueType & queue1, QueueType & queue2, std::mutex & mutex); | ||||
|     bool wait_for_acks_impl(Threads threads, std::chrono::milliseconds timeout); | ||||
|  | ||||
|     // Static members | ||||
|     static const std::chrono::milliseconds infinite_timeout; | ||||
| @@ -616,7 +716,7 @@ private: | ||||
|     QueueFullCallback queue_full_callback_; | ||||
|     ssize_t max_buffer_size_{-1}; | ||||
|     FlushMethod flush_method_{FlushMethod::Sync}; | ||||
|     std::atomic<size_t> pending_acks_{0}; | ||||
|     AckMonitor ack_monitor_; | ||||
|     std::atomic<size_t> flushes_in_progress_{0}; | ||||
|     std::atomic<size_t> total_messages_produced_{0}; | ||||
|     std::atomic<size_t> total_messages_dropped_{0}; | ||||
| @@ -667,7 +767,7 @@ template <typename BufferType, typename Allocator> | ||||
| void BufferedProducer<BufferType, Allocator>::add_message(Builder builder) { | ||||
|     add_tracker(SenderType::Async, builder); | ||||
|     //post message unto the producer queue | ||||
|     do_add_message(move(builder), QueueKind::Regular, FlushAction::DoFlush); | ||||
|     do_add_message(move(builder), QueueKind::Produce, FlushAction::DoFlush); | ||||
| } | ||||
|  | ||||
| template <typename BufferType, typename Allocator> | ||||
| @@ -707,7 +807,7 @@ bool BufferedProducer<BufferType, Allocator>::sync_produce(const MessageBuilder& | ||||
|             //Wait w/o timeout since we must get the ack to avoid a race condition. | ||||
|             //Otherwise retry_again() will block as the producer won't get flushed | ||||
|             //and the delivery callback will never be invoked. | ||||
|             wait_for_acks(); | ||||
|             wait_for_current_thread_acks(); | ||||
|         } | ||||
|         while (tracker->retry_again() && | ||||
|               ((timeout == infinite_timeout) || | ||||
| @@ -717,8 +817,8 @@ bool BufferedProducer<BufferType, Allocator>::sync_produce(const MessageBuilder& | ||||
|     else { | ||||
|         // produce once | ||||
|         produce_message(builder); | ||||
|         wait_for_acks(timeout); | ||||
|         return (pending_acks_ == 0); | ||||
|         wait_for_current_thread_acks(timeout); | ||||
|         return !ack_monitor_.has_current_thread_pending_acks(); | ||||
|     } | ||||
| } | ||||
|  | ||||
| @@ -768,21 +868,41 @@ bool BufferedProducer<BufferType, Allocator>::flush(std::chrono::milliseconds ti | ||||
|     queue_flusher(messages_, mutex_); | ||||
|     if (!preserve_order) { | ||||
|         //Wait for acks from the messages produced above via async_produce | ||||
|         wait_for_acks(timeout); | ||||
|         wait_for_current_thread_acks(timeout); | ||||
|     } | ||||
|     return pending_acks_ == 0; | ||||
|     return !ack_monitor_.has_current_thread_pending_acks(); | ||||
| } | ||||
|  | ||||
| template <typename BufferType, typename Allocator> | ||||
| void BufferedProducer<BufferType, Allocator>::wait_for_acks() { | ||||
|     //block until all acks have been received | ||||
|     wait_for_acks(infinite_timeout); | ||||
|     wait_for_acks_impl(Threads::All, infinite_timeout); | ||||
| } | ||||
|  | ||||
| template <typename BufferType, typename Allocator> | ||||
| void BufferedProducer<BufferType, Allocator>::wait_for_current_thread_acks() { | ||||
|     //block until all acks from the current thread have been received | ||||
|     wait_for_acks_impl(Threads::Current, infinite_timeout); | ||||
| } | ||||
|  | ||||
| template <typename BufferType, typename Allocator> | ||||
| bool BufferedProducer<BufferType, Allocator>::wait_for_acks(std::chrono::milliseconds timeout) { | ||||
|     //block until all acks have been received | ||||
|     return wait_for_acks_impl(Threads::All, timeout); | ||||
| } | ||||
|  | ||||
| template <typename BufferType, typename Allocator> | ||||
| bool BufferedProducer<BufferType, Allocator>::wait_for_current_thread_acks(std::chrono::milliseconds timeout) { | ||||
|     //block until all acks from the current thread have been received | ||||
|     return wait_for_acks_impl(Threads::Current, timeout); | ||||
| } | ||||
|  | ||||
| template <typename BufferType, typename Allocator> | ||||
| bool BufferedProducer<BufferType, Allocator>::wait_for_acks_impl(Threads threads, | ||||
|                                                                  std::chrono::milliseconds timeout) { | ||||
|     auto remaining = timeout; | ||||
|     auto start_time = std::chrono::high_resolution_clock::now(); | ||||
|     bool pending_acks = true; | ||||
|     do { | ||||
|         try { | ||||
|             producer_.flush(remaining); | ||||
| @@ -791,7 +911,10 @@ bool BufferedProducer<BufferType, Allocator>::wait_for_acks(std::chrono::millise | ||||
|             // If we just hit the timeout, keep going, otherwise re-throw | ||||
|             if (ex.get_error() == RD_KAFKA_RESP_ERR__TIMED_OUT) { | ||||
|                 // There is no time remaining | ||||
|                 return (pending_acks_ == 0); | ||||
|                 pending_acks = (threads == Threads::All) ? | ||||
|                                 ack_monitor_.has_pending_acks() : | ||||
|                                 ack_monitor_.has_current_thread_pending_acks(); | ||||
|                 return !pending_acks; | ||||
|             } | ||||
|             else { | ||||
|                 throw; | ||||
| @@ -800,9 +923,11 @@ bool BufferedProducer<BufferType, Allocator>::wait_for_acks(std::chrono::millise | ||||
|         // calculate remaining time | ||||
|         remaining = timeout - std::chrono::duration_cast<std::chrono::milliseconds> | ||||
|             (std::chrono::high_resolution_clock::now() - start_time); | ||||
|     } while ((pending_acks_ > 0) && | ||||
|             ((remaining.count() > 0) || (timeout == infinite_timeout))); | ||||
|     return (pending_acks_ == 0); | ||||
|         pending_acks = (threads == Threads::All) ? | ||||
|                         ack_monitor_.has_pending_acks() : | ||||
|                         ack_monitor_.has_current_thread_pending_acks(); | ||||
|     } while (pending_acks && ((remaining.count() > 0) || (timeout == infinite_timeout))); | ||||
|     return !pending_acks; | ||||
| } | ||||
|  | ||||
| template <typename BufferType, typename Allocator> | ||||
| @@ -864,9 +989,9 @@ void BufferedProducer<BufferType, Allocator>::do_add_message(BuilderType&& build | ||||
|         std::lock_guard<std::mutex> lock(mutex_); | ||||
|         messages_.emplace_back(std::forward<BuilderType>(builder)); | ||||
|     } | ||||
|     // Flush the queues only if a regular message is added. Retry messages may be added | ||||
|     // Flush the queues only if a produced message is added. Retry messages may be added | ||||
|     // from on_delivery_report() during which flush()/async_flush() cannot be called. | ||||
|     if (queue_kind == QueueKind::Regular && | ||||
|     if (queue_kind == QueueKind::Produce && | ||||
|         flush_action == FlushAction::DoFlush && | ||||
|         (max_buffer_size_ >= 0) && | ||||
|         (max_buffer_size_ <= (ssize_t)get_buffer_size())) { | ||||
| @@ -891,7 +1016,12 @@ const Producer& BufferedProducer<BufferType, Allocator>::get_producer() const { | ||||
|  | ||||
| template <typename BufferType, typename Allocator> | ||||
| size_t BufferedProducer<BufferType, Allocator>::get_pending_acks() const { | ||||
|     return pending_acks_; | ||||
|     return ack_monitor_.get_pending_acks(); | ||||
| } | ||||
|  | ||||
| template <typename BufferType, typename Allocator> | ||||
| size_t BufferedProducer<BufferType, Allocator>::get_current_thread_pending_acks() const { | ||||
|     return ack_monitor_.get_current_thread_pending_acks(); | ||||
| } | ||||
|  | ||||
| template <typename BufferType, typename Allocator> | ||||
| @@ -980,7 +1110,7 @@ void BufferedProducer<BufferType, Allocator>::produce_message(BuilderType&& buil | ||||
|             producer_.produce(builder); | ||||
|             internal_guard.release(); | ||||
|             // Sent successfully | ||||
|             ++pending_acks_; | ||||
|             ack_monitor_.increment_pending_acks(); | ||||
|             break; | ||||
|         } | ||||
|         catch (const HandleException& ex) { | ||||
| @@ -1092,9 +1222,7 @@ void BufferedProducer<BufferType, Allocator>::on_delivery_report(const Message& | ||||
|         tracker->should_retry(retry); | ||||
|     } | ||||
|     // Decrement the expected acks and check to prevent underflow | ||||
|     if (pending_acks_ > 0) { | ||||
|         --pending_acks_; | ||||
|     } | ||||
|     ack_monitor_.decrement_pending_acks(); | ||||
| } | ||||
|  | ||||
| template <typename BufferType, typename Allocator> | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Alexander Damian
					Alexander Damian