Compare commits

...

1 Commit

Author SHA1 Message Date
Xu Bin
ab0756d98a feat: eval check record (#2086) 2024-08-13 20:37:23 +08:00
13 changed files with 570 additions and 32 deletions

View File

@@ -41,14 +41,14 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
return nil, err
}
ctx := ctx.NewContext(context.Background(), nil, false, config.CenterApi)
var redis storage.Redis
redis, err = storage.NewRedis(config.Redis)
if err != nil {
return nil, err
}
ctx := ctx.NewContext(context.Background(), nil, false, config.CenterApi)
syncStats := memsto.NewSyncStats()
alertStats := astats.NewSyncStats()
@@ -68,7 +68,7 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
externalProcessors := process.NewExternalProcessors()
Start(config.Alert, config.Pushgw, syncStats, alertStats, externalProcessors, targetCache, busiGroupCache, alertMuteCache, alertRuleCache, notifyConfigCache, taskTplsCache, dsCache, ctx, promClients, tdengineClients, userCache, userGroupCache)
Start(config.Alert, config.Pushgw, syncStats, alertStats, externalProcessors, targetCache, busiGroupCache, alertMuteCache, alertRuleCache, notifyConfigCache, taskTplsCache, dsCache, ctx, promClients, tdengineClients, userCache, userGroupCache, &redis)
r := httpx.GinEngine(config.Global.RunMode, config.HTTP)
rt := router.New(config.HTTP, config.Alert, alertMuteCache, targetCache, busiGroupCache, alertStats, ctx, externalProcessors)
@@ -90,7 +90,7 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
func Start(alertc aconf.Alert, pushgwc pconf.Pushgw, syncStats *memsto.Stats, alertStats *astats.Stats, externalProcessors *process.ExternalProcessorsType, targetCache *memsto.TargetCacheType, busiGroupCache *memsto.BusiGroupCacheType,
alertMuteCache *memsto.AlertMuteCacheType, alertRuleCache *memsto.AlertRuleCacheType, notifyConfigCache *memsto.NotifyConfigCacheType, taskTplsCache *memsto.TaskTplCache, datasourceCache *memsto.DatasourceCacheType, ctx *ctx.Context,
promClients *prom.PromClientMap, tdendgineClients *tdengine.TdengineClientMap, userCache *memsto.UserCacheType, userGroupCache *memsto.UserGroupCacheType) {
promClients *prom.PromClientMap, tdendgineClients *tdengine.TdengineClientMap, userCache *memsto.UserCacheType, userGroupCache *memsto.UserGroupCacheType, redis *storage.Redis) {
alertSubscribeCache := memsto.NewAlertSubscribeCache(ctx, syncStats)
recordingRuleCache := memsto.NewRecordingRuleCache(ctx, syncStats)
targetsOfAlertRulesCache := memsto.NewTargetOfAlertRuleCache(ctx, alertc.Heartbeat.EngineName, syncStats)
@@ -103,7 +103,7 @@ func Start(alertc aconf.Alert, pushgwc pconf.Pushgw, syncStats *memsto.Stats, al
record.NewScheduler(alertc, recordingRuleCache, promClients, writers, alertStats)
eval.NewScheduler(alertc, externalProcessors, alertRuleCache, targetCache, targetsOfAlertRulesCache,
busiGroupCache, alertMuteCache, datasourceCache, promClients, tdendgineClients, naming, ctx, alertStats)
busiGroupCache, alertMuteCache, datasourceCache, promClients, tdendgineClients, naming, ctx, alertStats, redis)
dp := dispatch.NewDispatch(alertRuleCache, userCache, userGroupCache, alertSubscribeCache, targetCache, notifyConfigCache, taskTplsCache, alertc.Alerting, ctx, alertStats)
consumer := dispatch.NewConsumer(alertc.Alerting, ctx, dp, promClients)

View File

@@ -13,6 +13,7 @@ import (
"github.com/ccfos/nightingale/v6/memsto"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/prom"
"github.com/ccfos/nightingale/v6/storage"
"github.com/ccfos/nightingale/v6/tdengine"
"github.com/toolkits/pkg/logger"
@@ -40,12 +41,14 @@ type Scheduler struct {
ctx *ctx.Context
stats *astats.Stats
redis *storage.Redis
}
func NewScheduler(aconf aconf.Alert, externalProcessors *process.ExternalProcessorsType, arc *memsto.AlertRuleCacheType,
targetCache *memsto.TargetCacheType, toarc *memsto.TargetsOfAlertRuleCacheType,
busiGroupCache *memsto.BusiGroupCacheType, alertMuteCache *memsto.AlertMuteCacheType, datasourceCache *memsto.DatasourceCacheType,
promClients *prom.PromClientMap, tdengineClients *tdengine.TdengineClientMap, naming *naming.Naming, ctx *ctx.Context, stats *astats.Stats) *Scheduler {
promClients *prom.PromClientMap, tdengineClients *tdengine.TdengineClientMap, naming *naming.Naming, ctx *ctx.Context, stats *astats.Stats, redis *storage.Redis) *Scheduler {
scheduler := &Scheduler{
aconf: aconf,
alertRules: make(map[string]*AlertRuleWorker),
@@ -65,6 +68,8 @@ func NewScheduler(aconf aconf.Alert, externalProcessors *process.ExternalProcess
ctx: ctx,
stats: stats,
redis: redis,
}
go scheduler.LoopSyncRules(context.Background())
@@ -117,7 +122,7 @@ func (s *Scheduler) syncAlertRules() {
logger.Debugf("datasource %d status is %s", dsId, ds.Status)
continue
}
processor := process.NewProcessor(s.aconf.Heartbeat.EngineName, rule, dsId, s.alertRuleCache, s.targetCache, s.targetsOfAlertRuleCache, s.busiGroupCache, s.alertMuteCache, s.datasourceCache, s.ctx, s.stats)
processor := process.NewProcessor(s.aconf.Heartbeat.EngineName, rule, dsId, s.alertRuleCache, s.targetCache, s.targetsOfAlertRuleCache, s.busiGroupCache, s.alertMuteCache, s.datasourceCache, s.ctx, s.stats, s.redis)
alertRule := NewAlertRuleWorker(rule, dsId, processor, s.promClients, s.tdengineClients, s.ctx)
alertRuleWorkers[alertRule.Hash()] = alertRule
@@ -127,7 +132,7 @@ func (s *Scheduler) syncAlertRules() {
if !naming.DatasourceHashRing.IsHit(s.aconf.Heartbeat.EngineName, strconv.FormatInt(rule.Id, 10), s.aconf.Heartbeat.Endpoint) {
continue
}
processor := process.NewProcessor(s.aconf.Heartbeat.EngineName, rule, 0, s.alertRuleCache, s.targetCache, s.targetsOfAlertRuleCache, s.busiGroupCache, s.alertMuteCache, s.datasourceCache, s.ctx, s.stats)
processor := process.NewProcessor(s.aconf.Heartbeat.EngineName, rule, 0, s.alertRuleCache, s.targetCache, s.targetsOfAlertRuleCache, s.busiGroupCache, s.alertMuteCache, s.datasourceCache, s.ctx, s.stats, s.redis)
alertRule := NewAlertRuleWorker(rule, 0, processor, s.promClients, s.tdengineClients, s.ctx)
alertRuleWorkers[alertRule.Hash()] = alertRule
} else {
@@ -144,7 +149,7 @@ func (s *Scheduler) syncAlertRules() {
logger.Debugf("datasource %d status is %s", dsId, ds.Status)
continue
}
processor := process.NewProcessor(s.aconf.Heartbeat.EngineName, rule, dsId, s.alertRuleCache, s.targetCache, s.targetsOfAlertRuleCache, s.busiGroupCache, s.alertMuteCache, s.datasourceCache, s.ctx, s.stats)
processor := process.NewProcessor(s.aconf.Heartbeat.EngineName, rule, dsId, s.alertRuleCache, s.targetCache, s.targetsOfAlertRuleCache, s.busiGroupCache, s.alertMuteCache, s.datasourceCache, s.ctx, s.stats, s.redis)
externalRuleWorkers[processor.Key()] = processor
}
}

View File

@@ -18,7 +18,6 @@ import (
promsdk "github.com/ccfos/nightingale/v6/pkg/prom"
"github.com/ccfos/nightingale/v6/prom"
"github.com/ccfos/nightingale/v6/tdengine"
"github.com/toolkits/pkg/logger"
"github.com/toolkits/pkg/str"
)
@@ -105,6 +104,11 @@ func (arw *AlertRuleWorker) Eval() {
// logger.Errorf("rule_eval:%s rule not found", arw.Key())
return
}
if arw.processor == nil {
logger.Warningf("rule_eval:%s processor is nil", arw.Key())
return
}
arw.processor.Stats.CounterRuleEval.WithLabelValues().Inc()
typ := cachedRule.GetRuleType()
@@ -120,12 +124,12 @@ func (arw *AlertRuleWorker) Eval() {
case models.LOKI:
anomalyPoints = arw.GetPromAnomalyPoint(cachedRule.RuleConfig)
default:
process.Record(arw.ctx, nil, nil, false, process.InvalidType, cachedRule.Name, cachedRule.Id, arw.processor.Redis)
return
}
if arw.processor == nil {
logger.Warningf("rule_eval:%s processor is nil", arw.Key())
return
if len(anomalyPoints) == 0 {
process.Record(arw.ctx, nil, nil, false, process.NoAnomalyPoint, cachedRule.Name, cachedRule.Id, arw.processor.Redis)
}
if arw.inhibit {
@@ -146,6 +150,7 @@ func (arw *AlertRuleWorker) Eval() {
models.AlertCurEventDelByHash(arw.ctx, hash)
pointsMap[tagHash] = point
process.Record(arw.ctx, &p, nil, true, process.RecoverMerge, cachedRule.Name, cachedRule.Id, arw.processor.Redis)
}
}
@@ -178,12 +183,14 @@ func (arw *AlertRuleWorker) GetPromAnomalyPoint(ruleConfig string) []common.Anom
if err := json.Unmarshal([]byte(ruleConfig), &rule); err != nil {
logger.Errorf("rule_eval:%s rule_config:%s, error:%v", arw.Key(), ruleConfig, err)
arw.processor.Stats.CounterRuleEvalErrorTotal.WithLabelValues(fmt.Sprintf("%v", arw.processor.DatasourceId()), GET_RULE_CONFIG).Inc()
process.Record(arw.ctx, nil, nil, false, process.InvalidRuleConfig, arw.rule.Name, arw.rule.Id, arw.processor.Redis)
return lst
}
if rule == nil {
logger.Errorf("rule_eval:%s rule_config:%s, error:rule is nil", arw.Key(), ruleConfig)
arw.processor.Stats.CounterRuleEvalErrorTotal.WithLabelValues(fmt.Sprintf("%v", arw.processor.DatasourceId()), GET_RULE_CONFIG).Inc()
process.Record(arw.ctx, nil, nil, false, process.RuleNotFound, arw.rule.Name, arw.rule.Id, arw.processor.Redis)
return lst
}
@@ -197,12 +204,14 @@ func (arw *AlertRuleWorker) GetPromAnomalyPoint(ruleConfig string) []common.Anom
if promql == "" {
logger.Warningf("rule_eval:%s promql is blank", arw.Key())
arw.processor.Stats.CounterRuleEvalErrorTotal.WithLabelValues(fmt.Sprintf("%v", arw.processor.DatasourceId()), CHECK_QUERY).Inc()
process.Record(arw.ctx, nil, nil, false, process.InvalidQuery, arw.rule.Name, arw.rule.Id, arw.processor.Redis)
continue
}
if arw.promClients.IsNil(arw.datasourceId) {
logger.Warningf("rule_eval:%s error reader client is nil", arw.Key())
arw.processor.Stats.CounterRuleEvalErrorTotal.WithLabelValues(fmt.Sprintf("%v", arw.processor.DatasourceId()), GET_CLIENT).Inc()
process.Record(arw.ctx, nil, nil, false, process.EmptyPromClient, arw.rule.Name, arw.rule.Id, arw.processor.Redis)
continue
}
@@ -215,6 +224,7 @@ func (arw *AlertRuleWorker) GetPromAnomalyPoint(ruleConfig string) []common.Anom
logger.Errorf("rule_eval:%s promql:%s, error:%v", arw.Key(), promql, err)
arw.processor.Stats.CounterQueryDataErrorTotal.WithLabelValues(fmt.Sprintf("%d", arw.datasourceId)).Inc()
arw.processor.Stats.CounterRuleEvalErrorTotal.WithLabelValues(fmt.Sprintf("%v", arw.processor.DatasourceId()), QUERY_DATA).Inc()
process.Record(arw.ctx, nil, nil, false, process.QueryError(promql, err), arw.rule.Name, arw.rule.Id, arw.processor.Redis)
continue
}
@@ -225,6 +235,9 @@ func (arw *AlertRuleWorker) GetPromAnomalyPoint(ruleConfig string) []common.Anom
}
logger.Debugf("rule_eval:%s query:%+v, value:%v", arw.Key(), query, value)
process.Record(arw.ctx, nil, nil, false,
process.QueryRecord(fmt.Sprintf("rule_eval:%s query:%+v, value:%v", arw.Key(), query, value)),
arw.rule.Name, arw.rule.Id, arw.processor.Redis)
points := common.ConvertAnomalyPoints(value)
for i := 0; i < len(points); i++ {
points[i].Severity = query.Severity
@@ -243,6 +256,7 @@ func (arw *AlertRuleWorker) GetTdengineAnomalyPoint(rule *models.AlertRule, dsId
if ruleConfig == "" {
logger.Warningf("rule_eval:%d promql is blank", rule.Id)
arw.processor.Stats.CounterRuleEvalErrorTotal.WithLabelValues(fmt.Sprintf("%v", arw.processor.DatasourceId()), GET_RULE_CONFIG).Inc()
process.Record(arw.ctx, nil, nil, false, process.InvalidRuleConfig, arw.rule.Name, arw.rule.Id, arw.processor.Redis)
return points, recoverPoints
}
@@ -252,6 +266,7 @@ func (arw *AlertRuleWorker) GetTdengineAnomalyPoint(rule *models.AlertRule, dsId
logger.Warningf("rule_eval:%d promql parse error:%s", rule.Id, err.Error())
arw.processor.Stats.CounterRuleEvalErrorTotal.WithLabelValues(fmt.Sprintf("%v", arw.processor.DatasourceId())).Inc()
arw.processor.Stats.CounterRuleEvalErrorTotal.WithLabelValues(fmt.Sprintf("%v", arw.processor.DatasourceId()), GET_RULE_CONFIG).Inc()
process.Record(arw.ctx, nil, nil, false, process.InvalidQuery, arw.rule.Name, arw.rule.Id, arw.processor.Redis)
return points, recoverPoints
}
@@ -267,6 +282,7 @@ func (arw *AlertRuleWorker) GetTdengineAnomalyPoint(rule *models.AlertRule, dsId
logger.Warningf("rule_eval:%d tdengine client is nil", rule.Id)
arw.processor.Stats.CounterQueryDataErrorTotal.WithLabelValues(fmt.Sprintf("%d", arw.datasourceId)).Inc()
arw.processor.Stats.CounterRuleEvalErrorTotal.WithLabelValues(fmt.Sprintf("%v", arw.processor.DatasourceId()), GET_CLIENT).Inc()
process.Record(arw.ctx, nil, nil, false, process.EmptyPromClient, arw.rule.Name, arw.rule.Id, arw.processor.Redis)
continue
}
@@ -276,15 +292,19 @@ func (arw *AlertRuleWorker) GetTdengineAnomalyPoint(rule *models.AlertRule, dsId
logger.Warningf("rule_eval rid:%d query data error: %v", rule.Id, err)
arw.processor.Stats.CounterQueryDataErrorTotal.WithLabelValues(fmt.Sprintf("%d", arw.datasourceId)).Inc()
arw.processor.Stats.CounterRuleEvalErrorTotal.WithLabelValues(fmt.Sprintf("%v", arw.processor.DatasourceId()), QUERY_DATA).Inc()
process.Record(arw.ctx, nil, nil, false, process.QueryError(query, err), arw.rule.Name, arw.rule.Id, arw.processor.Redis)
continue
}
// 此条日志很重要,是告警判断的现场值
logger.Debugf("rule_eval rid:%d req:%+v resp:%+v", rule.Id, query, series)
MakeSeriesMap(series, seriesTagIndex, seriesStore)
process.Record(arw.ctx, nil, nil, false,
process.QueryRecord(fmt.Sprintf("rule_eval rid:%d req:%+v resp:%+v", rule.Id, query, series)),
arw.rule.Name, arw.rule.Id, arw.processor.Redis)
}
points, recoverPoints = GetAnomalyPoint(rule.Id, ruleQuery, seriesTagIndex, seriesStore)
points, recoverPoints = GetAnomalyPoint(arw, rule.Id, ruleQuery, seriesTagIndex, seriesStore)
}
return points, recoverPoints
@@ -298,12 +318,14 @@ func (arw *AlertRuleWorker) GetHostAnomalyPoint(ruleConfig string) []common.Anom
if err := json.Unmarshal([]byte(ruleConfig), &rule); err != nil {
logger.Errorf("rule_eval:%s rule_config:%s, error:%v", arw.Key(), ruleConfig, err)
arw.processor.Stats.CounterRuleEvalErrorTotal.WithLabelValues(fmt.Sprintf("%v", arw.processor.DatasourceId()), GET_RULE_CONFIG).Inc()
process.Record(arw.ctx, nil, nil, false, process.InvalidRuleConfig, arw.rule.Name, arw.rule.Id, arw.processor.Redis)
return lst
}
if rule == nil {
logger.Errorf("rule_eval:%s rule_config:%s, error:rule is nil", arw.Key(), ruleConfig)
arw.processor.Stats.CounterRuleEvalErrorTotal.WithLabelValues(fmt.Sprintf("%v", arw.processor.DatasourceId()), GET_RULE_CONFIG).Inc()
process.Record(arw.ctx, nil, nil, false, process.RuleNotFound, arw.rule.Name, arw.rule.Id, arw.processor.Redis)
return lst
}
@@ -338,6 +360,7 @@ func (arw *AlertRuleWorker) GetHostAnomalyPoint(ruleConfig string) []common.Anom
idents = append(idents, engineIdents...)
if len(idents) == 0 {
process.Record(arw.ctx, nil, nil, false, process.TargetNotFound, arw.rule.Name, arw.rule.Id, arw.processor.Redis)
continue
}
@@ -370,6 +393,7 @@ func (arw *AlertRuleWorker) GetHostAnomalyPoint(ruleConfig string) []common.Anom
if !exists {
logger.Warningf("rule_eval:%s targets not found", arw.Key())
arw.processor.Stats.CounterRuleEvalErrorTotal.WithLabelValues(fmt.Sprintf("%v", arw.processor.DatasourceId()), QUERY_DATA).Inc()
process.Record(arw.ctx, nil, nil, false, process.TargetNotFound, arw.rule.Name, arw.rule.Id, arw.processor.Redis)
continue
}
@@ -424,6 +448,7 @@ func (arw *AlertRuleWorker) GetHostAnomalyPoint(ruleConfig string) []common.Anom
if !exists {
logger.Warningf("rule_eval:%s targets not found", arw.Key())
arw.processor.Stats.CounterRuleEvalErrorTotal.WithLabelValues(fmt.Sprintf("%v", arw.processor.DatasourceId()), QUERY_DATA).Inc()
process.Record(arw.ctx, nil, nil, false, process.TargetNotFound, arw.rule.Name, arw.rule.Id, arw.processor.Redis)
continue
}
@@ -444,7 +469,7 @@ func (arw *AlertRuleWorker) GetHostAnomalyPoint(ruleConfig string) []common.Anom
return lst
}
func GetAnomalyPoint(ruleId int64, ruleQuery models.RuleQuery, seriesTagIndex map[uint64][]uint64, seriesStore map[uint64]models.DataResp) ([]common.AnomalyPoint, []common.AnomalyPoint) {
func GetAnomalyPoint(arw *AlertRuleWorker, ruleId int64, ruleQuery models.RuleQuery, seriesTagIndex map[uint64][]uint64, seriesStore map[uint64]models.DataResp) ([]common.AnomalyPoint, []common.AnomalyPoint) {
points := []common.AnomalyPoint{}
recoverPoints := []common.AnomalyPoint{}
@@ -484,6 +509,9 @@ func GetAnomalyPoint(ruleId int64, ruleQuery models.RuleQuery, seriesTagIndex ma
isTriggered := parser.Calc(trigger.Exp, m)
// 此条日志很重要,是告警判断的现场值
logger.Infof("rule_eval rid:%d trigger:%+v exp:%s res:%v m:%v", ruleId, trigger, trigger.Exp, isTriggered, m)
process.Record(arw.ctx, nil, nil, false,
process.QueryRecord(fmt.Sprintf("rule_eval rid:%d trigger:%+v exp:%s res:%v m:%v", ruleId, trigger, trigger.Exp, isTriggered, m)),
arw.rule.Name, arw.rule.Id, arw.processor.Redis)
var values string
for k, v := range m {

View File

@@ -0,0 +1,266 @@
package process
import (
"encoding/json"
"fmt"
"sync"
"time"
"github.com/ccfos/nightingale/v6/alert/common"
"github.com/ccfos/nightingale/v6/models"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/pkg/poster"
"github.com/ccfos/nightingale/v6/storage"
"github.com/toolkits/pkg/logger"
)
// AlertRecordRedisKey is the Redis list key template: alert::<ruleID>::<day>::<hour>.
var AlertRecordRedisKey = "alert::%d::%d::%d"

// AlertRecordMaxCount caps the total number of alert records kept across all keys.
var AlertRecordMaxCount int64 = 100000

// defaultMaxLengthForList caps the length of a single per-hour record list.
var defaultMaxLengthForList int64 = 1000

// defaultTimeDurationForList is the TTL applied to each record list.
var defaultTimeDurationForList = 24 * time.Hour

// AlertRecordCount maintains, in memory, every alert-record key together with
// the length of its corresponding Redis list.
var AlertRecordCount sync.Map
// InitAlertRecordCount seeds AlertRecordCount at startup: it enumerates the
// alert-record key for every rule over each of the last 24 hourly buckets and
// loads each key's current list length from Redis.
func InitAlertRecordCount(ctx *ctx.Context, redis *storage.Redis) error {
	rules, err := models.AlertRuleGetsAll(ctx)
	if err != nil {
		return err
	}

	// Walk backwards hour by hour from now, emitting one key per rule for
	// each hourly bucket.
	keys := make([]string, 0, 24*len(rules))
	for hour, t := 0, time.Now(); hour < 24; hour, t = hour+1, t.Add(-time.Hour) {
		for _, rule := range rules {
			keys = append(keys, fmt.Sprintf(AlertRecordRedisKey, rule.Id, t.Day(), t.Hour()))
		}
	}

	lengths, err := storage.MLLen(ctx.GetContext(), *redis, keys)
	if err != nil {
		return err
	}
	AlertRecordCount = *mapToSyncMap(lengths)
	return nil
}
// mapToSyncMap copies a plain map into a freshly allocated sync.Map.
func mapToSyncMap(src map[string]int64) *sync.Map {
	dst := new(sync.Map)
	for key, val := range src {
		dst.Store(key, val)
	}
	return dst
}
// NoEventReason explains why a rule evaluation produced no alert event (or,
// for the debug-record values, carries evaluation context); it is stored
// verbatim in each AlertRecord.
type NoEventReason string

const (
	NoAnomalyPoint    NoEventReason = "no anomaly point for this alert"
	InvalidType       NoEventReason = "rule type not support"
	InvalidRuleConfig NoEventReason = "rule config invalid"
	InvalidQuery      NoEventReason = "invalid promql query"
	EmptyPromClient   NoEventReason = "empty prom client"
	// QueryErr is a format template; render it via QueryError(query, err).
	QueryErr        NoEventReason = "query error, query: %+v, err: %s"
	TargetNotFound  NoEventReason = "targets not found"
	EventRecovered  NoEventReason = "event recovered"
	RecoverMerge    NoEventReason = "recover event merged"
	RecoverDuration NoEventReason = "within recover duration"
	RuleNotFound    NoEventReason = "rule not found"
	IsInhibit       NoEventReason = "alert is inhibited by high priority"
	// Muted is a format template; render it via AlertMutedReason(detail).
	Muted       NoEventReason = "alert is muted, detail: %s"
	MutedByHook NoEventReason = "alert is muted by hook"
	FullQueue   NoEventReason = "alert queue is full"
	Interval    NoEventReason = "fail to reach alert interval"
	RepeatStep  NoEventReason = "fail to reach repeat step"
	NotifyNumber NoEventReason = "reach max notify number"
	// ValueRecord is a prefix for debug records; built via QueryRecord(info).
	ValueRecord NoEventReason = "this is a debug record instead of no even reason, "
)
// AlertRecord is one audit/debug entry describing why a rule evaluation did
// (or did not) produce an alert event; it is serialized to JSON and stored in
// a per-rule, per-hour Redis list.
type AlertRecord struct {
	AlertName        string        `json:"alert_name"`          // rule name at record time
	CreateAt         int64         `json:"create_at"`           // trigger/point timestamp; 0 when neither exists
	ReasonForNoEvent NoEventReason `json:"reason_for_no_event"` // see the NoEventReason constants
	IsRecovery       bool          `json:"is_recovery"`
	Labels           string        `json:"labels"`
	Query            string        `json:"query"`
	Values           string        `json:"values"`
}

// MarshalBinary serializes the record as JSON — presumably to satisfy
// encoding.BinaryMarshaler for the Redis client used by storage.LPush;
// confirm against the storage package.
func (ar *AlertRecord) MarshalBinary() (data []byte, err error) {
	return json.Marshal(ar)
}

// AlertRecordRedis is the payload non-center instances POST to the center's
// /v1/n9e/redis/lpush endpoint when they forward records instead of writing
// to Redis directly (see Record).
type AlertRecordRedis struct {
	Key            string        `json:"key"`
	Value          *AlertRecord  `json:"value"`
	ExpireDuration time.Duration `json:"expire_duration"`
	MaxLength      int64         `json:"max_length"`
}
// Record writes one AlertRecord describing the outcome of an evaluation cycle
// for rule ruleID into the per-rule, per-hour Redis list. On center instances
// the record is pushed directly; other instances forward it to the center via
// /v1/n9e/redis/lpush. Either point or event (or both) may be nil; whichever
// is present supplies the record's timestamp, labels, query and values.
func Record(ctx *ctx.Context, point *common.AnomalyPoint, event *models.AlertCurEvent, isRecovery bool, reason NoEventReason, ruleName string, ruleID int64, redis *storage.Redis) {
	// After scheduling starts, an evaluation has four possible outcomes:
	//   1. no anomaly point was generated
	//   2. a point was generated but no event
	//   3. an event was generated but not pushed to the queue
	//   4. an event was generated and pushed to the queue
	var ar AlertRecord
	ar.ReasonForNoEvent = reason
	ar.IsRecovery = isRecovery
	ar.AlertName = ruleName

	now := time.Now()
	key := fmt.Sprintf(AlertRecordRedisKey, ruleID, now.Day(), now.Hour())
	if event != nil {
		// An event was generated but not delivered.
		ar.CreateAt = event.TriggerTime
		ar.Labels = event.Tags
		ar.Query = event.PromQl
		ar.Values = event.TriggerValue
	} else if point != nil {
		// A point was generated but no event: fill the record from the point.
		// BUGFIX: the original returned here immediately after populating
		// these fields, silently dropping every point-only record (the
		// assignments were dead stores, contradicting callers that pass a
		// point expecting it to be recorded).
		ar.CreateAt = point.Timestamp
		ar.Labels = point.Labels.String()
		ar.Query = point.Query
		ar.Values = point.ReadableValue()
	}
	// else: no point was generated; only the reason is recorded.

	if !ctx.IsCenter {
		// Non-center instances forward the record to the center over HTTP.
		err := poster.PostByUrls(ctx, "/v1/n9e/redis/lpush", AlertRecordRedis{
			Key:            key,
			Value:          &ar,
			ExpireDuration: defaultTimeDurationForList,
			MaxLength:      defaultMaxLengthForList,
		})
		if err != nil {
			logger.Errorf("fail to forward alert record: %v, err:%s", ar, err.Error())
		}
		return
	}

	err := PushAlertRecord(ctx, redis, key, &ar)
	if err != nil {
		logger.Errorf("fail to push alert record: %v, err:%s", ar, err.Error())
	}
}
// PushAlertRecord LPushes one record onto the Redis list at key and maintains
// the in-memory length counter in AlertRecordCount: the first push for a key
// sets the list's TTL, and once the tracked length exceeds
// defaultMaxLengthForList the list is trimmed back down to that bound.
func PushAlertRecord(ctx *ctx.Context, redis *storage.Redis, key string, ar *AlertRecord) error {
	err := storage.LPush(ctx.GetContext(), *redis, key, ar)
	if err != nil {
		return err
	}
	// NOTE(review): the Load/Store pairs below are not atomic as a unit;
	// concurrent pushes to the same key can under-count. Confirm callers are
	// serialized per key, or that the approximation is acceptable (LTrim
	// bounds the list regardless).
	var count int64
	val, ok := AlertRecordCount.Load(key)
	if !ok || val.(int64) == 0 {
		// First record for this key (or the counter was reset): arm the TTL
		// so the whole list expires after defaultTimeDurationForList.
		err := storage.Expire(ctx.GetContext(), *redis, key, defaultTimeDurationForList)
		if err != nil {
			logger.Errorf("fail to set expire time for alert record, key :%s, err:%s", key, err.Error())
			return err
		}
		AlertRecordCount.Store(key, int64(1))
	} else {
		count = val.(int64) + 1
		AlertRecordCount.Store(key, count)
	}
	if count > defaultMaxLengthForList {
		// Over the per-list cap: keep only the newest defaultMaxLengthForList
		// entries (LPush puts newest at index 0).
		err := storage.LTrim(ctx.GetContext(), *redis, key, 0, defaultMaxLengthForList-1)
		if err != nil {
			logger.Errorf("fail to trim alert record, key :%s, err:%s", key, err.Error())
			return err
		}
		AlertRecordCount.Store(key, defaultMaxLengthForList)
	}
	return nil
}
// AlertMutedReason renders the Muted reason template with the mute detail.
func AlertMutedReason(detail string) NoEventReason {
	reason := fmt.Sprintf(string(Muted), detail)
	return NoEventReason(reason)
}
// QueryError renders the QueryErr reason template with the query and error.
func QueryError(query interface{}, err error) NoEventReason {
	msg := fmt.Sprintf(string(QueryErr), query, err.Error())
	return NoEventReason(msg)
}
// QueryRecord builds a debug-record reason by prefixing info with ValueRecord.
// Plain concatenation is used deliberately: the original passed the joined
// string to fmt.Sprintf as a format string, so any '%' inside info (which
// carries arbitrary query/series text) was mangled into %!x(MISSING)-style
// output (go vet's printf check flags non-constant format strings).
func QueryRecord(info string) NoEventReason {
	return NoEventReason(string(ValueRecord) + info)
}
// keyAndLen pairs a Redis alert-record key with the length of its list; used
// by LimitAlertRecordCount to pick eviction candidates.
type keyAndLen struct {
	key    string
	length int64
}
// LimitAlertRecordCount drops whole per-hour record lists when the total
// number of alert records across all tracked keys exceeds AlertRecordMaxCount.
// Keys closest to expiry (smallest remaining TTL) are evicted first.
func LimitAlertRecordCount(ctx *ctx.Context, redis *storage.Redis) {
	var keys []string
	// Collect every key tracked in memory, then look up each key's remaining
	// TTL and list length from Redis.
	AlertRecordCount.Range(func(k, v interface{}) bool {
		keys = append(keys, k.(string))
		return true
	})
	kal := make(map[int][]keyAndLen)
	var count int64
	kToLen, err := storage.MLLen(ctx.GetContext(), *redis, keys)
	if err != nil {
		logger.Errorf("fail to limit alert record's count, keys: %v, err:%s", keys, err.Error())
		return
	}
	kToTTL, err := storage.MTTL(ctx.GetContext(), *redis, keys)
	if err != nil {
		logger.Errorf("fail to limit alert record's count, keys: %v, err:%s", keys, err.Error())
		return
	}
	// Group keys into hourly buckets by remaining TTL.
	for k, v := range kToTTL {
		l := kToLen[k]
		if v < 0 {
			// Key no longer exists / already expired: stop tracking it.
			AlertRecordCount.Delete(k)
		}
		if l == 0 || v < 0 {
			continue
		}
		hour := int(v.Hours())
		if _, ok := kal[hour]; !ok {
			kal[hour] = make([]keyAndLen, 0)
		}
		count += l
		kal[hour] = append(kal[hour], keyAndLen{key: k, length: l})
	}
	if count <= AlertRecordMaxCount {
		return
	}
	// Over the cap: evict bucket by bucket, soonest-to-expire first.
	// NOTE(review): once a bucket is entered, all of its keys are queued for
	// deletion even if count drops below the cap partway through — confirm
	// that coarse, per-hour granularity is intended. Also, a key whose TTL
	// rounds to a full 24 hours would land in bucket 24, which this 0..23
	// loop never visits — confirm that cannot occur in practice.
	keyDel := make([]string, 0)
	for i := 0; i < 24; i++ {
		if count <= AlertRecordMaxCount {
			break
		}
		if len(kal[i]) == 0 {
			continue
		}
		for j := 0; j < len(kal[i]); j++ {
			keyDel = append(keyDel, kal[i][j].key)
			count -= kal[i][j].length
		}
	}
	err = storage.MDel(ctx.GetContext(), *redis, keyDel...)
	if err != nil {
		logger.Errorf("fail to limit alert record's count, keys: %v, err:%s", keyDel, err.Error())
	}
	for i := range keyDel {
		AlertRecordCount.Delete(keyDel[i])
	}
}

View File

@@ -19,6 +19,7 @@ import (
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/pkg/tplx"
"github.com/ccfos/nightingale/v6/pushgw/writer"
"github.com/ccfos/nightingale/v6/storage"
"github.com/prometheus/prometheus/prompb"
"github.com/toolkits/pkg/logger"
@@ -77,6 +78,8 @@ type Processor struct {
HandleFireEventHook HandleEventFunc
HandleRecoverEventHook HandleEventFunc
EventMuteHook EventMuteHookFunc
Redis *storage.Redis
}
func (p *Processor) Key() string {
@@ -99,7 +102,7 @@ func (p *Processor) Hash() string {
func NewProcessor(engineName string, rule *models.AlertRule, datasourceId int64, alertRuleCache *memsto.AlertRuleCacheType,
targetCache *memsto.TargetCacheType, targetsOfAlertRuleCache *memsto.TargetsOfAlertRuleCacheType,
busiGroupCache *memsto.BusiGroupCacheType, alertMuteCache *memsto.AlertMuteCacheType, datasourceCache *memsto.DatasourceCacheType, ctx *ctx.Context,
stats *astats.Stats) *Processor {
stats *astats.Stats, redis *storage.Redis) *Processor {
p := &Processor{
EngineName: engineName,
@@ -119,6 +122,7 @@ func NewProcessor(engineName string, rule *models.AlertRule, datasourceId int64,
HandleFireEventHook: func(event *models.AlertCurEvent) {},
HandleRecoverEventHook: func(event *models.AlertCurEvent) {},
EventMuteHook: func(event *models.AlertCurEvent) bool { return false },
Redis: redis,
}
p.mayHandleGroup()
@@ -134,6 +138,7 @@ func (p *Processor) Handle(anomalyPoints []common.AnomalyPoint, from string, inh
if cachedRule == nil {
logger.Errorf("rule not found %+v", anomalyPoints)
p.Stats.CounterRuleEvalErrorTotal.WithLabelValues(fmt.Sprintf("%v", p.DatasourceId()), "handle_event").Inc()
Record(p.ctx, nil, nil, false, RuleNotFound, p.rule.Name, p.rule.Id, p.Redis)
return
}
@@ -152,12 +157,14 @@ func (p *Processor) Handle(anomalyPoints []common.AnomalyPoint, from string, inh
if isMuted {
p.Stats.CounterMuteTotal.WithLabelValues(event.GroupName).Inc()
logger.Debugf("rule_eval:%s event:%v is muted, detail:%s", p.Key(), event, detail)
Record(p.ctx, nil, event, false, AlertMutedReason(detail), p.rule.Name, p.rule.Id, p.Redis)
continue
}
if p.EventMuteHook(event) {
p.Stats.CounterMuteTotal.WithLabelValues(event.GroupName).Inc()
logger.Debugf("rule_eval:%s event:%v is muted by hook", p.Key(), event)
Record(p.ctx, nil, event, false, MutedByHook, p.rule.Name, p.rule.Id, p.Redis)
continue
}
@@ -277,6 +284,8 @@ func (p *Processor) HandleRecover(alertingKeys map[string]struct{}, now int64, i
if _, has := alertingKeys[hash]; has {
continue
}
event, _ := p.pendings.Get(hash)
Record(p.ctx, nil, event, true, EventRecovered, p.rule.Name, p.rule.Id, p.Redis)
p.pendings.Delete(hash)
}
@@ -324,6 +333,7 @@ func (p *Processor) HandleRecoverEvent(hashArr []string, now int64, inhibit bool
p.pendings.Delete(e.Hash)
models.AlertCurEventDelByHash(p.ctx, e.Hash)
eventMap[event.Tags] = *event
Record(p.ctx, nil, &e, true, IsInhibit, p.rule.Name, p.rule.Id, p.Redis)
}
}
@@ -345,6 +355,7 @@ func (p *Processor) RecoverSingle(hash string, now int64, value *string, values
// 如果配置了留观时长,就不能立马恢复了
if cachedRule.RecoverDuration > 0 && now-event.LastEvalTime < cachedRule.RecoverDuration {
logger.Debugf("rule_eval:%s event:%v not recover", p.Key(), event)
Record(p.ctx, nil, event, true, RecoverDuration, p.rule.Name, p.rule.Id, p.Redis)
return
}
if value != nil {
@@ -401,6 +412,8 @@ func (p *Processor) handleEvent(events []*models.AlertCurEvent) {
severity = event.Severity
}
continue
} else {
Record(p.ctx, nil, event, false, Interval, p.rule.Name, p.rule.Id, p.Redis)
}
}
@@ -411,6 +424,7 @@ func (p *Processor) inhibitEvent(events []*models.AlertCurEvent, highSeverity in
for _, event := range events {
if p.inhibit && event.Severity > highSeverity {
logger.Debugf("rule_eval:%s event:%+v inhibit highSeverity:%d", p.Key(), event, highSeverity)
Record(p.ctx, nil, event, false, IsInhibit, p.rule.Name, p.rule.Id, p.Redis)
continue
}
p.fireEvent(event)
@@ -421,6 +435,7 @@ func (p *Processor) fireEvent(event *models.AlertCurEvent) {
// As p.rule maybe outdated, use rule from cache
cachedRule := p.rule
if cachedRule == nil {
Record(p.ctx, nil, event, false, RuleNotFound, p.rule.Name, p.rule.Id, p.Redis)
return
}
@@ -434,6 +449,7 @@ func (p *Processor) fireEvent(event *models.AlertCurEvent) {
logger.Debugf("rule_eval:%s event:%+v repeat is zero nothing to do", p.Key(), event)
// 说明不想重复通知那就直接返回了nothing to do
// do not need to send alert again
Record(p.ctx, nil, event, false, RepeatStep, p.rule.Name, p.rule.Id, p.Redis)
return
}
@@ -447,12 +463,15 @@ func (p *Processor) fireEvent(event *models.AlertCurEvent) {
// 有最大发送次数的限制,就要看已经发了几次了,是否达到了最大发送次数
if fired.NotifyCurNumber >= cachedRule.NotifyMaxNumber {
logger.Debugf("rule_eval:%s event:%+v reach max number", p.Key(), event)
Record(p.ctx, nil, event, false, NotifyNumber, p.rule.Name, p.rule.Id, p.Redis)
return
} else {
event.NotifyCurNumber = fired.NotifyCurNumber + 1
p.pushEventToQueue(event)
}
}
} else {
Record(p.ctx, nil, event, false, RepeatStep, p.rule.Name, p.rule.Id, p.Redis)
}
} else {
event.NotifyCurNumber = 1
@@ -472,6 +491,7 @@ func (p *Processor) pushEventToQueue(e *models.AlertCurEvent) {
if !queue.EventQueue.PushFront(e) {
logger.Warningf("event_push_queue: queue is full, event:%+v", e)
p.Stats.CounterRuleEvalErrorTotal.WithLabelValues(fmt.Sprintf("%v", p.DatasourceId()), "push_event_queue").Inc()
Record(p.ctx, nil, e, e.IsRecovered, FullQueue, p.rule.Name, p.rule.Id, p.Redis)
}
}

View File

@@ -16,6 +16,7 @@ import (
centerrt "github.com/ccfos/nightingale/v6/center/router"
"github.com/ccfos/nightingale/v6/center/sso"
"github.com/ccfos/nightingale/v6/conf"
"github.com/ccfos/nightingale/v6/cron"
"github.com/ccfos/nightingale/v6/dumper"
"github.com/ccfos/nightingale/v6/memsto"
"github.com/ccfos/nightingale/v6/models"
@@ -60,9 +61,20 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
if err != nil {
return nil, err
}
var redis storage.Redis
redis, err = storage.NewRedis(config.Redis)
if err != nil {
return nil, err
}
ctx := ctx.NewContext(context.Background(), db, true)
migrate.Migrate(db)
models.InitRoot(ctx)
err = cron.InitLimitAlertRecordCountCron(ctx, &redis)
if err != nil {
return nil, err
}
config.HTTP.JWTAuth.SigningKey = models.InitJWTSigningKey(ctx)
@@ -72,11 +84,6 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
}
integration.Init(ctx, config.Center.BuiltinIntegrationsDir)
var redis storage.Redis
redis, err = storage.NewRedis(config.Redis)
if err != nil {
return nil, err
}
metas := metas.New(redis)
idents := idents.New(ctx, redis)
@@ -100,7 +107,8 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
tdengineClients := tdengine.NewTdengineClient(ctx, config.Alert.Heartbeat)
externalProcessors := process.NewExternalProcessors()
alert.Start(config.Alert, config.Pushgw, syncStats, alertStats, externalProcessors, targetCache, busiGroupCache, alertMuteCache, alertRuleCache, notifyConfigCache, taskTplCache, dsCache, ctx, promClients, tdengineClients, userCache, userGroupCache)
alert.Start(config.Alert, config.Pushgw, syncStats, alertStats, externalProcessors, targetCache, busiGroupCache, alertMuteCache, alertRuleCache, notifyConfigCache, taskTplCache, dsCache, ctx, promClients, tdengineClients, userCache, userGroupCache, &redis)
process.InitAlertRecordCount(ctx, &redis)
writers := writer.NewWriters(config.Pushgw)

View File

@@ -324,6 +324,7 @@ func (rt *Router) Config(r *gin.Engine) {
pages.GET("/alert-rule/:arid/pure", rt.auth(), rt.user(), rt.perm("/alert-rules"), rt.alertRulePureGet)
pages.PUT("/busi-group/alert-rule/validate", rt.auth(), rt.user(), rt.perm("/alert-rules/put"), rt.alertRuleValidation)
pages.POST("/relabel-test", rt.auth(), rt.user(), rt.relabelTest)
pages.GET("/alert-record/list", rt.auth(), rt.user(), rt.perm("/alert-rules"), rt.getRuleRecords)
pages.GET("/busi-groups/recording-rules", rt.auth(), rt.user(), rt.perm("/recording-rules"), rt.recordingRuleGetsByGids)
pages.GET("/busi-group/:id/recording-rules", rt.auth(), rt.user(), rt.perm("/recording-rules"), rt.recordingRuleGets)
@@ -553,6 +554,7 @@ func (rt *Router) Config(r *gin.Engine) {
service.GET("/targets-of-alert-rule", rt.targetsOfAlertRule)
service.POST("/redis/lpush", rt.redisLPushAlertRecord)
}
}

View File

@@ -3,15 +3,17 @@ package router
import (
"encoding/json"
"fmt"
"github.com/ccfos/nightingale/v6/alert/process"
"net/http"
"strconv"
"strings"
"time"
"github.com/ccfos/nightingale/v6/models"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/pushgw/pconf"
"github.com/ccfos/nightingale/v6/pushgw/writer"
"github.com/ccfos/nightingale/v6/storage"
"github.com/gin-gonic/gin"
"github.com/prometheus/prometheus/prompb"
"github.com/toolkits/pkg/ginx"
@@ -499,3 +501,71 @@ func (rt *Router) relabelTest(c *gin.Context) {
ginx.NewRender(c).Data(tags, nil)
}
// redisLPushAlertRecord binds an alert-record payload from the request body
// and pushes it onto the corresponding Redis list via process.PushAlertRecord.
func (rt *Router) redisLPushAlertRecord(c *gin.Context) {
	var record process.AlertRecordRedis
	ginx.BindJSON(c, &record)

	err := process.PushAlertRecord(rt.Ctx, &rt.Redis, record.Key, record.Value)
	ginx.NewRender(c).Message(err)
}
// getRuleRecords lists alert-evaluation records for a rule within a time
// range, optionally filtered by a space-separated label query, paginated by
// limit/offset.
func (rt *Router) getRuleRecords(c *gin.Context) {
	stime, etime := getTimeRange(c)

	var (
		limit  = ginx.QueryInt(c, "limit", 20)
		offset = ginx.QueryInt(c, "offset", 0)
		ruleId = ginx.QueryInt64(c, "rule_id", 0)
		query  = ginx.QueryStr(c, "query", "")
	)

	records := getRuleRecordsFromRedis(rt.Ctx, &rt.Redis, stime, etime, ruleId, limit, offset, query)
	ginx.NewRender(c).Data(gin.H{
		"list": records,
		// NOTE(review): "total" is the size of the returned page, not the full
		// match count — confirm the frontend does not rely on it for pagination.
		"total": len(records),
	}, nil)
}
// getRuleRecordsFromRedis loads alert-evaluation records for ruleId between
// stime and etime (unix seconds). Records are stored in hourly Redis lists
// keyed by process.AlertRecordRedisKey formatted with (ruleId, day-of-month,
// hour), so the range is expanded to every hourly bucket it touches. Results
// are filtered by query (all whitespace-separated terms must appear in the
// record labels) and then paginated with limit/offset.
func getRuleRecordsFromRedis(ctx *ctx.Context, redis *storage.Redis, stime int64, etime, ruleId int64, limit, offset int, query string) []*process.AlertRecord {
	// Truncate (not Round) to the hour: rounding stime up would skip the
	// bucket holding records written earlier in the starting hour, and
	// rounding etime up would query a bucket past the requested range.
	s := time.Unix(stime, 0).Truncate(time.Hour)
	e := time.Unix(etime, 0).Truncate(time.Hour)

	keys := make([]string, 0, int(e.Sub(s)/time.Hour)+1)
	for t := s; !t.After(e); t = t.Add(time.Hour) {
		keys = append(keys, fmt.Sprintf(process.AlertRecordRedisKey, ruleId, t.Day(), t.Hour()))
	}

	alertRecords, err := storage.MRangeList[*process.AlertRecord](ctx.GetContext(), *redis, keys)
	if err != nil {
		// Best-effort: log and continue with whatever was retrieved (nil).
		fmt.Println(err)
	}

	alertRecords = filterAlertRecords(alertRecords, query)

	// Page the filtered result; an offset past the end yields nil.
	start := offset * limit
	if start > len(alertRecords) {
		return nil
	}
	end := start + limit
	if end > len(alertRecords) {
		end = len(alertRecords)
	}
	return alertRecords[start:end]
}
// filterAlertRecords keeps only records whose Labels contain every
// whitespace-separated term of query. An empty query matches everything and
// returns the input slice unchanged.
func filterAlertRecords(alertRecords []*process.AlertRecord, query string) []*process.AlertRecord {
	if query == "" {
		return alertRecords
	}

	terms := strings.Fields(query)

	var filtered []*process.AlertRecord
outer:
	for _, rec := range alertRecords {
		for _, term := range terms {
			if !strings.Contains(rec.Labels, term) {
				continue outer
			}
		}
		filtered = append(filtered, rec)
	}
	return filtered
}

View File

@@ -36,11 +36,6 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
if err != nil {
return nil, err
}
//check CenterApi is default value
if len(config.CenterApi.Addrs) < 1 {
return nil, errors.New("failed to init config: the CenterApi configuration is missing")
}
ctx := ctx.NewContext(context.Background(), nil, false, config.CenterApi)
var redis storage.Redis
redis, err = storage.NewRedis(config.Redis)
@@ -48,6 +43,12 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
return nil, err
}
//check CenterApi is default value
if len(config.CenterApi.Addrs) < 1 {
return nil, errors.New("failed to init config: the CenterApi configuration is missing")
}
ctx := ctx.NewContext(context.Background(), nil, false, config.CenterApi)
syncStats := memsto.NewSyncStats()
targetCache := memsto.NewTargetCache(ctx, syncStats, redis)
@@ -75,7 +76,7 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
externalProcessors := process.NewExternalProcessors()
alert.Start(config.Alert, config.Pushgw, syncStats, alertStats, externalProcessors, targetCache, busiGroupCache, alertMuteCache,
alertRuleCache, notifyConfigCache, taskTplsCache, dsCache, ctx, promClients, tdengineClients, userCache, userGroupCache)
alertRuleCache, notifyConfigCache, taskTplsCache, dsCache, ctx, promClients, tdengineClients, userCache, userGroupCache, &redis)
alertrtRouter := alertrt.New(config.HTTP, config.Alert, alertMuteCache, targetCache, busiGroupCache, alertStats, ctx, externalProcessors)

View File

@@ -0,0 +1,29 @@
package cron
import (
"github.com/ccfos/nightingale/v6/alert/process"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/storage"
"github.com/robfig/cron/v3"
)
const (
	// limitAlertRecordCountCron is the schedule on which Redis alert-record
	// lists are trimmed.
	limitAlertRecordCountCron = "@every 1h"
)

// InitLimitAlertRecordCountCron starts a background cron job that
// periodically calls process.LimitAlertRecordCount to cap the number of
// alert records kept in Redis.
func InitLimitAlertRecordCountCron(ctx *ctx.Context, redis *storage.Redis) error {
	scheduler := cron.New()
	if _, err := scheduler.AddFunc(limitAlertRecordCountCron, func() {
		process.LimitAlertRecordCount(ctx, redis)
	}); err != nil {
		return err
	}
	scheduler.Start()
	return nil
}

View File

@@ -4,7 +4,6 @@ import (
"context"
"github.com/ccfos/nightingale/v6/conf"
"gorm.io/gorm"
)

View File

@@ -32,8 +32,6 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
return nil, err
}
ctx := ctx.NewContext(context.Background(), nil, false, config.CenterApi)
var redis storage.Redis
if config.Redis.Address != "" {
redis, err = storage.NewRedis(config.Redis)
@@ -41,6 +39,9 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
return nil, err
}
}
ctx := ctx.NewContext(context.Background(), nil, false, config.CenterApi)
idents := idents.New(ctx, redis)
metas := metas.New(redis)

View File

@@ -2,12 +2,15 @@ package storage
import (
"context"
"encoding/json"
"errors"
"fmt"
"os"
"strings"
"time"
"github.com/ccfos/nightingale/v6/pkg/tlsx"
"github.com/redis/go-redis/v9"
"github.com/toolkits/pkg/logger"
)
@@ -135,3 +138,109 @@ func MSet(ctx context.Context, r Redis, m map[string]interface{}) error {
_, err := pipe.Exec(ctx)
return err
}
// LPush prepends values to the list stored at key.
func LPush(ctx context.Context, r Redis, key string, values ...interface{}) error {
	// Spread the variadic slice explicitly; passing `values` as a single
	// argument relied on go-redis flattening a lone slice argument instead of
	// pushing each value as its own element.
	return r.LPush(ctx, key, values...).Err()
}
// LTrim trims the list stored at key so that only the elements in the
// inclusive range [start, stop] remain.
func LTrim(ctx context.Context, r Redis, key string, start, stop int64) error {
	cmd := r.LTrim(ctx, key, start, stop)
	return cmd.Err()
}
// Expire sets a time-to-live on key.
func Expire(ctx context.Context, r Redis, key string, expiration time.Duration) error {
	cmd := r.Expire(ctx, key, expiration)
	return cmd.Err()
}
// MRangeList fetches the full contents (LRANGE 0 -1) of every key in keys in
// a single pipeline round-trip and JSON-unmarshals each element into a T.
// Keys whose command errored and elements that fail to unmarshal are skipped.
func MRangeList[T any](ctx context.Context, r Redis, keys []string) ([]T, error) {
	pipe := r.Pipeline()
	for _, key := range keys {
		pipe.LRange(ctx, key, 0, -1)
	}

	cmds, err := pipe.Exec(ctx)
	if err != nil {
		return nil, err
	}

	var out []T
	for _, cmd := range cmds {
		if cmd.Err() != nil {
			continue
		}
		for _, raw := range cmd.(*redis.StringSliceCmd).Val() {
			var item T
			if jsonErr := json.Unmarshal([]byte(raw), &item); jsonErr != nil {
				continue
			}
			out = append(out, item)
		}
	}
	return out, nil
}
// MLLen returns the length of each list in keys using one pipeline
// round-trip. Missing keys map to 0; keys whose command errored are logged
// and omitted from the result.
func MLLen(ctx context.Context, r Redis, keys []string) (map[string]int64, error) {
	pipe := r.Pipeline()
	for _, key := range keys {
		pipe.LLen(ctx, key)
	}

	cmds, err := pipe.Exec(ctx)
	if err != nil {
		return nil, err
	}

	lengths := make(map[string]int64, len(keys))
	for i, key := range keys {
		switch cmdErr := cmds[i].Err(); {
		case errors.Is(cmdErr, redis.Nil):
			lengths[key] = 0
		case cmdErr != nil:
			logger.Errorf("failed to get length of key: %s, err: %s", key, cmdErr)
		default:
			lengths[key] = cmds[i].(*redis.IntCmd).Val()
		}
	}
	return lengths, nil
}
// MTTL returns the remaining time-to-live of each key in keys using one
// pipeline round-trip. Missing keys are skipped; keys whose command errored
// are logged and omitted from the result.
func MTTL(ctx context.Context, r Redis, keys []string) (map[string]time.Duration, error) {
	pipe := r.Pipeline()
	for _, key := range keys {
		pipe.TTL(ctx, key)
	}

	cmds, err := pipe.Exec(ctx)
	if err != nil {
		return nil, err
	}

	ttls := make(map[string]time.Duration, len(keys))
	for i, key := range keys {
		switch cmdErr := cmds[i].Err(); {
		case errors.Is(cmdErr, redis.Nil):
			// Key does not exist; leave it out of the map.
		case cmdErr != nil:
			logger.Errorf("failed to get ttl of key: %s, err: %s", key, cmdErr)
		default:
			ttls[key] = cmds[i].(*redis.DurationCmd).Val()
		}
	}
	return ttls, nil
}
// MDel deletes every key in keys using one pipeline round-trip and returns
// the first error reported by the pipeline execution.
func MDel(ctx context.Context, r Redis, keys ...string) error {
	pipe := r.Pipeline()
	for i := range keys {
		pipe.Del(ctx, keys[i])
	}
	if _, err := pipe.Exec(ctx); err != nil {
		return err
	}
	return nil
}