diff --git a/include/cppkafka/producer.h b/include/cppkafka/producer.h index a2d9cf6..404fbe4 100644 --- a/include/cppkafka/producer.h +++ b/include/cppkafka/producer.h @@ -43,6 +43,7 @@ namespace cppkafka { class Topic; class Buffer; class TopicConfiguration; +class Message; /** * \brief Producer class @@ -86,39 +87,44 @@ public: }; /** - * Constructs a producer using the given configuration + * \brief Constructs a producer using the given configuration * * \param config The configuration to use */ Producer(Configuration config); /** - * Sets the payload policy + * \brief Sets the payload policy * * \param policy The payload policy to be used */ void set_payload_policy(PayloadPolicy policy); /** - * Returns the current payload policy + * \brief Returns the current payload policy */ PayloadPolicy get_payload_policy() const; /** - * Produces a message + * \brief Produces a message * - * \param topic The topic to write the message to - * \param partition The partition to write the message to - * \param payload The message payload + * \param builder The builder class used to compose a message */ void produce(const MessageBuilder& builder); + + /** + * \brief Produces a message + * + * \param message The message to be produced + */ + void produce(const Message& message); /** * \brief Polls on this handle * * This translates into a call to rd_kafka_poll. * - * The timeout used on this call is the one configured via Producer::set_timeout. + * \remark The timeout used on this call is the one configured via Producer::set_timeout. */ int poll(); @@ -136,7 +142,7 @@ public: * * This translates into a call to rd_kafka_flush. * - * The timeout used on this call is the one configured via Producer::set_timeout. + * \remark The timeout used on this call is the one configured via Producer::set_timeout. */ void flush(); diff --git a/include/cppkafka/utils/buffered_producer.h b/include/cppkafka/utils/buffered_producer.h index d01a8e6..c6cb51f 100644 --- a/include/cppkafka/utils/buffered_producer.h +++ b/include/cppkafka/utils/buffered_producer.h @@ -108,6 +108,16 @@ public: * \param builder The builder that contains the message to be produced */ void produce(const MessageBuilder& builder); + + /** + * \brief Produces a message without buffering it + * + * The message will still be tracked so that a call to flush or wait_for_acks will actually + * wait for it to be acknowledged. + * + * \param message The message to be produced + */ + void produce(const Message& message); /** * \brief Flushes the buffered messages. @@ -126,6 +136,34 @@ public: * Clears any buffered messages */ void clear(); + + /** + * \brief Sets the maximum amount of messages to be enqueued in the buffer. + * + * After 'max_buffer_size' is reached, flush() will be called automatically. + * + * \param size The max size of the internal buffer. Allowed values are: + * -1 : Unlimited buffer size. Must be flushed manually (default value) + * 0 : Don't buffer anything. add_message() behaves like produce() + * > 0 : Max number of messages before flush() is called. + * + * \remark add_message() will block when 'max_buffer_size' is reached due to flush() + */ + void set_max_buffer_size(ssize_t max_buffer_size); + + /** + * \brief Return the maximum allowed buffer size. + * + * \return The max buffer size. A value of -1 indicates an unbounded buffer. + */ + ssize_t get_max_buffer_size() const; + + /** + * \brief Get the number of messages in the buffer + * + * \return The number of messages + */ + size_t get_buffer_size() const; /** * Gets the Producer object @@ -157,20 +195,25 @@ private: template void do_add_message(BuilderType&& builder); - void produce_message(const MessageBuilder& message); + template + void produce_message(const MessageType& message); Configuration prepare_configuration(Configuration config); void on_delivery_report(const Message& message); + + Configuration::DeliveryReportCallback delivery_report_callback_; Producer producer_; QueueType messages_; ProduceFailureCallback produce_failure_callback_; size_t expected_acks_{0}; size_t messages_acked_{0}; + ssize_t max_buffer_size_{-1}; }; template BufferedProducer::BufferedProducer(Configuration config) -: producer_(prepare_configuration(std::move(config))) { +: delivery_report_callback_(config.get_delivery_report_callback()), + producer_(prepare_configuration(std::move(config))) { } @@ -190,13 +233,18 @@ void BufferedProducer::produce(const MessageBuilder& builder) { expected_acks_++; } +template +void BufferedProducer::produce(const Message& message) { + produce_message(message); + expected_acks_++; +} + template void BufferedProducer::flush() { while (!messages_.empty()) { produce_message(messages_.front()); messages_.pop(); } - wait_for_acks(); } @@ -228,11 +276,32 @@ void BufferedProducer::clear() { messages_acked_ = 0; } +template +void BufferedProducer::set_max_buffer_size(ssize_t max_buffer_size) { + if (max_buffer_size < -1) { + throw Exception("Invalid buffer size."); + } + max_buffer_size_ = max_buffer_size; +} + +template +ssize_t BufferedProducer::get_max_buffer_size() const { + return max_buffer_size_; +} + +template +size_t BufferedProducer::get_buffer_size() const { + return messages_.size(); +} + template template void BufferedProducer::do_add_message(BuilderType&& builder) { expected_acks_++; - messages_.push(std::move(builder)); + messages_.push(std::forward(builder)); + if ((max_buffer_size_ >= 0) && (max_buffer_size_ <= (ssize_t)messages_.size())) { + flush(); + } } template @@ -257,11 +326,12 @@ void BufferedProducer::set_produce_failure_callback(ProduceFailureCa } template -void BufferedProducer::produce_message(const MessageBuilder& builder) { +template +void BufferedProducer::produce_message(const MessageType& message) { bool sent = false; while (!sent) { try { - producer_.produce(builder); + producer_.produce(message); sent = true; } catch (const HandleException& ex) { @@ -287,22 +357,16 @@ Configuration BufferedProducer::prepare_configuration(Configuration template void BufferedProducer::on_delivery_report(const Message& message) { - // We should produce this message again if it has an error and we either don't have a + // Call the user-supplied delivery report callback if any + if (delivery_report_callback_) { + delivery_report_callback_(producer_, message); + } + // We should produce this message again if it has an error and we either don't have a // produce failure callback or we have one but it returns true bool should_produce = message.get_error() && (!produce_failure_callback_ || produce_failure_callback_(message)); if (should_produce) { - MessageBuilder builder(message.get_topic()); - const auto& key = message.get_key(); - const auto& payload = message.get_payload(); - builder.partition(message.get_partition()) - .key(Buffer(key.get_data(), key.get_size())) - .payload(Buffer(payload.get_data(), payload.get_size())) - .user_data(message.get_user_data()); - if (message.get_timestamp()) { - builder.timestamp(message.get_timestamp()->get_timestamp()); - } - produce_message(builder); + produce_message(message); return; } // If production was successful or the produce failure callback returned false, then diff --git a/src/producer.cpp b/src/producer.cpp index 964ce01..e4d47d4 100644 --- a/src/producer.cpp +++ b/src/producer.cpp @@ -30,10 +30,10 @@ #include #include "producer.h" #include "exceptions.h" +#include "message.h" using std::move; using std::string; - using std::chrono::milliseconds; namespace cppkafka { @@ -77,6 +77,23 @@ void Producer::produce(const MessageBuilder& builder) { check_error(result); } +void Producer::produce(const Message& message) { + const Buffer& payload = message.get_payload(); + const Buffer& key = message.get_key(); + const int policy = static_cast(message_payload_policy_); + int64_t duration = message.get_timestamp() ? message.get_timestamp().get().get_timestamp().count() : 0; + auto result = rd_kafka_producev(get_handle(), + RD_KAFKA_V_TOPIC(message.get_topic().data()), + RD_KAFKA_V_PARTITION(message.get_partition()), + RD_KAFKA_V_MSGFLAGS(policy), + RD_KAFKA_V_TIMESTAMP(duration), + RD_KAFKA_V_KEY((void*)key.get_data(), key.get_size()), + RD_KAFKA_V_VALUE((void*)payload.get_data(), payload.get_size()), + RD_KAFKA_V_OPAQUE(message.get_user_data()), + RD_KAFKA_V_END); + check_error(result); +} + int Producer::poll() { return poll(get_timeout()); } diff --git a/tests/producer_test.cpp b/tests/producer_test.cpp index 4454ac6..cda960d 100644 --- a/tests/producer_test.cpp +++ b/tests/producer_test.cpp @@ -101,6 +101,38 @@ TEST_CASE("simple production", "[producer]") { REQUIRE(!!message.get_timestamp() == true); CHECK(message.get_timestamp()->get_timestamp() == timestamp); } + + SECTION("message without message builder") { + const string payload = "Goodbye cruel world!"; + const string key = "replay key"; + const milliseconds timestamp{15}; + Producer producer(config); + producer.produce(MessageBuilder(KAFKA_TOPIC).partition(partition) + .key(key) + .payload(payload) + .timestamp(timestamp)); + runner.try_join(); + ConsumerRunner runner2(consumer, 1, 1); + + const auto& replay_messages = runner.get_messages(); + REQUIRE(replay_messages.size() == 1); + const auto& replay_message = replay_messages[0]; + + //produce the same message again + producer.produce(replay_message); + runner2.try_join(); + + const auto& messages = runner2.get_messages(); + REQUIRE(messages.size() == 1); + const auto& message = messages[0]; + CHECK(message.get_payload() == payload); + CHECK(message.get_key() == key); + CHECK(message.get_topic() == KAFKA_TOPIC); + CHECK(message.get_partition() == partition); + CHECK(!!message.get_error() == false); + REQUIRE(!!message.get_timestamp() == true); + CHECK(message.get_timestamp()->get_timestamp() == timestamp); + } SECTION("callbacks") { // Now create a producer and produce a message @@ -240,3 +272,34 @@ TEST_CASE("buffered producer", "[producer]") { CHECK(message.get_payload() == payload); } } + +TEST_CASE("buffered producer with limited buffer", "[producer]") { + int partition = 0; + int num_messages = 4; + + // Create a consumer and assign this topic/partition + Consumer consumer(make_consumer_config()); + consumer.assign({ TopicPartition(KAFKA_TOPIC, partition) }); + ConsumerRunner runner(consumer, 3, 1); + + // Now create a buffered producer and produce two messages + BufferedProducer producer(make_producer_config()); + const string payload = "Hello world! 2"; + const string key = "such key"; + REQUIRE(producer.get_buffer_size() == 0); + REQUIRE(producer.get_max_buffer_size() == -1); + + // Limit the size of the internal buffer + producer.set_max_buffer_size(num_messages-1); + while (num_messages--) { + producer.add_message(MessageBuilder(KAFKA_TOPIC).partition(partition).key(key).payload(payload)); + } + REQUIRE(producer.get_buffer_size() == 1); + + // Finish the runner + runner.try_join(); + + // Validate messages received + const auto& messages = runner.get_messages(); + REQUIRE(messages.size() == producer.get_max_buffer_size()); +}