From 0cf8369ef9d128dc959f0c75358faadfa6532bcc Mon Sep 17 00:00:00 2001 From: Matias Fontanini Date: Sun, 12 Jun 2016 10:43:38 -0700 Subject: [PATCH] Add zookeeper support --- CMakeLists.txt | 10 +++ include/cppkafka/config.h.in | 7 ++ include/cppkafka/configuration.h | 6 ++ include/cppkafka/kafka_handle_base.h | 8 ++ include/cppkafka/zookeeper/zookeeper_pool.h | 63 +++++++++++++ .../cppkafka/zookeeper/zookeeper_subscriber.h | 55 ++++++++++++ .../cppkafka/zookeeper/zookeeper_watcher.h | 19 +++- src/CMakeLists.txt | 5 +- src/configuration.cpp | 59 +++++++++--- src/kafka_handle_base.cpp | 30 +++++++ src/zookeeper/zookeeper_pool.cpp | 89 +++++++++++++++++++ src/zookeeper/zookeeper_subscriber.cpp | 25 ++++++ src/zookeeper/zookeeper_watcher.cpp | 40 ++++++++- tests/CMakeLists.txt | 5 +- tests/producer_test.cpp | 43 +++++++++ tests/zookeeper_watcher_test.cpp | 14 +++ 16 files changed, 457 insertions(+), 21 deletions(-) create mode 100644 include/cppkafka/config.h.in create mode 100644 include/cppkafka/zookeeper/zookeeper_pool.h create mode 100644 include/cppkafka/zookeeper/zookeeper_subscriber.h create mode 100644 src/zookeeper/zookeeper_pool.cpp create mode 100644 src/zookeeper/zookeeper_subscriber.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 504770c..b95e395 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -27,5 +27,15 @@ add_dependencies(cppkafka googletest) option(ENABLE_ZOOKEEPER "Whether to enable zookeeper support" ON) +if (ENABLE_ZOOKEEPER) + set(CPPKAFKA_HAVE_ZOOKEEPER ON) +endif() + +# Configuration file +configure_file( + "${CMAKE_CURRENT_SOURCE_DIR}/include/cppkafka/config.h.in" + "${CMAKE_CURRENT_SOURCE_DIR}/include/cppkafka/config.h" +) + enable_testing() add_subdirectory(tests) \ No newline at end of file diff --git a/include/cppkafka/config.h.in b/include/cppkafka/config.h.in new file mode 100644 index 0000000..e13ea4e --- /dev/null +++ b/include/cppkafka/config.h.in @@ -0,0 +1,7 @@ +#ifndef CPPKAFKA_CONFIG_H +#define CPPKAFKA_CONFIG_H + +/* Define if the zookeeper extensions are enabled */ +#cmakedefine CPPKAFKA_HAVE_ZOOKEEPER + +#endif // CPPKAFKA_CONFIG_H diff --git a/include/cppkafka/configuration.h b/include/cppkafka/configuration.h index c524203..8d4f87b 100644 --- a/include/cppkafka/configuration.h +++ b/include/cppkafka/configuration.h @@ -33,6 +33,8 @@ #include #include #include +#include +#include #include #include #include @@ -79,6 +81,7 @@ public: void set_socket_callback(SocketCallback callback); void set_default_topic_configuration(boost::optional config); + bool has_property(const std::string& name) const; rd_kafka_conf_t* get_handle() const; std::string get(const std::string& name) const; const DeliveryReportCallback& get_delivery_report_callback() const; @@ -91,8 +94,10 @@ public: const boost::optional& get_default_topic_configuration() const; boost::optional& get_default_topic_configuration(); private: + static const std::unordered_set VALID_EXTENSIONS; using HandlePtr = ClonablePtr; + using PropertiesMap = std::unordered_map; Configuration(rd_kafka_conf_t* ptr); static HandlePtr make_handle(rd_kafka_conf_t* ptr); @@ -106,6 +111,7 @@ private: LogCallback log_callback_; StatsCallback stats_callback_; SocketCallback socket_callback_; + PropertiesMap extension_properties_; }; } // cppkafka diff --git a/include/cppkafka/kafka_handle_base.h b/include/cppkafka/kafka_handle_base.h index e688416..26a4520 100644 --- a/include/cppkafka/kafka_handle_base.h +++ b/include/cppkafka/kafka_handle_base.h @@ -42,6 +42,10 @@ #include "topic_partition_list.h" #include "topic_configuration.h" #include "configuration.h" +#include "config.h" +#ifdef CPPKAFKA_HAVE_ZOOKEEPER + #include "zookeeper/zookeeper_subscriber.h" +#endif // CPPKAFKA_HAVE_ZOOKEEPER namespace cppkafka { @@ -93,6 +97,10 @@ private: Configuration config_; TopicConfigurationMap topic_configurations_; std::mutex topic_configurations_mutex_; + #ifdef CPPKAFKA_HAVE_ZOOKEEPER + // This could be an optional but apparently move construction is only supported as of 1.56 + std::unique_ptr zookeeper_subscriber_; + #endif // CPPKAFKA_HAVE_ZOOKEEPER }; } // cppkafka diff --git a/include/cppkafka/zookeeper/zookeeper_pool.h b/include/cppkafka/zookeeper/zookeeper_pool.h new file mode 100644 index 0000000..e2a6d9a --- /dev/null +++ b/include/cppkafka/zookeeper/zookeeper_pool.h @@ -0,0 +1,63 @@ +/* + * Copyright (c) 2016, Matias Fontanini + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef CPPKAFKA_ZOOKEEPER_POOL_H +#define CPPKAFKA_ZOOKEEPER_POOL_H + +#include +#include +#include +#include +#include "zookeeper/zookeeper_watcher.h" + +namespace cppkafka { + +class ZookeeperSubscriber; + +class ZookeeperPool { +public: + static ZookeeperPool& instance(); + + ZookeeperSubscriber subscribe(const std::string& endpoint, + std::chrono::milliseconds receive_timeout, + ZookeeperWatcher::WatcherCallback callback); + void unsubscribe(const ZookeeperSubscriber& subscriber); + std::string get_brokers(const std::string& endpoint); + + size_t get_subscriber_count(const std::string& endpoint) const; +private: + using WatchersMap = std::map; + + WatchersMap watchers_; + mutable std::mutex watchers_mutex_; +}; + +} // cppkafka + +#endif // CPPKAFKA_ZOOKEEPER_POOL_H diff --git a/include/cppkafka/zookeeper/zookeeper_subscriber.h b/include/cppkafka/zookeeper/zookeeper_subscriber.h new file mode 100644 index 0000000..8cc82be --- /dev/null +++ b/include/cppkafka/zookeeper/zookeeper_subscriber.h @@ -0,0 +1,55 @@ +/* + * Copyright (c) 2016, Matias Fontanini + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef CPPKAFKA_ZOOKEEPER_SUBSCRIBER_H +#define CPPKAFKA_ZOOKEEPER_SUBSCRIBER_H + +#include + +namespace cppkafka { + +class ZookeeperSubscriber { +public: + ZookeeperSubscriber(std::string endpoint, std::string subscription_id); + ZookeeperSubscriber(ZookeeperSubscriber&&) = default; + ZookeeperSubscriber(const ZookeeperSubscriber&) = delete; + ZookeeperSubscriber& operator=(ZookeeperSubscriber&&); + ZookeeperSubscriber& operator=(const ZookeeperSubscriber&) = delete; + ~ZookeeperSubscriber(); + + const std::string& get_endpoint() const; + const std::string& get_subscription_id() const; +private: + std::string endpoint_; + std::string subscription_id_; +}; + +} // cppkafka + +#endif // CPPKAFKA_ZOOKEEPER_SUBSCRIBER_H diff --git a/include/cppkafka/zookeeper/zookeeper_watcher.h b/include/cppkafka/zookeeper/zookeeper_watcher.h index 91167c4..94c8c79 100644 --- a/include/cppkafka/zookeeper/zookeeper_watcher.h +++ b/include/cppkafka/zookeeper/zookeeper_watcher.h @@ -33,6 +33,9 @@ #include #include #include +#include +#include +#include #include namespace cppkafka { @@ -41,21 +44,33 @@ class ZookeeperWatcher { public: static const std::chrono::milliseconds DEFAULT_RECEIVE_TIMEOUT; + using WatcherCallback = std::function; + ZookeeperWatcher(const std::string& endpoint); ZookeeperWatcher(const std::string& endpoint, std::chrono::milliseconds receive_timeout); + void setup_watcher(); + + std::string subscribe(WatcherCallback callback); + void unsubscribe(const std::string& id); std::string get_brokers(); + size_t get_subscriber_count() const; private: - using HandlePtr = std::unique_ptr; - static const std::string BROKERS_PATH; + using HandlePtr = std::unique_ptr; + using CallbackMap = std::map; + static void handle_event_proxy(zhandle_t* zh, int type, int state, const char* path, void* ctx); void handle_event(int type, int state, const char* path); + std::string generate_id(); HandlePtr handle_; + CallbackMap callbacks_; + mutable std::mutex callbacks_mutex_; + size_t id_counter_{0}; }; } // cppkafka diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 380a37f..9b73d19 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -16,8 +16,11 @@ set(SOURCES ) if (ENABLE_ZOOKEEPER) + add_definitions("-DCPPKAFKA_ENABLE_ZOOKEEPER=1") set(ZOOKEEPER_SOURCES - ${CMAKE_CURRENT_SOURCE_DIR}/zookeeper/zookeeper_watcher.cpp) + ${CMAKE_CURRENT_SOURCE_DIR}/zookeeper/zookeeper_watcher.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/zookeeper/zookeeper_pool.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/zookeeper/zookeeper_subscriber.cpp) # Add json library as include dir (header only) include_directories(SYSTEM ${CMAKE_CURRENT_SOURCE_DIR}/../third_party/json) set(SOURCES ${SOURCES} ${ZOOKEEPER_SOURCES}) diff --git a/src/configuration.cpp b/src/configuration.cpp index 191359c..6de5499 100644 --- a/src/configuration.cpp +++ b/src/configuration.cpp @@ -34,10 +34,12 @@ #include "message.h" #include "producer.h" #include "consumer.h" +#include "config.h" using std::string; using std::move; using std::vector; +using std::unordered_set; using boost::optional; @@ -45,6 +47,12 @@ using std::chrono::milliseconds; namespace cppkafka { +const unordered_set Configuration::VALID_EXTENSIONS = { + #ifdef CPPKAFKA_HAVE_ZOOKEEPER + "zookeeper", "zookeeper.receive.timeout.ms" + #endif // CPPKAFKA_HAVE_ZOOKEEPER +}; + // Callback proxies void delivery_report_callback_proxy(rd_kafka_t*, const rd_kafka_message_t* msg, void *opaque) { @@ -125,12 +133,18 @@ Configuration::Configuration(rd_kafka_conf_t* ptr) } void Configuration::set(const string& name, const string& value) { - char error_buffer[512]; - rd_kafka_conf_res_t result; - result = rd_kafka_conf_set(handle_.get(), name.data(), value.data(), error_buffer, - sizeof(error_buffer)); - if (result != RD_KAFKA_CONF_OK) { - throw ConfigException(name, error_buffer); + if (VALID_EXTENSIONS.count(name)) { + // This is one of cppkafka's extensions + extension_properties_.emplace(name, value); + } + else { + char error_buffer[512]; + rd_kafka_conf_res_t result; + result = rd_kafka_conf_set(handle_.get(), name.data(), value.data(), error_buffer, + sizeof(error_buffer)); + if (result != RD_KAFKA_CONF_OK) { + throw ConfigException(name, error_buffer); + } } } @@ -173,19 +187,38 @@ void Configuration::set_default_topic_configuration(optional default_topic_config_ = move(config); } +bool Configuration::has_property(const string& name) const { + if (VALID_EXTENSIONS.count(name)) { + return extension_properties_.count(name) == 1; + } + else { + size_t size = 0; + return rd_kafka_conf_get(handle_.get(), name.data(), nullptr, &size) == RD_KAFKA_CONF_OK; + } +} + rd_kafka_conf_t* Configuration::get_handle() const { return handle_.get(); } string Configuration::get(const string& name) const { - size_t size = 0; - auto result = rd_kafka_conf_get(handle_.get(), name.data(), nullptr, &size); - if (result != RD_KAFKA_CONF_OK) { - throw ConfigOptionNotFound(name); + if (VALID_EXTENSIONS.count(name)) { + auto iter = extension_properties_.find(name); + if (iter == extension_properties_.end()) { + throw ConfigOptionNotFound(name); + } + return iter->second; + } + else { + size_t size = 0; + auto result = rd_kafka_conf_get(handle_.get(), name.data(), nullptr, &size); + if (result != RD_KAFKA_CONF_OK) { + throw ConfigOptionNotFound(name); + } + vector buffer(size); + rd_kafka_conf_get(handle_.get(), name.data(), buffer.data(), &size); + return string(buffer.data()); } - vector buffer(size); - rd_kafka_conf_get(handle_.get(), name.data(), buffer.data(), &size); - return string(buffer.data()); } const Configuration::DeliveryReportCallback& Configuration::get_delivery_report_callback() const { diff --git a/src/kafka_handle_base.cpp b/src/kafka_handle_base.cpp index 64ee30b..d327ca0 100644 --- a/src/kafka_handle_base.cpp +++ b/src/kafka_handle_base.cpp @@ -31,6 +31,9 @@ #include "exceptions.h" #include "topic.h" #include "topic_partition_list.h" +#ifdef CPPKAFKA_HAVE_ZOOKEEPER + #include "zookeeper/zookeeper_pool.h" +#endif // CPPKAFKA_HAVE_ZOOKEEPER using std::string; using std::vector; @@ -38,6 +41,7 @@ using std::move; using std::make_tuple; using std::lock_guard; using std::mutex; +using std::exception; using std::chrono::milliseconds; namespace cppkafka { @@ -120,6 +124,32 @@ const Configuration& KafkaHandleBase::get_configuration() const { void KafkaHandleBase::set_handle(rd_kafka_t* handle) { handle_ = HandlePtr(handle, &rd_kafka_destroy); + + #ifdef CPPKAFKA_HAVE_ZOOKEEPER + + if (config_.has_property("zookeeper")) { + string endpoint = config_.get("zookeeper"); + milliseconds timeout = ZookeeperWatcher::DEFAULT_RECEIVE_TIMEOUT; + if (config_.has_property("zookeeper.receive.timeout.ms")) { + try { + timeout = milliseconds(stoi(config_.get("zookeeper.receive.timeout.ms"))); + } + catch (exception&) { + throw ZookeeperException("Invalid zookeeper receive timeout"); + } + } + auto& pool = ZookeeperPool::instance(); + auto callback = [&](const string& brokers) { + // Add the brokers and poll + rd_kafka_brokers_add(handle_.get(), brokers.data()); + rd_kafka_poll(handle_.get(), 10); + }; + ZookeeperSubscriber subscriber = pool.subscribe(endpoint, timeout, callback); + zookeeper_subscriber_.reset(new ZookeeperSubscriber(move(subscriber))); + callback(pool.get_brokers(endpoint)); + } + + #endif // CPPKAFKA_HAVE_ZOOKEEPER } Topic KafkaHandleBase::get_topic(const string& name, rd_kafka_topic_conf_t* conf) { diff --git a/src/zookeeper/zookeeper_pool.cpp b/src/zookeeper/zookeeper_pool.cpp new file mode 100644 index 0000000..ccde30c --- /dev/null +++ b/src/zookeeper/zookeeper_pool.cpp @@ -0,0 +1,89 @@ +/* + * Copyright (c) 2016, Matias Fontanini + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "zookeeper/zookeeper_pool.h" +#include "zookeeper/zookeeper_subscriber.h" +#include "exceptions.h" + +using std::string; +using std::mutex; +using std::lock_guard; +using std::forward_as_tuple; +using std::piecewise_construct; +using std::chrono::milliseconds; + +namespace cppkafka { + +ZookeeperPool& ZookeeperPool::instance() { + static ZookeeperPool the_instance; + return the_instance; +} + +ZookeeperSubscriber ZookeeperPool::subscribe(const string& endpoint, + milliseconds receive_timeout, + ZookeeperWatcher::WatcherCallback callback) { + lock_guard _(watchers_mutex_); + auto iter = watchers_.find(endpoint); + if (iter == watchers_.end()) { + iter = watchers_.emplace(piecewise_construct, forward_as_tuple(endpoint), + forward_as_tuple(endpoint, receive_timeout)).first; + } + string id = iter->second.subscribe(move(callback)); + return ZookeeperSubscriber(endpoint, id); +} + +void ZookeeperPool::unsubscribe(const ZookeeperSubscriber& subscriber) { + lock_guard _(watchers_mutex_); + auto iter = watchers_.find(subscriber.get_endpoint()); + if (iter != watchers_.end()) { + iter->second.unsubscribe(subscriber.get_subscription_id()); + } +} + +string ZookeeperPool::get_brokers(const string& endpoint) { + lock_guard _(watchers_mutex_); + auto iter = watchers_.find(endpoint); + if (iter == watchers_.end()) { + throw ZookeeperException("No zookeeper watcher for given endpoint"); + } + return iter->second.get_brokers(); +} + +size_t ZookeeperPool::get_subscriber_count(const string& endpoint) const { + lock_guard _(watchers_mutex_); + auto iter = watchers_.find(endpoint); + if (iter == watchers_.end()) { + return 0; + } + else { + return iter->second.get_subscriber_count(); + } +} + +} // cppkafka diff --git a/src/zookeeper/zookeeper_subscriber.cpp b/src/zookeeper/zookeeper_subscriber.cpp new file mode 100644 index 0000000..275880b --- /dev/null +++ b/src/zookeeper/zookeeper_subscriber.cpp @@ -0,0 +1,25 @@ +#include "zookeeper/zookeeper_subscriber.h" +#include "zookeeper/zookeeper_pool.h" + +using std::string; + +namespace cppkafka { + +ZookeeperSubscriber::ZookeeperSubscriber(string endpoint, string subscription_id) +: endpoint_(move(endpoint)), subscription_id_(move(subscription_id)) { + +} + +ZookeeperSubscriber::~ZookeeperSubscriber() { + ZookeeperPool::instance().unsubscribe(*this); +} + +const string& ZookeeperSubscriber::get_endpoint() const { + return endpoint_; +} + +const string& ZookeeperSubscriber::get_subscription_id() const { + return subscription_id_; +} + +} // cppkafka diff --git a/src/zookeeper/zookeeper_watcher.cpp b/src/zookeeper/zookeeper_watcher.cpp index 08a4f02..bf57e56 100644 --- a/src/zookeeper/zookeeper_watcher.cpp +++ b/src/zookeeper/zookeeper_watcher.cpp @@ -29,15 +29,20 @@ #include #include +#include #include #include "zookeeper/zookeeper_watcher.h" #include "exceptions.h" using std::string; +using std::lock_guard; +using std::mutex; using std::unique_ptr; using std::ostringstream; using std::runtime_error; +using std::make_shared; using std::chrono::milliseconds; +using std::chrono::system_clock; using picojson::value; using picojson::object; @@ -52,7 +57,7 @@ ZookeeperWatcher::ZookeeperWatcher(const string& endpoint) } -ZookeeperWatcher::ZookeeperWatcher(const string& endpoint, milliseconds receive_timeout) +ZookeeperWatcher::ZookeeperWatcher(const string& endpoint, milliseconds receive_timeout) : handle_(nullptr, nullptr) { auto raw_handle = zookeeper_init(endpoint.data(), &ZookeeperWatcher::handle_event_proxy, receive_timeout.count(), nullptr, this, 0); @@ -63,10 +68,23 @@ ZookeeperWatcher::ZookeeperWatcher(const string& endpoint, milliseconds receive_ handle_ = HandlePtr(raw_handle, &zookeeper_close); } +string ZookeeperWatcher::subscribe(WatcherCallback callback) { + lock_guard _(callbacks_mutex_); + string id = generate_id(); + callbacks_.emplace(id, move(callback)); + return id; +} + +void ZookeeperWatcher::unsubscribe(const string& id) { + lock_guard _(callbacks_mutex_); + callbacks_.erase(id); +} + string ZookeeperWatcher::get_brokers() { + auto handle = handle_.get(); using VectorPtr = unique_ptr; String_vector brokers; - if (zoo_get_children(handle_.get(), BROKERS_PATH.data(), 1, &brokers) != ZOK) { + if (zoo_get_children(handle, BROKERS_PATH.data(), 1, &brokers) != ZOK) { throw ZookeeperException("Failed to get broker list from zookeeper"); } // RAII up this pointer @@ -76,7 +94,7 @@ string ZookeeperWatcher::get_brokers() { char config_line[1024]; string path = "/brokers/ids/" + string(brokers.data[i]); int config_len = sizeof(config_line); - zoo_get(handle_.get(), path.data(), 0, config_line, &config_len, NULL); + zoo_get(handle, path.data(), 0, config_line, &config_len, NULL); if (config_len > 0) { config_line[config_len] = 0; @@ -102,6 +120,11 @@ string ZookeeperWatcher::get_brokers() { return oss.str(); } +size_t ZookeeperWatcher::get_subscriber_count() const { + lock_guard _(callbacks_mutex_); + return callbacks_.size(); +} + void ZookeeperWatcher::handle_event_proxy(zhandle_t*, int type, int state, const char* path, void* ctx) { auto self = static_cast(ctx); @@ -111,8 +134,17 @@ void ZookeeperWatcher::handle_event_proxy(zhandle_t*, int type, int state, const void ZookeeperWatcher::handle_event(int type, int state, const char* path) { if (type == ZOO_CHILD_EVENT && path == BROKERS_PATH) { string brokers = get_brokers(); - // TODO: Callback! + lock_guard _(callbacks_mutex_); + for (const auto& callbackPair : callbacks_) { + callbackPair.second(brokers); + } } } +string ZookeeperWatcher::generate_id() { + ostringstream oss; + oss << id_counter_++ << "-" << system_clock::now().time_since_epoch().count(); + return oss.str(); +} + } // cppkafka diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 35432a4..2cdec40 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -18,6 +18,10 @@ endmacro() add_definitions("-DKAFKA_TEST_INSTANCE=\"${KAFKA_TEST_INSTANCE}\"") add_definitions("-DZOOKEEPER_TEST_INSTANCE=\"${ZOOKEEPER_TEST_INSTANCE}\"") +if (ENABLE_ZOOKEEPER) + link_libraries(zookeeper_mt) +endif() + create_test(consumer) create_test(producer) create_test(kafka_handle_base) @@ -25,5 +29,4 @@ create_test(topic_partition_list) create_test(configuration) if (ENABLE_ZOOKEEPER) create_test(zookeeper_watcher) - target_link_libraries(zookeeper_watcher_test zookeeper_mt) endif() diff --git a/tests/producer_test.cpp b/tests/producer_test.cpp index 86208f1..6f1e633 100644 --- a/tests/producer_test.cpp +++ b/tests/producer_test.cpp @@ -276,3 +276,46 @@ TEST_F(ProducerTest, PartitionerCallbackOnDefaultTopicConfig) { EXPECT_EQ(partition, message.get_partition()); EXPECT_TRUE(callback_called); } + +#ifdef CPPKAFKA_HAVE_ZOOKEEPER + +#include "zookeeper/zookeeper_pool.h" + +TEST_F(ProducerTest, ConnectUsingZookeeper) { + int partition = 0; + + Configuration producer_config; + producer_config.set("zookeeper", ZOOKEEPER_TEST_INSTANCE); + + Configuration consumer_config = producer_config; + consumer_config.set("enable.auto.commit", false); + consumer_config.set("group.id", "producer_test"); + + // Create a consumer and assign this topic/partition + Consumer consumer(consumer_config); + consumer.assign({ TopicPartition(KAFKA_TOPIC, partition) }); + ConsumerRunner runner(consumer, 1, 1); + + // Now create a producer and produce a message + Producer producer(producer_config); + Topic topic = producer.get_topic(KAFKA_TOPIC); + string payload = "Hello world! 2"; + string key = "such key"; + producer.produce(topic, partition, Buffer(payload.data(), payload.size()), + Buffer(key.data(), key.size())); + runner.try_join(); + + const auto& messages = runner.get_messages(); + ASSERT_EQ(1, messages.size()); + const auto& message = messages[0]; + EXPECT_EQ(payload, message.get_payload().as_string()); + EXPECT_EQ(key, message.get_key().as_string()); + EXPECT_EQ(KAFKA_TOPIC, message.get_topic()); + EXPECT_EQ(partition, message.get_partition()); + EXPECT_EQ(0, message.get_error()); + + // We should have 2 watchers + EXPECT_EQ(2, ZookeeperPool::instance().get_subscriber_count(ZOOKEEPER_TEST_INSTANCE)); +} + +#endif // CPPKAFKA_HAVE_ZOOKEEPER diff --git a/tests/zookeeper_watcher_test.cpp b/tests/zookeeper_watcher_test.cpp index c654c34..b397f9b 100644 --- a/tests/zookeeper_watcher_test.cpp +++ b/tests/zookeeper_watcher_test.cpp @@ -1,5 +1,6 @@ #include #include "cppkafka/zookeeper/zookeeper_watcher.h" +#include "cppkafka/exceptions.h" using std::string; @@ -15,3 +16,16 @@ TEST_F(ZookeeperWatcherTest, GetBrokers) { string brokers = watcher.get_brokers(); EXPECT_EQ(KAFKA_TEST_INSTANCE, brokers); } + +TEST_F(ZookeeperWatcherTest, InvalidEndpointThrows) { + ZookeeperWatcher watcher("127.0.0.1:1212"); + EXPECT_THROW(watcher.get_brokers(), ZookeeperException); +} + +TEST_F(ZookeeperWatcherTest, SubscribeUnsubscribe) { + ZookeeperWatcher watcher(ZOOKEEPER_TEST_INSTANCE); + string id = watcher.subscribe([](const string&) { }); + EXPECT_EQ(1, watcher.get_subscriber_count()); + watcher.unsubscribe(id); + EXPECT_EQ(0, watcher.get_subscriber_count()); +}