From bda2f4156d5b9ba4f034fafef537e759ca0dee48 Mon Sep 17 00:00:00 2001 From: Alexander Damian Date: Mon, 3 Feb 2020 16:46:28 -0500 Subject: [PATCH 1/2] Fix tracker promise from throwing when set multiple times --- include/cppkafka/utils/buffered_producer.h | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/include/cppkafka/utils/buffered_producer.h b/include/cppkafka/utils/buffered_producer.h index 1fec288..8a65b03 100644 --- a/include/cppkafka/utils/buffered_producer.h +++ b/include/cppkafka/utils/buffered_producer.h @@ -771,7 +771,16 @@ void BufferedProducer::clear() { template size_t BufferedProducer::get_buffer_size() const { - return messages_.size() + retry_messages_.size(); + int size = 0; + { + std::lock_guard lock(mutex_); + size += messages_.size(); + } + { + std::lock_guard lock(retry_mutex_); + size += retry_messages_.size(); + } + return size; } template @@ -1025,7 +1034,12 @@ void BufferedProducer::on_delivery_report(const Message& } // Signal producers if (tracker) { - tracker->should_retry_.set_value(should_retry); + try { + tracker->should_retry_.set_value(should_retry); + } + catch (const std::future_error& ex) { + //This is an async retry and future is not being read + } } // Decrement the expected acks and check to prevent underflow if (pending_acks_ > 0) { From fbbd5bc5a60a2929f52f23a5c729a98a06bb29dd Mon Sep 17 00:00:00 2001 From: Alexander Damian Date: Mon, 3 Feb 2020 22:04:50 -0500 Subject: [PATCH 2/2] Changed int to size_t --- include/cppkafka/utils/buffered_producer.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/cppkafka/utils/buffered_producer.h b/include/cppkafka/utils/buffered_producer.h index 8a65b03..147f582 100644 --- a/include/cppkafka/utils/buffered_producer.h +++ b/include/cppkafka/utils/buffered_producer.h @@ -771,7 +771,7 @@ void BufferedProducer::clear() { template size_t BufferedProducer::get_buffer_size() const { - int size = 0; + size_t size = 0; { std::lock_guard lock(mutex_); size += messages_.size();