From d706dba60a98facf9d99e2de2ee001e4c52faa5f Mon Sep 17 00:00:00 2001 From: stephb9959 Date: Sun, 30 Oct 2022 10:03:06 -0700 Subject: [PATCH] https://telecominfraproject.atlassian.net/browse/WIFI-11303 Signed-off-by: stephb9959 --- src/framework/KafkaManager.cpp | 54 +++++++++++++++------------------- src/framework/KafkaManager.h | 51 ++++++++++---------------------- 2 files changed, 39 insertions(+), 66 deletions(-) diff --git a/src/framework/KafkaManager.cpp b/src/framework/KafkaManager.cpp index 55fa042..7f1d8d9 100644 --- a/src/framework/KafkaManager.cpp +++ b/src/framework/KafkaManager.cpp @@ -69,6 +69,7 @@ namespace OpenWifi { inline void KafkaProducer::run() { + Poco::Logger &Logger_ = Poco::Logger::create("KAFKA-PRODUCER", KafkaManager()->Logger().getChannel()); Utils::SetThreadName("Kafka:Prod"); cppkafka::Configuration Config({ { "client.id", MicroServiceConfigGetString("openwifi.kafka.client.id", "") }, @@ -110,6 +111,10 @@ namespace OpenWifi { inline void KafkaConsumer::run() { Utils::SetThreadName("Kafka:Cons"); + Poco::Logger &Logger_ = Poco::Logger::create("KAFKA-CONSUMER", KafkaManager()->Logger().getChannel()); + + poco_information(Logger_,"Starting..."); + cppkafka::Configuration Config({ { "client.id", MicroServiceConfigGetString("openwifi.kafka.client.id","") }, { "metadata.broker.list", MicroServiceConfigGetString("openwifi.kafka.brokerlist","") }, @@ -134,13 +139,13 @@ namespace OpenWifi { cppkafka::Consumer Consumer(Config); Consumer.set_assignment_callback([&](cppkafka::TopicPartitionList& partitions) { if(!partitions.empty()) { - Logger_.information(fmt::format("Partition assigned: {}...", + poco_information(Logger_,fmt::format("Partition assigned: {}...", partitions.front().get_partition())); } }); Consumer.set_revocation_callback([&](const cppkafka::TopicPartitionList& partitions) { if(!partitions.empty()) { - Logger_.information(fmt::format("Partition revocation: {}...", + poco_information(Logger_,fmt::format("Partition revocation: {}...", partitions.front().get_partition())); } }); @@ -180,10 +185,10 @@ namespace OpenWifi { } } Consumer.unsubscribe(); + poco_information(Logger_,"Stopped..."); } void KafkaProducer::Start() { - poco_information(Logger_,"Starting..."); if(!Running_) { Running_=true; Worker_.start(*this); @@ -192,11 +197,9 @@ namespace OpenWifi { void KafkaProducer::Stop() { if(Running_) { - poco_information(Logger_,"Stopping..."); Running_=false; Queue_.wakeUpAll(); Worker_.join(); - poco_information(Logger_,"Stopped..."); } } @@ -207,7 +210,6 @@ namespace OpenWifi { void KafkaConsumer::Start() { if(!Running_) { - poco_information(Logger_,"Starting..."); Running_=true; Worker_.start(*this); } @@ -215,17 +217,14 @@ namespace OpenWifi { void KafkaConsumer::Stop() { if(Running_) { - poco_information(Logger_,"Stopping..."); Running_=false; Worker_.wakeUp(); Worker_.join(); - poco_information(Logger_,"Stopped..."); } } void KafkaDispatcher::Start() { if(!Running_) { - poco_information(Logger_,"Starting..."); Running_=true; Worker_.start(*this); } @@ -233,11 +232,9 @@ namespace OpenWifi { void KafkaDispatcher::Stop() { if(Running_) { - poco_information(Logger_,"Stopping..."); Running_=false; Queue_.wakeUpAll(); Worker_.join(); - poco_information(Logger_,"Stopped..."); } } @@ -276,6 +273,8 @@ namespace OpenWifi { } void KafkaDispatcher::run() { + Poco::Logger &Logger_ = Poco::Logger::create("KAFKA-DISPATCHER", KafkaManager()->Logger().getChannel()); + poco_information(Logger_,"Starting..."); Poco::AutoPtr Note(Queue_.waitDequeueNotification()); Utils::SetThreadName("kafka:dispatch"); while(Note && Running_) { @@ -291,6 +290,7 @@ namespace OpenWifi { } Note = Queue_.waitDequeueNotification(); } + poco_information(Logger_,"Stopped..."); } void KafkaDispatcher::Topics(std::vector &T) { @@ -304,24 +304,18 @@ namespace OpenWifi { std::cout << __LINE__ << std::endl; if(!KafkaEnabled_) return 0; - std::cout << __LINE__ << std::endl; - poco_information(Logger(),"Starting..."); - std::cout << __LINE__ << std::endl; - ConsumerThr_->Start(); - std::cout << __LINE__ << std::endl; - ProducerThr_->Start(); - std::cout << __LINE__ << std::endl; - Dispatcher_->Start(); - std::cout << __LINE__ << std::endl; + ConsumerThr_.Start(); + ProducerThr_.Start(); + Dispatcher_.Start(); return 0; } void KafkaManager::Stop() { if(KafkaEnabled_) { poco_information(Logger(),"Stopping..."); - Dispatcher_->Stop(); - ProducerThr_->Stop(); - ConsumerThr_->Stop(); + Dispatcher_.Stop(); + ProducerThr_.Stop(); + ConsumerThr_.Stop(); poco_information(Logger(),"Stopped..."); return; } @@ -329,12 +323,12 @@ namespace OpenWifi { void KafkaManager::PostMessage(const std::string &topic, const std::string & key, const std::string &PayLoad, bool WrapMessage ) { if(KafkaEnabled_) { - ProducerThr_->Produce(topic,key,WrapMessage ? WrapSystemId(PayLoad) : PayLoad); + ProducerThr_.Produce(topic,key,WrapMessage ? WrapSystemId(PayLoad) : PayLoad); } } void KafkaManager::Dispatch(const std::string &Topic, const std::string & Key, const std::string &Payload) { - Dispatcher_->Dispatch(Topic, Key, Payload); + Dispatcher_.Dispatch(Topic, Key, Payload); } [[nodiscard]] std::string KafkaManager::WrapSystemId(const std::string & PayLoad) { @@ -343,7 +337,7 @@ namespace OpenWifi { uint64_t KafkaManager::RegisterTopicWatcher(const std::string &Topic, Types::TopicNotifyFunction &F) { if(KafkaEnabled_) { - return Dispatcher_->RegisterTopicWatcher(Topic,F); + return Dispatcher_.RegisterTopicWatcher(Topic,F); } else { return 0; } @@ -351,20 +345,20 @@ namespace OpenWifi { void KafkaManager::UnregisterTopicWatcher(const std::string &Topic, uint64_t Id) { if(KafkaEnabled_) { - Dispatcher_->UnregisterTopicWatcher(Topic, Id); + Dispatcher_.UnregisterTopicWatcher(Topic, Id); } } void KafkaManager::Topics(std::vector &T) { - Dispatcher_->Topics(T); + Dispatcher_.Topics(T); } void KafkaManager::PartitionAssignment(const cppkafka::TopicPartitionList& partitions) { - Logger().information(fmt::format("Partition assigned: {}...", partitions.front().get_partition())); + poco_information(Logger(),fmt::format("Partition assigned: {}...", partitions.front().get_partition())); } void KafkaManager::PartitionRevocation(const cppkafka::TopicPartitionList& partitions) { - Logger().information(fmt::format("Partition revocation: {}...",partitions.front().get_partition())); + poco_information(Logger(),fmt::format("Partition revocation: {}...",partitions.front().get_partition())); } } // namespace OpenWifi \ No newline at end of file diff --git a/src/framework/KafkaManager.h b/src/framework/KafkaManager.h index e2cced8..e96f25c 100644 --- a/src/framework/KafkaManager.h +++ b/src/framework/KafkaManager.h @@ -33,34 +33,26 @@ namespace OpenWifi { }; class KafkaProducer : public Poco::Runnable { - public: - KafkaProducer(Poco::Logger &L) : - Logger_(L) { - } - void run () override; - void Start(); - void Stop(); - void Produce(const std::string &Topic, const std::string &Key, const std::string &Payload); + public: + void run () override; + void Start(); + void Stop(); + void Produce(const std::string &Topic, const std::string &Key, const std::string &Payload); private: - Poco::Logger &Logger_; - std::recursive_mutex Mutex_; - Poco::Thread Worker_; - mutable std::atomic_bool Running_=false; - Poco::NotificationQueue Queue_; - }; + std::recursive_mutex Mutex_; + Poco::Thread Worker_; + mutable std::atomic_bool Running_=false; + Poco::NotificationQueue Queue_; + }; class KafkaConsumer : public Poco::Runnable { public: - KafkaConsumer(Poco::Logger &L) : - Logger_(L) { - } void run() override; void Start(); void Stop(); private: - Poco::Logger &Logger_; std::recursive_mutex Mutex_; Poco::Thread Worker_; mutable std::atomic_bool Running_=false; @@ -68,10 +60,6 @@ namespace OpenWifi { class KafkaDispatcher : public Poco::Runnable { public: - KafkaDispatcher(Poco::Logger &L) : - Logger_(L) { - } - void Start(); void Stop(); auto RegisterTopicWatcher(const std::string &Topic, Types::TopicNotifyFunction &F); @@ -81,7 +69,6 @@ namespace OpenWifi { void Topics(std::vector &T); private: - Poco::Logger &Logger_; std::recursive_mutex Mutex_; Types::NotifyTable Notifiers_; Poco::Thread Worker_; @@ -115,25 +102,17 @@ namespace OpenWifi { void Topics(std::vector &T); private: - bool KafkaEnabled_ = false; - std::string SystemInfoWrapper_; - std::unique_ptr ProducerThr_; - std::unique_ptr ConsumerThr_; - std::unique_ptr Dispatcher_; + bool KafkaEnabled_ = false; + std::string SystemInfoWrapper_; + KafkaProducer ProducerThr_; + KafkaConsumer ConsumerThr_; + KafkaDispatcher Dispatcher_; void PartitionAssignment(const cppkafka::TopicPartitionList& partitions); void PartitionRevocation(const cppkafka::TopicPartitionList& partitions); KafkaManager() noexcept: SubSystemServer("KafkaManager", "KAFKA-SVR", "openwifi.kafka") { - ConsumerThr_ = std::make_unique(Logger().create("KAFKA-CONSUMER",Logger().getChannel())); - std::cout << __LINE__ << std::endl; - std::cout << __LINE__ << std::endl; - ProducerThr_ = std::make_unique(Logger().create("KAFKA-PRODUCER",Logger().getChannel())); - std::cout << __LINE__ << std::endl; - std::cout << __LINE__ << std::endl; - Dispatcher_ = std::make_unique(Logger().create("KAFKA-DISPATCHER",Logger().getChannel())); - std::cout << __LINE__ << std::endl; } };