Compare commits

...

10 Commits

Author SHA1 Message Date
ning
f5ef0f07be add validateCronPattern 2025-02-17 10:59:07 +08:00
Yening Qin
80b31c0e70 add DisableNotifyRecord (#2448) 2025-01-24 16:19:59 +08:00
ning
9677c42385 update toolkits/pkg version 2025-01-16 16:26:43 +08:00
Yening Qin
a66f98ed8b refactor: event set AnnotationsJSON 2024-12-29 10:22:15 +08:00
Yening Qin
41a72a1de8 fix: the dedup logic when adding tags to target (#2387)
Co-authored-by: flashbo <36443248+lwb0214@users.noreply.github.com>
2024-12-24 11:49:59 +08:00
Yening Qin
39cb026b10 feat: support webhook proxy configuration (#2376) 2024-12-20 14:36:24 +08:00
Yening Qin
e0eb7e72d4 refactor: es_index_pattern add cross_cluster_enabled (#2371) 2024-12-19 14:11:51 +08:00
Yening Qin
33118c53c7 refactor: group delete check (#2369) 2024-12-18 17:52:40 +08:00
Yening Qin
54732cf387 fix: alert rule with var (#2358) 2024-12-12 16:59:41 +08:00
Yening Qin
90fb400a10 fix: alert rule with var (#2355) 2024-12-12 13:20:50 +08:00
20 changed files with 182 additions and 98 deletions

View File

@@ -29,10 +29,11 @@ type HeartbeatConfig struct {
}
type Alerting struct {
Timeout int64
TemplatesDir string
NotifyConcurrency int
WebhookBatchSend bool
Timeout int64
TemplatesDir string
NotifyConcurrency int
WebhookBatchSend bool
DisableNotifyRecord bool
}
type CallPlugin struct {

View File

@@ -41,7 +41,7 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
return nil, err
}
ctx := ctx.NewContext(context.Background(), nil, false, config.CenterApi)
ctx := ctx.NewContext(context.Background(), nil, false, config.Alert.Alerting.DisableNotifyRecord, config.CenterApi)
var redis storage.Redis
redis, err = storage.NewRedis(config.Redis)

View File

@@ -11,6 +11,7 @@ const (
type Stats struct {
AlertNotifyTotal *prometheus.CounterVec
CounterConsumeAlertsTotal *prometheus.CounterVec
AlertNotifyErrorTotal *prometheus.CounterVec
CounterAlertsTotal *prometheus.CounterVec
GaugeAlertQueueSize prometheus.Gauge
@@ -90,6 +91,13 @@ func NewSyncStats() *Stats {
Help: "Total number alert events.",
}, []string{"cluster", "type", "busi_group"})
CounterConsumeAlertsTotal := prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "consume_alerts_total",
Help: "Total number alert events consume.",
}, []string{"cluster", "type", "busi_group"})
// 内存中的告警事件队列的长度
GaugeAlertQueueSize := prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
@@ -121,6 +129,7 @@ func NewSyncStats() *Stats {
prometheus.MustRegister(
CounterAlertsTotal,
CounterConsumeAlertsTotal,
GaugeAlertQueueSize,
AlertNotifyTotal,
AlertNotifyErrorTotal,
@@ -137,6 +146,7 @@ func NewSyncStats() *Stats {
return &Stats{
CounterAlertsTotal: CounterAlertsTotal,
CounterConsumeAlertsTotal: CounterConsumeAlertsTotal,
GaugeAlertQueueSize: GaugeAlertQueueSize,
AlertNotifyTotal: AlertNotifyTotal,
AlertNotifyErrorTotal: AlertNotifyErrorTotal,

View File

@@ -88,7 +88,7 @@ func (e *Consumer) consumeOne(event *models.AlertCurEvent) {
eventType = "recovery"
}
e.dispatch.Astats.CounterAlertsTotal.WithLabelValues(event.Cluster, eventType, event.GroupName).Inc()
e.dispatch.Astats.CounterConsumeAlertsTotal.WithLabelValues(event.Cluster, eventType, event.GroupName).Inc()
if err := event.ParseRule("rule_name"); err != nil {
logger.Warningf("ruleid:%d failed to parse rule name: %v", event.RuleId, err)

View File

@@ -252,8 +252,8 @@ func (arw *AlertRuleWorker) GetPromAnomalyPoint(ruleConfig string) ([]models.Ano
if query.VarEnabled {
var anomalyPoints []models.AnomalyPoint
if hasLabelLossAggregator(query) {
// 若有聚合函数则需要先填充变量然后查询,这个方式效率较低
if hasLabelLossAggregator(query) || notExactMatch(query) {
// 若有聚合函数或非精确匹配则需要先填充变量然后查询,这个方式效率较低
anomalyPoints = arw.VarFillingBeforeQuery(query, readerClient)
} else {
// 先查询再过滤变量,效率较高,但无法处理有聚合函数的情况
@@ -538,6 +538,7 @@ func (arw *AlertRuleWorker) getParamPermutation(paramVal map[string]models.Param
return nil, fmt.Errorf("param key: %s, params is empty", paramKey)
}
logger.Infof("rule_eval:%s paramKey: %s, params: %v", arw.Key(), paramKey, params)
paramMap[paramKey] = params
}
@@ -546,7 +547,7 @@ func (arw *AlertRuleWorker) getParamPermutation(paramVal map[string]models.Param
res := make(map[string]struct{})
for i := range permutation {
res[strings.Join(permutation[i], "-")] = struct{}{}
res[strings.Join(permutation[i], "@@")] = struct{}{}
}
return res, nil
@@ -1280,7 +1281,7 @@ func (arw *AlertRuleWorker) VarFillingBeforeQuery(query models.PromQuery, reader
keyToPromql := make(map[string]string)
for paramPermutationKeys, _ := range paramPermutation {
realPromql := curPromql
split := strings.Split(paramPermutationKeys, "-")
split := strings.Split(paramPermutationKeys, "@@")
for j := range ParamKeys {
realPromql = fillVar(realPromql, ParamKeys[j], split[j])
}
@@ -1303,6 +1304,7 @@ func (arw *AlertRuleWorker) VarFillingBeforeQuery(query models.PromQuery, reader
logger.Errorf("rule_eval:%s, promql:%s, error:%v", arw.Key(), promql, err)
return
}
logger.Infof("rule_eval:%s, promql:%s, value:%+v", arw.Key(), promql, value)
points := models.ConvertAnomalyPoints(value)
if len(points) == 0 {
@@ -1353,6 +1355,15 @@ func hasLabelLossAggregator(query models.PromQuery) bool {
return false
}
// notExactMatch reports whether the query's PromQL contains a non-exact
// label matcher (!=, =~ or !~). Rules with such matchers cannot use the
// fast query-then-filter variable path, so callers fall back to filling
// variables before querying.
//
// NOTE(review): this is a plain substring scan, so an operator-like
// sequence inside a quoted label value also triggers it — the check is
// deliberately conservative (worst case: the slower path is taken).
func notExactMatch(query models.PromQuery) bool {
	// The matcher tokens contain no letters, so no case folding is needed.
	promql := query.PromQl
	return strings.Contains(promql, "!=") ||
		strings.Contains(promql, "=~") ||
		strings.Contains(promql, "!~")
}
// ExtractVarMapping 从 promql 中提取变量映射关系,为了在 query 之后可以将标签正确的放回 promql
// 输入: sum(rate(mem_used_percent{host="$my_host"})) by (instance) + avg(node_load1{region="$region"}) > $val
// 输出: map[string]string{"my_host":"host", "region":"region"}

View File

@@ -2,6 +2,7 @@ package process
import (
"bytes"
"encoding/json"
"fmt"
"html/template"
"sort"
@@ -215,7 +216,6 @@ func (p *Processor) BuildEvent(anomalyPoint models.AnomalyPoint, from string, no
event.Callbacks = p.rule.Callbacks
event.CallbacksJSON = p.rule.CallbacksJSON
event.Annotations = p.rule.Annotations
event.AnnotationsJSON = make(map[string]string)
event.RuleConfig = p.rule.RuleConfig
event.RuleConfigJson = p.rule.RuleConfigJson
event.Severity = anomalyPoint.Severity
@@ -224,6 +224,11 @@ func (p *Processor) BuildEvent(anomalyPoint models.AnomalyPoint, from string, no
event.RecoverConfig = anomalyPoint.RecoverConfig
event.RuleHash = ruleHash
if err := json.Unmarshal([]byte(p.rule.Annotations), &event.AnnotationsJSON); err != nil {
event.AnnotationsJSON = make(map[string]string) // 解析失败时使用空 map
logger.Warningf("unmarshal annotations json failed:%v, rule:%d annotations:%s", err, p.rule.Id, p.rule.Annotations)
}
if p.target != "" {
if pt, exist := p.TargetCache.Get(p.target); exist {
pt.GroupNames = p.BusiGroupCache.GetNamesByBusiGroupIds(pt.GroupIds)
@@ -517,6 +522,14 @@ func (p *Processor) pushEventToQueue(e *models.AlertCurEvent) {
}
dispatch.LogEvent(e, "push_queue")
eventType := "alert"
if e.IsRecovered {
eventType = "recovery"
}
p.Stats.CounterAlertsTotal.WithLabelValues(e.Cluster, eventType, e.GroupName).Inc()
if !queue.EventQueue.PushFront(e) {
logger.Warningf("event_push_queue: queue is full, event:%+v", e)
p.Stats.CounterRuleEvalErrorTotal.WithLabelValues(fmt.Sprintf("%v", p.DatasourceId()), "push_event_queue", p.BusiGroupCache.GetNameByBusiGroupId(p.rule.GroupId), fmt.Sprintf("%v", p.rule.Id)).Inc()

View File

@@ -139,6 +139,10 @@ func doSendAndRecord(ctx *ctx.Context, url, token string, body interface{}, chan
}
func NotifyRecord(ctx *ctx.Context, evts []*models.AlertCurEvent, channel, target, res string, err error) {
if ctx.DisableNotifyRecord {
return
}
// 一个通知可能对应多个 event都需要记录
notis := make([]*models.NotificaitonRecord, 0, len(evts))
for _, evt := range evts {

View File

@@ -13,6 +13,7 @@ import (
"github.com/ccfos/nightingale/v6/alert/astats"
"github.com/ccfos/nightingale/v6/models"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/pkg/poster"
"github.com/toolkits/pkg/logger"
)
@@ -59,11 +60,17 @@ func sendWebhook(webhook *models.Webhook, event interface{}, stats *astats.Stats
if webhook != nil {
insecureSkipVerify = webhook.SkipVerify
}
transport := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: insecureSkipVerify},
}
if poster.UseProxy(conf.Url) {
transport.Proxy = http.ProxyFromEnvironment
}
client := http.Client{
Timeout: time.Duration(conf.Timeout) * time.Second,
Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: insecureSkipVerify},
},
Timeout: time.Duration(conf.Timeout) * time.Second,
Transport: transport,
}
stats.AlertNotifyTotal.WithLabelValues(channel).Inc()

View File

@@ -66,7 +66,7 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
if err != nil {
return nil, err
}
ctx := ctx.NewContext(context.Background(), db, true)
ctx := ctx.NewContext(context.Background(), db, true, config.Alert.Alerting.DisableNotifyRecord)
migrate.Migrate(db)
isRootInit := models.InitRoot(ctx)

View File

@@ -272,11 +272,9 @@ func (rt *Router) validateTags(tags []string) error {
}
func (rt *Router) addTagsToTarget(target *models.Target, tags []string) error {
hostTagsMap := target.GetHostTagsMap()
for _, tag := range tags {
tagKey := strings.Split(tag, "=")[0]
if _, ok := hostTagsMap[tagKey]; ok ||
strings.Contains(target.Tags, tagKey+"=") {
if _, exist := target.TagsMap[tagKey]; exist {
return fmt.Errorf("duplicate tagkey(%s)", tagKey)
}
}

View File

@@ -18,7 +18,7 @@ func Upgrade(configFile string) error {
return err
}
ctx := ctx.NewContext(context.Background(), db, true)
ctx := ctx.NewContext(context.Background(), db, true, false)
for _, cluster := range config.Clusters {
count, err := models.GetDatasourcesCountByName(ctx, cluster.Name)
if err != nil {

View File

@@ -41,7 +41,7 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
if len(config.CenterApi.Addrs) < 1 {
return nil, errors.New("failed to init config: the CenterApi configuration is missing")
}
ctx := ctx.NewContext(context.Background(), nil, false, config.CenterApi)
ctx := ctx.NewContext(context.Background(), nil, false, config.Alert.Alerting.DisableNotifyRecord, config.CenterApi)
var redis storage.Redis
redis, err = storage.NewRedis(config.Redis)

2
go.mod
View File

@@ -34,7 +34,7 @@ require (
github.com/spaolacci/murmur3 v1.1.0
github.com/stretchr/testify v1.8.4
github.com/tidwall/gjson v1.14.0
github.com/toolkits/pkg v1.3.8
github.com/toolkits/pkg v1.3.9
golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1
golang.org/x/oauth2 v0.10.0
gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df

4
go.sum
View File

@@ -321,8 +321,8 @@ github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA=
github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs=
github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/toolkits/pkg v1.3.8 h1:2yamC20c5mHRtbcGiLY99Lm/2mVitFn6onE8KKvMT1o=
github.com/toolkits/pkg v1.3.8/go.mod h1:M9ecwFGW1vxCTUFM9sr2ZjXSKb04N+1sTQ6SA3RNAIU=
github.com/toolkits/pkg v1.3.9 h1:ua4kVPNgsugBtZcd7kepbkuXoszzkifsG7P+Ku4nYVk=
github.com/toolkits/pkg v1.3.9/go.mod h1:M9ecwFGW1vxCTUFM9sr2ZjXSKb04N+1sTQ6SA3RNAIU=
github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI=
github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08=
github.com/ugorji/go v1.2.7/go.mod h1:nF9osbDWLy6bDVv/Rtoh6QgnvNDpmCalQV5urGCCS6M=

View File

@@ -10,6 +10,7 @@ import (
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/pkg/poster"
"github.com/ccfos/nightingale/v6/pushgw/pconf"
"github.com/robfig/cron/v3"
"github.com/jinzhu/copier"
"github.com/pkg/errors"
@@ -493,6 +494,27 @@ func (ar *AlertRule) Verify() error {
}
}
if err := ar.validateCronPattern(); err != nil {
return err
}
return nil
}
// validateCronPattern checks that ar.CronPattern, when set, is a valid
// cron expression with a seconds field (matching how the evaluation
// scheduler is configured). An empty pattern is valid and means "no
// cron schedule".
func (ar *AlertRule) validateCronPattern() error {
	if ar.CronPattern == "" {
		return nil
	}
	// Parse the expression directly instead of constructing a throwaway
	// scheduler and registering a no-op job. This parser configuration is
	// exactly what cron.New(cron.WithSeconds()) uses internally, so the
	// validation result is identical but cheaper.
	parser := cron.NewParser(cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor)
	if _, err := parser.Parse(ar.CronPattern); err != nil {
		return fmt.Errorf("invalid cron pattern: %s, error: %v", ar.CronPattern, err)
	}
	return nil
}

View File

@@ -115,68 +115,75 @@ func BusiGroupExists(ctx *ctx.Context, where string, args ...interface{}) (bool,
return num > 0, err
}
// RegisterGroupDelCheckEntries lets external packages register additional
// tables that must be empty (for the given group) before a BusiGroup may
// be deleted.
//
// NOTE(review): appends to package-level state without locking — assumed
// to be called only during single-threaded init; confirm with callers.
func RegisterGroupDelCheckEntries(e []CheckEntry) {
	entries = append(entries, e...)
}

// CheckEntry describes one pre-delete existence check: the model whose
// table is queried, the error message returned when rows are found, and
// the column compared against the BusiGroup id.
type CheckEntry struct {
	Entry        interface{} // gorm model pointer identifying the table to check
	ErrorMessage string      // message returned when matching rows still exist
	FieldName    string      // column name matched against the group id (e.g. "group_id")
}

// entries is the default set of tables checked before a BusiGroup is
// deleted; extended via RegisterGroupDelCheckEntries.
var entries = []CheckEntry{
	{
		Entry:        &AlertRule{},
		ErrorMessage: "Some alert rules still in the BusiGroup",
		FieldName:    "group_id",
	},
	{
		Entry:        &AlertMute{},
		ErrorMessage: "Some alert mutes still in the BusiGroup",
		FieldName:    "group_id",
	},
	{
		Entry:        &AlertSubscribe{},
		ErrorMessage: "Some alert subscribes still in the BusiGroup",
		FieldName:    "group_id",
	},
	{
		Entry:        &Board{},
		ErrorMessage: "Some Board still in the BusiGroup",
		FieldName:    "group_id",
	},
	{
		Entry:        &Target{},
		ErrorMessage: "Some targets still in the BusiGroup",
		FieldName:    "group_id",
	},
	{
		Entry:        &RecordingRule{},
		ErrorMessage: "Some recording rules still in the BusiGroup",
		FieldName:    "group_id",
	},
	{
		Entry:        &TaskTpl{},
		ErrorMessage: "Some recovery scripts still in the BusiGroup",
		FieldName:    "group_id",
	},
	{
		Entry:        &TaskRecord{},
		ErrorMessage: "Some Task Record records still in the BusiGroup",
		FieldName:    "group_id",
	},
	{
		Entry:        &TargetBusiGroup{},
		ErrorMessage: "Some target busigroups still in the BusiGroup",
		FieldName:    "group_id",
	},
}
func (bg *BusiGroup) Del(ctx *ctx.Context) error {
has, err := Exists(DB(ctx).Model(&AlertMute{}).Where("group_id=?", bg.Id))
if err != nil {
return err
}
for _, e := range entries {
has, err := Exists(DB(ctx).Model(e.Entry).Where(fmt.Sprintf("%s=?", e.FieldName), bg.Id))
if err != nil {
return err
}
if has {
return errors.New("Some alert mutes still in the BusiGroup")
}
has, err = Exists(DB(ctx).Model(&AlertSubscribe{}).Where("group_id=?", bg.Id))
if err != nil {
return err
}
if has {
return errors.New("Some alert subscribes still in the BusiGroup")
}
has, err = Exists(DB(ctx).Model(&TargetBusiGroup{}).Where("group_id=?", bg.Id))
if err != nil {
return err
}
if has {
return errors.New("Some targets still in the BusiGroup")
}
has, err = Exists(DB(ctx).Model(&Board{}).Where("group_id=?", bg.Id))
if err != nil {
return err
}
if has {
return errors.New("Some dashboards still in the BusiGroup")
}
has, err = Exists(DB(ctx).Model(&TaskTpl{}).Where("group_id=?", bg.Id))
if err != nil {
return err
}
if has {
return errors.New("Some recovery scripts still in the BusiGroup")
}
// hasCR, err := Exists(DB(ctx).Table("collect_rule").Where("group_id=?", bg.Id))
// if err != nil {
// return err
// }
// if hasCR {
// return errors.New("Some collect rules still in the BusiGroup")
// }
has, err = Exists(DB(ctx).Model(&AlertRule{}).Where("group_id=?", bg.Id))
if err != nil {
return err
}
if has {
return errors.New("Some alert rules still in the BusiGroup")
if has {
return errors.New(e.ErrorMessage)
}
}
return DB(ctx).Transaction(func(tx *gorm.DB) error {

View File

@@ -20,6 +20,7 @@ type EsIndexPattern struct {
CreateBy string `json:"create_by"`
UpdateAt int64 `json:"update_at"`
UpdateBy string `json:"update_by"`
CrossClusterEnabled int `json:"cross_cluster_enabled"`
}
func (t *EsIndexPattern) TableName() string {

View File

@@ -67,7 +67,7 @@ func MigrateTables(db *gorm.DB) error {
&TaskRecord{}, &ChartShare{}, &Target{}, &Configs{}, &Datasource{}, &NotifyTpl{},
&Board{}, &BoardBusigroup{}, &Users{}, &SsoConfig{}, &models.BuiltinMetric{},
&models.MetricFilter{}, &models.NotificaitonRecord{},
&models.TargetBusiGroup{}}
&models.TargetBusiGroup{}, &EsIndexPatternMigrate{}}
if isPostgres(db) {
dts = append(dts, &models.PostgresBuiltinComponent{})
@@ -319,3 +319,11 @@ type TaskHostDoing struct {
func (TaskHostDoing) TableName() string {
return "task_host_doing"
}
// EsIndexPatternMigrate is a minimal migration shim for the
// es_index_pattern table: it declares only the newly added
// cross_cluster_enabled column so AutoMigrate creates that column
// without touching the rest of the existing schema.
type EsIndexPatternMigrate struct {
	CrossClusterEnabled int `gorm:"column:cross_cluster_enabled;type:int;default:0"`
}

// TableName maps the migration shim onto the existing es_index_pattern table.
func (EsIndexPatternMigrate) TableName() string {
	return "es_index_pattern"
}

View File

@@ -9,23 +9,25 @@ import (
)
type Context struct {
DB *gorm.DB
CenterApi conf.CenterApi
Ctx context.Context
IsCenter bool
DB *gorm.DB
CenterApi conf.CenterApi
Ctx context.Context
IsCenter bool
DisableNotifyRecord bool
}
func NewContext(ctx context.Context, db *gorm.DB, isCenter bool, centerApis ...conf.CenterApi) *Context {
// NewContext builds the shared Context carrying the base context, the DB
// handle, the center/edge role flag and the notification-record switch.
// centerApis is variadic purely for call-site convenience and backward
// compatibility: only the first value, if present, is used.
func NewContext(ctx context.Context, db *gorm.DB, isCenter bool, disableNotifyRecord bool, centerApis ...conf.CenterApi) *Context {
	c := &Context{
		Ctx:                 ctx,
		DB:                  db,
		IsCenter:            isCenter,
		DisableNotifyRecord: disableNotifyRecord,
	}
	// When no API config is supplied, CenterApi stays at its zero value.
	if len(centerApis) > 0 {
		c.CenterApi = centerApis[0]
	}
	return c
}

View File

@@ -63,7 +63,7 @@ func GetByUrl[T any](url string, cfg conf.CenterApi) (T, error) {
Timeout: time.Duration(cfg.Timeout) * time.Millisecond,
}
if useProxy(url) {
if UseProxy(url) {
client.Transport = ProxyTransporter
}
@@ -147,7 +147,7 @@ func PostByUrl[T any](url string, cfg conf.CenterApi, v interface{}) (t T, err e
Timeout: time.Duration(cfg.Timeout) * time.Millisecond,
}
if useProxy(url) {
if UseProxy(url) {
client.Transport = ProxyTransporter
}
@@ -195,7 +195,7 @@ var ProxyTransporter = &http.Transport{
Proxy: http.ProxyFromEnvironment,
}
func useProxy(url string) bool {
func UseProxy(url string) bool {
// N9E_PROXY_URL=oapi.dingtalk.com,feishu.com
patterns := os.Getenv("N9E_PROXY_URL")
if patterns != "" {
@@ -228,7 +228,7 @@ func PostJSON(url string, timeout time.Duration, v interface{}, retries ...int)
Timeout: timeout,
}
if useProxy(url) {
if UseProxy(url) {
client.Transport = ProxyTransporter
}