From bda2f4156d5b9ba4f034fafef537e759ca0dee48 Mon Sep 17 00:00:00 2001 From: Alexander Damian Date: Mon, 3 Feb 2020 16:46:28 -0500 Subject: [PATCH] Fix tracker promise from throwing when set multiple times --- include/cppkafka/utils/buffered_producer.h | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/include/cppkafka/utils/buffered_producer.h b/include/cppkafka/utils/buffered_producer.h index 1fec288..8a65b03 100644 --- a/include/cppkafka/utils/buffered_producer.h +++ b/include/cppkafka/utils/buffered_producer.h @@ -771,7 +771,16 @@ void BufferedProducer::clear() { template size_t BufferedProducer::get_buffer_size() const { - return messages_.size() + retry_messages_.size(); + int size = 0; + { + std::lock_guard lock(mutex_); + size += messages_.size(); + } + { + std::lock_guard lock(retry_mutex_); + size += retry_messages_.size(); + } + return size; } template @@ -1025,7 +1034,12 @@ void BufferedProducer::on_delivery_report(const Message& } // Signal producers if (tracker) { - tracker->should_retry_.set_value(should_retry); + try { + tracker->should_retry_.set_value(should_retry); + } + catch (const std::future_error& ex) { + //This is an async retry and future is not being read + } } // Decrement the expected acks and check to prevent underflow if (pending_acks_ > 0) {