Remove zookeeper support

This commit is contained in:
Matias Fontanini
2016-06-16 20:22:36 -07:00
parent e1c2ee34fe
commit 0fb0afc4f4
27 changed files with 42 additions and 800 deletions

2
.gitignore vendored
View File

@@ -1 +1 @@
include/cppkafka/config.h
build

View File

@@ -12,22 +12,6 @@ include_directories(${Boost_INCLUDE_DIRS})
find_package(RdKafka REQUIRED)
option(ENABLE_ZOOKEEPER "Whether to enable zookeeper support" ON)
if (ENABLE_ZOOKEEPER)
find_package(Zookeeper REQUIRED)
message(STATUS "Found zookeeper library")
include_directories(${ZOOKEEPER_INCLUDE_DIR})
set(CPPKAFKA_HAVE_ZOOKEEPER ON)
endif()
# Configuration file
configure_file(
"${CMAKE_CURRENT_SOURCE_DIR}/include/cppkafka/config.h.in"
"${CMAKE_CURRENT_SOURCE_DIR}/include/cppkafka/config.h"
)
add_subdirectory(src)
add_subdirectory(include)

View File

@@ -12,8 +12,6 @@ High level C++ wrapper for _rdkafka_
* _cppkafka_ provides an API to produce messages as well as consuming messages but the latter is only supported via the high level consumer API (introduced on kafka 0.9). Therefore, you need **rdkakfa >= 0.9** in order to use it. Other wrapped functionalities are also provided, like fetching metadata, offsets, etc.
* _cppkafka_ adds smooth **zookeeper** support: you only need to add a _"zookeeper"_ attribute to your configuration object and your producer/consumer will automatically load the initial broker list from it while also listening for broker list updates.
# It's simple!
_cppkafka_'s API is simple. For example, this code creates a producer writes a message into some partition:
@@ -26,7 +24,7 @@ using namespace std;
int main() {
// Create the config
Configuration config;
config.set("zookeeper", "127.0.0.1:2181");
config.set("metadata.broker.list", "127.0.0.1:2181");
// Create the producer
Producer producer(config);
// Get the topic we'll write into
@@ -42,8 +40,7 @@ int main() {
In order to compile _cppkafka_ you need:
* _rdkafka >= 0.9_
* Optionally _libzookeeper_. This is used by default, but you can disable it invoking cmake
using the `-DENABLE_ZOOKEEPER=0` argument.
* _CMake_
* A compiler with good C++11 support. This was tested successfully on _g++ 4.8.3_.
* The boost library. _cppkafka_ only requires boost.optional, which is a header only library,
so this doesn't add any additional runtime dependencies.
@@ -63,4 +60,3 @@ If you want to use _cppkafka_, you'll need to link your application with:
* _cppkafka_
* _rdkafka_
* _zookeeper_ if you enabled it during build time (it's on by default).

View File

@@ -1,27 +0,0 @@
find_path(ZOOKEEPER_ROOT_DIR
NAMES include/zookeeper/zookeeper.h
)
find_path(ZOOKEEPER_INCLUDE_DIR
NAMES zookeeper/zookeeper.h
HINTS ${ZOOKEEPER_ROOT_DIR}/include
)
set (HINT_DIR ${ZOOKEEPER_ROOT_DIR}/lib)
find_library(ZOOKEEPER_LIBRARY
NAMES zookeeper_mt zookeeper_st
HINTS ${HINT_DIR}
)
include(FindPackageHandleStandardArgs)
find_package_handle_standard_args(ZOOKEEPER DEFAULT_MSG
ZOOKEEPER_LIBRARY
ZOOKEEPER_INCLUDE_DIR
)
mark_as_advanced(
ZOOKEEPER_ROOT_DIR
ZOOKEEPER_INCLUDE_DIR
ZOOKEEPER_LIBRARY
)

View File

@@ -1,12 +1,12 @@
find_package(Boost COMPONENTS program_options)
if (Boost_PROGRAM_OPTIONS_FOUND)
link_libraries(${Boost_LIBRARIES} cppkafka ${RDKAFKA_LIBRARY} ${ZOOKEEPER_LIBRARY})
link_libraries(${Boost_LIBRARIES} cppkafka ${RDKAFKA_LIBRARY})
include_directories(${CMAKE_CURRENT_SOURCE_DIR}/../include)
add_executable(kafka_producer kafka_producer.cpp)
add_executable(kafka_consumer kafka_consumer.cpp)
add_executable(kafka_producer EXCLUDE_FROM_ALL kafka_producer.cpp)
add_executable(kafka_consumer EXCLUDE_FROM_ALL kafka_consumer.cpp)
add_custom_target(examples DEPENDS kafka_producer kafka_consumer)
else()
message(STATUS "Disabling examples since boost.program_options was not found")

View File

@@ -16,22 +16,18 @@ using cppkafka::Message;
namespace po = boost::program_options;
#ifndef CPPKAFKA_HAVE_ZOOKEEPER
static_assert(false, "Examples require the zookeeper extension");
#endif
bool running = true;
int main(int argc, char* argv[]) {
string zookeeper_endpoint;
string brokers;
string topic_name;
string group_id;
po::options_description options("Options");
options.add_options()
("help,h", "produce this help message")
("zookeeper", po::value<string>(&zookeeper_endpoint)->required(),
"the zookeeper endpoint")
("brokers", po::value<string>(&brokers)->required(),
"the kafka broker list")
("topic", po::value<string>(&topic_name)->required(),
"the topic in which to write to")
("group-id", po::value<string>(&group_id)->required(),
@@ -56,7 +52,7 @@ int main(int argc, char* argv[]) {
// Construct the configuration
Configuration config;
config.set("zookeeper", zookeeper_endpoint);
config.set("metadata.broker.list", brokers);
config.set("group.id", group_id);
// Disable auto commit
config.set("enable.auto.commit", false);
@@ -67,6 +63,8 @@ int main(int argc, char* argv[]) {
// Subscribe to the topic
consumer.subscribe({ topic_name });
cout << "Consuming messages from topic " << topic_name << endl;
// Now read lines and write them into kafka
while (running) {
// Try to consume a message

View File

@@ -18,20 +18,16 @@ using cppkafka::Partition;
namespace po = boost::program_options;
#ifndef CPPKAFKA_HAVE_ZOOKEEPER
static_assert(false, "Examples require the zookeeper extension");
#endif
int main(int argc, char* argv[]) {
string zookeeper_endpoint;
string brokers;
string topic_name;
int partition_value = -1;
po::options_description options("Options");
options.add_options()
("help,h", "produce this help message")
("zookeeper", po::value<string>(&zookeeper_endpoint)->required(),
"the zookeeper endpoint")
("brokers", po::value<string>(&brokers)->required(),
"the kafka broker list")
("topic", po::value<string>(&topic_name)->required(),
"the topic in which to write to")
("partition", po::value<int>(&partition_value),
@@ -60,13 +56,15 @@ int main(int argc, char* argv[]) {
// Construct the configuration
Configuration config;
config.set("zookeeper", zookeeper_endpoint);
config.set("metadata.broker.list", brokers);
// Create the producer
Producer producer(config);
// Get the topic we want
Topic topic = producer.get_topic(topic_name);
cout << "Producing messages into topic " << topic_name << endl;
// Now read lines and write them into kafka
string line;
while (getline(cin, line)) {

View File

@@ -4,4 +4,3 @@ install(
DESTINATION include/cppkafka
COMPONENT Headers
)
add_subdirectory(zookeeper)

View File

@@ -1,7 +0,0 @@
#ifndef CPPKAFKA_CONFIG_H
#define CPPKAFKA_CONFIG_H
/* Define if the zookeeper extensions are enabled */
#cmakedefine CPPKAFKA_HAVE_ZOOKEEPER
#endif // CPPKAFKA_CONFIG_H

View File

@@ -87,16 +87,6 @@ public:
*
* This will call rd_kafka_conf_set under the hood.
*
* If the zookeeper extension is enabled (cppkafka is build with -DENABLE_ZOOKEEPER=1), then
* this accepts 2 extra attribute names:
*
* - "zookeeper" which indicates the zookeeper endpoint to connect to
* - "zookeeper.receive.timeout.ms" which indicates the zookeeper receive timeout
*
* When the "zookeeper" attribute is used, a Consumer or Producer constructed using this
* configuration will use zookeeper under the hood to get the broker list and watch for
* broker updates.
*
* \param name The name of the attribute
* \param value The value of the attribute
*/
@@ -204,10 +194,8 @@ public:
*/
boost::optional<TopicConfiguration>& get_default_topic_configuration();
private:
static const std::unordered_set<std::string> VALID_EXTENSIONS;
using HandlePtr = ClonablePtr<rd_kafka_conf_t, decltype(&rd_kafka_conf_destroy),
decltype(&rd_kafka_conf_dup)>;
using PropertiesMap = std::unordered_map<std::string, std::string>;
Configuration(rd_kafka_conf_t* ptr);
static HandlePtr make_handle(rd_kafka_conf_t* ptr);
@@ -221,7 +209,6 @@ private:
LogCallback log_callback_;
StatsCallback stats_callback_;
SocketCallback socket_callback_;
PropertiesMap extension_properties_;
};
} // cppkafka

View File

@@ -52,10 +52,9 @@ class TopicConfiguration;
* Semi-simple code showing how to use this class
*
* \code
* // Create a configuration and set the group.id and zookeeper fields
* // Create a configuration and set the group.id and broker list fields
* Configuration config;
* // This is only valid when using the zookeeper extension
* config.set("zookeeper", "127.0.0.1:2181");
* config.set("metadata.broker.list", "127.0.0.1:9092");
* config.set("group.id", "foo");
*
* // Create a consumer

View File

@@ -76,14 +76,6 @@ private:
rd_kafka_resp_err_t error_code_;
};
/**
* An exception when using zookeeper
*/
class ZookeeperException : public Exception {
public:
using Exception::Exception;
};
} // cppkafka
#endif // CPPKAFKA_EXCEPTIONS_H

View File

@@ -42,10 +42,6 @@
#include "topic_partition_list.h"
#include "topic_configuration.h"
#include "configuration.h"
#include "config.h"
#ifdef CPPKAFKA_HAVE_ZOOKEEPER
#include "zookeeper/zookeeper_subscription.h"
#endif // CPPKAFKA_HAVE_ZOOKEEPER
namespace cppkafka {
@@ -180,10 +176,6 @@ private:
Configuration config_;
TopicConfigurationMap topic_configurations_;
std::mutex topic_configurations_mutex_;
#ifdef CPPKAFKA_HAVE_ZOOKEEPER
// This could be an optional but apparently move construction is only supported as of 1.56
std::unique_ptr<ZookeeperSubscription> zookeeper_subscription_;
#endif // CPPKAFKA_HAVE_ZOOKEEPER
};
} // cppkafka

View File

@@ -55,9 +55,9 @@ class TopicConfiguration;
* In order to produce messages you could do something like:
*
* \code
* // Use the zookeeper extension
* // Set the broker list
* Configuration config;
* config.set("zookeeper", "127.0.0.1:2181");
* config.set("metadata.broker.list", "127.0.0.1:9092");
*
* // Create a producer
* Producer producer(config);

View File

@@ -1,6 +0,0 @@
file(GLOB INCLUDE_FILES "*.h")
install(
FILES ${INCLUDE_FILES}
DESTINATION include/cppkafka/zookeeper
COMPONENT Headers
)

View File

@@ -1,97 +0,0 @@
/*
* Copyright (c) 2016, Matias Fontanini
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef CPPKAFKA_ZOOKEEPER_POOL_H
#define CPPKAFKA_ZOOKEEPER_POOL_H
#include <map>
#include <string>
#include <chrono>
#include <mutex>
#include "zookeeper/zookeeper_watcher.h"
namespace cppkafka {
class ZookeeperSubscription;
/**
* \brief Pool of zookeeper handles
*
* This class is used internally by cppkafka.
*/
class ZookeeperPool {
public:
/**
* Get singleton instance
*/
static ZookeeperPool& instance();
/**
* Subscribe to the given endpoint
*
* \param endpoint The zookeeper endpoint to subscribe to
* \param receive_timeout The zookeeper receive timeout
* \param callback The callback to be executed on updates
*
* \return A ZookeeperSubscription that will auto-unsubscribe upon destruction
*/
ZookeeperSubscription subscribe(const std::string& endpoint,
std::chrono::milliseconds receive_timeout,
ZookeeperWatcher::WatcherCallback callback);
/**
* Unsubscribes from a previous subscription
*
* \param subscriber The subscriber return by a previous call to ZookeeperPool::subscribe
*/
void unsubscribe(const ZookeeperSubscription& subscriber);
/**
* \brief Gets the broker list for the given zookeeper endpoint
*
* Requires having previously called subscribe for this endpoint at least once.
*
* \param endpoint The endpoint for which to get the broker list
*/
std::string get_brokers(const std::string& endpoint);
/**
* Gets the amount of subscribers for the given zookeeper endpoint
*/
size_t get_subscriber_count(const std::string& endpoint) const;
private:
using WatchersMap = std::map<std::string, ZookeeperWatcher>;
WatchersMap watchers_;
mutable std::mutex watchers_mutex_;
};
} // cppkafka
#endif // CPPKAFKA_ZOOKEEPER_POOL_H

View File

@@ -1,63 +0,0 @@
/*
* Copyright (c) 2016, Matias Fontanini
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef CPPKAFKA_ZOOKEEPER_SUBSCRIPTION_H
#define CPPKAFKA_ZOOKEEPER_SUBSCRIPTION_H
#include <string>
namespace cppkafka {
/**
* \cond
*/
class ZookeeperSubscription {
public:
ZookeeperSubscription(std::string endpoint, std::string subscription_id);
ZookeeperSubscription(ZookeeperSubscription&&) = default;
ZookeeperSubscription(const ZookeeperSubscription&) = delete;
ZookeeperSubscription& operator=(ZookeeperSubscription&&);
ZookeeperSubscription& operator=(const ZookeeperSubscription&) = delete;
~ZookeeperSubscription();
const std::string& get_endpoint() const;
const std::string& get_subscription_id() const;
private:
std::string endpoint_;
std::string subscription_id_;
};
/**
* \endcond
*/
} // cppkafka
#endif // CPPKAFKA_ZOOKEEPER_SUBSCRIPTION_H

View File

@@ -1,85 +0,0 @@
/*
* Copyright (c) 2016, Matias Fontanini
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef CPPKAFKA_ZOOKEEPER_WATCHER_H
#define CPPKAFKA_ZOOKEEPER_WATCHER_H
#include <memory>
#include <string>
#include <chrono>
#include <map>
#include <functional>
#include <mutex>
#include <zookeeper/zookeeper.h>
namespace cppkafka {
/**
* \cond
*/
class ZookeeperWatcher {
public:
static const std::chrono::milliseconds DEFAULT_RECEIVE_TIMEOUT;
using WatcherCallback = std::function<void(const std::string& brokers)>;
ZookeeperWatcher(const std::string& endpoint);
ZookeeperWatcher(const std::string& endpoint, std::chrono::milliseconds receive_timeout);
void setup_watcher();
std::string subscribe(WatcherCallback callback);
void unsubscribe(const std::string& id);
std::string get_brokers();
size_t get_subscriber_count() const;
private:
static const std::string BROKERS_PATH;
using HandlePtr = std::unique_ptr<zhandle_t, decltype(&zookeeper_close)>;
using CallbackMap = std::map<std::string, WatcherCallback>;
static void handle_event_proxy(zhandle_t* zh, int type, int state, const char* path,
void* ctx);
void handle_event(int type, int state, const char* path);
std::string generate_id();
HandlePtr handle_;
CallbackMap callbacks_;
mutable std::mutex callbacks_mutex_;
size_t id_counter_{0};
};
/**
* \endcond
*/
} // cppkafka
#endif // CPPKAFKA_ZOOKEEPER_WATCHER_H

View File

@@ -15,17 +15,6 @@ set(SOURCES
consumer.cpp
)
if (ENABLE_ZOOKEEPER)
add_definitions("-DCPPKAFKA_ENABLE_ZOOKEEPER=1")
set(ZOOKEEPER_SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/zookeeper/zookeeper_watcher.cpp
${CMAKE_CURRENT_SOURCE_DIR}/zookeeper/zookeeper_pool.cpp
${CMAKE_CURRENT_SOURCE_DIR}/zookeeper/zookeeper_subscription.cpp)
# Add json library as include dir (header only)
include_directories(SYSTEM ${CMAKE_CURRENT_SOURCE_DIR}/../third_party/json)
set(SOURCES ${SOURCES} ${ZOOKEEPER_SOURCES})
endif()
add_library(cppkafka ${SOURCES})
install(

View File

@@ -34,7 +34,6 @@
#include "message.h"
#include "producer.h"
#include "consumer.h"
#include "config.h"
using std::string;
using std::move;
@@ -47,12 +46,6 @@ using std::chrono::milliseconds;
namespace cppkafka {
const unordered_set<string> Configuration::VALID_EXTENSIONS = {
#ifdef CPPKAFKA_HAVE_ZOOKEEPER
"zookeeper", "zookeeper.receive.timeout.ms"
#endif // CPPKAFKA_HAVE_ZOOKEEPER
};
// Callback proxies
void delivery_report_callback_proxy(rd_kafka_t*, const rd_kafka_message_t* msg, void *opaque) {
@@ -133,11 +126,6 @@ Configuration::Configuration(rd_kafka_conf_t* ptr)
}
void Configuration::set(const string& name, const string& value) {
if (VALID_EXTENSIONS.count(name)) {
// This is one of cppkafka's extensions
extension_properties_.emplace(name, value);
}
else {
char error_buffer[512];
rd_kafka_conf_res_t result;
result = rd_kafka_conf_set(handle_.get(), name.data(), value.data(), error_buffer,
@@ -146,7 +134,6 @@ void Configuration::set(const string& name, const string& value) {
throw ConfigException(name, error_buffer);
}
}
}
void Configuration::set_delivery_report_callback(DeliveryReportCallback callback) {
delivery_report_callback_ = move(callback);
@@ -188,28 +175,15 @@ void Configuration::set_default_topic_configuration(optional<TopicConfiguration>
}
bool Configuration::has_property(const string& name) const {
if (VALID_EXTENSIONS.count(name)) {
return extension_properties_.count(name) == 1;
}
else {
size_t size = 0;
return rd_kafka_conf_get(handle_.get(), name.data(), nullptr, &size) == RD_KAFKA_CONF_OK;
}
}
rd_kafka_conf_t* Configuration::get_handle() const {
return handle_.get();
}
string Configuration::get(const string& name) const {
if (VALID_EXTENSIONS.count(name)) {
auto iter = extension_properties_.find(name);
if (iter == extension_properties_.end()) {
throw ConfigOptionNotFound(name);
}
return iter->second;
}
else {
size_t size = 0;
auto result = rd_kafka_conf_get(handle_.get(), name.data(), nullptr, &size);
if (result != RD_KAFKA_CONF_OK) {
@@ -219,7 +193,6 @@ string Configuration::get(const string& name) const {
rd_kafka_conf_get(handle_.get(), name.data(), buffer.data(), &size);
return string(buffer.data());
}
}
const Configuration::DeliveryReportCallback& Configuration::get_delivery_report_callback() const {
return delivery_report_callback_;

View File

@@ -31,9 +31,6 @@
#include "exceptions.h"
#include "topic.h"
#include "topic_partition_list.h"
#ifdef CPPKAFKA_HAVE_ZOOKEEPER
#include "zookeeper/zookeeper_pool.h"
#endif // CPPKAFKA_HAVE_ZOOKEEPER
using std::string;
using std::vector;
@@ -126,32 +123,6 @@ const Configuration& KafkaHandleBase::get_configuration() const {
void KafkaHandleBase::set_handle(rd_kafka_t* handle) {
handle_ = HandlePtr(handle, &rd_kafka_destroy);
#ifdef CPPKAFKA_HAVE_ZOOKEEPER
if (config_.has_property("zookeeper")) {
string endpoint = config_.get("zookeeper");
milliseconds timeout = ZookeeperWatcher::DEFAULT_RECEIVE_TIMEOUT;
if (config_.has_property("zookeeper.receive.timeout.ms")) {
try {
timeout = milliseconds(stoi(config_.get("zookeeper.receive.timeout.ms")));
}
catch (exception&) {
throw ZookeeperException("Invalid zookeeper receive timeout");
}
}
auto& pool = ZookeeperPool::instance();
auto callback = [&](const string& brokers) {
// Add the brokers and poll
rd_kafka_brokers_add(handle_.get(), brokers.data());
rd_kafka_poll(handle_.get(), 10);
};
ZookeeperSubscription subscriber = pool.subscribe(endpoint, timeout, callback);
zookeeper_subscription_.reset(new ZookeeperSubscription(move(subscriber)));
callback(pool.get_brokers(endpoint));
}
#endif // CPPKAFKA_HAVE_ZOOKEEPER
}
Topic KafkaHandleBase::get_topic(const string& name, rd_kafka_topic_conf_t* conf) {

View File

@@ -1,89 +0,0 @@
/*
* Copyright (c) 2016, Matias Fontanini
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include "zookeeper/zookeeper_pool.h"
#include "zookeeper/zookeeper_subscription.h"
#include "exceptions.h"
using std::string;
using std::mutex;
using std::lock_guard;
using std::forward_as_tuple;
using std::piecewise_construct;
using std::chrono::milliseconds;
namespace cppkafka {
ZookeeperPool& ZookeeperPool::instance() {
static ZookeeperPool the_instance;
return the_instance;
}
ZookeeperSubscription ZookeeperPool::subscribe(const string& endpoint,
milliseconds receive_timeout,
ZookeeperWatcher::WatcherCallback callback) {
lock_guard<mutex> _(watchers_mutex_);
auto iter = watchers_.find(endpoint);
if (iter == watchers_.end()) {
iter = watchers_.emplace(piecewise_construct, forward_as_tuple(endpoint),
forward_as_tuple(endpoint, receive_timeout)).first;
}
string id = iter->second.subscribe(move(callback));
return ZookeeperSubscription(endpoint, id);
}
void ZookeeperPool::unsubscribe(const ZookeeperSubscription& subscriber) {
lock_guard<mutex> _(watchers_mutex_);
auto iter = watchers_.find(subscriber.get_endpoint());
if (iter != watchers_.end()) {
iter->second.unsubscribe(subscriber.get_subscription_id());
}
}
string ZookeeperPool::get_brokers(const string& endpoint) {
lock_guard<mutex> _(watchers_mutex_);
auto iter = watchers_.find(endpoint);
if (iter == watchers_.end()) {
throw ZookeeperException("No zookeeper watcher for given endpoint");
}
return iter->second.get_brokers();
}
size_t ZookeeperPool::get_subscriber_count(const string& endpoint) const {
lock_guard<mutex> _(watchers_mutex_);
auto iter = watchers_.find(endpoint);
if (iter == watchers_.end()) {
return 0;
}
else {
return iter->second.get_subscriber_count();
}
}
} // cppkafka

View File

@@ -1,25 +0,0 @@
#include "zookeeper/zookeeper_subscription.h"
#include "zookeeper/zookeeper_pool.h"
using std::string;
namespace cppkafka {
ZookeeperSubscription::ZookeeperSubscription(string endpoint, string subscription_id)
: endpoint_(move(endpoint)), subscription_id_(move(subscription_id)) {
}
ZookeeperSubscription::~ZookeeperSubscription() {
ZookeeperPool::instance().unsubscribe(*this);
}
const string& ZookeeperSubscription::get_endpoint() const {
return endpoint_;
}
const string& ZookeeperSubscription::get_subscription_id() const {
return subscription_id_;
}
} // cppkafka

View File

@@ -1,150 +0,0 @@
/*
* Copyright (c) 2016, Matias Fontanini
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include <stdexcept>
#include <sstream>
#include <chrono>
#include <picojson.h>
#include "zookeeper/zookeeper_watcher.h"
#include "exceptions.h"
using std::string;
using std::lock_guard;
using std::mutex;
using std::unique_ptr;
using std::ostringstream;
using std::runtime_error;
using std::make_shared;
using std::chrono::milliseconds;
using std::chrono::system_clock;
using picojson::value;
using picojson::object;
namespace cppkafka {
const milliseconds ZookeeperWatcher::DEFAULT_RECEIVE_TIMEOUT{10000};
const string ZookeeperWatcher::BROKERS_PATH = "/brokers/ids";
ZookeeperWatcher::ZookeeperWatcher(const string& endpoint)
: ZookeeperWatcher(endpoint, DEFAULT_RECEIVE_TIMEOUT) {
}
ZookeeperWatcher::ZookeeperWatcher(const string& endpoint, milliseconds receive_timeout)
: handle_(nullptr, nullptr) {
auto raw_handle = zookeeper_init(endpoint.data(), &ZookeeperWatcher::handle_event_proxy,
receive_timeout.count(), nullptr, this, 0);
if (!raw_handle) {
// TODO: make this a proper exception
throw runtime_error("Failed to create zookeeper handle");
}
handle_ = HandlePtr(raw_handle, &zookeeper_close);
}
string ZookeeperWatcher::subscribe(WatcherCallback callback) {
lock_guard<mutex> _(callbacks_mutex_);
string id = generate_id();
callbacks_.emplace(id, move(callback));
return id;
}
void ZookeeperWatcher::unsubscribe(const string& id) {
lock_guard<mutex> _(callbacks_mutex_);
callbacks_.erase(id);
}
string ZookeeperWatcher::get_brokers() {
auto handle = handle_.get();
using VectorPtr = unique_ptr<String_vector, decltype(&deallocate_String_vector)>;
String_vector brokers;
if (zoo_get_children(handle, BROKERS_PATH.data(), 1, &brokers) != ZOK) {
throw ZookeeperException("Failed to get broker list from zookeeper");
}
// RAII up this pointer
ostringstream oss;
VectorPtr _(&brokers, &deallocate_String_vector);
for (int i = 0; i < brokers.count; i++) {
char config_line[1024];
string path = "/brokers/ids/" + string(brokers.data[i]);
int config_len = sizeof(config_line);
zoo_get(handle, path.data(), 0, config_line, &config_len, NULL);
if (config_len > 0) {
config_line[config_len] = 0;
value root;
string error = picojson::parse(root, config_line);
if (!error.empty() || !root.is<object>()) {
throw ZookeeperException("Failed to parse zookeeper json: " + error);
}
const value::object& json_object = root.get<object>();
const value& host_json = json_object.at("host");
const value& port_json = json_object.at("port");
if (!host_json.is<string>() || !port_json.is<double>()) {
throw ZookeeperException("Invalid JSON received from zookeeper");
}
string host = host_json.get<string>();
int port = port_json.get<double>();
if (i != 0) {
oss << ",";
}
oss << host << ":" << port;
}
}
return oss.str();
}
size_t ZookeeperWatcher::get_subscriber_count() const {
lock_guard<mutex> _(callbacks_mutex_);
return callbacks_.size();
}
void ZookeeperWatcher::handle_event_proxy(zhandle_t*, int type, int state, const char* path,
void* ctx) {
auto self = static_cast<ZookeeperWatcher*>(ctx);
self->handle_event(type, state, path);
}
void ZookeeperWatcher::handle_event(int type, int state, const char* path) {
if (type == ZOO_CHILD_EVENT && path == BROKERS_PATH) {
string brokers = get_brokers();
lock_guard<mutex> _(callbacks_mutex_);
for (const auto& callbackPair : callbacks_) {
callbackPair.second(brokers);
}
}
}
string ZookeeperWatcher::generate_id() {
ostringstream oss;
oss << id_counter_++ << "-" << system_clock::now().time_since_epoch().count();
return oss.str();
}
} // cppkafka

View File

@@ -6,10 +6,6 @@ link_libraries(cppkafka ${RDKAFKA_LIBRARY} gtest gtest_main pthread)
set(KAFKA_TEST_INSTANCE "kafka-vm:9092"
CACHE STRING "The kafka instance to which to connect to run tests")
set(ZOOKEEPER_TEST_INSTANCE "kafka-vm:2181"
CACHE STRING "The zookeeper instance to which to connect to run tests")
add_custom_target(tests)
macro(create_test test_name)
@@ -19,19 +15,9 @@ macro(create_test test_name)
endmacro()
add_definitions("-DKAFKA_TEST_INSTANCE=\"${KAFKA_TEST_INSTANCE}\"")
add_definitions("-DZOOKEEPER_TEST_INSTANCE=\"${ZOOKEEPER_TEST_INSTANCE}\"")
if (ENABLE_ZOOKEEPER)
link_libraries(${ZOOKEEPER_LIBRARY})
endif()
create_test(consumer)
create_test(producer)
create_test(kafka_handle_base)
create_test(topic_partition_list)
create_test(configuration)
create_test(buffer)
if (ENABLE_ZOOKEEPER)
create_test(zookeeper_watcher)
endif()

View File

@@ -272,45 +272,3 @@ TEST_F(ProducerTest, PartitionerCallbackOnDefaultTopicConfig) {
EXPECT_EQ(partition, message.get_partition());
EXPECT_TRUE(callback_called);
}
#ifdef CPPKAFKA_HAVE_ZOOKEEPER
#include "zookeeper/zookeeper_pool.h"
TEST_F(ProducerTest, ConnectUsingZookeeper) {
int partition = 0;
Configuration producer_config;
producer_config.set("zookeeper", ZOOKEEPER_TEST_INSTANCE);
Configuration consumer_config = producer_config;
consumer_config.set("enable.auto.commit", false);
consumer_config.set("group.id", "producer_test");
// Create a consumer and assign this topic/partition
Consumer consumer(consumer_config);
consumer.assign({ TopicPartition(KAFKA_TOPIC, partition) });
ConsumerRunner runner(consumer, 1, 1);
// Now create a producer and produce a message
Producer producer(producer_config);
Topic topic = producer.get_topic(KAFKA_TOPIC);
string payload = "Hello world! 2";
string key = "such key";
producer.produce(topic, partition, payload, key);
runner.try_join();
const auto& messages = runner.get_messages();
ASSERT_EQ(1, messages.size());
const auto& message = messages[0];
EXPECT_EQ(Buffer(payload), message.get_payload());
EXPECT_EQ(Buffer(key), message.get_key());
EXPECT_EQ(KAFKA_TOPIC, message.get_topic());
EXPECT_EQ(partition, message.get_partition());
EXPECT_EQ(0, message.get_error());
// We should have 2 watchers
EXPECT_EQ(2, ZookeeperPool::instance().get_subscriber_count(ZOOKEEPER_TEST_INSTANCE));
}
#endif // CPPKAFKA_HAVE_ZOOKEEPER

View File

@@ -1,31 +0,0 @@
#include <gtest/gtest.h>
#include "cppkafka/zookeeper/zookeeper_watcher.h"
#include "cppkafka/exceptions.h"
using std::string;
using namespace cppkafka;
class ZookeeperWatcherTest : public testing::Test {
public:
};
TEST_F(ZookeeperWatcherTest, GetBrokers) {
ZookeeperWatcher watcher(ZOOKEEPER_TEST_INSTANCE);
string brokers = watcher.get_brokers();
EXPECT_EQ(KAFKA_TEST_INSTANCE, brokers);
}
TEST_F(ZookeeperWatcherTest, InvalidEndpointThrows) {
ZookeeperWatcher watcher("127.0.0.1:1212");
EXPECT_THROW(watcher.get_brokers(), ZookeeperException);
}
TEST_F(ZookeeperWatcherTest, SubscribeUnsubscribe) {
ZookeeperWatcher watcher(ZOOKEEPER_TEST_INSTANCE);
string id = watcher.subscribe([](const string&) { });
EXPECT_EQ(1, watcher.get_subscriber_count());
watcher.unsubscribe(id);
EXPECT_EQ(0, watcher.get_subscriber_count());
}