Handle producer queue being full on buffered producer

This commit is contained in:
Matias Fontanini
2016-11-07 07:32:40 -08:00
parent aa7fa6ce0d
commit 6f60881d87

View File

@@ -171,13 +171,29 @@ unsigned BufferedProducer<BufferType>::get_topic_index(const std::string& topic)
template <typename BufferType>
void BufferedProducer<BufferType>::produce_message(IndexType index,
const BufferedMessage& message) {
if (message.key) {
producer_.produce(topics_[message.topic_index], message.partition, *message.key,
message.payload, reinterpret_cast<void*>(index));
}
else {
producer_.produce(topics_[message.topic_index], message.partition, {} /*key*/,
message.payload, reinterpret_cast<void*>(index));
bool sent = false;
while (!sent) {
try {
if (message.key) {
producer_.produce(topics_[message.topic_index], message.partition, *message.key,
message.payload, reinterpret_cast<void*>(index));
}
else {
producer_.produce(topics_[message.topic_index], message.partition, {} /*key*/,
message.payload, reinterpret_cast<void*>(index));
}
sent = true;
}
catch (const HandleException& ex) {
const Error error = ex.get_error();
if (error == RD_KAFKA_RESP_ERR__QUEUE_FULL) {
// If the output queue is full, then just poll
producer_.poll();
}
else {
throw;
}
}
}
}