Compare commits

...

1 Commits

Author SHA1 Message Date
ning
9867a9519c fix: alert with var 2025-11-29 23:40:16 +08:00

View File

@@ -386,10 +386,21 @@ type sample struct {
// 每个节点先查询无参数的 query, 即 mem_used_percent{} > curVal, 得到满足值变量的所有结果
// 结果中有满足本节点参数变量的值,加入异常点列表
// 参数变量的值不满足的组合,需要覆盖上层筛选中产生的异常点
// VarFillingAfterQuery 先查询再过滤变量,效率较高,但无法处理有聚合函数导致标签丢失的情况
//
// 修复说明 (Issue #2971):
// 原实现中使用参数变量组合作为 key 存储异常点,导致同一参数值下的多条时序数据互相覆盖。
// 修复方案:
// 1. 同一层内:使用完整的标签 hash 作为 key避免不同时序数据覆盖
// 2. 跨层级时:子层按参数变量组合前缀删除父层的所有相关告警,实现子筛选覆盖父筛选
func (arw *AlertRuleWorker) VarFillingAfterQuery(query models.PromQuery, readerClient promsdk.API) []models.AnomalyPoint {
varToLabel := ExtractVarMapping(query.PromQl)
fullQuery := removeVal(query.PromQl)
// 存储所有的异常点key 为参数变量的组合,可以实现子筛选对上一层筛选的覆盖
// 存储所有的异常点
// key 格式: {参数变量组合}@@{标签hash}
// 这样可以:
// 1. 同层内不同时序数据有不同的 key标签hash不同
// 2. 跨层时可以按参数变量组合前缀删除父层的告警
anomalyPointsMap := make(map[string]models.AnomalyPoint)
// 统一变量配置格式
VarConfigForCalc := &models.ChildVarConfig{
@@ -416,7 +427,16 @@ func (arw *AlertRuleWorker) VarFillingAfterQuery(query models.PromQuery, readerC
})
// 遍历变量配置链表
curNode := VarConfigForCalc
isFirstLayer := true
for curNode != nil {
// 当前层收集到的所有异常点,按参数组合分组
// key: 参数变量组合, value: 该组合下的所有异常点及其完整key
type pointWithKey struct {
point models.AnomalyPoint
fullKey string
}
currentLayerPointsByParam := make(map[string][]pointWithKey)
for _, param := range curNode.ParamVal {
// curQuery 当前节点的无参数 query用于时序库查询
curQuery := fullQuery
@@ -458,8 +478,13 @@ func (arw *AlertRuleWorker) VarFillingAfterQuery(query models.PromQuery, readerC
curRealQuery = fillVar(curRealQuery, paramKey, val)
}
if _, ok := paramPermutation[strings.Join(cur, JoinMark)]; ok {
anomalyPointsMap[strings.Join(cur, JoinMark)] = models.AnomalyPoint{
paramKey := strings.Join(cur, JoinMark)
if _, ok := paramPermutation[paramKey]; ok {
// 计算标签 hash确保不同的时序数据有不同的 key
tagHash := hash.GetTagHash(seqVals[i].Metric)
fullKey := paramKey + JoinMark + fmt.Sprintf("%d", tagHash)
point := models.AnomalyPoint{
Key: seqVals[i].Metric.String(),
Timestamp: seqVals[i].Timestamp.Unix(),
Value: float64(seqVals[i].Value),
@@ -467,17 +492,44 @@ func (arw *AlertRuleWorker) VarFillingAfterQuery(query models.PromQuery, readerC
Severity: query.Severity,
Query: curRealQuery,
}
// 生成异常点后,删除该参数组合
delete(paramPermutation, strings.Join(cur, JoinMark))
currentLayerPointsByParam[paramKey] = append(currentLayerPointsByParam[paramKey], pointWithKey{point: point, fullKey: fullKey})
}
}
// 剩余的参数组合为本层筛选不产生异常点的组合,需要覆盖上层筛选中产生的异常点
for k, _ := range paramPermutation {
delete(anomalyPointsMap, k)
// 初始化空的参数组合(用于子层覆盖父层的场景)
for paramKey := range paramPermutation {
if _, exists := currentLayerPointsByParam[paramKey]; !exists {
currentLayerPointsByParam[paramKey] = []pointWithKey{}
}
}
}
// 处理当前层的结果
for paramKey, pointsWithKeys := range currentLayerPointsByParam {
if !isFirstLayer {
// 非首层(子层):先删除父层中该参数组合的所有告警
// 这实现了 issue #2433 要求的子筛选覆盖父筛选功能
keysToDelete := make([]string, 0)
for k := range anomalyPointsMap {
// key 格式: {参数组合}@@{标签hash}
// 检查是否以当前参数组合开头(后面跟着 JoinMark
if strings.HasPrefix(k, paramKey+JoinMark) {
keysToDelete = append(keysToDelete, k)
}
}
for _, k := range keysToDelete {
delete(anomalyPointsMap, k)
}
}
// 添加当前层的所有异常点
for _, pwk := range pointsWithKeys {
anomalyPointsMap[pwk.fullKey] = pwk.point
}
}
curNode = curNode.ChildVarConfigs
isFirstLayer = false
}
anomalyPoints := make([]models.AnomalyPoint, 0)
@@ -1228,9 +1280,20 @@ func GetQueryRefAndUnit(query interface{}) (string, string, error) {
// 每个节点先填充参数再进行查询, 即先得到完整的 promql avg(mem_used_percent{host="127.0.0.1"}) > 5
// 再查询得到满足值变量的所有结果加入异常点列表
// 参数变量的值不满足的组合,需要覆盖上层筛选中产生的异常点
//
// 修复说明 (Issue #2971):
// 原实现中使用参数变量组合作为 key 存储异常点,导致同一参数值下的多条时序数据互相覆盖。
// 修复方案:
// 1. 同一层内:使用完整的标签 hash 作为 key避免不同时序数据覆盖
// 2. 跨层级时:子层按参数变量组合删除父层的所有相关告警,实现子筛选覆盖父筛选
func (arw *AlertRuleWorker) VarFillingBeforeQuery(query models.PromQuery, readerClient promsdk.API) []models.AnomalyPoint {
varToLabel := ExtractVarMapping(query.PromQl)
// 存储异常点的 mapkey 为参数变量的组合,可以实现子筛选对上一层筛选的覆盖
// 存储异常点的 map
// key 格式: {参数变量组合}@@{标签hash}
// 这样可以:
// 1. 同层内不同时序数据有不同的 key标签hash不同
// 2. 跨层时可以按参数变量组合前缀删除父层的告警
anomalyPointsMap := sync.Map{}
// 统一变量配置格式
VarConfigForCalc := &models.ChildVarConfig{
@@ -1255,11 +1318,19 @@ func (arw *AlertRuleWorker) VarFillingBeforeQuery(query models.PromQuery, reader
sort.Slice(ParamKeys, func(i, j int) bool {
return ParamKeys[i] < ParamKeys[j]
})
// 遍历变量配置链表
// 遍历变量配置链表(父层 -> 子层)
curNode := VarConfigForCalc
isFirstLayer := true
for curNode != nil {
// 当前层收集到的所有异常点,按参数组合分组
// key: 参数变量组合, value: 该组合下的所有异常点
currentLayerPointsByParam := make(map[string][]models.AnomalyPoint)
var currentLayerMutex sync.Mutex
for _, param := range curNode.ParamVal {
curPromql := query.PromQl
// 取出阈值变量
valMap := make(map[string]string)
for val, valQuery := range param {
@@ -1279,7 +1350,7 @@ func (arw *AlertRuleWorker) VarFillingBeforeQuery(query models.PromQuery, reader
}
keyToPromql := make(map[string]string)
for paramPermutationKeys, _ := range paramPermutation {
for paramPermutationKeys := range paramPermutation {
realPromql := curPromql
split := strings.Split(paramPermutationKeys, JoinMark)
for j := range ParamKeys {
@@ -1291,15 +1362,20 @@ func (arw *AlertRuleWorker) VarFillingBeforeQuery(query models.PromQuery, reader
// 并发查询
wg := sync.WaitGroup{}
semaphore := make(chan struct{}, 200)
for key, promql := range keyToPromql {
for paramKey, promql := range keyToPromql {
wg.Add(1)
semaphore <- struct{}{}
go func(key, promql string) {
go func(paramKey, promql string) {
defer func() {
<-semaphore
wg.Done()
}()
arw.Processor.Stats.CounterQueryDataTotal.WithLabelValues(fmt.Sprintf("%d", arw.DatasourceId), fmt.Sprintf("%d", arw.Rule.Id)).Inc()
arw.Processor.Stats.CounterQueryDataTotal.WithLabelValues(
fmt.Sprintf("%d", arw.DatasourceId),
fmt.Sprintf("%d", arw.Rule.Id),
).Inc()
value, _, err := readerClient.Query(context.Background(), promql, time.Now())
if err != nil {
logger.Errorf("rule_eval:%s, promql:%s, error:%v", arw.Key(), promql, err)
@@ -1309,29 +1385,67 @@ func (arw *AlertRuleWorker) VarFillingBeforeQuery(query models.PromQuery, reader
points := models.ConvertAnomalyPoints(value)
if len(points) == 0 {
anomalyPointsMap.Delete(key)
// 查询无结果时,标记该参数组合需要清除(用于子层覆盖父层)
currentLayerMutex.Lock()
if _, exists := currentLayerPointsByParam[paramKey]; !exists {
currentLayerPointsByParam[paramKey] = []models.AnomalyPoint{}
}
currentLayerMutex.Unlock()
return
}
for i := 0; i < len(points); i++ {
points[i].Severity = query.Severity
points[i].Query = promql
points[i].ValuesUnit = map[string]unit.FormattedValue{
"v": unit.ValueFormatter(query.Unit, 2, points[i].Value),
}
// 每个异常点都需要生成 key子筛选使用 key 覆盖上层筛选,解决 issue https://github.com/ccfos/nightingale/issues/2433 提的问题
var cur []string
for _, paramKey := range ParamKeys {
val := string(points[i].Labels[model.LabelName(varToLabel[paramKey])])
cur = append(cur, val)
}
anomalyPointsMap.Store(strings.Join(cur, JoinMark), points[i])
}
}(key, promql)
// 收集当前层的异常点
currentLayerMutex.Lock()
currentLayerPointsByParam[paramKey] = append(currentLayerPointsByParam[paramKey], points...)
currentLayerMutex.Unlock()
}(paramKey, promql)
}
wg.Wait()
}
// 处理当前层的结果
for paramKey, points := range currentLayerPointsByParam {
if !isFirstLayer {
// 非首层(子层):先删除父层中该参数组合的所有告警
// 这实现了 issue #2433 要求的子筛选覆盖父筛选功能
keysToDelete := make([]string, 0)
anomalyPointsMap.Range(func(k, v any) bool {
keyStr := k.(string)
// key 格式: {参数组合}@@{标签hash}
// 检查是否以当前参数组合开头
if strings.HasPrefix(keyStr, paramKey+JoinMark) {
keysToDelete = append(keysToDelete, keyStr)
}
return true
})
for _, k := range keysToDelete {
anomalyPointsMap.Delete(k)
}
}
// 添加当前层的所有异常点
// 使用 参数组合 + 标签hash 作为 key保证同一参数值下的不同时序数据不会互相覆盖
for _, point := range points {
// 计算标签 hash确保不同的时序数据有不同的 key
tagHash := hash.GetTagHash(point.Labels)
fullKey := paramKey + JoinMark + fmt.Sprintf("%d", tagHash)
anomalyPointsMap.Store(fullKey, point)
}
}
curNode = curNode.ChildVarConfigs
isFirstLayer = false
}
// 收集所有异常点
anomalyPoints := make([]models.AnomalyPoint, 0)
anomalyPointsMap.Range(func(key, value any) bool {
if point, ok := value.(models.AnomalyPoint); ok {
@@ -1339,6 +1453,7 @@ func (arw *AlertRuleWorker) VarFillingBeforeQuery(query models.PromQuery, reader
}
return true
})
return anomalyPoints
}