stephb9959
2022-10-30 10:03:06 -07:00
parent 8b23197359
commit d706dba60a
2 changed files with 39 additions and 66 deletions

View File

@@ -69,6 +69,7 @@ namespace OpenWifi {
inline void KafkaProducer::run() {
Poco::Logger &Logger_ = Poco::Logger::create("KAFKA-PRODUCER", KafkaManager()->Logger().getChannel());
Utils::SetThreadName("Kafka:Prod");
cppkafka::Configuration Config({
{ "client.id", MicroServiceConfigGetString("openwifi.kafka.client.id", "") },
@@ -110,6 +111,10 @@ namespace OpenWifi {
inline void KafkaConsumer::run() {
Utils::SetThreadName("Kafka:Cons");
Poco::Logger &Logger_ = Poco::Logger::create("KAFKA-CONSUMER", KafkaManager()->Logger().getChannel());
poco_information(Logger_,"Starting...");
cppkafka::Configuration Config({
{ "client.id", MicroServiceConfigGetString("openwifi.kafka.client.id","") },
{ "metadata.broker.list", MicroServiceConfigGetString("openwifi.kafka.brokerlist","") },
@@ -134,13 +139,13 @@ namespace OpenWifi {
cppkafka::Consumer Consumer(Config);
Consumer.set_assignment_callback([&](cppkafka::TopicPartitionList& partitions) {
if(!partitions.empty()) {
Logger_.information(fmt::format("Partition assigned: {}...",
poco_information(Logger_,fmt::format("Partition assigned: {}...",
partitions.front().get_partition()));
}
});
Consumer.set_revocation_callback([&](const cppkafka::TopicPartitionList& partitions) {
if(!partitions.empty()) {
Logger_.information(fmt::format("Partition revocation: {}...",
poco_information(Logger_,fmt::format("Partition revocation: {}...",
partitions.front().get_partition()));
}
});
@@ -180,10 +185,10 @@ namespace OpenWifi {
}
}
Consumer.unsubscribe();
poco_information(Logger_,"Stopped...");
}
void KafkaProducer::Start() {
poco_information(Logger_,"Starting...");
if(!Running_) {
Running_=true;
Worker_.start(*this);
@@ -192,11 +197,9 @@ namespace OpenWifi {
void KafkaProducer::Stop() {
if(Running_) {
poco_information(Logger_,"Stopping...");
Running_=false;
Queue_.wakeUpAll();
Worker_.join();
poco_information(Logger_,"Stopped...");
}
}
@@ -207,7 +210,6 @@ namespace OpenWifi {
void KafkaConsumer::Start() {
if(!Running_) {
poco_information(Logger_,"Starting...");
Running_=true;
Worker_.start(*this);
}
@@ -215,17 +217,14 @@ namespace OpenWifi {
void KafkaConsumer::Stop() {
if(Running_) {
poco_information(Logger_,"Stopping...");
Running_=false;
Worker_.wakeUp();
Worker_.join();
poco_information(Logger_,"Stopped...");
}
}
void KafkaDispatcher::Start() {
if(!Running_) {
poco_information(Logger_,"Starting...");
Running_=true;
Worker_.start(*this);
}
@@ -233,11 +232,9 @@ namespace OpenWifi {
void KafkaDispatcher::Stop() {
if(Running_) {
poco_information(Logger_,"Stopping...");
Running_=false;
Queue_.wakeUpAll();
Worker_.join();
poco_information(Logger_,"Stopped...");
}
}
@@ -276,6 +273,8 @@ namespace OpenWifi {
}
void KafkaDispatcher::run() {
Poco::Logger &Logger_ = Poco::Logger::create("KAFKA-DISPATCHER", KafkaManager()->Logger().getChannel());
poco_information(Logger_,"Starting...");
Poco::AutoPtr<Poco::Notification> Note(Queue_.waitDequeueNotification());
Utils::SetThreadName("kafka:dispatch");
while(Note && Running_) {
@@ -291,6 +290,7 @@ namespace OpenWifi {
}
Note = Queue_.waitDequeueNotification();
}
poco_information(Logger_,"Stopped...");
}
void KafkaDispatcher::Topics(std::vector<std::string> &T) {
@@ -304,24 +304,18 @@ namespace OpenWifi {
std::cout << __LINE__ << std::endl;
if(!KafkaEnabled_)
return 0;
std::cout << __LINE__ << std::endl;
poco_information(Logger(),"Starting...");
std::cout << __LINE__ << std::endl;
ConsumerThr_->Start();
std::cout << __LINE__ << std::endl;
ProducerThr_->Start();
std::cout << __LINE__ << std::endl;
Dispatcher_->Start();
std::cout << __LINE__ << std::endl;
ConsumerThr_.Start();
ProducerThr_.Start();
Dispatcher_.Start();
return 0;
}
void KafkaManager::Stop() {
if(KafkaEnabled_) {
poco_information(Logger(),"Stopping...");
Dispatcher_->Stop();
ProducerThr_->Stop();
ConsumerThr_->Stop();
Dispatcher_.Stop();
ProducerThr_.Stop();
ConsumerThr_.Stop();
poco_information(Logger(),"Stopped...");
return;
}
@@ -329,12 +323,12 @@ namespace OpenWifi {
void KafkaManager::PostMessage(const std::string &topic, const std::string & key, const std::string &PayLoad, bool WrapMessage ) {
if(KafkaEnabled_) {
ProducerThr_->Produce(topic,key,WrapMessage ? WrapSystemId(PayLoad) : PayLoad);
ProducerThr_.Produce(topic,key,WrapMessage ? WrapSystemId(PayLoad) : PayLoad);
}
}
void KafkaManager::Dispatch(const std::string &Topic, const std::string & Key, const std::string &Payload) {
Dispatcher_->Dispatch(Topic, Key, Payload);
Dispatcher_.Dispatch(Topic, Key, Payload);
}
[[nodiscard]] std::string KafkaManager::WrapSystemId(const std::string & PayLoad) {
@@ -343,7 +337,7 @@ namespace OpenWifi {
uint64_t KafkaManager::RegisterTopicWatcher(const std::string &Topic, Types::TopicNotifyFunction &F) {
if(KafkaEnabled_) {
return Dispatcher_->RegisterTopicWatcher(Topic,F);
return Dispatcher_.RegisterTopicWatcher(Topic,F);
} else {
return 0;
}
@@ -351,20 +345,20 @@ namespace OpenWifi {
void KafkaManager::UnregisterTopicWatcher(const std::string &Topic, uint64_t Id) {
if(KafkaEnabled_) {
Dispatcher_->UnregisterTopicWatcher(Topic, Id);
Dispatcher_.UnregisterTopicWatcher(Topic, Id);
}
}
void KafkaManager::Topics(std::vector<std::string> &T) {
Dispatcher_->Topics(T);
Dispatcher_.Topics(T);
}
void KafkaManager::PartitionAssignment(const cppkafka::TopicPartitionList& partitions) {
Logger().information(fmt::format("Partition assigned: {}...", partitions.front().get_partition()));
poco_information(Logger(),fmt::format("Partition assigned: {}...", partitions.front().get_partition()));
}
void KafkaManager::PartitionRevocation(const cppkafka::TopicPartitionList& partitions) {
Logger().information(fmt::format("Partition revocation: {}...",partitions.front().get_partition()));
poco_information(Logger(),fmt::format("Partition revocation: {}...",partitions.front().get_partition()));
}
} // namespace OpenWifi

View File

@@ -33,34 +33,26 @@ namespace OpenWifi {
};
class KafkaProducer : public Poco::Runnable {
public:
KafkaProducer(Poco::Logger &L) :
Logger_(L) {
}
void run () override;
void Start();
void Stop();
void Produce(const std::string &Topic, const std::string &Key, const std::string &Payload);
public:
void run () override;
void Start();
void Stop();
void Produce(const std::string &Topic, const std::string &Key, const std::string &Payload);
private:
Poco::Logger &Logger_;
std::recursive_mutex Mutex_;
Poco::Thread Worker_;
mutable std::atomic_bool Running_=false;
Poco::NotificationQueue Queue_;
};
std::recursive_mutex Mutex_;
Poco::Thread Worker_;
mutable std::atomic_bool Running_=false;
Poco::NotificationQueue Queue_;
};
class KafkaConsumer : public Poco::Runnable {
public:
KafkaConsumer(Poco::Logger &L) :
Logger_(L) {
}
void run() override;
void Start();
void Stop();
private:
Poco::Logger &Logger_;
std::recursive_mutex Mutex_;
Poco::Thread Worker_;
mutable std::atomic_bool Running_=false;
@@ -68,10 +60,6 @@ namespace OpenWifi {
class KafkaDispatcher : public Poco::Runnable {
public:
KafkaDispatcher(Poco::Logger &L) :
Logger_(L) {
}
void Start();
void Stop();
auto RegisterTopicWatcher(const std::string &Topic, Types::TopicNotifyFunction &F);
@@ -81,7 +69,6 @@ namespace OpenWifi {
void Topics(std::vector<std::string> &T);
private:
Poco::Logger &Logger_;
std::recursive_mutex Mutex_;
Types::NotifyTable Notifiers_;
Poco::Thread Worker_;
@@ -115,25 +102,17 @@ namespace OpenWifi {
void Topics(std::vector<std::string> &T);
private:
bool KafkaEnabled_ = false;
std::string SystemInfoWrapper_;
std::unique_ptr<KafkaProducer> ProducerThr_;
std::unique_ptr<KafkaConsumer> ConsumerThr_;
std::unique_ptr<KafkaDispatcher> Dispatcher_;
bool KafkaEnabled_ = false;
std::string SystemInfoWrapper_;
KafkaProducer ProducerThr_;
KafkaConsumer ConsumerThr_;
KafkaDispatcher Dispatcher_;
void PartitionAssignment(const cppkafka::TopicPartitionList& partitions);
void PartitionRevocation(const cppkafka::TopicPartitionList& partitions);
KafkaManager() noexcept:
SubSystemServer("KafkaManager", "KAFKA-SVR", "openwifi.kafka") {
ConsumerThr_ = std::make_unique<KafkaConsumer>(Logger().create("KAFKA-CONSUMER",Logger().getChannel()));
std::cout << __LINE__ << std::endl;
std::cout << __LINE__ << std::endl;
ProducerThr_ = std::make_unique<KafkaProducer>(Logger().create("KAFKA-PRODUCER",Logger().getChannel()));
std::cout << __LINE__ << std::endl;
std::cout << __LINE__ << std::endl;
Dispatcher_ = std::make_unique<KafkaDispatcher>(Logger().create("KAFKA-DISPATCHER",Logger().getChannel()));
std::cout << __LINE__ << std::endl;
}
};