Compare commits

...

1 Commits

Author SHA1 Message Date
flashbo
d8cac1da41 feat: update busi group tags (#2101) 2024-08-28 17:49:10 +08:00
14 changed files with 330 additions and 48 deletions

View File

@@ -358,11 +358,6 @@ func (arw *AlertRuleWorker) GetHostAnomalyPoint(ruleConfig string) []common.Anom
}
m["ident"] = target.Ident
bg := arw.processor.BusiGroupCache.GetByBusiGroupId(target.GroupId)
if bg != nil && bg.LabelEnable == 1 {
m["busigroup"] = bg.LabelValue
}
lst = append(lst, common.NewAnomalyPoint(trigger.Type, m, now, float64(now-target.UpdateAt), trigger.Severity))
}
case "offset":
@@ -411,11 +406,6 @@ func (arw *AlertRuleWorker) GetHostAnomalyPoint(ruleConfig string) []common.Anom
}
m["ident"] = host
bg := arw.processor.BusiGroupCache.GetByBusiGroupId(target.GroupId)
if bg != nil && bg.LabelEnable == 1 {
m["busigroup"] = bg.LabelValue
}
lst = append(lst, common.NewAnomalyPoint(trigger.Type, m, now, float64(offset), trigger.Severity))
}
case "pct_target_miss":

View File

