mirror of
https://github.com/Telecominfraproject/wlan-cloud-lib-cppkafka.git
synced 2025-10-30 18:17:58 +00:00
Add first producer test
This commit is contained in:
3
.gitmodules
vendored
Normal file
3
.gitmodules
vendored
Normal file
@@ -0,0 +1,3 @@
|
||||
[submodule "third_party/googletest"]
|
||||
path = third_party/googletest
|
||||
url = https://github.com/google/googletest.git
|
||||
@@ -5,4 +5,25 @@ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -Wall")
|
||||
|
||||
include_directories(${CMAKE_SOURCE_DIR}/include/cppkafka)
|
||||
|
||||
add_subdirectory(src)
|
||||
add_subdirectory(src)
|
||||
|
||||
set(GOOGLETEST_ROOT ${CMAKE_SOURCE_DIR}/third_party/googletest)
|
||||
set(GOOGLETEST_INCLUDE ${GOOGLETEST_ROOT}/googletest/include)
|
||||
set(GOOGLETEST_BINARY_DIR ${CMAKE_CURRENT_BINARY_DIR}/googletest)
|
||||
set(GOOGLETEST_LIBRARY ${GOOGLETEST_BINARY_DIR}/googletest)
|
||||
|
||||
include(ExternalProject)
|
||||
ExternalProject_Add(
|
||||
googletest
|
||||
DOWNLOAD_COMMAND ""
|
||||
SOURCE_DIR ${GOOGLETEST_ROOT}
|
||||
BINARY_DIR ${GOOGLETEST_BINARY_DIR}
|
||||
CMAKE_CACHE_ARGS "-DBUILD_GTEST:bool=ON" "-DBUILD_GMOCK:bool=OFF"
|
||||
"-Dgtest_force_shared_crt:bool=ON"
|
||||
INSTALL_COMMAND ""
|
||||
)
|
||||
# Make sure we build googletest before anything else
|
||||
add_dependencies(cppkafka googletest)
|
||||
|
||||
enable_testing()
|
||||
add_subdirectory(tests)
|
||||
@@ -2,6 +2,7 @@
|
||||
#define CPPKAFKA_BUFFER_H
|
||||
|
||||
#include <cstddef>
|
||||
#include <algorithm>
|
||||
|
||||
namespace cppkafka {
|
||||
|
||||
@@ -11,6 +12,11 @@ public:
|
||||
|
||||
Buffer();
|
||||
Buffer(const DataType* data, size_t size);
|
||||
template <typename ForwardIterator>
|
||||
Buffer(const ForwardIterator& start, const ForwardIterator& end) :
|
||||
data_((const DataType*)&*start), size_(std::distance(start, end)) {
|
||||
|
||||
}
|
||||
|
||||
Buffer(const Buffer&) = delete;
|
||||
Buffer(Buffer&&) = default;
|
||||
@@ -19,8 +25,10 @@ public:
|
||||
|
||||
const DataType* get_data() const;
|
||||
size_t get_size() const;
|
||||
|
||||
std::string as_string() const;
|
||||
private:
|
||||
const unsigned char* data_;
|
||||
const DataType* data_;
|
||||
size_t size_;
|
||||
};
|
||||
|
||||
|
||||
@@ -20,6 +20,11 @@ public:
|
||||
using RebalanceErrorCallback = std::function<void(rd_kafka_resp_err_t)>;
|
||||
|
||||
Consumer(Configuration config);
|
||||
Consumer(const Consumer&) = delete;
|
||||
Consumer(Consumer&) = delete;
|
||||
Consumer& operator=(const Consumer&) = delete;
|
||||
Consumer& operator=(Consumer&&) = delete;
|
||||
~Consumer();
|
||||
|
||||
void set_timeout(const std::chrono::milliseconds timeout);
|
||||
void set_assignment_callback(AssignmentCallback callback);
|
||||
|
||||
@@ -11,6 +11,7 @@ namespace cppkafka {
|
||||
|
||||
class Message {
|
||||
public:
|
||||
Message();
|
||||
Message(rd_kafka_message_t* handle);
|
||||
Message(const Message&) = delete;
|
||||
Message(Message&& rhs) = default;
|
||||
|
||||
@@ -4,6 +4,9 @@
|
||||
#include <memory>
|
||||
#include "kafka_handle_base.h"
|
||||
#include "configuration.h"
|
||||
#include "buffer.h"
|
||||
#include "topic.h"
|
||||
#include "partition.h"
|
||||
|
||||
namespace cppkafka {
|
||||
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
|
||||
#include <memory>
|
||||
#include <algorithm>
|
||||
#include <initializer_list>
|
||||
#include <librdkafka/rdkafka.h>
|
||||
#include "topic_partition.h"
|
||||
|
||||
@@ -15,6 +16,7 @@ public:
|
||||
static TopicPartitionList make_non_owning(rd_kafka_topic_partition_list_t* handle);
|
||||
|
||||
TopicPartitionList();
|
||||
TopicPartitionList(const std::initializer_list<TopicPartition>& topic_partitions);
|
||||
TopicPartitionList(rd_kafka_topic_partition_list_t* handle);
|
||||
TopicPartitionList(size_t size);
|
||||
template <typename ForwardIterator>
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
#include "buffer.h"
|
||||
|
||||
using std::string;
|
||||
|
||||
namespace cppkafka {
|
||||
|
||||
Buffer::Buffer()
|
||||
@@ -20,4 +22,8 @@ size_t Buffer::get_size() const {
|
||||
return size_;
|
||||
}
|
||||
|
||||
string Buffer::as_string() const {
|
||||
return string(data_, data_ + size_);
|
||||
}
|
||||
|
||||
} // cppkafka
|
||||
|
||||
@@ -23,14 +23,20 @@ Consumer::Consumer(Configuration config)
|
||||
char error_buffer[512];
|
||||
// Set ourselves as the opaque pointer
|
||||
rd_kafka_conf_set_opaque(config.get_handle(), this);
|
||||
rd_kafka_t* ptr = rd_kafka_new(RD_KAFKA_CONSUMER, config.get_handle(),
|
||||
rd_kafka_t* ptr = rd_kafka_new(RD_KAFKA_CONSUMER,
|
||||
rd_kafka_conf_dup(config.get_handle()),
|
||||
error_buffer, sizeof(error_buffer));
|
||||
if (!ptr) {
|
||||
throw Exception("Failed to create consumer handle: " + string(error_buffer));
|
||||
}
|
||||
rd_kafka_poll_set_consumer(ptr);
|
||||
set_handle(ptr);
|
||||
}
|
||||
|
||||
Consumer::~Consumer() {
|
||||
close();
|
||||
}
|
||||
|
||||
void Consumer::set_timeout(const milliseconds timeout) {
|
||||
timeout_ms_ = timeout;
|
||||
}
|
||||
@@ -126,7 +132,7 @@ TopicPartitionList Consumer::get_assignment() {
|
||||
|
||||
Message Consumer::poll() {
|
||||
rd_kafka_message_t* message = rd_kafka_consumer_poll(get_handle(), timeout_ms_.count());
|
||||
return Message(message);
|
||||
return message ? Message(message) : Message();
|
||||
}
|
||||
|
||||
void Consumer::commit(const Message& msg, bool async) {
|
||||
|
||||
@@ -4,6 +4,11 @@ using std::string;
|
||||
|
||||
namespace cppkafka {
|
||||
|
||||
Message::Message()
|
||||
: handle_(nullptr, nullptr) {
|
||||
|
||||
}
|
||||
|
||||
Message::Message(rd_kafka_message_t* handle)
|
||||
: handle_(handle, &rd_kafka_message_destroy),
|
||||
payload_((const Buffer::DataType*)handle_->payload, handle_->len),
|
||||
|
||||
@@ -1,9 +1,6 @@
|
||||
#include <errno.h>
|
||||
#include "producer.h"
|
||||
#include "exceptions.h"
|
||||
#include "buffer.h"
|
||||
#include "topic.h"
|
||||
#include "partition.h"
|
||||
|
||||
using std::move;
|
||||
using std::string;
|
||||
@@ -13,11 +10,13 @@ namespace cppkafka {
|
||||
Producer::Producer(Configuration config)
|
||||
: config_(move(config)) {
|
||||
char error_buffer[512];
|
||||
rd_kafka_t* ptr = rd_kafka_new(RD_KAFKA_PRODUCER, config_.get_handle(),
|
||||
rd_kafka_t* ptr = rd_kafka_new(RD_KAFKA_PRODUCER,
|
||||
rd_kafka_conf_dup(config_.get_handle()),
|
||||
error_buffer, sizeof(error_buffer));
|
||||
if (!ptr) {
|
||||
throw Exception("Failed to create producer handle: " + string(error_buffer));
|
||||
}
|
||||
rd_kafka_set_log_level(ptr, 7);
|
||||
set_handle(ptr);
|
||||
set_payload_policy(Producer::COPY_PAYLOAD);
|
||||
}
|
||||
|
||||
@@ -2,6 +2,8 @@
|
||||
#include "topic_partition.h"
|
||||
#include "exceptions.h"
|
||||
|
||||
using std::initializer_list;
|
||||
|
||||
namespace cppkafka {
|
||||
|
||||
const size_t TopicPartitionList::DEFAULT_CONTAINER_SIZE = 5;
|
||||
@@ -20,6 +22,13 @@ TopicPartitionList::TopicPartitionList()
|
||||
|
||||
}
|
||||
|
||||
TopicPartitionList::TopicPartitionList(const initializer_list<TopicPartition>& topic_partitions)
|
||||
: TopicPartitionList(topic_partitions.size()) {
|
||||
for (const auto& value : topic_partitions) {
|
||||
add(value);
|
||||
}
|
||||
}
|
||||
|
||||
TopicPartitionList::TopicPartitionList(rd_kafka_topic_partition_list_t* handle)
|
||||
: handle_(make_handle(handle)) {
|
||||
|
||||
|
||||
17
tests/CMakeLists.txt
Normal file
17
tests/CMakeLists.txt
Normal file
@@ -0,0 +1,17 @@
|
||||
include_directories(${GOOGLETEST_INCLUDE})
|
||||
include_directories(${CMAKE_CURRENT_SOURCE_DIR}/../include/)
|
||||
|
||||
link_directories(${GOOGLETEST_LIBRARY})
|
||||
link_libraries(cppkafka rdkafka gtest gtest_main pthread)
|
||||
|
||||
set(KAFKA_TEST_INSTANCE "127.0.0.1:9092"
|
||||
CACHE STRING "The kafka instance to which connect to run tests")
|
||||
|
||||
macro(create_test test_name)
|
||||
add_executable(${test_name}_test "${test_name}_test.cpp")
|
||||
add_test(${test_name} ${test_name}_test)
|
||||
endmacro()
|
||||
|
||||
add_definitions("-DKAFKA_TEST_INSTANCE=\"${KAFKA_TEST_INSTANCE}\"")
|
||||
|
||||
create_test(producer)
|
||||
117
tests/producer_test.cpp
Normal file
117
tests/producer_test.cpp
Normal file
@@ -0,0 +1,117 @@
|
||||
#include <thread>
|
||||
#include <mutex>
|
||||
#include <chrono>
|
||||
#include <condition_variable>
|
||||
#include <gtest/gtest.h>
|
||||
#include "cppkafka/producer.h"
|
||||
#include "cppkafka/consumer.h"
|
||||
|
||||
using std::string;
|
||||
using std::move;
|
||||
using std::thread;
|
||||
using std::mutex;
|
||||
using std::unique_lock;
|
||||
using std::lock_guard;
|
||||
using std::condition_variable;
|
||||
|
||||
using std::chrono::system_clock;
|
||||
using std::chrono::seconds;
|
||||
using std::chrono::milliseconds;
|
||||
|
||||
using namespace cppkafka;
|
||||
|
||||
class ConsumerRunner {
|
||||
public:
|
||||
ConsumerRunner(Consumer& consumer, size_t expected)
|
||||
: consumer_(consumer) {
|
||||
bool booted = false;
|
||||
mutex mtx;
|
||||
condition_variable cond;
|
||||
thread_ = thread([&, expected]() {
|
||||
consumer_.set_timeout(milliseconds(100));
|
||||
bool found_eof = false;
|
||||
auto start = system_clock::now();
|
||||
while (system_clock::now() - start < seconds(10) && messages_.size() < expected) {
|
||||
Message msg = consumer_.poll();
|
||||
if (msg && !found_eof && msg.get_error() == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
|
||||
found_eof = true;
|
||||
lock_guard<mutex> _(mtx);
|
||||
booted = true;
|
||||
cond.notify_one();
|
||||
}
|
||||
else if (msg && msg.get_error() == 0) {
|
||||
messages_.push_back(move(msg));
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
unique_lock<mutex> lock(mtx);
|
||||
while (!booted) {
|
||||
cond.wait(lock);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
ConsumerRunner(const ConsumerRunner&) = delete;
|
||||
ConsumerRunner& operator=(const ConsumerRunner&) = delete;
|
||||
|
||||
~ConsumerRunner() {
|
||||
try_join();
|
||||
}
|
||||
|
||||
const std::vector<Message>& get_messages() const {
|
||||
return messages_;
|
||||
}
|
||||
|
||||
void try_join() {
|
||||
if (thread_.joinable()) {
|
||||
thread_.join();
|
||||
}
|
||||
}
|
||||
private:
|
||||
Consumer& consumer_;
|
||||
thread thread_;
|
||||
std::vector<Message> messages_;
|
||||
};
|
||||
|
||||
class ProducerTest : public testing::Test {
|
||||
public:
|
||||
static const string KAFKA_TOPIC;
|
||||
|
||||
Configuration make_producer_config() {
|
||||
Configuration config;
|
||||
config.set("metadata.broker.list", KAFKA_TEST_INSTANCE);
|
||||
return config;
|
||||
}
|
||||
|
||||
Configuration make_consumer_config() {
|
||||
Configuration config;
|
||||
config.set("metadata.broker.list", KAFKA_TEST_INSTANCE);
|
||||
config.set("enable.auto.commit", "false");
|
||||
config.set("group.id", "producer_test");
|
||||
return config;
|
||||
}
|
||||
};
|
||||
|
||||
const string ProducerTest::KAFKA_TOPIC = "cppkafka_test1";
|
||||
|
||||
TEST_F(ProducerTest, test1) {
|
||||
int partition = 0;
|
||||
|
||||
// Create a consumer and assign this topic/partition
|
||||
Consumer consumer(make_consumer_config());
|
||||
consumer.assign({ TopicPartition(KAFKA_TOPIC, partition) });
|
||||
ConsumerRunner runner(consumer, 1);
|
||||
|
||||
// Now create a producer and produce a message
|
||||
Producer producer(make_producer_config());
|
||||
Topic topic = producer.get_topic(KAFKA_TOPIC);
|
||||
string payload = "Hello world!";
|
||||
producer.produce(topic, partition, Buffer(payload.begin(), payload.end()));
|
||||
runner.try_join();
|
||||
|
||||
const auto& messages = runner.get_messages();
|
||||
ASSERT_EQ(1, messages.size());
|
||||
EXPECT_EQ(payload, messages[0].get_payload().as_string());
|
||||
}
|
||||
1
third_party/googletest
vendored
Submodule
1
third_party/googletest
vendored
Submodule
Submodule third_party/googletest added at 0a439623f7
Reference in New Issue
Block a user