Express async_flush in terms of flush since the logic is identical except for the timeout

This commit is contained in:
Alexander Damian
2020-02-15 12:05:18 -05:00
parent 92e46aa6cb
commit 2287e0994b

View File

@@ -703,25 +703,7 @@ void BufferedProducer<BufferType, Allocator>::produce(const Message& message) {
template <typename BufferType, typename Allocator>
void BufferedProducer<BufferType, Allocator>::async_flush() {
CounterGuard<size_t> counter_guard(flushes_in_progress_);
auto queue_flusher = [this](QueueType& queue, std::mutex & mutex)->void
{
QueueType flush_queue; // flush from temporary queue
swap_queues(queue, flush_queue, mutex);
while (!flush_queue.empty()) {
async_produce(std::move(flush_queue.front()), false);
flush_queue.pop_front();
}
};
//Produce retry queue first since these messages were produced first
queue_flusher(retry_messages_, retry_mutex_);
//Produce recently enqueued messages
queue_flusher(messages_, mutex_);
// Flush the producer but don't wait. It is necessary to poll
// the producer at least once during this operation because
// async_produce() will not.
wait_for_acks(std::chrono::milliseconds(0));
flush(std::chrono::milliseconds::zero(), false);
}
template <typename BufferType, typename Allocator>
@@ -732,26 +714,34 @@ void BufferedProducer<BufferType, Allocator>::flush(bool preserve_order) {
template <typename BufferType, typename Allocator>
bool BufferedProducer<BufferType, Allocator>::flush(std::chrono::milliseconds timeout,
bool preserve_order) {
if (preserve_order) {
CounterGuard<size_t> counter_guard(flushes_in_progress_);
auto queue_flusher = [&timeout, this](QueueType& queue, std::mutex & mutex)->void
{
QueueType flush_queue; // flush from temporary queue
swap_queues(queue, flush_queue, mutex);
//Produce one message at a time and wait for acks until queue is empty
while (!flush_queue.empty()) {
CounterGuard<size_t> counter_guard(flushes_in_progress_);
auto queue_flusher = [timeout, preserve_order, this]
(QueueType& queue, std::mutex & mutex)->void
{
QueueType flush_queue; // flush from temporary queue
swap_queues(queue, flush_queue, mutex);
//Produce one message at a time and wait for acks until queue is empty
while (!flush_queue.empty()) {
if (preserve_order) {
//When preserving order, we must ensure that each message
//gets delivered before producing the next one.
sync_produce(flush_queue.front(), timeout);
flush_queue.pop_front();
}
};
//Produce retry queue first since these messages were produced first
queue_flusher(retry_messages_, retry_mutex_);
//Produce recently enqueued messages
queue_flusher(messages_, mutex_);
}
else {
//Produce all messages at once then wait for acks.
async_flush();
else {
//Produce as fast as possible w/o waiting. If one or more
//messages fail, they will be re-enqueued for retry
//on the next flush cycle, which causes re-ordering.
async_produce(flush_queue.front(), false);
}
flush_queue.pop_front();
}
};
//Produce retry queue first since these messages were produced first.
queue_flusher(retry_messages_, retry_mutex_);
//Produce recently enqueued messages
queue_flusher(messages_, mutex_);
if (!preserve_order) {
//Wait for acks from the messages produced above via async_produce
wait_for_acks(timeout);
}
return pending_acks_ == 0;
@@ -759,20 +749,7 @@ bool BufferedProducer<BufferType, Allocator>::flush(std::chrono::milliseconds ti
template <typename BufferType, typename Allocator>
void BufferedProducer<BufferType, Allocator>::wait_for_acks() {
while (pending_acks_ > 0) {
try {
producer_.flush();
}
catch (const HandleException& ex) {
// If we just hit the timeout, keep going, otherwise re-throw
if (ex.get_error() == RD_KAFKA_RESP_ERR__TIMED_OUT) {
continue;
}
else {
throw;
}
}
}
wait_for_acks(producer_.get_timeout());
}
template <typename BufferType, typename Allocator>