diff --git a/include/cppkafka/utils/buffered_producer.h b/include/cppkafka/utils/buffered_producer.h index f443a3e..2aa44e2 100644 --- a/include/cppkafka/utils/buffered_producer.h +++ b/include/cppkafka/utils/buffered_producer.h @@ -171,13 +171,29 @@ unsigned BufferedProducer::get_topic_index(const std::string& topic) template void BufferedProducer::produce_message(IndexType index, const BufferedMessage& message) { - if (message.key) { - producer_.produce(topics_[message.topic_index], message.partition, *message.key, - message.payload, reinterpret_cast(index)); - } - else { - producer_.produce(topics_[message.topic_index], message.partition, {} /*key*/, - message.payload, reinterpret_cast(index)); + bool sent = false; + while (!sent) { + try { + if (message.key) { + producer_.produce(topics_[message.topic_index], message.partition, *message.key, + message.payload, reinterpret_cast(index)); + } + else { + producer_.produce(topics_[message.topic_index], message.partition, {} /*key*/, + message.payload, reinterpret_cast(index)); + } + sent = true; + } + catch (const HandleException& ex) { + const Error error = ex.get_error(); + if (error == RD_KAFKA_RESP_ERR__QUEUE_FULL) { + // If the output queue is full, then just poll + producer_.poll(); + } + else { + throw; + } + } } }