refactored by adding retry_mutex_ and replacing bools with enums; fixed formatting issues

This commit is contained in:
demin80
2019-01-10 14:37:46 -05:00
parent 71c4e02143
commit 93c2edf6ba

View File

@@ -483,7 +483,9 @@ protected:
private:
enum class SenderType { Sync, Async };
enum class QueueKind { Retry, Regular };
enum class FlushAction { DontFlush, DoFlush };
template <typename T>
struct CounterGuard{
CounterGuard(std::atomic<T>& counter) : counter_(counter) { ++counter_; }
@@ -518,19 +520,21 @@ private:
return nullptr;
}
template <typename BuilderType>
void do_add_message(BuilderType&& builder, bool is_retry, bool do_flush);
void do_add_message(BuilderType&& builder, QueueKind queue_kind, FlushAction flush_action);
template <typename BuilderType>
void produce_message(BuilderType&& builder);
Configuration prepare_configuration(Configuration config);
void on_delivery_report(const Message& message);
template <typename BuilderType>
void async_produce(BuilderType&& message, bool throw_on_error);
static void swap_queues(QueueType & queue1, QueueType & queue2, std::mutex & mutex);
// Members
Producer producer_;
QueueType messages_;
QueueType retry_messages_;
mutable std::mutex mutex_;
mutable std::mutex retry_mutex_;
ProduceSuccessCallback produce_success_callback_;
ProduceFailureCallback produce_failure_callback_;
ProduceTerminationCallback produce_termination_callback_;
@@ -581,7 +585,7 @@ void BufferedProducer<BufferType, Allocator>::add_message(const MessageBuilder&
template <typename BufferType, typename Allocator>
void BufferedProducer<BufferType, Allocator>::add_message(Builder builder) {
add_tracker(SenderType::Async, builder);
do_add_message(move(builder), false, true);
do_add_message(move(builder), QueueKind::Regular, FlushAction::DoFlush);
}
template <typename BufferType, typename Allocator>
@@ -625,40 +629,36 @@ void BufferedProducer<BufferType, Allocator>::produce(const Message& message) {
template <typename BufferType, typename Allocator>
void BufferedProducer<BufferType, Allocator>::async_flush() {
CounterGuard<size_t> counter_guard(flushes_in_progress_);
auto queue_flusher = [this](QueueType& queue)->void
auto queue_flusher = [this](QueueType& queue, std::mutex & mutex)->void
{
QueueType flush_queue; // flush from temporary queue
{
std::lock_guard<std::mutex> lock(mutex_);
std::swap(queue, flush_queue);
}
swap_queues(queue, flush_queue, mutex);
while (!flush_queue.empty()) {
async_produce(std::move(flush_queue.front()), false);
flush_queue.pop_front();
}
};
queue_flusher(retry_messages_);
queue_flusher(messages_);
queue_flusher(retry_messages_, retry_mutex_);
queue_flusher(messages_, mutex_);
}
template <typename BufferType, typename Allocator>
void BufferedProducer<BufferType, Allocator>::flush(bool preserve_order) {
if (preserve_order) {
CounterGuard<size_t> counter_guard(flushes_in_progress_);
auto queue_flusher = [this](QueueType& queue)->void
auto queue_flusher = [this](QueueType& queue, std::mutex & mutex)->void
{
QueueType flush_queue; // flush from temporary queue
{
std::lock_guard<std::mutex> lock(mutex_);
std::swap(queue, flush_queue);
}
swap_queues(queue, flush_queue, mutex);
while (!flush_queue.empty()) {
sync_produce(flush_queue.front());
flush_queue.pop_front();
sync_produce(flush_queue.front());
flush_queue.pop_front();
}
};
queue_flusher(retry_messages_);
queue_flusher(messages_);
queue_flusher(retry_messages_, retry_mutex_);
queue_flusher(messages_, mutex_);
}
else {
async_flush();
@@ -672,12 +672,10 @@ bool BufferedProducer<BufferType, Allocator>::flush(std::chrono::milliseconds ti
if (preserve_order) {
CounterGuard<size_t> counter_guard(flushes_in_progress_);
QueueType flush_queue; // flush from temporary queue
swap_queues(messages_, flush_queue, mutex_);
QueueType retry_flush_queue; // flush from temporary retry queue
{
std::lock_guard<std::mutex> lock(mutex_);
std::swap(retry_messages_, retry_flush_queue);
std::swap(messages_, flush_queue);
}
swap_queues(retry_messages_, retry_flush_queue, retry_mutex_);
auto queue_flusher = [this](QueueType& queue)->bool
{
if (!queue.empty()) {
@@ -699,17 +697,17 @@ bool BufferedProducer<BufferType, Allocator>::flush(std::chrono::milliseconds ti
} while (remaining.count() > 0);
// Re-enqueue remaining messages in original order
auto re_enqueuer = [this](QueueType& src_queue, QueueType& dst_queue)->void
auto re_enqueuer = [this](QueueType& src_queue, QueueType& dst_queue, std::mutex & mutex)->void
{
if (!src_queue.empty()) {
std::lock_guard<std::mutex> lock(mutex_);
std::lock_guard<std::mutex> lock(mutex);
dst_queue.insert(dst_queue.begin(),
std::make_move_iterator(src_queue.begin()),
std::make_move_iterator(src_queue.end()));
}
};
re_enqueuer(retry_flush_queue, retry_messages_);
re_enqueuer(flush_queue, messages_);
re_enqueuer(retry_flush_queue, retry_messages_, retry_mutex_);
re_enqueuer(flush_queue, messages_, mutex_);
}
else {
async_flush();
@@ -762,11 +760,10 @@ bool BufferedProducer<BufferType, Allocator>::wait_for_acks(std::chrono::millise
template <typename BufferType, typename Allocator>
void BufferedProducer<BufferType, Allocator>::clear() {
std::lock_guard<std::mutex> lock(mutex_);
QueueType tmp;
std::swap(tmp, messages_);
swap_queues(messages_, tmp, mutex_);
QueueType retry_tmp;
std::swap(retry_tmp, retry_messages_);
swap_queues(retry_messages_, retry_tmp, retry_mutex_);
}
template <typename BufferType, typename Allocator>
@@ -801,21 +798,20 @@ BufferedProducer<BufferType, Allocator>::get_flush_method() const {
template <typename BufferType, typename Allocator>
template <typename BuilderType>
void BufferedProducer<BufferType, Allocator>::do_add_message(BuilderType&& builder,
bool is_retry,
bool do_flush) {
{
QueueKind queue_kind,
FlushAction flush_action) {
if (queue_kind == QueueKind::Retry) {
std::lock_guard<std::mutex> lock(retry_mutex_);
retry_messages_.emplace_back(std::forward<BuilderType>(builder));
}
else {
std::lock_guard<std::mutex> lock(mutex_);
if (is_retry) {
retry_messages_.emplace_back(std::forward<BuilderType>(builder));
}
else {
messages_.emplace_back(std::forward<BuilderType>(builder));
}
messages_.emplace_back(std::forward<BuilderType>(builder));
}
// Flush the queues only if a regular message is added. Retry messages may be added
// from rdkafka callbacks, and flush/async_flush is a user-level call
if (!is_retry && do_flush && (max_buffer_size_ >= 0) && (max_buffer_size_ <= get_buffer_size())) {
if (queue_kind == QueueKind::Regular && flush_action == FlushAction::DoFlush && (max_buffer_size_ >= 0) && (max_buffer_size_ <= get_buffer_size())) {
if (flush_method_ == FlushMethod::Sync) {
flush();
}
@@ -963,7 +959,7 @@ void BufferedProducer<BufferType, Allocator>::async_produce(BuilderType&& builde
TrackerPtr tracker = std::static_pointer_cast<Tracker>(builder.internal());
if (tracker && tracker->num_retries_ > 0) {
--tracker->num_retries_;
do_add_message(std::forward<BuilderType>(builder), true, false);
do_add_message(std::forward<BuilderType>(builder), QueueKind::Retry, FlushAction::DontFlush);
return;
}
}
@@ -1002,7 +998,7 @@ void BufferedProducer<BufferType, Allocator>::on_delivery_report(const Message&
--tracker->num_retries_;
if (tracker->sender_ == SenderType::Async) {
// Re-enqueue for later retransmission with higher priority (i.e. front of the queue)
do_add_message(Builder(message), true, false);
do_add_message(Builder(message), QueueKind::Retry, FlushAction::DontFlush);
}
should_retry = true;
}
@@ -1034,6 +1030,13 @@ void BufferedProducer<BufferType, Allocator>::on_delivery_report(const Message&
}
}
template <typename BufferType, typename Allocator>
void BufferedProducer<BufferType, Allocator>::swap_queues(BufferedProducer<BufferType, Allocator>::QueueType & queue1, BufferedProducer<BufferType, Allocator>::QueueType & queue2, std::mutex & mutex)
{
std::lock_guard<std::mutex> lock(mutex);
std::swap(queue1, queue2);
}
} // cppkafka
#endif // CPPKAFKA_BUFFERED_PRODUCER_H