mirror of
https://github.com/Telecominfraproject/wlan-cloud-ucentralfms.git
synced 2025-10-30 18:27:54 +00:00
379 lines
12 KiB
C++
379 lines
12 KiB
C++
//
|
|
// Created by stephane bourque on 2022-10-25.
|
|
//
|
|
|
|
#include "KafkaManager.h"
|
|
|
|
#include "fmt/format.h"
|
|
#include "framework/MicroServiceFuncs.h"
|
|
|
|
namespace OpenWifi {
|
|
|
|
void KafkaLoggerFun([[maybe_unused]] cppkafka::KafkaHandleBase &handle, int level,
|
|
const std::string &facility, const std::string &message) {
|
|
switch ((cppkafka::LogLevel)level) {
|
|
case cppkafka::LogLevel::LogNotice: {
|
|
poco_notice(KafkaManager()->Logger(),
|
|
fmt::format("kafka-log: facility: {} message: {}", facility, message));
|
|
} break;
|
|
case cppkafka::LogLevel::LogDebug: {
|
|
poco_debug(KafkaManager()->Logger(),
|
|
fmt::format("kafka-log: facility: {} message: {}", facility, message));
|
|
} break;
|
|
case cppkafka::LogLevel::LogInfo: {
|
|
poco_information(KafkaManager()->Logger(),
|
|
fmt::format("kafka-log: facility: {} message: {}", facility, message));
|
|
} break;
|
|
case cppkafka::LogLevel::LogWarning: {
|
|
poco_warning(KafkaManager()->Logger(),
|
|
fmt::format("kafka-log: facility: {} message: {}", facility, message));
|
|
} break;
|
|
case cppkafka::LogLevel::LogAlert:
|
|
case cppkafka::LogLevel::LogCrit: {
|
|
poco_critical(KafkaManager()->Logger(),
|
|
fmt::format("kafka-log: facility: {} message: {}", facility, message));
|
|
} break;
|
|
case cppkafka::LogLevel::LogErr:
|
|
case cppkafka::LogLevel::LogEmerg:
|
|
default: {
|
|
poco_error(KafkaManager()->Logger(),
|
|
fmt::format("kafka-log: facility: {} message: {}", facility, message));
|
|
} break;
|
|
}
|
|
}
|
|
|
|
inline void KafkaErrorFun([[maybe_unused]] cppkafka::KafkaHandleBase &handle, int error,
|
|
const std::string &reason) {
|
|
poco_error(KafkaManager()->Logger(),
|
|
fmt::format("kafka-error: {}, reason: {}", error, reason));
|
|
}
|
|
|
|
inline void AddKafkaSecurity(cppkafka::Configuration &Config) {
|
|
auto CA = MicroServiceConfigGetString("openwifi.kafka.ssl.ca.location", "");
|
|
auto Certificate =
|
|
MicroServiceConfigGetString("openwifi.kafka.ssl.certificate.location", "");
|
|
auto Key = MicroServiceConfigGetString("openwifi.kafka.ssl.key.location", "");
|
|
auto Password = MicroServiceConfigGetString("openwifi.kafka.ssl.key.password", "");
|
|
|
|
if (CA.empty() || Certificate.empty() || Key.empty())
|
|
return;
|
|
|
|
Config.set("ssl.ca.location", CA);
|
|
Config.set("ssl.certificate.location", Certificate);
|
|
Config.set("ssl.key.location", Key);
|
|
if (!Password.empty())
|
|
Config.set("ssl.key.password", Password);
|
|
}
|
|
|
|
void KafkaManager::initialize(Poco::Util::Application &self) {
|
|
SubSystemServer::initialize(self);
|
|
KafkaEnabled_ = MicroServiceConfigGetBool("openwifi.kafka.enable", false);
|
|
}
|
|
|
|
inline void KafkaProducer::run() {
|
|
Poco::Logger &Logger_ =
|
|
Poco::Logger::create("KAFKA-PRODUCER", KafkaManager()->Logger().getChannel());
|
|
poco_information(Logger_, "Starting...");
|
|
|
|
Utils::SetThreadName("Kafka:Prod");
|
|
cppkafka::Configuration Config(
|
|
{{"client.id", MicroServiceConfigGetString("openwifi.kafka.client.id", "")},
|
|
{"metadata.broker.list",
|
|
MicroServiceConfigGetString("openwifi.kafka.brokerlist", "")}});
|
|
|
|
AddKafkaSecurity(Config);
|
|
|
|
Config.set_log_callback(KafkaLoggerFun);
|
|
Config.set_error_callback(KafkaErrorFun);
|
|
|
|
KafkaManager()->SystemInfoWrapper_ =
|
|
R"lit({ "system" : { "id" : )lit" + std::to_string(MicroServiceID()) +
|
|
R"lit( , "host" : ")lit" + MicroServicePrivateEndPoint() +
|
|
R"lit(" } , "payload" : )lit";
|
|
|
|
cppkafka::Producer Producer(Config);
|
|
Running_ = true;
|
|
|
|
Poco::AutoPtr<Poco::Notification> Note(Queue_.waitDequeueNotification());
|
|
while (Note && Running_) {
|
|
try {
|
|
auto Msg = dynamic_cast<KafkaMessage *>(Note.get());
|
|
if (Msg != nullptr) {
|
|
Producer.produce(cppkafka::MessageBuilder(Msg->Topic())
|
|
.key(Msg->Key())
|
|
.payload(Msg->Payload()));
|
|
}
|
|
} catch (const cppkafka::HandleException &E) {
|
|
poco_warning(Logger_,
|
|
fmt::format("Caught a Kafka exception (producer): {}", E.what()));
|
|
} catch (const Poco::Exception &E) {
|
|
Logger_.log(E);
|
|
} catch (...) {
|
|
poco_error(Logger_, "std::exception");
|
|
}
|
|
Note = Queue_.waitDequeueNotification();
|
|
}
|
|
poco_information(Logger_, "Stopped...");
|
|
}
|
|
|
|
inline void KafkaConsumer::run() {
|
|
Utils::SetThreadName("Kafka:Cons");
|
|
|
|
Poco::Logger &Logger_ =
|
|
Poco::Logger::create("KAFKA-CONSUMER", KafkaManager()->Logger().getChannel());
|
|
|
|
poco_information(Logger_, "Starting...");
|
|
|
|
cppkafka::Configuration Config(
|
|
{{"client.id", MicroServiceConfigGetString("openwifi.kafka.client.id", "")},
|
|
{"metadata.broker.list", MicroServiceConfigGetString("openwifi.kafka.brokerlist", "")},
|
|
{"group.id", MicroServiceConfigGetString("openwifi.kafka.group.id", "")},
|
|
{"enable.auto.commit", MicroServiceConfigGetBool("openwifi.kafka.auto.commit", false)},
|
|
{"auto.offset.reset", "latest"},
|
|
{"enable.partition.eof", false}});
|
|
|
|
AddKafkaSecurity(Config);
|
|
|
|
Config.set_log_callback(KafkaLoggerFun);
|
|
Config.set_error_callback(KafkaErrorFun);
|
|
|
|
cppkafka::TopicConfiguration topic_config = {{"auto.offset.reset", "smallest"}};
|
|
|
|
// Now configure it to be the default topic config
|
|
Config.set_default_topic_configuration(topic_config);
|
|
|
|
cppkafka::Consumer Consumer(Config);
|
|
Consumer.set_assignment_callback([&](cppkafka::TopicPartitionList &partitions) {
|
|
if (!partitions.empty()) {
|
|
poco_information(Logger_, fmt::format("Partition assigned: {}...",
|
|
partitions.front().get_partition()));
|
|
}
|
|
});
|
|
Consumer.set_revocation_callback([&](const cppkafka::TopicPartitionList &partitions) {
|
|
if (!partitions.empty()) {
|
|
poco_information(Logger_, fmt::format("Partition revocation: {}...",
|
|
partitions.front().get_partition()));
|
|
}
|
|
});
|
|
|
|
bool AutoCommit = MicroServiceConfigGetBool("openwifi.kafka.auto.commit", false);
|
|
auto BatchSize = MicroServiceConfigGetInt("openwifi.kafka.consumer.batchsize", 20);
|
|
|
|
Types::StringVec Topics;
|
|
KafkaManager()->Topics(Topics);
|
|
Consumer.subscribe(Topics);
|
|
|
|
Running_ = true;
|
|
while (Running_) {
|
|
try {
|
|
std::vector<cppkafka::Message> MsgVec =
|
|
Consumer.poll_batch(BatchSize, std::chrono::milliseconds(100));
|
|
for (auto const &Msg : MsgVec) {
|
|
if (!Msg)
|
|
continue;
|
|
if (Msg.get_error()) {
|
|
if (!Msg.is_eof()) {
|
|
poco_error(Logger_,
|
|
fmt::format("Error: {}", Msg.get_error().to_string()));
|
|
}
|
|
if (!AutoCommit)
|
|
Consumer.async_commit(Msg);
|
|
continue;
|
|
}
|
|
KafkaManager()->Dispatch(Msg.get_topic().c_str(), Msg.get_key(), std::make_shared<std::string>(Msg.get_payload()));
|
|
if (!AutoCommit)
|
|
Consumer.async_commit(Msg);
|
|
}
|
|
} catch (const cppkafka::HandleException &E) {
|
|
poco_warning(Logger_,
|
|
fmt::format("Caught a Kafka exception (consumer): {}", E.what()));
|
|
} catch (const Poco::Exception &E) {
|
|
Logger_.log(E);
|
|
} catch (...) {
|
|
poco_error(Logger_, "std::exception");
|
|
}
|
|
}
|
|
Consumer.unsubscribe();
|
|
poco_information(Logger_, "Stopped...");
|
|
}
|
|
|
|
void KafkaProducer::Start() {
|
|
if (!Running_) {
|
|
Running_ = true;
|
|
Worker_.start(*this);
|
|
}
|
|
}
|
|
|
|
void KafkaProducer::Stop() {
|
|
if (Running_) {
|
|
Running_ = false;
|
|
Queue_.wakeUpAll();
|
|
Worker_.join();
|
|
}
|
|
}
|
|
|
|
void KafkaProducer::Produce(const char *Topic, const std::string &Key,
|
|
const std::shared_ptr<std::string> Payload) {
|
|
std::lock_guard G(Mutex_);
|
|
Queue_.enqueueNotification(new KafkaMessage(Topic, Key, Payload));
|
|
}
|
|
|
|
void KafkaConsumer::Start() {
|
|
if (!Running_) {
|
|
Running_ = true;
|
|
Worker_.start(*this);
|
|
}
|
|
}
|
|
|
|
void KafkaConsumer::Stop() {
|
|
if (Running_) {
|
|
Running_ = false;
|
|
Worker_.wakeUp();
|
|
Worker_.join();
|
|
}
|
|
}
|
|
|
|
void KafkaDispatcher::Start() {
|
|
if (!Running_) {
|
|
Running_ = true;
|
|
Worker_.start(*this);
|
|
}
|
|
}
|
|
|
|
void KafkaDispatcher::Stop() {
|
|
if (Running_) {
|
|
Running_ = false;
|
|
Queue_.wakeUpAll();
|
|
Worker_.join();
|
|
}
|
|
}
|
|
|
|
auto KafkaDispatcher::RegisterTopicWatcher(const std::string &Topic,
|
|
Types::TopicNotifyFunction &F) {
|
|
std::lock_guard G(Mutex_);
|
|
auto It = Notifiers_.find(Topic);
|
|
if (It == Notifiers_.end()) {
|
|
Types::TopicNotifyFunctionList L;
|
|
L.emplace(L.end(), std::make_pair(F, FunctionId_));
|
|
Notifiers_[Topic] = std::move(L);
|
|
} else {
|
|
It->second.emplace(It->second.end(), std::make_pair(F, FunctionId_));
|
|
}
|
|
return FunctionId_++;
|
|
}
|
|
|
|
void KafkaDispatcher::UnregisterTopicWatcher(const std::string &Topic, int Id) {
|
|
std::lock_guard G(Mutex_);
|
|
auto It = Notifiers_.find(Topic);
|
|
if (It != Notifiers_.end()) {
|
|
Types::TopicNotifyFunctionList &L = It->second;
|
|
for (auto it = L.begin(); it != L.end(); it++)
|
|
if (it->second == Id) {
|
|
L.erase(it);
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
void KafkaDispatcher::Dispatch(const char *Topic, const std::string &Key,
|
|
const std::shared_ptr<std::string> Payload) {
|
|
std::lock_guard G(Mutex_);
|
|
auto It = Notifiers_.find(Topic);
|
|
if (It != Notifiers_.end()) {
|
|
Queue_.enqueueNotification(new KafkaMessage(Topic, Key, Payload));
|
|
}
|
|
}
|
|
|
|
void KafkaDispatcher::run() {
|
|
Poco::Logger &Logger_ =
|
|
Poco::Logger::create("KAFKA-DISPATCHER", KafkaManager()->Logger().getChannel());
|
|
poco_information(Logger_, "Starting...");
|
|
Poco::AutoPtr<Poco::Notification> Note(Queue_.waitDequeueNotification());
|
|
Utils::SetThreadName("kafka:dispatch");
|
|
while (Note && Running_) {
|
|
auto Msg = dynamic_cast<KafkaMessage *>(Note.get());
|
|
if (Msg != nullptr) {
|
|
auto It = Notifiers_.find(Msg->Topic());
|
|
if (It != Notifiers_.end()) {
|
|
const auto &FL = It->second;
|
|
for (const auto &[CallbackFunc, _] : FL) {
|
|
CallbackFunc(Msg->Key(), Msg->Payload());
|
|
}
|
|
}
|
|
}
|
|
Note = Queue_.waitDequeueNotification();
|
|
}
|
|
poco_information(Logger_, "Stopped...");
|
|
}
|
|
|
|
void KafkaDispatcher::Topics(std::vector<std::string> &T) {
|
|
T.clear();
|
|
for (const auto &[TopicName, _] : Notifiers_)
|
|
T.push_back(TopicName);
|
|
}
|
|
|
|
int KafkaManager::Start() {
|
|
if (!KafkaEnabled_)
|
|
return 0;
|
|
ConsumerThr_.Start();
|
|
ProducerThr_.Start();
|
|
Dispatcher_.Start();
|
|
return 0;
|
|
}
|
|
|
|
void KafkaManager::Stop() {
|
|
if (KafkaEnabled_) {
|
|
poco_information(Logger(), "Stopping...");
|
|
Dispatcher_.Stop();
|
|
ProducerThr_.Stop();
|
|
ConsumerThr_.Stop();
|
|
poco_information(Logger(), "Stopped...");
|
|
return;
|
|
}
|
|
}
|
|
|
|
void KafkaManager::PostMessage(const char *topic, const std::string &key,
|
|
const std::shared_ptr<std::string> PayLoad, bool WrapMessage) {
|
|
if (KafkaEnabled_) {
|
|
ProducerThr_.Produce(topic, key, WrapMessage ? WrapSystemId(PayLoad) : PayLoad);
|
|
}
|
|
}
|
|
|
|
void KafkaManager::Dispatch(const char *Topic, const std::string &Key,
|
|
const std::shared_ptr<std::string> Payload) {
|
|
Dispatcher_.Dispatch(Topic, Key, Payload);
|
|
}
|
|
|
|
[[nodiscard]] const std::shared_ptr<std::string> KafkaManager::WrapSystemId(const std::shared_ptr<std::string> PayLoad) {
|
|
*PayLoad = SystemInfoWrapper_ + *PayLoad + "}";
|
|
return PayLoad;
|
|
}
|
|
|
|
uint64_t KafkaManager::RegisterTopicWatcher(const std::string &Topic,
|
|
Types::TopicNotifyFunction &F) {
|
|
if (KafkaEnabled_) {
|
|
return Dispatcher_.RegisterTopicWatcher(Topic, F);
|
|
} else {
|
|
return 0;
|
|
}
|
|
}
|
|
|
|
void KafkaManager::UnregisterTopicWatcher(const std::string &Topic, uint64_t Id) {
|
|
if (KafkaEnabled_) {
|
|
Dispatcher_.UnregisterTopicWatcher(Topic, Id);
|
|
}
|
|
}
|
|
|
|
void KafkaManager::Topics(std::vector<std::string> &T) { Dispatcher_.Topics(T); }
|
|
|
|
void KafkaManager::PartitionAssignment(const cppkafka::TopicPartitionList &partitions) {
|
|
poco_information(
|
|
Logger(), fmt::format("Partition assigned: {}...", partitions.front().get_partition()));
|
|
}
|
|
|
|
void KafkaManager::PartitionRevocation(const cppkafka::TopicPartitionList &partitions) {
|
|
poco_information(Logger(), fmt::format("Partition revocation: {}...",
|
|
partitions.front().get_partition()));
|
|
}
|
|
|
|
} // namespace OpenWifi
|