Adding processing for connection event

This commit is contained in:
stephb9959
2021-07-19 08:35:17 -07:00
parent 93a51b2472
commit 95a35404a1
9 changed files with 96 additions and 23 deletions

View File

@@ -191,6 +191,12 @@ components:
lastUpdate:
type: integer
format: uint64
status:
type: string
enum:
- connected
- disconnected
- unknown
DeviceConnectionInformationList:
type: object

View File

@@ -11,6 +11,7 @@
#include "StorageService.h"
#include "LatestFirmwareCache.h"
#include "Utils.h"
#include "uCentralProtocol.h"
/*
@@ -50,20 +51,20 @@ namespace uCentral {
std::string EndPoint;
if(Object->has("system")) {
auto SystemObj = Object->getObject("system");
if(SystemObj->has("host"))
EndPoint = SystemObj->get("host").toString();
if(Object->has(uCentralProtocol::SYSTEM)) {
auto SystemObj = Object->getObject(uCentralProtocol::SYSTEM);
if(SystemObj->has(uCentralProtocol::HOST))
EndPoint = SystemObj->get(uCentralProtocol::HOST).toString();
}
if(Object->has("payload")) {
auto PayloadObj = Object->getObject("payload");
if(PayloadObj->has("capabilities")) {
auto CapObj = PayloadObj->getObject("capabilities");
if(CapObj->has("compatible")) {
DeviceType = CapObj->get("compatible").toString();
Serial = PayloadObj->get("serial").toString();
Revision = PayloadObj->get("firmware").toString();
if(Object->has(uCentralProtocol::PAYLOAD)) {
auto PayloadObj = Object->getObject(uCentralProtocol::PAYLOAD);
if(PayloadObj->has(uCentralProtocol::CAPABILITIES)) {
auto CapObj = PayloadObj->getObject(uCentralProtocol::CAPABILITIES);
if(CapObj->has(uCentralProtocol::COMPATIBLE)) {
DeviceType = CapObj->get(uCentralProtocol::COMPATIBLE).toString();
Serial = PayloadObj->get(uCentralProtocol::SERIAL).toString();
Revision = PayloadObj->get(uCentralProtocol::FIRMWARE).toString();
std::cout << "ConnectionEvent: SerialNumber: " << SerialNumber << " DeviceType: " << DeviceType << " Revision:" << Revision << std::endl;
FMSObjects::FirmwareAgeDetails FA;
if(Storage()->ComputeFirmwareAge(DeviceType, Revision, FA)) {
@@ -74,6 +75,23 @@ namespace uCentral {
Logger_.information(Poco::format("Device %s connection. Firmware age cannot be determined",SerialNumber));
}
}
} else if(PayloadObj->has(uCentralProtocol::DISCONNECTION)) {
auto DisconnectMessage = PayloadObj->getObject(uCentralProtocol::DISCONNECTION);
if(DisconnectMessage->has(uCentralProtocol::SERIAL) && DisconnectMessage->has(uCentralProtocol::TIMESTAMP)) {
auto SNum = DisconnectMessage->get(uCentralProtocol::SERIALNUMBER).toString();
auto Timestamp = DisconnectMessage->get(uCentralProtocol::TIMESTAMP);
Storage()->SetDeviceDisconnected(SNum,EndPoint);
}
} else if(PayloadObj->has(uCentralProtocol::PING)) {
auto PingMessage = PayloadObj->getObject(uCentralProtocol::PING);
if( PingMessage->has(uCentralProtocol::FIRMWARE) &&
PingMessage->has(uCentralProtocol::SERIALNUMBER) &&
PingMessage->has(uCentralProtocol::COMPATIBLE)) {
auto Revision = PingMessage->get(uCentralProtocol::FIRMWARE).toString();
auto SerialNUmber = PingMessage->get( uCentralProtocol::SERIALNUMBER).toString();
auto DeviceType = PingMessage->get( uCentralProtocol::COMPATIBLE).toString();
Storage()->SetDeviceRevision(SerialNumber, Revision, DeviceType, EndPoint);
}
}
}
}
@@ -82,13 +100,15 @@ namespace uCentral {
int NewConnectionHandler::Start() {
Types::TopicNotifyFunction F = [this](std::string s1,std::string s2) { this->ConnectionReceived(s1,s2); };
WatcherId_ = KafkaManager()->RegisterTopicWatcher(KafkaTopics::CONNECTION, F);
ConnectionWatcherId_ = KafkaManager()->RegisterTopicWatcher(KafkaTopics::CONNECTION, F);
HealthcheckWatcherId_ = KafkaManager()->RegisterTopicWatcher(KafkaTopics::HEALTHCHECK, F);
Worker_.start(*this);
return 0;
};
void NewConnectionHandler::Stop() {
KafkaManager()->UnregisterTopicWatcher(KafkaTopics::CONNECTION, WatcherId_);
KafkaManager()->UnregisterTopicWatcher(KafkaTopics::CONNECTION, ConnectionWatcherId_);
KafkaManager()->UnregisterTopicWatcher(KafkaTopics::CONNECTION, HealthcheckWatcherId_);
Running_ = false;
Worker_.wakeUp();
Worker_.join();

View File

@@ -32,7 +32,8 @@ namespace uCentral {
static NewConnectionHandler *instance_;
Poco::Thread Worker_;
std::atomic_bool Running_ = false;
int WatcherId_=0;
int ConnectionWatcherId_=0;
int HealthcheckWatcherId_=0;
Types::StringPairQueue NewConnections_;
NewConnectionHandler() noexcept:

View File

@@ -190,6 +190,7 @@ namespace uCentral::FMSObjects {
field_to_json(Obj, "deviceType", deviceType);
field_to_json(Obj, "endPoint", endPoint);
field_to_json(Obj, "lastUpdate", lastUpdate);
field_to_json(Obj, "status", status);
}
bool DeviceConnectionInformation::from_json(const Poco::JSON::Object::Ptr &Obj) {
@@ -199,6 +200,7 @@ namespace uCentral::FMSObjects {
field_from_json(Obj, "deviceType", deviceType);
field_from_json(Obj, "endPoint", endPoint);
field_from_json(Obj, "lastUpdate", lastUpdate);
field_from_json(Obj, "status", status);
return true;
} catch(...) {

View File

@@ -105,6 +105,7 @@ namespace uCentral::FMSObjects {
std::string deviceType;
std::string endPoint;
uint64_t lastUpdate;
std::string status;
void to_json(Poco::JSON::Object &Obj) const;
bool from_json(const Poco::JSON::Object::Ptr &Obj);
};

View File

@@ -74,6 +74,7 @@ namespace uCentral {
bool GetDevices(uint64_t From, uint64_t HowMany, std::vector<FMSObjects::DeviceConnectionInformation> & Devices);
bool GetDevice(std::string &SerialNumber, FMSObjects::DeviceConnectionInformation & Device);
bool SetDeviceDisconnected(std::string &SerialNumber, std::string &EndPoint);
static Storage *instance() {
if (instance_ == nullptr) {

View File

@@ -9,7 +9,8 @@
"revision=?, "
"deviceType=?, "
"endPoint=?, "
"lastUpdate=? "
"lastUpdate=?, "
"status=?
*/
namespace uCentral {
@@ -19,6 +20,7 @@ namespace uCentral {
F.deviceType = T.get<2>();
F.endPoint = T.get<3>();
F.lastUpdate = T.get<4>();
F.status = T.get<5>();
return true;
}
@@ -28,6 +30,7 @@ namespace uCentral {
T.set<2>(F.deviceType);
T.set<3>(F.endPoint);
T.set<4>(F.lastUpdate);
T.set<5>(F.status);
return true;
}
@@ -58,14 +61,15 @@ namespace uCentral {
if(!DeviceExists) {
std::string st{"INSERT INTO " + DBNAME_DEVICES + " (" +
DBFIELDS_DEVICES_SELECT +
") VALUES(?,?,?,?,?)"};
") VALUES(?,?,?,?,?,?)"};
Logger_.information(Poco::format("New device '%s' connected", SerialNumber));
FMSObjects::DeviceConnectionInformation DI{
.serialNumber = SerialNumber,
.revision = Revision,
.deviceType = DeviceType,
.endPoint = EndPoint,
.lastUpdate = (uint64_t)std::time(nullptr)};
.lastUpdate = (uint64_t)std::time(nullptr),
.status = "connected"};
DevicesRecordList InsertRecords;
DevicesRecord R;
@@ -79,11 +83,12 @@ namespace uCentral {
uint64_t Now = (uint64_t)std::time(nullptr);
// std::cout << "Updating device: " << SerialNumber << std::endl;
std::string st{"UPDATE " + DBNAME_DEVICES + " set revision=?, lastUpdate=?, endpoint=? " + " where serialNumber=?"};
std::string st{"UPDATE " + DBNAME_DEVICES + " set revision=?, lastUpdate=?, endpoint=?, status=? " + " where serialNumber=?"};
Update << ConvertParams(st) ,
Poco::Data::Keywords::use(Revision),
Poco::Data::Keywords::use(Now),
Poco::Data::Keywords::use(EndPoint),
"connected",
Poco::Data::Keywords::use(SerialNumber);
Update.execute();
@@ -98,6 +103,27 @@ namespace uCentral {
}
bool Storage::SetDeviceDisconnected(std::string &SerialNumber, std::string &EndPoint) {
try {
Poco::Data::Session Sess = Pool_->get();
Poco::Data::Statement Update(Sess);
uint64_t Now = (uint64_t)std::time(nullptr);
// std::cout << "Updating device: " << SerialNumber << std::endl;
std::string st{"UPDATE " + DBNAME_DEVICES + " set lastUpdate=?, endpoint=?, status=? " + " where serialNumber=?"};
Update << ConvertParams(st) ,
Poco::Data::Keywords::use(Now),
Poco::Data::Keywords::use(EndPoint),
"disconnected",
Poco::Data::Keywords::use(SerialNumber);
Update.execute();
} catch (const Poco::Exception &E) {
Logger_.log(E);
}
return false;
}
bool Storage::GetDevices(uint64_t From, uint64_t HowMany, std::vector<FMSObjects::DeviceConnectionInformation> & Devices) {
try {
Poco::Data::Session Sess = Pool_->get();

View File

@@ -15,7 +15,8 @@ namespace uCentral {
"revision varchar, "
"deviceType varchar, "
"endPoint varchar, "
"lastUpdate bigint "
"lastUpdate bigint, "
"status varchar "
};
static const std::string DBFIELDS_DEVICES_SELECT{
@@ -24,6 +25,7 @@ namespace uCentral {
"deviceType, "
"endPoint, "
"lastUpdate "
"status "
};
static const std::string DBFIELDS_DEVICES_UPDATE {
@@ -31,7 +33,8 @@ namespace uCentral {
"revision=?, "
"deviceType=?, "
"endPoint=?, "
"lastUpdate=? "
"lastUpdate=?, "
"status=? "
};
typedef Poco::Tuple<
@@ -39,7 +42,8 @@ namespace uCentral {
std::string,
std::string,
std::string,
uint64_t> DevicesRecord;
uint64_t,
std::string> DevicesRecord;
typedef std::vector<DevicesRecord> DevicesRecordList;
}

View File

@@ -79,6 +79,15 @@ namespace uCentral::uCentralProtocol {
static const char * VERBOSE = "verbose";
static const char * BANDS = "bands";
static const char * CHANNELS = "channels";
static const char * PASSWORD = "password";
static const char * DEVICEUPDATE = "deviceupdate";
static const char * SERIALNUMBER = "serialNumber";
static const char * COMPATIBLE = "compatible";
static const char * DISCONNECTION = "disconnection";
static const char * TIMESTAMP = "timestamp";
static const char * SYSTEM = "system";
static const char * HOST = "host";
enum EVENT_MSG {
ET_UNKNOWN,
@@ -89,7 +98,8 @@ namespace uCentral::uCentralProtocol {
ET_CRASHLOG,
ET_PING,
ET_CFGPENDING,
ET_RECOVERY
ET_RECOVERY,
ET_DEVICEUPDATE
};
static EVENT_MSG EventFromString(const std::string & Method) {
@@ -109,6 +119,8 @@ namespace uCentral::uCentralProtocol {
return ET_CFGPENDING;
} else if (!Poco::icompare(Method, RECOVERY)) {
return ET_RECOVERY;
} else if (!Poco::icompare(Method, DEVICEUPDATE)) {
return ET_DEVICEUPDATE;
} else
return ET_UNKNOWN;
};