From 597c026555aa1a2aa3b26fb7e30bfa0137b62973 Mon Sep 17 00:00:00 2001 From: accelerated <> Date: Mon, 4 Jun 2018 17:12:07 -0400 Subject: [PATCH] Prevent MessageInternal structures if there is no delivery callback registered --- src/producer.cpp | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/src/producer.cpp b/src/producer.cpp index e533c35..d1c4ddd 100644 --- a/src/producer.cpp +++ b/src/producer.cpp @@ -67,7 +67,12 @@ void Producer::produce(const MessageBuilder& builder) { const Buffer& payload = builder.payload(); const Buffer& key = builder.key(); const int policy = static_cast(message_payload_policy_); - unique_ptr internal_data(new MessageInternal(builder.user_data(), builder.internal())); + void* opaque = builder.user_data(); + unique_ptr internal_data; + if (get_configuration().get_delivery_report_callback()) { + internal_data.reset(new MessageInternal(builder.user_data(), builder.internal())); + opaque = internal_data.get(); + } auto result = rd_kafka_producev(get_handle(), RD_KAFKA_V_TOPIC(builder.topic().data()), RD_KAFKA_V_PARTITION(builder.partition()), @@ -75,7 +80,7 @@ void Producer::produce(const MessageBuilder& builder) { RD_KAFKA_V_TIMESTAMP(builder.timestamp().count()), RD_KAFKA_V_KEY((void*)key.get_data(), key.get_size()), RD_KAFKA_V_VALUE((void*)payload.get_data(), payload.get_size()), - RD_KAFKA_V_OPAQUE(internal_data.get()), + RD_KAFKA_V_OPAQUE(opaque), RD_KAFKA_V_END); check_error(result); internal_data.release(); //data has been passed-on to rdkafka so we release ownership @@ -86,7 +91,12 @@ void Producer::produce(const Message& message) { const Buffer& key = message.get_key(); const int policy = static_cast(message_payload_policy_); int64_t duration = message.get_timestamp() ? message.get_timestamp().get().get_timestamp().count() : 0; - unique_ptr internal_data(new MessageInternal(message.get_user_data(), message.internal())); + void* opaque = message.get_user_data(); + unique_ptr internal_data; + if (get_configuration().get_delivery_report_callback()) { + internal_data.reset(new MessageInternal(message.get_user_data(), message.internal())); + opaque = internal_data.get(); + } auto result = rd_kafka_producev(get_handle(), RD_KAFKA_V_TOPIC(message.get_topic().data()), RD_KAFKA_V_PARTITION(message.get_partition()), @@ -94,7 +104,7 @@ void Producer::produce(const Message& message) { RD_KAFKA_V_TIMESTAMP(duration), RD_KAFKA_V_KEY((void*)key.get_data(), key.get_size()), RD_KAFKA_V_VALUE((void*)payload.get_data(), payload.get_size()), - RD_KAFKA_V_OPAQUE(internal_data.get()), + RD_KAFKA_V_OPAQUE(opaque), RD_KAFKA_V_END); check_error(result); internal_data.release(); //data has been passed-on to rdkafka so we release ownership