From 2212dc8e71ccd638af0a78440753a663da4c314a Mon Sep 17 00:00:00 2001 From: stephb9959 Date: Sun, 20 Feb 2022 10:41:44 -0800 Subject: [PATCH] Adding kafka ssl --- build | 2 +- owprov.properties | 4 ++++ src/framework/ConfigurationValidator.cpp | 2 +- src/framework/MicroService.h | 22 +++++++++++++++++++++- 4 files changed, 27 insertions(+), 3 deletions(-) diff --git a/build b/build index d8263ee..e440e5c 100644 --- a/build +++ b/build @@ -1 +1 @@ -2 \ No newline at end of file +3 \ No newline at end of file diff --git a/owprov.properties b/owprov.properties index 13c04cd..7039805 100644 --- a/owprov.properties +++ b/owprov.properties @@ -65,6 +65,10 @@ openwifi.kafka.enable = true openwifi.kafka.brokerlist = a1.arilia.com:9092 openwifi.kafka.auto.commit = false openwifi.kafka.queue.buffering.max.ms = 50 +openwifi.kafka.ssl.ca.location = +openwifi.kafka.ssl.certificate.location = +openwifi.kafka.ssl.key.location = +openwifi.kafka.ssl.key.password = # # This section select which form of persistence you need diff --git a/src/framework/ConfigurationValidator.cpp b/src/framework/ConfigurationValidator.cpp index a7f1d7e..0b9dba3 100644 --- a/src/framework/ConfigurationValidator.cpp +++ b/src/framework/ConfigurationValidator.cpp @@ -2414,7 +2414,7 @@ namespace OpenWifi { } else if(format == "uc-mac") { if(std::regex_match(value,mac_regex)) return; - throw std::invalid_argument(value + " is not a valid MAC: should be something like 2e60:3500::/64."); + throw std::invalid_argument(value + " is not a valid MAC: should be something like 11:22:33:44:55:66"); } else if(format == "uc-timeout") { if(std::regex_match(value,uc_timeout_regex)) return; diff --git a/src/framework/MicroService.h b/src/framework/MicroService.h index 03e3e36..34f18a9 100644 --- a/src/framework/MicroService.h +++ b/src/framework/MicroService.h @@ -3856,12 +3856,30 @@ namespace OpenWifi { KafkaManager()->Logger().error(Poco::format("kafka-error: %d, reason: %s", error, reason)); } + inline void AddKafkaSecurity(cppkafka::Configuration & Config) { + auto CA = MicroService::instance().ConfigGetString("openwifi.kafka.ssl.ca.location",""); + auto Certificate = MicroService::instance().ConfigGetString("openwifi.kafka.ssl.certificate.location",""); + auto Key = MicroService::instance().ConfigGetString("openwifi.kafka.ssl.key.location",""); + auto Password = MicroService::instance().ConfigGetString("openwifi.kafka.ssl.key.password",""); + + if(CA.empty() || Certificate.empty() || Key.empty()) + return; + + Config.set("ssl.ca.location", CA); + Config.set("ssl.certificate.location", Certificate); + Config.set("ssl.key.location", Key); + if(!Password.empty()) + Config.set("ssl.key.password", Password); + } + inline void KafkaProducer::run() { cppkafka::Configuration Config({ { "client.id", MicroService::instance().ConfigGetString("openwifi.kafka.client.id") }, { "metadata.broker.list", MicroService::instance().ConfigGetString("openwifi.kafka.brokerlist") } }); + AddKafkaSecurity(Config); + Config.set_log_callback(KafkaLoggerFun); Config.set_error_callback(KafkaErrorFun); @@ -3875,7 +3893,7 @@ namespace OpenWifi { Poco::AutoPtr Note(Queue_.waitDequeueNotification()); while(Note && Running_) { - KafkaMessage * Msg = dynamic_cast(Note.get()); + auto Msg = dynamic_cast(Note.get()); if(Msg!= nullptr) { Producer.produce( cppkafka::MessageBuilder(Msg->Topic()).key(Msg->Key()).payload(Msg->Payload())); @@ -3894,6 +3912,8 @@ namespace OpenWifi { { "enable.partition.eof", false } }); + AddKafkaSecurity(Config); + Config.set_log_callback(KafkaLoggerFun); Config.set_error_callback(KafkaErrorFun);