diff --git a/include/cppkafka/utils/buffered_producer.h b/include/cppkafka/utils/buffered_producer.h new file mode 100644 index 0000000..7a13b52 --- /dev/null +++ b/include/cppkafka/utils/buffered_producer.h @@ -0,0 +1,210 @@ +#ifndef CPPKAFKA_BUFFERED_PRODUCER_H +#define CPPKAFKA_BUFFERED_PRODUCER_H + +#include +#include +#include +#include +#include +#include +#include +#include +#include "producer.h" + +namespace cppkafka { + +template +class BufferedProducer { +public: + /** + * \brief Constructs a buffered producer using the provided configuration + * + * \param config The configuration to be used on the actual Producer object + */ + BufferedProducer(Configuration config); + + /** + * \brief Adds a message to the producer's buffer. + * + * The message won't be sent until flush is called. + * + * \param topic The topic in which this message should be written to + * \param partition The partition in which this message should be written to + * \param payload The message's payload + */ + void add_message(const std::string& topic, const Partition& partition, BufferType payload); + + /** + * \brief Adds a message to the producer's buffer. + * + * The message won't be sent until flush is called. + * + * \param topic The topic in which this message should be written to + * \param partition The partition in which this message should be written to + * \param key The message's key + * \param payload The message's payload + */ + void add_message(const std::string& topic, const Partition& partition, BufferType key, + BufferType payload); + + /** + * \brief Flushes the buffered messages. + * + * This will send all messages and keep waiting until all of them are acknowledged. + */ + void flush(); + + /** + * Gets the Producer object + */ + Producer& get_producer(); + + /** + * Gets the Producer object + */ + const Producer& get_producer() const; +private: + // Pick the most appropriate index type depending on the platform we're using + using IndexType = std::conditional::type; + + struct BufferedMessage { + BufferedMessage(unsigned topic_index, Partition partition, BufferType key, + BufferType payload) + : key(std::move(key)), payload(std::move(payload)), topic_index(topic_index), + partition(partition) { + + } + + BufferedMessage(unsigned topic_index, Partition partition, BufferType payload) + : payload(std::move(payload)), topic_index(topic_index), partition(partition) { + + } + + boost::optional key; + BufferType payload; + unsigned topic_index; + Partition partition; + }; + + template + void buffer_message(const std::string& topic, Args&&... args); + unsigned get_topic_index(const std::string& topic); + void produce_message(IndexType index, const BufferedMessage& message); + Configuration prepare_configuration(Configuration config); + void on_delivery_report(const Message& message); + + Producer producer_; + std::map messages_; + std::vector failed_indexes_; + IndexType current_index_{0}; + std::vector topics_; + std::unordered_map topic_mapping_; +}; + +template +BufferedProducer::BufferedProducer(Configuration config) +: producer_(prepare_configuration(std::move(config))) { + +} + +template +void BufferedProducer::add_message(const std::string& topic, + const Partition& partition, + BufferType payload) { + buffer_message(topic, partition, payload); +} + +template +void BufferedProducer::flush() { + for (const auto& message_pair : messages_) { + produce_message(message_pair.first, message_pair.second); + } + + while (!messages_.empty()) { + producer_.poll(); + if (!failed_indexes_.empty()) { + for (const IndexType index : failed_indexes_) { + produce_message(index, messages_.at(index)); + } + } + failed_indexes_.clear(); + } +} + +template +Producer& BufferedProducer::get_producer() { + return producer_; +} + +template +const Producer& BufferedProducer::get_producer() const { + return producer_; +} + +template +void BufferedProducer::add_message(const std::string& topic, + const Partition& partition, + BufferType key, BufferType payload) { + buffer_message(topic, partition, key, payload); +} + +template +template +void BufferedProducer::buffer_message(const std::string& topic, Args&&... args) { + IndexType index = messages_.size(); + BufferedMessage message{get_topic_index(topic), std::forward(args)...}; + messages_.emplace(index, std::move(message)); +} + +template +unsigned BufferedProducer::get_topic_index(const std::string& topic) { + auto iter = topic_mapping_.find(topic); + if (iter == topic_mapping_.end()) { + unsigned index = topics_.size(); + topics_.push_back(producer_.get_topic(topic)); + iter = topic_mapping_.emplace(topic, index).first; + } + return iter->second; +} + +template +void BufferedProducer::produce_message(IndexType index, + const BufferedMessage& message) { + if (message.key) { + producer_.produce(topics_[message.topic_index], message.partition, *message.key, + message.payload, reinterpret_cast(index)); + } + else { + producer_.produce(topics_[message.topic_index], message.partition, {} /*key*/, + message.payload, reinterpret_cast(index)); + } +} + +template +Configuration BufferedProducer::prepare_configuration(Configuration config) { + using std::placeholders::_2; + auto callback = std::bind(&BufferedProducer::on_delivery_report, this, _2); + config.set_delivery_report_callback(std::move(callback)); + return config; +} + +template +void BufferedProducer::on_delivery_report(const Message& message) { + const IndexType index = reinterpret_cast(message.get_private_data()); + auto iter = messages_.find(index); + // Got an ACK for an unexpected message? + if (iter == messages_.end()) { + return; + } + // If there was an error sending this message, then we need to re-send it + if (message.get_error()) { + failed_indexes_.push_back(index); + } + else { + messages_.erase(iter); + } +} + +} // cppkafka + +#endif // CPPKAFKA_BUFFERED_PRODUCER_H diff --git a/include/cppkafka/utils/compacted_topic_processor.h b/include/cppkafka/utils/compacted_topic_processor.h index 8d8f475..1d155a1 100644 --- a/include/cppkafka/utils/compacted_topic_processor.h +++ b/include/cppkafka/utils/compacted_topic_processor.h @@ -39,7 +39,6 @@ #include "consumer.h" namespace cppkafka { -namespace utils { /** * \brief Events generated by a CompactedTopicProcessor */ @@ -341,7 +340,6 @@ void CompactedTopicProcessor::on_assignment(TopicPartitionList& topic_part } } -} // utils } // cppkafka #endif // CPPKAFKA_COMPACTED_TOPIC_PROCESSOR_H diff --git a/tests/producer_test.cpp b/tests/producer_test.cpp index 572d6b6..a80280f 100644 --- a/tests/producer_test.cpp +++ b/tests/producer_test.cpp @@ -6,6 +6,7 @@ #include #include "cppkafka/producer.h" #include "cppkafka/consumer.h" +#include "cppkafka/utils/buffered_producer.h" using std::string; using std::to_string; @@ -275,3 +276,33 @@ TEST_F(ProducerTest, PartitionerCallbackOnDefaultTopicConfig) { EXPECT_EQ(partition, message.get_partition()); EXPECT_TRUE(callback_called); } + +TEST_F(ProducerTest, BufferedProducer) { + int partition = 0; + + // Create a consumer and assign this topic/partition + Consumer consumer(make_consumer_config()); + consumer.assign({ TopicPartition(KAFKA_TOPIC, partition) }); + ConsumerRunner runner(consumer, 2, 1); + + // Now create a buffered producer and produce two messages + BufferedProducer producer(make_producer_config()); + string payload = "Hello world! 2"; + string key = "such key"; + producer.add_message(KAFKA_TOPIC, partition, key, payload); + producer.add_message(KAFKA_TOPIC, partition, payload); + producer.flush(); + runner.try_join(); + + const auto& messages = runner.get_messages(); + ASSERT_EQ(2, messages.size()); + const auto& message = messages[0]; + EXPECT_EQ(Buffer(payload), message.get_payload()); + EXPECT_EQ(Buffer(key), message.get_key()); + EXPECT_EQ(KAFKA_TOPIC, message.get_topic()); + EXPECT_EQ(partition, message.get_partition()); + EXPECT_FALSE(message.get_error()); + + EXPECT_FALSE(messages[1].get_key()); + EXPECT_EQ(Buffer(payload), messages[1].get_payload()); +}