initial polling version

This commit is contained in:
accelerated
2018-04-27 00:20:47 -04:00
parent 429ec92369
commit 15be627f8e
15 changed files with 856 additions and 68 deletions

View File

@@ -35,7 +35,7 @@
#include <chrono>
#include <functional>
#include "kafka_handle_base.h"
#include "message.h"
#include "queue.h"
#include "macros.h"
#include "error.h"
@@ -54,7 +54,7 @@ class TopicConfiguration;
* Semi-simple code showing how to use this class
*
* \code
* // Create a configuration and set the group.id and broker list fields
* // Create a configuration and set the group.id and broker list fields
* Configuration config = {
* { "metadata.broker.list", "127.0.0.1:9092" },
* { "group.id", "foo" }
@@ -74,13 +74,13 @@ class TopicConfiguration;
* consumer.set_revocation_callback([&](const TopicPartitionList& topic_partitions) {
* cout << topic_partitions.size() << " partitions revoked!" << endl;
* });
*
* // Subscribe
*
* // Subscribe
* consumer.subscribe({ "my_topic" });
* while (true) {
* // Poll. This will optionally return a message. It's necessary to check if it's a valid
* // one before using it
* Message msg = consumer.poll();
* Message msg = consumer.poll();
* if (msg) {
* if (!msg.get_error()) {
* // It's an actual message. Get the payload and print it to stdout
@@ -103,12 +103,12 @@ public:
/**
* \brief Creates an instance of a consumer.
*
* Note that the configuration *must contain* the group.id attribute set or this
*
* Note that the configuration *must contain* the group.id attribute set or this
* will throw.
*
* \param config The configuration to be used
*/
*/
Consumer(Configuration config);
Consumer(const Consumer&) = delete;
Consumer(Consumer&&) = delete;
@@ -124,7 +124,7 @@ public:
/**
* \brief Sets the topic/partition assignment callback
*
*
* The Consumer class will use rd_kafka_conf_set_rebalance_cb and will handle the
* rebalance, converting from rdkafka topic partition list handles into TopicPartitionList
* and executing the assignment/revocation/rebalance_error callbacks.
@@ -138,7 +138,7 @@ public:
/**
* \brief Sets the topic/partition revocation callback
*
*
* The Consumer class will use rd_kafka_conf_set_rebalance_cb and will handle the
* rebalance, converting from rdkafka topic partition list handles into TopicPartitionList
* and executing the assignment/revocation/rebalance_error callbacks.
@@ -153,7 +153,7 @@ public:
/**
* \brief Sets the rebalance error callback
*
*
* The Consumer class will use rd_kafka_conf_set_rebalance_cb and will handle the
* rebalance, converting from rdkafka topic partition list handles into TopicPartitionList
* and executing the assignment/revocation/rebalance_error callbacks.
@@ -188,9 +188,9 @@ public:
/**
* \brief Unassigns the current topic/partition assignment
*
* This translates into a call to rd_kafka_assign using a null as the topic partition list
* This translates into a call to rd_kafka_assign using a null as the topic partition list
* parameter
*/
*/
void unassign();
/**
@@ -262,7 +262,9 @@ public:
*
* This translates into a call to rd_kafka_get_watermark_offsets
*
* \param topic_partition The topic/partition to get the offsets from
* \param topic_partition The topic/partition to get the offsets from
*
* \return A pair of offsets {low, high}
*/
OffsetTuple get_offsets(const TopicPartition& topic_partition) const;
@@ -272,6 +274,8 @@ public:
* This translates into a call to rd_kafka_committed
*
* \param topic_partitions The topic/partition list to be queried
*
* \return The topic partition list
*/
TopicPartitionList get_offsets_committed(const TopicPartitionList& topic_partitions) const;
@@ -281,6 +285,8 @@ public:
* This translates into a call to rd_kafka_position
*
* \param topic_partitions The topic/partition list to be queried
*
* \return The topic partition list
*/
TopicPartitionList get_offsets_position(const TopicPartitionList& topic_partitions) const;
@@ -295,6 +301,8 @@ public:
* \brief Gets the current topic/partition list assignment
*
* This translates to a call to rd_kafka_assignment
*
* \return The topic partition list
*/
TopicPartitionList get_assignment() const;
@@ -302,21 +310,29 @@ public:
* \brief Gets the group member id
*
* This translates to a call to rd_kafka_memberid
*
* \return The id
*/
std::string get_member_id() const;
/**
* Gets the partition assignment callback.
* \brief Gets the partition assignment callback.
*
* \return The callback reference
*/
const AssignmentCallback& get_assignment_callback() const;
/**
* Gets the partition revocation callback.
* \brief Gets the partition revocation callback.
*
* \return The callback reference
*/
const RevocationCallback& get_revocation_callback() const;
/**
* Gets the rebalance error callback.
* \brief Gets the rebalance error callback.
*
* \return The callback reference
*/
const RebalanceErrorCallback& get_rebalance_error_callback() const;
@@ -326,16 +342,16 @@ public:
* This will call rd_kafka_consumer_poll.
*
* Note that you need to call poll periodically as a keep alive mechanism, otherwise the broker
* will think this consumer is down and will trigger a rebalance (if using dynamic
* will think this consumer is down and will trigger a rebalance (if using dynamic
* subscription).
*
* The timeout used on this call will be the one configured via Consumer::set_timeout.
*
* The returned message *might* be empty. If's necessary to check that it's a valid one before
* using it:
*
* \return A message. The returned message *might* be empty. It's necessary to check
* that it's a valid one before using it:
*
* \code
* Message msg = consumer.poll();
* Message msg = consumer.poll();
* if (msg) {
* // It's a valid message!
* }
@@ -350,6 +366,8 @@ public:
* instead of the one configured on this Consumer.
*
* \param timeout The timeout to be used on this call
*
* \return A message
*/
Message poll(std::chrono::milliseconds timeout);
@@ -359,8 +377,10 @@ public:
* This can return one or more messages
*
* \param max_batch_size The maximum amount of messages expected
*
* \return A list of messages
*/
std::vector<Message> poll_batch(size_t max_batch_size);
MessageList poll_batch(size_t max_batch_size);
/**
* \brief Polls for a batch of messages
@@ -369,8 +389,42 @@ public:
*
* \param max_batch_size The maximum amount of messages expected
* \param timeout The timeout for this operation
*
* \return A list of messages
*/
std::vector<Message> poll_batch(size_t max_batch_size, std::chrono::milliseconds timeout);
MessageList poll_batch(size_t max_batch_size, std::chrono::milliseconds timeout);
/**
* \brief Get the global event queue servicing this consumer corresponding to
* rd_kafka_queue_get_main and which is polled via rd_kafka_poll
*
* \return A Queue object
*
* \remark Note that this call will disable forwarding to the consumer_queue.
* To restore forwarding (if desired) call Queue::forward_to_queue(consumer_queue)
*/
Queue get_main_queue() const;
/**
* \brief Get the consumer group queue servicing this consumer, corresponding to
* rd_kafka_queue_get_consumer and which is polled via rd_kafka_consumer_poll
*
* \return A Queue object
*/
Queue get_consumer_queue() const;
/**
* \brief Get the queue belonging to this partition. If the consumer is not assigned to this
* partition, an empty queue will be returned
*
* \param partition The partition object
*
* \return A Queue object
*
* \remark Note that this call will disable forwarding to the consumer_queue.
* To restore forwarding (if desired) call Queue::forward_to_queue(consumer_queue)
*/
Queue get_partition_queue(const TopicPartition& partition) const;
private:
static void rebalance_proxy(rd_kafka_t *handle, rd_kafka_resp_err_t error,
rd_kafka_topic_partition_list_t *partitions, void *opaque);

View File

@@ -46,6 +46,7 @@
#include <cppkafka/message_builder.h>
#include <cppkafka/metadata.h>
#include <cppkafka/producer.h>
#include <cppkafka/queue.h>
#include <cppkafka/topic.h>
#include <cppkafka/topic_configuration.h>
#include <cppkafka/topic_partition.h>
@@ -55,5 +56,6 @@
#include <cppkafka/utils/buffered_producer.h>
#include <cppkafka/utils/compacted_topic_processor.h>
#include <cppkafka/utils/consumer_dispatcher.h>
#include <cppkafka/utils/roundrobin_poll_adapter.h>
#endif

View File

@@ -122,6 +122,18 @@ private:
Error error_;
};
/**
* \brief Queue exception for rd_kafka_queue_t errors
*
* Wraps the Error that describes the failed queue operation.
*/
class CPPKAFKA_API QueueException : public Exception {
public:
/**
* \brief Constructs the exception from an error
*
* \param error The error to be stored in this exception
*/
QueueException(Error error);
/**
* \brief Gets the error stored in this exception
*
* \return The stored error
*/
Error get_error() const;
private:
// The error this exception was constructed with
Error error_;
};
} // cppkafka
#endif // CPPKAFKA_EXCEPTIONS_H

View File

@@ -136,6 +136,8 @@ private:
std::vector<GroupMemberInformation> members_;
};
using GroupInformationList = std::vector<GroupInformation>;
} // cppkafka
#endif // CPPKAFKA_GROUP_INFORMATION_H

View File

@@ -39,6 +39,7 @@
#include <tuple>
#include <chrono>
#include <librdkafka/rdkafka.h>
#include "group_information.h"
#include "topic_partition.h"
#include "topic_partition_list.h"
#include "topic_configuration.h"
@@ -108,11 +109,15 @@ public:
* This translates into a call to rd_kafka_query_watermark_offsets
*
* \param topic_partition The topic/partition to be queried
*
* \return A pair of watermark offsets {low, high}
*/
OffsetTuple query_offsets(const TopicPartition& topic_partition) const;
/**
* Gets the rdkafka handle
* \brief Gets the rdkafka handle
*
* \return The rdkafka handle
*/
rd_kafka_t* get_handle() const;
@@ -123,7 +128,9 @@ public:
* configuration provided in the Configuration object for this consumer/producer handle,
* if any.
*
* \param name The name of the topic to be created
* \param name The name of the topic to be created
*
* \return A topic
*/
Topic get_topic(const std::string& name);
@@ -134,15 +141,19 @@ public:
*
* \param name The name of the topic to be created
* \param config The configuration to be used for the new topic
*
* \return A topic
*/
Topic get_topic(const std::string& name, TopicConfiguration config);
/**
* \brief Gets metadata for brokers, topics, partitions, etc
*
* This translates into a call to rd_kafka_metadata
*
* \param all_topics Whether to fetch metadata about all topics or only locally known ones
*
* This translates into a call to rd_kafka_metadata
* \return The metadata
*/
Metadata get_metadata(bool all_topics = true) const;
@@ -153,20 +164,26 @@ public:
* This translates into a call to rd_kafka_metadata
*
* \param topic The topic to fetch information for
*
* \return The topic metadata
*/
TopicMetadata get_metadata(const Topic& topic) const;
/**
* Gets the consumer group information
* \brief Gets the consumer group information
*
* \param name The name of the consumer group to look up
*
* \return The group information
*/
GroupInformation get_consumer_group(const std::string& name);
/**
* Gets all consumer groups
* \brief Gets all consumer groups
*
* \return A list of consumer groups
*/
std::vector<GroupInformation> get_consumer_groups();
GroupInformationList get_consumer_groups();
/**
* \brief Gets topic/partition offsets based on timestamps
@@ -174,23 +191,31 @@ public:
* This translates into a call to rd_kafka_offsets_for_times
*
* \param queries A map from topic/partition to the timestamp to be used
*
* \return A topic partition list
*/
TopicPartitionList get_offsets_for_times(const TopicPartitionsTimestampsMap& queries) const;
/**
* Returns the kafka handle name
* \brief Get the kafka handle name
*
* \return The handle name
*/
std::string get_name() const;
/**
* Gets the configured timeout.
* \brief Gets the configured timeout.
*
* \return The configured timeout
*
* \sa KafkaHandleBase::set_timeout
*/
std::chrono::milliseconds get_timeout() const;
/**
* Gets the handle's configuration
* \brief Gets the handle's configuration
*
* \return A reference to the configuration object
*/
const Configuration& get_configuration() const;
@@ -198,6 +223,8 @@ public:
* \brief Gets the length of the out queue
*
* This calls rd_kafka_outq_len
*
* \return The length of the queue
*/
int get_out_queue_length() const;
@@ -221,7 +248,7 @@ private:
Topic get_topic(const std::string& name, rd_kafka_topic_conf_t* conf);
Metadata get_metadata(bool all_topics, rd_kafka_topic_t* topic_ptr) const;
std::vector<GroupInformation> fetch_consumer_groups(const char* name);
GroupInformationList fetch_consumer_groups(const char* name);
void save_topic_config(const std::string& topic_name, TopicConfiguration config);
std::chrono::milliseconds timeout_ms_;

View File

@@ -177,6 +177,8 @@ private:
Buffer key_;
};
using MessageList = std::vector<Message>;
/**
* Represents a message's timestamp
*/

