From 3cf9bb53e908d1c9a8356c5666c7e0a87b6b4e52 Mon Sep 17 00:00:00 2001 From: accelerated <> Date: Tue, 5 Jun 2018 17:45:41 -0400 Subject: [PATCH 1/3] Added purge (aka async_flush) functionality --- include/cppkafka/utils/buffered_producer.h | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/include/cppkafka/utils/buffered_producer.h b/include/cppkafka/utils/buffered_producer.h index cef799a..8727216 100644 --- a/include/cppkafka/utils/buffered_producer.h +++ b/include/cppkafka/utils/buffered_producer.h @@ -169,6 +169,13 @@ public: * \remark This method throws cppkafka::HandleException on failure */ void produce(const Message& message); + + /** + * \brief Flushes all buffered messages and returns immediately. + * + * Similar to flush, it will send all messages but will not wait for acks to complete. + */ + void purge(); /** * \brief Flushes the buffered messages. @@ -471,7 +478,7 @@ void BufferedProducer::produce(const Message& message) { } template -void BufferedProducer::flush() { +void BufferedProducer::purge() { CounterGuard counter_guard(flushes_in_progress_); QueueType flush_queue; // flush from temporary queue { @@ -482,6 +489,11 @@ void BufferedProducer::flush() { async_produce(std::move(flush_queue.front()), false); flush_queue.pop_front(); } +} + +template +void BufferedProducer::flush() { + purge(); wait_for_acks(); } From 7530b9f9e4e445ae5b981616d4ca0d79a317a71f Mon Sep 17 00:00:00 2001 From: accelerated <> Date: Thu, 7 Jun 2018 14:09:17 -0400 Subject: [PATCH 2/3] added method to empty the buffer when max limit is reached --- include/cppkafka/utils/buffered_producer.h | 41 +++++++++++++++++++--- 1 file changed, 37 insertions(+), 4 deletions(-) diff --git a/include/cppkafka/utils/buffered_producer.h b/include/cppkafka/utils/buffered_producer.h index 8727216..b80e0b4 100644 --- a/include/cppkafka/utils/buffered_producer.h +++ b/include/cppkafka/utils/buffered_producer.h @@ -86,6 +86,8 @@ namespace cppkafka { template class CPPKAFKA_API BufferedProducer { public: + enum class EmptyBufferMethod { Flush, ///< Empty the buffer and wait for acks from the broker + Purge }; ///< Empty the buffer and don't wait for acks /** * Concrete builder */ @@ -227,6 +229,20 @@ public: */ ssize_t get_max_buffer_size() const; + /** + * \brief Sets the method used to empty the internal buffer when 'max_buffer_size' is reached. Default is 'Flush' + * + * \param method The method + */ + void set_buffer_empty_method(EmptyBufferMethod method); + + /** + * \brief Gets the method used to empty the internal buffer. + * + * \return The method + */ + EmptyBufferMethod get_buffer_empty_method() const; + /** * \brief Get the number of messages not yet acked by the broker * @@ -382,7 +398,7 @@ private: return nullptr; } template - void do_add_message(BuilderType&& builder, MessagePriority priority, bool do_flush); + void do_add_message(BuilderType&& builder, MessagePriority priority, bool do_empty_buffer); template void produce_message(BuilderType&& builder); Configuration prepare_configuration(Configuration config); @@ -398,6 +414,7 @@ private: ProduceFailureCallback produce_failure_callback_; FlushFailureCallback flush_failure_callback_; ssize_t max_buffer_size_{-1}; + EmptyBufferMethod empty_buffer_method_{EmptyBufferMethod::Flush}; std::atomic pending_acks_{0}; std::atomic flushes_in_progress_{0}; std::atomic total_messages_produced_{0}; @@ -540,11 +557,22 @@ ssize_t BufferedProducer::get_max_buffer_size() const { return max_buffer_size_; } +template +void BufferedProducer::set_buffer_empty_method(EmptyBufferMethod method) { + empty_buffer_method_ = method; +} + +template +typename BufferedProducer::EmptyBufferMethod +BufferedProducer::get_buffer_empty_method() const { + return empty_buffer_method_; +} + template template void BufferedProducer::do_add_message(BuilderType&& builder, MessagePriority priority, - bool do_flush) { + bool do_empty_buffer) { { std::lock_guard lock(mutex_); if (priority == MessagePriority::High) { @@ -554,8 +582,13 @@ void BufferedProducer::do_add_message(BuilderType&& builder, messages_.emplace_back(std::forward(builder)); } } - if (do_flush && (max_buffer_size_ >= 0) && (max_buffer_size_ <= (ssize_t)messages_.size())) { - flush(); + if (do_empty_buffer && (max_buffer_size_ >= 0) && (max_buffer_size_ <= (ssize_t)messages_.size())) { + if (empty_buffer_method_ == EmptyBufferMethod::Flush) { + flush(); + } + else { + purge(); + } } } From f220062e400031c0cbb03af7233801a6efa9ac7c Mon Sep 17 00:00:00 2001 From: accelerated <> Date: Tue, 12 Jun 2018 10:23:48 -0400 Subject: [PATCH 3/3] Changed purge to async_flush --- include/cppkafka/utils/buffered_producer.h | 41 +++++++++++----------- 1 file changed, 21 insertions(+), 20 deletions(-) diff --git a/include/cppkafka/utils/buffered_producer.h b/include/cppkafka/utils/buffered_producer.h index b80e0b4..70c1f5e 100644 --- a/include/cppkafka/utils/buffered_producer.h +++ b/include/cppkafka/utils/buffered_producer.h @@ -86,8 +86,8 @@ namespace cppkafka { template class CPPKAFKA_API BufferedProducer { public: - enum class EmptyBufferMethod { Flush, ///< Empty the buffer and wait for acks from the broker - Purge }; ///< Empty the buffer and don't wait for acks + enum class FlushMethod { Sync, ///< Empty the buffer and wait for acks from the broker + Async }; ///< Empty the buffer and don't wait for acks /** * Concrete builder */ @@ -177,7 +177,7 @@ public: * * Similar to flush, it will send all messages but will not wait for acks to complete. */ - void purge(); + void async_flush(); /** * \brief Flushes the buffered messages. @@ -230,18 +230,19 @@ public: ssize_t get_max_buffer_size() const; /** - * \brief Sets the method used to empty the internal buffer when 'max_buffer_size' is reached. Default is 'Flush' + * \brief Sets the method used to flush the internal buffer when 'max_buffer_size' is reached. + * Default is 'Sync' * * \param method The method */ - void set_buffer_empty_method(EmptyBufferMethod method); + void set_flush_method(FlushMethod method); /** - * \brief Gets the method used to empty the internal buffer. + * \brief Gets the method used to flush the internal buffer. * * \return The method */ - EmptyBufferMethod get_buffer_empty_method() const; + FlushMethod get_flush_method() const; /** * \brief Get the number of messages not yet acked by the broker @@ -398,7 +399,7 @@ private: return nullptr; } template - void do_add_message(BuilderType&& builder, MessagePriority priority, bool do_empty_buffer); + void do_add_message(BuilderType&& builder, MessagePriority priority, bool do_flush); template void produce_message(BuilderType&& builder); Configuration prepare_configuration(Configuration config); @@ -414,7 +415,7 @@ private: ProduceFailureCallback produce_failure_callback_; FlushFailureCallback flush_failure_callback_; ssize_t max_buffer_size_{-1}; - EmptyBufferMethod empty_buffer_method_{EmptyBufferMethod::Flush}; + FlushMethod flush_method_{FlushMethod::Sync}; std::atomic pending_acks_{0}; std::atomic flushes_in_progress_{0}; std::atomic total_messages_produced_{0}; @@ -495,7 +496,7 @@ void BufferedProducer::produce(const Message& message) { } template -void BufferedProducer::purge() { +void BufferedProducer::async_flush() { CounterGuard counter_guard(flushes_in_progress_); QueueType flush_queue; // flush from temporary queue { @@ -510,7 +511,7 @@ void BufferedProducer::purge() { template void BufferedProducer::flush() { - purge(); + async_flush(); wait_for_acks(); } @@ -558,21 +559,21 @@ ssize_t BufferedProducer::get_max_buffer_size() const { } template -void BufferedProducer::set_buffer_empty_method(EmptyBufferMethod method) { - empty_buffer_method_ = method; +void BufferedProducer::set_flush_method(FlushMethod method) { + flush_method_ = method; } template -typename BufferedProducer::EmptyBufferMethod -BufferedProducer::get_buffer_empty_method() const { - return empty_buffer_method_; +typename BufferedProducer::FlushMethod +BufferedProducer::get_flush_method() const { + return flush_method_; } template template void BufferedProducer::do_add_message(BuilderType&& builder, MessagePriority priority, - bool do_empty_buffer) { + bool do_flush) { { std::lock_guard lock(mutex_); if (priority == MessagePriority::High) { @@ -582,12 +583,12 @@ void BufferedProducer::do_add_message(BuilderType&& builder, messages_.emplace_back(std::forward(builder)); } } - if (do_empty_buffer && (max_buffer_size_ >= 0) && (max_buffer_size_ <= (ssize_t)messages_.size())) { - if (empty_buffer_method_ == EmptyBufferMethod::Flush) { + if (do_flush && (max_buffer_size_ >= 0) && (max_buffer_size_ <= (ssize_t)messages_.size())) { + if (flush_method_ == FlushMethod::Sync) { flush(); } else { - purge(); + async_flush(); } } }