Compare commits

..

1 Commits

Author SHA1 Message Date
ning
47ce119d21 es sql alert 2025-12-11 16:52:57 +08:00
25 changed files with 288 additions and 701 deletions

View File

@@ -825,12 +825,12 @@ func (e *Dispatch) HandleIbex(rule *models.AlertRule, event *models.AlertCurEven
if len(t.Host) == 0 {
sender.CallIbex(e.ctx, t.TplId, event.TargetIdent,
e.taskTplsCache, e.targetCache, e.userCache, event, "")
e.taskTplsCache, e.targetCache, e.userCache, event)
continue
}
for _, host := range t.Host {
sender.CallIbex(e.ctx, t.TplId, host,
e.taskTplsCache, e.targetCache, e.userCache, event, "")
e.taskTplsCache, e.targetCache, e.userCache, event)
}
}
}

View File

@@ -151,7 +151,7 @@ func (arw *AlertRuleWorker) Eval() {
if len(message) == 0 {
logger.Infof("rule_eval:%s finished, duration:%v", arw.Key(), time.Since(begin))
} else {
logger.Warningf("rule_eval:%s finished, duration:%v, message:%s", arw.Key(), time.Since(begin), message)
logger.Infof("rule_eval:%s finished, duration:%v, message:%s", arw.Key(), time.Since(begin), message)
}
}()
@@ -186,7 +186,8 @@ func (arw *AlertRuleWorker) Eval() {
}
if err != nil {
message = fmt.Sprintf("failed to get anomaly points: %v", err)
logger.Errorf("rule_eval:%s get anomaly point err:%s", arw.Key(), err.Error())
message = "failed to get anomaly points"
return
}

View File

