Added a high-priority queue to BufferedProducer to avoid message re-ordering

This commit is contained in:
demin80
2019-01-07 14:39:09 -05:00
parent 4ba6b38b6e
commit 97229ebfd9

View File

@@ -530,6 +530,7 @@ private:
// Members // Members
Producer producer_; Producer producer_;
QueueType messages_; QueueType messages_;
QueueType hi_pri_messages_;
mutable std::mutex mutex_; mutable std::mutex mutex_;
ProduceSuccessCallback produce_success_callback_; ProduceSuccessCallback produce_success_callback_;
ProduceFailureCallback produce_failure_callback_; ProduceFailureCallback produce_failure_callback_;
@@ -565,7 +566,8 @@ template <typename BufferType, typename Allocator>
BufferedProducer<BufferType, Allocator>::BufferedProducer(Configuration config, BufferedProducer<BufferType, Allocator>::BufferedProducer(Configuration config,
const Allocator& alloc) const Allocator& alloc)
: producer_(prepare_configuration(std::move(config))), : producer_(prepare_configuration(std::move(config))),
messages_(alloc) { messages_(alloc),
hi_pri_messages_(alloc) {
producer_.set_payload_policy(get_default_payload_policy<BufferType>()); producer_.set_payload_policy(get_default_payload_policy<BufferType>());
#ifdef KAFKA_TEST_INSTANCE #ifdef KAFKA_TEST_INSTANCE
test_params_ = nullptr; test_params_ = nullptr;
@@ -625,10 +627,16 @@ template <typename BufferType, typename Allocator>
void BufferedProducer<BufferType, Allocator>::async_flush() { void BufferedProducer<BufferType, Allocator>::async_flush() {
CounterGuard<size_t> counter_guard(flushes_in_progress_); CounterGuard<size_t> counter_guard(flushes_in_progress_);
QueueType flush_queue; // flush from temporary queue QueueType flush_queue; // flush from temporary queue
QueueType hi_pri_flush_queue; // flush from hi-priority temporary queue
{ {
std::lock_guard<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(mutex_);
std::swap(hi_pri_messages_, hi_pri_flush_queue);
std::swap(messages_, flush_queue); std::swap(messages_, flush_queue);
} }
while (!hi_pri_flush_queue.empty()) {
async_produce(std::move(hi_pri_flush_queue.front()), false);
hi_pri_flush_queue.pop_front();
}
while (!flush_queue.empty()) { while (!flush_queue.empty()) {
async_produce(std::move(flush_queue.front()), false); async_produce(std::move(flush_queue.front()), false);
flush_queue.pop_front(); flush_queue.pop_front();
@@ -640,10 +648,16 @@ void BufferedProducer<BufferType, Allocator>::flush(bool preserve_order) {
if (preserve_order) { if (preserve_order) {
CounterGuard<size_t> counter_guard(flushes_in_progress_); CounterGuard<size_t> counter_guard(flushes_in_progress_);
QueueType flush_queue; // flush from temporary queue QueueType flush_queue; // flush from temporary queue
QueueType hi_pri_flush_queue; // flush from hi-priority temporary queue
{ {
std::lock_guard<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(mutex_);
std::swap(hi_pri_messages_, hi_pri_flush_queue);
std::swap(messages_, flush_queue); std::swap(messages_, flush_queue);
} }
while (!hi_pri_flush_queue.empty()) {
sync_produce(hi_pri_flush_queue.front());
hi_pri_flush_queue.pop_front();
}
while (!flush_queue.empty()) { while (!flush_queue.empty()) {
sync_produce(flush_queue.front()); sync_produce(flush_queue.front());
flush_queue.pop_front(); flush_queue.pop_front();
@@ -661,25 +675,45 @@ bool BufferedProducer<BufferType, Allocator>::flush(std::chrono::milliseconds ti
if (preserve_order) { if (preserve_order) {
CounterGuard<size_t> counter_guard(flushes_in_progress_); CounterGuard<size_t> counter_guard(flushes_in_progress_);
QueueType flush_queue; // flush from temporary queue QueueType flush_queue; // flush from temporary queue
QueueType hi_pri_flush_queue; // flush from hi-priority temporary queue
{ {
std::lock_guard<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(mutex_);
std::swap(hi_pri_messages_, hi_pri_flush_queue);
std::swap(messages_, flush_queue); std::swap(messages_, flush_queue);
} }
auto remaining = timeout; auto remaining = timeout;
auto start_time = std::chrono::high_resolution_clock::now(); auto start_time = std::chrono::high_resolution_clock::now();
do { do {
sync_produce(flush_queue.front()); if (!hi_pri_flush_queue.empty()) {
flush_queue.pop_front(); sync_produce(hi_pri_flush_queue.front());
hi_pri_flush_queue.pop_front();
}
else if (!flush_queue.empty()) {
sync_produce(flush_queue.front());
flush_queue.pop_front();
}
else {
break;
}
// calculate remaining time // calculate remaining time
remaining = timeout - std::chrono::duration_cast<std::chrono::milliseconds> remaining = timeout - std::chrono::duration_cast<std::chrono::milliseconds>
(std::chrono::high_resolution_clock::now() - start_time); (std::chrono::high_resolution_clock::now() - start_time);
} while (!flush_queue.empty() && (remaining.count() > 0)); } while (remaining.count() > 0);
// Re-enqueue remaining messages in original order // Re-enqueue remaining messages in original order
if (!flush_queue.empty()) { if (!hi_pri_flush_queue.empty() || !flush_queue.empty()) {
std::lock_guard<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(mutex_);
messages_.insert(messages_.begin(), std::make_move_iterator(flush_queue.begin()), std::make_move_iterator(flush_queue.end())); if (!!hi_pri_flush_queue.empty()) {
} hi_pri_messages_.insert(hi_pri_messages_.begin(),
std::make_move_iterator(hi_pri_flush_queue.begin()),
std::make_move_iterator(hi_pri_flush_queue.end()));
}
if (!flush_queue.empty()) {
messages_.insert(messages_.begin(),
std::make_move_iterator(flush_queue.begin()),
std::make_move_iterator(flush_queue.end()));
}
}
} }
else { else {
async_flush(); async_flush();
@@ -735,11 +769,13 @@ void BufferedProducer<BufferType, Allocator>::clear() {
std::lock_guard<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(mutex_);
QueueType tmp; QueueType tmp;
std::swap(tmp, messages_); std::swap(tmp, messages_);
QueueType hi_pri_tmp;
std::swap(hi_pri_tmp, hi_pri_messages_);
} }
template <typename BufferType, typename Allocator> template <typename BufferType, typename Allocator>
size_t BufferedProducer<BufferType, Allocator>::get_buffer_size() const { size_t BufferedProducer<BufferType, Allocator>::get_buffer_size() const {
return messages_.size(); return messages_.size() + hi_pri_messages_.size();
} }
template <typename BufferType, typename Allocator> template <typename BufferType, typename Allocator>
@@ -774,20 +810,21 @@ void BufferedProducer<BufferType, Allocator>::do_add_message(BuilderType&& build
{ {
std::lock_guard<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(mutex_);
if (priority == MessagePriority::High) { if (priority == MessagePriority::High) {
messages_.emplace_front(std::forward<BuilderType>(builder)); hi_pri_messages_.emplace_back(std::forward<BuilderType>(builder));
} }
else { else {
messages_.emplace_back(std::forward<BuilderType>(builder)); messages_.emplace_back(std::forward<BuilderType>(builder));
} }
} }
if (do_flush && (max_buffer_size_ >= 0) && (max_buffer_size_ <= (ssize_t)messages_.size())) {
if (priority == MessagePriority::Low && do_flush && (max_buffer_size_ >= 0) && (max_buffer_size_ <= get_buffer_size())) {
if (flush_method_ == FlushMethod::Sync) { if (flush_method_ == FlushMethod::Sync) {
flush(); flush();
} }
else { else {
async_flush(); async_flush();
} }
} }
} }
template <typename BufferType, typename Allocator> template <typename BufferType, typename Allocator>