Compare commits

...

1 Commits

Author SHA1 Message Date
Xu Bin
de89dc2963 optimize host query (#2385) 2024-12-26 14:48:45 +08:00
7 changed files with 257 additions and 54 deletions

View File

@@ -96,7 +96,7 @@ func Start(alertc aconf.Alert, pushgwc pconf.Pushgw, syncStats *memsto.Stats, al
promClients *prom.PromClientMap, tdendgineClients *tdengine.TdengineClientMap, userCache *memsto.UserCacheType, userGroupCache *memsto.UserGroupCacheType) {
alertSubscribeCache := memsto.NewAlertSubscribeCache(ctx, syncStats)
recordingRuleCache := memsto.NewRecordingRuleCache(ctx, syncStats)
targetsOfAlertRulesCache := memsto.NewTargetOfAlertRuleCache(ctx, alertc.Heartbeat.EngineName, syncStats)
targetsOfAlertRulesCache := memsto.NewTargetOfAlertRuleCache(ctx, alertc.Heartbeat.EngineName, syncStats, targetCache)
go models.InitNotifyConfig(ctx, alertc.Alerting.TemplatesDir)

View File

@@ -118,7 +118,7 @@ func (s *Scheduler) syncAlertRules() {
}
processor := process.NewProcessor(s.aconf.Heartbeat.EngineName, rule, dsId, s.alertRuleCache, s.targetCache, s.targetsOfAlertRuleCache, s.busiGroupCache, s.alertMuteCache, s.datasourceCache, s.ctx, s.stats)
alertRule := NewAlertRuleWorker(rule, dsId, processor, s.promClients, s.tdengineClients, s.ctx)
alertRule := NewAlertRuleWorker(s.ctx, rule, dsId, processor, s.promClients, s.tdengineClients, s.targetCache)
alertRuleWorkers[alertRule.Hash()] = alertRule
}
} else if rule.IsHostRule() {
@@ -127,7 +127,7 @@ func (s *Scheduler) syncAlertRules() {
continue
}
processor := process.NewProcessor(s.aconf.Heartbeat.EngineName, rule, 0, s.alertRuleCache, s.targetCache, s.targetsOfAlertRuleCache, s.busiGroupCache, s.alertMuteCache, s.datasourceCache, s.ctx, s.stats)
alertRule := NewAlertRuleWorker(rule, 0, processor, s.promClients, s.tdengineClients, s.ctx)
alertRule := NewAlertRuleWorker(s.ctx, rule, 0, processor, s.promClients, s.tdengineClients, s.targetCache)
alertRuleWorkers[alertRule.Hash()] = alertRule
} else {
// 如果 rule 不是通过 prometheus engine 来告警的,则创建为 externalRule

View File

@@ -15,6 +15,7 @@ import (
"github.com/ccfos/nightingale/v6/alert/common"
"github.com/ccfos/nightingale/v6/alert/process"
"github.com/ccfos/nightingale/v6/memsto"
"github.com/ccfos/nightingale/v6/models"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/pkg/hash"
@@ -46,9 +47,9 @@ type AlertRuleWorker struct {
Scheduler *cron.Cron
HostAndDeviceIdentCache sync.Map
DeviceIdentHook func(paramQuery models.ParamQuery) ([]string, error)
DeviceIdentHook func(arw *AlertRuleWorker, paramQuery models.ParamQuery) ([]string, error)
TargetCache *memsto.TargetCacheType
}
const (
@@ -67,20 +68,20 @@ const (
Inner JoinType = "inner"
)
func NewAlertRuleWorker(rule *models.AlertRule, datasourceId int64, Processor *process.Processor, promClients *prom.PromClientMap, tdengineClients *tdengine.TdengineClientMap, ctx *ctx.Context) *AlertRuleWorker {
func NewAlertRuleWorker(ctx *ctx.Context, rule *models.AlertRule, datasourceId int64, Processor *process.Processor, promClients *prom.PromClientMap, tdengineClients *tdengine.TdengineClientMap, targetCache *memsto.TargetCacheType) *AlertRuleWorker {
arw := &AlertRuleWorker{
DatasourceId: datasourceId,
Quit: make(chan struct{}),
Rule: rule,
Processor: Processor,
PromClients: promClients,
TdengineClients: tdengineClients,
Ctx: ctx,
HostAndDeviceIdentCache: sync.Map{},
DeviceIdentHook: func(arw *AlertRuleWorker, paramQuery models.ParamQuery) ([]string, error) {
PromClients: promClients,
TdengineClients: tdengineClients,
Ctx: ctx,
DeviceIdentHook: func(paramQuery models.ParamQuery) ([]string, error) {
return nil, nil
},
TargetCache: targetCache,
}
interval := rule.PromEvalInterval
@@ -148,7 +149,6 @@ func (arw *AlertRuleWorker) Eval() {
return
}
arw.Processor.Stats.CounterRuleEval.WithLabelValues().Inc()
arw.HostAndDeviceIdentCache = sync.Map{}
typ := cachedRule.GetRuleType()
var (
@@ -557,35 +557,21 @@ func (arw *AlertRuleWorker) getHostIdents(paramQuery models.ParamQuery) ([]strin
var params []string
q, _ := json.Marshal(paramQuery.Query)
cacheKey := "Host_" + string(q)
value, hit := arw.HostAndDeviceIdentCache.Load(cacheKey)
if idents, ok := value.([]string); hit && ok {
params = idents
return params, nil
}
var queries []models.HostQuery
err := json.Unmarshal(q, &queries)
if err != nil {
return nil, err
}
hostsQuery := models.GetHostsQuery(queries)
session := models.TargetFilterQueryBuild(arw.Ctx, hostsQuery, 0, 0)
var lst []*models.Target
err = session.Find(&lst).Error
if err != nil {
return nil, err
hosts := arw.TargetCache.GetHostIdentsQuery(queries)
for i := range hosts {
params = append(params, hosts[i].Ident)
}
for i := range lst {
params = append(params, lst[i].Ident)
}
arw.HostAndDeviceIdentCache.Store(cacheKey, params)
return params, nil
}
func (arw *AlertRuleWorker) getDeviceIdents(paramQuery models.ParamQuery) ([]string, error) {
return arw.DeviceIdentHook(arw, paramQuery)
return arw.DeviceIdentHook(paramQuery)
}
// 生成所有排列组合

View File

@@ -28,17 +28,19 @@ func (rt *Router) targetGetsByHostFilter(c *gin.Context) {
var f TargetQuery
ginx.BindJSON(c, &f)
query := models.GetHostsQuery(f.Filters)
hosts, err := models.TargetGetsByFilter(rt.Ctx, query, f.Limit, (f.P-1)*f.Limit)
ginx.Dangerous(err)
total, err := models.TargetCountByFilter(rt.Ctx, query)
ginx.Dangerous(err)
// todo 这里也走缓存吗?有 limit 和 offset
hosts := rt.TargetCache.GetHostIdentsQuery(f.Filters)
//query := models.GetHostsQuery(f.Filters)
//
//hosts, err := models.TargetGetsByFilter(rt.Ctx, query, f.Limit, (f.P-1)*f.Limit)
//ginx.Dangerous(err)
//
//total, err := models.TargetCountByFilter(rt.Ctx, query)
//ginx.Dangerous(err)
ginx.NewRender(c).Data(gin.H{
"list": hosts,
"total": total,
"total": len(hosts),
}, nil)
}
@@ -537,7 +539,7 @@ func (rt *Router) checkTargetPerm(c *gin.Context, idents []string) {
func (rt *Router) targetsOfAlertRule(c *gin.Context) {
engineName := ginx.QueryStr(c, "engine_name", "")
m, err := models.GetTargetsOfHostAlertRule(rt.Ctx, engineName)
m, err := models.GetTargetsOfHostAlertRule(rt.Ctx, engineName, rt.TargetCache.GetHostIdentsQuery)
ret := make(map[string]map[int64][]string)
for en, v := range m {
if en != engineName {

View File

@@ -19,9 +19,11 @@ type TargetsOfAlertRuleCacheType struct {
sync.RWMutex
targets map[string]map[int64][]string // key: ident
targetCache *TargetCacheType
}
func NewTargetOfAlertRuleCache(ctx *ctx.Context, engineName string, stats *Stats) *TargetsOfAlertRuleCacheType {
func NewTargetOfAlertRuleCache(ctx *ctx.Context, engineName string, stats *Stats, targetCache *TargetCacheType) *TargetsOfAlertRuleCacheType {
tc := &TargetsOfAlertRuleCacheType{
statTotal: -1,
statLastUpdated: -1,
@@ -29,6 +31,7 @@ func NewTargetOfAlertRuleCache(ctx *ctx.Context, engineName string, stats *Stats
engineName: engineName,
stats: stats,
targets: make(map[string]map[int64][]string),
targetCache: targetCache,
}
tc.SyncTargets()
@@ -86,7 +89,7 @@ func (tc *TargetsOfAlertRuleCacheType) loopSyncTargets() {
}
func (tc *TargetsOfAlertRuleCacheType) syncTargets() error {
m, err := models.GetTargetsOfHostAlertRule(tc.ctx, tc.engineName)
m, err := models.GetTargetsOfHostAlertRule(tc.ctx, tc.engineName, tc.targetCache.GetHostIdentsQuery)
if err != nil {
return err
}

View File

@@ -5,6 +5,8 @@ import (
"encoding/json"
"log"
"math"
"regexp"
"strings"
"sync"
"time"
@@ -27,7 +29,8 @@ type TargetCacheType struct {
redis storage.Redis
sync.RWMutex
targets map[string]*models.Target // key: ident
targets map[string]*models.Target // key: ident
groupToIdents map[int64][]string // key: group_id
}
func NewTargetCache(ctx *ctx.Context, stats *Stats, redis storage.Redis) *TargetCacheType {
@@ -61,9 +64,10 @@ func (tc *TargetCacheType) StatChanged(total, lastUpdated int64) bool {
return true
}
func (tc *TargetCacheType) Set(m map[string]*models.Target, total, lastUpdated int64) {
func (tc *TargetCacheType) Set(m map[string]*models.Target, groupToIdents map[int64][]string, total, lastUpdated int64) {
tc.Lock()
tc.targets = m
tc.groupToIdents = groupToIdents
tc.Unlock()
// only one goroutine used, so no need lock
@@ -160,6 +164,7 @@ func (tc *TargetCacheType) syncTargets() error {
}
m := make(map[string]*models.Target)
groupToIdents := make(map[int64][]string)
metaMap := tc.GetHostMetas(lst)
if len(metaMap) > 0 {
@@ -172,9 +177,12 @@ func (tc *TargetCacheType) syncTargets() error {
for i := 0; i < len(lst); i++ {
m[lst[i].Ident] = lst[i]
for _, groupID := range lst[i].GroupIds {
groupToIdents[groupID] = append(groupToIdents[groupID], lst[i].Ident)
}
}
tc.Set(m, stat.Total, stat.LastUpdated)
tc.Set(m, groupToIdents, stat.Total, stat.LastUpdated)
ms := time.Since(start).Milliseconds()
tc.stats.GaugeCronDuration.WithLabelValues("sync_targets").Set(float64(ms))
@@ -292,3 +300,214 @@ func (tc *TargetCacheType) GetHostMetas(targets []*models.Target) map[string]*mo
return metaMap
}
// getAllHostIdentsWithoutLock returns the idents of every cached target.
// Caller must already hold tc's lock (read or write).
func (tc *TargetCacheType) getAllHostIdentsWithoutLock() []string {
	idents := make([]string, 0, len(tc.targets))
	for ident := range tc.targets {
		idents = append(idents, ident)
	}
	return idents
}
// getHostIdentsByGroupIdsWithoutLock returns the idents of targets belonging
// to any of the given business groups. A target that belongs to several of
// the queried groups is reported only once (the previous version appended it
// once per group, and the duplicates survived the later intersection step).
// Caller must already hold tc's lock.
func (tc *TargetCacheType) getHostIdentsByGroupIdsWithoutLock(groupIDs []int64) []string {
	var targetIdents []string
	seen := make(map[string]struct{})
	for _, groupID := range groupIDs {
		// a missing key yields a nil slice, which range handles fine
		for _, ident := range tc.groupToIdents[groupID] {
			if _, dup := seen[ident]; dup {
				continue
			}
			seen[ident] = struct{}{}
			targetIdents = append(targetIdents, ident)
		}
	}
	return targetIdents
}
// getHostIdentsExcludeGroupIdsWithoutLock returns the idents of targets that
// belong to none of the given business groups.
// Caller must already hold tc's lock.
func (tc *TargetCacheType) getHostIdentsExcludeGroupIdsWithoutLock(groupIDs []int64) []string {
	exclude := make(map[string]struct{})
	for _, id := range groupIDs {
		// a missing key yields a nil slice, which range handles fine
		for _, ident := range tc.groupToIdents[id] {
			exclude[ident] = struct{}{}
		}
	}

	var targetIdents []string
	for ident := range tc.targets {
		if _, ok := exclude[ident]; !ok {
			targetIdents = append(targetIdents, ident)
		}
	}
	return targetIdents
}
// getHostsByIdentsWithoutLock resolves idents to cached target objects;
// idents with no cache entry are silently skipped.
// Caller must already hold tc's lock.
func (tc *TargetCacheType) getHostsByIdentsWithoutLock(idents []string) []*models.Target {
	var result []*models.Target
	for _, key := range idents {
		target, ok := tc.targets[key]
		if !ok {
			continue
		}
		result = append(result, target)
	}
	return result
}
// getHostIdentsExcludeIdentsWithoutLock returns the idents of all cached
// targets except those listed in idents (exact match).
// Caller must already hold tc's lock.
func (tc *TargetCacheType) getHostIdentsExcludeIdentsWithoutLock(idents []string) []string {
	exclude := make(map[string]struct{}, len(idents))
	for _, id := range idents {
		exclude[id] = struct{}{}
	}

	var targetIdents []string
	for ident := range tc.targets {
		if _, ok := exclude[ident]; !ok {
			targetIdents = append(targetIdents, ident)
		}
	}
	return targetIdents
}
// getHostIdentsMatchIdentsWithoutLock returns the idents of cached targets
// matching any of the given wildcard patterns ("*" matches any substring).
// Caller must already hold tc's lock.
func (tc *TargetCacheType) getHostIdentsMatchIdentsWithoutLock(identPatterns []string) []string {
	// Compile each pattern once, instead of once per (target, pattern) pair
	// as the previous version did via regexp.Match inside the loop.
	res := make([]*regexp.Regexp, 0, len(identPatterns))
	for _, pattern := range identPatterns {
		// translate the "*" wildcard into a regexp
		re, err := regexp.Compile(strings.ReplaceAll(pattern, "*", ".*"))
		if err != nil {
			// an invalid pattern can never match; same outcome as before,
			// where regexp.Match errors were treated as no-match
			continue
		}
		res = append(res, re)
	}

	var targetIdents []string
	for ident := range tc.targets {
		for _, re := range res {
			if re.MatchString(ident) {
				targetIdents = append(targetIdents, ident)
				break
			}
		}
	}
	return targetIdents
}
// getHostIdentsByTagsWithoutLock returns the idents of cached targets that
// carry at least one of the given tags.
// Caller must already hold tc's lock.
func (tc *TargetCacheType) getHostIdentsByTagsWithoutLock(tags []string) []string {
	wanted := make(map[string]struct{}, len(tags))
	for _, t := range tags {
		wanted[t] = struct{}{}
	}

	var targetIdents []string
	for ident, target := range tc.targets {
		for _, t := range target.TagsJSON {
			if _, hit := wanted[t]; hit {
				targetIdents = append(targetIdents, ident)
				break
			}
		}
	}
	return targetIdents
}
// getHostIdentsExcludeTagsWithoutLock returns the idents of cached targets
// carrying none of the given tags.
// Caller must already hold tc's lock.
func (tc *TargetCacheType) getHostIdentsExcludeTagsWithoutLock(tags []string) []string {
	// Build the query-tag set once; the previous version rebuilt a map of
	// each target's own tags on every loop iteration.
	tagSet := make(map[string]struct{}, len(tags))
	for _, tag := range tags {
		tagSet[tag] = struct{}{}
	}

	var targetIdents []string
	for ident, target := range tc.targets {
		exclude := false
		for _, tag := range target.TagsJSON {
			if _, ok := tagSet[tag]; ok {
				exclude = true
				break
			}
		}
		if !exclude {
			targetIdents = append(targetIdents, ident)
		}
	}
	return targetIdents
}
// getHostIdentsMatchExcludeIdentsWithoutLock returns the idents of cached
// targets that match none of the given wildcard patterns ("*" matches any
// substring). Caller must already hold tc's lock.
func (tc *TargetCacheType) getHostIdentsMatchExcludeIdentsWithoutLock(identPatterns []string) []string {
	// NOTE: the previous version also built an `exclude` set from the raw
	// patterns that was never read — dead code, removed here.
	// Compile each pattern once, instead of once per (target, pattern) pair.
	res := make([]*regexp.Regexp, 0, len(identPatterns))
	for _, pattern := range identPatterns {
		re, err := regexp.Compile(strings.ReplaceAll(pattern, "*", ".*"))
		if err != nil {
			// an invalid pattern matches nothing, so it excludes nothing;
			// same outcome as the previous ignored regexp.Match error
			continue
		}
		res = append(res, re)
	}

	var targetIdents []string
	for ident := range tc.targets {
		matched := false
		for _, re := range res {
			if re.MatchString(ident) {
				matched = true
				break
			}
		}
		if !matched {
			targetIdents = append(targetIdents, ident)
		}
	}
	return targetIdents
}
// GetHostIdentsQuery evaluates host filter queries against the in-memory
// target cache and returns the matching targets. The conditions of all
// queries are ANDed together (set intersection). Supported keys are
// "group_ids", "tags" and "hosts"; any other key (e.g. all_hosts) leaves
// the current result set unchanged.
func (tc *TargetCacheType) GetHostIdentsQuery(queries []models.HostQuery) []*models.Target {
	// This method only reads cache state, so a read lock suffices and lets
	// concurrent queries proceed (the previous version took the write lock).
	tc.RLock()
	defer tc.RUnlock()

	targetIdents := tc.getAllHostIdentsWithoutLock()
	for _, q := range queries {
		var cur []string
		switch q.Key {
		case "group_ids":
			ids := models.ParseInt64(q.Values)
			if q.Op == "==" {
				cur = tc.getHostIdentsByGroupIdsWithoutLock(ids)
			} else {
				cur = tc.getHostIdentsExcludeGroupIdsWithoutLock(ids)
			}
		case "tags":
			var tags []string
			for _, v := range q.Values {
				// values come from user input: skip anything that is not a
				// string instead of panicking on the type assertion
				if s, ok := v.(string); ok {
					tags = append(tags, s)
				}
			}
			if q.Op == "==" {
				cur = tc.getHostIdentsByTagsWithoutLock(tags)
			} else {
				cur = tc.getHostIdentsExcludeTagsWithoutLock(tags)
			}
		case "hosts":
			var idents []string
			for _, v := range q.Values {
				// same defensive handling as "tags" above
				if s, ok := v.(string); ok {
					idents = append(idents, s)
				}
			}
			switch q.Op {
			case "==":
				cur = idents
			case "!=":
				cur = tc.getHostIdentsExcludeIdentsWithoutLock(idents)
			case "=~":
				cur = tc.getHostIdentsMatchIdentsWithoutLock(idents)
			case "!~":
				cur = tc.getHostIdentsMatchExcludeIdentsWithoutLock(idents)
			}
		default:
			// all_hosts and unknown keys do not narrow the current set
			cur = targetIdents
		}
		targetIdents = intersection(targetIdents, cur)
	}
	return tc.getHostsByIdentsWithoutLock(targetIdents)
}
// intersection returns the elements of b that also appear in a, preserving
// b's order. Each element is reported at most once, so duplicates in either
// input cannot multiply entries in the result (the previous version kept
// every duplicate occurrence from b, which could surface the same target
// several times in the final query result).
func intersection(a, b []string) []string {
	inA := make(map[string]struct{}, len(a))
	for _, v := range a {
		inA[v] = struct{}{}
	}

	var c []string
	seen := make(map[string]struct{}, len(b))
	for _, v := range b {
		if _, ok := inA[v]; !ok {
			continue
		}
		if _, dup := seen[v]; dup {
			continue
		}
		seen[v] = struct{}{}
		c = append(c, v)
	}
	return c
}

View File

@@ -1231,7 +1231,7 @@ func AlertRuleUpgradeToV6(ctx *ctx.Context, dsm map[string]Datasource) error {
return nil
}
func GetTargetsOfHostAlertRule(ctx *ctx.Context, engineName string) (map[string]map[int64][]string, error) {
func GetTargetsOfHostAlertRule(ctx *ctx.Context, engineName string, getTargetFunc func(queries []HostQuery) []*Target) (map[string]map[int64][]string, error) {
if !ctx.IsCenter {
m, err := poster.GetByUrls[map[string]map[int64][]string](ctx, "/v1/n9e/targets-of-alert-rule?engine_name="+engineName)
return m, err
@@ -1255,16 +1255,9 @@ func GetTargetsOfHostAlertRule(ctx *ctx.Context, engineName string) (map[string]
continue
}
query := GetHostsQuery(rule.Queries)
session := TargetFilterQueryBuild(ctx, query, 0, 0)
var lst []*Target
err := session.Find(&lst).Error
if err != nil {
logger.Errorf("failed to query targets: %v", err)
continue
}
hosts := getTargetFunc(rule.Queries)
for _, target := range lst {
for _, target := range hosts {
if _, exists := m[target.EngineName]; !exists {
m[target.EngineName] = make(map[int64][]string)
}