Prevent MessageInternal structures if there is no delivery callback registered

This commit is contained in:
accelerated
2018-06-04 17:12:07 -04:00
parent 71e6e2e4e5
commit 597c026555

View File

@@ -67,7 +67,12 @@ void Producer::produce(const MessageBuilder& builder) {
const Buffer& payload = builder.payload();
const Buffer& key = builder.key();
const int policy = static_cast<int>(message_payload_policy_);
unique_ptr<MessageInternal> internal_data(new MessageInternal(builder.user_data(), builder.internal()));
void* opaque = builder.user_data();
unique_ptr<MessageInternal> internal_data;
if (get_configuration().get_delivery_report_callback()) {
internal_data.reset(new MessageInternal(builder.user_data(), builder.internal()));
opaque = internal_data.get();
}
auto result = rd_kafka_producev(get_handle(),
RD_KAFKA_V_TOPIC(builder.topic().data()),
RD_KAFKA_V_PARTITION(builder.partition()),
@@ -75,7 +80,7 @@ void Producer::produce(const MessageBuilder& builder) {
RD_KAFKA_V_TIMESTAMP(builder.timestamp().count()),
RD_KAFKA_V_KEY((void*)key.get_data(), key.get_size()),
RD_KAFKA_V_VALUE((void*)payload.get_data(), payload.get_size()),
RD_KAFKA_V_OPAQUE(internal_data.get()),
RD_KAFKA_V_OPAQUE(opaque),
RD_KAFKA_V_END);
check_error(result);
internal_data.release(); //data has been passed-on to rdkafka so we release ownership
@@ -86,7 +91,12 @@ void Producer::produce(const Message& message) {
const Buffer& key = message.get_key();
const int policy = static_cast<int>(message_payload_policy_);
int64_t duration = message.get_timestamp() ? message.get_timestamp().get().get_timestamp().count() : 0;
unique_ptr<MessageInternal> internal_data(new MessageInternal(message.get_user_data(), message.internal()));
void* opaque = message.get_user_data();
unique_ptr<MessageInternal> internal_data;
if (get_configuration().get_delivery_report_callback()) {
internal_data.reset(new MessageInternal(message.get_user_data(), message.internal()));
opaque = internal_data.get();
}
auto result = rd_kafka_producev(get_handle(),
RD_KAFKA_V_TOPIC(message.get_topic().data()),
RD_KAFKA_V_PARTITION(message.get_partition()),
@@ -94,7 +104,7 @@ void Producer::produce(const Message& message) {
RD_KAFKA_V_TIMESTAMP(duration),
RD_KAFKA_V_KEY((void*)key.get_data(), key.get_size()),
RD_KAFKA_V_VALUE((void*)payload.get_data(), payload.get_size()),
RD_KAFKA_V_OPAQUE(internal_data.get()),
RD_KAFKA_V_OPAQUE(opaque),
RD_KAFKA_V_END);
check_error(result);
internal_data.release(); //data has been passed-on to rdkafka so we release ownership