Compare commits

...

1 Commits

Author SHA1 Message Date
flashbo
138a662988 alert rule support index pattern (#2491) 2025-02-26 19:25:26 +08:00
5 changed files with 125 additions and 13 deletions

View File

@@ -10,6 +10,7 @@ import (
"github.com/ccfos/nightingale/v6/alert/astats"
"github.com/ccfos/nightingale/v6/alert/naming"
"github.com/ccfos/nightingale/v6/alert/process"
"github.com/ccfos/nightingale/v6/datasource/commons/eslike"
"github.com/ccfos/nightingale/v6/memsto"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/prom"
@@ -62,6 +63,7 @@ func NewScheduler(aconf aconf.Alert, externalProcessors *process.ExternalProcess
ctx: ctx,
stats: stats,
}
eslike.SetEsIndexPatternCacheType(memsto.NewEsIndexPatternCacheType(ctx))
go scheduler.LoopSyncRules(context.Background())
return scheduler

View File

@@ -611,6 +611,8 @@ func (rt *Router) Config(r *gin.Engine) {
service.GET("/alert-cur-events-del-by-hash", rt.alertCurEventDelByHash)
service.POST("/center/heartbeat", rt.heartbeat)
service.GET("/es-index-pattern-list", rt.esIndexPatternGetList)
}
}

View File

@@ -10,6 +10,7 @@ import (
"github.com/araddon/dateparse"
"github.com/bitly/go-simplejson"
"github.com/ccfos/nightingale/v6/memsto"
"github.com/ccfos/nightingale/v6/models"
"github.com/mitchellh/mapstructure"
"github.com/olivere/elastic/v7"
@@ -18,18 +19,20 @@ import (
)
type Query struct {
Ref string `json:"ref" mapstructure:"ref"`
Index string `json:"index" mapstructure:"index"`
Filter string `json:"filter" mapstructure:"filter"`
MetricAggr MetricAggr `json:"value" mapstructure:"value"`
GroupBy []GroupBy `json:"group_by" mapstructure:"group_by"`
DateField string `json:"date_field" mapstructure:"date_field"`
Interval int64 `json:"interval" mapstructure:"interval"`
Start int64 `json:"start" mapstructure:"start"`
End int64 `json:"end" mapstructure:"end"`
P int `json:"page" mapstructure:"page"` // 页码
Limit int `json:"limit" mapstructure:"limit"` // 每页个数
Ascending bool `json:"ascending" mapstructure:"ascending"` // 按照DataField排序
Ref string `json:"ref" mapstructure:"ref"`
IndexType string `json:"index_type" mapstructure:"index_type"` // 普通索引:index 索引模式:index_pattern
Index string `json:"index" mapstructure:"index"`
IndexPattern string `json:"index_pattern" mapstructure:"index_pattern"`
Filter string `json:"filter" mapstructure:"filter"`
MetricAggr MetricAggr `json:"value" mapstructure:"value"`
GroupBy []GroupBy `json:"group_by" mapstructure:"group_by"`
DateField string `json:"date_field" mapstructure:"date_field"`
Interval int64 `json:"interval" mapstructure:"interval"`
Start int64 `json:"start" mapstructure:"start"`
End int64 `json:"end" mapstructure:"end"`
P int `json:"page" mapstructure:"page"` // 页码
Limit int `json:"limit" mapstructure:"limit"` // 每页个数
Ascending bool `json:"ascending" mapstructure:"ascending"` // 按照DataField排序
Timeout int `json:"timeout" mapstructure:"timeout"`
MaxShard int `json:"max_shard" mapstructure:"max_shard"`
@@ -307,6 +310,16 @@ func MakeTSQuery(ctx context.Context, query interface{}, eventTags []string, sta
return param, nil
}
var esIndexPatternCache *memsto.EsIndexPatternCacheType
func SetEsIndexPatternCacheType(c *memsto.EsIndexPatternCacheType) {
esIndexPatternCache = c
}
func GetEsIndexPatternCacheType() *memsto.EsIndexPatternCacheType {
return esIndexPatternCache
}
func QueryData(ctx context.Context, queryParam interface{}, cliTimeout int64, version string, search SearchFunc) ([]models.DataResp, error) {
param := new(Query)
if err := mapstructure.Decode(queryParam, param); err != nil {
@@ -329,7 +342,15 @@ func QueryData(ctx context.Context, queryParam interface{}, cliTimeout int64, ve
param.DateField = "@timestamp"
}
indexArr := strings.Split(param.Index, ",")
var indexArr []string
if param.IndexType == "index_pattern" {
indexArr = []string{param.IndexPattern}
if ip, ok := GetEsIndexPatternCacheType().Get(param.IndexPattern); ok {
param.DateField = ip.TimeField
}
} else {
indexArr = strings.Split(param.Index, ",")
}
q := elastic.NewRangeQuery(param.DateField)
now := time.Now().Unix()
var start, end int64

View File

@@ -0,0 +1,82 @@
package memsto
import (
"log"
"sync"
"time"
"github.com/ccfos/nightingale/v6/models"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/toolkits/pkg/logger"
)
type EsIndexPatternCacheType struct {
ctx *ctx.Context
sync.RWMutex
indexPattern map[string]*models.EsIndexPattern // key: name
}
func NewEsIndexPatternCacheType(ctx *ctx.Context) *EsIndexPatternCacheType {
ipc := &EsIndexPatternCacheType{
ctx: ctx,
indexPattern: make(map[string]*models.EsIndexPattern),
}
ipc.SyncEsIndexPattern()
return ipc
}
func (p *EsIndexPatternCacheType) Reset() {
p.Lock()
defer p.Unlock()
p.indexPattern = make(map[string]*models.EsIndexPattern)
}
func (p *EsIndexPatternCacheType) Set(m map[string]*models.EsIndexPattern) {
p.Lock()
p.indexPattern = m
p.Unlock()
}
func (p *EsIndexPatternCacheType) Get(name string) (*models.EsIndexPattern, bool) {
p.RLock()
defer p.RUnlock()
ip, has := p.indexPattern[name]
return ip, has
}
func (p *EsIndexPatternCacheType) SyncEsIndexPattern() {
err := p.syncEsIndexPattern()
if err != nil {
log.Fatalln("failed to sync targets:", err)
}
go p.loopSyncEsIndexPattern()
}
func (p *EsIndexPatternCacheType) loopSyncEsIndexPattern() {
duration := time.Duration(9000) * time.Millisecond
for {
time.Sleep(duration)
if err := p.syncEsIndexPattern(); err != nil {
logger.Warning("failed to sync host alert rule targets:", err)
}
}
}
func (p *EsIndexPatternCacheType) syncEsIndexPattern() error {
lst, err := models.EsIndexPatternGets(p.ctx, "")
if err != nil {
return err
}
m := make(map[string]*models.EsIndexPattern, len(lst))
for _, p := range lst {
m[p.Name] = p
}
p.Set(m)
return nil
}

View File

@@ -4,6 +4,7 @@ import (
"time"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/pkg/poster"
"github.com/pkg/errors"
)
@@ -84,6 +85,10 @@ func (feIndexPatten *EsIndexPattern) FE2DB() {
}
func EsIndexPatternGets(ctx *ctx.Context, where string, args ...interface{}) ([]*EsIndexPattern, error) {
if !ctx.IsCenter {
lst, err := poster.GetByUrls[[]*EsIndexPattern](ctx, "/v1/n9e/es-index-pattern-list")
return lst, err
}
var objs []*EsIndexPattern
err := DB(ctx).Where(where, args...).Find(&objs).Error
if err != nil {