Compare commits

..

1 Commit

Author SHA1 Message Date
Xu Bin
c74014d53c refactor: global webhook add env proxy (#2367) 2024-12-20 14:20:52 +08:00
11 changed files with 78 additions and 311 deletions

View File

@@ -96,7 +96,7 @@ func Start(alertc aconf.Alert, pushgwc pconf.Pushgw, syncStats *memsto.Stats, al
promClients *prom.PromClientMap, tdendgineClients *tdengine.TdengineClientMap, userCache *memsto.UserCacheType, userGroupCache *memsto.UserGroupCacheType) {
alertSubscribeCache := memsto.NewAlertSubscribeCache(ctx, syncStats)
recordingRuleCache := memsto.NewRecordingRuleCache(ctx, syncStats)
targetsOfAlertRulesCache := memsto.NewTargetOfAlertRuleCache(ctx, alertc.Heartbeat.EngineName, syncStats, targetCache)
targetsOfAlertRulesCache := memsto.NewTargetOfAlertRuleCache(ctx, alertc.Heartbeat.EngineName, syncStats)
go models.InitNotifyConfig(ctx, alertc.Alerting.TemplatesDir)

View File

@@ -118,7 +118,7 @@ func (s *Scheduler) syncAlertRules() {
}
processor := process.NewProcessor(s.aconf.Heartbeat.EngineName, rule, dsId, s.alertRuleCache, s.targetCache, s.targetsOfAlertRuleCache, s.busiGroupCache, s.alertMuteCache, s.datasourceCache, s.ctx, s.stats)
alertRule := NewAlertRuleWorker(s.ctx, rule, dsId, processor, s.promClients, s.tdengineClients, s.targetCache)
alertRule := NewAlertRuleWorker(rule, dsId, processor, s.promClients, s.tdengineClients, s.ctx)
alertRuleWorkers[alertRule.Hash()] = alertRule
}
} else if rule.IsHostRule() {
@@ -127,7 +127,7 @@ func (s *Scheduler) syncAlertRules() {
continue
}
processor := process.NewProcessor(s.aconf.Heartbeat.EngineName, rule, 0, s.alertRuleCache, s.targetCache, s.targetsOfAlertRuleCache, s.busiGroupCache, s.alertMuteCache, s.datasourceCache, s.ctx, s.stats)
alertRule := NewAlertRuleWorker(s.ctx, rule, 0, processor, s.promClients, s.tdengineClients, s.targetCache)
alertRule := NewAlertRuleWorker(rule, 0, processor, s.promClients, s.tdengineClients, s.ctx)
alertRuleWorkers[alertRule.Hash()] = alertRule
} else {
// 如果 rule 不是通过 prometheus engine 来告警的,则创建为 externalRule

View File

@@ -15,7 +15,6 @@ import (
"github.com/ccfos/nightingale/v6/alert/common"
"github.com/ccfos/nightingale/v6/alert/process"
"github.com/ccfos/nightingale/v6/memsto"
"github.com/ccfos/nightingale/v6/models"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/pkg/hash"
@@ -47,9 +46,9 @@ type AlertRuleWorker struct {
Scheduler *cron.Cron
DeviceIdentHook func(paramQuery models.ParamQuery) ([]string, error)
HostAndDeviceIdentCache sync.Map
TargetCache *memsto.TargetCacheType
DeviceIdentHook func(arw *AlertRuleWorker, paramQuery models.ParamQuery) ([]string, error)
}
const (
@@ -68,20 +67,20 @@ const (
Inner JoinType = "inner"
)
func NewAlertRuleWorker(ctx *ctx.Context, rule *models.AlertRule, datasourceId int64, Processor *process.Processor, promClients *prom.PromClientMap, tdengineClients *tdengine.TdengineClientMap, targetCache *memsto.TargetCacheType) *AlertRuleWorker {
func NewAlertRuleWorker(rule *models.AlertRule, datasourceId int64, Processor *process.Processor, promClients *prom.PromClientMap, tdengineClients *tdengine.TdengineClientMap, ctx *ctx.Context) *AlertRuleWorker {
arw := &AlertRuleWorker{
DatasourceId: datasourceId,
Quit: make(chan struct{}),
Rule: rule,
Processor: Processor,
PromClients: promClients,
TdengineClients: tdengineClients,
Ctx: ctx,
DeviceIdentHook: func(paramQuery models.ParamQuery) ([]string, error) {
PromClients: promClients,
TdengineClients: tdengineClients,
Ctx: ctx,
HostAndDeviceIdentCache: sync.Map{},
DeviceIdentHook: func(arw *AlertRuleWorker, paramQuery models.ParamQuery) ([]string, error) {
return nil, nil
},
TargetCache: targetCache,
}
interval := rule.PromEvalInterval
@@ -149,6 +148,7 @@ func (arw *AlertRuleWorker) Eval() {
return
}
arw.Processor.Stats.CounterRuleEval.WithLabelValues().Inc()
arw.HostAndDeviceIdentCache = sync.Map{}
typ := cachedRule.GetRuleType()
var (
@@ -557,21 +557,35 @@ func (arw *AlertRuleWorker) getHostIdents(paramQuery models.ParamQuery) ([]strin
var params []string
q, _ := json.Marshal(paramQuery.Query)
cacheKey := "Host_" + string(q)
value, hit := arw.HostAndDeviceIdentCache.Load(cacheKey)
if idents, ok := value.([]string); hit && ok {
params = idents
return params, nil
}
var queries []models.HostQuery
err := json.Unmarshal(q, &queries)
if err != nil {
return nil, err
}
hosts := arw.TargetCache.GetHostIdentsQuery(queries)
for i := range hosts {
params = append(params, hosts[i].Ident)
hostsQuery := models.GetHostsQuery(queries)
session := models.TargetFilterQueryBuild(arw.Ctx, hostsQuery, 0, 0)
var lst []*models.Target
err = session.Find(&lst).Error
if err != nil {
return nil, err
}
for i := range lst {
params = append(params, lst[i].Ident)
}
arw.HostAndDeviceIdentCache.Store(cacheKey, params)
return params, nil
}
func (arw *AlertRuleWorker) getDeviceIdents(paramQuery models.ParamQuery) ([]string, error) {
return arw.DeviceIdentHook(paramQuery)
return arw.DeviceIdentHook(arw, paramQuery)
}
// 生成所有排列组合

View File

@@ -28,19 +28,17 @@ func (rt *Router) targetGetsByHostFilter(c *gin.Context) {
var f TargetQuery
ginx.BindJSON(c, &f)
// todo 这里也走缓存吗?有 limit 和 offset
hosts := rt.TargetCache.GetHostIdentsQuery(f.Filters)
//query := models.GetHostsQuery(f.Filters)
//
//hosts, err := models.TargetGetsByFilter(rt.Ctx, query, f.Limit, (f.P-1)*f.Limit)
//ginx.Dangerous(err)
//
//total, err := models.TargetCountByFilter(rt.Ctx, query)
//ginx.Dangerous(err)
query := models.GetHostsQuery(f.Filters)
hosts, err := models.TargetGetsByFilter(rt.Ctx, query, f.Limit, (f.P-1)*f.Limit)
ginx.Dangerous(err)
total, err := models.TargetCountByFilter(rt.Ctx, query)
ginx.Dangerous(err)
ginx.NewRender(c).Data(gin.H{
"list": hosts,
"total": len(hosts),
"total": total,
}, nil)
}
@@ -539,7 +537,7 @@ func (rt *Router) checkTargetPerm(c *gin.Context, idents []string) {
func (rt *Router) targetsOfAlertRule(c *gin.Context) {
engineName := ginx.QueryStr(c, "engine_name", "")
m, err := models.GetTargetsOfHostAlertRule(rt.Ctx, engineName, rt.TargetCache.GetHostIdentsQuery)
m, err := models.GetTargetsOfHostAlertRule(rt.Ctx, engineName)
ret := make(map[string]map[int64][]string)
for en, v := range m {
if en != engineName {

View File

@@ -19,11 +19,9 @@ type TargetsOfAlertRuleCacheType struct {
sync.RWMutex
targets map[string]map[int64][]string // key: ident
targetCache *TargetCacheType
}
func NewTargetOfAlertRuleCache(ctx *ctx.Context, engineName string, stats *Stats, targetCache *TargetCacheType) *TargetsOfAlertRuleCacheType {
func NewTargetOfAlertRuleCache(ctx *ctx.Context, engineName string, stats *Stats) *TargetsOfAlertRuleCacheType {
tc := &TargetsOfAlertRuleCacheType{
statTotal: -1,
statLastUpdated: -1,
@@ -31,7 +29,6 @@ func NewTargetOfAlertRuleCache(ctx *ctx.Context, engineName string, stats *Stats
engineName: engineName,
stats: stats,
targets: make(map[string]map[int64][]string),
targetCache: targetCache,
}
tc.SyncTargets()
@@ -89,7 +86,7 @@ func (tc *TargetsOfAlertRuleCacheType) loopSyncTargets() {
}
func (tc *TargetsOfAlertRuleCacheType) syncTargets() error {
m, err := models.GetTargetsOfHostAlertRule(tc.ctx, tc.engineName, tc.targetCache.GetHostIdentsQuery)
m, err := models.GetTargetsOfHostAlertRule(tc.ctx, tc.engineName)
if err != nil {
return err
}

View File

@@ -5,8 +5,6 @@ import (
"encoding/json"
"log"
"math"
"regexp"
"strings"
"sync"
"time"
@@ -29,8 +27,7 @@ type TargetCacheType struct {
redis storage.Redis
sync.RWMutex
targets map[string]*models.Target // key: ident
groupToIdents map[int64][]string // key: group_id
targets map[string]*models.Target // key: ident
}
func NewTargetCache(ctx *ctx.Context, stats *Stats, redis storage.Redis) *TargetCacheType {
@@ -64,10 +61,9 @@ func (tc *TargetCacheType) StatChanged(total, lastUpdated int64) bool {
return true
}
func (tc *TargetCacheType) Set(m map[string]*models.Target, groupToIdents map[int64][]string, total, lastUpdated int64) {
func (tc *TargetCacheType) Set(m map[string]*models.Target, total, lastUpdated int64) {
tc.Lock()
tc.targets = m
tc.groupToIdents = groupToIdents
tc.Unlock()
// only one goroutine used, so no need lock
@@ -164,7 +160,6 @@ func (tc *TargetCacheType) syncTargets() error {
}
m := make(map[string]*models.Target)
groupToIdents := make(map[int64][]string)
metaMap := tc.GetHostMetas(lst)
if len(metaMap) > 0 {
@@ -177,12 +172,9 @@ func (tc *TargetCacheType) syncTargets() error {
for i := 0; i < len(lst); i++ {
m[lst[i].Ident] = lst[i]
for _, groupID := range lst[i].GroupIds {
groupToIdents[groupID] = append(groupToIdents[groupID], lst[i].Ident)
}
}
tc.Set(m, groupToIdents, stat.Total, stat.LastUpdated)
tc.Set(m, stat.Total, stat.LastUpdated)
ms := time.Since(start).Milliseconds()
tc.stats.GaugeCronDuration.WithLabelValues("sync_targets").Set(float64(ms))
@@ -300,214 +292,3 @@ func (tc *TargetCacheType) GetHostMetas(targets []*models.Target) map[string]*mo
return metaMap
}
func (tc *TargetCacheType) getAllHostIdentsWithoutLock() []string {
var idents []string
for ident, _ := range tc.targets {
idents = append(idents, ident)
}
return idents
}
func (tc *TargetCacheType) getHostIdentsByGroupIdsWithoutLock(groupIDs []int64) []string {
var targetIdents []string
for _, groupID := range groupIDs {
if idents, has := tc.groupToIdents[groupID]; has {
targetIdents = append(targetIdents, idents...)
}
}
return targetIdents
}
func (tc *TargetCacheType) getHostIdentsExcludeGroupIdsWithoutLock(groupIDs []int64) []string {
var targetIdents []string
exclude := make(map[string]struct{})
for _, id := range groupIDs {
if idents, has := tc.groupToIdents[id]; has {
for _, ident := range idents {
exclude[ident] = struct{}{}
}
}
}
for ident, _ := range tc.targets {
if _, ok := exclude[ident]; ok {
continue
}
targetIdents = append(targetIdents, ident)
}
return targetIdents
}
func (tc *TargetCacheType) getHostsByIdentsWithoutLock(idents []string) []*models.Target {
var targets []*models.Target
for _, ident := range idents {
if target, has := tc.targets[ident]; has {
targets = append(targets, target)
}
}
return targets
}
func (tc *TargetCacheType) getHostIdentsExcludeIdentsWithoutLock(idents []string) []string {
var targetIdents []string
exclude := make(map[string]struct{})
for _, id := range idents {
exclude[id] = struct{}{}
}
for ident, _ := range tc.targets {
if _, ok := exclude[ident]; ok {
continue
}
targetIdents = append(targetIdents, ident)
}
return targetIdents
}
func (tc *TargetCacheType) getHostIdentsMatchIdentsWithoutLock(identPatterns []string) []string {
var targetIdents []string
for ident, _ := range tc.targets {
for _, identPattern := range identPatterns {
// 模糊匹配转正则
if ok, _ := regexp.Match(strings.Replace(identPattern, "*", ".*", -1), []byte(ident)); ok {
targetIdents = append(targetIdents, ident)
break
}
}
}
return targetIdents
}
func (tc *TargetCacheType) getHostIdentsByTagsWithoutLock(tags []string) []string {
var targetIdents []string
tagMap := make(map[string]struct{})
for _, tag := range tags {
tagMap[tag] = struct{}{}
}
for ident, target := range tc.targets {
for _, tag := range target.TagsJSON {
if _, ok := tagMap[tag]; ok {
targetIdents = append(targetIdents, ident)
break
}
}
}
return targetIdents
}
func (tc *TargetCacheType) getHostIdentsExcludeTagsWithoutLock(tags []string) []string {
var targetIdents []string
for ident, target := range tc.targets {
exclude := false
curTags := make(map[string]struct{})
for _, tag := range target.TagsJSON {
curTags[tag] = struct{}{}
}
for _, tag := range tags {
if _, ok := curTags[tag]; ok {
exclude = true
break
}
}
if !exclude {
targetIdents = append(targetIdents, ident)
}
}
return targetIdents
}
func (tc *TargetCacheType) getHostIdentsMatchExcludeIdentsWithoutLock(identPatterns []string) []string {
var targetIdents []string
exclude := make(map[string]struct{})
for _, id := range identPatterns {
exclude[id] = struct{}{}
}
for ident, _ := range tc.targets {
has := false
for _, identPattern := range identPatterns {
if ok, _ := regexp.Match(strings.Replace(identPattern, "*", ".*", -1), []byte(ident)); ok {
has = true
break
}
}
if !has {
targetIdents = append(targetIdents, ident)
}
}
return targetIdents
}
func (tc *TargetCacheType) GetHostIdentsQuery(queries []models.HostQuery) []*models.Target {
tc.Lock()
defer tc.Unlock()
targetIdents := tc.getAllHostIdentsWithoutLock()
for _, q := range queries {
var cur []string
switch q.Key {
case "group_ids":
ids := models.ParseInt64(q.Values)
if q.Op == "==" {
cur = tc.getHostIdentsByGroupIdsWithoutLock(ids)
} else {
cur = tc.getHostIdentsExcludeGroupIdsWithoutLock(ids)
}
case "tags":
var tags []string
for _, v := range q.Values {
if v == nil {
continue
}
tags = append(tags, v.(string))
}
if q.Op == "==" {
cur = tc.getHostIdentsByTagsWithoutLock(tags)
} else {
cur = tc.getHostIdentsExcludeTagsWithoutLock(tags)
}
case "hosts":
var idents []string
for _, v := range q.Values {
if v == nil {
continue
}
idents = append(idents, v.(string))
}
if q.Op == "==" {
cur = idents
} else if q.Op == "!=" {
cur = tc.getHostIdentsExcludeIdentsWithoutLock(idents)
} else if q.Op == "=~" {
cur = tc.getHostIdentsMatchIdentsWithoutLock(idents)
} else if q.Op == "!~" {
cur = tc.getHostIdentsMatchExcludeIdentsWithoutLock(idents)
}
default:
// all_hosts 与其他未知条件不改变已有集合
cur = targetIdents
}
targetIdents = intersection(targetIdents, cur)
}
return tc.getHostsByIdentsWithoutLock(targetIdents)
}
func intersection(a, b []string) []string {
m := make(map[string]struct{})
for _, v := range a {
m[v] = struct{}{}
}
var c []string
for _, v := range b {
if _, ok := m[v]; ok {
c = append(c, v)
}
}
return c
}

View File

@@ -1231,7 +1231,7 @@ func AlertRuleUpgradeToV6(ctx *ctx.Context, dsm map[string]Datasource) error {
return nil
}
func GetTargetsOfHostAlertRule(ctx *ctx.Context, engineName string, getTargetFunc func(queries []HostQuery) []*Target) (map[string]map[int64][]string, error) {
func GetTargetsOfHostAlertRule(ctx *ctx.Context, engineName string) (map[string]map[int64][]string, error) {
if !ctx.IsCenter {
m, err := poster.GetByUrls[map[string]map[int64][]string](ctx, "/v1/n9e/targets-of-alert-rule?engine_name="+engineName)
return m, err
@@ -1255,9 +1255,16 @@ func GetTargetsOfHostAlertRule(ctx *ctx.Context, engineName string, getTargetFun
continue
}
hosts := getTargetFunc(rule.Queries)
query := GetHostsQuery(rule.Queries)
session := TargetFilterQueryBuild(ctx, query, 0, 0)
var lst []*Target
err := session.Find(&lst).Error
if err != nil {
logger.Errorf("failed to query targets: %v", err)
continue
}
for _, target := range hosts {
for _, target := range lst {
if _, exists := m[target.EngineName]; !exists {
m[target.EngineName] = make(map[int64][]string)
}

View File

@@ -115,74 +115,53 @@ func BusiGroupExists(ctx *ctx.Context, where string, args ...interface{}) (bool,
return num > 0, err
}
// RegisterGroupDelCheckEntries 提供给外部注册删除 group 时需要检查的表
func RegisterGroupDelCheckEntries(e []CheckEntry) {
entries = append(entries, e...)
}
type CheckEntry struct {
Entry interface{}
ErrorMessage string
FieldName string
}
var entries = []CheckEntry{
var entries = []struct {
entry interface{}
errorMessage string
}{
{
Entry: &AlertRule{},
ErrorMessage: "Some alert rules still in the BusiGroup",
FieldName: "group_id",
entry: &AlertRule{},
errorMessage: "Some alert rules still in the BusiGroup",
},
{
Entry: &AlertMute{},
ErrorMessage: "Some alert mutes still in the BusiGroup",
FieldName: "group_id",
entry: &AlertMute{},
errorMessage: "Some alert mutes still in the BusiGroup",
},
{
Entry: &AlertSubscribe{},
ErrorMessage: "Some alert subscribes still in the BusiGroup",
FieldName: "group_id",
entry: &AlertSubscribe{},
errorMessage: "Some alert subscribes still in the BusiGroup",
},
{
Entry: &Board{},
ErrorMessage: "Some Board still in the BusiGroup",
FieldName: "group_id",
entry: &Target{},
errorMessage: "Some targets still in the BusiGroup",
},
{
Entry: &Target{},
ErrorMessage: "Some targets still in the BusiGroup",
FieldName: "group_id",
entry: &RecordingRule{},
errorMessage: "Some recording rules still in the BusiGroup",
},
{
Entry: &RecordingRule{},
ErrorMessage: "Some recording rules still in the BusiGroup",
FieldName: "group_id",
entry: &TaskTpl{},
errorMessage: "Some recovery scripts still in the BusiGroup",
},
{
Entry: &TaskTpl{},
ErrorMessage: "Some recovery scripts still in the BusiGroup",
FieldName: "group_id",
entry: &TaskRecord{},
errorMessage: "Some Task Record records still in the BusiGroup",
},
{
Entry: &TaskRecord{},
ErrorMessage: "Some Task Record records still in the BusiGroup",
FieldName: "group_id",
},
{
Entry: &TargetBusiGroup{},
ErrorMessage: "Some target busigroups still in the BusiGroup",
FieldName: "group_id",
entry: &TargetBusiGroup{},
errorMessage: "Some target busigroups still in the BusiGroup",
},
}
func (bg *BusiGroup) Del(ctx *ctx.Context) error {
for _, e := range entries {
has, err := Exists(DB(ctx).Model(e.Entry).Where(fmt.Sprintf("%s=?", e.FieldName), bg.Id))
has, err := Exists(DB(ctx).Model(e.entry).Where("group_id=?", bg.Id))
if err != nil {
return err
}
if has {
return errors.New(e.ErrorMessage)
return errors.New(e.errorMessage)
}
}

View File

@@ -20,7 +20,6 @@ type EsIndexPattern struct {
CreateBy string `json:"create_by"`
UpdateAt int64 `json:"update_at"`
UpdateBy string `json:"update_by"`
CrossClusterEnabled int `json:"cross_cluster_enabled"`
}
func (t *EsIndexPattern) TableName() string {

View File

@@ -67,7 +67,7 @@ func MigrateTables(db *gorm.DB) error {
&TaskRecord{}, &ChartShare{}, &Target{}, &Configs{}, &Datasource{}, &NotifyTpl{},
&Board{}, &BoardBusigroup{}, &Users{}, &SsoConfig{}, &models.BuiltinMetric{},
&models.MetricFilter{}, &models.NotificaitonRecord{},
&models.TargetBusiGroup{}, &EsIndexPatternMigrate{}}
&models.TargetBusiGroup{}}
if isPostgres(db) {
dts = append(dts, &models.PostgresBuiltinComponent{})
@@ -319,11 +319,3 @@ type TaskHostDoing struct {
func (TaskHostDoing) TableName() string {
return "task_host_doing"
}
type EsIndexPatternMigrate struct {
CrossClusterEnabled int `gorm:"column:cross_cluster_enabled;type:int;default:0"`
}
func (EsIndexPatternMigrate) TableName() string {
return "es_index_pattern"
}

View File

@@ -114,7 +114,7 @@ func (rt *Router) remoteWrite(c *gin.Context) {
var (
ignoreIdent = ginx.QueryBool(c, "ignore_ident", false)
ignoreHost = ginx.QueryBool(c, "ignore_host", true) // 默认值改成 true要不然答疑成本太高。发版的时候通知 telegraf 用户,让他们设置 ignore_host=false
ignoreHost = ginx.QueryBool(c, "ignore_host", false)
ids = make(map[string]struct{})
)