From 4475209ce1986a8021ab44e3d4b9194351dd00be Mon Sep 17 00:00:00 2001 From: Matias Fontanini Date: Sun, 22 May 2016 19:47:23 -0700 Subject: [PATCH] Add first producer test --- .gitmodules | 3 + CMakeLists.txt | 23 ++++- include/cppkafka/buffer.h | 10 +- include/cppkafka/consumer.h | 5 + include/cppkafka/message.h | 1 + include/cppkafka/producer.h | 3 + include/cppkafka/topic_partition_list.h | 2 + src/buffer.cpp | 6 ++ src/consumer.cpp | 10 +- src/message.cpp | 5 + src/producer.cpp | 7 +- src/topic_partition_list.cpp | 9 ++ tests/CMakeLists.txt | 17 ++++ tests/producer_test.cpp | 117 ++++++++++++++++++++++++ third_party/googletest | 1 + 15 files changed, 211 insertions(+), 8 deletions(-) create mode 100644 .gitmodules create mode 100644 tests/CMakeLists.txt create mode 100644 tests/producer_test.cpp create mode 160000 third_party/googletest diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..5a4e85a --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "third_party/googletest"] + path = third_party/googletest + url = https://github.com/google/googletest.git diff --git a/CMakeLists.txt b/CMakeLists.txt index bd9d398..73b3fe8 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -5,4 +5,25 @@ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -Wall") include_directories(${CMAKE_SOURCE_DIR}/include/cppkafka) -add_subdirectory(src) \ No newline at end of file +add_subdirectory(src) + +set(GOOGLETEST_ROOT ${CMAKE_SOURCE_DIR}/third_party/googletest) +set(GOOGLETEST_INCLUDE ${GOOGLETEST_ROOT}/googletest/include) +set(GOOGLETEST_BINARY_DIR ${CMAKE_CURRENT_BINARY_DIR}/googletest) +set(GOOGLETEST_LIBRARY ${GOOGLETEST_BINARY_DIR}/googletest) + +include(ExternalProject) +ExternalProject_Add( + googletest + DOWNLOAD_COMMAND "" + SOURCE_DIR ${GOOGLETEST_ROOT} + BINARY_DIR ${GOOGLETEST_BINARY_DIR} + CMAKE_CACHE_ARGS "-DBUILD_GTEST:bool=ON" "-DBUILD_GMOCK:bool=OFF" + "-Dgtest_force_shared_crt:bool=ON" + INSTALL_COMMAND "" +) +# Make sure we build googletest before anything else +add_dependencies(cppkafka googletest) + +enable_testing() +add_subdirectory(tests) \ No newline at end of file diff --git a/include/cppkafka/buffer.h b/include/cppkafka/buffer.h index dfa28b0..3bb8d0f 100644 --- a/include/cppkafka/buffer.h +++ b/include/cppkafka/buffer.h @@ -2,6 +2,7 @@ #define CPPKAFKA_BUFFER_H #include +#include namespace cppkafka { @@ -11,6 +12,11 @@ public: Buffer(); Buffer(const DataType* data, size_t size); + template + Buffer(const ForwardIterator& start, const ForwardIterator& end) : + data_((const DataType*)&*start), size_(std::distance(start, end)) { + + } Buffer(const Buffer&) = delete; Buffer(Buffer&&) = default; @@ -19,8 +25,10 @@ public: const DataType* get_data() const; size_t get_size() const; + + std::string as_string() const; private: - const unsigned char* data_; + const DataType* data_; size_t size_; }; diff --git a/include/cppkafka/consumer.h b/include/cppkafka/consumer.h index 4241f22..7454e62 100644 --- a/include/cppkafka/consumer.h +++ b/include/cppkafka/consumer.h @@ -20,6 +20,11 @@ public: using RebalanceErrorCallback = std::function; Consumer(Configuration config); + Consumer(const Consumer&) = delete; + Consumer(Consumer&) = delete; + Consumer& operator=(const Consumer&) = delete; + Consumer& operator=(Consumer&&) = delete; + ~Consumer(); void set_timeout(const std::chrono::milliseconds timeout); void set_assignment_callback(AssignmentCallback callback); diff --git a/include/cppkafka/message.h b/include/cppkafka/message.h index 19e9e9c..4910175 100644 --- a/include/cppkafka/message.h +++ b/include/cppkafka/message.h @@ -11,6 +11,7 @@ namespace cppkafka { class Message { public: + Message(); Message(rd_kafka_message_t* handle); Message(const Message&) = delete; Message(Message&& rhs) = default; diff --git a/include/cppkafka/producer.h b/include/cppkafka/producer.h index 7a51e1b..8986089 100644 --- a/include/cppkafka/producer.h +++ b/include/cppkafka/producer.h @@ -4,6 +4,9 @@ #include #include "kafka_handle_base.h" #include "configuration.h" +#include "buffer.h" +#include "topic.h" +#include "partition.h" namespace cppkafka { diff --git a/include/cppkafka/topic_partition_list.h b/include/cppkafka/topic_partition_list.h index 1d58471..1bc0f98 100644 --- a/include/cppkafka/topic_partition_list.h +++ b/include/cppkafka/topic_partition_list.h @@ -3,6 +3,7 @@ #include #include +#include #include #include "topic_partition.h" @@ -15,6 +16,7 @@ public: static TopicPartitionList make_non_owning(rd_kafka_topic_partition_list_t* handle); TopicPartitionList(); + TopicPartitionList(const std::initializer_list& topic_partitions); TopicPartitionList(rd_kafka_topic_partition_list_t* handle); TopicPartitionList(size_t size); template diff --git a/src/buffer.cpp b/src/buffer.cpp index 3e0136c..c81ff6d 100644 --- a/src/buffer.cpp +++ b/src/buffer.cpp @@ -1,5 +1,7 @@ #include "buffer.h" +using std::string; + namespace cppkafka { Buffer::Buffer() @@ -20,4 +22,8 @@ size_t Buffer::get_size() const { return size_; } +string Buffer::as_string() const { + return string(data_, data_ + size_); +} + } // cppkafka diff --git a/src/consumer.cpp b/src/consumer.cpp index 13f1cba..9863f8a 100644 --- a/src/consumer.cpp +++ b/src/consumer.cpp @@ -23,14 +23,20 @@ Consumer::Consumer(Configuration config) char error_buffer[512]; // Set ourselves as the opaque pointer rd_kafka_conf_set_opaque(config.get_handle(), this); - rd_kafka_t* ptr = rd_kafka_new(RD_KAFKA_CONSUMER, config.get_handle(), + rd_kafka_t* ptr = rd_kafka_new(RD_KAFKA_CONSUMER, + rd_kafka_conf_dup(config.get_handle()), error_buffer, sizeof(error_buffer)); if (!ptr) { throw Exception("Failed to create consumer handle: " + string(error_buffer)); } + rd_kafka_poll_set_consumer(ptr); set_handle(ptr); } +Consumer::~Consumer() { + close(); +} + void Consumer::set_timeout(const milliseconds timeout) { timeout_ms_ = timeout; } @@ -126,7 +132,7 @@ TopicPartitionList Consumer::get_assignment() { Message Consumer::poll() { rd_kafka_message_t* message = rd_kafka_consumer_poll(get_handle(), timeout_ms_.count()); - return Message(message); + return message ? Message(message) : Message(); } void Consumer::commit(const Message& msg, bool async) { diff --git a/src/message.cpp b/src/message.cpp index f7457d2..2bef564 100644 --- a/src/message.cpp +++ b/src/message.cpp @@ -4,6 +4,11 @@ using std::string; namespace cppkafka { +Message::Message() +: handle_(nullptr, nullptr) { + +} + Message::Message(rd_kafka_message_t* handle) : handle_(handle, &rd_kafka_message_destroy), payload_((const Buffer::DataType*)handle_->payload, handle_->len), diff --git a/src/producer.cpp b/src/producer.cpp index a25e946..db7e3dc 100644 --- a/src/producer.cpp +++ b/src/producer.cpp @@ -1,9 +1,6 @@ #include #include "producer.h" #include "exceptions.h" -#include "buffer.h" -#include "topic.h" -#include "partition.h" using std::move; using std::string; @@ -13,11 +10,13 @@ namespace cppkafka { Producer::Producer(Configuration config) : config_(move(config)) { char error_buffer[512]; - rd_kafka_t* ptr = rd_kafka_new(RD_KAFKA_PRODUCER, config_.get_handle(), + rd_kafka_t* ptr = rd_kafka_new(RD_KAFKA_PRODUCER, + rd_kafka_conf_dup(config_.get_handle()), error_buffer, sizeof(error_buffer)); if (!ptr) { throw Exception("Failed to create producer handle: " + string(error_buffer)); } + rd_kafka_set_log_level(ptr, 7); set_handle(ptr); set_payload_policy(Producer::COPY_PAYLOAD); } diff --git a/src/topic_partition_list.cpp b/src/topic_partition_list.cpp index bec78e3..c8b09d3 100644 --- a/src/topic_partition_list.cpp +++ b/src/topic_partition_list.cpp @@ -2,6 +2,8 @@ #include "topic_partition.h" #include "exceptions.h" +using std::initializer_list; + namespace cppkafka { const size_t TopicPartitionList::DEFAULT_CONTAINER_SIZE = 5; @@ -20,6 +22,13 @@ TopicPartitionList::TopicPartitionList() } +TopicPartitionList::TopicPartitionList(const initializer_list& topic_partitions) +: TopicPartitionList(topic_partitions.size()) { + for (const auto& value : topic_partitions) { + add(value); + } +} + TopicPartitionList::TopicPartitionList(rd_kafka_topic_partition_list_t* handle) : handle_(make_handle(handle)) { diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt new file mode 100644 index 0000000..cded023 --- /dev/null +++ b/tests/CMakeLists.txt @@ -0,0 +1,17 @@ +include_directories(${GOOGLETEST_INCLUDE}) +include_directories(${CMAKE_CURRENT_SOURCE_DIR}/../include/) + +link_directories(${GOOGLETEST_LIBRARY}) +link_libraries(cppkafka rdkafka gtest gtest_main pthread) + +set(KAFKA_TEST_INSTANCE "127.0.0.1:9092" + CACHE STRING "The kafka instance to which connect to run tests") + +macro(create_test test_name) + add_executable(${test_name}_test "${test_name}_test.cpp") + add_test(${test_name} ${test_name}_test) +endmacro() + +add_definitions("-DKAFKA_TEST_INSTANCE=\"${KAFKA_TEST_INSTANCE}\"") + +create_test(producer) diff --git a/tests/producer_test.cpp b/tests/producer_test.cpp new file mode 100644 index 0000000..30cea4b --- /dev/null +++ b/tests/producer_test.cpp @@ -0,0 +1,117 @@ +#include +#include +#include +#include +#include +#include "cppkafka/producer.h" +#include "cppkafka/consumer.h" + +using std::string; +using std::move; +using std::thread; +using std::mutex; +using std::unique_lock; +using std::lock_guard; +using std::condition_variable; + +using std::chrono::system_clock; +using std::chrono::seconds; +using std::chrono::milliseconds; + +using namespace cppkafka; + +class ConsumerRunner { +public: + ConsumerRunner(Consumer& consumer, size_t expected) + : consumer_(consumer) { + bool booted = false; + mutex mtx; + condition_variable cond; + thread_ = thread([&, expected]() { + consumer_.set_timeout(milliseconds(100)); + bool found_eof = false; + auto start = system_clock::now(); + while (system_clock::now() - start < seconds(10) && messages_.size() < expected) { + Message msg = consumer_.poll(); + if (msg && !found_eof && msg.get_error() == RD_KAFKA_RESP_ERR__PARTITION_EOF) { + found_eof = true; + lock_guard _(mtx); + booted = true; + cond.notify_one(); + } + else if (msg && msg.get_error() == 0) { + messages_.push_back(move(msg)); + } + } + }); + + unique_lock lock(mtx); + while (!booted) { + cond.wait(lock); + } + } + + + + ConsumerRunner(const ConsumerRunner&) = delete; + ConsumerRunner& operator=(const ConsumerRunner&) = delete; + + ~ConsumerRunner() { + try_join(); + } + + const std::vector& get_messages() const { + return messages_; + } + + void try_join() { + if (thread_.joinable()) { + thread_.join(); + } + } +private: + Consumer& consumer_; + thread thread_; + std::vector messages_; +}; + +class ProducerTest : public testing::Test { +public: + static const string KAFKA_TOPIC; + + Configuration make_producer_config() { + Configuration config; + config.set("metadata.broker.list", KAFKA_TEST_INSTANCE); + return config; + } + + Configuration make_consumer_config() { + Configuration config; + config.set("metadata.broker.list", KAFKA_TEST_INSTANCE); + config.set("enable.auto.commit", "false"); + config.set("group.id", "producer_test"); + return config; + } +}; + +const string ProducerTest::KAFKA_TOPIC = "cppkafka_test1"; + +TEST_F(ProducerTest, test1) { + int partition = 0; + + // Create a consumer and assign this topic/partition + Consumer consumer(make_consumer_config()); + consumer.assign({ TopicPartition(KAFKA_TOPIC, partition) }); + ConsumerRunner runner(consumer, 1); + + // Now create a producer and produce a message + Producer producer(make_producer_config()); + Topic topic = producer.get_topic(KAFKA_TOPIC); + string payload = "Hello world!"; + producer.produce(topic, partition, Buffer(payload.begin(), payload.end())); + runner.try_join(); + + const auto& messages = runner.get_messages(); + ASSERT_EQ(1, messages.size()); + EXPECT_EQ(payload, messages[0].get_payload().as_string()); +} diff --git a/third_party/googletest b/third_party/googletest new file mode 160000 index 0000000..0a43962 --- /dev/null +++ b/third_party/googletest @@ -0,0 +1 @@ +Subproject commit 0a439623f75c029912728d80cb7f1b8b48739ca4