mirror of
https://github.com/ccfos/nightingale.git
synced 2026-03-02 22:19:10 +00:00
Compare commits
2 Commits
v8.1.0
...
optimize-h
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f8ddce8149 | ||
|
|
45685947dd |
@@ -82,6 +82,10 @@ func NewDispatch(alertRuleCache *memsto.AlertRuleCacheType, userCache *memsto.Us
|
||||
}
|
||||
|
||||
pipeline.Init()
|
||||
|
||||
// 设置通知记录回调函数
|
||||
notifyChannelCache.SetNotifyRecordFunc(sender.NotifyRecord)
|
||||
|
||||
return notify
|
||||
}
|
||||
|
||||
@@ -451,30 +455,25 @@ func (e *Dispatch) sendV2(events []*models.AlertCurEvent, notifyRuleId int64, no
|
||||
logger.Infof("notify_id: %d, channel_name: %v, event:%+v, IntegrationUrl: %v dutychannel_id: %v, respBody: %v, err: %v", notifyRuleId, notifyChannel.Name, events[0], notifyChannel.RequestConfig.FlashDutyRequestConfig.IntegrationUrl, flashDutyChannelIDs[i], respBody, err)
|
||||
sender.NotifyRecord(e.ctx, events, notifyRuleId, notifyChannel.Name, strconv.FormatInt(flashDutyChannelIDs[i], 10), respBody, err)
|
||||
}
|
||||
return
|
||||
|
||||
case "http":
|
||||
if e.notifyChannelCache.HttpConcurrencyAdd(notifyChannel.ID) {
|
||||
defer e.notifyChannelCache.HttpConcurrencyDone(notifyChannel.ID)
|
||||
}
|
||||
if notifyChannel.RequestConfig == nil {
|
||||
logger.Warningf("notify_id: %d, channel_name: %v, event:%+v, request config not found", notifyRuleId, notifyChannel.Name, events[0])
|
||||
// 使用队列模式处理 http 通知
|
||||
// 创建通知任务
|
||||
task := &memsto.NotifyTask{
|
||||
Events: events,
|
||||
NotifyRuleId: notifyRuleId,
|
||||
NotifyChannel: notifyChannel,
|
||||
TplContent: tplContent,
|
||||
CustomParams: customParams,
|
||||
Sendtos: sendtos,
|
||||
}
|
||||
|
||||
if notifyChannel.RequestConfig.HTTPRequestConfig == nil {
|
||||
logger.Warningf("notify_id: %d, channel_name: %v, event:%+v, http request config not found", notifyRuleId, notifyChannel.Name, events[0])
|
||||
}
|
||||
|
||||
if NeedBatchContacts(notifyChannel.RequestConfig.HTTPRequestConfig) || len(sendtos) == 0 {
|
||||
resp, err := notifyChannel.SendHTTP(events, tplContent, customParams, sendtos, e.notifyChannelCache.GetHttpClient(notifyChannel.ID))
|
||||
logger.Infof("notify_id: %d, channel_name: %v, event:%+v, tplContent:%s, customParams:%v, userInfo:%+v, respBody: %v, err: %v", notifyRuleId, notifyChannel.Name, events[0], tplContent, customParams, sendtos, resp, err)
|
||||
|
||||
sender.NotifyRecord(e.ctx, events, notifyRuleId, notifyChannel.Name, getSendTarget(customParams, sendtos), resp, err)
|
||||
} else {
|
||||
for i := range sendtos {
|
||||
resp, err := notifyChannel.SendHTTP(events, tplContent, customParams, []string{sendtos[i]}, e.notifyChannelCache.GetHttpClient(notifyChannel.ID))
|
||||
logger.Infof("notify_id: %d, channel_name: %v, event:%+v, tplContent:%s, customParams:%v, userInfo:%+v, respBody: %v, err: %v", notifyRuleId, notifyChannel.Name, events[0], tplContent, customParams, sendtos[i], resp, err)
|
||||
sender.NotifyRecord(e.ctx, events, notifyRuleId, notifyChannel.Name, getSendTarget(customParams, []string{sendtos[i]}), resp, err)
|
||||
}
|
||||
// 将任务加入队列
|
||||
success := e.notifyChannelCache.EnqueueNotifyTask(task)
|
||||
if !success {
|
||||
logger.Errorf("failed to enqueue notify task for channel %d, notify_id: %d", notifyChannel.ID, notifyRuleId)
|
||||
// 如果入队失败,记录错误通知
|
||||
sender.NotifyRecord(e.ctx, events, notifyRuleId, notifyChannel.Name, getSendTarget(customParams, sendtos), "", errors.New("failed to enqueue notify task, queue is full"))
|
||||
}
|
||||
|
||||
case "smtp":
|
||||
|
||||
@@ -2,8 +2,10 @@ package memsto
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@@ -14,9 +16,23 @@ import (
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/toolkits/pkg/container/list"
|
||||
"github.com/toolkits/pkg/logger"
|
||||
)
|
||||
|
||||
// NotifyTask 表示一个通知发送任务
|
||||
type NotifyTask struct {
|
||||
Events []*models.AlertCurEvent
|
||||
NotifyRuleId int64
|
||||
NotifyChannel *models.NotifyChannelConfig
|
||||
TplContent map[string]interface{}
|
||||
CustomParams map[string]string
|
||||
Sendtos []string
|
||||
}
|
||||
|
||||
// NotifyRecordFunc 通知记录函数类型
|
||||
type NotifyRecordFunc func(ctx *ctx.Context, events []*models.AlertCurEvent, notifyRuleId int64, channelName, target, resp string, err error)
|
||||
|
||||
type NotifyChannelCacheType struct {
|
||||
statTotal int64
|
||||
statLastUpdated int64
|
||||
@@ -24,13 +40,18 @@ type NotifyChannelCacheType struct {
|
||||
stats *Stats
|
||||
|
||||
sync.RWMutex
|
||||
channels map[int64]*models.NotifyChannelConfig // key: channel id
|
||||
|
||||
httpConcurrency map[int64]chan struct{}
|
||||
channels map[int64]*models.NotifyChannelConfig // key: channel id
|
||||
channelsQueue map[int64]*list.SafeListLimited
|
||||
|
||||
httpClient map[int64]*http.Client
|
||||
smtpCh map[int64]chan *models.EmailContext
|
||||
smtpQuitCh map[int64]chan struct{}
|
||||
|
||||
// 队列消费者控制
|
||||
queueQuitCh map[int64]chan struct{}
|
||||
|
||||
// 通知记录回调函数
|
||||
notifyRecordFunc NotifyRecordFunc
|
||||
}
|
||||
|
||||
func NewNotifyChannelCache(ctx *ctx.Context, stats *Stats) *NotifyChannelCacheType {
|
||||
@@ -40,18 +61,20 @@ func NewNotifyChannelCache(ctx *ctx.Context, stats *Stats) *NotifyChannelCacheTy
|
||||
ctx: ctx,
|
||||
stats: stats,
|
||||
channels: make(map[int64]*models.NotifyChannelConfig),
|
||||
channelsQueue: make(map[int64]*list.SafeListLimited),
|
||||
queueQuitCh: make(map[int64]chan struct{}),
|
||||
httpClient: make(map[int64]*http.Client),
|
||||
smtpCh: make(map[int64]chan *models.EmailContext),
|
||||
smtpQuitCh: make(map[int64]chan struct{}),
|
||||
}
|
||||
|
||||
ncc.SyncNotifyChannels()
|
||||
return ncc
|
||||
}
|
||||
|
||||
func (ncc *NotifyChannelCacheType) Reset() {
|
||||
ncc.Lock()
|
||||
defer ncc.Unlock()
|
||||
|
||||
ncc.statTotal = -1
|
||||
ncc.statLastUpdated = -1
|
||||
ncc.channels = make(map[int64]*models.NotifyChannelConfig)
|
||||
// SetNotifyRecordFunc 设置通知记录回调函数
|
||||
func (ncc *NotifyChannelCacheType) SetNotifyRecordFunc(fn NotifyRecordFunc) {
|
||||
ncc.notifyRecordFunc = fn
|
||||
}
|
||||
|
||||
func (ncc *NotifyChannelCacheType) StatChanged(total, lastUpdated int64) bool {
|
||||
@@ -62,30 +85,253 @@ func (ncc *NotifyChannelCacheType) StatChanged(total, lastUpdated int64) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (ncc *NotifyChannelCacheType) Set(m map[int64]*models.NotifyChannelConfig, httpConcurrency map[int64]chan struct{}, httpClient map[int64]*http.Client,
|
||||
smtpCh map[int64]chan *models.EmailContext, quitCh map[int64]chan struct{}, total, lastUpdated int64) {
|
||||
func (ncc *NotifyChannelCacheType) Set(m map[int64]*models.NotifyChannelConfig, total, lastUpdated int64) {
|
||||
ncc.Lock()
|
||||
for _, k := range ncc.httpConcurrency {
|
||||
close(k)
|
||||
}
|
||||
ncc.httpConcurrency = httpConcurrency
|
||||
ncc.channels = m
|
||||
ncc.httpClient = httpClient
|
||||
ncc.smtpCh = smtpCh
|
||||
defer ncc.Unlock()
|
||||
|
||||
for i := range ncc.smtpQuitCh {
|
||||
close(ncc.smtpQuitCh[i])
|
||||
}
|
||||
// 1. 处理需要删除的通道
|
||||
ncc.removeDeletedChannels(m)
|
||||
|
||||
ncc.smtpQuitCh = quitCh
|
||||
|
||||
ncc.Unlock()
|
||||
// 2. 处理新增和更新的通道
|
||||
ncc.addOrUpdateChannels(m)
|
||||
|
||||
// only one goroutine used, so no need lock
|
||||
ncc.statTotal = total
|
||||
ncc.statLastUpdated = lastUpdated
|
||||
}
|
||||
|
||||
// removeDeletedChannels 移除已删除的通道
|
||||
func (ncc *NotifyChannelCacheType) removeDeletedChannels(newChannels map[int64]*models.NotifyChannelConfig) {
|
||||
for chID := range ncc.channels {
|
||||
if _, exists := newChannels[chID]; !exists {
|
||||
logger.Infof("removing deleted channel %d", chID)
|
||||
|
||||
// 停止消费者协程
|
||||
if quitCh, exists := ncc.queueQuitCh[chID]; exists {
|
||||
close(quitCh)
|
||||
delete(ncc.queueQuitCh, chID)
|
||||
}
|
||||
|
||||
// 删除队列
|
||||
delete(ncc.channelsQueue, chID)
|
||||
|
||||
// 删除HTTP客户端
|
||||
delete(ncc.httpClient, chID)
|
||||
|
||||
// 停止SMTP发送器
|
||||
if quitCh, exists := ncc.smtpQuitCh[chID]; exists {
|
||||
close(quitCh)
|
||||
delete(ncc.smtpQuitCh, chID)
|
||||
delete(ncc.smtpCh, chID)
|
||||
}
|
||||
|
||||
// 删除通道配置
|
||||
delete(ncc.channels, chID)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// addOrUpdateChannels 添加或更新通道
|
||||
func (ncc *NotifyChannelCacheType) addOrUpdateChannels(newChannels map[int64]*models.NotifyChannelConfig) {
|
||||
for chID, newChannel := range newChannels {
|
||||
oldChannel, exists := ncc.channels[chID]
|
||||
if exists {
|
||||
if ncc.channelConfigChanged(oldChannel, newChannel) {
|
||||
logger.Infof("updating channel %d (new: %t)", chID, !exists)
|
||||
ncc.stopChannelResources(chID)
|
||||
} else {
|
||||
logger.Infof("channel %d config not changed", chID)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// 更新通道配置
|
||||
ncc.channels[chID] = newChannel
|
||||
|
||||
// 根据类型创建相应的资源
|
||||
switch newChannel.RequestType {
|
||||
case "http", "flashduty":
|
||||
// 创建HTTP客户端
|
||||
if newChannel.RequestConfig != nil && newChannel.RequestConfig.HTTPRequestConfig != nil {
|
||||
cli, err := models.GetHTTPClient(newChannel)
|
||||
if err != nil {
|
||||
logger.Warningf("failed to create HTTP client for channel %d: %v", chID, err)
|
||||
} else {
|
||||
if ncc.httpClient == nil {
|
||||
ncc.httpClient = make(map[int64]*http.Client)
|
||||
}
|
||||
ncc.httpClient[chID] = cli
|
||||
}
|
||||
}
|
||||
|
||||
// 对于 http 类型,启动队列和消费者
|
||||
if newChannel.RequestType == "http" {
|
||||
ncc.startHttpChannel(chID, newChannel)
|
||||
}
|
||||
case "smtp":
|
||||
// 创建SMTP发送器
|
||||
if newChannel.RequestConfig != nil && newChannel.RequestConfig.SMTPRequestConfig != nil {
|
||||
ch := make(chan *models.EmailContext)
|
||||
quit := make(chan struct{})
|
||||
go ncc.startEmailSender(chID, newChannel.RequestConfig.SMTPRequestConfig, ch, quit)
|
||||
|
||||
if ncc.smtpCh == nil {
|
||||
ncc.smtpCh = make(map[int64]chan *models.EmailContext)
|
||||
}
|
||||
if ncc.smtpQuitCh == nil {
|
||||
ncc.smtpQuitCh = make(map[int64]chan struct{})
|
||||
}
|
||||
ncc.smtpCh[chID] = ch
|
||||
ncc.smtpQuitCh[chID] = quit
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// channelConfigChanged 检查通道配置是否发生变化
|
||||
func (ncc *NotifyChannelCacheType) channelConfigChanged(oldChannel, newChannel *models.NotifyChannelConfig) bool {
|
||||
if oldChannel == nil || newChannel == nil {
|
||||
return true
|
||||
}
|
||||
|
||||
// check updateat
|
||||
if oldChannel.UpdateAt != newChannel.UpdateAt {
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
// stopChannelResources 停止通道的相关资源
|
||||
func (ncc *NotifyChannelCacheType) stopChannelResources(chID int64) {
|
||||
// 停止HTTP消费者协程
|
||||
if quitCh, exists := ncc.queueQuitCh[chID]; exists {
|
||||
close(quitCh)
|
||||
delete(ncc.queueQuitCh, chID)
|
||||
delete(ncc.channelsQueue, chID)
|
||||
}
|
||||
|
||||
// 停止SMTP发送器
|
||||
if quitCh, exists := ncc.smtpQuitCh[chID]; exists {
|
||||
close(quitCh)
|
||||
delete(ncc.smtpQuitCh, chID)
|
||||
delete(ncc.smtpCh, chID)
|
||||
}
|
||||
}
|
||||
|
||||
// startHttpChannel 启动HTTP通道的队列和消费者
|
||||
func (ncc *NotifyChannelCacheType) startHttpChannel(chID int64, channel *models.NotifyChannelConfig) {
|
||||
if channel.RequestConfig == nil || channel.RequestConfig.HTTPRequestConfig == nil {
|
||||
logger.Warningf("notify channel %+v http request config not found", channel)
|
||||
return
|
||||
}
|
||||
|
||||
// 创建队列
|
||||
queue := list.NewSafeListLimited(100000)
|
||||
ncc.channelsQueue[chID] = queue
|
||||
|
||||
// 启动消费者协程
|
||||
quitCh := make(chan struct{})
|
||||
ncc.queueQuitCh[chID] = quitCh
|
||||
|
||||
// 启动指定数量的消费者协程
|
||||
concurrency := channel.RequestConfig.HTTPRequestConfig.Concurrency
|
||||
for i := 0; i < concurrency; i++ {
|
||||
go ncc.startNotifyConsumer(chID, queue, quitCh)
|
||||
}
|
||||
|
||||
logger.Infof("started %d notify consumers for channel %d", concurrency, chID)
|
||||
}
|
||||
|
||||
// 启动通知消费者协程
|
||||
func (ncc *NotifyChannelCacheType) startNotifyConsumer(channelID int64, queue *list.SafeListLimited, quitCh chan struct{}) {
|
||||
logger.Infof("starting notify consumer for channel %d", channelID)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-quitCh:
|
||||
logger.Infof("notify consumer for channel %d stopped", channelID)
|
||||
return
|
||||
default:
|
||||
// 从队列中取出任务
|
||||
task := queue.PopBack()
|
||||
if task == nil {
|
||||
// 队列为空,等待一段时间
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
continue
|
||||
}
|
||||
|
||||
notifyTask, ok := task.(*NotifyTask)
|
||||
if !ok {
|
||||
logger.Errorf("invalid task type in queue for channel %d", channelID)
|
||||
continue
|
||||
}
|
||||
|
||||
// 处理通知任务
|
||||
ncc.processNotifyTask(notifyTask)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// processNotifyTask 处理通知任务(仅处理 http 类型)
|
||||
func (ncc *NotifyChannelCacheType) processNotifyTask(task *NotifyTask) {
|
||||
httpClient := ncc.GetHttpClient(task.NotifyChannel.ID)
|
||||
|
||||
// 现在只处理 http 类型,flashduty 保持直接发送
|
||||
if task.NotifyChannel.RequestType == "http" {
|
||||
if len(task.Sendtos) == 0 || ncc.needBatchContacts(task.NotifyChannel.RequestConfig.HTTPRequestConfig) {
|
||||
resp, err := task.NotifyChannel.SendHTTP(task.Events, task.TplContent, task.CustomParams, task.Sendtos, httpClient)
|
||||
logger.Infof("notify_id: %d, channel_name: %v, event:%+v, tplContent:%v, customParams:%v, userInfo:%+v, respBody: %v, err: %v",
|
||||
task.NotifyRuleId, task.NotifyChannel.Name, task.Events[0], task.TplContent, task.CustomParams, task.Sendtos, resp, err)
|
||||
|
||||
// 调用通知记录回调函数
|
||||
if ncc.notifyRecordFunc != nil {
|
||||
ncc.notifyRecordFunc(ncc.ctx, task.Events, task.NotifyRuleId, task.NotifyChannel.Name, ncc.getSendTarget(task.CustomParams, task.Sendtos), resp, err)
|
||||
}
|
||||
} else {
|
||||
for i := range task.Sendtos {
|
||||
resp, err := task.NotifyChannel.SendHTTP(task.Events, task.TplContent, task.CustomParams, []string{task.Sendtos[i]}, httpClient)
|
||||
logger.Infof("notify_id: %d, channel_name: %v, event:%+v, tplContent:%v, customParams:%v, userInfo:%+v, respBody: %v, err: %v",
|
||||
task.NotifyRuleId, task.NotifyChannel.Name, task.Events[0], task.TplContent, task.CustomParams, task.Sendtos[i], resp, err)
|
||||
|
||||
// 调用通知记录回调函数
|
||||
if ncc.notifyRecordFunc != nil {
|
||||
ncc.notifyRecordFunc(ncc.ctx, task.Events, task.NotifyRuleId, task.NotifyChannel.Name, ncc.getSendTarget(task.CustomParams, []string{task.Sendtos[i]}), resp, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 判断是否需要批量发送联系人
|
||||
func (ncc *NotifyChannelCacheType) needBatchContacts(requestConfig *models.HTTPRequestConfig) bool {
|
||||
if requestConfig == nil {
|
||||
return false
|
||||
}
|
||||
b, _ := json.Marshal(requestConfig)
|
||||
return strings.Contains(string(b), "$sendtos")
|
||||
}
|
||||
|
||||
// 获取发送目标
|
||||
func (ncc *NotifyChannelCacheType) getSendTarget(customParams map[string]string, sendtos []string) string {
|
||||
if len(customParams) == 0 {
|
||||
return strings.Join(sendtos, ",")
|
||||
}
|
||||
|
||||
values := make([]string, 0)
|
||||
for _, value := range customParams {
|
||||
runes := []rune(value)
|
||||
if len(runes) <= 4 {
|
||||
values = append(values, value)
|
||||
} else {
|
||||
maskedValue := string(runes[:len(runes)-4]) + "****"
|
||||
values = append(values, maskedValue)
|
||||
}
|
||||
}
|
||||
|
||||
return strings.Join(values, ",")
|
||||
}
|
||||
|
||||
func (ncc *NotifyChannelCacheType) Get(channelId int64) *models.NotifyChannelConfig {
|
||||
ncc.RLock()
|
||||
defer ncc.RUnlock()
|
||||
@@ -117,6 +363,25 @@ func (ncc *NotifyChannelCacheType) GetChannelIds() []int64 {
|
||||
return list
|
||||
}
|
||||
|
||||
// 新增:将通知任务加入队列
|
||||
func (ncc *NotifyChannelCacheType) EnqueueNotifyTask(task *NotifyTask) bool {
|
||||
ncc.RLock()
|
||||
queue := ncc.channelsQueue[task.NotifyChannel.ID]
|
||||
ncc.RUnlock()
|
||||
|
||||
if queue == nil {
|
||||
logger.Errorf("no queue found for channel %d", task.NotifyChannel.ID)
|
||||
return false
|
||||
}
|
||||
|
||||
success := queue.PushFront(task)
|
||||
if !success {
|
||||
logger.Warningf("failed to enqueue notify task for channel %d, queue is full", task.NotifyChannel.ID)
|
||||
}
|
||||
|
||||
return success
|
||||
}
|
||||
|
||||
func (ncc *NotifyChannelCacheType) SyncNotifyChannels() {
|
||||
err := ncc.syncNotifyChannels()
|
||||
if err != nil {
|
||||
@@ -162,38 +427,8 @@ func (ncc *NotifyChannelCacheType) syncNotifyChannels() error {
|
||||
m[lst[i].ID] = lst[i]
|
||||
}
|
||||
|
||||
httpConcurrency := make(map[int64]chan struct{})
|
||||
httpClient := make(map[int64]*http.Client)
|
||||
smtpCh := make(map[int64]chan *models.EmailContext)
|
||||
quitCh := make(map[int64]chan struct{})
|
||||
|
||||
for i := range lst {
|
||||
// todo 优化变更粒度
|
||||
|
||||
switch lst[i].RequestType {
|
||||
case "http", "flashduty":
|
||||
if lst[i].RequestConfig == nil || lst[i].RequestConfig.HTTPRequestConfig == nil {
|
||||
logger.Warningf("notify channel %+v http request config not found", lst[i])
|
||||
continue
|
||||
}
|
||||
|
||||
cli, _ := models.GetHTTPClient(lst[i])
|
||||
httpClient[lst[i].ID] = cli
|
||||
httpConcurrency[lst[i].ID] = make(chan struct{}, lst[i].RequestConfig.HTTPRequestConfig.Concurrency)
|
||||
for j := 0; j < lst[i].RequestConfig.HTTPRequestConfig.Concurrency; j++ {
|
||||
httpConcurrency[lst[i].ID] <- struct{}{}
|
||||
}
|
||||
case "smtp":
|
||||
ch := make(chan *models.EmailContext)
|
||||
quit := make(chan struct{})
|
||||
go ncc.startEmailSender(lst[i].ID, lst[i].RequestConfig.SMTPRequestConfig, ch, quit)
|
||||
smtpCh[lst[i].ID] = ch
|
||||
quitCh[lst[i].ID] = quit
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
ncc.Set(m, httpConcurrency, httpClient, smtpCh, quitCh, stat.Total, stat.LastUpdated)
|
||||
// 增量更新:只传递通道配置,让增量更新逻辑按需创建资源
|
||||
ncc.Set(m, stat.Total, stat.LastUpdated)
|
||||
|
||||
ms := time.Since(start).Milliseconds()
|
||||
ncc.stats.GaugeCronDuration.WithLabelValues("sync_notify_channels").Set(float64(ms))
|
||||
@@ -305,22 +540,3 @@ func (ncc *NotifyChannelCacheType) dialSmtp(quitCh chan struct{}, d *gomail.Dial
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (ncc *NotifyChannelCacheType) HttpConcurrencyAdd(channelId int64) bool {
|
||||
ncc.RLock()
|
||||
defer ncc.RUnlock()
|
||||
if _, ok := ncc.httpConcurrency[channelId]; !ok {
|
||||
return false
|
||||
}
|
||||
_, ok := <-ncc.httpConcurrency[channelId]
|
||||
return ok
|
||||
}
|
||||
|
||||
func (ncc *NotifyChannelCacheType) HttpConcurrencyDone(channelId int64) {
|
||||
ncc.RLock()
|
||||
defer ncc.RUnlock()
|
||||
if _, ok := ncc.httpConcurrency[channelId]; !ok {
|
||||
return
|
||||
}
|
||||
ncc.httpConcurrency[channelId] <- struct{}{}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user