Allow metadata object to be non-owning (#73)

This commit is contained in:
Alex Damian
2018-05-28 14:30:01 -04:00
committed by Matias Fontanini
parent 841e632fbd
commit f543810515
3 changed files with 73 additions and 3 deletions

View File

@@ -140,7 +140,24 @@ private:
*/
class CPPKAFKA_API Metadata {
public:
Metadata(const rd_kafka_metadata_t* ptr);
/**
* \brief Creates a Metadata object that doesn't take ownership of the handle
*
* \param handle The handle to be used
*/
static Metadata make_non_owning(const rd_kafka_metadata_t* handle);
/**
* \brief Constructs an empty metadata object
*
* \remark Using any methods except Metadata::get_handle on an empty metadata is undefined behavior
*/
Metadata();
/**
* Constructor
*/
Metadata(const rd_kafka_metadata_t* handle);
/**
* Gets the brokers' metadata
@@ -165,9 +182,23 @@ public:
* \param prefix The prefix to be looked up
*/
std::vector<TopicMetadata> get_topics_prefixed(const std::string& prefix) const;
/**
* Indicates whether this metadata is valid (not null)
*/
explicit operator bool() const;
/**
* Returns the rdkakfa handle
*/
const rd_kafka_metadata_t* get_handle() const;
private:
using HandlePtr = std::unique_ptr<const rd_kafka_metadata_t, decltype(&rd_kafka_metadata_destroy)>;
struct NonOwningTag { };
Metadata(const rd_kafka_metadata_t* handle, NonOwningTag);
HandlePtr handle_;
};

View File

@@ -83,6 +83,13 @@ public:
*/
bool is_partition_available(int partition) const;
/**
* Indicates whether this topic is valid (not null)
*/
explicit operator bool() const {
return handle_ != nullptr;
}
/**
* Returns the rdkakfa handle
*/

View File

@@ -27,6 +27,7 @@
*
*/
#include <assert.h>
#include "metadata.h"
#include "error.h"
@@ -110,12 +111,31 @@ uint16_t BrokerMetadata::get_port() const {
// Metadata
Metadata::Metadata(const rd_kafka_metadata_t* ptr)
: handle_(ptr, &rd_kafka_metadata_destroy) {
void dummy_metadata_destroyer(const rd_kafka_metadata_t*) {
}
Metadata Metadata::make_non_owning(const rd_kafka_metadata_t* handle) {
return Metadata(handle, NonOwningTag{});
}
Metadata::Metadata()
: handle_(nullptr, nullptr) {
}
Metadata::Metadata(const rd_kafka_metadata_t* handle)
: handle_(handle, &rd_kafka_metadata_destroy) {
}
Metadata::Metadata(const rd_kafka_metadata_t* handle, NonOwningTag)
: handle_(handle, &dummy_metadata_destroyer) {
}
vector<BrokerMetadata> Metadata::get_brokers() const {
assert(handle_);
vector<BrokerMetadata> output;
for (int i = 0; i < handle_->broker_cnt; ++i) {
const rd_kafka_metadata_broker_t& broker = handle_->brokers[i];
@@ -125,6 +145,7 @@ vector<BrokerMetadata> Metadata::get_brokers() const {
}
vector<TopicMetadata> Metadata::get_topics() const {
assert(handle_);
vector<TopicMetadata> output;
for (int i = 0; i < handle_->topic_cnt; ++i) {
const rd_kafka_metadata_topic_t& topic = handle_->topics[i];
@@ -134,6 +155,7 @@ vector<TopicMetadata> Metadata::get_topics() const {
}
vector<TopicMetadata> Metadata::get_topics(const unordered_set<string>& topics) const {
assert(handle_);
vector<TopicMetadata> output;
for (int i = 0; i < handle_->topic_cnt; ++i) {
const rd_kafka_metadata_topic_t& topic = handle_->topics[i];
@@ -145,6 +167,7 @@ vector<TopicMetadata> Metadata::get_topics(const unordered_set<string>& topics)
}
vector<TopicMetadata> Metadata::get_topics_prefixed(const string& prefix) const {
assert(handle_);
vector<TopicMetadata> output;
for (int i = 0; i < handle_->topic_cnt; ++i) {
const rd_kafka_metadata_topic_t& topic = handle_->topics[i];
@@ -156,4 +179,13 @@ vector<TopicMetadata> Metadata::get_topics_prefixed(const string& prefix) const
return output;
}
Metadata::operator bool() const {
return handle_ != nullptr;
}
const rd_kafka_metadata_t* Metadata::get_handle() const {
return handle_.get();
}
} // cppkafka