Compare commits

...

9 Commits

Author SHA1 Message Date
ning
ee88108128 code refactor 2025-01-21 17:47:27 +08:00
ning
95e01c2099 code refactor 2025-01-21 15:58:01 +08:00
ning
1726e60106 code refactor 2025-01-21 15:31:00 +08:00
ning
aa741c66ca code refactor 2025-01-21 15:00:45 +08:00
ning
70d75cb597 code refactor 2025-01-21 12:20:25 +08:00
ning
9f69dc8dc8 code refactor 2025-01-21 11:35:52 +08:00
ning
a39a20fac9 code refactor 2025-01-10 11:01:12 +08:00
ning
34de5b772d code refactor 2025-01-09 17:35:39 +08:00
ning
5343513afe feat: nodata check 2025-01-09 17:14:05 +08:00
5 changed files with 174 additions and 205 deletions

View File

@@ -48,6 +48,8 @@ type AlertRuleWorker struct {
HostAndDeviceIdentCache sync.Map
LastSeriesStore map[uint64]models.DataResp
DeviceIdentHook func(arw *AlertRuleWorker, paramQuery models.ParamQuery) ([]string, error)
}
@@ -84,6 +86,7 @@ func NewAlertRuleWorker(rule *models.AlertRule, datasourceId int64, Processor *p
DeviceIdentHook: func(arw *AlertRuleWorker, paramQuery models.ParamQuery) ([]string, error) {
return nil, nil
},
LastSeriesStore: make(map[uint64]models.DataResp),
}
interval := rule.PromEvalInterval
@@ -533,7 +536,7 @@ func (arw *AlertRuleWorker) getParamPermutation(paramVal map[string]models.Param
if len(query) == 0 {
paramsKeyAllLabel, err := getParamKeyAllLabel(varToLabel[paramKey], originPromql, readerClient)
if err != nil {
logger.Errorf("rule_eval:%s, fail to getParamKeyAllLabel, error:%v", arw.Key(), paramQuery.Query, err)
logger.Errorf("rule_eval:%s, fail to getParamKeyAllLabel, error:%v query:%s", arw.Key(), err, paramQuery.Query)
}
params = paramsKeyAllLabel
} else {
@@ -819,122 +822,6 @@ func (arw *AlertRuleWorker) GetHostAnomalyPoint(ruleConfig string) ([]models.Ano
return lst, nil
}
// GetAnomalyPoint evaluates every trigger expression of ruleQuery against the
// series in seriesStore and returns the anomaly (triggered) points and the
// recover points.
//
// ruleId is used only for logging. seriesTagIndexes groups series hashes per
// label set; seriesStore maps a series hash to its query result.
func GetAnomalyPoint(ruleId int64, ruleQuery models.RuleQuery, seriesTagIndexes map[string]map[uint64][]uint64, seriesStore map[uint64]models.DataResp) ([]models.AnomalyPoint, []models.AnomalyPoint) {
	points := []models.AnomalyPoint{}
	recoverPoints := []models.AnomalyPoint{}
	if len(ruleQuery.Triggers) == 0 || len(seriesTagIndexes) == 0 {
		return points, recoverPoints
	}
	// Map each query ref to its display unit; used to format values below.
	// NOTE: local renamed from `unit` to `u` so it no longer shadows the
	// `unit` package used later in this function.
	unitMap := make(map[string]string)
	for _, query := range ruleQuery.Queries {
		ref, u, err := GetQueryRefAndUnit(query)
		if err != nil {
			continue
		}
		unitMap[ref] = u
	}
	for _, trigger := range ruleQuery.Triggers {
		// The key of seriesTagIndex is only used for grouping; the value is
		// the list of series hashes belonging to that group.
		seriesTagIndex := ProcessJoins(ruleId, trigger, seriesTagIndexes, seriesStore)
		for _, seriesHash := range seriesTagIndex {
			valuesUnitMap := make(map[string]unit.FormattedValue)
			sort.Slice(seriesHash, func(i, j int) bool {
				return seriesHash[i] < seriesHash[j]
			})
			m := make(map[string]interface{})
			var ts int64
			var sample models.DataResp
			var value float64
			for _, serieHash := range seriesHash {
				series, exists := seriesStore[serieHash]
				if !exists {
					// BUGFIX: log the missing hash; `series` is the zero
					// value here and carried no information.
					logger.Warningf("rule_eval rid:%d series hash:%d not found", ruleId, serieHash)
					continue
				}
				t, v, exists := series.Last()
				if !exists {
					logger.Warningf("rule_eval rid:%d series:%+v value not found", ruleId, series)
					continue
				}
				if !strings.Contains(trigger.Exp, "$"+series.Ref) {
					// The expression does not reference this variable.
					continue
				}
				if u, exists := unitMap[series.Ref]; exists {
					valuesUnitMap[series.Ref] = unit.ValueFormatter(u, 2, v)
				}
				m["$"+series.Ref] = v
				m["$"+series.Ref+"."+series.MetricName()] = v
				ts = int64(t)
				sample = series
				value = v
			}
			isTriggered := parser.Calc(trigger.Exp, m)
			// This log line is important: it records the live values the
			// alert decision was made on.
			logger.Infof("rule_eval rid:%d trigger:%+v exp:%s res:%v m:%v", ruleId, trigger, trigger.Exp, isTriggered, m)
			// Render "$ref.metric:value" pairs for display; keys without a
			// dot are the bare "$ref" duplicates and are skipped. Use a
			// Builder instead of string += in a loop.
			var sb strings.Builder
			for k, v := range m {
				if !strings.Contains(k, ".") {
					continue
				}
				fmt.Fprintf(&sb, "%s:%v ", k, v)
			}
			point := models.AnomalyPoint{
				Key:           sample.MetricName(),
				Labels:        sample.Metric,
				Timestamp:     ts, // ts is already int64; no conversion needed
				Value:         value,
				Values:        sb.String(),
				Severity:      trigger.Severity,
				Triggered:     isTriggered,
				Query:         fmt.Sprintf("query:%+v trigger:%+v", ruleQuery.Queries, trigger),
				RecoverConfig: trigger.RecoverConfig,
				ValuesUnit:    valuesUnitMap,
			}
			if sample.Query != "" {
				point.Query = sample.Query
			}
			// Recover-condition evaluation is (by design) only supported in
			// expression mode; isTriggered decides anomaly vs. recover point:
			//   1. No recover config set: a recoverPoint recovers; absence of
			//      an anomalyPoint on nodata does not.
			//   2. Recover-on-condition: only a recoverPoint can recover, not
			//      the absence of an anomalyPoint.
			//   3. No-recover-on-nodata: only a recoverPoint can recover; no
			//      anomalyPoint-based recovery.
			if isTriggered {
				points = append(points, point)
				continue
			}
			switch trigger.RecoverConfig.JudgeType {
			case models.Origin:
				// Align with the original implementation: do nothing.
			case models.RecoverOnCondition:
				// Additionally require the recover expression to hold.
				if fulfill := parser.Calc(trigger.RecoverConfig.RecoverExp, m); !fulfill {
					continue
				}
			}
			recoverPoints = append(recoverPoints, point)
		}
	}
	return points, recoverPoints
}
func flatten(rehashed map[uint64][][]uint64) map[uint64][]uint64 {
seriesTagIndex := make(map[uint64][]uint64)
var i uint64
@@ -1486,107 +1373,158 @@ func (arw *AlertRuleWorker) GetAnomalyPoint(rule *models.AlertRule, dsId int64)
unitMap[ref] = unit
}
// 判断
for _, trigger := range ruleQuery.Triggers {
seriesTagIndex := ProcessJoins(rule.Id, trigger, seriesTagIndexes, seriesStore)
for _, seriesHash := range seriesTagIndex {
valuesUnitMap := make(map[string]unit.FormattedValue)
if !ruleQuery.ExpTriggerDisable {
for _, trigger := range ruleQuery.Triggers {
seriesTagIndex := ProcessJoins(rule.Id, trigger, seriesTagIndexes, seriesStore)
for _, seriesHash := range seriesTagIndex {
valuesUnitMap := make(map[string]unit.FormattedValue)
sort.Slice(seriesHash, func(i, j int) bool {
return seriesHash[i] < seriesHash[j]
})
sort.Slice(seriesHash, func(i, j int) bool {
return seriesHash[i] < seriesHash[j]
})
m := make(map[string]interface{})
var ts int64
var sample models.DataResp
var value float64
for _, serieHash := range seriesHash {
series, exists := seriesStore[serieHash]
if !exists {
logger.Warningf("rule_eval rid:%d series:%+v not found", rule.Id, series)
continue
}
t, v, exists := series.Last()
if !exists {
logger.Warningf("rule_eval rid:%d series:%+v value not found", rule.Id, series)
continue
}
if !strings.Contains(trigger.Exp, "$"+series.Ref) {
// 表达式中不包含该变量
continue
}
m["$"+series.Ref] = v
m["$"+series.Ref+"."+series.MetricName()] = v
for k, v := range series.Metric {
if k == "__name__" {
m := make(map[string]interface{})
var ts int64
var sample models.DataResp
var value float64
for _, serieHash := range seriesHash {
series, exists := seriesStore[serieHash]
if !exists {
logger.Warningf("rule_eval rid:%d series:%+v not found", rule.Id, series)
continue
}
t, v, exists := series.Last()
if !exists {
logger.Warningf("rule_eval rid:%d series:%+v value not found", rule.Id, series)
continue
}
if !strings.Contains(trigger.Exp, "$"+series.Ref+"."+string(k)) {
// 过滤掉表达式中不包含的标签
if !strings.Contains(trigger.Exp, "$"+series.Ref) {
// 表达式中不包含该变量
continue
}
m["$"+series.Ref+"."+string(k)] = string(v)
m["$"+series.Ref] = v
m["$"+series.Ref+"."+series.MetricName()] = v
for k, v := range series.Metric {
if k == "__name__" {
continue
}
if !strings.Contains(trigger.Exp, "$"+series.Ref+"."+string(k)) {
// 过滤掉表达式中不包含的标签
continue
}
m["$"+series.Ref+"."+string(k)] = string(v)
}
if u, exists := unitMap[series.Ref]; exists {
valuesUnitMap["$"+series.Ref+"."+series.MetricName()] = unit.ValueFormatter(u, 2, v)
}
ts = int64(t)
sample = series
value = v
logger.Infof("rule_eval rid:%d origin series labels:%+v", rule.Id, series.Metric)
}
if u, exists := unitMap[series.Ref]; exists {
valuesUnitMap["$"+series.Ref+"."+series.MetricName()] = unit.ValueFormatter(u, 2, v)
isTriggered := parser.CalcWithRid(trigger.Exp, m, rule.Id)
// 此条日志很重要,是告警判断的现场值
logger.Infof("rule_eval rid:%d trigger:%+v exp:%s res:%v m:%v", rule.Id, trigger, trigger.Exp, isTriggered, m)
var values string
for k, v := range m {
if !strings.Contains(k, ".") {
continue
}
switch v.(type) {
case float64:
values += fmt.Sprintf("%s:%.3f ", k, v)
case string:
values += fmt.Sprintf("%s:%s ", k, v)
}
}
ts = int64(t)
sample = series
value = v
logger.Infof("rule_eval rid:%d origin series labels:%+v", rule.Id, series.Metric)
}
isTriggered := parser.CalcWithRid(trigger.Exp, m, rule.Id)
// 此条日志很重要,是告警判断的现场值
logger.Infof("rule_eval rid:%d trigger:%+v exp:%s res:%v m:%v", rule.Id, trigger, trigger.Exp, isTriggered, m)
var values string
for k, v := range m {
if !strings.Contains(k, ".") {
continue
point := models.AnomalyPoint{
Key: sample.MetricName(),
Labels: sample.Metric,
Timestamp: int64(ts),
Value: value,
Values: values,
Severity: trigger.Severity,
Triggered: isTriggered,
Query: fmt.Sprintf("query:%+v trigger:%+v", ruleQuery.Queries, trigger),
RecoverConfig: trigger.RecoverConfig,
ValuesUnit: valuesUnitMap,
}
switch v.(type) {
case float64:
values += fmt.Sprintf("%s:%.3f ", k, v)
case string:
values += fmt.Sprintf("%s:%s ", k, v)
if isTriggered {
points = append(points, point)
} else {
switch trigger.RecoverConfig.JudgeType {
case models.Origin:
// do nothing
case models.RecoverOnCondition:
fulfill := parser.CalcWithRid(trigger.RecoverConfig.RecoverExp, m, rule.Id)
if !fulfill {
continue
}
}
recoverPoints = append(recoverPoints, point)
}
}
}
}
point := models.AnomalyPoint{
Key: sample.MetricName(),
Labels: sample.Metric,
Timestamp: int64(ts),
Value: value,
Values: values,
Severity: trigger.Severity,
Triggered: isTriggered,
Query: fmt.Sprintf("query:%+v trigger:%+v", ruleQuery.Queries, trigger),
RecoverConfig: trigger.RecoverConfig,
ValuesUnit: valuesUnitMap,
}
if ruleQuery.NodataTrigger.Enable {
if isTriggered {
points = append(points, point)
} else {
switch trigger.RecoverConfig.JudgeType {
case models.Origin:
// do nothing
case models.RecoverOnCondition:
fulfill := parser.CalcWithRid(trigger.RecoverConfig.RecoverExp, m, rule.Id)
if !fulfill {
now := time.Now().Unix()
// 使用 arw.LastSeriesStore 检查上次查询结果
if len(arw.LastSeriesStore) > 0 {
// 遍历上次的曲线数据
for hash, lastSeries := range arw.LastSeriesStore {
if ruleQuery.NodataTrigger.ResolveAfterEnable {
lastTs, _, exists := lastSeries.Last()
if !exists {
continue
}
// 检查是否超过 resolve_after 时间
if now-int64(lastTs) > int64(ruleQuery.NodataTrigger.ResolveAfter) {
logger.Infof("rule_eval rid:%d series:%+v resolve after %d seconds now:%d lastTs:%d", rule.Id, lastSeries, ruleQuery.NodataTrigger.ResolveAfter, now, int64(lastTs))
delete(arw.LastSeriesStore, hash)
continue
}
}
recoverPoints = append(recoverPoints, point)
// 检查是否在本次查询结果中存在
if _, exists := seriesStore[hash]; !exists {
// 生成无数据告警点
point := models.AnomalyPoint{
Key: lastSeries.MetricName(),
Labels: lastSeries.Metric,
Timestamp: now,
Value: 0,
Values: fmt.Sprintf("nodata since %v", time.Unix(now, 0).Format("2006-01-02 15:04:05")),
Severity: ruleQuery.NodataTrigger.Severity,
Triggered: true,
Query: fmt.Sprintf("nodata check for %s", lastSeries.LabelsString()),
TriggerType: models.TriggerTypeNodata,
}
points = append(points, point)
logger.Infof("rule_eval rid:%d nodata point:%+v", rule.Id, point)
}
}
}
// 更新 arw.LastSeriesStore
for hash, series := range seriesStore {
arw.LastSeriesStore[hash] = series
}
}
}

View File

@@ -224,6 +224,15 @@ func (p *Processor) BuildEvent(anomalyPoint models.AnomalyPoint, from string, no
event.RecoverConfig = anomalyPoint.RecoverConfig
event.RuleHash = ruleHash
if anomalyPoint.TriggerType == models.TriggerTypeNodata {
event.TriggerValue = "nodata"
ruleConfig := models.RuleQuery{}
json.Unmarshal([]byte(p.rule.RuleConfig), &ruleConfig)
ruleConfig.TriggerType = anomalyPoint.TriggerType
b, _ := json.Marshal(ruleConfig)
event.RuleConfig = string(b)
}
if err := json.Unmarshal([]byte(p.rule.Annotations), &event.AnnotationsJSON); err != nil {
event.AnnotationsJSON = make(map[string]string) // 解析失败时使用空 map
logger.Warningf("unmarshal annotations json failed: %v, rule: %d", err, p.rule.Id)

View File

@@ -161,9 +161,9 @@ type PromRuleConfig struct {
type RecoverJudge int
const (
Origin RecoverJudge = 0
RecoverWithoutData RecoverJudge = 1
RecoverOnCondition RecoverJudge = 2
Origin RecoverJudge = 0
NotRecoverWhenNoData RecoverJudge = 1
RecoverOnCondition RecoverJudge = 2
)
type RecoverConfig struct {
@@ -194,10 +194,20 @@ type HostTrigger struct {
}
type RuleQuery struct {
Version string `json:"version"`
Inhibit bool `json:"inhibit"`
Queries []interface{} `json:"queries"`
Triggers []Trigger `json:"triggers"`
Version string `json:"version"`
Inhibit bool `json:"inhibit"`
Queries []interface{} `json:"queries"`
ExpTriggerDisable bool `json:"exp_trigger_disable"`
Triggers []Trigger `json:"triggers"`
NodataTrigger NodataTrigger `json:"nodata_trigger"`
TriggerType TriggerType `json:"trigger_type,omitempty"`
}
// NodataTrigger configures alerting on the absence of data: when enabled, a
// series that was present in the previous evaluation but missing from the
// current one raises a nodata alert with the given severity.
type NodataTrigger struct {
Enable bool `json:"enable"`
Severity int `json:"severity"`
// When ResolveAfterEnable is set, a missing series is dropped (and stops
// alerting) once it has been absent for more than ResolveAfter seconds.
ResolveAfterEnable bool `json:"resolve_after_enable"`
ResolveAfter int `json:"resolve_after"` // in seconds
}
type Trigger struct {
@@ -1175,8 +1185,6 @@ func (ar *AlertRule) UpdateEvent(event *AlertCurEvent) {
event.RuleProd = ar.Prod
event.RuleAlgo = ar.Algorithm
event.PromForDuration = ar.PromForDuration
event.RuleConfig = ar.RuleConfig
event.RuleConfigJson = ar.RuleConfigJson
event.Callbacks = ar.Callbacks
event.CallbacksJSON = ar.CallbacksJSON
event.RunbookUrl = ar.RunbookUrl

View File

@@ -20,8 +20,16 @@ type AnomalyPoint struct {
Values string `json:"values"`
ValuesUnit map[string]unit.FormattedValue `json:"values_unit"`
RecoverConfig RecoverConfig `json:"recover_config"`
TriggerType TriggerType `json:"trigger_type"`
}
// TriggerType distinguishes how an anomaly point was produced: by a normal
// trigger-expression evaluation or by the nodata (missing series) check.
type TriggerType string
const (
// TriggerTypeNormal marks a point produced by a regular trigger expression.
TriggerTypeNormal TriggerType = "normal"
// TriggerTypeNodata marks a point produced by the nodata check.
TriggerTypeNodata TriggerType = "nodata"
)
func NewAnomalyPoint(key string, labels map[string]string, ts int64, value float64, severity int) AnomalyPoint {
anomalyPointLabels := make(model.Metric)
for k, v := range labels {

View File

@@ -53,6 +53,12 @@ func (d *DataResp) MetricName() string {
return string(metric)
}
// LabelsString renders the series labels as a single string.
func (d *DataResp) LabelsString() string {
	return d.Metric.String()
}
type RelationKey struct {
LeftKey string `json:"left_key"`
RightKey string `json:"right_key"`