stephb9959
2023-05-18 08:46:04 -07:00
parent ed336bec0c
commit 21a4eb2a71
19 changed files with 78 additions and 75 deletions

2
build
View File

@@ -1 +1 @@
53
54

View File

@@ -232,7 +232,7 @@ namespace OpenWifi {
Poco::JSON::Stringifier Stringify;
std::ostringstream OS;
Stringify.condense(Disconnect, OS);
KafkaManager()->PostMessage(KafkaTopics::CONNECTION, SerialNumber, OS.str());
KafkaManager()->PostMessage(KafkaTopics::CONNECTION, SerialNumber, std::make_shared<std::string>(OS.str()));
} catch (...) {
}
}
@@ -725,7 +725,7 @@ namespace OpenWifi {
Poco::JSON::Stringifier Stringify;
std::ostringstream OS;
Stringify.condense(PingObject, OS);
KafkaManager()->PostMessage(KafkaTopics::CONNECTION, SerialNumber_, OS.str());
KafkaManager()->PostMessage(KafkaTopics::CONNECTION, SerialNumber_, std::make_shared<std::string>(OS.str()));
}
return;
} break;

View File

@@ -25,7 +25,7 @@ namespace OpenWifi {
Poco::JSON::Stringifier Stringify;
std::ostringstream OS;
Stringify.condense(ParamsObj, OS);
KafkaManager()->PostMessage(KafkaTopics::ALERTS, SerialNumber_, OS.str());
KafkaManager()->PostMessage(KafkaTopics::ALERTS, SerialNumber_, std::make_shared<std::string>(OS.str()));
}
}
}

View File

@@ -32,7 +32,7 @@ namespace OpenWifi {
Event.set("payload", EventDetails);
std::ostringstream OS;
Event.stringify(OS);
KafkaManager()->PostMessage(KafkaTopics::DEVICE_EVENT_QUEUE, SerialNumber, OS.str());
KafkaManager()->PostMessage(KafkaTopics::DEVICE_EVENT_QUEUE, SerialNumber, std::make_shared<std::string>(OS.str()));
}
}
@@ -51,7 +51,7 @@ namespace OpenWifi {
Event.set("payload", EventDetails);
std::ostringstream OS;
Event.stringify(OS);
KafkaManager()->PostMessage(KafkaTopics::DEVICE_EVENT_QUEUE, SerialNumber, OS.str());
KafkaManager()->PostMessage(KafkaTopics::DEVICE_EVENT_QUEUE, SerialNumber, std::make_shared<std::string>(OS.str()));
}
}
@@ -232,7 +232,7 @@ namespace OpenWifi {
ParamsObj->set(uCentralProtocol::TIMESTAMP, Utils::Now());
std::ostringstream OS;
Stringify.condense(ParamsObj, OS);
KafkaManager()->PostMessage(KafkaTopics::CONNECTION, SerialNumber_, OS.str());
KafkaManager()->PostMessage(KafkaTopics::CONNECTION, SerialNumber_, std::make_shared<std::string>(OS.str()));
}
} else {
poco_warning(

View File

@@ -38,7 +38,7 @@ namespace OpenWifi {
std::ostringstream OS;
FullEvent.stringify(OS);
KafkaManager()->PostMessage(KafkaTopics::DEVICE_EVENT_QUEUE, SerialNumber_,
OS.str());
std::make_shared<std::string>(OS.str()));
}
}
} catch (const Poco::Exception &E) {

View File

@@ -64,7 +64,7 @@ namespace OpenWifi {
std::ostringstream OS;
ParamsObj->set("timestamp", Utils::Now());
Stringify.condense(ParamsObj, OS);
KafkaManager()->PostMessage(KafkaTopics::HEALTHCHECK, SerialNumber_, OS.str());
KafkaManager()->PostMessage(KafkaTopics::HEALTHCHECK, SerialNumber_, std::make_shared<std::string>(OS.str()));
}
} else {
poco_warning(Logger_, fmt::format("HEALTHCHECK({}): Missing parameter", CId_));

View File

@@ -59,7 +59,7 @@ namespace OpenWifi {
Poco::JSON::Stringifier Stringify;
std::ostringstream OS;
Stringify.condense(ParamsObj, OS);
KafkaManager()->PostMessage(KafkaTopics::STATE, SerialNumber_, OS.str());
KafkaManager()->PostMessage(KafkaTopics::STATE, SerialNumber_, std::make_shared<std::string>(OS.str()));
}
GWWebSocketNotifications::SingleDevice_t N;

View File

@@ -27,9 +27,10 @@ namespace OpenWifi {
std::ostringstream SS;
Payload->stringify(SS);
auto now = Utils::Now();
auto KafkaPayload = std::make_shared<std::string>(SS.str());
if (ParamsObj->has("adhoc")) {
KafkaManager()->PostMessage(KafkaTopics::DEVICE_TELEMETRY, SerialNumber_,
SS.str());
KafkaPayload);
return;
}
if (TelemetryWebSocketRefCount_) {
@@ -38,7 +39,7 @@ namespace OpenWifi {
// std::endl;
TelemetryWebSocketPackets_++;
State_.websocketPackets = TelemetryWebSocketPackets_;
TelemetryStream()->NotifyEndPoint(SerialNumberInt_, SS.str());
TelemetryStream()->NotifyEndPoint(SerialNumberInt_, *KafkaPayload);
} else {
StopWebSocketTelemetry(CommandManager()->Next_RPC_ID());
}
@@ -49,7 +50,7 @@ namespace OpenWifi {
TelemetryKafkaPackets_++;
State_.kafkaPackets = TelemetryKafkaPackets_;
KafkaManager()->PostMessage(KafkaTopics::DEVICE_TELEMETRY, SerialNumber_,
SS.str());
KafkaPayload);
} else {
StopKafkaTelemetry(CommandManager()->Next_RPC_ID());
}

View File

@@ -25,7 +25,7 @@ namespace OpenWifi {
Poco::JSON::Stringifier Stringify;
std::ostringstream OS;
Stringify.condense(ParamsObj, OS);
KafkaManager()->PostMessage(KafkaTopics::WIFISCAN, SerialNumber_, OS.str());
KafkaManager()->PostMessage(KafkaTopics::WIFISCAN, SerialNumber_, std::make_shared<std::string>(OS.str()));
}
}
}

