diff --git a/include/cppkafka/utils/buffered_producer.h b/include/cppkafka/utils/buffered_producer.h index 147f582..3f5424f 100644 --- a/include/cppkafka/utils/buffered_producer.h +++ b/include/cppkafka/utils/buffered_producer.h @@ -34,12 +34,11 @@ #include #include #include -#include -#include #include #include #include #include +#include #include #include "../producer.h" #include "../detail/callback_invoker.h" @@ -53,8 +52,9 @@ namespace cppkafka { * This class allows buffering messages and flushing them synchronously while also allowing * to produce them just as you would using the Producer class. * - * When calling either flush or wait_for_acks, the buffered producer will block until all - * produced messages (either buffered or sent directly) are acknowledged by the kafka brokers. + * When calling either flush or wait_for_acks/wait_for_current_thread_acks, the buffered producer + * will block until all produced messages (either buffered or sent directly) are acknowledged + * by the kafka brokers. * * When producing messages, this class will handle cases where the producer's queue is full so it * will poll until the production is successful. @@ -79,7 +79,7 @@ namespace cppkafka { * \warning Payload Policy: For payload-owning BufferTypes such as std::string or std::vector * the default policy is set to Producer::PayloadPolicy::COPY_PAYLOAD. For the specific non-payload owning type * cppkafka::Buffer the policy is Producer::PayloadPolicy::PASSTHROUGH_PAYLOAD. In this case, librdkafka - * shall not make any internal copies of the message and it is the application's responsability to free + * shall not make any internal copies of the message and it is the application's responsibility to free * the messages *after* the ProduceSuccessCallback has reported a successful delivery to avoid memory * corruptions. */ @@ -185,8 +185,9 @@ public: /** * \brief Produces a message asynchronously without buffering it * - * The message will still be tracked so that a call to flush or wait_for_acks will actually - * wait for it to be acknowledged. + * The message will still be tracked so that a call to flush or + * wait_for_acks/wait_for_current_thread_acks will actually wait for it + * to be acknowledged. * * \param builder The builder that contains the message to be produced * @@ -206,11 +207,23 @@ public: */ void sync_produce(const MessageBuilder& builder); + /** + * \brief Same as sync_produce but waits up to 'timeout' for acks to be received. + * + * If retries are enabled, the timeout will limit the amount of time to wait + * before all retries are completed. + * + * \returns True if succeeded, false otherwise. If retries are enabled, false + * indicates there are still retries left. + */ + bool sync_produce(const MessageBuilder& builder, std::chrono::milliseconds timeout); + /** * \brief Produces a message asynchronously without buffering it * - * The message will still be tracked so that a call to flush or wait_for_acks will actually - * wait for it to be acknowledged. + * The message will still be tracked so that a call to flush or + * wait_for_acks/wait_for_current_thread_acks will actually wait for it + * to be acknowledged. * * \param message The message to be produced * @@ -230,7 +243,7 @@ public: * \brief Flushes the buffered messages. * * This will send all messages and keep waiting until all of them are acknowledged (this is - * done by calling wait_for_acks). + * done by calling wait_for_acks/wait_for_current_thread_acks). * * \param preserve_order If set to True, each message in the queue will be flushed only when the previous * message ack is received. This may result in performance degradation as messages @@ -256,16 +269,30 @@ public: bool flush(std::chrono::milliseconds timeout, bool preserve_order = false); /** - * Waits for produced message's acknowledgements from the brokers + * \brief Waits for produced message's acknowledgements from the brokers */ void wait_for_acks(); /** - * Waits for produced message's acknowledgements from the brokers up to 'timeout'. + * \brief Waits for acknowledgements from brokers for messages produced + * on the current thread only + */ + void wait_for_current_thread_acks(); + + /** + * \brief Waits for produced message's acknowledgements from the brokers up to 'timeout'. * * \return True if the operation completes and all acks have been received. */ bool wait_for_acks(std::chrono::milliseconds timeout); + + /** + * \brief Waits for acknowledgements from brokers for messages produced + * on the current thread only. Times out after 'timeout' milliseconds. + * + * \return True if the operation completes and all acks have been received. + */ + bool wait_for_current_thread_acks(std::chrono::milliseconds timeout); /** * Clears any buffered messages @@ -316,12 +343,20 @@ public: FlushMethod get_flush_method() const; /** - * \brief Get the number of messages not yet acked by the broker + * \brief Get the number of messages not yet acked by the broker. * * \return The number of messages */ size_t get_pending_acks() const; + /** + * \brief Get the number of pending acks for messages produces on the + * current thread only. + * + * \return The number of messages + */ + size_t get_current_thread_pending_acks() const; + /** * \brief Get the total number of messages successfully produced since the beginning * @@ -347,9 +382,10 @@ public: size_t get_flushes_in_progress() const; /** - * \brief Sets the maximum number of retries per message until giving up + * \brief Sets the maximum number of retries per message until giving up. Default is 5. * - * Default is 5 + * \remark Is it recommended to set the RdKafka option message.send.max.retries=0 + * to prevent re-ordering of messages inside RdKafka. */ void set_max_number_retries(size_t max_number_retries); @@ -481,42 +517,173 @@ protected: return nullptr; } #endif - + private: enum class SenderType { Sync, Async }; - enum class QueueKind { Retry, Regular }; + enum class QueueKind { Retry, Produce }; enum class FlushAction { DontFlush, DoFlush }; + enum class Threads { All, Current }; + // Simple RAII type which increments a counter on construction and + // decrements it on destruction, meant to be used as reference counting. template struct CounterGuard{ - CounterGuard(std::atomic& counter) : counter_(counter) { ++counter_; } + CounterGuard(std::atomic& counter) + : counter_(counter) { + ++counter_; + } ~CounterGuard() { --counter_; } std::atomic& counter_; }; + // If the application enables retry logic, this object is passed + // as internal (opaque) data with each message, so that it can keep + // track of each failed attempt. Only a single tracker will be + // instantiated and it's lifetime will be the same as the message it + // belongs to. struct Tracker : public Internal { Tracker(SenderType sender, size_t num_retries) - : sender_(sender), num_retries_(num_retries) - {} - std::future get_new_future() { - should_retry_ = std::promise(); //reset shared data - return should_retry_.get_future(); //issue new future + : sender_(sender), + num_retries_(num_retries) { } + // Creates a new promise for synchronizing with the + // on_delivery_report() callback. For synchronous producers only. + void prepare_to_retry() { + if (sender_ == SenderType::Sync) { + retry_promise_ = std::promise(); + } + } + // Waits for the on_delivery_report() callback and determines if this message + // should be retried. This call will block until on_delivery_report() executes. + // For synchronous producers only. + bool retry_again() { + if (sender_ == SenderType::Sync) { + return retry_promise_.get_future().get(); + } + return false; + } + // Signal the synchronous producer if the message should be retried or not. + // Called from inside on_delivery_report(). For synchronous producers only. + void should_retry(bool value) const { + if (sender_ == SenderType::Sync) { + try { + retry_promise_.set_value(value); + } + catch (const std::future_error&) { + //Promise has already been set once. + } + } + } + void set_sender_type(SenderType type) { + sender_ = type; + } + SenderType get_sender_type() const { + return sender_; + } + bool has_retries_left() const { + return num_retries_ > 0; + } + void decrement_retries() { + if (num_retries_ > 0) { + --num_retries_; + } + } + private: SenderType sender_; - std::promise should_retry_; + mutable std::promise retry_promise_; size_t num_retries_; }; using TrackerPtr = std::shared_ptr; + // The AckMonitor is responsible for properly counting the + // outstanding unacknowledged messages for each thread as well + // as the total acks. Counting acks on a per-thread basis is + // critical in a multi-threaded producer since we don't want one + // producer having to wait for all concurrent pending acks. Each + // producer should only wait for his own acks. + struct AckMonitor + { + // Increments the number of sent acks + void increment_pending_acks() { + while (!flag_.test_and_set()) { + //save the last ack number for this thread so we only + //wait up to this number. + last_ack_[std::this_thread::get_id()] = ++sent_acks_; + flag_.clear(); + break; + } + } + // Increments the number of received acks, + // reducing the total pending acks. + void decrement_pending_acks() { + while (!flag_.test_and_set()) { + ++received_acks_; + flag_.clear(); + break; + } + } + // Returns true if there are any pending acks overall. + bool has_pending_acks() const { + return get_pending_acks() > 0; + } + // Returns true if there are any pending acks on this thread. + bool has_current_thread_pending_acks() const { + return get_current_thread_pending_acks() > 0; + } + // Returns total pending acks. This is the difference between + // total produced and total received. + ssize_t get_pending_acks() const { + ssize_t rc = 0; + while (!flag_.test_and_set()) { + rc = get_pending_acks_impl(); + flag_.clear(); + break; + } + return rc; + } + // Returns the total pending acks for this thread + ssize_t get_current_thread_pending_acks() const { + ssize_t rc = 0; + while (!flag_.test_and_set()) { + rc = get_current_thread_pending_acks_impl(); + flag_.clear(); + break; + } + return rc; + } + private: + ssize_t get_pending_acks_impl() const { + return (sent_acks_ - received_acks_); + } + ssize_t get_current_thread_pending_acks_impl() const { + auto it = last_ack_.find(std::this_thread::get_id()); + if (it != last_ack_.end()) { + return (it->second > received_acks_) ? it->second - received_acks_ : 0; + } + return 0; + } + mutable std::atomic_flag flag_{0}; + ssize_t sent_acks_{0}; + ssize_t received_acks_{0}; + std::map last_ack_; //last ack number expected for this thread + }; + // Returns existing tracker or creates new one template TrackerPtr add_tracker(SenderType sender, BuilderType& builder) { - if (has_internal_data_) { + if (enable_message_retries_) { if (!builder.internal()) { // Add message tracker only if it hasn't been added before builder.internal(std::make_shared(sender, max_number_retries_)); + return std::static_pointer_cast(builder.internal()); } - return std::static_pointer_cast(builder.internal()); + // Return existing tracker + TrackerPtr tracker = std::static_pointer_cast(builder.internal()); + // Update the sender type. Since a message could have been initially produced + // asynchronously but then flushed synchronously (or vice-versa), the sender + // type should always reflect the latest retry mechanism. + tracker->set_sender_type(sender); + return tracker; } return nullptr; } @@ -529,7 +696,12 @@ private: template void async_produce(BuilderType&& message, bool throw_on_error); static void swap_queues(QueueType & queue1, QueueType & queue2, std::mutex & mutex); + bool wait_for_acks_impl(Threads threads, std::chrono::milliseconds timeout); + // Static members + static const std::chrono::milliseconds infinite_timeout; + static const std::chrono::milliseconds no_timeout; + // Members Producer producer_; QueueType messages_; @@ -544,18 +716,26 @@ private: QueueFullCallback queue_full_callback_; ssize_t max_buffer_size_{-1}; FlushMethod flush_method_{FlushMethod::Sync}; - std::atomic pending_acks_{0}; + AckMonitor ack_monitor_; std::atomic flushes_in_progress_{0}; std::atomic total_messages_produced_{0}; std::atomic total_messages_dropped_{0}; int max_number_retries_{0}; - bool has_internal_data_{false}; + bool enable_message_retries_{false}; QueueFullNotification queue_full_notification_{QueueFullNotification::None}; #ifdef KAFKA_TEST_INSTANCE TestParameters* test_params_; #endif }; +// Full blocking wait as per RdKafka +template +const std::chrono::milliseconds +BufferedProducer::infinite_timeout = std::chrono::milliseconds(-1); +template +const std::chrono::milliseconds +BufferedProducer::no_timeout = std::chrono::milliseconds::zero(); + template Producer::PayloadPolicy get_default_payload_policy() { return Producer::PayloadPolicy::COPY_PAYLOAD; @@ -586,12 +766,16 @@ void BufferedProducer::add_message(const MessageBuilder& template void BufferedProducer::add_message(Builder builder) { add_tracker(SenderType::Async, builder); - do_add_message(move(builder), QueueKind::Regular, FlushAction::DoFlush); + //post message unto the producer queue + do_add_message(move(builder), QueueKind::Produce, FlushAction::DoFlush); } template void BufferedProducer::produce(const MessageBuilder& builder) { - if (has_internal_data_) { + if (enable_message_retries_) { + //Adding a retry tracker requires copying the builder since + //we cannot modify the original instance. Cloning is a fast operation + //since the MessageBuilder class holds pointers to data only. MessageBuilder builder_clone(builder.clone()); add_tracker(SenderType::Async, builder_clone); async_produce(builder_clone, true); @@ -603,22 +787,38 @@ void BufferedProducer::produce(const MessageBuilder& buil template void BufferedProducer::sync_produce(const MessageBuilder& builder) { - if (has_internal_data_) { + sync_produce(builder, infinite_timeout); +} + +template +bool BufferedProducer::sync_produce(const MessageBuilder& builder, + std::chrono::milliseconds timeout) { + if (enable_message_retries_) { + //Adding a retry tracker requires copying the builder since + //we cannot modify the original instance. Cloning is a fast operation + //since the MessageBuilder class holds pointers to data only. MessageBuilder builder_clone(builder.clone()); TrackerPtr tracker = add_tracker(SenderType::Sync, builder_clone); // produce until we succeed or we reach max retry limit - std::future should_retry; + auto endTime = std::chrono::steady_clock::now() + timeout; do { - should_retry = tracker->get_new_future(); + tracker->prepare_to_retry(); produce_message(builder_clone); - wait_for_acks(); + //Wait w/o timeout since we must get the ack to avoid a race condition. + //Otherwise retry_again() will block as the producer won't get flushed + //and the delivery callback will never be invoked. + wait_for_current_thread_acks(); } - while (should_retry.get()); + while (tracker->retry_again() && + ((timeout == infinite_timeout) || + (std::chrono::steady_clock::now() >= endTime))); + return !tracker->has_retries_left(); } else { // produce once produce_message(builder); - wait_for_acks(); + wait_for_current_thread_acks(timeout); + return !ack_monitor_.has_current_thread_pending_acks(); } } @@ -629,117 +829,80 @@ void BufferedProducer::produce(const Message& message) { template void BufferedProducer::async_flush() { - CounterGuard counter_guard(flushes_in_progress_); - auto queue_flusher = [this](QueueType& queue, std::mutex & mutex)->void - { - QueueType flush_queue; // flush from temporary queue - swap_queues(queue, flush_queue, mutex); - - while (!flush_queue.empty()) { - async_produce(std::move(flush_queue.front()), false); - flush_queue.pop_front(); - } - }; - queue_flusher(retry_messages_, retry_mutex_); - queue_flusher(messages_, mutex_); - wait_for_acks(std::chrono::milliseconds(0)); //flush the producer but don't wait + flush(no_timeout, false); } template void BufferedProducer::flush(bool preserve_order) { - if (preserve_order) { - CounterGuard counter_guard(flushes_in_progress_); - auto queue_flusher = [this](QueueType& queue, std::mutex & mutex)->void - { - QueueType flush_queue; // flush from temporary queue - swap_queues(queue, flush_queue, mutex); - - while (!flush_queue.empty()) { - sync_produce(flush_queue.front()); - flush_queue.pop_front(); - } - }; - queue_flusher(retry_messages_, retry_mutex_); - queue_flusher(messages_, mutex_); - } - else { - async_flush(); - wait_for_acks(); - } + flush(infinite_timeout, preserve_order); } template bool BufferedProducer::flush(std::chrono::milliseconds timeout, bool preserve_order) { - if (preserve_order) { - CounterGuard counter_guard(flushes_in_progress_); + CounterGuard counter_guard(flushes_in_progress_); + auto queue_flusher = [timeout, preserve_order, this] + (QueueType& queue, std::mutex & mutex)->void + { QueueType flush_queue; // flush from temporary queue - swap_queues(messages_, flush_queue, mutex_); - QueueType retry_flush_queue; // flush from temporary retry queue - swap_queues(retry_messages_, retry_flush_queue, retry_mutex_); - - auto queue_flusher = [this](QueueType& queue)->bool - { - if (!queue.empty()) { - sync_produce(queue.front()); - queue.pop_front(); - return true; + swap_queues(queue, flush_queue, mutex); + //Produce one message at a time and wait for acks until queue is empty + while (!flush_queue.empty()) { + if (preserve_order) { + //When preserving order, we must ensure that each message + //gets delivered before producing the next one. + sync_produce(flush_queue.front(), timeout); } - return false; - }; - auto remaining = timeout; - auto start_time = std::chrono::high_resolution_clock::now(); - do { - if (!queue_flusher(retry_flush_queue) && !queue_flusher(flush_queue)) { - break; + else { + //Produce as fast as possible w/o waiting. If one or more + //messages fail, they will be re-enqueued for retry + //on the next flush cycle, which causes re-ordering. + async_produce(flush_queue.front(), false); } - // calculate remaining time - remaining = timeout - std::chrono::duration_cast - (std::chrono::high_resolution_clock::now() - start_time); - } while (remaining.count() > 0); - - // Re-enqueue remaining messages in original order - auto re_enqueuer = [this](QueueType& src_queue, QueueType& dst_queue, std::mutex & mutex)->void - { - if (!src_queue.empty()) { - std::lock_guard lock(mutex); - dst_queue.insert(dst_queue.begin(), - std::make_move_iterator(src_queue.begin()), - std::make_move_iterator(src_queue.end())); - } - }; - re_enqueuer(retry_flush_queue, retry_messages_, retry_mutex_); - re_enqueuer(flush_queue, messages_, mutex_); - return true; - } - else { - async_flush(); - return wait_for_acks(timeout); + flush_queue.pop_front(); + } + }; + //Produce retry queue first since these messages were produced first. + queue_flusher(retry_messages_, retry_mutex_); + //Produce recently enqueued messages + queue_flusher(messages_, mutex_); + if (!preserve_order) { + //Wait for acks from the messages produced above via async_produce + wait_for_current_thread_acks(timeout); } + return !ack_monitor_.has_current_thread_pending_acks(); } template void BufferedProducer::wait_for_acks() { - while (pending_acks_ > 0) { - try { - producer_.flush(); - } - catch (const HandleException& ex) { - // If we just hit the timeout, keep going, otherwise re-throw - if (ex.get_error() == RD_KAFKA_RESP_ERR__TIMED_OUT) { - continue; - } - else { - throw; - } - } - } + //block until all acks have been received + wait_for_acks_impl(Threads::All, infinite_timeout); +} + +template +void BufferedProducer::wait_for_current_thread_acks() { + //block until all acks from the current thread have been received + wait_for_acks_impl(Threads::Current, infinite_timeout); } template bool BufferedProducer::wait_for_acks(std::chrono::milliseconds timeout) { + //block until all acks have been received + return wait_for_acks_impl(Threads::All, timeout); +} + +template +bool BufferedProducer::wait_for_current_thread_acks(std::chrono::milliseconds timeout) { + //block until all acks from the current thread have been received + return wait_for_acks_impl(Threads::Current, timeout); +} + +template +bool BufferedProducer::wait_for_acks_impl(Threads threads, + std::chrono::milliseconds timeout) { auto remaining = timeout; auto start_time = std::chrono::high_resolution_clock::now(); + bool pending_acks = true; do { try { producer_.flush(remaining); @@ -748,7 +911,10 @@ bool BufferedProducer::wait_for_acks(std::chrono::millise // If we just hit the timeout, keep going, otherwise re-throw if (ex.get_error() == RD_KAFKA_RESP_ERR__TIMED_OUT) { // There is no time remaining - return (pending_acks_ == 0); + pending_acks = (threads == Threads::All) ? + ack_monitor_.has_pending_acks() : + ack_monitor_.has_current_thread_pending_acks(); + return !pending_acks; } else { throw; @@ -757,8 +923,11 @@ bool BufferedProducer::wait_for_acks(std::chrono::millise // calculate remaining time remaining = timeout - std::chrono::duration_cast (std::chrono::high_resolution_clock::now() - start_time); - } while ((pending_acks_ > 0) && (remaining.count() > 0)); - return (pending_acks_ == 0); + pending_acks = (threads == Threads::All) ? + ack_monitor_.has_pending_acks() : + ack_monitor_.has_current_thread_pending_acks(); + } while (pending_acks && ((remaining.count() > 0) || (timeout == infinite_timeout))); + return !pending_acks; } template @@ -820,10 +989,12 @@ void BufferedProducer::do_add_message(BuilderType&& build std::lock_guard lock(mutex_); messages_.emplace_back(std::forward(builder)); } - - // Flush the queues only if a regular message is added. Retry messages may be added - // from rdkafka callbacks, and flush/async_flush is a user-level call - if (queue_kind == QueueKind::Regular && flush_action == FlushAction::DoFlush && (max_buffer_size_ >= 0) && (max_buffer_size_ <= (ssize_t)get_buffer_size())) { + // Flush the queues only if a produced message is added. Retry messages may be added + // from on_delivery_report() during which flush()/async_flush() cannot be called. + if (queue_kind == QueueKind::Produce && + flush_action == FlushAction::DoFlush && + (max_buffer_size_ >= 0) && + (max_buffer_size_ <= (ssize_t)get_buffer_size())) { if (flush_method_ == FlushMethod::Sync) { flush(); } @@ -845,7 +1016,12 @@ const Producer& BufferedProducer::get_producer() const { template size_t BufferedProducer::get_pending_acks() const { - return pending_acks_; + return ack_monitor_.get_pending_acks(); +} + +template +size_t BufferedProducer::get_current_thread_pending_acks() const { + return ack_monitor_.get_current_thread_pending_acks(); } template @@ -865,8 +1041,8 @@ size_t BufferedProducer::get_flushes_in_progress() const template void BufferedProducer::set_max_number_retries(size_t max_number_retries) { - if (!has_internal_data_ && (max_number_retries > 0)) { - has_internal_data_ = true; //enable once + if (!enable_message_retries_ && (max_number_retries > 0)) { + enable_message_retries_ = true; //enable once } max_number_retries_ = max_number_retries; } @@ -934,7 +1110,7 @@ void BufferedProducer::produce_message(BuilderType&& buil producer_.produce(builder); internal_guard.release(); // Sent successfully - ++pending_acks_; + ack_monitor_.increment_pending_acks(); break; } catch (const HandleException& ex) { @@ -969,8 +1145,11 @@ void BufferedProducer::async_produce(BuilderType&& builde CallbackInvoker callback("flush failure", flush_failure_callback_, &producer_); if (!callback || callback(builder, ex.get_error())) { TrackerPtr tracker = std::static_pointer_cast(builder.internal()); - if (tracker && tracker->num_retries_ > 0) { - --tracker->num_retries_; + if (tracker && tracker->has_retries_left()) { + tracker->decrement_retries(); + //Post message unto the retry queue. This queue has higher priority and will be + //flushed before the producer queue to preserve original message order. + //We don't flush now since we just had an error while producing. do_add_message(std::forward(builder), QueueKind::Retry, FlushAction::DontFlush); return; } @@ -995,24 +1174,30 @@ Configuration BufferedProducer::prepare_configuration(Con template void BufferedProducer::on_delivery_report(const Message& message) { - //Get tracker data TestParameters* test_params = get_test_parameters(); - TrackerPtr tracker = has_internal_data_ ? - std::static_pointer_cast(MessageInternal::load(const_cast(message))->get_internal()) : nullptr; - bool should_retry = false; + //Get tracker if present + TrackerPtr tracker = + enable_message_retries_ ? + std::static_pointer_cast(MessageInternal::load(const_cast(message))->get_internal()) : + nullptr; + bool retry = false; if (message.get_error() || (test_params && test_params->force_delivery_error_)) { // We should produce this message again if we don't have a produce failure callback - // or we have one but it returns true + // or we have one but it returns true (indicating error is re-tryable) CallbackInvoker callback("produce failure", produce_failure_callback_, &producer_); if (!callback || callback(message)) { // Check if we have reached the maximum retry limit - if (tracker && tracker->num_retries_ > 0) { - --tracker->num_retries_; - if (tracker->sender_ == SenderType::Async) { - // Re-enqueue for later retransmission with higher priority (i.e. front of the queue) + if (tracker && tracker->has_retries_left()) { + tracker->decrement_retries(); + //If the sender is asynchronous, the message is re-enqueued. If the sender is + //synchronous, we simply notify via Tracker::should_retry() below. + if (tracker->get_sender_type() == SenderType::Async) { + //Post message unto the retry queue. This queue has higher priority and will be + //flushed later by the application (before the producer queue) to preserve original message order. + //We prevent flushing now since we are within a callback context. do_add_message(Builder(message), QueueKind::Retry, FlushAction::DontFlush); } - should_retry = true; + retry = true; } else { ++total_messages_dropped_; @@ -1032,23 +1217,18 @@ void BufferedProducer::on_delivery_report(const Message& // Increment the total successful transmissions ++total_messages_produced_; } - // Signal producers + // Signal synchronous sender and unblock it since it's waiting for this ack to arrive. if (tracker) { - try { - tracker->should_retry_.set_value(should_retry); - } - catch (const std::future_error& ex) { - //This is an async retry and future is not being read - } + tracker->should_retry(retry); } // Decrement the expected acks and check to prevent underflow - if (pending_acks_ > 0) { - --pending_acks_; - } + ack_monitor_.decrement_pending_acks(); } template -void BufferedProducer::swap_queues(BufferedProducer::QueueType & queue1, BufferedProducer::QueueType & queue2, std::mutex & mutex) +void BufferedProducer::swap_queues(BufferedProducer::QueueType & queue1, + BufferedProducer::QueueType & queue2, + std::mutex & mutex) { std::lock_guard lock(mutex); std::swap(queue1, queue2); diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index fd3b424..46447b4 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -1,12 +1,34 @@ include_directories(${CMAKE_CURRENT_SOURCE_DIR}/../include/) include_directories(SYSTEM ${CATCH_INCLUDE}) -set(KAFKA_TEST_INSTANCE "kafka-vm:9092" +if (NOT KAFKA_TEST_INSTANCE) +set(KAFKA_TEST_INSTANCE kafka-vm:9092 CACHE STRING "The kafka instance to which to connect to run tests") +endif() +if (NOT KAFKA_NUM_PARTITIONS) + set(KAFKA_NUM_PARTITIONS 3 CACHE STRING "Kafka Number of partitions") +endif() +if (NOT KAFKA_TOPICS) + set(KAFKA_TOPICS "cppkafka_test1;cppkafka_test2" CACHE STRING "Kafka topics") +endif() + +# Convert list of topics into a C++ initializer list +FOREACH(TOPIC ${KAFKA_TOPICS}) + if (NOT TOPIC_LIST) + set(TOPIC_LIST "\"${TOPIC}\"") + else() + set(TOPIC_LIST "${TOPIC_LIST},\"${TOPIC}\"") + endif() +ENDFOREACH() + add_custom_target(tests) include_directories(${CMAKE_CURRENT_SOURCE_DIR}) -add_definitions("-DKAFKA_TEST_INSTANCE=\"${KAFKA_TEST_INSTANCE}\"") +add_definitions( + "-DKAFKA_TEST_INSTANCE=\"${KAFKA_TEST_INSTANCE}\"" + -DKAFKA_NUM_PARTITIONS=${KAFKA_NUM_PARTITIONS} + -DKAFKA_TOPIC_NAMES=${TOPIC_LIST} +) add_executable(cppkafka_tests buffer_test.cpp @@ -25,6 +47,6 @@ add_executable(cppkafka_tests ) # In CMake >= 3.15 Boost::boost == Boost::headers -target_link_libraries(cppkafka_tests cppkafka RdKafka::rdkafka Boost::boost Boost::program_options ) +target_link_libraries(cppkafka_tests cppkafka RdKafka::rdkafka Boost::boost Boost::program_options) add_dependencies(tests cppkafka_tests) add_test(cppkafka cppkafka_tests) diff --git a/tests/test_main.cpp b/tests/test_main.cpp index 5c41d07..f46d0c2 100644 --- a/tests/test_main.cpp +++ b/tests/test_main.cpp @@ -15,8 +15,7 @@ using Catch::TestCaseStats; using Catch::Totals; using Catch::Session; -std::vector KAFKA_TOPICS = {"cppkafka_test1", "cppkafka_test2"}; -int KAFKA_NUM_PARTITIONS = 3; +std::vector KAFKA_TOPICS = {KAFKA_TOPIC_NAMES}; namespace cppkafka { diff --git a/tests/test_utils.h b/tests/test_utils.h index 030e1c0..f786d4f 100644 --- a/tests/test_utils.h +++ b/tests/test_utils.h @@ -9,7 +9,6 @@ #include "cppkafka/utils/consumer_dispatcher.h" extern const std::vector KAFKA_TOPICS; -extern const int KAFKA_NUM_PARTITIONS; using namespace cppkafka;