Allow dumping all options in a configuration

This commit is contained in:
Matias Fontanini
2016-06-18 08:23:25 -07:00
parent 2532a2d614
commit 84d98b38e5
6 changed files with 53 additions and 4 deletions

View File

@@ -33,8 +33,6 @@
#include <memory>
#include <string>
#include <functional>
#include <unordered_map>
#include <unordered_set>
#include <chrono>
#include <boost/optional.hpp>
#include <librdkafka/rdkafka.h>
@@ -149,6 +147,11 @@ public:
*/
std::string get(const std::string& name) const;
/**
* Gets all options, including default values which are set by rdkafka
*/
std::map<std::string, std::string> get_all() const;
/**
* Gets the delivery report callback
*/

View File

@@ -31,6 +31,7 @@
#define CPPKAFKA_CONFIGURATION_BASE_H
#include <string>
#include <map>
namespace cppkafka {
@@ -59,6 +60,14 @@ public:
void set(const std::string& name, const char* value) {
proxy_set(name, value);
}
protected:
static std::map<std::string, std::string> parse_dump(const char** values, size_t count) {
std::map<std::string, std::string> output;
for (size_t i = 0; i < count; i += 2) {
output[values[i]] = values[i + 1];
}
return output;
}
private:
void proxy_set(const std::string& name, const std::string& value) {
static_cast<Concrete&>(*this).set(name, value);

View File

@@ -109,6 +109,11 @@ public:
*/
std::string get(const std::string& name) const;
/**
* Gets all options, including default values which are set by rdkafka
*/
std::map<std::string, std::string> get_all() const;
/**
* Gets the rdkafka handle
*/

View File

@@ -36,6 +36,7 @@
#include "consumer.h"
using std::string;
using std::map;
using std::move;
using std::vector;
using std::unordered_set;
@@ -194,6 +195,14 @@ string Configuration::get(const string& name) const {
return string(buffer.data());
}
map<string, string> Configuration::get_all() const {
size_t count = 0;
const char** all = rd_kafka_conf_dump(handle_.get(), &count);
map<string, string> output = parse_dump(all, count);
rd_kafka_conf_dump_free(all, count);
return output;
}
const Configuration::DeliveryReportCallback& Configuration::get_delivery_report_callback() const {
return delivery_report_callback_;
}

View File

@@ -35,6 +35,7 @@
#include "buffer.h"
using std::string;
using std::map;
using std::vector;
namespace cppkafka {
@@ -106,6 +107,14 @@ string TopicConfiguration::get(const string& name) const {
return string(buffer.data());
}
map<string, string> TopicConfiguration::get_all() const {
size_t count = 0;
const char** all = rd_kafka_topic_conf_dump(handle_.get(), &count);
map<string, string> output = parse_dump(all, count);
rd_kafka_conf_dump_free(all, count);
return output;
}
rd_kafka_topic_conf_t* TopicConfiguration::get_handle() const {
return handle_.get();
}

View File

@@ -19,7 +19,7 @@ TEST_F(ConfigurationTest, GetSetConfig) {
TEST_F(ConfigurationTest, GetSetTopicConfig) {
TopicConfiguration config;
config.set("auto.commit.enable", "true");
config.set("auto.commit.enable", true);
EXPECT_EQ("true", config.get("auto.commit.enable"));
EXPECT_THROW(config.get("asd"), ConfigOptionNotFound);
@@ -33,3 +33,17 @@ TEST_F(ConfigurationTest, SetOverloads) {
EXPECT_EQ("true", config.get("enable.auto.commit"));
EXPECT_EQ("100", config.get("auto.commit.interval.ms"));
}
TEST_F(ConfigurationTest, GetAll) {
Configuration config;
config.set("enable.auto.commit", false);
auto option_map = config.get_all();
EXPECT_EQ("false", option_map.at("enable.auto.commit"));
}
TEST_F(ConfigurationTest, TopicGetAll) {
TopicConfiguration config;
config.set("auto.commit.enable", false);
auto option_map = config.get_all();
EXPECT_EQ("false", option_map.at("auto.commit.enable"));
}