mirror of
https://github.com/ccfos/nightingale.git
synced 2026-03-03 06:29:16 +00:00
Compare commits
9 Commits
fix-sql
...
nodata_tri
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ee88108128 | ||
|
|
95e01c2099 | ||
|
|
1726e60106 | ||
|
|
aa741c66ca | ||
|
|
70d75cb597 | ||
|
|
9f69dc8dc8 | ||
|
|
a39a20fac9 | ||
|
|
34de5b772d | ||
|
|
5343513afe |
@@ -48,6 +48,8 @@ type AlertRuleWorker struct {
|
||||
|
||||
HostAndDeviceIdentCache sync.Map
|
||||
|
||||
LastSeriesStore map[uint64]models.DataResp
|
||||
|
||||
DeviceIdentHook func(arw *AlertRuleWorker, paramQuery models.ParamQuery) ([]string, error)
|
||||
}
|
||||
|
||||
@@ -84,6 +86,7 @@ func NewAlertRuleWorker(rule *models.AlertRule, datasourceId int64, Processor *p
|
||||
DeviceIdentHook: func(arw *AlertRuleWorker, paramQuery models.ParamQuery) ([]string, error) {
|
||||
return nil, nil
|
||||
},
|
||||
LastSeriesStore: make(map[uint64]models.DataResp),
|
||||
}
|
||||
|
||||
interval := rule.PromEvalInterval
|
||||
@@ -533,7 +536,7 @@ func (arw *AlertRuleWorker) getParamPermutation(paramVal map[string]models.Param
|
||||
if len(query) == 0 {
|
||||
paramsKeyAllLabel, err := getParamKeyAllLabel(varToLabel[paramKey], originPromql, readerClient)
|
||||
if err != nil {
|
||||
logger.Errorf("rule_eval:%s, fail to getParamKeyAllLabel, error:%v", arw.Key(), paramQuery.Query, err)
|
||||
logger.Errorf("rule_eval:%s, fail to getParamKeyAllLabel, error:%v query:%s", arw.Key(), err, paramQuery.Query)
|
||||
}
|
||||
params = paramsKeyAllLabel
|
||||
} else {
|
||||
@@ -819,122 +822,6 @@ func (arw *AlertRuleWorker) GetHostAnomalyPoint(ruleConfig string) ([]models.Ano
|
||||
return lst, nil
|
||||
}
|
||||
|
||||
func GetAnomalyPoint(ruleId int64, ruleQuery models.RuleQuery, seriesTagIndexes map[string]map[uint64][]uint64, seriesStore map[uint64]models.DataResp) ([]models.AnomalyPoint, []models.AnomalyPoint) {
|
||||
points := []models.AnomalyPoint{}
|
||||
recoverPoints := []models.AnomalyPoint{}
|
||||
|
||||
if len(ruleQuery.Triggers) == 0 {
|
||||
return points, recoverPoints
|
||||
}
|
||||
|
||||
if len(seriesTagIndexes) == 0 {
|
||||
return points, recoverPoints
|
||||
}
|
||||
|
||||
unitMap := make(map[string]string)
|
||||
for _, query := range ruleQuery.Queries {
|
||||
ref, unit, err := GetQueryRefAndUnit(query)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
unitMap[ref] = unit
|
||||
}
|
||||
|
||||
for _, trigger := range ruleQuery.Triggers {
|
||||
// seriesTagIndex 的 key 仅做分组使用,value 为每组 series 的 hash
|
||||
seriesTagIndex := ProcessJoins(ruleId, trigger, seriesTagIndexes, seriesStore)
|
||||
|
||||
for _, seriesHash := range seriesTagIndex {
|
||||
valuesUnitMap := make(map[string]unit.FormattedValue)
|
||||
|
||||
sort.Slice(seriesHash, func(i, j int) bool {
|
||||
return seriesHash[i] < seriesHash[j]
|
||||
})
|
||||
|
||||
m := make(map[string]interface{})
|
||||
var ts int64
|
||||
var sample models.DataResp
|
||||
var value float64
|
||||
for _, serieHash := range seriesHash {
|
||||
series, exists := seriesStore[serieHash]
|
||||
if !exists {
|
||||
logger.Warningf("rule_eval rid:%d series:%+v not found", ruleId, series)
|
||||
continue
|
||||
}
|
||||
t, v, exists := series.Last()
|
||||
if !exists {
|
||||
logger.Warningf("rule_eval rid:%d series:%+v value not found", ruleId, series)
|
||||
continue
|
||||
}
|
||||
|
||||
if !strings.Contains(trigger.Exp, "$"+series.Ref) {
|
||||
// 表达式中不包含该变量
|
||||
continue
|
||||
}
|
||||
|
||||
if u, exists := unitMap[series.Ref]; exists {
|
||||
valuesUnitMap[series.Ref] = unit.ValueFormatter(u, 2, v)
|
||||
}
|
||||
|
||||
m["$"+series.Ref] = v
|
||||
m["$"+series.Ref+"."+series.MetricName()] = v
|
||||
ts = int64(t)
|
||||
sample = series
|
||||
value = v
|
||||
}
|
||||
isTriggered := parser.Calc(trigger.Exp, m)
|
||||
// 此条日志很重要,是告警判断的现场值
|
||||
logger.Infof("rule_eval rid:%d trigger:%+v exp:%s res:%v m:%v", ruleId, trigger, trigger.Exp, isTriggered, m)
|
||||
|
||||
var values string
|
||||
for k, v := range m {
|
||||
if !strings.Contains(k, ".") {
|
||||
continue
|
||||
}
|
||||
values += fmt.Sprintf("%s:%v ", k, v)
|
||||
}
|
||||
|
||||
point := models.AnomalyPoint{
|
||||
Key: sample.MetricName(),
|
||||
Labels: sample.Metric,
|
||||
Timestamp: int64(ts),
|
||||
Value: value,
|
||||
Values: values,
|
||||
Severity: trigger.Severity,
|
||||
Triggered: isTriggered,
|
||||
Query: fmt.Sprintf("query:%+v trigger:%+v", ruleQuery.Queries, trigger),
|
||||
RecoverConfig: trigger.RecoverConfig,
|
||||
ValuesUnit: valuesUnitMap,
|
||||
}
|
||||
|
||||
if sample.Query != "" {
|
||||
point.Query = sample.Query
|
||||
}
|
||||
// 恢复条件判断经过讨论是只在表达式模式下支持,表达式模式会通过 isTriggered 判断是告警点还是恢复点
|
||||
// 1. 不设置恢复判断,满足恢复条件产生 recoverPoint 恢复,无数据不产生 anomalyPoint 恢复
|
||||
// 2. 设置满足条件才恢复,仅可通过产生 recoverPoint 恢复,不能通过不产生 anomalyPoint 恢复
|
||||
// 3. 设置无数据不恢复,仅可通过产生 recoverPoint 恢复,不产生 anomalyPoint 恢复
|
||||
if isTriggered {
|
||||
points = append(points, point)
|
||||
} else {
|
||||
switch trigger.RecoverConfig.JudgeType {
|
||||
case models.Origin:
|
||||
// 对齐原实现 do nothing
|
||||
case models.RecoverOnCondition:
|
||||
// 额外判断恢复条件,满足才恢复
|
||||
fulfill := parser.Calc(trigger.RecoverConfig.RecoverExp, m)
|
||||
if !fulfill {
|
||||
continue
|
||||
}
|
||||
}
|
||||
recoverPoints = append(recoverPoints, point)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return points, recoverPoints
|
||||
}
|
||||
|
||||
func flatten(rehashed map[uint64][][]uint64) map[uint64][]uint64 {
|
||||
seriesTagIndex := make(map[uint64][]uint64)
|
||||
var i uint64
|
||||
@@ -1486,107 +1373,158 @@ func (arw *AlertRuleWorker) GetAnomalyPoint(rule *models.AlertRule, dsId int64)
|
||||
unitMap[ref] = unit
|
||||
}
|
||||
|
||||
// 判断
|
||||
for _, trigger := range ruleQuery.Triggers {
|
||||
seriesTagIndex := ProcessJoins(rule.Id, trigger, seriesTagIndexes, seriesStore)
|
||||
for _, seriesHash := range seriesTagIndex {
|
||||
valuesUnitMap := make(map[string]unit.FormattedValue)
|
||||
if !ruleQuery.ExpTriggerDisable {
|
||||
for _, trigger := range ruleQuery.Triggers {
|
||||
seriesTagIndex := ProcessJoins(rule.Id, trigger, seriesTagIndexes, seriesStore)
|
||||
for _, seriesHash := range seriesTagIndex {
|
||||
valuesUnitMap := make(map[string]unit.FormattedValue)
|
||||
|
||||
sort.Slice(seriesHash, func(i, j int) bool {
|
||||
return seriesHash[i] < seriesHash[j]
|
||||
})
|
||||
sort.Slice(seriesHash, func(i, j int) bool {
|
||||
return seriesHash[i] < seriesHash[j]
|
||||
})
|
||||
|
||||
m := make(map[string]interface{})
|
||||
var ts int64
|
||||
var sample models.DataResp
|
||||
var value float64
|
||||
for _, serieHash := range seriesHash {
|
||||
series, exists := seriesStore[serieHash]
|
||||
if !exists {
|
||||
logger.Warningf("rule_eval rid:%d series:%+v not found", rule.Id, series)
|
||||
continue
|
||||
}
|
||||
t, v, exists := series.Last()
|
||||
if !exists {
|
||||
logger.Warningf("rule_eval rid:%d series:%+v value not found", rule.Id, series)
|
||||
continue
|
||||
}
|
||||
|
||||
if !strings.Contains(trigger.Exp, "$"+series.Ref) {
|
||||
// 表达式中不包含该变量
|
||||
continue
|
||||
}
|
||||
|
||||
m["$"+series.Ref] = v
|
||||
m["$"+series.Ref+"."+series.MetricName()] = v
|
||||
for k, v := range series.Metric {
|
||||
if k == "__name__" {
|
||||
m := make(map[string]interface{})
|
||||
var ts int64
|
||||
var sample models.DataResp
|
||||
var value float64
|
||||
for _, serieHash := range seriesHash {
|
||||
series, exists := seriesStore[serieHash]
|
||||
if !exists {
|
||||
logger.Warningf("rule_eval rid:%d series:%+v not found", rule.Id, series)
|
||||
continue
|
||||
}
|
||||
t, v, exists := series.Last()
|
||||
if !exists {
|
||||
logger.Warningf("rule_eval rid:%d series:%+v value not found", rule.Id, series)
|
||||
continue
|
||||
}
|
||||
|
||||
if !strings.Contains(trigger.Exp, "$"+series.Ref+"."+string(k)) {
|
||||
// 过滤掉表达式中不包含的标签
|
||||
if !strings.Contains(trigger.Exp, "$"+series.Ref) {
|
||||
// 表达式中不包含该变量
|
||||
continue
|
||||
}
|
||||
|
||||
m["$"+series.Ref+"."+string(k)] = string(v)
|
||||
m["$"+series.Ref] = v
|
||||
m["$"+series.Ref+"."+series.MetricName()] = v
|
||||
for k, v := range series.Metric {
|
||||
if k == "__name__" {
|
||||
continue
|
||||
}
|
||||
|
||||
if !strings.Contains(trigger.Exp, "$"+series.Ref+"."+string(k)) {
|
||||
// 过滤掉表达式中不包含的标签
|
||||
continue
|
||||
}
|
||||
|
||||
m["$"+series.Ref+"."+string(k)] = string(v)
|
||||
}
|
||||
|
||||
if u, exists := unitMap[series.Ref]; exists {
|
||||
valuesUnitMap["$"+series.Ref+"."+series.MetricName()] = unit.ValueFormatter(u, 2, v)
|
||||
}
|
||||
|
||||
ts = int64(t)
|
||||
sample = series
|
||||
value = v
|
||||
logger.Infof("rule_eval rid:%d origin series labels:%+v", rule.Id, series.Metric)
|
||||
}
|
||||
|
||||
if u, exists := unitMap[series.Ref]; exists {
|
||||
valuesUnitMap["$"+series.Ref+"."+series.MetricName()] = unit.ValueFormatter(u, 2, v)
|
||||
isTriggered := parser.CalcWithRid(trigger.Exp, m, rule.Id)
|
||||
// 此条日志很重要,是告警判断的现场值
|
||||
logger.Infof("rule_eval rid:%d trigger:%+v exp:%s res:%v m:%v", rule.Id, trigger, trigger.Exp, isTriggered, m)
|
||||
|
||||
var values string
|
||||
for k, v := range m {
|
||||
if !strings.Contains(k, ".") {
|
||||
continue
|
||||
}
|
||||
|
||||
switch v.(type) {
|
||||
case float64:
|
||||
values += fmt.Sprintf("%s:%.3f ", k, v)
|
||||
case string:
|
||||
values += fmt.Sprintf("%s:%s ", k, v)
|
||||
}
|
||||
}
|
||||
|
||||
ts = int64(t)
|
||||
sample = series
|
||||
value = v
|
||||
logger.Infof("rule_eval rid:%d origin series labels:%+v", rule.Id, series.Metric)
|
||||
}
|
||||
|
||||
isTriggered := parser.CalcWithRid(trigger.Exp, m, rule.Id)
|
||||
// 此条日志很重要,是告警判断的现场值
|
||||
logger.Infof("rule_eval rid:%d trigger:%+v exp:%s res:%v m:%v", rule.Id, trigger, trigger.Exp, isTriggered, m)
|
||||
|
||||
var values string
|
||||
for k, v := range m {
|
||||
if !strings.Contains(k, ".") {
|
||||
continue
|
||||
point := models.AnomalyPoint{
|
||||
Key: sample.MetricName(),
|
||||
Labels: sample.Metric,
|
||||
Timestamp: int64(ts),
|
||||
Value: value,
|
||||
Values: values,
|
||||
Severity: trigger.Severity,
|
||||
Triggered: isTriggered,
|
||||
Query: fmt.Sprintf("query:%+v trigger:%+v", ruleQuery.Queries, trigger),
|
||||
RecoverConfig: trigger.RecoverConfig,
|
||||
ValuesUnit: valuesUnitMap,
|
||||
}
|
||||
|
||||
switch v.(type) {
|
||||
case float64:
|
||||
values += fmt.Sprintf("%s:%.3f ", k, v)
|
||||
case string:
|
||||
values += fmt.Sprintf("%s:%s ", k, v)
|
||||
if isTriggered {
|
||||
points = append(points, point)
|
||||
} else {
|
||||
switch trigger.RecoverConfig.JudgeType {
|
||||
case models.Origin:
|
||||
// do nothing
|
||||
case models.RecoverOnCondition:
|
||||
fulfill := parser.CalcWithRid(trigger.RecoverConfig.RecoverExp, m, rule.Id)
|
||||
if !fulfill {
|
||||
continue
|
||||
}
|
||||
}
|
||||
recoverPoints = append(recoverPoints, point)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
point := models.AnomalyPoint{
|
||||
Key: sample.MetricName(),
|
||||
Labels: sample.Metric,
|
||||
Timestamp: int64(ts),
|
||||
Value: value,
|
||||
Values: values,
|
||||
Severity: trigger.Severity,
|
||||
Triggered: isTriggered,
|
||||
Query: fmt.Sprintf("query:%+v trigger:%+v", ruleQuery.Queries, trigger),
|
||||
RecoverConfig: trigger.RecoverConfig,
|
||||
ValuesUnit: valuesUnitMap,
|
||||
}
|
||||
if ruleQuery.NodataTrigger.Enable {
|
||||
|
||||
if isTriggered {
|
||||
points = append(points, point)
|
||||
} else {
|
||||
switch trigger.RecoverConfig.JudgeType {
|
||||
case models.Origin:
|
||||
// do nothing
|
||||
case models.RecoverOnCondition:
|
||||
fulfill := parser.CalcWithRid(trigger.RecoverConfig.RecoverExp, m, rule.Id)
|
||||
if !fulfill {
|
||||
now := time.Now().Unix()
|
||||
|
||||
// 使用 arw.LastSeriesStore 检查上次查询结果
|
||||
if len(arw.LastSeriesStore) > 0 {
|
||||
// 遍历上次的曲线数据
|
||||
for hash, lastSeries := range arw.LastSeriesStore {
|
||||
|
||||
if ruleQuery.NodataTrigger.ResolveAfterEnable {
|
||||
lastTs, _, exists := lastSeries.Last()
|
||||
if !exists {
|
||||
continue
|
||||
}
|
||||
|
||||
// 检查是否超过 resolve_after 时间
|
||||
if now-int64(lastTs) > int64(ruleQuery.NodataTrigger.ResolveAfter) {
|
||||
logger.Infof("rule_eval rid:%d series:%+v resolve after %d seconds now:%d lastTs:%d", rule.Id, lastSeries, ruleQuery.NodataTrigger.ResolveAfter, now, int64(lastTs))
|
||||
delete(arw.LastSeriesStore, hash)
|
||||
continue
|
||||
}
|
||||
}
|
||||
recoverPoints = append(recoverPoints, point)
|
||||
|
||||
// 检查是否在本次查询结果中存在
|
||||
if _, exists := seriesStore[hash]; !exists {
|
||||
// 生成无数据告警点
|
||||
point := models.AnomalyPoint{
|
||||
Key: lastSeries.MetricName(),
|
||||
Labels: lastSeries.Metric,
|
||||
Timestamp: now,
|
||||
Value: 0,
|
||||
Values: fmt.Sprintf("nodata since %v", time.Unix(now, 0).Format("2006-01-02 15:04:05")),
|
||||
Severity: ruleQuery.NodataTrigger.Severity,
|
||||
Triggered: true,
|
||||
Query: fmt.Sprintf("nodata check for %s", lastSeries.LabelsString()),
|
||||
TriggerType: models.TriggerTypeNodata,
|
||||
}
|
||||
points = append(points, point)
|
||||
logger.Infof("rule_eval rid:%d nodata point:%+v", rule.Id, point)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// 更新 arw.LastSeriesStore
|
||||
for hash, series := range seriesStore {
|
||||
arw.LastSeriesStore[hash] = series
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -224,6 +224,15 @@ func (p *Processor) BuildEvent(anomalyPoint models.AnomalyPoint, from string, no
|
||||
event.RecoverConfig = anomalyPoint.RecoverConfig
|
||||
event.RuleHash = ruleHash
|
||||
|
||||
if anomalyPoint.TriggerType == models.TriggerTypeNodata {
|
||||
event.TriggerValue = "nodata"
|
||||
ruleConfig := models.RuleQuery{}
|
||||
json.Unmarshal([]byte(p.rule.RuleConfig), &ruleConfig)
|
||||
ruleConfig.TriggerType = anomalyPoint.TriggerType
|
||||
b, _ := json.Marshal(ruleConfig)
|
||||
event.RuleConfig = string(b)
|
||||
}
|
||||
|
||||
if err := json.Unmarshal([]byte(p.rule.Annotations), &event.AnnotationsJSON); err != nil {
|
||||
event.AnnotationsJSON = make(map[string]string) // 解析失败时使用空 map
|
||||
logger.Warningf("unmarshal annotations json failed: %v, rule: %d", err, p.rule.Id)
|
||||
|
||||
@@ -161,9 +161,9 @@ type PromRuleConfig struct {
|
||||
type RecoverJudge int
|
||||
|
||||
const (
|
||||
Origin RecoverJudge = 0
|
||||
RecoverWithoutData RecoverJudge = 1
|
||||
RecoverOnCondition RecoverJudge = 2
|
||||
Origin RecoverJudge = 0
|
||||
NotRecoverWhenNoData RecoverJudge = 1
|
||||
RecoverOnCondition RecoverJudge = 2
|
||||
)
|
||||
|
||||
type RecoverConfig struct {
|
||||
@@ -194,10 +194,20 @@ type HostTrigger struct {
|
||||
}
|
||||
|
||||
type RuleQuery struct {
|
||||
Version string `json:"version"`
|
||||
Inhibit bool `json:"inhibit"`
|
||||
Queries []interface{} `json:"queries"`
|
||||
Triggers []Trigger `json:"triggers"`
|
||||
Version string `json:"version"`
|
||||
Inhibit bool `json:"inhibit"`
|
||||
Queries []interface{} `json:"queries"`
|
||||
ExpTriggerDisable bool `json:"exp_trigger_disable"`
|
||||
Triggers []Trigger `json:"triggers"`
|
||||
NodataTrigger NodataTrigger `json:"nodata_trigger"`
|
||||
TriggerType TriggerType `json:"trigger_type,omitempty"`
|
||||
}
|
||||
|
||||
type NodataTrigger struct {
|
||||
Enable bool `json:"enable"`
|
||||
Severity int `json:"severity"`
|
||||
ResolveAfterEnable bool `json:"resolve_after_enable"`
|
||||
ResolveAfter int `json:"resolve_after"` // 单位秒
|
||||
}
|
||||
|
||||
type Trigger struct {
|
||||
@@ -1175,8 +1185,6 @@ func (ar *AlertRule) UpdateEvent(event *AlertCurEvent) {
|
||||
event.RuleProd = ar.Prod
|
||||
event.RuleAlgo = ar.Algorithm
|
||||
event.PromForDuration = ar.PromForDuration
|
||||
event.RuleConfig = ar.RuleConfig
|
||||
event.RuleConfigJson = ar.RuleConfigJson
|
||||
event.Callbacks = ar.Callbacks
|
||||
event.CallbacksJSON = ar.CallbacksJSON
|
||||
event.RunbookUrl = ar.RunbookUrl
|
||||
|
||||
@@ -20,8 +20,16 @@ type AnomalyPoint struct {
|
||||
Values string `json:"values"`
|
||||
ValuesUnit map[string]unit.FormattedValue `json:"values_unit"`
|
||||
RecoverConfig RecoverConfig `json:"recover_config"`
|
||||
TriggerType TriggerType `json:"trigger_type"`
|
||||
}
|
||||
|
||||
type TriggerType string
|
||||
|
||||
const (
|
||||
TriggerTypeNormal TriggerType = "normal"
|
||||
TriggerTypeNodata TriggerType = "nodata"
|
||||
)
|
||||
|
||||
func NewAnomalyPoint(key string, labels map[string]string, ts int64, value float64, severity int) AnomalyPoint {
|
||||
anomalyPointLabels := make(model.Metric)
|
||||
for k, v := range labels {
|
||||
|
||||
@@ -53,6 +53,12 @@ func (d *DataResp) MetricName() string {
|
||||
return string(metric)
|
||||
}
|
||||
|
||||
// labels 转换为 string
|
||||
func (d *DataResp) LabelsString() string {
|
||||
labels := d.Metric
|
||||
return labels.String()
|
||||
}
|
||||
|
||||
type RelationKey struct {
|
||||
LeftKey string `json:"left_key"`
|
||||
RightKey string `json:"right_key"`
|
||||
|
||||
Reference in New Issue
Block a user