diff --git a/CMakeLists.txt b/CMakeLists.txt index 8bf4fa9..bd9d398 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,7 +1,7 @@ cmake_minimum_required(VERSION 2.8.1) project(cppkafka) -set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11") +set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -Wall") include_directories(${CMAKE_SOURCE_DIR}/include/cppkafka) diff --git a/include/cppkafka/buffer.h b/include/cppkafka/buffer.h new file mode 100644 index 0000000..a84b337 --- /dev/null +++ b/include/cppkafka/buffer.h @@ -0,0 +1,28 @@ +#ifndef CPPKAFKA_BUFFER_H +#define CPPKAFKA_BUFFER_H + +namespace cppkafka { + +class Buffer { +public: + using DataType = unsigned char; + + Buffer(); + Buffer(const std::string& data); + Buffer(const DataType* data, size_t size); + + Buffer(const Buffer&) = delete; + Buffer(Buffer&&) = delete; + Buffer& operator=(const Buffer&) = delete; + Buffer& operator=(Buffer&&) = delete; + + const DataType* get_data() const; + size_t get_size() const; +private: + const unsigned char* data_; + size_t size_; +}; + +} // cppkafka + +#endif // CPPKAFKA_BUFFER_H diff --git a/include/cppkafka/configuration.h b/include/cppkafka/configuration.h index 74dc7f2..28290cd 100644 --- a/include/cppkafka/configuration.h +++ b/include/cppkafka/configuration.h @@ -16,6 +16,8 @@ public: Configuration& operator=(Configuration&& rhs) noexcept = default; void set(const std::string& name, const std::string& value); + + rd_kafka_conf_t* get_handle() const; private: using HandlePtr = std::unique_ptr; diff --git a/include/cppkafka/exceptions.h b/include/cppkafka/exceptions.h index f6f3351..b3165e9 100644 --- a/include/cppkafka/exceptions.h +++ b/include/cppkafka/exceptions.h @@ -3,22 +3,33 @@ #include #include +#include namespace cppkafka { -class KafkaException : public std::exception { - -}; - -class KafkaConfigException : public KafkaException { +class Exception : public std::exception { public: - KafkaConfigException(const std::string& config_name, const std::string& error); + Exception(std::string message); const char* what() const noexcept; private: std::string message_; }; +class ConfigException : public Exception { +public: + ConfigException(const std::string& config_name, const std::string& error); +}; + +class HandleException : public Exception { +public: + HandleException(rd_kafka_resp_err_t error_code); + + rd_kafka_resp_err_t get_error_code() const; +private: + rd_kafka_resp_err_t error_code_; +}; + } // cppkafka #endif // CPPKAFKA_EXCEPTIONS_H diff --git a/include/cppkafka/kafka_handle_base.h b/include/cppkafka/kafka_handle_base.h new file mode 100644 index 0000000..32bbeec --- /dev/null +++ b/include/cppkafka/kafka_handle_base.h @@ -0,0 +1,33 @@ +#ifndef CPPKAFKA_KAFKA_HANDLE_BASE_H +#define CPPKAFKA_KAFKA_HANDLE_BASE_H + +#include +#include +#include + +namespace cppkafka { + +class Topic; +class TopicConfiguration; + +class KafkaHandleBase { +public: + virtual ~KafkaHandleBase() = default; + + rd_kafka_t* get_handle(); + Topic get_topic(const std::string& name); + Topic get_topic(const std::string& name, const TopicConfiguration& config); +protected: + KafkaHandleBase(); + KafkaHandleBase(rd_kafka_t* handle); + + void set_handle(rd_kafka_t* handle); +private: + using HandlePtr = std::unique_ptr; + + HandlePtr handle_; +}; + +} // cppkafka + +#endif // CPPKAFKA_KAFKA_HANDLE_BASE_H diff --git a/include/cppkafka/partition.h b/include/cppkafka/partition.h new file mode 100644 index 0000000..1533de6 --- /dev/null +++ b/include/cppkafka/partition.h @@ -0,0 +1,18 @@ +#ifndef CPPKAFKA_PARTITION_H +#define CPPKAFKA_PARTITION_H + +namespace cppkafka { + +class Partition { +public: + Partition(); + Partition(int partition); + + int get_partition() const; +private: + int partition_; +}; + +} // cppkafka + +#endif // CPPKAFKA_PARTITION_H diff --git a/include/cppkafka/producer.h b/include/cppkafka/producer.h new file mode 100644 index 0000000..b6abac1 --- /dev/null +++ b/include/cppkafka/producer.h @@ -0,0 +1,30 @@ +#ifndef CPPKAFKA_PRODUCER_H +#define CPPKAFKA_PRODUCER_H + +#include +#include "kafka_handle_base.h" +#include "configuration.h" + +namespace cppkafka { + +class Topic; +class Buffer; +class Partition; + +class Producer : public KafkaHandleBase { +public: + Producer(Configuration config); + + void produce(const Topic& topic, const Partition& partition, const Buffer& payload); + void produce(const Topic& topic, const Partition& partition, const Buffer& payload, + const Buffer& key); + void produce(const Topic& topic, const Partition& partition, const Buffer& payload, + const Buffer& key, void* user_data); +private: + Configuration config_; + int message_payload_policy_; +}; + +} // cppkafka + +#endif // CPPKAFKA_PRODUCER_H diff --git a/include/cppkafka/topic.h b/include/cppkafka/topic.h new file mode 100644 index 0000000..857e615 --- /dev/null +++ b/include/cppkafka/topic.h @@ -0,0 +1,26 @@ +#ifndef CPPKAFKA_TOPIC_H +#define CPPKAFKA_TOPIC_H + +#include +#include +#include +#include + +namespace cppkafka { + +class Topic { +public: + Topic(rd_kafka_topic_t* handle); + + std::string get_name() const; + + rd_kafka_topic_t* get_handle() const; +private: + using HandlePtr = std::unique_ptr; + + HandlePtr handle_; +}; + +} // cppkafka + +#endif // CPPKAFKA_TOPIC_H diff --git a/include/cppkafka/topic_configuration.h b/include/cppkafka/topic_configuration.h index 8b340b0..2e9d705 100644 --- a/include/cppkafka/topic_configuration.h +++ b/include/cppkafka/topic_configuration.h @@ -16,6 +16,8 @@ public: TopicConfiguration& operator=(TopicConfiguration&& rhs) noexcept = default; void set(const std::string& name, const std::string& value); + + rd_kafka_topic_conf_t* get_handle() const; private: using HandlePtr = std::unique_ptr; diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 4271ba8..0962acf 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -2,6 +2,11 @@ set(SOURCES configuration.cpp topic_configuration.cpp exceptions.cpp + topic.cpp + partition.cpp + + kafka_handle_base.cpp + producer.cpp ) add_library(cppkafka ${SOURCES}) \ No newline at end of file diff --git a/src/buffer.cpp b/src/buffer.cpp new file mode 100644 index 0000000..5c289d2 --- /dev/null +++ b/src/buffer.cpp @@ -0,0 +1,30 @@ +#include "buffer.h" + +using std::string; + +namespace cppkafka { + +Buffer::Buffer() +: data_(nullptr), size_(0) { + +} + +Buffer::Buffer(const string& data) +: data_(data.data()), size_(data.size()) { + +} + +Buffer::Buffer(const DataType* data, size_t size) +: data_(data), size_(size) { + +} + +const Buffer::DataType* Buffer::get_data() const { + return data_; +} + +size_t Buffer::get_size() const { + return size_; +} + +} // cppkafka diff --git a/src/configuration.cpp b/src/configuration.cpp index ee91332..ae91852 100644 --- a/src/configuration.cpp +++ b/src/configuration.cpp @@ -23,6 +23,7 @@ Configuration::Configuration(const Configuration& rhs) Configuration& Configuration::operator=(const Configuration& rhs) { handle_.reset(rd_kafka_conf_dup(rhs.handle_.get())); + return *this; } void Configuration::set(const string& name, const string& value) { @@ -31,10 +32,14 @@ void Configuration::set(const string& name, const string& value) { result = rd_kafka_conf_set(handle_.get(), name.data(), value.data(), error_buffer, sizeof(error_buffer)); if (result != RD_KAFKA_CONF_OK) { - throw KafkaConfigException(name, error_buffer); + throw ConfigException(name, error_buffer); } } +rd_kafka_conf_t* Configuration::get_handle() const { + return handle_.get(); +} + Configuration::HandlePtr Configuration::make_handle(rd_kafka_conf_t* ptr) { return HandlePtr(ptr, &rd_kafka_conf_destroy); } diff --git a/src/exceptions.cpp b/src/exceptions.cpp index bed98f0..cfed337 100644 --- a/src/exceptions.cpp +++ b/src/exceptions.cpp @@ -4,13 +4,33 @@ using std::string; namespace cppkafka { -KafkaConfigException::KafkaConfigException(const string& config_name, - const string& error) -: message_("Failed to set " + config_name + ": " + error) { +// Exception + +Exception::Exception(string message) +: message_(move(message)) { + } -const char* KafkaConfigException::what() const noexcept { +const char* Exception::what() const noexcept { return message_.data(); } +// ConfigException + +ConfigException::ConfigException(const string& config_name, const string& error) +: Exception("Failed to set " + config_name + ": " + error) { + +} + +// HandleException + +HandleException::HandleException(rd_kafka_resp_err_t error_code) +: Exception(rd_kafka_err2str(error_code)), error_code_(error_code) { + +} + +rd_kafka_resp_err_t HandleException::get_error_code() const { + return error_code_; +} + } // cppkafka diff --git a/src/kafka_handle_base.cpp b/src/kafka_handle_base.cpp new file mode 100644 index 0000000..564bf60 --- /dev/null +++ b/src/kafka_handle_base.cpp @@ -0,0 +1,45 @@ +#include "kafka_handle_base.h" +#include "exceptions.h" +#include "topic_configuration.h" +#include "topic.h" + +using std::string; + +namespace cppkafka { + +KafkaHandleBase::KafkaHandleBase() +: handle_(nullptr, nullptr) { + +} + +KafkaHandleBase::KafkaHandleBase(rd_kafka_t* handle) +: handle_(handle, &rd_kafka_destroy) { + +} + +void KafkaHandleBase::set_handle(rd_kafka_t* handle) { + handle_ = HandlePtr(handle, &rd_kafka_destroy); +} + +rd_kafka_t* KafkaHandleBase::get_handle() { + return handle_.get(); +} + +Topic KafkaHandleBase::get_topic(const string& name) { + rd_kafka_topic_t* topic = rd_kafka_topic_new(handle_.get(), name.data(), nullptr); + if (!topic) { + throw Exception("Failed to create topic handle"); + } + return Topic(topic); +} + +Topic KafkaHandleBase::get_topic(const string& name, const TopicConfiguration& config) { + rd_kafka_topic_t* topic = rd_kafka_topic_new(handle_.get(), name.data(), + config.get_handle()); + if (!topic) { + throw Exception("Failed to create topic handle"); + } + return Topic(topic); +} + +} // cppkafka diff --git a/src/partition.cpp b/src/partition.cpp new file mode 100644 index 0000000..714e77e --- /dev/null +++ b/src/partition.cpp @@ -0,0 +1,20 @@ +#include "partition.h" +#include + +namespace cppkafka { + +Partition::Partition() +: partition_(RD_KAFKA_PARTITION_UA) { + +} + +Partition::Partition(int partition) +: partition_(partition) { + +} + +int Partition::get_partition() const { + return partition_; +} + +} // cppkafka diff --git a/src/producer.cpp b/src/producer.cpp new file mode 100644 index 0000000..07d44fa --- /dev/null +++ b/src/producer.cpp @@ -0,0 +1,45 @@ +#include +#include "producer.h" +#include "exceptions.h" +#include "buffer.h" +#include "topic.h" +#include "partition.h" + +using std::move; +using std::string; + +namespace cppkafka { + +Producer::Producer(Configuration config) +: config_(move(config)), message_payload_policy_(RD_KAFKA_MSG_F_COPY) { + char error_buffer[512]; + rd_kafka_t* ptr = rd_kafka_new(RD_KAFKA_PRODUCER, config_.get_handle(), + error_buffer, sizeof(error_buffer)); + if (!ptr) { + throw Exception("Failed to create producer handle: " + string(error_buffer)); + } + set_handle(ptr); +} + +void Producer::produce(const Topic& topic, const Partition& partition, const Buffer& payload) { + produce(topic, partition, payload, Buffer{} /*key*/, nullptr /*user_data*/); +} + +void Producer::produce(const Topic& topic, const Partition& partition, const Buffer& payload, + const Buffer& key) { + produce(topic, partition, payload, key, nullptr /*user_data*/); +} + +void Producer::produce(const Topic& topic, const Partition& partition, const Buffer& payload, + const Buffer& key, void* user_data) { + void* payload_ptr = (void*)payload.get_data(); + void* key_ptr = (void*)key.get_data(); + int result = rd_kafka_produce(topic.get_handle(), partition.get_partition(), + message_payload_policy_, payload_ptr, payload.get_size(), + key_ptr, key.get_size(), user_data); + if (result == -1) { + throw HandleException(rd_kafka_errno2err(errno)); + } +} + +} // cppkafka diff --git a/src/topic.cpp b/src/topic.cpp new file mode 100644 index 0000000..63721e4 --- /dev/null +++ b/src/topic.cpp @@ -0,0 +1,21 @@ +#include "topic.h" + +using std::move; +using std::string; + +namespace cppkafka { + +Topic::Topic(rd_kafka_topic_t* handle) +: handle_(handle, &rd_kafka_topic_destroy) { + +} + +string Topic::get_name() const { + return rd_kafka_topic_name(handle_.get()); +} + +rd_kafka_topic_t* Topic::get_handle() const { + return handle_.get(); +} + +} // cppkafka diff --git a/src/topic_configuration.cpp b/src/topic_configuration.cpp index 3d419a3..eaf761f 100644 --- a/src/topic_configuration.cpp +++ b/src/topic_configuration.cpp @@ -23,6 +23,7 @@ TopicConfiguration::TopicConfiguration(const TopicConfiguration& rhs) TopicConfiguration& TopicConfiguration::operator=(const TopicConfiguration& rhs) { handle_.reset(rd_kafka_topic_conf_dup(rhs.handle_.get())); + return *this; } void TopicConfiguration::set(const string& name, const string& value) { @@ -31,10 +32,14 @@ void TopicConfiguration::set(const string& name, const string& value) { result = rd_kafka_topic_conf_set(handle_.get(), name.data(), value.data(), error_buffer, sizeof(error_buffer)); if (result != RD_KAFKA_CONF_OK) { - throw KafkaConfigException(name, error_buffer); + throw ConfigException(name, error_buffer); } } +rd_kafka_topic_conf_t* TopicConfiguration::get_handle() const { + return handle_.get(); +} + TopicConfiguration::HandlePtr TopicConfiguration::make_handle(rd_kafka_topic_conf_t* ptr) { return HandlePtr(ptr, &rd_kafka_topic_conf_destroy); }