diff --git a/include/cppkafka/consumer.h b/include/cppkafka/consumer.h new file mode 100644 index 0000000..b8ebd9a --- /dev/null +++ b/include/cppkafka/consumer.h @@ -0,0 +1,28 @@ +#ifndef CPP_KAFKA_CONSUMER_H +#define CPP_KAFKA_CONSUMER_H + +#include +#include +#include "kafka_handle_base.h" +#include "topic_partition_list.h" + +namespace cppkafka { + +class Configuration; +class TopicConfiguration; + +class Consumer : public KafkaHandleBase { +public: + Consumer(const Configuration& config); + + void subscribe(const std::vector& topics); + void unsubscribe(); + + void assign(const TopicPartitionList& topic_partitions); +private: + void check_error(rd_kafka_resp_err_t error); +}; + +} // cppkafka + +#endif // CPP_KAFKA_CONSUMER_H diff --git a/include/cppkafka/kafka_handle_base.h b/include/cppkafka/kafka_handle_base.h index 76b8606..76fad22 100644 --- a/include/cppkafka/kafka_handle_base.h +++ b/include/cppkafka/kafka_handle_base.h @@ -13,6 +13,10 @@ class TopicConfiguration; class KafkaHandleBase { public: virtual ~KafkaHandleBase() = default; + KafkaHandleBase(const KafkaHandleBase&) = delete; + KafkaHandleBase(KafkaHandleBase&&) = delete; + KafkaHandleBase& operator=(const KafkaHandleBase&) = delete; + KafkaHandleBase& operator=(KafkaHandleBase&&) = delete; rd_kafka_t* get_handle(); Topic get_topic(const std::string& name); diff --git a/include/cppkafka/message.h b/include/cppkafka/message.h index 14a53f7..d004fde 100644 --- a/include/cppkafka/message.h +++ b/include/cppkafka/message.h @@ -21,6 +21,7 @@ public: const Buffer& get_key() const; int64_t get_offset() const; void* private_data(); + explicit operator bool() const; rd_kafka_message_t* get_handle() const; private: diff --git a/include/cppkafka/topic_partition.h b/include/cppkafka/topic_partition.h new file mode 100644 index 0000000..4946980 --- /dev/null +++ b/include/cppkafka/topic_partition.h @@ -0,0 +1,26 @@ +#ifndef CPPKAFKA_TOPIC_PARTITION_H +#define CPPKAFKA_TOPIC_PARTITION_H + +#include +#include + +namespace cppkafka { + +class TopicPartition { +public: + TopicPartition(const std::string& topic); + TopicPartition(const std::string& topic, int partition); + TopicPartition(const std::string& topic, int partition, int64_t offset); + + const std::string& get_topic() const; + int get_partition() const; + int64_t get_offset() const; +private: + std::string topic_; + int partition_; + int64_t offset_; +}; + +} // cppkafka + +#endif // CPPKAFKA_TOPIC_PARTITION_H diff --git a/include/cppkafka/topic_partition_list.h b/include/cppkafka/topic_partition_list.h new file mode 100644 index 0000000..8ff99d3 --- /dev/null +++ b/include/cppkafka/topic_partition_list.h @@ -0,0 +1,48 @@ +#ifndef CPPKAFKA_TOPIC_PARTITION_LIST_H +#define CPPKAFKA_TOPIC_PARTITION_LIST_H + +#include +#include +#include +#include "topic_partition.h" + +namespace cppkafka { + +class TopicPartition; + +class TopicPartitionList { +public: + TopicPartitionList(); + TopicPartitionList(size_t size); + template + TopicPartitionList(ForwardIterator start, const ForwardIterator& end) + : TopicPartitionList(std::distance(start, end)) { + while (start != end) { + add(*start); + ++start; + } + } + TopicPartitionList(const TopicPartitionList& rhs); + TopicPartitionList(TopicPartitionList&&) = default; + TopicPartitionList& operator=(const TopicPartitionList& rhs); + TopicPartitionList& operator=(TopicPartitionList&&) = default; + + void add(const TopicPartition& topic_partition); + size_t size() const; + bool empty() const; + + rd_kafka_topic_partition_list_t* get_handle() const; +private: + static const size_t DEFAULT_CONTAINER_SIZE; + + using HandlePtr = std::unique_ptr; + + static HandlePtr make_handle(rd_kafka_topic_partition_list_t* ptr); + + HandlePtr handle_; +}; + +} // cppkafka + +#endif // CPPKAFKA_TOPIC_PARTITION_LIST_H diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index fc7ada5..2cd8c2f 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -6,9 +6,12 @@ set(SOURCES partition.cpp buffer.cpp message.cpp + topic_partition.cpp + topic_partition_list.cpp kafka_handle_base.cpp producer.cpp + consumer.cpp ) add_library(cppkafka ${SOURCES}) \ No newline at end of file diff --git a/src/consumer.cpp b/src/consumer.cpp new file mode 100644 index 0000000..046bc1a --- /dev/null +++ b/src/consumer.cpp @@ -0,0 +1,45 @@ +#include "consumer.h" +#include "exceptions.h" +#include "configuration.h" +#include "topic_partition_list.h" + +using std::vector; +using std::string; + +namespace cppkafka { + +Consumer::Consumer(const Configuration& config) { + char error_buffer[512]; + rd_kafka_t* ptr = rd_kafka_new(RD_KAFKA_CONSUMER, config.get_handle(), + error_buffer, sizeof(error_buffer)); + if (!ptr) { + throw Exception("Failed to create consumer handle: " + string(error_buffer)); + } + set_handle(ptr); +} + +void Consumer::subscribe(const vector& topics) { + TopicPartitionList list(topics.begin(), topics.end()); + rd_kafka_resp_err_t error = rd_kafka_subscribe(get_handle(), list.get_handle()); + check_error(error); +} + +void Consumer::unsubscribe() { + rd_kafka_resp_err_t error = rd_kafka_unsubscribe(get_handle()); + check_error(error); +} + +void Consumer::assign(const TopicPartitionList& topic_partitions) { + // If the list is empty, then we need to use a null pointer + auto handle = topic_partitions.empty() ? nullptr : topic_partitions.get_handle(); + rd_kafka_resp_err_t error = rd_kafka_assign(get_handle(), handle); + check_error(error); +} + +void Consumer::check_error(rd_kafka_resp_err_t error) { + if (error != RD_KAFKA_RESP_ERR_NO_ERROR) { + throw HandleException(error); + } +} + +} // cppkafka diff --git a/src/message.cpp b/src/message.cpp index ce0ac61..f7457d2 100644 --- a/src/message.cpp +++ b/src/message.cpp @@ -43,6 +43,10 @@ void* Message::private_data() { return handle_->_private; } +Message::operator bool() const { + return handle_ != nullptr; +} + rd_kafka_message_t* Message::get_handle() const { return handle_.get(); } diff --git a/src/topic_partition.cpp b/src/topic_partition.cpp new file mode 100644 index 0000000..34bce0f --- /dev/null +++ b/src/topic_partition.cpp @@ -0,0 +1,35 @@ +#include +#include "topic_partition.h" + +using std::string; + +namespace cppkafka { + +TopicPartition::TopicPartition(const string& topic) +: TopicPartition(topic, RD_KAFKA_PARTITION_UA) { + +} + +TopicPartition::TopicPartition(const string& topic, int partition) +: TopicPartition(topic, partition, RD_KAFKA_OFFSET_INVALID) { + +} + +TopicPartition::TopicPartition(const string& topic, int partition, int64_t offset) +: topic_(topic), partition_(partition), offset_(offset) { + +} + +const string& TopicPartition::get_topic() const { + return topic_; +} + +int TopicPartition::get_partition() const { + return partition_; +} + +int64_t TopicPartition::get_offset() const { + return offset_; +} + +} // cppkafka diff --git a/src/topic_partition_list.cpp b/src/topic_partition_list.cpp new file mode 100644 index 0000000..85445a8 --- /dev/null +++ b/src/topic_partition_list.cpp @@ -0,0 +1,53 @@ +#include "topic_partition_list.h" +#include "topic_partition.h" + +namespace cppkafka { + +const size_t TopicPartitionList::DEFAULT_CONTAINER_SIZE = 5; + +TopicPartitionList::TopicPartitionList() +: TopicPartitionList(DEFAULT_CONTAINER_SIZE) { + +} + +TopicPartitionList::TopicPartitionList(size_t size) +: handle_(make_handle(rd_kafka_topic_partition_list_new(size))) { + +} + +TopicPartitionList::TopicPartitionList(const TopicPartitionList& rhs) +: handle_(make_handle(rd_kafka_topic_partition_list_copy(rhs.get_handle()))) { + +} + +TopicPartitionList& TopicPartitionList::operator=(const TopicPartitionList& rhs) { + handle_.reset(rd_kafka_topic_partition_list_copy(rhs.get_handle())); + return *this; +} + +void TopicPartitionList::add(const TopicPartition& topic_partition) { + rd_kafka_topic_partition_t* element = nullptr; + element = rd_kafka_topic_partition_list_add(handle_.get(), + topic_partition.get_topic().data(), + topic_partition.get_partition()); + element->offset = topic_partition.get_offset(); +} + +size_t TopicPartitionList::size() const { + return handle_->cnt; +} + +bool TopicPartitionList::empty() const { + return size() == 0; +} + +rd_kafka_topic_partition_list_t* TopicPartitionList::get_handle() const { + return handle_.get(); +} + +TopicPartitionList::HandlePtr +TopicPartitionList::make_handle(rd_kafka_topic_partition_list_t* ptr) { + return HandlePtr(ptr, &rd_kafka_topic_partition_list_destroy); +} + +} // cppkafka