Compare commits

...

2 Commits

Author SHA1 Message Date
ning
f8ddce8149 code refactor 2025-06-17 18:44:19 +08:00
ning
45685947dd refactor: event notify 2025-06-17 18:00:45 +08:00
2 changed files with 312 additions and 97 deletions

View File

@@ -82,6 +82,10 @@ func NewDispatch(alertRuleCache *memsto.AlertRuleCacheType, userCache *memsto.Us
}
pipeline.Init()
// 设置通知记录回调函数
notifyChannelCache.SetNotifyRecordFunc(sender.NotifyRecord)
return notify
}
@@ -451,30 +455,25 @@ func (e *Dispatch) sendV2(events []*models.AlertCurEvent, notifyRuleId int64, no
logger.Infof("notify_id: %d, channel_name: %v, event:%+v, IntegrationUrl: %v dutychannel_id: %v, respBody: %v, err: %v", notifyRuleId, notifyChannel.Name, events[0], notifyChannel.RequestConfig.FlashDutyRequestConfig.IntegrationUrl, flashDutyChannelIDs[i], respBody, err)
sender.NotifyRecord(e.ctx, events, notifyRuleId, notifyChannel.Name, strconv.FormatInt(flashDutyChannelIDs[i], 10), respBody, err)
}
return
case "http":
if e.notifyChannelCache.HttpConcurrencyAdd(notifyChannel.ID) {
defer e.notifyChannelCache.HttpConcurrencyDone(notifyChannel.ID)
}
if notifyChannel.RequestConfig == nil {
logger.Warningf("notify_id: %d, channel_name: %v, event:%+v, request config not found", notifyRuleId, notifyChannel.Name, events[0])
// 使用队列模式处理 http 通知
// 创建通知任务
task := &memsto.NotifyTask{
Events: events,
NotifyRuleId: notifyRuleId,
NotifyChannel: notifyChannel,
TplContent: tplContent,
CustomParams: customParams,
Sendtos: sendtos,
}
if notifyChannel.RequestConfig.HTTPRequestConfig == nil {
logger.Warningf("notify_id: %d, channel_name: %v, event:%+v, http request config not found", notifyRuleId, notifyChannel.Name, events[0])
}
if NeedBatchContacts(notifyChannel.RequestConfig.HTTPRequestConfig) || len(sendtos) == 0 {
resp, err := notifyChannel.SendHTTP(events, tplContent, customParams, sendtos, e.notifyChannelCache.GetHttpClient(notifyChannel.ID))
logger.Infof("notify_id: %d, channel_name: %v, event:%+v, tplContent:%s, customParams:%v, userInfo:%+v, respBody: %v, err: %v", notifyRuleId, notifyChannel.Name, events[0], tplContent, customParams, sendtos, resp, err)
sender.NotifyRecord(e.ctx, events, notifyRuleId, notifyChannel.Name, getSendTarget(customParams, sendtos), resp, err)
} else {
for i := range sendtos {
resp, err := notifyChannel.SendHTTP(events, tplContent, customParams, []string{sendtos[i]}, e.notifyChannelCache.GetHttpClient(notifyChannel.ID))
logger.Infof("notify_id: %d, channel_name: %v, event:%+v, tplContent:%s, customParams:%v, userInfo:%+v, respBody: %v, err: %v", notifyRuleId, notifyChannel.Name, events[0], tplContent, customParams, sendtos[i], resp, err)
sender.NotifyRecord(e.ctx, events, notifyRuleId, notifyChannel.Name, getSendTarget(customParams, []string{sendtos[i]}), resp, err)
}
// 将任务加入队列
success := e.notifyChannelCache.EnqueueNotifyTask(task)
if !success {
logger.Errorf("failed to enqueue notify task for channel %d, notify_id: %d", notifyChannel.ID, notifyRuleId)
// 如果入队失败,记录错误通知
sender.NotifyRecord(e.ctx, events, notifyRuleId, notifyChannel.Name, getSendTarget(customParams, sendtos), "", errors.New("failed to enqueue notify task, queue is full"))
}
case "smtp":

View File

@@ -2,8 +2,10 @@ package memsto
import (
"crypto/tls"
"encoding/json"
"fmt"
"net/http"
"strings"
"sync"
"time"
@@ -14,9 +16,23 @@ import (
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/pkg/errors"
"github.com/toolkits/pkg/container/list"
"github.com/toolkits/pkg/logger"
)
// NotifyTask 表示一个通知发送任务
type NotifyTask struct {
Events []*models.AlertCurEvent
NotifyRuleId int64
NotifyChannel *models.NotifyChannelConfig
TplContent map[string]interface{}
CustomParams map[string]string
Sendtos []string
}
// NotifyRecordFunc 通知记录函数类型
type NotifyRecordFunc func(ctx *ctx.Context, events []*models.AlertCurEvent, notifyRuleId int64, channelName, target, resp string, err error)
type NotifyChannelCacheType struct {
statTotal int64
statLastUpdated int64
@@ -24,13 +40,18 @@ type NotifyChannelCacheType struct {
stats *Stats
sync.RWMutex
channels map[int64]*models.NotifyChannelConfig // key: channel id
httpConcurrency map[int64]chan struct{}
channels map[int64]*models.NotifyChannelConfig // key: channel id
channelsQueue map[int64]*list.SafeListLimited
httpClient map[int64]*http.Client
smtpCh map[int64]chan *models.EmailContext
smtpQuitCh map[int64]chan struct{}
// 队列消费者控制
queueQuitCh map[int64]chan struct{}
// 通知记录回调函数
notifyRecordFunc NotifyRecordFunc
}
func NewNotifyChannelCache(ctx *ctx.Context, stats *Stats) *NotifyChannelCacheType {
@@ -40,18 +61,20 @@ func NewNotifyChannelCache(ctx *ctx.Context, stats *Stats) *NotifyChannelCacheTy
ctx: ctx,
stats: stats,
channels: make(map[int64]*models.NotifyChannelConfig),
channelsQueue: make(map[int64]*list.SafeListLimited),
queueQuitCh: make(map[int64]chan struct{}),
httpClient: make(map[int64]*http.Client),
smtpCh: make(map[int64]chan *models.EmailContext),
smtpQuitCh: make(map[int64]chan struct{}),
}
ncc.SyncNotifyChannels()
return ncc
}
func (ncc *NotifyChannelCacheType) Reset() {
ncc.Lock()
defer ncc.Unlock()
ncc.statTotal = -1
ncc.statLastUpdated = -1
ncc.channels = make(map[int64]*models.NotifyChannelConfig)
// SetNotifyRecordFunc 设置通知记录回调函数
func (ncc *NotifyChannelCacheType) SetNotifyRecordFunc(fn NotifyRecordFunc) {
ncc.notifyRecordFunc = fn
}
func (ncc *NotifyChannelCacheType) StatChanged(total, lastUpdated int64) bool {
@@ -62,30 +85,253 @@ func (ncc *NotifyChannelCacheType) StatChanged(total, lastUpdated int64) bool {
return true
}
func (ncc *NotifyChannelCacheType) Set(m map[int64]*models.NotifyChannelConfig, httpConcurrency map[int64]chan struct{}, httpClient map[int64]*http.Client,
smtpCh map[int64]chan *models.EmailContext, quitCh map[int64]chan struct{}, total, lastUpdated int64) {
func (ncc *NotifyChannelCacheType) Set(m map[int64]*models.NotifyChannelConfig, total, lastUpdated int64) {
ncc.Lock()
for _, k := range ncc.httpConcurrency {
close(k)
}
ncc.httpConcurrency = httpConcurrency
ncc.channels = m
ncc.httpClient = httpClient
ncc.smtpCh = smtpCh
defer ncc.Unlock()
for i := range ncc.smtpQuitCh {
close(ncc.smtpQuitCh[i])
}
// 1. 处理需要删除的通道
ncc.removeDeletedChannels(m)
ncc.smtpQuitCh = quitCh
ncc.Unlock()
// 2. 处理新增和更新的通道
ncc.addOrUpdateChannels(m)
// only one goroutine used, so no need lock
ncc.statTotal = total
ncc.statLastUpdated = lastUpdated
}
// removeDeletedChannels 移除已删除的通道
func (ncc *NotifyChannelCacheType) removeDeletedChannels(newChannels map[int64]*models.NotifyChannelConfig) {
for chID := range ncc.channels {
if _, exists := newChannels[chID]; !exists {
logger.Infof("removing deleted channel %d", chID)
// 停止消费者协程
if quitCh, exists := ncc.queueQuitCh[chID]; exists {
close(quitCh)
delete(ncc.queueQuitCh, chID)
}
// 删除队列
delete(ncc.channelsQueue, chID)
// 删除HTTP客户端
delete(ncc.httpClient, chID)
// 停止SMTP发送器
if quitCh, exists := ncc.smtpQuitCh[chID]; exists {
close(quitCh)
delete(ncc.smtpQuitCh, chID)
delete(ncc.smtpCh, chID)
}
// 删除通道配置
delete(ncc.channels, chID)
}
}
}
// addOrUpdateChannels 添加或更新通道
func (ncc *NotifyChannelCacheType) addOrUpdateChannels(newChannels map[int64]*models.NotifyChannelConfig) {
for chID, newChannel := range newChannels {
oldChannel, exists := ncc.channels[chID]
if exists {
if ncc.channelConfigChanged(oldChannel, newChannel) {
logger.Infof("updating channel %d (new: %t)", chID, !exists)
ncc.stopChannelResources(chID)
} else {
logger.Infof("channel %d config not changed", chID)
continue
}
}
// 更新通道配置
ncc.channels[chID] = newChannel
// 根据类型创建相应的资源
switch newChannel.RequestType {
case "http", "flashduty":
// 创建HTTP客户端
if newChannel.RequestConfig != nil && newChannel.RequestConfig.HTTPRequestConfig != nil {
cli, err := models.GetHTTPClient(newChannel)
if err != nil {
logger.Warningf("failed to create HTTP client for channel %d: %v", chID, err)
} else {
if ncc.httpClient == nil {
ncc.httpClient = make(map[int64]*http.Client)
}
ncc.httpClient[chID] = cli
}
}
// 对于 http 类型,启动队列和消费者
if newChannel.RequestType == "http" {
ncc.startHttpChannel(chID, newChannel)
}
case "smtp":
// 创建SMTP发送器
if newChannel.RequestConfig != nil && newChannel.RequestConfig.SMTPRequestConfig != nil {
ch := make(chan *models.EmailContext)
quit := make(chan struct{})
go ncc.startEmailSender(chID, newChannel.RequestConfig.SMTPRequestConfig, ch, quit)
if ncc.smtpCh == nil {
ncc.smtpCh = make(map[int64]chan *models.EmailContext)
}
if ncc.smtpQuitCh == nil {
ncc.smtpQuitCh = make(map[int64]chan struct{})
}
ncc.smtpCh[chID] = ch
ncc.smtpQuitCh[chID] = quit
}
}
}
}
// channelConfigChanged 检查通道配置是否发生变化
func (ncc *NotifyChannelCacheType) channelConfigChanged(oldChannel, newChannel *models.NotifyChannelConfig) bool {
if oldChannel == nil || newChannel == nil {
return true
}
// check updateat
if oldChannel.UpdateAt != newChannel.UpdateAt {
return true
}
return false
}
// stopChannelResources 停止通道的相关资源
func (ncc *NotifyChannelCacheType) stopChannelResources(chID int64) {
// 停止HTTP消费者协程
if quitCh, exists := ncc.queueQuitCh[chID]; exists {
close(quitCh)
delete(ncc.queueQuitCh, chID)
delete(ncc.channelsQueue, chID)
}
// 停止SMTP发送器
if quitCh, exists := ncc.smtpQuitCh[chID]; exists {
close(quitCh)
delete(ncc.smtpQuitCh, chID)
delete(ncc.smtpCh, chID)
}
}
// startHttpChannel 启动HTTP通道的队列和消费者
func (ncc *NotifyChannelCacheType) startHttpChannel(chID int64, channel *models.NotifyChannelConfig) {
if channel.RequestConfig == nil || channel.RequestConfig.HTTPRequestConfig == nil {
logger.Warningf("notify channel %+v http request config not found", channel)
return
}
// 创建队列
queue := list.NewSafeListLimited(100000)
ncc.channelsQueue[chID] = queue
// 启动消费者协程
quitCh := make(chan struct{})
ncc.queueQuitCh[chID] = quitCh
// 启动指定数量的消费者协程
concurrency := channel.RequestConfig.HTTPRequestConfig.Concurrency
for i := 0; i < concurrency; i++ {
go ncc.startNotifyConsumer(chID, queue, quitCh)
}
logger.Infof("started %d notify consumers for channel %d", concurrency, chID)
}
// 启动通知消费者协程
func (ncc *NotifyChannelCacheType) startNotifyConsumer(channelID int64, queue *list.SafeListLimited, quitCh chan struct{}) {
logger.Infof("starting notify consumer for channel %d", channelID)
for {
select {
case <-quitCh:
logger.Infof("notify consumer for channel %d stopped", channelID)
return
default:
// 从队列中取出任务
task := queue.PopBack()
if task == nil {
// 队列为空,等待一段时间
time.Sleep(100 * time.Millisecond)
continue
}
notifyTask, ok := task.(*NotifyTask)
if !ok {
logger.Errorf("invalid task type in queue for channel %d", channelID)
continue
}
// 处理通知任务
ncc.processNotifyTask(notifyTask)
}
}
}
// processNotifyTask 处理通知任务(仅处理 http 类型)
func (ncc *NotifyChannelCacheType) processNotifyTask(task *NotifyTask) {
httpClient := ncc.GetHttpClient(task.NotifyChannel.ID)
// 现在只处理 http 类型flashduty 保持直接发送
if task.NotifyChannel.RequestType == "http" {
if len(task.Sendtos) == 0 || ncc.needBatchContacts(task.NotifyChannel.RequestConfig.HTTPRequestConfig) {
resp, err := task.NotifyChannel.SendHTTP(task.Events, task.TplContent, task.CustomParams, task.Sendtos, httpClient)
logger.Infof("notify_id: %d, channel_name: %v, event:%+v, tplContent:%v, customParams:%v, userInfo:%+v, respBody: %v, err: %v",
task.NotifyRuleId, task.NotifyChannel.Name, task.Events[0], task.TplContent, task.CustomParams, task.Sendtos, resp, err)
// 调用通知记录回调函数
if ncc.notifyRecordFunc != nil {
ncc.notifyRecordFunc(ncc.ctx, task.Events, task.NotifyRuleId, task.NotifyChannel.Name, ncc.getSendTarget(task.CustomParams, task.Sendtos), resp, err)
}
} else {
for i := range task.Sendtos {
resp, err := task.NotifyChannel.SendHTTP(task.Events, task.TplContent, task.CustomParams, []string{task.Sendtos[i]}, httpClient)
logger.Infof("notify_id: %d, channel_name: %v, event:%+v, tplContent:%v, customParams:%v, userInfo:%+v, respBody: %v, err: %v",
task.NotifyRuleId, task.NotifyChannel.Name, task.Events[0], task.TplContent, task.CustomParams, task.Sendtos[i], resp, err)
// 调用通知记录回调函数
if ncc.notifyRecordFunc != nil {
ncc.notifyRecordFunc(ncc.ctx, task.Events, task.NotifyRuleId, task.NotifyChannel.Name, ncc.getSendTarget(task.CustomParams, []string{task.Sendtos[i]}), resp, err)
}
}
}
}
}
// 判断是否需要批量发送联系人
func (ncc *NotifyChannelCacheType) needBatchContacts(requestConfig *models.HTTPRequestConfig) bool {
if requestConfig == nil {
return false
}
b, _ := json.Marshal(requestConfig)
return strings.Contains(string(b), "$sendtos")
}
// 获取发送目标
func (ncc *NotifyChannelCacheType) getSendTarget(customParams map[string]string, sendtos []string) string {
if len(customParams) == 0 {
return strings.Join(sendtos, ",")
}
values := make([]string, 0)
for _, value := range customParams {
runes := []rune(value)
if len(runes) <= 4 {
values = append(values, value)
} else {
maskedValue := string(runes[:len(runes)-4]) + "****"
values = append(values, maskedValue)
}
}
return strings.Join(values, ",")
}
func (ncc *NotifyChannelCacheType) Get(channelId int64) *models.NotifyChannelConfig {
ncc.RLock()
defer ncc.RUnlock()
@@ -117,6 +363,25 @@ func (ncc *NotifyChannelCacheType) GetChannelIds() []int64 {
return list
}
// 新增:将通知任务加入队列
func (ncc *NotifyChannelCacheType) EnqueueNotifyTask(task *NotifyTask) bool {
ncc.RLock()
queue := ncc.channelsQueue[task.NotifyChannel.ID]
ncc.RUnlock()
if queue == nil {
logger.Errorf("no queue found for channel %d", task.NotifyChannel.ID)
return false
}
success := queue.PushFront(task)
if !success {
logger.Warningf("failed to enqueue notify task for channel %d, queue is full", task.NotifyChannel.ID)
}
return success
}
func (ncc *NotifyChannelCacheType) SyncNotifyChannels() {
err := ncc.syncNotifyChannels()
if err != nil {
@@ -162,38 +427,8 @@ func (ncc *NotifyChannelCacheType) syncNotifyChannels() error {
m[lst[i].ID] = lst[i]
}
httpConcurrency := make(map[int64]chan struct{})
httpClient := make(map[int64]*http.Client)
smtpCh := make(map[int64]chan *models.EmailContext)
quitCh := make(map[int64]chan struct{})
for i := range lst {
// todo 优化变更粒度
switch lst[i].RequestType {
case "http", "flashduty":
if lst[i].RequestConfig == nil || lst[i].RequestConfig.HTTPRequestConfig == nil {
logger.Warningf("notify channel %+v http request config not found", lst[i])
continue
}
cli, _ := models.GetHTTPClient(lst[i])
httpClient[lst[i].ID] = cli
httpConcurrency[lst[i].ID] = make(chan struct{}, lst[i].RequestConfig.HTTPRequestConfig.Concurrency)
for j := 0; j < lst[i].RequestConfig.HTTPRequestConfig.Concurrency; j++ {
httpConcurrency[lst[i].ID] <- struct{}{}
}
case "smtp":
ch := make(chan *models.EmailContext)
quit := make(chan struct{})
go ncc.startEmailSender(lst[i].ID, lst[i].RequestConfig.SMTPRequestConfig, ch, quit)
smtpCh[lst[i].ID] = ch
quitCh[lst[i].ID] = quit
default:
}
}
ncc.Set(m, httpConcurrency, httpClient, smtpCh, quitCh, stat.Total, stat.LastUpdated)
// 增量更新:只传递通道配置,让增量更新逻辑按需创建资源
ncc.Set(m, stat.Total, stat.LastUpdated)
ms := time.Since(start).Milliseconds()
ncc.stats.GaugeCronDuration.WithLabelValues("sync_notify_channels").Set(float64(ms))
@@ -305,22 +540,3 @@ func (ncc *NotifyChannelCacheType) dialSmtp(quitCh chan struct{}, d *gomail.Dial
}
}
}
func (ncc *NotifyChannelCacheType) HttpConcurrencyAdd(channelId int64) bool {
ncc.RLock()
defer ncc.RUnlock()
if _, ok := ncc.httpConcurrency[channelId]; !ok {
return false
}
_, ok := <-ncc.httpConcurrency[channelId]
return ok
}
func (ncc *NotifyChannelCacheType) HttpConcurrencyDone(channelId int64) {
ncc.RLock()
defer ncc.RUnlock()
if _, ok := ncc.httpConcurrency[channelId]; !ok {
return
}
ncc.httpConcurrency[channelId] <- struct{}{}
}