diff --git a/include/cppkafka/configuration.h b/include/cppkafka/configuration.h
index cb466ae..5c7d56d 100644
--- a/include/cppkafka/configuration.h
+++ b/include/cppkafka/configuration.h
@@ -145,7 +145,7 @@ public:
     Configuration& set_default_topic_configuration(TopicConfiguration config);
 
     /**
-     * Returns true iff the given property name has been set
+     * Returns true if the given property name has been set
      */
     bool has_property(const std::string& name) const;
 
diff --git a/include/cppkafka/message.h b/include/cppkafka/message.h
index a15f947..eda40dc 100644
--- a/include/cppkafka/message.h
+++ b/include/cppkafka/message.h
@@ -43,6 +43,7 @@ namespace cppkafka {
 
 class MessageTimestamp;
+struct Internal;
 
 /**
  * \brief Thin wrapper over a rdkafka message handle
@@ -56,6 +57,8 @@ class MessageTimestamp;
  */
 class CPPKAFKA_API Message {
 public:
+    friend class MessageInternal;
+    using InternalPtr = std::shared_ptr<Internal>;
     /**
      * Constructs a message that won't take ownership of the given pointer
      */
@@ -134,14 +137,13 @@ public:
     }
 
     /**
-     * \brief Gets the private data.
+     * \brief Gets the private user data.
      *
      * This should only be used on messages produced by a Producer that were set a private data
      * attribute
      */
     void* get_user_data() const {
-        assert(handle_);
-        return handle_->_private;
+        return user_data_;
     }
 
     /**
@@ -164,6 +166,13 @@ public:
     rd_kafka_message_t* get_handle() const {
         return handle_.get();
     }
+
+    /**
+     * Internal private const data accessor (internal use only)
+     */
+    InternalPtr internal() const {
+        return internal_;
+    }
 private:
     using HandlePtr = std::unique_ptr<rd_kafka_message_t, decltype(&rd_kafka_message_destroy)>;
 
     Message(rd_kafka_message_t* handle, NonOwningTag);
     Message(HandlePtr handle);
+    void load_internal(void* user_data, InternalPtr internal);
 
     HandlePtr handle_;
     Buffer payload_;
     Buffer key_;
+    void* user_data_;
+    InternalPtr internal_;
 };
 
 using MessageList = std::vector<Message>;
 
diff --git a/include/cppkafka/message_builder.h b/include/cppkafka/message_builder.h
index 59b3365..64e8d54 100644
--- a/include/cppkafka/message_builder.h
+++ b/include/cppkafka/message_builder.h
@@ -166,6 +166,13 @@ public:
      * Gets the message's user data pointer
      */
     void* user_data() const;
+
+    /**
+     * Private data accessor (internal use only)
+     */
+    Message::InternalPtr internal() const;
+    Concrete& internal(Message::InternalPtr internal);
+
 private:
     void construct_buffer(BufferType& lhs, const BufferType& rhs);
     Concrete& get_concrete();
@@ -176,11 +183,13 @@ private:
     BufferType payload_;
     std::chrono::milliseconds timestamp_{0};
     void* user_data_;
+    Message::InternalPtr internal_;
 };
 
 template <typename T, typename C>
 BasicMessageBuilder<T, C>::BasicMessageBuilder(std::string topic)
-: topic_(std::move(topic)) {
+: topic_(std::move(topic)),
+  user_data_(nullptr) {
 }
 
 template <typename T, typename C>
@@ -190,16 +199,16 @@ BasicMessageBuilder<T, C>::BasicMessageBuilder(const Message& message)
   payload_(Buffer(message.get_payload().get_data(), message.get_payload().get_size())),
   timestamp_(message.get_timestamp() ? message.get_timestamp().get().get_timestamp() :
              std::chrono::milliseconds(0)),
-  user_data_(message.get_user_data())
-{
-
+  user_data_(message.get_user_data()),
+  internal_(message.internal()) {
 }
 
 template <typename T, typename C>
 template <typename U, typename V>
 BasicMessageBuilder<T, C>::BasicMessageBuilder(const BasicMessageBuilder<U, V>& rhs)
 : topic_(rhs.topic()),
   partition_(rhs.partition()),
   timestamp_(rhs.timestamp()),
-  user_data_(rhs.user_data()) {
+  user_data_(rhs.user_data()),
+  internal_(rhs.internal()) {
     get_concrete().construct_buffer(key_, rhs.key());
     get_concrete().construct_buffer(payload_, rhs.payload());
 }
@@ -292,6 +301,17 @@ void* BasicMessageBuilder<T, C>::user_data() const {
     return user_data_;
 }
 
+template <typename T, typename C>
+Message::InternalPtr BasicMessageBuilder<T, C>::internal() const {
+    return internal_;
+}
+
+template <typename T, typename C>
+C& BasicMessageBuilder<T, C>::internal(Message::InternalPtr internal) {
+    internal_ = internal;
+    return get_concrete();
+}
+
 template <typename T, typename C>
 void BasicMessageBuilder<T, C>::construct_buffer(T& lhs, const T& rhs) {
     lhs = rhs;
diff --git a/include/cppkafka/message_internal.h b/include/cppkafka/message_internal.h
new file mode 100644
index 0000000..1e7402f
--- /dev/null
+++ b/include/cppkafka/message_internal.h
@@ -0,0 +1,72 @@
+/*
+ * Copyright (c) 2017, Matias Fontanini
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ *   notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ *   copyright notice, this list of conditions and the following disclaimer
+ *   in the documentation and/or other materials provided with the
+ *   distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef CPPKAFKA_MESSAGE_INTERNAL_H
+#define CPPKAFKA_MESSAGE_INTERNAL_H
+
+#include <memory>
+#include "message.h"
+
+namespace cppkafka {
+
+struct Internal {
+    virtual ~Internal() = default;
+};
+using InternalPtr = std::shared_ptr<Internal>;
+
+/**
+ * \brief Private message data structure
+ */
+class MessageInternal {
+    friend class Producer;
+
+public:
+    static std::unique_ptr<MessageInternal> load(Message& message) {
+        if (message.get_user_data()) {
+            // Unpack internal data
+            std::unique_ptr<MessageInternal> internal_data(static_cast<MessageInternal*>(message.get_user_data()));
+            message.load_internal(internal_data->user_data_, internal_data->internal_);
+            return internal_data;
+        }
+        return nullptr;
+    }
+
+private:
+    MessageInternal(void* user_data, std::shared_ptr<Internal> internal)
+    : user_data_(user_data),
+      internal_(internal) {
+    }
+
+    void* user_data_;
+    InternalPtr internal_;
+};
+
+}
+
+#endif //CPPKAFKA_MESSAGE_INTERNAL_H
diff --git a/include/cppkafka/utils/buffered_producer.h b/include/cppkafka/utils/buffered_producer.h
index c9bcb65..35eaf39 100644
--- a/include/cppkafka/utils/buffered_producer.h
+++ b/include/cppkafka/utils/buffered_producer.h
@@ -39,10 +39,11 @@
 #include <mutex>
 #include <atomic>
 #include <memory>
+#include <future>
 #include <deque>
 #include <boost/optional.hpp>
 #include "../producer.h"
-#include "../message.h"
 #include "../detail/callback_invoker.h"
+#include "../message_internal.h"
 
 namespace cppkafka {
 
@@ -113,7 +114,7 @@ public:
     BufferedProducer(Configuration config);
 
     /**
-     * \brief Adds a message to the producer's buffer. 
+     * \brief Adds a message to the producer's buffer.
      *
      * The message won't be sent until flush is called.
      *
@@ -122,7 +123,7 @@ public:
     void add_message(const MessageBuilder& builder);
 
     /**
-     * \brief Adds a message to the producer's buffer. 
+     * \brief Adds a message to the producer's buffer.
      *
      * The message won't be sent until flush is called.
     *
@@ -145,6 +146,18 @@ public:
      */
     void produce(const MessageBuilder& builder);
 
+    /**
+     * \brief Produces a message synchronously without buffering it
+     *
+     * In case of failure, the message will be replayed until 'max_number_retries' is reached
+     * or until the user ProduceFailureCallback returns false.
+     *
+     * \param builder The builder that contains the message to be produced
+     *
+     * \remark This method throws cppkafka::HandleException on failure
+     */
+    void sync_produce(const MessageBuilder& builder);
+
     /**
      * \brief Produces a message asynchronously without buffering it
      *
@@ -221,6 +234,13 @@ public:
      */
     size_t get_total_messages_produced() const;
 
+    /**
+     * \brief Get the total number of messages dropped since the beginning
+     *
+     * \return The number of messages
+     */
+    size_t get_total_messages_dropped() const;
+
     /**
      * \brief Get the total outstanding flush operations in progress
     *
@@ -230,6 +250,20 @@ public:
      * \return The number of outstanding flush operations.
      */
     size_t get_flushes_in_progress() const;
+
+    /**
+     * \brief Sets the maximum number of retries per message until giving up
+     *
+     * Default is 5
+     */
+    void set_max_number_retries(size_t max_number_retries);
+
+    /**
+     * \brief Gets the max number of retries
+     *
+     * \return The number of retries
+     */
+    size_t get_max_number_retries() const;
 
     /**
      * Gets the Producer object
@@ -285,9 +319,29 @@ public:
      */
     void set_flush_failure_callback(FlushFailureCallback callback);
 
+    struct TestParameters {
+        bool force_delivery_error_;
+        bool force_produce_error_;
+    };
+protected:
+    //For testing purposes only
+#ifdef KAFKA_TEST_INSTANCE
+    void set_test_parameters(TestParameters *test_params) {
+        test_params_ = test_params;
+    }
+    TestParameters* get_test_parameters() {
+        return test_params_;
+    }
+#else
+    TestParameters* get_test_parameters() {
+        return nullptr;
+    }
+#endif
+
 private:
     using QueueType = std::deque<Builder>;
     enum class MessagePriority { Low, High };
+    enum class SenderType { Sync, Async };
 
     template <typename T>
     struct CounterGuard{
@@ -295,13 +349,29 @@ private:
         CounterGuard(std::atomic<T>& counter) : counter_(counter) { ++counter_; }
         ~CounterGuard() { --counter_; }
         std::atomic<T>& counter_;
     };
+
+    struct Tracker : public Internal {
+        Tracker(SenderType sender, size_t num_retries)
+        : sender_(sender), num_retries_(num_retries)
+        {}
+        std::future<bool> get_new_future() {
+            should_retry_ = std::promise<bool>(); //reset shared data
+            return should_retry_.get_future(); //issue new future
+        }
+        SenderType sender_;
+        std::promise<bool> should_retry_;
+        size_t num_retries_;
+    };
 
+    template <typename BuilderType>
     void do_add_message(BuilderType&& builder, MessagePriority priority, bool do_flush);
+    void do_add_message(const Message& message, MessagePriority priority, bool do_flush);
     template <typename MessageType>
-    void produce_message(const MessageType& message);
+    void produce_message(MessageType&& message);
     Configuration prepare_configuration(Configuration config);
     void on_delivery_report(const Message& message);
+    template <typename MessageType>
+    void async_produce(MessageType&& message, bool throw_on_error);
 
     // Members
     Producer producer_;
@@ -314,6 +384,11 @@ private:
     std::atomic<size_t> pending_acks_{0};
     std::atomic<size_t> flushes_in_progress_{0};
     std::atomic<size_t> total_messages_produced_{0};
+    std::atomic<size_t> total_messages_dropped_{0};
+    int max_number_retries_{5};
+#ifdef KAFKA_TEST_INSTANCE
+    TestParameters* test_params_;
+#endif
 };
 
 template <typename BufferType>
@@ -330,26 +405,52 @@ template <typename BufferType>
 BufferedProducer<BufferType>::BufferedProducer(Configuration config)
 : producer_(prepare_configuration(std::move(config))) {
     producer_.set_payload_policy(get_default_payload_policy());
+#ifdef KAFKA_TEST_INSTANCE
+    test_params_ = nullptr;
+#endif
 }
 
 template <typename BufferType>
 void BufferedProducer<BufferType>::add_message(const MessageBuilder& builder) {
+    // Add message tracker
+    std::shared_ptr<Tracker> tracker = std::make_shared<Tracker>(SenderType::Async, max_number_retries_);
+    const_cast<MessageBuilder&>(builder).internal(tracker);
     do_add_message(builder, MessagePriority::Low, true);
 }
 
 template <typename BufferType>
 void BufferedProducer<BufferType>::add_message(Builder builder) {
+    // Add message tracker
+    std::shared_ptr<Tracker> tracker = std::make_shared<Tracker>(SenderType::Async, max_number_retries_);
+    builder.internal(tracker);
     do_add_message(move(builder), MessagePriority::Low, true);
 }
 
 template <typename BufferType>
 void BufferedProducer<BufferType>::produce(const MessageBuilder& builder) {
-    produce_message(builder);
+    // Add message tracker
+    std::shared_ptr<Tracker> tracker = std::make_shared<Tracker>(SenderType::Async, max_number_retries_);
+    const_cast<MessageBuilder&>(builder).internal(tracker);
+    async_produce(builder, true);
+}
+
+template <typename BufferType>
+void BufferedProducer<BufferType>::sync_produce(const MessageBuilder& builder) {
+    // Add message tracker
+    std::shared_ptr<Tracker> tracker = std::make_shared<Tracker>(SenderType::Sync, max_number_retries_);
+    const_cast<MessageBuilder&>(builder).internal(tracker);
+    std::future<bool> should_retry;
+    do {
+        should_retry = tracker->get_new_future();
+        produce_message(builder);
+        wait_for_acks();
+    }
+    while (should_retry.get());
 }
 
 template <typename BufferType>
 void BufferedProducer<BufferType>::produce(const Message& message) {
-    produce_message(message);
+    async_produce(message, true);
 }
 
 template <typename BufferType>
@@ -361,16 +462,7 @@ void BufferedProducer<BufferType>::flush() {
         std::swap(messages_, flush_queue);
     }
     while (!flush_queue.empty()) {
-        try {
-            produce_message(flush_queue.front());
-        }
-        catch (const HandleException& ex) {
-            // If we have a flush failure callback and it returns true, we retry producing this message later
-            CallbackInvoker<FlushFailureCallback> callback("flush failure", flush_failure_callback_, &producer_);
-            if (callback && callback(flush_queue.front(), ex.get_error())) {
-                do_add_message(std::move(flush_queue.front()), MessagePriority::Low, false);
-            }
-        }
+        async_produce(std::move(flush_queue.front()), false);
         flush_queue.pop_front();
     }
     wait_for_acks();
@@ -427,10 +519,10 @@ void BufferedProducer<BufferType>::do_add_message(BuilderType&& builder,
     {
         std::lock_guard<std::mutex> lock(mutex_);
         if (priority == MessagePriority::High) {
-            messages_.emplace_front(std::move(builder));
+            messages_.emplace_front(std::forward<BuilderType>(builder));
         }
         else {
-            messages_.emplace_back(std::move(builder));
+            messages_.emplace_back(std::forward<BuilderType>(builder));
         }
     }
     if (do_flush && (max_buffer_size_ >= 0) && (max_buffer_size_ <= (ssize_t)messages_.size())) {
@@ -438,6 +530,13 @@
     }
 }
 
+template <typename BufferType>
+void BufferedProducer<BufferType>::do_add_message(const Message& message,
+                                                  MessagePriority priority,
+                                                  bool do_flush) {
+    do_add_message(MessageBuilder(message), priority, do_flush);
+}
+
 template <typename BufferType>
 Producer& BufferedProducer<BufferType>::get_producer() {
     return producer_;
@@ -458,11 +557,26 @@ size_t BufferedProducer<BufferType>::get_total_messages_produced() const {
     return total_messages_produced_;
 }
 
+template <typename BufferType>
+size_t BufferedProducer<BufferType>::get_total_messages_dropped() const {
+    return total_messages_dropped_;
+}
+
 template <typename BufferType>
 size_t BufferedProducer<BufferType>::get_flushes_in_progress() const {
     return flushes_in_progress_;
 }
 
+template <typename BufferType>
+void BufferedProducer<BufferType>::set_max_number_retries(size_t max_number_retries) {
+    max_number_retries_ = max_number_retries;
+}
+
+template <typename BufferType>
+size_t BufferedProducer<BufferType>::get_max_number_retries() const {
+    return max_number_retries_;
+}
+
 template <typename BufferType>
 typename BufferedProducer<BufferType>::Builder
 BufferedProducer<BufferType>::make_builder(std::string topic) {
@@ -486,10 +600,10 @@ void BufferedProducer<BufferType>::set_flush_failure_callback(FlushFailureCallba
 
 template <typename BufferType>
 template <typename MessageType>
-void BufferedProducer<BufferType>::produce_message(const MessageType& message) {
+void BufferedProducer<BufferType>::produce_message(MessageType&& message) {
     while (true) {
         try {
-            producer_.produce(message);
+            producer_.produce(std::forward<MessageType>(message));
             // Sent successfully
             ++pending_acks_;
             break;
@@ -506,6 +620,34 @@ void BufferedProducer<BufferType>::produce_message(const MessageType& message) {
     }
 }
 
+template <typename BufferType>
+template <typename MessageType>
+void BufferedProducer<BufferType>::async_produce(MessageType&& message, bool throw_on_error) {
+    try {
+        TestParameters* test_params = get_test_parameters();
+        if (test_params && test_params->force_produce_error_) {
+            throw HandleException(Error(RD_KAFKA_RESP_ERR_UNKNOWN));
+        }
+        produce_message(std::forward<MessageType>(message));
+    }
+    catch (const HandleException& ex) {
+        // If we have a flush failure callback and it returns true, we retry producing this message later
+        CallbackInvoker<FlushFailureCallback> callback("flush failure", flush_failure_callback_, &producer_);
+        if (!callback || callback(std::forward<MessageType>(message), ex.get_error())) {
+            std::shared_ptr<Tracker> tracker = std::static_pointer_cast<Tracker>(message.internal());
+            if (tracker->num_retries_ > 0) {
+                --tracker->num_retries_;
+                do_add_message(std::forward<MessageType>(message), MessagePriority::High, false);
+                return;
+            }
+        }
+        ++total_messages_dropped_;
+        if (throw_on_error) {
+            throw;
+        }
+    }
+}
+
 template <typename BufferType>
 Configuration BufferedProducer<BufferType>::prepare_configuration(Configuration config) {
     using std::placeholders::_2;
@@ -516,13 +658,30 @@ Configuration BufferedProducer<BufferType>::prepare_configuration(Configuration
 
 template <typename BufferType>
 void BufferedProducer<BufferType>::on_delivery_report(const Message& message) {
-    if (message.get_error()) {
+    //Get tracker data
+    TestParameters* test_params = get_test_parameters();
+    std::shared_ptr<Tracker> tracker = std::static_pointer_cast<Tracker>(message.internal());
+    bool should_retry = false;
+    if (message.get_error() || (test_params && test_params->force_delivery_error_)) {
         // We should produce this message again if we don't have a produce failure callback
         // or we have one but it returns true
         CallbackInvoker<ProduceFailureCallback> callback("produce failure", produce_failure_callback_, &producer_);
         if (!callback || callback(message)) {
-            // Re-enqueue for later retransmission with higher priority (i.e. front of the queue)
-            do_add_message(Builder(message), MessagePriority::High, false);
+            // Check if we have reached the maximum retry limit
+            if (tracker->num_retries_ > 0) {
+                --tracker->num_retries_;
+                if (tracker->sender_ == SenderType::Async) {
+                    // Re-enqueue for later retransmission with higher priority (i.e. front of the queue)
+                    do_add_message(Builder(message), MessagePriority::High, false);
+                }
+                should_retry = true;
+            }
+            else {
+                ++total_messages_dropped_;
+            }
+        }
+        else {
+            ++total_messages_dropped_;
         }
     }
     else {
@@ -531,6 +690,8 @@ void BufferedProducer<BufferType>::on_delivery_report(const Message& message) {
         // Increment the total successful transmissions
         ++total_messages_produced_;
     }
+    // Signal producers
+    tracker->should_retry_.set_value(should_retry);
     // Decrement the expected acks
     --pending_acks_;
     assert(pending_acks_ != (size_t)-1); // Prevent underflow
diff --git a/src/configuration.cpp b/src/configuration.cpp
index 92a81df..43a2851 100644
--- a/src/configuration.cpp
+++ b/src/configuration.cpp
@@ -31,7 +31,7 @@
 #include <memory>
 #include <string>
 #include "exceptions.h"
-#include "message.h"
+#include "message_internal.h"
 #include "producer.h"
 #include "consumer.h"
 
@@ -40,7 +40,7 @@ using std::map;
 using std::move;
 using std::vector;
 using std::initializer_list;
-
+using std::unique_ptr;
 using boost::optional;
 
 using std::chrono::milliseconds;
@@ -52,6 +52,7 @@ namespace cppkafka {
 void delivery_report_callback_proxy(rd_kafka_t*, const rd_kafka_message_t* msg, void *opaque) {
     Producer* handle = static_cast<Producer*>(opaque);
     Message message = Message::make_non_owning((rd_kafka_message_t*)msg);
+    unique_ptr<MessageInternal> internal_data(MessageInternal::load(message));
     CallbackInvoker<Configuration::DeliveryReportCallback>
         ("delivery report", handle->get_configuration().get_delivery_report_callback(), handle)
         (*handle, message);
diff --git a/src/message.cpp b/src/message.cpp
index d9c0870..23070bf 100644
--- a/src/message.cpp
+++ b/src/message.cpp
@@ -42,7 +42,8 @@ Message Message::make_non_owning(rd_kafka_message_t* handle) {
 }
 
 Message::Message()
-: handle_(nullptr, nullptr) {
+: handle_(nullptr, nullptr),
+  user_data_(nullptr) {
 
 }
 
@@ -51,15 +52,21 @@ Message::Message(rd_kafka_message_t* handle)
 }
 
-Message::Message(rd_kafka_message_t* handle, NonOwningTag) 
+Message::Message(rd_kafka_message_t* handle, NonOwningTag)
 : Message(HandlePtr(handle, &dummy_deleter)) {
 
 }
 
-Message::Message(HandlePtr handle) 
+Message::Message(HandlePtr handle)
 : handle_(move(handle)),
   payload_(handle_ ? Buffer((const Buffer::DataType*)handle_->payload, handle_->len) : Buffer()),
-  key_(handle_ ? Buffer((const Buffer::DataType*)handle_->key, handle_->key_len) : Buffer()) {
+  key_(handle_ ? Buffer((const Buffer::DataType*)handle_->key, handle_->key_len) : Buffer()),
+  user_data_(handle_ ? handle_->_private : nullptr) {
+}
+
+void Message::load_internal(void* user_data, InternalPtr internal) {
+    user_data_ = user_data;
+    internal_ = internal;
 }
 
 // MessageTimestamp
diff --git a/src/producer.cpp b/src/producer.cpp
index e4d47d4..e533c35 100644
--- a/src/producer.cpp
+++ b/src/producer.cpp
@@ -28,13 +28,15 @@
  */
 
 #include <errno.h>
+#include <memory>
 #include "producer.h"
 #include "exceptions.h"
-#include "message.h"
+#include "message_internal.h"
 
 using std::move;
 using std::string;
 using std::chrono::milliseconds;
+using std::unique_ptr;
 
 namespace cppkafka {
 
@@ -65,6 +67,7 @@ void Producer::produce(const MessageBuilder& builder) {
     const Buffer& payload = builder.payload();
     const Buffer& key = builder.key();
     const int policy = static_cast<int>(message_payload_policy_);
+    unique_ptr<MessageInternal> internal_data(new MessageInternal(builder.user_data(), builder.internal()));
     auto result = rd_kafka_producev(get_handle(),
                                     RD_KAFKA_V_TOPIC(builder.topic().data()),
                                     RD_KAFKA_V_PARTITION(builder.partition()),
@@ -72,9 +75,10 @@ void Producer::produce(const MessageBuilder& builder) {
                                     RD_KAFKA_V_TIMESTAMP(builder.timestamp().count()),
                                     RD_KAFKA_V_KEY((void*)key.get_data(), key.get_size()),
                                     RD_KAFKA_V_VALUE((void*)payload.get_data(), payload.get_size()),
-                                    RD_KAFKA_V_OPAQUE(builder.user_data()),
+                                    RD_KAFKA_V_OPAQUE(internal_data.get()),
                                     RD_KAFKA_V_END);
     check_error(result);
+    internal_data.release(); //data has been passed-on to rdkafka so we release ownership
 }
 
 void Producer::produce(const Message& message) {
@@ -82,6 +86,7 @@ void Producer::produce(const Message& message) {
     const Buffer& key = message.get_key();
     const int policy = static_cast<int>(message_payload_policy_);
     int64_t duration = message.get_timestamp() ?
                        message.get_timestamp().get().get_timestamp().count() : 0;
+    unique_ptr<MessageInternal> internal_data(new MessageInternal(message.get_user_data(), message.internal()));
     auto result = rd_kafka_producev(get_handle(),
                                     RD_KAFKA_V_TOPIC(message.get_topic().data()),
                                     RD_KAFKA_V_PARTITION(message.get_partition()),
@@ -89,9 +94,10 @@ void Producer::produce(const Message& message) {
                                     RD_KAFKA_V_TIMESTAMP(duration),
                                     RD_KAFKA_V_KEY((void*)key.get_data(), key.get_size()),
                                     RD_KAFKA_V_VALUE((void*)payload.get_data(), payload.get_size()),
-                                    RD_KAFKA_V_OPAQUE(message.get_user_data()),
+                                    RD_KAFKA_V_OPAQUE(internal_data.get()),
                                     RD_KAFKA_V_END);
     check_error(result);
+    internal_data.release(); //data has been passed-on to rdkafka so we release ownership
 }
 
 int Producer::poll() {
diff --git a/tests/producer_test.cpp b/tests/producer_test.cpp
index 007f2e6..148ad00 100644
--- a/tests/producer_test.cpp
+++ b/tests/producer_test.cpp
@@ -74,6 +74,7 @@ void flusher_run(BufferedProducer<string>& producer,
         if (producer.get_buffer_size() >= (size_t)num_flush) {
             producer.flush();
         }
+        this_thread::sleep_for(milliseconds(10));
     }
     producer.flush();
 }
@@ -86,6 +87,36 @@ void clear_run(BufferedProducer<string>& producer,
     producer.clear();
 }
 
+vector<int> dr_data = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
+void dr_callback(const Message& message) {
+    static int i = 0;
+    if (!message || message.is_eof()) return;
+    CHECK(message.get_user_data() == &dr_data[i]);
+    CHECK(*static_cast<int*>(message.get_user_data()) == dr_data[i]);
+    ++i;
+}
+
+bool dr_failure_callback(const Message& message) {
+    if (!message || message.is_eof()) return true;
+    CHECK(message.get_user_data() == &dr_data[0]);
+    CHECK(*static_cast<int*>(message.get_user_data()) == dr_data[0]);
+    return true; //always retry
+}
+
+template <typename BufferType>
+class ErrorProducer : public BufferedProducer<BufferType>
+{
+public:
+    ErrorProducer(Configuration config,
+                  typename BufferedProducer<BufferType>::TestParameters params) :
+        BufferedProducer<BufferType>(config),
+        params_(params) {
+        this->set_test_parameters(&params_);
+    }
+private:
+    typename BufferedProducer<BufferType>::TestParameters params_;
+};
+
 TEST_CASE("simple production", "[producer]") {
     int partition = 0;
@@ -271,6 +302,86 @@ TEST_CASE("multiple messages", "[producer]") {
     }
 }
 
+TEST_CASE("multiple sync messages", "[producer][buffered_producer][sync]") {
+    size_t message_count = 10;
+    set<string> payloads;
+
+    // Create a consumer and subscribe to this topic
+    Consumer consumer(make_consumer_config());
+    consumer.subscribe({ KAFKA_TOPICS[0] });
+    ConsumerRunner runner(consumer, message_count, KAFKA_NUM_PARTITIONS);
+
+    // Now create a producer and produce a message
+    BufferedProducer<string> producer(make_producer_config());
+    producer.set_produce_success_callback(dr_callback);
+    const string payload_base = "Hello world ";
+    for (size_t i = 0; i < message_count; ++i) {
+        const string payload = payload_base + to_string(i);
+        payloads.insert(payload);
+        producer.sync_produce(MessageBuilder(KAFKA_TOPICS[0]).payload(payload).user_data(&dr_data[i]));
+    }
+    runner.try_join();
+
+    const auto& messages = runner.get_messages();
+    REQUIRE(messages.size() == message_count);
+    for (size_t i = 0; i < messages.size(); ++i) {
+        const auto& message = messages[i];
+        CHECK(message.get_topic() == KAFKA_TOPICS[0]);
+        CHECK(payloads.erase(message.get_payload()) == 1);
+        CHECK(!!message.get_error() == false);
+        CHECK(!!message.get_key() == false);
+        CHECK(message.get_partition() >= 0);
+        CHECK(message.get_partition() < KAFKA_NUM_PARTITIONS);
+    }
+}
+
+TEST_CASE("replay sync messages with errors", "[producer][buffered_producer][sync]") {
+    size_t num_retries = 4;
+
+    // Create a consumer and subscribe to this topic
+    Consumer consumer(make_consumer_config());
+    consumer.subscribe({ KAFKA_TOPICS[0] });
+    ConsumerRunner runner(consumer, num_retries + 1, KAFKA_NUM_PARTITIONS);
+
+    // Now create a producer and produce a message
+    ErrorProducer<string> producer(make_producer_config(),
+                                   BufferedProducer<string>::TestParameters{true, false});
+    producer.set_produce_failure_callback(dr_failure_callback);
+    producer.set_max_number_retries(num_retries);
+    string payload = "Hello world";
+    producer.sync_produce(MessageBuilder(KAFKA_TOPICS[0]).payload(payload).user_data(&dr_data[0]));
+    runner.try_join();
+
+    const auto& messages = runner.get_messages();
+    REQUIRE(messages.size() == num_retries + 1);
+    for (size_t i = 0; i < messages.size(); ++i) {
+        const auto& message = messages[i];
+        CHECK(message.get_topic() == KAFKA_TOPICS[0]);
+        CHECK(message.get_payload() == payload);
+        CHECK(!!message.get_error() == false);
+        CHECK(!!message.get_key() == false);
+        CHECK(message.get_partition() >= 0);
+        CHECK(message.get_partition() < KAFKA_NUM_PARTITIONS);
+    }
+}
+
+TEST_CASE("replay async messages with errors", "[producer][buffered_producer][async]") {
+    size_t num_retries = 4;
+    int exit_flag = 0;
+
+    // Now create a producer and produce a message
+    ErrorProducer<string> producer(make_producer_config(),
+                                   BufferedProducer<string>::TestParameters{false, true});
+    producer.set_max_number_retries(num_retries);
+    thread flusher_thread(flusher_run, ref(producer), ref(exit_flag), 0);
+    string payload = "Hello world";
+    producer.produce(MessageBuilder(KAFKA_TOPICS[0]).payload(payload));
+    this_thread::sleep_for(milliseconds(2000));
+    exit_flag = 1;
+    flusher_thread.join();
+    REQUIRE(producer.get_total_messages_produced() == 0);
+    CHECK(producer.get_total_messages_dropped() == 1);
+}
+
 TEST_CASE("buffered producer", "[producer][buffered_producer]") {
     int partition = 0;
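
Usage note (editor's addition, not part of the patch): a minimal sketch of how the sync_produce(), set_max_number_retries(), and dropped-message counter introduced above might be driven from application code. The broker address, topic name, and payload are placeholder assumptions; sync_produce() throws cppkafka::HandleException if the message is ultimately dropped.

    #include <iostream>
    #include <string>
    #include <cppkafka/configuration.h>
    #include <cppkafka/message_builder.h>
    #include <cppkafka/utils/buffered_producer.h>

    using namespace cppkafka;

    int main() {
        // Assumed broker and topic; replace with real values
        Configuration config = {
            { "metadata.broker.list", "localhost:9092" }
        };

        BufferedProducer<std::string> producer(config);
        producer.set_max_number_retries(3); // retry budget per message (default is 5)

        // sync_produce() blocks until the delivery report arrives, replaying the
        // message until it succeeds, the retry budget is exhausted, or the
        // produce-failure callback returns false.
        std::string payload = "hello world";
        producer.sync_produce(MessageBuilder("test-topic").payload(payload));

        std::cout << "produced: " << producer.get_total_messages_produced()
                  << ", dropped: " << producer.get_total_messages_dropped() << "\n";
        return 0;
    }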