diff --git a/README.md b/README.md index ef7cc19..de487ca 100644 --- a/README.md +++ b/README.md @@ -29,8 +29,9 @@ using namespace cppkafka; int main() { // Create the config - Configuration config; - config.set("metadata.broker.list", "127.0.0.1:2181"); + Configuration config = { + { "metadata.broker.list", "127.0.0.1:9092" } + }; // Create the producer Producer producer(config); diff --git a/examples/kafka_consumer.cpp b/examples/kafka_consumer.cpp index 6d82289..59bf2b0 100644 --- a/examples/kafka_consumer.cpp +++ b/examples/kafka_consumer.cpp @@ -52,11 +52,12 @@ int main(int argc, char* argv[]) { signal(SIGINT, [](int) { running = false; }); // Construct the configuration - Configuration config; - config.set("metadata.broker.list", brokers); - config.set("group.id", group_id); - // Disable auto commit - config.set("enable.auto.commit", false); + Configuration config = { + { "metadata.broker.list", brokers }, + { "group.id", group_id }, + // Disable auto commit + { "enable.auto.commit", false } + }; // Create the consumer Consumer consumer(config); diff --git a/examples/kafka_producer.cpp b/examples/kafka_producer.cpp index ebbee8d..65b7b27 100644 --- a/examples/kafka_producer.cpp +++ b/examples/kafka_producer.cpp @@ -55,8 +55,9 @@ int main(int argc, char* argv[]) { } // Construct the configuration - Configuration config; - config.set("metadata.broker.list", brokers); + Configuration config = { + { "metadata.broker.list", brokers } + }; // Create the producer Producer producer(config); diff --git a/include/cppkafka/configuration.h b/include/cppkafka/configuration.h index 6b1907d..df53254 100644 --- a/include/cppkafka/configuration.h +++ b/include/cppkafka/configuration.h @@ -33,6 +33,7 @@ #include #include #include +#include #include #include #include @@ -82,6 +83,16 @@ public: */ Configuration(); + /** + * Constructs a Configuration object using a list of options + */ + Configuration(const std::vector& options); + + /** + * Constructs a Configuration object using a list of options + */ + Configuration(const std::initializer_list& options); + /** * \brief Sets an attribute. * @@ -130,7 +141,7 @@ public: /** * Sets the default topic configuration */ - Configuration& set_default_topic_configuration(boost::optional config); + Configuration& set_default_topic_configuration(TopicConfiguration config); /** * Returns true iff the given property name has been set diff --git a/include/cppkafka/configuration_base.h b/include/cppkafka/configuration_base.h index 4812fe5..b3b08fb 100644 --- a/include/cppkafka/configuration_base.h +++ b/include/cppkafka/configuration_base.h @@ -32,7 +32,9 @@ #include #include +#include #include "exceptions.h" +#include "configuration_option.h" namespace cppkafka { @@ -65,6 +67,16 @@ public: return proxy_set(name, value); } + /** + * Sets a list of options + */ + Concrete& set(const std::vector& options) { + for (const auto& option : options) { + proxy_set(option.get_key(), option.get_value()); + } + return static_cast(*this); + } + /** * \brief Gets a value, converting it to the given type. * diff --git a/include/cppkafka/configuration_option.h b/include/cppkafka/configuration_option.h new file mode 100644 index 0000000..66205fc --- /dev/null +++ b/include/cppkafka/configuration_option.h @@ -0,0 +1,84 @@ +/* + * Copyright (c) 2016, Matias Fontanini + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef CPPKAFKA_CONFIGURATION_OPTION_H +#define CPPKAFKA_CONFIGURATION_OPTION_H + +#include +#include + +namespace cppkafka { + +/** + * Wrapper over a configuration (key, value) pair + */ +class ConfigurationOption { +public: + /** + * Construct using a std::string value + */ + ConfigurationOption(const std::string& key, const std::string& value); + + /** + * Construct using a const char* value + */ + ConfigurationOption(const std::string& key, const char* value); + + /** + * Construct using a bool value + */ + ConfigurationOption(const std::string& key, bool value); + + /** + * Construct using any integral value + */ + template ::value>::type> + ConfigurationOption(const std::string& key, T value) + : ConfigurationOption(key, std::to_string(value)) { + + } + + /** + * Gets the key + */ + const std::string& get_key() const; + + /** + * Gets the value + */ + const std::string& get_value() const; +private: + std::string key_; + std::string value_; +}; + +} // cppkafka + +#endif // CPPKAFKA_CONFIGURATION_OPTION_H diff --git a/include/cppkafka/consumer.h b/include/cppkafka/consumer.h index 8da9c76..b364b17 100644 --- a/include/cppkafka/consumer.h +++ b/include/cppkafka/consumer.h @@ -54,9 +54,10 @@ class TopicConfiguration; * * \code * // Create a configuration and set the group.id and broker list fields - * Configuration config; - * config.set("metadata.broker.list", "127.0.0.1:9092"); - * config.set("group.id", "foo"); + * Configuration config = { + * { "metadata.broker.list", "127.0.0.1:9092" }, + * { "group.id", "foo" } + * }; * * // Create a consumer * Consumer consumer(config); diff --git a/include/cppkafka/producer.h b/include/cppkafka/producer.h index b552e51..f7d99a3 100644 --- a/include/cppkafka/producer.h +++ b/include/cppkafka/producer.h @@ -57,8 +57,9 @@ class TopicConfiguration; * * \code * // Set the broker list - * Configuration config; - * config.set("metadata.broker.list", "127.0.0.1:9092"); + * Configuration config = { + * { "metadata.broker.list", "127.0.0.1:9092" } + * }; * * // Create a producer * Producer producer(config); diff --git a/include/cppkafka/topic_configuration.h b/include/cppkafka/topic_configuration.h index a8a9efd..41cd059 100644 --- a/include/cppkafka/topic_configuration.h +++ b/include/cppkafka/topic_configuration.h @@ -32,6 +32,7 @@ #include #include +#include #include #include "clonable_ptr.h" #include "configuration_base.h" @@ -71,6 +72,16 @@ public: */ TopicConfiguration(); + /** + * Constructs a TopicConfiguration object using a list of options + */ + TopicConfiguration(const std::vector& options); + + /** + * Constructs a TopicConfiguration object using a list of options + */ + TopicConfiguration(const std::initializer_list& options); + /** * Sets an option * diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index a0385d0..b3319f5 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -1,6 +1,7 @@ set(SOURCES configuration.cpp topic_configuration.cpp + configuration_option.cpp exceptions.cpp topic.cpp partition.cpp diff --git a/src/configuration.cpp b/src/configuration.cpp index fb8fd70..5f6c27d 100644 --- a/src/configuration.cpp +++ b/src/configuration.cpp @@ -39,6 +39,7 @@ using std::string; using std::map; using std::move; using std::vector; +using std::initializer_list; using boost::optional; @@ -120,6 +121,16 @@ Configuration::Configuration() } +Configuration::Configuration(const vector& options) +: Configuration() { + set(options); +} + +Configuration::Configuration(const initializer_list& options) +: Configuration() { + set(options); +} + Configuration::Configuration(rd_kafka_conf_t* ptr) : handle_(make_handle(ptr)) { @@ -179,7 +190,7 @@ Configuration& Configuration::set_socket_callback(SocketCallback callback) { } Configuration& -Configuration::set_default_topic_configuration(optional config) { +Configuration::set_default_topic_configuration(TopicConfiguration config) { default_topic_config_ = std::move(config); return *this; } diff --git a/src/configuration_option.cpp b/src/configuration_option.cpp new file mode 100644 index 0000000..926130d --- /dev/null +++ b/src/configuration_option.cpp @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2016, Matias Fontanini + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "configuration_option.h" + +using std::string; + +namespace cppkafka { + +ConfigurationOption::ConfigurationOption(const string& key, const string& value) +: key_(key), value_(value) { + +} + +ConfigurationOption::ConfigurationOption(const string& key, const char* value) +: key_(key), value_(value) { + +} + +ConfigurationOption::ConfigurationOption(const string& key, bool value) +: key_(key), value_(value ? "true" : "false") { + +} + +const string& ConfigurationOption::get_key() const { + return key_; +} + +const string& ConfigurationOption::get_value() const { + return value_; +} + +} // cppkafka diff --git a/src/topic_configuration.cpp b/src/topic_configuration.cpp index b9bc48e..e95acfc 100644 --- a/src/topic_configuration.cpp +++ b/src/topic_configuration.cpp @@ -37,6 +37,7 @@ using std::string; using std::map; using std::vector; +using std::initializer_list; namespace cppkafka { @@ -62,6 +63,16 @@ TopicConfiguration::TopicConfiguration() } +TopicConfiguration::TopicConfiguration(const vector& options) +: TopicConfiguration() { + set(options); +} + +TopicConfiguration::TopicConfiguration(const initializer_list& options) +: TopicConfiguration() { + set(options); +} + TopicConfiguration::TopicConfiguration(rd_kafka_topic_conf_t* ptr) : handle_(make_handle(ptr)) { diff --git a/tests/configuration_test.cpp b/tests/configuration_test.cpp index 90b461d..19ff60c 100644 --- a/tests/configuration_test.cpp +++ b/tests/configuration_test.cpp @@ -31,6 +31,41 @@ TEST_F(ConfigurationTest, GetSetTopicConfig) { EXPECT_THROW(config.get("asd"), ConfigOptionNotFound); } +TEST_F(ConfigurationTest, ConfigSetMultiple) { + Configuration config = { + { "group.id", "foo" }, + { "metadata.broker.list", string("asd:9092") }, + { "message.max.bytes", 2000 }, + { "topic.metadata.refresh.sparse", true } + }; + EXPECT_EQ("foo", config.get("group.id")); + EXPECT_EQ("asd:9092", config.get("metadata.broker.list")); + EXPECT_EQ(2000, config.get("message.max.bytes")); + EXPECT_EQ(true, config.get("topic.metadata.refresh.sparse")); +} + +TEST_F(ConfigurationTest, TopicConfigSetMultiple) { + TopicConfiguration config = { + { "compression.codec", "none" }, + { "offset.store.method", string("file") }, + { "request.required.acks", 2 }, + { "produce.offset.report", true } + }; + EXPECT_EQ("none", config.get("compression.codec")); + EXPECT_EQ("file", config.get("offset.store.method")); + EXPECT_EQ(2, config.get("request.required.acks")); + EXPECT_EQ(true, config.get("produce.offset.report")); +} + +TEST_F(ConfigurationTest, SetDefaultTopicConfiguration) { + Configuration config; + config.set_default_topic_configuration({{ "request.required.acks", 2 }}); + + const auto& topic_config = config.get_default_topic_configuration(); + EXPECT_TRUE(topic_config); + EXPECT_EQ(2, topic_config->get("request.required.acks")); +} + TEST_F(ConfigurationTest, SetOverloads) { Configuration config; config.set("enable.auto.commit", true);