mirror of
https://github.com/ccfos/nightingale.git
synced 2026-03-03 22:48:56 +00:00
Compare commits
1 Commits
dev21-old
...
es-sql-ale
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
47ce119d21 |
@@ -825,12 +825,12 @@ func (e *Dispatch) HandleIbex(rule *models.AlertRule, event *models.AlertCurEven
|
||||
|
||||
if len(t.Host) == 0 {
|
||||
sender.CallIbex(e.ctx, t.TplId, event.TargetIdent,
|
||||
e.taskTplsCache, e.targetCache, e.userCache, event, "")
|
||||
e.taskTplsCache, e.targetCache, e.userCache, event)
|
||||
continue
|
||||
}
|
||||
for _, host := range t.Host {
|
||||
sender.CallIbex(e.ctx, t.TplId, host,
|
||||
e.taskTplsCache, e.targetCache, e.userCache, event, "")
|
||||
e.taskTplsCache, e.targetCache, e.userCache, event)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -151,7 +151,7 @@ func (arw *AlertRuleWorker) Eval() {
|
||||
if len(message) == 0 {
|
||||
logger.Infof("rule_eval:%s finished, duration:%v", arw.Key(), time.Since(begin))
|
||||
} else {
|
||||
logger.Warningf("rule_eval:%s finished, duration:%v, message:%s", arw.Key(), time.Since(begin), message)
|
||||
logger.Infof("rule_eval:%s finished, duration:%v, message:%s", arw.Key(), time.Since(begin), message)
|
||||
}
|
||||
}()
|
||||
|
||||
@@ -186,7 +186,8 @@ func (arw *AlertRuleWorker) Eval() {
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
message = fmt.Sprintf("failed to get anomaly points: %v", err)
|
||||
logger.Errorf("rule_eval:%s get anomaly point err:%s", arw.Key(), err.Error())
|
||||
message = "failed to get anomaly points"
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -86,33 +86,30 @@ func (c *IbexCallBacker) handleIbex(ctx *ctx.Context, url string, event *models.
|
||||
return
|
||||
}
|
||||
|
||||
CallIbex(ctx, id, host, c.taskTplCache, c.targetCache, c.userCache, event, "")
|
||||
CallIbex(ctx, id, host, c.taskTplCache, c.targetCache, c.userCache, event)
|
||||
}
|
||||
|
||||
func CallIbex(ctx *ctx.Context, id int64, host string,
|
||||
taskTplCache *memsto.TaskTplCache, targetCache *memsto.TargetCacheType,
|
||||
userCache *memsto.UserCacheType, event *models.AlertCurEvent, args string) (int64, error) {
|
||||
logger.Infof("event_callback_ibex: id: %d, host: %s, args: %s, event: %+v", id, host, args, event)
|
||||
userCache *memsto.UserCacheType, event *models.AlertCurEvent) {
|
||||
logger.Infof("event_callback_ibex: id: %d, host: %s, event: %+v", id, host, event)
|
||||
|
||||
tpl := taskTplCache.Get(id)
|
||||
if tpl == nil {
|
||||
err := fmt.Errorf("event_callback_ibex: no such tpl(%d), event: %+v", id, event)
|
||||
logger.Errorf("%s", err)
|
||||
return 0, err
|
||||
logger.Errorf("event_callback_ibex: no such tpl(%d), event: %+v", id, event)
|
||||
return
|
||||
}
|
||||
// check perm
|
||||
// tpl.GroupId - host - account 三元组校验权限
|
||||
can, err := CanDoIbex(tpl.UpdateBy, tpl, host, targetCache, userCache)
|
||||
can, err := canDoIbex(tpl.UpdateBy, tpl, host, targetCache, userCache)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("event_callback_ibex: check perm fail: %v, event: %+v", err, event)
|
||||
logger.Errorf("%s", err)
|
||||
return 0, err
|
||||
logger.Errorf("event_callback_ibex: check perm fail: %v, event: %+v", err, event)
|
||||
return
|
||||
}
|
||||
|
||||
if !can {
|
||||
err = fmt.Errorf("event_callback_ibex: user(%s) no permission, event: %+v", tpl.UpdateBy, event)
|
||||
logger.Errorf("%s", err)
|
||||
return 0, err
|
||||
logger.Errorf("event_callback_ibex: user(%s) no permission, event: %+v", tpl.UpdateBy, event)
|
||||
return
|
||||
}
|
||||
|
||||
tagsMap := make(map[string]string)
|
||||
@@ -136,16 +133,11 @@ func CallIbex(ctx *ctx.Context, id int64, host string,
|
||||
|
||||
tags, err := json.Marshal(tagsMap)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("event_callback_ibex: failed to marshal tags to json: %v, event: %+v", tagsMap, event)
|
||||
logger.Errorf("%s", err)
|
||||
return 0, err
|
||||
logger.Errorf("event_callback_ibex: failed to marshal tags to json: %v, event: %+v", tagsMap, event)
|
||||
return
|
||||
}
|
||||
|
||||
// call ibex
|
||||
taskArgs := tpl.Args
|
||||
if args != "" {
|
||||
taskArgs = args
|
||||
}
|
||||
in := models.TaskForm{
|
||||
Title: tpl.Title + " FH: " + host,
|
||||
Account: tpl.Account,
|
||||
@@ -154,7 +146,7 @@ func CallIbex(ctx *ctx.Context, id int64, host string,
|
||||
Timeout: tpl.Timeout,
|
||||
Pause: tpl.Pause,
|
||||
Script: tpl.Script,
|
||||
Args: taskArgs,
|
||||
Args: tpl.Args,
|
||||
Stdin: string(tags),
|
||||
Action: "start",
|
||||
Creator: tpl.UpdateBy,
|
||||
@@ -164,9 +156,8 @@ func CallIbex(ctx *ctx.Context, id int64, host string,
|
||||
|
||||
id, err = TaskAdd(in, tpl.UpdateBy, ctx.IsCenter)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("event_callback_ibex: call ibex fail: %v, event: %+v", err, event)
|
||||
logger.Errorf("%s", err)
|
||||
return 0, err
|
||||
logger.Errorf("event_callback_ibex: call ibex fail: %v, event: %+v", err, event)
|
||||
return
|
||||
}
|
||||
|
||||
// write db
|
||||
@@ -187,14 +178,11 @@ func CallIbex(ctx *ctx.Context, id int64, host string,
|
||||
}
|
||||
|
||||
if err = record.Add(ctx); err != nil {
|
||||
err = fmt.Errorf("event_callback_ibex: persist task_record fail: %v, event: %+v", err, event)
|
||||
logger.Errorf("%s", err)
|
||||
return id, err
|
||||
logger.Errorf("event_callback_ibex: persist task_record fail: %v, event: %+v", err, event)
|
||||
}
|
||||
return id, nil
|
||||
}
|
||||
|
||||
func CanDoIbex(username string, tpl *models.TaskTpl, host string, targetCache *memsto.TargetCacheType, userCache *memsto.UserCacheType) (bool, error) {
|
||||
func canDoIbex(username string, tpl *models.TaskTpl, host string, targetCache *memsto.TargetCacheType, userCache *memsto.UserCacheType) (bool, error) {
|
||||
user := userCache.GetByUsername(username)
|
||||
if user != nil && user.IsAdmin() {
|
||||
return true, nil
|
||||
|
||||
@@ -13,53 +13,10 @@ import (
|
||||
"github.com/ccfos/nightingale/v6/alert/astats"
|
||||
"github.com/ccfos/nightingale/v6/models"
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
"github.com/ccfos/nightingale/v6/pkg/poster"
|
||||
|
||||
"github.com/toolkits/pkg/logger"
|
||||
)
|
||||
|
||||
// webhookClientCache 缓存 http.Client,避免每次请求都创建新的 Client 导致连接泄露
|
||||
var webhookClientCache sync.Map // key: clientKey (string), value: *http.Client
|
||||
|
||||
// 相同配置的 webhook 会复用同一个 Client
|
||||
func getWebhookClient(webhook *models.Webhook) *http.Client {
|
||||
clientKey := webhook.Hash()
|
||||
|
||||
if client, ok := webhookClientCache.Load(clientKey); ok {
|
||||
return client.(*http.Client)
|
||||
}
|
||||
|
||||
// 创建新的 Client
|
||||
transport := &http.Transport{
|
||||
TLSClientConfig: &tls.Config{InsecureSkipVerify: webhook.SkipVerify},
|
||||
MaxIdleConns: 100,
|
||||
MaxIdleConnsPerHost: 10,
|
||||
IdleConnTimeout: 90 * time.Second,
|
||||
}
|
||||
|
||||
if poster.UseProxy(webhook.Url) {
|
||||
transport.Proxy = http.ProxyFromEnvironment
|
||||
}
|
||||
|
||||
timeout := webhook.Timeout
|
||||
if timeout <= 0 {
|
||||
timeout = 10
|
||||
}
|
||||
|
||||
newClient := &http.Client{
|
||||
Timeout: time.Duration(timeout) * time.Second,
|
||||
Transport: transport,
|
||||
}
|
||||
|
||||
// 使用 LoadOrStore 确保并发安全,避免重复创建
|
||||
actual, loaded := webhookClientCache.LoadOrStore(clientKey, newClient)
|
||||
if loaded {
|
||||
return actual.(*http.Client)
|
||||
}
|
||||
|
||||
return newClient
|
||||
}
|
||||
|
||||
func sendWebhook(webhook *models.Webhook, event interface{}, stats *astats.Stats) (bool, string, error) {
|
||||
channel := "webhook"
|
||||
if webhook.Type == models.RuleCallback {
|
||||
@@ -98,13 +55,25 @@ func sendWebhook(webhook *models.Webhook, event interface{}, stats *astats.Stats
|
||||
req.Header.Set(conf.Headers[i], conf.Headers[i+1])
|
||||
}
|
||||
}
|
||||
// 使用全局 Client 缓存,避免每次请求都创建新的 Client 导致连接泄露
|
||||
client := getWebhookClient(conf)
|
||||
insecureSkipVerify := false
|
||||
if webhook != nil {
|
||||
insecureSkipVerify = webhook.SkipVerify
|
||||
}
|
||||
|
||||
if conf.Client == nil {
|
||||
logger.Warningf("event_%s, event:%s, url: [%s], error: [%s]", channel, string(bs), conf.Url, "client is nil")
|
||||
conf.Client = &http.Client{
|
||||
Timeout: time.Duration(conf.Timeout) * time.Second,
|
||||
Transport: &http.Transport{
|
||||
TLSClientConfig: &tls.Config{InsecureSkipVerify: insecureSkipVerify},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
stats.AlertNotifyTotal.WithLabelValues(channel).Inc()
|
||||
var resp *http.Response
|
||||
var body []byte
|
||||
resp, err = client.Do(req)
|
||||
resp, err = conf.Client.Do(req)
|
||||
|
||||
if err != nil {
|
||||
stats.AlertNotifyErrorTotal.WithLabelValues(channel).Inc()
|
||||
|
||||
@@ -211,8 +211,8 @@ func (rt *Router) Config(r *gin.Engine) {
|
||||
pages.GET("/datasource/brief", rt.auth(), rt.user(), rt.datasourceBriefs)
|
||||
pages.POST("/datasource/query", rt.auth(), rt.user(), rt.datasourceQuery)
|
||||
|
||||
pages.POST("/ds-query", rt.auth(), rt.user(), rt.QueryData)
|
||||
pages.POST("/logs-query", rt.auth(), rt.user(), rt.QueryLogV2)
|
||||
pages.POST("/ds-query", rt.auth(), rt.QueryData)
|
||||
pages.POST("/logs-query", rt.auth(), rt.QueryLogV2)
|
||||
|
||||
pages.POST("/tdengine-databases", rt.auth(), rt.tdengineDatabases)
|
||||
pages.POST("/tdengine-tables", rt.auth(), rt.tdengineTables)
|
||||
@@ -569,14 +569,6 @@ func (rt *Router) Config(r *gin.Engine) {
|
||||
pages.GET("/pagerduty-service-list/:id", rt.auth(), rt.user(), rt.pagerDutyNotifyServicesGet)
|
||||
pages.GET("/notify-channel-config", rt.auth(), rt.user(), rt.notifyChannelGetBy)
|
||||
pages.GET("/notify-channel-config/idents", rt.notifyChannelIdentsGet)
|
||||
|
||||
// saved view 查询条件保存相关路由
|
||||
pages.GET("/saved-views", rt.auth(), rt.user(), rt.savedViewGets)
|
||||
pages.POST("/saved-views", rt.auth(), rt.user(), rt.savedViewAdd)
|
||||
pages.PUT("/saved-view/:id", rt.auth(), rt.user(), rt.savedViewPut)
|
||||
pages.DELETE("/saved-view/:id", rt.auth(), rt.user(), rt.savedViewDel)
|
||||
pages.POST("/saved-view/:id/favorite", rt.auth(), rt.user(), rt.savedViewFavoriteAdd)
|
||||
pages.DELETE("/saved-view/:id/favorite", rt.auth(), rt.user(), rt.savedViewFavoriteDel)
|
||||
}
|
||||
|
||||
r.GET("/api/n9e/versions", func(c *gin.Context) {
|
||||
|
||||
@@ -1,23 +1,18 @@
|
||||
package router
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sort"
|
||||
"sync"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/dscache"
|
||||
"github.com/ccfos/nightingale/v6/dskit/doris"
|
||||
"github.com/ccfos/nightingale/v6/models"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/toolkits/pkg/ginx"
|
||||
"github.com/toolkits/pkg/logger"
|
||||
)
|
||||
|
||||
type CheckDsPermFunc func(c *gin.Context, dsId int64, cate string, q interface{}) bool
|
||||
|
||||
var CheckDsPerm CheckDsPermFunc = func(c *gin.Context, dsId int64, cate string, q interface{}) bool {
|
||||
func CheckDsPerm(c *gin.Context, dsId int64, cate string, q interface{}) bool {
|
||||
// todo: 后续需要根据 cate 判断是否需要权限
|
||||
return true
|
||||
}
|
||||
@@ -112,13 +107,10 @@ func (rt *Router) QueryLogBatch(c *gin.Context) {
|
||||
}
|
||||
|
||||
func QueryDataConcurrently(anonymousAccess bool, ctx *gin.Context, f models.QueryParam) ([]models.DataResp, error) {
|
||||
var (
|
||||
resp []models.DataResp
|
||||
mu sync.Mutex
|
||||
wg sync.WaitGroup
|
||||
errs []error
|
||||
rCtx = ctx.Request.Context()
|
||||
)
|
||||
var resp []models.DataResp
|
||||
var mu sync.Mutex
|
||||
var wg sync.WaitGroup
|
||||
var errs []error
|
||||
|
||||
for _, q := range f.Queries {
|
||||
if !anonymousAccess && !CheckDsPerm(ctx, f.DatasourceId, f.Cate, q) {
|
||||
@@ -130,17 +122,12 @@ func QueryDataConcurrently(anonymousAccess bool, ctx *gin.Context, f models.Quer
|
||||
logger.Warningf("cluster:%d not exists", f.DatasourceId)
|
||||
return nil, fmt.Errorf("cluster not exists")
|
||||
}
|
||||
|
||||
vCtx := rCtx
|
||||
if f.Cate == models.DORIS {
|
||||
vCtx = context.WithValue(vCtx, doris.NoNeedCheckMaxRow, true)
|
||||
}
|
||||
|
||||
wg.Add(1)
|
||||
go func(query interface{}) {
|
||||
defer wg.Done()
|
||||
|
||||
data, err := plug.QueryData(vCtx, query)
|
||||
data, err := plug.QueryData(ctx.Request.Context(), query)
|
||||
if err != nil {
|
||||
logger.Warningf("query data error: req:%+v err:%v", query, err)
|
||||
mu.Lock()
|
||||
|
||||
@@ -1,144 +0,0 @@
|
||||
package router
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/models"
|
||||
"github.com/ccfos/nightingale/v6/pkg/slice"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/toolkits/pkg/ginx"
|
||||
)
|
||||
|
||||
func (rt *Router) savedViewGets(c *gin.Context) {
|
||||
page := ginx.QueryStr(c, "page", "")
|
||||
|
||||
me := c.MustGet("user").(*models.User)
|
||||
|
||||
lst, err := models.SavedViewGets(rt.Ctx, page)
|
||||
if err != nil {
|
||||
ginx.NewRender(c).Data(nil, err)
|
||||
return
|
||||
}
|
||||
|
||||
userGids, err := models.MyGroupIds(rt.Ctx, me.Id)
|
||||
if err != nil {
|
||||
ginx.NewRender(c).Data(nil, err)
|
||||
return
|
||||
}
|
||||
|
||||
favoriteMap, err := models.SavedViewFavoriteGetByUserId(rt.Ctx, me.Id)
|
||||
if err != nil {
|
||||
ginx.NewRender(c).Data(nil, err)
|
||||
return
|
||||
}
|
||||
|
||||
favoriteViews := make([]models.SavedView, 0)
|
||||
normalViews := make([]models.SavedView, 0)
|
||||
|
||||
for _, view := range lst {
|
||||
visible := view.CreateBy == me.Username ||
|
||||
view.PublicCate == 2 ||
|
||||
(view.PublicCate == 1 && slice.HaveIntersection[int64](userGids, view.Gids))
|
||||
|
||||
if !visible {
|
||||
continue
|
||||
}
|
||||
|
||||
view.IsFavorite = favoriteMap[view.Id]
|
||||
|
||||
// 收藏的排前面
|
||||
if view.IsFavorite {
|
||||
favoriteViews = append(favoriteViews, view)
|
||||
} else {
|
||||
normalViews = append(normalViews, view)
|
||||
}
|
||||
}
|
||||
|
||||
ginx.NewRender(c).Data(append(favoriteViews, normalViews...), nil)
|
||||
}
|
||||
|
||||
func (rt *Router) savedViewAdd(c *gin.Context) {
|
||||
var f models.SavedView
|
||||
ginx.BindJSON(c, &f)
|
||||
|
||||
me := c.MustGet("user").(*models.User)
|
||||
f.Id = 0
|
||||
f.CreateBy = me.Username
|
||||
f.UpdateBy = me.Username
|
||||
|
||||
err := models.SavedViewAdd(rt.Ctx, &f)
|
||||
ginx.NewRender(c).Data(f.Id, err)
|
||||
}
|
||||
|
||||
func (rt *Router) savedViewPut(c *gin.Context) {
|
||||
id := ginx.UrlParamInt64(c, "id")
|
||||
|
||||
view, err := models.SavedViewGetById(rt.Ctx, id)
|
||||
if err != nil {
|
||||
ginx.NewRender(c).Data(nil, err)
|
||||
return
|
||||
}
|
||||
if view == nil {
|
||||
ginx.NewRender(c, http.StatusNotFound).Message("saved view not found")
|
||||
return
|
||||
}
|
||||
|
||||
me := c.MustGet("user").(*models.User)
|
||||
// 只有创建者可以更新
|
||||
if view.CreateBy != me.Username && !me.IsAdmin() {
|
||||
ginx.NewRender(c, http.StatusForbidden).Message("forbidden")
|
||||
return
|
||||
}
|
||||
|
||||
var f models.SavedView
|
||||
ginx.BindJSON(c, &f)
|
||||
|
||||
view.Name = f.Name
|
||||
view.Filter = f.Filter
|
||||
view.PublicCate = f.PublicCate
|
||||
view.Gids = f.Gids
|
||||
|
||||
err = models.SavedViewUpdate(rt.Ctx, view, me.Username)
|
||||
ginx.NewRender(c).Message(err)
|
||||
}
|
||||
|
||||
func (rt *Router) savedViewDel(c *gin.Context) {
|
||||
id := ginx.UrlParamInt64(c, "id")
|
||||
|
||||
view, err := models.SavedViewGetById(rt.Ctx, id)
|
||||
if err != nil {
|
||||
ginx.NewRender(c).Data(nil, err)
|
||||
return
|
||||
}
|
||||
if view == nil {
|
||||
ginx.NewRender(c, http.StatusNotFound).Message("saved view not found")
|
||||
return
|
||||
}
|
||||
|
||||
me := c.MustGet("user").(*models.User)
|
||||
// 只有创建者或管理员可以删除
|
||||
if view.CreateBy != me.Username && !me.IsAdmin() {
|
||||
ginx.NewRender(c, http.StatusForbidden).Message("forbidden")
|
||||
return
|
||||
}
|
||||
|
||||
err = models.SavedViewDel(rt.Ctx, id)
|
||||
ginx.NewRender(c).Message(err)
|
||||
}
|
||||
|
||||
func (rt *Router) savedViewFavoriteAdd(c *gin.Context) {
|
||||
id := ginx.UrlParamInt64(c, "id")
|
||||
me := c.MustGet("user").(*models.User)
|
||||
|
||||
err := models.UserViewFavoriteAdd(rt.Ctx, id, me.Id)
|
||||
ginx.NewRender(c).Message(err)
|
||||
}
|
||||
|
||||
func (rt *Router) savedViewFavoriteDel(c *gin.Context) {
|
||||
id := ginx.UrlParamInt64(c, "id")
|
||||
me := c.MustGet("user").(*models.User)
|
||||
|
||||
err := models.UserViewFavoriteDel(rt.Ctx, id, me.Id)
|
||||
ginx.NewRender(c).Message(err)
|
||||
}
|
||||
@@ -1,6 +1,7 @@
|
||||
package eslike
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
@@ -10,13 +11,16 @@ import (
|
||||
|
||||
"github.com/araddon/dateparse"
|
||||
"github.com/bitly/go-simplejson"
|
||||
"github.com/ccfos/nightingale/v6/dskit/sqlbase"
|
||||
"github.com/ccfos/nightingale/v6/dskit/types"
|
||||
"github.com/ccfos/nightingale/v6/memsto"
|
||||
"github.com/ccfos/nightingale/v6/models"
|
||||
"github.com/elastic/go-elasticsearch/v9"
|
||||
"github.com/elastic/go-elasticsearch/v9/esapi"
|
||||
"github.com/mitchellh/mapstructure"
|
||||
"github.com/olivere/elastic/v7"
|
||||
"github.com/prometheus/common/model"
|
||||
"github.com/toolkits/pkg/logger"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/memsto"
|
||||
"github.com/ccfos/nightingale/v6/models"
|
||||
)
|
||||
|
||||
type FixedField string
|
||||
@@ -26,10 +30,6 @@ const (
|
||||
FieldId FixedField = "_id"
|
||||
)
|
||||
|
||||
// LabelSeparator 用于分隔多个标签的分隔符
|
||||
// 使用 ASCII 控制字符 Record Separator (0x1E),避免与用户数据中的 "--" 冲突
|
||||
const LabelSeparator = "\x1e"
|
||||
|
||||
type Query struct {
|
||||
Ref string `json:"ref" mapstructure:"ref"`
|
||||
IndexType string `json:"index_type" mapstructure:"index_type"` // 普通索引:index 索引模式:index_pattern
|
||||
@@ -51,6 +51,12 @@ type Query struct {
|
||||
MaxShard int `json:"max_shard" mapstructure:"max_shard"`
|
||||
|
||||
SearchAfter *SearchAfter `json:"search_after" mapstructure:"search_after"`
|
||||
|
||||
QueryType string `json:"query_type" mapstructure:"query_type"`
|
||||
Query string `json:"query" mapstructure:"query"`
|
||||
CustomParams map[string]interface{} `json:"custom_params" mapstructure:"custom_params"`
|
||||
MaxQueryRows int `json:"max_query_rows" mapstructure:"max_query_rows"`
|
||||
Keys types.Keys `json:"keys" mapstructure:"keys"`
|
||||
}
|
||||
|
||||
type SortField struct {
|
||||
@@ -132,7 +138,7 @@ func TransferData(metric, ref string, m map[string][][]float64) []models.DataRes
|
||||
}
|
||||
|
||||
data.Metric["__name__"] = model.LabelValue(metric)
|
||||
labels := strings.Split(k, LabelSeparator)
|
||||
labels := strings.Split(k, "--")
|
||||
for _, label := range labels {
|
||||
arr := strings.SplitN(label, "=", 2)
|
||||
if len(arr) == 2 {
|
||||
@@ -201,7 +207,7 @@ func GetBuckets(labelKey string, keys []string, arr []interface{}, metrics *Metr
|
||||
case json.Number, string:
|
||||
if !getTs {
|
||||
if labels != "" {
|
||||
newlabels = fmt.Sprintf("%s%s%s=%v", labels, LabelSeparator, labelKey, keyValue)
|
||||
newlabels = fmt.Sprintf("%s--%s=%v", labels, labelKey, keyValue)
|
||||
} else {
|
||||
newlabels = fmt.Sprintf("%s=%v", labelKey, keyValue)
|
||||
}
|
||||
@@ -718,3 +724,121 @@ func QueryLog(ctx context.Context, queryParam interface{}, timeout int64, versio
|
||||
|
||||
return ret, total, nil
|
||||
}
|
||||
|
||||
// execSQLQuery executes ES SQL query and returns rows as []map[string]interface{}
|
||||
func execSQLQuery(ctx context.Context, param *Query, client *elasticsearch.Client) ([]map[string]interface{}, error) {
|
||||
query := map[string]interface{}{
|
||||
"query": param.Query,
|
||||
}
|
||||
|
||||
for k, v := range param.CustomParams {
|
||||
query[k] = v
|
||||
}
|
||||
|
||||
if param.Timeout > 0 {
|
||||
query["request_timeout"] = fmt.Sprintf("%ds", param.Timeout)
|
||||
}
|
||||
|
||||
if param.MaxQueryRows > 0 {
|
||||
query["fetch_size"] = param.MaxQueryRows
|
||||
}
|
||||
|
||||
queryBytes, err := json.Marshal(query)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to marshal SQL query: %v", err)
|
||||
}
|
||||
|
||||
req := esapi.SQLQueryRequest{
|
||||
Body: bytes.NewReader(queryBytes),
|
||||
}
|
||||
|
||||
res, err := req.Do(ctx, client)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to execute SQL query: %v", err)
|
||||
}
|
||||
defer res.Body.Close()
|
||||
|
||||
var result map[string]interface{}
|
||||
if err := json.NewDecoder(res.Body).Decode(&result); err != nil {
|
||||
return nil, fmt.Errorf("failed to parse SQL response: %v", err)
|
||||
}
|
||||
|
||||
if res.IsError() {
|
||||
if errObj, ok := result["error"].(map[string]interface{}); ok {
|
||||
return nil, fmt.Errorf("SQL query error: %s", errObj["reason"])
|
||||
}
|
||||
return nil, fmt.Errorf("SQL query error: unknown")
|
||||
}
|
||||
|
||||
columns, _ := result["columns"].([]interface{})
|
||||
rows, _ := result["rows"].([]interface{})
|
||||
|
||||
var rowMaps []map[string]interface{}
|
||||
for _, row := range rows {
|
||||
rowData, _ := row.([]interface{})
|
||||
rowMap := make(map[string]interface{})
|
||||
for i, col := range columns {
|
||||
if colObj, ok := col.(map[string]interface{}); ok {
|
||||
colName, _ := colObj["name"].(string)
|
||||
if i < len(rowData) {
|
||||
rowMap[colName] = rowData[i]
|
||||
}
|
||||
}
|
||||
}
|
||||
rowMaps = append(rowMaps, rowMap)
|
||||
}
|
||||
|
||||
return rowMaps, nil
|
||||
}
|
||||
|
||||
func QuerySQLData(ctx context.Context, queryParam interface{}, cliTimeout int64, version string, client *elasticsearch.Client) ([]models.DataResp, error) {
|
||||
param := new(Query)
|
||||
if err := mapstructure.Decode(queryParam, param); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if param.Timeout == 0 {
|
||||
param.Timeout = int(cliTimeout) / 1000
|
||||
}
|
||||
|
||||
rowMaps, err := execSQLQuery(ctx, param, client)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
metricValues := sqlbase.FormatMetricValues(param.Keys, rowMaps)
|
||||
|
||||
var dataResps []models.DataResp
|
||||
for _, mv := range metricValues {
|
||||
dataResps = append(dataResps, models.DataResp{
|
||||
Ref: param.Ref,
|
||||
Metric: mv.Metric,
|
||||
Values: mv.Values,
|
||||
})
|
||||
}
|
||||
|
||||
return dataResps, nil
|
||||
}
|
||||
|
||||
func QuerySQLLog(ctx context.Context, queryParam interface{}, timeout int64, version string, client *elasticsearch.Client) ([]interface{}, int64, error) {
|
||||
param := new(Query)
|
||||
if err := mapstructure.Decode(queryParam, param); err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
if param.Timeout == 0 {
|
||||
param.Timeout = int(timeout) / 1000
|
||||
}
|
||||
|
||||
rowMaps, err := execSQLQuery(ctx, param, client)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
ret := make([]interface{}, len(rowMaps))
|
||||
for i, rowMap := range rowMaps {
|
||||
ret[i] = rowMap
|
||||
}
|
||||
|
||||
return ret, 0, nil
|
||||
}
|
||||
|
||||
@@ -4,13 +4,12 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/datasource"
|
||||
"github.com/ccfos/nightingale/v6/dskit/doris"
|
||||
"github.com/ccfos/nightingale/v6/dskit/types"
|
||||
"github.com/ccfos/nightingale/v6/models"
|
||||
"github.com/ccfos/nightingale/v6/pkg/macros"
|
||||
"github.com/ccfos/nightingale/v6/models"
|
||||
|
||||
"github.com/mitchellh/mapstructure"
|
||||
"github.com/toolkits/pkg/logger"
|
||||
@@ -39,8 +38,6 @@ type QueryParam struct {
|
||||
To int64 `json:"to" mapstructure:"to"`
|
||||
TimeField string `json:"time_field" mapstructure:"time_field"`
|
||||
TimeFormat string `json:"time_format" mapstructure:"time_format"`
|
||||
Interval int64 `json:"interval" mapstructure:"interval"` // 查询时间间隔(秒)
|
||||
Offset int `json:"offset" mapstructure:"offset"` // 延迟计算,不在使用通用配置delay
|
||||
}
|
||||
|
||||
func (d *Doris) InitClient() error {
|
||||
@@ -149,30 +146,6 @@ func (d *Doris) QueryData(ctx context.Context, query interface{}) ([]models.Data
|
||||
return nil, fmt.Errorf("valueKey is required")
|
||||
}
|
||||
|
||||
// 设置默认 interval
|
||||
if dorisQueryParam.Interval == 0 {
|
||||
dorisQueryParam.Interval = 60
|
||||
}
|
||||
|
||||
// 计算时间范围
|
||||
now := time.Now().Unix()
|
||||
var start, end int64
|
||||
if dorisQueryParam.To != 0 && dorisQueryParam.From != 0 {
|
||||
end = dorisQueryParam.To
|
||||
start = dorisQueryParam.From
|
||||
} else {
|
||||
end = now
|
||||
start = end - dorisQueryParam.Interval
|
||||
}
|
||||
|
||||
if dorisQueryParam.Offset != 0 {
|
||||
end -= int64(dorisQueryParam.Offset)
|
||||
start -= int64(dorisQueryParam.Offset)
|
||||
}
|
||||
|
||||
dorisQueryParam.From = start
|
||||
dorisQueryParam.To = end
|
||||
|
||||
if strings.Contains(dorisQueryParam.SQL, "$__") {
|
||||
var err error
|
||||
dorisQueryParam.SQL, err = macros.Macro(dorisQueryParam.SQL, dorisQueryParam.From, dorisQueryParam.To)
|
||||
@@ -181,14 +154,13 @@ func (d *Doris) QueryData(ctx context.Context, query interface{}) ([]models.Data
|
||||
}
|
||||
}
|
||||
|
||||
items, err := d.QueryTimeseries(ctx, &doris.QueryParam{
|
||||
items, err := d.QueryTimeseries(context.TODO(), &doris.QueryParam{
|
||||
Database: dorisQueryParam.Database,
|
||||
Sql: dorisQueryParam.SQL,
|
||||
Keys: types.Keys{
|
||||
ValueKey: dorisQueryParam.Keys.ValueKey,
|
||||
LabelKey: dorisQueryParam.Keys.LabelKey,
|
||||
TimeKey: dorisQueryParam.Keys.TimeKey,
|
||||
Offset: dorisQueryParam.Offset,
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
@@ -216,18 +188,6 @@ func (d *Doris) QueryLog(ctx context.Context, query interface{}) ([]interface{},
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
// 记录规则预览场景下,只传了interval, 没有传From和To
|
||||
now := time.Now().Unix()
|
||||
if dorisQueryParam.To == 0 && dorisQueryParam.From == 0 && dorisQueryParam.Interval != 0 {
|
||||
dorisQueryParam.To = now
|
||||
dorisQueryParam.From = now - dorisQueryParam.Interval
|
||||
}
|
||||
|
||||
if dorisQueryParam.Offset != 0 {
|
||||
dorisQueryParam.To -= int64(dorisQueryParam.Offset)
|
||||
dorisQueryParam.From -= int64(dorisQueryParam.Offset)
|
||||
}
|
||||
|
||||
if strings.Contains(dorisQueryParam.SQL, "$__") {
|
||||
var err error
|
||||
dorisQueryParam.SQL, err = macros.Macro(dorisQueryParam.SQL, dorisQueryParam.From, dorisQueryParam.To)
|
||||
|
||||
@@ -17,6 +17,7 @@ import (
|
||||
"github.com/ccfos/nightingale/v6/models"
|
||||
"github.com/ccfos/nightingale/v6/pkg/tlsx"
|
||||
|
||||
"github.com/elastic/go-elasticsearch/v9"
|
||||
"github.com/mitchellh/mapstructure"
|
||||
"github.com/olivere/elastic/v7"
|
||||
"github.com/toolkits/pkg/logger"
|
||||
@@ -27,18 +28,20 @@ const (
|
||||
)
|
||||
|
||||
type Elasticsearch struct {
|
||||
Addr string `json:"es.addr" mapstructure:"es.addr"`
|
||||
Nodes []string `json:"es.nodes" mapstructure:"es.nodes"`
|
||||
Timeout int64 `json:"es.timeout" mapstructure:"es.timeout"` // millis
|
||||
Basic BasicAuth `json:"es.basic" mapstructure:"es.basic"`
|
||||
TLS TLS `json:"es.tls" mapstructure:"es.tls"`
|
||||
Version string `json:"es.version" mapstructure:"es.version"`
|
||||
Headers map[string]string `json:"es.headers" mapstructure:"es.headers"`
|
||||
MinInterval int `json:"es.min_interval" mapstructure:"es.min_interval"` // seconds
|
||||
MaxShard int `json:"es.max_shard" mapstructure:"es.max_shard"`
|
||||
ClusterName string `json:"es.cluster_name" mapstructure:"es.cluster_name"`
|
||||
EnableWrite bool `json:"es.enable_write" mapstructure:"es.enable_write"` // 允许写操作
|
||||
Client *elastic.Client `json:"es.client" mapstructure:"es.client"`
|
||||
Addr string `json:"es.addr" mapstructure:"es.addr"`
|
||||
Nodes []string `json:"es.nodes" mapstructure:"es.nodes"`
|
||||
Timeout int64 `json:"es.timeout" mapstructure:"es.timeout"` // millis
|
||||
Basic BasicAuth `json:"es.basic" mapstructure:"es.basic"`
|
||||
TLS TLS `json:"es.tls" mapstructure:"es.tls"`
|
||||
Version string `json:"es.version" mapstructure:"es.version"`
|
||||
Headers map[string]string `json:"es.headers" mapstructure:"es.headers"`
|
||||
MinInterval int `json:"es.min_interval" mapstructure:"es.min_interval"` // seconds
|
||||
MaxShard int `json:"es.max_shard" mapstructure:"es.max_shard"`
|
||||
ClusterName string `json:"es.cluster_name" mapstructure:"es.cluster_name"`
|
||||
EnableWrite bool `json:"es.enable_write" mapstructure:"es.enable_write"` // 允许写操作
|
||||
Client *elastic.Client `json:"es.client" mapstructure:"es.client"`
|
||||
NewClient *elasticsearch.Client `json:"es.new_client" mapstructure:"es.new_client"`
|
||||
MaxQueryRows int `json:"es.max_query_rows" mapstructure:"es.max_query_rows"`
|
||||
}
|
||||
|
||||
type TLS struct {
|
||||
@@ -110,6 +113,23 @@ func (e *Elasticsearch) InitClient() error {
|
||||
return err
|
||||
}
|
||||
|
||||
cfg := elasticsearch.Config{
|
||||
Addresses: e.Nodes,
|
||||
Transport: transport,
|
||||
Header: http.Header{},
|
||||
}
|
||||
|
||||
if e.Basic.Username != "" && e.Basic.Password != "" {
|
||||
cfg.Username = e.Basic.Username
|
||||
cfg.Password = e.Basic.Password
|
||||
}
|
||||
|
||||
for k, v := range e.Headers {
|
||||
cfg.Header[k] = []string{v}
|
||||
}
|
||||
|
||||
e.NewClient, err = elasticsearch.NewClient(cfg)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -183,6 +203,15 @@ func (e *Elasticsearch) MakeTSQuery(ctx context.Context, query interface{}, even
|
||||
}
|
||||
|
||||
func (e *Elasticsearch) QueryData(ctx context.Context, queryParam interface{}) ([]models.DataResp, error) {
|
||||
param := new(eslike.Query)
|
||||
if err := mapstructure.Decode(queryParam, param); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if param.QueryType == "SQL" {
|
||||
return eslike.QuerySQLData(ctx, param, e.Timeout, e.Version, e.NewClient)
|
||||
}
|
||||
|
||||
search := func(ctx context.Context, indices []string, source interface{}, timeout int, maxShard int) (*elastic.SearchResult, error) {
|
||||
return e.Client.Search().
|
||||
Index(indices...).
|
||||
@@ -248,6 +277,20 @@ func (e *Elasticsearch) QueryFields(indexes []string) ([]string, error) {
|
||||
}
|
||||
|
||||
func (e *Elasticsearch) QueryLog(ctx context.Context, queryParam interface{}) ([]interface{}, int64, error) {
|
||||
param := new(eslike.Query)
|
||||
if err := mapstructure.Decode(queryParam, param); err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
if param.QueryType == "SQL" {
|
||||
if param.CustomParams == nil {
|
||||
param.CustomParams = make(map[string]interface{})
|
||||
}
|
||||
if e.MaxQueryRows > 0 {
|
||||
param.MaxQueryRows = e.MaxQueryRows
|
||||
}
|
||||
return eslike.QuerySQLLog(ctx, param, e.Timeout, e.Version, e.NewClient)
|
||||
}
|
||||
|
||||
search := func(ctx context.Context, indices []string, source interface{}, timeout int, maxShard int) (*elastic.SearchResult, error) {
|
||||
// 应该是之前为了获取 fields 字段,做的这个兼容
|
||||
|
||||
@@ -33,7 +33,6 @@ type Query struct {
|
||||
Time int64 `json:"time" mapstructure:"time"` // 单点时间(秒)- 用于告警
|
||||
Step string `json:"step" mapstructure:"step"` // 步长,如 "1m", "5m"
|
||||
Limit int `json:"limit" mapstructure:"limit"` // 限制返回数量
|
||||
Ref string `json:"ref" mapstructure:"ref"` // 变量引用名(如 A、B)
|
||||
}
|
||||
|
||||
// IsInstantQuery 判断是否为即时查询(告警场景)
|
||||
@@ -163,7 +162,7 @@ func (vl *VictoriaLogs) queryDataInstant(ctx context.Context, param *Query) ([]m
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return convertPrometheusInstantToDataResp(result, param.Ref), nil
|
||||
return convertPrometheusInstantToDataResp(result), nil
|
||||
}
|
||||
|
||||
// queryDataRange 看图场景,调用 /select/logsql/stats_query_range
|
||||
@@ -186,17 +185,15 @@ func (vl *VictoriaLogs) queryDataRange(ctx context.Context, param *Query) ([]mod
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return convertPrometheusRangeToDataResp(result, param.Ref), nil
|
||||
return convertPrometheusRangeToDataResp(result), nil
|
||||
}
|
||||
|
||||
// convertPrometheusInstantToDataResp 将 Prometheus Instant Query 格式转换为 DataResp
|
||||
func convertPrometheusInstantToDataResp(resp *victorialogs.PrometheusResponse, ref string) []models.DataResp {
|
||||
func convertPrometheusInstantToDataResp(resp *victorialogs.PrometheusResponse) []models.DataResp {
|
||||
var dataResps []models.DataResp
|
||||
|
||||
for _, item := range resp.Data.Result {
|
||||
dataResp := models.DataResp{
|
||||
Ref: ref,
|
||||
}
|
||||
dataResp := models.DataResp{}
|
||||
|
||||
// 转换 Metric
|
||||
dataResp.Metric = make(model.Metric)
|
||||
@@ -221,13 +218,11 @@ func convertPrometheusInstantToDataResp(resp *victorialogs.PrometheusResponse, r
|
||||
}
|
||||
|
||||
// convertPrometheusRangeToDataResp 将 Prometheus Range Query 格式转换为 DataResp
|
||||
func convertPrometheusRangeToDataResp(resp *victorialogs.PrometheusResponse, ref string) []models.DataResp {
|
||||
func convertPrometheusRangeToDataResp(resp *victorialogs.PrometheusResponse) []models.DataResp {
|
||||
var dataResps []models.DataResp
|
||||
|
||||
for _, item := range resp.Data.Result {
|
||||
dataResp := models.DataResp{
|
||||
Ref: ref,
|
||||
}
|
||||
dataResp := models.DataResp{}
|
||||
|
||||
// 转换 Metric
|
||||
dataResp.Metric = make(model.Metric)
|
||||
|
||||
@@ -57,29 +57,3 @@ func (cs *Cache) Get(cate string, dsId int64) (datasource.Datasource, bool) {
|
||||
|
||||
return cs.datas[cate][dsId], true
|
||||
}
|
||||
|
||||
func (cs *Cache) Delete(cate string, dsId int64) {
|
||||
cs.mutex.Lock()
|
||||
defer cs.mutex.Unlock()
|
||||
if _, found := cs.datas[cate]; !found {
|
||||
return
|
||||
}
|
||||
delete(cs.datas[cate], dsId)
|
||||
|
||||
logger.Debugf("delete plugin:%s %d from cache", cate, dsId)
|
||||
}
|
||||
|
||||
// GetAllIds 返回缓存中所有数据源的 ID,按类型分组
|
||||
func (cs *Cache) GetAllIds() map[string][]int64 {
|
||||
cs.mutex.RLock()
|
||||
defer cs.mutex.RUnlock()
|
||||
result := make(map[string][]int64)
|
||||
for cate, dsMap := range cs.datas {
|
||||
ids := make([]int64, 0, len(dsMap))
|
||||
for dsId := range dsMap {
|
||||
ids = append(ids, dsId)
|
||||
}
|
||||
result[cate] = ids
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
@@ -170,13 +170,11 @@ func esN9eToDatasourceInfo(ds *datasource.DatasourceInfo, item models.Datasource
|
||||
ds.Settings["es.min_interval"] = item.SettingsJson["min_interval"]
|
||||
ds.Settings["es.max_shard"] = item.SettingsJson["max_shard"]
|
||||
ds.Settings["es.enable_write"] = item.SettingsJson["enable_write"]
|
||||
ds.Settings["es.max_query_rows"] = item.SettingsJson["max_query_rows"]
|
||||
}
|
||||
|
||||
func PutDatasources(items []datasource.DatasourceInfo) {
|
||||
// 记录当前有效的数据源 ID,按类型分组
|
||||
validIds := make(map[string]map[int64]struct{})
|
||||
ids := make([]int64, 0)
|
||||
|
||||
for _, item := range items {
|
||||
if item.Type == "prometheus" {
|
||||
continue
|
||||
@@ -205,12 +203,6 @@ func PutDatasources(items []datasource.DatasourceInfo) {
|
||||
}
|
||||
ids = append(ids, item.Id)
|
||||
|
||||
// 记录有效的数据源 ID
|
||||
if _, ok := validIds[typ]; !ok {
|
||||
validIds[typ] = make(map[int64]struct{})
|
||||
}
|
||||
validIds[typ][item.Id] = struct{}{}
|
||||
|
||||
// 异步初始化 client 不然数据源同步的会很慢
|
||||
go func() {
|
||||
defer func() {
|
||||
@@ -222,19 +214,5 @@ func PutDatasources(items []datasource.DatasourceInfo) {
|
||||
}()
|
||||
}
|
||||
|
||||
// 删除 items 中不存在但 DsCache 中存在的数据源
|
||||
cachedIds := DsCache.GetAllIds()
|
||||
for cate, dsIds := range cachedIds {
|
||||
for _, dsId := range dsIds {
|
||||
if _, ok := validIds[cate]; !ok {
|
||||
// 该类型在 items 中完全不存在,删除缓存中的所有该类型数据源
|
||||
DsCache.Delete(cate, dsId)
|
||||
} else if _, ok := validIds[cate][dsId]; !ok {
|
||||
// 该数据源 ID 在 items 中不存在,删除
|
||||
DsCache.Delete(cate, dsId)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
logger.Debugf("get plugin by type success Ids:%v", ids)
|
||||
}
|
||||
|
||||
@@ -18,21 +18,13 @@ import (
|
||||
"github.com/mitchellh/mapstructure"
|
||||
)
|
||||
|
||||
const (
|
||||
ShowIndexFieldIndexType = "index_type"
|
||||
ShowIndexFieldColumnName = "column_name"
|
||||
ShowIndexKeyName = "key_name"
|
||||
|
||||
SQLShowIndex = "SHOW INDEX FROM "
|
||||
)
|
||||
|
||||
// Doris struct to hold connection details and the connection object
|
||||
type Doris struct {
|
||||
Addr string `json:"doris.addr" mapstructure:"doris.addr"` // fe mysql endpoint
|
||||
FeAddr string `json:"doris.fe_addr" mapstructure:"doris.fe_addr"` // fe http endpoint
|
||||
User string `json:"doris.user" mapstructure:"doris.user"` //
|
||||
Password string `json:"doris.password" mapstructure:"doris.password"` //
|
||||
Timeout int `json:"doris.timeout" mapstructure:"doris.timeout"` // ms
|
||||
Timeout int `json:"doris.timeout" mapstructure:"doris.timeout"`
|
||||
MaxIdleConns int `json:"doris.max_idle_conns" mapstructure:"doris.max_idle_conns"`
|
||||
MaxOpenConns int `json:"doris.max_open_conns" mapstructure:"doris.max_open_conns"`
|
||||
ConnMaxLifetime int `json:"doris.conn_max_lifetime" mapstructure:"doris.conn_max_lifetime"`
|
||||
@@ -127,7 +119,7 @@ func (d *Doris) createTimeoutContext(ctx context.Context) (context.Context, cont
|
||||
if timeout == 0 {
|
||||
timeout = 60
|
||||
}
|
||||
return context.WithTimeout(ctx, time.Duration(timeout)*time.Millisecond)
|
||||
return context.WithTimeout(ctx, time.Duration(timeout)*time.Second)
|
||||
}
|
||||
|
||||
// ShowDatabases lists all databases in Doris
|
||||
@@ -320,88 +312,6 @@ func (d *Doris) DescTable(ctx context.Context, database, table string) ([]*types
|
||||
return columns, nil
|
||||
}
|
||||
|
||||
type TableIndexInfo struct {
|
||||
ColumnName string `json:"column_name"`
|
||||
IndexName string `json:"index_name"`
|
||||
IndexType string `json:"index_type"`
|
||||
}
|
||||
|
||||
// ShowIndexes 查询表的所有索引信息
|
||||
func (d *Doris) ShowIndexes(ctx context.Context, database, table string) ([]TableIndexInfo, error) {
|
||||
if database == "" || table == "" {
|
||||
return nil, fmt.Errorf("database and table names cannot be empty")
|
||||
}
|
||||
|
||||
tCtx, cancel := d.createTimeoutContext(ctx)
|
||||
defer cancel()
|
||||
|
||||
db, err := d.NewConn(tCtx, database)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
querySQL := fmt.Sprintf("%s `%s`.`%s`", SQLShowIndex, database, table)
|
||||
rows, err := db.QueryContext(tCtx, querySQL)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to query indexes: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
columns, err := rows.Columns()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get columns: %w", err)
|
||||
}
|
||||
count := len(columns)
|
||||
|
||||
// 预映射列索引
|
||||
colIdx := map[string]int{
|
||||
ShowIndexKeyName: -1,
|
||||
ShowIndexFieldColumnName: -1,
|
||||
ShowIndexFieldIndexType: -1,
|
||||
}
|
||||
for i, col := range columns {
|
||||
lCol := strings.ToLower(col)
|
||||
if lCol == ShowIndexKeyName || lCol == ShowIndexFieldColumnName || lCol == ShowIndexFieldIndexType {
|
||||
colIdx[lCol] = i
|
||||
}
|
||||
}
|
||||
|
||||
var result []TableIndexInfo
|
||||
for rows.Next() {
|
||||
// 使用 sql.RawBytes 可以接受任何类型并转为 string,避免复杂的类型断言
|
||||
scanArgs := make([]interface{}, count)
|
||||
values := make([]sql.RawBytes, count)
|
||||
for i := range values {
|
||||
scanArgs[i] = &values[i]
|
||||
}
|
||||
|
||||
if err = rows.Scan(scanArgs...); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
info := TableIndexInfo{}
|
||||
if i := colIdx[ShowIndexFieldColumnName]; i != -1 && i < count {
|
||||
info.ColumnName = string(values[i])
|
||||
}
|
||||
if i := colIdx[ShowIndexKeyName]; i != -1 && i < count {
|
||||
info.IndexName = string(values[i])
|
||||
}
|
||||
if i := colIdx[ShowIndexFieldIndexType]; i != -1 && i < count {
|
||||
info.IndexType = string(values[i])
|
||||
}
|
||||
|
||||
if info.ColumnName != "" {
|
||||
result = append(result, info)
|
||||
}
|
||||
}
|
||||
|
||||
if err = rows.Err(); err != nil {
|
||||
return nil, fmt.Errorf("error iterating rows: %w", err)
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// SelectRows selects rows from a specified table in Doris based on a given query with MaxQueryRows check
|
||||
func (d *Doris) SelectRows(ctx context.Context, database, table, query string) ([]map[string]interface{}, error) {
|
||||
sql := fmt.Sprintf("SELECT * FROM %s.%s", database, table)
|
||||
|
||||
@@ -10,14 +10,13 @@ const (
|
||||
TimeseriesAggregationTimestamp = "__ts__"
|
||||
)
|
||||
|
||||
// QueryLogs 查询日志
|
||||
// TODO: 待测试, MAP/ARRAY/STRUCT/JSON 等类型能否处理
|
||||
func (d *Doris) QueryLogs(ctx context.Context, query *QueryParam) ([]map[string]interface{}, error) {
|
||||
// 等同于 Query()
|
||||
return d.Query(ctx, query, true)
|
||||
return d.Query(ctx, query)
|
||||
}
|
||||
|
||||
// QueryHistogram 本质是查询时序数据, 取第一组, SQL由上层封装, 不再做复杂的解析和截断
|
||||
// 本质是查询时序数据, 取第一组, SQL由上层封装, 不再做复杂的解析和截断
|
||||
func (d *Doris) QueryHistogram(ctx context.Context, query *QueryParam) ([][]float64, error) {
|
||||
values, err := d.QueryTimeseries(ctx, query)
|
||||
if err != nil {
|
||||
|
||||
@@ -15,10 +15,6 @@ const (
|
||||
TimeFieldFormatDateTime = "datetime"
|
||||
)
|
||||
|
||||
type noNeedCheckMaxRowKey struct{}
|
||||
|
||||
var NoNeedCheckMaxRow = noNeedCheckMaxRowKey{}
|
||||
|
||||
// 不再拼接SQL, 完全信赖用户的输入
|
||||
type QueryParam struct {
|
||||
Database string `json:"database"`
|
||||
@@ -43,7 +39,7 @@ var (
|
||||
)
|
||||
|
||||
// Query executes a given SQL query in Doris and returns the results with MaxQueryRows check
|
||||
func (d *Doris) Query(ctx context.Context, query *QueryParam, checkMaxRow bool) ([]map[string]interface{}, error) {
|
||||
func (d *Doris) Query(ctx context.Context, query *QueryParam) ([]map[string]interface{}, error) {
|
||||
// 校验SQL的合法性, 过滤掉 write请求
|
||||
sqlItem := strings.Split(strings.ToUpper(query.Sql), " ")
|
||||
for _, item := range sqlItem {
|
||||
@@ -52,12 +48,10 @@ func (d *Doris) Query(ctx context.Context, query *QueryParam, checkMaxRow bool)
|
||||
}
|
||||
}
|
||||
|
||||
if checkMaxRow {
|
||||
// 检查查询结果行数
|
||||
err := d.CheckMaxQueryRows(ctx, query.Database, query.Sql)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// 检查查询结果行数
|
||||
err := d.CheckMaxQueryRows(ctx, query.Database, query.Sql)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
rows, err := d.ExecQuery(ctx, query.Database, query.Sql)
|
||||
@@ -69,12 +63,8 @@ func (d *Doris) Query(ctx context.Context, query *QueryParam, checkMaxRow bool)
|
||||
|
||||
// QueryTimeseries executes a time series data query using the given parameters with MaxQueryRows check
|
||||
func (d *Doris) QueryTimeseries(ctx context.Context, query *QueryParam) ([]types.MetricValues, error) {
|
||||
// 默认需要检查,除非调用方声明不需要检查
|
||||
checkMaxRow := true
|
||||
if noCheck, ok := ctx.Value(NoNeedCheckMaxRow).(bool); ok && noCheck {
|
||||
checkMaxRow = false
|
||||
}
|
||||
rows, err := d.Query(ctx, query, checkMaxRow)
|
||||
// 使用 Query 方法执行查询,Query方法内部已包含MaxQueryRows检查
|
||||
rows, err := d.Query(ctx, query)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -158,10 +158,7 @@ func FormatMetricValues(keys types.Keys, rows []map[string]interface{}, ignoreDe
|
||||
}
|
||||
|
||||
if !exists {
|
||||
// Default to current time if not specified
|
||||
// 大多数情况下offset为空
|
||||
// 对于记录规则延迟计算的情况,统计值的时间戳需要有偏移,以便跟统计值对应
|
||||
ts = float64(time.Now().Unix()) - float64(keys.Offset)
|
||||
ts = float64(time.Now().Unix()) // Default to current time if not specified
|
||||
}
|
||||
|
||||
valuePair := []float64{ts, value}
|
||||
|
||||
@@ -48,5 +48,4 @@ type Keys struct {
|
||||
LabelKey string `json:"labelKey" mapstructure:"labelKey"` // 多个用空格分隔
|
||||
TimeKey string `json:"timeKey" mapstructure:"timeKey"`
|
||||
TimeFormat string `json:"timeFormat" mapstructure:"timeFormat"` // not used anymore
|
||||
Offset int `json:"offset" mapstructure:"offset"`
|
||||
}
|
||||
|
||||
11
go.mod
11
go.mod
@@ -18,6 +18,7 @@ require (
|
||||
github.com/coreos/go-oidc v2.2.1+incompatible
|
||||
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc
|
||||
github.com/dgrijalva/jwt-go v3.2.0+incompatible
|
||||
github.com/elastic/go-elasticsearch/v9 v9.0.0
|
||||
github.com/expr-lang/expr v1.16.1
|
||||
github.com/flashcatcloud/ibex v1.3.6
|
||||
github.com/gin-contrib/pprof v1.4.0
|
||||
@@ -75,8 +76,8 @@ require (
|
||||
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
|
||||
github.com/segmentio/asm v1.2.0 // indirect
|
||||
github.com/shopspring/decimal v1.4.0 // indirect
|
||||
go.opentelemetry.io/otel v1.32.0 // indirect
|
||||
go.opentelemetry.io/otel/trace v1.32.0 // indirect
|
||||
go.opentelemetry.io/otel v1.35.0 // indirect
|
||||
go.opentelemetry.io/otel/trace v1.35.0 // indirect
|
||||
)
|
||||
|
||||
require (
|
||||
@@ -90,7 +91,10 @@ require (
|
||||
github.com/eapache/go-resiliency v1.7.0 // indirect
|
||||
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect
|
||||
github.com/eapache/queue v1.1.0 // indirect
|
||||
github.com/elastic/elastic-transport-go/v8 v8.7.0 // indirect
|
||||
github.com/glebarez/go-sqlite v1.21.2 // indirect
|
||||
github.com/go-logr/logr v1.4.2 // indirect
|
||||
github.com/go-logr/stdr v1.2.2 // indirect
|
||||
github.com/hashicorp/errwrap v1.1.0 // indirect
|
||||
github.com/hashicorp/go-multierror v1.1.1 // indirect
|
||||
github.com/hashicorp/go-uuid v1.0.3 // indirect
|
||||
@@ -103,11 +107,12 @@ require (
|
||||
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
|
||||
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
|
||||
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
|
||||
github.com/rogpeppe/go-internal v1.13.1 // indirect
|
||||
github.com/tjfoc/gmsm v1.4.1 // indirect
|
||||
github.com/valyala/fastrand v1.1.0 // indirect
|
||||
github.com/valyala/histogram v1.2.0 // indirect
|
||||
github.com/yuin/gopher-lua v1.1.1 // indirect
|
||||
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
|
||||
go.opentelemetry.io/otel/metric v1.35.0 // indirect
|
||||
golang.org/x/sync v0.18.0 // indirect
|
||||
gopkg.in/ini.v1 v1.67.0 // indirect
|
||||
modernc.org/libc v1.22.5 // indirect
|
||||
|
||||
26
go.sum
26
go.sum
@@ -144,6 +144,10 @@ github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4A
|
||||
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0=
|
||||
github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
|
||||
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
|
||||
github.com/elastic/elastic-transport-go/v8 v8.7.0 h1:OgTneVuXP2uip4BA658Xi6Hfw+PeIOod2rY3GVMGoVE=
|
||||
github.com/elastic/elastic-transport-go/v8 v8.7.0/go.mod h1:YLHer5cj0csTzNFXoNQ8qhtGY1GTvSqPnKWKaqQE3Hk=
|
||||
github.com/elastic/go-elasticsearch/v9 v9.0.0 h1:krpgPeJ2lC8apkaw6B58gKDYJq5eUhP8AMwpPt01Q/U=
|
||||
github.com/elastic/go-elasticsearch/v9 v9.0.0/go.mod h1:2PB5YQPpY5tWbF65MRqzEXA31PZOdXCkloQSOZtU14I=
|
||||
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
|
||||
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
|
||||
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
|
||||
@@ -183,6 +187,11 @@ github.com/go-ldap/ldap/v3 v3.4.4 h1:qPjipEpt+qDa6SI/h1fzuGWoRUY+qqQ9sOZq67/PYUs
|
||||
github.com/go-ldap/ldap/v3 v3.4.4/go.mod h1:fe1MsuN5eJJ1FeLT/LEBVdWfNWKh459R7aXgXtJC+aI=
|
||||
github.com/go-logfmt/logfmt v0.6.0 h1:wGYYu3uicYdqXVgoYbvnkrPVXkuLM1p1ifugDMEdRi4=
|
||||
github.com/go-logfmt/logfmt v0.6.0/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs=
|
||||
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
|
||||
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
|
||||
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
|
||||
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
|
||||
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
|
||||
github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4=
|
||||
github.com/go-playground/assert/v2 v2.2.0 h1:JvknZsQTYeFEAhQwI4qEt9cyV5ONwRHC+lYKSsYSR8s=
|
||||
github.com/go-playground/assert/v2 v2.2.0/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4=
|
||||
@@ -232,8 +241,9 @@ github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
|
||||
github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
|
||||
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
|
||||
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
|
||||
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
|
||||
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
|
||||
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
|
||||
github.com/google/pprof v0.0.0-20230705174524-200ffdc848b8 h1:n6vlPhxsA+BW/XsS5+uqi7GyzaLa5MH7qlSLBZtRdiA=
|
||||
github.com/google/pprof v0.0.0-20230705174524-200ffdc848b8/go.mod h1:Jh3hGz2jkYak8qXPD19ryItVnUgpgeqzdkY/D0EaeuA=
|
||||
@@ -463,10 +473,16 @@ github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5t
|
||||
github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M=
|
||||
github.com/yuin/gopher-lua v1.1.1/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw=
|
||||
go.mongodb.org/mongo-driver v1.11.4/go.mod h1:PTSz5yu21bkT/wXpkS7WR5f0ddqw5quethTUn9WM+2g=
|
||||
go.opentelemetry.io/otel v1.32.0 h1:WnBN+Xjcteh0zdk01SVqV55d/m62NJLJdIyb4y/WO5U=
|
||||
go.opentelemetry.io/otel v1.32.0/go.mod h1:00DCVSB0RQcnzlwyTfqtxSm+DRr9hpYrHjNGiBHVQIg=
|
||||
go.opentelemetry.io/otel/trace v1.32.0 h1:WIC9mYrXf8TmY/EXuULKc8hR17vE+Hjv2cssQDe03fM=
|
||||
go.opentelemetry.io/otel/trace v1.32.0/go.mod h1:+i4rkvCraA+tG6AzwloGaCtkx53Fa+L+V8e9a7YvhT8=
|
||||
go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA=
|
||||
go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A=
|
||||
go.opentelemetry.io/otel v1.35.0 h1:xKWKPxrxB6OtMCbmMY021CqC45J+3Onta9MqjhnusiQ=
|
||||
go.opentelemetry.io/otel v1.35.0/go.mod h1:UEqy8Zp11hpkUrL73gSlELM0DupHoiq72dR+Zqel/+Y=
|
||||
go.opentelemetry.io/otel/metric v1.35.0 h1:0znxYu2SNyuMSQT4Y9WDWej0VpcsxkuklLa4/siN90M=
|
||||
go.opentelemetry.io/otel/metric v1.35.0/go.mod h1:nKVFgxBZ2fReX6IlyW28MgZojkoAkJGaE8CpgeAU3oE=
|
||||
go.opentelemetry.io/otel/sdk v1.24.0 h1:YMPPDNymmQN3ZgczicBY3B6sf9n62Dlj9pWD3ucgoDw=
|
||||
go.opentelemetry.io/otel/sdk v1.24.0/go.mod h1:KVrIYw6tEubO9E96HQpcmpTKDVn9gdv35HoYiQWGDFg=
|
||||
go.opentelemetry.io/otel/trace v1.35.0 h1:dPpEfJu1sDIqruz7BHFG3c7528f6ddfSWfFDVt/xgMs=
|
||||
go.opentelemetry.io/otel/trace v1.35.0/go.mod h1:WUk7DtFp1Aw2MkvqGdwiXYDZZNvA/1J8o6xRXLrIkyc=
|
||||
go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
|
||||
go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
|
||||
go.uber.org/automaxprocs v1.4.0/go.mod h1:/mTEdr7LvHhs0v7mjdxDreTz1OG5zdZGqgOnhWiR/+Q=
|
||||
|
||||
@@ -68,8 +68,7 @@ func MigrateTables(db *gorm.DB) error {
|
||||
&Board{}, &BoardBusigroup{}, &Users{}, &SsoConfig{}, &models.BuiltinMetric{},
|
||||
&models.MetricFilter{}, &models.NotificationRecord{}, &models.TargetBusiGroup{},
|
||||
&models.UserToken{}, &models.DashAnnotation{}, MessageTemplate{}, NotifyRule{}, NotifyChannelConfig{}, &EsIndexPatternMigrate{},
|
||||
&models.EventPipeline{}, &models.EmbeddedProduct{}, &models.SourceToken{},
|
||||
&models.SavedView{}, &models.UserViewFavorite{}}
|
||||
&models.EventPipeline{}, &models.EmbeddedProduct{}, &models.SourceToken{}}
|
||||
|
||||
if isPostgres(db) {
|
||||
dts = append(dts, &models.PostgresBuiltinComponent{})
|
||||
|
||||
@@ -31,8 +31,7 @@ type Webhook struct {
|
||||
RetryCount int `json:"retry_count"`
|
||||
RetryInterval int `json:"retry_interval"`
|
||||
Batch int `json:"batch"`
|
||||
|
||||
Client *http.Client `json:"-"`
|
||||
Client *http.Client `json:"-"`
|
||||
}
|
||||
|
||||
func (w *Webhook) Hash() string {
|
||||
|
||||
@@ -1,174 +0,0 @@
|
||||
package models
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrSavedViewNameEmpty = errors.New("saved view name is blank")
|
||||
ErrSavedViewPageEmpty = errors.New("saved view page is blank")
|
||||
ErrSavedViewNotFound = errors.New("saved view not found")
|
||||
ErrSavedViewNameDuplicate = errors.New("saved view name already exists in this page")
|
||||
)
|
||||
|
||||
type SavedView struct {
|
||||
Id int64 `json:"id" gorm:"primaryKey;autoIncrement"`
|
||||
Name string `json:"name" gorm:"type:varchar(255);not null"`
|
||||
Page string `json:"page" gorm:"type:varchar(64);not null;index"`
|
||||
Filter string `json:"filter" gorm:"type:text"`
|
||||
PublicCate int `json:"public_cate" gorm:"default:0"` // 0: self, 1: team, 2: all
|
||||
Gids []int64 `json:"gids" gorm:"column:gids;type:text;serializer:json"`
|
||||
CreateAt int64 `json:"create_at" gorm:"type:bigint;not null;default:0"`
|
||||
CreateBy string `json:"create_by" gorm:"type:varchar(64);index"`
|
||||
UpdateAt int64 `json:"update_at" gorm:"type:bigint;not null;default:0"`
|
||||
UpdateBy string `json:"update_by" gorm:"type:varchar(64)"`
|
||||
|
||||
// 查询时填充的字段
|
||||
IsFavorite bool `json:"is_favorite" gorm:"-"`
|
||||
}
|
||||
|
||||
func (SavedView) TableName() string {
|
||||
return "saved_view"
|
||||
}
|
||||
|
||||
func (sv *SavedView) Verify() error {
|
||||
sv.Name = strings.TrimSpace(sv.Name)
|
||||
if sv.Name == "" {
|
||||
return ErrSavedViewNameEmpty
|
||||
}
|
||||
if sv.Page == "" {
|
||||
return ErrSavedViewPageEmpty
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func SavedViewCheckDuplicateName(c *ctx.Context, page, name string, excludeId int64) error {
|
||||
var count int64
|
||||
session := DB(c).Model(&SavedView{}).Where("page = ? AND name = ? AND public_cate = 2", page, name)
|
||||
if excludeId > 0 {
|
||||
session = session.Where("id != ?", excludeId)
|
||||
}
|
||||
if err := session.Count(&count).Error; err != nil {
|
||||
return err
|
||||
}
|
||||
if count > 0 {
|
||||
return ErrSavedViewNameDuplicate
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func SavedViewAdd(c *ctx.Context, sv *SavedView) error {
|
||||
if err := sv.Verify(); err != nil {
|
||||
return err
|
||||
}
|
||||
// 当 PublicCate 为 all(2) 时,检查同一个 page 下 name 是否重复
|
||||
if sv.PublicCate == 2 {
|
||||
if err := SavedViewCheckDuplicateName(c, sv.Page, sv.Name, 0); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
now := time.Now().Unix()
|
||||
sv.CreateAt = now
|
||||
sv.UpdateAt = now
|
||||
return Insert(c, sv)
|
||||
}
|
||||
|
||||
func SavedViewUpdate(c *ctx.Context, sv *SavedView, username string) error {
|
||||
if err := sv.Verify(); err != nil {
|
||||
return err
|
||||
}
|
||||
// 当 PublicCate 为 all(2) 时,检查同一个 page 下 name 是否重复(排除自身)
|
||||
if sv.PublicCate == 2 {
|
||||
if err := SavedViewCheckDuplicateName(c, sv.Page, sv.Name, sv.Id); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
sv.UpdateAt = time.Now().Unix()
|
||||
sv.UpdateBy = username
|
||||
return DB(c).Model(sv).Select("name", "filter", "public_cate", "gids", "update_at", "update_by").Updates(sv).Error
|
||||
}
|
||||
|
||||
func SavedViewDel(c *ctx.Context, id int64) error {
|
||||
// 先删除收藏关联
|
||||
if err := DB(c).Where("view_id = ?", id).Delete(&UserViewFavorite{}).Error; err != nil {
|
||||
return err
|
||||
}
|
||||
return DB(c).Where("id = ?", id).Delete(&SavedView{}).Error
|
||||
}
|
||||
|
||||
func SavedViewGetById(c *ctx.Context, id int64) (*SavedView, error) {
|
||||
var sv SavedView
|
||||
err := DB(c).Where("id = ?", id).First(&sv).Error
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &sv, nil
|
||||
}
|
||||
|
||||
func SavedViewGets(c *ctx.Context, page string) ([]SavedView, error) {
|
||||
var views []SavedView
|
||||
|
||||
session := DB(c).Where("page = ?", page)
|
||||
|
||||
if err := session.Order("update_at DESC").Find(&views).Error; err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return views, nil
|
||||
}
|
||||
|
||||
func SavedViewFavoriteGetByUserId(c *ctx.Context, userId int64) (map[int64]bool, error) {
|
||||
var favorites []UserViewFavorite
|
||||
if err := DB(c).Where("user_id = ?", userId).Find(&favorites).Error; err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
result := make(map[int64]bool)
|
||||
for _, f := range favorites {
|
||||
result[f.ViewId] = true
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
type UserViewFavorite struct {
|
||||
Id int64 `json:"id" gorm:"primaryKey;autoIncrement"`
|
||||
ViewId int64 `json:"view_id" gorm:"index"`
|
||||
UserId int64 `json:"user_id" gorm:"index"`
|
||||
CreateAt int64 `json:"create_at"`
|
||||
}
|
||||
|
||||
func (UserViewFavorite) TableName() string {
|
||||
return "user_view_favorite"
|
||||
}
|
||||
|
||||
func UserViewFavoriteAdd(c *ctx.Context, viewId, userId int64) error {
|
||||
var count int64
|
||||
if err := DB(c).Model(&SavedView{}).Where("id = ?", viewId).Count(&count).Error; err != nil {
|
||||
return err
|
||||
}
|
||||
if count == 0 {
|
||||
return ErrSavedViewNotFound
|
||||
}
|
||||
|
||||
if err := DB(c).Model(&UserViewFavorite{}).Where("view_id = ? AND user_id = ?", viewId, userId).Count(&count).Error; err != nil {
|
||||
return err
|
||||
}
|
||||
if count > 0 {
|
||||
return nil // 已收藏,直接返回成功
|
||||
}
|
||||
|
||||
fav := &UserViewFavorite{
|
||||
ViewId: viewId,
|
||||
UserId: userId,
|
||||
CreateAt: time.Now().Unix(),
|
||||
}
|
||||
return DB(c).Create(fav).Error
|
||||
}
|
||||
|
||||
func UserViewFavoriteDel(c *ctx.Context, viewId, userId int64) error {
|
||||
return DB(c).Where("view_id = ? AND user_id = ?", viewId, userId).Delete(&UserViewFavorite{}).Error
|
||||
}
|
||||
@@ -201,11 +201,6 @@ var I18N = `{
|
||||
"Some recovery scripts still in the BusiGroup": "业务组中仍有自愈脚本",
|
||||
"Some target busigroups still in the BusiGroup": "业务组中仍有监控对象",
|
||||
|
||||
"saved view not found": "保存的视图不存在",
|
||||
"saved view name is blank": "视图名称不能为空",
|
||||
"saved view page is blank": "视图页面不能为空",
|
||||
"saved view name already exists in this page": "该页面下已存在同名的公开视图",
|
||||
|
||||
"---------zh_CN--------": "---------zh_CN--------"
|
||||
},
|
||||
"zh_HK": {
|
||||
@@ -410,11 +405,6 @@ var I18N = `{
|
||||
"Some recovery scripts still in the BusiGroup": "業務組中仍有自愈腳本",
|
||||
"Some target busigroups still in the BusiGroup": "業務組中仍有監控對象",
|
||||
|
||||
"saved view not found": "保存的視圖不存在",
|
||||
"saved view name is blank": "視圖名稱不能為空",
|
||||
"saved view page is blank": "視圖頁面不能為空",
|
||||
"saved view name already exists in this page": "該頁面下已存在同名的公開視圖",
|
||||
|
||||
"---------zh_HK--------": "---------zh_HK--------"
|
||||
},
|
||||
"ja_JP": {
|
||||
@@ -616,11 +606,6 @@ var I18N = `{
|
||||
"Some recovery scripts still in the BusiGroup": "ビジネスグループにまだ自己回復スクリプトがあります",
|
||||
"Some target busigroups still in the BusiGroup": "ビジネスグループにまだ監視対象があります",
|
||||
|
||||
"saved view not found": "保存されたビューが見つかりません",
|
||||
"saved view name is blank": "ビュー名を空にすることはできません",
|
||||
"saved view page is blank": "ビューページを空にすることはできません",
|
||||
"saved view name already exists in this page": "このページには同名の公開ビューが既に存在します",
|
||||
|
||||
"---------ja_JP--------": "---------ja_JP--------"
|
||||
},
|
||||
"ru_RU": {
|
||||
@@ -822,11 +807,6 @@ var I18N = `{
|
||||
"Some recovery scripts still in the BusiGroup": "В бизнес-группе еще есть скрипты самоисцеления",
|
||||
"Some target busigroups still in the BusiGroup": "В бизнес-группе еще есть объекты мониторинга",
|
||||
|
||||
"saved view not found": "Сохраненный вид не найден",
|
||||
"saved view name is blank": "Название вида не может быть пустым",
|
||||
"saved view page is blank": "Страница вида не может быть пустой",
|
||||
"saved view name already exists in this page": "На этой странице уже существует публичный вид с таким названием",
|
||||
|
||||
"---------ru_RU--------": "---------ru_RU--------"
|
||||
}
|
||||
}`
|
||||
|
||||
@@ -85,15 +85,15 @@ func (pc *PromClientMap) loadFromDatabase() {
|
||||
var internalAddr string
|
||||
for k, v := range ds.SettingsJson {
|
||||
if strings.Contains(k, "write_addr") {
|
||||
writeAddr = strings.TrimSpace(v.(string))
|
||||
writeAddr = v.(string)
|
||||
} else if strings.Contains(k, "internal_addr") && v.(string) != "" {
|
||||
internalAddr = strings.TrimSpace(v.(string))
|
||||
internalAddr = v.(string)
|
||||
}
|
||||
}
|
||||
|
||||
po := PromOption{
|
||||
ClusterName: ds.Name,
|
||||
Url: strings.TrimSpace(ds.HTTPJson.Url),
|
||||
Url: ds.HTTPJson.Url,
|
||||
WriteAddr: writeAddr,
|
||||
BasicAuthUser: ds.AuthJson.BasicAuthUser,
|
||||
BasicAuthPass: ds.AuthJson.BasicAuthPassword,
|
||||
|
||||
Reference in New Issue
Block a user