If timeout is 0, the function should at least run once (#123)

This commit is contained in:
Alex Damian
2018-10-22 10:55:29 -04:00
committed by Matias Fontanini
parent 416a7d43ce
commit ad9a1e4a49

View File

@@ -564,7 +564,7 @@ void BufferedProducer<BufferType, Allocator>::flush(bool preserve_order) {
template <typename BufferType, typename Allocator> template <typename BufferType, typename Allocator>
bool BufferedProducer<BufferType, Allocator>::flush(std::chrono::milliseconds timeout, bool BufferedProducer<BufferType, Allocator>::flush(std::chrono::milliseconds timeout,
bool preserve_order) { bool preserve_order) {
if (preserve_order) { if (preserve_order) {
CounterGuard<size_t> counter_guard(flushes_in_progress_); CounterGuard<size_t> counter_guard(flushes_in_progress_);
QueueType flush_queue; // flush from temporary queue QueueType flush_queue; // flush from temporary queue
@@ -572,13 +572,15 @@ bool BufferedProducer<BufferType, Allocator>::flush(std::chrono::milliseconds ti
std::lock_guard<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(mutex_);
std::swap(messages_, flush_queue); std::swap(messages_, flush_queue);
} }
auto remaining = timeout;
auto start_time = std::chrono::high_resolution_clock::now(); auto start_time = std::chrono::high_resolution_clock::now();
while (!flush_queue.empty() && do {
(std::chrono::duration_cast<std::chrono::milliseconds>
(std::chrono::high_resolution_clock::now() - start_time) < timeout)) {
sync_produce(flush_queue.front()); sync_produce(flush_queue.front());
flush_queue.pop_front(); flush_queue.pop_front();
} // calculate remaining time
remaining = timeout - std::chrono::duration_cast<std::chrono::milliseconds>
(std::chrono::high_resolution_clock::now() - start_time);
} while (!flush_queue.empty() && (remaining.count() > 0));
} }
else { else {
async_flush(); async_flush();
@@ -608,7 +610,7 @@ template <typename BufferType, typename Allocator>
bool BufferedProducer<BufferType, Allocator>::wait_for_acks(std::chrono::milliseconds timeout) { bool BufferedProducer<BufferType, Allocator>::wait_for_acks(std::chrono::milliseconds timeout) {
auto remaining = timeout; auto remaining = timeout;
auto start_time = std::chrono::high_resolution_clock::now(); auto start_time = std::chrono::high_resolution_clock::now();
while ((pending_acks_ > 0) && (remaining.count() > 0)) { do {
try { try {
producer_.flush(remaining); producer_.flush(remaining);
} }
@@ -625,7 +627,7 @@ bool BufferedProducer<BufferType, Allocator>::wait_for_acks(std::chrono::millise
// calculate remaining time // calculate remaining time
remaining = timeout - std::chrono::duration_cast<std::chrono::milliseconds> remaining = timeout - std::chrono::duration_cast<std::chrono::milliseconds>
(std::chrono::high_resolution_clock::now() - start_time); (std::chrono::high_resolution_clock::now() - start_time);
} } while ((pending_acks_ > 0) && (remaining.count() > 0));
return (pending_acks_ == 0); return (pending_acks_ == 0);
} }