Compare commits

..

3 Commits

Author SHA1 Message Date
ning
a855a01323 fix: es query data 2026-03-13 17:47:57 +08:00
ning
314e11a303 refactor: change get ident from metric 2026-03-05 15:32:21 +08:00
ning
3e5871e9f1 fix: target update sync 2026-03-04 20:51:16 +08:00
13 changed files with 45 additions and 259 deletions

View File

@@ -33,17 +33,6 @@ type Alerting struct {
TemplatesDir string
NotifyConcurrency int
WebhookBatchSend bool
GlobalWebhook GlobalWebhook
}
type GlobalWebhook struct {
Enable bool
Url string
BasicAuthUser string
BasicAuthPass string
Timeout int
Headers []string
SkipVerify bool
}
type CallPlugin struct {

View File

@@ -117,8 +117,6 @@ func Start(alertc aconf.Alert, pushgwc pconf.Pushgw, syncStats *memsto.Stats, al
eventProcessorCache := memsto.NewEventProcessorCache(ctx, syncStats)
sender.InitStaticGlobalWebhook(alertc.Alerting.GlobalWebhook)
dp := dispatch.NewDispatch(alertRuleCache, userCache, userGroupCache, alertSubscribeCache, targetCache, notifyConfigCache, taskTplsCache, notifyRuleCache, notifyChannelCache, messageTemplateCache, eventProcessorCache, configCvalCache, alertc.Alerting, ctx, alertStats)
consumer := dispatch.NewConsumer(alertc.Alerting, ctx, dp, promClients, alertMuteCache)

View File

@@ -608,10 +608,6 @@ func (e *Dispatch) HandleEventNotify(event *models.AlertCurEvent, isSubscribe bo
return
}
if !isSubscribe {
go sender.SendStaticGlobalWebhook(e.ctx, event.DeepCopy(), e.Astats)
}
rule := e.alertRuleCache.Get(event.RuleId)
if rule == nil {
return

View File

@@ -1,116 +0,0 @@
package sender
import (
"bytes"
"crypto/tls"
"encoding/json"
"fmt"
"io"
"net/http"
"time"
"github.com/ccfos/nightingale/v6/alert/aconf"
"github.com/ccfos/nightingale/v6/alert/astats"
"github.com/ccfos/nightingale/v6/models"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/pkg/poster"
"github.com/toolkits/pkg/logger"
)
var staticGlobalWebhookClient *http.Client
var staticGlobalWebhookConf aconf.GlobalWebhook
const staticGlobalWebhookChannel = "static_global_webhook"
func InitStaticGlobalWebhook(conf aconf.GlobalWebhook) {
staticGlobalWebhookConf = conf
if !conf.Enable || conf.Url == "" {
return
}
if len(conf.Headers) > 0 && len(conf.Headers)%2 != 0 {
logger.Warningf("static_global_webhook headers count is odd(%d), headers will be ignored", len(conf.Headers))
}
timeout := conf.Timeout
if timeout <= 0 {
timeout = 10
}
transport := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: conf.SkipVerify},
MaxIdleConns: 100,
MaxIdleConnsPerHost: 10,
IdleConnTimeout: 90 * time.Second,
}
if poster.UseProxy(conf.Url) {
transport.Proxy = http.ProxyFromEnvironment
}
staticGlobalWebhookClient = &http.Client{
Timeout: time.Duration(timeout) * time.Second,
Transport: transport,
}
logger.Infof("static_global_webhook initialized, url:%s", conf.Url)
}
func SendStaticGlobalWebhook(ctx *ctx.Context, event *models.AlertCurEvent, stats *astats.Stats) {
if staticGlobalWebhookClient == nil {
return
}
bs, err := json.Marshal(event)
if err != nil {
logger.Errorf("%s failed to marshal event err:%v", staticGlobalWebhookChannel, err)
NotifyRecord(ctx, []*models.AlertCurEvent{event}, 0, staticGlobalWebhookChannel, staticGlobalWebhookConf.Url, "", err)
return
}
req, err := http.NewRequest("POST", staticGlobalWebhookConf.Url, bytes.NewBuffer(bs))
if err != nil {
logger.Warningf("%s failed to new request event:%s err:%v", staticGlobalWebhookChannel, string(bs), err)
NotifyRecord(ctx, []*models.AlertCurEvent{event}, 0, staticGlobalWebhookChannel, staticGlobalWebhookConf.Url, "", err)
return
}
req.Header.Set("Content-Type", "application/json")
if staticGlobalWebhookConf.BasicAuthUser != "" && staticGlobalWebhookConf.BasicAuthPass != "" {
req.SetBasicAuth(staticGlobalWebhookConf.BasicAuthUser, staticGlobalWebhookConf.BasicAuthPass)
}
if len(staticGlobalWebhookConf.Headers) > 0 && len(staticGlobalWebhookConf.Headers)%2 == 0 {
for i := 0; i < len(staticGlobalWebhookConf.Headers); i += 2 {
if staticGlobalWebhookConf.Headers[i] == "Host" || staticGlobalWebhookConf.Headers[i] == "host" {
req.Host = staticGlobalWebhookConf.Headers[i+1]
continue
}
req.Header.Set(staticGlobalWebhookConf.Headers[i], staticGlobalWebhookConf.Headers[i+1])
}
}
stats.AlertNotifyTotal.WithLabelValues(staticGlobalWebhookChannel).Inc()
resp, err := staticGlobalWebhookClient.Do(req)
if err != nil {
stats.AlertNotifyErrorTotal.WithLabelValues(staticGlobalWebhookChannel).Inc()
logger.Errorf("%s_fail url:%s event:%s error:%v", staticGlobalWebhookChannel, staticGlobalWebhookConf.Url, event.Hash, err)
NotifyRecord(ctx, []*models.AlertCurEvent{event}, 0, staticGlobalWebhookChannel, staticGlobalWebhookConf.Url, "", err)
return
}
defer resp.Body.Close()
body, _ := io.ReadAll(io.LimitReader(resp.Body, 4096))
res := fmt.Sprintf("status_code:%d, response:%s", resp.StatusCode, string(body))
if resp.StatusCode >= 400 {
stats.AlertNotifyErrorTotal.WithLabelValues(staticGlobalWebhookChannel).Inc()
logger.Errorf("%s_fail url:%s status:%d body:%s event:%s", staticGlobalWebhookChannel, staticGlobalWebhookConf.Url, resp.StatusCode, string(body), event.Hash)
NotifyRecord(ctx, []*models.AlertCurEvent{event}, 0, staticGlobalWebhookChannel, staticGlobalWebhookConf.Url, res, fmt.Errorf("status code %d", resp.StatusCode))
return
}
logger.Debugf("%s_succ url:%s status:%d body:%s event:%s", staticGlobalWebhookChannel, staticGlobalWebhookConf.Url, resp.StatusCode, string(body), event.Hash)
NotifyRecord(ctx, []*models.AlertCurEvent{event}, 0, staticGlobalWebhookChannel, staticGlobalWebhookConf.Url, res, nil)
}

View File

@@ -1,114 +0,0 @@
package sender
import (
"context"
"errors"
"net/http"
"strings"
"testing"
"github.com/ccfos/nightingale/v6/alert/aconf"
"github.com/ccfos/nightingale/v6/alert/astats"
"github.com/ccfos/nightingale/v6/models"
ctxpkg "github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/prometheus/client_golang/prometheus"
)
type roundTripperFunc func(*http.Request) (*http.Response, error)
func (f roundTripperFunc) RoundTrip(req *http.Request) (*http.Response, error) {
return f(req)
}
func newStaticWebhookTestStats() *astats.Stats {
return &astats.Stats{
AlertNotifyTotal: prometheus.NewCounterVec(
prometheus.CounterOpts{Name: "test_static_global_webhook_total"},
[]string{"channel"},
),
AlertNotifyErrorTotal: prometheus.NewCounterVec(
prometheus.CounterOpts{Name: "test_static_global_webhook_error_total"},
[]string{"channel"},
),
}
}
func TestSendStaticGlobalWebhookRecordsNewRequestFailure(t *testing.T) {
prevClient := staticGlobalWebhookClient
prevConf := staticGlobalWebhookConf
defer func() {
staticGlobalWebhookClient = prevClient
staticGlobalWebhookConf = prevConf
}()
NotifyRecordQueue.RemoveAll()
defer NotifyRecordQueue.RemoveAll()
staticGlobalWebhookClient = &http.Client{}
staticGlobalWebhookConf = aconf.GlobalWebhook{Enable: true, Url: "://bad-url"}
SendStaticGlobalWebhook(
ctxpkg.NewContext(context.Background(), nil, true),
&models.AlertCurEvent{Id: 1, Hash: "event-1"},
newStaticWebhookTestStats(),
)
if got := NotifyRecordQueue.Len(); got != 1 {
t.Fatalf("expected 1 notify record, got %d", got)
}
record, ok := NotifyRecordQueue.PopBack().(*models.NotificationRecord)
if !ok {
t.Fatalf("expected *models.NotificationRecord in queue")
}
if record.Status != models.NotiStatusFailure {
t.Fatalf("expected failure status, got %d", record.Status)
}
if record.Channel != staticGlobalWebhookChannel {
t.Fatalf("expected channel %q, got %q", staticGlobalWebhookChannel, record.Channel)
}
}
func TestSendStaticGlobalWebhookRecordsTransportFailure(t *testing.T) {
prevClient := staticGlobalWebhookClient
prevConf := staticGlobalWebhookConf
defer func() {
staticGlobalWebhookClient = prevClient
staticGlobalWebhookConf = prevConf
}()
NotifyRecordQueue.RemoveAll()
defer NotifyRecordQueue.RemoveAll()
staticGlobalWebhookClient = &http.Client{
Transport: roundTripperFunc(func(req *http.Request) (*http.Response, error) {
return nil, errors.New("transport boom")
}),
}
staticGlobalWebhookConf = aconf.GlobalWebhook{Enable: true, Url: "http://example.com/webhook"}
SendStaticGlobalWebhook(
ctxpkg.NewContext(context.Background(), nil, true),
&models.AlertCurEvent{Id: 2, Hash: "event-2"},
newStaticWebhookTestStats(),
)
if got := NotifyRecordQueue.Len(); got != 1 {
t.Fatalf("expected 1 notify record, got %d", got)
}
record, ok := NotifyRecordQueue.PopBack().(*models.NotificationRecord)
if !ok {
t.Fatalf("expected *models.NotificationRecord in queue")
}
if record.Status != models.NotiStatusFailure {
t.Fatalf("expected failure status, got %d", record.Status)
}
if !strings.Contains(record.Details, "transport boom") {
t.Fatalf("expected transport error details, got %q", record.Details)
}
}

View File

@@ -191,6 +191,7 @@ func HandleHeartbeat(c *gin.Context, ctx *ctx.Context, engineName string, metaSe
}
if targetNeedUpdate {
newTarget.UpdateAt = time.Now().Unix()
err := models.DB(ctx).Model(&target).Updates(newTarget).Error
if err != nil {
logger.Errorf("update target fields failed, err: %v", err)

View File

@@ -599,6 +599,19 @@ func QueryData(ctx context.Context, queryParam interface{}, cliTimeout int64, ve
GetBuckets("", keys, bucketsData, metrics, "", 0, param.MetricAggr.Func)
// Drop the last incomplete bucket to avoid inaccurate values at the boundary.
// When the last bucket's time range extends beyond or reaches the query end time,
// it may contain only partial data, making aggregated values (count, sum, etc.) artificially low.
for k, v := range metrics.Data {
if len(v) <= 1 {
continue
}
lastTs := v[len(v)-1][0]
if int64(lastTs)+param.Interval > end {
metrics.Data[k] = v[:len(v)-1]
}
}
items, err := TransferData(fmt.Sprintf("%s_%s", field, param.MetricAggr.Func), param.Ref, metrics.Data), nil
var m map[string]interface{}

View File

@@ -87,7 +87,7 @@ func TargetBindBgids(ctx *ctx.Context, idents []string, bgids []int64, tags []st
}
return DB(ctx).Transaction(func(tx *gorm.DB) error {
if err := DB(ctx).Clauses(cl).CreateInBatches(&lst, 10).Error; err != nil {
if err := tx.Clauses(cl).CreateInBatches(&lst, 10).Error; err != nil {
return err
}
if targets, err := TargetsGetByIdents(ctx, idents); err != nil {
@@ -100,13 +100,24 @@ func TargetBindBgids(ctx *ctx.Context, idents []string, bgids []int64, tags []st
}
}
// update target.update_at so that syncTargets can detect the change and refresh GroupIds cache
if err := tx.Model(&Target{}).Where("ident in ?", idents).Update("update_at", updateAt).Error; err != nil {
return err
}
return nil
})
}
func TargetUnbindBgids(ctx *ctx.Context, idents []string, bgids []int64) error {
return DB(ctx).Where("target_ident in ? and group_id in ?",
idents, bgids).Delete(&TargetBusiGroup{}).Error
return DB(ctx).Transaction(func(tx *gorm.DB) error {
if err := tx.Where("target_ident in ? and group_id in ?",
idents, bgids).Delete(&TargetBusiGroup{}).Error; err != nil {
return err
}
// update target.update_at so that syncTargets can detect the change and refresh GroupIds cache
return tx.Model(&Target{}).Where("ident in ?", idents).Update("update_at", time.Now().Unix()).Error
})
}
func TargetDeleteBgids(tx *gorm.DB, idents []string) error {
@@ -150,7 +161,8 @@ func TargetOverrideBgids(ctx *ctx.Context, idents []string, bgids []int64, tags
return err
}
if len(tags) == 0 {
return nil
// update target.update_at so that syncTargets can detect the change and refresh GroupIds cache
return tx.Model(&Target{}).Where("ident IN ?", idents).Update("update_at", updateAt).Error
}
return tx.Model(Target{}).Where("ident IN ?", idents).Updates(map[string]interface{}{

View File

@@ -21,6 +21,7 @@ type Pushgw struct {
PushConcurrency int
UpdateTargetByUrlConcurrency int
GetHeartbeatFromMetric bool // 是否从时序数据中提取机器心跳时间,默认 false
BusiGroupLabelKey string
IdentMetrics []string
IdentStatsThreshold int

View File

@@ -250,8 +250,10 @@ func (r *Router) datadogSeries(c *gin.Context) {
}
if ident != "" {
// register host
ids[ident] = struct{}{}
if r.Pushgw.GetHeartbeatFromMetric {
// register host
ids[ident] = struct{}{}
}
// fill tags
target, has := r.TargetCache.Get(ident)

View File

@@ -200,8 +200,10 @@ func (rt *Router) falconPush(c *gin.Context) {
}
if ident != "" {
// register host
ids[ident] = struct{}{}
if rt.Pushgw.GetHeartbeatFromMetric {
// register host
ids[ident] = struct{}{}
}
// fill tags
target, has := rt.TargetCache.Get(ident)

View File

@@ -195,8 +195,10 @@ func (rt *Router) openTSDBPut(c *gin.Context) {
host, has := arr[i].Tags["ident"]
if has {
// register host
ids[host] = struct{}{}
if rt.Pushgw.GetHeartbeatFromMetric {
// register host
ids[host] = struct{}{}
}
// fill tags
target, has := rt.TargetCache.Get(host)

View File

@@ -7,8 +7,8 @@ import (
"net/http"
"sync/atomic"
"github.com/ccfos/nightingale/v6/pushgw/pstat"
"github.com/ccfos/nightingale/v6/pkg/ginx"
"github.com/ccfos/nightingale/v6/pushgw/pstat"
"github.com/gin-gonic/gin"
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
@@ -149,7 +149,7 @@ func (rt *Router) remoteWrite(c *gin.Context) {
pstat.CounterSampleReceivedByIdent.WithLabelValues(ident).Inc()
}
if insertTarget {
if rt.Pushgw.GetHeartbeatFromMetric && insertTarget {
// has ident tag or agent_hostname tag
// register host in table target
ids[ident] = struct{}{}