Fixes to queue polling and making them non-owning

This commit is contained in:
accelerated
2018-05-02 15:31:34 -04:00
parent 65f35dcd39
commit 169ea4f8ed
4 changed files with 52 additions and 45 deletions

View File

@@ -164,7 +164,10 @@ public:
MessageList poll_batch(size_t max_batch_size, std::chrono::milliseconds timeout);
private:
void consume_batch(MessageList& messages, ssize_t& count, std::chrono::milliseconds timeout);
void consume_batch(Queue& queue,
MessageList& messages,
ssize_t& count,
std::chrono::milliseconds timeout);
class CircularBuffer {
public:

View File

@@ -249,23 +249,23 @@ MessageList Consumer::poll_batch(size_t max_batch_size, milliseconds timeout) {
// on the off-chance that check_error() does not throw an error
return MessageList();
}
return MessageList(raw_messages.begin(), raw_messages.end());
return MessageList(raw_messages.begin(), raw_messages.begin() + result);
}
Queue Consumer::get_main_queue() const {
Queue queue(rd_kafka_queue_get_main(get_handle()));
Queue queue(Queue::make_non_owning(rd_kafka_queue_get_main(get_handle())));
queue.disable_queue_forwarding();
return queue;
}
Queue Consumer::get_consumer_queue() const {
return rd_kafka_queue_get_consumer(get_handle());
return Queue::make_non_owning(rd_kafka_queue_get_consumer(get_handle()));
}
Queue Consumer::get_partition_queue(const TopicPartition& partition) const {
Queue queue(rd_kafka_queue_get_partition(get_handle(),
partition.get_topic().c_str(),
partition.get_partition()));
Queue queue(Queue::make_non_owning(rd_kafka_queue_get_partition(get_handle(),
partition.get_topic().c_str(),
partition.get_partition())));
queue.disable_queue_forwarding();
return queue;
}

View File

@@ -100,11 +100,11 @@ MessageList Queue::consume_batch(size_t max_batch_size) const {
MessageList Queue::consume_batch(size_t max_batch_size, milliseconds timeout) const {
vector<rd_kafka_message_t*> raw_messages(max_batch_size);
ssize_t num_messages = rd_kafka_consume_batch_queue(handle_.get(),
static_cast<int>(timeout.count()),
raw_messages.data(),
raw_messages.size());
if (num_messages == -1) {
ssize_t result = rd_kafka_consume_batch_queue(handle_.get(),
static_cast<int>(timeout.count()),
raw_messages.data(),
raw_messages.size());
if (result == -1) {
rd_kafka_resp_err_t error = rd_kafka_last_error();
if (error != RD_KAFKA_RESP_ERR_NO_ERROR) {
throw QueueException(error);
@@ -112,7 +112,7 @@ MessageList Queue::consume_batch(size_t max_batch_size, milliseconds timeout) co
return MessageList();
}
// Build message list
return MessageList(raw_messages.begin(), raw_messages.end());
return MessageList(raw_messages.begin(), raw_messages.begin() + result);
}
} //cppkafka

View File

@@ -80,20 +80,21 @@ Message RoundRobinPollAdapter::poll() {
}
Message RoundRobinPollAdapter::poll(milliseconds timeout) {
size_t num_queues = partition_queues_.get_queues().size();
// Always give priority to group and global events
Message message = consumer_queue_.consume(num_queues ? milliseconds(0) : timeout);
if (!message) {
while (num_queues--) {
//consume the next partition
message = partition_queues_.get_next_queue().consume();
if (message) {
return message;
}
Message message = consumer_queue_.consume(milliseconds(0));
if (message) {
return message;
}
size_t num_queues = partition_queues_.get_queues().size();
while (num_queues--) {
//consume the next partition (non-blocking)
message = partition_queues_.get_next_queue().consume(milliseconds(0));
if (message) {
return message;
}
}
// wait on the next queue
return partition_queues_.get_next_queue().consume(timeout);
// We still don't have a valid message so we block on the event queue
return consumer_queue_.consume(timeout);
}
MessageList RoundRobinPollAdapter::poll_batch(size_t max_batch_size) {
@@ -101,34 +102,39 @@ MessageList RoundRobinPollAdapter::poll_batch(size_t max_batch_size) {
}
MessageList RoundRobinPollAdapter::poll_batch(size_t max_batch_size, milliseconds timeout) {
size_t num_queues = partition_queues_.get_queues().size();
MessageList messages;
ssize_t count = max_batch_size;
// batch from the group event queue first
MessageList messages = consumer_queue_.consume_batch(count, num_queues ? milliseconds(0) : timeout);
count -= messages.size();
// batch from the group event queue first (non-blocking)
consume_batch(consumer_queue_, messages, count, milliseconds(0));
size_t num_queues = partition_queues_.get_queues().size();
while ((count > 0) && (num_queues--)) {
// batch from the next partition
consume_batch(messages, count, milliseconds(0));
// batch from the next partition (non-blocking)
consume_batch(partition_queues_.get_next_queue(), messages, count, milliseconds(0));
}
// we still have space left in the buffer
if (count > 0) {
// wait on the next queue
consume_batch(messages, count, timeout);
// wait on the event queue until timeout
consume_batch(consumer_queue_, messages, count, timeout);
}
return messages;
}
void RoundRobinPollAdapter::consume_batch(MessageList& messages, ssize_t& count, milliseconds timeout)
void RoundRobinPollAdapter::consume_batch(Queue& queue,
MessageList& messages,
ssize_t& count,
milliseconds timeout)
{
MessageList partition_messages = partition_queues_.get_next_queue().consume_batch(count, timeout);
if (partition_messages.empty()) {
MessageList queue_messages = queue.consume_batch(count, timeout);
if (queue_messages.empty()) {
return;
}
// concatenate both lists
messages.insert(messages.end(),
make_move_iterator(partition_messages.begin()),
make_move_iterator(partition_messages.end()));
make_move_iterator(queue_messages.begin()),
make_move_iterator(queue_messages.end()));
// reduce total batch count
count -= partition_messages.size();
count -= queue_messages.size();
}
void RoundRobinPollAdapter::on_assignment(TopicPartitionList& partitions) {
@@ -148,12 +154,10 @@ void RoundRobinPollAdapter::on_assignment(TopicPartitionList& partitions) {
void RoundRobinPollAdapter::on_revocation(const TopicPartitionList& partitions) {
for (const auto& partition : partitions) {
// get the queue associated with this partition
auto qit = partition_queues_.get_queues().find(partition);
if (qit != partition_queues_.get_queues().end()) {
// restore forwarding on this queue
qit->second.forward_to_queue(consumer_queue_);
auto toppar_it = partition_queues_.get_queues().find(partition);
if (toppar_it != partition_queues_.get_queues().end()) {
// remove this queue from the list
partition_queues_.get_queues().erase(qit);
partition_queues_.get_queues().erase(toppar_it);
}
}
// reset the queue iterator
@@ -174,8 +178,8 @@ void RoundRobinPollAdapter::on_rebalance_error(Error error) {
void RoundRobinPollAdapter::restore_forwarding() {
// forward all partition queues
for (const auto& toppar_queue : partition_queues_.get_queues()) {
toppar_queue.second.forward_to_queue(consumer_queue_);
for (const auto& toppar : partition_queues_.get_queues()) {
toppar.second.forward_to_queue(consumer_queue_);
}
}