183
include/cppkafka/queue.h Normal file
View File

@@ -0,0 +1,183 @@
/*
* Copyright (c) 2017, Matias Fontanini
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include <vector>
#include <memory>
#include <boost/optional.hpp>
#include <librdkafka/rdkafka.h>
#include "macros.h"
#include "message.h"
#ifndef CPPKAFKA_QUEUE_H
#define CPPKAFKA_QUEUE_H
namespace cppkafka {
/**
* \brief Represents a rdkafka queue
*
* This is a simple wrapper over a rd_kafka_queue_t*
*/
class CPPKAFKA_API Queue {
public:
/**
* \brief Creates a Queue object that doesn't take ownership of the handle
*
* \param handle The handle to be used
*/
static Queue make_non_owning(rd_kafka_queue_t* handle);
/**
* \brief Constructs an empty queue
*
* Note that using any methods except Queue::get_handle on an empty queue is undefined
* behavior
*/
Queue();
/**
* \brief Constructs a queue using a handle
*
* This will take ownership of the handle
*
* \param handle The handle to be used
*/
Queue(rd_kafka_queue_t* handle);
/**
* Returns the rdkakfa handle
*/
rd_kafka_queue_t* get_handle() const;
/**
* \brief Returns the length of the queue
*
* This translates to a call to rd_kafka_queue_length
*/
size_t get_length() const;
/**
* \brief Forward to another queue
*
* This translates to a call to rd_kafka_queue_forward
*/
void forward_to_queue(const Queue& forward_queue) const;
/**
* \brief Disable forwarding to another queue
*
* This translates to a call to rd_kafka_queue_forward(NULL)
*/
void disable_queue_forwarding() const;
/**
* \brief Sets the timeout for consume operations
*
* This timeout is applied when calling consume()
*
* \param timeout The timeout to be set
*/
void set_consume_timeout(std::chrono::milliseconds timeout);
/**
* Gets the configured timeout.
*
* \sa Queue::set_timeout
*/
std::chrono::milliseconds get_consume_timeout() const;
/**
* \brief Consume a message from this queue
*
* This translates to a call to rd_kafka_consume_queue using the configured timeout for this object
*
* \return A message
*/
Message consume() const;
/**
* \brief Consume a message from this queue
*
* Same as consume() but the specified timeout will be used instead of the configured one
*
* \param timeout The timeout to be used on this call
*
* \return A message
*/
Message consume(std::chrono::milliseconds timeout) const;
/**
* \brief Consumes a batch of messages from this queue
*
* This translates to a call to rd_kafka_consume_batch_queue using the configured timeout for this object
*
* \param max_batch_size The max number of messages to consume if available
*
* \return A list of messages. Could be empty if there's nothing to consume
*/
MessageList consume_batch(size_t max_batch_size) const;
/**
* \brief Consumes a batch of messages from this queue
*
* Same as Queue::consume_batch(size_t) but the specified timeout will be used instead of the configured one
*
* \param max_batch_size The max number of messages to consume if available
*
* \param timeout The timeout to be used on this call
*
* \return A list of messages. Could be empty if there's nothing to consume
*/
MessageList consume_batch(size_t max_batch_size, std::chrono::milliseconds timeout) const;
/**
* Indicates whether this queue is valid (not null)
*/
explicit operator bool() const {
return handle_ != nullptr;
}
private:
static const std::chrono::milliseconds DEFAULT_TIMEOUT;
using HandlePtr = std::unique_ptr<rd_kafka_queue_t, decltype(&rd_kafka_queue_destroy)>;
struct NonOwningTag { };
Queue(rd_kafka_queue_t* handle, NonOwningTag);
// Members
HandlePtr handle_;
std::chrono::milliseconds timeout_ms_;
};
using QueueList = std::vector<Queue>;
} // cppkafka
#endif //CPPKAFKA_QUEUE_H

