From a4eefacaa1220ecce3964fcff240a28939c7f616 Mon Sep 17 00:00:00 2001 From: accelerated <> Date: Sun, 10 Jun 2018 10:14:00 -0400 Subject: [PATCH] concurrency issues in MessageBuilder internal data --- include/cppkafka/message_builder.h | 9 ++++++ include/cppkafka/message_internal.h | 12 ++++++-- include/cppkafka/utils/buffered_producer.h | 32 +++++++++++++--------- src/message.cpp | 4 +-- src/message_internal.cpp | 8 ++++++ 5 files changed, 47 insertions(+), 18 deletions(-) diff --git a/include/cppkafka/message_builder.h b/include/cppkafka/message_builder.h index 64e8d54..d09a602 100644 --- a/include/cppkafka/message_builder.h +++ b/include/cppkafka/message_builder.h @@ -348,6 +348,15 @@ public: void construct_buffer(Buffer& lhs, const T& rhs) { lhs = Buffer(rhs); } + + MessageBuilder clone() const { + return std::move(MessageBuilder(topic()). + key(Buffer(key().get_data(), key().get_size())). + payload(Buffer(payload().get_data(), payload().get_size())). + timestamp(timestamp()). + user_data(user_data()). + internal(internal())); + } }; /** diff --git a/include/cppkafka/message_internal.h b/include/cppkafka/message_internal.h index 266e145..06e9953 100644 --- a/include/cppkafka/message_internal.h +++ b/include/cppkafka/message_internal.h @@ -36,7 +36,8 @@ namespace cppkafka { class Message; -struct Internal { +class Internal { +public: virtual ~Internal() = default; }; using InternalPtr = std::shared_ptr; @@ -44,15 +45,20 @@ using InternalPtr = std::shared_ptr; /** * \brief Private message data structure */ -struct MessageInternal { +class MessageInternal { +public: MessageInternal(void* user_data, std::shared_ptr internal); static std::unique_ptr load(Message& message); + void* get_user_data() const; + InternalPtr get_internal() const; +private: void* user_data_; InternalPtr internal_; }; template -struct MessageInternalGuard { +class MessageInternalGuard { +public: MessageInternalGuard(BuilderType& builder) : builder_(builder), user_data_(builder.user_data()) { diff --git a/include/cppkafka/utils/buffered_producer.h b/include/cppkafka/utils/buffered_producer.h index bf4a2ea..347ca68 100644 --- a/include/cppkafka/utils/buffered_producer.h +++ b/include/cppkafka/utils/buffered_producer.h @@ -366,9 +366,6 @@ private: template TrackerPtr add_tracker(BuilderType& builder) { - if (!has_internal_data_ && (max_number_retries_ > 0)) { - has_internal_data_ = true; //enable once - } if (has_internal_data_ && !builder.internal()) { // Add message tracker only if it hasn't been added before TrackerPtr tracker = std::make_shared(SenderType::Async, max_number_retries_); @@ -426,8 +423,7 @@ BufferedProducer::BufferedProducer(Configuration config) template void BufferedProducer::add_message(const MessageBuilder& builder) { - add_tracker(const_cast(builder)); - do_add_message(builder, MessagePriority::Low, true); + add_message(Builder(builder)); //make ConcreteBuilder } template @@ -438,19 +434,26 @@ void BufferedProducer::add_message(Builder builder) { template void BufferedProducer::produce(const MessageBuilder& builder) { - add_tracker(const_cast(builder)); - async_produce(builder, true); + if (has_internal_data_) { + MessageBuilder builder_copy(builder.clone()); + add_tracker(builder_copy); + async_produce(builder_copy, true); + } + else { + async_produce(builder, true); + } } template void BufferedProducer::sync_produce(const MessageBuilder& builder) { - TrackerPtr tracker = add_tracker(const_cast(builder)); - if (tracker) { + if (has_internal_data_) { + MessageBuilder builder_copy(builder.clone()); + TrackerPtr tracker = add_tracker(builder_copy); // produce until we succeed or we reach max retry limit std::future should_retry; do { should_retry = tracker->get_new_future(); - produce_message(builder); + produce_message(builder_copy); wait_for_acks(); } while (should_retry.get()); @@ -576,6 +579,9 @@ size_t BufferedProducer::get_flushes_in_progress() const { template void BufferedProducer::set_max_number_retries(size_t max_number_retries) { + if (!has_internal_data_ && (max_number_retries > 0)) { + has_internal_data_ = true; //enable once + } max_number_retries_ = max_number_retries; } @@ -638,12 +644,12 @@ void BufferedProducer::async_produce(BuilderType&& builder, bool thr if (test_params && test_params->force_produce_error_) { throw HandleException(Error(RD_KAFKA_RESP_ERR_UNKNOWN)); } - produce_message(std::forward(builder)); + produce_message(builder); } catch (const HandleException& ex) { // If we have a flush failure callback and it returns true, we retry producing this message later CallbackInvoker callback("flush failure", flush_failure_callback_, &producer_); - if (!callback || callback(std::forward(builder), ex.get_error())) { + if (!callback || callback(builder, ex.get_error())) { TrackerPtr tracker = std::static_pointer_cast(builder.internal()); if (tracker && tracker->num_retries_ > 0) { --tracker->num_retries_; @@ -671,7 +677,7 @@ void BufferedProducer::on_delivery_report(const Message& message) { //Get tracker data TestParameters* test_params = get_test_parameters(); TrackerPtr tracker = has_internal_data_ ? - std::static_pointer_cast(MessageInternal::load(const_cast(message))->internal_) : nullptr; + std::static_pointer_cast(MessageInternal::load(const_cast(message))->get_internal()) : nullptr; bool should_retry = false; if (message.get_error() || (test_params && test_params->force_delivery_error_)) { // We should produce this message again if we don't have a produce failure callback diff --git a/src/message.cpp b/src/message.cpp index d2b3dbb..3ac6c07 100644 --- a/src/message.cpp +++ b/src/message.cpp @@ -68,8 +68,8 @@ Message::Message(HandlePtr handle) Message& Message::load_internal() { if (user_data_) { MessageInternal* mi = static_cast(user_data_); - user_data_ = mi->user_data_; - internal_ = mi->internal_; + user_data_ = mi->get_user_data(); + internal_ = mi->get_internal(); } return *this; } diff --git a/src/message_internal.cpp b/src/message_internal.cpp index c33d469..012b651 100644 --- a/src/message_internal.cpp +++ b/src/message_internal.cpp @@ -45,4 +45,12 @@ std::unique_ptr MessageInternal::load(Message& message) { static_cast(message.get_handle()->_private) : nullptr); } +void* MessageInternal::get_user_data() const { + return user_data_; +} + +InternalPtr MessageInternal::get_internal() const { + return internal_; +} + }