mirror of
https://github.com/ccfos/nightingale.git
synced 2026-03-03 06:29:16 +00:00
Compare commits
11 Commits
webhook-ba
...
eval-check
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ab0756d98a | ||
|
|
c6ab3ad2b3 | ||
|
|
d050cf72e9 | ||
|
|
084cc1893e | ||
|
|
cd01123b59 | ||
|
|
23ce84d41c | ||
|
|
4764cc2419 | ||
|
|
da66401576 | ||
|
|
0024c9d99c | ||
|
|
96d3b48f10 | ||
|
|
6a0e7a810f |
@@ -32,6 +32,7 @@ type Alerting struct {
|
||||
Timeout int64
|
||||
TemplatesDir string
|
||||
NotifyConcurrency int
|
||||
WebhookBatchSend bool
|
||||
}
|
||||
|
||||
type CallPlugin struct {
|
||||
|
||||
@@ -41,14 +41,14 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ctx := ctx.NewContext(context.Background(), nil, false, config.CenterApi)
|
||||
|
||||
var redis storage.Redis
|
||||
redis, err = storage.NewRedis(config.Redis)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ctx := ctx.NewContext(context.Background(), nil, false, config.CenterApi)
|
||||
|
||||
syncStats := memsto.NewSyncStats()
|
||||
alertStats := astats.NewSyncStats()
|
||||
|
||||
@@ -68,7 +68,7 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
|
||||
|
||||
externalProcessors := process.NewExternalProcessors()
|
||||
|
||||
Start(config.Alert, config.Pushgw, syncStats, alertStats, externalProcessors, targetCache, busiGroupCache, alertMuteCache, alertRuleCache, notifyConfigCache, taskTplsCache, dsCache, ctx, promClients, tdengineClients, userCache, userGroupCache)
|
||||
Start(config.Alert, config.Pushgw, syncStats, alertStats, externalProcessors, targetCache, busiGroupCache, alertMuteCache, alertRuleCache, notifyConfigCache, taskTplsCache, dsCache, ctx, promClients, tdengineClients, userCache, userGroupCache, &redis)
|
||||
|
||||
r := httpx.GinEngine(config.Global.RunMode, config.HTTP)
|
||||
rt := router.New(config.HTTP, config.Alert, alertMuteCache, targetCache, busiGroupCache, alertStats, ctx, externalProcessors)
|
||||
@@ -90,7 +90,7 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
|
||||
|
||||
func Start(alertc aconf.Alert, pushgwc pconf.Pushgw, syncStats *memsto.Stats, alertStats *astats.Stats, externalProcessors *process.ExternalProcessorsType, targetCache *memsto.TargetCacheType, busiGroupCache *memsto.BusiGroupCacheType,
|
||||
alertMuteCache *memsto.AlertMuteCacheType, alertRuleCache *memsto.AlertRuleCacheType, notifyConfigCache *memsto.NotifyConfigCacheType, taskTplsCache *memsto.TaskTplCache, datasourceCache *memsto.DatasourceCacheType, ctx *ctx.Context,
|
||||
promClients *prom.PromClientMap, tdendgineClients *tdengine.TdengineClientMap, userCache *memsto.UserCacheType, userGroupCache *memsto.UserGroupCacheType) {
|
||||
promClients *prom.PromClientMap, tdendgineClients *tdengine.TdengineClientMap, userCache *memsto.UserCacheType, userGroupCache *memsto.UserGroupCacheType, redis *storage.Redis) {
|
||||
alertSubscribeCache := memsto.NewAlertSubscribeCache(ctx, syncStats)
|
||||
recordingRuleCache := memsto.NewRecordingRuleCache(ctx, syncStats)
|
||||
targetsOfAlertRulesCache := memsto.NewTargetOfAlertRuleCache(ctx, alertc.Heartbeat.EngineName, syncStats)
|
||||
@@ -103,7 +103,7 @@ func Start(alertc aconf.Alert, pushgwc pconf.Pushgw, syncStats *memsto.Stats, al
|
||||
record.NewScheduler(alertc, recordingRuleCache, promClients, writers, alertStats)
|
||||
|
||||
eval.NewScheduler(alertc, externalProcessors, alertRuleCache, targetCache, targetsOfAlertRulesCache,
|
||||
busiGroupCache, alertMuteCache, datasourceCache, promClients, tdendgineClients, naming, ctx, alertStats)
|
||||
busiGroupCache, alertMuteCache, datasourceCache, promClients, tdendgineClients, naming, ctx, alertStats, redis)
|
||||
|
||||
dp := dispatch.NewDispatch(alertRuleCache, userCache, userGroupCache, alertSubscribeCache, targetCache, notifyConfigCache, taskTplsCache, alertc.Alerting, ctx, alertStats)
|
||||
consumer := dispatch.NewConsumer(alertc.Alerting, ctx, dp, promClients)
|
||||
|
||||
@@ -264,8 +264,15 @@ func (e *Dispatch) Send(rule *models.AlertRule, event *models.AlertCurEvent, not
|
||||
// handle event callbacks
|
||||
e.SendCallbacks(rule, notifyTarget, event)
|
||||
|
||||
// handle ibex callbacks
|
||||
e.HandleIbex(rule, event)
|
||||
|
||||
// handle global webhooks
|
||||
sender.SendWebhooks(notifyTarget.ToWebhookList(), event, e.Astats)
|
||||
if e.alerting.WebhookBatchSend {
|
||||
sender.BatchSendWebhooks(notifyTarget.ToWebhookList(), event, e.Astats)
|
||||
} else {
|
||||
sender.SingleSendWebhooks(notifyTarget.ToWebhookList(), event, e.Astats)
|
||||
}
|
||||
|
||||
// handle plugin call
|
||||
go sender.MayPluginNotify(e.genNoticeBytes(event), e.notifyConfigCache.GetNotifyScript(), e.Astats)
|
||||
@@ -280,7 +287,7 @@ func (e *Dispatch) SendCallbacks(rule *models.AlertRule, notifyTarget *NotifyTar
|
||||
continue
|
||||
}
|
||||
|
||||
cbCtx := sender.BuildCallBackContext(e.ctx, urlStr, rule, []*models.AlertCurEvent{event}, uids, e.userCache, e.Astats)
|
||||
cbCtx := sender.BuildCallBackContext(e.ctx, urlStr, rule, []*models.AlertCurEvent{event}, uids, e.userCache, e.alerting.WebhookBatchSend, e.Astats)
|
||||
|
||||
if strings.HasPrefix(urlStr, "${ibex}") {
|
||||
e.CallBacks[models.IbexDomain].CallBack(cbCtx)
|
||||
@@ -318,6 +325,30 @@ func (e *Dispatch) SendCallbacks(rule *models.AlertRule, notifyTarget *NotifyTar
|
||||
}
|
||||
}
|
||||
|
||||
func (e *Dispatch) HandleIbex(rule *models.AlertRule, event *models.AlertCurEvent) {
|
||||
// 解析 RuleConfig 字段
|
||||
var ruleConfig struct {
|
||||
TaskTpls []*models.Tpl `json:"task_tpls"`
|
||||
}
|
||||
json.Unmarshal([]byte(rule.RuleConfig), &ruleConfig)
|
||||
|
||||
for _, t := range ruleConfig.TaskTpls {
|
||||
if t.TplId == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
if len(t.Host) == 0 {
|
||||
sender.CallIbex(e.ctx, t.TplId, event.TargetIdent,
|
||||
e.taskTplsCache, e.targetCache, e.userCache, event)
|
||||
continue
|
||||
}
|
||||
for _, host := range t.Host {
|
||||
sender.CallIbex(e.ctx, t.TplId, host,
|
||||
e.taskTplsCache, e.targetCache, e.userCache, event)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type Notice struct {
|
||||
Event *models.AlertCurEvent `json:"event"`
|
||||
Tpls map[string]string `json:"tpls"`
|
||||
|
||||
@@ -79,6 +79,22 @@ func (s *NotifyTarget) ToCallbackList() []string {
|
||||
func (s *NotifyTarget) ToWebhookList() []*models.Webhook {
|
||||
webhooks := make([]*models.Webhook, 0, len(s.webhooks))
|
||||
for _, wh := range s.webhooks {
|
||||
if wh.Batch == 0 {
|
||||
wh.Batch = 1000
|
||||
}
|
||||
|
||||
if wh.Timeout == 0 {
|
||||
wh.Timeout = 10
|
||||
}
|
||||
|
||||
if wh.RetryCount == 0 {
|
||||
wh.RetryCount = 10
|
||||
}
|
||||
|
||||
if wh.RetryInterval == 0 {
|
||||
wh.RetryInterval = 10
|
||||
}
|
||||
|
||||
webhooks = append(webhooks, wh)
|
||||
}
|
||||
return webhooks
|
||||
|
||||
@@ -13,6 +13,7 @@ import (
|
||||
"github.com/ccfos/nightingale/v6/memsto"
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
"github.com/ccfos/nightingale/v6/prom"
|
||||
"github.com/ccfos/nightingale/v6/storage"
|
||||
"github.com/ccfos/nightingale/v6/tdengine"
|
||||
|
||||
"github.com/toolkits/pkg/logger"
|
||||
@@ -40,12 +41,14 @@ type Scheduler struct {
|
||||
|
||||
ctx *ctx.Context
|
||||
stats *astats.Stats
|
||||
|
||||
redis *storage.Redis
|
||||
}
|
||||
|
||||
func NewScheduler(aconf aconf.Alert, externalProcessors *process.ExternalProcessorsType, arc *memsto.AlertRuleCacheType,
|
||||
targetCache *memsto.TargetCacheType, toarc *memsto.TargetsOfAlertRuleCacheType,
|
||||
busiGroupCache *memsto.BusiGroupCacheType, alertMuteCache *memsto.AlertMuteCacheType, datasourceCache *memsto.DatasourceCacheType,
|
||||
promClients *prom.PromClientMap, tdengineClients *tdengine.TdengineClientMap, naming *naming.Naming, ctx *ctx.Context, stats *astats.Stats) *Scheduler {
|
||||
promClients *prom.PromClientMap, tdengineClients *tdengine.TdengineClientMap, naming *naming.Naming, ctx *ctx.Context, stats *astats.Stats, redis *storage.Redis) *Scheduler {
|
||||
scheduler := &Scheduler{
|
||||
aconf: aconf,
|
||||
alertRules: make(map[string]*AlertRuleWorker),
|
||||
@@ -65,6 +68,8 @@ func NewScheduler(aconf aconf.Alert, externalProcessors *process.ExternalProcess
|
||||
|
||||
ctx: ctx,
|
||||
stats: stats,
|
||||
|
||||
redis: redis,
|
||||
}
|
||||
|
||||
go scheduler.LoopSyncRules(context.Background())
|
||||
@@ -117,7 +122,7 @@ func (s *Scheduler) syncAlertRules() {
|
||||
logger.Debugf("datasource %d status is %s", dsId, ds.Status)
|
||||
continue
|
||||
}
|
||||
processor := process.NewProcessor(s.aconf.Heartbeat.EngineName, rule, dsId, s.alertRuleCache, s.targetCache, s.targetsOfAlertRuleCache, s.busiGroupCache, s.alertMuteCache, s.datasourceCache, s.ctx, s.stats)
|
||||
processor := process.NewProcessor(s.aconf.Heartbeat.EngineName, rule, dsId, s.alertRuleCache, s.targetCache, s.targetsOfAlertRuleCache, s.busiGroupCache, s.alertMuteCache, s.datasourceCache, s.ctx, s.stats, s.redis)
|
||||
|
||||
alertRule := NewAlertRuleWorker(rule, dsId, processor, s.promClients, s.tdengineClients, s.ctx)
|
||||
alertRuleWorkers[alertRule.Hash()] = alertRule
|
||||
@@ -127,7 +132,7 @@ func (s *Scheduler) syncAlertRules() {
|
||||
if !naming.DatasourceHashRing.IsHit(s.aconf.Heartbeat.EngineName, strconv.FormatInt(rule.Id, 10), s.aconf.Heartbeat.Endpoint) {
|
||||
continue
|
||||
}
|
||||
processor := process.NewProcessor(s.aconf.Heartbeat.EngineName, rule, 0, s.alertRuleCache, s.targetCache, s.targetsOfAlertRuleCache, s.busiGroupCache, s.alertMuteCache, s.datasourceCache, s.ctx, s.stats)
|
||||
processor := process.NewProcessor(s.aconf.Heartbeat.EngineName, rule, 0, s.alertRuleCache, s.targetCache, s.targetsOfAlertRuleCache, s.busiGroupCache, s.alertMuteCache, s.datasourceCache, s.ctx, s.stats, s.redis)
|
||||
alertRule := NewAlertRuleWorker(rule, 0, processor, s.promClients, s.tdengineClients, s.ctx)
|
||||
alertRuleWorkers[alertRule.Hash()] = alertRule
|
||||
} else {
|
||||
@@ -144,7 +149,7 @@ func (s *Scheduler) syncAlertRules() {
|
||||
logger.Debugf("datasource %d status is %s", dsId, ds.Status)
|
||||
continue
|
||||
}
|
||||
processor := process.NewProcessor(s.aconf.Heartbeat.EngineName, rule, dsId, s.alertRuleCache, s.targetCache, s.targetsOfAlertRuleCache, s.busiGroupCache, s.alertMuteCache, s.datasourceCache, s.ctx, s.stats)
|
||||
processor := process.NewProcessor(s.aconf.Heartbeat.EngineName, rule, dsId, s.alertRuleCache, s.targetCache, s.targetsOfAlertRuleCache, s.busiGroupCache, s.alertMuteCache, s.datasourceCache, s.ctx, s.stats, s.redis)
|
||||
externalRuleWorkers[processor.Key()] = processor
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,7 +18,6 @@ import (
|
||||
promsdk "github.com/ccfos/nightingale/v6/pkg/prom"
|
||||
"github.com/ccfos/nightingale/v6/prom"
|
||||
"github.com/ccfos/nightingale/v6/tdengine"
|
||||
|
||||
"github.com/toolkits/pkg/logger"
|
||||
"github.com/toolkits/pkg/str"
|
||||
)
|
||||
@@ -105,6 +104,11 @@ func (arw *AlertRuleWorker) Eval() {
|
||||
// logger.Errorf("rule_eval:%s rule not found", arw.Key())
|
||||
return
|
||||
}
|
||||
|
||||
if arw.processor == nil {
|
||||
logger.Warningf("rule_eval:%s processor is nil", arw.Key())
|
||||
return
|
||||
}
|
||||
arw.processor.Stats.CounterRuleEval.WithLabelValues().Inc()
|
||||
|
||||
typ := cachedRule.GetRuleType()
|
||||
@@ -120,12 +124,12 @@ func (arw *AlertRuleWorker) Eval() {
|
||||
case models.LOKI:
|
||||
anomalyPoints = arw.GetPromAnomalyPoint(cachedRule.RuleConfig)
|
||||
default:
|
||||
process.Record(arw.ctx, nil, nil, false, process.InvalidType, cachedRule.Name, cachedRule.Id, arw.processor.Redis)
|
||||
return
|
||||
}
|
||||
|
||||
if arw.processor == nil {
|
||||
logger.Warningf("rule_eval:%s processor is nil", arw.Key())
|
||||
return
|
||||
if len(anomalyPoints) == 0 {
|
||||
process.Record(arw.ctx, nil, nil, false, process.NoAnomalyPoint, cachedRule.Name, cachedRule.Id, arw.processor.Redis)
|
||||
}
|
||||
|
||||
if arw.inhibit {
|
||||
@@ -146,6 +150,7 @@ func (arw *AlertRuleWorker) Eval() {
|
||||
models.AlertCurEventDelByHash(arw.ctx, hash)
|
||||
|
||||
pointsMap[tagHash] = point
|
||||
process.Record(arw.ctx, &p, nil, true, process.RecoverMerge, cachedRule.Name, cachedRule.Id, arw.processor.Redis)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -178,12 +183,14 @@ func (arw *AlertRuleWorker) GetPromAnomalyPoint(ruleConfig string) []common.Anom
|
||||
if err := json.Unmarshal([]byte(ruleConfig), &rule); err != nil {
|
||||
logger.Errorf("rule_eval:%s rule_config:%s, error:%v", arw.Key(), ruleConfig, err)
|
||||
arw.processor.Stats.CounterRuleEvalErrorTotal.WithLabelValues(fmt.Sprintf("%v", arw.processor.DatasourceId()), GET_RULE_CONFIG).Inc()
|
||||
process.Record(arw.ctx, nil, nil, false, process.InvalidRuleConfig, arw.rule.Name, arw.rule.Id, arw.processor.Redis)
|
||||
return lst
|
||||
}
|
||||
|
||||
if rule == nil {
|
||||
logger.Errorf("rule_eval:%s rule_config:%s, error:rule is nil", arw.Key(), ruleConfig)
|
||||
arw.processor.Stats.CounterRuleEvalErrorTotal.WithLabelValues(fmt.Sprintf("%v", arw.processor.DatasourceId()), GET_RULE_CONFIG).Inc()
|
||||
process.Record(arw.ctx, nil, nil, false, process.RuleNotFound, arw.rule.Name, arw.rule.Id, arw.processor.Redis)
|
||||
return lst
|
||||
}
|
||||
|
||||
@@ -197,12 +204,14 @@ func (arw *AlertRuleWorker) GetPromAnomalyPoint(ruleConfig string) []common.Anom
|
||||
if promql == "" {
|
||||
logger.Warningf("rule_eval:%s promql is blank", arw.Key())
|
||||
arw.processor.Stats.CounterRuleEvalErrorTotal.WithLabelValues(fmt.Sprintf("%v", arw.processor.DatasourceId()), CHECK_QUERY).Inc()
|
||||
process.Record(arw.ctx, nil, nil, false, process.InvalidQuery, arw.rule.Name, arw.rule.Id, arw.processor.Redis)
|
||||
continue
|
||||
}
|
||||
|
||||
if arw.promClients.IsNil(arw.datasourceId) {
|
||||
logger.Warningf("rule_eval:%s error reader client is nil", arw.Key())
|
||||
arw.processor.Stats.CounterRuleEvalErrorTotal.WithLabelValues(fmt.Sprintf("%v", arw.processor.DatasourceId()), GET_CLIENT).Inc()
|
||||
process.Record(arw.ctx, nil, nil, false, process.EmptyPromClient, arw.rule.Name, arw.rule.Id, arw.processor.Redis)
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -215,6 +224,7 @@ func (arw *AlertRuleWorker) GetPromAnomalyPoint(ruleConfig string) []common.Anom
|
||||
logger.Errorf("rule_eval:%s promql:%s, error:%v", arw.Key(), promql, err)
|
||||
arw.processor.Stats.CounterQueryDataErrorTotal.WithLabelValues(fmt.Sprintf("%d", arw.datasourceId)).Inc()
|
||||
arw.processor.Stats.CounterRuleEvalErrorTotal.WithLabelValues(fmt.Sprintf("%v", arw.processor.DatasourceId()), QUERY_DATA).Inc()
|
||||
process.Record(arw.ctx, nil, nil, false, process.QueryError(promql, err), arw.rule.Name, arw.rule.Id, arw.processor.Redis)
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -225,6 +235,9 @@ func (arw *AlertRuleWorker) GetPromAnomalyPoint(ruleConfig string) []common.Anom
|
||||
}
|
||||
|
||||
logger.Debugf("rule_eval:%s query:%+v, value:%v", arw.Key(), query, value)
|
||||
process.Record(arw.ctx, nil, nil, false,
|
||||
process.QueryRecord(fmt.Sprintf("rule_eval:%s query:%+v, value:%v", arw.Key(), query, value)),
|
||||
arw.rule.Name, arw.rule.Id, arw.processor.Redis)
|
||||
points := common.ConvertAnomalyPoints(value)
|
||||
for i := 0; i < len(points); i++ {
|
||||
points[i].Severity = query.Severity
|
||||
@@ -243,6 +256,7 @@ func (arw *AlertRuleWorker) GetTdengineAnomalyPoint(rule *models.AlertRule, dsId
|
||||
if ruleConfig == "" {
|
||||
logger.Warningf("rule_eval:%d promql is blank", rule.Id)
|
||||
arw.processor.Stats.CounterRuleEvalErrorTotal.WithLabelValues(fmt.Sprintf("%v", arw.processor.DatasourceId()), GET_RULE_CONFIG).Inc()
|
||||
process.Record(arw.ctx, nil, nil, false, process.InvalidRuleConfig, arw.rule.Name, arw.rule.Id, arw.processor.Redis)
|
||||
return points, recoverPoints
|
||||
}
|
||||
|
||||
@@ -252,6 +266,7 @@ func (arw *AlertRuleWorker) GetTdengineAnomalyPoint(rule *models.AlertRule, dsId
|
||||
logger.Warningf("rule_eval:%d promql parse error:%s", rule.Id, err.Error())
|
||||
arw.processor.Stats.CounterRuleEvalErrorTotal.WithLabelValues(fmt.Sprintf("%v", arw.processor.DatasourceId())).Inc()
|
||||
arw.processor.Stats.CounterRuleEvalErrorTotal.WithLabelValues(fmt.Sprintf("%v", arw.processor.DatasourceId()), GET_RULE_CONFIG).Inc()
|
||||
process.Record(arw.ctx, nil, nil, false, process.InvalidQuery, arw.rule.Name, arw.rule.Id, arw.processor.Redis)
|
||||
return points, recoverPoints
|
||||
}
|
||||
|
||||
@@ -267,6 +282,7 @@ func (arw *AlertRuleWorker) GetTdengineAnomalyPoint(rule *models.AlertRule, dsId
|
||||
logger.Warningf("rule_eval:%d tdengine client is nil", rule.Id)
|
||||
arw.processor.Stats.CounterQueryDataErrorTotal.WithLabelValues(fmt.Sprintf("%d", arw.datasourceId)).Inc()
|
||||
arw.processor.Stats.CounterRuleEvalErrorTotal.WithLabelValues(fmt.Sprintf("%v", arw.processor.DatasourceId()), GET_CLIENT).Inc()
|
||||
process.Record(arw.ctx, nil, nil, false, process.EmptyPromClient, arw.rule.Name, arw.rule.Id, arw.processor.Redis)
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -276,15 +292,19 @@ func (arw *AlertRuleWorker) GetTdengineAnomalyPoint(rule *models.AlertRule, dsId
|
||||
logger.Warningf("rule_eval rid:%d query data error: %v", rule.Id, err)
|
||||
arw.processor.Stats.CounterQueryDataErrorTotal.WithLabelValues(fmt.Sprintf("%d", arw.datasourceId)).Inc()
|
||||
arw.processor.Stats.CounterRuleEvalErrorTotal.WithLabelValues(fmt.Sprintf("%v", arw.processor.DatasourceId()), QUERY_DATA).Inc()
|
||||
process.Record(arw.ctx, nil, nil, false, process.QueryError(query, err), arw.rule.Name, arw.rule.Id, arw.processor.Redis)
|
||||
continue
|
||||
}
|
||||
|
||||
// 此条日志很重要,是告警判断的现场值
|
||||
logger.Debugf("rule_eval rid:%d req:%+v resp:%+v", rule.Id, query, series)
|
||||
MakeSeriesMap(series, seriesTagIndex, seriesStore)
|
||||
process.Record(arw.ctx, nil, nil, false,
|
||||
process.QueryRecord(fmt.Sprintf("rule_eval rid:%d req:%+v resp:%+v", rule.Id, query, series)),
|
||||
arw.rule.Name, arw.rule.Id, arw.processor.Redis)
|
||||
}
|
||||
|
||||
points, recoverPoints = GetAnomalyPoint(rule.Id, ruleQuery, seriesTagIndex, seriesStore)
|
||||
points, recoverPoints = GetAnomalyPoint(arw, rule.Id, ruleQuery, seriesTagIndex, seriesStore)
|
||||
}
|
||||
|
||||
return points, recoverPoints
|
||||
@@ -298,12 +318,14 @@ func (arw *AlertRuleWorker) GetHostAnomalyPoint(ruleConfig string) []common.Anom
|
||||
if err := json.Unmarshal([]byte(ruleConfig), &rule); err != nil {
|
||||
logger.Errorf("rule_eval:%s rule_config:%s, error:%v", arw.Key(), ruleConfig, err)
|
||||
arw.processor.Stats.CounterRuleEvalErrorTotal.WithLabelValues(fmt.Sprintf("%v", arw.processor.DatasourceId()), GET_RULE_CONFIG).Inc()
|
||||
process.Record(arw.ctx, nil, nil, false, process.InvalidRuleConfig, arw.rule.Name, arw.rule.Id, arw.processor.Redis)
|
||||
return lst
|
||||
}
|
||||
|
||||
if rule == nil {
|
||||
logger.Errorf("rule_eval:%s rule_config:%s, error:rule is nil", arw.Key(), ruleConfig)
|
||||
arw.processor.Stats.CounterRuleEvalErrorTotal.WithLabelValues(fmt.Sprintf("%v", arw.processor.DatasourceId()), GET_RULE_CONFIG).Inc()
|
||||
process.Record(arw.ctx, nil, nil, false, process.RuleNotFound, arw.rule.Name, arw.rule.Id, arw.processor.Redis)
|
||||
return lst
|
||||
}
|
||||
|
||||
@@ -338,6 +360,7 @@ func (arw *AlertRuleWorker) GetHostAnomalyPoint(ruleConfig string) []common.Anom
|
||||
idents = append(idents, engineIdents...)
|
||||
|
||||
if len(idents) == 0 {
|
||||
process.Record(arw.ctx, nil, nil, false, process.TargetNotFound, arw.rule.Name, arw.rule.Id, arw.processor.Redis)
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -370,6 +393,7 @@ func (arw *AlertRuleWorker) GetHostAnomalyPoint(ruleConfig string) []common.Anom
|
||||
if !exists {
|
||||
logger.Warningf("rule_eval:%s targets not found", arw.Key())
|
||||
arw.processor.Stats.CounterRuleEvalErrorTotal.WithLabelValues(fmt.Sprintf("%v", arw.processor.DatasourceId()), QUERY_DATA).Inc()
|
||||
process.Record(arw.ctx, nil, nil, false, process.TargetNotFound, arw.rule.Name, arw.rule.Id, arw.processor.Redis)
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -424,6 +448,7 @@ func (arw *AlertRuleWorker) GetHostAnomalyPoint(ruleConfig string) []common.Anom
|
||||
if !exists {
|
||||
logger.Warningf("rule_eval:%s targets not found", arw.Key())
|
||||
arw.processor.Stats.CounterRuleEvalErrorTotal.WithLabelValues(fmt.Sprintf("%v", arw.processor.DatasourceId()), QUERY_DATA).Inc()
|
||||
process.Record(arw.ctx, nil, nil, false, process.TargetNotFound, arw.rule.Name, arw.rule.Id, arw.processor.Redis)
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -444,7 +469,7 @@ func (arw *AlertRuleWorker) GetHostAnomalyPoint(ruleConfig string) []common.Anom
|
||||
return lst
|
||||
}
|
||||
|
||||
func GetAnomalyPoint(ruleId int64, ruleQuery models.RuleQuery, seriesTagIndex map[uint64][]uint64, seriesStore map[uint64]models.DataResp) ([]common.AnomalyPoint, []common.AnomalyPoint) {
|
||||
func GetAnomalyPoint(arw *AlertRuleWorker, ruleId int64, ruleQuery models.RuleQuery, seriesTagIndex map[uint64][]uint64, seriesStore map[uint64]models.DataResp) ([]common.AnomalyPoint, []common.AnomalyPoint) {
|
||||
points := []common.AnomalyPoint{}
|
||||
recoverPoints := []common.AnomalyPoint{}
|
||||
|
||||
@@ -484,6 +509,9 @@ func GetAnomalyPoint(ruleId int64, ruleQuery models.RuleQuery, seriesTagIndex ma
|
||||
isTriggered := parser.Calc(trigger.Exp, m)
|
||||
// 此条日志很重要,是告警判断的现场值
|
||||
logger.Infof("rule_eval rid:%d trigger:%+v exp:%s res:%v m:%v", ruleId, trigger, trigger.Exp, isTriggered, m)
|
||||
process.Record(arw.ctx, nil, nil, false,
|
||||
process.QueryRecord(fmt.Sprintf("rule_eval rid:%d trigger:%+v exp:%s res:%v m:%v", ruleId, trigger, trigger.Exp, isTriggered, m)),
|
||||
arw.rule.Name, arw.rule.Id, arw.processor.Redis)
|
||||
|
||||
var values string
|
||||
for k, v := range m {
|
||||
|
||||
266
alert/process/alert_record.go
Normal file
266
alert/process/alert_record.go
Normal file
@@ -0,0 +1,266 @@
|
||||
package process
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/alert/common"
|
||||
"github.com/ccfos/nightingale/v6/models"
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
"github.com/ccfos/nightingale/v6/pkg/poster"
|
||||
"github.com/ccfos/nightingale/v6/storage"
|
||||
|
||||
"github.com/toolkits/pkg/logger"
|
||||
)
|
||||
|
||||
var AlertRecordRedisKey = "alert::%d::%d::%d"
|
||||
var AlertRecordMaxCount int64 = 100000
|
||||
var defaultMaxLengthForList int64 = 1000
|
||||
var defaultTimeDurationForList = 24 * time.Hour
|
||||
|
||||
// AlertRecordCount 内存中维护 alert record 的 key 以及对应 list 的长度
|
||||
var AlertRecordCount sync.Map
|
||||
|
||||
// InitAlertRecordCount init alert record count map
|
||||
func InitAlertRecordCount(ctx *ctx.Context, redis *storage.Redis) error {
|
||||
alertRules, err := models.AlertRuleGetsAll(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// 启动时构造 24 小时内所有 alertRecord 的 key,并从 redis 中查询长度
|
||||
now := time.Now()
|
||||
keys := make([]string, 0)
|
||||
for i := 0; i < 24; i++ {
|
||||
for _, rule := range alertRules {
|
||||
key := fmt.Sprintf(AlertRecordRedisKey, rule.Id, now.Day(), now.Hour())
|
||||
keys = append(keys, key)
|
||||
}
|
||||
now = now.Add(-time.Hour)
|
||||
}
|
||||
|
||||
arc, err := storage.MLLen(ctx.GetContext(), *redis, keys)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
AlertRecordCount = *mapToSyncMap(arc)
|
||||
return nil
|
||||
}
|
||||
|
||||
func mapToSyncMap(m map[string]int64) *sync.Map {
|
||||
var sm sync.Map
|
||||
for k, v := range m {
|
||||
sm.Store(k, v)
|
||||
}
|
||||
return &sm
|
||||
}
|
||||
|
||||
type NoEventReason string
|
||||
|
||||
const (
|
||||
NoAnomalyPoint NoEventReason = "no anomaly point for this alert"
|
||||
InvalidType NoEventReason = "rule type not support"
|
||||
InvalidRuleConfig NoEventReason = "rule config invalid"
|
||||
InvalidQuery NoEventReason = "invalid promql query"
|
||||
EmptyPromClient NoEventReason = "empty prom client"
|
||||
QueryErr NoEventReason = "query error, query: %+v, err: %s"
|
||||
TargetNotFound NoEventReason = "targets not found"
|
||||
EventRecovered NoEventReason = "event recovered"
|
||||
RecoverMerge NoEventReason = "recover event merged"
|
||||
RecoverDuration NoEventReason = "within recover duration"
|
||||
RuleNotFound NoEventReason = "rule not found"
|
||||
IsInhibit NoEventReason = "alert is inhibited by high priority"
|
||||
Muted NoEventReason = "alert is muted, detail: %s"
|
||||
MutedByHook NoEventReason = "alert is muted by hook"
|
||||
FullQueue NoEventReason = "alert queue is full"
|
||||
Interval NoEventReason = "fail to reach alert interval"
|
||||
RepeatStep NoEventReason = "fail to reach repeat step"
|
||||
NotifyNumber NoEventReason = "reach max notify number"
|
||||
ValueRecord NoEventReason = "this is a debug record instead of no even reason, "
|
||||
)
|
||||
|
||||
type AlertRecord struct {
|
||||
AlertName string `json:"alert_name"`
|
||||
CreateAt int64 `json:"create_at"`
|
||||
ReasonForNoEvent NoEventReason `json:"reason_for_no_event"`
|
||||
IsRecovery bool `json:"is_recovery"`
|
||||
Labels string `json:"labels"`
|
||||
Query string `json:"query"`
|
||||
Values string `json:"values"`
|
||||
}
|
||||
|
||||
func (ar *AlertRecord) MarshalBinary() (data []byte, err error) {
|
||||
return json.Marshal(ar)
|
||||
}
|
||||
|
||||
type AlertRecordRedis struct {
|
||||
Key string `json:"key"`
|
||||
Value *AlertRecord `json:"value"`
|
||||
ExpireDuration time.Duration `json:"expire_duration"`
|
||||
MaxLength int64 `json:"max_length"`
|
||||
}
|
||||
|
||||
func Record(ctx *ctx.Context, point *common.AnomalyPoint, event *models.AlertCurEvent, isRecovery bool, reason NoEventReason, ruleName string, ruleID int64, redis *storage.Redis) {
|
||||
// 开始调度后,告警检测有 4 种情况
|
||||
// 1. 没有生成 point
|
||||
// 2. 生成了 point 但没有生成 event
|
||||
// 3. 生成了 event 但没有投递到队列
|
||||
// 4. 生成了 event 且投递到队列
|
||||
|
||||
var ar AlertRecord
|
||||
|
||||
ar.ReasonForNoEvent = reason
|
||||
ar.IsRecovery = isRecovery
|
||||
ar.AlertName = ruleName
|
||||
now := time.Now()
|
||||
key := fmt.Sprintf(AlertRecordRedisKey, ruleID, now.Day(), now.Hour())
|
||||
|
||||
if event != nil {
|
||||
// 生成了 event 但没有投递
|
||||
ar.CreateAt = event.TriggerTime
|
||||
ar.Labels = event.Tags
|
||||
ar.Query = event.PromQl
|
||||
ar.Values = event.TriggerValue
|
||||
|
||||
} else if point != nil {
|
||||
// 生成了 point 但没有生成 event
|
||||
ar.CreateAt = point.Timestamp
|
||||
ar.Labels = point.Labels.String()
|
||||
ar.Query = point.Query
|
||||
ar.Values = point.ReadableValue()
|
||||
return
|
||||
} else {
|
||||
// 没有生成 point
|
||||
}
|
||||
|
||||
if !ctx.IsCenter {
|
||||
err := poster.PostByUrls(ctx, "/v1/n9e/redis/lpush", AlertRecordRedis{
|
||||
Key: key,
|
||||
Value: &ar,
|
||||
ExpireDuration: defaultTimeDurationForList,
|
||||
MaxLength: defaultMaxLengthForList,
|
||||
})
|
||||
if err != nil {
|
||||
logger.Errorf("fail to forward alert record: %v, err:%s", ar, err.Error())
|
||||
}
|
||||
return
|
||||
}
|
||||
err := PushAlertRecord(ctx, redis, key, &ar)
|
||||
if err != nil {
|
||||
logger.Errorf("fail to push alert record: %v, err:%s", ar, err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
func PushAlertRecord(ctx *ctx.Context, redis *storage.Redis, key string, ar *AlertRecord) error {
|
||||
err := storage.LPush(ctx.GetContext(), *redis, key, ar)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var count int64
|
||||
val, ok := AlertRecordCount.Load(key)
|
||||
if !ok || val.(int64) == 0 {
|
||||
err := storage.Expire(ctx.GetContext(), *redis, key, defaultTimeDurationForList)
|
||||
if err != nil {
|
||||
logger.Errorf("fail to set expire time for alert record, key :%s, err:%s", key, err.Error())
|
||||
return err
|
||||
}
|
||||
AlertRecordCount.Store(key, int64(1))
|
||||
} else {
|
||||
count = val.(int64) + 1
|
||||
AlertRecordCount.Store(key, count)
|
||||
}
|
||||
|
||||
if count > defaultMaxLengthForList {
|
||||
err := storage.LTrim(ctx.GetContext(), *redis, key, 0, defaultMaxLengthForList-1)
|
||||
if err != nil {
|
||||
logger.Errorf("fail to trim alert record, key :%s, err:%s", key, err.Error())
|
||||
return err
|
||||
}
|
||||
AlertRecordCount.Store(key, defaultMaxLengthForList)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func AlertMutedReason(detail string) NoEventReason {
|
||||
return NoEventReason(fmt.Sprintf(string(Muted), detail))
|
||||
}
|
||||
|
||||
func QueryError(query interface{}, err error) NoEventReason {
|
||||
return NoEventReason(fmt.Sprintf(string(QueryErr), query, err.Error()))
|
||||
}
|
||||
|
||||
func QueryRecord(info string) NoEventReason {
|
||||
return NoEventReason(fmt.Sprintf(string(ValueRecord) + info))
|
||||
}
|
||||
|
||||
type keyAndLen struct {
|
||||
key string
|
||||
length int64
|
||||
}
|
||||
|
||||
// LimitAlertRecordCount drop keys when alert record's count exceed AlertRecordMaxCount
|
||||
func LimitAlertRecordCount(ctx *ctx.Context, redis *storage.Redis) {
|
||||
var keys []string
|
||||
// 查出内存中维护的所有 key 对应的过期时间以及 list 长度
|
||||
AlertRecordCount.Range(func(k, v interface{}) bool {
|
||||
keys = append(keys, k.(string))
|
||||
return true
|
||||
})
|
||||
|
||||
kal := make(map[int][]keyAndLen)
|
||||
var count int64
|
||||
kToLen, err := storage.MLLen(ctx.GetContext(), *redis, keys)
|
||||
if err != nil {
|
||||
logger.Errorf("fail to limit alert record's count, keys: %v, err:%s", keys, err.Error())
|
||||
return
|
||||
}
|
||||
kToTTL, err := storage.MTTL(ctx.GetContext(), *redis, keys)
|
||||
if err != nil {
|
||||
logger.Errorf("fail to limit alert record's count, keys: %v, err:%s", keys, err.Error())
|
||||
return
|
||||
}
|
||||
// 按照过期时间将 key 分组
|
||||
for k, v := range kToTTL {
|
||||
l := kToLen[k]
|
||||
if v < 0 {
|
||||
// 不存在/已经过期的 key 不再维护
|
||||
AlertRecordCount.Delete(k)
|
||||
}
|
||||
if l == 0 || v < 0 {
|
||||
continue
|
||||
}
|
||||
hour := int(v.Hours())
|
||||
if _, ok := kal[hour]; !ok {
|
||||
kal[hour] = make([]keyAndLen, 0)
|
||||
}
|
||||
count += l
|
||||
kal[hour] = append(kal[hour], keyAndLen{key: k, length: l})
|
||||
}
|
||||
|
||||
if count <= AlertRecordMaxCount {
|
||||
return
|
||||
}
|
||||
// 如果阈值超过上限,以小时为粒度依次淘汰 key
|
||||
keyDel := make([]string, 0)
|
||||
for i := 0; i < 24; i++ {
|
||||
if count <= AlertRecordMaxCount {
|
||||
break
|
||||
}
|
||||
if len(kal[i]) == 0 {
|
||||
continue
|
||||
}
|
||||
for j := 0; j < len(kal[i]); j++ {
|
||||
keyDel = append(keyDel, kal[i][j].key)
|
||||
count -= kal[i][j].length
|
||||
}
|
||||
}
|
||||
|
||||
err = storage.MDel(ctx.GetContext(), *redis, keyDel...)
|
||||
if err != nil {
|
||||
logger.Errorf("fail to limit alert record's count, keys: %v, err:%s", keyDel, err.Error())
|
||||
}
|
||||
for i := range keyDel {
|
||||
AlertRecordCount.Delete(keyDel[i])
|
||||
}
|
||||
}
|
||||
@@ -19,6 +19,7 @@ import (
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
"github.com/ccfos/nightingale/v6/pkg/tplx"
|
||||
"github.com/ccfos/nightingale/v6/pushgw/writer"
|
||||
"github.com/ccfos/nightingale/v6/storage"
|
||||
|
||||
"github.com/prometheus/prometheus/prompb"
|
||||
"github.com/toolkits/pkg/logger"
|
||||
@@ -77,6 +78,8 @@ type Processor struct {
|
||||
HandleFireEventHook HandleEventFunc
|
||||
HandleRecoverEventHook HandleEventFunc
|
||||
EventMuteHook EventMuteHookFunc
|
||||
|
||||
Redis *storage.Redis
|
||||
}
|
||||
|
||||
func (p *Processor) Key() string {
|
||||
@@ -99,7 +102,7 @@ func (p *Processor) Hash() string {
|
||||
func NewProcessor(engineName string, rule *models.AlertRule, datasourceId int64, alertRuleCache *memsto.AlertRuleCacheType,
|
||||
targetCache *memsto.TargetCacheType, targetsOfAlertRuleCache *memsto.TargetsOfAlertRuleCacheType,
|
||||
busiGroupCache *memsto.BusiGroupCacheType, alertMuteCache *memsto.AlertMuteCacheType, datasourceCache *memsto.DatasourceCacheType, ctx *ctx.Context,
|
||||
stats *astats.Stats) *Processor {
|
||||
stats *astats.Stats, redis *storage.Redis) *Processor {
|
||||
|
||||
p := &Processor{
|
||||
EngineName: engineName,
|
||||
@@ -119,6 +122,7 @@ func NewProcessor(engineName string, rule *models.AlertRule, datasourceId int64,
|
||||
HandleFireEventHook: func(event *models.AlertCurEvent) {},
|
||||
HandleRecoverEventHook: func(event *models.AlertCurEvent) {},
|
||||
EventMuteHook: func(event *models.AlertCurEvent) bool { return false },
|
||||
Redis: redis,
|
||||
}
|
||||
|
||||
p.mayHandleGroup()
|
||||
@@ -134,6 +138,7 @@ func (p *Processor) Handle(anomalyPoints []common.AnomalyPoint, from string, inh
|
||||
if cachedRule == nil {
|
||||
logger.Errorf("rule not found %+v", anomalyPoints)
|
||||
p.Stats.CounterRuleEvalErrorTotal.WithLabelValues(fmt.Sprintf("%v", p.DatasourceId()), "handle_event").Inc()
|
||||
Record(p.ctx, nil, nil, false, RuleNotFound, p.rule.Name, p.rule.Id, p.Redis)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -152,12 +157,14 @@ func (p *Processor) Handle(anomalyPoints []common.AnomalyPoint, from string, inh
|
||||
if isMuted {
|
||||
p.Stats.CounterMuteTotal.WithLabelValues(event.GroupName).Inc()
|
||||
logger.Debugf("rule_eval:%s event:%v is muted, detail:%s", p.Key(), event, detail)
|
||||
Record(p.ctx, nil, event, false, AlertMutedReason(detail), p.rule.Name, p.rule.Id, p.Redis)
|
||||
continue
|
||||
}
|
||||
|
||||
if p.EventMuteHook(event) {
|
||||
p.Stats.CounterMuteTotal.WithLabelValues(event.GroupName).Inc()
|
||||
logger.Debugf("rule_eval:%s event:%v is muted by hook", p.Key(), event)
|
||||
Record(p.ctx, nil, event, false, MutedByHook, p.rule.Name, p.rule.Id, p.Redis)
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -232,17 +239,17 @@ func Relabel(rule *models.AlertRule, event *models.AlertCurEvent) {
|
||||
return
|
||||
}
|
||||
|
||||
if len(rule.EventRelabelConfig) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// need to keep the original label
|
||||
event.OriginalTags = event.Tags
|
||||
event.OriginalTagsJSON = make([]string, len(event.TagsJSON))
|
||||
|
||||
labels := make([]prompb.Label, len(event.TagsJSON))
|
||||
for i, tag := range event.TagsJSON {
|
||||
label := strings.Split(tag, "=")
|
||||
if len(label) != 2 {
|
||||
logger.Errorf("event%+v relabel: the label length is not 2:%v", event, label)
|
||||
continue
|
||||
}
|
||||
label := strings.SplitN(tag, "=", 2)
|
||||
event.OriginalTagsJSON[i] = tag
|
||||
labels[i] = prompb.Label{Name: label[0], Value: label[1]}
|
||||
}
|
||||
@@ -277,6 +284,8 @@ func (p *Processor) HandleRecover(alertingKeys map[string]struct{}, now int64, i
|
||||
if _, has := alertingKeys[hash]; has {
|
||||
continue
|
||||
}
|
||||
event, _ := p.pendings.Get(hash)
|
||||
Record(p.ctx, nil, event, true, EventRecovered, p.rule.Name, p.rule.Id, p.Redis)
|
||||
p.pendings.Delete(hash)
|
||||
}
|
||||
|
||||
@@ -324,6 +333,7 @@ func (p *Processor) HandleRecoverEvent(hashArr []string, now int64, inhibit bool
|
||||
p.pendings.Delete(e.Hash)
|
||||
models.AlertCurEventDelByHash(p.ctx, e.Hash)
|
||||
eventMap[event.Tags] = *event
|
||||
Record(p.ctx, nil, &e, true, IsInhibit, p.rule.Name, p.rule.Id, p.Redis)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -345,6 +355,7 @@ func (p *Processor) RecoverSingle(hash string, now int64, value *string, values
|
||||
// 如果配置了留观时长,就不能立马恢复了
|
||||
if cachedRule.RecoverDuration > 0 && now-event.LastEvalTime < cachedRule.RecoverDuration {
|
||||
logger.Debugf("rule_eval:%s event:%v not recover", p.Key(), event)
|
||||
Record(p.ctx, nil, event, true, RecoverDuration, p.rule.Name, p.rule.Id, p.Redis)
|
||||
return
|
||||
}
|
||||
if value != nil {
|
||||
@@ -401,6 +412,8 @@ func (p *Processor) handleEvent(events []*models.AlertCurEvent) {
|
||||
severity = event.Severity
|
||||
}
|
||||
continue
|
||||
} else {
|
||||
Record(p.ctx, nil, event, false, Interval, p.rule.Name, p.rule.Id, p.Redis)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -411,6 +424,7 @@ func (p *Processor) inhibitEvent(events []*models.AlertCurEvent, highSeverity in
|
||||
for _, event := range events {
|
||||
if p.inhibit && event.Severity > highSeverity {
|
||||
logger.Debugf("rule_eval:%s event:%+v inhibit highSeverity:%d", p.Key(), event, highSeverity)
|
||||
Record(p.ctx, nil, event, false, IsInhibit, p.rule.Name, p.rule.Id, p.Redis)
|
||||
continue
|
||||
}
|
||||
p.fireEvent(event)
|
||||
@@ -421,6 +435,7 @@ func (p *Processor) fireEvent(event *models.AlertCurEvent) {
|
||||
// As p.rule maybe outdated, use rule from cache
|
||||
cachedRule := p.rule
|
||||
if cachedRule == nil {
|
||||
Record(p.ctx, nil, event, false, RuleNotFound, p.rule.Name, p.rule.Id, p.Redis)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -434,6 +449,7 @@ func (p *Processor) fireEvent(event *models.AlertCurEvent) {
|
||||
logger.Debugf("rule_eval:%s event:%+v repeat is zero nothing to do", p.Key(), event)
|
||||
// 说明不想重复通知,那就直接返回了,nothing to do
|
||||
// do not need to send alert again
|
||||
Record(p.ctx, nil, event, false, RepeatStep, p.rule.Name, p.rule.Id, p.Redis)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -447,12 +463,15 @@ func (p *Processor) fireEvent(event *models.AlertCurEvent) {
|
||||
// 有最大发送次数的限制,就要看已经发了几次了,是否达到了最大发送次数
|
||||
if fired.NotifyCurNumber >= cachedRule.NotifyMaxNumber {
|
||||
logger.Debugf("rule_eval:%s event:%+v reach max number", p.Key(), event)
|
||||
Record(p.ctx, nil, event, false, NotifyNumber, p.rule.Name, p.rule.Id, p.Redis)
|
||||
return
|
||||
} else {
|
||||
event.NotifyCurNumber = fired.NotifyCurNumber + 1
|
||||
p.pushEventToQueue(event)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
Record(p.ctx, nil, event, false, RepeatStep, p.rule.Name, p.rule.Id, p.Redis)
|
||||
}
|
||||
} else {
|
||||
event.NotifyCurNumber = 1
|
||||
@@ -472,6 +491,7 @@ func (p *Processor) pushEventToQueue(e *models.AlertCurEvent) {
|
||||
if !queue.EventQueue.PushFront(e) {
|
||||
logger.Warningf("event_push_queue: queue is full, event:%+v", e)
|
||||
p.Stats.CounterRuleEvalErrorTotal.WithLabelValues(fmt.Sprintf("%v", p.DatasourceId()), "push_event_queue").Inc()
|
||||
Record(p.ctx, nil, e, e.IsRecovered, FullQueue, p.rule.Name, p.rule.Id, p.Redis)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -29,13 +29,14 @@ type (
|
||||
Rule *models.AlertRule
|
||||
Events []*models.AlertCurEvent
|
||||
Stats *astats.Stats
|
||||
BatchSend bool
|
||||
}
|
||||
|
||||
DefaultCallBacker struct{}
|
||||
)
|
||||
|
||||
func BuildCallBackContext(ctx *ctx.Context, callBackURL string, rule *models.AlertRule, events []*models.AlertCurEvent,
|
||||
uids []int64, userCache *memsto.UserCacheType, stats *astats.Stats) CallBackContext {
|
||||
uids []int64, userCache *memsto.UserCacheType, batchSend bool, stats *astats.Stats) CallBackContext {
|
||||
users := userCache.GetByUserIds(uids)
|
||||
|
||||
newCallBackUrl, _ := events[0].ParseURL(callBackURL)
|
||||
@@ -45,6 +46,7 @@ func BuildCallBackContext(ctx *ctx.Context, callBackURL string, rule *models.Ale
|
||||
Rule: rule,
|
||||
Events: events,
|
||||
Users: users,
|
||||
BatchSend: batchSend,
|
||||
Stats: stats,
|
||||
}
|
||||
}
|
||||
@@ -112,6 +114,21 @@ func (c *DefaultCallBacker) CallBack(ctx CallBackContext) {
|
||||
|
||||
event := ctx.Events[0]
|
||||
|
||||
if ctx.BatchSend {
|
||||
webhookConf := &models.Webhook{
|
||||
Type: models.RuleCallback,
|
||||
Enable: true,
|
||||
Url: ctx.CallBackURL,
|
||||
Timeout: 5,
|
||||
RetryCount: 3,
|
||||
RetryInterval: 10,
|
||||
Batch: 1000,
|
||||
}
|
||||
|
||||
PushCallbackEvent(webhookConf, event, ctx.Stats)
|
||||
return
|
||||
}
|
||||
|
||||
ctx.Stats.AlertNotifyTotal.WithLabelValues("rule_callback").Inc()
|
||||
resp, code, err := poster.PostJSON(ctx.CallBackURL, 5*time.Second, event, 3)
|
||||
if err != nil {
|
||||
@@ -140,3 +157,27 @@ type TaskCreateReply struct {
|
||||
Err string `json:"err"`
|
||||
Dat int64 `json:"dat"` // task.id
|
||||
}
|
||||
|
||||
func PushCallbackEvent(webhook *models.Webhook, event *models.AlertCurEvent, stats *astats.Stats) {
|
||||
CallbackEventQueueLock.RLock()
|
||||
queue := CallbackEventQueue[webhook.Url]
|
||||
CallbackEventQueueLock.RUnlock()
|
||||
|
||||
if queue == nil {
|
||||
queue = &WebhookQueue{
|
||||
list: NewSafeListLimited(QueueMaxSize),
|
||||
closeCh: make(chan struct{}),
|
||||
}
|
||||
|
||||
CallbackEventQueueLock.Lock()
|
||||
CallbackEventQueue[webhook.Url] = queue
|
||||
CallbackEventQueueLock.Unlock()
|
||||
|
||||
StartConsumer(queue, webhook.Batch, webhook, stats)
|
||||
}
|
||||
|
||||
succ := queue.list.PushFront(event)
|
||||
if !succ {
|
||||
logger.Warningf("Write channel(%s) full, current channel size: %d event:%v", webhook.Url, queue.list.Len(), event)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -77,15 +77,20 @@ func (c *IbexCallBacker) handleIbex(ctx *ctx.Context, url string, event *models.
|
||||
return
|
||||
}
|
||||
|
||||
tpl := c.taskTplCache.Get(id)
|
||||
CallIbex(ctx, id, host, c.taskTplCache, c.targetCache, c.userCache, event)
|
||||
}
|
||||
|
||||
func CallIbex(ctx *ctx.Context, id int64, host string,
|
||||
taskTplCache *memsto.TaskTplCache, targetCache *memsto.TargetCacheType,
|
||||
userCache *memsto.UserCacheType, event *models.AlertCurEvent) {
|
||||
tpl := taskTplCache.Get(id)
|
||||
if tpl == nil {
|
||||
logger.Errorf("event_callback_ibex: no such tpl(%d)", id)
|
||||
return
|
||||
}
|
||||
|
||||
// check perm
|
||||
// tpl.GroupId - host - account 三元组校验权限
|
||||
can, err := canDoIbex(tpl.UpdateBy, tpl, host, c.targetCache, c.userCache)
|
||||
can, err := canDoIbex(tpl.UpdateBy, tpl, host, targetCache, userCache)
|
||||
if err != nil {
|
||||
logger.Errorf("event_callback_ibex: check perm fail: %v", err)
|
||||
return
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
"encoding/json"
|
||||
"io"
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/alert/astats"
|
||||
@@ -14,14 +15,19 @@ import (
|
||||
"github.com/toolkits/pkg/logger"
|
||||
)
|
||||
|
||||
func sendWebhook(webhook *models.Webhook, event *models.AlertCurEvent, stats *astats.Stats) bool {
|
||||
func sendWebhook(webhook *models.Webhook, event interface{}, stats *astats.Stats) bool {
|
||||
channel := "webhook"
|
||||
if webhook.Type == models.RuleCallback {
|
||||
channel = "callback"
|
||||
}
|
||||
|
||||
conf := webhook
|
||||
if conf.Url == "" || !conf.Enable {
|
||||
return false
|
||||
}
|
||||
bs, err := json.Marshal(event)
|
||||
if err != nil {
|
||||
logger.Errorf("alertingWebhook failed to marshal event:%+v err:%v", event, err)
|
||||
logger.Errorf("%s alertingWebhook failed to marshal event:%+v err:%v", channel, event, err)
|
||||
return false
|
||||
}
|
||||
|
||||
@@ -29,7 +35,7 @@ func sendWebhook(webhook *models.Webhook, event *models.AlertCurEvent, stats *as
|
||||
|
||||
req, err := http.NewRequest("POST", conf.Url, bf)
|
||||
if err != nil {
|
||||
logger.Warningf("alertingWebhook failed to new reques event:%+v err:%v", event, err)
|
||||
logger.Warningf("%s alertingWebhook failed to new reques event:%s err:%v", channel, string(bs), err)
|
||||
return true
|
||||
}
|
||||
|
||||
@@ -58,12 +64,12 @@ func sendWebhook(webhook *models.Webhook, event *models.AlertCurEvent, stats *as
|
||||
},
|
||||
}
|
||||
|
||||
stats.AlertNotifyTotal.WithLabelValues("webhook").Inc()
|
||||
stats.AlertNotifyTotal.WithLabelValues(channel).Inc()
|
||||
var resp *http.Response
|
||||
resp, err = client.Do(req)
|
||||
if err != nil {
|
||||
stats.AlertNotifyErrorTotal.WithLabelValues("webhook").Inc()
|
||||
logger.Errorf("event_webhook_fail, ruleId: [%d], eventId: [%d], event:%+v, url: [%s], error: [%s]", event.RuleId, event.Id, event, conf.Url, err)
|
||||
stats.AlertNotifyErrorTotal.WithLabelValues(channel).Inc()
|
||||
logger.Errorf("event_%s_fail, event:%s, url: [%s], error: [%s]", channel, string(bs), conf.Url, err)
|
||||
return true
|
||||
}
|
||||
|
||||
@@ -74,15 +80,15 @@ func sendWebhook(webhook *models.Webhook, event *models.AlertCurEvent, stats *as
|
||||
}
|
||||
|
||||
if resp.StatusCode == 429 {
|
||||
logger.Errorf("event_webhook_fail, url: %s, response code: %d, body: %s event:%+v", conf.Url, resp.StatusCode, string(body), event)
|
||||
logger.Errorf("event_%s_fail, url: %s, response code: %d, body: %s event:%s", channel, conf.Url, resp.StatusCode, string(body), string(bs))
|
||||
return true
|
||||
}
|
||||
|
||||
logger.Debugf("event_webhook_succ, url: %s, response code: %d, body: %s event:%+v", conf.Url, resp.StatusCode, string(body), event)
|
||||
logger.Debugf("event_%s_succ, url: %s, response code: %d, body: %s event:%s", channel, conf.Url, resp.StatusCode, string(body), string(bs))
|
||||
return false
|
||||
}
|
||||
|
||||
func SendWebhooks(webhooks []*models.Webhook, event *models.AlertCurEvent, stats *astats.Stats) {
|
||||
func SingleSendWebhooks(webhooks []*models.Webhook, event *models.AlertCurEvent, stats *astats.Stats) {
|
||||
for _, conf := range webhooks {
|
||||
retryCount := 0
|
||||
for retryCount < 3 {
|
||||
@@ -95,3 +101,73 @@ func SendWebhooks(webhooks []*models.Webhook, event *models.AlertCurEvent, stats
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func BatchSendWebhooks(webhooks []*models.Webhook, event *models.AlertCurEvent, stats *astats.Stats) {
|
||||
for _, conf := range webhooks {
|
||||
logger.Infof("push event:%+v to queue:%v", event, conf)
|
||||
PushEvent(conf, event, stats)
|
||||
}
|
||||
}
|
||||
|
||||
var EventQueue = make(map[string]*WebhookQueue)
|
||||
var CallbackEventQueue = make(map[string]*WebhookQueue)
|
||||
var CallbackEventQueueLock sync.RWMutex
|
||||
var EventQueueLock sync.RWMutex
|
||||
|
||||
const QueueMaxSize = 100000
|
||||
|
||||
type WebhookQueue struct {
|
||||
list *SafeListLimited
|
||||
closeCh chan struct{}
|
||||
}
|
||||
|
||||
func PushEvent(webhook *models.Webhook, event *models.AlertCurEvent, stats *astats.Stats) {
|
||||
EventQueueLock.RLock()
|
||||
queue := EventQueue[webhook.Url]
|
||||
EventQueueLock.RUnlock()
|
||||
|
||||
if queue == nil {
|
||||
queue = &WebhookQueue{
|
||||
list: NewSafeListLimited(QueueMaxSize),
|
||||
closeCh: make(chan struct{}),
|
||||
}
|
||||
|
||||
EventQueueLock.Lock()
|
||||
EventQueue[webhook.Url] = queue
|
||||
EventQueueLock.Unlock()
|
||||
|
||||
StartConsumer(queue, webhook.Batch, webhook, stats)
|
||||
}
|
||||
|
||||
succ := queue.list.PushFront(event)
|
||||
if !succ {
|
||||
stats.AlertNotifyErrorTotal.WithLabelValues("push_event_queue").Inc()
|
||||
logger.Warningf("Write channel(%s) full, current channel size: %d event:%v", webhook.Url, queue.list.Len(), event)
|
||||
}
|
||||
}
|
||||
|
||||
func StartConsumer(queue *WebhookQueue, popSize int, webhook *models.Webhook, stats *astats.Stats) {
|
||||
for {
|
||||
select {
|
||||
case <-queue.closeCh:
|
||||
logger.Infof("event queue:%v closed", queue)
|
||||
return
|
||||
default:
|
||||
events := queue.list.PopBack(popSize)
|
||||
if len(events) == 0 {
|
||||
time.Sleep(time.Millisecond * 400)
|
||||
continue
|
||||
}
|
||||
|
||||
retryCount := 0
|
||||
for retryCount < webhook.RetryCount {
|
||||
needRetry := sendWebhook(webhook, events, stats)
|
||||
if !needRetry {
|
||||
break
|
||||
}
|
||||
retryCount++
|
||||
time.Sleep(time.Second * time.Duration(webhook.RetryInterval) * time.Duration(retryCount))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
111
alert/sender/webhook_queue.go
Normal file
111
alert/sender/webhook_queue.go
Normal file
@@ -0,0 +1,111 @@
|
||||
package sender
|
||||
|
||||
import (
|
||||
"container/list"
|
||||
"sync"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/models"
|
||||
)
|
||||
|
||||
type SafeList struct {
|
||||
sync.RWMutex
|
||||
L *list.List
|
||||
}
|
||||
|
||||
func NewSafeList() *SafeList {
|
||||
return &SafeList{L: list.New()}
|
||||
}
|
||||
|
||||
func (sl *SafeList) PushFront(v interface{}) *list.Element {
|
||||
sl.Lock()
|
||||
e := sl.L.PushFront(v)
|
||||
sl.Unlock()
|
||||
return e
|
||||
}
|
||||
|
||||
func (sl *SafeList) PushFrontBatch(vs []interface{}) {
|
||||
sl.Lock()
|
||||
for _, item := range vs {
|
||||
sl.L.PushFront(item)
|
||||
}
|
||||
sl.Unlock()
|
||||
}
|
||||
|
||||
func (sl *SafeList) PopBack(max int) []*models.AlertCurEvent {
|
||||
sl.Lock()
|
||||
|
||||
count := sl.L.Len()
|
||||
if count == 0 {
|
||||
sl.Unlock()
|
||||
return []*models.AlertCurEvent{}
|
||||
}
|
||||
|
||||
if count > max {
|
||||
count = max
|
||||
}
|
||||
|
||||
items := make([]*models.AlertCurEvent, 0, count)
|
||||
for i := 0; i < count; i++ {
|
||||
item := sl.L.Remove(sl.L.Back())
|
||||
sample, ok := item.(*models.AlertCurEvent)
|
||||
if ok {
|
||||
items = append(items, sample)
|
||||
}
|
||||
}
|
||||
|
||||
sl.Unlock()
|
||||
return items
|
||||
}
|
||||
|
||||
func (sl *SafeList) RemoveAll() {
|
||||
sl.Lock()
|
||||
sl.L.Init()
|
||||
sl.Unlock()
|
||||
}
|
||||
|
||||
func (sl *SafeList) Len() int {
|
||||
sl.RLock()
|
||||
size := sl.L.Len()
|
||||
sl.RUnlock()
|
||||
return size
|
||||
}
|
||||
|
||||
// SafeList with Limited Size
|
||||
type SafeListLimited struct {
|
||||
maxSize int
|
||||
SL *SafeList
|
||||
}
|
||||
|
||||
func NewSafeListLimited(maxSize int) *SafeListLimited {
|
||||
return &SafeListLimited{SL: NewSafeList(), maxSize: maxSize}
|
||||
}
|
||||
|
||||
func (sll *SafeListLimited) PopBack(max int) []*models.AlertCurEvent {
|
||||
return sll.SL.PopBack(max)
|
||||
}
|
||||
|
||||
func (sll *SafeListLimited) PushFront(v interface{}) bool {
|
||||
if sll.SL.Len() >= sll.maxSize {
|
||||
return false
|
||||
}
|
||||
|
||||
sll.SL.PushFront(v)
|
||||
return true
|
||||
}
|
||||
|
||||
func (sll *SafeListLimited) PushFrontBatch(vs []interface{}) bool {
|
||||
if sll.SL.Len() >= sll.maxSize {
|
||||
return false
|
||||
}
|
||||
|
||||
sll.SL.PushFrontBatch(vs)
|
||||
return true
|
||||
}
|
||||
|
||||
func (sll *SafeListLimited) RemoveAll() {
|
||||
sll.SL.RemoveAll()
|
||||
}
|
||||
|
||||
func (sll *SafeListLimited) Len() int {
|
||||
return sll.SL.Len()
|
||||
}
|
||||
@@ -16,6 +16,7 @@ import (
|
||||
centerrt "github.com/ccfos/nightingale/v6/center/router"
|
||||
"github.com/ccfos/nightingale/v6/center/sso"
|
||||
"github.com/ccfos/nightingale/v6/conf"
|
||||
"github.com/ccfos/nightingale/v6/cron"
|
||||
"github.com/ccfos/nightingale/v6/dumper"
|
||||
"github.com/ccfos/nightingale/v6/memsto"
|
||||
"github.com/ccfos/nightingale/v6/models"
|
||||
@@ -60,9 +61,20 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var redis storage.Redis
|
||||
redis, err = storage.NewRedis(config.Redis)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ctx := ctx.NewContext(context.Background(), db, true)
|
||||
migrate.Migrate(db)
|
||||
models.InitRoot(ctx)
|
||||
err = cron.InitLimitAlertRecordCountCron(ctx, &redis)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
config.HTTP.JWTAuth.SigningKey = models.InitJWTSigningKey(ctx)
|
||||
|
||||
@@ -72,11 +84,6 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
|
||||
}
|
||||
|
||||
integration.Init(ctx, config.Center.BuiltinIntegrationsDir)
|
||||
var redis storage.Redis
|
||||
redis, err = storage.NewRedis(config.Redis)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
metas := metas.New(redis)
|
||||
idents := idents.New(ctx, redis)
|
||||
@@ -100,7 +107,8 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
|
||||
tdengineClients := tdengine.NewTdengineClient(ctx, config.Alert.Heartbeat)
|
||||
|
||||
externalProcessors := process.NewExternalProcessors()
|
||||
alert.Start(config.Alert, config.Pushgw, syncStats, alertStats, externalProcessors, targetCache, busiGroupCache, alertMuteCache, alertRuleCache, notifyConfigCache, taskTplCache, dsCache, ctx, promClients, tdengineClients, userCache, userGroupCache)
|
||||
alert.Start(config.Alert, config.Pushgw, syncStats, alertStats, externalProcessors, targetCache, busiGroupCache, alertMuteCache, alertRuleCache, notifyConfigCache, taskTplCache, dsCache, ctx, promClients, tdengineClients, userCache, userGroupCache, &redis)
|
||||
process.InitAlertRecordCount(ctx, &redis)
|
||||
|
||||
writers := writer.NewWriters(config.Pushgw)
|
||||
|
||||
|
||||
@@ -315,6 +315,8 @@ func (rt *Router) Config(r *gin.Engine) {
|
||||
pages.GET("/busi-group/:id/alert-rules", rt.auth(), rt.user(), rt.perm("/alert-rules"), rt.alertRuleGets)
|
||||
pages.POST("/busi-group/:id/alert-rules", rt.auth(), rt.user(), rt.perm("/alert-rules/add"), rt.bgrw(), rt.alertRuleAddByFE)
|
||||
pages.POST("/busi-group/:id/alert-rules/import", rt.auth(), rt.user(), rt.perm("/alert-rules/add"), rt.bgrw(), rt.alertRuleAddByImport)
|
||||
pages.POST("/busi-group/:id/alert-rules/import-prom-rule", rt.auth(),
|
||||
rt.user(), rt.perm("/alert-rules/add"), rt.bgrw(), rt.alertRuleAddByImportPromRule)
|
||||
pages.DELETE("/busi-group/:id/alert-rules", rt.auth(), rt.user(), rt.perm("/alert-rules/del"), rt.bgrw(), rt.alertRuleDel)
|
||||
pages.PUT("/busi-group/:id/alert-rules/fields", rt.auth(), rt.user(), rt.perm("/alert-rules/put"), rt.bgrw(), rt.alertRulePutFields)
|
||||
pages.PUT("/busi-group/:id/alert-rule/:arid", rt.auth(), rt.user(), rt.perm("/alert-rules/put"), rt.alertRulePutByFE)
|
||||
@@ -322,6 +324,7 @@ func (rt *Router) Config(r *gin.Engine) {
|
||||
pages.GET("/alert-rule/:arid/pure", rt.auth(), rt.user(), rt.perm("/alert-rules"), rt.alertRulePureGet)
|
||||
pages.PUT("/busi-group/alert-rule/validate", rt.auth(), rt.user(), rt.perm("/alert-rules/put"), rt.alertRuleValidation)
|
||||
pages.POST("/relabel-test", rt.auth(), rt.user(), rt.relabelTest)
|
||||
pages.GET("/alert-record/list", rt.auth(), rt.user(), rt.perm("/alert-rules"), rt.getRuleRecords)
|
||||
|
||||
pages.GET("/busi-groups/recording-rules", rt.auth(), rt.user(), rt.perm("/recording-rules"), rt.recordingRuleGetsByGids)
|
||||
pages.GET("/busi-group/:id/recording-rules", rt.auth(), rt.user(), rt.perm("/recording-rules"), rt.recordingRuleGets)
|
||||
@@ -551,6 +554,7 @@ func (rt *Router) Config(r *gin.Engine) {
|
||||
|
||||
service.GET("/targets-of-alert-rule", rt.targetsOfAlertRule)
|
||||
|
||||
service.POST("/redis/lpush", rt.redisLPushAlertRecord)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,16 +1,19 @@
|
||||
package router
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/ccfos/nightingale/v6/alert/process"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/models"
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
"github.com/ccfos/nightingale/v6/pushgw/pconf"
|
||||
"github.com/ccfos/nightingale/v6/pushgw/writer"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/storage"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/prometheus/prometheus/prompb"
|
||||
"github.com/toolkits/pkg/ginx"
|
||||
@@ -125,6 +128,25 @@ func (rt *Router) alertRuleAddByImport(c *gin.Context) {
|
||||
ginx.NewRender(c).Data(reterr, nil)
|
||||
}
|
||||
|
||||
func (rt *Router) alertRuleAddByImportPromRule(c *gin.Context) {
|
||||
username := c.MustGet("username").(string)
|
||||
|
||||
type PromRule struct {
|
||||
Groups []models.PromRuleGroup `yaml:"groups"`
|
||||
}
|
||||
var pr PromRule
|
||||
ginx.Dangerous(c.BindYAML(&pr))
|
||||
if len(pr.Groups) == 0 {
|
||||
ginx.Bomb(http.StatusBadRequest, "input yaml is empty")
|
||||
}
|
||||
|
||||
lst := models.DealPromGroup(pr.Groups)
|
||||
bgid := ginx.UrlParamInt64(c, "id")
|
||||
err := rt.alertRuleAdd(lst, username, bgid, c.GetHeader("X-Language"))
|
||||
|
||||
ginx.NewRender(c).Data(err, nil)
|
||||
}
|
||||
|
||||
func (rt *Router) alertRuleAddByService(c *gin.Context) {
|
||||
var lst []models.AlertRule
|
||||
ginx.BindJSON(c, &lst)
|
||||
@@ -275,6 +297,32 @@ func (rt *Router) alertRulePutFields(c *gin.Context) {
|
||||
continue
|
||||
}
|
||||
|
||||
if f.Action == "annotations_add" {
|
||||
if annotations, has := f.Fields["annotations"]; has {
|
||||
annotationsMap := annotations.(map[string]interface{})
|
||||
for k, v := range annotationsMap {
|
||||
ar.AnnotationsJSON[k] = v.(string)
|
||||
}
|
||||
b, err := json.Marshal(ar.AnnotationsJSON)
|
||||
ginx.Dangerous(err)
|
||||
ginx.Dangerous(ar.UpdateFieldsMap(rt.Ctx, map[string]interface{}{"annotations": string(b)}))
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
if f.Action == "annotations_del" {
|
||||
if annotations, has := f.Fields["annotations"]; has {
|
||||
annotationsKeys := annotations.(map[string]interface{})
|
||||
for key := range annotationsKeys {
|
||||
delete(ar.AnnotationsJSON, key)
|
||||
}
|
||||
b, err := json.Marshal(ar.AnnotationsJSON)
|
||||
ginx.Dangerous(err)
|
||||
ginx.Dangerous(ar.UpdateFieldsMap(rt.Ctx, map[string]interface{}{"annotations": string(b)}))
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
if f.Action == "callback_add" {
|
||||
// 增加一个 callback 地址
|
||||
if callbacks, has := f.Fields["callbacks"]; has {
|
||||
@@ -453,3 +501,71 @@ func (rt *Router) relabelTest(c *gin.Context) {
|
||||
|
||||
ginx.NewRender(c).Data(tags, nil)
|
||||
}
|
||||
|
||||
func (rt *Router) redisLPushAlertRecord(c *gin.Context) {
|
||||
var ar process.AlertRecordRedis
|
||||
ginx.BindJSON(c, &ar)
|
||||
ginx.NewRender(c).Message(process.PushAlertRecord(rt.Ctx, &rt.Redis, ar.Key, ar.Value))
|
||||
}
|
||||
|
||||
func (rt *Router) getRuleRecords(c *gin.Context) {
|
||||
stime, etime := getTimeRange(c)
|
||||
limit := ginx.QueryInt(c, "limit", 20)
|
||||
offset := ginx.QueryInt(c, "offset", 0)
|
||||
ruleId := ginx.QueryInt64(c, "rule_id", 0)
|
||||
query := ginx.QueryStr(c, "query", "")
|
||||
alertRecords := getRuleRecordsFromRedis(rt.Ctx, &rt.Redis, stime, etime, ruleId, limit, offset, query)
|
||||
ginx.NewRender(c).Data(gin.H{
|
||||
"list": alertRecords,
|
||||
"total": len(alertRecords),
|
||||
}, nil)
|
||||
}
|
||||
|
||||
func getRuleRecordsFromRedis(ctx *ctx.Context, redis *storage.Redis, stime int64, etime, ruleId int64, limit, offset int, query string) []*process.AlertRecord {
|
||||
s := time.Unix(stime, 0).Round(time.Hour)
|
||||
e := time.Unix(etime, 0).Round(time.Hour)
|
||||
|
||||
keys := make([]string, 0)
|
||||
for t := s; !t.After(e); t = t.Add(time.Hour) {
|
||||
keys = append(keys, fmt.Sprintf(process.AlertRecordRedisKey, ruleId, t.Day(), t.Hour()))
|
||||
}
|
||||
|
||||
alertRecords, err := storage.MRangeList[*process.AlertRecord](ctx.GetContext(), *redis, keys)
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
}
|
||||
alertRecords = filterAlertRecords(alertRecords, query)
|
||||
|
||||
start := offset * limit
|
||||
if start > len(alertRecords) {
|
||||
return nil
|
||||
}
|
||||
|
||||
end := start + limit
|
||||
if end > len(alertRecords) {
|
||||
end = len(alertRecords)
|
||||
}
|
||||
|
||||
return alertRecords[start:end]
|
||||
}
|
||||
|
||||
func filterAlertRecords(alertRecords []*process.AlertRecord, query string) []*process.AlertRecord {
|
||||
if query == "" {
|
||||
return alertRecords
|
||||
}
|
||||
queryFields := strings.Fields(query)
|
||||
var res []*process.AlertRecord
|
||||
for i := range alertRecords {
|
||||
need := true
|
||||
for j := range queryFields {
|
||||
if !strings.Contains(alertRecords[i].Labels, queryFields[j]) {
|
||||
need = false
|
||||
break
|
||||
}
|
||||
}
|
||||
if need {
|
||||
res = append(res, alertRecords[i])
|
||||
}
|
||||
}
|
||||
return res
|
||||
}
|
||||
|
||||
@@ -3,7 +3,7 @@ package router
|
||||
import (
|
||||
"compress/gzip"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"errors"
|
||||
"io/ioutil"
|
||||
"strings"
|
||||
"time"
|
||||
@@ -57,7 +57,7 @@ func HandleHeartbeat(c *gin.Context, ctx *ctx.Context, engineName string, metaSe
|
||||
}
|
||||
|
||||
if req.Hostname == "" {
|
||||
return req, fmt.Errorf("hostname is required", 400)
|
||||
return req, errors.New("hostname is required")
|
||||
}
|
||||
|
||||
// maybe from pushgw
|
||||
@@ -121,6 +121,10 @@ func HandleHeartbeat(c *gin.Context, ctx *ctx.Context, engineName string, metaSe
|
||||
field["agent_version"] = req.AgentVersion
|
||||
}
|
||||
|
||||
if req.OS != "" && req.OS != target.OS {
|
||||
field["os"] = req.OS
|
||||
}
|
||||
|
||||
if len(field) > 0 {
|
||||
err := target.UpdateFieldsMap(ctx, field)
|
||||
if err != nil {
|
||||
|
||||
@@ -196,6 +196,13 @@ func (rt *Router) taskTplDel(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
ids, err := models.GetAlertRuleIdsByTaskId(rt.Ctx, tid)
|
||||
ginx.Dangerous(err)
|
||||
if len(ids) > 0 {
|
||||
ginx.NewRender(c).Message("can't del this task tpl, used by alert rule ids(%v) ", ids)
|
||||
return
|
||||
}
|
||||
|
||||
ginx.NewRender(c).Message(tpl.Del(rt.Ctx))
|
||||
}
|
||||
|
||||
|
||||
@@ -36,11 +36,6 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
//check CenterApi is default value
|
||||
if len(config.CenterApi.Addrs) < 1 {
|
||||
return nil, errors.New("failed to init config: the CenterApi configuration is missing")
|
||||
}
|
||||
ctx := ctx.NewContext(context.Background(), nil, false, config.CenterApi)
|
||||
|
||||
var redis storage.Redis
|
||||
redis, err = storage.NewRedis(config.Redis)
|
||||
@@ -48,6 +43,12 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
//check CenterApi is default value
|
||||
if len(config.CenterApi.Addrs) < 1 {
|
||||
return nil, errors.New("failed to init config: the CenterApi configuration is missing")
|
||||
}
|
||||
ctx := ctx.NewContext(context.Background(), nil, false, config.CenterApi)
|
||||
|
||||
syncStats := memsto.NewSyncStats()
|
||||
|
||||
targetCache := memsto.NewTargetCache(ctx, syncStats, redis)
|
||||
@@ -75,7 +76,7 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
|
||||
externalProcessors := process.NewExternalProcessors()
|
||||
|
||||
alert.Start(config.Alert, config.Pushgw, syncStats, alertStats, externalProcessors, targetCache, busiGroupCache, alertMuteCache,
|
||||
alertRuleCache, notifyConfigCache, taskTplsCache, dsCache, ctx, promClients, tdengineClients, userCache, userGroupCache)
|
||||
alertRuleCache, notifyConfigCache, taskTplsCache, dsCache, ctx, promClients, tdengineClients, userCache, userGroupCache, &redis)
|
||||
|
||||
alertrtRouter := alertrt.New(config.HTTP, config.Alert, alertMuteCache, targetCache, busiGroupCache, alertStats, ctx, externalProcessors)
|
||||
|
||||
|
||||
29
cron/limit_alert_record_count.go
Normal file
29
cron/limit_alert_record_count.go
Normal file
@@ -0,0 +1,29 @@
|
||||
package cron
|
||||
|
||||
import (
|
||||
"github.com/ccfos/nightingale/v6/alert/process"
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
"github.com/ccfos/nightingale/v6/storage"
|
||||
|
||||
"github.com/robfig/cron/v3"
|
||||
)
|
||||
|
||||
const (
|
||||
limitAlertRecordCountCron = "@every 1h"
|
||||
)
|
||||
|
||||
func InitLimitAlertRecordCountCron(ctx *ctx.Context, redis *storage.Redis) error {
|
||||
c := cron.New()
|
||||
|
||||
_, err := c.AddFunc(limitAlertRecordCountCron, func() {
|
||||
process.LimitAlertRecordCount(ctx, redis)
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c.Start()
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -83,4 +83,7 @@ ALTER TABLE recording_rule ADD COLUMN cron_pattern VARCHAR(255) DEFAULT '' COMME
|
||||
|
||||
/* v7.0.0-beta.14 */
|
||||
ALTER TABLE alert_cur_event ADD COLUMN original_tags TEXT COMMENT 'labels key=val,,k2=v2';
|
||||
ALTER TABLE alert_his_event ADD COLUMN original_tags TEXT COMMENT 'labels key=val,,k2=v2';
|
||||
ALTER TABLE alert_his_event ADD COLUMN original_tags TEXT COMMENT 'labels key=val,,k2=v2';
|
||||
|
||||
/* v7.1.0 */
|
||||
ALTER TABLE target ADD COLUMN os VARCHAR(31) DEFAULT '' COMMENT 'os type';
|
||||
@@ -192,7 +192,7 @@
|
||||
"prom_ql": "",
|
||||
"queries": [
|
||||
{
|
||||
"prom_ql": "elasticsearch_filesystem_data_available_bytes / elasticsearch_filesystem_data_size_bytes * 100 \u003c 10",
|
||||
"prom_ql": "elasticsearch_filesystem_data_available_bytes / elasticsearch_filesystem_data_size_in_bytes * 100 \u003c 10",
|
||||
"severity": 1
|
||||
}
|
||||
],
|
||||
@@ -275,7 +275,7 @@
|
||||
"prom_ql": "",
|
||||
"queries": [
|
||||
{
|
||||
"prom_ql": "elasticsearch_filesystem_data_available_bytes / elasticsearch_filesystem_data_size_bytes * 100 \u003c 20",
|
||||
"prom_ql": "elasticsearch_filesystem_data_available_bytes / elasticsearch_filesystem_data_size_in_bytes * 100 \u003c 20",
|
||||
"severity": 2
|
||||
}
|
||||
],
|
||||
@@ -1078,4 +1078,4 @@
|
||||
"update_by": "",
|
||||
"uuid": 1717556327360313000
|
||||
}
|
||||
]
|
||||
]
|
||||
|
||||
@@ -4,7 +4,6 @@ ElasticSearch 通过 HTTP JSON 的方式暴露了自身的监控指标,通过
|
||||
|
||||
如果是小规模集群,设置 `local=false`,从集群中某一个节点抓取数据,即可拿到整个集群所有节点的监控数据。如果是大规模集群,建议设置 `local=true`,在集群的每个节点上都部署抓取器,抓取本地 elasticsearch 进程的监控数据。
|
||||
|
||||
ElasticSearch 详细的监控讲解,请参考这篇 [文章](https://time.geekbang.org/column/article/628847)。
|
||||
|
||||
## 配置示例
|
||||
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
# kafka plugin
|
||||
|
||||
Kafka 的核心指标,其实都是通过 JMX 的方式暴露的,可以参考这篇 [文章](https://time.geekbang.org/column/article/628498)。对于 JMX 暴露的指标,使用 jolokia 或者使用 jmx_exporter 那个 jar 包来采集即可,不需要本插件。
|
||||
Kafka 的核心指标,其实都是通过 JMX 的方式暴露的。对于 JMX 暴露的指标,使用 jolokia 或者使用 jmx_exporter 那个 jar 包来采集即可,不需要本插件。
|
||||
|
||||
本插件主要是采集的消费者延迟数据,这个数据无法通过 Kafka 服务端的 JMX 拿到。
|
||||
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
# Kubernetes
|
||||
|
||||
这个插件已经废弃。Kubernetes 监控系列可以参考这个 [文章](https://flashcat.cloud/categories/kubernetes%E7%9B%91%E6%8E%A7%E4%B8%93%E6%A0%8F/)。或者参考 [专栏](https://time.geekbang.org/column/article/630306)。
|
||||
这个插件已经废弃。Kubernetes 监控系列可以参考这个 [文章](https://flashcat.cloud/categories/kubernetes%E7%9B%91%E6%8E%A7%E4%B8%93%E6%A0%8F/)。
|
||||
|
||||
不过 Kubernetes 这个类别下的内置告警规则和内置仪表盘都是可以使用的。
|
||||
|
||||
|
||||
@@ -26,6 +26,21 @@ const (
|
||||
TDENGINE = "tdengine"
|
||||
)
|
||||
|
||||
const (
|
||||
AlertRuleEnabled = 0
|
||||
AlertRuleDisabled = 1
|
||||
|
||||
AlertRuleEnableInGlobalBG = 0
|
||||
AlertRuleEnableInOneBG = 1
|
||||
|
||||
AlertRuleNotNotifyRecovered = 0
|
||||
AlertRuleNotifyRecovered = 1
|
||||
|
||||
AlertRuleNotifyRepeatStep60Min = 60
|
||||
|
||||
AlertRuleRecoverDuration0Sec = 0
|
||||
)
|
||||
|
||||
type AlertRule struct {
|
||||
Id int64 `json:"id" gorm:"primaryKey"`
|
||||
GroupId int64 `json:"group_id"` // busi group id
|
||||
@@ -84,6 +99,16 @@ type AlertRule struct {
|
||||
UUID int64 `json:"uuid" gorm:"-"` // tpl identifier
|
||||
}
|
||||
|
||||
type Tpl struct {
|
||||
TplId int64 `json:"tpl_id"`
|
||||
Host []string `json:"host"`
|
||||
}
|
||||
|
||||
type RuleConfig struct {
|
||||
EventRelabelConfig []*pconf.RelabelConfig `json:"event_relabel_config"`
|
||||
TaskTpls []*Tpl `json:"task_tpls"`
|
||||
}
|
||||
|
||||
type PromRuleConfig struct {
|
||||
Queries []PromQuery `json:"queries"`
|
||||
Inhibit bool `json:"inhibit"`
|
||||
@@ -422,6 +447,19 @@ func (ar *AlertRule) UpdateColumn(ctx *ctx.Context, column string, value interfa
|
||||
return DB(ctx).Model(ar).UpdateColumn("annotations", string(b)).Error
|
||||
}
|
||||
|
||||
if column == "annotations" {
|
||||
newAnnotations := value.(map[string]interface{})
|
||||
ar.AnnotationsJSON = make(map[string]string)
|
||||
for k, v := range newAnnotations {
|
||||
ar.AnnotationsJSON[k] = v.(string)
|
||||
}
|
||||
b, err := json.Marshal(ar.AnnotationsJSON)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return DB(ctx).Model(ar).UpdateColumn("annotations", string(b)).Error
|
||||
}
|
||||
|
||||
return DB(ctx).Model(ar).UpdateColumn(column, value).Error
|
||||
}
|
||||
|
||||
@@ -679,6 +717,25 @@ func AlertRuleExists(ctx *ctx.Context, id, groupId int64, datasourceIds []int64,
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func GetAlertRuleIdsByTaskId(ctx *ctx.Context, taskId int64) ([]int64, error) {
|
||||
tpl := "%\"tpl_id\":" + fmt.Sprint(taskId) + "}%"
|
||||
cb := "{ibex}/" + fmt.Sprint(taskId) + "%"
|
||||
session := DB(ctx).Where("rule_config like ? or callbacks like ?", tpl, cb)
|
||||
|
||||
var lst []AlertRule
|
||||
var ids []int64
|
||||
err := session.Find(&lst).Error
|
||||
if err != nil || len(lst) == 0 {
|
||||
return ids, err
|
||||
}
|
||||
|
||||
for i := 0; i < len(lst); i++ {
|
||||
ids = append(ids, lst[i].Id)
|
||||
}
|
||||
|
||||
return ids, nil
|
||||
}
|
||||
|
||||
func AlertRuleGets(ctx *ctx.Context, groupId int64) ([]AlertRule, error) {
|
||||
session := DB(ctx).Where("group_id=?", groupId).Order("name")
|
||||
|
||||
|
||||
@@ -230,10 +230,11 @@ type Target struct {
|
||||
HostIp string `gorm:"column:host_ip;varchar(15);default:'';comment:IPv4 string;index:idx_host_ip"`
|
||||
AgentVersion string `gorm:"column:agent_version;varchar(255);default:'';comment:agent version;index:idx_agent_version"`
|
||||
EngineName string `gorm:"column:engine_name;varchar(255);default:'';comment:engine name;index:idx_engine_name"`
|
||||
OS string `gorm:"column:os;varchar(31);default:'';comment:os type;index:idx_os"`
|
||||
}
|
||||
|
||||
type Datasource struct {
|
||||
IsDefault bool `gorm:"column:is_default;type:boolean;not null;comment:is default datasource"`
|
||||
IsDefault bool `gorm:"column:is_default;type:boolean;comment:is default datasource"`
|
||||
}
|
||||
|
||||
type Configs struct {
|
||||
|
||||
@@ -7,7 +7,11 @@ const NOTIFYCONTACT = "notify_contact"
|
||||
const SMTP = "smtp_config"
|
||||
const IBEX = "ibex_server"
|
||||
|
||||
var GlobalCallback = 0
|
||||
var RuleCallback = 1
|
||||
|
||||
type Webhook struct {
|
||||
Type int `json:"type"`
|
||||
Enable bool `json:"enable"`
|
||||
Url string `json:"url"`
|
||||
BasicAuthUser string `json:"basic_auth_user"`
|
||||
@@ -17,6 +21,9 @@ type Webhook struct {
|
||||
Headers []string `json:"headers_str"`
|
||||
SkipVerify bool `json:"skip_verify"`
|
||||
Note string `json:"note"`
|
||||
RetryCount int `json:"retry_count"`
|
||||
RetryInterval int `json:"retry_interval"`
|
||||
Batch int `json:"batch"`
|
||||
}
|
||||
|
||||
type NotifyScript struct {
|
||||
|
||||
93
models/prom_alert_rule.go
Normal file
93
models/prom_alert_rule.go
Normal file
@@ -0,0 +1,93 @@
|
||||
package models
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/toolkits/pkg/logger"
|
||||
)
|
||||
|
||||
type PromRule struct {
|
||||
Alert string `yaml:"alert,omitempty" json:"alert,omitempty"` // 报警规则的名称
|
||||
Record string `yaml:"record,omitempty" json:"record,omitempty"` // 记录规则的名称
|
||||
Expr string `yaml:"expr,omitempty" json:"expr,omitempty"` // PromQL 表达式
|
||||
For string `yaml:"for,omitempty" json:"for,omitempty"` // 告警的等待时间
|
||||
Annotations map[string]string `yaml:"annotations,omitempty" json:"annotations,omitempty"` // 规则的注释信息
|
||||
Labels map[string]string `yaml:"labels,omitempty" json:"labels,omitempty"` // 规则的标签信息
|
||||
}
|
||||
|
||||
type PromRuleGroup struct {
|
||||
Name string `yaml:"name"`
|
||||
Rules []PromRule `yaml:"rules"`
|
||||
Interval string `yaml:"interval,omitempty"`
|
||||
}
|
||||
|
||||
func convertInterval(interval string) int {
|
||||
duration, err := time.ParseDuration(interval)
|
||||
if err != nil {
|
||||
logger.Errorf("Error parsing interval `%s`,err: %v", interval, err)
|
||||
return 0
|
||||
}
|
||||
return int(duration.Seconds())
|
||||
}
|
||||
|
||||
func ConvertAlert(rule PromRule, interval string) AlertRule {
|
||||
annotations := rule.Annotations
|
||||
appendTags := []string{}
|
||||
severity := 2
|
||||
|
||||
if len(rule.Labels) > 0 {
|
||||
for k, v := range rule.Labels {
|
||||
if k != "severity" {
|
||||
appendTags = append(appendTags, fmt.Sprintf("%s=%s", k, v))
|
||||
} else {
|
||||
switch v {
|
||||
case "critical":
|
||||
severity = 1
|
||||
case "warning":
|
||||
severity = 2
|
||||
case "info":
|
||||
severity = 3
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return AlertRule{
|
||||
Name: rule.Alert,
|
||||
Severity: severity,
|
||||
Disabled: AlertRuleEnabled,
|
||||
PromForDuration: convertInterval(rule.For),
|
||||
PromQl: rule.Expr,
|
||||
PromEvalInterval: convertInterval(interval),
|
||||
EnableStimeJSON: "00:00",
|
||||
EnableEtimeJSON: "23:59",
|
||||
EnableDaysOfWeekJSON: []string{
|
||||
"1", "2", "3", "4", "5", "6", "0",
|
||||
},
|
||||
EnableInBG: AlertRuleEnableInGlobalBG,
|
||||
NotifyRecovered: AlertRuleNotifyRecovered,
|
||||
NotifyRepeatStep: AlertRuleNotifyRepeatStep60Min,
|
||||
RecoverDuration: AlertRuleRecoverDuration0Sec,
|
||||
AnnotationsJSON: annotations,
|
||||
AppendTagsJSON: appendTags,
|
||||
}
|
||||
}
|
||||
|
||||
func DealPromGroup(promRule []PromRuleGroup) []AlertRule {
|
||||
var alertRules []AlertRule
|
||||
|
||||
for _, group := range promRule {
|
||||
interval := group.Interval
|
||||
if interval == "" {
|
||||
interval = "15s"
|
||||
}
|
||||
for _, rule := range group.Rules {
|
||||
if rule.Alert != "" {
|
||||
alertRules = append(alertRules, ConvertAlert(rule, interval))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return alertRules
|
||||
}
|
||||
84
models/prom_alert_rule_test.go
Normal file
84
models/prom_alert_rule_test.go
Normal file
@@ -0,0 +1,84 @@
|
||||
package models_test
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/models"
|
||||
"gopkg.in/yaml.v2"
|
||||
)
|
||||
|
||||
func TestConvertAlert(t *testing.T) {
|
||||
jobMissing := []models.PromRule{}
|
||||
err := yaml.Unmarshal([]byte(` - alert: PrometheusJobMissing
|
||||
expr: absent(up{job="prometheus"})
|
||||
for: 1m
|
||||
labels:
|
||||
severity: warning
|
||||
annotations:
|
||||
summary: Prometheus job missing (instance {{ $labels.instance }})
|
||||
description: "A Prometheus job has disappeared\n VALUE = {{ $value }}\n LABELS = {{ $labels }}"`), &jobMissing)
|
||||
if err != nil {
|
||||
t.Errorf("Failed to Unmarshal, err: %s", err)
|
||||
}
|
||||
t.Logf("jobMissing: %+v", jobMissing[0])
|
||||
convJobMissing := models.ConvertAlert(jobMissing[0], "30s")
|
||||
if convJobMissing.PromEvalInterval != 30 {
|
||||
t.Errorf("PromEvalInterval is expected to be 30, but got %d",
|
||||
convJobMissing.PromEvalInterval)
|
||||
}
|
||||
if convJobMissing.PromForDuration != 60 {
|
||||
t.Errorf("PromForDuration is expected to be 60, but got %d",
|
||||
convJobMissing.PromForDuration)
|
||||
}
|
||||
if convJobMissing.Severity != 2 {
|
||||
t.Errorf("Severity is expected to be 2, but got %d", convJobMissing.Severity)
|
||||
}
|
||||
|
||||
ruleEvaluationSlow := []models.PromRule{}
|
||||
yaml.Unmarshal([]byte(` - alert: PrometheusRuleEvaluationSlow
|
||||
expr: prometheus_rule_group_last_duration_seconds > prometheus_rule_group_interval_seconds
|
||||
for: 180s
|
||||
labels:
|
||||
severity: info
|
||||
annotations:
|
||||
summary: Prometheus rule evaluation slow (instance {{ $labels.instance }})
|
||||
description: "Prometheus rule evaluation took more time than the scheduled interval. It indicates a slower storage backend access or too complex query.\n VALUE = {{ $value }}\n LABELS = {{ $labels }}"
|
||||
`), &ruleEvaluationSlow)
|
||||
t.Logf("ruleEvaluationSlow: %+v", ruleEvaluationSlow[0])
|
||||
convRuleEvaluationSlow := models.ConvertAlert(ruleEvaluationSlow[0], "1m")
|
||||
if convRuleEvaluationSlow.PromEvalInterval != 60 {
|
||||
t.Errorf("PromEvalInterval is expected to be 60, but got %d",
|
||||
convJobMissing.PromEvalInterval)
|
||||
}
|
||||
if convRuleEvaluationSlow.PromForDuration != 180 {
|
||||
t.Errorf("PromForDuration is expected to be 180, but got %d",
|
||||
convJobMissing.PromForDuration)
|
||||
}
|
||||
if convRuleEvaluationSlow.Severity != 3 {
|
||||
t.Errorf("Severity is expected to be 3, but got %d", convJobMissing.Severity)
|
||||
}
|
||||
|
||||
targetMissing := []models.PromRule{}
|
||||
yaml.Unmarshal([]byte(` - alert: PrometheusTargetMissing
|
||||
expr: up == 0
|
||||
for: 1.5m
|
||||
labels:
|
||||
severity: critical
|
||||
annotations:
|
||||
summary: Prometheus target missing (instance {{ $labels.instance }})
|
||||
description: "A Prometheus target has disappeared. An exporter might be crashed.\n VALUE = {{ $value }}\n LABELS = {{ $labels }}"
|
||||
`), &targetMissing)
|
||||
t.Logf("targetMissing: %+v", targetMissing[0])
|
||||
convTargetMissing := models.ConvertAlert(targetMissing[0], "1h")
|
||||
if convTargetMissing.PromEvalInterval != 3600 {
|
||||
t.Errorf("PromEvalInterval is expected to be 3600, but got %d",
|
||||
convTargetMissing.PromEvalInterval)
|
||||
}
|
||||
if convTargetMissing.PromForDuration != 90 {
|
||||
t.Errorf("PromForDuration is expected to be 90, but got %d",
|
||||
convTargetMissing.PromForDuration)
|
||||
}
|
||||
if convTargetMissing.Severity != 1 {
|
||||
t.Errorf("Severity is expected to be 1, but got %d", convTargetMissing.Severity)
|
||||
}
|
||||
}
|
||||
@@ -26,6 +26,7 @@ type Target struct {
|
||||
HostIp string `json:"host_ip"` //ipv4,do not needs range select
|
||||
AgentVersion string `json:"agent_version"`
|
||||
EngineName string `json:"engine_name"`
|
||||
OS string `json:"os" gorm:"column:os"`
|
||||
|
||||
UnixTime int64 `json:"unixtime" gorm:"-"`
|
||||
Offset int64 `json:"offset" gorm:"-"`
|
||||
@@ -33,7 +34,6 @@ type Target struct {
|
||||
MemUtil float64 `json:"mem_util" gorm:"-"`
|
||||
CpuNum int `json:"cpu_num" gorm:"-"`
|
||||
CpuUtil float64 `json:"cpu_util" gorm:"-"`
|
||||
OS string `json:"os" gorm:"-"`
|
||||
Arch string `json:"arch" gorm:"-"`
|
||||
RemoteAddr string `json:"remote_addr" gorm:"-"`
|
||||
}
|
||||
@@ -111,7 +111,8 @@ func BuildTargetWhereWithQuery(query string) BuildTargetWhereOption {
|
||||
arr := strings.Fields(query)
|
||||
for i := 0; i < len(arr); i++ {
|
||||
q := "%" + arr[i] + "%"
|
||||
session = session.Where("ident like ? or note like ? or tags like ?", q, q, q)
|
||||
session = session.Where("ident like ? or note like ? or tags like ? "+
|
||||
"or os like ?", q, q, q, q)
|
||||
}
|
||||
}
|
||||
return session
|
||||
@@ -418,7 +419,6 @@ func (t *Target) FillMeta(meta *HostMeta) {
|
||||
t.CpuNum = meta.CpuNum
|
||||
t.UnixTime = meta.UnixTime
|
||||
t.Offset = meta.Offset
|
||||
t.OS = meta.OS
|
||||
t.Arch = meta.Arch
|
||||
t.RemoteAddr = meta.RemoteAddr
|
||||
}
|
||||
|
||||
@@ -4,7 +4,6 @@ import (
|
||||
"context"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/conf"
|
||||
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
|
||||
@@ -32,8 +32,6 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ctx := ctx.NewContext(context.Background(), nil, false, config.CenterApi)
|
||||
|
||||
var redis storage.Redis
|
||||
if config.Redis.Address != "" {
|
||||
redis, err = storage.NewRedis(config.Redis)
|
||||
@@ -41,6 +39,9 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
ctx := ctx.NewContext(context.Background(), nil, false, config.CenterApi)
|
||||
|
||||
idents := idents.New(ctx, redis)
|
||||
metas := metas.New(redis)
|
||||
|
||||
|
||||
109
storage/redis.go
109
storage/redis.go
@@ -2,12 +2,15 @@ package storage
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/pkg/tlsx"
|
||||
|
||||
"github.com/redis/go-redis/v9"
|
||||
"github.com/toolkits/pkg/logger"
|
||||
)
|
||||
@@ -135,3 +138,109 @@ func MSet(ctx context.Context, r Redis, m map[string]interface{}) error {
|
||||
_, err := pipe.Exec(ctx)
|
||||
return err
|
||||
}
|
||||
|
||||
// LPush push value to list
|
||||
func LPush(ctx context.Context, r Redis, key string, values ...interface{}) error {
|
||||
|
||||
_, err := r.LPush(ctx, key, values).Result()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func LTrim(ctx context.Context, r Redis, key string, start, stop int64) error {
|
||||
return r.LTrim(ctx, key, start, stop).Err()
|
||||
}
|
||||
|
||||
func Expire(ctx context.Context, r Redis, key string, expiration time.Duration) error {
|
||||
return r.Expire(ctx, key, expiration).Err()
|
||||
}
|
||||
|
||||
// MRangeList get multiple list from redis and unmarshal to []T
|
||||
func MRangeList[T any](ctx context.Context, r Redis, keys []string) ([]T, error) {
|
||||
pipe := r.Pipeline()
|
||||
for _, k := range keys {
|
||||
pipe.LRange(ctx, k, 0, -1)
|
||||
}
|
||||
cmds, err := pipe.Exec(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var res []T
|
||||
for i := range cmds {
|
||||
if cmds[i].Err() != nil {
|
||||
continue
|
||||
}
|
||||
val := cmds[i].(*redis.StringSliceCmd).Val()
|
||||
for _, v := range val {
|
||||
var temp T
|
||||
err := json.Unmarshal([]byte(v), &temp)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
res = append(res, temp)
|
||||
}
|
||||
}
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func MLLen(ctx context.Context, r Redis, keys []string) (map[string]int64, error) {
|
||||
pipe := r.Pipeline()
|
||||
for _, key := range keys {
|
||||
pipe.LLen(ctx, key)
|
||||
}
|
||||
cmds, err := pipe.Exec(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
res := make(map[string]int64)
|
||||
for i, key := range keys {
|
||||
cmd := cmds[i]
|
||||
if errors.Is(cmd.Err(), redis.Nil) {
|
||||
res[key] = 0
|
||||
continue
|
||||
}
|
||||
|
||||
if cmd.Err() != nil {
|
||||
logger.Errorf("failed to get length of key: %s, err: %s", key, cmd.Err())
|
||||
continue
|
||||
}
|
||||
res[key] = cmd.(*redis.IntCmd).Val()
|
||||
}
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func MTTL(ctx context.Context, r Redis, keys []string) (map[string]time.Duration, error) {
|
||||
pipe := r.Pipeline()
|
||||
for _, key := range keys {
|
||||
pipe.TTL(ctx, key)
|
||||
}
|
||||
cmds, err := pipe.Exec(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
res := make(map[string]time.Duration)
|
||||
for i, key := range keys {
|
||||
cmd := cmds[i]
|
||||
if errors.Is(cmd.Err(), redis.Nil) {
|
||||
continue
|
||||
}
|
||||
|
||||
if cmd.Err() != nil {
|
||||
logger.Errorf("failed to get ttl of key: %s, err: %s", key, cmd.Err())
|
||||
continue
|
||||
}
|
||||
res[key] = cmd.(*redis.DurationCmd).Val()
|
||||
}
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func MDel(ctx context.Context, r Redis, keys ...string) error {
|
||||
pipe := r.Pipeline()
|
||||
for _, key := range keys {
|
||||
pipe.Del(ctx, key)
|
||||
}
|
||||
_, err := pipe.Exec(ctx)
|
||||
return err
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user