diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index c089611..7622061 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -4,12 +4,14 @@ include_directories(SYSTEM ${RDKAFKA_INCLUDE_DIR}) add_custom_target(examples) macro(create_example example_name) - add_executable(${example_name} EXCLUDE_FROM_ALL "${example_name}.cpp") - add_dependencies(examples ${example_name}) + string(REPLACE "_" "-" sanitized_name ${example_name}) + add_executable(${sanitized_name} EXCLUDE_FROM_ALL "${example_name}_example.cpp") + add_dependencies(examples ${sanitized_name}) endmacro() -create_example(kafka_producer) -create_example(kafka_consumer) -create_example(kafka_consumer_dispatcher) +create_example(producer) +create_example(buffered_producer) +create_example(consumer) +create_example(consumer_dispatcher) create_example(metadata) create_example(consumers_information) diff --git a/examples/buffered_producer_example.cpp b/examples/buffered_producer_example.cpp new file mode 100644 index 0000000..2ae1d6f --- /dev/null +++ b/examples/buffered_producer_example.cpp @@ -0,0 +1,96 @@ +#include +#include +#include +#include "cppkafka/utils/buffered_producer.h" +#include "cppkafka/configuration.h" + +using std::string; +using std::exception; +using std::getline; +using std::cin; +using std::cout; +using std::endl; + +using cppkafka::BufferedProducer; +using cppkafka::Configuration; +using cppkafka::Topic; +using cppkafka::MessageBuilder; +using cppkafka::Message; + +namespace po = boost::program_options; + +int main(int argc, char* argv[]) { + string brokers; + string topic_name; + int partition_value = -1; + + po::options_description options("Options"); + options.add_options() + ("help,h", "produce this help message") + ("brokers,b", po::value(&brokers)->required(), + "the kafka broker list") + ("topic,t", po::value(&topic_name)->required(), + "the topic in which to write to") + ("partition,p", po::value(&partition_value), + "the partition to write into (unassigned if not provided)") + ; + + po::variables_map vm; + + try { + po::store(po::command_line_parser(argc, argv).options(options).run(), vm); + po::notify(vm); + } + catch (exception& ex) { + cout << "Error parsing options: " << ex.what() << endl; + cout << endl; + cout << options << endl; + return 1; + } + + // Create a message builder for this topic + MessageBuilder builder(topic_name); + + // Get the partition we want to write to. If no partition is provided, this will be + // an unassigned one + if (partition_value != -1) { + builder.partition(partition_value); + } + + // Construct the configuration + Configuration config = { + { "metadata.broker.list", brokers } + }; + + // Create the producer + BufferedProducer producer(config); + + // Set a produce success callback + producer.set_produce_success_callback([](const Message& msg) { + cout << "Successfully produced message with payload " << msg.get_payload() << endl; + }); + // Set a produce failure callback + producer.set_produce_failure_callback([](const Message& msg) { + cout << "Failed to produce message with payload " << msg.get_payload() << endl; + // Return false so we stop trying to produce this message + return false; + }); + + cout << "Producing messages into topic " << topic_name << endl; + + // Now read lines and write them into kafka + string line; + while (getline(cin, line)) { + // Set the payload on this builder + builder.payload(line); + + // Add the message we've built to the buffered producer + producer.add_message(builder); + + // Now flush so we: + // * emit the buffered message + // * poll the producer so we dispatch on delivery report callbacks and + // therefore get the produce failure/success callbacks + producer.flush(); + } +} diff --git a/examples/kafka_consumer_dispatcher.cpp b/examples/consumer_dispatcher_example.cpp similarity index 100% rename from examples/kafka_consumer_dispatcher.cpp rename to examples/consumer_dispatcher_example.cpp diff --git a/examples/kafka_consumer.cpp b/examples/consumer_example.cpp similarity index 100% rename from examples/kafka_consumer.cpp rename to examples/consumer_example.cpp diff --git a/examples/consumers_information.cpp b/examples/consumers_information_example.cpp similarity index 100% rename from examples/consumers_information.cpp rename to examples/consumers_information_example.cpp diff --git a/examples/metadata.cpp b/examples/metadata_example.cpp similarity index 100% rename from examples/metadata.cpp rename to examples/metadata_example.cpp diff --git a/examples/kafka_producer.cpp b/examples/producer_example.cpp similarity index 100% rename from examples/kafka_producer.cpp rename to examples/producer_example.cpp diff --git a/include/cppkafka/utils/buffered_producer.h b/include/cppkafka/utils/buffered_producer.h index cd802a5..ffeff53 100644 --- a/include/cppkafka/utils/buffered_producer.h +++ b/include/cppkafka/utils/buffered_producer.h @@ -99,7 +99,10 @@ public: using ProduceSuccessCallback = std::function; /** - * Callback to indicate a message failed to be produced by the broker + * Callback to indicate a message failed to be produced by the broker. + * + * The returned bool indicates whether the BufferedProducer should try to produce + * the message again after each failure. */ using ProduceFailureCallback = std::function;