From 84d98b38e5742e072bc9d23521f4cecbe2265889 Mon Sep 17 00:00:00 2001 From: Matias Fontanini Date: Sat, 18 Jun 2016 08:23:25 -0700 Subject: [PATCH] Allow dumping all options in a configuration --- include/cppkafka/configuration.h | 7 +++++-- include/cppkafka/configuration_base.h | 9 +++++++++ include/cppkafka/topic_configuration.h | 5 +++++ src/configuration.cpp | 9 +++++++++ src/topic_configuration.cpp | 9 +++++++++ tests/configuration_test.cpp | 18 ++++++++++++++++-- 6 files changed, 53 insertions(+), 4 deletions(-) diff --git a/include/cppkafka/configuration.h b/include/cppkafka/configuration.h index 0e24e6d..0b881ea 100644 --- a/include/cppkafka/configuration.h +++ b/include/cppkafka/configuration.h @@ -33,8 +33,6 @@ #include #include #include -#include -#include #include #include #include @@ -149,6 +147,11 @@ public: */ std::string get(const std::string& name) const; + /** + * Gets all options, including default values which are set by rdkafka + */ + std::map get_all() const; + /** * Gets the delivery report callback */ diff --git a/include/cppkafka/configuration_base.h b/include/cppkafka/configuration_base.h index be57835..06c9f2e 100644 --- a/include/cppkafka/configuration_base.h +++ b/include/cppkafka/configuration_base.h @@ -31,6 +31,7 @@ #define CPPKAFKA_CONFIGURATION_BASE_H #include +#include namespace cppkafka { @@ -59,6 +60,14 @@ public: void set(const std::string& name, const char* value) { proxy_set(name, value); } +protected: + static std::map parse_dump(const char** values, size_t count) { + std::map output; + for (size_t i = 0; i < count; i += 2) { + output[values[i]] = values[i + 1]; + } + return output; + } private: void proxy_set(const std::string& name, const std::string& value) { static_cast(*this).set(name, value); diff --git a/include/cppkafka/topic_configuration.h b/include/cppkafka/topic_configuration.h index 11029df..6599308 100644 --- a/include/cppkafka/topic_configuration.h +++ b/include/cppkafka/topic_configuration.h @@ -109,6 +109,11 @@ public: */ std::string get(const std::string& name) const; + /** + * Gets all options, including default values which are set by rdkafka + */ + std::map get_all() const; + /** * Gets the rdkafka handle */ diff --git a/src/configuration.cpp b/src/configuration.cpp index 51b8e1b..c9886e9 100644 --- a/src/configuration.cpp +++ b/src/configuration.cpp @@ -36,6 +36,7 @@ #include "consumer.h" using std::string; +using std::map; using std::move; using std::vector; using std::unordered_set; @@ -194,6 +195,14 @@ string Configuration::get(const string& name) const { return string(buffer.data()); } +map Configuration::get_all() const { + size_t count = 0; + const char** all = rd_kafka_conf_dump(handle_.get(), &count); + map output = parse_dump(all, count); + rd_kafka_conf_dump_free(all, count); + return output; +} + const Configuration::DeliveryReportCallback& Configuration::get_delivery_report_callback() const { return delivery_report_callback_; } diff --git a/src/topic_configuration.cpp b/src/topic_configuration.cpp index a407a67..128fcd5 100644 --- a/src/topic_configuration.cpp +++ b/src/topic_configuration.cpp @@ -35,6 +35,7 @@ #include "buffer.h" using std::string; +using std::map; using std::vector; namespace cppkafka { @@ -106,6 +107,14 @@ string TopicConfiguration::get(const string& name) const { return string(buffer.data()); } +map TopicConfiguration::get_all() const { + size_t count = 0; + const char** all = rd_kafka_topic_conf_dump(handle_.get(), &count); + map output = parse_dump(all, count); + rd_kafka_conf_dump_free(all, count); + return output; +} + rd_kafka_topic_conf_t* TopicConfiguration::get_handle() const { return handle_.get(); } diff --git a/tests/configuration_test.cpp b/tests/configuration_test.cpp index e8cad5c..69475b8 100644 --- a/tests/configuration_test.cpp +++ b/tests/configuration_test.cpp @@ -19,7 +19,7 @@ TEST_F(ConfigurationTest, GetSetConfig) { TEST_F(ConfigurationTest, GetSetTopicConfig) { TopicConfiguration config; - config.set("auto.commit.enable", "true"); + config.set("auto.commit.enable", true); EXPECT_EQ("true", config.get("auto.commit.enable")); EXPECT_THROW(config.get("asd"), ConfigOptionNotFound); @@ -32,4 +32,18 @@ TEST_F(ConfigurationTest, SetOverloads) { EXPECT_EQ("true", config.get("enable.auto.commit")); EXPECT_EQ("100", config.get("auto.commit.interval.ms")); -} \ No newline at end of file +} + +TEST_F(ConfigurationTest, GetAll) { + Configuration config; + config.set("enable.auto.commit", false); + auto option_map = config.get_all(); + EXPECT_EQ("false", option_map.at("enable.auto.commit")); +} + +TEST_F(ConfigurationTest, TopicGetAll) { + TopicConfiguration config; + config.set("auto.commit.enable", false); + auto option_map = config.get_all(); + EXPECT_EQ("false", option_map.at("auto.commit.enable")); +}