From 4fccf277e0e1a6389ea44156bb74022e43c364c0 Mon Sep 17 00:00:00 2001 From: Matias Fontanini Date: Sat, 4 Jun 2016 19:15:32 -0700 Subject: [PATCH] Add partitioner callback to topic configuration --- include/cppkafka/kafka_handle_base.h | 8 ++++++- include/cppkafka/topic.h | 7 +++++- include/cppkafka/topic_configuration.h | 16 ++++++++++++- src/kafka_handle_base.cpp | 16 ++++++++++--- src/topic.cpp | 13 ++++++++++ src/topic_configuration.cpp | 33 ++++++++++++++++++++++++++ tests/producer_test.cpp | 18 +++++++++++--- 7 files changed, 102 insertions(+), 9 deletions(-) diff --git a/include/cppkafka/kafka_handle_base.h b/include/cppkafka/kafka_handle_base.h index fd304aa..aaf95e3 100644 --- a/include/cppkafka/kafka_handle_base.h +++ b/include/cppkafka/kafka_handle_base.h @@ -4,16 +4,18 @@ #include #include #include +#include +#include #include #include "metadata.h" #include "topic_partition.h" #include "topic_partition_list.h" +#include "topic_configuration.h" #include "configuration.h" namespace cppkafka { class Topic; -class TopicConfiguration; class KafkaHandleBase { public: @@ -46,13 +48,17 @@ private: static const std::chrono::milliseconds DEFAULT_TIMEOUT; using HandlePtr = std::unique_ptr; + using TopicConfigurationMap = std::unordered_map; Topic get_topic(const std::string& name, rd_kafka_topic_conf_t* conf); Metadata get_metadata(rd_kafka_topic_t* topic_ptr); + void save_topic_config(const std::string& topic_name, TopicConfiguration config); HandlePtr handle_; std::chrono::milliseconds timeout_ms_; Configuration config_; + TopicConfigurationMap topic_configurations_; + std::mutex topic_configurations_mutex_; }; } // cppkafka diff --git a/include/cppkafka/topic.h b/include/cppkafka/topic.h index 857e615..2f956ab 100644 --- a/include/cppkafka/topic.h +++ b/include/cppkafka/topic.h @@ -10,14 +10,19 @@ namespace cppkafka { class Topic { public: + static Topic make_non_owning(rd_kafka_topic_t* handle); + Topic(rd_kafka_topic_t* handle); std::string get_name() const; - rd_kafka_topic_t* get_handle() const; private: using HandlePtr = std::unique_ptr; + struct NonOwningTag { }; + + Topic(rd_kafka_topic_t* handle, NonOwningTag); + HandlePtr handle_; }; diff --git a/include/cppkafka/topic_configuration.h b/include/cppkafka/topic_configuration.h index aa56f84..3e97496 100644 --- a/include/cppkafka/topic_configuration.h +++ b/include/cppkafka/topic_configuration.h @@ -2,26 +2,40 @@ #define CPPKAFKA_TOPIC_CONFIGURATION_H #include +#include #include #include "clonable_ptr.h" namespace cppkafka { +class Topic; +class Buffer; + class TopicConfiguration { public: + using PartitionerCallback = std::function; + TopicConfiguration(); void set(const std::string& name, const std::string& value); + void set_partitioner_callback(PartitionerCallback callback); + + void set_as_opaque(); + + const PartitionerCallback& get_partitioner_callback() const; rd_kafka_topic_conf_t* get_handle() const; private: - using HandlePtr = ClonablePtr; TopicConfiguration(rd_kafka_topic_conf_t* ptr); static HandlePtr make_handle(rd_kafka_topic_conf_t* ptr); HandlePtr handle_; + PartitionerCallback partitioner_callback_; }; } // cppkafka diff --git a/src/kafka_handle_base.cpp b/src/kafka_handle_base.cpp index 5eb3e75..5e72af7 100644 --- a/src/kafka_handle_base.cpp +++ b/src/kafka_handle_base.cpp @@ -1,12 +1,13 @@ #include "kafka_handle_base.h" #include "exceptions.h" -#include "topic_configuration.h" #include "topic.h" #include "topic_partition_list.h" using std::string; using std::vector; using std::move; +using std::lock_guard; +using std::mutex; using std::chrono::milliseconds; namespace cppkafka { @@ -46,11 +47,14 @@ rd_kafka_t* KafkaHandleBase::get_handle() { } Topic KafkaHandleBase::get_topic(const string& name) { + save_topic_config(name, TopicConfiguration{}); return get_topic(name, nullptr); } -Topic KafkaHandleBase::get_topic(const string& name, TopicConfiguration topicConfig) { - return get_topic(name, topicConfig.get_handle()); +Topic KafkaHandleBase::get_topic(const string& name, TopicConfiguration config) { + auto handle = config.get_handle(); + save_topic_config(name, move(config)); + return get_topic(name, rd_kafka_topic_conf_dup(handle)); } Metadata KafkaHandleBase::get_metadata() { @@ -89,6 +93,12 @@ Metadata KafkaHandleBase::get_metadata(rd_kafka_topic_t* topic_ptr) { return Metadata(metadata); } +void KafkaHandleBase::save_topic_config(const string& topic_name, TopicConfiguration config) { + lock_guard _(topic_configurations_mutex_); + auto iter = topic_configurations_.emplace(topic_name, move(config)).first; + iter->second.set_as_opaque(); +} + void KafkaHandleBase::check_error(rd_kafka_resp_err_t error) { if (error != RD_KAFKA_RESP_ERR_NO_ERROR) { throw HandleException(error); diff --git a/src/topic.cpp b/src/topic.cpp index 63721e4..df16fc8 100644 --- a/src/topic.cpp +++ b/src/topic.cpp @@ -5,11 +5,24 @@ using std::string; namespace cppkafka { +void dummy_topic_destroyer(rd_kafka_topic_t*) { + +} + +Topic Topic::make_non_owning(rd_kafka_topic_t* handle) { + return Topic(handle, NonOwningTag{}); +} + Topic::Topic(rd_kafka_topic_t* handle) : handle_(handle, &rd_kafka_topic_destroy) { } +Topic::Topic(rd_kafka_topic_t* handle, NonOwningTag) +: handle_(handle, &dummy_topic_destroyer) { + +} + string Topic::get_name() const { return rd_kafka_topic_name(handle_.get()); } diff --git a/src/topic_configuration.cpp b/src/topic_configuration.cpp index ae73024..d2a58ba 100644 --- a/src/topic_configuration.cpp +++ b/src/topic_configuration.cpp @@ -1,11 +1,30 @@ #include "topic_configuration.h" #include #include "exceptions.h" +#include "topic.h" +#include "buffer.h" using std::string; namespace cppkafka { +int32_t partitioner_callback_proxy(const rd_kafka_topic_t* handle, const void *key_ptr, + size_t key_size, int32_t partition_count, + void* topic_opaque, void* message_opaque) { + const TopicConfiguration* config = static_cast(topic_opaque); + const auto& callback = config->get_partitioner_callback(); + if (callback) { + Topic topic = Topic::make_non_owning(const_cast(handle)); + Buffer key(static_cast(key_ptr), key_size); + return callback(topic, key, partition_count); + } + else { + return rd_kafka_msg_partitioner_consistent_random(handle, key_ptr, key_size, + partition_count, topic_opaque, + message_opaque); + } +} + TopicConfiguration::TopicConfiguration() : handle_(make_handle(rd_kafka_topic_conf_new())) { @@ -26,6 +45,20 @@ void TopicConfiguration::set(const string& name, const string& value) { } } +void TopicConfiguration::set_partitioner_callback(PartitionerCallback callback) { + partitioner_callback_ = move(callback); + rd_kafka_topic_conf_set_partitioner_cb(handle_.get(), &partitioner_callback_proxy); +} + +void TopicConfiguration::set_as_opaque() { + rd_kafka_topic_conf_set_opaque(handle_.get(), this); +} + +const TopicConfiguration::PartitionerCallback& +TopicConfiguration::get_partitioner_callback() const { + return partitioner_callback_; +} + rd_kafka_topic_conf_t* TopicConfiguration::get_handle() const { return handle_.get(); } diff --git a/tests/producer_test.cpp b/tests/producer_test.cpp index 95517d8..ebc686c 100644 --- a/tests/producer_test.cpp +++ b/tests/producer_test.cpp @@ -197,15 +197,27 @@ TEST_F(ProducerTest, Callbacks) { // Now create a producer and produce a message string payload = "Hello world!"; + string key = "hehe"; bool deliver_report_called = false; Configuration config = make_producer_config(); config.set_delivery_report_callback([&](Producer&, const Message& msg) { EXPECT_EQ(payload, msg.get_payload().as_string()); deliver_report_called = true; }); + + TopicConfiguration topic_config; + topic_config.set_partitioner_callback([&](const Topic& topic, const Buffer& msg_key, + int32_t partition_count) { + EXPECT_EQ(key, msg_key.as_string()); + EXPECT_EQ(3, partition_count); + EXPECT_EQ(KAFKA_TOPIC, topic.get_name()); + return 0; + }); + Producer producer(move(config)); - Topic topic = producer.get_topic(KAFKA_TOPIC); - producer.produce(topic, partition, Buffer(payload.data(), payload.size())); + Topic topic = producer.get_topic(KAFKA_TOPIC, topic_config); + producer.produce(topic, {}, Buffer(payload.data(), payload.size()), + Buffer(key.data(), key.size())); producer.poll(); runner.try_join(); @@ -213,7 +225,7 @@ TEST_F(ProducerTest, Callbacks) { ASSERT_EQ(1, messages.size()); const auto& message = messages[0]; EXPECT_EQ(payload, message.get_payload().as_string()); - EXPECT_EQ("", message.get_key().as_string()); + EXPECT_EQ(key, message.get_key().as_string()); EXPECT_EQ(KAFKA_TOPIC, message.get_topic()); EXPECT_EQ(partition, message.get_partition()); EXPECT_EQ(0, message.get_error());