Compare commits

...

1 Commits

Author SHA1 Message Date
smx_Morgan
65a1ca95ff refactor: event notify (#2818) 2025-08-12 16:53:32 +08:00
4 changed files with 72 additions and 22 deletions

View File

@@ -221,7 +221,6 @@ func SendNotifyChannelMessage(ctx *ctx.Context, userCache *memsto.UserCacheType,
if err != nil {
return "", fmt.Errorf("failed to get http client: %v", err)
}
if notifyChannel.RequestConfig == nil {
return "", fmt.Errorf("request config is nil")
}

View File

@@ -33,6 +33,8 @@ type NotifyTask struct {
// NotifyRecordFunc 通知记录函数类型
type NotifyRecordFunc func(ctx *ctx.Context, events []*models.AlertCurEvent, notifyRuleId int64, channelName, target, resp string, err error)
type NotifyCallbackFunc func(ctx *ctx.Context, ident string, params map[string]string)
type NotifyChannelCacheType struct {
statTotal int64
statLastUpdated int64
@@ -52,13 +54,15 @@ type NotifyChannelCacheType struct {
// 通知记录回调函数
notifyRecordFunc NotifyRecordFunc
NotifyCallback NotifyCallbackFunc
}
func NewNotifyChannelCache(ctx *ctx.Context, stats *Stats) *NotifyChannelCacheType {
func NewNotifyChannelCache(c *ctx.Context, stats *Stats) *NotifyChannelCacheType {
ncc := &NotifyChannelCacheType{
statTotal: -1,
statLastUpdated: -1,
ctx: ctx,
ctx: c,
stats: stats,
channels: make(map[int64]*models.NotifyChannelConfig),
channelsQueue: make(map[int64]*list.SafeListLimited),
@@ -66,6 +70,9 @@ func NewNotifyChannelCache(ctx *ctx.Context, stats *Stats) *NotifyChannelCacheTy
httpClient: make(map[int64]*http.Client),
smtpCh: make(map[int64]chan *models.EmailContext),
smtpQuitCh: make(map[int64]chan struct{}),
NotifyCallback: func(ctx *ctx.Context, ident string, params map[string]string) {
},
}
ncc.SyncNotifyChannels()
@@ -168,6 +175,8 @@ func (ncc *NotifyChannelCacheType) addOrUpdateChannels(newChannels map[int64]*mo
if newChannel.RequestType == "http" {
ncc.startHttpChannel(chID, newChannel)
}
go ncc.NotifyCallback(ncc.ctx, newChannel.Ident, newChannel.RequestConfig.HTTPRequestConfig.Request.Parameters)
case "smtp":
// 创建SMTP发送器
if newChannel.RequestConfig != nil && newChannel.RequestConfig.SMTPRequestConfig != nil {
@@ -305,6 +314,7 @@ func (ncc *NotifyChannelCacheType) processNotifyTask(task *NotifyTask) {
}
}
}
}
// 判断是否需要批量发送联系人

View File

@@ -714,6 +714,18 @@ func (t *MessageTemplate) Upsert(ctx *ctx.Context, ident string) error {
return tpl.Update(ctx, *t)
}
func (t *MessageTemplate) UpsertForce(ctx *ctx.Context, ident string) error {
tpl, err := MessageTemplateGet(ctx, "ident = ?", ident)
if err != nil {
return errors.WithMessage(err, "failed to get message tpl")
}
if tpl == nil {
return Insert(ctx, t)
}
return tpl.Update(ctx, *t)
}
func (t *MessageTemplate) RenderEvent(events []*AlertCurEvent) map[string]interface{} {
if t == nil {
return nil

View File

@@ -42,6 +42,12 @@ type EmailContext struct {
Mail *gomail.Message
}
var NotifyHookQuery HookQuery = func(head map[string]string, ident string, query url.Values, params map[string]string) (url.Values, map[string]string, bool) {
return nil, nil, false
}
type HookQuery func(head map[string]string, ident string, query url.Values, params map[string]string) (url.Values, map[string]string, bool)
// NotifyChannelConfig 通知媒介
type NotifyChannelConfig struct {
ID int64 `json:"id" gorm:"primaryKey"`
@@ -454,21 +460,36 @@ func (ncc *NotifyChannelConfig) SendHTTP(events []*AlertCurEvent, tpl map[string
}
query := req.URL.Query()
// 设置请求头 腾讯云短信、语音特殊处理
if ncc.Ident == "tx-sms" || ncc.Ident == "tx-voice" {
headers = ncc.setTxHeader(headers, body)
for key, value := range headers {
req.Header.Add(key, value)
}
} else if ncc.Ident == "ali-sms" || ncc.Ident == "ali-voice" {
req, err = http.NewRequest(httpConfig.Method, url, nil)
if err != nil {
return "", err
query, headers, getQueryOk := NotifyHookQuery(ncc.RequestConfig.HTTPRequestConfig.Headers, ncc.Ident, query, parameters)
if !getQueryOk {
// 设置请求头 腾讯云短信、语音特殊处理
if ncc.Ident == "tx-sms" || ncc.Ident == "tx-voice" {
headers = ncc.setTxHeader(headers, body)
for key, value := range headers {
req.Header.Add(key, value)
}
} else if ncc.Ident == "ali-sms" || ncc.Ident == "ali-voice" {
req, err = http.NewRequest(httpConfig.Method, url, nil)
if err != nil {
return "", err
}
query, headers = ncc.getAliQuery(ncc.Ident, query, httpConfig.Request.Parameters["AccessKeyId"], httpConfig.Request.Parameters["AccessKeySecret"], parameters)
for key, value := range headers {
req.Header.Set(key, value)
}
} else {
for key, value := range headers {
req.Header.Add(key, value)
}
}
query, headers = ncc.getAliQuery(ncc.Ident, query, httpConfig.Request.Parameters["AccessKeyId"], httpConfig.Request.Parameters["AccessKeySecret"], parameters)
for key, value := range headers {
req.Header.Set(key, value)
if ncc.Ident != "ali-sms" && ncc.Ident != "ali-voice" {
for key, value := range parameters {
query.Add(key, value)
}
}
} else {
for key, value := range headers {
@@ -476,12 +497,6 @@ func (ncc *NotifyChannelConfig) SendHTTP(events []*AlertCurEvent, tpl map[string
}
}
if ncc.Ident != "ali-sms" && ncc.Ident != "ali-voice" {
for key, value := range parameters {
query.Add(key, value)
}
}
req.URL.RawQuery = query.Encode()
// 记录完整的请求信息
logger.Debugf("URL: %v, Method: %s, Headers: %+v, params: %+v, Body: %s", req.URL, req.Method, req.Header, query, string(body))
@@ -1459,6 +1474,20 @@ func (ncc *NotifyChannelConfig) Upsert(ctx *ctx.Context) error {
return ch.Update(ctx, *ncc)
}
// UpsertForce 强制更新,用于初始化时覆盖现有记录
func (ncc *NotifyChannelConfig) UpsertForce(ctx *ctx.Context) error {
ch, err := NotifyChannelGet(ctx, "name = ?", ncc.Name)
if err != nil {
return errors.WithMessage(err, "notify channel init failed to get message tpl")
}
if ch == nil {
return Insert(ctx, ncc)
}
return ch.Update(ctx, *ncc)
}
var FeishuAppBody = `#!/usr/bin/env python
# -*- coding: UTF-8 -*-