Minor fixes on examples

This commit is contained in:
Matias Fontanini
2017-04-23 14:21:49 -07:00
parent b5a7c70993
commit 03189b82a1
2 changed files with 14 additions and 9 deletions

View File

@@ -26,13 +26,13 @@ int main(int argc, char* argv[]) {
po::options_description options("Options");
options.add_options()
("help,h", "produce this help message")
("brokers", po::value<string>(&brokers)->required(),
"the kafka broker list")
("topic", po::value<string>(&topic_name)->required(),
"the topic in which to write to")
("group-id", po::value<string>(&group_id)->required(),
"the consumer group id")
("help,h", "produce this help message")
("brokers,b", po::value<string>(&brokers)->required(),
"the kafka broker list")
("topic,t", po::value<string>(&topic_name)->required(),
"the topic in which to write to")
("group-id,g", po::value<string>(&group_id)->required(),
"the consumer group id")
;
po::variables_map vm;
@@ -84,7 +84,8 @@ int main(int argc, char* argv[]) {
if (msg) {
// If we managed to get a message
if (msg.get_error()) {
if (msg.get_error() != RD_KAFKA_RESP_ERR__PARTITION_EOF) {
// Ignore EOF notifications from rdkafka
if (!msg.is_eof()) {
cout << "[+] Received error notification: " << msg.get_error() << endl;
}
}

View File

@@ -48,7 +48,9 @@ int main(int argc, char* argv[]) {
return 1;
}
// Create a message builder for this topic
MessageBuilder builder(topic_name);
// Get the partition we want to write to. If no partition is provided, this will be
// an unassigned one
if (partition_value != -1) {
@@ -68,8 +70,10 @@ int main(int argc, char* argv[]) {
// Now read lines and write them into kafka
string line;
while (getline(cin, line)) {
// Set the payload on this builder
builder.payload(line);
// Write the string into the partition
// Actually produce the message we've built
producer.produce(builder);
}
}