diff --git a/include/cppkafka/buffer.h b/include/cppkafka/buffer.h index da81f54..3e869e0 100644 --- a/include/cppkafka/buffer.h +++ b/include/cppkafka/buffer.h @@ -15,7 +15,6 @@ public: Buffer(const T* data, size_t size) : data_(reinterpret_cast<const char*>(data)), size_(size) { static_assert(sizeof(T) == 1, "Buffer must point to elements of 1 byte"); - } Buffer(const Buffer&) = delete; diff --git a/include/cppkafka/consumer.h b/include/cppkafka/consumer.h index 7454e62..d0680e5 100644 --- a/include/cppkafka/consumer.h +++ b/include/cppkafka/consumer.h @@ -26,7 +26,6 @@ public: Consumer& operator=(Consumer&&) = delete; ~Consumer(); - void set_timeout(const std::chrono::milliseconds timeout); void set_assignment_callback(AssignmentCallback callback); void set_revocation_callback(RevocationCallback callback); void set_rebalance_error_callback(RebalanceErrorCallback callback); @@ -50,8 +49,6 @@ public: Message poll(); private: - static const std::chrono::milliseconds DEFAULT_TIMEOUT; - static void rebalance_proxy(rd_kafka_t *handle, rd_kafka_resp_err_t error, rd_kafka_topic_partition_list_t *partitions, void *opaque); @@ -59,7 +56,6 @@ private: void commit(const TopicPartitionList& topic_partitions, bool async); void handle_rebalance(rd_kafka_resp_err_t err, const TopicPartitionList& topic_partitions); - std::chrono::milliseconds timeout_ms_; AssignmentCallback assignment_callback_; RevocationCallback revocation_callback_; RebalanceErrorCallback rebalance_error_callback_; diff --git a/include/cppkafka/kafka_handle_base.h b/include/cppkafka/kafka_handle_base.h index ee08739..bedbc8c 100644 --- a/include/cppkafka/kafka_handle_base.h +++ b/include/cppkafka/kafka_handle_base.h @@ -3,8 +3,10 @@ #include <string> #include <memory> +#include <chrono> #include <librdkafka/rdkafka.h> #include "topic_partition_list.h" +#include "metadata.h" namespace cppkafka { @@ -22,9 +24,14 @@ public: void pause_partitions(const TopicPartitionList& topic_partitions); void resume_partitions(const TopicPartitionList& topic_partitions); + void 
set_timeout(const std::chrono::milliseconds& timeout); + rd_kafka_t* get_handle(); Topic get_topic(const std::string& name); Topic get_topic(const std::string& name, TopicConfiguration config); + Metadata get_metadata(); + Metadata get_metadata(const Topic& topic); + std::chrono::milliseconds get_timeout() const; protected: KafkaHandleBase(); KafkaHandleBase(rd_kafka_t* handle); @@ -32,11 +39,15 @@ protected: void set_handle(rd_kafka_t* handle); void check_error(rd_kafka_resp_err_t error); private: + static const std::chrono::milliseconds DEFAULT_TIMEOUT; + using HandlePtr = std::unique_ptr<rd_kafka_t, decltype(&rd_kafka_destroy)>; Topic get_topic(const std::string& name, rd_kafka_topic_conf_t* conf); + Metadata get_metadata(rd_kafka_topic_t* topic_ptr); HandlePtr handle_; + std::chrono::milliseconds timeout_ms_; }; } // cppkafka diff --git a/include/cppkafka/metadata.h b/include/cppkafka/metadata.h new file mode 100644 index 0000000..95494e0 --- /dev/null +++ b/include/cppkafka/metadata.h @@ -0,0 +1,72 @@ +#ifndef CPPKAFKA_METADATA_H +#define CPPKAFKA_METADATA_H + +#include <memory> +#include <string> +#include <vector> +#include <cstdint> +#include <unordered_set> +#include <librdkafka/rdkafka.h> + +namespace cppkafka { + +class PartitionMetadata { +public: + PartitionMetadata(const rd_kafka_metadata_partition& partition); + + uint32_t get_id() const; + rd_kafka_resp_err_t get_error() const; + int32_t get_leader() const; + const std::vector<int32_t>& get_replicas() const; + const std::vector<int32_t>& get_in_sync_replica_brokers() const; +private: + int32_t id_; + rd_kafka_resp_err_t error_; + int32_t leader_; + std::vector<int32_t> replicas_; + std::vector<int32_t> isrs_; +}; + +class TopicMetadata { +public: + TopicMetadata(const rd_kafka_metadata_topic& topic); + + const std::string& get_topic() const; + rd_kafka_resp_err_t get_error() const; + const std::vector<PartitionMetadata>& get_partitions() const; +private: + std::string topic_; + rd_kafka_resp_err_t error_; + std::vector<PartitionMetadata> partitions_; +}; + +class BrokerMetadata { +public: + BrokerMetadata(const rd_kafka_metadata_broker_t& broker); + + const std::string& get_host() 
const; + int32_t get_id() const; + uint16_t get_port() const; +private: + const std::string host_; + int32_t id_; + uint16_t port_; +}; + +class Metadata { +public: + Metadata(const rd_kafka_metadata_t* ptr); + + std::vector<BrokerMetadata> get_brokers() const; + std::vector<TopicMetadata> get_topics() const; + std::vector<TopicMetadata> get_topics(const std::unordered_set<std::string>& topics) const; + std::vector<TopicMetadata> get_topics(const std::string& prefix) const; +private: + using HandlePtr = std::unique_ptr<const rd_kafka_metadata_t, decltype(&rd_kafka_metadata_destroy)>; + + HandlePtr handle_; +}; + +} // cppkafka + +#endif // CPPKAFKA_METADATA_H diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 2cd8c2f..26916a0 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -8,6 +8,7 @@ set(SOURCES message.cpp topic_partition.cpp topic_partition_list.cpp + metadata.cpp kafka_handle_base.cpp producer.cpp diff --git a/src/consumer.cpp b/src/consumer.cpp index 9863f8a..8887855 100644 --- a/src/consumer.cpp +++ b/src/consumer.cpp @@ -10,16 +10,13 @@ using std::chrono::milliseconds; namespace cppkafka { -const milliseconds Consumer::DEFAULT_TIMEOUT{1000}; - void Consumer::rebalance_proxy(rd_kafka_t*, rd_kafka_resp_err_t error, rd_kafka_topic_partition_list_t *partitions, void *opaque) { TopicPartitionList list = TopicPartitionList::make_non_owning(partitions); static_cast<Consumer*>(opaque)->handle_rebalance(error, list); } -Consumer::Consumer(Configuration config) -: timeout_ms_(DEFAULT_TIMEOUT) { +Consumer::Consumer(Configuration config) { char error_buffer[512]; // Set ourselves as the opaque pointer rd_kafka_conf_set_opaque(config.get_handle(), this); @@ -37,10 +34,6 @@ Consumer::~Consumer() { close(); } -void Consumer::set_timeout(const milliseconds timeout) { - timeout_ms_ = timeout; -} - void Consumer::set_assignment_callback(AssignmentCallback callback) { assignment_callback_ = move(callback); } @@ -101,7 +94,7 @@ TopicPartitionList Consumer::get_offsets_committed(const TopicPartitionList& top // Copy the list, let rd_kafka change it and return it TopicPartitionList output = 
topic_partitions; rd_kafka_resp_err_t error = rd_kafka_committed(get_handle(), output.get_handle(), - timeout_ms_.count()); + get_timeout().count()); check_error(error); return output; } @@ -131,7 +124,8 @@ TopicPartitionList Consumer::get_assignment() { } Message Consumer::poll() { - rd_kafka_message_t* message = rd_kafka_consumer_poll(get_handle(), timeout_ms_.count()); + rd_kafka_message_t* message = rd_kafka_consumer_poll(get_handle(), + get_timeout().count()); return message ? Message(message) : Message(); } diff --git a/src/kafka_handle_base.cpp b/src/kafka_handle_base.cpp index 58626e5..bd0265f 100644 --- a/src/kafka_handle_base.cpp +++ b/src/kafka_handle_base.cpp @@ -4,16 +4,19 @@ #include "topic.h" using std::string; +using std::chrono::milliseconds; namespace cppkafka { +const milliseconds KafkaHandleBase::DEFAULT_TIMEOUT{1000}; + KafkaHandleBase::KafkaHandleBase() -: handle_(nullptr, nullptr) { +: handle_(nullptr, nullptr), timeout_ms_(DEFAULT_TIMEOUT) { } KafkaHandleBase::KafkaHandleBase(rd_kafka_t* handle) -: handle_(handle, &rd_kafka_destroy) { +: handle_(handle, &rd_kafka_destroy), timeout_ms_(DEFAULT_TIMEOUT) { } @@ -29,6 +32,10 @@ void KafkaHandleBase::resume_partitions(const TopicPartitionList& topic_partitio check_error(error); } +void KafkaHandleBase::set_timeout(const milliseconds& timeout) { + timeout_ms_ = timeout; +} + rd_kafka_t* KafkaHandleBase::get_handle() { return handle_.get(); } @@ -41,6 +48,18 @@ Topic KafkaHandleBase::get_topic(const string& name, TopicConfiguration topicCon return get_topic(name, topicConfig.get_handle()); } +Metadata KafkaHandleBase::get_metadata() { + return get_metadata(nullptr); +} + +Metadata KafkaHandleBase::get_metadata(const Topic& topic) { + return get_metadata(topic.get_handle()); +} + +milliseconds KafkaHandleBase::get_timeout() const { + return timeout_ms_; +} + void KafkaHandleBase::set_handle(rd_kafka_t* handle) { handle_ = HandlePtr(handle, &rd_kafka_destroy); } @@ -53,6 +72,14 @@ Topic 
KafkaHandleBase::get_topic(const string& name, rd_kafka_topic_conf_t* conf return Topic(topic); } +Metadata KafkaHandleBase::get_metadata(rd_kafka_topic_t* topic_ptr) { + const rd_kafka_metadata_t* metadata; + rd_kafka_resp_err_t error = rd_kafka_metadata(get_handle(), topic_ptr != nullptr, + topic_ptr, &metadata, timeout_ms_.count()); + check_error(error); + return Metadata(metadata); +} + void KafkaHandleBase::check_error(rd_kafka_resp_err_t error) { if (error != RD_KAFKA_RESP_ERR_NO_ERROR) { throw HandleException(error); diff --git a/src/metadata.cpp b/src/metadata.cpp new file mode 100644 index 0000000..05c834a --- /dev/null +++ b/src/metadata.cpp @@ -0,0 +1,129 @@ +#include "metadata.h" + +using std::string; +using std::vector; +using std::unordered_set; + +namespace cppkafka { + +// PartitionMetadata + +PartitionMetadata::PartitionMetadata(const rd_kafka_metadata_partition& partition) +: id_(partition.id), error_(partition.err), leader_(partition.leader) { + for (int i = 0; i < partition.replica_cnt; ++i) { + replicas_.push_back(partition.replicas[i]); + } + for (int i = 0; i < partition.isr_cnt; ++i) { + isrs_.push_back(partition.isrs[i]); + } +} + +uint32_t PartitionMetadata::get_id() const { + return id_; +} + +rd_kafka_resp_err_t PartitionMetadata::get_error() const { + return error_; +} + +int32_t PartitionMetadata::get_leader() const { + return leader_; +} + +const vector<int32_t>& PartitionMetadata::get_replicas() const { + return replicas_; +} + +const vector<int32_t>& PartitionMetadata::get_in_sync_replica_brokers() const { + return isrs_; +} + +// TopicMetadata + +TopicMetadata::TopicMetadata(const rd_kafka_metadata_topic& topic) +: topic_(topic.topic), error_(topic.err) { + for (int i = 0; i < topic.partition_cnt; ++i) { + partitions_.emplace_back(topic.partitions[i]); + } +} + +const string& TopicMetadata::get_topic() const { + return topic_; +} + +rd_kafka_resp_err_t TopicMetadata::get_error() const { + return error_; +} + +const vector<PartitionMetadata>& 
TopicMetadata::get_partitions() const { + return partitions_; +} + +// BrokerMetadata + +BrokerMetadata::BrokerMetadata(const rd_kafka_metadata_broker_t& broker) +: host_(broker.host), id_(broker.id), port_(static_cast<uint16_t>(broker.port)) { + +} + +const string& BrokerMetadata::get_host() const { + return host_; +} + +int32_t BrokerMetadata::get_id() const { + return id_; +} + +uint16_t BrokerMetadata::get_port() const { + return port_; +} + +// Metadata + +Metadata::Metadata(const rd_kafka_metadata_t* ptr) +: handle_(ptr, &rd_kafka_metadata_destroy) { + +} + +vector<BrokerMetadata> Metadata::get_brokers() const { + vector<BrokerMetadata> output; + for (int i = 0; i < handle_->broker_cnt; ++i) { + const rd_kafka_metadata_broker_t& broker = handle_->brokers[i]; + output.emplace_back(broker); + } + return output; +} + +vector<TopicMetadata> Metadata::get_topics() const { + vector<TopicMetadata> output; + for (int i = 0; i < handle_->topic_cnt; ++i) { + const rd_kafka_metadata_topic_t& topic = handle_->topics[i]; + output.emplace_back(topic); + } + return output; +} + +vector<TopicMetadata> Metadata::get_topics(const unordered_set<string>& topics) const { + vector<TopicMetadata> output; + for (int i = 0; i < handle_->topic_cnt; ++i) { + const rd_kafka_metadata_topic_t& topic = handle_->topics[i]; + if (topics.count(topic.topic)) { + output.emplace_back(topic); + } + } + return output; +} + +vector<TopicMetadata> Metadata::get_topics(const string& prefix) const { + vector<TopicMetadata> output; + for (int i = 0; i < handle_->topic_cnt; ++i) { + const rd_kafka_metadata_topic_t& topic = handle_->topics[i]; + string topic_name = topic.topic; + if (topic_name.find(prefix) == 0) { + output.emplace_back(topic); + } + } + return output; +} + +} // cppkafka diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index cded023..c7add58 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -15,3 +15,4 @@ endmacro() add_definitions("-DKAFKA_TEST_INSTANCE=\"${KAFKA_TEST_INSTANCE}\"") create_test(producer) +create_test(kafka_handle_base) diff --git a/tests/kafka_handle_base_test.cpp 
b/tests/kafka_handle_base_test.cpp new file mode 100644 index 0000000..20d4227 --- /dev/null +++ b/tests/kafka_handle_base_test.cpp @@ -0,0 +1,97 @@ +#include <set> +#include <unordered_set> +#include <gtest/gtest.h> +#include "cppkafka/producer.h" + +using std::vector; +using std::set; +using std::unordered_set; +using std::string; + +using namespace cppkafka; + +class KafkaHandleBaseTest : public testing::Test { +public: + static const string KAFKA_TOPIC; + + Configuration make_config() { + Configuration config; + config.set("metadata.broker.list", KAFKA_TEST_INSTANCE); + return config; + } + + string get_kafka_host() { + string uri = KAFKA_TEST_INSTANCE; + size_t index = uri.find(':'); + if (index == string::npos) { + return uri; + } + else { + return uri.substr(0, index); + } + } + + uint16_t get_kafka_port() { + string uri = KAFKA_TEST_INSTANCE; + size_t index = uri.find(':'); + if (index == string::npos) { + return 9092; + } + else { + return stoul(uri.substr(index + 1)); + } + } +}; + +const string KafkaHandleBaseTest::KAFKA_TOPIC = "cppkafka_test1"; + +TEST_F(KafkaHandleBaseTest, BrokersMetadata) { + Producer producer(make_config()); + Metadata metadata = producer.get_metadata(); + + vector<BrokerMetadata> brokers = metadata.get_brokers(); + ASSERT_EQ(1, brokers.size()); + const auto& broker = brokers[0]; + // TODO: resolve this + //EXPECT_EQ(get_kafka_host(), broker.get_host()); + EXPECT_EQ(get_kafka_port(), broker.get_port()); +} + +TEST_F(KafkaHandleBaseTest, TopicsMetadata) { + unordered_set<string> topic_names = { "cppkafka_test1", "cppkafka_test2" }; + size_t found_topics = 0; + + Producer producer(make_config()); + Metadata metadata = producer.get_metadata(); + + const vector<TopicMetadata>& topics = metadata.get_topics(); + ASSERT_GE(topics.size(), 2); + + for (const auto& topic : topics) { + if (topic_names.count(topic.get_topic()) == 1) { + const vector<PartitionMetadata>& partitions = topic.get_partitions(); + EXPECT_EQ(3, partitions.size()); + set<int> expected_ids = { 0, 1, 2 }; + for (const PartitionMetadata& partition : partitions) { + 
EXPECT_EQ(1, expected_ids.erase(partition.get_id())); + for (int32_t replica : partition.get_replicas()) { + EXPECT_EQ(0, replica); + } + for (int32_t isr : partition.get_in_sync_replica_brokers()) { + EXPECT_EQ(0, isr); + } + } + found_topics++; + } + } + EXPECT_EQ(topic_names.size(), found_topics); + + // Find by names + EXPECT_EQ(topic_names.size(), metadata.get_topics(topic_names).size()); + // Find by prefix + EXPECT_EQ(topic_names.size(), metadata.get_topics("cppkafka_").size()); + + // Now get the whole metadata only for this topic + Topic topic = producer.get_topic(KAFKA_TOPIC); + EXPECT_EQ(1, producer.get_metadata(topic).get_topics().size()); +}