Compare commits

...

5 Commits

Author SHA1 Message Date
ning
c205cd302b refactor: events query 2025-09-02 10:20:57 +08:00
ning
d35d8c52e1 update event noitfy 2025-09-02 10:20:57 +08:00
ning
751ec84dae update notify 2025-09-02 10:20:57 +08:00
Yening Qin
e1594c59fb refactor: dscache sync add datasource process hook (#2792) (#2842) 2025-08-26 15:55:34 +08:00
Yening Qin
7df0c417e9 update notify (#2836) 2025-08-22 16:15:36 +08:00
15 changed files with 323 additions and 47 deletions

View File

@@ -35,9 +35,9 @@ func MatchGroupsName(groupName string, groupFilter []models.TagFilter) bool {
func matchTag(value string, filter models.TagFilter) bool {
switch filter.Func {
case "==":
return strings.TrimSpace(filter.Value) == strings.TrimSpace(value)
return strings.TrimSpace(fmt.Sprintf("%v", filter.Value)) == strings.TrimSpace(value)
case "!=":
return strings.TrimSpace(filter.Value) != strings.TrimSpace(value)
return strings.TrimSpace(fmt.Sprintf("%v", filter.Value)) != strings.TrimSpace(value)
case "in":
_, has := filter.Vset[value]
return has

View File

@@ -110,10 +110,6 @@ func (e *Consumer) consumeOne(event *models.AlertCurEvent) {
e.persist(event)
if event.IsRecovered && event.NotifyRecovered == 0 {
return
}
e.dispatch.HandleEventNotify(event, false)
}

View File

@@ -45,6 +45,7 @@ type Dispatch struct {
tpls map[string]*template.Template
ExtraSenders map[string]sender.Sender
BeforeSenderHook func(*models.AlertCurEvent) bool
NotifyHook func(*models.AlertCurEvent, int64) bool
ctx *ctx.Context
Astats *astats.Stats
@@ -76,6 +77,7 @@ func NewDispatch(alertRuleCache *memsto.AlertRuleCacheType, userCache *memsto.Us
tpls: make(map[string]*template.Template),
ExtraSenders: make(map[string]sender.Sender),
BeforeSenderHook: func(*models.AlertCurEvent) bool { return true },
NotifyHook: func(*models.AlertCurEvent, int64) bool { return false },
ctx: ctx,
Astats: astats,
@@ -166,6 +168,8 @@ func (e *Dispatch) HandleEventWithNotifyRule(eventOrigin *models.AlertCurEvent)
if !notifyRule.Enable {
continue
}
eventCopy.NotifyRuleId = notifyRuleId
eventCopy.NotifyRuleName = notifyRule.Name
var processors []models.Processor
for _, pipelineConfig := range notifyRule.PipelineConfigs {
@@ -200,10 +204,24 @@ func (e *Dispatch) HandleEventWithNotifyRule(eventOrigin *models.AlertCurEvent)
}
if eventCopy == nil {
sender.NotifyRecord(e.ctx, []*models.AlertCurEvent{eventOrigin}, notifyRuleId, "", "", "", errors.New("drop by processor"))
// 如果 eventCopy 为 nil说明 eventCopy 被 processor drop 掉了, 不再发送通知
continue
}
if e.NotifyHook(eventCopy, notifyRuleId) {
logger.Debugf("notify_id: %d, event:%+v, notify_hook return true", notifyRuleId, eventCopy)
continue
}
if eventCopy.IsRecovered && eventCopy.NotifyRecovered == 0 {
// 如果 eventCopy 是恢复事件,且 NotifyRecovered 为 0则不发送通知
continue
}
logger.Debugf("notify_id: %d, event:%+v, notify_hook return false", notifyRuleId, eventCopy)
// notify
for i := range notifyRule.NotifyConfigs {
err := NotifyRuleMatchCheck(&notifyRule.NotifyConfigs[i], eventCopy)
@@ -227,9 +245,7 @@ func (e *Dispatch) HandleEventWithNotifyRule(eventOrigin *models.AlertCurEvent)
continue
}
// todo go send
// todo 聚合 event
go e.sendV2([]*models.AlertCurEvent{eventCopy}, notifyRuleId, &notifyRule.NotifyConfigs[i], notifyChannel, messageTemplate)
go SendV2(e.ctx, e.userCache, e.userGroupCache, e.notifyChannelCache, []*models.AlertCurEvent{eventCopy}, notifyRuleId, &notifyRule.NotifyConfigs[i], notifyChannel, messageTemplate)
}
}
}
@@ -445,7 +461,8 @@ func GetNotifyConfigParams(notifyConfig *models.NotifyConfig, contactKey string,
return sendtos, flashDutyChannelIDs, customParams
}
func (e *Dispatch) sendV2(events []*models.AlertCurEvent, notifyRuleId int64, notifyConfig *models.NotifyConfig, notifyChannel *models.NotifyChannelConfig, messageTemplate *models.MessageTemplate) {
func SendV2(ctx *ctx.Context, userCache *memsto.UserCacheType, userGroupCache *memsto.UserGroupCacheType, notifyChannelCache *memsto.NotifyChannelCacheType,
events []*models.AlertCurEvent, notifyRuleId int64, notifyConfig *models.NotifyConfig, notifyChannel *models.NotifyChannelConfig, messageTemplate *models.MessageTemplate) {
if len(events) == 0 {
logger.Errorf("notify_id: %d events is empty", notifyRuleId)
return
@@ -461,10 +478,7 @@ func (e *Dispatch) sendV2(events []*models.AlertCurEvent, notifyRuleId int64, no
contactKey = notifyChannel.ParamConfig.UserInfo.ContactKey
}
sendtos, flashDutyChannelIDs, customParams := GetNotifyConfigParams(notifyConfig, contactKey, e.userCache, e.userGroupCache)
e.Astats.GaugeNotifyRecordQueueSize.Inc()
defer e.Astats.GaugeNotifyRecordQueueSize.Dec()
sendtos, flashDutyChannelIDs, customParams := GetNotifyConfigParams(notifyConfig, contactKey, userCache, userGroupCache)
switch notifyChannel.RequestType {
case "flashduty":
@@ -474,10 +488,10 @@ func (e *Dispatch) sendV2(events []*models.AlertCurEvent, notifyRuleId int64, no
for i := range flashDutyChannelIDs {
start := time.Now()
respBody, err := notifyChannel.SendFlashDuty(events, flashDutyChannelIDs[i], e.notifyChannelCache.GetHttpClient(notifyChannel.ID))
respBody, err := notifyChannel.SendFlashDuty(events, flashDutyChannelIDs[i], notifyChannelCache.GetHttpClient(notifyChannel.ID))
respBody = fmt.Sprintf("duration: %d ms %s", time.Since(start).Milliseconds(), respBody)
logger.Infof("notify_id: %d, channel_name: %v, event:%+v, IntegrationUrl: %v dutychannel_id: %v, respBody: %v, err: %v", notifyRuleId, notifyChannel.Name, events[0], notifyChannel.RequestConfig.FlashDutyRequestConfig.IntegrationUrl, flashDutyChannelIDs[i], respBody, err)
sender.NotifyRecord(e.ctx, events, notifyRuleId, notifyChannel.Name, strconv.FormatInt(flashDutyChannelIDs[i], 10), respBody, err)
sender.NotifyRecord(ctx, events, notifyRuleId, notifyChannel.Name, strconv.FormatInt(flashDutyChannelIDs[i], 10), respBody, err)
}
case "http":
@@ -493,22 +507,22 @@ func (e *Dispatch) sendV2(events []*models.AlertCurEvent, notifyRuleId int64, no
}
// 将任务加入队列
success := e.notifyChannelCache.EnqueueNotifyTask(task)
success := notifyChannelCache.EnqueueNotifyTask(task)
if !success {
logger.Errorf("failed to enqueue notify task for channel %d, notify_id: %d", notifyChannel.ID, notifyRuleId)
// 如果入队失败,记录错误通知
sender.NotifyRecord(e.ctx, events, notifyRuleId, notifyChannel.Name, getSendTarget(customParams, sendtos), "", errors.New("failed to enqueue notify task, queue is full"))
sender.NotifyRecord(ctx, events, notifyRuleId, notifyChannel.Name, getSendTarget(customParams, sendtos), "", errors.New("failed to enqueue notify task, queue is full"))
}
case "smtp":
notifyChannel.SendEmail(notifyRuleId, events, tplContent, sendtos, e.notifyChannelCache.GetSmtpClient(notifyChannel.ID))
notifyChannel.SendEmail(notifyRuleId, events, tplContent, sendtos, notifyChannelCache.GetSmtpClient(notifyChannel.ID))
case "script":
start := time.Now()
target, res, err := notifyChannel.SendScript(events, tplContent, customParams, sendtos)
res = fmt.Sprintf("duration: %d ms %s", time.Since(start).Milliseconds(), res)
logger.Infof("notify_id: %d, channel_name: %v, event:%+v, tplContent:%s, customParams:%v, target:%s, res:%s, err:%v", notifyRuleId, notifyChannel.Name, events[0], tplContent, customParams, target, res, err)
sender.NotifyRecord(e.ctx, events, notifyRuleId, notifyChannel.Name, target, res, err)
sender.NotifyRecord(ctx, events, notifyRuleId, notifyChannel.Name, target, res, err)
default:
logger.Warningf("notify_id: %d, channel_name: %v, event:%+v send type not found", notifyRuleId, notifyChannel.Name, events[0])
}
@@ -523,6 +537,11 @@ func NeedBatchContacts(requestConfig *models.HTTPRequestConfig) bool {
// event: 告警/恢复事件
// isSubscribe: 告警事件是否由subscribe的配置产生
func (e *Dispatch) HandleEventNotify(event *models.AlertCurEvent, isSubscribe bool) {
go e.HandleEventWithNotifyRule(event)
if event.IsRecovered && event.NotifyRecovered == 0 {
return
}
rule := e.alertRuleCache.Get(event.RuleId)
if rule == nil {
return
@@ -555,7 +574,6 @@ func (e *Dispatch) HandleEventNotify(event *models.AlertCurEvent, isSubscribe bo
notifyTarget.AndMerge(handler(rule, event, notifyTarget, e))
}
go e.HandleEventWithNotifyRule(event)
go e.Send(rule, event, notifyTarget, isSubscribe)
// 如果是不是订阅规则出现的event, 则需要处理订阅规则的event

View File

@@ -543,6 +543,9 @@ func (rt *Router) Config(r *gin.Engine) {
pages.GET("/notify-rule/custom-params", rt.auth(), rt.user(), rt.perm("/notification-rules"), rt.notifyRuleCustomParamsGet)
pages.POST("/notify-rule/event-pipelines-tryrun", rt.auth(), rt.user(), rt.perm("/notification-rules/add"), rt.tryRunEventProcessorByNotifyRule)
pages.GET("/event-tagkeys", rt.auth(), rt.user(), rt.eventTagKeys)
pages.GET("/event-tagvalues", rt.auth(), rt.user(), rt.eventTagValues)
// 事件Pipeline相关路由
pages.GET("/event-pipelines", rt.auth(), rt.user(), rt.perm("/event-pipelines"), rt.eventPipelinesList)
pages.POST("/event-pipeline", rt.auth(), rt.user(), rt.perm("/event-pipelines/add"), rt.addEventPipeline)

View File

@@ -13,6 +13,7 @@ import (
"github.com/gin-gonic/gin"
"github.com/toolkits/pkg/ginx"
"github.com/toolkits/pkg/logger"
)
func getUserGroupIds(ctx *gin.Context, rt *Router, myGroups bool) ([]int64, error) {
@@ -305,3 +306,123 @@ func (rt *Router) alertCurEventDelByHash(c *gin.Context) {
hash := ginx.QueryStr(c, "hash")
ginx.NewRender(c).Message(models.AlertCurEventDelByHash(rt.Ctx, hash))
}
func (rt *Router) eventTagKeys(c *gin.Context) {
// 获取最近1天的活跃告警事件
now := time.Now().Unix()
stime := now - 24*3600
etime := now
// 获取用户可见的业务组ID列表
bgids, err := GetBusinessGroupIds(c, rt.Ctx, rt.Center.EventHistoryGroupView, false)
if err != nil {
logger.Warningf("failed to get business group ids: %v", err)
ginx.NewRender(c).Data([]string{"ident", "app", "service", "instance"}, nil)
return
}
// 查询活跃告警事件,限制数量以提高性能
events, err := models.AlertCurEventsGet(rt.Ctx, []string{}, bgids, stime, etime, []int64{}, []int64{}, []string{}, 0, "", 200, 0, []int64{})
if err != nil {
logger.Warningf("failed to get current alert events: %v", err)
ginx.NewRender(c).Data([]string{"ident", "app", "service", "instance"}, nil)
return
}
// 如果没有查到事件,返回默认标签
if len(events) == 0 {
ginx.NewRender(c).Data([]string{"ident", "app", "service", "instance"}, nil)
return
}
// 收集所有标签键并去重
tagKeys := make(map[string]struct{})
for _, event := range events {
for key := range event.TagsMap {
tagKeys[key] = struct{}{}
}
}
// 转换为字符串切片
var result []string
for key := range tagKeys {
result = append(result, key)
}
// 如果没有收集到任何标签键,返回默认值
if len(result) == 0 {
result = []string{"ident", "app", "service", "instance"}
}
ginx.NewRender(c).Data(result, nil)
}
func (rt *Router) eventTagValues(c *gin.Context) {
// 获取标签key
tagKey := ginx.QueryStr(c, "key")
// 获取最近1天的活跃告警事件
now := time.Now().Unix()
stime := now - 24*3600
etime := now
// 获取用户可见的业务组ID列表
bgids, err := GetBusinessGroupIds(c, rt.Ctx, rt.Center.EventHistoryGroupView, false)
if err != nil {
logger.Warningf("failed to get business group ids: %v", err)
ginx.NewRender(c).Data([]string{}, nil)
return
}
// 查询活跃告警事件,获取更多数据以保证统计准确性
events, err := models.AlertCurEventsGet(rt.Ctx, []string{}, bgids, stime, etime, []int64{}, []int64{}, []string{}, 0, "", 1000, 0, []int64{})
if err != nil {
logger.Warningf("failed to get current alert events: %v", err)
ginx.NewRender(c).Data([]string{}, nil)
return
}
// 如果没有查到事件,返回空数组
if len(events) == 0 {
ginx.NewRender(c).Data([]string{}, nil)
return
}
// 统计标签值出现次数
valueCount := make(map[string]int)
for _, event := range events {
// TagsMap已经在AlertCurEventsGet中处理直接使用
if value, exists := event.TagsMap[tagKey]; exists && value != "" {
valueCount[value]++
}
}
// 转换为切片并按出现次数降序排序
type tagValue struct {
value string
count int
}
tagValues := make([]tagValue, 0, len(valueCount))
for value, count := range valueCount {
tagValues = append(tagValues, tagValue{value, count})
}
// 按出现次数降序排序
sort.Slice(tagValues, func(i, j int) bool {
return tagValues[i].count > tagValues[j].count
})
// 只取Top20并转换为字符串数组
limit := 20
if len(tagValues) < limit {
limit = len(tagValues)
}
result := make([]string, 0, limit)
for i := 0; i < limit; i++ {
result = append(result, tagValues[i].value)
}
ginx.NewRender(c).Data(result, nil)
}

View File

@@ -62,11 +62,11 @@ func (rt *Router) alertHisEventsList(c *gin.Context) {
ginx.Dangerous(err)
total, err := models.AlertHisEventTotal(rt.Ctx, prods, bgids, stime, etime, severity,
recovered, dsIds, cates, ruleId, query)
recovered, dsIds, cates, ruleId, query, []int64{})
ginx.Dangerous(err)
list, err := models.AlertHisEventGets(rt.Ctx, prods, bgids, stime, etime, severity, recovered,
dsIds, cates, ruleId, query, limit, ginx.Offset(c, limit))
dsIds, cates, ruleId, query, limit, ginx.Offset(c, limit), []int64{})
ginx.Dangerous(err)
cache := make(map[int64]*models.UserGroup)

View File

@@ -8,6 +8,7 @@ import (
"github.com/gin-gonic/gin"
"github.com/toolkits/pkg/ginx"
"github.com/toolkits/pkg/i18n"
)
// 获取事件Pipeline列表
@@ -139,12 +140,14 @@ func (rt *Router) tryRunEventPipeline(c *gin.Context) {
}
event := hisEvent.ToCur()
lang := c.GetHeader("X-Language")
var result string
for _, p := range f.PipelineConfig.ProcessorConfigs {
processor, err := models.GetProcessorByType(p.Typ, p.Config)
if err != nil {
ginx.Bomb(http.StatusBadRequest, "get processor: %+v err: %+v", p, err)
}
event, _, err = processor.Process(rt.Ctx, event)
event, result, err = processor.Process(rt.Ctx, event)
if err != nil {
ginx.Bomb(http.StatusBadRequest, "processor: %+v err: %+v", p, err)
}
@@ -152,7 +155,7 @@ func (rt *Router) tryRunEventPipeline(c *gin.Context) {
if event == nil {
ginx.NewRender(c).Data(map[string]interface{}{
"event": event,
"result": "event is dropped",
"result": i18n.Sprintf(lang, "event is dropped"),
}, nil)
return
}
@@ -160,7 +163,7 @@ func (rt *Router) tryRunEventPipeline(c *gin.Context) {
m := map[string]interface{}{
"event": event,
"result": "",
"result": i18n.Sprintf(lang, result),
}
ginx.NewRender(c).Data(m, nil)
}
@@ -188,9 +191,10 @@ func (rt *Router) tryRunEventProcessor(c *gin.Context) {
ginx.Bomb(200, "processor err: %+v", err)
}
lang := c.GetHeader("X-Language")
ginx.NewRender(c).Data(map[string]interface{}{
"event": event,
"result": res,
"result": i18n.Sprintf(lang, res),
}, nil)
}
@@ -231,9 +235,10 @@ func (rt *Router) tryRunEventProcessorByNotifyRule(c *gin.Context) {
ginx.Bomb(http.StatusBadRequest, "processor: %+v err: %+v", p, err)
}
if event == nil {
lang := c.GetHeader("X-Language")
ginx.NewRender(c).Data(map[string]interface{}{
"event": event,
"result": "event is dropped",
"result": i18n.Sprintf(lang, "event is dropped"),
}, nil)
return
}

View File

@@ -22,6 +22,8 @@ import (
var FromAPIHook func()
var DatasourceProcessHook func(items []datasource.DatasourceInfo) []datasource.DatasourceInfo
func Init(ctx *ctx.Context, fromAPI bool) {
go getDatasourcesFromDBLoop(ctx, fromAPI)
}
@@ -100,6 +102,10 @@ func getDatasourcesFromDBLoop(ctx *ctx.Context, fromAPI bool) {
atomic.StoreInt64(&PromDefaultDatasourceId, 0)
}
if DatasourceProcessHook != nil {
dss = DatasourceProcessHook(dss)
}
PutDatasources(dss)
} else {
FromAPIHook()

View File

@@ -78,6 +78,8 @@ type AlertCurEvent struct {
RuleHash string `json:"rule_hash" gorm:"-"`
ExtraInfoMap []map[string]string `json:"extra_info_map" gorm:"-"`
NotifyRuleIds []int64 `json:"notify_rule_ids" gorm:"serializer:json"`
NotifyRuleId int64 `json:"notify_rule_id" gorm:"-"`
NotifyRuleName string `json:"notify_rule_name" gorm:"-"`
NotifyVersion int `json:"notify_version" gorm:"-"` // 0: old, 1: new
NotifyRules []*EventNotifyRule `json:"notify_rules" gorm:"-"`

View File

@@ -127,7 +127,7 @@ func (e *AlertHisEvent) FillNotifyGroups(ctx *ctx.Context, cache map[int64]*User
func AlertHisEventTotal(
ctx *ctx.Context, prods []string, bgids []int64, stime, etime int64, severity int,
recovered int, dsIds []int64, cates []string, ruleId int64, query string) (int64, error) {
recovered int, dsIds []int64, cates []string, ruleId int64, query string, eventIds []int64) (int64, error) {
session := DB(ctx).Model(&AlertHisEvent{}).Where("last_eval_time between ? and ?", stime, etime)
if len(prods) > 0 {
@@ -158,6 +158,10 @@ func AlertHisEventTotal(
session = session.Where("rule_id = ?", ruleId)
}
if len(eventIds) > 0 {
session = session.Where("id in ?", eventIds)
}
if query != "" {
arr := strings.Fields(query)
for i := 0; i < len(arr); i++ {
@@ -171,7 +175,7 @@ func AlertHisEventTotal(
func AlertHisEventGets(ctx *ctx.Context, prods []string, bgids []int64, stime, etime int64,
severity int, recovered int, dsIds []int64, cates []string, ruleId int64, query string,
limit, offset int) ([]AlertHisEvent, error) {
limit, offset int, eventIds []int64) ([]AlertHisEvent, error) {
session := DB(ctx).Where("last_eval_time between ? and ?", stime, etime)
if len(prods) != 0 {
@@ -202,6 +206,10 @@ func AlertHisEventGets(ctx *ctx.Context, prods []string, bgids []int64, stime, e
session = session.Where("rule_id = ?", ruleId)
}
if len(eventIds) > 0 {
session = session.Where("id in ?", eventIds)
}
if query != "" {
arr := strings.Fields(query)
for i := 0; i < len(arr); i++ {

View File

@@ -20,7 +20,7 @@ type TagFilter struct {
Key string `json:"key"` // tag key
Func string `json:"func"` // `==` | `=~` | `in` | `!=` | `!~` | `not in`
Op string `json:"op"` // `==` | `=~` | `in` | `!=` | `!~` | `not in`
Value string `json:"value"` // tag value
Value interface{} `json:"value"` // tag value
Regexp *regexp.Regexp // parse value to regexp if func = '=~' or '!~'
Vset map[string]struct{} // parse value to regexp if func = 'in' or 'not in'
}
@@ -46,15 +46,59 @@ func ParseTagFilter(bFilters []TagFilter) ([]TagFilter, error) {
var err error
for i := 0; i < len(bFilters); i++ {
if bFilters[i].Func == "=~" || bFilters[i].Func == "!~" {
bFilters[i].Regexp, err = regexp.Compile(bFilters[i].Value)
// 这里存在两个情况,一个是 string 一个是 int
var pattern string
switch v := bFilters[i].Value.(type) {
case string:
pattern = v
case int:
pattern = strconv.Itoa(v)
default:
return nil, fmt.Errorf("unsupported value type for regex: %T", v)
}
bFilters[i].Regexp, err = regexp.Compile(pattern)
if err != nil {
return nil, err
}
} else if bFilters[i].Func == "in" || bFilters[i].Func == "not in" {
arr := strings.Fields(bFilters[i].Value)
// 这里存在两个情况,一个是 string 一个是[]int
bFilters[i].Vset = make(map[string]struct{})
for j := 0; j < len(arr); j++ {
bFilters[i].Vset[arr[j]] = struct{}{}
switch v := bFilters[i].Value.(type) {
case string:
// 处理字符串情况
arr := strings.Fields(v)
for j := 0; j < len(arr); j++ {
bFilters[i].Vset[arr[j]] = struct{}{}
}
case []int:
// 处理[]int情况
for j := 0; j < len(v); j++ {
bFilters[i].Vset[strconv.Itoa(v[j])] = struct{}{}
}
case []string:
for j := 0; j < len(v); j++ {
bFilters[i].Vset[v[j]] = struct{}{}
}
case []interface{}:
// 处理[]interface{}情况JSON解析可能产生
for j := 0; j < len(v); j++ {
switch item := v[j].(type) {
case string:
bFilters[i].Vset[item] = struct{}{}
case int:
bFilters[i].Vset[strconv.Itoa(item)] = struct{}{}
case float64:
bFilters[i].Vset[strconv.Itoa(int(item))] = struct{}{}
}
}
default:
// 兜底处理,转为字符串
str := fmt.Sprintf("%v", v)
arr := strings.Fields(str)
for j := 0; j < len(arr); j++ {
bFilters[i].Vset[arr[j]] = struct{}{}
}
}
}
}
@@ -73,15 +117,54 @@ func GetTagFilters(jsonArr ormx.JSONArr) ([]TagFilter, error) {
}
for i := 0; i < len(bFilters); i++ {
if bFilters[i].Func == "=~" || bFilters[i].Func == "!~" {
bFilters[i].Regexp, err = regexp.Compile(bFilters[i].Value)
var pattern string
switch v := bFilters[i].Value.(type) {
case string:
pattern = v
case int:
pattern = strconv.Itoa(v)
default:
return nil, fmt.Errorf("unsupported value type for regex: %T", v)
}
bFilters[i].Regexp, err = regexp.Compile(pattern)
if err != nil {
return nil, err
}
} else if bFilters[i].Func == "in" || bFilters[i].Func == "not in" {
arr := strings.Fields(bFilters[i].Value)
bFilters[i].Vset = make(map[string]struct{})
for j := 0; j < len(arr); j++ {
bFilters[i].Vset[arr[j]] = struct{}{}
// 在GetTagFilters中Value通常是string类型但也要处理其他可能的类型
switch v := bFilters[i].Value.(type) {
case string:
// 处理字符串情况
arr := strings.Fields(v)
for j := 0; j < len(arr); j++ {
bFilters[i].Vset[arr[j]] = struct{}{}
}
case []int:
// 处理[]int情况
for j := 0; j < len(v); j++ {
bFilters[i].Vset[strconv.Itoa(v[j])] = struct{}{}
}
case []interface{}:
// 处理[]interface{}情况JSON解析可能产生
for j := 0; j < len(v); j++ {
switch item := v[j].(type) {
case string:
bFilters[i].Vset[item] = struct{}{}
case int:
bFilters[i].Vset[strconv.Itoa(item)] = struct{}{}
case float64:
bFilters[i].Vset[strconv.Itoa(int(item))] = struct{}{}
}
}
default:
// 兜底处理,转为字符串
str := fmt.Sprintf("%v", v)
arr := strings.Fields(str)
for j := 0; j < len(arr); j++ {
bFilters[i].Vset[arr[j]] = struct{}{}
}
}
}
}

View File

@@ -5,6 +5,7 @@ import (
"fmt"
"html/template"
"regexp"
"sort"
"strings"
texttemplate "text/template"
"time"
@@ -15,6 +16,7 @@ import (
"github.com/pkg/errors"
"github.com/toolkits/pkg/logger"
"github.com/toolkits/pkg/str"
)
// MessageTemplate 消息模板结构
@@ -714,10 +716,26 @@ func (t *MessageTemplate) Upsert(ctx *ctx.Context, ident string) error {
return tpl.Update(ctx, *t)
}
func GetAggrKey(events []*AlertCurEvent) string {
if len(events) <= 1 {
return ""
}
ids := make([]string, 0)
for i := range len(events) {
ids = append(ids, fmt.Sprintf("%d", events[i].Id))
}
sort.Strings(ids)
idsStr := strings.Join(ids, ",")
logger.Debugf("notify_hook aggr_key: %s, ids: %v", str.MD5(idsStr), ids)
return str.MD5(idsStr)
}
func (t *MessageTemplate) RenderEvent(events []*AlertCurEvent) map[string]interface{} {
if t == nil {
return nil
}
// event 内容渲染到 messageTemplate
tplContent := make(map[string]interface{})
for key, msgTpl := range t.Content {
@@ -728,6 +746,8 @@ func (t *MessageTemplate) RenderEvent(events []*AlertCurEvent) map[string]interf
"{{ $value := $event.TriggerValue }}",
}
defs = append(defs, fmt.Sprintf("{{ $aggrkey := \"%s\" }}", GetAggrKey(events)))
var body bytes.Buffer
if t.NotifyChannelIdent == "email" {
text := strings.Join(append(defs, msgTpl), "")

View File

@@ -444,6 +444,7 @@ type NotifyRule struct {
UserGroupIds []int64 `gorm:"column:user_group_ids;type:varchar(255)"`
NotifyConfigs []models.NotifyConfig `gorm:"column:notify_configs;type:text"`
PipelineConfigs []models.PipelineConfig `gorm:"column:pipeline_configs;type:text"`
ExtraConfig interface{} `gorm:"column:extra_config;type:text"`
CreateAt int64 `gorm:"column:create_at;not null;default:0"`
CreateBy string `gorm:"column:create_by;type:varchar(64);not null;default:''"`
UpdateAt int64 `gorm:"column:update_at;not null;default:0"`

View File

@@ -19,6 +19,7 @@ type NotifyRule struct {
// 通知配置
NotifyConfigs []NotifyConfig `json:"notify_configs" gorm:"serializer:json"`
ExtraConfig interface{} `json:"extra_config" gorm:"serializer:json"`
CreateAt int64 `json:"create_at"`
CreateBy string `json:"create_by"`
@@ -75,11 +76,6 @@ func GetNotifyRule(c *ctx.Context, id int64) (*NotifyRule, error) {
return &rule, nil
}
// 更新 NotifyRule
func UpdateNotifyRule(c *ctx.Context, rule *NotifyRule) error {
return DB(c).Save(rule).Error
}
// 删除 NotifyRule
func DeleteNotifyRule(c *ctx.Context, id int64) error {
return DB(c).Delete(&NotifyRule{}, id).Error

View File

@@ -79,6 +79,11 @@ var I18N = `{
"event attributes not match attributes filter": "事件属性不匹配属性过滤器",
"failed to parse tag filter: %v": "解析标签过滤器失败: %v",
"event is dropped": "事件已被丢弃,不会进行通知",
"drop event success": "丢弃事件成功",
"drop event failed": "丢弃事件失败",
"callback success": "回调成功",
"Infrastructure": "基础设施",
"Host - View": "机器 - 查看",
"Host - Modify": "机器 - 修改",
@@ -269,7 +274,11 @@ var I18N = `{
"event tag not match tag filter": "事件標籤不匹配標籤過濾器",
"event attributes not match attributes filter": "事件屬性不匹配屬性過濾器",
"failed to parse tag filter: %v": "解析標籤過濾器失敗: %v",
"event is dropped": "事件已被丟棄,不會進行通知",
"drop event success": "丟棄事件成功",
"drop event failed": "丟棄事件失敗",
"callback success": "回調成功",
"Infrastructure": "基礎設施",
"Host - View": "機器 - 查看",
"Host - Modify": "機器 - 修改",
@@ -457,7 +466,11 @@ var I18N = `{
"event tag not match tag filter": "イベントタグがタグフィルタと一致しません",
"event attributes not match attributes filter": "イベント属性が属性フィルタと一致しません",
"failed to parse tag filter: %v": "タグフィルタの解析に失敗しました: %v",
"event is dropped": "イベントが破棄されました,通知は行われません",
"drop event success": "イベント破棄成功",
"drop event failed": "イベント破棄失敗",
"callback success": "コールバック成功",
"Infrastructure": "インフラストラクチャ",
"Host - View": "機器 - 閲覧",
"Host - Modify": "機器 - 修正",
@@ -645,7 +658,11 @@ var I18N = `{
"event tag not match tag filter": "Теги события не соответствуют фильтру тегов",
"event attributes not match attributes filter": "Атрибуты события не соответствуют фильтру атрибутов",
"failed to parse tag filter: %v": "Не удалось разобрать фильтр тегов: %v",
"event is dropped": "Событие отброшено, уведомление не будет отправлено",
"drop event success": "Событие успешно отброшено",
"drop event failed": "Не удалось отбросить событие",
"callback success": "Обратный вызов успешен",
"Infrastructure": "Инфраструктура",
"Host - View": "Хост - Просмотр",
"Host - Modify": "Хост - Изменить",