mirror of
https://github.com/ccfos/nightingale.git
synced 2026-03-03 14:38:55 +00:00
Compare commits
7 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5fc4d07943 | ||
|
|
9ef1d64821 | ||
|
|
2d73bdac33 | ||
|
|
278f23ac7d | ||
|
|
7573da9026 | ||
|
|
5a0f99da1e | ||
|
|
bffdecc13c |
@@ -16,6 +16,7 @@ import (
|
||||
"github.com/ccfos/nightingale/v6/alert/astats"
|
||||
"github.com/ccfos/nightingale/v6/alert/common"
|
||||
"github.com/ccfos/nightingale/v6/alert/pipeline"
|
||||
"github.com/ccfos/nightingale/v6/alert/pipeline/engine"
|
||||
"github.com/ccfos/nightingale/v6/alert/sender"
|
||||
"github.com/ccfos/nightingale/v6/memsto"
|
||||
"github.com/ccfos/nightingale/v6/models"
|
||||
@@ -231,6 +232,8 @@ func shouldSkipNotify(ctx *ctx.Context, event *models.AlertCurEvent, notifyRuleI
|
||||
}
|
||||
|
||||
func HandleEventPipeline(pipelineConfigs []models.PipelineConfig, eventOrigin, event *models.AlertCurEvent, eventProcessorCache *memsto.EventProcessorCacheType, ctx *ctx.Context, id int64, from string) *models.AlertCurEvent {
|
||||
workflowEngine := engine.NewWorkflowEngine(ctx)
|
||||
|
||||
for _, pipelineConfig := range pipelineConfigs {
|
||||
if !pipelineConfig.Enable {
|
||||
continue
|
||||
@@ -247,23 +250,28 @@ func HandleEventPipeline(pipelineConfigs []models.PipelineConfig, eventOrigin, e
|
||||
continue
|
||||
}
|
||||
|
||||
processors := eventProcessorCache.GetProcessorsById(pipelineConfig.PipelineId)
|
||||
for _, processor := range processors {
|
||||
var res string
|
||||
var err error
|
||||
logger.Infof("processor_by_%s_id:%d pipeline_id:%d, before processor:%+v, event: %+v", from, id, pipelineConfig.PipelineId, processor, event)
|
||||
event, res, err = processor.Process(ctx, event)
|
||||
if event == nil {
|
||||
logger.Infof("processor_by_%s_id:%d pipeline_id:%d, event dropped, after processor:%+v, event: %+v", from, id, pipelineConfig.PipelineId, processor, eventOrigin)
|
||||
|
||||
if from == "notify_rule" {
|
||||
// alert_rule 获取不到 eventId 记录没有意义
|
||||
sender.NotifyRecord(ctx, []*models.AlertCurEvent{eventOrigin}, id, "", "", res, fmt.Errorf("processor_by_%s_id:%d pipeline_id:%d, drop by processor", from, id, pipelineConfig.PipelineId))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
logger.Infof("processor_by_%s_id:%d pipeline_id:%d, after processor:%+v, event: %+v, res:%v, err:%v", from, id, pipelineConfig.PipelineId, processor, event, res, err)
|
||||
// 统一使用工作流引擎执行(兼容线性模式和工作流模式)
|
||||
triggerCtx := &models.WorkflowTriggerContext{
|
||||
Mode: models.TriggerModeEvent,
|
||||
TriggerBy: from,
|
||||
}
|
||||
|
||||
resultEvent, result, err := workflowEngine.Execute(eventPipeline, event, triggerCtx)
|
||||
if err != nil {
|
||||
logger.Errorf("processor_by_%s_id:%d pipeline_id:%d, pipeline execute error: %v", from, id, pipelineConfig.PipelineId, err)
|
||||
continue
|
||||
}
|
||||
|
||||
if resultEvent == nil {
|
||||
logger.Infof("processor_by_%s_id:%d pipeline_id:%d, event dropped, event: %+v", from, id, pipelineConfig.PipelineId, eventOrigin)
|
||||
if from == "notify_rule" {
|
||||
sender.NotifyRecord(ctx, []*models.AlertCurEvent{eventOrigin}, id, "", "", result.Message, fmt.Errorf("processor_by_%s_id:%d pipeline_id:%d, drop by pipeline", from, id, pipelineConfig.PipelineId))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
event = resultEvent
|
||||
logger.Infof("processor_by_%s_id:%d pipeline_id:%d, pipeline executed, status:%s, message:%s", from, id, pipelineConfig.PipelineId, result.Status, result.Message)
|
||||
}
|
||||
|
||||
event.FE2DB()
|
||||
|
||||
383
alert/pipeline/engine/engine.go
Normal file
383
alert/pipeline/engine/engine.go
Normal file
@@ -0,0 +1,383 @@
|
||||
package engine
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/models"
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
"github.com/google/uuid"
|
||||
"github.com/toolkits/pkg/logger"
|
||||
)
|
||||
|
||||
type WorkflowEngine struct {
|
||||
ctx *ctx.Context
|
||||
}
|
||||
|
||||
func NewWorkflowEngine(c *ctx.Context) *WorkflowEngine {
|
||||
return &WorkflowEngine{ctx: c}
|
||||
}
|
||||
|
||||
func (e *WorkflowEngine) Execute(pipeline *models.EventPipeline, event *models.AlertCurEvent, triggerCtx *models.WorkflowTriggerContext) (*models.AlertCurEvent, *models.WorkflowResult, error) {
|
||||
startTime := time.Now()
|
||||
|
||||
wfCtx := e.initWorkflowContext(pipeline, event, triggerCtx)
|
||||
|
||||
nodes := pipeline.GetWorkflowNodes()
|
||||
connections := pipeline.GetWorkflowConnections()
|
||||
|
||||
if len(nodes) == 0 {
|
||||
return event, &models.WorkflowResult{
|
||||
Event: event,
|
||||
Status: models.ExecutionStatusSuccess,
|
||||
Message: "no nodes to execute",
|
||||
}, nil
|
||||
}
|
||||
|
||||
nodeMap := make(map[string]*models.WorkflowNode)
|
||||
for i := range nodes {
|
||||
if nodes[i].RetryInterval == 0 {
|
||||
nodes[i].RetryInterval = 1
|
||||
}
|
||||
|
||||
if nodes[i].MaxRetries == 0 {
|
||||
nodes[i].MaxRetries = 1
|
||||
}
|
||||
|
||||
nodeMap[nodes[i].ID] = &nodes[i]
|
||||
}
|
||||
|
||||
result := e.executeDAG(nodeMap, connections, wfCtx)
|
||||
result.Event = wfCtx.Event
|
||||
|
||||
duration := time.Since(startTime).Milliseconds()
|
||||
|
||||
if triggerCtx != nil && triggerCtx.Mode != "" {
|
||||
e.saveExecutionRecord(pipeline, wfCtx, result, triggerCtx, startTime.Unix(), duration)
|
||||
}
|
||||
|
||||
return wfCtx.Event, result, nil
|
||||
}
|
||||
|
||||
func (e *WorkflowEngine) initWorkflowContext(pipeline *models.EventPipeline, event *models.AlertCurEvent, triggerCtx *models.WorkflowTriggerContext) *models.WorkflowContext {
|
||||
// 合并环境变量
|
||||
env := pipeline.GetEnvMap()
|
||||
if triggerCtx != nil && triggerCtx.EnvOverrides != nil {
|
||||
for k, v := range triggerCtx.EnvOverrides {
|
||||
env[k] = v
|
||||
}
|
||||
}
|
||||
|
||||
metadata := map[string]string{
|
||||
"start_time": fmt.Sprintf("%d", time.Now().Unix()),
|
||||
"pipeline_id": fmt.Sprintf("%d", pipeline.ID),
|
||||
}
|
||||
|
||||
// 是否启用流式输出
|
||||
stream := false
|
||||
if triggerCtx != nil {
|
||||
metadata["request_id"] = triggerCtx.RequestID
|
||||
metadata["trigger_mode"] = triggerCtx.Mode
|
||||
metadata["trigger_by"] = triggerCtx.TriggerBy
|
||||
stream = triggerCtx.Stream
|
||||
}
|
||||
|
||||
return &models.WorkflowContext{
|
||||
Event: event,
|
||||
Env: env,
|
||||
Vars: make(map[string]interface{}), // 初始化空的 Vars,供节点间传递数据
|
||||
Metadata: metadata,
|
||||
Stream: stream,
|
||||
}
|
||||
}
|
||||
|
||||
// executeDAG 使用 Kahn 算法执行 DAG
|
||||
func (e *WorkflowEngine) executeDAG(nodeMap map[string]*models.WorkflowNode, connections models.Connections, wfCtx *models.WorkflowContext) *models.WorkflowResult {
|
||||
result := &models.WorkflowResult{
|
||||
Status: models.ExecutionStatusSuccess,
|
||||
NodeResults: make([]*models.NodeExecutionResult, 0),
|
||||
Stream: wfCtx.Stream, // 从上下文继承流式输出设置
|
||||
}
|
||||
|
||||
// 计算每个节点的入度
|
||||
inDegree := make(map[string]int)
|
||||
for nodeID := range nodeMap {
|
||||
inDegree[nodeID] = 0
|
||||
}
|
||||
|
||||
// 遍历连接,计算入度
|
||||
for _, nodeConns := range connections {
|
||||
for _, targets := range nodeConns.Main {
|
||||
for _, target := range targets {
|
||||
inDegree[target.Node]++
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 找到所有入度为 0 的节点(起始节点)
|
||||
queue := make([]string, 0)
|
||||
for nodeID, degree := range inDegree {
|
||||
if degree == 0 {
|
||||
queue = append(queue, nodeID)
|
||||
}
|
||||
}
|
||||
|
||||
// 如果没有起始节点,说明存在循环依赖
|
||||
if len(queue) == 0 && len(nodeMap) > 0 {
|
||||
result.Status = models.ExecutionStatusFailed
|
||||
result.Message = "workflow has circular dependency"
|
||||
return result
|
||||
}
|
||||
|
||||
// 记录已执行的节点
|
||||
executed := make(map[string]bool)
|
||||
// 记录节点的分支选择结果
|
||||
branchResults := make(map[string]*int)
|
||||
|
||||
for len(queue) > 0 {
|
||||
// 取出队首节点
|
||||
nodeID := queue[0]
|
||||
queue = queue[1:]
|
||||
|
||||
// 检查是否已执行
|
||||
if executed[nodeID] {
|
||||
continue
|
||||
}
|
||||
|
||||
node, exists := nodeMap[nodeID]
|
||||
if !exists {
|
||||
continue
|
||||
}
|
||||
|
||||
// 执行节点
|
||||
nodeResult, nodeOutput := e.executeNode(node, wfCtx)
|
||||
result.NodeResults = append(result.NodeResults, nodeResult)
|
||||
|
||||
if nodeOutput != nil && nodeOutput.Stream && nodeOutput.StreamChan != nil {
|
||||
// 流式输出节点通常是最后一个节点
|
||||
// 直接传递 StreamChan 给 WorkflowResult,不阻塞等待
|
||||
result.Stream = true
|
||||
result.StreamChan = nodeOutput.StreamChan
|
||||
result.Event = wfCtx.Event
|
||||
result.Status = "streaming"
|
||||
result.Message = fmt.Sprintf("streaming output from node: %s", node.Name)
|
||||
|
||||
// 更新节点状态为 streaming
|
||||
nodeResult.Status = "streaming"
|
||||
nodeResult.Message = "streaming in progress"
|
||||
|
||||
// 立即返回,让 API 层处理流式响应
|
||||
return result
|
||||
}
|
||||
executed[nodeID] = true
|
||||
|
||||
// 保存分支结果
|
||||
if nodeResult.BranchIndex != nil {
|
||||
branchResults[nodeID] = nodeResult.BranchIndex
|
||||
}
|
||||
|
||||
// 检查执行状态
|
||||
if nodeResult.Status == "failed" {
|
||||
if !node.ContinueOnFail {
|
||||
result.Status = models.ExecutionStatusFailed
|
||||
result.ErrorNode = nodeID
|
||||
result.Message = fmt.Sprintf("node %s failed: %s", node.Name, nodeResult.Error)
|
||||
return result
|
||||
}
|
||||
}
|
||||
|
||||
// 检查是否终止
|
||||
if nodeResult.Status == "terminated" {
|
||||
result.Message = fmt.Sprintf("workflow terminated at node %s", node.Name)
|
||||
return result
|
||||
}
|
||||
|
||||
// 更新后继节点的入度
|
||||
if nodeConns, ok := connections[nodeID]; ok {
|
||||
for outputIndex, targets := range nodeConns.Main {
|
||||
// 检查是否应该走这个分支
|
||||
if !e.shouldFollowBranch(nodeID, outputIndex, branchResults) {
|
||||
continue
|
||||
}
|
||||
|
||||
for _, target := range targets {
|
||||
inDegree[target.Node]--
|
||||
if inDegree[target.Node] == 0 {
|
||||
queue = append(queue, target.Node)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
// executeNode 执行单个节点
|
||||
// 返回:节点执行结果、节点输出(用于流式输出检测)
|
||||
func (e *WorkflowEngine) executeNode(node *models.WorkflowNode, wfCtx *models.WorkflowContext) (*models.NodeExecutionResult, *models.NodeOutput) {
|
||||
startTime := time.Now()
|
||||
nodeResult := &models.NodeExecutionResult{
|
||||
NodeID: node.ID,
|
||||
NodeName: node.Name,
|
||||
NodeType: node.Type,
|
||||
StartedAt: startTime.Unix(),
|
||||
}
|
||||
|
||||
var nodeOutput *models.NodeOutput
|
||||
|
||||
// 跳过禁用的节点
|
||||
if node.Disabled {
|
||||
nodeResult.Status = "skipped"
|
||||
nodeResult.Message = "node is disabled"
|
||||
nodeResult.FinishedAt = time.Now().Unix()
|
||||
nodeResult.DurationMs = time.Since(startTime).Milliseconds()
|
||||
return nodeResult, nil
|
||||
}
|
||||
|
||||
// 获取处理器
|
||||
processor, err := models.GetProcessorByType(node.Type, node.Config)
|
||||
if err != nil {
|
||||
nodeResult.Status = "failed"
|
||||
nodeResult.Error = fmt.Sprintf("failed to get processor: %v", err)
|
||||
nodeResult.FinishedAt = time.Now().Unix()
|
||||
nodeResult.DurationMs = time.Since(startTime).Milliseconds()
|
||||
return nodeResult, nil
|
||||
}
|
||||
|
||||
// 执行处理器(带重试)
|
||||
var retries int
|
||||
maxRetries := node.MaxRetries
|
||||
if !node.RetryOnFail {
|
||||
maxRetries = 0
|
||||
}
|
||||
|
||||
for retries <= maxRetries {
|
||||
// 检查是否为分支处理器
|
||||
if branchProcessor, ok := processor.(models.BranchProcessor); ok {
|
||||
output, err := branchProcessor.ProcessWithBranch(e.ctx, wfCtx)
|
||||
if err != nil {
|
||||
if retries < maxRetries {
|
||||
retries++
|
||||
time.Sleep(time.Duration(node.RetryInterval) * time.Second)
|
||||
continue
|
||||
}
|
||||
nodeResult.Status = "failed"
|
||||
nodeResult.Error = err.Error()
|
||||
} else {
|
||||
nodeResult.Status = "success"
|
||||
if output != nil {
|
||||
nodeOutput = output
|
||||
if output.WfCtx != nil {
|
||||
wfCtx = output.WfCtx
|
||||
}
|
||||
nodeResult.Message = output.Message
|
||||
nodeResult.BranchIndex = output.BranchIndex
|
||||
if output.Terminate {
|
||||
nodeResult.Status = "terminated"
|
||||
}
|
||||
}
|
||||
}
|
||||
break
|
||||
}
|
||||
|
||||
// 普通处理器
|
||||
newWfCtx, msg, err := processor.Process(e.ctx, wfCtx)
|
||||
if err != nil {
|
||||
if retries < maxRetries {
|
||||
retries++
|
||||
time.Sleep(time.Duration(node.RetryInterval) * time.Second)
|
||||
continue
|
||||
}
|
||||
nodeResult.Status = "failed"
|
||||
nodeResult.Error = err.Error()
|
||||
} else {
|
||||
nodeResult.Status = "success"
|
||||
nodeResult.Message = msg
|
||||
if newWfCtx != nil {
|
||||
wfCtx = newWfCtx
|
||||
|
||||
// 检测流式输出标记
|
||||
if newWfCtx.Stream && newWfCtx.StreamChan != nil {
|
||||
nodeOutput = &models.NodeOutput{
|
||||
WfCtx: newWfCtx,
|
||||
Message: msg,
|
||||
Stream: true,
|
||||
StreamChan: newWfCtx.StreamChan,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 如果事件被 drop(返回 nil 或 Event 为 nil),标记为终止
|
||||
if newWfCtx == nil || newWfCtx.Event == nil {
|
||||
nodeResult.Status = "terminated"
|
||||
nodeResult.Message = msg
|
||||
}
|
||||
}
|
||||
break
|
||||
}
|
||||
|
||||
nodeResult.FinishedAt = time.Now().Unix()
|
||||
nodeResult.DurationMs = time.Since(startTime).Milliseconds()
|
||||
|
||||
logger.Infof("workflow: executed node %s (type=%s) status=%s msg=%s duration=%dms",
|
||||
node.Name, node.Type, nodeResult.Status, nodeResult.Message, nodeResult.DurationMs)
|
||||
|
||||
return nodeResult, nodeOutput
|
||||
}
|
||||
|
||||
// shouldFollowBranch 判断是否应该走某个分支
|
||||
func (e *WorkflowEngine) shouldFollowBranch(nodeID string, outputIndex int, branchResults map[string]*int) bool {
|
||||
branchIndex, hasBranch := branchResults[nodeID]
|
||||
if !hasBranch {
|
||||
// 没有分支结果,说明不是分支节点,只走第一个输出
|
||||
return outputIndex == 0
|
||||
}
|
||||
|
||||
if branchIndex == nil {
|
||||
// branchIndex 为 nil,走默认分支(通常是最后一个)
|
||||
return true
|
||||
}
|
||||
|
||||
// 只走选中的分支
|
||||
return outputIndex == *branchIndex
|
||||
}
|
||||
|
||||
func (e *WorkflowEngine) saveExecutionRecord(pipeline *models.EventPipeline, wfCtx *models.WorkflowContext, result *models.WorkflowResult, triggerCtx *models.WorkflowTriggerContext, startTime int64, duration int64) {
|
||||
executionID := triggerCtx.RequestID
|
||||
if executionID == "" {
|
||||
executionID = uuid.New().String()
|
||||
}
|
||||
|
||||
execution := &models.EventPipelineExecution{
|
||||
ID: executionID,
|
||||
PipelineID: pipeline.ID,
|
||||
PipelineName: pipeline.Name,
|
||||
Mode: triggerCtx.Mode,
|
||||
Status: result.Status,
|
||||
ErrorMessage: result.Message,
|
||||
ErrorNode: result.ErrorNode,
|
||||
CreatedAt: startTime,
|
||||
FinishedAt: time.Now().Unix(),
|
||||
DurationMs: duration,
|
||||
TriggerBy: triggerCtx.TriggerBy,
|
||||
}
|
||||
|
||||
if wfCtx.Event != nil {
|
||||
execution.EventID = wfCtx.Event.Id
|
||||
}
|
||||
|
||||
if err := execution.SetNodeResults(result.NodeResults); err != nil {
|
||||
logger.Errorf("workflow: failed to set node results: pipeline_id=%d, error=%v", pipeline.ID, err)
|
||||
}
|
||||
|
||||
secretKeys := pipeline.GetSecretKeys()
|
||||
sanitizedEnv := wfCtx.SanitizedEnv(secretKeys)
|
||||
if err := execution.SetEnvSnapshot(sanitizedEnv); err != nil {
|
||||
logger.Errorf("workflow: failed to set env snapshot: pipeline_id=%d, error=%v", pipeline.ID, err)
|
||||
}
|
||||
|
||||
if err := models.CreateEventPipelineExecution(e.ctx, execution); err != nil {
|
||||
logger.Errorf("workflow: failed to save execution record: pipeline_id=%d, error=%v", pipeline.ID, err)
|
||||
}
|
||||
}
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
_ "github.com/ccfos/nightingale/v6/alert/pipeline/processor/callback"
|
||||
_ "github.com/ccfos/nightingale/v6/alert/pipeline/processor/eventdrop"
|
||||
_ "github.com/ccfos/nightingale/v6/alert/pipeline/processor/eventupdate"
|
||||
_ "github.com/ccfos/nightingale/v6/alert/pipeline/processor/logic"
|
||||
_ "github.com/ccfos/nightingale/v6/alert/pipeline/processor/relabel"
|
||||
)
|
||||
|
||||
|
||||
@@ -55,23 +55,24 @@ func (c *AISummaryConfig) Init(settings interface{}) (models.Processor, error) {
|
||||
return result, err
|
||||
}
|
||||
|
||||
func (c *AISummaryConfig) Process(ctx *ctx.Context, event *models.AlertCurEvent) (*models.AlertCurEvent, string, error) {
|
||||
func (c *AISummaryConfig) Process(ctx *ctx.Context, wfCtx *models.WorkflowContext) (*models.WorkflowContext, string, error) {
|
||||
event := wfCtx.Event
|
||||
if c.Client == nil {
|
||||
if err := c.initHTTPClient(); err != nil {
|
||||
return event, "", fmt.Errorf("failed to initialize HTTP client: %v processor: %v", err, c)
|
||||
return wfCtx, "", fmt.Errorf("failed to initialize HTTP client: %v processor: %v", err, c)
|
||||
}
|
||||
}
|
||||
|
||||
// 准备告警事件信息
|
||||
eventInfo, err := c.prepareEventInfo(event)
|
||||
eventInfo, err := c.prepareEventInfo(wfCtx)
|
||||
if err != nil {
|
||||
return event, "", fmt.Errorf("failed to prepare event info: %v processor: %v", err, c)
|
||||
return wfCtx, "", fmt.Errorf("failed to prepare event info: %v processor: %v", err, c)
|
||||
}
|
||||
|
||||
// 调用AI模型生成总结
|
||||
summary, err := c.generateAISummary(eventInfo)
|
||||
if err != nil {
|
||||
return event, "", fmt.Errorf("failed to generate AI summary: %v processor: %v", err, c)
|
||||
return wfCtx, "", fmt.Errorf("failed to generate AI summary: %v processor: %v", err, c)
|
||||
}
|
||||
|
||||
// 将总结添加到annotations字段
|
||||
@@ -83,11 +84,11 @@ func (c *AISummaryConfig) Process(ctx *ctx.Context, event *models.AlertCurEvent)
|
||||
// 更新Annotations字段
|
||||
b, err := json.Marshal(event.AnnotationsJSON)
|
||||
if err != nil {
|
||||
return event, "", fmt.Errorf("failed to marshal annotations: %v processor: %v", err, c)
|
||||
return wfCtx, "", fmt.Errorf("failed to marshal annotations: %v processor: %v", err, c)
|
||||
}
|
||||
event.Annotations = string(b)
|
||||
|
||||
return event, "", nil
|
||||
return wfCtx, "", nil
|
||||
}
|
||||
|
||||
func (c *AISummaryConfig) initHTTPClient() error {
|
||||
@@ -110,9 +111,10 @@ func (c *AISummaryConfig) initHTTPClient() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *AISummaryConfig) prepareEventInfo(event *models.AlertCurEvent) (string, error) {
|
||||
func (c *AISummaryConfig) prepareEventInfo(wfCtx *models.WorkflowContext) (string, error) {
|
||||
var defs = []string{
|
||||
"{{$event := .}}",
|
||||
"{{$event := .Event}}",
|
||||
"{{$env := .Env}}",
|
||||
}
|
||||
|
||||
text := strings.Join(append(defs, c.PromptTemplate), "")
|
||||
@@ -122,7 +124,7 @@ func (c *AISummaryConfig) prepareEventInfo(event *models.AlertCurEvent) (string,
|
||||
}
|
||||
|
||||
var body bytes.Buffer
|
||||
err = t.Execute(&body, event)
|
||||
err = t.Execute(&body, wfCtx)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to execute prompt template: %v", err)
|
||||
}
|
||||
|
||||
@@ -42,8 +42,14 @@ func TestAISummaryConfig_Process(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
// 创建 WorkflowContext
|
||||
wfCtx := &models.WorkflowContext{
|
||||
Event: event,
|
||||
Env: map[string]string{},
|
||||
}
|
||||
|
||||
// 测试模板处理
|
||||
eventInfo, err := config.prepareEventInfo(event)
|
||||
eventInfo, err := config.prepareEventInfo(wfCtx)
|
||||
assert.NoError(t, err)
|
||||
assert.Contains(t, eventInfo, "Test Rule")
|
||||
assert.Contains(t, eventInfo, "1")
|
||||
@@ -54,18 +60,18 @@ func TestAISummaryConfig_Process(t *testing.T) {
|
||||
assert.NotNil(t, processor)
|
||||
|
||||
// 测试处理函数
|
||||
result, _, err := processor.Process(&ctx.Context{}, event)
|
||||
result, _, err := processor.Process(&ctx.Context{}, wfCtx)
|
||||
assert.NoError(t, err)
|
||||
assert.NotNil(t, result)
|
||||
assert.NotEmpty(t, result.AnnotationsJSON["ai_summary"])
|
||||
assert.NotEmpty(t, result.Event.AnnotationsJSON["ai_summary"])
|
||||
|
||||
// 展示处理结果
|
||||
t.Log("\n=== 处理结果 ===")
|
||||
t.Logf("告警规则: %s", result.RuleName)
|
||||
t.Logf("严重程度: %d", result.Severity)
|
||||
t.Logf("标签: %v", result.TagsMap)
|
||||
t.Logf("原始注释: %v", result.AnnotationsJSON["description"])
|
||||
t.Logf("AI总结: %s", result.AnnotationsJSON["ai_summary"])
|
||||
t.Logf("告警规则: %s", result.Event.RuleName)
|
||||
t.Logf("严重程度: %d", result.Event.Severity)
|
||||
t.Logf("标签: %v", result.Event.TagsMap)
|
||||
t.Logf("原始注释: %v", result.Event.AnnotationsJSON["description"])
|
||||
t.Logf("AI总结: %s", result.Event.AnnotationsJSON["ai_summary"])
|
||||
}
|
||||
|
||||
func TestConvertCustomParam(t *testing.T) {
|
||||
|
||||
@@ -43,7 +43,8 @@ func (c *CallbackConfig) Init(settings interface{}) (models.Processor, error) {
|
||||
return result, err
|
||||
}
|
||||
|
||||
func (c *CallbackConfig) Process(ctx *ctx.Context, event *models.AlertCurEvent) (*models.AlertCurEvent, string, error) {
|
||||
func (c *CallbackConfig) Process(ctx *ctx.Context, wfCtx *models.WorkflowContext) (*models.WorkflowContext, string, error) {
|
||||
event := wfCtx.Event
|
||||
if c.Client == nil {
|
||||
transport := &http.Transport{
|
||||
TLSClientConfig: &tls.Config{InsecureSkipVerify: c.SkipSSLVerify},
|
||||
@@ -52,7 +53,7 @@ func (c *CallbackConfig) Process(ctx *ctx.Context, event *models.AlertCurEvent)
|
||||
if c.Proxy != "" {
|
||||
proxyURL, err := url.Parse(c.Proxy)
|
||||
if err != nil {
|
||||
return event, "", fmt.Errorf("failed to parse proxy url: %v processor: %v", err, c)
|
||||
return wfCtx, "", fmt.Errorf("failed to parse proxy url: %v processor: %v", err, c)
|
||||
} else {
|
||||
transport.Proxy = http.ProxyURL(proxyURL)
|
||||
}
|
||||
@@ -72,12 +73,12 @@ func (c *CallbackConfig) Process(ctx *ctx.Context, event *models.AlertCurEvent)
|
||||
|
||||
body, err := json.Marshal(event)
|
||||
if err != nil {
|
||||
return event, "", fmt.Errorf("failed to marshal event: %v processor: %v", err, c)
|
||||
return wfCtx, "", fmt.Errorf("failed to marshal event: %v processor: %v", err, c)
|
||||
}
|
||||
|
||||
req, err := http.NewRequest("POST", c.URL, strings.NewReader(string(body)))
|
||||
if err != nil {
|
||||
return event, "", fmt.Errorf("failed to create request: %v processor: %v", err, c)
|
||||
return wfCtx, "", fmt.Errorf("failed to create request: %v processor: %v", err, c)
|
||||
}
|
||||
|
||||
for k, v := range headers {
|
||||
@@ -90,14 +91,14 @@ func (c *CallbackConfig) Process(ctx *ctx.Context, event *models.AlertCurEvent)
|
||||
|
||||
resp, err := c.Client.Do(req)
|
||||
if err != nil {
|
||||
return event, "", fmt.Errorf("failed to send request: %v processor: %v", err, c)
|
||||
return wfCtx, "", fmt.Errorf("failed to send request: %v processor: %v", err, c)
|
||||
}
|
||||
|
||||
b, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return event, "", fmt.Errorf("failed to read response body: %v processor: %v", err, c)
|
||||
return wfCtx, "", fmt.Errorf("failed to read response body: %v processor: %v", err, c)
|
||||
}
|
||||
|
||||
logger.Debugf("callback processor response body: %s", string(b))
|
||||
return event, "callback success", nil
|
||||
return wfCtx, "callback success", nil
|
||||
}
|
||||
|
||||
@@ -26,27 +26,29 @@ func (c *EventDropConfig) Init(settings interface{}) (models.Processor, error) {
|
||||
return result, err
|
||||
}
|
||||
|
||||
func (c *EventDropConfig) Process(ctx *ctx.Context, event *models.AlertCurEvent) (*models.AlertCurEvent, string, error) {
|
||||
func (c *EventDropConfig) Process(ctx *ctx.Context, wfCtx *models.WorkflowContext) (*models.WorkflowContext, string, error) {
|
||||
// 使用背景是可以根据此处理器,实现对事件进行更加灵活的过滤的逻辑
|
||||
// 在标签过滤和属性过滤都不满足需求时可以使用
|
||||
// 如果模板执行结果为 true,则删除该事件
|
||||
event := wfCtx.Event
|
||||
|
||||
var defs = []string{
|
||||
"{{ $event := . }}",
|
||||
"{{ $labels := .TagsMap }}",
|
||||
"{{ $value := .TriggerValue }}",
|
||||
"{{ $event := .Event }}",
|
||||
"{{ $labels := .Event.TagsMap }}",
|
||||
"{{ $value := .Event.TriggerValue }}",
|
||||
"{{ $env := .Env }}",
|
||||
}
|
||||
|
||||
text := strings.Join(append(defs, c.Content), "")
|
||||
|
||||
tpl, err := texttemplate.New("eventdrop").Funcs(tplx.TemplateFuncMap).Parse(text)
|
||||
if err != nil {
|
||||
return event, "", fmt.Errorf("processor failed to parse template: %v processor: %v", err, c)
|
||||
return wfCtx, "", fmt.Errorf("processor failed to parse template: %v processor: %v", err, c)
|
||||
}
|
||||
|
||||
var body bytes.Buffer
|
||||
if err = tpl.Execute(&body, event); err != nil {
|
||||
return event, "", fmt.Errorf("processor failed to execute template: %v processor: %v", err, c)
|
||||
if err = tpl.Execute(&body, wfCtx); err != nil {
|
||||
return wfCtx, "", fmt.Errorf("processor failed to execute template: %v processor: %v", err, c)
|
||||
}
|
||||
|
||||
result := strings.TrimSpace(body.String())
|
||||
@@ -56,5 +58,5 @@ func (c *EventDropConfig) Process(ctx *ctx.Context, event *models.AlertCurEvent)
|
||||
return nil, "drop event success", nil
|
||||
}
|
||||
|
||||
return event, "drop event failed", nil
|
||||
return wfCtx, "drop event failed", nil
|
||||
}
|
||||
|
||||
@@ -31,7 +31,8 @@ func (c *EventUpdateConfig) Init(settings interface{}) (models.Processor, error)
|
||||
return result, err
|
||||
}
|
||||
|
||||
func (c *EventUpdateConfig) Process(ctx *ctx.Context, event *models.AlertCurEvent) (*models.AlertCurEvent, string, error) {
|
||||
func (c *EventUpdateConfig) Process(ctx *ctx.Context, wfCtx *models.WorkflowContext) (*models.WorkflowContext, string, error) {
|
||||
event := wfCtx.Event
|
||||
if c.Client == nil {
|
||||
transport := &http.Transport{
|
||||
TLSClientConfig: &tls.Config{InsecureSkipVerify: c.SkipSSLVerify},
|
||||
@@ -40,7 +41,7 @@ func (c *EventUpdateConfig) Process(ctx *ctx.Context, event *models.AlertCurEven
|
||||
if c.Proxy != "" {
|
||||
proxyURL, err := url.Parse(c.Proxy)
|
||||
if err != nil {
|
||||
return event, "", fmt.Errorf("failed to parse proxy url: %v processor: %v", err, c)
|
||||
return wfCtx, "", fmt.Errorf("failed to parse proxy url: %v processor: %v", err, c)
|
||||
} else {
|
||||
transport.Proxy = http.ProxyURL(proxyURL)
|
||||
}
|
||||
@@ -60,12 +61,12 @@ func (c *EventUpdateConfig) Process(ctx *ctx.Context, event *models.AlertCurEven
|
||||
|
||||
body, err := json.Marshal(event)
|
||||
if err != nil {
|
||||
return event, "", fmt.Errorf("failed to marshal event: %v processor: %v", err, c)
|
||||
return wfCtx, "", fmt.Errorf("failed to marshal event: %v processor: %v", err, c)
|
||||
}
|
||||
|
||||
req, err := http.NewRequest("POST", c.URL, strings.NewReader(string(body)))
|
||||
if err != nil {
|
||||
return event, "", fmt.Errorf("failed to create request: %v processor: %v", err, c)
|
||||
return wfCtx, "", fmt.Errorf("failed to create request: %v processor: %v", err, c)
|
||||
}
|
||||
|
||||
for k, v := range headers {
|
||||
@@ -78,7 +79,7 @@ func (c *EventUpdateConfig) Process(ctx *ctx.Context, event *models.AlertCurEven
|
||||
|
||||
resp, err := c.Client.Do(req)
|
||||
if err != nil {
|
||||
return event, "", fmt.Errorf("failed to send request: %v processor: %v", err, c)
|
||||
return wfCtx, "", fmt.Errorf("failed to send request: %v processor: %v", err, c)
|
||||
}
|
||||
|
||||
b, err := io.ReadAll(resp.Body)
|
||||
@@ -89,8 +90,8 @@ func (c *EventUpdateConfig) Process(ctx *ctx.Context, event *models.AlertCurEven
|
||||
|
||||
err = json.Unmarshal(b, &event)
|
||||
if err != nil {
|
||||
return event, "", fmt.Errorf("failed to unmarshal response body: %v processor: %v", err, c)
|
||||
return wfCtx, "", fmt.Errorf("failed to unmarshal response body: %v processor: %v", err, c)
|
||||
}
|
||||
|
||||
return event, "", nil
|
||||
return wfCtx, "", nil
|
||||
}
|
||||
|
||||
197
alert/pipeline/processor/logic/if.go
Normal file
197
alert/pipeline/processor/logic/if.go
Normal file
@@ -0,0 +1,197 @@
|
||||
package logic
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"strings"
|
||||
"text/template"
|
||||
|
||||
alertCommon "github.com/ccfos/nightingale/v6/alert/common"
|
||||
"github.com/ccfos/nightingale/v6/alert/pipeline/processor/common"
|
||||
"github.com/ccfos/nightingale/v6/models"
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
"github.com/ccfos/nightingale/v6/pkg/tplx"
|
||||
)
|
||||
|
||||
// 判断模式常量
|
||||
const (
|
||||
ConditionModeExpression = "expression" // 表达式模式(默认)
|
||||
ConditionModeTags = "tags" // 标签/属性模式
|
||||
)
|
||||
|
||||
// IfConfig If 条件处理器配置
|
||||
type IfConfig struct {
|
||||
// 判断模式:expression(表达式)或 tags(标签/属性)
|
||||
Mode string `json:"mode,omitempty"`
|
||||
|
||||
// 表达式模式配置
|
||||
// 条件表达式(支持 Go 模板语法)
|
||||
// 例如:{{ if eq .Severity 1 }}true{{ end }}
|
||||
Condition string `json:"condition,omitempty"`
|
||||
|
||||
// 标签/属性模式配置
|
||||
LabelKeys []models.TagFilter `json:"label_keys,omitempty"` // 适用标签
|
||||
Attributes []models.TagFilter `json:"attributes,omitempty"` // 适用属性
|
||||
|
||||
// 内部使用,解析后的过滤器
|
||||
parsedLabelKeys []models.TagFilter `json:"-"`
|
||||
parsedAttributes []models.TagFilter `json:"-"`
|
||||
}
|
||||
|
||||
func init() {
|
||||
models.RegisterProcessor("logic.if", &IfConfig{})
|
||||
}
|
||||
|
||||
func (c *IfConfig) Init(settings interface{}) (models.Processor, error) {
|
||||
result, err := common.InitProcessor[*IfConfig](settings)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// 解析标签过滤器
|
||||
if len(result.LabelKeys) > 0 {
|
||||
// Deep copy to avoid concurrent map writes on cached objects
|
||||
labelKeysCopy := make([]models.TagFilter, len(result.LabelKeys))
|
||||
copy(labelKeysCopy, result.LabelKeys)
|
||||
for i := range labelKeysCopy {
|
||||
if labelKeysCopy[i].Func == "" {
|
||||
labelKeysCopy[i].Func = labelKeysCopy[i].Op
|
||||
}
|
||||
}
|
||||
result.parsedLabelKeys, err = models.ParseTagFilter(labelKeysCopy)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to parse label_keys: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// 解析属性过滤器
|
||||
if len(result.Attributes) > 0 {
|
||||
// Deep copy to avoid concurrent map writes on cached objects
|
||||
attributesCopy := make([]models.TagFilter, len(result.Attributes))
|
||||
copy(attributesCopy, result.Attributes)
|
||||
for i := range attributesCopy {
|
||||
if attributesCopy[i].Func == "" {
|
||||
attributesCopy[i].Func = attributesCopy[i].Op
|
||||
}
|
||||
}
|
||||
result.parsedAttributes, err = models.ParseTagFilter(attributesCopy)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to parse attributes: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// Process 实现 Processor 接口(兼容旧模式)
|
||||
func (c *IfConfig) Process(ctx *ctx.Context, wfCtx *models.WorkflowContext) (*models.WorkflowContext, string, error) {
|
||||
result, err := c.evaluateCondition(wfCtx)
|
||||
if err != nil {
|
||||
return wfCtx, "", fmt.Errorf("if processor: failed to evaluate condition: %v", err)
|
||||
}
|
||||
|
||||
if result {
|
||||
return wfCtx, "condition matched (true branch)", nil
|
||||
}
|
||||
return wfCtx, "condition not matched (false branch)", nil
|
||||
}
|
||||
|
||||
// ProcessWithBranch 实现 BranchProcessor 接口
|
||||
func (c *IfConfig) ProcessWithBranch(ctx *ctx.Context, wfCtx *models.WorkflowContext) (*models.NodeOutput, error) {
|
||||
result, err := c.evaluateCondition(wfCtx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("if processor: failed to evaluate condition: %v", err)
|
||||
}
|
||||
|
||||
output := &models.NodeOutput{
|
||||
WfCtx: wfCtx,
|
||||
}
|
||||
|
||||
if result {
|
||||
// 条件为 true,走输出 0(true 分支)
|
||||
branchIndex := 0
|
||||
output.BranchIndex = &branchIndex
|
||||
output.Message = "condition matched (true branch)"
|
||||
} else {
|
||||
// 条件为 false,走输出 1(false 分支)
|
||||
branchIndex := 1
|
||||
output.BranchIndex = &branchIndex
|
||||
output.Message = "condition not matched (false branch)"
|
||||
}
|
||||
|
||||
return output, nil
|
||||
}
|
||||
|
||||
// evaluateCondition 评估条件
|
||||
func (c *IfConfig) evaluateCondition(wfCtx *models.WorkflowContext) (bool, error) {
|
||||
mode := c.Mode
|
||||
if mode == "" {
|
||||
mode = ConditionModeExpression // 默认表达式模式
|
||||
}
|
||||
|
||||
switch mode {
|
||||
case ConditionModeTags:
|
||||
return c.evaluateTagsCondition(wfCtx.Event)
|
||||
default:
|
||||
return c.evaluateExpressionCondition(wfCtx)
|
||||
}
|
||||
}
|
||||
|
||||
// evaluateExpressionCondition 评估表达式条件
|
||||
func (c *IfConfig) evaluateExpressionCondition(wfCtx *models.WorkflowContext) (bool, error) {
|
||||
if c.Condition == "" {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// 构建模板数据
|
||||
var defs = []string{
|
||||
"{{ $event := .Event }}",
|
||||
"{{ $labels := .Event.TagsMap }}",
|
||||
"{{ $value := .Event.TriggerValue }}",
|
||||
"{{ $env := .Env }}",
|
||||
}
|
||||
|
||||
text := strings.Join(append(defs, c.Condition), "")
|
||||
|
||||
tpl, err := template.New("if_condition").Funcs(tplx.TemplateFuncMap).Parse(text)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
var buf bytes.Buffer
|
||||
if err = tpl.Execute(&buf, wfCtx); err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
result := strings.TrimSpace(strings.ToLower(buf.String()))
|
||||
return result == "true" || result == "1", nil
|
||||
}
|
||||
|
||||
// evaluateTagsCondition 评估标签/属性条件
|
||||
func (c *IfConfig) evaluateTagsCondition(event *models.AlertCurEvent) (bool, error) {
|
||||
// 如果没有配置任何过滤条件,默认返回 true
|
||||
if len(c.parsedLabelKeys) == 0 && len(c.parsedAttributes) == 0 {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// 匹配标签 (TagsMap)
|
||||
if len(c.parsedLabelKeys) > 0 {
|
||||
tagsMap := event.TagsMap
|
||||
if tagsMap == nil {
|
||||
tagsMap = make(map[string]string)
|
||||
}
|
||||
if !alertCommon.MatchTags(tagsMap, c.parsedLabelKeys) {
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
|
||||
// 匹配属性 (JsonTagsAndValue - 所有 JSON 字段)
|
||||
if len(c.parsedAttributes) > 0 {
|
||||
attributesMap := event.JsonTagsAndValue()
|
||||
if !alertCommon.MatchTags(attributesMap, c.parsedAttributes) {
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}
|
||||
224
alert/pipeline/processor/logic/switch.go
Normal file
224
alert/pipeline/processor/logic/switch.go
Normal file
@@ -0,0 +1,224 @@
|
||||
package logic
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"strings"
|
||||
"text/template"
|
||||
|
||||
alertCommon "github.com/ccfos/nightingale/v6/alert/common"
|
||||
"github.com/ccfos/nightingale/v6/alert/pipeline/processor/common"
|
||||
"github.com/ccfos/nightingale/v6/models"
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
"github.com/ccfos/nightingale/v6/pkg/tplx"
|
||||
)
|
||||
|
||||
// SwitchCase Switch 分支定义
|
||||
type SwitchCase struct {
|
||||
// 判断模式:expression(表达式)或 tags(标签/属性)
|
||||
Mode string `json:"mode,omitempty"`
|
||||
|
||||
// 表达式模式配置
|
||||
// 条件表达式(支持 Go 模板语法)
|
||||
Condition string `json:"condition,omitempty"`
|
||||
|
||||
// 标签/属性模式配置
|
||||
LabelKeys []models.TagFilter `json:"label_keys,omitempty"` // 适用标签
|
||||
Attributes []models.TagFilter `json:"attributes,omitempty"` // 适用属性
|
||||
|
||||
// 分支名称(可选,用于日志)
|
||||
Name string `json:"name,omitempty"`
|
||||
|
||||
// 内部使用,解析后的过滤器
|
||||
parsedLabelKeys []models.TagFilter `json:"-"`
|
||||
parsedAttributes []models.TagFilter `json:"-"`
|
||||
}
|
||||
|
||||
// SwitchConfig Switch 多分支处理器配置
|
||||
type SwitchConfig struct {
|
||||
// 分支条件列表
|
||||
// 按顺序匹配,第一个为 true 的分支将被选中
|
||||
Cases []SwitchCase `json:"cases"`
|
||||
// 是否允许多个分支同时匹配(默认 false,只走第一个匹配的)
|
||||
AllowMultiple bool `json:"allow_multiple,omitempty"`
|
||||
}
|
||||
|
||||
func init() {
|
||||
models.RegisterProcessor("logic.switch", &SwitchConfig{})
|
||||
}
|
||||
|
||||
func (c *SwitchConfig) Init(settings interface{}) (models.Processor, error) {
|
||||
result, err := common.InitProcessor[*SwitchConfig](settings)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// 解析每个 case 的标签和属性过滤器
|
||||
for i := range result.Cases {
|
||||
if len(result.Cases[i].LabelKeys) > 0 {
|
||||
// Deep copy to avoid concurrent map writes on cached objects
|
||||
labelKeysCopy := make([]models.TagFilter, len(result.Cases[i].LabelKeys))
|
||||
copy(labelKeysCopy, result.Cases[i].LabelKeys)
|
||||
for j := range labelKeysCopy {
|
||||
if labelKeysCopy[j].Func == "" {
|
||||
labelKeysCopy[j].Func = labelKeysCopy[j].Op
|
||||
}
|
||||
}
|
||||
result.Cases[i].parsedLabelKeys, err = models.ParseTagFilter(labelKeysCopy)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to parse label_keys for case[%d]: %v", i, err)
|
||||
}
|
||||
}
|
||||
|
||||
if len(result.Cases[i].Attributes) > 0 {
|
||||
// Deep copy to avoid concurrent map writes on cached objects
|
||||
attributesCopy := make([]models.TagFilter, len(result.Cases[i].Attributes))
|
||||
copy(attributesCopy, result.Cases[i].Attributes)
|
||||
for j := range attributesCopy {
|
||||
if attributesCopy[j].Func == "" {
|
||||
attributesCopy[j].Func = attributesCopy[j].Op
|
||||
}
|
||||
}
|
||||
result.Cases[i].parsedAttributes, err = models.ParseTagFilter(attributesCopy)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to parse attributes for case[%d]: %v", i, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// Process 实现 Processor 接口(兼容旧模式)
|
||||
func (c *SwitchConfig) Process(ctx *ctx.Context, wfCtx *models.WorkflowContext) (*models.WorkflowContext, string, error) {
|
||||
index, caseName, err := c.evaluateCases(wfCtx)
|
||||
if err != nil {
|
||||
return wfCtx, "", fmt.Errorf("switch processor: failed to evaluate cases: %v", err)
|
||||
}
|
||||
|
||||
if index >= 0 {
|
||||
if caseName != "" {
|
||||
return wfCtx, fmt.Sprintf("matched case[%d]: %s", index, caseName), nil
|
||||
}
|
||||
return wfCtx, fmt.Sprintf("matched case[%d]", index), nil
|
||||
}
|
||||
|
||||
// 走默认分支(最后一个输出)
|
||||
return wfCtx, "no case matched, using default branch", nil
|
||||
}
|
||||
|
||||
// ProcessWithBranch 实现 BranchProcessor 接口
|
||||
func (c *SwitchConfig) ProcessWithBranch(ctx *ctx.Context, wfCtx *models.WorkflowContext) (*models.NodeOutput, error) {
|
||||
index, caseName, err := c.evaluateCases(wfCtx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("switch processor: failed to evaluate cases: %v", err)
|
||||
}
|
||||
|
||||
output := &models.NodeOutput{
|
||||
WfCtx: wfCtx,
|
||||
}
|
||||
|
||||
if index >= 0 {
|
||||
output.BranchIndex = &index
|
||||
if caseName != "" {
|
||||
output.Message = fmt.Sprintf("matched case[%d]: %s", index, caseName)
|
||||
} else {
|
||||
output.Message = fmt.Sprintf("matched case[%d]", index)
|
||||
}
|
||||
} else {
|
||||
// 默认分支的索引是 cases 数量(即最后一个输出端口)
|
||||
defaultIndex := len(c.Cases)
|
||||
output.BranchIndex = &defaultIndex
|
||||
output.Message = "no case matched, using default branch"
|
||||
}
|
||||
|
||||
return output, nil
|
||||
}
|
||||
|
||||
// evaluateCases 评估所有分支条件
|
||||
// 返回匹配的分支索引和分支名称,如果没有匹配返回 -1
|
||||
func (c *SwitchConfig) evaluateCases(wfCtx *models.WorkflowContext) (int, string, error) {
|
||||
for i := range c.Cases {
|
||||
matched, err := c.evaluateCaseCondition(&c.Cases[i], wfCtx)
|
||||
if err != nil {
|
||||
return -1, "", fmt.Errorf("case[%d] evaluation error: %v", i, err)
|
||||
}
|
||||
if matched {
|
||||
return i, c.Cases[i].Name, nil
|
||||
}
|
||||
}
|
||||
return -1, "", nil
|
||||
}
|
||||
|
||||
// evaluateCaseCondition 评估单个分支条件
|
||||
func (c *SwitchConfig) evaluateCaseCondition(caseItem *SwitchCase, wfCtx *models.WorkflowContext) (bool, error) {
|
||||
mode := caseItem.Mode
|
||||
if mode == "" {
|
||||
mode = ConditionModeExpression // 默认表达式模式
|
||||
}
|
||||
|
||||
switch mode {
|
||||
case ConditionModeTags:
|
||||
return c.evaluateTagsCondition(caseItem, wfCtx.Event)
|
||||
default:
|
||||
return c.evaluateExpressionCondition(caseItem.Condition, wfCtx)
|
||||
}
|
||||
}
|
||||
|
||||
// evaluateExpressionCondition 评估表达式条件
|
||||
func (c *SwitchConfig) evaluateExpressionCondition(condition string, wfCtx *models.WorkflowContext) (bool, error) {
|
||||
if condition == "" {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
var defs = []string{
|
||||
"{{ $event := .Event }}",
|
||||
"{{ $labels := .Event.TagsMap }}",
|
||||
"{{ $value := .Event.TriggerValue }}",
|
||||
"{{ $env := .Env }}",
|
||||
}
|
||||
|
||||
text := strings.Join(append(defs, condition), "")
|
||||
|
||||
tpl, err := template.New("switch_condition").Funcs(tplx.TemplateFuncMap).Parse(text)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
var buf bytes.Buffer
|
||||
if err = tpl.Execute(&buf, wfCtx); err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
result := strings.TrimSpace(strings.ToLower(buf.String()))
|
||||
return result == "true" || result == "1", nil
|
||||
}
|
||||
|
||||
// evaluateTagsCondition 评估标签/属性条件
|
||||
func (c *SwitchConfig) evaluateTagsCondition(caseItem *SwitchCase, event *models.AlertCurEvent) (bool, error) {
|
||||
// 如果没有配置任何过滤条件,默认返回 false(不匹配)
|
||||
if len(caseItem.parsedLabelKeys) == 0 && len(caseItem.parsedAttributes) == 0 {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// 匹配标签 (TagsMap)
|
||||
if len(caseItem.parsedLabelKeys) > 0 {
|
||||
tagsMap := event.TagsMap
|
||||
if tagsMap == nil {
|
||||
tagsMap = make(map[string]string)
|
||||
}
|
||||
if !alertCommon.MatchTags(tagsMap, caseItem.parsedLabelKeys) {
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
|
||||
// 匹配属性 (JsonTagsAndValue - 所有 JSON 字段)
|
||||
if len(caseItem.parsedAttributes) > 0 {
|
||||
attributesMap := event.JsonTagsAndValue()
|
||||
if !alertCommon.MatchTags(attributesMap, caseItem.parsedAttributes) {
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}
|
||||
@@ -42,7 +42,7 @@ func (r *RelabelConfig) Init(settings interface{}) (models.Processor, error) {
|
||||
return result, err
|
||||
}
|
||||
|
||||
func (r *RelabelConfig) Process(ctx *ctx.Context, event *models.AlertCurEvent) (*models.AlertCurEvent, string, error) {
|
||||
func (r *RelabelConfig) Process(ctx *ctx.Context, wfCtx *models.WorkflowContext) (*models.WorkflowContext, string, error) {
|
||||
sourceLabels := make([]model.LabelName, len(r.SourceLabels))
|
||||
for i := range r.SourceLabels {
|
||||
sourceLabels[i] = model.LabelName(strings.ReplaceAll(r.SourceLabels[i], ".", REPLACE_DOT))
|
||||
@@ -63,8 +63,8 @@ func (r *RelabelConfig) Process(ctx *ctx.Context, event *models.AlertCurEvent) (
|
||||
},
|
||||
}
|
||||
|
||||
EventRelabel(event, relabelConfigs)
|
||||
return event, "", nil
|
||||
EventRelabel(wfCtx.Event, relabelConfigs)
|
||||
return wfCtx, "", nil
|
||||
}
|
||||
|
||||
func EventRelabel(event *models.AlertCurEvent, relabelConfigs []*pconf.RelabelConfig) {
|
||||
|
||||
@@ -558,6 +558,19 @@ func (rt *Router) Config(r *gin.Engine) {
|
||||
pages.POST("/event-pipeline-tryrun", rt.auth(), rt.user(), rt.perm("/event-pipelines"), rt.tryRunEventPipeline)
|
||||
pages.POST("/event-processor-tryrun", rt.auth(), rt.user(), rt.perm("/event-pipelines"), rt.tryRunEventProcessor)
|
||||
|
||||
// API 触发工作流
|
||||
pages.POST("/event-pipeline/:id/trigger", rt.auth(), rt.user(), rt.perm("/event-pipelines"), rt.triggerEventPipelineByAPI)
|
||||
// SSE 流式执行工作流
|
||||
pages.POST("/event-pipeline/:id/stream", rt.auth(), rt.user(), rt.perm("/event-pipelines"), rt.streamEventPipeline)
|
||||
|
||||
// 事件Pipeline执行记录路由
|
||||
pages.GET("/event-pipeline-executions", rt.auth(), rt.user(), rt.perm("/event-pipelines"), rt.listAllEventPipelineExecutions)
|
||||
pages.GET("/event-pipeline/:id/executions", rt.auth(), rt.user(), rt.perm("/event-pipelines"), rt.listEventPipelineExecutions)
|
||||
pages.GET("/event-pipeline/:id/execution/:exec_id", rt.auth(), rt.user(), rt.perm("/event-pipelines"), rt.getEventPipelineExecution)
|
||||
pages.GET("/event-pipeline-execution/:exec_id", rt.auth(), rt.user(), rt.perm("/event-pipelines"), rt.getEventPipelineExecution)
|
||||
pages.GET("/event-pipeline/:id/execution-stats", rt.auth(), rt.user(), rt.perm("/event-pipelines"), rt.getEventPipelineExecutionStats)
|
||||
pages.POST("/event-pipeline-executions/clean", rt.auth(), rt.user(), rt.admin(), rt.cleanEventPipelineExecutions)
|
||||
|
||||
pages.POST("/notify-channel-configs", rt.auth(), rt.user(), rt.perm("/notification-channels/add"), rt.notifyChannelsAdd)
|
||||
pages.DELETE("/notify-channel-configs", rt.auth(), rt.user(), rt.perm("/notification-channels/del"), rt.notifyChannelsDel)
|
||||
pages.PUT("/notify-channel-config/:id", rt.auth(), rt.user(), rt.perm("/notification-channels/put"), rt.notifyChannelPut)
|
||||
@@ -690,6 +703,8 @@ func (rt *Router) Config(r *gin.Engine) {
|
||||
service.GET("/message-templates", rt.messageTemplateGets)
|
||||
|
||||
service.GET("/event-pipelines", rt.eventPipelinesListByService)
|
||||
service.POST("/event-pipeline/:id/trigger", rt.triggerEventPipelineByService)
|
||||
service.POST("/event-pipeline/:id/stream", rt.streamEventPipelineByService)
|
||||
|
||||
// 手机号加密存储配置接口
|
||||
service.POST("/users/phone/encrypt", rt.usersPhoneEncrypt)
|
||||
|
||||
@@ -1,14 +1,19 @@
|
||||
package router
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/alert/pipeline/engine"
|
||||
"github.com/ccfos/nightingale/v6/models"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/google/uuid"
|
||||
"github.com/toolkits/pkg/ginx"
|
||||
"github.com/toolkits/pkg/i18n"
|
||||
"github.com/toolkits/pkg/logger"
|
||||
)
|
||||
|
||||
// 获取事件Pipeline列表
|
||||
@@ -27,6 +32,8 @@ func (rt *Router) eventPipelinesList(c *gin.Context) {
|
||||
for _, tid := range pipeline.TeamIds {
|
||||
pipeline.TeamNames = append(pipeline.TeamNames, ugMap[tid])
|
||||
}
|
||||
// 兼容处理:自动填充工作流字段
|
||||
pipeline.FillWorkflowFields()
|
||||
}
|
||||
|
||||
gids, err := models.MyGroupIdsMap(rt.Ctx, me.Id)
|
||||
@@ -61,6 +68,9 @@ func (rt *Router) getEventPipeline(c *gin.Context) {
|
||||
err = pipeline.FillTeamNames(rt.Ctx)
|
||||
ginx.Dangerous(err)
|
||||
|
||||
// 兼容处理:自动填充工作流字段
|
||||
pipeline.FillWorkflowFields()
|
||||
|
||||
ginx.NewRender(c).Data(pipeline, nil)
|
||||
}
|
||||
|
||||
@@ -131,7 +141,9 @@ func (rt *Router) tryRunEventPipeline(c *gin.Context) {
|
||||
var f struct {
|
||||
EventId int64 `json:"event_id"`
|
||||
PipelineConfig models.EventPipeline `json:"pipeline_config"`
|
||||
EnvVariables map[string]string `json:"env_variables,omitempty"`
|
||||
}
|
||||
|
||||
ginx.BindJSON(c, &f)
|
||||
|
||||
hisEvent, err := models.AlertHisEventGetById(rt.Ctx, f.EventId)
|
||||
@@ -141,30 +153,33 @@ func (rt *Router) tryRunEventPipeline(c *gin.Context) {
|
||||
event := hisEvent.ToCur()
|
||||
|
||||
lang := c.GetHeader("X-Language")
|
||||
var result string
|
||||
for _, p := range f.PipelineConfig.ProcessorConfigs {
|
||||
processor, err := models.GetProcessorByType(p.Typ, p.Config)
|
||||
if err != nil {
|
||||
ginx.Bomb(http.StatusBadRequest, "get processor: %+v err: %+v", p, err)
|
||||
}
|
||||
event, result, err = processor.Process(rt.Ctx, event)
|
||||
if err != nil {
|
||||
ginx.Bomb(http.StatusBadRequest, "processor: %+v err: %+v", p, err)
|
||||
}
|
||||
me := c.MustGet("user").(*models.User)
|
||||
|
||||
if event == nil {
|
||||
ginx.NewRender(c).Data(map[string]interface{}{
|
||||
"event": event,
|
||||
"result": i18n.Sprintf(lang, "event is dropped"),
|
||||
}, nil)
|
||||
return
|
||||
}
|
||||
// 统一使用工作流引擎执行(兼容线性模式和工作流模式)
|
||||
workflowEngine := engine.NewWorkflowEngine(rt.Ctx)
|
||||
|
||||
triggerCtx := &models.WorkflowTriggerContext{
|
||||
Mode: models.TriggerModeAPI,
|
||||
TriggerBy: me.Username,
|
||||
EnvOverrides: f.EnvVariables,
|
||||
}
|
||||
|
||||
resultEvent, result, err := workflowEngine.Execute(&f.PipelineConfig, event, triggerCtx)
|
||||
if err != nil {
|
||||
ginx.Bomb(http.StatusBadRequest, "pipeline execute error: %v", err)
|
||||
}
|
||||
|
||||
m := map[string]interface{}{
|
||||
"event": event,
|
||||
"result": i18n.Sprintf(lang, result),
|
||||
"event": resultEvent,
|
||||
"result": i18n.Sprintf(lang, result.Message),
|
||||
"status": result.Status,
|
||||
"node_results": result.NodeResults,
|
||||
}
|
||||
|
||||
if resultEvent == nil {
|
||||
m["result"] = i18n.Sprintf(lang, "event is dropped")
|
||||
}
|
||||
|
||||
ginx.NewRender(c).Data(m, nil)
|
||||
}
|
||||
|
||||
@@ -186,14 +201,18 @@ func (rt *Router) tryRunEventProcessor(c *gin.Context) {
|
||||
if err != nil {
|
||||
ginx.Bomb(200, "get processor err: %+v", err)
|
||||
}
|
||||
event, res, err := processor.Process(rt.Ctx, event)
|
||||
wfCtx := &models.WorkflowContext{
|
||||
Event: event,
|
||||
Vars: make(map[string]interface{}),
|
||||
}
|
||||
wfCtx, res, err := processor.Process(rt.Ctx, wfCtx)
|
||||
if err != nil {
|
||||
ginx.Bomb(200, "processor err: %+v", err)
|
||||
}
|
||||
|
||||
lang := c.GetHeader("X-Language")
|
||||
ginx.NewRender(c).Data(map[string]interface{}{
|
||||
"event": event,
|
||||
"event": wfCtx.Event,
|
||||
"result": i18n.Sprintf(lang, res),
|
||||
}, nil)
|
||||
}
|
||||
@@ -223,6 +242,10 @@ func (rt *Router) tryRunEventProcessorByNotifyRule(c *gin.Context) {
|
||||
ginx.Bomb(http.StatusBadRequest, "processors not found")
|
||||
}
|
||||
|
||||
wfCtx := &models.WorkflowContext{
|
||||
Event: event,
|
||||
Vars: make(map[string]interface{}),
|
||||
}
|
||||
for _, pl := range pipelines {
|
||||
for _, p := range pl.ProcessorConfigs {
|
||||
processor, err := models.GetProcessorByType(p.Typ, p.Config)
|
||||
@@ -230,14 +253,14 @@ func (rt *Router) tryRunEventProcessorByNotifyRule(c *gin.Context) {
|
||||
ginx.Bomb(http.StatusBadRequest, "get processor: %+v err: %+v", p, err)
|
||||
}
|
||||
|
||||
event, _, err := processor.Process(rt.Ctx, event)
|
||||
wfCtx, _, err = processor.Process(rt.Ctx, wfCtx)
|
||||
if err != nil {
|
||||
ginx.Bomb(http.StatusBadRequest, "processor: %+v err: %+v", p, err)
|
||||
}
|
||||
if event == nil {
|
||||
if wfCtx == nil || wfCtx.Event == nil {
|
||||
lang := c.GetHeader("X-Language")
|
||||
ginx.NewRender(c).Data(map[string]interface{}{
|
||||
"event": event,
|
||||
"event": nil,
|
||||
"result": i18n.Sprintf(lang, "event is dropped"),
|
||||
}, nil)
|
||||
return
|
||||
@@ -245,10 +268,337 @@ func (rt *Router) tryRunEventProcessorByNotifyRule(c *gin.Context) {
|
||||
}
|
||||
}
|
||||
|
||||
ginx.NewRender(c).Data(event, nil)
|
||||
ginx.NewRender(c).Data(wfCtx.Event, nil)
|
||||
}
|
||||
|
||||
func (rt *Router) eventPipelinesListByService(c *gin.Context) {
|
||||
pipelines, err := models.ListEventPipelines(rt.Ctx)
|
||||
ginx.NewRender(c).Data(pipelines, err)
|
||||
}
|
||||
|
||||
type EventPipelineRequest struct {
|
||||
// 事件数据(可选,如果不传则使用空事件)
|
||||
Event *models.AlertCurEvent `json:"event,omitempty"`
|
||||
// 环境变量覆盖
|
||||
EnvOverrides map[string]string `json:"env_overrides,omitempty"`
|
||||
|
||||
Username string `json:"username,omitempty"`
|
||||
}
|
||||
|
||||
// executePipelineTrigger 执行 Pipeline 触发的公共逻辑
|
||||
func (rt *Router) executePipelineTrigger(pipeline *models.EventPipeline, req *EventPipelineRequest, triggerBy string) (string, error) {
|
||||
// 准备事件数据
|
||||
var event *models.AlertCurEvent
|
||||
if req.Event != nil {
|
||||
event = req.Event
|
||||
} else {
|
||||
// 创建空事件
|
||||
event = &models.AlertCurEvent{
|
||||
TriggerTime: time.Now().Unix(),
|
||||
}
|
||||
}
|
||||
|
||||
// 校验必填环境变量
|
||||
if err := pipeline.ValidateEnvVariables(req.EnvOverrides); err != nil {
|
||||
return "", fmt.Errorf("env validation failed: %v", err)
|
||||
}
|
||||
|
||||
// 生成执行ID
|
||||
executionID := uuid.New().String()
|
||||
|
||||
// 创建触发上下文
|
||||
triggerCtx := &models.WorkflowTriggerContext{
|
||||
Mode: models.TriggerModeAPI,
|
||||
TriggerBy: triggerBy,
|
||||
EnvOverrides: req.EnvOverrides,
|
||||
RequestID: executionID,
|
||||
}
|
||||
|
||||
// 异步执行工作流
|
||||
go func() {
|
||||
workflowEngine := engine.NewWorkflowEngine(rt.Ctx)
|
||||
_, _, err := workflowEngine.Execute(pipeline, event, triggerCtx)
|
||||
if err != nil {
|
||||
logger.Errorf("async workflow execute error: pipeline_id=%d execution_id=%s err=%v",
|
||||
pipeline.ID, executionID, err)
|
||||
}
|
||||
}()
|
||||
|
||||
return executionID, nil
|
||||
}
|
||||
|
||||
// triggerEventPipelineByService Service 调用触发工作流执行
|
||||
func (rt *Router) triggerEventPipelineByService(c *gin.Context) {
|
||||
pipelineID := ginx.UrlParamInt64(c, "id")
|
||||
var f EventPipelineRequest
|
||||
ginx.BindJSON(c, &f)
|
||||
|
||||
// 获取 Pipeline
|
||||
pipeline, err := models.GetEventPipeline(rt.Ctx, pipelineID)
|
||||
if err != nil {
|
||||
ginx.Bomb(http.StatusNotFound, "pipeline not found: %v", err)
|
||||
}
|
||||
|
||||
executionID, err := rt.executePipelineTrigger(pipeline, &f, f.Username)
|
||||
if err != nil {
|
||||
ginx.Bomb(http.StatusBadRequest, "%v", err)
|
||||
}
|
||||
|
||||
ginx.NewRender(c).Data(gin.H{
|
||||
"execution_id": executionID,
|
||||
"message": "workflow execution started",
|
||||
}, nil)
|
||||
}
|
||||
|
||||
// triggerEventPipelineByAPI API 触发工作流执行
|
||||
func (rt *Router) triggerEventPipelineByAPI(c *gin.Context) {
|
||||
pipelineID := ginx.UrlParamInt64(c, "id")
|
||||
var f EventPipelineRequest
|
||||
ginx.BindJSON(c, &f)
|
||||
|
||||
// 获取 Pipeline
|
||||
pipeline, err := models.GetEventPipeline(rt.Ctx, pipelineID)
|
||||
if err != nil {
|
||||
ginx.Bomb(http.StatusNotFound, "pipeline not found: %v", err)
|
||||
}
|
||||
|
||||
// 检查权限
|
||||
me := c.MustGet("user").(*models.User)
|
||||
ginx.Dangerous(me.CheckGroupPermission(rt.Ctx, pipeline.TeamIds))
|
||||
|
||||
executionID, err := rt.executePipelineTrigger(pipeline, &f, me.Username)
|
||||
if err != nil {
|
||||
ginx.Bomb(http.StatusBadRequest, err.Error())
|
||||
}
|
||||
|
||||
ginx.NewRender(c).Data(gin.H{
|
||||
"execution_id": executionID,
|
||||
"message": "workflow execution started",
|
||||
}, nil)
|
||||
}
|
||||
|
||||
func (rt *Router) listAllEventPipelineExecutions(c *gin.Context) {
|
||||
pipelineName := ginx.QueryStr(c, "pipeline_name", "")
|
||||
mode := ginx.QueryStr(c, "mode", "")
|
||||
status := ginx.QueryStr(c, "status", "")
|
||||
limit := ginx.QueryInt(c, "limit", 20)
|
||||
offset := ginx.QueryInt(c, "p", 1)
|
||||
|
||||
if limit <= 0 || limit > 1000 {
|
||||
limit = 20
|
||||
}
|
||||
if offset <= 0 {
|
||||
offset = 1
|
||||
}
|
||||
|
||||
executions, total, err := models.ListAllEventPipelineExecutions(rt.Ctx, pipelineName, mode, status, limit, (offset-1)*limit)
|
||||
ginx.Dangerous(err)
|
||||
|
||||
ginx.NewRender(c).Data(gin.H{
|
||||
"list": executions,
|
||||
"total": total,
|
||||
}, nil)
|
||||
}
|
||||
|
||||
func (rt *Router) listEventPipelineExecutions(c *gin.Context) {
|
||||
pipelineID := ginx.UrlParamInt64(c, "id")
|
||||
mode := ginx.QueryStr(c, "mode", "")
|
||||
status := ginx.QueryStr(c, "status", "")
|
||||
limit := ginx.QueryInt(c, "limit", 20)
|
||||
offset := ginx.QueryInt(c, "p", 1)
|
||||
|
||||
if limit <= 0 || limit > 1000 {
|
||||
limit = 20
|
||||
}
|
||||
if offset <= 0 {
|
||||
offset = 1
|
||||
}
|
||||
|
||||
executions, total, err := models.ListEventPipelineExecutions(rt.Ctx, pipelineID, mode, status, limit, (offset-1)*limit)
|
||||
ginx.Dangerous(err)
|
||||
|
||||
ginx.NewRender(c).Data(gin.H{
|
||||
"list": executions,
|
||||
"total": total,
|
||||
}, nil)
|
||||
}
|
||||
|
||||
func (rt *Router) getEventPipelineExecution(c *gin.Context) {
|
||||
execID := ginx.UrlParamStr(c, "exec_id")
|
||||
|
||||
detail, err := models.GetEventPipelineExecutionDetail(rt.Ctx, execID)
|
||||
if err != nil {
|
||||
ginx.Bomb(http.StatusNotFound, "execution not found: %v", err)
|
||||
}
|
||||
|
||||
ginx.NewRender(c).Data(detail, nil)
|
||||
}
|
||||
|
||||
func (rt *Router) getEventPipelineExecutionStats(c *gin.Context) {
|
||||
pipelineID := ginx.UrlParamInt64(c, "id")
|
||||
|
||||
stats, err := models.GetEventPipelineExecutionStatistics(rt.Ctx, pipelineID)
|
||||
ginx.Dangerous(err)
|
||||
|
||||
ginx.NewRender(c).Data(stats, nil)
|
||||
}
|
||||
|
||||
func (rt *Router) cleanEventPipelineExecutions(c *gin.Context) {
|
||||
var f struct {
|
||||
BeforeDays int `json:"before_days"`
|
||||
}
|
||||
ginx.BindJSON(c, &f)
|
||||
|
||||
if f.BeforeDays <= 0 {
|
||||
f.BeforeDays = 30
|
||||
}
|
||||
|
||||
beforeTime := time.Now().AddDate(0, 0, -f.BeforeDays).Unix()
|
||||
affected, err := models.DeleteEventPipelineExecutions(rt.Ctx, beforeTime)
|
||||
ginx.Dangerous(err)
|
||||
|
||||
ginx.NewRender(c).Data(gin.H{
|
||||
"deleted": affected,
|
||||
}, nil)
|
||||
}
|
||||
|
||||
func (rt *Router) streamEventPipeline(c *gin.Context) {
|
||||
pipelineID := ginx.UrlParamInt64(c, "id")
|
||||
|
||||
var f EventPipelineRequest
|
||||
ginx.BindJSON(c, &f)
|
||||
|
||||
pipeline, err := models.GetEventPipeline(rt.Ctx, pipelineID)
|
||||
if err != nil {
|
||||
ginx.Bomb(http.StatusNotFound, "pipeline not found: %v", err)
|
||||
}
|
||||
|
||||
me := c.MustGet("user").(*models.User)
|
||||
ginx.Dangerous(me.CheckGroupPermission(rt.Ctx, pipeline.TeamIds))
|
||||
|
||||
var event *models.AlertCurEvent
|
||||
if f.Event != nil {
|
||||
event = f.Event
|
||||
} else {
|
||||
event = &models.AlertCurEvent{
|
||||
TriggerTime: time.Now().Unix(),
|
||||
}
|
||||
}
|
||||
|
||||
triggerCtx := &models.WorkflowTriggerContext{
|
||||
Mode: models.TriggerModeAPI,
|
||||
TriggerBy: me.Username,
|
||||
EnvOverrides: f.EnvOverrides,
|
||||
RequestID: uuid.New().String(),
|
||||
Stream: true, // 流式端点强制启用流式输出
|
||||
}
|
||||
|
||||
workflowEngine := engine.NewWorkflowEngine(rt.Ctx)
|
||||
_, result, err := workflowEngine.Execute(pipeline, event, triggerCtx)
|
||||
if err != nil {
|
||||
ginx.Bomb(http.StatusInternalServerError, "execute failed: %v", err)
|
||||
}
|
||||
|
||||
if result.Stream && result.StreamChan != nil {
|
||||
rt.handleStreamResponse(c, result, triggerCtx.RequestID)
|
||||
return
|
||||
}
|
||||
|
||||
ginx.NewRender(c).Data(result, nil)
|
||||
}
|
||||
|
||||
func (rt *Router) handleStreamResponse(c *gin.Context, result *models.WorkflowResult, requestID string) {
|
||||
// 设置 SSE 响应头
|
||||
c.Header("Content-Type", "text/event-stream")
|
||||
c.Header("Cache-Control", "no-cache")
|
||||
c.Header("Connection", "keep-alive")
|
||||
c.Header("X-Accel-Buffering", "no") // 禁用 nginx 缓冲
|
||||
c.Header("X-Request-ID", requestID)
|
||||
|
||||
flusher, ok := c.Writer.(http.Flusher)
|
||||
if !ok {
|
||||
ginx.Bomb(http.StatusInternalServerError, "streaming not supported")
|
||||
return
|
||||
}
|
||||
|
||||
// 发送初始连接成功消息
|
||||
initData := fmt.Sprintf(`{"type":"connected","request_id":"%s","timestamp":%d}`, requestID, time.Now().UnixMilli())
|
||||
fmt.Fprintf(c.Writer, "data: %s\n\n", initData)
|
||||
flusher.Flush()
|
||||
|
||||
// 从 channel 读取并发送 SSE
|
||||
timeout := time.After(30 * time.Minute) // 最长流式输出时间
|
||||
for {
|
||||
select {
|
||||
case chunk, ok := <-result.StreamChan:
|
||||
if !ok {
|
||||
// channel 关闭,发送结束标记
|
||||
return
|
||||
}
|
||||
|
||||
data, err := json.Marshal(chunk)
|
||||
if err != nil {
|
||||
logger.Errorf("stream: failed to marshal chunk: %v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
fmt.Fprintf(c.Writer, "data: %s\n\n", data)
|
||||
flusher.Flush()
|
||||
|
||||
if chunk.Done {
|
||||
return
|
||||
}
|
||||
|
||||
case <-c.Request.Context().Done():
|
||||
// 客户端断开连接
|
||||
logger.Infof("stream: client disconnected, request_id=%s", requestID)
|
||||
return
|
||||
case <-timeout:
|
||||
logger.Errorf("stream: timeout, request_id=%s", requestID)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (rt *Router) streamEventPipelineByService(c *gin.Context) {
|
||||
pipelineID := ginx.UrlParamInt64(c, "id")
|
||||
|
||||
var f EventPipelineRequest
|
||||
ginx.BindJSON(c, &f)
|
||||
|
||||
pipeline, err := models.GetEventPipeline(rt.Ctx, pipelineID)
|
||||
if err != nil {
|
||||
ginx.Bomb(http.StatusNotFound, "pipeline not found: %v", err)
|
||||
}
|
||||
|
||||
var event *models.AlertCurEvent
|
||||
if f.Event != nil {
|
||||
event = f.Event
|
||||
} else {
|
||||
event = &models.AlertCurEvent{
|
||||
TriggerTime: time.Now().Unix(),
|
||||
}
|
||||
}
|
||||
|
||||
triggerCtx := &models.WorkflowTriggerContext{
|
||||
Mode: models.TriggerModeAPI,
|
||||
TriggerBy: f.Username,
|
||||
EnvOverrides: f.EnvOverrides,
|
||||
RequestID: uuid.New().String(),
|
||||
Stream: true, // 流式端点强制启用流式输出
|
||||
}
|
||||
|
||||
workflowEngine := engine.NewWorkflowEngine(rt.Ctx)
|
||||
_, result, err := workflowEngine.Execute(pipeline, event, triggerCtx)
|
||||
if err != nil {
|
||||
ginx.Bomb(http.StatusInternalServerError, "execute failed: %v", err)
|
||||
}
|
||||
|
||||
// 检查是否是流式输出
|
||||
if result.Stream && result.StreamChan != nil {
|
||||
rt.handleStreamResponse(c, result, triggerCtx.RequestID)
|
||||
return
|
||||
}
|
||||
|
||||
ginx.NewRender(c).Data(result, nil)
|
||||
}
|
||||
|
||||
@@ -294,7 +294,40 @@ ALTER TABLE `alert_rule` ADD COLUMN `pipeline_configs` text COMMENT 'pipeline co
|
||||
ALTER TABLE `board` ADD COLUMN `note` varchar(1024) not null default '' comment 'note';
|
||||
ALTER TABLE `builtin_payloads` ADD COLUMN `note` varchar(1024) not null default '' comment 'note of payload';
|
||||
|
||||
/* v9 2026-01-09 */
|
||||
ALTER TABLE `event_pipeline` ADD COLUMN `typ` varchar(128) NOT NULL DEFAULT '' COMMENT 'pipeline type: builtin, user-defined';
|
||||
ALTER TABLE `event_pipeline` ADD COLUMN `use_case` varchar(128) NOT NULL DEFAULT '' COMMENT 'use case: metric_explorer, event_summary, event_pipeline';
|
||||
ALTER TABLE `event_pipeline` ADD COLUMN `trigger_mode` varchar(128) NOT NULL DEFAULT 'event' COMMENT 'trigger mode: event, api, cron';
|
||||
ALTER TABLE `event_pipeline` ADD COLUMN `disabled` tinyint(1) NOT NULL DEFAULT 0 COMMENT 'disabled flag';
|
||||
ALTER TABLE `event_pipeline` ADD COLUMN `nodes` text COMMENT 'workflow nodes (JSON)';
|
||||
ALTER TABLE `event_pipeline` ADD COLUMN `connections` text COMMENT 'node connections (JSON)';
|
||||
ALTER TABLE `event_pipeline` ADD COLUMN `env_variables` text COMMENT 'environment variables (JSON)';
|
||||
ALTER TABLE `event_pipeline` ADD COLUMN `label_filters` text COMMENT 'label filters (JSON)';
|
||||
|
||||
CREATE TABLE `event_pipeline_execution` (
|
||||
`id` varchar(36) NOT NULL COMMENT 'execution id',
|
||||
`pipeline_id` bigint NOT NULL COMMENT 'pipeline id',
|
||||
`pipeline_name` varchar(128) DEFAULT '' COMMENT 'pipeline name snapshot',
|
||||
`event_id` bigint DEFAULT 0 COMMENT 'related alert event id',
|
||||
`mode` varchar(16) NOT NULL DEFAULT 'event' COMMENT 'trigger mode: event/api/cron',
|
||||
`status` varchar(16) NOT NULL DEFAULT 'running' COMMENT 'status: running/success/failed',
|
||||
`node_results` mediumtext COMMENT 'node execution results (JSON)',
|
||||
`error_message` varchar(1024) DEFAULT '' COMMENT 'error message',
|
||||
`error_node` varchar(36) DEFAULT '' COMMENT 'error node id',
|
||||
`created_at` bigint NOT NULL DEFAULT 0 COMMENT 'start timestamp',
|
||||
`finished_at` bigint DEFAULT 0 COMMENT 'finish timestamp',
|
||||
`duration_ms` bigint DEFAULT 0 COMMENT 'duration in milliseconds',
|
||||
`trigger_by` varchar(64) DEFAULT '' COMMENT 'trigger by',
|
||||
`env_snapshot` text COMMENT 'environment variables snapshot (sanitized)',
|
||||
PRIMARY KEY (`id`),
|
||||
KEY `idx_pipeline_id` (`pipeline_id`),
|
||||
KEY `idx_event_id` (`event_id`),
|
||||
KEY `idx_mode` (`mode`),
|
||||
KEY `idx_status` (`status`),
|
||||
KEY `idx_created_at` (`created_at`)
|
||||
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='event pipeline execution records';
|
||||
|
||||
/* v8.5.0 builtin_metrics new fields */
|
||||
ALTER TABLE `builtin_metrics` ADD COLUMN `expression_type` varchar(32) NOT NULL DEFAULT 'promql' COMMENT 'expression type: metric_name or promql';
|
||||
ALTER TABLE `builtin_metrics` ADD COLUMN `metric_type` varchar(191) NOT NULL DEFAULT '' COMMENT 'metric type like counter/gauge';
|
||||
ALTER TABLE `builtin_metrics` ADD COLUMN `extra_fields` text COMMENT 'custom extra fields';
|
||||
ALTER TABLE `builtin_metrics` ADD COLUMN `extra_fields` text COMMENT 'custom extra fields';
|
||||
|
||||
@@ -68,18 +68,6 @@ func (epc *EventProcessorCacheType) Get(processorId int64) *models.EventPipeline
|
||||
return epc.eventPipelines[processorId]
|
||||
}
|
||||
|
||||
func (epc *EventProcessorCacheType) GetProcessorsById(processorId int64) []models.Processor {
|
||||
epc.RLock()
|
||||
defer epc.RUnlock()
|
||||
|
||||
eventPipeline, ok := epc.eventPipelines[processorId]
|
||||
if !ok {
|
||||
return []models.Processor{}
|
||||
}
|
||||
|
||||
return eventPipeline.Processors
|
||||
}
|
||||
|
||||
func (epc *EventProcessorCacheType) GetProcessorIds() []int64 {
|
||||
epc.RLock()
|
||||
defer epc.RUnlock()
|
||||
@@ -137,18 +125,7 @@ func (epc *EventProcessorCacheType) syncEventProcessors() error {
|
||||
|
||||
m := make(map[int64]*models.EventPipeline)
|
||||
for i := 0; i < len(lst); i++ {
|
||||
eventPipeline := lst[i]
|
||||
for _, p := range eventPipeline.ProcessorConfigs {
|
||||
processor, err := models.GetProcessorByType(p.Typ, p.Config)
|
||||
if err != nil {
|
||||
logger.Warningf("event_pipeline_id: %d, event:%+v, processor:%+v get processor err: %+v", eventPipeline.ID, eventPipeline, p, err)
|
||||
continue
|
||||
}
|
||||
|
||||
eventPipeline.Processors = append(eventPipeline.Processors, processor)
|
||||
}
|
||||
|
||||
m[lst[i].ID] = eventPipeline
|
||||
m[lst[i].ID] = lst[i]
|
||||
}
|
||||
|
||||
epc.Set(m, stat.Total, stat.LastUpdated)
|
||||
|
||||
@@ -13,6 +13,10 @@ import (
|
||||
type EventPipeline struct {
|
||||
ID int64 `json:"id" gorm:"primaryKey"`
|
||||
Name string `json:"name" gorm:"type:varchar(128)"`
|
||||
Typ string `json:"typ" gorm:"type:varchar(128)"` // builtin, user-defined // event_pipeline, event_summary, metric_explorer
|
||||
UseCase string `json:"use_case" gorm:"type:varchar(128)"` // metric_explorer, event_summary, event_pipeline
|
||||
TriggerMode string `json:"trigger_mode" gorm:"type:varchar(128)"` // event, api, cron
|
||||
Disabled bool `json:"disabled" gorm:"type:boolean"`
|
||||
TeamIds []int64 `json:"team_ids" gorm:"type:text;serializer:json"`
|
||||
TeamNames []string `json:"team_names" gorm:"-"`
|
||||
Description string `json:"description" gorm:"type:varchar(255)"`
|
||||
@@ -20,12 +24,18 @@ type EventPipeline struct {
|
||||
LabelFilters []TagFilter `json:"label_filters" gorm:"type:text;serializer:json"`
|
||||
AttrFilters []TagFilter `json:"attribute_filters" gorm:"type:text;serializer:json"`
|
||||
ProcessorConfigs []ProcessorConfig `json:"processors" gorm:"type:text;serializer:json"`
|
||||
CreateAt int64 `json:"create_at" gorm:"type:bigint"`
|
||||
CreateBy string `json:"create_by" gorm:"type:varchar(64)"`
|
||||
UpdateAt int64 `json:"update_at" gorm:"type:bigint"`
|
||||
UpdateBy string `json:"update_by" gorm:"type:varchar(64)"`
|
||||
|
||||
Processors []Processor `json:"-" gorm:"-"`
|
||||
// 工作流节点列表
|
||||
Nodes []WorkflowNode `json:"nodes,omitempty" gorm:"type:text;serializer:json"`
|
||||
// 节点连接关系
|
||||
Connections Connections `json:"connections,omitempty" gorm:"type:text;serializer:json"`
|
||||
// 环境变量(工作流级别的配置变量)
|
||||
EnvVariables []EnvVariable `json:"env_variables,omitempty" gorm:"type:text;serializer:json"`
|
||||
|
||||
CreateAt int64 `json:"create_at" gorm:"type:bigint"`
|
||||
CreateBy string `json:"create_by" gorm:"type:varchar(64)"`
|
||||
UpdateAt int64 `json:"update_at" gorm:"type:bigint"`
|
||||
UpdateBy string `json:"update_by" gorm:"type:varchar(64)"`
|
||||
}
|
||||
|
||||
type ProcessorConfig struct {
|
||||
@@ -46,9 +56,6 @@ func (e *EventPipeline) Verify() error {
|
||||
return errors.New("team_ids cannot be empty")
|
||||
}
|
||||
|
||||
if len(e.TeamIds) == 0 {
|
||||
e.TeamIds = make([]int64, 0)
|
||||
}
|
||||
if len(e.LabelFilters) == 0 {
|
||||
e.LabelFilters = make([]TagFilter, 0)
|
||||
}
|
||||
@@ -59,6 +66,17 @@ func (e *EventPipeline) Verify() error {
|
||||
e.ProcessorConfigs = make([]ProcessorConfig, 0)
|
||||
}
|
||||
|
||||
// 初始化空数组,避免 null
|
||||
if e.Nodes == nil {
|
||||
e.Nodes = make([]WorkflowNode, 0)
|
||||
}
|
||||
if e.Connections == nil {
|
||||
e.Connections = make(Connections)
|
||||
}
|
||||
if e.EnvVariables == nil {
|
||||
e.EnvVariables = make([]EnvVariable, 0)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -176,3 +194,87 @@ func EventPipelineStatistics(ctx *ctx.Context) (*Statistics, error) {
|
||||
|
||||
return stats[0], nil
|
||||
}
|
||||
|
||||
// 无论是新格式还是旧格式,都返回统一的 []WorkflowNode
|
||||
func (e *EventPipeline) GetWorkflowNodes() []WorkflowNode {
|
||||
// 优先使用新格式
|
||||
if len(e.Nodes) > 0 {
|
||||
return e.Nodes
|
||||
}
|
||||
|
||||
// 兼容旧格式:将 ProcessorConfigs 转换为 WorkflowNode
|
||||
nodes := make([]WorkflowNode, len(e.ProcessorConfigs))
|
||||
for i, pc := range e.ProcessorConfigs {
|
||||
nodeID := fmt.Sprintf("node_%d", i)
|
||||
nodeName := pc.Typ
|
||||
|
||||
nodes[i] = WorkflowNode{
|
||||
ID: nodeID,
|
||||
Name: nodeName,
|
||||
Type: pc.Typ,
|
||||
Config: pc.Config,
|
||||
}
|
||||
}
|
||||
return nodes
|
||||
}
|
||||
|
||||
func (e *EventPipeline) GetWorkflowConnections() Connections {
|
||||
// 优先使用显式定义的连接
|
||||
if len(e.Connections) > 0 {
|
||||
return e.Connections
|
||||
}
|
||||
|
||||
// 自动生成线性连接:node_0 → node_1 → node_2 → ...
|
||||
nodes := e.GetWorkflowNodes()
|
||||
conns := make(Connections)
|
||||
|
||||
for i := 0; i < len(nodes)-1; i++ {
|
||||
conns[nodes[i].ID] = NodeConnections{
|
||||
Main: [][]ConnectionTarget{
|
||||
{{Node: nodes[i+1].ID, Type: "main", Index: 0}},
|
||||
},
|
||||
}
|
||||
}
|
||||
return conns
|
||||
}
|
||||
|
||||
func (e *EventPipeline) FillWorkflowFields() {
|
||||
if len(e.Nodes) == 0 && len(e.ProcessorConfigs) > 0 {
|
||||
e.Nodes = e.GetWorkflowNodes()
|
||||
e.Connections = e.GetWorkflowConnections()
|
||||
}
|
||||
}
|
||||
|
||||
func (e *EventPipeline) GetEnvMap() map[string]string {
|
||||
envMap := make(map[string]string)
|
||||
for _, v := range e.EnvVariables {
|
||||
envMap[v.Key] = v.Value
|
||||
}
|
||||
return envMap
|
||||
}
|
||||
|
||||
func (e *EventPipeline) GetSecretKeys() map[string]bool {
|
||||
secretKeys := make(map[string]bool)
|
||||
for _, v := range e.EnvVariables {
|
||||
if v.Secret {
|
||||
secretKeys[v.Key] = true
|
||||
}
|
||||
}
|
||||
return secretKeys
|
||||
}
|
||||
|
||||
func (e *EventPipeline) ValidateEnvVariables(overrides map[string]string) error {
|
||||
// 合并默认值和覆盖值
|
||||
merged := e.GetEnvMap()
|
||||
for k, v := range overrides {
|
||||
merged[k] = v
|
||||
}
|
||||
|
||||
// 校验必填项
|
||||
for _, v := range e.EnvVariables {
|
||||
if v.Required && merged[v.Key] == "" {
|
||||
return fmt.Errorf("required env variable %s is missing", v.Key)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
282
models/event_pipeline_execution.go
Normal file
282
models/event_pipeline_execution.go
Normal file
@@ -0,0 +1,282 @@
|
||||
package models
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
)
|
||||
|
||||
// 执行状态常量
|
||||
const (
|
||||
ExecutionStatusRunning = "running"
|
||||
ExecutionStatusSuccess = "success"
|
||||
ExecutionStatusFailed = "failed"
|
||||
)
|
||||
|
||||
// EventPipelineExecution 工作流执行记录
|
||||
type EventPipelineExecution struct {
|
||||
ID string `json:"id" gorm:"primaryKey;type:varchar(36)"`
|
||||
PipelineID int64 `json:"pipeline_id" gorm:"index"`
|
||||
PipelineName string `json:"pipeline_name" gorm:"type:varchar(128)"`
|
||||
EventID int64 `json:"event_id" gorm:"index"`
|
||||
|
||||
// 触发模式:event(告警触发)、api(API触发)、cron(定时触发)
|
||||
Mode string `json:"mode" gorm:"type:varchar(16);index"`
|
||||
|
||||
// 状态:running、success、failed
|
||||
Status string `json:"status" gorm:"type:varchar(16);index"`
|
||||
|
||||
// 各节点执行结果(JSON)
|
||||
NodeResults string `json:"node_results" gorm:"type:mediumtext"`
|
||||
|
||||
// 错误信息
|
||||
ErrorMessage string `json:"error_message" gorm:"type:varchar(1024)"`
|
||||
ErrorNode string `json:"error_node" gorm:"type:varchar(36)"`
|
||||
|
||||
// 时间
|
||||
CreatedAt int64 `json:"created_at" gorm:"index"`
|
||||
FinishedAt int64 `json:"finished_at"`
|
||||
DurationMs int64 `json:"duration_ms"`
|
||||
|
||||
// 触发者信息
|
||||
TriggerBy string `json:"trigger_by" gorm:"type:varchar(64)"`
|
||||
|
||||
// 环境变量快照(脱敏后存储)
|
||||
EnvSnapshot string `json:"env_snapshot,omitempty" gorm:"type:text"`
|
||||
}
|
||||
|
||||
func (e *EventPipelineExecution) TableName() string {
|
||||
return "event_pipeline_execution"
|
||||
}
|
||||
|
||||
// SetNodeResults 设置节点执行结果(序列化为 JSON)
|
||||
func (e *EventPipelineExecution) SetNodeResults(results []*NodeExecutionResult) error {
|
||||
data, err := json.Marshal(results)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
e.NodeResults = string(data)
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetNodeResults 获取节点执行结果(反序列化)
|
||||
func (e *EventPipelineExecution) GetNodeResults() ([]*NodeExecutionResult, error) {
|
||||
if e.NodeResults == "" {
|
||||
return nil, nil
|
||||
}
|
||||
var results []*NodeExecutionResult
|
||||
err := json.Unmarshal([]byte(e.NodeResults), &results)
|
||||
return results, err
|
||||
}
|
||||
|
||||
// SetEnvSnapshot 设置环境变量快照(脱敏后存储)
|
||||
func (e *EventPipelineExecution) SetEnvSnapshot(env map[string]string) error {
|
||||
data, err := json.Marshal(env)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
e.EnvSnapshot = string(data)
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetEnvSnapshot 获取环境变量快照
|
||||
func (e *EventPipelineExecution) GetEnvSnapshot() (map[string]string, error) {
|
||||
if e.EnvSnapshot == "" {
|
||||
return nil, nil
|
||||
}
|
||||
var env map[string]string
|
||||
err := json.Unmarshal([]byte(e.EnvSnapshot), &env)
|
||||
return env, err
|
||||
}
|
||||
|
||||
// CreateEventPipelineExecution 创建执行记录
|
||||
func CreateEventPipelineExecution(c *ctx.Context, execution *EventPipelineExecution) error {
|
||||
return DB(c).Create(execution).Error
|
||||
}
|
||||
|
||||
// UpdateEventPipelineExecution 更新执行记录
|
||||
func UpdateEventPipelineExecution(c *ctx.Context, execution *EventPipelineExecution) error {
|
||||
return DB(c).Save(execution).Error
|
||||
}
|
||||
|
||||
// GetEventPipelineExecution 获取单条执行记录
|
||||
func GetEventPipelineExecution(c *ctx.Context, id string) (*EventPipelineExecution, error) {
|
||||
var execution EventPipelineExecution
|
||||
err := DB(c).Where("id = ?", id).First(&execution).Error
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &execution, nil
|
||||
}
|
||||
|
||||
// ListEventPipelineExecutions 获取 Pipeline 的执行记录列表
|
||||
func ListEventPipelineExecutions(c *ctx.Context, pipelineID int64, mode, status string, limit, offset int) ([]*EventPipelineExecution, int64, error) {
|
||||
var executions []*EventPipelineExecution
|
||||
var total int64
|
||||
|
||||
session := DB(c).Model(&EventPipelineExecution{}).Where("pipeline_id = ?", pipelineID)
|
||||
|
||||
if mode != "" {
|
||||
session = session.Where("mode = ?", mode)
|
||||
}
|
||||
if status != "" {
|
||||
session = session.Where("status = ?", status)
|
||||
}
|
||||
|
||||
err := session.Count(&total).Error
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
err = session.Order("created_at desc").Limit(limit).Offset(offset).Find(&executions).Error
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
return executions, total, nil
|
||||
}
|
||||
|
||||
// ListEventPipelineExecutionsByEventID 根据事件ID获取执行记录
|
||||
func ListEventPipelineExecutionsByEventID(c *ctx.Context, eventID int64) ([]*EventPipelineExecution, error) {
|
||||
var executions []*EventPipelineExecution
|
||||
err := DB(c).Where("event_id = ?", eventID).Order("created_at desc").Find(&executions).Error
|
||||
return executions, err
|
||||
}
|
||||
|
||||
// ListAllEventPipelineExecutions 获取所有 Pipeline 的执行记录列表
|
||||
func ListAllEventPipelineExecutions(c *ctx.Context, pipelineName, mode, status string, limit, offset int) ([]*EventPipelineExecution, int64, error) {
|
||||
var executions []*EventPipelineExecution
|
||||
var total int64
|
||||
|
||||
session := DB(c).Model(&EventPipelineExecution{})
|
||||
|
||||
if pipelineName != "" {
|
||||
session = session.Where("pipeline_name LIKE ?", "%"+pipelineName+"%")
|
||||
}
|
||||
if mode != "" {
|
||||
session = session.Where("mode = ?", mode)
|
||||
}
|
||||
if status != "" {
|
||||
session = session.Where("status = ?", status)
|
||||
}
|
||||
|
||||
err := session.Count(&total).Error
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
err = session.Order("created_at desc").Limit(limit).Offset(offset).Find(&executions).Error
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
return executions, total, nil
|
||||
}
|
||||
|
||||
// DeleteEventPipelineExecutions 批量删除执行记录(按时间)
|
||||
func DeleteEventPipelineExecutions(c *ctx.Context, beforeTime int64) (int64, error) {
|
||||
result := DB(c).Where("created_at < ?", beforeTime).Delete(&EventPipelineExecution{})
|
||||
return result.RowsAffected, result.Error
|
||||
}
|
||||
|
||||
// DeleteEventPipelineExecutionsByPipelineID 删除指定 Pipeline 的所有执行记录
|
||||
func DeleteEventPipelineExecutionsByPipelineID(c *ctx.Context, pipelineID int64) error {
|
||||
return DB(c).Where("pipeline_id = ?", pipelineID).Delete(&EventPipelineExecution{}).Error
|
||||
}
|
||||
|
||||
// EventPipelineExecutionStatistics 执行统计
|
||||
type EventPipelineExecutionStatistics struct {
|
||||
Total int64 `json:"total"`
|
||||
Success int64 `json:"success"`
|
||||
Failed int64 `json:"failed"`
|
||||
Running int64 `json:"running"`
|
||||
AvgDurMs int64 `json:"avg_duration_ms"`
|
||||
LastRunAt int64 `json:"last_run_at"`
|
||||
}
|
||||
|
||||
// GetEventPipelineExecutionStatistics 获取执行统计信息
|
||||
func GetEventPipelineExecutionStatistics(c *ctx.Context, pipelineID int64) (*EventPipelineExecutionStatistics, error) {
|
||||
var stats EventPipelineExecutionStatistics
|
||||
|
||||
// 总数
|
||||
err := DB(c).Model(&EventPipelineExecution{}).Where("pipeline_id = ?", pipelineID).Count(&stats.Total).Error
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// 成功数
|
||||
err = DB(c).Model(&EventPipelineExecution{}).Where("pipeline_id = ? AND status = ?", pipelineID, ExecutionStatusSuccess).Count(&stats.Success).Error
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// 失败数
|
||||
err = DB(c).Model(&EventPipelineExecution{}).Where("pipeline_id = ? AND status = ?", pipelineID, ExecutionStatusFailed).Count(&stats.Failed).Error
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// 运行中
|
||||
err = DB(c).Model(&EventPipelineExecution{}).Where("pipeline_id = ? AND status = ?", pipelineID, ExecutionStatusRunning).Count(&stats.Running).Error
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// 平均耗时
|
||||
var avgDur struct {
|
||||
AvgDur float64 `gorm:"column:avg_dur"`
|
||||
}
|
||||
err = DB(c).Model(&EventPipelineExecution{}).
|
||||
Select("AVG(duration_ms) as avg_dur").
|
||||
Where("pipeline_id = ? AND status = ?", pipelineID, ExecutionStatusSuccess).
|
||||
Scan(&avgDur).Error
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
stats.AvgDurMs = int64(avgDur.AvgDur)
|
||||
|
||||
// 最后执行时间
|
||||
var lastExec EventPipelineExecution
|
||||
err = DB(c).Where("pipeline_id = ?", pipelineID).Order("created_at desc").First(&lastExec).Error
|
||||
if err == nil {
|
||||
stats.LastRunAt = lastExec.CreatedAt
|
||||
}
|
||||
|
||||
return &stats, nil
|
||||
}
|
||||
|
||||
// EventPipelineExecutionDetail 执行详情(包含解析后的节点结果)
|
||||
type EventPipelineExecutionDetail struct {
|
||||
EventPipelineExecution
|
||||
NodeResultsParsed []*NodeExecutionResult `json:"node_results_parsed"`
|
||||
EnvSnapshotParsed map[string]string `json:"env_snapshot_parsed"`
|
||||
}
|
||||
|
||||
// GetEventPipelineExecutionDetail 获取执行详情
|
||||
func GetEventPipelineExecutionDetail(c *ctx.Context, id string) (*EventPipelineExecutionDetail, error) {
|
||||
execution, err := GetEventPipelineExecution(c, id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
detail := &EventPipelineExecutionDetail{
|
||||
EventPipelineExecution: *execution,
|
||||
}
|
||||
|
||||
// 解析节点结果
|
||||
nodeResults, err := execution.GetNodeResults()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("parse node results error: %w", err)
|
||||
}
|
||||
detail.NodeResultsParsed = nodeResults
|
||||
|
||||
// 解析环境变量快照
|
||||
envSnapshot, err := execution.GetEnvSnapshot()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("parse env snapshot error: %w", err)
|
||||
}
|
||||
detail.EnvSnapshotParsed = envSnapshot
|
||||
|
||||
return detail, nil
|
||||
}
|
||||
@@ -9,11 +9,21 @@ import (
|
||||
|
||||
type Processor interface {
|
||||
Init(settings interface{}) (Processor, error) // 初始化配置
|
||||
Process(ctx *ctx.Context, event *AlertCurEvent) (*AlertCurEvent, string, error)
|
||||
Process(ctx *ctx.Context, wfCtx *WorkflowContext) (*WorkflowContext, string, error)
|
||||
// 处理器有三种情况:
|
||||
// 1. 处理成功,返回处理后的事件
|
||||
// 2. 处理成功,不需要返回处理后端事件,只返回处理结果,将处理结果放到 string 中,比如 eventdrop callback 处理器
|
||||
// 1. 处理成功,返回处理后的 WorkflowContext
|
||||
// 2. 处理成功,不需要返回处理后的上下文,只返回处理结果,将处理结果放到 string 中,比如 eventdrop callback 处理器
|
||||
// 3. 处理失败,返回错误,将错误放到 error 中
|
||||
// WorkflowContext 包含:Event(事件)、Env(环境变量/输入参数)、Metadata(执行元数据)
|
||||
}
|
||||
|
||||
// BranchProcessor 分支处理器接口
|
||||
// 用于 if、switch、foreach 等需要返回分支索引或特殊输出的处理器
|
||||
type BranchProcessor interface {
|
||||
Processor
|
||||
// ProcessWithBranch 处理事件并返回 NodeOutput
|
||||
// NodeOutput 包含:处理后的上下文、消息、是否终止、分支索引
|
||||
ProcessWithBranch(ctx *ctx.Context, wfCtx *WorkflowContext) (*NodeOutput, error)
|
||||
}
|
||||
|
||||
type NewProcessorFn func(settings interface{}) (Processor, error)
|
||||
|
||||
@@ -68,7 +68,7 @@ func MigrateTables(db *gorm.DB) error {
|
||||
&Board{}, &BoardBusigroup{}, &Users{}, &SsoConfig{}, &models.BuiltinMetric{},
|
||||
&models.MetricFilter{}, &models.NotificationRecord{}, &models.TargetBusiGroup{},
|
||||
&models.UserToken{}, &models.DashAnnotation{}, MessageTemplate{}, NotifyRule{}, NotifyChannelConfig{}, &EsIndexPatternMigrate{},
|
||||
&models.EventPipeline{}, &models.EmbeddedProduct{}, &models.SourceToken{},
|
||||
&models.EventPipeline{}, &models.EventPipelineExecution{}, &models.EmbeddedProduct{}, &models.SourceToken{},
|
||||
&models.SavedView{}, &models.UserViewFavorite{}}
|
||||
|
||||
if isPostgres(db) {
|
||||
|
||||
160
models/workflow.go
Normal file
160
models/workflow.go
Normal file
@@ -0,0 +1,160 @@
|
||||
package models
|
||||
|
||||
// WorkflowNode 工作流节点
|
||||
type WorkflowNode struct {
|
||||
ID string `json:"id"` // 节点唯一ID
|
||||
Name string `json:"name"` // 显示名称
|
||||
Type string `json:"type"` // 节点类型(对应 Processor typ)
|
||||
Position []float64 `json:"position,omitempty"` // [x, y] UI位置
|
||||
Config interface{} `json:"config"` // 节点配置
|
||||
|
||||
// 执行控制
|
||||
Disabled bool `json:"disabled,omitempty"`
|
||||
ContinueOnFail bool `json:"continue_on_fail,omitempty"`
|
||||
RetryOnFail bool `json:"retry_on_fail,omitempty"`
|
||||
MaxRetries int `json:"max_retries,omitempty"`
|
||||
RetryInterval int `json:"retry_interval,omitempty"` // 秒
|
||||
}
|
||||
|
||||
// Connections 节点连接关系 map[源节点ID]NodeConnections
|
||||
type Connections map[string]NodeConnections
|
||||
|
||||
// NodeConnections 单个节点的输出连接
|
||||
type NodeConnections struct {
|
||||
// Main 输出端口的连接
|
||||
// Main[outputIndex] = []ConnectionTarget
|
||||
Main [][]ConnectionTarget `json:"main"`
|
||||
}
|
||||
|
||||
// ConnectionTarget 连接目标
|
||||
type ConnectionTarget struct {
|
||||
Node string `json:"node"` // 目标节点ID
|
||||
Type string `json:"type"` // 输入类型,通常是 "main"
|
||||
Index int `json:"index"` // 目标节点的输入端口索引
|
||||
}
|
||||
|
||||
// EnvVariable 环境变量
|
||||
type EnvVariable struct {
|
||||
Key string `json:"key"` // 变量名
|
||||
Value string `json:"value"` // 默认值
|
||||
Description string `json:"description,omitempty"` // 描述
|
||||
Secret bool `json:"secret,omitempty"` // 是否敏感(日志脱敏)
|
||||
Required bool `json:"required,omitempty"` // 是否必填
|
||||
}
|
||||
|
||||
// NodeOutput 节点执行输出
|
||||
type NodeOutput struct {
|
||||
WfCtx *WorkflowContext `json:"wf_ctx"` // 处理后的工作流上下文
|
||||
Message string `json:"message"` // 处理消息
|
||||
Terminate bool `json:"terminate"` // 是否终止流程
|
||||
BranchIndex *int `json:"branch_index,omitempty"` // 分支索引(条件节点使用)
|
||||
|
||||
// 流式输出支持
|
||||
Stream bool `json:"stream,omitempty"` // 是否流式输出
|
||||
StreamChan chan *StreamChunk `json:"-"` // 流式数据通道(不序列化)
|
||||
}
|
||||
|
||||
// WorkflowResult 工作流执行结果
|
||||
type WorkflowResult struct {
|
||||
Event *AlertCurEvent `json:"event"` // 最终事件
|
||||
Status string `json:"status"` // success, failed, streaming
|
||||
Message string `json:"message"` // 汇总消息
|
||||
NodeResults []*NodeExecutionResult `json:"node_results"` // 各节点执行结果
|
||||
ErrorNode string `json:"error_node,omitempty"`
|
||||
|
||||
// 流式输出支持
|
||||
Stream bool `json:"stream,omitempty"` // 是否流式输出
|
||||
StreamChan chan *StreamChunk `json:"-"` // 流式数据通道(不序列化)
|
||||
}
|
||||
|
||||
// NodeExecutionResult 节点执行结果
|
||||
type NodeExecutionResult struct {
|
||||
NodeID string `json:"node_id"`
|
||||
NodeName string `json:"node_name"`
|
||||
NodeType string `json:"node_type"`
|
||||
Status string `json:"status"` // success, failed, skipped
|
||||
Message string `json:"message"`
|
||||
StartedAt int64 `json:"started_at"`
|
||||
FinishedAt int64 `json:"finished_at"`
|
||||
DurationMs int64 `json:"duration_ms"`
|
||||
Error string `json:"error,omitempty"`
|
||||
BranchIndex *int `json:"branch_index,omitempty"` // 条件节点的分支选择
|
||||
}
|
||||
|
||||
// 触发模式常量
|
||||
const (
|
||||
TriggerModeEvent = "event" // 告警事件触发
|
||||
TriggerModeAPI = "api" // API 触发
|
||||
TriggerModeCron = "cron" // 定时触发(后续支持)
|
||||
)
|
||||
|
||||
// WorkflowTriggerContext 工作流触发上下文
|
||||
type WorkflowTriggerContext struct {
|
||||
// 触发模式
|
||||
Mode string `json:"mode"`
|
||||
|
||||
// 触发者
|
||||
TriggerBy string `json:"trigger_by"`
|
||||
|
||||
// 请求ID(API/Cron 触发使用)
|
||||
RequestID string `json:"request_id"`
|
||||
|
||||
// 环境变量覆盖
|
||||
EnvOverrides map[string]string `json:"env_overrides"`
|
||||
|
||||
// 流式输出(API 调用时动态指定)
|
||||
Stream bool `json:"stream"`
|
||||
|
||||
// Cron 相关(后续使用)
|
||||
CronJobID string `json:"cron_job_id,omitempty"`
|
||||
CronExpr string `json:"cron_expr,omitempty"`
|
||||
ScheduledAt int64 `json:"scheduled_at,omitempty"`
|
||||
}
|
||||
|
||||
type WorkflowContext struct {
|
||||
Event *AlertCurEvent `json:"event"` // 当前事件
|
||||
Env map[string]string `json:"env"` // 环境变量/配置(静态,来自 Pipeline 配置)
|
||||
Vars map[string]interface{} `json:"vars"` // 节点间传递的数据(动态,运行时产生)
|
||||
Metadata map[string]string `json:"metadata"` // 执行元数据(request_id、start_time 等)
|
||||
Output map[string]interface{} `json:"output,omitempty"` // 输出结果(非告警场景使用)
|
||||
|
||||
// 流式输出支持
|
||||
Stream bool `json:"-"` // 是否启用流式输出(不序列化)
|
||||
StreamChan chan *StreamChunk `json:"-"` // 流式数据通道(不序列化)
|
||||
}
|
||||
|
||||
// SanitizedEnv 返回脱敏后的环境变量(用于日志和存储)
|
||||
func (ctx *WorkflowContext) SanitizedEnv(secretKeys map[string]bool) map[string]string {
|
||||
sanitized := make(map[string]string)
|
||||
for k, v := range ctx.Env {
|
||||
if secretKeys[k] {
|
||||
sanitized[k] = "******"
|
||||
} else {
|
||||
sanitized[k] = v
|
||||
}
|
||||
}
|
||||
return sanitized
|
||||
}
|
||||
|
||||
// StreamChunk 类型常量
|
||||
const (
|
||||
StreamTypeThinking = "thinking" // AI 思考过程(ReAct Thought)
|
||||
StreamTypeToolCall = "tool_call" // 工具调用
|
||||
StreamTypeToolResult = "tool_result" // 工具执行结果
|
||||
StreamTypeText = "text" // LLM 文本输出
|
||||
StreamTypeDone = "done" // 完成
|
||||
StreamTypeError = "error" // 错误
|
||||
)
|
||||
|
||||
// StreamChunk 流式数据块
|
||||
type StreamChunk struct {
|
||||
Type string `json:"type"` // thinking / tool_call / tool_result / text / done / error
|
||||
Content string `json:"content"` // 完整内容(累积)
|
||||
Delta string `json:"delta,omitempty"` // 增量内容
|
||||
NodeID string `json:"node_id,omitempty"` // 当前节点 ID
|
||||
RequestID string `json:"request_id,omitempty"` // 请求追踪 ID
|
||||
Metadata interface{} `json:"metadata,omitempty"` // 额外元数据(如工具调用参数)
|
||||
Done bool `json:"done"` // 是否结束
|
||||
Error string `json:"error,omitempty"` // 错误信息
|
||||
Timestamp int64 `json:"timestamp"` // 时间戳(毫秒)
|
||||
}
|
||||
Reference in New Issue
Block a user