stephb9959
2023-09-23 15:27:40 -07:00
parent 5e1f3e0e31
commit 5052a818ff
2 changed files with 18 additions and 10 deletions

2
build
View File

@@ -1 +1 @@
7
8

View File

@@ -99,9 +99,12 @@ namespace OpenWifi {
try {
auto Msg = dynamic_cast<KafkaMessage *>(Note.get());
if (Msg != nullptr) {
Producer.produce(cppkafka::MessageBuilder(Msg->Topic())
.key(Msg->Key())
.payload(Msg->Payload()));
auto NewMessage = cppkafka::MessageBuilder(Msg->Topic());
NewMessage.key(Msg->Key());
NewMessage.partition(0);
NewMessage.payload(Msg->Payload());
Producer.produce(NewMessage);
Producer.flush();
}
} catch (const cppkafka::HandleException &E) {
poco_warning(Logger_,
@@ -157,17 +160,19 @@ namespace OpenWifi {
});
bool AutoCommit = MicroServiceConfigGetBool("openwifi.kafka.auto.commit", false);
auto BatchSize = MicroServiceConfigGetInt("openwifi.kafka.consumer.batchsize", 20);
auto BatchSize = MicroServiceConfigGetInt("openwifi.kafka.consumer.batchsize", 100);
Types::StringVec Topics;
KafkaManager()->Topics(Topics);
Consumer.subscribe(Topics);
Running_ = true;
std::vector<cppkafka::Message> MsgVec;
while (Running_) {
try {
std::vector<cppkafka::Message> MsgVec =
Consumer.poll_batch(BatchSize, std::chrono::milliseconds(100));
MsgVec.clear();
MsgVec.reserve(BatchSize);
MsgVec = Consumer.poll_batch(BatchSize, std::chrono::milliseconds(1000));
for (auto const &Msg : MsgVec) {
if (!Msg)
continue;
@@ -177,12 +182,12 @@ namespace OpenWifi {
fmt::format("Error: {}", Msg.get_error().to_string()));
}
if (!AutoCommit)
Consumer.async_commit(Msg);
Consumer.commit(Msg);
continue;
}
KafkaManager()->Dispatch(Msg.get_topic().c_str(), Msg.get_key(), Msg.get_payload());
if (!AutoCommit)
Consumer.async_commit(Msg);
Consumer.commit(Msg);
}
} catch (const cppkafka::HandleException &E) {
poco_warning(Logger_,
@@ -355,7 +360,10 @@ namespace OpenWifi {
}
[[nodiscard]] std::string KafkaManager::WrapSystemId(const std::string & PayLoad) {
return SystemInfoWrapper_ + PayLoad + "}";
return fmt::format( R"lit({{ "system" : {{ "id" : {},
"host" : "{}" }},
"payload" : {} }})lit", MicroServiceID(),
MicroServicePrivateEndPoint(), PayLoad ) ;
}
uint64_t KafkaManager::RegisterTopicWatcher(const std::string &Topic,