// // Created by stephane bourque on 2022-10-25. // #include "KafkaManager.h" #include "fmt/format.h" #include "framework/MicroServiceFuncs.h" namespace OpenWifi { void KafkaLoggerFun([[maybe_unused]] cppkafka::KafkaHandleBase &handle, int level, const std::string &facility, const std::string &message) { switch ((cppkafka::LogLevel)level) { case cppkafka::LogLevel::LogNotice: { poco_notice(KafkaManager()->Logger(), fmt::format("kafka-log: facility: {} message: {}", facility, message)); } break; case cppkafka::LogLevel::LogDebug: { poco_debug(KafkaManager()->Logger(), fmt::format("kafka-log: facility: {} message: {}", facility, message)); } break; case cppkafka::LogLevel::LogInfo: { poco_information(KafkaManager()->Logger(), fmt::format("kafka-log: facility: {} message: {}", facility, message)); } break; case cppkafka::LogLevel::LogWarning: { poco_warning(KafkaManager()->Logger(), fmt::format("kafka-log: facility: {} message: {}", facility, message)); } break; case cppkafka::LogLevel::LogAlert: case cppkafka::LogLevel::LogCrit: { poco_critical(KafkaManager()->Logger(), fmt::format("kafka-log: facility: {} message: {}", facility, message)); } break; case cppkafka::LogLevel::LogErr: case cppkafka::LogLevel::LogEmerg: default: { poco_error(KafkaManager()->Logger(), fmt::format("kafka-log: facility: {} message: {}", facility, message)); } break; } } inline void KafkaErrorFun([[maybe_unused]] cppkafka::KafkaHandleBase &handle, int error, const std::string &reason) { poco_error(KafkaManager()->Logger(), fmt::format("kafka-error: {}, reason: {}", error, reason)); } inline void AddKafkaSecurity(cppkafka::Configuration &Config) { auto CA = MicroServiceConfigGetString("openwifi.kafka.ssl.ca.location", ""); auto Certificate = MicroServiceConfigGetString("openwifi.kafka.ssl.certificate.location", ""); auto Key = MicroServiceConfigGetString("openwifi.kafka.ssl.key.location", ""); auto Password = MicroServiceConfigGetString("openwifi.kafka.ssl.key.password", ""); if (CA.empty() || Certificate.empty() || Key.empty()) return; Config.set("ssl.ca.location", CA); Config.set("ssl.certificate.location", Certificate); Config.set("ssl.key.location", Key); if (!Password.empty()) Config.set("ssl.key.password", Password); } void KafkaManager::initialize(Poco::Util::Application &self) { SubSystemServer::initialize(self); KafkaEnabled_ = MicroServiceConfigGetBool("openwifi.kafka.enable", false); } inline void KafkaProducer::run() { Poco::Logger &Logger_ = Poco::Logger::create("KAFKA-PRODUCER", KafkaManager()->Logger().getChannel()); poco_information(Logger_, "Starting..."); Utils::SetThreadName("Kafka:Prod"); cppkafka::Configuration Config( {{"client.id", MicroServiceConfigGetString("openwifi.kafka.client.id", "")}, {"metadata.broker.list", MicroServiceConfigGetString("openwifi.kafka.brokerlist", "")}}); AddKafkaSecurity(Config); Config.set_log_callback(KafkaLoggerFun); Config.set_error_callback(KafkaErrorFun); KafkaManager()->SystemInfoWrapper_ = R"lit({ "system" : { "id" : )lit" + std::to_string(MicroServiceID()) + R"lit( , "host" : ")lit" + MicroServicePrivateEndPoint() + R"lit(" } , "payload" : )lit"; cppkafka::Producer Producer(Config); Running_ = true; Poco::AutoPtr Note(Queue_.waitDequeueNotification()); while (Note && Running_) { try { auto Msg = dynamic_cast(Note.get()); if (Msg != nullptr) { Producer.produce(cppkafka::MessageBuilder(Msg->Topic()) .key(Msg->Key()) .payload(Msg->Payload())); } } catch (const cppkafka::HandleException &E) { poco_warning(Logger_, fmt::format("Caught a Kafka exception (producer): {}", E.what())); } catch (const Poco::Exception &E) { Logger_.log(E); } catch (...) { poco_error(Logger_, "std::exception"); } Note = Queue_.waitDequeueNotification(); } poco_information(Logger_, "Stopped..."); } inline void KafkaConsumer::run() { Utils::SetThreadName("Kafka:Cons"); Poco::Logger &Logger_ = Poco::Logger::create("KAFKA-CONSUMER", KafkaManager()->Logger().getChannel()); poco_information(Logger_, "Starting..."); cppkafka::Configuration Config( {{"client.id", MicroServiceConfigGetString("openwifi.kafka.client.id", "")}, {"metadata.broker.list", MicroServiceConfigGetString("openwifi.kafka.brokerlist", "")}, {"group.id", MicroServiceConfigGetString("openwifi.kafka.group.id", "")}, {"enable.auto.commit", MicroServiceConfigGetBool("openwifi.kafka.auto.commit", false)}, {"auto.offset.reset", "latest"}, {"enable.partition.eof", false}}); AddKafkaSecurity(Config); Config.set_log_callback(KafkaLoggerFun); Config.set_error_callback(KafkaErrorFun); cppkafka::TopicConfiguration topic_config = {{"auto.offset.reset", "smallest"}}; // Now configure it to be the default topic config Config.set_default_topic_configuration(topic_config); cppkafka::Consumer Consumer(Config); Consumer.set_assignment_callback([&](cppkafka::TopicPartitionList &partitions) { if (!partitions.empty()) { poco_information(Logger_, fmt::format("Partition assigned: {}...", partitions.front().get_partition())); } }); Consumer.set_revocation_callback([&](const cppkafka::TopicPartitionList &partitions) { if (!partitions.empty()) { poco_information(Logger_, fmt::format("Partition revocation: {}...", partitions.front().get_partition())); } }); bool AutoCommit = MicroServiceConfigGetBool("openwifi.kafka.auto.commit", false); auto BatchSize = MicroServiceConfigGetInt("openwifi.kafka.consumer.batchsize", 20); Types::StringVec Topics; KafkaManager()->Topics(Topics); Consumer.subscribe(Topics); Running_ = true; while (Running_) { try { std::vector MsgVec = Consumer.poll_batch(BatchSize, std::chrono::milliseconds(100)); for (auto const &Msg : MsgVec) { if (!Msg) continue; if (Msg.get_error()) { if (!Msg.is_eof()) { poco_error(Logger_, fmt::format("Error: {}", Msg.get_error().to_string())); } if (!AutoCommit) Consumer.async_commit(Msg); continue; } KafkaManager()->Dispatch(Msg.get_topic().c_str(), Msg.get_key(), std::make_shared(Msg.get_payload())); if (!AutoCommit) Consumer.async_commit(Msg); } } catch (const cppkafka::HandleException &E) { poco_warning(Logger_, fmt::format("Caught a Kafka exception (consumer): {}", E.what())); } catch (const Poco::Exception &E) { Logger_.log(E); } catch (...) { poco_error(Logger_, "std::exception"); } } Consumer.unsubscribe(); poco_information(Logger_, "Stopped..."); } void KafkaProducer::Start() { if (!Running_) { Running_ = true; Worker_.start(*this); } } void KafkaProducer::Stop() { if (Running_) { Running_ = false; Queue_.wakeUpAll(); Worker_.join(); } } void KafkaProducer::Produce(const char *Topic, const std::string &Key, const std::shared_ptr Payload) { std::lock_guard G(Mutex_); Queue_.enqueueNotification(new KafkaMessage(Topic, Key, Payload)); } void KafkaConsumer::Start() { if (!Running_) { Running_ = true; Worker_.start(*this); } } void KafkaConsumer::Stop() { if (Running_) { Running_ = false; Worker_.wakeUp(); Worker_.join(); } } void KafkaDispatcher::Start() { if (!Running_) { Running_ = true; Worker_.start(*this); } } void KafkaDispatcher::Stop() { if (Running_) { Running_ = false; Queue_.wakeUpAll(); Worker_.join(); } } auto KafkaDispatcher::RegisterTopicWatcher(const std::string &Topic, Types::TopicNotifyFunction &F) { std::lock_guard G(Mutex_); auto It = Notifiers_.find(Topic); if (It == Notifiers_.end()) { Types::TopicNotifyFunctionList L; L.emplace(L.end(), std::make_pair(F, FunctionId_)); Notifiers_[Topic] = std::move(L); } else { It->second.emplace(It->second.end(), std::make_pair(F, FunctionId_)); } return FunctionId_++; } void KafkaDispatcher::UnregisterTopicWatcher(const std::string &Topic, int Id) { std::lock_guard G(Mutex_); auto It = Notifiers_.find(Topic); if (It != Notifiers_.end()) { Types::TopicNotifyFunctionList &L = It->second; for (auto it = L.begin(); it != L.end(); it++) if (it->second == Id) { L.erase(it); break; } } } void KafkaDispatcher::Dispatch(const char *Topic, const std::string &Key, const std::shared_ptr Payload) { std::lock_guard G(Mutex_); auto It = Notifiers_.find(Topic); if (It != Notifiers_.end()) { Queue_.enqueueNotification(new KafkaMessage(Topic, Key, Payload)); } } void KafkaDispatcher::run() { Poco::Logger &Logger_ = Poco::Logger::create("KAFKA-DISPATCHER", KafkaManager()->Logger().getChannel()); poco_information(Logger_, "Starting..."); Poco::AutoPtr Note(Queue_.waitDequeueNotification()); Utils::SetThreadName("kafka:dispatch"); while (Note && Running_) { auto Msg = dynamic_cast(Note.get()); if (Msg != nullptr) { auto It = Notifiers_.find(Msg->Topic()); if (It != Notifiers_.end()) { const auto &FL = It->second; for (const auto &[CallbackFunc, _] : FL) { CallbackFunc(Msg->Key(), Msg->Payload()); } } } Note = Queue_.waitDequeueNotification(); } poco_information(Logger_, "Stopped..."); } void KafkaDispatcher::Topics(std::vector &T) { T.clear(); for (const auto &[TopicName, _] : Notifiers_) T.push_back(TopicName); } int KafkaManager::Start() { if (!KafkaEnabled_) return 0; ConsumerThr_.Start(); ProducerThr_.Start(); Dispatcher_.Start(); return 0; } void KafkaManager::Stop() { if (KafkaEnabled_) { poco_information(Logger(), "Stopping..."); Dispatcher_.Stop(); ProducerThr_.Stop(); ConsumerThr_.Stop(); poco_information(Logger(), "Stopped..."); return; } } void KafkaManager::PostMessage(const char *topic, const std::string &key, const std::shared_ptr PayLoad, bool WrapMessage) { if (KafkaEnabled_) { ProducerThr_.Produce(topic, key, WrapMessage ? WrapSystemId(PayLoad) : PayLoad); } } void KafkaManager::Dispatch(const char *Topic, const std::string &Key, const std::shared_ptr Payload) { Dispatcher_.Dispatch(Topic, Key, Payload); } [[nodiscard]] const std::shared_ptr KafkaManager::WrapSystemId(const std::shared_ptr PayLoad) { *PayLoad = SystemInfoWrapper_ + *PayLoad + "}"; return PayLoad; } uint64_t KafkaManager::RegisterTopicWatcher(const std::string &Topic, Types::TopicNotifyFunction &F) { if (KafkaEnabled_) { return Dispatcher_.RegisterTopicWatcher(Topic, F); } else { return 0; } } void KafkaManager::UnregisterTopicWatcher(const std::string &Topic, uint64_t Id) { if (KafkaEnabled_) { Dispatcher_.UnregisterTopicWatcher(Topic, Id); } } void KafkaManager::Topics(std::vector &T) { Dispatcher_.Topics(T); } void KafkaManager::PartitionAssignment(const cppkafka::TopicPartitionList &partitions) { poco_information( Logger(), fmt::format("Partition assigned: {}...", partitions.front().get_partition())); } void KafkaManager::PartitionRevocation(const cppkafka::TopicPartitionList &partitions) { poco_information(Logger(), fmt::format("Partition revocation: {}...", partitions.front().get_partition())); } } // namespace OpenWifi