Call flush termination callbacks from sync_produce

This commit is contained in:
Alexander Damian
2020-08-31 21:05:10 -04:00
parent f117720f66
commit 8cfd4595f6

View File

@@ -691,6 +691,7 @@ private:
void do_add_message(BuilderType&& builder, QueueKind queue_kind, FlushAction flush_action); void do_add_message(BuilderType&& builder, QueueKind queue_kind, FlushAction flush_action);
template <typename BuilderType> template <typename BuilderType>
void produce_message(BuilderType&& builder); void produce_message(BuilderType&& builder);
bool sync_produce(const MessageBuilder& builder, std::chrono::milliseconds timeout, bool throw_on_error);
Configuration prepare_configuration(Configuration config); Configuration prepare_configuration(Configuration config);
void on_delivery_report(const Message& message); void on_delivery_report(const Message& message);
template <typename BuilderType> template <typename BuilderType>
@@ -787,12 +788,19 @@ void BufferedProducer<BufferType, Allocator>::produce(const MessageBuilder& buil
template <typename BufferType, typename Allocator> template <typename BufferType, typename Allocator>
void BufferedProducer<BufferType, Allocator>::sync_produce(const MessageBuilder& builder) { void BufferedProducer<BufferType, Allocator>::sync_produce(const MessageBuilder& builder) {
sync_produce(builder, infinite_timeout); sync_produce(builder, infinite_timeout, true);
} }
template <typename BufferType, typename Allocator> template <typename BufferType, typename Allocator>
bool BufferedProducer<BufferType, Allocator>::sync_produce(const MessageBuilder& builder, bool BufferedProducer<BufferType, Allocator>::sync_produce(const MessageBuilder& builder,
std::chrono::milliseconds timeout) { std::chrono::milliseconds timeout) {
return sync_produce(builder, infinite_timeout, true);
}
template <typename BufferType, typename Allocator>
bool BufferedProducer<BufferType, Allocator>::sync_produce(const MessageBuilder& builder,
std::chrono::milliseconds timeout,
bool throw_on_error) {
if (enable_message_retries_) { if (enable_message_retries_) {
//Adding a retry tracker requires copying the builder since //Adding a retry tracker requires copying the builder since
//we cannot modify the original instance. Cloning is a fast operation //we cannot modify the original instance. Cloning is a fast operation
@@ -802,6 +810,7 @@ bool BufferedProducer<BufferType, Allocator>::sync_produce(const MessageBuilder&
// produce until we succeed or we reach max retry limit // produce until we succeed or we reach max retry limit
auto endTime = std::chrono::steady_clock::now() + timeout; auto endTime = std::chrono::steady_clock::now() + timeout;
do { do {
try {
tracker->prepare_to_retry(); tracker->prepare_to_retry();
produce_message(builder_clone); produce_message(builder_clone);
//Wait w/o timeout since we must get the ack to avoid a race condition. //Wait w/o timeout since we must get the ack to avoid a race condition.
@@ -809,6 +818,25 @@ bool BufferedProducer<BufferType, Allocator>::sync_produce(const MessageBuilder&
//and the delivery callback will never be invoked. //and the delivery callback will never be invoked.
wait_for_current_thread_acks(); wait_for_current_thread_acks();
} }
catch (const HandleException& ex) {
// If we have a flush failure callback and it returns true, we retry producing this message later
CallbackInvoker<FlushFailureCallback> callback("flush failure", flush_failure_callback_, &producer_);
if (!callback || callback(builder, ex.get_error())) {
if (tracker && tracker->has_retries_left()) {
tracker->decrement_retries();
continue;
}
}
++total_messages_dropped_;
// Call the flush termination callback
CallbackInvoker<FlushTerminationCallback>("flush termination", flush_termination_callback_, &producer_)
(builder, ex.get_error());
if (throw_on_error) {
throw;
}
break;
}
}
while (tracker->retry_again() && while (tracker->retry_again() &&
((timeout == infinite_timeout) || ((timeout == infinite_timeout) ||
(std::chrono::steady_clock::now() >= endTime))); (std::chrono::steady_clock::now() >= endTime)));
@@ -816,10 +844,22 @@ bool BufferedProducer<BufferType, Allocator>::sync_produce(const MessageBuilder&
} }
else { else {
// produce once // produce once
try {
produce_message(builder); produce_message(builder);
wait_for_current_thread_acks(timeout); wait_for_current_thread_acks(timeout);
return !ack_monitor_.has_current_thread_pending_acks(); return !ack_monitor_.has_current_thread_pending_acks();
} }
catch (const HandleException& ex) {
++total_messages_dropped_;
// Call the flush termination callback
CallbackInvoker<FlushTerminationCallback>("flush termination", flush_termination_callback_, &producer_)
(builder, ex.get_error());
if (throw_on_error) {
throw;
}
}
}
return false;
} }
template <typename BufferType, typename Allocator> template <typename BufferType, typename Allocator>
@@ -851,7 +891,7 @@ bool BufferedProducer<BufferType, Allocator>::flush(std::chrono::milliseconds ti
if (preserve_order) { if (preserve_order) {
//When preserving order, we must ensure that each message //When preserving order, we must ensure that each message
//gets delivered before producing the next one. //gets delivered before producing the next one.
sync_produce(flush_queue.front(), timeout); sync_produce(flush_queue.front(), timeout, false);
} }
else { else {
//Produce as fast as possible w/o waiting. If one or more //Produce as fast as possible w/o waiting. If one or more