Merge pull request #107 from Telecominfraproject/WIFI-13597-fix-kafka-producer-using-poll

Wifi 13597 fix kafka producer using poll
This commit is contained in:
i-chvets
2024-07-12 11:01:19 -04:00
committed by GitHub

View File

@@ -107,8 +107,17 @@ namespace OpenWifi {
NewMessage.partition(0);
NewMessage.payload(Msg->Payload());
Producer.produce(NewMessage);
if (Queue_.size() < 100) {
// use flush when internal queue is lightly loaded, i.e. flush after each
// message
Producer.flush();
}
else {
// use poll when internal queue is loaded to allow messages to be sent in
// batches
Producer.poll((std::chrono::milliseconds) 0);
}
}
} catch (const cppkafka::HandleException &E) {
poco_warning(Logger_,
fmt::format("Caught a Kafka exception (producer): {}", E.what()));
@@ -117,8 +126,13 @@ namespace OpenWifi {
} catch (...) {
poco_error(Logger_, "std::exception");
}
if (Queue_.size() == 0) {
// message queue is empty, flush all previously sent messages
Producer.flush();
}
Note = Queue_.waitDequeueNotification();
}
Producer.flush();
poco_information(Logger_, "Stopped...");
}