mirror of
https://github.com/ccfos/nightingale.git
synced 2026-03-03 06:29:16 +00:00
Compare commits
1 Commits
release-18
...
optimize-h
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
de89dc2963 |
@@ -96,7 +96,7 @@ func Start(alertc aconf.Alert, pushgwc pconf.Pushgw, syncStats *memsto.Stats, al
|
||||
promClients *prom.PromClientMap, tdendgineClients *tdengine.TdengineClientMap, userCache *memsto.UserCacheType, userGroupCache *memsto.UserGroupCacheType) {
|
||||
alertSubscribeCache := memsto.NewAlertSubscribeCache(ctx, syncStats)
|
||||
recordingRuleCache := memsto.NewRecordingRuleCache(ctx, syncStats)
|
||||
targetsOfAlertRulesCache := memsto.NewTargetOfAlertRuleCache(ctx, alertc.Heartbeat.EngineName, syncStats)
|
||||
targetsOfAlertRulesCache := memsto.NewTargetOfAlertRuleCache(ctx, alertc.Heartbeat.EngineName, syncStats, targetCache)
|
||||
|
||||
go models.InitNotifyConfig(ctx, alertc.Alerting.TemplatesDir)
|
||||
|
||||
|
||||
@@ -118,7 +118,7 @@ func (s *Scheduler) syncAlertRules() {
|
||||
}
|
||||
processor := process.NewProcessor(s.aconf.Heartbeat.EngineName, rule, dsId, s.alertRuleCache, s.targetCache, s.targetsOfAlertRuleCache, s.busiGroupCache, s.alertMuteCache, s.datasourceCache, s.ctx, s.stats)
|
||||
|
||||
alertRule := NewAlertRuleWorker(rule, dsId, processor, s.promClients, s.tdengineClients, s.ctx)
|
||||
alertRule := NewAlertRuleWorker(s.ctx, rule, dsId, processor, s.promClients, s.tdengineClients, s.targetCache)
|
||||
alertRuleWorkers[alertRule.Hash()] = alertRule
|
||||
}
|
||||
} else if rule.IsHostRule() {
|
||||
@@ -127,7 +127,7 @@ func (s *Scheduler) syncAlertRules() {
|
||||
continue
|
||||
}
|
||||
processor := process.NewProcessor(s.aconf.Heartbeat.EngineName, rule, 0, s.alertRuleCache, s.targetCache, s.targetsOfAlertRuleCache, s.busiGroupCache, s.alertMuteCache, s.datasourceCache, s.ctx, s.stats)
|
||||
alertRule := NewAlertRuleWorker(rule, 0, processor, s.promClients, s.tdengineClients, s.ctx)
|
||||
alertRule := NewAlertRuleWorker(s.ctx, rule, 0, processor, s.promClients, s.tdengineClients, s.targetCache)
|
||||
alertRuleWorkers[alertRule.Hash()] = alertRule
|
||||
} else {
|
||||
// 如果 rule 不是通过 prometheus engine 来告警的,则创建为 externalRule
|
||||
|
||||
@@ -15,6 +15,7 @@ import (
|
||||
|
||||
"github.com/ccfos/nightingale/v6/alert/common"
|
||||
"github.com/ccfos/nightingale/v6/alert/process"
|
||||
"github.com/ccfos/nightingale/v6/memsto"
|
||||
"github.com/ccfos/nightingale/v6/models"
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
"github.com/ccfos/nightingale/v6/pkg/hash"
|
||||
@@ -46,9 +47,9 @@ type AlertRuleWorker struct {
|
||||
|
||||
Scheduler *cron.Cron
|
||||
|
||||
HostAndDeviceIdentCache sync.Map
|
||||
DeviceIdentHook func(paramQuery models.ParamQuery) ([]string, error)
|
||||
|
||||
DeviceIdentHook func(arw *AlertRuleWorker, paramQuery models.ParamQuery) ([]string, error)
|
||||
TargetCache *memsto.TargetCacheType
|
||||
}
|
||||
|
||||
const (
|
||||
@@ -67,20 +68,20 @@ const (
|
||||
Inner JoinType = "inner"
|
||||
)
|
||||
|
||||
func NewAlertRuleWorker(rule *models.AlertRule, datasourceId int64, Processor *process.Processor, promClients *prom.PromClientMap, tdengineClients *tdengine.TdengineClientMap, ctx *ctx.Context) *AlertRuleWorker {
|
||||
func NewAlertRuleWorker(ctx *ctx.Context, rule *models.AlertRule, datasourceId int64, Processor *process.Processor, promClients *prom.PromClientMap, tdengineClients *tdengine.TdengineClientMap, targetCache *memsto.TargetCacheType) *AlertRuleWorker {
|
||||
arw := &AlertRuleWorker{
|
||||
DatasourceId: datasourceId,
|
||||
Quit: make(chan struct{}),
|
||||
Rule: rule,
|
||||
Processor: Processor,
|
||||
|
||||
PromClients: promClients,
|
||||
TdengineClients: tdengineClients,
|
||||
Ctx: ctx,
|
||||
HostAndDeviceIdentCache: sync.Map{},
|
||||
DeviceIdentHook: func(arw *AlertRuleWorker, paramQuery models.ParamQuery) ([]string, error) {
|
||||
PromClients: promClients,
|
||||
TdengineClients: tdengineClients,
|
||||
Ctx: ctx,
|
||||
DeviceIdentHook: func(paramQuery models.ParamQuery) ([]string, error) {
|
||||
return nil, nil
|
||||
},
|
||||
TargetCache: targetCache,
|
||||
}
|
||||
|
||||
interval := rule.PromEvalInterval
|
||||
@@ -148,7 +149,6 @@ func (arw *AlertRuleWorker) Eval() {
|
||||
return
|
||||
}
|
||||
arw.Processor.Stats.CounterRuleEval.WithLabelValues().Inc()
|
||||
arw.HostAndDeviceIdentCache = sync.Map{}
|
||||
|
||||
typ := cachedRule.GetRuleType()
|
||||
var (
|
||||
@@ -557,35 +557,21 @@ func (arw *AlertRuleWorker) getHostIdents(paramQuery models.ParamQuery) ([]strin
|
||||
var params []string
|
||||
q, _ := json.Marshal(paramQuery.Query)
|
||||
|
||||
cacheKey := "Host_" + string(q)
|
||||
value, hit := arw.HostAndDeviceIdentCache.Load(cacheKey)
|
||||
if idents, ok := value.([]string); hit && ok {
|
||||
params = idents
|
||||
return params, nil
|
||||
}
|
||||
|
||||
var queries []models.HostQuery
|
||||
err := json.Unmarshal(q, &queries)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
hostsQuery := models.GetHostsQuery(queries)
|
||||
session := models.TargetFilterQueryBuild(arw.Ctx, hostsQuery, 0, 0)
|
||||
var lst []*models.Target
|
||||
err = session.Find(&lst).Error
|
||||
if err != nil {
|
||||
return nil, err
|
||||
hosts := arw.TargetCache.GetHostIdentsQuery(queries)
|
||||
for i := range hosts {
|
||||
params = append(params, hosts[i].Ident)
|
||||
}
|
||||
for i := range lst {
|
||||
params = append(params, lst[i].Ident)
|
||||
}
|
||||
arw.HostAndDeviceIdentCache.Store(cacheKey, params)
|
||||
return params, nil
|
||||
}
|
||||
|
||||
func (arw *AlertRuleWorker) getDeviceIdents(paramQuery models.ParamQuery) ([]string, error) {
|
||||
return arw.DeviceIdentHook(arw, paramQuery)
|
||||
return arw.DeviceIdentHook(paramQuery)
|
||||
}
|
||||
|
||||
// 生成所有排列组合
|
||||
|
||||
@@ -28,17 +28,19 @@ func (rt *Router) targetGetsByHostFilter(c *gin.Context) {
|
||||
var f TargetQuery
|
||||
ginx.BindJSON(c, &f)
|
||||
|
||||
query := models.GetHostsQuery(f.Filters)
|
||||
|
||||
hosts, err := models.TargetGetsByFilter(rt.Ctx, query, f.Limit, (f.P-1)*f.Limit)
|
||||
ginx.Dangerous(err)
|
||||
|
||||
total, err := models.TargetCountByFilter(rt.Ctx, query)
|
||||
ginx.Dangerous(err)
|
||||
// todo 这里也走缓存吗?有 limit 和 offset
|
||||
hosts := rt.TargetCache.GetHostIdentsQuery(f.Filters)
|
||||
//query := models.GetHostsQuery(f.Filters)
|
||||
//
|
||||
//hosts, err := models.TargetGetsByFilter(rt.Ctx, query, f.Limit, (f.P-1)*f.Limit)
|
||||
//ginx.Dangerous(err)
|
||||
//
|
||||
//total, err := models.TargetCountByFilter(rt.Ctx, query)
|
||||
//ginx.Dangerous(err)
|
||||
|
||||
ginx.NewRender(c).Data(gin.H{
|
||||
"list": hosts,
|
||||
"total": total,
|
||||
"total": len(hosts),
|
||||
}, nil)
|
||||
}
|
||||
|
||||
@@ -537,7 +539,7 @@ func (rt *Router) checkTargetPerm(c *gin.Context, idents []string) {
|
||||
|
||||
func (rt *Router) targetsOfAlertRule(c *gin.Context) {
|
||||
engineName := ginx.QueryStr(c, "engine_name", "")
|
||||
m, err := models.GetTargetsOfHostAlertRule(rt.Ctx, engineName)
|
||||
m, err := models.GetTargetsOfHostAlertRule(rt.Ctx, engineName, rt.TargetCache.GetHostIdentsQuery)
|
||||
ret := make(map[string]map[int64][]string)
|
||||
for en, v := range m {
|
||||
if en != engineName {
|
||||
|
||||
@@ -19,9 +19,11 @@ type TargetsOfAlertRuleCacheType struct {
|
||||
|
||||
sync.RWMutex
|
||||
targets map[string]map[int64][]string // key: ident
|
||||
|
||||
targetCache *TargetCacheType
|
||||
}
|
||||
|
||||
func NewTargetOfAlertRuleCache(ctx *ctx.Context, engineName string, stats *Stats) *TargetsOfAlertRuleCacheType {
|
||||
func NewTargetOfAlertRuleCache(ctx *ctx.Context, engineName string, stats *Stats, targetCache *TargetCacheType) *TargetsOfAlertRuleCacheType {
|
||||
tc := &TargetsOfAlertRuleCacheType{
|
||||
statTotal: -1,
|
||||
statLastUpdated: -1,
|
||||
@@ -29,6 +31,7 @@ func NewTargetOfAlertRuleCache(ctx *ctx.Context, engineName string, stats *Stats
|
||||
engineName: engineName,
|
||||
stats: stats,
|
||||
targets: make(map[string]map[int64][]string),
|
||||
targetCache: targetCache,
|
||||
}
|
||||
|
||||
tc.SyncTargets()
|
||||
@@ -86,7 +89,7 @@ func (tc *TargetsOfAlertRuleCacheType) loopSyncTargets() {
|
||||
}
|
||||
|
||||
func (tc *TargetsOfAlertRuleCacheType) syncTargets() error {
|
||||
m, err := models.GetTargetsOfHostAlertRule(tc.ctx, tc.engineName)
|
||||
m, err := models.GetTargetsOfHostAlertRule(tc.ctx, tc.engineName, tc.targetCache.GetHostIdentsQuery)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -5,6 +5,8 @@ import (
|
||||
"encoding/json"
|
||||
"log"
|
||||
"math"
|
||||
"regexp"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@@ -27,7 +29,8 @@ type TargetCacheType struct {
|
||||
redis storage.Redis
|
||||
|
||||
sync.RWMutex
|
||||
targets map[string]*models.Target // key: ident
|
||||
targets map[string]*models.Target // key: ident
|
||||
groupToIdents map[int64][]string // key: group_id
|
||||
}
|
||||
|
||||
func NewTargetCache(ctx *ctx.Context, stats *Stats, redis storage.Redis) *TargetCacheType {
|
||||
@@ -61,9 +64,10 @@ func (tc *TargetCacheType) StatChanged(total, lastUpdated int64) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (tc *TargetCacheType) Set(m map[string]*models.Target, total, lastUpdated int64) {
|
||||
func (tc *TargetCacheType) Set(m map[string]*models.Target, groupToIdents map[int64][]string, total, lastUpdated int64) {
|
||||
tc.Lock()
|
||||
tc.targets = m
|
||||
tc.groupToIdents = groupToIdents
|
||||
tc.Unlock()
|
||||
|
||||
// only one goroutine used, so no need lock
|
||||
@@ -160,6 +164,7 @@ func (tc *TargetCacheType) syncTargets() error {
|
||||
}
|
||||
|
||||
m := make(map[string]*models.Target)
|
||||
groupToIdents := make(map[int64][]string)
|
||||
|
||||
metaMap := tc.GetHostMetas(lst)
|
||||
if len(metaMap) > 0 {
|
||||
@@ -172,9 +177,12 @@ func (tc *TargetCacheType) syncTargets() error {
|
||||
|
||||
for i := 0; i < len(lst); i++ {
|
||||
m[lst[i].Ident] = lst[i]
|
||||
for _, groupID := range lst[i].GroupIds {
|
||||
groupToIdents[groupID] = append(groupToIdents[groupID], lst[i].Ident)
|
||||
}
|
||||
}
|
||||
|
||||
tc.Set(m, stat.Total, stat.LastUpdated)
|
||||
tc.Set(m, groupToIdents, stat.Total, stat.LastUpdated)
|
||||
|
||||
ms := time.Since(start).Milliseconds()
|
||||
tc.stats.GaugeCronDuration.WithLabelValues("sync_targets").Set(float64(ms))
|
||||
@@ -292,3 +300,214 @@ func (tc *TargetCacheType) GetHostMetas(targets []*models.Target) map[string]*mo
|
||||
|
||||
return metaMap
|
||||
}
|
||||
|
||||
func (tc *TargetCacheType) getAllHostIdentsWithoutLock() []string {
|
||||
var idents []string
|
||||
for ident, _ := range tc.targets {
|
||||
idents = append(idents, ident)
|
||||
}
|
||||
return idents
|
||||
}
|
||||
|
||||
func (tc *TargetCacheType) getHostIdentsByGroupIdsWithoutLock(groupIDs []int64) []string {
|
||||
var targetIdents []string
|
||||
for _, groupID := range groupIDs {
|
||||
if idents, has := tc.groupToIdents[groupID]; has {
|
||||
targetIdents = append(targetIdents, idents...)
|
||||
}
|
||||
}
|
||||
return targetIdents
|
||||
}
|
||||
|
||||
func (tc *TargetCacheType) getHostIdentsExcludeGroupIdsWithoutLock(groupIDs []int64) []string {
|
||||
var targetIdents []string
|
||||
exclude := make(map[string]struct{})
|
||||
for _, id := range groupIDs {
|
||||
if idents, has := tc.groupToIdents[id]; has {
|
||||
for _, ident := range idents {
|
||||
exclude[ident] = struct{}{}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for ident, _ := range tc.targets {
|
||||
if _, ok := exclude[ident]; ok {
|
||||
continue
|
||||
}
|
||||
targetIdents = append(targetIdents, ident)
|
||||
}
|
||||
return targetIdents
|
||||
}
|
||||
|
||||
func (tc *TargetCacheType) getHostsByIdentsWithoutLock(idents []string) []*models.Target {
|
||||
var targets []*models.Target
|
||||
for _, ident := range idents {
|
||||
if target, has := tc.targets[ident]; has {
|
||||
targets = append(targets, target)
|
||||
}
|
||||
}
|
||||
return targets
|
||||
}
|
||||
|
||||
func (tc *TargetCacheType) getHostIdentsExcludeIdentsWithoutLock(idents []string) []string {
|
||||
var targetIdents []string
|
||||
exclude := make(map[string]struct{})
|
||||
for _, id := range idents {
|
||||
exclude[id] = struct{}{}
|
||||
}
|
||||
|
||||
for ident, _ := range tc.targets {
|
||||
if _, ok := exclude[ident]; ok {
|
||||
continue
|
||||
}
|
||||
targetIdents = append(targetIdents, ident)
|
||||
}
|
||||
return targetIdents
|
||||
}
|
||||
|
||||
func (tc *TargetCacheType) getHostIdentsMatchIdentsWithoutLock(identPatterns []string) []string {
|
||||
var targetIdents []string
|
||||
for ident, _ := range tc.targets {
|
||||
for _, identPattern := range identPatterns {
|
||||
// 模糊匹配转正则
|
||||
if ok, _ := regexp.Match(strings.Replace(identPattern, "*", ".*", -1), []byte(ident)); ok {
|
||||
targetIdents = append(targetIdents, ident)
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
return targetIdents
|
||||
}
|
||||
|
||||
func (tc *TargetCacheType) getHostIdentsByTagsWithoutLock(tags []string) []string {
|
||||
var targetIdents []string
|
||||
|
||||
tagMap := make(map[string]struct{})
|
||||
for _, tag := range tags {
|
||||
tagMap[tag] = struct{}{}
|
||||
}
|
||||
|
||||
for ident, target := range tc.targets {
|
||||
for _, tag := range target.TagsJSON {
|
||||
if _, ok := tagMap[tag]; ok {
|
||||
targetIdents = append(targetIdents, ident)
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
return targetIdents
|
||||
}
|
||||
|
||||
func (tc *TargetCacheType) getHostIdentsExcludeTagsWithoutLock(tags []string) []string {
|
||||
var targetIdents []string
|
||||
|
||||
for ident, target := range tc.targets {
|
||||
exclude := false
|
||||
curTags := make(map[string]struct{})
|
||||
for _, tag := range target.TagsJSON {
|
||||
curTags[tag] = struct{}{}
|
||||
}
|
||||
for _, tag := range tags {
|
||||
if _, ok := curTags[tag]; ok {
|
||||
exclude = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !exclude {
|
||||
targetIdents = append(targetIdents, ident)
|
||||
}
|
||||
}
|
||||
return targetIdents
|
||||
}
|
||||
|
||||
func (tc *TargetCacheType) getHostIdentsMatchExcludeIdentsWithoutLock(identPatterns []string) []string {
|
||||
var targetIdents []string
|
||||
exclude := make(map[string]struct{})
|
||||
for _, id := range identPatterns {
|
||||
exclude[id] = struct{}{}
|
||||
}
|
||||
|
||||
for ident, _ := range tc.targets {
|
||||
has := false
|
||||
for _, identPattern := range identPatterns {
|
||||
if ok, _ := regexp.Match(strings.Replace(identPattern, "*", ".*", -1), []byte(ident)); ok {
|
||||
has = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !has {
|
||||
targetIdents = append(targetIdents, ident)
|
||||
}
|
||||
}
|
||||
return targetIdents
|
||||
}
|
||||
|
||||
func (tc *TargetCacheType) GetHostIdentsQuery(queries []models.HostQuery) []*models.Target {
|
||||
tc.Lock()
|
||||
defer tc.Unlock()
|
||||
|
||||
targetIdents := tc.getAllHostIdentsWithoutLock()
|
||||
|
||||
for _, q := range queries {
|
||||
var cur []string
|
||||
switch q.Key {
|
||||
case "group_ids":
|
||||
ids := models.ParseInt64(q.Values)
|
||||
if q.Op == "==" {
|
||||
cur = tc.getHostIdentsByGroupIdsWithoutLock(ids)
|
||||
} else {
|
||||
cur = tc.getHostIdentsExcludeGroupIdsWithoutLock(ids)
|
||||
}
|
||||
case "tags":
|
||||
var tags []string
|
||||
for _, v := range q.Values {
|
||||
if v == nil {
|
||||
continue
|
||||
}
|
||||
tags = append(tags, v.(string))
|
||||
}
|
||||
if q.Op == "==" {
|
||||
cur = tc.getHostIdentsByTagsWithoutLock(tags)
|
||||
} else {
|
||||
cur = tc.getHostIdentsExcludeTagsWithoutLock(tags)
|
||||
}
|
||||
case "hosts":
|
||||
var idents []string
|
||||
for _, v := range q.Values {
|
||||
if v == nil {
|
||||
continue
|
||||
}
|
||||
idents = append(idents, v.(string))
|
||||
}
|
||||
if q.Op == "==" {
|
||||
cur = idents
|
||||
} else if q.Op == "!=" {
|
||||
cur = tc.getHostIdentsExcludeIdentsWithoutLock(idents)
|
||||
} else if q.Op == "=~" {
|
||||
cur = tc.getHostIdentsMatchIdentsWithoutLock(idents)
|
||||
} else if q.Op == "!~" {
|
||||
cur = tc.getHostIdentsMatchExcludeIdentsWithoutLock(idents)
|
||||
}
|
||||
default:
|
||||
// all_hosts 与其他未知条件不改变已有集合
|
||||
cur = targetIdents
|
||||
}
|
||||
targetIdents = intersection(targetIdents, cur)
|
||||
}
|
||||
return tc.getHostsByIdentsWithoutLock(targetIdents)
|
||||
}
|
||||
|
||||
// intersection returns the elements of b that also occur in a, preserving b's
// order. Duplicates in b that are present in a are kept. Returns nil when the
// result is empty.
func intersection(a, b []string) []string {
	// Pre-size the membership set to avoid rehash growth on large ident lists.
	seen := make(map[string]struct{}, len(a))
	for _, v := range a {
		seen[v] = struct{}{}
	}

	var c []string
	for _, v := range b {
		if _, ok := seen[v]; ok {
			c = append(c, v)
		}
	}
	return c
}
|
||||
|
||||
@@ -1231,7 +1231,7 @@ func AlertRuleUpgradeToV6(ctx *ctx.Context, dsm map[string]Datasource) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func GetTargetsOfHostAlertRule(ctx *ctx.Context, engineName string) (map[string]map[int64][]string, error) {
|
||||
func GetTargetsOfHostAlertRule(ctx *ctx.Context, engineName string, getTargetFunc func(queries []HostQuery) []*Target) (map[string]map[int64][]string, error) {
|
||||
if !ctx.IsCenter {
|
||||
m, err := poster.GetByUrls[map[string]map[int64][]string](ctx, "/v1/n9e/targets-of-alert-rule?engine_name="+engineName)
|
||||
return m, err
|
||||
@@ -1255,16 +1255,9 @@ func GetTargetsOfHostAlertRule(ctx *ctx.Context, engineName string) (map[string]
|
||||
continue
|
||||
}
|
||||
|
||||
query := GetHostsQuery(rule.Queries)
|
||||
session := TargetFilterQueryBuild(ctx, query, 0, 0)
|
||||
var lst []*Target
|
||||
err := session.Find(&lst).Error
|
||||
if err != nil {
|
||||
logger.Errorf("failed to query targets: %v", err)
|
||||
continue
|
||||
}
|
||||
hosts := getTargetFunc(rule.Queries)
|
||||
|
||||
for _, target := range lst {
|
||||
for _, target := range hosts {
|
||||
if _, exists := m[target.EngineName]; !exists {
|
||||
m[target.EngineName] = make(map[int64][]string)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user