Fix up the dispatcher queueing system (for real this time). Now the processes will properly get removed when they are finished, and the next process started up.

This commit is contained in:
Ken Moore
2016-03-29 20:21:43 -04:00
parent f66656675b
commit 08af33c778

View File

@@ -109,7 +109,8 @@ void DProcess::emitUpdate(){
// ================================
Dispatcher::Dispatcher(){
qRegisterMetaType<Dispatcher::PROC_QUEUE>("Dispatcher::PROC_QUEUE");
//connect(this, SIGNAL(mkprocs(Dispatcher::PROC_QUEUE, DProcess*)), this, SLOT(mkProcs(Dispatcher::PROC_QUEUE, DProcess*)) );
connect(this, SIGNAL(mkprocs(Dispatcher::PROC_QUEUE, DProcess*)), this, SLOT(mkProcs(Dispatcher::PROC_QUEUE, DProcess*)) );
connect(this, SIGNAL(checkProcs()), this, SLOT(CheckQueues()) );
}
Dispatcher::~Dispatcher(){
@@ -118,8 +119,8 @@ Dispatcher::~Dispatcher(){
void Dispatcher::start(QString queuefile){
//Setup connections here (in case it was moved to different thread after creation)
connect(this, SIGNAL(mkprocs(Dispatcher::PROC_QUEUE, DProcess*)), this, SLOT(mkProcs(Dispatcher::PROC_QUEUE, DProcess*)) );
connect(this, SIGNAL(checkProcs()), this, SLOT(checkQueues()) );
//connect(this, SIGNAL(mkprocs(Dispatcher::PROC_QUEUE, DProcess*)), this, SLOT(mkProcs(Dispatcher::PROC_QUEUE, DProcess*)) );
//connect(this, SIGNAL(checkProcs()), this, SLOT(checkQueues()) );
//load any previously-unrun processes
// TO DO
}
@@ -144,7 +145,7 @@ DProcess* Dispatcher::queueProcess(Dispatcher::PROC_QUEUE queue, QString ID, QSt
//For multi-threading, need to emit a signal/slot for this action (object creations need to be in same thread as parent)
//qDebug() << "Queue Process:" << queue << ID << cmds;
DProcess *P = createProcess(ID, cmds);
emit mkprocs(queue, P);
this->emit mkprocs(queue, P);
return P;
}
@@ -162,12 +163,14 @@ DProcess* Dispatcher::createProcess(QString ID, QStringList cmds){
void Dispatcher::mkProcs(Dispatcher::PROC_QUEUE queue, DProcess *P){
//qDebug() << "mkProcs()";
//P->moveToThread(this->thread());
QList<DProcess*> list = HASH.value(queue);
list << P;
//qDebug() << " - add to queue:" << queue;
HASH.insert(queue,list);
connect(P, SIGNAL(ProcFinished(QString, QJsonObject)), this, SLOT(ProcFinished(QString, QJsonObject)) );
connect(P, SIGNAL(ProcUpdate(QString, QJsonObject)), this, SLOT(ProcUpdated(QString, QJsonObject)) );
QList<DProcess*> list;
if(!HASH.contains(queue)){ HASH.insert(queue, list); } //insert an empty list
HASH[queue] << P; //add this proc to the end of the list
emit checkProcs();
QTimer::singleShot(30, this, SIGNAL(checkProcs()) );
//this->emit checkProcs();
}
void Dispatcher::ProcFinished(QString ID, QJsonObject log){
@@ -182,7 +185,8 @@ void Dispatcher::ProcFinished(QString ID, QJsonObject log){
emit DispatchEvent(log);
}
emit checkProcs();
QTimer::singleShot(30, this, SIGNAL(checkProcs()) );
//this->emit checkProcs();
}
void Dispatcher::ProcUpdated(QString ID, QJsonObject log){
@@ -194,15 +198,22 @@ void Dispatcher::ProcUpdated(QString ID, QJsonObject log){
}
void Dispatcher::CheckQueues(){
//qDebug() << "Check Queues...";
for(int i=0; i<enum_length; i++){
PROC_QUEUE queue = static_cast<PROC_QUEUE>(i);
//qDebug() << "Got queue:" << queue;
if(HASH.contains(queue)){
//qDebug() << "Hash has queue";
QList<DProcess*> list = HASH[queue];
//qDebug() << "Length:" << list.length();
for(int j=0; j<list.length(); j++){
//qDebug() << "Check Proc:" << list[j]->ID;
if(j>0 && queue!=NO_QUEUE){ break; } //done with this - only first item in these queues should run at a time
if( !list[j]->isRunning() ){
if(list[j]->isDone() ){
//qDebug() << "Remove Finished Proc:" << list[j]->ID;
list.takeAt(j)->deleteLater();
HASH.insert(queue, list); //replace the list in the hash since it changed
j--;
}else{
//Need to start this one - has not run yet