mirror of
https://github.com/ccfos/nightingale.git
synced 2026-03-03 06:29:16 +00:00
Compare commits
4 Commits
webhook-pr
...
optimize-w
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
15c2eadda6 | ||
|
|
0aea38e564 | ||
|
|
45e9253b2a | ||
|
|
9385ca9931 |
@@ -293,9 +293,9 @@ func (e *Dispatch) Send(rule *models.AlertRule, event *models.AlertCurEvent, not
|
||||
// handle global webhooks
|
||||
if !event.OverrideGlobalWebhook() {
|
||||
if e.alerting.WebhookBatchSend {
|
||||
sender.BatchSendWebhooks(e.ctx, notifyTarget.ToWebhookList(), event, e.Astats)
|
||||
sender.BatchSendWebhooks(e.ctx, notifyTarget.ToWebhookMap(), event, e.Astats)
|
||||
} else {
|
||||
sender.SingleSendWebhooks(e.ctx, notifyTarget.ToWebhookList(), event, e.Astats)
|
||||
sender.SingleSendWebhooks(e.ctx, notifyTarget.ToWebhookMap(), event, e.Astats)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -76,52 +76,8 @@ func (s *NotifyTarget) ToCallbackList() []string {
|
||||
return callbacks
|
||||
}
|
||||
|
||||
func (s *NotifyTarget) ToWebhookList() []*models.Webhook {
|
||||
webhooks := make([]*models.Webhook, 0, len(s.webhooks))
|
||||
for _, wh := range s.webhooks {
|
||||
if wh.Batch == 0 {
|
||||
wh.Batch = 1000
|
||||
}
|
||||
|
||||
if wh.Timeout == 0 {
|
||||
wh.Timeout = 10
|
||||
}
|
||||
|
||||
if wh.RetryCount == 0 {
|
||||
wh.RetryCount = 10
|
||||
}
|
||||
|
||||
if wh.RetryInterval == 0 {
|
||||
wh.RetryInterval = 10
|
||||
}
|
||||
|
||||
webhooks = append(webhooks, wh)
|
||||
}
|
||||
return webhooks
|
||||
}
|
||||
|
||||
func (s *NotifyTarget) ToWebhookMap() map[string]*models.Webhook {
|
||||
webhookMap := make(map[string]*models.Webhook, len(s.webhooks))
|
||||
for _, wh := range s.webhooks {
|
||||
if wh.Batch == 0 {
|
||||
wh.Batch = 1000
|
||||
}
|
||||
|
||||
if wh.Timeout == 0 {
|
||||
wh.Timeout = 10
|
||||
}
|
||||
|
||||
if wh.RetryCount == 0 {
|
||||
wh.RetryCount = 10
|
||||
}
|
||||
|
||||
if wh.RetryInterval == 0 {
|
||||
wh.RetryInterval = 10
|
||||
}
|
||||
|
||||
webhookMap[wh.Url] = wh
|
||||
}
|
||||
return webhookMap
|
||||
return s.webhooks
|
||||
}
|
||||
|
||||
func (s *NotifyTarget) ToUidList() []int64 {
|
||||
|
||||
@@ -59,17 +59,21 @@ func sendWebhook(webhook *models.Webhook, event interface{}, stats *astats.Stats
|
||||
if webhook != nil {
|
||||
insecureSkipVerify = webhook.SkipVerify
|
||||
}
|
||||
client := http.Client{
|
||||
Timeout: time.Duration(conf.Timeout) * time.Second,
|
||||
Transport: &http.Transport{
|
||||
TLSClientConfig: &tls.Config{InsecureSkipVerify: insecureSkipVerify},
|
||||
},
|
||||
|
||||
if conf.Client == nil {
|
||||
logger.Warningf("event_%s, event:%s, url: [%s], error: [%s]", channel, string(bs), conf.Url, "client is nil")
|
||||
conf.Client = &http.Client{
|
||||
Timeout: time.Duration(conf.Timeout) * time.Second,
|
||||
Transport: &http.Transport{
|
||||
TLSClientConfig: &tls.Config{InsecureSkipVerify: insecureSkipVerify},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
stats.AlertNotifyTotal.WithLabelValues(channel).Inc()
|
||||
var resp *http.Response
|
||||
var body []byte
|
||||
resp, err = client.Do(req)
|
||||
resp, err = conf.Client.Do(req)
|
||||
|
||||
if err != nil {
|
||||
stats.AlertNotifyErrorTotal.WithLabelValues(channel).Inc()
|
||||
@@ -91,7 +95,7 @@ func sendWebhook(webhook *models.Webhook, event interface{}, stats *astats.Stats
|
||||
return false, string(body), nil
|
||||
}
|
||||
|
||||
func SingleSendWebhooks(ctx *ctx.Context, webhooks []*models.Webhook, event *models.AlertCurEvent, stats *astats.Stats) {
|
||||
func SingleSendWebhooks(ctx *ctx.Context, webhooks map[string]*models.Webhook, event *models.AlertCurEvent, stats *astats.Stats) {
|
||||
for _, conf := range webhooks {
|
||||
retryCount := 0
|
||||
for retryCount < 3 {
|
||||
@@ -106,7 +110,7 @@ func SingleSendWebhooks(ctx *ctx.Context, webhooks []*models.Webhook, event *mod
|
||||
}
|
||||
}
|
||||
|
||||
func BatchSendWebhooks(ctx *ctx.Context, webhooks []*models.Webhook, event *models.AlertCurEvent, stats *astats.Stats) {
|
||||
func BatchSendWebhooks(ctx *ctx.Context, webhooks map[string]*models.Webhook, event *models.AlertCurEvent, stats *astats.Stats) {
|
||||
for _, conf := range webhooks {
|
||||
logger.Infof("push event:%+v to queue:%v", event, conf)
|
||||
PushEvent(ctx, conf, event, stats)
|
||||
|
||||
@@ -138,6 +138,9 @@ ForceUseServerTS = true
|
||||
# [Pushgw.WriterOpt]
|
||||
# QueueMaxSize = 1000000
|
||||
# QueuePopSize = 1000
|
||||
# AllQueueMaxSize = 1000000
|
||||
# fresh time, unit ms
|
||||
# AllQueueMaxSizeInterval = 200
|
||||
|
||||
[[Pushgw.Writers]]
|
||||
# Url = "http://127.0.0.1:8480/insert/0/prometheus/api/v1/write"
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
package memsto
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
@@ -19,7 +21,7 @@ import (
|
||||
type NotifyConfigCacheType struct {
|
||||
ctx *ctx.Context
|
||||
ConfigCache *ConfigCache
|
||||
webhooks []*models.Webhook
|
||||
webhooks map[string]*models.Webhook
|
||||
smtp aconf.SMTPConfig
|
||||
script models.NotifyScript
|
||||
|
||||
@@ -47,6 +49,7 @@ func NewNotifyConfigCache(ctx *ctx.Context, configCache *ConfigCache) *NotifyCon
|
||||
w := &NotifyConfigCacheType{
|
||||
ctx: ctx,
|
||||
ConfigCache: configCache,
|
||||
webhooks: make(map[string]*models.Webhook),
|
||||
}
|
||||
w.SyncNotifyConfigs()
|
||||
return w
|
||||
@@ -85,11 +88,55 @@ func (w *NotifyConfigCacheType) syncNotifyConfigs() error {
|
||||
}
|
||||
|
||||
if strings.TrimSpace(cval) != "" {
|
||||
err = json.Unmarshal([]byte(cval), &w.webhooks)
|
||||
var webhooks []*models.Webhook
|
||||
err = json.Unmarshal([]byte(cval), &webhooks)
|
||||
if err != nil {
|
||||
dumper.PutSyncRecord("webhooks", start.Unix(), -1, -1, "failed to unmarshal configs.webhook: "+err.Error())
|
||||
logger.Errorf("failed to unmarshal webhooks:%s error:%v", cval, err)
|
||||
}
|
||||
|
||||
newWebhooks := make(map[string]*models.Webhook, len(webhooks))
|
||||
for i := 0; i < len(webhooks); i++ {
|
||||
if webhooks[i].Batch == 0 {
|
||||
webhooks[i].Batch = 1000
|
||||
}
|
||||
|
||||
if webhooks[i].Timeout == 0 {
|
||||
webhooks[i].Timeout = 10
|
||||
}
|
||||
|
||||
if webhooks[i].RetryCount == 0 {
|
||||
webhooks[i].RetryCount = 10
|
||||
}
|
||||
|
||||
if webhooks[i].RetryInterval == 0 {
|
||||
webhooks[i].RetryInterval = 10
|
||||
}
|
||||
|
||||
if webhooks[i].Client == nil {
|
||||
webhooks[i].Client = &http.Client{
|
||||
Timeout: time.Second * time.Duration(webhooks[i].Timeout),
|
||||
Transport: &http.Transport{
|
||||
TLSClientConfig: &tls.Config{InsecureSkipVerify: webhooks[i].SkipVerify},
|
||||
},
|
||||
}
|
||||
}
|
||||
newWebhooks[webhooks[i].Url] = webhooks[i]
|
||||
}
|
||||
|
||||
for url, wh := range newWebhooks {
|
||||
if oldWh, has := w.webhooks[url]; has && oldWh.Hash() != wh.Hash() {
|
||||
w.webhooks[url] = wh
|
||||
} else {
|
||||
w.webhooks[url] = wh
|
||||
}
|
||||
}
|
||||
|
||||
for url := range w.webhooks {
|
||||
if _, has := newWebhooks[url]; !has {
|
||||
delete(w.webhooks, url)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
dumper.PutSyncRecord("webhooks", start.Unix(), time.Since(start).Milliseconds(), len(w.webhooks), "success, webhooks:\n"+cval)
|
||||
@@ -133,7 +180,7 @@ func (w *NotifyConfigCacheType) syncNotifyConfigs() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *NotifyConfigCacheType) GetWebhooks() []*models.Webhook {
|
||||
func (w *NotifyConfigCacheType) GetWebhooks() map[string]*models.Webhook {
|
||||
w.RWMutex.RLock()
|
||||
defer w.RWMutex.RUnlock()
|
||||
return w.webhooks
|
||||
|
||||
@@ -115,68 +115,54 @@ func BusiGroupExists(ctx *ctx.Context, where string, args ...interface{}) (bool,
|
||||
return num > 0, err
|
||||
}
|
||||
|
||||
var entries = []struct {
|
||||
entry interface{}
|
||||
errorMessage string
|
||||
}{
|
||||
{
|
||||
entry: &AlertRule{},
|
||||
errorMessage: "Some alert rules still in the BusiGroup",
|
||||
},
|
||||
{
|
||||
entry: &AlertMute{},
|
||||
errorMessage: "Some alert mutes still in the BusiGroup",
|
||||
},
|
||||
{
|
||||
entry: &AlertSubscribe{},
|
||||
errorMessage: "Some alert subscribes still in the BusiGroup",
|
||||
},
|
||||
{
|
||||
entry: &Target{},
|
||||
errorMessage: "Some targets still in the BusiGroup",
|
||||
},
|
||||
{
|
||||
entry: &RecordingRule{},
|
||||
errorMessage: "Some recording rules still in the BusiGroup",
|
||||
},
|
||||
{
|
||||
entry: &TaskTpl{},
|
||||
errorMessage: "Some recovery scripts still in the BusiGroup",
|
||||
},
|
||||
{
|
||||
entry: &TaskRecord{},
|
||||
errorMessage: "Some Task Record records still in the BusiGroup",
|
||||
},
|
||||
{
|
||||
entry: &TargetBusiGroup{},
|
||||
errorMessage: "Some target busigroups still in the BusiGroup",
|
||||
},
|
||||
}
|
||||
|
||||
func (bg *BusiGroup) Del(ctx *ctx.Context) error {
|
||||
has, err := Exists(DB(ctx).Model(&AlertMute{}).Where("group_id=?", bg.Id))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, e := range entries {
|
||||
has, err := Exists(DB(ctx).Model(e.entry).Where("group_id=?", bg.Id))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if has {
|
||||
return errors.New("Some alert mutes still in the BusiGroup")
|
||||
}
|
||||
|
||||
has, err = Exists(DB(ctx).Model(&AlertSubscribe{}).Where("group_id=?", bg.Id))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if has {
|
||||
return errors.New("Some alert subscribes still in the BusiGroup")
|
||||
}
|
||||
|
||||
has, err = Exists(DB(ctx).Model(&TargetBusiGroup{}).Where("group_id=?", bg.Id))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if has {
|
||||
return errors.New("Some targets still in the BusiGroup")
|
||||
}
|
||||
|
||||
has, err = Exists(DB(ctx).Model(&Board{}).Where("group_id=?", bg.Id))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if has {
|
||||
return errors.New("Some dashboards still in the BusiGroup")
|
||||
}
|
||||
|
||||
has, err = Exists(DB(ctx).Model(&TaskTpl{}).Where("group_id=?", bg.Id))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if has {
|
||||
return errors.New("Some recovery scripts still in the BusiGroup")
|
||||
}
|
||||
|
||||
// hasCR, err := Exists(DB(ctx).Table("collect_rule").Where("group_id=?", bg.Id))
|
||||
// if err != nil {
|
||||
// return err
|
||||
// }
|
||||
|
||||
// if hasCR {
|
||||
// return errors.New("Some collect rules still in the BusiGroup")
|
||||
// }
|
||||
|
||||
has, err = Exists(DB(ctx).Model(&AlertRule{}).Where("group_id=?", bg.Id))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if has {
|
||||
return errors.New("Some alert rules still in the BusiGroup")
|
||||
if has {
|
||||
return errors.New(e.errorMessage)
|
||||
}
|
||||
}
|
||||
|
||||
return DB(ctx).Transaction(func(tx *gorm.DB) error {
|
||||
|
||||
@@ -1,5 +1,12 @@
|
||||
package models
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
|
||||
"github.com/toolkits/pkg/str"
|
||||
)
|
||||
|
||||
const WEBHOOKKEY = "webhook"
|
||||
const NOTIFYSCRIPT = "notify_script"
|
||||
const NOTIFYCHANNEL = "notify_channel"
|
||||
@@ -24,6 +31,11 @@ type Webhook struct {
|
||||
RetryCount int `json:"retry_count"`
|
||||
RetryInterval int `json:"retry_interval"`
|
||||
Batch int `json:"batch"`
|
||||
Client *http.Client `json:"-"`
|
||||
}
|
||||
|
||||
func (w *Webhook) Hash() string {
|
||||
return str.MD5(fmt.Sprintf("%d_%t_%s_%s_%s_%d_%v_%t_%s_%d_%d_%d", w.Type, w.Enable, w.Url, w.BasicAuthUser, w.BasicAuthPass, w.Timeout, w.HeaderMap, w.SkipVerify, w.Note, w.RetryCount, w.RetryInterval, w.Batch))
|
||||
}
|
||||
|
||||
type NotifyScript struct {
|
||||
|
||||
@@ -24,8 +24,10 @@ type Pushgw struct {
|
||||
}
|
||||
|
||||
type WriterGlobalOpt struct {
|
||||
QueueMaxSize int
|
||||
QueuePopSize int
|
||||
QueueMaxSize int
|
||||
QueuePopSize int
|
||||
AllQueueMaxSize int
|
||||
AllQueueMaxSizeInterval int
|
||||
}
|
||||
|
||||
type WriterOptions struct {
|
||||
@@ -77,6 +79,14 @@ func (p *Pushgw) PreCheck() {
|
||||
p.WriterOpt.QueuePopSize = 1000
|
||||
}
|
||||
|
||||
if p.WriterOpt.AllQueueMaxSize <= 0 {
|
||||
p.WriterOpt.AllQueueMaxSize = 10000000
|
||||
}
|
||||
|
||||
if p.WriterOpt.AllQueueMaxSizeInterval <= 0 {
|
||||
p.WriterOpt.AllQueueMaxSizeInterval = 200
|
||||
}
|
||||
|
||||
if p.WriteConcurrency <= 0 {
|
||||
p.WriteConcurrency = 5000
|
||||
}
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/pkg/fasttime"
|
||||
@@ -138,9 +139,10 @@ func (w WriterType) Post(req []byte, headers ...map[string]string) error {
|
||||
}
|
||||
|
||||
type WritersType struct {
|
||||
pushgw pconf.Pushgw
|
||||
backends map[string]WriterType
|
||||
queues map[string]*IdentQueue
|
||||
pushgw pconf.Pushgw
|
||||
backends map[string]WriterType
|
||||
queues map[string]*IdentQueue
|
||||
allQueueLen atomic.Value
|
||||
sync.RWMutex
|
||||
}
|
||||
|
||||
@@ -160,14 +162,30 @@ func (ws *WritersType) ReportQueueStats(ident string, identQueue *IdentQueue) (i
|
||||
}
|
||||
}
|
||||
|
||||
func (ws *WritersType) SetAllQueueLen() {
|
||||
for {
|
||||
curMetricLen := 0
|
||||
ws.RLock()
|
||||
for _, q := range ws.queues {
|
||||
curMetricLen += q.list.Len()
|
||||
}
|
||||
ws.RUnlock()
|
||||
ws.allQueueLen.Store(curMetricLen)
|
||||
time.Sleep(time.Duration(ws.pushgw.WriterOpt.AllQueueMaxSizeInterval) * time.Millisecond)
|
||||
}
|
||||
}
|
||||
|
||||
func NewWriters(pushgwConfig pconf.Pushgw) *WritersType {
|
||||
writers := &WritersType{
|
||||
backends: make(map[string]WriterType),
|
||||
queues: make(map[string]*IdentQueue),
|
||||
pushgw: pushgwConfig,
|
||||
backends: make(map[string]WriterType),
|
||||
queues: make(map[string]*IdentQueue),
|
||||
pushgw: pushgwConfig,
|
||||
allQueueLen: atomic.Value{},
|
||||
}
|
||||
|
||||
writers.Init()
|
||||
|
||||
go writers.SetAllQueueLen()
|
||||
go writers.CleanExpQueue()
|
||||
return writers
|
||||
}
|
||||
@@ -217,6 +235,13 @@ func (ws *WritersType) PushSample(ident string, v interface{}) {
|
||||
}
|
||||
|
||||
identQueue.ts = time.Now().Unix()
|
||||
curLen := ws.allQueueLen.Load().(int)
|
||||
if curLen > ws.pushgw.WriterOpt.AllQueueMaxSize {
|
||||
logger.Warningf("Write %+v full, metric count over limit: %d", v, curLen)
|
||||
CounterPushQueueErrorTotal.WithLabelValues(ident).Inc()
|
||||
return
|
||||
}
|
||||
|
||||
succ := identQueue.list.PushFront(v)
|
||||
if !succ {
|
||||
logger.Warningf("Write channel(%s) full, current channel size: %d", ident, identQueue.list.Len())
|
||||
@@ -245,6 +270,7 @@ func (ws *WritersType) StartConsumer(identQueue *IdentQueue) {
|
||||
|
||||
func (ws *WritersType) Init() error {
|
||||
opts := ws.pushgw.Writers
|
||||
ws.allQueueLen.Store(0)
|
||||
|
||||
for i := 0; i < len(opts); i++ {
|
||||
tlsConf, err := opts[i].ClientConfig.TLSConfig()
|
||||
|
||||
Reference in New Issue
Block a user