Allow using generic message builders not tied to Buffer class

This commit is contained in:
Matias Fontanini
2017-04-16 09:47:39 -07:00
parent 861e41a792
commit f781afe5cf
7 changed files with 193 additions and 127 deletions

View File

@@ -25,14 +25,9 @@ else()
add_definitions("-DCPPKAFKA_STATIC=1") add_definitions("-DCPPKAFKA_STATIC=1")
endif() endif()
include_directories(${CMAKE_SOURCE_DIR}/include/cppkafka)
# Look for Boost (just need boost.optional headers here) # Look for Boost (just need boost.optional headers here)
find_package(Boost REQUIRED) find_package(Boost REQUIRED)
include_directories(${Boost_INCLUDE_DIRS})
find_package(RdKafka REQUIRED) find_package(RdKafka REQUIRED)
include_directories(${RDKAFKA_INCLUDE_DIR})
if (HAVE_OFFSETS_FOR_TIMES) if (HAVE_OFFSETS_FOR_TIMES)
message(STATUS "Enabling support for KafkaHandleBase::get_offsets_for_times") message(STATUS "Enabling support for KafkaHandleBase::get_offsets_for_times")

View File

@@ -4,6 +4,7 @@ if (Boost_PROGRAM_OPTIONS_FOUND)
link_libraries(${Boost_LIBRARIES} cppkafka ${RDKAFKA_LIBRARY}) link_libraries(${Boost_LIBRARIES} cppkafka ${RDKAFKA_LIBRARY})
include_directories(${CMAKE_CURRENT_SOURCE_DIR}/../include) include_directories(${CMAKE_CURRENT_SOURCE_DIR}/../include)
include_directories(SYSTEM ${Boost_INCLUDE_DIRS} ${RDKAFKA_INCLUDE_DIR})
add_executable(kafka_producer EXCLUDE_FROM_ALL kafka_producer.cpp) add_executable(kafka_producer EXCLUDE_FROM_ALL kafka_producer.cpp)
add_executable(kafka_consumer EXCLUDE_FROM_ALL kafka_consumer.cpp) add_executable(kafka_consumer EXCLUDE_FROM_ALL kafka_consumer.cpp)

View File

