mirror of
https://github.com/ccfos/nightingale.git
synced 2026-03-07 08:29:10 +00:00
Compare commits
1 Commits
main
...
send-event
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0d8050aeed |
@@ -33,6 +33,17 @@ type Alerting struct {
|
||||
TemplatesDir string
|
||||
NotifyConcurrency int
|
||||
WebhookBatchSend bool
|
||||
GlobalWebhook GlobalWebhook
|
||||
}
|
||||
|
||||
type GlobalWebhook struct {
|
||||
Enable bool
|
||||
Url string
|
||||
BasicAuthUser string
|
||||
BasicAuthPass string
|
||||
Timeout int
|
||||
Headers []string
|
||||
SkipVerify bool
|
||||
}
|
||||
|
||||
type CallPlugin struct {
|
||||
|
||||
@@ -117,6 +117,8 @@ func Start(alertc aconf.Alert, pushgwc pconf.Pushgw, syncStats *memsto.Stats, al
|
||||
|
||||
eventProcessorCache := memsto.NewEventProcessorCache(ctx, syncStats)
|
||||
|
||||
sender.InitGlobalWebhook(alertc.Alerting.GlobalWebhook)
|
||||
|
||||
dp := dispatch.NewDispatch(alertRuleCache, userCache, userGroupCache, alertSubscribeCache, targetCache, notifyConfigCache, taskTplsCache, notifyRuleCache, notifyChannelCache, messageTemplateCache, eventProcessorCache, configCvalCache, alertc.Alerting, ctx, alertStats)
|
||||
consumer := dispatch.NewConsumer(alertc.Alerting, ctx, dp, promClients, alertMuteCache)
|
||||
|
||||
|
||||
@@ -604,6 +604,7 @@ func NeedBatchContacts(requestConfig *models.HTTPRequestConfig) bool {
|
||||
// isSubscribe: 告警事件是否由subscribe的配置产生
|
||||
func (e *Dispatch) HandleEventNotify(event *models.AlertCurEvent, isSubscribe bool) {
|
||||
go e.HandleEventWithNotifyRule(event)
|
||||
go sender.SendGlobalWebhook(event.DeepCopy(), e.Astats)
|
||||
if event.IsRecovered && event.NotifyRecovered == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
106
alert/sender/global_webhook.go
Normal file
106
alert/sender/global_webhook.go
Normal file
@@ -0,0 +1,106 @@
|
||||
package sender
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/tls"
|
||||
"encoding/json"
|
||||
"io"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/alert/aconf"
|
||||
"github.com/ccfos/nightingale/v6/alert/astats"
|
||||
"github.com/ccfos/nightingale/v6/models"
|
||||
"github.com/ccfos/nightingale/v6/pkg/poster"
|
||||
|
||||
"github.com/toolkits/pkg/logger"
|
||||
)
|
||||
|
||||
var globalWebhookClient *http.Client
|
||||
var globalWebhookConf aconf.GlobalWebhook
|
||||
|
||||
func InitGlobalWebhook(conf aconf.GlobalWebhook) {
|
||||
globalWebhookConf = conf
|
||||
if !conf.Enable || conf.Url == "" {
|
||||
return
|
||||
}
|
||||
|
||||
if len(conf.Headers) > 0 && len(conf.Headers)%2 != 0 {
|
||||
logger.Warningf("global_webhook headers count is odd(%d), headers will be ignored", len(conf.Headers))
|
||||
}
|
||||
|
||||
timeout := conf.Timeout
|
||||
if timeout <= 0 {
|
||||
timeout = 10
|
||||
}
|
||||
|
||||
transport := &http.Transport{
|
||||
TLSClientConfig: &tls.Config{InsecureSkipVerify: conf.SkipVerify},
|
||||
MaxIdleConns: 100,
|
||||
MaxIdleConnsPerHost: 10,
|
||||
IdleConnTimeout: 90 * time.Second,
|
||||
}
|
||||
|
||||
if poster.UseProxy(conf.Url) {
|
||||
transport.Proxy = http.ProxyFromEnvironment
|
||||
}
|
||||
|
||||
globalWebhookClient = &http.Client{
|
||||
Timeout: time.Duration(timeout) * time.Second,
|
||||
Transport: transport,
|
||||
}
|
||||
|
||||
logger.Infof("global_webhook initialized, url:%s", conf.Url)
|
||||
}
|
||||
|
||||
func SendGlobalWebhook(event *models.AlertCurEvent, stats *astats.Stats) {
|
||||
if globalWebhookClient == nil {
|
||||
return
|
||||
}
|
||||
|
||||
bs, err := json.Marshal(event)
|
||||
if err != nil {
|
||||
logger.Errorf("global_webhook failed to marshal event err:%v", err)
|
||||
return
|
||||
}
|
||||
|
||||
req, err := http.NewRequest("POST", globalWebhookConf.Url, bytes.NewBuffer(bs))
|
||||
if err != nil {
|
||||
logger.Warningf("global_webhook failed to new request event:%s err:%v", string(bs), err)
|
||||
return
|
||||
}
|
||||
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
if globalWebhookConf.BasicAuthUser != "" && globalWebhookConf.BasicAuthPass != "" {
|
||||
req.SetBasicAuth(globalWebhookConf.BasicAuthUser, globalWebhookConf.BasicAuthPass)
|
||||
}
|
||||
|
||||
if len(globalWebhookConf.Headers) > 0 && len(globalWebhookConf.Headers)%2 == 0 {
|
||||
for i := 0; i < len(globalWebhookConf.Headers); i += 2 {
|
||||
if globalWebhookConf.Headers[i] == "Host" || globalWebhookConf.Headers[i] == "host" {
|
||||
req.Host = globalWebhookConf.Headers[i+1]
|
||||
continue
|
||||
}
|
||||
req.Header.Set(globalWebhookConf.Headers[i], globalWebhookConf.Headers[i+1])
|
||||
}
|
||||
}
|
||||
|
||||
stats.AlertNotifyTotal.WithLabelValues("global_webhook").Inc()
|
||||
resp, err := globalWebhookClient.Do(req)
|
||||
if err != nil {
|
||||
stats.AlertNotifyErrorTotal.WithLabelValues("global_webhook").Inc()
|
||||
logger.Errorf("global_webhook_fail url:%s event:%s error:%v", globalWebhookConf.Url, event.Hash, err)
|
||||
return
|
||||
}
|
||||
|
||||
defer resp.Body.Close()
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
|
||||
if resp.StatusCode >= 400 {
|
||||
stats.AlertNotifyErrorTotal.WithLabelValues("global_webhook").Inc()
|
||||
logger.Errorf("global_webhook_fail url:%s status:%d body:%s event:%s", globalWebhookConf.Url, resp.StatusCode, string(body), event.Hash)
|
||||
return
|
||||
}
|
||||
|
||||
logger.Debugf("global_webhook_succ url:%s status:%d body:%s event:%s", globalWebhookConf.Url, resp.StatusCode, string(body), event.Hash)
|
||||
}
|
||||
@@ -191,7 +191,6 @@ func HandleHeartbeat(c *gin.Context, ctx *ctx.Context, engineName string, metaSe
|
||||
}
|
||||
|
||||
if targetNeedUpdate {
|
||||
newTarget.UpdateAt = time.Now().Unix()
|
||||
err := models.DB(ctx).Model(&target).Updates(newTarget).Error
|
||||
if err != nil {
|
||||
logger.Errorf("update target fields failed, err: %v", err)
|
||||
|
||||
@@ -87,7 +87,7 @@ func TargetBindBgids(ctx *ctx.Context, idents []string, bgids []int64, tags []st
|
||||
}
|
||||
|
||||
return DB(ctx).Transaction(func(tx *gorm.DB) error {
|
||||
if err := tx.Clauses(cl).CreateInBatches(&lst, 10).Error; err != nil {
|
||||
if err := DB(ctx).Clauses(cl).CreateInBatches(&lst, 10).Error; err != nil {
|
||||
return err
|
||||
}
|
||||
if targets, err := TargetsGetByIdents(ctx, idents); err != nil {
|
||||
@@ -100,24 +100,13 @@ func TargetBindBgids(ctx *ctx.Context, idents []string, bgids []int64, tags []st
|
||||
}
|
||||
}
|
||||
|
||||
// update target.update_at so that syncTargets can detect the change and refresh GroupIds cache
|
||||
if err := tx.Model(&Target{}).Where("ident in ?", idents).Update("update_at", updateAt).Error; err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func TargetUnbindBgids(ctx *ctx.Context, idents []string, bgids []int64) error {
|
||||
return DB(ctx).Transaction(func(tx *gorm.DB) error {
|
||||
if err := tx.Where("target_ident in ? and group_id in ?",
|
||||
idents, bgids).Delete(&TargetBusiGroup{}).Error; err != nil {
|
||||
return err
|
||||
}
|
||||
// update target.update_at so that syncTargets can detect the change and refresh GroupIds cache
|
||||
return tx.Model(&Target{}).Where("ident in ?", idents).Update("update_at", time.Now().Unix()).Error
|
||||
})
|
||||
return DB(ctx).Where("target_ident in ? and group_id in ?",
|
||||
idents, bgids).Delete(&TargetBusiGroup{}).Error
|
||||
}
|
||||
|
||||
func TargetDeleteBgids(tx *gorm.DB, idents []string) error {
|
||||
@@ -161,8 +150,7 @@ func TargetOverrideBgids(ctx *ctx.Context, idents []string, bgids []int64, tags
|
||||
return err
|
||||
}
|
||||
if len(tags) == 0 {
|
||||
// update target.update_at so that syncTargets can detect the change and refresh GroupIds cache
|
||||
return tx.Model(&Target{}).Where("ident IN ?", idents).Update("update_at", updateAt).Error
|
||||
return nil
|
||||
}
|
||||
|
||||
return tx.Model(Target{}).Where("ident IN ?", idents).Updates(map[string]interface{}{
|
||||
|
||||
@@ -21,7 +21,6 @@ type Pushgw struct {
|
||||
PushConcurrency int
|
||||
UpdateTargetByUrlConcurrency int
|
||||
|
||||
GetHeartbeatFromMetric bool // 是否从时序数据中提取机器心跳时间,默认 false
|
||||
BusiGroupLabelKey string
|
||||
IdentMetrics []string
|
||||
IdentStatsThreshold int
|
||||
|
||||
@@ -250,10 +250,8 @@ func (r *Router) datadogSeries(c *gin.Context) {
|
||||
}
|
||||
|
||||
if ident != "" {
|
||||
if r.Pushgw.GetHeartbeatFromMetric {
|
||||
// register host
|
||||
ids[ident] = struct{}{}
|
||||
}
|
||||
// register host
|
||||
ids[ident] = struct{}{}
|
||||
|
||||
// fill tags
|
||||
target, has := r.TargetCache.Get(ident)
|
||||
|
||||
@@ -200,10 +200,8 @@ func (rt *Router) falconPush(c *gin.Context) {
|
||||
}
|
||||
|
||||
if ident != "" {
|
||||
if rt.Pushgw.GetHeartbeatFromMetric {
|
||||
// register host
|
||||
ids[ident] = struct{}{}
|
||||
}
|
||||
// register host
|
||||
ids[ident] = struct{}{}
|
||||
|
||||
// fill tags
|
||||
target, has := rt.TargetCache.Get(ident)
|
||||
|
||||
@@ -195,10 +195,8 @@ func (rt *Router) openTSDBPut(c *gin.Context) {
|
||||
|
||||
host, has := arr[i].Tags["ident"]
|
||||
if has {
|
||||
if rt.Pushgw.GetHeartbeatFromMetric {
|
||||
// register host
|
||||
ids[host] = struct{}{}
|
||||
}
|
||||
// register host
|
||||
ids[host] = struct{}{}
|
||||
|
||||
// fill tags
|
||||
target, has := rt.TargetCache.Get(host)
|
||||
|
||||
@@ -7,8 +7,8 @@ import (
|
||||
"net/http"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/pkg/ginx"
|
||||
"github.com/ccfos/nightingale/v6/pushgw/pstat"
|
||||
"github.com/ccfos/nightingale/v6/pkg/ginx"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"github.com/golang/snappy"
|
||||
@@ -149,7 +149,7 @@ func (rt *Router) remoteWrite(c *gin.Context) {
|
||||
pstat.CounterSampleReceivedByIdent.WithLabelValues(ident).Inc()
|
||||
}
|
||||
|
||||
if rt.Pushgw.GetHeartbeatFromMetric && insertTarget {
|
||||
if insertTarget {
|
||||
// has ident tag or agent_hostname tag
|
||||
// register host in table target
|
||||
ids[ident] = struct{}{}
|
||||
|
||||
Reference in New Issue
Block a user