mirror of
				https://github.com/Telecominfraproject/wlan-cloud-lib-cppkafka.git
				synced 2025-10-31 10:37:46 +00:00 
			
		
		
		
	Add metadata objects and tests for them
This commit is contained in:
		| @@ -15,7 +15,6 @@ public: | ||||
|     Buffer(const T* data, size_t size)  | ||||
|     : data_(reinterpret_cast<const DataType*>(data)), size_(size) { | ||||
|         static_assert(sizeof(T) == 1, "Buffer must point to elements of 1 byte"); | ||||
|  | ||||
|     } | ||||
|  | ||||
|     Buffer(const Buffer&) = delete; | ||||
|   | ||||
| @@ -26,7 +26,6 @@ public: | ||||
|     Consumer& operator=(Consumer&&) = delete; | ||||
|     ~Consumer(); | ||||
|  | ||||
|     void set_timeout(const std::chrono::milliseconds timeout); | ||||
|     void set_assignment_callback(AssignmentCallback callback); | ||||
|     void set_revocation_callback(RevocationCallback callback); | ||||
|     void set_rebalance_error_callback(RebalanceErrorCallback callback); | ||||
| @@ -50,8 +49,6 @@ public: | ||||
|  | ||||
|     Message poll(); | ||||
| private: | ||||
|     static const std::chrono::milliseconds DEFAULT_TIMEOUT; | ||||
|  | ||||
|     static void rebalance_proxy(rd_kafka_t *handle, rd_kafka_resp_err_t error, | ||||
|                                 rd_kafka_topic_partition_list_t *partitions, void *opaque); | ||||
|  | ||||
| @@ -59,7 +56,6 @@ private: | ||||
|     void commit(const TopicPartitionList& topic_partitions, bool async); | ||||
|     void handle_rebalance(rd_kafka_resp_err_t err, const TopicPartitionList& topic_partitions); | ||||
|  | ||||
|     std::chrono::milliseconds timeout_ms_; | ||||
|     AssignmentCallback assignment_callback_; | ||||
|     RevocationCallback revocation_callback_; | ||||
|     RebalanceErrorCallback rebalance_error_callback_; | ||||
|   | ||||
| @@ -3,8 +3,10 @@ | ||||
|  | ||||
| #include <string> | ||||
| #include <memory> | ||||
| #include <chrono> | ||||
| #include <librdkafka/rdkafka.h> | ||||
| #include "topic_partition_list.h" | ||||
| #include "metadata.h" | ||||
|  | ||||
| namespace cppkafka { | ||||
|  | ||||
| @@ -22,9 +24,14 @@ public: | ||||
|     void pause_partitions(const TopicPartitionList& topic_partitions); | ||||
|     void resume_partitions(const TopicPartitionList& topic_partitions); | ||||
|  | ||||
|     void set_timeout(const std::chrono::milliseconds& timeout); | ||||
|  | ||||
|     rd_kafka_t* get_handle(); | ||||
|     Topic get_topic(const std::string& name); | ||||
|     Topic get_topic(const std::string& name, TopicConfiguration config); | ||||
|     Metadata get_metadata(); | ||||
|     Metadata get_metadata(const Topic& topic); | ||||
|     std::chrono::milliseconds get_timeout() const; | ||||
| protected: | ||||
|     KafkaHandleBase(); | ||||
|     KafkaHandleBase(rd_kafka_t* handle); | ||||
| @@ -32,11 +39,15 @@ protected: | ||||
|     void set_handle(rd_kafka_t* handle); | ||||
|     void check_error(rd_kafka_resp_err_t error); | ||||
| private: | ||||
|     static const std::chrono::milliseconds DEFAULT_TIMEOUT; | ||||
|  | ||||
|     using HandlePtr = std::unique_ptr<rd_kafka_t, decltype(&rd_kafka_destroy)>; | ||||
|  | ||||
|     Topic get_topic(const std::string& name, rd_kafka_topic_conf_t* conf); | ||||
|     Metadata get_metadata(rd_kafka_topic_t* topic_ptr); | ||||
|  | ||||
|     HandlePtr handle_; | ||||
|     std::chrono::milliseconds timeout_ms_; | ||||
| }; | ||||
|  | ||||
| } // cppkafka | ||||
|   | ||||
							
								
								
									
										72
									
								
								include/cppkafka/metadata.h
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										72
									
								
								include/cppkafka/metadata.h
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,72 @@ | ||||
| #ifndef CPPKAFKA_METADATA_H | ||||
| #define CPPKAFKA_METADATA_H | ||||
|  | ||||
| #include <memory> | ||||
| #include <string> | ||||
| #include <vector> | ||||
| #include <cstdint> | ||||
| #include <unordered_set> | ||||
| #include <librdkafka/rdkafka.h> | ||||
|  | ||||
| namespace cppkafka { | ||||
|  | ||||
| class PartitionMetadata { | ||||
| public: | ||||
|     PartitionMetadata(const rd_kafka_metadata_partition& partition); | ||||
|  | ||||
|     uint32_t get_id() const; | ||||
|     rd_kafka_resp_err_t get_error() const; | ||||
|     int32_t get_leader() const; | ||||
|     const std::vector<int32_t>& get_replicas() const; | ||||
|     const std::vector<int32_t>& get_in_sync_replica_brokers() const; | ||||
| private: | ||||
|     int32_t id_; | ||||
|     rd_kafka_resp_err_t error_; | ||||
|     int32_t leader_; | ||||
|     std::vector<int32_t> replicas_; | ||||
|     std::vector<int32_t> isrs_; | ||||
| }; | ||||
|  | ||||
| class TopicMetadata { | ||||
| public: | ||||
|     TopicMetadata(const rd_kafka_metadata_topic& topic); | ||||
|  | ||||
|     const std::string& get_topic() const; | ||||
|     rd_kafka_resp_err_t get_error() const; | ||||
|     const std::vector<PartitionMetadata>& get_partitions() const; | ||||
| private: | ||||
|     std::string topic_; | ||||
|     rd_kafka_resp_err_t error_; | ||||
|     std::vector<PartitionMetadata> partitions_; | ||||
| }; | ||||
|  | ||||
| class BrokerMetadata { | ||||
| public: | ||||
|     BrokerMetadata(const rd_kafka_metadata_broker_t& broker); | ||||
|  | ||||
|     const std::string& get_host() const; | ||||
|     int32_t get_id() const; | ||||
|     uint16_t get_port() const; | ||||
| private: | ||||
|     const std::string host_; | ||||
|     int32_t id_; | ||||
|     uint16_t port_; | ||||
| }; | ||||
|  | ||||
| class Metadata { | ||||
| public: | ||||
|     Metadata(const rd_kafka_metadata_t* ptr); | ||||
|  | ||||
|     std::vector<BrokerMetadata> get_brokers() const; | ||||
|     std::vector<TopicMetadata> get_topics() const; | ||||
|     std::vector<TopicMetadata> get_topics(const std::unordered_set<std::string>& topics) const; | ||||
|     std::vector<TopicMetadata> get_topics(const std::string& prefix) const; | ||||
| private: | ||||
|     using HandlePtr = std::unique_ptr<const rd_kafka_metadata_t, decltype(&rd_kafka_metadata_destroy)>; | ||||
|  | ||||
|     HandlePtr handle_; | ||||
| }; | ||||
|  | ||||
| } // cppkafka | ||||
|  | ||||
| #endif // CPPKAFKA_METADATA_H | ||||
| @@ -8,6 +8,7 @@ set(SOURCES | ||||
|     message.cpp | ||||
|     topic_partition.cpp | ||||
|     topic_partition_list.cpp | ||||
|     metadata.cpp | ||||
|  | ||||
|     kafka_handle_base.cpp | ||||
|     producer.cpp | ||||
|   | ||||
| @@ -10,16 +10,13 @@ using std::chrono::milliseconds; | ||||
|  | ||||
| namespace cppkafka { | ||||
|  | ||||
| const milliseconds Consumer::DEFAULT_TIMEOUT{1000}; | ||||
|  | ||||
| void Consumer::rebalance_proxy(rd_kafka_t*, rd_kafka_resp_err_t error, | ||||
|                                rd_kafka_topic_partition_list_t *partitions, void *opaque) { | ||||
|     TopicPartitionList list = TopicPartitionList::make_non_owning(partitions); | ||||
|     static_cast<Consumer*>(opaque)->handle_rebalance(error, list); | ||||
| } | ||||
|  | ||||
| Consumer::Consumer(Configuration config)  | ||||
| : timeout_ms_(DEFAULT_TIMEOUT) { | ||||
| Consumer::Consumer(Configuration config) { | ||||
|     char error_buffer[512]; | ||||
|     // Set ourselves as the opaque pointer | ||||
|     rd_kafka_conf_set_opaque(config.get_handle(), this); | ||||
| @@ -37,10 +34,6 @@ Consumer::~Consumer() { | ||||
|     close(); | ||||
| } | ||||
|  | ||||
| void Consumer::set_timeout(const milliseconds timeout) { | ||||
|     timeout_ms_ = timeout; | ||||
| } | ||||
|  | ||||
| void Consumer::set_assignment_callback(AssignmentCallback callback) { | ||||
|     assignment_callback_ = move(callback); | ||||
| } | ||||
| @@ -101,7 +94,7 @@ TopicPartitionList Consumer::get_offsets_committed(const TopicPartitionList& top | ||||
|     // Copy the list, let rd_kafka change it and return it | ||||
|     TopicPartitionList output = topic_partitions; | ||||
|     rd_kafka_resp_err_t error = rd_kafka_committed(get_handle(), output.get_handle(), | ||||
|                                                    timeout_ms_.count()); | ||||
|                                                    get_timeout().count()); | ||||
|     check_error(error); | ||||
|     return output; | ||||
| } | ||||
| @@ -131,7 +124,8 @@ TopicPartitionList Consumer::get_assignment() { | ||||
| } | ||||
|  | ||||
| Message Consumer::poll() { | ||||
|     rd_kafka_message_t* message = rd_kafka_consumer_poll(get_handle(), timeout_ms_.count()); | ||||
|     rd_kafka_message_t* message = rd_kafka_consumer_poll(get_handle(),  | ||||
|                                                          get_timeout().count()); | ||||
|     return message ? Message(message) : Message(); | ||||
| } | ||||
|  | ||||
|   | ||||
| @@ -4,16 +4,19 @@ | ||||
| #include "topic.h" | ||||
|  | ||||
| using std::string; | ||||
| using std::chrono::milliseconds; | ||||
|  | ||||
| namespace cppkafka { | ||||
|  | ||||
| const milliseconds KafkaHandleBase::DEFAULT_TIMEOUT{1000}; | ||||
|  | ||||
| KafkaHandleBase::KafkaHandleBase()  | ||||
| : handle_(nullptr, nullptr) { | ||||
| : handle_(nullptr, nullptr), timeout_ms_(DEFAULT_TIMEOUT) { | ||||
|  | ||||
| } | ||||
|  | ||||
| KafkaHandleBase::KafkaHandleBase(rd_kafka_t* handle)  | ||||
| : handle_(handle, &rd_kafka_destroy) { | ||||
| : handle_(handle, &rd_kafka_destroy), timeout_ms_(DEFAULT_TIMEOUT) { | ||||
|  | ||||
| } | ||||
|  | ||||
| @@ -29,6 +32,10 @@ void KafkaHandleBase::resume_partitions(const TopicPartitionList& topic_partitio | ||||
|     check_error(error); | ||||
| } | ||||
|  | ||||
| void KafkaHandleBase::set_timeout(const milliseconds& timeout) { | ||||
|     timeout_ms_ = timeout; | ||||
| } | ||||
|  | ||||
| rd_kafka_t* KafkaHandleBase::get_handle() { | ||||
|     return handle_.get(); | ||||
| } | ||||
| @@ -41,6 +48,18 @@ Topic KafkaHandleBase::get_topic(const string& name, TopicConfiguration topicCon | ||||
|     return get_topic(name, topicConfig.get_handle()); | ||||
| } | ||||
|  | ||||
| Metadata KafkaHandleBase::get_metadata() { | ||||
|     return get_metadata(nullptr); | ||||
| } | ||||
|  | ||||
| Metadata KafkaHandleBase::get_metadata(const Topic& topic) { | ||||
|     return get_metadata(topic.get_handle()); | ||||
| } | ||||
|  | ||||
| milliseconds KafkaHandleBase::get_timeout() const { | ||||
|     return timeout_ms_; | ||||
| } | ||||
|  | ||||
| void KafkaHandleBase::set_handle(rd_kafka_t* handle) { | ||||
|     handle_ = HandlePtr(handle, &rd_kafka_destroy); | ||||
| } | ||||
| @@ -53,6 +72,14 @@ Topic KafkaHandleBase::get_topic(const string& name, rd_kafka_topic_conf_t* conf | ||||
|     return Topic(topic); | ||||
| } | ||||
|  | ||||
| Metadata KafkaHandleBase::get_metadata(rd_kafka_topic_t* topic_ptr) { | ||||
|     const rd_kafka_metadata_t* metadata; | ||||
|     rd_kafka_resp_err_t error = rd_kafka_metadata(get_handle(), topic_ptr != nullptr,  | ||||
|                                                   topic_ptr, &metadata, timeout_ms_.count()); | ||||
|     check_error(error); | ||||
|     return Metadata(metadata); | ||||
| } | ||||
|  | ||||
| void KafkaHandleBase::check_error(rd_kafka_resp_err_t error) { | ||||
|     if (error != RD_KAFKA_RESP_ERR_NO_ERROR) { | ||||
|         throw HandleException(error); | ||||
|   | ||||
							
								
								
									
										129
									
								
								src/metadata.cpp
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										129
									
								
								src/metadata.cpp
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,129 @@ | ||||
| #include "metadata.h" | ||||
|  | ||||
| using std::string; | ||||
| using std::vector; | ||||
| using std::unordered_set; | ||||
|  | ||||
| namespace cppkafka { | ||||
|  | ||||
| // PartitionMetadata | ||||
|  | ||||
| PartitionMetadata::PartitionMetadata(const rd_kafka_metadata_partition& partition)  | ||||
| : id_(partition.id), error_(partition.err), leader_(partition.leader) { | ||||
|     for (int i = 0; i < partition.replica_cnt; ++i) { | ||||
|         replicas_.push_back(partition.replicas[i]); | ||||
|     } | ||||
|     for (int i = 0; i < partition.isr_cnt; ++i) { | ||||
|         isrs_.push_back(partition.isrs[i]); | ||||
|     } | ||||
| } | ||||
|  | ||||
| uint32_t PartitionMetadata::get_id() const { | ||||
|     return id_; | ||||
| } | ||||
|  | ||||
| rd_kafka_resp_err_t PartitionMetadata::get_error() const { | ||||
|     return error_; | ||||
| } | ||||
|  | ||||
| int32_t PartitionMetadata::get_leader() const { | ||||
|     return leader_; | ||||
| } | ||||
|  | ||||
| const vector<int32_t>& PartitionMetadata::get_replicas() const { | ||||
|     return replicas_; | ||||
| } | ||||
|  | ||||
| const vector<int32_t>& PartitionMetadata::get_in_sync_replica_brokers() const { | ||||
|     return isrs_; | ||||
| } | ||||
|  | ||||
| // TopicMetadata  | ||||
|  | ||||
| TopicMetadata::TopicMetadata(const rd_kafka_metadata_topic& topic)  | ||||
| : topic_(topic.topic), error_(topic.err) { | ||||
|     for (int i = 0; i < topic.partition_cnt; ++i) { | ||||
|         partitions_.emplace_back(topic.partitions[i]); | ||||
|     } | ||||
| } | ||||
|  | ||||
| const string& TopicMetadata::get_topic() const { | ||||
|     return topic_; | ||||
| } | ||||
|  | ||||
| rd_kafka_resp_err_t TopicMetadata::get_error() const { | ||||
|     return error_; | ||||
| } | ||||
|  | ||||
| const vector<PartitionMetadata>& TopicMetadata::get_partitions() const { | ||||
|     return partitions_; | ||||
| } | ||||
|  | ||||
| // BrokerMetadata | ||||
|  | ||||
| BrokerMetadata::BrokerMetadata(const rd_kafka_metadata_broker_t& broker)  | ||||
| : host_(broker.host), id_(broker.id), port_(static_cast<uint16_t>(broker.port)) { | ||||
|  | ||||
| } | ||||
|  | ||||
| const string& BrokerMetadata::get_host() const { | ||||
|     return host_; | ||||
| } | ||||
|  | ||||
| int32_t BrokerMetadata::get_id() const { | ||||
|     return id_; | ||||
| } | ||||
|  | ||||
| uint16_t BrokerMetadata::get_port() const { | ||||
|     return port_; | ||||
| } | ||||
|  | ||||
| // Metadata | ||||
|  | ||||
| Metadata::Metadata(const rd_kafka_metadata_t* ptr)  | ||||
| : handle_(ptr, &rd_kafka_metadata_destroy) { | ||||
|  | ||||
| } | ||||
|  | ||||
| vector<BrokerMetadata> Metadata::get_brokers() const { | ||||
|     vector<BrokerMetadata> output; | ||||
|     for (int i = 0; i < handle_->broker_cnt; ++i) { | ||||
|         const rd_kafka_metadata_broker_t& broker = handle_->brokers[i]; | ||||
|         output.emplace_back(broker); | ||||
|     } | ||||
|     return output; | ||||
| } | ||||
|  | ||||
| vector<TopicMetadata> Metadata::get_topics() const { | ||||
|     vector<TopicMetadata> output; | ||||
|     for (int i = 0; i < handle_->topic_cnt; ++i) { | ||||
|         const rd_kafka_metadata_topic_t& topic = handle_->topics[i]; | ||||
|         output.emplace_back(topic); | ||||
|     } | ||||
|     return output; | ||||
| } | ||||
|  | ||||
| vector<TopicMetadata> Metadata::get_topics(const unordered_set<string>& topics) const { | ||||
|     vector<TopicMetadata> output; | ||||
|     for (int i = 0; i < handle_->topic_cnt; ++i) { | ||||
|         const rd_kafka_metadata_topic_t& topic = handle_->topics[i]; | ||||
|         if (topics.count(topic.topic)) { | ||||
|             output.emplace_back(topic); | ||||
|         } | ||||
|     } | ||||
|     return output; | ||||
| } | ||||
|  | ||||
| vector<TopicMetadata> Metadata::get_topics(const string& prefix) const { | ||||
|     vector<TopicMetadata> output; | ||||
|     for (int i = 0; i < handle_->topic_cnt; ++i) { | ||||
|         const rd_kafka_metadata_topic_t& topic = handle_->topics[i]; | ||||
|         string topic_name = topic.topic;  | ||||
|         if (topic_name.find(prefix) == 0) { | ||||
|             output.emplace_back(topic); | ||||
|         } | ||||
|     } | ||||
|     return output; | ||||
| } | ||||
|  | ||||
| } // cppkafka | ||||
| @@ -15,3 +15,4 @@ endmacro() | ||||
| add_definitions("-DKAFKA_TEST_INSTANCE=\"${KAFKA_TEST_INSTANCE}\"") | ||||
|  | ||||
| create_test(producer) | ||||
| create_test(kafka_handle_base) | ||||
|   | ||||
							
								
								
									
										97
									
								
								tests/kafka_handle_base_test.cpp
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										97
									
								
								tests/kafka_handle_base_test.cpp
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,97 @@ | ||||
| #include <set> | ||||
| #include <unordered_set> | ||||
| #include <gtest/gtest.h> | ||||
| #include "cppkafka/producer.h" | ||||
|  | ||||
| using std::vector; | ||||
| using std::set; | ||||
| using std::unordered_set; | ||||
| using std::string; | ||||
|  | ||||
| using namespace cppkafka; | ||||
|  | ||||
| class KafkaHandleBaseTest : public testing::Test { | ||||
| public: | ||||
|     static const string KAFKA_TOPIC; | ||||
|  | ||||
|     Configuration make_config() { | ||||
|         Configuration config; | ||||
|         config.set("metadata.broker.list", KAFKA_TEST_INSTANCE); | ||||
|         return config; | ||||
|     } | ||||
|  | ||||
|     string get_kafka_host() { | ||||
|         string uri = KAFKA_TEST_INSTANCE; | ||||
|         size_t index = uri.find(':'); | ||||
|         if (index == string::npos) { | ||||
|             return uri; | ||||
|         } | ||||
|         else { | ||||
|             return uri.substr(0, index); | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     uint16_t get_kafka_port() { | ||||
|         string uri = KAFKA_TEST_INSTANCE; | ||||
|         size_t index = uri.find(':'); | ||||
|         if (index == string::npos) { | ||||
|             return 9092; | ||||
|         } | ||||
|         else { | ||||
|             return stoul(uri.substr(index + 1)); | ||||
|         } | ||||
|     } | ||||
| }; | ||||
|  | ||||
| const string KafkaHandleBaseTest::KAFKA_TOPIC = "cppkafka_test1"; | ||||
|  | ||||
| TEST_F(KafkaHandleBaseTest, BrokersMetadata) { | ||||
|     Producer producer(make_config()); | ||||
|     Metadata metadata = producer.get_metadata(); | ||||
|  | ||||
|     vector<BrokerMetadata> brokers = metadata.get_brokers(); | ||||
|     ASSERT_EQ(1, brokers.size()); | ||||
|     const auto& broker = brokers[0]; | ||||
|     // TODO: resolve this | ||||
|     //EXPECT_EQ(get_kafka_host(), broker.get_host()); | ||||
|     EXPECT_EQ(get_kafka_port(), broker.get_port()); | ||||
| } | ||||
|  | ||||
| TEST_F(KafkaHandleBaseTest, TopicsMetadata) { | ||||
|     unordered_set<string> topic_names = { "cppkafka_test1", "cppkafka_test2" }; | ||||
|     size_t found_topics = 0; | ||||
|  | ||||
|     Producer producer(make_config()); | ||||
|     Metadata metadata = producer.get_metadata(); | ||||
|  | ||||
|     const vector<TopicMetadata>& topics = metadata.get_topics(); | ||||
|     ASSERT_GE(topics.size(), 2); | ||||
|  | ||||
|     for (const auto& topic : topics) { | ||||
|         if (topic_names.count(topic.get_topic()) == 1) { | ||||
|             const vector<PartitionMetadata>& partitions = topic.get_partitions(); | ||||
|             EXPECT_EQ(3, partitions.size()); | ||||
|             set<int32_t> expected_ids = { 0, 1, 2 }; | ||||
|             for (const PartitionMetadata& partition : partitions) { | ||||
|                 EXPECT_EQ(1, expected_ids.erase(partition.get_id())); | ||||
|                 for (int32_t replica : partition.get_replicas()) { | ||||
|                     EXPECT_EQ(0, replica); | ||||
|                 } | ||||
|                 for (int32_t isr : partition.get_in_sync_replica_brokers()) { | ||||
|                     EXPECT_EQ(0, isr); | ||||
|                 } | ||||
|             } | ||||
|             found_topics++; | ||||
|         } | ||||
|     } | ||||
|     EXPECT_EQ(topic_names.size(), found_topics); | ||||
|  | ||||
|     // Find by names | ||||
|     EXPECT_EQ(topic_names.size(), metadata.get_topics(topic_names).size()); | ||||
|     // Find by prefix | ||||
|     EXPECT_EQ(topic_names.size(), metadata.get_topics("cppkafka_").size()); | ||||
|  | ||||
|     // Now get the whole metadata only for this topic | ||||
|     Topic topic = producer.get_topic(KAFKA_TOPIC); | ||||
|     EXPECT_EQ(1, producer.get_metadata(topic).get_topics().size()); | ||||
| } | ||||
		Reference in New Issue
	
	Block a user
	 Matias Fontanini
					Matias Fontanini