Adding kafka logging in framework.

This commit is contained in:
stephb9959
2022-01-23 10:25:28 -08:00
parent 50050fe16d
commit 3e72a41686

View File

@@ -254,6 +254,7 @@ namespace OpenWifi::RESTAPI_utils {
OrgObject.set("index",MonthlyArray);
A.add(OrgObject);
}
Obj.set(Field, A);
}
template<typename T> void field_to_json(Poco::JSON::Object &Obj,
@@ -2327,57 +2328,167 @@ namespace OpenWifi {
Poco::JSON::Object Body_;
};
class KafkaMessage: public Poco::Notification {
public:
KafkaMessage( const std::string &Topic, const std::string &Key, const std::string & Payload) :
Topic_(Topic), Key_(Key), Payload_(Payload)
{
}
inline const std::string & Topic() { return Topic_; }
inline const std::string & Key() { return Key_; }
inline const std::string & Payload() { return Payload_; }
private:
std::string Topic_;
std::string Key_;
std::string Payload_;
};
class KafkaProducer : public Poco::Runnable {
public:
inline void run();
void Start() {
if(!Running_) {
Running_=true;
Worker_.start(*this);
}
}
void Stop() {
if(Running_) {
Running_=false;
Worker_.wakeUp();
Worker_.join();
}
}
inline void run () override;
inline void Start() {
if(!Running_) {
Running_=true;
Worker_.start(*this);
}
}
inline void Stop() {
if(Running_) {
Running_=false;
Queue_.wakeUpAll();
Worker_.join();
}
}
inline void Produce(const std::string &Topic, const std::string &Key, const std::string &Payload) {
std::lock_guard G(Mutex_);
Queue_.enqueueNotification( new KafkaMessage(Topic,Key,Payload));
}
private:
std::mutex Mutex_;
Poco::Thread Worker_;
std::atomic_bool Running_=false;
std::mutex Mutex_;
Poco::Thread Worker_;
std::atomic_bool Running_=false;
Poco::NotificationQueue Queue_;
};
class KafkaConsumer : public Poco::Runnable {
public:
inline void run();
void Start() {
inline void run() override;
void Start() {
if(!Running_) {
Running_=true;
Worker_.start(*this);
}
}
void Stop() {
void Stop() {
if(Running_) {
Running_=false;
Worker_.wakeUp();
Worker_.join();
}
}
private:
std::mutex Mutex_;
Poco::Thread Worker_;
std::atomic_bool Running_=false;
private:
std::mutex Mutex_;
Poco::Thread Worker_;
std::atomic_bool Running_=false;
};
class KafkaDispatcher : public Poco::Runnable {
public:
inline void Start() {
if(!Running_) {
Running_=true;
Worker_.start(*this);
}
}
inline void Stop() {
if(Running_) {
Running_=false;
Queue_.wakeUpAll();
Worker_.join();
}
}
inline auto RegisterTopicWatcher(const std::string &Topic, Types::TopicNotifyFunction &F) {
std::lock_guard G(Mutex_);
auto It = Notifiers_.find(Topic);
if(It == Notifiers_.end()) {
Types::TopicNotifyFunctionList L;
L.emplace(L.end(),std::make_pair(F,FunctionId_));
Notifiers_[Topic] = std::move(L);
} else {
It->second.emplace(It->second.end(),std::make_pair(F,FunctionId_));
}
return FunctionId_++;
}
inline void UnregisterTopicWatcher(const std::string &Topic, int Id) {
std::lock_guard G(Mutex_);
auto It = Notifiers_.find(Topic);
if(It != Notifiers_.end()) {
Types::TopicNotifyFunctionList & L = It->second;
for(auto it=L.begin(); it!=L.end(); it++)
if(it->second == Id) {
L.erase(it);
break;
}
}
}
void Dispatch(const std::string &Topic, const std::string &Key, const std::string &Payload) {
std::lock_guard G(Mutex_);
auto It = Notifiers_.find(Topic);
if(It!=Notifiers_.end()) {
Queue_.enqueueNotification(new KafkaMessage(Topic, Key, Payload));
}
}
inline void run() override {
Poco::AutoPtr<Poco::Notification> Note(Queue_.waitDequeueNotification());
while(Note && Running_) {
auto Msg = dynamic_cast<KafkaMessage*>(Note.get());
if(Msg!= nullptr) {
auto It = Notifiers_.find(Msg->Topic());
if (It != Notifiers_.end()) {
const auto & FL = It->second;
for(const auto &[CallbackFunc,_]:FL) {
CallbackFunc(Msg->Key(), Msg->Payload());
}
}
}
Note = Queue_.waitDequeueNotification();
}
}
inline void Topics(std::vector<std::string> &T) {
T.clear();
for(const auto &[TopicName,_]:Notifiers_)
T.push_back(TopicName);
}
private:
std::mutex Mutex_;
Types::NotifyTable Notifiers_;
Poco::Thread Worker_;
std::atomic_bool Running_=false;
uint64_t FunctionId_=1;
Poco::NotificationQueue Queue_;
};
class KafkaManager : public SubSystemServer {
public:
struct KMessage {
std::string Topic,
Key,
PayLoad;
};
friend class KafkaConsumer;
friend class KafkaProducer;
@@ -2394,11 +2505,13 @@ namespace OpenWifi {
return 0;
ConsumerThr_.Start();
ProducerThr_.Start();
Dispatcher_.Start();
return 0;
}
inline void Stop() override {
if(KafkaEnabled_) {
Dispatcher_.Stop();
ProducerThr_.Stop();
ConsumerThr_.Stop();
return;
@@ -2407,63 +2520,44 @@ namespace OpenWifi {
inline void PostMessage(const std::string &topic, const std::string & key, const std::string &PayLoad, bool WrapMessage = true ) {
if(KafkaEnabled_) {
std::lock_guard G(Mutex_);
KMessage M{
.Topic = topic,
.Key = key,
.PayLoad = WrapMessage ? WrapSystemId(PayLoad) : PayLoad };
Queue_.push(M);
ProducerThr_.Produce(topic,key,WrapMessage ? WrapSystemId(PayLoad) : PayLoad);
}
}
inline void Dispatch(const std::string &Topic, const std::string & Key, const std::string &Payload) {
Dispatcher_.Dispatch(Topic, Key, Payload);
}
[[nodiscard]] inline std::string WrapSystemId(const std::string & PayLoad) {
return std::move( SystemInfoWrapper_ + PayLoad + "}");
}
[[nodiscard]] inline bool Enabled() const { return KafkaEnabled_; }
inline int RegisterTopicWatcher(const std::string &Topic, Types::TopicNotifyFunction &F) {
inline uint64_t RegisterTopicWatcher(const std::string &Topic, Types::TopicNotifyFunction &F) {
if(KafkaEnabled_) {
std::lock_guard G(Mutex_);
auto It = Notifiers_.find(Topic);
if(It == Notifiers_.end()) {
Types::TopicNotifyFunctionList L;
L.emplace(L.end(),std::make_pair(F,FunctionId_));
Notifiers_[Topic] = std::move(L);
} else {
It->second.emplace(It->second.end(),std::make_pair(F,FunctionId_));
}
return FunctionId_++;
return Dispatcher_.RegisterTopicWatcher(Topic,F);
} else {
return 0;
}
}
inline void UnregisterTopicWatcher(const std::string &Topic, int Id) {
inline void UnregisterTopicWatcher(const std::string &Topic, uint64_t Id) {
if(KafkaEnabled_) {
std::lock_guard G(Mutex_);
auto It = Notifiers_.find(Topic);
if(It != Notifiers_.end()) {
Types::TopicNotifyFunctionList & L = It->second;
for(auto it=L.begin(); it!=L.end(); it++)
if(it->second == Id) {
L.erase(it);
break;
}
}
Dispatcher_.UnregisterTopicWatcher(Topic, Id);
}
}
// void WakeUp();
inline void Topics(std::vector<std::string> &T) {
Dispatcher_.Topics(T);
}
private:
bool KafkaEnabled_ = false;
std::queue<KMessage> Queue_;
std::string SystemInfoWrapper_;
int FunctionId_=1;
Types::NotifyTable Notifiers_;
KafkaProducer ProducerThr_;
KafkaConsumer ConsumerThr_;
KafkaDispatcher Dispatcher_;
inline void PartitionAssignment(const cppkafka::TopicPartitionList& partitions) {
Logger().information(Poco::format("Partition assigned: %Lu...",(uint64_t )partitions.front().get_partition()));
@@ -3665,30 +3759,19 @@ namespace OpenWifi {
std::to_string(MicroService::instance().ID()) +
R"lit( , "host" : ")lit" + MicroService::instance().PrivateEndPoint() +
R"lit(" } , "payload" : )lit" ;
cppkafka::Producer Producer(Config);
cppkafka::Producer Producer(Config);
Running_ = true;
while(Running_) {
std::this_thread::sleep_for(std::chrono::milliseconds(200));
try
{
std::lock_guard G(Mutex_);
auto Num=0;
while (!KafkaManager()->Queue_.empty()) {
const auto M = KafkaManager()->Queue_.front();
Producer.produce(
cppkafka::MessageBuilder(M.Topic).key(M.Key).payload(M.PayLoad));
KafkaManager()->Queue_.pop();
Num++;
}
if(Num)
Producer.flush();
} catch (const cppkafka::HandleException &E ) {
KafkaManager()->Logger().warning(Poco::format("Caught a Kafka exception (producer): %s",std::string{E.what()}));
} catch (const Poco::Exception &E) {
KafkaManager()->Logger().log(E);
}
}
Producer.flush();
Poco::AutoPtr<Poco::Notification> Note(Queue_.waitDequeueNotification());
while(Note && Running_) {
KafkaMessage * Msg = dynamic_cast<KafkaMessage*>(Note.get());
if(Msg!= nullptr) {
Producer.produce(
cppkafka::MessageBuilder(Msg->Topic()).key(Msg->Key()).payload(Msg->Payload()));
}
Note = Queue_.waitDequeueNotification();
}
}
inline void KafkaConsumer::run() {
@@ -3729,15 +3812,13 @@ namespace OpenWifi {
auto BatchSize = MicroService::instance().ConfigGetInt("openwifi.kafka.consumer.batchsize",20);
Types::StringVec Topics;
for(const auto &i:KafkaManager()->Notifiers_)
Topics.push_back(i.first);
KafkaManager()->Topics(Topics);
Consumer.subscribe(Topics);
Running_ = true;
while(Running_) {
try {
std::vector<cppkafka::Message> MsgVec = Consumer.poll_batch(BatchSize, std::chrono::milliseconds(200));
std::vector<cppkafka::Message> MsgVec = Consumer.poll_batch(BatchSize, std::chrono::milliseconds(100));
for(auto const &Msg:MsgVec) {
if (!Msg)
continue;
@@ -3748,17 +3829,7 @@ namespace OpenWifi {
Consumer.async_commit(Msg);
continue;
}
std::lock_guard G(Mutex_);
auto It = KafkaManager()->Notifiers_.find(Msg.get_topic());
if (It != KafkaManager()->Notifiers_.end()) {
Types::TopicNotifyFunctionList &FL = It->second;
std::string Key{Msg.get_key()};
std::string Payload{Msg.get_payload()};
for (auto &F : FL) {
std::thread T(F.first, Key, Payload);
T.detach();
}
}
KafkaManager()->Dispatch(Msg.get_topic(), Msg.get_key(),Msg.get_payload() );
if (!AutoCommit)
Consumer.async_commit(Msg);
}