From f74665384158a3279c1d24338dfc1f4910ce06bc Mon Sep 17 00:00:00 2001 From: accelerated <> Date: Tue, 5 Jun 2018 09:07:00 -0400 Subject: [PATCH] Added logic to conditionally enable internal data --- include/cppkafka/cppkafka.h | 1 + include/cppkafka/message_internal.h | 22 ++------ include/cppkafka/producer.h | 7 +++ include/cppkafka/utils/buffered_producer.h | 62 ++++++++++++++-------- src/CMakeLists.txt | 1 + src/configuration.cpp | 2 +- src/message_internal.cpp | 49 +++++++++++++++++ src/producer.cpp | 38 +++++++------ 8 files changed, 125 insertions(+), 57 deletions(-) create mode 100644 src/message_internal.cpp diff --git a/include/cppkafka/cppkafka.h b/include/cppkafka/cppkafka.h index 2473d1d..c1c6885 100644 --- a/include/cppkafka/cppkafka.h +++ b/include/cppkafka/cppkafka.h @@ -44,6 +44,7 @@ #include #include #include +#include <cppkafka/message_internal.h> #include #include #include diff --git a/include/cppkafka/message_internal.h b/include/cppkafka/message_internal.h index 1e7402f..a71add1 100644 --- a/include/cppkafka/message_internal.h +++ b/include/cppkafka/message_internal.h @@ -35,6 +35,8 @@ namespace cppkafka { +class Producer; + struct Internal { virtual ~Internal() = default; }; @@ -44,25 +46,11 @@ using InternalPtr = std::shared_ptr<Internal>; /** * \brief Private message data structure */ class MessageInternal { - friend class Producer; - + friend Producer; public: - static std::unique_ptr<MessageInternal> load(Message& message) { - if (message.get_user_data()) { - // Unpack internal data - std::unique_ptr<MessageInternal> internal_data(static_cast<MessageInternal*>(message.get_user_data())); - message.load_internal(internal_data->user_data_, internal_data->internal_); - return internal_data; - } - return nullptr; - } - + static std::unique_ptr<MessageInternal> load(const Producer& producer, Message& message); private: - MessageInternal(void* user_data, std::shared_ptr<Internal> internal) - : user_data_(user_data), - internal_(internal) { - } - + MessageInternal(void* user_data, std::shared_ptr<Internal> internal); void* user_data_; InternalPtr internal_; }; diff --git 
a/include/cppkafka/producer.h b/include/cppkafka/producer.h index 358a0fc..a545c95 100644 --- a/include/cppkafka/producer.h +++ b/include/cppkafka/producer.h @@ -31,12 +31,14 @@ #define CPPKAFKA_PRODUCER_H #include <chrono> +#include <tuple> #include "kafka_handle_base.h" #include "configuration.h" #include "buffer.h" #include "topic.h" #include "macros.h" #include "message_builder.h" +#include "message_internal.h" namespace cppkafka { @@ -78,6 +80,7 @@ class Message; */ class CPPKAFKA_API Producer : public KafkaHandleBase { public: + friend MessageInternal; /** * The policy to use for the payload. The default policy is COPY_PAYLOAD */ @@ -156,7 +159,11 @@ public: */ void flush(std::chrono::milliseconds timeout); private: + using LoadResult = std::tuple<void*, std::unique_ptr<MessageInternal>>; + LoadResult load_internal(void* user_data, InternalPtr internal); + PayloadPolicy message_payload_policy_; + bool has_internal_data_; }; } // cppkafka diff --git a/include/cppkafka/utils/buffered_producer.h b/include/cppkafka/utils/buffered_producer.h index 35eaf39..cec143c 100644 --- a/include/cppkafka/utils/buffered_producer.h +++ b/include/cppkafka/utils/buffered_producer.h @@ -362,6 +362,21 @@ private: std::promise<bool> should_retry_; size_t num_retries_; }; + using TrackerPtr = std::shared_ptr<Tracker>; + + template <typename BuilderType> + TrackerPtr add_tracker(BuilderType& builder) { + if (!has_internal_data_ && (max_number_retries_ > 0)) { + has_internal_data_ = true; //enable once + } + if (has_internal_data_) { + // Add message tracker + TrackerPtr tracker = std::make_shared<Tracker>(SenderType::Async, max_number_retries_); + builder.internal(tracker); + return tracker; + } + return nullptr; + } template <typename BuilderType> void do_add_message(BuilderType&& builder, MessagePriority priority, bool do_flush); @@ -385,7 +400,8 @@ private: std::atomic flushes_in_progress_{0}; std::atomic total_messages_produced_{0}; std::atomic total_messages_dropped_{0}; - int max_number_retries_{5}; + int max_number_retries_{0}; + bool has_internal_data_{false}; #ifdef KAFKA_TEST_INSTANCE 
TestParameters* test_params_; #endif @@ -412,40 +428,40 @@ BufferedProducer<BufferType>::BufferedProducer(Configuration config) template <typename BufferType> void BufferedProducer<BufferType>::add_message(const MessageBuilder& builder) { - // Add message tracker - std::shared_ptr<Tracker> tracker = std::make_shared<Tracker>(SenderType::Async, max_number_retries_); - const_cast<MessageBuilder&>(builder).internal(tracker); + add_tracker(const_cast<MessageBuilder&>(builder)); do_add_message(builder, MessagePriority::Low, true); } template <typename BufferType> void BufferedProducer<BufferType>::add_message(Builder builder) { - // Add message tracker - std::shared_ptr<Tracker> tracker = std::make_shared<Tracker>(SenderType::Async, max_number_retries_); - const_cast<MessageBuilder&>(builder).internal(tracker); + add_tracker(builder); do_add_message(move(builder), MessagePriority::Low, true); } template <typename BufferType> void BufferedProducer<BufferType>::produce(const MessageBuilder& builder) { - // Add message tracker - std::shared_ptr<Tracker> tracker = std::make_shared<Tracker>(SenderType::Async, max_number_retries_); - const_cast<MessageBuilder&>(builder).internal(tracker); + add_tracker(const_cast<MessageBuilder&>(builder)); async_produce(builder, true); } template <typename BufferType> void BufferedProducer<BufferType>::sync_produce(const MessageBuilder& builder) { - // Add message tracker - std::shared_ptr<Tracker> tracker = std::make_shared<Tracker>(SenderType::Async, max_number_retries_); - const_cast<MessageBuilder&>(builder).internal(tracker); - std::future<bool> should_retry; - do { - should_retry = tracker->get_new_future(); + TrackerPtr tracker = add_tracker(const_cast<MessageBuilder&>(builder)); + if (tracker) { + // produce until we succeed or we reach max retry limit + std::future<bool> should_retry; + do { + should_retry = tracker->get_new_future(); + produce_message(builder); + wait_for_acks(); + } + while (should_retry.get()); + } + else { + // produce once produce_message(builder); wait_for_acks(); } - while (should_retry.get()); } template <typename BufferType> @@ -634,8 +650,8 @@ void BufferedProducer<BufferType>::async_produce(MessageType&& message, bool thr // If we have a flush failure callback and it returns true, we retry producing this message later CallbackInvoker callback("flush failure", 
flush_failure_callback_, &producer_); if (!callback || callback(std::forward<MessageType>(message), ex.get_error())) { - std::shared_ptr<Tracker> tracker = std::static_pointer_cast<Tracker>(message.internal()); - if (tracker->num_retries_ > 0) { + TrackerPtr tracker = std::static_pointer_cast<Tracker>(message.internal()); + if (tracker && tracker->num_retries_ > 0) { --tracker->num_retries_; do_add_message(std::forward<MessageType>(message), MessagePriority::High, false); return; @@ -660,7 +676,7 @@ template <typename BufferType> void BufferedProducer<BufferType>::on_delivery_report(const Message& message) { //Get tracker data TestParameters* test_params = get_test_parameters(); - std::shared_ptr<Tracker> tracker = std::static_pointer_cast<Tracker>(message.internal()); + TrackerPtr tracker = std::static_pointer_cast<Tracker>(message.internal()); bool should_retry = false; if (message.get_error() || (test_params && test_params->force_delivery_error_)) { // We should produce this message again if we don't have a produce failure callback @@ -668,7 +684,7 @@ void BufferedProducer<BufferType>::on_delivery_report(const Message& message) { CallbackInvoker callback("produce failure", produce_failure_callback_, &producer_); if (!callback || callback(message)) { // Check if we have reached the maximum retry limit - if (tracker->num_retries_ > 0) { + if (tracker && tracker->num_retries_ > 0) { --tracker->num_retries_; if (tracker->sender_ == SenderType::Async) { // Re-enqueue for later retransmission with higher priority (i.e. 
front of the queue) @@ -691,7 +707,9 @@ void BufferedProducer<BufferType>::on_delivery_report(const Message& message) { ++total_messages_produced_; } // Signal producers - tracker->should_retry_.set_value(should_retry); + if (tracker) { + tracker->should_retry_.set_value(should_retry); + } // Decrement the expected acks --pending_acks_; assert(pending_acks_ != (size_t)-1); // Prevent underflow diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 2e893a8..1525b1c 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -7,6 +7,7 @@ set(SOURCES buffer.cpp queue.cpp message.cpp + message_internal.cpp topic_partition.cpp topic_partition_list.cpp metadata.cpp diff --git a/src/configuration.cpp b/src/configuration.cpp index 43a2851..1783660 100644 --- a/src/configuration.cpp +++ b/src/configuration.cpp @@ -52,7 +52,7 @@ namespace cppkafka { void delivery_report_callback_proxy(rd_kafka_t*, const rd_kafka_message_t* msg, void *opaque) { Producer* handle = static_cast<Producer*>(opaque); Message message = Message::make_non_owning((rd_kafka_message_t*)msg); - unique_ptr<MessageInternal> internal_data(MessageInternal::load(message)); + unique_ptr<MessageInternal> internal_data(MessageInternal::load(*handle, message)); CallbackInvoker<Configuration::DeliveryReportCallback> ("delivery report", handle->get_configuration().get_delivery_report_callback(), handle) (*handle, message); diff --git a/src/message_internal.cpp b/src/message_internal.cpp new file mode 100644 index 0000000..a385377 --- /dev/null +++ b/src/message_internal.cpp @@ -0,0 +1,49 @@ +/* + * Copyright (c) 2017, Matias Fontanini + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. 
+ * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
+ * + */ +#include "message_internal.h" +#include "producer.h" + +namespace cppkafka { + +MessageInternal::MessageInternal(void* user_data, std::shared_ptr<Internal> internal) +: user_data_(user_data), + internal_(internal) { +} + +std::unique_ptr<MessageInternal> MessageInternal::load(const Producer& producer, Message& message) { + if (producer.has_internal_data_ && message.get_user_data()) { + // Unpack internal data + std::unique_ptr<MessageInternal> internal_data(static_cast<MessageInternal*>(message.get_user_data())); + message.load_internal(internal_data->user_data_, internal_data->internal_); + return internal_data; + } + return nullptr; +} + +} diff --git a/src/producer.cpp b/src/producer.cpp index d1c4ddd..bb8affb 100644 --- a/src/producer.cpp +++ b/src/producer.cpp @@ -37,11 +37,13 @@ using std::move; using std::string; using std::chrono::milliseconds; using std::unique_ptr; +using std::get; namespace cppkafka { Producer::Producer(Configuration config) -: KafkaHandleBase(move(config)), message_payload_policy_(PayloadPolicy::COPY_PAYLOAD) { +: KafkaHandleBase(move(config)), message_payload_policy_(PayloadPolicy::COPY_PAYLOAD), + has_internal_data_(false) { char error_buffer[512]; auto config_handle = get_configuration().get_handle(); rd_kafka_conf_set_opaque(config_handle, this); @@ -67,12 +69,7 @@ void Producer::produce(const MessageBuilder& builder) { const Buffer& payload = builder.payload(); const Buffer& key = builder.key(); const int policy = static_cast<int>(message_payload_policy_); - void* opaque = builder.user_data(); - unique_ptr<MessageInternal> internal_data; - if (get_configuration().get_delivery_report_callback()) { - internal_data.reset(new MessageInternal(builder.user_data(), builder.internal())); - opaque = internal_data.get(); - } + LoadResult load_result = load_internal(builder.user_data(), builder.internal()); auto result = rd_kafka_producev(get_handle(), RD_KAFKA_V_TOPIC(builder.topic().data()), RD_KAFKA_V_PARTITION(builder.partition()), @@ -80,10 +77,10 @@ 
RD_KAFKA_V_TIMESTAMP(builder.timestamp().count()), RD_KAFKA_V_KEY((void*)key.get_data(), key.get_size()), RD_KAFKA_V_VALUE((void*)payload.get_data(), payload.get_size()), - RD_KAFKA_V_OPAQUE(opaque), + RD_KAFKA_V_OPAQUE(get<0>(load_result)), RD_KAFKA_V_END); check_error(result); - internal_data.release(); //data has been passed-on to rdkafka so we release ownership + get<1>(load_result).release(); //data has been passed-on to rdkafka so we release ownership } void Producer::produce(const Message& message) { @@ -91,12 +88,7 @@ const Buffer& key = message.get_key(); const int policy = static_cast<int>(message_payload_policy_); int64_t duration = message.get_timestamp() ? message.get_timestamp().get().get_timestamp().count() : 0; - void* opaque = message.get_user_data(); - unique_ptr<MessageInternal> internal_data; - if (get_configuration().get_delivery_report_callback()) { - internal_data.reset(new MessageInternal(message.get_user_data(), message.internal())); - opaque = internal_data.get(); - } + LoadResult load_result = load_internal(message.get_user_data(), message.internal()); auto result = rd_kafka_producev(get_handle(), RD_KAFKA_V_TOPIC(message.get_topic().data()), RD_KAFKA_V_PARTITION(message.get_partition()), @@ -104,10 +96,10 @@ RD_KAFKA_V_TIMESTAMP(duration), RD_KAFKA_V_KEY((void*)key.get_data(), key.get_size()), RD_KAFKA_V_VALUE((void*)payload.get_data(), payload.get_size()), - RD_KAFKA_V_OPAQUE(opaque), + RD_KAFKA_V_OPAQUE(get<0>(load_result)), RD_KAFKA_V_END); check_error(result); - internal_data.release(); //data has been passed-on to rdkafka so we release ownership + get<1>(load_result).release(); //data has been passed-on to rdkafka so we release ownership } int Producer::poll() { @@ -127,4 +119,16 @@ check_error(result); } +Producer::LoadResult Producer::load_internal(void* user_data, InternalPtr internal) { + unique_ptr<MessageInternal>
internal_data; + if (!has_internal_data_ && internal) { + has_internal_data_ = true; //enable once for this producer + } + if (has_internal_data_ && get_configuration().get_delivery_report_callback()) { + internal_data.reset(new MessageInternal(user_data, internal)); + user_data = internal_data.get(); //point to the internal data + } + return LoadResult(user_data, move(internal_data)); +} + } // cppkafka