stephb9959
2022-10-30 09:20:09 -07:00
parent 21aa3ef685
commit a30b4e1dae

View File

@@ -97,11 +97,11 @@ namespace OpenWifi {
cppkafka::MessageBuilder(Msg->Topic()).key(Msg->Key()).payload(Msg->Payload()));
}
} catch (const cppkafka::HandleException &E) {
poco_warning(KafkaManager()->Logger(),fmt::format("Caught a Kafka exception (producer): {}", E.what()));
poco_warning(Logger_,fmt::format("Caught a Kafka exception (producer): {}", E.what()));
} catch( const Poco::Exception &E) {
KafkaManager()->Logger().log(E);
Logger_.log(E);
} catch (...) {
poco_error(KafkaManager()->Logger(),"std::exception");
poco_error(Logger_,"std::exception");
}
Note = Queue_.waitDequeueNotification();
}
@@ -132,15 +132,15 @@ namespace OpenWifi {
Config.set_default_topic_configuration(topic_config);
cppkafka::Consumer Consumer(Config);
Consumer.set_assignment_callback([](cppkafka::TopicPartitionList& partitions) {
Consumer.set_assignment_callback([&](cppkafka::TopicPartitionList& partitions) {
if(!partitions.empty()) {
KafkaManager()->Logger().information(fmt::format("Partition assigned: {}...",
Logger_.information(fmt::format("Partition assigned: {}...",
partitions.front().get_partition()));
}
});
Consumer.set_revocation_callback([](const cppkafka::TopicPartitionList& partitions) {
Consumer.set_revocation_callback([&](const cppkafka::TopicPartitionList& partitions) {
if(!partitions.empty()) {
KafkaManager()->Logger().information(fmt::format("Partition revocation: {}...",
Logger_.information(fmt::format("Partition revocation: {}...",
partitions.front().get_partition()));
}
});
@@ -161,7 +161,7 @@ namespace OpenWifi {
continue;
if (Msg.get_error()) {
if (!Msg.is_eof()) {
poco_error(KafkaManager()->Logger(),fmt::format("Error: {}", Msg.get_error().to_string()));
poco_error(Logger_,fmt::format("Error: {}", Msg.get_error().to_string()));
}
if(!AutoCommit)
Consumer.async_commit(Msg);
@@ -172,11 +172,11 @@ namespace OpenWifi {
Consumer.async_commit(Msg);
}
} catch (const cppkafka::HandleException &E) {
poco_warning(KafkaManager()->Logger(),fmt::format("Caught a Kafka exception (consumer): {}", E.what()));
poco_warning(Logger_,fmt::format("Caught a Kafka exception (consumer): {}", E.what()));
} catch (const Poco::Exception &E) {
KafkaManager()->Logger().log(E);
Logger_.log(E);
} catch (...) {
poco_error(KafkaManager()->Logger(),"std::exception");
poco_error(Logger_,"std::exception");
}
}
Consumer.unsubscribe();