Added purge (aka async_flush) functionality

This commit is contained in:
accelerated
2018-06-05 17:45:41 -04:00
parent 0c7a3b0c25
commit 3cf9bb53e9

View File

@@ -169,6 +169,13 @@ public:
* \remark This method throws cppkafka::HandleException on failure * \remark This method throws cppkafka::HandleException on failure
*/ */
void produce(const Message& message); void produce(const Message& message);
/**
* \brief Flushes all buffered messages and returns immediately.
*
* Similar to flush, it will send all messages but will not wait for acks to complete.
*/
void purge();
/** /**
* \brief Flushes the buffered messages. * \brief Flushes the buffered messages.
@@ -471,7 +478,7 @@ void BufferedProducer<BufferType>::produce(const Message& message) {
} }
template <typename BufferType> template <typename BufferType>
void BufferedProducer<BufferType>::flush() { void BufferedProducer<BufferType>::purge() {
CounterGuard<size_t> counter_guard(flushes_in_progress_); CounterGuard<size_t> counter_guard(flushes_in_progress_);
QueueType flush_queue; // flush from temporary queue QueueType flush_queue; // flush from temporary queue
{ {
@@ -482,6 +489,11 @@ void BufferedProducer<BufferType>::flush() {
async_produce(std::move(flush_queue.front()), false); async_produce(std::move(flush_queue.front()), false);
flush_queue.pop_front(); flush_queue.pop_front();
} }
}
template <typename BufferType>
void BufferedProducer<BufferType>::flush() {
purge();
wait_for_acks(); wait_for_acks();
} }