@@ -38,6 +38,166 @@
namespace cppkafka { namespace cppkafka {
/**
* \brief Base template class for message construction
*/
template <typename BufferType, typename Concrete>
class CPPKAFKA_API BasicMessageBuilder {
public:
/**
* Construct a BasicMessageBuilder
*
* \param topic The topic into which this message would be produced
*/
BasicMessageBuilder(const Topic& topic);
/**
* Sets the partition into which this message will be produced
*
* \param value The partition to be used
*/
Concrete& partition(Partition value);
/**
* Sets the message's key
*
* \param value The key to be used
*/
Concrete& key(const BufferType& value);
/**
* Sets the message's key
*
* \param value The key to be used
*/
Concrete& key(BufferType&& value);
/**
* Sets the message's payload
*
* \param value The payload to be used
*/
Concrete& payload(const BufferType& value);
/**
* Sets the message's payload
*
* \param value The payload to be used
*/
Concrete& payload(BufferType&& value);
/**
* Sets the message's user data pointer
*
* \param value Pointer to the user data to be used on the produce call
*/
Concrete& user_data(void* value);
/**
* Gets the topic this message will be produced into
*/
const Topic& topic() const;
/**
* Gets the partition this message will be produced into
*/
const Partition& partition() const;
/**
* Gets the message's key
*/
const BufferType& key() const;
/**
* Gets the message's payload
*/
const BufferType& payload() const;
/**
* Gets the message's user data pointer
*/
void* user_data() const;
private:
void construct_buffer(BufferType& lhs, const BufferType& rhs);
const Topic& topic_;
Partition partition_;
BufferType key_;
BufferType payload_;
void* user_data_;
};
template <typename T, typename C>
BasicMessageBuilder<T, C>::BasicMessageBuilder(const Topic& topic)
: topic_(topic) {
}
template <typename T, typename C>
C& BasicMessageBuilder<T, C>::partition(Partition value) {
partition_ = value;
return static_cast<C&>(*this);
}
template <typename T, typename C>
C& BasicMessageBuilder<T, C>::key(const T& value) {
static_cast<C&>(*this).construct_buffer(key_, value);
return static_cast<C&>(*this);
}
template <typename T, typename C>
C& BasicMessageBuilder<T, C>::key(T&& value) {
key_ = std::move(value);
return static_cast<C&>(*this);
}
template <typename T, typename C>
C& BasicMessageBuilder<T, C>::payload(const T& value) {
static_cast<C&>(*this).construct_buffer(payload_, value);
return static_cast<C&>(*this);
}
template <typename T, typename C>
C& BasicMessageBuilder<T, C>::payload(T&& value) {
payload_ = std::move(value);
return static_cast<C&>(*this);
}
template <typename T, typename C>
C& BasicMessageBuilder<T, C>::user_data(void* value) {
user_data_ = value;
return static_cast<C&>(*this);
}
template <typename T, typename C>
const Topic& BasicMessageBuilder<T, C>::topic() const {
return topic_;
}
template <typename T, typename C>
const Partition& BasicMessageBuilder<T, C>::partition() const {
return partition_;
}
template <typename T, typename C>
const T& BasicMessageBuilder<T, C>::key() const {
return key_;
}
template <typename T, typename C>
const T& BasicMessageBuilder<T, C>::payload() const {
return payload_;
}
template <typename T, typename C>
void* BasicMessageBuilder<T, C>::user_data() const {
return user_data_;
}
template <typename T, typename C>
void BasicMessageBuilder<T, C>::construct_buffer(T& lhs, const T& rhs) {
lhs = rhs;
}
/** /**
* \brief Message builder class * \brief Message builder class
* *
@@ -56,73 +216,22 @@ namespace cppkafka {
* producer.produce(MessageBuilder(topic).partition(5).payload(payload)); * producer.produce(MessageBuilder(topic).partition(5).payload(payload));
* \endcode * \endcode
*/ */
class CPPKAFKA_API MessageBuilder { class MessageBuilder : public BasicMessageBuilder<Buffer, MessageBuilder> {
public: public:
/** using BasicMessageBuilder::BasicMessageBuilder;
* Construct a MessageBuilder
*
* \param topic The topic into which this message would be produced
*/
MessageBuilder(const Topic& topic);
/** void construct_buffer(Buffer& lhs, const Buffer& rhs) {
* Sets the partition into which this message will be produced lhs = Buffer(rhs.get_data(), rhs.get_size());
* }
* \param value The partition to be used };
*/
MessageBuilder& partition(Partition value);
/** /**
* Sets the message's key * \brief Message builder class for a specific data type
* */
* \param value The key to be used template <typename T>
*/ class ConcreteMessageBuilder : public BasicMessageBuilder<T, ConcreteMessageBuilder<T>> {
MessageBuilder& key(const Buffer& value); public:
using BasicMessageBuilder<T, ConcreteMessageBuilder<T>>::BasicMessageBuilder;
/**
* Sets the message's payload
*
* \param value The payload to be used
*/
MessageBuilder& payload(const Buffer& value);
/**
* Sets the message's user data pointer
*
* \param value Pointer to the user data to be used on the produce call
*/
MessageBuilder& user_data(void* value);
/**
* Gets the topic this message will be produced into
*/
const Topic& topic() const;
/**
* Gets the partition this message will be produced into
*/
const Partition& partition() const;
/**
* Gets the message's key
*/
const Buffer& key() const;
/**
* Gets the message's payload
*/
const Buffer& payload() const;
/**
* Gets the message's user data pointer
*/
void* user_data() const;
private:
const Topic& topic_;
Partition partition_;
Buffer key_;
Buffer payload_;
void* user_data_;
}; };
} // cppkafka } // cppkafka

View File

@@ -17,6 +17,11 @@ namespace cppkafka {
template <typename BufferType> template <typename BufferType>
class BufferedProducer { class BufferedProducer {
public: public:
/**
* Concrete builder
*/
using Builder = ConcreteMessageBuilder<BufferType>;
/** /**
* \brief Constructs a buffered producer using the provided configuration * \brief Constructs a buffered producer using the provided configuration
* *
@@ -54,12 +59,12 @@ private:
using IndexType = std::conditional<sizeof(void*) == 8, uint64_t, uint32_t>::type; using IndexType = std::conditional<sizeof(void*) == 8, uint64_t, uint32_t>::type;
const Topic& get_topic(const std::string& topic); const Topic& get_topic(const std::string& topic);
void produce_message(IndexType index, MessageBuilder& message); void produce_message(IndexType index, Builder& message);
Configuration prepare_configuration(Configuration config); Configuration prepare_configuration(Configuration config);
void on_delivery_report(const Message& message); void on_delivery_report(const Message& message);
Producer producer_; Producer producer_;
std::map<IndexType, MessageBuilder> messages_; std::map<IndexType, Builder> messages_;
std::vector<IndexType> failed_indexes_; std::vector<IndexType> failed_indexes_;
IndexType current_index_{0}; IndexType current_index_{0};
std::vector<Topic> topics_; std::vector<Topic> topics_;
@@ -74,7 +79,7 @@ BufferedProducer<BufferType>::BufferedProducer(Configuration config)
template <typename BufferType> template <typename BufferType>
void BufferedProducer<BufferType>::add_message(const MessageBuilder& builder) { void BufferedProducer<BufferType>::add_message(const MessageBuilder& builder) {
MessageBuilder local_builder(get_topic(builder.topic().get_name())); Builder local_builder(get_topic(builder.topic().get_name()));
local_builder.partition(builder.partition()); local_builder.partition(builder.partition());
local_builder.key(builder.key()); local_builder.key(builder.key());
local_builder.payload(builder.payload()); local_builder.payload(builder.payload());
@@ -122,13 +127,16 @@ const Topic& BufferedProducer<BufferType>::get_topic(const std::string& topic) {
} }
template <typename BufferType> template <typename BufferType>
void BufferedProducer<BufferType>::produce_message(IndexType index, void BufferedProducer<BufferType>::produce_message(IndexType index, Builder& builder) {
MessageBuilder& builder) {
bool sent = false; bool sent = false;
builder.user_data(reinterpret_cast<void*>(index)); MessageBuilder local_builder(builder.topic());
local_builder.partition(builder.partition())
.key(builder.key())
.payload(builder.payload())
.user_data(reinterpret_cast<void*>(index));
while (!sent) { while (!sent) {
try { try {
producer_.produce(builder); producer_.produce(local_builder);
sent = true; sent = true;
} }
catch (const HandleException& ex) { catch (const HandleException& ex) {

View File

@@ -11,13 +11,15 @@ set(SOURCES
topic_partition_list.cpp topic_partition_list.cpp
metadata.cpp metadata.cpp
error.cpp error.cpp
message_builder.cpp
kafka_handle_base.cpp kafka_handle_base.cpp
producer.cpp producer.cpp
consumer.cpp consumer.cpp
) )
include_directories(${CMAKE_CURRENT_SOURCE_DIR}/../include/cppkafka)
include_directories(SYSTEM ${Boost_INCLUDE_DIRS} ${RDKAFKA_INCLUDE_DIR})
add_library(cppkafka ${CPPKAFKA_LIBRARY_TYPE} ${SOURCES}) add_library(cppkafka ${CPPKAFKA_LIBRARY_TYPE} ${SOURCES})
set_target_properties(cppkafka PROPERTIES VERSION ${CPPKAFKA_VERSION} set_target_properties(cppkafka PROPERTIES VERSION ${CPPKAFKA_VERSION}
SOVERSION ${CPPKAFKA_VERSION}) SOVERSION ${CPPKAFKA_VERSION})

View File

@@ -1,50 +0,0 @@
#include "message_builder.h"
#include <boost/utility/typed_in_place_factory.hpp>
namespace cppkafka {
MessageBuilder::MessageBuilder(const Topic& topic)
: topic_(topic) {
}
MessageBuilder& MessageBuilder::partition(Partition value) {
partition_ = value;
return *this;
}
MessageBuilder& MessageBuilder::key(const Buffer& value) {
key_ = Buffer(value.get_data(), value.get_size());
return *this;
}
MessageBuilder& MessageBuilder::payload(const Buffer& value) {
payload_ = Buffer(value.get_data(), value.get_size());
return *this;
}
MessageBuilder& MessageBuilder::user_data(void* value) {
user_data_ = value;
return *this;
}
const Topic& MessageBuilder::topic() const {
return topic_;
}
const Partition& MessageBuilder::partition() const {
return partition_;
}
const Buffer& MessageBuilder::key() const {
return key_;
}
const Buffer& MessageBuilder::payload() const {
return payload_;
}
void* MessageBuilder::user_data() const {
return user_data_;
}
} // cppkafka

View File

@@ -1,5 +1,6 @@
include_directories(${GOOGLETEST_INCLUDE}) include_directories(${GOOGLETEST_INCLUDE})
include_directories(${CMAKE_CURRENT_SOURCE_DIR}/../include/) include_directories(${CMAKE_CURRENT_SOURCE_DIR}/../include/)
include_directories(SYSTEM ${Boost_INCLUDE_DIRS} ${RDKAFKA_INCLUDE_DIR})
link_directories(${GOOGLETEST_LIBRARY}) link_directories(${GOOGLETEST_LIBRARY})
link_libraries(cppkafka ${RDKAFKA_LIBRARY} gtest gtest_main pthread) link_libraries(cppkafka ${RDKAFKA_LIBRARY} gtest gtest_main pthread)