Fix compacted topic processor test

* Use buffered producer on compacted topic processor test

* Add include directives for callback invoker where missing

* Consume until EOF on compacted topic processor test
This commit is contained in:
Matias Fontanini
2018-06-09 14:49:50 -07:00
committed by GitHub
parent 5dcede6411
commit f15b59cb13
3 changed files with 12 additions and 5 deletions

View File

@@ -36,6 +36,7 @@
 #include <string>
 #include "../consumer.h"
 #include "backoff_performer.h"
+#include "../detail/callback_invoker.h"
 namespace cppkafka {

View File

@@ -42,6 +42,7 @@
 #include <boost/optional.hpp>
 #include "../producer.h"
 #include "../message.h"
+#include "../detail/callback_invoker.h"
 namespace cppkafka {

View File

@@ -5,7 +5,7 @@
 #include <map>
 #include <condition_variable>
 #include <catch.hpp>
-#include "cppkafka/producer.h"
+#include "cppkafka/utils/buffered_producer.h"
 #include "cppkafka/consumer.h"
 #include "cppkafka/utils/compacted_topic_processor.h"
 #include "test_utils.h"
@@ -65,11 +65,15 @@ TEST_CASE("consumption", "[consumer][compacted]") {
         events.push_back(event);
     });
     consumer.subscribe({ KAFKA_TOPICS[0] });
-    consumer.poll();
-    consumer.poll();
-    consumer.poll();
+    set<int> eof_partitions;
+    while (eof_partitions.size() != static_cast<size_t>(KAFKA_NUM_PARTITIONS)) {
+        Message msg = consumer.poll();
+        if (msg && msg.is_eof()) {
+            eof_partitions.insert(msg.get_partition());
+        }
+    }
 
-    Producer producer(make_producer_config());
+    BufferedProducer<string> producer(make_producer_config());
     struct ElementType {
         string value;
@@ -88,6 +92,7 @@ TEST_CASE("consumption", "[consumer][compacted]") {
     // Now erase the first element
     string deleted_key = "42";
     producer.produce(MessageBuilder(KAFKA_TOPICS[0]).partition(0).key(deleted_key));
+    producer.flush();
 
     for (size_t i = 0; i < 10; ++i) {
         compacted_consumer.process_event();