From c300a9bf3511f078450522d61cafb0b9835989b3 Mon Sep 17 00:00:00 2001 From: Matias Fontanini Date: Wed, 22 Jun 2016 20:03:28 -0700 Subject: [PATCH] Allow chaining set calls on Configuration and TopicConfiguration --- include/cppkafka/configuration.h | 18 ++++++++--------- include/cppkafka/configuration_base.h | 16 +++++++-------- include/cppkafka/topic_configuration.h | 6 +++--- src/configuration.cpp | 28 +++++++++++++++++--------- src/topic_configuration.cpp | 9 ++++++--- tests/configuration_test.cpp | 6 ++++-- 6 files changed, 49 insertions(+), 34 deletions(-) diff --git a/include/cppkafka/configuration.h b/include/cppkafka/configuration.h index d717246..6b1907d 100644 --- a/include/cppkafka/configuration.h +++ b/include/cppkafka/configuration.h @@ -90,47 +90,47 @@ public: * \param name The name of the attribute * \param value The value of the attribute */ - void set(const std::string& name, const std::string& value); + Configuration& set(const std::string& name, const std::string& value); /** * Sets the delivery report callback (invokes rd_kafka_conf_set_dr_msg_cb) */ - void set_delivery_report_callback(DeliveryReportCallback callback); + Configuration& set_delivery_report_callback(DeliveryReportCallback callback); /** * Sets the offset commit callback (invokes rd_kafka_conf_set_offset_commit_cb) */ - void set_offset_commit_callback(OffsetCommitCallback callback); + Configuration& set_offset_commit_callback(OffsetCommitCallback callback); /** * Sets the error callback (invokes rd_kafka_conf_set_error_cb) */ - void set_error_callback(ErrorCallback callback); + Configuration& set_error_callback(ErrorCallback callback); /** * Sets the throttle callback (invokes rd_kafka_conf_set_throttle_cb) */ - void set_throttle_callback(ThrottleCallback callback); + Configuration& set_throttle_callback(ThrottleCallback callback); /** * Sets the log callback (invokes rd_kafka_conf_set_log_cb) */ - void set_log_callback(LogCallback callback); + Configuration& set_log_callback(LogCallback callback); /** * Sets the stats callback (invokes rd_kafka_conf_set_stats_cb) */ - void set_stats_callback(StatsCallback callback); + Configuration& set_stats_callback(StatsCallback callback); /** * Sets the socket callback (invokes rd_kafka_conf_set_socket_cb) */ - void set_socket_callback(SocketCallback callback); + Configuration& set_socket_callback(SocketCallback callback); /** * Sets the default topic configuration */ - void set_default_topic_configuration(boost::optional config); + Configuration& set_default_topic_configuration(boost::optional config); /** * Returns true iff the given property name has been set diff --git a/include/cppkafka/configuration_base.h b/include/cppkafka/configuration_base.h index 0c56ca5..4812fe5 100644 --- a/include/cppkafka/configuration_base.h +++ b/include/cppkafka/configuration_base.h @@ -45,8 +45,8 @@ public: /** * Sets a bool value */ - void set(const std::string& name, bool value) { - proxy_set(name, value ? "true" : "false"); + Concrete& set(const std::string& name, bool value) { + return proxy_set(name, value ? "true" : "false"); } /** @@ -54,15 +54,15 @@ public: */ template ::value>::type> - void set(const std::string& name, T value) { - proxy_set(name, std::to_string(value)); + Concrete& set(const std::string& name, T value) { + return proxy_set(name, std::to_string(value)); } /** * Sets a cstring value */ - void set(const std::string& name, const char* value) { - proxy_set(name, value); + Concrete& set(const std::string& name, const char* value) { + return proxy_set(name, value); } /** @@ -92,8 +92,8 @@ protected: return output; } private: - void proxy_set(const std::string& name, const std::string& value) { - static_cast(*this).set(name, value); + Concrete& proxy_set(const std::string& name, const std::string& value) { + return static_cast(*this).set(name, value); } static std::string convert(const std::string& value, Type2Type) { diff --git a/include/cppkafka/topic_configuration.h b/include/cppkafka/topic_configuration.h index c996cbb..a8a9efd 100644 --- a/include/cppkafka/topic_configuration.h +++ b/include/cppkafka/topic_configuration.h @@ -77,14 +77,14 @@ public: * \param name The name of the option * \param value The value of the option */ - void set(const std::string& name, const std::string& value); + TopicConfiguration& set(const std::string& name, const std::string& value); /** * \brief Sets the partitioner callback * * This translates into a call to rd_kafka_topic_conf_set_partitioner_cb */ - void set_partitioner_callback(PartitionerCallback callback); + TopicConfiguration& set_partitioner_callback(PartitionerCallback callback); /** * \brief Sets the "this" pointer as the opaque pointer for this handle @@ -92,7 +92,7 @@ public: * This method will be called by consumers/producers when the topic configuration object * has been put in a persistent memory location. Users of cppkafka do not need to use this. */ - void set_as_opaque(); + TopicConfiguration& set_as_opaque(); /** * Gets the partitioner callback diff --git a/src/configuration.cpp b/src/configuration.cpp index a73b33d..fb8fd70 100644 --- a/src/configuration.cpp +++ b/src/configuration.cpp @@ -125,7 +125,7 @@ Configuration::Configuration(rd_kafka_conf_t* ptr) } -void Configuration::set(const string& name, const string& value) { +Configuration& Configuration::set(const string& name, const string& value) { char error_buffer[512]; rd_kafka_conf_res_t result; result = rd_kafka_conf_set(handle_.get(), name.data(), value.data(), error_buffer, @@ -133,45 +133,55 @@ void Configuration::set(const string& name, const string& value) { if (result != RD_KAFKA_CONF_OK) { throw ConfigException(name, error_buffer); } + return *this; } -void Configuration::set_delivery_report_callback(DeliveryReportCallback callback) { +Configuration& Configuration::set_delivery_report_callback(DeliveryReportCallback callback) { delivery_report_callback_ = move(callback); rd_kafka_conf_set_dr_msg_cb(handle_.get(), &delivery_report_callback_proxy); + return *this; } -void Configuration::set_offset_commit_callback(OffsetCommitCallback callback) { +Configuration& Configuration::set_offset_commit_callback(OffsetCommitCallback callback) { offset_commit_callback_ = move(callback); rd_kafka_conf_set_offset_commit_cb(handle_.get(), &offset_commit_callback_proxy); + return *this; } -void Configuration::set_error_callback(ErrorCallback callback) { +Configuration& Configuration::set_error_callback(ErrorCallback callback) { error_callback_ = move(callback); rd_kafka_conf_set_error_cb(handle_.get(), &error_callback_proxy); + return *this; } -void Configuration::set_throttle_callback(ThrottleCallback callback) { +Configuration& Configuration::set_throttle_callback(ThrottleCallback callback) { throttle_callback_ = move(callback); rd_kafka_conf_set_throttle_cb(handle_.get(), &throttle_callback_proxy); + return *this; } -void Configuration::set_log_callback(LogCallback callback) { +Configuration& Configuration::set_log_callback(LogCallback callback) { log_callback_ = move(callback); rd_kafka_conf_set_log_cb(handle_.get(), &log_callback_proxy); + return *this; } -void Configuration::set_stats_callback(StatsCallback callback) { +Configuration& Configuration::set_stats_callback(StatsCallback callback) { stats_callback_ = move(callback); rd_kafka_conf_set_stats_cb(handle_.get(), &stats_callback_proxy); + return *this; } -void Configuration::set_socket_callback(SocketCallback callback) { +Configuration& Configuration::set_socket_callback(SocketCallback callback) { socket_callback_ = move(callback); rd_kafka_conf_set_socket_cb(handle_.get(), &socket_callback_proxy); + return *this; } -void Configuration::set_default_topic_configuration(optional config) { +Configuration& +Configuration::set_default_topic_configuration(optional config) { default_topic_config_ = std::move(config); + return *this; } bool Configuration::has_property(const string& name) const { diff --git a/src/topic_configuration.cpp b/src/topic_configuration.cpp index 128fcd5..b9bc48e 100644 --- a/src/topic_configuration.cpp +++ b/src/topic_configuration.cpp @@ -67,7 +67,7 @@ TopicConfiguration::TopicConfiguration(rd_kafka_topic_conf_t* ptr) } -void TopicConfiguration::set(const string& name, const string& value) { +TopicConfiguration& TopicConfiguration::set(const string& name, const string& value) { char error_buffer[512]; rd_kafka_conf_res_t result; result = rd_kafka_topic_conf_set(handle_.get(), name.data(), value.data(), error_buffer, @@ -75,15 +75,18 @@ void TopicConfiguration::set(const string& name, const string& value) { if (result != RD_KAFKA_CONF_OK) { throw ConfigException(name, error_buffer); } + return *this; } -void TopicConfiguration::set_partitioner_callback(PartitionerCallback callback) { +TopicConfiguration& TopicConfiguration::set_partitioner_callback(PartitionerCallback callback) { partitioner_callback_ = move(callback); rd_kafka_topic_conf_set_partitioner_cb(handle_.get(), &partitioner_callback_proxy); + return *this; } -void TopicConfiguration::set_as_opaque() { +TopicConfiguration& TopicConfiguration::set_as_opaque() { rd_kafka_topic_conf_set_opaque(handle_.get(), this); + return *this; } const TopicConfiguration::PartitionerCallback& diff --git a/tests/configuration_test.cpp b/tests/configuration_test.cpp index 1127a2f..90b461d 100644 --- a/tests/configuration_test.cpp +++ b/tests/configuration_test.cpp @@ -13,8 +13,9 @@ public: TEST_F(ConfigurationTest, GetSetConfig) { Configuration config; - config.set("group.id", "foo"); + config.set("group.id", "foo").set("metadata.broker.list", "asd:9092"); EXPECT_EQ("foo", config.get("group.id")); + EXPECT_EQ("asd:9092", config.get("metadata.broker.list")); EXPECT_EQ("foo", config.get("group.id")); EXPECT_THROW(config.get("asd"), ConfigOptionNotFound); @@ -22,8 +23,9 @@ TEST_F(ConfigurationTest, GetSetConfig) { TEST_F(ConfigurationTest, GetSetTopicConfig) { TopicConfiguration config; - config.set("auto.commit.enable", true); + config.set("auto.commit.enable", true).set("offset.store.method", "broker"); EXPECT_EQ("true", config.get("auto.commit.enable")); + EXPECT_EQ("broker", config.get("offset.store.method")); EXPECT_EQ(true, config.get("auto.commit.enable")); EXPECT_THROW(config.get("asd"), ConfigOptionNotFound);