@@ -114,7 +114,7 @@ func BgNotMatchMuteStrategy(rule *models.AlertRule, event *models.AlertCurEvent,
target, exists := targetCache.Get(ident)
// 对于包含ident的告警事件check一下ident所属bg和rule所属bg是否相同
// 如果告警规则选择了只在本BG生效那其他BG的机器就不能因此规则产生告警
if exists && target.GroupId != rule.GroupId {
if exists && !target.MatchGroupId(rule.GroupId) {
logger.Debugf("[%s] mute: rule_eval:%d cluster:%s", "BgNotMatchMuteStrategy", rule.Id, event.Cluster)
return true
}

View File

@@ -181,7 +181,7 @@ func canDoIbex(username string, tpl *models.TaskTpl, host string, targetCache *m
return false, nil
}
return target.GroupId == tpl.GroupId, nil
return target.MatchGroupId(tpl.GroupId), nil
}
func TaskAdd(f models.TaskForm, authUser string, isCenter bool) (int64, error) {

View File

@@ -110,10 +110,13 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
go cron.CleanNotifyRecord(ctx, config.Center.CleanNotifyRecordDay)
alertrtRouter := alertrt.New(config.HTTP, config.Alert, alertMuteCache, targetCache, busiGroupCache, alertStats, ctx, externalProcessors)
centerRouter := centerrt.New(config.HTTP, config.Center, config.Alert, config.Ibex, cconf.Operations, dsCache, notifyConfigCache, promClients, tdengineClients,
centerRouter := centerrt.New(config.HTTP, config.Center, config.Alert, config.Ibex,
cconf.Operations, config.Pushgw, dsCache, notifyConfigCache, promClients, tdengineClients,
redis, sso, ctx, metas, idents, targetCache, userCache, userGroupCache)
pushgwRouter := pushgwrt.New(config.HTTP, config.Pushgw, config.Alert, targetCache, busiGroupCache, idents, metas, writers, ctx)
models.MigrateBg(ctx, pushgwRouter.Pushgw.BusiGroupLabelKey)
r := httpx.GinEngine(config.Global.RunMode, config.HTTP)
centerRouter.Config(r)

View File

@@ -22,6 +22,7 @@ import (
"github.com/ccfos/nightingale/v6/pkg/version"
"github.com/ccfos/nightingale/v6/prom"
"github.com/ccfos/nightingale/v6/pushgw/idents"
"github.com/ccfos/nightingale/v6/pushgw/pconf"
"github.com/ccfos/nightingale/v6/storage"
"github.com/ccfos/nightingale/v6/tdengine"
@@ -38,6 +39,7 @@ type Router struct {
Ibex conf.Ibex
Alert aconf.Alert
Operations cconf.Operation
Pushgw pconf.Pushgw
DatasourceCache *memsto.DatasourceCacheType
NotifyConfigCache *memsto.NotifyConfigCacheType
PromClients *prom.PromClientMap
@@ -53,13 +55,19 @@ type Router struct {
HeartbeatHook HeartbeatHookFunc
}
func New(httpConfig httpx.Config, center cconf.Center, alert aconf.Alert, ibex conf.Ibex, operations cconf.Operation, ds *memsto.DatasourceCacheType, ncc *memsto.NotifyConfigCacheType, pc *prom.PromClientMap, tdendgineClients *tdengine.TdengineClientMap, redis storage.Redis, sso *sso.SsoClient, ctx *ctx.Context, metaSet *metas.Set, idents *idents.Set, tc *memsto.TargetCacheType, uc *memsto.UserCacheType, ugc *memsto.UserGroupCacheType) *Router {
func New(httpConfig httpx.Config, center cconf.Center,
alert aconf.Alert, ibex conf.Ibex, operations cconf.Operation, pushgw pconf.Pushgw,
ds *memsto.DatasourceCacheType, ncc *memsto.NotifyConfigCacheType, pc *prom.PromClientMap,
tdendgineClients *tdengine.TdengineClientMap, redis storage.Redis, sso *sso.SsoClient,
ctx *ctx.Context, metaSet *metas.Set, idents *idents.Set, tc *memsto.TargetCacheType,
uc *memsto.UserCacheType, ugc *memsto.UserGroupCacheType) *Router {
return &Router{
HTTP: httpConfig,
Center: center,
Alert: alert,
Ibex: ibex,
Operations: operations,
Pushgw: pushgw,
DatasourceCache: ds,
NotifyConfigCache: ncc,
PromClients: pc,
@@ -277,6 +285,9 @@ func (rt *Router) Config(r *gin.Engine) {
pages.DELETE("/targets/tags", rt.auth(), rt.user(), rt.perm("/targets/put"), rt.targetUnbindTagsByFE)
pages.PUT("/targets/note", rt.auth(), rt.user(), rt.perm("/targets/put"), rt.targetUpdateNote)
pages.PUT("/targets/bgid", rt.auth(), rt.user(), rt.perm("/targets/put"), rt.targetUpdateBgid)
pages.PUT("/targets/bgids", rt.auth(), rt.user(), rt.perm("/targets/put"), rt.targetBindBgids)
pages.DELETE("/targets/bgids", rt.auth(), rt.user(), rt.perm("/targets/del"), rt.targetUnbindBgids)
pages.POST("/targets/migrate-bg", rt.auth(), rt.user(), rt.perm("/targets/put"), rt.migrateBg)
pages.POST("/builtin-cate-favorite", rt.auth(), rt.user(), rt.builtinCateFavoriteAdd)
pages.DELETE("/builtin-cate-favorite/:name", rt.auth(), rt.user(), rt.builtinCateFavoriteDel)

View File

@@ -82,11 +82,14 @@ func HandleHeartbeat(c *gin.Context, ctx *ctx.Context, engineName string, metaSe
gid := ginx.QueryInt64(c, "gid", 0)
hostIp := strings.TrimSpace(req.HostIp)
field := make(map[string]interface{})
if gid != 0 && gid != target.GroupId {
field["group_id"] = gid
if gid != 0 && !target.MatchGroupId(gid) {
err := models.TargetBindBgids(ctx, []string{target.Ident}, []int64{gid})
if err != nil {
logger.Errorf("update target group ids failed, err: %v", err)
}
}
field := make(map[string]interface{})
if hostIp != "" && hostIp != target.HostIp {
field["host_ip"] = hostIp
}

View File

@@ -78,6 +78,13 @@ func (rt *Router) targetGets(c *gin.Context) {
ginx.Offset(c, limit), order, desc, options...)
ginx.Dangerous(err)
tgs, err := models.TargetBusiGroupsGetAll(rt.Ctx)
ginx.Dangerous(err)
for _, t := range list {
t.GroupIds = tgs[t.Ident]
}
if err == nil {
now := time.Now()
cache := make(map[int64]*models.BusiGroup)
@@ -442,6 +449,92 @@ func (rt *Router) targetUpdateBgid(c *gin.Context) {
ginx.NewRender(c).Data(failedResults, models.TargetUpdateBgid(rt.Ctx, f.Idents, f.Bgid, false))
}
// targetBgidsForm is the JSON request body decoded by targetBindBgids.
// Targets may be addressed by ident, by host IP, or both; the binding tags
// make each of the two fields required when the other one is absent.
type targetBgidsForm struct {
	// Idents lists target idents; required when HostIps is not supplied.
	Idents []string `json:"idents" binding:"required_without=HostIps"`
	// HostIps lists target host IPs; required when Idents is not supplied.
	HostIps []string `json:"host_ips" binding:"required_without=Idents"`
	// Bgids lists the business group ids the targets are bound to.
	Bgids []int64 `json:"bgids"`
}
// targetBindBgids handles PUT /targets/bgids: it binds the targets named in
// the request body (by ident and/or host IP) to every business group in bgids.
//
// Admin users skip the permission checks. Any other user must hold the
// "/targets/bind" permission, pass checkTargetPerm for the resolved idents,
// and have "rw" access to each requested business group. The response data is
// the failedResults map returned while resolving idents, and the response
// error is whatever models.TargetBindBgids returns.
func (rt *Router) targetBindBgids(c *gin.Context) {
	var (
		form          targetBgidsForm
		failedResults map[string]string
		err           error
	)

	ginx.BindJSON(c, &form)

	if len(form.Idents) == 0 && len(form.HostIps) == 0 {
		ginx.Bomb(http.StatusBadRequest, "idents or host_ips must be provided")
	}

	// Collapse idents and host IPs into one ident list; entries reported as
	// failed by the lookup are carried into the response.
	failedResults, form.Idents, err = models.TargetsGetIdentsByIdentsAndHostIps(rt.Ctx, form.Idents, form.HostIps)
	if err != nil {
		ginx.Bomb(http.StatusBadRequest, err.Error())
	}

	user := c.MustGet("user").(*models.User)
	if !user.IsAdmin() {
		allowed, err := user.CheckPerm(rt.Ctx, "/targets/bind")
		ginx.Dangerous(err)
		if !allowed {
			ginx.Bomb(http.StatusForbidden, "No permission. Only admin can assign BG")
		}

		rt.checkTargetPerm(c, form.Idents)

		// The caller needs read-write access to every destination group.
		for _, bgid := range form.Bgids {
			bg := BusiGroup(rt.Ctx, bgid)
			writable, err := user.CanDoBusiGroup(rt.Ctx, bg, "rw")
			ginx.Dangerous(err)
			if !writable {
				ginx.Bomb(http.StatusForbidden, "No permission. You are not admin of BG(%s)", bg.Name)
			}
		}
	}

	ginx.NewRender(c).Data(failedResults, models.TargetBindBgids(rt.Ctx, form.Idents, form.Bgids))
}
// targetUnbindBgidsForm is the JSON request body decoded by targetUnbindBgids.
// Targets may be addressed by ident, by host IP, or both; the binding tags
// make each of the two fields required when the other one is absent.
type targetUnbindBgidsForm struct {
	// Idents lists target idents; required when HostIps is not supplied.
	Idents []string `json:"idents" binding:"required_without=HostIps"`
	// HostIps lists target host IPs; required when Idents is not supplied.
	HostIps []string `json:"host_ips" binding:"required_without=Idents"`
	// Bgids lists the business group ids the targets are unbound from.
	Bgids []int64 `json:"bgids"`
}
// targetUnbindBgids handles DELETE /targets/bgids: it removes the association
// between the targets named in the request body (by ident and/or host IP) and
// the business groups in bgids.
//
// Admin users skip the extra check; any other user must pass checkTargetPerm
// for the resolved idents. The response data is the failedResults map returned
// while resolving idents, and the response error is whatever
// models.TargetUnbindBgids returns.
func (rt *Router) targetUnbindBgids(c *gin.Context) {
	var (
		form          targetUnbindBgidsForm
		failedResults map[string]string
		err           error
	)

	ginx.BindJSON(c, &form)

	if len(form.Idents) == 0 && len(form.HostIps) == 0 {
		ginx.Bomb(http.StatusBadRequest, "idents or host_ips must be provided")
	}

	// Collapse idents and host IPs into one ident list; entries reported as
	// failed by the lookup are carried into the response.
	failedResults, form.Idents, err = models.TargetsGetIdentsByIdentsAndHostIps(rt.Ctx, form.Idents, form.HostIps)
	if err != nil {
		ginx.Bomb(http.StatusBadRequest, err.Error())
	}

	if user := c.MustGet("user").(*models.User); !user.IsAdmin() {
		rt.checkTargetPerm(c, form.Idents)
	}

	ginx.NewRender(c).Data(failedResults, models.TargetUnbindBgids(rt.Ctx, form.Idents, form.Bgids))
}
// migrateBg lets an operator manually trigger the migration of the
// busi_group <-> target association. It calls models.DoMigrateBg with the
// configured Pushgw.BusiGroupLabelKey and renders the returned error (no data).
func (rt *Router) migrateBg(c *gin.Context) {
	ginx.NewRender(c).Data(nil, models.DoMigrateBg(rt.Ctx, rt.Pushgw.BusiGroupLabelKey))
}
func (rt *Router) targetUpdateBgidByService(c *gin.Context) {
var f targetBgidForm
var err error

View File

@@ -173,6 +173,15 @@ func (tc *TargetCacheType) syncTargets() error {
m[lst[i].Ident] = lst[i]
}
tgs, err := models.TargetBusiGroupsGetAll(tc.ctx)
if err != nil {
dumper.PutSyncRecord("targets", start.Unix(), -1, -1, "failed to query records: "+err.Error())
return errors.WithMessage(err, "failed to call TargetBusiGroupsGetAll")
}
for _, t := range lst {
t.GroupIds = tgs[t.Ident]
}
tc.Set(m, stat.Total, stat.LastUpdated)
ms := time.Since(start).Milliseconds()

View File

@@ -58,7 +58,8 @@ func MigrateTables(db *gorm.DB) error {
dts := []interface{}{&RecordingRule{}, &AlertRule{}, &AlertSubscribe{}, &AlertMute{},
&TaskRecord{}, &ChartShare{}, &Target{}, &Configs{}, &Datasource{}, &NotifyTpl{},
&Board{}, &BoardBusigroup{}, &Users{}, &SsoConfig{}, &models.BuiltinMetric{},
&models.MetricFilter{}, &models.BuiltinComponent{}, &models.NotificaitonRecord{}}
&models.MetricFilter{}, &models.BuiltinComponent{}, &models.NotificaitonRecord{},
&models.TargetBusiGroup{}}
if !columnHasIndex(db, &AlertHisEvent{}, "original_tags") ||
!columnHasIndex(db, &AlertCurEvent{}, "original_tags") {

View File

@@ -1,6 +1,7 @@
package models
import (
"fmt"
"sort"
"strings"
"time"
@@ -16,7 +17,7 @@ import (
type Target struct {
Id int64 `json:"id" gorm:"primaryKey"`
GroupId int64 `json:"group_id"`
GroupObj *BusiGroup `json:"group_obj" gorm:"-"`
GroupObjs []*BusiGroup `json:"group_objs" gorm:"-"`
Ident string `json:"ident"`
Note string `json:"note"`
Tags string `json:"-"`
@@ -36,6 +37,7 @@ type Target struct {
CpuUtil float64 `json:"cpu_util" gorm:"-"`
Arch string `json:"arch" gorm:"-"`
RemoteAddr string `json:"remote_addr" gorm:"-"`
GroupIds []int64 `json:"group_ids" gorm:"-"`
}
func (t *Target) TableName() string {
@@ -43,26 +45,45 @@ func (t *Target) TableName() string {
}
func (t *Target) FillGroup(ctx *ctx.Context, cache map[int64]*BusiGroup) error {
if t.GroupId <= 0 {
return nil
var err error
if len(t.GroupIds) == 0 {
t.GroupIds, err = TargetGroupIdsGetByIdent(ctx, t.Ident)
if err != nil {
return errors.WithMessage(err, "failed to get target gids")
}
t.GroupObjs = make([]*BusiGroup, 0, len(t.GroupIds))
}
bg, has := cache[t.GroupId]
if has {
t.GroupObj = bg
return nil
for _, gid := range t.GroupIds {
bg, has := cache[gid]
if has {
t.GroupObjs = append(t.GroupObjs, bg)
continue
}
bg, err := BusiGroupGetById(ctx, gid)
if err != nil {
return errors.WithMessage(err, "failed to get busi group")
}
t.GroupObjs = append(t.GroupObjs, bg)
cache[gid] = bg
}
bg, err := BusiGroupGetById(ctx, t.GroupId)
if err != nil {
return errors.WithMessage(err, "failed to get busi group")
}
t.GroupObj = bg
cache[t.GroupId] = bg
return nil
}
// MatchGroupId reports whether the target's GroupIds contains at least one of
// the given group ids. It returns false when either side is empty.
func (t *Target) MatchGroupId(gid ...int64) bool {
	owned := t.GroupIds
	for i := 0; i < len(owned); i++ {
		for _, candidate := range gid {
			if candidate == owned[i] {
				return true
			}
		}
	}
	return false
}
func (t *Target) AfterFind(tx *gorm.DB) (err error) {
delta := time.Now().Unix() - t.UpdateAt
if delta < 60 {
@@ -101,7 +122,8 @@ type BuildTargetWhereOption func(session *gorm.DB) *gorm.DB
func BuildTargetWhereWithBgids(bgids []int64) BuildTargetWhereOption {
return func(session *gorm.DB) *gorm.DB {
if len(bgids) > 0 {
session = session.Where("group_id in (?)", bgids)
session = session.Joins("join target_busi_group on target.ident = "+
"target_busi_group.target_ident").Where("target_busi_group.group_id in (?)", bgids)
}
return session
}
@@ -457,3 +479,68 @@ func IdentsFilter(ctx *ctx.Context, idents []string, where string, args ...inter
// UpdateFieldsMap applies the given column/value map to this target through
// gorm's Model(m).Updates and returns the resulting error, if any.
func (m *Target) UpdateFieldsMap(ctx *ctx.Context, fields map[string]interface{}) error {
	result := DB(ctx).Model(m).Updates(fields)
	return result.Error
}
// MigrateBg runs DoMigrateBg only when the target_busi_group table is empty.
// A non-empty table is treated as "migration already done". A failure to count
// rows is printed and the migration is skipped. The result of DoMigrateBg is
// not inspected here.
func MigrateBg(ctx *ctx.Context, bgLabelKey string) {
	// 1. Decide whether the migration has already been completed.
	var existing int64
	err := DB(ctx).Model(&TargetBusiGroup{}).Count(&existing).Error

	switch {
	case err != nil:
		fmt.Println("Failed to count target_busi_group, err:", err)
	case existing > 0:
		fmt.Println("Migration has been completed.")
	default:
		DoMigrateBg(ctx, bgLabelKey)
	}
}
// DoMigrateBg migrates every target that still has a non-zero GroupId: it
// binds the target to that group via TargetBindBgids, calls TargetUpdateBgid
// with bgid 0, and, when the business group has its label enabled and the
// target's tags do not already contain bgLabelKey+"=", adds the tag
// bgLabelKey=<group label value> to the target.
//
// Failing to load the targets or the business groups aborts the run and
// returns that error. A failure on a single target is printed and the loop
// moves on to the next target; once the loop finishes, a non-nil error is
// returned that reports how many targets failed and wraps the first failure,
// so callers (e.g. the manual migrate endpoint) no longer see success after a
// partial migration.
func DoMigrateBg(ctx *ctx.Context, bgLabelKey string) error {
	// 2. Load all targets.
	targets, err := TargetGetsAll(ctx)
	if err != nil {
		fmt.Println("Failed to get target, err:", err)
		return err
	}

	// 3. Load all business groups and index them by id.
	bgs, err := BusiGroupGetAll(ctx)
	if err != nil {
		fmt.Println("Failed to get bg, err:", err)
		return err
	}

	bgById := make(map[int64]*BusiGroup, len(bgs))
	for _, bg := range bgs {
		bgById[bg.Id] = bg
	}

	var (
		failed   int
		firstErr error
	)

	// 4. If a business group has a label, store it in the tags of its targets.
	for _, t := range targets {
		if t.GroupId == 0 {
			continue
		}

		// NOTE(review): the calls inside the callback are issued with ctx, not
		// with tx, so they do not appear to take part in this transaction —
		// confirm whether a tx-bound context should be used here.
		err := DB(ctx).Transaction(func(tx *gorm.DB) error {
			// 4.1 Move group_id into the association table, then call
			// TargetUpdateBgid with bgid 0 (presumably clearing the legacy
			// column — confirm).
			if err := TargetBindBgids(ctx, []string{t.Ident}, []int64{t.GroupId}); err != nil {
				return err
			}
			if err := TargetUpdateBgid(ctx, []string{t.Ident}, 0, false); err != nil {
				return err
			}

			// 4.2 Decide whether this target needs the extra tag.
			bg, ok := bgById[t.GroupId]
			if !ok || bg.LabelEnable == 0 || strings.Contains(t.Tags, bgLabelKey+"=") {
				return nil
			}
			return t.AddTags(ctx, []string{bgLabelKey + "=" + bg.LabelValue})
		})
		if err != nil {
			fmt.Println("Failed to migrate bg, err:", err)
			failed++
			if firstErr == nil {
				firstErr = err
			}
		}
	}

	if failed > 0 {
		return fmt.Errorf("migrate bg: %d target(s) failed, first error: %w", failed, firstErr)
	}
	return nil
}

View File

@@ -0,0 +1,74 @@
package models
import (
"time"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"gorm.io/gorm/clause"
)
// TargetBusiGroup is one row of the target_busi_group association table: it
// links a target (by ident) to a business group (by id). The gorm tags declare
// a unique composite index idx_target_group over (target_ident, group_id), so
// a given pair can be stored only once.
type TargetBusiGroup struct {
	// Id is the auto-increment primary key.
	Id int64 `json:"id" gorm:"primaryKey;type:bigint;autoIncrement"`
	// TargetIdent is the target's ident; first column of idx_target_group.
	TargetIdent string `json:"target_ident" gorm:"type:varchar(191);not null;index:idx_target_group,unique,priority:1"`
	// GroupId is the business group id; second column of idx_target_group.
	GroupId int64 `json:"group_id" gorm:"type:bigint;not null;index:idx_target_group,unique,priority:2"`
	// UpdateAt is set to a Unix timestamp (seconds) by TargetBindBgids.
	UpdateAt int64 `json:"update_at" gorm:"type:bigint;not null"`
}
// TableName returns the database table name used for TargetBusiGroup rows.
func (t *TargetBusiGroup) TableName() string {
	return "target_busi_group"
}
// TargetBusiGroupsGetAll loads every row of target_busi_group and returns the
// group ids keyed by target ident. On a query error it returns (nil, err).
func TargetBusiGroupsGetAll(ctx *ctx.Context) (map[string][]int64, error) {
	var rows []*TargetBusiGroup
	if err := DB(ctx).Find(&rows).Error; err != nil {
		return nil, err
	}

	byIdent := make(map[string][]int64)
	for i := range rows {
		ident := rows[i].TargetIdent
		byIdent[ident] = append(byIdent[ident], rows[i].GroupId)
	}
	return byIdent, nil
}
// TargetGroupIdsGetByIdent returns the ids of all business groups associated
// with the given target ident. The result is an empty, non-nil slice when no
// rows match; on a query error it returns (nil, err).
func TargetGroupIdsGetByIdent(ctx *ctx.Context, ident string) ([]int64, error) {
	var rows []*TargetBusiGroup
	if err := DB(ctx).Where("target_ident = ?", ident).Find(&rows).Error; err != nil {
		return nil, err
	}

	ids := make([]int64, len(rows))
	for i, row := range rows {
		ids[i] = row.GroupId
	}
	return ids, nil
}
// TargetBindBgids inserts one target_busi_group row for every (ident, bgid)
// combination, all stamped with the same Unix timestamp. The insert carries a
// dialect-specific "ignore on conflict" clause: "or ignore" for sqlite,
// ON CONFLICT DO NOTHING for postgres, and the "ignore" insert modifier for
// every other dialect. Rows are written in batches of 10.
func TargetBindBgids(ctx *ctx.Context, idents []string, bgids []int64) error {
	rows := make([]TargetBusiGroup, 0, len(bgids)*len(idents))
	now := time.Now().Unix()
	for _, bgid := range bgids {
		for _, ident := range idents {
			rows = append(rows, TargetBusiGroup{
				TargetIdent: ident,
				GroupId:     bgid,
				UpdateAt:    now,
			})
		}
	}

	// Pick the clause that makes the insert skip rows hitting the unique index.
	var skipDup clause.Expression
	switch DB(ctx).Dialector.Name() {
	case "sqlite":
		skipDup = clause.Insert{Modifier: "or ignore"}
	case "postgres":
		skipDup = clause.OnConflict{DoNothing: true}
	default:
		skipDup = clause.Insert{Modifier: "ignore"}
	}

	return DB(ctx).Clauses(skipDup).CreateInBatches(&rows, 10).Error
}
// TargetUnbindBgids deletes every target_busi_group row whose target_ident is
// in idents and whose group_id is in bgids, returning the delete error if any.
func TargetUnbindBgids(ctx *ctx.Context, idents []string, bgids []int64) error {
	scoped := DB(ctx).Where("target_ident in ? and group_id in ?", idents, bgids)
	return scoped.Delete(&TargetBusiGroup{}).Error
}

View File

@@ -736,7 +736,11 @@ func (u *User) BusiGroups(ctx *ctx.Context, limit int, query string, all ...bool
return lst, nil
}
err = DB(ctx).Order("name").Limit(limit).Where("id=?", t.GroupId).Find(&lst).Error
t.GroupIds, err = TargetGroupIdsGetByIdent(ctx, t.Ident)
if err != nil {
return nil, err
}
err = DB(ctx).Order("name").Limit(limit).Where("id in ?", t.GroupIds).Find(&lst).Error
}
return lst, err
@@ -768,8 +772,12 @@ func (u *User) BusiGroups(ctx *ctx.Context, limit int, query string, all ...bool
return lst, err
}
if t != nil && slice.ContainsInt64(busiGroupIds, t.GroupId) {
err = DB(ctx).Order("name").Limit(limit).Where("id=?", t.GroupId).Find(&lst).Error
t.GroupIds, err = TargetGroupIdsGetByIdent(ctx, t.Ident)
if err != nil {
return nil, err
}
if t != nil && t.MatchGroupId(busiGroupIds...) {
err = DB(ctx).Order("name").Limit(limit).Where("id in ?", t.GroupIds).Find(&lst).Error
}
}

View File

@@ -10,17 +10,18 @@ import (
)
type Pushgw struct {
BusiGroupLabelKey string
IdentMetrics []string
IdentStatsThreshold int
IdentDropThreshold int
WriteConcurrency int
LabelRewrite bool
ForceUseServerTS bool
DebugSample map[string]string
DropSample []map[string]string
WriterOpt WriterGlobalOpt
Writers []WriterOptions
BusiGroupLabelKey string
IdentMetrics []string
IdentStatsThreshold int
IdentDropThreshold int
WriteConcurrency int
LabelRewrite bool
BusiGroupLabelRewrite bool
ForceUseServerTS bool
DebugSample map[string]string
DropSample []map[string]string
WriterOpt WriterGlobalOpt
Writers []WriterOptions
}
type WriterGlobalOpt struct {

View File

@@ -21,7 +21,9 @@ func (rt *Router) AppendLabels(pt *prompb.TimeSeries, target *models.Target, bgC
for key, value := range target.TagsMap {
if index, has := labelKeys[key]; has {
// overwrite labels
if rt.Pushgw.LabelRewrite {
if rt.Pushgw.BusiGroupLabelRewrite && key == rt.Pushgw.BusiGroupLabelKey {
pt.Labels[index].Value = value // 此次更新前busigroup 标签没有在页面 tag 中配置
} else if rt.Pushgw.LabelRewrite {
pt.Labels[index].Value = value
}
continue