View File

@@ -236,7 +236,7 @@ namespace OpenWifi {
std::ostringstream OS;
FullEvent.stringify(OS);
KafkaManager()->PostMessage(KafkaTopics::DEVICE_EVENT_QUEUE, "system",
OS.str());
std::make_shared<std::string>(OS.str()));
}
void AP_WS_Server::Stop() {

View File

@@ -14,7 +14,8 @@ namespace OpenWifi {
Event.set("payload", payload_);
std::ostringstream OS;
Event.stringify(OS);
KafkaManager()->PostMessage(KafkaTopics::DEVICE_EVENT_QUEUE, Utils::IntToSerialNumber(serialNumber_), OS.str());
auto payload = std::make_shared<std::string>(OS.str());
KafkaManager()->PostMessage(KafkaTopics::DEVICE_EVENT_QUEUE, Utils::IntToSerialNumber(serialNumber_), payload);
}
}

View File

@@ -1024,7 +1024,7 @@ namespace OpenWifi {
RESTAPI_RPC::WaitForCommand(CMD_RPC, APCommands::Commands::wifiscan, false, Cmd, Params,
*Request, *Response, timeout, nullptr, this, Logger_);
if (Cmd.ErrorCode == 0) {
KafkaManager()->PostMessage(KafkaTopics::WIFISCAN, SerialNumber_, Cmd.Results);
KafkaManager()->PostMessage(KafkaTopics::WIFISCAN, SerialNumber_, std::make_shared<std::string>(Cmd.Results));
}
}
@@ -1069,7 +1069,7 @@ namespace OpenWifi {
Logger_);
if (Cmd.ErrorCode == 0) {
KafkaManager()->PostMessage(KafkaTopics::DEVICE_EVENT_QUEUE, SerialNumber_,
Cmd.Results);
std::make_shared<std::string>(Cmd.Results));
}
return;
}

View File

