From 65a60f1690a11f82e39c22cfd7e5d7f9d28c3c1c Mon Sep 17 00:00:00 2001 From: Matias Fontanini Date: Sun, 5 Jun 2016 15:08:40 -0700 Subject: [PATCH] Allow getting config options and add multiple overloads for set --- include/cppkafka/configuration.h | 6 ++++- include/cppkafka/configuration_base.h | 33 ++++++++++++++++++++++++ include/cppkafka/exceptions.h | 5 ++++ include/cppkafka/topic_configuration.h | 6 ++++- src/configuration.cpp | 13 ++++++++++ src/exceptions.cpp | 7 ++++++ src/topic_configuration.cpp | 13 ++++++++++ tests/CMakeLists.txt | 1 + tests/configuration_test.cpp | 35 ++++++++++++++++++++++++++ tests/consumer_test.cpp | 2 +- tests/producer_test.cpp | 4 +-- 11 files changed, 120 insertions(+), 5 deletions(-) create mode 100644 include/cppkafka/configuration_base.h create mode 100644 tests/configuration_test.cpp diff --git a/include/cppkafka/configuration.h b/include/cppkafka/configuration.h index aa9970e..05bc3cf 100644 --- a/include/cppkafka/configuration.h +++ b/include/cppkafka/configuration.h @@ -10,6 +10,7 @@ #include "topic_partition_list.h" #include "topic_configuration.h" #include "clonable_ptr.h" +#include "configuration_base.h" namespace cppkafka { @@ -18,7 +19,7 @@ class Producer; class Consumer; class KafkaHandleBase; -class Configuration { +class Configuration : public ConfigurationBase { public: using DeliveryReportCallback = std::function; using OffsetCommitCallback = std::function; using SocketCallback = std::function; + using ConfigurationBase::set; + Configuration(); void set(const std::string& name, const std::string& value); @@ -48,6 +51,7 @@ public: void set_default_topic_configuration(boost::optional config); rd_kafka_conf_t* get_handle() const; + std::string get(const std::string& name) const; const DeliveryReportCallback& get_delivery_report_callback() const; const OffsetCommitCallback& get_offset_commit_callback() const; const ErrorCallback& get_error_callback() const; diff --git a/include/cppkafka/configuration_base.h b/include/cppkafka/configuration_base.h new file mode 100644 index 0000000..be5d0c1 --- /dev/null +++ b/include/cppkafka/configuration_base.h @@ -0,0 +1,33 @@ +#ifndef CPPKAFKA_CONFIGURATION_BASE_H +#define CPPKAFKA_CONFIGURATION_BASE_H + +#include + +namespace cppkafka { + +template +class ConfigurationBase { +public: + void set(const std::string& name, bool value) { + proxy_set(name, value ? "true" : "false"); + } + + // Overload for any integral value + template ::value>::type> + void set(const std::string& name, T value) { + proxy_set(name, std::to_string(value)); + } + + void set(const std::string& name, const char* value) { + proxy_set(name, value); + } +private: + void proxy_set(const std::string& name, const std::string& value) { + static_cast(*this).set(name, value); + } +}; + +} // cppkafka + +#endif // CPPKAFKA_CONFIGURATION_BASE_H diff --git a/include/cppkafka/exceptions.h b/include/cppkafka/exceptions.h index b3165e9..c0a269a 100644 --- a/include/cppkafka/exceptions.h +++ b/include/cppkafka/exceptions.h @@ -21,6 +21,11 @@ public: ConfigException(const std::string& config_name, const std::string& error); }; +class ConfigOptionNotFound : public Exception { +public: + ConfigOptionNotFound(const std::string& config_name); +}; + class HandleException : public Exception { public: HandleException(rd_kafka_resp_err_t error_code); diff --git a/include/cppkafka/topic_configuration.h b/include/cppkafka/topic_configuration.h index 3e97496..f9920f9 100644 --- a/include/cppkafka/topic_configuration.h +++ b/include/cppkafka/topic_configuration.h @@ -5,17 +5,20 @@ #include #include #include "clonable_ptr.h" +#include "configuration_base.h" namespace cppkafka { class Topic; class Buffer; -class TopicConfiguration { +class TopicConfiguration : public ConfigurationBase { public: using PartitionerCallback = std::function; + using ConfigurationBase::set; + TopicConfiguration(); void set(const std::string& name, const std::string& value); @@ -26,6 +29,7 @@ public: const PartitionerCallback& get_partitioner_callback() const; rd_kafka_topic_conf_t* get_handle() const; + std::string get(const std::string& name) const; private: using HandlePtr = ClonablePtr #include #include "exceptions.h" #include "message.h" @@ -7,6 +8,7 @@ using std::string; using std::move; +using std::vector; using boost::optional; @@ -146,6 +148,17 @@ rd_kafka_conf_t* Configuration::get_handle() const { return handle_.get(); } +string Configuration::get(const string& name) const { + size_t size = 0; + auto result = rd_kafka_conf_get(handle_.get(), name.data(), nullptr, &size); + if (result != RD_KAFKA_CONF_OK) { + throw ConfigOptionNotFound(name); + } + vector buffer(size); + rd_kafka_conf_get(handle_.get(), name.data(), buffer.data(), &size); + return string(buffer.data()); +} + const Configuration::DeliveryReportCallback& Configuration::get_delivery_report_callback() const { return delivery_report_callback_; } diff --git a/src/exceptions.cpp b/src/exceptions.cpp index cfed337..bb12ba7 100644 --- a/src/exceptions.cpp +++ b/src/exceptions.cpp @@ -22,6 +22,13 @@ ConfigException::ConfigException(const string& config_name, const string& error) } +// ConfigOptionNotFound + +ConfigOptionNotFound::ConfigOptionNotFound(const string& config_name) +: Exception(config_name + " not found") { + +} + // HandleException HandleException::HandleException(rd_kafka_resp_err_t error_code) diff --git a/src/topic_configuration.cpp b/src/topic_configuration.cpp index d2a58ba..f949e90 100644 --- a/src/topic_configuration.cpp +++ b/src/topic_configuration.cpp @@ -1,10 +1,12 @@ #include "topic_configuration.h" +#include #include #include "exceptions.h" #include "topic.h" #include "buffer.h" using std::string; +using std::vector; namespace cppkafka { @@ -63,6 +65,17 @@ rd_kafka_topic_conf_t* TopicConfiguration::get_handle() const { return handle_.get(); } +string TopicConfiguration::get(const string& name) const { + size_t size = 0; + auto result = rd_kafka_topic_conf_get(handle_.get(), name.data(), nullptr, &size); + if (result != RD_KAFKA_CONF_OK) { + throw ConfigOptionNotFound(name); + } + vector buffer(size); + rd_kafka_topic_conf_get(handle_.get(), name.data(), buffer.data(), &size); + return string(buffer.data()); +} + TopicConfiguration::HandlePtr TopicConfiguration::make_handle(rd_kafka_topic_conf_t* ptr) { return HandlePtr(ptr, &rd_kafka_topic_conf_destroy, &rd_kafka_topic_conf_dup); } diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 88fb872..13661d8 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -18,3 +18,4 @@ create_test(consumer) create_test(producer) create_test(kafka_handle_base) create_test(topic_partition_list) +create_test(configuration) diff --git a/tests/configuration_test.cpp b/tests/configuration_test.cpp new file mode 100644 index 0000000..e8cad5c --- /dev/null +++ b/tests/configuration_test.cpp @@ -0,0 +1,35 @@ +#include +#include "cppkafka/configuration.h" +#include "cppkafka/exceptions.h" + +using namespace cppkafka; + +class ConfigurationTest : public testing::Test { +public: + +}; + +TEST_F(ConfigurationTest, GetSetConfig) { + Configuration config; + config.set("group.id", "foo"); + EXPECT_EQ("foo", config.get("group.id")); + + EXPECT_THROW(config.get("asd"), ConfigOptionNotFound); +} + +TEST_F(ConfigurationTest, GetSetTopicConfig) { + TopicConfiguration config; + config.set("auto.commit.enable", "true"); + EXPECT_EQ("true", config.get("auto.commit.enable")); + + EXPECT_THROW(config.get("asd"), ConfigOptionNotFound); +} + +TEST_F(ConfigurationTest, SetOverloads) { + Configuration config; + config.set("enable.auto.commit", true); + config.set("auto.commit.interval.ms", 100); + + EXPECT_EQ("true", config.get("enable.auto.commit")); + EXPECT_EQ("100", config.get("auto.commit.interval.ms")); +} \ No newline at end of file diff --git a/tests/consumer_test.cpp b/tests/consumer_test.cpp index c65a5b3..605bfc2 100644 --- a/tests/consumer_test.cpp +++ b/tests/consumer_test.cpp @@ -93,7 +93,7 @@ public: Configuration make_consumer_config(const string& group_id = "consumer_test") { Configuration config; config.set("metadata.broker.list", KAFKA_TEST_INSTANCE); - config.set("enable.auto.commit", "false"); + config.set("enable.auto.commit", false); config.set("group.id", group_id); return config; } diff --git a/tests/producer_test.cpp b/tests/producer_test.cpp index eca7c0b..5b80c27 100644 --- a/tests/producer_test.cpp +++ b/tests/producer_test.cpp @@ -93,7 +93,7 @@ public: Configuration make_consumer_config() { Configuration config; config.set("metadata.broker.list", KAFKA_TEST_INSTANCE); - config.set("enable.auto.commit", "false"); + config.set("enable.auto.commit", false); config.set("group.id", "producer_test"); return config; } @@ -268,5 +268,5 @@ TEST_F(ProducerTest, PartitionerCallbackOnDefaultTopicConfig) { ASSERT_EQ(1, messages.size()); const auto& message = messages[0]; EXPECT_EQ(partition, message.get_partition()); - EXPECT_TRUE(callback_called); + EXPECT_TRUE(callback_called); }