View File

@@ -0,0 +1,213 @@
/*
* Copyright (c) 2017, Matias Fontanini
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#ifndef CPPKAFKA_ROUNDROBIN_POLL_ADAPTER_H
#define CPPKAFKA_ROUNDROBIN_POLL_ADAPTER_H
#include <list>
#include "../exceptions.h"
#include "../consumer.h"
#include "../queue.h"
namespace cppkafka {
/**
 * \brief This adapter changes the default polling strategy of the Consumer into a fair round-robin
 *        polling mechanism.
 *
 * The default librdkafka (and cppkafka) poll() and poll_batch() behavior is to consume batches of
 * messages from each partition in turn. For performance reasons, librdkafka pre-fetches batches
 * of messages from the kafka broker (one batch from each partition), and stores them locally in
 * partition queues. Since all the internal partition queues are forwarded by default onto the
 * group consumer queue (one per consumer), these batches end up being queued in sequence of arrival.
 * For instance, a topic with 4 partitions (each containing N messages) will end up being queued as
 * N1|N2|N3|N4 in the consumer queue. This means that for the Consumer to process messages from the
 * 4th partition, it needs to consume 3xN messages. The larger the number of partitions, the more
 * starvation occurs. While this behavior is acceptable for some applications, for real-time
 * applications sensitive to timing, or those where messages must be processed more or less in the
 * same order as they're being produced, the default librdkafka behavior is unacceptable.
 * Fortunately, librdkafka exposes direct access to its partition queues which means that various
 * polling strategies can be implemented to suit needs.
 * This adapter allows fair round-robin polling of all assigned partitions, one message at a time
 * (or one batch at a time if poll_batch() is used). Note that poll_batch() has nothing to do with
 * the internal batching mechanism of librdkafka.
 *
 * Example code on how to use this:
 *
 * \code
 * // Create a consumer
 * Consumer consumer(...);
 *
 * // Optionally set the callbacks. This must be done *BEFORE* creating the adapter
 * consumer.set_assignment_callback(...);
 * consumer.set_revocation_callback(...);
 * consumer.set_rebalance_error_callback(...);
 *
 * // Create the adapter and use it for polling
 * RoundRobinPollAdapter adapter(consumer);
 *
 * // Subscribe *AFTER* the adapter has been created
 * consumer.subscribe({ "my_topic" });
 *
 * while (true) {
 *     // Poll each partition in turn
 *     Message msg = adapter.poll();
 *     if (msg) {
 *         // process valid message
 *     }
 * }
 * \endcode
 *
 * \warning Calling directly poll() or poll_batch() on the Consumer object while using this adapter
 * will lead to undesired results since the RoundRobinPollAdapter modifies the internal queuing
 * mechanism of the Consumer instance it wraps.
 */
