fix: modified code to use flush() when internal queue is not loaded

https://telecominfraproject.atlassian.net/browse/WIFI-13857

NOTE: This is port of https://github.com/Telecominfraproject/wlan-cloud-ucentralgw/pull/362

Signed-off-by: Ivan Chvets <ivan.chvets@kinarasystems.com>
This commit is contained in:
Ivan Chvets
2024-06-19 16:45:35 -04:00
parent 280ae24182
commit 574ea606c2

View File

@@ -107,8 +107,17 @@ namespace OpenWifi {
NewMessage.partition(0); NewMessage.partition(0);
NewMessage.payload(Msg->Payload()); NewMessage.payload(Msg->Payload());
Producer.produce(NewMessage); Producer.produce(NewMessage);
if (Queue_.size() < 100) {
// use flush when internal queue is lightly loaded, i.e. flush after each
// message
Producer.flush();
}
else {
// use poll when internal queue is loaded to allow messages to be sent in
// batches
Producer.poll((std::chrono::milliseconds) 0); Producer.poll((std::chrono::milliseconds) 0);
} }
}
} catch (const cppkafka::HandleException &E) { } catch (const cppkafka::HandleException &E) {
poco_warning(Logger_, poco_warning(Logger_,
fmt::format("Caught a Kafka exception (producer): {}", E.what())); fmt::format("Caught a Kafka exception (producer): {}", E.what()));