mirror of
https://github.com/Telecominfraproject/wlan-cloud-lib-cppkafka.git
synced 2025-11-04 12:37:54 +00:00
Add zookeeper support
This commit is contained in:
@@ -27,5 +27,15 @@ add_dependencies(cppkafka googletest)
|
|||||||
|
|
||||||
option(ENABLE_ZOOKEEPER "Whether to enable zookeeper support" ON)
|
option(ENABLE_ZOOKEEPER "Whether to enable zookeeper support" ON)
|
||||||
|
|
||||||
|
if (ENABLE_ZOOKEEPER)
|
||||||
|
set(CPPKAFKA_HAVE_ZOOKEEPER ON)
|
||||||
|
endif()
|
||||||
|
|
||||||
|
# Configuration file
|
||||||
|
configure_file(
|
||||||
|
"${CMAKE_CURRENT_SOURCE_DIR}/include/cppkafka/config.h.in"
|
||||||
|
"${CMAKE_CURRENT_SOURCE_DIR}/include/cppkafka/config.h"
|
||||||
|
)
|
||||||
|
|
||||||
enable_testing()
|
enable_testing()
|
||||||
add_subdirectory(tests)
|
add_subdirectory(tests)
|
||||||
7
include/cppkafka/config.h.in
Normal file
7
include/cppkafka/config.h.in
Normal file
@@ -0,0 +1,7 @@
|
|||||||
|
#ifndef CPPKAFKA_CONFIG_H
|
||||||
|
#define CPPKAFKA_CONFIG_H
|
||||||
|
|
||||||
|
/* Define if the zookeeper extensions are enabled */
|
||||||
|
#cmakedefine CPPKAFKA_HAVE_ZOOKEEPER
|
||||||
|
|
||||||
|
#endif // CPPKAFKA_CONFIG_H
|
||||||
@@ -33,6 +33,8 @@
|
|||||||
#include <memory>
|
#include <memory>
|
||||||
#include <string>
|
#include <string>
|
||||||
#include <functional>
|
#include <functional>
|
||||||
|
#include <unordered_map>
|
||||||
|
#include <unordered_set>
|
||||||
#include <chrono>
|
#include <chrono>
|
||||||
#include <boost/optional.hpp>
|
#include <boost/optional.hpp>
|
||||||
#include <librdkafka/rdkafka.h>
|
#include <librdkafka/rdkafka.h>
|
||||||
@@ -79,6 +81,7 @@ public:
|
|||||||
void set_socket_callback(SocketCallback callback);
|
void set_socket_callback(SocketCallback callback);
|
||||||
void set_default_topic_configuration(boost::optional<TopicConfiguration> config);
|
void set_default_topic_configuration(boost::optional<TopicConfiguration> config);
|
||||||
|
|
||||||
|
bool has_property(const std::string& name) const;
|
||||||
rd_kafka_conf_t* get_handle() const;
|
rd_kafka_conf_t* get_handle() const;
|
||||||
std::string get(const std::string& name) const;
|
std::string get(const std::string& name) const;
|
||||||
const DeliveryReportCallback& get_delivery_report_callback() const;
|
const DeliveryReportCallback& get_delivery_report_callback() const;
|
||||||
@@ -91,8 +94,10 @@ public:
|
|||||||
const boost::optional<TopicConfiguration>& get_default_topic_configuration() const;
|
const boost::optional<TopicConfiguration>& get_default_topic_configuration() const;
|
||||||
boost::optional<TopicConfiguration>& get_default_topic_configuration();
|
boost::optional<TopicConfiguration>& get_default_topic_configuration();
|
||||||
private:
|
private:
|
||||||
|
static const std::unordered_set<std::string> VALID_EXTENSIONS;
|
||||||
using HandlePtr = ClonablePtr<rd_kafka_conf_t, decltype(&rd_kafka_conf_destroy),
|
using HandlePtr = ClonablePtr<rd_kafka_conf_t, decltype(&rd_kafka_conf_destroy),
|
||||||
decltype(&rd_kafka_conf_dup)>;
|
decltype(&rd_kafka_conf_dup)>;
|
||||||
|
using PropertiesMap = std::unordered_map<std::string, std::string>;
|
||||||
|
|
||||||
Configuration(rd_kafka_conf_t* ptr);
|
Configuration(rd_kafka_conf_t* ptr);
|
||||||
static HandlePtr make_handle(rd_kafka_conf_t* ptr);
|
static HandlePtr make_handle(rd_kafka_conf_t* ptr);
|
||||||
@@ -106,6 +111,7 @@ private:
|
|||||||
LogCallback log_callback_;
|
LogCallback log_callback_;
|
||||||
StatsCallback stats_callback_;
|
StatsCallback stats_callback_;
|
||||||
SocketCallback socket_callback_;
|
SocketCallback socket_callback_;
|
||||||
|
PropertiesMap extension_properties_;
|
||||||
};
|
};
|
||||||
|
|
||||||
} // cppkafka
|
} // cppkafka
|
||||||
|
|||||||
@@ -42,6 +42,10 @@
|
|||||||
#include "topic_partition_list.h"
|
#include "topic_partition_list.h"
|
||||||
#include "topic_configuration.h"
|
#include "topic_configuration.h"
|
||||||
#include "configuration.h"
|
#include "configuration.h"
|
||||||
|
#include "config.h"
|
||||||
|
#ifdef CPPKAFKA_HAVE_ZOOKEEPER
|
||||||
|
#include "zookeeper/zookeeper_subscriber.h"
|
||||||
|
#endif // CPPKAFKA_HAVE_ZOOKEEPER
|
||||||
|
|
||||||
namespace cppkafka {
|
namespace cppkafka {
|
||||||
|
|
||||||
@@ -93,6 +97,10 @@ private:
|
|||||||
Configuration config_;
|
Configuration config_;
|
||||||
TopicConfigurationMap topic_configurations_;
|
TopicConfigurationMap topic_configurations_;
|
||||||
std::mutex topic_configurations_mutex_;
|
std::mutex topic_configurations_mutex_;
|
||||||
|
#ifdef CPPKAFKA_HAVE_ZOOKEEPER
|
||||||
|
// This could be an optional but apparently move construction is only supported as of 1.56
|
||||||
|
std::unique_ptr<ZookeeperSubscriber> zookeeper_subscriber_;
|
||||||
|
#endif // CPPKAFKA_HAVE_ZOOKEEPER
|
||||||
};
|
};
|
||||||
|
|
||||||
} // cppkafka
|
} // cppkafka
|
||||||
|
|||||||
63
include/cppkafka/zookeeper/zookeeper_pool.h
Normal file
63
include/cppkafka/zookeeper/zookeeper_pool.h
Normal file
@@ -0,0 +1,63 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2016, Matias Fontanini
|
||||||
|
* All rights reserved.
|
||||||
|
*
|
||||||
|
* Redistribution and use in source and binary forms, with or without
|
||||||
|
* modification, are permitted provided that the following conditions are
|
||||||
|
* met:
|
||||||
|
*
|
||||||
|
* * Redistributions of source code must retain the above copyright
|
||||||
|
* notice, this list of conditions and the following disclaimer.
|
||||||
|
* * Redistributions in binary form must reproduce the above
|
||||||
|
* copyright notice, this list of conditions and the following disclaimer
|
||||||
|
* in the documentation and/or other materials provided with the
|
||||||
|
* distribution.
|
||||||
|
*
|
||||||
|
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
|
||||||
|
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
|
||||||
|
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
|
||||||
|
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
|
||||||
|
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
||||||
|
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
||||||
|
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
|
||||||
|
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
|
||||||
|
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
||||||
|
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
||||||
|
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef CPPKAFKA_ZOOKEEPER_POOL_H
|
||||||
|
#define CPPKAFKA_ZOOKEEPER_POOL_H
|
||||||
|
|
||||||
|
#include <map>
|
||||||
|
#include <string>
|
||||||
|
#include <chrono>
|
||||||
|
#include <mutex>
|
||||||
|
#include "zookeeper/zookeeper_watcher.h"
|
||||||
|
|
||||||
|
namespace cppkafka {
|
||||||
|
|
||||||
|
class ZookeeperSubscriber;
|
||||||
|
|
||||||
|
class ZookeeperPool {
|
||||||
|
public:
|
||||||
|
static ZookeeperPool& instance();
|
||||||
|
|
||||||
|
ZookeeperSubscriber subscribe(const std::string& endpoint,
|
||||||
|
std::chrono::milliseconds receive_timeout,
|
||||||
|
ZookeeperWatcher::WatcherCallback callback);
|
||||||
|
void unsubscribe(const ZookeeperSubscriber& subscriber);
|
||||||
|
std::string get_brokers(const std::string& endpoint);
|
||||||
|
|
||||||
|
size_t get_subscriber_count(const std::string& endpoint) const;
|
||||||
|
private:
|
||||||
|
using WatchersMap = std::map<std::string, ZookeeperWatcher>;
|
||||||
|
|
||||||
|
WatchersMap watchers_;
|
||||||
|
mutable std::mutex watchers_mutex_;
|
||||||
|
};
|
||||||
|
|
||||||
|
} // cppkafka
|
||||||
|
|
||||||
|
#endif // CPPKAFKA_ZOOKEEPER_POOL_H
|
||||||
55
include/cppkafka/zookeeper/zookeeper_subscriber.h
Normal file
55
include/cppkafka/zookeeper/zookeeper_subscriber.h
Normal file
@@ -0,0 +1,55 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2016, Matias Fontanini
|
||||||
|
* All rights reserved.
|
||||||
|
*
|
||||||
|
* Redistribution and use in source and binary forms, with or without
|
||||||
|
* modification, are permitted provided that the following conditions are
|
||||||
|
* met:
|
||||||
|
*
|
||||||
|
* * Redistributions of source code must retain the above copyright
|
||||||
|
* notice, this list of conditions and the following disclaimer.
|
||||||
|
* * Redistributions in binary form must reproduce the above
|
||||||
|
* copyright notice, this list of conditions and the following disclaimer
|
||||||
|
* in the documentation and/or other materials provided with the
|
||||||
|
* distribution.
|
||||||
|
*
|
||||||
|
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
|
||||||
|
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
|
||||||
|
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
|
||||||
|
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
|
||||||
|
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
||||||
|
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
||||||
|
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
|
||||||
|
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
|
||||||
|
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
||||||
|
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
||||||
|
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef CPPKAFKA_ZOOKEEPER_SUBSCRIBER_H
|
||||||
|
#define CPPKAFKA_ZOOKEEPER_SUBSCRIBER_H
|
||||||
|
|
||||||
|
#include <string>
|
||||||
|
|
||||||
|
namespace cppkafka {
|
||||||
|
|
||||||
|
class ZookeeperSubscriber {
|
||||||
|
public:
|
||||||
|
ZookeeperSubscriber(std::string endpoint, std::string subscription_id);
|
||||||
|
ZookeeperSubscriber(ZookeeperSubscriber&&) = default;
|
||||||
|
ZookeeperSubscriber(const ZookeeperSubscriber&) = delete;
|
||||||
|
ZookeeperSubscriber& operator=(ZookeeperSubscriber&&);
|
||||||
|
ZookeeperSubscriber& operator=(const ZookeeperSubscriber&) = delete;
|
||||||
|
~ZookeeperSubscriber();
|
||||||
|
|
||||||
|
const std::string& get_endpoint() const;
|
||||||
|
const std::string& get_subscription_id() const;
|
||||||
|
private:
|
||||||
|
std::string endpoint_;
|
||||||
|
std::string subscription_id_;
|
||||||
|
};
|
||||||
|
|
||||||
|
} // cppkafka
|
||||||
|
|
||||||
|
#endif // CPPKAFKA_ZOOKEEPER_SUBSCRIBER_H
|
||||||
@@ -33,6 +33,9 @@
|
|||||||
#include <memory>
|
#include <memory>
|
||||||
#include <string>
|
#include <string>
|
||||||
#include <chrono>
|
#include <chrono>
|
||||||
|
#include <map>
|
||||||
|
#include <functional>
|
||||||
|
#include <mutex>
|
||||||
#include <zookeeper/zookeeper.h>
|
#include <zookeeper/zookeeper.h>
|
||||||
|
|
||||||
namespace cppkafka {
|
namespace cppkafka {
|
||||||
@@ -41,21 +44,33 @@ class ZookeeperWatcher {
|
|||||||
public:
|
public:
|
||||||
static const std::chrono::milliseconds DEFAULT_RECEIVE_TIMEOUT;
|
static const std::chrono::milliseconds DEFAULT_RECEIVE_TIMEOUT;
|
||||||
|
|
||||||
|
using WatcherCallback = std::function<void(const std::string& brokers)>;
|
||||||
|
|
||||||
ZookeeperWatcher(const std::string& endpoint);
|
ZookeeperWatcher(const std::string& endpoint);
|
||||||
ZookeeperWatcher(const std::string& endpoint, std::chrono::milliseconds receive_timeout);
|
ZookeeperWatcher(const std::string& endpoint, std::chrono::milliseconds receive_timeout);
|
||||||
|
|
||||||
|
void setup_watcher();
|
||||||
|
|
||||||
|
std::string subscribe(WatcherCallback callback);
|
||||||
|
void unsubscribe(const std::string& id);
|
||||||
|
|
||||||
std::string get_brokers();
|
std::string get_brokers();
|
||||||
|
size_t get_subscriber_count() const;
|
||||||
private:
|
private:
|
||||||
using HandlePtr = std::unique_ptr<zhandle_t, decltype(&zookeeper_close)>;
|
|
||||||
|
|
||||||
static const std::string BROKERS_PATH;
|
static const std::string BROKERS_PATH;
|
||||||
|
|
||||||
|
using HandlePtr = std::unique_ptr<zhandle_t, decltype(&zookeeper_close)>;
|
||||||
|
using CallbackMap = std::map<std::string, WatcherCallback>;
|
||||||
|
|
||||||
static void handle_event_proxy(zhandle_t* zh, int type, int state, const char* path,
|
static void handle_event_proxy(zhandle_t* zh, int type, int state, const char* path,
|
||||||
void* ctx);
|
void* ctx);
|
||||||
void handle_event(int type, int state, const char* path);
|
void handle_event(int type, int state, const char* path);
|
||||||
|
std::string generate_id();
|
||||||
|
|
||||||
HandlePtr handle_;
|
HandlePtr handle_;
|
||||||
|
CallbackMap callbacks_;
|
||||||
|
mutable std::mutex callbacks_mutex_;
|
||||||
|
size_t id_counter_{0};
|
||||||
};
|
};
|
||||||
|
|
||||||
} // cppkafka
|
} // cppkafka
|
||||||
|
|||||||
@@ -16,8 +16,11 @@ set(SOURCES
|
|||||||
)
|
)
|
||||||
|
|
||||||
if (ENABLE_ZOOKEEPER)
|
if (ENABLE_ZOOKEEPER)
|
||||||
|
add_definitions("-DCPPKAFKA_ENABLE_ZOOKEEPER=1")
|
||||||
set(ZOOKEEPER_SOURCES
|
set(ZOOKEEPER_SOURCES
|
||||||
${CMAKE_CURRENT_SOURCE_DIR}/zookeeper/zookeeper_watcher.cpp)
|
${CMAKE_CURRENT_SOURCE_DIR}/zookeeper/zookeeper_watcher.cpp
|
||||||
|
${CMAKE_CURRENT_SOURCE_DIR}/zookeeper/zookeeper_pool.cpp
|
||||||
|
${CMAKE_CURRENT_SOURCE_DIR}/zookeeper/zookeeper_subscriber.cpp)
|
||||||
# Add json library as include dir (header only)
|
# Add json library as include dir (header only)
|
||||||
include_directories(SYSTEM ${CMAKE_CURRENT_SOURCE_DIR}/../third_party/json)
|
include_directories(SYSTEM ${CMAKE_CURRENT_SOURCE_DIR}/../third_party/json)
|
||||||
set(SOURCES ${SOURCES} ${ZOOKEEPER_SOURCES})
|
set(SOURCES ${SOURCES} ${ZOOKEEPER_SOURCES})
|
||||||
|
|||||||
@@ -34,10 +34,12 @@
|
|||||||
#include "message.h"
|
#include "message.h"
|
||||||
#include "producer.h"
|
#include "producer.h"
|
||||||
#include "consumer.h"
|
#include "consumer.h"
|
||||||
|
#include "config.h"
|
||||||
|
|
||||||
using std::string;
|
using std::string;
|
||||||
using std::move;
|
using std::move;
|
||||||
using std::vector;
|
using std::vector;
|
||||||
|
using std::unordered_set;
|
||||||
|
|
||||||
using boost::optional;
|
using boost::optional;
|
||||||
|
|
||||||
@@ -45,6 +47,12 @@ using std::chrono::milliseconds;
|
|||||||
|
|
||||||
namespace cppkafka {
|
namespace cppkafka {
|
||||||
|
|
||||||
|
const unordered_set<string> Configuration::VALID_EXTENSIONS = {
|
||||||
|
#ifdef CPPKAFKA_HAVE_ZOOKEEPER
|
||||||
|
"zookeeper", "zookeeper.receive.timeout.ms"
|
||||||
|
#endif // CPPKAFKA_HAVE_ZOOKEEPER
|
||||||
|
};
|
||||||
|
|
||||||
// Callback proxies
|
// Callback proxies
|
||||||
|
|
||||||
void delivery_report_callback_proxy(rd_kafka_t*, const rd_kafka_message_t* msg, void *opaque) {
|
void delivery_report_callback_proxy(rd_kafka_t*, const rd_kafka_message_t* msg, void *opaque) {
|
||||||
@@ -125,6 +133,11 @@ Configuration::Configuration(rd_kafka_conf_t* ptr)
|
|||||||
}
|
}
|
||||||
|
|
||||||
void Configuration::set(const string& name, const string& value) {
|
void Configuration::set(const string& name, const string& value) {
|
||||||
|
if (VALID_EXTENSIONS.count(name)) {
|
||||||
|
// This is one of cppkafka's extensions
|
||||||
|
extension_properties_.emplace(name, value);
|
||||||
|
}
|
||||||
|
else {
|
||||||
char error_buffer[512];
|
char error_buffer[512];
|
||||||
rd_kafka_conf_res_t result;
|
rd_kafka_conf_res_t result;
|
||||||
result = rd_kafka_conf_set(handle_.get(), name.data(), value.data(), error_buffer,
|
result = rd_kafka_conf_set(handle_.get(), name.data(), value.data(), error_buffer,
|
||||||
@@ -133,6 +146,7 @@ void Configuration::set(const string& name, const string& value) {
|
|||||||
throw ConfigException(name, error_buffer);
|
throw ConfigException(name, error_buffer);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void Configuration::set_delivery_report_callback(DeliveryReportCallback callback) {
|
void Configuration::set_delivery_report_callback(DeliveryReportCallback callback) {
|
||||||
delivery_report_callback_ = move(callback);
|
delivery_report_callback_ = move(callback);
|
||||||
@@ -173,11 +187,29 @@ void Configuration::set_default_topic_configuration(optional<TopicConfiguration>
|
|||||||
default_topic_config_ = move(config);
|
default_topic_config_ = move(config);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool Configuration::has_property(const string& name) const {
|
||||||
|
if (VALID_EXTENSIONS.count(name)) {
|
||||||
|
return extension_properties_.count(name) == 1;
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
size_t size = 0;
|
||||||
|
return rd_kafka_conf_get(handle_.get(), name.data(), nullptr, &size) == RD_KAFKA_CONF_OK;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
rd_kafka_conf_t* Configuration::get_handle() const {
|
rd_kafka_conf_t* Configuration::get_handle() const {
|
||||||
return handle_.get();
|
return handle_.get();
|
||||||
}
|
}
|
||||||
|
|
||||||
string Configuration::get(const string& name) const {
|
string Configuration::get(const string& name) const {
|
||||||
|
if (VALID_EXTENSIONS.count(name)) {
|
||||||
|
auto iter = extension_properties_.find(name);
|
||||||
|
if (iter == extension_properties_.end()) {
|
||||||
|
throw ConfigOptionNotFound(name);
|
||||||
|
}
|
||||||
|
return iter->second;
|
||||||
|
}
|
||||||
|
else {
|
||||||
size_t size = 0;
|
size_t size = 0;
|
||||||
auto result = rd_kafka_conf_get(handle_.get(), name.data(), nullptr, &size);
|
auto result = rd_kafka_conf_get(handle_.get(), name.data(), nullptr, &size);
|
||||||
if (result != RD_KAFKA_CONF_OK) {
|
if (result != RD_KAFKA_CONF_OK) {
|
||||||
@@ -187,6 +219,7 @@ string Configuration::get(const string& name) const {
|
|||||||
rd_kafka_conf_get(handle_.get(), name.data(), buffer.data(), &size);
|
rd_kafka_conf_get(handle_.get(), name.data(), buffer.data(), &size);
|
||||||
return string(buffer.data());
|
return string(buffer.data());
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
const Configuration::DeliveryReportCallback& Configuration::get_delivery_report_callback() const {
|
const Configuration::DeliveryReportCallback& Configuration::get_delivery_report_callback() const {
|
||||||
return delivery_report_callback_;
|
return delivery_report_callback_;
|
||||||
|
|||||||
@@ -31,6 +31,9 @@
|
|||||||
#include "exceptions.h"
|
#include "exceptions.h"
|
||||||
#include "topic.h"
|
#include "topic.h"
|
||||||
#include "topic_partition_list.h"
|
#include "topic_partition_list.h"
|
||||||
|
#ifdef CPPKAFKA_HAVE_ZOOKEEPER
|
||||||
|
#include "zookeeper/zookeeper_pool.h"
|
||||||
|
#endif // CPPKAFKA_HAVE_ZOOKEEPER
|
||||||
|
|
||||||
using std::string;
|
using std::string;
|
||||||
using std::vector;
|
using std::vector;
|
||||||
@@ -38,6 +41,7 @@ using std::move;
|
|||||||
using std::make_tuple;
|
using std::make_tuple;
|
||||||
using std::lock_guard;
|
using std::lock_guard;
|
||||||
using std::mutex;
|
using std::mutex;
|
||||||
|
using std::exception;
|
||||||
using std::chrono::milliseconds;
|
using std::chrono::milliseconds;
|
||||||
|
|
||||||
namespace cppkafka {
|
namespace cppkafka {
|
||||||
@@ -120,6 +124,32 @@ const Configuration& KafkaHandleBase::get_configuration() const {
|
|||||||
|
|
||||||
void KafkaHandleBase::set_handle(rd_kafka_t* handle) {
|
void KafkaHandleBase::set_handle(rd_kafka_t* handle) {
|
||||||
handle_ = HandlePtr(handle, &rd_kafka_destroy);
|
handle_ = HandlePtr(handle, &rd_kafka_destroy);
|
||||||
|
|
||||||
|
#ifdef CPPKAFKA_HAVE_ZOOKEEPER
|
||||||
|
|
||||||
|
if (config_.has_property("zookeeper")) {
|
||||||
|
string endpoint = config_.get("zookeeper");
|
||||||
|
milliseconds timeout = ZookeeperWatcher::DEFAULT_RECEIVE_TIMEOUT;
|
||||||
|
if (config_.has_property("zookeeper.receive.timeout.ms")) {
|
||||||
|
try {
|
||||||
|
timeout = milliseconds(stoi(config_.get("zookeeper.receive.timeout.ms")));
|
||||||
|
}
|
||||||
|
catch (exception&) {
|
||||||
|
throw ZookeeperException("Invalid zookeeper receive timeout");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
auto& pool = ZookeeperPool::instance();
|
||||||
|
auto callback = [&](const string& brokers) {
|
||||||
|
// Add the brokers and poll
|
||||||
|
rd_kafka_brokers_add(handle_.get(), brokers.data());
|
||||||
|
rd_kafka_poll(handle_.get(), 10);
|
||||||
|
};
|
||||||
|
ZookeeperSubscriber subscriber = pool.subscribe(endpoint, timeout, callback);
|
||||||
|
zookeeper_subscriber_.reset(new ZookeeperSubscriber(move(subscriber)));
|
||||||
|
callback(pool.get_brokers(endpoint));
|
||||||
|
}
|
||||||
|
|
||||||
|
#endif // CPPKAFKA_HAVE_ZOOKEEPER
|
||||||
}
|
}
|
||||||
|
|
||||||
Topic KafkaHandleBase::get_topic(const string& name, rd_kafka_topic_conf_t* conf) {
|
Topic KafkaHandleBase::get_topic(const string& name, rd_kafka_topic_conf_t* conf) {
|
||||||
|
|||||||
89
src/zookeeper/zookeeper_pool.cpp
Normal file
89
src/zookeeper/zookeeper_pool.cpp
Normal file
@@ -0,0 +1,89 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2016, Matias Fontanini
|
||||||
|
* All rights reserved.
|
||||||
|
*
|
||||||
|
* Redistribution and use in source and binary forms, with or without
|
||||||
|
* modification, are permitted provided that the following conditions are
|
||||||
|
* met:
|
||||||
|
*
|
||||||
|
* * Redistributions of source code must retain the above copyright
|
||||||
|
* notice, this list of conditions and the following disclaimer.
|
||||||
|
* * Redistributions in binary form must reproduce the above
|
||||||
|
* copyright notice, this list of conditions and the following disclaimer
|
||||||
|
* in the documentation and/or other materials provided with the
|
||||||
|
* distribution.
|
||||||
|
*
|
||||||
|
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
|
||||||
|
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
|
||||||
|
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
|
||||||
|
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
|
||||||
|
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
||||||
|
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
||||||
|
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
|
||||||
|
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
|
||||||
|
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
||||||
|
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
||||||
|
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "zookeeper/zookeeper_pool.h"
|
||||||
|
#include "zookeeper/zookeeper_subscriber.h"
|
||||||
|
#include "exceptions.h"
|
||||||
|
|
||||||
|
using std::string;
|
||||||
|
using std::mutex;
|
||||||
|
using std::lock_guard;
|
||||||
|
using std::forward_as_tuple;
|
||||||
|
using std::piecewise_construct;
|
||||||
|
using std::chrono::milliseconds;
|
||||||
|
|
||||||
|
namespace cppkafka {
|
||||||
|
|
||||||
|
ZookeeperPool& ZookeeperPool::instance() {
|
||||||
|
static ZookeeperPool the_instance;
|
||||||
|
return the_instance;
|
||||||
|
}
|
||||||
|
|
||||||
|
ZookeeperSubscriber ZookeeperPool::subscribe(const string& endpoint,
|
||||||
|
milliseconds receive_timeout,
|
||||||
|
ZookeeperWatcher::WatcherCallback callback) {
|
||||||
|
lock_guard<mutex> _(watchers_mutex_);
|
||||||
|
auto iter = watchers_.find(endpoint);
|
||||||
|
if (iter == watchers_.end()) {
|
||||||
|
iter = watchers_.emplace(piecewise_construct, forward_as_tuple(endpoint),
|
||||||
|
forward_as_tuple(endpoint, receive_timeout)).first;
|
||||||
|
}
|
||||||
|
string id = iter->second.subscribe(move(callback));
|
||||||
|
return ZookeeperSubscriber(endpoint, id);
|
||||||
|
}
|
||||||
|
|
||||||
|
void ZookeeperPool::unsubscribe(const ZookeeperSubscriber& subscriber) {
|
||||||
|
lock_guard<mutex> _(watchers_mutex_);
|
||||||
|
auto iter = watchers_.find(subscriber.get_endpoint());
|
||||||
|
if (iter != watchers_.end()) {
|
||||||
|
iter->second.unsubscribe(subscriber.get_subscription_id());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
string ZookeeperPool::get_brokers(const string& endpoint) {
|
||||||
|
lock_guard<mutex> _(watchers_mutex_);
|
||||||
|
auto iter = watchers_.find(endpoint);
|
||||||
|
if (iter == watchers_.end()) {
|
||||||
|
throw ZookeeperException("No zookeeper watcher for given endpoint");
|
||||||
|
}
|
||||||
|
return iter->second.get_brokers();
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t ZookeeperPool::get_subscriber_count(const string& endpoint) const {
|
||||||
|
lock_guard<mutex> _(watchers_mutex_);
|
||||||
|
auto iter = watchers_.find(endpoint);
|
||||||
|
if (iter == watchers_.end()) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
return iter->second.get_subscriber_count();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
} // cppkafka
|
||||||
25
src/zookeeper/zookeeper_subscriber.cpp
Normal file
25
src/zookeeper/zookeeper_subscriber.cpp
Normal file
@@ -0,0 +1,25 @@
|
|||||||
|
#include "zookeeper/zookeeper_subscriber.h"
|
||||||
|
#include "zookeeper/zookeeper_pool.h"
|
||||||
|
|
||||||
|
using std::string;
|
||||||
|
|
||||||
|
namespace cppkafka {
|
||||||
|
|
||||||
|
ZookeeperSubscriber::ZookeeperSubscriber(string endpoint, string subscription_id)
|
||||||
|
: endpoint_(move(endpoint)), subscription_id_(move(subscription_id)) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
ZookeeperSubscriber::~ZookeeperSubscriber() {
|
||||||
|
ZookeeperPool::instance().unsubscribe(*this);
|
||||||
|
}
|
||||||
|
|
||||||
|
const string& ZookeeperSubscriber::get_endpoint() const {
|
||||||
|
return endpoint_;
|
||||||
|
}
|
||||||
|
|
||||||
|
const string& ZookeeperSubscriber::get_subscription_id() const {
|
||||||
|
return subscription_id_;
|
||||||
|
}
|
||||||
|
|
||||||
|
} // cppkafka
|
||||||
@@ -29,15 +29,20 @@
|
|||||||
|
|
||||||
#include <stdexcept>
|
#include <stdexcept>
|
||||||
#include <sstream>
|
#include <sstream>
|
||||||
|
#include <chrono>
|
||||||
#include <picojson.h>
|
#include <picojson.h>
|
||||||
#include "zookeeper/zookeeper_watcher.h"
|
#include "zookeeper/zookeeper_watcher.h"
|
||||||
#include "exceptions.h"
|
#include "exceptions.h"
|
||||||
|
|
||||||
using std::string;
|
using std::string;
|
||||||
|
using std::lock_guard;
|
||||||
|
using std::mutex;
|
||||||
using std::unique_ptr;
|
using std::unique_ptr;
|
||||||
using std::ostringstream;
|
using std::ostringstream;
|
||||||
using std::runtime_error;
|
using std::runtime_error;
|
||||||
|
using std::make_shared;
|
||||||
using std::chrono::milliseconds;
|
using std::chrono::milliseconds;
|
||||||
|
using std::chrono::system_clock;
|
||||||
|
|
||||||
using picojson::value;
|
using picojson::value;
|
||||||
using picojson::object;
|
using picojson::object;
|
||||||
@@ -63,10 +68,23 @@ ZookeeperWatcher::ZookeeperWatcher(const string& endpoint, milliseconds receive_
|
|||||||
handle_ = HandlePtr(raw_handle, &zookeeper_close);
|
handle_ = HandlePtr(raw_handle, &zookeeper_close);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
string ZookeeperWatcher::subscribe(WatcherCallback callback) {
|
||||||
|
lock_guard<mutex> _(callbacks_mutex_);
|
||||||
|
string id = generate_id();
|
||||||
|
callbacks_.emplace(id, move(callback));
|
||||||
|
return id;
|
||||||
|
}
|
||||||
|
|
||||||
|
void ZookeeperWatcher::unsubscribe(const string& id) {
|
||||||
|
lock_guard<mutex> _(callbacks_mutex_);
|
||||||
|
callbacks_.erase(id);
|
||||||
|
}
|
||||||
|
|
||||||
string ZookeeperWatcher::get_brokers() {
|
string ZookeeperWatcher::get_brokers() {
|
||||||
|
auto handle = handle_.get();
|
||||||
using VectorPtr = unique_ptr<String_vector, decltype(&deallocate_String_vector)>;
|
using VectorPtr = unique_ptr<String_vector, decltype(&deallocate_String_vector)>;
|
||||||
String_vector brokers;
|
String_vector brokers;
|
||||||
if (zoo_get_children(handle_.get(), BROKERS_PATH.data(), 1, &brokers) != ZOK) {
|
if (zoo_get_children(handle, BROKERS_PATH.data(), 1, &brokers) != ZOK) {
|
||||||
throw ZookeeperException("Failed to get broker list from zookeeper");
|
throw ZookeeperException("Failed to get broker list from zookeeper");
|
||||||
}
|
}
|
||||||
// RAII up this pointer
|
// RAII up this pointer
|
||||||
@@ -76,7 +94,7 @@ string ZookeeperWatcher::get_brokers() {
|
|||||||
char config_line[1024];
|
char config_line[1024];
|
||||||
string path = "/brokers/ids/" + string(brokers.data[i]);
|
string path = "/brokers/ids/" + string(brokers.data[i]);
|
||||||
int config_len = sizeof(config_line);
|
int config_len = sizeof(config_line);
|
||||||
zoo_get(handle_.get(), path.data(), 0, config_line, &config_len, NULL);
|
zoo_get(handle, path.data(), 0, config_line, &config_len, NULL);
|
||||||
if (config_len > 0) {
|
if (config_len > 0) {
|
||||||
config_line[config_len] = 0;
|
config_line[config_len] = 0;
|
||||||
|
|
||||||
@@ -102,6 +120,11 @@ string ZookeeperWatcher::get_brokers() {
|
|||||||
return oss.str();
|
return oss.str();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
size_t ZookeeperWatcher::get_subscriber_count() const {
|
||||||
|
lock_guard<mutex> _(callbacks_mutex_);
|
||||||
|
return callbacks_.size();
|
||||||
|
}
|
||||||
|
|
||||||
void ZookeeperWatcher::handle_event_proxy(zhandle_t*, int type, int state, const char* path,
|
void ZookeeperWatcher::handle_event_proxy(zhandle_t*, int type, int state, const char* path,
|
||||||
void* ctx) {
|
void* ctx) {
|
||||||
auto self = static_cast<ZookeeperWatcher*>(ctx);
|
auto self = static_cast<ZookeeperWatcher*>(ctx);
|
||||||
@@ -111,8 +134,17 @@ void ZookeeperWatcher::handle_event_proxy(zhandle_t*, int type, int state, const
|
|||||||
void ZookeeperWatcher::handle_event(int type, int state, const char* path) {
|
void ZookeeperWatcher::handle_event(int type, int state, const char* path) {
|
||||||
if (type == ZOO_CHILD_EVENT && path == BROKERS_PATH) {
|
if (type == ZOO_CHILD_EVENT && path == BROKERS_PATH) {
|
||||||
string brokers = get_brokers();
|
string brokers = get_brokers();
|
||||||
// TODO: Callback!
|
lock_guard<mutex> _(callbacks_mutex_);
|
||||||
|
for (const auto& callbackPair : callbacks_) {
|
||||||
|
callbackPair.second(brokers);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
string ZookeeperWatcher::generate_id() {
|
||||||
|
ostringstream oss;
|
||||||
|
oss << id_counter_++ << "-" << system_clock::now().time_since_epoch().count();
|
||||||
|
return oss.str();
|
||||||
|
}
|
||||||
|
|
||||||
} // cppkafka
|
} // cppkafka
|
||||||
|
|||||||
@@ -18,6 +18,10 @@ endmacro()
|
|||||||
add_definitions("-DKAFKA_TEST_INSTANCE=\"${KAFKA_TEST_INSTANCE}\"")
|
add_definitions("-DKAFKA_TEST_INSTANCE=\"${KAFKA_TEST_INSTANCE}\"")
|
||||||
add_definitions("-DZOOKEEPER_TEST_INSTANCE=\"${ZOOKEEPER_TEST_INSTANCE}\"")
|
add_definitions("-DZOOKEEPER_TEST_INSTANCE=\"${ZOOKEEPER_TEST_INSTANCE}\"")
|
||||||
|
|
||||||
|
if (ENABLE_ZOOKEEPER)
|
||||||
|
link_libraries(zookeeper_mt)
|
||||||
|
endif()
|
||||||
|
|
||||||
create_test(consumer)
|
create_test(consumer)
|
||||||
create_test(producer)
|
create_test(producer)
|
||||||
create_test(kafka_handle_base)
|
create_test(kafka_handle_base)
|
||||||
@@ -25,5 +29,4 @@ create_test(topic_partition_list)
|
|||||||
create_test(configuration)
|
create_test(configuration)
|
||||||
if (ENABLE_ZOOKEEPER)
|
if (ENABLE_ZOOKEEPER)
|
||||||
create_test(zookeeper_watcher)
|
create_test(zookeeper_watcher)
|
||||||
target_link_libraries(zookeeper_watcher_test zookeeper_mt)
|
|
||||||
endif()
|
endif()
|
||||||
|
|||||||
@@ -276,3 +276,46 @@ TEST_F(ProducerTest, PartitionerCallbackOnDefaultTopicConfig) {
|
|||||||
EXPECT_EQ(partition, message.get_partition());
|
EXPECT_EQ(partition, message.get_partition());
|
||||||
EXPECT_TRUE(callback_called);
|
EXPECT_TRUE(callback_called);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#ifdef CPPKAFKA_HAVE_ZOOKEEPER
|
||||||
|
|
||||||
|
#include "zookeeper/zookeeper_pool.h"
|
||||||
|
|
||||||
|
TEST_F(ProducerTest, ConnectUsingZookeeper) {
|
||||||
|
int partition = 0;
|
||||||
|
|
||||||
|
Configuration producer_config;
|
||||||
|
producer_config.set("zookeeper", ZOOKEEPER_TEST_INSTANCE);
|
||||||
|
|
||||||
|
Configuration consumer_config = producer_config;
|
||||||
|
consumer_config.set("enable.auto.commit", false);
|
||||||
|
consumer_config.set("group.id", "producer_test");
|
||||||
|
|
||||||
|
// Create a consumer and assign this topic/partition
|
||||||
|
Consumer consumer(consumer_config);
|
||||||
|
consumer.assign({ TopicPartition(KAFKA_TOPIC, partition) });
|
||||||
|
ConsumerRunner runner(consumer, 1, 1);
|
||||||
|
|
||||||
|
// Now create a producer and produce a message
|
||||||
|
Producer producer(producer_config);
|
||||||
|
Topic topic = producer.get_topic(KAFKA_TOPIC);
|
||||||
|
string payload = "Hello world! 2";
|
||||||
|
string key = "such key";
|
||||||
|
producer.produce(topic, partition, Buffer(payload.data(), payload.size()),
|
||||||
|
Buffer(key.data(), key.size()));
|
||||||
|
runner.try_join();
|
||||||
|
|
||||||
|
const auto& messages = runner.get_messages();
|
||||||
|
ASSERT_EQ(1, messages.size());
|
||||||
|
const auto& message = messages[0];
|
||||||
|
EXPECT_EQ(payload, message.get_payload().as_string());
|
||||||
|
EXPECT_EQ(key, message.get_key().as_string());
|
||||||
|
EXPECT_EQ(KAFKA_TOPIC, message.get_topic());
|
||||||
|
EXPECT_EQ(partition, message.get_partition());
|
||||||
|
EXPECT_EQ(0, message.get_error());
|
||||||
|
|
||||||
|
// We should have 2 watchers
|
||||||
|
EXPECT_EQ(2, ZookeeperPool::instance().get_subscriber_count(ZOOKEEPER_TEST_INSTANCE));
|
||||||
|
}
|
||||||
|
|
||||||
|
#endif // CPPKAFKA_HAVE_ZOOKEEPER
|
||||||
|
|||||||
@@ -1,5 +1,6 @@
|
|||||||
#include <gtest/gtest.h>
|
#include <gtest/gtest.h>
|
||||||
#include "cppkafka/zookeeper/zookeeper_watcher.h"
|
#include "cppkafka/zookeeper/zookeeper_watcher.h"
|
||||||
|
#include "cppkafka/exceptions.h"
|
||||||
|
|
||||||
using std::string;
|
using std::string;
|
||||||
|
|
||||||
@@ -15,3 +16,16 @@ TEST_F(ZookeeperWatcherTest, GetBrokers) {
|
|||||||
string brokers = watcher.get_brokers();
|
string brokers = watcher.get_brokers();
|
||||||
EXPECT_EQ(KAFKA_TEST_INSTANCE, brokers);
|
EXPECT_EQ(KAFKA_TEST_INSTANCE, brokers);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(ZookeeperWatcherTest, InvalidEndpointThrows) {
|
||||||
|
ZookeeperWatcher watcher("127.0.0.1:1212");
|
||||||
|
EXPECT_THROW(watcher.get_brokers(), ZookeeperException);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(ZookeeperWatcherTest, SubscribeUnsubscribe) {
|
||||||
|
ZookeeperWatcher watcher(ZOOKEEPER_TEST_INSTANCE);
|
||||||
|
string id = watcher.subscribe([](const string&) { });
|
||||||
|
EXPECT_EQ(1, watcher.get_subscriber_count());
|
||||||
|
watcher.unsubscribe(id);
|
||||||
|
EXPECT_EQ(0, watcher.get_subscriber_count());
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user