Add KafkaHandleBase::add_brokers

This commit is contained in:
Matias Fontanini
2016-06-19 09:21:29 -07:00
parent 68229af8c7
commit 678ac88c9b
3 changed files with 16 additions and 2 deletions

View File

@@ -88,6 +88,15 @@ public:
*/ */
void set_timeout(const std::chrono::milliseconds& timeout); void set_timeout(const std::chrono::milliseconds& timeout);
/**
* \brief Adds one or more brokers to this handle's broker list
*
* This calls rd_kafka_brokers_add using the provided broker list.
*
* \param brokers The broker list endpoint string
*/
void add_brokers(const std::string& brokers);
/** /**
* \brief Queries the offset for the given topic/partition * \brief Queries the offset for the given topic/partition
* *

View File

@@ -74,6 +74,10 @@ void KafkaHandleBase::set_timeout(const milliseconds& timeout) {
timeout_ms_ = timeout; timeout_ms_ = timeout;
} }
void KafkaHandleBase::add_brokers(const string& brokers) {
rd_kafka_brokers_add(handle_.get(), brokers.data());
}
rd_kafka_t* KafkaHandleBase::get_handle() const { rd_kafka_t* KafkaHandleBase::get_handle() const {
return handle_.get(); return handle_.get();
} }

View File

@@ -47,7 +47,8 @@ public:
const string KafkaHandleBaseTest::KAFKA_TOPIC = "cppkafka_test1"; const string KafkaHandleBaseTest::KAFKA_TOPIC = "cppkafka_test1";
TEST_F(KafkaHandleBaseTest, BrokersMetadata) { TEST_F(KafkaHandleBaseTest, BrokersMetadata) {
Producer producer(make_config()); Producer producer({});
producer.add_brokers(KAFKA_TEST_INSTANCE);
Metadata metadata = producer.get_metadata(); Metadata metadata = producer.get_metadata();
vector<BrokerMetadata> brokers = metadata.get_brokers(); vector<BrokerMetadata> brokers = metadata.get_brokers();