From 9751acd8df6bb92c9b0e75fe760210ef017a2bab Mon Sep 17 00:00:00 2001 From: Matias Fontanini Date: Sun, 12 Jun 2016 17:48:58 -0700 Subject: [PATCH] Add some documentation --- .gitignore | 1 + include/cppkafka/buffer.h | 26 ++ include/cppkafka/clonable_ptr.h | 25 ++ include/cppkafka/configuration.h | 100 ++++++++ include/cppkafka/configuration_base.h | 10 +- include/cppkafka/consumer.h | 227 +++++++++++++++++- include/cppkafka/exceptions.h | 15 ++ include/cppkafka/kafka_handle_base.h | 89 ++++++- include/cppkafka/zookeeper/zookeeper_pool.h | 44 +++- ..._subscriber.h => zookeeper_subscription.h} | 28 ++- .../cppkafka/zookeeper/zookeeper_watcher.h | 7 + src/CMakeLists.txt | 2 +- src/consumer.cpp | 14 +- src/kafka_handle_base.cpp | 10 +- src/zookeeper/zookeeper_pool.cpp | 8 +- src/zookeeper/zookeeper_subscriber.cpp | 25 -- src/zookeeper/zookeeper_subscription.cpp | 25 ++ tests/consumer_test.cpp | 4 +- tests/producer_test.cpp | 2 +- 19 files changed, 602 insertions(+), 60 deletions(-) create mode 100644 .gitignore rename include/cppkafka/zookeeper/{zookeeper_subscriber.h => zookeeper_subscription.h} (73%) delete mode 100644 src/zookeeper/zookeeper_subscriber.cpp create mode 100644 src/zookeeper/zookeeper_subscription.cpp diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..603b88a --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +include/cppkafka/config.h diff --git a/include/cppkafka/buffer.h b/include/cppkafka/buffer.h index ef8a0b2..6c6d0f9 100644 --- a/include/cppkafka/buffer.h +++ b/include/cppkafka/buffer.h @@ -35,11 +35,27 @@ namespace cppkafka { +/** + * \brief Represents a view of a buffer. + * + * This is only a view, hence you should convert the contents of a buffer into + * some other container if you want to store it somewhere. + */ class Buffer { public: using DataType = unsigned char; + /** + * Constructs an empty buffer + */ Buffer(); + + /** + * Constructs a buffer from a pointer and a size + * + * \param data A pointer to some type of size 1 + * \param size The size of the buffer + */ template Buffer(const T* data, size_t size) : data_(reinterpret_cast(data)), size_(size) { @@ -51,9 +67,19 @@ public: Buffer& operator=(const Buffer&) = delete; Buffer& operator=(Buffer&&) = default; + /** + * Getter for the data pointer + */ const DataType* get_data() const; + + /** + * Getter for the size of the buffer + */ size_t get_size() const; + /** + * Converts the contents of the buffer into a string + */ std::string as_string() const; private: const DataType* data_; diff --git a/include/cppkafka/clonable_ptr.h b/include/cppkafka/clonable_ptr.h index 3b959f5..da865a7 100644 --- a/include/cppkafka/clonable_ptr.h +++ b/include/cppkafka/clonable_ptr.h @@ -34,19 +34,41 @@ namespace cppkafka { +/** + * Smart pointer which allows copying via a clone functor + */ template class ClonablePtr { public: + /** + * Creates an instance + * + * \param ptr The pointer to be wrapped + * \param deleter The deleter functor + * \param cloner The clone functor + */ ClonablePtr(T* ptr, const Deleter& deleter, const Cloner& cloner) : handle_(ptr, deleter), cloner_(cloner) { } + /** + * \brief Copies the given ClonablePtr + * + * Cloning will be done by invoking the Cloner type + * + * \param rhs The pointer to be copied + */ ClonablePtr(const ClonablePtr& rhs) : handle_(rhs.cloner_(rhs.handle_.get()), rhs.handle_.get_deleter()), cloner_(rhs.cloner_) { } + /** + * Copies and assigns the given pointer + * + * \param rhs The pointer to be copied + */ ClonablePtr& operator=(const ClonablePtr& rhs) { handle_.reset(cloner_(rhs.handle_.get())); return *this; @@ -56,6 +78,9 @@ public: ClonablePtr& operator=(ClonablePtr&&) = default; ~ClonablePtr() = default; + /** + * Getter for the internal pointer + */ T* get() const { return handle_.get(); } diff --git a/include/cppkafka/configuration.h b/include/cppkafka/configuration.h index 8d4f87b..d347005 100644 --- a/include/cppkafka/configuration.h +++ b/include/cppkafka/configuration.h @@ -50,6 +50,14 @@ class Producer; class Consumer; class KafkaHandleBase; +/** + * \brief Represents a global configuration (rd_kafka_conf_t). + * + * This wraps an rdkafka configuration handle. It can safely be copied (will use + * rd_kafka_conf_dup under the hood) and moved. + * + * Some other overloads for Configuration::set are given via ConfigurationBase. + */ class Configuration : public ConfigurationBase { public: using DeliveryReportCallback = std::function; @@ -69,29 +77,121 @@ public: using ConfigurationBase::set; + /** + * Default constructs a Configuration object + */ Configuration(); + /** + * \brief Sets an attribute. + * + * This will call rd_kafka_conf_set under the hood + * + * \param name The name of the attribute + * \param value The value of the attribute + */ void set(const std::string& name, const std::string& value); + + /** + * Sets the delivery report callback (invokes rd_kafka_conf_set_dr_msg_cb) + */ void set_delivery_report_callback(DeliveryReportCallback callback); + + /** + * Sets the offset commit callback (invokes rd_kafka_conf_set_offset_commit_cb) + */ void set_offset_commit_callback(OffsetCommitCallback callback); + + /** + * Sets the error callback (invokes rd_kafka_conf_set_error_cb) + */ void set_error_callback(ErrorCallback callback); + + /** + * Sets the throttle callback (invokes rd_kafka_conf_set_throttle_cb) + */ void set_throttle_callback(ThrottleCallback callback); + + /** + * Sets the log callback (invokes rd_kafka_conf_set_log_cb) + */ void set_log_callback(LogCallback callback); + + /** + * Sets the stats callback (invokes rd_kafka_conf_set_stats_cb) + */ void set_stats_callback(StatsCallback callback); + + /** + * Sets the socket callback (invokes rd_kafka_conf_set_socket_cb) + */ void set_socket_callback(SocketCallback callback); + + /** + * Sets the default topic configuration + */ void set_default_topic_configuration(boost::optional config); + /** + * Returns true iff the given property name has been set + */ bool has_property(const std::string& name) const; + + /** + * Gets the rdkafka configuration handle + */ rd_kafka_conf_t* get_handle() const; + + /** + * Gets an option value + * + * \throws ConfigOptionNotFound if the option is not present + */ std::string get(const std::string& name) const; + + /** + * Gets the delivery report callback + */ const DeliveryReportCallback& get_delivery_report_callback() const; + + /** + * Gets the offset commit callback + */ const OffsetCommitCallback& get_offset_commit_callback() const; + + /** + * Gets the error callback + */ const ErrorCallback& get_error_callback() const; + + /** + * Gets the throttle callback + */ const ThrottleCallback& get_throttle_callback() const; + + /** + * Gets the log callback + */ const LogCallback& get_log_callback() const; + + /** + * Gets the stats callback + */ const StatsCallback& get_stats_callback() const; + + /** + * Gets the socket callback + */ const SocketCallback& get_socket_callback() const; + + /** + * Gets the default topic configuration + */ const boost::optional& get_default_topic_configuration() const; + + /** + * Gets the default topic configuration + */ boost::optional& get_default_topic_configuration(); private: static const std::unordered_set VALID_EXTENSIONS; diff --git a/include/cppkafka/configuration_base.h b/include/cppkafka/configuration_base.h index cb20fb3..be57835 100644 --- a/include/cppkafka/configuration_base.h +++ b/include/cppkafka/configuration_base.h @@ -37,17 +37,25 @@ namespace cppkafka { template class ConfigurationBase { public: + /** + * Sets a bool value + */ void set(const std::string& name, bool value) { proxy_set(name, value ? "true" : "false"); } - // Overload for any integral value + /** + * Sets a value of any integral value + */ template ::value>::type> void set(const std::string& name, T value) { proxy_set(name, std::to_string(value)); } + /** + * Sets a cstring value + */ void set(const std::string& name, const char* value) { proxy_set(name, value); } diff --git a/include/cppkafka/consumer.h b/include/cppkafka/consumer.h index 3f6609e..c001c95 100644 --- a/include/cppkafka/consumer.h +++ b/include/cppkafka/consumer.h @@ -41,43 +41,266 @@ namespace cppkafka { class TopicConfiguration; +/** + * \brief High level kafka consumer class + * + * Wrapper for the high level consumer API provided by rdkafka. Most methods are just + * a one to one mapping to rdkafka functions. + * + * This class allows hooking up to assignments/revocations via callbacks. + * + * Semi-simple code showing how to use this class + * + * \code + * // Create a configuration and set the group.id and zookeeper fields + * Configuration config; + * // This is only valid when using the zookeeper extension + * config.set("zookeeper", "127.0.0.1:2181"); + * config.set("group.id", "foo"); + * + * // Create a consumer + * Consumer consumer(config); + * + * // Set the assignment callback + * consumer.set_assignment_callback([&](vector& topic_partitions) { + * // Here you could fetch offsets and do something, altering the offsets on the + * // topic_partitions vector if needed + * cout << "Got assigned " << topic_partitions.count() << " partitions!" << endl; + * }); + * + * // Set the revocation callback + * consumer.set_revocation_callback([&](const vector& topic_partitions) { + * cout << topic_partitions.size() << " partitions revoked!" << endl; + * }); + * + * // Subscribe + * consumer.subscribe({ "my_topic" }); + * while (true) { + * // Poll. This will optionally return a message. It's necessary to check if it's a valid + * // one before using it or bad things will happen + * Message msg = consumer.poll(); + * if (msg) { + * // It's a valid message! + * if (msg.get_error() == 0) { + * // It's an actual message. Get the payload and print it to stdout + * cout << msg.get_payload().as_string() << endl; + * } + * else { + * // Is it an error notification + * // ... + * } + * } + * } + * \endcode + */ class Consumer : public KafkaHandleBase { public: using AssignmentCallback = std::function; using RevocationCallback = std::function; using RebalanceErrorCallback = std::function; + /** + * \brief Creates an instance of a consumer. + * + * Note that the configuration *must contain* the group.id attribute set or this + * will throw. + * + * \param config The configuration to be used + */ Consumer(Configuration config); Consumer(const Consumer&) = delete; Consumer(Consumer&) = delete; Consumer& operator=(const Consumer&) = delete; Consumer& operator=(Consumer&&) = delete; + + /** + * \brief Closes and estroys the rdkafka handle + * + * This will call Consumer::close before destroying the handle + */ ~Consumer(); + /** + * \brief Sets the topic/partition assignment callback + * + * The Consumer class will use rd_kafka_conf_set_rebalance_cb and will handle the + * rebalance, converting from rdkafka topic partition list handles into vector + * and executing the assignment/revocation/rebalance_error callbacks. + * + * \note You *do not need* to call Consumer::assign with the provided topic parttitions. This + * will be handled automatically by cppkafka. + * + * \param callback The topic/partition assignment callback + */ void set_assignment_callback(AssignmentCallback callback); + + /** + * \brief Sets the topic/partition revocation callback + * + * The Consumer class will use rd_kafka_conf_set_rebalance_cb and will handle the + * rebalance, converting from rdkafka topic partition list handles into vector + * and executing the assignment/revocation/rebalance_error callbacks. + * + * \note You *do not need* to call Consumer::assign with an empty topic partition list or + * anything like that. That's handled automatically by cppkafka. This is just a notifitation + * so your application code can react to revocations + * + * \param callback The topic/partition revocation callback + */ void set_revocation_callback(RevocationCallback callback); + + /** + * \brief Sets the rebalance error callback + * + * The Consumer class will use rd_kafka_conf_set_rebalance_cb and will handle the + * rebalance, converting from rdkafka topic partition list handles into vector + * and executing the assignment/revocation/rebalance_error callbacks. + * + * \param callback The rebalance error callback + */ void set_rebalance_error_callback(RebalanceErrorCallback callback); + /** + * \brief Subscribes to the given list of topics + * + * This translates to a call to rd_kafka_subscribe + * + * \param topics The topics to subscribe to + */ void subscribe(const std::vector& topics); + + /** + * \brief Unsubscribes to the current subscription list + * + * This translates to a call to rd_kafka_unsubscribe + */ void unsubscribe(); + /** + * \brief Sets the current topic/partition assignment + * + * This translates into a call to rd_kafka_assign + */ void assign(const TopicPartitionList& topic_partitions); + + /** + * \brief Unassigns the current topic/partition assignment + * + * This translates into a call to rd_kafka_assign using a null as the topic partition list + * parameter + */ void unassign(); + + /** + * \brief Closes the consumer session + * + * This translates into a call to rd_kafka_consumer_close + */ void close(); + /** + * \brief Commits the given message synchronously + * + * This translates into a call to rd_kafka_commit_message + * + * \param msg The message to be committed + */ void commit(const Message& msg); + + /** + * \brief Commits the given message asynchronously + * + * This translates into a call to rd_kafka_commit_message + * + * \param msg The message to be committed + */ void async_commit(const Message& msg); + + /** + * \brief Commits the offsets on the given topic/partitions synchronously + * + * This translates into a call to rd_kafka_commit + * + * \param topic_partitions The topic/partition list to be committed + */ void commit(const TopicPartitionList& topic_partitions); + + /** + * \brief Commits the offsets on the given topic/partitions asynchronously + * + * This translates into a call to rd_kafka_commit + * + * \param topic_partitions The topic/partition list to be committed + */ void async_commit(const TopicPartitionList& topic_partitions); - OffsetTuple get_offsets(const std::string& topic, int partition) const; + /** + * \brief Gets the minimum and maximum offsets for the given topic/partition + * + * This translates into a call to rd_kafka_get_watermark_offsets + * + * \param topic_partition The topic/partition to get the offsets from + */ + OffsetTuple get_offsets(const TopicPartition& topic_partition) const; + /** + * \brief Gets the offsets committed for the given topic/partition list + * + * This translates into a call to rd_kafka_committed + * + * \param topic_partitions The topic/partition list to be queried + */ TopicPartitionList get_offsets_committed(const TopicPartitionList& topic_partitions) const; + + /** + * \brief Gets the offset positions for the given topic/partition list + * + * This translates into a call to rd_kafka_position + * + * \param topic_partitions The topic/partition list to be queried + */ TopicPartitionList get_offsets_position(const TopicPartitionList& topic_partitions) const; - TopicPartitionList get_subscription() const; + + /** + * \brief Gets the current topic subscription + * + * This translates to a call to rd_kafka_subscription + */ + std::vector get_subscription() const; + + /** + * \brief Gets the current topic/partition list assignment + * + * This translates to a call to rd_kafka_assignment + */ TopicPartitionList get_assignment() const; + + /** + * \brief Gets the group member id + * + * This translates to a call to rd_kafka_memberid + */ std::string get_member_id() const; + /** + * \brief Polls for new messages + * + * This will call rd_kafka_consumer_poll. + * + * Note that you need to call poll periodically as a keep alive mechanism, otherwise the broker + * will think this consumer is down and will trigger a rebalance (if using dynamic + * subscription). + * + * The returned message *might* be empty. If's necessary to check that it's a valid one before + * using it: + * + * \code + * Message msg = consumer.poll(); + * if (msg) { + * // It's a valid message! + * } + * \endcode + */ Message poll(); private: static void rebalance_proxy(rd_kafka_t *handle, rd_kafka_resp_err_t error, diff --git a/include/cppkafka/exceptions.h b/include/cppkafka/exceptions.h index 4305bc5..4b912d6 100644 --- a/include/cppkafka/exceptions.h +++ b/include/cppkafka/exceptions.h @@ -36,6 +36,9 @@ namespace cppkafka { +/** + * Base class for all cppkafka exceptions + */ class Exception : public std::exception { public: Exception(std::string message); @@ -45,16 +48,25 @@ private: std::string message_; }; +/** + * A configuration related error + */ class ConfigException : public Exception { public: ConfigException(const std::string& config_name, const std::string& error); }; +/** + * Indicates a configuration option was not set + */ class ConfigOptionNotFound : public Exception { public: ConfigOptionNotFound(const std::string& config_name); }; +/** + * A generic rdkafka handle error + */ class HandleException : public Exception { public: HandleException(rd_kafka_resp_err_t error_code); @@ -64,6 +76,9 @@ private: rd_kafka_resp_err_t error_code_; }; +/** + * An exception when using zookeeper + */ class ZookeeperException : public Exception { public: using Exception::Exception; diff --git a/include/cppkafka/kafka_handle_base.h b/include/cppkafka/kafka_handle_base.h index 26a4520..e45d348 100644 --- a/include/cppkafka/kafka_handle_base.h +++ b/include/cppkafka/kafka_handle_base.h @@ -44,13 +44,16 @@ #include "configuration.h" #include "config.h" #ifdef CPPKAFKA_HAVE_ZOOKEEPER - #include "zookeeper/zookeeper_subscriber.h" + #include "zookeeper/zookeeper_subscription.h" #endif // CPPKAFKA_HAVE_ZOOKEEPER namespace cppkafka { class Topic; +/** + * Base class for kafka consumer/producer + */ class KafkaHandleBase { public: using OffsetTuple = std::tuple; @@ -61,20 +64,100 @@ public: KafkaHandleBase& operator=(const KafkaHandleBase&) = delete; KafkaHandleBase& operator=(KafkaHandleBase&&) = delete; + /** + * \brief Pauses consumption/production from the given topic/partition list + * + * This translates into a call to rd_kafka_pause_partitions + * + * \param topic_partitions The topic/partition list to pause consuming/producing from/to + */ void pause_partitions(const TopicPartitionList& topic_partitions); + + /** + * \brief Resumes consumption/production from the given topic/partition list + * + * This translates into a call to rd_kafka_resume_partitions + * + * \param topic_partitions The topic/partition list to resume consuming/producing from/to + */ void resume_partitions(const TopicPartitionList& topic_partitions); + /** + * \brief Sets the timeout for operations that require a timeout + * + * This timeout is applied to operations like polling, querying for offsets, etc + * + * \param timeout The timeout to be set + */ void set_timeout(const std::chrono::milliseconds& timeout); - OffsetTuple query_offsets(const std::string& topic, int partition) const; + /** + * \brief Queries the offset for the given topic/partition + * + * This translates into a call to rd_kafka_query_watermark_offsets + * + * \param topic_partition The topic/partition to be queried + */ + OffsetTuple query_offsets(const TopicPartition& topic_partition) const; + /** + * Gets the rdkafka handle + */ rd_kafka_t* get_handle() const; + + /** + * \brief Creates a topic handle + * + * This translates into a call to rd_kafka_topic_new. This will use the default topic + * configuration provided in the Configuration object for this consumer/producer handle, + * if any. + * + * \param name The name of the topic to be created + */ Topic get_topic(const std::string& name); + + /** + * \brief Creates a topic handle + * + * This translates into a call to rd_kafka_topic_new. + * + * \param name The name of the topic to be created + * \param config The configuration to be used for the new topic + */ Topic get_topic(const std::string& name, TopicConfiguration config); + + /** + * \brief Gets metadata for brokers, topics, partitions, etc + * + * This translates into a call to rd_kafka_metadata + */ Metadata get_metadata() const; + + /** + * \brief Gets general metadata but only fetches metadata for the given topic rather than + * all of them + * + * This translates into a call to rd_kafka_metadata + * + * \param topic The topic to fetch information for + */ Metadata get_metadata(const Topic& topic) const; + + /** + * Returns the kafka handle name + */ std::string get_name() const; + + /** + * Gets the configured timeout. + * + * \sa KafkaHandleBase::set_timeout + */ std::chrono::milliseconds get_timeout() const; + + /** + * Gets the handle's configuration + */ const Configuration& get_configuration() const; protected: KafkaHandleBase(Configuration config); @@ -99,7 +182,7 @@ private: std::mutex topic_configurations_mutex_; #ifdef CPPKAFKA_HAVE_ZOOKEEPER // This could be an optional but apparently move construction is only supported as of 1.56 - std::unique_ptr zookeeper_subscriber_; + std::unique_ptr zookeeper_subscription_; #endif // CPPKAFKA_HAVE_ZOOKEEPER }; diff --git a/include/cppkafka/zookeeper/zookeeper_pool.h b/include/cppkafka/zookeeper/zookeeper_pool.h index e2a6d9a..37fb0d4 100644 --- a/include/cppkafka/zookeeper/zookeeper_pool.h +++ b/include/cppkafka/zookeeper/zookeeper_pool.h @@ -38,18 +38,52 @@ namespace cppkafka { -class ZookeeperSubscriber; +class ZookeeperSubscription; +/** + * \brief Pool of zookeeper handles + * + * This class is used internally by cppkafka. + */ class ZookeeperPool { public: + /** + * Get singleton instance + */ static ZookeeperPool& instance(); - ZookeeperSubscriber subscribe(const std::string& endpoint, - std::chrono::milliseconds receive_timeout, - ZookeeperWatcher::WatcherCallback callback); - void unsubscribe(const ZookeeperSubscriber& subscriber); + /** + * Subscribe to the given endpoint + * + * \param endpoint The zookeeper endpoint to subscribe to + * \param receive_timeout The zookeeper receive timeout + * \param callback The callback to be executed on updates + * + * \return A ZookeeperSubscription that will auto-unsubscribe upon destruction + */ + ZookeeperSubscription subscribe(const std::string& endpoint, + std::chrono::milliseconds receive_timeout, + ZookeeperWatcher::WatcherCallback callback); + + /** + * Unsubscribes from a previous subscription + * + * \param subscriber The subscriber return by a previous call to ZookeeperPool::subscribe + */ + void unsubscribe(const ZookeeperSubscription& subscriber); + + /** + * \brief Gets the broker list for the given zookeeper endpoint + * + * Requires having previously called subscribe for this endpoint at least once. + * + * \param endpoint The endpoint for which to get the broker list + */ std::string get_brokers(const std::string& endpoint); + /** + * Gets the amount of subscribers for the given zookeeper endpoint + */ size_t get_subscriber_count(const std::string& endpoint) const; private: using WatchersMap = std::map; diff --git a/include/cppkafka/zookeeper/zookeeper_subscriber.h b/include/cppkafka/zookeeper/zookeeper_subscription.h similarity index 73% rename from include/cppkafka/zookeeper/zookeeper_subscriber.h rename to include/cppkafka/zookeeper/zookeeper_subscription.h index 8cc82be..c4df39f 100644 --- a/include/cppkafka/zookeeper/zookeeper_subscriber.h +++ b/include/cppkafka/zookeeper/zookeeper_subscription.h @@ -27,29 +27,37 @@ * */ -#ifndef CPPKAFKA_ZOOKEEPER_SUBSCRIBER_H -#define CPPKAFKA_ZOOKEEPER_SUBSCRIBER_H +#ifndef CPPKAFKA_ZOOKEEPER_SUBSCRIPTION_H +#define CPPKAFKA_ZOOKEEPER_SUBSCRIPTION_H #include namespace cppkafka { -class ZookeeperSubscriber { +/** + * \cond + */ +class ZookeeperSubscription { public: - ZookeeperSubscriber(std::string endpoint, std::string subscription_id); - ZookeeperSubscriber(ZookeeperSubscriber&&) = default; - ZookeeperSubscriber(const ZookeeperSubscriber&) = delete; - ZookeeperSubscriber& operator=(ZookeeperSubscriber&&); - ZookeeperSubscriber& operator=(const ZookeeperSubscriber&) = delete; - ~ZookeeperSubscriber(); + ZookeeperSubscription(std::string endpoint, std::string subscription_id); + ZookeeperSubscription(ZookeeperSubscription&&) = default; + ZookeeperSubscription(const ZookeeperSubscription&) = delete; + ZookeeperSubscription& operator=(ZookeeperSubscription&&); + ZookeeperSubscription& operator=(const ZookeeperSubscription&) = delete; + ~ZookeeperSubscription(); const std::string& get_endpoint() const; + const std::string& get_subscription_id() const; private: std::string endpoint_; std::string subscription_id_; }; +/** + * \endcond + */ + } // cppkafka -#endif // CPPKAFKA_ZOOKEEPER_SUBSCRIBER_H +#endif // CPPKAFKA_ZOOKEEPER_SUBSCRIPTION_H diff --git a/include/cppkafka/zookeeper/zookeeper_watcher.h b/include/cppkafka/zookeeper/zookeeper_watcher.h index 94c8c79..8a722d8 100644 --- a/include/cppkafka/zookeeper/zookeeper_watcher.h +++ b/include/cppkafka/zookeeper/zookeeper_watcher.h @@ -40,6 +40,9 @@ namespace cppkafka { +/** + * \cond + */ class ZookeeperWatcher { public: static const std::chrono::milliseconds DEFAULT_RECEIVE_TIMEOUT; @@ -73,6 +76,10 @@ private: size_t id_counter_{0}; }; +/** + * \endcond + */ + } // cppkafka #endif // CPPKAFKA_ZOOKEEPER_WATCHER_H diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 9b73d19..0e4ed79 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -20,7 +20,7 @@ if (ENABLE_ZOOKEEPER) set(ZOOKEEPER_SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/zookeeper/zookeeper_watcher.cpp ${CMAKE_CURRENT_SOURCE_DIR}/zookeeper/zookeeper_pool.cpp - ${CMAKE_CURRENT_SOURCE_DIR}/zookeeper/zookeeper_subscriber.cpp) + ${CMAKE_CURRENT_SOURCE_DIR}/zookeeper/zookeeper_subscription.cpp) # Add json library as include dir (header only) include_directories(SYSTEM ${CMAKE_CURRENT_SOURCE_DIR}/../third_party/json) set(SOURCES ${SOURCES} ${ZOOKEEPER_SOURCES}) diff --git a/src/consumer.cpp b/src/consumer.cpp index dc7adef..a765116 100644 --- a/src/consumer.cpp +++ b/src/consumer.cpp @@ -126,9 +126,11 @@ void Consumer::async_commit(const TopicPartitionList& topic_partitions) { commit(topic_partitions, true); } -KafkaHandleBase::OffsetTuple Consumer::get_offsets(const string& topic, int partition) const { +KafkaHandleBase::OffsetTuple Consumer::get_offsets(const TopicPartition& topic_partition) const { int64_t low; int64_t high; + const string& topic = topic_partition.get_topic(); + const int partition = topic_partition.get_partition(); rd_kafka_resp_err_t result = rd_kafka_get_watermark_offsets(get_handle(), topic.data(), partition, &low, &high); check_error(result); @@ -152,12 +154,18 @@ Consumer::get_offsets_position(const TopicPartitionList& topic_partitions) const return convert(topic_list_handle); } -TopicPartitionList Consumer::get_subscription() const { +vector Consumer::get_subscription() const { rd_kafka_resp_err_t error; rd_kafka_topic_partition_list_t* list = nullptr; error = rd_kafka_subscription(get_handle(), &list); check_error(error); - return convert(make_handle(list)); + + auto handle = make_handle(list); + vector output; + for (const auto& topic_partition : convert(handle)) { + output.push_back(topic_partition.get_topic()); + } + return output; } TopicPartitionList Consumer::get_assignment() const { diff --git a/src/kafka_handle_base.cpp b/src/kafka_handle_base.cpp index d327ca0..234dfc8 100644 --- a/src/kafka_handle_base.cpp +++ b/src/kafka_handle_base.cpp @@ -91,10 +91,12 @@ Topic KafkaHandleBase::get_topic(const string& name, TopicConfiguration config) return get_topic(name, rd_kafka_topic_conf_dup(handle)); } -KafkaHandleBase::OffsetTuple KafkaHandleBase::query_offsets(const string& topic, - int partition) const { +KafkaHandleBase::OffsetTuple +KafkaHandleBase::query_offsets(const TopicPartition& topic_partition) const { int64_t low; int64_t high; + const string& topic = topic_partition.get_topic(); + const int partition = topic_partition.get_partition(); rd_kafka_resp_err_t result = rd_kafka_query_watermark_offsets(handle_.get(), topic.data(), partition, &low, &high, timeout_ms_.count()); @@ -144,8 +146,8 @@ void KafkaHandleBase::set_handle(rd_kafka_t* handle) { rd_kafka_brokers_add(handle_.get(), brokers.data()); rd_kafka_poll(handle_.get(), 10); }; - ZookeeperSubscriber subscriber = pool.subscribe(endpoint, timeout, callback); - zookeeper_subscriber_.reset(new ZookeeperSubscriber(move(subscriber))); + ZookeeperSubscription subscriber = pool.subscribe(endpoint, timeout, callback); + zookeeper_subscription_.reset(new ZookeeperSubscription(move(subscriber))); callback(pool.get_brokers(endpoint)); } diff --git a/src/zookeeper/zookeeper_pool.cpp b/src/zookeeper/zookeeper_pool.cpp index ccde30c..9aa4174 100644 --- a/src/zookeeper/zookeeper_pool.cpp +++ b/src/zookeeper/zookeeper_pool.cpp @@ -28,7 +28,7 @@ */ #include "zookeeper/zookeeper_pool.h" -#include "zookeeper/zookeeper_subscriber.h" +#include "zookeeper/zookeeper_subscription.h" #include "exceptions.h" using std::string; @@ -45,7 +45,7 @@ ZookeeperPool& ZookeeperPool::instance() { return the_instance; } -ZookeeperSubscriber ZookeeperPool::subscribe(const string& endpoint, +ZookeeperSubscription ZookeeperPool::subscribe(const string& endpoint, milliseconds receive_timeout, ZookeeperWatcher::WatcherCallback callback) { lock_guard _(watchers_mutex_); @@ -55,10 +55,10 @@ ZookeeperSubscriber ZookeeperPool::subscribe(const string& endpoint, forward_as_tuple(endpoint, receive_timeout)).first; } string id = iter->second.subscribe(move(callback)); - return ZookeeperSubscriber(endpoint, id); + return ZookeeperSubscription(endpoint, id); } -void ZookeeperPool::unsubscribe(const ZookeeperSubscriber& subscriber) { +void ZookeeperPool::unsubscribe(const ZookeeperSubscription& subscriber) { lock_guard _(watchers_mutex_); auto iter = watchers_.find(subscriber.get_endpoint()); if (iter != watchers_.end()) { diff --git a/src/zookeeper/zookeeper_subscriber.cpp b/src/zookeeper/zookeeper_subscriber.cpp deleted file mode 100644 index 275880b..0000000 --- a/src/zookeeper/zookeeper_subscriber.cpp +++ /dev/null @@ -1,25 +0,0 @@ -#include "zookeeper/zookeeper_subscriber.h" -#include "zookeeper/zookeeper_pool.h" - -using std::string; - -namespace cppkafka { - -ZookeeperSubscriber::ZookeeperSubscriber(string endpoint, string subscription_id) -: endpoint_(move(endpoint)), subscription_id_(move(subscription_id)) { - -} - -ZookeeperSubscriber::~ZookeeperSubscriber() { - ZookeeperPool::instance().unsubscribe(*this); -} - -const string& ZookeeperSubscriber::get_endpoint() const { - return endpoint_; -} - -const string& ZookeeperSubscriber::get_subscription_id() const { - return subscription_id_; -} - -} // cppkafka diff --git a/src/zookeeper/zookeeper_subscription.cpp b/src/zookeeper/zookeeper_subscription.cpp new file mode 100644 index 0000000..68fee1a --- /dev/null +++ b/src/zookeeper/zookeeper_subscription.cpp @@ -0,0 +1,25 @@ +#include "zookeeper/zookeeper_subscription.h" +#include "zookeeper/zookeeper_pool.h" + +using std::string; + +namespace cppkafka { + +ZookeeperSubscription::ZookeeperSubscription(string endpoint, string subscription_id) +: endpoint_(move(endpoint)), subscription_id_(move(subscription_id)) { + +} + +ZookeeperSubscription::~ZookeeperSubscription() { + ZookeeperPool::instance().unsubscribe(*this); +} + +const string& ZookeeperSubscription::get_endpoint() const { + return endpoint_; +} + +const string& ZookeeperSubscription::get_subscription_id() const { + return subscription_id_; +} + +} // cppkafka diff --git a/tests/consumer_test.cpp b/tests/consumer_test.cpp index 2d205aa..f5a6f59 100644 --- a/tests/consumer_test.cpp +++ b/tests/consumer_test.cpp @@ -130,12 +130,14 @@ TEST_F(ConsumerTest, AssignmentCallback) { } EXPECT_EQ(1, runner.get_messages().size()); + EXPECT_EQ(vector{ KAFKA_TOPIC }, consumer.get_subscription()); + assignment = consumer.get_assignment(); EXPECT_EQ(3, assignment.size()); int64_t low; int64_t high; - tie(low, high) = consumer.get_offsets(KAFKA_TOPIC, partition); + tie(low, high) = consumer.get_offsets({ KAFKA_TOPIC, partition }); EXPECT_GT(high, low); EXPECT_EQ(high, runner.get_messages().back().get_offset() + 1); } diff --git a/tests/producer_test.cpp b/tests/producer_test.cpp index 6f1e633..b9abc03 100644 --- a/tests/producer_test.cpp +++ b/tests/producer_test.cpp @@ -128,7 +128,7 @@ TEST_F(ProducerTest, OneMessageOnFixedPartition) { int64_t low; int64_t high; - tie(low, high) = producer.query_offsets(KAFKA_TOPIC, partition); + tie(low, high) = producer.query_offsets({ KAFKA_TOPIC, partition }); EXPECT_GT(high, low); }