Compare commits

...

7 Commits

Author SHA1 Message Date
Yening Qin
37b0bbe3d3 Merge branch 'main' into change-workflow 2026-01-22 19:56:14 +08:00
ning
40722d2ff3 update workflow status 2026-01-16 19:56:24 +08:00
ning
ec7fbf313b update workflow 2026-01-16 15:29:46 +08:00
ning
b3ee1e56ad update callback 2026-01-16 15:15:48 +08:00
ning
d908240912 update workflow 2026-01-16 14:21:50 +08:00
ning
d54bcdd722 update workflow 2026-01-16 14:04:14 +08:00
ning
806b3effe9 update workflow 2026-01-14 19:23:15 +08:00
14 changed files with 125 additions and 122 deletions

View File

@@ -253,7 +253,7 @@ func HandleEventPipeline(pipelineConfigs []models.PipelineConfig, eventOrigin, e
// 统一使用工作流引擎执行(兼容线性模式和工作流模式)
triggerCtx := &models.WorkflowTriggerContext{
Mode: models.TriggerModeEvent,
TriggerBy: from,
TriggerBy: from + "_" + strconv.FormatInt(id, 10),
}
resultEvent, result, err := workflowEngine.Execute(eventPipeline, event, triggerCtx)

View File

@@ -60,11 +60,11 @@ func (e *WorkflowEngine) Execute(pipeline *models.EventPipeline, event *models.A
}
func (e *WorkflowEngine) initWorkflowContext(pipeline *models.EventPipeline, event *models.AlertCurEvent, triggerCtx *models.WorkflowTriggerContext) *models.WorkflowContext {
// 合并环境变量
env := pipeline.GetEnvMap()
if triggerCtx != nil && triggerCtx.EnvOverrides != nil {
for k, v := range triggerCtx.EnvOverrides {
env[k] = v
// 合并输入参数
inputs := pipeline.GetInputsMap()
if triggerCtx != nil && triggerCtx.InputsOverrides != nil {
for k, v := range triggerCtx.InputsOverrides {
inputs[k] = v
}
}
@@ -84,7 +84,7 @@ func (e *WorkflowEngine) initWorkflowContext(pipeline *models.EventPipeline, eve
return &models.WorkflowContext{
Event: event,
Env: env,
Inputs: inputs,
Vars: make(map[string]interface{}), // 初始化空的 Vars供节点间传递数据
Metadata: metadata,
Stream: stream,
@@ -182,7 +182,6 @@ func (e *WorkflowEngine) executeDAG(nodeMap map[string]*models.WorkflowNode, con
result.Status = models.ExecutionStatusFailed
result.ErrorNode = nodeID
result.Message = fmt.Sprintf("node %s failed: %s", node.Name, nodeResult.Error)
return result
}
}
@@ -371,10 +370,8 @@ func (e *WorkflowEngine) saveExecutionRecord(pipeline *models.EventPipeline, wfC
logger.Errorf("workflow: failed to set node results: pipeline_id=%d, error=%v", pipeline.ID, err)
}
secretKeys := pipeline.GetSecretKeys()
sanitizedEnv := wfCtx.SanitizedEnv(secretKeys)
if err := execution.SetEnvSnapshot(sanitizedEnv); err != nil {
logger.Errorf("workflow: failed to set env snapshot: pipeline_id=%d, error=%v", pipeline.ID, err)
if err := execution.SetInputsSnapshot(wfCtx.Inputs); err != nil {
logger.Errorf("workflow: failed to set inputs snapshot: pipeline_id=%d, error=%v", pipeline.ID, err)
}
if err := models.CreateEventPipelineExecution(e.ctx, execution); err != nil {

View File

@@ -114,7 +114,7 @@ func (c *AISummaryConfig) initHTTPClient() error {
func (c *AISummaryConfig) prepareEventInfo(wfCtx *models.WorkflowContext) (string, error) {
var defs = []string{
"{{$event := .Event}}",
"{{$env := .Env}}",
"{{$inputs := .Inputs}}",
}
text := strings.Join(append(defs, c.PromptTemplate), "")

View File

@@ -44,8 +44,8 @@ func TestAISummaryConfig_Process(t *testing.T) {
// 创建 WorkflowContext
wfCtx := &models.WorkflowContext{
Event: event,
Env: map[string]string{},
Event: event,
Inputs: map[string]string{},
}
// 测试模板处理

View File

@@ -11,6 +11,7 @@ import (
"time"
"github.com/ccfos/nightingale/v6/alert/pipeline/processor/common"
"github.com/ccfos/nightingale/v6/alert/pipeline/processor/utils"
"github.com/ccfos/nightingale/v6/models"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/toolkits/pkg/logger"
@@ -71,12 +72,17 @@ func (c *CallbackConfig) Process(ctx *ctx.Context, wfCtx *models.WorkflowContext
headers[k] = v
}
url, err := utils.TplRender(wfCtx, c.URL)
if err != nil {
return wfCtx, "", fmt.Errorf("failed to render url template: %v processor: %v", err, c)
}
body, err := json.Marshal(event)
if err != nil {
return wfCtx, "", fmt.Errorf("failed to marshal event: %v processor: %v", err, c)
}
req, err := http.NewRequest("POST", c.URL, strings.NewReader(string(body)))
req, err := http.NewRequest("POST", url, strings.NewReader(string(body)))
if err != nil {
return wfCtx, "", fmt.Errorf("failed to create request: %v processor: %v", err, c)
}

View File

@@ -36,7 +36,7 @@ func (c *EventDropConfig) Process(ctx *ctx.Context, wfCtx *models.WorkflowContex
"{{ $event := .Event }}",
"{{ $labels := .Event.TagsMap }}",
"{{ $value := .Event.TriggerValue }}",
"{{ $env := .Env }}",
"{{ $inputs := .Inputs }}",
}
text := strings.Join(append(defs, c.Content), "")

View File

@@ -148,7 +148,7 @@ func (c *IfConfig) evaluateExpressionCondition(wfCtx *models.WorkflowContext) (b
"{{ $event := .Event }}",
"{{ $labels := .Event.TagsMap }}",
"{{ $value := .Event.TriggerValue }}",
"{{ $env := .Env }}",
"{{ $inputs := .Inputs }}",
}
text := strings.Join(append(defs, c.Condition), "")

View File

@@ -175,7 +175,7 @@ func (c *SwitchConfig) evaluateExpressionCondition(condition string, wfCtx *mode
"{{ $event := .Event }}",
"{{ $labels := .Event.TagsMap }}",
"{{ $value := .Event.TriggerValue }}",
"{{ $env := .Env }}",
"{{ $inputs := .Inputs }}",
}
text := strings.Join(append(defs, condition), "")

View File

@@ -0,0 +1,32 @@
package utils
import (
"bytes"
"fmt"
"strings"
"text/template"
"github.com/ccfos/nightingale/v6/models"
"github.com/ccfos/nightingale/v6/pkg/tplx"
)
// TplRender renders content as a Go text/template against the workflow
// context wfCtx, exposing convenience variables ($event, $labels, $value,
// $inputs) for use inside the template. The rendered output is returned
// with leading/trailing whitespace trimmed.
//
// Note: the $labels/$value definitions dereference .Event, so wfCtx.Event
// is assumed non-nil — TODO confirm all callers guarantee this.
func TplRender(wfCtx *models.WorkflowContext, content string) (string, error) {
	// Prepend variable definitions so user-supplied templates can reference
	// them directly without spelling out the full context paths.
	var defs = []string{
		"{{ $event := .Event }}",
		"{{ $labels := .Event.TagsMap }}",
		"{{ $value := .Event.TriggerValue }}",
		"{{ $inputs := .Inputs }}",
	}
	text := strings.Join(append(defs, content), "")

	tpl, err := template.New("tpl").Funcs(tplx.TemplateFuncMap).Parse(text)
	if err != nil {
		// Wrap with %w so callers can unwrap the underlying parse error
		// via errors.Is/errors.As.
		return "", fmt.Errorf("failed to parse template: %w", err)
	}

	var body bytes.Buffer
	if err = tpl.Execute(&body, wfCtx); err != nil {
		return "", fmt.Errorf("failed to execute template: %w", err)
	}

	return strings.TrimSpace(body.String()), nil
}

View File

@@ -164,7 +164,7 @@ func (rt *Router) tryRunEventPipeline(c *gin.Context) {
var f struct {
EventId int64 `json:"event_id"`
PipelineConfig models.EventPipeline `json:"pipeline_config"`
EnvVariables map[string]string `json:"env_variables,omitempty"`
InputVariables map[string]string `json:"input_variables,omitempty"`
}
ginx.BindJSON(c, &f)
@@ -182,9 +182,9 @@ func (rt *Router) tryRunEventPipeline(c *gin.Context) {
workflowEngine := engine.NewWorkflowEngine(rt.Ctx)
triggerCtx := &models.WorkflowTriggerContext{
Mode: models.TriggerModeAPI,
TriggerBy: me.Username,
EnvOverrides: f.EnvVariables,
Mode: models.TriggerModeAPI,
TriggerBy: me.Username,
InputsOverrides: f.InputVariables,
}
resultEvent, result, err := workflowEngine.Execute(&f.PipelineConfig, event, triggerCtx)
@@ -302,8 +302,8 @@ func (rt *Router) eventPipelinesListByService(c *gin.Context) {
type EventPipelineRequest struct {
// 事件数据(可选,如果不传则使用空事件)
Event *models.AlertCurEvent `json:"event,omitempty"`
// 环境变量覆盖
EnvOverrides map[string]string `json:"env_overrides,omitempty"`
// 输入参数覆盖
InputsOverrides map[string]string `json:"inputs_overrides,omitempty"`
Username string `json:"username,omitempty"`
}
@@ -321,20 +321,15 @@ func (rt *Router) executePipelineTrigger(pipeline *models.EventPipeline, req *Ev
}
}
// 校验必填环境变量
if err := pipeline.ValidateEnvVariables(req.EnvOverrides); err != nil {
return "", fmt.Errorf("env validation failed: %v", err)
}
// 生成执行ID
executionID := uuid.New().String()
// 创建触发上下文
triggerCtx := &models.WorkflowTriggerContext{
Mode: models.TriggerModeAPI,
TriggerBy: triggerBy,
EnvOverrides: req.EnvOverrides,
RequestID: executionID,
Mode: models.TriggerModeAPI,
TriggerBy: triggerBy,
InputsOverrides: req.InputsOverrides,
RequestID: executionID,
}
// 异步执行工作流
@@ -401,6 +396,7 @@ func (rt *Router) triggerEventPipelineByAPI(c *gin.Context) {
}
func (rt *Router) listAllEventPipelineExecutions(c *gin.Context) {
pipelineId := ginx.QueryInt64(c, "pipeline_id", 0)
pipelineName := ginx.QueryStr(c, "pipeline_name", "")
mode := ginx.QueryStr(c, "mode", "")
status := ginx.QueryStr(c, "status", "")
@@ -414,7 +410,7 @@ func (rt *Router) listAllEventPipelineExecutions(c *gin.Context) {
offset = 1
}
executions, total, err := models.ListAllEventPipelineExecutions(rt.Ctx, pipelineName, mode, status, limit, (offset-1)*limit)
executions, total, err := models.ListAllEventPipelineExecutions(rt.Ctx, pipelineId, pipelineName, mode, status, limit, (offset-1)*limit)
ginx.Dangerous(err)
ginx.NewRender(c).Data(gin.H{
@@ -509,11 +505,11 @@ func (rt *Router) streamEventPipeline(c *gin.Context) {
}
triggerCtx := &models.WorkflowTriggerContext{
Mode: models.TriggerModeAPI,
TriggerBy: me.Username,
EnvOverrides: f.EnvOverrides,
RequestID: uuid.New().String(),
Stream: true, // 流式端点强制启用流式输出
Mode: models.TriggerModeAPI,
TriggerBy: me.Username,
InputsOverrides: f.InputsOverrides,
RequestID: uuid.New().String(),
Stream: true, // 流式端点强制启用流式输出
}
workflowEngine := engine.NewWorkflowEngine(rt.Ctx)
@@ -604,11 +600,11 @@ func (rt *Router) streamEventPipelineByService(c *gin.Context) {
}
triggerCtx := &models.WorkflowTriggerContext{
Mode: models.TriggerModeAPI,
TriggerBy: f.Username,
EnvOverrides: f.EnvOverrides,
RequestID: uuid.New().String(),
Stream: true, // 流式端点强制启用流式输出
Mode: models.TriggerModeAPI,
TriggerBy: f.Username,
InputsOverrides: f.InputsOverrides,
RequestID: uuid.New().String(),
Stream: true, // 流式端点强制启用流式输出
}
workflowEngine := engine.NewWorkflowEngine(rt.Ctx)

View File

@@ -301,7 +301,7 @@ ALTER TABLE `event_pipeline` ADD COLUMN `trigger_mode` varchar(128) NOT NULL DEF
ALTER TABLE `event_pipeline` ADD COLUMN `disabled` tinyint(1) NOT NULL DEFAULT 0 COMMENT 'disabled flag';
ALTER TABLE `event_pipeline` ADD COLUMN `nodes` text COMMENT 'workflow nodes (JSON)';
ALTER TABLE `event_pipeline` ADD COLUMN `connections` text COMMENT 'node connections (JSON)';
ALTER TABLE `event_pipeline` ADD COLUMN `env_variables` text COMMENT 'environment variables (JSON)';
ALTER TABLE `event_pipeline` ADD COLUMN `input_variables` text COMMENT 'input variables (JSON)';
ALTER TABLE `event_pipeline` ADD COLUMN `label_filters` text COMMENT 'label filters (JSON)';
CREATE TABLE `event_pipeline_execution` (
@@ -318,7 +318,7 @@ CREATE TABLE `event_pipeline_execution` (
`finished_at` bigint DEFAULT 0 COMMENT 'finish timestamp',
`duration_ms` bigint DEFAULT 0 COMMENT 'duration in milliseconds',
`trigger_by` varchar(64) DEFAULT '' COMMENT 'trigger by',
`env_snapshot` text COMMENT 'environment variables snapshot (sanitized)',
`inputs_snapshot` text COMMENT 'inputs snapshot',
PRIMARY KEY (`id`),
KEY `idx_pipeline_id` (`pipeline_id`),
KEY `idx_event_id` (`event_id`),

View File

@@ -29,8 +29,8 @@ type EventPipeline struct {
Nodes []WorkflowNode `json:"nodes,omitempty" gorm:"type:text;serializer:json"`
// 节点连接关系
Connections Connections `json:"connections,omitempty" gorm:"type:text;serializer:json"`
// 环境变量(工作流级别的配置变量)
EnvVariables []EnvVariable `json:"env_variables,omitempty" gorm:"type:text;serializer:json"`
// 输入参数(工作流级别的配置变量)
Inputs []InputVariable `json:"inputs,omitempty" gorm:"type:text;serializer:json"`
CreateAt int64 `json:"create_at" gorm:"type:bigint"`
CreateBy string `json:"create_by" gorm:"type:varchar(64)"`
@@ -73,8 +73,8 @@ func (e *EventPipeline) Verify() error {
if e.Connections == nil {
e.Connections = make(Connections)
}
if e.EnvVariables == nil {
e.EnvVariables = make([]EnvVariable, 0)
if e.Inputs == nil {
e.Inputs = make([]InputVariable, 0)
}
return nil
@@ -245,36 +245,10 @@ func (e *EventPipeline) FillWorkflowFields() {
}
}
func (e *EventPipeline) GetEnvMap() map[string]string {
envMap := make(map[string]string)
for _, v := range e.EnvVariables {
envMap[v.Key] = v.Value
func (e *EventPipeline) GetInputsMap() map[string]string {
inputsMap := make(map[string]string)
for _, v := range e.Inputs {
inputsMap[v.Key] = v.Value
}
return envMap
}
func (e *EventPipeline) GetSecretKeys() map[string]bool {
secretKeys := make(map[string]bool)
for _, v := range e.EnvVariables {
if v.Secret {
secretKeys[v.Key] = true
}
}
return secretKeys
}
func (e *EventPipeline) ValidateEnvVariables(overrides map[string]string) error {
// 合并默认值和覆盖值
merged := e.GetEnvMap()
for k, v := range overrides {
merged[k] = v
}
// 校验必填项
for _, v := range e.EnvVariables {
if v.Required && merged[v.Key] == "" {
return fmt.Errorf("required env variable %s is missing", v.Key)
}
}
return nil
return inputsMap
}

View File

@@ -2,10 +2,13 @@ package models
import (
"encoding/json"
"errors"
"fmt"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/pkg/poster"
"github.com/ccfos/nightingale/v6/pkg/poster"
"gorm.io/gorm"
)
// 执行状态常量
@@ -43,8 +46,8 @@ type EventPipelineExecution struct {
// 触发者信息
TriggerBy string `json:"trigger_by" gorm:"type:varchar(64)"`
// 环境变量快照(脱敏后存储)
EnvSnapshot string `json:"env_snapshot,omitempty" gorm:"type:text"`
// 输入参数快照
InputsSnapshot string `json:"inputs_snapshot,omitempty" gorm:"type:text"`
}
func (e *EventPipelineExecution) TableName() string {
@@ -71,24 +74,24 @@ func (e *EventPipelineExecution) GetNodeResults() ([]*NodeExecutionResult, error
return results, err
}
// SetEnvSnapshot 设置环境变量快照(脱敏后存储)
func (e *EventPipelineExecution) SetEnvSnapshot(env map[string]string) error {
data, err := json.Marshal(env)
// SetInputsSnapshot 设置输入参数快照
func (e *EventPipelineExecution) SetInputsSnapshot(inputs map[string]string) error {
data, err := json.Marshal(inputs)
if err != nil {
return err
}
e.EnvSnapshot = string(data)
e.InputsSnapshot = string(data)
return nil
}
// GetEnvSnapshot 获取环境变量快照
func (e *EventPipelineExecution) GetEnvSnapshot() (map[string]string, error) {
if e.EnvSnapshot == "" {
// GetInputsSnapshot 获取输入参数快照
func (e *EventPipelineExecution) GetInputsSnapshot() (map[string]string, error) {
if e.InputsSnapshot == "" {
return nil, nil
}
var env map[string]string
err := json.Unmarshal([]byte(e.EnvSnapshot), &env)
return env, err
var inputs map[string]string
err := json.Unmarshal([]byte(e.InputsSnapshot), &inputs)
return inputs, err
}
// CreateEventPipelineExecution 创建执行记录
@@ -109,6 +112,9 @@ func GetEventPipelineExecution(c *ctx.Context, id string) (*EventPipelineExecuti
var execution EventPipelineExecution
err := DB(c).Where("id = ?", id).First(&execution).Error
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, nil
}
return nil, err
}
return &execution, nil
@@ -149,12 +155,15 @@ func ListEventPipelineExecutionsByEventID(c *ctx.Context, eventID int64) ([]*Eve
}
// ListAllEventPipelineExecutions 获取所有 Pipeline 的执行记录列表
func ListAllEventPipelineExecutions(c *ctx.Context, pipelineName, mode, status string, limit, offset int) ([]*EventPipelineExecution, int64, error) {
func ListAllEventPipelineExecutions(c *ctx.Context, pipelineId int64, pipelineName, mode, status string, limit, offset int) ([]*EventPipelineExecution, int64, error) {
var executions []*EventPipelineExecution
var total int64
session := DB(c).Model(&EventPipelineExecution{})
if pipelineId > 0 {
session = session.Where("pipeline_id = ?", pipelineId)
}
if pipelineName != "" {
session = session.Where("pipeline_name LIKE ?", "%"+pipelineName+"%")
}
@@ -274,8 +283,8 @@ func GetEventPipelineExecutionStatistics(c *ctx.Context, pipelineID int64) (*Eve
// EventPipelineExecutionDetail 执行详情(包含解析后的节点结果)
type EventPipelineExecutionDetail struct {
EventPipelineExecution
NodeResultsParsed []*NodeExecutionResult `json:"node_results_parsed"`
EnvSnapshotParsed map[string]string `json:"env_snapshot_parsed"`
NodeResultsParsed []*NodeExecutionResult `json:"node_results_parsed"`
InputsSnapshotParsed map[string]string `json:"inputs_snapshot_parsed"`
}
// GetEventPipelineExecutionDetail 获取执行详情
@@ -285,6 +294,10 @@ func GetEventPipelineExecutionDetail(c *ctx.Context, id string) (*EventPipelineE
return nil, err
}
if execution == nil {
return &EventPipelineExecutionDetail{}, nil
}
detail := &EventPipelineExecutionDetail{
EventPipelineExecution: *execution,
}
@@ -296,12 +309,12 @@ func GetEventPipelineExecutionDetail(c *ctx.Context, id string) (*EventPipelineE
}
detail.NodeResultsParsed = nodeResults
// 解析环境变量快照
envSnapshot, err := execution.GetEnvSnapshot()
// 解析输入参数快照
inputsSnapshot, err := execution.GetInputsSnapshot()
if err != nil {
return nil, fmt.Errorf("parse env snapshot error: %w", err)
return nil, fmt.Errorf("parse inputs snapshot error: %w", err)
}
detail.EnvSnapshotParsed = envSnapshot
detail.InputsSnapshotParsed = inputsSnapshot
return detail, nil
}

View File

@@ -33,13 +33,11 @@ type ConnectionTarget struct {
Index int `json:"index"` // 目标节点的输入端口索引
}
// EnvVariable 环境变量
type EnvVariable struct {
// InputVariable 输入参数
type InputVariable struct {
Key string `json:"key"` // 变量名
Value string `json:"value"` // 默认值
Description string `json:"description,omitempty"` // 描述
Secret bool `json:"secret,omitempty"` // 是否敏感(日志脱敏)
Required bool `json:"required,omitempty"` // 是否必填
}
// NodeOutput 节点执行输出
@@ -104,8 +102,8 @@ type WorkflowTriggerContext struct {
// 请求IDAPI/Cron 触发使用)
RequestID string `json:"request_id"`
// 环境变量覆盖
EnvOverrides map[string]string `json:"env_overrides"`
// 输入参数覆盖
InputsOverrides map[string]string `json:"inputs_overrides"`
// 流式输出API 调用时动态指定)
Stream bool `json:"stream"`
@@ -118,7 +116,7 @@ type WorkflowTriggerContext struct {
type WorkflowContext struct {
Event *AlertCurEvent `json:"event"` // 当前事件
Env map[string]string `json:"env"` // 环境变量/配置(静态,来自 Pipeline 配置)
Inputs map[string]string `json:"inputs"` // 前置输入参数(静态,用户配置)
Vars map[string]interface{} `json:"vars"` // 节点间传递的数据(动态,运行时产生)
Metadata map[string]string `json:"metadata"` // 执行元数据request_id、start_time 等)
Output map[string]interface{} `json:"output,omitempty"` // 输出结果(非告警场景使用)
@@ -128,19 +126,6 @@ type WorkflowContext struct {
StreamChan chan *StreamChunk `json:"-"` // 流式数据通道(不序列化)
}
// SanitizedEnv 返回脱敏后的环境变量(用于日志和存储)
func (ctx *WorkflowContext) SanitizedEnv(secretKeys map[string]bool) map[string]string {
sanitized := make(map[string]string)
for k, v := range ctx.Env {
if secretKeys[k] {
sanitized[k] = "******"
} else {
sanitized[k] = v
}
}
return sanitized
}
// StreamChunk 类型常量
const (
StreamTypeThinking = "thinking" // AI 思考过程ReAct Thought