Added clarifications and comments to the BufferedProducer class

This commit is contained in:
Alexander Damian
2020-02-08 21:24:12 -05:00
parent 7d097df34d
commit e401e97b40

View File

@@ -79,7 +79,7 @@ namespace cppkafka {
* \warning Payload Policy: For payload-owning BufferTypes such as std::string or std::vector<char> * \warning Payload Policy: For payload-owning BufferTypes such as std::string or std::vector<char>
* the default policy is set to Producer::PayloadPolicy::COPY_PAYLOAD. For the specific non-payload owning type * the default policy is set to Producer::PayloadPolicy::COPY_PAYLOAD. For the specific non-payload owning type
* cppkafka::Buffer the policy is Producer::PayloadPolicy::PASSTHROUGH_PAYLOAD. In this case, librdkafka * cppkafka::Buffer the policy is Producer::PayloadPolicy::PASSTHROUGH_PAYLOAD. In this case, librdkafka
* shall not make any internal copies of the message and it is the application's responsability to free * shall not make any internal copies of the message and it is the application's responsibility to free
* the messages *after* the ProduceSuccessCallback has reported a successful delivery to avoid memory * the messages *after* the ProduceSuccessCallback has reported a successful delivery to avoid memory
* corruptions. * corruptions.
*/ */
@@ -487,23 +487,72 @@ private:
enum class QueueKind { Retry, Regular }; enum class QueueKind { Retry, Regular };
enum class FlushAction { DontFlush, DoFlush }; enum class FlushAction { DontFlush, DoFlush };
// Simple RAII type which increments a counter on construction and
// decrements it on destruction, meant to be used as reference counting.
template <typename T> template <typename T>
struct CounterGuard{ struct CounterGuard{
CounterGuard(std::atomic<T>& counter) : counter_(counter) { ++counter_; } CounterGuard(std::atomic<T>& counter)
: counter_(counter) {
++counter_;
}
~CounterGuard() { --counter_; } ~CounterGuard() { --counter_; }
std::atomic<T>& counter_; std::atomic<T>& counter_;
}; };
// If the application enables retry logic, this object is passed
// as internal (opaque) data with each message, so that it can keep
// track of each failed attempt. Only a single tracker will be
// instantiated and it's lifetime will be the same as the message it
// belongs to.
struct Tracker : public Internal { struct Tracker : public Internal {
Tracker(SenderType sender, size_t num_retries) Tracker(SenderType sender, size_t num_retries)
: sender_(sender), num_retries_(num_retries) : sender_(sender),
{} num_retries_(num_retries) {
std::future<bool> get_new_future() {
should_retry_ = std::promise<bool>(); //reset shared data
return should_retry_.get_future(); //issue new future
} }
// Creates a new promise for synchronizing with the
// on_delivery_report() callback. For synchronous producers only.
void prepare_to_retry() {
if (sender_ == SenderType::Sync) {
retry_promise_ = std::promise<bool>();
}
}
// Waits for the on_delivery_report() callback and determines if this message
// should be retried. This call will block until on_delivery_report() executes.
// For synchronous producers only.
bool retry_again() {
if (sender_ == SenderType::Sync) {
return retry_promise_.get_future().get();
}
}
// Signal the synchronous producer if the message should be retried or not.
// Called from inside on_delivery_report(). For synchronous producers only.
void should_retry(bool value) const {
if (sender_ == SenderType::Sync) {
try {
retry_promise_.set_value(value);
}
catch (const std::future_error&) {
//Promise has already been set once.
}
}
}
void set_sender_type(SenderType type) {
sender_ = type;
}
SenderType get_sender_type() const {
return sender_;
}
bool has_retries_left() const {
return num_retries_ > 0;
}
void decrement_retries() {
if (num_retries_ > 0) {
--num_retries_;
}
}
private:
SenderType sender_; SenderType sender_;
std::promise<bool> should_retry_; mutable std::promise<bool> retry_promise_;
size_t num_retries_; size_t num_retries_;
}; };
using TrackerPtr = std::shared_ptr<Tracker>; using TrackerPtr = std::shared_ptr<Tracker>;
@@ -511,13 +560,20 @@ private:
// Returns existing tracker or creates new one // Returns existing tracker or creates new one
template <typename BuilderType> template <typename BuilderType>
TrackerPtr add_tracker(SenderType sender, BuilderType& builder) { TrackerPtr add_tracker(SenderType sender, BuilderType& builder) {
if (has_internal_data_) { if (enable_message_retries_) {
if (!builder.internal()) { if (!builder.internal()) {
// Add message tracker only if it hasn't been added before // Add message tracker only if it hasn't been added before
builder.internal(std::make_shared<Tracker>(sender, max_number_retries_)); builder.internal(std::make_shared<Tracker>(sender, max_number_retries_));
}
return std::static_pointer_cast<Tracker>(builder.internal()); return std::static_pointer_cast<Tracker>(builder.internal());
} }
// Return existing tracker
TrackerPtr tracker = std::static_pointer_cast<Tracker>(builder.internal());
// Update the sender type. Since a message could have been initially produced
// asynchronously but then flushed synchronously (or vice-versa), the sender
// type should always reflect the latest retry mechanism.
tracker->set_sender_type(sender);
return tracker;
}
return nullptr; return nullptr;
} }
template <typename BuilderType> template <typename BuilderType>
@@ -549,7 +605,7 @@ private:
std::atomic<size_t> total_messages_produced_{0}; std::atomic<size_t> total_messages_produced_{0};
std::atomic<size_t> total_messages_dropped_{0}; std::atomic<size_t> total_messages_dropped_{0};
int max_number_retries_{0}; int max_number_retries_{0};
bool has_internal_data_{false}; bool enable_message_retries_{false};
QueueFullNotification queue_full_notification_{QueueFullNotification::None}; QueueFullNotification queue_full_notification_{QueueFullNotification::None};
#ifdef KAFKA_TEST_INSTANCE #ifdef KAFKA_TEST_INSTANCE
TestParameters* test_params_; TestParameters* test_params_;
@@ -586,12 +642,16 @@ void BufferedProducer<BufferType, Allocator>::add_message(const MessageBuilder&
template <typename BufferType, typename Allocator> template <typename BufferType, typename Allocator>
void BufferedProducer<BufferType, Allocator>::add_message(Builder builder) { void BufferedProducer<BufferType, Allocator>::add_message(Builder builder) {
add_tracker(SenderType::Async, builder); add_tracker(SenderType::Async, builder);
//post message unto the producer queue
do_add_message(move(builder), QueueKind::Regular, FlushAction::DoFlush); do_add_message(move(builder), QueueKind::Regular, FlushAction::DoFlush);
} }
template <typename BufferType, typename Allocator> template <typename BufferType, typename Allocator>
void BufferedProducer<BufferType, Allocator>::produce(const MessageBuilder& builder) { void BufferedProducer<BufferType, Allocator>::produce(const MessageBuilder& builder) {
if (has_internal_data_) { if (enable_message_retries_) {
//Adding a retry tracker requires copying the builder since
//we cannot modify the original instance. Cloning is a fast operation
//since the MessageBuilder class holds pointers to data only.
MessageBuilder builder_clone(builder.clone()); MessageBuilder builder_clone(builder.clone());
add_tracker(SenderType::Async, builder_clone); add_tracker(SenderType::Async, builder_clone);
async_produce(builder_clone, true); async_produce(builder_clone, true);
@@ -603,17 +663,19 @@ void BufferedProducer<BufferType, Allocator>::produce(const MessageBuilder& buil
template <typename BufferType, typename Allocator> template <typename BufferType, typename Allocator>
void BufferedProducer<BufferType, Allocator>::sync_produce(const MessageBuilder& builder) { void BufferedProducer<BufferType, Allocator>::sync_produce(const MessageBuilder& builder) {
if (has_internal_data_) { if (enable_message_retries_) {
//Adding a retry tracker requires copying the builder since
//we cannot modify the original instance. Cloning is a fast operation
//since the MessageBuilder class holds pointers to data only.
MessageBuilder builder_clone(builder.clone()); MessageBuilder builder_clone(builder.clone());
TrackerPtr tracker = add_tracker(SenderType::Sync, builder_clone); TrackerPtr tracker = add_tracker(SenderType::Sync, builder_clone);
// produce until we succeed or we reach max retry limit // produce until we succeed or we reach max retry limit
std::future<bool> should_retry;
do { do {
should_retry = tracker->get_new_future(); tracker->prepare_to_retry();
produce_message(builder_clone); produce_message(builder_clone);
wait_for_acks(); wait_for_acks();
} }
while (should_retry.get()); while (tracker->retry_again());
} }
else { else {
// produce once // produce once
@@ -640,9 +702,14 @@ void BufferedProducer<BufferType, Allocator>::async_flush() {
flush_queue.pop_front(); flush_queue.pop_front();
} }
}; };
//Produce retry queue first since these messages were produced first
queue_flusher(retry_messages_, retry_mutex_); queue_flusher(retry_messages_, retry_mutex_);
//Produce recently enqueued messages
queue_flusher(messages_, mutex_); queue_flusher(messages_, mutex_);
wait_for_acks(std::chrono::milliseconds(0)); //flush the producer but don't wait // Flush the producer but don't wait. It is necessary to poll
// the producer at least once during this operation because
// async_produce() will not.
wait_for_acks(std::chrono::milliseconds(0));
} }
template <typename BufferType, typename Allocator> template <typename BufferType, typename Allocator>
@@ -653,16 +720,19 @@ void BufferedProducer<BufferType, Allocator>::flush(bool preserve_order) {
{ {
QueueType flush_queue; // flush from temporary queue QueueType flush_queue; // flush from temporary queue
swap_queues(queue, flush_queue, mutex); swap_queues(queue, flush_queue, mutex);
//Produce one message at a time and wait for acks until queue is empty
while (!flush_queue.empty()) { while (!flush_queue.empty()) {
sync_produce(flush_queue.front()); sync_produce(flush_queue.front());
flush_queue.pop_front(); flush_queue.pop_front();
} }
}; };
//Produce retry queue first since these messages were produced first
queue_flusher(retry_messages_, retry_mutex_); queue_flusher(retry_messages_, retry_mutex_);
//Produce recently enqueued messages
queue_flusher(messages_, mutex_); queue_flusher(messages_, mutex_);
} }
else { else {
//Produce all messages at once then wait for acks.
async_flush(); async_flush();
wait_for_acks(); wait_for_acks();
} }
@@ -680,6 +750,7 @@ bool BufferedProducer<BufferType, Allocator>::flush(std::chrono::milliseconds ti
auto queue_flusher = [this](QueueType& queue)->bool auto queue_flusher = [this](QueueType& queue)->bool
{ {
//Produce one message at a time and wait for acks
if (!queue.empty()) { if (!queue.empty()) {
sync_produce(queue.front()); sync_produce(queue.front());
queue.pop_front(); queue.pop_front();
@@ -698,20 +769,22 @@ bool BufferedProducer<BufferType, Allocator>::flush(std::chrono::milliseconds ti
(std::chrono::high_resolution_clock::now() - start_time); (std::chrono::high_resolution_clock::now() - start_time);
} while (remaining.count() > 0); } while (remaining.count() > 0);
// Re-enqueue remaining messages in original order // When timeout has expired, any remaining messages must be re-enqueue in their
auto re_enqueuer = [this](QueueType& src_queue, QueueType& dst_queue, std::mutex & mutex)->void // original order so they can be flushed later.
auto re_enqueuer = [this](QueueType& src_queue, QueueType& dst_queue, std::mutex & mutex)->bool
{ {
if (!src_queue.empty()) { if (!src_queue.empty()) {
std::lock_guard<std::mutex> lock(mutex); std::lock_guard<std::mutex> lock(mutex);
dst_queue.insert(dst_queue.begin(), dst_queue.insert(dst_queue.begin(),
std::make_move_iterator(src_queue.begin()), std::make_move_iterator(src_queue.begin()),
std::make_move_iterator(src_queue.end())); std::make_move_iterator(src_queue.end()));
}
};
re_enqueuer(retry_flush_queue, retry_messages_, retry_mutex_);
re_enqueuer(flush_queue, messages_, mutex_);
return true; return true;
} }
return false;
};
return !re_enqueuer(retry_flush_queue, retry_messages_, retry_mutex_) &&
!re_enqueuer(flush_queue, messages_, mutex_);
}
else { else {
async_flush(); async_flush();
return wait_for_acks(timeout); return wait_for_acks(timeout);
@@ -820,10 +893,12 @@ void BufferedProducer<BufferType, Allocator>::do_add_message(BuilderType&& build
std::lock_guard<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(mutex_);
messages_.emplace_back(std::forward<BuilderType>(builder)); messages_.emplace_back(std::forward<BuilderType>(builder));
} }
// Flush the queues only if a regular message is added. Retry messages may be added // Flush the queues only if a regular message is added. Retry messages may be added
// from rdkafka callbacks, and flush/async_flush is a user-level call // from on_delivery_report() during which flush()/async_flush() cannot be called.
if (queue_kind == QueueKind::Regular && flush_action == FlushAction::DoFlush && (max_buffer_size_ >= 0) && (max_buffer_size_ <= (ssize_t)get_buffer_size())) { if (queue_kind == QueueKind::Regular &&
flush_action == FlushAction::DoFlush &&
(max_buffer_size_ >= 0) &&
(max_buffer_size_ <= (ssize_t)get_buffer_size())) {
if (flush_method_ == FlushMethod::Sync) { if (flush_method_ == FlushMethod::Sync) {
flush(); flush();
} }
@@ -865,8 +940,8 @@ size_t BufferedProducer<BufferType, Allocator>::get_flushes_in_progress() const
template <typename BufferType, typename Allocator> template <typename BufferType, typename Allocator>
void BufferedProducer<BufferType, Allocator>::set_max_number_retries(size_t max_number_retries) { void BufferedProducer<BufferType, Allocator>::set_max_number_retries(size_t max_number_retries) {
if (!has_internal_data_ && (max_number_retries > 0)) { if (!enable_message_retries_ && (max_number_retries > 0)) {
has_internal_data_ = true; //enable once enable_message_retries_ = true; //enable once
} }
max_number_retries_ = max_number_retries; max_number_retries_ = max_number_retries;
} }
@@ -969,8 +1044,11 @@ void BufferedProducer<BufferType, Allocator>::async_produce(BuilderType&& builde
CallbackInvoker<FlushFailureCallback> callback("flush failure", flush_failure_callback_, &producer_); CallbackInvoker<FlushFailureCallback> callback("flush failure", flush_failure_callback_, &producer_);
if (!callback || callback(builder, ex.get_error())) { if (!callback || callback(builder, ex.get_error())) {
TrackerPtr tracker = std::static_pointer_cast<Tracker>(builder.internal()); TrackerPtr tracker = std::static_pointer_cast<Tracker>(builder.internal());
if (tracker && tracker->num_retries_ > 0) { if (tracker && tracker->has_retries_left()) {
--tracker->num_retries_; tracker->decrement_retries();
//Post message unto the retry queue. This queue has higher priority and will be
//flushed before the producer queue to preserve original message order.
//We don't flush now since we just had an error while producing.
do_add_message(std::forward<BuilderType>(builder), QueueKind::Retry, FlushAction::DontFlush); do_add_message(std::forward<BuilderType>(builder), QueueKind::Retry, FlushAction::DontFlush);
return; return;
} }
@@ -995,24 +1073,30 @@ Configuration BufferedProducer<BufferType, Allocator>::prepare_configuration(Con
template <typename BufferType, typename Allocator> template <typename BufferType, typename Allocator>
void BufferedProducer<BufferType, Allocator>::on_delivery_report(const Message& message) { void BufferedProducer<BufferType, Allocator>::on_delivery_report(const Message& message) {
//Get tracker data
TestParameters* test_params = get_test_parameters(); TestParameters* test_params = get_test_parameters();
TrackerPtr tracker = has_internal_data_ ? //Get tracker if present
std::static_pointer_cast<Tracker>(MessageInternal::load(const_cast<Message&>(message))->get_internal()) : nullptr; TrackerPtr tracker =
bool should_retry = false; enable_message_retries_ ?
std::static_pointer_cast<Tracker>(MessageInternal::load(const_cast<Message&>(message))->get_internal()) :
nullptr;
bool retry = false;
if (message.get_error() || (test_params && test_params->force_delivery_error_)) { if (message.get_error() || (test_params && test_params->force_delivery_error_)) {
// We should produce this message again if we don't have a produce failure callback // We should produce this message again if we don't have a produce failure callback
// or we have one but it returns true // or we have one but it returns true (indicating error is re-tryable)
CallbackInvoker<ProduceFailureCallback> callback("produce failure", produce_failure_callback_, &producer_); CallbackInvoker<ProduceFailureCallback> callback("produce failure", produce_failure_callback_, &producer_);
if (!callback || callback(message)) { if (!callback || callback(message)) {
// Check if we have reached the maximum retry limit // Check if we have reached the maximum retry limit
if (tracker && tracker->num_retries_ > 0) { if (tracker && tracker->has_retries_left()) {
--tracker->num_retries_; tracker->decrement_retries();
if (tracker->sender_ == SenderType::Async) { //If the sender is asynchronous, the message is re-enqueued. If the sender is
// Re-enqueue for later retransmission with higher priority (i.e. front of the queue) //synchronous, we simply notify via Tracker::should_retry() below.
if (tracker->get_sender_type() == SenderType::Async) {
//Post message unto the retry queue. This queue has higher priority and will be
//flushed later by the application (before the producer queue) to preserve original message order.
//We prevent flushing now since we are within a callback context.
do_add_message(Builder(message), QueueKind::Retry, FlushAction::DontFlush); do_add_message(Builder(message), QueueKind::Retry, FlushAction::DontFlush);
} }
should_retry = true; retry = true;
} }
else { else {
++total_messages_dropped_; ++total_messages_dropped_;
@@ -1032,14 +1116,9 @@ void BufferedProducer<BufferType, Allocator>::on_delivery_report(const Message&
// Increment the total successful transmissions // Increment the total successful transmissions
++total_messages_produced_; ++total_messages_produced_;
} }
// Signal producers // Signal synchronous sender and unblock it since it's waiting for this ack to arrive.
if (tracker) { if (tracker) {
try { tracker->should_retry(retry);
tracker->should_retry_.set_value(should_retry);
}
catch (const std::future_error& ex) {
//This is an async retry and future is not being read
}
} }
// Decrement the expected acks and check to prevent underflow // Decrement the expected acks and check to prevent underflow
if (pending_acks_ > 0) { if (pending_acks_ > 0) {
@@ -1048,7 +1127,9 @@ void BufferedProducer<BufferType, Allocator>::on_delivery_report(const Message&
} }
template <typename BufferType, typename Allocator> template <typename BufferType, typename Allocator>
void BufferedProducer<BufferType, Allocator>::swap_queues(BufferedProducer<BufferType, Allocator>::QueueType & queue1, BufferedProducer<BufferType, Allocator>::QueueType & queue2, std::mutex & mutex) void BufferedProducer<BufferType, Allocator>::swap_queues(BufferedProducer<BufferType, Allocator>::QueueType & queue1,
BufferedProducer<BufferType, Allocator>::QueueType & queue2,
std::mutex & mutex)
{ {
std::lock_guard<std::mutex> lock(mutex); std::lock_guard<std::mutex> lock(mutex);
std::swap(queue1, queue2); std::swap(queue1, queue2);