class RoundRobinPollAdapter
{
public:
    /**
     * \brief Creates an adapter for the given consumer
     *
     * The adapter keeps a reference to the consumer, so the consumer must outlive it.
     *
     * \param consumer The consumer to be polled through this adapter
     */
    RoundRobinPollAdapter(Consumer& consumer);

    ~RoundRobinPollAdapter();

    /**
     * \brief Sets the timeout for polling functions
     *
     * This calls Consumer::set_timeout
     *
     * \param timeout The timeout to be set
     */
    void set_timeout(std::chrono::milliseconds timeout);

    /**
     * \brief Gets the timeout for polling functions
     *
     * This calls Consumer::get_timeout
     *
     * \return The timeout
     */
    std::chrono::milliseconds get_timeout();

    /**
     * \brief Polls all assigned partitions for new messages in round-robin fashion
     *
     * Each call to poll() will result in another partition being polled. Aside from
     * the partition, this function will also poll the main queue for events. If an
     * event is found, it is immediately returned. As such the main queue has higher
     * priority than the partition queues. Because of this, you
     * need to call poll periodically as a keep alive mechanism, otherwise the broker
     * will think this consumer is down and will trigger a rebalance (if using dynamic
     * subscription).
     * The timeout used on this call will be the one configured via
     * RoundRobinPollAdapter::set_timeout.
     *
     * \return A message. The returned message *might* be empty. It's necessary to check
     * that it's a valid one before using it (see example above).
     */
    Message poll();

    /**
     * \brief Polls for new messages
     *
     * Same as the other overload of RoundRobinPollAdapter::poll but the provided
     * timeout will be used instead of the one configured on this Consumer.
     *
     * \param timeout The timeout to be used on this call
     *
     * \return A message. The returned message *might* be empty
     */
    Message poll(std::chrono::milliseconds timeout);

