Adding Kafka SSL

This commit is contained in:
stephb9959
2022-02-20 10:43:20 -08:00
parent 0a7fc52d06
commit 6f2eab4c53
3 changed files with 26 additions and 2 deletions

2
build
View File

@@ -1 +1 @@
2
3

View File

@@ -40,6 +40,10 @@ openwifi.kafka.enable = true
openwifi.kafka.brokerlist = a1.arilia.com:9092
openwifi.kafka.auto.commit = false
openwifi.kafka.queue.buffering.max.ms = 50
openwifi.kafka.ssl.ca.location =
openwifi.kafka.ssl.certificate.location =
openwifi.kafka.ssl.key.location =
openwifi.kafka.ssl.key.password =
alb.enable = true
alb.port = 16107

View File

@@ -3856,12 +3856,30 @@ namespace OpenWifi {
KafkaManager()->Logger().error(Poco::format("kafka-error: %d, reason: %s", error, reason));
}
inline void AddKafkaSecurity(cppkafka::Configuration & Config) {
auto CA = MicroService::instance().ConfigGetString("openwifi.kafka.ssl.ca.location","");
auto Certificate = MicroService::instance().ConfigGetString("openwifi.kafka.ssl.certificate.location","");
auto Key = MicroService::instance().ConfigGetString("openwifi.kafka.ssl.key.location","");
auto Password = MicroService::instance().ConfigGetString("openwifi.kafka.ssl.key.password","");
if(CA.empty() || Certificate.empty() || Key.empty())
return;
Config.set("ssl.ca.location", CA);
Config.set("ssl.certificate.location", Certificate);
Config.set("ssl.key.location", Key);
if(!Password.empty())
Config.set("ssl.key.password", Password);
}
inline void KafkaProducer::run() {
cppkafka::Configuration Config({
{ "client.id", MicroService::instance().ConfigGetString("openwifi.kafka.client.id") },
{ "metadata.broker.list", MicroService::instance().ConfigGetString("openwifi.kafka.brokerlist") }
});
AddKafkaSecurity(Config);
Config.set_log_callback(KafkaLoggerFun);
Config.set_error_callback(KafkaErrorFun);
@@ -3875,7 +3893,7 @@ namespace OpenWifi {
Poco::AutoPtr<Poco::Notification> Note(Queue_.waitDequeueNotification());
while(Note && Running_) {
KafkaMessage * Msg = dynamic_cast<KafkaMessage*>(Note.get());
auto Msg = dynamic_cast<KafkaMessage*>(Note.get());
if(Msg!= nullptr) {
Producer.produce(
cppkafka::MessageBuilder(Msg->Topic()).key(Msg->Key()).payload(Msg->Payload()));
@@ -3894,6 +3912,8 @@ namespace OpenWifi {
{ "enable.partition.eof", false }
});
AddKafkaSecurity(Config);
Config.set_log_callback(KafkaLoggerFun);
Config.set_error_callback(KafkaErrorFun);