Compare commits

...

1 Commits

Author SHA1 Message Date
ning
15c2eadda6 refactor: optimize webhook send 2024-12-11 16:25:35 +08:00
5 changed files with 77 additions and 58 deletions

View File

@@ -293,9 +293,9 @@ func (e *Dispatch) Send(rule *models.AlertRule, event *models.AlertCurEvent, not
// handle global webhooks
if !event.OverrideGlobalWebhook() {
if e.alerting.WebhookBatchSend {
sender.BatchSendWebhooks(e.ctx, notifyTarget.ToWebhookList(), event, e.Astats)
sender.BatchSendWebhooks(e.ctx, notifyTarget.ToWebhookMap(), event, e.Astats)
} else {
sender.SingleSendWebhooks(e.ctx, notifyTarget.ToWebhookList(), event, e.Astats)
sender.SingleSendWebhooks(e.ctx, notifyTarget.ToWebhookMap(), event, e.Astats)
}
}

View File

@@ -76,52 +76,8 @@ func (s *NotifyTarget) ToCallbackList() []string {
return callbacks
}
func (s *NotifyTarget) ToWebhookList() []*models.Webhook {
webhooks := make([]*models.Webhook, 0, len(s.webhooks))
for _, wh := range s.webhooks {
if wh.Batch == 0 {
wh.Batch = 1000
}
if wh.Timeout == 0 {
wh.Timeout = 10
}
if wh.RetryCount == 0 {
wh.RetryCount = 10
}
if wh.RetryInterval == 0 {
wh.RetryInterval = 10
}
webhooks = append(webhooks, wh)
}
return webhooks
}
func (s *NotifyTarget) ToWebhookMap() map[string]*models.Webhook {
webhookMap := make(map[string]*models.Webhook, len(s.webhooks))
for _, wh := range s.webhooks {
if wh.Batch == 0 {
wh.Batch = 1000
}
if wh.Timeout == 0 {
wh.Timeout = 10
}
if wh.RetryCount == 0 {
wh.RetryCount = 10
}
if wh.RetryInterval == 0 {
wh.RetryInterval = 10
}
webhookMap[wh.Url] = wh
}
return webhookMap
return s.webhooks
}
func (s *NotifyTarget) ToUidList() []int64 {

View File

@@ -59,17 +59,21 @@ func sendWebhook(webhook *models.Webhook, event interface{}, stats *astats.Stats
if webhook != nil {
insecureSkipVerify = webhook.SkipVerify
}
client := http.Client{
Timeout: time.Duration(conf.Timeout) * time.Second,
Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: insecureSkipVerify},
},
if conf.Client == nil {
logger.Warningf("event_%s, event:%s, url: [%s], error: [%s]", channel, string(bs), conf.Url, "client is nil")
conf.Client = &http.Client{
Timeout: time.Duration(conf.Timeout) * time.Second,
Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: insecureSkipVerify},
},
}
}
stats.AlertNotifyTotal.WithLabelValues(channel).Inc()
var resp *http.Response
var body []byte
resp, err = client.Do(req)
resp, err = conf.Client.Do(req)
if err != nil {
stats.AlertNotifyErrorTotal.WithLabelValues(channel).Inc()
@@ -91,7 +95,7 @@ func sendWebhook(webhook *models.Webhook, event interface{}, stats *astats.Stats
return false, string(body), nil
}
func SingleSendWebhooks(ctx *ctx.Context, webhooks []*models.Webhook, event *models.AlertCurEvent, stats *astats.Stats) {
func SingleSendWebhooks(ctx *ctx.Context, webhooks map[string]*models.Webhook, event *models.AlertCurEvent, stats *astats.Stats) {
for _, conf := range webhooks {
retryCount := 0
for retryCount < 3 {
@@ -106,7 +110,7 @@ func SingleSendWebhooks(ctx *ctx.Context, webhooks []*models.Webhook, event *mod
}
}
func BatchSendWebhooks(ctx *ctx.Context, webhooks []*models.Webhook, event *models.AlertCurEvent, stats *astats.Stats) {
func BatchSendWebhooks(ctx *ctx.Context, webhooks map[string]*models.Webhook, event *models.AlertCurEvent, stats *astats.Stats) {
for _, conf := range webhooks {
logger.Infof("push event:%+v to queue:%v", event, conf)
PushEvent(ctx, conf, event, stats)

View File

@@ -1,7 +1,9 @@
package memsto
import (
"crypto/tls"
"encoding/json"
"net/http"
"strings"
"sync"
"time"
@@ -19,7 +21,7 @@ import (
type NotifyConfigCacheType struct {
ctx *ctx.Context
ConfigCache *ConfigCache
webhooks []*models.Webhook
webhooks map[string]*models.Webhook
smtp aconf.SMTPConfig
script models.NotifyScript
@@ -47,6 +49,7 @@ func NewNotifyConfigCache(ctx *ctx.Context, configCache *ConfigCache) *NotifyCon
w := &NotifyConfigCacheType{
ctx: ctx,
ConfigCache: configCache,
webhooks: make(map[string]*models.Webhook),
}
w.SyncNotifyConfigs()
return w
@@ -85,11 +88,55 @@ func (w *NotifyConfigCacheType) syncNotifyConfigs() error {
}
if strings.TrimSpace(cval) != "" {
err = json.Unmarshal([]byte(cval), &w.webhooks)
var webhooks []*models.Webhook
err = json.Unmarshal([]byte(cval), &webhooks)
if err != nil {
dumper.PutSyncRecord("webhooks", start.Unix(), -1, -1, "failed to unmarshal configs.webhook: "+err.Error())
logger.Errorf("failed to unmarshal webhooks:%s error:%v", cval, err)
}
newWebhooks := make(map[string]*models.Webhook, len(webhooks))
for i := 0; i < len(webhooks); i++ {
if webhooks[i].Batch == 0 {
webhooks[i].Batch = 1000
}
if webhooks[i].Timeout == 0 {
webhooks[i].Timeout = 10
}
if webhooks[i].RetryCount == 0 {
webhooks[i].RetryCount = 10
}
if webhooks[i].RetryInterval == 0 {
webhooks[i].RetryInterval = 10
}
if webhooks[i].Client == nil {
webhooks[i].Client = &http.Client{
Timeout: time.Second * time.Duration(webhooks[i].Timeout),
Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: webhooks[i].SkipVerify},
},
}
}
newWebhooks[webhooks[i].Url] = webhooks[i]
}
for url, wh := range newWebhooks {
if oldWh, has := w.webhooks[url]; has && oldWh.Hash() != wh.Hash() {
w.webhooks[url] = wh
} else {
w.webhooks[url] = wh
}
}
for url := range w.webhooks {
if _, has := newWebhooks[url]; !has {
delete(w.webhooks, url)
}
}
}
dumper.PutSyncRecord("webhooks", start.Unix(), time.Since(start).Milliseconds(), len(w.webhooks), "success, webhooks:\n"+cval)
@@ -133,7 +180,7 @@ func (w *NotifyConfigCacheType) syncNotifyConfigs() error {
return nil
}
func (w *NotifyConfigCacheType) GetWebhooks() []*models.Webhook {
func (w *NotifyConfigCacheType) GetWebhooks() map[string]*models.Webhook {
w.RWMutex.RLock()
defer w.RWMutex.RUnlock()
return w.webhooks

View File

@@ -1,5 +1,12 @@
package models
import (
"fmt"
"net/http"
"github.com/toolkits/pkg/str"
)
const WEBHOOKKEY = "webhook"
const NOTIFYSCRIPT = "notify_script"
const NOTIFYCHANNEL = "notify_channel"
@@ -24,6 +31,11 @@ type Webhook struct {
RetryCount int `json:"retry_count"`
RetryInterval int `json:"retry_interval"`
Batch int `json:"batch"`
Client *http.Client `json:"-"`
}
func (w *Webhook) Hash() string {
return str.MD5(fmt.Sprintf("%d_%t_%s_%s_%s_%d_%v_%t_%s_%d_%d_%d", w.Type, w.Enable, w.Url, w.BasicAuthUser, w.BasicAuthPass, w.Timeout, w.HeaderMap, w.SkipVerify, w.Note, w.RetryCount, w.RetryInterval, w.Batch))
}
type NotifyScript struct {