mirror of
https://github.com/Telecominfraproject/wlan-cloud-lib-cppkafka.git
synced 2025-11-01 02:57:53 +00:00
Wait until the ack is received without timing out.
wait_for_acks() should default to infinite timeout since the original implementation was never timing out.
This commit is contained in:
@@ -208,8 +208,14 @@ public:
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* \brief Same as sync_produce but waits up to 'timeout' for acks to be received.
|
* \brief Same as sync_produce but waits up to 'timeout' for acks to be received.
|
||||||
|
*
|
||||||
|
* If retries are enabled, the timeout will limit the amount of time to wait
|
||||||
|
* before all retries are completed.
|
||||||
|
*
|
||||||
|
* \returns True if succeeded, false otherwise. If retries are enabled, false
|
||||||
|
* indicates there are still retries left.
|
||||||
*/
|
*/
|
||||||
void sync_produce(const MessageBuilder& builder, std::chrono::milliseconds timeout);
|
bool sync_produce(const MessageBuilder& builder, std::chrono::milliseconds timeout);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* \brief Produces a message asynchronously without buffering it
|
* \brief Produces a message asynchronously without buffering it
|
||||||
@@ -592,6 +598,10 @@ private:
|
|||||||
void async_produce(BuilderType&& message, bool throw_on_error);
|
void async_produce(BuilderType&& message, bool throw_on_error);
|
||||||
static void swap_queues(QueueType & queue1, QueueType & queue2, std::mutex & mutex);
|
static void swap_queues(QueueType & queue1, QueueType & queue2, std::mutex & mutex);
|
||||||
|
|
||||||
|
// Static members
|
||||||
|
static const std::chrono::milliseconds infinite_timeout;
|
||||||
|
static const std::chrono::milliseconds no_timeout;
|
||||||
|
|
||||||
// Members
|
// Members
|
||||||
Producer producer_;
|
Producer producer_;
|
||||||
QueueType messages_;
|
QueueType messages_;
|
||||||
@@ -618,6 +628,14 @@ private:
|
|||||||
#endif
|
#endif
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Full blocking wait as per RdKafka
|
||||||
|
template <typename BufferType, typename Allocator>
|
||||||
|
const std::chrono::milliseconds
|
||||||
|
BufferedProducer<BufferType, Allocator>::infinite_timeout = std::chrono::milliseconds(-1);
|
||||||
|
template <typename BufferType, typename Allocator>
|
||||||
|
const std::chrono::milliseconds
|
||||||
|
BufferedProducer<BufferType, Allocator>::no_timeout = std::chrono::milliseconds::zero();
|
||||||
|
|
||||||
template <typename BufferType>
|
template <typename BufferType>
|
||||||
Producer::PayloadPolicy get_default_payload_policy() {
|
Producer::PayloadPolicy get_default_payload_policy() {
|
||||||
return Producer::PayloadPolicy::COPY_PAYLOAD;
|
return Producer::PayloadPolicy::COPY_PAYLOAD;
|
||||||
@@ -669,11 +687,11 @@ void BufferedProducer<BufferType, Allocator>::produce(const MessageBuilder& buil
|
|||||||
|
|
||||||
template <typename BufferType, typename Allocator>
|
template <typename BufferType, typename Allocator>
|
||||||
void BufferedProducer<BufferType, Allocator>::sync_produce(const MessageBuilder& builder) {
|
void BufferedProducer<BufferType, Allocator>::sync_produce(const MessageBuilder& builder) {
|
||||||
sync_produce(builder, producer_.get_timeout());
|
sync_produce(builder, infinite_timeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename BufferType, typename Allocator>
|
template <typename BufferType, typename Allocator>
|
||||||
void BufferedProducer<BufferType, Allocator>::sync_produce(const MessageBuilder& builder,
|
bool BufferedProducer<BufferType, Allocator>::sync_produce(const MessageBuilder& builder,
|
||||||
std::chrono::milliseconds timeout) {
|
std::chrono::milliseconds timeout) {
|
||||||
if (enable_message_retries_) {
|
if (enable_message_retries_) {
|
||||||
//Adding a retry tracker requires copying the builder since
|
//Adding a retry tracker requires copying the builder since
|
||||||
@@ -682,17 +700,25 @@ void BufferedProducer<BufferType, Allocator>::sync_produce(const MessageBuilder&
|
|||||||
MessageBuilder builder_clone(builder.clone());
|
MessageBuilder builder_clone(builder.clone());
|
||||||
TrackerPtr tracker = add_tracker(SenderType::Sync, builder_clone);
|
TrackerPtr tracker = add_tracker(SenderType::Sync, builder_clone);
|
||||||
// produce until we succeed or we reach max retry limit
|
// produce until we succeed or we reach max retry limit
|
||||||
|
auto endTime = std::chrono::steady_clock::now() + timeout;
|
||||||
do {
|
do {
|
||||||
tracker->prepare_to_retry();
|
tracker->prepare_to_retry();
|
||||||
produce_message(builder_clone);
|
produce_message(builder_clone);
|
||||||
wait_for_acks(timeout);
|
//Wait w/o timeout since we must get the ack to avoid a race condition.
|
||||||
|
//Otherwise retry_again() will block as the producer won't get flushed
|
||||||
|
//and the delivery callback will never be invoked.
|
||||||
|
wait_for_acks();
|
||||||
}
|
}
|
||||||
while (tracker->retry_again());
|
while (tracker->retry_again() &&
|
||||||
|
((timeout == infinite_timeout) ||
|
||||||
|
(std::chrono::steady_clock::now() >= endTime)));
|
||||||
|
return !tracker->has_retries_left();
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
// produce once
|
// produce once
|
||||||
produce_message(builder);
|
produce_message(builder);
|
||||||
wait_for_acks(timeout);
|
wait_for_acks(timeout);
|
||||||
|
return (pending_acks_ == 0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -703,12 +729,12 @@ void BufferedProducer<BufferType, Allocator>::produce(const Message& message) {
|
|||||||
|
|
||||||
template <typename BufferType, typename Allocator>
|
template <typename BufferType, typename Allocator>
|
||||||
void BufferedProducer<BufferType, Allocator>::async_flush() {
|
void BufferedProducer<BufferType, Allocator>::async_flush() {
|
||||||
flush(std::chrono::milliseconds::zero(), false);
|
flush(no_timeout, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename BufferType, typename Allocator>
|
template <typename BufferType, typename Allocator>
|
||||||
void BufferedProducer<BufferType, Allocator>::flush(bool preserve_order) {
|
void BufferedProducer<BufferType, Allocator>::flush(bool preserve_order) {
|
||||||
flush(producer_.get_timeout(), preserve_order);
|
flush(infinite_timeout, preserve_order);
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename BufferType, typename Allocator>
|
template <typename BufferType, typename Allocator>
|
||||||
@@ -749,7 +775,8 @@ bool BufferedProducer<BufferType, Allocator>::flush(std::chrono::milliseconds ti
|
|||||||
|
|
||||||
template <typename BufferType, typename Allocator>
|
template <typename BufferType, typename Allocator>
|
||||||
void BufferedProducer<BufferType, Allocator>::wait_for_acks() {
|
void BufferedProducer<BufferType, Allocator>::wait_for_acks() {
|
||||||
wait_for_acks(producer_.get_timeout());
|
//block until all acks have been received
|
||||||
|
wait_for_acks(infinite_timeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename BufferType, typename Allocator>
|
template <typename BufferType, typename Allocator>
|
||||||
@@ -773,7 +800,8 @@ bool BufferedProducer<BufferType, Allocator>::wait_for_acks(std::chrono::millise
|
|||||||
// calculate remaining time
|
// calculate remaining time
|
||||||
remaining = timeout - std::chrono::duration_cast<std::chrono::milliseconds>
|
remaining = timeout - std::chrono::duration_cast<std::chrono::milliseconds>
|
||||||
(std::chrono::high_resolution_clock::now() - start_time);
|
(std::chrono::high_resolution_clock::now() - start_time);
|
||||||
} while ((pending_acks_ > 0) && (remaining.count() > 0));
|
} while ((pending_acks_ > 0) &&
|
||||||
|
((remaining.count() > 0) || (timeout == infinite_timeout)));
|
||||||
return (pending_acks_ == 0);
|
return (pending_acks_ == 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user