diff --git a/include/cppkafka/utils/buffered_producer.h b/include/cppkafka/utils/buffered_producer.h index 1fec288..147f582 100644 --- a/include/cppkafka/utils/buffered_producer.h +++ b/include/cppkafka/utils/buffered_producer.h @@ -771,7 +771,16 @@ void BufferedProducer::clear() { template size_t BufferedProducer::get_buffer_size() const { - return messages_.size() + retry_messages_.size(); + size_t size = 0; + { + std::lock_guard lock(mutex_); + size += messages_.size(); + } + { + std::lock_guard lock(retry_mutex_); + size += retry_messages_.size(); + } + return size; } template @@ -1025,7 +1034,12 @@ void BufferedProducer::on_delivery_report(const Message& } // Signal producers if (tracker) { - tracker->should_retry_.set_value(should_retry); + try { + tracker->should_retry_.set_value(should_retry); + } + catch (const std::future_error& ex) { + //This is an async retry and future is not being read + } } // Decrement the expected acks and check to prevent underflow if (pending_acks_ > 0) {