Add more tests to producer

Matias Fontanini
2016-05-23 21:03:34 -07:00
parent 4475209ce1
commit 6374062f47
3 changed files with 88 additions and 22 deletions

View File

@@ -11,10 +11,10 @@ public:
     using DataType = unsigned char;
     Buffer();
-    Buffer(const DataType* data, size_t size);
-    template <typename ForwardIterator>
-    Buffer(const ForwardIterator& start, const ForwardIterator& end) :
-        data_((const DataType*)&*start), size_(std::distance(start, end)) {
+    template <typename T>
+    Buffer(const T* data, size_t size)
+    : data_(reinterpret_cast<const DataType*>(data)), size_(size) {
+        static_assert(sizeof(T) == 1, "Buffer must point to elements of 1 byte");
     }
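
Note: the new templated constructor accepts a pointer to any 1-byte element type and reinterpret_casts it to the internal unsigned char representation, replacing both the old (DataType*, size_t) overload and the iterator-pair constructor. A minimal usage sketch, assuming the class is declared in cppkafka/buffer.h (the file names are not shown in this view):

#include <cstdint>
#include <string>
#include <vector>
#include "cppkafka/buffer.h" // assumed header path

int main() {
    // char is a 1-byte type: the static_assert passes and the pointer is
    // cast to the internal unsigned char representation.
    std::string payload = "Hello world!";
    cppkafka::Buffer from_string(payload.data(), payload.size());

    // uint8_t works the same way.
    std::vector<std::uint8_t> raw = { 0x01, 0x02, 0x03 };
    cppkafka::Buffer from_bytes(raw.data(), raw.size());

    // A wider element type no longer compiles:
    // std::vector<int> ints = { 1, 2, 3 };
    // cppkafka::Buffer bad(ints.data(), ints.size()); // static_assert fires
}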

View File

@@ -9,11 +9,6 @@ Buffer::Buffer()
 }
-Buffer::Buffer(const DataType* data, size_t size)
-: data_(data), size_(size) {
-}
 const Buffer::DataType* Buffer::get_data() const {
     return data_;
 }

View File

@@ -1,12 +1,15 @@
 #include <thread>
 #include <mutex>
 #include <chrono>
+#include <set>
 #include <condition_variable>
 #include <gtest/gtest.h>
 #include "cppkafka/producer.h"
 #include "cppkafka/consumer.h"
 using std::string;
+using std::to_string;
+using std::set;
 using std::move;
 using std::thread;
 using std::mutex;
@@ -22,22 +25,24 @@ using namespace cppkafka;
 class ConsumerRunner {
 public:
-    ConsumerRunner(Consumer& consumer, size_t expected)
+    ConsumerRunner(Consumer& consumer, size_t expected, size_t partitions)
     : consumer_(consumer) {
         bool booted = false;
         mutex mtx;
         condition_variable cond;
-        thread_ = thread([&, expected]() {
-            consumer_.set_timeout(milliseconds(100));
-            bool found_eof = false;
+        thread_ = thread([&, expected, partitions]() {
+            consumer_.set_timeout(milliseconds(500));
+            size_t number_eofs = 0;
             auto start = system_clock::now();
             while (system_clock::now() - start < seconds(10) && messages_.size() < expected) {
                 Message msg = consumer_.poll();
-                if (msg && !found_eof && msg.get_error() == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
-                    found_eof = true;
-                    lock_guard<mutex> _(mtx);
-                    booted = true;
-                    cond.notify_one();
+                if (msg && number_eofs != partitions && msg.get_error() == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
+                    number_eofs++;
+                    if (number_eofs == partitions) {
+                        lock_guard<mutex> _(mtx);
+                        booted = true;
+                        cond.notify_one();
+                    }
                 }
                 else if (msg && msg.get_error() == 0) {
                     messages_.push_back(move(msg));
@@ -96,22 +101,88 @@ public:
 const string ProducerTest::KAFKA_TOPIC = "cppkafka_test1";
-TEST_F(ProducerTest, test1) {
+TEST_F(ProducerTest, OneMessageOnFixedPartition) {
     int partition = 0;
     // Create a consumer and assign this topic/partition
     Consumer consumer(make_consumer_config());
     consumer.assign({ TopicPartition(KAFKA_TOPIC, partition) });
-    ConsumerRunner runner(consumer, 1);
+    ConsumerRunner runner(consumer, 1, 1);
     // Now create a producer and produce a message
     Producer producer(make_producer_config());
     Topic topic = producer.get_topic(KAFKA_TOPIC);
     string payload = "Hello world!";
-    producer.produce(topic, partition, Buffer(payload.begin(), payload.end()));
+    producer.produce(topic, partition, Buffer(payload.data(), payload.size()));
     runner.try_join();
     const auto& messages = runner.get_messages();
     ASSERT_EQ(1, messages.size());
-    EXPECT_EQ(payload, messages[0].get_payload().as_string());
+    const auto& message = messages[0];
+    EXPECT_EQ(payload, message.get_payload().as_string());
+    EXPECT_EQ("", message.get_key().as_string());
+    EXPECT_EQ(KAFKA_TOPIC, message.get_topic());
+    EXPECT_EQ(partition, message.get_partition());
+    EXPECT_EQ(0, message.get_error());
+}
+
+TEST_F(ProducerTest, OneMessageUsingKey) {
+    int partition = 0;
+    // Create a consumer and assign this topic/partition
+    Consumer consumer(make_consumer_config());
+    consumer.assign({ TopicPartition(KAFKA_TOPIC, partition) });
+    ConsumerRunner runner(consumer, 1, 1);
+    // Now create a producer and produce a message
+    Producer producer(make_producer_config());
+    Topic topic = producer.get_topic(KAFKA_TOPIC);
+    string payload = "Hello world!";
+    string key = "such key";
+    producer.produce(topic, partition, Buffer(payload.data(), payload.size()),
+                     Buffer(key.data(), key.size()));
+    runner.try_join();
+    const auto& messages = runner.get_messages();
+    ASSERT_EQ(1, messages.size());
+    const auto& message = messages[0];
+    EXPECT_EQ(payload, message.get_payload().as_string());
+    EXPECT_EQ(key, message.get_key().as_string());
+    EXPECT_EQ(KAFKA_TOPIC, message.get_topic());
+    EXPECT_EQ(partition, message.get_partition());
+    EXPECT_EQ(0, message.get_error());
+}
+
+TEST_F(ProducerTest, MultipleMessagesUnassignedPartitions) {
+    size_t message_count = 10;
+    int partitions = 3;
+    set<string> payloads;
+    // Create a consumer and subscribe to this topic
+    Consumer consumer(make_consumer_config());
+    consumer.subscribe({ KAFKA_TOPIC });
+    ConsumerRunner runner(consumer, message_count, partitions);
+    // Now create a producer and produce a message
+    Producer producer(make_producer_config());
+    Topic topic = producer.get_topic(KAFKA_TOPIC);
+    string payload_base = "Hello world ";
+    for (size_t i = 0; i < message_count; ++i) {
+        string payload = payload_base + to_string(i);
+        payloads.insert(payload);
+        producer.produce(topic, {} /*unassigned partition*/,
+                         Buffer(payload.data(), payload.size()));
+    }
+    runner.try_join();
+    const auto& messages = runner.get_messages();
+    ASSERT_EQ(message_count, messages.size());
+    for (const auto& message : messages) {
+        EXPECT_EQ(KAFKA_TOPIC, message.get_topic());
+        EXPECT_EQ(1, payloads.erase(message.get_payload().as_string()));
+        EXPECT_EQ(0, message.get_error());
+        EXPECT_EQ("", message.get_key().as_string());
+        EXPECT_GE(message.get_partition(), 0);
+        EXPECT_LT(message.get_partition(), 3);
+    }
 }