@@ -86,33 +86,30 @@ func (c *IbexCallBacker) handleIbex(ctx *ctx.Context, url string, event *models.
return
}
CallIbex(ctx, id, host, c.taskTplCache, c.targetCache, c.userCache, event, "")
CallIbex(ctx, id, host, c.taskTplCache, c.targetCache, c.userCache, event)
}
func CallIbex(ctx *ctx.Context, id int64, host string,
taskTplCache *memsto.TaskTplCache, targetCache *memsto.TargetCacheType,
userCache *memsto.UserCacheType, event *models.AlertCurEvent, args string) (int64, error) {
logger.Infof("event_callback_ibex: id: %d, host: %s, args: %s, event: %+v", id, host, args, event)
userCache *memsto.UserCacheType, event *models.AlertCurEvent) {
logger.Infof("event_callback_ibex: id: %d, host: %s, event: %+v", id, host, event)
tpl := taskTplCache.Get(id)
if tpl == nil {
err := fmt.Errorf("event_callback_ibex: no such tpl(%d), event: %+v", id, event)
logger.Errorf("%s", err)
return 0, err
logger.Errorf("event_callback_ibex: no such tpl(%d), event: %+v", id, event)
return
}
// check perm
// tpl.GroupId - host - account 三元组校验权限
can, err := CanDoIbex(tpl.UpdateBy, tpl, host, targetCache, userCache)
can, err := canDoIbex(tpl.UpdateBy, tpl, host, targetCache, userCache)
if err != nil {
err = fmt.Errorf("event_callback_ibex: check perm fail: %v, event: %+v", err, event)
logger.Errorf("%s", err)
return 0, err
logger.Errorf("event_callback_ibex: check perm fail: %v, event: %+v", err, event)
return
}
if !can {
err = fmt.Errorf("event_callback_ibex: user(%s) no permission, event: %+v", tpl.UpdateBy, event)
logger.Errorf("%s", err)
return 0, err
logger.Errorf("event_callback_ibex: user(%s) no permission, event: %+v", tpl.UpdateBy, event)
return
}
tagsMap := make(map[string]string)
@@ -136,16 +133,11 @@ func CallIbex(ctx *ctx.Context, id int64, host string,
tags, err := json.Marshal(tagsMap)
if err != nil {
err = fmt.Errorf("event_callback_ibex: failed to marshal tags to json: %v, event: %+v", tagsMap, event)
logger.Errorf("%s", err)
return 0, err
logger.Errorf("event_callback_ibex: failed to marshal tags to json: %v, event: %+v", tagsMap, event)
return
}
// call ibex
taskArgs := tpl.Args
if args != "" {
taskArgs = args
}
in := models.TaskForm{
Title: tpl.Title + " FH: " + host,
Account: tpl.Account,
@@ -154,7 +146,7 @@ func CallIbex(ctx *ctx.Context, id int64, host string,
Timeout: tpl.Timeout,
Pause: tpl.Pause,
Script: tpl.Script,
Args: taskArgs,
Args: tpl.Args,
Stdin: string(tags),
Action: "start",
Creator: tpl.UpdateBy,
@@ -164,9 +156,8 @@ func CallIbex(ctx *ctx.Context, id int64, host string,
id, err = TaskAdd(in, tpl.UpdateBy, ctx.IsCenter)
if err != nil {
err = fmt.Errorf("event_callback_ibex: call ibex fail: %v, event: %+v", err, event)
logger.Errorf("%s", err)
return 0, err
logger.Errorf("event_callback_ibex: call ibex fail: %v, event: %+v", err, event)
return
}
// write db
@@ -187,14 +178,11 @@ func CallIbex(ctx *ctx.Context, id int64, host string,
}
if err = record.Add(ctx); err != nil {
err = fmt.Errorf("event_callback_ibex: persist task_record fail: %v, event: %+v", err, event)
logger.Errorf("%s", err)
return id, err
logger.Errorf("event_callback_ibex: persist task_record fail: %v, event: %+v", err, event)
}
return id, nil
}
func CanDoIbex(username string, tpl *models.TaskTpl, host string, targetCache *memsto.TargetCacheType, userCache *memsto.UserCacheType) (bool, error) {
func canDoIbex(username string, tpl *models.TaskTpl, host string, targetCache *memsto.TargetCacheType, userCache *memsto.UserCacheType) (bool, error) {
user := userCache.GetByUsername(username)
if user != nil && user.IsAdmin() {
return true, nil

View File

@@ -13,53 +13,10 @@ import (
"github.com/ccfos/nightingale/v6/alert/astats"
"github.com/ccfos/nightingale/v6/models"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/pkg/poster"
"github.com/toolkits/pkg/logger"
)
// webhookClientCache 缓存 http.Client避免每次请求都创建新的 Client 导致连接泄露
var webhookClientCache sync.Map // key: clientKey (string), value: *http.Client
// 相同配置的 webhook 会复用同一个 Client
func getWebhookClient(webhook *models.Webhook) *http.Client {
clientKey := webhook.Hash()
if client, ok := webhookClientCache.Load(clientKey); ok {
return client.(*http.Client)
}
// 创建新的 Client
transport := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: webhook.SkipVerify},
MaxIdleConns: 100,
MaxIdleConnsPerHost: 10,
IdleConnTimeout: 90 * time.Second,
}
if poster.UseProxy(webhook.Url) {
transport.Proxy = http.ProxyFromEnvironment
}
timeout := webhook.Timeout
if timeout <= 0 {
timeout = 10
}
newClient := &http.Client{
Timeout: time.Duration(timeout) * time.Second,
Transport: transport,
}
// 使用 LoadOrStore 确保并发安全,避免重复创建
actual, loaded := webhookClientCache.LoadOrStore(clientKey, newClient)
if loaded {
return actual.(*http.Client)
}
return newClient
}
func sendWebhook(webhook *models.Webhook, event interface{}, stats *astats.Stats) (bool, string, error) {
channel := "webhook"
if webhook.Type == models.RuleCallback {
@@ -98,13 +55,25 @@ func sendWebhook(webhook *models.Webhook, event interface{}, stats *astats.Stats
req.Header.Set(conf.Headers[i], conf.Headers[i+1])
}
}
// 使用全局 Client 缓存,避免每次请求都创建新的 Client 导致连接泄露
client := getWebhookClient(conf)
insecureSkipVerify := false
if webhook != nil {
insecureSkipVerify = webhook.SkipVerify
}
if conf.Client == nil {
logger.Warningf("event_%s, event:%s, url: [%s], error: [%s]", channel, string(bs), conf.Url, "client is nil")
conf.Client = &http.Client{
Timeout: time.Duration(conf.Timeout) * time.Second,
Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: insecureSkipVerify},
},
}
}
stats.AlertNotifyTotal.WithLabelValues(channel).Inc()
var resp *http.Response
var body []byte
resp, err = client.Do(req)
resp, err = conf.Client.Do(req)
if err != nil {
stats.AlertNotifyErrorTotal.WithLabelValues(channel).Inc()

View File

@@ -211,8 +211,8 @@ func (rt *Router) Config(r *gin.Engine) {
pages.GET("/datasource/brief", rt.auth(), rt.user(), rt.datasourceBriefs)
pages.POST("/datasource/query", rt.auth(), rt.user(), rt.datasourceQuery)
pages.POST("/ds-query", rt.auth(), rt.user(), rt.QueryData)
pages.POST("/logs-query", rt.auth(), rt.user(), rt.QueryLogV2)
pages.POST("/ds-query", rt.auth(), rt.QueryData)
pages.POST("/logs-query", rt.auth(), rt.QueryLogV2)
pages.POST("/tdengine-databases", rt.auth(), rt.tdengineDatabases)
pages.POST("/tdengine-tables", rt.auth(), rt.tdengineTables)
@@ -569,14 +569,6 @@ func (rt *Router) Config(r *gin.Engine) {
pages.GET("/pagerduty-service-list/:id", rt.auth(), rt.user(), rt.pagerDutyNotifyServicesGet)
pages.GET("/notify-channel-config", rt.auth(), rt.user(), rt.notifyChannelGetBy)
pages.GET("/notify-channel-config/idents", rt.notifyChannelIdentsGet)
// saved view 查询条件保存相关路由
pages.GET("/saved-views", rt.auth(), rt.user(), rt.savedViewGets)
pages.POST("/saved-views", rt.auth(), rt.user(), rt.savedViewAdd)
pages.PUT("/saved-view/:id", rt.auth(), rt.user(), rt.savedViewPut)
pages.DELETE("/saved-view/:id", rt.auth(), rt.user(), rt.savedViewDel)
pages.POST("/saved-view/:id/favorite", rt.auth(), rt.user(), rt.savedViewFavoriteAdd)
pages.DELETE("/saved-view/:id/favorite", rt.auth(), rt.user(), rt.savedViewFavoriteDel)
}
r.GET("/api/n9e/versions", func(c *gin.Context) {

View File

@@ -1,23 +1,18 @@
package router
import (
"context"
"fmt"
"sort"
"sync"
"github.com/ccfos/nightingale/v6/dscache"
"github.com/ccfos/nightingale/v6/dskit/doris"
"github.com/ccfos/nightingale/v6/models"
"github.com/gin-gonic/gin"
"github.com/toolkits/pkg/ginx"
"github.com/toolkits/pkg/logger"
)
type CheckDsPermFunc func(c *gin.Context, dsId int64, cate string, q interface{}) bool
var CheckDsPerm CheckDsPermFunc = func(c *gin.Context, dsId int64, cate string, q interface{}) bool {
func CheckDsPerm(c *gin.Context, dsId int64, cate string, q interface{}) bool {
// todo: 后续需要根据 cate 判断是否需要权限
return true
}
@@ -112,13 +107,10 @@ func (rt *Router) QueryLogBatch(c *gin.Context) {
}
func QueryDataConcurrently(anonymousAccess bool, ctx *gin.Context, f models.QueryParam) ([]models.DataResp, error) {
var (
resp []models.DataResp
mu sync.Mutex
wg sync.WaitGroup
errs []error
rCtx = ctx.Request.Context()
)
var resp []models.DataResp
var mu sync.Mutex
var wg sync.WaitGroup
var errs []error
for _, q := range f.Queries {
if !anonymousAccess && !CheckDsPerm(ctx, f.DatasourceId, f.Cate, q) {
@@ -130,17 +122,12 @@ func QueryDataConcurrently(anonymousAccess bool, ctx *gin.Context, f models.Quer
logger.Warningf("cluster:%d not exists", f.DatasourceId)
return nil, fmt.Errorf("cluster not exists")
}
vCtx := rCtx
if f.Cate == models.DORIS {
vCtx = context.WithValue(vCtx, doris.NoNeedCheckMaxRow, true)
}
wg.Add(1)
go func(query interface{}) {
defer wg.Done()
data, err := plug.QueryData(vCtx, query)
data, err := plug.QueryData(ctx.Request.Context(), query)
if err != nil {
logger.Warningf("query data error: req:%+v err:%v", query, err)
mu.Lock()

View File

@@ -1,144 +0,0 @@
package router
import (
"net/http"
"github.com/ccfos/nightingale/v6/models"
"github.com/ccfos/nightingale/v6/pkg/slice"
"github.com/gin-gonic/gin"
"github.com/toolkits/pkg/ginx"
)
func (rt *Router) savedViewGets(c *gin.Context) {
page := ginx.QueryStr(c, "page", "")
me := c.MustGet("user").(*models.User)
lst, err := models.SavedViewGets(rt.Ctx, page)
if err != nil {
ginx.NewRender(c).Data(nil, err)
return
}
userGids, err := models.MyGroupIds(rt.Ctx, me.Id)
if err != nil {
ginx.NewRender(c).Data(nil, err)
return
}
favoriteMap, err := models.SavedViewFavoriteGetByUserId(rt.Ctx, me.Id)
if err != nil {
ginx.NewRender(c).Data(nil, err)
return
}
favoriteViews := make([]models.SavedView, 0)
normalViews := make([]models.SavedView, 0)
for _, view := range lst {
visible := view.CreateBy == me.Username ||
view.PublicCate == 2 ||
(view.PublicCate == 1 && slice.HaveIntersection[int64](userGids, view.Gids))
if !visible {
continue
}
view.IsFavorite = favoriteMap[view.Id]
// 收藏的排前面
if view.IsFavorite {
favoriteViews = append(favoriteViews, view)
} else {
normalViews = append(normalViews, view)
}
}
ginx.NewRender(c).Data(append(favoriteViews, normalViews...), nil)
}
func (rt *Router) savedViewAdd(c *gin.Context) {
var f models.SavedView
ginx.BindJSON(c, &f)
me := c.MustGet("user").(*models.User)
f.Id = 0
f.CreateBy = me.Username
f.UpdateBy = me.Username
err := models.SavedViewAdd(rt.Ctx, &f)
ginx.NewRender(c).Data(f.Id, err)
}
func (rt *Router) savedViewPut(c *gin.Context) {
id := ginx.UrlParamInt64(c, "id")
view, err := models.SavedViewGetById(rt.Ctx, id)
if err != nil {
ginx.NewRender(c).Data(nil, err)
return
}
if view == nil {
ginx.NewRender(c, http.StatusNotFound).Message("saved view not found")
return
}
me := c.MustGet("user").(*models.User)
// 只有创建者可以更新
if view.CreateBy != me.Username && !me.IsAdmin() {
ginx.NewRender(c, http.StatusForbidden).Message("forbidden")
return
}
var f models.SavedView
ginx.BindJSON(c, &f)
view.Name = f.Name
view.Filter = f.Filter
view.PublicCate = f.PublicCate
view.Gids = f.Gids
err = models.SavedViewUpdate(rt.Ctx, view, me.Username)
ginx.NewRender(c).Message(err)
}
func (rt *Router) savedViewDel(c *gin.Context) {
id := ginx.UrlParamInt64(c, "id")
view, err := models.SavedViewGetById(rt.Ctx, id)
if err != nil {
ginx.NewRender(c).Data(nil, err)
return
}
if view == nil {
ginx.NewRender(c, http.StatusNotFound).Message("saved view not found")
return
}
me := c.MustGet("user").(*models.User)
// 只有创建者或管理员可以删除
if view.CreateBy != me.Username && !me.IsAdmin() {
ginx.NewRender(c, http.StatusForbidden).Message("forbidden")
return
}
err = models.SavedViewDel(rt.Ctx, id)
ginx.NewRender(c).Message(err)
}
func (rt *Router) savedViewFavoriteAdd(c *gin.Context) {
id := ginx.UrlParamInt64(c, "id")
me := c.MustGet("user").(*models.User)
err := models.UserViewFavoriteAdd(rt.Ctx, id, me.Id)
ginx.NewRender(c).Message(err)
}
func (rt *Router) savedViewFavoriteDel(c *gin.Context) {
id := ginx.UrlParamInt64(c, "id")
me := c.MustGet("user").(*models.User)
err := models.UserViewFavoriteDel(rt.Ctx, id, me.Id)
ginx.NewRender(c).Message(err)
}

View File

@@ -1,6 +1,7 @@
package eslike
import (
"bytes"
"context"
"encoding/json"
"fmt"
@@ -10,13 +11,16 @@ import (
"github.com/araddon/dateparse"
"github.com/bitly/go-simplejson"
"github.com/ccfos/nightingale/v6/dskit/sqlbase"
"github.com/ccfos/nightingale/v6/dskit/types"
"github.com/ccfos/nightingale/v6/memsto"
"github.com/ccfos/nightingale/v6/models"
"github.com/elastic/go-elasticsearch/v9"
"github.com/elastic/go-elasticsearch/v9/esapi"
"github.com/mitchellh/mapstructure"
"github.com/olivere/elastic/v7"
"github.com/prometheus/common/model"
"github.com/toolkits/pkg/logger"
"github.com/ccfos/nightingale/v6/memsto"
"github.com/ccfos/nightingale/v6/models"
)
type FixedField string
@@ -26,10 +30,6 @@ const (
FieldId FixedField = "_id"
)
// LabelSeparator 用于分隔多个标签的分隔符
// 使用 ASCII 控制字符 Record Separator (0x1E),避免与用户数据中的 "--" 冲突
const LabelSeparator = "\x1e"
type Query struct {
Ref string `json:"ref" mapstructure:"ref"`
IndexType string `json:"index_type" mapstructure:"index_type"` // 普通索引:index 索引模式:index_pattern
@@ -51,6 +51,12 @@ type Query struct {
MaxShard int `json:"max_shard" mapstructure:"max_shard"`
SearchAfter *SearchAfter `json:"search_after" mapstructure:"search_after"`
QueryType string `json:"query_type" mapstructure:"query_type"`
Query string `json:"query" mapstructure:"query"`
CustomParams map[string]interface{} `json:"custom_params" mapstructure:"custom_params"`
MaxQueryRows int `json:"max_query_rows" mapstructure:"max_query_rows"`
Keys types.Keys `json:"keys" mapstructure:"keys"`
}
type SortField struct {
@@ -132,7 +138,7 @@ func TransferData(metric, ref string, m map[string][][]float64) []models.DataRes
}
data.Metric["__name__"] = model.LabelValue(metric)
labels := strings.Split(k, LabelSeparator)
labels := strings.Split(k, "--")
for _, label := range labels {
arr := strings.SplitN(label, "=", 2)
if len(arr) == 2 {
@@ -201,7 +207,7 @@ func GetBuckets(labelKey string, keys []string, arr []interface{}, metrics *Metr
case json.Number, string:
if !getTs {
if labels != "" {
newlabels = fmt.Sprintf("%s%s%s=%v", labels, LabelSeparator, labelKey, keyValue)
newlabels = fmt.Sprintf("%s--%s=%v", labels, labelKey, keyValue)
} else {
newlabels = fmt.Sprintf("%s=%v", labelKey, keyValue)
}
@@ -718,3 +724,121 @@ func QueryLog(ctx context.Context, queryParam interface{}, timeout int64, versio
return ret, total, nil
}
// execSQLQuery executes ES SQL query and returns rows as []map[string]interface{}
func execSQLQuery(ctx context.Context, param *Query, client *elasticsearch.Client) ([]map[string]interface{}, error) {
query := map[string]interface{}{
"query": param.Query,
}
for k, v := range param.CustomParams {
query[k] = v
}
if param.Timeout > 0 {
query["request_timeout"] = fmt.Sprintf("%ds", param.Timeout)
}
if param.MaxQueryRows > 0 {
query["fetch_size"] = param.MaxQueryRows
}
queryBytes, err := json.Marshal(query)
if err != nil {
return nil, fmt.Errorf("failed to marshal SQL query: %v", err)
}
req := esapi.SQLQueryRequest{
Body: bytes.NewReader(queryBytes),
}
res, err := req.Do(ctx, client)
if err != nil {
return nil, fmt.Errorf("failed to execute SQL query: %v", err)
}
defer res.Body.Close()
var result map[string]interface{}
if err := json.NewDecoder(res.Body).Decode(&result); err != nil {
return nil, fmt.Errorf("failed to parse SQL response: %v", err)
}
if res.IsError() {
if errObj, ok := result["error"].(map[string]interface{}); ok {
return nil, fmt.Errorf("SQL query error: %s", errObj["reason"])
}
return nil, fmt.Errorf("SQL query error: unknown")
}
columns, _ := result["columns"].([]interface{})
rows, _ := result["rows"].([]interface{})
var rowMaps []map[string]interface{}
for _, row := range rows {
rowData, _ := row.([]interface{})
rowMap := make(map[string]interface{})
for i, col := range columns {
if colObj, ok := col.(map[string]interface{}); ok {
colName, _ := colObj["name"].(string)
if i < len(rowData) {
rowMap[colName] = rowData[i]
}
}
}
rowMaps = append(rowMaps, rowMap)
}
return rowMaps, nil
}
func QuerySQLData(ctx context.Context, queryParam interface{}, cliTimeout int64, version string, client *elasticsearch.Client) ([]models.DataResp, error) {
param := new(Query)
if err := mapstructure.Decode(queryParam, param); err != nil {
return nil, err
}
if param.Timeout == 0 {
param.Timeout = int(cliTimeout) / 1000
}
rowMaps, err := execSQLQuery(ctx, param, client)
if err != nil {
return nil, err
}
metricValues := sqlbase.FormatMetricValues(param.Keys, rowMaps)
var dataResps []models.DataResp
for _, mv := range metricValues {
dataResps = append(dataResps, models.DataResp{
Ref: param.Ref,
Metric: mv.Metric,
Values: mv.Values,
})
}
return dataResps, nil
}
func QuerySQLLog(ctx context.Context, queryParam interface{}, timeout int64, version string, client *elasticsearch.Client) ([]interface{}, int64, error) {
param := new(Query)
if err := mapstructure.Decode(queryParam, param); err != nil {
return nil, 0, err
}
if param.Timeout == 0 {
param.Timeout = int(timeout) / 1000
}
rowMaps, err := execSQLQuery(ctx, param, client)
if err != nil {
return nil, 0, err
}
ret := make([]interface{}, len(rowMaps))
for i, rowMap := range rowMaps {
ret[i] = rowMap
}
return ret, 0, nil
}

View File

@@ -4,13 +4,12 @@ import (
"context"
"fmt"
"strings"
"time"
"github.com/ccfos/nightingale/v6/datasource"
"github.com/ccfos/nightingale/v6/dskit/doris"
"github.com/ccfos/nightingale/v6/dskit/types"
"github.com/ccfos/nightingale/v6/models"
"github.com/ccfos/nightingale/v6/pkg/macros"
"github.com/ccfos/nightingale/v6/models"
"github.com/mitchellh/mapstructure"
"github.com/toolkits/pkg/logger"
@@ -39,8 +38,6 @@ type QueryParam struct {
To int64 `json:"to" mapstructure:"to"`
TimeField string `json:"time_field" mapstructure:"time_field"`
TimeFormat string `json:"time_format" mapstructure:"time_format"`
Interval int64 `json:"interval" mapstructure:"interval"` // 查询时间间隔(秒)
Offset int `json:"offset" mapstructure:"offset"` // 延迟计算不在使用通用配置delay
}
func (d *Doris) InitClient() error {
@@ -149,30 +146,6 @@ func (d *Doris) QueryData(ctx context.Context, query interface{}) ([]models.Data
return nil, fmt.Errorf("valueKey is required")
}
// 设置默认 interval
if dorisQueryParam.Interval == 0 {
dorisQueryParam.Interval = 60
}
// 计算时间范围
now := time.Now().Unix()
var start, end int64
if dorisQueryParam.To != 0 && dorisQueryParam.From != 0 {
end = dorisQueryParam.To
start = dorisQueryParam.From
} else {
end = now
start = end - dorisQueryParam.Interval
}
if dorisQueryParam.Offset != 0 {
end -= int64(dorisQueryParam.Offset)
start -= int64(dorisQueryParam.Offset)
}
dorisQueryParam.From = start
dorisQueryParam.To = end
if strings.Contains(dorisQueryParam.SQL, "$__") {
var err error
dorisQueryParam.SQL, err = macros.Macro(dorisQueryParam.SQL, dorisQueryParam.From, dorisQueryParam.To)
@@ -181,14 +154,13 @@ func (d *Doris) QueryData(ctx context.Context, query interface{}) ([]models.Data
}
}
items, err := d.QueryTimeseries(ctx, &doris.QueryParam{
items, err := d.QueryTimeseries(context.TODO(), &doris.QueryParam{
Database: dorisQueryParam.Database,
Sql: dorisQueryParam.SQL,
Keys: types.Keys{
ValueKey: dorisQueryParam.Keys.ValueKey,
LabelKey: dorisQueryParam.Keys.LabelKey,
TimeKey: dorisQueryParam.Keys.TimeKey,
Offset: dorisQueryParam.Offset,
},
})
if err != nil {
@@ -216,18 +188,6 @@ func (d *Doris) QueryLog(ctx context.Context, query interface{}) ([]interface{},
return nil, 0, err
}
// 记录规则预览场景下只传了interval, 没有传From和To
now := time.Now().Unix()
if dorisQueryParam.To == 0 && dorisQueryParam.From == 0 && dorisQueryParam.Interval != 0 {
dorisQueryParam.To = now
dorisQueryParam.From = now - dorisQueryParam.Interval
}
if dorisQueryParam.Offset != 0 {
dorisQueryParam.To -= int64(dorisQueryParam.Offset)
dorisQueryParam.From -= int64(dorisQueryParam.Offset)
}
if strings.Contains(dorisQueryParam.SQL, "$__") {
var err error
dorisQueryParam.SQL, err = macros.Macro(dorisQueryParam.SQL, dorisQueryParam.From, dorisQueryParam.To)

View File

@@ -17,6 +17,7 @@ import (
"github.com/ccfos/nightingale/v6/models"
"github.com/ccfos/nightingale/v6/pkg/tlsx"
"github.com/elastic/go-elasticsearch/v9"
"github.com/mitchellh/mapstructure"
"github.com/olivere/elastic/v7"
"github.com/toolkits/pkg/logger"
@@ -27,18 +28,20 @@ const (
)
type Elasticsearch struct {
Addr string `json:"es.addr" mapstructure:"es.addr"`
Nodes []string `json:"es.nodes" mapstructure:"es.nodes"`
Timeout int64 `json:"es.timeout" mapstructure:"es.timeout"` // millis
Basic BasicAuth `json:"es.basic" mapstructure:"es.basic"`
TLS TLS `json:"es.tls" mapstructure:"es.tls"`
Version string `json:"es.version" mapstructure:"es.version"`
Headers map[string]string `json:"es.headers" mapstructure:"es.headers"`
MinInterval int `json:"es.min_interval" mapstructure:"es.min_interval"` // seconds
MaxShard int `json:"es.max_shard" mapstructure:"es.max_shard"`
ClusterName string `json:"es.cluster_name" mapstructure:"es.cluster_name"`
EnableWrite bool `json:"es.enable_write" mapstructure:"es.enable_write"` // 允许写操作
Client *elastic.Client `json:"es.client" mapstructure:"es.client"`
Addr string `json:"es.addr" mapstructure:"es.addr"`
Nodes []string `json:"es.nodes" mapstructure:"es.nodes"`
Timeout int64 `json:"es.timeout" mapstructure:"es.timeout"` // millis
Basic BasicAuth `json:"es.basic" mapstructure:"es.basic"`
TLS TLS `json:"es.tls" mapstructure:"es.tls"`
Version string `json:"es.version" mapstructure:"es.version"`
Headers map[string]string `json:"es.headers" mapstructure:"es.headers"`
MinInterval int `json:"es.min_interval" mapstructure:"es.min_interval"` // seconds
MaxShard int `json:"es.max_shard" mapstructure:"es.max_shard"`
ClusterName string `json:"es.cluster_name" mapstructure:"es.cluster_name"`
EnableWrite bool `json:"es.enable_write" mapstructure:"es.enable_write"` // 允许写操作
Client *elastic.Client `json:"es.client" mapstructure:"es.client"`
NewClient *elasticsearch.Client `json:"es.new_client" mapstructure:"es.new_client"`
MaxQueryRows int `json:"es.max_query_rows" mapstructure:"es.max_query_rows"`
}
type TLS struct {
@@ -110,6 +113,23 @@ func (e *Elasticsearch) InitClient() error {
return err
}
cfg := elasticsearch.Config{
Addresses: e.Nodes,
Transport: transport,
Header: http.Header{},
}
if e.Basic.Username != "" && e.Basic.Password != "" {
cfg.Username = e.Basic.Username
cfg.Password = e.Basic.Password
}
for k, v := range e.Headers {
cfg.Header[k] = []string{v}
}
e.NewClient, err = elasticsearch.NewClient(cfg)
return err
}
@@ -183,6 +203,15 @@ func (e *Elasticsearch) MakeTSQuery(ctx context.Context, query interface{}, even
}
func (e *Elasticsearch) QueryData(ctx context.Context, queryParam interface{}) ([]models.DataResp, error) {
param := new(eslike.Query)
if err := mapstructure.Decode(queryParam, param); err != nil {
return nil, err
}
if param.QueryType == "SQL" {
return eslike.QuerySQLData(ctx, param, e.Timeout, e.Version, e.NewClient)
}
search := func(ctx context.Context, indices []string, source interface{}, timeout int, maxShard int) (*elastic.SearchResult, error) {
return e.Client.Search().
Index(indices...).
@@ -248,6 +277,20 @@ func (e *Elasticsearch) QueryFields(indexes []string) ([]string, error) {
}
func (e *Elasticsearch) QueryLog(ctx context.Context, queryParam interface{}) ([]interface{}, int64, error) {
param := new(eslike.Query)
if err := mapstructure.Decode(queryParam, param); err != nil {
return nil, 0, err
}
if param.QueryType == "SQL" {
if param.CustomParams == nil {
param.CustomParams = make(map[string]interface{})
}
if e.MaxQueryRows > 0 {
param.MaxQueryRows = e.MaxQueryRows
}
return eslike.QuerySQLLog(ctx, param, e.Timeout, e.Version, e.NewClient)
}
search := func(ctx context.Context, indices []string, source interface{}, timeout int, maxShard int) (*elastic.SearchResult, error) {
// 应该是之前为了获取 fields 字段,做的这个兼容

View File

@@ -33,7 +33,6 @@ type Query struct {
Time int64 `json:"time" mapstructure:"time"` // 单点时间(秒)- 用于告警
Step string `json:"step" mapstructure:"step"` // 步长,如 "1m", "5m"
Limit int `json:"limit" mapstructure:"limit"` // 限制返回数量
Ref string `json:"ref" mapstructure:"ref"` // 变量引用名(如 A、B
}
// IsInstantQuery 判断是否为即时查询(告警场景)
@@ -163,7 +162,7 @@ func (vl *VictoriaLogs) queryDataInstant(ctx context.Context, param *Query) ([]m
return nil, err
}
return convertPrometheusInstantToDataResp(result, param.Ref), nil
return convertPrometheusInstantToDataResp(result), nil
}
// queryDataRange 看图场景,调用 /select/logsql/stats_query_range
@@ -186,17 +185,15 @@ func (vl *VictoriaLogs) queryDataRange(ctx context.Context, param *Query) ([]mod
return nil, err
}
return convertPrometheusRangeToDataResp(result, param.Ref), nil
return convertPrometheusRangeToDataResp(result), nil
}
// convertPrometheusInstantToDataResp 将 Prometheus Instant Query 格式转换为 DataResp
func convertPrometheusInstantToDataResp(resp *victorialogs.PrometheusResponse, ref string) []models.DataResp {
func convertPrometheusInstantToDataResp(resp *victorialogs.PrometheusResponse) []models.DataResp {
var dataResps []models.DataResp
for _, item := range resp.Data.Result {
dataResp := models.DataResp{
Ref: ref,
}
dataResp := models.DataResp{}
// 转换 Metric
dataResp.Metric = make(model.Metric)
@@ -221,13 +218,11 @@ func convertPrometheusInstantToDataResp(resp *victorialogs.PrometheusResponse, r
}
// convertPrometheusRangeToDataResp 将 Prometheus Range Query 格式转换为 DataResp
func convertPrometheusRangeToDataResp(resp *victorialogs.PrometheusResponse, ref string) []models.DataResp {
func convertPrometheusRangeToDataResp(resp *victorialogs.PrometheusResponse) []models.DataResp {
var dataResps []models.DataResp
for _, item := range resp.Data.Result {
dataResp := models.DataResp{
Ref: ref,
}
dataResp := models.DataResp{}
// 转换 Metric
dataResp.Metric = make(model.Metric)

View File

@@ -57,29 +57,3 @@ func (cs *Cache) Get(cate string, dsId int64) (datasource.Datasource, bool) {
return cs.datas[cate][dsId], true
}
func (cs *Cache) Delete(cate string, dsId int64) {
cs.mutex.Lock()
defer cs.mutex.Unlock()
if _, found := cs.datas[cate]; !found {
return
}
delete(cs.datas[cate], dsId)
logger.Debugf("delete plugin:%s %d from cache", cate, dsId)
}
// GetAllIds 返回缓存中所有数据源的 ID按类型分组
func (cs *Cache) GetAllIds() map[string][]int64 {
cs.mutex.RLock()
defer cs.mutex.RUnlock()
result := make(map[string][]int64)
for cate, dsMap := range cs.datas {
ids := make([]int64, 0, len(dsMap))
for dsId := range dsMap {
ids = append(ids, dsId)
}
result[cate] = ids
}
return result
}

View File

@@ -170,13 +170,11 @@ func esN9eToDatasourceInfo(ds *datasource.DatasourceInfo, item models.Datasource
ds.Settings["es.min_interval"] = item.SettingsJson["min_interval"]
ds.Settings["es.max_shard"] = item.SettingsJson["max_shard"]
ds.Settings["es.enable_write"] = item.SettingsJson["enable_write"]
ds.Settings["es.max_query_rows"] = item.SettingsJson["max_query_rows"]
}
func PutDatasources(items []datasource.DatasourceInfo) {
// 记录当前有效的数据源 ID按类型分组
validIds := make(map[string]map[int64]struct{})
ids := make([]int64, 0)
for _, item := range items {
if item.Type == "prometheus" {
continue
@@ -205,12 +203,6 @@ func PutDatasources(items []datasource.DatasourceInfo) {
}
ids = append(ids, item.Id)
// 记录有效的数据源 ID
if _, ok := validIds[typ]; !ok {
validIds[typ] = make(map[int64]struct{})
}
validIds[typ][item.Id] = struct{}{}
// 异步初始化 client 不然数据源同步的会很慢
go func() {
defer func() {
@@ -222,19 +214,5 @@ func PutDatasources(items []datasource.DatasourceInfo) {
}()
}
// 删除 items 中不存在但 DsCache 中存在的数据源
cachedIds := DsCache.GetAllIds()
for cate, dsIds := range cachedIds {
for _, dsId := range dsIds {
if _, ok := validIds[cate]; !ok {
// 该类型在 items 中完全不存在,删除缓存中的所有该类型数据源
DsCache.Delete(cate, dsId)
} else if _, ok := validIds[cate][dsId]; !ok {
// 该数据源 ID 在 items 中不存在,删除
DsCache.Delete(cate, dsId)
}
}
}
logger.Debugf("get plugin by type success Ids:%v", ids)
}

View File

@@ -18,21 +18,13 @@ import (
"github.com/mitchellh/mapstructure"
)
const (
ShowIndexFieldIndexType = "index_type"
ShowIndexFieldColumnName = "column_name"
ShowIndexKeyName = "key_name"
SQLShowIndex = "SHOW INDEX FROM "
)
// Doris struct to hold connection details and the connection object
type Doris struct {
Addr string `json:"doris.addr" mapstructure:"doris.addr"` // fe mysql endpoint
FeAddr string `json:"doris.fe_addr" mapstructure:"doris.fe_addr"` // fe http endpoint
User string `json:"doris.user" mapstructure:"doris.user"` //
Password string `json:"doris.password" mapstructure:"doris.password"` //
Timeout int `json:"doris.timeout" mapstructure:"doris.timeout"` // ms
Timeout int `json:"doris.timeout" mapstructure:"doris.timeout"`
MaxIdleConns int `json:"doris.max_idle_conns" mapstructure:"doris.max_idle_conns"`
MaxOpenConns int `json:"doris.max_open_conns" mapstructure:"doris.max_open_conns"`
ConnMaxLifetime int `json:"doris.conn_max_lifetime" mapstructure:"doris.conn_max_lifetime"`
@@ -127,7 +119,7 @@ func (d *Doris) createTimeoutContext(ctx context.Context) (context.Context, cont
if timeout == 0 {
timeout = 60
}
return context.WithTimeout(ctx, time.Duration(timeout)*time.Millisecond)
return context.WithTimeout(ctx, time.Duration(timeout)*time.Second)
}
// ShowDatabases lists all databases in Doris
@@ -320,88 +312,6 @@ func (d *Doris) DescTable(ctx context.Context, database, table string) ([]*types
return columns, nil
}
type TableIndexInfo struct {
ColumnName string `json:"column_name"`
IndexName string `json:"index_name"`
IndexType string `json:"index_type"`
}
// ShowIndexes 查询表的所有索引信息
func (d *Doris) ShowIndexes(ctx context.Context, database, table string) ([]TableIndexInfo, error) {
if database == "" || table == "" {
return nil, fmt.Errorf("database and table names cannot be empty")
}
tCtx, cancel := d.createTimeoutContext(ctx)
defer cancel()
db, err := d.NewConn(tCtx, database)
if err != nil {
return nil, err
}
querySQL := fmt.Sprintf("%s `%s`.`%s`", SQLShowIndex, database, table)
rows, err := db.QueryContext(tCtx, querySQL)
if err != nil {
return nil, fmt.Errorf("failed to query indexes: %w", err)
}
defer rows.Close()
columns, err := rows.Columns()
if err != nil {
return nil, fmt.Errorf("failed to get columns: %w", err)
}
count := len(columns)
// 预映射列索引
colIdx := map[string]int{
ShowIndexKeyName: -1,
ShowIndexFieldColumnName: -1,
ShowIndexFieldIndexType: -1,
}
for i, col := range columns {
lCol := strings.ToLower(col)
if lCol == ShowIndexKeyName || lCol == ShowIndexFieldColumnName || lCol == ShowIndexFieldIndexType {
colIdx[lCol] = i
}
}
var result []TableIndexInfo
for rows.Next() {
// 使用 sql.RawBytes 可以接受任何类型并转为 string避免复杂的类型断言
scanArgs := make([]interface{}, count)
values := make([]sql.RawBytes, count)
for i := range values {
scanArgs[i] = &values[i]
}
if err = rows.Scan(scanArgs...); err != nil {
return nil, err
}
info := TableIndexInfo{}
if i := colIdx[ShowIndexFieldColumnName]; i != -1 && i < count {
info.ColumnName = string(values[i])
}
if i := colIdx[ShowIndexKeyName]; i != -1 && i < count {
info.IndexName = string(values[i])
}
if i := colIdx[ShowIndexFieldIndexType]; i != -1 && i < count {
info.IndexType = string(values[i])
}
if info.ColumnName != "" {
result = append(result, info)
}
}
if err = rows.Err(); err != nil {
return nil, fmt.Errorf("error iterating rows: %w", err)
}
return result, nil
}
// SelectRows selects rows from a specified table in Doris based on a given query with MaxQueryRows check
func (d *Doris) SelectRows(ctx context.Context, database, table, query string) ([]map[string]interface{}, error) {
sql := fmt.Sprintf("SELECT * FROM %s.%s", database, table)

View File

@@ -10,14 +10,13 @@ const (
TimeseriesAggregationTimestamp = "__ts__"
)
// QueryLogs 查询日志
// TODO: 待测试, MAP/ARRAY/STRUCT/JSON 等类型能否处理
func (d *Doris) QueryLogs(ctx context.Context, query *QueryParam) ([]map[string]interface{}, error) {
// 等同于 Query()
return d.Query(ctx, query, true)
return d.Query(ctx, query)
}
// QueryHistogram 本质是查询时序数据, 取第一组, SQL由上层封装, 不再做复杂的解析和截断
// 本质是查询时序数据, 取第一组, SQL由上层封装, 不再做复杂的解析和截断
func (d *Doris) QueryHistogram(ctx context.Context, query *QueryParam) ([][]float64, error) {
values, err := d.QueryTimeseries(ctx, query)
if err != nil {

View File

@@ -15,10 +15,6 @@ const (
TimeFieldFormatDateTime = "datetime"
)
type noNeedCheckMaxRowKey struct{}
var NoNeedCheckMaxRow = noNeedCheckMaxRowKey{}
// 不再拼接SQL, 完全信赖用户的输入
type QueryParam struct {
Database string `json:"database"`
@@ -43,7 +39,7 @@ var (
)
// Query executes a given SQL query in Doris and returns the results with MaxQueryRows check
func (d *Doris) Query(ctx context.Context, query *QueryParam, checkMaxRow bool) ([]map[string]interface{}, error) {
func (d *Doris) Query(ctx context.Context, query *QueryParam) ([]map[string]interface{}, error) {
// 校验SQL的合法性, 过滤掉 write请求
sqlItem := strings.Split(strings.ToUpper(query.Sql), " ")
for _, item := range sqlItem {
@@ -52,12 +48,10 @@ func (d *Doris) Query(ctx context.Context, query *QueryParam, checkMaxRow bool)
}
}
if checkMaxRow {
// 检查查询结果行数
err := d.CheckMaxQueryRows(ctx, query.Database, query.Sql)
if err != nil {
return nil, err
}
// 检查查询结果行数
err := d.CheckMaxQueryRows(ctx, query.Database, query.Sql)
if err != nil {
return nil, err
}
rows, err := d.ExecQuery(ctx, query.Database, query.Sql)
@@ -69,12 +63,8 @@ func (d *Doris) Query(ctx context.Context, query *QueryParam, checkMaxRow bool)
// QueryTimeseries executes a time series data query using the given parameters with MaxQueryRows check
func (d *Doris) QueryTimeseries(ctx context.Context, query *QueryParam) ([]types.MetricValues, error) {
// 默认需要检查,除非调用方声明不需要检查
checkMaxRow := true
if noCheck, ok := ctx.Value(NoNeedCheckMaxRow).(bool); ok && noCheck {
checkMaxRow = false
}
rows, err := d.Query(ctx, query, checkMaxRow)
// 使用 Query 方法执行查询Query方法内部已包含MaxQueryRows检查
rows, err := d.Query(ctx, query)
if err != nil {
return nil, err
}

View File

@@ -158,10 +158,7 @@ func FormatMetricValues(keys types.Keys, rows []map[string]interface{}, ignoreDe
}
if !exists {
// Default to current time if not specified
// 大多数情况下offset为空
// 对于记录规则延迟计算的情况,统计值的时间戳需要有偏移,以便跟统计值对应
ts = float64(time.Now().Unix()) - float64(keys.Offset)
ts = float64(time.Now().Unix()) // Default to current time if not specified
}
valuePair := []float64{ts, value}

View File

@@ -48,5 +48,4 @@ type Keys struct {
LabelKey string `json:"labelKey" mapstructure:"labelKey"` // 多个用空格分隔
TimeKey string `json:"timeKey" mapstructure:"timeKey"`
TimeFormat string `json:"timeFormat" mapstructure:"timeFormat"` // not used anymore
Offset int `json:"offset" mapstructure:"offset"`
}

11
go.mod
View File

@@ -18,6 +18,7 @@ require (
github.com/coreos/go-oidc v2.2.1+incompatible
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/elastic/go-elasticsearch/v9 v9.0.0
github.com/expr-lang/expr v1.16.1
github.com/flashcatcloud/ibex v1.3.6
github.com/gin-contrib/pprof v1.4.0
@@ -75,8 +76,8 @@ require (
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/segmentio/asm v1.2.0 // indirect
github.com/shopspring/decimal v1.4.0 // indirect
go.opentelemetry.io/otel v1.32.0 // indirect
go.opentelemetry.io/otel/trace v1.32.0 // indirect
go.opentelemetry.io/otel v1.35.0 // indirect
go.opentelemetry.io/otel/trace v1.35.0 // indirect
)
require (
@@ -90,7 +91,10 @@ require (
github.com/eapache/go-resiliency v1.7.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/elastic/elastic-transport-go/v8 v8.7.0 // indirect
github.com/glebarez/go-sqlite v1.21.2 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-uuid v1.0.3 // indirect
@@ -103,11 +107,12 @@ require (
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
github.com/rogpeppe/go-internal v1.13.1 // indirect
github.com/tjfoc/gmsm v1.4.1 // indirect
github.com/valyala/fastrand v1.1.0 // indirect
github.com/valyala/histogram v1.2.0 // indirect
github.com/yuin/gopher-lua v1.1.1 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/otel/metric v1.35.0 // indirect
golang.org/x/sync v0.18.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
modernc.org/libc v1.22.5 // indirect

26
go.sum
View File

@@ -144,6 +144,10 @@ github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4A
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0=
github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
github.com/elastic/elastic-transport-go/v8 v8.7.0 h1:OgTneVuXP2uip4BA658Xi6Hfw+PeIOod2rY3GVMGoVE=
github.com/elastic/elastic-transport-go/v8 v8.7.0/go.mod h1:YLHer5cj0csTzNFXoNQ8qhtGY1GTvSqPnKWKaqQE3Hk=
github.com/elastic/go-elasticsearch/v9 v9.0.0 h1:krpgPeJ2lC8apkaw6B58gKDYJq5eUhP8AMwpPt01Q/U=
github.com/elastic/go-elasticsearch/v9 v9.0.0/go.mod h1:2PB5YQPpY5tWbF65MRqzEXA31PZOdXCkloQSOZtU14I=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
@@ -183,6 +187,11 @@ github.com/go-ldap/ldap/v3 v3.4.4 h1:qPjipEpt+qDa6SI/h1fzuGWoRUY+qqQ9sOZq67/PYUs
github.com/go-ldap/ldap/v3 v3.4.4/go.mod h1:fe1MsuN5eJJ1FeLT/LEBVdWfNWKh459R7aXgXtJC+aI=
github.com/go-logfmt/logfmt v0.6.0 h1:wGYYu3uicYdqXVgoYbvnkrPVXkuLM1p1ifugDMEdRi4=
github.com/go-logfmt/logfmt v0.6.0/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4=
github.com/go-playground/assert/v2 v2.2.0 h1:JvknZsQTYeFEAhQwI4qEt9cyV5ONwRHC+lYKSsYSR8s=
github.com/go-playground/assert/v2 v2.2.0/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4=
@@ -232,8 +241,9 @@ github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/pprof v0.0.0-20230705174524-200ffdc848b8 h1:n6vlPhxsA+BW/XsS5+uqi7GyzaLa5MH7qlSLBZtRdiA=
github.com/google/pprof v0.0.0-20230705174524-200ffdc848b8/go.mod h1:Jh3hGz2jkYak8qXPD19ryItVnUgpgeqzdkY/D0EaeuA=
@@ -463,10 +473,16 @@ github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5t
github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M=
github.com/yuin/gopher-lua v1.1.1/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw=
go.mongodb.org/mongo-driver v1.11.4/go.mod h1:PTSz5yu21bkT/wXpkS7WR5f0ddqw5quethTUn9WM+2g=
go.opentelemetry.io/otel v1.32.0 h1:WnBN+Xjcteh0zdk01SVqV55d/m62NJLJdIyb4y/WO5U=
go.opentelemetry.io/otel v1.32.0/go.mod h1:00DCVSB0RQcnzlwyTfqtxSm+DRr9hpYrHjNGiBHVQIg=
go.opentelemetry.io/otel/trace v1.32.0 h1:WIC9mYrXf8TmY/EXuULKc8hR17vE+Hjv2cssQDe03fM=
go.opentelemetry.io/otel/trace v1.32.0/go.mod h1:+i4rkvCraA+tG6AzwloGaCtkx53Fa+L+V8e9a7YvhT8=
go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA=
go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A=
go.opentelemetry.io/otel v1.35.0 h1:xKWKPxrxB6OtMCbmMY021CqC45J+3Onta9MqjhnusiQ=
go.opentelemetry.io/otel v1.35.0/go.mod h1:UEqy8Zp11hpkUrL73gSlELM0DupHoiq72dR+Zqel/+Y=
go.opentelemetry.io/otel/metric v1.35.0 h1:0znxYu2SNyuMSQT4Y9WDWej0VpcsxkuklLa4/siN90M=
go.opentelemetry.io/otel/metric v1.35.0/go.mod h1:nKVFgxBZ2fReX6IlyW28MgZojkoAkJGaE8CpgeAU3oE=
go.opentelemetry.io/otel/sdk v1.24.0 h1:YMPPDNymmQN3ZgczicBY3B6sf9n62Dlj9pWD3ucgoDw=
go.opentelemetry.io/otel/sdk v1.24.0/go.mod h1:KVrIYw6tEubO9E96HQpcmpTKDVn9gdv35HoYiQWGDFg=
go.opentelemetry.io/otel/trace v1.35.0 h1:dPpEfJu1sDIqruz7BHFG3c7528f6ddfSWfFDVt/xgMs=
go.opentelemetry.io/otel/trace v1.35.0/go.mod h1:WUk7DtFp1Aw2MkvqGdwiXYDZZNvA/1J8o6xRXLrIkyc=
go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
go.uber.org/automaxprocs v1.4.0/go.mod h1:/mTEdr7LvHhs0v7mjdxDreTz1OG5zdZGqgOnhWiR/+Q=

View File

@@ -68,8 +68,7 @@ func MigrateTables(db *gorm.DB) error {
&Board{}, &BoardBusigroup{}, &Users{}, &SsoConfig{}, &models.BuiltinMetric{},
&models.MetricFilter{}, &models.NotificationRecord{}, &models.TargetBusiGroup{},
&models.UserToken{}, &models.DashAnnotation{}, MessageTemplate{}, NotifyRule{}, NotifyChannelConfig{}, &EsIndexPatternMigrate{},
&models.EventPipeline{}, &models.EmbeddedProduct{}, &models.SourceToken{},
&models.SavedView{}, &models.UserViewFavorite{}}
&models.EventPipeline{}, &models.EmbeddedProduct{}, &models.SourceToken{}}
if isPostgres(db) {
dts = append(dts, &models.PostgresBuiltinComponent{})

View File

@@ -31,8 +31,7 @@ type Webhook struct {
RetryCount int `json:"retry_count"`
RetryInterval int `json:"retry_interval"`
Batch int `json:"batch"`
Client *http.Client `json:"-"`
Client *http.Client `json:"-"`
}
func (w *Webhook) Hash() string {

View File

@@ -1,174 +0,0 @@
package models
import (
"errors"
"strings"
"time"
"github.com/ccfos/nightingale/v6/pkg/ctx"
)
var (
ErrSavedViewNameEmpty = errors.New("saved view name is blank")
ErrSavedViewPageEmpty = errors.New("saved view page is blank")
ErrSavedViewNotFound = errors.New("saved view not found")
ErrSavedViewNameDuplicate = errors.New("saved view name already exists in this page")
)
type SavedView struct {
Id int64 `json:"id" gorm:"primaryKey;autoIncrement"`
Name string `json:"name" gorm:"type:varchar(255);not null"`
Page string `json:"page" gorm:"type:varchar(64);not null;index"`
Filter string `json:"filter" gorm:"type:text"`
PublicCate int `json:"public_cate" gorm:"default:0"` // 0: self, 1: team, 2: all
Gids []int64 `json:"gids" gorm:"column:gids;type:text;serializer:json"`
CreateAt int64 `json:"create_at" gorm:"type:bigint;not null;default:0"`
CreateBy string `json:"create_by" gorm:"type:varchar(64);index"`
UpdateAt int64 `json:"update_at" gorm:"type:bigint;not null;default:0"`
UpdateBy string `json:"update_by" gorm:"type:varchar(64)"`
// 查询时填充的字段
IsFavorite bool `json:"is_favorite" gorm:"-"`
}
func (SavedView) TableName() string {
return "saved_view"
}
func (sv *SavedView) Verify() error {
sv.Name = strings.TrimSpace(sv.Name)
if sv.Name == "" {
return ErrSavedViewNameEmpty
}
if sv.Page == "" {
return ErrSavedViewPageEmpty
}
return nil
}
func SavedViewCheckDuplicateName(c *ctx.Context, page, name string, excludeId int64) error {
var count int64
session := DB(c).Model(&SavedView{}).Where("page = ? AND name = ? AND public_cate = 2", page, name)
if excludeId > 0 {
session = session.Where("id != ?", excludeId)
}
if err := session.Count(&count).Error; err != nil {
return err
}
if count > 0 {
return ErrSavedViewNameDuplicate
}
return nil
}
func SavedViewAdd(c *ctx.Context, sv *SavedView) error {
if err := sv.Verify(); err != nil {
return err
}
// 当 PublicCate 为 all(2) 时,检查同一个 page 下 name 是否重复
if sv.PublicCate == 2 {
if err := SavedViewCheckDuplicateName(c, sv.Page, sv.Name, 0); err != nil {
return err
}
}
now := time.Now().Unix()
sv.CreateAt = now
sv.UpdateAt = now
return Insert(c, sv)
}
func SavedViewUpdate(c *ctx.Context, sv *SavedView, username string) error {
if err := sv.Verify(); err != nil {
return err
}
// 当 PublicCate 为 all(2) 时,检查同一个 page 下 name 是否重复(排除自身)
if sv.PublicCate == 2 {
if err := SavedViewCheckDuplicateName(c, sv.Page, sv.Name, sv.Id); err != nil {
return err
}
}
sv.UpdateAt = time.Now().Unix()
sv.UpdateBy = username
return DB(c).Model(sv).Select("name", "filter", "public_cate", "gids", "update_at", "update_by").Updates(sv).Error
}
func SavedViewDel(c *ctx.Context, id int64) error {
// 先删除收藏关联
if err := DB(c).Where("view_id = ?", id).Delete(&UserViewFavorite{}).Error; err != nil {
return err
}
return DB(c).Where("id = ?", id).Delete(&SavedView{}).Error
}
func SavedViewGetById(c *ctx.Context, id int64) (*SavedView, error) {
var sv SavedView
err := DB(c).Where("id = ?", id).First(&sv).Error
if err != nil {
return nil, err
}
return &sv, nil
}
func SavedViewGets(c *ctx.Context, page string) ([]SavedView, error) {
var views []SavedView
session := DB(c).Where("page = ?", page)
if err := session.Order("update_at DESC").Find(&views).Error; err != nil {
return nil, err
}
return views, nil
}
func SavedViewFavoriteGetByUserId(c *ctx.Context, userId int64) (map[int64]bool, error) {
var favorites []UserViewFavorite
if err := DB(c).Where("user_id = ?", userId).Find(&favorites).Error; err != nil {
return nil, err
}
result := make(map[int64]bool)
for _, f := range favorites {
result[f.ViewId] = true
}
return result, nil
}
type UserViewFavorite struct {
Id int64 `json:"id" gorm:"primaryKey;autoIncrement"`
ViewId int64 `json:"view_id" gorm:"index"`
UserId int64 `json:"user_id" gorm:"index"`
CreateAt int64 `json:"create_at"`
}
func (UserViewFavorite) TableName() string {
return "user_view_favorite"
}
func UserViewFavoriteAdd(c *ctx.Context, viewId, userId int64) error {
var count int64
if err := DB(c).Model(&SavedView{}).Where("id = ?", viewId).Count(&count).Error; err != nil {
return err
}
if count == 0 {
return ErrSavedViewNotFound
}
if err := DB(c).Model(&UserViewFavorite{}).Where("view_id = ? AND user_id = ?", viewId, userId).Count(&count).Error; err != nil {
return err
}
if count > 0 {
return nil // 已收藏,直接返回成功
}
fav := &UserViewFavorite{
ViewId: viewId,
UserId: userId,
CreateAt: time.Now().Unix(),
}
return DB(c).Create(fav).Error
}
func UserViewFavoriteDel(c *ctx.Context, viewId, userId int64) error {
return DB(c).Where("view_id = ? AND user_id = ?", viewId, userId).Delete(&UserViewFavorite{}).Error
}

View File

@@ -201,11 +201,6 @@ var I18N = `{
"Some recovery scripts still in the BusiGroup": "业务组中仍有自愈脚本",
"Some target busigroups still in the BusiGroup": "业务组中仍有监控对象",
"saved view not found": "保存的视图不存在",
"saved view name is blank": "视图名称不能为空",
"saved view page is blank": "视图页面不能为空",
"saved view name already exists in this page": "该页面下已存在同名的公开视图",
"---------zh_CN--------": "---------zh_CN--------"
},
"zh_HK": {
@@ -410,11 +405,6 @@ var I18N = `{
"Some recovery scripts still in the BusiGroup": "業務組中仍有自愈腳本",
"Some target busigroups still in the BusiGroup": "業務組中仍有監控對象",
"saved view not found": "保存的視圖不存在",
"saved view name is blank": "視圖名稱不能為空",
"saved view page is blank": "視圖頁面不能為空",
"saved view name already exists in this page": "該頁面下已存在同名的公開視圖",
"---------zh_HK--------": "---------zh_HK--------"
},
"ja_JP": {
@@ -616,11 +606,6 @@ var I18N = `{
"Some recovery scripts still in the BusiGroup": "ビジネスグループにまだ自己回復スクリプトがあります",
"Some target busigroups still in the BusiGroup": "ビジネスグループにまだ監視対象があります",
"saved view not found": "保存されたビューが見つかりません",
"saved view name is blank": "ビュー名を空にすることはできません",
"saved view page is blank": "ビューページを空にすることはできません",
"saved view name already exists in this page": "このページには同名の公開ビューが既に存在します",
"---------ja_JP--------": "---------ja_JP--------"
},
"ru_RU": {
@@ -822,11 +807,6 @@ var I18N = `{
"Some recovery scripts still in the BusiGroup": "В бизнес-группе еще есть скрипты самоисцеления",
"Some target busigroups still in the BusiGroup": "В бизнес-группе еще есть объекты мониторинга",
"saved view not found": "Сохраненный вид не найден",
"saved view name is blank": "Название вида не может быть пустым",
"saved view page is blank": "Страница вида не может быть пустой",
"saved view name already exists in this page": "На этой странице уже существует публичный вид с таким названием",
"---------ru_RU--------": "---------ru_RU--------"
}
}`

View File

@@ -85,15 +85,15 @@ func (pc *PromClientMap) loadFromDatabase() {
var internalAddr string
for k, v := range ds.SettingsJson {
if strings.Contains(k, "write_addr") {
writeAddr = strings.TrimSpace(v.(string))
writeAddr = v.(string)
} else if strings.Contains(k, "internal_addr") && v.(string) != "" {
internalAddr = strings.TrimSpace(v.(string))
internalAddr = v.(string)
}
}
po := PromOption{
ClusterName: ds.Name,
Url: strings.TrimSpace(ds.HTTPJson.Url),
Url: ds.HTTPJson.Url,
WriteAddr: writeAddr,
BasicAuthUser: ds.AuthJson.BasicAuthUser,
BasicAuthPass: ds.AuthJson.BasicAuthPassword,