From f15b59cb138adf940cab3f7772ebf79ba19c5630 Mon Sep 17 00:00:00 2001 From: Matias Fontanini Date: Sat, 9 Jun 2018 14:49:50 -0700 Subject: [PATCH] Fix compacted topic processor test * Use buffered producer on compacted topic processor test * Add include directives for callback invoker where missing * Consume until EOF on compacted topic processor test --- include/cppkafka/utils/backoff_committer.h | 1 + include/cppkafka/utils/buffered_producer.h | 1 + tests/compacted_topic_processor_test.cpp | 15 ++++++++++----- 3 files changed, 12 insertions(+), 5 deletions(-) diff --git a/include/cppkafka/utils/backoff_committer.h b/include/cppkafka/utils/backoff_committer.h index 7c15b1a..d54f2e8 100644 --- a/include/cppkafka/utils/backoff_committer.h +++ b/include/cppkafka/utils/backoff_committer.h @@ -36,6 +36,7 @@ #include #include "../consumer.h" #include "backoff_performer.h" +#include "../detail/callback_invoker.h" namespace cppkafka { diff --git a/include/cppkafka/utils/buffered_producer.h b/include/cppkafka/utils/buffered_producer.h index 74febbd..c9bcb65 100644 --- a/include/cppkafka/utils/buffered_producer.h +++ b/include/cppkafka/utils/buffered_producer.h @@ -42,6 +42,7 @@ #include #include "../producer.h" #include "../message.h" +#include "../detail/callback_invoker.h" namespace cppkafka { diff --git a/tests/compacted_topic_processor_test.cpp b/tests/compacted_topic_processor_test.cpp index 4b4c36d..0894ccc 100644 --- a/tests/compacted_topic_processor_test.cpp +++ b/tests/compacted_topic_processor_test.cpp @@ -5,7 +5,7 @@ #include #include #include -#include "cppkafka/producer.h" +#include "cppkafka/utils/buffered_producer.h" #include "cppkafka/consumer.h" #include "cppkafka/utils/compacted_topic_processor.h" #include "test_utils.h" @@ -65,11 +65,15 @@ TEST_CASE("consumption", "[consumer][compacted]") { events.push_back(event); }); consumer.subscribe({ KAFKA_TOPICS[0] }); - consumer.poll(); - consumer.poll(); - consumer.poll(); + set eof_partitions; + while (eof_partitions.size() != static_cast(KAFKA_NUM_PARTITIONS)) { + Message msg = consumer.poll(); + if (msg && msg.is_eof()) { + eof_partitions.insert(msg.get_partition()); + } + } - Producer producer(make_producer_config()); + BufferedProducer producer(make_producer_config()); struct ElementType { string value; @@ -88,6 +92,7 @@ TEST_CASE("consumption", "[consumer][compacted]") { // Now erase the first element string deleted_key = "42"; producer.produce(MessageBuilder(KAFKA_TOPICS[0]).partition(0).key(deleted_key)); + producer.flush(); for (size_t i = 0; i < 10; ++i) { compacted_consumer.process_event();