    /**
     * \brief Polls all assigned partitions for a batch of new messages in round-robin fashion
     *
     * Each call to poll_batch() will result in another partition being polled. Aside from
     * the partition, this function will also poll the main queue for events. If a batch of
     * events is found, it is prepended to the returned message list. If after polling the
     * main queue the batch size has reached max_batch_size, it is immediately returned and
     * the partition is no longer polled. Otherwise the partition is polled for the remaining
     * messages up to the max_batch_size limit.
     * Because of this, you need to call poll_batch periodically as a keep alive mechanism,
     * otherwise the broker will think this consumer is down and will trigger a rebalance
     * (if using dynamic subscription).
     *
     * \param max_batch_size The maximum amount of messages expected
     *
     * \return A list of messages
     */
    MessageList poll_batch(size_t max_batch_size);

    /**
     * \brief Polls for a batch of messages
     *
     * Same as the other overload of RoundRobinPollAdapter::poll_batch but the provided
     * timeout will be used instead of the one configured on this Consumer.
     *
     * \param max_batch_size The maximum amount of messages expected
     * \param timeout The timeout for this operation
     *
     * \return A list of messages
     */
    MessageList poll_batch(size_t max_batch_size, std::chrono::milliseconds timeout);

private:
    /**
     * List of queues with a cursor that wraps around to the first queue once the
     * last one has been visited.
     */
    class CircularBuffer {
        using qlist = std::list<Queue>;
        using qiter = qlist::iterator;
    public:
        // Gives direct access to the underlying list so queues can be added or removed.
        // NOTE(review): the cursor is not adjusted when the list is modified through this
        // reference; callers presumably call rewind() after changing it - confirm.
        qlist& ref() { return queues_; }

        // Advances the cursor and returns the queue it lands on.
        // Throws QueueException(RD_KAFKA_RESP_ERR__STATE) if there are no queues.
        Queue& next() {
            if (queues_.empty()) {
                throw QueueException(RD_KAFKA_RESP_ERR__STATE);
            }
            // The cursor can still be at end() (the list was empty when the cursor was
            // last set); incrementing a past-the-end iterator is undefined behavior, so
            // wrap to the first queue without incrementing in that case.
            if (iter_ == queues_.end() || ++iter_ == queues_.end()) {
                iter_ = queues_.begin();
            }
            return *iter_;
        }

        // Moves the cursor back to the first queue (or end() if the list is empty)
        void rewind() { iter_ = queues_.begin(); }

    private:
        qlist queues_;
        qiter iter_ = queues_.begin();
    };

    void on_assignment(TopicPartitionList& partitions);
    void on_revocation(const TopicPartitionList& partitions);
    void on_rebalance_error(Error error);
    void restore_forwarding();

    // Members
    Consumer&                           consumer_;
    Consumer::AssignmentCallback        assignment_callback_;
    Consumer::RevocationCallback        revocation_callback_;
    Consumer::RebalanceErrorCallback    rebalance_error_callback_;
    Queue                               consumer_queue_;
    CircularBuffer                      partition_queues_;
};
} //cppkafka
#endif //CPPKAFKA_ROUNDROBIN_POLL_ADAPTER_H

View File

