From d2ef14a0d10ac47205c7f943c38bfa0d3db35922 Mon Sep 17 00:00:00 2001 From: Ken Moore Date: Thu, 3 Mar 2016 15:16:55 -0500 Subject: [PATCH] Large update to the dispatcher/events systems. Now there is a new DispatcherParsing.h file where any subsystem-specific dispatcher calls may be detected/parsed/handled as needed, and the iohyve fetch routine was updated to use this routine (for a proof of concept - could probably still use a bit more log output). --- src/server/Dispatcher.cpp | 128 +++++++++++++-------------- src/server/Dispatcher.h | 22 ++--- src/server/DispatcherParsing.h | 51 +++++++++++ src/server/EventWatcher.cpp | 6 +- src/server/EventWatcher.h | 2 +- src/server/WebBackend.cpp | 6 +- src/server/WebSocket.cpp | 3 +- src/server/library/sysadm-iohyve.cpp | 7 +- src/server/library/sysadm-iohyve.h | 2 +- src/server/main.cpp | 2 +- src/server/server.pro | 3 +- 11 files changed, 141 insertions(+), 91 deletions(-) create mode 100644 src/server/DispatcherParsing.h diff --git a/src/server/Dispatcher.cpp b/src/server/Dispatcher.cpp index 45be95c..47bd466 100644 --- a/src/server/Dispatcher.cpp +++ b/src/server/Dispatcher.cpp @@ -5,6 +5,8 @@ // ================================= #include "Dispatcher.h" +#include "DispatcherParsing.h" + #include "globals.h" @@ -13,6 +15,11 @@ // ================================ DProcess::DProcess(QObject *parent) : QProcess(parent){ //Setup the process + bool notify = false; + uptimer = new QTimer(this); + uptimer->setSingleShot(true); + uptimer->setInterval(1000); //1 second intervals + connect(uptimer, SIGNAL(timeout()), this, SLOT(emitUpdate()) ); this->setProcessEnvironment(QProcessEnvironment::systemEnvironment()); this->setProcessChannelMode(QProcess::MergedChannels); connect(this, SIGNAL(readyRead()), this, SLOT(updateLog()) ); @@ -25,24 +32,30 @@ DProcess::~DProcess(){ } void DProcess::startProc(){ + cmds.removeAll(""); //make sure no empty commands if(cmds.isEmpty()){ - t_finished = QDateTime::currentDateTime(); - emit ProcFinished(ID); + proclog.insert("state","finished"); + proclog.insert("time_finished", QDateTime::currentDateTime().toString(Qt::ISODate)); + proclog.remove("current_cmd"); + emit ProcFinished(ID, proclog); return; } - QString cmd = cmds.takeFirst(); + cCmd = cmds.takeFirst(); success = false; //not finished yet - if(!proclog.isEmpty()){ proclog.append("\n"); } - else{ //first cmd started - t_started = QDateTime::currentDateTime(); - rawcmds = cmds; rawcmds.prepend(cmd); + if(proclog.isEmpty()){ + //first cmd started + proclog.insert("time_started", QDateTime::currentDateTime().toString(Qt::ISODate)); + proclog.insert("cmd_list",QJsonArray::fromStringList(cmds)); + proclog.insert("process_id",ID); + proclog.insert("state","running"); + //rawcmds = cmds; rawcmds.prepend(cCmd); //setup internal connections connect(this, SIGNAL(finished(int, QProcess::ExitStatus)), this, SLOT(cmdFinished(int, QProcess::ExitStatus)) ); connect(this, SIGNAL(error(QProcess::ProcessError)), this, SLOT(cmdError(QProcess::ProcessError)) ); - } - proclog.append("[Running Command: "+cmd+" ]"); + } + proclog.insert("current_cmd",cCmd); //qDebug() << "Proc Starting:" << ID << cmd; - this->start(cmd); + this->start(cCmd); } bool DProcess::isRunning(){ @@ -53,7 +66,7 @@ bool DProcess::isDone(){ return (!this->isRunning() && !proclog.isEmpty()); } -QString DProcess::getProcLog(){ +QJsonObject DProcess::getProcLog(){ //Now return the current version of the log return proclog; } @@ -64,33 +77,35 @@ void DProcess::cmdError(QProcess::ProcessError err){ } void DProcess::cmdFinished(int ret, QProcess::ExitStatus status){ + if(uptimer->isActive()){ uptimer->stop(); } //use the finished signal instead //determine success/failure success = (status==QProcess::NormalExit && ret==0); //update the log before starting another command - proclog.append( this->readAllStandardOutput() ); + proclog.insert(cCmd, proclog.value(cCmd).toString().append(this->readAllStandardOutput()) ); + proclog.insert("return_codes/"+cCmd, QString::number(ret)); + //Now run any additional commands //qDebug() << "Proc Finished:" << ID << success << proclog; - if(success && !cmds.isEmpty()){ startProc(); } - else if(success){ - t_finished = QDateTime::currentDateTime(); - emit ProcFinished(ID); - emit Finished(ID, ret, proclog); + if(success && !cmds.isEmpty()){ + emit ProcUpdate(ID, proclog); + startProc(); }else{ - if(status==QProcess::NormalExit){ - proclog.append("\n[Command Failed: " + QString::number(ret)+" ]"); - }else{ - proclog.append("\n[Command Failed: Process Crashed ]"); - } - t_finished = QDateTime::currentDateTime(); - emit ProcFinished(ID); - emit Finished(ID, ret, proclog); + proclog.insert("state","finished"); + proclog.remove("current_cmd"); + proclog.insert("time_finished", QDateTime::currentDateTime().toString(Qt::ISODate)); + emit ProcFinished(ID, proclog); } } void DProcess::updateLog(){ - proclog.append( this->readAllStandardOutput() ); - emit ProcessOutput(proclog); + proclog.insert(cCmd, proclog.value(cCmd).toString().append(this->readAllStandardOutput()) ); + if(!uptimer->isActive()){ uptimer->start(); } } + +void DProcess::emitUpdate(){ + emit ProcUpdate(ID, proclog); +} + // ================================ // Dispatcher Class // ================================ @@ -147,46 +162,39 @@ DProcess* Dispatcher::createProcess(QString ID, QStringList cmds){ void Dispatcher::mkProcs(Dispatcher::PROC_QUEUE queue, DProcess *P){ //qDebug() << "mkProcs()"; //P->moveToThread(this->thread()); - connect(P, SIGNAL(ProcFinished(QString)), this, SLOT(ProcFinished(QString)) ); + connect(P, SIGNAL(ProcFinished(QString, QJsonObject)), this, SLOT(ProcFinished(QString, QJsonObject)) ); + connect(P, SIGNAL(ProcUpdate(QString, QJsonObject)), this, SLOT(ProcUpdated(QString, QJsonObject)) ); QList list; if(!HASH.contains(queue)){ HASH.insert(queue, list); } //insert an empty list HASH[queue] << P; //add this proc to the end of the list CheckQueues(); } -void Dispatcher::ProcFinished(QString ID){ +void Dispatcher::ProcFinished(QString ID, QJsonObject log){ //Find the process with this ID and close it down (with proper events) qDebug() << " - Got Proc Finished Signal:" << ID; - /*bool found = false; - for(int i=0; i(i); - if(HASH.contains(queue)){ - QList list = HASH[queue]; - for(int l=0; lID==ID){ - qDebug() << "Delete Proc on Finished:" << list[l]->ID; - QJsonObject obj; - obj.insert("log",list[l]->getProcLog()); - obj.insert("success", list[l]->success ? "true" : "false" ); - obj.insert("proc_id", ID); - obj.insert("cmd_list", QJsonArray::fromStringList( list[l]->rawcmds ) ); - obj.insert("time_started", list[l]->t_started.toString(Qt::ISODate) ); - obj.insert("time_finished", list[l]->t_finished.toString(Qt::ISODate) ); - emit DispatchFinished(obj); - list.takeAt(l)->deleteLater(); - LogManager::log(LogManager::DISPATCH, obj); - found = true; - l--; - } - } //end loop over queue list - } - }//end loop over queue enumeration*/ + LogManager::log(LogManager::DISPATCH, log); + //First emit any subsystem-specific event, falling back on the raw log + QJsonObject ev = CreateDispatcherEventNotification(ID,log); + if(!ev.isEmpty()){ + emit DispatchEvent(ev); + }else{ + emit DispatchEvent(log); + } + QTimer::singleShot(20,this, SLOT(CheckQueues()) ); } +void Dispatcher::ProcUpdated(QString ID, QJsonObject log){ + //See if this needs to generate an event + QJsonObject ev = CreateDispatcherEventNotification(ID,log); + if(!ev.isEmpty()){ + emit DispatchEvent(ev); + } +} + void Dispatcher::CheckQueues(){ - qDebug() << " - Check Queues"; - for(int i=0; i(i); if(HASH.contains(queue)){ QList list = HASH[queue]; @@ -194,16 +202,6 @@ void Dispatcher::CheckQueues(){ if(j>0 && queue!=NO_QUEUE){ break; } //done with this - only first item in these queues should run at a time if( !list[j]->isRunning() ){ if(list[j]->isDone() ){ - qDebug() << "Delete Proc:" << list[j]->ID; - QJsonObject obj; - obj.insert("log",list[j]->getProcLog()); - obj.insert("success", list[j]->success ? "true" : "false" ); - obj.insert("proc_id", list[j]->ID); - obj.insert("cmd_list", QJsonArray::fromStringList( list[j]->rawcmds ) ); - obj.insert("time_started", list[j]->t_started.toString(Qt::ISODate) ); - obj.insert("time_finished", list[j]->t_finished.toString(Qt::ISODate) ); - emit DispatchFinished(obj); - LogManager::log(LogManager::DISPATCH, obj); list.takeAt(j)->deleteLater(); j--; }else{ diff --git a/src/server/Dispatcher.h b/src/server/Dispatcher.h index 2d3683d..b857da4 100644 --- a/src/server/Dispatcher.h +++ b/src/server/Dispatcher.h @@ -21,11 +21,11 @@ public: //output variables for logging purposes bool success; - QDateTime t_started, t_finished; - QStringList rawcmds; //copy of cmds at start of process + //QDateTime t_started, t_finished; + // QStringList rawcmds; //copy of cmds at start of process //Get the current process log (can be run during/after the process runs) - QString getProcLog(); + QJsonObject getProcLog(); //Process Status bool isRunning(); bool isDone(); @@ -34,19 +34,20 @@ public slots: void startProc(); private: - QString proclog; + QString cCmd; + QJsonObject proclog; + QTimer *uptimer; private slots: void cmdError(QProcess::ProcessError); void cmdFinished(int, QProcess::ExitStatus); void updateLog(); //readyRead() signal - + void emitUpdate(); signals: - void ProcFinished(QString); //ID + void ProcFinished(QString, QJsonObject); // ID / log //Generic signals for subsystem usage (no direct proc access later) - void ProcessOutput(QString); - void Finished(QString, int, QString); //ID, retcode, log + void ProcUpdate(QString, QJsonObject); // ID/log }; @@ -82,12 +83,13 @@ private: private slots: void mkProcs(Dispatcher::PROC_QUEUE, DProcess *P); - void ProcFinished(QString ID); + void ProcFinished(QString ID, QJsonObject log); + void ProcUpdated(QString ID, QJsonObject log); void CheckQueues(); signals: //Main signals - void DispatchFinished(QJsonObject obj); //obj is the data associated with the process + void DispatchEvent(QJsonObject obj); //obj is the data associated with the process void DispatchStarting(QString ID); //Signals for private usage diff --git a/src/server/DispatcherParsing.h b/src/server/DispatcherParsing.h new file mode 100644 index 0000000..4fec645 --- /dev/null +++ b/src/server/DispatcherParsing.h @@ -0,0 +1,51 @@ +// =============================== +// PC-BSD REST API Server +// Available under the 3-clause BSD License +// Written by: Ken Moore 2015-2016 +// ================================= +// These static classes are for defining custom Dispatcher/Event notifications +// for individual subsystems +//================================= +#ifndef _PCBSD_SYSADM_DISPATCH_SUBSYSTEM_FILTER_SYSTEM_H +#define _PCBSD_SYSADM_DISPATCH_SUBSYSTEM_FILTER_SYSTEM_H + +#include "globals-qt.h" +#include "EventWatcher.h" + +static QJsonObject CreateDispatcherEventNotification(QString ID, QJsonObject log){ + //key outputs - need to set these if an event is going to be sent out + QJsonObject args; //any arguments to send out + QString namesp, name; //the namespace/name of the subsystem used + //Quick flags/simplifications for use later + QString cCmd, cLog; //Current command/log for that command (might be a chain of commands) + cCmd = log.value("current_cmd").toString(); //This is usually empty if the proc finished + if(!cCmd.isEmpty()){ cLog = log.value(cCmd).toString(); } + bool isFinished = log.contains("return_codes/"+cCmd) || cCmd.isEmpty(); + qDebug() << "Check Dispatcher Event:"; + qDebug() << " - RAW LOG:" << log; + qDebug() << "cCmd:" << cCmd << "cLog:" << cLog << "isFinished:" << isFinished; + //Now parse the notification based on the dispatch ID or current command + //NOTE: There might be a random string on the end of the ID (to accomodate similar process calls) + if(ID.startsWith("sysadm_iohyve")){ + namesp = "sysadm"; name="iohyve"; + //Now perform additional cmd/system based filtering + if(ID.section("::",0,0)=="sysadm_iohyve_fetch" || cCmd.startsWith("iohyve fetch ")){ + //Do some parsing of the log + if(isFinished){ + args.insert("state","finished"); + }else{ + args.insert("state","running"); + args.insert("progress", cLog.section("\n",-1, QString::SectionSkipEmpty)); //send the last line of the fetch + } + } + + } + + //Now assemble the output as needed + if(namesp.isEmpty() || name.isEmpty()){ return QJsonObject(); } //no event + args.insert("event_system",namesp+"/"+name); + return args; +} + + +#endif \ No newline at end of file diff --git a/src/server/EventWatcher.cpp b/src/server/EventWatcher.cpp index 6de9923..702276f 100644 --- a/src/server/EventWatcher.cpp +++ b/src/server/EventWatcher.cpp @@ -111,13 +111,13 @@ void EventWatcher::DispatchStarting(QString ID){ obj.insert("process_id", ID); obj.insert("state", "running"); LogManager::log(LogManager::EV_DISPATCH, obj); - qDebug() << "Got Dispatch starting: sending event..."; + //qDebug() << "Got Dispatch starting: sending event..."; emit NewEvent(DISPATCHER, obj); } -void EventWatcher::DispatchFinished(QJsonObject obj){ +void EventWatcher::DispatchEvent(QJsonObject obj){ LogManager::log(LogManager::EV_DISPATCH, obj); - qDebug() << "Got Dispatch Finished: sending event..."; + //qDebug() << "Got Dispatch Finished: sending event..."; emit NewEvent(DISPATCHER, obj); } diff --git a/src/server/EventWatcher.h b/src/server/EventWatcher.h index e6ed4f6..865121a 100644 --- a/src/server/EventWatcher.h +++ b/src/server/EventWatcher.h @@ -55,7 +55,7 @@ public slots: //Slots for the global Dispatcher to connect to void DispatchStarting(QString); - void DispatchFinished(QJsonObject); + void DispatchEvent(QJsonObject); private slots: //File watcher signals diff --git a/src/server/WebBackend.cpp b/src/server/WebBackend.cpp index 6e5a30f..8135de2 100644 --- a/src/server/WebBackend.cpp +++ b/src/server/WebBackend.cpp @@ -595,10 +595,8 @@ RestOutputStruct::ExitCode WebSocket::EvaluateSysadmIohyveRequest(const QJsonVal } else if(act=="fetchiso"){ ok = true; - DProcess fetchproc; - out->insert("fetchiso", sysadm::Iohyve::fetchISO(in_args.toObject(), &fetchproc)); - connect(&fetchproc, SIGNAL(ProcessOutput(QString)), this, SLOT(slotIohyveFetchProcessOutput(QString)) ); - connect(&fetchproc, SIGNAL(Finished(QString, int, QString)), this, SLOT(slotIohyveFetchDone(QString, int, QString)) ); + //DProcess fetchproc; + out->insert("fetchiso", sysadm::Iohyve::fetchISO(in_args.toObject())); } else if(act=="install"){ ok = true; diff --git a/src/server/WebSocket.cpp b/src/server/WebSocket.cpp index 6398129..4a25955 100644 --- a/src/server/WebSocket.cpp +++ b/src/server/WebSocket.cpp @@ -393,8 +393,9 @@ void WebSocket::SslError(const QList &err){ //sslErrors() signal // PUBLIC SLOTS // ====================== void WebSocket::EventUpdate(EventWatcher::EVENT_TYPE evtype, QJsonValue msg){ - qDebug() << "Got Socket Event Update:" << msg; + //qDebug() << "Got Socket Event Update:" << msg; if(msg.isNull()){ msg = EVENTS->lastEvent(evtype); } + if(msg.isNull()){ return; } //nothing to send if( !ForwardEvents.contains(evtype) ){ return; } RestOutputStruct out; out.CODE = RestOutputStruct::OK; diff --git a/src/server/library/sysadm-iohyve.cpp b/src/server/library/sysadm-iohyve.cpp index 071b7d0..dbddca0 100644 --- a/src/server/library/sysadm-iohyve.cpp +++ b/src/server/library/sysadm-iohyve.cpp @@ -44,7 +44,7 @@ QJsonObject Iohyve::createGuest(QJsonObject jsin) { } // Queue the fetch of an ISO -QJsonObject Iohyve::fetchISO(QJsonObject jsin, DProcess *returnproc) { +QJsonObject Iohyve::fetchISO(QJsonObject jsin) { QJsonObject retObject; QStringList keys = jsin.keys(); @@ -57,11 +57,10 @@ QJsonObject Iohyve::fetchISO(QJsonObject jsin, DProcess *returnproc) { QString url = jsin.value("url").toString(); // Create a unique ID for this queued action - QString ID = QUuid::createUuid().toString(); + QString ID = "sysadm_iohyve_fetch::"+QUuid::createUuid().toString(); // Queue the fetch action - returnproc = DISPATCHER->queueProcess(ID, "iohyve fetch " + url); - qDebug() << "returnproc" << returnproc; + DISPATCHER->queueProcess(ID, "iohyve fetch " + url); // Return some details to user that the action was queued retObject.insert("command", "iohyve fetch " + url); diff --git a/src/server/library/sysadm-iohyve.h b/src/server/library/sysadm-iohyve.h index beee371..a5a0f21 100644 --- a/src/server/library/sysadm-iohyve.h +++ b/src/server/library/sysadm-iohyve.h @@ -16,7 +16,7 @@ namespace sysadm{ class Iohyve{ public: static QJsonObject createGuest(QJsonObject); - static QJsonObject fetchISO(QJsonObject, DProcess *); + static QJsonObject fetchISO(QJsonObject); static QJsonObject installGuest(QJsonObject); static QJsonObject isSetup(); static QJsonObject listVMs(); diff --git a/src/server/main.cpp b/src/server/main.cpp index ccc845d..16ce691 100644 --- a/src/server/main.cpp +++ b/src/server/main.cpp @@ -140,7 +140,7 @@ int main( int argc, char ** argv ) qInstallMessageHandler(MessageOutput); //Connect the background classes - QObject::connect(DISPATCHER, SIGNAL(DispatchFinished(QJsonObject)), EVENTS, SLOT(DispatchFinished(QJsonObject)) ); + QObject::connect(DISPATCHER, SIGNAL(DispatchEvent(QJsonObject)), EVENTS, SLOT(DispatchEvent(QJsonObject)) ); QObject::connect(DISPATCHER, SIGNAL(DispatchStarting(QString)), EVENTS, SLOT(DispatchStarting(QString)) ); //Create the daemon diff --git a/src/server/server.pro b/src/server/server.pro index bda91ec..3ecb740 100644 --- a/src/server/server.pro +++ b/src/server/server.pro @@ -13,7 +13,8 @@ HEADERS += globals.h globals-qt.h \ SslServer.h \ EventWatcher.h \ LogManager.h \ - Dispatcher.h + Dispatcher.h \ + DispatcherParsing.h SOURCES += main.cpp \ WebServer.cpp \