Hardening Kafka errors in producer when there is a kafka disconnection.

This commit is contained in:
stephb9959
2022-05-17 12:16:46 -07:00
parent 9d654535a4
commit 9bc6372a42

View File

@@ -4123,8 +4123,10 @@ namespace OpenWifi {
inline void KafkaProducer::run() {
cppkafka::Configuration Config({
{ "client.id", MicroService::instance().ConfigGetString("openwifi.kafka.client.id") },
{ "metadata.broker.list", MicroService::instance().ConfigGetString("openwifi.kafka.brokerlist") }
{ "client.id", MicroService::instance().ConfigGetString("openwifi.kafka.client.id") },
{ "metadata.broker.list", MicroService::instance().ConfigGetString("openwifi.kafka.brokerlist") },
{ "reconnect.backoff.max.ms", 5000 },
{ "reconnect.backoff.ms", 100 }
});
AddKafkaSecurity(Config);
@@ -4142,11 +4144,17 @@ namespace OpenWifi {
Poco::AutoPtr<Poco::Notification> Note(Queue_.waitDequeueNotification());
while(Note && Running_) {
auto Msg = dynamic_cast<KafkaMessage*>(Note.get());
if(Msg!= nullptr) {
Producer.produce(
cppkafka::MessageBuilder(Msg->Topic()).key(Msg->Key()).payload(Msg->Payload()));
}
try {
auto Msg = dynamic_cast<KafkaMessage *>(Note.get());
if (Msg != nullptr) {
Producer.produce(
cppkafka::MessageBuilder(Msg->Topic()).key(Msg->Key()).payload(Msg->Payload()));
}
} catch( const Poco::Exception &E) {
KafkaManager()->Logger().log(E);
} catch (const cppkafka::HandleException &E) {
KafkaManager()->Logger().error(fmt::format("{}: Exception --> {}", E.get_error().to_string(), E.what()));
}
Note = Queue_.waitDequeueNotification();
}
}
@@ -4158,7 +4166,9 @@ namespace OpenWifi {
{ "group.id", MicroService::instance().ConfigGetString("openwifi.kafka.group.id") },
{ "enable.auto.commit", MicroService::instance().ConfigGetBool("openwifi.kafka.auto.commit",false) },
{ "auto.offset.reset", "latest" } ,
{ "enable.partition.eof", false }
{ "enable.partition.eof", false },
{ "reconnect.backoff.max.ms", 5000 },
{ "reconnect.backoff.ms", 100 }
});
AddKafkaSecurity(Config);