added retry logic for producers

This commit is contained in:
accelerated
2018-06-01 15:55:52 -04:00
parent f15b59cb13
commit 71e6e2e4e5
9 changed files with 431 additions and 41 deletions

View File

@@ -145,7 +145,7 @@ public:
Configuration& set_default_topic_configuration(TopicConfiguration config); Configuration& set_default_topic_configuration(TopicConfiguration config);
/** /**
* Returns true iff the given property name has been set * Returns true if the given property name has been set
*/ */
bool has_property(const std::string& name) const; bool has_property(const std::string& name) const;

View File

@@ -43,6 +43,7 @@
namespace cppkafka { namespace cppkafka {
class MessageTimestamp; class MessageTimestamp;
struct Internal;
/** /**
* \brief Thin wrapper over a rdkafka message handle * \brief Thin wrapper over a rdkafka message handle
@@ -56,6 +57,8 @@ class MessageTimestamp;
*/ */
class CPPKAFKA_API Message { class CPPKAFKA_API Message {
public: public:
friend class MessageInternal;
using InternalPtr = std::shared_ptr<Internal>;
/** /**
* Constructs a message that won't take ownership of the given pointer * Constructs a message that won't take ownership of the given pointer
*/ */
@@ -134,14 +137,13 @@ public:
} }
/** /**
* \brief Gets the private data. * \brief Gets the private user data.
* *
* This should only be used on messages produced by a Producer that were set a private data * This should only be used on messages produced by a Producer that were set a private data
* attribute * attribute
*/ */
void* get_user_data() const { void* get_user_data() const {
assert(handle_); return user_data_;
return handle_->_private;
} }
/** /**
@@ -164,6 +166,13 @@ public:
rd_kafka_message_t* get_handle() const { rd_kafka_message_t* get_handle() const {
return handle_.get(); return handle_.get();
} }
/**
* Internal private const data accessor (internal use only)
*/
InternalPtr internal() const {
return internal_;
}
private: private:
using HandlePtr = std::unique_ptr<rd_kafka_message_t, decltype(&rd_kafka_message_destroy)>; using HandlePtr = std::unique_ptr<rd_kafka_message_t, decltype(&rd_kafka_message_destroy)>;
@@ -171,10 +180,13 @@ private:
Message(rd_kafka_message_t* handle, NonOwningTag); Message(rd_kafka_message_t* handle, NonOwningTag);
Message(HandlePtr handle); Message(HandlePtr handle);
void load_internal(void* user_data, InternalPtr internal);
HandlePtr handle_; HandlePtr handle_;
Buffer payload_; Buffer payload_;
Buffer key_; Buffer key_;
void* user_data_;
InternalPtr internal_;
}; };
using MessageList = std::vector<Message>; using MessageList = std::vector<Message>;

View File

