From 42d44b056ed7fd855a0628d43cfa76c08ab8d39d Mon Sep 17 00:00:00 2001 From: stephb9959 Date: Sat, 23 Sep 2023 15:28:13 -0700 Subject: [PATCH] https://telecominfraproject.atlassian.net/browse/WIFI-12954 Signed-off-by: stephb9959 --- build | 2 +- src/framework/KafkaManager.cpp | 26 +++++++++++++++++--------- 2 files changed, 18 insertions(+), 10 deletions(-) diff --git a/build b/build index f11c82a..9a03714 100644 --- a/build +++ b/build @@ -1 +1 @@ -9 \ No newline at end of file +10 \ No newline at end of file diff --git a/src/framework/KafkaManager.cpp b/src/framework/KafkaManager.cpp index d32833e..d903652 100644 --- a/src/framework/KafkaManager.cpp +++ b/src/framework/KafkaManager.cpp @@ -99,9 +99,12 @@ namespace OpenWifi { try { auto Msg = dynamic_cast(Note.get()); if (Msg != nullptr) { - Producer.produce(cppkafka::MessageBuilder(Msg->Topic()) - .key(Msg->Key()) - .payload(Msg->Payload())); + auto NewMessage = cppkafka::MessageBuilder(Msg->Topic()); + NewMessage.key(Msg->Key()); + NewMessage.partition(0); + NewMessage.payload(Msg->Payload()); + Producer.produce(NewMessage); + Producer.flush(); } } catch (const cppkafka::HandleException &E) { poco_warning(Logger_, @@ -157,17 +160,19 @@ namespace OpenWifi { }); bool AutoCommit = MicroServiceConfigGetBool("openwifi.kafka.auto.commit", false); - auto BatchSize = MicroServiceConfigGetInt("openwifi.kafka.consumer.batchsize", 20); + auto BatchSize = MicroServiceConfigGetInt("openwifi.kafka.consumer.batchsize", 100); Types::StringVec Topics; KafkaManager()->Topics(Topics); Consumer.subscribe(Topics); Running_ = true; + std::vector MsgVec; while (Running_) { try { - std::vector MsgVec = - Consumer.poll_batch(BatchSize, std::chrono::milliseconds(100)); + MsgVec.clear(); + MsgVec.reserve(BatchSize); + MsgVec = Consumer.poll_batch(BatchSize, std::chrono::milliseconds(1000)); for (auto const &Msg : MsgVec) { if (!Msg) continue; @@ -177,12 +182,12 @@ namespace OpenWifi { fmt::format("Error: {}", Msg.get_error().to_string())); } if (!AutoCommit) - Consumer.async_commit(Msg); + Consumer.commit(Msg); continue; } KafkaManager()->Dispatch(Msg.get_topic().c_str(), Msg.get_key(), Msg.get_payload()); if (!AutoCommit) - Consumer.async_commit(Msg); + Consumer.commit(Msg); } } catch (const cppkafka::HandleException &E) { poco_warning(Logger_, @@ -355,7 +360,10 @@ namespace OpenWifi { } [[nodiscard]] std::string KafkaManager::WrapSystemId(const std::string & PayLoad) { - return SystemInfoWrapper_ + PayLoad + "}"; + return fmt::format( R"lit({{ "system" : {{ "id" : {}, + "host" : "{}" }}, + "payload" : {} }})lit", MicroServiceID(), + MicroServicePrivateEndPoint(), PayLoad ) ; } uint64_t KafkaManager::RegisterTopicWatcher(const std::string &Topic,