Compare commits

...

2 Commits

Author SHA1 Message Date
ning
314e11a303 refactor: change get ident from metric 2026-03-05 15:32:21 +08:00
ning
3e5871e9f1 fix: target update sync 2026-03-04 20:51:16 +08:00
7 changed files with 32 additions and 12 deletions

View File

@@ -191,6 +191,7 @@ func HandleHeartbeat(c *gin.Context, ctx *ctx.Context, engineName string, metaSe
}
if targetNeedUpdate {
newTarget.UpdateAt = time.Now().Unix()
err := models.DB(ctx).Model(&target).Updates(newTarget).Error
if err != nil {
logger.Errorf("update target fields failed, err: %v", err)

View File

@@ -87,7 +87,7 @@ func TargetBindBgids(ctx *ctx.Context, idents []string, bgids []int64, tags []st
}
return DB(ctx).Transaction(func(tx *gorm.DB) error {
if err := DB(ctx).Clauses(cl).CreateInBatches(&lst, 10).Error; err != nil {
if err := tx.Clauses(cl).CreateInBatches(&lst, 10).Error; err != nil {
return err
}
if targets, err := TargetsGetByIdents(ctx, idents); err != nil {
@@ -100,13 +100,24 @@ func TargetBindBgids(ctx *ctx.Context, idents []string, bgids []int64, tags []st
}
}
// update target.update_at so that syncTargets can detect the change and refresh GroupIds cache
if err := tx.Model(&Target{}).Where("ident in ?", idents).Update("update_at", updateAt).Error; err != nil {
return err
}
return nil
})
}
func TargetUnbindBgids(ctx *ctx.Context, idents []string, bgids []int64) error {
return DB(ctx).Where("target_ident in ? and group_id in ?",
idents, bgids).Delete(&TargetBusiGroup{}).Error
return DB(ctx).Transaction(func(tx *gorm.DB) error {
if err := tx.Where("target_ident in ? and group_id in ?",
idents, bgids).Delete(&TargetBusiGroup{}).Error; err != nil {
return err
}
// update target.update_at so that syncTargets can detect the change and refresh GroupIds cache
return tx.Model(&Target{}).Where("ident in ?", idents).Update("update_at", time.Now().Unix()).Error
})
}
func TargetDeleteBgids(tx *gorm.DB, idents []string) error {
@@ -150,7 +161,8 @@ func TargetOverrideBgids(ctx *ctx.Context, idents []string, bgids []int64, tags
return err
}
if len(tags) == 0 {
return nil
// update target.update_at so that syncTargets can detect the change and refresh GroupIds cache
return tx.Model(&Target{}).Where("ident IN ?", idents).Update("update_at", updateAt).Error
}
return tx.Model(Target{}).Where("ident IN ?", idents).Updates(map[string]interface{}{

View File

@@ -21,6 +21,7 @@ type Pushgw struct {
PushConcurrency int
UpdateTargetByUrlConcurrency int
GetHeartbeatFromMetric bool // 是否从时序数据中提取机器心跳时间,默认 false
BusiGroupLabelKey string
IdentMetrics []string
IdentStatsThreshold int

View File

@@ -250,8 +250,10 @@ func (r *Router) datadogSeries(c *gin.Context) {
}
if ident != "" {
// register host
ids[ident] = struct{}{}
if r.Pushgw.GetHeartbeatFromMetric {
// register host
ids[ident] = struct{}{}
}
// fill tags
target, has := r.TargetCache.Get(ident)

View File

@@ -200,8 +200,10 @@ func (rt *Router) falconPush(c *gin.Context) {
}
if ident != "" {
// register host
ids[ident] = struct{}{}
if rt.Pushgw.GetHeartbeatFromMetric {
// register host
ids[ident] = struct{}{}
}
// fill tags
target, has := rt.TargetCache.Get(ident)

View File

@@ -195,8 +195,10 @@ func (rt *Router) openTSDBPut(c *gin.Context) {
host, has := arr[i].Tags["ident"]
if has {
// register host
ids[host] = struct{}{}
if rt.Pushgw.GetHeartbeatFromMetric {
// register host
ids[host] = struct{}{}
}
// fill tags
target, has := rt.TargetCache.Get(host)

View File

@@ -7,8 +7,8 @@ import (
"net/http"
"sync/atomic"
"github.com/ccfos/nightingale/v6/pushgw/pstat"
"github.com/ccfos/nightingale/v6/pkg/ginx"
"github.com/ccfos/nightingale/v6/pushgw/pstat"
"github.com/gin-gonic/gin"
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
@@ -149,7 +149,7 @@ func (rt *Router) remoteWrite(c *gin.Context) {
pstat.CounterSampleReceivedByIdent.WithLabelValues(ident).Inc()
}
if insertTarget {
if rt.Pushgw.GetHeartbeatFromMetric && insertTarget {
// has ident tag or agent_hostname tag
// register host in table target
ids[ident] = struct{}{}