stephb9959
2023-09-24 10:54:45 -07:00
parent 1d077b945d
commit e71b83ced7
3 changed files with 59 additions and 139 deletions

2
build
View File

@@ -1 +1 @@
31
32

View File

@@ -6,6 +6,7 @@
#include "fmt/format.h"
#include "framework/MicroServiceFuncs.h"
#include "cppkafka/utils/consumer_dispatcher.h"
namespace OpenWifi {
@@ -159,45 +160,49 @@ namespace OpenWifi {
}
});
bool AutoCommit = MicroServiceConfigGetBool("openwifi.kafka.auto.commit", false);
auto BatchSize = MicroServiceConfigGetInt("openwifi.kafka.consumer.batchsize", 100);
// bool AutoCommit = MicroServiceConfigGetBool("openwifi.kafka.auto.commit", false);
// auto BatchSize = MicroServiceConfigGetInt("openwifi.kafka.consumer.batchsize", 100);
Types::StringVec Topics;
KafkaManager()->Topics(Topics);
std::for_each(Topics_.begin(),Topics_.end(),
[&](const std::string & T) { Topics.emplace_back(T); });
Consumer.subscribe(Topics);
Running_ = true;
std::vector<cppkafka::Message> MsgVec;
while (Running_) {
try {
MsgVec.clear();
MsgVec.reserve(BatchSize);
MsgVec = Consumer.poll_batch(BatchSize, std::chrono::milliseconds(1000));
for (auto const &Msg : MsgVec) {
if (!Msg)
continue;
if (Msg.get_error()) {
if (!Msg.is_eof()) {
poco_error(Logger_,
fmt::format("Error: {}", Msg.get_error().to_string()));
Dispatcher_ = std::make_unique<cppkafka::ConsumerDispatcher>(Consumer);
Dispatcher_->run(
// Callback executed whenever a new message is consumed
[&](cppkafka::Message msg) {
// Print the key (if any)
std::lock_guard G(ConsumerMutex_);
auto It = Notifiers_.find(msg.get_topic());
if (It != Notifiers_.end()) {
const auto &FL = It->second;
for (const auto &[CallbackFunc, _] : FL) {
try {
CallbackFunc(msg.get_key(), msg.get_payload());
} catch(const Poco::Exception &E) {
} catch(...) {
}
if (!AutoCommit)
Consumer.commit(Msg);
continue;
}
KafkaManager()->Dispatch(Msg.get_topic().c_str(), Msg.get_key(), Msg.get_payload());
if (!AutoCommit)
Consumer.commit(Msg);
}
} catch (const cppkafka::HandleException &E) {
poco_warning(Logger_,
fmt::format("Caught a Kafka exception (consumer): {}", E.what()));
} catch (const Poco::Exception &E) {
Logger_.log(E);
} catch (...) {
poco_error(Logger_, "std::exception");
Consumer.commit(msg);
},
// Whenever there's an error (other than the EOF soft error)
[&Logger_](cppkafka::Error error) {
poco_warning(Logger_,fmt::format("Error: {}", error.to_string()));
},
// Whenever EOF is reached on a partition, print this
[&Logger_](cppkafka::ConsumerDispatcher::EndOfFile, const cppkafka::TopicPartition& topic_partition) {
poco_debug(Logger_,fmt::format("Partition {} EOF", topic_partition.get_partition()));
}
}
);
Consumer.unsubscribe();
poco_information(Logger_, "Stopped...");
}
@@ -225,7 +230,6 @@ namespace OpenWifi {
void KafkaConsumer::Start() {
if (!Running_) {
Running_ = true;
Worker_.start(*this);
}
}
@@ -233,29 +237,16 @@ namespace OpenWifi {
void KafkaConsumer::Stop() {
if (Running_) {
Running_ = false;
Worker_.wakeUp();
if(Dispatcher_) {
Dispatcher_->stop();
}
Worker_.join();
}
}
void KafkaDispatcher::Start() {
if (!Running_) {
Running_ = true;
Worker_.start(*this);
}
}
void KafkaDispatcher::Stop() {
if (Running_) {
Running_ = false;
Queue_.wakeUpAll();
Worker_.join();
}
}
auto KafkaDispatcher::RegisterTopicWatcher(const std::string &Topic,
std::uint64_t KafkaConsumer::RegisterTopicWatcher(const std::string &Topic,
Types::TopicNotifyFunction &F) {
std::lock_guard G(Mutex_);
std::lock_guard G(ConsumerMutex_);
auto It = Notifiers_.find(Topic);
if (It == Notifiers_.end()) {
Types::TopicNotifyFunctionList L;
@@ -264,11 +255,12 @@ namespace OpenWifi {
} else {
It->second.emplace(It->second.end(), std::make_pair(F, FunctionId_));
}
Topics_.insert(Topic);
return FunctionId_++;
}
void KafkaDispatcher::UnregisterTopicWatcher(const std::string &Topic, int Id) {
std::lock_guard G(Mutex_);
void KafkaConsumer::UnregisterTopicWatcher(const std::string &Topic, int Id) {
std::lock_guard G(ConsumerMutex_);
auto It = Notifiers_.find(Topic);
if (It != Notifiers_.end()) {
Types::TopicNotifyFunctionList &L = It->second;
@@ -280,56 +272,17 @@ namespace OpenWifi {
}
}
void KafkaDispatcher::Dispatch(const char *Topic, const std::string &Key,
const std::string & Payload) {
std::lock_guard G(Mutex_);
auto It = Notifiers_.find(Topic);
if (It != Notifiers_.end()) {
Queue_.enqueueNotification(new KafkaMessage(Topic, Key, Payload));
}
}
void KafkaDispatcher::run() {
Poco::Logger &Logger_ =
Poco::Logger::create("KAFKA-DISPATCHER", KafkaManager()->Logger().getChannel());
poco_information(Logger_, "Starting...");
Poco::AutoPtr<Poco::Notification> Note(Queue_.waitDequeueNotification());
Utils::SetThreadName("kafka:dispatch");
while (Note && Running_) {
auto Msg = dynamic_cast<KafkaMessage *>(Note.get());
if (Msg != nullptr) {
auto It = Notifiers_.find(Msg->Topic());
if (It != Notifiers_.end()) {
const auto &FL = It->second;
for (const auto &[CallbackFunc, _] : FL) {
CallbackFunc(Msg->Key(), Msg->Payload());
}
}
}
Note = Queue_.waitDequeueNotification();
}
poco_information(Logger_, "Stopped...");
}
void KafkaDispatcher::Topics(std::vector<std::string> &T) {
T.clear();
for (const auto &[TopicName, _] : Notifiers_)
T.push_back(TopicName);
}
int KafkaManager::Start() {
if (!KafkaEnabled_)
return 0;
ConsumerThr_.Start();
ProducerThr_.Start();
Dispatcher_.Start();
return 0;
}
void KafkaManager::Stop() {
if (KafkaEnabled_) {
poco_information(Logger(), "Stopping...");
Dispatcher_.Stop();
ProducerThr_.Stop();
ConsumerThr_.Stop();
poco_information(Logger(), "Stopped...");
@@ -353,12 +306,6 @@ namespace OpenWifi {
}
}
void KafkaManager::Dispatch(const char *Topic, const std::string &Key,
const std::string &Payload) {
Dispatcher_.Dispatch(Topic, Key, Payload);
}
[[nodiscard]] std::string KafkaManager::WrapSystemId(const std::string & PayLoad) {
return fmt::format( R"lit({{ "system" : {{ "id" : {},
"host" : "{}" }},
@@ -366,23 +313,6 @@ namespace OpenWifi {
MicroServicePrivateEndPoint(), PayLoad ) ;
}
uint64_t KafkaManager::RegisterTopicWatcher(const std::string &Topic,
Types::TopicNotifyFunction &F) {
if (KafkaEnabled_) {
return Dispatcher_.RegisterTopicWatcher(Topic, F);
} else {
return 0;
}
}
void KafkaManager::UnregisterTopicWatcher(const std::string &Topic, uint64_t Id) {
if (KafkaEnabled_) {
Dispatcher_.UnregisterTopicWatcher(Topic, Id);
}
}
void KafkaManager::Topics(std::vector<std::string> &T) { Dispatcher_.Topics(T); }
void KafkaManager::PartitionAssignment(const cppkafka::TopicPartitionList &partitions) {
poco_information(
Logger(), fmt::format("Partition assigned: {}...", partitions.front().get_partition()));

View File

@@ -39,7 +39,7 @@ namespace OpenWifi {
void Produce(const char *Topic, const std::string &Key, const std::string & Payload);
private:
std::recursive_mutex Mutex_;
std::mutex Mutex_;
Poco::Thread Worker_;
mutable std::atomic_bool Running_ = false;
Poco::NotificationQueue Queue_;
@@ -47,33 +47,22 @@ namespace OpenWifi {
class KafkaConsumer : public Poco::Runnable {
public:
void run() override;
void Start();
void Stop();
private:
std::recursive_mutex Mutex_;
Poco::Thread Worker_;
std::mutex ConsumerMutex_;
Types::NotifyTable Notifiers_;
Poco::Thread Worker_;
mutable std::atomic_bool Running_ = false;
};
uint64_t FunctionId_ = 1;
std::unique_ptr<cppkafka::ConsumerDispatcher> Dispatcher_;
std::set<std::string> Topics_;
class KafkaDispatcher : public Poco::Runnable {
public:
void Start();
void Stop();
auto RegisterTopicWatcher(const std::string &Topic, Types::TopicNotifyFunction &F);
void run() override;
friend class KafkaManager;
std::uint64_t RegisterTopicWatcher(const std::string &Topic, Types::TopicNotifyFunction &F);
void UnregisterTopicWatcher(const std::string &Topic, int Id);
void Dispatch(const char *Topic, const std::string &Key, const std::string & Payload);
void run() override;
void Topics(std::vector<std::string> &T);
private:
std::recursive_mutex Mutex_;
Types::NotifyTable Notifiers_;
Poco::Thread Worker_;
mutable std::atomic_bool Running_ = false;
uint64_t FunctionId_ = 1;
Poco::NotificationQueue Queue_;
};
class KafkaManager : public SubSystemServer {
@@ -96,19 +85,20 @@ namespace OpenWifi {
void PostMessage(const char *topic, const std::string &key,
const Poco::JSON::Object &Object, bool WrapMessage = true);
void Dispatch(const char *Topic, const std::string &Key, const std::string &Payload);
[[nodiscard]] std::string WrapSystemId(const std::string & PayLoad);
[[nodiscard]] inline bool Enabled() const { return KafkaEnabled_; }
uint64_t RegisterTopicWatcher(const std::string &Topic, Types::TopicNotifyFunction &F);
void UnregisterTopicWatcher(const std::string &Topic, uint64_t Id);
void Topics(std::vector<std::string> &T);
inline std::uint64_t RegisterTopicWatcher(const std::string &Topic, Types::TopicNotifyFunction &F) {
return ConsumerThr_.RegisterTopicWatcher(Topic,F);
}
inline void UnregisterTopicWatcher(const std::string &Topic, uint64_t Id) {
return ConsumerThr_.UnregisterTopicWatcher(Topic,Id);
}
private:
bool KafkaEnabled_ = false;
std::string SystemInfoWrapper_;
KafkaProducer ProducerThr_;
KafkaConsumer ConsumerThr_;
KafkaDispatcher Dispatcher_;
void PartitionAssignment(const cppkafka::TopicPartitionList &partitions);
void PartitionRevocation(const cppkafka::TopicPartitionList &partitions);