From 95a35404a17fbecabedf240bcb9e1e5455d2c17e Mon Sep 17 00:00:00 2001 From: stephb9959 Date: Mon, 19 Jul 2021 08:35:17 -0700 Subject: [PATCH] Adding processing for connection event --- openapi/ucentralfws.yaml | 6 +++++ src/NewConnectionHandler.cpp | 48 +++++++++++++++++++++++++----------- src/NewConnectionHandler.h | 3 ++- src/RESTAPI_FMSObjects.cpp | 2 ++ src/RESTAPI_FMSObjects.h | 1 + src/StorageService.h | 1 + src/storage_deviceInfo.cpp | 34 ++++++++++++++++++++++--- src/storage_deviceInfo.h | 10 +++++--- src/uCentralProtocol.h | 14 ++++++++++- 9 files changed, 96 insertions(+), 23 deletions(-) diff --git a/openapi/ucentralfws.yaml b/openapi/ucentralfws.yaml index c5efd24..f36f9e2 100644 --- a/openapi/ucentralfws.yaml +++ b/openapi/ucentralfws.yaml @@ -191,6 +191,12 @@ components: lastUpdate: type: integer format: uint64 + status: + type: string + enum: + - connected + - disconnected + - unknown DeviceConnectionInformationList: type: object diff --git a/src/NewConnectionHandler.cpp b/src/NewConnectionHandler.cpp index e7b94a1..8e0dd88 100644 --- a/src/NewConnectionHandler.cpp +++ b/src/NewConnectionHandler.cpp @@ -11,6 +11,7 @@ #include "StorageService.h" #include "LatestFirmwareCache.h" #include "Utils.h" +#include "uCentralProtocol.h" /* @@ -50,20 +51,20 @@ namespace uCentral { std::string EndPoint; - if(Object->has("system")) { - auto SystemObj = Object->getObject("system"); - if(SystemObj->has("host")) - EndPoint = SystemObj->get("host").toString(); + if(Object->has(uCentralProtocol::SYSTEM)) { + auto SystemObj = Object->getObject(uCentralProtocol::SYSTEM); + if(SystemObj->has(uCentralProtocol::HOST)) + EndPoint = SystemObj->get(uCentralProtocol::HOST).toString(); } - if(Object->has("payload")) { - auto PayloadObj = Object->getObject("payload"); - if(PayloadObj->has("capabilities")) { - auto CapObj = PayloadObj->getObject("capabilities"); - if(CapObj->has("compatible")) { - DeviceType = CapObj->get("compatible").toString(); - Serial = PayloadObj->get("serial").toString(); - Revision = PayloadObj->get("firmware").toString(); + if(Object->has(uCentralProtocol::PAYLOAD)) { + auto PayloadObj = Object->getObject(uCentralProtocol::PAYLOAD); + if(PayloadObj->has(uCentralProtocol::CAPABILITIES)) { + auto CapObj = PayloadObj->getObject(uCentralProtocol::CAPABILITIES); + if(CapObj->has(uCentralProtocol::COMPATIBLE)) { + DeviceType = CapObj->get(uCentralProtocol::COMPATIBLE).toString(); + Serial = PayloadObj->get(uCentralProtocol::SERIAL).toString(); + Revision = PayloadObj->get(uCentralProtocol::FIRMWARE).toString(); std::cout << "ConnectionEvent: SerialNumber: " << SerialNumber << " DeviceType: " << DeviceType << " Revision:" << Revision << std::endl; FMSObjects::FirmwareAgeDetails FA; if(Storage()->ComputeFirmwareAge(DeviceType, Revision, FA)) { @@ -74,6 +75,23 @@ namespace uCentral { Logger_.information(Poco::format("Device %s connection. Firmware age cannot be determined",SerialNumber)); } } + } else if(PayloadObj->has(uCentralProtocol::DISCONNECTION)) { + auto DisconnectMessage = PayloadObj->getObject(uCentralProtocol::DISCONNECTION); + if(DisconnectMessage->has(uCentralProtocol::SERIAL) && DisconnectMessage->has(uCentralProtocol::TIMESTAMP)) { + auto SNum = DisconnectMessage->get(uCentralProtocol::SERIALNUMBER).toString(); + auto Timestamp = DisconnectMessage->get(uCentralProtocol::TIMESTAMP); + Storage()->SetDeviceDisconnected(SNum,EndPoint); + } + } else if(PayloadObj->has(uCentralProtocol::PING)) { + auto PingMessage = PayloadObj->getObject(uCentralProtocol::PING); + if( PingMessage->has(uCentralProtocol::FIRMWARE) && + PingMessage->has(uCentralProtocol::SERIALNUMBER) && + PingMessage->has(uCentralProtocol::COMPATIBLE)) { + auto Revision = PingMessage->get(uCentralProtocol::FIRMWARE).toString(); + auto SerialNUmber = PingMessage->get( uCentralProtocol::SERIALNUMBER).toString(); + auto DeviceType = PingMessage->get( uCentralProtocol::COMPATIBLE).toString(); + Storage()->SetDeviceRevision(SerialNumber, Revision, DeviceType, EndPoint); + } } } } @@ -82,13 +100,15 @@ namespace uCentral { int NewConnectionHandler::Start() { Types::TopicNotifyFunction F = [this](std::string s1,std::string s2) { this->ConnectionReceived(s1,s2); }; - WatcherId_ = KafkaManager()->RegisterTopicWatcher(KafkaTopics::CONNECTION, F); + ConnectionWatcherId_ = KafkaManager()->RegisterTopicWatcher(KafkaTopics::CONNECTION, F); + HealthcheckWatcherId_ = KafkaManager()->RegisterTopicWatcher(KafkaTopics::HEALTHCHECK, F); Worker_.start(*this); return 0; }; void NewConnectionHandler::Stop() { - KafkaManager()->UnregisterTopicWatcher(KafkaTopics::CONNECTION, WatcherId_); + KafkaManager()->UnregisterTopicWatcher(KafkaTopics::CONNECTION, ConnectionWatcherId_); + KafkaManager()->UnregisterTopicWatcher(KafkaTopics::CONNECTION, HealthcheckWatcherId_); Running_ = false; Worker_.wakeUp(); Worker_.join(); diff --git a/src/NewConnectionHandler.h b/src/NewConnectionHandler.h index 5c9d3ee..6ccf2c2 100644 --- a/src/NewConnectionHandler.h +++ b/src/NewConnectionHandler.h @@ -32,7 +32,8 @@ namespace uCentral { static NewConnectionHandler *instance_; Poco::Thread Worker_; std::atomic_bool Running_ = false; - int WatcherId_=0; + int ConnectionWatcherId_=0; + int HealthcheckWatcherId_=0; Types::StringPairQueue NewConnections_; NewConnectionHandler() noexcept: diff --git a/src/RESTAPI_FMSObjects.cpp b/src/RESTAPI_FMSObjects.cpp index 70e7724..e2578d8 100644 --- a/src/RESTAPI_FMSObjects.cpp +++ b/src/RESTAPI_FMSObjects.cpp @@ -190,6 +190,7 @@ namespace uCentral::FMSObjects { field_to_json(Obj, "deviceType", deviceType); field_to_json(Obj, "endPoint", endPoint); field_to_json(Obj, "lastUpdate", lastUpdate); + field_to_json(Obj, "status", status); } bool DeviceConnectionInformation::from_json(const Poco::JSON::Object::Ptr &Obj) { @@ -199,6 +200,7 @@ namespace uCentral::FMSObjects { field_from_json(Obj, "deviceType", deviceType); field_from_json(Obj, "endPoint", endPoint); field_from_json(Obj, "lastUpdate", lastUpdate); + field_from_json(Obj, "status", status); return true; } catch(...) { diff --git a/src/RESTAPI_FMSObjects.h b/src/RESTAPI_FMSObjects.h index 704f296..913472e 100644 --- a/src/RESTAPI_FMSObjects.h +++ b/src/RESTAPI_FMSObjects.h @@ -105,6 +105,7 @@ namespace uCentral::FMSObjects { std::string deviceType; std::string endPoint; uint64_t lastUpdate; + std::string status; void to_json(Poco::JSON::Object &Obj) const; bool from_json(const Poco::JSON::Object::Ptr &Obj); }; diff --git a/src/StorageService.h b/src/StorageService.h index 1bd1e0d..395e435 100644 --- a/src/StorageService.h +++ b/src/StorageService.h @@ -74,6 +74,7 @@ namespace uCentral { bool GetDevices(uint64_t From, uint64_t HowMany, std::vector & Devices); bool GetDevice(std::string &SerialNumber, FMSObjects::DeviceConnectionInformation & Device); + bool SetDeviceDisconnected(std::string &SerialNumber, std::string &EndPoint); static Storage *instance() { if (instance_ == nullptr) { diff --git a/src/storage_deviceInfo.cpp b/src/storage_deviceInfo.cpp index 72680df..a66026c 100644 --- a/src/storage_deviceInfo.cpp +++ b/src/storage_deviceInfo.cpp @@ -9,7 +9,8 @@ "revision=?, " "deviceType=?, " "endPoint=?, " - "lastUpdate=? " + "lastUpdate=?, " + "status=? */ namespace uCentral { @@ -19,6 +20,7 @@ namespace uCentral { F.deviceType = T.get<2>(); F.endPoint = T.get<3>(); F.lastUpdate = T.get<4>(); + F.status = T.get<5>(); return true; } @@ -28,6 +30,7 @@ namespace uCentral { T.set<2>(F.deviceType); T.set<3>(F.endPoint); T.set<4>(F.lastUpdate); + T.set<5>(F.status); return true; } @@ -58,14 +61,15 @@ namespace uCentral { if(!DeviceExists) { std::string st{"INSERT INTO " + DBNAME_DEVICES + " (" + DBFIELDS_DEVICES_SELECT + - ") VALUES(?,?,?,?,?)"}; + ") VALUES(?,?,?,?,?,?)"}; Logger_.information(Poco::format("New device '%s' connected", SerialNumber)); FMSObjects::DeviceConnectionInformation DI{ .serialNumber = SerialNumber, .revision = Revision, .deviceType = DeviceType, .endPoint = EndPoint, - .lastUpdate = (uint64_t)std::time(nullptr)}; + .lastUpdate = (uint64_t)std::time(nullptr), + .status = "connected"}; DevicesRecordList InsertRecords; DevicesRecord R; @@ -79,11 +83,12 @@ namespace uCentral { uint64_t Now = (uint64_t)std::time(nullptr); // std::cout << "Updating device: " << SerialNumber << std::endl; - std::string st{"UPDATE " + DBNAME_DEVICES + " set revision=?, lastUpdate=?, endpoint=? " + " where serialNumber=?"}; + std::string st{"UPDATE " + DBNAME_DEVICES + " set revision=?, lastUpdate=?, endpoint=?, status=? " + " where serialNumber=?"}; Update << ConvertParams(st) , Poco::Data::Keywords::use(Revision), Poco::Data::Keywords::use(Now), Poco::Data::Keywords::use(EndPoint), + "connected", Poco::Data::Keywords::use(SerialNumber); Update.execute(); @@ -98,6 +103,27 @@ namespace uCentral { } + bool Storage::SetDeviceDisconnected(std::string &SerialNumber, std::string &EndPoint) { + try { + Poco::Data::Session Sess = Pool_->get(); + Poco::Data::Statement Update(Sess); + uint64_t Now = (uint64_t)std::time(nullptr); + + // std::cout << "Updating device: " << SerialNumber << std::endl; + std::string st{"UPDATE " + DBNAME_DEVICES + " set lastUpdate=?, endpoint=?, status=? " + " where serialNumber=?"}; + Update << ConvertParams(st) , + Poco::Data::Keywords::use(Now), + Poco::Data::Keywords::use(EndPoint), + "disconnected", + Poco::Data::Keywords::use(SerialNumber); + Update.execute(); + } catch (const Poco::Exception &E) { + Logger_.log(E); + } + return false; + } + + bool Storage::GetDevices(uint64_t From, uint64_t HowMany, std::vector & Devices) { try { Poco::Data::Session Sess = Pool_->get(); diff --git a/src/storage_deviceInfo.h b/src/storage_deviceInfo.h index e69c4cd..35bcbef 100644 --- a/src/storage_deviceInfo.h +++ b/src/storage_deviceInfo.h @@ -15,7 +15,8 @@ namespace uCentral { "revision varchar, " "deviceType varchar, " "endPoint varchar, " - "lastUpdate bigint " + "lastUpdate bigint, " + "status varchar " }; static const std::string DBFIELDS_DEVICES_SELECT{ @@ -24,6 +25,7 @@ namespace uCentral { "deviceType, " "endPoint, " "lastUpdate " + "status " }; static const std::string DBFIELDS_DEVICES_UPDATE { @@ -31,7 +33,8 @@ namespace uCentral { "revision=?, " "deviceType=?, " "endPoint=?, " - "lastUpdate=? " + "lastUpdate=?, " + "status=? " }; typedef Poco::Tuple< @@ -39,7 +42,8 @@ namespace uCentral { std::string, std::string, std::string, - uint64_t> DevicesRecord; + uint64_t, + std::string> DevicesRecord; typedef std::vector DevicesRecordList; } diff --git a/src/uCentralProtocol.h b/src/uCentralProtocol.h index 8cf8acc..87c649d 100644 --- a/src/uCentralProtocol.h +++ b/src/uCentralProtocol.h @@ -79,6 +79,15 @@ namespace uCentral::uCentralProtocol { static const char * VERBOSE = "verbose"; static const char * BANDS = "bands"; static const char * CHANNELS = "channels"; + static const char * PASSWORD = "password"; + static const char * DEVICEUPDATE = "deviceupdate"; + + static const char * SERIALNUMBER = "serialNumber"; + static const char * COMPATIBLE = "compatible"; + static const char * DISCONNECTION = "disconnection"; + static const char * TIMESTAMP = "timestamp"; + static const char * SYSTEM = "system"; + static const char * HOST = "host"; enum EVENT_MSG { ET_UNKNOWN, @@ -89,7 +98,8 @@ namespace uCentral::uCentralProtocol { ET_CRASHLOG, ET_PING, ET_CFGPENDING, - ET_RECOVERY + ET_RECOVERY, + ET_DEVICEUPDATE }; static EVENT_MSG EventFromString(const std::string & Method) { @@ -109,6 +119,8 @@ namespace uCentral::uCentralProtocol { return ET_CFGPENDING; } else if (!Poco::icompare(Method, RECOVERY)) { return ET_RECOVERY; + } else if (!Poco::icompare(Method, DEVICEUPDATE)) { + return ET_DEVICEUPDATE; } else return ET_UNKNOWN; };