Add initial producer code

This commit is contained in:
Matias Fontanini
2016-05-16 20:46:08 -07:00
parent f86c9c1f57
commit 1cc8f027c9
18 changed files with 359 additions and 13 deletions

View File

@@ -1,7 +1,7 @@
cmake_minimum_required(VERSION 2.8.1)
project(cppkafka)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -Wall")
include_directories(${CMAKE_SOURCE_DIR}/include/cppkafka)

28
include/cppkafka/buffer.h Normal file
View File

@@ -0,0 +1,28 @@
#ifndef CPPKAFKA_BUFFER_H
#define CPPKAFKA_BUFFER_H
namespace cppkafka {
class Buffer {
public:
using DataType = unsigned char;
Buffer();
Buffer(const std::string& data);
Buffer(const DataType* data, size_t size);
Buffer(const Buffer&) = delete;
Buffer(Buffer&&) = delete;
Buffer& operator=(const Buffer&) = delete;
Buffer& operator=(Buffer&&) = delete;
const DataType* get_data() const;
size_t get_size() const;
private:
const unsigned char* data_;
size_t size_;
};
} // cppkafka
#endif // CPPKAFKA_BUFFER_H

View File

@@ -16,6 +16,8 @@ public:
Configuration& operator=(Configuration&& rhs) noexcept = default;
void set(const std::string& name, const std::string& value);
rd_kafka_conf_t* get_handle() const;
private:
using HandlePtr = std::unique_ptr<rd_kafka_conf_t, decltype(&rd_kafka_conf_destroy)>;

View File

