Compare commits

...

1 Commits

Author SHA1 Message Date
smx_Morgan
2816d006a0 refactor: optimize host alert query (#2507) 2025-03-27 15:34:18 +08:00
4 changed files with 340 additions and 8 deletions

View File

@@ -101,7 +101,7 @@ func Start(alertc aconf.Alert, pushgwc pconf.Pushgw, syncStats *memsto.Stats, al
promClients *prom.PromClientMap, userCache *memsto.UserCacheType, userGroupCache *memsto.UserGroupCacheType, notifyRuleCache *memsto.NotifyRuleCacheType, notifyChannelCache *memsto.NotifyChannelCacheType, messageTemplateCache *memsto.MessageTemplateCacheType) {
alertSubscribeCache := memsto.NewAlertSubscribeCache(ctx, syncStats)
recordingRuleCache := memsto.NewRecordingRuleCache(ctx, syncStats)
targetsOfAlertRulesCache := memsto.NewTargetOfAlertRuleCache(ctx, alertc.Heartbeat.EngineName, syncStats)
targetsOfAlertRulesCache := memsto.NewTargetOfAlertRuleCache(ctx, alertc.Heartbeat.EngineName, syncStats, targetCache, alertRuleCache)
go models.InitNotifyConfig(ctx, alertc.Alerting.TemplatesDir)
go models.InitNotifyChannel(ctx)

View File

@@ -81,6 +81,13 @@ func (arc *AlertRuleCacheType) GetRuleIds() []int64 {
return list
}
// GetAll returns all alert rules
func (arc *AlertRuleCacheType) GetAll() map[int64]*models.AlertRule {
arc.RLock()
defer arc.RUnlock()
return arc.rules
}
func (arc *AlertRuleCacheType) SyncAlertRules() {
err := arc.syncAlertRules()
if err != nil {

View File

@@ -1,7 +1,10 @@
package memsto
import (
"encoding/json"
"log"
"regexp"
"strings"
"sync"
"time"
@@ -19,9 +22,17 @@ type TargetsOfAlertRuleCacheType struct {
sync.RWMutex
targets map[string]map[int64][]string // key: ident
targetCache *TargetCacheType
ruleCache *AlertRuleCacheType
targetsByGroup map[int64][]*models.Target
targetsByIdent map[string][]*models.Target
targetsByTag map[string][]*models.Target
allTargets map[string]*models.Target
}
func NewTargetOfAlertRuleCache(ctx *ctx.Context, engineName string, stats *Stats) *TargetsOfAlertRuleCacheType {
func NewTargetOfAlertRuleCache(ctx *ctx.Context, engineName string, stats *Stats, targetCache *TargetCacheType, ruleCache *AlertRuleCacheType) *TargetsOfAlertRuleCacheType {
tc := &TargetsOfAlertRuleCacheType{
statTotal: -1,
statLastUpdated: -1,
@@ -29,6 +40,8 @@ func NewTargetOfAlertRuleCache(ctx *ctx.Context, engineName string, stats *Stats
engineName: engineName,
stats: stats,
targets: make(map[string]map[int64][]string),
targetCache: targetCache,
ruleCache: ruleCache,
}
tc.SyncTargets()
@@ -86,15 +99,319 @@ func (tc *TargetsOfAlertRuleCacheType) loopSyncTargets() {
}
func (tc *TargetsOfAlertRuleCacheType) syncTargets() error {
m, err := models.GetTargetsOfHostAlertRule(tc.ctx, tc.engineName)
if err != nil {
return err
// 从缓存获取所有 targetmap
tc.updateTargetMaps()
m := make(map[string]map[int64][]string)
// 从缓存获取所有 host alert rule
rules := tc.ruleCache.GetAll()
hostrules := make(map[int64]*models.AlertRule)
for k, v := range rules {
if v.Prod == "host" {
hostrules[k] = v
}
}
logger.Debugf("get_targets_of_alert_rule total: %d engine_name:%s", len(m), tc.engineName)
for k, v := range m {
logger.Debugf("get_targets_of_alert_rule key:%s value:%v", k, v)
targetsByGroup := tc.targetsByGroup
targetsByIdent := tc.targetsByIdent
targetsByTag := tc.targetsByTag
for _, hr := range hostrules {
var rule *models.HostRuleConfig
if err := json.Unmarshal([]byte(hr.RuleConfig), &rule); err != nil {
logger.Errorf("rule:%d rule_config:%s, error:%v", hr.Id, hr.RuleConfig, err)
continue
}
if rule == nil {
logger.Errorf("rule:%d rule_config:%s, error:rule is nil", hr.Id, hr.RuleConfig)
continue
}
// 用于存放 tags 过滤的结果,先将所有 target 放到其中
resmap := make(map[int64]*models.Target)
for _, target := range tc.allTargets {
resmap[target.Id] = target
}
// 遍历 rule 的 queries根据不同的 key 进行过滤
// inMap 为符合条件的 targetnotInMap 为不符合条件的 target
// inMap 和 notInMap 可能都为 nil表示不需要过滤
for _, q := range rule.Queries {
var inMap map[int64]struct{}
var notInMap map[int64]struct{}
switch q.Key {
case "group_ids":
inMap, notInMap = filterGroupMap(targetsByGroup, q)
case "tags":
inMap, notInMap = filterTagMap(targetsByTag, q)
case "hosts":
inMap, notInMap = filterHostMap(targetsByIdent, q)
}
handleTargetFilterMap(resmap, inMap, notInMap)
}
// 将过滤后的结果放到 m 中
for _, target := range resmap {
if _, exists := m[target.EngineName]; !exists {
m[target.EngineName] = make(map[int64][]string)
}
if _, exists := m[target.EngineName][hr.Id]; !exists {
m[target.EngineName][hr.Id] = make([]string, 0)
}
m[target.EngineName][hr.Id] = append(m[target.EngineName][hr.Id], target.Ident)
}
}
tc.Set(m, 0, 0)
return nil
}
// 更新 target 相关的 map根据不同的 key包括 targetsByGroup, targetsByIdent, targetsByTag
func (tc *TargetsOfAlertRuleCacheType) updateTargetMaps() {
allTargets := tc.targetCache.GetAll()
targetsByGroup := make(map[int64][]*models.Target)
targetsByIdent := make(map[string][]*models.Target)
targetsByTag := make(map[string][]*models.Target)
for _, target := range allTargets {
if _, exists := targetsByGroup[target.GroupId]; !exists {
targetsByGroup[target.GroupId] = make([]*models.Target, 0)
}
targetsByGroup[target.GroupId] = append(targetsByGroup[target.GroupId], target)
if _, exists := targetsByIdent[target.Ident]; !exists {
targetsByIdent[target.Ident] = make([]*models.Target, 0)
}
targetsByIdent[target.Ident] = append(targetsByIdent[target.Ident], target)
// 将 hosttags 和 tags 都放到 targetsByTag 中
for _, tag := range target.HostTags {
if _, exists := targetsByTag[tag]; !exists {
targetsByTag[tag] = make([]*models.Target, 0)
}
targetsByTag[tag] = append(targetsByTag[tag], target)
}
tags := strings.Split(target.Tags, " ")
for _, tag := range tags {
if tag == "" {
continue
}
if _, exists := targetsByTag[tag]; !exists {
targetsByTag[tag] = make([]*models.Target, 0)
}
targetsByTag[tag] = append(targetsByTag[tag], target)
}
}
tc.targetsByGroup = targetsByGroup
tc.targetsByIdent = targetsByIdent
tc.targetsByTag = targetsByTag
tc.allTargets = allTargets
}
// 根据 query 过滤 group id map 中 符合条件和不符合条件的 target分别存放在 inMap 和 notInMap 中
// 当 q.Op == "==" 时,返回的 inMap 中包含所有符合条件的 target
// 当 q.Op == "!=" 时,返回的 notInMap 中包含所有不符合条件的 target
func filterGroupMap(targetMap map[int64][]*models.Target, q models.HostQuery) (inMap map[int64]struct{}, notInMap map[int64]struct{}) {
if q.Op == "==" {
inMap = make(map[int64]struct{})
// 遍历 q.Values将符合条件的 target 都放到新 map 中
for _, v := range q.Values {
key := v.(int64)
if targets, exists := targetMap[key]; exists {
// 筛选出符合条件的 target
for _, target := range targets {
inMap[target.Id] = struct{}{}
}
}
}
return inMap, nil
} else {
notInMap = make(map[int64]struct{})
// 筛选出不符合条件的 target
for _, v := range q.Values {
key := v.(int64)
if targets, exists := targetMap[key]; exists {
for _, target := range targets {
notInMap[target.Id] = struct{}{}
}
}
}
return nil, notInMap
}
}
// 针对 tags 过滤,返回两个 map一个是符合条件的 target一个是不符合条件的 target
// 因为同一个 target 可能存在多个 tag所以不能简单的将 tag 的 key 移除,而是需要知道具体的 target 是否需要移除
// 当 q.Op == "==" 时,返回的 inMap 中包含所有符合条件的 target
// 当 q.Op == "!=" 时,返回的 notInMap 中包含所有不符合条件的 target这时 inMap 为 nil
// 上级可根据 inMap 是否为 nil 来判断是 == 还是 !=
func filterTagMap(targetMap map[string][]*models.Target, q models.HostQuery) (inMap map[int64]struct{}, notInMap map[int64]struct{}) {
if q.Op == "==" {
inMap = make(map[int64]struct{})
notInMap = make(map[int64]struct{})
for _, v := range q.Values {
key := v.(string)
if targets, exists := targetMap[key]; exists {
// 筛选出符合条件的 target
for _, target := range targets {
inMap[target.Id] = struct{}{}
}
}
}
} else {
// 直接从 targetMap 中删除对应的 key
inMap = nil
notInMap = make(map[int64]struct{})
for _, v := range q.Values {
key := v.(string)
if targets, exists := targetMap[key]; exists {
// 筛选出不符合条件的 target
for _, target := range targets {
notInMap[target.Id] = struct{}{}
}
}
}
}
return inMap, notInMap
}
// // 根据 query 过滤 host map 中 符合条件和不符合条件的 target分别存放在 inMap 和 notInMap 中
// 当 q.Op == "==" 时,返回的 inMap 中包含所有符合条件的 target
// 当 q.Op == "!=" 时,返回的 notInMap 中包含所有不符合条件的 target
// 当 q.Op == "=~" 时,模糊过滤,返回的 inMap 中包含所有符合条件的 target
// 当 q.Op == "!~" 时,模糊过滤,返回的 notInMap 中包含所有不符合条件的 target
// 在 ~ 的情况下value 可能为通配符 * 或 %,支持模糊匹配
func filterHostMap(targetMap map[string][]*models.Target, q models.HostQuery) (inMap map[int64]struct{}, notInMap map[int64]struct{}) {
if q.Op == "==" {
inMap = make(map[int64]struct{})
// 遍历 q.Values将符合条件的 target 都放到 inMap 中
for _, v := range q.Values {
key := v.(string)
if targets, exists := targetMap[key]; exists {
for _, target := range targets {
inMap[target.Id] = struct{}{}
}
}
}
return inMap, nil
} else if q.Op == "!=" {
notInMap = make(map[int64]struct{})
// 遍历 q.Values将不符合条件的 target 都放到 notInMap 中
for _, v := range q.Values {
key := v.(string)
if targets, exists := targetMap[key]; exists {
for _, target := range targets {
notInMap[target.Id] = struct{}{}
}
}
}
return nil, notInMap
} else if q.Op == "=~" {
inMap = make(map[int64]struct{})
for _, v := range q.Values {
pattern := v.(string)
regex := likePatternToRegex(pattern)
re, err := regexp.Compile(regex)
if err != nil {
logger.Errorf("failed to compile regex:%s error:%v", regex, err)
continue
}
for key := range targetMap {
if re.MatchString(key) {
for _, target := range targetMap[key] {
inMap[target.Id] = struct{}{}
}
}
}
}
return inMap, nil
} else if q.Op == "!~" {
notInMap = make(map[int64]struct{})
for _, v := range q.Values {
pattern := v.(string)
regex := likePatternToRegex(pattern)
re, err := regexp.Compile(regex)
if err != nil {
logger.Errorf("failed to compile regex:%s error:%v", regex, err)
continue
}
for key := range targetMap {
if re.MatchString(key) {
for _, target := range targetMap[key] {
notInMap[target.Id] = struct{}{}
}
}
}
}
return nil, notInMap
}
return nil, nil
}
// 将 like 模式转换为正则表达式
// % * 匹配任意个字符
func likePatternToRegex(pattern string) string {
var sb strings.Builder
// 添加正则表达式起始标记
sb.WriteString("^")
for _, ch := range pattern {
switch ch {
case '%':
case '*':
// % 匹配任意个字符
sb.WriteString(".*")
default:
// 对于其他特殊正则字符,需要转义
if strings.ContainsRune(`.+?()|[]{}^$\\`, ch) {
sb.WriteString("\\")
}
sb.WriteRune(ch)
}
}
// 添加正则表达式结束标记
sb.WriteString("$")
return sb.String()
}
// 根据 inMap 和 notInMap 过滤 targetMap将不符合条件的 target 删除
// inMap 中包含的 target 都是符合条件的 target
// notInMap 中包含的 target 都是不符合条件的 target
// resmap 为需要过滤的 map, 过滤后的结果会直接修改 resmap
func handleTargetFilterMap(targetMap map[int64]*models.Target, inMap map[int64]struct{}, notInMap map[int64]struct{}) {
if inMap != nil {
for key := range targetMap {
if _, exists := inMap[key]; !exists {
delete(targetMap, key)
}
}
}
if notInMap != nil {
for key := range targetMap {
if _, exists := notInMap[key]; exists {
delete(targetMap, key)
}
}
}
}

View File

@@ -90,6 +90,14 @@ func (tc *TargetCacheType) Gets(idents []string) []*models.Target {
return targets
}
// 获取所有的 target
func (tc *TargetCacheType) GetAll() map[string]*models.Target {
tc.RLock()
defer tc.RUnlock()
return tc.targets
}
func (tc *TargetCacheType) GetOffsetHost(targets []*models.Target, now, offset int64) map[string]int64 {
tc.RLock()
defer tc.RUnlock()