From f781afe5cf1c21a822abf103b70704557764d730 Mon Sep 17 00:00:00 2001 From: Matias Fontanini Date: Sun, 16 Apr 2017 09:47:39 -0700 Subject: [PATCH] Allow using generic message builders not tied to Buffer class --- CMakeLists.txt | 5 - examples/CMakeLists.txt | 1 + include/cppkafka/message_builder.h | 237 +++++++++++++++------ include/cppkafka/utils/buffered_producer.h | 22 +- src/CMakeLists.txt | 4 +- src/message_builder.cpp | 50 ----- tests/CMakeLists.txt | 1 + 7 files changed, 193 insertions(+), 127 deletions(-) delete mode 100644 src/message_builder.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index b3dd353..28293cc 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -25,14 +25,9 @@ else() add_definitions("-DCPPKAFKA_STATIC=1") endif() -include_directories(${CMAKE_SOURCE_DIR}/include/cppkafka) - # Look for Boost (just need boost.optional headers here) find_package(Boost REQUIRED) -include_directories(${Boost_INCLUDE_DIRS}) - find_package(RdKafka REQUIRED) -include_directories(${RDKAFKA_INCLUDE_DIR}) if (HAVE_OFFSETS_FOR_TIMES) message(STATUS "Enabling support for KafkaHandleBase::get_offsets_for_times") diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 1f58205..6a98e74 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -4,6 +4,7 @@ if (Boost_PROGRAM_OPTIONS_FOUND) link_libraries(${Boost_LIBRARIES} cppkafka ${RDKAFKA_LIBRARY}) include_directories(${CMAKE_CURRENT_SOURCE_DIR}/../include) + include_directories(SYSTEM ${Boost_INCLUDE_DIRS} ${RDKAFKA_INCLUDE_DIR}) add_executable(kafka_producer EXCLUDE_FROM_ALL kafka_producer.cpp) add_executable(kafka_consumer EXCLUDE_FROM_ALL kafka_consumer.cpp) diff --git a/include/cppkafka/message_builder.h b/include/cppkafka/message_builder.h index 036d647..05474cb 100644 --- a/include/cppkafka/message_builder.h +++ b/include/cppkafka/message_builder.h @@ -38,6 +38,166 @@ namespace cppkafka { +/** + * \brief Base template class for message construction + */ +template +class CPPKAFKA_API BasicMessageBuilder { +public: + /** + * Construct a BasicMessageBuilder + * + * \param topic The topic into which this message would be produced + */ + BasicMessageBuilder(const Topic& topic); + + /** + * Sets the partition into which this message will be produced + * + * \param value The partition to be used + */ + Concrete& partition(Partition value); + + /** + * Sets the message's key + * + * \param value The key to be used + */ + Concrete& key(const BufferType& value); + + /** + * Sets the message's key + * + * \param value The key to be used + */ + Concrete& key(BufferType&& value); + + /** + * Sets the message's payload + * + * \param value The payload to be used + */ + Concrete& payload(const BufferType& value); + + /** + * Sets the message's payload + * + * \param value The payload to be used + */ + Concrete& payload(BufferType&& value); + + /** + * Sets the message's user data pointer + * + * \param value Pointer to the user data to be used on the produce call + */ + Concrete& user_data(void* value); + + /** + * Gets the topic this message will be produced into + */ + const Topic& topic() const; + + /** + * Gets the partition this message will be produced into + */ + const Partition& partition() const; + + /** + * Gets the message's key + */ + const BufferType& key() const; + + /** + * Gets the message's payload + */ + const BufferType& payload() const; + + /** + * Gets the message's user data pointer + */ + void* user_data() const; +private: + void construct_buffer(BufferType& lhs, const BufferType& rhs); + + const Topic& topic_; + Partition partition_; + BufferType key_; + BufferType payload_; + void* user_data_; +}; + +template +BasicMessageBuilder::BasicMessageBuilder(const Topic& topic) +: topic_(topic) { +} + +template +C& BasicMessageBuilder::partition(Partition value) { + partition_ = value; + return static_cast(*this); +} + +template +C& BasicMessageBuilder::key(const T& value) { + static_cast(*this).construct_buffer(key_, value); + return static_cast(*this); +} + +template +C& BasicMessageBuilder::key(T&& value) { + key_ = std::move(value); + return static_cast(*this); +} + +template +C& BasicMessageBuilder::payload(const T& value) { + static_cast(*this).construct_buffer(payload_, value); + return static_cast(*this); +} + +template +C& BasicMessageBuilder::payload(T&& value) { + payload_ = std::move(value); + return static_cast(*this); +} + +template +C& BasicMessageBuilder::user_data(void* value) { + user_data_ = value; + return static_cast(*this); +} + +template +const Topic& BasicMessageBuilder::topic() const { + return topic_; +} + +template +const Partition& BasicMessageBuilder::partition() const { + return partition_; +} + +template +const T& BasicMessageBuilder::key() const { + return key_; +} + +template +const T& BasicMessageBuilder::payload() const { + return payload_; +} + +template +void* BasicMessageBuilder::user_data() const { + return user_data_; +} + +template +void BasicMessageBuilder::construct_buffer(T& lhs, const T& rhs) { + lhs = rhs; +} + /** * \brief Message builder class * @@ -56,73 +216,22 @@ namespace cppkafka { * producer.produce(MessageBuilder(topic).partition(5).payload(payload)); * \endcode */ -class CPPKAFKA_API MessageBuilder { +class MessageBuilder : public BasicMessageBuilder { public: - /** - * Construct a MessageBuilder - * - * \param topic The topic into which this message would be produced - */ - MessageBuilder(const Topic& topic); + using BasicMessageBuilder::BasicMessageBuilder; - /** - * Sets the partition into which this message will be produced - * - * \param value The partition to be used - */ - MessageBuilder& partition(Partition value); + void construct_buffer(Buffer& lhs, const Buffer& rhs) { + lhs = Buffer(rhs.get_data(), rhs.get_size()); + } +}; - /** - * Sets the message's key - * - * \param value The key to be used - */ - MessageBuilder& key(const Buffer& value); - - /** - * Sets the message's payload - * - * \param value The payload to be used - */ - MessageBuilder& payload(const Buffer& value); - - /** - * Sets the message's user data pointer - * - * \param value Pointer to the user data to be used on the produce call - */ - MessageBuilder& user_data(void* value); - - /** - * Gets the topic this message will be produced into - */ - const Topic& topic() const; - - /** - * Gets the partition this message will be produced into - */ - const Partition& partition() const; - - /** - * Gets the message's key - */ - const Buffer& key() const; - - /** - * Gets the message's payload - */ - const Buffer& payload() const; - - /** - * Gets the message's user data pointer - */ - void* user_data() const; -private: - const Topic& topic_; - Partition partition_; - Buffer key_; - Buffer payload_; - void* user_data_; +/** + * \brief Message builder class for a specific data type + */ +template +class ConcreteMessageBuilder : public BasicMessageBuilder> { +public: + using BasicMessageBuilder>::BasicMessageBuilder; }; } // cppkafka diff --git a/include/cppkafka/utils/buffered_producer.h b/include/cppkafka/utils/buffered_producer.h index b51c17e..8fce20c 100644 --- a/include/cppkafka/utils/buffered_producer.h +++ b/include/cppkafka/utils/buffered_producer.h @@ -17,6 +17,11 @@ namespace cppkafka { template class BufferedProducer { public: + /** + * Concrete builder + */ + using Builder = ConcreteMessageBuilder; + /** * \brief Constructs a buffered producer using the provided configuration * @@ -54,12 +59,12 @@ private: using IndexType = std::conditional::type; const Topic& get_topic(const std::string& topic); - void produce_message(IndexType index, MessageBuilder& message); + void produce_message(IndexType index, Builder& message); Configuration prepare_configuration(Configuration config); void on_delivery_report(const Message& message); Producer producer_; - std::map messages_; + std::map messages_; std::vector failed_indexes_; IndexType current_index_{0}; std::vector topics_; @@ -74,7 +79,7 @@ BufferedProducer::BufferedProducer(Configuration config) template void BufferedProducer::add_message(const MessageBuilder& builder) { - MessageBuilder local_builder(get_topic(builder.topic().get_name())); + Builder local_builder(get_topic(builder.topic().get_name())); local_builder.partition(builder.partition()); local_builder.key(builder.key()); local_builder.payload(builder.payload()); @@ -122,13 +127,16 @@ const Topic& BufferedProducer::get_topic(const std::string& topic) { } template -void BufferedProducer::produce_message(IndexType index, - MessageBuilder& builder) { +void BufferedProducer::produce_message(IndexType index, Builder& builder) { bool sent = false; - builder.user_data(reinterpret_cast(index)); + MessageBuilder local_builder(builder.topic()); + local_builder.partition(builder.partition()) + .key(builder.key()) + .payload(builder.payload()) + .user_data(reinterpret_cast(index)); while (!sent) { try { - producer_.produce(builder); + producer_.produce(local_builder); sent = true; } catch (const HandleException& ex) { diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 3172403..e77feff 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -11,13 +11,15 @@ set(SOURCES topic_partition_list.cpp metadata.cpp error.cpp - message_builder.cpp kafka_handle_base.cpp producer.cpp consumer.cpp ) +include_directories(${CMAKE_CURRENT_SOURCE_DIR}/../include/cppkafka) +include_directories(SYSTEM ${Boost_INCLUDE_DIRS} ${RDKAFKA_INCLUDE_DIR}) + add_library(cppkafka ${CPPKAFKA_LIBRARY_TYPE} ${SOURCES}) set_target_properties(cppkafka PROPERTIES VERSION ${CPPKAFKA_VERSION} SOVERSION ${CPPKAFKA_VERSION}) diff --git a/src/message_builder.cpp b/src/message_builder.cpp deleted file mode 100644 index 34b5b26..0000000 --- a/src/message_builder.cpp +++ /dev/null @@ -1,50 +0,0 @@ -#include "message_builder.h" -#include - -namespace cppkafka { - -MessageBuilder::MessageBuilder(const Topic& topic) -: topic_(topic) { -} - -MessageBuilder& MessageBuilder::partition(Partition value) { - partition_ = value; - return *this; -} - -MessageBuilder& MessageBuilder::key(const Buffer& value) { - key_ = Buffer(value.get_data(), value.get_size()); - return *this; -} - -MessageBuilder& MessageBuilder::payload(const Buffer& value) { - payload_ = Buffer(value.get_data(), value.get_size()); - return *this; -} - -MessageBuilder& MessageBuilder::user_data(void* value) { - user_data_ = value; - return *this; -} - -const Topic& MessageBuilder::topic() const { - return topic_; -} - -const Partition& MessageBuilder::partition() const { - return partition_; -} - -const Buffer& MessageBuilder::key() const { - return key_; -} - -const Buffer& MessageBuilder::payload() const { - return payload_; -} - -void* MessageBuilder::user_data() const { - return user_data_; -} - -} // cppkafka diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 78832c0..0b7d4df 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -1,5 +1,6 @@ include_directories(${GOOGLETEST_INCLUDE}) include_directories(${CMAKE_CURRENT_SOURCE_DIR}/../include/) +include_directories(SYSTEM ${Boost_INCLUDE_DIRS} ${RDKAFKA_INCLUDE_DIR}) link_directories(${GOOGLETEST_LIBRARY}) link_libraries(cppkafka ${RDKAFKA_LIBRARY} gtest gtest_main pthread)