Fix up the file watcher system in the events class. Now it actually functions in it's own thread.

This commit is contained in:
Ken Moore
2016-01-20 11:42:41 -05:00
parent 539e2bf64e
commit 155026aab1
6 changed files with 184 additions and 187 deletions

View File

@@ -8,28 +8,24 @@
// === PUBLIC ===
EventWatcher::EventWatcher(){
starting = true;
LPlog_pos = LPrep_pos = LPerr_pos = 0; //no pos yet
watcher = new QFileSystemWatcher(this);
filechecktimer = new QTimer(this);
filechecktimer->setSingleShot(false);
filechecktimer->setInterval(3600000); //1-hour checks (also run on new event notices)
LPlogfile = LPrepfile = LPerrfile = 0;
connect(watcher, SIGNAL(fileChanged(const QString&)), this, SLOT(WatcherUpdate(const QString&)) );
connect(watcher, SIGNAL(directoryChanged(const QString&)), this, SLOT(WatcherUpdate(const QString&)) );
connect(filechecktimer, SIGNAL(timeout()), this, SLOT( CheckLogFiles()) );
}
EventWatcher::~EventWatcher(){
if(LPlogfile!=0 && LPlogfile->isOpen()){ LPlogfile->close(); }
if(LPerrfile!=0 && LPerrfile->isOpen()){ LPerrfile->close(); }
if(LPrepfile!=0 && LPrepfile->isOpen()){ LPrepfile->close(); }
}
void EventWatcher::start(){
// - DISPATCH Events
starting = true;
if(!QFile::exists(DISPATCHWORKING)){ QProcess::execute("touch "+DISPATCHWORKING); }
qDebug() << " Dispatcher Events:" << DISPATCHWORKING;
//watcher->addPath(DISPATCHWORKING);
//qDebug() << " Dispatcher Events:" << DISPATCHWORKING;
WatcherUpdate(DISPATCHWORKING); //load it initially (will also add it to the watcher)
// - Life Preserver Events
WatcherUpdate(LPLOG); //load it initially (will also add it to the watcher);
@@ -47,179 +43,19 @@ EventWatcher::EVENT_TYPE EventWatcher::typeFromString(QString typ){
QJsonValue EventWatcher::lastEvent(EVENT_TYPE type){
if(HASH.contains(type)){ return HASH.value(type); }
else{ return ""; }
else{ qDebug() << "No saved event:" << type; return QJsonValue(); }
}
// === PRIVATE ===
// == Life Preserver Event Functions
void EventWatcher::ReadLPLogFile(){
if(LPlogfile==0){
if(!QFile::exists(LPLOG)){ return; } //no file
LPlogfile = new QFile(LPLOG);
}
if(!LPlogfile->isOpen()){
if( !LPlogfile->open(QIODevice::ReadOnly) ){ return; } //could not open file
}
QTextStream STREAM(LPlogfile);
while(!STREAM.atEnd()){
QString log = STREAM.readLine();
if(!starting){ qDebug() << "Read LP Log File Line:" << log; }
//Divide up the log into it's sections
QString timestamp = log.section(":",0,2).simplified();
QString time = timestamp.section(" ",3,3).simplified();
QString message = log.section(":",3,3).toLower().simplified();
QString dev = log.section(":",4,4).simplified(); //dataset/snapshot/nothing
//Now decide what to do/show because of the log message
if(message.contains("creating snapshot", Qt::CaseInsensitive)){
dev = message.section(" ",-1).simplified();
QString msg = QString(tr("New snapshot of %1")).arg(dev);
//Setup the status of the message
HASH.insert(110,"SNAPCREATED");
HASH.insert(111,dev); //dataset
HASH.insert(112, msg ); //summary
HASH.insert(113, QString(tr("Creating snapshot for %1")).arg(dev) );
HASH.insert(114, timestamp); //full timestamp
HASH.insert(115, time); // time only
if(!starting){ emit sendLPEvent("snapshot", 1, timestamp+": "+msg); }
}else if(message.contains("Starting replication", Qt::CaseInsensitive)){
//Setup the file watcher for this new log file
//qDebug() << " - Found Rep Start:" << dev << message;
tmpLPRepFile = dev;
dev = message.section(" on ",1,1,QString::SectionSkipEmpty);
//qDebug() << " - New Dev:" << dev << "Valid Pools:" << reppools;
//Make sure the device is currently setup for replication
//if( !reppools.contains(dev) ){ FILE_REPLICATION.clear(); continue; }
watcher->addPath(tmpLPRepFile);
QString msg = QString(tr("Starting replication for %1")).arg(dev);
//Set the appropriate status variables
HASH.insert(120,"STARTED");
HASH.insert(121, dev); //zpool
HASH.insert(122, tr("Replication Starting") ); //summary
HASH.insert(123, msg ); //Full message
HASH.insert(124, timestamp); //full timestamp
HASH.insert(125, time); // time only
HASH.insert(126,tr("Replication Log")+" <"+tmpLPRepFile+">"); //log file
if(!starting){ emit sendLPEvent("replication", 1, timestamp+": "+msg); }
}else if(message.contains("finished replication task", Qt::CaseInsensitive)){
//Done with this replication - close down the rep file watcher
watcher->removePath(tmpLPRepFile);
tmpLPRepFile.clear();
if(LPrepfile!=0 && LPrepfile->isOpen()){ LPrepfile->close(); }
dev = message.section(" -> ",0,0).section(" ",-1).simplified();
//Make sure the device is currently setup for replication
//if( reppools.contains(dev) ){
QString msg = QString(tr("Finished replication for %1")).arg(dev);
//Now set the status of the process
HASH.insert(120,"FINISHED");
HASH.insert(121,dev); //dataset
HASH.insert(122, tr("Finished Replication") ); //summary
HASH.insert(123, msg );
HASH.insert(124, timestamp); //full timestamp
HASH.insert(125, time); // time only
HASH.insert(126, ""); //clear the log file entry
if(!starting){ emit sendLPEvent("replication", 1, timestamp+": "+msg); }
}else if( message.contains("FAILED replication", Qt::CaseInsensitive) ){
watcher->removePath(tmpLPRepFile);
tmpLPRepFile.clear();
if(LPrepfile!=0 && LPrepfile->isOpen()){ LPrepfile->close(); }
//Now set the status of the process
dev = message.section(" -> ",0,0).section(" ",-1).simplified();
//Make sure the device is currently setup for replication
//Update the HASH
QString file = log.section("LOGFILE:",1,1).simplified();
QString tt = QString(tr("Replication Failed for %1")).arg(dev) +"\n"+ QString(tr("Logfile available at: %1")).arg(file);
HASH.insert(120,"ERROR");
HASH.insert(121,dev); //dataset
HASH.insert(122, tr("Replication Failed") ); //summary
HASH.insert(123, tt );
HASH.insert(124, timestamp); //full timestamp
HASH.insert(125, time); // time only
HASH.insert(126, tr("Replication Error Log")+" <"+file+">" );
if(!starting){ emit sendLPEvent("replication", 7, timestamp+": "+tt); }
}
}
}
void EventWatcher::ReadLPErrFile(){
}
void EventWatcher::ReadLPRepFile(){
static QString stat = "";
static QString repTotK = "";
static QString lastSize = "";
if(LPrepfile==0){
if(!QFile::exists(tmpLPRepFile)){ return; } //no file
LPrepfile = new QFile(this);
}
if(LPrepfile->fileName()!=tmpLPRepFile){
//Move over to the proper file
if(LPrepfile->isOpen()){ LPrepfile->close(); }
LPrepfile->setFileName(tmpLPRepFile);
}
if(!LPrepfile->isOpen()){
if( !LPrepfile->open(QIODevice::ReadOnly) ){ return; } //could not open file
stat.clear();
repTotK.clear();
lastSize.clear();
}
QTextStream STREAM(LPrepfile);
while( !STREAM.atEnd() ){
QString line = STREAM.readLine();
if(line.contains("estimated size is")){ repTotK = line.section("size is ",1,1,QString::SectionSkipEmpty).simplified(); } //save the total size to replicate
else if(line.startsWith("send from ")){}
else if(line.startsWith("TIME ")){}
else if(line.startsWith("warning: ")){} //start of an error
else{ stat = line; } //only save the relevant/latest status line
}
if(!stat.isEmpty()){
//qDebug() << "New Status Message:" << stat;
//Divide up the status message into sections
stat.replace("\t"," ");
QString dataset = stat.section(" ",2,2,QString::SectionSkipEmpty).section("/",0,0).simplified();
QString cSize = stat.section(" ",1,1,QString::SectionSkipEmpty);
//Now Setup the tooltip
if(cSize != lastSize){ //don't update the info if the same size info
QString percent;
if(!repTotK.isEmpty() && repTotK!="??"){
//calculate the percentage
double tot = displayToDoubleK(repTotK);
double c = displayToDoubleK(cSize);
if( tot!=-1 & c!=-1){
double p = (c*100)/tot;
p = int(p*10)/10.0; //round to 1 decimel places
percent = QString::number(p) + "%";
}
}
if(repTotK.isEmpty()){ repTotK = "??"; }
//Format the info string
QString status = cSize+"/"+repTotK;
if(!percent.isEmpty()){ status.append(" ("+percent+")"); }
QString txt = QString(tr("Replicating %1: %2")).arg(dataset, status);
lastSize = cSize; //save the current size for later
//Now set the current process status
HASH.insert(120,"RUNNING");
HASH.insert(121,dataset);
HASH.insert(122,txt);
HASH.insert(123,txt);
emit sendLPEvent("replication", 0, txt);
}
}
}
void EventWatcher::sendLPEvent(QString system, int priority, QString msg){
QJsonObject obj;
obj.insert("message",msg);
obj.insert("priority", DisplayPriority(priority) );
obj.insert("class" , system);
emit NewEvent(LIFEPRESERVER, obj);
HASH.insert(LIFEPRESERVER, obj);
//qDebug() << "New LP Event Object:" << obj;
if(!starting){ emit NewEvent(LIFEPRESERVER, obj); }
}
// === General Purpose Functions
@@ -270,12 +106,12 @@ void EventWatcher::WatcherUpdate(const QString &path){
}else if(path==tmpLPRepFile){
//Life Preserver Replication Log (currently-running replication)
ReadLPRepFile();
}else{
//This file should no longer be watched (old replication file?)
if(watcher->files().contains(path) || watcher->directories().contains(path)){
watcher->removePath(path);
}
}
//Make sure this file/dir is not removed from the watcher
/*if(!watcher->files().contains(path) && !watcher->directories().contains(path)){
watcher->addPath(path); //re-add it to the watcher. This happens when the file is removed/re-created instead of just overwritten
}*/
CheckLogFiles(); //check for any other missing files
}
@@ -286,5 +122,161 @@ void EventWatcher::CheckLogFiles(){
if(!watched.contains(LPERRLOG) && QFile::exists(LPERRLOG)){ watcher->addPath(LPERRLOG); }
if(!watched.contains(tmpLPRepFile) && QFile::exists(tmpLPRepFile)){ watcher->addPath(tmpLPRepFile); }
if(!watched.contains(DISPATCHWORKING) && QFile::exists(LPLOG)){ watcher->addPath(DISPATCHWORKING); }
qDebug() << "watched:" << watcher->files() << watcher->directories();
//qDebug() << "watched:" << watcher->files() << watcher->directories();
}
// == Life Preserver Event Functions
void EventWatcher::ReadLPLogFile(){
//Open/Read any new info in the file
QFile LPlogfile(LPLOG);
if( !LPlogfile.open(QIODevice::ReadOnly) ){ return; } //could not open file
QTextStream STREAM(&LPlogfile);
if(LPlog_pos>0){ STREAM.seek(LPlog_pos); }
QStringList info = STREAM.readAll().split("\n");
LPlog_pos = STREAM.pos();
LPlogfile.close();
//Now parse the new info line-by-line
for(int i=0; i<info.length(); i++){
if(info[i].isEmpty()){ continue; }
QString log = info[i];
if(!starting){ qDebug() << "Read LP Log File Line:" << log; }
//Divide up the log into it's sections
QString timestamp = log.section(":",0,2).simplified();
QString time = timestamp.section(" ",3,3).simplified();
QString message = log.section(":",3,3).toLower().simplified();
QString dev = log.section(":",4,4).simplified(); //dataset/snapshot/nothing
//Now decide what to do/show because of the log message
if(message.contains("creating snapshot", Qt::CaseInsensitive)){
dev = message.section(" ",-1).simplified();
QString msg = QString(tr("New snapshot of %1")).arg(dev);
//Setup the status of the message
HASH.insert(110,"SNAPCREATED");
HASH.insert(111,dev); //dataset
HASH.insert(112, msg ); //summary
HASH.insert(113, QString(tr("Creating snapshot for %1")).arg(dev) );
HASH.insert(114, timestamp); //full timestamp
HASH.insert(115, time); // time only
sendLPEvent("snapshot", 1, timestamp+": "+msg);
}else if(message.contains("Starting replication", Qt::CaseInsensitive)){
//Setup the file watcher for this new log file
//qDebug() << " - Found Rep Start:" << dev << message;
tmpLPRepFile = dev;
LPrep_pos = 0; //reset file position
dev = message.section(" on ",1,1,QString::SectionSkipEmpty);
//qDebug() << " - New Dev:" << dev << "Valid Pools:" << reppools;
//Make sure the device is currently setup for replication
//if( !reppools.contains(dev) ){ FILE_REPLICATION.clear(); continue; }
QString msg = QString(tr("Starting replication for %1")).arg(dev);
//Set the appropriate status variables
HASH.insert(120,"STARTED");
HASH.insert(121, dev); //zpool
HASH.insert(122, tr("Replication Starting") ); //summary
HASH.insert(123, msg ); //Full message
HASH.insert(124, timestamp); //full timestamp
HASH.insert(125, time); // time only
HASH.insert(126,tr("Replication Log")+" <"+tmpLPRepFile+">"); //log file
sendLPEvent("replication", 1, timestamp+": "+msg);
}else if(message.contains("finished replication task", Qt::CaseInsensitive)){
//Done with this replication - close down the rep file watcher
tmpLPRepFile.clear();
LPrep_pos = 0; //reset file position
dev = message.section(" -> ",0,0).section(" ",-1).simplified();
//Make sure the device is currently setup for replication
//if( reppools.contains(dev) ){
QString msg = QString(tr("Finished replication for %1")).arg(dev);
//Now set the status of the process
HASH.insert(120,"FINISHED");
HASH.insert(121,dev); //dataset
HASH.insert(122, tr("Finished Replication") ); //summary
HASH.insert(123, msg );
HASH.insert(124, timestamp); //full timestamp
HASH.insert(125, time); // time only
HASH.insert(126, ""); //clear the log file entry
sendLPEvent("replication", 1, timestamp+": "+msg);
}else if( message.contains("FAILED replication", Qt::CaseInsensitive) ){
tmpLPRepFile.clear();
LPrep_pos = 0; //reset file position
//Now set the status of the process
dev = message.section(" -> ",0,0).section(" ",-1).simplified();
//Make sure the device is currently setup for replication
//Update the HASH
QString file = log.section("LOGFILE:",1,1).simplified();
QString tt = QString(tr("Replication Failed for %1")).arg(dev) +"\n"+ QString(tr("Logfile available at: %1")).arg(file);
HASH.insert(120,"ERROR");
HASH.insert(121,dev); //dataset
HASH.insert(122, tr("Replication Failed") ); //summary
HASH.insert(123, tt );
HASH.insert(124, timestamp); //full timestamp
HASH.insert(125, time); // time only
HASH.insert(126, tr("Replication Error Log")+" <"+file+">" );
sendLPEvent("replication", 7, timestamp+": "+tt);
}
}
}
void EventWatcher::ReadLPErrFile(){
}
void EventWatcher::ReadLPRepFile(){
static QString stat = "";
static QString repTotK = "";
static QString lastSize = "";
//Open/Read any new info in the file
QFile LPlogfile(LPLOG);
if( !LPlogfile.open(QIODevice::ReadOnly) ){ return; } //could not open file
QTextStream STREAM(&LPlogfile);
if(LPrep_pos<=0 || !STREAM.seek(LPrep_pos) ){
//New file location
stat.clear();
repTotK.clear();
lastSize.clear();
}
QStringList info = STREAM.readAll().split("\n");
LPrep_pos = STREAM.pos();
LPlogfile.close();
//Now parse the new info line-by-line
for(int i=0; i<info.length(); i++){
QString line = info[i];
if(line.contains("estimated size is")){ repTotK = line.section("size is ",1,1,QString::SectionSkipEmpty).simplified(); } //save the total size to replicate
else if(line.startsWith("send from ")){}
else if(line.startsWith("TIME ")){}
else if(line.startsWith("warning: ")){} //start of an error
else{ stat = line; } //only save the relevant/latest status line
}
if(!stat.isEmpty()){
//qDebug() << "New Status Message:" << stat;
//Divide up the status message into sections
stat.replace("\t"," ");
QString dataset = stat.section(" ",2,2,QString::SectionSkipEmpty).section("/",0,0).simplified();
QString cSize = stat.section(" ",1,1,QString::SectionSkipEmpty);
//Now Setup the tooltip
if(cSize != lastSize){ //don't update the info if the same size info
QString percent;
if(!repTotK.isEmpty() && repTotK!="??"){
//calculate the percentage
double tot = displayToDoubleK(repTotK);
double c = displayToDoubleK(cSize);
if( tot!=-1 & c!=-1){
double p = (c*100)/tot;
p = int(p*10)/10.0; //round to 1 decimel places
percent = QString::number(p) + "%";
}
}
if(repTotK.isEmpty()){ repTotK = "??"; }
//Format the info string
QString status = cSize+"/"+repTotK;
if(!percent.isEmpty()){ status.append(" ("+percent+")"); }
QString txt = QString(tr("Replicating %1: %2")).arg(dataset, status);
lastSize = cSize; //save the current size for later
//Now set the current process status
HASH.insert(120,"RUNNING");
HASH.insert(121,dataset);
HASH.insert(122,txt);
HASH.insert(123,txt);
emit sendLPEvent("replication", 0, txt);
}
}
}

View File

@@ -21,8 +21,6 @@ public:
EventWatcher();
~EventWatcher();
void start();
//Convert a string into the type flag
static EVENT_TYPE typeFromString(QString);
@@ -40,11 +38,8 @@ private:
//Life Preserver Event variables/functions
QString tmpLPRepFile;
QFile *LPlogfile, *LPrepfile, *LPerrfile;
qint64 LPlog_pos, LPrep_pos, LPerr_pos; //file position markers
void ReadLPLogFile();
void ReadLPErrFile();
void ReadLPRepFile();
void sendLPEvent(QString system, int priority, QString msg);
//General purpose functions
@@ -52,12 +47,17 @@ private:
double displayToDoubleK(QString);
public slots:
void start();
private slots:
//File watcher signals
void WatcherUpdate(const QString&);
void CheckLogFiles(); //catch/load any new log files into the watcher
//LP File changed signals/slots
void ReadLPLogFile();
void ReadLPErrFile();
void ReadLPRepFile();
signals:
void NewEvent(EventWatcher::EVENT_TYPE, QJsonValue); //type/message
};

