diff --git a/include/cppkafka/consumer.h b/include/cppkafka/consumer.h index ad5bece..8f49b6d 100644 --- a/include/cppkafka/consumer.h +++ b/include/cppkafka/consumer.h @@ -35,7 +35,7 @@ #include #include #include "kafka_handle_base.h" -#include "message.h" +#include "queue.h" #include "macros.h" #include "error.h" @@ -54,7 +54,7 @@ class TopicConfiguration; * Semi-simple code showing how to use this class * * \code - * // Create a configuration and set the group.id and broker list fields + * // Create a configuration and set the group.id and broker list fields * Configuration config = { * { "metadata.broker.list", "127.0.0.1:9092" }, * { "group.id", "foo" } @@ -74,13 +74,13 @@ class TopicConfiguration; * consumer.set_revocation_callback([&](const TopicPartitionList& topic_partitions) { * cout << topic_partitions.size() << " partitions revoked!" << endl; * }); - * - * // Subscribe + * + * // Subscribe * consumer.subscribe({ "my_topic" }); * while (true) { * // Poll. This will optionally return a message. It's necessary to check if it's a valid * // one before using it - * Message msg = consumer.poll(); + * Message msg = consumer.poll(); * if (msg) { * if (!msg.get_error()) { * // It's an actual message. Get the payload and print it to stdout @@ -103,12 +103,12 @@ public: /** * \brief Creates an instance of a consumer. - * - * Note that the configuration *must contain* the group.id attribute set or this + * + * Note that the configuration *must contain* the group.id attribute set or this * will throw. * * \param config The configuration to be used - */ + */ Consumer(Configuration config); Consumer(const Consumer&) = delete; Consumer(Consumer&&) = delete; @@ -124,7 +124,7 @@ public: /** * \brief Sets the topic/partition assignment callback - * + * * The Consumer class will use rd_kafka_conf_set_rebalance_cb and will handle the * rebalance, converting from rdkafka topic partition list handles into TopicPartitionList * and executing the assignment/revocation/rebalance_error callbacks. @@ -138,7 +138,7 @@ public: /** * \brief Sets the topic/partition revocation callback - * + * * The Consumer class will use rd_kafka_conf_set_rebalance_cb and will handle the * rebalance, converting from rdkafka topic partition list handles into TopicPartitionList * and executing the assignment/revocation/rebalance_error callbacks. @@ -153,7 +153,7 @@ public: /** * \brief Sets the rebalance error callback - * + * * The Consumer class will use rd_kafka_conf_set_rebalance_cb and will handle the * rebalance, converting from rdkafka topic partition list handles into TopicPartitionList * and executing the assignment/revocation/rebalance_error callbacks. 
@@ -188,9 +188,9 @@ public: /** * \brief Unassigns the current topic/partition assignment * - * This translates into a call to rd_kafka_assign using a null as the topic partition list + * This translates into a call to rd_kafka_assign using a null as the topic partition list * parameter - */ + */ void unassign(); /** @@ -262,7 +262,9 @@ public: * * This translates into a call to rd_kafka_get_watermark_offsets * - * \param topic_partition The topic/partition to get the offsets from + * \param topic_partition The topic/partition to get the offsets from + * + * \return A pair of offsets {low, high} */ OffsetTuple get_offsets(const TopicPartition& topic_partition) const; @@ -272,6 +274,8 @@ public: * This translates into a call to rd_kafka_committed * * \param topic_partitions The topic/partition list to be queried + * + * \return The topic partition list */ TopicPartitionList get_offsets_committed(const TopicPartitionList& topic_partitions) const; @@ -281,6 +285,8 @@ public: * This translates into a call to rd_kafka_position * * \param topic_partitions The topic/partition list to be queried + * + * \return The topic partition list */ TopicPartitionList get_offsets_position(const TopicPartitionList& topic_partitions) const; @@ -295,6 +301,8 @@ public: * \brief Gets the current topic/partition list assignment * * This translates to a call to rd_kafka_assignment + * + * \return The topic partition list */ TopicPartitionList get_assignment() const; @@ -302,21 +310,29 @@ public: * \brief Gets the group member id * * This translates to a call to rd_kafka_memberid + * + * \return The id */ std::string get_member_id() const; /** - * Gets the partition assignment callback. + * \brief Gets the partition assignment callback. + * + * \return The callback reference */ const AssignmentCallback& get_assignment_callback() const; /** - * Gets the partition revocation callback. + * \brief Gets the partition revocation callback. + * + * \return The callback reference */ const RevocationCallback& get_revocation_callback() const; /** - * Gets the rebalance error callback. + * \brief Gets the rebalance error callback. + * + * \return The callback reference */ const RebalanceErrorCallback& get_rebalance_error_callback() const; @@ -326,16 +342,16 @@ public: * This will call rd_kafka_consumer_poll. * * Note that you need to call poll periodically as a keep alive mechanism, otherwise the broker - * will think this consumer is down and will trigger a rebalance (if using dynamic + * will think this consumer is down and will trigger a rebalance (if using dynamic * subscription). * * The timeout used on this call will be the one configured via Consumer::set_timeout. * - * The returned message *might* be empty. If's necessary to check that it's a valid one before - * using it: - * + * \return A message. The returned message *might* be empty. It's necessary to check + * that it's a valid one before using it: + * * \code - * Message msg = consumer.poll(); + * Message msg = consumer.poll(); * if (msg) { * // It's a valid message! * } @@ -350,6 +366,8 @@ public: * instead of the one configured on this Consumer.
* * \param timeout The timeout to be used on this call + * + * \return A message */ Message poll(std::chrono::milliseconds timeout); @@ -359,8 +377,10 @@ public: * This can return one or more messages * * \param max_batch_size The maximum amount of messages expected + * + * \return A list of messages */ - std::vector<Message> poll_batch(size_t max_batch_size); + MessageList poll_batch(size_t max_batch_size); /** * \brief Polls for a batch of messages @@ -369,8 +389,42 @@ public: * * \param max_batch_size The maximum amount of messages expected * \param timeout The timeout for this operation + * + * \return A list of messages */ - std::vector<Message> poll_batch(size_t max_batch_size, std::chrono::milliseconds timeout); + MessageList poll_batch(size_t max_batch_size, std::chrono::milliseconds timeout); + + /** + * \brief Get the global event queue servicing this consumer, corresponding to + * rd_kafka_queue_get_main and polled via rd_kafka_poll + * + * \return A Queue object + * + * \remark Note that this call will disable forwarding to the consumer_queue. + * To restore forwarding (if desired) call Queue::forward_to_queue(consumer_queue) + */ + Queue get_main_queue() const; + + /** + * \brief Get the consumer group queue, corresponding to + * rd_kafka_queue_get_consumer and polled via rd_kafka_consumer_poll + * + * \return A Queue object + */ + Queue get_consumer_queue() const; + + /** + * \brief Get the queue belonging to this partition. If the consumer is not assigned to this + * partition, an empty queue will be returned + * + * \param partition The partition object + * + * \return A Queue object + * + * \remark Note that this call will disable forwarding to the consumer_queue. + * To restore forwarding (if desired) call Queue::forward_to_queue(consumer_queue) + */ + Queue get_partition_queue(const TopicPartition& partition) const; private: static void rebalance_proxy(rd_kafka_t *handle, rd_kafka_resp_err_t error, rd_kafka_topic_partition_list_t *partitions, void *opaque); diff --git a/include/cppkafka/cppkafka.h b/include/cppkafka/cppkafka.h index 793a4aa..3bf4110 100644 --- a/include/cppkafka/cppkafka.h +++ b/include/cppkafka/cppkafka.h @@ -46,6 +46,7 @@ #include #include #include +#include #include #include #include @@ -55,5 +56,6 @@ #include #include #include +#include #endif diff --git a/include/cppkafka/exceptions.h b/include/cppkafka/exceptions.h index 323c3e5..8bfd801 100644 --- a/include/cppkafka/exceptions.h +++ b/include/cppkafka/exceptions.h @@ -122,6 +122,18 @@ private: Error error_; }; +/** + * Queue exception for rd_kafka_queue_t errors + */ +class CPPKAFKA_API QueueException : public Exception { +public: + QueueException(Error error); + + Error get_error() const; +private: + Error error_; +}; + } // cppkafka #endif // CPPKAFKA_EXCEPTIONS_H diff --git a/include/cppkafka/group_information.h b/include/cppkafka/group_information.h index 483568f..41d3cda 100644 --- a/include/cppkafka/group_information.h +++ b/include/cppkafka/group_information.h @@ -136,6 +136,8 @@ private: std::vector members_; }; +using GroupInformationList = std::vector<GroupInformation>; + } // cppkafka #endif // CPPKAFKA_GROUP_INFORMATION_H diff --git a/include/cppkafka/kafka_handle_base.h b/include/cppkafka/kafka_handle_base.h index 68dd61b..9ebcff3 100644 --- a/include/cppkafka/kafka_handle_base.h +++ b/include/cppkafka/kafka_handle_base.h @@ -39,6 +39,7 @@ #include #include #include +#include "group_information.h" #include "topic_partition.h" #include "topic_partition_list.h" #include "topic_configuration.h" @@
-108,11 +109,15 @@ public: * This translates into a call to rd_kafka_query_watermark_offsets * * \param topic_partition The topic/partition to be queried + * + * \return A pair of watermark offsets {low, high} */ OffsetTuple query_offsets(const TopicPartition& topic_partition) const; /** - * Gets the rdkafka handle + * \brief Gets the rdkafka handle + * + * \return The rdkafka handle */ rd_kafka_t* get_handle() const; @@ -123,7 +128,9 @@ public: * configuration provided in the Configuration object for this consumer/producer handle, * if any. * - * \param name The name of the topic to be created + * \param name The name of the topic to be created + * + * \return A topic */ Topic get_topic(const std::string& name); @@ -134,15 +141,19 @@ public: * * \param name The name of the topic to be created * \param config The configuration to be used for the new topic + * + * \return A topic */ Topic get_topic(const std::string& name, TopicConfiguration config); /** * \brief Gets metadata for brokers, topics, partitions, etc * + * This translates into a call to rd_kafka_metadata + * * \param all_topics Whether to fetch metadata about all topics or only locally known ones * - * This translates into a call to rd_kafka_metadata + * \return The metadata */ Metadata get_metadata(bool all_topics = true) const; @@ -153,20 +164,26 @@ public: * This translates into a call to rd_kafka_metadata * * \param topic The topic to fetch information for + * + * \return The topic metadata */ TopicMetadata get_metadata(const Topic& topic) const; /** - * Gets the consumer group information + * \brief Gets the consumer group information * * \param name The name of the consumer group to look up + * + * \return The group information */ GroupInformation get_consumer_group(const std::string& name); /** - * Gets all consumer groups + * \brief Gets all consumer groups + * + * \return A list of consumer groups */ - std::vector get_consumer_groups(); + GroupInformationList get_consumer_groups(); /** * \brief Gets topic/partition offsets based on timestamps @@ -174,23 +191,31 @@ public: * This translates into a call to rd_kafka_offsets_for_times * * \param queries A map from topic/partition to the timestamp to be used + * + * \return A topic partition list */ TopicPartitionList get_offsets_for_times(const TopicPartitionsTimestampsMap& queries) const; /** - * Returns the kafka handle name + * \brief Get the kafka handle name + * + * \return The handle name */ std::string get_name() const; /** - * Gets the configured timeout. + * \brief Gets the configured timeout. 
+ * + * \return The configured timeout * * \sa KafkaHandleBase::set_timeout */ std::chrono::milliseconds get_timeout() const; /** - * Gets the handle's configuration + * \brief Gets the handle's configuration + * + * \return A reference to the configuration object */ const Configuration& get_configuration() const; @@ -198,6 +223,8 @@ public: * \brief Gets the length of the out queue * * This calls rd_kafka_outq_len + * + * \return The length of the queue */ int get_out_queue_length() const; @@ -221,7 +248,7 @@ private: Topic get_topic(const std::string& name, rd_kafka_topic_conf_t* conf); Metadata get_metadata(bool all_topics, rd_kafka_topic_t* topic_ptr) const; - std::vector fetch_consumer_groups(const char* name); + GroupInformationList fetch_consumer_groups(const char* name); void save_topic_config(const std::string& topic_name, TopicConfiguration config); std::chrono::milliseconds timeout_ms_; diff --git a/include/cppkafka/message.h b/include/cppkafka/message.h index 5866124..a15f947 100644 --- a/include/cppkafka/message.h +++ b/include/cppkafka/message.h @@ -177,6 +177,8 @@ private: Buffer key_; }; +using MessageList = std::vector; + /** * Represents a message's timestamp */ diff --git a/include/cppkafka/queue.h b/include/cppkafka/queue.h new file mode 100644 index 0000000..bb31dcb --- /dev/null +++ b/include/cppkafka/queue.h @@ -0,0 +1,183 @@ +/* + * Copyright (c) 2017, Matias Fontanini + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
+ * + */ + +#include +#include +#include +#include +#include "macros.h" +#include "message.h" + +#ifndef CPPKAFKA_QUEUE_H +#define CPPKAFKA_QUEUE_H + +namespace cppkafka { +/** + * \brief Represents a rdkafka queue + * + * This is a simple wrapper over a rd_kafka_queue_t* + */ +class CPPKAFKA_API Queue { +public: + /** + * \brief Creates a Queue object that doesn't take ownership of the handle + * + * \param handle The handle to be used + */ + static Queue make_non_owning(rd_kafka_queue_t* handle); + + /** + * \brief Constructs an empty queue + * + * Note that using any methods except Queue::get_handle on an empty queue is undefined + * behavior + */ + Queue(); + + /** + * \brief Constructs a queue using a handle + * + * This will take ownership of the handle + * + * \param handle The handle to be used + */ + Queue(rd_kafka_queue_t* handle); + + /** + * Returns the rdkafka handle + */ + rd_kafka_queue_t* get_handle() const; + + /** + * \brief Returns the length of the queue + * + * This translates to a call to rd_kafka_queue_length + */ + size_t get_length() const; + + /** + * \brief Forward to another queue + * + * This translates to a call to rd_kafka_queue_forward + */ + void forward_to_queue(const Queue& forward_queue) const; + + /** + * \brief Disable forwarding to another queue + * + * This translates to a call to rd_kafka_queue_forward(NULL) + */ + void disable_queue_forwarding() const; + + /** + * \brief Sets the timeout for consume operations + * + * This timeout is applied when calling consume() + * + * \param timeout The timeout to be set + */ + void set_consume_timeout(std::chrono::milliseconds timeout); + + /** + * Gets the configured consume timeout. + * + * \sa Queue::set_consume_timeout + */ + std::chrono::milliseconds get_consume_timeout() const; + + /** + * \brief Consume a message from this queue + * + * This translates to a call to rd_kafka_consume_queue using the configured timeout for this object + * + * \return A message + */ + Message consume() const; + + /** + * \brief Consume a message from this queue + * + * Same as consume() but the specified timeout will be used instead of the configured one + * + * \param timeout The timeout to be used on this call + * + * \return A message + */ + Message consume(std::chrono::milliseconds timeout) const; + + /** + * \brief Consumes a batch of messages from this queue + * + * This translates to a call to rd_kafka_consume_batch_queue using the configured timeout for this object + * + * \param max_batch_size The max number of messages to consume if available + * + * \return A list of messages. Could be empty if there's nothing to consume + */ + MessageList consume_batch(size_t max_batch_size) const; + + /** + * \brief Consumes a batch of messages from this queue + * + * Same as Queue::consume_batch(size_t) but the specified timeout will be used instead of the configured one + * + * \param max_batch_size The max number of messages to consume if available + * + * \param timeout The timeout to be used on this call + * + * \return A list of messages.
Could be empty if there's nothing to consume + */ + MessageList consume_batch(size_t max_batch_size, std::chrono::milliseconds timeout) const; + + /** + * Indicates whether this queue is valid (not null) + */ + explicit operator bool() const { + return handle_ != nullptr; + } + +private: + static const std::chrono::milliseconds DEFAULT_TIMEOUT; + + using HandlePtr = std::unique_ptr<rd_kafka_queue_t, decltype(&rd_kafka_queue_destroy)>; + + struct NonOwningTag { }; + + Queue(rd_kafka_queue_t* handle, NonOwningTag); + + // Members + HandlePtr handle_; + std::chrono::milliseconds timeout_ms_; +}; + +using QueueList = std::vector<Queue>; + +} // cppkafka + +#endif //CPPKAFKA_QUEUE_H diff --git a/include/cppkafka/utils/roundrobin_poll_adapter.h b/include/cppkafka/utils/roundrobin_poll_adapter.h new file mode 100644 index 0000000..dee42b1 --- /dev/null +++ b/include/cppkafka/utils/roundrobin_poll_adapter.h @@ -0,0 +1,213 @@ +/* + * Copyright (c) 2017, Matias Fontanini + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef CPPKAFKA_ROUNDROBIN_POLL_ADAPTER_H +#define CPPKAFKA_ROUNDROBIN_POLL_ADAPTER_H + +#include +#include "../exceptions.h" +#include "../consumer.h" +#include "../queue.h" + +namespace cppkafka { + +/** + * \brief This adapter changes the default polling strategy of the Consumer into a fair round-robin + * polling mechanism. + * + * The default librdkafka (and cppkafka) poll() and poll_batch() behavior is to consume batches of + * messages from each partition in turn. For performance reasons, librdkafka pre-fetches batches + * of messages from the kafka broker (one batch from each partition), and stores them locally in + * partition queues. Since all the internal partition queues are forwarded by default onto the + * group consumer queue (one per consumer), these batches end up being queued in sequence of arrival. + * For instance, a topic with 4 partitions (each containing N messages) will end up being queued as + * N1|N2|N3|N4 in the consumer queue. This means that for the Consumer to process messages from the + * 4th partition, it needs to consume 3xN messages. The larger the number of partitions, the more + * starvation occurs.
While this behavior is acceptable for some applications, the default librdkafka behavior is + * unsuitable for real-time applications sensitive to timing and for those where messages must be + * processed more or less in the same order as they're being produced. + * Fortunately, librdkafka exposes direct access to its partition queues, which means that various + * polling strategies can be implemented to suit application needs. + * This adapter allows fair round-robin polling of all assigned partitions, one message at a time + * (or one batch at a time if poll_batch() is used). Note that poll_batch() has nothing to do with + * the internal batching mechanism of librdkafka. + * + * Example code on how to use this: + * + * \code + * // Create a consumer + * Consumer consumer(...); + * + * // Optionally set the callbacks. This must be done *BEFORE* creating the adapter + * consumer.set_assignment_callback(...); + * consumer.set_revocation_callback(...); + * consumer.set_rebalance_error_callback(...); + * + * // Create the adapter and use it for polling + * RoundRobinPollAdapter adapter(consumer); + * + * // Subscribe *AFTER* the adapter has been created + * consumer.subscribe({ "my_topic" }); + * + * while (true) { + * // Poll each partition in turn + * Message msg = adapter.poll(); + * if (msg) { + * // process valid message + * } + * } + * \endcode + * + * \warning Calling poll() or poll_batch() directly on the Consumer object while using this adapter will + * lead to undesired results, since the RoundRobinPollAdapter modifies the internal queuing mechanism of + * the Consumer instance it wraps. + */ +class RoundRobinPollAdapter +{ +public: + RoundRobinPollAdapter(Consumer& consumer); + + ~RoundRobinPollAdapter(); + + /** + * \brief Sets the timeout for polling functions + * + * This calls Consumer::set_timeout + * + * \param timeout The timeout to be set + */ + void set_timeout(std::chrono::milliseconds timeout); + + /** + * \brief Gets the timeout for polling functions + * + * This calls Consumer::get_timeout + * + * \return The timeout + */ + std::chrono::milliseconds get_timeout(); + + /** + * \brief Polls all assigned partitions for new messages in round-robin fashion + * + * Each call to poll() will result in another partition being polled. Aside from + * the partition, this function will also poll the main queue for events. If an + * event is found, it is immediately returned. As such, the main queue has higher + * priority than the partition queues. Because of this, you + * need to call poll periodically as a keep alive mechanism, otherwise the broker + * will think this consumer is down and will trigger a rebalance (if using dynamic + * subscription). + * The timeout used on this call will be the one configured via RoundRobinPollAdapter::set_timeout. + * + * \return A message. The returned message *might* be empty. It's necessary to check + * that it's a valid one before using it (see example above). + */ + Message poll(); + + /** + * \brief Polls for new messages + * + * Same as the other overload of RoundRobinPollAdapter::poll but the provided + * timeout will be used instead of the one configured on this Consumer. + * + * \param timeout The timeout to be used on this call + */ + Message poll(std::chrono::milliseconds timeout); + + /** + * \brief Polls all assigned partitions for a batch of new messages in round-robin fashion + * + * Each call to poll() will result in another partition being polled. Aside from + * the partition, this function will also poll the main queue for events.
If a batch of + * events is found, it is prepended to the returned message list. If after polling the + * main queue the batch size has reached max_batch_size, it is immediately returned and + * the partition is no longer polled. Otherwise the partition is polled for the remaining + * messages up to the max_batch_size limit. + * Because of this, you need to call poll periodically as a keep alive mechanism, + * otherwise the broker will think this consumer is down and will trigger a rebalance + * (if using dynamic subscription). + * + * \param max_batch_size The maximum amount of messages expected + * + * \return A list of messages + */ + MessageList poll_batch(size_t max_batch_size); + + /** + * \brief Polls for a batch of messages in round-robin fashion + * + * Same as the other overload of RoundRobinPollAdapter::poll_batch but the provided + * timeout will be used instead of the one configured on this Consumer. + * + * \param max_batch_size The maximum amount of messages expected + * \param timeout The timeout for this operation + * + * \return A list of messages + */ + MessageList poll_batch(size_t max_batch_size, std::chrono::milliseconds timeout); + +private: + class CircularBuffer { + using qlist = std::list<Queue>; + using qiter = qlist::iterator; + public: + qlist& ref() { return queues_; } + Queue& next() { + if (queues_.empty()) { + throw QueueException(RD_KAFKA_RESP_ERR__STATE); + } + if (++iter_ == queues_.end()) { + iter_ = queues_.begin(); + } + return *iter_; + } + void rewind() { iter_ = queues_.begin(); } + private: + qlist queues_; + qiter iter_ = queues_.begin(); + }; + + void on_assignment(TopicPartitionList& partitions); + void on_revocation(const TopicPartitionList& partitions); + void on_rebalance_error(Error error); + void restore_forwarding(); + + // Members + Consumer& consumer_; + Consumer::AssignmentCallback assignment_callback_; + Consumer::RevocationCallback revocation_callback_; + Consumer::RebalanceErrorCallback rebalance_error_callback_; + Queue consumer_queue_; + CircularBuffer partition_queues_; +}; + +} //cppkafka + +#endif //CPPKAFKA_ROUNDROBIN_POLL_ADAPTER_H diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index fa018d5..71a14b7 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -1,24 +1,4 @@ -set(SOURCES - configuration.cpp - topic_configuration.cpp - configuration_option.cpp - exceptions.cpp - topic.cpp - buffer.cpp - message.cpp - topic_partition.cpp - topic_partition_list.cpp - metadata.cpp - group_information.cpp - error.cpp - - kafka_handle_base.cpp - producer.cpp - consumer.cpp - - utils/backoff_performer.cpp - utils/backoff_committer.cpp -) +file(GLOB SOURCES *.cpp utils/*.cpp) include_directories(${CMAKE_CURRENT_SOURCE_DIR}/../include/cppkafka) include_directories(SYSTEM ${Boost_INCLUDE_DIRS} ${RDKAFKA_INCLUDE_DIR}) diff --git a/src/consumer.cpp b/src/consumer.cpp index f250cbc..eab195c 100644 --- a/src/consumer.cpp +++ b/src/consumer.cpp @@ -52,14 +52,14 @@ void Consumer::rebalance_proxy(rd_kafka_t*, rd_kafka_resp_err_t error, static_cast<Consumer*>(opaque)->handle_rebalance(error, list); } -Consumer::Consumer(Configuration config) +Consumer::Consumer(Configuration config) : KafkaHandleBase(move(config)) { char error_buffer[512]; rd_kafka_conf_t* config_handle = get_configuration_handle(); // Set ourselves as the opaque pointer rd_kafka_conf_set_opaque(config_handle, this); rd_kafka_conf_set_rebalance_cb(config_handle, &Consumer::rebalance_proxy); - rd_kafka_t* ptr = rd_kafka_new(RD_KAFKA_CONSUMER, + rd_kafka_t* ptr =
rd_kafka_new(RD_KAFKA_CONSUMER, rd_kafka_conf_dup(config_handle), error_buffer, sizeof(error_buffer)); if (!ptr) { @@ -165,7 +165,7 @@ KafkaHandleBase::OffsetTuple Consumer::get_offsets(const TopicPartition& topic_p int64_t low; int64_t high; const string& topic = topic_partition.get_topic(); - const int partition = topic_partition.get_partition(); + const int partition = topic_partition.get_partition(); rd_kafka_resp_err_t result = rd_kafka_get_watermark_offsets(get_handle(), topic.data(), partition, &low, &high); check_error(result); @@ -232,9 +232,7 @@ Message Consumer::poll() { } Message Consumer::poll(milliseconds timeout) { - rd_kafka_message_t* message = rd_kafka_consumer_poll(get_handle(), - static_cast(timeout.count())); - return message ? Message(message) : Message(); + return rd_kafka_consumer_poll(get_handle(), static_cast(timeout.count())); } vector Consumer::poll_batch(size_t max_batch_size) { @@ -260,6 +258,24 @@ vector Consumer::poll_batch(size_t max_batch_size, milliseconds timeout return output; } +Queue Consumer::get_main_queue() const { + Queue queue = Queue::make_non_owning(rd_kafka_queue_get_main(get_handle())); + queue.disable_queue_forwarding(); + return queue; +} + +Queue Consumer::get_consumer_queue() const { + return Queue::make_non_owning(rd_kafka_queue_get_consumer(get_handle())); +} + +Queue Consumer::get_partition_queue(const TopicPartition& partition) const { + Queue queue = Queue::make_non_owning(rd_kafka_queue_get_partition(get_handle(), + partition.get_topic().c_str(), + partition.get_partition())); + queue.disable_queue_forwarding(); + return queue; +} + void Consumer::close() { rd_kafka_resp_err_t error = rd_kafka_consumer_close(get_handle()); check_error(error); diff --git a/src/exceptions.cpp b/src/exceptions.cpp index fbf98e1..5cfa083 100644 --- a/src/exceptions.cpp +++ b/src/exceptions.cpp @@ -108,4 +108,15 @@ Error ConsumerException::get_error() const { return error_; } +// QueueException + +QueueException::QueueException(Error error) +: Exception(error.to_string()), error_(error) { + +} + +Error QueueException::get_error() const { + return error_; +} + } // cppkafka diff --git a/src/message.cpp b/src/message.cpp index a52a0a7..d9c0870 100644 --- a/src/message.cpp +++ b/src/message.cpp @@ -57,10 +57,9 @@ Message::Message(rd_kafka_message_t* handle, NonOwningTag) } Message::Message(HandlePtr handle) -: handle_(move(handle)), - payload_((const Buffer::DataType*)handle_->payload, handle_->len), - key_((const Buffer::DataType*)handle_->key, handle_->key_len) { - +: handle_(move(handle)), + payload_(handle_ ? Buffer((const Buffer::DataType*)handle_->payload, handle_->len) : Buffer()), + key_(handle_ ? Buffer((const Buffer::DataType*)handle_->key, handle_->key_len) : Buffer()) { } // MessageTimestamp diff --git a/src/queue.cpp b/src/queue.cpp new file mode 100644 index 0000000..af56715 --- /dev/null +++ b/src/queue.cpp @@ -0,0 +1,123 @@ +/* + * Copyright (c) 2017, Matias Fontanini + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. 
+ * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ +#include "queue.h" +#include "exceptions.h" + +using std::vector; +using std::exception; +using std::chrono::milliseconds; + +namespace cppkafka { + +void dummy_deleter(rd_kafka_queue_t*) { + +} + +const milliseconds Queue::DEFAULT_TIMEOUT{1000}; + +Queue Queue::make_non_owning(rd_kafka_queue_t* handle) { + return Queue(handle, NonOwningTag{}); +} + +Queue::Queue() +: handle_(nullptr, nullptr), + timeout_ms_(DEFAULT_TIMEOUT) { + +} + +Queue::Queue(rd_kafka_queue_t* handle) +: handle_(handle, &rd_kafka_queue_destroy), + timeout_ms_(DEFAULT_TIMEOUT) { + +} + +Queue::Queue(rd_kafka_queue_t* handle, NonOwningTag) +: handle_(handle, &dummy_deleter) { + +} + +rd_kafka_queue_t* Queue::get_handle() const { + return handle_.get(); +} + +size_t Queue::get_length() const { + return rd_kafka_queue_length(handle_.get()); +} + +void Queue::forward_to_queue(const Queue& forward_queue) const { + return rd_kafka_queue_forward(handle_.get(), forward_queue.handle_.get()); +} + +void Queue::disable_queue_forwarding() const { + return rd_kafka_queue_forward(handle_.get(), nullptr); +} + +void Queue::set_consume_timeout(milliseconds timeout) { + timeout_ms_ = timeout; +} + +milliseconds Queue::get_consume_timeout() const { + return timeout_ms_; +} + +Message Queue::consume() const { + return consume(timeout_ms_); +} + +Message Queue::consume(milliseconds timeout) const { + return Message(rd_kafka_consume_queue(handle_.get(), static_cast(timeout.count()))); +} + +MessageList Queue::consume_batch(size_t max_batch_size) const { + return consume_batch(max_batch_size, timeout_ms_); +} + +MessageList Queue::consume_batch(size_t max_batch_size, milliseconds timeout) const { + MessageList message_list; + vector raw_message_list(max_batch_size); + ssize_t num_messages = rd_kafka_consume_batch_queue(handle_.get(), + static_cast(timeout.count()), + raw_message_list.data(), + max_batch_size); + if (num_messages == -1) { + rd_kafka_resp_err_t error = rd_kafka_last_error(); + if (error != RD_KAFKA_RESP_ERR_NO_ERROR) { + throw QueueException(error); + } + return message_list; + } + raw_message_list.resize(num_messages); + message_list.reserve(num_messages); + for (auto&& message : raw_message_list) { + message_list.emplace_back(message); + } + return message_list; +} + +} //cppkafka diff --git a/src/topic.cpp b/src/topic.cpp index 7ecdc63..9f31148 100644 --- a/src/topic.cpp +++ b/src/topic.cpp @@ -34,7 +34,7 @@ using std::string; namespace cppkafka { -void dummy_topic_destroyer(rd_kafka_topic_t*) { +void dummy_deleter(rd_kafka_topic_t*) { } @@ -47,13 +47,13 @@ Topic::Topic() } -Topic::Topic(rd_kafka_topic_t* handle) +Topic::Topic(rd_kafka_topic_t* handle) : handle_(handle, &rd_kafka_topic_destroy) { } 
Topic::Topic(rd_kafka_topic_t* handle, NonOwningTag) -: handle_(handle, &dummy_topic_destroyer) { +: handle_(handle, &dummy_deleter) { } diff --git a/src/utils/roundrobin_poll_adapter.cpp b/src/utils/roundrobin_poll_adapter.cpp new file mode 100644 index 0000000..83df923 --- /dev/null +++ b/src/utils/roundrobin_poll_adapter.cpp @@ -0,0 +1,164 @@ +/* + * Copyright (c) 2017, Matias Fontanini + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "utils/roundrobin_poll_adapter.h" + +using std::chrono::milliseconds; +using std::make_move_iterator; + +namespace cppkafka { + +RoundRobinPollAdapter::RoundRobinPollAdapter(Consumer& consumer) +: consumer_(consumer), + assignment_callback_(consumer.get_assignment_callback()), + revocation_callback_(consumer.get_revocation_callback()), + rebalance_error_callback_(consumer.get_rebalance_error_callback()), + consumer_queue_(consumer.get_consumer_queue()) { + // take over the assignment callback + consumer_.set_assignment_callback([this](TopicPartitionList& partitions) { + on_assignment(partitions); + }); + // take over the revocation callback + consumer_.set_revocation_callback([this](const TopicPartitionList& partitions) { + on_revocation(partitions); + }); + // take over the rebalance error callback + consumer_.set_rebalance_error_callback([this](Error error) { + on_rebalance_error(error); + }); + // make sure we don't have any active subscriptions + if (!consumer_.get_subscription().empty()) { + throw ConsumerException(RD_KAFKA_RESP_ERR__EXISTING_SUBSCRIPTION); + } +} + +RoundRobinPollAdapter::~RoundRobinPollAdapter() { + restore_forwarding(); + //set the original callbacks + consumer_.set_assignment_callback(assignment_callback_); + consumer_.set_revocation_callback(revocation_callback_); + consumer_.set_rebalance_error_callback(rebalance_error_callback_); +} + +void RoundRobinPollAdapter::set_timeout(milliseconds timeout) { + consumer_.set_timeout(timeout); +} + +milliseconds RoundRobinPollAdapter::get_timeout() { + return consumer_.get_timeout(); +} + +Message RoundRobinPollAdapter::poll() { + return poll(consumer_.get_timeout()); +} + +Message RoundRobinPollAdapter::poll(milliseconds timeout) { + bool empty_list = partition_queues_.ref().empty(); + // Poll 
group event queue first + Message message = consumer_queue_.consume(empty_list ? timeout : milliseconds(0)); + if (message) { + return message; + } + if (!empty_list) { + //consume the next partition + message = partition_queues_.next().consume(timeout); + } + return message; +} + +MessageList RoundRobinPollAdapter::poll_batch(size_t max_batch_size) { + return poll_batch(max_batch_size, consumer_.get_timeout()); +} + +MessageList RoundRobinPollAdapter::poll_batch(size_t max_batch_size, milliseconds timeout) { + bool empty_list = partition_queues_.ref().empty(); + ssize_t remaining_count = max_batch_size; + // batch from the group event queue first + MessageList messages = consumer_queue_.consume_batch(remaining_count, + empty_list ? timeout : milliseconds(0)); + remaining_count -= messages.size(); + if ((remaining_count <= 0) || empty_list) { + // the entire batch was filled + return messages; + } + // batch from the next partition + MessageList partition_messages = partition_queues_.next().consume_batch(remaining_count, timeout); + if (messages.empty()) { + return partition_messages; + } + if (partition_messages.empty()) { + return messages; + } + // concatenate both lists + messages.reserve(messages.size() + partition_messages.size()); + messages.insert(messages.end(), + make_move_iterator(partition_messages.begin()), + make_move_iterator(partition_messages.end())); + return messages; +} + +void RoundRobinPollAdapter::on_assignment(TopicPartitionList& partitions) { + //populate partition queues + for (const auto& partition : partitions) { + partition_queues_.ref().push_back(consumer_.get_partition_queue(partition)); + } + // call original consumer callback if any + if (assignment_callback_) { + assignment_callback_(partitions); + } +} + +void RoundRobinPollAdapter::on_revocation(const TopicPartitionList& partitions) { + // put all partitions queues back to their initial state + restore_forwarding(); + // empty the circular queue list + partition_queues_.ref().clear(); + // reset the queue iterator + partition_queues_.rewind(); + // call original consumer callback if any + if (revocation_callback_) { + revocation_callback_(partitions); + } +} + +void RoundRobinPollAdapter::on_rebalance_error(Error error) { + // call original consumer callback if any + if (rebalance_error_callback_) { + rebalance_error_callback_(error); + } +} + +void RoundRobinPollAdapter::restore_forwarding() { + // forward all partition queues + for (const auto& queue : partition_queues_.ref()) { + queue.forward_to_queue(consumer_queue_); + } +} + +} //cppkafka
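The new interfaces in this patch are easiest to see in context with a few short usage sketches. First, a minimal sketch of installing the three rebalance callbacks documented in consumer.h. It assumes the existing TopicPartition::set_offset accessor from the current API, and seeking to offset 0 is only a placeholder choice:

#include <cppkafka/cppkafka.h>

using namespace cppkafka;

void install_rebalance_callbacks(Consumer& consumer) {
    consumer.set_assignment_callback([](TopicPartitionList& partitions) {
        // The assignment list can be adjusted in place before it takes effect
        for (TopicPartition& partition : partitions) {
            partition.set_offset(0); // placeholder starting offset
        }
    });
    consumer.set_revocation_callback([](const TopicPartitionList& partitions) {
        // Commit or flush any in-flight work for the revoked partitions here
    });
    consumer.set_rebalance_error_callback([](Error error) {
        // Log or otherwise report the error
    });
}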
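A minimal sketch of the new per-partition queue API (Consumer::get_partition_queue plus Queue::consume). The topic name, partition number and timeout are placeholders, and the partition is assumed to be currently assigned to this consumer:

#include <chrono>
#include <cppkafka/cppkafka.h>

using namespace cppkafka;

void consume_one_partition(Consumer& consumer) {
    // Fetching the queue detaches this partition from the consumer queue,
    // as noted in the get_partition_queue() documentation
    Queue partition_queue = consumer.get_partition_queue({ "my_topic", 0 });
    partition_queue.set_consume_timeout(std::chrono::milliseconds(100));

    while (true) {
        // The consumer itself still needs to be polled periodically elsewhere
        // so the keep-alive behavior described in Consumer::poll keeps working
        Message msg = partition_queue.consume();
        if (msg && !msg.get_error()) {
            // Process msg.get_payload()
        }
    }
}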
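A sketch of batch consumption through the new Queue::consume_batch overloads, including the QueueException error path added in exceptions.h; the batch size and timeout values are arbitrary:

#include <chrono>
#include <cppkafka/cppkafka.h>

using namespace cppkafka;

MessageList drain_consumer_queue(Consumer& consumer) {
    // The group queue is the one rd_kafka_consumer_poll normally services
    Queue group_queue = consumer.get_consumer_queue();
    try {
        // Up to 100 messages, waiting at most 500ms; the result may be shorter or empty
        return group_queue.consume_batch(100, std::chrono::milliseconds(500));
    }
    catch (const QueueException& ex) {
        // rd_kafka_consume_batch_queue reported an error; ex.get_error() has the details
        return MessageList{};
    }
}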
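Finally, a sketch of round-robin batch polling with the new adapter, following the ordering the roundrobin_poll_adapter.h documentation requires (create the adapter before subscribing); the broker address, group id, topic and batch size are placeholders:

#include <cppkafka/cppkafka.h>
#include <cppkafka/utils/roundrobin_poll_adapter.h>

using namespace cppkafka;

int main() {
    Configuration config = {
        { "metadata.broker.list", "127.0.0.1:9092" },
        { "group.id", "foo" }
    };
    Consumer consumer(config);

    // The adapter is created before subscribing so it can install its own
    // rebalance callbacks and capture the partition queues on assignment
    RoundRobinPollAdapter adapter(consumer);
    consumer.subscribe({ "my_topic" });

    while (true) {
        // The group event queue is polled first, then the next partition in turn
        MessageList messages = adapter.poll_batch(10);
        for (const Message& msg : messages) {
            if (!msg.get_error()) {
                // Process msg.get_payload()
            }
        }
    }
}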