Add some documentation

This commit is contained in:
Matias Fontanini
2016-06-12 17:48:58 -07:00
parent 0cf8369ef9
commit 9751acd8df
19 changed files with 602 additions and 60 deletions

1
.gitignore vendored Normal file
View File

@@ -0,0 +1 @@
include/cppkafka/config.h

View File

@@ -35,11 +35,27 @@
namespace cppkafka {
/**
* \brief Represents a view of a buffer.
*
* This is only a view, hence you should convert the contents of a buffer into
* some other container if you want to store it somewhere.
*/
class Buffer {
public:
using DataType = unsigned char;
/**
* Constructs an empty buffer
*/
Buffer();
/**
* Constructs a buffer from a pointer and a size
*
* \param data A pointer to some type of size 1
* \param size The size of the buffer
*/
template <typename T>
Buffer(const T* data, size_t size)
: data_(reinterpret_cast<const DataType*>(data)), size_(size) {
@@ -51,9 +67,19 @@ public:
Buffer& operator=(const Buffer&) = delete;
Buffer& operator=(Buffer&&) = default;
/**
* Getter for the data pointer
*/
const DataType* get_data() const;
/**
* Getter for the size of the buffer
*/
size_t get_size() const;
/**
* Converts the contents of the buffer into a string
*/
std::string as_string() const;
private:
const DataType* data_;

View File

@@ -34,19 +34,41 @@
namespace cppkafka {
/**
* Smart pointer which allows copying via a clone functor
*/
template <typename T, typename Deleter, typename Cloner>
class ClonablePtr {
public:
/**
* Creates an instance
*
* \param ptr The pointer to be wrapped
* \param deleter The deleter functor
* \param cloner The clone functor
*/
ClonablePtr(T* ptr, const Deleter& deleter, const Cloner& cloner)
: handle_(ptr, deleter), cloner_(cloner) {
}
/**
* \brief Copies the given ClonablePtr
*
* Cloning will be done by invoking the Cloner type
*
* \param rhs The pointer to be copied
*/
ClonablePtr(const ClonablePtr& rhs)
: handle_(rhs.cloner_(rhs.handle_.get()), rhs.handle_.get_deleter()), cloner_(rhs.cloner_) {
}
/**
* Copies and assigns the given pointer
*
* \param rhs The pointer to be copied
*/
ClonablePtr& operator=(const ClonablePtr& rhs) {
handle_.reset(cloner_(rhs.handle_.get()));
return *this;
@@ -56,6 +78,9 @@ public:
ClonablePtr& operator=(ClonablePtr&&) = default;
~ClonablePtr() = default;
/**
* Getter for the internal pointer
*/
T* get() const {
return handle_.get();
}

View File

@@ -50,6 +50,14 @@ class Producer;
class Consumer;
class KafkaHandleBase;
/**
* \brief Represents a global configuration (rd_kafka_conf_t).
*
* This wraps an rdkafka configuration handle. It can safely be copied (will use
* rd_kafka_conf_dup under the hood) and moved.
*
* Some other overloads for Configuration::set are given via ConfigurationBase.
*/
class Configuration : public ConfigurationBase<Configuration> {
public:
using DeliveryReportCallback = std::function<void(Producer& producer, const Message&)>;
@@ -69,29 +77,121 @@ public:
using ConfigurationBase<Configuration>::set;
/**
* Default constructs a Configuration object
*/
Configuration();
/**
* \brief Sets an attribute.
*
* This will call rd_kafka_conf_set under the hood
*
* \param name The name of the attribute
* \param value The value of the attribute
*/
void set(const std::string& name, const std::string& value);
/**
* Sets the delivery report callback (invokes rd_kafka_conf_set_dr_msg_cb)
*/
void set_delivery_report_callback(DeliveryReportCallback callback);
/**
* Sets the offset commit callback (invokes rd_kafka_conf_set_offset_commit_cb)
*/
void set_offset_commit_callback(OffsetCommitCallback callback);
/**
* Sets the error callback (invokes rd_kafka_conf_set_error_cb)
*/
void set_error_callback(ErrorCallback callback);
/**
* Sets the throttle callback (invokes rd_kafka_conf_set_throttle_cb)
*/
void set_throttle_callback(ThrottleCallback callback);
/**
* Sets the log callback (invokes rd_kafka_conf_set_log_cb)
*/
void set_log_callback(LogCallback callback);
/**
* Sets the stats callback (invokes rd_kafka_conf_set_stats_cb)
*/
void set_stats_callback(StatsCallback callback);
/**
* Sets the socket callback (invokes rd_kafka_conf_set_socket_cb)
*/
void set_socket_callback(SocketCallback callback);
/**
* Sets the default topic configuration
*/
void set_default_topic_configuration(boost::optional<TopicConfiguration> config);
/**
* Returns true iff the given property name has been set
*/
bool has_property(const std::string& name) const;
/**
* Gets the rdkafka configuration handle
*/
rd_kafka_conf_t* get_handle() const;
/**
* Gets an option value
*
* \throws ConfigOptionNotFound if the option is not present
*/
std::string get(const std::string& name) const;
/**
* Gets the delivery report callback
*/
const DeliveryReportCallback& get_delivery_report_callback() const;
/**
* Gets the offset commit callback
*/
const OffsetCommitCallback& get_offset_commit_callback() const;
/**
* Gets the error callback
*/
const ErrorCallback& get_error_callback() const;
/**
* Gets the throttle callback
*/
const ThrottleCallback& get_throttle_callback() const;
/**
* Gets the log callback
*/
const LogCallback& get_log_callback() const;
/**
* Gets the stats callback
*/
const StatsCallback& get_stats_callback() const;
/**
* Gets the socket callback
*/
const SocketCallback& get_socket_callback() const;
/**
* Gets the default topic configuration
*/
const boost::optional<TopicConfiguration>& get_default_topic_configuration() const;
/**
* Gets the default topic configuration
*/
boost::optional<TopicConfiguration>& get_default_topic_configuration();
private:
static const std::unordered_set<std::string> VALID_EXTENSIONS;

View File

@@ -37,17 +37,25 @@ namespace cppkafka {
template <typename Concrete>
class ConfigurationBase {
public:
/**
* Sets a bool value
*/
void set(const std::string& name, bool value) {
proxy_set(name, value ? "true" : "false");
}
// Overload for any integral value
/**
* Sets a value of any integral value
*/
template <typename T,
typename = typename std::enable_if<std::is_integral<T>::value>::type>
void set(const std::string& name, T value) {
proxy_set(name, std::to_string(value));
}
/**
* Sets a cstring value
*/
void set(const std::string& name, const char* value) {
proxy_set(name, value);
}

View File

@@ -41,43 +41,266 @@ namespace cppkafka {
class TopicConfiguration;
/**
* \brief High level kafka consumer class
*
* Wrapper for the high level consumer API provided by rdkafka. Most methods are just
* a one to one mapping to rdkafka functions.
*
* This class allows hooking up to assignments/revocations via callbacks.
*
* Semi-simple code showing how to use this class
*
* \code
* // Create a configuration and set the group.id and zookeeper fields
* Configuration config;
* // This is only valid when using the zookeeper extension
* config.set("zookeeper", "127.0.0.1:2181");
* config.set("group.id", "foo");
*
* // Create a consumer
* Consumer consumer(config);
*
* // Set the assignment callback
* consumer.set_assignment_callback([&](vector<TopicPartition>& topic_partitions) {
* // Here you could fetch offsets and do something, altering the offsets on the
* // topic_partitions vector if needed
* cout << "Got assigned " << topic_partitions.count() << " partitions!" << endl;
* });
*
* // Set the revocation callback
* consumer.set_revocation_callback([&](const vector<TopicPartition>& topic_partitions) {
* cout << topic_partitions.size() << " partitions revoked!" << endl;
* });
*
* // Subscribe
* consumer.subscribe({ "my_topic" });
* while (true) {
* // Poll. This will optionally return a message. It's necessary to check if it's a valid
* // one before using it or bad things will happen
* Message msg = consumer.poll();
* if (msg) {
* // It's a valid message!
* if (msg.get_error() == 0) {
* // It's an actual message. Get the payload and print it to stdout
* cout << msg.get_payload().as_string() << endl;
* }
* else {
* // Is it an error notification
* // ...
* }
* }
* }
* \endcode
*/
class Consumer : public KafkaHandleBase {
public:
using AssignmentCallback = std::function<void(TopicPartitionList&)>;
using RevocationCallback = std::function<void(const TopicPartitionList&)>;
using RebalanceErrorCallback = std::function<void(rd_kafka_resp_err_t)>;
/**
* \brief Creates an instance of a consumer.
*
* Note that the configuration *must contain* the group.id attribute set or this
* will throw.
*
* \param config The configuration to be used
*/
Consumer(Configuration config);
Consumer(const Consumer&) = delete;
Consumer(Consumer&) = delete;
Consumer& operator=(const Consumer&) = delete;
Consumer& operator=(Consumer&&) = delete;
/**
* \brief Closes and estroys the rdkafka handle
*
* This will call Consumer::close before destroying the handle
*/
~Consumer();
/**
* \brief Sets the topic/partition assignment callback
*
* The Consumer class will use rd_kafka_conf_set_rebalance_cb and will handle the
* rebalance, converting from rdkafka topic partition list handles into vector<TopicPartition>
* and executing the assignment/revocation/rebalance_error callbacks.
*
* \note You *do not need* to call Consumer::assign with the provided topic parttitions. This
* will be handled automatically by cppkafka.
*
* \param callback The topic/partition assignment callback
*/
void set_assignment_callback(AssignmentCallback callback);
/**
* \brief Sets the topic/partition revocation callback
*
* The Consumer class will use rd_kafka_conf_set_rebalance_cb and will handle the
* rebalance, converting from rdkafka topic partition list handles into vector<TopicPartition>
* and executing the assignment/revocation/rebalance_error callbacks.
*
* \note You *do not need* to call Consumer::assign with an empty topic partition list or
* anything like that. That's handled automatically by cppkafka. This is just a notifitation
* so your application code can react to revocations
*
* \param callback The topic/partition revocation callback
*/
void set_revocation_callback(RevocationCallback callback);
/**
* \brief Sets the rebalance error callback
*
* The Consumer class will use rd_kafka_conf_set_rebalance_cb and will handle the
* rebalance, converting from rdkafka topic partition list handles into vector<TopicPartition>
* and executing the assignment/revocation/rebalance_error callbacks.
*
* \param callback The rebalance error callback
*/
void set_rebalance_error_callback(RebalanceErrorCallback callback);
/**
* \brief Subscribes to the given list of topics
*
* This translates to a call to rd_kafka_subscribe
*
* \param topics The topics to subscribe to
*/
void subscribe(const std::vector<std::string>& topics);
/**
* \brief Unsubscribes to the current subscription list
*
* This translates to a call to rd_kafka_unsubscribe
*/
void unsubscribe();
/**
* \brief Sets the current topic/partition assignment
*
* This translates into a call to rd_kafka_assign
*/
void assign(const TopicPartitionList& topic_partitions);
/**
* \brief Unassigns the current topic/partition assignment
*
* This translates into a call to rd_kafka_assign using a null as the topic partition list
* parameter
*/
void unassign();
/**
* \brief Closes the consumer session
*
* This translates into a call to rd_kafka_consumer_close
*/
void close();
/**
* \brief Commits the given message synchronously
*
* This translates into a call to rd_kafka_commit_message
*
* \param msg The message to be committed
*/
void commit(const Message& msg);
/**
* \brief Commits the given message asynchronously
*
* This translates into a call to rd_kafka_commit_message
*
* \param msg The message to be committed
*/
void async_commit(const Message& msg);
/**
* \brief Commits the offsets on the given topic/partitions synchronously
*
* This translates into a call to rd_kafka_commit
*
* \param topic_partitions The topic/partition list to be committed
*/
void commit(const TopicPartitionList& topic_partitions);
/**
* \brief Commits the offsets on the given topic/partitions asynchronously
*
* This translates into a call to rd_kafka_commit
*
* \param topic_partitions The topic/partition list to be committed
*/
void async_commit(const TopicPartitionList& topic_partitions);
OffsetTuple get_offsets(const std::string& topic, int partition) const;
/**
* \brief Gets the minimum and maximum offsets for the given topic/partition
*
* This translates into a call to rd_kafka_get_watermark_offsets
*
* \param topic_partition The topic/partition to get the offsets from
*/
OffsetTuple get_offsets(const TopicPartition& topic_partition) const;
/**
* \brief Gets the offsets committed for the given topic/partition list
*
* This translates into a call to rd_kafka_committed
*
* \param topic_partitions The topic/partition list to be queried
*/
TopicPartitionList get_offsets_committed(const TopicPartitionList& topic_partitions) const;
/**
* \brief Gets the offset positions for the given topic/partition list
*
* This translates into a call to rd_kafka_position
*
* \param topic_partitions The topic/partition list to be queried
*/
TopicPartitionList get_offsets_position(const TopicPartitionList& topic_partitions) const;
TopicPartitionList get_subscription() const;
/**
* \brief Gets the current topic subscription
*
* This translates to a call to rd_kafka_subscription
*/
std::vector<std::string> get_subscription() const;
/**
* \brief Gets the current topic/partition list assignment
*
* This translates to a call to rd_kafka_assignment
*/
TopicPartitionList get_assignment() const;
/**
* \brief Gets the group member id
*
* This translates to a call to rd_kafka_memberid
*/
std::string get_member_id() const;
/**
* \brief Polls for new messages
*
* This will call rd_kafka_consumer_poll.
*
* Note that you need to call poll periodically as a keep alive mechanism, otherwise the broker
* will think this consumer is down and will trigger a rebalance (if using dynamic
* subscription).
*
* The returned message *might* be empty. If's necessary to check that it's a valid one before
* using it:
*
* \code
* Message msg = consumer.poll();
* if (msg) {
* // It's a valid message!
* }
* \endcode
*/
Message poll();
private:
static void rebalance_proxy(rd_kafka_t *handle, rd_kafka_resp_err_t error,

View File

@@ -36,6 +36,9 @@
namespace cppkafka {
/**
* Base class for all cppkafka exceptions
*/
class Exception : public std::exception {
public:
Exception(std::string message);
@@ -45,16 +48,25 @@ private:
std::string message_;
};
/**
* A configuration related error
*/
class ConfigException : public Exception {
public:
ConfigException(const std::string& config_name, const std::string& error);
};
/**
* Indicates a configuration option was not set
*/
class ConfigOptionNotFound : public Exception {
public:
ConfigOptionNotFound(const std::string& config_name);
};
/**
* A generic rdkafka handle error
*/
class HandleException : public Exception {
public:
HandleException(rd_kafka_resp_err_t error_code);
@@ -64,6 +76,9 @@ private:
rd_kafka_resp_err_t error_code_;
};
/**
* An exception when using zookeeper
*/
class ZookeeperException : public Exception {
public:
using Exception::Exception;

View File

@@ -44,13 +44,16 @@
#include "configuration.h"
#include "config.h"
#ifdef CPPKAFKA_HAVE_ZOOKEEPER
#include "zookeeper/zookeeper_subscriber.h"
#include "zookeeper/zookeeper_subscription.h"
#endif // CPPKAFKA_HAVE_ZOOKEEPER
namespace cppkafka {
class Topic;
/**
* Base class for kafka consumer/producer
*/
class KafkaHandleBase {
public:
using OffsetTuple = std::tuple<int64_t, int64_t>;
@@ -61,20 +64,100 @@ public:
KafkaHandleBase& operator=(const KafkaHandleBase&) = delete;
KafkaHandleBase& operator=(KafkaHandleBase&&) = delete;
/**
* \brief Pauses consumption/production from the given topic/partition list
*
* This translates into a call to rd_kafka_pause_partitions
*
* \param topic_partitions The topic/partition list to pause consuming/producing from/to
*/
void pause_partitions(const TopicPartitionList& topic_partitions);
/**
* \brief Resumes consumption/production from the given topic/partition list
*
* This translates into a call to rd_kafka_resume_partitions
*
* \param topic_partitions The topic/partition list to resume consuming/producing from/to
*/
void resume_partitions(const TopicPartitionList& topic_partitions);
/**
* \brief Sets the timeout for operations that require a timeout
*
* This timeout is applied to operations like polling, querying for offsets, etc
*
* \param timeout The timeout to be set
*/
void set_timeout(const std::chrono::milliseconds& timeout);
OffsetTuple query_offsets(const std::string& topic, int partition) const;
/**
* \brief Queries the offset for the given topic/partition
*
* This translates into a call to rd_kafka_query_watermark_offsets
*
* \param topic_partition The topic/partition to be queried
*/
OffsetTuple query_offsets(const TopicPartition& topic_partition) const;
/**
* Gets the rdkafka handle
*/
rd_kafka_t* get_handle() const;
/**
* \brief Creates a topic handle
*
* This translates into a call to rd_kafka_topic_new. This will use the default topic
* configuration provided in the Configuration object for this consumer/producer handle,
* if any.
*
* \param name The name of the topic to be created
*/
Topic get_topic(const std::string& name);
/**
* \brief Creates a topic handle
*
* This translates into a call to rd_kafka_topic_new.
*
* \param name The name of the topic to be created
* \param config The configuration to be used for the new topic
*/
Topic get_topic(const std::string& name, TopicConfiguration config);
/**
* \brief Gets metadata for brokers, topics, partitions, etc
*
* This translates into a call to rd_kafka_metadata
*/
Metadata get_metadata() const;
/**
* \brief Gets general metadata but only fetches metadata for the given topic rather than
* all of them
*
* This translates into a call to rd_kafka_metadata
*
* \param topic The topic to fetch information for
*/
Metadata get_metadata(const Topic& topic) const;
/**
* Returns the kafka handle name
*/
std::string get_name() const;
/**
* Gets the configured timeout.
*
* \sa KafkaHandleBase::set_timeout
*/
std::chrono::milliseconds get_timeout() const;
/**
* Gets the handle's configuration
*/
const Configuration& get_configuration() const;
protected:
KafkaHandleBase(Configuration config);
@@ -99,7 +182,7 @@ private:
std::mutex topic_configurations_mutex_;
#ifdef CPPKAFKA_HAVE_ZOOKEEPER
// This could be an optional but apparently move construction is only supported as of 1.56
std::unique_ptr<ZookeeperSubscriber> zookeeper_subscriber_;
std::unique_ptr<ZookeeperSubscription> zookeeper_subscription_;
#endif // CPPKAFKA_HAVE_ZOOKEEPER
};

View File

@@ -38,18 +38,52 @@
namespace cppkafka {
class ZookeeperSubscriber;
class ZookeeperSubscription;
/**
* \brief Pool of zookeeper handles
*
* This class is used internally by cppkafka.
*/
class ZookeeperPool {
public:
/**
* Get singleton instance
*/
static ZookeeperPool& instance();
ZookeeperSubscriber subscribe(const std::string& endpoint,
/**
* Subscribe to the given endpoint
*
* \param endpoint The zookeeper endpoint to subscribe to
* \param receive_timeout The zookeeper receive timeout
* \param callback The callback to be executed on updates
*
* \return A ZookeeperSubscription that will auto-unsubscribe upon destruction
*/
ZookeeperSubscription subscribe(const std::string& endpoint,
std::chrono::milliseconds receive_timeout,
ZookeeperWatcher::WatcherCallback callback);
void unsubscribe(const ZookeeperSubscriber& subscriber);
/**
* Unsubscribes from a previous subscription
*
* \param subscriber The subscriber return by a previous call to ZookeeperPool::subscribe
*/
void unsubscribe(const ZookeeperSubscription& subscriber);
/**
* \brief Gets the broker list for the given zookeeper endpoint
*
* Requires having previously called subscribe for this endpoint at least once.
*
* \param endpoint The endpoint for which to get the broker list
*/
std::string get_brokers(const std::string& endpoint);
/**
* Gets the amount of subscribers for the given zookeeper endpoint
*/
size_t get_subscriber_count(const std::string& endpoint) const;
private:
using WatchersMap = std::map<std::string, ZookeeperWatcher>;

View File

@@ -27,29 +27,37 @@
*
*/
#ifndef CPPKAFKA_ZOOKEEPER_SUBSCRIBER_H
#define CPPKAFKA_ZOOKEEPER_SUBSCRIBER_H
#ifndef CPPKAFKA_ZOOKEEPER_SUBSCRIPTION_H
#define CPPKAFKA_ZOOKEEPER_SUBSCRIPTION_H
#include <string>
namespace cppkafka {
class ZookeeperSubscriber {
/**
* \cond
*/
class ZookeeperSubscription {
public:
ZookeeperSubscriber(std::string endpoint, std::string subscription_id);
ZookeeperSubscriber(ZookeeperSubscriber&&) = default;
ZookeeperSubscriber(const ZookeeperSubscriber&) = delete;
ZookeeperSubscriber& operator=(ZookeeperSubscriber&&);
ZookeeperSubscriber& operator=(const ZookeeperSubscriber&) = delete;
~ZookeeperSubscriber();
ZookeeperSubscription(std::string endpoint, std::string subscription_id);
ZookeeperSubscription(ZookeeperSubscription&&) = default;
ZookeeperSubscription(const ZookeeperSubscription&) = delete;
ZookeeperSubscription& operator=(ZookeeperSubscription&&);
ZookeeperSubscription& operator=(const ZookeeperSubscription&) = delete;
~ZookeeperSubscription();
const std::string& get_endpoint() const;
const std::string& get_subscription_id() const;
private:
std::string endpoint_;
std::string subscription_id_;
};
/**
* \endcond
*/
} // cppkafka
#endif // CPPKAFKA_ZOOKEEPER_SUBSCRIBER_H
#endif // CPPKAFKA_ZOOKEEPER_SUBSCRIPTION_H

View File

@@ -40,6 +40,9 @@
namespace cppkafka {
/**
* \cond
*/
class ZookeeperWatcher {
public:
static const std::chrono::milliseconds DEFAULT_RECEIVE_TIMEOUT;
@@ -73,6 +76,10 @@ private:
size_t id_counter_{0};
};
/**
* \endcond
*/
} // cppkafka
#endif // CPPKAFKA_ZOOKEEPER_WATCHER_H

View File

@@ -20,7 +20,7 @@ if (ENABLE_ZOOKEEPER)
set(ZOOKEEPER_SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/zookeeper/zookeeper_watcher.cpp
${CMAKE_CURRENT_SOURCE_DIR}/zookeeper/zookeeper_pool.cpp
${CMAKE_CURRENT_SOURCE_DIR}/zookeeper/zookeeper_subscriber.cpp)
${CMAKE_CURRENT_SOURCE_DIR}/zookeeper/zookeeper_subscription.cpp)
# Add json library as include dir (header only)
include_directories(SYSTEM ${CMAKE_CURRENT_SOURCE_DIR}/../third_party/json)
set(SOURCES ${SOURCES} ${ZOOKEEPER_SOURCES})

View File

@@ -126,9 +126,11 @@ void Consumer::async_commit(const TopicPartitionList& topic_partitions) {
commit(topic_partitions, true);
}
KafkaHandleBase::OffsetTuple Consumer::get_offsets(const string& topic, int partition) const {
KafkaHandleBase::OffsetTuple Consumer::get_offsets(const TopicPartition& topic_partition) const {
int64_t low;
int64_t high;
const string& topic = topic_partition.get_topic();
const int partition = topic_partition.get_partition();
rd_kafka_resp_err_t result = rd_kafka_get_watermark_offsets(get_handle(), topic.data(),
partition, &low, &high);
check_error(result);
@@ -152,12 +154,18 @@ Consumer::get_offsets_position(const TopicPartitionList& topic_partitions) const
return convert(topic_list_handle);
}
TopicPartitionList Consumer::get_subscription() const {
vector<string> Consumer::get_subscription() const {
rd_kafka_resp_err_t error;
rd_kafka_topic_partition_list_t* list = nullptr;
error = rd_kafka_subscription(get_handle(), &list);
check_error(error);
return convert(make_handle(list));
auto handle = make_handle(list);
vector<string> output;
for (const auto& topic_partition : convert(handle)) {
output.push_back(topic_partition.get_topic());
}
return output;
}
TopicPartitionList Consumer::get_assignment() const {

View File

@@ -91,10 +91,12 @@ Topic KafkaHandleBase::get_topic(const string& name, TopicConfiguration config)
return get_topic(name, rd_kafka_topic_conf_dup(handle));
}
KafkaHandleBase::OffsetTuple KafkaHandleBase::query_offsets(const string& topic,
int partition) const {
KafkaHandleBase::OffsetTuple
KafkaHandleBase::query_offsets(const TopicPartition& topic_partition) const {
int64_t low;
int64_t high;
const string& topic = topic_partition.get_topic();
const int partition = topic_partition.get_partition();
rd_kafka_resp_err_t result = rd_kafka_query_watermark_offsets(handle_.get(), topic.data(),
partition, &low, &high,
timeout_ms_.count());
@@ -144,8 +146,8 @@ void KafkaHandleBase::set_handle(rd_kafka_t* handle) {
rd_kafka_brokers_add(handle_.get(), brokers.data());
rd_kafka_poll(handle_.get(), 10);
};
ZookeeperSubscriber subscriber = pool.subscribe(endpoint, timeout, callback);
zookeeper_subscriber_.reset(new ZookeeperSubscriber(move(subscriber)));
ZookeeperSubscription subscriber = pool.subscribe(endpoint, timeout, callback);
zookeeper_subscription_.reset(new ZookeeperSubscription(move(subscriber)));
callback(pool.get_brokers(endpoint));
}

View File

@@ -28,7 +28,7 @@
*/
#include "zookeeper/zookeeper_pool.h"
#include "zookeeper/zookeeper_subscriber.h"
#include "zookeeper/zookeeper_subscription.h"
#include "exceptions.h"
using std::string;
@@ -45,7 +45,7 @@ ZookeeperPool& ZookeeperPool::instance() {
return the_instance;
}
ZookeeperSubscriber ZookeeperPool::subscribe(const string& endpoint,
ZookeeperSubscription ZookeeperPool::subscribe(const string& endpoint,
milliseconds receive_timeout,
ZookeeperWatcher::WatcherCallback callback) {
lock_guard<mutex> _(watchers_mutex_);
@@ -55,10 +55,10 @@ ZookeeperSubscriber ZookeeperPool::subscribe(const string& endpoint,
forward_as_tuple(endpoint, receive_timeout)).first;
}
string id = iter->second.subscribe(move(callback));
return ZookeeperSubscriber(endpoint, id);
return ZookeeperSubscription(endpoint, id);
}
void ZookeeperPool::unsubscribe(const ZookeeperSubscriber& subscriber) {
void ZookeeperPool::unsubscribe(const ZookeeperSubscription& subscriber) {
lock_guard<mutex> _(watchers_mutex_);
auto iter = watchers_.find(subscriber.get_endpoint());
if (iter != watchers_.end()) {

View File

@@ -1,25 +0,0 @@
#include "zookeeper/zookeeper_subscriber.h"
#include "zookeeper/zookeeper_pool.h"
using std::string;
namespace cppkafka {
ZookeeperSubscriber::ZookeeperSubscriber(string endpoint, string subscription_id)
: endpoint_(move(endpoint)), subscription_id_(move(subscription_id)) {
}
ZookeeperSubscriber::~ZookeeperSubscriber() {
ZookeeperPool::instance().unsubscribe(*this);
}
const string& ZookeeperSubscriber::get_endpoint() const {
return endpoint_;
}
const string& ZookeeperSubscriber::get_subscription_id() const {
return subscription_id_;
}
} // cppkafka

View File

@@ -0,0 +1,25 @@
#include "zookeeper/zookeeper_subscription.h"
#include "zookeeper/zookeeper_pool.h"
using std::string;
namespace cppkafka {
ZookeeperSubscription::ZookeeperSubscription(string endpoint, string subscription_id)
: endpoint_(move(endpoint)), subscription_id_(move(subscription_id)) {
}
ZookeeperSubscription::~ZookeeperSubscription() {
ZookeeperPool::instance().unsubscribe(*this);
}
const string& ZookeeperSubscription::get_endpoint() const {
return endpoint_;
}
const string& ZookeeperSubscription::get_subscription_id() const {
return subscription_id_;
}
} // cppkafka

View File

@@ -130,12 +130,14 @@ TEST_F(ConsumerTest, AssignmentCallback) {
}
EXPECT_EQ(1, runner.get_messages().size());
EXPECT_EQ(vector<string>{ KAFKA_TOPIC }, consumer.get_subscription());
assignment = consumer.get_assignment();
EXPECT_EQ(3, assignment.size());
int64_t low;
int64_t high;
tie(low, high) = consumer.get_offsets(KAFKA_TOPIC, partition);
tie(low, high) = consumer.get_offsets({ KAFKA_TOPIC, partition });
EXPECT_GT(high, low);
EXPECT_EQ(high, runner.get_messages().back().get_offset() + 1);
}

View File

@@ -128,7 +128,7 @@ TEST_F(ProducerTest, OneMessageOnFixedPartition) {
int64_t low;
int64_t high;
tie(low, high) = producer.query_offsets(KAFKA_TOPIC, partition);
tie(low, high) = producer.query_offsets({ KAFKA_TOPIC, partition });
EXPECT_GT(high, low);
}