Compare commits

...

2 Commits

Author SHA1 Message Date
ning
433619f9dd code refactor 2024-07-02 16:46:19 +08:00
ning
c83dea0f9f feat:recording rule support cron pattern 2024-07-02 14:50:54 +08:00
5 changed files with 39 additions and 22 deletions

View File

@@ -10,6 +10,7 @@ import (
"github.com/ccfos/nightingale/v6/models"
"github.com/ccfos/nightingale/v6/prom"
"github.com/ccfos/nightingale/v6/pushgw/writer"
"github.com/robfig/cron/v3"
"github.com/toolkits/pkg/logger"
"github.com/toolkits/pkg/str"
@@ -19,19 +20,35 @@ type RecordRuleContext struct {
datasourceId int64
quit chan struct{}
scheduler *cron.Cron
rule *models.RecordingRule
promClients *prom.PromClientMap
stats *astats.Stats
}
func NewRecordRuleContext(rule *models.RecordingRule, datasourceId int64, promClients *prom.PromClientMap, writers *writer.WritersType, stats *astats.Stats) *RecordRuleContext {
return &RecordRuleContext{
rrc := &RecordRuleContext{
datasourceId: datasourceId,
quit: make(chan struct{}),
rule: rule,
promClients: promClients,
stats: stats,
}
if rule.CronPattern == "" && rule.PromEvalInterval != 0 {
rule.CronPattern = fmt.Sprintf("@every %ds", rule.PromEvalInterval)
}
rrc.scheduler = cron.New(cron.WithSeconds())
_, err := rrc.scheduler.AddFunc(rule.CronPattern, func() {
rrc.Eval()
})
if err != nil {
logger.Errorf("add cron pattern error: %v", err)
}
return rrc
}
func (rrc *RecordRuleContext) Key() string {
@@ -51,23 +68,7 @@ func (rrc *RecordRuleContext) Prepare() {}
func (rrc *RecordRuleContext) Start() {
logger.Infof("eval:%s started", rrc.Key())
interval := rrc.rule.PromEvalInterval
if interval <= 0 {
interval = 10
}
ticker := time.NewTicker(time.Duration(interval) * time.Second)
go func() {
defer ticker.Stop()
for {
select {
case <-rrc.quit:
return
case <-ticker.C:
rrc.Eval()
}
}
}()
rrc.scheduler.Start()
}
func (rrc *RecordRuleContext) Eval() {
@@ -109,5 +110,8 @@ func (rrc *RecordRuleContext) Eval() {
func (rrc *RecordRuleContext) Stop() {
logger.Infof("%s stopped", rrc.Key())
c := rrc.scheduler.Stop()
<-c.Done()
close(rrc.quit)
}

1
go.mod
View File

@@ -85,6 +85,7 @@ require (
github.com/pquerna/cachecontrol v0.1.0 // indirect
github.com/prometheus/client_model v0.4.0 // indirect
github.com/prometheus/procfs v0.11.0 // indirect
github.com/robfig/cron/v3 v3.0.1
github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.0 // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect

2
go.sum
View File

@@ -250,6 +250,8 @@ github.com/rakyll/statik v0.1.7 h1:OF3QCZUuyPxuGEP7B4ypUa7sB/iHtqOTDYZXGM8KOdQ=
github.com/rakyll/statik v0.1.7/go.mod h1:AlZONWzMtEnMs7W4e/1LURLiI49pIMmp6V9Unghqrcc=
github.com/redis/go-redis/v9 v9.0.2 h1:BA426Zqe/7r56kCcvxYLWe1mkaz71LKF77GwgFzSxfE=
github.com/redis/go-redis/v9 v9.0.2/go.mod h1:/xDTe9EF1LM61hek62Poq2nzQSGj0xSrEtEHbBQevps=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/robfig/go-cache v0.0.0-20130306151617-9fc39e0dbf62/go.mod h1:65XQgovT59RWatovFwnwocoUxiI/eENTnOY5GK3STuY=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=

View File

@@ -189,6 +189,7 @@ type AlertMute struct {
type RecordingRule struct {
QueryConfigs string `gorm:"type:text;not null;column:query_configs"` // query_configs
DatasourceIds string `gorm:"column:datasource_ids;type:varchar(255);default:'';comment:datasource ids"`
CronPattern string `gorm:"column:cron_pattern;type:varchar(255);default:'';comment:cron pattern"`
}
type AlertingEngines struct {

View File

@@ -27,9 +27,10 @@ type RecordingRule struct {
QueryConfigs string `json:"-" gorm:"query_configs"` // query_configs
QueryConfigsJson []QueryConfig `json:"query_configs" gorm:"-"` // query_configs for fe
PromEvalInterval int `json:"prom_eval_interval"` // unit:s
AppendTags string `json:"-"` // split by space: service=n9e mod=api
AppendTagsJSON []string `json:"append_tags" gorm:"-"` // for fe
Note string `json:"note"` // note
CronPattern string `json:"cron_pattern"`
AppendTags string `json:"-"` // split by space: service=n9e mod=api
AppendTagsJSON []string `json:"append_tags" gorm:"-"` // for fe
Note string `json:"note"` // note
CreateAt int64 `json:"create_at"`
CreateBy string `json:"create_by"`
UpdateAt int64 `json:"update_at"`
@@ -68,8 +69,12 @@ func (re *RecordingRule) DB2FE() error {
json.Unmarshal([]byte(re.DatasourceIds), &re.DatasourceIdsJson)
json.Unmarshal([]byte(re.QueryConfigs), &re.QueryConfigsJson)
return nil
if re.CronPattern == "" && re.PromEvalInterval != 0 {
re.CronPattern = fmt.Sprintf("@every %ds", re.PromEvalInterval)
}
return nil
}
func (re *RecordingRule) Verify() error {
@@ -99,6 +104,10 @@ func (re *RecordingRule) Verify() error {
re.PromEvalInterval = 60
}
if re.CronPattern == "" {
re.CronPattern = "@every 60s"
}
re.AppendTags = strings.TrimSpace(re.AppendTags)
rer := strings.Fields(re.AppendTags)
for i := 0; i < len(rer); i++ {