Compare commits

...

8 Commits

Author SHA1 Message Date
Ulric Qin
734aa52fb9 modify alerting aggr verify rules 2025-05-22 15:03:58 +08:00
ning
ad5ecf4de4 refactor: update cur event api 2025-05-21 20:30:16 +08:00
ning
11dc745b6c fix: AlertAggrView update 2025-05-21 20:11:32 +08:00
ning
9538b7eed9 fix: AlertAggrView update 2025-05-21 20:05:27 +08:00
ning
05980f0f75 add processor 2025-05-20 18:22:37 +08:00
smx_Morgan
457aeafaa8 feat : add event_Ids to alert-cur-events/list (#2681) 2025-05-20 15:57:13 +08:00
Yening Qin
a8599a11ed refactor: change TimeSpanMuteStrategy (#2687) 2025-05-20 15:52:37 +08:00
Yening Qin
0972e4b0d6 add callback processor (#2685) 2025-05-19 19:45:43 +08:00
17 changed files with 416 additions and 174 deletions

View File

@@ -15,7 +15,6 @@ import (
"github.com/ccfos/nightingale/v6/alert/aconf"
"github.com/ccfos/nightingale/v6/alert/astats"
"github.com/ccfos/nightingale/v6/alert/common"
"github.com/ccfos/nightingale/v6/alert/pipeline"
"github.com/ccfos/nightingale/v6/alert/sender"
"github.com/ccfos/nightingale/v6/memsto"
"github.com/ccfos/nightingale/v6/models"
@@ -161,7 +160,7 @@ func (e *Dispatch) HandleEventWithNotifyRule(eventOrigin *models.AlertCurEvent)
continue
}
var processors []pipeline.Processor
var processors []models.Processor
for _, pipelineConfig := range notifyRule.PipelineConfigs {
if !pipelineConfig.Enable {
continue
@@ -178,14 +177,7 @@ func (e *Dispatch) HandleEventWithNotifyRule(eventOrigin *models.AlertCurEvent)
continue
}
for _, p := range eventPipeline.Processors {
processor, err := pipeline.GetProcessorByType(p.Typ, p.Config)
if err != nil {
logger.Warningf("notify_id: %d, event:%+v, processor:%+v type not found", notifyRuleId, eventCopy, p)
continue
}
processors = append(processors, processor)
}
processors = append(processors, e.eventProcessorCache.GetProcessorsById(pipelineConfig.PipelineId)...)
}
for _, processor := range processors {
@@ -198,6 +190,11 @@ func (e *Dispatch) HandleEventWithNotifyRule(eventOrigin *models.AlertCurEvent)
}
}
if eventCopy == nil {
// 如果 eventCopy 为 nil,说明 eventCopy 被 processor drop 掉了,不再发送通知
continue
}
// notify
for i := range notifyRule.NotifyConfigs {
if !NotifyRuleApplicable(&notifyRule.NotifyConfigs[i], eventCopy) {

View File

@@ -44,6 +44,12 @@ func TimeSpanMuteStrategy(rule *models.AlertRule, event *models.AlertCurEvent) b
triggerTime := tm.Format("15:04")
triggerWeek := strconv.Itoa(int(tm.Weekday()))
if rule.EnableDaysOfWeek == "" {
// 如果规则没有配置生效时间,则默认全天生效
return false
}
enableStime := strings.Fields(rule.EnableStime)
enableEtime := strings.Fields(rule.EnableEtime)
enableDaysOfWeek := strings.Split(rule.EnableDaysOfWeek, ";")

View File

@@ -5,7 +5,7 @@ import (
"github.com/ccfos/nightingale/v6/pkg/ctx"
)
func Pipeline(ctx *ctx.Context, event *models.AlertCurEvent, processors []Processor) {
func Pipeline(ctx *ctx.Context, event *models.AlertCurEvent, processors []models.Processor) {
for _, processor := range processors {
processor.Process(ctx, event)
}

View File

@@ -0,0 +1,105 @@
package callback
import (
"crypto/tls"
"encoding/json"
"io"
"net/http"
"net/url"
"strings"
"time"
"github.com/ccfos/nightingale/v6/alert/pipeline/processor/common"
"github.com/ccfos/nightingale/v6/models"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/toolkits/pkg/logger"
)
// HTTPConfig holds the settings shared by the HTTP-based event processors
// (callback, eventupdate): target URL, headers, basic auth, timeout, TLS
// verification and proxy options.
type HTTPConfig struct {
URL string `json:"url"`
Method string `json:"method,omitempty"`
Body string `json:"body,omitempty"`
Headers map[string]string `json:"headers"`
AuthUsername string `json:"auth_username"` // used together with AuthPassword for HTTP basic auth
AuthPassword string `json:"auth_password"`
Timeout int `json:"timeout"` // unit: ms
SkipSSLVerify bool `json:"skip_ssl_verify"`
Proxy string `json:"proxy"`
Client *http.Client `json:"-"` // lazily initialized on first Process call; excluded from the JSON config
}
// CallbackConfig is the "callback" event processor: it POSTs the alert
// event as JSON to an external HTTP endpoint.
type CallbackConfig struct {
HTTPConfig
}
// init registers this processor under the "callback" type name so it can be
// instantiated via models.GetProcessorByType.
func init() {
models.RegisterProcessor("callback", &CallbackConfig{})
}
// Init decodes the raw settings into a fresh *CallbackConfig.
// The two-step return converts the concrete *CallbackConfig into the
// models.Processor interface type expected by callers.
func (c *CallbackConfig) Init(settings interface{}) (models.Processor, error) {
result, err := common.InitProcessor[*CallbackConfig](settings)
return result, err
}
// Process sends the alert event to the configured callback URL as a JSON
// POST request. Failures are logged and otherwise ignored so a broken
// callback never blocks the rest of the pipeline; the event is not modified.
func (c *CallbackConfig) Process(ctx *ctx.Context, event *models.AlertCurEvent) {
	if c.Client == nil {
		// Lazily build the HTTP client on first use so that the config can
		// be created by plain JSON decoding.
		transport := &http.Transport{
			TLSClientConfig: &tls.Config{InsecureSkipVerify: c.SkipSSLVerify},
		}
		if c.Proxy != "" {
			proxyURL, err := url.Parse(c.Proxy)
			if err != nil {
				logger.Errorf("failed to parse proxy url: %v", err)
			} else {
				transport.Proxy = http.ProxyURL(proxyURL)
			}
		}
		c.Client = &http.Client{
			Timeout:   time.Duration(c.Timeout) * time.Millisecond, // Timeout is in ms
			Transport: transport,
		}
	}

	headers := make(map[string]string)
	headers["Content-Type"] = "application/json"
	for k, v := range c.Headers {
		headers[k] = v
	}

	body, err := json.Marshal(event)
	if err != nil {
		logger.Errorf("failed to marshal event: %v", err)
		return
	}

	req, err := http.NewRequest("POST", c.URL, strings.NewReader(string(body)))
	if err != nil {
		logger.Errorf("failed to create request: %v event: %v", err, event)
		return
	}

	for k, v := range headers {
		req.Header.Set(k, v)
	}

	if c.AuthUsername != "" && c.AuthPassword != "" {
		req.SetBasicAuth(c.AuthUsername, c.AuthPassword)
	}

	resp, err := c.Client.Do(req)
	if err != nil {
		logger.Errorf("failed to send request: %v event: %v", err, event)
		return
	}
	// Close the response body so the underlying connection can be reused;
	// without this every callback leaks a connection/file descriptor.
	defer resp.Body.Close()

	b, err := io.ReadAll(resp.Body)
	if err != nil {
		logger.Errorf("failed to read response body: %v event: %v", err, event)
		return
	}

	logger.Infof("response body: %s", string(b))
}

View File

@@ -0,0 +1,24 @@
package common
import (
"encoding/json"
)
// InitProcessor is a generic helper for initializing processor configs.
// It converts arbitrary settings into a value of type T by round-tripping
// them through JSON. T is expected to implement models.Processor.
func InitProcessor[T any](settings interface{}) (T, error) {
	var zero T
	raw, err := json.Marshal(settings)
	if err != nil {
		return zero, err
	}
	var cfg T
	if err := json.Unmarshal(raw, &cfg); err != nil {
		return zero, err
	}
	return cfg, nil
}

View File

@@ -0,0 +1,57 @@
package eventdrop
import (
"bytes"
"strings"
texttemplate "text/template"
"github.com/ccfos/nightingale/v6/alert/pipeline/processor/common"
"github.com/ccfos/nightingale/v6/models"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/pkg/tplx"
"github.com/toolkits/pkg/logger"
)
// EventDropConfig is the "eventdrop" processor. Content is a Go
// text/template evaluated against the event; when it renders to the string
// "true" the event is meant to be dropped.
type EventDropConfig struct {
Content string `json:"content"`
}
// init registers this processor under the "eventdrop" type name.
func init() {
models.RegisterProcessor("eventdrop", &EventDropConfig{})
}
// Init decodes the raw settings into a fresh *EventDropConfig.
// The two-step return converts the concrete pointer into the
// models.Processor interface type.
func (c *EventDropConfig) Init(settings interface{}) (models.Processor, error) {
result, err := common.InitProcessor[*EventDropConfig](settings)
return result, err
}
// Process renders the configured template against the event; a result of
// "true" marks the event for dropping.
//
// NOTE(review): `event = nil` below only reassigns the local parameter — Go
// passes the pointer by value, so the caller's pointer is unaffected and the
// drop never propagates. Making this work would require the Processor
// interface to return the (possibly nil) event; confirm against how the
// dispatcher checks for dropped events.
func (c *EventDropConfig) Process(ctx *ctx.Context, event *models.AlertCurEvent) {
// Background: this processor enables more flexible event-filtering logic
// than label filters or attribute filters can express.
// If the template evaluates to "true", the event is dropped.
var defs = []string{
"{{ $event := . }}",
"{{ $labels := .TagsMap }}",
"{{ $value := .TriggerValue }}",
}
text := strings.Join(append(defs, c.Content), "")
tpl, err := texttemplate.New("eventdrop").Funcs(tplx.TemplateFuncMap).Parse(text)
if err != nil {
logger.Errorf("failed to parse template: %v event: %v", err, event)
return
}
var body bytes.Buffer
if err = tpl.Execute(&body, event); err != nil {
logger.Errorf("failed to execute template: %v event: %v", err, event)
return
}
result := strings.TrimSpace(body.String())
if result == "true" {
event = nil // no effect outside this function; see NOTE above
}
}

View File

@@ -0,0 +1,94 @@
package eventupdate
import (
"crypto/tls"
"encoding/json"
"io"
"net/http"
"net/url"
"strings"
"time"
"github.com/ccfos/nightingale/v6/alert/pipeline/processor/callback"
"github.com/ccfos/nightingale/v6/alert/pipeline/processor/common"
"github.com/ccfos/nightingale/v6/models"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/toolkits/pkg/logger"
)
// EventUpdateConfig is the "eventupdate" processor: it POSTs the alert event
// to an external HTTP endpoint and applies the JSON response back onto the
// event, letting an external service rewrite event fields.
type EventUpdateConfig struct {
callback.HTTPConfig
}
// init registers this processor under the "eventupdate" type name.
func init() {
models.RegisterProcessor("eventupdate", &EventUpdateConfig{})
}
// Init decodes the raw settings into a fresh *EventUpdateConfig.
// The two-step return converts the concrete pointer into the
// models.Processor interface type.
func (c *EventUpdateConfig) Init(settings interface{}) (models.Processor, error) {
result, err := common.InitProcessor[*EventUpdateConfig](settings)
return result, err
}
// Process POSTs the alert event to the configured URL and, on success,
// unmarshals the JSON response back into the event so an external service
// can rewrite event fields in place. Failures are logged and otherwise
// ignored so a broken endpoint never blocks the pipeline.
func (c *EventUpdateConfig) Process(ctx *ctx.Context, event *models.AlertCurEvent) {
	if c.Client == nil {
		// Lazily build the HTTP client on first use so that the config can
		// be created by plain JSON decoding.
		transport := &http.Transport{
			TLSClientConfig: &tls.Config{InsecureSkipVerify: c.SkipSSLVerify},
		}
		if c.Proxy != "" {
			proxyURL, err := url.Parse(c.Proxy)
			if err != nil {
				logger.Errorf("failed to parse proxy url: %v", err)
			} else {
				transport.Proxy = http.ProxyURL(proxyURL)
			}
		}
		c.Client = &http.Client{
			Timeout:   time.Duration(c.Timeout) * time.Millisecond, // Timeout is in ms
			Transport: transport,
		}
	}

	headers := make(map[string]string)
	headers["Content-Type"] = "application/json"
	for k, v := range c.Headers {
		headers[k] = v
	}

	body, err := json.Marshal(event)
	if err != nil {
		logger.Errorf("failed to marshal event: %v", err)
		return
	}

	req, err := http.NewRequest("POST", c.URL, strings.NewReader(string(body)))
	if err != nil {
		logger.Errorf("failed to create request: %v event: %v", err, event)
		return
	}

	for k, v := range headers {
		req.Header.Set(k, v)
	}

	if c.AuthUsername != "" && c.AuthPassword != "" {
		req.SetBasicAuth(c.AuthUsername, c.AuthPassword)
	}

	resp, err := c.Client.Do(req)
	if err != nil {
		logger.Errorf("failed to send request: %v event: %v", err, event)
		return
	}
	// Close the response body so the underlying connection can be reused;
	// without this every update request leaks a connection/file descriptor.
	defer resp.Body.Close()

	b, err := io.ReadAll(resp.Body)
	if err != nil {
		logger.Errorf("failed to read response body: %v event: %v", err, event)
		return
	}

	logger.Infof("response body: %s", string(b))

	// A non-JSON or mismatched response was previously swallowed silently;
	// log it so misconfigured endpoints are visible.
	if err := json.Unmarshal(b, &event); err != nil {
		logger.Errorf("failed to unmarshal response into event: %v body: %s", err, string(b))
	}
}

View File

@@ -1,15 +1,15 @@
package relabel
import (
"encoding/json"
"fmt"
"strings"
"github.com/ccfos/nightingale/v6/alert/pipeline"
"github.com/ccfos/nightingale/v6/alert/pipeline/processor/common"
"github.com/ccfos/nightingale/v6/models"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/pushgw/pconf"
"github.com/ccfos/nightingale/v6/pushgw/writer"
"github.com/prometheus/prometheus/prompb"
)
@@ -19,17 +19,12 @@ type RelabelConfig struct {
}
func init() {
pipeline.RegisterProcessor("relabel", &RelabelConfig{})
models.RegisterProcessor("relabel", &RelabelConfig{})
}
func (r *RelabelConfig) Init(settings interface{}) (pipeline.Processor, error) {
b, err := json.Marshal(settings)
if err != nil {
return nil, err
}
err = json.Unmarshal(b, &r.RelabelConfig)
return r, err
func (r *RelabelConfig) Init(settings interface{}) (models.Processor, error) {
result, err := common.InitProcessor[*RelabelConfig](settings)
return result, err
}
func (r *RelabelConfig) Process(ctx *ctx.Context, event *models.AlertCurEvent) {

View File

@@ -415,7 +415,6 @@ func (rt *Router) Config(r *gin.Engine) {
pages.GET("/alert-his-events/list", rt.auth(), rt.user(), rt.alertHisEventsList)
pages.DELETE("/alert-cur-events", rt.auth(), rt.user(), rt.perm("/alert-cur-events/del"), rt.alertCurEventDel)
pages.GET("/alert-cur-events/stats", rt.auth(), rt.alertCurEventsStatistics)
pages.GET("/alert-cur-events-datasources", rt.auth(), rt.user(), rt.alertDataSourcesList)
pages.GET("/alert-aggr-views", rt.auth(), rt.alertAggrViewGets)
pages.DELETE("/alert-aggr-views", rt.auth(), rt.user(), rt.alertAggrViewDel)

View File

@@ -72,6 +72,7 @@ func (rt *Router) alertAggrViewPut(c *gin.Context) {
view.Name = f.Name
view.Rule = f.Rule
view.Cate = f.Cate
view.Format = f.Format
if view.CreateBy == 0 {
view.CreateBy = me.Id
}

View File

@@ -7,6 +7,7 @@ import (
"time"
"github.com/ccfos/nightingale/v6/models"
"github.com/ccfos/nightingale/v6/pkg/strx"
"github.com/gin-gonic/gin"
"github.com/toolkits/pkg/ginx"
@@ -49,7 +50,7 @@ func getUserGroupIds(ctx *gin.Context, rt *Router, myGroups bool) ([]int64, erro
func (rt *Router) alertCurEventsCard(c *gin.Context) {
stime, etime := getTimeRange(c)
severity := ginx.QueryInt(c, "severity", -1)
severity := strx.IdsInt64ForAPI(ginx.QueryStr(c, "severity", ""), ",")
query := ginx.QueryStr(c, "query", "")
myGroups := ginx.QueryBool(c, "my_groups", false) // 是否只看自己组默认false
@@ -96,7 +97,7 @@ func (rt *Router) alertCurEventsCard(c *gin.Context) {
// 最多获取50000个获取太多也没啥意义
list, err := models.AlertCurEventsGet(rt.Ctx, prods, bgids, stime, etime, severity, dsIds,
cates, 0, query, 50000, 0, gids)
cates, 0, query, 50000, 0, gids, []int64{})
ginx.Dangerous(err)
cardmap := make(map[string]*AlertCard)
@@ -173,13 +174,15 @@ func (rt *Router) alertCurEventsGetByRid(c *gin.Context) {
// 列表方式,拉取活跃告警
func (rt *Router) alertCurEventsList(c *gin.Context) {
stime, etime := getTimeRange(c)
severity := ginx.QueryInt(c, "severity", -1)
severity := strx.IdsInt64ForAPI(ginx.QueryStr(c, "severity", ""), ",")
query := ginx.QueryStr(c, "query", "")
limit := ginx.QueryInt(c, "limit", 20)
myGroups := ginx.QueryBool(c, "my_groups", false) // 是否只看自己组默认false
dsIds := queryDatasourceIds(c)
eventIds := strx.IdsInt64ForAPI(ginx.QueryStr(c, "event_ids", ""), ",")
prod := ginx.QueryStr(c, "prods", "")
if prod == "" {
prod = ginx.QueryStr(c, "rule_prods", "")
@@ -212,11 +215,11 @@ func (rt *Router) alertCurEventsList(c *gin.Context) {
ginx.Dangerous(err)
total, err := models.AlertCurEventTotal(rt.Ctx, prods, bgids, stime, etime, severity, dsIds,
cates, ruleId, query, gids)
cates, ruleId, query, gids, eventIds)
ginx.Dangerous(err)
list, err := models.AlertCurEventsGet(rt.Ctx, prods, bgids, stime, etime, severity, dsIds,
cates, ruleId, query, limit, ginx.Offset(c, limit), gids)
cates, ruleId, query, limit, ginx.Offset(c, limit), gids, eventIds)
ginx.Dangerous(err)
cache := make(map[int64]*models.UserGroup)
@@ -260,63 +263,6 @@ func (rt *Router) checkCurEventBusiGroupRWPermission(c *gin.Context, ids []int64
}
}
// 列表方式,拉取活跃告警
func (rt *Router) alertDataSourcesList(c *gin.Context) {
stime, etime := getTimeRange(c)
severity := ginx.QueryInt(c, "severity", -1)
query := ginx.QueryStr(c, "query", "")
myGroups := ginx.QueryBool(c, "my_groups", false) // 是否只看自己组默认false
prod := ginx.QueryStr(c, "prods", "")
if prod == "" {
prod = ginx.QueryStr(c, "rule_prods", "")
}
prods := []string{}
if prod != "" {
prods = strings.Split(prod, ",")
}
cate := ginx.QueryStr(c, "cate", "$all")
cates := []string{}
if cate != "$all" {
cates = strings.Split(cate, ",")
}
ruleId := ginx.QueryInt64(c, "rid", 0)
var gids []int64
var err error
if myGroups {
gids, err = getUserGroupIds(c, rt, myGroups)
ginx.Dangerous(err)
if len(gids) == 0 {
gids = append(gids, -1)
}
}
bgids, err := GetBusinessGroupIds(c, rt.Ctx, rt.Center.EventHistoryGroupView)
ginx.Dangerous(err)
list, err := models.AlertCurEventsGet(rt.Ctx, prods, bgids, stime, etime, severity, []int64{},
cates, ruleId, query, 50000, 0, gids)
ginx.Dangerous(err)
uniqueDsIds := make(map[int64]struct{})
for i := 0; i < len(list); i++ {
uniqueDsIds[list[i].DatasourceId] = struct{}{}
}
dsIds := make([]int64, 0, len(uniqueDsIds))
for id := range uniqueDsIds {
dsIds = append(dsIds, id)
}
dsList, err := models.GetDatasourceInfosByIds(rt.Ctx, dsIds)
ginx.Dangerous(err)
ginx.NewRender(c).Data(dsList, nil)
}
func (rt *Router) alertCurEventGet(c *gin.Context) {
eid := ginx.UrlParamInt64(c, "eid")
event, err := models.AlertCurEventGetById(rt.Ctx, eid)

View File

@@ -4,8 +4,8 @@ import (
"net/http"
"time"
"github.com/ccfos/nightingale/v6/alert/pipeline"
"github.com/ccfos/nightingale/v6/models"
"github.com/gin-gonic/gin"
"github.com/toolkits/pkg/ginx"
)
@@ -139,8 +139,8 @@ func (rt *Router) tryRunEventPipeline(c *gin.Context) {
}
event := hisEvent.ToCur()
for _, p := range f.PipelineConfig.Processors {
processor, err := pipeline.GetProcessorByType(p.Typ, p.Config)
for _, p := range f.PipelineConfig.ProcessorConfigs {
processor, err := models.GetProcessorByType(p.Typ, p.Config)
if err != nil {
ginx.Bomb(http.StatusBadRequest, "processor %+v type not found", p)
}
@@ -153,8 +153,8 @@ func (rt *Router) tryRunEventPipeline(c *gin.Context) {
// 测试事件处理器
func (rt *Router) tryRunEventProcessor(c *gin.Context) {
var f struct {
EventId int64 `json:"event_id"`
ProcessorConfig models.Processor `json:"processor_config"`
EventId int64 `json:"event_id"`
ProcessorConfig models.ProcessorConfig `json:"processor_config"`
}
ginx.BindJSON(c, &f)
@@ -164,7 +164,7 @@ func (rt *Router) tryRunEventProcessor(c *gin.Context) {
}
event := hisEvent.ToCur()
processor, err := pipeline.GetProcessorByType(f.ProcessorConfig.Typ, f.ProcessorConfig.Config)
processor, err := models.GetProcessorByType(f.ProcessorConfig.Typ, f.ProcessorConfig.Config)
if err != nil {
ginx.Bomb(http.StatusBadRequest, "processor type not found")
}
@@ -199,8 +199,8 @@ func (rt *Router) tryRunEventProcessorByNotifyRule(c *gin.Context) {
}
for _, pl := range pipelines {
for _, p := range pl.Processors {
processor, err := pipeline.GetProcessorByType(p.Typ, p.Config)
for _, p := range pl.ProcessorConfigs {
processor, err := models.GetProcessorByType(p.Typ, p.Config)
if err != nil {
ginx.Bomb(http.StatusBadRequest, "processor %+v type not found", p)
}

View File

@@ -20,7 +20,7 @@ type EventProcessorCacheType struct {
stats *Stats
sync.RWMutex
processors map[int64]*models.EventPipeline // key: pipeline id
eventPipelines map[int64]*models.EventPipeline // key: pipeline id
}
func NewEventProcessorCache(ctx *ctx.Context, stats *Stats) *EventProcessorCacheType {
@@ -29,7 +29,7 @@ func NewEventProcessorCache(ctx *ctx.Context, stats *Stats) *EventProcessorCache
statLastUpdated: -1,
ctx: ctx,
stats: stats,
processors: make(map[int64]*models.EventPipeline),
eventPipelines: make(map[int64]*models.EventPipeline),
}
epc.SyncEventProcessors()
return epc
@@ -41,7 +41,7 @@ func (epc *EventProcessorCacheType) Reset() {
epc.statTotal = -1
epc.statLastUpdated = -1
epc.processors = make(map[int64]*models.EventPipeline)
epc.eventPipelines = make(map[int64]*models.EventPipeline)
}
func (epc *EventProcessorCacheType) StatChanged(total, lastUpdated int64) bool {
@@ -54,7 +54,7 @@ func (epc *EventProcessorCacheType) StatChanged(total, lastUpdated int64) bool {
func (epc *EventProcessorCacheType) Set(m map[int64]*models.EventPipeline, total, lastUpdated int64) {
epc.Lock()
epc.processors = m
epc.eventPipelines = m
epc.Unlock()
// only one goroutine used, so no need lock
@@ -65,17 +65,29 @@ func (epc *EventProcessorCacheType) Set(m map[int64]*models.EventPipeline, total
func (epc *EventProcessorCacheType) Get(processorId int64) *models.EventPipeline {
epc.RLock()
defer epc.RUnlock()
return epc.processors[processorId]
return epc.eventPipelines[processorId]
}
// GetProcessorsById returns the instantiated processors of the cached
// pipeline with the given id, or an empty slice when no such pipeline is
// cached. Safe for concurrent use (read lock).
func (epc *EventProcessorCacheType) GetProcessorsById(processorId int64) []models.Processor {
epc.RLock()
defer epc.RUnlock()
eventPipeline, ok := epc.eventPipelines[processorId]
if !ok {
return []models.Processor{}
}
return eventPipeline.Processors
}
func (epc *EventProcessorCacheType) GetProcessorIds() []int64 {
epc.RLock()
defer epc.RUnlock()
count := len(epc.processors)
count := len(epc.eventPipelines)
list := make([]int64, 0, count)
for processorId := range epc.processors {
list = append(list, processorId)
for eid := range epc.eventPipelines {
list = append(list, eid)
}
return list
@@ -125,7 +137,18 @@ func (epc *EventProcessorCacheType) syncEventProcessors() error {
m := make(map[int64]*models.EventPipeline)
for i := 0; i < len(lst); i++ {
m[lst[i].ID] = lst[i]
eventPipeline := lst[i]
for _, p := range eventPipeline.ProcessorConfigs {
processor, err := models.GetProcessorByType(p.Typ, p.Config)
if err != nil {
logger.Warningf("event_pipeline_id: %d, event:%+v, processor:%+v type not found", eventPipeline.ID, eventPipeline, p)
continue
}
eventPipeline.Processors = append(eventPipeline.Processors, processor)
}
m[lst[i].ID] = eventPipeline
}
epc.Set(m, stat.Total, stat.LastUpdated)

View File

@@ -38,33 +38,35 @@ func (v *AlertAggrView) Verify() error {
return errors.New("rule is blank")
}
var validFields = []string{
"cluster",
"group_id",
"group_name",
"rule_id",
"rule_name",
"severity",
"runbook_url",
"target_ident",
"target_note",
}
arr := strings.Split(v.Rule, "::")
for i := 0; i < len(arr); i++ {
pair := strings.Split(arr[i], ":")
if len(pair) != 2 {
return errors.New("rule invalid")
if !strings.Contains(v.Rule, "{{") {
var validFields = []string{
"cluster",
"group_id",
"group_name",
"rule_id",
"rule_name",
"severity",
"runbook_url",
"target_ident",
"target_note",
}
if !(pair[0] == "field" || pair[0] == "tagkey") {
return errors.New("rule invalid")
}
arr := strings.Split(v.Rule, "::")
for i := 0; i < len(arr); i++ {
pair := strings.Split(arr[i], ":")
if len(pair) != 2 {
return errors.New("rule invalid")
}
if pair[0] == "field" {
// 只支持有限的field
if !slice.ContainsString(validFields, pair[1]) {
return fmt.Errorf("unsupported field: %s", pair[1])
if !(pair[0] == "field" || pair[0] == "tagkey") {
return errors.New("rule invalid")
}
if pair[0] == "field" {
// 只支持有限的field
if !slice.ContainsString(validFields, pair[1]) {
return fmt.Errorf("unsupported field: %s", pair[1])
}
}
}
}
@@ -90,7 +92,7 @@ func (v *AlertAggrView) Update(ctx *ctx.Context) error {
}
v.UpdateAt = time.Now().Unix()
return DB(ctx).Model(v).Select("name", "rule", "cate", "update_at", "create_by").Updates(v).Error
return DB(ctx).Model(v).Select("name", "rule", "cate", "format", "update_at", "create_by").Updates(v).Error
}
// AlertAggrViewDel: userid for safe delete

View File

@@ -537,7 +537,7 @@ func (e *AlertCurEvent) FillNotifyGroups(ctx *ctx.Context, cache map[int64]*User
}
func AlertCurEventTotal(ctx *ctx.Context, prods []string, bgids []int64, stime, etime int64,
severity int, dsIds []int64, cates []string, ruleId int64, query string, myGroups []int64) (int64, error) {
severity []int64, dsIds []int64, cates []string, ruleId int64, query string, myGroups []int64, eventIds []int64) (int64, error) {
session := DB(ctx).Model(&AlertCurEvent{})
if stime != 0 && etime != 0 {
session = session.Where("trigger_time between ? and ?", stime, etime)
@@ -550,8 +550,8 @@ func AlertCurEventTotal(ctx *ctx.Context, prods []string, bgids []int64, stime,
session = session.Where("group_id in ?", bgids)
}
if severity >= 0 {
session = session.Where("severity = ?", severity)
if len(severity) > 0 {
session = session.Where("severity in ?", severity)
}
if len(dsIds) > 0 {
@@ -570,6 +570,9 @@ func AlertCurEventTotal(ctx *ctx.Context, prods []string, bgids []int64, stime,
session = session.Where("group_id in ?", myGroups)
}
if len(eventIds) > 0 {
session = session.Where("id in ?", eventIds)
}
if query != "" {
arr := strings.Fields(query)
for i := 0; i < len(arr); i++ {
@@ -582,7 +585,7 @@ func AlertCurEventTotal(ctx *ctx.Context, prods []string, bgids []int64, stime,
}
func AlertCurEventsGet(ctx *ctx.Context, prods []string, bgids []int64, stime, etime int64,
severity int, dsIds []int64, cates []string, ruleId int64, query string, limit, offset int, myGroups []int64) (
severity []int64, dsIds []int64, cates []string, ruleId int64, query string, limit, offset int, myGroups []int64, eventIds []int64) (
[]AlertCurEvent, error) {
session := DB(ctx).Model(&AlertCurEvent{})
@@ -597,8 +600,8 @@ func AlertCurEventsGet(ctx *ctx.Context, prods []string, bgids []int64, stime, e
session = session.Where("group_id in ?", bgids)
}
if severity >= 0 {
session = session.Where("severity = ?", severity)
if len(severity) > 0 {
session = session.Where("severity in ?", severity)
}
if len(dsIds) > 0 {
@@ -615,6 +618,9 @@ func AlertCurEventsGet(ctx *ctx.Context, prods []string, bgids []int64, stime, e
if len(myGroups) > 0 {
session = session.Where("group_id in ?", myGroups)
}
if len(eventIds) > 0 {
session = session.Where("id in ?", eventIds)
}
if query != "" {
arr := strings.Fields(query)
for i := 0; i < len(arr); i++ {

View File

@@ -11,22 +11,24 @@ import (
// EventPipeline 事件Pipeline模型
type EventPipeline struct {
ID int64 `json:"id" gorm:"primaryKey"`
Name string `json:"name" gorm:"type:varchar(128)"`
TeamIds []int64 `json:"team_ids" gorm:"type:text;serializer:json"`
TeamNames []string `json:"team_names" gorm:"-"`
Description string `json:"description" gorm:"type:varchar(255)"`
FilterEnable bool `json:"filter_enable" gorm:"type:tinyint(1)"`
LabelFilters []TagFilter `json:"label_filters" gorm:"type:text;serializer:json"`
AttrFilters []TagFilter `json:"attribute_filters" gorm:"type:text;serializer:json"`
Processors []Processor `json:"processors" gorm:"type:text;serializer:json"`
CreateAt int64 `json:"create_at" gorm:"type:bigint"`
CreateBy string `json:"create_by" gorm:"type:varchar(64)"`
UpdateAt int64 `json:"update_at" gorm:"type:bigint"`
UpdateBy string `json:"update_by" gorm:"type:varchar(64)"`
ID int64 `json:"id" gorm:"primaryKey"`
Name string `json:"name" gorm:"type:varchar(128)"`
TeamIds []int64 `json:"team_ids" gorm:"type:text;serializer:json"`
TeamNames []string `json:"team_names" gorm:"-"`
Description string `json:"description" gorm:"type:varchar(255)"`
FilterEnable bool `json:"filter_enable" gorm:"type:tinyint(1)"`
LabelFilters []TagFilter `json:"label_filters" gorm:"type:text;serializer:json"`
AttrFilters []TagFilter `json:"attribute_filters" gorm:"type:text;serializer:json"`
ProcessorConfigs []ProcessorConfig `json:"processors" gorm:"type:text;serializer:json"`
CreateAt int64 `json:"create_at" gorm:"type:bigint"`
CreateBy string `json:"create_by" gorm:"type:varchar(64)"`
UpdateAt int64 `json:"update_at" gorm:"type:bigint"`
UpdateBy string `json:"update_by" gorm:"type:varchar(64)"`
Processors []Processor `json:"-" gorm:"-"`
}
type Processor struct {
type ProcessorConfig struct {
Typ string `json:"typ"`
Config interface{} `json:"config"`
}
@@ -44,22 +46,20 @@ func (e *EventPipeline) Verify() error {
return errors.New("team_ids cannot be empty")
}
return nil
}
func (e *EventPipeline) DB2FE() {
if e.TeamIds == nil {
if len(e.TeamIds) == 0 {
e.TeamIds = make([]int64, 0)
}
if e.LabelFilters == nil {
if len(e.LabelFilters) == 0 {
e.LabelFilters = make([]TagFilter, 0)
}
if e.AttrFilters == nil {
if len(e.AttrFilters) == 0 {
e.AttrFilters = make([]TagFilter, 0)
}
if e.Processors == nil {
e.Processors = make([]Processor, 0)
if len(e.ProcessorConfigs) == 0 {
e.ProcessorConfigs = make([]ProcessorConfig, 0)
}
return nil
}
// CreateEventPipeline 创建事件Pipeline
@@ -74,7 +74,7 @@ func GetEventPipeline(ctx *ctx.Context, id int64) (*EventPipeline, error) {
if err != nil {
return nil, err
}
pipeline.DB2FE()
pipeline.Verify()
return &pipeline, nil
}
@@ -108,7 +108,7 @@ func ListEventPipelines(ctx *ctx.Context) ([]*EventPipeline, error) {
}
for _, p := range pipelines {
p.DB2FE()
p.Verify()
}
return pipelines, nil

View File

@@ -1,33 +1,21 @@
package pipeline
package models
import (
"fmt"
"strings"
"github.com/ccfos/nightingale/v6/models"
"github.com/ccfos/nightingale/v6/pkg/ctx"
)
// Processor 是处理器接口,所有处理器类型都需要实现此接口
type Processor interface {
Init(settings interface{}) (Processor, error) // 初始化配置
Process(ctx *ctx.Context, event *models.AlertCurEvent) // 处理告警事件
Init(settings interface{}) (Processor, error) // 初始化配置
Process(ctx *ctx.Context, event *AlertCurEvent) // 处理告警事件
}
// NewProcessorFn 创建处理器的函数类型
type NewProcessorFn func(settings interface{}) (Processor, error)
// 处理器注册表,存储各种类型处理器的构造函数
var processorRegister = map[string]NewProcessorFn{}
// // ProcessorTypes 存储所有支持的处理器类型
// var Processors map[int64]models.Processor
// func init() {
// Processors = make(map[int64]models.Processor)
// }
// RegisterProcessor 注册处理器类型
func RegisterProcessor(typ string, p Processor) {
if _, found := processorRegister[typ]; found {
return
@@ -35,7 +23,6 @@ func RegisterProcessor(typ string, p Processor) {
processorRegister[typ] = p.Init
}
// GetProcessorByType 根据类型获取处理器实例
func GetProcessorByType(typ string, settings interface{}) (Processor, error) {
typ = strings.TrimSpace(typ)
fn, found := processorRegister[typ]