Large update to the dispatcher/events systems. Now there is a new DispatcherParsing.h file where any subsystem-specific dispatcher calls may be detected/parsed/handled as needed, and the iohyve fetch routine was updated to use this routine (for a proof of concept - could probably still use a bit more log output).

This commit is contained in:
Ken Moore
2016-03-03 15:16:55 -05:00
parent 2312271340
commit d2ef14a0d1
11 changed files with 141 additions and 91 deletions

View File

@@ -5,6 +5,8 @@
// =================================
#include "Dispatcher.h"
#include "DispatcherParsing.h"
#include "globals.h"
@@ -13,6 +15,11 @@
// ================================
DProcess::DProcess(QObject *parent) : QProcess(parent){
//Setup the process
bool notify = false;
uptimer = new QTimer(this);
uptimer->setSingleShot(true);
uptimer->setInterval(1000); //1 second intervals
connect(uptimer, SIGNAL(timeout()), this, SLOT(emitUpdate()) );
this->setProcessEnvironment(QProcessEnvironment::systemEnvironment());
this->setProcessChannelMode(QProcess::MergedChannels);
connect(this, SIGNAL(readyRead()), this, SLOT(updateLog()) );
@@ -25,24 +32,30 @@ DProcess::~DProcess(){
}
void DProcess::startProc(){
cmds.removeAll(""); //make sure no empty commands
if(cmds.isEmpty()){
t_finished = QDateTime::currentDateTime();
emit ProcFinished(ID);
proclog.insert("state","finished");
proclog.insert("time_finished", QDateTime::currentDateTime().toString(Qt::ISODate));
proclog.remove("current_cmd");
emit ProcFinished(ID, proclog);
return;
}
QString cmd = cmds.takeFirst();
cCmd = cmds.takeFirst();
success = false; //not finished yet
if(!proclog.isEmpty()){ proclog.append("\n"); }
else{ //first cmd started
t_started = QDateTime::currentDateTime();
rawcmds = cmds; rawcmds.prepend(cmd);
if(proclog.isEmpty()){
//first cmd started
proclog.insert("time_started", QDateTime::currentDateTime().toString(Qt::ISODate));
proclog.insert("cmd_list",QJsonArray::fromStringList(cmds));
proclog.insert("process_id",ID);
proclog.insert("state","running");
//rawcmds = cmds; rawcmds.prepend(cCmd);
//setup internal connections
connect(this, SIGNAL(finished(int, QProcess::ExitStatus)), this, SLOT(cmdFinished(int, QProcess::ExitStatus)) );
connect(this, SIGNAL(error(QProcess::ProcessError)), this, SLOT(cmdError(QProcess::ProcessError)) );
}
proclog.append("[Running Command: "+cmd+" ]");
}
proclog.insert("current_cmd",cCmd);
//qDebug() << "Proc Starting:" << ID << cmd;
this->start(cmd);
this->start(cCmd);
}
bool DProcess::isRunning(){
@@ -53,7 +66,7 @@ bool DProcess::isDone(){
return (!this->isRunning() && !proclog.isEmpty());
}
QString DProcess::getProcLog(){
QJsonObject DProcess::getProcLog(){
//Now return the current version of the log
return proclog;
}
@@ -64,33 +77,35 @@ void DProcess::cmdError(QProcess::ProcessError err){
}
void DProcess::cmdFinished(int ret, QProcess::ExitStatus status){
if(uptimer->isActive()){ uptimer->stop(); } //use the finished signal instead
//determine success/failure
success = (status==QProcess::NormalExit && ret==0);
//update the log before starting another command
proclog.append( this->readAllStandardOutput() );
proclog.insert(cCmd, proclog.value(cCmd).toString().append(this->readAllStandardOutput()) );
proclog.insert("return_codes/"+cCmd, QString::number(ret));
//Now run any additional commands
//qDebug() << "Proc Finished:" << ID << success << proclog;
if(success && !cmds.isEmpty()){ startProc(); }
else if(success){
t_finished = QDateTime::currentDateTime();
emit ProcFinished(ID);
emit Finished(ID, ret, proclog);
if(success && !cmds.isEmpty()){
emit ProcUpdate(ID, proclog);
startProc();
}else{
if(status==QProcess::NormalExit){
proclog.append("\n[Command Failed: " + QString::number(ret)+" ]");
}else{
proclog.append("\n[Command Failed: Process Crashed ]");
}
t_finished = QDateTime::currentDateTime();
emit ProcFinished(ID);
emit Finished(ID, ret, proclog);
proclog.insert("state","finished");
proclog.remove("current_cmd");
proclog.insert("time_finished", QDateTime::currentDateTime().toString(Qt::ISODate));
emit ProcFinished(ID, proclog);
}
}
void DProcess::updateLog(){
proclog.append( this->readAllStandardOutput() );
emit ProcessOutput(proclog);
proclog.insert(cCmd, proclog.value(cCmd).toString().append(this->readAllStandardOutput()) );
if(!uptimer->isActive()){ uptimer->start(); }
}
void DProcess::emitUpdate(){
emit ProcUpdate(ID, proclog);
}
// ================================
// Dispatcher Class
// ================================
@@ -147,46 +162,39 @@ DProcess* Dispatcher::createProcess(QString ID, QStringList cmds){
void Dispatcher::mkProcs(Dispatcher::PROC_QUEUE queue, DProcess *P){
//qDebug() << "mkProcs()";
//P->moveToThread(this->thread());
connect(P, SIGNAL(ProcFinished(QString)), this, SLOT(ProcFinished(QString)) );
connect(P, SIGNAL(ProcFinished(QString, QJsonObject)), this, SLOT(ProcFinished(QString, QJsonObject)) );
connect(P, SIGNAL(ProcUpdate(QString, QJsonObject)), this, SLOT(ProcUpdated(QString, QJsonObject)) );
QList<DProcess*> list;
if(!HASH.contains(queue)){ HASH.insert(queue, list); } //insert an empty list
HASH[queue] << P; //add this proc to the end of the list
CheckQueues();
}
void Dispatcher::ProcFinished(QString ID){
void Dispatcher::ProcFinished(QString ID, QJsonObject log){
//Find the process with this ID and close it down (with proper events)
qDebug() << " - Got Proc Finished Signal:" << ID;
/*bool found = false;
for(int i=0; i<enum_length && !found; i++){
Dispatcher::PROC_QUEUE queue = static_cast<Dispatcher::PROC_QUEUE>(i);
if(HASH.contains(queue)){
QList<DProcess*> list = HASH[queue];
for(int l=0; l<list.length() && !found; l++){
if(list[l]->ID==ID){
qDebug() << "Delete Proc on Finished:" << list[l]->ID;
QJsonObject obj;
obj.insert("log",list[l]->getProcLog());
obj.insert("success", list[l]->success ? "true" : "false" );
obj.insert("proc_id", ID);
obj.insert("cmd_list", QJsonArray::fromStringList( list[l]->rawcmds ) );
obj.insert("time_started", list[l]->t_started.toString(Qt::ISODate) );
obj.insert("time_finished", list[l]->t_finished.toString(Qt::ISODate) );
emit DispatchFinished(obj);
list.takeAt(l)->deleteLater();
LogManager::log(LogManager::DISPATCH, obj);
found = true;
l--;
}
} //end loop over queue list
}
}//end loop over queue enumeration*/
LogManager::log(LogManager::DISPATCH, log);
//First emit any subsystem-specific event, falling back on the raw log
QJsonObject ev = CreateDispatcherEventNotification(ID,log);
if(!ev.isEmpty()){
emit DispatchEvent(ev);
}else{
emit DispatchEvent(log);
}
QTimer::singleShot(20,this, SLOT(CheckQueues()) );
}
void Dispatcher::ProcUpdated(QString ID, QJsonObject log){
//See if this needs to generate an event
QJsonObject ev = CreateDispatcherEventNotification(ID,log);
if(!ev.isEmpty()){
emit DispatchEvent(ev);
}
}
void Dispatcher::CheckQueues(){
qDebug() << " - Check Queues";
for(int i=0; i<enum_length; i++){
for(int i=0; i<enum_length; i++){
PROC_QUEUE queue = static_cast<PROC_QUEUE>(i);
if(HASH.contains(queue)){
QList<DProcess*> list = HASH[queue];
@@ -194,16 +202,6 @@ void Dispatcher::CheckQueues(){
if(j>0 && queue!=NO_QUEUE){ break; } //done with this - only first item in these queues should run at a time
if( !list[j]->isRunning() ){
if(list[j]->isDone() ){
qDebug() << "Delete Proc:" << list[j]->ID;
QJsonObject obj;
obj.insert("log",list[j]->getProcLog());
obj.insert("success", list[j]->success ? "true" : "false" );
obj.insert("proc_id", list[j]->ID);
obj.insert("cmd_list", QJsonArray::fromStringList( list[j]->rawcmds ) );
obj.insert("time_started", list[j]->t_started.toString(Qt::ISODate) );
obj.insert("time_finished", list[j]->t_finished.toString(Qt::ISODate) );
emit DispatchFinished(obj);
LogManager::log(LogManager::DISPATCH, obj);
list.takeAt(j)->deleteLater();
j--;
}else{

View File

@@ -21,11 +21,11 @@ public:
//output variables for logging purposes
bool success;
QDateTime t_started, t_finished;
QStringList rawcmds; //copy of cmds at start of process
//QDateTime t_started, t_finished;
// QStringList rawcmds; //copy of cmds at start of process
//Get the current process log (can be run during/after the process runs)
QString getProcLog();
QJsonObject getProcLog();
//Process Status
bool isRunning();
bool isDone();
@@ -34,19 +34,20 @@ public slots:
void startProc();
private:
QString proclog;
QString cCmd;
QJsonObject proclog;
QTimer *uptimer;
private slots:
void cmdError(QProcess::ProcessError);
void cmdFinished(int, QProcess::ExitStatus);
void updateLog(); //readyRead() signal
void emitUpdate();
signals:
void ProcFinished(QString); //ID
void ProcFinished(QString, QJsonObject); // ID / log
//Generic signals for subsystem usage (no direct proc access later)
void ProcessOutput(QString);
void Finished(QString, int, QString); //ID, retcode, log
void ProcUpdate(QString, QJsonObject); // ID/log
};
@@ -82,12 +83,13 @@ private:
private slots:
void mkProcs(Dispatcher::PROC_QUEUE, DProcess *P);
void ProcFinished(QString ID);
void ProcFinished(QString ID, QJsonObject log);
void ProcUpdated(QString ID, QJsonObject log);
void CheckQueues();
signals:
//Main signals
void DispatchFinished(QJsonObject obj); //obj is the data associated with the process
void DispatchEvent(QJsonObject obj); //obj is the data associated with the process
void DispatchStarting(QString ID);
//Signals for private usage

View File

@@ -0,0 +1,51 @@
// ===============================
// PC-BSD REST API Server
// Available under the 3-clause BSD License
// Written by: Ken Moore <ken@pcbsd.org> 2015-2016
// =================================
// These static classes are for defining custom Dispatcher/Event notifications
// for individual subsystems
//=================================
#ifndef _PCBSD_SYSADM_DISPATCH_SUBSYSTEM_FILTER_SYSTEM_H
#define _PCBSD_SYSADM_DISPATCH_SUBSYSTEM_FILTER_SYSTEM_H
#include "globals-qt.h"
#include "EventWatcher.h"
static QJsonObject CreateDispatcherEventNotification(QString ID, QJsonObject log){
//key outputs - need to set these if an event is going to be sent out
QJsonObject args; //any arguments to send out
QString namesp, name; //the namespace/name of the subsystem used
//Quick flags/simplifications for use later
QString cCmd, cLog; //Current command/log for that command (might be a chain of commands)
cCmd = log.value("current_cmd").toString(); //This is usually empty if the proc finished
if(!cCmd.isEmpty()){ cLog = log.value(cCmd).toString(); }
bool isFinished = log.contains("return_codes/"+cCmd) || cCmd.isEmpty();
qDebug() << "Check Dispatcher Event:";
qDebug() << " - RAW LOG:" << log;
qDebug() << "cCmd:" << cCmd << "cLog:" << cLog << "isFinished:" << isFinished;
//Now parse the notification based on the dispatch ID or current command
//NOTE: There might be a random string on the end of the ID (to accomodate similar process calls)
if(ID.startsWith("sysadm_iohyve")){
namesp = "sysadm"; name="iohyve";
//Now perform additional cmd/system based filtering
if(ID.section("::",0,0)=="sysadm_iohyve_fetch" || cCmd.startsWith("iohyve fetch ")){
//Do some parsing of the log
if(isFinished){
args.insert("state","finished");
}else{
args.insert("state","running");
args.insert("progress", cLog.section("\n",-1, QString::SectionSkipEmpty)); //send the last line of the fetch
}
}
}
//Now assemble the output as needed
if(namesp.isEmpty() || name.isEmpty()){ return QJsonObject(); } //no event
args.insert("event_system",namesp+"/"+name);
return args;
}
#endif

View File

@@ -111,13 +111,13 @@ void EventWatcher::DispatchStarting(QString ID){
obj.insert("process_id", ID);
obj.insert("state", "running");
LogManager::log(LogManager::EV_DISPATCH, obj);
qDebug() << "Got Dispatch starting: sending event...";
//qDebug() << "Got Dispatch starting: sending event...";
emit NewEvent(DISPATCHER, obj);
}
void EventWatcher::DispatchFinished(QJsonObject obj){
void EventWatcher::DispatchEvent(QJsonObject obj){
LogManager::log(LogManager::EV_DISPATCH, obj);
qDebug() << "Got Dispatch Finished: sending event...";
//qDebug() << "Got Dispatch Finished: sending event...";
emit NewEvent(DISPATCHER, obj);
}

View File

@@ -55,7 +55,7 @@ public slots:
//Slots for the global Dispatcher to connect to
void DispatchStarting(QString);
void DispatchFinished(QJsonObject);
void DispatchEvent(QJsonObject);
private slots:
//File watcher signals

View File

@@ -595,10 +595,8 @@ RestOutputStruct::ExitCode WebSocket::EvaluateSysadmIohyveRequest(const QJsonVal
}
else if(act=="fetchiso"){
ok = true;
DProcess fetchproc;
out->insert("fetchiso", sysadm::Iohyve::fetchISO(in_args.toObject(), &fetchproc));
connect(&fetchproc, SIGNAL(ProcessOutput(QString)), this, SLOT(slotIohyveFetchProcessOutput(QString)) );
connect(&fetchproc, SIGNAL(Finished(QString, int, QString)), this, SLOT(slotIohyveFetchDone(QString, int, QString)) );
//DProcess fetchproc;
out->insert("fetchiso", sysadm::Iohyve::fetchISO(in_args.toObject()));
}
else if(act=="install"){
ok = true;

View File

@@ -393,8 +393,9 @@ void WebSocket::SslError(const QList<QSslError> &err){ //sslErrors() signal
// PUBLIC SLOTS
// ======================
void WebSocket::EventUpdate(EventWatcher::EVENT_TYPE evtype, QJsonValue msg){
qDebug() << "Got Socket Event Update:" << msg;
//qDebug() << "Got Socket Event Update:" << msg;
if(msg.isNull()){ msg = EVENTS->lastEvent(evtype); }
if(msg.isNull()){ return; } //nothing to send
if( !ForwardEvents.contains(evtype) ){ return; }
RestOutputStruct out;
out.CODE = RestOutputStruct::OK;

View File

@@ -44,7 +44,7 @@ QJsonObject Iohyve::createGuest(QJsonObject jsin) {
}
// Queue the fetch of an ISO
QJsonObject Iohyve::fetchISO(QJsonObject jsin, DProcess *returnproc) {
QJsonObject Iohyve::fetchISO(QJsonObject jsin) {
QJsonObject retObject;
QStringList keys = jsin.keys();
@@ -57,11 +57,10 @@ QJsonObject Iohyve::fetchISO(QJsonObject jsin, DProcess *returnproc) {
QString url = jsin.value("url").toString();
// Create a unique ID for this queued action
QString ID = QUuid::createUuid().toString();
QString ID = "sysadm_iohyve_fetch::"+QUuid::createUuid().toString();
// Queue the fetch action
returnproc = DISPATCHER->queueProcess(ID, "iohyve fetch " + url);
qDebug() << "returnproc" << returnproc;
DISPATCHER->queueProcess(ID, "iohyve fetch " + url);
// Return some details to user that the action was queued
retObject.insert("command", "iohyve fetch " + url);

View File

@@ -16,7 +16,7 @@ namespace sysadm{
class Iohyve{
public:
static QJsonObject createGuest(QJsonObject);
static QJsonObject fetchISO(QJsonObject, DProcess *);
static QJsonObject fetchISO(QJsonObject);
static QJsonObject installGuest(QJsonObject);
static QJsonObject isSetup();
static QJsonObject listVMs();

View File

@@ -140,7 +140,7 @@ int main( int argc, char ** argv )
qInstallMessageHandler(MessageOutput);
//Connect the background classes
QObject::connect(DISPATCHER, SIGNAL(DispatchFinished(QJsonObject)), EVENTS, SLOT(DispatchFinished(QJsonObject)) );
QObject::connect(DISPATCHER, SIGNAL(DispatchEvent(QJsonObject)), EVENTS, SLOT(DispatchEvent(QJsonObject)) );
QObject::connect(DISPATCHER, SIGNAL(DispatchStarting(QString)), EVENTS, SLOT(DispatchStarting(QString)) );
//Create the daemon

View File

@@ -13,7 +13,8 @@ HEADERS += globals.h globals-qt.h \
SslServer.h \
EventWatcher.h \
LogManager.h \
Dispatcher.h
Dispatcher.h \
DispatcherParsing.h
SOURCES += main.cpp \
WebServer.cpp \