diff --git a/src/KafkaManager.cpp b/src/KafkaManager.cpp index b8f40b2..32f18e2 100644 --- a/src/KafkaManager.cpp +++ b/src/KafkaManager.cpp @@ -55,6 +55,7 @@ namespace uCentral { void KafkaManager::ProducerThr() { cppkafka::Configuration Config({ + { "client.id", Daemon()->ConfigGetString("ucentral.kafka.client.id") }, { "metadata.broker.list", Daemon()->ConfigGetString("ucentral.kafka.brokerlist") } }); SystemInfoWrapper_ = R"lit({ "system" : { "id" : )lit" + @@ -88,9 +89,10 @@ namespace uCentral { void KafkaManager::ConsumerThr() { cppkafka::Configuration Config({ + { "client.id", Daemon()->ConfigGetString("ucentral.kafka.client.id") }, + { "metadata.broker.list", Daemon()->ConfigGetString("ucentral.kafka.brokerlist") }, { "group.id", Daemon()->ConfigGetString("ucentral.kafka.group.id") }, { "enable.auto.commit", Daemon()->ConfigGetBool("ucentral.kafka.auto.commit",false) }, - { "metadata.broker.list", Daemon()->ConfigGetString("ucentral.kafka.brokerlist") }, { "auto.offset.reset", "latest" } , { "enable.partition.eof", false } }); diff --git a/src/Kafka_topics.h b/src/Kafka_topics.h index bb2b028..e6636b3 100644 --- a/src/Kafka_topics.h +++ b/src/Kafka_topics.h @@ -27,7 +27,7 @@ namespace uCentral::KafkaTopics { static const std::string PUBLIC{"publicEndPoint"}; static const std::string PRIVATE{"privateEndPoint"}; static const std::string KEY{"key"}; - static const std::string VERSION{"version"}; + static const std::string VRSN{"version"}; static const std::string TOKEN{"token"}; } } diff --git a/src/MicroService.cpp b/src/MicroService.cpp index af640ee..8d38982 100644 --- a/src/MicroService.cpp +++ b/src/MicroService.cpp @@ -72,7 +72,7 @@ namespace uCentral { if( Object->has(KafkaTopics::ServiceEvents::Fields::TYPE) && Object->has(KafkaTopics::ServiceEvents::Fields::PUBLIC) && Object->has(KafkaTopics::ServiceEvents::Fields::PRIVATE) && - Object->has(KafkaTopics::ServiceEvents::Fields::VERSION) && + Object->has(KafkaTopics::ServiceEvents::Fields::VRSN) && Object->has(KafkaTopics::ServiceEvents::Fields::KEY)) { if (Event == KafkaTopics::ServiceEvents::EVENT_KEEP_ALIVE && Services_.find(ID) != Services_.end()) { @@ -88,7 +88,7 @@ namespace uCentral { .PrivateEndPoint = Object->get(KafkaTopics::ServiceEvents::Fields::PRIVATE).toString(), .PublicEndPoint = Object->get(KafkaTopics::ServiceEvents::Fields::PUBLIC).toString(), .AccessKey = Object->get(KafkaTopics::ServiceEvents::Fields::KEY).toString(), - .Version = Object->get(KafkaTopics::ServiceEvents::Fields::VERSION).toString(), + .Version = Object->get(KafkaTopics::ServiceEvents::Fields::VRSN).toString(), .LastUpdate = (uint64_t)std::time(nullptr)}; for (const auto &[Id, Svc] : Services_) { logger().information(Poco::format("ID: %Lu Type: %s EndPoint: %s",Id,Svc.Type,Svc.PrivateEndPoint)); @@ -406,7 +406,7 @@ namespace uCentral { Obj.set(KafkaTopics::ServiceEvents::Fields::PUBLIC,MyPublicEndPoint_); Obj.set(KafkaTopics::ServiceEvents::Fields::PRIVATE,MyPrivateEndPoint_); Obj.set(KafkaTopics::ServiceEvents::Fields::KEY,MyHash_); - Obj.set(KafkaTopics::ServiceEvents::Fields::VERSION,Version_); + Obj.set(KafkaTopics::ServiceEvents::Fields::VRSN,Version_); std::stringstream ResultText; Poco::JSON::Stringifier::stringify(Obj, ResultText); return ResultText.str(); diff --git a/ucentralfms.properties b/ucentralfms.properties index 1e4132e..5094a68 100644 --- a/ucentralfms.properties +++ b/ucentralfms.properties @@ -58,6 +58,7 @@ alb.port = 16104 # Kafka # ucentral.kafka.group.id = firmware +ucentral.kafka.client.id = firmware1 ucentral.kafka.enable = true ucentral.kafka.brokerlist = a1.arilia.com:9092 ucentral.kafka.auto.commit = false