diff --git a/tests/consumer_test.cpp b/tests/consumer_test.cpp index 7283207..a64de5e 100644 --- a/tests/consumer_test.cpp +++ b/tests/consumer_test.cpp @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include "cppkafka/consumer.h" @@ -21,6 +22,7 @@ using std::tie; using std::condition_variable; using std::lock_guard; using std::unique_lock; +using std::make_move_iterator; using std::chrono::seconds; using std::chrono::milliseconds; using std::chrono::system_clock; @@ -232,8 +234,15 @@ TEST_F(ConsumerTest, ConsumeBatch) { producer.produce(MessageBuilder(KAFKA_TOPIC).partition(partition).payload(payload)); producer.flush(); - vector messages = consumer.poll_batch(2); - ASSERT_EQ(2, messages.size()); - EXPECT_EQ(payload, messages[0].get_payload()); - EXPECT_EQ(payload, messages[1].get_payload()); + vector all_messages; + int i = 0; + while (i < 5 && all_messages.size() != 2) { + vector messages = consumer.poll_batch(2); + all_messages.insert(all_messages.end(), make_move_iterator(messages.begin()), + make_move_iterator(messages.end())); + ++i; + } + ASSERT_EQ(2, all_messages.size()); + EXPECT_EQ(payload, all_messages[0].get_payload()); + EXPECT_EQ(payload, all_messages[1].get_payload()); }