Revised the implementation based on the reviewers' response

This commit is contained in:
demin80
2019-01-08 13:48:26 -05:00
parent 00370c981d
commit 71c4e02143

View File

@@ -482,7 +482,6 @@ protected:
#endif #endif
private: private:
enum class MessagePriority { Low, High };
enum class SenderType { Sync, Async }; enum class SenderType { Sync, Async };
template <typename T> template <typename T>
@@ -519,7 +518,7 @@ private:
return nullptr; return nullptr;
} }
template <typename BuilderType> template <typename BuilderType>
void do_add_message(BuilderType&& builder, MessagePriority priority, bool do_flush); void do_add_message(BuilderType&& builder, bool is_retry, bool do_flush);
template <typename BuilderType> template <typename BuilderType>
void produce_message(BuilderType&& builder); void produce_message(BuilderType&& builder);
Configuration prepare_configuration(Configuration config); Configuration prepare_configuration(Configuration config);
@@ -530,7 +529,7 @@ private:
// Members // Members
Producer producer_; Producer producer_;
QueueType messages_; QueueType messages_;
QueueType hi_pri_messages_; QueueType retry_messages_;
mutable std::mutex mutex_; mutable std::mutex mutex_;
ProduceSuccessCallback produce_success_callback_; ProduceSuccessCallback produce_success_callback_;
ProduceFailureCallback produce_failure_callback_; ProduceFailureCallback produce_failure_callback_;
@@ -567,7 +566,7 @@ BufferedProducer<BufferType, Allocator>::BufferedProducer(Configuration config,
const Allocator& alloc) const Allocator& alloc)
: producer_(prepare_configuration(std::move(config))), : producer_(prepare_configuration(std::move(config))),
messages_(alloc), messages_(alloc),
hi_pri_messages_(alloc) { retry_messages_(alloc) {
producer_.set_payload_policy(get_default_payload_policy<BufferType>()); producer_.set_payload_policy(get_default_payload_policy<BufferType>());
#ifdef KAFKA_TEST_INSTANCE #ifdef KAFKA_TEST_INSTANCE
test_params_ = nullptr; test_params_ = nullptr;
@@ -582,7 +581,7 @@ void BufferedProducer<BufferType, Allocator>::add_message(const MessageBuilder&
template <typename BufferType, typename Allocator> template <typename BufferType, typename Allocator>
void BufferedProducer<BufferType, Allocator>::add_message(Builder builder) { void BufferedProducer<BufferType, Allocator>::add_message(Builder builder) {
add_tracker(SenderType::Async, builder); add_tracker(SenderType::Async, builder);
do_add_message(move(builder), MessagePriority::Low, true); do_add_message(move(builder), false, true);
} }
template <typename BufferType, typename Allocator> template <typename BufferType, typename Allocator>
@@ -626,42 +625,40 @@ void BufferedProducer<BufferType, Allocator>::produce(const Message& message) {
template <typename BufferType, typename Allocator> template <typename BufferType, typename Allocator>
void BufferedProducer<BufferType, Allocator>::async_flush() { void BufferedProducer<BufferType, Allocator>::async_flush() {
CounterGuard<size_t> counter_guard(flushes_in_progress_); CounterGuard<size_t> counter_guard(flushes_in_progress_);
auto queue_flusher = [this](QueueType& queue)->void
{
QueueType flush_queue; // flush from temporary queue QueueType flush_queue; // flush from temporary queue
QueueType hi_pri_flush_queue; // flush from hi-priority temporary queue
{ {
std::lock_guard<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(mutex_);
std::swap(hi_pri_messages_, hi_pri_flush_queue); std::swap(queue, flush_queue);
std::swap(messages_, flush_queue);
}
while (!hi_pri_flush_queue.empty()) {
async_produce(std::move(hi_pri_flush_queue.front()), false);
hi_pri_flush_queue.pop_front();
} }
while (!flush_queue.empty()) { while (!flush_queue.empty()) {
async_produce(std::move(flush_queue.front()), false); async_produce(std::move(flush_queue.front()), false);
flush_queue.pop_front(); flush_queue.pop_front();
} }
};
queue_flusher(retry_messages_);
queue_flusher(messages_);
} }
template <typename BufferType, typename Allocator> template <typename BufferType, typename Allocator>
void BufferedProducer<BufferType, Allocator>::flush(bool preserve_order) { void BufferedProducer<BufferType, Allocator>::flush(bool preserve_order) {
if (preserve_order) { if (preserve_order) {
CounterGuard<size_t> counter_guard(flushes_in_progress_); CounterGuard<size_t> counter_guard(flushes_in_progress_);
auto queue_flusher = [this](QueueType& queue)->void
{
QueueType flush_queue; // flush from temporary queue QueueType flush_queue; // flush from temporary queue
QueueType hi_pri_flush_queue; // flush from hi-priority temporary queue
{ {
std::lock_guard<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(mutex_);
std::swap(hi_pri_messages_, hi_pri_flush_queue); std::swap(queue, flush_queue);
std::swap(messages_, flush_queue);
}
while (!hi_pri_flush_queue.empty()) {
sync_produce(hi_pri_flush_queue.front());
hi_pri_flush_queue.pop_front();
} }
while (!flush_queue.empty()) { while (!flush_queue.empty()) {
sync_produce(flush_queue.front()); sync_produce(flush_queue.front());
flush_queue.pop_front(); flush_queue.pop_front();
} }
};
queue_flusher(retry_messages_);
queue_flusher(messages_);
} }
else { else {
async_flush(); async_flush();
@@ -675,24 +672,25 @@ bool BufferedProducer<BufferType, Allocator>::flush(std::chrono::milliseconds ti
if (preserve_order) { if (preserve_order) {
CounterGuard<size_t> counter_guard(flushes_in_progress_); CounterGuard<size_t> counter_guard(flushes_in_progress_);
QueueType flush_queue; // flush from temporary queue QueueType flush_queue; // flush from temporary queue
QueueType hi_pri_flush_queue; // flush from hi-priority temporary queue QueueType retry_flush_queue; // flush from temporary retry queue
{ {
std::lock_guard<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(mutex_);
std::swap(hi_pri_messages_, hi_pri_flush_queue); std::swap(retry_messages_, retry_flush_queue);
std::swap(messages_, flush_queue); std::swap(messages_, flush_queue);
} }
auto queue_flusher = [this](QueueType& queue)->bool
{
if (!queue.empty()) {
sync_produce(queue.front());
queue.pop_front();
return true;
}
return false;
};
auto remaining = timeout; auto remaining = timeout;
auto start_time = std::chrono::high_resolution_clock::now(); auto start_time = std::chrono::high_resolution_clock::now();
do { do {
if (!hi_pri_flush_queue.empty()) { if (!queue_flusher(retry_flush_queue) && !queue_flusher(flush_queue)) {
sync_produce(hi_pri_flush_queue.front());
hi_pri_flush_queue.pop_front();
}
else if (!flush_queue.empty()) {
sync_produce(flush_queue.front());
flush_queue.pop_front();
}
else {
break; break;
} }
// calculate remaining time // calculate remaining time
@@ -701,19 +699,17 @@ bool BufferedProducer<BufferType, Allocator>::flush(std::chrono::milliseconds ti
} while (remaining.count() > 0); } while (remaining.count() > 0);
// Re-enqueue remaining messages in original order // Re-enqueue remaining messages in original order
if (!hi_pri_flush_queue.empty() || !flush_queue.empty()) { auto re_enqueuer = [this](QueueType& src_queue, QueueType& dst_queue)->void
{
if (!src_queue.empty()) {
std::lock_guard<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(mutex_);
if (!!hi_pri_flush_queue.empty()) { dst_queue.insert(dst_queue.begin(),
hi_pri_messages_.insert(hi_pri_messages_.begin(), std::make_move_iterator(src_queue.begin()),
std::make_move_iterator(hi_pri_flush_queue.begin()), std::make_move_iterator(src_queue.end()));
std::make_move_iterator(hi_pri_flush_queue.end()));
}
if (!flush_queue.empty()) {
messages_.insert(messages_.begin(),
std::make_move_iterator(flush_queue.begin()),
std::make_move_iterator(flush_queue.end()));
}
} }
};
re_enqueuer(retry_flush_queue, retry_messages_);
re_enqueuer(flush_queue, messages_);
} }
else { else {
async_flush(); async_flush();
@@ -769,13 +765,13 @@ void BufferedProducer<BufferType, Allocator>::clear() {
std::lock_guard<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(mutex_);
QueueType tmp; QueueType tmp;
std::swap(tmp, messages_); std::swap(tmp, messages_);
QueueType hi_pri_tmp; QueueType retry_tmp;
std::swap(hi_pri_tmp, hi_pri_messages_); std::swap(retry_tmp, retry_messages_);
} }
template <typename BufferType, typename Allocator> template <typename BufferType, typename Allocator>
size_t BufferedProducer<BufferType, Allocator>::get_buffer_size() const { size_t BufferedProducer<BufferType, Allocator>::get_buffer_size() const {
return messages_.size() + hi_pri_messages_.size(); return messages_.size() + retry_messages_.size();
} }
template <typename BufferType, typename Allocator> template <typename BufferType, typename Allocator>
@@ -805,19 +801,21 @@ BufferedProducer<BufferType, Allocator>::get_flush_method() const {
template <typename BufferType, typename Allocator> template <typename BufferType, typename Allocator>
template <typename BuilderType> template <typename BuilderType>
void BufferedProducer<BufferType, Allocator>::do_add_message(BuilderType&& builder, void BufferedProducer<BufferType, Allocator>::do_add_message(BuilderType&& builder,
MessagePriority priority, bool is_retry,
bool do_flush) { bool do_flush) {
{ {
std::lock_guard<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(mutex_);
if (priority == MessagePriority::High) { if (is_retry) {
hi_pri_messages_.emplace_back(std::forward<BuilderType>(builder)); retry_messages_.emplace_back(std::forward<BuilderType>(builder));
} }
else { else {
messages_.emplace_back(std::forward<BuilderType>(builder)); messages_.emplace_back(std::forward<BuilderType>(builder));
} }
} }
if (priority == MessagePriority::Low && do_flush && (max_buffer_size_ >= 0) && (max_buffer_size_ <= get_buffer_size())) { // Flush the queues only if a regular message is added. Retry messages may be added
// from rdkafka callbacks, and flush/async_flush is a user-level call
if (!is_retry && do_flush && (max_buffer_size_ >= 0) && (max_buffer_size_ <= get_buffer_size())) {
if (flush_method_ == FlushMethod::Sync) { if (flush_method_ == FlushMethod::Sync) {
flush(); flush();
} }
@@ -965,7 +963,7 @@ void BufferedProducer<BufferType, Allocator>::async_produce(BuilderType&& builde
TrackerPtr tracker = std::static_pointer_cast<Tracker>(builder.internal()); TrackerPtr tracker = std::static_pointer_cast<Tracker>(builder.internal());
if (tracker && tracker->num_retries_ > 0) { if (tracker && tracker->num_retries_ > 0) {
--tracker->num_retries_; --tracker->num_retries_;
do_add_message(std::forward<BuilderType>(builder), MessagePriority::High, false); do_add_message(std::forward<BuilderType>(builder), true, false);
return; return;
} }
} }
@@ -1004,7 +1002,7 @@ void BufferedProducer<BufferType, Allocator>::on_delivery_report(const Message&
--tracker->num_retries_; --tracker->num_retries_;
if (tracker->sender_ == SenderType::Async) { if (tracker->sender_ == SenderType::Async) {
// Re-enqueue for later retransmission with higher priority (i.e. front of the queue) // Re-enqueue for later retransmission with higher priority (i.e. front of the queue)
do_add_message(Builder(message), MessagePriority::High, false); do_add_message(Builder(message), true, false);
} }
should_retry = true; should_retry = true;
} }