mirror of
https://github.com/ccfos/nightingale.git
synced 2026-03-06 16:08:56 +00:00
Compare commits
1 Commits
dev21
...
alert-add-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
138a662988 |
@@ -10,6 +10,7 @@ import (
|
||||
"github.com/ccfos/nightingale/v6/alert/astats"
|
||||
"github.com/ccfos/nightingale/v6/alert/naming"
|
||||
"github.com/ccfos/nightingale/v6/alert/process"
|
||||
"github.com/ccfos/nightingale/v6/datasource/commons/eslike"
|
||||
"github.com/ccfos/nightingale/v6/memsto"
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
"github.com/ccfos/nightingale/v6/prom"
|
||||
@@ -62,6 +63,7 @@ func NewScheduler(aconf aconf.Alert, externalProcessors *process.ExternalProcess
|
||||
ctx: ctx,
|
||||
stats: stats,
|
||||
}
|
||||
eslike.SetEsIndexPatternCacheType(memsto.NewEsIndexPatternCacheType(ctx))
|
||||
|
||||
go scheduler.LoopSyncRules(context.Background())
|
||||
return scheduler
|
||||
|
||||
@@ -611,6 +611,8 @@ func (rt *Router) Config(r *gin.Engine) {
|
||||
service.GET("/alert-cur-events-del-by-hash", rt.alertCurEventDelByHash)
|
||||
|
||||
service.POST("/center/heartbeat", rt.heartbeat)
|
||||
|
||||
service.GET("/es-index-pattern-list", rt.esIndexPatternGetList)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -10,6 +10,7 @@ import (
|
||||
|
||||
"github.com/araddon/dateparse"
|
||||
"github.com/bitly/go-simplejson"
|
||||
"github.com/ccfos/nightingale/v6/memsto"
|
||||
"github.com/ccfos/nightingale/v6/models"
|
||||
"github.com/mitchellh/mapstructure"
|
||||
"github.com/olivere/elastic/v7"
|
||||
@@ -18,18 +19,20 @@ import (
|
||||
)
|
||||
|
||||
type Query struct {
|
||||
Ref string `json:"ref" mapstructure:"ref"`
|
||||
Index string `json:"index" mapstructure:"index"`
|
||||
Filter string `json:"filter" mapstructure:"filter"`
|
||||
MetricAggr MetricAggr `json:"value" mapstructure:"value"`
|
||||
GroupBy []GroupBy `json:"group_by" mapstructure:"group_by"`
|
||||
DateField string `json:"date_field" mapstructure:"date_field"`
|
||||
Interval int64 `json:"interval" mapstructure:"interval"`
|
||||
Start int64 `json:"start" mapstructure:"start"`
|
||||
End int64 `json:"end" mapstructure:"end"`
|
||||
P int `json:"page" mapstructure:"page"` // 页码
|
||||
Limit int `json:"limit" mapstructure:"limit"` // 每页个数
|
||||
Ascending bool `json:"ascending" mapstructure:"ascending"` // 按照DataField排序
|
||||
Ref string `json:"ref" mapstructure:"ref"`
|
||||
IndexType string `json:"index_type" mapstructure:"index_type"` // 普通索引:index 索引模式:index_pattern
|
||||
Index string `json:"index" mapstructure:"index"`
|
||||
IndexPattern string `json:"index_pattern" mapstructure:"index_pattern"`
|
||||
Filter string `json:"filter" mapstructure:"filter"`
|
||||
MetricAggr MetricAggr `json:"value" mapstructure:"value"`
|
||||
GroupBy []GroupBy `json:"group_by" mapstructure:"group_by"`
|
||||
DateField string `json:"date_field" mapstructure:"date_field"`
|
||||
Interval int64 `json:"interval" mapstructure:"interval"`
|
||||
Start int64 `json:"start" mapstructure:"start"`
|
||||
End int64 `json:"end" mapstructure:"end"`
|
||||
P int `json:"page" mapstructure:"page"` // 页码
|
||||
Limit int `json:"limit" mapstructure:"limit"` // 每页个数
|
||||
Ascending bool `json:"ascending" mapstructure:"ascending"` // 按照DataField排序
|
||||
|
||||
Timeout int `json:"timeout" mapstructure:"timeout"`
|
||||
MaxShard int `json:"max_shard" mapstructure:"max_shard"`
|
||||
@@ -307,6 +310,16 @@ func MakeTSQuery(ctx context.Context, query interface{}, eventTags []string, sta
|
||||
return param, nil
|
||||
}
|
||||
|
||||
var esIndexPatternCache *memsto.EsIndexPatternCacheType
|
||||
|
||||
func SetEsIndexPatternCacheType(c *memsto.EsIndexPatternCacheType) {
|
||||
esIndexPatternCache = c
|
||||
}
|
||||
|
||||
func GetEsIndexPatternCacheType() *memsto.EsIndexPatternCacheType {
|
||||
return esIndexPatternCache
|
||||
}
|
||||
|
||||
func QueryData(ctx context.Context, queryParam interface{}, cliTimeout int64, version string, search SearchFunc) ([]models.DataResp, error) {
|
||||
param := new(Query)
|
||||
if err := mapstructure.Decode(queryParam, param); err != nil {
|
||||
@@ -329,7 +342,15 @@ func QueryData(ctx context.Context, queryParam interface{}, cliTimeout int64, ve
|
||||
param.DateField = "@timestamp"
|
||||
}
|
||||
|
||||
indexArr := strings.Split(param.Index, ",")
|
||||
var indexArr []string
|
||||
if param.IndexType == "index_pattern" {
|
||||
indexArr = []string{param.IndexPattern}
|
||||
if ip, ok := GetEsIndexPatternCacheType().Get(param.IndexPattern); ok {
|
||||
param.DateField = ip.TimeField
|
||||
}
|
||||
} else {
|
||||
indexArr = strings.Split(param.Index, ",")
|
||||
}
|
||||
q := elastic.NewRangeQuery(param.DateField)
|
||||
now := time.Now().Unix()
|
||||
var start, end int64
|
||||
|
||||
82
memsto/es_index_pattern.go
Normal file
82
memsto/es_index_pattern.go
Normal file
@@ -0,0 +1,82 @@
|
||||
package memsto
|
||||
|
||||
import (
|
||||
"log"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/models"
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
"github.com/toolkits/pkg/logger"
|
||||
)
|
||||
|
||||
type EsIndexPatternCacheType struct {
|
||||
ctx *ctx.Context
|
||||
|
||||
sync.RWMutex
|
||||
indexPattern map[string]*models.EsIndexPattern // key: name
|
||||
}
|
||||
|
||||
func NewEsIndexPatternCacheType(ctx *ctx.Context) *EsIndexPatternCacheType {
|
||||
ipc := &EsIndexPatternCacheType{
|
||||
ctx: ctx,
|
||||
indexPattern: make(map[string]*models.EsIndexPattern),
|
||||
}
|
||||
|
||||
ipc.SyncEsIndexPattern()
|
||||
return ipc
|
||||
}
|
||||
|
||||
func (p *EsIndexPatternCacheType) Reset() {
|
||||
p.Lock()
|
||||
defer p.Unlock()
|
||||
|
||||
p.indexPattern = make(map[string]*models.EsIndexPattern)
|
||||
}
|
||||
|
||||
func (p *EsIndexPatternCacheType) Set(m map[string]*models.EsIndexPattern) {
|
||||
p.Lock()
|
||||
p.indexPattern = m
|
||||
p.Unlock()
|
||||
}
|
||||
|
||||
func (p *EsIndexPatternCacheType) Get(name string) (*models.EsIndexPattern, bool) {
|
||||
p.RLock()
|
||||
defer p.RUnlock()
|
||||
|
||||
ip, has := p.indexPattern[name]
|
||||
return ip, has
|
||||
}
|
||||
|
||||
func (p *EsIndexPatternCacheType) SyncEsIndexPattern() {
|
||||
err := p.syncEsIndexPattern()
|
||||
if err != nil {
|
||||
log.Fatalln("failed to sync targets:", err)
|
||||
}
|
||||
|
||||
go p.loopSyncEsIndexPattern()
|
||||
}
|
||||
|
||||
func (p *EsIndexPatternCacheType) loopSyncEsIndexPattern() {
|
||||
duration := time.Duration(9000) * time.Millisecond
|
||||
for {
|
||||
time.Sleep(duration)
|
||||
if err := p.syncEsIndexPattern(); err != nil {
|
||||
logger.Warning("failed to sync host alert rule targets:", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (p *EsIndexPatternCacheType) syncEsIndexPattern() error {
|
||||
lst, err := models.EsIndexPatternGets(p.ctx, "")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
m := make(map[string]*models.EsIndexPattern, len(lst))
|
||||
for _, p := range lst {
|
||||
m[p.Name] = p
|
||||
}
|
||||
p.Set(m)
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
"github.com/ccfos/nightingale/v6/pkg/poster"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
@@ -84,6 +85,10 @@ func (feIndexPatten *EsIndexPattern) FE2DB() {
|
||||
}
|
||||
|
||||
func EsIndexPatternGets(ctx *ctx.Context, where string, args ...interface{}) ([]*EsIndexPattern, error) {
|
||||
if !ctx.IsCenter {
|
||||
lst, err := poster.GetByUrls[[]*EsIndexPattern](ctx, "/v1/n9e/es-index-pattern-list")
|
||||
return lst, err
|
||||
}
|
||||
var objs []*EsIndexPattern
|
||||
err := DB(ctx).Where(where, args...).Find(&objs).Error
|
||||
if err != nil {
|
||||
|
||||
Reference in New Issue
Block a user