Framework update

This commit is contained in:
stephb9959
2022-05-05 21:31:12 -07:00
parent 973823004a
commit 84cf98d6a8

View File

@@ -3313,7 +3313,7 @@ namespace OpenWifi {
Object->has(KafkaTopics::ServiceEvents::Fields::KEY)) { Object->has(KafkaTopics::ServiceEvents::Fields::KEY)) {
auto PrivateEndPoint = Object->get(KafkaTopics::ServiceEvents::Fields::PRIVATE).toString(); auto PrivateEndPoint = Object->get(KafkaTopics::ServiceEvents::Fields::PRIVATE).toString();
if (Event == KafkaTopics::ServiceEvents::EVENT_KEEP_ALIVE && Services_.find(PrivateEndPoint) != Services_.end()) { if (Event == KafkaTopics::ServiceEvents::EVENT_KEEP_ALIVE && Services_.find(PrivateEndPoint) != Services_.end()) {
Services_[PrivateEndPoint].LastUpdate = std::time(nullptr); Services_[PrivateEndPoint].LastUpdate = OpenWifi::Now();
} else if (Event == KafkaTopics::ServiceEvents::EVENT_LEAVE) { } else if (Event == KafkaTopics::ServiceEvents::EVENT_LEAVE) {
Services_.erase(PrivateEndPoint); Services_.erase(PrivateEndPoint);
poco_debug(logger(),fmt::format("Service {} ID={} leaving system.",Object->get(KafkaTopics::ServiceEvents::Fields::PRIVATE).toString(),ID)); poco_debug(logger(),fmt::format("Service {} ID={} leaving system.",Object->get(KafkaTopics::ServiceEvents::Fields::PRIVATE).toString(),ID));
@@ -3326,10 +3326,16 @@ namespace OpenWifi {
.PublicEndPoint = Object->get(KafkaTopics::ServiceEvents::Fields::PUBLIC).toString(), .PublicEndPoint = Object->get(KafkaTopics::ServiceEvents::Fields::PUBLIC).toString(),
.AccessKey = Object->get(KafkaTopics::ServiceEvents::Fields::KEY).toString(), .AccessKey = Object->get(KafkaTopics::ServiceEvents::Fields::KEY).toString(),
.Version = Object->get(KafkaTopics::ServiceEvents::Fields::VRSN).toString(), .Version = Object->get(KafkaTopics::ServiceEvents::Fields::VRSN).toString(),
.LastUpdate = (uint64_t)std::time(nullptr)}; .LastUpdate = OpenWifi::Now() };
for (const auto &[PrvEndPoint, Svc] : Services_) {
logger().debug(fmt::format("ID: {} Type: {} EndPoint: {}",Svc.Id,Svc.Type,PrvEndPoint)); std::string SvcList;
for (const auto &Svc: Services_) {
if(SvcList.empty())
SvcList = Svc.second.Type;
else
SvcList += ", " + Svc.second.Type;
} }
logger().information(fmt::format("Current list of microservices: {}", SvcList));
} }
} else { } else {
poco_error(logger(),fmt::format("KAFKA-MSG: invalid event '{}', missing a field.",Event)); poco_error(logger(),fmt::format("KAFKA-MSG: invalid event '{}', missing a field.",Event));
@@ -4702,7 +4708,7 @@ namespace OpenWifi {
private: private:
}; };
class MyParallelSocketReactor { /* class MyParallelSocketReactor {
public: public:
explicit MyParallelSocketReactor(uint32_t NumReactors = 8); explicit MyParallelSocketReactor(uint32_t NumReactors = 8);
~MyParallelSocketReactor(); ~MyParallelSocketReactor();
@@ -4712,6 +4718,7 @@ namespace OpenWifi {
Poco::Net::SocketReactor *Reactors_; Poco::Net::SocketReactor *Reactors_;
Poco::ThreadPool ReactorPool_; Poco::ThreadPool ReactorPool_;
}; };
*/
class WebSocketClient; class WebSocketClient;
@@ -4725,7 +4732,8 @@ namespace OpenWifi {
int Start() override; int Start() override;
void Stop() override; void Stop() override;
void run() override; void run() override;
MyParallelSocketReactor &ReactorPool(); // MyParallelSocketReactor &ReactorPool();
Poco::Net::SocketReactor & Reactor() { return Reactor_; }
void NewClient(Poco::Net::WebSocket &WS, const std::string &Id); void NewClient(Poco::Net::WebSocket &WS, const std::string &Id);
bool Register(WebSocketClient *Client, const std::string &Id); bool Register(WebSocketClient *Client, const std::string &Id);
void SetProcessor(WebSocketClientProcessor *F); void SetProcessor(WebSocketClientProcessor *F);
@@ -4763,7 +4771,9 @@ namespace OpenWifi {
private: private:
std::atomic_bool Running_ = false; std::atomic_bool Running_ = false;
Poco::Thread Thr_; Poco::Thread Thr_;
std::unique_ptr<MyParallelSocketReactor> ReactorPool_; // std::unique_ptr<MyParallelSocketReactor> ReactorPool_;
Poco::Net::SocketReactor Reactor_;
Poco::Thread ReactorThread_;
bool GeoCodeEnabled_ = false; bool GeoCodeEnabled_ = false;
std::string GoogleApiKey_; std::string GoogleApiKey_;
std::map<std::string, std::pair<WebSocketClient *, std::string>> Clients_; std::map<std::string, std::pair<WebSocketClient *, std::string>> Clients_;
@@ -4794,7 +4804,7 @@ namespace OpenWifi {
void OnSocketError(const Poco::AutoPtr<Poco::Net::ErrorNotification> &pNf); void OnSocketError(const Poco::AutoPtr<Poco::Net::ErrorNotification> &pNf);
}; };
inline MyParallelSocketReactor::MyParallelSocketReactor(uint32_t NumReactors) : /* inline MyParallelSocketReactor::MyParallelSocketReactor(uint32_t NumReactors) :
NumReactors_(NumReactors) NumReactors_(NumReactors)
{ {
Reactors_ = new Poco::Net::SocketReactor[NumReactors_]; Reactors_ = new Poco::Net::SocketReactor[NumReactors_];
@@ -4816,8 +4826,8 @@ namespace OpenWifi {
return Reactors_[ rand() % NumReactors_ ]; return Reactors_[ rand() % NumReactors_ ];
} }
inline MyParallelSocketReactor & WebSocketClientServer::ReactorPool() { return *ReactorPool_; } // inline MyParallelSocketReactor & WebSocketClientServer::ReactorPool() { return *ReactorPool_; }
*/
inline void WebSocketClientServer::NewClient(Poco::Net::WebSocket & WS, const std::string &Id) { inline void WebSocketClientServer::NewClient(Poco::Net::WebSocket & WS, const std::string &Id) {
std::lock_guard G(Mutex_); std::lock_guard G(Mutex_);
auto Client = new WebSocketClient(WS,Id,Logger(), Processor_); auto Client = new WebSocketClient(WS,Id,Logger(), Processor_);
@@ -4867,13 +4877,16 @@ namespace OpenWifi {
inline int WebSocketClientServer::Start() { inline int WebSocketClientServer::Start() {
GoogleApiKey_ = MicroService::instance().ConfigGetString("google.apikey",""); GoogleApiKey_ = MicroService::instance().ConfigGetString("google.apikey","");
GeoCodeEnabled_ = !GoogleApiKey_.empty(); GeoCodeEnabled_ = !GoogleApiKey_.empty();
ReactorPool_ = std::make_unique<MyParallelSocketReactor>(); // ReactorPool_ = std::make_unique<MyParallelSocketReactor>();
ReactorThread_.start(Reactor_);
Thr_.start(*this); Thr_.start(*this);
return 0; return 0;
}; };
inline void WebSocketClientServer::Stop() { inline void WebSocketClientServer::Stop() {
if(Running_) { if(Running_) {
Reactor_.stop();
ReactorThread_.join();
Running_ = false; Running_ = false;
Thr_.wakeUp(); Thr_.wakeUp();
Thr_.join(); Thr_.join();
@@ -4999,7 +5012,7 @@ namespace OpenWifi {
inline WebSocketClient::WebSocketClient( Poco::Net::WebSocket & WS , const std::string &Id, Poco::Logger & L, WebSocketClientProcessor * Processor) : inline WebSocketClient::WebSocketClient( Poco::Net::WebSocket & WS , const std::string &Id, Poco::Logger & L, WebSocketClientProcessor * Processor) :
Reactor_(WebSocketClientServer()->ReactorPool().Reactor()), Reactor_(WebSocketClientServer()->Reactor()),
Id_(Id), Id_(Id),
Logger_(L), Logger_(L),
Processor_(Processor) { Processor_(Processor) {