Add ConsumerDispatcher class

This commit is contained in:
Matias Fontanini
2017-06-10 15:25:28 -07:00
parent 67022dbb68
commit f0ec0bfb10
6 changed files with 331 additions and 18 deletions

View File

@@ -2,6 +2,7 @@
#include <chrono>
#include <condition_variable>
#include "test_utils.h"
#include "cppkafka/utils/consumer_dispatcher.h"
using std::vector;
using std::move;
@@ -16,7 +17,9 @@ using std::chrono::milliseconds;
using std::chrono::seconds;
using cppkafka::Consumer;
using cppkafka::ConsumerDispatcher;
using cppkafka::Message;
using cppkafka::TopicPartition;
ConsumerRunner::ConsumerRunner(Consumer& consumer, size_t expected, size_t partitions)
: consumer_(consumer) {
@@ -27,27 +30,38 @@ ConsumerRunner::ConsumerRunner(Consumer& consumer, size_t expected, size_t parti
consumer_.set_timeout(milliseconds(500));
size_t number_eofs = 0;
auto start = system_clock::now();
while (system_clock::now() - start < seconds(20)) {
if (expected > 0 && messages_.size() == expected) {
break;
}
if (expected == 0 && number_eofs >= partitions) {
break;
}
Message msg = consumer_.poll();
if (msg && number_eofs != partitions &&
msg.get_error() == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
number_eofs++;
ConsumerDispatcher dispatcher(consumer_);
dispatcher.run(
// Message callback
[&](Message msg) {
if (number_eofs == partitions) {
lock_guard<mutex> _(mtx);
booted = true;
cond.notify_one();
messages_.push_back(move(msg));
}
},
// EOF callback
[&](const TopicPartition& topic_partition) {
if (number_eofs != partitions) {
number_eofs++;
if (number_eofs == partitions) {
lock_guard<mutex> _(mtx);
booted = true;
cond.notify_one();
}
}
},
// Timeout callback
[&]() {
if (expected > 0 && messages_.size() == expected) {
dispatcher.stop();
}
if (expected == 0 && number_eofs >= partitions) {
dispatcher.stop();
}
if (system_clock::now() - start >= seconds(20)) {
dispatcher.stop();
}
}
else if (msg && !msg.get_error() && number_eofs == partitions) {
messages_.push_back(move(msg));
}
}
);
if (number_eofs < partitions) {
lock_guard<mutex> _(mtx);
booted = true;