added method to empty the buffer when max limit is reached

This commit is contained in:
accelerated
2018-06-07 14:09:17 -04:00
parent 3cf9bb53e9
commit 7530b9f9e4

View File

@@ -86,6 +86,8 @@ namespace cppkafka {
template <typename BufferType>
class CPPKAFKA_API BufferedProducer {
public:
enum class EmptyBufferMethod { Flush, ///< Empty the buffer and wait for acks from the broker
Purge }; ///< Empty the buffer and don't wait for acks
/**
* Concrete builder
*/
@@ -227,6 +229,20 @@ public:
*/
ssize_t get_max_buffer_size() const;
/**
* \brief Sets the method used to empty the internal buffer when 'max_buffer_size' is reached. Default is 'Flush'
*
* \param method The method
*/
void set_buffer_empty_method(EmptyBufferMethod method);
/**
* \brief Gets the method used to empty the internal buffer.
*
* \return The method
*/
EmptyBufferMethod get_buffer_empty_method() const;
/**
* \brief Get the number of messages not yet acked by the broker
*
@@ -382,7 +398,7 @@ private:
return nullptr;
}
template <typename BuilderType>
void do_add_message(BuilderType&& builder, MessagePriority priority, bool do_flush);
void do_add_message(BuilderType&& builder, MessagePriority priority, bool do_empty_buffer);
template <typename BuilderType>
void produce_message(BuilderType&& builder);
Configuration prepare_configuration(Configuration config);
@@ -398,6 +414,7 @@ private:
ProduceFailureCallback produce_failure_callback_;
FlushFailureCallback flush_failure_callback_;
ssize_t max_buffer_size_{-1};
EmptyBufferMethod empty_buffer_method_{EmptyBufferMethod::Flush};
std::atomic<size_t> pending_acks_{0};
std::atomic<size_t> flushes_in_progress_{0};
std::atomic<size_t> total_messages_produced_{0};
@@ -540,11 +557,22 @@ ssize_t BufferedProducer<BufferType>::get_max_buffer_size() const {
return max_buffer_size_;
}
template <typename BufferType>
void BufferedProducer<BufferType>::set_buffer_empty_method(EmptyBufferMethod method) {
empty_buffer_method_ = method;
}
template <typename BufferType>
typename BufferedProducer<BufferType>::EmptyBufferMethod
BufferedProducer<BufferType>::get_buffer_empty_method() const {
return empty_buffer_method_;
}
template <typename BufferType>
template <typename BuilderType>
void BufferedProducer<BufferType>::do_add_message(BuilderType&& builder,
MessagePriority priority,
bool do_flush) {
bool do_empty_buffer) {
{
std::lock_guard<std::mutex> lock(mutex_);
if (priority == MessagePriority::High) {
@@ -554,8 +582,13 @@ void BufferedProducer<BufferType>::do_add_message(BuilderType&& builder,
messages_.emplace_back(std::forward<BuilderType>(builder));
}
}
if (do_flush && (max_buffer_size_ >= 0) && (max_buffer_size_ <= (ssize_t)messages_.size())) {
flush();
if (do_empty_buffer && (max_buffer_size_ >= 0) && (max_buffer_size_ <= (ssize_t)messages_.size())) {
if (empty_buffer_method_ == EmptyBufferMethod::Flush) {
flush();
}
else {
purge();
}
}
}