@@ -1,24 +1,4 @@
set(SOURCES
configuration.cpp
topic_configuration.cpp
configuration_option.cpp
exceptions.cpp
topic.cpp
buffer.cpp
message.cpp
topic_partition.cpp
topic_partition_list.cpp
metadata.cpp
group_information.cpp
error.cpp
kafka_handle_base.cpp
producer.cpp
consumer.cpp
utils/backoff_performer.cpp
utils/backoff_committer.cpp
)
file(GLOB SOURCES *.cpp utils/*.cpp)
include_directories(${CMAKE_CURRENT_SOURCE_DIR}/../include/cppkafka)
include_directories(SYSTEM ${Boost_INCLUDE_DIRS} ${RDKAFKA_INCLUDE_DIR})

View File

@@ -52,14 +52,14 @@ void Consumer::rebalance_proxy(rd_kafka_t*, rd_kafka_resp_err_t error,
static_cast<Consumer*>(opaque)->handle_rebalance(error, list);
}
Consumer::Consumer(Configuration config)
Consumer::Consumer(Configuration config)
: KafkaHandleBase(move(config)) {
char error_buffer[512];
rd_kafka_conf_t* config_handle = get_configuration_handle();
// Set ourselves as the opaque pointer
rd_kafka_conf_set_opaque(config_handle, this);
rd_kafka_conf_set_rebalance_cb(config_handle, &Consumer::rebalance_proxy);
rd_kafka_t* ptr = rd_kafka_new(RD_KAFKA_CONSUMER,
rd_kafka_t* ptr = rd_kafka_new(RD_KAFKA_CONSUMER,
rd_kafka_conf_dup(config_handle),
error_buffer, sizeof(error_buffer));
if (!ptr) {
@@ -165,7 +165,7 @@ KafkaHandleBase::OffsetTuple Consumer::get_offsets(const TopicPartition& topic_p
int64_t low;
int64_t high;
const string& topic = topic_partition.get_topic();
const int partition = topic_partition.get_partition();
const int partition = topic_partition.get_partition();
rd_kafka_resp_err_t result = rd_kafka_get_watermark_offsets(get_handle(), topic.data(),
partition, &low, &high);
check_error(result);
@@ -232,9 +232,7 @@ Message Consumer::poll() {
}
Message Consumer::poll(milliseconds timeout) {
rd_kafka_message_t* message = rd_kafka_consumer_poll(get_handle(),
static_cast<int>(timeout.count()));
return message ? Message(message) : Message();
return rd_kafka_consumer_poll(get_handle(), static_cast<int>(timeout.count()));
}
vector<Message> Consumer::poll_batch(size_t max_batch_size) {
@@ -260,6 +258,24 @@ vector<Message> Consumer::poll_batch(size_t max_batch_size, milliseconds timeout
return output;
}
// Wraps the handle's main event queue in a non-owning Queue and turns off its forwarding.
// NOTE(review): librdkafka documents that the reference obtained from rd_kafka_queue_get_main
// is meant to be released with rd_kafka_queue_destroy; confirm the non-owning wrapper is
// intentional here.
Queue Consumer::get_main_queue() const {
    rd_kafka_queue_t* const main_handle = rd_kafka_queue_get_main(get_handle());
    Queue main_queue = Queue::make_non_owning(main_handle);
    main_queue.disable_queue_forwarding();
    return main_queue;
}
// Wraps the consumer group queue of this handle in a non-owning Queue.
// Forwarding is left untouched for this queue.
Queue Consumer::get_consumer_queue() const {
    rd_kafka_queue_t* const consumer_handle = rd_kafka_queue_get_consumer(get_handle());
    return Queue::make_non_owning(consumer_handle);
}
// Wraps the queue of the given topic/partition in a non-owning Queue and turns off its
// forwarding. If librdkafka has no queue for that partition the returned Queue is empty.
Queue Consumer::get_partition_queue(const TopicPartition& partition) const {
    Queue queue = Queue::make_non_owning(rd_kafka_queue_get_partition(get_handle(),
                                                                      partition.get_topic().c_str(),
                                                                      partition.get_partition()));
    // rd_kafka_queue_get_partition may hand back a null queue; only touch forwarding
    // when there is an actual handle, otherwise a null queue would be passed to
    // rd_kafka_queue_forward
    if (queue) {
        queue.disable_queue_forwarding();
    }
    return queue;
}
void Consumer::close() {
rd_kafka_resp_err_t error = rd_kafka_consumer_close(get_handle());
check_error(error);

View File

@@ -108,4 +108,15 @@ Error ConsumerException::get_error() const {
return error_;
}
// QueueException
// Uses the error's string form as the exception message and keeps a copy of the error
QueueException::QueueException(Error error)
    : Exception(error.to_string()),
      error_(error) {

}
// Returns a copy of the error stored at construction
Error QueueException::get_error() const {
    return this->error_;
}
} // cppkafka

View File

@@ -57,10 +57,9 @@ Message::Message(rd_kafka_message_t* handle, NonOwningTag)
}
Message::Message(HandlePtr handle)
: handle_(move(handle)),
payload_((const Buffer::DataType*)handle_->payload, handle_->len),
key_((const Buffer::DataType*)handle_->key, handle_->key_len) {
: handle_(move(handle)),
payload_(handle_ ? Buffer((const Buffer::DataType*)handle_->payload, handle_->len) : Buffer()),
key_(handle_ ? Buffer((const Buffer::DataType*)handle_->key, handle_->key_len) : Buffer()) {
}
// MessageTimestamp

123
src/queue.cpp Normal file
View File

@@ -0,0 +1,123 @@
/*
* Copyright (c) 2017, Matias Fontanini
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include "queue.h"
#include "exceptions.h"
using std::vector;
using std::exception;
using std::chrono::milliseconds;
namespace cppkafka {
// No-op deleter: used for queues that must not destroy the handle they wrap
void dummy_deleter(rd_kafka_queue_t* /*handle*/) {

}
// Consume timeout given to queues on construction (1000 ms)
const milliseconds Queue::DEFAULT_TIMEOUT = milliseconds(1000);
// Builds a Queue around the handle through the private non-owning constructor
Queue Queue::make_non_owning(rd_kafka_queue_t* handle) {
    NonOwningTag tag;
    return Queue(handle, tag);
}
// Empty queue: null handle, null deleter, default consume timeout
Queue::Queue()
    : handle_(nullptr, nullptr), timeout_ms_(DEFAULT_TIMEOUT) {

}
// Owning queue: the handle is released with rd_kafka_queue_destroy when this object dies
Queue::Queue(rd_kafka_queue_t* handle)
    : handle_(handle, &rd_kafka_queue_destroy), timeout_ms_(DEFAULT_TIMEOUT) {

}
// Non-owning queue: the handle is paired with a no-op deleter so it is never destroyed here.
// The consume timeout is initialized like in the other constructors; leaving it out would
// make consume()/consume_batch() read an indeterminate value.
Queue::Queue(rd_kafka_queue_t* handle, NonOwningTag)
: handle_(handle, &dummy_deleter),
  timeout_ms_(DEFAULT_TIMEOUT) {
}
// Exposes the raw rdkafka queue pointer held by this object (may be null)
rd_kafka_queue_t* Queue::get_handle() const {
    rd_kafka_queue_t* const raw_handle = handle_.get();
    return raw_handle;
}
size_t Queue::get_length() const {
return rd_kafka_queue_length(handle_.get());
}
void Queue::forward_to_queue(const Queue& forward_queue) const {
return rd_kafka_queue_forward(handle_.get(), forward_queue.handle_.get());
}
void Queue::disable_queue_forwarding() const {
return rd_kafka_queue_forward(handle_.get(), nullptr);
}
// Stores the timeout used by the consume()/consume_batch() overloads that
// take no explicit timeout.
void Queue::set_consume_timeout(milliseconds timeout) {
    this->timeout_ms_ = timeout;
}
// Returns the currently stored consume timeout.
milliseconds Queue::get_consume_timeout() const {
    return this->timeout_ms_;
}
// Consumes one message, waiting at most the stored consume timeout.
Message Queue::consume() const {
    const milliseconds stored_timeout = timeout_ms_;
    return consume(stored_timeout);
}
// Consumes one message from this queue via rd_kafka_consume_queue, passing
// `timeout` as an int millisecond count, and wraps the result in a Message.
Message Queue::consume(milliseconds timeout) const {
    const int timeout_in_ms = static_cast<int>(timeout.count());
    rd_kafka_message_t* raw_message = rd_kafka_consume_queue(handle_.get(), timeout_in_ms);
    return Message(raw_message);
}
// Consumes up to `max_batch_size` messages using the stored consume timeout.
MessageList Queue::consume_batch(size_t max_batch_size) const {
    const milliseconds stored_timeout = timeout_ms_;
    return consume_batch(max_batch_size, stored_timeout);
}
// Consumes up to `max_batch_size` messages from this queue, waiting at most
// `timeout`.
//
// Returns the consumed messages wrapped as Message objects. When rdkafka
// reports -1 the last error is inspected: a real error is thrown as
// QueueException, while "no error" yields an empty list.
MessageList Queue::consume_batch(size_t max_batch_size, milliseconds timeout) const {
    MessageList batch;
    // scratch buffer rdkafka writes the raw message pointers into
    vector<rd_kafka_message_t*> raw_messages(max_batch_size);
    const int timeout_in_ms = static_cast<int>(timeout.count());
    ssize_t received = rd_kafka_consume_batch_queue(handle_.get(),
                                                    timeout_in_ms,
                                                    raw_messages.data(),
                                                    max_batch_size);
    if (received == -1) {
        rd_kafka_resp_err_t error = rd_kafka_last_error();
        if (error == RD_KAFKA_RESP_ERR_NO_ERROR) {
            // nothing consumed, but nothing went wrong either
            return batch;
        }
        throw QueueException(error);
    }
    // drop the unused tail of the scratch buffer, then wrap what was received
    raw_messages.resize(received);
    batch.reserve(received);
    for (size_t index = 0; index < raw_messages.size(); ++index) {
        batch.emplace_back(raw_messages[index]);
    }
    return batch;
}
} //cppkafka

