Merge pull request #63 from accelerated/partition_poll

round robin polling for assigned partitions
This commit is contained in:
Matias Fontanini
2018-05-30 11:15:02 -07:00
committed by GitHub
30 changed files with 1637 additions and 256 deletions

View File

@@ -35,7 +35,7 @@
#include <chrono>
#include <functional>
#include "kafka_handle_base.h"
#include "message.h"
#include "queue.h"
#include "macros.h"
#include "error.h"
@@ -54,7 +54,7 @@ class TopicConfiguration;
* Semi-simple code showing how to use this class
*
* \code
* // Create a configuration and set the group.id and broker list fields
* Configuration config = {
* { "metadata.broker.list", "127.0.0.1:9092" },
* { "group.id", "foo" }
@@ -74,13 +74,13 @@ class TopicConfiguration;
* consumer.set_revocation_callback([&](const TopicPartitionList& topic_partitions) {
* cout << topic_partitions.size() << " partitions revoked!" << endl;
* });
*
* // Subscribe
* consumer.subscribe({ "my_topic" });
* while (true) {
* // Poll. This will optionally return a message. It's necessary to check if it's a valid
* // one before using it
* Message msg = consumer.poll();
* if (msg) {
* if (!msg.get_error()) {
* // It's an actual message. Get the payload and print it to stdout
@@ -103,12 +103,12 @@ public:
/**
* \brief Creates an instance of a consumer.
*
* Note that the configuration *must contain* the group.id attribute set or this
* will throw.
*
* \param config The configuration to be used
*/
Consumer(Configuration config);
Consumer(const Consumer&) = delete;
Consumer(Consumer&&) = delete;
@@ -116,7 +116,7 @@ public:
Consumer& operator=(Consumer&&) = delete;
/**
* \brief Closes and estroys the rdkafka handle
* \brief Closes and destroys the rdkafka handle
*
* This will call Consumer::close before destroying the handle
*/
@@ -124,7 +124,7 @@ public:
/**
* \brief Sets the topic/partition assignment callback
*
* The Consumer class will use rd_kafka_conf_set_rebalance_cb and will handle the
* rebalance, converting from rdkafka topic partition list handles into TopicPartitionList
* and executing the assignment/revocation/rebalance_error callbacks.
@@ -138,7 +138,7 @@ public:
/**
* \brief Sets the topic/partition revocation callback
*
* The Consumer class will use rd_kafka_conf_set_rebalance_cb and will handle the
* rebalance, converting from rdkafka topic partition list handles into TopicPartitionList
* and executing the assignment/revocation/rebalance_error callbacks.
@@ -153,7 +153,7 @@ public:
/**
* \brief Sets the rebalance error callback
*
* The Consumer class will use rd_kafka_conf_set_rebalance_cb and will handle the
* rebalance, converting from rdkafka topic partition list handles into TopicPartitionList
* and executing the assignment/revocation/rebalance_error callbacks.
@@ -188,9 +188,9 @@ public:
/**
* \brief Unassigns the current topic/partition assignment
*
* This translates into a call to rd_kafka_assign using a null as the topic partition list
* parameter
*/
void unassign();
/**
@@ -262,7 +262,9 @@ public:
*
* This translates into a call to rd_kafka_get_watermark_offsets
*
* \param topic_partition The topic/partition to get the offsets from
*
* \return A pair of offsets {low, high}
*/
OffsetTuple get_offsets(const TopicPartition& topic_partition) const;
@@ -272,6 +274,8 @@ public:
* This translates into a call to rd_kafka_committed
*
* \param topic_partitions The topic/partition list to be queried
*
* \return The topic partition list
*/
TopicPartitionList get_offsets_committed(const TopicPartitionList& topic_partitions) const;
@@ -281,6 +285,8 @@ public:
* This translates into a call to rd_kafka_position
*
* \param topic_partitions The topic/partition list to be queried
*
* \return The topic partition list
*/
TopicPartitionList get_offsets_position(const TopicPartitionList& topic_partitions) const;
@@ -295,6 +301,8 @@ public:
* \brief Gets the current topic/partition list assignment
*
* This translates to a call to rd_kafka_assignment
*
* \return The topic partition list
*/
TopicPartitionList get_assignment() const;
@@ -302,21 +310,29 @@ public:
* \brief Gets the group member id
*
* This translates to a call to rd_kafka_memberid
*
* \return The id
*/
std::string get_member_id() const;
/**
* Gets the partition assignment callback.
* \brief Gets the partition assignment callback.
*
* \return The callback reference
*/
const AssignmentCallback& get_assignment_callback() const;
/**
* Gets the partition revocation callback.
* \brief Gets the partition revocation callback.
*
* \return The callback reference
*/
const RevocationCallback& get_revocation_callback() const;
/**
* Gets the rebalance error callback.
* \brief Gets the rebalance error callback.
*
* \return The callback reference
*/
const RebalanceErrorCallback& get_rebalance_error_callback() const;
@@ -326,16 +342,16 @@ public:
* This will call rd_kafka_consumer_poll.
*
* Note that you need to call poll periodically as a keep alive mechanism, otherwise the broker
* will think this consumer is down and will trigger a rebalance (if using dynamic
* subscription).
*
* The timeout used on this call will be the one configured via Consumer::set_timeout.
*
* The returned message *might* be empty. If's necessary to check that it's a valid one before
* using it:
*
* \return A message. The returned message *might* be empty. It's necessary to check
* that it's valid before using it:
*
* \code
* Message msg = consumer.poll();
* if (msg) {
* // It's a valid message!
* }
@@ -350,6 +366,8 @@ public:
* instead of the one configured on this Consumer.
*
* \param timeout The timeout to be used on this call
*
* \return A message
*/
Message poll(std::chrono::milliseconds timeout);
@@ -359,8 +377,10 @@ public:
* This can return one or more messages
*
* \param max_batch_size The maximum amount of messages expected
*
* \return A list of messages
*/
std::vector<Message> poll_batch(size_t max_batch_size);
MessageList poll_batch(size_t max_batch_size);
/**
* \brief Polls for a batch of messages
@@ -369,8 +389,42 @@ public:
*
* \param max_batch_size The maximum amount of messages expected
* \param timeout The timeout for this operation
*
* \return A list of messages
*/
std::vector<Message> poll_batch(size_t max_batch_size, std::chrono::milliseconds timeout);
MessageList poll_batch(size_t max_batch_size, std::chrono::milliseconds timeout);
/**
* \brief Get the global event queue servicing this consumer. This corresponds to
* rd_kafka_queue_get_main and is polled via rd_kafka_poll.
*
* \return A Queue object
*
* \remark Note that this call will disable forwarding to the consumer_queue.
* To restore forwarding (if desired) call Queue::forward_to_queue(consumer_queue)
*/
Queue get_main_queue() const;
/**
* \brief Get the consumer group queue. This corresponds to
* rd_kafka_queue_get_consumer and is polled via rd_kafka_consumer_poll.
*
* \return A Queue object
*/
Queue get_consumer_queue() const;
/**
* \brief Get the queue belonging to this partition. If the consumer is not assigned to this
* partition, an empty queue will be returned
*
* \param partition The partition object
*
* \return A Queue object
*
* \remark Note that this call will disable forwarding to the consumer_queue.
* To restore forwarding (if desired) call Queue::forward_to_queue(consumer_queue)
*/
Queue get_partition_queue(const TopicPartition& partition) const;
private:
static void rebalance_proxy(rd_kafka_t *handle, rd_kafka_resp_err_t error,
rd_kafka_topic_partition_list_t *partitions, void *opaque);
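Taken together, these queue accessors let a caller bypass the forwarded consumer queue and service a single partition directly. A minimal sketch, assuming the consumer already holds an assignment for partition 0 of "my_topic" (the topic name, timeout and processing body are illustrative, not part of the PR):

    #include <cppkafka/consumer.h>
    #include <cppkafka/queue.h>

    void drain_one_partition(cppkafka::Consumer& consumer) {
        // Note: this disables forwarding of the partition into the consumer queue
        cppkafka::Queue queue = consumer.get_partition_queue({ "my_topic", 0 });
        cppkafka::Message msg = queue.consume(std::chrono::milliseconds(100));
        if (msg && !msg.get_error()) {
            // process msg.get_payload()
        }
        // Restore forwarding so a regular poll() loop sees this partition again
        queue.forward_to_queue(consumer.get_consumer_queue());
    }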

View File

@@ -46,6 +46,7 @@
#include <cppkafka/message_builder.h>
#include <cppkafka/metadata.h>
#include <cppkafka/producer.h>
#include <cppkafka/queue.h>
#include <cppkafka/topic.h>
#include <cppkafka/topic_configuration.h>
#include <cppkafka/topic_partition.h>
@@ -55,5 +56,8 @@
#include <cppkafka/utils/buffered_producer.h>
#include <cppkafka/utils/compacted_topic_processor.h>
#include <cppkafka/utils/consumer_dispatcher.h>
#include <cppkafka/utils/poll_interface.h>
#include <cppkafka/utils/poll_strategy_base.h>
#include <cppkafka/utils/roundrobin_poll_strategy.h>
#endif

View File

@@ -122,6 +122,18 @@ private:
Error error_;
};
/**
* Queue exception for rd_kafka_queue_t errors
*/
class CPPKAFKA_API QueueException : public Exception {
public:
QueueException(Error error);
Error get_error() const;
private:
Error error_;
};
} // cppkafka
#endif // CPPKAFKA_EXCEPTIONS_H
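For reference, a hedged sketch of how the new exception type surfaces to callers; the queue object and batch size here are illustrative:

    #include <iostream>
    #include <cppkafka/exceptions.h>
    #include <cppkafka/queue.h>

    void consume_checked(const cppkafka::Queue& queue) {
        try {
            cppkafka::MessageList messages = queue.consume_batch(100);
            // ... process messages ...
        }
        catch (const cppkafka::QueueException& ex) {
            // get_error() exposes the underlying rd_kafka error
            std::cerr << "queue error: " << ex.get_error().to_string() << std::endl;
        }
    }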

View File

@@ -136,6 +136,8 @@ private:
std::vector<GroupMemberInformation> members_;
};
using GroupInformationList = std::vector<GroupInformation>;
} // cppkafka
#endif // CPPKAFKA_GROUP_INFORMATION_H

View File

@@ -39,6 +39,7 @@
#include <tuple>
#include <chrono>
#include <librdkafka/rdkafka.h>
#include "group_information.h"
#include "topic_partition.h"
#include "topic_partition_list.h"
#include "topic_configuration.h"
@@ -78,7 +79,7 @@ public:
/**
* \brief Resumes consumption/production from the given topic/partition list
*
* This translates into a call to rd_kafka_resume_partitions
*
* \param topic_partitions The topic/partition list to resume consuming/producing from/to
*/
@@ -108,11 +109,15 @@ public:
* This translates into a call to rd_kafka_query_watermark_offsets
*
* \param topic_partition The topic/partition to be queried
*
* \return A pair of watermark offsets {low, high}
*/
OffsetTuple query_offsets(const TopicPartition& topic_partition) const;
/**
* Gets the rdkafka handle
* \brief Gets the rdkafka handle
*
* \return The rdkafka handle
*/
rd_kafka_t* get_handle() const;
@@ -123,7 +128,9 @@ public:
* configuration provided in the Configuration object for this consumer/producer handle,
* if any.
*
* \param name The name of the topic to be created
*
* \return A topic
*/
Topic get_topic(const std::string& name);
@@ -134,15 +141,19 @@ public:
*
* \param name The name of the topic to be created
* \param config The configuration to be used for the new topic
*
* \return A topic
*/
Topic get_topic(const std::string& name, TopicConfiguration config);
/**
* \brief Gets metadata for brokers, topics, partitions, etc
*
* This translates into a call to rd_kafka_metadata
*
* \param all_topics Whether to fetch metadata about all topics or only locally known ones
*
* \return The metadata
*/
Metadata get_metadata(bool all_topics = true) const;
@@ -153,20 +164,26 @@ public:
* This translates into a call to rd_kafka_metadata
*
* \param topic The topic to fetch information for
*
* \return The topic metadata
*/
TopicMetadata get_metadata(const Topic& topic) const;
/**
* Gets the consumer group information
* \brief Gets the consumer group information
*
* \param name The name of the consumer group to look up
*
* \return The group information
*/
GroupInformation get_consumer_group(const std::string& name);
/**
* Gets all consumer groups
* \brief Gets all consumer groups
*
* \return A list of consumer groups
*/
std::vector<GroupInformation> get_consumer_groups();
GroupInformationList get_consumer_groups();
/**
* \brief Gets topic/partition offsets based on timestamps
@@ -174,23 +191,31 @@ public:
* This translates into a call to rd_kafka_offsets_for_times
*
* \param queries A map from topic/partition to the timestamp to be used
*
* \return A topic partition list
*/
TopicPartitionList get_offsets_for_times(const TopicPartitionsTimestampsMap& queries) const;
/**
* Returns the kafka handle name
* \brief Get the kafka handle name
*
* \return The handle name
*/
std::string get_name() const;
/**
* Gets the configured timeout.
* \brief Gets the configured timeout.
*
* \return The configured timeout
*
* \sa KafkaHandleBase::set_timeout
*/
std::chrono::milliseconds get_timeout() const;
/**
* Gets the handle's configuration
* \brief Gets the handle's configuration
*
* \return A reference to the configuration object
*/
const Configuration& get_configuration() const;
@@ -198,6 +223,8 @@ public:
* \brief Gets the length of the out queue
*
* This calls rd_kafka_outq_len
*
* \return The length of the queue
*/
int get_out_queue_length() const;
@@ -221,7 +248,7 @@ private:
Topic get_topic(const std::string& name, rd_kafka_topic_conf_t* conf);
Metadata get_metadata(bool all_topics, rd_kafka_topic_t* topic_ptr) const;
std::vector<GroupInformation> fetch_consumer_groups(const char* name);
GroupInformationList fetch_consumer_groups(const char* name);
void save_topic_config(const std::string& topic_name, TopicConfiguration config);
std::chrono::milliseconds timeout_ms_;

View File

@@ -177,6 +177,8 @@ private:
Buffer key_;
};
using MessageList = std::vector<Message>;
/**
* Represents a message's timestamp
*/

include/cppkafka/queue.h Normal file (183 lines)
View File

@@ -0,0 +1,183 @@
/*
* Copyright (c) 2017, Matias Fontanini
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include <vector>
#include <memory>
#include <boost/optional.hpp>
#include <librdkafka/rdkafka.h>
#include "macros.h"
#include "message.h"
#ifndef CPPKAFKA_QUEUE_H
#define CPPKAFKA_QUEUE_H
namespace cppkafka {
/**
* \brief Represents a rdkafka queue
*
* This is a simple wrapper over a rd_kafka_queue_t*
*/
class CPPKAFKA_API Queue {
public:
/**
* \brief Creates a Queue object that doesn't take ownership of the handle
*
* \param handle The handle to be used
*/
static Queue make_non_owning(rd_kafka_queue_t* handle);
/**
* \brief Constructs an empty queue
*
* Note that using any methods except Queue::get_handle on an empty queue is undefined
* behavior
*/
Queue();
/**
* \brief Constructs a queue using a handle
*
* This will take ownership of the handle
*
* \param handle The handle to be used
*/
Queue(rd_kafka_queue_t* handle);
/**
* Returns the rdkafka handle
*/
rd_kafka_queue_t* get_handle() const;
/**
* \brief Returns the length of the queue
*
* This translates to a call to rd_kafka_queue_length
*/
size_t get_length() const;
/**
* \brief Forward to another queue
*
* This translates to a call to rd_kafka_queue_forward
*/
void forward_to_queue(const Queue& forward_queue) const;
/**
* \brief Disable forwarding to another queue
*
* This translates to a call to rd_kafka_queue_forward(NULL)
*/
void disable_queue_forwarding() const;
/**
* \brief Sets the timeout for consume operations
*
* This timeout is applied when calling consume()
*
* \param timeout The timeout to be set
*/
void set_timeout(std::chrono::milliseconds timeout);
/**
* Gets the configured timeout.
*
* \sa Queue::set_timeout
*/
std::chrono::milliseconds get_timeout() const;
/**
* \brief Consume a message from this queue
*
* This translates to a call to rd_kafka_consume_queue using the configured timeout for this object
*
* \return A message
*/
Message consume() const;
/**
* \brief Consume a message from this queue
*
* Same as consume() but the specified timeout will be used instead of the configured one
*
* \param timeout The timeout to be used on this call
*
* \return A message
*/
Message consume(std::chrono::milliseconds timeout) const;
/**
* \brief Consumes a batch of messages from this queue
*
* This translates to a call to rd_kafka_consume_batch_queue using the configured timeout for this object
*
* \param max_batch_size The max number of messages to consume if available
*
* \return A list of messages. Could be empty if there's nothing to consume
*/
MessageList consume_batch(size_t max_batch_size) const;
/**
* \brief Consumes a batch of messages from this queue
*
* Same as Queue::consume_batch(size_t) but the specified timeout will be used instead of the configured one
*
* \param max_batch_size The max number of messages to consume if available
*
* \param timeout The timeout to be used on this call
*
* \return A list of messages. Could be empty if there's nothing to consume
*/
MessageList consume_batch(size_t max_batch_size, std::chrono::milliseconds timeout) const;
/**
* Indicates whether this queue is valid (not null)
*/
explicit operator bool() const {
return handle_ != nullptr;
}
private:
static const std::chrono::milliseconds DEFAULT_TIMEOUT;
using HandlePtr = std::unique_ptr<rd_kafka_queue_t, decltype(&rd_kafka_queue_destroy)>;
struct NonOwningTag { };
Queue(rd_kafka_queue_t* handle, NonOwningTag);
// Members
HandlePtr handle_;
std::chrono::milliseconds timeout_ms_;
};
using QueueList = std::vector<Queue>;
} // cppkafka
#endif //CPPKAFKA_QUEUE_H
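A short usage sketch for the consume API declared above; the batch size and timeout are arbitrary, and the queue is assumed to come from a Consumer:

    #include <cppkafka/queue.h>

    void poll_queue_once(cppkafka::Queue& queue) {
        // Blocking batch read with an explicit timeout
        cppkafka::MessageList batch = queue.consume_batch(10, std::chrono::milliseconds(500));
        for (const cppkafka::Message& msg : batch) {
            if (!msg.get_error()) {
                // process msg
            }
        }
    }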

View File

@@ -178,6 +178,13 @@ public:
*/
void clear();
/**
* \brief Get the number of messages in the buffer
*
* \return The number of messages
*/
size_t get_buffer_size() const;
/**
* \brief Sets the maximum amount of messages to be enqueued in the buffer.
*
@@ -199,13 +206,6 @@ public:
*/
ssize_t get_max_buffer_size() const;
/**
* \brief Get the number of unsent messages in the buffer
*
* \return The number of messages
*/
size_t get_buffer_size() const;
/**
* \brief Get the number of messages not yet acked by the broker
*
@@ -400,6 +400,11 @@ void BufferedProducer<BufferType>::clear() {
std::swap(tmp, messages_);
}
template <typename BufferType>
size_t BufferedProducer<BufferType>::get_buffer_size() const {
return messages_.size();
}
template <typename BufferType>
void BufferedProducer<BufferType>::set_max_buffer_size(ssize_t max_buffer_size) {
if (max_buffer_size < -1) {
@@ -413,11 +418,6 @@ ssize_t BufferedProducer<BufferType>::get_max_buffer_size() const {
return max_buffer_size_;
}
template <typename BufferType>
size_t BufferedProducer<BufferType>::get_buffer_size() const {
return messages_.size();
}
template <typename BufferType>
template <typename BuilderType>
void BufferedProducer<BufferType>::do_add_message(BuilderType&& builder,

View File

@@ -238,7 +238,7 @@ private:
}
// Finds the first functor that accepts the parameters in a tuple and returns it. If no
// such functor is found, a static asertion will occur
// such functor is found, a static assertion will occur
template <typename Tuple, typename... Functors>
const typename find_type<Tuple, Functors...>::type&
find_matching_functor(const Functors&... functors) {

View File

@@ -0,0 +1,130 @@
/*
* Copyright (c) 2017, Matias Fontanini
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef CPPKAFKA_POLL_INTERFACE_H
#define CPPKAFKA_POLL_INTERFACE_H
#include "../consumer.h"
namespace cppkafka {
/**
* \interface PollInterface
*
* \brief Interface defining polling methods for the Consumer class
*/
struct PollInterface {
virtual ~PollInterface() = default;
/**
* \brief Get the underlying consumer controlled by this strategy
*
* \return A reference to the consumer instance
*/
virtual Consumer& get_consumer() = 0;
/**
* \brief Sets the timeout for polling functions
*
* This calls Consumer::set_timeout
*
* \param timeout The timeout to be set
*/
virtual void set_timeout(std::chrono::milliseconds timeout) = 0;
/**
* \brief Gets the timeout for polling functions
*
* This calls Consumer::get_timeout
*
* \return The timeout
*/
virtual std::chrono::milliseconds get_timeout() = 0;
/**
* \brief Polls all assigned partitions for new messages in round-robin fashion
*
* Each call to poll() will first consume from the global event queue and if there are
* no pending events, will attempt to consume from all partitions until a valid message is found.
* The timeout used on this call will be the one configured via PollInterface::set_timeout.
*
* \return A message. The returned message *might* be empty. It's necessary to check
* that it's a valid one before using it (see example above).
*
* \remark You need to call poll() or poll_batch() periodically as a keep alive mechanism,
* otherwise the broker will think this consumer is down and will trigger a rebalance
* (if using dynamic subscription)
*/
virtual Message poll() = 0;
/**
* \brief Polls for new messages
*
* Same as the other overload of PollInterface::poll but the provided
* timeout will be used instead of the one configured on this Consumer.
*
* \param timeout The timeout to be used on this call
*/
virtual Message poll(std::chrono::milliseconds timeout) = 0;
/**
* \brief Polls all assigned partitions for a batch of new messages in round-robin fashion
*
* Each call to poll_batch() will first attempt to consume from the global event queue
* and, if the maximum batch size has not yet been reached, will attempt to fill the
* batch by reading the remaining messages from each partition.
*
* \param max_batch_size The maximum amount of messages expected
*
* \return A list of messages
*
* \remark You need to call poll() or poll_batch() periodically as a keep alive mechanism,
* otherwise the broker will think this consumer is down and will trigger a rebalance
* (if using dynamic subscription)
*/
virtual MessageList poll_batch(size_t max_batch_size) = 0;
/**
* \brief Polls all assigned partitions for a batch of new messages in round-robin fashion
*
* Same as the other overload of PollInterface::poll_batch but the provided
* timeout will be used instead of the one configured on this Consumer.
*
* \param max_batch_size The maximum amount of messages expected
*
* \param timeout The timeout for this operation
*
* \return A list of messages
*/
virtual MessageList poll_batch(size_t max_batch_size, std::chrono::milliseconds timeout) = 0;
};
} //cppkafka
#endif //CPPKAFKA_POLL_INTERFACE_H
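Because RoundRobinPollStrategy (and any user-defined strategy) implements this interface, consuming loops can be written against PollInterface alone. A minimal sketch, with an illustrative processing body:

    #include <cppkafka/utils/poll_interface.h>

    void consume_loop(cppkafka::PollInterface& poller) {
        // Works with RoundRobinPollStrategy or any other implementation
        while (true) {
            cppkafka::Message msg = poller.poll();
            if (msg && !msg.get_error()) {
                // process the message
            }
        }
    }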

View File

@@ -0,0 +1,150 @@
/*
* Copyright (c) 2017, Matias Fontanini
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef CPPKAFKA_POLL_STRATEGY_BASE_H
#define CPPKAFKA_POLL_STRATEGY_BASE_H
#include <map>
#include <boost/any.hpp>
#include "../queue.h"
#include "../topic_partition_list.h"
#include "poll_interface.h"
namespace cppkafka {
/**
* \brief Contains a partition queue and generic metadata which can be used to store
* related (user-specific) information.
*/
struct QueueData {
Queue queue;
boost::any metadata;
};
/**
* \class PollStrategyBase
*
* \brief Base implementation of the PollInterface
*/
class PollStrategyBase : public PollInterface {
public:
using QueueMap = std::map<TopicPartition, QueueData>;
/**
* \brief Constructor
*
* \param consumer A reference to the polled consumer instance
*/
explicit PollStrategyBase(Consumer& consumer);
/**
* \brief Destructor
*/
~PollStrategyBase();
/**
* \sa PollInterface::set_timeout
*/
void set_timeout(std::chrono::milliseconds timeout) override;
/**
* \sa PollInterface::get_timeout
*/
std::chrono::milliseconds get_timeout() override;
/**
* \sa PollInterface::get_consumer
*/
Consumer& get_consumer() final;
protected:
/**
* \brief Get the queues from all assigned partitions
*
* \return A map of queues indexed by partition
*/
QueueMap& get_partition_queues();
/**
* \brief Get the main consumer queue which services the underlying Consumer object
*
* \return The consumer queue
*/
QueueData& get_consumer_queue();
/**
* \brief Reset the internal state of the queues.
*
* Use this function to reset the state of any polling strategy or algorithm.
*
* \remark This function gets called by on_assignment(), on_revocation() and on_rebalance_error()
*/
virtual void reset_state();
/**
* \brief Function to be called when a new partition assignment takes place
*
* This method contains a default implementation. It adds all the new queues belonging
* to the provided partition list and calls reset_state().
*
* \param partitions Assigned topic partitions
*/
virtual void on_assignment(TopicPartitionList& partitions);
/**
* \brief Function to be called when an old partition assignment gets revoked
*
* This method contains a default implementation. It removes all the queues
* belonging to the provided partition list and calls reset_state().
*
* \param partitions Revoked topic partitions
*/
virtual void on_revocation(const TopicPartitionList& partitions);
/**
* \brief Function to be called when a topic rebalance error happens
*
* This method contains a default implementation. Calls reset_state().
*
* \param error The rebalance error
*/
virtual void on_rebalance_error(Error error);
private:
Consumer& consumer_;
QueueData consumer_queue_;
QueueMap partition_queues_;
Consumer::AssignmentCallback assignment_callback_;
Consumer::RevocationCallback revocation_callback_;
Consumer::RebalanceErrorCallback rebalance_error_callback_;
};
} //cppkafka
#endif //CPPKAFKA_POLL_STRATEGY_BASE_H
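PollStrategyBase takes care of queue bookkeeping and callback chaining, so a subclass only has to supply the four poll methods. A hedged skeleton of a custom strategy; the drain-the-consumer-queue-only policy is purely illustrative and not part of the library:

    #include <cppkafka/utils/poll_strategy_base.h>

    class ConsumerQueueOnlyStrategy : public cppkafka::PollStrategyBase {
    public:
        explicit ConsumerQueueOnlyStrategy(cppkafka::Consumer& consumer)
        : PollStrategyBase(consumer) {
        }
        cppkafka::Message poll() override {
            return poll(get_timeout());
        }
        cppkafka::Message poll(std::chrono::milliseconds timeout) override {
            // Example policy: only service the main consumer queue
            return get_consumer_queue().queue.consume(timeout);
        }
        cppkafka::MessageList poll_batch(size_t max_batch_size) override {
            return poll_batch(max_batch_size, get_timeout());
        }
        cppkafka::MessageList poll_batch(size_t max_batch_size,
                                         std::chrono::milliseconds timeout) override {
            return get_consumer_queue().queue.consume_batch(max_batch_size, timeout);
        }
    };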

View File

@@ -0,0 +1,135 @@
/*
* Copyright (c) 2017, Matias Fontanini
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef CPPKAFKA_ROUNDROBIN_POLL_STRATEGY_H
#define CPPKAFKA_ROUNDROBIN_POLL_STRATEGY_H
#include <map>
#include <string>
#include "../exceptions.h"
#include "../consumer.h"
#include "../queue.h"
#include "poll_strategy_base.h"
namespace cppkafka {
/**
* \brief This adapter changes the default polling strategy of the Consumer into a fair round-robin
* polling mechanism.
*
* The default librdkafka (and cppkafka) poll() and poll_batch() behavior is to consume batches of
* messages from each partition in turn. For performance reasons, librdkafka pre-fetches batches
* of messages from the kafka broker (one batch from each partition), and stores them locally in
partition queues. Since all the internal partition queues are forwarded by default onto the
* group consumer queue (one per consumer), these batches end up being polled and consumed in the
* same sequence order.
* This adapter allows fair round-robin polling of all assigned partitions, one message at a time
* (or one batch at a time if poll_batch() is used). Note that poll_batch() has nothing to do with
* the internal batching mechanism of librdkafka.
*
* Example code on how to use this:
*
* \code
* // Create a consumer
* Consumer consumer(...);
* consumer.subscribe({ "my_topic" });
*
* // Optionally set the callbacks. This must be done *BEFORE* creating the strategy adapter
* consumer.set_assignment_callback(...);
* consumer.set_revocation_callback(...);
* consumer.set_rebalance_error_callback(...);
*
* // Create the adapter and use it for polling
* RoundRobinPollStrategy poll_strategy(consumer);
*
* while (true) {
* // Poll each partition in turn
* Message msg = poll_strategy.poll();
* if (msg) {
* // process valid message
* }
* }
* \endcode
*
* \warning Calling poll() or poll_batch() directly on the Consumer object while using this
* adapter will lead to undesired results, since the RoundRobinPollStrategy modifies the
* internal queuing mechanism of the Consumer instance it wraps.
*/
class RoundRobinPollStrategy : public PollStrategyBase {
public:
RoundRobinPollStrategy(Consumer& consumer);
~RoundRobinPollStrategy();
/**
* \sa PollInterface::poll
*/
Message poll() override;
/**
* \sa PollInterface::poll
*/
Message poll(std::chrono::milliseconds timeout) override;
/**
* \sa PollInterface::poll_batch
*/
MessageList poll_batch(size_t max_batch_size) override;
/**
* \sa PollInterface::poll_batch
*/
MessageList poll_batch(size_t max_batch_size,
std::chrono::milliseconds timeout) override;
protected:
/**
* \sa PollStrategyBase::reset_state
*/
void reset_state() final;
QueueData& get_next_queue();
private:
void consume_batch(Queue& queue,
MessageList& messages,
ssize_t& count,
std::chrono::milliseconds timeout);
void restore_forwarding();
// Members
QueueMap::iterator queue_iter_;
};
} //cppkafka
#endif //CPPKAFKA_ROUNDROBIN_POLL_STRATEGY_H
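For completeness, a brief sketch of the batch variant with the adapter; the batch size, timeout and processing body are illustrative:

    #include <cppkafka/utils/roundrobin_poll_strategy.h>

    void consume_batches(cppkafka::Consumer& consumer) {
        cppkafka::RoundRobinPollStrategy poll_strategy(consumer);
        while (true) {
            // One fair pass over the assigned partitions, up to 32 messages
            cppkafka::MessageList batch =
                poll_strategy.poll_batch(32, std::chrono::milliseconds(1000));
            for (const cppkafka::Message& msg : batch) {
                if (!msg.get_error()) {
                    // process msg
                }
            }
        }
    }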

View File

@@ -5,6 +5,7 @@ set(SOURCES
exceptions.cpp
topic.cpp
buffer.cpp
queue.cpp
message.cpp
topic_partition.cpp
topic_partition_list.cpp
@@ -18,6 +19,8 @@ set(SOURCES
utils/backoff_performer.cpp
utils/backoff_committer.cpp
utils/poll_strategy_base.cpp
utils/roundrobin_poll_strategy.cpp
)
include_directories(${CMAKE_CURRENT_SOURCE_DIR}/../include/cppkafka)

View File

@@ -52,14 +52,14 @@ void Consumer::rebalance_proxy(rd_kafka_t*, rd_kafka_resp_err_t error,
static_cast<Consumer*>(opaque)->handle_rebalance(error, list);
}
Consumer::Consumer(Configuration config)
: KafkaHandleBase(move(config)) {
char error_buffer[512];
rd_kafka_conf_t* config_handle = get_configuration_handle();
// Set ourselves as the opaque pointer
rd_kafka_conf_set_opaque(config_handle, this);
rd_kafka_conf_set_rebalance_cb(config_handle, &Consumer::rebalance_proxy);
rd_kafka_t* ptr = rd_kafka_new(RD_KAFKA_CONSUMER,
rd_kafka_conf_dup(config_handle),
error_buffer, sizeof(error_buffer));
if (!ptr) {
@@ -165,7 +165,7 @@ KafkaHandleBase::OffsetTuple Consumer::get_offsets(const TopicPartition& topic_p
int64_t low;
int64_t high;
const string& topic = topic_partition.get_topic();
const int partition = topic_partition.get_partition();
rd_kafka_resp_err_t result = rd_kafka_get_watermark_offsets(get_handle(), topic.data(),
partition, &low, &high);
check_error(result);
@@ -232,16 +232,14 @@ Message Consumer::poll() {
}
Message Consumer::poll(milliseconds timeout) {
rd_kafka_message_t* message = rd_kafka_consumer_poll(get_handle(),
static_cast<int>(timeout.count()));
return message ? Message(message) : Message();
return rd_kafka_consumer_poll(get_handle(), static_cast<int>(timeout.count()));
}
vector<Message> Consumer::poll_batch(size_t max_batch_size) {
MessageList Consumer::poll_batch(size_t max_batch_size) {
return poll_batch(max_batch_size, get_timeout());
}
vector<Message> Consumer::poll_batch(size_t max_batch_size, milliseconds timeout) {
MessageList Consumer::poll_batch(size_t max_batch_size, milliseconds timeout) {
vector<rd_kafka_message_t*> raw_messages(max_batch_size);
rd_kafka_queue_t* queue = rd_kafka_queue_get_consumer(get_handle());
ssize_t result = rd_kafka_consume_batch_queue(queue, timeout.count(), raw_messages.data(),
@@ -249,15 +247,27 @@ vector<Message> Consumer::poll_batch(size_t max_batch_size, milliseconds timeout
if (result == -1) {
check_error(rd_kafka_last_error());
// on the off-chance that check_error() does not throw an error
result = 0;
return MessageList();
}
vector<Message> output;
raw_messages.resize(result);
output.reserve(result);
for (const auto ptr : raw_messages) {
output.emplace_back(ptr);
}
return output;
return MessageList(raw_messages.begin(), raw_messages.begin() + result);
}
Queue Consumer::get_main_queue() const {
Queue queue(Queue::make_non_owning(rd_kafka_queue_get_main(get_handle())));
queue.disable_queue_forwarding();
return queue;
}
Queue Consumer::get_consumer_queue() const {
return Queue::make_non_owning(rd_kafka_queue_get_consumer(get_handle()));
}
Queue Consumer::get_partition_queue(const TopicPartition& partition) const {
Queue queue(Queue::make_non_owning(rd_kafka_queue_get_partition(get_handle(),
partition.get_topic().c_str(),
partition.get_partition())));
queue.disable_queue_forwarding();
return queue;
}
void Consumer::close() {

View File

@@ -108,4 +108,15 @@ Error ConsumerException::get_error() const {
return error_;
}
// QueueException
QueueException::QueueException(Error error)
: Exception(error.to_string()), error_(error) {
}
Error QueueException::get_error() const {
return error_;
}
} // cppkafka

View File

@@ -57,10 +57,9 @@ Message::Message(rd_kafka_message_t* handle, NonOwningTag)
}
Message::Message(HandlePtr handle)
: handle_(move(handle)),
payload_((const Buffer::DataType*)handle_->payload, handle_->len),
key_((const Buffer::DataType*)handle_->key, handle_->key_len) {
: handle_(move(handle)),
payload_(handle_ ? Buffer((const Buffer::DataType*)handle_->payload, handle_->len) : Buffer()),
key_(handle_ ? Buffer((const Buffer::DataType*)handle_->key, handle_->key_len) : Buffer()) {
}
// MessageTimestamp

src/queue.cpp Normal file (118 lines)
View File

@@ -0,0 +1,118 @@
/*
* Copyright (c) 2017, Matias Fontanini
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include "queue.h"
#include "exceptions.h"
using std::vector;
using std::exception;
using std::chrono::milliseconds;
namespace cppkafka {
void dummy_deleter(rd_kafka_queue_t*) {
}
const milliseconds Queue::DEFAULT_TIMEOUT{1000};
Queue Queue::make_non_owning(rd_kafka_queue_t* handle) {
return Queue(handle, NonOwningTag{});
}
Queue::Queue()
: handle_(nullptr, nullptr),
timeout_ms_(DEFAULT_TIMEOUT) {
}
Queue::Queue(rd_kafka_queue_t* handle)
: handle_(handle, &rd_kafka_queue_destroy),
timeout_ms_(DEFAULT_TIMEOUT) {
}
Queue::Queue(rd_kafka_queue_t* handle, NonOwningTag)
: handle_(handle, &dummy_deleter) {
}
rd_kafka_queue_t* Queue::get_handle() const {
return handle_.get();
}
size_t Queue::get_length() const {
return rd_kafka_queue_length(handle_.get());
}
void Queue::forward_to_queue(const Queue& forward_queue) const {
return rd_kafka_queue_forward(handle_.get(), forward_queue.handle_.get());
}
void Queue::disable_queue_forwarding() const {
return rd_kafka_queue_forward(handle_.get(), nullptr);
}
void Queue::set_timeout(milliseconds timeout) {
timeout_ms_ = timeout;
}
milliseconds Queue::get_timeout() const {
return timeout_ms_;
}
Message Queue::consume() const {
return consume(timeout_ms_);
}
Message Queue::consume(milliseconds timeout) const {
return Message(rd_kafka_consume_queue(handle_.get(), static_cast<int>(timeout.count())));
}
MessageList Queue::consume_batch(size_t max_batch_size) const {
return consume_batch(max_batch_size, timeout_ms_);
}
MessageList Queue::consume_batch(size_t max_batch_size, milliseconds timeout) const {
vector<rd_kafka_message_t*> raw_messages(max_batch_size);
ssize_t result = rd_kafka_consume_batch_queue(handle_.get(),
static_cast<int>(timeout.count()),
raw_messages.data(),
raw_messages.size());
if (result == -1) {
rd_kafka_resp_err_t error = rd_kafka_last_error();
if (error != RD_KAFKA_RESP_ERR_NO_ERROR) {
throw QueueException(error);
}
return MessageList();
}
// Build message list
return MessageList(raw_messages.begin(), raw_messages.begin() + result);
}
} //cppkafka

View File

@@ -34,7 +34,7 @@ using std::string;
namespace cppkafka {
void dummy_topic_destroyer(rd_kafka_topic_t*) {
void dummy_deleter(rd_kafka_topic_t*) {
}
@@ -47,13 +47,13 @@ Topic::Topic()
}
Topic::Topic(rd_kafka_topic_t* handle)
: handle_(handle, &rd_kafka_topic_destroy) {
}
Topic::Topic(rd_kafka_topic_t* handle, NonOwningTag)
: handle_(handle, &dummy_topic_destroyer) {
: handle_(handle, &dummy_deleter) {
}

View File

@@ -0,0 +1,129 @@
/*
* Copyright (c) 2017, Matias Fontanini
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include "utils/poll_strategy_base.h"
#include "consumer.h"
using std::chrono::milliseconds;
namespace cppkafka {
PollStrategyBase::PollStrategyBase(Consumer& consumer)
: consumer_(consumer),
consumer_queue_(QueueData{consumer.get_consumer_queue(), boost::any()}) {
// get all currently active partition assignments
TopicPartitionList assignment = consumer_.get_assignment();
on_assignment(assignment);
// take over the assignment callback
assignment_callback_ = consumer.get_assignment_callback();
consumer_.set_assignment_callback([this](TopicPartitionList& partitions) {
on_assignment(partitions);
});
// take over the revocation callback
revocation_callback_ = consumer.get_revocation_callback();
consumer_.set_revocation_callback([this](const TopicPartitionList& partitions) {
on_revocation(partitions);
});
// take over the rebalance error callback
rebalance_error_callback_ = consumer.get_rebalance_error_callback();
consumer_.set_rebalance_error_callback([this](Error error) {
on_rebalance_error(error);
});
}
PollStrategyBase::~PollStrategyBase() {
// restore the original callbacks
consumer_.set_assignment_callback(assignment_callback_);
consumer_.set_revocation_callback(revocation_callback_);
consumer_.set_rebalance_error_callback(rebalance_error_callback_);
}
void PollStrategyBase::set_timeout(milliseconds timeout) {
consumer_.set_timeout(timeout);
}
milliseconds PollStrategyBase::get_timeout() {
return consumer_.get_timeout();
}
Consumer& PollStrategyBase::get_consumer() {
return consumer_;
}
QueueData& PollStrategyBase::get_consumer_queue() {
return consumer_queue_;
}
PollStrategyBase::QueueMap& PollStrategyBase::get_partition_queues() {
return partition_queues_;
}
void PollStrategyBase::reset_state() {
}
void PollStrategyBase::on_assignment(TopicPartitionList& partitions) {
// populate partition queues
for (const auto& partition : partitions) {
// get the queue associated with this partition
partition_queues_.emplace(partition, QueueData{consumer_.get_partition_queue(partition), boost::any()});
}
reset_state();
// call original consumer callback if any
if (assignment_callback_) {
assignment_callback_(partitions);
}
}
void PollStrategyBase::on_revocation(const TopicPartitionList& partitions) {
for (const auto& partition : partitions) {
// get the queue associated with this partition
auto toppar_it = partition_queues_.find(partition);
if (toppar_it != partition_queues_.end()) {
// remove this queue from the list
partition_queues_.erase(toppar_it);
}
}
reset_state();
// call original consumer callback if any
if (revocation_callback_) {
revocation_callback_(partitions);
}
}
void PollStrategyBase::on_rebalance_error(Error error) {
reset_state();
// call original consumer callback if any
if (rebalance_error_callback_) {
rebalance_error_callback_(error);
}
}
} //cppkafka

View File

@@ -0,0 +1,131 @@
/*
* Copyright (c) 2017, Matias Fontanini
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include "utils/roundrobin_poll_strategy.h"
using std::string;
using std::chrono::milliseconds;
using std::make_move_iterator;
namespace cppkafka {
RoundRobinPollStrategy::RoundRobinPollStrategy(Consumer& consumer)
: PollStrategyBase(consumer) {
reset_state();
}
RoundRobinPollStrategy::~RoundRobinPollStrategy() {
restore_forwarding();
}
Message RoundRobinPollStrategy::poll() {
return poll(get_consumer().get_timeout());
}
Message RoundRobinPollStrategy::poll(milliseconds timeout) {
// Always give priority to group and global events
Message message = get_consumer_queue().queue.consume(milliseconds(0));
if (message) {
return message;
}
size_t num_queues = get_partition_queues().size();
while (num_queues--) {
//consume the next partition (non-blocking)
message = get_next_queue().queue.consume(milliseconds(0));
if (message) {
return message;
}
}
// We still don't have a valid message so we block on the event queue
return get_consumer_queue().queue.consume(timeout);
}
MessageList RoundRobinPollStrategy::poll_batch(size_t max_batch_size) {
return poll_batch(max_batch_size, get_consumer().get_timeout());
}
MessageList RoundRobinPollStrategy::poll_batch(size_t max_batch_size, milliseconds timeout) {
MessageList messages;
ssize_t count = max_batch_size;
// batch from the group event queue first (non-blocking)
consume_batch(get_consumer_queue().queue, messages, count, milliseconds(0));
size_t num_queues = get_partition_queues().size();
while ((count > 0) && (num_queues--)) {
// batch from the next partition (non-blocking)
consume_batch(get_next_queue().queue, messages, count, milliseconds(0));
}
// we still have space left in the buffer
if (count > 0) {
// wait on the event queue until timeout
consume_batch(get_consumer_queue().queue, messages, count, timeout);
}
return messages;
}
void RoundRobinPollStrategy::consume_batch(Queue& queue,
MessageList& messages,
ssize_t& count,
milliseconds timeout) {
MessageList queue_messages = queue.consume_batch(count, timeout);
if (queue_messages.empty()) {
return;
}
// concatenate both lists
messages.insert(messages.end(),
make_move_iterator(queue_messages.begin()),
make_move_iterator(queue_messages.end()));
// reduce total batch count
count -= queue_messages.size();
}
void RoundRobinPollStrategy::restore_forwarding() {
// forward all partition queues
for (const auto& toppar : get_partition_queues()) {
toppar.second.queue.forward_to_queue(get_consumer_queue().queue);
}
}
QueueData& RoundRobinPollStrategy::get_next_queue() {
if (get_partition_queues().empty()) {
throw QueueException(RD_KAFKA_RESP_ERR__STATE);
}
if (++queue_iter_ == get_partition_queues().end()) {
queue_iter_ = get_partition_queues().begin();
}
return queue_iter_->second;
}
void RoundRobinPollStrategy::reset_state() {
queue_iter_ = get_partition_queues().begin();
}
} //cppkafka

View File

@@ -7,14 +7,10 @@ set(KAFKA_TEST_INSTANCE "kafka-vm:9092"
add_custom_target(tests)
include_directories(${CMAKE_CURRENT_SOURCE_DIR})
add_library(cppkafka-test EXCLUDE_FROM_ALL test_utils.cpp)
target_link_libraries(cppkafka-test cppkafka ${RDKAFKA_LIBRARY} pthread)
add_definitions("-DKAFKA_TEST_INSTANCE=\"${KAFKA_TEST_INSTANCE}\"")
add_executable(
cppkafka_tests
EXCLUDE_FROM_ALL
buffer_test.cpp
compacted_topic_processor_test.cpp
configuration_test.cpp
@@ -22,10 +18,11 @@ add_executable(
kafka_handle_base_test.cpp
producer_test.cpp
consumer_test.cpp
roundrobin_poll_test.cpp
# Main file
test_main.cpp
)
target_link_libraries(cppkafka_tests cppkafka-test)
target_link_libraries(cppkafka_tests cppkafka ${RDKAFKA_LIBRARY} pthread)
add_dependencies(tests cppkafka_tests)
add_test(cppkafka cppkafka_tests)

View File

@@ -8,6 +8,7 @@
#include "cppkafka/producer.h"
#include "cppkafka/consumer.h"
#include "cppkafka/utils/compacted_topic_processor.h"
#include "test_utils.h"
using std::string;
using std::to_string;
@@ -29,8 +30,6 @@ using std::chrono::milliseconds;
using namespace cppkafka;
static const string KAFKA_TOPIC = "cppkafka_test1";
static Configuration make_producer_config() {
Configuration config;
config.set("metadata.broker.list", KAFKA_TEST_INSTANCE);
@@ -65,7 +64,7 @@ TEST_CASE("consumption", "[consumer][compacted]") {
compacted_consumer.set_event_handler([&](const Event& event) {
events.push_back(event);
});
consumer.subscribe({ KAFKA_TOPIC });
consumer.subscribe({ KAFKA_TOPICS[0] });
consumer.poll();
consumer.poll();
consumer.poll();
@@ -82,13 +81,13 @@ TEST_CASE("consumption", "[consumer][compacted]") {
};
for (const auto& element_pair : elements) {
const ElementType& element = element_pair.second;
MessageBuilder builder(KAFKA_TOPIC);
MessageBuilder builder(KAFKA_TOPICS[0]);
builder.partition(element.partition).key(element_pair.first).payload(element.value);
producer.produce(builder);
}
// Now erase the first element
string deleted_key = "42";
producer.produce(MessageBuilder(KAFKA_TOPIC).partition(0).key(deleted_key));
producer.produce(MessageBuilder(KAFKA_TOPICS[0]).partition(0).key(deleted_key));
for (size_t i = 0; i < 10; ++i) {
compacted_consumer.process_event();

View File

@@ -29,8 +29,6 @@ using std::chrono::system_clock;
using namespace cppkafka;
const string KAFKA_TOPIC = "cppkafka_test1";
static Configuration make_producer_config() {
Configuration config;
config.set("metadata.broker.list", KAFKA_TEST_INSTANCE);
@@ -54,31 +52,32 @@ TEST_CASE("message consumption", "[consumer]") {
consumer.set_assignment_callback([&](const TopicPartitionList& topic_partitions) {
assignment = topic_partitions;
});
consumer.subscribe({ KAFKA_TOPIC });
ConsumerRunner runner(consumer, 1, 3);
consumer.subscribe({ KAFKA_TOPICS[0] });
ConsumerRunner runner(consumer, 1, KAFKA_NUM_PARTITIONS);
// Produce a message just so we stop the consumer
Producer producer(make_producer_config());
string payload = "Hello world!";
producer.produce(MessageBuilder(KAFKA_TOPIC).partition(partition).payload(payload));
producer.produce(MessageBuilder(KAFKA_TOPICS[0]).partition(partition).payload(payload));
runner.try_join();
// All 3 partitions should be ours
REQUIRE(assignment.size() == 3);
set<int> partitions = { 0, 1, 2 };
// All partitions should be ours
REQUIRE(assignment.size() == KAFKA_NUM_PARTITIONS);
set<int> partitions;
for (int i = 0; i < KAFKA_NUM_PARTITIONS; partitions.emplace(i++));
for (const auto& topic_partition : assignment) {
CHECK(topic_partition.get_topic() == KAFKA_TOPIC);
CHECK(topic_partition.get_topic() == KAFKA_TOPICS[0]);
CHECK(partitions.erase(topic_partition.get_partition()) == true);
}
REQUIRE(runner.get_messages().size() == 1);
CHECK(consumer.get_subscription() == vector<string>{ KAFKA_TOPIC });
CHECK(consumer.get_subscription() == vector<string>{ KAFKA_TOPICS[0] });
assignment = consumer.get_assignment();
CHECK(assignment.size() == 3);
CHECK(assignment.size() == KAFKA_NUM_PARTITIONS);
int64_t low;
int64_t high;
tie(low, high) = consumer.get_offsets({ KAFKA_TOPIC, partition });
tie(low, high) = consumer.get_offsets({ KAFKA_TOPICS[0], partition });
CHECK(high > low);
CHECK(runner.get_messages().back().get_offset() + 1 == high);
}
@@ -97,15 +96,15 @@ TEST_CASE("consumer rebalance", "[consumer]") {
consumer1.set_revocation_callback([&](const TopicPartitionList&) {
revocation_called = true;
});
consumer1.subscribe({ KAFKA_TOPIC });
ConsumerRunner runner1(consumer1, 1, 3);
consumer1.subscribe({ KAFKA_TOPICS[0] });
ConsumerRunner runner1(consumer1, 1, KAFKA_NUM_PARTITIONS);
// Create a second consumer and subscribe to the topic
Consumer consumer2(make_consumer_config());
consumer2.set_assignment_callback([&](const TopicPartitionList& topic_partitions) {
assignment2 = topic_partitions;
});
consumer2.subscribe({ KAFKA_TOPIC });
consumer2.subscribe({ KAFKA_TOPICS[0] });
ConsumerRunner runner2(consumer2, 1, 1);
CHECK(revocation_called == true);
@@ -113,19 +112,20 @@ TEST_CASE("consumer rebalance", "[consumer]") {
// Produce a message just so we stop the consumer
Producer producer(make_producer_config());
string payload = "Hello world!";
producer.produce(MessageBuilder(KAFKA_TOPIC).partition(partition).payload(payload));
producer.produce(MessageBuilder(KAFKA_TOPICS[0]).partition(partition).payload(payload));
runner1.try_join();
runner2.try_join();
// All 3 partitions should be assigned
CHECK(assignment1.size() + assignment2.size() == 3);
set<int> partitions = { 0, 1, 2 };
// All partitions should be assigned
CHECK(assignment1.size() + assignment2.size() == KAFKA_NUM_PARTITIONS);
set<int> partitions;
for (int i = 0; i < KAFKA_NUM_PARTITIONS; partitions.emplace(i++));
for (const auto& topic_partition : assignment1) {
CHECK(topic_partition.get_topic() == KAFKA_TOPIC);
CHECK(topic_partition.get_topic() == KAFKA_TOPICS[0]);
CHECK(partitions.erase(topic_partition.get_partition()) == true);
}
for (const auto& topic_partition : assignment2) {
CHECK(topic_partition.get_topic() == KAFKA_TOPIC);
CHECK(topic_partition.get_topic() == KAFKA_TOPICS[0]);
CHECK(partitions.erase(topic_partition.get_partition()) == true);
}
CHECK(runner1.get_messages().size() + runner2.get_messages().size() == 1);
@@ -143,18 +143,18 @@ TEST_CASE("consumer offset commit", "[consumer]") {
offset_commit_called = true;
CHECK(!!error == false);
REQUIRE(topic_partitions.size() == 1);
CHECK(topic_partitions[0].get_topic() == KAFKA_TOPIC);
CHECK(topic_partitions[0].get_topic() == KAFKA_TOPICS[0]);
CHECK(topic_partitions[0].get_partition() == 0);
CHECK(topic_partitions[0].get_offset() == message_offset + 1);
});
Consumer consumer(config);
consumer.assign({ { KAFKA_TOPIC, 0 } });
consumer.assign({ { KAFKA_TOPICS[0], 0 } });
ConsumerRunner runner(consumer, 1, 1);
// Produce a message just so we stop the consumer
Producer producer(make_producer_config());
string payload = "Hello world!";
producer.produce(MessageBuilder(KAFKA_TOPIC).partition(partition).payload(payload));
producer.produce(MessageBuilder(KAFKA_TOPICS[0]).partition(partition).payload(payload));
runner.try_join();
REQUIRE(runner.get_messages().size() == 1);
@@ -173,7 +173,7 @@ TEST_CASE("consumer throttle", "[consumer]") {
// Create a consumer and subscribe to the topic
Configuration config = make_consumer_config("offset_commit");
Consumer consumer(config);
consumer.assign({ { KAFKA_TOPIC, 0 } });
consumer.assign({ { KAFKA_TOPICS[0], 0 } });
{
ConsumerRunner runner(consumer, 0, 1);
@@ -183,7 +183,7 @@ TEST_CASE("consumer throttle", "[consumer]") {
// Produce a message just so we stop the consumer
BufferedProducer<string> producer(make_producer_config());
string payload = "Hello world!";
producer.produce(MessageBuilder(KAFKA_TOPIC).partition(partition).payload(payload));
producer.produce(MessageBuilder(KAFKA_TOPICS[0]).partition(partition).payload(payload));
producer.flush();
size_t callback_executed_count = 0;
@@ -213,7 +213,7 @@ TEST_CASE("consume batch", "[consumer]") {
// Create a consumer and subscribe to the topic
Configuration config = make_consumer_config("test");
Consumer consumer(config);
consumer.assign({ { KAFKA_TOPIC, 0 } });
consumer.assign({ { KAFKA_TOPICS[0], 0 } });
{
ConsumerRunner runner(consumer, 0, 1);
@@ -224,14 +224,14 @@ TEST_CASE("consume batch", "[consumer]") {
BufferedProducer<string> producer(make_producer_config());
string payload = "Hello world!";
// Produce it twice
producer.produce(MessageBuilder(KAFKA_TOPIC).partition(partition).payload(payload));
producer.produce(MessageBuilder(KAFKA_TOPIC).partition(partition).payload(payload));
producer.produce(MessageBuilder(KAFKA_TOPICS[0]).partition(partition).payload(payload));
producer.produce(MessageBuilder(KAFKA_TOPICS[0]).partition(partition).payload(payload));
producer.flush();
vector<Message> all_messages;
MessageList all_messages;
int i = 0;
while (i < 5 && all_messages.size() != 2) {
vector<Message> messages = consumer.poll_batch(2);
MessageList messages = consumer.poll_batch(2);
all_messages.insert(all_messages.end(), make_move_iterator(messages.begin()),
make_move_iterator(messages.end()));
++i;
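The bounded-retry accumulation above generalizes into a small helper; a sketch using only the Consumer::poll_batch API and the MessageList alias introduced in this diff (poll_at_least is a hypothetical name, not part of the library):

// Poll until `wanted` messages arrive or `max_polls` attempts elapse.
static MessageList poll_at_least(Consumer& consumer, size_t wanted, int max_polls) {
    MessageList all;
    for (int i = 0; i < max_polls && all.size() < wanted; ++i) {
        MessageList batch = consumer.poll_batch(wanted - all.size());
        all.insert(all.end(), make_move_iterator(batch.begin()),
                   make_move_iterator(batch.end()));
    }
    return all;
}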

View File

@@ -14,8 +14,6 @@ using std::string;
using namespace cppkafka;
static const string KAFKA_TOPIC = "cppkafka_test1";
Configuration make_config() {
Configuration config;
config.set("metadata.broker.list", KAFKA_TEST_INSTANCE);
@@ -45,6 +43,9 @@ uint16_t get_kafka_port() {
}
TEST_CASE("metadata", "[handle_base]") {
if (KAFKA_TOPICS.size() < 2) {
return; // skip: this test requires at least two topics
}
Producer producer({});
producer.add_brokers(KAFKA_TEST_INSTANCE);
Metadata metadata = producer.get_metadata();
@@ -59,7 +60,7 @@ TEST_CASE("metadata", "[handle_base]") {
}
SECTION("topics") {
unordered_set<string> topic_names = { "cppkafka_test1", "cppkafka_test2" };
unordered_set<string> topic_names = { KAFKA_TOPICS[0], KAFKA_TOPICS[1] };
size_t found_topics = 0;
const vector<TopicMetadata>& topics = metadata.get_topics();
@@ -68,8 +69,9 @@ TEST_CASE("metadata", "[handle_base]") {
for (const auto& topic : topics) {
if (topic_names.count(topic.get_name()) == 1) {
const vector<PartitionMetadata>& partitions = topic.get_partitions();
REQUIRE(partitions.size() == 3);
set<int32_t> expected_ids = { 0, 1, 2 };
REQUIRE(partitions.size() == KAFKA_NUM_PARTITIONS);
set<int32_t> expected_ids;
for (int i = 0; i < KAFKA_NUM_PARTITIONS; ++i) expected_ids.emplace(i);
for (const PartitionMetadata& partition : partitions) {
REQUIRE(expected_ids.erase(partition.get_id()) == 1);
for (int32_t replica : partition.get_replicas()) {
@@ -90,8 +92,8 @@ TEST_CASE("metadata", "[handle_base]") {
CHECK(metadata.get_topics_prefixed("cppkafka_").size() == topic_names.size());
// Now get the whole metadata only for this topic
Topic topic = producer.get_topic(KAFKA_TOPIC);
CHECK(producer.get_metadata(topic).get_name() == KAFKA_TOPIC);
Topic topic = producer.get_topic(KAFKA_TOPICS[0]);
CHECK(producer.get_metadata(topic).get_name() == KAFKA_TOPICS[0]);
}
}
@@ -106,7 +108,7 @@ TEST_CASE("consumer groups", "[handle_base]") {
// Build consumer
Consumer consumer(config);
consumer.subscribe({ KAFKA_TOPIC });
consumer.subscribe({ KAFKA_TOPICS[0] });
ConsumerRunner runner(consumer, 0, 3);
runner.try_join();
@@ -120,11 +122,8 @@ TEST_CASE("consumer groups", "[handle_base]") {
MemberAssignmentInformation assignment = member.get_member_assignment();
CHECK(assignment.get_version() == 0);
TopicPartitionList expected_topic_partitions = {
{ KAFKA_TOPIC, 0 },
{ KAFKA_TOPIC, 1 },
{ KAFKA_TOPIC, 2 }
};
TopicPartitionList expected_topic_partitions;
for (int i = 0; i < KAFKA_NUM_PARTITIONS; ++i) expected_topic_partitions.emplace_back(KAFKA_TOPICS[0], i);
TopicPartitionList topic_partitions = assignment.get_topic_partitions();
sort(topic_partitions.begin(), topic_partitions.end());
CHECK(topic_partitions == expected_topic_partitions);

View File

@@ -28,8 +28,6 @@ using std::ref;
using namespace cppkafka;
static const string KAFKA_TOPIC = "cppkafka_test1";
static Configuration make_producer_config() {
Configuration config = {
{ "metadata.broker.list", KAFKA_TEST_INSTANCE },
@@ -54,7 +52,7 @@ void producer_run(BufferedProducer<string>& producer,
int& exit_flag, condition_variable& clear,
int num_messages,
int partition) {
MessageBuilder builder(KAFKA_TOPIC);
MessageBuilder builder(KAFKA_TOPICS[0]);
string key("wassup?");
string payload("nothing much!");
@@ -93,7 +91,7 @@ TEST_CASE("simple production", "[producer]") {
// Create a consumer and assign this topic/partition
Consumer consumer(make_consumer_config());
consumer.assign({ TopicPartition(KAFKA_TOPIC, partition) });
consumer.assign({ TopicPartition(KAFKA_TOPICS[0], partition) });
ConsumerRunner runner(consumer, 1, 1);
Configuration config = make_producer_config();
@@ -101,7 +99,7 @@ TEST_CASE("simple production", "[producer]") {
// Now create a producer and produce a message
const string payload = "Hello world! 1";
Producer producer(config);
producer.produce(MessageBuilder(KAFKA_TOPIC).partition(partition).payload(payload));
producer.produce(MessageBuilder(KAFKA_TOPICS[0]).partition(partition).payload(payload));
runner.try_join();
const auto& messages = runner.get_messages();
@@ -109,13 +107,13 @@ TEST_CASE("simple production", "[producer]") {
const auto& message = messages[0];
CHECK(message.get_payload() == payload);
CHECK(!!message.get_key() == false);
CHECK(message.get_topic() == KAFKA_TOPIC);
CHECK(message.get_topic() == KAFKA_TOPICS[0]);
CHECK(message.get_partition() == partition);
CHECK(!!message.get_error() == false);
int64_t low;
int64_t high;
tie(low, high) = producer.query_offsets({ KAFKA_TOPIC, partition });
tie(low, high) = producer.query_offsets({ KAFKA_TOPICS[0], partition });
CHECK(high > low);
}
@@ -124,7 +122,7 @@ TEST_CASE("simple production", "[producer]") {
const string key = "such key";
const milliseconds timestamp{15};
Producer producer(config);
producer.produce(MessageBuilder(KAFKA_TOPIC).partition(partition)
producer.produce(MessageBuilder(KAFKA_TOPICS[0]).partition(partition)
.key(key)
.payload(payload)
.timestamp(timestamp));
@@ -135,7 +133,7 @@ TEST_CASE("simple production", "[producer]") {
const auto& message = messages[0];
CHECK(message.get_payload() == payload);
CHECK(message.get_key() == key);
CHECK(message.get_topic() == KAFKA_TOPIC);
CHECK(message.get_topic() == KAFKA_TOPICS[0]);
CHECK(message.get_partition() == partition);
CHECK(!!message.get_error() == false);
REQUIRE(!!message.get_timestamp() == true);
@@ -147,7 +145,7 @@ TEST_CASE("simple production", "[producer]") {
const string key = "replay key";
const milliseconds timestamp{15};
Producer producer(config);
producer.produce(MessageBuilder(KAFKA_TOPIC).partition(partition)
producer.produce(MessageBuilder(KAFKA_TOPICS[0]).partition(partition)
.key(key)
.payload(payload)
.timestamp(timestamp));
@@ -167,7 +165,7 @@ TEST_CASE("simple production", "[producer]") {
const auto& message = messages[0];
CHECK(message.get_payload() == payload);
CHECK(message.get_key() == key);
CHECK(message.get_topic() == KAFKA_TOPIC);
CHECK(message.get_topic() == KAFKA_TOPICS[0]);
CHECK(message.get_partition() == partition);
CHECK(!!message.get_error() == false);
REQUIRE(!!message.get_timestamp() == true);
@@ -188,14 +186,14 @@ TEST_CASE("simple production", "[producer]") {
topic_config.set_partitioner_callback([&](const Topic& topic, const Buffer& msg_key,
int32_t partition_count) {
CHECK(msg_key == key);
CHECK(partition_count == 3);
CHECK(topic.get_name() == KAFKA_TOPIC);
CHECK(partition_count == KAFKA_NUM_PARTITIONS);
CHECK(topic.get_name() == KAFKA_TOPICS[0]);
return 0;
});
config.set_default_topic_configuration(topic_config);
Producer producer(config);
producer.produce(MessageBuilder(KAFKA_TOPIC).key(key).payload(payload));
producer.produce(MessageBuilder(KAFKA_TOPICS[0]).key(key).payload(payload));
while (producer.get_out_queue_length() > 0) {
producer.poll();
}
@@ -206,7 +204,7 @@ TEST_CASE("simple production", "[producer]") {
const auto& message = messages[0];
CHECK(message.get_payload() == payload);
CHECK(message.get_key() == key);
CHECK(message.get_topic() == KAFKA_TOPIC);
CHECK(message.get_topic() == KAFKA_TOPICS[0]);
CHECK(message.get_partition() == partition);
CHECK(!!message.get_error() == false);
CHECK(delivery_report_called == true);
@@ -222,15 +220,15 @@ TEST_CASE("simple production", "[producer]") {
topic_config.set_partitioner_callback([&](const Topic& topic, const Buffer& msg_key,
int32_t partition_count) {
CHECK(msg_key == key);
CHECK(partition_count == 3);
CHECK(topic.get_name() == KAFKA_TOPIC);
CHECK(partition_count == KAFKA_NUM_PARTITIONS);
CHECK(topic.get_name() == KAFKA_TOPICS[0]);
callback_called = true;
return 0;
});
config.set_default_topic_configuration(topic_config);
Producer producer(config);
producer.produce(MessageBuilder(KAFKA_TOPIC).key(key).payload(payload));
producer.produce(MessageBuilder(KAFKA_TOPICS[0]).key(key).payload(payload));
producer.poll();
runner.try_join();
@@ -244,13 +242,12 @@ TEST_CASE("simple production", "[producer]") {
TEST_CASE("multiple messages", "[producer]") {
size_t message_count = 10;
int partitions = 3;
set<string> payloads;
// Create a consumer and subscribe to this topic
Consumer consumer(make_consumer_config());
consumer.subscribe({ KAFKA_TOPIC });
ConsumerRunner runner(consumer, message_count, partitions);
consumer.subscribe({ KAFKA_TOPICS[0] });
ConsumerRunner runner(consumer, message_count, KAFKA_NUM_PARTITIONS);
// Now create a producer and produce a message
Producer producer(make_producer_config());
@@ -258,19 +255,19 @@ TEST_CASE("multiple messages", "[producer]") {
for (size_t i = 0; i < message_count; ++i) {
const string payload = payload_base + to_string(i);
payloads.insert(payload);
producer.produce(MessageBuilder(KAFKA_TOPIC).payload(payload));
producer.produce(MessageBuilder(KAFKA_TOPICS[0]).payload(payload));
}
runner.try_join();
const auto& messages = runner.get_messages();
REQUIRE(messages.size() == message_count);
for (const auto& message : messages) {
CHECK(message.get_topic() == KAFKA_TOPIC);
CHECK(message.get_topic() == KAFKA_TOPICS[0]);
CHECK(payloads.erase(message.get_payload()) == 1);
CHECK(!!message.get_error() == false);
CHECK(!!message.get_key() == false);
CHECK(message.get_partition() >= 0);
CHECK(message.get_partition() < 3);
CHECK(message.get_partition() < KAFKA_NUM_PARTITIONS);
}
}
@@ -279,22 +276,22 @@ TEST_CASE("buffered producer", "[producer][buffered_producer]") {
// Create a consumer and assign this topic/partition
Consumer consumer(make_consumer_config());
consumer.assign({ TopicPartition(KAFKA_TOPIC, partition) });
consumer.assign({ TopicPartition(KAFKA_TOPICS[0], partition) });
ConsumerRunner runner(consumer, 3, 1);
// Now create a buffered producer and produce two messages
BufferedProducer<string> producer(make_producer_config());
const string payload = "Hello world! 2";
const string key = "such key";
producer.add_message(MessageBuilder(KAFKA_TOPIC).partition(partition)
producer.add_message(MessageBuilder(KAFKA_TOPICS[0]).partition(partition)
.key(key)
.payload(payload));
producer.add_message(producer.make_builder(KAFKA_TOPIC).partition(partition).payload(payload));
producer.add_message(producer.make_builder(KAFKA_TOPICS[0]).partition(partition).payload(payload));
producer.flush();
producer.produce(MessageBuilder(KAFKA_TOPIC).partition(partition).payload(payload));
producer.produce(MessageBuilder(KAFKA_TOPICS[0]).partition(partition).payload(payload));
producer.wait_for_acks();
// Add another one but then clear it
producer.add_message(producer.make_builder(KAFKA_TOPIC).partition(partition).payload(payload));
producer.add_message(producer.make_builder(KAFKA_TOPICS[0]).partition(partition).payload(payload));
producer.clear();
runner.try_join();
@@ -302,7 +299,7 @@ TEST_CASE("buffered producer", "[producer][buffered_producer]") {
REQUIRE(messages.size() == 3);
const auto& message = messages[0];
CHECK(message.get_key() == key);
CHECK(message.get_topic() == KAFKA_TOPIC);
CHECK(message.get_topic() == KAFKA_TOPICS[0]);
CHECK(message.get_partition() == partition);
CHECK(!!message.get_error() == false);
@@ -319,7 +316,7 @@ TEST_CASE("buffered producer with limited buffer", "[producer]") {
// Create a consumer and assign this topic/partition
Consumer consumer(make_consumer_config());
consumer.assign({ TopicPartition(KAFKA_TOPIC, partition) });
consumer.assign({ TopicPartition(KAFKA_TOPICS[0], partition) });
ConsumerRunner runner(consumer, 3, 1);
// Now create a buffered producer and produce two messages
@@ -332,7 +329,7 @@ TEST_CASE("buffered producer with limited buffer", "[producer]") {
// Limit the size of the internal buffer
producer.set_max_buffer_size(num_messages-1);
while (num_messages--) {
producer.add_message(MessageBuilder(KAFKA_TOPIC).partition(partition).key(key).payload(payload));
producer.add_message(MessageBuilder(KAFKA_TOPICS[0]).partition(partition).key(key).payload(payload));
}
REQUIRE(producer.get_buffer_size() == 1);
@@ -354,7 +351,7 @@ TEST_CASE("multi-threaded buffered producer", "[producer][buffered_producer]") {
// Create a consumer and assign this topic/partition
Consumer consumer(make_consumer_config());
consumer.assign({ TopicPartition(KAFKA_TOPIC, partition) });
consumer.assign({ TopicPartition(KAFKA_TOPICS[0], partition) });
ConsumerRunner runner(consumer, num_messages, 1);
BufferedProducer<string> producer(make_producer_config());

View File

@@ -0,0 +1,164 @@
#include <vector>
#include <thread>
#include <set>
#include <mutex>
#include <chrono>
#include <iterator>
#include <condition_variable>
#include <catch.hpp>
#include <memory>
#include <iostream>
#include "cppkafka/cppkafka.h"
#include "test_utils.h"
using std::vector;
using std::move;
using std::string;
using std::thread;
using std::set;
using std::mutex;
using std::tie;
using std::condition_variable;
using std::lock_guard;
using std::unique_lock;
using std::unique_ptr;
using std::make_move_iterator;
using std::chrono::seconds;
using std::chrono::milliseconds;
using std::chrono::system_clock;
using namespace cppkafka;
//==================================================================================
// Helper functions
//==================================================================================
static Configuration make_producer_config() {
Configuration config;
config.set("metadata.broker.list", KAFKA_TEST_INSTANCE);
return config;
}
static Configuration make_consumer_config(const string& group_id = "rr_consumer_test") {
Configuration config;
config.set("metadata.broker.list", KAFKA_TEST_INSTANCE);
config.set("enable.auto.commit", true);
config.set("enable.auto.offset.store", true );
config.set("auto.commit.interval.ms", 100);
config.set("group.id", group_id);
return config;
}
static vector<int> make_roundrobin_partition_vector(int total_messages) {
vector<int> partition_order;
// Produce total_messages + 1 entries; the round-robin test below passes
// total_messages + KAFKA_NUM_PARTITIONS so the expected order can be indexed
// from any starting partition offset.
for (int i = 0; i <= total_messages; ++i) {
partition_order.push_back(i % KAFKA_NUM_PARTITIONS);
}
return partition_order;
}
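To make the helper's contract concrete, here is a small self-checking sketch (illustrative only, not part of the suite):

#include <cassert>
// With KAFKA_NUM_PARTITIONS == 3, make_roundrobin_partition_vector(6) yields
// {0, 1, 2, 0, 1, 2, 0}: seven values, the extra trailing entry being what
// makes offset-based indexing safe.
static void roundrobin_order_example() {
    vector<int> order = make_roundrobin_partition_vector(6);
    assert(order.size() == 7);
    for (size_t i = 0; i < order.size(); ++i) {
        assert(order[i] == static_cast<int>(i % KAFKA_NUM_PARTITIONS));
    }
}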
//========================================================================
// TESTS
//========================================================================
TEST_CASE("serial consumer test", "[roundrobin consumer]") {
int messages_per_partition = 3;
int total_messages = KAFKA_NUM_PARTITIONS * messages_per_partition;
// Create a consumer and assign all partitions of the topic
Consumer consumer(make_consumer_config());
TopicPartitionList partitions;
for (int i = 0; i < KAFKA_NUM_PARTITIONS; ++i) partitions.emplace_back(KAFKA_TOPICS[0], i);
consumer.assign(partitions);
// Start the runner with the original consumer
ConsumerRunner runner(consumer, total_messages, KAFKA_NUM_PARTITIONS);
// Produce messages so we stop the consumer
Producer producer(make_producer_config());
string payload = "Serial";
// Push messages_per_partition messages into each partition
for (int i = 0; i < total_messages; ++i) {
producer.produce(MessageBuilder(KAFKA_TOPICS[0]).partition(i%KAFKA_NUM_PARTITIONS).payload(payload));
}
producer.flush();
runner.try_join();
// Check that we have all messages
REQUIRE(runner.get_messages().size() == total_messages);
// Messages should arrive in runs of <messages_per_partition> sharing the same partition id
int expected_partition = 0;
for (int i = 0; i < total_messages; ++i) {
if ((i % messages_per_partition) == 0) {
expected_partition = runner.get_messages()[i].get_partition();
}
REQUIRE(runner.get_messages()[i].get_partition() == expected_partition);
REQUIRE((string)runner.get_messages()[i].get_payload() == payload);
}
}
TEST_CASE("roundrobin consumer test", "[roundrobin consumer]") {
TopicPartitionList assignment;
int messages_per_partition = 3;
int total_messages = KAFKA_NUM_PARTITIONS * messages_per_partition;
// Create a consumer and subscribe to the topic
PollStrategyAdapter consumer(make_consumer_config());
consumer.subscribe({ KAFKA_TOPICS[0] });
consumer.add_polling_strategy(unique_ptr<PollInterface>(new RoundRobinPollStrategy(consumer)));
PollConsumerRunner runner(consumer, total_messages, KAFKA_NUM_PARTITIONS);
// Produce messages so we stop the consumer
Producer producer(make_producer_config());
string payload = "RoundRobin";
// Push messages_per_partition messages into each partition
for (int i = 0; i < total_messages; ++i) {
producer.produce(MessageBuilder(KAFKA_TOPICS[0]).partition(i%KAFKA_NUM_PARTITIONS).payload(payload));
}
producer.flush();
runner.try_join();
// Check that we have all messages
REQUIRE(runner.get_messages().size() == total_messages);
// Check that the partitions rotate in round-robin order, one message from each in turn
vector<int> partition_order = make_roundrobin_partition_vector(total_messages+KAFKA_NUM_PARTITIONS);
int partition_idx = 0;
for (int i = 0; i < total_messages; ++i) {
if (i == 0) {
// find first polled partition index
partition_idx = runner.get_messages()[i].get_partition();
}
REQUIRE(runner.get_messages()[i].get_partition() == partition_order[i+partition_idx]);
REQUIRE((string)runner.get_messages()[i].get_payload() == payload);
}
//============ resume original poll strategy =============//
// Validate that once the round-robin strategy is deleted, normal polling works as before
consumer.delete_polling_strategy();
ConsumerRunner serial_runner(consumer, total_messages, KAFKA_NUM_PARTITIONS);
payload = "SerialPolling";
// Push messages_per_partition messages into each partition
for (int i = 0; i < total_messages; ++i) {
producer.produce(MessageBuilder(KAFKA_TOPICS[0]).partition(i%KAFKA_NUM_PARTITIONS).payload(payload));
}
producer.flush();
serial_runner.try_join();
// Check that we have all messages
REQUIRE(serial_runner.get_messages().size() == total_messages);
for (int i = 0; i < total_messages; ++i) {
REQUIRE((string)serial_runner.get_messages()[i].get_payload() == payload);
}
}
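Outside the test harness, the minimal round-robin wiring looks roughly like this (a sketch under the same assumptions as the tests above; make_consumer_config is the helper defined in this file):

static void roundrobin_usage_sketch() {
    PollStrategyAdapter consumer(make_consumer_config());
    consumer.subscribe({ KAFKA_TOPICS[0] });
    consumer.add_polling_strategy(
        std::unique_ptr<PollInterface>(new RoundRobinPollStrategy(consumer)));
    for (int i = 0; i < 10; ++i) {
        // Each poll now services the assigned partitions' queues in turn instead
        // of draining whichever partition happens to have data first.
        Message msg = consumer.poll();
        if (msg && !msg.get_error()) {
            // process msg.get_payload() ...
        }
    }
    consumer.delete_polling_strategy(); // later polls use plain Consumer::poll()
}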

View File

@@ -15,6 +15,9 @@ using Catch::TestCaseStats;
using Catch::Totals;
using Catch::Session;
extern const std::vector<std::string> KAFKA_TOPICS = {"cppkafka_test1", "cppkafka_test2"};
extern const int KAFKA_NUM_PARTITIONS = 3;
namespace cppkafka {
class InstantTestReporter : public ConsoleReporter {

View File

@@ -1,91 +0,0 @@
#include <mutex>
#include <chrono>
#include <condition_variable>
#include "test_utils.h"
#include "cppkafka/utils/consumer_dispatcher.h"
using std::vector;
using std::move;
using std::thread;
using std::mutex;
using std::lock_guard;
using std::unique_lock;
using std::condition_variable;
using std::chrono::system_clock;
using std::chrono::milliseconds;
using std::chrono::seconds;
using cppkafka::Consumer;
using cppkafka::ConsumerDispatcher;
using cppkafka::Message;
using cppkafka::TopicPartition;
ConsumerRunner::ConsumerRunner(Consumer& consumer, size_t expected, size_t partitions)
: consumer_(consumer) {
bool booted = false;
mutex mtx;
condition_variable cond;
thread_ = thread([&, expected, partitions]() {
consumer_.set_timeout(milliseconds(500));
size_t number_eofs = 0;
auto start = system_clock::now();
ConsumerDispatcher dispatcher(consumer_);
dispatcher.run(
// Message callback
[&](Message msg) {
if (number_eofs == partitions) {
messages_.push_back(move(msg));
}
},
// EOF callback
[&](ConsumerDispatcher::EndOfFile, const TopicPartition& topic_partition) {
if (number_eofs != partitions) {
number_eofs++;
if (number_eofs == partitions) {
lock_guard<mutex> _(mtx);
booted = true;
cond.notify_one();
}
}
},
// Every time there's any event callback
[&](ConsumerDispatcher::Event) {
if (expected > 0 && messages_.size() == expected) {
dispatcher.stop();
}
if (expected == 0 && number_eofs >= partitions) {
dispatcher.stop();
}
if (system_clock::now() - start >= seconds(20)) {
dispatcher.stop();
}
}
);
if (number_eofs < partitions) {
lock_guard<mutex> _(mtx);
booted = true;
cond.notify_one();
}
});
unique_lock<mutex> lock(mtx);
while (!booted) {
cond.wait(lock);
}
}
ConsumerRunner::~ConsumerRunner() {
try_join();
}
const vector<Message>& ConsumerRunner::get_messages() const {
return messages_;
}
void ConsumerRunner::try_join() {
if (thread_.joinable()) {
thread_.join();
}
}

View File

@@ -4,21 +4,62 @@
#include <thread>
#include <vector>
#include "cppkafka/consumer.h"
#include "cppkafka/utils/roundrobin_poll_strategy.h"
#include "cppkafka/utils/consumer_dispatcher.h"
class ConsumerRunner {
extern const std::vector<std::string> KAFKA_TOPICS;
extern const int KAFKA_NUM_PARTITIONS;
using namespace cppkafka;
//==================================================================================
// BasicConsumerRunner
//==================================================================================
template <typename ConsumerType>
class BasicConsumerRunner {
public:
ConsumerRunner(cppkafka::Consumer& consumer, size_t expected, size_t partitions);
ConsumerRunner(const ConsumerRunner&) = delete;
ConsumerRunner& operator=(const ConsumerRunner&) = delete;
~ConsumerRunner();
BasicConsumerRunner(ConsumerType& consumer,
size_t expected,
size_t partitions);
BasicConsumerRunner(const BasicConsumerRunner&) = delete;
BasicConsumerRunner& operator=(const BasicConsumerRunner&) = delete;
~BasicConsumerRunner();
const std::vector<cppkafka::Message>& get_messages() const;
void try_join();
private:
cppkafka::Consumer& consumer_;
ConsumerType& consumer_;
std::thread thread_;
std::vector<cppkafka::Message> messages_;
};
//==================================================================================
// PollStrategyAdapter
//==================================================================================
/**
* \brief Consumer adapter that delegates polling to an interchangeable
* PollInterface strategy and falls back to the plain Consumer
* implementation when no strategy is set, so that utility classes
* such as BasicConsumerDispatcher work with either polling mode.
*/
class PollStrategyAdapter : public Consumer {
public:
PollStrategyAdapter(Configuration config);
void add_polling_strategy(std::unique_ptr<PollInterface> poll_strategy);
void delete_polling_strategy();
Message poll();
Message poll(std::chrono::milliseconds timeout);
MessageList poll_batch(size_t max_batch_size);
MessageList poll_batch(size_t max_batch_size,
std::chrono::milliseconds timeout);
void set_timeout(std::chrono::milliseconds timeout);
std::chrono::milliseconds get_timeout();
private:
std::unique_ptr<PollInterface> strategy_;
};
using PollConsumerRunner = BasicConsumerRunner<PollStrategyAdapter>;
using ConsumerRunner = BasicConsumerRunner<Consumer>;
#include "test_utils_impl.h"
#endif // CPPKAFKA_TEST_UTILS_H
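Both runner aliases are driven the same way; a hypothetical test body (the config helper and the producing side are assumed from the test files above):

TEST_CASE("runner aliases sketch", "[example]") {
    PollStrategyAdapter consumer(make_consumer_config());
    consumer.subscribe({ KAFKA_TOPICS[0] });
    consumer.add_polling_strategy(
        std::unique_ptr<PollInterface>(new RoundRobinPollStrategy(consumer)));
    // The runner blocks until one EOF per partition is seen, then counts messages.
    PollConsumerRunner runner(consumer, 3, KAFKA_NUM_PARTITIONS);
    // ... produce 3 messages to KAFKA_TOPICS[0] here ...
    runner.try_join();
    CHECK(runner.get_messages().size() == 3);
}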

tests/test_utils_impl.h (new file, 172 lines)
View File

@@ -0,0 +1,172 @@
#include <mutex>
#include <chrono>
#include <condition_variable>
#include "test_utils.h"
#include "cppkafka/utils/consumer_dispatcher.h"
using std::vector;
using std::move;
using std::thread;
using std::mutex;
using std::lock_guard;
using std::unique_lock;
using std::condition_variable;
using std::chrono::system_clock;
using std::chrono::milliseconds;
using std::chrono::seconds;
using cppkafka::Consumer;
using cppkafka::BasicConsumerDispatcher;
using cppkafka::Message;
using cppkafka::MessageList;
using cppkafka::TopicPartition;
//==================================================================================
// BasicConsumerRunner
//==================================================================================
template <typename ConsumerType>
BasicConsumerRunner<ConsumerType>::BasicConsumerRunner(ConsumerType& consumer,
size_t expected,
size_t partitions)
: consumer_(consumer) {
bool booted = false;
mutex mtx;
condition_variable cond;
thread_ = thread([&, expected, partitions]() {
consumer_.set_timeout(milliseconds(500));
size_t number_eofs = 0;
auto start = system_clock::now();
BasicConsumerDispatcher<ConsumerType> dispatcher(consumer_);
dispatcher.run(
// Message callback
[&](Message msg) {
if (number_eofs == partitions) {
messages_.push_back(move(msg));
}
},
// EOF callback
[&](typename BasicConsumerDispatcher<ConsumerType>::EndOfFile, const TopicPartition& topic_partition) {
if (number_eofs != partitions) {
number_eofs++;
if (number_eofs == partitions) {
lock_guard<mutex> _(mtx);
booted = true;
cond.notify_one();
}
}
},
// Every time there's any event callback
[&](typename BasicConsumerDispatcher<ConsumerType>::Event) {
if (expected > 0 && messages_.size() == expected) {
dispatcher.stop();
}
if (expected == 0 && number_eofs >= partitions) {
dispatcher.stop();
}
if (system_clock::now() - start >= seconds(20)) {
dispatcher.stop();
}
}
);
// dispatcher has stopped
if (number_eofs < partitions) {
lock_guard<mutex> _(mtx);
booted = true;
cond.notify_one();
}
});
unique_lock<mutex> lock(mtx);
while (!booted) {
cond.wait(lock);
}
}
template <typename ConsumerType>
BasicConsumerRunner<ConsumerType>::~BasicConsumerRunner() {
try_join();
}
template <typename ConsumerType>
const MessageList& BasicConsumerRunner<ConsumerType>::get_messages() const {
return messages_;
}
template <typename ConsumerType>
void BasicConsumerRunner<ConsumerType>::try_join() {
if (thread_.joinable()) {
thread_.join();
}
}
//==================================================================================
// PollStrategyAdapter
//==================================================================================
inline
PollStrategyAdapter::PollStrategyAdapter(Configuration config)
: Consumer(config) {
}
inline
void PollStrategyAdapter::add_polling_strategy(std::unique_ptr<PollInterface> poll_strategy) {
strategy_ = std::move(poll_strategy);
}
inline
void PollStrategyAdapter::delete_polling_strategy() {
strategy_.reset();
}
inline
Message PollStrategyAdapter::poll() {
if (strategy_) {
return strategy_->poll();
}
return Consumer::poll();
}
inline
Message PollStrategyAdapter::poll(milliseconds timeout) {
if (strategy_) {
return strategy_->poll(timeout);
}
return Consumer::poll(timeout);
}
inline
MessageList PollStrategyAdapter::poll_batch(size_t max_batch_size) {
if (strategy_) {
return strategy_->poll_batch(max_batch_size);
}
return Consumer::poll_batch(max_batch_size);
}
inline
MessageList PollStrategyAdapter::poll_batch(size_t max_batch_size,
milliseconds timeout) {
if (strategy_) {
return strategy_->poll_batch(max_batch_size, timeout);
}
return Consumer::poll_batch(max_batch_size, timeout);
}
inline
void PollStrategyAdapter::set_timeout(milliseconds timeout) {
if (strategy_) {
strategy_->set_timeout(timeout);
}
else {
Consumer::set_timeout(timeout);
}
}
inline
milliseconds PollStrategyAdapter::get_timeout() {
if (strategy_) {
return strategy_->get_timeout();
}
return Consumer::get_timeout();
}
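A note on the design: every forwarding method checks strategy_ on entry, so a polling strategy can be attached and detached between calls without reconstructing the consumer, which is exactly what the round-robin test exercises. A minimal sketch (make_consumer_config is a hypothetical helper like the ones in the test files):

static void strategy_swap_sketch() {
    PollStrategyAdapter consumer(make_consumer_config());
    consumer.add_polling_strategy(
        std::unique_ptr<PollInterface>(new RoundRobinPollStrategy(consumer)));
    Message a = consumer.poll();  // delegates to RoundRobinPollStrategy::poll()
    consumer.delete_polling_strategy();
    Message b = consumer.poll();  // falls back to Consumer::poll()
}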