diff --git a/examples/kafka_consumer.cpp b/examples/kafka_consumer.cpp index ba8bf39..69f0669 100644 --- a/examples/kafka_consumer.cpp +++ b/examples/kafka_consumer.cpp @@ -26,13 +26,13 @@ int main(int argc, char* argv[]) { po::options_description options("Options"); options.add_options() - ("help,h", "produce this help message") - ("brokers", po::value(&brokers)->required(), - "the kafka broker list") - ("topic", po::value(&topic_name)->required(), - "the topic in which to write to") - ("group-id", po::value(&group_id)->required(), - "the consumer group id") + ("help,h", "produce this help message") + ("brokers,b", po::value(&brokers)->required(), + "the kafka broker list") + ("topic,t", po::value(&topic_name)->required(), + "the topic in which to write to") + ("group-id,g", po::value(&group_id)->required(), + "the consumer group id") ; po::variables_map vm; @@ -84,7 +84,8 @@ int main(int argc, char* argv[]) { if (msg) { // If we managed to get a message if (msg.get_error()) { - if (msg.get_error() != RD_KAFKA_RESP_ERR__PARTITION_EOF) { + // Ignore EOF notifications from rdkafka + if (!msg.is_eof()) { cout << "[+] Received error notification: " << msg.get_error() << endl; } } diff --git a/examples/kafka_producer.cpp b/examples/kafka_producer.cpp index 1684eaa..fadf3d5 100644 --- a/examples/kafka_producer.cpp +++ b/examples/kafka_producer.cpp @@ -48,7 +48,9 @@ int main(int argc, char* argv[]) { return 1; } + // Create a message builder for this topic MessageBuilder builder(topic_name); + // Get the partition we want to write to. If no partition is provided, this will be // an unassigned one if (partition_value != -1) { @@ -68,8 +70,10 @@ int main(int argc, char* argv[]) { // Now read lines and write them into kafka string line; while (getline(cin, line)) { + // Set the payload on this builder builder.payload(line); - // Write the string into the partition + + // Actually produce the message we've built producer.produce(builder); } }