View File

@@ -34,7 +34,7 @@ using std::string;
namespace cppkafka {
void dummy_topic_destroyer(rd_kafka_topic_t*) {
void dummy_deleter(rd_kafka_topic_t*) {
}
@@ -47,13 +47,13 @@ Topic::Topic()
}
Topic::Topic(rd_kafka_topic_t* handle)
Topic::Topic(rd_kafka_topic_t* handle)
: handle_(handle, &rd_kafka_topic_destroy) {
}
Topic::Topic(rd_kafka_topic_t* handle, NonOwningTag)
: handle_(handle, &dummy_topic_destroyer) {
: handle_(handle, &dummy_deleter) {
}

View File

@@ -0,0 +1,164 @@
/*
* Copyright (c) 2017, Matias Fontanini
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
#include "utils/roundrobin_poll_adapter.h"
using std::chrono::milliseconds;
using std::make_move_iterator;
namespace cppkafka {
// Attaches the adapter to `consumer`.
//
// The consumer's current assignment/revocation/rebalance-error callbacks are
// saved (they are chained from the on_* handlers and put back by the
// destructor) and replaced with lambdas that dispatch to this adapter.
//
// Throws ConsumerException(RD_KAFKA_RESP_ERR__EXISTING_SUBSCRIPTION) if the
// consumer already has an active subscription.
RoundRobinPollAdapter::RoundRobinPollAdapter(Consumer& consumer)
    : consumer_(consumer),
      assignment_callback_(consumer.get_assignment_callback()),
      revocation_callback_(consumer.get_revocation_callback()),
      rebalance_error_callback_(consumer.get_rebalance_error_callback()),
      consumer_queue_(consumer.get_consumer_queue()) {
    // Validate BEFORE installing any callback: the destructor does not run
    // when a constructor throws, so throwing after the take-over would leave
    // the consumer holding lambdas that capture a dead `this`.
    if (!consumer_.get_subscription().empty()) {
        throw ConsumerException(RD_KAFKA_RESP_ERR__EXISTING_SUBSCRIPTION);
    }
    // take over the assignment callback
    consumer_.set_assignment_callback([this](TopicPartitionList& partitions) {
        on_assignment(partitions);
    });
    // take over the revocation callback
    consumer_.set_revocation_callback([this](const TopicPartitionList& partitions) {
        on_revocation(partitions);
    });
    // take over the rebalance error callback
    consumer_.set_rebalance_error_callback([this](Error error) {
        on_rebalance_error(error);
    });
}
// Detaches the adapter: partition queues are forwarded back to the consumer
// queue and the callbacks saved at construction are handed back.
RoundRobinPollAdapter::~RoundRobinPollAdapter() {
    // undo the per-partition queue setup first
    restore_forwarding();

    // then give the consumer its original callbacks back
    consumer_.set_assignment_callback(assignment_callback_);
    consumer_.set_revocation_callback(revocation_callback_);
    consumer_.set_rebalance_error_callback(rebalance_error_callback_);
}
// Sets the poll timeout; the value is stored on the wrapped consumer.
void RoundRobinPollAdapter::set_timeout(milliseconds timeout) {
    this->consumer_.set_timeout(timeout);
}
// Returns the poll timeout as reported by the wrapped consumer.
milliseconds RoundRobinPollAdapter::get_timeout() {
    return this->consumer_.get_timeout();
}
// Polls for one message using the wrapped consumer's timeout.
Message RoundRobinPollAdapter::poll() {
    const milliseconds consumer_timeout = consumer_.get_timeout();
    return poll(consumer_timeout);
}
// Polls for a single message.
//
// The consumer (group event) queue is always tried first. While partition
// queues exist that first attempt does not block (zero timeout) and, if it
// yields nothing, the next partition queue in rotation is consumed with
// `timeout`. With no partition queues the consumer queue itself is given
// the full `timeout`.
Message RoundRobinPollAdapter::poll(milliseconds timeout) {
    const bool has_partitions = !partition_queues_.ref().empty();
    const milliseconds group_wait = has_partitions ? milliseconds(0) : timeout;

    // group event queue first
    Message message = consumer_queue_.consume(group_wait);
    if (!message && has_partitions) {
        // nothing on the group queue: move on to the next partition
        message = partition_queues_.next().consume(timeout);
    }
    return message;
}
// Polls for up to `max_batch_size` messages using the wrapped consumer's
// timeout.
MessageList RoundRobinPollAdapter::poll_batch(size_t max_batch_size) {
    const milliseconds consumer_timeout = consumer_.get_timeout();
    return poll_batch(max_batch_size, consumer_timeout);
}
// Polls for up to `max_batch_size` messages.
//
// The consumer (group event) queue is drained first - without blocking when
// partition queues exist, with the full `timeout` otherwise. Whatever room
// is left in the batch is then requested from the next partition queue in
// rotation, and the two results are returned as one list (group messages
// first).
MessageList RoundRobinPollAdapter::poll_batch(size_t max_batch_size, milliseconds timeout) {
    const bool no_partitions = partition_queues_.ref().empty();
    ssize_t budget = max_batch_size;

    // group event queue first
    MessageList batch = consumer_queue_.consume_batch(budget,
                                                      no_partitions ? timeout : milliseconds(0));
    budget -= batch.size();
    if (no_partitions || (budget <= 0)) {
        // either there is no partition queue to ask, or the batch is full
        return batch;
    }

    // top up from the next partition in rotation
    MessageList from_partition = partition_queues_.next().consume_batch(budget, timeout);
    if (batch.empty()) {
        return from_partition;
    }
    if (!from_partition.empty()) {
        // append the partition messages behind the group messages
        batch.reserve(batch.size() + from_partition.size());
        batch.insert(batch.end(),
                     make_move_iterator(from_partition.begin()),
                     make_move_iterator(from_partition.end()));
    }
    return batch;
}
// Assignment handler installed on the consumer by the constructor.
//
// Adds one partition queue per assigned partition to the rotation and then
// chains to the assignment callback the consumer had before the adapter
// took over (if any).
void RoundRobinPollAdapter::on_assignment(TopicPartitionList& partitions) {
    // populate partition queues
    for (const auto& partition : partitions) {
        partition_queues_.ref().push_back(consumer_.get_partition_queue(partition));
    }
    // The queue list has just changed, so re-seat the rotation cursor on it;
    // the only other rewind() happens in on_revocation on an emptied list.
    // NOTE(review): assumes rewind() points the cursor at the start of the
    // current list - confirm against the container's declaration.
    partition_queues_.rewind();
    // call original consumer callback if any
    if (assignment_callback_) {
        assignment_callback_(partitions);
    }
}
// Revocation handler installed on the consumer by the constructor.
//
// Forwards every partition queue back to the consumer queue, empties the
// rotation, rewinds its cursor and finally chains to the revocation
// callback the consumer had before the adapter took over (if any).
void RoundRobinPollAdapter::on_revocation(const TopicPartitionList& partitions) {
    // queues go back to their initial (forwarded) state before being dropped
    restore_forwarding();

    // forget all partition queues and reset the rotation
    partition_queues_.ref().clear();
    partition_queues_.rewind();

    // nothing more to do unless the consumer had its own callback
    if (!revocation_callback_) {
        return;
    }
    revocation_callback_(partitions);
}
// Rebalance-error handler installed on the consumer by the constructor;
// it only chains to the callback the consumer had before (if any).
void RoundRobinPollAdapter::on_rebalance_error(Error error) {
    if (!rebalance_error_callback_) {
        // the consumer had no handler of its own
        return;
    }
    rebalance_error_callback_(error);
}
// Forwards every partition queue currently in the rotation to the consumer
// queue.
void RoundRobinPollAdapter::restore_forwarding() {
    auto&& queues = partition_queues_.ref();
    for (auto it = queues.begin(); it != queues.end(); ++it) {
        it->forward_to_queue(consumer_queue_);
    }
}
} //cppkafka