@@ -166,6 +166,13 @@ public:
* Gets the message's user data pointer * Gets the message's user data pointer
*/ */
void* user_data() const; void* user_data() const;
/**
* Private data accessor (internal use only)
*/
Message::InternalPtr internal() const;
Concrete& internal(Message::InternalPtr internal);
private: private:
void construct_buffer(BufferType& lhs, const BufferType& rhs); void construct_buffer(BufferType& lhs, const BufferType& rhs);
Concrete& get_concrete(); Concrete& get_concrete();
@@ -176,11 +183,13 @@ private:
BufferType payload_; BufferType payload_;
std::chrono::milliseconds timestamp_{0}; std::chrono::milliseconds timestamp_{0};
void* user_data_; void* user_data_;
Message::InternalPtr internal_;
}; };
template <typename T, typename C> template <typename T, typename C>
BasicMessageBuilder<T, C>::BasicMessageBuilder(std::string topic) BasicMessageBuilder<T, C>::BasicMessageBuilder(std::string topic)
: topic_(std::move(topic)) { : topic_(std::move(topic)),
user_data_(nullptr) {
} }
template <typename T, typename C> template <typename T, typename C>
@@ -190,16 +199,16 @@ BasicMessageBuilder<T, C>::BasicMessageBuilder(const Message& message)
payload_(Buffer(message.get_payload().get_data(), message.get_payload().get_size())), payload_(Buffer(message.get_payload().get_data(), message.get_payload().get_size())),
timestamp_(message.get_timestamp() ? message.get_timestamp().get().get_timestamp() : timestamp_(message.get_timestamp() ? message.get_timestamp().get().get_timestamp() :
std::chrono::milliseconds(0)), std::chrono::milliseconds(0)),
user_data_(message.get_user_data()) user_data_(message.get_user_data()),
{ internal_(message.internal()) {
} }
template <typename T, typename C> template <typename T, typename C>
template <typename U, typename V> template <typename U, typename V>
BasicMessageBuilder<T, C>::BasicMessageBuilder(const BasicMessageBuilder<U, V>& rhs) BasicMessageBuilder<T, C>::BasicMessageBuilder(const BasicMessageBuilder<U, V>& rhs)
: topic_(rhs.topic()), partition_(rhs.partition()), timestamp_(rhs.timestamp()), : topic_(rhs.topic()), partition_(rhs.partition()), timestamp_(rhs.timestamp()),
user_data_(rhs.user_data()) { user_data_(rhs.user_data()),
internal_(rhs.internal()) {
get_concrete().construct_buffer(key_, rhs.key()); get_concrete().construct_buffer(key_, rhs.key());
get_concrete().construct_buffer(payload_, rhs.payload()); get_concrete().construct_buffer(payload_, rhs.payload());
} }
@@ -292,6 +301,17 @@ void* BasicMessageBuilder<T, C>::user_data() const {
return user_data_; return user_data_;
} }
template <typename T, typename C>
Message::InternalPtr BasicMessageBuilder<T, C>::internal() const {
return internal_;
}
template <typename T, typename C>
C& BasicMessageBuilder<T, C>::internal(Message::InternalPtr internal) {
internal_ = internal;
return get_concrete();
}
template <typename T, typename C> template <typename T, typename C>
void BasicMessageBuilder<T, C>::construct_buffer(T& lhs, const T& rhs) { void BasicMessageBuilder<T, C>::construct_buffer(T& lhs, const T& rhs) {
lhs = rhs; lhs = rhs;

View File

@@ -0,0 +1,72 @@
/*
* Copyright (c) 2017, Matias Fontanini
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef CPPKAFKA_MESSAGE_INTERNAL_H
#define CPPKAFKA_MESSAGE_INTERNAL_H
#include <memory>
#include "message.h"
namespace cppkafka {
struct Internal {
virtual ~Internal() = default;
};
using InternalPtr = std::shared_ptr<Internal>;
/**
* \brief Private message data structure
*/
class MessageInternal {
friend class Producer;
public:
static std::unique_ptr<MessageInternal> load(Message& message) {
if (message.get_user_data()) {
// Unpack internal data
std::unique_ptr<MessageInternal> internal_data(static_cast<MessageInternal*>(message.get_user_data()));
message.load_internal(internal_data->user_data_, internal_data->internal_);
return internal_data;
}
return nullptr;
}
private:
MessageInternal(void* user_data, std::shared_ptr<Internal> internal)
: user_data_(user_data),
internal_(internal) {
}
void* user_data_;
InternalPtr internal_;
};
}
#endif //CPPKAFKA_MESSAGE_INTERNAL_H

View File

@@ -39,10 +39,11 @@
#include <map> #include <map>
#include <mutex> #include <mutex>
#include <atomic> #include <atomic>
#include <future>
#include <boost/optional.hpp> #include <boost/optional.hpp>
#include "../producer.h" #include "../producer.h"
#include "../message.h"
#include "../detail/callback_invoker.h" #include "../detail/callback_invoker.h"
#include "../message_internal.h"
namespace cppkafka { namespace cppkafka {
@@ -113,7 +114,7 @@ public:
BufferedProducer(Configuration config); BufferedProducer(Configuration config);
/** /**
* \brief Adds a message to the producer's buffer. * \brief Adds a message to the producer's buffer.
* *
* The message won't be sent until flush is called. * The message won't be sent until flush is called.
* *
@@ -122,7 +123,7 @@ public:
void add_message(const MessageBuilder& builder); void add_message(const MessageBuilder& builder);
/** /**
* \brief Adds a message to the producer's buffer. * \brief Adds a message to the producer's buffer.
* *
* The message won't be sent until flush is called. * The message won't be sent until flush is called.
* *
@@ -145,6 +146,18 @@ public:
*/ */
void produce(const MessageBuilder& builder); void produce(const MessageBuilder& builder);
/**
* \brief Produces a message synchronously without buffering it
*
* In case of failure, the message will be replayed until 'max_number_retries' is reached
* or until the user ProduceFailureCallback returns false.
*
* \param builder The builder that contains the message to be produced
*
* \remark This method throws cppkafka::HandleException on failure
*/
void sync_produce(const MessageBuilder& builder);
/** /**
* \brief Produces a message asynchronously without buffering it * \brief Produces a message asynchronously without buffering it
* *
@@ -221,6 +234,13 @@ public:
*/ */
size_t get_total_messages_produced() const; size_t get_total_messages_produced() const;
/**
* \brief Get the total number of messages dropped since the beginning
*
* \return The number of messages
*/
size_t get_total_messages_dropped() const;
/** /**
* \brief Get the total outstanding flush operations in progress * \brief Get the total outstanding flush operations in progress
* *
@@ -230,6 +250,20 @@ public:
* \return The number of outstanding flush operations. * \return The number of outstanding flush operations.
*/ */
size_t get_flushes_in_progress() const; size_t get_flushes_in_progress() const;
/**
* \brief Sets the maximum number of retries per message until giving up
*
* Default is 5
*/
void set_max_number_retries(size_t max_number_retries);
/**
* \brief Gets the max number of retries
*
* \return The number of retries
*/
size_t get_max_number_retries() const;
/** /**
* Gets the Producer object * Gets the Producer object
@@ -285,9 +319,29 @@ public:
*/ */
void set_flush_failure_callback(FlushFailureCallback callback); void set_flush_failure_callback(FlushFailureCallback callback);
struct TestParameters {
bool force_delivery_error_;
bool force_produce_error_;
};
protected:
//For testing purposes only
#ifdef KAFKA_TEST_INSTANCE
void set_test_parameters(TestParameters *test_params) {
test_params_ = test_params;
}
TestParameters* get_test_parameters() {
return test_params_;
}
#else
TestParameters* get_test_parameters() {
return nullptr;
}
#endif
private: private:
using QueueType = std::deque<Builder>; using QueueType = std::deque<Builder>;
enum class MessagePriority { Low, High }; enum class MessagePriority { Low, High };
enum class SenderType { Sync, Async };
template <typename T> template <typename T>
struct CounterGuard{ struct CounterGuard{
@@ -295,13 +349,29 @@ private:
~CounterGuard() { --counter_; } ~CounterGuard() { --counter_; }
std::atomic<T>& counter_; std::atomic<T>& counter_;
}; };
struct Tracker : public Internal {
Tracker(SenderType sender, size_t num_retries)
: sender_(sender), num_retries_(num_retries)
{}
std::future<bool> get_new_future() {
should_retry_ = std::promise<bool>(); //reset shared data
return should_retry_.get_future(); //issue new future
}
SenderType sender_;
std::promise<bool> should_retry_;
size_t num_retries_;
};
template <typename BuilderType> template <typename BuilderType>
void do_add_message(BuilderType&& builder, MessagePriority priority, bool do_flush); void do_add_message(BuilderType&& builder, MessagePriority priority, bool do_flush);
void do_add_message(const Message& message, MessagePriority priority, bool do_flush);
template <typename MessageType> template <typename MessageType>
void produce_message(const MessageType& message); void produce_message(MessageType&& message);
Configuration prepare_configuration(Configuration config); Configuration prepare_configuration(Configuration config);
void on_delivery_report(const Message& message); void on_delivery_report(const Message& message);
template <typename MessageType>
void async_produce(MessageType&& message, bool throw_on_error);
// Members // Members
Producer producer_; Producer producer_;
@@ -314,6 +384,11 @@ private:
std::atomic<size_t> pending_acks_{0}; std::atomic<size_t> pending_acks_{0};
std::atomic<size_t> flushes_in_progress_{0}; std::atomic<size_t> flushes_in_progress_{0};
std::atomic<size_t> total_messages_produced_{0}; std::atomic<size_t> total_messages_produced_{0};
std::atomic<size_t> total_messages_dropped_{0};
int max_number_retries_{5};
#ifdef KAFKA_TEST_INSTANCE
TestParameters* test_params_;
#endif
}; };
template <typename BufferType> template <typename BufferType>
@@ -330,26 +405,52 @@ template <typename BufferType>
BufferedProducer<BufferType>::BufferedProducer(Configuration config) BufferedProducer<BufferType>::BufferedProducer(Configuration config)
: producer_(prepare_configuration(std::move(config))) { : producer_(prepare_configuration(std::move(config))) {
producer_.set_payload_policy(get_default_payload_policy<BufferType>()); producer_.set_payload_policy(get_default_payload_policy<BufferType>());
#ifdef KAFKA_TEST_INSTANCE
test_params_ = nullptr;
#endif
} }
template <typename BufferType> template <typename BufferType>
void BufferedProducer<BufferType>::add_message(const MessageBuilder& builder) { void BufferedProducer<BufferType>::add_message(const MessageBuilder& builder) {
// Add message tracker
std::shared_ptr<Tracker> tracker = std::make_shared<Tracker>(SenderType::Async, max_number_retries_);
const_cast<MessageBuilder&>(builder).internal(tracker);
do_add_message(builder, MessagePriority::Low, true); do_add_message(builder, MessagePriority::Low, true);
} }
template <typename BufferType> template <typename BufferType>
void BufferedProducer<BufferType>::add_message(Builder builder) { void BufferedProducer<BufferType>::add_message(Builder builder) {
// Add message tracker
std::shared_ptr<Tracker> tracker = std::make_shared<Tracker>(SenderType::Async, max_number_retries_);
const_cast<Builder&>(builder).internal(tracker);
do_add_message(move(builder), MessagePriority::Low, true); do_add_message(move(builder), MessagePriority::Low, true);
} }
template <typename BufferType> template <typename BufferType>
void BufferedProducer<BufferType>::produce(const MessageBuilder& builder) { void BufferedProducer<BufferType>::produce(const MessageBuilder& builder) {
produce_message(builder); // Add message tracker
std::shared_ptr<Tracker> tracker = std::make_shared<Tracker>(SenderType::Async, max_number_retries_);
const_cast<MessageBuilder&>(builder).internal(tracker);
async_produce(builder, true);
}
template <typename BufferType>
void BufferedProducer<BufferType>::sync_produce(const MessageBuilder& builder) {
// Add message tracker
std::shared_ptr<Tracker> tracker = std::make_shared<Tracker>(SenderType::Async, max_number_retries_);
const_cast<MessageBuilder&>(builder).internal(tracker);
std::future<bool> should_retry;
do {
should_retry = tracker->get_new_future();
produce_message(builder);
wait_for_acks();
}
while (should_retry.get());
} }
template <typename BufferType> template <typename BufferType>
void BufferedProducer<BufferType>::produce(const Message& message) { void BufferedProducer<BufferType>::produce(const Message& message) {
produce_message(message); async_produce(message, true);
} }
template <typename BufferType> template <typename BufferType>
@@ -361,16 +462,7 @@ void BufferedProducer<BufferType>::flush() {
std::swap(messages_, flush_queue); std::swap(messages_, flush_queue);
} }
while (!flush_queue.empty()) { while (!flush_queue.empty()) {
try { async_produce(std::move(flush_queue.front()), false);
produce_message(flush_queue.front());
}
catch (const HandleException& ex) {
// If we have a flush failure callback and it returns true, we retry producing this message later
CallbackInvoker<FlushFailureCallback> callback("flush failure", flush_failure_callback_, &producer_);
if (callback && callback(flush_queue.front(), ex.get_error())) {
do_add_message(std::move(flush_queue.front()), MessagePriority::Low, false);
}
}
flush_queue.pop_front(); flush_queue.pop_front();
} }
wait_for_acks(); wait_for_acks();
@@ -427,10 +519,10 @@ void BufferedProducer<BufferType>::do_add_message(BuilderType&& builder,
{ {
std::lock_guard<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(mutex_);
if (priority == MessagePriority::High) { if (priority == MessagePriority::High) {
messages_.emplace_front(std::move(builder)); messages_.emplace_front(std::forward<BuilderType>(builder));
} }
else { else {
messages_.emplace_back(std::move(builder)); messages_.emplace_back(std::forward<BuilderType>(builder));
} }
} }
if (do_flush && (max_buffer_size_ >= 0) && (max_buffer_size_ <= (ssize_t)messages_.size())) { if (do_flush && (max_buffer_size_ >= 0) && (max_buffer_size_ <= (ssize_t)messages_.size())) {
@@ -438,6 +530,13 @@ void BufferedProducer<BufferType>::do_add_message(BuilderType&& builder,
} }
} }
template <typename BufferType>
void BufferedProducer<BufferType>::do_add_message(const Message& message,
MessagePriority priority,
bool do_flush) {
do_add_messsage(MessageBuilder(message), priority, do_flush);
}
template <typename BufferType> template <typename BufferType>
Producer& BufferedProducer<BufferType>::get_producer() { Producer& BufferedProducer<BufferType>::get_producer() {
return producer_; return producer_;
@@ -458,11 +557,26 @@ size_t BufferedProducer<BufferType>::get_total_messages_produced() const {
return total_messages_produced_; return total_messages_produced_;
} }
template <typename BufferType>
size_t BufferedProducer<BufferType>::get_total_messages_dropped() const {
return total_messages_dropped_;
}
template <typename BufferType> template <typename BufferType>
size_t BufferedProducer<BufferType>::get_flushes_in_progress() const { size_t BufferedProducer<BufferType>::get_flushes_in_progress() const {
return flushes_in_progress_; return flushes_in_progress_;
} }
template <typename BufferType>
void BufferedProducer<BufferType>::set_max_number_retries(size_t max_number_retries) {
max_number_retries_ = max_number_retries;
}
template <typename BufferType>
size_t BufferedProducer<BufferType>::get_max_number_retries() const {
return max_number_retries_;
}
template <typename BufferType> template <typename BufferType>
typename BufferedProducer<BufferType>::Builder typename BufferedProducer<BufferType>::Builder
BufferedProducer<BufferType>::make_builder(std::string topic) { BufferedProducer<BufferType>::make_builder(std::string topic) {
@@ -486,10 +600,10 @@ void BufferedProducer<BufferType>::set_flush_failure_callback(FlushFailureCallba
template <typename BufferType> template <typename BufferType>
template <typename MessageType> template <typename MessageType>
void BufferedProducer<BufferType>::produce_message(const MessageType& message) { void BufferedProducer<BufferType>::produce_message(MessageType&& message) {
while (true) { while (true) {
try { try {
producer_.produce(message); producer_.produce(std::forward<MessageType>(message));
// Sent successfully // Sent successfully
++pending_acks_; ++pending_acks_;
break; break;
@@ -506,6 +620,34 @@ void BufferedProducer<BufferType>::produce_message(const MessageType& message) {
} }
} }
template <typename BufferType>
template <typename MessageType>
void BufferedProducer<BufferType>::async_produce(MessageType&& message, bool throw_on_error) {
try {
TestParameters* test_params = get_test_parameters();
if (test_params && test_params->force_produce_error_) {
throw HandleException(Error(RD_KAFKA_RESP_ERR_UNKNOWN));
}
produce_message(std::forward<MessageType>(message));
}
catch (const HandleException& ex) {
// If we have a flush failure callback and it returns true, we retry producing this message later
CallbackInvoker<FlushFailureCallback> callback("flush failure", flush_failure_callback_, &producer_);
if (!callback || callback(std::forward<MessageType>(message), ex.get_error())) {
std::shared_ptr<Tracker> tracker = std::static_pointer_cast<Tracker>(message.internal());
if (tracker->num_retries_ > 0) {
--tracker->num_retries_;
do_add_message(std::forward<MessageType>(message), MessagePriority::High, false);
return;
}
}
++total_messages_dropped_;
if (throw_on_error) {
throw;
}
}
}
template <typename BufferType> template <typename BufferType>
Configuration BufferedProducer<BufferType>::prepare_configuration(Configuration config) { Configuration BufferedProducer<BufferType>::prepare_configuration(Configuration config) {
using std::placeholders::_2; using std::placeholders::_2;
@@ -516,13 +658,30 @@ Configuration BufferedProducer<BufferType>::prepare_configuration(Configuration
template <typename BufferType> template <typename BufferType>
void BufferedProducer<BufferType>::on_delivery_report(const Message& message) { void BufferedProducer<BufferType>::on_delivery_report(const Message& message) {
if (message.get_error()) { //Get tracker data
TestParameters* test_params = get_test_parameters();
std::shared_ptr<Tracker> tracker = std::static_pointer_cast<Tracker>(message.internal());
bool should_retry = false;
if (message.get_error() || (test_params && test_params->force_delivery_error_)) {
// We should produce this message again if we don't have a produce failure callback // We should produce this message again if we don't have a produce failure callback
// or we have one but it returns true // or we have one but it returns true
CallbackInvoker<ProduceFailureCallback> callback("produce failure", produce_failure_callback_, &producer_); CallbackInvoker<ProduceFailureCallback> callback("produce failure", produce_failure_callback_, &producer_);
if (!callback || callback(message)) { if (!callback || callback(message)) {
// Re-enqueue for later retransmission with higher priority (i.e. front of the queue) // Check if we have reached the maximum retry limit
do_add_message(Builder(message), MessagePriority::High, false); if (tracker->num_retries_ > 0) {
--tracker->num_retries_;
if (tracker->sender_ == SenderType::Async) {
// Re-enqueue for later retransmission with higher priority (i.e. front of the queue)
do_add_message(Builder(message), MessagePriority::High, false);
}
should_retry = true;
}
else {
++total_messages_dropped_;
}
}
else {
++total_messages_dropped_;
} }
} }
else { else {
@@ -531,6 +690,8 @@ void BufferedProducer<BufferType>::on_delivery_report(const Message& message) {
// Increment the total successful transmissions // Increment the total successful transmissions
++total_messages_produced_; ++total_messages_produced_;
} }
// Signal producers
tracker->should_retry_.set_value(should_retry);
// Decrement the expected acks // Decrement the expected acks
--pending_acks_; --pending_acks_;
assert(pending_acks_ != (size_t)-1); // Prevent underflow assert(pending_acks_ != (size_t)-1); // Prevent underflow

View File

@@ -31,7 +31,7 @@
#include <vector> #include <vector>
#include <librdkafka/rdkafka.h> #include <librdkafka/rdkafka.h>
#include "exceptions.h" #include "exceptions.h"
#include "message.h" #include "message_internal.h"
#include "producer.h" #include "producer.h"
#include "consumer.h" #include "consumer.h"
@@ -40,7 +40,7 @@ using std::map;
using std::move; using std::move;
using std::vector; using std::vector;
using std::initializer_list; using std::initializer_list;
using std::unique_ptr;
using boost::optional; using boost::optional;
using std::chrono::milliseconds; using std::chrono::milliseconds;
@@ -52,6 +52,7 @@ namespace cppkafka {
void delivery_report_callback_proxy(rd_kafka_t*, const rd_kafka_message_t* msg, void *opaque) { void delivery_report_callback_proxy(rd_kafka_t*, const rd_kafka_message_t* msg, void *opaque) {
Producer* handle = static_cast<Producer*>(opaque); Producer* handle = static_cast<Producer*>(opaque);
Message message = Message::make_non_owning((rd_kafka_message_t*)msg); Message message = Message::make_non_owning((rd_kafka_message_t*)msg);
unique_ptr<MessageInternal> internal_data(MessageInternal::load(message));
CallbackInvoker<Configuration::DeliveryReportCallback> CallbackInvoker<Configuration::DeliveryReportCallback>
("delivery report", handle->get_configuration().get_delivery_report_callback(), handle) ("delivery report", handle->get_configuration().get_delivery_report_callback(), handle)
(*handle, message); (*handle, message);

View File

@@ -42,7 +42,8 @@ Message Message::make_non_owning(rd_kafka_message_t* handle) {
} }
Message::Message() Message::Message()
: handle_(nullptr, nullptr) { : handle_(nullptr, nullptr),
user_data_(nullptr) {
} }
@@ -51,15 +52,21 @@ Message::Message(rd_kafka_message_t* handle)
} }
Message::Message(rd_kafka_message_t* handle, NonOwningTag) Message::Message(rd_kafka_message_t* handle, NonOwningTag)
: Message(HandlePtr(handle, &dummy_deleter)) { : Message(HandlePtr(handle, &dummy_deleter)) {
} }
Message::Message(HandlePtr handle) Message::Message(HandlePtr handle)
: handle_(move(handle)), : handle_(move(handle)),
payload_(handle_ ? Buffer((const Buffer::DataType*)handle_->payload, handle_->len) : Buffer()), payload_(handle_ ? Buffer((const Buffer::DataType*)handle_->payload, handle_->len) : Buffer()),
key_(handle_ ? Buffer((const Buffer::DataType*)handle_->key, handle_->key_len) : Buffer()) { key_(handle_ ? Buffer((const Buffer::DataType*)handle_->key, handle_->key_len) : Buffer()),
user_data_(handle_ ? handle_->_private : nullptr) {
}
void Message::load_internal(void* user_data, InternalPtr internal) {
user_data_ = user_data;
internal_ = internal;
} }
// MessageTimestamp // MessageTimestamp

View File

@@ -28,13 +28,15 @@
*/ */
#include <errno.h> #include <errno.h>
#include <memory>
#include "producer.h" #include "producer.h"
#include "exceptions.h" #include "exceptions.h"
#include "message.h" #include "message_internal.h"
using std::move; using std::move;
using std::string; using std::string;
using std::chrono::milliseconds; using std::chrono::milliseconds;
using std::unique_ptr;
namespace cppkafka { namespace cppkafka {
@@ -65,6 +67,7 @@ void Producer::produce(const MessageBuilder& builder) {
const Buffer& payload = builder.payload(); const Buffer& payload = builder.payload();
const Buffer& key = builder.key(); const Buffer& key = builder.key();
const int policy = static_cast<int>(message_payload_policy_); const int policy = static_cast<int>(message_payload_policy_);
unique_ptr<MessageInternal> internal_data(new MessageInternal(builder.user_data(), builder.internal()));
auto result = rd_kafka_producev(get_handle(), auto result = rd_kafka_producev(get_handle(),
RD_KAFKA_V_TOPIC(builder.topic().data()), RD_KAFKA_V_TOPIC(builder.topic().data()),
RD_KAFKA_V_PARTITION(builder.partition()), RD_KAFKA_V_PARTITION(builder.partition()),
@@ -72,9 +75,10 @@ void Producer::produce(const MessageBuilder& builder) {
RD_KAFKA_V_TIMESTAMP(builder.timestamp().count()), RD_KAFKA_V_TIMESTAMP(builder.timestamp().count()),
RD_KAFKA_V_KEY((void*)key.get_data(), key.get_size()), RD_KAFKA_V_KEY((void*)key.get_data(), key.get_size()),
RD_KAFKA_V_VALUE((void*)payload.get_data(), payload.get_size()), RD_KAFKA_V_VALUE((void*)payload.get_data(), payload.get_size()),
RD_KAFKA_V_OPAQUE(builder.user_data()), RD_KAFKA_V_OPAQUE(internal_data.get()),
RD_KAFKA_V_END); RD_KAFKA_V_END);
check_error(result); check_error(result);
internal_data.release(); //data has been passed-on to rdkafka so we release ownership
} }
void Producer::produce(const Message& message) { void Producer::produce(const Message& message) {
@@ -82,6 +86,7 @@ void Producer::produce(const Message& message) {
const Buffer& key = message.get_key(); const Buffer& key = message.get_key();
const int policy = static_cast<int>(message_payload_policy_); const int policy = static_cast<int>(message_payload_policy_);
int64_t duration = message.get_timestamp() ? message.get_timestamp().get().get_timestamp().count() : 0; int64_t duration = message.get_timestamp() ? message.get_timestamp().get().get_timestamp().count() : 0;
unique_ptr<MessageInternal> internal_data(new MessageInternal(message.get_user_data(), message.internal()));
auto result = rd_kafka_producev(get_handle(), auto result = rd_kafka_producev(get_handle(),
RD_KAFKA_V_TOPIC(message.get_topic().data()), RD_KAFKA_V_TOPIC(message.get_topic().data()),
RD_KAFKA_V_PARTITION(message.get_partition()), RD_KAFKA_V_PARTITION(message.get_partition()),
@@ -89,9 +94,10 @@ void Producer::produce(const Message& message) {
RD_KAFKA_V_TIMESTAMP(duration), RD_KAFKA_V_TIMESTAMP(duration),
RD_KAFKA_V_KEY((void*)key.get_data(), key.get_size()), RD_KAFKA_V_KEY((void*)key.get_data(), key.get_size()),
RD_KAFKA_V_VALUE((void*)payload.get_data(), payload.get_size()), RD_KAFKA_V_VALUE((void*)payload.get_data(), payload.get_size()),
RD_KAFKA_V_OPAQUE(message.get_user_data()), RD_KAFKA_V_OPAQUE(internal_data.get()),
RD_KAFKA_V_END); RD_KAFKA_V_END);
check_error(result); check_error(result);
internal_data.release(); //data has been passed-on to rdkafka so we release ownership
} }
int Producer::poll() { int Producer::poll() {

View File

@@ -74,6 +74,7 @@ void flusher_run(BufferedProducer<string>& producer,
if (producer.get_buffer_size() >= (size_t)num_flush) { if (producer.get_buffer_size() >= (size_t)num_flush) {
producer.flush(); producer.flush();
} }
this_thread::sleep_for(milliseconds(10));
} }
producer.flush(); producer.flush();
} }
@@ -86,6 +87,36 @@ void clear_run(BufferedProducer<string>& producer,
producer.clear(); producer.clear();
} }
vector<int> dr_data = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
void dr_callback(const Message& message) {
static int i = 0;
if (!message || message.is_eof()) return;
CHECK(message.get_user_data() == &dr_data[i]);
CHECK(*static_cast<int*>(message.get_user_data()) == dr_data[i]);
++i;
}
bool dr_failure_callback(const Message& message) {
if (!message || message.is_eof()) return true;
CHECK(message.get_user_data() == &dr_data[0]);
CHECK(*static_cast<int*>(message.get_user_data()) == dr_data[0]);
return true; //always retry
}
template <typename B>
class ErrorProducer : public BufferedProducer<B>
{
public:
ErrorProducer(Configuration config,
typename BufferedProducer<B>::TestParameters params) :
BufferedProducer<B>(config),
params_(params) {
this->set_test_parameters(&params_);
}
private:
typename BufferedProducer<B>::TestParameters params_;
};
TEST_CASE("simple production", "[producer]") { TEST_CASE("simple production", "[producer]") {
int partition = 0; int partition = 0;
@@ -271,6 +302,86 @@ TEST_CASE("multiple messages", "[producer]") {
} }
} }
TEST_CASE("multiple sync messages", "[producer][buffered_producer][sync]") {
size_t message_count = 10;
set<string> payloads;
// Create a consumer and subscribe to this topic
Consumer consumer(make_consumer_config());
consumer.subscribe({ KAFKA_TOPICS[0] });
ConsumerRunner runner(consumer, message_count, KAFKA_NUM_PARTITIONS);
// Now create a producer and produce a message
BufferedProducer<string> producer(make_producer_config());
producer.set_produce_success_callback(dr_callback);
const string payload_base = "Hello world ";
for (size_t i = 0; i < message_count; ++i) {
const string payload = payload_base + to_string(i);
payloads.insert(payload);
producer.sync_produce(MessageBuilder(KAFKA_TOPICS[0]).payload(payload).user_data(&dr_data[i]));
}
runner.try_join();
const auto& messages = runner.get_messages();
REQUIRE(messages.size() == message_count);
for (size_t i = 0; i < messages.size(); ++i) {
const auto& message = messages[i];
CHECK(message.get_topic() == KAFKA_TOPICS[0]);
CHECK(payloads.erase(message.get_payload()) == 1);
CHECK(!!message.get_error() == false);
CHECK(!!message.get_key() == false);
CHECK(message.get_partition() >= 0);
CHECK(message.get_partition() < KAFKA_NUM_PARTITIONS);
}
}
TEST_CASE("replay sync messages with errors", "[producer][buffered_producer][sync]") {
size_t num_retries = 4;
// Create a consumer and subscribe to this topic
Consumer consumer(make_consumer_config());
consumer.subscribe({ KAFKA_TOPICS[0] });
ConsumerRunner runner(consumer, num_retries+1, KAFKA_NUM_PARTITIONS);
// Now create a producer and produce a message
ErrorProducer<string> producer(make_producer_config(), BufferedProducer<string>::TestParameters{true, false});
producer.set_produce_failure_callback(dr_failure_callback);
producer.set_max_number_retries(num_retries);
string payload = "Hello world";
producer.sync_produce(MessageBuilder(KAFKA_TOPICS[0]).payload(payload).user_data(&dr_data[0]));
runner.try_join();
const auto& messages = runner.get_messages();
REQUIRE(messages.size() == num_retries+1);
for (size_t i = 0; i < messages.size(); ++i) {
const auto& message = messages[i];
CHECK(message.get_topic() == KAFKA_TOPICS[0]);
CHECK(message.get_payload() == payload);
CHECK(!!message.get_error() == false);
CHECK(!!message.get_key() == false);
CHECK(message.get_partition() >= 0);
CHECK(message.get_partition() < KAFKA_NUM_PARTITIONS);
}
}
TEST_CASE("replay async messages with errors", "[producer][buffered_producer][async]") {
size_t num_retries = 4;
int exit_flag = 0;
// Now create a producer and produce a message
ErrorProducer<string> producer(make_producer_config(),
BufferedProducer<string>::TestParameters{false, true});
producer.set_max_number_retries(num_retries);
thread flusher_thread(flusher_run, ref(producer), ref(exit_flag), 0);
string payload = "Hello world";
producer.produce(MessageBuilder(KAFKA_TOPICS[0]).payload(payload));
this_thread::sleep_for(milliseconds(2000));
exit_flag = 1;
flusher_thread.join();
REQUIRE(producer.get_total_messages_produced() == 0);
CHECK(producer.get_total_messages_dropped() == 1);
}
TEST_CASE("buffered producer", "[producer][buffered_producer]") { TEST_CASE("buffered producer", "[producer][buffered_producer]") {
int partition = 0; int partition = 0;