diff --git a/tests/producer_test.cpp b/tests/producer_test.cpp index 64ad0b6..7b7238d 100644 --- a/tests/producer_test.cpp +++ b/tests/producer_test.cpp @@ -33,7 +33,9 @@ public: Configuration make_producer_config() { Configuration config = { { "metadata.broker.list", KAFKA_TEST_INSTANCE }, - { "queue.buffering.max.ms", 0 } + { "queue.buffering.max.ms", 0 }, + { "api.version.request", true }, + { "queue.buffering.max.ms", 50 } }; return config; } @@ -42,7 +44,8 @@ public: Configuration config = { { "metadata.broker.list", KAFKA_TEST_INSTANCE }, { "enable.auto.commit", false }, - { "group.id", "producer_test" } + { "group.id", "producer_test" }, + { "api.version.request", true } }; return config; } @@ -91,7 +94,11 @@ TEST_F(ProducerTest, OneMessageUsingKey) { Producer producer(make_producer_config()); string payload = "Hello world! 2"; string key = "such key"; - producer.produce(MessageBuilder(KAFKA_TOPIC).partition(partition).key(key).payload(payload)); + const milliseconds timestamp{15}; + producer.produce(MessageBuilder(KAFKA_TOPIC).partition(partition) + .key(key) + .payload(payload) + .timestamp(timestamp)); runner.try_join(); const auto& messages = runner.get_messages(); @@ -102,8 +109,8 @@ TEST_F(ProducerTest, OneMessageUsingKey) { EXPECT_EQ(KAFKA_TOPIC, message.get_topic()); EXPECT_EQ(partition, message.get_partition()); EXPECT_FALSE(message.get_error()); - // NOTE: if this line fails, then you're using kafka 0.10+ and that's okay - EXPECT_FALSE(message.get_timestamp()); + EXPECT_TRUE(message.get_timestamp()); + EXPECT_EQ(timestamp, message.get_timestamp()->get_timestamp()); } TEST_F(ProducerTest, MultipleMessagesUnassignedPartitions) { @@ -168,7 +175,9 @@ TEST_F(ProducerTest, Callbacks) { Producer producer(move(config)); producer.produce(MessageBuilder(KAFKA_TOPIC).key(key).payload(payload)); - producer.poll(); + while (producer.get_out_queue_length() > 0) { + producer.poll(); + } runner.try_join(); const auto& messages = runner.get_messages();