From f86c9c1f5785cb2122a5c217bccc9f55fb27e58d Mon Sep 17 00:00:00 2001 From: Matias Fontanini Date: Thu, 12 May 2016 20:37:58 -0700 Subject: [PATCH] Add initial config classes --- CMakeLists.txt | 8 +++++ include/cppkafka/configuration.h | 30 ++++++++++++++++++ include/cppkafka/exceptions.h | 24 +++++++++++++++ include/cppkafka/topic_configuration.h | 31 +++++++++++++++++++ src/CMakeLists.txt | 7 +++++ src/configuration.cpp | 42 ++++++++++++++++++++++++++ src/exceptions.cpp | 16 ++++++++++ src/topic_configuration.cpp | 42 ++++++++++++++++++++++++++ 8 files changed, 200 insertions(+) create mode 100644 CMakeLists.txt create mode 100644 include/cppkafka/configuration.h create mode 100644 include/cppkafka/exceptions.h create mode 100644 include/cppkafka/topic_configuration.h create mode 100644 src/CMakeLists.txt create mode 100644 src/configuration.cpp create mode 100644 src/exceptions.cpp create mode 100644 src/topic_configuration.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt new file mode 100644 index 0000000..8bf4fa9 --- /dev/null +++ b/CMakeLists.txt @@ -0,0 +1,8 @@ +cmake_minimum_required(VERSION 2.8.1) +project(cppkafka) + +set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11") + +include_directories(${CMAKE_SOURCE_DIR}/include/cppkafka) + +add_subdirectory(src) \ No newline at end of file diff --git a/include/cppkafka/configuration.h b/include/cppkafka/configuration.h new file mode 100644 index 0000000..74dc7f2 --- /dev/null +++ b/include/cppkafka/configuration.h @@ -0,0 +1,30 @@ +#ifndef CPPKAFKA_CONFIGURATION_H +#define CPPKAFKA_CONFIGURATION_H + +#include +#include +#include + +namespace cppkafka { + +class Configuration { +public: + Configuration(); + Configuration(const Configuration& rhs); + Configuration(Configuration&& rhs) noexcept = default; + Configuration& operator=(const Configuration& rhs); + Configuration& operator=(Configuration&& rhs) noexcept = default; + + void set(const std::string& name, const std::string& value); +private: + using HandlePtr = std::unique_ptr; + + Configuration(rd_kafka_conf_t* ptr); + static HandlePtr make_handle(rd_kafka_conf_t* ptr); + + HandlePtr handle_; +}; + +} // cppkafka + +#endif // CPPKAFKA_CONFIGURATION_H diff --git a/include/cppkafka/exceptions.h b/include/cppkafka/exceptions.h new file mode 100644 index 0000000..f6f3351 --- /dev/null +++ b/include/cppkafka/exceptions.h @@ -0,0 +1,24 @@ +#ifndef CPPKAFKA_EXCEPTIONS_H +#define CPPKAFKA_EXCEPTIONS_H + +#include +#include + +namespace cppkafka { + +class KafkaException : public std::exception { + +}; + +class KafkaConfigException : public KafkaException { +public: + KafkaConfigException(const std::string& config_name, const std::string& error); + + const char* what() const noexcept; +private: + std::string message_; +}; + +} // cppkafka + +#endif // CPPKAFKA_EXCEPTIONS_H diff --git a/include/cppkafka/topic_configuration.h b/include/cppkafka/topic_configuration.h new file mode 100644 index 0000000..8b340b0 --- /dev/null +++ b/include/cppkafka/topic_configuration.h @@ -0,0 +1,31 @@ +#ifndef CPPKAFKA_TOPIC_CONFIGURATION_H +#define CPPKAFKA_TOPIC_CONFIGURATION_H + +#include +#include +#include + +namespace cppkafka { + +class TopicConfiguration { +public: + TopicConfiguration(); + TopicConfiguration(const TopicConfiguration& rhs); + TopicConfiguration(TopicConfiguration&& rhs) noexcept = default; + TopicConfiguration& operator=(const TopicConfiguration& rhs); + TopicConfiguration& operator=(TopicConfiguration&& rhs) noexcept = default; + + void set(const std::string& name, const std::string& value); +private: + using HandlePtr = std::unique_ptr; + + TopicConfiguration(rd_kafka_topic_conf_t* ptr); + static HandlePtr make_handle(rd_kafka_topic_conf_t* ptr); + + HandlePtr handle_; +}; + +} // cppkafka + +#endif // CPPKAFKA_TOPIC_CONFIGURATION_H diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt new file mode 100644 index 0000000..4271ba8 --- /dev/null +++ b/src/CMakeLists.txt @@ -0,0 +1,7 @@ +set(SOURCES + configuration.cpp + topic_configuration.cpp + exceptions.cpp +) + +add_library(cppkafka ${SOURCES}) \ No newline at end of file diff --git a/src/configuration.cpp b/src/configuration.cpp new file mode 100644 index 0000000..ee91332 --- /dev/null +++ b/src/configuration.cpp @@ -0,0 +1,42 @@ +#include "configuration.h" +#include +#include "exceptions.h" + +using std::string; + +namespace cppkafka { + +Configuration::Configuration() +: handle_(make_handle(rd_kafka_conf_new())) { + +} + +Configuration::Configuration(rd_kafka_conf_t* ptr) +: handle_(make_handle(ptr)) { + +} + +Configuration::Configuration(const Configuration& rhs) +: handle_(make_handle(rd_kafka_conf_dup(rhs.handle_.get()))) { + +} + +Configuration& Configuration::operator=(const Configuration& rhs) { + handle_.reset(rd_kafka_conf_dup(rhs.handle_.get())); +} + +void Configuration::set(const string& name, const string& value) { + char error_buffer[512]; + rd_kafka_conf_res_t result; + result = rd_kafka_conf_set(handle_.get(), name.data(), value.data(), error_buffer, + sizeof(error_buffer)); + if (result != RD_KAFKA_CONF_OK) { + throw KafkaConfigException(name, error_buffer); + } +} + +Configuration::HandlePtr Configuration::make_handle(rd_kafka_conf_t* ptr) { + return HandlePtr(ptr, &rd_kafka_conf_destroy); +} + +} // cppkafka diff --git a/src/exceptions.cpp b/src/exceptions.cpp new file mode 100644 index 0000000..bed98f0 --- /dev/null +++ b/src/exceptions.cpp @@ -0,0 +1,16 @@ +#include "exceptions.h" + +using std::string; + +namespace cppkafka { + +KafkaConfigException::KafkaConfigException(const string& config_name, + const string& error) +: message_("Failed to set " + config_name + ": " + error) { +} + +const char* KafkaConfigException::what() const noexcept { + return message_.data(); +} + +} // cppkafka diff --git a/src/topic_configuration.cpp b/src/topic_configuration.cpp new file mode 100644 index 0000000..3d419a3 --- /dev/null +++ b/src/topic_configuration.cpp @@ -0,0 +1,42 @@ +#include "topic_configuration.h" +#include +#include "exceptions.h" + +using std::string; + +namespace cppkafka { + +TopicConfiguration::TopicConfiguration() +: handle_(make_handle(rd_kafka_topic_conf_new())) { + +} + +TopicConfiguration::TopicConfiguration(rd_kafka_topic_conf_t* ptr) +: handle_(make_handle(ptr)) { + +} + +TopicConfiguration::TopicConfiguration(const TopicConfiguration& rhs) +: handle_(make_handle(rd_kafka_topic_conf_dup(rhs.handle_.get()))) { + +} + +TopicConfiguration& TopicConfiguration::operator=(const TopicConfiguration& rhs) { + handle_.reset(rd_kafka_topic_conf_dup(rhs.handle_.get())); +} + +void TopicConfiguration::set(const string& name, const string& value) { + char error_buffer[512]; + rd_kafka_conf_res_t result; + result = rd_kafka_topic_conf_set(handle_.get(), name.data(), value.data(), error_buffer, + sizeof(error_buffer)); + if (result != RD_KAFKA_CONF_OK) { + throw KafkaConfigException(name, error_buffer); + } +} + +TopicConfiguration::HandlePtr TopicConfiguration::make_handle(rd_kafka_topic_conf_t* ptr) { + return HandlePtr(ptr, &rd_kafka_topic_conf_destroy); +} + +} // cppkafka