Allow building a configuration from an option list

This commit is contained in:
Matias Fontanini
2016-06-25 09:24:18 -07:00
parent a2e7f6db0d
commit c878049253
14 changed files with 256 additions and 16 deletions

View File

@@ -29,8 +29,9 @@ using namespace cppkafka;
int main() {
// Create the config
Configuration config;
config.set("metadata.broker.list", "127.0.0.1:2181");
Configuration config = {
{ "metadata.broker.list", "127.0.0.1:9092" }
};
// Create the producer
Producer producer(config);

View File

@@ -52,11 +52,12 @@ int main(int argc, char* argv[]) {
signal(SIGINT, [](int) { running = false; });
// Construct the configuration
Configuration config;
config.set("metadata.broker.list", brokers);
config.set("group.id", group_id);
Configuration config = {
{ "metadata.broker.list", brokers },
{ "group.id", group_id },
// Disable auto commit
config.set("enable.auto.commit", false);
{ "enable.auto.commit", false }
};
// Create the consumer
Consumer consumer(config);

View File

@@ -55,8 +55,9 @@ int main(int argc, char* argv[]) {
}
// Construct the configuration
Configuration config;
config.set("metadata.broker.list", brokers);
Configuration config = {
{ "metadata.broker.list", brokers }
};
// Create the producer
Producer producer(config);

View File

@@ -33,6 +33,7 @@
#include <memory>
#include <string>
#include <functional>
#include <initializer_list>
#include <chrono>
#include <boost/optional.hpp>
#include <librdkafka/rdkafka.h>
@@ -82,6 +83,16 @@ public:
*/
Configuration();
/**
* Constructs a Configuration object using a list of options
*/
Configuration(const std::vector<ConfigurationOption>& options);
/**
* Constructs a Configuration object using a list of options
*/
Configuration(const std::initializer_list<ConfigurationOption>& options);
/**
* \brief Sets an attribute.
*
@@ -130,7 +141,7 @@ public:
/**
* Sets the default topic configuration
*/
Configuration& set_default_topic_configuration(boost::optional<TopicConfiguration> config);
Configuration& set_default_topic_configuration(TopicConfiguration config);
/**
* Returns true iff the given property name has been set

View File

@@ -32,7 +32,9 @@
#include <string>
#include <map>
#include <vector>
#include "exceptions.h"
#include "configuration_option.h"
namespace cppkafka {
@@ -65,6 +67,16 @@ public:
return proxy_set(name, value);
}
/**
* Sets a list of options
*/
Concrete& set(const std::vector<ConfigurationOption>& options) {
for (const auto& option : options) {
proxy_set(option.get_key(), option.get_value());
}
return static_cast<Concrete&>(*this);
}
/**
* \brief Gets a value, converting it to the given type.
*

View File

@@ -0,0 +1,84 @@
/*
* Copyright (c) 2016, Matias Fontanini
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef CPPKAFKA_CONFIGURATION_OPTION_H
#define CPPKAFKA_CONFIGURATION_OPTION_H
#include <string>
#include <type_traits>
namespace cppkafka {
/**
* Wrapper over a configuration (key, value) pair
*/
class ConfigurationOption {
public:
/**
* Construct using a std::string value
*/
ConfigurationOption(const std::string& key, const std::string& value);
/**
* Construct using a const char* value
*/
ConfigurationOption(const std::string& key, const char* value);
/**
* Construct using a bool value
*/
ConfigurationOption(const std::string& key, bool value);
/**
* Construct using any integral value
*/
template <typename T,
typename = typename std::enable_if<std::is_integral<T>::value>::type>
ConfigurationOption(const std::string& key, T value)
: ConfigurationOption(key, std::to_string(value)) {
}
/**
* Gets the key
*/
const std::string& get_key() const;
/**
* Gets the value
*/
const std::string& get_value() const;
private:
std::string key_;
std::string value_;
};
} // cppkafka
#endif // CPPKAFKA_CONFIGURATION_OPTION_H

View File

@@ -54,9 +54,10 @@ class TopicConfiguration;
*
* \code
* // Create a configuration and set the group.id and broker list fields
* Configuration config;
* config.set("metadata.broker.list", "127.0.0.1:9092");
* config.set("group.id", "foo");
* Configuration config = {
* { "metadata.broker.list", "127.0.0.1:9092" },
* { "group.id", "foo" }
* };
*
* // Create a consumer
* Consumer consumer(config);

View File

@@ -57,8 +57,9 @@ class TopicConfiguration;
*
* \code
* // Set the broker list
* Configuration config;
* config.set("metadata.broker.list", "127.0.0.1:9092");
* Configuration config = {
* { "metadata.broker.list", "127.0.0.1:9092" }
* };
*
* // Create a producer
* Producer producer(config);

View File

@@ -32,6 +32,7 @@
#include <string>
#include <functional>
#include <initializer_list>
#include <librdkafka/rdkafka.h>
#include "clonable_ptr.h"
#include "configuration_base.h"
@@ -71,6 +72,16 @@ public:
*/
TopicConfiguration();
/**
* Constructs a TopicConfiguration object using a list of options
*/
TopicConfiguration(const std::vector<ConfigurationOption>& options);
/**
* Constructs a TopicConfiguration object using a list of options
*/
TopicConfiguration(const std::initializer_list<ConfigurationOption>& options);
/**
* Sets an option
*

View File

@@ -1,6 +1,7 @@
set(SOURCES
configuration.cpp
topic_configuration.cpp
configuration_option.cpp
exceptions.cpp
topic.cpp
partition.cpp

View File

@@ -39,6 +39,7 @@ using std::string;
using std::map;
using std::move;
using std::vector;
using std::initializer_list;
using boost::optional;
@@ -120,6 +121,16 @@ Configuration::Configuration()
}
Configuration::Configuration(const vector<ConfigurationOption>& options)
: Configuration() {
set(options);
}
Configuration::Configuration(const initializer_list<ConfigurationOption>& options)
: Configuration() {
set(options);
}
Configuration::Configuration(rd_kafka_conf_t* ptr)
: handle_(make_handle(ptr)) {
@@ -179,7 +190,7 @@ Configuration& Configuration::set_socket_callback(SocketCallback callback) {
}
Configuration&
Configuration::set_default_topic_configuration(optional<TopicConfiguration> config) {
Configuration::set_default_topic_configuration(TopicConfiguration config) {
default_topic_config_ = std::move(config);
return *this;
}

View File

@@ -0,0 +1,59 @@
/*
* Copyright (c) 2016, Matias Fontanini
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include "configuration_option.h"
using std::string;
namespace cppkafka {
ConfigurationOption::ConfigurationOption(const string& key, const string& value)
: key_(key), value_(value) {
}
ConfigurationOption::ConfigurationOption(const string& key, const char* value)
: key_(key), value_(value) {
}
ConfigurationOption::ConfigurationOption(const string& key, bool value)
: key_(key), value_(value ? "true" : "false") {
}
const string& ConfigurationOption::get_key() const {
return key_;
}
const string& ConfigurationOption::get_value() const {
return value_;
}
} // cppkafka

View File

@@ -37,6 +37,7 @@
using std::string;
using std::map;
using std::vector;
using std::initializer_list;
namespace cppkafka {
@@ -62,6 +63,16 @@ TopicConfiguration::TopicConfiguration()
}
TopicConfiguration::TopicConfiguration(const vector<ConfigurationOption>& options)
: TopicConfiguration() {
set(options);
}
TopicConfiguration::TopicConfiguration(const initializer_list<ConfigurationOption>& options)
: TopicConfiguration() {
set(options);
}
TopicConfiguration::TopicConfiguration(rd_kafka_topic_conf_t* ptr)
: handle_(make_handle(ptr)) {

View File

@@ -31,6 +31,41 @@ TEST_F(ConfigurationTest, GetSetTopicConfig) {
EXPECT_THROW(config.get("asd"), ConfigOptionNotFound);
}
TEST_F(ConfigurationTest, ConfigSetMultiple) {
Configuration config = {
{ "group.id", "foo" },
{ "metadata.broker.list", string("asd:9092") },
{ "message.max.bytes", 2000 },
{ "topic.metadata.refresh.sparse", true }
};
EXPECT_EQ("foo", config.get("group.id"));
EXPECT_EQ("asd:9092", config.get("metadata.broker.list"));
EXPECT_EQ(2000, config.get<int>("message.max.bytes"));
EXPECT_EQ(true, config.get<bool>("topic.metadata.refresh.sparse"));
}
TEST_F(ConfigurationTest, TopicConfigSetMultiple) {
TopicConfiguration config = {
{ "compression.codec", "none" },
{ "offset.store.method", string("file") },
{ "request.required.acks", 2 },
{ "produce.offset.report", true }
};
EXPECT_EQ("none", config.get("compression.codec"));
EXPECT_EQ("file", config.get("offset.store.method"));
EXPECT_EQ(2, config.get<int>("request.required.acks"));
EXPECT_EQ(true, config.get<bool>("produce.offset.report"));
}
TEST_F(ConfigurationTest, SetDefaultTopicConfiguration) {
Configuration config;
config.set_default_topic_configuration({{ "request.required.acks", 2 }});
const auto& topic_config = config.get_default_topic_configuration();
EXPECT_TRUE(topic_config);
EXPECT_EQ(2, topic_config->get<int>("request.required.acks"));
}
TEST_F(ConfigurationTest, SetOverloads) {
Configuration config;
config.set("enable.auto.commit", true);