mirror of
https://github.com/ccfos/nightingale.git
synced 2026-03-04 06:59:00 +00:00
Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9867a9519c |
@@ -386,10 +386,21 @@ type sample struct {
|
||||
// 每个节点先查询无参数的 query, 即 mem_used_percent{} > curVal, 得到满足值变量的所有结果
|
||||
// 结果中有满足本节点参数变量的值,加入异常点列表
|
||||
// 参数变量的值不满足的组合,需要覆盖上层筛选中产生的异常点
|
||||
// VarFillingAfterQuery 先查询再过滤变量,效率较高,但无法处理有聚合函数导致标签丢失的情况
|
||||
//
|
||||
// 修复说明 (Issue #2971):
|
||||
// 原实现中使用参数变量组合作为 key 存储异常点,导致同一参数值下的多条时序数据互相覆盖。
|
||||
// 修复方案:
|
||||
// 1. 同一层内:使用完整的标签 hash 作为 key,避免不同时序数据覆盖
|
||||
// 2. 跨层级时:子层按参数变量组合前缀删除父层的所有相关告警,实现子筛选覆盖父筛选
|
||||
func (arw *AlertRuleWorker) VarFillingAfterQuery(query models.PromQuery, readerClient promsdk.API) []models.AnomalyPoint {
|
||||
varToLabel := ExtractVarMapping(query.PromQl)
|
||||
fullQuery := removeVal(query.PromQl)
|
||||
// 存储所有的异常点,key 为参数变量的组合,可以实现子筛选对上一层筛选的覆盖
|
||||
// 存储所有的异常点
|
||||
// key 格式: {参数变量组合}@@{标签hash}
|
||||
// 这样可以:
|
||||
// 1. 同层内不同时序数据有不同的 key(标签hash不同)
|
||||
// 2. 跨层时可以按参数变量组合前缀删除父层的告警
|
||||
anomalyPointsMap := make(map[string]models.AnomalyPoint)
|
||||
// 统一变量配置格式
|
||||
VarConfigForCalc := &models.ChildVarConfig{
|
||||
@@ -416,7 +427,16 @@ func (arw *AlertRuleWorker) VarFillingAfterQuery(query models.PromQuery, readerC
|
||||
})
|
||||
// 遍历变量配置链表
|
||||
curNode := VarConfigForCalc
|
||||
isFirstLayer := true
|
||||
for curNode != nil {
|
||||
// 当前层收集到的所有异常点,按参数组合分组
|
||||
// key: 参数变量组合, value: 该组合下的所有异常点及其完整key
|
||||
type pointWithKey struct {
|
||||
point models.AnomalyPoint
|
||||
fullKey string
|
||||
}
|
||||
currentLayerPointsByParam := make(map[string][]pointWithKey)
|
||||
|
||||
for _, param := range curNode.ParamVal {
|
||||
// curQuery 当前节点的无参数 query,用于时序库查询
|
||||
curQuery := fullQuery
|
||||
@@ -458,8 +478,13 @@ func (arw *AlertRuleWorker) VarFillingAfterQuery(query models.PromQuery, readerC
|
||||
curRealQuery = fillVar(curRealQuery, paramKey, val)
|
||||
}
|
||||
|
||||
if _, ok := paramPermutation[strings.Join(cur, JoinMark)]; ok {
|
||||
anomalyPointsMap[strings.Join(cur, JoinMark)] = models.AnomalyPoint{
|
||||
paramKey := strings.Join(cur, JoinMark)
|
||||
if _, ok := paramPermutation[paramKey]; ok {
|
||||
// 计算标签 hash,确保不同的时序数据有不同的 key
|
||||
tagHash := hash.GetTagHash(seqVals[i].Metric)
|
||||
fullKey := paramKey + JoinMark + fmt.Sprintf("%d", tagHash)
|
||||
|
||||
point := models.AnomalyPoint{
|
||||
Key: seqVals[i].Metric.String(),
|
||||
Timestamp: seqVals[i].Timestamp.Unix(),
|
||||
Value: float64(seqVals[i].Value),
|
||||
@@ -467,17 +492,44 @@ func (arw *AlertRuleWorker) VarFillingAfterQuery(query models.PromQuery, readerC
|
||||
Severity: query.Severity,
|
||||
Query: curRealQuery,
|
||||
}
|
||||
// 生成异常点后,删除该参数组合
|
||||
delete(paramPermutation, strings.Join(cur, JoinMark))
|
||||
currentLayerPointsByParam[paramKey] = append(currentLayerPointsByParam[paramKey], pointWithKey{point: point, fullKey: fullKey})
|
||||
}
|
||||
}
|
||||
|
||||
// 剩余的参数组合为本层筛选不产生异常点的组合,需要覆盖上层筛选中产生的异常点
|
||||
for k, _ := range paramPermutation {
|
||||
delete(anomalyPointsMap, k)
|
||||
// 初始化空的参数组合(用于子层覆盖父层的场景)
|
||||
for paramKey := range paramPermutation {
|
||||
if _, exists := currentLayerPointsByParam[paramKey]; !exists {
|
||||
currentLayerPointsByParam[paramKey] = []pointWithKey{}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 处理当前层的结果
|
||||
for paramKey, pointsWithKeys := range currentLayerPointsByParam {
|
||||
if !isFirstLayer {
|
||||
// 非首层(子层):先删除父层中该参数组合的所有告警
|
||||
// 这实现了 issue #2433 要求的子筛选覆盖父筛选功能
|
||||
keysToDelete := make([]string, 0)
|
||||
for k := range anomalyPointsMap {
|
||||
// key 格式: {参数组合}@@{标签hash}
|
||||
// 检查是否以当前参数组合开头(后面跟着 JoinMark)
|
||||
if strings.HasPrefix(k, paramKey+JoinMark) {
|
||||
keysToDelete = append(keysToDelete, k)
|
||||
}
|
||||
}
|
||||
for _, k := range keysToDelete {
|
||||
delete(anomalyPointsMap, k)
|
||||
}
|
||||
}
|
||||
|
||||
// 添加当前层的所有异常点
|
||||
for _, pwk := range pointsWithKeys {
|
||||
anomalyPointsMap[pwk.fullKey] = pwk.point
|
||||
}
|
||||
}
|
||||
|
||||
curNode = curNode.ChildVarConfigs
|
||||
isFirstLayer = false
|
||||
}
|
||||
|
||||
anomalyPoints := make([]models.AnomalyPoint, 0)
|
||||
@@ -1228,9 +1280,20 @@ func GetQueryRefAndUnit(query interface{}) (string, string, error) {
|
||||
// 每个节点先填充参数再进行查询, 即先得到完整的 promql avg(mem_used_percent{host="127.0.0.1"}) > 5
|
||||
// 再查询得到满足值变量的所有结果加入异常点列表
|
||||
// 参数变量的值不满足的组合,需要覆盖上层筛选中产生的异常点
|
||||
//
|
||||
// 修复说明 (Issue #2971):
|
||||
// 原实现中使用参数变量组合作为 key 存储异常点,导致同一参数值下的多条时序数据互相覆盖。
|
||||
// 修复方案:
|
||||
// 1. 同一层内:使用完整的标签 hash 作为 key,避免不同时序数据覆盖
|
||||
// 2. 跨层级时:子层按参数变量组合删除父层的所有相关告警,实现子筛选覆盖父筛选
|
||||
func (arw *AlertRuleWorker) VarFillingBeforeQuery(query models.PromQuery, readerClient promsdk.API) []models.AnomalyPoint {
|
||||
varToLabel := ExtractVarMapping(query.PromQl)
|
||||
// 存储异常点的 map,key 为参数变量的组合,可以实现子筛选对上一层筛选的覆盖
|
||||
|
||||
// 存储异常点的 map
|
||||
// key 格式: {参数变量组合}@@{标签hash}
|
||||
// 这样可以:
|
||||
// 1. 同层内不同时序数据有不同的 key(标签hash不同)
|
||||
// 2. 跨层时可以按参数变量组合前缀删除父层的告警
|
||||
anomalyPointsMap := sync.Map{}
|
||||
// 统一变量配置格式
|
||||
VarConfigForCalc := &models.ChildVarConfig{
|
||||
@@ -1255,11 +1318,19 @@ func (arw *AlertRuleWorker) VarFillingBeforeQuery(query models.PromQuery, reader
|
||||
sort.Slice(ParamKeys, func(i, j int) bool {
|
||||
return ParamKeys[i] < ParamKeys[j]
|
||||
})
|
||||
// 遍历变量配置链表
|
||||
|
||||
// 遍历变量配置链表(父层 -> 子层)
|
||||
curNode := VarConfigForCalc
|
||||
isFirstLayer := true
|
||||
for curNode != nil {
|
||||
// 当前层收集到的所有异常点,按参数组合分组
|
||||
// key: 参数变量组合, value: 该组合下的所有异常点
|
||||
currentLayerPointsByParam := make(map[string][]models.AnomalyPoint)
|
||||
var currentLayerMutex sync.Mutex
|
||||
|
||||
for _, param := range curNode.ParamVal {
|
||||
curPromql := query.PromQl
|
||||
|
||||
// 取出阈值变量
|
||||
valMap := make(map[string]string)
|
||||
for val, valQuery := range param {
|
||||
@@ -1279,7 +1350,7 @@ func (arw *AlertRuleWorker) VarFillingBeforeQuery(query models.PromQuery, reader
|
||||
}
|
||||
|
||||
keyToPromql := make(map[string]string)
|
||||
for paramPermutationKeys, _ := range paramPermutation {
|
||||
for paramPermutationKeys := range paramPermutation {
|
||||
realPromql := curPromql
|
||||
split := strings.Split(paramPermutationKeys, JoinMark)
|
||||
for j := range ParamKeys {
|
||||
@@ -1291,15 +1362,20 @@ func (arw *AlertRuleWorker) VarFillingBeforeQuery(query models.PromQuery, reader
|
||||
// 并发查询
|
||||
wg := sync.WaitGroup{}
|
||||
semaphore := make(chan struct{}, 200)
|
||||
for key, promql := range keyToPromql {
|
||||
for paramKey, promql := range keyToPromql {
|
||||
wg.Add(1)
|
||||
semaphore <- struct{}{}
|
||||
go func(key, promql string) {
|
||||
go func(paramKey, promql string) {
|
||||
defer func() {
|
||||
<-semaphore
|
||||
wg.Done()
|
||||
}()
|
||||
arw.Processor.Stats.CounterQueryDataTotal.WithLabelValues(fmt.Sprintf("%d", arw.DatasourceId), fmt.Sprintf("%d", arw.Rule.Id)).Inc()
|
||||
|
||||
arw.Processor.Stats.CounterQueryDataTotal.WithLabelValues(
|
||||
fmt.Sprintf("%d", arw.DatasourceId),
|
||||
fmt.Sprintf("%d", arw.Rule.Id),
|
||||
).Inc()
|
||||
|
||||
value, _, err := readerClient.Query(context.Background(), promql, time.Now())
|
||||
if err != nil {
|
||||
logger.Errorf("rule_eval:%s, promql:%s, error:%v", arw.Key(), promql, err)
|
||||
@@ -1309,29 +1385,67 @@ func (arw *AlertRuleWorker) VarFillingBeforeQuery(query models.PromQuery, reader
|
||||
|
||||
points := models.ConvertAnomalyPoints(value)
|
||||
if len(points) == 0 {
|
||||
anomalyPointsMap.Delete(key)
|
||||
// 查询无结果时,标记该参数组合需要清除(用于子层覆盖父层)
|
||||
currentLayerMutex.Lock()
|
||||
if _, exists := currentLayerPointsByParam[paramKey]; !exists {
|
||||
currentLayerPointsByParam[paramKey] = []models.AnomalyPoint{}
|
||||
}
|
||||
currentLayerMutex.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
for i := 0; i < len(points); i++ {
|
||||
points[i].Severity = query.Severity
|
||||
points[i].Query = promql
|
||||
points[i].ValuesUnit = map[string]unit.FormattedValue{
|
||||
"v": unit.ValueFormatter(query.Unit, 2, points[i].Value),
|
||||
}
|
||||
// 每个异常点都需要生成 key,子筛选使用 key 覆盖上层筛选,解决 issue https://github.com/ccfos/nightingale/issues/2433 提的问题
|
||||
var cur []string
|
||||
for _, paramKey := range ParamKeys {
|
||||
val := string(points[i].Labels[model.LabelName(varToLabel[paramKey])])
|
||||
cur = append(cur, val)
|
||||
}
|
||||
anomalyPointsMap.Store(strings.Join(cur, JoinMark), points[i])
|
||||
}
|
||||
}(key, promql)
|
||||
|
||||
// 收集当前层的异常点
|
||||
currentLayerMutex.Lock()
|
||||
currentLayerPointsByParam[paramKey] = append(currentLayerPointsByParam[paramKey], points...)
|
||||
currentLayerMutex.Unlock()
|
||||
}(paramKey, promql)
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
// 处理当前层的结果
|
||||
for paramKey, points := range currentLayerPointsByParam {
|
||||
if !isFirstLayer {
|
||||
// 非首层(子层):先删除父层中该参数组合的所有告警
|
||||
// 这实现了 issue #2433 要求的子筛选覆盖父筛选功能
|
||||
keysToDelete := make([]string, 0)
|
||||
anomalyPointsMap.Range(func(k, v any) bool {
|
||||
keyStr := k.(string)
|
||||
// key 格式: {参数组合}@@{标签hash}
|
||||
// 检查是否以当前参数组合开头
|
||||
if strings.HasPrefix(keyStr, paramKey+JoinMark) {
|
||||
keysToDelete = append(keysToDelete, keyStr)
|
||||
}
|
||||
return true
|
||||
})
|
||||
for _, k := range keysToDelete {
|
||||
anomalyPointsMap.Delete(k)
|
||||
}
|
||||
}
|
||||
|
||||
// 添加当前层的所有异常点
|
||||
// 使用 参数组合 + 标签hash 作为 key,保证同一参数值下的不同时序数据不会互相覆盖
|
||||
for _, point := range points {
|
||||
// 计算标签 hash,确保不同的时序数据有不同的 key
|
||||
tagHash := hash.GetTagHash(point.Labels)
|
||||
fullKey := paramKey + JoinMark + fmt.Sprintf("%d", tagHash)
|
||||
anomalyPointsMap.Store(fullKey, point)
|
||||
}
|
||||
}
|
||||
|
||||
curNode = curNode.ChildVarConfigs
|
||||
isFirstLayer = false
|
||||
}
|
||||
|
||||
// 收集所有异常点
|
||||
anomalyPoints := make([]models.AnomalyPoint, 0)
|
||||
anomalyPointsMap.Range(func(key, value any) bool {
|
||||
if point, ok := value.(models.AnomalyPoint); ok {
|
||||
@@ -1339,6 +1453,7 @@ func (arw *AlertRuleWorker) VarFillingBeforeQuery(query models.PromQuery, reader
|
||||
}
|
||||
return true
|
||||
})
|
||||
|
||||
return anomalyPoints
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user