Added timeout to flush and wait_for_acks

This commit is contained in:
accelerated
2018-06-14 11:43:12 -04:00
parent 157b7ec997
commit 3c72eb5752

View File

@@ -190,11 +190,27 @@ public:
* with respect to other threads.
*/
void flush();
/**
* \brief Flushes the buffered messages and waits up to 'timeout'
*
* \param timeout The maximum time to wait until all acks are received
*
* \return True if the operation completes and all acks have been received.
*/
bool flush(std::chrono::milliseconds timeout);
/**
* Waits for produced message's acknowledgements from the brokers
*/
void wait_for_acks();
/**
* Waits for produced message's acknowledgements from the brokers up to 'timeout'.
*
* \return True if the operation completes and all acks have been received.
*/
bool wait_for_acks(std::chrono::milliseconds timeout);
/**
* Clears any buffered messages
@@ -515,6 +531,12 @@ void BufferedProducer<BufferType>::flush() {
wait_for_acks();
}
template <typename BufferType>
bool BufferedProducer<BufferType>::flush(std::chrono::milliseconds timeout) {
async_flush();
return wait_for_acks(timeout);
}
template <typename BufferType>
void BufferedProducer<BufferType>::wait_for_acks() {
while (pending_acks_ > 0) {
@@ -533,6 +555,31 @@ void BufferedProducer<BufferType>::wait_for_acks() {
}
}
template <typename BufferType>
bool BufferedProducer<BufferType>::wait_for_acks(std::chrono::milliseconds timeout) {
auto remaining = timeout;
auto start_time = std::chrono::high_resolution_clock::now();
while ((pending_acks_ > 0) && (remaining.count() > 0)) {
try {
producer_.flush(remaining);
}
catch (const HandleException& ex) {
// If we just hit the timeout, keep going, otherwise re-throw
if (ex.get_error() == RD_KAFKA_RESP_ERR__TIMED_OUT) {
// There is no time remaining
return (pending_acks_ == 0);
}
else {
throw;
}
}
// calculate remaining time
remaining = timeout - std::chrono::duration_cast<std::chrono::milliseconds>
(std::chrono::high_resolution_clock::now() - start_time);
}
return (pending_acks_ == 0);
}
template <typename BufferType>
void BufferedProducer<BufferType>::clear() {
std::lock_guard<std::mutex> lock(mutex_);