mirror of
https://github.com/Telecominfraproject/wlan-cloud-lib-cppkafka.git
synced 2025-11-01 02:57:53 +00:00
Changed purge to async_flush
This commit is contained in:
@@ -86,8 +86,8 @@ namespace cppkafka {
|
||||
template <typename BufferType>
|
||||
class CPPKAFKA_API BufferedProducer {
|
||||
public:
|
||||
enum class EmptyBufferMethod { Flush, ///< Empty the buffer and wait for acks from the broker
|
||||
Purge }; ///< Empty the buffer and don't wait for acks
|
||||
enum class FlushMethod { Sync, ///< Empty the buffer and wait for acks from the broker
|
||||
Async }; ///< Empty the buffer and don't wait for acks
|
||||
/**
|
||||
* Concrete builder
|
||||
*/
|
||||
@@ -177,7 +177,7 @@ public:
|
||||
*
|
||||
* Similar to flush, it will send all messages but will not wait for acks to complete.
|
||||
*/
|
||||
void purge();
|
||||
void async_flush();
|
||||
|
||||
/**
|
||||
* \brief Flushes the buffered messages.
|
||||
@@ -230,18 +230,19 @@ public:
|
||||
ssize_t get_max_buffer_size() const;
|
||||
|
||||
/**
|
||||
* \brief Sets the method used to empty the internal buffer when 'max_buffer_size' is reached. Default is 'Flush'
|
||||
* \brief Sets the method used to flush the internal buffer when 'max_buffer_size' is reached.
|
||||
* Default is 'Sync'
|
||||
*
|
||||
* \param method The method
|
||||
*/
|
||||
void set_buffer_empty_method(EmptyBufferMethod method);
|
||||
void set_flush_method(FlushMethod method);
|
||||
|
||||
/**
|
||||
* \brief Gets the method used to empty the internal buffer.
|
||||
* \brief Gets the method used to flush the internal buffer.
|
||||
*
|
||||
* \return The method
|
||||
*/
|
||||
EmptyBufferMethod get_buffer_empty_method() const;
|
||||
FlushMethod get_flush_method() const;
|
||||
|
||||
/**
|
||||
* \brief Get the number of messages not yet acked by the broker
|
||||
@@ -398,7 +399,7 @@ private:
|
||||
return nullptr;
|
||||
}
|
||||
template <typename BuilderType>
|
||||
void do_add_message(BuilderType&& builder, MessagePriority priority, bool do_empty_buffer);
|
||||
void do_add_message(BuilderType&& builder, MessagePriority priority, bool do_flush);
|
||||
template <typename BuilderType>
|
||||
void produce_message(BuilderType&& builder);
|
||||
Configuration prepare_configuration(Configuration config);
|
||||
@@ -414,7 +415,7 @@ private:
|
||||
ProduceFailureCallback produce_failure_callback_;
|
||||
FlushFailureCallback flush_failure_callback_;
|
||||
ssize_t max_buffer_size_{-1};
|
||||
EmptyBufferMethod empty_buffer_method_{EmptyBufferMethod::Flush};
|
||||
FlushMethod flush_method_{FlushMethod::Sync};
|
||||
std::atomic<size_t> pending_acks_{0};
|
||||
std::atomic<size_t> flushes_in_progress_{0};
|
||||
std::atomic<size_t> total_messages_produced_{0};
|
||||
@@ -495,7 +496,7 @@ void BufferedProducer<BufferType>::produce(const Message& message) {
|
||||
}
|
||||
|
||||
template <typename BufferType>
|
||||
void BufferedProducer<BufferType>::purge() {
|
||||
void BufferedProducer<BufferType>::async_flush() {
|
||||
CounterGuard<size_t> counter_guard(flushes_in_progress_);
|
||||
QueueType flush_queue; // flush from temporary queue
|
||||
{
|
||||
@@ -510,7 +511,7 @@ void BufferedProducer<BufferType>::purge() {
|
||||
|
||||
template <typename BufferType>
|
||||
void BufferedProducer<BufferType>::flush() {
|
||||
purge();
|
||||
async_flush();
|
||||
wait_for_acks();
|
||||
}
|
||||
|
||||
@@ -558,21 +559,21 @@ ssize_t BufferedProducer<BufferType>::get_max_buffer_size() const {
|
||||
}
|
||||
|
||||
template <typename BufferType>
|
||||
void BufferedProducer<BufferType>::set_buffer_empty_method(EmptyBufferMethod method) {
|
||||
empty_buffer_method_ = method;
|
||||
void BufferedProducer<BufferType>::set_flush_method(FlushMethod method) {
|
||||
flush_method_ = method;
|
||||
}
|
||||
|
||||
template <typename BufferType>
|
||||
typename BufferedProducer<BufferType>::EmptyBufferMethod
|
||||
BufferedProducer<BufferType>::get_buffer_empty_method() const {
|
||||
return empty_buffer_method_;
|
||||
typename BufferedProducer<BufferType>::FlushMethod
|
||||
BufferedProducer<BufferType>::get_flush_method() const {
|
||||
return flush_method_;
|
||||
}
|
||||
|
||||
template <typename BufferType>
|
||||
template <typename BuilderType>
|
||||
void BufferedProducer<BufferType>::do_add_message(BuilderType&& builder,
|
||||
MessagePriority priority,
|
||||
bool do_empty_buffer) {
|
||||
bool do_flush) {
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(mutex_);
|
||||
if (priority == MessagePriority::High) {
|
||||
@@ -582,12 +583,12 @@ void BufferedProducer<BufferType>::do_add_message(BuilderType&& builder,
|
||||
messages_.emplace_back(std::forward<BuilderType>(builder));
|
||||
}
|
||||
}
|
||||
if (do_empty_buffer && (max_buffer_size_ >= 0) && (max_buffer_size_ <= (ssize_t)messages_.size())) {
|
||||
if (empty_buffer_method_ == EmptyBufferMethod::Flush) {
|
||||
if (do_flush && (max_buffer_size_ >= 0) && (max_buffer_size_ <= (ssize_t)messages_.size())) {
|
||||
if (flush_method_ == FlushMethod::Sync) {
|
||||
flush();
|
||||
}
|
||||
else {
|
||||
purge();
|
||||
async_flush();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user