diff --git a/include/cppkafka/buffer.h b/include/cppkafka/buffer.h
index 3bb8d0f..da81f54 100644
--- a/include/cppkafka/buffer.h
+++ b/include/cppkafka/buffer.h
@@ -11,10 +11,10 @@ public:
     using DataType = unsigned char;
 
     Buffer();
-    Buffer(const DataType* data, size_t size);
 
-    template <typename ForwardIterator>
-    Buffer(const ForwardIterator& start, const ForwardIterator& end) :
-        data_((const DataType*)&*start), size_(std::distance(start, end)) {
+    template <typename T>
+    Buffer(const T* data, size_t size)
+    : data_(reinterpret_cast<const DataType*>(data)), size_(size) {
+        static_assert(sizeof(T) == 1, "Buffer must point to elements of 1 byte");
     }
diff --git a/src/buffer.cpp b/src/buffer.cpp
index c81ff6d..c99817b 100644
--- a/src/buffer.cpp
+++ b/src/buffer.cpp
@@ -9,11 +9,6 @@ Buffer::Buffer()
 
 }
 
-Buffer::Buffer(const DataType* data, size_t size)
-: data_(data), size_(size) {
-
-}
-
 const Buffer::DataType* Buffer::get_data() const {
     return data_;
 }
diff --git a/tests/producer_test.cpp b/tests/producer_test.cpp
index 30cea4b..1cd7023 100644
--- a/tests/producer_test.cpp
+++ b/tests/producer_test.cpp
@@ -1,12 +1,15 @@
 #include <thread>
 #include <mutex>
 #include <chrono>
+#include <set>
 #include <condition_variable>
 #include <gtest/gtest.h>
 #include "cppkafka/producer.h"
 #include "cppkafka/consumer.h"
 
 using std::string;
+using std::to_string;
+using std::set;
 using std::move;
 using std::thread;
 using std::mutex;
@@ -22,22 +25,24 @@ using namespace cppkafka;
 
 class ConsumerRunner {
 public:
-    ConsumerRunner(Consumer& consumer, size_t expected)
+    ConsumerRunner(Consumer& consumer, size_t expected, size_t partitions)
     : consumer_(consumer) {
         bool booted = false;
         mutex mtx;
         condition_variable cond;
-        thread_ = thread([&, expected]() {
-            consumer_.set_timeout(milliseconds(100));
-            bool found_eof = false;
+        thread_ = thread([&, expected, partitions]() {
+            consumer_.set_timeout(milliseconds(500));
+            size_t number_eofs = 0;
             auto start = system_clock::now();
             while (system_clock::now() - start < seconds(10) && messages_.size() < expected) {
                 Message msg = consumer_.poll();
-                if (msg && !found_eof && msg.get_error() == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
-                    found_eof = true;
-                    lock_guard<mutex> _(mtx);
-                    booted = true;
-                    cond.notify_one();
+                if (msg && number_eofs != partitions && msg.get_error() == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
+                    number_eofs++;
+                    if (number_eofs == partitions) {
+                        lock_guard<mutex> _(mtx);
+                        booted = true;
+                        cond.notify_one();
+                    }
                 }
                 else if (msg && msg.get_error() == 0) {
                     messages_.push_back(move(msg));
@@ -96,22 +101,88 @@ public:
 
 const string ProducerTest::KAFKA_TOPIC = "cppkafka_test1";
 
-TEST_F(ProducerTest, test1) {
+TEST_F(ProducerTest, OneMessageOnFixedPartition) {
     int partition = 0;
 
     // Create a consumer and assign this topic/partition
     Consumer consumer(make_consumer_config());
     consumer.assign({ TopicPartition(KAFKA_TOPIC, partition) });
-    ConsumerRunner runner(consumer, 1);
+    ConsumerRunner runner(consumer, 1, 1);
 
     // Now create a producer and produce a message
     Producer producer(make_producer_config());
     Topic topic = producer.get_topic(KAFKA_TOPIC);
     string payload = "Hello world!";
-    producer.produce(topic, partition, Buffer(payload.begin(), payload.end()));
+    producer.produce(topic, partition, Buffer(payload.data(), payload.size()));
     runner.try_join();
 
     const auto& messages = runner.get_messages();
     ASSERT_EQ(1, messages.size());
-    EXPECT_EQ(payload, messages[0].get_payload().as_string());
+    const auto& message = messages[0];
+    EXPECT_EQ(payload, message.get_payload().as_string());
+    EXPECT_EQ("", message.get_key().as_string());
+    EXPECT_EQ(KAFKA_TOPIC, message.get_topic());
+    EXPECT_EQ(partition, message.get_partition());
+    EXPECT_EQ(0, message.get_error());
+}
+
+TEST_F(ProducerTest, OneMessageUsingKey) {
+    int partition = 0;
+
+    // Create a consumer and assign this topic/partition
+    Consumer consumer(make_consumer_config());
+    consumer.assign({ TopicPartition(KAFKA_TOPIC, partition) });
+    ConsumerRunner runner(consumer, 1, 1);
+
+    // Now create a producer and produce a message
+    Producer producer(make_producer_config());
+    Topic topic = producer.get_topic(KAFKA_TOPIC);
+    string payload = "Hello world!";
+    string key = "such key";
+    producer.produce(topic, partition, Buffer(payload.data(), payload.size()),
+                     Buffer(key.data(), key.size()));
+    runner.try_join();
+
+    const auto& messages = runner.get_messages();
+    ASSERT_EQ(1, messages.size());
+    const auto& message = messages[0];
+    EXPECT_EQ(payload, message.get_payload().as_string());
+    EXPECT_EQ(key, message.get_key().as_string());
+    EXPECT_EQ(KAFKA_TOPIC, message.get_topic());
+    EXPECT_EQ(partition, message.get_partition());
+    EXPECT_EQ(0, message.get_error());
+}
+
+TEST_F(ProducerTest, MultipleMessagesUnassignedPartitions) {
+    size_t message_count = 10;
+    int partitions = 3;
+    set<string> payloads;
+
+    // Create a consumer and subscribe to this topic
+    Consumer consumer(make_consumer_config());
+    consumer.subscribe({ KAFKA_TOPIC });
+    ConsumerRunner runner(consumer, message_count, partitions);
+
+    // Now create a producer and produce a message
+    Producer producer(make_producer_config());
+    Topic topic = producer.get_topic(KAFKA_TOPIC);
+    string payload_base = "Hello world ";
+    for (size_t i = 0; i < message_count; ++i) {
+        string payload = payload_base + to_string(i);
+        payloads.insert(payload);
+        producer.produce(topic, {} /*unassigned partition*/,
+                         Buffer(payload.data(), payload.size()));
+    }
+    runner.try_join();
+
+    const auto& messages = runner.get_messages();
+    ASSERT_EQ(message_count, messages.size());
+    for (const auto& message : messages) {
+        EXPECT_EQ(KAFKA_TOPIC, message.get_topic());
+        EXPECT_EQ(1, payloads.erase(message.get_payload().as_string()));
+        EXPECT_EQ(0, message.get_error());
+        EXPECT_EQ("", message.get_key().as_string());
+        EXPECT_GE(message.get_partition(), 0);
+        EXPECT_LT(message.get_partition(), 3);
+    }
 }
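
Reviewer note (not part of the patch): a minimal usage sketch of the new templated Buffer constructor, assuming only what the diff above adds. Any pointer to 1-byte elements is accepted and reinterpreted as const unsigned char*, while wider element types are rejected at compile time by the static_assert. The variable names below are illustrative only.

#include <string>
#include <vector>
#include <cstdint>
#include "cppkafka/buffer.h"

using namespace cppkafka;

int main() {
    // Any 1-byte element type works; the pointer is reinterpret_cast to
    // const unsigned char* (Buffer::DataType*) inside the constructor.
    std::string payload = "Hello world!";
    Buffer from_string(payload.data(), payload.size());

    std::vector<uint8_t> raw = { 0xde, 0xad, 0xbe, 0xef };
    Buffer from_bytes(raw.data(), raw.size());

    // Would not compile: sizeof(uint32_t) != 1, so the static_assert fires.
    // std::vector<uint32_t> words = { 1, 2, 3 };
    // Buffer rejected(words.data(), words.size());

    (void)from_string;
    (void)from_bytes;
    return 0;
}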