Files
wlan-cloud-lib-cppkafka/tests/consumer_test.cpp
Matias Fontanini cb2c8877d8 Move tests to use catch instead of googletest (#56)
* Port buffer test to use Catch2

* Move compacted topic processor test to Catch2

* Move configuration tests to Catch2

* Rename configuration test cases

* Move topic partition list test to Catch2

* Move handle base tests to Catch2

* Move producer tests to Catch2

* Move consumer tests to catch2

* Use CHECK on tests when appropriate

* Remove googletest

* Show tests' progress as they run

* Update message when Catch2 is not checked out

* Remove references to googletest

* Run cppkafka_tests manually on travis

* Print amount of time taken by each test case
2018-04-24 03:20:48 +01:00

243 lines
8.0 KiB
C++

#include <vector>
#include <thread>
#include <set>
#include <mutex>
#include <chrono>
#include <iterator>
#include <condition_variable>
#include <catch.hpp>
#include "cppkafka/consumer.h"
#include "cppkafka/producer.h"
#include "cppkafka/utils/consumer_dispatcher.h"
#include "cppkafka/utils/buffered_producer.h"
#include "test_utils.h"
using std::vector;
using std::move;
using std::string;
using std::thread;
using std::set;
using std::mutex;
using std::tie;
using std::condition_variable;
using std::lock_guard;
using std::unique_lock;
using std::make_move_iterator;
using std::chrono::seconds;
using std::chrono::milliseconds;
using std::chrono::system_clock;
using namespace cppkafka;
// Topic that every test case in this file produces to / consumes from;
// assumed to exist on the test broker with 3 partitions (see assertions below)
const string KAFKA_TOPIC = "cppkafka_test1";
// Builds the minimal producer configuration: just the broker list pointing
// at the test instance (KAFKA_TEST_INSTANCE comes from test_utils.h).
static Configuration make_producer_config() {
    Configuration output;
    output.set("metadata.broker.list", KAFKA_TEST_INSTANCE);
    return output;
}
// Builds a consumer configuration for the test broker. Auto-commit is
// disabled so each test controls exactly when (and whether) offsets are
// committed. The group id defaults to a shared value but individual tests
// may pass their own to isolate their committed offsets.
static Configuration make_consumer_config(const string& group_id = "consumer_test") {
    Configuration output;
    output.set("metadata.broker.list", KAFKA_TEST_INSTANCE);
    output.set("group.id", group_id);
    output.set("enable.auto.commit", false);
    return output;
}
// End-to-end consumption test: a lone subscribed consumer should be assigned
// all 3 partitions, receive the single produced message, and report watermark
// offsets consistent with that message's offset.
TEST_CASE("message consumption", "[consumer]") {
    TopicPartitionList assignment;
    int partition = 0;

    // Create a consumer and subscribe to the topic, capturing the partition
    // assignment handed out during the group rebalance
    Consumer consumer(make_consumer_config());
    consumer.set_assignment_callback([&](const TopicPartitionList& topic_partitions) {
        assignment = topic_partitions;
    });
    consumer.subscribe({ KAFKA_TOPIC });
    // Polls on a background thread; presumably ConsumerRunner(consumer,
    // expected_messages, partitions) — confirm against test_utils.h
    ConsumerRunner runner(consumer, 1, 3);

    // Produce a message just so we stop the consumer
    Producer producer(make_producer_config());
    string payload = "Hello world!";
    producer.produce(MessageBuilder(KAFKA_TOPIC).partition(partition).payload(payload));
    runner.try_join();

    // All 3 partitions should be ours
    REQUIRE(assignment.size() == 3);
    set<int> partitions = { 0, 1, 2 };
    for (const auto& topic_partition : assignment) {
        CHECK(topic_partition.get_topic() == KAFKA_TOPIC);
        // set::erase returns 1 only the first time a partition id is seen,
        // so this also rules out duplicate partitions in the assignment
        CHECK(partitions.erase(topic_partition.get_partition()) == true);
    }
    REQUIRE(runner.get_messages().size() == 1);

    CHECK(consumer.get_subscription() == vector<string>{ KAFKA_TOPIC });

    assignment = consumer.get_assignment();
    CHECK(assignment.size() == 3);

    // The consumed message must sit exactly one below the high watermark
    // of its partition
    int64_t low;
    int64_t high;
    tie(low, high) = consumer.get_offsets({ KAFKA_TOPIC, partition });
    CHECK(high > low);
    CHECK(runner.get_messages().back().get_offset() + 1 == high);
}
// Rebalance test: when a second consumer joins the same group, the first
// consumer's partitions must be revoked and the 3 partitions redistributed
// between the two consumers with no overlap.
TEST_CASE("consumer rebalance", "[consumer]") {
    TopicPartitionList assignment1;
    TopicPartitionList assignment2;
    bool revocation_called = false;
    int partition = 0;

    // Create a consumer and subscribe to the topic
    Consumer consumer1(make_consumer_config());
    consumer1.set_assignment_callback([&](const TopicPartitionList& topic_partitions) {
        assignment1 = topic_partitions;
    });
    consumer1.set_revocation_callback([&](const TopicPartitionList&) {
        revocation_called = true;
    });
    consumer1.subscribe({ KAFKA_TOPIC });
    ConsumerRunner runner1(consumer1, 1, 3);

    // Create a second consumer and subscribe to the topic; joining the same
    // group triggers a rebalance, which must revoke consumer1's assignment
    Consumer consumer2(make_consumer_config());
    consumer2.set_assignment_callback([&](const TopicPartitionList& topic_partitions) {
        assignment2 = topic_partitions;
    });
    consumer2.subscribe({ KAFKA_TOPIC });
    ConsumerRunner runner2(consumer2, 1, 1);
    CHECK(revocation_called == true);

    // Produce a message just so we stop the consumer
    Producer producer(make_producer_config());
    string payload = "Hello world!";
    producer.produce(MessageBuilder(KAFKA_TOPIC).partition(partition).payload(payload));
    runner1.try_join();
    runner2.try_join();

    // All 3 partitions should be assigned
    CHECK(assignment1.size() + assignment2.size() == 3);
    // Each partition id must appear exactly once across both assignments:
    // erase returns 1 only on first removal, catching duplicates/overlap
    set<int> partitions = { 0, 1, 2 };
    for (const auto& topic_partition : assignment1) {
        CHECK(topic_partition.get_topic() == KAFKA_TOPIC);
        CHECK(partitions.erase(topic_partition.get_partition()) == true);
    }
    for (const auto& topic_partition : assignment2) {
        CHECK(topic_partition.get_topic() == KAFKA_TOPIC);
        CHECK(partitions.erase(topic_partition.get_partition()) == true);
    }
    // Only one message was produced, so exactly one of the runners saw it
    CHECK(runner1.get_messages().size() + runner2.get_messages().size() == 1);
}
// Offset-commit test: manually committing a consumed message must fire the
// offset_commit callback with that message's offset + 1 (the next offset to
// be consumed, per Kafka commit semantics).
TEST_CASE("consumer offset commit", "[consumer]") {
    int partition = 0;
    int64_t message_offset = 0;
    bool offset_commit_called = false;

    // Create a consumer with a dedicated group id so commits from other
    // tests don't interfere, and install the commit-confirmation callback
    Configuration config = make_consumer_config("offset_commit");
    config.set_offset_commit_callback([&](Consumer&, Error error,
                                          const TopicPartitionList& topic_partitions) {
        offset_commit_called = true;
        CHECK(!!error == false);
        REQUIRE(topic_partitions.size() == 1);
        CHECK(topic_partitions[0].get_topic() == KAFKA_TOPIC);
        CHECK(topic_partitions[0].get_partition() == 0);
        // Committed offset is always one past the consumed message
        CHECK(topic_partitions[0].get_offset() == message_offset + 1);
    });
    Consumer consumer(config);
    // Assign directly (no subscription/rebalance) to partition 0
    consumer.assign({ { KAFKA_TOPIC, 0 } });
    ConsumerRunner runner(consumer, 1, 1);

    // Produce a message just so we stop the consumer
    Producer producer(make_producer_config());
    string payload = "Hello world!";
    producer.produce(MessageBuilder(KAFKA_TOPIC).partition(partition).payload(payload));
    runner.try_join();

    REQUIRE(runner.get_messages().size() == 1);
    const Message& msg = runner.get_messages()[0];
    message_offset = msg.get_offset();
    consumer.commit(msg);
    // The commit is asynchronous: poll a few times to give librdkafka a
    // chance to deliver the offset_commit callback
    for (size_t i = 0; i < 3 && !offset_commit_called; ++i) {
        consumer.poll();
    }
    CHECK(offset_commit_called == true);
}
// Throttle test: a single produced message should trigger the dispatcher
// callback three times — returning a non-empty Message from the callback
// appears to re-queue it for redelivery (throttling), while returning an
// empty Message consumes it for good. Confirm against ConsumerDispatcher docs.
TEST_CASE("consumer throttle", "[consumer]") {
    int partition = 0;

    // Create a consumer and assign it directly to partition 0. Use a group
    // id specific to this test: the original "offset_commit" was a copy/paste
    // leftover from the offset-commit test above, and sharing that group's
    // committed offsets couples the two tests through broker state.
    Configuration config = make_consumer_config("consumer_throttle");
    Consumer consumer(config);
    consumer.assign({ { KAFKA_TOPIC, 0 } });
    {
        // Drain the partition so only the message produced below is seen
        ConsumerRunner runner(consumer, 0, 1);
        runner.try_join();
    }

    // Produce one message for the dispatcher to (repeatedly) deliver
    BufferedProducer<string> producer(make_producer_config());
    string payload = "Hello world!";
    producer.produce(MessageBuilder(KAFKA_TOPIC).partition(partition).payload(payload));
    producer.flush();

    size_t callback_executed_count = 0;
    ConsumerDispatcher dispatcher(consumer);
    dispatcher.run(
        // Message/throttle callback: hand the message back twice, then
        // return an empty Message on the third call to stop redelivery
        [&](Message msg) {
            callback_executed_count++;
            if (callback_executed_count == 3) {
                return Message();
            }
            // move is required: msg is a by-value (move-only) parameter
            return move(msg);
        },
        // Timeout callback: stop dispatching once the message has been
        // seen 3 times
        [&](ConsumerDispatcher::Timeout) {
            if (callback_executed_count == 3) {
                dispatcher.stop();
            }
        }
    );
    CHECK(callback_executed_count == 3);
}
// Batch-consumption test: two messages produced to one partition must both
// come back through poll_batch, in order, with the expected payload.
TEST_CASE("consume batch", "[consumer]") {
    int partition = 0;

    // Create a consumer assigned directly to partition 0 of the test topic
    Configuration config = make_consumer_config("test");
    Consumer consumer(config);
    consumer.assign({ { KAFKA_TOPIC, 0 } });
    {
        // Drain anything already in the partition so the polls below only
        // see the messages produced by this test
        ConsumerRunner runner(consumer, 0, 1);
        runner.try_join();
    }

    // Publish the same payload twice so poll_batch has two messages to fetch
    BufferedProducer<string> producer(make_producer_config());
    string payload = "Hello world!";
    producer.produce(MessageBuilder(KAFKA_TOPIC).partition(partition).payload(payload));
    producer.produce(MessageBuilder(KAFKA_TOPIC).partition(partition).payload(payload));
    producer.flush();

    // Poll in batches (at most 5 attempts) until both messages arrive;
    // Message is move-only, so splice each batch in with move iterators
    vector<Message> collected;
    for (int attempt = 0; attempt < 5 && collected.size() != 2; ++attempt) {
        vector<Message> batch = consumer.poll_batch(2);
        collected.insert(collected.end(), make_move_iterator(batch.begin()),
                         make_move_iterator(batch.end()));
    }
    REQUIRE(collected.size() == 2);
    CHECK(collected[0].get_payload() == payload);
    CHECK(collected[1].get_payload() == payload);
}