diff --git a/CMakeLists.txt b/CMakeLists.txt index 84f4507..acf6b27 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -50,3 +50,4 @@ add_subdirectory(src) add_dependencies(cppkafka googletest) enable_testing() add_subdirectory(tests) +add_subdirectory(examples) diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt new file mode 100644 index 0000000..d54891f --- /dev/null +++ b/examples/CMakeLists.txt @@ -0,0 +1,8 @@ +find_package(Boost REQUIRED COMPONENTS program_options) + +link_libraries(${Boost_LIBRARIES} cppkafka ${RDKAFKA_LIBRARY} ${ZOOKEEPER_LIBRARY}) + +include_directories(${CMAKE_CURRENT_SOURCE_DIR}/../include) + +add_executable(kafka_producer kafka_producer.cpp) +add_executable(kafka_consumer kafka_consumer.cpp) diff --git a/examples/kafka_consumer.cpp b/examples/kafka_consumer.cpp new file mode 100644 index 0000000..8555bbf --- /dev/null +++ b/examples/kafka_consumer.cpp @@ -0,0 +1,93 @@ +#include +#include +#include +#include +#include "cppkafka/consumer.h" +#include "cppkafka/configuration.h" + +using std::string; +using std::exception; +using std::cout; +using std::endl; + +using cppkafka::Consumer; +using cppkafka::Configuration; +using cppkafka::Message; + +namespace po = boost::program_options; + +#ifndef CPPKAFKA_HAVE_ZOOKEEPER + static_assert(false, "Examples require the zookeeper extension"); +#endif + +bool running = true; + +int main(int argc, char* argv[]) { + string zookeeper_endpoint; + string topic_name; + string group_id; + + po::options_description options("Options"); + options.add_options() + ("help,h", "produce this help message") + ("zookeeper", po::value(&zookeeper_endpoint)->required(), + "the zookeeper endpoint") + ("topic", po::value(&topic_name)->required(), + "the topic in which to write to") + ("group-id", po::value(&group_id)->required(), + "the consumer group id") + ; + + po::variables_map vm; + + try { + po::store(po::command_line_parser(argc, argv).options(options).run(), vm); + po::notify(vm); + } + catch (exception& ex) { + cout << "Error parsing options: " << ex.what() << endl; + cout << endl; + cout << options << endl; + return 1; + } + + // Stop processing on SIGINT + signal(SIGINT, [](int) { running = false; }); + + // Construct the configuration + Configuration config; + config.set("zookeeper", zookeeper_endpoint); + config.set("group.id", group_id); + // Disable auto commit + config.set("enable.auto.commit", false); + + // Create the consumer + Consumer consumer(config); + + // Subscribe to the topic + consumer.subscribe({ topic_name }); + + // Now read lines and write them into kafka + while (running) { + // Try to consume a message + Message msg = consumer.poll(); + if (msg) { + // If we managed to get a message + if (msg.has_error()) { + if (msg.get_error() != RD_KAFKA_RESP_ERR__PARTITION_EOF) { + cout << "[+] Received error notification: " << msg.get_error_string() << endl; + } + } + else { + // Print the key (if any) + if (msg.get_key()) { + cout << msg.get_key().as_string() << " -> "; + } + // Print the payload + cout << msg.get_payload().as_string() << endl; + // Now commit the message + consumer.commit(msg); + } + } + } +} diff --git a/examples/kafka_producer.cpp b/examples/kafka_producer.cpp new file mode 100644 index 0000000..ed2f3fd --- /dev/null +++ b/examples/kafka_producer.cpp @@ -0,0 +1,76 @@ +#include +#include +#include +#include "cppkafka/producer.h" +#include "cppkafka/configuration.h" + +using std::string; +using std::exception; +using std::getline; +using std::cin; +using std::cout; +using std::endl; + +using cppkafka::Producer; +using cppkafka::Configuration; +using cppkafka::Topic; +using cppkafka::Partition; + +namespace po = boost::program_options; + +#ifndef CPPKAFKA_HAVE_ZOOKEEPER + static_assert(false, "Examples require the zookeeper extension"); +#endif + +int main(int argc, char* argv[]) { + string zookeeper_endpoint; + string topic_name; + int partition_value = -1; + + po::options_description options("Options"); + options.add_options() + ("help,h", "produce this help message") + ("zookeeper", po::value(&zookeeper_endpoint)->required(), + "the zookeeper endpoint") + ("topic", po::value(&topic_name)->required(), + "the topic in which to write to") + ("partition", po::value(&partition_value), + "the partition to write into (unassigned if not provided)") + ; + + po::variables_map vm; + + try { + po::store(po::command_line_parser(argc, argv).options(options).run(), vm); + po::notify(vm); + } + catch (exception& ex) { + cout << "Error parsing options: " << ex.what() << endl; + cout << endl; + cout << options << endl; + return 1; + } + + // Get the partition we want to write to. If no partition is provided, this will be + // an unassigned one + Partition partition; + if (partition_value != -1) { + partition = partition_value; + } + + // Construct the configuration + Configuration config; + config.set("zookeeper", zookeeper_endpoint); + + // Create the producer + Producer producer(config); + // Get the topic we want + Topic topic = producer.get_topic(topic_name); + + // Now read lines and write them into kafka + string line; + while (getline(cin, line)) { + // Write the string into the partition + producer.produce(topic, partition, line); + } +} diff --git a/include/cppkafka/buffer.h b/include/cppkafka/buffer.h index 83edd74..a7c47cc 100644 --- a/include/cppkafka/buffer.h +++ b/include/cppkafka/buffer.h @@ -92,6 +92,11 @@ public: */ size_t get_size() const; + /** + * Checks whether this is a non empty buffer + */ + explicit operator bool() const; + /** * Converts the contents of the buffer into a string */ diff --git a/include/cppkafka/message.h b/include/cppkafka/message.h index d0375fe..36a5af9 100644 --- a/include/cppkafka/message.h +++ b/include/cppkafka/message.h @@ -84,6 +84,11 @@ public: */ rd_kafka_resp_err_t get_error() const; + /** + * Gets the error as a string + */ + std::string get_error_string() const; + /** * Gets the topic that this message belongs to */ diff --git a/src/buffer.cpp b/src/buffer.cpp index e4a1cc2..7c281f7 100644 --- a/src/buffer.cpp +++ b/src/buffer.cpp @@ -51,6 +51,10 @@ size_t Buffer::get_size() const { return size_; } +Buffer::operator bool() const { + return data_ != nullptr; +} + string Buffer::as_string() const { return string(data_, data_ + size_); } diff --git a/src/message.cpp b/src/message.cpp index 813e750..be7de2e 100644 --- a/src/message.cpp +++ b/src/message.cpp @@ -71,6 +71,10 @@ rd_kafka_resp_err_t Message::get_error() const { return handle_->err; } +string Message::get_error_string() const { + return rd_kafka_err2str(handle_->err); +} + int Message::get_partition() const { return handle_->partition; }