Compare commits

...

1 Commits

Author SHA1 Message Date
ning
f6a857f030 refactor: optimize event processor 2025-06-12 16:42:46 +08:00
9 changed files with 71 additions and 62 deletions

View File

@@ -185,8 +185,8 @@ func (e *Dispatch) HandleEventWithNotifyRule(eventOrigin *models.AlertCurEvent)
for _, processor := range processors { for _, processor := range processors {
logger.Infof("before processor notify_id: %d, event:%+v, processor:%+v", notifyRuleId, eventCopy, processor) logger.Infof("before processor notify_id: %d, event:%+v, processor:%+v", notifyRuleId, eventCopy, processor)
eventCopy = processor.Process(e.ctx, eventCopy) eventCopy, res, err := processor.Process(e.ctx, eventCopy)
logger.Infof("after processor notify_id: %d, event:%+v, processor:%+v", notifyRuleId, eventCopy, processor) logger.Infof("after processor notify_id: %d, event:%+v, processor:%+v, res:%v, err:%v", notifyRuleId, eventCopy, processor, res, err)
if eventCopy == nil { if eventCopy == nil {
logger.Warningf("notify_id: %d, event:%+v, processor:%+v, event is nil", notifyRuleId, eventCopy, processor) logger.Warningf("notify_id: %d, event:%+v, processor:%+v, event is nil", notifyRuleId, eventCopy, processor)
break break

View File

@@ -17,7 +17,6 @@ import (
"github.com/ccfos/nightingale/v6/models" "github.com/ccfos/nightingale/v6/models"
"github.com/ccfos/nightingale/v6/pkg/ctx" "github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/pkg/tplx" "github.com/ccfos/nightingale/v6/pkg/tplx"
"github.com/toolkits/pkg/logger"
) )
const ( const (
@@ -55,26 +54,23 @@ func (c *AISummaryConfig) Init(settings interface{}) (models.Processor, error) {
return result, err return result, err
} }
func (c *AISummaryConfig) Process(ctx *ctx.Context, event *models.AlertCurEvent) *models.AlertCurEvent { func (c *AISummaryConfig) Process(ctx *ctx.Context, event *models.AlertCurEvent) (*models.AlertCurEvent, string, error) {
if c.Client == nil { if c.Client == nil {
if err := c.initHTTPClient(); err != nil { if err := c.initHTTPClient(); err != nil {
logger.Errorf("failed to initialize HTTP client: %v", err) return event, "", fmt.Errorf("failed to initialize HTTP client: %v processor: %v", err, c)
return event
} }
} }
// 准备告警事件信息 // 准备告警事件信息
eventInfo, err := c.prepareEventInfo(event) eventInfo, err := c.prepareEventInfo(event)
if err != nil { if err != nil {
logger.Errorf("failed to prepare event info: %v", err) return event, "", fmt.Errorf("failed to prepare event info: %v processor: %v", err, c)
return event
} }
// 调用AI模型生成总结 // 调用AI模型生成总结
summary, err := c.generateAISummary(eventInfo) summary, err := c.generateAISummary(eventInfo)
if err != nil { if err != nil {
logger.Errorf("failed to generate AI summary: %v", err) return event, "", fmt.Errorf("failed to generate AI summary: %v processor: %v", err, c)
return event
} }
// 将总结添加到annotations字段 // 将总结添加到annotations字段
@@ -86,12 +82,11 @@ func (c *AISummaryConfig) Process(ctx *ctx.Context, event *models.AlertCurEvent)
// 更新Annotations字段 // 更新Annotations字段
b, err := json.Marshal(event.AnnotationsJSON) b, err := json.Marshal(event.AnnotationsJSON)
if err != nil { if err != nil {
logger.Errorf("failed to marshal annotations: %v", err) return event, "", fmt.Errorf("failed to marshal annotations: %v processor: %v", err, c)
return event
} }
event.Annotations = string(b) event.Annotations = string(b)
return event return event, "", nil
} }
func (c *AISummaryConfig) initHTTPClient() error { func (c *AISummaryConfig) initHTTPClient() error {
@@ -137,7 +132,7 @@ func (c *AISummaryConfig) prepareEventInfo(event *models.AlertCurEvent) (string,
func (c *AISummaryConfig) generateAISummary(eventInfo string) (string, error) { func (c *AISummaryConfig) generateAISummary(eventInfo string) (string, error) {
// 构建基础请求参数 // 构建基础请求参数
reqParams := map[string]interface{}{ reqParams := map[string]interface{}{
"model": c.ModelName, "model": c.ModelName,
"messages": []Message{ "messages": []Message{
{ {
Role: "user", Role: "user",

View File

@@ -54,7 +54,8 @@ func TestAISummaryConfig_Process(t *testing.T) {
assert.NotNil(t, processor) assert.NotNil(t, processor)
// 测试处理函数 // 测试处理函数
result := processor.Process(&ctx.Context{}, event) result, _, err := processor.Process(&ctx.Context{}, event)
assert.NoError(t, err)
assert.NotNil(t, result) assert.NotNil(t, result)
assert.NotEmpty(t, result.AnnotationsJSON["ai_summary"]) assert.NotEmpty(t, result.AnnotationsJSON["ai_summary"])

View File

@@ -3,6 +3,7 @@ package callback
import ( import (
"crypto/tls" "crypto/tls"
"encoding/json" "encoding/json"
"fmt"
"io" "io"
"net/http" "net/http"
"net/url" "net/url"
@@ -42,7 +43,7 @@ func (c *CallbackConfig) Init(settings interface{}) (models.Processor, error) {
return result, err return result, err
} }
func (c *CallbackConfig) Process(ctx *ctx.Context, event *models.AlertCurEvent) *models.AlertCurEvent { func (c *CallbackConfig) Process(ctx *ctx.Context, event *models.AlertCurEvent) (*models.AlertCurEvent, string, error) {
if c.Client == nil { if c.Client == nil {
transport := &http.Transport{ transport := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: c.SkipSSLVerify}, TLSClientConfig: &tls.Config{InsecureSkipVerify: c.SkipSSLVerify},
@@ -51,7 +52,7 @@ func (c *CallbackConfig) Process(ctx *ctx.Context, event *models.AlertCurEvent)
if c.Proxy != "" { if c.Proxy != "" {
proxyURL, err := url.Parse(c.Proxy) proxyURL, err := url.Parse(c.Proxy)
if err != nil { if err != nil {
logger.Errorf("failed to parse proxy url: %v", err) return event, "", fmt.Errorf("failed to parse proxy url: %v processor: %v", err, c)
} else { } else {
transport.Proxy = http.ProxyURL(proxyURL) transport.Proxy = http.ProxyURL(proxyURL)
} }
@@ -71,14 +72,12 @@ func (c *CallbackConfig) Process(ctx *ctx.Context, event *models.AlertCurEvent)
body, err := json.Marshal(event) body, err := json.Marshal(event)
if err != nil { if err != nil {
logger.Errorf("failed to marshal event: %v", err) return event, "", fmt.Errorf("failed to marshal event: %v processor: %v", err, c)
return event
} }
req, err := http.NewRequest("POST", c.URL, strings.NewReader(string(body))) req, err := http.NewRequest("POST", c.URL, strings.NewReader(string(body)))
if err != nil { if err != nil {
logger.Errorf("failed to create request: %v event: %v", err, event) return event, "", fmt.Errorf("failed to create request: %v processor: %v", err, c)
return event
} }
for k, v := range headers { for k, v := range headers {
@@ -91,16 +90,14 @@ func (c *CallbackConfig) Process(ctx *ctx.Context, event *models.AlertCurEvent)
resp, err := c.Client.Do(req) resp, err := c.Client.Do(req)
if err != nil { if err != nil {
logger.Errorf("failed to send request: %v event: %v", err, event) return event, "", fmt.Errorf("failed to send request: %v processor: %v", err, c)
return event
} }
b, err := io.ReadAll(resp.Body) b, err := io.ReadAll(resp.Body)
if err != nil { if err != nil {
logger.Errorf("failed to read response body: %v event: %v", err, event) return event, "", fmt.Errorf("failed to read response body: %v processor: %v", err, c)
return event
} }
logger.Infof("response body: %s", string(b)) logger.Debugf("callback processor response body: %s", string(b))
return event return event, "callback success", nil
} }

View File

@@ -2,6 +2,7 @@ package eventdrop
import ( import (
"bytes" "bytes"
"fmt"
"strings" "strings"
texttemplate "text/template" texttemplate "text/template"
@@ -25,7 +26,7 @@ func (c *EventDropConfig) Init(settings interface{}) (models.Processor, error) {
return result, err return result, err
} }
func (c *EventDropConfig) Process(ctx *ctx.Context, event *models.AlertCurEvent) *models.AlertCurEvent { func (c *EventDropConfig) Process(ctx *ctx.Context, event *models.AlertCurEvent) (*models.AlertCurEvent, string, error) {
// 使用背景是可以根据此处理器,实现对事件进行更加灵活的过滤的逻辑 // 使用背景是可以根据此处理器,实现对事件进行更加灵活的过滤的逻辑
// 在标签过滤和属性过滤都不满足需求时可以使用 // 在标签过滤和属性过滤都不满足需求时可以使用
// 如果模板执行结果为 true则删除该事件 // 如果模板执行结果为 true则删除该事件
@@ -40,22 +41,20 @@ func (c *EventDropConfig) Process(ctx *ctx.Context, event *models.AlertCurEvent)
tpl, err := texttemplate.New("eventdrop").Funcs(tplx.TemplateFuncMap).Parse(text) tpl, err := texttemplate.New("eventdrop").Funcs(tplx.TemplateFuncMap).Parse(text)
if err != nil { if err != nil {
logger.Errorf("processor failed to parse template: %v event: %v", err, event) return event, "", fmt.Errorf("processor failed to parse template: %v processor: %v", err, c)
return event
} }
var body bytes.Buffer var body bytes.Buffer
if err = tpl.Execute(&body, event); err != nil { if err = tpl.Execute(&body, event); err != nil {
logger.Errorf("processor failed to execute template: %v event: %v", err, event) return event, "", fmt.Errorf("processor failed to execute template: %v processor: %v", err, c)
return event
} }
result := strings.TrimSpace(body.String()) result := strings.TrimSpace(body.String())
logger.Infof("processor eventdrop result: %v", result) logger.Infof("processor eventdrop result: %v", result)
if result == "true" { if result == "true" {
logger.Infof("processor eventdrop drop event: %v", event) logger.Infof("processor eventdrop drop event: %v", event)
return nil return event, "drop event success", nil
} }
return event return event, "drop event failed", nil
} }

View File

@@ -3,6 +3,7 @@ package eventupdate
import ( import (
"crypto/tls" "crypto/tls"
"encoding/json" "encoding/json"
"fmt"
"io" "io"
"net/http" "net/http"
"net/url" "net/url"
@@ -30,7 +31,7 @@ func (c *EventUpdateConfig) Init(settings interface{}) (models.Processor, error)
return result, err return result, err
} }
func (c *EventUpdateConfig) Process(ctx *ctx.Context, event *models.AlertCurEvent) *models.AlertCurEvent { func (c *EventUpdateConfig) Process(ctx *ctx.Context, event *models.AlertCurEvent) (*models.AlertCurEvent, string, error) {
if c.Client == nil { if c.Client == nil {
transport := &http.Transport{ transport := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: c.SkipSSLVerify}, TLSClientConfig: &tls.Config{InsecureSkipVerify: c.SkipSSLVerify},
@@ -39,7 +40,7 @@ func (c *EventUpdateConfig) Process(ctx *ctx.Context, event *models.AlertCurEven
if c.Proxy != "" { if c.Proxy != "" {
proxyURL, err := url.Parse(c.Proxy) proxyURL, err := url.Parse(c.Proxy)
if err != nil { if err != nil {
logger.Errorf("failed to parse proxy url: %v", err) return event, "", fmt.Errorf("failed to parse proxy url: %v processor: %v", err, c)
} else { } else {
transport.Proxy = http.ProxyURL(proxyURL) transport.Proxy = http.ProxyURL(proxyURL)
} }
@@ -59,14 +60,12 @@ func (c *EventUpdateConfig) Process(ctx *ctx.Context, event *models.AlertCurEven
body, err := json.Marshal(event) body, err := json.Marshal(event)
if err != nil { if err != nil {
logger.Errorf("failed to marshal event: %v", err) return event, "", fmt.Errorf("failed to marshal event: %v processor: %v", err, c)
return event
} }
req, err := http.NewRequest("POST", c.URL, strings.NewReader(string(body))) req, err := http.NewRequest("POST", c.URL, strings.NewReader(string(body)))
if err != nil { if err != nil {
logger.Errorf("failed to create request: %v event: %v", err, event) return event, "", fmt.Errorf("failed to create request: %v processor: %v", err, c)
return event
} }
for k, v := range headers { for k, v := range headers {
@@ -79,22 +78,19 @@ func (c *EventUpdateConfig) Process(ctx *ctx.Context, event *models.AlertCurEven
resp, err := c.Client.Do(req) resp, err := c.Client.Do(req)
if err != nil { if err != nil {
logger.Errorf("failed to send request: %v event: %v", err, event) return event, "", fmt.Errorf("failed to send request: %v processor: %v", err, c)
return event
} }
b, err := io.ReadAll(resp.Body) b, err := io.ReadAll(resp.Body)
if err != nil { if err != nil {
logger.Errorf("failed to read response body: %v event: %v", err, event) return nil, "", fmt.Errorf("failed to read response body: %v processor: %v", err, c)
return event
} }
logger.Infof("response body: %s", string(b)) logger.Debugf("event update processor response body: %s", string(b))
err = json.Unmarshal(b, &event) err = json.Unmarshal(b, &event)
if err != nil { if err != nil {
logger.Errorf("failed to unmarshal response body: %v event: %v", err, event) return event, "", fmt.Errorf("failed to unmarshal response body: %v processor: %v", err, c)
return event
} }
return event return event, "", nil
} }

View File

@@ -42,7 +42,7 @@ func (r *RelabelConfig) Init(settings interface{}) (models.Processor, error) {
return result, err return result, err
} }
func (r *RelabelConfig) Process(ctx *ctx.Context, event *models.AlertCurEvent) *models.AlertCurEvent { func (r *RelabelConfig) Process(ctx *ctx.Context, event *models.AlertCurEvent) (*models.AlertCurEvent, string, error) {
sourceLabels := make([]model.LabelName, len(r.SourceLabels)) sourceLabels := make([]model.LabelName, len(r.SourceLabels))
for i := range r.SourceLabels { for i := range r.SourceLabels {
sourceLabels[i] = model.LabelName(strings.ReplaceAll(r.SourceLabels[i], ".", REPLACE_DOT)) sourceLabels[i] = model.LabelName(strings.ReplaceAll(r.SourceLabels[i], ".", REPLACE_DOT))
@@ -64,7 +64,7 @@ func (r *RelabelConfig) Process(ctx *ctx.Context, event *models.AlertCurEvent) *
} }
EventRelabel(event, relabelConfigs) EventRelabel(event, relabelConfigs)
return event return event, "", nil
} }
func EventRelabel(event *models.AlertCurEvent, relabelConfigs []*pconf.RelabelConfig) { func EventRelabel(event *models.AlertCurEvent, relabelConfigs []*pconf.RelabelConfig) {

View File

@@ -8,7 +8,6 @@ import (
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/toolkits/pkg/ginx" "github.com/toolkits/pkg/ginx"
"github.com/toolkits/pkg/logger"
) )
// 获取事件Pipeline列表 // 获取事件Pipeline列表
@@ -145,9 +144,17 @@ func (rt *Router) tryRunEventPipeline(c *gin.Context) {
if err != nil { if err != nil {
ginx.Bomb(http.StatusBadRequest, "get processor: %+v err: %+v", p, err) ginx.Bomb(http.StatusBadRequest, "get processor: %+v err: %+v", p, err)
} }
event = processor.Process(rt.Ctx, event) event, _, err = processor.Process(rt.Ctx, event)
if err != nil {
ginx.Bomb(http.StatusBadRequest, "processor: %+v err: %+v", p, err)
}
if event == nil { if event == nil {
ginx.Bomb(http.StatusBadRequest, "event is dropped") ginx.NewRender(c).Data(map[string]interface{}{
"event": event,
"result": "event is dropped",
}, nil)
return
} }
} }
@@ -172,13 +179,15 @@ func (rt *Router) tryRunEventProcessor(c *gin.Context) {
if err != nil { if err != nil {
ginx.Bomb(http.StatusBadRequest, "get processor err: %+v", err) ginx.Bomb(http.StatusBadRequest, "get processor err: %+v", err)
} }
event = processor.Process(rt.Ctx, event) event, res, err := processor.Process(rt.Ctx, event)
logger.Infof("processor %+v result: %+v", f.ProcessorConfig, event) if err != nil {
if event == nil { ginx.Bomb(http.StatusBadRequest, "processor err: %+v", err)
ginx.Bomb(http.StatusBadRequest, "event is dropped")
} }
ginx.NewRender(c).Data(event, nil) ginx.NewRender(c).Data(map[string]interface{}{
"event": event,
"result": res,
}, nil)
} }
func (rt *Router) tryRunEventProcessorByNotifyRule(c *gin.Context) { func (rt *Router) tryRunEventProcessorByNotifyRule(c *gin.Context) {
@@ -212,9 +221,17 @@ func (rt *Router) tryRunEventProcessorByNotifyRule(c *gin.Context) {
if err != nil { if err != nil {
ginx.Bomb(http.StatusBadRequest, "get processor: %+v err: %+v", p, err) ginx.Bomb(http.StatusBadRequest, "get processor: %+v err: %+v", p, err)
} }
event = processor.Process(rt.Ctx, event)
event, _, err := processor.Process(rt.Ctx, event)
if err != nil {
ginx.Bomb(http.StatusBadRequest, "processor: %+v err: %+v", p, err)
}
if event == nil { if event == nil {
ginx.Bomb(http.StatusBadRequest, "event is dropped") ginx.NewRender(c).Data(map[string]interface{}{
"event": event,
"result": "event is dropped",
}, nil)
return
} }
} }
} }

View File

@@ -8,8 +8,12 @@ import (
) )
type Processor interface { type Processor interface {
Init(settings interface{}) (Processor, error) // 初始化配置 Init(settings interface{}) (Processor, error) // 初始化配置
Process(ctx *ctx.Context, event *AlertCurEvent) *AlertCurEvent // 处理告警事件 Process(ctx *ctx.Context, event *AlertCurEvent) (*AlertCurEvent, string, error)
// 处理器有三种情况:
// 1. 处理成功,返回处理后的事件
// 2. 处理成功,不需要返回处理后端事件,只返回处理结果,将处理结果放到 string 中,比如 eventdrop callback 处理器
// 3. 处理失败,返回错误,将错误放到 error 中
} }
type NewProcessorFn func(settings interface{}) (Processor, error) type NewProcessorFn func(settings interface{}) (Processor, error)