From 935a34238bdea6fb9d39f8a073f9daee5acd0a34 Mon Sep 17 00:00:00 2001 From: Alexander Damian Date: Mon, 17 Feb 2020 11:25:26 -0500 Subject: [PATCH] Added implementation for thread-aware ack monitoring --- include/cppkafka/utils/buffered_producer.h | 196 +++++++++++++++++---- 1 file changed, 162 insertions(+), 34 deletions(-) diff --git a/include/cppkafka/utils/buffered_producer.h b/include/cppkafka/utils/buffered_producer.h index 3b2ae8e..3f5424f 100644 --- a/include/cppkafka/utils/buffered_producer.h +++ b/include/cppkafka/utils/buffered_producer.h @@ -34,12 +34,11 @@ #include #include #include -#include -#include #include #include #include #include +#include #include #include "../producer.h" #include "../detail/callback_invoker.h" @@ -53,8 +52,9 @@ namespace cppkafka { * This class allows buffering messages and flushing them synchronously while also allowing * to produce them just as you would using the Producer class. * - * When calling either flush or wait_for_acks, the buffered producer will block until all - * produced messages (either buffered or sent directly) are acknowledged by the kafka brokers. + * When calling either flush or wait_for_acks/wait_for_current_thread_acks, the buffered producer + * will block until all produced messages (either buffered or sent directly) are acknowledged + * by the kafka brokers. * * When producing messages, this class will handle cases where the producer's queue is full so it * will poll until the production is successful. @@ -185,8 +185,9 @@ public: /** * \brief Produces a message asynchronously without buffering it * - * The message will still be tracked so that a call to flush or wait_for_acks will actually - * wait for it to be acknowledged. + * The message will still be tracked so that a call to flush or + * wait_for_acks/wait_for_current_thread_acks will actually wait for it + * to be acknowledged. * * \param builder The builder that contains the message to be produced * @@ -220,8 +221,9 @@ public: /** * \brief Produces a message asynchronously without buffering it * - * The message will still be tracked so that a call to flush or wait_for_acks will actually - * wait for it to be acknowledged. + * The message will still be tracked so that a call to flush or + * wait_for_acks/wait_for_current_thread_acks will actually wait for it + * to be acknowledged. * * \param message The message to be produced * @@ -241,7 +243,7 @@ public: * \brief Flushes the buffered messages. * * This will send all messages and keep waiting until all of them are acknowledged (this is - * done by calling wait_for_acks). + * done by calling wait_for_acks/wait_for_current_thread_acks). * * \param preserve_order If set to True, each message in the queue will be flushed only when the previous * message ack is received. This may result in performance degradation as messages @@ -267,16 +269,30 @@ public: bool flush(std::chrono::milliseconds timeout, bool preserve_order = false); /** - * Waits for produced message's acknowledgements from the brokers + * \brief Waits for produced message's acknowledgements from the brokers */ void wait_for_acks(); /** - * Waits for produced message's acknowledgements from the brokers up to 'timeout'. + * \brief Waits for acknowledgements from brokers for messages produced + * on the current thread only + */ + void wait_for_current_thread_acks(); + + /** + * \brief Waits for produced message's acknowledgements from the brokers up to 'timeout'. * * \return True if the operation completes and all acks have been received. */ bool wait_for_acks(std::chrono::milliseconds timeout); + + /** + * \brief Waits for acknowledgements from brokers for messages produced + * on the current thread only. Times out after 'timeout' milliseconds. + * + * \return True if the operation completes and all acks have been received. + */ + bool wait_for_current_thread_acks(std::chrono::milliseconds timeout); /** * Clears any buffered messages @@ -327,12 +343,20 @@ public: FlushMethod get_flush_method() const; /** - * \brief Get the number of messages not yet acked by the broker + * \brief Get the number of messages not yet acked by the broker. * * \return The number of messages */ size_t get_pending_acks() const; + /** + * \brief Get the number of pending acks for messages produces on the + * current thread only. + * + * \return The number of messages + */ + size_t get_current_thread_pending_acks() const; + /** * \brief Get the total number of messages successfully produced since the beginning * @@ -358,9 +382,10 @@ public: size_t get_flushes_in_progress() const; /** - * \brief Sets the maximum number of retries per message until giving up + * \brief Sets the maximum number of retries per message until giving up. Default is 5. * - * Default is 5 + * \remark Is it recommended to set the RdKafka option message.send.max.retries=0 + * to prevent re-ordering of messages inside RdKafka. */ void set_max_number_retries(size_t max_number_retries); @@ -495,8 +520,9 @@ protected: private: enum class SenderType { Sync, Async }; - enum class QueueKind { Retry, Regular }; + enum class QueueKind { Retry, Produce }; enum class FlushAction { DontFlush, DoFlush }; + enum class Threads { All, Current }; // Simple RAII type which increments a counter on construction and // decrements it on destruction, meant to be used as reference counting. @@ -569,6 +595,79 @@ private: }; using TrackerPtr = std::shared_ptr; + // The AckMonitor is responsible for properly counting the + // outstanding unacknowledged messages for each thread as well + // as the total acks. Counting acks on a per-thread basis is + // critical in a multi-threaded producer since we don't want one + // producer having to wait for all concurrent pending acks. Each + // producer should only wait for his own acks. + struct AckMonitor + { + // Increments the number of sent acks + void increment_pending_acks() { + while (!flag_.test_and_set()) { + //save the last ack number for this thread so we only + //wait up to this number. + last_ack_[std::this_thread::get_id()] = ++sent_acks_; + flag_.clear(); + break; + } + } + // Increments the number of received acks, + // reducing the total pending acks. + void decrement_pending_acks() { + while (!flag_.test_and_set()) { + ++received_acks_; + flag_.clear(); + break; + } + } + // Returns true if there are any pending acks overall. + bool has_pending_acks() const { + return get_pending_acks() > 0; + } + // Returns true if there are any pending acks on this thread. + bool has_current_thread_pending_acks() const { + return get_current_thread_pending_acks() > 0; + } + // Returns total pending acks. This is the difference between + // total produced and total received. + ssize_t get_pending_acks() const { + ssize_t rc = 0; + while (!flag_.test_and_set()) { + rc = get_pending_acks_impl(); + flag_.clear(); + break; + } + return rc; + } + // Returns the total pending acks for this thread + ssize_t get_current_thread_pending_acks() const { + ssize_t rc = 0; + while (!flag_.test_and_set()) { + rc = get_current_thread_pending_acks_impl(); + flag_.clear(); + break; + } + return rc; + } + private: + ssize_t get_pending_acks_impl() const { + return (sent_acks_ - received_acks_); + } + ssize_t get_current_thread_pending_acks_impl() const { + auto it = last_ack_.find(std::this_thread::get_id()); + if (it != last_ack_.end()) { + return (it->second > received_acks_) ? it->second - received_acks_ : 0; + } + return 0; + } + mutable std::atomic_flag flag_{0}; + ssize_t sent_acks_{0}; + ssize_t received_acks_{0}; + std::map last_ack_; //last ack number expected for this thread + }; + // Returns existing tracker or creates new one template TrackerPtr add_tracker(SenderType sender, BuilderType& builder) { @@ -597,6 +696,7 @@ private: template void async_produce(BuilderType&& message, bool throw_on_error); static void swap_queues(QueueType & queue1, QueueType & queue2, std::mutex & mutex); + bool wait_for_acks_impl(Threads threads, std::chrono::milliseconds timeout); // Static members static const std::chrono::milliseconds infinite_timeout; @@ -616,7 +716,7 @@ private: QueueFullCallback queue_full_callback_; ssize_t max_buffer_size_{-1}; FlushMethod flush_method_{FlushMethod::Sync}; - std::atomic pending_acks_{0}; + AckMonitor ack_monitor_; std::atomic flushes_in_progress_{0}; std::atomic total_messages_produced_{0}; std::atomic total_messages_dropped_{0}; @@ -667,7 +767,7 @@ template void BufferedProducer::add_message(Builder builder) { add_tracker(SenderType::Async, builder); //post message unto the producer queue - do_add_message(move(builder), QueueKind::Regular, FlushAction::DoFlush); + do_add_message(move(builder), QueueKind::Produce, FlushAction::DoFlush); } template @@ -707,7 +807,7 @@ bool BufferedProducer::sync_produce(const MessageBuilder& //Wait w/o timeout since we must get the ack to avoid a race condition. //Otherwise retry_again() will block as the producer won't get flushed //and the delivery callback will never be invoked. - wait_for_acks(); + wait_for_current_thread_acks(); } while (tracker->retry_again() && ((timeout == infinite_timeout) || @@ -717,8 +817,8 @@ bool BufferedProducer::sync_produce(const MessageBuilder& else { // produce once produce_message(builder); - wait_for_acks(timeout); - return (pending_acks_ == 0); + wait_for_current_thread_acks(timeout); + return !ack_monitor_.has_current_thread_pending_acks(); } } @@ -768,21 +868,41 @@ bool BufferedProducer::flush(std::chrono::milliseconds ti queue_flusher(messages_, mutex_); if (!preserve_order) { //Wait for acks from the messages produced above via async_produce - wait_for_acks(timeout); + wait_for_current_thread_acks(timeout); } - return pending_acks_ == 0; + return !ack_monitor_.has_current_thread_pending_acks(); } template void BufferedProducer::wait_for_acks() { //block until all acks have been received - wait_for_acks(infinite_timeout); + wait_for_acks_impl(Threads::All, infinite_timeout); +} + +template +void BufferedProducer::wait_for_current_thread_acks() { + //block until all acks from the current thread have been received + wait_for_acks_impl(Threads::Current, infinite_timeout); } template bool BufferedProducer::wait_for_acks(std::chrono::milliseconds timeout) { + //block until all acks have been received + return wait_for_acks_impl(Threads::All, timeout); +} + +template +bool BufferedProducer::wait_for_current_thread_acks(std::chrono::milliseconds timeout) { + //block until all acks from the current thread have been received + return wait_for_acks_impl(Threads::Current, timeout); +} + +template +bool BufferedProducer::wait_for_acks_impl(Threads threads, + std::chrono::milliseconds timeout) { auto remaining = timeout; auto start_time = std::chrono::high_resolution_clock::now(); + bool pending_acks = true; do { try { producer_.flush(remaining); @@ -791,7 +911,10 @@ bool BufferedProducer::wait_for_acks(std::chrono::millise // If we just hit the timeout, keep going, otherwise re-throw if (ex.get_error() == RD_KAFKA_RESP_ERR__TIMED_OUT) { // There is no time remaining - return (pending_acks_ == 0); + pending_acks = (threads == Threads::All) ? + ack_monitor_.has_pending_acks() : + ack_monitor_.has_current_thread_pending_acks(); + return !pending_acks; } else { throw; @@ -800,9 +923,11 @@ bool BufferedProducer::wait_for_acks(std::chrono::millise // calculate remaining time remaining = timeout - std::chrono::duration_cast (std::chrono::high_resolution_clock::now() - start_time); - } while ((pending_acks_ > 0) && - ((remaining.count() > 0) || (timeout == infinite_timeout))); - return (pending_acks_ == 0); + pending_acks = (threads == Threads::All) ? + ack_monitor_.has_pending_acks() : + ack_monitor_.has_current_thread_pending_acks(); + } while (pending_acks && ((remaining.count() > 0) || (timeout == infinite_timeout))); + return !pending_acks; } template @@ -864,9 +989,9 @@ void BufferedProducer::do_add_message(BuilderType&& build std::lock_guard lock(mutex_); messages_.emplace_back(std::forward(builder)); } - // Flush the queues only if a regular message is added. Retry messages may be added + // Flush the queues only if a produced message is added. Retry messages may be added // from on_delivery_report() during which flush()/async_flush() cannot be called. - if (queue_kind == QueueKind::Regular && + if (queue_kind == QueueKind::Produce && flush_action == FlushAction::DoFlush && (max_buffer_size_ >= 0) && (max_buffer_size_ <= (ssize_t)get_buffer_size())) { @@ -891,7 +1016,12 @@ const Producer& BufferedProducer::get_producer() const { template size_t BufferedProducer::get_pending_acks() const { - return pending_acks_; + return ack_monitor_.get_pending_acks(); +} + +template +size_t BufferedProducer::get_current_thread_pending_acks() const { + return ack_monitor_.get_current_thread_pending_acks(); } template @@ -980,7 +1110,7 @@ void BufferedProducer::produce_message(BuilderType&& buil producer_.produce(builder); internal_guard.release(); // Sent successfully - ++pending_acks_; + ack_monitor_.increment_pending_acks(); break; } catch (const HandleException& ex) { @@ -1092,9 +1222,7 @@ void BufferedProducer::on_delivery_report(const Message& tracker->should_retry(retry); } // Decrement the expected acks and check to prevent underflow - if (pending_acks_ > 0) { - --pending_acks_; - } + ack_monitor_.decrement_pending_acks(); } template