Added logic to conditionally enable internal data

This commit is contained in:
accelerated
2018-06-05 09:07:00 -04:00
parent 597c026555
commit f746653841
8 changed files with 125 additions and 57 deletions

View File

@@ -44,6 +44,7 @@
#include <cppkafka/macros.h> #include <cppkafka/macros.h>
#include <cppkafka/message.h> #include <cppkafka/message.h>
#include <cppkafka/message_builder.h> #include <cppkafka/message_builder.h>
#include <cppkafka/message_internal.h>
#include <cppkafka/metadata.h> #include <cppkafka/metadata.h>
#include <cppkafka/producer.h> #include <cppkafka/producer.h>
#include <cppkafka/queue.h> #include <cppkafka/queue.h>

View File

@@ -35,6 +35,8 @@
namespace cppkafka { namespace cppkafka {
class Producer;
struct Internal { struct Internal {
virtual ~Internal() = default; virtual ~Internal() = default;
}; };
@@ -44,25 +46,11 @@ using InternalPtr = std::shared_ptr<Internal>;
* \brief Private message data structure * \brief Private message data structure
*/ */
class MessageInternal { class MessageInternal {
friend class Producer; friend Producer;
public: public:
static std::unique_ptr<MessageInternal> load(Message& message) { static std::unique_ptr<MessageInternal> load(const Producer& producer, Message& message);
if (message.get_user_data()) {
// Unpack internal data
std::unique_ptr<MessageInternal> internal_data(static_cast<MessageInternal*>(message.get_user_data()));
message.load_internal(internal_data->user_data_, internal_data->internal_);
return internal_data;
}
return nullptr;
}
private: private:
MessageInternal(void* user_data, std::shared_ptr<Internal> internal) MessageInternal(void* user_data, std::shared_ptr<Internal> internal);
: user_data_(user_data),
internal_(internal) {
}
void* user_data_; void* user_data_;
InternalPtr internal_; InternalPtr internal_;
}; };

View File

@@ -31,12 +31,14 @@
#define CPPKAFKA_PRODUCER_H #define CPPKAFKA_PRODUCER_H
#include <memory> #include <memory>
#include <tuple>
#include "kafka_handle_base.h" #include "kafka_handle_base.h"
#include "configuration.h" #include "configuration.h"
#include "buffer.h" #include "buffer.h"
#include "topic.h" #include "topic.h"
#include "macros.h" #include "macros.h"
#include "message_builder.h" #include "message_builder.h"
#include "message_internal.h"
namespace cppkafka { namespace cppkafka {
@@ -78,6 +80,7 @@ class Message;
*/ */
class CPPKAFKA_API Producer : public KafkaHandleBase { class CPPKAFKA_API Producer : public KafkaHandleBase {
public: public:
friend MessageInternal;
/** /**
* The policy to use for the payload. The default policy is COPY_PAYLOAD * The policy to use for the payload. The default policy is COPY_PAYLOAD
*/ */
@@ -156,7 +159,11 @@ public:
*/ */
void flush(std::chrono::milliseconds timeout); void flush(std::chrono::milliseconds timeout);
private: private:
using LoadResult = std::tuple<void*, std::unique_ptr<MessageInternal>>;
LoadResult load_internal(void* user_data, InternalPtr internal);
PayloadPolicy message_payload_policy_; PayloadPolicy message_payload_policy_;
bool has_internal_data_;
}; };
} // cppkafka } // cppkafka

View File

@@ -362,6 +362,21 @@ private:
std::promise<bool> should_retry_; std::promise<bool> should_retry_;
size_t num_retries_; size_t num_retries_;
}; };
using TrackerPtr = std::shared_ptr<Tracker>;
template <typename BuilderType>
TrackerPtr add_tracker(BuilderType& builder) {
if (!has_internal_data_ && (max_number_retries_ > 0)) {
has_internal_data_ = true; //enable once
}
if (has_internal_data_) {
// Add message tracker
TrackerPtr tracker = std::make_shared<Tracker>(SenderType::Async, max_number_retries_);
builder.internal(tracker);
return tracker;
}
return nullptr;
}
template <typename BuilderType> template <typename BuilderType>
void do_add_message(BuilderType&& builder, MessagePriority priority, bool do_flush); void do_add_message(BuilderType&& builder, MessagePriority priority, bool do_flush);
@@ -385,7 +400,8 @@ private:
std::atomic<size_t> flushes_in_progress_{0}; std::atomic<size_t> flushes_in_progress_{0};
std::atomic<size_t> total_messages_produced_{0}; std::atomic<size_t> total_messages_produced_{0};
std::atomic<size_t> total_messages_dropped_{0}; std::atomic<size_t> total_messages_dropped_{0};
int max_number_retries_{5}; int max_number_retries_{0};
bool has_internal_data_{false};
#ifdef KAFKA_TEST_INSTANCE #ifdef KAFKA_TEST_INSTANCE
TestParameters* test_params_; TestParameters* test_params_;
#endif #endif
@@ -412,40 +428,40 @@ BufferedProducer<BufferType>::BufferedProducer(Configuration config)
template <typename BufferType> template <typename BufferType>
void BufferedProducer<BufferType>::add_message(const MessageBuilder& builder) { void BufferedProducer<BufferType>::add_message(const MessageBuilder& builder) {
// Add message tracker add_tracker(const_cast<MessageBuilder&>(builder));
std::shared_ptr<Tracker> tracker = std::make_shared<Tracker>(SenderType::Async, max_number_retries_);
const_cast<MessageBuilder&>(builder).internal(tracker);
do_add_message(builder, MessagePriority::Low, true); do_add_message(builder, MessagePriority::Low, true);
} }
template <typename BufferType> template <typename BufferType>
void BufferedProducer<BufferType>::add_message(Builder builder) { void BufferedProducer<BufferType>::add_message(Builder builder) {
// Add message tracker add_tracker(builder);
std::shared_ptr<Tracker> tracker = std::make_shared<Tracker>(SenderType::Async, max_number_retries_);
const_cast<Builder&>(builder).internal(tracker);
do_add_message(move(builder), MessagePriority::Low, true); do_add_message(move(builder), MessagePriority::Low, true);
} }
template <typename BufferType> template <typename BufferType>
void BufferedProducer<BufferType>::produce(const MessageBuilder& builder) { void BufferedProducer<BufferType>::produce(const MessageBuilder& builder) {
// Add message tracker add_tracker(const_cast<MessageBuilder&>(builder));
std::shared_ptr<Tracker> tracker = std::make_shared<Tracker>(SenderType::Async, max_number_retries_);
const_cast<MessageBuilder&>(builder).internal(tracker);
async_produce(builder, true); async_produce(builder, true);
} }
template <typename BufferType> template <typename BufferType>
void BufferedProducer<BufferType>::sync_produce(const MessageBuilder& builder) { void BufferedProducer<BufferType>::sync_produce(const MessageBuilder& builder) {
// Add message tracker TrackerPtr tracker = add_tracker(const_cast<MessageBuilder&>(builder));
std::shared_ptr<Tracker> tracker = std::make_shared<Tracker>(SenderType::Async, max_number_retries_); if (tracker) {
const_cast<MessageBuilder&>(builder).internal(tracker); // produce until we succeed or we reach max retry limit
std::future<bool> should_retry; std::future<bool> should_retry;
do { do {
should_retry = tracker->get_new_future(); should_retry = tracker->get_new_future();
produce_message(builder);
wait_for_acks();
}
while (should_retry.get());
}
else {
// produce once
produce_message(builder); produce_message(builder);
wait_for_acks(); wait_for_acks();
} }
while (should_retry.get());
} }
template <typename BufferType> template <typename BufferType>
@@ -634,8 +650,8 @@ void BufferedProducer<BufferType>::async_produce(MessageType&& message, bool thr
// If we have a flush failure callback and it returns true, we retry producing this message later // If we have a flush failure callback and it returns true, we retry producing this message later
CallbackInvoker<FlushFailureCallback> callback("flush failure", flush_failure_callback_, &producer_); CallbackInvoker<FlushFailureCallback> callback("flush failure", flush_failure_callback_, &producer_);
if (!callback || callback(std::forward<MessageType>(message), ex.get_error())) { if (!callback || callback(std::forward<MessageType>(message), ex.get_error())) {
std::shared_ptr<Tracker> tracker = std::static_pointer_cast<Tracker>(message.internal()); TrackerPtr tracker = std::static_pointer_cast<Tracker>(message.internal());
if (tracker->num_retries_ > 0) { if (tracker && tracker->num_retries_ > 0) {
--tracker->num_retries_; --tracker->num_retries_;
do_add_message(std::forward<MessageType>(message), MessagePriority::High, false); do_add_message(std::forward<MessageType>(message), MessagePriority::High, false);
return; return;
@@ -660,7 +676,7 @@ template <typename BufferType>
void BufferedProducer<BufferType>::on_delivery_report(const Message& message) { void BufferedProducer<BufferType>::on_delivery_report(const Message& message) {
//Get tracker data //Get tracker data
TestParameters* test_params = get_test_parameters(); TestParameters* test_params = get_test_parameters();
std::shared_ptr<Tracker> tracker = std::static_pointer_cast<Tracker>(message.internal()); TrackerPtr tracker = std::static_pointer_cast<Tracker>(message.internal());
bool should_retry = false; bool should_retry = false;
if (message.get_error() || (test_params && test_params->force_delivery_error_)) { if (message.get_error() || (test_params && test_params->force_delivery_error_)) {
// We should produce this message again if we don't have a produce failure callback // We should produce this message again if we don't have a produce failure callback
@@ -668,7 +684,7 @@ void BufferedProducer<BufferType>::on_delivery_report(const Message& message) {
CallbackInvoker<ProduceFailureCallback> callback("produce failure", produce_failure_callback_, &producer_); CallbackInvoker<ProduceFailureCallback> callback("produce failure", produce_failure_callback_, &producer_);
if (!callback || callback(message)) { if (!callback || callback(message)) {
// Check if we have reached the maximum retry limit // Check if we have reached the maximum retry limit
if (tracker->num_retries_ > 0) { if (tracker && tracker->num_retries_ > 0) {
--tracker->num_retries_; --tracker->num_retries_;
if (tracker->sender_ == SenderType::Async) { if (tracker->sender_ == SenderType::Async) {
// Re-enqueue for later retransmission with higher priority (i.e. front of the queue) // Re-enqueue for later retransmission with higher priority (i.e. front of the queue)
@@ -691,7 +707,9 @@ void BufferedProducer<BufferType>::on_delivery_report(const Message& message) {
++total_messages_produced_; ++total_messages_produced_;
} }
// Signal producers // Signal producers
tracker->should_retry_.set_value(should_retry); if (tracker) {
tracker->should_retry_.set_value(should_retry);
}
// Decrement the expected acks // Decrement the expected acks
--pending_acks_; --pending_acks_;
assert(pending_acks_ != (size_t)-1); // Prevent underflow assert(pending_acks_ != (size_t)-1); // Prevent underflow

View File

@@ -7,6 +7,7 @@ set(SOURCES
buffer.cpp buffer.cpp
queue.cpp queue.cpp
message.cpp message.cpp
message_internal.cpp
topic_partition.cpp topic_partition.cpp
topic_partition_list.cpp topic_partition_list.cpp
metadata.cpp metadata.cpp

View File

@@ -52,7 +52,7 @@ namespace cppkafka {
void delivery_report_callback_proxy(rd_kafka_t*, const rd_kafka_message_t* msg, void *opaque) { void delivery_report_callback_proxy(rd_kafka_t*, const rd_kafka_message_t* msg, void *opaque) {
Producer* handle = static_cast<Producer*>(opaque); Producer* handle = static_cast<Producer*>(opaque);
Message message = Message::make_non_owning((rd_kafka_message_t*)msg); Message message = Message::make_non_owning((rd_kafka_message_t*)msg);
unique_ptr<MessageInternal> internal_data(MessageInternal::load(message)); unique_ptr<MessageInternal> internal_data(MessageInternal::load(*handle, message));
CallbackInvoker<Configuration::DeliveryReportCallback> CallbackInvoker<Configuration::DeliveryReportCallback>
("delivery report", handle->get_configuration().get_delivery_report_callback(), handle) ("delivery report", handle->get_configuration().get_delivery_report_callback(), handle)
(*handle, message); (*handle, message);

49
src/message_internal.cpp Normal file
View File

@@ -0,0 +1,49 @@
/*
* Copyright (c) 2017, Matias Fontanini
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include "message_internal.h"
#include "producer.h"
namespace cppkafka {
MessageInternal::MessageInternal(void* user_data, std::shared_ptr<Internal> internal)
: user_data_(user_data),
internal_(internal) {
}
std::unique_ptr<MessageInternal> MessageInternal::load(const Producer& producer, Message& message) {
if (producer.has_internal_data_ && message.get_user_data()) {
// Unpack internal data
std::unique_ptr<MessageInternal> internal_data(static_cast<MessageInternal*>(message.get_user_data()));
message.load_internal(internal_data->user_data_, internal_data->internal_);
return internal_data;
}
return nullptr;
}
}

View File

@@ -37,11 +37,13 @@ using std::move;
using std::string; using std::string;
using std::chrono::milliseconds; using std::chrono::milliseconds;
using std::unique_ptr; using std::unique_ptr;
using std::get;
namespace cppkafka { namespace cppkafka {
Producer::Producer(Configuration config) Producer::Producer(Configuration config)
: KafkaHandleBase(move(config)), message_payload_policy_(PayloadPolicy::COPY_PAYLOAD) { : KafkaHandleBase(move(config)), message_payload_policy_(PayloadPolicy::COPY_PAYLOAD),
has_internal_data_(false) {
char error_buffer[512]; char error_buffer[512];
auto config_handle = get_configuration().get_handle(); auto config_handle = get_configuration().get_handle();
rd_kafka_conf_set_opaque(config_handle, this); rd_kafka_conf_set_opaque(config_handle, this);
@@ -67,12 +69,7 @@ void Producer::produce(const MessageBuilder& builder) {
const Buffer& payload = builder.payload(); const Buffer& payload = builder.payload();
const Buffer& key = builder.key(); const Buffer& key = builder.key();
const int policy = static_cast<int>(message_payload_policy_); const int policy = static_cast<int>(message_payload_policy_);
void* opaque = builder.user_data(); LoadResult load_result = load_internal(builder.user_data(), builder.internal());
unique_ptr<MessageInternal> internal_data;
if (get_configuration().get_delivery_report_callback()) {
internal_data.reset(new MessageInternal(builder.user_data(), builder.internal()));
opaque = internal_data.get();
}
auto result = rd_kafka_producev(get_handle(), auto result = rd_kafka_producev(get_handle(),
RD_KAFKA_V_TOPIC(builder.topic().data()), RD_KAFKA_V_TOPIC(builder.topic().data()),
RD_KAFKA_V_PARTITION(builder.partition()), RD_KAFKA_V_PARTITION(builder.partition()),
@@ -80,10 +77,10 @@ void Producer::produce(const MessageBuilder& builder) {
RD_KAFKA_V_TIMESTAMP(builder.timestamp().count()), RD_KAFKA_V_TIMESTAMP(builder.timestamp().count()),
RD_KAFKA_V_KEY((void*)key.get_data(), key.get_size()), RD_KAFKA_V_KEY((void*)key.get_data(), key.get_size()),
RD_KAFKA_V_VALUE((void*)payload.get_data(), payload.get_size()), RD_KAFKA_V_VALUE((void*)payload.get_data(), payload.get_size()),
RD_KAFKA_V_OPAQUE(opaque), RD_KAFKA_V_OPAQUE(get<0>(load_result)),
RD_KAFKA_V_END); RD_KAFKA_V_END);
check_error(result); check_error(result);
internal_data.release(); //data has been passed-on to rdkafka so we release ownership get<1>(load_result).release(); //data has been passed-on to rdkafka so we release ownership
} }
void Producer::produce(const Message& message) { void Producer::produce(const Message& message) {
@@ -91,12 +88,7 @@ void Producer::produce(const Message& message) {
const Buffer& key = message.get_key(); const Buffer& key = message.get_key();
const int policy = static_cast<int>(message_payload_policy_); const int policy = static_cast<int>(message_payload_policy_);
int64_t duration = message.get_timestamp() ? message.get_timestamp().get().get_timestamp().count() : 0; int64_t duration = message.get_timestamp() ? message.get_timestamp().get().get_timestamp().count() : 0;
void* opaque = message.get_user_data(); LoadResult load_result = load_internal(message.get_user_data(), message.internal());
unique_ptr<MessageInternal> internal_data;
if (get_configuration().get_delivery_report_callback()) {
internal_data.reset(new MessageInternal(message.get_user_data(), message.internal()));
opaque = internal_data.get();
}
auto result = rd_kafka_producev(get_handle(), auto result = rd_kafka_producev(get_handle(),
RD_KAFKA_V_TOPIC(message.get_topic().data()), RD_KAFKA_V_TOPIC(message.get_topic().data()),
RD_KAFKA_V_PARTITION(message.get_partition()), RD_KAFKA_V_PARTITION(message.get_partition()),
@@ -104,10 +96,10 @@ void Producer::produce(const Message& message) {
RD_KAFKA_V_TIMESTAMP(duration), RD_KAFKA_V_TIMESTAMP(duration),
RD_KAFKA_V_KEY((void*)key.get_data(), key.get_size()), RD_KAFKA_V_KEY((void*)key.get_data(), key.get_size()),
RD_KAFKA_V_VALUE((void*)payload.get_data(), payload.get_size()), RD_KAFKA_V_VALUE((void*)payload.get_data(), payload.get_size()),
RD_KAFKA_V_OPAQUE(opaque), RD_KAFKA_V_OPAQUE(get<0>(load_result)),
RD_KAFKA_V_END); RD_KAFKA_V_END);
check_error(result); check_error(result);
internal_data.release(); //data has been passed-on to rdkafka so we release ownership get<1>(load_result).release(); //data has been passed-on to rdkafka so we release ownership
} }
int Producer::poll() { int Producer::poll() {
@@ -127,4 +119,16 @@ void Producer::flush(milliseconds timeout) {
check_error(result); check_error(result);
} }
Producer::LoadResult Producer::load_internal(void* user_data, InternalPtr internal) {
unique_ptr<MessageInternal> internal_data;
if (!has_internal_data_ && internal) {
has_internal_data_ = true; //enable once for this producer
}
if (has_internal_data_ && get_configuration().get_delivery_report_callback()) {
internal_data.reset(new MessageInternal(user_data, internal));
user_data = internal_data.get(); //point to the internal data
}
return LoadResult(user_data, move(internal_data));
}
} // cppkafka } // cppkafka