changes as per code review

This commit is contained in:
accelerated
2018-05-01 14:49:09 -04:00
parent 71afaba3e1
commit a1ce130bfd
7 changed files with 110 additions and 111 deletions

View File

@@ -30,7 +30,6 @@
#include "utils/roundrobin_poll_adapter.h"
using std::string;
using std::make_pair;
using std::chrono::milliseconds;
using std::make_move_iterator;
@@ -38,31 +37,31 @@ namespace cppkafka {
RoundRobinPollAdapter::RoundRobinPollAdapter(Consumer& consumer)
: consumer_(consumer),
assignment_callback_(consumer.get_assignment_callback()),
revocation_callback_(consumer.get_revocation_callback()),
rebalance_error_callback_(consumer.get_rebalance_error_callback()),
consumer_queue_(consumer.get_consumer_queue()) {
// get all currently active partition assignments
TopicPartitionList assignment = consumer_.get_assignment();
on_assignment(assignment);
// take over the assignment callback
assignment_callback_ = consumer.get_assignment_callback();
consumer_.set_assignment_callback([this](TopicPartitionList& partitions) {
on_assignment(partitions);
});
// take over the revocation callback
revocation_callback_ = consumer.get_revocation_callback();
consumer_.set_revocation_callback([this](const TopicPartitionList& partitions) {
on_revocation(partitions);
});
// take over the rebalance error callback
rebalance_error_callback_ = consumer.get_rebalance_error_callback();
consumer_.set_rebalance_error_callback([this](Error error) {
on_rebalance_error(error);
});
// make sure we don't have any active subscriptions
if (!consumer_.get_subscription().empty()) {
throw ConsumerException(RD_KAFKA_RESP_ERR__EXISTING_SUBSCRIPTION);
}
}
RoundRobinPollAdapter::~RoundRobinPollAdapter() {
restore_forwarding();
//set the original callbacks
//reset the original callbacks
consumer_.set_assignment_callback(assignment_callback_);
consumer_.set_revocation_callback(revocation_callback_);
consumer_.set_rebalance_error_callback(rebalance_error_callback_);
@@ -81,17 +80,20 @@ Message RoundRobinPollAdapter::poll() {
}
Message RoundRobinPollAdapter::poll(milliseconds timeout) {
bool empty_list = partition_queues_.ref().empty();
// Poll group event queue first
Message message = consumer_queue_.consume(empty_list ? timeout : milliseconds(0));
if (message) {
return message;
size_t num_queues = partition_queues_.get_queues().size();
// Always give priority to group and global events
Message message = consumer_queue_.consume(num_queues ? milliseconds(0) : timeout);
if (!message) {
while (num_queues--) {
//consume the next partition
message = partition_queues_.get_next_queue().consume();
if (message) {
return message;
}
}
}
if (!empty_list) {
//consume the next partition
message = partition_queues_.next().consume(timeout);
}
return message;
// wait on the next queue
return partition_queues_.get_next_queue().consume(timeout);
}
MessageList RoundRobinPollAdapter::poll_batch(size_t max_batch_size) {
@@ -99,42 +101,42 @@ MessageList RoundRobinPollAdapter::poll_batch(size_t max_batch_size) {
}
MessageList RoundRobinPollAdapter::poll_batch(size_t max_batch_size, milliseconds timeout) {
bool empty_list = partition_queues_.ref().empty();
ssize_t remaining_count = max_batch_size;
size_t num_queues = partition_queues_.get_queues().size();
ssize_t count = max_batch_size;
// batch from the group event queue first
MessageList messages = consumer_queue_.consume_batch(remaining_count,
empty_list ? timeout : milliseconds(0));
remaining_count -= messages.size();
if ((remaining_count <= 0) || empty_list) {
// the entire batch was filled
return messages;
MessageList messages = consumer_queue_.consume_batch(count, num_queues ? milliseconds(0) : timeout);
count -= messages.size();
while ((count > 0) && (num_queues--)) {
// batch from the next partition
consume_batch(messages, count, milliseconds(0));
}
// batch from the next partition
MessageList partition_messages = partition_queues_.next().consume_batch(remaining_count, timeout);
if (messages.empty()) {
return partition_messages;
if (count > 0) {
// wait on the next queue
consume_batch(messages, count, timeout);
}
return messages;
}
void RoundRobinPollAdapter::consume_batch(MessageList& messages, ssize_t& count, milliseconds timeout)
{
MessageList partition_messages = partition_queues_.get_next_queue().consume_batch(count, timeout);
if (partition_messages.empty()) {
return messages;
return;
}
// concatenate both lists
messages.reserve(messages.size() + partition_messages.size());
messages.insert(messages.end(),
make_move_iterator(partition_messages.begin()),
make_move_iterator(partition_messages.end()));
return messages;
}
size_t RoundRobinPollAdapter::get_num_partitions() {
return partition_queues_.ref().size();
// reduce total batch count
count -= partition_messages.size();
}
void RoundRobinPollAdapter::on_assignment(TopicPartitionList& partitions) {
// populate partition queues
for (const auto& partition : partitions) {
// get the queue associated with this partition
CircularBuffer::toppar_t key = make_pair(partition.get_topic(), partition.get_partition());
partition_queues_.ref().emplace(key, consumer_.get_partition_queue(partition));
partition_queues_.get_queues().emplace(partition, consumer_.get_partition_queue(partition));
}
// reset the queue iterator
partition_queues_.rewind();
@@ -147,13 +149,12 @@ void RoundRobinPollAdapter::on_assignment(TopicPartitionList& partitions) {
void RoundRobinPollAdapter::on_revocation(const TopicPartitionList& partitions) {
for (const auto& partition : partitions) {
// get the queue associated with this partition
CircularBuffer::toppar_t key = make_pair(partition.get_topic(), partition.get_partition());
auto qit = partition_queues_.ref().find(key);
if (qit != partition_queues_.ref().end()) {
auto qit = partition_queues_.get_queues().find(partition);
if (qit != partition_queues_.get_queues().end()) {
// restore forwarding on this queue
qit->second.forward_to_queue(consumer_queue_);
// remove this queue from the list
partition_queues_.ref().erase(qit);
partition_queues_.get_queues().erase(qit);
}
}
// reset the queue iterator
@@ -174,7 +175,7 @@ void RoundRobinPollAdapter::on_rebalance_error(Error error) {
void RoundRobinPollAdapter::restore_forwarding() {
// forward all partition queues
for (const auto& toppar_queue : partition_queues_.ref()) {
for (const auto& toppar_queue : partition_queues_.get_queues()) {
toppar_queue.second.forward_to_queue(consumer_queue_);
}
}