Allocators (#118)

* Added allocator support for consumers and buffered producer

* Changed MessageList back to std::vector<Message> for consistency with the allocator API
This commit is contained in:
Alex Damian
2018-10-16 11:57:11 -04:00
committed by Matias Fontanini
parent d77e7466b8
commit 9af4330c6d
10 changed files with 247 additions and 165 deletions

View File

@@ -379,10 +379,14 @@ public:
* This can return one or more messages
*
* \param max_batch_size The maximum amount of messages expected
* \param alloc The optionally supplied allocator for allocating messages
*
* \return A list of messages
*/
MessageList poll_batch(size_t max_batch_size);
template <typename Allocator>
std::vector<Message, Allocator> poll_batch(size_t max_batch_size,
const Allocator& alloc);
std::vector<Message> poll_batch(size_t max_batch_size);
/**
* \brief Polls for a batch of messages
@@ -391,10 +395,16 @@ public:
*
* \param max_batch_size The maximum amount of messages expected
* \param timeout The timeout for this operation
* \param alloc The optionally supplied allocator for allocating messages
*
* \return A list of messages
*/
MessageList poll_batch(size_t max_batch_size, std::chrono::milliseconds timeout);
template <typename Allocator>
std::vector<Message, Allocator> poll_batch(size_t max_batch_size,
std::chrono::milliseconds timeout,
const Allocator& alloc);
std::vector<Message> poll_batch(size_t max_batch_size,
std::chrono::milliseconds timeout);
/**
* \brief Get the global event queue servicing this consumer corresponding to
@@ -430,6 +440,7 @@ public:
private:
static void rebalance_proxy(rd_kafka_t *handle, rd_kafka_resp_err_t error,
rd_kafka_topic_partition_list_t *partitions, void *opaque);
static Queue get_queue(rd_kafka_queue_t* handle);
void close();
void commit(const Message& msg, bool async);
void commit(const TopicPartitionList* topic_partitions, bool async);
@@ -440,6 +451,30 @@ private:
RebalanceErrorCallback rebalance_error_callback_;
};
// Implementations
template <typename Allocator>
std::vector<Message, Allocator> Consumer::poll_batch(size_t max_batch_size,
const Allocator& alloc) {
return poll_batch(max_batch_size, get_timeout(), alloc);
}
template <typename Allocator>
std::vector<Message, Allocator> Consumer::poll_batch(size_t max_batch_size,
std::chrono::milliseconds timeout,
const Allocator& alloc) {
std::vector<rd_kafka_message_t*> raw_messages(max_batch_size);
// Note that this will leak the queue when using rdkafka < 0.11.5 (see get_queue comment)
Queue queue(get_queue(rd_kafka_queue_get_consumer(get_handle())));
ssize_t result = rd_kafka_consume_batch_queue(queue.get_handle() , timeout.count(), raw_messages.data(),
raw_messages.size());
if (result == -1) {
check_error(rd_kafka_last_error());
// on the off-chance that check_error() does not throw an error
return std::vector<Message, Allocator>(alloc);
}
return std::vector<Message, Allocator>(raw_messages.begin(), raw_messages.begin() + result, alloc);
}
} // cppkafka
#endif // CPP_KAFKA_CONSUMER_H

View File

@@ -138,9 +138,14 @@ public:
*
* \param max_batch_size The max number of messages to consume if available
*
* \param alloc The optionally supplied allocator for the message list
*
* \return A list of messages. Could be empty if there's nothing to consume
*/
MessageList consume_batch(size_t max_batch_size) const;
template <typename Allocator>
std::vector<Message, Allocator> consume_batch(size_t max_batch_size,
const Allocator& alloc) const;
std::vector<Message> consume_batch(size_t max_batch_size) const;
/**
* \brief Consumes a batch of messages from this queue
@@ -151,9 +156,16 @@ public:
*
* \param timeout The timeout to be used on this call
*
* \param alloc The optionally supplied allocator for the message list
*
* \return A list of messages. Could be empty if there's nothing to consume
*/
MessageList consume_batch(size_t max_batch_size, std::chrono::milliseconds timeout) const;
template <typename Allocator>
std::vector<Message, Allocator> consume_batch(size_t max_batch_size,
std::chrono::milliseconds timeout,
const Allocator& alloc) const;
std::vector<Message> consume_batch(size_t max_batch_size,
std::chrono::milliseconds timeout) const;
/**
* Indicates whether this queue is valid (not null)
@@ -178,6 +190,32 @@ private:
using QueueList = std::vector<Queue>;
template <typename Allocator>
std::vector<Message, Allocator> Queue::consume_batch(size_t max_batch_size,
const Allocator& alloc) const {
return consume_batch(max_batch_size, timeout_ms_, alloc);
}
template <typename Allocator>
std::vector<Message, Allocator> Queue::consume_batch(size_t max_batch_size,
std::chrono::milliseconds timeout,
const Allocator& alloc) const {
std::vector<rd_kafka_message_t*> raw_messages(max_batch_size);
ssize_t result = rd_kafka_consume_batch_queue(handle_.get(),
static_cast<int>(timeout.count()),
raw_messages.data(),
raw_messages.size());
if (result == -1) {
rd_kafka_resp_err_t error = rd_kafka_last_error();
if (error != RD_KAFKA_RESP_ERR_NO_ERROR) {
throw QueueException(error);
}
return std::vector<Message, Allocator>(alloc);
}
// Build message list
return std::vector<Message, Allocator>(raw_messages.begin(), raw_messages.begin() + result, alloc);
}
} // cppkafka
#endif //CPPKAFKA_QUEUE_H

View File

@@ -83,7 +83,8 @@ namespace cppkafka {
* the messages *after* the ProduceSuccessCallback has reported a successful delivery to avoid memory
* corruptions.
*/
template <typename BufferType>
template <typename BufferType,
typename Allocator = std::allocator<ConcreteMessageBuilder<BufferType>>>
class CPPKAFKA_API BufferedProducer {
public:
enum class FlushMethod { Sync, ///< Empty the buffer and wait for acks from the broker
@@ -92,6 +93,7 @@ public:
* Concrete builder
*/
using Builder = ConcreteMessageBuilder<BufferType>;
using QueueType = std::deque<Builder, Allocator>;
/**
* Callback to indicate a message was delivered to the broker
@@ -115,8 +117,9 @@ public:
* \brief Constructs a buffered producer using the provided configuration
*
* \param config The configuration to be used on the actual Producer object
* \param alloc The optionally supplied allocator for the internal message buffer
*/
BufferedProducer(Configuration config);
BufferedProducer(Configuration config, const Allocator& alloc = Allocator());
/**
* \brief Adds a message to the producer's buffer.
@@ -390,7 +393,6 @@ protected:
#endif
private:
using QueueType = std::deque<Builder>;
enum class MessagePriority { Low, High };
enum class SenderType { Sync, Async };
@@ -466,28 +468,30 @@ Producer::PayloadPolicy get_default_payload_policy<Buffer>() {
return Producer::PayloadPolicy::PASSTHROUGH_PAYLOAD;
}
template <typename BufferType>
BufferedProducer<BufferType>::BufferedProducer(Configuration config)
: producer_(prepare_configuration(std::move(config))) {
template <typename BufferType, typename Allocator>
BufferedProducer<BufferType, Allocator>::BufferedProducer(Configuration config,
const Allocator& alloc)
: producer_(prepare_configuration(std::move(config))),
messages_(alloc) {
producer_.set_payload_policy(get_default_payload_policy<BufferType>());
#ifdef KAFKA_TEST_INSTANCE
test_params_ = nullptr;
#endif
}
template <typename BufferType>
void BufferedProducer<BufferType>::add_message(const MessageBuilder& builder) {
template <typename BufferType, typename Allocator>
void BufferedProducer<BufferType, Allocator>::add_message(const MessageBuilder& builder) {
add_message(Builder(builder)); //make ConcreteBuilder
}
template <typename BufferType>
void BufferedProducer<BufferType>::add_message(Builder builder) {
template <typename BufferType, typename Allocator>
void BufferedProducer<BufferType, Allocator>::add_message(Builder builder) {
add_tracker(SenderType::Async, builder);
do_add_message(move(builder), MessagePriority::Low, true);
}
template <typename BufferType>
void BufferedProducer<BufferType>::produce(const MessageBuilder& builder) {
template <typename BufferType, typename Allocator>
void BufferedProducer<BufferType, Allocator>::produce(const MessageBuilder& builder) {
if (has_internal_data_) {
MessageBuilder builder_clone(builder.clone());
add_tracker(SenderType::Async, builder_clone);
@@ -498,8 +502,8 @@ void BufferedProducer<BufferType>::produce(const MessageBuilder& builder) {
}
}
template <typename BufferType>
void BufferedProducer<BufferType>::sync_produce(const MessageBuilder& builder) {
template <typename BufferType, typename Allocator>
void BufferedProducer<BufferType, Allocator>::sync_produce(const MessageBuilder& builder) {
if (has_internal_data_) {
MessageBuilder builder_clone(builder.clone());
TrackerPtr tracker = add_tracker(SenderType::Sync, builder_clone);
@@ -519,13 +523,13 @@ void BufferedProducer<BufferType>::sync_produce(const MessageBuilder& builder) {
}
}
template <typename BufferType>
void BufferedProducer<BufferType>::produce(const Message& message) {
template <typename BufferType, typename Allocator>
void BufferedProducer<BufferType, Allocator>::produce(const Message& message) {
async_produce(MessageBuilder(message), true);
}
template <typename BufferType>
void BufferedProducer<BufferType>::async_flush() {
template <typename BufferType, typename Allocator>
void BufferedProducer<BufferType, Allocator>::async_flush() {
CounterGuard<size_t> counter_guard(flushes_in_progress_);
QueueType flush_queue; // flush from temporary queue
{
@@ -538,8 +542,8 @@ void BufferedProducer<BufferType>::async_flush() {
}
}
template <typename BufferType>
void BufferedProducer<BufferType>::flush(bool preserve_order) {
template <typename BufferType, typename Allocator>
void BufferedProducer<BufferType, Allocator>::flush(bool preserve_order) {
if (preserve_order) {
CounterGuard<size_t> counter_guard(flushes_in_progress_);
QueueType flush_queue; // flush from temporary queue
@@ -558,8 +562,8 @@ void BufferedProducer<BufferType>::flush(bool preserve_order) {
}
}
template <typename BufferType>
bool BufferedProducer<BufferType>::flush(std::chrono::milliseconds timeout,
template <typename BufferType, typename Allocator>
bool BufferedProducer<BufferType, Allocator>::flush(std::chrono::milliseconds timeout,
bool preserve_order) {
if (preserve_order) {
CounterGuard<size_t> counter_guard(flushes_in_progress_);
@@ -582,8 +586,8 @@ bool BufferedProducer<BufferType>::flush(std::chrono::milliseconds timeout,
}
}
template <typename BufferType>
void BufferedProducer<BufferType>::wait_for_acks() {
template <typename BufferType, typename Allocator>
void BufferedProducer<BufferType, Allocator>::wait_for_acks() {
while (pending_acks_ > 0) {
try {
producer_.flush();
@@ -600,8 +604,8 @@ void BufferedProducer<BufferType>::wait_for_acks() {
}
}
template <typename BufferType>
bool BufferedProducer<BufferType>::wait_for_acks(std::chrono::milliseconds timeout) {
template <typename BufferType, typename Allocator>
bool BufferedProducer<BufferType, Allocator>::wait_for_acks(std::chrono::milliseconds timeout) {
auto remaining = timeout;
auto start_time = std::chrono::high_resolution_clock::now();
while ((pending_acks_ > 0) && (remaining.count() > 0)) {
@@ -625,47 +629,47 @@ bool BufferedProducer<BufferType>::wait_for_acks(std::chrono::milliseconds timeo
return (pending_acks_ == 0);
}
template <typename BufferType>
void BufferedProducer<BufferType>::clear() {
template <typename BufferType, typename Allocator>
void BufferedProducer<BufferType, Allocator>::clear() {
std::lock_guard<std::mutex> lock(mutex_);
QueueType tmp;
std::swap(tmp, messages_);
}
template <typename BufferType>
size_t BufferedProducer<BufferType>::get_buffer_size() const {
template <typename BufferType, typename Allocator>
size_t BufferedProducer<BufferType, Allocator>::get_buffer_size() const {
return messages_.size();
}
template <typename BufferType>
void BufferedProducer<BufferType>::set_max_buffer_size(ssize_t max_buffer_size) {
template <typename BufferType, typename Allocator>
void BufferedProducer<BufferType, Allocator>::set_max_buffer_size(ssize_t max_buffer_size) {
if (max_buffer_size < -1) {
throw Exception("Invalid buffer size.");
}
max_buffer_size_ = max_buffer_size;
}
template <typename BufferType>
ssize_t BufferedProducer<BufferType>::get_max_buffer_size() const {
template <typename BufferType, typename Allocator>
ssize_t BufferedProducer<BufferType, Allocator>::get_max_buffer_size() const {
return max_buffer_size_;
}
template <typename BufferType>
void BufferedProducer<BufferType>::set_flush_method(FlushMethod method) {
template <typename BufferType, typename Allocator>
void BufferedProducer<BufferType, Allocator>::set_flush_method(FlushMethod method) {
flush_method_ = method;
}
template <typename BufferType>
typename BufferedProducer<BufferType>::FlushMethod
BufferedProducer<BufferType>::get_flush_method() const {
template <typename BufferType, typename Allocator>
typename BufferedProducer<BufferType, Allocator>::FlushMethod
BufferedProducer<BufferType, Allocator>::get_flush_method() const {
return flush_method_;
}
template <typename BufferType>
template <typename BufferType, typename Allocator>
template <typename BuilderType>
void BufferedProducer<BufferType>::do_add_message(BuilderType&& builder,
MessagePriority priority,
bool do_flush) {
void BufferedProducer<BufferType, Allocator>::do_add_message(BuilderType&& builder,
MessagePriority priority,
bool do_flush) {
{
std::lock_guard<std::mutex> lock(mutex_);
if (priority == MessagePriority::High) {
@@ -685,73 +689,73 @@ void BufferedProducer<BufferType>::do_add_message(BuilderType&& builder,
}
}
template <typename BufferType>
Producer& BufferedProducer<BufferType>::get_producer() {
template <typename BufferType, typename Allocator>
Producer& BufferedProducer<BufferType, Allocator>::get_producer() {
return producer_;
}
template <typename BufferType>
const Producer& BufferedProducer<BufferType>::get_producer() const {
template <typename BufferType, typename Allocator>
const Producer& BufferedProducer<BufferType, Allocator>::get_producer() const {
return producer_;
}
template <typename BufferType>
size_t BufferedProducer<BufferType>::get_pending_acks() const {
template <typename BufferType, typename Allocator>
size_t BufferedProducer<BufferType, Allocator>::get_pending_acks() const {
return pending_acks_;
}
template <typename BufferType>
size_t BufferedProducer<BufferType>::get_total_messages_produced() const {
template <typename BufferType, typename Allocator>
size_t BufferedProducer<BufferType, Allocator>::get_total_messages_produced() const {
return total_messages_produced_;
}
template <typename BufferType>
size_t BufferedProducer<BufferType>::get_total_messages_dropped() const {
template <typename BufferType, typename Allocator>
size_t BufferedProducer<BufferType, Allocator>::get_total_messages_dropped() const {
return total_messages_dropped_;
}
template <typename BufferType>
size_t BufferedProducer<BufferType>::get_flushes_in_progress() const {
template <typename BufferType, typename Allocator>
size_t BufferedProducer<BufferType, Allocator>::get_flushes_in_progress() const {
return flushes_in_progress_;
}
template <typename BufferType>
void BufferedProducer<BufferType>::set_max_number_retries(size_t max_number_retries) {
template <typename BufferType, typename Allocator>
void BufferedProducer<BufferType, Allocator>::set_max_number_retries(size_t max_number_retries) {
if (!has_internal_data_ && (max_number_retries > 0)) {
has_internal_data_ = true; //enable once
}
max_number_retries_ = max_number_retries;
}
template <typename BufferType>
size_t BufferedProducer<BufferType>::get_max_number_retries() const {
template <typename BufferType, typename Allocator>
size_t BufferedProducer<BufferType, Allocator>::get_max_number_retries() const {
return max_number_retries_;
}
template <typename BufferType>
typename BufferedProducer<BufferType>::Builder
BufferedProducer<BufferType>::make_builder(std::string topic) {
template <typename BufferType, typename Allocator>
typename BufferedProducer<BufferType, Allocator>::Builder
BufferedProducer<BufferType, Allocator>::make_builder(std::string topic) {
return Builder(std::move(topic));
}
template <typename BufferType>
void BufferedProducer<BufferType>::set_produce_failure_callback(ProduceFailureCallback callback) {
template <typename BufferType, typename Allocator>
void BufferedProducer<BufferType, Allocator>::set_produce_failure_callback(ProduceFailureCallback callback) {
produce_failure_callback_ = std::move(callback);
}
template <typename BufferType>
void BufferedProducer<BufferType>::set_produce_success_callback(ProduceSuccessCallback callback) {
template <typename BufferType, typename Allocator>
void BufferedProducer<BufferType, Allocator>::set_produce_success_callback(ProduceSuccessCallback callback) {
produce_success_callback_ = std::move(callback);
}
template <typename BufferType>
void BufferedProducer<BufferType>::set_flush_failure_callback(FlushFailureCallback callback) {
template <typename BufferType, typename Allocator>
void BufferedProducer<BufferType, Allocator>::set_flush_failure_callback(FlushFailureCallback callback) {
flush_failure_callback_ = std::move(callback);
}
template <typename BufferType>
template <typename BufferType, typename Allocator>
template <typename BuilderType>
void BufferedProducer<BufferType>::produce_message(BuilderType&& builder) {
void BufferedProducer<BufferType, Allocator>::produce_message(BuilderType&& builder) {
using builder_type = typename std::decay<BuilderType>::type;
while (true) {
try {
@@ -774,9 +778,9 @@ void BufferedProducer<BufferType>::produce_message(BuilderType&& builder) {
}
}
template <typename BufferType>
template <typename BufferType, typename Allocator>
template <typename BuilderType>
void BufferedProducer<BufferType>::async_produce(BuilderType&& builder, bool throw_on_error) {
void BufferedProducer<BufferType, Allocator>::async_produce(BuilderType&& builder, bool throw_on_error) {
try {
TestParameters* test_params = get_test_parameters();
if (test_params && test_params->force_produce_error_) {
@@ -802,16 +806,16 @@ void BufferedProducer<BufferType>::async_produce(BuilderType&& builder, bool thr
}
}
template <typename BufferType>
Configuration BufferedProducer<BufferType>::prepare_configuration(Configuration config) {
template <typename BufferType, typename Allocator>
Configuration BufferedProducer<BufferType, Allocator>::prepare_configuration(Configuration config) {
using std::placeholders::_2;
auto callback = std::bind(&BufferedProducer<BufferType>::on_delivery_report, this, _2);
auto callback = std::bind(&BufferedProducer<BufferType, Allocator>::on_delivery_report, this, _2);
config.set_delivery_report_callback(std::move(callback));
return config;
}
template <typename BufferType>
void BufferedProducer<BufferType>::on_delivery_report(const Message& message) {
template <typename BufferType, typename Allocator>
void BufferedProducer<BufferType, Allocator>::on_delivery_report(const Message& message) {
//Get tracker data
TestParameters* test_params = get_test_parameters();
TrackerPtr tracker = has_internal_data_ ?

View File

@@ -108,7 +108,7 @@ struct PollInterface {
* otherwise the broker will think this consumer is down and will trigger a rebalance
* (if using dynamic subscription)
*/
virtual MessageList poll_batch(size_t max_batch_size) = 0;
virtual std::vector<Message> poll_batch(size_t max_batch_size) = 0;
/**
* \brief Polls all assigned partitions for a batch of new messages in round-robin fashion
@@ -122,7 +122,7 @@ struct PollInterface {
*
* \return A list of messages
*/
virtual MessageList poll_batch(size_t max_batch_size, std::chrono::milliseconds timeout) = 0;
virtual std::vector<Message> poll_batch(size_t max_batch_size, std::chrono::milliseconds timeout) = 0;
};
} //cppkafka

View File

@@ -102,14 +102,21 @@ public:
/**
* \sa PollInterface::poll_batch
*/
MessageList poll_batch(size_t max_batch_size) override;
template <typename Allocator>
std::vector<Message, Allocator> poll_batch(size_t max_batch_size,
const Allocator& alloc);
std::vector<Message> poll_batch(size_t max_batch_size) override;
/**
* \sa PollInterface::poll_batch
*/
MessageList poll_batch(size_t max_batch_size,
std::chrono::milliseconds timeout) override;
template <typename Allocator>
std::vector<Message, Allocator> poll_batch(size_t max_batch_size,
std::chrono::milliseconds timeout,
const Allocator& alloc);
std::vector<Message> poll_batch(size_t max_batch_size,
std::chrono::milliseconds timeout) override;
protected:
/**
* \sa PollStrategyBase::reset_state
@@ -119,10 +126,12 @@ protected:
QueueData& get_next_queue();
private:
template <typename Allocator>
void consume_batch(Queue& queue,
MessageList& messages,
std::vector<Message, Allocator>& messages,
ssize_t& count,
std::chrono::milliseconds timeout);
std::chrono::milliseconds timeout,
const Allocator& alloc);
void restore_forwarding();
@@ -130,6 +139,53 @@ private:
QueueMap::iterator queue_iter_;
};
// Implementations
template <typename Allocator>
std::vector<Message, Allocator> RoundRobinPollStrategy::poll_batch(size_t max_batch_size,
const Allocator& alloc) {
return poll_batch(max_batch_size, get_consumer().get_timeout(), alloc);
}
template <typename Allocator>
std::vector<Message, Allocator> RoundRobinPollStrategy::poll_batch(size_t max_batch_size,
std::chrono::milliseconds timeout,
const Allocator& alloc) {
std::vector<Message, Allocator> messages(alloc);
ssize_t count = max_batch_size;
// batch from the group event queue first (non-blocking)
consume_batch(get_consumer_queue().queue, messages, count, std::chrono::milliseconds(0), alloc);
size_t num_queues = get_partition_queues().size();
while ((count > 0) && (num_queues--)) {
// batch from the next partition (non-blocking)
consume_batch(get_next_queue().queue, messages, count, std::chrono::milliseconds(0), alloc);
}
// we still have space left in the buffer
if (count > 0) {
// wait on the event queue until timeout
consume_batch(get_consumer_queue().queue, messages, count, timeout, alloc);
}
return messages;
}
template <typename Allocator>
void RoundRobinPollStrategy::consume_batch(Queue& queue,
std::vector<Message, Allocator>& messages,
ssize_t& count,
std::chrono::milliseconds timeout,
const Allocator& alloc) {
std::vector<Message, Allocator> queue_messages = queue.consume_batch(count, timeout, alloc);
if (queue_messages.empty()) {
return;
}
// concatenate both lists
messages.insert(messages.end(),
make_move_iterator(queue_messages.begin()),
make_move_iterator(queue_messages.end()));
// reduce total batch count
count -= queue_messages.size();
}
} //cppkafka
#endif //CPPKAFKA_ROUNDROBIN_POLL_STRATEGY_H

View File

@@ -44,12 +44,13 @@ using std::ostringstream;
using std::chrono::milliseconds;
using std::toupper;
using std::equal;
using std::allocator;
namespace cppkafka {
// See: https://github.com/edenhill/librdkafka/issues/1792
const int rd_kafka_queue_refcount_bug_version = 0x000b0500;
Queue get_queue(rd_kafka_queue_t* handle) {
Queue Consumer::get_queue(rd_kafka_queue_t* handle) {
if (rd_kafka_version() <= rd_kafka_queue_refcount_bug_version) {
return Queue::make_non_owning(handle);
}
@@ -255,22 +256,12 @@ Message Consumer::poll(milliseconds timeout) {
return rd_kafka_consumer_poll(get_handle(), static_cast<int>(timeout.count()));
}
MessageList Consumer::poll_batch(size_t max_batch_size) {
return poll_batch(max_batch_size, get_timeout());
std::vector<Message> Consumer::poll_batch(size_t max_batch_size) {
return poll_batch(max_batch_size, get_timeout(), allocator<Message>());
}
MessageList Consumer::poll_batch(size_t max_batch_size, milliseconds timeout) {
vector<rd_kafka_message_t*> raw_messages(max_batch_size);
// Note that this will leak the queue when using rdkafka < 0.11.5 (see get_queue comment)
Queue queue(get_queue(rd_kafka_queue_get_consumer(get_handle())));
ssize_t result = rd_kafka_consume_batch_queue(queue.get_handle() , timeout.count(), raw_messages.data(),
raw_messages.size());
if (result == -1) {
check_error(rd_kafka_last_error());
// on the off-chance that check_error() does not throw an error
return MessageList();
}
return MessageList(raw_messages.begin(), raw_messages.begin() + result);
std::vector<Message> Consumer::poll_batch(size_t max_batch_size, milliseconds timeout) {
return poll_batch(max_batch_size, timeout, allocator<Message>());
}
Queue Consumer::get_main_queue() const {

View File

@@ -32,6 +32,7 @@
using std::vector;
using std::exception;
using std::chrono::milliseconds;
using std::allocator;
namespace cppkafka {
@@ -94,25 +95,13 @@ Message Queue::consume(milliseconds timeout) const {
return Message(rd_kafka_consume_queue(handle_.get(), static_cast<int>(timeout.count())));
}
MessageList Queue::consume_batch(size_t max_batch_size) const {
return consume_batch(max_batch_size, timeout_ms_);
std::vector<Message> Queue::consume_batch(size_t max_batch_size) const {
return consume_batch(max_batch_size, timeout_ms_, allocator<Message>());
}
MessageList Queue::consume_batch(size_t max_batch_size, milliseconds timeout) const {
vector<rd_kafka_message_t*> raw_messages(max_batch_size);
ssize_t result = rd_kafka_consume_batch_queue(handle_.get(),
static_cast<int>(timeout.count()),
raw_messages.data(),
raw_messages.size());
if (result == -1) {
rd_kafka_resp_err_t error = rd_kafka_last_error();
if (error != RD_KAFKA_RESP_ERR_NO_ERROR) {
throw QueueException(error);
}
return MessageList();
}
// Build message list
return MessageList(raw_messages.begin(), raw_messages.begin() + result);
std::vector<Message> Queue::consume_batch(size_t max_batch_size,
milliseconds timeout) const {
return consume_batch(max_batch_size, timeout, allocator<Message>());
}
} //cppkafka

View File

@@ -32,6 +32,7 @@
using std::string;
using std::chrono::milliseconds;
using std::make_move_iterator;
using std::allocator;
namespace cppkafka {
@@ -67,46 +68,15 @@ Message RoundRobinPollStrategy::poll(milliseconds timeout) {
return get_consumer_queue().queue.consume(timeout);
}
MessageList RoundRobinPollStrategy::poll_batch(size_t max_batch_size) {
return poll_batch(max_batch_size, get_consumer().get_timeout());
std::vector<Message> RoundRobinPollStrategy::poll_batch(size_t max_batch_size) {
return poll_batch(max_batch_size, get_consumer().get_timeout(), allocator<Message>());
}
MessageList RoundRobinPollStrategy::poll_batch(size_t max_batch_size, milliseconds timeout) {
MessageList messages;
ssize_t count = max_batch_size;
// batch from the group event queue first (non-blocking)
consume_batch(get_consumer_queue().queue, messages, count, milliseconds(0));
size_t num_queues = get_partition_queues().size();
while ((count > 0) && (num_queues--)) {
// batch from the next partition (non-blocking)
consume_batch(get_next_queue().queue, messages, count, milliseconds(0));
}
// we still have space left in the buffer
if (count > 0) {
// wait on the event queue until timeout
consume_batch(get_consumer_queue().queue, messages, count, timeout);
}
return messages;
std::vector<Message> RoundRobinPollStrategy::poll_batch(size_t max_batch_size,
milliseconds timeout) {
return poll_batch(max_batch_size, timeout, allocator<Message>());
}
void RoundRobinPollStrategy::consume_batch(Queue& queue,
MessageList& messages,
ssize_t& count,
milliseconds timeout) {
MessageList queue_messages = queue.consume_batch(count, timeout);
if (queue_messages.empty()) {
return;
}
// concatenate both lists
messages.insert(messages.end(),
make_move_iterator(queue_messages.begin()),
make_move_iterator(queue_messages.end()));
// reduce total batch count
count -= queue_messages.size();
}
void RoundRobinPollStrategy::restore_forwarding() {
// forward all partition queues
for (const auto& toppar : get_partition_queues()) {

View File

@@ -48,9 +48,9 @@ public:
void delete_polling_strategy();
Message poll();
Message poll(std::chrono::milliseconds timeout);
MessageList poll_batch(size_t max_batch_size);
MessageList poll_batch(size_t max_batch_size,
std::chrono::milliseconds timeout);
std::vector<Message> poll_batch(size_t max_batch_size);
std::vector<Message> poll_batch(size_t max_batch_size,
std::chrono::milliseconds timeout);
void set_timeout(std::chrono::milliseconds timeout);
std::chrono::milliseconds get_timeout();
private:

View File

@@ -19,7 +19,6 @@ using cppkafka::Consumer;
using cppkafka::BasicConsumerDispatcher;
using cppkafka::Message;
using cppkafka::MessageList;
using cppkafka::TopicPartition;
//==================================================================================
@@ -89,7 +88,7 @@ BasicConsumerRunner<ConsumerType>::~BasicConsumerRunner() {
}
template <typename ConsumerType>
const MessageList& BasicConsumerRunner<ConsumerType>::get_messages() const {
const std::vector<Message>& BasicConsumerRunner<ConsumerType>::get_messages() const {
return messages_;
}
@@ -135,7 +134,7 @@ Message PollStrategyAdapter::poll(milliseconds timeout) {
}
inline
MessageList PollStrategyAdapter::poll_batch(size_t max_batch_size) {
std::vector<Message> PollStrategyAdapter::poll_batch(size_t max_batch_size) {
if (strategy_) {
return strategy_->poll_batch(max_batch_size);
}
@@ -143,8 +142,8 @@ MessageList PollStrategyAdapter::poll_batch(size_t max_batch_size) {
}
inline
MessageList PollStrategyAdapter::poll_batch(size_t max_batch_size,
milliseconds timeout) {
std::vector<Message> PollStrategyAdapter::poll_batch(size_t max_batch_size,
milliseconds timeout) {
if (strategy_) {
return strategy_->poll_batch(max_batch_size, timeout);
}