diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 6a98e74..70d67d6 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -6,9 +6,15 @@ if (Boost_PROGRAM_OPTIONS_FOUND) include_directories(${CMAKE_CURRENT_SOURCE_DIR}/../include) include_directories(SYSTEM ${Boost_INCLUDE_DIRS} ${RDKAFKA_INCLUDE_DIR}) - add_executable(kafka_producer EXCLUDE_FROM_ALL kafka_producer.cpp) - add_executable(kafka_consumer EXCLUDE_FROM_ALL kafka_consumer.cpp) - add_custom_target(examples DEPENDS kafka_producer kafka_consumer) + add_custom_target(examples) + macro(create_example example_name) + add_executable(${example_name} EXCLUDE_FROM_ALL "${example_name}.cpp") + add_dependencies(examples ${example_name}) + endmacro() + + create_example(kafka_producer) + create_example(kafka_consumer) + create_example(metadata) else() message(STATUS "Disabling examples since boost.program_options was not found") endif() diff --git a/examples/metadata.cpp b/examples/metadata.cpp new file mode 100644 index 0000000..a1baa50 --- /dev/null +++ b/examples/metadata.cpp @@ -0,0 +1,88 @@ +#include +#include +#include +#include +#include "cppkafka/consumer.h" +#include "cppkafka/configuration.h" +#include "cppkafka/metadata.h" +#include "cppkafka/topic.h" + +using std::string; +using std::exception; +using std::cout; +using std::endl; + +using cppkafka::Consumer; +using cppkafka::Exception; +using cppkafka::Configuration; +using cppkafka::Topic; +using cppkafka::Metadata; +using cppkafka::TopicMetadata; +using cppkafka::BrokerMetadata; + +namespace po = boost::program_options; + +bool running = true; + +int main(int argc, char* argv[]) { + string brokers; + string group_id; + + po::options_description options("Options"); + options.add_options() + ("help,h", "produce this help message") + ("brokers,b", po::value(&brokers)->required(), + "the kafka broker list") + ("group-id,g", po::value(&group_id)->required(), + "the consumer group id") + ; + + po::variables_map vm; + + try { + po::store(po::command_line_parser(argc, argv).options(options).run(), vm); + po::notify(vm); + } + catch (exception& ex) { + cout << "Error parsing options: " << ex.what() << endl; + cout << endl; + cout << options << endl; + return 1; + } + + // Stop processing on SIGINT + signal(SIGINT, [](int) { running = false; }); + + // Construct the configuration + Configuration config = { + { "metadata.broker.list", brokers }, + { "group.id", group_id }, + // Disable auto commit + { "enable.auto.commit", false } + }; + + try { + // Construct a consumer + Consumer consumer(config); + + // Fetch the metadata + Metadata metadata = consumer.get_metadata(); + + // Iterate over brokers + cout << "Found the following brokers: " << endl; + for (const BrokerMetadata& broker : metadata.get_brokers()) { + cout << "* " << broker.get_host() << endl; + } + cout << endl; + + // Iterate over topics + cout << "Found the following topics: " << endl; + for (const TopicMetadata& topic : metadata.get_topics()) { + cout << "* " << topic.get_name() << ": " << topic.get_partitions().size() + << " partitions" << endl; + } + } + catch (const Exception& ex) { + cout << "Error fetching metadata: " << ex.what() << endl; + } +} \ No newline at end of file