diff --git a/include/cppkafka/kafka_handle_base.h b/include/cppkafka/kafka_handle_base.h index 2072811..07e41f3 100644 --- a/include/cppkafka/kafka_handle_base.h +++ b/include/cppkafka/kafka_handle_base.h @@ -64,7 +64,7 @@ public: /** * \brief Pauses consumption/production from the given topic/partition list * - * This translates into a call to rd_kafka_pause_partitions + * This translates into a call to rd_kafka_pause_partitions * * \param topic_partitions The topic/partition list to pause consuming/producing from/to */ @@ -88,6 +88,15 @@ public: */ void set_timeout(const std::chrono::milliseconds& timeout); + /** + * \brief Adds one or more brokers to this handle's broker list + * + * This calls rd_kafka_brokers_add using the provided broker list. + * + * \param brokers The broker list endpoint string + */ + void add_brokers(const std::string& brokers); + /** * \brief Queries the offset for the given topic/partition * diff --git a/src/kafka_handle_base.cpp b/src/kafka_handle_base.cpp index 0f515af..9148e74 100644 --- a/src/kafka_handle_base.cpp +++ b/src/kafka_handle_base.cpp @@ -74,6 +74,10 @@ void KafkaHandleBase::set_timeout(const milliseconds& timeout) { timeout_ms_ = timeout; } +void KafkaHandleBase::add_brokers(const string& brokers) { + rd_kafka_brokers_add(handle_.get(), brokers.data()); +} + rd_kafka_t* KafkaHandleBase::get_handle() const { return handle_.get(); } diff --git a/tests/kafka_handle_base_test.cpp b/tests/kafka_handle_base_test.cpp index b9da55d..c424af4 100644 --- a/tests/kafka_handle_base_test.cpp +++ b/tests/kafka_handle_base_test.cpp @@ -47,7 +47,8 @@ public: const string KafkaHandleBaseTest::KAFKA_TOPIC = "cppkafka_test1"; TEST_F(KafkaHandleBaseTest, BrokersMetadata) { - Producer producer(make_config()); + Producer producer({}); + producer.add_brokers(KAFKA_TEST_INSTANCE); Metadata metadata = producer.get_metadata(); vector brokers = metadata.get_brokers();