From 169ea4f8ed7ae451000cdf9bd02c57c2dcc85628 Mon Sep 17 00:00:00 2001 From: accelerated <> Date: Wed, 2 May 2018 15:31:34 -0400 Subject: [PATCH] Fixes to queue polling and making them non-owning --- .../cppkafka/utils/roundrobin_poll_adapter.h | 5 +- src/consumer.cpp | 12 ++-- src/queue.cpp | 12 ++-- src/utils/roundrobin_poll_adapter.cpp | 68 ++++++++++--------- 4 files changed, 52 insertions(+), 45 deletions(-) diff --git a/include/cppkafka/utils/roundrobin_poll_adapter.h b/include/cppkafka/utils/roundrobin_poll_adapter.h index 7a7fe9f..e0284fb 100644 --- a/include/cppkafka/utils/roundrobin_poll_adapter.h +++ b/include/cppkafka/utils/roundrobin_poll_adapter.h @@ -164,7 +164,10 @@ public: MessageList poll_batch(size_t max_batch_size, std::chrono::milliseconds timeout); private: - void consume_batch(MessageList& messages, ssize_t& count, std::chrono::milliseconds timeout); + void consume_batch(Queue& queue, + MessageList& messages, + ssize_t& count, + std::chrono::milliseconds timeout); class CircularBuffer { public: diff --git a/src/consumer.cpp b/src/consumer.cpp index dfa55a5..ce84092 100644 --- a/src/consumer.cpp +++ b/src/consumer.cpp @@ -249,23 +249,23 @@ MessageList Consumer::poll_batch(size_t max_batch_size, milliseconds timeout) { // on the off-chance that check_error() does not throw an error return MessageList(); } - return MessageList(raw_messages.begin(), raw_messages.end()); + return MessageList(raw_messages.begin(), raw_messages.begin() + result); } Queue Consumer::get_main_queue() const { - Queue queue(rd_kafka_queue_get_main(get_handle())); + Queue queue(Queue::make_non_owning(rd_kafka_queue_get_main(get_handle()))); queue.disable_queue_forwarding(); return queue; } Queue Consumer::get_consumer_queue() const { - return rd_kafka_queue_get_consumer(get_handle()); + return Queue::make_non_owning(rd_kafka_queue_get_consumer(get_handle())); } Queue Consumer::get_partition_queue(const TopicPartition& partition) const { - Queue queue(rd_kafka_queue_get_partition(get_handle(), - partition.get_topic().c_str(), - partition.get_partition())); + Queue queue(Queue::make_non_owning(rd_kafka_queue_get_partition(get_handle(), + partition.get_topic().c_str(), + partition.get_partition()))); queue.disable_queue_forwarding(); return queue; } diff --git a/src/queue.cpp b/src/queue.cpp index a15f266..7e220e5 100644 --- a/src/queue.cpp +++ b/src/queue.cpp @@ -100,11 +100,11 @@ MessageList Queue::consume_batch(size_t max_batch_size) const { MessageList Queue::consume_batch(size_t max_batch_size, milliseconds timeout) const { vector raw_messages(max_batch_size); - ssize_t num_messages = rd_kafka_consume_batch_queue(handle_.get(), - static_cast(timeout.count()), - raw_messages.data(), - raw_messages.size()); - if (num_messages == -1) { + ssize_t result = rd_kafka_consume_batch_queue(handle_.get(), + static_cast(timeout.count()), + raw_messages.data(), + raw_messages.size()); + if (result == -1) { rd_kafka_resp_err_t error = rd_kafka_last_error(); if (error != RD_KAFKA_RESP_ERR_NO_ERROR) { throw QueueException(error); @@ -112,7 +112,7 @@ MessageList Queue::consume_batch(size_t max_batch_size, milliseconds timeout) co return MessageList(); } // Build message list - return MessageList(raw_messages.begin(), raw_messages.end()); + return MessageList(raw_messages.begin(), raw_messages.begin() + result); } } //cppkafka diff --git a/src/utils/roundrobin_poll_adapter.cpp b/src/utils/roundrobin_poll_adapter.cpp index 10b2d7b..158339d 100644 --- a/src/utils/roundrobin_poll_adapter.cpp +++ b/src/utils/roundrobin_poll_adapter.cpp @@ -80,20 +80,21 @@ Message RoundRobinPollAdapter::poll() { } Message RoundRobinPollAdapter::poll(milliseconds timeout) { - size_t num_queues = partition_queues_.get_queues().size(); // Always give priority to group and global events - Message message = consumer_queue_.consume(num_queues ? milliseconds(0) : timeout); - if (!message) { - while (num_queues--) { - //consume the next partition - message = partition_queues_.get_next_queue().consume(); - if (message) { - return message; - } + Message message = consumer_queue_.consume(milliseconds(0)); + if (message) { + return message; + } + size_t num_queues = partition_queues_.get_queues().size(); + while (num_queues--) { + //consume the next partition (non-blocking) + message = partition_queues_.get_next_queue().consume(milliseconds(0)); + if (message) { + return message; } } - // wait on the next queue - return partition_queues_.get_next_queue().consume(timeout); + // We still don't have a valid message so we block on the event queue + return consumer_queue_.consume(timeout); } MessageList RoundRobinPollAdapter::poll_batch(size_t max_batch_size) { @@ -101,34 +102,39 @@ MessageList RoundRobinPollAdapter::poll_batch(size_t max_batch_size) { } MessageList RoundRobinPollAdapter::poll_batch(size_t max_batch_size, milliseconds timeout) { - size_t num_queues = partition_queues_.get_queues().size(); + MessageList messages; ssize_t count = max_batch_size; - // batch from the group event queue first - MessageList messages = consumer_queue_.consume_batch(count, num_queues ? milliseconds(0) : timeout); - count -= messages.size(); + + // batch from the group event queue first (non-blocking) + consume_batch(consumer_queue_, messages, count, milliseconds(0)); + size_t num_queues = partition_queues_.get_queues().size(); while ((count > 0) && (num_queues--)) { - // batch from the next partition - consume_batch(messages, count, milliseconds(0)); + // batch from the next partition (non-blocking) + consume_batch(partition_queues_.get_next_queue(), messages, count, milliseconds(0)); } + // we still have space left in the buffer if (count > 0) { - // wait on the next queue - consume_batch(messages, count, timeout); + // wait on the event queue until timeout + consume_batch(consumer_queue_, messages, count, timeout); } return messages; } -void RoundRobinPollAdapter::consume_batch(MessageList& messages, ssize_t& count, milliseconds timeout) +void RoundRobinPollAdapter::consume_batch(Queue& queue, + MessageList& messages, + ssize_t& count, + milliseconds timeout) { - MessageList partition_messages = partition_queues_.get_next_queue().consume_batch(count, timeout); - if (partition_messages.empty()) { + MessageList queue_messages = queue.consume_batch(count, timeout); + if (queue_messages.empty()) { return; } // concatenate both lists messages.insert(messages.end(), - make_move_iterator(partition_messages.begin()), - make_move_iterator(partition_messages.end())); + make_move_iterator(queue_messages.begin()), + make_move_iterator(queue_messages.end())); // reduce total batch count - count -= partition_messages.size(); + count -= queue_messages.size(); } void RoundRobinPollAdapter::on_assignment(TopicPartitionList& partitions) { @@ -148,12 +154,10 @@ void RoundRobinPollAdapter::on_assignment(TopicPartitionList& partitions) { void RoundRobinPollAdapter::on_revocation(const TopicPartitionList& partitions) { for (const auto& partition : partitions) { // get the queue associated with this partition - auto qit = partition_queues_.get_queues().find(partition); - if (qit != partition_queues_.get_queues().end()) { - // restore forwarding on this queue - qit->second.forward_to_queue(consumer_queue_); + auto toppar_it = partition_queues_.get_queues().find(partition); + if (toppar_it != partition_queues_.get_queues().end()) { // remove this queue from the list - partition_queues_.get_queues().erase(qit); + partition_queues_.get_queues().erase(toppar_it); } } // reset the queue iterator @@ -174,8 +178,8 @@ void RoundRobinPollAdapter::on_rebalance_error(Error error) { void RoundRobinPollAdapter::restore_forwarding() { // forward all partition queues - for (const auto& toppar_queue : partition_queues_.get_queues()) { - toppar_queue.second.forward_to_queue(consumer_queue_); + for (const auto& toppar : partition_queues_.get_queues()) { + toppar.second.forward_to_queue(consumer_queue_); } }