diff --git a/include/cppkafka/utils/buffered_producer.h b/include/cppkafka/utils/buffered_producer.h index 1905345..3b2ae8e 100644 --- a/include/cppkafka/utils/buffered_producer.h +++ b/include/cppkafka/utils/buffered_producer.h @@ -208,8 +208,14 @@ public: /** * \brief Same as sync_produce but waits up to 'timeout' for acks to be received. + * + * If retries are enabled, the timeout will limit the amount of time to wait + * before all retries are completed. + * + * \returns True if succeeded, false otherwise. If retries are enabled, false + * indicates there are still retries left. */ - void sync_produce(const MessageBuilder& builder, std::chrono::milliseconds timeout); + bool sync_produce(const MessageBuilder& builder, std::chrono::milliseconds timeout); /** * \brief Produces a message asynchronously without buffering it @@ -592,6 +598,10 @@ private: void async_produce(BuilderType&& message, bool throw_on_error); static void swap_queues(QueueType & queue1, QueueType & queue2, std::mutex & mutex); + // Static members + static const std::chrono::milliseconds infinite_timeout; + static const std::chrono::milliseconds no_timeout; + // Members Producer producer_; QueueType messages_; @@ -618,6 +628,14 @@ private: #endif }; +// Full blocking wait as per RdKafka +template +const std::chrono::milliseconds +BufferedProducer::infinite_timeout = std::chrono::milliseconds(-1); +template +const std::chrono::milliseconds +BufferedProducer::no_timeout = std::chrono::milliseconds::zero(); + template Producer::PayloadPolicy get_default_payload_policy() { return Producer::PayloadPolicy::COPY_PAYLOAD; @@ -669,11 +687,11 @@ void BufferedProducer::produce(const MessageBuilder& buil template void BufferedProducer::sync_produce(const MessageBuilder& builder) { - sync_produce(builder, producer_.get_timeout()); + sync_produce(builder, infinite_timeout); } template -void BufferedProducer::sync_produce(const MessageBuilder& builder, +bool BufferedProducer::sync_produce(const MessageBuilder& builder, std::chrono::milliseconds timeout) { if (enable_message_retries_) { //Adding a retry tracker requires copying the builder since @@ -682,17 +700,25 @@ void BufferedProducer::sync_produce(const MessageBuilder& MessageBuilder builder_clone(builder.clone()); TrackerPtr tracker = add_tracker(SenderType::Sync, builder_clone); // produce until we succeed or we reach max retry limit + auto endTime = std::chrono::steady_clock::now() + timeout; do { tracker->prepare_to_retry(); produce_message(builder_clone); - wait_for_acks(timeout); + //Wait w/o timeout since we must get the ack to avoid a race condition. + //Otherwise retry_again() will block as the producer won't get flushed + //and the delivery callback will never be invoked. + wait_for_acks(); } - while (tracker->retry_again()); + while (tracker->retry_again() && + ((timeout == infinite_timeout) || + (std::chrono::steady_clock::now() >= endTime))); + return !tracker->has_retries_left(); } else { // produce once produce_message(builder); wait_for_acks(timeout); + return (pending_acks_ == 0); } } @@ -703,12 +729,12 @@ void BufferedProducer::produce(const Message& message) { template void BufferedProducer::async_flush() { - flush(std::chrono::milliseconds::zero(), false); + flush(no_timeout, false); } template void BufferedProducer::flush(bool preserve_order) { - flush(producer_.get_timeout(), preserve_order); + flush(infinite_timeout, preserve_order); } template @@ -749,7 +775,8 @@ bool BufferedProducer::flush(std::chrono::milliseconds ti template void BufferedProducer::wait_for_acks() { - wait_for_acks(producer_.get_timeout()); + //block until all acks have been received + wait_for_acks(infinite_timeout); } template @@ -773,7 +800,8 @@ bool BufferedProducer::wait_for_acks(std::chrono::millise // calculate remaining time remaining = timeout - std::chrono::duration_cast (std::chrono::high_resolution_clock::now() - start_time); - } while ((pending_acks_ > 0) && (remaining.count() > 0)); + } while ((pending_acks_ > 0) && + ((remaining.count() > 0) || (timeout == infinite_timeout))); return (pending_acks_ == 0); }