View File

@@ -36,7 +36,6 @@ bool WebServer::startServer(quint16 port, bool websocket){
qDebug() << "Server Started:" << QDateTime::currentDateTime().toString(Qt::ISODate);
qDebug() << " Port:" << port;
if(WSServer!=0){ qDebug() << " URL:" << WSServer->serverUrl().toString(); }
EVENTS->start();
}else{
qCritical() << "Could not start server - exiting...";
}

View File

@@ -354,7 +354,7 @@ void WebSocket::SslError(const QList<QSslError> &err){ //sslErrors() signal
// PUBLIC SLOTS
// ======================
void WebSocket::EventUpdate(EventWatcher::EVENT_TYPE evtype, QJsonValue msg){
if(msg.isUndefined()){ msg = EVENTS->lastEvent(evtype); }
if(msg.isNull()){ msg = EVENTS->lastEvent(evtype); }
//qDebug() << "Socket Status Update:" << msg;
if(!ForwardEvents.contains(evtype)){ return; }
RestOutputStruct out;
@@ -367,7 +367,7 @@ void WebSocket::EventUpdate(EventWatcher::EVENT_TYPE evtype, QJsonValue msg){
}else if(evtype==EventWatcher::LIFEPRESERVER){
out.in_struct.name = "life-preserver";
}
//Now send the message back through the socket
this->sendReply(out.assembleMessage());
}

View File

@@ -32,6 +32,7 @@
#include <QTcpServer>
#include <QSslSocket>
#include <QThread>
#include <QFileSystemWatcher>
#include <QList>

View File

@@ -82,9 +82,14 @@ int main( int argc, char ** argv )
//Start the daemon
int ret = 1; //error return value
if( w->startServer(port, websocket) ){
//Now start the event loop
QThread TBACK;
EVENTS->moveToThread(&TBACK);
TBACK.start();
QTimer::singleShot(0,EVENTS, SLOT(start()) );
//Now start the main event loop
ret = a.exec();
qDebug() << "Server Stopped:" << QDateTime::currentDateTime().toString(Qt::ISODate);
//TBACK.stop();
}else{
qDebug() << "[FATAL] Server could not be started:" << QDateTime::currentDateTime().toString(Qt::ISODate);
qDebug() << " - Tried port:" << port;