Merge pull request #112 from Telecominfraproject/WIFI-13597-fix-kafka-producer-using-poll

Wifi 13597 fix kafka producer using poll
This commit is contained in:
i-chvets
2024-07-12 11:01:39 -04:00
committed by GitHub

View File

@@ -107,7 +107,7 @@ namespace OpenWifi {
NewMessage.partition(0); NewMessage.partition(0);
NewMessage.payload(Msg->Payload()); NewMessage.payload(Msg->Payload());
Producer.produce(NewMessage); Producer.produce(NewMessage);
Producer.flush(); Producer.poll((std::chrono::milliseconds) 0);
} }
} catch (const cppkafka::HandleException &E) { } catch (const cppkafka::HandleException &E) {
poco_warning(Logger_, poco_warning(Logger_,
@@ -117,8 +117,13 @@ namespace OpenWifi {
} catch (...) { } catch (...) {
poco_error(Logger_, "std::exception"); poco_error(Logger_, "std::exception");
} }
if (Queue_.size() == 0) {
// message queue is empty, flush all previously sent messages
Producer.flush();
}
Note = Queue_.waitDequeueNotification(); Note = Queue_.waitDequeueNotification();
} }
Producer.flush();
poco_information(Logger_, "Stopped..."); poco_information(Logger_, "Stopped...");
} }
@@ -324,4 +329,4 @@ namespace OpenWifi {
partitions.front().get_partition())); partitions.front().get_partition()));
} }
} // namespace OpenWifi } // namespace OpenWifi