mirror of
https://github.com/ccfos/nightingale.git
synced 2026-03-05 23:49:07 +00:00
Compare commits
44 Commits
change-wor
...
dev21
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b02ddeec7b | ||
|
|
d5528541c3 | ||
|
|
33ec277ac1 | ||
|
|
a63d6a1e49 | ||
|
|
84a179c4f4 | ||
|
|
f5811bc5f7 | ||
|
|
f27bbb4a51 | ||
|
|
5de63d7307 | ||
|
|
6a44da4dda | ||
|
|
b0fbca21b8 | ||
|
|
0a65616fbb | ||
|
|
a0e8c5f764 | ||
|
|
0c71eeac2a | ||
|
|
d64dbb6909 | ||
|
|
656b91e976 | ||
|
|
fe6dce403f | ||
|
|
48820a6bd5 | ||
|
|
faa348a086 | ||
|
|
635b781ae1 | ||
|
|
f60771ad9c | ||
|
|
6bd2f9a89f | ||
|
|
a76049822c | ||
|
|
52421f2477 | ||
|
|
a9ab02e1ad | ||
|
|
e5acc9199b | ||
|
|
ec7fbf313b | ||
|
|
1180066df3 | ||
|
|
b3ee1e56ad | ||
|
|
0b71d1ef82 | ||
|
|
2934dab4c7 | ||
|
|
d908240912 | ||
|
|
ff1aa83b8c | ||
|
|
d54bcdd722 | ||
|
|
81b5ce20ae | ||
|
|
806b3effe9 | ||
|
|
cd0b529b69 | ||
|
|
9e99e4a63a | ||
|
|
6b25a4ce90 | ||
|
|
1a50d22573 | ||
|
|
46083d741d | ||
|
|
3eeb705b39 | ||
|
|
8d87e69ee7 | ||
|
|
3da85d8e28 | ||
|
|
b50410b88a |
@@ -253,7 +253,7 @@ func HandleEventPipeline(pipelineConfigs []models.PipelineConfig, eventOrigin, e
|
||||
// 统一使用工作流引擎执行(兼容线性模式和工作流模式)
|
||||
triggerCtx := &models.WorkflowTriggerContext{
|
||||
Mode: models.TriggerModeEvent,
|
||||
TriggerBy: from,
|
||||
TriggerBy: from + "_" + strconv.FormatInt(id, 10),
|
||||
}
|
||||
|
||||
resultEvent, result, err := workflowEngine.Execute(eventPipeline, event, triggerCtx)
|
||||
|
||||
@@ -844,7 +844,7 @@ func (arw *AlertRuleWorker) GetHostAnomalyPoint(ruleConfig string) ([]models.Ano
|
||||
}
|
||||
m["ident"] = target.Ident
|
||||
|
||||
lst = append(lst, models.NewAnomalyPoint(trigger.Type, m, now, float64(now-target.UpdateAt), trigger.Severity))
|
||||
lst = append(lst, models.NewAnomalyPoint(trigger.Type, m, now, float64(now-target.BeatTime), trigger.Severity))
|
||||
}
|
||||
case "offset":
|
||||
idents, exists := arw.Processor.TargetsOfAlertRuleCache.Get(arw.Processor.EngineName, arw.Rule.Id)
|
||||
@@ -873,7 +873,7 @@ func (arw *AlertRuleWorker) GetHostAnomalyPoint(ruleConfig string) ([]models.Ano
|
||||
continue
|
||||
}
|
||||
if target, exists := targetMap[ident]; exists {
|
||||
if now-target.UpdateAt > 120 {
|
||||
if now-target.BeatTime > 120 {
|
||||
// means this target is not a active host, do not check offset
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -60,11 +60,11 @@ func (e *WorkflowEngine) Execute(pipeline *models.EventPipeline, event *models.A
|
||||
}
|
||||
|
||||
func (e *WorkflowEngine) initWorkflowContext(pipeline *models.EventPipeline, event *models.AlertCurEvent, triggerCtx *models.WorkflowTriggerContext) *models.WorkflowContext {
|
||||
// 合并环境变量
|
||||
env := pipeline.GetEnvMap()
|
||||
if triggerCtx != nil && triggerCtx.EnvOverrides != nil {
|
||||
for k, v := range triggerCtx.EnvOverrides {
|
||||
env[k] = v
|
||||
// 合并输入参数
|
||||
inputs := pipeline.GetInputsMap()
|
||||
if triggerCtx != nil && triggerCtx.InputsOverrides != nil {
|
||||
for k, v := range triggerCtx.InputsOverrides {
|
||||
inputs[k] = v
|
||||
}
|
||||
}
|
||||
|
||||
@@ -84,7 +84,7 @@ func (e *WorkflowEngine) initWorkflowContext(pipeline *models.EventPipeline, eve
|
||||
|
||||
return &models.WorkflowContext{
|
||||
Event: event,
|
||||
Env: env,
|
||||
Inputs: inputs,
|
||||
Vars: make(map[string]interface{}), // 初始化空的 Vars,供节点间传递数据
|
||||
Metadata: metadata,
|
||||
Stream: stream,
|
||||
@@ -182,7 +182,6 @@ func (e *WorkflowEngine) executeDAG(nodeMap map[string]*models.WorkflowNode, con
|
||||
result.Status = models.ExecutionStatusFailed
|
||||
result.ErrorNode = nodeID
|
||||
result.Message = fmt.Sprintf("node %s failed: %s", node.Name, nodeResult.Error)
|
||||
return result
|
||||
}
|
||||
}
|
||||
|
||||
@@ -371,10 +370,8 @@ func (e *WorkflowEngine) saveExecutionRecord(pipeline *models.EventPipeline, wfC
|
||||
logger.Errorf("workflow: failed to set node results: pipeline_id=%d, error=%v", pipeline.ID, err)
|
||||
}
|
||||
|
||||
secretKeys := pipeline.GetSecretKeys()
|
||||
sanitizedEnv := wfCtx.SanitizedEnv(secretKeys)
|
||||
if err := execution.SetEnvSnapshot(sanitizedEnv); err != nil {
|
||||
logger.Errorf("workflow: failed to set env snapshot: pipeline_id=%d, error=%v", pipeline.ID, err)
|
||||
if err := execution.SetInputsSnapshot(wfCtx.Inputs); err != nil {
|
||||
logger.Errorf("workflow: failed to set inputs snapshot: pipeline_id=%d, error=%v", pipeline.ID, err)
|
||||
}
|
||||
|
||||
if err := models.CreateEventPipelineExecution(e.ctx, execution); err != nil {
|
||||
|
||||
@@ -114,7 +114,7 @@ func (c *AISummaryConfig) initHTTPClient() error {
|
||||
func (c *AISummaryConfig) prepareEventInfo(wfCtx *models.WorkflowContext) (string, error) {
|
||||
var defs = []string{
|
||||
"{{$event := .Event}}",
|
||||
"{{$env := .Env}}",
|
||||
"{{$inputs := .Inputs}}",
|
||||
}
|
||||
|
||||
text := strings.Join(append(defs, c.PromptTemplate), "")
|
||||
|
||||
@@ -44,8 +44,8 @@ func TestAISummaryConfig_Process(t *testing.T) {
|
||||
|
||||
// 创建 WorkflowContext
|
||||
wfCtx := &models.WorkflowContext{
|
||||
Event: event,
|
||||
Env: map[string]string{},
|
||||
Event: event,
|
||||
Inputs: map[string]string{},
|
||||
}
|
||||
|
||||
// 测试模板处理
|
||||
|
||||
@@ -11,6 +11,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/alert/pipeline/processor/common"
|
||||
"github.com/ccfos/nightingale/v6/alert/pipeline/processor/utils"
|
||||
"github.com/ccfos/nightingale/v6/models"
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
"github.com/toolkits/pkg/logger"
|
||||
@@ -71,12 +72,17 @@ func (c *CallbackConfig) Process(ctx *ctx.Context, wfCtx *models.WorkflowContext
|
||||
headers[k] = v
|
||||
}
|
||||
|
||||
url, err := utils.TplRender(wfCtx, c.URL)
|
||||
if err != nil {
|
||||
return wfCtx, "", fmt.Errorf("failed to render url template: %v processor: %v", err, c)
|
||||
}
|
||||
|
||||
body, err := json.Marshal(event)
|
||||
if err != nil {
|
||||
return wfCtx, "", fmt.Errorf("failed to marshal event: %v processor: %v", err, c)
|
||||
}
|
||||
|
||||
req, err := http.NewRequest("POST", c.URL, strings.NewReader(string(body)))
|
||||
req, err := http.NewRequest("POST", url, strings.NewReader(string(body)))
|
||||
if err != nil {
|
||||
return wfCtx, "", fmt.Errorf("failed to create request: %v processor: %v", err, c)
|
||||
}
|
||||
|
||||
@@ -36,7 +36,7 @@ func (c *EventDropConfig) Process(ctx *ctx.Context, wfCtx *models.WorkflowContex
|
||||
"{{ $event := .Event }}",
|
||||
"{{ $labels := .Event.TagsMap }}",
|
||||
"{{ $value := .Event.TriggerValue }}",
|
||||
"{{ $env := .Env }}",
|
||||
"{{ $inputs := .Inputs }}",
|
||||
}
|
||||
|
||||
text := strings.Join(append(defs, c.Content), "")
|
||||
|
||||
@@ -148,7 +148,7 @@ func (c *IfConfig) evaluateExpressionCondition(wfCtx *models.WorkflowContext) (b
|
||||
"{{ $event := .Event }}",
|
||||
"{{ $labels := .Event.TagsMap }}",
|
||||
"{{ $value := .Event.TriggerValue }}",
|
||||
"{{ $env := .Env }}",
|
||||
"{{ $inputs := .Inputs }}",
|
||||
}
|
||||
|
||||
text := strings.Join(append(defs, c.Condition), "")
|
||||
|
||||
@@ -175,7 +175,7 @@ func (c *SwitchConfig) evaluateExpressionCondition(condition string, wfCtx *mode
|
||||
"{{ $event := .Event }}",
|
||||
"{{ $labels := .Event.TagsMap }}",
|
||||
"{{ $value := .Event.TriggerValue }}",
|
||||
"{{ $env := .Env }}",
|
||||
"{{ $inputs := .Inputs }}",
|
||||
}
|
||||
|
||||
text := strings.Join(append(defs, condition), "")
|
||||
|
||||
32
alert/pipeline/processor/utils/utils.go
Normal file
32
alert/pipeline/processor/utils/utils.go
Normal file
@@ -0,0 +1,32 @@
|
||||
package utils
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"strings"
|
||||
"text/template"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/models"
|
||||
"github.com/ccfos/nightingale/v6/pkg/tplx"
|
||||
)
|
||||
|
||||
func TplRender(wfCtx *models.WorkflowContext, content string) (string, error) {
|
||||
var defs = []string{
|
||||
"{{ $event := .Event }}",
|
||||
"{{ $labels := .Event.TagsMap }}",
|
||||
"{{ $value := .Event.TriggerValue }}",
|
||||
"{{ $inputs := .Inputs }}",
|
||||
}
|
||||
text := strings.Join(append(defs, content), "")
|
||||
tpl, err := template.New("tpl").Funcs(tplx.TemplateFuncMap).Parse(text)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to parse template: %v", err)
|
||||
}
|
||||
|
||||
var body bytes.Buffer
|
||||
if err = tpl.Execute(&body, wfCtx); err != nil {
|
||||
return "", fmt.Errorf("failed to execute template: %v", err)
|
||||
}
|
||||
|
||||
return strings.TrimSpace(body.String()), nil
|
||||
}
|
||||
@@ -118,7 +118,7 @@ func (s *Set) updateTargets(m map[string]models.HostMeta) error {
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
err := storage.MSet(context.Background(), s.redis, newMap)
|
||||
err := storage.MSet(context.Background(), s.redis, newMap, 7*24*time.Hour)
|
||||
if err != nil {
|
||||
cstats.RedisOperationLatency.WithLabelValues("mset_target_meta", "fail").Observe(time.Since(start).Seconds())
|
||||
return err
|
||||
@@ -127,7 +127,7 @@ func (s *Set) updateTargets(m map[string]models.HostMeta) error {
|
||||
}
|
||||
|
||||
if len(extendMap) > 0 {
|
||||
err = storage.MSet(context.Background(), s.redis, extendMap)
|
||||
err = storage.MSet(context.Background(), s.redis, extendMap, 7*24*time.Hour)
|
||||
if err != nil {
|
||||
cstats.RedisOperationLatency.WithLabelValues("mset_target_extend", "fail").Observe(time.Since(start).Seconds())
|
||||
return err
|
||||
|
||||
@@ -391,8 +391,8 @@ func (rt *Router) Config(r *gin.Engine) {
|
||||
pages.GET("/busi-group/:id/recording-rules", rt.auth(), rt.user(), rt.perm("/recording-rules"), rt.recordingRuleGets)
|
||||
pages.POST("/busi-group/:id/recording-rules", rt.auth(), rt.user(), rt.perm("/recording-rules/add"), rt.bgrw(), rt.recordingRuleAddByFE)
|
||||
pages.DELETE("/busi-group/:id/recording-rules", rt.auth(), rt.user(), rt.perm("/recording-rules/del"), rt.bgrw(), rt.recordingRuleDel)
|
||||
pages.PUT("/busi-group/:id/recording-rule/:rrid", rt.auth(), rt.user(), rt.perm("/recording-rules/put"), rt.bgrw(), rt.recordingRulePutByFE)
|
||||
pages.GET("/recording-rule/:rrid", rt.auth(), rt.user(), rt.perm("/recording-rules"), rt.recordingRuleGet)
|
||||
pages.PUT("/recording-rule/:rrid", rt.auth(), rt.user(), rt.perm("/recording-rules"), rt.recordingRulePutByFE)
|
||||
pages.PUT("/busi-group/:id/recording-rules/fields", rt.auth(), rt.user(), rt.perm("/recording-rules/put"), rt.recordingRulePutFields)
|
||||
|
||||
pages.GET("/busi-groups/alert-mutes", rt.auth(), rt.user(), rt.perm("/alert-mutes"), rt.alertMuteGetsByGids)
|
||||
|
||||
@@ -276,7 +276,7 @@ func (rt *Router) datasourceUpsert(c *gin.Context) {
|
||||
}
|
||||
err = req.Add(rt.Ctx)
|
||||
} else {
|
||||
err = req.Update(rt.Ctx, "name", "identifier", "description", "cluster_name", "settings", "http", "auth", "updated_by", "updated_at", "is_default")
|
||||
err = req.Update(rt.Ctx, "name", "identifier", "description", "cluster_name", "settings", "http", "auth", "updated_by", "updated_at", "is_default", "weight")
|
||||
}
|
||||
|
||||
Render(c, nil, err)
|
||||
|
||||
@@ -164,7 +164,7 @@ func (rt *Router) tryRunEventPipeline(c *gin.Context) {
|
||||
var f struct {
|
||||
EventId int64 `json:"event_id"`
|
||||
PipelineConfig models.EventPipeline `json:"pipeline_config"`
|
||||
EnvVariables map[string]string `json:"env_variables,omitempty"`
|
||||
InputVariables map[string]string `json:"input_variables,omitempty"`
|
||||
}
|
||||
|
||||
ginx.BindJSON(c, &f)
|
||||
@@ -182,9 +182,9 @@ func (rt *Router) tryRunEventPipeline(c *gin.Context) {
|
||||
workflowEngine := engine.NewWorkflowEngine(rt.Ctx)
|
||||
|
||||
triggerCtx := &models.WorkflowTriggerContext{
|
||||
Mode: models.TriggerModeAPI,
|
||||
TriggerBy: me.Username,
|
||||
EnvOverrides: f.EnvVariables,
|
||||
Mode: models.TriggerModeAPI,
|
||||
TriggerBy: me.Username,
|
||||
InputsOverrides: f.InputVariables,
|
||||
}
|
||||
|
||||
resultEvent, result, err := workflowEngine.Execute(&f.PipelineConfig, event, triggerCtx)
|
||||
@@ -302,8 +302,8 @@ func (rt *Router) eventPipelinesListByService(c *gin.Context) {
|
||||
type EventPipelineRequest struct {
|
||||
// 事件数据(可选,如果不传则使用空事件)
|
||||
Event *models.AlertCurEvent `json:"event,omitempty"`
|
||||
// 环境变量覆盖
|
||||
EnvOverrides map[string]string `json:"env_overrides,omitempty"`
|
||||
// 输入参数覆盖
|
||||
InputsOverrides map[string]string `json:"inputs_overrides,omitempty"`
|
||||
|
||||
Username string `json:"username,omitempty"`
|
||||
}
|
||||
@@ -321,20 +321,15 @@ func (rt *Router) executePipelineTrigger(pipeline *models.EventPipeline, req *Ev
|
||||
}
|
||||
}
|
||||
|
||||
// 校验必填环境变量
|
||||
if err := pipeline.ValidateEnvVariables(req.EnvOverrides); err != nil {
|
||||
return "", fmt.Errorf("env validation failed: %v", err)
|
||||
}
|
||||
|
||||
// 生成执行ID
|
||||
executionID := uuid.New().String()
|
||||
|
||||
// 创建触发上下文
|
||||
triggerCtx := &models.WorkflowTriggerContext{
|
||||
Mode: models.TriggerModeAPI,
|
||||
TriggerBy: triggerBy,
|
||||
EnvOverrides: req.EnvOverrides,
|
||||
RequestID: executionID,
|
||||
Mode: models.TriggerModeAPI,
|
||||
TriggerBy: triggerBy,
|
||||
InputsOverrides: req.InputsOverrides,
|
||||
RequestID: executionID,
|
||||
}
|
||||
|
||||
// 异步执行工作流
|
||||
@@ -401,6 +396,7 @@ func (rt *Router) triggerEventPipelineByAPI(c *gin.Context) {
|
||||
}
|
||||
|
||||
func (rt *Router) listAllEventPipelineExecutions(c *gin.Context) {
|
||||
pipelineId := ginx.QueryInt64(c, "pipeline_id", 0)
|
||||
pipelineName := ginx.QueryStr(c, "pipeline_name", "")
|
||||
mode := ginx.QueryStr(c, "mode", "")
|
||||
status := ginx.QueryStr(c, "status", "")
|
||||
@@ -414,7 +410,7 @@ func (rt *Router) listAllEventPipelineExecutions(c *gin.Context) {
|
||||
offset = 1
|
||||
}
|
||||
|
||||
executions, total, err := models.ListAllEventPipelineExecutions(rt.Ctx, pipelineName, mode, status, limit, (offset-1)*limit)
|
||||
executions, total, err := models.ListAllEventPipelineExecutions(rt.Ctx, pipelineId, pipelineName, mode, status, limit, (offset-1)*limit)
|
||||
ginx.Dangerous(err)
|
||||
|
||||
ginx.NewRender(c).Data(gin.H{
|
||||
@@ -509,11 +505,11 @@ func (rt *Router) streamEventPipeline(c *gin.Context) {
|
||||
}
|
||||
|
||||
triggerCtx := &models.WorkflowTriggerContext{
|
||||
Mode: models.TriggerModeAPI,
|
||||
TriggerBy: me.Username,
|
||||
EnvOverrides: f.EnvOverrides,
|
||||
RequestID: uuid.New().String(),
|
||||
Stream: true, // 流式端点强制启用流式输出
|
||||
Mode: models.TriggerModeAPI,
|
||||
TriggerBy: me.Username,
|
||||
InputsOverrides: f.InputsOverrides,
|
||||
RequestID: uuid.New().String(),
|
||||
Stream: true, // 流式端点强制启用流式输出
|
||||
}
|
||||
|
||||
workflowEngine := engine.NewWorkflowEngine(rt.Ctx)
|
||||
@@ -604,11 +600,11 @@ func (rt *Router) streamEventPipelineByService(c *gin.Context) {
|
||||
}
|
||||
|
||||
triggerCtx := &models.WorkflowTriggerContext{
|
||||
Mode: models.TriggerModeAPI,
|
||||
TriggerBy: f.Username,
|
||||
EnvOverrides: f.EnvOverrides,
|
||||
RequestID: uuid.New().String(),
|
||||
Stream: true, // 流式端点强制启用流式输出
|
||||
Mode: models.TriggerModeAPI,
|
||||
TriggerBy: f.Username,
|
||||
InputsOverrides: f.InputsOverrides,
|
||||
RequestID: uuid.New().String(),
|
||||
Stream: true, // 流式端点强制启用流式输出
|
||||
}
|
||||
|
||||
workflowEngine := engine.NewWorkflowEngine(rt.Ctx)
|
||||
|
||||
@@ -571,12 +571,19 @@ func (rt *Router) loginCallbackFeiShu(c *gin.Context) {
|
||||
} else {
|
||||
user = new(models.User)
|
||||
defaultRoles := []string{}
|
||||
defaultUserGroups := []int64{}
|
||||
if rt.Sso.FeiShu != nil && rt.Sso.FeiShu.FeiShuConfig != nil {
|
||||
defaultRoles = rt.Sso.FeiShu.FeiShuConfig.DefaultRoles
|
||||
defaultUserGroups = rt.Sso.FeiShu.FeiShuConfig.DefaultUserGroups
|
||||
}
|
||||
|
||||
user.FullSsoFields(feishu.SsoTypeName, ret.Username, ret.Nickname, ret.Phone, ret.Email, defaultRoles)
|
||||
// create user from feishu
|
||||
ginx.Dangerous(user.Add(rt.Ctx))
|
||||
|
||||
if len(defaultUserGroups) > 0 {
|
||||
ginx.Dangerous(user.UpdateUserGroup(rt.Ctx, defaultUserGroups))
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// set user login state
|
||||
|
||||
@@ -1,13 +1,16 @@
|
||||
package router
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sort"
|
||||
"sync"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/alert/eval"
|
||||
"github.com/ccfos/nightingale/v6/dscache"
|
||||
"github.com/ccfos/nightingale/v6/dskit/doris"
|
||||
"github.com/ccfos/nightingale/v6/models"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/toolkits/pkg/ginx"
|
||||
"github.com/toolkits/pkg/logger"
|
||||
@@ -117,10 +120,13 @@ func (rt *Router) QueryLogBatch(c *gin.Context) {
|
||||
}
|
||||
|
||||
func QueryDataConcurrently(anonymousAccess bool, ctx *gin.Context, f models.QueryParam) ([]models.DataResp, error) {
|
||||
var resp []models.DataResp
|
||||
var mu sync.Mutex
|
||||
var wg sync.WaitGroup
|
||||
var errs []error
|
||||
var (
|
||||
resp []models.DataResp
|
||||
mu sync.Mutex
|
||||
wg sync.WaitGroup
|
||||
errs []error
|
||||
rCtx = ctx.Request.Context()
|
||||
)
|
||||
|
||||
for _, q := range f.Queries {
|
||||
if !anonymousAccess && !CheckDsPerm(ctx, f.DatasourceId, f.Cate, q) {
|
||||
@@ -132,12 +138,17 @@ func QueryDataConcurrently(anonymousAccess bool, ctx *gin.Context, f models.Quer
|
||||
logger.Warningf("cluster:%d not exists", f.DatasourceId)
|
||||
return nil, fmt.Errorf("cluster not exists")
|
||||
}
|
||||
|
||||
vCtx := rCtx
|
||||
if f.Cate == models.DORIS {
|
||||
vCtx = context.WithValue(vCtx, doris.NoNeedCheckMaxRow, true)
|
||||
}
|
||||
|
||||
wg.Add(1)
|
||||
go func(query interface{}) {
|
||||
defer wg.Done()
|
||||
|
||||
data, err := plug.QueryData(ctx.Request.Context(), query)
|
||||
data, err := plug.QueryData(vCtx, query)
|
||||
if err != nil {
|
||||
logger.Warningf("query data error: req:%+v err:%v", query, err)
|
||||
mu.Lock()
|
||||
|
||||
@@ -112,6 +112,7 @@ func (rt *Router) recordingRulePutByFE(c *gin.Context) {
|
||||
}
|
||||
|
||||
rt.bgrwCheck(c, ar.GroupId)
|
||||
rt.bgroCheck(c, f.GroupId)
|
||||
|
||||
f.UpdateBy = c.MustGet("username").(string)
|
||||
ginx.NewRender(c).Message(ar.Update(rt.Ctx, f))
|
||||
|
||||
@@ -38,6 +38,16 @@ func (rt *Router) targetGetsByHostFilter(c *gin.Context) {
|
||||
total, err := models.TargetCountByFilter(rt.Ctx, query)
|
||||
ginx.Dangerous(err)
|
||||
|
||||
models.FillTargetsBeatTime(rt.Redis, hosts)
|
||||
now := time.Now().Unix()
|
||||
for i := 0; i < len(hosts); i++ {
|
||||
if now-hosts[i].BeatTime < 60 {
|
||||
hosts[i].TargetUp = 2
|
||||
} else if now-hosts[i].BeatTime < 180 {
|
||||
hosts[i].TargetUp = 1
|
||||
}
|
||||
}
|
||||
|
||||
ginx.NewRender(c).Data(gin.H{
|
||||
"list": hosts,
|
||||
"total": total,
|
||||
@@ -81,9 +91,24 @@ func (rt *Router) targetGets(c *gin.Context) {
|
||||
models.BuildTargetWhereWithBgids(bgids),
|
||||
models.BuildTargetWhereWithDsIds(dsIds),
|
||||
models.BuildTargetWhereWithQuery(query),
|
||||
models.BuildTargetWhereWithDowntime(downtime),
|
||||
models.BuildTargetWhereWithHosts(hosts),
|
||||
}
|
||||
|
||||
// downtime 筛选:从缓存获取心跳时间,选择较小的集合用 IN 或 NOT IN 过滤
|
||||
if downtime != 0 {
|
||||
downtimeOpt, hasMatch := rt.downtimeFilter(downtime)
|
||||
if !hasMatch {
|
||||
ginx.NewRender(c).Data(gin.H{
|
||||
"list": []*models.Target{},
|
||||
"total": 0,
|
||||
}, nil)
|
||||
return
|
||||
}
|
||||
if downtimeOpt != nil {
|
||||
options = append(options, downtimeOpt)
|
||||
}
|
||||
}
|
||||
|
||||
total, err := models.TargetTotal(rt.Ctx, options...)
|
||||
ginx.Dangerous(err)
|
||||
|
||||
@@ -102,14 +127,17 @@ func (rt *Router) targetGets(c *gin.Context) {
|
||||
now := time.Now()
|
||||
cache := make(map[int64]*models.BusiGroup)
|
||||
|
||||
// 从 Redis 补全 BeatTime
|
||||
models.FillTargetsBeatTime(rt.Redis, list)
|
||||
|
||||
var keys []string
|
||||
for i := 0; i < len(list); i++ {
|
||||
ginx.Dangerous(list[i].FillGroup(rt.Ctx, cache))
|
||||
keys = append(keys, models.WrapIdent(list[i].Ident))
|
||||
|
||||
if now.Unix()-list[i].UpdateAt < 60 {
|
||||
if now.Unix()-list[i].BeatTime < 60 {
|
||||
list[i].TargetUp = 2
|
||||
} else if now.Unix()-list[i].UpdateAt < 180 {
|
||||
} else if now.Unix()-list[i].BeatTime < 180 {
|
||||
list[i].TargetUp = 1
|
||||
}
|
||||
}
|
||||
@@ -148,6 +176,43 @@ func (rt *Router) targetGets(c *gin.Context) {
|
||||
}, nil)
|
||||
}
|
||||
|
||||
// downtimeFilter 从缓存获取心跳时间,生成 downtime 筛选条件
|
||||
// 选择匹配集和非匹配集中较小的一方,用 IN 或 NOT IN 来减少 SQL 参数量
|
||||
// 返回值:
|
||||
// - option: 筛选条件,nil 表示所有 target 都符合条件(无需过滤)
|
||||
// - hasMatch: 是否有符合条件的 target,false 表示无匹配应返回空结果
|
||||
func (rt *Router) downtimeFilter(downtime int64) (option models.BuildTargetWhereOption, hasMatch bool) {
|
||||
now := time.Now().Unix()
|
||||
targets := rt.TargetCache.GetAll()
|
||||
var matchIdents, nonMatchIdents []string
|
||||
for _, target := range targets {
|
||||
matched := false
|
||||
if downtime > 0 {
|
||||
matched = target.BeatTime < now-downtime
|
||||
} else if downtime < 0 {
|
||||
matched = target.BeatTime > now+downtime
|
||||
}
|
||||
if matched {
|
||||
matchIdents = append(matchIdents, target.Ident)
|
||||
} else {
|
||||
nonMatchIdents = append(nonMatchIdents, target.Ident)
|
||||
}
|
||||
}
|
||||
|
||||
if len(matchIdents) == 0 {
|
||||
return nil, false
|
||||
}
|
||||
|
||||
if len(nonMatchIdents) == 0 {
|
||||
return nil, true
|
||||
}
|
||||
|
||||
if len(matchIdents) <= len(nonMatchIdents) {
|
||||
return models.BuildTargetWhereWithIdents(matchIdents), true
|
||||
}
|
||||
return models.BuildTargetWhereExcludeIdents(nonMatchIdents), true
|
||||
}
|
||||
|
||||
func (rt *Router) targetExtendInfoByIdent(c *gin.Context) {
|
||||
ident := ginx.QueryStr(c, "ident", "")
|
||||
key := models.WrapExtendIdent(ident)
|
||||
|
||||
@@ -71,7 +71,10 @@ CREATE TABLE `datasource`
|
||||
`updated_at` bigint not null default 0,
|
||||
`updated_by` varchar(64) not null default '',
|
||||
PRIMARY KEY (`id`)
|
||||
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;
|
||||
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;
|
||||
|
||||
-- datasource add weight field
|
||||
alter table `datasource` add `weight` int not null default 0;
|
||||
|
||||
CREATE TABLE `builtin_cate` (
|
||||
`id` bigint unsigned not null auto_increment,
|
||||
|
||||
@@ -565,6 +565,14 @@ func QueryData(ctx context.Context, queryParam interface{}, cliTimeout int64, ve
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// 检查是否有 shard failures,有部分数据时仅记录警告继续处理
|
||||
if shardErr := checkShardFailures(result.Shards, "query_data", searchSourceString); shardErr != nil {
|
||||
if len(result.Aggregations["ts"]) == 0 {
|
||||
return nil, shardErr
|
||||
}
|
||||
// 有部分数据,checkShardFailures 已记录警告,继续处理
|
||||
}
|
||||
|
||||
logger.Debugf("query_data searchSource:%s resp:%s", string(jsonSearchSource), string(result.Aggregations["ts"]))
|
||||
|
||||
js, err := simplejson.NewJson(result.Aggregations["ts"])
|
||||
@@ -602,6 +610,40 @@ func QueryData(ctx context.Context, queryParam interface{}, cliTimeout int64, ve
|
||||
return items, nil
|
||||
}
|
||||
|
||||
// checkShardFailures 检查 ES 查询结果中的 shard failures,返回格式化的错误信息
|
||||
func checkShardFailures(shards *elastic.ShardsInfo, logPrefix string, queryContext interface{}) error {
|
||||
if shards == nil || shards.Failed == 0 || len(shards.Failures) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
var failureReasons []string
|
||||
for _, failure := range shards.Failures {
|
||||
reason := ""
|
||||
if failure.Reason != nil {
|
||||
if reasonType, ok := failure.Reason["type"].(string); ok {
|
||||
reason = reasonType
|
||||
}
|
||||
if reasonMsg, ok := failure.Reason["reason"].(string); ok {
|
||||
if reason != "" {
|
||||
reason += ": " + reasonMsg
|
||||
} else {
|
||||
reason = reasonMsg
|
||||
}
|
||||
}
|
||||
}
|
||||
if reason != "" {
|
||||
failureReasons = append(failureReasons, fmt.Sprintf("index=%s shard=%d: %s", failure.Index, failure.Shard, reason))
|
||||
}
|
||||
}
|
||||
|
||||
if len(failureReasons) > 0 {
|
||||
errMsg := fmt.Sprintf("elasticsearch shard failures (%d/%d failed): %s", shards.Failed, shards.Total, strings.Join(failureReasons, "; "))
|
||||
logger.Warningf("%s query:%v %s", logPrefix, queryContext, errMsg)
|
||||
return fmt.Errorf("%s", errMsg)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func HitFilter(typ string) bool {
|
||||
switch typ {
|
||||
case "keyword", "date", "long", "integer", "short", "byte", "double", "float", "half_float", "scaled_float", "unsigned_long":
|
||||
@@ -678,21 +720,27 @@ func QueryLog(ctx context.Context, queryParam interface{}, timeout int64, versio
|
||||
} else {
|
||||
source = source.From(param.P).Sort(param.DateField, param.Ascending)
|
||||
}
|
||||
sourceBytes, _ := json.Marshal(source)
|
||||
result, err := search(ctx, indexArr, source, param.Timeout, param.MaxShard)
|
||||
if err != nil {
|
||||
logger.Warningf("query data error:%v", err)
|
||||
logger.Warningf("query_log source:%s error:%v", string(sourceBytes), err)
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
// 检查是否有 shard failures,有部分数据时仅记录警告继续处理
|
||||
if shardErr := checkShardFailures(result.Shards, "query_log", string(sourceBytes)); shardErr != nil {
|
||||
if len(result.Hits.Hits) == 0 {
|
||||
return nil, 0, shardErr
|
||||
}
|
||||
// 有部分数据,checkShardFailures 已记录警告,继续处理
|
||||
}
|
||||
|
||||
total := result.TotalHits()
|
||||
|
||||
var ret []interface{}
|
||||
|
||||
b, _ := json.Marshal(source)
|
||||
logger.Debugf("query data result query source:%s len:%d total:%d", string(b), len(result.Hits.Hits), total)
|
||||
logger.Debugf("query_log source:%s len:%d total:%d", string(sourceBytes), len(result.Hits.Hits), total)
|
||||
|
||||
resultBytes, _ := json.Marshal(result)
|
||||
logger.Debugf("query data result query source:%s result:%s", string(b), string(resultBytes))
|
||||
logger.Debugf("query_log source:%s result:%s", string(sourceBytes), string(resultBytes))
|
||||
|
||||
if strings.HasPrefix(version, "6") {
|
||||
for i := 0; i < len(result.Hits.Hits); i++ {
|
||||
|
||||
@@ -133,4 +133,5 @@ type DatasourceInfo struct {
|
||||
CreatedAt int64 `json:"created_at"`
|
||||
UpdatedAt int64 `json:"updated_at"`
|
||||
IsDefault bool `json:"is_default"`
|
||||
Weight int `json:"weight"`
|
||||
}
|
||||
|
||||
@@ -79,52 +79,19 @@ func (d *Doris) Equal(p datasource.Datasource) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// only compare first shard
|
||||
if d.Addr != newest.Addr {
|
||||
return false
|
||||
}
|
||||
|
||||
if d.User != newest.User {
|
||||
return false
|
||||
}
|
||||
|
||||
if d.Password != newest.Password {
|
||||
return false
|
||||
}
|
||||
|
||||
if d.EnableWrite != newest.EnableWrite {
|
||||
return false
|
||||
}
|
||||
|
||||
if d.FeAddr != newest.FeAddr {
|
||||
return false
|
||||
}
|
||||
|
||||
if d.MaxQueryRows != newest.MaxQueryRows {
|
||||
return false
|
||||
}
|
||||
|
||||
if d.Timeout != newest.Timeout {
|
||||
return false
|
||||
}
|
||||
|
||||
if d.MaxIdleConns != newest.MaxIdleConns {
|
||||
return false
|
||||
}
|
||||
|
||||
if d.MaxOpenConns != newest.MaxOpenConns {
|
||||
return false
|
||||
}
|
||||
|
||||
if d.ConnMaxLifetime != newest.ConnMaxLifetime {
|
||||
return false
|
||||
}
|
||||
|
||||
if d.ClusterName != newest.ClusterName {
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
return d.Addr == newest.Addr &&
|
||||
d.FeAddr == newest.FeAddr &&
|
||||
d.User == newest.User &&
|
||||
d.Password == newest.Password &&
|
||||
d.EnableWrite == newest.EnableWrite &&
|
||||
d.UserWrite == newest.UserWrite &&
|
||||
d.PasswordWrite == newest.PasswordWrite &&
|
||||
d.MaxQueryRows == newest.MaxQueryRows &&
|
||||
d.Timeout == newest.Timeout &&
|
||||
d.MaxIdleConns == newest.MaxIdleConns &&
|
||||
d.MaxOpenConns == newest.MaxOpenConns &&
|
||||
d.ConnMaxLifetime == newest.ConnMaxLifetime &&
|
||||
d.ClusterName == newest.ClusterName
|
||||
}
|
||||
|
||||
func (d *Doris) MakeLogQuery(ctx context.Context, query interface{}, eventTags []string, start, end int64) (interface{}, error) {
|
||||
@@ -181,7 +148,7 @@ func (d *Doris) QueryData(ctx context.Context, query interface{}) ([]models.Data
|
||||
}
|
||||
}
|
||||
|
||||
items, err := d.QueryTimeseries(context.TODO(), &doris.QueryParam{
|
||||
items, err := d.QueryTimeseries(ctx, &doris.QueryParam{
|
||||
Database: dorisQueryParam.Database,
|
||||
Sql: dorisQueryParam.SQL,
|
||||
Keys: types.Keys{
|
||||
|
||||
@@ -738,6 +738,7 @@ CREATE TABLE datasource
|
||||
http varchar(4096) not null default '',
|
||||
auth varchar(8192) not null default '',
|
||||
is_default boolean not null default false,
|
||||
weight int not null default 0,
|
||||
created_at bigint not null default 0,
|
||||
created_by varchar(64) not null default '',
|
||||
updated_at bigint not null default 0,
|
||||
|
||||
@@ -655,6 +655,7 @@ CREATE TABLE `datasource`
|
||||
`http` varchar(4096) not null default '',
|
||||
`auth` varchar(8192) not null default '',
|
||||
`is_default` boolean COMMENT 'is default datasource',
|
||||
`weight` int not null default 0,
|
||||
`created_at` bigint not null default 0,
|
||||
`created_by` varchar(64) not null default '',
|
||||
`updated_at` bigint not null default 0,
|
||||
|
||||
@@ -301,7 +301,7 @@ ALTER TABLE `event_pipeline` ADD COLUMN `trigger_mode` varchar(128) NOT NULL DEF
|
||||
ALTER TABLE `event_pipeline` ADD COLUMN `disabled` tinyint(1) NOT NULL DEFAULT 0 COMMENT 'disabled flag';
|
||||
ALTER TABLE `event_pipeline` ADD COLUMN `nodes` text COMMENT 'workflow nodes (JSON)';
|
||||
ALTER TABLE `event_pipeline` ADD COLUMN `connections` text COMMENT 'node connections (JSON)';
|
||||
ALTER TABLE `event_pipeline` ADD COLUMN `env_variables` text COMMENT 'environment variables (JSON)';
|
||||
ALTER TABLE `event_pipeline` ADD COLUMN `input_variables` text COMMENT 'input variables (JSON)';
|
||||
ALTER TABLE `event_pipeline` ADD COLUMN `label_filters` text COMMENT 'label filters (JSON)';
|
||||
|
||||
CREATE TABLE `event_pipeline_execution` (
|
||||
@@ -318,7 +318,7 @@ CREATE TABLE `event_pipeline_execution` (
|
||||
`finished_at` bigint DEFAULT 0 COMMENT 'finish timestamp',
|
||||
`duration_ms` bigint DEFAULT 0 COMMENT 'duration in milliseconds',
|
||||
`trigger_by` varchar(64) DEFAULT '' COMMENT 'trigger by',
|
||||
`env_snapshot` text COMMENT 'environment variables snapshot (sanitized)',
|
||||
`inputs_snapshot` text COMMENT 'inputs snapshot',
|
||||
PRIMARY KEY (`id`),
|
||||
KEY `idx_pipeline_id` (`pipeline_id`),
|
||||
KEY `idx_event_id` (`event_id`),
|
||||
@@ -358,3 +358,6 @@ CREATE TABLE `user_view_favorite` (
|
||||
KEY `idx_view_id` (`view_id`),
|
||||
KEY `idx_user_id` (`user_id`)
|
||||
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='user favorite views';
|
||||
|
||||
/* v9 2026-01-20 datasource weight */
|
||||
ALTER TABLE `datasource` ADD COLUMN `weight` int not null default 0 COMMENT 'weight for sorting';
|
||||
|
||||
@@ -589,6 +589,7 @@ CREATE TABLE `datasource`
|
||||
`http` varchar(4096) not null default '',
|
||||
`auth` varchar(8192) not null default '',
|
||||
`is_default` tinyint not null default 0,
|
||||
`weight` int not null default 0,
|
||||
`created_at` bigint not null default 0,
|
||||
`created_by` varchar(64) not null default '',
|
||||
`updated_at` bigint not null default 0,
|
||||
|
||||
@@ -90,7 +90,7 @@ func getDatasourcesFromDBLoop(ctx *ctx.Context, fromAPI bool) {
|
||||
foundDefaultDatasource = true
|
||||
}
|
||||
|
||||
logger.Debugf("get datasource: %+v", item)
|
||||
// logger.Debugf("get datasource: %+v", item)
|
||||
ds := datasource.DatasourceInfo{
|
||||
Id: item.Id,
|
||||
Name: item.Name,
|
||||
@@ -104,6 +104,7 @@ func getDatasourcesFromDBLoop(ctx *ctx.Context, fromAPI bool) {
|
||||
AuthJson: item.AuthJson,
|
||||
Status: item.Status,
|
||||
IsDefault: item.IsDefault,
|
||||
Weight: item.Weight,
|
||||
}
|
||||
|
||||
if item.PluginType == "elasticsearch" {
|
||||
@@ -236,5 +237,5 @@ func PutDatasources(items []datasource.DatasourceInfo) {
|
||||
}
|
||||
}
|
||||
|
||||
logger.Debugf("get plugin by type success Ids:%v", ids)
|
||||
// logger.Debugf("get plugin by type success Ids:%v", ids)
|
||||
}
|
||||
|
||||
@@ -39,6 +39,9 @@ type Doris struct {
|
||||
MaxQueryRows int `json:"doris.max_query_rows" mapstructure:"doris.max_query_rows"`
|
||||
ClusterName string `json:"doris.cluster_name" mapstructure:"doris.cluster_name"`
|
||||
EnableWrite bool `json:"doris.enable_write" mapstructure:"doris.enable_write"`
|
||||
// 写用户,用来区分读写用户,减少数据源
|
||||
UserWrite string `json:"doris.user_write" mapstructure:"doris.user_write"`
|
||||
PasswordWrite string `json:"doris.password_write" mapstructure:"doris.password_write"`
|
||||
}
|
||||
|
||||
// NewDorisWithSettings initializes a new Doris instance with the given settings
|
||||
@@ -88,13 +91,13 @@ func (d *Doris) NewConn(ctx context.Context, database string) (*sql.DB, error) {
|
||||
|
||||
var keys []string
|
||||
keys = append(keys, d.Addr)
|
||||
keys = append(keys, d.Password, d.User)
|
||||
keys = append(keys, d.User, d.Password)
|
||||
if len(database) > 0 {
|
||||
keys = append(keys, database)
|
||||
}
|
||||
cachedkey := strings.Join(keys, ":")
|
||||
cachedKey := strings.Join(keys, ":")
|
||||
// cache conn with database
|
||||
conn, ok := pool.PoolClient.Load(cachedkey)
|
||||
conn, ok := pool.PoolClient.Load(cachedKey)
|
||||
if ok {
|
||||
return conn.(*sql.DB), nil
|
||||
}
|
||||
@@ -102,7 +105,7 @@ func (d *Doris) NewConn(ctx context.Context, database string) (*sql.DB, error) {
|
||||
var err error
|
||||
defer func() {
|
||||
if db != nil && err == nil {
|
||||
pool.PoolClient.Store(cachedkey, db)
|
||||
pool.PoolClient.Store(cachedKey, db)
|
||||
}
|
||||
}()
|
||||
|
||||
@@ -121,6 +124,79 @@ func (d *Doris) NewConn(ctx context.Context, database string) (*sql.DB, error) {
|
||||
return db, nil
|
||||
}
|
||||
|
||||
// NewWriteConn establishes a new connection to Doris for write operations
|
||||
// When EnableWrite is true and UserWrite is configured, it uses the write user credentials
|
||||
// Otherwise, it reuses the read connection from NewConn
|
||||
func (d *Doris) NewWriteConn(ctx context.Context, database string) (*sql.DB, error) {
|
||||
// If write user is not configured, reuse the read connection
|
||||
if !d.EnableWrite || len(d.UserWrite) == 0 {
|
||||
return d.NewConn(ctx, database)
|
||||
}
|
||||
|
||||
if len(d.Addr) == 0 {
|
||||
return nil, errors.New("empty fe-node addr")
|
||||
}
|
||||
|
||||
// Set default values similar to postgres implementation
|
||||
if d.Timeout == 0 {
|
||||
d.Timeout = 60000
|
||||
}
|
||||
if d.MaxIdleConns == 0 {
|
||||
d.MaxIdleConns = 10
|
||||
}
|
||||
if d.MaxOpenConns == 0 {
|
||||
d.MaxOpenConns = 100
|
||||
}
|
||||
if d.ConnMaxLifetime == 0 {
|
||||
d.ConnMaxLifetime = 14400
|
||||
}
|
||||
if d.MaxQueryRows == 0 {
|
||||
d.MaxQueryRows = 500
|
||||
}
|
||||
|
||||
// Use write user credentials
|
||||
user := d.UserWrite
|
||||
password := d.PasswordWrite
|
||||
|
||||
var keys []string
|
||||
keys = append(keys, d.Addr)
|
||||
keys = append(keys, user, password)
|
||||
if len(database) > 0 {
|
||||
keys = append(keys, database)
|
||||
}
|
||||
cachedKey := strings.Join(keys, ":")
|
||||
// cache conn with database
|
||||
conn, ok := pool.PoolClient.Load(cachedKey)
|
||||
if ok {
|
||||
return conn.(*sql.DB), nil
|
||||
}
|
||||
var db *sql.DB
|
||||
var err error
|
||||
defer func() {
|
||||
if db != nil && err == nil {
|
||||
pool.PoolClient.Store(cachedKey, db)
|
||||
}
|
||||
}()
|
||||
|
||||
// Simplified connection logic for Doris using MySQL driver
|
||||
dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8", user, password, d.Addr, database)
|
||||
db, err = sql.Open("mysql", dsn)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Set connection pool configuration for write connections
|
||||
// Use more conservative values since write operations are typically less frequent
|
||||
writeMaxIdleConns := max(d.MaxIdleConns/5, 2)
|
||||
writeMaxOpenConns := max(d.MaxOpenConns/10, 5)
|
||||
|
||||
db.SetMaxIdleConns(writeMaxIdleConns)
|
||||
db.SetMaxOpenConns(writeMaxOpenConns)
|
||||
db.SetConnMaxLifetime(time.Duration(d.ConnMaxLifetime) * time.Second)
|
||||
|
||||
return db, nil
|
||||
}
|
||||
|
||||
// createTimeoutContext creates a context with timeout based on Doris configuration
|
||||
func (d *Doris) createTimeoutContext(ctx context.Context) (context.Context, context.CancelFunc) {
|
||||
timeout := d.Timeout
|
||||
@@ -472,7 +548,7 @@ func (d *Doris) ExecContext(ctx context.Context, database string, sql string) er
|
||||
timeoutCtx, cancel := d.createTimeoutContext(ctx)
|
||||
defer cancel()
|
||||
|
||||
db, err := d.NewConn(timeoutCtx, database)
|
||||
db, err := d.NewWriteConn(timeoutCtx, database)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -10,13 +10,14 @@ const (
|
||||
TimeseriesAggregationTimestamp = "__ts__"
|
||||
)
|
||||
|
||||
// QueryLogs 查询日志
|
||||
// TODO: 待测试, MAP/ARRAY/STRUCT/JSON 等类型能否处理
|
||||
func (d *Doris) QueryLogs(ctx context.Context, query *QueryParam) ([]map[string]interface{}, error) {
|
||||
// 等同于 Query()
|
||||
return d.Query(ctx, query)
|
||||
return d.Query(ctx, query, true)
|
||||
}
|
||||
|
||||
// 本质是查询时序数据, 取第一组, SQL由上层封装, 不再做复杂的解析和截断
|
||||
// QueryHistogram 本质是查询时序数据, 取第一组, SQL由上层封装, 不再做复杂的解析和截断
|
||||
func (d *Doris) QueryHistogram(ctx context.Context, query *QueryParam) ([][]float64, error) {
|
||||
values, err := d.QueryTimeseries(ctx, query)
|
||||
if err != nil {
|
||||
|
||||
@@ -15,6 +15,10 @@ const (
|
||||
TimeFieldFormatDateTime = "datetime"
|
||||
)
|
||||
|
||||
type noNeedCheckMaxRowKey struct{}
|
||||
|
||||
var NoNeedCheckMaxRow = noNeedCheckMaxRowKey{}
|
||||
|
||||
// 不再拼接SQL, 完全信赖用户的输入
|
||||
type QueryParam struct {
|
||||
Database string `json:"database"`
|
||||
@@ -39,7 +43,7 @@ var (
|
||||
)
|
||||
|
||||
// Query executes a given SQL query in Doris and returns the results with MaxQueryRows check
|
||||
func (d *Doris) Query(ctx context.Context, query *QueryParam) ([]map[string]interface{}, error) {
|
||||
func (d *Doris) Query(ctx context.Context, query *QueryParam, checkMaxRow bool) ([]map[string]interface{}, error) {
|
||||
// 校验SQL的合法性, 过滤掉 write请求
|
||||
sqlItem := strings.Split(strings.ToUpper(query.Sql), " ")
|
||||
for _, item := range sqlItem {
|
||||
@@ -48,10 +52,12 @@ func (d *Doris) Query(ctx context.Context, query *QueryParam) ([]map[string]inte
|
||||
}
|
||||
}
|
||||
|
||||
// 检查查询结果行数
|
||||
err := d.CheckMaxQueryRows(ctx, query.Database, query.Sql)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
if checkMaxRow {
|
||||
// 检查查询结果行数
|
||||
err := d.CheckMaxQueryRows(ctx, query.Database, query.Sql)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
rows, err := d.ExecQuery(ctx, query.Database, query.Sql)
|
||||
@@ -63,8 +69,12 @@ func (d *Doris) Query(ctx context.Context, query *QueryParam) ([]map[string]inte
|
||||
|
||||
// QueryTimeseries executes a time series data query using the given parameters with MaxQueryRows check
|
||||
func (d *Doris) QueryTimeseries(ctx context.Context, query *QueryParam) ([]types.MetricValues, error) {
|
||||
// 使用 Query 方法执行查询,Query方法内部已包含MaxQueryRows检查
|
||||
rows, err := d.Query(ctx, query)
|
||||
// 默认需要检查,除非调用方声明不需要检查
|
||||
checkMaxRow := true
|
||||
if noCheck, ok := ctx.Value(NoNeedCheckMaxRow).(bool); ok && noCheck {
|
||||
checkMaxRow = false
|
||||
}
|
||||
rows, err := d.Query(ctx, query, checkMaxRow)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -27,7 +27,8 @@ type TargetCacheType struct {
|
||||
redis storage.Redis
|
||||
|
||||
sync.RWMutex
|
||||
targets map[string]*models.Target // key: ident
|
||||
targets map[string]*models.Target // key: ident
|
||||
targetsIndex map[string][]string // key: ip, value: ident list
|
||||
}
|
||||
|
||||
func NewTargetCache(ctx *ctx.Context, stats *Stats, redis storage.Redis) *TargetCacheType {
|
||||
@@ -38,6 +39,7 @@ func NewTargetCache(ctx *ctx.Context, stats *Stats, redis storage.Redis) *Target
|
||||
stats: stats,
|
||||
redis: redis,
|
||||
targets: make(map[string]*models.Target),
|
||||
targetsIndex: make(map[string][]string),
|
||||
}
|
||||
|
||||
tc.SyncTargets()
|
||||
@@ -51,6 +53,7 @@ func (tc *TargetCacheType) Reset() {
|
||||
tc.statTotal = -1
|
||||
tc.statLastUpdated = -1
|
||||
tc.targets = make(map[string]*models.Target)
|
||||
tc.targetsIndex = make(map[string][]string)
|
||||
}
|
||||
|
||||
func (tc *TargetCacheType) StatChanged(total, lastUpdated int64) bool {
|
||||
@@ -62,8 +65,17 @@ func (tc *TargetCacheType) StatChanged(total, lastUpdated int64) bool {
|
||||
}
|
||||
|
||||
func (tc *TargetCacheType) Set(m map[string]*models.Target, total, lastUpdated int64) {
|
||||
idx := make(map[string][]string, len(m))
|
||||
for ident, target := range m {
|
||||
if _, ok := idx[target.HostIp]; !ok {
|
||||
idx[target.HostIp] = []string{}
|
||||
}
|
||||
idx[target.HostIp] = append(idx[target.HostIp], ident)
|
||||
}
|
||||
|
||||
tc.Lock()
|
||||
tc.targets = m
|
||||
tc.targetsIndex = idx
|
||||
tc.Unlock()
|
||||
|
||||
// only one goroutine used, so no need lock
|
||||
@@ -78,6 +90,75 @@ func (tc *TargetCacheType) Get(ident string) (*models.Target, bool) {
|
||||
return val, has
|
||||
}
|
||||
|
||||
func (tc *TargetCacheType) GetByIp(ip string) ([]*models.Target, bool) {
|
||||
tc.RLock()
|
||||
defer tc.RUnlock()
|
||||
idents, has := tc.targetsIndex[ip]
|
||||
if !has {
|
||||
return nil, false
|
||||
}
|
||||
targs := make([]*models.Target, 0, len(idents))
|
||||
for _, ident := range idents {
|
||||
if val, has := tc.targets[ident]; has {
|
||||
targs = append(targs, val)
|
||||
}
|
||||
}
|
||||
return targs, len(targs) > 0
|
||||
}
|
||||
|
||||
func (tc *TargetCacheType) GetAll() []*models.Target {
|
||||
tc.RLock()
|
||||
defer tc.RUnlock()
|
||||
lst := make([]*models.Target, 0, len(tc.targets))
|
||||
for _, target := range tc.targets {
|
||||
lst = append(lst, target)
|
||||
}
|
||||
return lst
|
||||
}
|
||||
|
||||
// GetAllBeatTime 返回所有 target 的心跳时间 map,key 为 ident,value 为 BeatTime
|
||||
func (tc *TargetCacheType) GetAllBeatTime() map[string]int64 {
|
||||
tc.RLock()
|
||||
defer tc.RUnlock()
|
||||
beatTimeMap := make(map[string]int64, len(tc.targets))
|
||||
for ident, target := range tc.targets {
|
||||
beatTimeMap[ident] = target.BeatTime
|
||||
}
|
||||
return beatTimeMap
|
||||
}
|
||||
|
||||
// refreshBeatTime 从 Redis 刷新缓存中所有 target 的 BeatTime
|
||||
func (tc *TargetCacheType) refreshBeatTime() {
|
||||
if tc.redis == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// 快照 ident 列表,避免持锁访问 Redis
|
||||
tc.RLock()
|
||||
idents := make([]string, 0, len(tc.targets))
|
||||
for ident := range tc.targets {
|
||||
idents = append(idents, ident)
|
||||
}
|
||||
tc.RUnlock()
|
||||
|
||||
if len(idents) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
beatTimes := models.FetchBeatTimesFromRedis(tc.redis, idents)
|
||||
if len(beatTimes) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
tc.Lock()
|
||||
for ident, ts := range beatTimes {
|
||||
if target, ok := tc.targets[ident]; ok {
|
||||
target.BeatTime = ts
|
||||
}
|
||||
}
|
||||
tc.Unlock()
|
||||
}
|
||||
|
||||
func (tc *TargetCacheType) Gets(idents []string) []*models.Target {
|
||||
tc.RLock()
|
||||
defer tc.RUnlock()
|
||||
@@ -105,7 +186,7 @@ func (tc *TargetCacheType) GetOffsetHost(targets []*models.Target, now, offset i
|
||||
continue
|
||||
}
|
||||
|
||||
if now-target.UpdateAt > 120 {
|
||||
if now-target.BeatTime > 120 {
|
||||
// means this target is not a active host, do not check offset
|
||||
continue
|
||||
}
|
||||
@@ -147,6 +228,7 @@ func (tc *TargetCacheType) syncTargets() error {
|
||||
}
|
||||
|
||||
if !tc.StatChanged(stat.Total, stat.LastUpdated) {
|
||||
tc.refreshBeatTime()
|
||||
tc.stats.GaugeCronDuration.WithLabelValues("sync_targets").Set(0)
|
||||
tc.stats.GaugeSyncNumber.WithLabelValues("sync_targets").Set(0)
|
||||
dumper.PutSyncRecord("targets", start.Unix(), -1, -1, "not changed")
|
||||
@@ -170,6 +252,9 @@ func (tc *TargetCacheType) syncTargets() error {
|
||||
}
|
||||
}
|
||||
|
||||
// 从 Redis 批量获取心跳时间填充 BeatTime
|
||||
models.FillTargetsBeatTime(tc.redis, lst)
|
||||
|
||||
for i := 0; i < len(lst); i++ {
|
||||
m[lst[i].Ident] = lst[i]
|
||||
}
|
||||
@@ -186,57 +271,18 @@ func (tc *TargetCacheType) syncTargets() error {
|
||||
|
||||
// get host update time
|
||||
func (tc *TargetCacheType) GetHostUpdateTime(targets []string) map[string]int64 {
|
||||
metaMap := make(map[string]int64)
|
||||
if tc.redis == nil {
|
||||
return metaMap
|
||||
return make(map[string]int64)
|
||||
}
|
||||
|
||||
num := 0
|
||||
var keys []string
|
||||
for i := 0; i < len(targets); i++ {
|
||||
keys = append(keys, models.WrapIdentUpdateTime(targets[i]))
|
||||
num++
|
||||
if num == 100 {
|
||||
vals := storage.MGet(context.Background(), tc.redis, keys)
|
||||
for _, value := range vals {
|
||||
var hostUpdateTime models.HostUpdateTime
|
||||
if value == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
err := json.Unmarshal(value, &hostUpdateTime)
|
||||
if err != nil {
|
||||
logger.Errorf("failed to unmarshal host meta: %s value:%v", err, value)
|
||||
continue
|
||||
}
|
||||
metaMap[hostUpdateTime.Ident] = hostUpdateTime.UpdateTime
|
||||
}
|
||||
keys = keys[:0]
|
||||
num = 0
|
||||
}
|
||||
}
|
||||
|
||||
vals := storage.MGet(context.Background(), tc.redis, keys)
|
||||
for _, value := range vals {
|
||||
var hostUpdateTime models.HostUpdateTime
|
||||
if value == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
err := json.Unmarshal(value, &hostUpdateTime)
|
||||
if err != nil {
|
||||
logger.Warningf("failed to unmarshal host err:%v value:%s", err, string(value))
|
||||
continue
|
||||
}
|
||||
metaMap[hostUpdateTime.Ident] = hostUpdateTime.UpdateTime
|
||||
}
|
||||
metaMap := models.FetchBeatTimesFromRedis(tc.redis, targets)
|
||||
|
||||
for _, ident := range targets {
|
||||
if _, ok := metaMap[ident]; !ok {
|
||||
// if not exists, get from cache
|
||||
target, exists := tc.Get(ident)
|
||||
if exists {
|
||||
metaMap[ident] = target.UpdateAt
|
||||
metaMap[ident] = target.BeatTime
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -509,10 +509,16 @@ func (ar *AlertRule) Verify() error {
|
||||
|
||||
ar.AppendTags = strings.TrimSpace(ar.AppendTags)
|
||||
arr := strings.Fields(ar.AppendTags)
|
||||
appendTagKeys := make(map[string]struct{})
|
||||
for i := 0; i < len(arr); i++ {
|
||||
if !strings.Contains(arr[i], "=") {
|
||||
return fmt.Errorf("AppendTags(%s) invalid", arr[i])
|
||||
}
|
||||
pair := strings.SplitN(arr[i], "=", 2)
|
||||
if _, exists := appendTagKeys[pair[0]]; exists {
|
||||
return fmt.Errorf("AppendTags has duplicate key: %s", pair[0])
|
||||
}
|
||||
appendTagKeys[pair[0]] = struct{}{}
|
||||
}
|
||||
|
||||
gids := strings.Fields(ar.NotifyGroups)
|
||||
|
||||
@@ -45,6 +45,7 @@ type Datasource struct {
|
||||
CreatedBy string `json:"created_by"`
|
||||
UpdatedBy string `json:"updated_by"`
|
||||
IsDefault bool `json:"is_default"`
|
||||
Weight int `json:"weight"`
|
||||
Transport *http.Transport `json:"-" gorm:"-"`
|
||||
ForceSave bool `json:"force_save" gorm:"-"`
|
||||
}
|
||||
@@ -517,7 +518,8 @@ func (ds *Datasource) Encrypt(openRsa bool, publicKeyData []byte) error {
|
||||
// Decrypt 用于 edge 将从中心同步的数据源解密,中心不可调用
|
||||
func (ds *Datasource) Decrypt() error {
|
||||
if rsaConfig == nil {
|
||||
return errors.New("rsa config is nil")
|
||||
logger.Debugf("datasource %s rsa config is nil", ds.Name)
|
||||
return nil
|
||||
}
|
||||
|
||||
if !rsaConfig.OpenRSA {
|
||||
|
||||
@@ -29,8 +29,8 @@ type EventPipeline struct {
|
||||
Nodes []WorkflowNode `json:"nodes,omitempty" gorm:"type:text;serializer:json"`
|
||||
// 节点连接关系
|
||||
Connections Connections `json:"connections,omitempty" gorm:"type:text;serializer:json"`
|
||||
// 环境变量(工作流级别的配置变量)
|
||||
EnvVariables []EnvVariable `json:"env_variables,omitempty" gorm:"type:text;serializer:json"`
|
||||
// 输入参数(工作流级别的配置变量)
|
||||
Inputs []InputVariable `json:"inputs,omitempty" gorm:"type:text;serializer:json"`
|
||||
|
||||
CreateAt int64 `json:"create_at" gorm:"type:bigint"`
|
||||
CreateBy string `json:"create_by" gorm:"type:varchar(64)"`
|
||||
@@ -73,8 +73,8 @@ func (e *EventPipeline) Verify() error {
|
||||
if e.Connections == nil {
|
||||
e.Connections = make(Connections)
|
||||
}
|
||||
if e.EnvVariables == nil {
|
||||
e.EnvVariables = make([]EnvVariable, 0)
|
||||
if e.Inputs == nil {
|
||||
e.Inputs = make([]InputVariable, 0)
|
||||
}
|
||||
|
||||
return nil
|
||||
@@ -245,36 +245,10 @@ func (e *EventPipeline) FillWorkflowFields() {
|
||||
}
|
||||
}
|
||||
|
||||
func (e *EventPipeline) GetEnvMap() map[string]string {
|
||||
envMap := make(map[string]string)
|
||||
for _, v := range e.EnvVariables {
|
||||
envMap[v.Key] = v.Value
|
||||
func (e *EventPipeline) GetInputsMap() map[string]string {
|
||||
inputsMap := make(map[string]string)
|
||||
for _, v := range e.Inputs {
|
||||
inputsMap[v.Key] = v.Value
|
||||
}
|
||||
return envMap
|
||||
}
|
||||
|
||||
func (e *EventPipeline) GetSecretKeys() map[string]bool {
|
||||
secretKeys := make(map[string]bool)
|
||||
for _, v := range e.EnvVariables {
|
||||
if v.Secret {
|
||||
secretKeys[v.Key] = true
|
||||
}
|
||||
}
|
||||
return secretKeys
|
||||
}
|
||||
|
||||
func (e *EventPipeline) ValidateEnvVariables(overrides map[string]string) error {
|
||||
// 合并默认值和覆盖值
|
||||
merged := e.GetEnvMap()
|
||||
for k, v := range overrides {
|
||||
merged[k] = v
|
||||
}
|
||||
|
||||
// 校验必填项
|
||||
for _, v := range e.EnvVariables {
|
||||
if v.Required && merged[v.Key] == "" {
|
||||
return fmt.Errorf("required env variable %s is missing", v.Key)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
return inputsMap
|
||||
}
|
||||
|
||||
@@ -2,10 +2,13 @@ package models
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
"github.com/ccfos/nightingale/v6/pkg/poster"
|
||||
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
// 执行状态常量
|
||||
@@ -43,8 +46,8 @@ type EventPipelineExecution struct {
|
||||
// 触发者信息
|
||||
TriggerBy string `json:"trigger_by" gorm:"type:varchar(64)"`
|
||||
|
||||
// 环境变量快照(脱敏后存储)
|
||||
EnvSnapshot string `json:"env_snapshot,omitempty" gorm:"type:text"`
|
||||
// 输入参数快照(脱敏后存储)
|
||||
InputsSnapshot string `json:"inputs_snapshot,omitempty" gorm:"type:text"`
|
||||
}
|
||||
|
||||
func (e *EventPipelineExecution) TableName() string {
|
||||
@@ -71,24 +74,24 @@ func (e *EventPipelineExecution) GetNodeResults() ([]*NodeExecutionResult, error
|
||||
return results, err
|
||||
}
|
||||
|
||||
// SetEnvSnapshot 设置环境变量快照(脱敏后存储)
|
||||
func (e *EventPipelineExecution) SetEnvSnapshot(env map[string]string) error {
|
||||
data, err := json.Marshal(env)
|
||||
// SetInputsSnapshot 设置输入参数快照(脱敏后存储)
|
||||
func (e *EventPipelineExecution) SetInputsSnapshot(inputs map[string]string) error {
|
||||
data, err := json.Marshal(inputs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
e.EnvSnapshot = string(data)
|
||||
e.InputsSnapshot = string(data)
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetEnvSnapshot 获取环境变量快照
|
||||
func (e *EventPipelineExecution) GetEnvSnapshot() (map[string]string, error) {
|
||||
if e.EnvSnapshot == "" {
|
||||
// GetInputsSnapshot 获取输入参数快照
|
||||
func (e *EventPipelineExecution) GetInputsSnapshot() (map[string]string, error) {
|
||||
if e.InputsSnapshot == "" {
|
||||
return nil, nil
|
||||
}
|
||||
var env map[string]string
|
||||
err := json.Unmarshal([]byte(e.EnvSnapshot), &env)
|
||||
return env, err
|
||||
var inputs map[string]string
|
||||
err := json.Unmarshal([]byte(e.InputsSnapshot), &inputs)
|
||||
return inputs, err
|
||||
}
|
||||
|
||||
// CreateEventPipelineExecution 创建执行记录
|
||||
@@ -109,6 +112,9 @@ func GetEventPipelineExecution(c *ctx.Context, id string) (*EventPipelineExecuti
|
||||
var execution EventPipelineExecution
|
||||
err := DB(c).Where("id = ?", id).First(&execution).Error
|
||||
if err != nil {
|
||||
if errors.Is(err, gorm.ErrRecordNotFound) {
|
||||
return nil, nil
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
return &execution, nil
|
||||
@@ -149,12 +155,15 @@ func ListEventPipelineExecutionsByEventID(c *ctx.Context, eventID int64) ([]*Eve
|
||||
}
|
||||
|
||||
// ListAllEventPipelineExecutions 获取所有 Pipeline 的执行记录列表
|
||||
func ListAllEventPipelineExecutions(c *ctx.Context, pipelineName, mode, status string, limit, offset int) ([]*EventPipelineExecution, int64, error) {
|
||||
func ListAllEventPipelineExecutions(c *ctx.Context, pipelineId int64, pipelineName, mode, status string, limit, offset int) ([]*EventPipelineExecution, int64, error) {
|
||||
var executions []*EventPipelineExecution
|
||||
var total int64
|
||||
|
||||
session := DB(c).Model(&EventPipelineExecution{})
|
||||
|
||||
if pipelineId > 0 {
|
||||
session = session.Where("pipeline_id = ?", pipelineId)
|
||||
}
|
||||
if pipelineName != "" {
|
||||
session = session.Where("pipeline_name LIKE ?", "%"+pipelineName+"%")
|
||||
}
|
||||
@@ -274,8 +283,8 @@ func GetEventPipelineExecutionStatistics(c *ctx.Context, pipelineID int64) (*Eve
|
||||
// EventPipelineExecutionDetail 执行详情(包含解析后的节点结果)
|
||||
type EventPipelineExecutionDetail struct {
|
||||
EventPipelineExecution
|
||||
NodeResultsParsed []*NodeExecutionResult `json:"node_results_parsed"`
|
||||
EnvSnapshotParsed map[string]string `json:"env_snapshot_parsed"`
|
||||
NodeResultsParsed []*NodeExecutionResult `json:"node_results_parsed"`
|
||||
InputsSnapshotParsed map[string]string `json:"inputs_snapshot_parsed"`
|
||||
}
|
||||
|
||||
// GetEventPipelineExecutionDetail 获取执行详情
|
||||
@@ -285,6 +294,10 @@ func GetEventPipelineExecutionDetail(c *ctx.Context, id string) (*EventPipelineE
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if execution == nil {
|
||||
return &EventPipelineExecutionDetail{}, nil
|
||||
}
|
||||
|
||||
detail := &EventPipelineExecutionDetail{
|
||||
EventPipelineExecution: *execution,
|
||||
}
|
||||
@@ -296,12 +309,12 @@ func GetEventPipelineExecutionDetail(c *ctx.Context, id string) (*EventPipelineE
|
||||
}
|
||||
detail.NodeResultsParsed = nodeResults
|
||||
|
||||
// 解析环境变量快照
|
||||
envSnapshot, err := execution.GetEnvSnapshot()
|
||||
// 解析输入参数快照
|
||||
inputsSnapshot, err := execution.GetInputsSnapshot()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("parse env snapshot error: %w", err)
|
||||
return nil, fmt.Errorf("parse inputs snapshot error: %w", err)
|
||||
}
|
||||
detail.EnvSnapshotParsed = envSnapshot
|
||||
detail.InputsSnapshotParsed = inputsSnapshot
|
||||
|
||||
return detail, nil
|
||||
}
|
||||
|
||||
@@ -234,6 +234,7 @@ type Target struct {
|
||||
type Datasource struct {
|
||||
IsDefault bool `gorm:"column:is_default;type:boolean;comment:is default datasource"`
|
||||
Identifier string `gorm:"column:identifier;type:varchar(255);default:'';comment:identifier"`
|
||||
Weight int `gorm:"column:weight;type:int;default:0;comment:weight for sorting"`
|
||||
}
|
||||
|
||||
type Configs struct {
|
||||
|
||||
@@ -212,7 +212,6 @@ func (re *RecordingRule) Update(ctx *ctx.Context, ref RecordingRule) error {
|
||||
|
||||
ref.FE2DB()
|
||||
ref.Id = re.Id
|
||||
ref.GroupId = re.GroupId
|
||||
ref.CreateAt = re.CreateAt
|
||||
ref.CreateBy = re.CreateBy
|
||||
ref.UpdateAt = time.Now().Unix()
|
||||
|
||||
114
models/target.go
114
models/target.go
@@ -1,6 +1,8 @@
|
||||
package models
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"log"
|
||||
"sort"
|
||||
"strings"
|
||||
@@ -8,6 +10,7 @@ import (
|
||||
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
"github.com/ccfos/nightingale/v6/pkg/poster"
|
||||
"github.com/ccfos/nightingale/v6/storage"
|
||||
"golang.org/x/exp/slices"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
@@ -36,6 +39,7 @@ type Target struct {
|
||||
OS string `json:"os" gorm:"column:os"`
|
||||
HostTags []string `json:"host_tags" gorm:"serializer:json"`
|
||||
|
||||
BeatTime int64 `json:"beat_time" gorm:"-"` // 实时心跳时间,从 Redis 获取
|
||||
UnixTime int64 `json:"unixtime" gorm:"-"`
|
||||
Offset int64 `json:"offset" gorm:"-"`
|
||||
TargetUp float64 `json:"target_up" gorm:"-"`
|
||||
@@ -97,12 +101,6 @@ func (t *Target) MatchGroupId(gid ...int64) bool {
|
||||
}
|
||||
|
||||
func (t *Target) AfterFind(tx *gorm.DB) (err error) {
|
||||
delta := time.Now().Unix() - t.UpdateAt
|
||||
if delta < 60 {
|
||||
t.TargetUp = 2
|
||||
} else if delta < 180 {
|
||||
t.TargetUp = 1
|
||||
}
|
||||
t.FillTagsMap()
|
||||
return
|
||||
}
|
||||
@@ -182,6 +180,24 @@ func BuildTargetWhereWithHosts(hosts []string) BuildTargetWhereOption {
|
||||
}
|
||||
}
|
||||
|
||||
func BuildTargetWhereWithIdents(idents []string) BuildTargetWhereOption {
|
||||
return func(session *gorm.DB) *gorm.DB {
|
||||
if len(idents) > 0 {
|
||||
session = session.Where("ident in (?)", idents)
|
||||
}
|
||||
return session
|
||||
}
|
||||
}
|
||||
|
||||
func BuildTargetWhereExcludeIdents(idents []string) BuildTargetWhereOption {
|
||||
return func(session *gorm.DB) *gorm.DB {
|
||||
if len(idents) > 0 {
|
||||
session = session.Where("ident not in (?)", idents)
|
||||
}
|
||||
return session
|
||||
}
|
||||
}
|
||||
|
||||
func BuildTargetWhereWithQuery(query string) BuildTargetWhereOption {
|
||||
return func(session *gorm.DB) *gorm.DB {
|
||||
if query != "" {
|
||||
@@ -203,17 +219,6 @@ func BuildTargetWhereWithQuery(query string) BuildTargetWhereOption {
|
||||
}
|
||||
}
|
||||
|
||||
func BuildTargetWhereWithDowntime(downtime int64) BuildTargetWhereOption {
|
||||
return func(session *gorm.DB) *gorm.DB {
|
||||
if downtime > 0 {
|
||||
session = session.Where("target.update_at < ?", time.Now().Unix()-downtime)
|
||||
} else if downtime < 0 {
|
||||
session = session.Where("target.update_at > ?", time.Now().Unix()+downtime)
|
||||
}
|
||||
return session
|
||||
}
|
||||
}
|
||||
|
||||
func buildTargetWhere(ctx *ctx.Context, options ...BuildTargetWhereOption) *gorm.DB {
|
||||
sub := DB(ctx).Model(&Target{}).Distinct("target.ident")
|
||||
for _, opt := range options {
|
||||
@@ -264,21 +269,6 @@ func TargetCountByFilter(ctx *ctx.Context, query []map[string]interface{}) (int6
|
||||
return Count(session)
|
||||
}
|
||||
|
||||
func MissTargetGetsByFilter(ctx *ctx.Context, query []map[string]interface{}, ts int64) ([]*Target, error) {
|
||||
var lst []*Target
|
||||
session := TargetFilterQueryBuild(ctx, query, 0, 0)
|
||||
session = session.Where("update_at < ?", ts)
|
||||
|
||||
err := session.Order("ident").Find(&lst).Error
|
||||
return lst, err
|
||||
}
|
||||
|
||||
func MissTargetCountByFilter(ctx *ctx.Context, query []map[string]interface{}, ts int64) (int64, error) {
|
||||
session := TargetFilterQueryBuild(ctx, query, 0, 0)
|
||||
session = session.Where("update_at < ?", ts)
|
||||
return Count(session)
|
||||
}
|
||||
|
||||
func TargetFilterQueryBuild(ctx *ctx.Context, query []map[string]interface{}, limit, offset int) *gorm.DB {
|
||||
sub := DB(ctx).Model(&Target{}).Distinct("target.ident").Joins("left join " +
|
||||
"target_busi_group on target.ident = target_busi_group.target_ident")
|
||||
@@ -619,6 +609,66 @@ func (t *Target) FillMeta(meta *HostMeta) {
|
||||
t.RemoteAddr = meta.RemoteAddr
|
||||
}
|
||||
|
||||
// FetchBeatTimesFromRedis 从 Redis 批量获取心跳时间,返回 ident -> updateTime 的映射
|
||||
func FetchBeatTimesFromRedis(redis storage.Redis, idents []string) map[string]int64 {
|
||||
result := make(map[string]int64, len(idents))
|
||||
if redis == nil || len(idents) == 0 {
|
||||
return result
|
||||
}
|
||||
|
||||
num := 0
|
||||
var keys []string
|
||||
for i := 0; i < len(idents); i++ {
|
||||
keys = append(keys, WrapIdentUpdateTime(idents[i]))
|
||||
num++
|
||||
if num == 100 {
|
||||
fetchBeatTimeBatch(redis, keys, result)
|
||||
keys = keys[:0]
|
||||
num = 0
|
||||
}
|
||||
}
|
||||
|
||||
if len(keys) > 0 {
|
||||
fetchBeatTimeBatch(redis, keys, result)
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
func fetchBeatTimeBatch(redis storage.Redis, keys []string, result map[string]int64) {
|
||||
vals := storage.MGet(context.Background(), redis, keys)
|
||||
for _, value := range vals {
|
||||
if value == nil {
|
||||
continue
|
||||
}
|
||||
var hut HostUpdateTime
|
||||
if err := json.Unmarshal(value, &hut); err != nil {
|
||||
logger.Warningf("failed to unmarshal host update time: %v", err)
|
||||
continue
|
||||
}
|
||||
result[hut.Ident] = hut.UpdateTime
|
||||
}
|
||||
}
|
||||
|
||||
// FillTargetsBeatTime 从 Redis 批量获取心跳时间填充 target.BeatTime
|
||||
func FillTargetsBeatTime(redis storage.Redis, targets []*Target) {
|
||||
if len(targets) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
idents := make([]string, len(targets))
|
||||
for i, t := range targets {
|
||||
idents[i] = t.Ident
|
||||
}
|
||||
|
||||
beatTimes := FetchBeatTimesFromRedis(redis, idents)
|
||||
for _, t := range targets {
|
||||
if ts, ok := beatTimes[t.Ident]; ok {
|
||||
t.BeatTime = ts
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TargetIdents(ctx *ctx.Context, ids []int64) ([]string, error) {
|
||||
var ret []string
|
||||
|
||||
|
||||
@@ -315,6 +315,18 @@ func (u *User) UpdatePassword(ctx *ctx.Context, password, updateBy string) error
|
||||
}).Error
|
||||
}
|
||||
|
||||
func (u *User) UpdateUserGroup(ctx *ctx.Context, userGroupIds []int64) error {
|
||||
|
||||
count := len(userGroupIds)
|
||||
for i := 0; i < count; i++ {
|
||||
err := UserGroupMemberAdd(ctx, userGroupIds[i], u.Id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func UpdateUserLastActiveTime(ctx *ctx.Context, userId int64, lastActiveTime int64) error {
|
||||
return DB(ctx).Model(&User{}).Where("id = ?", userId).Updates(map[string]interface{}{
|
||||
"last_active_time": lastActiveTime,
|
||||
|
||||
@@ -33,13 +33,11 @@ type ConnectionTarget struct {
|
||||
Index int `json:"index"` // 目标节点的输入端口索引
|
||||
}
|
||||
|
||||
// EnvVariable 环境变量
|
||||
type EnvVariable struct {
|
||||
// InputVariable 输入参数
|
||||
type InputVariable struct {
|
||||
Key string `json:"key"` // 变量名
|
||||
Value string `json:"value"` // 默认值
|
||||
Description string `json:"description,omitempty"` // 描述
|
||||
Secret bool `json:"secret,omitempty"` // 是否敏感(日志脱敏)
|
||||
Required bool `json:"required,omitempty"` // 是否必填
|
||||
}
|
||||
|
||||
// NodeOutput 节点执行输出
|
||||
@@ -104,8 +102,8 @@ type WorkflowTriggerContext struct {
|
||||
// 请求ID(API/Cron 触发使用)
|
||||
RequestID string `json:"request_id"`
|
||||
|
||||
// 环境变量覆盖
|
||||
EnvOverrides map[string]string `json:"env_overrides"`
|
||||
// 输入参数覆盖
|
||||
InputsOverrides map[string]string `json:"inputs_overrides"`
|
||||
|
||||
// 流式输出(API 调用时动态指定)
|
||||
Stream bool `json:"stream"`
|
||||
@@ -118,7 +116,7 @@ type WorkflowTriggerContext struct {
|
||||
|
||||
type WorkflowContext struct {
|
||||
Event *AlertCurEvent `json:"event"` // 当前事件
|
||||
Env map[string]string `json:"env"` // 环境变量/配置(静态,来自 Pipeline 配置)
|
||||
Inputs map[string]string `json:"inputs"` // 前置输入参数(静态,用户配置)
|
||||
Vars map[string]interface{} `json:"vars"` // 节点间传递的数据(动态,运行时产生)
|
||||
Metadata map[string]string `json:"metadata"` // 执行元数据(request_id、start_time 等)
|
||||
Output map[string]interface{} `json:"output,omitempty"` // 输出结果(非告警场景使用)
|
||||
@@ -128,19 +126,6 @@ type WorkflowContext struct {
|
||||
StreamChan chan *StreamChunk `json:"-"` // 流式数据通道(不序列化)
|
||||
}
|
||||
|
||||
// SanitizedEnv 返回脱敏后的环境变量(用于日志和存储)
|
||||
func (ctx *WorkflowContext) SanitizedEnv(secretKeys map[string]bool) map[string]string {
|
||||
sanitized := make(map[string]string)
|
||||
for k, v := range ctx.Env {
|
||||
if secretKeys[k] {
|
||||
sanitized[k] = "******"
|
||||
} else {
|
||||
sanitized[k] = v
|
||||
}
|
||||
}
|
||||
return sanitized
|
||||
}
|
||||
|
||||
// StreamChunk 类型常量
|
||||
const (
|
||||
StreamTypeThinking = "thinking" // AI 思考过程(ReAct Thought)
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
package cfg
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
"os"
|
||||
)
|
||||
|
||||
type scanner struct {
|
||||
@@ -23,6 +23,6 @@ func (s *scanner) Data() []byte {
|
||||
|
||||
func (s *scanner) Read(file string) {
|
||||
if s.err == nil {
|
||||
s.data, s.err = ioutil.ReadFile(file)
|
||||
s.data, s.err = os.ReadFile(file)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -32,17 +32,18 @@ type SsoClient struct {
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
Enable bool `json:"enable"`
|
||||
AuthURL string `json:"auth_url"`
|
||||
DisplayName string `json:"display_name"`
|
||||
AppID string `json:"app_id"`
|
||||
AppSecret string `json:"app_secret"`
|
||||
RedirectURL string `json:"redirect_url"`
|
||||
UsernameField string `json:"username_field"` // name, email, phone
|
||||
FeiShuEndpoint string `json:"feishu_endpoint"` // 飞书API端点,默认为 open.feishu.cn
|
||||
Proxy string `json:"proxy"`
|
||||
CoverAttributes bool `json:"cover_attributes"`
|
||||
DefaultRoles []string `json:"default_roles"`
|
||||
Enable bool `json:"enable"`
|
||||
AuthURL string `json:"auth_url"`
|
||||
DisplayName string `json:"display_name"`
|
||||
AppID string `json:"app_id"`
|
||||
AppSecret string `json:"app_secret"`
|
||||
RedirectURL string `json:"redirect_url"`
|
||||
UsernameField string `json:"username_field"` // name, email, phone
|
||||
FeiShuEndpoint string `json:"feishu_endpoint"` // 飞书API端点,默认为 open.feishu.cn
|
||||
Proxy string `json:"proxy"`
|
||||
CoverAttributes bool `json:"cover_attributes"`
|
||||
DefaultRoles []string `json:"default_roles"`
|
||||
DefaultUserGroups []int64 `json:"default_user_groups"`
|
||||
}
|
||||
|
||||
type CallbackOutput struct {
|
||||
@@ -312,6 +313,8 @@ func (s *SsoClient) Callback(redis storage.Redis, ctx context.Context, code, sta
|
||||
|
||||
// 根据UsernameField配置确定username
|
||||
switch s.FeiShuConfig.UsernameField {
|
||||
case "userid":
|
||||
callbackOutput.Username = username
|
||||
case "name":
|
||||
if nickname == "" {
|
||||
return nil, errors.New("feishu user name is empty")
|
||||
|
||||
@@ -106,6 +106,7 @@ func (s *Set) UpdateTargets(lst []string, now int64) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// 心跳时间只写入 Redis,不再写入 MySQL update_at
|
||||
err := s.updateTargetsUpdateTs(lst, now, s.redis)
|
||||
if err != nil {
|
||||
logger.Errorf("update_ts: failed to update targets: %v error: %v", lst, err)
|
||||
@@ -133,12 +134,7 @@ func (s *Set) UpdateTargets(lst []string, now int64) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
if s.configs.UpdateDBTargetTimestampDisable {
|
||||
// 如果 mysql 压力太大,关闭更新 db 的操作
|
||||
return nil
|
||||
}
|
||||
|
||||
// there are some idents not found in db, so insert them
|
||||
// 新 target 仍需 INSERT 注册到 MySQL
|
||||
var exists []string
|
||||
err = s.ctx.DB.Table("target").Where("ident in ?", lst).Pluck("ident", &exists).Error
|
||||
if err != nil {
|
||||
@@ -153,35 +149,9 @@ func (s *Set) UpdateTargets(lst []string, now int64) error {
|
||||
}
|
||||
}
|
||||
|
||||
// 从批量更新一批机器的时间戳,改成逐台更新,是为了避免批量更新时,mysql的锁竞争问题
|
||||
start := time.Now()
|
||||
duration := time.Since(start).Seconds()
|
||||
if len(exists) > 0 {
|
||||
sema := semaphore.NewSemaphore(s.configs.UpdateDBTargetConcurrency)
|
||||
wg := sync.WaitGroup{}
|
||||
for i := 0; i < len(exists); i++ {
|
||||
sema.Acquire()
|
||||
wg.Add(1)
|
||||
go func(ident string) {
|
||||
defer sema.Release()
|
||||
defer wg.Done()
|
||||
s.updateDBTargetTs(ident, now)
|
||||
}(exists[i])
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
pstat.DBOperationLatency.WithLabelValues("update_targets_ts").Observe(duration)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Set) updateDBTargetTs(ident string, now int64) {
|
||||
err := s.ctx.DB.Exec("UPDATE target SET update_at = ? WHERE ident = ?", now, ident).Error
|
||||
if err != nil {
|
||||
logger.Error("update_target: failed to update target:", ident, "error:", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Set) updateTargetsUpdateTs(lst []string, now int64, redis storage.Redis) error {
|
||||
if redis == nil {
|
||||
logger.Debugf("update_ts: redis is nil")
|
||||
@@ -248,7 +218,7 @@ func (s *Set) writeTargetTsInRedis(ctx context.Context, redis storage.Redis, con
|
||||
|
||||
for i := 0; i < retryCount; i++ {
|
||||
start := time.Now()
|
||||
err := storage.MSet(ctx, redis, content)
|
||||
err := storage.MSet(ctx, redis, content, 24*time.Hour)
|
||||
duration := time.Since(start).Seconds()
|
||||
|
||||
logger.Debugf("update_ts: write target ts in redis, keys: %v, retryCount: %d, retryInterval: %v, error: %v", keys, retryCount, retryInterval, err)
|
||||
|
||||
@@ -18,8 +18,6 @@ type Pushgw struct {
|
||||
UpdateTargetRetryIntervalMills int64
|
||||
UpdateTargetTimeoutMills int64
|
||||
UpdateTargetBatchSize int
|
||||
UpdateDBTargetConcurrency int
|
||||
UpdateDBTargetTimestampDisable bool
|
||||
PushConcurrency int
|
||||
UpdateTargetByUrlConcurrency int
|
||||
|
||||
@@ -129,10 +127,6 @@ func (p *Pushgw) PreCheck() {
|
||||
p.UpdateTargetBatchSize = 20
|
||||
}
|
||||
|
||||
if p.UpdateDBTargetConcurrency <= 0 {
|
||||
p.UpdateDBTargetConcurrency = 16
|
||||
}
|
||||
|
||||
if p.PushConcurrency <= 0 {
|
||||
p.PushConcurrency = 16
|
||||
}
|
||||
|
||||
@@ -109,21 +109,30 @@ func (rt *Router) debugSample(remoteAddr string, v *prompb.TimeSeries) {
|
||||
}
|
||||
|
||||
func (rt *Router) DropSample(v *prompb.TimeSeries) bool {
|
||||
filters := rt.Pushgw.DropSample
|
||||
if len(filters) == 0 {
|
||||
// 快速路径:检查仅 __name__ 的过滤器 O(1)
|
||||
if len(rt.dropByNameOnly) > 0 {
|
||||
for i := 0; i < len(v.Labels); i++ {
|
||||
if v.Labels[i].Name == "__name__" {
|
||||
if _, ok := rt.dropByNameOnly[v.Labels[i].Value]; ok {
|
||||
return true
|
||||
}
|
||||
break // __name__ 只会出现一次,找到后直接跳出
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 慢速路径:处理复杂的多条件过滤器
|
||||
if len(rt.dropComplex) == 0 {
|
||||
return false
|
||||
}
|
||||
|
||||
labelMap := make(map[string]string)
|
||||
// 只有复杂过滤器存在时才创建 labelMap
|
||||
labelMap := make(map[string]string, len(v.Labels))
|
||||
for i := 0; i < len(v.Labels); i++ {
|
||||
labelMap[v.Labels[i].Name] = v.Labels[i].Value
|
||||
}
|
||||
|
||||
for _, filter := range filters {
|
||||
if len(filter) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
for _, filter := range rt.dropComplex {
|
||||
if matchSample(filter, labelMap) {
|
||||
return true
|
||||
}
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/prometheus/prometheus/prompb"
|
||||
"github.com/toolkits/pkg/logger"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/alert/aconf"
|
||||
"github.com/ccfos/nightingale/v6/center/metas"
|
||||
@@ -33,6 +34,10 @@ type Router struct {
|
||||
Ctx *ctx.Context
|
||||
HandleTS HandleTSFunc
|
||||
HeartbeatApi string
|
||||
|
||||
// 预编译的 DropSample 过滤器
|
||||
dropByNameOnly map[string]struct{} // 仅 __name__ 条件的快速匹配
|
||||
dropComplex []map[string]string // 多条件的复杂匹配
|
||||
}
|
||||
|
||||
func stat() gin.HandlerFunc {
|
||||
@@ -51,7 +56,7 @@ func stat() gin.HandlerFunc {
|
||||
func New(httpConfig httpx.Config, pushgw pconf.Pushgw, aconf aconf.Alert, tc *memsto.TargetCacheType, bg *memsto.BusiGroupCacheType,
|
||||
idents *idents.Set, metas *metas.Set,
|
||||
writers *writer.WritersType, ctx *ctx.Context) *Router {
|
||||
return &Router{
|
||||
rt := &Router{
|
||||
HTTP: httpConfig,
|
||||
Pushgw: pushgw,
|
||||
Aconf: aconf,
|
||||
@@ -63,6 +68,38 @@ func New(httpConfig httpx.Config, pushgw pconf.Pushgw, aconf aconf.Alert, tc *me
|
||||
MetaSet: metas,
|
||||
HandleTS: func(pt *prompb.TimeSeries) *prompb.TimeSeries { return pt },
|
||||
}
|
||||
|
||||
// 预编译 DropSample 过滤器
|
||||
rt.initDropSampleFilters()
|
||||
|
||||
return rt
|
||||
}
|
||||
|
||||
// initDropSampleFilters 预编译 DropSample 过滤器,将单条件 __name__ 过滤器
|
||||
// 放入 map 实现 O(1) 查找,多条件过滤器保留原有逻辑
|
||||
func (rt *Router) initDropSampleFilters() {
|
||||
rt.dropByNameOnly = make(map[string]struct{})
|
||||
rt.dropComplex = make([]map[string]string, 0)
|
||||
|
||||
for _, filter := range rt.Pushgw.DropSample {
|
||||
if len(filter) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
// 如果只有一个条件且是 __name__,放入快速匹配 map
|
||||
if len(filter) == 1 {
|
||||
if name, ok := filter["__name__"]; ok {
|
||||
rt.dropByNameOnly[name] = struct{}{}
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// 其他情况放入复杂匹配列表
|
||||
rt.dropComplex = append(rt.dropComplex, filter)
|
||||
}
|
||||
|
||||
logger.Infof("DropSample filters initialized: %d name-only, %d complex",
|
||||
len(rt.dropByNameOnly), len(rt.dropComplex))
|
||||
}
|
||||
|
||||
func (rt *Router) Config(r *gin.Engine) {
|
||||
|
||||
@@ -163,10 +163,10 @@ func MGet(ctx context.Context, r Redis, keys []string) [][]byte {
|
||||
return vals
|
||||
}
|
||||
|
||||
func MSet(ctx context.Context, r Redis, m map[string]interface{}) error {
|
||||
func MSet(ctx context.Context, r Redis, m map[string]interface{}, expiration time.Duration) error {
|
||||
pipe := r.Pipeline()
|
||||
for k, v := range m {
|
||||
pipe.Set(ctx, k, v, 0)
|
||||
pipe.Set(ctx, k, v, expiration)
|
||||
}
|
||||
_, err := pipe.Exec(ctx)
|
||||
return err
|
||||
|
||||
@@ -30,7 +30,7 @@ func TestMiniRedisMGet(t *testing.T) {
|
||||
mp["key2"] = "value2"
|
||||
mp["key3"] = "value3"
|
||||
|
||||
err = MSet(context.Background(), rdb, mp)
|
||||
err = MSet(context.Background(), rdb, mp, 0)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to set miniredis value: %v", err)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user