diff --git a/src/framework/KafkaManager.cpp b/src/framework/KafkaManager.cpp index 9605736..c398368 100644 --- a/src/framework/KafkaManager.cpp +++ b/src/framework/KafkaManager.cpp @@ -107,7 +107,7 @@ namespace OpenWifi { NewMessage.partition(0); NewMessage.payload(Msg->Payload()); Producer.produce(NewMessage); - Producer.flush(); + Producer.poll((std::chrono::milliseconds) 0); } } catch (const cppkafka::HandleException &E) { poco_warning(Logger_, @@ -117,8 +117,13 @@ namespace OpenWifi { } catch (...) { poco_error(Logger_, "std::exception"); } + if (Queue_.size() == 0) { + // message queue is empty, flush all previously sent messages + Producer.flush(); + } Note = Queue_.waitDequeueNotification(); } + Producer.flush(); poco_information(Logger_, "Stopped..."); } @@ -324,4 +329,4 @@ namespace OpenWifi { partitions.front().get_partition())); } -} // namespace OpenWifi \ No newline at end of file +} // namespace OpenWifi