mirror of
https://github.com/outbackdingo/sysadm.git
synced 2026-01-27 18:20:23 +00:00
Finish up the overhaul of the Events subsystem and start adding in the Life Preserver event handling/systems.
This commit is contained in:
@@ -25,7 +25,13 @@ void EventWatcher::start(){
|
||||
WatcherUpdate(DISPATCHWORKING); //load it initially
|
||||
}
|
||||
|
||||
QString EventWatcher::lastEvent(EVENT_TYPE type){
|
||||
EventWatcher::EVENT_TYPE EventWatcher::typeFromString(QString typ){
|
||||
if(typ=="dispatcher"){ return DISPATCHER; }
|
||||
else if(typ=="life-preserver"){ return LIFEPRESERVER; }
|
||||
else{ return BADEVENT; }
|
||||
}
|
||||
|
||||
QJsonValue EventWatcher::lastEvent(EVENT_TYPE type){
|
||||
if(HASH.contains(type)){ return HASH.value(type); }
|
||||
else{ return ""; }
|
||||
}
|
||||
@@ -51,6 +57,15 @@ void EventWatcher::WatcherUpdate(QString path){
|
||||
HASH.insert(DISPATCHER,stat); //save for later
|
||||
//Forward those contents on to the currently-open sockets
|
||||
emit NewEvent(DISPATCHER, stat);
|
||||
}else if(path==LPLOG){
|
||||
//Main Life Preserver Log File
|
||||
|
||||
}else if(path==LPERRLOG){
|
||||
//Life Preserver Error log
|
||||
|
||||
}else if(path==tmpLPRepFile){
|
||||
//Life Preserver Replication Log (currently-running replication)
|
||||
|
||||
}
|
||||
|
||||
//Make sure this file/dir is not removed from the watcher
|
||||
|
||||
@@ -9,23 +9,33 @@
|
||||
#include "globals-qt.h"
|
||||
|
||||
#define DISPATCHWORKING QString("/var/tmp/appcafe/dispatch-queue.working")
|
||||
#define LPLOG QString("/var/log/lpreserver/lpreserver.log")
|
||||
#define LPERRLOG QString("/var/log/lpreserver/error.log")
|
||||
#define LPREPLOGDIR QString("/var/log/lpreserver/")
|
||||
|
||||
class EventWatcher : public QObject{
|
||||
Q_OBJECT
|
||||
public:
|
||||
//Add more event types here as needed
|
||||
enum EVENT_TYPE{ DISPATCHER };
|
||||
enum EVENT_TYPE{ BADEVENT, DISPATCHER, LIFEPRESERVER};
|
||||
|
||||
EventWatcher();
|
||||
~EventWatcher();
|
||||
|
||||
void start();
|
||||
|
||||
QString lastEvent(EVENT_TYPE type);
|
||||
//Convert a string into the type flag
|
||||
static EVENT_TYPE typeFromString(QString);
|
||||
|
||||
//Retrieve the most recent event message for a particular type of event
|
||||
QJsonValue lastEvent(EVENT_TYPE type);
|
||||
|
||||
private:
|
||||
QFileSystemWatcher *watcher;
|
||||
QHash<EVENT_TYPE, QString> HASH;
|
||||
QHash<unsigned int, QJsonValue> HASH;
|
||||
//HASH Note: Fields 1-99 reserved for EVENT_TYPE enum (last message of that type)
|
||||
// Fields 100-199 reserved for Life Preserver logs (all types)
|
||||
QString tmpLPRepFile;
|
||||
|
||||
QString readFile(QString path);
|
||||
|
||||
@@ -36,6 +46,6 @@ private slots:
|
||||
void WatcherUpdate(QString);
|
||||
|
||||
signals:
|
||||
void NewEvent(EVENT_TYPE ev, QString msg); //type/message
|
||||
void NewEvent(EVENT_TYPE ev, QJsonValue); //type/message
|
||||
};
|
||||
#endif
|
||||
|
||||
@@ -5,12 +5,7 @@
|
||||
// =================================
|
||||
#ifndef _PCBSD_REST_SERVER_REST_STRUCTS_H
|
||||
#define _PCBSD_REST_SERVER_REST_STRUCTS_H
|
||||
#include <QString>
|
||||
#include <QStringList>
|
||||
#include <QDateTime>
|
||||
#include <QJsonValue>
|
||||
#include <QJsonDocument>
|
||||
#include <QJsonObject>
|
||||
#include "globals-qt.h"
|
||||
|
||||
#define CurHttpVersion QString("HTTP/1.1")
|
||||
|
||||
@@ -33,9 +28,12 @@ public:
|
||||
//Raw Text
|
||||
QStringList Header; //REST Headers
|
||||
QString Body; //Everything else
|
||||
//User Permissions level
|
||||
bool fullaccess;
|
||||
|
||||
RestInputStruct(QString message = ""){
|
||||
HTTPVERSION = CurHttpVersion; //default value
|
||||
fullaccess = false;
|
||||
if(message.isEmpty()){ return; }
|
||||
//Pull out any REST headers
|
||||
Body = message;
|
||||
|
||||
@@ -18,7 +18,7 @@
|
||||
|
||||
#define DEBUG 0
|
||||
#define SCLISTDELIM QString("::::") //SysCache List Delimiter
|
||||
RestOutputStruct::ExitCode WebSocket::AvailableSubsystems(QJsonObject *out){
|
||||
RestOutputStruct::ExitCode WebSocket::AvailableSubsystems(bool allaccess, QJsonObject *out){
|
||||
//Probe the various subsystems to see what is available through this server
|
||||
//Output format:
|
||||
/*<out>{
|
||||
@@ -26,7 +26,7 @@ RestOutputStruct::ExitCode WebSocket::AvailableSubsystems(QJsonObject *out){
|
||||
<namespace2/name2> : <read/write/other>,
|
||||
}
|
||||
*/
|
||||
bool allaccess = AUTHSYSTEM->hasFullAccess(SockAuthToken);
|
||||
//bool allaccess = AUTHSYSTEM->hasFullAccess(SockAuthToken);
|
||||
// - syscache
|
||||
if(QFile::exists("/var/run/syscache.pipe")){
|
||||
out->insert("rpc/syscache","read"); //no write to syscache - only reads
|
||||
@@ -47,25 +47,25 @@ RestOutputStruct::ExitCode WebSocket::AvailableSubsystems(QJsonObject *out){
|
||||
return RestOutputStruct::OK;
|
||||
}
|
||||
|
||||
RestOutputStruct::ExitCode WebSocket::EvaluateBackendRequest(QString namesp, QString name, const QJsonValue args, QJsonObject *out){
|
||||
RestOutputStruct::ExitCode WebSocket::EvaluateBackendRequest(const RestInputStruct &IN, QJsonObject *out){
|
||||
/*Inputs:
|
||||
"namesp" - namespace for the request
|
||||
"name" - name of the request
|
||||
"args" - JSON input arguments structure
|
||||
"out" - JSON output arguments structure
|
||||
*/
|
||||
namesp = namesp.toLower(); name = name.toLower();
|
||||
QString namesp = IN.namesp.toLower(); QString name = IN.name.toLower();
|
||||
//Go through and forward this request to the appropriate sub-system
|
||||
if(namesp=="rpc" && name=="query"){
|
||||
return AvailableSubsystems(out);
|
||||
return AvailableSubsystems(IN.fullaccess, out);
|
||||
}else if(namesp=="rpc" && name=="syscache"){
|
||||
return EvaluateSyscacheRequest(args, out);
|
||||
return EvaluateSyscacheRequest(IN.args, out);
|
||||
}else if(namesp=="rpc" && name=="dispatcher"){
|
||||
return EvaluateDispatcherRequest(args, out);
|
||||
return EvaluateDispatcherRequest(IN.args, out);
|
||||
}else if(namesp=="sysadm" && name=="network"){
|
||||
return EvaluateSysadmNetworkRequest(args, out);
|
||||
return EvaluateSysadmNetworkRequest(IN.args, out);
|
||||
}else if(namesp=="sysadm" && name=="lifepreserver"){
|
||||
return EvaluateSysadmLifePreserverRequest(args, out);
|
||||
return EvaluateSysadmLifePreserverRequest(IN.args, out);
|
||||
}else{
|
||||
return RestOutputStruct::BADREQUEST;
|
||||
}
|
||||
|
||||
@@ -15,7 +15,6 @@ WebSocket::WebSocket(QWebSocket *sock, QString ID, AuthorizationManager *auth){
|
||||
SockAuthToken.clear(); //nothing set initially
|
||||
SOCKET = sock;
|
||||
TSOCKET = 0;
|
||||
SendAppCafeEvents = false;
|
||||
AUTHSYSTEM = auth;
|
||||
idletimer = new QTimer(this);
|
||||
idletimer->setInterval(IDLETIMEOUTMINS*60000); //connection timout for idle sockets
|
||||
@@ -32,7 +31,6 @@ WebSocket::WebSocket(QSslSocket *sock, QString ID, AuthorizationManager *auth){
|
||||
SockAuthToken.clear(); //nothing set initially
|
||||
TSOCKET = sock;
|
||||
SOCKET = 0;
|
||||
SendAppCafeEvents = false;
|
||||
AUTHSYSTEM = auth;
|
||||
idletimer = new QTimer(this);
|
||||
idletimer->setInterval(IDLETIMEOUTMINS*60000); //connection timout for idle sockets
|
||||
@@ -109,8 +107,6 @@ void WebSocket::EvaluateREST(QString msg){
|
||||
out.Header << "Accept: text/json";
|
||||
out.Header << "Content-Type: text/json; charset=utf-8";
|
||||
this->sendReply(out.assembleMessage());
|
||||
/* if(SOCKET!=0){ SOCKET->sendTextMessage(out.assembleMessage()); }
|
||||
else if(TSOCKET!=0){ TSOCKET->write(out.assembleMessage().toUtf8().data()); }*/
|
||||
}else{
|
||||
EvaluateRequest(IN);
|
||||
}
|
||||
@@ -172,9 +168,11 @@ void WebSocket::EvaluateRequest(const RestInputStruct &REQ){
|
||||
|
||||
}else if( AUTHSYSTEM->checkAuth(SockAuthToken) ){ //validate current Authentication token
|
||||
//Now provide access to the various subsystems
|
||||
// First get/set the permissions flag into the input structure
|
||||
out.in_struct.fullaccess = AUTHSYSTEM->hasFullAccess(SockAuthToken);
|
||||
//Pre-set any output fields
|
||||
QJsonObject outargs;
|
||||
out.CODE = EvaluateBackendRequest(out.in_struct.namesp, out.in_struct.name, out.in_struct.args, &outargs);
|
||||
out.CODE = EvaluateBackendRequest(out.in_struct, &outargs);
|
||||
out.out_args = outargs;
|
||||
}else{
|
||||
//Bad/No authentication
|
||||
@@ -190,21 +188,27 @@ void WebSocket::EvaluateRequest(const RestInputStruct &REQ){
|
||||
if(out.in_struct.args.isObject()){ evlist << JsonValueToString(out.in_struct.args); }
|
||||
else if(out.in_struct.args.isArray()){ evlist = JsonArrayToStringList(out.in_struct.args.toArray()); }
|
||||
//Now subscribe/unsubscribe to these events
|
||||
if(out.in_struct.name=="subscribe"){
|
||||
if(evlist.contains("dispatcher")){
|
||||
SendAppCafeEvents = true;
|
||||
outargs.insert("subscribe",QJsonValue("dispatcher"));
|
||||
QTimer::singleShot(100, this, SLOT(AppCafeStatusUpdate()) );
|
||||
}
|
||||
}else if(out.in_struct.name=="unsubscribe"){
|
||||
if(evlist.contains("dispatcher")){
|
||||
SendAppCafeEvents = false;
|
||||
outargs.insert("unsubscribe",QJsonValue("dispatcher"));
|
||||
int sub = -1; //bad input
|
||||
if(out.in_struct.name=="subscribe"){ sub = 1; }
|
||||
else if(out.in_struct.name=="unsubscribe"){ sub = 0; }
|
||||
|
||||
if(sub>=0 && !evlist.isEmpty() ){
|
||||
for(int i=0; i<evlist.length(); i++){
|
||||
EventWatcher::EVENT_TYPE type = EventWatcher::typeFromString(evlist[i]);
|
||||
if(type==EventWatcher::BADEVENT){ continue; }
|
||||
outargs.insert(out.in_struct.name,QJsonValue(evlist[i]));
|
||||
if(sub==1){
|
||||
ForwardEvents << type;
|
||||
QTimer::singleShot(100, this, SLOT(EventUpdate(type)) );
|
||||
}else{
|
||||
ForwardEvents.removeAll(type);
|
||||
}
|
||||
}
|
||||
out.out_args = outargs;
|
||||
}else{
|
||||
outargs.insert("unknown",QJsonValue("unknown"));
|
||||
//Bad/No authentication
|
||||
out.CODE = RestOutputStruct::BADREQUEST;
|
||||
}
|
||||
out.out_args = outargs;
|
||||
}else{
|
||||
//Bad/No authentication
|
||||
out.CODE = RestOutputStruct::UNAUTHORIZED;
|
||||
@@ -212,9 +216,11 @@ void WebSocket::EvaluateRequest(const RestInputStruct &REQ){
|
||||
//Other namespace - check whether auth has already been established before continuing
|
||||
}else if( AUTHSYSTEM->checkAuth(SockAuthToken) ){ //validate current Authentication token
|
||||
//Now provide access to the various subsystems
|
||||
// First get/set the permissions flag into the input structure
|
||||
out.in_struct.fullaccess = AUTHSYSTEM->hasFullAccess(SockAuthToken);
|
||||
//Pre-set any output fields
|
||||
QJsonObject outargs;
|
||||
out.CODE = EvaluateBackendRequest(out.in_struct.namesp, out.in_struct.name, out.in_struct.args, &outargs);
|
||||
out.CODE = EvaluateBackendRequest(out.in_struct, &outargs);
|
||||
out.out_args = outargs;
|
||||
}else{
|
||||
//Error in inputs - assemble the return error message
|
||||
@@ -335,18 +341,21 @@ void WebSocket::SslError(const QList<QSslError> &err){ //sslErrors() signal
|
||||
// ======================
|
||||
// PUBLIC SLOTS
|
||||
// ======================
|
||||
void WebSocket::EventUpdate(EventWatcher::EVENT_TYPE evtype, QString msg){
|
||||
if(msg.isEmpty()){ msg = EVENTS->lastEvent(evtype); }
|
||||
void WebSocket::EventUpdate(EventWatcher::EVENT_TYPE evtype, QJsonValue msg){
|
||||
if(msg.isUndefined()){ msg = EVENTS->lastEvent(evtype); }
|
||||
//qDebug() << "Socket Status Update:" << msg;
|
||||
if(evtype==EventWatcher::DISPATCHER){
|
||||
if(!SendAppCafeEvents){ return; } //don't report events on this socket
|
||||
RestOutputStruct out;
|
||||
out.CODE = RestOutputStruct::OK;
|
||||
out.in_struct.name = "dispatcher";
|
||||
out.in_struct.namesp = "events";
|
||||
out.out_args = QJsonValue(msg);//outargs;
|
||||
if(!ForwardEvents.contains(evtype)){ return; }
|
||||
RestOutputStruct out;
|
||||
out.CODE = RestOutputStruct::OK;
|
||||
out.in_struct.namesp = "events";
|
||||
out.out_args = msg;
|
||||
out.Header << "Content-Type: text/json; charset=utf-8"; //REST header info
|
||||
//Now send the message back through the socket
|
||||
this->sendReply(out.assembleMessage());
|
||||
} //end of DISPATCH event
|
||||
if(evtype==EventWatcher::DISPATCHER){
|
||||
out.in_struct.name = "dispatcher";
|
||||
}else if(evtype==EventWatcher::LIFEPRESERVER){
|
||||
out.in_struct.name = "life-preserver";
|
||||
}
|
||||
|
||||
//Now send the message back through the socket
|
||||
this->sendReply(out.assembleMessage());
|
||||
}
|
||||
|
||||
@@ -26,7 +26,7 @@ private:
|
||||
QSslSocket *TSOCKET;
|
||||
QString SockID, SockAuthToken;
|
||||
AuthorizationManager *AUTHSYSTEM;
|
||||
bool SendAppCafeEvents;
|
||||
QList<EventWatcher::EVENT_TYPE> ForwardEvents;
|
||||
|
||||
void sendReply(QString msg);
|
||||
|
||||
@@ -40,9 +40,9 @@ private:
|
||||
|
||||
//Backend request/reply functions (contained in WebBackend.cpp)
|
||||
// -- Subsystem listing routine
|
||||
RestOutputStruct::ExitCode AvailableSubsystems(QJsonObject *out);
|
||||
RestOutputStruct::ExitCode AvailableSubsystems(bool fullaccess, QJsonObject *out);
|
||||
// -- Main subsystem parser
|
||||
RestOutputStruct::ExitCode EvaluateBackendRequest(QString namesp, QString name, const QJsonValue in_args, QJsonObject *out);
|
||||
RestOutputStruct::ExitCode EvaluateBackendRequest(const RestInputStruct&, QJsonObject *out);
|
||||
// -- Individual subsystems
|
||||
RestOutputStruct::ExitCode EvaluateSyscacheRequest(const QJsonValue in_args, QJsonObject *out);
|
||||
RestOutputStruct::ExitCode EvaluateDispatcherRequest(const QJsonValue in_args, QJsonObject *out);
|
||||
@@ -66,7 +66,7 @@ private slots:
|
||||
void SslError(const QList<QSslError>&); //sslErrors() signal
|
||||
|
||||
public slots:
|
||||
void EventUpdate(EventWatcher::EVENT_TYPE, QString msg = "");
|
||||
void EventUpdate(EventWatcher::EVENT_TYPE, QJsonValue = QJsonValue() );
|
||||
|
||||
signals:
|
||||
void SocketClosed(QString); //ID
|
||||
|
||||
@@ -19,6 +19,7 @@
|
||||
#include <QUrl>
|
||||
#include <QFile>
|
||||
#include <QDir>
|
||||
#include <QDateTime>
|
||||
#include <QTextStream>
|
||||
#include <QProcess>
|
||||
#include <QSslKey>
|
||||
|
||||
Reference in New Issue
Block a user