Mirror of https://github.com/Telecominfraproject/wlan-cloud-lib-cppkafka.git (synced 2025-10-31 02:27:46 +00:00)
	Merge pull request #237 from accelerated/buff_prod_comments
Added clarifications and comments to the BufferedProducer class
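For context, a minimal usage sketch of the APIs touched by this change (broker address, topic name and payload are placeholders; error handling omitted):

#include <cppkafka/cppkafka.h>
#include <chrono>
#include <string>

using namespace cppkafka;

int main() {
    // Illustrative broker address only.
    Configuration config = {{"metadata.broker.list", "localhost:9092"}};
    BufferedProducer<std::string> producer(config);

    std::string payload = "hello";
    MessageBuilder builder("my_topic");
    builder.payload(payload);

    // Produce without buffering; the message is still tracked for acks.
    producer.produce(builder);

    // Block until the acks for messages produced on *this* thread arrive,
    // or give up after one second (overload added by this change).
    bool acked = producer.wait_for_current_thread_acks(std::chrono::seconds(1));
    return acked ? 0 : 1;
}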
		| @@ -34,12 +34,11 @@ | ||||
| #include <deque> | ||||
| #include <cstdint> | ||||
| #include <algorithm> | ||||
| #include <unordered_set> | ||||
| #include <unordered_map> | ||||
| #include <map> | ||||
| #include <mutex> | ||||
| #include <atomic> | ||||
| #include <future> | ||||
| #include <thread> | ||||
| #include <boost/optional.hpp> | ||||
| #include "../producer.h" | ||||
| #include "../detail/callback_invoker.h" | ||||
| @@ -53,8 +52,9 @@ namespace cppkafka { | ||||
|  * This class allows buffering messages and flushing them synchronously while also allowing | ||||
|  * to produce them just as you would using the Producer class. | ||||
|  * | ||||
|  * When calling either flush or wait_for_acks, the buffered producer will block until all | ||||
|  * produced messages (either buffered or sent directly) are acknowledged by the kafka brokers. | ||||
|  * When calling either flush or wait_for_acks/wait_for_current_thread_acks, the buffered producer | ||||
|  * will block until all produced messages (either buffered or sent directly) are acknowledged | ||||
|  * by the kafka brokers. | ||||
|  * | ||||
|  * When producing messages, this class will handle cases where the producer's queue is full so it | ||||
|  * will poll until the production is successful. | ||||
| @@ -79,7 +79,7 @@ namespace cppkafka { | ||||
|  * \warning Payload Policy: For payload-owning BufferTypes such as std::string or std::vector<char> | ||||
|  * the default policy is set to Producer::PayloadPolicy::COPY_PAYLOAD. For the specific non-payload owning type | ||||
|  * cppkafka::Buffer the policy is Producer::PayloadPolicy::PASSTHROUGH_PAYLOAD. In this case, librdkafka | ||||
|  * shall not make any internal copies of the message and it is the application's responsability to free | ||||
|  * shall not make any internal copies of the message and it is the application's responsibility to free | ||||
|  * the messages *after* the ProduceSuccessCallback has reported a successful delivery to avoid memory | ||||
|  * corruptions. | ||||
|  */ | ||||
| @@ -185,8 +185,9 @@ public: | ||||
|     /** | ||||
|      * \brief Produces a message asynchronously without buffering it | ||||
|      * | ||||
|      * The message will still be tracked so that a call to flush or wait_for_acks will actually | ||||
|      * wait for it to be acknowledged. | ||||
|      * The message will still be tracked so that a call to flush or | ||||
|      * wait_for_acks/wait_for_current_thread_acks will actually wait for it | ||||
|      * to be acknowledged. | ||||
|      * | ||||
|      * \param builder The builder that contains the message to be produced | ||||
|      * | ||||
| @@ -206,11 +207,23 @@ public: | ||||
|      */ | ||||
|     void sync_produce(const MessageBuilder& builder); | ||||
|      | ||||
|     /** | ||||
|      * \brief Same as sync_produce but waits up to 'timeout' for acks to be received. | ||||
|      * | ||||
|      * If retries are enabled, the timeout will limit the amount of time to wait | ||||
|      * before all retries are completed. | ||||
|      * | ||||
|      * \returns True if succeeded, false otherwise. If retries are enabled, false | ||||
|      *          indicates there are still retries left. | ||||
|      */ | ||||
|     bool sync_produce(const MessageBuilder& builder, std::chrono::milliseconds timeout); | ||||
|  | ||||
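For illustration (not part of the diff), a sketch of this timed overload, assuming the producer built in the earlier sketch and an illustrative 500 ms budget:

std::string payload = "payload";
MessageBuilder builder("my_topic");
builder.payload(payload);

// Blocks until the ack arrives or the timeout expires; with retries enabled,
// a false return can also mean retries are still outstanding.
if (!producer.sync_produce(builder, std::chrono::milliseconds(500))) {
    // Handle the unacknowledged message (log, re-enqueue, ...).
}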
|     /** | ||||
|      * \brief Produces a message asynchronously without buffering it | ||||
|      * | ||||
|      * The message will still be tracked so that a call to flush or wait_for_acks will actually | ||||
|      * wait for it to be acknowledged. | ||||
|      * The message will still be tracked so that a call to flush or | ||||
|      * wait_for_acks/wait_for_current_thread_acks will actually wait for it | ||||
|      * to be acknowledged. | ||||
|      * | ||||
|      * \param message The message to be produced | ||||
|      * | ||||
| @@ -230,7 +243,7 @@ public: | ||||
|      * \brief Flushes the buffered messages. | ||||
|      * | ||||
|      * This will send all messages and keep waiting until all of them are acknowledged (this is | ||||
|      * done by calling wait_for_acks). | ||||
|      * done by calling wait_for_acks/wait_for_current_thread_acks). | ||||
|      * | ||||
|      * \param preserve_order If set to True, each message in the queue will be flushed only when the previous | ||||
|      *                       message ack is received. This may result in performance degradation as messages | ||||
| @@ -256,17 +269,31 @@ public: | ||||
|     bool flush(std::chrono::milliseconds timeout, bool preserve_order = false); | ||||
|  | ||||
|     /** | ||||
|      * Waits for produced message's acknowledgements from the brokers | ||||
|      * \brief Waits for produced message's acknowledgements from the brokers | ||||
|      */ | ||||
|     void wait_for_acks(); | ||||
|      | ||||
|     /** | ||||
|      * Waits for produced message's acknowledgements from the brokers up to 'timeout'. | ||||
|      * \brief Waits for acknowledgements from brokers for messages produced | ||||
|      *        on the current thread only | ||||
|      */ | ||||
|     void wait_for_current_thread_acks(); | ||||
|      | ||||
|     /** | ||||
|      * \brief Waits for produced message's acknowledgements from the brokers up to 'timeout'. | ||||
|      * | ||||
|      * \return True if the operation completes and all acks have been received. | ||||
|      */ | ||||
|     bool wait_for_acks(std::chrono::milliseconds timeout); | ||||
|      | ||||
|     /** | ||||
|      * \brief Waits for acknowledgements from brokers for messages produced | ||||
|      *        on the current thread only. Times out after 'timeout' milliseconds. | ||||
|      * | ||||
|      * \return True if the operation completes and all acks have been received. | ||||
|      */ | ||||
|     bool wait_for_current_thread_acks(std::chrono::milliseconds timeout); | ||||
|  | ||||
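To illustrate the per-thread semantics (hypothetical helper, not part of the diff), each producing thread can wait only for its own acknowledgements while sharing a single BufferedProducer:

void produce_batch(BufferedProducer<std::string>& producer, int count) {
    std::string payload = "data";
    for (int i = 0; i < count; ++i) {
        MessageBuilder builder("my_topic");
        builder.payload(payload);
        producer.produce(builder);
    }
    // Blocks only on the acks for the messages produced by this thread,
    // not on every outstanding ack in the producer.
    producer.wait_for_current_thread_acks();
}

// Usage sketch: two independent producer threads sharing one instance.
// std::thread t1(produce_batch, std::ref(producer), 100);
// std::thread t2(produce_batch, std::ref(producer), 100);
// t1.join(); t2.join();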
|     /** | ||||
|      * Clears any buffered messages | ||||
|      */ | ||||
| @@ -316,12 +343,20 @@ public: | ||||
|     FlushMethod get_flush_method() const; | ||||
|      | ||||
|     /** | ||||
|      * \brief Get the number of messages not yet acked by the broker | ||||
|      * \brief Get the number of messages not yet acked by the broker. | ||||
|      * | ||||
|      * \return The number of messages | ||||
|      */ | ||||
|     size_t get_pending_acks() const; | ||||
|      | ||||
|     /** | ||||
|      * \brief Get the number of pending acks for messages produced on the | ||||
|      *        current thread only. | ||||
|      * | ||||
|      * \return The number of messages | ||||
|      */ | ||||
|     size_t get_current_thread_pending_acks() const; | ||||
|      | ||||
|     /** | ||||
|      * \brief Get the total number of messages successfully produced since the beginning | ||||
|      * | ||||
| @@ -347,9 +382,10 @@ public: | ||||
|     size_t get_flushes_in_progress() const; | ||||
|      | ||||
|     /** | ||||
|      * \brief Sets the maximum number of retries per message until giving up | ||||
|      * \brief Sets the maximum number of retries per message until giving up. Default is 5. | ||||
|      * | ||||
|      * Default is 5 | ||||
|      * \remark It is recommended to set the RdKafka option message.send.max.retries=0 | ||||
|      *         to prevent re-ordering of messages inside RdKafka. | ||||
|      */ | ||||
|     void set_max_number_retries(size_t max_number_retries); | ||||
|      | ||||
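A hedged configuration sketch following the remark above (broker address is illustrative; "0" disables rdkafka-level resends so only the BufferedProducer retry logic is in play):

Configuration config = {
    {"metadata.broker.list", "localhost:9092"},
    // Let the BufferedProducer handle retries so rdkafka itself never
    // re-queues messages, which could otherwise re-order them.
    {"message.send.max.retries", "0"}
};
BufferedProducer<std::string> producer(config);
producer.set_max_number_retries(3);  // per-message retries at the library level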
| @@ -484,39 +520,170 @@ protected: | ||||
|  | ||||
| private: | ||||
|     enum class SenderType { Sync, Async }; | ||||
|     enum class QueueKind { Retry, Regular }; | ||||
|     enum class QueueKind { Retry, Produce }; | ||||
|     enum class FlushAction { DontFlush, DoFlush }; | ||||
|     enum class Threads { All, Current }; | ||||
|  | ||||
|     // Simple RAII type which increments a counter on construction and | ||||
|     // decrements it on destruction, meant to be used as reference counting. | ||||
|     template <typename T> | ||||
|     struct CounterGuard{ | ||||
|         CounterGuard(std::atomic<T>& counter) : counter_(counter) { ++counter_; } | ||||
|         CounterGuard(std::atomic<T>& counter) | ||||
|         : counter_(counter) { | ||||
|             ++counter_; | ||||
|         } | ||||
|         ~CounterGuard() { --counter_; } | ||||
|         std::atomic<T>& counter_; | ||||
|     }; | ||||
|      | ||||
|     // If the application enables retry logic, this object is passed | ||||
|     // as internal (opaque) data with each message, so that it can keep | ||||
|     // track of each failed attempt. Only a single tracker will be | ||||
|     // instantiated and its lifetime will be the same as the message it | ||||
|     // belongs to. | ||||
|     struct Tracker : public Internal { | ||||
|         Tracker(SenderType sender, size_t num_retries) | ||||
|             : sender_(sender), num_retries_(num_retries) | ||||
|         {} | ||||
|         std::future<bool> get_new_future() { | ||||
|             should_retry_ = std::promise<bool>(); //reset shared data | ||||
|             return should_retry_.get_future(); //issue new future | ||||
|         : sender_(sender), | ||||
|           num_retries_(num_retries) { | ||||
|         } | ||||
|         // Creates a new promise for synchronizing with the | ||||
|         // on_delivery_report() callback. For synchronous producers only. | ||||
|         void prepare_to_retry() { | ||||
|             if (sender_ == SenderType::Sync) { | ||||
|                 retry_promise_ = std::promise<bool>(); | ||||
|             } | ||||
|         } | ||||
|         // Waits for the on_delivery_report() callback and determines if this message | ||||
|         // should be retried. This call will block until on_delivery_report() executes. | ||||
|         // For synchronous producers only. | ||||
|         bool retry_again() { | ||||
|             if (sender_ == SenderType::Sync) { | ||||
|                 return retry_promise_.get_future().get(); | ||||
|             } | ||||
|             return false; | ||||
|         } | ||||
|         // Signal the synchronous producer if the message should be retried or not. | ||||
|         // Called from inside on_delivery_report(). For synchronous producers only. | ||||
|         void should_retry(bool value) const { | ||||
|             if (sender_ == SenderType::Sync) { | ||||
|                 try { | ||||
|                     retry_promise_.set_value(value); | ||||
|                 } | ||||
|                 catch (const std::future_error&) { | ||||
|                     //Promise has already been set once. | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|         void set_sender_type(SenderType type) { | ||||
|             sender_ = type; | ||||
|         } | ||||
|         SenderType get_sender_type() const { | ||||
|             return sender_; | ||||
|         } | ||||
|         bool has_retries_left() const { | ||||
|             return num_retries_ > 0; | ||||
|         } | ||||
|         void decrement_retries() { | ||||
|             if (num_retries_ > 0) { | ||||
|                 --num_retries_; | ||||
|             } | ||||
|         } | ||||
|     private: | ||||
|         SenderType sender_; | ||||
|         std::promise<bool> should_retry_; | ||||
|         mutable std::promise<bool> retry_promise_; | ||||
|         size_t num_retries_; | ||||
|     }; | ||||
|     using TrackerPtr = std::shared_ptr<Tracker>; | ||||
|      | ||||
|     // The AckMonitor is responsible for properly counting the | ||||
|     // outstanding unacknowledged messages for each thread as well | ||||
|     // as the total acks. Counting acks on a per-thread basis is | ||||
|     // critical in a multi-threaded producer since we don't want one | ||||
|     // producing thread having to wait for all concurrent pending acks. Each | ||||
|     // thread should only wait for its own acks. | ||||
|     struct AckMonitor | ||||
|     { | ||||
|         // Increments the number of sent acks | ||||
|         void increment_pending_acks() { | ||||
|             while (!flag_.test_and_set()) { | ||||
|                 //save the last ack number for this thread so we only | ||||
|                 //wait up to this number. | ||||
|                 last_ack_[std::this_thread::get_id()] = ++sent_acks_; | ||||
|                 flag_.clear(); | ||||
|                 break; | ||||
|             } | ||||
|         } | ||||
|         // Increments the number of received acks, | ||||
|         // reducing the total pending acks. | ||||
|         void decrement_pending_acks() { | ||||
|             while (!flag_.test_and_set()) { | ||||
|                 ++received_acks_; | ||||
|                 flag_.clear(); | ||||
|                 break; | ||||
|             } | ||||
|         } | ||||
|         // Returns true if there are any pending acks overall. | ||||
|         bool has_pending_acks() const { | ||||
|             return get_pending_acks() > 0; | ||||
|         } | ||||
|         // Returns true if there are any pending acks on this thread. | ||||
|         bool has_current_thread_pending_acks() const { | ||||
|             return get_current_thread_pending_acks() > 0; | ||||
|         } | ||||
|         // Returns total pending acks. This is the difference between | ||||
|         // total produced and total received. | ||||
|         ssize_t get_pending_acks() const { | ||||
|             ssize_t rc = 0; | ||||
|             while (!flag_.test_and_set()) { | ||||
|                 rc = get_pending_acks_impl(); | ||||
|                 flag_.clear(); | ||||
|                 break; | ||||
|             } | ||||
|             return rc; | ||||
|         } | ||||
|         // Returns the total pending acks for this thread | ||||
|         ssize_t get_current_thread_pending_acks() const { | ||||
|             ssize_t rc = 0; | ||||
|             while (!flag_.test_and_set()) { | ||||
|                 rc = get_current_thread_pending_acks_impl(); | ||||
|                 flag_.clear(); | ||||
|                 break; | ||||
|             } | ||||
|             return rc; | ||||
|         } | ||||
|     private: | ||||
|         ssize_t get_pending_acks_impl() const { | ||||
|             return (sent_acks_ - received_acks_); | ||||
|         } | ||||
|         ssize_t get_current_thread_pending_acks_impl() const { | ||||
|             auto it  = last_ack_.find(std::this_thread::get_id()); | ||||
|             if (it != last_ack_.end()) { | ||||
|                 return (it->second > received_acks_) ? it->second - received_acks_ : 0; | ||||
|             } | ||||
|             return 0; | ||||
|         } | ||||
|         mutable std::atomic_flag flag_{0}; | ||||
|         ssize_t sent_acks_{0}; | ||||
|         ssize_t received_acks_{0}; | ||||
|         std::map<std::thread::id, ssize_t> last_ack_; //last ack number expected for this thread | ||||
|     }; | ||||
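A worked example of the accounting above: if thread A produces messages 1-3 and thread B then produces messages 4-5, last_ack_ holds A→3 and B→5 while sent_acks_ is 5. After three delivery reports arrive, received_acks_ is 3, so get_pending_acks() returns 2, thread A's per-thread count is 0 and thread B's is 2.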
|      | ||||
|     // Returns existing tracker or creates new one | ||||
|     template <typename BuilderType> | ||||
|     TrackerPtr add_tracker(SenderType sender, BuilderType& builder) { | ||||
|         if (has_internal_data_) { | ||||
|         if (enable_message_retries_) { | ||||
|             if (!builder.internal()) { | ||||
|                 // Add message tracker only if it hasn't been added before | ||||
|                 builder.internal(std::make_shared<Tracker>(sender, max_number_retries_)); | ||||
|                 return std::static_pointer_cast<Tracker>(builder.internal()); | ||||
|             } | ||||
|             return std::static_pointer_cast<Tracker>(builder.internal()); | ||||
|             // Return existing tracker | ||||
|             TrackerPtr tracker = std::static_pointer_cast<Tracker>(builder.internal()); | ||||
|             // Update the sender type. Since a message could have been initially produced | ||||
|             // asynchronously but then flushed synchronously (or vice-versa), the sender | ||||
|             // type should always reflect the latest retry mechanism. | ||||
|             tracker->set_sender_type(sender); | ||||
|             return tracker; | ||||
|         } | ||||
|         return nullptr; | ||||
|     } | ||||
| @@ -529,6 +696,11 @@ private: | ||||
|     template <typename BuilderType> | ||||
|     void async_produce(BuilderType&& message, bool throw_on_error); | ||||
|     static void swap_queues(QueueType & queue1, QueueType & queue2, std::mutex & mutex); | ||||
|     bool wait_for_acks_impl(Threads threads, std::chrono::milliseconds timeout); | ||||
|  | ||||
|     // Static members | ||||
|     static const std::chrono::milliseconds infinite_timeout; | ||||
|     static const std::chrono::milliseconds no_timeout; | ||||
|      | ||||
|     // Members | ||||
|     Producer producer_; | ||||
| @@ -544,18 +716,26 @@ private: | ||||
|     QueueFullCallback queue_full_callback_; | ||||
|     ssize_t max_buffer_size_{-1}; | ||||
|     FlushMethod flush_method_{FlushMethod::Sync}; | ||||
|     std::atomic<size_t> pending_acks_{0}; | ||||
|     AckMonitor ack_monitor_; | ||||
|     std::atomic<size_t> flushes_in_progress_{0}; | ||||
|     std::atomic<size_t> total_messages_produced_{0}; | ||||
|     std::atomic<size_t> total_messages_dropped_{0}; | ||||
|     int max_number_retries_{0}; | ||||
|     bool has_internal_data_{false}; | ||||
|     bool enable_message_retries_{false}; | ||||
|     QueueFullNotification queue_full_notification_{QueueFullNotification::None}; | ||||
| #ifdef KAFKA_TEST_INSTANCE | ||||
|     TestParameters* test_params_; | ||||
| #endif | ||||
| }; | ||||
|  | ||||
| // Full blocking wait as per RdKafka | ||||
| template <typename BufferType, typename Allocator> | ||||
| const std::chrono::milliseconds | ||||
| BufferedProducer<BufferType, Allocator>::infinite_timeout = std::chrono::milliseconds(-1); | ||||
| template <typename BufferType, typename Allocator> | ||||
| const std::chrono::milliseconds | ||||
| BufferedProducer<BufferType, Allocator>::no_timeout = std::chrono::milliseconds::zero(); | ||||
|  | ||||
| template <typename BufferType> | ||||
| Producer::PayloadPolicy get_default_payload_policy() { | ||||
|     return Producer::PayloadPolicy::COPY_PAYLOAD; | ||||
| @@ -586,12 +766,16 @@ void BufferedProducer<BufferType, Allocator>::add_message(const MessageBuilder& | ||||
| template <typename BufferType, typename Allocator> | ||||
| void BufferedProducer<BufferType, Allocator>::add_message(Builder builder) { | ||||
|     add_tracker(SenderType::Async, builder); | ||||
|     do_add_message(move(builder), QueueKind::Regular, FlushAction::DoFlush); | ||||
|     //post message onto the producer queue | ||||
|     do_add_message(move(builder), QueueKind::Produce, FlushAction::DoFlush); | ||||
| } | ||||
|  | ||||
| template <typename BufferType, typename Allocator> | ||||
| void BufferedProducer<BufferType, Allocator>::produce(const MessageBuilder& builder) { | ||||
|     if (has_internal_data_) { | ||||
|     if (enable_message_retries_) { | ||||
|         //Adding a retry tracker requires copying the builder since | ||||
|         //we cannot modify the original instance. Cloning is a fast operation | ||||
|         //since the MessageBuilder class holds pointers to data only. | ||||
|         MessageBuilder builder_clone(builder.clone()); | ||||
|         add_tracker(SenderType::Async, builder_clone); | ||||
|         async_produce(builder_clone, true); | ||||
| @@ -603,22 +787,38 @@ void BufferedProducer<BufferType, Allocator>::produce(const MessageBuilder& buil | ||||
|  | ||||
| template <typename BufferType, typename Allocator> | ||||
| void BufferedProducer<BufferType, Allocator>::sync_produce(const MessageBuilder& builder) { | ||||
|     if (has_internal_data_) { | ||||
|     sync_produce(builder, infinite_timeout); | ||||
| } | ||||
|  | ||||
| template <typename BufferType, typename Allocator> | ||||
| bool BufferedProducer<BufferType, Allocator>::sync_produce(const MessageBuilder& builder, | ||||
|                                                            std::chrono::milliseconds timeout) { | ||||
|     if (enable_message_retries_) { | ||||
|         //Adding a retry tracker requires copying the builder since | ||||
|         //we cannot modify the original instance. Cloning is a fast operation | ||||
|         //since the MessageBuilder class holds pointers to data only. | ||||
|         MessageBuilder builder_clone(builder.clone()); | ||||
|         TrackerPtr tracker = add_tracker(SenderType::Sync, builder_clone); | ||||
|         // produce until we succeed or we reach max retry limit | ||||
|         std::future<bool> should_retry; | ||||
|         auto endTime = std::chrono::steady_clock::now() + timeout; | ||||
|         do { | ||||
|             should_retry = tracker->get_new_future(); | ||||
|             tracker->prepare_to_retry(); | ||||
|             produce_message(builder_clone); | ||||
|             wait_for_acks(); | ||||
|             //Wait w/o timeout since we must get the ack to avoid a race condition. | ||||
|             //Otherwise retry_again() will block as the producer won't get flushed | ||||
|             //and the delivery callback will never be invoked. | ||||
|             wait_for_current_thread_acks(); | ||||
|         } | ||||
|         while (should_retry.get()); | ||||
|         while (tracker->retry_again() && | ||||
|               ((timeout == infinite_timeout) || | ||||
|               (std::chrono::steady_clock::now() < endTime))); | ||||
|         return !tracker->has_retries_left(); | ||||
|     } | ||||
|     else { | ||||
|         // produce once | ||||
|         produce_message(builder); | ||||
|         wait_for_acks(); | ||||
|         wait_for_current_thread_acks(timeout); | ||||
|         return !ack_monitor_.has_current_thread_pending_acks(); | ||||
|     } | ||||
| } | ||||
|  | ||||
| @@ -629,117 +829,80 @@ void BufferedProducer<BufferType, Allocator>::produce(const Message& message) { | ||||
|  | ||||
| template <typename BufferType, typename Allocator> | ||||
| void BufferedProducer<BufferType, Allocator>::async_flush() { | ||||
|     CounterGuard<size_t> counter_guard(flushes_in_progress_); | ||||
|     auto queue_flusher = [this](QueueType& queue, std::mutex & mutex)->void | ||||
|     { | ||||
|         QueueType flush_queue; // flush from temporary queue | ||||
|         swap_queues(queue, flush_queue, mutex); | ||||
|          | ||||
|         while (!flush_queue.empty()) { | ||||
|             async_produce(std::move(flush_queue.front()), false); | ||||
|             flush_queue.pop_front(); | ||||
|         } | ||||
|     }; | ||||
|     queue_flusher(retry_messages_, retry_mutex_); | ||||
|     queue_flusher(messages_, mutex_); | ||||
|     wait_for_acks(std::chrono::milliseconds(0)); //flush the producer but don't wait | ||||
|     flush(no_timeout, false); | ||||
| } | ||||
|  | ||||
| template <typename BufferType, typename Allocator> | ||||
| void BufferedProducer<BufferType, Allocator>::flush(bool preserve_order) { | ||||
|     if (preserve_order) { | ||||
|         CounterGuard<size_t> counter_guard(flushes_in_progress_); | ||||
|         auto queue_flusher = [this](QueueType& queue, std::mutex & mutex)->void | ||||
|         { | ||||
|             QueueType flush_queue; // flush from temporary queue | ||||
|             swap_queues(queue, flush_queue, mutex); | ||||
|  | ||||
|             while (!flush_queue.empty()) { | ||||
|                 sync_produce(flush_queue.front()); | ||||
|                 flush_queue.pop_front(); | ||||
|             } | ||||
|         }; | ||||
|         queue_flusher(retry_messages_, retry_mutex_); | ||||
|         queue_flusher(messages_, mutex_); | ||||
|     } | ||||
|     else { | ||||
|         async_flush(); | ||||
|         wait_for_acks(); | ||||
|     } | ||||
|     flush(infinite_timeout, preserve_order); | ||||
| } | ||||
|  | ||||
| template <typename BufferType, typename Allocator> | ||||
| bool BufferedProducer<BufferType, Allocator>::flush(std::chrono::milliseconds timeout, | ||||
|                                                     bool preserve_order) { | ||||
|     if (preserve_order) { | ||||
|         CounterGuard<size_t> counter_guard(flushes_in_progress_); | ||||
|     CounterGuard<size_t> counter_guard(flushes_in_progress_); | ||||
|     auto queue_flusher = [timeout, preserve_order, this] | ||||
|                          (QueueType& queue, std::mutex & mutex)->void | ||||
|     { | ||||
|         QueueType flush_queue; // flush from temporary queue | ||||
|         swap_queues(messages_, flush_queue, mutex_); | ||||
|         QueueType retry_flush_queue; // flush from temporary retry queue | ||||
|         swap_queues(retry_messages_, retry_flush_queue, retry_mutex_); | ||||
|  | ||||
|         auto queue_flusher = [this](QueueType& queue)->bool | ||||
|         { | ||||
|             if (!queue.empty()) { | ||||
|                 sync_produce(queue.front()); | ||||
|                 queue.pop_front(); | ||||
|                 return true; | ||||
|         swap_queues(queue, flush_queue, mutex); | ||||
|         //Produce one message at a time and wait for acks until queue is empty | ||||
|         while (!flush_queue.empty()) { | ||||
|             if (preserve_order) { | ||||
|                 //When preserving order, we must ensure that each message | ||||
|                 //gets delivered before producing the next one. | ||||
|                 sync_produce(flush_queue.front(), timeout); | ||||
|             } | ||||
|             return false; | ||||
|         }; | ||||
|         auto remaining = timeout; | ||||
|         auto start_time = std::chrono::high_resolution_clock::now(); | ||||
|         do { | ||||
|             if (!queue_flusher(retry_flush_queue) && !queue_flusher(flush_queue)) { | ||||
|                 break; | ||||
|             else { | ||||
|                 //Produce as fast as possible w/o waiting. If one or more | ||||
|                 //messages fail, they will be re-enqueued for retry | ||||
|                 //on the next flush cycle, which causes re-ordering. | ||||
|                 async_produce(flush_queue.front(), false); | ||||
|             } | ||||
|             // calculate remaining time | ||||
|             remaining = timeout - std::chrono::duration_cast<std::chrono::milliseconds> | ||||
|                 (std::chrono::high_resolution_clock::now() - start_time); | ||||
|         } while (remaining.count() > 0); | ||||
|          | ||||
|         // Re-enqueue remaining messages in original order | ||||
|         auto re_enqueuer = [this](QueueType& src_queue, QueueType& dst_queue, std::mutex & mutex)->void | ||||
|         { | ||||
|             if (!src_queue.empty()) { | ||||
|                 std::lock_guard<std::mutex> lock(mutex); | ||||
|                 dst_queue.insert(dst_queue.begin(), | ||||
|                                  std::make_move_iterator(src_queue.begin()), | ||||
|                                  std::make_move_iterator(src_queue.end())); | ||||
|             } | ||||
|         }; | ||||
|         re_enqueuer(retry_flush_queue, retry_messages_, retry_mutex_); | ||||
|         re_enqueuer(flush_queue, messages_, mutex_); | ||||
|         return true; | ||||
|     } | ||||
|     else { | ||||
|         async_flush(); | ||||
|         return wait_for_acks(timeout); | ||||
|             flush_queue.pop_front(); | ||||
|         } | ||||
|     }; | ||||
|     //Produce retry queue first since these messages were produced first. | ||||
|     queue_flusher(retry_messages_, retry_mutex_); | ||||
|     //Produce recently enqueued messages | ||||
|     queue_flusher(messages_, mutex_); | ||||
|     if (!preserve_order) { | ||||
|         //Wait for acks from the messages produced above via async_produce | ||||
|         wait_for_current_thread_acks(timeout); | ||||
|     } | ||||
|     return !ack_monitor_.has_current_thread_pending_acks(); | ||||
| } | ||||
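For illustration (not part of the diff), the two flushing modes side by side, assuming a populated producer and an illustrative five second budget:

// Strict ordering: each buffered message is produced and acknowledged
// before the next one is sent; slower, but preserves the original order.
bool all_acked = producer.flush(std::chrono::seconds(5), /*preserve_order=*/true);

// Fast path: all buffered messages are produced back to back, then we
// wait (up to the timeout) for this thread's outstanding acks.
all_acked = producer.flush(std::chrono::seconds(5), /*preserve_order=*/false);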
|  | ||||
| template <typename BufferType, typename Allocator> | ||||
| void BufferedProducer<BufferType, Allocator>::wait_for_acks() { | ||||
|     while (pending_acks_ > 0) { | ||||
|         try { | ||||
|             producer_.flush(); | ||||
|         } | ||||
|         catch (const HandleException& ex) { | ||||
|             // If we just hit the timeout, keep going, otherwise re-throw | ||||
|             if (ex.get_error() == RD_KAFKA_RESP_ERR__TIMED_OUT) { | ||||
|                 continue; | ||||
|             } | ||||
|             else { | ||||
|                 throw; | ||||
|             } | ||||
|         } | ||||
|     } | ||||
|     //block until all acks have been received | ||||
|     wait_for_acks_impl(Threads::All, infinite_timeout); | ||||
| } | ||||
|  | ||||
| template <typename BufferType, typename Allocator> | ||||
| void BufferedProducer<BufferType, Allocator>::wait_for_current_thread_acks() { | ||||
|     //block until all acks from the current thread have been received | ||||
|     wait_for_acks_impl(Threads::Current, infinite_timeout); | ||||
| } | ||||
|  | ||||
| template <typename BufferType, typename Allocator> | ||||
| bool BufferedProducer<BufferType, Allocator>::wait_for_acks(std::chrono::milliseconds timeout) { | ||||
|     //block until all acks have been received | ||||
|     return wait_for_acks_impl(Threads::All, timeout); | ||||
| } | ||||
|  | ||||
| template <typename BufferType, typename Allocator> | ||||
| bool BufferedProducer<BufferType, Allocator>::wait_for_current_thread_acks(std::chrono::milliseconds timeout) { | ||||
|     //block until all acks from the current thread have been received | ||||
|     return wait_for_acks_impl(Threads::Current, timeout); | ||||
| } | ||||
|  | ||||
| template <typename BufferType, typename Allocator> | ||||
| bool BufferedProducer<BufferType, Allocator>::wait_for_acks_impl(Threads threads, | ||||
|                                                                  std::chrono::milliseconds timeout) { | ||||
|     auto remaining = timeout; | ||||
|     auto start_time = std::chrono::high_resolution_clock::now(); | ||||
|     bool pending_acks = true; | ||||
|     do { | ||||
|         try { | ||||
|             producer_.flush(remaining); | ||||
| @@ -748,7 +911,10 @@ bool BufferedProducer<BufferType, Allocator>::wait_for_acks(std::chrono::millise | ||||
|             // If we just hit the timeout, keep going, otherwise re-throw | ||||
|             if (ex.get_error() == RD_KAFKA_RESP_ERR__TIMED_OUT) { | ||||
|                 // There is no time remaining | ||||
|                 return (pending_acks_ == 0); | ||||
|                 pending_acks = (threads == Threads::All) ? | ||||
|                                 ack_monitor_.has_pending_acks() : | ||||
|                                 ack_monitor_.has_current_thread_pending_acks(); | ||||
|                 return !pending_acks; | ||||
|             } | ||||
|             else { | ||||
|                 throw; | ||||
| @@ -757,8 +923,11 @@ bool BufferedProducer<BufferType, Allocator>::wait_for_acks(std::chrono::millise | ||||
|         // calculate remaining time | ||||
|         remaining = timeout - std::chrono::duration_cast<std::chrono::milliseconds> | ||||
|             (std::chrono::high_resolution_clock::now() - start_time); | ||||
|     } while ((pending_acks_ > 0) && (remaining.count() > 0)); | ||||
|     return (pending_acks_ == 0); | ||||
|         pending_acks = (threads == Threads::All) ? | ||||
|                         ack_monitor_.has_pending_acks() : | ||||
|                         ack_monitor_.has_current_thread_pending_acks(); | ||||
|     } while (pending_acks && ((remaining.count() > 0) || (timeout == infinite_timeout))); | ||||
|     return !pending_acks; | ||||
| } | ||||
|  | ||||
| template <typename BufferType, typename Allocator> | ||||
| @@ -820,10 +989,12 @@ void BufferedProducer<BufferType, Allocator>::do_add_message(BuilderType&& build | ||||
|         std::lock_guard<std::mutex> lock(mutex_); | ||||
|         messages_.emplace_back(std::forward<BuilderType>(builder)); | ||||
|     } | ||||
|  | ||||
|     // Flush the queues only if a regular message is added. Retry messages may be added | ||||
|     // from rdkafka callbacks, and flush/async_flush is a user-level call | ||||
|     if (queue_kind == QueueKind::Regular && flush_action == FlushAction::DoFlush && (max_buffer_size_ >= 0) && (max_buffer_size_ <= (ssize_t)get_buffer_size())) { | ||||
|     // Flush the queues only if a produced message is added. Retry messages may be added | ||||
|     // from on_delivery_report() during which flush()/async_flush() cannot be called. | ||||
|     if (queue_kind == QueueKind::Produce && | ||||
|         flush_action == FlushAction::DoFlush && | ||||
|         (max_buffer_size_ >= 0) && | ||||
|         (max_buffer_size_ <= (ssize_t)get_buffer_size())) { | ||||
|         if (flush_method_ == FlushMethod::Sync) { | ||||
|             flush(); | ||||
|         } | ||||
| @@ -845,7 +1016,12 @@ const Producer& BufferedProducer<BufferType, Allocator>::get_producer() const { | ||||
|  | ||||
| template <typename BufferType, typename Allocator> | ||||
| size_t BufferedProducer<BufferType, Allocator>::get_pending_acks() const { | ||||
|     return pending_acks_; | ||||
|     return ack_monitor_.get_pending_acks(); | ||||
| } | ||||
|  | ||||
| template <typename BufferType, typename Allocator> | ||||
| size_t BufferedProducer<BufferType, Allocator>::get_current_thread_pending_acks() const { | ||||
|     return ack_monitor_.get_current_thread_pending_acks(); | ||||
| } | ||||
|  | ||||
| template <typename BufferType, typename Allocator> | ||||
| @@ -865,8 +1041,8 @@ size_t BufferedProducer<BufferType, Allocator>::get_flushes_in_progress() const | ||||
|  | ||||
| template <typename BufferType, typename Allocator> | ||||
| void BufferedProducer<BufferType, Allocator>::set_max_number_retries(size_t max_number_retries) { | ||||
|     if (!has_internal_data_ && (max_number_retries > 0)) { | ||||
|         has_internal_data_ = true; //enable once | ||||
|     if (!enable_message_retries_ && (max_number_retries > 0)) { | ||||
|         enable_message_retries_ = true; //enable once | ||||
|     } | ||||
|     max_number_retries_ = max_number_retries; | ||||
| } | ||||
| @@ -934,7 +1110,7 @@ void BufferedProducer<BufferType, Allocator>::produce_message(BuilderType&& buil | ||||
|             producer_.produce(builder); | ||||
|             internal_guard.release(); | ||||
|             // Sent successfully | ||||
|             ++pending_acks_; | ||||
|             ack_monitor_.increment_pending_acks(); | ||||
|             break; | ||||
|         } | ||||
|         catch (const HandleException& ex) { | ||||
| @@ -969,8 +1145,11 @@ void BufferedProducer<BufferType, Allocator>::async_produce(BuilderType&& builde | ||||
|         CallbackInvoker<FlushFailureCallback> callback("flush failure", flush_failure_callback_, &producer_); | ||||
|         if (!callback || callback(builder, ex.get_error())) { | ||||
|             TrackerPtr tracker = std::static_pointer_cast<Tracker>(builder.internal()); | ||||
|             if (tracker && tracker->num_retries_ > 0) { | ||||
|                 --tracker->num_retries_; | ||||
|             if (tracker && tracker->has_retries_left()) { | ||||
|                 tracker->decrement_retries(); | ||||
|                 //Post message onto the retry queue. This queue has higher priority and will be | ||||
|                 //flushed before the producer queue to preserve original message order. | ||||
|                 //We don't flush now since we just had an error while producing. | ||||
|                 do_add_message(std::forward<BuilderType>(builder), QueueKind::Retry, FlushAction::DontFlush); | ||||
|                 return; | ||||
|             } | ||||
| @@ -995,24 +1174,30 @@ Configuration BufferedProducer<BufferType, Allocator>::prepare_configuration(Con | ||||
|  | ||||
| template <typename BufferType, typename Allocator> | ||||
| void BufferedProducer<BufferType, Allocator>::on_delivery_report(const Message& message) { | ||||
|     //Get tracker data | ||||
|     TestParameters* test_params = get_test_parameters(); | ||||
|     TrackerPtr tracker = has_internal_data_ ? | ||||
|         std::static_pointer_cast<Tracker>(MessageInternal::load(const_cast<Message&>(message))->get_internal()) : nullptr; | ||||
|     bool should_retry = false; | ||||
|     //Get tracker if present | ||||
|     TrackerPtr tracker = | ||||
|         enable_message_retries_ ? | ||||
|         std::static_pointer_cast<Tracker>(MessageInternal::load(const_cast<Message&>(message))->get_internal()) : | ||||
|         nullptr; | ||||
|     bool retry = false; | ||||
|     if (message.get_error() || (test_params && test_params->force_delivery_error_)) { | ||||
|         // We should produce this message again if we don't have a produce failure callback | ||||
|         // or we have one but it returns true | ||||
|         // or we have one but it returns true (indicating the error is retryable) | ||||
|         CallbackInvoker<ProduceFailureCallback> callback("produce failure", produce_failure_callback_, &producer_); | ||||
|         if (!callback || callback(message)) { | ||||
|             // Check if we have reached the maximum retry limit | ||||
|             if (tracker && tracker->num_retries_ > 0) { | ||||
|                 --tracker->num_retries_; | ||||
|                 if (tracker->sender_ == SenderType::Async) { | ||||
|                     // Re-enqueue for later retransmission with higher priority (i.e. front of the queue) | ||||
|             if (tracker && tracker->has_retries_left()) { | ||||
|                 tracker->decrement_retries(); | ||||
|                 //If the sender is asynchronous, the message is re-enqueued. If the sender is | ||||
|                 //synchronous, we simply notify via Tracker::should_retry() below. | ||||
|                 if (tracker->get_sender_type() == SenderType::Async) { | ||||
|                     //Post message onto the retry queue. This queue has higher priority and will be | ||||
|                     //flushed later by the application (before the producer queue) to preserve original message order. | ||||
|                     //We prevent flushing now since we are within a callback context. | ||||
|                     do_add_message(Builder(message), QueueKind::Retry, FlushAction::DontFlush); | ||||
|                 } | ||||
|                 should_retry = true; | ||||
|                 retry = true; | ||||
|             } | ||||
|             else { | ||||
|                 ++total_messages_dropped_; | ||||
| @@ -1032,23 +1217,18 @@ void BufferedProducer<BufferType, Allocator>::on_delivery_report(const Message& | ||||
|         // Increment the total successful transmissions | ||||
|         ++total_messages_produced_; | ||||
|     } | ||||
|     // Signal producers | ||||
|     // Signal synchronous sender and unblock it since it's waiting for this ack to arrive. | ||||
|     if (tracker) { | ||||
|         try { | ||||
|             tracker->should_retry_.set_value(should_retry); | ||||
|         } | ||||
|         catch (const std::future_error& ex) { | ||||
|             //This is an async retry and future is not being read | ||||
|         } | ||||
|         tracker->should_retry(retry); | ||||
|     } | ||||
|     // Decrement the expected acks and check to prevent underflow | ||||
|     if (pending_acks_ > 0) { | ||||
|         --pending_acks_; | ||||
|     } | ||||
|     ack_monitor_.decrement_pending_acks(); | ||||
| } | ||||
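For illustration (not part of this change), a sketch of how an application can drive the retry decision above; the callback setter name is the usual BufferedProducer one, and the chosen error is just an example of a transient failure:

producer.set_max_number_retries(3);
producer.set_produce_failure_callback([](const Message& msg) {
    // Returning true asks the BufferedProducer to retry the message
    // (subject to the remaining retry count); false drops it immediately.
    return msg.get_error() == RD_KAFKA_RESP_ERR__MSG_TIMED_OUT;
});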
|  | ||||
| template <typename BufferType, typename Allocator> | ||||
| void BufferedProducer<BufferType, Allocator>::swap_queues(BufferedProducer<BufferType, Allocator>::QueueType & queue1, BufferedProducer<BufferType, Allocator>::QueueType & queue2, std::mutex & mutex) | ||||
| void BufferedProducer<BufferType, Allocator>::swap_queues(BufferedProducer<BufferType, Allocator>::QueueType & queue1, | ||||
|                                                           BufferedProducer<BufferType, Allocator>::QueueType & queue2, | ||||
|                                                           std::mutex & mutex) | ||||
| { | ||||
|     std::lock_guard<std::mutex> lock(mutex); | ||||
|     std::swap(queue1, queue2); | ||||
|   | ||||
| @@ -1,12 +1,34 @@ | ||||
| include_directories(${CMAKE_CURRENT_SOURCE_DIR}/../include/) | ||||
| include_directories(SYSTEM ${CATCH_INCLUDE}) | ||||
|  | ||||
| set(KAFKA_TEST_INSTANCE "kafka-vm:9092"  | ||||
| if (NOT KAFKA_TEST_INSTANCE) | ||||
| set(KAFKA_TEST_INSTANCE kafka-vm:9092 | ||||
|     CACHE STRING "The kafka instance to which to connect to run tests") | ||||
| endif() | ||||
| if (NOT KAFKA_NUM_PARTITIONS) | ||||
|    set(KAFKA_NUM_PARTITIONS 3 CACHE STRING "Kafka Number of partitions") | ||||
| endif() | ||||
| if (NOT KAFKA_TOPICS) | ||||
|    set(KAFKA_TOPICS "cppkafka_test1;cppkafka_test2" CACHE STRING "Kafka topics") | ||||
| endif() | ||||
|  | ||||
| # Convert list of topics into a C++ initializer list | ||||
| FOREACH(TOPIC ${KAFKA_TOPICS}) | ||||
|    if (NOT TOPIC_LIST) | ||||
|       set(TOPIC_LIST "\"${TOPIC}\"") | ||||
|    else() | ||||
|       set(TOPIC_LIST "${TOPIC_LIST},\"${TOPIC}\"") | ||||
|    endif() | ||||
| ENDFOREACH() | ||||
|  | ||||
| add_custom_target(tests) | ||||
|  | ||||
| include_directories(${CMAKE_CURRENT_SOURCE_DIR}) | ||||
| add_definitions("-DKAFKA_TEST_INSTANCE=\"${KAFKA_TEST_INSTANCE}\"") | ||||
| add_definitions( | ||||
|   "-DKAFKA_TEST_INSTANCE=\"${KAFKA_TEST_INSTANCE}\"" | ||||
|   -DKAFKA_NUM_PARTITIONS=${KAFKA_NUM_PARTITIONS} | ||||
|   -DKAFKA_TOPIC_NAMES=${TOPIC_LIST} | ||||
| ) | ||||
|  | ||||
| add_executable(cppkafka_tests | ||||
|     buffer_test.cpp | ||||
| @@ -25,6 +47,6 @@ add_executable(cppkafka_tests | ||||
| ) | ||||
|  | ||||
| # In CMake >= 3.15 Boost::boost == Boost::headers | ||||
| target_link_libraries(cppkafka_tests cppkafka RdKafka::rdkafka Boost::boost Boost::program_options ) | ||||
| target_link_libraries(cppkafka_tests cppkafka RdKafka::rdkafka Boost::boost Boost::program_options) | ||||
| add_dependencies(tests cppkafka_tests) | ||||
| add_test(cppkafka cppkafka_tests) | ||||
|   | ||||
| @@ -15,8 +15,7 @@ using Catch::TestCaseStats; | ||||
| using Catch::Totals; | ||||
| using Catch::Session; | ||||
|  | ||||
| std::vector<std::string> KAFKA_TOPICS = {"cppkafka_test1", "cppkafka_test2"}; | ||||
| int KAFKA_NUM_PARTITIONS = 3; | ||||
| std::vector<std::string> KAFKA_TOPICS = {KAFKA_TOPIC_NAMES}; | ||||
|  | ||||
| namespace cppkafka { | ||||
|  | ||||
|   | ||||
| @@ -9,7 +9,6 @@ | ||||
| #include "cppkafka/utils/consumer_dispatcher.h" | ||||
|  | ||||
| extern const std::vector<std::string> KAFKA_TOPICS; | ||||
| extern const int KAFKA_NUM_PARTITIONS; | ||||
|  | ||||
| using namespace cppkafka; | ||||
|  | ||||
|   | ||||