From efd2321828f888c672a96b38a774ded416cca898 Mon Sep 17 00:00:00 2001 From: Matias Fontanini Date: Sun, 5 Jun 2016 10:04:48 -0700 Subject: [PATCH] Allow setting default topic config --- include/cppkafka/clonable_ptr.h | 1 + include/cppkafka/configuration.h | 6 ++++ include/cppkafka/kafka_handle_base.h | 1 - src/configuration.cpp | 15 ++++++++++ src/kafka_handle_base.cpp | 12 ++++---- tests/producer_test.cpp | 45 ++++++++++++++++++++++++++-- 6 files changed, 70 insertions(+), 10 deletions(-) diff --git a/include/cppkafka/clonable_ptr.h b/include/cppkafka/clonable_ptr.h index 7b6b92e..284acdb 100644 --- a/include/cppkafka/clonable_ptr.h +++ b/include/cppkafka/clonable_ptr.h @@ -20,6 +20,7 @@ public: ClonablePtr& operator=(const ClonablePtr& rhs) { handle_.reset(cloner_(rhs.handle_.get())); + return *this; } ClonablePtr(ClonablePtr&&) = default; diff --git a/include/cppkafka/configuration.h b/include/cppkafka/configuration.h index 9a622dc..aa9970e 100644 --- a/include/cppkafka/configuration.h +++ b/include/cppkafka/configuration.h @@ -5,8 +5,10 @@ #include #include #include +#include #include #include "topic_partition_list.h" +#include "topic_configuration.h" #include "clonable_ptr.h" namespace cppkafka { @@ -43,6 +45,7 @@ public: void set_log_callback(LogCallback callback); void set_stats_callback(StatsCallback callback); void set_socket_callback(SocketCallback callback); + void set_default_topic_configuration(boost::optional config); rd_kafka_conf_t* get_handle() const; const DeliveryReportCallback& get_delivery_report_callback() const; @@ -52,6 +55,8 @@ public: const LogCallback& get_log_callback() const; const StatsCallback& get_stats_callback() const; const SocketCallback& get_socket_callback() const; + const boost::optional& get_default_topic_configuration() const; + boost::optional& get_default_topic_configuration(); private: using HandlePtr = ClonablePtr; @@ -60,6 +65,7 @@ private: static HandlePtr make_handle(rd_kafka_conf_t* ptr); HandlePtr handle_; + boost::optional default_topic_config_; DeliveryReportCallback delivery_report_callback_; OffsetCommitCallback offset_commit_callback_; ErrorCallback error_callback_; diff --git a/include/cppkafka/kafka_handle_base.h b/include/cppkafka/kafka_handle_base.h index aaf95e3..5cd6cbe 100644 --- a/include/cppkafka/kafka_handle_base.h +++ b/include/cppkafka/kafka_handle_base.h @@ -39,7 +39,6 @@ public: const Configuration& get_configuration() const; protected: KafkaHandleBase(Configuration config); - KafkaHandleBase(rd_kafka_t* handle); void set_handle(rd_kafka_t* handle); void check_error(rd_kafka_resp_err_t error); diff --git a/src/configuration.cpp b/src/configuration.cpp index 71228b4..e36a0e2 100644 --- a/src/configuration.cpp +++ b/src/configuration.cpp @@ -6,6 +6,9 @@ #include "consumer.h" using std::string; +using std::move; + +using boost::optional; using std::chrono::milliseconds; @@ -135,6 +138,10 @@ void Configuration::set_socket_callback(SocketCallback callback) { rd_kafka_conf_set_socket_cb(handle_.get(), &socket_callback_proxy); } +void Configuration::set_default_topic_configuration(optional config) { + default_topic_config_ = move(config); +} + rd_kafka_conf_t* Configuration::get_handle() const { return handle_.get(); } @@ -167,6 +174,14 @@ const Configuration::SocketCallback& Configuration::get_socket_callback() const return socket_callback_; } +const optional& Configuration::get_default_topic_configuration() const { + return default_topic_config_; +} + +optional& Configuration::get_default_topic_configuration() { + return default_topic_config_; +} + Configuration::HandlePtr Configuration::make_handle(rd_kafka_conf_t* ptr) { return HandlePtr(ptr, &rd_kafka_conf_destroy, &rd_kafka_conf_dup); } diff --git a/src/kafka_handle_base.cpp b/src/kafka_handle_base.cpp index 5e72af7..9b8f0d5 100644 --- a/src/kafka_handle_base.cpp +++ b/src/kafka_handle_base.cpp @@ -16,12 +16,12 @@ const milliseconds KafkaHandleBase::DEFAULT_TIMEOUT{1000}; KafkaHandleBase::KafkaHandleBase(Configuration config) : handle_(nullptr, nullptr), timeout_ms_(DEFAULT_TIMEOUT), config_(move(config)) { - -} - -KafkaHandleBase::KafkaHandleBase(rd_kafka_t* handle) -: handle_(handle, &rd_kafka_destroy), timeout_ms_(DEFAULT_TIMEOUT) { - + auto& maybe_config = config_.get_default_topic_configuration(); + if (maybe_config) { + maybe_config->set_as_opaque(); + auto conf_handle = rd_kafka_topic_conf_dup(maybe_config->get_handle()); + rd_kafka_conf_set_default_topic_conf(config_.get_handle(), conf_handle); + } } void KafkaHandleBase::pause_partitions(const TopicPartitionList& topic_partitions) { diff --git a/tests/producer_test.cpp b/tests/producer_test.cpp index ebc686c..eca7c0b 100644 --- a/tests/producer_test.cpp +++ b/tests/producer_test.cpp @@ -112,7 +112,7 @@ TEST_F(ProducerTest, OneMessageOnFixedPartition) { // Now create a producer and produce a message Producer producer(make_producer_config()); Topic topic = producer.get_topic(KAFKA_TOPIC); - string payload = "Hello world!"; + string payload = "Hello world! 1"; producer.produce(topic, partition, Buffer(payload.data(), payload.size())); runner.try_join(); @@ -137,7 +137,7 @@ TEST_F(ProducerTest, OneMessageUsingKey) { // Now create a producer and produce a message Producer producer(make_producer_config()); Topic topic = producer.get_topic(KAFKA_TOPIC); - string payload = "Hello world!"; + string payload = "Hello world! 2"; string key = "such key"; producer.produce(topic, partition, Buffer(payload.data(), payload.size()), Buffer(key.data(), key.size())); @@ -196,7 +196,7 @@ TEST_F(ProducerTest, Callbacks) { ConsumerRunner runner(consumer, 1, 1); // Now create a producer and produce a message - string payload = "Hello world!"; + string payload = "Hello world! 3"; string key = "hehe"; bool deliver_report_called = false; Configuration config = make_producer_config(); @@ -231,3 +231,42 @@ TEST_F(ProducerTest, Callbacks) { EXPECT_EQ(0, message.get_error()); EXPECT_TRUE(deliver_report_called); } + +TEST_F(ProducerTest, PartitionerCallbackOnDefaultTopicConfig) { + int partition = 0; + + // Create a consumer and assign this topic/partition + Consumer consumer(make_consumer_config()); + consumer.assign({ TopicPartition(KAFKA_TOPIC, partition) }); + ConsumerRunner runner(consumer, 1, 1); + + // Now create a producer and produce a message + string payload = "Hello world! 4"; + string key = "hehe"; + bool callback_called = false; + + Configuration config = make_producer_config(); + TopicConfiguration topic_config; + topic_config.set_partitioner_callback([&](const Topic& topic, const Buffer& msg_key, + int32_t partition_count) { + EXPECT_EQ(key, msg_key.as_string()); + EXPECT_EQ(3, partition_count); + EXPECT_EQ(KAFKA_TOPIC, topic.get_name()); + callback_called = true; + return 0; + }); + config.set_default_topic_configuration(topic_config); + + Producer producer(move(config)); + Topic topic = producer.get_topic(KAFKA_TOPIC); + producer.produce(topic, {}, Buffer(payload.data(), payload.size()), + Buffer(key.data(), key.size())); + producer.poll(); + runner.try_join(); + + const auto& messages = runner.get_messages(); + ASSERT_EQ(1, messages.size()); + const auto& message = messages[0]; + EXPECT_EQ(partition, message.get_partition()); + EXPECT_TRUE(callback_called); +}