From 7530b9f9e4e445ae5b981616d4ca0d79a317a71f Mon Sep 17 00:00:00 2001 From: accelerated <> Date: Thu, 7 Jun 2018 14:09:17 -0400 Subject: [PATCH] added method to empty the buffer when max limit is reached --- include/cppkafka/utils/buffered_producer.h | 41 +++++++++++++++++++--- 1 file changed, 37 insertions(+), 4 deletions(-) diff --git a/include/cppkafka/utils/buffered_producer.h b/include/cppkafka/utils/buffered_producer.h index 8727216..b80e0b4 100644 --- a/include/cppkafka/utils/buffered_producer.h +++ b/include/cppkafka/utils/buffered_producer.h @@ -86,6 +86,8 @@ namespace cppkafka { template class CPPKAFKA_API BufferedProducer { public: + enum class EmptyBufferMethod { Flush, ///< Empty the buffer and wait for acks from the broker + Purge }; ///< Empty the buffer and don't wait for acks /** * Concrete builder */ @@ -227,6 +229,20 @@ public: */ ssize_t get_max_buffer_size() const; + /** + * \brief Sets the method used to empty the internal buffer when 'max_buffer_size' is reached. Default is 'Flush' + * + * \param method The method + */ + void set_buffer_empty_method(EmptyBufferMethod method); + + /** + * \brief Gets the method used to empty the internal buffer. + * + * \return The method + */ + EmptyBufferMethod get_buffer_empty_method() const; + /** * \brief Get the number of messages not yet acked by the broker * @@ -382,7 +398,7 @@ private: return nullptr; } template - void do_add_message(BuilderType&& builder, MessagePriority priority, bool do_flush); + void do_add_message(BuilderType&& builder, MessagePriority priority, bool do_empty_buffer); template void produce_message(BuilderType&& builder); Configuration prepare_configuration(Configuration config); @@ -398,6 +414,7 @@ private: ProduceFailureCallback produce_failure_callback_; FlushFailureCallback flush_failure_callback_; ssize_t max_buffer_size_{-1}; + EmptyBufferMethod empty_buffer_method_{EmptyBufferMethod::Flush}; std::atomic pending_acks_{0}; std::atomic flushes_in_progress_{0}; std::atomic total_messages_produced_{0}; @@ -540,11 +557,22 @@ ssize_t BufferedProducer::get_max_buffer_size() const { return max_buffer_size_; } +template +void BufferedProducer::set_buffer_empty_method(EmptyBufferMethod method) { + empty_buffer_method_ = method; +} + +template +typename BufferedProducer::EmptyBufferMethod +BufferedProducer::get_buffer_empty_method() const { + return empty_buffer_method_; +} + template template void BufferedProducer::do_add_message(BuilderType&& builder, MessagePriority priority, - bool do_flush) { + bool do_empty_buffer) { { std::lock_guard lock(mutex_); if (priority == MessagePriority::High) { @@ -554,8 +582,13 @@ void BufferedProducer::do_add_message(BuilderType&& builder, messages_.emplace_back(std::forward(builder)); } } - if (do_flush && (max_buffer_size_ >= 0) && (max_buffer_size_ <= (ssize_t)messages_.size())) { - flush(); + if (do_empty_buffer && (max_buffer_size_ >= 0) && (max_buffer_size_ <= (ssize_t)messages_.size())) { + if (empty_buffer_method_ == EmptyBufferMethod::Flush) { + flush(); + } + else { + purge(); + } } }