Compare commits

...

29 Commits

Author SHA1 Message Date
ning
46083d741d fix: query data 2025-12-30 19:21:16 +08:00
ning
3eeb705b39 update ds perm check 2025-12-30 16:51:10 +08:00
ning
8d87e69ee7 fix: datasource delete 2025-12-30 16:50:10 +08:00
pioneerlfn
3da85d8e28 fix: doris exec sql timeout unit: s -> ms 2025-12-29 14:27:42 +08:00
pioneerlfn
b50410b88a refactor: update doris query 2025-12-26 16:32:57 +08:00
ning
c98241b3fd fix: search view api 2025-12-26 14:47:57 +08:00
ning
b30caf625b refactor: save view check name 2025-12-26 12:11:12 +08:00
SenCoder
32e8b961c2 refactor: add args parameter to CallIbex 2025-12-25 14:51:27 +08:00
ning
2ff0a8fdbb Merge branch 'main' of github.com:ccfos/nightingale 2025-12-25 14:42:33 +08:00
ning
7ff74d0948 fix: es query use ASCII control character as label separator to avoid truncation when user data contains -- 2025-12-25 14:42:16 +08:00
pioneerlfn
da58d825c0 refactor: doris add method showIndexes (#3011) 2025-12-25 11:48:29 +08:00
SenCoder
0014b77c4d refactor: change canDoIbex func to public (#3010) 2025-12-24 21:21:01 +08:00
Yening Qin
fc7fdde2d5 feat: support search view save (#3009) 2025-12-24 17:52:45 +08:00
pioneerlfn
61b63fc75c fix: doris query logs with interval (#3008) 2025-12-24 12:14:12 +08:00
pioneerlfn
80f564ec63 refactor: doris query data (#3003) 2025-12-22 16:04:36 +08:00
Ulric Qin
203c2a885b eval log: use Warn level when error message is not blank 2025-12-22 14:39:07 +08:00
ning
9bee3e1379 fix: vlogs query 2025-12-20 19:52:29 +08:00
ning
c214580e87 refactor: trim prom param 2025-12-18 19:45:06 +08:00
ning
f6faed0659 fix: webhook connection leak 2025-12-17 18:07:43 +08:00
ning
990819d6c1 fix: webhook connection leak 2025-12-17 18:01:28 +08:00
SenCoder
5fff517cce refactor: callibex return task Id (#2994) 2025-12-12 18:45:09 +08:00
ning
db1bb34277 refactor: index pattern delete check 2025-12-11 15:31:49 +08:00
ning
81e37c9ed4 refactor: removes invisible characters from user contacts 2025-12-11 14:52:57 +08:00
pioneerlfn
27ec6a2d04 fix: doris macro/time (#2990) 2025-12-11 12:27:54 +08:00
ning
372a8cff2f refactor: builtin tpl add 2025-12-08 16:53:58 +08:00
Yening Qin
68850800ed feat: support victorialogs alert (#2988) 2025-12-05 14:51:58 +08:00
Busyster996
717f7f1c4b docs: dockerfile add jinja2 (#2982) 2025-12-05 14:47:58 +08:00
Snowykami
82e1e715ad fix: remove elasticsearch version validate (#2986) 2025-12-05 14:47:10 +08:00
ning
d1058639fc fix: event concurrent map writes 2025-12-04 20:16:21 +08:00
33 changed files with 1722 additions and 102 deletions

View File

@@ -282,13 +282,16 @@ func PipelineApplicable(pipeline *models.EventPipeline, event *models.AlertCurEv
tagMatch := true
if len(pipeline.LabelFilters) > 0 {
for i := range pipeline.LabelFilters {
if pipeline.LabelFilters[i].Func == "" {
pipeline.LabelFilters[i].Func = pipeline.LabelFilters[i].Op
// Deep copy to avoid concurrent map writes on cached objects
labelFiltersCopy := make([]models.TagFilter, len(pipeline.LabelFilters))
copy(labelFiltersCopy, pipeline.LabelFilters)
for i := range labelFiltersCopy {
if labelFiltersCopy[i].Func == "" {
labelFiltersCopy[i].Func = labelFiltersCopy[i].Op
}
}
tagFilters, err := models.ParseTagFilter(pipeline.LabelFilters)
tagFilters, err := models.ParseTagFilter(labelFiltersCopy)
if err != nil {
logger.Errorf("pipeline applicable failed to parse tag filter: %v event:%+v pipeline:%+v", err, event, pipeline)
return false
@@ -298,7 +301,11 @@ func PipelineApplicable(pipeline *models.EventPipeline, event *models.AlertCurEv
attributesMatch := true
if len(pipeline.AttrFilters) > 0 {
tagFilters, err := models.ParseTagFilter(pipeline.AttrFilters)
// Deep copy to avoid concurrent map writes on cached objects
attrFiltersCopy := make([]models.TagFilter, len(pipeline.AttrFilters))
copy(attrFiltersCopy, pipeline.AttrFilters)
tagFilters, err := models.ParseTagFilter(attrFiltersCopy)
if err != nil {
logger.Errorf("pipeline applicable failed to parse tag filter: %v event:%+v pipeline:%+v err:%v", tagFilters, event, pipeline, err)
return false
@@ -379,13 +386,16 @@ func NotifyRuleMatchCheck(notifyConfig *models.NotifyConfig, event *models.Alert
tagMatch := true
if len(notifyConfig.LabelKeys) > 0 {
for i := range notifyConfig.LabelKeys {
if notifyConfig.LabelKeys[i].Func == "" {
notifyConfig.LabelKeys[i].Func = notifyConfig.LabelKeys[i].Op
// Deep copy to avoid concurrent map writes on cached objects
labelKeysCopy := make([]models.TagFilter, len(notifyConfig.LabelKeys))
copy(labelKeysCopy, notifyConfig.LabelKeys)
for i := range labelKeysCopy {
if labelKeysCopy[i].Func == "" {
labelKeysCopy[i].Func = labelKeysCopy[i].Op
}
}
tagFilters, err := models.ParseTagFilter(notifyConfig.LabelKeys)
tagFilters, err := models.ParseTagFilter(labelKeysCopy)
if err != nil {
logger.Errorf("notify send failed to parse tag filter: %v event:%+v notify_config:%+v", err, event, notifyConfig)
return fmt.Errorf("failed to parse tag filter: %v", err)
@@ -399,7 +409,11 @@ func NotifyRuleMatchCheck(notifyConfig *models.NotifyConfig, event *models.Alert
attributesMatch := true
if len(notifyConfig.Attributes) > 0 {
tagFilters, err := models.ParseTagFilter(notifyConfig.Attributes)
// Deep copy to avoid concurrent map writes on cached objects
attributesCopy := make([]models.TagFilter, len(notifyConfig.Attributes))
copy(attributesCopy, notifyConfig.Attributes)
tagFilters, err := models.ParseTagFilter(attributesCopy)
if err != nil {
logger.Errorf("notify send failed to parse tag filter: %v event:%+v notify_config:%+v err:%v", tagFilters, event, notifyConfig, err)
return fmt.Errorf("failed to parse tag filter: %v", err)
@@ -811,12 +825,12 @@ func (e *Dispatch) HandleIbex(rule *models.AlertRule, event *models.AlertCurEven
if len(t.Host) == 0 {
sender.CallIbex(e.ctx, t.TplId, event.TargetIdent,
e.taskTplsCache, e.targetCache, e.userCache, event)
e.taskTplsCache, e.targetCache, e.userCache, event, "")
continue
}
for _, host := range t.Host {
sender.CallIbex(e.ctx, t.TplId, host,
e.taskTplsCache, e.targetCache, e.userCache, event)
e.taskTplsCache, e.targetCache, e.userCache, event, "")
}
}
}

View File

@@ -151,7 +151,7 @@ func (arw *AlertRuleWorker) Eval() {
if len(message) == 0 {
logger.Infof("rule_eval:%s finished, duration:%v", arw.Key(), time.Since(begin))
} else {
logger.Infof("rule_eval:%s finished, duration:%v, message:%s", arw.Key(), time.Since(begin), message)
logger.Warningf("rule_eval:%s finished, duration:%v, message:%s", arw.Key(), time.Since(begin), message)
}
}()
@@ -186,8 +186,7 @@ func (arw *AlertRuleWorker) Eval() {
}
if err != nil {
logger.Errorf("rule_eval:%s get anomaly point err:%s", arw.Key(), err.Error())
message = "failed to get anomaly points"
message = fmt.Sprintf("failed to get anomaly points: %v", err)
return
}

View File

@@ -86,30 +86,33 @@ func (c *IbexCallBacker) handleIbex(ctx *ctx.Context, url string, event *models.
return
}
CallIbex(ctx, id, host, c.taskTplCache, c.targetCache, c.userCache, event)
CallIbex(ctx, id, host, c.taskTplCache, c.targetCache, c.userCache, event, "")
}
func CallIbex(ctx *ctx.Context, id int64, host string,
taskTplCache *memsto.TaskTplCache, targetCache *memsto.TargetCacheType,
userCache *memsto.UserCacheType, event *models.AlertCurEvent) {
logger.Infof("event_callback_ibex: id: %d, host: %s, event: %+v", id, host, event)
userCache *memsto.UserCacheType, event *models.AlertCurEvent, args string) (int64, error) {
logger.Infof("event_callback_ibex: id: %d, host: %s, args: %s, event: %+v", id, host, args, event)
tpl := taskTplCache.Get(id)
if tpl == nil {
logger.Errorf("event_callback_ibex: no such tpl(%d), event: %+v", id, event)
return
err := fmt.Errorf("event_callback_ibex: no such tpl(%d), event: %+v", id, event)
logger.Errorf("%s", err)
return 0, err
}
// check perm
// tpl.GroupId - host - account 三元组校验权限
can, err := canDoIbex(tpl.UpdateBy, tpl, host, targetCache, userCache)
can, err := CanDoIbex(tpl.UpdateBy, tpl, host, targetCache, userCache)
if err != nil {
logger.Errorf("event_callback_ibex: check perm fail: %v, event: %+v", err, event)
return
err = fmt.Errorf("event_callback_ibex: check perm fail: %v, event: %+v", err, event)
logger.Errorf("%s", err)
return 0, err
}
if !can {
logger.Errorf("event_callback_ibex: user(%s) no permission, event: %+v", tpl.UpdateBy, event)
return
err = fmt.Errorf("event_callback_ibex: user(%s) no permission, event: %+v", tpl.UpdateBy, event)
logger.Errorf("%s", err)
return 0, err
}
tagsMap := make(map[string]string)
@@ -133,11 +136,16 @@ func CallIbex(ctx *ctx.Context, id int64, host string,
tags, err := json.Marshal(tagsMap)
if err != nil {
logger.Errorf("event_callback_ibex: failed to marshal tags to json: %v, event: %+v", tagsMap, event)
return
err = fmt.Errorf("event_callback_ibex: failed to marshal tags to json: %v, event: %+v", tagsMap, event)
logger.Errorf("%s", err)
return 0, err
}
// call ibex
taskArgs := tpl.Args
if args != "" {
taskArgs = args
}
in := models.TaskForm{
Title: tpl.Title + " FH: " + host,
Account: tpl.Account,
@@ -146,7 +154,7 @@ func CallIbex(ctx *ctx.Context, id int64, host string,
Timeout: tpl.Timeout,
Pause: tpl.Pause,
Script: tpl.Script,
Args: tpl.Args,
Args: taskArgs,
Stdin: string(tags),
Action: "start",
Creator: tpl.UpdateBy,
@@ -156,8 +164,9 @@ func CallIbex(ctx *ctx.Context, id int64, host string,
id, err = TaskAdd(in, tpl.UpdateBy, ctx.IsCenter)
if err != nil {
logger.Errorf("event_callback_ibex: call ibex fail: %v, event: %+v", err, event)
return
err = fmt.Errorf("event_callback_ibex: call ibex fail: %v, event: %+v", err, event)
logger.Errorf("%s", err)
return 0, err
}
// write db
@@ -178,11 +187,14 @@ func CallIbex(ctx *ctx.Context, id int64, host string,
}
if err = record.Add(ctx); err != nil {
logger.Errorf("event_callback_ibex: persist task_record fail: %v, event: %+v", err, event)
err = fmt.Errorf("event_callback_ibex: persist task_record fail: %v, event: %+v", err, event)
logger.Errorf("%s", err)
return id, err
}
return id, nil
}
func canDoIbex(username string, tpl *models.TaskTpl, host string, targetCache *memsto.TargetCacheType, userCache *memsto.UserCacheType) (bool, error) {
func CanDoIbex(username string, tpl *models.TaskTpl, host string, targetCache *memsto.TargetCacheType, userCache *memsto.UserCacheType) (bool, error) {
user := userCache.GetByUsername(username)
if user != nil && user.IsAdmin() {
return true, nil

View File

@@ -13,10 +13,53 @@ import (
"github.com/ccfos/nightingale/v6/alert/astats"
"github.com/ccfos/nightingale/v6/models"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/pkg/poster"
"github.com/toolkits/pkg/logger"
)
// webhookClientCache 缓存 http.Client避免每次请求都创建新的 Client 导致连接泄露
var webhookClientCache sync.Map // key: clientKey (string), value: *http.Client
// 相同配置的 webhook 会复用同一个 Client
func getWebhookClient(webhook *models.Webhook) *http.Client {
clientKey := webhook.Hash()
if client, ok := webhookClientCache.Load(clientKey); ok {
return client.(*http.Client)
}
// 创建新的 Client
transport := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: webhook.SkipVerify},
MaxIdleConns: 100,
MaxIdleConnsPerHost: 10,
IdleConnTimeout: 90 * time.Second,
}
if poster.UseProxy(webhook.Url) {
transport.Proxy = http.ProxyFromEnvironment
}
timeout := webhook.Timeout
if timeout <= 0 {
timeout = 10
}
newClient := &http.Client{
Timeout: time.Duration(timeout) * time.Second,
Transport: transport,
}
// 使用 LoadOrStore 确保并发安全,避免重复创建
actual, loaded := webhookClientCache.LoadOrStore(clientKey, newClient)
if loaded {
return actual.(*http.Client)
}
return newClient
}
func sendWebhook(webhook *models.Webhook, event interface{}, stats *astats.Stats) (bool, string, error) {
channel := "webhook"
if webhook.Type == models.RuleCallback {
@@ -55,25 +98,13 @@ func sendWebhook(webhook *models.Webhook, event interface{}, stats *astats.Stats
req.Header.Set(conf.Headers[i], conf.Headers[i+1])
}
}
insecureSkipVerify := false
if webhook != nil {
insecureSkipVerify = webhook.SkipVerify
}
if conf.Client == nil {
logger.Warningf("event_%s, event:%s, url: [%s], error: [%s]", channel, string(bs), conf.Url, "client is nil")
conf.Client = &http.Client{
Timeout: time.Duration(conf.Timeout) * time.Second,
Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: insecureSkipVerify},
},
}
}
// 使用全局 Client 缓存,避免每次请求都创建新的 Client 导致连接泄露
client := getWebhookClient(conf)
stats.AlertNotifyTotal.WithLabelValues(channel).Inc()
var resp *http.Response
var body []byte
resp, err = conf.Client.Do(req)
resp, err = client.Do(req)
if err != nil {
stats.AlertNotifyErrorTotal.WithLabelValues(channel).Inc()

View File

@@ -55,4 +55,10 @@ var Plugins = []Plugin{
Type: "opensearch",
TypeName: "OpenSearch",
},
{
Id: 10,
Category: "logging",
Type: "victorialogs",
TypeName: "VictoriaLogs",
},
}

View File

@@ -211,8 +211,8 @@ func (rt *Router) Config(r *gin.Engine) {
pages.GET("/datasource/brief", rt.auth(), rt.user(), rt.datasourceBriefs)
pages.POST("/datasource/query", rt.auth(), rt.user(), rt.datasourceQuery)
pages.POST("/ds-query", rt.auth(), rt.QueryData)
pages.POST("/logs-query", rt.auth(), rt.QueryLogV2)
pages.POST("/ds-query", rt.auth(), rt.user(), rt.QueryData)
pages.POST("/logs-query", rt.auth(), rt.user(), rt.QueryLogV2)
pages.POST("/tdengine-databases", rt.auth(), rt.tdengineDatabases)
pages.POST("/tdengine-tables", rt.auth(), rt.tdengineTables)
@@ -569,6 +569,14 @@ func (rt *Router) Config(r *gin.Engine) {
pages.GET("/pagerduty-service-list/:id", rt.auth(), rt.user(), rt.pagerDutyNotifyServicesGet)
pages.GET("/notify-channel-config", rt.auth(), rt.user(), rt.notifyChannelGetBy)
pages.GET("/notify-channel-config/idents", rt.notifyChannelIdentsGet)
// saved view 查询条件保存相关路由
pages.GET("/saved-views", rt.auth(), rt.user(), rt.savedViewGets)
pages.POST("/saved-views", rt.auth(), rt.user(), rt.savedViewAdd)
pages.PUT("/saved-view/:id", rt.auth(), rt.user(), rt.savedViewPut)
pages.DELETE("/saved-view/:id", rt.auth(), rt.user(), rt.savedViewDel)
pages.POST("/saved-view/:id/favorite", rt.auth(), rt.user(), rt.savedViewFavoriteAdd)
pages.DELETE("/saved-view/:id/favorite", rt.auth(), rt.user(), rt.savedViewFavoriteDel)
}
r.GET("/api/n9e/versions", func(c *gin.Context) {

View File

@@ -10,6 +10,7 @@ import (
"net/http"
"net/url"
"strings"
"time"
"github.com/ccfos/nightingale/v6/datasource/opensearch"
"github.com/ccfos/nightingale/v6/dskit/clickhouse"
@@ -229,6 +230,37 @@ func (rt *Router) datasourceUpsert(c *gin.Context) {
}
}
if req.PluginType == models.ELASTICSEARCH {
skipAuto := false
// 若用户输入了versionversion字符串存在且不为空则不自动获取
if req.SettingsJson != nil {
if v, ok := req.SettingsJson["version"]; ok {
switch vv := v.(type) {
case string:
if strings.TrimSpace(vv) != "" {
skipAuto = true
}
default:
if strings.TrimSpace(fmt.Sprint(vv)) != "" {
skipAuto = true
}
}
}
}
if !skipAuto {
version, err := getElasticsearchVersion(req, 10*time.Second)
if err != nil {
logger.Warningf("failed to get elasticsearch version: %v", err)
} else {
if req.SettingsJson == nil {
req.SettingsJson = make(map[string]interface{})
}
req.SettingsJson["version"] = version
}
}
}
if req.Id == 0 {
req.CreatedBy = username
req.Status = "enabled"
@@ -423,3 +455,82 @@ func (rt *Router) datasourceQuery(c *gin.Context) {
}
ginx.NewRender(c).Data(req, err)
}
// getElasticsearchVersion 该函数尝试从提供的Elasticsearch数据源中获取版本号遍历所有URL
// 直到成功获取版本号或所有URL均尝试失败为止。
func getElasticsearchVersion(ds models.Datasource, timeout time.Duration) (string, error) {
client := &http.Client{
Timeout: timeout,
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: ds.HTTPJson.TLS.SkipTlsVerify,
},
},
}
urls := make([]string, 0)
if len(ds.HTTPJson.Urls) > 0 {
urls = append(urls, ds.HTTPJson.Urls...)
}
if ds.HTTPJson.Url != "" {
urls = append(urls, ds.HTTPJson.Url)
}
if len(urls) == 0 {
return "", fmt.Errorf("no url provided")
}
var lastErr error
for _, raw := range urls {
baseURL := strings.TrimRight(raw, "/") + "/"
req, err := http.NewRequest("GET", baseURL, nil)
if err != nil {
lastErr = err
continue
}
if ds.AuthJson.BasicAuthUser != "" {
req.SetBasicAuth(ds.AuthJson.BasicAuthUser, ds.AuthJson.BasicAuthPassword)
}
for k, v := range ds.HTTPJson.Headers {
req.Header.Set(k, v)
}
resp, err := client.Do(req)
if err != nil {
lastErr = err
continue
}
body, err := io.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
lastErr = err
continue
}
if resp.StatusCode != 200 {
lastErr = fmt.Errorf("request to %s failed with status: %d body:%s", baseURL, resp.StatusCode, string(body))
continue
}
var result map[string]interface{}
if err := json.Unmarshal(body, &result); err != nil {
lastErr = err
continue
}
if version, ok := result["version"].(map[string]interface{}); ok {
if number, ok := version["number"].(string); ok && number != "" {
return number, nil
}
}
lastErr = fmt.Errorf("version not found in response from %s", baseURL)
}
if lastErr != nil {
return "", lastErr
}
return "", fmt.Errorf("failed to get elasticsearch version")
}

View File

@@ -1,18 +1,23 @@
package router
import (
"context"
"fmt"
"sort"
"sync"
"github.com/ccfos/nightingale/v6/dscache"
"github.com/ccfos/nightingale/v6/dskit/doris"
"github.com/ccfos/nightingale/v6/models"
"github.com/gin-gonic/gin"
"github.com/toolkits/pkg/ginx"
"github.com/toolkits/pkg/logger"
)
func CheckDsPerm(c *gin.Context, dsId int64, cate string, q interface{}) bool {
type CheckDsPermFunc func(c *gin.Context, dsId int64, cate string, q interface{}) bool
var CheckDsPerm CheckDsPermFunc = func(c *gin.Context, dsId int64, cate string, q interface{}) bool {
// todo: 后续需要根据 cate 判断是否需要权限
return true
}
@@ -107,10 +112,13 @@ func (rt *Router) QueryLogBatch(c *gin.Context) {
}
func QueryDataConcurrently(anonymousAccess bool, ctx *gin.Context, f models.QueryParam) ([]models.DataResp, error) {
var resp []models.DataResp
var mu sync.Mutex
var wg sync.WaitGroup
var errs []error
var (
resp []models.DataResp
mu sync.Mutex
wg sync.WaitGroup
errs []error
rCtx = ctx.Request.Context()
)
for _, q := range f.Queries {
if !anonymousAccess && !CheckDsPerm(ctx, f.DatasourceId, f.Cate, q) {
@@ -122,12 +130,17 @@ func QueryDataConcurrently(anonymousAccess bool, ctx *gin.Context, f models.Quer
logger.Warningf("cluster:%d not exists", f.DatasourceId)
return nil, fmt.Errorf("cluster not exists")
}
vCtx := rCtx
if f.Cate == models.DORIS {
vCtx = context.WithValue(vCtx, doris.NoNeedCheckMaxRow, true)
}
wg.Add(1)
go func(query interface{}) {
defer wg.Done()
data, err := plug.QueryData(ctx.Request.Context(), query)
data, err := plug.QueryData(vCtx, query)
if err != nil {
logger.Warningf("query data error: req:%+v err:%v", query, err)
mu.Lock()

View File

@@ -0,0 +1,144 @@
package router
import (
"net/http"
"github.com/ccfos/nightingale/v6/models"
"github.com/ccfos/nightingale/v6/pkg/slice"
"github.com/gin-gonic/gin"
"github.com/toolkits/pkg/ginx"
)
func (rt *Router) savedViewGets(c *gin.Context) {
page := ginx.QueryStr(c, "page", "")
me := c.MustGet("user").(*models.User)
lst, err := models.SavedViewGets(rt.Ctx, page)
if err != nil {
ginx.NewRender(c).Data(nil, err)
return
}
userGids, err := models.MyGroupIds(rt.Ctx, me.Id)
if err != nil {
ginx.NewRender(c).Data(nil, err)
return
}
favoriteMap, err := models.SavedViewFavoriteGetByUserId(rt.Ctx, me.Id)
if err != nil {
ginx.NewRender(c).Data(nil, err)
return
}
favoriteViews := make([]models.SavedView, 0)
normalViews := make([]models.SavedView, 0)
for _, view := range lst {
visible := view.CreateBy == me.Username ||
view.PublicCate == 2 ||
(view.PublicCate == 1 && slice.HaveIntersection[int64](userGids, view.Gids))
if !visible {
continue
}
view.IsFavorite = favoriteMap[view.Id]
// 收藏的排前面
if view.IsFavorite {
favoriteViews = append(favoriteViews, view)
} else {
normalViews = append(normalViews, view)
}
}
ginx.NewRender(c).Data(append(favoriteViews, normalViews...), nil)
}
func (rt *Router) savedViewAdd(c *gin.Context) {
var f models.SavedView
ginx.BindJSON(c, &f)
me := c.MustGet("user").(*models.User)
f.Id = 0
f.CreateBy = me.Username
f.UpdateBy = me.Username
err := models.SavedViewAdd(rt.Ctx, &f)
ginx.NewRender(c).Data(f.Id, err)
}
func (rt *Router) savedViewPut(c *gin.Context) {
id := ginx.UrlParamInt64(c, "id")
view, err := models.SavedViewGetById(rt.Ctx, id)
if err != nil {
ginx.NewRender(c).Data(nil, err)
return
}
if view == nil {
ginx.NewRender(c, http.StatusNotFound).Message("saved view not found")
return
}
me := c.MustGet("user").(*models.User)
// 只有创建者可以更新
if view.CreateBy != me.Username && !me.IsAdmin() {
ginx.NewRender(c, http.StatusForbidden).Message("forbidden")
return
}
var f models.SavedView
ginx.BindJSON(c, &f)
view.Name = f.Name
view.Filter = f.Filter
view.PublicCate = f.PublicCate
view.Gids = f.Gids
err = models.SavedViewUpdate(rt.Ctx, view, me.Username)
ginx.NewRender(c).Message(err)
}
func (rt *Router) savedViewDel(c *gin.Context) {
id := ginx.UrlParamInt64(c, "id")
view, err := models.SavedViewGetById(rt.Ctx, id)
if err != nil {
ginx.NewRender(c).Data(nil, err)
return
}
if view == nil {
ginx.NewRender(c, http.StatusNotFound).Message("saved view not found")
return
}
me := c.MustGet("user").(*models.User)
// 只有创建者或管理员可以删除
if view.CreateBy != me.Username && !me.IsAdmin() {
ginx.NewRender(c, http.StatusForbidden).Message("forbidden")
return
}
err = models.SavedViewDel(rt.Ctx, id)
ginx.NewRender(c).Message(err)
}
func (rt *Router) savedViewFavoriteAdd(c *gin.Context) {
id := ginx.UrlParamInt64(c, "id")
me := c.MustGet("user").(*models.User)
err := models.UserViewFavoriteAdd(rt.Ctx, id, me.Id)
ginx.NewRender(c).Message(err)
}
func (rt *Router) savedViewFavoriteDel(c *gin.Context) {
id := ginx.UrlParamInt64(c, "id")
me := c.MustGet("user").(*models.User)
err := models.UserViewFavoriteDel(rt.Ctx, id, me.Id)
ginx.NewRender(c).Message(err)
}

View File

@@ -26,6 +26,10 @@ const (
FieldId FixedField = "_id"
)
// LabelSeparator 用于分隔多个标签的分隔符
// 使用 ASCII 控制字符 Record Separator (0x1E),避免与用户数据中的 "--" 冲突
const LabelSeparator = "\x1e"
type Query struct {
Ref string `json:"ref" mapstructure:"ref"`
IndexType string `json:"index_type" mapstructure:"index_type"` // 普通索引:index 索引模式:index_pattern
@@ -128,7 +132,7 @@ func TransferData(metric, ref string, m map[string][][]float64) []models.DataRes
}
data.Metric["__name__"] = model.LabelValue(metric)
labels := strings.Split(k, "--")
labels := strings.Split(k, LabelSeparator)
for _, label := range labels {
arr := strings.SplitN(label, "=", 2)
if len(arr) == 2 {
@@ -197,7 +201,7 @@ func GetBuckets(labelKey string, keys []string, arr []interface{}, metrics *Metr
case json.Number, string:
if !getTs {
if labels != "" {
newlabels = fmt.Sprintf("%s--%s=%v", labels, labelKey, keyValue)
newlabels = fmt.Sprintf("%s%s%s=%v", labels, LabelSeparator, labelKey, keyValue)
} else {
newlabels = fmt.Sprintf("%s=%v", labelKey, keyValue)
}

View File

@@ -67,6 +67,13 @@ func init() {
PluginType: "pgsql",
PluginTypeName: "PostgreSQL",
}
DatasourceTypes[7] = DatasourceType{
Id: 7,
Category: "logging",
PluginType: "victorialogs",
PluginTypeName: "VictoriaLogs",
}
}
type NewDatasourceFn func(settings map[string]interface{}) (Datasource, error)

View File

@@ -4,12 +4,13 @@ import (
"context"
"fmt"
"strings"
"time"
"github.com/ccfos/nightingale/v6/datasource"
"github.com/ccfos/nightingale/v6/dskit/doris"
"github.com/ccfos/nightingale/v6/dskit/types"
"github.com/ccfos/nightingale/v6/pkg/macros"
"github.com/ccfos/nightingale/v6/models"
"github.com/ccfos/nightingale/v6/pkg/macros"
"github.com/mitchellh/mapstructure"
"github.com/toolkits/pkg/logger"
@@ -38,6 +39,8 @@ type QueryParam struct {
To int64 `json:"to" mapstructure:"to"`
TimeField string `json:"time_field" mapstructure:"time_field"`
TimeFormat string `json:"time_format" mapstructure:"time_format"`
Interval int64 `json:"interval" mapstructure:"interval"` // 查询时间间隔(秒)
Offset int `json:"offset" mapstructure:"offset"` // 延迟计算不在使用通用配置delay
}
func (d *Doris) InitClient() error {
@@ -146,13 +149,46 @@ func (d *Doris) QueryData(ctx context.Context, query interface{}) ([]models.Data
return nil, fmt.Errorf("valueKey is required")
}
items, err := d.QueryTimeseries(context.TODO(), &doris.QueryParam{
// 设置默认 interval
if dorisQueryParam.Interval == 0 {
dorisQueryParam.Interval = 60
}
// 计算时间范围
now := time.Now().Unix()
var start, end int64
if dorisQueryParam.To != 0 && dorisQueryParam.From != 0 {
end = dorisQueryParam.To
start = dorisQueryParam.From
} else {
end = now
start = end - dorisQueryParam.Interval
}
if dorisQueryParam.Offset != 0 {
end -= int64(dorisQueryParam.Offset)
start -= int64(dorisQueryParam.Offset)
}
dorisQueryParam.From = start
dorisQueryParam.To = end
if strings.Contains(dorisQueryParam.SQL, "$__") {
var err error
dorisQueryParam.SQL, err = macros.Macro(dorisQueryParam.SQL, dorisQueryParam.From, dorisQueryParam.To)
if err != nil {
return nil, err
}
}
items, err := d.QueryTimeseries(ctx, &doris.QueryParam{
Database: dorisQueryParam.Database,
Sql: dorisQueryParam.SQL,
Keys: types.Keys{
ValueKey: dorisQueryParam.Keys.ValueKey,
LabelKey: dorisQueryParam.Keys.LabelKey,
TimeKey: dorisQueryParam.Keys.TimeKey,
Offset: dorisQueryParam.Offset,
},
})
if err != nil {
@@ -180,6 +216,18 @@ func (d *Doris) QueryLog(ctx context.Context, query interface{}) ([]interface{},
return nil, 0, err
}
// 记录规则预览场景下只传了interval, 没有传From和To
now := time.Now().Unix()
if dorisQueryParam.To == 0 && dorisQueryParam.From == 0 && dorisQueryParam.Interval != 0 {
dorisQueryParam.To = now
dorisQueryParam.From = now - dorisQueryParam.Interval
}
if dorisQueryParam.Offset != 0 {
dorisQueryParam.To -= int64(dorisQueryParam.Offset)
dorisQueryParam.From -= int64(dorisQueryParam.Offset)
}
if strings.Contains(dorisQueryParam.SQL, "$__") {
var err error
dorisQueryParam.SQL, err = macros.Macro(dorisQueryParam.SQL, dorisQueryParam.From, dorisQueryParam.To)

View File

@@ -110,25 +110,6 @@ func (e *Elasticsearch) InitClient() error {
return err
}
if e.Client != nil {
for _, addr := range e.Nodes {
if addr == "" {
continue
}
if ver, verr := e.Client.ElasticsearchVersion(addr); verr == nil {
logger.Infof("detected elasticsearch version from %s: %s", addr, ver)
e.Version = ver
e.Addr = addr
break
} else {
logger.Debugf("detect version failed from %s: %v", addr, verr)
}
}
if e.Version == "" {
logger.Warning("failed to detect elasticsearch version from configured nodes, keep configured version")
}
}
return err
}
@@ -190,10 +171,6 @@ func (e *Elasticsearch) Validate(ctx context.Context) (err error) {
e.Timeout = 60000
}
if !strings.HasPrefix(e.Version, "6") && !strings.HasPrefix(e.Version, "7") {
return fmt.Errorf("version must be 6.0+ or 7.0+")
}
return nil
}

View File

@@ -0,0 +1,339 @@
package victorialogs
import (
"context"
"fmt"
"net/url"
"reflect"
"strconv"
"time"
"github.com/ccfos/nightingale/v6/datasource"
"github.com/ccfos/nightingale/v6/dskit/victorialogs"
"github.com/ccfos/nightingale/v6/models"
"github.com/mitchellh/mapstructure"
"github.com/prometheus/common/model"
)
const (
VictoriaLogsType = "victorialogs"
)
// VictoriaLogs 数据源实现
type VictoriaLogs struct {
victorialogs.VictoriaLogs `json:",inline" mapstructure:",squash"`
}
// Query 查询参数
type Query struct {
Query string `json:"query" mapstructure:"query"` // LogsQL 查询语句
Start int64 `json:"start" mapstructure:"start"` // 开始时间(秒)
End int64 `json:"end" mapstructure:"end"` // 结束时间(秒)
Time int64 `json:"time" mapstructure:"time"` // 单点时间(秒)- 用于告警
Step string `json:"step" mapstructure:"step"` // 步长,如 "1m", "5m"
Limit int `json:"limit" mapstructure:"limit"` // 限制返回数量
Ref string `json:"ref" mapstructure:"ref"` // 变量引用名(如 A、B
}
// IsInstantQuery 判断是否为即时查询(告警场景)
func (q *Query) IsInstantQuery() bool {
return q.Time > 0 || (q.Start >= 0 && q.Start == q.End)
}
func init() {
datasource.RegisterDatasource(VictoriaLogsType, new(VictoriaLogs))
}
// Init 初始化配置
func (vl *VictoriaLogs) Init(settings map[string]interface{}) (datasource.Datasource, error) {
newest := new(VictoriaLogs)
err := mapstructure.Decode(settings, newest)
return newest, err
}
// InitClient 初始化客户端
func (vl *VictoriaLogs) InitClient() error {
if err := vl.InitHTTPClient(); err != nil {
return fmt.Errorf("failed to init victorialogs http client: %w", err)
}
return nil
}
// Validate 参数验证
func (vl *VictoriaLogs) Validate(ctx context.Context) error {
if vl.VictorialogsAddr == "" {
return fmt.Errorf("victorialogs.addr is required")
}
// 验证 URL 格式
_, err := url.Parse(vl.VictorialogsAddr)
if err != nil {
return fmt.Errorf("invalid victorialogs.addr: %w", err)
}
// 必须同时提供用户名和密码
if (vl.VictorialogsBasic.VictorialogsUser != "" && vl.VictorialogsBasic.VictorialogsPass == "") ||
(vl.VictorialogsBasic.VictorialogsUser == "" && vl.VictorialogsBasic.VictorialogsPass != "") {
return fmt.Errorf("both username and password must be provided")
}
// 设置默认值
if vl.Timeout == 0 {
vl.Timeout = 10000 // 默认 10 秒
}
if vl.MaxQueryRows == 0 {
vl.MaxQueryRows = 1000
}
return nil
}
// Equal 验证是否相等
func (vl *VictoriaLogs) Equal(other datasource.Datasource) bool {
o, ok := other.(*VictoriaLogs)
if !ok {
return false
}
return vl.VictorialogsAddr == o.VictorialogsAddr &&
vl.VictorialogsBasic.VictorialogsUser == o.VictorialogsBasic.VictorialogsUser &&
vl.VictorialogsBasic.VictorialogsPass == o.VictorialogsBasic.VictorialogsPass &&
vl.VictorialogsTls.SkipTlsVerify == o.VictorialogsTls.SkipTlsVerify &&
vl.Timeout == o.Timeout &&
reflect.DeepEqual(vl.Headers, o.Headers)
}
// QueryLog 日志查询
func (vl *VictoriaLogs) QueryLog(ctx context.Context, queryParam interface{}) ([]interface{}, int64, error) {
param := new(Query)
if err := mapstructure.Decode(queryParam, param); err != nil {
return nil, 0, fmt.Errorf("decode query param failed: %w", err)
}
logs, err := vl.Query(ctx, param.Query, param.Start, param.End, param.Limit)
if err != nil {
return nil, 0, err
}
// 转换为 interface{} 数组
result := make([]interface{}, len(logs))
for i, log := range logs {
result[i] = log
}
// 调用 HitsLogs 获取真实的 total
total, err := vl.HitsLogs(ctx, param.Query, param.Start, param.End)
if err != nil {
// 如果获取 total 失败,使用当前结果数量
total = int64(len(logs))
}
return result, total, nil
}
// QueryData 指标数据查询
func (vl *VictoriaLogs) QueryData(ctx context.Context, queryParam interface{}) ([]models.DataResp, error) {
param := new(Query)
if err := mapstructure.Decode(queryParam, param); err != nil {
return nil, fmt.Errorf("decode query param failed: %w", err)
}
// 判断使用哪个 API
if param.IsInstantQuery() {
return vl.queryDataInstant(ctx, param)
}
return vl.queryDataRange(ctx, param)
}
// queryDataInstant 告警场景,调用 /select/logsql/stats_query
func (vl *VictoriaLogs) queryDataInstant(ctx context.Context, param *Query) ([]models.DataResp, error) {
queryTime := param.Time
if queryTime == 0 {
queryTime = param.End // 如果没有 time使用 end 作为查询时间点
}
if queryTime == 0 {
queryTime = time.Now().Unix()
}
result, err := vl.StatsQuery(ctx, param.Query, queryTime)
if err != nil {
return nil, err
}
return convertPrometheusInstantToDataResp(result, param.Ref), nil
}
// queryDataRange 看图场景,调用 /select/logsql/stats_query_range
func (vl *VictoriaLogs) queryDataRange(ctx context.Context, param *Query) ([]models.DataResp, error) {
step := param.Step
if step == "" {
// 根据时间范围计算合适的步长
duration := param.End - param.Start
if duration <= 3600 {
step = "1m" // 1 小时内1 分钟步长
} else if duration <= 86400 {
step = "5m" // 1 天内5 分钟步长
} else {
step = "1h" // 超过 1 天1 小时步长
}
}
result, err := vl.StatsQueryRange(ctx, param.Query, param.Start, param.End, step)
if err != nil {
return nil, err
}
return convertPrometheusRangeToDataResp(result, param.Ref), nil
}
// convertPrometheusInstantToDataResp 将 Prometheus Instant Query 格式转换为 DataResp
func convertPrometheusInstantToDataResp(resp *victorialogs.PrometheusResponse, ref string) []models.DataResp {
var dataResps []models.DataResp
for _, item := range resp.Data.Result {
dataResp := models.DataResp{
Ref: ref,
}
// 转换 Metric
dataResp.Metric = make(model.Metric)
for k, v := range item.Metric {
dataResp.Metric[model.LabelName(k)] = model.LabelValue(v)
}
if len(item.Value) == 2 {
// [timestamp, value]
timestamp := item.Value[0].(float64)
value, _ := strconv.ParseFloat(item.Value[1].(string), 64)
dataResp.Values = [][]float64{
{timestamp, value},
}
}
dataResps = append(dataResps, dataResp)
}
return dataResps
}
// convertPrometheusRangeToDataResp 将 Prometheus Range Query 格式转换为 DataResp
func convertPrometheusRangeToDataResp(resp *victorialogs.PrometheusResponse, ref string) []models.DataResp {
var dataResps []models.DataResp
for _, item := range resp.Data.Result {
dataResp := models.DataResp{
Ref: ref,
}
// 转换 Metric
dataResp.Metric = make(model.Metric)
for k, v := range item.Metric {
dataResp.Metric[model.LabelName(k)] = model.LabelValue(v)
}
var values [][]float64
for _, v := range item.Values {
if len(v) == 2 {
timestamp := v[0].(float64)
value, _ := strconv.ParseFloat(v[1].(string), 64)
values = append(values, []float64{timestamp, value})
}
}
dataResp.Values = values
dataResps = append(dataResps, dataResp)
}
return dataResps
}
// MakeLogQuery 构造日志查询参数
func (vl *VictoriaLogs) MakeLogQuery(ctx context.Context, query interface{}, eventTags []string, start, end int64) (interface{}, error) {
q := &Query{
Start: start,
End: end,
Limit: 1000,
}
// 如果 query 是字符串,直接使用
if queryStr, ok := query.(string); ok {
q.Query = queryStr
} else if queryMap, ok := query.(map[string]interface{}); ok {
// 如果是 map尝试提取 query 字段
if qStr, exists := queryMap["query"]; exists {
q.Query = fmt.Sprintf("%v", qStr)
}
if limit, exists := queryMap["limit"]; exists {
if limitInt, ok := limit.(int); ok {
q.Limit = limitInt
} else if limitFloat, ok := limit.(float64); ok {
q.Limit = int(limitFloat)
}
}
}
return q, nil
}
// MakeTSQuery 构造时序查询参数
func (vl *VictoriaLogs) MakeTSQuery(ctx context.Context, query interface{}, eventTags []string, start, end int64) (interface{}, error) {
q := &Query{
Start: start,
End: end,
}
// 如果 query 是字符串,直接使用
if queryStr, ok := query.(string); ok {
q.Query = queryStr
} else if queryMap, ok := query.(map[string]interface{}); ok {
// 如果是 map提取相关字段
if qStr, exists := queryMap["query"]; exists {
q.Query = fmt.Sprintf("%v", qStr)
}
if step, exists := queryMap["step"]; exists {
q.Step = fmt.Sprintf("%v", step)
}
}
return q, nil
}
// QueryMapData 用于告警事件生成时获取额外数据
func (vl *VictoriaLogs) QueryMapData(ctx context.Context, query interface{}) ([]map[string]string, error) {
param := new(Query)
if err := mapstructure.Decode(query, param); err != nil {
return nil, err
}
// 扩大查询范围,解决时间滞后问题
if param.End > 0 && param.Start > 0 {
param.Start = param.Start - 30
}
// 限制只取 1 条
param.Limit = 1
logs, _, err := vl.QueryLog(ctx, param)
if err != nil {
return nil, err
}
var result []map[string]string
for _, log := range logs {
if logMap, ok := log.(map[string]interface{}); ok {
strMap := make(map[string]string)
for k, v := range logMap {
strMap[k] = fmt.Sprintf("%v", v)
}
result = append(result, strMap)
break // 只取第一条
}
}
return result, nil
}

View File

@@ -5,7 +5,7 @@ WORKDIR /app
ADD n9e /app/
ADD etc /app/etc/
ADD integrations /app/integrations/
RUN pip install requests
RUN pip install requests Jinja2
EXPOSE 17000

View File

@@ -57,3 +57,29 @@ func (cs *Cache) Get(cate string, dsId int64) (datasource.Datasource, bool) {
return cs.datas[cate][dsId], true
}
func (cs *Cache) Delete(cate string, dsId int64) {
cs.mutex.Lock()
defer cs.mutex.Unlock()
if _, found := cs.datas[cate]; !found {
return
}
delete(cs.datas[cate], dsId)
logger.Debugf("delete plugin:%s %d from cache", cate, dsId)
}
// GetAllIds 返回缓存中所有数据源的 ID按类型分组
func (cs *Cache) GetAllIds() map[string][]int64 {
cs.mutex.RLock()
defer cs.mutex.RUnlock()
result := make(map[string][]int64)
for cate, dsMap := range cs.datas {
ids := make([]int64, 0, len(dsMap))
for dsId := range dsMap {
ids = append(ids, dsId)
}
result[cate] = ids
}
return result
}

View File

@@ -14,6 +14,7 @@ import (
_ "github.com/ccfos/nightingale/v6/datasource/mysql"
_ "github.com/ccfos/nightingale/v6/datasource/opensearch"
_ "github.com/ccfos/nightingale/v6/datasource/postgresql"
_ "github.com/ccfos/nightingale/v6/datasource/victorialogs"
"github.com/ccfos/nightingale/v6/dskit/tdengine"
"github.com/ccfos/nightingale/v6/models"
"github.com/ccfos/nightingale/v6/pkg/ctx"
@@ -172,7 +173,10 @@ func esN9eToDatasourceInfo(ds *datasource.DatasourceInfo, item models.Datasource
}
func PutDatasources(items []datasource.DatasourceInfo) {
// 记录当前有效的数据源 ID按类型分组
validIds := make(map[string]map[int64]struct{})
ids := make([]int64, 0)
for _, item := range items {
if item.Type == "prometheus" {
continue
@@ -201,6 +205,12 @@ func PutDatasources(items []datasource.DatasourceInfo) {
}
ids = append(ids, item.Id)
// 记录有效的数据源 ID
if _, ok := validIds[typ]; !ok {
validIds[typ] = make(map[int64]struct{})
}
validIds[typ][item.Id] = struct{}{}
// 异步初始化 client 不然数据源同步的会很慢
go func() {
defer func() {
@@ -212,5 +222,19 @@ func PutDatasources(items []datasource.DatasourceInfo) {
}()
}
// 删除 items 中不存在但 DsCache 中存在的数据源
cachedIds := DsCache.GetAllIds()
for cate, dsIds := range cachedIds {
for _, dsId := range dsIds {
if _, ok := validIds[cate]; !ok {
// 该类型在 items 中完全不存在,删除缓存中的所有该类型数据源
DsCache.Delete(cate, dsId)
} else if _, ok := validIds[cate][dsId]; !ok {
// 该数据源 ID 在 items 中不存在,删除
DsCache.Delete(cate, dsId)
}
}
}
logger.Debugf("get plugin by type success Ids:%v", ids)
}

View File

@@ -18,13 +18,21 @@ import (
"github.com/mitchellh/mapstructure"
)
const (
ShowIndexFieldIndexType = "index_type"
ShowIndexFieldColumnName = "column_name"
ShowIndexKeyName = "key_name"
SQLShowIndex = "SHOW INDEX FROM "
)
// Doris struct to hold connection details and the connection object
type Doris struct {
Addr string `json:"doris.addr" mapstructure:"doris.addr"` // fe mysql endpoint
FeAddr string `json:"doris.fe_addr" mapstructure:"doris.fe_addr"` // fe http endpoint
User string `json:"doris.user" mapstructure:"doris.user"` //
Password string `json:"doris.password" mapstructure:"doris.password"` //
Timeout int `json:"doris.timeout" mapstructure:"doris.timeout"`
Timeout int `json:"doris.timeout" mapstructure:"doris.timeout"` // ms
MaxIdleConns int `json:"doris.max_idle_conns" mapstructure:"doris.max_idle_conns"`
MaxOpenConns int `json:"doris.max_open_conns" mapstructure:"doris.max_open_conns"`
ConnMaxLifetime int `json:"doris.conn_max_lifetime" mapstructure:"doris.conn_max_lifetime"`
@@ -119,7 +127,7 @@ func (d *Doris) createTimeoutContext(ctx context.Context) (context.Context, cont
if timeout == 0 {
timeout = 60
}
return context.WithTimeout(ctx, time.Duration(timeout)*time.Second)
return context.WithTimeout(ctx, time.Duration(timeout)*time.Millisecond)
}
// ShowDatabases lists all databases in Doris
@@ -312,6 +320,88 @@ func (d *Doris) DescTable(ctx context.Context, database, table string) ([]*types
return columns, nil
}
type TableIndexInfo struct {
ColumnName string `json:"column_name"`
IndexName string `json:"index_name"`
IndexType string `json:"index_type"`
}
// ShowIndexes 查询表的所有索引信息
func (d *Doris) ShowIndexes(ctx context.Context, database, table string) ([]TableIndexInfo, error) {
if database == "" || table == "" {
return nil, fmt.Errorf("database and table names cannot be empty")
}
tCtx, cancel := d.createTimeoutContext(ctx)
defer cancel()
db, err := d.NewConn(tCtx, database)
if err != nil {
return nil, err
}
querySQL := fmt.Sprintf("%s `%s`.`%s`", SQLShowIndex, database, table)
rows, err := db.QueryContext(tCtx, querySQL)
if err != nil {
return nil, fmt.Errorf("failed to query indexes: %w", err)
}
defer rows.Close()
columns, err := rows.Columns()
if err != nil {
return nil, fmt.Errorf("failed to get columns: %w", err)
}
count := len(columns)
// 预映射列索引
colIdx := map[string]int{
ShowIndexKeyName: -1,
ShowIndexFieldColumnName: -1,
ShowIndexFieldIndexType: -1,
}
for i, col := range columns {
lCol := strings.ToLower(col)
if lCol == ShowIndexKeyName || lCol == ShowIndexFieldColumnName || lCol == ShowIndexFieldIndexType {
colIdx[lCol] = i
}
}
var result []TableIndexInfo
for rows.Next() {
// 使用 sql.RawBytes 可以接受任何类型并转为 string避免复杂的类型断言
scanArgs := make([]interface{}, count)
values := make([]sql.RawBytes, count)
for i := range values {
scanArgs[i] = &values[i]
}
if err = rows.Scan(scanArgs...); err != nil {
return nil, err
}
info := TableIndexInfo{}
if i := colIdx[ShowIndexFieldColumnName]; i != -1 && i < count {
info.ColumnName = string(values[i])
}
if i := colIdx[ShowIndexKeyName]; i != -1 && i < count {
info.IndexName = string(values[i])
}
if i := colIdx[ShowIndexFieldIndexType]; i != -1 && i < count {
info.IndexType = string(values[i])
}
if info.ColumnName != "" {
result = append(result, info)
}
}
if err = rows.Err(); err != nil {
return nil, fmt.Errorf("error iterating rows: %w", err)
}
return result, nil
}
// SelectRows selects rows from a specified table in Doris based on a given query with MaxQueryRows check
func (d *Doris) SelectRows(ctx context.Context, database, table, query string) ([]map[string]interface{}, error) {
sql := fmt.Sprintf("SELECT * FROM %s.%s", database, table)

View File

@@ -10,13 +10,14 @@ const (
TimeseriesAggregationTimestamp = "__ts__"
)
// QueryLogs 查询日志
// TODO: 待测试, MAP/ARRAY/STRUCT/JSON 等类型能否处理
func (d *Doris) QueryLogs(ctx context.Context, query *QueryParam) ([]map[string]interface{}, error) {
// 等同于 Query()
return d.Query(ctx, query)
return d.Query(ctx, query, true)
}
// 本质是查询时序数据, 取第一组, SQL由上层封装, 不再做复杂的解析和截断
// QueryHistogram 本质是查询时序数据, 取第一组, SQL由上层封装, 不再做复杂的解析和截断
func (d *Doris) QueryHistogram(ctx context.Context, query *QueryParam) ([][]float64, error) {
values, err := d.QueryTimeseries(ctx, query)
if err != nil {

View File

@@ -15,6 +15,10 @@ const (
TimeFieldFormatDateTime = "datetime"
)
type noNeedCheckMaxRowKey struct{}
var NoNeedCheckMaxRow = noNeedCheckMaxRowKey{}
// 不再拼接SQL, 完全信赖用户的输入
type QueryParam struct {
Database string `json:"database"`
@@ -39,7 +43,7 @@ var (
)
// Query executes a given SQL query in Doris and returns the results with MaxQueryRows check
func (d *Doris) Query(ctx context.Context, query *QueryParam) ([]map[string]interface{}, error) {
func (d *Doris) Query(ctx context.Context, query *QueryParam, checkMaxRow bool) ([]map[string]interface{}, error) {
// 校验SQL的合法性, 过滤掉 write请求
sqlItem := strings.Split(strings.ToUpper(query.Sql), " ")
for _, item := range sqlItem {
@@ -48,10 +52,12 @@ func (d *Doris) Query(ctx context.Context, query *QueryParam) ([]map[string]inte
}
}
// 检查查询结果行数
err := d.CheckMaxQueryRows(ctx, query.Database, query.Sql)
if err != nil {
return nil, err
if checkMaxRow {
// 检查查询结果行数
err := d.CheckMaxQueryRows(ctx, query.Database, query.Sql)
if err != nil {
return nil, err
}
}
rows, err := d.ExecQuery(ctx, query.Database, query.Sql)
@@ -63,8 +69,12 @@ func (d *Doris) Query(ctx context.Context, query *QueryParam) ([]map[string]inte
// QueryTimeseries executes a time series data query using the given parameters with MaxQueryRows check
func (d *Doris) QueryTimeseries(ctx context.Context, query *QueryParam) ([]types.MetricValues, error) {
// 使用 Query 方法执行查询Query方法内部已包含MaxQueryRows检查
rows, err := d.Query(ctx, query)
// 默认需要检查,除非调用方声明不需要检查
checkMaxRow := true
if noCheck, ok := ctx.Value(NoNeedCheckMaxRow).(bool); ok && noCheck {
checkMaxRow = false
}
rows, err := d.Query(ctx, query, checkMaxRow)
if err != nil {
return nil, err
}

View File

@@ -158,7 +158,10 @@ func FormatMetricValues(keys types.Keys, rows []map[string]interface{}, ignoreDe
}
if !exists {
ts = float64(time.Now().Unix()) // Default to current time if not specified
// Default to current time if not specified
// 大多数情况下offset为空
// 对于记录规则延迟计算的情况,统计值的时间戳需要有偏移,以便跟统计值对应
ts = float64(time.Now().Unix()) - float64(keys.Offset)
}
valuePair := []float64{ts, value}
@@ -257,6 +260,14 @@ func parseTimeFromString(str, format string) (time.Time, error) {
if parsedTime, err := time.Parse(time.RFC3339, str); err == nil {
return parsedTime, nil
}
if parsedTime, err := time.Parse(time.DateTime, str); err == nil {
return parsedTime, nil
}
if parsedTime, err := time.Parse("2006-01-02 15:04:05.000000", str); err == nil {
return parsedTime, nil
}
if parsedTime, err := time.Parse(time.RFC3339Nano, str); err == nil {
return parsedTime, nil
}

View File

@@ -48,4 +48,5 @@ type Keys struct {
LabelKey string `json:"labelKey" mapstructure:"labelKey"` // 多个用空格分隔
TimeKey string `json:"timeKey" mapstructure:"timeKey"`
TimeFormat string `json:"timeFormat" mapstructure:"timeFormat"` // not used anymore
Offset int `json:"offset" mapstructure:"offset"`
}

View File

@@ -0,0 +1,304 @@
package victorialogs
import (
"bufio"
"context"
"crypto/tls"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"strconv"
"strings"
"time"
)
type VictoriaLogs struct {
VictorialogsAddr string `json:"victorialogs.addr" mapstructure:"victorialogs.addr"`
VictorialogsBasic struct {
VictorialogsUser string `json:"victorialogs.user" mapstructure:"victorialogs.user"`
VictorialogsPass string `json:"victorialogs.password" mapstructure:"victorialogs.password"`
IsEncrypt bool `json:"victorialogs.is_encrypt" mapstructure:"victorialogs.is_encrypt"`
} `json:"victorialogs.basic" mapstructure:"victorialogs.basic"`
VictorialogsTls struct {
SkipTlsVerify bool `json:"victorialogs.tls.skip_tls_verify" mapstructure:"victorialogs.tls.skip_tls_verify"`
} `json:"victorialogs.tls" mapstructure:"victorialogs.tls"`
Headers map[string]string `json:"victorialogs.headers" mapstructure:"victorialogs.headers"`
Timeout int64 `json:"victorialogs.timeout" mapstructure:"victorialogs.timeout"` // millis
ClusterName string `json:"victorialogs.cluster_name" mapstructure:"victorialogs.cluster_name"`
MaxQueryRows int `json:"victorialogs.max_query_rows" mapstructure:"victorialogs.max_query_rows"`
HTTPClient *http.Client `json:"-" mapstructure:"-"`
}
// LogEntry 日志条目
type LogEntry map[string]interface{}
// PrometheusResponse Prometheus 响应格式
type PrometheusResponse struct {
Status string `json:"status"`
Data PrometheusData `json:"data"`
Error string `json:"error,omitempty"`
}
// PrometheusData Prometheus 数据部分
type PrometheusData struct {
ResultType string `json:"resultType"`
Result []PrometheusItem `json:"result"`
}
// PrometheusItem Prometheus 数据项
type PrometheusItem struct {
Metric map[string]string `json:"metric"`
Value []interface{} `json:"value,omitempty"` // [timestamp, value]
Values [][]interface{} `json:"values,omitempty"` // [[timestamp, value], ...]
}
// HitsResult hits 查询响应
type HitsResult struct {
Hits []struct {
Total int64 `json:"total"`
}
}
// InitHTTPClient 初始化 HTTP 客户端
func (vl *VictoriaLogs) InitHTTPClient() error {
transport := &http.Transport{
MaxIdleConns: 100,
MaxIdleConnsPerHost: 10,
IdleConnTimeout: 90 * time.Second,
TLSClientConfig: &tls.Config{
InsecureSkipVerify: vl.VictorialogsTls.SkipTlsVerify,
},
}
timeout := time.Duration(vl.Timeout) * time.Millisecond
if timeout == 0 {
timeout = 60 * time.Second
}
vl.HTTPClient = &http.Client{
Transport: transport,
Timeout: timeout,
}
return nil
}
// Query 执行日志查询
// GET/POST /select/logsql/query?query=<query>&start=<start>&end=<end>&limit=<limit>
func (vl *VictoriaLogs) Query(ctx context.Context, query string, start, end int64, limit int) ([]LogEntry, error) {
params := url.Values{}
params.Set("query", query)
if start > 0 {
params.Set("start", strconv.FormatInt(start, 10))
}
if end > 0 {
params.Set("end", strconv.FormatInt(end, 10))
}
if limit > 0 {
params.Set("limit", strconv.Itoa(limit))
} else {
params.Set("limit", strconv.Itoa(vl.MaxQueryRows)) // 默认 1000 条
}
endpoint := fmt.Sprintf("%s/select/logsql/query", vl.VictorialogsAddr)
resp, err := vl.doRequest(ctx, "POST", endpoint, params)
if err != nil {
return nil, err
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("read response body failed: %w", err)
}
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("query failed: status=%d, body=%s", resp.StatusCode, string(body))
}
// VictoriaLogs returns NDJSON format (one JSON object per line)
var logs []LogEntry
scanner := bufio.NewScanner(strings.NewReader(string(body)))
for scanner.Scan() {
line := scanner.Text()
if line == "" {
continue
}
var entry LogEntry
if err := json.Unmarshal([]byte(line), &entry); err != nil {
return nil, fmt.Errorf("decode log entry failed: %w, line=%s", err, line)
}
logs = append(logs, entry)
}
if err := scanner.Err(); err != nil {
return nil, fmt.Errorf("scan response failed: %w", err)
}
return logs, nil
}
// StatsQuery 执行统计查询(单点时间)
// POST /select/logsql/stats_query?query=<query>&time=<time>
func (vl *VictoriaLogs) StatsQuery(ctx context.Context, query string, time int64) (*PrometheusResponse, error) {
params := url.Values{}
params.Set("query", query)
if time > 0 {
params.Set("time", strconv.FormatInt(time, 10))
}
endpoint := fmt.Sprintf("%s/select/logsql/stats_query", vl.VictorialogsAddr)
resp, err := vl.doRequest(ctx, "POST", endpoint, params)
if err != nil {
return nil, err
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("read response body failed: %w", err)
}
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("stats query failed: status=%d, body=%s", resp.StatusCode, string(body))
}
var result PrometheusResponse
if err := json.Unmarshal(body, &result); err != nil {
return nil, fmt.Errorf("decode response failed: %w, body=%s", err, string(body))
}
if result.Status != "success" {
return nil, fmt.Errorf("query failed: %s", result.Error)
}
return &result, nil
}
// StatsQueryRange 执行统计查询(时间范围)
// POST /select/logsql/stats_query_range?query=<query>&start=<start>&end=<end>&step=<step>
func (vl *VictoriaLogs) StatsQueryRange(ctx context.Context, query string, start, end int64, step string) (*PrometheusResponse, error) {
params := url.Values{}
params.Set("query", query)
if start > 0 {
params.Set("start", strconv.FormatInt(start, 10))
}
if end > 0 {
params.Set("end", strconv.FormatInt(end, 10))
}
if step != "" {
params.Set("step", step)
}
endpoint := fmt.Sprintf("%s/select/logsql/stats_query_range", vl.VictorialogsAddr)
resp, err := vl.doRequest(ctx, "POST", endpoint, params)
if err != nil {
return nil, err
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("read response body failed: %w", err)
}
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("stats query range failed: status=%d, body=%s", resp.StatusCode, string(body))
}
var result PrometheusResponse
if err := json.Unmarshal(body, &result); err != nil {
return nil, fmt.Errorf("decode response failed: %w, body=%s", err, string(body))
}
if result.Status != "success" {
return nil, fmt.Errorf("query failed: %s", result.Error)
}
return &result, nil
}
// HitsLogs 返回查询命中的日志数量,用于计算 total
// POST /select/logsql/hits?query=<query>&start=<start>&end=<end>
func (vl *VictoriaLogs) HitsLogs(ctx context.Context, query string, start, end int64) (int64, error) {
params := url.Values{}
params.Set("query", query)
if start > 0 {
params.Set("start", strconv.FormatInt(start, 10))
}
if end > 0 {
params.Set("end", strconv.FormatInt(end, 10))
}
endpoint := fmt.Sprintf("%s/select/logsql/hits", vl.VictorialogsAddr)
resp, err := vl.doRequest(ctx, "POST", endpoint, params)
if err != nil {
return 0, err
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return 0, fmt.Errorf("read response body failed: %w", err)
}
if resp.StatusCode != http.StatusOK {
return 0, fmt.Errorf("hits query failed: status=%d, body=%s", resp.StatusCode, string(body))
}
var result HitsResult
if err := json.Unmarshal(body, &result); err != nil {
return 0, fmt.Errorf("decode response failed: %w, body=%s", err, string(body))
}
if len(result.Hits) == 0 {
return 0, nil
}
return result.Hits[0].Total, nil
}
// doRequest 执行 HTTP 请求
func (vl *VictoriaLogs) doRequest(ctx context.Context, method, endpoint string, params url.Values) (*http.Response, error) {
var req *http.Request
var err error
if method == "GET" {
fullURL := endpoint
if len(params) > 0 {
fullURL = fmt.Sprintf("%s?%s", endpoint, params.Encode())
}
req, err = http.NewRequestWithContext(ctx, method, fullURL, nil)
} else {
// POST with form data
req, err = http.NewRequestWithContext(ctx, method, endpoint, strings.NewReader(params.Encode()))
if err == nil {
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
}
}
if err != nil {
return nil, fmt.Errorf("create request failed: %w", err)
}
if vl.VictorialogsBasic.VictorialogsUser != "" {
req.SetBasicAuth(vl.VictorialogsBasic.VictorialogsUser, vl.VictorialogsBasic.VictorialogsPass)
}
// Custom Headers
for k, v := range vl.Headers {
req.Header.Set(k, v)
}
return vl.HTTPClient.Do(req)
}

View File

@@ -0,0 +1,136 @@
package victorialogs
import (
"context"
"testing"
"time"
)
var v = VictoriaLogs{
VictorialogsAddr: "http://127.0.0.1:9428",
Headers: make(map[string]string),
Timeout: 10000, // 10 seconds in milliseconds
}
func TestVictoriaLogs_InitHTTPClient(t *testing.T) {
if err := v.InitHTTPClient(); err != nil {
t.Fatalf("InitHTTPClient failed: %v", err)
}
if v.HTTPClient == nil {
t.Fatal("HTTPClient should not be nil after initialization")
}
}
func TestVictoriaLogs_Query(t *testing.T) {
ctx := context.Background()
if err := v.InitHTTPClient(); err != nil {
t.Fatalf("InitHTTPClient failed: %v", err)
}
// Query logs with basic query
now := time.Now().UnixNano()
start := now - int64(time.Hour) // 1 hour ago
end := now
logs, err := v.Query(ctx, "*", start, end, 10)
if err != nil {
t.Fatalf("Query failed: %v", err)
}
t.Logf("Query returned %d log entries", len(logs))
for i, log := range logs {
t.Logf("Log[%d]: %v", i, log)
}
}
func TestVictoriaLogs_StatsQuery(t *testing.T) {
ctx := context.Background()
if err := v.InitHTTPClient(); err != nil {
t.Fatalf("InitHTTPClient failed: %v", err)
}
// Stats query with count
now := time.Now().UnixNano()
result, err := v.StatsQuery(ctx, "* | stats count() as total", now)
if err != nil {
t.Fatalf("StatsQuery failed: %v", err)
}
t.Logf("StatsQuery result: status=%s, resultType=%s", result.Status, result.Data.ResultType)
for i, item := range result.Data.Result {
t.Logf("Result[%d]: metric=%v, value=%v", i, item.Metric, item.Value)
}
}
func TestVictoriaLogs_StatsQueryRange(t *testing.T) {
ctx := context.Background()
if err := v.InitHTTPClient(); err != nil {
t.Fatalf("InitHTTPClient failed: %v", err)
}
// Stats query range
now := time.Now().UnixNano()
start := now - int64(time.Hour) // 1 hour ago
end := now
result, err := v.StatsQueryRange(ctx, "* | stats count() as total", start, end, "5m")
if err != nil {
t.Fatalf("StatsQueryRange failed: %v", err)
}
t.Logf("StatsQueryRange result: status=%s, resultType=%s", result.Status, result.Data.ResultType)
for i, item := range result.Data.Result {
t.Logf("Result[%d]: metric=%v, values count=%d", i, item.Metric, len(item.Values))
}
}
func TestVictoriaLogs_HitsLogs(t *testing.T) {
ctx := context.Background()
if err := v.InitHTTPClient(); err != nil {
t.Fatalf("InitHTTPClient failed: %v", err)
}
// Get total hits count
now := time.Now().UnixNano()
start := now - int64(time.Hour) // 1 hour ago
end := now
count, err := v.HitsLogs(ctx, "*", start, end)
if err != nil {
t.Fatalf("HitsLogs failed: %v", err)
}
t.Logf("HitsLogs total count: %d", count)
}
func TestVictoriaLogs_QueryWithFilter(t *testing.T) {
ctx := context.Background()
if err := v.InitHTTPClient(); err != nil {
t.Fatalf("InitHTTPClient failed: %v", err)
}
// Query with a filter condition
now := time.Now().UnixNano()
start := now - int64(time.Hour)
end := now
logs, err := v.Query(ctx, "_stream:{app=\"test\"}", start, end, 5)
if err != nil {
t.Fatalf("Query with filter failed: %v", err)
}
t.Logf("Query with filter returned %d log entries", len(logs))
}
func TestVictoriaLogs_StatsQueryByField(t *testing.T) {
ctx := context.Background()
if err := v.InitHTTPClient(); err != nil {
t.Fatalf("InitHTTPClient failed: %v", err)
}
// Stats query grouped by field
now := time.Now().UnixNano()
result, err := v.StatsQuery(ctx, "* | stats by (level) count() as cnt", now)
if err != nil {
t.Fatalf("StatsQuery by field failed: %v", err)
}
t.Logf("StatsQuery by field result: status=%s", result.Status)
for i, item := range result.Data.Result {
t.Logf("Result[%d]: metric=%v, value=%v", i, item.Metric, item.Value)
}
}

View File

@@ -33,7 +33,8 @@ const (
DORIS = "doris"
OPENSEARCH = "opensearch"
CLICKHOUSE = "ck"
CLICKHOUSE = "ck"
VICTORIALOGS = "victorialogs"
)
const (
@@ -1219,7 +1220,8 @@ func (ar *AlertRule) IsInnerRule() bool {
ar.Cate == MYSQL ||
ar.Cate == POSTGRESQL ||
ar.Cate == DORIS ||
ar.Cate == OPENSEARCH
ar.Cate == OPENSEARCH ||
ar.Cate == VICTORIALOGS
}
func (ar *AlertRule) GetRuleType() string {

View File

@@ -76,6 +76,7 @@ func (bc *BuiltinComponent) Add(ctx *ctx.Context, username string) error {
bc.CreatedAt = now
bc.UpdatedAt = now
bc.CreatedBy = username
bc.UpdatedBy = username
return Insert(ctx, bc)
}

View File

@@ -1,6 +1,9 @@
package models
import (
"encoding/json"
"fmt"
"strings"
"time"
"github.com/ccfos/nightingale/v6/pkg/ctx"
@@ -48,9 +51,70 @@ func EsIndexPatternDel(ctx *ctx.Context, ids []int64) error {
if len(ids) == 0 {
return nil
}
// 检查是否有告警规则引用了这些 index pattern
for _, id := range ids {
alertRules, err := GetAlertRulesByEsIndexPatternId(ctx, id)
if err != nil {
return errors.WithMessage(err, "failed to check alert rules")
}
if len(alertRules) > 0 {
names := make([]string, 0, len(alertRules))
for _, rule := range alertRules {
names = append(names, rule.Name)
}
return errors.Errorf("index pattern(id=%d) is used by alert rules: %s", id, strings.Join(names, ", "))
}
}
return DB(ctx).Where("id in ?", ids).Delete(new(EsIndexPattern)).Error
}
// GetAlertRulesByEsIndexPatternId 获取引用了指定 index pattern 的告警规则
func GetAlertRulesByEsIndexPatternId(ctx *ctx.Context, indexPatternId int64) ([]*AlertRule, error) {
// index_pattern 存储在 rule_config JSON 字段的 queries 数组中
// 格式如: {"queries":[{"index_type":"index_pattern","index_pattern":123,...}]}
// 先用 LIKE 粗筛,再在代码中精确过滤
pattern := fmt.Sprintf(`%%"index_pattern":%d%%`, indexPatternId)
var candidates []*AlertRule
err := DB(ctx).Where("rule_config LIKE ?", pattern).Find(&candidates).Error
if err != nil {
return nil, err
}
// 精确过滤:解析 JSON 检查 index_pattern 字段值是否精确匹配
var alertRules []*AlertRule
for _, rule := range candidates {
if ruleUsesIndexPattern(rule.RuleConfig, indexPatternId) {
alertRules = append(alertRules, rule)
}
}
return alertRules, nil
}
// ruleUsesIndexPattern 检查告警规则的 rule_config 是否引用了指定的 index_pattern
func ruleUsesIndexPattern(ruleConfig string, indexPatternId int64) bool {
var config struct {
Queries []struct {
IndexPattern int64 `json:"index_pattern"`
} `json:"queries"`
}
if err := json.Unmarshal([]byte(ruleConfig), &config); err != nil {
return false
}
for _, query := range config.Queries {
if query.IndexPattern == indexPatternId {
return true
}
}
return false
}
func (ei *EsIndexPattern) Update(ctx *ctx.Context, eip EsIndexPattern) error {
if ei.Name != eip.Name || ei.DatasourceId != eip.DatasourceId {
exists, err := EsIndexPatternExists(ctx, ei.Id, eip.DatasourceId, eip.Name)

View File

@@ -68,7 +68,8 @@ func MigrateTables(db *gorm.DB) error {
&Board{}, &BoardBusigroup{}, &Users{}, &SsoConfig{}, &models.BuiltinMetric{},
&models.MetricFilter{}, &models.NotificationRecord{}, &models.TargetBusiGroup{},
&models.UserToken{}, &models.DashAnnotation{}, MessageTemplate{}, NotifyRule{}, NotifyChannelConfig{}, &EsIndexPatternMigrate{},
&models.EventPipeline{}, &models.EmbeddedProduct{}, &models.SourceToken{}}
&models.EventPipeline{}, &models.EmbeddedProduct{}, &models.SourceToken{},
&models.SavedView{}, &models.UserViewFavorite{}}
if isPostgres(db) {
dts = append(dts, &models.PostgresBuiltinComponent{})

View File

@@ -31,7 +31,8 @@ type Webhook struct {
RetryCount int `json:"retry_count"`
RetryInterval int `json:"retry_interval"`
Batch int `json:"batch"`
Client *http.Client `json:"-"`
Client *http.Client `json:"-"`
}
func (w *Webhook) Hash() string {

174
models/saved_view.go Normal file
View File

@@ -0,0 +1,174 @@
package models
import (
"errors"
"strings"
"time"
"github.com/ccfos/nightingale/v6/pkg/ctx"
)
var (
ErrSavedViewNameEmpty = errors.New("saved view name is blank")
ErrSavedViewPageEmpty = errors.New("saved view page is blank")
ErrSavedViewNotFound = errors.New("saved view not found")
ErrSavedViewNameDuplicate = errors.New("saved view name already exists in this page")
)
type SavedView struct {
Id int64 `json:"id" gorm:"primaryKey;autoIncrement"`
Name string `json:"name" gorm:"type:varchar(255);not null"`
Page string `json:"page" gorm:"type:varchar(64);not null;index"`
Filter string `json:"filter" gorm:"type:text"`
PublicCate int `json:"public_cate" gorm:"default:0"` // 0: self, 1: team, 2: all
Gids []int64 `json:"gids" gorm:"column:gids;type:text;serializer:json"`
CreateAt int64 `json:"create_at" gorm:"type:bigint;not null;default:0"`
CreateBy string `json:"create_by" gorm:"type:varchar(64);index"`
UpdateAt int64 `json:"update_at" gorm:"type:bigint;not null;default:0"`
UpdateBy string `json:"update_by" gorm:"type:varchar(64)"`
// 查询时填充的字段
IsFavorite bool `json:"is_favorite" gorm:"-"`
}
func (SavedView) TableName() string {
return "saved_view"
}
func (sv *SavedView) Verify() error {
sv.Name = strings.TrimSpace(sv.Name)
if sv.Name == "" {
return ErrSavedViewNameEmpty
}
if sv.Page == "" {
return ErrSavedViewPageEmpty
}
return nil
}
func SavedViewCheckDuplicateName(c *ctx.Context, page, name string, excludeId int64) error {
var count int64
session := DB(c).Model(&SavedView{}).Where("page = ? AND name = ? AND public_cate = 2", page, name)
if excludeId > 0 {
session = session.Where("id != ?", excludeId)
}
if err := session.Count(&count).Error; err != nil {
return err
}
if count > 0 {
return ErrSavedViewNameDuplicate
}
return nil
}
func SavedViewAdd(c *ctx.Context, sv *SavedView) error {
if err := sv.Verify(); err != nil {
return err
}
// 当 PublicCate 为 all(2) 时,检查同一个 page 下 name 是否重复
if sv.PublicCate == 2 {
if err := SavedViewCheckDuplicateName(c, sv.Page, sv.Name, 0); err != nil {
return err
}
}
now := time.Now().Unix()
sv.CreateAt = now
sv.UpdateAt = now
return Insert(c, sv)
}
func SavedViewUpdate(c *ctx.Context, sv *SavedView, username string) error {
if err := sv.Verify(); err != nil {
return err
}
// 当 PublicCate 为 all(2) 时,检查同一个 page 下 name 是否重复(排除自身)
if sv.PublicCate == 2 {
if err := SavedViewCheckDuplicateName(c, sv.Page, sv.Name, sv.Id); err != nil {
return err
}
}
sv.UpdateAt = time.Now().Unix()
sv.UpdateBy = username
return DB(c).Model(sv).Select("name", "filter", "public_cate", "gids", "update_at", "update_by").Updates(sv).Error
}
func SavedViewDel(c *ctx.Context, id int64) error {
// 先删除收藏关联
if err := DB(c).Where("view_id = ?", id).Delete(&UserViewFavorite{}).Error; err != nil {
return err
}
return DB(c).Where("id = ?", id).Delete(&SavedView{}).Error
}
func SavedViewGetById(c *ctx.Context, id int64) (*SavedView, error) {
var sv SavedView
err := DB(c).Where("id = ?", id).First(&sv).Error
if err != nil {
return nil, err
}
return &sv, nil
}
func SavedViewGets(c *ctx.Context, page string) ([]SavedView, error) {
var views []SavedView
session := DB(c).Where("page = ?", page)
if err := session.Order("update_at DESC").Find(&views).Error; err != nil {
return nil, err
}
return views, nil
}
func SavedViewFavoriteGetByUserId(c *ctx.Context, userId int64) (map[int64]bool, error) {
var favorites []UserViewFavorite
if err := DB(c).Where("user_id = ?", userId).Find(&favorites).Error; err != nil {
return nil, err
}
result := make(map[int64]bool)
for _, f := range favorites {
result[f.ViewId] = true
}
return result, nil
}
type UserViewFavorite struct {
Id int64 `json:"id" gorm:"primaryKey;autoIncrement"`
ViewId int64 `json:"view_id" gorm:"index"`
UserId int64 `json:"user_id" gorm:"index"`
CreateAt int64 `json:"create_at"`
}
func (UserViewFavorite) TableName() string {
return "user_view_favorite"
}
func UserViewFavoriteAdd(c *ctx.Context, viewId, userId int64) error {
var count int64
if err := DB(c).Model(&SavedView{}).Where("id = ?", viewId).Count(&count).Error; err != nil {
return err
}
if count == 0 {
return ErrSavedViewNotFound
}
if err := DB(c).Model(&UserViewFavorite{}).Where("view_id = ? AND user_id = ?", viewId, userId).Count(&count).Error; err != nil {
return err
}
if count > 0 {
return nil // 已收藏,直接返回成功
}
fav := &UserViewFavorite{
ViewId: viewId,
UserId: userId,
CreateAt: time.Now().Unix(),
}
return DB(c).Create(fav).Error
}
func UserViewFavoriteDel(c *ctx.Context, viewId, userId int64) error {
return DB(c).Where("view_id = ? AND user_id = ?", viewId, userId).Delete(&UserViewFavorite{}).Error
}

View File

@@ -1,12 +1,14 @@
package models
import (
"encoding/json"
"fmt"
"os"
"regexp"
"strconv"
"strings"
"time"
"unicode"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/pkg/ormx"
@@ -142,6 +144,42 @@ func (u *User) CheckGroupPermission(ctx *ctx.Context, groupIds []int64) error {
return nil
}
// stripInvisibleChars removes invisible Unicode characters from a string
// This includes zero-width spaces, control characters, and other invisible chars
func stripInvisibleChars(s string) string {
return strings.Map(func(r rune) rune {
// Keep printable characters and common whitespace (space, tab, newline)
if unicode.IsPrint(r) || r == ' ' || r == '\t' || r == '\n' || r == '\r' {
return r
}
// Remove invisible characters
return -1
}, s)
}
// stripInvisibleCharsFromContacts removes invisible characters from Contacts JSON values
func stripInvisibleCharsFromContacts(contacts ormx.JSONObj) ormx.JSONObj {
if len(contacts) == 0 {
return contacts
}
var contactsMap map[string]string
if err := json.Unmarshal(contacts, &contactsMap); err != nil {
return contacts
}
for k, v := range contactsMap {
contactsMap[k] = stripInvisibleChars(v)
}
result, err := json.Marshal(contactsMap)
if err != nil {
return contacts
}
return ormx.JSONObj(result)
}
func (u *User) Verify() error {
u.Username = strings.TrimSpace(u.Username)
@@ -165,6 +203,9 @@ func (u *User) Verify() error {
return errors.New("Email invalid")
}
// Strip invisible characters from Contacts values
u.Contacts = stripInvisibleCharsFromContacts(u.Contacts)
if u.Phone != "" {
return u.EncryptPhone()
}

View File

@@ -201,6 +201,11 @@ var I18N = `{
"Some recovery scripts still in the BusiGroup": "业务组中仍有自愈脚本",
"Some target busigroups still in the BusiGroup": "业务组中仍有监控对象",
"saved view not found": "保存的视图不存在",
"saved view name is blank": "视图名称不能为空",
"saved view page is blank": "视图页面不能为空",
"saved view name already exists in this page": "该页面下已存在同名的公开视图",
"---------zh_CN--------": "---------zh_CN--------"
},
"zh_HK": {
@@ -405,6 +410,11 @@ var I18N = `{
"Some recovery scripts still in the BusiGroup": "業務組中仍有自愈腳本",
"Some target busigroups still in the BusiGroup": "業務組中仍有監控對象",
"saved view not found": "保存的視圖不存在",
"saved view name is blank": "視圖名稱不能為空",
"saved view page is blank": "視圖頁面不能為空",
"saved view name already exists in this page": "該頁面下已存在同名的公開視圖",
"---------zh_HK--------": "---------zh_HK--------"
},
"ja_JP": {
@@ -606,6 +616,11 @@ var I18N = `{
"Some recovery scripts still in the BusiGroup": "ビジネスグループにまだ自己回復スクリプトがあります",
"Some target busigroups still in the BusiGroup": "ビジネスグループにまだ監視対象があります",
"saved view not found": "保存されたビューが見つかりません",
"saved view name is blank": "ビュー名を空にすることはできません",
"saved view page is blank": "ビューページを空にすることはできません",
"saved view name already exists in this page": "このページには同名の公開ビューが既に存在します",
"---------ja_JP--------": "---------ja_JP--------"
},
"ru_RU": {
@@ -807,6 +822,11 @@ var I18N = `{
"Some recovery scripts still in the BusiGroup": "В бизнес-группе еще есть скрипты самоисцеления",
"Some target busigroups still in the BusiGroup": "В бизнес-группе еще есть объекты мониторинга",
"saved view not found": "Сохраненный вид не найден",
"saved view name is blank": "Название вида не может быть пустым",
"saved view page is blank": "Страница вида не может быть пустой",
"saved view name already exists in this page": "На этой странице уже существует публичный вид с таким названием",
"---------ru_RU--------": "---------ru_RU--------"
}
}`

View File

@@ -85,15 +85,15 @@ func (pc *PromClientMap) loadFromDatabase() {
var internalAddr string
for k, v := range ds.SettingsJson {
if strings.Contains(k, "write_addr") {
writeAddr = v.(string)
writeAddr = strings.TrimSpace(v.(string))
} else if strings.Contains(k, "internal_addr") && v.(string) != "" {
internalAddr = v.(string)
internalAddr = strings.TrimSpace(v.(string))
}
}
po := PromOption{
ClusterName: ds.Name,
Url: ds.HTTPJson.Url,
Url: strings.TrimSpace(ds.HTTPJson.Url),
WriteAddr: writeAddr,
BasicAuthUser: ds.AuthJson.BasicAuthUser,
BasicAuthPass: ds.AuthJson.BasicAuthPassword,