Finish adding in the new dispatcher system (untested). This should be ready for trying out in some subsystem now.

Note: to queue up a job you just need to run one of the DISPATCHER->queue() functions (there are a few overloaded versions for simplicity)
This commit is contained in:
Ken Moore
2016-01-29 16:24:40 -05:00
parent 188f20ff16
commit 04c366f114
9 changed files with 91 additions and 42 deletions

View File

@@ -20,7 +20,7 @@ DProcess::DProcess(QObject *parent) : QProcess(parent){
}
DProcess::~DProcess(){
if(this->state()!=QProcess::NotRunning)){
if( this->state()!=QProcess::NotRunning ){
this->terminate();
}
}
@@ -30,11 +30,14 @@ void DProcess::startProc(){
finished = QDateTime::currentDateTime();
emit ProcFinished(ID);
return;
}else if(proclog.isEmpty()){
started = QDateTime::currentDateTime(); //first cmd started
rawcmds = cmds;
}else{
proclog.append("\n");
}
QString cmd = cmds.takeFirst();
success = false; //not finished yet
if(!proclog.isEmpty()){ proclog.append("\n"); }
else{ started = QDateTime::currentDateTime(); } //first cmd started
proclog.append("[Running Command: "+cmd+" ]");
this->start(cmd);
}
@@ -105,8 +108,10 @@ void Dispatcher::queueProcess(Dispatcher::PROC_QUEUE queue, QString ID, QStringL
QList<DProcess*> list;
if(!HASH.contains(queue)){ HASH.insert(queue, list); } //insert an empty list
HASH[queue] << P; //add this proc to the end of the list
if(queue==NO_QUEUE || HASH[queue].length()==1){ P->startProc(); } //go ahead and start it now
else{ CheckQueues(); }
if(queue==NO_QUEUE || HASH[queue].length()==1){
emit DispatchStarting(P->ID);
P->startProc(); //go ahead and start it now
}else{ CheckQueues(); }
}
// === PRIVATE ===
@@ -115,6 +120,7 @@ DProcess* Dispatcher:: createProcess(QString ID, QStringList cmds){
DProcess* P = new DProcess(this);
P->cmds = cmds;
P->ID = ID;
P->success = false;
connect(P, SIGNAL(ProcFinished(QString)), this, SLOT(ProcFinished(QString)) );
return P;
}
@@ -122,22 +128,23 @@ DProcess* Dispatcher:: createProcess(QString ID, QStringList cmds){
// === PRIVATE SLOTS ===
void Dispatcher::ProcFinished(QString ID){
//Find the process with this ID and close it down (with proper events)
for int i=0; i<enum_length; i++){
PROC_QUEUE queue = static_cast(PROC_QUEUE>(i);
bool found = false;
for(int i=0; i<enum_length && !found; i++){
Dispatcher::PROC_QUEUE queue = static_cast<Dispatcher::PROC_QUEUE>(i);
if(HASH.contains(queue)){
QList<DProcess*> list = HASH[queue];
bool found = false;
for(int l=0; l<list.length() && !found; l++){
if(list[l].ID==ID){
if(list[l]->ID==ID){
QJsonObject obj;
obj.insert("log",list[l].procLog());
obj.insert("success", list[l].success ? "true" : "false" );
obj.insert("log",list[l]->getProcLog());
obj.insert("success", list[l]->success ? "true" : "false" );
obj.insert("proc_id", ID);
obj.insert("cmd_list", QJsonArray::fromStringList( list[l].rawcmds );
obj.insert("time_started", list[l].started.toString(QT::ISODate) );
obj.insert("time_finished", list[l].finished.toString(QT::ISODate) );
obj.insert("cmd_list", QJsonArray::fromStringList( list[l]->rawcmds ) );
obj.insert("time_started", list[l]->started.toString(Qt::ISODate) );
obj.insert("time_finished", list[l]->finished.toString(Qt::ISODate) );
emit DispatchFinished(ID, list[l]->success);
delete list.takeAt(l);
LogManager::log(LogManager::EV_DISPATCH, obj);
LogManager::log(LogManager::DISPATCH, obj);
found = true;
}
} //end loop over queue list
@@ -147,5 +154,19 @@ void Dispatcher::ProcFinished(QString ID){
}
void Dispatcher::CheckQueues(){
for(int i=0; i<enum_length; i++){
PROC_QUEUE queue = static_cast<PROC_QUEUE>(i);
if(HASH.contains(queue)){
QList<DProcess*> list = HASH[queue];
for(int j=0; j<list.length(); j++){
if(j>0 && queue!=NO_QUEUE){ break; } //done with this - only first item in these queues should run at a time
if(!list[j]->isRunning() && list[j]->getProcLog().isEmpty()){
//Need to start this one - has not run yet
emit DispatchStarting(list[j]->ID);
list[j]->startProc();
}
} //end loop over list
}
} //end loop over queue types
}

View File

@@ -14,7 +14,7 @@
class DProcess : public QProcess{
Q_OBJECT
public:
DProcess(QObject parent = 0);
DProcess(QObject *parent = 0);
~DProcess();
QString ID;
@@ -68,16 +68,10 @@ private:
QString queue_file;
//Internal lists
QHash<PROC_QUEUE, QList<DProcess*> > QUEUES;
QHash<PROC_QUEUE, QList<DProcess*> > HASH;
//Simplification routine for setting up a process
DProcess* createProcess(QString ID, QStringList cmds){
DProcess* P = new DProcess(this);
P->cmds = cmds;
P->ID = ID;
connect(P, SIGNAL(ProcFinished(QString)), this, SLOT(ProcFinished(QString)) );
return P;
}
DProcess* createProcess(QString ID, QStringList cmds);
private slots:
void ProcFinished(QString ID);

View File

@@ -5,6 +5,8 @@
// =================================
#include "EventWatcher.h"
#include "globals.h"
// === PUBLIC ===
EventWatcher::EventWatcher(){
starting = true;
@@ -24,9 +26,9 @@ EventWatcher::~EventWatcher(){
void EventWatcher::start(){
// - DISPATCH Events
starting = true;
if(!QFile::exists(DISPATCHWORKING)){ QProcess::execute("touch "+DISPATCHWORKING); }
//if(!QFile::exists(DISPATCHWORKING)){ QProcess::execute("touch "+DISPATCHWORKING); }
//qDebug() << " Dispatcher Events:" << DISPATCHWORKING;
WatcherUpdate(DISPATCHWORKING); //load it initially (will also add it to the watcher)
//WatcherUpdate(DISPATCHWORKING); //load it initially (will also add it to the watcher)
// - Life Preserver Events
WatcherUpdate(LPLOG); //load it initially (will also add it to the watcher);
WatcherUpdate(LPERRLOG); //load it initially (will also add it to the watcher);
@@ -86,10 +88,29 @@ double EventWatcher::displayToDoubleK(QString displayNumber){
return num;
}
// === PUBLIC SLOTS ===
//Slots for the global Dispatcher to connect to
void EventWatcher::DispatchStarting(QString ID){
QJsonObject obj;
obj.insert("process_id", ID);
obj.insert("state", "running");
LogManager::log(LogManager::EV_DISPATCH, obj);
emit NewEvent(DISPATCHER, obj);
}
void EventWatcher::DispatchFinished(QString ID, bool success){
QJsonObject obj;
obj.insert("process_id", ID);
obj.insert("state", "finished");
obj.insert("result", success ? "success" : "failure");
LogManager::log(LogManager::EV_DISPATCH, obj);
emit NewEvent(DISPATCHER, obj);
}
// === PRIVATE SLOTS ===
void EventWatcher::WatcherUpdate(const QString &path){
if(!starting){ qDebug() << "Event Watcher Update:" << path; }
if(path==DISPATCHWORKING){
/*if(path==DISPATCHWORKING){
//Read the file contents
QString stat = readFile(DISPATCHWORKING);
if(stat.simplified().isEmpty()){ stat = "idle"; }
@@ -97,7 +118,8 @@ void EventWatcher::WatcherUpdate(const QString &path){
HASH.insert(DISPATCHER,stat); //save for later
//Forward those contents on to the currently-open sockets
emit NewEvent(DISPATCHER, QJsonValue(stat) );
}else if(path==LPLOG){
}else*/
if(path==LPLOG){
//Main Life Preserver Log File
ReadLPLogFile();
}else if(path==LPERRLOG){
@@ -121,7 +143,7 @@ void EventWatcher::CheckLogFiles(){
if(!watched.contains(LPLOG) && QFile::exists(LPLOG)){ watcher->addPath(LPLOG); }
if(!watched.contains(LPERRLOG) && QFile::exists(LPERRLOG)){ watcher->addPath(LPERRLOG); }
if(!watched.contains(tmpLPRepFile) && QFile::exists(tmpLPRepFile)){ watcher->addPath(tmpLPRepFile); }
if(!watched.contains(DISPATCHWORKING) && QFile::exists(LPLOG)){ watcher->addPath(DISPATCHWORKING); }
//if(!watched.contains(DISPATCHWORKING) && QFile::exists(LPLOG)){ watcher->addPath(DISPATCHWORKING); }
//qDebug() << "watched:" << watcher->files() << watcher->directories();
}

View File

@@ -8,7 +8,7 @@
#include "globals-qt.h"
#define DISPATCHWORKING QString("/var/tmp/appcafe/dispatch-queue.working")
//#define DISPATCHWORKING QString("/var/tmp/appcafe/dispatch-queue.working")
#define LPLOG QString("/var/log/lpreserver/lpreserver.log")
#define LPERRLOG QString("/var/log/lpreserver/error.log")
#define LPREPLOGDIR QString("/var/log/lpreserver/")
@@ -49,6 +49,10 @@ private:
public slots:
void start();
//Slots for the global Dispatcher to connect to
void DispatchStarting(QString);
void DispatchFinished(QString, bool);
private slots:
//File watcher signals
void WatcherUpdate(const QString&);

View File

@@ -15,7 +15,7 @@
//===========================================
// Event Files (EV_*): JSON input/output (full event)
// HOST: String input/output (simple messages)
//
// DISPATCH: Full log of dispatcher processes
//===========================================
#define LOGDIR QString("/var/log/sysadm")
@@ -24,11 +24,12 @@ class LogManager{
public:
//Enumeration of common log files (will automatically use proper file)
// === ADD NEW FILE SUPPORT HERE ===
enum LOG_FILE {HOST, EV_DISPATCH, EV_LP};
enum LOG_FILE {HOST, DISPATCH, EV_DISPATCH, EV_LP};
//Conversion function for flag->path
static QString flagToPath(LogManager::LOG_FILE flag){
QString filepath;
if(flag==HOST){ filepath.append("hostinfo"); }
else if(flag==DISPATCH){ filepath.append("dispatcher"); }
else if(flag==EV_DISPATCH){ filepath.append("events-dispatcher"); }
else if(flag==EV_LP){ filepath.append("events-lifepreserver"); }
else{ return ""; } //invalid file given

View File

@@ -35,10 +35,10 @@ RestOutputStruct::ExitCode WebSocket::AvailableSubsystems(bool allaccess, QJsonO
out->insert("rpc/syscache","read"); //no write to syscache - only reads
}
// - dispatcher
if(DispatcherClient::DispatcherAvailable()){
//if(DispatcherClient::DispatcherAvailable()){
//"read" is the event notifications, "write" is the ability to queue up jobs
out->insert("rpc/dispatcher", allaccess ? "read/write" : "read");
}
//}
// - network
out->insert("sysadm/network","read/write");
@@ -143,7 +143,7 @@ RestOutputStruct::ExitCode WebSocket::EvaluateDispatcherRequest(const QJsonValue
}else{ return RestOutputStruct::BADREQUEST; }
//Run the Request (should be one value for each in_req)
QStringList values = DispatcherClient::parseInputs(in_req, AUTHSYSTEM);;
QStringList values = DispatcherClient::parseInputs(in_req, AUTHSYSTEM);
while(values.length() < in_req.length()){ values << "[ERROR]"; } //ensure lists are same length
//Format the result

View File

@@ -8,15 +8,15 @@
#include "globals-qt.h"
#include "LogManager.h"
//Global variables/classes (intially created in main.cpp)
extern QSettings *CONFIG;
#include "EventWatcher.h"
extern EventWatcher *EVENTS;
//#include "ProcessQueue.h"
//extern ProcessQueue *PQUEUE;
//#include "LogManager.h"
//extern LogManager *LOGS;
#include "Dispatcher.h"
extern Dispatcher *DISPATCHER;
//Special defines
#define WSPORTNUMBER 12150 // WebSocket server default port

View File

@@ -15,6 +15,7 @@
//Create any global classes
QSettings *CONFIG = new QSettings("PCBSD","sysadm");
EventWatcher *EVENTS = new EventWatcher();
Dispatcher *DISPATCHER = new Dispatcher();
//Create the default logfile
QFile logfile;
@@ -76,6 +77,10 @@ int main( int argc, char ** argv )
logfile.open(QIODevice::WriteOnly | QIODevice::Append);
qInstallMessageHandler(MessageOutput);
//Connect the background classes
QObject::connect(DISPATCHER, SIGNAL(DispatchFinished(QString, bool)), EVENTS, SLOT(DispatchFinished(QString,bool)) );
QObject::connect(DISPATCHER, SIGNAL(DispatchStarting(QString)), EVENTS, SLOT(DispatchStarting(QString)) );
//Create the daemon
qDebug() << "Starting the PC-BSD sysadm server...." << (websocket ? "(WebSocket)" : "(TCP)");
WebServer *w = new WebServer();

View File

@@ -13,7 +13,8 @@ HEADERS += globals.h globals-qt.h \
AuthorizationManager.h \
SslServer.h \
EventWatcher.h \
LogManager.h
LogManager.h \
Dispatcher.h
SOURCES += main.cpp \
WebServer.cpp \
@@ -23,7 +24,8 @@ SOURCES += main.cpp \
dispatcher-client.cpp \
AuthorizationManager.cpp \
EventWatcher.cpp \
LogManager.cpp
LogManager.cpp \
Dispatcher.cpp
TARGET=sysadm-server