From 8d04cbc05983a138466524b2e92cc6f68f3049b1 Mon Sep 17 00:00:00 2001 From: Ivan Chvets Date: Thu, 13 Jun 2024 13:18:29 -0400 Subject: [PATCH] fix: modified kafka manager to use poll in producer https://telecominfraproject.atlassian.net/browse/WIFI-13597 NOTE: This fix is port of https://github.com/Telecominfraproject/wlan-cloud-ucentralgw/pull/360 Summary of changes: - Modified code in KafkaManager to use poll instead of flush for every messages sent. flush is used only on empty internal notification queue in idle times. Signed-off-by: Ivan Chvets --- src/framework/KafkaManager.cpp | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/framework/KafkaManager.cpp b/src/framework/KafkaManager.cpp index 9605736..38cdcb1 100644 --- a/src/framework/KafkaManager.cpp +++ b/src/framework/KafkaManager.cpp @@ -107,7 +107,7 @@ namespace OpenWifi { NewMessage.partition(0); NewMessage.payload(Msg->Payload()); Producer.produce(NewMessage); - Producer.flush(); + Producer.poll((std::chrono::milliseconds) 0); } } catch (const cppkafka::HandleException &E) { poco_warning(Logger_, @@ -117,6 +117,10 @@ namespace OpenWifi { } catch (...) { poco_error(Logger_, "std::exception"); } + if (Queue_.size() == 0) { + // message queue is empty, flush all previously sent messages + Producer.flush(); + } Note = Queue_.waitDequeueNotification(); } poco_information(Logger_, "Stopped...");