diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 84e0e46..95750d9 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -19,6 +19,7 @@ add_executable(cppkafka_tests consumer_test.cpp roundrobin_poll_test.cpp headers_test.cpp + test_utils.cpp # Main file test_main.cpp diff --git a/tests/consumer_test.cpp b/tests/consumer_test.cpp index f02a12b..4baa896 100644 --- a/tests/consumer_test.cpp +++ b/tests/consumer_test.cpp @@ -35,7 +35,7 @@ static Configuration make_producer_config() { return config; } -static Configuration make_consumer_config(const string& group_id = "consumer_test") { +static Configuration make_consumer_config(const string& group_id = make_consumer_group_id()) { Configuration config; config.set("metadata.broker.list", KAFKA_TEST_INSTANCE); config.set("enable.auto.commit", false); @@ -85,11 +85,12 @@ TEST_CASE("message consumption", "[consumer]") { TEST_CASE("consumer rebalance", "[consumer]") { TopicPartitionList assignment1; TopicPartitionList assignment2; + const string group_id = make_consumer_group_id(); bool revocation_called = false; int partition = 0; // Create a consumer and subscribe to the topic - Consumer consumer1(make_consumer_config()); + Consumer consumer1(make_consumer_config(group_id)); consumer1.set_assignment_callback([&](const TopicPartitionList& topic_partitions) { assignment1 = topic_partitions; }); @@ -100,7 +101,7 @@ TEST_CASE("consumer rebalance", "[consumer]") { ConsumerRunner runner1(consumer1, 1, KAFKA_NUM_PARTITIONS); // Create a second consumer and subscribe to the topic - Consumer consumer2(make_consumer_config()); + Consumer consumer2(make_consumer_config(group_id)); consumer2.set_assignment_callback([&](const TopicPartitionList& topic_partitions) { assignment2 = topic_partitions; }); diff --git a/tests/producer_test.cpp b/tests/producer_test.cpp index bf4130c..693954c 100644 --- a/tests/producer_test.cpp +++ b/tests/producer_test.cpp @@ -42,7 +42,7 @@ static Configuration make_consumer_config() { Configuration config = { { "metadata.broker.list", KAFKA_TEST_INSTANCE }, { "enable.auto.commit", false }, - { "group.id", "producer_test" }, + { "group.id", make_consumer_group_id() }, { "api.version.request", true } }; return config; diff --git a/tests/roundrobin_poll_test.cpp b/tests/roundrobin_poll_test.cpp index 99414dd..22bde53 100644 --- a/tests/roundrobin_poll_test.cpp +++ b/tests/roundrobin_poll_test.cpp @@ -7,13 +7,14 @@ #include #include #include -#include +#include #include "cppkafka/cppkafka.h" #include "test_utils.h" using std::vector; using std::move; using std::string; +using std::exception; using std::thread; using std::set; using std::mutex; @@ -33,18 +34,18 @@ using namespace cppkafka; // Helper functions //================================================================================== static Configuration make_producer_config() { - Configuration config; - config.set("metadata.broker.list", KAFKA_TEST_INSTANCE); + Configuration config = { + { "metadata.broker.list", KAFKA_TEST_INSTANCE }, + }; return config; } -static Configuration make_consumer_config(const string& group_id = "rr_consumer_test") { - Configuration config; - config.set("metadata.broker.list", KAFKA_TEST_INSTANCE); - config.set("enable.auto.commit", true); - config.set("enable.auto.offset.store", true ); - config.set("auto.commit.interval.ms", 100); - config.set("group.id", group_id); +static Configuration make_consumer_config(const string& group_id = make_consumer_group_id()) { + Configuration config = { + { "metadata.broker.list", KAFKA_TEST_INSTANCE }, + { "enable.auto.commit", false }, + { "group.id", group_id }, + }; return config; } @@ -63,44 +64,6 @@ static vector make_roundrobin_partition_vector(int total_messages) { // TESTS //======================================================================== -TEST_CASE("serial consumer test", "[roundrobin consumer]") { - int messages_per_partition = 3; - int total_messages = KAFKA_NUM_PARTITIONS * messages_per_partition; - - // Create a consumer and subscribe to the topic - Consumer consumer(make_consumer_config()); - TopicPartitionList partitions; - for (int i = 0; i < KAFKA_NUM_PARTITIONS; partitions.emplace_back(KAFKA_TOPICS[0], i++)); - consumer.assign(partitions); - - // Start the runner with the original consumer - ConsumerRunner runner(consumer, total_messages, KAFKA_NUM_PARTITIONS); - - // Produce messages so we stop the consumer - Producer producer(make_producer_config()); - string payload = "Serial"; - - // push 3 messages in each partition - for (int i = 0; i < total_messages; ++i) { - producer.produce(MessageBuilder(KAFKA_TOPICS[0]).partition(i%KAFKA_NUM_PARTITIONS).payload(payload)); - } - producer.flush(); - runner.try_join(); - - // Check that we have all messages - REQUIRE(runner.get_messages().size() == total_messages); - - // messages should have sequential identical partition ids in groups of - int expected_partition; - for (int i = 0; i < total_messages; ++i) { - if ((i % messages_per_partition) == 0) { - expected_partition = runner.get_messages()[i].get_partition(); - } - REQUIRE(runner.get_messages()[i].get_partition() == expected_partition); - REQUIRE((string)runner.get_messages()[i].get_payload() == payload); - } -} - TEST_CASE("roundrobin consumer test", "[roundrobin consumer]") { TopicPartitionList assignment; int messages_per_partition = 3; @@ -119,9 +82,21 @@ TEST_CASE("roundrobin consumer test", "[roundrobin consumer]") { // push 3 messages in each partition for (int i = 0; i < total_messages; ++i) { - producer.produce(MessageBuilder(KAFKA_TOPICS[0]).partition(i%KAFKA_NUM_PARTITIONS).payload(payload)); + producer.produce(MessageBuilder(KAFKA_TOPICS[0]) + .partition(i % KAFKA_NUM_PARTITIONS) + .payload(payload)); + } + for (int i = 0; i < 3; ++i) { + try { + producer.flush(); + break; + } + catch (const exception& ex) { + if (i == 2) { + throw; + } + } } - producer.flush(); runner.try_join(); // Check that we have all messages diff --git a/tests/test_utils.cpp b/tests/test_utils.cpp new file mode 100644 index 0000000..13e22c7 --- /dev/null +++ b/tests/test_utils.cpp @@ -0,0 +1,94 @@ +#include +#include +#include +#include +#include +#include "test_utils.h" + +using std::chrono::duration_cast; +using std::chrono::milliseconds; +using std::chrono::seconds; +using std::chrono::system_clock; +using std::hex; +using std::move; +using std::numeric_limits; +using std::ostringstream; +using std::random_device; +using std::string; +using std::uniform_int_distribution; +using std::unique_ptr; +using std::vector; + +//================================================================================== +// PollStrategyAdapter +//================================================================================== + +PollStrategyAdapter::PollStrategyAdapter(Configuration config) + : Consumer(config) { +} + +void PollStrategyAdapter::add_polling_strategy(unique_ptr poll_strategy) { + strategy_ = move(poll_strategy); +} + +void PollStrategyAdapter::delete_polling_strategy() { + strategy_.reset(); +} + +Message PollStrategyAdapter::poll() { + if (strategy_) { + return strategy_->poll(); + } + return Consumer::poll(); +} + +Message PollStrategyAdapter::poll(milliseconds timeout) { + if (strategy_) { + return strategy_->poll(timeout); + } + return Consumer::poll(timeout); +} + +vector PollStrategyAdapter::poll_batch(size_t max_batch_size) { + if (strategy_) { + return strategy_->poll_batch(max_batch_size); + } + return Consumer::poll_batch(max_batch_size); +} + +vector PollStrategyAdapter::poll_batch(size_t max_batch_size, milliseconds timeout) { + if (strategy_) { + return strategy_->poll_batch(max_batch_size, timeout); + } + return Consumer::poll_batch(max_batch_size, timeout); +} + +void PollStrategyAdapter::set_timeout(milliseconds timeout) { + if (strategy_) { + strategy_->set_timeout(timeout); + } + else { + Consumer::set_timeout(timeout); + } +} + +milliseconds PollStrategyAdapter::get_timeout() { + if (strategy_) { + return strategy_->get_timeout(); + } + return Consumer::get_timeout(); +} + +// Misc + +string make_consumer_group_id() { + ostringstream output; + output << hex; + + random_device rd; + uniform_int_distribution distribution(0, numeric_limits::max()); + const auto now = duration_cast(system_clock::now().time_since_epoch()); + const auto random_number = distribution(rd); + output << now.count() << random_number; + return output.str(); +} diff --git a/tests/test_utils.h b/tests/test_utils.h index 8b882a2..030e1c0 100644 --- a/tests/test_utils.h +++ b/tests/test_utils.h @@ -1,6 +1,7 @@ #ifndef CPPKAFKA_TEST_UTILS_H #define CPPKAFKA_TEST_UTILS_H +#include #include #include #include "cppkafka/consumer.h" @@ -57,9 +58,14 @@ private: std::unique_ptr strategy_; }; +// Misc + +std::string make_consumer_group_id(); + using PollConsumerRunner = BasicConsumerRunner; using ConsumerRunner = BasicConsumerRunner; + #include "test_utils_impl.h" #endif // CPPKAFKA_TEST_UTILS_H diff --git a/tests/test_utils_impl.h b/tests/test_utils_impl.h index 46b423a..7afceb5 100644 --- a/tests/test_utils_impl.h +++ b/tests/test_utils_impl.h @@ -1,7 +1,6 @@ #include #include #include -#include "test_utils.h" #include "cppkafka/utils/consumer_dispatcher.h" using std::vector; @@ -45,7 +44,8 @@ BasicConsumerRunner::BasicConsumerRunner(ConsumerType& consumer, } }, // EOF callback - [&](typename BasicConsumerDispatcher::EndOfFile, const TopicPartition& topic_partition) { + [&](typename BasicConsumerDispatcher::EndOfFile, + const TopicPartition& topic_partition) { if (number_eofs != partitions) { number_eofs++; if (number_eofs == partitions) { @@ -99,73 +99,4 @@ void BasicConsumerRunner::try_join() { } } -//================================================================================== -// PollStrategyAdapter -//================================================================================== -inline -PollStrategyAdapter::PollStrategyAdapter(Configuration config) - : Consumer(config) { -} - -inline -void PollStrategyAdapter::add_polling_strategy(std::unique_ptr poll_strategy) { - strategy_ = std::move(poll_strategy); -} - -inline -void PollStrategyAdapter::delete_polling_strategy() { - strategy_.reset(); -} - -inline -Message PollStrategyAdapter::poll() { - if (strategy_) { - return strategy_->poll(); - } - return Consumer::poll(); -} - -inline -Message PollStrategyAdapter::poll(milliseconds timeout) { - if (strategy_) { - return strategy_->poll(timeout); - } - return Consumer::poll(timeout); -} - -inline -std::vector PollStrategyAdapter::poll_batch(size_t max_batch_size) { - if (strategy_) { - return strategy_->poll_batch(max_batch_size); - } - return Consumer::poll_batch(max_batch_size); -} - -inline -std::vector PollStrategyAdapter::poll_batch(size_t max_batch_size, - milliseconds timeout) { - if (strategy_) { - return strategy_->poll_batch(max_batch_size, timeout); - } - return Consumer::poll_batch(max_batch_size, timeout); -} - -inline -void PollStrategyAdapter::set_timeout(milliseconds timeout) { - if (strategy_) { - strategy_->set_timeout(timeout); - } - else { - Consumer::set_timeout(timeout); - } -} - -inline -milliseconds PollStrategyAdapter::get_timeout() { - if (strategy_) { - return strategy_->get_timeout(); - } - return Consumer::get_timeout(); -} -