Merge pull request #79 from accelerated/purge

Added purge (aka async_flush) functionality
This commit is contained in:
Matias Fontanini
2018-06-12 09:30:58 -07:00
committed by GitHub

View File

@@ -86,6 +86,8 @@ namespace cppkafka {
template <typename BufferType>
class CPPKAFKA_API BufferedProducer {
public:
enum class FlushMethod { Sync, ///< Empty the buffer and wait for acks from the broker
Async }; ///< Empty the buffer and don't wait for acks
/**
* Concrete builder
*/
@@ -169,6 +171,13 @@ public:
* \remark This method throws cppkafka::HandleException on failure
*/
void produce(const Message& message);
/**
* \brief Flushes all buffered messages and returns immediately.
*
* Similar to flush, it will send all messages but will not wait for acks to complete.
*/
void async_flush();
/**
* \brief Flushes the buffered messages.
@@ -220,6 +229,21 @@ public:
*/
ssize_t get_max_buffer_size() const;
/**
* \brief Sets the method used to flush the internal buffer when 'max_buffer_size' is reached.
* Default is 'Sync'
*
* \param method The method
*/
void set_flush_method(FlushMethod method);
/**
* \brief Gets the method used to flush the internal buffer.
*
* \return The method
*/
FlushMethod get_flush_method() const;
/**
* \brief Get the number of messages not yet acked by the broker
*
@@ -391,6 +415,7 @@ private:
ProduceFailureCallback produce_failure_callback_;
FlushFailureCallback flush_failure_callback_;
ssize_t max_buffer_size_{-1};
FlushMethod flush_method_{FlushMethod::Sync};
std::atomic<size_t> pending_acks_{0};
std::atomic<size_t> flushes_in_progress_{0};
std::atomic<size_t> total_messages_produced_{0};
@@ -471,7 +496,7 @@ void BufferedProducer<BufferType>::produce(const Message& message) {
}
template <typename BufferType>
void BufferedProducer<BufferType>::flush() {
void BufferedProducer<BufferType>::async_flush() {
CounterGuard<size_t> counter_guard(flushes_in_progress_);
QueueType flush_queue; // flush from temporary queue
{
@@ -482,6 +507,11 @@ void BufferedProducer<BufferType>::flush() {
async_produce(std::move(flush_queue.front()), false);
flush_queue.pop_front();
}
}
template <typename BufferType>
void BufferedProducer<BufferType>::flush() {
async_flush();
wait_for_acks();
}
@@ -528,6 +558,17 @@ ssize_t BufferedProducer<BufferType>::get_max_buffer_size() const {
return max_buffer_size_;
}
template <typename BufferType>
void BufferedProducer<BufferType>::set_flush_method(FlushMethod method) {
flush_method_ = method;
}
template <typename BufferType>
typename BufferedProducer<BufferType>::FlushMethod
BufferedProducer<BufferType>::get_flush_method() const {
return flush_method_;
}
template <typename BufferType>
template <typename BuilderType>
void BufferedProducer<BufferType>::do_add_message(BuilderType&& builder,
@@ -543,7 +584,12 @@ void BufferedProducer<BufferType>::do_add_message(BuilderType&& builder,
}
}
if (do_flush && (max_buffer_size_ >= 0) && (max_buffer_size_ <= (ssize_t)messages_.size())) {
flush();
if (flush_method_ == FlushMethod::Sync) {
flush();
}
else {
async_flush();
}
}
}