Assume testing kafka cluster is >= 0.10

This commit is contained in:
Matias Fontanini
2017-10-14 08:38:43 -07:00
parent 853396acab
commit 179e669c06

View File

@@ -33,7 +33,9 @@ public:
Configuration make_producer_config() { Configuration make_producer_config() {
Configuration config = { Configuration config = {
{ "metadata.broker.list", KAFKA_TEST_INSTANCE }, { "metadata.broker.list", KAFKA_TEST_INSTANCE },
{ "queue.buffering.max.ms", 0 } { "queue.buffering.max.ms", 0 },
{ "api.version.request", true },
{ "queue.buffering.max.ms", 50 }
}; };
return config; return config;
} }
@@ -42,7 +44,8 @@ public:
Configuration config = { Configuration config = {
{ "metadata.broker.list", KAFKA_TEST_INSTANCE }, { "metadata.broker.list", KAFKA_TEST_INSTANCE },
{ "enable.auto.commit", false }, { "enable.auto.commit", false },
{ "group.id", "producer_test" } { "group.id", "producer_test" },
{ "api.version.request", true }
}; };
return config; return config;
} }
@@ -91,7 +94,11 @@ TEST_F(ProducerTest, OneMessageUsingKey) {
Producer producer(make_producer_config()); Producer producer(make_producer_config());
string payload = "Hello world! 2"; string payload = "Hello world! 2";
string key = "such key"; string key = "such key";
producer.produce(MessageBuilder(KAFKA_TOPIC).partition(partition).key(key).payload(payload)); const milliseconds timestamp{15};
producer.produce(MessageBuilder(KAFKA_TOPIC).partition(partition)
.key(key)
.payload(payload)
.timestamp(timestamp));
runner.try_join(); runner.try_join();
const auto& messages = runner.get_messages(); const auto& messages = runner.get_messages();
@@ -102,8 +109,8 @@ TEST_F(ProducerTest, OneMessageUsingKey) {
EXPECT_EQ(KAFKA_TOPIC, message.get_topic()); EXPECT_EQ(KAFKA_TOPIC, message.get_topic());
EXPECT_EQ(partition, message.get_partition()); EXPECT_EQ(partition, message.get_partition());
EXPECT_FALSE(message.get_error()); EXPECT_FALSE(message.get_error());
// NOTE: if this line fails, then you're using kafka 0.10+ and that's okay EXPECT_TRUE(message.get_timestamp());
EXPECT_FALSE(message.get_timestamp()); EXPECT_EQ(timestamp, message.get_timestamp()->get_timestamp());
} }
TEST_F(ProducerTest, MultipleMessagesUnassignedPartitions) { TEST_F(ProducerTest, MultipleMessagesUnassignedPartitions) {
@@ -168,7 +175,9 @@ TEST_F(ProducerTest, Callbacks) {
Producer producer(move(config)); Producer producer(move(config));
producer.produce(MessageBuilder(KAFKA_TOPIC).key(key).payload(payload)); producer.produce(MessageBuilder(KAFKA_TOPIC).key(key).payload(payload));
producer.poll(); while (producer.get_out_queue_length() > 0) {
producer.poll();
}
runner.try_join(); runner.try_join();
const auto& messages = runner.get_messages(); const auto& messages = runner.get_messages();