diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 70d67d6..4a1dcd4 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -15,6 +15,7 @@ if (Boost_PROGRAM_OPTIONS_FOUND) create_example(kafka_producer) create_example(kafka_consumer) create_example(metadata) + create_example(consumers_information) else() message(STATUS "Disabling examples since boost.program_options was not found") endif() diff --git a/examples/consumers_information.cpp b/examples/consumers_information.cpp new file mode 100644 index 0000000..b8dc492 --- /dev/null +++ b/examples/consumers_information.cpp @@ -0,0 +1,87 @@ +#include +#include +#include +#include "cppkafka/producer.h" +#include "cppkafka/configuration.h" +#include "cppkafka/group_information.h" +#include "cppkafka/topic.h" + +using std::string; +using std::exception; +using std::vector; +using std::cout; +using std::endl; + +using cppkafka::Producer; +using cppkafka::Exception; +using cppkafka::Configuration; +using cppkafka::Topic; +using cppkafka::GroupInformation; +using cppkafka::GroupMemberInformation; + +namespace po = boost::program_options; + +int main(int argc, char* argv[]) { + string brokers; + string group_id; + + po::options_description options("Options"); + options.add_options() + ("help,h", "produce this help message") + ("brokers,b", po::value(&brokers)->required(), + "the kafka broker list") + ("group-id,g", po::value(&group_id), + "only fetch consumer group information for the specified one") + ; + + po::variables_map vm; + + try { + po::store(po::command_line_parser(argc, argv).options(options).run(), vm); + po::notify(vm); + } + catch (exception& ex) { + cout << "Error parsing options: " << ex.what() << endl; + cout << endl; + cout << options << endl; + return 1; + } + + // Construct the configuration + Configuration config = { + { "metadata.broker.list", brokers }, + // Disable auto commit + { "enable.auto.commit", false } + }; + + try { + // Construct a producer + Producer producer(config); + + // Fetch the group information + vector groups = [&]() { + if (!group_id.empty()) { + return vector{producer.get_consumer_group(group_id)}; + } + else { + return producer.get_consumer_groups(); + } + }(); + + if (groups.empty()) { + cout << "Found no consumers" << endl; + return 0; + } + cout << "Found the following consumers: " << endl; + for (const GroupInformation& group : groups) { + cout << "* \"" << group.get_name() << "\" having the following members: " << endl; + for (const GroupMemberInformation& info : group.get_members()) { + cout << " - " << info.get_member_id() << " @ " << info.get_client_host() << endl; + } + cout << endl; + } + } + catch (const Exception& ex) { + cout << "Error fetching group information: " << ex.what() << endl; + } +} \ No newline at end of file