diff --git a/include/cppkafka/utils/buffered_producer.h b/include/cppkafka/utils/buffered_producer.h index 147f582..678df0d 100644 --- a/include/cppkafka/utils/buffered_producer.h +++ b/include/cppkafka/utils/buffered_producer.h @@ -79,7 +79,7 @@ namespace cppkafka { * \warning Payload Policy: For payload-owning BufferTypes such as std::string or std::vector * the default policy is set to Producer::PayloadPolicy::COPY_PAYLOAD. For the specific non-payload owning type * cppkafka::Buffer the policy is Producer::PayloadPolicy::PASSTHROUGH_PAYLOAD. In this case, librdkafka - * shall not make any internal copies of the message and it is the application's responsability to free + * shall not make any internal copies of the message and it is the application's responsibility to free * the messages *after* the ProduceSuccessCallback has reported a successful delivery to avoid memory * corruptions. */ @@ -487,23 +487,72 @@ private: enum class QueueKind { Retry, Regular }; enum class FlushAction { DontFlush, DoFlush }; + // Simple RAII type which increments a counter on construction and + // decrements it on destruction, meant to be used as reference counting. template struct CounterGuard{ - CounterGuard(std::atomic& counter) : counter_(counter) { ++counter_; } + CounterGuard(std::atomic& counter) + : counter_(counter) { + ++counter_; + } ~CounterGuard() { --counter_; } std::atomic& counter_; }; + // If the application enables retry logic, this object is passed + // as internal (opaque) data with each message, so that it can keep + // track of each failed attempt. Only a single tracker will be + // instantiated and its lifetime will be the same as the message it + // belongs to. 
struct Tracker : public Internal { Tracker(SenderType sender, size_t num_retries) - : sender_(sender), num_retries_(num_retries) - {} - std::future get_new_future() { - should_retry_ = std::promise(); //reset shared data - return should_retry_.get_future(); //issue new future + : sender_(sender), + num_retries_(num_retries) { } + // Creates a new promise for synchronizing with the + // on_delivery_report() callback. For synchronous producers only. + void prepare_to_retry() { + if (sender_ == SenderType::Sync) { + retry_promise_ = std::promise(); + } + } + // Waits for the on_delivery_report() callback and determines if this message + // should be retried. This call will block until on_delivery_report() executes. + // For synchronous producers only; any other sender type gets false immediately. + bool retry_again() { + // Every path must return a value: only a synchronous sender waits on the promise. + return (sender_ == SenderType::Sync) && + retry_promise_.get_future().get(); + } + // Signal the synchronous producer if the message should be retried or not. + // Called from inside on_delivery_report(). For synchronous producers only. + void should_retry(bool value) const { + if (sender_ == SenderType::Sync) { + try { + retry_promise_.set_value(value); + } + catch (const std::future_error&) { + //Promise has already been set once. 
+ } + } + } + void set_sender_type(SenderType type) { + sender_ = type; + } + SenderType get_sender_type() const { + return sender_; + } + bool has_retries_left() const { + return num_retries_ > 0; + } + void decrement_retries() { + if (num_retries_ > 0) { + --num_retries_; + } + } + private: SenderType sender_; - std::promise should_retry_; + mutable std::promise retry_promise_; size_t num_retries_; }; using TrackerPtr = std::shared_ptr; @@ -511,12 +560,19 @@ private: // Returns existing tracker or creates new one template TrackerPtr add_tracker(SenderType sender, BuilderType& builder) { - if (has_internal_data_) { + if (enable_message_retries_) { if (!builder.internal()) { // Add message tracker only if it hasn't been added before builder.internal(std::make_shared(sender, max_number_retries_)); + return std::static_pointer_cast(builder.internal()); } - return std::static_pointer_cast(builder.internal()); + // Return existing tracker + TrackerPtr tracker = std::static_pointer_cast(builder.internal()); + // Update the sender type. Since a message could have been initially produced + // asynchronously but then flushed synchronously (or vice-versa), the sender + // type should always reflect the latest retry mechanism. 
+ tracker->set_sender_type(sender); + return tracker; } return nullptr; } @@ -549,7 +605,7 @@ private: std::atomic total_messages_produced_{0}; std::atomic total_messages_dropped_{0}; int max_number_retries_{0}; - bool has_internal_data_{false}; + bool enable_message_retries_{false}; QueueFullNotification queue_full_notification_{QueueFullNotification::None}; #ifdef KAFKA_TEST_INSTANCE TestParameters* test_params_; @@ -586,12 +642,16 @@ void BufferedProducer::add_message(const MessageBuilder& template void BufferedProducer::add_message(Builder builder) { add_tracker(SenderType::Async, builder); + //post message onto the producer queue do_add_message(move(builder), QueueKind::Regular, FlushAction::DoFlush); } template void BufferedProducer::produce(const MessageBuilder& builder) { - if (has_internal_data_) { + if (enable_message_retries_) { + //Adding a retry tracker requires copying the builder since + //we cannot modify the original instance. Cloning is a fast operation + //since the MessageBuilder class holds pointers to data only. MessageBuilder builder_clone(builder.clone()); add_tracker(SenderType::Async, builder_clone); async_produce(builder_clone, true); @@ -603,17 +663,19 @@ void BufferedProducer::produce(const MessageBuilder& buil template void BufferedProducer::sync_produce(const MessageBuilder& builder) { - if (has_internal_data_) { + if (enable_message_retries_) { + //Adding a retry tracker requires copying the builder since + //we cannot modify the original instance. Cloning is a fast operation + //since the MessageBuilder class holds pointers to data only. 
MessageBuilder builder_clone(builder.clone()); TrackerPtr tracker = add_tracker(SenderType::Sync, builder_clone); // produce until we succeed or we reach max retry limit - std::future should_retry; do { - should_retry = tracker->get_new_future(); + tracker->prepare_to_retry(); produce_message(builder_clone); wait_for_acks(); } - while (should_retry.get()); + while (tracker->retry_again()); } else { // produce once @@ -640,9 +702,14 @@ void BufferedProducer::async_flush() { flush_queue.pop_front(); } }; + //Produce retry queue first since these messages were produced first queue_flusher(retry_messages_, retry_mutex_); + //Produce recently enqueued messages queue_flusher(messages_, mutex_); - wait_for_acks(std::chrono::milliseconds(0)); //flush the producer but don't wait + // Flush the producer but don't wait. It is necessary to poll + // the producer at least once during this operation because + // async_produce() will not. + wait_for_acks(std::chrono::milliseconds(0)); } template @@ -653,16 +720,19 @@ void BufferedProducer::flush(bool preserve_order) { { QueueType flush_queue; // flush from temporary queue swap_queues(queue, flush_queue, mutex); - + //Produce one message at a time and wait for acks until queue is empty while (!flush_queue.empty()) { sync_produce(flush_queue.front()); flush_queue.pop_front(); } }; + //Produce retry queue first since these messages were produced first queue_flusher(retry_messages_, retry_mutex_); + //Produce recently enqueued messages queue_flusher(messages_, mutex_); } else { + //Produce all messages at once then wait for acks. 
async_flush(); wait_for_acks(); } @@ -680,6 +750,7 @@ bool BufferedProducer::flush(std::chrono::milliseconds ti auto queue_flusher = [this](QueueType& queue)->bool { + //Produce one message at a time and wait for acks if (!queue.empty()) { sync_produce(queue.front()); queue.pop_front(); @@ -698,19 +769,21 @@ bool BufferedProducer::flush(std::chrono::milliseconds ti (std::chrono::high_resolution_clock::now() - start_time); } while (remaining.count() > 0); - // Re-enqueue remaining messages in original order - auto re_enqueuer = [this](QueueType& src_queue, QueueType& dst_queue, std::mutex & mutex)->void + // When timeout has expired, any remaining messages must be re-enqueued in their + // original order so they can be flushed later. Returns true if anything was re-enqueued. + auto re_enqueuer = [this](QueueType& src_queue, QueueType& dst_queue, std::mutex & mutex)->bool { if (!src_queue.empty()) { std::lock_guard lock(mutex); dst_queue.insert(dst_queue.begin(), std::make_move_iterator(src_queue.begin()), std::make_move_iterator(src_queue.end())); + return true; } + return false; }; - re_enqueuer(retry_flush_queue, retry_messages_, retry_mutex_); - re_enqueuer(flush_queue, messages_, mutex_); - return true; + const bool retry_pending = re_enqueuer(retry_flush_queue, retry_messages_, retry_mutex_); //both queues must always be re-enqueued + return !re_enqueuer(flush_queue, messages_, mutex_) && !retry_pending; } else { async_flush(); @@ -820,10 +893,12 @@ void BufferedProducer::do_add_message(BuilderType&& build std::lock_guard lock(mutex_); messages_.emplace_back(std::forward(builder)); } - // Flush the queues only if a regular message is added. Retry messages may be added - // from rdkafka callbacks, and flush/async_flush is a user-level call - if (queue_kind == QueueKind::Regular && flush_action == FlushAction::DoFlush && (max_buffer_size_ >= 0) && (max_buffer_size_ <= (ssize_t)get_buffer_size())) { + // from on_delivery_report() during which flush()/async_flush() cannot be called. 
+ if (queue_kind == QueueKind::Regular && + flush_action == FlushAction::DoFlush && + (max_buffer_size_ >= 0) && + (max_buffer_size_ <= (ssize_t)get_buffer_size())) { if (flush_method_ == FlushMethod::Sync) { flush(); } @@ -865,8 +940,8 @@ size_t BufferedProducer::get_flushes_in_progress() const template void BufferedProducer::set_max_number_retries(size_t max_number_retries) { - if (!has_internal_data_ && (max_number_retries > 0)) { - has_internal_data_ = true; //enable once + if (!enable_message_retries_ && (max_number_retries > 0)) { + enable_message_retries_ = true; //enable once } max_number_retries_ = max_number_retries; } @@ -969,8 +1044,11 @@ void BufferedProducer::async_produce(BuilderType&& builde CallbackInvoker callback("flush failure", flush_failure_callback_, &producer_); if (!callback || callback(builder, ex.get_error())) { TrackerPtr tracker = std::static_pointer_cast(builder.internal()); - if (tracker && tracker->num_retries_ > 0) { - --tracker->num_retries_; + if (tracker && tracker->has_retries_left()) { + tracker->decrement_retries(); + //Post message onto the retry queue. This queue has higher priority and will be + //flushed before the producer queue to preserve original message order. + //We don't flush now since we just had an error while producing. do_add_message(std::forward(builder), QueueKind::Retry, FlushAction::DontFlush); return; } @@ -995,24 +1073,30 @@ Configuration BufferedProducer::prepare_configuration(Con template void BufferedProducer::on_delivery_report(const Message& message) { - //Get tracker data TestParameters* test_params = get_test_parameters(); - TrackerPtr tracker = has_internal_data_ ? - std::static_pointer_cast(MessageInternal::load(const_cast(message))->get_internal()) : nullptr; - bool should_retry = false; + //Get tracker if present + TrackerPtr tracker = + enable_message_retries_ ? 
+ std::static_pointer_cast(MessageInternal::load(const_cast(message))->get_internal()) : + nullptr; + bool retry = false; if (message.get_error() || (test_params && test_params->force_delivery_error_)) { // We should produce this message again if we don't have a produce failure callback - // or we have one but it returns true + // or we have one but it returns true (indicating error is retryable) CallbackInvoker callback("produce failure", produce_failure_callback_, &producer_); if (!callback || callback(message)) { // Check if we have reached the maximum retry limit - if (tracker && tracker->num_retries_ > 0) { - --tracker->num_retries_; - if (tracker->sender_ == SenderType::Async) { - // Re-enqueue for later retransmission with higher priority (i.e. front of the queue) + if (tracker && tracker->has_retries_left()) { + tracker->decrement_retries(); + //If the sender is asynchronous, the message is re-enqueued. If the sender is + //synchronous, we simply notify via Tracker::should_retry() below. + if (tracker->get_sender_type() == SenderType::Async) { + //Post message onto the retry queue. This queue has higher priority and will be + //flushed later by the application (before the producer queue) to preserve original message order. + //We prevent flushing now since we are within a callback context. do_add_message(Builder(message), QueueKind::Retry, FlushAction::DontFlush); } - should_retry = true; + retry = true; } else { ++total_messages_dropped_; @@ -1032,14 +1116,9 @@ void BufferedProducer::on_delivery_report(const Message& // Increment the total successful transmissions ++total_messages_produced_; } - // Signal producers + // Signal synchronous sender and unblock it since it's waiting for this ack to arrive. 
if (tracker) { - try { - tracker->should_retry_.set_value(should_retry); - } - catch (const std::future_error& ex) { - //This is an async retry and future is not being read - } + tracker->should_retry(retry); } // Decrement the expected acks and check to prevent underflow if (pending_acks_ > 0) { @@ -1048,7 +1127,9 @@ void BufferedProducer::on_delivery_report(const Message& } template -void BufferedProducer::swap_queues(BufferedProducer::QueueType & queue1, BufferedProducer::QueueType & queue2, std::mutex & mutex) +void BufferedProducer::swap_queues(BufferedProducer::QueueType & queue1, + BufferedProducer::QueueType & queue2, + std::mutex & mutex) { std::lock_guard lock(mutex); std::swap(queue1, queue2);