@@ -3,22 +3,33 @@
#include <stdexcept>
#include <string>
#include <librdkafka/rdkafka.h>
namespace cppkafka {
class KafkaException : public std::exception {
};
class KafkaConfigException : public KafkaException {
class Exception : public std::exception {
public:
KafkaConfigException(const std::string& config_name, const std::string& error);
Exception(std::string message);
const char* what() const noexcept;
private:
std::string message_;
};
class ConfigException : public Exception {
public:
ConfigException(const std::string& config_name, const std::string& error);
};
class HandleException : public Exception {
public:
HandleException(rd_kafka_resp_err_t error_code);
rd_kafka_resp_err_t get_error_code() const;
private:
rd_kafka_resp_err_t error_code_;
};
} // cppkafka
#endif // CPPKAFKA_EXCEPTIONS_H

View File

@@ -0,0 +1,33 @@
#ifndef CPPKAFKA_KAFKA_HANDLE_BASE_H
#define CPPKAFKA_KAFKA_HANDLE_BASE_H
#include <string>
#include <memory>
#include <librdkafka/rdkafka.h>
namespace cppkafka {
class Topic;
class TopicConfiguration;
class KafkaHandleBase {
public:
virtual ~KafkaHandleBase() = default;
rd_kafka_t* get_handle();
Topic get_topic(const std::string& name);
Topic get_topic(const std::string& name, const TopicConfiguration& config);
protected:
KafkaHandleBase();
KafkaHandleBase(rd_kafka_t* handle);
void set_handle(rd_kafka_t* handle);
private:
using HandlePtr = std::unique_ptr<rd_kafka_t, decltype(&rd_kafka_destroy)>;
HandlePtr handle_;
};
} // cppkafka
#endif // CPPKAFKA_KAFKA_HANDLE_BASE_H

View File

@@ -0,0 +1,18 @@
#ifndef CPPKAFKA_PARTITION_H
#define CPPKAFKA_PARTITION_H
namespace cppkafka {
class Partition {
public:
Partition();
Partition(int partition);
int get_partition() const;
private:
int partition_;
};
} // cppkafka
#endif // CPPKAFKA_PARTITION_H

View File

@@ -0,0 +1,30 @@
#ifndef CPPKAFKA_PRODUCER_H
#define CPPKAFKA_PRODUCER_H
#include <memory>
#include "kafka_handle_base.h"
#include "configuration.h"
namespace cppkafka {
class Topic;
class Buffer;
class Partition;
class Producer : public KafkaHandleBase {
public:
Producer(Configuration config);
void produce(const Topic& topic, const Partition& partition, const Buffer& payload);
void produce(const Topic& topic, const Partition& partition, const Buffer& payload,
const Buffer& key);
void produce(const Topic& topic, const Partition& partition, const Buffer& payload,
const Buffer& key, void* user_data);
private:
Configuration config_;
int message_payload_policy_;
};
} // cppkafka
#endif // CPPKAFKA_PRODUCER_H

26
include/cppkafka/topic.h Normal file
View File

@@ -0,0 +1,26 @@
#ifndef CPPKAFKA_TOPIC_H
#define CPPKAFKA_TOPIC_H
#include <string>
#include <memory>
#include <boost/optional.hpp>
#include <librdkafka/rdkafka.h>
namespace cppkafka {
class Topic {
public:
Topic(rd_kafka_topic_t* handle);
std::string get_name() const;
rd_kafka_topic_t* get_handle() const;
private:
using HandlePtr = std::unique_ptr<rd_kafka_topic_t, decltype(&rd_kafka_topic_destroy)>;
HandlePtr handle_;
};
} // cppkafka
#endif // CPPKAFKA_TOPIC_H

View File

@@ -16,6 +16,8 @@ public:
TopicConfiguration& operator=(TopicConfiguration&& rhs) noexcept = default;
void set(const std::string& name, const std::string& value);
rd_kafka_topic_conf_t* get_handle() const;
private:
using HandlePtr = std::unique_ptr<rd_kafka_topic_conf_t,
decltype(&rd_kafka_topic_conf_destroy)>;

View File

@@ -2,6 +2,11 @@ set(SOURCES
configuration.cpp
topic_configuration.cpp
exceptions.cpp
topic.cpp
partition.cpp
kafka_handle_base.cpp
producer.cpp
)
add_library(cppkafka ${SOURCES})

30
src/buffer.cpp Normal file
View File

@@ -0,0 +1,30 @@
#include "buffer.h"
using std::string;
namespace cppkafka {
Buffer::Buffer()
: data_(nullptr), size_(0) {
}
Buffer::Buffer(const string& data)
: data_(data.data()), size_(data.size()) {
}
Buffer::Buffer(const DataType* data, size_t size)
: data_(data), size_(size) {
}
const Buffer::DataType* Buffer::get_data() const {
return data_;
}
size_t Buffer::get_size() const {
return size_;
}
} // cppkafka

View File

@@ -23,6 +23,7 @@ Configuration::Configuration(const Configuration& rhs)
Configuration& Configuration::operator=(const Configuration& rhs) {
handle_.reset(rd_kafka_conf_dup(rhs.handle_.get()));
return *this;
}
void Configuration::set(const string& name, const string& value) {
@@ -31,10 +32,14 @@ void Configuration::set(const string& name, const string& value) {
result = rd_kafka_conf_set(handle_.get(), name.data(), value.data(), error_buffer,
sizeof(error_buffer));
if (result != RD_KAFKA_CONF_OK) {
throw KafkaConfigException(name, error_buffer);
throw ConfigException(name, error_buffer);
}
}
rd_kafka_conf_t* Configuration::get_handle() const {
return handle_.get();
}
Configuration::HandlePtr Configuration::make_handle(rd_kafka_conf_t* ptr) {
return HandlePtr(ptr, &rd_kafka_conf_destroy);
}

View File

@@ -4,13 +4,33 @@ using std::string;
namespace cppkafka {
KafkaConfigException::KafkaConfigException(const string& config_name,
const string& error)
: message_("Failed to set " + config_name + ": " + error) {
// Exception
Exception::Exception(string message)
: message_(move(message)) {
}
const char* KafkaConfigException::what() const noexcept {
const char* Exception::what() const noexcept {
return message_.data();
}
// ConfigException
ConfigException::ConfigException(const string& config_name, const string& error)
: Exception("Failed to set " + config_name + ": " + error) {
}
// HandleException
HandleException::HandleException(rd_kafka_resp_err_t error_code)
: Exception(rd_kafka_err2str(error_code)), error_code_(error_code) {
}
rd_kafka_resp_err_t HandleException::get_error_code() const {
return error_code_;
}
} // cppkafka

45
src/kafka_handle_base.cpp Normal file
View File

@@ -0,0 +1,45 @@
#include "kafka_handle_base.h"
#include "exceptions.h"
#include "topic_configuration.h"
#include "topic.h"
using std::string;
namespace cppkafka {
KafkaHandleBase::KafkaHandleBase()
: handle_(nullptr, nullptr) {
}
KafkaHandleBase::KafkaHandleBase(rd_kafka_t* handle)
: handle_(handle, &rd_kafka_destroy) {
}
void KafkaHandleBase::set_handle(rd_kafka_t* handle) {
handle_ = HandlePtr(handle, &rd_kafka_destroy);
}
rd_kafka_t* KafkaHandleBase::get_handle() {
return handle_.get();
}
Topic KafkaHandleBase::get_topic(const string& name) {
rd_kafka_topic_t* topic = rd_kafka_topic_new(handle_.get(), name.data(), nullptr);
if (!topic) {
throw Exception("Failed to create topic handle");
}
return Topic(topic);
}
Topic KafkaHandleBase::get_topic(const string& name, const TopicConfiguration& config) {
rd_kafka_topic_t* topic = rd_kafka_topic_new(handle_.get(), name.data(),
config.get_handle());
if (!topic) {
throw Exception("Failed to create topic handle");
}
return Topic(topic);
}
} // cppkafka

20
src/partition.cpp Normal file
View File

@@ -0,0 +1,20 @@
#include "partition.h"
#include <librdkafka/rdkafka.h>
namespace cppkafka {
Partition::Partition()
: partition_(RD_KAFKA_PARTITION_UA) {
}
Partition::Partition(int partition)
: partition_(partition) {
}
int Partition::get_partition() const {
return partition_;
}
} // cppkafka

45
src/producer.cpp Normal file
View File

@@ -0,0 +1,45 @@
#include <errno.h>
#include "producer.h"
#include "exceptions.h"
#include "buffer.h"
#include "topic.h"
#include "partition.h"
using std::move;
using std::string;
namespace cppkafka {
Producer::Producer(Configuration config)
: config_(move(config)), message_payload_policy_(RD_KAFKA_MSG_F_COPY) {
char error_buffer[512];
rd_kafka_t* ptr = rd_kafka_new(RD_KAFKA_PRODUCER, config_.get_handle(),
error_buffer, sizeof(error_buffer));
if (!ptr) {
throw Exception("Failed to create producer handle: " + string(error_buffer));
}
set_handle(ptr);
}
void Producer::produce(const Topic& topic, const Partition& partition, const Buffer& payload) {
produce(topic, partition, payload, Buffer{} /*key*/, nullptr /*user_data*/);
}
void Producer::produce(const Topic& topic, const Partition& partition, const Buffer& payload,
const Buffer& key) {
produce(topic, partition, payload, key, nullptr /*user_data*/);
}
void Producer::produce(const Topic& topic, const Partition& partition, const Buffer& payload,
const Buffer& key, void* user_data) {
void* payload_ptr = (void*)payload.get_data();
void* key_ptr = (void*)key.get_data();
int result = rd_kafka_produce(topic.get_handle(), partition.get_partition(),
message_payload_policy_, payload_ptr, payload.get_size(),
key_ptr, key.get_size(), user_data);
if (result == -1) {
throw HandleException(rd_kafka_errno2err(errno));
}
}
} // cppkafka

21
src/topic.cpp Normal file
View File

@@ -0,0 +1,21 @@
#include "topic.h"
using std::move;
using std::string;
namespace cppkafka {
Topic::Topic(rd_kafka_topic_t* handle)
: handle_(handle, &rd_kafka_topic_destroy) {
}
string Topic::get_name() const {
return rd_kafka_topic_name(handle_.get());
}
rd_kafka_topic_t* Topic::get_handle() const {
return handle_.get();
}
} // cppkafka

View File

@@ -23,6 +23,7 @@ TopicConfiguration::TopicConfiguration(const TopicConfiguration& rhs)
TopicConfiguration& TopicConfiguration::operator=(const TopicConfiguration& rhs) {
handle_.reset(rd_kafka_topic_conf_dup(rhs.handle_.get()));
return *this;
}
void TopicConfiguration::set(const string& name, const string& value) {
@@ -31,10 +32,14 @@ void TopicConfiguration::set(const string& name, const string& value) {
result = rd_kafka_topic_conf_set(handle_.get(), name.data(), value.data(), error_buffer,
sizeof(error_buffer));
if (result != RD_KAFKA_CONF_OK) {
throw KafkaConfigException(name, error_buffer);
throw ConfigException(name, error_buffer);
}
}
rd_kafka_topic_conf_t* TopicConfiguration::get_handle() const {
return handle_.get();
}
TopicConfiguration::HandlePtr TopicConfiguration::make_handle(rd_kafka_topic_conf_t* ptr) {
return HandlePtr(ptr, &rd_kafka_topic_conf_destroy);
}