@@ -14,18 +14,18 @@ namespace OpenWifi {
void EventBusManager::run() {
Running_ = true;
Utils::SetThreadName("fmwk:EventMgr");
auto Msg = MicroServiceMakeSystemEventMessage(KafkaTopics::ServiceEvents::EVENT_JOIN);
auto Msg = std::make_shared<std::string>(MicroServiceMakeSystemEventMessage(KafkaTopics::ServiceEvents::EVENT_JOIN));
KafkaManager()->PostMessage(KafkaTopics::SERVICE_EVENTS, MicroServicePrivateEndPoint(), Msg,
false);
while (Running_) {
Poco::Thread::trySleep((unsigned long)MicroServiceDaemonBusTimer());
if (!Running_)
break;
Msg = MicroServiceMakeSystemEventMessage(KafkaTopics::ServiceEvents::EVENT_KEEP_ALIVE);
Msg = std::make_shared<std::string>(MicroServiceMakeSystemEventMessage(KafkaTopics::ServiceEvents::EVENT_KEEP_ALIVE));
KafkaManager()->PostMessage(KafkaTopics::SERVICE_EVENTS, MicroServicePrivateEndPoint(),
Msg, false);
}
Msg = MicroServiceMakeSystemEventMessage(KafkaTopics::ServiceEvents::EVENT_LEAVE);
Msg = std::make_shared<std::string>(MicroServiceMakeSystemEventMessage(KafkaTopics::ServiceEvents::EVENT_LEAVE));
KafkaManager()->PostMessage(KafkaTopics::SERVICE_EVENTS, MicroServicePrivateEndPoint(), Msg,
false);
};

View File

@@ -180,7 +180,7 @@ namespace OpenWifi {
Consumer.async_commit(Msg);
continue;
}
KafkaManager()->Dispatch(Msg.get_topic(), Msg.get_key(), Msg.get_payload());
KafkaManager()->Dispatch(Msg.get_topic().c_str(), Msg.get_key(), std::make_shared<std::string>(Msg.get_payload()));
if (!AutoCommit)
Consumer.async_commit(Msg);
}
@@ -212,8 +212,8 @@ namespace OpenWifi {
}
}
void KafkaProducer::Produce(const std::string &Topic, const std::string &Key,
const std::string &Payload) {
void KafkaProducer::Produce(const char *Topic, const std::string &Key,
const std::shared_ptr<std::string> Payload) {
std::lock_guard G(Mutex_);
Queue_.enqueueNotification(new KafkaMessage(Topic, Key, Payload));
}
@@ -275,8 +275,8 @@ namespace OpenWifi {
}
}
void KafkaDispatcher::Dispatch(const std::string &Topic, const std::string &Key,
const std::string &Payload) {
void KafkaDispatcher::Dispatch(const char *Topic, const std::string &Key,
const std::shared_ptr<std::string> Payload) {
std::lock_guard G(Mutex_);
auto It = Notifiers_.find(Topic);
if (It != Notifiers_.end()) {
@@ -332,20 +332,21 @@ namespace OpenWifi {
}
}
void KafkaManager::PostMessage(const std::string &topic, const std::string &key,
const std::string &PayLoad, bool WrapMessage) {
void KafkaManager::PostMessage(const char *topic, const std::string &key,
const std::shared_ptr<std::string> PayLoad, bool WrapMessage) {
if (KafkaEnabled_) {
ProducerThr_.Produce(topic, key, WrapMessage ? WrapSystemId(PayLoad) : PayLoad);
}
}
void KafkaManager::Dispatch(const std::string &Topic, const std::string &Key,
const std::string &Payload) {
void KafkaManager::Dispatch(const char *Topic, const std::string &Key,
const std::shared_ptr<std::string> Payload) {
Dispatcher_.Dispatch(Topic, Key, Payload);
}
[[nodiscard]] std::string KafkaManager::WrapSystemId(const std::string &PayLoad) {
return SystemInfoWrapper_ + PayLoad + "}";
[[nodiscard]] const std::shared_ptr<std::string> KafkaManager::WrapSystemId(const std::shared_ptr<std::string> PayLoad) {
*PayLoad = SystemInfoWrapper_ + *PayLoad + "}";
return PayLoad;
}
uint64_t KafkaManager::RegisterTopicWatcher(const std::string &Topic,

View File

@@ -18,17 +18,17 @@ namespace OpenWifi {
class KafkaMessage : public Poco::Notification {
public:
KafkaMessage(const std::string &Topic, const std::string &Key, const std::string &Payload)
: Topic_(Topic), Key_(Key), Payload_(Payload) {}
KafkaMessage(const char * Topic, const std::string &Key, const std::shared_ptr<std::string> Payload)
: Topic_(Topic), Key_(Key), Payload_(std::move(Payload)) {}
inline const std::string &Topic() { return Topic_; }
inline const char * Topic() { return Topic_; }
inline const std::string &Key() { return Key_; }
inline const std::string &Payload() { return Payload_; }
inline const std::string &Payload() { return *Payload_; }
private:
std::string Topic_;
const char *Topic_;
std::string Key_;
std::string Payload_;
std::shared_ptr<std::string> Payload_;
};
class KafkaProducer : public Poco::Runnable {
@@ -36,7 +36,7 @@ namespace OpenWifi {
void run() override;
void Start();
void Stop();
void Produce(const std::string &Topic, const std::string &Key, const std::string &Payload);
void Produce(const char *Topic, const std::string &Key, const std::shared_ptr<std::string> Payload);
private:
std::recursive_mutex Mutex_;
@@ -63,7 +63,7 @@ namespace OpenWifi {
void Stop();
auto RegisterTopicWatcher(const std::string &Topic, Types::TopicNotifyFunction &F);
void UnregisterTopicWatcher(const std::string &Topic, int Id);
void Dispatch(const std::string &Topic, const std::string &Key, const std::string &Payload);
void Dispatch(const char *Topic, const std::string &Key, const std::shared_ptr<std::string> Payload);
void run() override;
void Topics(std::vector<std::string> &T);
@@ -91,10 +91,10 @@ namespace OpenWifi {
int Start() override;
void Stop() override;
void PostMessage(const std::string &topic, const std::string &key,
const std::string &PayLoad, bool WrapMessage = true);
void Dispatch(const std::string &Topic, const std::string &Key, const std::string &Payload);
[[nodiscard]] std::string WrapSystemId(const std::string &PayLoad);
void PostMessage(const char *topic, const std::string &key,
const std::shared_ptr<std::string> PayLoad, bool WrapMessage = true);
void Dispatch(const char *Topic, const std::string &Key, const std::shared_ptr<std::string> Payload);
[[nodiscard]] const std::shared_ptr<std::string> WrapSystemId(const std::shared_ptr<std::string> PayLoad);
[[nodiscard]] inline bool Enabled() const { return KafkaEnabled_; }
uint64_t RegisterTopicWatcher(const std::string &Topic, Types::TopicNotifyFunction &F);
void UnregisterTopicWatcher(const std::string &Topic, uint64_t Id);

View File

@@ -10,32 +10,32 @@
#include <string>
namespace OpenWifi::KafkaTopics {
static const std::string HEALTHCHECK{"healthcheck"};
static const std::string STATE{"state"};
static const std::string CONNECTION{"connection"};
static const std::string WIFISCAN{"wifiscan"};
static const std::string ALERTS{"alerts"};
static const std::string COMMAND{"command"};
static const std::string SERVICE_EVENTS{"service_events"};
static const std::string DEVICE_EVENT_QUEUE{"device_event_queue"};
static const std::string DEVICE_TELEMETRY{"device_telemetry"};
static const std::string PROVISIONING_CHANGE{"provisioning_change"};
static const char * HEALTHCHECK = "healthcheck";
static const char * STATE = "state";
static const char * CONNECTION = "connection";
static const char * WIFISCAN = "wifiscan";
static const char * ALERTS = "alerts";
static const char * COMMAND = "command";
static const char * SERVICE_EVENTS = "service_events";
static const char * DEVICE_EVENT_QUEUE = "device_event_queue";
static const char * DEVICE_TELEMETRY = "device_telemetry";
static const char * PROVISIONING_CHANGE = "provisioning_change";
namespace ServiceEvents {
static const std::string EVENT_JOIN{"join"};
static const std::string EVENT_LEAVE{"leave"};
static const std::string EVENT_KEEP_ALIVE{"keep-alive"};
static const std::string EVENT_REMOVE_TOKEN{"remove-token"};
static const char * EVENT_JOIN = "join";
static const char * EVENT_LEAVE = "leave";
static const char * EVENT_KEEP_ALIVE = "keep-alive";
static const char * EVENT_REMOVE_TOKEN = "remove-token";
namespace Fields {
static const std::string EVENT{"event"};
static const std::string ID{"id"};
static const std::string TYPE{"type"};
static const std::string PUBLIC{"publicEndPoint"};
static const std::string PRIVATE{"privateEndPoint"};
static const std::string KEY{"key"};
static const std::string VRSN{"version"};
static const std::string TOKEN{"token"};
static const char * EVENT = "event";
static const char * ID = "id";
static const char * TYPE = "type";
static const char * PUBLIC = "publicEndPoint";
static const char * PRIVATE = "privateEndPoint";
static const char * KEY = "key";
static const char * VRSN = "version";
static const char * TOKEN = "token";
} // namespace Fields
} // namespace ServiceEvents
} // namespace OpenWifi::KafkaTopics

View File

@@ -47,11 +47,11 @@ namespace OpenWifi {
void MicroServiceReload(const std::string &Type) { MicroService::instance().Reload(Type); }
const Types::StringVec MicroServiceGetLogLevelNames() {
Types::StringVec MicroServiceGetLogLevelNames() {
return MicroService::instance().GetLogLevelNames();
}
const Types::StringVec MicroServiceGetSubSystems() {
Types::StringVec MicroServiceGetSubSystems() {
return MicroService::instance().GetSubSystems();
}
@@ -79,7 +79,7 @@ namespace OpenWifi {
std::string MicroServiceGetUIURI() { return MicroService::instance().GetUIURI(); }
const SubSystemVec MicroServiceGetFullSubSystems() {
SubSystemVec MicroServiceGetFullSubSystems() {
return MicroService::instance().GetFullSubSystems();
}
@@ -87,7 +87,7 @@ namespace OpenWifi {
std::uint64_t MicroServiceDaemonBusTimer() { return MicroService::instance().DaemonBusTimer(); }
std::string MicroServiceMakeSystemEventMessage(const std::string &Type) {
std::string MicroServiceMakeSystemEventMessage(const char *Type) {
return MicroService::instance().MakeSystemEventMessage(Type);
}

View File

@@ -31,8 +31,8 @@ namespace OpenWifi {
void MicroServiceLoadConfigurationFile();
void MicroServiceReload();
void MicroServiceReload(const std::string &Type);
const Types::StringVec MicroServiceGetLogLevelNames();
const Types::StringVec MicroServiceGetSubSystems();
Types::StringVec MicroServiceGetLogLevelNames();
Types::StringVec MicroServiceGetSubSystems();
Types::StringPairVec MicroServiceGetLogLevels();
bool MicroServiceSetSubsystemLogLevel(const std::string &SubSystem, const std::string &Level);
void MicroServiceGetExtraConfiguration(Poco::JSON::Object &Answer);
@@ -40,10 +40,10 @@ namespace OpenWifi {
std::uint64_t MicroServiceUptimeTotalSeconds();
std::uint64_t MicroServiceStartTimeEpochTime();
std::string MicroServiceGetUIURI();
const SubSystemVec MicroServiceGetFullSubSystems();
SubSystemVec MicroServiceGetFullSubSystems();
std::string MicroServiceCreateUUID();
std::uint64_t MicroServiceDaemonBusTimer();
std::string MicroServiceMakeSystemEventMessage(const std::string &Type);
std::string MicroServiceMakeSystemEventMessage(const char *Type);
Poco::ThreadPool &MicroServiceTimerPool();
std::string MicroServiceConfigPath(const std::string &Key, const std::string &DefaultValue);
std::string MicroServiceWWWAssetsDir();

View File

@@ -590,7 +590,7 @@ namespace OpenWifi {
Message.set("timestamp", Utils::Now());
std::ostringstream StrPayload;
Message.stringify(StrPayload);
KafkaManager()->PostMessage(KafkaTopics::COMMAND, SerialNumber, StrPayload.str());
KafkaManager()->PostMessage(KafkaTopics::COMMAND, SerialNumber, std::make_shared<std::string>(StrPayload.str()));
}
return true;