mirror of
https://github.com/ccfos/nightingale.git
synced 2026-03-16 21:09:09 +00:00
Compare commits
3 Commits
main
...
send-event
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b4db3ffd8b | ||
|
|
0947c3dfa2 | ||
|
|
0d8050aeed |
@@ -33,6 +33,17 @@ type Alerting struct {
|
||||
TemplatesDir string
|
||||
NotifyConcurrency int
|
||||
WebhookBatchSend bool
|
||||
GlobalWebhook GlobalWebhook
|
||||
}
|
||||
|
||||
type GlobalWebhook struct {
|
||||
Enable bool
|
||||
Url string
|
||||
BasicAuthUser string
|
||||
BasicAuthPass string
|
||||
Timeout int
|
||||
Headers []string
|
||||
SkipVerify bool
|
||||
}
|
||||
|
||||
type CallPlugin struct {
|
||||
|
||||
@@ -117,6 +117,8 @@ func Start(alertc aconf.Alert, pushgwc pconf.Pushgw, syncStats *memsto.Stats, al
|
||||
|
||||
eventProcessorCache := memsto.NewEventProcessorCache(ctx, syncStats)
|
||||
|
||||
sender.InitStaticGlobalWebhook(alertc.Alerting.GlobalWebhook)
|
||||
|
||||
dp := dispatch.NewDispatch(alertRuleCache, userCache, userGroupCache, alertSubscribeCache, targetCache, notifyConfigCache, taskTplsCache, notifyRuleCache, notifyChannelCache, messageTemplateCache, eventProcessorCache, configCvalCache, alertc.Alerting, ctx, alertStats)
|
||||
consumer := dispatch.NewConsumer(alertc.Alerting, ctx, dp, promClients, alertMuteCache)
|
||||
|
||||
|
||||
@@ -608,6 +608,10 @@ func (e *Dispatch) HandleEventNotify(event *models.AlertCurEvent, isSubscribe bo
|
||||
return
|
||||
}
|
||||
|
||||
if !isSubscribe {
|
||||
go sender.SendStaticGlobalWebhook(e.ctx, event.DeepCopy(), e.Astats)
|
||||
}
|
||||
|
||||
rule := e.alertRuleCache.Get(event.RuleId)
|
||||
if rule == nil {
|
||||
return
|
||||
|
||||
116
alert/sender/global_webhook.go
Normal file
116
alert/sender/global_webhook.go
Normal file
@@ -0,0 +1,116 @@
|
||||
package sender
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/tls"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/alert/aconf"
|
||||
"github.com/ccfos/nightingale/v6/alert/astats"
|
||||
"github.com/ccfos/nightingale/v6/models"
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
"github.com/ccfos/nightingale/v6/pkg/poster"
|
||||
|
||||
"github.com/toolkits/pkg/logger"
|
||||
)
|
||||
|
||||
var staticGlobalWebhookClient *http.Client
|
||||
var staticGlobalWebhookConf aconf.GlobalWebhook
|
||||
|
||||
const staticGlobalWebhookChannel = "static_global_webhook"
|
||||
|
||||
func InitStaticGlobalWebhook(conf aconf.GlobalWebhook) {
|
||||
staticGlobalWebhookConf = conf
|
||||
if !conf.Enable || conf.Url == "" {
|
||||
return
|
||||
}
|
||||
|
||||
if len(conf.Headers) > 0 && len(conf.Headers)%2 != 0 {
|
||||
logger.Warningf("static_global_webhook headers count is odd(%d), headers will be ignored", len(conf.Headers))
|
||||
}
|
||||
|
||||
timeout := conf.Timeout
|
||||
if timeout <= 0 {
|
||||
timeout = 10
|
||||
}
|
||||
|
||||
transport := &http.Transport{
|
||||
TLSClientConfig: &tls.Config{InsecureSkipVerify: conf.SkipVerify},
|
||||
MaxIdleConns: 100,
|
||||
MaxIdleConnsPerHost: 10,
|
||||
IdleConnTimeout: 90 * time.Second,
|
||||
}
|
||||
|
||||
if poster.UseProxy(conf.Url) {
|
||||
transport.Proxy = http.ProxyFromEnvironment
|
||||
}
|
||||
|
||||
staticGlobalWebhookClient = &http.Client{
|
||||
Timeout: time.Duration(timeout) * time.Second,
|
||||
Transport: transport,
|
||||
}
|
||||
|
||||
logger.Infof("static_global_webhook initialized, url:%s", conf.Url)
|
||||
}
|
||||
|
||||
func SendStaticGlobalWebhook(ctx *ctx.Context, event *models.AlertCurEvent, stats *astats.Stats) {
|
||||
if staticGlobalWebhookClient == nil {
|
||||
return
|
||||
}
|
||||
|
||||
bs, err := json.Marshal(event)
|
||||
if err != nil {
|
||||
logger.Errorf("%s failed to marshal event err:%v", staticGlobalWebhookChannel, err)
|
||||
NotifyRecord(ctx, []*models.AlertCurEvent{event}, 0, staticGlobalWebhookChannel, staticGlobalWebhookConf.Url, "", err)
|
||||
return
|
||||
}
|
||||
|
||||
req, err := http.NewRequest("POST", staticGlobalWebhookConf.Url, bytes.NewBuffer(bs))
|
||||
if err != nil {
|
||||
logger.Warningf("%s failed to new request event:%s err:%v", staticGlobalWebhookChannel, string(bs), err)
|
||||
NotifyRecord(ctx, []*models.AlertCurEvent{event}, 0, staticGlobalWebhookChannel, staticGlobalWebhookConf.Url, "", err)
|
||||
return
|
||||
}
|
||||
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
if staticGlobalWebhookConf.BasicAuthUser != "" && staticGlobalWebhookConf.BasicAuthPass != "" {
|
||||
req.SetBasicAuth(staticGlobalWebhookConf.BasicAuthUser, staticGlobalWebhookConf.BasicAuthPass)
|
||||
}
|
||||
|
||||
if len(staticGlobalWebhookConf.Headers) > 0 && len(staticGlobalWebhookConf.Headers)%2 == 0 {
|
||||
for i := 0; i < len(staticGlobalWebhookConf.Headers); i += 2 {
|
||||
if staticGlobalWebhookConf.Headers[i] == "Host" || staticGlobalWebhookConf.Headers[i] == "host" {
|
||||
req.Host = staticGlobalWebhookConf.Headers[i+1]
|
||||
continue
|
||||
}
|
||||
req.Header.Set(staticGlobalWebhookConf.Headers[i], staticGlobalWebhookConf.Headers[i+1])
|
||||
}
|
||||
}
|
||||
|
||||
stats.AlertNotifyTotal.WithLabelValues(staticGlobalWebhookChannel).Inc()
|
||||
resp, err := staticGlobalWebhookClient.Do(req)
|
||||
if err != nil {
|
||||
stats.AlertNotifyErrorTotal.WithLabelValues(staticGlobalWebhookChannel).Inc()
|
||||
logger.Errorf("%s_fail url:%s event:%s error:%v", staticGlobalWebhookChannel, staticGlobalWebhookConf.Url, event.Hash, err)
|
||||
NotifyRecord(ctx, []*models.AlertCurEvent{event}, 0, staticGlobalWebhookChannel, staticGlobalWebhookConf.Url, "", err)
|
||||
return
|
||||
}
|
||||
|
||||
defer resp.Body.Close()
|
||||
body, _ := io.ReadAll(io.LimitReader(resp.Body, 4096))
|
||||
|
||||
res := fmt.Sprintf("status_code:%d, response:%s", resp.StatusCode, string(body))
|
||||
if resp.StatusCode >= 400 {
|
||||
stats.AlertNotifyErrorTotal.WithLabelValues(staticGlobalWebhookChannel).Inc()
|
||||
logger.Errorf("%s_fail url:%s status:%d body:%s event:%s", staticGlobalWebhookChannel, staticGlobalWebhookConf.Url, resp.StatusCode, string(body), event.Hash)
|
||||
NotifyRecord(ctx, []*models.AlertCurEvent{event}, 0, staticGlobalWebhookChannel, staticGlobalWebhookConf.Url, res, fmt.Errorf("status code %d", resp.StatusCode))
|
||||
return
|
||||
}
|
||||
|
||||
logger.Debugf("%s_succ url:%s status:%d body:%s event:%s", staticGlobalWebhookChannel, staticGlobalWebhookConf.Url, resp.StatusCode, string(body), event.Hash)
|
||||
NotifyRecord(ctx, []*models.AlertCurEvent{event}, 0, staticGlobalWebhookChannel, staticGlobalWebhookConf.Url, res, nil)
|
||||
}
|
||||
114
alert/sender/global_webhook_test.go
Normal file
114
alert/sender/global_webhook_test.go
Normal file
@@ -0,0 +1,114 @@
|
||||
package sender
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"net/http"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/alert/aconf"
|
||||
"github.com/ccfos/nightingale/v6/alert/astats"
|
||||
"github.com/ccfos/nightingale/v6/models"
|
||||
ctxpkg "github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
type roundTripperFunc func(*http.Request) (*http.Response, error)
|
||||
|
||||
func (f roundTripperFunc) RoundTrip(req *http.Request) (*http.Response, error) {
|
||||
return f(req)
|
||||
}
|
||||
|
||||
func newStaticWebhookTestStats() *astats.Stats {
|
||||
return &astats.Stats{
|
||||
AlertNotifyTotal: prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{Name: "test_static_global_webhook_total"},
|
||||
[]string{"channel"},
|
||||
),
|
||||
AlertNotifyErrorTotal: prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{Name: "test_static_global_webhook_error_total"},
|
||||
[]string{"channel"},
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
func TestSendStaticGlobalWebhookRecordsNewRequestFailure(t *testing.T) {
|
||||
prevClient := staticGlobalWebhookClient
|
||||
prevConf := staticGlobalWebhookConf
|
||||
defer func() {
|
||||
staticGlobalWebhookClient = prevClient
|
||||
staticGlobalWebhookConf = prevConf
|
||||
}()
|
||||
|
||||
NotifyRecordQueue.RemoveAll()
|
||||
defer NotifyRecordQueue.RemoveAll()
|
||||
|
||||
staticGlobalWebhookClient = &http.Client{}
|
||||
staticGlobalWebhookConf = aconf.GlobalWebhook{Enable: true, Url: "://bad-url"}
|
||||
|
||||
SendStaticGlobalWebhook(
|
||||
ctxpkg.NewContext(context.Background(), nil, true),
|
||||
&models.AlertCurEvent{Id: 1, Hash: "event-1"},
|
||||
newStaticWebhookTestStats(),
|
||||
)
|
||||
|
||||
if got := NotifyRecordQueue.Len(); got != 1 {
|
||||
t.Fatalf("expected 1 notify record, got %d", got)
|
||||
}
|
||||
|
||||
record, ok := NotifyRecordQueue.PopBack().(*models.NotificationRecord)
|
||||
if !ok {
|
||||
t.Fatalf("expected *models.NotificationRecord in queue")
|
||||
}
|
||||
|
||||
if record.Status != models.NotiStatusFailure {
|
||||
t.Fatalf("expected failure status, got %d", record.Status)
|
||||
}
|
||||
|
||||
if record.Channel != staticGlobalWebhookChannel {
|
||||
t.Fatalf("expected channel %q, got %q", staticGlobalWebhookChannel, record.Channel)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSendStaticGlobalWebhookRecordsTransportFailure(t *testing.T) {
|
||||
prevClient := staticGlobalWebhookClient
|
||||
prevConf := staticGlobalWebhookConf
|
||||
defer func() {
|
||||
staticGlobalWebhookClient = prevClient
|
||||
staticGlobalWebhookConf = prevConf
|
||||
}()
|
||||
|
||||
NotifyRecordQueue.RemoveAll()
|
||||
defer NotifyRecordQueue.RemoveAll()
|
||||
|
||||
staticGlobalWebhookClient = &http.Client{
|
||||
Transport: roundTripperFunc(func(req *http.Request) (*http.Response, error) {
|
||||
return nil, errors.New("transport boom")
|
||||
}),
|
||||
}
|
||||
staticGlobalWebhookConf = aconf.GlobalWebhook{Enable: true, Url: "http://example.com/webhook"}
|
||||
|
||||
SendStaticGlobalWebhook(
|
||||
ctxpkg.NewContext(context.Background(), nil, true),
|
||||
&models.AlertCurEvent{Id: 2, Hash: "event-2"},
|
||||
newStaticWebhookTestStats(),
|
||||
)
|
||||
|
||||
if got := NotifyRecordQueue.Len(); got != 1 {
|
||||
t.Fatalf("expected 1 notify record, got %d", got)
|
||||
}
|
||||
|
||||
record, ok := NotifyRecordQueue.PopBack().(*models.NotificationRecord)
|
||||
if !ok {
|
||||
t.Fatalf("expected *models.NotificationRecord in queue")
|
||||
}
|
||||
|
||||
if record.Status != models.NotiStatusFailure {
|
||||
t.Fatalf("expected failure status, got %d", record.Status)
|
||||
}
|
||||
|
||||
if !strings.Contains(record.Details, "transport boom") {
|
||||
t.Fatalf("expected transport error details, got %q", record.Details)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user