Compare commits

...

13 Commits

Author SHA1 Message Date
ning
6d6460f138 update inhibit notify record 2025-09-05 18:17:47 +08:00
ning
9ec424f98c update notify aggr 2025-09-05 11:21:31 +08:00
ning
c931f43748 change notify 2025-09-04 17:34:03 +08:00
ning
f525dcf185 add event detail 2025-09-02 17:36:35 +08:00
ning
944ee5b801 refactor: events query 2025-09-01 20:27:51 +08:00
ning
fa6fef1689 update event noitfy 2025-08-26 19:54:49 +08:00
ning
167c8aece6 update notify 2025-08-22 19:12:12 +08:00
ning
378fece50b update notity 2025-08-22 10:08:27 +08:00
ning
bb6da02e7f update event api 2025-08-20 20:05:55 +08:00
ning
349e87ce8e update notify 2025-08-19 10:35:17 +08:00
ning
c37cfaa7ce refactor: update notify record 2025-08-15 13:32:05 +08:00
ning
4c1afb1191 refactor: update event tag filter 2025-08-15 10:28:04 +08:00
ning
14e3fd6fa3 update notify 2025-08-12 11:23:30 +08:00
15 changed files with 342 additions and 54 deletions

View File

@@ -35,9 +35,9 @@ func MatchGroupsName(groupName string, groupFilter []models.TagFilter) bool {
func matchTag(value string, filter models.TagFilter) bool {
switch filter.Func {
case "==":
return strings.TrimSpace(filter.Value) == strings.TrimSpace(value)
return strings.TrimSpace(fmt.Sprintf("%v", filter.Value)) == strings.TrimSpace(value)
case "!=":
return strings.TrimSpace(filter.Value) != strings.TrimSpace(value)
return strings.TrimSpace(fmt.Sprintf("%v", filter.Value)) != strings.TrimSpace(value)
case "in":
_, has := filter.Vset[value]
return has

View File

@@ -110,10 +110,6 @@ func (e *Consumer) consumeOne(event *models.AlertCurEvent) {
e.persist(event)
if event.IsRecovered && event.NotifyRecovered == 0 {
return
}
e.dispatch.HandleEventNotify(event, false)
}

View File

@@ -24,6 +24,15 @@ import (
"github.com/toolkits/pkg/logger"
)
var ShouldSkipNotify func(*ctx.Context, *models.AlertCurEvent, int64) bool
var SendByNotifyRule func(*ctx.Context, *memsto.UserCacheType, *memsto.UserGroupCacheType, *memsto.NotifyChannelCacheType,
[]*models.AlertCurEvent, int64, *models.NotifyConfig, *models.NotifyChannelConfig, *models.MessageTemplate)
func init() {
ShouldSkipNotify = shouldSkipNotify
SendByNotifyRule = SendNotifyRuleMessage
}
type Dispatch struct {
alertRuleCache *memsto.AlertRuleCacheType
userCache *memsto.UserCacheType
@@ -45,9 +54,8 @@ type Dispatch struct {
tpls map[string]*template.Template
ExtraSenders map[string]sender.Sender
BeforeSenderHook func(*models.AlertCurEvent) bool
ctx *ctx.Context
Astats *astats.Stats
ctx *ctx.Context
Astats *astats.Stats
RwLock sync.RWMutex
}
@@ -56,7 +64,7 @@ type Dispatch struct {
func NewDispatch(alertRuleCache *memsto.AlertRuleCacheType, userCache *memsto.UserCacheType, userGroupCache *memsto.UserGroupCacheType,
alertSubscribeCache *memsto.AlertSubscribeCacheType, targetCache *memsto.TargetCacheType, notifyConfigCache *memsto.NotifyConfigCacheType,
taskTplsCache *memsto.TaskTplCache, notifyRuleCache *memsto.NotifyRuleCacheType, notifyChannelCache *memsto.NotifyChannelCacheType,
messageTemplateCache *memsto.MessageTemplateCacheType, eventProcessorCache *memsto.EventProcessorCacheType, alerting aconf.Alerting, ctx *ctx.Context, astats *astats.Stats) *Dispatch {
messageTemplateCache *memsto.MessageTemplateCacheType, eventProcessorCache *memsto.EventProcessorCacheType, alerting aconf.Alerting, c *ctx.Context, astats *astats.Stats) *Dispatch {
notify := &Dispatch{
alertRuleCache: alertRuleCache,
userCache: userCache,
@@ -77,7 +85,7 @@ func NewDispatch(alertRuleCache *memsto.AlertRuleCacheType, userCache *memsto.Us
ExtraSenders: make(map[string]sender.Sender),
BeforeSenderHook: func(*models.AlertCurEvent) bool { return true },
ctx: ctx,
ctx: c,
Astats: astats,
}
@@ -166,6 +174,8 @@ func (e *Dispatch) HandleEventWithNotifyRule(eventOrigin *models.AlertCurEvent)
if !notifyRule.Enable {
continue
}
eventCopy.NotifyRuleId = notifyRuleId
eventCopy.NotifyRuleName = notifyRule.Name
var processors []models.Processor
for _, pipelineConfig := range notifyRule.PipelineConfigs {
@@ -194,13 +204,14 @@ func (e *Dispatch) HandleEventWithNotifyRule(eventOrigin *models.AlertCurEvent)
eventCopy, res, err = processor.Process(e.ctx, eventCopy)
if eventCopy == nil {
logger.Warningf("after processor notify_id: %d, event:%+v, processor:%+v, event is nil", notifyRuleId, eventCopy, processor)
sender.NotifyRecord(e.ctx, []*models.AlertCurEvent{eventCopy}, notifyRuleId, "", "", res, errors.New("drop by processor"))
break
}
logger.Infof("after processor notify_id: %d, event:%+v, processor:%+v, res:%v, err:%v", notifyRuleId, eventCopy, processor, res, err)
}
if eventCopy == nil {
// 如果 eventCopy 为 nil说明 eventCopy 被 processor drop 掉了, 不再发送通知
if ShouldSkipNotify(e.ctx, eventCopy, notifyRuleId) {
logger.Infof("notify_id: %d, event:%+v, should skip notify", notifyRuleId, eventCopy)
continue
}
@@ -227,14 +238,26 @@ func (e *Dispatch) HandleEventWithNotifyRule(eventOrigin *models.AlertCurEvent)
continue
}
// todo go send
// todo 聚合 event
go e.sendV2([]*models.AlertCurEvent{eventCopy}, notifyRuleId, &notifyRule.NotifyConfigs[i], notifyChannel, messageTemplate)
go SendByNotifyRule(e.ctx, e.userCache, e.userGroupCache, e.notifyChannelCache, []*models.AlertCurEvent{eventCopy}, notifyRuleId, &notifyRule.NotifyConfigs[i], notifyChannel, messageTemplate)
}
}
}
}
func shouldSkipNotify(ctx *ctx.Context, event *models.AlertCurEvent, notifyRuleId int64) bool {
if event == nil {
// 如果 eventCopy 为 nil说明 eventCopy 被 processor drop 掉了, 不再发送通知
return true
}
if event.IsRecovered && event.NotifyRecovered == 0 {
// 如果 eventCopy 是恢复事件,且 NotifyRecovered 为 0则不发送通知
return true
}
return false
}
func pipelineApplicable(pipeline *models.EventPipeline, event *models.AlertCurEvent) bool {
if pipeline == nil {
return true
@@ -445,7 +468,8 @@ func GetNotifyConfigParams(notifyConfig *models.NotifyConfig, contactKey string,
return sendtos, flashDutyChannelIDs, customParams
}
func (e *Dispatch) sendV2(events []*models.AlertCurEvent, notifyRuleId int64, notifyConfig *models.NotifyConfig, notifyChannel *models.NotifyChannelConfig, messageTemplate *models.MessageTemplate) {
func SendNotifyRuleMessage(ctx *ctx.Context, userCache *memsto.UserCacheType, userGroupCache *memsto.UserGroupCacheType, notifyChannelCache *memsto.NotifyChannelCacheType,
events []*models.AlertCurEvent, notifyRuleId int64, notifyConfig *models.NotifyConfig, notifyChannel *models.NotifyChannelConfig, messageTemplate *models.MessageTemplate) {
if len(events) == 0 {
logger.Errorf("notify_id: %d events is empty", notifyRuleId)
return
@@ -461,10 +485,7 @@ func (e *Dispatch) sendV2(events []*models.AlertCurEvent, notifyRuleId int64, no
contactKey = notifyChannel.ParamConfig.UserInfo.ContactKey
}
sendtos, flashDutyChannelIDs, customParams := GetNotifyConfigParams(notifyConfig, contactKey, e.userCache, e.userGroupCache)
e.Astats.GaugeNotifyRecordQueueSize.Inc()
defer e.Astats.GaugeNotifyRecordQueueSize.Dec()
sendtos, flashDutyChannelIDs, customParams := GetNotifyConfigParams(notifyConfig, contactKey, userCache, userGroupCache)
switch notifyChannel.RequestType {
case "flashduty":
@@ -474,10 +495,10 @@ func (e *Dispatch) sendV2(events []*models.AlertCurEvent, notifyRuleId int64, no
for i := range flashDutyChannelIDs {
start := time.Now()
respBody, err := notifyChannel.SendFlashDuty(events, flashDutyChannelIDs[i], e.notifyChannelCache.GetHttpClient(notifyChannel.ID))
respBody, err := notifyChannel.SendFlashDuty(events, flashDutyChannelIDs[i], notifyChannelCache.GetHttpClient(notifyChannel.ID))
respBody = fmt.Sprintf("duration: %d ms %s", time.Since(start).Milliseconds(), respBody)
logger.Infof("notify_id: %d, channel_name: %v, event:%+v, IntegrationUrl: %v dutychannel_id: %v, respBody: %v, err: %v", notifyRuleId, notifyChannel.Name, events[0], notifyChannel.RequestConfig.FlashDutyRequestConfig.IntegrationUrl, flashDutyChannelIDs[i], respBody, err)
sender.NotifyRecord(e.ctx, events, notifyRuleId, notifyChannel.Name, strconv.FormatInt(flashDutyChannelIDs[i], 10), respBody, err)
sender.NotifyRecord(ctx, events, notifyRuleId, notifyChannel.Name, strconv.FormatInt(flashDutyChannelIDs[i], 10), respBody, err)
}
case "http":
@@ -493,22 +514,22 @@ func (e *Dispatch) sendV2(events []*models.AlertCurEvent, notifyRuleId int64, no
}
// 将任务加入队列
success := e.notifyChannelCache.EnqueueNotifyTask(task)
success := notifyChannelCache.EnqueueNotifyTask(task)
if !success {
logger.Errorf("failed to enqueue notify task for channel %d, notify_id: %d", notifyChannel.ID, notifyRuleId)
// 如果入队失败,记录错误通知
sender.NotifyRecord(e.ctx, events, notifyRuleId, notifyChannel.Name, getSendTarget(customParams, sendtos), "", errors.New("failed to enqueue notify task, queue is full"))
sender.NotifyRecord(ctx, events, notifyRuleId, notifyChannel.Name, getSendTarget(customParams, sendtos), "", errors.New("failed to enqueue notify task, queue is full"))
}
case "smtp":
notifyChannel.SendEmail(notifyRuleId, events, tplContent, sendtos, e.notifyChannelCache.GetSmtpClient(notifyChannel.ID))
notifyChannel.SendEmail(notifyRuleId, events, tplContent, sendtos, notifyChannelCache.GetSmtpClient(notifyChannel.ID))
case "script":
start := time.Now()
target, res, err := notifyChannel.SendScript(events, tplContent, customParams, sendtos)
res = fmt.Sprintf("duration: %d ms %s", time.Since(start).Milliseconds(), res)
logger.Infof("notify_id: %d, channel_name: %v, event:%+v, tplContent:%s, customParams:%v, target:%s, res:%s, err:%v", notifyRuleId, notifyChannel.Name, events[0], tplContent, customParams, target, res, err)
sender.NotifyRecord(e.ctx, events, notifyRuleId, notifyChannel.Name, target, res, err)
sender.NotifyRecord(ctx, events, notifyRuleId, notifyChannel.Name, target, res, err)
default:
logger.Warningf("notify_id: %d, channel_name: %v, event:%+v send type not found", notifyRuleId, notifyChannel.Name, events[0])
}
@@ -523,6 +544,11 @@ func NeedBatchContacts(requestConfig *models.HTTPRequestConfig) bool {
// event: 告警/恢复事件
// isSubscribe: 告警事件是否由subscribe的配置产生
func (e *Dispatch) HandleEventNotify(event *models.AlertCurEvent, isSubscribe bool) {
go e.HandleEventWithNotifyRule(event)
if event.IsRecovered && event.NotifyRecovered == 0 {
return
}
rule := e.alertRuleCache.Get(event.RuleId)
if rule == nil {
return
@@ -555,7 +581,6 @@ func (e *Dispatch) HandleEventNotify(event *models.AlertCurEvent, isSubscribe bo
notifyTarget.AndMerge(handler(rule, event, notifyTarget, e))
}
go e.HandleEventWithNotifyRule(event)
go e.Send(rule, event, notifyTarget, isSubscribe)
// 如果是不是订阅规则出现的event, 则需要处理订阅规则的event

View File

@@ -543,6 +543,9 @@ func (rt *Router) Config(r *gin.Engine) {
pages.GET("/notify-rule/custom-params", rt.auth(), rt.user(), rt.perm("/notification-rules"), rt.notifyRuleCustomParamsGet)
pages.POST("/notify-rule/event-pipelines-tryrun", rt.auth(), rt.user(), rt.perm("/notification-rules/add"), rt.tryRunEventProcessorByNotifyRule)
pages.GET("/event-tagkeys", rt.auth(), rt.user(), rt.eventTagKeys)
pages.GET("/event-tagvalues", rt.auth(), rt.user(), rt.eventTagValues)
// 事件Pipeline相关路由
pages.GET("/event-pipelines", rt.auth(), rt.user(), rt.perm("/event-pipelines"), rt.eventPipelinesList)
pages.POST("/event-pipeline", rt.auth(), rt.user(), rt.perm("/event-pipelines/add"), rt.addEventPipeline)

View File

@@ -13,6 +13,7 @@ import (
"github.com/gin-gonic/gin"
"github.com/toolkits/pkg/ginx"
"github.com/toolkits/pkg/logger"
)
func getUserGroupIds(ctx *gin.Context, rt *Router, myGroups bool) ([]int64, error) {
@@ -305,3 +306,123 @@ func (rt *Router) alertCurEventDelByHash(c *gin.Context) {
hash := ginx.QueryStr(c, "hash")
ginx.NewRender(c).Message(models.AlertCurEventDelByHash(rt.Ctx, hash))
}
func (rt *Router) eventTagKeys(c *gin.Context) {
// 获取最近1天的活跃告警事件
now := time.Now().Unix()
stime := now - 24*3600
etime := now
// 获取用户可见的业务组ID列表
bgids, err := GetBusinessGroupIds(c, rt.Ctx, rt.Center.EventHistoryGroupView, false)
if err != nil {
logger.Warningf("failed to get business group ids: %v", err)
ginx.NewRender(c).Data([]string{"ident", "app", "service", "instance"}, nil)
return
}
// 查询活跃告警事件,限制数量以提高性能
events, err := models.AlertCurEventsGet(rt.Ctx, []string{}, bgids, stime, etime, []int64{}, []int64{}, []string{}, 0, "", 200, 0, []int64{})
if err != nil {
logger.Warningf("failed to get current alert events: %v", err)
ginx.NewRender(c).Data([]string{"ident", "app", "service", "instance"}, nil)
return
}
// 如果没有查到事件,返回默认标签
if len(events) == 0 {
ginx.NewRender(c).Data([]string{"ident", "app", "service", "instance"}, nil)
return
}
// 收集所有标签键并去重
tagKeys := make(map[string]struct{})
for _, event := range events {
for key := range event.TagsMap {
tagKeys[key] = struct{}{}
}
}
// 转换为字符串切片
var result []string
for key := range tagKeys {
result = append(result, key)
}
// 如果没有收集到任何标签键,返回默认值
if len(result) == 0 {
result = []string{"ident", "app", "service", "instance"}
}
ginx.NewRender(c).Data(result, nil)
}
func (rt *Router) eventTagValues(c *gin.Context) {
// 获取标签key
tagKey := ginx.QueryStr(c, "key")
// 获取最近1天的活跃告警事件
now := time.Now().Unix()
stime := now - 24*3600
etime := now
// 获取用户可见的业务组ID列表
bgids, err := GetBusinessGroupIds(c, rt.Ctx, rt.Center.EventHistoryGroupView, false)
if err != nil {
logger.Warningf("failed to get business group ids: %v", err)
ginx.NewRender(c).Data([]string{}, nil)
return
}
// 查询活跃告警事件,获取更多数据以保证统计准确性
events, err := models.AlertCurEventsGet(rt.Ctx, []string{}, bgids, stime, etime, []int64{}, []int64{}, []string{}, 0, "", 1000, 0, []int64{})
if err != nil {
logger.Warningf("failed to get current alert events: %v", err)
ginx.NewRender(c).Data([]string{}, nil)
return
}
// 如果没有查到事件,返回空数组
if len(events) == 0 {
ginx.NewRender(c).Data([]string{}, nil)
return
}
// 统计标签值出现次数
valueCount := make(map[string]int)
for _, event := range events {
// TagsMap已经在AlertCurEventsGet中处理直接使用
if value, exists := event.TagsMap[tagKey]; exists && value != "" {
valueCount[value]++
}
}
// 转换为切片并按出现次数降序排序
type tagValue struct {
value string
count int
}
tagValues := make([]tagValue, 0, len(valueCount))
for value, count := range valueCount {
tagValues = append(tagValues, tagValue{value, count})
}
// 按出现次数降序排序
sort.Slice(tagValues, func(i, j int) bool {
return tagValues[i].count > tagValues[j].count
})
// 只取Top20并转换为字符串数组
limit := 20
if len(tagValues) < limit {
limit = len(tagValues)
}
result := make([]string, 0, limit)
for i := 0; i < limit; i++ {
result = append(result, tagValues[i].value)
}
ginx.NewRender(c).Data(result, nil)
}

View File

@@ -62,11 +62,11 @@ func (rt *Router) alertHisEventsList(c *gin.Context) {
ginx.Dangerous(err)
total, err := models.AlertHisEventTotal(rt.Ctx, prods, bgids, stime, etime, severity,
recovered, dsIds, cates, ruleId, query)
recovered, dsIds, cates, ruleId, query, []int64{})
ginx.Dangerous(err)
list, err := models.AlertHisEventGets(rt.Ctx, prods, bgids, stime, etime, severity, recovered,
dsIds, cates, ruleId, query, limit, ginx.Offset(c, limit))
dsIds, cates, ruleId, query, limit, ginx.Offset(c, limit), []int64{})
ginx.Dangerous(err)
cache := make(map[int64]*models.UserGroup)

View File

@@ -8,6 +8,7 @@ import (
"github.com/gin-gonic/gin"
"github.com/toolkits/pkg/ginx"
"github.com/toolkits/pkg/i18n"
)
// 获取事件Pipeline列表
@@ -139,12 +140,14 @@ func (rt *Router) tryRunEventPipeline(c *gin.Context) {
}
event := hisEvent.ToCur()
lang := c.GetHeader("X-Language")
var result string
for _, p := range f.PipelineConfig.ProcessorConfigs {
processor, err := models.GetProcessorByType(p.Typ, p.Config)
if err != nil {
ginx.Bomb(http.StatusBadRequest, "get processor: %+v err: %+v", p, err)
}
event, _, err = processor.Process(rt.Ctx, event)
event, result, err = processor.Process(rt.Ctx, event)
if err != nil {
ginx.Bomb(http.StatusBadRequest, "processor: %+v err: %+v", p, err)
}
@@ -152,7 +155,7 @@ func (rt *Router) tryRunEventPipeline(c *gin.Context) {
if event == nil {
ginx.NewRender(c).Data(map[string]interface{}{
"event": event,
"result": "event is dropped",
"result": i18n.Sprintf(lang, "event is dropped"),
}, nil)
return
}
@@ -160,7 +163,7 @@ func (rt *Router) tryRunEventPipeline(c *gin.Context) {
m := map[string]interface{}{
"event": event,
"result": "",
"result": i18n.Sprintf(lang, result),
}
ginx.NewRender(c).Data(m, nil)
}
@@ -188,9 +191,10 @@ func (rt *Router) tryRunEventProcessor(c *gin.Context) {
ginx.Bomb(200, "processor err: %+v", err)
}
lang := c.GetHeader("X-Language")
ginx.NewRender(c).Data(map[string]interface{}{
"event": event,
"result": res,
"result": i18n.Sprintf(lang, res),
}, nil)
}
@@ -231,9 +235,10 @@ func (rt *Router) tryRunEventProcessorByNotifyRule(c *gin.Context) {
ginx.Bomb(http.StatusBadRequest, "processor: %+v err: %+v", p, err)
}
if event == nil {
lang := c.GetHeader("X-Language")
ginx.NewRender(c).Data(map[string]interface{}{
"event": event,
"result": "event is dropped",
"result": i18n.Sprintf(lang, "event is dropped"),
}, nil)
return
}

View File

@@ -196,6 +196,7 @@ func (rt *Router) eventsMessage(c *gin.Context) {
var defs = []string{
"{{$events := .}}",
"{{$event := index . 0}}",
"{{$aggr_key := \"\"}}",
}
ret := make(map[string]string, len(req.Tpl.Content))
for k, v := range req.Tpl.Content {

View File

@@ -78,6 +78,8 @@ type AlertCurEvent struct {
RuleHash string `json:"rule_hash" gorm:"-"`
ExtraInfoMap []map[string]string `json:"extra_info_map" gorm:"-"`
NotifyRuleIds []int64 `json:"notify_rule_ids" gorm:"serializer:json"`
NotifyRuleId int64 `json:"notify_rule_id" gorm:"-"`
NotifyRuleName string `json:"notify_rule_name" gorm:"-"`
NotifyVersion int `json:"notify_version" gorm:"-"` // 0: old, 1: new
NotifyRules []*EventNotifyRule `json:"notify_rules" gorm:"-"`

View File

@@ -127,7 +127,7 @@ func (e *AlertHisEvent) FillNotifyGroups(ctx *ctx.Context, cache map[int64]*User
func AlertHisEventTotal(
ctx *ctx.Context, prods []string, bgids []int64, stime, etime int64, severity int,
recovered int, dsIds []int64, cates []string, ruleId int64, query string) (int64, error) {
recovered int, dsIds []int64, cates []string, ruleId int64, query string, eventIds []int64) (int64, error) {
session := DB(ctx).Model(&AlertHisEvent{}).Where("last_eval_time between ? and ?", stime, etime)
if len(prods) > 0 {
@@ -158,6 +158,10 @@ func AlertHisEventTotal(
session = session.Where("rule_id = ?", ruleId)
}
if len(eventIds) > 0 {
session = session.Where("id in ?", eventIds)
}
if query != "" {
arr := strings.Fields(query)
for i := 0; i < len(arr); i++ {
@@ -171,7 +175,7 @@ func AlertHisEventTotal(
func AlertHisEventGets(ctx *ctx.Context, prods []string, bgids []int64, stime, etime int64,
severity int, recovered int, dsIds []int64, cates []string, ruleId int64, query string,
limit, offset int) ([]AlertHisEvent, error) {
limit, offset int, eventIds []int64) ([]AlertHisEvent, error) {
session := DB(ctx).Where("last_eval_time between ? and ?", stime, etime)
if len(prods) != 0 {
@@ -202,6 +206,10 @@ func AlertHisEventGets(ctx *ctx.Context, prods []string, bgids []int64, stime, e
session = session.Where("rule_id = ?", ruleId)
}
if len(eventIds) > 0 {
session = session.Where("id in ?", eventIds)
}
if query != "" {
arr := strings.Fields(query)
for i := 0; i < len(arr); i++ {

View File

@@ -20,7 +20,7 @@ type TagFilter struct {
Key string `json:"key"` // tag key
Func string `json:"func"` // `==` | `=~` | `in` | `!=` | `!~` | `not in`
Op string `json:"op"` // `==` | `=~` | `in` | `!=` | `!~` | `not in`
Value string `json:"value"` // tag value
Value interface{} `json:"value"` // tag value
Regexp *regexp.Regexp // parse value to regexp if func = '=~' or '!~'
Vset map[string]struct{} // parse value to regexp if func = 'in' or 'not in'
}
@@ -46,15 +46,59 @@ func ParseTagFilter(bFilters []TagFilter) ([]TagFilter, error) {
var err error
for i := 0; i < len(bFilters); i++ {
if bFilters[i].Func == "=~" || bFilters[i].Func == "!~" {
bFilters[i].Regexp, err = regexp.Compile(bFilters[i].Value)
// 这里存在两个情况,一个是 string 一个是 int
var pattern string
switch v := bFilters[i].Value.(type) {
case string:
pattern = v
case int:
pattern = strconv.Itoa(v)
default:
return nil, fmt.Errorf("unsupported value type for regex: %T", v)
}
bFilters[i].Regexp, err = regexp.Compile(pattern)
if err != nil {
return nil, err
}
} else if bFilters[i].Func == "in" || bFilters[i].Func == "not in" {
arr := strings.Fields(bFilters[i].Value)
// 这里存在两个情况,一个是 string 一个是[]int
bFilters[i].Vset = make(map[string]struct{})
for j := 0; j < len(arr); j++ {
bFilters[i].Vset[arr[j]] = struct{}{}
switch v := bFilters[i].Value.(type) {
case string:
// 处理字符串情况
arr := strings.Fields(v)
for j := 0; j < len(arr); j++ {
bFilters[i].Vset[arr[j]] = struct{}{}
}
case []int:
// 处理[]int情况
for j := 0; j < len(v); j++ {
bFilters[i].Vset[strconv.Itoa(v[j])] = struct{}{}
}
case []string:
for j := 0; j < len(v); j++ {
bFilters[i].Vset[v[j]] = struct{}{}
}
case []interface{}:
// 处理[]interface{}情况JSON解析可能产生
for j := 0; j < len(v); j++ {
switch item := v[j].(type) {
case string:
bFilters[i].Vset[item] = struct{}{}
case int:
bFilters[i].Vset[strconv.Itoa(item)] = struct{}{}
case float64:
bFilters[i].Vset[strconv.Itoa(int(item))] = struct{}{}
}
}
default:
// 兜底处理,转为字符串
str := fmt.Sprintf("%v", v)
arr := strings.Fields(str)
for j := 0; j < len(arr); j++ {
bFilters[i].Vset[arr[j]] = struct{}{}
}
}
}
}
@@ -73,15 +117,54 @@ func GetTagFilters(jsonArr ormx.JSONArr) ([]TagFilter, error) {
}
for i := 0; i < len(bFilters); i++ {
if bFilters[i].Func == "=~" || bFilters[i].Func == "!~" {
bFilters[i].Regexp, err = regexp.Compile(bFilters[i].Value)
var pattern string
switch v := bFilters[i].Value.(type) {
case string:
pattern = v
case int:
pattern = strconv.Itoa(v)
default:
return nil, fmt.Errorf("unsupported value type for regex: %T", v)
}
bFilters[i].Regexp, err = regexp.Compile(pattern)
if err != nil {
return nil, err
}
} else if bFilters[i].Func == "in" || bFilters[i].Func == "not in" {
arr := strings.Fields(bFilters[i].Value)
bFilters[i].Vset = make(map[string]struct{})
for j := 0; j < len(arr); j++ {
bFilters[i].Vset[arr[j]] = struct{}{}
// 在GetTagFilters中Value通常是string类型但也要处理其他可能的类型
switch v := bFilters[i].Value.(type) {
case string:
// 处理字符串情况
arr := strings.Fields(v)
for j := 0; j < len(arr); j++ {
bFilters[i].Vset[arr[j]] = struct{}{}
}
case []int:
// 处理[]int情况
for j := 0; j < len(v); j++ {
bFilters[i].Vset[strconv.Itoa(v[j])] = struct{}{}
}
case []interface{}:
// 处理[]interface{}情况JSON解析可能产生
for j := 0; j < len(v); j++ {
switch item := v[j].(type) {
case string:
bFilters[i].Vset[item] = struct{}{}
case int:
bFilters[i].Vset[strconv.Itoa(item)] = struct{}{}
case float64:
bFilters[i].Vset[strconv.Itoa(int(item))] = struct{}{}
}
}
default:
// 兜底处理,转为字符串
str := fmt.Sprintf("%v", v)
arr := strings.Fields(str)
for j := 0; j < len(arr); j++ {
bFilters[i].Vset[arr[j]] = struct{}{}
}
}
}
}

View File

@@ -5,6 +5,7 @@ import (
"fmt"
"html/template"
"regexp"
"sort"
"strings"
texttemplate "text/template"
"time"
@@ -15,6 +16,7 @@ import (
"github.com/pkg/errors"
"github.com/toolkits/pkg/logger"
"github.com/toolkits/pkg/str"
)
// MessageTemplate 消息模板结构
@@ -714,10 +716,26 @@ func (t *MessageTemplate) Upsert(ctx *ctx.Context, ident string) error {
return tpl.Update(ctx, *t)
}
func GetAggrKey(events []*AlertCurEvent) string {
if len(events) <= 1 {
return ""
}
ids := make([]string, 0)
for i := range len(events) {
ids = append(ids, fmt.Sprintf("%d", events[i].Id))
}
sort.Strings(ids)
idsStr := strings.Join(ids, ",")
logger.Debugf("notify_hook aggr_key: %s, ids: %v", str.MD5(idsStr), ids)
return str.MD5(idsStr)
}
func (t *MessageTemplate) RenderEvent(events []*AlertCurEvent) map[string]interface{} {
if t == nil {
return nil
}
// event 内容渲染到 messageTemplate
tplContent := make(map[string]interface{})
for key, msgTpl := range t.Content {
@@ -728,6 +746,8 @@ func (t *MessageTemplate) RenderEvent(events []*AlertCurEvent) map[string]interf
"{{ $value := $event.TriggerValue }}",
}
defs = append(defs, fmt.Sprintf("{{ $aggrkey := \"%s\" }}", GetAggrKey(events)))
var body bytes.Buffer
if t.NotifyChannelIdent == "email" {
text := strings.Join(append(defs, msgTpl), "")

View File

@@ -444,6 +444,7 @@ type NotifyRule struct {
UserGroupIds []int64 `gorm:"column:user_group_ids;type:varchar(255)"`
NotifyConfigs []models.NotifyConfig `gorm:"column:notify_configs;type:text"`
PipelineConfigs []models.PipelineConfig `gorm:"column:pipeline_configs;type:text"`
ExtraConfig interface{} `gorm:"column:extra_config;type:text"`
CreateAt int64 `gorm:"column:create_at;not null;default:0"`
CreateBy string `gorm:"column:create_by;type:varchar(64);not null;default:''"`
UpdateAt int64 `gorm:"column:update_at;not null;default:0"`

View File

@@ -1,7 +1,10 @@
package models
import (
"crypto/sha256"
"encoding/hex"
"errors"
"fmt"
"time"
"github.com/ccfos/nightingale/v6/pkg/ctx"
@@ -19,6 +22,7 @@ type NotifyRule struct {
// 通知配置
NotifyConfigs []NotifyConfig `json:"notify_configs" gorm:"serializer:json"`
ExtraConfig interface{} `json:"extra_config" gorm:"serializer:json"`
CreateAt int64 `json:"create_at"`
CreateBy string `json:"create_by"`
@@ -39,6 +43,7 @@ type NotifyConfig struct {
ChannelID int64 `json:"channel_id"` // 通知媒介(如:阿里云短信)
TemplateID int64 `json:"template_id"` // 通知模板
Params map[string]interface{} `json:"params"` // 通知参数
Type string `json:"type"`
Severities []int `json:"severities"` // 适用级别(一级告警、二级告警、三级告警)
TimeRanges []TimeRanges `json:"time_ranges"` // 适用时段
@@ -46,6 +51,12 @@ type NotifyConfig struct {
Attributes []TagFilter `json:"attributes"` // 适用属性
}
func (n *NotifyConfig) Hash() string {
hash := sha256.New()
hash.Write([]byte(fmt.Sprintf("%d%d%v%s%v%v%v%v", n.ChannelID, n.TemplateID, n.Params, n.Type, n.Severities, n.TimeRanges, n.LabelKeys, n.Attributes)))
return hex.EncodeToString(hash.Sum(nil))
}
type CustomParams struct {
UserIDs []int64 `json:"user_ids"`
UserGroupIDs []int64 `json:"user_group_ids"`
@@ -75,11 +86,6 @@ func GetNotifyRule(c *ctx.Context, id int64) (*NotifyRule, error) {
return &rule, nil
}
// 更新 NotifyRule
func UpdateNotifyRule(c *ctx.Context, rule *NotifyRule) error {
return DB(c).Save(rule).Error
}
// 删除 NotifyRule
func DeleteNotifyRule(c *ctx.Context, id int64) error {
return DB(c).Delete(&NotifyRule{}, id).Error

View File

@@ -79,6 +79,11 @@ var I18N = `{
"event attributes not match attributes filter": "事件属性不匹配属性过滤器",
"failed to parse tag filter: %v": "解析标签过滤器失败: %v",
"event is dropped": "事件已被丢弃,不会进行通知",
"drop event success": "丢弃事件成功",
"drop event failed": "丢弃事件失败",
"callback success": "回调成功",
"Infrastructure": "基础设施",
"Host - View": "机器 - 查看",
"Host - Modify": "机器 - 修改",
@@ -269,7 +274,11 @@ var I18N = `{
"event tag not match tag filter": "事件標籤不匹配標籤過濾器",
"event attributes not match attributes filter": "事件屬性不匹配屬性過濾器",
"failed to parse tag filter: %v": "解析標籤過濾器失敗: %v",
"event is dropped": "事件已被丟棄,不會進行通知",
"drop event success": "丟棄事件成功",
"drop event failed": "丟棄事件失敗",
"callback success": "回調成功",
"Infrastructure": "基礎設施",
"Host - View": "機器 - 查看",
"Host - Modify": "機器 - 修改",
@@ -457,7 +466,11 @@ var I18N = `{
"event tag not match tag filter": "イベントタグがタグフィルタと一致しません",
"event attributes not match attributes filter": "イベント属性が属性フィルタと一致しません",
"failed to parse tag filter: %v": "タグフィルタの解析に失敗しました: %v",
"event is dropped": "イベントが破棄されました,通知は行われません",
"drop event success": "イベント破棄成功",
"drop event failed": "イベント破棄失敗",
"callback success": "コールバック成功",
"Infrastructure": "インフラストラクチャ",
"Host - View": "機器 - 閲覧",
"Host - Modify": "機器 - 修正",
@@ -645,7 +658,11 @@ var I18N = `{
"event tag not match tag filter": "Теги события не соответствуют фильтру тегов",
"event attributes not match attributes filter": "Атрибуты события не соответствуют фильтру атрибутов",
"failed to parse tag filter: %v": "Не удалось разобрать фильтр тегов: %v",
"event is dropped": "Событие отброшено, уведомление не будет отправлено",
"drop event success": "Событие успешно отброшено",
"drop event failed": "Не удалось отбросить событие",
"callback success": "Обратный вызов успешен",
"Infrastructure": "Инфраструктура",
"Host - View": "Хост - Просмотр",
"Host - Modify": "Хост - Изменить",