mirror of
https://github.com/ccfos/nightingale.git
synced 2026-03-10 18:08:58 +00:00
Compare commits
8 Commits
dev22
...
events-agg
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
734aa52fb9 | ||
|
|
ad5ecf4de4 | ||
|
|
11dc745b6c | ||
|
|
9538b7eed9 | ||
|
|
05980f0f75 | ||
|
|
457aeafaa8 | ||
|
|
a8599a11ed | ||
|
|
0972e4b0d6 |
@@ -15,7 +15,6 @@ import (
|
||||
"github.com/ccfos/nightingale/v6/alert/aconf"
|
||||
"github.com/ccfos/nightingale/v6/alert/astats"
|
||||
"github.com/ccfos/nightingale/v6/alert/common"
|
||||
"github.com/ccfos/nightingale/v6/alert/pipeline"
|
||||
"github.com/ccfos/nightingale/v6/alert/sender"
|
||||
"github.com/ccfos/nightingale/v6/memsto"
|
||||
"github.com/ccfos/nightingale/v6/models"
|
||||
@@ -161,7 +160,7 @@ func (e *Dispatch) HandleEventWithNotifyRule(eventOrigin *models.AlertCurEvent)
|
||||
continue
|
||||
}
|
||||
|
||||
var processors []pipeline.Processor
|
||||
var processors []models.Processor
|
||||
for _, pipelineConfig := range notifyRule.PipelineConfigs {
|
||||
if !pipelineConfig.Enable {
|
||||
continue
|
||||
@@ -178,14 +177,7 @@ func (e *Dispatch) HandleEventWithNotifyRule(eventOrigin *models.AlertCurEvent)
|
||||
continue
|
||||
}
|
||||
|
||||
for _, p := range eventPipeline.Processors {
|
||||
processor, err := pipeline.GetProcessorByType(p.Typ, p.Config)
|
||||
if err != nil {
|
||||
logger.Warningf("notify_id: %d, event:%+v, processor:%+v type not found", notifyRuleId, eventCopy, p)
|
||||
continue
|
||||
}
|
||||
processors = append(processors, processor)
|
||||
}
|
||||
processors = append(processors, e.eventProcessorCache.GetProcessorsById(pipelineConfig.PipelineId)...)
|
||||
}
|
||||
|
||||
for _, processor := range processors {
|
||||
@@ -198,6 +190,11 @@ func (e *Dispatch) HandleEventWithNotifyRule(eventOrigin *models.AlertCurEvent)
|
||||
}
|
||||
}
|
||||
|
||||
if eventCopy == nil {
|
||||
// 如果 eventCopy 为 nil,说明 eventCopy 被 processor drop 掉了, 不再发送通知
|
||||
continue
|
||||
}
|
||||
|
||||
// notify
|
||||
for i := range notifyRule.NotifyConfigs {
|
||||
if !NotifyRuleApplicable(¬ifyRule.NotifyConfigs[i], eventCopy) {
|
||||
|
||||
@@ -44,6 +44,12 @@ func TimeSpanMuteStrategy(rule *models.AlertRule, event *models.AlertCurEvent) b
|
||||
triggerTime := tm.Format("15:04")
|
||||
triggerWeek := strconv.Itoa(int(tm.Weekday()))
|
||||
|
||||
if rule.EnableDaysOfWeek == "" {
|
||||
// 如果规则没有配置生效时间,则默认全天生效
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
enableStime := strings.Fields(rule.EnableStime)
|
||||
enableEtime := strings.Fields(rule.EnableEtime)
|
||||
enableDaysOfWeek := strings.Split(rule.EnableDaysOfWeek, ";")
|
||||
|
||||
@@ -5,7 +5,7 @@ import (
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
)
|
||||
|
||||
func Pipeline(ctx *ctx.Context, event *models.AlertCurEvent, processors []Processor) {
|
||||
func Pipeline(ctx *ctx.Context, event *models.AlertCurEvent, processors []models.Processor) {
|
||||
for _, processor := range processors {
|
||||
processor.Process(ctx, event)
|
||||
}
|
||||
|
||||
105
alert/pipeline/processor/callback/callback.go
Normal file
105
alert/pipeline/processor/callback/callback.go
Normal file
@@ -0,0 +1,105 @@
|
||||
package callback
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"encoding/json"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/alert/pipeline/processor/common"
|
||||
"github.com/ccfos/nightingale/v6/models"
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
"github.com/toolkits/pkg/logger"
|
||||
)
|
||||
|
||||
type HTTPConfig struct {
|
||||
URL string `json:"url"`
|
||||
Method string `json:"method,omitempty"`
|
||||
Body string `json:"body,omitempty"`
|
||||
Headers map[string]string `json:"headers"`
|
||||
AuthUsername string `json:"auth_username"`
|
||||
AuthPassword string `json:"auth_password"`
|
||||
Timeout int `json:"timeout"` // 单位:ms
|
||||
SkipSSLVerify bool `json:"skip_ssl_verify"`
|
||||
Proxy string `json:"proxy"`
|
||||
Client *http.Client `json:"-"`
|
||||
}
|
||||
|
||||
// RelabelConfig
|
||||
type CallbackConfig struct {
|
||||
HTTPConfig
|
||||
}
|
||||
|
||||
func init() {
|
||||
models.RegisterProcessor("callback", &CallbackConfig{})
|
||||
}
|
||||
|
||||
func (c *CallbackConfig) Init(settings interface{}) (models.Processor, error) {
|
||||
result, err := common.InitProcessor[*CallbackConfig](settings)
|
||||
return result, err
|
||||
}
|
||||
|
||||
func (c *CallbackConfig) Process(ctx *ctx.Context, event *models.AlertCurEvent) {
|
||||
if c.Client == nil {
|
||||
transport := &http.Transport{
|
||||
TLSClientConfig: &tls.Config{InsecureSkipVerify: c.SkipSSLVerify},
|
||||
}
|
||||
|
||||
if c.Proxy != "" {
|
||||
proxyURL, err := url.Parse(c.Proxy)
|
||||
if err != nil {
|
||||
logger.Errorf("failed to parse proxy url: %v", err)
|
||||
} else {
|
||||
transport.Proxy = http.ProxyURL(proxyURL)
|
||||
}
|
||||
}
|
||||
|
||||
c.Client = &http.Client{
|
||||
Timeout: time.Duration(c.Timeout) * time.Millisecond,
|
||||
Transport: transport,
|
||||
}
|
||||
}
|
||||
|
||||
headers := make(map[string]string)
|
||||
headers["Content-Type"] = "application/json"
|
||||
for k, v := range c.Headers {
|
||||
headers[k] = v
|
||||
}
|
||||
|
||||
body, err := json.Marshal(event)
|
||||
if err != nil {
|
||||
logger.Errorf("failed to marshal event: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
req, err := http.NewRequest("POST", c.URL, strings.NewReader(string(body)))
|
||||
if err != nil {
|
||||
logger.Errorf("failed to create request: %v event: %v", err, event)
|
||||
return
|
||||
}
|
||||
|
||||
for k, v := range headers {
|
||||
req.Header.Set(k, v)
|
||||
}
|
||||
|
||||
if c.AuthUsername != "" && c.AuthPassword != "" {
|
||||
req.SetBasicAuth(c.AuthUsername, c.AuthPassword)
|
||||
}
|
||||
|
||||
resp, err := c.Client.Do(req)
|
||||
if err != nil {
|
||||
logger.Errorf("failed to send request: %v event: %v", err, event)
|
||||
return
|
||||
}
|
||||
|
||||
b, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
logger.Errorf("failed to read response body: %v event: %v", err, event)
|
||||
return
|
||||
}
|
||||
|
||||
logger.Infof("response body: %s", string(b))
|
||||
}
|
||||
24
alert/pipeline/processor/common/common.go
Normal file
24
alert/pipeline/processor/common/common.go
Normal file
@@ -0,0 +1,24 @@
|
||||
package common
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
)
|
||||
|
||||
// InitProcessor 是一个通用的初始化处理器的方法
|
||||
// 使用泛型简化处理器初始化逻辑
|
||||
// T 必须是 models.Processor 接口的实现
|
||||
func InitProcessor[T any](settings interface{}) (T, error) {
|
||||
var zero T
|
||||
b, err := json.Marshal(settings)
|
||||
if err != nil {
|
||||
return zero, err
|
||||
}
|
||||
|
||||
var result T
|
||||
err = json.Unmarshal(b, &result)
|
||||
if err != nil {
|
||||
return zero, err
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
57
alert/pipeline/processor/eventdrop/event_drop.go
Normal file
57
alert/pipeline/processor/eventdrop/event_drop.go
Normal file
@@ -0,0 +1,57 @@
|
||||
package eventdrop
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"strings"
|
||||
texttemplate "text/template"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/alert/pipeline/processor/common"
|
||||
"github.com/ccfos/nightingale/v6/models"
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
"github.com/ccfos/nightingale/v6/pkg/tplx"
|
||||
"github.com/toolkits/pkg/logger"
|
||||
)
|
||||
|
||||
type EventDropConfig struct {
|
||||
Content string `json:"content"`
|
||||
}
|
||||
|
||||
func init() {
|
||||
models.RegisterProcessor("eventdrop", &EventDropConfig{})
|
||||
}
|
||||
|
||||
func (c *EventDropConfig) Init(settings interface{}) (models.Processor, error) {
|
||||
result, err := common.InitProcessor[*EventDropConfig](settings)
|
||||
return result, err
|
||||
}
|
||||
|
||||
func (c *EventDropConfig) Process(ctx *ctx.Context, event *models.AlertCurEvent) {
|
||||
// 使用背景是可以根据此处理器,实现对事件进行更加灵活的过滤的逻辑
|
||||
// 在标签过滤和属性过滤都不满足需求时可以使用
|
||||
// 如果模板执行结果为 true,则删除该事件
|
||||
|
||||
var defs = []string{
|
||||
"{{ $event := . }}",
|
||||
"{{ $labels := .TagsMap }}",
|
||||
"{{ $value := .TriggerValue }}",
|
||||
}
|
||||
|
||||
text := strings.Join(append(defs, c.Content), "")
|
||||
|
||||
tpl, err := texttemplate.New("eventdrop").Funcs(tplx.TemplateFuncMap).Parse(text)
|
||||
if err != nil {
|
||||
logger.Errorf("failed to parse template: %v event: %v", err, event)
|
||||
return
|
||||
}
|
||||
|
||||
var body bytes.Buffer
|
||||
if err = tpl.Execute(&body, event); err != nil {
|
||||
logger.Errorf("failed to execute template: %v event: %v", err, event)
|
||||
return
|
||||
}
|
||||
|
||||
result := strings.TrimSpace(body.String())
|
||||
if result == "true" {
|
||||
event = nil
|
||||
}
|
||||
}
|
||||
94
alert/pipeline/processor/eventupdate/event_update.go
Normal file
94
alert/pipeline/processor/eventupdate/event_update.go
Normal file
@@ -0,0 +1,94 @@
|
||||
package eventupdate
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"encoding/json"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/alert/pipeline/processor/callback"
|
||||
"github.com/ccfos/nightingale/v6/alert/pipeline/processor/common"
|
||||
"github.com/ccfos/nightingale/v6/models"
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
"github.com/toolkits/pkg/logger"
|
||||
)
|
||||
|
||||
// RelabelConfig
|
||||
type EventUpdateConfig struct {
|
||||
callback.HTTPConfig
|
||||
}
|
||||
|
||||
func init() {
|
||||
models.RegisterProcessor("eventupdate", &EventUpdateConfig{})
|
||||
}
|
||||
|
||||
func (c *EventUpdateConfig) Init(settings interface{}) (models.Processor, error) {
|
||||
result, err := common.InitProcessor[*EventUpdateConfig](settings)
|
||||
return result, err
|
||||
}
|
||||
|
||||
func (c *EventUpdateConfig) Process(ctx *ctx.Context, event *models.AlertCurEvent) {
|
||||
if c.Client == nil {
|
||||
transport := &http.Transport{
|
||||
TLSClientConfig: &tls.Config{InsecureSkipVerify: c.SkipSSLVerify},
|
||||
}
|
||||
|
||||
if c.Proxy != "" {
|
||||
proxyURL, err := url.Parse(c.Proxy)
|
||||
if err != nil {
|
||||
logger.Errorf("failed to parse proxy url: %v", err)
|
||||
} else {
|
||||
transport.Proxy = http.ProxyURL(proxyURL)
|
||||
}
|
||||
}
|
||||
|
||||
c.Client = &http.Client{
|
||||
Timeout: time.Duration(c.Timeout) * time.Millisecond,
|
||||
Transport: transport,
|
||||
}
|
||||
}
|
||||
|
||||
headers := make(map[string]string)
|
||||
headers["Content-Type"] = "application/json"
|
||||
for k, v := range c.Headers {
|
||||
headers[k] = v
|
||||
}
|
||||
|
||||
body, err := json.Marshal(event)
|
||||
if err != nil {
|
||||
logger.Errorf("failed to marshal event: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
req, err := http.NewRequest("POST", c.URL, strings.NewReader(string(body)))
|
||||
if err != nil {
|
||||
logger.Errorf("failed to create request: %v event: %v", err, event)
|
||||
return
|
||||
}
|
||||
|
||||
for k, v := range headers {
|
||||
req.Header.Set(k, v)
|
||||
}
|
||||
|
||||
if c.AuthUsername != "" && c.AuthPassword != "" {
|
||||
req.SetBasicAuth(c.AuthUsername, c.AuthPassword)
|
||||
}
|
||||
|
||||
resp, err := c.Client.Do(req)
|
||||
if err != nil {
|
||||
logger.Errorf("failed to send request: %v event: %v", err, event)
|
||||
return
|
||||
}
|
||||
|
||||
b, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
logger.Errorf("failed to read response body: %v event: %v", err, event)
|
||||
return
|
||||
}
|
||||
logger.Infof("response body: %s", string(b))
|
||||
|
||||
json.Unmarshal(b, &event)
|
||||
}
|
||||
@@ -1,15 +1,15 @@
|
||||
package relabel
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/alert/pipeline"
|
||||
"github.com/ccfos/nightingale/v6/alert/pipeline/processor/common"
|
||||
"github.com/ccfos/nightingale/v6/models"
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
"github.com/ccfos/nightingale/v6/pushgw/pconf"
|
||||
"github.com/ccfos/nightingale/v6/pushgw/writer"
|
||||
|
||||
"github.com/prometheus/prometheus/prompb"
|
||||
)
|
||||
|
||||
@@ -19,17 +19,12 @@ type RelabelConfig struct {
|
||||
}
|
||||
|
||||
func init() {
|
||||
pipeline.RegisterProcessor("relabel", &RelabelConfig{})
|
||||
models.RegisterProcessor("relabel", &RelabelConfig{})
|
||||
}
|
||||
|
||||
func (r *RelabelConfig) Init(settings interface{}) (pipeline.Processor, error) {
|
||||
b, err := json.Marshal(settings)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = json.Unmarshal(b, &r.RelabelConfig)
|
||||
return r, err
|
||||
func (r *RelabelConfig) Init(settings interface{}) (models.Processor, error) {
|
||||
result, err := common.InitProcessor[*RelabelConfig](settings)
|
||||
return result, err
|
||||
}
|
||||
|
||||
func (r *RelabelConfig) Process(ctx *ctx.Context, event *models.AlertCurEvent) {
|
||||
|
||||
@@ -415,7 +415,6 @@ func (rt *Router) Config(r *gin.Engine) {
|
||||
pages.GET("/alert-his-events/list", rt.auth(), rt.user(), rt.alertHisEventsList)
|
||||
pages.DELETE("/alert-cur-events", rt.auth(), rt.user(), rt.perm("/alert-cur-events/del"), rt.alertCurEventDel)
|
||||
pages.GET("/alert-cur-events/stats", rt.auth(), rt.alertCurEventsStatistics)
|
||||
pages.GET("/alert-cur-events-datasources", rt.auth(), rt.user(), rt.alertDataSourcesList)
|
||||
|
||||
pages.GET("/alert-aggr-views", rt.auth(), rt.alertAggrViewGets)
|
||||
pages.DELETE("/alert-aggr-views", rt.auth(), rt.user(), rt.alertAggrViewDel)
|
||||
|
||||
@@ -72,6 +72,7 @@ func (rt *Router) alertAggrViewPut(c *gin.Context) {
|
||||
view.Name = f.Name
|
||||
view.Rule = f.Rule
|
||||
view.Cate = f.Cate
|
||||
view.Format = f.Format
|
||||
if view.CreateBy == 0 {
|
||||
view.CreateBy = me.Id
|
||||
}
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/models"
|
||||
"github.com/ccfos/nightingale/v6/pkg/strx"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/toolkits/pkg/ginx"
|
||||
@@ -49,7 +50,7 @@ func getUserGroupIds(ctx *gin.Context, rt *Router, myGroups bool) ([]int64, erro
|
||||
|
||||
func (rt *Router) alertCurEventsCard(c *gin.Context) {
|
||||
stime, etime := getTimeRange(c)
|
||||
severity := ginx.QueryInt(c, "severity", -1)
|
||||
severity := strx.IdsInt64ForAPI(ginx.QueryStr(c, "severity", ""), ",")
|
||||
query := ginx.QueryStr(c, "query", "")
|
||||
myGroups := ginx.QueryBool(c, "my_groups", false) // 是否只看自己组,默认false
|
||||
|
||||
@@ -96,7 +97,7 @@ func (rt *Router) alertCurEventsCard(c *gin.Context) {
|
||||
|
||||
// 最多获取50000个,获取太多也没啥意义
|
||||
list, err := models.AlertCurEventsGet(rt.Ctx, prods, bgids, stime, etime, severity, dsIds,
|
||||
cates, 0, query, 50000, 0, gids)
|
||||
cates, 0, query, 50000, 0, gids, []int64{})
|
||||
ginx.Dangerous(err)
|
||||
|
||||
cardmap := make(map[string]*AlertCard)
|
||||
@@ -173,13 +174,15 @@ func (rt *Router) alertCurEventsGetByRid(c *gin.Context) {
|
||||
// 列表方式,拉取活跃告警
|
||||
func (rt *Router) alertCurEventsList(c *gin.Context) {
|
||||
stime, etime := getTimeRange(c)
|
||||
severity := ginx.QueryInt(c, "severity", -1)
|
||||
severity := strx.IdsInt64ForAPI(ginx.QueryStr(c, "severity", ""), ",")
|
||||
query := ginx.QueryStr(c, "query", "")
|
||||
limit := ginx.QueryInt(c, "limit", 20)
|
||||
myGroups := ginx.QueryBool(c, "my_groups", false) // 是否只看自己组,默认false
|
||||
|
||||
dsIds := queryDatasourceIds(c)
|
||||
|
||||
eventIds := strx.IdsInt64ForAPI(ginx.QueryStr(c, "event_ids", ""), ",")
|
||||
|
||||
prod := ginx.QueryStr(c, "prods", "")
|
||||
if prod == "" {
|
||||
prod = ginx.QueryStr(c, "rule_prods", "")
|
||||
@@ -212,11 +215,11 @@ func (rt *Router) alertCurEventsList(c *gin.Context) {
|
||||
ginx.Dangerous(err)
|
||||
|
||||
total, err := models.AlertCurEventTotal(rt.Ctx, prods, bgids, stime, etime, severity, dsIds,
|
||||
cates, ruleId, query, gids)
|
||||
cates, ruleId, query, gids, eventIds)
|
||||
ginx.Dangerous(err)
|
||||
|
||||
list, err := models.AlertCurEventsGet(rt.Ctx, prods, bgids, stime, etime, severity, dsIds,
|
||||
cates, ruleId, query, limit, ginx.Offset(c, limit), gids)
|
||||
cates, ruleId, query, limit, ginx.Offset(c, limit), gids, eventIds)
|
||||
ginx.Dangerous(err)
|
||||
|
||||
cache := make(map[int64]*models.UserGroup)
|
||||
@@ -260,63 +263,6 @@ func (rt *Router) checkCurEventBusiGroupRWPermission(c *gin.Context, ids []int64
|
||||
}
|
||||
}
|
||||
|
||||
// 列表方式,拉取活跃告警
|
||||
func (rt *Router) alertDataSourcesList(c *gin.Context) {
|
||||
stime, etime := getTimeRange(c)
|
||||
severity := ginx.QueryInt(c, "severity", -1)
|
||||
query := ginx.QueryStr(c, "query", "")
|
||||
myGroups := ginx.QueryBool(c, "my_groups", false) // 是否只看自己组,默认false
|
||||
|
||||
prod := ginx.QueryStr(c, "prods", "")
|
||||
if prod == "" {
|
||||
prod = ginx.QueryStr(c, "rule_prods", "")
|
||||
}
|
||||
|
||||
prods := []string{}
|
||||
if prod != "" {
|
||||
prods = strings.Split(prod, ",")
|
||||
}
|
||||
|
||||
cate := ginx.QueryStr(c, "cate", "$all")
|
||||
cates := []string{}
|
||||
if cate != "$all" {
|
||||
cates = strings.Split(cate, ",")
|
||||
}
|
||||
|
||||
ruleId := ginx.QueryInt64(c, "rid", 0)
|
||||
var gids []int64
|
||||
var err error
|
||||
if myGroups {
|
||||
gids, err = getUserGroupIds(c, rt, myGroups)
|
||||
ginx.Dangerous(err)
|
||||
if len(gids) == 0 {
|
||||
gids = append(gids, -1)
|
||||
}
|
||||
}
|
||||
|
||||
bgids, err := GetBusinessGroupIds(c, rt.Ctx, rt.Center.EventHistoryGroupView)
|
||||
ginx.Dangerous(err)
|
||||
|
||||
list, err := models.AlertCurEventsGet(rt.Ctx, prods, bgids, stime, etime, severity, []int64{},
|
||||
cates, ruleId, query, 50000, 0, gids)
|
||||
ginx.Dangerous(err)
|
||||
|
||||
uniqueDsIds := make(map[int64]struct{})
|
||||
|
||||
for i := 0; i < len(list); i++ {
|
||||
uniqueDsIds[list[i].DatasourceId] = struct{}{}
|
||||
}
|
||||
|
||||
dsIds := make([]int64, 0, len(uniqueDsIds))
|
||||
for id := range uniqueDsIds {
|
||||
dsIds = append(dsIds, id)
|
||||
}
|
||||
dsList, err := models.GetDatasourceInfosByIds(rt.Ctx, dsIds)
|
||||
ginx.Dangerous(err)
|
||||
|
||||
ginx.NewRender(c).Data(dsList, nil)
|
||||
}
|
||||
|
||||
func (rt *Router) alertCurEventGet(c *gin.Context) {
|
||||
eid := ginx.UrlParamInt64(c, "eid")
|
||||
event, err := models.AlertCurEventGetById(rt.Ctx, eid)
|
||||
|
||||
@@ -4,8 +4,8 @@ import (
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/alert/pipeline"
|
||||
"github.com/ccfos/nightingale/v6/models"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/toolkits/pkg/ginx"
|
||||
)
|
||||
@@ -139,8 +139,8 @@ func (rt *Router) tryRunEventPipeline(c *gin.Context) {
|
||||
}
|
||||
event := hisEvent.ToCur()
|
||||
|
||||
for _, p := range f.PipelineConfig.Processors {
|
||||
processor, err := pipeline.GetProcessorByType(p.Typ, p.Config)
|
||||
for _, p := range f.PipelineConfig.ProcessorConfigs {
|
||||
processor, err := models.GetProcessorByType(p.Typ, p.Config)
|
||||
if err != nil {
|
||||
ginx.Bomb(http.StatusBadRequest, "processor %+v type not found", p)
|
||||
}
|
||||
@@ -153,8 +153,8 @@ func (rt *Router) tryRunEventPipeline(c *gin.Context) {
|
||||
// 测试事件处理器
|
||||
func (rt *Router) tryRunEventProcessor(c *gin.Context) {
|
||||
var f struct {
|
||||
EventId int64 `json:"event_id"`
|
||||
ProcessorConfig models.Processor `json:"processor_config"`
|
||||
EventId int64 `json:"event_id"`
|
||||
ProcessorConfig models.ProcessorConfig `json:"processor_config"`
|
||||
}
|
||||
ginx.BindJSON(c, &f)
|
||||
|
||||
@@ -164,7 +164,7 @@ func (rt *Router) tryRunEventProcessor(c *gin.Context) {
|
||||
}
|
||||
event := hisEvent.ToCur()
|
||||
|
||||
processor, err := pipeline.GetProcessorByType(f.ProcessorConfig.Typ, f.ProcessorConfig.Config)
|
||||
processor, err := models.GetProcessorByType(f.ProcessorConfig.Typ, f.ProcessorConfig.Config)
|
||||
if err != nil {
|
||||
ginx.Bomb(http.StatusBadRequest, "processor type not found")
|
||||
}
|
||||
@@ -199,8 +199,8 @@ func (rt *Router) tryRunEventProcessorByNotifyRule(c *gin.Context) {
|
||||
}
|
||||
|
||||
for _, pl := range pipelines {
|
||||
for _, p := range pl.Processors {
|
||||
processor, err := pipeline.GetProcessorByType(p.Typ, p.Config)
|
||||
for _, p := range pl.ProcessorConfigs {
|
||||
processor, err := models.GetProcessorByType(p.Typ, p.Config)
|
||||
if err != nil {
|
||||
ginx.Bomb(http.StatusBadRequest, "processor %+v type not found", p)
|
||||
}
|
||||
|
||||
@@ -20,7 +20,7 @@ type EventProcessorCacheType struct {
|
||||
stats *Stats
|
||||
|
||||
sync.RWMutex
|
||||
processors map[int64]*models.EventPipeline // key: pipeline id
|
||||
eventPipelines map[int64]*models.EventPipeline // key: pipeline id
|
||||
}
|
||||
|
||||
func NewEventProcessorCache(ctx *ctx.Context, stats *Stats) *EventProcessorCacheType {
|
||||
@@ -29,7 +29,7 @@ func NewEventProcessorCache(ctx *ctx.Context, stats *Stats) *EventProcessorCache
|
||||
statLastUpdated: -1,
|
||||
ctx: ctx,
|
||||
stats: stats,
|
||||
processors: make(map[int64]*models.EventPipeline),
|
||||
eventPipelines: make(map[int64]*models.EventPipeline),
|
||||
}
|
||||
epc.SyncEventProcessors()
|
||||
return epc
|
||||
@@ -41,7 +41,7 @@ func (epc *EventProcessorCacheType) Reset() {
|
||||
|
||||
epc.statTotal = -1
|
||||
epc.statLastUpdated = -1
|
||||
epc.processors = make(map[int64]*models.EventPipeline)
|
||||
epc.eventPipelines = make(map[int64]*models.EventPipeline)
|
||||
}
|
||||
|
||||
func (epc *EventProcessorCacheType) StatChanged(total, lastUpdated int64) bool {
|
||||
@@ -54,7 +54,7 @@ func (epc *EventProcessorCacheType) StatChanged(total, lastUpdated int64) bool {
|
||||
|
||||
func (epc *EventProcessorCacheType) Set(m map[int64]*models.EventPipeline, total, lastUpdated int64) {
|
||||
epc.Lock()
|
||||
epc.processors = m
|
||||
epc.eventPipelines = m
|
||||
epc.Unlock()
|
||||
|
||||
// only one goroutine used, so no need lock
|
||||
@@ -65,17 +65,29 @@ func (epc *EventProcessorCacheType) Set(m map[int64]*models.EventPipeline, total
|
||||
func (epc *EventProcessorCacheType) Get(processorId int64) *models.EventPipeline {
|
||||
epc.RLock()
|
||||
defer epc.RUnlock()
|
||||
return epc.processors[processorId]
|
||||
return epc.eventPipelines[processorId]
|
||||
}
|
||||
|
||||
func (epc *EventProcessorCacheType) GetProcessorsById(processorId int64) []models.Processor {
|
||||
epc.RLock()
|
||||
defer epc.RUnlock()
|
||||
|
||||
eventPipeline, ok := epc.eventPipelines[processorId]
|
||||
if !ok {
|
||||
return []models.Processor{}
|
||||
}
|
||||
|
||||
return eventPipeline.Processors
|
||||
}
|
||||
|
||||
func (epc *EventProcessorCacheType) GetProcessorIds() []int64 {
|
||||
epc.RLock()
|
||||
defer epc.RUnlock()
|
||||
|
||||
count := len(epc.processors)
|
||||
count := len(epc.eventPipelines)
|
||||
list := make([]int64, 0, count)
|
||||
for processorId := range epc.processors {
|
||||
list = append(list, processorId)
|
||||
for eid := range epc.eventPipelines {
|
||||
list = append(list, eid)
|
||||
}
|
||||
|
||||
return list
|
||||
@@ -125,7 +137,18 @@ func (epc *EventProcessorCacheType) syncEventProcessors() error {
|
||||
|
||||
m := make(map[int64]*models.EventPipeline)
|
||||
for i := 0; i < len(lst); i++ {
|
||||
m[lst[i].ID] = lst[i]
|
||||
eventPipeline := lst[i]
|
||||
for _, p := range eventPipeline.ProcessorConfigs {
|
||||
processor, err := models.GetProcessorByType(p.Typ, p.Config)
|
||||
if err != nil {
|
||||
logger.Warningf("event_pipeline_id: %d, event:%+v, processor:%+v type not found", eventPipeline.ID, eventPipeline, p)
|
||||
continue
|
||||
}
|
||||
|
||||
eventPipeline.Processors = append(eventPipeline.Processors, processor)
|
||||
}
|
||||
|
||||
m[lst[i].ID] = eventPipeline
|
||||
}
|
||||
|
||||
epc.Set(m, stat.Total, stat.LastUpdated)
|
||||
|
||||
@@ -38,33 +38,35 @@ func (v *AlertAggrView) Verify() error {
|
||||
return errors.New("rule is blank")
|
||||
}
|
||||
|
||||
var validFields = []string{
|
||||
"cluster",
|
||||
"group_id",
|
||||
"group_name",
|
||||
"rule_id",
|
||||
"rule_name",
|
||||
"severity",
|
||||
"runbook_url",
|
||||
"target_ident",
|
||||
"target_note",
|
||||
}
|
||||
|
||||
arr := strings.Split(v.Rule, "::")
|
||||
for i := 0; i < len(arr); i++ {
|
||||
pair := strings.Split(arr[i], ":")
|
||||
if len(pair) != 2 {
|
||||
return errors.New("rule invalid")
|
||||
if !strings.Contains(v.Rule, "{{") {
|
||||
var validFields = []string{
|
||||
"cluster",
|
||||
"group_id",
|
||||
"group_name",
|
||||
"rule_id",
|
||||
"rule_name",
|
||||
"severity",
|
||||
"runbook_url",
|
||||
"target_ident",
|
||||
"target_note",
|
||||
}
|
||||
|
||||
if !(pair[0] == "field" || pair[0] == "tagkey") {
|
||||
return errors.New("rule invalid")
|
||||
}
|
||||
arr := strings.Split(v.Rule, "::")
|
||||
for i := 0; i < len(arr); i++ {
|
||||
pair := strings.Split(arr[i], ":")
|
||||
if len(pair) != 2 {
|
||||
return errors.New("rule invalid")
|
||||
}
|
||||
|
||||
if pair[0] == "field" {
|
||||
// 只支持有限的field
|
||||
if !slice.ContainsString(validFields, pair[1]) {
|
||||
return fmt.Errorf("unsupported field: %s", pair[1])
|
||||
if !(pair[0] == "field" || pair[0] == "tagkey") {
|
||||
return errors.New("rule invalid")
|
||||
}
|
||||
|
||||
if pair[0] == "field" {
|
||||
// 只支持有限的field
|
||||
if !slice.ContainsString(validFields, pair[1]) {
|
||||
return fmt.Errorf("unsupported field: %s", pair[1])
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -90,7 +92,7 @@ func (v *AlertAggrView) Update(ctx *ctx.Context) error {
|
||||
}
|
||||
v.UpdateAt = time.Now().Unix()
|
||||
|
||||
return DB(ctx).Model(v).Select("name", "rule", "cate", "update_at", "create_by").Updates(v).Error
|
||||
return DB(ctx).Model(v).Select("name", "rule", "cate", "format", "update_at", "create_by").Updates(v).Error
|
||||
}
|
||||
|
||||
// AlertAggrViewDel: userid for safe delete
|
||||
|
||||
@@ -537,7 +537,7 @@ func (e *AlertCurEvent) FillNotifyGroups(ctx *ctx.Context, cache map[int64]*User
|
||||
}
|
||||
|
||||
func AlertCurEventTotal(ctx *ctx.Context, prods []string, bgids []int64, stime, etime int64,
|
||||
severity int, dsIds []int64, cates []string, ruleId int64, query string, myGroups []int64) (int64, error) {
|
||||
severity []int64, dsIds []int64, cates []string, ruleId int64, query string, myGroups []int64, eventIds []int64) (int64, error) {
|
||||
session := DB(ctx).Model(&AlertCurEvent{})
|
||||
if stime != 0 && etime != 0 {
|
||||
session = session.Where("trigger_time between ? and ?", stime, etime)
|
||||
@@ -550,8 +550,8 @@ func AlertCurEventTotal(ctx *ctx.Context, prods []string, bgids []int64, stime,
|
||||
session = session.Where("group_id in ?", bgids)
|
||||
}
|
||||
|
||||
if severity >= 0 {
|
||||
session = session.Where("severity = ?", severity)
|
||||
if len(severity) > 0 {
|
||||
session = session.Where("severity in ?", severity)
|
||||
}
|
||||
|
||||
if len(dsIds) > 0 {
|
||||
@@ -570,6 +570,9 @@ func AlertCurEventTotal(ctx *ctx.Context, prods []string, bgids []int64, stime,
|
||||
session = session.Where("group_id in ?", myGroups)
|
||||
}
|
||||
|
||||
if len(eventIds) > 0 {
|
||||
session = session.Where("id in ?", eventIds)
|
||||
}
|
||||
if query != "" {
|
||||
arr := strings.Fields(query)
|
||||
for i := 0; i < len(arr); i++ {
|
||||
@@ -582,7 +585,7 @@ func AlertCurEventTotal(ctx *ctx.Context, prods []string, bgids []int64, stime,
|
||||
}
|
||||
|
||||
func AlertCurEventsGet(ctx *ctx.Context, prods []string, bgids []int64, stime, etime int64,
|
||||
severity int, dsIds []int64, cates []string, ruleId int64, query string, limit, offset int, myGroups []int64) (
|
||||
severity []int64, dsIds []int64, cates []string, ruleId int64, query string, limit, offset int, myGroups []int64, eventIds []int64) (
|
||||
[]AlertCurEvent, error) {
|
||||
session := DB(ctx).Model(&AlertCurEvent{})
|
||||
|
||||
@@ -597,8 +600,8 @@ func AlertCurEventsGet(ctx *ctx.Context, prods []string, bgids []int64, stime, e
|
||||
session = session.Where("group_id in ?", bgids)
|
||||
}
|
||||
|
||||
if severity >= 0 {
|
||||
session = session.Where("severity = ?", severity)
|
||||
if len(severity) > 0 {
|
||||
session = session.Where("severity in ?", severity)
|
||||
}
|
||||
|
||||
if len(dsIds) > 0 {
|
||||
@@ -615,6 +618,9 @@ func AlertCurEventsGet(ctx *ctx.Context, prods []string, bgids []int64, stime, e
|
||||
if len(myGroups) > 0 {
|
||||
session = session.Where("group_id in ?", myGroups)
|
||||
}
|
||||
if len(eventIds) > 0 {
|
||||
session = session.Where("id in ?", eventIds)
|
||||
}
|
||||
if query != "" {
|
||||
arr := strings.Fields(query)
|
||||
for i := 0; i < len(arr); i++ {
|
||||
|
||||
@@ -11,22 +11,24 @@ import (
|
||||
|
||||
// EventPipeline 事件Pipeline模型
|
||||
type EventPipeline struct {
|
||||
ID int64 `json:"id" gorm:"primaryKey"`
|
||||
Name string `json:"name" gorm:"type:varchar(128)"`
|
||||
TeamIds []int64 `json:"team_ids" gorm:"type:text;serializer:json"`
|
||||
TeamNames []string `json:"team_names" gorm:"-"`
|
||||
Description string `json:"description" gorm:"type:varchar(255)"`
|
||||
FilterEnable bool `json:"filter_enable" gorm:"type:tinyint(1)"`
|
||||
LabelFilters []TagFilter `json:"label_filters" gorm:"type:text;serializer:json"`
|
||||
AttrFilters []TagFilter `json:"attribute_filters" gorm:"type:text;serializer:json"`
|
||||
Processors []Processor `json:"processors" gorm:"type:text;serializer:json"`
|
||||
CreateAt int64 `json:"create_at" gorm:"type:bigint"`
|
||||
CreateBy string `json:"create_by" gorm:"type:varchar(64)"`
|
||||
UpdateAt int64 `json:"update_at" gorm:"type:bigint"`
|
||||
UpdateBy string `json:"update_by" gorm:"type:varchar(64)"`
|
||||
ID int64 `json:"id" gorm:"primaryKey"`
|
||||
Name string `json:"name" gorm:"type:varchar(128)"`
|
||||
TeamIds []int64 `json:"team_ids" gorm:"type:text;serializer:json"`
|
||||
TeamNames []string `json:"team_names" gorm:"-"`
|
||||
Description string `json:"description" gorm:"type:varchar(255)"`
|
||||
FilterEnable bool `json:"filter_enable" gorm:"type:tinyint(1)"`
|
||||
LabelFilters []TagFilter `json:"label_filters" gorm:"type:text;serializer:json"`
|
||||
AttrFilters []TagFilter `json:"attribute_filters" gorm:"type:text;serializer:json"`
|
||||
ProcessorConfigs []ProcessorConfig `json:"processors" gorm:"type:text;serializer:json"`
|
||||
CreateAt int64 `json:"create_at" gorm:"type:bigint"`
|
||||
CreateBy string `json:"create_by" gorm:"type:varchar(64)"`
|
||||
UpdateAt int64 `json:"update_at" gorm:"type:bigint"`
|
||||
UpdateBy string `json:"update_by" gorm:"type:varchar(64)"`
|
||||
|
||||
Processors []Processor `json:"-" gorm:"-"`
|
||||
}
|
||||
|
||||
type Processor struct {
|
||||
type ProcessorConfig struct {
|
||||
Typ string `json:"typ"`
|
||||
Config interface{} `json:"config"`
|
||||
}
|
||||
@@ -44,22 +46,20 @@ func (e *EventPipeline) Verify() error {
|
||||
return errors.New("team_ids cannot be empty")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *EventPipeline) DB2FE() {
|
||||
if e.TeamIds == nil {
|
||||
if len(e.TeamIds) == 0 {
|
||||
e.TeamIds = make([]int64, 0)
|
||||
}
|
||||
if e.LabelFilters == nil {
|
||||
if len(e.LabelFilters) == 0 {
|
||||
e.LabelFilters = make([]TagFilter, 0)
|
||||
}
|
||||
if e.AttrFilters == nil {
|
||||
if len(e.AttrFilters) == 0 {
|
||||
e.AttrFilters = make([]TagFilter, 0)
|
||||
}
|
||||
if e.Processors == nil {
|
||||
e.Processors = make([]Processor, 0)
|
||||
if len(e.ProcessorConfigs) == 0 {
|
||||
e.ProcessorConfigs = make([]ProcessorConfig, 0)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// CreateEventPipeline 创建事件Pipeline
|
||||
@@ -74,7 +74,7 @@ func GetEventPipeline(ctx *ctx.Context, id int64) (*EventPipeline, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
pipeline.DB2FE()
|
||||
pipeline.Verify()
|
||||
return &pipeline, nil
|
||||
}
|
||||
|
||||
@@ -108,7 +108,7 @@ func ListEventPipelines(ctx *ctx.Context) ([]*EventPipeline, error) {
|
||||
}
|
||||
|
||||
for _, p := range pipelines {
|
||||
p.DB2FE()
|
||||
p.Verify()
|
||||
}
|
||||
|
||||
return pipelines, nil
|
||||
|
||||
@@ -1,33 +1,21 @@
|
||||
package pipeline
|
||||
package models
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/models"
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
)
|
||||
|
||||
// Processor 是处理器接口,所有处理器类型都需要实现此接口
|
||||
type Processor interface {
|
||||
Init(settings interface{}) (Processor, error) // 初始化配置
|
||||
Process(ctx *ctx.Context, event *models.AlertCurEvent) // 处理告警事件
|
||||
Init(settings interface{}) (Processor, error) // 初始化配置
|
||||
Process(ctx *ctx.Context, event *AlertCurEvent) // 处理告警事件
|
||||
}
|
||||
|
||||
// NewProcessorFn 创建处理器的函数类型
|
||||
type NewProcessorFn func(settings interface{}) (Processor, error)
|
||||
|
||||
// 处理器注册表,存储各种类型处理器的构造函数
|
||||
var processorRegister = map[string]NewProcessorFn{}
|
||||
|
||||
// // ProcessorTypes 存储所有支持的处理器类型
|
||||
// var Processors map[int64]models.Processor
|
||||
|
||||
// func init() {
|
||||
// Processors = make(map[int64]models.Processor)
|
||||
// }
|
||||
|
||||
// RegisterProcessor 注册处理器类型
|
||||
func RegisterProcessor(typ string, p Processor) {
|
||||
if _, found := processorRegister[typ]; found {
|
||||
return
|
||||
@@ -35,7 +23,6 @@ func RegisterProcessor(typ string, p Processor) {
|
||||
processorRegister[typ] = p.Init
|
||||
}
|
||||
|
||||
// GetProcessorByType 根据类型获取处理器实例
|
||||
func GetProcessorByType(typ string, settings interface{}) (Processor, error) {
|
||||
typ = strings.TrimSpace(typ)
|
||||
fn, found := processorRegister[typ]
|
||||
Reference in New Issue
Block a user