mirror of
https://github.com/ccfos/nightingale.git
synced 2026-03-02 22:19:10 +00:00
Compare commits
1 Commits
dev21
...
update-hea
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
dd5f132676 |
@@ -844,7 +844,7 @@ func (arw *AlertRuleWorker) GetHostAnomalyPoint(ruleConfig string) ([]models.Ano
|
||||
}
|
||||
m["ident"] = target.Ident
|
||||
|
||||
lst = append(lst, models.NewAnomalyPoint(trigger.Type, m, now, float64(now-target.UpdateAt), trigger.Severity))
|
||||
lst = append(lst, models.NewAnomalyPoint(trigger.Type, m, now, float64(now-target.BeatTime), trigger.Severity))
|
||||
}
|
||||
case "offset":
|
||||
idents, exists := arw.Processor.TargetsOfAlertRuleCache.Get(arw.Processor.EngineName, arw.Rule.Id)
|
||||
@@ -873,7 +873,7 @@ func (arw *AlertRuleWorker) GetHostAnomalyPoint(ruleConfig string) ([]models.Ano
|
||||
continue
|
||||
}
|
||||
if target, exists := targetMap[ident]; exists {
|
||||
if now-target.UpdateAt > 120 {
|
||||
if now-target.BeatTime > 120 {
|
||||
// means this target is not a active host, do not check offset
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -38,6 +38,16 @@ func (rt *Router) targetGetsByHostFilter(c *gin.Context) {
|
||||
total, err := models.TargetCountByFilter(rt.Ctx, query)
|
||||
ginx.Dangerous(err)
|
||||
|
||||
models.FillTargetsBeatTime(rt.Redis, hosts)
|
||||
now := time.Now().Unix()
|
||||
for i := 0; i < len(hosts); i++ {
|
||||
if now-hosts[i].BeatTime < 60 {
|
||||
hosts[i].TargetUp = 2
|
||||
} else if now-hosts[i].BeatTime < 180 {
|
||||
hosts[i].TargetUp = 1
|
||||
}
|
||||
}
|
||||
|
||||
ginx.NewRender(c).Data(gin.H{
|
||||
"list": hosts,
|
||||
"total": total,
|
||||
@@ -81,9 +91,24 @@ func (rt *Router) targetGets(c *gin.Context) {
|
||||
models.BuildTargetWhereWithBgids(bgids),
|
||||
models.BuildTargetWhereWithDsIds(dsIds),
|
||||
models.BuildTargetWhereWithQuery(query),
|
||||
models.BuildTargetWhereWithDowntime(downtime),
|
||||
models.BuildTargetWhereWithHosts(hosts),
|
||||
}
|
||||
|
||||
// downtime 筛选:从缓存获取心跳时间,选择较小的集合用 IN 或 NOT IN 过滤
|
||||
if downtime != 0 {
|
||||
downtimeOpt, hasMatch := rt.downtimeFilter(downtime)
|
||||
if !hasMatch {
|
||||
ginx.NewRender(c).Data(gin.H{
|
||||
"list": []*models.Target{},
|
||||
"total": 0,
|
||||
}, nil)
|
||||
return
|
||||
}
|
||||
if downtimeOpt != nil {
|
||||
options = append(options, downtimeOpt)
|
||||
}
|
||||
}
|
||||
|
||||
total, err := models.TargetTotal(rt.Ctx, options...)
|
||||
ginx.Dangerous(err)
|
||||
|
||||
@@ -102,14 +127,17 @@ func (rt *Router) targetGets(c *gin.Context) {
|
||||
now := time.Now()
|
||||
cache := make(map[int64]*models.BusiGroup)
|
||||
|
||||
// 从 Redis 补全 BeatTime
|
||||
models.FillTargetsBeatTime(rt.Redis, list)
|
||||
|
||||
var keys []string
|
||||
for i := 0; i < len(list); i++ {
|
||||
ginx.Dangerous(list[i].FillGroup(rt.Ctx, cache))
|
||||
keys = append(keys, models.WrapIdent(list[i].Ident))
|
||||
|
||||
if now.Unix()-list[i].UpdateAt < 60 {
|
||||
if now.Unix()-list[i].BeatTime < 60 {
|
||||
list[i].TargetUp = 2
|
||||
} else if now.Unix()-list[i].UpdateAt < 180 {
|
||||
} else if now.Unix()-list[i].BeatTime < 180 {
|
||||
list[i].TargetUp = 1
|
||||
}
|
||||
}
|
||||
@@ -148,6 +176,43 @@ func (rt *Router) targetGets(c *gin.Context) {
|
||||
}, nil)
|
||||
}
|
||||
|
||||
// downtimeFilter 从缓存获取心跳时间,生成 downtime 筛选条件
|
||||
// 选择匹配集和非匹配集中较小的一方,用 IN 或 NOT IN 来减少 SQL 参数量
|
||||
// 返回值:
|
||||
// - option: 筛选条件,nil 表示所有 target 都符合条件(无需过滤)
|
||||
// - hasMatch: 是否有符合条件的 target,false 表示无匹配应返回空结果
|
||||
func (rt *Router) downtimeFilter(downtime int64) (option models.BuildTargetWhereOption, hasMatch bool) {
|
||||
now := time.Now().Unix()
|
||||
targets := rt.TargetCache.GetAll()
|
||||
var matchIdents, nonMatchIdents []string
|
||||
for _, target := range targets {
|
||||
matched := false
|
||||
if downtime > 0 {
|
||||
matched = target.BeatTime < now-downtime
|
||||
} else if downtime < 0 {
|
||||
matched = target.BeatTime > now+downtime
|
||||
}
|
||||
if matched {
|
||||
matchIdents = append(matchIdents, target.Ident)
|
||||
} else {
|
||||
nonMatchIdents = append(nonMatchIdents, target.Ident)
|
||||
}
|
||||
}
|
||||
|
||||
if len(matchIdents) == 0 {
|
||||
return nil, false
|
||||
}
|
||||
|
||||
if len(nonMatchIdents) == 0 {
|
||||
return nil, true
|
||||
}
|
||||
|
||||
if len(matchIdents) <= len(nonMatchIdents) {
|
||||
return models.BuildTargetWhereWithIdents(matchIdents), true
|
||||
}
|
||||
return models.BuildTargetWhereExcludeIdents(nonMatchIdents), true
|
||||
}
|
||||
|
||||
func (rt *Router) targetExtendInfoByIdent(c *gin.Context) {
|
||||
ident := ginx.QueryStr(c, "ident", "")
|
||||
key := models.WrapExtendIdent(ident)
|
||||
|
||||
@@ -90,7 +90,7 @@ func getDatasourcesFromDBLoop(ctx *ctx.Context, fromAPI bool) {
|
||||
foundDefaultDatasource = true
|
||||
}
|
||||
|
||||
logger.Debugf("get datasource: %+v", item)
|
||||
// logger.Debugf("get datasource: %+v", item)
|
||||
ds := datasource.DatasourceInfo{
|
||||
Id: item.Id,
|
||||
Name: item.Name,
|
||||
@@ -236,5 +236,5 @@ func PutDatasources(items []datasource.DatasourceInfo) {
|
||||
}
|
||||
}
|
||||
|
||||
logger.Debugf("get plugin by type success Ids:%v", ids)
|
||||
// logger.Debugf("get plugin by type success Ids:%v", ids)
|
||||
}
|
||||
|
||||
@@ -27,7 +27,8 @@ type TargetCacheType struct {
|
||||
redis storage.Redis
|
||||
|
||||
sync.RWMutex
|
||||
targets map[string]*models.Target // key: ident
|
||||
targets map[string]*models.Target // key: ident
|
||||
targetsIndex map[string][]string // key: ip, value: ident list
|
||||
}
|
||||
|
||||
func NewTargetCache(ctx *ctx.Context, stats *Stats, redis storage.Redis) *TargetCacheType {
|
||||
@@ -38,6 +39,7 @@ func NewTargetCache(ctx *ctx.Context, stats *Stats, redis storage.Redis) *Target
|
||||
stats: stats,
|
||||
redis: redis,
|
||||
targets: make(map[string]*models.Target),
|
||||
targetsIndex: make(map[string][]string),
|
||||
}
|
||||
|
||||
tc.SyncTargets()
|
||||
@@ -51,6 +53,7 @@ func (tc *TargetCacheType) Reset() {
|
||||
tc.statTotal = -1
|
||||
tc.statLastUpdated = -1
|
||||
tc.targets = make(map[string]*models.Target)
|
||||
tc.targetsIndex = make(map[string][]string)
|
||||
}
|
||||
|
||||
func (tc *TargetCacheType) StatChanged(total, lastUpdated int64) bool {
|
||||
@@ -62,8 +65,17 @@ func (tc *TargetCacheType) StatChanged(total, lastUpdated int64) bool {
|
||||
}
|
||||
|
||||
func (tc *TargetCacheType) Set(m map[string]*models.Target, total, lastUpdated int64) {
|
||||
idx := make(map[string][]string, len(m))
|
||||
for ident, target := range m {
|
||||
if _, ok := idx[target.HostIp]; !ok {
|
||||
idx[target.HostIp] = []string{}
|
||||
}
|
||||
idx[target.HostIp] = append(idx[target.HostIp], ident)
|
||||
}
|
||||
|
||||
tc.Lock()
|
||||
tc.targets = m
|
||||
tc.targetsIndex = idx
|
||||
tc.Unlock()
|
||||
|
||||
// only one goroutine used, so no need lock
|
||||
@@ -78,6 +90,75 @@ func (tc *TargetCacheType) Get(ident string) (*models.Target, bool) {
|
||||
return val, has
|
||||
}
|
||||
|
||||
func (tc *TargetCacheType) GetByIp(ip string) ([]*models.Target, bool) {
|
||||
tc.RLock()
|
||||
defer tc.RUnlock()
|
||||
idents, has := tc.targetsIndex[ip]
|
||||
if !has {
|
||||
return nil, false
|
||||
}
|
||||
targs := make([]*models.Target, 0, len(idents))
|
||||
for _, ident := range idents {
|
||||
if val, has := tc.targets[ident]; has {
|
||||
targs = append(targs, val)
|
||||
}
|
||||
}
|
||||
return targs, len(targs) > 0
|
||||
}
|
||||
|
||||
func (tc *TargetCacheType) GetAll() []*models.Target {
|
||||
tc.RLock()
|
||||
defer tc.RUnlock()
|
||||
lst := make([]*models.Target, 0, len(tc.targets))
|
||||
for _, target := range tc.targets {
|
||||
lst = append(lst, target)
|
||||
}
|
||||
return lst
|
||||
}
|
||||
|
||||
// GetAllBeatTime 返回所有 target 的心跳时间 map,key 为 ident,value 为 BeatTime
|
||||
func (tc *TargetCacheType) GetAllBeatTime() map[string]int64 {
|
||||
tc.RLock()
|
||||
defer tc.RUnlock()
|
||||
beatTimeMap := make(map[string]int64, len(tc.targets))
|
||||
for ident, target := range tc.targets {
|
||||
beatTimeMap[ident] = target.BeatTime
|
||||
}
|
||||
return beatTimeMap
|
||||
}
|
||||
|
||||
// refreshBeatTime 从 Redis 刷新缓存中所有 target 的 BeatTime
|
||||
func (tc *TargetCacheType) refreshBeatTime() {
|
||||
if tc.redis == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// 快照 ident 列表,避免持锁访问 Redis
|
||||
tc.RLock()
|
||||
idents := make([]string, 0, len(tc.targets))
|
||||
for ident := range tc.targets {
|
||||
idents = append(idents, ident)
|
||||
}
|
||||
tc.RUnlock()
|
||||
|
||||
if len(idents) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
beatTimes := models.FetchBeatTimesFromRedis(tc.redis, idents)
|
||||
if len(beatTimes) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
tc.Lock()
|
||||
for ident, ts := range beatTimes {
|
||||
if target, ok := tc.targets[ident]; ok {
|
||||
target.BeatTime = ts
|
||||
}
|
||||
}
|
||||
tc.Unlock()
|
||||
}
|
||||
|
||||
func (tc *TargetCacheType) Gets(idents []string) []*models.Target {
|
||||
tc.RLock()
|
||||
defer tc.RUnlock()
|
||||
@@ -105,7 +186,7 @@ func (tc *TargetCacheType) GetOffsetHost(targets []*models.Target, now, offset i
|
||||
continue
|
||||
}
|
||||
|
||||
if now-target.UpdateAt > 120 {
|
||||
if now-target.BeatTime > 120 {
|
||||
// means this target is not a active host, do not check offset
|
||||
continue
|
||||
}
|
||||
@@ -147,6 +228,7 @@ func (tc *TargetCacheType) syncTargets() error {
|
||||
}
|
||||
|
||||
if !tc.StatChanged(stat.Total, stat.LastUpdated) {
|
||||
tc.refreshBeatTime()
|
||||
tc.stats.GaugeCronDuration.WithLabelValues("sync_targets").Set(0)
|
||||
tc.stats.GaugeSyncNumber.WithLabelValues("sync_targets").Set(0)
|
||||
dumper.PutSyncRecord("targets", start.Unix(), -1, -1, "not changed")
|
||||
@@ -170,6 +252,9 @@ func (tc *TargetCacheType) syncTargets() error {
|
||||
}
|
||||
}
|
||||
|
||||
// 从 Redis 批量获取心跳时间填充 BeatTime
|
||||
models.FillTargetsBeatTime(tc.redis, lst)
|
||||
|
||||
for i := 0; i < len(lst); i++ {
|
||||
m[lst[i].Ident] = lst[i]
|
||||
}
|
||||
@@ -186,57 +271,18 @@ func (tc *TargetCacheType) syncTargets() error {
|
||||
|
||||
// get host update time
|
||||
func (tc *TargetCacheType) GetHostUpdateTime(targets []string) map[string]int64 {
|
||||
metaMap := make(map[string]int64)
|
||||
if tc.redis == nil {
|
||||
return metaMap
|
||||
return make(map[string]int64)
|
||||
}
|
||||
|
||||
num := 0
|
||||
var keys []string
|
||||
for i := 0; i < len(targets); i++ {
|
||||
keys = append(keys, models.WrapIdentUpdateTime(targets[i]))
|
||||
num++
|
||||
if num == 100 {
|
||||
vals := storage.MGet(context.Background(), tc.redis, keys)
|
||||
for _, value := range vals {
|
||||
var hostUpdateTime models.HostUpdateTime
|
||||
if value == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
err := json.Unmarshal(value, &hostUpdateTime)
|
||||
if err != nil {
|
||||
logger.Errorf("failed to unmarshal host meta: %s value:%v", err, value)
|
||||
continue
|
||||
}
|
||||
metaMap[hostUpdateTime.Ident] = hostUpdateTime.UpdateTime
|
||||
}
|
||||
keys = keys[:0]
|
||||
num = 0
|
||||
}
|
||||
}
|
||||
|
||||
vals := storage.MGet(context.Background(), tc.redis, keys)
|
||||
for _, value := range vals {
|
||||
var hostUpdateTime models.HostUpdateTime
|
||||
if value == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
err := json.Unmarshal(value, &hostUpdateTime)
|
||||
if err != nil {
|
||||
logger.Warningf("failed to unmarshal host err:%v value:%s", err, string(value))
|
||||
continue
|
||||
}
|
||||
metaMap[hostUpdateTime.Ident] = hostUpdateTime.UpdateTime
|
||||
}
|
||||
metaMap := models.FetchBeatTimesFromRedis(tc.redis, targets)
|
||||
|
||||
for _, ident := range targets {
|
||||
if _, ok := metaMap[ident]; !ok {
|
||||
// if not exists, get from cache
|
||||
target, exists := tc.Get(ident)
|
||||
if exists {
|
||||
metaMap[ident] = target.UpdateAt
|
||||
metaMap[ident] = target.BeatTime
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
114
models/target.go
114
models/target.go
@@ -1,6 +1,8 @@
|
||||
package models
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"log"
|
||||
"sort"
|
||||
"strings"
|
||||
@@ -8,6 +10,7 @@ import (
|
||||
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
"github.com/ccfos/nightingale/v6/pkg/poster"
|
||||
"github.com/ccfos/nightingale/v6/storage"
|
||||
"golang.org/x/exp/slices"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
@@ -36,6 +39,7 @@ type Target struct {
|
||||
OS string `json:"os" gorm:"column:os"`
|
||||
HostTags []string `json:"host_tags" gorm:"serializer:json"`
|
||||
|
||||
BeatTime int64 `json:"beat_time" gorm:"-"` // 实时心跳时间,从 Redis 获取
|
||||
UnixTime int64 `json:"unixtime" gorm:"-"`
|
||||
Offset int64 `json:"offset" gorm:"-"`
|
||||
TargetUp float64 `json:"target_up" gorm:"-"`
|
||||
@@ -97,12 +101,6 @@ func (t *Target) MatchGroupId(gid ...int64) bool {
|
||||
}
|
||||
|
||||
func (t *Target) AfterFind(tx *gorm.DB) (err error) {
|
||||
delta := time.Now().Unix() - t.UpdateAt
|
||||
if delta < 60 {
|
||||
t.TargetUp = 2
|
||||
} else if delta < 180 {
|
||||
t.TargetUp = 1
|
||||
}
|
||||
t.FillTagsMap()
|
||||
return
|
||||
}
|
||||
@@ -182,6 +180,24 @@ func BuildTargetWhereWithHosts(hosts []string) BuildTargetWhereOption {
|
||||
}
|
||||
}
|
||||
|
||||
func BuildTargetWhereWithIdents(idents []string) BuildTargetWhereOption {
|
||||
return func(session *gorm.DB) *gorm.DB {
|
||||
if len(idents) > 0 {
|
||||
session = session.Where("ident in (?)", idents)
|
||||
}
|
||||
return session
|
||||
}
|
||||
}
|
||||
|
||||
func BuildTargetWhereExcludeIdents(idents []string) BuildTargetWhereOption {
|
||||
return func(session *gorm.DB) *gorm.DB {
|
||||
if len(idents) > 0 {
|
||||
session = session.Where("ident not in (?)", idents)
|
||||
}
|
||||
return session
|
||||
}
|
||||
}
|
||||
|
||||
func BuildTargetWhereWithQuery(query string) BuildTargetWhereOption {
|
||||
return func(session *gorm.DB) *gorm.DB {
|
||||
if query != "" {
|
||||
@@ -203,17 +219,6 @@ func BuildTargetWhereWithQuery(query string) BuildTargetWhereOption {
|
||||
}
|
||||
}
|
||||
|
||||
func BuildTargetWhereWithDowntime(downtime int64) BuildTargetWhereOption {
|
||||
return func(session *gorm.DB) *gorm.DB {
|
||||
if downtime > 0 {
|
||||
session = session.Where("target.update_at < ?", time.Now().Unix()-downtime)
|
||||
} else if downtime < 0 {
|
||||
session = session.Where("target.update_at > ?", time.Now().Unix()+downtime)
|
||||
}
|
||||
return session
|
||||
}
|
||||
}
|
||||
|
||||
func buildTargetWhere(ctx *ctx.Context, options ...BuildTargetWhereOption) *gorm.DB {
|
||||
sub := DB(ctx).Model(&Target{}).Distinct("target.ident")
|
||||
for _, opt := range options {
|
||||
@@ -264,21 +269,6 @@ func TargetCountByFilter(ctx *ctx.Context, query []map[string]interface{}) (int6
|
||||
return Count(session)
|
||||
}
|
||||
|
||||
func MissTargetGetsByFilter(ctx *ctx.Context, query []map[string]interface{}, ts int64) ([]*Target, error) {
|
||||
var lst []*Target
|
||||
session := TargetFilterQueryBuild(ctx, query, 0, 0)
|
||||
session = session.Where("update_at < ?", ts)
|
||||
|
||||
err := session.Order("ident").Find(&lst).Error
|
||||
return lst, err
|
||||
}
|
||||
|
||||
func MissTargetCountByFilter(ctx *ctx.Context, query []map[string]interface{}, ts int64) (int64, error) {
|
||||
session := TargetFilterQueryBuild(ctx, query, 0, 0)
|
||||
session = session.Where("update_at < ?", ts)
|
||||
return Count(session)
|
||||
}
|
||||
|
||||
func TargetFilterQueryBuild(ctx *ctx.Context, query []map[string]interface{}, limit, offset int) *gorm.DB {
|
||||
sub := DB(ctx).Model(&Target{}).Distinct("target.ident").Joins("left join " +
|
||||
"target_busi_group on target.ident = target_busi_group.target_ident")
|
||||
@@ -619,6 +609,66 @@ func (t *Target) FillMeta(meta *HostMeta) {
|
||||
t.RemoteAddr = meta.RemoteAddr
|
||||
}
|
||||
|
||||
// FetchBeatTimesFromRedis 从 Redis 批量获取心跳时间,返回 ident -> updateTime 的映射
|
||||
func FetchBeatTimesFromRedis(redis storage.Redis, idents []string) map[string]int64 {
|
||||
result := make(map[string]int64, len(idents))
|
||||
if redis == nil || len(idents) == 0 {
|
||||
return result
|
||||
}
|
||||
|
||||
num := 0
|
||||
var keys []string
|
||||
for i := 0; i < len(idents); i++ {
|
||||
keys = append(keys, WrapIdentUpdateTime(idents[i]))
|
||||
num++
|
||||
if num == 100 {
|
||||
fetchBeatTimeBatch(redis, keys, result)
|
||||
keys = keys[:0]
|
||||
num = 0
|
||||
}
|
||||
}
|
||||
|
||||
if len(keys) > 0 {
|
||||
fetchBeatTimeBatch(redis, keys, result)
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
func fetchBeatTimeBatch(redis storage.Redis, keys []string, result map[string]int64) {
|
||||
vals := storage.MGet(context.Background(), redis, keys)
|
||||
for _, value := range vals {
|
||||
if value == nil {
|
||||
continue
|
||||
}
|
||||
var hut HostUpdateTime
|
||||
if err := json.Unmarshal(value, &hut); err != nil {
|
||||
logger.Warningf("failed to unmarshal host update time: %v", err)
|
||||
continue
|
||||
}
|
||||
result[hut.Ident] = hut.UpdateTime
|
||||
}
|
||||
}
|
||||
|
||||
// FillTargetsBeatTime 从 Redis 批量获取心跳时间填充 target.BeatTime
|
||||
func FillTargetsBeatTime(redis storage.Redis, targets []*Target) {
|
||||
if len(targets) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
idents := make([]string, len(targets))
|
||||
for i, t := range targets {
|
||||
idents[i] = t.Ident
|
||||
}
|
||||
|
||||
beatTimes := FetchBeatTimesFromRedis(redis, idents)
|
||||
for _, t := range targets {
|
||||
if ts, ok := beatTimes[t.Ident]; ok {
|
||||
t.BeatTime = ts
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TargetIdents(ctx *ctx.Context, ids []int64) ([]string, error) {
|
||||
var ret []string
|
||||
|
||||
|
||||
@@ -106,6 +106,7 @@ func (s *Set) UpdateTargets(lst []string, now int64) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// 心跳时间只写入 Redis,不再写入 MySQL update_at
|
||||
err := s.updateTargetsUpdateTs(lst, now, s.redis)
|
||||
if err != nil {
|
||||
logger.Errorf("update_ts: failed to update targets: %v error: %v", lst, err)
|
||||
@@ -133,12 +134,7 @@ func (s *Set) UpdateTargets(lst []string, now int64) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
if s.configs.UpdateDBTargetTimestampDisable {
|
||||
// 如果 mysql 压力太大,关闭更新 db 的操作
|
||||
return nil
|
||||
}
|
||||
|
||||
// there are some idents not found in db, so insert them
|
||||
// 新 target 仍需 INSERT 注册到 MySQL
|
||||
var exists []string
|
||||
err = s.ctx.DB.Table("target").Where("ident in ?", lst).Pluck("ident", &exists).Error
|
||||
if err != nil {
|
||||
@@ -153,35 +149,9 @@ func (s *Set) UpdateTargets(lst []string, now int64) error {
|
||||
}
|
||||
}
|
||||
|
||||
// 从批量更新一批机器的时间戳,改成逐台更新,是为了避免批量更新时,mysql的锁竞争问题
|
||||
start := time.Now()
|
||||
duration := time.Since(start).Seconds()
|
||||
if len(exists) > 0 {
|
||||
sema := semaphore.NewSemaphore(s.configs.UpdateDBTargetConcurrency)
|
||||
wg := sync.WaitGroup{}
|
||||
for i := 0; i < len(exists); i++ {
|
||||
sema.Acquire()
|
||||
wg.Add(1)
|
||||
go func(ident string) {
|
||||
defer sema.Release()
|
||||
defer wg.Done()
|
||||
s.updateDBTargetTs(ident, now)
|
||||
}(exists[i])
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
pstat.DBOperationLatency.WithLabelValues("update_targets_ts").Observe(duration)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Set) updateDBTargetTs(ident string, now int64) {
|
||||
err := s.ctx.DB.Exec("UPDATE target SET update_at = ? WHERE ident = ?", now, ident).Error
|
||||
if err != nil {
|
||||
logger.Error("update_target: failed to update target:", ident, "error:", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Set) updateTargetsUpdateTs(lst []string, now int64, redis storage.Redis) error {
|
||||
if redis == nil {
|
||||
logger.Debugf("update_ts: redis is nil")
|
||||
|
||||
@@ -18,8 +18,6 @@ type Pushgw struct {
|
||||
UpdateTargetRetryIntervalMills int64
|
||||
UpdateTargetTimeoutMills int64
|
||||
UpdateTargetBatchSize int
|
||||
UpdateDBTargetConcurrency int
|
||||
UpdateDBTargetTimestampDisable bool
|
||||
PushConcurrency int
|
||||
UpdateTargetByUrlConcurrency int
|
||||
|
||||
@@ -129,10 +127,6 @@ func (p *Pushgw) PreCheck() {
|
||||
p.UpdateTargetBatchSize = 20
|
||||
}
|
||||
|
||||
if p.UpdateDBTargetConcurrency <= 0 {
|
||||
p.UpdateDBTargetConcurrency = 16
|
||||
}
|
||||
|
||||
if p.PushConcurrency <= 0 {
|
||||
p.PushConcurrency = 16
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user