Compare commits

...

1 Commits

Author SHA1 Message Date
Haobo Zhang
b2913a6f73 feat: record alert evaluation detail (#2777) 2025-07-10 20:37:19 +08:00
6 changed files with 391 additions and 9 deletions

View File

@@ -29,10 +29,12 @@ type HeartbeatConfig struct {
}
type Alerting struct {
Timeout int64
TemplatesDir string
NotifyConcurrency int
WebhookBatchSend bool
Timeout int64
TemplatesDir string
NotifyConcurrency int
WebhookBatchSend bool
StatusPageSingleQuota int
StatusPageTotalQuota int
}
type CallPlugin struct {
@@ -63,4 +65,12 @@ func (a *Alert) PreCheck(configDir string) {
if a.EngineDelay == 0 {
a.EngineDelay = 30
}
if a.Alerting.StatusPageSingleQuota == 0 {
a.Alerting.StatusPageSingleQuota = 10
}
if a.Alerting.StatusPageTotalQuota == 0 {
a.Alerting.StatusPageTotalQuota = 100000
}
}

View File

@@ -116,7 +116,7 @@ func (s *Scheduler) syncAlertRules() {
}
processor := process.NewProcessor(s.aconf.Heartbeat.EngineName, rule, dsId, s.alertRuleCache, s.targetCache, s.targetsOfAlertRuleCache, s.busiGroupCache, s.alertMuteCache, s.datasourceCache, s.ctx, s.stats)
alertRule := NewAlertRuleWorker(rule, dsId, processor, s.promClients, s.ctx)
alertRule := NewAlertRuleWorker(rule, dsId, processor, s.promClients, s.ctx, s.aconf)
alertRuleWorkers[alertRule.Hash()] = alertRule
}
} else if rule.IsHostRule() {
@@ -125,7 +125,7 @@ func (s *Scheduler) syncAlertRules() {
continue
}
processor := process.NewProcessor(s.aconf.Heartbeat.EngineName, rule, 0, s.alertRuleCache, s.targetCache, s.targetsOfAlertRuleCache, s.busiGroupCache, s.alertMuteCache, s.datasourceCache, s.ctx, s.stats)
alertRule := NewAlertRuleWorker(rule, 0, processor, s.promClients, s.ctx)
alertRule := NewAlertRuleWorker(rule, 0, processor, s.promClients, s.ctx, s.aconf)
alertRuleWorkers[alertRule.Hash()] = alertRule
} else {
// 如果 rule 不是通过 prometheus engine 来告警的,则创建为 externalRule

View File

@@ -11,12 +11,15 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/ccfos/nightingale/v6/alert/aconf"
"github.com/ccfos/nightingale/v6/alert/astats"
"github.com/ccfos/nightingale/v6/alert/common"
"github.com/ccfos/nightingale/v6/alert/process"
"github.com/ccfos/nightingale/v6/dscache"
"github.com/ccfos/nightingale/v6/memsto"
"github.com/ccfos/nightingale/v6/models"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/pkg/hash"
@@ -28,11 +31,18 @@ import (
"github.com/ccfos/nightingale/v6/prom"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/prompb"
"github.com/robfig/cron/v3"
"github.com/toolkits/pkg/logger"
"github.com/toolkits/pkg/str"
)
// Global variables for status data tracking
var (
// GlobalStatusDataTracker tracks stored status data entries using LRU cache
GlobalStatusDataTracker *memsto.AlertStatusLRUCache
)
type AlertRuleWorker struct {
DatasourceId int64
Quit chan struct{}
@@ -52,6 +62,8 @@ type AlertRuleWorker struct {
LastSeriesStore map[uint64]models.DataResp
DeviceIdentHook func(arw *AlertRuleWorker, paramQuery models.ParamQuery) ([]string, error)
StatusPageSingleQuota int
}
const (
@@ -74,7 +86,7 @@ const (
Inner JoinType = "inner"
)
func NewAlertRuleWorker(rule *models.AlertRule, datasourceId int64, Processor *process.Processor, promClients *prom.PromClientMap, ctx *ctx.Context) *AlertRuleWorker {
func NewAlertRuleWorker(rule *models.AlertRule, datasourceId int64, Processor *process.Processor, promClients *prom.PromClientMap, ctx *ctx.Context, aconf aconf.Alert) *AlertRuleWorker {
arw := &AlertRuleWorker{
DatasourceId: datasourceId,
Quit: make(chan struct{}),
@@ -87,7 +99,8 @@ func NewAlertRuleWorker(rule *models.AlertRule, datasourceId int64, Processor *p
DeviceIdentHook: func(arw *AlertRuleWorker, paramQuery models.ParamQuery) ([]string, error) {
return nil, nil
},
LastSeriesStore: make(map[uint64]models.DataResp),
LastSeriesStore: make(map[uint64]models.DataResp),
StatusPageSingleQuota: aconf.Alerting.StatusPageSingleQuota,
}
interval := rule.PromEvalInterval
@@ -112,6 +125,12 @@ func NewAlertRuleWorker(rule *models.AlertRule, datasourceId int64, Processor *p
Processor.ScheduleEntry = arw.Scheduler.Entry(entryID)
Processor.PromEvalInterval = getPromEvalInterval(Processor.ScheduleEntry.Schedule)
// Initialize GlobalStatusDataTracker if not already initialized
if GlobalStatusDataTracker == nil {
GlobalStatusDataTracker = memsto.NewAlertStatusLRUCache(aconf.Alerting.StatusPageTotalQuota)
}
return arw
}
@@ -346,6 +365,28 @@ func (arw *AlertRuleWorker) GetPromAnomalyPoint(ruleConfig string) ([]models.Ano
logger.Infof("rule_eval:%s query:%+v, value:%v", arw.Key(), query, value)
points := models.ConvertAnomalyPoints(value)
// 存储状态页面数据
if len(points) > 0 {
for i, point := range points {
if arw.StatusPageSingleQuota < i {
break
}
var labels [][2]string
labels = append(labels, [2]string{"ref", fmt.Sprintf("%v", i)})
for k, v := range point.Labels {
if k != "__name__" {
labels = append(labels, [2]string{string(k), string(v)})
} else {
labels = append(labels, [2]string{"metric_name", string(v)})
}
}
arw.StoreStatusData(labels, int64(point.Timestamp), point.Value)
}
}
arw.Processor.Stats.GaugeQuerySeriesCount.WithLabelValues(
fmt.Sprintf("%v", arw.Rule.Id),
fmt.Sprintf("%v", arw.Processor.DatasourceId()),
@@ -787,7 +828,7 @@ func (arw *AlertRuleWorker) GetHostAnomalyPoint(ruleConfig string) ([]models.Ano
arw.Inhibit = rule.Inhibit
now := time.Now().Unix()
for _, trigger := range rule.Triggers {
for ref, trigger := range rule.Triggers {
switch trigger.Type {
case "target_miss":
t := now - int64(trigger.Duration)
@@ -826,6 +867,10 @@ func (arw *AlertRuleWorker) GetHostAnomalyPoint(ruleConfig string) ([]models.Ano
if updateTime < t {
missTargets = append(missTargets, ident)
}
// 存储状态页面数据
labels := [][2]string{{"ident", ident}, {"ref", fmt.Sprintf("%v", ref)}}
arw.StoreStatusData(labels, now, float64(now-updateTime))
}
arw.Processor.Stats.GaugeQuerySeriesCount.WithLabelValues(
fmt.Sprintf("%v", arw.Rule.Id),
@@ -878,6 +923,11 @@ func (arw *AlertRuleWorker) GetHostAnomalyPoint(ruleConfig string) ([]models.Ano
}
offset := meta.Offset
// 存储状态页面数据
labels := [][2]string{{"ident", ident}, {"ref", fmt.Sprintf("%v", ref)}}
arw.StoreStatusData(labels, now, float64(offset))
if math.Abs(float64(offset)) > float64(trigger.Duration) {
offsetIdents[ident] = offset
}
@@ -921,6 +971,10 @@ func (arw *AlertRuleWorker) GetHostAnomalyPoint(ruleConfig string) ([]models.Ano
if updateTime < t {
missTargets = append(missTargets, ident)
}
// 存储状态页面数据
labels := [][2]string{{"ident", ident}, {"ref", fmt.Sprintf("%v", ref)}}
arw.StoreStatusData(labels, now, float64(now-updateTime))
}
logger.Debugf("rule_eval:%s missTargets:%v", arw.Key(), missTargets)
arw.Processor.Stats.GaugeQuerySeriesCount.WithLabelValues(
@@ -1517,6 +1571,22 @@ func (arw *AlertRuleWorker) GetAnomalyPoint(rule *models.AlertRule, dsId int64)
seriesTagIndex[tagHash] = make([]uint64, 0)
}
seriesTagIndex[tagHash] = append(seriesTagIndex[tagHash], serieHash)
// 存储状态页面数据
ts, value, exists := series[i].Last()
if exists && arw.StatusPageSingleQuota > i {
var labels [][2]string
labels = append(labels, [2]string{"ref", series[i].Ref})
for k, v := range series[i].Metric {
if k != "__name__" {
labels = append(labels, [2]string{string(k), string(v)})
} else {
labels = append(labels, [2]string{"metric_name", string(v)})
}
}
arw.StoreStatusData(labels, int64(ts), value)
}
}
ref, err := GetQueryRef(query)
if err != nil {
@@ -1699,3 +1769,87 @@ func (arw *AlertRuleWorker) GetAnomalyPoint(rule *models.AlertRule, dsId int64)
return points, recoverPoints, nil
}
// StoreStatusData persists one status-page sample to the default prometheus
// datasource as two remote-write series:
//
//   - n9e_alert_rule_<id>_status_timestamp — sample value is the original
//     event timestamp passed in by the caller
//   - n9e_alert_rule_<id>_status_value     — sample value is the metric value
//
// Both series carry the caller-supplied labels and are written with the
// current wall-clock time as the prometheus sample timestamp. Writes are
// best-effort: quota exhaustion and write errors are logged and dropped.
func (arw *AlertRuleWorker) StoreStatusData(labels [][2]string, timestamp int64, value float64) {
	// Hash the label set into a stable key for quota tracking.
	metric := make(model.Metric, len(labels))
	for _, label := range labels {
		metric[model.LabelName(label[0])] = model.LabelValue(label[1])
	}
	entryKeyHash := hash.GetTagHash(metric)

	// The tracker is normally created in NewAlertRuleWorker; bail out rather
	// than panic if this worker was constructed some other way.
	if GlobalStatusDataTracker == nil {
		logger.Warningf("status data tracker not initialized, dropping status data with labels: %v", labels)
		return
	}

	// Reserve a quota slot; refusal means the global label-set quota is full
	// of still-fresh entries.
	if success := GlobalStatusDataTracker.Put(entryKeyHash); !success {
		logger.Warningf("Failed to store status data entry in prometheus with labels: %v (reaching labels total quota limit)", labels)
		return
	}

	// Status data always goes to the default prometheus datasource.
	defaultDatasourceId := atomic.LoadInt64(&dscache.PromDefaultDatasourceId)
	if defaultDatasourceId == 0 {
		logger.Errorf("error store status page data, default datasource id is 0")
		return
	}

	writerClient := arw.PromClients.GetWriterCli(defaultDatasourceId)
	if writerClient.Client == nil {
		logger.Errorf("error store status page data, prometheus client not found for datasource id: %d", defaultDatasourceId)
		return
	}

	// Prometheus sample timestamps are in milliseconds.
	now := time.Now().UnixMilli()

	// Base labels shared by both series; __name__ is prepended per series.
	baseLabels := make([]prompb.Label, 0, len(labels))
	for _, label := range labels {
		baseLabels = append(baseLabels, prompb.Label{
			Name:  label[0],
			Value: label[1],
		})
	}

	// buildSeries assembles one remote-write series: __name__ + base labels,
	// single sample at the current time.
	buildSeries := func(metricName string, sampleValue float64) prompb.TimeSeries {
		lbls := make([]prompb.Label, 0, len(baseLabels)+1)
		lbls = append(lbls, prompb.Label{
			Name:  "__name__",
			Value: metricName,
		})
		lbls = append(lbls, baseLabels...)
		return prompb.TimeSeries{
			Labels:  lbls,
			Samples: []prompb.Sample{{Timestamp: now, Value: sampleValue}},
		}
	}

	timeSeries := []prompb.TimeSeries{
		// Original event timestamp, stored as a value so it survives the
		// rewrite of the sample timestamp to "now".
		buildSeries(fmt.Sprintf("n9e_alert_rule_%d_status_timestamp", arw.Rule.Id), float64(timestamp)),
		// The metric value itself.
		buildSeries(fmt.Sprintf("n9e_alert_rule_%d_status_value", arw.Rule.Id), value),
	}

	// Best-effort write: log and drop on failure.
	if err := writerClient.Write(timeSeries); err != nil {
		logger.Errorf("error writing status page data to prometheus: %v with labels: %v", err, labels)
		return
	}
}

View File

@@ -411,6 +411,7 @@ func (rt *Router) Config(r *gin.Engine) {
pages.GET("/alert-cur-event/:eid", rt.alertCurEventGet)
pages.GET("/alert-his-event/:eid", rt.alertHisEventGet)
pages.GET("/event-notify-records/:eid", rt.notificationRecordList)
pages.POST("/alert-status", rt.QueryAlertStatus)
// card logic
pages.GET("/alert-cur-events/list", rt.auth(), rt.user(), rt.alertCurEventsList)

View File

@@ -0,0 +1,104 @@
package router
import (
"context"
"fmt"
"sync/atomic"
"time"
"github.com/ccfos/nightingale/v6/dscache"
promsdk "github.com/ccfos/nightingale/v6/pkg/prom"
"github.com/gin-gonic/gin"
"github.com/toolkits/pkg/ginx"
"github.com/toolkits/pkg/logger"
)
// AlertStatusQuery is the request body for POST /alert-status. It selects
// which alert rule's status-page series to read back from prometheus and
// over what time window.
type AlertStatusQuery struct {
	RuleId    int64  `json:"rule_id"`    // alert rule id; selects the n9e_alert_rule_<id>_status_* series
	StartTime int64  `json:"start_time"` // range start, unix seconds
	EndTime   int64  `json:"end_time"`   // range end, unix seconds
	Step      int64  `json:"step"`       // query resolution step, in seconds
	Labels    string `json:"labels,omitempty"` // optional label matchers, inserted verbatim into the {...} selector
}
// QueryAlertStatus reads back the status-page series written by alert rule
// workers (n9e_alert_rule_<id>_status_value and ..._status_timestamp) from
// the default prometheus datasource and returns both range-query results.
//
// The endpoint is best-effort: on any failure (no default datasource, no
// reader client, query error) it responds with {"value": nil,
// "timestamp": nil} rather than an error.
func (rt *Router) QueryAlertStatus(c *gin.Context) {
	var query AlertStatusQuery
	ginx.BindJSON(c, &query)

	// renderEmpty is the shared best-effort failure response.
	renderEmpty := func() {
		ginx.NewRender(c).Data(map[string]interface{}{
			"value":     nil,
			"timestamp": nil,
		}, nil)
	}

	// Status data is always written to the default prometheus datasource.
	defaultDatasourceId := atomic.LoadInt64(&dscache.PromDefaultDatasourceId)
	if defaultDatasourceId == 0 {
		renderEmpty()
		return
	}

	readerClient := rt.PromClients.GetCli(defaultDatasourceId)
	if readerClient == nil {
		renderEmpty()
		return
	}

	// Build the two PromQL selectors, optionally narrowed by caller-supplied
	// label matchers (inserted verbatim inside {...}).
	valuePromql := fmt.Sprintf("n9e_alert_rule_%d_status_value", query.RuleId)
	timestampPromql := fmt.Sprintf("n9e_alert_rule_%d_status_timestamp", query.RuleId)
	if query.Labels != "" {
		valuePromql = fmt.Sprintf("%s{%s}", valuePromql, query.Labels)
		timestampPromql = fmt.Sprintf("%s{%s}", timestampPromql, query.Labels)
	}

	logger.Infof("Querying alert status: %s & %s, time: %v - %v", valuePromql, timestampPromql, query.StartTime, query.EndTime)

	r := promsdk.Range{
		Start: time.Unix(query.StartTime, 0),
		End:   time.Unix(query.EndTime, 0),
		Step:  time.Duration(query.Step) * time.Second,
	}

	// queryRange runs one range query. ok=false means the empty payload has
	// already been rendered and the handler should return. The request
	// context is used so the query is cancelled if the client disconnects.
	queryRange := func(kind, promql string) (interface{}, bool) {
		result, warnings, err := readerClient.QueryRange(c.Request.Context(), promql, r)
		if err != nil {
			renderEmpty()
			logger.Errorf("Query %s error: %v with query: %s", kind, err, promql)
			return nil, false
		}
		if len(warnings) > 0 {
			logger.Warningf("Query warnings: %v with query: %s", warnings, promql)
		}
		return result, true
	}

	value, ok := queryRange("value", valuePromql)
	if !ok {
		return
	}
	timestamp, ok := queryRange("timestamp", timestampPromql)
	if !ok {
		return
	}

	ginx.NewRender(c).Data(map[string]interface{}{
		"value":     value,
		"timestamp": timestamp,
	}, nil)
}

View File

@@ -0,0 +1,113 @@
package memsto
import (
"sync"
"time"
)
// alertStatusEntryTTL is how long (in seconds) a tracked entry stays fresh;
// an entry not touched for this long may be evicted to make room.
const alertStatusEntryTTL = 86400

// LRUNode is a node of the doubly linked list backing AlertStatusLRUCache.
type LRUNode struct {
	Key       uint64
	Timestamp int64 // unix seconds of the last Put that touched this key
	Prev      *LRUNode
	Next      *LRUNode
}

// AlertStatusLRUCache is a thread-safe quota tracker for alert status series.
// Unlike a classic LRU it does NOT unconditionally evict when full: the least
// recently used entry is removed only once it has expired (untouched for
// alertStatusEntryTTL), so Put reports false while the quota is exhausted by
// still-fresh entries.
type AlertStatusLRUCache struct {
	capacity int
	size     int
	cache    map[uint64]*LRUNode
	head     *LRUNode // dummy head; head.Next is the most recently used node
	tail     *LRUNode // dummy tail; tail.Prev is the least recently used node
	mutex    sync.RWMutex
}

// NewAlertStatusLRUCache creates a cache that admits at most capacity keys.
func NewAlertStatusLRUCache(capacity int) *AlertStatusLRUCache {
	lru := &AlertStatusLRUCache{
		capacity: capacity,
		cache:    make(map[uint64]*LRUNode),
		head:     &LRUNode{},
		tail:     &LRUNode{},
	}
	// Wire the sentinels together so list operations need no nil checks.
	lru.head.Next = lru.tail
	lru.tail.Prev = lru.head
	return lru
}

// checkNodeExpired reports whether node was last touched more than
// alertStatusEntryTTL seconds ago.
func (lru *AlertStatusLRUCache) checkNodeExpired(node *LRUNode) bool {
	return node.Timestamp < time.Now().Unix()-alertStatusEntryTTL
}

// addNode inserts node right after the dummy head (most recently used slot).
func (lru *AlertStatusLRUCache) addNode(node *LRUNode) {
	node.Prev = lru.head
	node.Next = lru.head.Next
	lru.head.Next.Prev = node
	lru.head.Next = node
}

// removeNode unlinks an existing node from the list.
func (lru *AlertStatusLRUCache) removeNode(node *LRUNode) {
	node.Prev.Next = node.Next
	node.Next.Prev = node.Prev
}

// moveToHead marks node as most recently used and refreshes its timestamp.
func (lru *AlertStatusLRUCache) moveToHead(node *LRUNode) {
	lru.removeNode(node)
	node.Timestamp = time.Now().Unix()
	lru.addNode(node)
}

// popTail removes and returns the least recently used node, but only when it
// exists and has expired; otherwise it returns nil.
func (lru *AlertStatusLRUCache) popTail() *LRUNode {
	last := lru.tail.Prev
	// Guard against an empty list: unlinking the dummy head would corrupt
	// the list (and panic on head.Prev == nil).
	if last == lru.head || !lru.checkNodeExpired(last) {
		return nil
	}
	lru.removeNode(last)
	return last
}

// Put records an access to key. It returns true when the key was admitted
// (or already present, in which case it is refreshed) and false when the
// cache is full of non-expired entries.
func (lru *AlertStatusLRUCache) Put(key uint64) bool {
	lru.mutex.Lock()
	defer lru.mutex.Unlock()

	if node, exists := lru.cache[key]; exists {
		// Refresh the existing entry.
		lru.moveToHead(node)
		return true
	}

	// Use >= (not ==) so an inconsistent size can never bypass the quota.
	if lru.size >= lru.capacity {
		tail := lru.popTail()
		if tail == nil {
			// Quota exhausted by still-fresh entries.
			return false
		}
		delete(lru.cache, tail.Key)
		lru.size--
	}

	newNode := &LRUNode{
		Key:       key,
		Timestamp: time.Now().Unix(),
	}
	lru.cache[key] = newNode
	lru.addNode(newNode)
	lru.size++
	return true
}