Get all the threading working properly so teh dispatcher functions correctly now.

This commit is contained in:
Ken Moore
2016-02-02 13:00:49 -05:00
parent be12e18983
commit 470000b08b
3 changed files with 59 additions and 22 deletions

View File

@@ -14,9 +14,6 @@ DProcess::DProcess(QObject *parent) : QProcess(parent){
//Setup the process
this->setProcessEnvironment(QProcessEnvironment::systemEnvironment());
this->setProcessChannelMode(QProcess::MergedChannels);
//setup internal connections
connect(this, SIGNAL(finished(int, QProcess::ExitStatus)), this, SLOT(cmdFinished(int, QProcess::ExitStatus)) );
}
DProcess::~DProcess(){
@@ -27,15 +24,22 @@ DProcess::~DProcess(){
void DProcess::startProc(){
if(cmds.isEmpty()){
finished = QDateTime::currentDateTime();
t_finished = QDateTime::currentDateTime();
emit ProcFinished(ID);
return;
}
QString cmd = cmds.takeFirst();
success = false; //not finished yet
if(!proclog.isEmpty()){ proclog.append("\n"); }
else{ started = QDateTime::currentDateTime(); } //first cmd started
else{ //first cmd started
t_started = QDateTime::currentDateTime();
rawcmds = cmds; rawcmds.prepend(cmd);
//setup internal connections
connect(this, SIGNAL(finished(int, QProcess::ExitStatus)), this, SLOT(cmdFinished(int, QProcess::ExitStatus)) );
connect(this, SIGNAL(error(QProcess::ProcessError)), this, SLOT(cmdError(QProcess::ProcessError)) );
}
proclog.append("[Running Command: "+cmd+" ]");
//qDebug() << "Proc Starting:" << ID << cmd;
this->start(cmd);
}
@@ -43,6 +47,10 @@ bool DProcess::isRunning(){
return (this->state()!=QProcess::NotRunning);
}
bool DProcess::isDone(){
return (!this->isRunning() && !proclog.isEmpty());
}
QString DProcess::getProcLog(){
//First update the internal log as needed
proclog.append( this->readAllStandardOutput() );
@@ -50,12 +58,18 @@ QString DProcess::getProcLog(){
return proclog;
}
void DProcess::cmdError(QProcess::ProcessError err){
//qDebug() << "Process Error:" << err;
cmdFinished(-1, QProcess::NormalExit);
}
void DProcess::cmdFinished(int ret, QProcess::ExitStatus status){
//determin success/failure
//determine success/failure
success = (status==QProcess::NormalExit && ret==0);
//update the log before starting another command
proclog.append( this->readAllStandardOutput() );
//Now run any additional commands
//qDebug() << "Proc Finished:" << ID << success << proclog;
if(success){ startProc(); }//will emit the finished signal as needed if no more commands
else{
if(status==QProcess::NormalExit){
@@ -63,7 +77,7 @@ void DProcess::cmdFinished(int ret, QProcess::ExitStatus status){
}else{
proclog.append("\n[Command Failed: Process Crashed ]");
}
finished = QDateTime::currentDateTime();
t_finished = QDateTime::currentDateTime();
emit ProcFinished(ID);
}
}
@@ -102,6 +116,7 @@ void Dispatcher::queueProcess(Dispatcher::PROC_QUEUE queue, QString ID, QString
void Dispatcher::queueProcess(Dispatcher::PROC_QUEUE queue, QString ID, QStringList cmds){
//This is the primary queueProcess() function - all the overloads end up here to do the actual work
//For multi-threading, need to emit a signal/slot for this action (object creations need to be in same thread as parent)
qDebug() << "Queue Process:" << queue << ID << cmds;
emit mkProcs(queue, ID, cmds);
}
@@ -109,6 +124,7 @@ void Dispatcher::queueProcess(Dispatcher::PROC_QUEUE queue, QString ID, QStringL
//Simplification routine for setting up a process
DProcess* Dispatcher::createProcess(QString ID, QStringList cmds){
DProcess* P = new DProcess();
P->moveToThread(this->thread());
P->cmds = cmds;
P->ID = ID;
connect(P, SIGNAL(ProcFinished(QString)), this, SLOT(ProcFinished(QString)) );
@@ -117,16 +133,19 @@ DProcess* Dispatcher::createProcess(QString ID, QStringList cmds){
// === PRIVATE SLOTS ===
void Dispatcher::mkProcs(Dispatcher::PROC_QUEUE queue, QString ID, QStringList cmds){
//qDebug() << " - Create Process:" << queue << ID << cmds;
DProcess *P = createProcess(ID, cmds);
QList<DProcess*> list;
if(!HASH.contains(queue)){ HASH.insert(queue, list); } //insert an empty list
HASH[queue] << P; //add this proc to the end of the list
if(queue==NO_QUEUE || HASH[queue].length()==1){ P->startProc(); } //go ahead and start it now
else{ CheckQueues(); }
//if(queue==NO_QUEUE || HASH[queue].length()==1){ P->startProc(); } //go ahead and start it now
//QTimer::singleShot(20,this, SLOT(CheckQueues()) );
CheckQueues();
}
void Dispatcher::ProcFinished(QString ID){
//Find the process with this ID and close it down (with proper events)
//qDebug() << " - Got Proc Finished Signal:" << ID;
bool found = false;
for(int i=0; i<enum_length && !found; i++){
Dispatcher::PROC_QUEUE queue = static_cast<Dispatcher::PROC_QUEUE>(i);
@@ -139,8 +158,8 @@ void Dispatcher::ProcFinished(QString ID){
obj.insert("success", list[l]->success ? "true" : "false" );
obj.insert("proc_id", ID);
obj.insert("cmd_list", QJsonArray::fromStringList( list[l]->rawcmds ) );
obj.insert("time_started", list[l]->started.toString(Qt::ISODate) );
obj.insert("time_finished", list[l]->finished.toString(Qt::ISODate) );
obj.insert("time_started", list[l]->t_started.toString(Qt::ISODate) );
obj.insert("time_finished", list[l]->t_finished.toString(Qt::ISODate) );
emit DispatchFinished(ID, list[l]->success);
delete list.takeAt(l);
LogManager::log(LogManager::DISPATCH, obj);
@@ -149,7 +168,7 @@ void Dispatcher::ProcFinished(QString ID){
} //end loop over queue list
}
}//end loop over queue enumeration
CheckQueues();
QTimer::singleShot(20,this, SLOT(CheckQueues()) );
}
void Dispatcher::CheckQueues(){
@@ -159,10 +178,24 @@ for(int i=0; i<enum_length; i++){
QList<DProcess*> list = HASH[queue];
for(int j=0; j<list.length(); j++){
if(j>0 && queue!=NO_QUEUE){ break; } //done with this - only first item in these queues should run at a time
if(!list[j]->isRunning() && list[j]->getProcLog().isEmpty()){
//Need to start this one - has not run yet
emit DispatchStarting(list[j]->ID);
list[j]->startProc();
if( !list[j]->isRunning() ){
if(list[j]->isDone() ){
QJsonObject obj;
obj.insert("log",list[j]->getProcLog());
obj.insert("success", list[j]->success ? "true" : "false" );
obj.insert("proc_id", list[j]->ID);
obj.insert("cmd_list", QJsonArray::fromStringList( list[j]->rawcmds ) );
obj.insert("time_started", list[j]->t_started.toString(Qt::ISODate) );
obj.insert("time_finished", list[j]->t_finished.toString(Qt::ISODate) );
emit DispatchFinished(list[j]->ID, list[j]->success);
LogManager::log(LogManager::DISPATCH, obj);
delete list.takeAt(j);
j--;
}else{
//Need to start this one - has not run yet
emit DispatchStarting(list[j]->ID);
QTimer::singleShot(0,list[j], SLOT(startProc()) );
}
}
} //end loop over list
}

View File

@@ -22,20 +22,23 @@ public:
//output variables for logging purposes
bool success;
QDateTime started, finished;
QDateTime t_started, t_finished;
QStringList rawcmds; //copy of cmds at start of process
void startProc();
//Get the current process log (can be run during/after the process runs)
QString getProcLog();
//Process Status
bool isRunning();
bool isDone();
public slots:
void startProc();
private:
QString proclog;
private slots:
void cmdError(QProcess::ProcessError);
void cmdFinished(int, QProcess::ExitStatus);
signals:

View File

@@ -89,10 +89,11 @@ int main( int argc, char ** argv )
//Start the daemon
int ret = 1; //error return value
if( w->startServer(port, websocket) ){
QThread TBACK;
QThread TBACK, TBACK2;
EVENTS->moveToThread(&TBACK);
DISPATCHER->moveToThread(&TBACK);
DISPATCHER->moveToThread(&TBACK2);
TBACK.start();
TBACK2.start();
QTimer::singleShot(0,EVENTS, SLOT(start()) );
//Now start the main event loop
ret = a.exec();