mirror of
https://github.com/ccfos/nightingale.git
synced 2026-03-02 22:19:10 +00:00
Compare commits
13 Commits
v8.2.2
...
notify-ref
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6d6460f138 | ||
|
|
9ec424f98c | ||
|
|
c931f43748 | ||
|
|
f525dcf185 | ||
|
|
944ee5b801 | ||
|
|
fa6fef1689 | ||
|
|
167c8aece6 | ||
|
|
378fece50b | ||
|
|
bb6da02e7f | ||
|
|
349e87ce8e | ||
|
|
c37cfaa7ce | ||
|
|
4c1afb1191 | ||
|
|
14e3fd6fa3 |
@@ -35,9 +35,9 @@ func MatchGroupsName(groupName string, groupFilter []models.TagFilter) bool {
|
||||
func matchTag(value string, filter models.TagFilter) bool {
|
||||
switch filter.Func {
|
||||
case "==":
|
||||
return strings.TrimSpace(filter.Value) == strings.TrimSpace(value)
|
||||
return strings.TrimSpace(fmt.Sprintf("%v", filter.Value)) == strings.TrimSpace(value)
|
||||
case "!=":
|
||||
return strings.TrimSpace(filter.Value) != strings.TrimSpace(value)
|
||||
return strings.TrimSpace(fmt.Sprintf("%v", filter.Value)) != strings.TrimSpace(value)
|
||||
case "in":
|
||||
_, has := filter.Vset[value]
|
||||
return has
|
||||
|
||||
@@ -110,10 +110,6 @@ func (e *Consumer) consumeOne(event *models.AlertCurEvent) {
|
||||
|
||||
e.persist(event)
|
||||
|
||||
if event.IsRecovered && event.NotifyRecovered == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
e.dispatch.HandleEventNotify(event, false)
|
||||
}
|
||||
|
||||
|
||||
@@ -24,6 +24,15 @@ import (
|
||||
"github.com/toolkits/pkg/logger"
|
||||
)
|
||||
|
||||
var ShouldSkipNotify func(*ctx.Context, *models.AlertCurEvent, int64) bool
|
||||
var SendByNotifyRule func(*ctx.Context, *memsto.UserCacheType, *memsto.UserGroupCacheType, *memsto.NotifyChannelCacheType,
|
||||
[]*models.AlertCurEvent, int64, *models.NotifyConfig, *models.NotifyChannelConfig, *models.MessageTemplate)
|
||||
|
||||
func init() {
|
||||
ShouldSkipNotify = shouldSkipNotify
|
||||
SendByNotifyRule = SendNotifyRuleMessage
|
||||
}
|
||||
|
||||
type Dispatch struct {
|
||||
alertRuleCache *memsto.AlertRuleCacheType
|
||||
userCache *memsto.UserCacheType
|
||||
@@ -45,9 +54,8 @@ type Dispatch struct {
|
||||
tpls map[string]*template.Template
|
||||
ExtraSenders map[string]sender.Sender
|
||||
BeforeSenderHook func(*models.AlertCurEvent) bool
|
||||
|
||||
ctx *ctx.Context
|
||||
Astats *astats.Stats
|
||||
ctx *ctx.Context
|
||||
Astats *astats.Stats
|
||||
|
||||
RwLock sync.RWMutex
|
||||
}
|
||||
@@ -56,7 +64,7 @@ type Dispatch struct {
|
||||
func NewDispatch(alertRuleCache *memsto.AlertRuleCacheType, userCache *memsto.UserCacheType, userGroupCache *memsto.UserGroupCacheType,
|
||||
alertSubscribeCache *memsto.AlertSubscribeCacheType, targetCache *memsto.TargetCacheType, notifyConfigCache *memsto.NotifyConfigCacheType,
|
||||
taskTplsCache *memsto.TaskTplCache, notifyRuleCache *memsto.NotifyRuleCacheType, notifyChannelCache *memsto.NotifyChannelCacheType,
|
||||
messageTemplateCache *memsto.MessageTemplateCacheType, eventProcessorCache *memsto.EventProcessorCacheType, alerting aconf.Alerting, ctx *ctx.Context, astats *astats.Stats) *Dispatch {
|
||||
messageTemplateCache *memsto.MessageTemplateCacheType, eventProcessorCache *memsto.EventProcessorCacheType, alerting aconf.Alerting, c *ctx.Context, astats *astats.Stats) *Dispatch {
|
||||
notify := &Dispatch{
|
||||
alertRuleCache: alertRuleCache,
|
||||
userCache: userCache,
|
||||
@@ -77,7 +85,7 @@ func NewDispatch(alertRuleCache *memsto.AlertRuleCacheType, userCache *memsto.Us
|
||||
ExtraSenders: make(map[string]sender.Sender),
|
||||
BeforeSenderHook: func(*models.AlertCurEvent) bool { return true },
|
||||
|
||||
ctx: ctx,
|
||||
ctx: c,
|
||||
Astats: astats,
|
||||
}
|
||||
|
||||
@@ -166,6 +174,8 @@ func (e *Dispatch) HandleEventWithNotifyRule(eventOrigin *models.AlertCurEvent)
|
||||
if !notifyRule.Enable {
|
||||
continue
|
||||
}
|
||||
eventCopy.NotifyRuleId = notifyRuleId
|
||||
eventCopy.NotifyRuleName = notifyRule.Name
|
||||
|
||||
var processors []models.Processor
|
||||
for _, pipelineConfig := range notifyRule.PipelineConfigs {
|
||||
@@ -194,13 +204,14 @@ func (e *Dispatch) HandleEventWithNotifyRule(eventOrigin *models.AlertCurEvent)
|
||||
eventCopy, res, err = processor.Process(e.ctx, eventCopy)
|
||||
if eventCopy == nil {
|
||||
logger.Warningf("after processor notify_id: %d, event:%+v, processor:%+v, event is nil", notifyRuleId, eventCopy, processor)
|
||||
sender.NotifyRecord(e.ctx, []*models.AlertCurEvent{eventCopy}, notifyRuleId, "", "", res, errors.New("drop by processor"))
|
||||
break
|
||||
}
|
||||
logger.Infof("after processor notify_id: %d, event:%+v, processor:%+v, res:%v, err:%v", notifyRuleId, eventCopy, processor, res, err)
|
||||
}
|
||||
|
||||
if eventCopy == nil {
|
||||
// 如果 eventCopy 为 nil,说明 eventCopy 被 processor drop 掉了, 不再发送通知
|
||||
if ShouldSkipNotify(e.ctx, eventCopy, notifyRuleId) {
|
||||
logger.Infof("notify_id: %d, event:%+v, should skip notify", notifyRuleId, eventCopy)
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -227,14 +238,26 @@ func (e *Dispatch) HandleEventWithNotifyRule(eventOrigin *models.AlertCurEvent)
|
||||
continue
|
||||
}
|
||||
|
||||
// todo go send
|
||||
// todo 聚合 event
|
||||
go e.sendV2([]*models.AlertCurEvent{eventCopy}, notifyRuleId, ¬ifyRule.NotifyConfigs[i], notifyChannel, messageTemplate)
|
||||
go SendByNotifyRule(e.ctx, e.userCache, e.userGroupCache, e.notifyChannelCache, []*models.AlertCurEvent{eventCopy}, notifyRuleId, ¬ifyRule.NotifyConfigs[i], notifyChannel, messageTemplate)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func shouldSkipNotify(ctx *ctx.Context, event *models.AlertCurEvent, notifyRuleId int64) bool {
|
||||
if event == nil {
|
||||
// 如果 eventCopy 为 nil,说明 eventCopy 被 processor drop 掉了, 不再发送通知
|
||||
return true
|
||||
}
|
||||
|
||||
if event.IsRecovered && event.NotifyRecovered == 0 {
|
||||
// 如果 eventCopy 是恢复事件,且 NotifyRecovered 为 0,则不发送通知
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
func pipelineApplicable(pipeline *models.EventPipeline, event *models.AlertCurEvent) bool {
|
||||
if pipeline == nil {
|
||||
return true
|
||||
@@ -445,7 +468,8 @@ func GetNotifyConfigParams(notifyConfig *models.NotifyConfig, contactKey string,
|
||||
return sendtos, flashDutyChannelIDs, customParams
|
||||
}
|
||||
|
||||
func (e *Dispatch) sendV2(events []*models.AlertCurEvent, notifyRuleId int64, notifyConfig *models.NotifyConfig, notifyChannel *models.NotifyChannelConfig, messageTemplate *models.MessageTemplate) {
|
||||
func SendNotifyRuleMessage(ctx *ctx.Context, userCache *memsto.UserCacheType, userGroupCache *memsto.UserGroupCacheType, notifyChannelCache *memsto.NotifyChannelCacheType,
|
||||
events []*models.AlertCurEvent, notifyRuleId int64, notifyConfig *models.NotifyConfig, notifyChannel *models.NotifyChannelConfig, messageTemplate *models.MessageTemplate) {
|
||||
if len(events) == 0 {
|
||||
logger.Errorf("notify_id: %d events is empty", notifyRuleId)
|
||||
return
|
||||
@@ -461,10 +485,7 @@ func (e *Dispatch) sendV2(events []*models.AlertCurEvent, notifyRuleId int64, no
|
||||
contactKey = notifyChannel.ParamConfig.UserInfo.ContactKey
|
||||
}
|
||||
|
||||
sendtos, flashDutyChannelIDs, customParams := GetNotifyConfigParams(notifyConfig, contactKey, e.userCache, e.userGroupCache)
|
||||
|
||||
e.Astats.GaugeNotifyRecordQueueSize.Inc()
|
||||
defer e.Astats.GaugeNotifyRecordQueueSize.Dec()
|
||||
sendtos, flashDutyChannelIDs, customParams := GetNotifyConfigParams(notifyConfig, contactKey, userCache, userGroupCache)
|
||||
|
||||
switch notifyChannel.RequestType {
|
||||
case "flashduty":
|
||||
@@ -474,10 +495,10 @@ func (e *Dispatch) sendV2(events []*models.AlertCurEvent, notifyRuleId int64, no
|
||||
|
||||
for i := range flashDutyChannelIDs {
|
||||
start := time.Now()
|
||||
respBody, err := notifyChannel.SendFlashDuty(events, flashDutyChannelIDs[i], e.notifyChannelCache.GetHttpClient(notifyChannel.ID))
|
||||
respBody, err := notifyChannel.SendFlashDuty(events, flashDutyChannelIDs[i], notifyChannelCache.GetHttpClient(notifyChannel.ID))
|
||||
respBody = fmt.Sprintf("duration: %d ms %s", time.Since(start).Milliseconds(), respBody)
|
||||
logger.Infof("notify_id: %d, channel_name: %v, event:%+v, IntegrationUrl: %v dutychannel_id: %v, respBody: %v, err: %v", notifyRuleId, notifyChannel.Name, events[0], notifyChannel.RequestConfig.FlashDutyRequestConfig.IntegrationUrl, flashDutyChannelIDs[i], respBody, err)
|
||||
sender.NotifyRecord(e.ctx, events, notifyRuleId, notifyChannel.Name, strconv.FormatInt(flashDutyChannelIDs[i], 10), respBody, err)
|
||||
sender.NotifyRecord(ctx, events, notifyRuleId, notifyChannel.Name, strconv.FormatInt(flashDutyChannelIDs[i], 10), respBody, err)
|
||||
}
|
||||
|
||||
case "http":
|
||||
@@ -493,22 +514,22 @@ func (e *Dispatch) sendV2(events []*models.AlertCurEvent, notifyRuleId int64, no
|
||||
}
|
||||
|
||||
// 将任务加入队列
|
||||
success := e.notifyChannelCache.EnqueueNotifyTask(task)
|
||||
success := notifyChannelCache.EnqueueNotifyTask(task)
|
||||
if !success {
|
||||
logger.Errorf("failed to enqueue notify task for channel %d, notify_id: %d", notifyChannel.ID, notifyRuleId)
|
||||
// 如果入队失败,记录错误通知
|
||||
sender.NotifyRecord(e.ctx, events, notifyRuleId, notifyChannel.Name, getSendTarget(customParams, sendtos), "", errors.New("failed to enqueue notify task, queue is full"))
|
||||
sender.NotifyRecord(ctx, events, notifyRuleId, notifyChannel.Name, getSendTarget(customParams, sendtos), "", errors.New("failed to enqueue notify task, queue is full"))
|
||||
}
|
||||
|
||||
case "smtp":
|
||||
notifyChannel.SendEmail(notifyRuleId, events, tplContent, sendtos, e.notifyChannelCache.GetSmtpClient(notifyChannel.ID))
|
||||
notifyChannel.SendEmail(notifyRuleId, events, tplContent, sendtos, notifyChannelCache.GetSmtpClient(notifyChannel.ID))
|
||||
|
||||
case "script":
|
||||
start := time.Now()
|
||||
target, res, err := notifyChannel.SendScript(events, tplContent, customParams, sendtos)
|
||||
res = fmt.Sprintf("duration: %d ms %s", time.Since(start).Milliseconds(), res)
|
||||
logger.Infof("notify_id: %d, channel_name: %v, event:%+v, tplContent:%s, customParams:%v, target:%s, res:%s, err:%v", notifyRuleId, notifyChannel.Name, events[0], tplContent, customParams, target, res, err)
|
||||
sender.NotifyRecord(e.ctx, events, notifyRuleId, notifyChannel.Name, target, res, err)
|
||||
sender.NotifyRecord(ctx, events, notifyRuleId, notifyChannel.Name, target, res, err)
|
||||
default:
|
||||
logger.Warningf("notify_id: %d, channel_name: %v, event:%+v send type not found", notifyRuleId, notifyChannel.Name, events[0])
|
||||
}
|
||||
@@ -523,6 +544,11 @@ func NeedBatchContacts(requestConfig *models.HTTPRequestConfig) bool {
|
||||
// event: 告警/恢复事件
|
||||
// isSubscribe: 告警事件是否由subscribe的配置产生
|
||||
func (e *Dispatch) HandleEventNotify(event *models.AlertCurEvent, isSubscribe bool) {
|
||||
go e.HandleEventWithNotifyRule(event)
|
||||
if event.IsRecovered && event.NotifyRecovered == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
rule := e.alertRuleCache.Get(event.RuleId)
|
||||
if rule == nil {
|
||||
return
|
||||
@@ -555,7 +581,6 @@ func (e *Dispatch) HandleEventNotify(event *models.AlertCurEvent, isSubscribe bo
|
||||
notifyTarget.AndMerge(handler(rule, event, notifyTarget, e))
|
||||
}
|
||||
|
||||
go e.HandleEventWithNotifyRule(event)
|
||||
go e.Send(rule, event, notifyTarget, isSubscribe)
|
||||
|
||||
// 如果是不是订阅规则出现的event, 则需要处理订阅规则的event
|
||||
|
||||
@@ -543,6 +543,9 @@ func (rt *Router) Config(r *gin.Engine) {
|
||||
pages.GET("/notify-rule/custom-params", rt.auth(), rt.user(), rt.perm("/notification-rules"), rt.notifyRuleCustomParamsGet)
|
||||
pages.POST("/notify-rule/event-pipelines-tryrun", rt.auth(), rt.user(), rt.perm("/notification-rules/add"), rt.tryRunEventProcessorByNotifyRule)
|
||||
|
||||
pages.GET("/event-tagkeys", rt.auth(), rt.user(), rt.eventTagKeys)
|
||||
pages.GET("/event-tagvalues", rt.auth(), rt.user(), rt.eventTagValues)
|
||||
|
||||
// 事件Pipeline相关路由
|
||||
pages.GET("/event-pipelines", rt.auth(), rt.user(), rt.perm("/event-pipelines"), rt.eventPipelinesList)
|
||||
pages.POST("/event-pipeline", rt.auth(), rt.user(), rt.perm("/event-pipelines/add"), rt.addEventPipeline)
|
||||
|
||||
@@ -13,6 +13,7 @@ import (
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/toolkits/pkg/ginx"
|
||||
"github.com/toolkits/pkg/logger"
|
||||
)
|
||||
|
||||
func getUserGroupIds(ctx *gin.Context, rt *Router, myGroups bool) ([]int64, error) {
|
||||
@@ -305,3 +306,123 @@ func (rt *Router) alertCurEventDelByHash(c *gin.Context) {
|
||||
hash := ginx.QueryStr(c, "hash")
|
||||
ginx.NewRender(c).Message(models.AlertCurEventDelByHash(rt.Ctx, hash))
|
||||
}
|
||||
|
||||
func (rt *Router) eventTagKeys(c *gin.Context) {
|
||||
// 获取最近1天的活跃告警事件
|
||||
now := time.Now().Unix()
|
||||
stime := now - 24*3600
|
||||
etime := now
|
||||
|
||||
// 获取用户可见的业务组ID列表
|
||||
bgids, err := GetBusinessGroupIds(c, rt.Ctx, rt.Center.EventHistoryGroupView, false)
|
||||
if err != nil {
|
||||
logger.Warningf("failed to get business group ids: %v", err)
|
||||
ginx.NewRender(c).Data([]string{"ident", "app", "service", "instance"}, nil)
|
||||
return
|
||||
}
|
||||
|
||||
// 查询活跃告警事件,限制数量以提高性能
|
||||
events, err := models.AlertCurEventsGet(rt.Ctx, []string{}, bgids, stime, etime, []int64{}, []int64{}, []string{}, 0, "", 200, 0, []int64{})
|
||||
if err != nil {
|
||||
logger.Warningf("failed to get current alert events: %v", err)
|
||||
ginx.NewRender(c).Data([]string{"ident", "app", "service", "instance"}, nil)
|
||||
return
|
||||
}
|
||||
|
||||
// 如果没有查到事件,返回默认标签
|
||||
if len(events) == 0 {
|
||||
ginx.NewRender(c).Data([]string{"ident", "app", "service", "instance"}, nil)
|
||||
return
|
||||
}
|
||||
|
||||
// 收集所有标签键并去重
|
||||
tagKeys := make(map[string]struct{})
|
||||
for _, event := range events {
|
||||
for key := range event.TagsMap {
|
||||
tagKeys[key] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
// 转换为字符串切片
|
||||
var result []string
|
||||
for key := range tagKeys {
|
||||
result = append(result, key)
|
||||
}
|
||||
|
||||
// 如果没有收集到任何标签键,返回默认值
|
||||
if len(result) == 0 {
|
||||
result = []string{"ident", "app", "service", "instance"}
|
||||
}
|
||||
|
||||
ginx.NewRender(c).Data(result, nil)
|
||||
}
|
||||
|
||||
func (rt *Router) eventTagValues(c *gin.Context) {
|
||||
// 获取标签key
|
||||
tagKey := ginx.QueryStr(c, "key")
|
||||
|
||||
// 获取最近1天的活跃告警事件
|
||||
now := time.Now().Unix()
|
||||
stime := now - 24*3600
|
||||
etime := now
|
||||
|
||||
// 获取用户可见的业务组ID列表
|
||||
bgids, err := GetBusinessGroupIds(c, rt.Ctx, rt.Center.EventHistoryGroupView, false)
|
||||
if err != nil {
|
||||
logger.Warningf("failed to get business group ids: %v", err)
|
||||
ginx.NewRender(c).Data([]string{}, nil)
|
||||
return
|
||||
}
|
||||
|
||||
// 查询活跃告警事件,获取更多数据以保证统计准确性
|
||||
events, err := models.AlertCurEventsGet(rt.Ctx, []string{}, bgids, stime, etime, []int64{}, []int64{}, []string{}, 0, "", 1000, 0, []int64{})
|
||||
if err != nil {
|
||||
logger.Warningf("failed to get current alert events: %v", err)
|
||||
ginx.NewRender(c).Data([]string{}, nil)
|
||||
return
|
||||
}
|
||||
|
||||
// 如果没有查到事件,返回空数组
|
||||
if len(events) == 0 {
|
||||
ginx.NewRender(c).Data([]string{}, nil)
|
||||
return
|
||||
}
|
||||
|
||||
// 统计标签值出现次数
|
||||
valueCount := make(map[string]int)
|
||||
for _, event := range events {
|
||||
// TagsMap已经在AlertCurEventsGet中处理,直接使用
|
||||
if value, exists := event.TagsMap[tagKey]; exists && value != "" {
|
||||
valueCount[value]++
|
||||
}
|
||||
}
|
||||
|
||||
// 转换为切片并按出现次数降序排序
|
||||
type tagValue struct {
|
||||
value string
|
||||
count int
|
||||
}
|
||||
|
||||
tagValues := make([]tagValue, 0, len(valueCount))
|
||||
for value, count := range valueCount {
|
||||
tagValues = append(tagValues, tagValue{value, count})
|
||||
}
|
||||
|
||||
// 按出现次数降序排序
|
||||
sort.Slice(tagValues, func(i, j int) bool {
|
||||
return tagValues[i].count > tagValues[j].count
|
||||
})
|
||||
|
||||
// 只取Top20并转换为字符串数组
|
||||
limit := 20
|
||||
if len(tagValues) < limit {
|
||||
limit = len(tagValues)
|
||||
}
|
||||
|
||||
result := make([]string, 0, limit)
|
||||
for i := 0; i < limit; i++ {
|
||||
result = append(result, tagValues[i].value)
|
||||
}
|
||||
|
||||
ginx.NewRender(c).Data(result, nil)
|
||||
}
|
||||
|
||||
@@ -62,11 +62,11 @@ func (rt *Router) alertHisEventsList(c *gin.Context) {
|
||||
ginx.Dangerous(err)
|
||||
|
||||
total, err := models.AlertHisEventTotal(rt.Ctx, prods, bgids, stime, etime, severity,
|
||||
recovered, dsIds, cates, ruleId, query)
|
||||
recovered, dsIds, cates, ruleId, query, []int64{})
|
||||
ginx.Dangerous(err)
|
||||
|
||||
list, err := models.AlertHisEventGets(rt.Ctx, prods, bgids, stime, etime, severity, recovered,
|
||||
dsIds, cates, ruleId, query, limit, ginx.Offset(c, limit))
|
||||
dsIds, cates, ruleId, query, limit, ginx.Offset(c, limit), []int64{})
|
||||
ginx.Dangerous(err)
|
||||
|
||||
cache := make(map[int64]*models.UserGroup)
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/toolkits/pkg/ginx"
|
||||
"github.com/toolkits/pkg/i18n"
|
||||
)
|
||||
|
||||
// 获取事件Pipeline列表
|
||||
@@ -139,12 +140,14 @@ func (rt *Router) tryRunEventPipeline(c *gin.Context) {
|
||||
}
|
||||
event := hisEvent.ToCur()
|
||||
|
||||
lang := c.GetHeader("X-Language")
|
||||
var result string
|
||||
for _, p := range f.PipelineConfig.ProcessorConfigs {
|
||||
processor, err := models.GetProcessorByType(p.Typ, p.Config)
|
||||
if err != nil {
|
||||
ginx.Bomb(http.StatusBadRequest, "get processor: %+v err: %+v", p, err)
|
||||
}
|
||||
event, _, err = processor.Process(rt.Ctx, event)
|
||||
event, result, err = processor.Process(rt.Ctx, event)
|
||||
if err != nil {
|
||||
ginx.Bomb(http.StatusBadRequest, "processor: %+v err: %+v", p, err)
|
||||
}
|
||||
@@ -152,7 +155,7 @@ func (rt *Router) tryRunEventPipeline(c *gin.Context) {
|
||||
if event == nil {
|
||||
ginx.NewRender(c).Data(map[string]interface{}{
|
||||
"event": event,
|
||||
"result": "event is dropped",
|
||||
"result": i18n.Sprintf(lang, "event is dropped"),
|
||||
}, nil)
|
||||
return
|
||||
}
|
||||
@@ -160,7 +163,7 @@ func (rt *Router) tryRunEventPipeline(c *gin.Context) {
|
||||
|
||||
m := map[string]interface{}{
|
||||
"event": event,
|
||||
"result": "",
|
||||
"result": i18n.Sprintf(lang, result),
|
||||
}
|
||||
ginx.NewRender(c).Data(m, nil)
|
||||
}
|
||||
@@ -188,9 +191,10 @@ func (rt *Router) tryRunEventProcessor(c *gin.Context) {
|
||||
ginx.Bomb(200, "processor err: %+v", err)
|
||||
}
|
||||
|
||||
lang := c.GetHeader("X-Language")
|
||||
ginx.NewRender(c).Data(map[string]interface{}{
|
||||
"event": event,
|
||||
"result": res,
|
||||
"result": i18n.Sprintf(lang, res),
|
||||
}, nil)
|
||||
}
|
||||
|
||||
@@ -231,9 +235,10 @@ func (rt *Router) tryRunEventProcessorByNotifyRule(c *gin.Context) {
|
||||
ginx.Bomb(http.StatusBadRequest, "processor: %+v err: %+v", p, err)
|
||||
}
|
||||
if event == nil {
|
||||
lang := c.GetHeader("X-Language")
|
||||
ginx.NewRender(c).Data(map[string]interface{}{
|
||||
"event": event,
|
||||
"result": "event is dropped",
|
||||
"result": i18n.Sprintf(lang, "event is dropped"),
|
||||
}, nil)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -196,6 +196,7 @@ func (rt *Router) eventsMessage(c *gin.Context) {
|
||||
var defs = []string{
|
||||
"{{$events := .}}",
|
||||
"{{$event := index . 0}}",
|
||||
"{{$aggr_key := \"\"}}",
|
||||
}
|
||||
ret := make(map[string]string, len(req.Tpl.Content))
|
||||
for k, v := range req.Tpl.Content {
|
||||
|
||||
@@ -78,6 +78,8 @@ type AlertCurEvent struct {
|
||||
RuleHash string `json:"rule_hash" gorm:"-"`
|
||||
ExtraInfoMap []map[string]string `json:"extra_info_map" gorm:"-"`
|
||||
NotifyRuleIds []int64 `json:"notify_rule_ids" gorm:"serializer:json"`
|
||||
NotifyRuleId int64 `json:"notify_rule_id" gorm:"-"`
|
||||
NotifyRuleName string `json:"notify_rule_name" gorm:"-"`
|
||||
|
||||
NotifyVersion int `json:"notify_version" gorm:"-"` // 0: old, 1: new
|
||||
NotifyRules []*EventNotifyRule `json:"notify_rules" gorm:"-"`
|
||||
|
||||
@@ -127,7 +127,7 @@ func (e *AlertHisEvent) FillNotifyGroups(ctx *ctx.Context, cache map[int64]*User
|
||||
|
||||
func AlertHisEventTotal(
|
||||
ctx *ctx.Context, prods []string, bgids []int64, stime, etime int64, severity int,
|
||||
recovered int, dsIds []int64, cates []string, ruleId int64, query string) (int64, error) {
|
||||
recovered int, dsIds []int64, cates []string, ruleId int64, query string, eventIds []int64) (int64, error) {
|
||||
session := DB(ctx).Model(&AlertHisEvent{}).Where("last_eval_time between ? and ?", stime, etime)
|
||||
|
||||
if len(prods) > 0 {
|
||||
@@ -158,6 +158,10 @@ func AlertHisEventTotal(
|
||||
session = session.Where("rule_id = ?", ruleId)
|
||||
}
|
||||
|
||||
if len(eventIds) > 0 {
|
||||
session = session.Where("id in ?", eventIds)
|
||||
}
|
||||
|
||||
if query != "" {
|
||||
arr := strings.Fields(query)
|
||||
for i := 0; i < len(arr); i++ {
|
||||
@@ -171,7 +175,7 @@ func AlertHisEventTotal(
|
||||
|
||||
func AlertHisEventGets(ctx *ctx.Context, prods []string, bgids []int64, stime, etime int64,
|
||||
severity int, recovered int, dsIds []int64, cates []string, ruleId int64, query string,
|
||||
limit, offset int) ([]AlertHisEvent, error) {
|
||||
limit, offset int, eventIds []int64) ([]AlertHisEvent, error) {
|
||||
session := DB(ctx).Where("last_eval_time between ? and ?", stime, etime)
|
||||
|
||||
if len(prods) != 0 {
|
||||
@@ -202,6 +206,10 @@ func AlertHisEventGets(ctx *ctx.Context, prods []string, bgids []int64, stime, e
|
||||
session = session.Where("rule_id = ?", ruleId)
|
||||
}
|
||||
|
||||
if len(eventIds) > 0 {
|
||||
session = session.Where("id in ?", eventIds)
|
||||
}
|
||||
|
||||
if query != "" {
|
||||
arr := strings.Fields(query)
|
||||
for i := 0; i < len(arr); i++ {
|
||||
|
||||
@@ -20,7 +20,7 @@ type TagFilter struct {
|
||||
Key string `json:"key"` // tag key
|
||||
Func string `json:"func"` // `==` | `=~` | `in` | `!=` | `!~` | `not in`
|
||||
Op string `json:"op"` // `==` | `=~` | `in` | `!=` | `!~` | `not in`
|
||||
Value string `json:"value"` // tag value
|
||||
Value interface{} `json:"value"` // tag value
|
||||
Regexp *regexp.Regexp // parse value to regexp if func = '=~' or '!~'
|
||||
Vset map[string]struct{} // parse value to regexp if func = 'in' or 'not in'
|
||||
}
|
||||
@@ -46,15 +46,59 @@ func ParseTagFilter(bFilters []TagFilter) ([]TagFilter, error) {
|
||||
var err error
|
||||
for i := 0; i < len(bFilters); i++ {
|
||||
if bFilters[i].Func == "=~" || bFilters[i].Func == "!~" {
|
||||
bFilters[i].Regexp, err = regexp.Compile(bFilters[i].Value)
|
||||
// 这里存在两个情况,一个是 string 一个是 int
|
||||
var pattern string
|
||||
switch v := bFilters[i].Value.(type) {
|
||||
case string:
|
||||
pattern = v
|
||||
case int:
|
||||
pattern = strconv.Itoa(v)
|
||||
default:
|
||||
return nil, fmt.Errorf("unsupported value type for regex: %T", v)
|
||||
}
|
||||
bFilters[i].Regexp, err = regexp.Compile(pattern)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
} else if bFilters[i].Func == "in" || bFilters[i].Func == "not in" {
|
||||
arr := strings.Fields(bFilters[i].Value)
|
||||
// 这里存在两个情况,一个是 string 一个是[]int
|
||||
bFilters[i].Vset = make(map[string]struct{})
|
||||
for j := 0; j < len(arr); j++ {
|
||||
bFilters[i].Vset[arr[j]] = struct{}{}
|
||||
|
||||
switch v := bFilters[i].Value.(type) {
|
||||
case string:
|
||||
// 处理字符串情况
|
||||
arr := strings.Fields(v)
|
||||
for j := 0; j < len(arr); j++ {
|
||||
bFilters[i].Vset[arr[j]] = struct{}{}
|
||||
}
|
||||
case []int:
|
||||
// 处理[]int情况
|
||||
for j := 0; j < len(v); j++ {
|
||||
bFilters[i].Vset[strconv.Itoa(v[j])] = struct{}{}
|
||||
}
|
||||
case []string:
|
||||
for j := 0; j < len(v); j++ {
|
||||
bFilters[i].Vset[v[j]] = struct{}{}
|
||||
}
|
||||
case []interface{}:
|
||||
// 处理[]interface{}情况(JSON解析可能产生)
|
||||
for j := 0; j < len(v); j++ {
|
||||
switch item := v[j].(type) {
|
||||
case string:
|
||||
bFilters[i].Vset[item] = struct{}{}
|
||||
case int:
|
||||
bFilters[i].Vset[strconv.Itoa(item)] = struct{}{}
|
||||
case float64:
|
||||
bFilters[i].Vset[strconv.Itoa(int(item))] = struct{}{}
|
||||
}
|
||||
}
|
||||
default:
|
||||
// 兜底处理,转为字符串
|
||||
str := fmt.Sprintf("%v", v)
|
||||
arr := strings.Fields(str)
|
||||
for j := 0; j < len(arr); j++ {
|
||||
bFilters[i].Vset[arr[j]] = struct{}{}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -73,15 +117,54 @@ func GetTagFilters(jsonArr ormx.JSONArr) ([]TagFilter, error) {
|
||||
}
|
||||
for i := 0; i < len(bFilters); i++ {
|
||||
if bFilters[i].Func == "=~" || bFilters[i].Func == "!~" {
|
||||
bFilters[i].Regexp, err = regexp.Compile(bFilters[i].Value)
|
||||
var pattern string
|
||||
switch v := bFilters[i].Value.(type) {
|
||||
case string:
|
||||
pattern = v
|
||||
case int:
|
||||
pattern = strconv.Itoa(v)
|
||||
default:
|
||||
return nil, fmt.Errorf("unsupported value type for regex: %T", v)
|
||||
}
|
||||
bFilters[i].Regexp, err = regexp.Compile(pattern)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
} else if bFilters[i].Func == "in" || bFilters[i].Func == "not in" {
|
||||
arr := strings.Fields(bFilters[i].Value)
|
||||
bFilters[i].Vset = make(map[string]struct{})
|
||||
for j := 0; j < len(arr); j++ {
|
||||
bFilters[i].Vset[arr[j]] = struct{}{}
|
||||
|
||||
// 在GetTagFilters中,Value通常是string类型,但也要处理其他可能的类型
|
||||
switch v := bFilters[i].Value.(type) {
|
||||
case string:
|
||||
// 处理字符串情况
|
||||
arr := strings.Fields(v)
|
||||
for j := 0; j < len(arr); j++ {
|
||||
bFilters[i].Vset[arr[j]] = struct{}{}
|
||||
}
|
||||
case []int:
|
||||
// 处理[]int情况
|
||||
for j := 0; j < len(v); j++ {
|
||||
bFilters[i].Vset[strconv.Itoa(v[j])] = struct{}{}
|
||||
}
|
||||
case []interface{}:
|
||||
// 处理[]interface{}情况(JSON解析可能产生)
|
||||
for j := 0; j < len(v); j++ {
|
||||
switch item := v[j].(type) {
|
||||
case string:
|
||||
bFilters[i].Vset[item] = struct{}{}
|
||||
case int:
|
||||
bFilters[i].Vset[strconv.Itoa(item)] = struct{}{}
|
||||
case float64:
|
||||
bFilters[i].Vset[strconv.Itoa(int(item))] = struct{}{}
|
||||
}
|
||||
}
|
||||
default:
|
||||
// 兜底处理,转为字符串
|
||||
str := fmt.Sprintf("%v", v)
|
||||
arr := strings.Fields(str)
|
||||
for j := 0; j < len(arr); j++ {
|
||||
bFilters[i].Vset[arr[j]] = struct{}{}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"fmt"
|
||||
"html/template"
|
||||
"regexp"
|
||||
"sort"
|
||||
"strings"
|
||||
texttemplate "text/template"
|
||||
"time"
|
||||
@@ -15,6 +16,7 @@ import (
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/toolkits/pkg/logger"
|
||||
"github.com/toolkits/pkg/str"
|
||||
)
|
||||
|
||||
// MessageTemplate 消息模板结构
|
||||
@@ -714,10 +716,26 @@ func (t *MessageTemplate) Upsert(ctx *ctx.Context, ident string) error {
|
||||
return tpl.Update(ctx, *t)
|
||||
}
|
||||
|
||||
func GetAggrKey(events []*AlertCurEvent) string {
|
||||
if len(events) <= 1 {
|
||||
return ""
|
||||
}
|
||||
|
||||
ids := make([]string, 0)
|
||||
for i := range len(events) {
|
||||
ids = append(ids, fmt.Sprintf("%d", events[i].Id))
|
||||
}
|
||||
sort.Strings(ids)
|
||||
idsStr := strings.Join(ids, ",")
|
||||
logger.Debugf("notify_hook aggr_key: %s, ids: %v", str.MD5(idsStr), ids)
|
||||
return str.MD5(idsStr)
|
||||
}
|
||||
|
||||
func (t *MessageTemplate) RenderEvent(events []*AlertCurEvent) map[string]interface{} {
|
||||
if t == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// event 内容渲染到 messageTemplate
|
||||
tplContent := make(map[string]interface{})
|
||||
for key, msgTpl := range t.Content {
|
||||
@@ -728,6 +746,8 @@ func (t *MessageTemplate) RenderEvent(events []*AlertCurEvent) map[string]interf
|
||||
"{{ $value := $event.TriggerValue }}",
|
||||
}
|
||||
|
||||
defs = append(defs, fmt.Sprintf("{{ $aggrkey := \"%s\" }}", GetAggrKey(events)))
|
||||
|
||||
var body bytes.Buffer
|
||||
if t.NotifyChannelIdent == "email" {
|
||||
text := strings.Join(append(defs, msgTpl), "")
|
||||
|
||||
@@ -444,6 +444,7 @@ type NotifyRule struct {
|
||||
UserGroupIds []int64 `gorm:"column:user_group_ids;type:varchar(255)"`
|
||||
NotifyConfigs []models.NotifyConfig `gorm:"column:notify_configs;type:text"`
|
||||
PipelineConfigs []models.PipelineConfig `gorm:"column:pipeline_configs;type:text"`
|
||||
ExtraConfig interface{} `gorm:"column:extra_config;type:text"`
|
||||
CreateAt int64 `gorm:"column:create_at;not null;default:0"`
|
||||
CreateBy string `gorm:"column:create_by;type:varchar(64);not null;default:''"`
|
||||
UpdateAt int64 `gorm:"column:update_at;not null;default:0"`
|
||||
|
||||
@@ -1,7 +1,10 @@
|
||||
package models
|
||||
|
||||
import (
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
@@ -19,6 +22,7 @@ type NotifyRule struct {
|
||||
|
||||
// 通知配置
|
||||
NotifyConfigs []NotifyConfig `json:"notify_configs" gorm:"serializer:json"`
|
||||
ExtraConfig interface{} `json:"extra_config" gorm:"serializer:json"`
|
||||
|
||||
CreateAt int64 `json:"create_at"`
|
||||
CreateBy string `json:"create_by"`
|
||||
@@ -39,6 +43,7 @@ type NotifyConfig struct {
|
||||
ChannelID int64 `json:"channel_id"` // 通知媒介(如:阿里云短信)
|
||||
TemplateID int64 `json:"template_id"` // 通知模板
|
||||
Params map[string]interface{} `json:"params"` // 通知参数
|
||||
Type string `json:"type"`
|
||||
|
||||
Severities []int `json:"severities"` // 适用级别(一级告警、二级告警、三级告警)
|
||||
TimeRanges []TimeRanges `json:"time_ranges"` // 适用时段
|
||||
@@ -46,6 +51,12 @@ type NotifyConfig struct {
|
||||
Attributes []TagFilter `json:"attributes"` // 适用属性
|
||||
}
|
||||
|
||||
func (n *NotifyConfig) Hash() string {
|
||||
hash := sha256.New()
|
||||
hash.Write([]byte(fmt.Sprintf("%d%d%v%s%v%v%v%v", n.ChannelID, n.TemplateID, n.Params, n.Type, n.Severities, n.TimeRanges, n.LabelKeys, n.Attributes)))
|
||||
return hex.EncodeToString(hash.Sum(nil))
|
||||
}
|
||||
|
||||
type CustomParams struct {
|
||||
UserIDs []int64 `json:"user_ids"`
|
||||
UserGroupIDs []int64 `json:"user_group_ids"`
|
||||
@@ -75,11 +86,6 @@ func GetNotifyRule(c *ctx.Context, id int64) (*NotifyRule, error) {
|
||||
return &rule, nil
|
||||
}
|
||||
|
||||
// 更新 NotifyRule
|
||||
func UpdateNotifyRule(c *ctx.Context, rule *NotifyRule) error {
|
||||
return DB(c).Save(rule).Error
|
||||
}
|
||||
|
||||
// 删除 NotifyRule
|
||||
func DeleteNotifyRule(c *ctx.Context, id int64) error {
|
||||
return DB(c).Delete(&NotifyRule{}, id).Error
|
||||
|
||||
@@ -79,6 +79,11 @@ var I18N = `{
|
||||
"event attributes not match attributes filter": "事件属性不匹配属性过滤器",
|
||||
"failed to parse tag filter: %v": "解析标签过滤器失败: %v",
|
||||
|
||||
"event is dropped": "事件已被丢弃,不会进行通知",
|
||||
"drop event success": "丢弃事件成功",
|
||||
"drop event failed": "丢弃事件失败",
|
||||
"callback success": "回调成功",
|
||||
|
||||
"Infrastructure": "基础设施",
|
||||
"Host - View": "机器 - 查看",
|
||||
"Host - Modify": "机器 - 修改",
|
||||
@@ -269,7 +274,11 @@ var I18N = `{
|
||||
"event tag not match tag filter": "事件標籤不匹配標籤過濾器",
|
||||
"event attributes not match attributes filter": "事件屬性不匹配屬性過濾器",
|
||||
"failed to parse tag filter: %v": "解析標籤過濾器失敗: %v",
|
||||
|
||||
"event is dropped": "事件已被丟棄,不會進行通知",
|
||||
"drop event success": "丟棄事件成功",
|
||||
"drop event failed": "丟棄事件失敗",
|
||||
"callback success": "回調成功",
|
||||
|
||||
"Infrastructure": "基礎設施",
|
||||
"Host - View": "機器 - 查看",
|
||||
"Host - Modify": "機器 - 修改",
|
||||
@@ -457,7 +466,11 @@ var I18N = `{
|
||||
"event tag not match tag filter": "イベントタグがタグフィルタと一致しません",
|
||||
"event attributes not match attributes filter": "イベント属性が属性フィルタと一致しません",
|
||||
"failed to parse tag filter: %v": "タグフィルタの解析に失敗しました: %v",
|
||||
|
||||
"event is dropped": "イベントが破棄されました,通知は行われません",
|
||||
"drop event success": "イベント破棄成功",
|
||||
"drop event failed": "イベント破棄失敗",
|
||||
"callback success": "コールバック成功",
|
||||
|
||||
"Infrastructure": "インフラストラクチャ",
|
||||
"Host - View": "機器 - 閲覧",
|
||||
"Host - Modify": "機器 - 修正",
|
||||
@@ -645,7 +658,11 @@ var I18N = `{
|
||||
"event tag not match tag filter": "Теги события не соответствуют фильтру тегов",
|
||||
"event attributes not match attributes filter": "Атрибуты события не соответствуют фильтру атрибутов",
|
||||
"failed to parse tag filter: %v": "Не удалось разобрать фильтр тегов: %v",
|
||||
|
||||
"event is dropped": "Событие отброшено, уведомление не будет отправлено",
|
||||
"drop event success": "Событие успешно отброшено",
|
||||
"drop event failed": "Не удалось отбросить событие",
|
||||
"callback success": "Обратный вызов успешен",
|
||||
|
||||
"Infrastructure": "Инфраструктура",
|
||||
"Host - View": "Хост - Просмотр",
|
||||
"Host - Modify": "Хост - Изменить",
|
||||
|
||||
Reference in New Issue
Block a user