mirror of
https://github.com/ccfos/nightingale.git
synced 2026-03-03 22:48:56 +00:00
Compare commits
9 Commits
update-wor
...
update-lis
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c9c7481d4f | ||
|
|
e1cc37c753 | ||
|
|
2be94f592c | ||
|
|
5babc4310a | ||
|
|
f968fcd593 | ||
|
|
4dc7035550 | ||
|
|
2a2b46ca7b | ||
|
|
ed96ab9d5b | ||
|
|
2e2bbd6aeb |
@@ -253,7 +253,7 @@ func HandleEventPipeline(pipelineConfigs []models.PipelineConfig, eventOrigin, e
|
||||
// 统一使用工作流引擎执行(兼容线性模式和工作流模式)
|
||||
triggerCtx := &models.WorkflowTriggerContext{
|
||||
Mode: models.TriggerModeEvent,
|
||||
TriggerBy: from,
|
||||
TriggerBy: from + "_" + strconv.FormatInt(id, 10),
|
||||
}
|
||||
|
||||
resultEvent, result, err := workflowEngine.Execute(eventPipeline, event, triggerCtx)
|
||||
|
||||
@@ -60,11 +60,11 @@ func (e *WorkflowEngine) Execute(pipeline *models.EventPipeline, event *models.A
|
||||
}
|
||||
|
||||
func (e *WorkflowEngine) initWorkflowContext(pipeline *models.EventPipeline, event *models.AlertCurEvent, triggerCtx *models.WorkflowTriggerContext) *models.WorkflowContext {
|
||||
// 合并环境变量
|
||||
env := pipeline.GetEnvMap()
|
||||
if triggerCtx != nil && triggerCtx.EnvOverrides != nil {
|
||||
for k, v := range triggerCtx.EnvOverrides {
|
||||
env[k] = v
|
||||
// 合并输入参数
|
||||
inputs := pipeline.GetInputsMap()
|
||||
if triggerCtx != nil && triggerCtx.InputsOverrides != nil {
|
||||
for k, v := range triggerCtx.InputsOverrides {
|
||||
inputs[k] = v
|
||||
}
|
||||
}
|
||||
|
||||
@@ -84,7 +84,7 @@ func (e *WorkflowEngine) initWorkflowContext(pipeline *models.EventPipeline, eve
|
||||
|
||||
return &models.WorkflowContext{
|
||||
Event: event,
|
||||
Env: env,
|
||||
Inputs: inputs,
|
||||
Vars: make(map[string]interface{}), // 初始化空的 Vars,供节点间传递数据
|
||||
Metadata: metadata,
|
||||
Stream: stream,
|
||||
@@ -182,7 +182,6 @@ func (e *WorkflowEngine) executeDAG(nodeMap map[string]*models.WorkflowNode, con
|
||||
result.Status = models.ExecutionStatusFailed
|
||||
result.ErrorNode = nodeID
|
||||
result.Message = fmt.Sprintf("node %s failed: %s", node.Name, nodeResult.Error)
|
||||
return result
|
||||
}
|
||||
}
|
||||
|
||||
@@ -371,10 +370,8 @@ func (e *WorkflowEngine) saveExecutionRecord(pipeline *models.EventPipeline, wfC
|
||||
logger.Errorf("workflow: failed to set node results: pipeline_id=%d, error=%v", pipeline.ID, err)
|
||||
}
|
||||
|
||||
secretKeys := pipeline.GetSecretKeys()
|
||||
sanitizedEnv := wfCtx.SanitizedEnv(secretKeys)
|
||||
if err := execution.SetEnvSnapshot(sanitizedEnv); err != nil {
|
||||
logger.Errorf("workflow: failed to set env snapshot: pipeline_id=%d, error=%v", pipeline.ID, err)
|
||||
if err := execution.SetInputsSnapshot(wfCtx.Inputs); err != nil {
|
||||
logger.Errorf("workflow: failed to set inputs snapshot: pipeline_id=%d, error=%v", pipeline.ID, err)
|
||||
}
|
||||
|
||||
if err := models.CreateEventPipelineExecution(e.ctx, execution); err != nil {
|
||||
|
||||
@@ -114,7 +114,7 @@ func (c *AISummaryConfig) initHTTPClient() error {
|
||||
func (c *AISummaryConfig) prepareEventInfo(wfCtx *models.WorkflowContext) (string, error) {
|
||||
var defs = []string{
|
||||
"{{$event := .Event}}",
|
||||
"{{$env := .Env}}",
|
||||
"{{$inputs := .Inputs}}",
|
||||
}
|
||||
|
||||
text := strings.Join(append(defs, c.PromptTemplate), "")
|
||||
|
||||
@@ -44,8 +44,8 @@ func TestAISummaryConfig_Process(t *testing.T) {
|
||||
|
||||
// 创建 WorkflowContext
|
||||
wfCtx := &models.WorkflowContext{
|
||||
Event: event,
|
||||
Env: map[string]string{},
|
||||
Event: event,
|
||||
Inputs: map[string]string{},
|
||||
}
|
||||
|
||||
// 测试模板处理
|
||||
|
||||
@@ -11,6 +11,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/alert/pipeline/processor/common"
|
||||
"github.com/ccfos/nightingale/v6/alert/pipeline/processor/utils"
|
||||
"github.com/ccfos/nightingale/v6/models"
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
"github.com/toolkits/pkg/logger"
|
||||
@@ -71,12 +72,17 @@ func (c *CallbackConfig) Process(ctx *ctx.Context, wfCtx *models.WorkflowContext
|
||||
headers[k] = v
|
||||
}
|
||||
|
||||
url, err := utils.TplRender(wfCtx, c.URL)
|
||||
if err != nil {
|
||||
return wfCtx, "", fmt.Errorf("failed to render url template: %v processor: %v", err, c)
|
||||
}
|
||||
|
||||
body, err := json.Marshal(event)
|
||||
if err != nil {
|
||||
return wfCtx, "", fmt.Errorf("failed to marshal event: %v processor: %v", err, c)
|
||||
}
|
||||
|
||||
req, err := http.NewRequest("POST", c.URL, strings.NewReader(string(body)))
|
||||
req, err := http.NewRequest("POST", url, strings.NewReader(string(body)))
|
||||
if err != nil {
|
||||
return wfCtx, "", fmt.Errorf("failed to create request: %v processor: %v", err, c)
|
||||
}
|
||||
|
||||
@@ -36,7 +36,7 @@ func (c *EventDropConfig) Process(ctx *ctx.Context, wfCtx *models.WorkflowContex
|
||||
"{{ $event := .Event }}",
|
||||
"{{ $labels := .Event.TagsMap }}",
|
||||
"{{ $value := .Event.TriggerValue }}",
|
||||
"{{ $env := .Env }}",
|
||||
"{{ $inputs := .Inputs }}",
|
||||
}
|
||||
|
||||
text := strings.Join(append(defs, c.Content), "")
|
||||
|
||||
@@ -148,7 +148,7 @@ func (c *IfConfig) evaluateExpressionCondition(wfCtx *models.WorkflowContext) (b
|
||||
"{{ $event := .Event }}",
|
||||
"{{ $labels := .Event.TagsMap }}",
|
||||
"{{ $value := .Event.TriggerValue }}",
|
||||
"{{ $env := .Env }}",
|
||||
"{{ $inputs := .Inputs }}",
|
||||
}
|
||||
|
||||
text := strings.Join(append(defs, c.Condition), "")
|
||||
|
||||
@@ -175,7 +175,7 @@ func (c *SwitchConfig) evaluateExpressionCondition(condition string, wfCtx *mode
|
||||
"{{ $event := .Event }}",
|
||||
"{{ $labels := .Event.TagsMap }}",
|
||||
"{{ $value := .Event.TriggerValue }}",
|
||||
"{{ $env := .Env }}",
|
||||
"{{ $inputs := .Inputs }}",
|
||||
}
|
||||
|
||||
text := strings.Join(append(defs, condition), "")
|
||||
|
||||
32
alert/pipeline/processor/utils/utils.go
Normal file
32
alert/pipeline/processor/utils/utils.go
Normal file
@@ -0,0 +1,32 @@
|
||||
package utils
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"strings"
|
||||
"text/template"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/models"
|
||||
"github.com/ccfos/nightingale/v6/pkg/tplx"
|
||||
)
|
||||
|
||||
func TplRender(wfCtx *models.WorkflowContext, content string) (string, error) {
|
||||
var defs = []string{
|
||||
"{{ $event := .Event }}",
|
||||
"{{ $labels := .Event.TagsMap }}",
|
||||
"{{ $value := .Event.TriggerValue }}",
|
||||
"{{ $inputs := .Inputs }}",
|
||||
}
|
||||
text := strings.Join(append(defs, content), "")
|
||||
tpl, err := template.New("tpl").Funcs(tplx.TemplateFuncMap).Parse(text)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to parse template: %v", err)
|
||||
}
|
||||
|
||||
var body bytes.Buffer
|
||||
if err = tpl.Execute(&body, wfCtx); err != nil {
|
||||
return "", fmt.Errorf("failed to execute template: %v", err)
|
||||
}
|
||||
|
||||
return strings.TrimSpace(body.String()), nil
|
||||
}
|
||||
@@ -118,7 +118,7 @@ func (s *Set) updateTargets(m map[string]models.HostMeta) error {
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
err := storage.MSet(context.Background(), s.redis, newMap)
|
||||
err := storage.MSet(context.Background(), s.redis, newMap, 7*24*time.Hour)
|
||||
if err != nil {
|
||||
cstats.RedisOperationLatency.WithLabelValues("mset_target_meta", "fail").Observe(time.Since(start).Seconds())
|
||||
return err
|
||||
@@ -127,7 +127,7 @@ func (s *Set) updateTargets(m map[string]models.HostMeta) error {
|
||||
}
|
||||
|
||||
if len(extendMap) > 0 {
|
||||
err = storage.MSet(context.Background(), s.redis, extendMap)
|
||||
err = storage.MSet(context.Background(), s.redis, extendMap, 7*24*time.Hour)
|
||||
if err != nil {
|
||||
cstats.RedisOperationLatency.WithLabelValues("mset_target_extend", "fail").Observe(time.Since(start).Seconds())
|
||||
return err
|
||||
|
||||
@@ -391,8 +391,8 @@ func (rt *Router) Config(r *gin.Engine) {
|
||||
pages.GET("/busi-group/:id/recording-rules", rt.auth(), rt.user(), rt.perm("/recording-rules"), rt.recordingRuleGets)
|
||||
pages.POST("/busi-group/:id/recording-rules", rt.auth(), rt.user(), rt.perm("/recording-rules/add"), rt.bgrw(), rt.recordingRuleAddByFE)
|
||||
pages.DELETE("/busi-group/:id/recording-rules", rt.auth(), rt.user(), rt.perm("/recording-rules/del"), rt.bgrw(), rt.recordingRuleDel)
|
||||
pages.PUT("/busi-group/:id/recording-rule/:rrid", rt.auth(), rt.user(), rt.perm("/recording-rules/put"), rt.bgrw(), rt.recordingRulePutByFE)
|
||||
pages.GET("/recording-rule/:rrid", rt.auth(), rt.user(), rt.perm("/recording-rules"), rt.recordingRuleGet)
|
||||
pages.PUT("/recording-rule/:rrid", rt.auth(), rt.user(), rt.perm("/recording-rules"), rt.recordingRulePutByFE)
|
||||
pages.PUT("/busi-group/:id/recording-rules/fields", rt.auth(), rt.user(), rt.perm("/recording-rules/put"), rt.recordingRulePutFields)
|
||||
|
||||
pages.GET("/busi-groups/alert-mutes", rt.auth(), rt.user(), rt.perm("/alert-mutes"), rt.alertMuteGetsByGids)
|
||||
|
||||
@@ -36,6 +36,7 @@ func (rt *Router) alertRuleGets(c *gin.Context) {
|
||||
for i := 0; i < len(ars); i++ {
|
||||
ars[i].FillNotifyGroups(rt.Ctx, cache)
|
||||
}
|
||||
models.FillUpdateByNicknames(rt.Ctx, ars)
|
||||
}
|
||||
ginx.NewRender(c).Data(ars, err)
|
||||
}
|
||||
@@ -76,7 +77,6 @@ func (rt *Router) alertRuleGetsByGids(c *gin.Context) {
|
||||
if err == nil {
|
||||
cache := make(map[int64]*models.UserGroup)
|
||||
rids := make([]int64, 0, len(ars))
|
||||
names := make([]string, 0, len(ars))
|
||||
for i := 0; i < len(ars); i++ {
|
||||
ars[i].FillNotifyGroups(rt.Ctx, cache)
|
||||
|
||||
@@ -85,7 +85,6 @@ func (rt *Router) alertRuleGetsByGids(c *gin.Context) {
|
||||
}
|
||||
|
||||
rids = append(rids, ars[i].Id)
|
||||
names = append(names, ars[i].UpdateBy)
|
||||
}
|
||||
|
||||
stime, etime := GetAlertCueEventTimeRange(c)
|
||||
@@ -96,14 +95,7 @@ func (rt *Router) alertRuleGetsByGids(c *gin.Context) {
|
||||
}
|
||||
}
|
||||
|
||||
users := models.UserMapGet(rt.Ctx, "username in (?)", names)
|
||||
if users != nil {
|
||||
for i := 0; i < len(ars); i++ {
|
||||
if user, exist := users[ars[i].UpdateBy]; exist {
|
||||
ars[i].UpdateByNickname = user.Nickname
|
||||
}
|
||||
}
|
||||
}
|
||||
models.FillUpdateByNicknames(rt.Ctx, ars)
|
||||
}
|
||||
ginx.NewRender(c).Data(ars, err)
|
||||
}
|
||||
@@ -135,6 +127,7 @@ func (rt *Router) alertRulesGetByService(c *gin.Context) {
|
||||
ars[i].DatasourceIdsJson = rt.DatasourceCache.GetIDsByDsCateAndQueries(ars[i].Cate, ars[i].DatasourceQueries)
|
||||
}
|
||||
}
|
||||
models.FillUpdateByNicknames(rt.Ctx, ars)
|
||||
}
|
||||
ginx.NewRender(c).Data(ars, err)
|
||||
}
|
||||
|
||||
@@ -30,6 +30,7 @@ func (rt *Router) alertSubscribeGets(c *gin.Context) {
|
||||
ginx.Dangerous(lst[i].FillDatasourceIds(rt.Ctx))
|
||||
ginx.Dangerous(lst[i].DB2FE())
|
||||
}
|
||||
models.FillUpdateByNicknames(rt.Ctx, lst)
|
||||
|
||||
ginx.NewRender(c).Data(lst, err)
|
||||
}
|
||||
@@ -66,6 +67,7 @@ func (rt *Router) alertSubscribeGetsByGids(c *gin.Context) {
|
||||
ginx.Dangerous(lst[i].FillDatasourceIds(rt.Ctx))
|
||||
ginx.Dangerous(lst[i].DB2FE())
|
||||
}
|
||||
models.FillUpdateByNicknames(rt.Ctx, lst)
|
||||
|
||||
ginx.NewRender(c).Data(lst, err)
|
||||
}
|
||||
|
||||
@@ -260,6 +260,9 @@ func (rt *Router) boardGets(c *gin.Context) {
|
||||
query := ginx.QueryStr(c, "query", "")
|
||||
|
||||
boards, err := models.BoardGetsByGroupId(rt.Ctx, bgid, query)
|
||||
if err == nil {
|
||||
models.FillUpdateByNicknames(rt.Ctx, boards)
|
||||
}
|
||||
ginx.NewRender(c).Data(boards, err)
|
||||
}
|
||||
|
||||
@@ -273,6 +276,9 @@ func (rt *Router) publicBoardGets(c *gin.Context) {
|
||||
ginx.Dangerous(err)
|
||||
|
||||
boards, err := models.BoardGets(rt.Ctx, "", "public=1 and (public_cate in (?) or id in (?))", []int64{0, 1}, boardIds)
|
||||
if err == nil {
|
||||
models.FillUpdateByNicknames(rt.Ctx, boards)
|
||||
}
|
||||
ginx.NewRender(c).Data(boards, err)
|
||||
}
|
||||
|
||||
@@ -312,6 +318,7 @@ func (rt *Router) boardGetsByGids(c *gin.Context) {
|
||||
boards[i].Bgids = ids
|
||||
}
|
||||
}
|
||||
models.FillUpdateByNicknames(rt.Ctx, boards)
|
||||
|
||||
ginx.NewRender(c).Data(boards, err)
|
||||
}
|
||||
|
||||
@@ -27,6 +27,8 @@ func (rt *Router) metricFilterGets(c *gin.Context) {
|
||||
}
|
||||
}
|
||||
|
||||
models.FillUpdateByNicknames(rt.Ctx, arr)
|
||||
|
||||
ginx.NewRender(c).Data(arr, err)
|
||||
}
|
||||
|
||||
|
||||
@@ -119,6 +119,9 @@ func (rt *Router) busiGroupGets(c *gin.Context) {
|
||||
if len(lst) == 0 {
|
||||
lst = []models.BusiGroup{}
|
||||
}
|
||||
if err == nil {
|
||||
models.FillUpdateByNicknames(rt.Ctx, lst)
|
||||
}
|
||||
|
||||
ginx.NewRender(c).Data(lst, err)
|
||||
}
|
||||
|
||||
@@ -15,6 +15,9 @@ func (rt *Router) configsGet(c *gin.Context) {
|
||||
prefix := ginx.QueryStr(c, "prefix", "")
|
||||
limit := ginx.QueryInt(c, "limit", 10)
|
||||
configs, err := models.ConfigsGets(rt.Ctx, prefix, limit, ginx.Offset(c, limit))
|
||||
if err == nil {
|
||||
models.FillUpdateByNicknames(rt.Ctx, configs)
|
||||
}
|
||||
ginx.NewRender(c).Data(configs, err)
|
||||
}
|
||||
|
||||
|
||||
@@ -13,6 +13,7 @@ import (
|
||||
func (rt *Router) embeddedProductGets(c *gin.Context) {
|
||||
products, err := models.EmbeddedProductGets(rt.Ctx)
|
||||
ginx.Dangerous(err)
|
||||
models.FillUpdateByNicknames(rt.Ctx, products)
|
||||
// 获取当前用户可访问的Group ID 列表
|
||||
me := c.MustGet("user").(*models.User)
|
||||
|
||||
|
||||
@@ -69,6 +69,10 @@ func (rt *Router) esIndexPatternGetList(c *gin.Context) {
|
||||
lst, err = models.EsIndexPatternGets(rt.Ctx, "")
|
||||
}
|
||||
|
||||
if err == nil {
|
||||
models.FillUpdateByNicknames(rt.Ctx, lst)
|
||||
}
|
||||
|
||||
ginx.NewRender(c).Data(lst, err)
|
||||
}
|
||||
|
||||
|
||||
@@ -35,6 +35,7 @@ func (rt *Router) eventPipelinesList(c *gin.Context) {
|
||||
// 兼容处理:自动填充工作流字段
|
||||
pipeline.FillWorkflowFields()
|
||||
}
|
||||
models.FillUpdateByNicknames(rt.Ctx, pipelines)
|
||||
|
||||
gids, err := models.MyGroupIdsMap(rt.Ctx, me.Id)
|
||||
ginx.Dangerous(err)
|
||||
@@ -164,7 +165,7 @@ func (rt *Router) tryRunEventPipeline(c *gin.Context) {
|
||||
var f struct {
|
||||
EventId int64 `json:"event_id"`
|
||||
PipelineConfig models.EventPipeline `json:"pipeline_config"`
|
||||
EnvVariables map[string]string `json:"env_variables,omitempty"`
|
||||
InputVariables map[string]string `json:"input_variables,omitempty"`
|
||||
}
|
||||
|
||||
ginx.BindJSON(c, &f)
|
||||
@@ -182,9 +183,9 @@ func (rt *Router) tryRunEventPipeline(c *gin.Context) {
|
||||
workflowEngine := engine.NewWorkflowEngine(rt.Ctx)
|
||||
|
||||
triggerCtx := &models.WorkflowTriggerContext{
|
||||
Mode: models.TriggerModeAPI,
|
||||
TriggerBy: me.Username,
|
||||
EnvOverrides: f.EnvVariables,
|
||||
Mode: models.TriggerModeAPI,
|
||||
TriggerBy: me.Username,
|
||||
InputsOverrides: f.InputVariables,
|
||||
}
|
||||
|
||||
resultEvent, result, err := workflowEngine.Execute(&f.PipelineConfig, event, triggerCtx)
|
||||
@@ -302,8 +303,8 @@ func (rt *Router) eventPipelinesListByService(c *gin.Context) {
|
||||
type EventPipelineRequest struct {
|
||||
// 事件数据(可选,如果不传则使用空事件)
|
||||
Event *models.AlertCurEvent `json:"event,omitempty"`
|
||||
// 环境变量覆盖
|
||||
EnvOverrides map[string]string `json:"env_overrides,omitempty"`
|
||||
// 输入参数覆盖
|
||||
InputsOverrides map[string]string `json:"inputs_overrides,omitempty"`
|
||||
|
||||
Username string `json:"username,omitempty"`
|
||||
}
|
||||
@@ -321,20 +322,15 @@ func (rt *Router) executePipelineTrigger(pipeline *models.EventPipeline, req *Ev
|
||||
}
|
||||
}
|
||||
|
||||
// 校验必填环境变量
|
||||
if err := pipeline.ValidateEnvVariables(req.EnvOverrides); err != nil {
|
||||
return "", fmt.Errorf("env validation failed: %v", err)
|
||||
}
|
||||
|
||||
// 生成执行ID
|
||||
executionID := uuid.New().String()
|
||||
|
||||
// 创建触发上下文
|
||||
triggerCtx := &models.WorkflowTriggerContext{
|
||||
Mode: models.TriggerModeAPI,
|
||||
TriggerBy: triggerBy,
|
||||
EnvOverrides: req.EnvOverrides,
|
||||
RequestID: executionID,
|
||||
Mode: models.TriggerModeAPI,
|
||||
TriggerBy: triggerBy,
|
||||
InputsOverrides: req.InputsOverrides,
|
||||
RequestID: executionID,
|
||||
}
|
||||
|
||||
// 异步执行工作流
|
||||
@@ -401,6 +397,7 @@ func (rt *Router) triggerEventPipelineByAPI(c *gin.Context) {
|
||||
}
|
||||
|
||||
func (rt *Router) listAllEventPipelineExecutions(c *gin.Context) {
|
||||
pipelineId := ginx.QueryInt64(c, "pipeline_id", 0)
|
||||
pipelineName := ginx.QueryStr(c, "pipeline_name", "")
|
||||
mode := ginx.QueryStr(c, "mode", "")
|
||||
status := ginx.QueryStr(c, "status", "")
|
||||
@@ -414,7 +411,7 @@ func (rt *Router) listAllEventPipelineExecutions(c *gin.Context) {
|
||||
offset = 1
|
||||
}
|
||||
|
||||
executions, total, err := models.ListAllEventPipelineExecutions(rt.Ctx, pipelineName, mode, status, limit, (offset-1)*limit)
|
||||
executions, total, err := models.ListAllEventPipelineExecutions(rt.Ctx, pipelineId, pipelineName, mode, status, limit, (offset-1)*limit)
|
||||
ginx.Dangerous(err)
|
||||
|
||||
ginx.NewRender(c).Data(gin.H{
|
||||
@@ -509,11 +506,11 @@ func (rt *Router) streamEventPipeline(c *gin.Context) {
|
||||
}
|
||||
|
||||
triggerCtx := &models.WorkflowTriggerContext{
|
||||
Mode: models.TriggerModeAPI,
|
||||
TriggerBy: me.Username,
|
||||
EnvOverrides: f.EnvOverrides,
|
||||
RequestID: uuid.New().String(),
|
||||
Stream: true, // 流式端点强制启用流式输出
|
||||
Mode: models.TriggerModeAPI,
|
||||
TriggerBy: me.Username,
|
||||
InputsOverrides: f.InputsOverrides,
|
||||
RequestID: uuid.New().String(),
|
||||
Stream: true, // 流式端点强制启用流式输出
|
||||
}
|
||||
|
||||
workflowEngine := engine.NewWorkflowEngine(rt.Ctx)
|
||||
@@ -604,11 +601,11 @@ func (rt *Router) streamEventPipelineByService(c *gin.Context) {
|
||||
}
|
||||
|
||||
triggerCtx := &models.WorkflowTriggerContext{
|
||||
Mode: models.TriggerModeAPI,
|
||||
TriggerBy: f.Username,
|
||||
EnvOverrides: f.EnvOverrides,
|
||||
RequestID: uuid.New().String(),
|
||||
Stream: true, // 流式端点强制启用流式输出
|
||||
Mode: models.TriggerModeAPI,
|
||||
TriggerBy: f.Username,
|
||||
InputsOverrides: f.InputsOverrides,
|
||||
RequestID: uuid.New().String(),
|
||||
Stream: true, // 流式端点强制启用流式输出
|
||||
}
|
||||
|
||||
workflowEngine := engine.NewWorkflowEngine(rt.Ctx)
|
||||
|
||||
@@ -154,6 +154,7 @@ func (rt *Router) messageTemplatesGet(c *gin.Context) {
|
||||
|
||||
lst, err := models.MessageTemplatesGetBy(rt.Ctx, notifyChannelIdents)
|
||||
ginx.Dangerous(err)
|
||||
models.FillUpdateByNicknames(rt.Ctx, lst)
|
||||
|
||||
if me.IsAdmin() {
|
||||
ginx.NewRender(c).Data(lst, nil)
|
||||
|
||||
@@ -22,6 +22,9 @@ func (rt *Router) alertMuteGetsByBG(c *gin.Context) {
|
||||
query := ginx.QueryStr(c, "query", "")
|
||||
expired := ginx.QueryInt(c, "expired", -1)
|
||||
lst, err := models.AlertMuteGets(rt.Ctx, prods, bgid, -1, expired, query)
|
||||
if err == nil {
|
||||
models.FillUpdateByNicknames(rt.Ctx, lst)
|
||||
}
|
||||
|
||||
ginx.NewRender(c).Data(lst, err)
|
||||
}
|
||||
@@ -47,6 +50,9 @@ func (rt *Router) alertMuteGetsByGids(c *gin.Context) {
|
||||
}
|
||||
|
||||
lst, err := models.AlertMuteGetsByBGIds(rt.Ctx, gids)
|
||||
if err == nil {
|
||||
models.FillUpdateByNicknames(rt.Ctx, lst)
|
||||
}
|
||||
|
||||
ginx.NewRender(c).Data(lst, err)
|
||||
}
|
||||
@@ -58,6 +64,9 @@ func (rt *Router) alertMuteGets(c *gin.Context) {
|
||||
disabled := ginx.QueryInt(c, "disabled", -1)
|
||||
expired := ginx.QueryInt(c, "expired", -1)
|
||||
lst, err := models.AlertMuteGets(rt.Ctx, prods, bgid, disabled, expired, query)
|
||||
if err == nil {
|
||||
models.FillUpdateByNicknames(rt.Ctx, lst)
|
||||
}
|
||||
|
||||
ginx.NewRender(c).Data(lst, err)
|
||||
}
|
||||
|
||||
@@ -118,6 +118,9 @@ func (rt *Router) notifyChannelGetBy(c *gin.Context) {
|
||||
|
||||
func (rt *Router) notifyChannelsGet(c *gin.Context) {
|
||||
lst, err := models.NotifyChannelsGet(rt.Ctx, "", nil)
|
||||
if err == nil {
|
||||
models.FillUpdateByNicknames(rt.Ctx, lst)
|
||||
}
|
||||
ginx.NewRender(c).Data(lst, err)
|
||||
}
|
||||
|
||||
|
||||
@@ -118,6 +118,7 @@ func (rt *Router) notifyRulesGet(c *gin.Context) {
|
||||
|
||||
lst, err := models.NotifyRulesGet(rt.Ctx, "", nil)
|
||||
ginx.Dangerous(err)
|
||||
models.FillUpdateByNicknames(rt.Ctx, lst)
|
||||
if me.IsAdmin() {
|
||||
ginx.NewRender(c).Data(lst, nil)
|
||||
return
|
||||
|
||||
@@ -25,11 +25,14 @@ func (rt *Router) notifyTplGets(c *gin.Context) {
|
||||
m[models.EmailSubject] = struct{}{}
|
||||
|
||||
lst, err := models.NotifyTplGets(rt.Ctx)
|
||||
ginx.Dangerous(err)
|
||||
|
||||
for i := 0; i < len(lst); i++ {
|
||||
if _, exists := m[lst[i].Channel]; exists {
|
||||
lst[i].BuiltIn = true
|
||||
}
|
||||
}
|
||||
models.FillUpdateByNicknames(rt.Ctx, lst)
|
||||
|
||||
ginx.NewRender(c).Data(lst, err)
|
||||
}
|
||||
@@ -200,6 +203,9 @@ func (rt *Router) messageTemplateGets(c *gin.Context) {
|
||||
ident := ginx.QueryStr(c, "ident", "")
|
||||
|
||||
tpls, err := models.MessageTemplateGets(rt.Ctx, id, name, ident)
|
||||
if err == nil {
|
||||
models.FillUpdateByNicknames(rt.Ctx, tpls)
|
||||
}
|
||||
|
||||
ginx.NewRender(c).Data(tpls, err)
|
||||
}
|
||||
|
||||
@@ -15,6 +15,9 @@ import (
|
||||
func (rt *Router) recordingRuleGets(c *gin.Context) {
|
||||
busiGroupId := ginx.UrlParamInt64(c, "id")
|
||||
ars, err := models.RecordingRuleGets(rt.Ctx, busiGroupId)
|
||||
if err == nil {
|
||||
models.FillUpdateByNicknames(rt.Ctx, ars)
|
||||
}
|
||||
ginx.NewRender(c).Data(ars, err)
|
||||
}
|
||||
|
||||
@@ -39,6 +42,9 @@ func (rt *Router) recordingRuleGetsByGids(c *gin.Context) {
|
||||
}
|
||||
|
||||
ars, err := models.RecordingRuleGetsByBGIds(rt.Ctx, gids)
|
||||
if err == nil {
|
||||
models.FillUpdateByNicknames(rt.Ctx, ars)
|
||||
}
|
||||
ginx.NewRender(c).Data(ars, err)
|
||||
}
|
||||
|
||||
@@ -112,6 +118,7 @@ func (rt *Router) recordingRulePutByFE(c *gin.Context) {
|
||||
}
|
||||
|
||||
rt.bgrwCheck(c, ar.GroupId)
|
||||
rt.bgroCheck(c, f.GroupId)
|
||||
|
||||
f.UpdateBy = c.MustGet("username").(string)
|
||||
ginx.NewRender(c).Message(ar.Update(rt.Ctx, f))
|
||||
|
||||
@@ -20,6 +20,7 @@ func (rt *Router) savedViewGets(c *gin.Context) {
|
||||
ginx.NewRender(c).Data(nil, err)
|
||||
return
|
||||
}
|
||||
models.FillUpdateByNicknames(rt.Ctx, lst)
|
||||
|
||||
userGids, err := models.MyGroupIds(rt.Ctx, me.Id)
|
||||
if err != nil {
|
||||
|
||||
@@ -25,6 +25,7 @@ func (rt *Router) taskTplGets(c *gin.Context) {
|
||||
|
||||
list, err := models.TaskTplGets(rt.Ctx, []int64{groupId}, query, limit, ginx.Offset(c, limit))
|
||||
ginx.Dangerous(err)
|
||||
models.FillUpdateByNicknames(rt.Ctx, list)
|
||||
|
||||
ginx.NewRender(c).Data(gin.H{
|
||||
"total": total,
|
||||
@@ -60,6 +61,7 @@ func (rt *Router) taskTplGetsByGids(c *gin.Context) {
|
||||
|
||||
list, err := models.TaskTplGets(rt.Ctx, gids, query, limit, ginx.Offset(c, limit))
|
||||
ginx.Dangerous(err)
|
||||
models.FillUpdateByNicknames(rt.Ctx, list)
|
||||
|
||||
ginx.NewRender(c).Data(gin.H{
|
||||
"total": total,
|
||||
|
||||
@@ -27,6 +27,9 @@ func (rt *Router) userGroupGets(c *gin.Context) {
|
||||
|
||||
me := c.MustGet("user").(*models.User)
|
||||
lst, err := me.UserGroups(rt.Ctx, limit, query)
|
||||
if err == nil {
|
||||
models.FillUpdateByNicknames(rt.Ctx, lst)
|
||||
}
|
||||
|
||||
ginx.NewRender(c).Data(lst, err)
|
||||
}
|
||||
|
||||
@@ -301,7 +301,7 @@ ALTER TABLE `event_pipeline` ADD COLUMN `trigger_mode` varchar(128) NOT NULL DEF
|
||||
ALTER TABLE `event_pipeline` ADD COLUMN `disabled` tinyint(1) NOT NULL DEFAULT 0 COMMENT 'disabled flag';
|
||||
ALTER TABLE `event_pipeline` ADD COLUMN `nodes` text COMMENT 'workflow nodes (JSON)';
|
||||
ALTER TABLE `event_pipeline` ADD COLUMN `connections` text COMMENT 'node connections (JSON)';
|
||||
ALTER TABLE `event_pipeline` ADD COLUMN `env_variables` text COMMENT 'environment variables (JSON)';
|
||||
ALTER TABLE `event_pipeline` ADD COLUMN `input_variables` text COMMENT 'input variables (JSON)';
|
||||
ALTER TABLE `event_pipeline` ADD COLUMN `label_filters` text COMMENT 'label filters (JSON)';
|
||||
|
||||
CREATE TABLE `event_pipeline_execution` (
|
||||
@@ -318,7 +318,7 @@ CREATE TABLE `event_pipeline_execution` (
|
||||
`finished_at` bigint DEFAULT 0 COMMENT 'finish timestamp',
|
||||
`duration_ms` bigint DEFAULT 0 COMMENT 'duration in milliseconds',
|
||||
`trigger_by` varchar(64) DEFAULT '' COMMENT 'trigger by',
|
||||
`env_snapshot` text COMMENT 'environment variables snapshot (sanitized)',
|
||||
`inputs_snapshot` text COMMENT 'inputs snapshot',
|
||||
PRIMARY KEY (`id`),
|
||||
KEY `idx_pipeline_id` (`pipeline_id`),
|
||||
KEY `idx_event_id` (`event_id`),
|
||||
|
||||
@@ -192,6 +192,7 @@ type AlertMute struct {
|
||||
Activated int `json:"activated" gorm:"-"` // 0: not activated, 1: activated
|
||||
CreateBy string `json:"create_by"`
|
||||
UpdateBy string `json:"update_by"`
|
||||
UpdateByNickname string `json:"update_by_nickname" gorm:"-"`
|
||||
CreateAt int64 `json:"create_at"`
|
||||
UpdateAt int64 `json:"update_at"`
|
||||
ITags []TagFilter `json:"-" gorm:"-"` // inner tags
|
||||
|
||||
@@ -45,6 +45,7 @@ type AlertSubscribe struct {
|
||||
CreateAt int64 `json:"create_at"`
|
||||
UpdateBy string `json:"update_by"`
|
||||
UpdateAt int64 `json:"update_at"`
|
||||
UpdateByNickname string `json:"update_by_nickname" gorm:"-"`
|
||||
ITags []TagFilter `json:"-" gorm:"-"` // inner tags
|
||||
BusiGroups ormx.JSONArr `json:"busi_groups"`
|
||||
IBusiGroups []TagFilter `json:"-" gorm:"-"` // inner busiGroups
|
||||
|
||||
@@ -19,22 +19,23 @@ const (
|
||||
)
|
||||
|
||||
type Board struct {
|
||||
Id int64 `json:"id" gorm:"primaryKey"`
|
||||
GroupId int64 `json:"group_id"`
|
||||
Name string `json:"name"`
|
||||
Ident string `json:"ident"`
|
||||
Tags string `json:"tags"`
|
||||
Note string `json:"note"`
|
||||
CreateAt int64 `json:"create_at"`
|
||||
CreateBy string `json:"create_by"`
|
||||
UpdateAt int64 `json:"update_at"`
|
||||
UpdateBy string `json:"update_by"`
|
||||
Configs string `json:"configs" gorm:"-"`
|
||||
Public int `json:"public"` // 0: false, 1: true
|
||||
PublicCate int `json:"public_cate"` // 0: anonymous, 1: login, 2: busi
|
||||
Bgids []int64 `json:"bgids" gorm:"-"`
|
||||
BuiltIn int `json:"built_in"` // 0: false, 1: true
|
||||
Hide int `json:"hide"` // 0: false, 1: true
|
||||
Id int64 `json:"id" gorm:"primaryKey"`
|
||||
GroupId int64 `json:"group_id"`
|
||||
Name string `json:"name"`
|
||||
Ident string `json:"ident"`
|
||||
Tags string `json:"tags"`
|
||||
Note string `json:"note"`
|
||||
CreateAt int64 `json:"create_at"`
|
||||
CreateBy string `json:"create_by"`
|
||||
UpdateAt int64 `json:"update_at"`
|
||||
UpdateBy string `json:"update_by"`
|
||||
UpdateByNickname string `json:"update_by_nickname" gorm:"-"`
|
||||
Configs string `json:"configs" gorm:"-"`
|
||||
Public int `json:"public"` // 0: false, 1: true
|
||||
PublicCate int `json:"public_cate"` // 0: anonymous, 1: login, 2: busi
|
||||
Bgids []int64 `json:"bgids" gorm:"-"`
|
||||
BuiltIn int `json:"built_in"` // 0: false, 1: true
|
||||
Hide int `json:"hide"` // 0: false, 1: true
|
||||
}
|
||||
|
||||
func (b *Board) TableName() string {
|
||||
|
||||
@@ -9,14 +9,15 @@ import (
|
||||
)
|
||||
|
||||
type MetricFilter struct {
|
||||
ID int64 `json:"id" gorm:"primaryKey;type:bigint;autoIncrement;comment:'unique identifier'"`
|
||||
Name string `json:"name" gorm:"type:varchar(191);not null;index:idx_metricfilter_name,sort:asc;comment:'name of metric filter'"`
|
||||
Configs string `json:"configs" gorm:"type:varchar(4096);not null;comment:'configuration of metric filter'"`
|
||||
GroupsPerm []GroupPerm `json:"groups_perm" gorm:"type:text;serializer:json;"`
|
||||
CreateAt int64 `json:"create_at" gorm:"type:bigint;not null;default:0;comment:'create time'"`
|
||||
CreateBy string `json:"create_by" gorm:"type:varchar(191);not null;default:'';comment:'creator'"`
|
||||
UpdateAt int64 `json:"update_at" gorm:"type:bigint;not null;default:0;comment:'update time'"`
|
||||
UpdateBy string `json:"update_by" gorm:"type:varchar(191);not null;default:'';comment:'updater'"`
|
||||
ID int64 `json:"id" gorm:"primaryKey;type:bigint;autoIncrement;comment:'unique identifier'"`
|
||||
Name string `json:"name" gorm:"type:varchar(191);not null;index:idx_metricfilter_name,sort:asc;comment:'name of metric filter'"`
|
||||
Configs string `json:"configs" gorm:"type:varchar(4096);not null;comment:'configuration of metric filter'"`
|
||||
GroupsPerm []GroupPerm `json:"groups_perm" gorm:"type:text;serializer:json;"`
|
||||
CreateAt int64 `json:"create_at" gorm:"type:bigint;not null;default:0;comment:'create time'"`
|
||||
CreateBy string `json:"create_by" gorm:"type:varchar(191);not null;default:'';comment:'creator'"`
|
||||
UpdateAt int64 `json:"update_at" gorm:"type:bigint;not null;default:0;comment:'update time'"`
|
||||
UpdateBy string `json:"update_by" gorm:"type:varchar(191);not null;default:'';comment:'updater'"`
|
||||
UpdateByNickname string `json:"update_by_nickname" gorm:"-"`
|
||||
}
|
||||
|
||||
type GroupPerm struct {
|
||||
|
||||
@@ -12,16 +12,17 @@ import (
|
||||
)
|
||||
|
||||
type BusiGroup struct {
|
||||
Id int64 `json:"id" gorm:"primaryKey"`
|
||||
Name string `json:"name"`
|
||||
LabelEnable int `json:"label_enable"`
|
||||
LabelValue string `json:"label_value"`
|
||||
CreateAt int64 `json:"create_at"`
|
||||
CreateBy string `json:"create_by"`
|
||||
UpdateAt int64 `json:"update_at"`
|
||||
UpdateBy string `json:"update_by"`
|
||||
UserGroups []UserGroupWithPermFlag `json:"user_groups" gorm:"-"`
|
||||
DB *gorm.DB `json:"-" gorm:"-"`
|
||||
Id int64 `json:"id" gorm:"primaryKey"`
|
||||
Name string `json:"name"`
|
||||
LabelEnable int `json:"label_enable"`
|
||||
LabelValue string `json:"label_value"`
|
||||
CreateAt int64 `json:"create_at"`
|
||||
CreateBy string `json:"create_by"`
|
||||
UpdateAt int64 `json:"update_at"`
|
||||
UpdateBy string `json:"update_by"`
|
||||
UpdateByNickname string `json:"update_by_nickname" gorm:"-"`
|
||||
UserGroups []UserGroupWithPermFlag `json:"user_groups" gorm:"-"`
|
||||
DB *gorm.DB `json:"-" gorm:"-"`
|
||||
}
|
||||
|
||||
func New(db *gorm.DB) *BusiGroup {
|
||||
|
||||
@@ -20,16 +20,17 @@ import (
|
||||
)
|
||||
|
||||
type Configs struct { //ckey+external
|
||||
Id int64 `json:"id" gorm:"primaryKey"`
|
||||
Ckey string `json:"ckey"` // Before inserting external configs, check if they are already defined as built-in configs.
|
||||
Cval string `json:"cval"`
|
||||
Note string `json:"note"`
|
||||
External int `json:"external"` //Controls frontend list display: 0 hides built-in (default), 1 shows external
|
||||
Encrypted int `json:"encrypted"` //Indicates whether the value(cval) is encrypted (1 for ciphertext, 0 for plaintext(default))
|
||||
CreateAt int64 `json:"create_at"`
|
||||
CreateBy string `json:"create_by"`
|
||||
UpdateAt int64 `json:"update_at"`
|
||||
UpdateBy string `json:"update_by"`
|
||||
Id int64 `json:"id" gorm:"primaryKey"`
|
||||
Ckey string `json:"ckey"` // Before inserting external configs, check if they are already defined as built-in configs.
|
||||
Cval string `json:"cval"`
|
||||
Note string `json:"note"`
|
||||
External int `json:"external"` //Controls frontend list display: 0 hides built-in (default), 1 shows external
|
||||
Encrypted int `json:"encrypted"` //Indicates whether the value(cval) is encrypted (1 for ciphertext, 0 for plaintext(default))
|
||||
CreateAt int64 `json:"create_at"`
|
||||
CreateBy string `json:"create_by"`
|
||||
UpdateAt int64 `json:"update_at"`
|
||||
UpdateBy string `json:"update_by"`
|
||||
UpdateByNickname string `json:"update_by_nickname" gorm:"-"`
|
||||
}
|
||||
|
||||
func (Configs) TableName() string {
|
||||
|
||||
@@ -7,19 +7,20 @@ import (
|
||||
)
|
||||
|
||||
type DashAnnotation struct {
|
||||
Id int64 `json:"id" gorm:"primaryKey"`
|
||||
DashboardId int64 `json:"dashboard_id"`
|
||||
PanelId string `json:"panel_id"`
|
||||
Tags string `json:"-"`
|
||||
TagsJSON []string `json:"tags" gorm:"-"`
|
||||
Description string `json:"description"`
|
||||
Config string `json:"config"`
|
||||
TimeStart int64 `json:"time_start"`
|
||||
TimeEnd int64 `json:"time_end"`
|
||||
CreateAt int64 `json:"create_at"`
|
||||
CreateBy string `json:"create_by"`
|
||||
UpdateAt int64 `json:"update_at"`
|
||||
UpdateBy string `json:"update_by"`
|
||||
Id int64 `json:"id" gorm:"primaryKey"`
|
||||
DashboardId int64 `json:"dashboard_id"`
|
||||
PanelId string `json:"panel_id"`
|
||||
Tags string `json:"-"`
|
||||
TagsJSON []string `json:"tags" gorm:"-"`
|
||||
Description string `json:"description"`
|
||||
Config string `json:"config"`
|
||||
TimeStart int64 `json:"time_start"`
|
||||
TimeEnd int64 `json:"time_end"`
|
||||
CreateAt int64 `json:"create_at"`
|
||||
CreateBy string `json:"create_by"`
|
||||
UpdateAt int64 `json:"update_at"`
|
||||
UpdateBy string `json:"update_by"`
|
||||
UpdateByNickname string `json:"update_by_nickname" gorm:"-"`
|
||||
}
|
||||
|
||||
func (da *DashAnnotation) TableName() string {
|
||||
|
||||
@@ -517,7 +517,8 @@ func (ds *Datasource) Encrypt(openRsa bool, publicKeyData []byte) error {
|
||||
// Decrypt 用于 edge 将从中心同步的数据源解密,中心不可调用
|
||||
func (ds *Datasource) Decrypt() error {
|
||||
if rsaConfig == nil {
|
||||
return errors.New("rsa config is nil")
|
||||
logger.Debugf("datasource %s rsa config is nil", ds.Name)
|
||||
return nil
|
||||
}
|
||||
|
||||
if !rsaConfig.OpenRSA {
|
||||
|
||||
@@ -14,15 +14,16 @@ import (
|
||||
)
|
||||
|
||||
type EmbeddedProduct struct {
|
||||
ID int64 `json:"id" gorm:"primaryKey"` // 主键
|
||||
Name string `json:"name" gorm:"column:name;type:varchar(255)"`
|
||||
URL string `json:"url" gorm:"column:url;type:varchar(255)"`
|
||||
IsPrivate bool `json:"is_private" gorm:"column:is_private;type:boolean"`
|
||||
TeamIDs []int64 `json:"team_ids" gorm:"serializer:json"`
|
||||
CreateAt int64 `json:"create_at" gorm:"column:create_at;not null;default:0"`
|
||||
CreateBy string `json:"create_by" gorm:"column:create_by;type:varchar(64);not null;default:''"`
|
||||
UpdateAt int64 `json:"update_at" gorm:"column:update_at;not null;default:0"`
|
||||
UpdateBy string `json:"update_by" gorm:"column:update_by;type:varchar(64);not null;default:''"`
|
||||
ID int64 `json:"id" gorm:"primaryKey"` // 主键
|
||||
Name string `json:"name" gorm:"column:name;type:varchar(255)"`
|
||||
URL string `json:"url" gorm:"column:url;type:varchar(255)"`
|
||||
IsPrivate bool `json:"is_private" gorm:"column:is_private;type:boolean"`
|
||||
TeamIDs []int64 `json:"team_ids" gorm:"serializer:json"`
|
||||
CreateAt int64 `json:"create_at" gorm:"column:create_at;not null;default:0"`
|
||||
CreateBy string `json:"create_by" gorm:"column:create_by;type:varchar(64);not null;default:''"`
|
||||
UpdateAt int64 `json:"update_at" gorm:"column:update_at;not null;default:0"`
|
||||
UpdateBy string `json:"update_by" gorm:"column:update_by;type:varchar(64);not null;default:''"`
|
||||
UpdateByNickname string `json:"update_by_nickname" gorm:"-"`
|
||||
}
|
||||
|
||||
func (e *EmbeddedProduct) TableName() string {
|
||||
|
||||
@@ -24,6 +24,7 @@ type EsIndexPattern struct {
|
||||
CreateBy string `json:"create_by"`
|
||||
UpdateAt int64 `json:"update_at"`
|
||||
UpdateBy string `json:"update_by"`
|
||||
UpdateByNickname string `json:"update_by_nickname" gorm:"-"`
|
||||
CrossClusterEnabled int `json:"cross_cluster_enabled"`
|
||||
Note string `json:"note"`
|
||||
}
|
||||
|
||||
@@ -29,13 +29,14 @@ type EventPipeline struct {
|
||||
Nodes []WorkflowNode `json:"nodes,omitempty" gorm:"type:text;serializer:json"`
|
||||
// 节点连接关系
|
||||
Connections Connections `json:"connections,omitempty" gorm:"type:text;serializer:json"`
|
||||
// 环境变量(工作流级别的配置变量)
|
||||
EnvVariables []EnvVariable `json:"env_variables,omitempty" gorm:"type:text;serializer:json"`
|
||||
// 输入参数(工作流级别的配置变量)
|
||||
Inputs []InputVariable `json:"inputs,omitempty" gorm:"type:text;serializer:json"`
|
||||
|
||||
CreateAt int64 `json:"create_at" gorm:"type:bigint"`
|
||||
CreateBy string `json:"create_by" gorm:"type:varchar(64)"`
|
||||
UpdateAt int64 `json:"update_at" gorm:"type:bigint"`
|
||||
UpdateBy string `json:"update_by" gorm:"type:varchar(64)"`
|
||||
CreateAt int64 `json:"create_at" gorm:"type:bigint"`
|
||||
CreateBy string `json:"create_by" gorm:"type:varchar(64)"`
|
||||
UpdateAt int64 `json:"update_at" gorm:"type:bigint"`
|
||||
UpdateBy string `json:"update_by" gorm:"type:varchar(64)"`
|
||||
UpdateByNickname string `json:"update_by_nickname" gorm:"-"`
|
||||
}
|
||||
|
||||
type ProcessorConfig struct {
|
||||
@@ -73,8 +74,8 @@ func (e *EventPipeline) Verify() error {
|
||||
if e.Connections == nil {
|
||||
e.Connections = make(Connections)
|
||||
}
|
||||
if e.EnvVariables == nil {
|
||||
e.EnvVariables = make([]EnvVariable, 0)
|
||||
if e.Inputs == nil {
|
||||
e.Inputs = make([]InputVariable, 0)
|
||||
}
|
||||
|
||||
return nil
|
||||
@@ -245,36 +246,10 @@ func (e *EventPipeline) FillWorkflowFields() {
|
||||
}
|
||||
}
|
||||
|
||||
func (e *EventPipeline) GetEnvMap() map[string]string {
|
||||
envMap := make(map[string]string)
|
||||
for _, v := range e.EnvVariables {
|
||||
envMap[v.Key] = v.Value
|
||||
func (e *EventPipeline) GetInputsMap() map[string]string {
|
||||
inputsMap := make(map[string]string)
|
||||
for _, v := range e.Inputs {
|
||||
inputsMap[v.Key] = v.Value
|
||||
}
|
||||
return envMap
|
||||
}
|
||||
|
||||
func (e *EventPipeline) GetSecretKeys() map[string]bool {
|
||||
secretKeys := make(map[string]bool)
|
||||
for _, v := range e.EnvVariables {
|
||||
if v.Secret {
|
||||
secretKeys[v.Key] = true
|
||||
}
|
||||
}
|
||||
return secretKeys
|
||||
}
|
||||
|
||||
func (e *EventPipeline) ValidateEnvVariables(overrides map[string]string) error {
|
||||
// 合并默认值和覆盖值
|
||||
merged := e.GetEnvMap()
|
||||
for k, v := range overrides {
|
||||
merged[k] = v
|
||||
}
|
||||
|
||||
// 校验必填项
|
||||
for _, v := range e.EnvVariables {
|
||||
if v.Required && merged[v.Key] == "" {
|
||||
return fmt.Errorf("required env variable %s is missing", v.Key)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
return inputsMap
|
||||
}
|
||||
|
||||
@@ -2,10 +2,13 @@ package models
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
"github.com/ccfos/nightingale/v6/pkg/poster"
|
||||
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
// 执行状态常量
|
||||
@@ -43,8 +46,8 @@ type EventPipelineExecution struct {
|
||||
// 触发者信息
|
||||
TriggerBy string `json:"trigger_by" gorm:"type:varchar(64)"`
|
||||
|
||||
// 环境变量快照(脱敏后存储)
|
||||
EnvSnapshot string `json:"env_snapshot,omitempty" gorm:"type:text"`
|
||||
// 输入参数快照(脱敏后存储)
|
||||
InputsSnapshot string `json:"inputs_snapshot,omitempty" gorm:"type:text"`
|
||||
}
|
||||
|
||||
func (e *EventPipelineExecution) TableName() string {
|
||||
@@ -71,24 +74,24 @@ func (e *EventPipelineExecution) GetNodeResults() ([]*NodeExecutionResult, error
|
||||
return results, err
|
||||
}
|
||||
|
||||
// SetEnvSnapshot 设置环境变量快照(脱敏后存储)
|
||||
func (e *EventPipelineExecution) SetEnvSnapshot(env map[string]string) error {
|
||||
data, err := json.Marshal(env)
|
||||
// SetInputsSnapshot 设置输入参数快照(脱敏后存储)
|
||||
func (e *EventPipelineExecution) SetInputsSnapshot(inputs map[string]string) error {
|
||||
data, err := json.Marshal(inputs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
e.EnvSnapshot = string(data)
|
||||
e.InputsSnapshot = string(data)
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetEnvSnapshot 获取环境变量快照
|
||||
func (e *EventPipelineExecution) GetEnvSnapshot() (map[string]string, error) {
|
||||
if e.EnvSnapshot == "" {
|
||||
// GetInputsSnapshot 获取输入参数快照
|
||||
func (e *EventPipelineExecution) GetInputsSnapshot() (map[string]string, error) {
|
||||
if e.InputsSnapshot == "" {
|
||||
return nil, nil
|
||||
}
|
||||
var env map[string]string
|
||||
err := json.Unmarshal([]byte(e.EnvSnapshot), &env)
|
||||
return env, err
|
||||
var inputs map[string]string
|
||||
err := json.Unmarshal([]byte(e.InputsSnapshot), &inputs)
|
||||
return inputs, err
|
||||
}
|
||||
|
||||
// CreateEventPipelineExecution 创建执行记录
|
||||
@@ -109,6 +112,9 @@ func GetEventPipelineExecution(c *ctx.Context, id string) (*EventPipelineExecuti
|
||||
var execution EventPipelineExecution
|
||||
err := DB(c).Where("id = ?", id).First(&execution).Error
|
||||
if err != nil {
|
||||
if errors.Is(err, gorm.ErrRecordNotFound) {
|
||||
return nil, nil
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
return &execution, nil
|
||||
@@ -149,12 +155,15 @@ func ListEventPipelineExecutionsByEventID(c *ctx.Context, eventID int64) ([]*Eve
|
||||
}
|
||||
|
||||
// ListAllEventPipelineExecutions 获取所有 Pipeline 的执行记录列表
|
||||
func ListAllEventPipelineExecutions(c *ctx.Context, pipelineName, mode, status string, limit, offset int) ([]*EventPipelineExecution, int64, error) {
|
||||
func ListAllEventPipelineExecutions(c *ctx.Context, pipelineId int64, pipelineName, mode, status string, limit, offset int) ([]*EventPipelineExecution, int64, error) {
|
||||
var executions []*EventPipelineExecution
|
||||
var total int64
|
||||
|
||||
session := DB(c).Model(&EventPipelineExecution{})
|
||||
|
||||
if pipelineId > 0 {
|
||||
session = session.Where("pipeline_id = ?", pipelineId)
|
||||
}
|
||||
if pipelineName != "" {
|
||||
session = session.Where("pipeline_name LIKE ?", "%"+pipelineName+"%")
|
||||
}
|
||||
@@ -274,8 +283,8 @@ func GetEventPipelineExecutionStatistics(c *ctx.Context, pipelineID int64) (*Eve
|
||||
// EventPipelineExecutionDetail 执行详情(包含解析后的节点结果)
|
||||
type EventPipelineExecutionDetail struct {
|
||||
EventPipelineExecution
|
||||
NodeResultsParsed []*NodeExecutionResult `json:"node_results_parsed"`
|
||||
EnvSnapshotParsed map[string]string `json:"env_snapshot_parsed"`
|
||||
NodeResultsParsed []*NodeExecutionResult `json:"node_results_parsed"`
|
||||
InputsSnapshotParsed map[string]string `json:"inputs_snapshot_parsed"`
|
||||
}
|
||||
|
||||
// GetEventPipelineExecutionDetail 获取执行详情
|
||||
@@ -285,6 +294,10 @@ func GetEventPipelineExecutionDetail(c *ctx.Context, id string) (*EventPipelineE
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if execution == nil {
|
||||
return &EventPipelineExecutionDetail{}, nil
|
||||
}
|
||||
|
||||
detail := &EventPipelineExecutionDetail{
|
||||
EventPipelineExecution: *execution,
|
||||
}
|
||||
@@ -296,12 +309,12 @@ func GetEventPipelineExecutionDetail(c *ctx.Context, id string) (*EventPipelineE
|
||||
}
|
||||
detail.NodeResultsParsed = nodeResults
|
||||
|
||||
// 解析环境变量快照
|
||||
envSnapshot, err := execution.GetEnvSnapshot()
|
||||
// 解析输入参数快照
|
||||
inputsSnapshot, err := execution.GetInputsSnapshot()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("parse env snapshot error: %w", err)
|
||||
return nil, fmt.Errorf("parse inputs snapshot error: %w", err)
|
||||
}
|
||||
detail.EnvSnapshotParsed = envSnapshot
|
||||
detail.InputsSnapshotParsed = inputsSnapshot
|
||||
|
||||
return detail, nil
|
||||
}
|
||||
|
||||
@@ -31,6 +31,7 @@ type MessageTemplate struct {
|
||||
CreateBy string `json:"create_by"`
|
||||
UpdateAt int64 `json:"update_at"`
|
||||
UpdateBy string `json:"update_by"`
|
||||
UpdateByNickname string `json:"update_by_nickname" gorm:"-"`
|
||||
}
|
||||
|
||||
func MessageTemplateStatistics(ctx *ctx.Context) (*Statistics, error) {
|
||||
|
||||
@@ -57,11 +57,12 @@ type NotifyChannelConfig struct {
|
||||
RequestType string `json:"request_type"` // http, stmp, script, flashduty
|
||||
RequestConfig *RequestConfig `json:"request_config,omitempty" gorm:"serializer:json"`
|
||||
|
||||
Weight int `json:"weight"` // 权重,根据此字段对内置模板进行排序
|
||||
CreateAt int64 `json:"create_at"`
|
||||
CreateBy string `json:"create_by"`
|
||||
UpdateAt int64 `json:"update_at"`
|
||||
UpdateBy string `json:"update_by"`
|
||||
Weight int `json:"weight"` // 权重,根据此字段对内置模板进行排序
|
||||
CreateAt int64 `json:"create_at"`
|
||||
CreateBy string `json:"create_by"`
|
||||
UpdateAt int64 `json:"update_at"`
|
||||
UpdateBy string `json:"update_by"`
|
||||
UpdateByNickname string `json:"update_by_nickname" gorm:"-"`
|
||||
}
|
||||
|
||||
func (ncc *NotifyChannelConfig) TableName() string {
|
||||
|
||||
@@ -24,10 +24,11 @@ type NotifyRule struct {
|
||||
NotifyConfigs []NotifyConfig `json:"notify_configs" gorm:"serializer:json"`
|
||||
ExtraConfig interface{} `json:"extra_config,omitempty" gorm:"serializer:json"`
|
||||
|
||||
CreateAt int64 `json:"create_at"`
|
||||
CreateBy string `json:"create_by"`
|
||||
UpdateAt int64 `json:"update_at"`
|
||||
UpdateBy string `json:"update_by"`
|
||||
CreateAt int64 `json:"create_at"`
|
||||
CreateBy string `json:"create_by"`
|
||||
UpdateAt int64 `json:"update_at"`
|
||||
UpdateBy string `json:"update_by"`
|
||||
UpdateByNickname string `json:"update_by_nickname" gorm:"-"`
|
||||
}
|
||||
|
||||
type PipelineConfig struct {
|
||||
|
||||
@@ -17,15 +17,16 @@ import (
|
||||
)
|
||||
|
||||
type NotifyTpl struct {
|
||||
Id int64 `json:"id"`
|
||||
Name string `json:"name"`
|
||||
Channel string `json:"channel"`
|
||||
Content string `json:"content"`
|
||||
BuiltIn bool `json:"built_in" gorm:"-"`
|
||||
CreateAt int64 `json:"create_at"`
|
||||
CreateBy string `json:"create_by"`
|
||||
UpdateAt int64 `json:"update_at"`
|
||||
UpdateBy string `json:"update_by"`
|
||||
Id int64 `json:"id"`
|
||||
Name string `json:"name"`
|
||||
Channel string `json:"channel"`
|
||||
Content string `json:"content"`
|
||||
BuiltIn bool `json:"built_in" gorm:"-"`
|
||||
CreateAt int64 `json:"create_at"`
|
||||
CreateBy string `json:"create_by"`
|
||||
UpdateAt int64 `json:"update_at"`
|
||||
UpdateBy string `json:"update_by"`
|
||||
UpdateByNickname string `json:"update_by_nickname" gorm:"-"`
|
||||
}
|
||||
|
||||
func (n *NotifyTpl) TableName() string {
|
||||
|
||||
@@ -36,6 +36,7 @@ type RecordingRule struct {
|
||||
CreateBy string `json:"create_by"`
|
||||
UpdateAt int64 `json:"update_at"`
|
||||
UpdateBy string `json:"update_by"`
|
||||
UpdateByNickname string `json:"update_by_nickname" gorm:"-"`
|
||||
}
|
||||
|
||||
type QueryConfig struct {
|
||||
@@ -212,7 +213,6 @@ func (re *RecordingRule) Update(ctx *ctx.Context, ref RecordingRule) error {
|
||||
|
||||
ref.FE2DB()
|
||||
ref.Id = re.Id
|
||||
ref.GroupId = re.GroupId
|
||||
ref.CreateAt = re.CreateAt
|
||||
ref.CreateBy = re.CreateBy
|
||||
ref.UpdateAt = time.Now().Unix()
|
||||
|
||||
@@ -16,16 +16,17 @@ var (
|
||||
)
|
||||
|
||||
type SavedView struct {
|
||||
Id int64 `json:"id" gorm:"primaryKey;autoIncrement"`
|
||||
Name string `json:"name" gorm:"type:varchar(255);not null"`
|
||||
Page string `json:"page" gorm:"type:varchar(64);not null;index"`
|
||||
Filter string `json:"filter" gorm:"type:text"`
|
||||
PublicCate int `json:"public_cate" gorm:"default:0"` // 0: self, 1: team, 2: all
|
||||
Gids []int64 `json:"gids" gorm:"column:gids;type:text;serializer:json"`
|
||||
CreateAt int64 `json:"create_at" gorm:"type:bigint;not null;default:0"`
|
||||
CreateBy string `json:"create_by" gorm:"type:varchar(64);index"`
|
||||
UpdateAt int64 `json:"update_at" gorm:"type:bigint;not null;default:0"`
|
||||
UpdateBy string `json:"update_by" gorm:"type:varchar(64)"`
|
||||
Id int64 `json:"id" gorm:"primaryKey;autoIncrement"`
|
||||
Name string `json:"name" gorm:"type:varchar(255);not null"`
|
||||
Page string `json:"page" gorm:"type:varchar(64);not null;index"`
|
||||
Filter string `json:"filter" gorm:"type:text"`
|
||||
PublicCate int `json:"public_cate" gorm:"default:0"` // 0: self, 1: team, 2: all
|
||||
Gids []int64 `json:"gids" gorm:"column:gids;type:text;serializer:json"`
|
||||
CreateAt int64 `json:"create_at" gorm:"type:bigint;not null;default:0"`
|
||||
CreateBy string `json:"create_by" gorm:"type:varchar(64);index"`
|
||||
UpdateAt int64 `json:"update_at" gorm:"type:bigint;not null;default:0"`
|
||||
UpdateBy string `json:"update_by" gorm:"type:varchar(64)"`
|
||||
UpdateByNickname string `json:"update_by_nickname" gorm:"-"`
|
||||
|
||||
// 查询时填充的字段
|
||||
IsFavorite bool `json:"is_favorite" gorm:"-"`
|
||||
|
||||
@@ -15,22 +15,23 @@ import (
|
||||
)
|
||||
|
||||
type TaskTpl struct {
|
||||
Id int64 `json:"id" gorm:"primaryKey"`
|
||||
GroupId int64 `json:"group_id"`
|
||||
Title string `json:"title"`
|
||||
Batch int `json:"batch"`
|
||||
Tolerance int `json:"tolerance"`
|
||||
Timeout int `json:"timeout"`
|
||||
Pause string `json:"pause"`
|
||||
Script string `json:"script"`
|
||||
Args string `json:"args"`
|
||||
Tags string `json:"-"`
|
||||
TagsJSON []string `json:"tags" gorm:"-"`
|
||||
Account string `json:"account"`
|
||||
CreateAt int64 `json:"create_at"`
|
||||
CreateBy string `json:"create_by"`
|
||||
UpdateAt int64 `json:"update_at"`
|
||||
UpdateBy string `json:"update_by"`
|
||||
Id int64 `json:"id" gorm:"primaryKey"`
|
||||
GroupId int64 `json:"group_id"`
|
||||
Title string `json:"title"`
|
||||
Batch int `json:"batch"`
|
||||
Tolerance int `json:"tolerance"`
|
||||
Timeout int `json:"timeout"`
|
||||
Pause string `json:"pause"`
|
||||
Script string `json:"script"`
|
||||
Args string `json:"args"`
|
||||
Tags string `json:"-"`
|
||||
TagsJSON []string `json:"tags" gorm:"-"`
|
||||
Account string `json:"account"`
|
||||
CreateAt int64 `json:"create_at"`
|
||||
CreateBy string `json:"create_by"`
|
||||
UpdateAt int64 `json:"update_at"`
|
||||
UpdateBy string `json:"update_by"`
|
||||
UpdateByNickname string `json:"update_by_nickname" gorm:"-"`
|
||||
}
|
||||
|
||||
func (t *TaskTpl) TableName() string {
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"reflect"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
@@ -410,6 +411,80 @@ func UserMapGet(ctx *ctx.Context, where string, args ...interface{}) map[string]
|
||||
return um
|
||||
}
|
||||
|
||||
// UserNicknameMap returns a deduplicated username -> nickname map.
|
||||
func UserNicknameMap(ctx *ctx.Context, names []string) map[string]string {
|
||||
m := make(map[string]string)
|
||||
if len(names) == 0 {
|
||||
return m
|
||||
}
|
||||
seen := make(map[string]struct{}, len(names))
|
||||
unique := make([]string, 0, len(names))
|
||||
for _, name := range names {
|
||||
if name == "" {
|
||||
continue
|
||||
}
|
||||
if _, ok := seen[name]; ok {
|
||||
continue
|
||||
}
|
||||
seen[name] = struct{}{}
|
||||
unique = append(unique, name)
|
||||
}
|
||||
if len(unique) == 0 {
|
||||
return m
|
||||
}
|
||||
users := UserMapGet(ctx, "username in (?)", unique)
|
||||
for username, user := range users {
|
||||
m[username] = user.Nickname
|
||||
}
|
||||
return m
|
||||
}
|
||||
|
||||
// FillUpdateByNicknames fills the UpdateByNickname field for each element in items
|
||||
// by looking up the UpdateBy username. Supports both []T and []*T slices.
|
||||
func FillUpdateByNicknames[T any](ctx *ctx.Context, items []T) {
|
||||
if len(items) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
elemType := reflect.TypeOf(items).Elem()
|
||||
isPtr := elemType.Kind() == reflect.Ptr
|
||||
if isPtr {
|
||||
elemType = elemType.Elem()
|
||||
}
|
||||
|
||||
updateByField, ok1 := elemType.FieldByName("UpdateBy")
|
||||
nicknameField, ok2 := elemType.FieldByName("UpdateByNickname")
|
||||
if !ok1 || !ok2 {
|
||||
return
|
||||
}
|
||||
|
||||
names := make([]string, 0, len(items))
|
||||
for i := range items {
|
||||
v := reflect.ValueOf(&items[i]).Elem()
|
||||
if isPtr {
|
||||
if v.IsNil() {
|
||||
continue
|
||||
}
|
||||
v = v.Elem()
|
||||
}
|
||||
names = append(names, v.FieldByIndex(updateByField.Index).String())
|
||||
}
|
||||
|
||||
nm := UserNicknameMap(ctx, names)
|
||||
|
||||
for i := range items {
|
||||
v := reflect.ValueOf(&items[i]).Elem()
|
||||
if isPtr {
|
||||
if v.IsNil() {
|
||||
continue
|
||||
}
|
||||
v = v.Elem()
|
||||
}
|
||||
updateBy := v.FieldByIndex(updateByField.Index).String()
|
||||
v.FieldByIndex(nicknameField.Index).SetString(nm[updateBy])
|
||||
}
|
||||
}
|
||||
|
||||
func UserGetByUsername(ctx *ctx.Context, username string) (*User, error) {
|
||||
return UserGet(ctx, "username=?", username)
|
||||
}
|
||||
|
||||
@@ -12,16 +12,17 @@ import (
|
||||
)
|
||||
|
||||
type UserGroup struct {
|
||||
Id int64 `json:"id" gorm:"primaryKey"`
|
||||
Name string `json:"name"`
|
||||
Note string `json:"note"`
|
||||
CreateAt int64 `json:"create_at"`
|
||||
CreateBy string `json:"create_by"`
|
||||
UpdateAt int64 `json:"update_at"`
|
||||
UpdateBy string `json:"update_by"`
|
||||
UserIds []int64 `json:"-" gorm:"-"`
|
||||
Users []User `json:"users" gorm:"-"`
|
||||
BusiGroups []*BusiGroup `json:"busi_groups" gorm:"-"`
|
||||
Id int64 `json:"id" gorm:"primaryKey"`
|
||||
Name string `json:"name"`
|
||||
Note string `json:"note"`
|
||||
CreateAt int64 `json:"create_at"`
|
||||
CreateBy string `json:"create_by"`
|
||||
UpdateAt int64 `json:"update_at"`
|
||||
UpdateBy string `json:"update_by"`
|
||||
UpdateByNickname string `json:"update_by_nickname" gorm:"-"`
|
||||
UserIds []int64 `json:"-" gorm:"-"`
|
||||
Users []User `json:"users" gorm:"-"`
|
||||
BusiGroups []*BusiGroup `json:"busi_groups" gorm:"-"`
|
||||
}
|
||||
|
||||
func (ug *UserGroup) TableName() string {
|
||||
|
||||
@@ -33,13 +33,11 @@ type ConnectionTarget struct {
|
||||
Index int `json:"index"` // 目标节点的输入端口索引
|
||||
}
|
||||
|
||||
// EnvVariable 环境变量
|
||||
type EnvVariable struct {
|
||||
// InputVariable 输入参数
|
||||
type InputVariable struct {
|
||||
Key string `json:"key"` // 变量名
|
||||
Value string `json:"value"` // 默认值
|
||||
Description string `json:"description,omitempty"` // 描述
|
||||
Secret bool `json:"secret,omitempty"` // 是否敏感(日志脱敏)
|
||||
Required bool `json:"required,omitempty"` // 是否必填
|
||||
}
|
||||
|
||||
// NodeOutput 节点执行输出
|
||||
@@ -104,8 +102,8 @@ type WorkflowTriggerContext struct {
|
||||
// 请求ID(API/Cron 触发使用)
|
||||
RequestID string `json:"request_id"`
|
||||
|
||||
// 环境变量覆盖
|
||||
EnvOverrides map[string]string `json:"env_overrides"`
|
||||
// 输入参数覆盖
|
||||
InputsOverrides map[string]string `json:"inputs_overrides"`
|
||||
|
||||
// 流式输出(API 调用时动态指定)
|
||||
Stream bool `json:"stream"`
|
||||
@@ -118,7 +116,7 @@ type WorkflowTriggerContext struct {
|
||||
|
||||
type WorkflowContext struct {
|
||||
Event *AlertCurEvent `json:"event"` // 当前事件
|
||||
Env map[string]string `json:"env"` // 环境变量/配置(静态,来自 Pipeline 配置)
|
||||
Inputs map[string]string `json:"inputs"` // 前置输入参数(静态,用户配置)
|
||||
Vars map[string]interface{} `json:"vars"` // 节点间传递的数据(动态,运行时产生)
|
||||
Metadata map[string]string `json:"metadata"` // 执行元数据(request_id、start_time 等)
|
||||
Output map[string]interface{} `json:"output,omitempty"` // 输出结果(非告警场景使用)
|
||||
@@ -128,19 +126,6 @@ type WorkflowContext struct {
|
||||
StreamChan chan *StreamChunk `json:"-"` // 流式数据通道(不序列化)
|
||||
}
|
||||
|
||||
// SanitizedEnv 返回脱敏后的环境变量(用于日志和存储)
|
||||
func (ctx *WorkflowContext) SanitizedEnv(secretKeys map[string]bool) map[string]string {
|
||||
sanitized := make(map[string]string)
|
||||
for k, v := range ctx.Env {
|
||||
if secretKeys[k] {
|
||||
sanitized[k] = "******"
|
||||
} else {
|
||||
sanitized[k] = v
|
||||
}
|
||||
}
|
||||
return sanitized
|
||||
}
|
||||
|
||||
// StreamChunk 类型常量
|
||||
const (
|
||||
StreamTypeThinking = "thinking" // AI 思考过程(ReAct Thought)
|
||||
|
||||
@@ -312,6 +312,8 @@ func (s *SsoClient) Callback(redis storage.Redis, ctx context.Context, code, sta
|
||||
|
||||
// 根据UsernameField配置确定username
|
||||
switch s.FeiShuConfig.UsernameField {
|
||||
case "userid":
|
||||
callbackOutput.Username = username
|
||||
case "name":
|
||||
if nickname == "" {
|
||||
return nil, errors.New("feishu user name is empty")
|
||||
|
||||
@@ -248,7 +248,7 @@ func (s *Set) writeTargetTsInRedis(ctx context.Context, redis storage.Redis, con
|
||||
|
||||
for i := 0; i < retryCount; i++ {
|
||||
start := time.Now()
|
||||
err := storage.MSet(ctx, redis, content)
|
||||
err := storage.MSet(ctx, redis, content, 24*time.Hour)
|
||||
duration := time.Since(start).Seconds()
|
||||
|
||||
logger.Debugf("update_ts: write target ts in redis, keys: %v, retryCount: %d, retryInterval: %v, error: %v", keys, retryCount, retryInterval, err)
|
||||
|
||||
@@ -35,6 +35,12 @@ type Pushgw struct {
|
||||
WriterOpt WriterGlobalOpt
|
||||
Writers []WriterOptions
|
||||
KafkaWriters []KafkaWriterOptions
|
||||
|
||||
// 预处理的字段,用于快速匹配只有 __name__ 的 DropSample 规则
|
||||
// key: metric name, value: struct{}
|
||||
DropMetricNames map[string]struct{}
|
||||
// 包含多个标签的复杂 DropSample 规则
|
||||
DropSampleComplex []map[string]string
|
||||
}
|
||||
|
||||
type WriterGlobalOpt struct {
|
||||
|
||||
@@ -109,21 +109,30 @@ func (rt *Router) debugSample(remoteAddr string, v *prompb.TimeSeries) {
|
||||
}
|
||||
|
||||
func (rt *Router) DropSample(v *prompb.TimeSeries) bool {
|
||||
filters := rt.Pushgw.DropSample
|
||||
if len(filters) == 0 {
|
||||
// 快速路径:检查仅 __name__ 的过滤器 O(1)
|
||||
if len(rt.dropByNameOnly) > 0 {
|
||||
for i := 0; i < len(v.Labels); i++ {
|
||||
if v.Labels[i].Name == "__name__" {
|
||||
if _, ok := rt.dropByNameOnly[v.Labels[i].Value]; ok {
|
||||
return true
|
||||
}
|
||||
break // __name__ 只会出现一次,找到后直接跳出
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 慢速路径:处理复杂的多条件过滤器
|
||||
if len(rt.dropComplex) == 0 {
|
||||
return false
|
||||
}
|
||||
|
||||
labelMap := make(map[string]string)
|
||||
// 只有复杂过滤器存在时才创建 labelMap
|
||||
labelMap := make(map[string]string, len(v.Labels))
|
||||
for i := 0; i < len(v.Labels); i++ {
|
||||
labelMap[v.Labels[i].Name] = v.Labels[i].Value
|
||||
}
|
||||
|
||||
for _, filter := range filters {
|
||||
if len(filter) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
for _, filter := range rt.dropComplex {
|
||||
if matchSample(filter, labelMap) {
|
||||
return true
|
||||
}
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/prometheus/prometheus/prompb"
|
||||
"github.com/toolkits/pkg/logger"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/alert/aconf"
|
||||
"github.com/ccfos/nightingale/v6/center/metas"
|
||||
@@ -33,6 +34,10 @@ type Router struct {
|
||||
Ctx *ctx.Context
|
||||
HandleTS HandleTSFunc
|
||||
HeartbeatApi string
|
||||
|
||||
// 预编译的 DropSample 过滤器
|
||||
dropByNameOnly map[string]struct{} // 仅 __name__ 条件的快速匹配
|
||||
dropComplex []map[string]string // 多条件的复杂匹配
|
||||
}
|
||||
|
||||
func stat() gin.HandlerFunc {
|
||||
@@ -51,7 +56,7 @@ func stat() gin.HandlerFunc {
|
||||
func New(httpConfig httpx.Config, pushgw pconf.Pushgw, aconf aconf.Alert, tc *memsto.TargetCacheType, bg *memsto.BusiGroupCacheType,
|
||||
idents *idents.Set, metas *metas.Set,
|
||||
writers *writer.WritersType, ctx *ctx.Context) *Router {
|
||||
return &Router{
|
||||
rt := &Router{
|
||||
HTTP: httpConfig,
|
||||
Pushgw: pushgw,
|
||||
Aconf: aconf,
|
||||
@@ -63,6 +68,38 @@ func New(httpConfig httpx.Config, pushgw pconf.Pushgw, aconf aconf.Alert, tc *me
|
||||
MetaSet: metas,
|
||||
HandleTS: func(pt *prompb.TimeSeries) *prompb.TimeSeries { return pt },
|
||||
}
|
||||
|
||||
// 预编译 DropSample 过滤器
|
||||
rt.initDropSampleFilters()
|
||||
|
||||
return rt
|
||||
}
|
||||
|
||||
// initDropSampleFilters 预编译 DropSample 过滤器,将单条件 __name__ 过滤器
|
||||
// 放入 map 实现 O(1) 查找,多条件过滤器保留原有逻辑
|
||||
func (rt *Router) initDropSampleFilters() {
|
||||
rt.dropByNameOnly = make(map[string]struct{})
|
||||
rt.dropComplex = make([]map[string]string, 0)
|
||||
|
||||
for _, filter := range rt.Pushgw.DropSample {
|
||||
if len(filter) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
// 如果只有一个条件且是 __name__,放入快速匹配 map
|
||||
if len(filter) == 1 {
|
||||
if name, ok := filter["__name__"]; ok {
|
||||
rt.dropByNameOnly[name] = struct{}{}
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// 其他情况放入复杂匹配列表
|
||||
rt.dropComplex = append(rt.dropComplex, filter)
|
||||
}
|
||||
|
||||
logger.Infof("DropSample filters initialized: %d name-only, %d complex",
|
||||
len(rt.dropByNameOnly), len(rt.dropComplex))
|
||||
}
|
||||
|
||||
func (rt *Router) Config(r *gin.Engine) {
|
||||
|
||||
@@ -163,10 +163,10 @@ func MGet(ctx context.Context, r Redis, keys []string) [][]byte {
|
||||
return vals
|
||||
}
|
||||
|
||||
func MSet(ctx context.Context, r Redis, m map[string]interface{}) error {
|
||||
func MSet(ctx context.Context, r Redis, m map[string]interface{}, expiration time.Duration) error {
|
||||
pipe := r.Pipeline()
|
||||
for k, v := range m {
|
||||
pipe.Set(ctx, k, v, 0)
|
||||
pipe.Set(ctx, k, v, expiration)
|
||||
}
|
||||
_, err := pipe.Exec(ctx)
|
||||
return err
|
||||
|
||||
@@ -30,7 +30,7 @@ func TestMiniRedisMGet(t *testing.T) {
|
||||
mp["key2"] = "value2"
|
||||
mp["key3"] = "value3"
|
||||
|
||||
err = MSet(context.Background(), rdb, mp)
|
||||
err = MSet(context.Background(), rdb, mp, 0)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to set miniredis value: %v", err)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user