diff --git a/include/cppkafka/consumer.h b/include/cppkafka/consumer.h
index 98ae3c9..c7a42e6 100644
--- a/include/cppkafka/consumer.h
+++ b/include/cppkafka/consumer.h
@@ -15,7 +15,7 @@ class TopicConfiguration;
 
 class Consumer : public KafkaHandleBase {
 public:
-    using AssignmentCallback = std::function<void(const TopicPartitionList&)>;
+    using AssignmentCallback = std::function<void(TopicPartitionList&)>;
     using RevocationCallback = std::function<void(const TopicPartitionList&)>;
     using RebalanceErrorCallback = std::function<void(rd_kafka_resp_err_t)>;
 
diff --git a/src/consumer.cpp b/src/consumer.cpp
index c065322..df0b828 100644
--- a/src/consumer.cpp
+++ b/src/consumer.cpp
@@ -26,6 +26,7 @@ Consumer::Consumer(Configuration config) {
     char error_buffer[512];
     // Set ourselves as the opaque pointer
     rd_kafka_conf_set_opaque(config.get_handle(), this);
+    rd_kafka_conf_set_rebalance_cb(config.get_handle(), &Consumer::rebalance_proxy);
     rd_kafka_t* ptr = rd_kafka_new(RD_KAFKA_CONSUMER,
                                    rd_kafka_conf_dup(config.get_handle()),
                                    error_buffer, sizeof(error_buffer));
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 866b41a..88fb872 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -14,6 +14,7 @@ endmacro()
 
 add_definitions("-DKAFKA_TEST_INSTANCE=\"${KAFKA_TEST_INSTANCE}\"")
 
+create_test(consumer)
 create_test(producer)
 create_test(kafka_handle_base)
 create_test(topic_partition_list)
diff --git a/tests/consumer_test.cpp b/tests/consumer_test.cpp
new file mode 100644
index 0000000..03971e2
--- /dev/null
+++ b/tests/consumer_test.cpp
@@ -0,0 +1,183 @@
+#include <set>
+#include <mutex>
+#include <chrono>
+#include <thread>
+#include <vector>
+#include <condition_variable>
+#include <gtest/gtest.h>
+#include "cppkafka/consumer.h"
+#include "cppkafka/producer.h"
+
+using std::vector;
+using std::move;
+using std::string;
+using std::thread;
+using std::set;
+using std::mutex;
+using std::condition_variable;
+using std::lock_guard;
+using std::unique_lock;
+using std::chrono::seconds;
+using std::chrono::milliseconds;
+using std::chrono::system_clock;
+
+using namespace cppkafka;
+
+class ConsumerRunner {
+public:
+    ConsumerRunner(Consumer& consumer, size_t expected, size_t partitions)
+    : consumer_(consumer) {
+        bool booted = false;
+        mutex mtx;
+        condition_variable cond;
+        // Poll on a background thread until either the expected number of
+        // messages has arrived or a 10 second deadline expires
+        thread_ = thread([&, expected, partitions]() {
+            consumer_.set_timeout(milliseconds(500));
+            size_t number_eofs = 0;
+            auto start = system_clock::now();
+            while (system_clock::now() - start < seconds(10) &&
+                   messages_.size() < expected) {
+                Message msg = consumer_.poll();
+                if (msg && number_eofs != partitions &&
+                    msg.get_error() == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
+                    number_eofs++;
+                    if (number_eofs == partitions) {
+                        lock_guard<mutex> _(mtx);
+                        booted = true;
+                        cond.notify_one();
+                    }
+                }
+                else if (msg && msg.get_error() == 0) {
+                    messages_.push_back(move(msg));
+                }
+            }
+        });
+
+        // Block until the consumer has seen EOF on every partition, meaning
+        // it has been assigned its partitions and has caught up
+        unique_lock<mutex> lock(mtx);
+        while (!booted) {
+            cond.wait(lock);
+        }
+    }
+
+    ConsumerRunner(const ConsumerRunner&) = delete;
+    ConsumerRunner& operator=(const ConsumerRunner&) = delete;
+
+    ~ConsumerRunner() {
+        try_join();
+    }
+
+    const std::vector<Message>& get_messages() const {
+        return messages_;
+    }
+
+    void try_join() {
+        if (thread_.joinable()) {
+            thread_.join();
+        }
+    }
+private:
+    Consumer& consumer_;
+    thread thread_;
+    std::vector<Message> messages_;
+};
+
+class ConsumerTest : public testing::Test {
+public:
+    static const string KAFKA_TOPIC;
+
+    Configuration make_producer_config() {
+        Configuration config;
+        config.set("metadata.broker.list", KAFKA_TEST_INSTANCE);
+        return config;
+    }
+
+    Configuration make_consumer_config() {
+        Configuration config;
+        config.set("metadata.broker.list", KAFKA_TEST_INSTANCE);
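+        // Offsets are not auto-committed and every test consumer shares the
+        // same group id, so the two consumers in the rebalance test below
+        // end up in one consumer group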
+        config.set("enable.auto.commit", "false");
+        config.set("group.id", "consumer_test");
+        return config;
+    }
+};
+
+const string ConsumerTest::KAFKA_TOPIC = "cppkafka_test1";
+
+TEST_F(ConsumerTest, AssignmentCallback) {
+    vector<TopicPartition> assignment;
+    int partition = 0;
+
+    // Create a consumer and subscribe to the topic
+    Consumer consumer(make_consumer_config());
+    consumer.set_assignment_callback([&](const vector<TopicPartition>& topic_partitions) {
+        assignment = topic_partitions;
+    });
+    consumer.subscribe({ KAFKA_TOPIC });
+    ConsumerRunner runner(consumer, 1, 3);
+
+    // Produce a message just so we stop the consumer
+    Producer producer(make_producer_config());
+    Topic topic = producer.get_topic(KAFKA_TOPIC);
+    string payload = "Hello world!";
+    producer.produce(topic, partition, Buffer(payload.data(), payload.size()));
+    runner.try_join();
+
+    // All 3 partitions should be ours
+    EXPECT_EQ(3, assignment.size());
+    set<int> partitions = { 0, 1, 2 };
+    for (const auto& topic_partition : assignment) {
+        EXPECT_EQ(KAFKA_TOPIC, topic_partition.get_topic());
+        EXPECT_TRUE(partitions.erase(topic_partition.get_partition()));
+    }
+    EXPECT_EQ(1, runner.get_messages().size());
+
+    assignment = consumer.get_assignment();
+    EXPECT_EQ(3, assignment.size());
+}
+
+TEST_F(ConsumerTest, Rebalance) {
+    vector<TopicPartition> assignment1;
+    vector<TopicPartition> assignment2;
+    bool revocation_called = false;
+    int partition = 0;
+
+    // Create a consumer and subscribe to the topic
+    Consumer consumer1(make_consumer_config());
+    consumer1.set_assignment_callback([&](const vector<TopicPartition>& topic_partitions) {
+        assignment1 = topic_partitions;
+    });
+    consumer1.set_revocation_callback([&](const vector<TopicPartition>&) {
+        revocation_called = true;
+    });
+    consumer1.subscribe({ KAFKA_TOPIC });
+    ConsumerRunner runner1(consumer1, 1, 3);
+
+    // Create a second consumer and subscribe to the same topic; this forces
+    // a rebalance on the first one
+    Consumer consumer2(make_consumer_config());
+    consumer2.set_assignment_callback([&](const vector<TopicPartition>& topic_partitions) {
+        assignment2 = topic_partitions;
+    });
+    consumer2.subscribe({ KAFKA_TOPIC });
+    ConsumerRunner runner2(consumer2, 1, 1);
+
+    EXPECT_TRUE(revocation_called);
+
+    // Produce a message just so we stop the consumers
+    Producer producer(make_producer_config());
+    Topic topic = producer.get_topic(KAFKA_TOPIC);
+    string payload = "Hello world!";
+    producer.produce(topic, partition, Buffer(payload.data(), payload.size()));
+    runner1.try_join();
+    runner2.try_join();
+
+    // All 3 partitions should be assigned across the two consumers
+    EXPECT_EQ(3, assignment1.size() + assignment2.size());
+    set<int> partitions = { 0, 1, 2 };
+    for (const auto& topic_partition : assignment1) {
+        EXPECT_EQ(KAFKA_TOPIC, topic_partition.get_topic());
+        EXPECT_TRUE(partitions.erase(topic_partition.get_partition()));
+    }
+    for (const auto& topic_partition : assignment2) {
+        EXPECT_EQ(KAFKA_TOPIC, topic_partition.get_topic());
+        EXPECT_TRUE(partitions.erase(topic_partition.get_partition()));
+    }
+    EXPECT_EQ(1, runner1.get_messages().size() + runner2.get_messages().size());
+}
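For reference, below is a minimal sketch of how the rebalance hooks exercised by these tests might be used from application code. It is illustrative only and not part of the patch: the broker address, group id, and topic name are placeholders, and it assumes the callback signatures implied by the tests above (the assignment and revocation callbacks receive the affected TopicPartition list, and poll() returns a Message whose get_error() exposes the raw librdkafka error code).

// Illustrative sketch only -- not part of the patch.
#include <iostream>
#include <vector>
#include "cppkafka/consumer.h"

using namespace cppkafka;

int main() {
    Configuration config;
    config.set("metadata.broker.list", "kafka-server:9092"); // placeholder broker
    config.set("group.id", "example_group");                 // placeholder group id
    config.set("enable.auto.commit", "false");

    Consumer consumer(config);

    // Fired from within poll() once the group coordinator assigns partitions
    // to this member (routed through the rebalance callback that this patch
    // registers in Consumer's constructor)
    consumer.set_assignment_callback([](const std::vector<TopicPartition>& partitions) {
        std::cout << "Assigned " << partitions.size() << " partition(s)" << std::endl;
    });

    // Fired before this member's partitions are taken away during a rebalance
    consumer.set_revocation_callback([](const std::vector<TopicPartition>&) {
        std::cout << "Partitions revoked" << std::endl;
    });

    consumer.subscribe({ "cppkafka_test1" }); // placeholder topic

    // Rebalance callbacks are dispatched from inside poll()
    for (int i = 0; i < 100; ++i) {
        Message msg = consumer.poll();
        if (msg && msg.get_error() == 0) {
            std::cout << "Got a message" << std::endl;
        }
    }
    return 0;
}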