From e26d7e7db249bc83fc493b29e94adabf3793605a Mon Sep 17 00:00:00 2001 From: Matias Fontanini Date: Sat, 15 Apr 2017 16:46:37 -0700 Subject: [PATCH] Add MessageBuilder and use it on producer classes --- examples/kafka_producer.cpp | 3 +- include/cppkafka/message_builder.h | 126 +++++++++++++++++++++ include/cppkafka/producer.h | 30 +---- include/cppkafka/utils/buffered_producer.h | 91 ++++----------- src/CMakeLists.txt | 1 + src/message_builder.cpp | 50 ++++++++ src/producer.cpp | 23 ++-- tests/compacted_topic_processor_test.cpp | 6 +- tests/consumer_test.cpp | 6 +- tests/producer_test.cpp | 15 +-- 10 files changed, 224 insertions(+), 127 deletions(-) create mode 100644 include/cppkafka/message_builder.h create mode 100644 src/message_builder.cpp diff --git a/examples/kafka_producer.cpp b/examples/kafka_producer.cpp index 65b7b27..1aceb56 100644 --- a/examples/kafka_producer.cpp +++ b/examples/kafka_producer.cpp @@ -15,6 +15,7 @@ using cppkafka::Producer; using cppkafka::Configuration; using cppkafka::Topic; using cppkafka::Partition; +using cppkafka::MessageBuilder; namespace po = boost::program_options; @@ -70,6 +71,6 @@ int main(int argc, char* argv[]) { string line; while (getline(cin, line)) { // Write the string into the partition - producer.produce(topic, partition, line); + producer.produce(MessageBuilder(topic).partition(partition).payload(line)); } } diff --git a/include/cppkafka/message_builder.h b/include/cppkafka/message_builder.h new file mode 100644 index 0000000..ed0ea0d --- /dev/null +++ b/include/cppkafka/message_builder.h @@ -0,0 +1,126 @@ +/* + * Copyright (c) 2016, Matias Fontanini + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef CPPKAFKA_MESSAGE_BUILDER_H +#define CPPKAFKA_MESSAGE_BUILDER_H + +#include +#include "buffer.h" +#include "topic.h" +#include "partition.h" + +namespace cppkafka { + +/** + * \brief Message builder class + * + * Allows building a message including topic, partition, key, payload, etc. + * + * Example: + * + * \code + * Producer producer(...); + * Topic topic = producer.get_topic("test"); + * + * string payload = "hello world"; + * producer.produce(MessageBuilder(topic).partition(5).payload(payload)); + * \endcode + */ +class MessageBuilder { +public: + /** + * Construct a MessageBuilder + * + * \param topic The topic into which this message would be produced + */ + MessageBuilder(const Topic& topic); + + /** + * Sets the partition into which this message will be produced + * + * \param value The partition to be used + */ + MessageBuilder& partition(Partition value); + + /** + * Sets the message's key + * + * \param value The key to be used + */ + MessageBuilder& key(const Buffer& value); + + /** + * Sets the message's payload + * + * \param value The payload to be used + */ + MessageBuilder& payload(const Buffer& value); + + /** + * Sets the message's user data pointer + * + * \param value Pointer to the user data to be used on the produce call + */ + MessageBuilder& user_data(void* value); + + /** + * Gets the topic this message will be produced into + */ + const Topic& topic() const; + + /** + * Gets the partition this message will be produced into + */ + const Partition& partition() const; + + /** + * Gets the message's key + */ + const Buffer& key() const; + + /** + * Gets the message's payload + */ + const Buffer& payload() const; + + /** + * Gets the message's user data pointer + */ + void* user_data() const; +private: + const Topic& topic_; + Partition partition_; + Buffer key_; + Buffer payload_; + void* user_data_; +}; + +} // cppkafka + +#endif // CPPKAFKA_MESSAGE_BUILDER_H diff --git a/include/cppkafka/producer.h b/include/cppkafka/producer.h index f16f2f1..550a5a6 100644 --- a/include/cppkafka/producer.h +++ b/include/cppkafka/producer.h @@ -37,6 +37,7 @@ #include "topic.h" #include "partition.h" #include "macros.h" +#include "message_builder.h" namespace cppkafka { @@ -72,10 +73,10 @@ class TopicConfiguration; * string payload = "some payload"; * * // Write a message into an unassigned partition - * producer.produce(topic, Partition(), payload); + * producer.produce(MessageBuilder(topic).payload(payload)); * * // Write using a key on a fixed partition (42) - * producer.produce(topic, 42, key, payload); + * producer.produce(MessageBuilder(topic).partition(42).key(key).payload(payload)); * * \endcode */ @@ -112,30 +113,7 @@ public: * \param partition The partition to write the message to * \param payload The message payload */ - void produce(const Topic& topic, const Partition& partition, const Buffer& payload); - - /** - * Produces a message - * - * \param topic The topic to write the message to - * \param partition The partition to write the message to - * \param key The message key - * \param payload The message payload - */ - void produce(const Topic& topic, const Partition& partition, const Buffer& key, - const Buffer& payload); - - /** - * Produces a message - * - * \param topic The topic to write the message to - * \param partition The partition to write the message to - * \param key The message key - * \param payload The message payload - * \param user_data The opaque data pointer to be used (accesible via Message::private_data) - */ - void produce(const Topic& topic, const Partition& partition, const Buffer& key, - const Buffer& payload, void* user_data); + void produce(const MessageBuilder& builder); /** * \brief Polls on this handle diff --git a/include/cppkafka/utils/buffered_producer.h b/include/cppkafka/utils/buffered_producer.h index 2aa44e2..b51c17e 100644 --- a/include/cppkafka/utils/buffered_producer.h +++ b/include/cppkafka/utils/buffered_producer.h @@ -29,24 +29,9 @@ public: * * The message won't be sent until flush is called. * - * \param topic The topic in which this message should be written to - * \param partition The partition in which this message should be written to - * \param payload The message's payload + * \param builder The builder that contains the message to be added */ - void add_message(const std::string& topic, const Partition& partition, BufferType payload); - - /** - * \brief Adds a message to the producer's buffer. - * - * The message won't be sent until flush is called. - * - * \param topic The topic in which this message should be written to - * \param partition The partition in which this message should be written to - * \param key The message's key - * \param payload The message's payload - */ - void add_message(const std::string& topic, const Partition& partition, BufferType key, - BufferType payload); + void add_message(const MessageBuilder& builder); /** * \brief Flushes the buffered messages. @@ -68,34 +53,13 @@ private: // Pick the most appropriate index type depending on the platform we're using using IndexType = std::conditional::type; - struct BufferedMessage { - BufferedMessage(unsigned topic_index, Partition partition, BufferType key, - BufferType payload) - : key(std::move(key)), payload(std::move(payload)), topic_index(topic_index), - partition(partition) { - - } - - BufferedMessage(unsigned topic_index, Partition partition, BufferType payload) - : payload(std::move(payload)), topic_index(topic_index), partition(partition) { - - } - - boost::optional key; - BufferType payload; - unsigned topic_index; - Partition partition; - }; - - template - void buffer_message(const std::string& topic, Args&&... args); - unsigned get_topic_index(const std::string& topic); - void produce_message(IndexType index, const BufferedMessage& message); + const Topic& get_topic(const std::string& topic); + void produce_message(IndexType index, MessageBuilder& message); Configuration prepare_configuration(Configuration config); void on_delivery_report(const Message& message); Producer producer_; - std::map messages_; + std::map messages_; std::vector failed_indexes_; IndexType current_index_{0}; std::vector topics_; @@ -109,15 +73,19 @@ BufferedProducer::BufferedProducer(Configuration config) } template -void BufferedProducer::add_message(const std::string& topic, - const Partition& partition, - BufferType payload) { - buffer_message(topic, partition, payload); +void BufferedProducer::add_message(const MessageBuilder& builder) { + MessageBuilder local_builder(get_topic(builder.topic().get_name())); + local_builder.partition(builder.partition()); + local_builder.key(builder.key()); + local_builder.payload(builder.payload()); + + IndexType index = messages_.size(); + messages_.emplace(index, std::move(local_builder)); } template void BufferedProducer::flush() { - for (const auto& message_pair : messages_) { + for (auto& message_pair : messages_) { produce_message(message_pair.first, message_pair.second); } @@ -143,45 +111,24 @@ const Producer& BufferedProducer::get_producer() const { } template -void BufferedProducer::add_message(const std::string& topic, - const Partition& partition, - BufferType key, BufferType payload) { - buffer_message(topic, partition, key, payload); -} - -template -template -void BufferedProducer::buffer_message(const std::string& topic, Args&&... args) { - IndexType index = messages_.size(); - BufferedMessage message{get_topic_index(topic), std::forward(args)...}; - messages_.emplace(index, std::move(message)); -} - -template -unsigned BufferedProducer::get_topic_index(const std::string& topic) { +const Topic& BufferedProducer::get_topic(const std::string& topic) { auto iter = topic_mapping_.find(topic); if (iter == topic_mapping_.end()) { unsigned index = topics_.size(); topics_.push_back(producer_.get_topic(topic)); iter = topic_mapping_.emplace(topic, index).first; } - return iter->second; + return topics_[iter->second]; } template void BufferedProducer::produce_message(IndexType index, - const BufferedMessage& message) { + MessageBuilder& builder) { bool sent = false; + builder.user_data(reinterpret_cast(index)); while (!sent) { try { - if (message.key) { - producer_.produce(topics_[message.topic_index], message.partition, *message.key, - message.payload, reinterpret_cast(index)); - } - else { - producer_.produce(topics_[message.topic_index], message.partition, {} /*key*/, - message.payload, reinterpret_cast(index)); - } + producer_.produce(builder); sent = true; } catch (const HandleException& ex) { diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 0c2730f..3172403 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -11,6 +11,7 @@ set(SOURCES topic_partition_list.cpp metadata.cpp error.cpp + message_builder.cpp kafka_handle_base.cpp producer.cpp diff --git a/src/message_builder.cpp b/src/message_builder.cpp new file mode 100644 index 0000000..34b5b26 --- /dev/null +++ b/src/message_builder.cpp @@ -0,0 +1,50 @@ +#include "message_builder.h" +#include + +namespace cppkafka { + +MessageBuilder::MessageBuilder(const Topic& topic) +: topic_(topic) { +} + +MessageBuilder& MessageBuilder::partition(Partition value) { + partition_ = value; + return *this; +} + +MessageBuilder& MessageBuilder::key(const Buffer& value) { + key_ = Buffer(value.get_data(), value.get_size()); + return *this; +} + +MessageBuilder& MessageBuilder::payload(const Buffer& value) { + payload_ = Buffer(value.get_data(), value.get_size()); + return *this; +} + +MessageBuilder& MessageBuilder::user_data(void* value) { + user_data_ = value; + return *this; +} + +const Topic& MessageBuilder::topic() const { + return topic_; +} + +const Partition& MessageBuilder::partition() const { + return partition_; +} + +const Buffer& MessageBuilder::key() const { + return key_; +} + +const Buffer& MessageBuilder::payload() const { + return payload_; +} + +void* MessageBuilder::user_data() const { + return user_data_; +} + +} // cppkafka diff --git a/src/producer.cpp b/src/producer.cpp index 5cd00cf..16a4670 100644 --- a/src/producer.cpp +++ b/src/producer.cpp @@ -62,23 +62,14 @@ Producer::PayloadPolicy Producer::get_payload_policy() const { return message_payload_policy_; } -void Producer::produce(const Topic& topic, const Partition& partition, const Buffer& payload) { - produce(topic, partition, Buffer{} /*key*/, payload, nullptr /*user_data*/); -} - -void Producer::produce(const Topic& topic, const Partition& partition, const Buffer& key, - const Buffer& payload) { - produce(topic, partition, key, payload, nullptr /*user_data*/); -} - -void Producer::produce(const Topic& topic, const Partition& partition, const Buffer& key, - const Buffer& payload, void* user_data) { - void* payload_ptr = (void*)payload.get_data(); - void* key_ptr = (void*)key.get_data(); +void Producer::produce(const MessageBuilder& builder) { + void* payload_ptr = (void*)builder.payload().get_data(); + void* key_ptr = (void*)builder.key().get_data(); const int policy = static_cast(message_payload_policy_); - int result = rd_kafka_produce(topic.get_handle(), partition.get_partition(), - policy, payload_ptr, payload.get_size(), - key_ptr, key.get_size(), user_data); + int result = rd_kafka_produce(builder.topic().get_handle(), + builder.partition().get_partition(), + policy, payload_ptr, builder.payload().get_size(), + key_ptr, builder.key().get_size(), builder.user_data()); if (result == -1) { throw HandleException(rd_kafka_errno2err(errno)); } diff --git a/tests/compacted_topic_processor_test.cpp b/tests/compacted_topic_processor_test.cpp index 7e0defd..16c032f 100644 --- a/tests/compacted_topic_processor_test.cpp +++ b/tests/compacted_topic_processor_test.cpp @@ -88,11 +88,13 @@ TEST_F(CompactedTopicProcessorTest, Consume) { }; for (const auto& element_pair : elements) { const ElementType& element = element_pair.second; - producer.produce(topic, element.partition, element_pair.first, element.value); + MessageBuilder builder(topic); + builder.partition(element.partition).key(element_pair.first).payload(element.value); + producer.produce(builder); } // Now erase the first element string deleted_key = "42"; - producer.produce(topic, 0, deleted_key, {} /*no payload*/); + producer.produce(MessageBuilder(topic).partition(0).key(deleted_key)); for (size_t i = 0; i < 10; ++i) { compacted_consumer.process_event(); diff --git a/tests/consumer_test.cpp b/tests/consumer_test.cpp index 558f144..13fca06 100644 --- a/tests/consumer_test.cpp +++ b/tests/consumer_test.cpp @@ -118,7 +118,7 @@ TEST_F(ConsumerTest, AssignmentCallback) { Producer producer(make_producer_config()); Topic topic = producer.get_topic(KAFKA_TOPIC); string payload = "Hello world!"; - producer.produce(topic, partition, payload); + producer.produce(MessageBuilder(topic).partition(partition).payload(payload)); runner.try_join(); // All 3 partitions should be ours @@ -173,7 +173,7 @@ TEST_F(ConsumerTest, Rebalance) { Producer producer(make_producer_config()); Topic topic = producer.get_topic(KAFKA_TOPIC); string payload = "Hello world!"; - producer.produce(topic, partition, payload); + producer.produce(MessageBuilder(topic).partition(partition).payload(payload)); runner1.try_join(); runner2.try_join(); @@ -215,7 +215,7 @@ TEST_F(ConsumerTest, OffsetCommit) { Producer producer(make_producer_config()); Topic topic = producer.get_topic(KAFKA_TOPIC); string payload = "Hello world!"; - producer.produce(topic, partition, payload); + producer.produce(MessageBuilder(topic).partition(partition).payload(payload)); runner.try_join(); ASSERT_EQ(1, runner.get_messages().size()); diff --git a/tests/producer_test.cpp b/tests/producer_test.cpp index a80280f..379e7ab 100644 --- a/tests/producer_test.cpp +++ b/tests/producer_test.cpp @@ -116,7 +116,7 @@ TEST_F(ProducerTest, OneMessageOnFixedPartition) { Producer producer(make_producer_config()); Topic topic = producer.get_topic(KAFKA_TOPIC); string payload = "Hello world! 1"; - producer.produce(topic, partition, payload); + producer.produce(MessageBuilder(topic).partition(partition).payload(payload)); runner.try_join(); const auto& messages = runner.get_messages(); @@ -147,7 +147,7 @@ TEST_F(ProducerTest, OneMessageUsingKey) { Topic topic = producer.get_topic(KAFKA_TOPIC); string payload = "Hello world! 2"; string key = "such key"; - producer.produce(topic, partition, key, payload); + producer.produce(MessageBuilder(topic).partition(partition).key(key).payload(payload)); runner.try_join(); const auto& messages = runner.get_messages(); @@ -179,7 +179,7 @@ TEST_F(ProducerTest, MultipleMessagesUnassignedPartitions) { for (size_t i = 0; i < message_count; ++i) { string payload = payload_base + to_string(i); payloads.insert(payload); - producer.produce(topic, {} /*unassigned partition*/, payload); + producer.produce(MessageBuilder(topic).payload(payload)); } runner.try_join(); @@ -224,7 +224,7 @@ TEST_F(ProducerTest, Callbacks) { Producer producer(move(config)); Topic topic = producer.get_topic(KAFKA_TOPIC, topic_config); - producer.produce(topic, {}, key, payload); + producer.produce(MessageBuilder(topic).key(key).payload(payload)); producer.poll(); runner.try_join(); @@ -266,7 +266,7 @@ TEST_F(ProducerTest, PartitionerCallbackOnDefaultTopicConfig) { Producer producer(move(config)); Topic topic = producer.get_topic(KAFKA_TOPIC); - producer.produce(topic, {}, key, payload); + producer.produce(MessageBuilder(topic).key(key).payload(payload)); producer.poll(); runner.try_join(); @@ -289,8 +289,9 @@ TEST_F(ProducerTest, BufferedProducer) { BufferedProducer producer(make_producer_config()); string payload = "Hello world! 2"; string key = "such key"; - producer.add_message(KAFKA_TOPIC, partition, key, payload); - producer.add_message(KAFKA_TOPIC, partition, payload); + Topic topic = producer.get_producer().get_topic(KAFKA_TOPIC); + producer.add_message(MessageBuilder(topic).partition(partition).key(key).payload(payload)); + producer.add_message(MessageBuilder(topic).partition(partition).payload(payload)); producer.flush(); runner.try_join();