Add initial zookeeper wrapper code

This commit is contained in:
Matias Fontanini
2016-06-11 08:18:09 -07:00
parent eed3ac270a
commit 65126b27f1
8 changed files with 1270 additions and 2 deletions

View File

@@ -25,5 +25,7 @@ ExternalProject_Add(
# Make sure we build googletest before anything else
add_dependencies(cppkafka googletest)
option(ENABLE_ZOOKEEPER "Whether to enable zookeeper support" ON)
enable_testing()
add_subdirectory(tests)

View File

@@ -64,6 +64,11 @@ private:
rd_kafka_resp_err_t error_code_;
};
class ZookeeperException : public Exception {
public:
using Exception::Exception;
};
} // cppkafka
#endif // CPPKAFKA_EXCEPTIONS_H

View File

@@ -0,0 +1,63 @@
/*
* Copyright (c) 2016, Matias Fontanini
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef CPPKAFKA_ZOOKEEPER_WATCHER_H
#define CPPKAFKA_ZOOKEEPER_WATCHER_H
#include <memory>
#include <string>
#include <chrono>
#include <zookeeper/zookeeper.h>
namespace cppkafka {
class ZookeeperWatcher {
public:
static const std::chrono::milliseconds DEFAULT_RECEIVE_TIMEOUT;
ZookeeperWatcher(const std::string& endpoint);
ZookeeperWatcher(const std::string& endpoint, std::chrono::milliseconds receive_timeout);
std::string get_brokers();
private:
using HandlePtr = std::unique_ptr<zhandle_t, decltype(&zookeeper_close)>;
static const std::string BROKERS_PATH;
static void handle_event_proxy(zhandle_t* zh, int type, int state, const char* path,
void* ctx);
void handle_event(int type, int state, const char* path);
HandlePtr handle_;
};
} // cppkafka
#endif // CPPKAFKA_ZOOKEEPER_WATCHER_H

View File

@@ -15,4 +15,12 @@ set(SOURCES
consumer.cpp
)
if (ENABLE_ZOOKEEPER)
set(ZOOKEEPER_SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/zookeeper/zookeeper_watcher.cpp)
# Add json library as include dir (header only)
include_directories(SYSTEM ${CMAKE_CURRENT_SOURCE_DIR}/../third_party/json)
set(SOURCES ${SOURCES} ${ZOOKEEPER_SOURCES})
endif()
add_library(cppkafka ${SOURCES})

View File

@@ -0,0 +1,118 @@
/*
* Copyright (c) 2016, Matias Fontanini
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include <stdexcept>
#include <sstream>
#include <picojson.h>
#include "zookeeper/zookeeper_watcher.h"
#include "exceptions.h"
using std::string;
using std::unique_ptr;
using std::ostringstream;
using std::runtime_error;
using std::chrono::milliseconds;
using picojson::value;
using picojson::object;
namespace cppkafka {
const milliseconds ZookeeperWatcher::DEFAULT_RECEIVE_TIMEOUT{10000};
const string ZookeeperWatcher::BROKERS_PATH = "/brokers/ids";
ZookeeperWatcher::ZookeeperWatcher(const string& endpoint)
: ZookeeperWatcher(endpoint, DEFAULT_RECEIVE_TIMEOUT) {
}
ZookeeperWatcher::ZookeeperWatcher(const string& endpoint, milliseconds receive_timeout)
: handle_(nullptr, nullptr) {
auto raw_handle = zookeeper_init(endpoint.data(), &ZookeeperWatcher::handle_event_proxy,
receive_timeout.count(), nullptr, this, 0);
if (!raw_handle) {
// TODO: make this a proper exception
throw runtime_error("Failed to create zookeeper handle");
}
handle_ = HandlePtr(raw_handle, &zookeeper_close);
}
string ZookeeperWatcher::get_brokers() {
using VectorPtr = unique_ptr<String_vector, decltype(&deallocate_String_vector)>;
String_vector brokers;
if (zoo_get_children(handle_.get(), BROKERS_PATH.data(), 1, &brokers) != ZOK) {
throw ZookeeperException("Failed to get broker list from zookeeper");
}
// RAII up this pointer
ostringstream oss;
VectorPtr _(&brokers, &deallocate_String_vector);
for (int i = 0; i < brokers.count; i++) {
char config_line[1024];
string path = "/brokers/ids/" + string(brokers.data[i]);
int config_len = sizeof(config_line);
zoo_get(handle_.get(), path.data(), 0, config_line, &config_len, NULL);
if (config_len > 0) {
config_line[config_len] = 0;
value root;
string error = picojson::parse(root, config_line);
if (!error.empty() || !root.is<object>()) {
throw ZookeeperException("Failed to parse zookeeper json: " + error);
}
const value::object& json_object = root.get<object>();
const value& host_json = json_object.at("host");
const value& port_json = json_object.at("port");
if (!host_json.is<string>() || !port_json.is<double>()) {
throw ZookeeperException("Invalid JSON received from zookeeper");
}
string host = host_json.get<string>();
int port = port_json.get<double>();
if (i != 0) {
oss << ",";
}
oss << host << ":" << port;
}
}
return oss.str();
}
void ZookeeperWatcher::handle_event_proxy(zhandle_t*, int type, int state, const char* path,
void* ctx) {
auto self = static_cast<ZookeeperWatcher*>(ctx);
self->handle_event(type, state, path);
}
void ZookeeperWatcher::handle_event(int type, int state, const char* path) {
if (type == ZOO_CHILD_EVENT && path == BROKERS_PATH) {
string brokers = get_brokers();
// TODO: Callback!
}
}
} // cppkafka

View File

@@ -4,8 +4,11 @@ include_directories(${CMAKE_CURRENT_SOURCE_DIR}/../include/)
link_directories(${GOOGLETEST_LIBRARY})
link_libraries(cppkafka rdkafka gtest gtest_main pthread)
set(KAFKA_TEST_INSTANCE "127.0.0.1:9092"
CACHE STRING "The kafka instance to which connect to run tests")
set(KAFKA_TEST_INSTANCE "kafka-vm:9092"
CACHE STRING "The kafka instance to which to connect to run tests")
set(ZOOKEEPER_TEST_INSTANCE "kafka-vm:2181"
CACHE STRING "The zookeeper instance to which to connect to run tests")
macro(create_test test_name)
add_executable(${test_name}_test "${test_name}_test.cpp")
@@ -13,9 +16,14 @@ macro(create_test test_name)
endmacro()
add_definitions("-DKAFKA_TEST_INSTANCE=\"${KAFKA_TEST_INSTANCE}\"")
add_definitions("-DZOOKEEPER_TEST_INSTANCE=\"${ZOOKEEPER_TEST_INSTANCE}\"")
create_test(consumer)
create_test(producer)
create_test(kafka_handle_base)
create_test(topic_partition_list)
create_test(configuration)
if (ENABLE_ZOOKEEPER)
create_test(zookeeper_watcher)
target_link_libraries(zookeeper_watcher_test zookeeper_mt)
endif()

View File

@@ -0,0 +1,17 @@
#include <gtest/gtest.h>
#include "cppkafka/zookeeper/zookeeper_watcher.h"
using std::string;
using namespace cppkafka;
class ZookeeperWatcherTest : public testing::Test {
public:
};
TEST_F(ZookeeperWatcherTest, GetBrokers) {
ZookeeperWatcher watcher(ZOOKEEPER_TEST_INSTANCE);
string brokers = watcher.get_brokers();
EXPECT_EQ(KAFKA_TEST_INSTANCE, brokers);
}

1047
third_party/json/picojson.h vendored Normal file

File diff suppressed because it is too large Load Diff