Adding kafka ssl

This commit is contained in:
stephb9959
2022-02-20 10:41:44 -08:00
parent b76b73c0e2
commit 2212dc8e71
4 changed files with 27 additions and 3 deletions

2
build
View File

@@ -1 +1 @@
2 3

View File

@@ -65,6 +65,10 @@ openwifi.kafka.enable = true
openwifi.kafka.brokerlist = a1.arilia.com:9092 openwifi.kafka.brokerlist = a1.arilia.com:9092
openwifi.kafka.auto.commit = false openwifi.kafka.auto.commit = false
openwifi.kafka.queue.buffering.max.ms = 50 openwifi.kafka.queue.buffering.max.ms = 50
openwifi.kafka.ssl.ca.location =
openwifi.kafka.ssl.certificate.location =
openwifi.kafka.ssl.key.location =
openwifi.kafka.ssl.key.password =
# #
# This section select which form of persistence you need # This section select which form of persistence you need

View File

@@ -2414,7 +2414,7 @@ namespace OpenWifi {
} else if(format == "uc-mac") { } else if(format == "uc-mac") {
if(std::regex_match(value,mac_regex)) if(std::regex_match(value,mac_regex))
return; return;
throw std::invalid_argument(value + " is not a valid MAC: should be something like 2e60:3500::/64."); throw std::invalid_argument(value + " is not a valid MAC: should be something like 11:22:33:44:55:66");
} else if(format == "uc-timeout") { } else if(format == "uc-timeout") {
if(std::regex_match(value,uc_timeout_regex)) if(std::regex_match(value,uc_timeout_regex))
return; return;

View File

@@ -3856,12 +3856,30 @@ namespace OpenWifi {
KafkaManager()->Logger().error(Poco::format("kafka-error: %d, reason: %s", error, reason)); KafkaManager()->Logger().error(Poco::format("kafka-error: %d, reason: %s", error, reason));
} }
inline void AddKafkaSecurity(cppkafka::Configuration & Config) {
auto CA = MicroService::instance().ConfigGetString("openwifi.kafka.ssl.ca.location","");
auto Certificate = MicroService::instance().ConfigGetString("openwifi.kafka.ssl.certificate.location","");
auto Key = MicroService::instance().ConfigGetString("openwifi.kafka.ssl.key.location","");
auto Password = MicroService::instance().ConfigGetString("openwifi.kafka.ssl.key.password","");
if(CA.empty() || Certificate.empty() || Key.empty())
return;
Config.set("ssl.ca.location", CA);
Config.set("ssl.certificate.location", Certificate);
Config.set("ssl.key.location", Key);
if(!Password.empty())
Config.set("ssl.key.password", Password);
}
inline void KafkaProducer::run() { inline void KafkaProducer::run() {
cppkafka::Configuration Config({ cppkafka::Configuration Config({
{ "client.id", MicroService::instance().ConfigGetString("openwifi.kafka.client.id") }, { "client.id", MicroService::instance().ConfigGetString("openwifi.kafka.client.id") },
{ "metadata.broker.list", MicroService::instance().ConfigGetString("openwifi.kafka.brokerlist") } { "metadata.broker.list", MicroService::instance().ConfigGetString("openwifi.kafka.brokerlist") }
}); });
AddKafkaSecurity(Config);
Config.set_log_callback(KafkaLoggerFun); Config.set_log_callback(KafkaLoggerFun);
Config.set_error_callback(KafkaErrorFun); Config.set_error_callback(KafkaErrorFun);
@@ -3875,7 +3893,7 @@ namespace OpenWifi {
Poco::AutoPtr<Poco::Notification> Note(Queue_.waitDequeueNotification()); Poco::AutoPtr<Poco::Notification> Note(Queue_.waitDequeueNotification());
while(Note && Running_) { while(Note && Running_) {
KafkaMessage * Msg = dynamic_cast<KafkaMessage*>(Note.get()); auto Msg = dynamic_cast<KafkaMessage*>(Note.get());
if(Msg!= nullptr) { if(Msg!= nullptr) {
Producer.produce( Producer.produce(
cppkafka::MessageBuilder(Msg->Topic()).key(Msg->Key()).payload(Msg->Payload())); cppkafka::MessageBuilder(Msg->Topic()).key(Msg->Key()).payload(Msg->Payload()));
@@ -3894,6 +3912,8 @@ namespace OpenWifi {
{ "enable.partition.eof", false } { "enable.partition.eof", false }
}); });
AddKafkaSecurity(Config);
Config.set_log_callback(KafkaLoggerFun); Config.set_log_callback(KafkaLoggerFun);
Config.set_error_callback(KafkaErrorFun); Config.set_error_callback(KafkaErrorFun);