diff --git a/.gitignore b/.gitignore index 603b88a..378eac2 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1 @@ -include/cppkafka/config.h +build diff --git a/CMakeLists.txt b/CMakeLists.txt index 779aa7b..50d8fef 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -12,22 +12,6 @@ include_directories(${Boost_INCLUDE_DIRS}) find_package(RdKafka REQUIRED) -option(ENABLE_ZOOKEEPER "Whether to enable zookeeper support" ON) -if (ENABLE_ZOOKEEPER) - find_package(Zookeeper REQUIRED) - - message(STATUS "Found zookeeper library") - include_directories(${ZOOKEEPER_INCLUDE_DIR}) - - set(CPPKAFKA_HAVE_ZOOKEEPER ON) -endif() - -# Configuration file -configure_file( - "${CMAKE_CURRENT_SOURCE_DIR}/include/cppkafka/config.h.in" - "${CMAKE_CURRENT_SOURCE_DIR}/include/cppkafka/config.h" -) - add_subdirectory(src) add_subdirectory(include) diff --git a/README.md b/README.md index 0770ba3..5ab2f5e 100644 --- a/README.md +++ b/README.md @@ -12,8 +12,6 @@ High level C++ wrapper for _rdkafka_ * _cppkafka_ provides an API to produce messages as well as consuming messages but the latter is only supported via the high level consumer API (introduced on kafka 0.9). Therefore, you need **rdkakfa >= 0.9** in order to use it. Other wrapped functionalities are also provided, like fetching metadata, offsets, etc. -* _cppkafka_ adds smooth **zookeeper** support: you only need to add a _"zookeeper"_ attribute to your configuration object and your producer/consumer will automatically load the initial broker list from it while also listening for broker list updates. - # It's simple! _cppkafka_'s API is simple. For example, this code creates a producer writes a message into some partition: @@ -26,7 +24,7 @@ using namespace std; int main() { // Create the config Configuration config; - config.set("zookeeper", "127.0.0.1:2181"); + config.set("metadata.broker.list", "127.0.0.1:2181"); // Create the producer Producer producer(config); // Get the topic we'll write into @@ -42,8 +40,7 @@ int main() { In order to compile _cppkafka_ you need: * _rdkafka >= 0.9_ -* Optionally _libzookeeper_. This is used by default, but you can disable it invoking cmake -using the `-DENABLE_ZOOKEEPER=0` argument. +* _CMake_ * A compiler with good C++11 support. This was tested successfully on _g++ 4.8.3_. * The boost library. _cppkafka_ only requires boost.optional, which is a header only library, so this doesn't add any additional runtime dependencies. @@ -63,4 +60,3 @@ If you want to use _cppkafka_, you'll need to link your application with: * _cppkafka_ * _rdkafka_ -* _zookeeper_ if you enabled it during build time (it's on by default). diff --git a/cmake/FindZookeeper.cmake b/cmake/FindZookeeper.cmake deleted file mode 100644 index cb0cb9b..0000000 --- a/cmake/FindZookeeper.cmake +++ /dev/null @@ -1,27 +0,0 @@ -find_path(ZOOKEEPER_ROOT_DIR - NAMES include/zookeeper/zookeeper.h -) - -find_path(ZOOKEEPER_INCLUDE_DIR - NAMES zookeeper/zookeeper.h - HINTS ${ZOOKEEPER_ROOT_DIR}/include -) - -set (HINT_DIR ${ZOOKEEPER_ROOT_DIR}/lib) - -find_library(ZOOKEEPER_LIBRARY - NAMES zookeeper_mt zookeeper_st - HINTS ${HINT_DIR} -) - -include(FindPackageHandleStandardArgs) -find_package_handle_standard_args(ZOOKEEPER DEFAULT_MSG - ZOOKEEPER_LIBRARY - ZOOKEEPER_INCLUDE_DIR -) - -mark_as_advanced( - ZOOKEEPER_ROOT_DIR - ZOOKEEPER_INCLUDE_DIR - ZOOKEEPER_LIBRARY -) diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 5e690c3..1f58205 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -1,12 +1,12 @@ find_package(Boost COMPONENTS program_options) if (Boost_PROGRAM_OPTIONS_FOUND) - link_libraries(${Boost_LIBRARIES} cppkafka ${RDKAFKA_LIBRARY} ${ZOOKEEPER_LIBRARY}) + link_libraries(${Boost_LIBRARIES} cppkafka ${RDKAFKA_LIBRARY}) include_directories(${CMAKE_CURRENT_SOURCE_DIR}/../include) - add_executable(kafka_producer kafka_producer.cpp) - add_executable(kafka_consumer kafka_consumer.cpp) + add_executable(kafka_producer EXCLUDE_FROM_ALL kafka_producer.cpp) + add_executable(kafka_consumer EXCLUDE_FROM_ALL kafka_consumer.cpp) add_custom_target(examples DEPENDS kafka_producer kafka_consumer) else() message(STATUS "Disabling examples since boost.program_options was not found") diff --git a/examples/kafka_consumer.cpp b/examples/kafka_consumer.cpp index ee206a8..46c7f3d 100644 --- a/examples/kafka_consumer.cpp +++ b/examples/kafka_consumer.cpp @@ -16,26 +16,22 @@ using cppkafka::Message; namespace po = boost::program_options; -#ifndef CPPKAFKA_HAVE_ZOOKEEPER - static_assert(false, "Examples require the zookeeper extension"); -#endif - bool running = true; int main(int argc, char* argv[]) { - string zookeeper_endpoint; + string brokers; string topic_name; string group_id; po::options_description options("Options"); options.add_options() - ("help,h", "produce this help message") - ("zookeeper", po::value(&zookeeper_endpoint)->required(), - "the zookeeper endpoint") - ("topic", po::value(&topic_name)->required(), - "the topic in which to write to") - ("group-id", po::value(&group_id)->required(), - "the consumer group id") + ("help,h", "produce this help message") + ("brokers", po::value(&brokers)->required(), + "the kafka broker list") + ("topic", po::value(&topic_name)->required(), + "the topic in which to write to") + ("group-id", po::value(&group_id)->required(), + "the consumer group id") ; po::variables_map vm; @@ -56,7 +52,7 @@ int main(int argc, char* argv[]) { // Construct the configuration Configuration config; - config.set("zookeeper", zookeeper_endpoint); + config.set("metadata.broker.list", brokers); config.set("group.id", group_id); // Disable auto commit config.set("enable.auto.commit", false); @@ -67,6 +63,8 @@ int main(int argc, char* argv[]) { // Subscribe to the topic consumer.subscribe({ topic_name }); + cout << "Consuming messages from topic " << topic_name << endl; + // Now read lines and write them into kafka while (running) { // Try to consume a message diff --git a/examples/kafka_producer.cpp b/examples/kafka_producer.cpp index ed2f3fd..ebbee8d 100644 --- a/examples/kafka_producer.cpp +++ b/examples/kafka_producer.cpp @@ -18,20 +18,16 @@ using cppkafka::Partition; namespace po = boost::program_options; -#ifndef CPPKAFKA_HAVE_ZOOKEEPER - static_assert(false, "Examples require the zookeeper extension"); -#endif - int main(int argc, char* argv[]) { - string zookeeper_endpoint; + string brokers; string topic_name; int partition_value = -1; po::options_description options("Options"); options.add_options() ("help,h", "produce this help message") - ("zookeeper", po::value(&zookeeper_endpoint)->required(), - "the zookeeper endpoint") + ("brokers", po::value(&brokers)->required(), + "the kafka broker list") ("topic", po::value(&topic_name)->required(), "the topic in which to write to") ("partition", po::value(&partition_value), @@ -60,13 +56,15 @@ int main(int argc, char* argv[]) { // Construct the configuration Configuration config; - config.set("zookeeper", zookeeper_endpoint); + config.set("metadata.broker.list", brokers); // Create the producer Producer producer(config); // Get the topic we want Topic topic = producer.get_topic(topic_name); + cout << "Producing messages into topic " << topic_name << endl; + // Now read lines and write them into kafka string line; while (getline(cin, line)) { diff --git a/include/cppkafka/CMakeLists.txt b/include/cppkafka/CMakeLists.txt index 010e084..2f8f6f2 100644 --- a/include/cppkafka/CMakeLists.txt +++ b/include/cppkafka/CMakeLists.txt @@ -4,4 +4,3 @@ install( DESTINATION include/cppkafka COMPONENT Headers ) -add_subdirectory(zookeeper) diff --git a/include/cppkafka/config.h.in b/include/cppkafka/config.h.in deleted file mode 100644 index e13ea4e..0000000 --- a/include/cppkafka/config.h.in +++ /dev/null @@ -1,7 +0,0 @@ -#ifndef CPPKAFKA_CONFIG_H -#define CPPKAFKA_CONFIG_H - -/* Define if the zookeeper extensions are enabled */ -#cmakedefine CPPKAFKA_HAVE_ZOOKEEPER - -#endif // CPPKAFKA_CONFIG_H diff --git a/include/cppkafka/configuration.h b/include/cppkafka/configuration.h index d59cb64..0e24e6d 100644 --- a/include/cppkafka/configuration.h +++ b/include/cppkafka/configuration.h @@ -87,16 +87,6 @@ public: * * This will call rd_kafka_conf_set under the hood. * - * If the zookeeper extension is enabled (cppkafka is build with -DENABLE_ZOOKEEPER=1), then - * this accepts 2 extra attribute names: - * - * - "zookeeper" which indicates the zookeeper endpoint to connect to - * - "zookeeper.receive.timeout.ms" which indicates the zookeeper receive timeout - * - * When the "zookeeper" attribute is used, a Consumer or Producer constructed using this - * configuration will use zookeeper under the hood to get the broker list and watch for - * broker updates. - * * \param name The name of the attribute * \param value The value of the attribute */ @@ -204,10 +194,8 @@ public: */ boost::optional& get_default_topic_configuration(); private: - static const std::unordered_set VALID_EXTENSIONS; using HandlePtr = ClonablePtr; - using PropertiesMap = std::unordered_map; Configuration(rd_kafka_conf_t* ptr); static HandlePtr make_handle(rd_kafka_conf_t* ptr); @@ -221,7 +209,6 @@ private: LogCallback log_callback_; StatsCallback stats_callback_; SocketCallback socket_callback_; - PropertiesMap extension_properties_; }; } // cppkafka diff --git a/include/cppkafka/consumer.h b/include/cppkafka/consumer.h index 5211d2d..56230ca 100644 --- a/include/cppkafka/consumer.h +++ b/include/cppkafka/consumer.h @@ -52,10 +52,9 @@ class TopicConfiguration; * Semi-simple code showing how to use this class * * \code - * // Create a configuration and set the group.id and zookeeper fields + * // Create a configuration and set the group.id and broker list fields * Configuration config; - * // This is only valid when using the zookeeper extension - * config.set("zookeeper", "127.0.0.1:2181"); + * config.set("metadata.broker.list", "127.0.0.1:9092"); * config.set("group.id", "foo"); * * // Create a consumer diff --git a/include/cppkafka/exceptions.h b/include/cppkafka/exceptions.h index 4b912d6..3088e02 100644 --- a/include/cppkafka/exceptions.h +++ b/include/cppkafka/exceptions.h @@ -76,14 +76,6 @@ private: rd_kafka_resp_err_t error_code_; }; -/** - * An exception when using zookeeper - */ -class ZookeeperException : public Exception { -public: - using Exception::Exception; -}; - } // cppkafka #endif // CPPKAFKA_EXCEPTIONS_H diff --git a/include/cppkafka/kafka_handle_base.h b/include/cppkafka/kafka_handle_base.h index e45d348..0b71276 100644 --- a/include/cppkafka/kafka_handle_base.h +++ b/include/cppkafka/kafka_handle_base.h @@ -42,10 +42,6 @@ #include "topic_partition_list.h" #include "topic_configuration.h" #include "configuration.h" -#include "config.h" -#ifdef CPPKAFKA_HAVE_ZOOKEEPER - #include "zookeeper/zookeeper_subscription.h" -#endif // CPPKAFKA_HAVE_ZOOKEEPER namespace cppkafka { @@ -180,10 +176,6 @@ private: Configuration config_; TopicConfigurationMap topic_configurations_; std::mutex topic_configurations_mutex_; - #ifdef CPPKAFKA_HAVE_ZOOKEEPER - // This could be an optional but apparently move construction is only supported as of 1.56 - std::unique_ptr zookeeper_subscription_; - #endif // CPPKAFKA_HAVE_ZOOKEEPER }; } // cppkafka diff --git a/include/cppkafka/producer.h b/include/cppkafka/producer.h index 834e931..1125ca4 100644 --- a/include/cppkafka/producer.h +++ b/include/cppkafka/producer.h @@ -55,9 +55,9 @@ class TopicConfiguration; * In order to produce messages you could do something like: * * \code - * // Use the zookeeper extension + * // Set the broker list * Configuration config; - * config.set("zookeeper", "127.0.0.1:2181"); + * config.set("metadata.broker.list", "127.0.0.1:9092"); * * // Create a producer * Producer producer(config); diff --git a/include/cppkafka/zookeeper/CMakeLists.txt b/include/cppkafka/zookeeper/CMakeLists.txt deleted file mode 100644 index 9041bee..0000000 --- a/include/cppkafka/zookeeper/CMakeLists.txt +++ /dev/null @@ -1,6 +0,0 @@ -file(GLOB INCLUDE_FILES "*.h") -install( - FILES ${INCLUDE_FILES} - DESTINATION include/cppkafka/zookeeper - COMPONENT Headers -) diff --git a/include/cppkafka/zookeeper/zookeeper_pool.h b/include/cppkafka/zookeeper/zookeeper_pool.h deleted file mode 100644 index 37fb0d4..0000000 --- a/include/cppkafka/zookeeper/zookeeper_pool.h +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Copyright (c) 2016, Matias Fontanini - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#ifndef CPPKAFKA_ZOOKEEPER_POOL_H -#define CPPKAFKA_ZOOKEEPER_POOL_H - -#include -#include -#include -#include -#include "zookeeper/zookeeper_watcher.h" - -namespace cppkafka { - -class ZookeeperSubscription; - -/** - * \brief Pool of zookeeper handles - * - * This class is used internally by cppkafka. - */ -class ZookeeperPool { -public: - /** - * Get singleton instance - */ - static ZookeeperPool& instance(); - - /** - * Subscribe to the given endpoint - * - * \param endpoint The zookeeper endpoint to subscribe to - * \param receive_timeout The zookeeper receive timeout - * \param callback The callback to be executed on updates - * - * \return A ZookeeperSubscription that will auto-unsubscribe upon destruction - */ - ZookeeperSubscription subscribe(const std::string& endpoint, - std::chrono::milliseconds receive_timeout, - ZookeeperWatcher::WatcherCallback callback); - - /** - * Unsubscribes from a previous subscription - * - * \param subscriber The subscriber return by a previous call to ZookeeperPool::subscribe - */ - void unsubscribe(const ZookeeperSubscription& subscriber); - - /** - * \brief Gets the broker list for the given zookeeper endpoint - * - * Requires having previously called subscribe for this endpoint at least once. - * - * \param endpoint The endpoint for which to get the broker list - */ - std::string get_brokers(const std::string& endpoint); - - /** - * Gets the amount of subscribers for the given zookeeper endpoint - */ - size_t get_subscriber_count(const std::string& endpoint) const; -private: - using WatchersMap = std::map; - - WatchersMap watchers_; - mutable std::mutex watchers_mutex_; -}; - -} // cppkafka - -#endif // CPPKAFKA_ZOOKEEPER_POOL_H diff --git a/include/cppkafka/zookeeper/zookeeper_subscription.h b/include/cppkafka/zookeeper/zookeeper_subscription.h deleted file mode 100644 index c4df39f..0000000 --- a/include/cppkafka/zookeeper/zookeeper_subscription.h +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Copyright (c) 2016, Matias Fontanini - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#ifndef CPPKAFKA_ZOOKEEPER_SUBSCRIPTION_H -#define CPPKAFKA_ZOOKEEPER_SUBSCRIPTION_H - -#include - -namespace cppkafka { - -/** - * \cond - */ -class ZookeeperSubscription { -public: - ZookeeperSubscription(std::string endpoint, std::string subscription_id); - ZookeeperSubscription(ZookeeperSubscription&&) = default; - ZookeeperSubscription(const ZookeeperSubscription&) = delete; - ZookeeperSubscription& operator=(ZookeeperSubscription&&); - ZookeeperSubscription& operator=(const ZookeeperSubscription&) = delete; - ~ZookeeperSubscription(); - - const std::string& get_endpoint() const; - - const std::string& get_subscription_id() const; -private: - std::string endpoint_; - std::string subscription_id_; -}; - -/** - * \endcond - */ - -} // cppkafka - -#endif // CPPKAFKA_ZOOKEEPER_SUBSCRIPTION_H diff --git a/include/cppkafka/zookeeper/zookeeper_watcher.h b/include/cppkafka/zookeeper/zookeeper_watcher.h deleted file mode 100644 index 8a722d8..0000000 --- a/include/cppkafka/zookeeper/zookeeper_watcher.h +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Copyright (c) 2016, Matias Fontanini - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#ifndef CPPKAFKA_ZOOKEEPER_WATCHER_H -#define CPPKAFKA_ZOOKEEPER_WATCHER_H - -#include -#include -#include -#include -#include -#include -#include - -namespace cppkafka { - -/** - * \cond - */ -class ZookeeperWatcher { -public: - static const std::chrono::milliseconds DEFAULT_RECEIVE_TIMEOUT; - - using WatcherCallback = std::function; - - ZookeeperWatcher(const std::string& endpoint); - ZookeeperWatcher(const std::string& endpoint, std::chrono::milliseconds receive_timeout); - - void setup_watcher(); - - std::string subscribe(WatcherCallback callback); - void unsubscribe(const std::string& id); - - std::string get_brokers(); - size_t get_subscriber_count() const; -private: - static const std::string BROKERS_PATH; - - using HandlePtr = std::unique_ptr; - using CallbackMap = std::map; - - static void handle_event_proxy(zhandle_t* zh, int type, int state, const char* path, - void* ctx); - void handle_event(int type, int state, const char* path); - std::string generate_id(); - - HandlePtr handle_; - CallbackMap callbacks_; - mutable std::mutex callbacks_mutex_; - size_t id_counter_{0}; -}; - -/** - * \endcond - */ - -} // cppkafka - -#endif // CPPKAFKA_ZOOKEEPER_WATCHER_H diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index a33acfa..adc904b 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -15,17 +15,6 @@ set(SOURCES consumer.cpp ) -if (ENABLE_ZOOKEEPER) - add_definitions("-DCPPKAFKA_ENABLE_ZOOKEEPER=1") - set(ZOOKEEPER_SOURCES - ${CMAKE_CURRENT_SOURCE_DIR}/zookeeper/zookeeper_watcher.cpp - ${CMAKE_CURRENT_SOURCE_DIR}/zookeeper/zookeeper_pool.cpp - ${CMAKE_CURRENT_SOURCE_DIR}/zookeeper/zookeeper_subscription.cpp) - # Add json library as include dir (header only) - include_directories(SYSTEM ${CMAKE_CURRENT_SOURCE_DIR}/../third_party/json) - set(SOURCES ${SOURCES} ${ZOOKEEPER_SOURCES}) -endif() - add_library(cppkafka ${SOURCES}) install( diff --git a/src/configuration.cpp b/src/configuration.cpp index 6de5499..51b8e1b 100644 --- a/src/configuration.cpp +++ b/src/configuration.cpp @@ -34,7 +34,6 @@ #include "message.h" #include "producer.h" #include "consumer.h" -#include "config.h" using std::string; using std::move; @@ -47,12 +46,6 @@ using std::chrono::milliseconds; namespace cppkafka { -const unordered_set Configuration::VALID_EXTENSIONS = { - #ifdef CPPKAFKA_HAVE_ZOOKEEPER - "zookeeper", "zookeeper.receive.timeout.ms" - #endif // CPPKAFKA_HAVE_ZOOKEEPER -}; - // Callback proxies void delivery_report_callback_proxy(rd_kafka_t*, const rd_kafka_message_t* msg, void *opaque) { @@ -133,18 +126,12 @@ Configuration::Configuration(rd_kafka_conf_t* ptr) } void Configuration::set(const string& name, const string& value) { - if (VALID_EXTENSIONS.count(name)) { - // This is one of cppkafka's extensions - extension_properties_.emplace(name, value); - } - else { - char error_buffer[512]; - rd_kafka_conf_res_t result; - result = rd_kafka_conf_set(handle_.get(), name.data(), value.data(), error_buffer, - sizeof(error_buffer)); - if (result != RD_KAFKA_CONF_OK) { - throw ConfigException(name, error_buffer); - } + char error_buffer[512]; + rd_kafka_conf_res_t result; + result = rd_kafka_conf_set(handle_.get(), name.data(), value.data(), error_buffer, + sizeof(error_buffer)); + if (result != RD_KAFKA_CONF_OK) { + throw ConfigException(name, error_buffer); } } @@ -188,13 +175,8 @@ void Configuration::set_default_topic_configuration(optional } bool Configuration::has_property(const string& name) const { - if (VALID_EXTENSIONS.count(name)) { - return extension_properties_.count(name) == 1; - } - else { - size_t size = 0; - return rd_kafka_conf_get(handle_.get(), name.data(), nullptr, &size) == RD_KAFKA_CONF_OK; - } + size_t size = 0; + return rd_kafka_conf_get(handle_.get(), name.data(), nullptr, &size) == RD_KAFKA_CONF_OK; } rd_kafka_conf_t* Configuration::get_handle() const { @@ -202,23 +184,14 @@ rd_kafka_conf_t* Configuration::get_handle() const { } string Configuration::get(const string& name) const { - if (VALID_EXTENSIONS.count(name)) { - auto iter = extension_properties_.find(name); - if (iter == extension_properties_.end()) { - throw ConfigOptionNotFound(name); - } - return iter->second; - } - else { - size_t size = 0; - auto result = rd_kafka_conf_get(handle_.get(), name.data(), nullptr, &size); - if (result != RD_KAFKA_CONF_OK) { - throw ConfigOptionNotFound(name); - } - vector buffer(size); - rd_kafka_conf_get(handle_.get(), name.data(), buffer.data(), &size); - return string(buffer.data()); + size_t size = 0; + auto result = rd_kafka_conf_get(handle_.get(), name.data(), nullptr, &size); + if (result != RD_KAFKA_CONF_OK) { + throw ConfigOptionNotFound(name); } + vector buffer(size); + rd_kafka_conf_get(handle_.get(), name.data(), buffer.data(), &size); + return string(buffer.data()); } const Configuration::DeliveryReportCallback& Configuration::get_delivery_report_callback() const { diff --git a/src/kafka_handle_base.cpp b/src/kafka_handle_base.cpp index 234dfc8..f7a0c8d 100644 --- a/src/kafka_handle_base.cpp +++ b/src/kafka_handle_base.cpp @@ -31,9 +31,6 @@ #include "exceptions.h" #include "topic.h" #include "topic_partition_list.h" -#ifdef CPPKAFKA_HAVE_ZOOKEEPER - #include "zookeeper/zookeeper_pool.h" -#endif // CPPKAFKA_HAVE_ZOOKEEPER using std::string; using std::vector; @@ -126,32 +123,6 @@ const Configuration& KafkaHandleBase::get_configuration() const { void KafkaHandleBase::set_handle(rd_kafka_t* handle) { handle_ = HandlePtr(handle, &rd_kafka_destroy); - - #ifdef CPPKAFKA_HAVE_ZOOKEEPER - - if (config_.has_property("zookeeper")) { - string endpoint = config_.get("zookeeper"); - milliseconds timeout = ZookeeperWatcher::DEFAULT_RECEIVE_TIMEOUT; - if (config_.has_property("zookeeper.receive.timeout.ms")) { - try { - timeout = milliseconds(stoi(config_.get("zookeeper.receive.timeout.ms"))); - } - catch (exception&) { - throw ZookeeperException("Invalid zookeeper receive timeout"); - } - } - auto& pool = ZookeeperPool::instance(); - auto callback = [&](const string& brokers) { - // Add the brokers and poll - rd_kafka_brokers_add(handle_.get(), brokers.data()); - rd_kafka_poll(handle_.get(), 10); - }; - ZookeeperSubscription subscriber = pool.subscribe(endpoint, timeout, callback); - zookeeper_subscription_.reset(new ZookeeperSubscription(move(subscriber))); - callback(pool.get_brokers(endpoint)); - } - - #endif // CPPKAFKA_HAVE_ZOOKEEPER } Topic KafkaHandleBase::get_topic(const string& name, rd_kafka_topic_conf_t* conf) { diff --git a/src/zookeeper/zookeeper_pool.cpp b/src/zookeeper/zookeeper_pool.cpp deleted file mode 100644 index 9aa4174..0000000 --- a/src/zookeeper/zookeeper_pool.cpp +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Copyright (c) 2016, Matias Fontanini - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#include "zookeeper/zookeeper_pool.h" -#include "zookeeper/zookeeper_subscription.h" -#include "exceptions.h" - -using std::string; -using std::mutex; -using std::lock_guard; -using std::forward_as_tuple; -using std::piecewise_construct; -using std::chrono::milliseconds; - -namespace cppkafka { - -ZookeeperPool& ZookeeperPool::instance() { - static ZookeeperPool the_instance; - return the_instance; -} - -ZookeeperSubscription ZookeeperPool::subscribe(const string& endpoint, - milliseconds receive_timeout, - ZookeeperWatcher::WatcherCallback callback) { - lock_guard _(watchers_mutex_); - auto iter = watchers_.find(endpoint); - if (iter == watchers_.end()) { - iter = watchers_.emplace(piecewise_construct, forward_as_tuple(endpoint), - forward_as_tuple(endpoint, receive_timeout)).first; - } - string id = iter->second.subscribe(move(callback)); - return ZookeeperSubscription(endpoint, id); -} - -void ZookeeperPool::unsubscribe(const ZookeeperSubscription& subscriber) { - lock_guard _(watchers_mutex_); - auto iter = watchers_.find(subscriber.get_endpoint()); - if (iter != watchers_.end()) { - iter->second.unsubscribe(subscriber.get_subscription_id()); - } -} - -string ZookeeperPool::get_brokers(const string& endpoint) { - lock_guard _(watchers_mutex_); - auto iter = watchers_.find(endpoint); - if (iter == watchers_.end()) { - throw ZookeeperException("No zookeeper watcher for given endpoint"); - } - return iter->second.get_brokers(); -} - -size_t ZookeeperPool::get_subscriber_count(const string& endpoint) const { - lock_guard _(watchers_mutex_); - auto iter = watchers_.find(endpoint); - if (iter == watchers_.end()) { - return 0; - } - else { - return iter->second.get_subscriber_count(); - } -} - -} // cppkafka diff --git a/src/zookeeper/zookeeper_subscription.cpp b/src/zookeeper/zookeeper_subscription.cpp deleted file mode 100644 index 68fee1a..0000000 --- a/src/zookeeper/zookeeper_subscription.cpp +++ /dev/null @@ -1,25 +0,0 @@ -#include "zookeeper/zookeeper_subscription.h" -#include "zookeeper/zookeeper_pool.h" - -using std::string; - -namespace cppkafka { - -ZookeeperSubscription::ZookeeperSubscription(string endpoint, string subscription_id) -: endpoint_(move(endpoint)), subscription_id_(move(subscription_id)) { - -} - -ZookeeperSubscription::~ZookeeperSubscription() { - ZookeeperPool::instance().unsubscribe(*this); -} - -const string& ZookeeperSubscription::get_endpoint() const { - return endpoint_; -} - -const string& ZookeeperSubscription::get_subscription_id() const { - return subscription_id_; -} - -} // cppkafka diff --git a/src/zookeeper/zookeeper_watcher.cpp b/src/zookeeper/zookeeper_watcher.cpp deleted file mode 100644 index bf57e56..0000000 --- a/src/zookeeper/zookeeper_watcher.cpp +++ /dev/null @@ -1,150 +0,0 @@ -/* - * Copyright (c) 2016, Matias Fontanini - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#include -#include -#include -#include -#include "zookeeper/zookeeper_watcher.h" -#include "exceptions.h" - -using std::string; -using std::lock_guard; -using std::mutex; -using std::unique_ptr; -using std::ostringstream; -using std::runtime_error; -using std::make_shared; -using std::chrono::milliseconds; -using std::chrono::system_clock; - -using picojson::value; -using picojson::object; - -namespace cppkafka { - -const milliseconds ZookeeperWatcher::DEFAULT_RECEIVE_TIMEOUT{10000}; -const string ZookeeperWatcher::BROKERS_PATH = "/brokers/ids"; - -ZookeeperWatcher::ZookeeperWatcher(const string& endpoint) -: ZookeeperWatcher(endpoint, DEFAULT_RECEIVE_TIMEOUT) { - -} - -ZookeeperWatcher::ZookeeperWatcher(const string& endpoint, milliseconds receive_timeout) -: handle_(nullptr, nullptr) { - auto raw_handle = zookeeper_init(endpoint.data(), &ZookeeperWatcher::handle_event_proxy, - receive_timeout.count(), nullptr, this, 0); - if (!raw_handle) { - // TODO: make this a proper exception - throw runtime_error("Failed to create zookeeper handle"); - } - handle_ = HandlePtr(raw_handle, &zookeeper_close); -} - -string ZookeeperWatcher::subscribe(WatcherCallback callback) { - lock_guard _(callbacks_mutex_); - string id = generate_id(); - callbacks_.emplace(id, move(callback)); - return id; -} - -void ZookeeperWatcher::unsubscribe(const string& id) { - lock_guard _(callbacks_mutex_); - callbacks_.erase(id); -} - -string ZookeeperWatcher::get_brokers() { - auto handle = handle_.get(); - using VectorPtr = unique_ptr; - String_vector brokers; - if (zoo_get_children(handle, BROKERS_PATH.data(), 1, &brokers) != ZOK) { - throw ZookeeperException("Failed to get broker list from zookeeper"); - } - // RAII up this pointer - ostringstream oss; - VectorPtr _(&brokers, &deallocate_String_vector); - for (int i = 0; i < brokers.count; i++) { - char config_line[1024]; - string path = "/brokers/ids/" + string(brokers.data[i]); - int config_len = sizeof(config_line); - zoo_get(handle, path.data(), 0, config_line, &config_len, NULL); - if (config_len > 0) { - config_line[config_len] = 0; - - value root; - string error = picojson::parse(root, config_line); - if (!error.empty() || !root.is()) { - throw ZookeeperException("Failed to parse zookeeper json: " + error); - } - const value::object& json_object = root.get(); - const value& host_json = json_object.at("host"); - const value& port_json = json_object.at("port"); - if (!host_json.is() || !port_json.is()) { - throw ZookeeperException("Invalid JSON received from zookeeper"); - } - string host = host_json.get(); - int port = port_json.get(); - if (i != 0) { - oss << ","; - } - oss << host << ":" << port; - } - } - return oss.str(); -} - -size_t ZookeeperWatcher::get_subscriber_count() const { - lock_guard _(callbacks_mutex_); - return callbacks_.size(); -} - -void ZookeeperWatcher::handle_event_proxy(zhandle_t*, int type, int state, const char* path, - void* ctx) { - auto self = static_cast(ctx); - self->handle_event(type, state, path); -} - -void ZookeeperWatcher::handle_event(int type, int state, const char* path) { - if (type == ZOO_CHILD_EVENT && path == BROKERS_PATH) { - string brokers = get_brokers(); - lock_guard _(callbacks_mutex_); - for (const auto& callbackPair : callbacks_) { - callbackPair.second(brokers); - } - } -} - -string ZookeeperWatcher::generate_id() { - ostringstream oss; - oss << id_counter_++ << "-" << system_clock::now().time_since_epoch().count(); - return oss.str(); -} - -} // cppkafka diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 6f941d6..ec325c3 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -6,10 +6,6 @@ link_libraries(cppkafka ${RDKAFKA_LIBRARY} gtest gtest_main pthread) set(KAFKA_TEST_INSTANCE "kafka-vm:9092" CACHE STRING "The kafka instance to which to connect to run tests") - -set(ZOOKEEPER_TEST_INSTANCE "kafka-vm:2181" - CACHE STRING "The zookeeper instance to which to connect to run tests") - add_custom_target(tests) macro(create_test test_name) @@ -19,19 +15,9 @@ macro(create_test test_name) endmacro() add_definitions("-DKAFKA_TEST_INSTANCE=\"${KAFKA_TEST_INSTANCE}\"") -add_definitions("-DZOOKEEPER_TEST_INSTANCE=\"${ZOOKEEPER_TEST_INSTANCE}\"") - -if (ENABLE_ZOOKEEPER) - link_libraries(${ZOOKEEPER_LIBRARY}) -endif() - create_test(consumer) create_test(producer) create_test(kafka_handle_base) create_test(topic_partition_list) create_test(configuration) create_test(buffer) - -if (ENABLE_ZOOKEEPER) - create_test(zookeeper_watcher) -endif() diff --git a/tests/producer_test.cpp b/tests/producer_test.cpp index a7a08ce..3871257 100644 --- a/tests/producer_test.cpp +++ b/tests/producer_test.cpp @@ -272,45 +272,3 @@ TEST_F(ProducerTest, PartitionerCallbackOnDefaultTopicConfig) { EXPECT_EQ(partition, message.get_partition()); EXPECT_TRUE(callback_called); } - -#ifdef CPPKAFKA_HAVE_ZOOKEEPER - -#include "zookeeper/zookeeper_pool.h" - -TEST_F(ProducerTest, ConnectUsingZookeeper) { - int partition = 0; - - Configuration producer_config; - producer_config.set("zookeeper", ZOOKEEPER_TEST_INSTANCE); - - Configuration consumer_config = producer_config; - consumer_config.set("enable.auto.commit", false); - consumer_config.set("group.id", "producer_test"); - - // Create a consumer and assign this topic/partition - Consumer consumer(consumer_config); - consumer.assign({ TopicPartition(KAFKA_TOPIC, partition) }); - ConsumerRunner runner(consumer, 1, 1); - - // Now create a producer and produce a message - Producer producer(producer_config); - Topic topic = producer.get_topic(KAFKA_TOPIC); - string payload = "Hello world! 2"; - string key = "such key"; - producer.produce(topic, partition, payload, key); - runner.try_join(); - - const auto& messages = runner.get_messages(); - ASSERT_EQ(1, messages.size()); - const auto& message = messages[0]; - EXPECT_EQ(Buffer(payload), message.get_payload()); - EXPECT_EQ(Buffer(key), message.get_key()); - EXPECT_EQ(KAFKA_TOPIC, message.get_topic()); - EXPECT_EQ(partition, message.get_partition()); - EXPECT_EQ(0, message.get_error()); - - // We should have 2 watchers - EXPECT_EQ(2, ZookeeperPool::instance().get_subscriber_count(ZOOKEEPER_TEST_INSTANCE)); -} - -#endif // CPPKAFKA_HAVE_ZOOKEEPER diff --git a/tests/zookeeper_watcher_test.cpp b/tests/zookeeper_watcher_test.cpp deleted file mode 100644 index b397f9b..0000000 --- a/tests/zookeeper_watcher_test.cpp +++ /dev/null @@ -1,31 +0,0 @@ -#include -#include "cppkafka/zookeeper/zookeeper_watcher.h" -#include "cppkafka/exceptions.h" - -using std::string; - -using namespace cppkafka; - -class ZookeeperWatcherTest : public testing::Test { -public: - -}; - -TEST_F(ZookeeperWatcherTest, GetBrokers) { - ZookeeperWatcher watcher(ZOOKEEPER_TEST_INSTANCE); - string brokers = watcher.get_brokers(); - EXPECT_EQ(KAFKA_TEST_INSTANCE, brokers); -} - -TEST_F(ZookeeperWatcherTest, InvalidEndpointThrows) { - ZookeeperWatcher watcher("127.0.0.1:1212"); - EXPECT_THROW(watcher.get_brokers(), ZookeeperException); -} - -TEST_F(ZookeeperWatcherTest, SubscribeUnsubscribe) { - ZookeeperWatcher watcher(ZOOKEEPER_TEST_INSTANCE); - string id = watcher.subscribe([](const string&) { }); - EXPECT_EQ(1, watcher.get_subscriber_count()); - watcher.unsubscribe(id); - EXPECT_EQ(0, watcher.get_subscriber_count()); -}