Allow fetching metadata only for known topics

This commit is contained in:
Matias Fontanini
2017-04-15 16:53:37 -07:00
parent e26d7e7db2
commit 28f6253cfb
3 changed files with 10 additions and 7 deletions

View File

@@ -136,9 +136,11 @@ public:
/**
* \brief Gets metadata for brokers, topics, partitions, etc
*
* \param all_topics Whether to fetch metadata about all topics or only locally known ones
*
* This translates into a call to rd_kafka_metadata
*/
Metadata get_metadata() const;
Metadata get_metadata(bool all_topics = true) const;
/**
* \brief Gets general metadata but only fetches metadata for the given topic rather than
@@ -186,7 +188,7 @@ private:
using TopicConfigurationMap = std::unordered_map<std::string, TopicConfiguration>;
Topic get_topic(const std::string& name, rd_kafka_topic_conf_t* conf);
Metadata get_metadata(rd_kafka_topic_t* topic_ptr) const;
Metadata get_metadata(bool all_topics, rd_kafka_topic_t* topic_ptr) const;
void save_topic_config(const std::string& topic_name, TopicConfiguration config);
HandlePtr handle_;

View File

@@ -106,12 +106,12 @@ KafkaHandleBase::query_offsets(const TopicPartition& topic_partition) const {
return make_tuple(low, high);
}
Metadata KafkaHandleBase::get_metadata() const {
return get_metadata(nullptr);
Metadata KafkaHandleBase::get_metadata(bool all_topics) const {
return get_metadata(all_topics, nullptr);
}
TopicMetadata KafkaHandleBase::get_metadata(const Topic& topic) const {
Metadata md = get_metadata(topic.get_handle());
Metadata md = get_metadata(false, topic.get_handle());
auto topics = md.get_topics();
if (topics.empty()) {
throw Exception("Failed to find metadata for topic");
@@ -147,9 +147,9 @@ Topic KafkaHandleBase::get_topic(const string& name, rd_kafka_topic_conf_t* conf
return Topic(topic);
}
Metadata KafkaHandleBase::get_metadata(rd_kafka_topic_t* topic_ptr) const {
Metadata KafkaHandleBase::get_metadata(bool all_topics, rd_kafka_topic_t* topic_ptr) const {
const rd_kafka_metadata_t* metadata;
rd_kafka_resp_err_t error = rd_kafka_metadata(get_handle(), topic_ptr != nullptr,
rd_kafka_resp_err_t error = rd_kafka_metadata(get_handle(), !!all_topics,
topic_ptr, &metadata, timeout_ms_.count());
check_error(error);
return Metadata(metadata);

View File

@@ -12,6 +12,7 @@ macro(create_test test_name)
add_executable(${test_name}_test EXCLUDE_FROM_ALL "${test_name}_test.cpp")
add_test(${test_name} ${test_name}_test)
add_dependencies(tests ${test_name}_test)
add_dependencies(${test_name}_test cppkafka)
endmacro()
add_definitions("-DKAFKA_TEST_INSTANCE=\"${KAFKA_TEST_INSTANCE}\"")