From 04c366f1143160e4e0a0beb9036b22abb10ad1a1 Mon Sep 17 00:00:00 2001 From: Ken Moore Date: Fri, 29 Jan 2016 16:24:40 -0500 Subject: [PATCH] Finish adding in the new dispatcher system (untested). This should be ready for trying out in some subsystem now. Note: to queue up a job you just need to run one of the DISPATCHER->queue() functions (there are a few overloaded versions for simplicity) --- src/server/Dispatcher.cpp | 53 ++++++++++++++++++++++++++----------- src/server/Dispatcher.h | 12 +++------ src/server/EventWatcher.cpp | 32 ++++++++++++++++++---- src/server/EventWatcher.h | 6 ++++- src/server/LogManager.h | 5 ++-- src/server/WebBackend.cpp | 6 ++--- src/server/globals.h | 8 +++--- src/server/main.cpp | 5 ++++ src/server/server.pro | 6 +++-- 9 files changed, 91 insertions(+), 42 deletions(-) diff --git a/src/server/Dispatcher.cpp b/src/server/Dispatcher.cpp index 411edb9..74f6ff2 100644 --- a/src/server/Dispatcher.cpp +++ b/src/server/Dispatcher.cpp @@ -20,7 +20,7 @@ DProcess::DProcess(QObject *parent) : QProcess(parent){ } DProcess::~DProcess(){ - if(this->state()!=QProcess::NotRunning)){ + if( this->state()!=QProcess::NotRunning ){ this->terminate(); } } @@ -30,11 +30,14 @@ void DProcess::startProc(){ finished = QDateTime::currentDateTime(); emit ProcFinished(ID); return; + }else if(proclog.isEmpty()){ + started = QDateTime::currentDateTime(); //first cmd started + rawcmds = cmds; + }else{ + proclog.append("\n"); } QString cmd = cmds.takeFirst(); success = false; //not finished yet - if(!proclog.isEmpty()){ proclog.append("\n"); } - else{ started = QDateTime::currentDateTime(); } //first cmd started proclog.append("[Running Command: "+cmd+" ]"); this->start(cmd); } @@ -105,8 +108,10 @@ void Dispatcher::queueProcess(Dispatcher::PROC_QUEUE queue, QString ID, QStringL QList list; if(!HASH.contains(queue)){ HASH.insert(queue, list); } //insert an empty list HASH[queue] << P; //add this proc to the end of the list - if(queue==NO_QUEUE || HASH[queue].length()==1){ P->startProc(); } //go ahead and start it now - else{ CheckQueues(); } + if(queue==NO_QUEUE || HASH[queue].length()==1){ + emit DispatchStarting(P->ID); + P->startProc(); //go ahead and start it now + }else{ CheckQueues(); } } // === PRIVATE === @@ -115,6 +120,7 @@ DProcess* Dispatcher:: createProcess(QString ID, QStringList cmds){ DProcess* P = new DProcess(this); P->cmds = cmds; P->ID = ID; + P->success = false; connect(P, SIGNAL(ProcFinished(QString)), this, SLOT(ProcFinished(QString)) ); return P; } @@ -122,22 +128,23 @@ DProcess* Dispatcher:: createProcess(QString ID, QStringList cmds){ // === PRIVATE SLOTS === void Dispatcher::ProcFinished(QString ID){ //Find the process with this ID and close it down (with proper events) - for int i=0; i(i); + bool found = false; + for(int i=0; i(i); if(HASH.contains(queue)){ QList list = HASH[queue]; - bool found = false; for(int l=0; lID==ID){ QJsonObject obj; - obj.insert("log",list[l].procLog()); - obj.insert("success", list[l].success ? "true" : "false" ); + obj.insert("log",list[l]->getProcLog()); + obj.insert("success", list[l]->success ? "true" : "false" ); obj.insert("proc_id", ID); - obj.insert("cmd_list", QJsonArray::fromStringList( list[l].rawcmds ); - obj.insert("time_started", list[l].started.toString(QT::ISODate) ); - obj.insert("time_finished", list[l].finished.toString(QT::ISODate) ); + obj.insert("cmd_list", QJsonArray::fromStringList( list[l]->rawcmds ) ); + obj.insert("time_started", list[l]->started.toString(Qt::ISODate) ); + obj.insert("time_finished", list[l]->finished.toString(Qt::ISODate) ); + emit DispatchFinished(ID, list[l]->success); delete list.takeAt(l); - LogManager::log(LogManager::EV_DISPATCH, obj); + LogManager::log(LogManager::DISPATCH, obj); found = true; } } //end loop over queue list @@ -147,5 +154,19 @@ void Dispatcher::ProcFinished(QString ID){ } void Dispatcher::CheckQueues(){ - + for(int i=0; i(i); + if(HASH.contains(queue)){ + QList list = HASH[queue]; + for(int j=0; j0 && queue!=NO_QUEUE){ break; } //done with this - only first item in these queues should run at a time + if(!list[j]->isRunning() && list[j]->getProcLog().isEmpty()){ + //Need to start this one - has not run yet + emit DispatchStarting(list[j]->ID); + list[j]->startProc(); + } + } //end loop over list + } + + } //end loop over queue types } diff --git a/src/server/Dispatcher.h b/src/server/Dispatcher.h index 19d5af1..5a40332 100644 --- a/src/server/Dispatcher.h +++ b/src/server/Dispatcher.h @@ -14,7 +14,7 @@ class DProcess : public QProcess{ Q_OBJECT public: - DProcess(QObject parent = 0); + DProcess(QObject *parent = 0); ~DProcess(); QString ID; @@ -68,16 +68,10 @@ private: QString queue_file; //Internal lists - QHash > QUEUES; + QHash > HASH; //Simplification routine for setting up a process - DProcess* createProcess(QString ID, QStringList cmds){ - DProcess* P = new DProcess(this); - P->cmds = cmds; - P->ID = ID; - connect(P, SIGNAL(ProcFinished(QString)), this, SLOT(ProcFinished(QString)) ); - return P; - } + DProcess* createProcess(QString ID, QStringList cmds); private slots: void ProcFinished(QString ID); diff --git a/src/server/EventWatcher.cpp b/src/server/EventWatcher.cpp index 6c53046..7b08090 100644 --- a/src/server/EventWatcher.cpp +++ b/src/server/EventWatcher.cpp @@ -5,6 +5,8 @@ // ================================= #include "EventWatcher.h" +#include "globals.h" + // === PUBLIC === EventWatcher::EventWatcher(){ starting = true; @@ -24,9 +26,9 @@ EventWatcher::~EventWatcher(){ void EventWatcher::start(){ // - DISPATCH Events starting = true; - if(!QFile::exists(DISPATCHWORKING)){ QProcess::execute("touch "+DISPATCHWORKING); } + //if(!QFile::exists(DISPATCHWORKING)){ QProcess::execute("touch "+DISPATCHWORKING); } //qDebug() << " Dispatcher Events:" << DISPATCHWORKING; - WatcherUpdate(DISPATCHWORKING); //load it initially (will also add it to the watcher) + //WatcherUpdate(DISPATCHWORKING); //load it initially (will also add it to the watcher) // - Life Preserver Events WatcherUpdate(LPLOG); //load it initially (will also add it to the watcher); WatcherUpdate(LPERRLOG); //load it initially (will also add it to the watcher); @@ -86,10 +88,29 @@ double EventWatcher::displayToDoubleK(QString displayNumber){ return num; } +// === PUBLIC SLOTS === +//Slots for the global Dispatcher to connect to +void EventWatcher::DispatchStarting(QString ID){ + QJsonObject obj; + obj.insert("process_id", ID); + obj.insert("state", "running"); + LogManager::log(LogManager::EV_DISPATCH, obj); + emit NewEvent(DISPATCHER, obj); +} + +void EventWatcher::DispatchFinished(QString ID, bool success){ + QJsonObject obj; + obj.insert("process_id", ID); + obj.insert("state", "finished"); + obj.insert("result", success ? "success" : "failure"); + LogManager::log(LogManager::EV_DISPATCH, obj); + emit NewEvent(DISPATCHER, obj); +} + // === PRIVATE SLOTS === void EventWatcher::WatcherUpdate(const QString &path){ if(!starting){ qDebug() << "Event Watcher Update:" << path; } - if(path==DISPATCHWORKING){ + /*if(path==DISPATCHWORKING){ //Read the file contents QString stat = readFile(DISPATCHWORKING); if(stat.simplified().isEmpty()){ stat = "idle"; } @@ -97,7 +118,8 @@ void EventWatcher::WatcherUpdate(const QString &path){ HASH.insert(DISPATCHER,stat); //save for later //Forward those contents on to the currently-open sockets emit NewEvent(DISPATCHER, QJsonValue(stat) ); - }else if(path==LPLOG){ + }else*/ + if(path==LPLOG){ //Main Life Preserver Log File ReadLPLogFile(); }else if(path==LPERRLOG){ @@ -121,7 +143,7 @@ void EventWatcher::CheckLogFiles(){ if(!watched.contains(LPLOG) && QFile::exists(LPLOG)){ watcher->addPath(LPLOG); } if(!watched.contains(LPERRLOG) && QFile::exists(LPERRLOG)){ watcher->addPath(LPERRLOG); } if(!watched.contains(tmpLPRepFile) && QFile::exists(tmpLPRepFile)){ watcher->addPath(tmpLPRepFile); } - if(!watched.contains(DISPATCHWORKING) && QFile::exists(LPLOG)){ watcher->addPath(DISPATCHWORKING); } + //if(!watched.contains(DISPATCHWORKING) && QFile::exists(LPLOG)){ watcher->addPath(DISPATCHWORKING); } //qDebug() << "watched:" << watcher->files() << watcher->directories(); } diff --git a/src/server/EventWatcher.h b/src/server/EventWatcher.h index 7f464ab..35e6bed 100644 --- a/src/server/EventWatcher.h +++ b/src/server/EventWatcher.h @@ -8,7 +8,7 @@ #include "globals-qt.h" -#define DISPATCHWORKING QString("/var/tmp/appcafe/dispatch-queue.working") +//#define DISPATCHWORKING QString("/var/tmp/appcafe/dispatch-queue.working") #define LPLOG QString("/var/log/lpreserver/lpreserver.log") #define LPERRLOG QString("/var/log/lpreserver/error.log") #define LPREPLOGDIR QString("/var/log/lpreserver/") @@ -49,6 +49,10 @@ private: public slots: void start(); + //Slots for the global Dispatcher to connect to + void DispatchStarting(QString); + void DispatchFinished(QString, bool); + private slots: //File watcher signals void WatcherUpdate(const QString&); diff --git a/src/server/LogManager.h b/src/server/LogManager.h index 7237c59..eee6ac0 100644 --- a/src/server/LogManager.h +++ b/src/server/LogManager.h @@ -15,7 +15,7 @@ //=========================================== // Event Files (EV_*): JSON input/output (full event) // HOST: String input/output (simple messages) -// +// DISPATCH: Full log of dispatcher processes //=========================================== #define LOGDIR QString("/var/log/sysadm") @@ -24,11 +24,12 @@ class LogManager{ public: //Enumeration of common log files (will automatically use proper file) // === ADD NEW FILE SUPPORT HERE === - enum LOG_FILE {HOST, EV_DISPATCH, EV_LP}; + enum LOG_FILE {HOST, DISPATCH, EV_DISPATCH, EV_LP}; //Conversion function for flag->path static QString flagToPath(LogManager::LOG_FILE flag){ QString filepath; if(flag==HOST){ filepath.append("hostinfo"); } + else if(flag==DISPATCH){ filepath.append("dispatcher"); } else if(flag==EV_DISPATCH){ filepath.append("events-dispatcher"); } else if(flag==EV_LP){ filepath.append("events-lifepreserver"); } else{ return ""; } //invalid file given diff --git a/src/server/WebBackend.cpp b/src/server/WebBackend.cpp index 26d331b..08559e5 100644 --- a/src/server/WebBackend.cpp +++ b/src/server/WebBackend.cpp @@ -35,10 +35,10 @@ RestOutputStruct::ExitCode WebSocket::AvailableSubsystems(bool allaccess, QJsonO out->insert("rpc/syscache","read"); //no write to syscache - only reads } // - dispatcher - if(DispatcherClient::DispatcherAvailable()){ + //if(DispatcherClient::DispatcherAvailable()){ //"read" is the event notifications, "write" is the ability to queue up jobs out->insert("rpc/dispatcher", allaccess ? "read/write" : "read"); - } + //} // - network out->insert("sysadm/network","read/write"); @@ -143,7 +143,7 @@ RestOutputStruct::ExitCode WebSocket::EvaluateDispatcherRequest(const QJsonValue }else{ return RestOutputStruct::BADREQUEST; } //Run the Request (should be one value for each in_req) - QStringList values = DispatcherClient::parseInputs(in_req, AUTHSYSTEM);; + QStringList values = DispatcherClient::parseInputs(in_req, AUTHSYSTEM); while(values.length() < in_req.length()){ values << "[ERROR]"; } //ensure lists are same length //Format the result diff --git a/src/server/globals.h b/src/server/globals.h index ad9ec30..05dd4d7 100644 --- a/src/server/globals.h +++ b/src/server/globals.h @@ -8,15 +8,15 @@ #include "globals-qt.h" +#include "LogManager.h" + //Global variables/classes (intially created in main.cpp) extern QSettings *CONFIG; #include "EventWatcher.h" extern EventWatcher *EVENTS; -//#include "ProcessQueue.h" -//extern ProcessQueue *PQUEUE; -//#include "LogManager.h" -//extern LogManager *LOGS; +#include "Dispatcher.h" +extern Dispatcher *DISPATCHER; //Special defines #define WSPORTNUMBER 12150 // WebSocket server default port diff --git a/src/server/main.cpp b/src/server/main.cpp index a32abaa..5b3deba 100644 --- a/src/server/main.cpp +++ b/src/server/main.cpp @@ -15,6 +15,7 @@ //Create any global classes QSettings *CONFIG = new QSettings("PCBSD","sysadm"); EventWatcher *EVENTS = new EventWatcher(); +Dispatcher *DISPATCHER = new Dispatcher(); //Create the default logfile QFile logfile; @@ -76,6 +77,10 @@ int main( int argc, char ** argv ) logfile.open(QIODevice::WriteOnly | QIODevice::Append); qInstallMessageHandler(MessageOutput); + //Connect the background classes + QObject::connect(DISPATCHER, SIGNAL(DispatchFinished(QString, bool)), EVENTS, SLOT(DispatchFinished(QString,bool)) ); + QObject::connect(DISPATCHER, SIGNAL(DispatchStarting(QString)), EVENTS, SLOT(DispatchStarting(QString)) ); + //Create the daemon qDebug() << "Starting the PC-BSD sysadm server...." << (websocket ? "(WebSocket)" : "(TCP)"); WebServer *w = new WebServer(); diff --git a/src/server/server.pro b/src/server/server.pro index 81e4a8a..a006f6b 100644 --- a/src/server/server.pro +++ b/src/server/server.pro @@ -13,7 +13,8 @@ HEADERS += globals.h globals-qt.h \ AuthorizationManager.h \ SslServer.h \ EventWatcher.h \ - LogManager.h + LogManager.h \ + Dispatcher.h SOURCES += main.cpp \ WebServer.cpp \ @@ -23,7 +24,8 @@ SOURCES += main.cpp \ dispatcher-client.cpp \ AuthorizationManager.cpp \ EventWatcher.cpp \ - LogManager.cpp + LogManager.cpp \ + Dispatcher.cpp TARGET=sysadm-server