mirror of
https://github.com/ccfos/nightingale.git
synced 2026-03-03 06:29:16 +00:00
Compare commits
29 Commits
release-20
...
dev21-old
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
46083d741d | ||
|
|
3eeb705b39 | ||
|
|
8d87e69ee7 | ||
|
|
3da85d8e28 | ||
|
|
b50410b88a | ||
|
|
c98241b3fd | ||
|
|
b30caf625b | ||
|
|
32e8b961c2 | ||
|
|
2ff0a8fdbb | ||
|
|
7ff74d0948 | ||
|
|
da58d825c0 | ||
|
|
0014b77c4d | ||
|
|
fc7fdde2d5 | ||
|
|
61b63fc75c | ||
|
|
80f564ec63 | ||
|
|
203c2a885b | ||
|
|
9bee3e1379 | ||
|
|
c214580e87 | ||
|
|
f6faed0659 | ||
|
|
990819d6c1 | ||
|
|
5fff517cce | ||
|
|
db1bb34277 | ||
|
|
81e37c9ed4 | ||
|
|
27ec6a2d04 | ||
|
|
372a8cff2f | ||
|
|
68850800ed | ||
|
|
717f7f1c4b | ||
|
|
82e1e715ad | ||
|
|
d1058639fc |
@@ -282,13 +282,16 @@ func PipelineApplicable(pipeline *models.EventPipeline, event *models.AlertCurEv
|
||||
|
||||
tagMatch := true
|
||||
if len(pipeline.LabelFilters) > 0 {
|
||||
for i := range pipeline.LabelFilters {
|
||||
if pipeline.LabelFilters[i].Func == "" {
|
||||
pipeline.LabelFilters[i].Func = pipeline.LabelFilters[i].Op
|
||||
// Deep copy to avoid concurrent map writes on cached objects
|
||||
labelFiltersCopy := make([]models.TagFilter, len(pipeline.LabelFilters))
|
||||
copy(labelFiltersCopy, pipeline.LabelFilters)
|
||||
for i := range labelFiltersCopy {
|
||||
if labelFiltersCopy[i].Func == "" {
|
||||
labelFiltersCopy[i].Func = labelFiltersCopy[i].Op
|
||||
}
|
||||
}
|
||||
|
||||
tagFilters, err := models.ParseTagFilter(pipeline.LabelFilters)
|
||||
tagFilters, err := models.ParseTagFilter(labelFiltersCopy)
|
||||
if err != nil {
|
||||
logger.Errorf("pipeline applicable failed to parse tag filter: %v event:%+v pipeline:%+v", err, event, pipeline)
|
||||
return false
|
||||
@@ -298,7 +301,11 @@ func PipelineApplicable(pipeline *models.EventPipeline, event *models.AlertCurEv
|
||||
|
||||
attributesMatch := true
|
||||
if len(pipeline.AttrFilters) > 0 {
|
||||
tagFilters, err := models.ParseTagFilter(pipeline.AttrFilters)
|
||||
// Deep copy to avoid concurrent map writes on cached objects
|
||||
attrFiltersCopy := make([]models.TagFilter, len(pipeline.AttrFilters))
|
||||
copy(attrFiltersCopy, pipeline.AttrFilters)
|
||||
|
||||
tagFilters, err := models.ParseTagFilter(attrFiltersCopy)
|
||||
if err != nil {
|
||||
logger.Errorf("pipeline applicable failed to parse tag filter: %v event:%+v pipeline:%+v err:%v", tagFilters, event, pipeline, err)
|
||||
return false
|
||||
@@ -379,13 +386,16 @@ func NotifyRuleMatchCheck(notifyConfig *models.NotifyConfig, event *models.Alert
|
||||
|
||||
tagMatch := true
|
||||
if len(notifyConfig.LabelKeys) > 0 {
|
||||
for i := range notifyConfig.LabelKeys {
|
||||
if notifyConfig.LabelKeys[i].Func == "" {
|
||||
notifyConfig.LabelKeys[i].Func = notifyConfig.LabelKeys[i].Op
|
||||
// Deep copy to avoid concurrent map writes on cached objects
|
||||
labelKeysCopy := make([]models.TagFilter, len(notifyConfig.LabelKeys))
|
||||
copy(labelKeysCopy, notifyConfig.LabelKeys)
|
||||
for i := range labelKeysCopy {
|
||||
if labelKeysCopy[i].Func == "" {
|
||||
labelKeysCopy[i].Func = labelKeysCopy[i].Op
|
||||
}
|
||||
}
|
||||
|
||||
tagFilters, err := models.ParseTagFilter(notifyConfig.LabelKeys)
|
||||
tagFilters, err := models.ParseTagFilter(labelKeysCopy)
|
||||
if err != nil {
|
||||
logger.Errorf("notify send failed to parse tag filter: %v event:%+v notify_config:%+v", err, event, notifyConfig)
|
||||
return fmt.Errorf("failed to parse tag filter: %v", err)
|
||||
@@ -399,7 +409,11 @@ func NotifyRuleMatchCheck(notifyConfig *models.NotifyConfig, event *models.Alert
|
||||
|
||||
attributesMatch := true
|
||||
if len(notifyConfig.Attributes) > 0 {
|
||||
tagFilters, err := models.ParseTagFilter(notifyConfig.Attributes)
|
||||
// Deep copy to avoid concurrent map writes on cached objects
|
||||
attributesCopy := make([]models.TagFilter, len(notifyConfig.Attributes))
|
||||
copy(attributesCopy, notifyConfig.Attributes)
|
||||
|
||||
tagFilters, err := models.ParseTagFilter(attributesCopy)
|
||||
if err != nil {
|
||||
logger.Errorf("notify send failed to parse tag filter: %v event:%+v notify_config:%+v err:%v", tagFilters, event, notifyConfig, err)
|
||||
return fmt.Errorf("failed to parse tag filter: %v", err)
|
||||
@@ -811,12 +825,12 @@ func (e *Dispatch) HandleIbex(rule *models.AlertRule, event *models.AlertCurEven
|
||||
|
||||
if len(t.Host) == 0 {
|
||||
sender.CallIbex(e.ctx, t.TplId, event.TargetIdent,
|
||||
e.taskTplsCache, e.targetCache, e.userCache, event)
|
||||
e.taskTplsCache, e.targetCache, e.userCache, event, "")
|
||||
continue
|
||||
}
|
||||
for _, host := range t.Host {
|
||||
sender.CallIbex(e.ctx, t.TplId, host,
|
||||
e.taskTplsCache, e.targetCache, e.userCache, event)
|
||||
e.taskTplsCache, e.targetCache, e.userCache, event, "")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -151,7 +151,7 @@ func (arw *AlertRuleWorker) Eval() {
|
||||
if len(message) == 0 {
|
||||
logger.Infof("rule_eval:%s finished, duration:%v", arw.Key(), time.Since(begin))
|
||||
} else {
|
||||
logger.Infof("rule_eval:%s finished, duration:%v, message:%s", arw.Key(), time.Since(begin), message)
|
||||
logger.Warningf("rule_eval:%s finished, duration:%v, message:%s", arw.Key(), time.Since(begin), message)
|
||||
}
|
||||
}()
|
||||
|
||||
@@ -186,8 +186,7 @@ func (arw *AlertRuleWorker) Eval() {
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
logger.Errorf("rule_eval:%s get anomaly point err:%s", arw.Key(), err.Error())
|
||||
message = "failed to get anomaly points"
|
||||
message = fmt.Sprintf("failed to get anomaly points: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -86,30 +86,33 @@ func (c *IbexCallBacker) handleIbex(ctx *ctx.Context, url string, event *models.
|
||||
return
|
||||
}
|
||||
|
||||
CallIbex(ctx, id, host, c.taskTplCache, c.targetCache, c.userCache, event)
|
||||
CallIbex(ctx, id, host, c.taskTplCache, c.targetCache, c.userCache, event, "")
|
||||
}
|
||||
|
||||
func CallIbex(ctx *ctx.Context, id int64, host string,
|
||||
taskTplCache *memsto.TaskTplCache, targetCache *memsto.TargetCacheType,
|
||||
userCache *memsto.UserCacheType, event *models.AlertCurEvent) {
|
||||
logger.Infof("event_callback_ibex: id: %d, host: %s, event: %+v", id, host, event)
|
||||
userCache *memsto.UserCacheType, event *models.AlertCurEvent, args string) (int64, error) {
|
||||
logger.Infof("event_callback_ibex: id: %d, host: %s, args: %s, event: %+v", id, host, args, event)
|
||||
|
||||
tpl := taskTplCache.Get(id)
|
||||
if tpl == nil {
|
||||
logger.Errorf("event_callback_ibex: no such tpl(%d), event: %+v", id, event)
|
||||
return
|
||||
err := fmt.Errorf("event_callback_ibex: no such tpl(%d), event: %+v", id, event)
|
||||
logger.Errorf("%s", err)
|
||||
return 0, err
|
||||
}
|
||||
// check perm
|
||||
// tpl.GroupId - host - account 三元组校验权限
|
||||
can, err := canDoIbex(tpl.UpdateBy, tpl, host, targetCache, userCache)
|
||||
can, err := CanDoIbex(tpl.UpdateBy, tpl, host, targetCache, userCache)
|
||||
if err != nil {
|
||||
logger.Errorf("event_callback_ibex: check perm fail: %v, event: %+v", err, event)
|
||||
return
|
||||
err = fmt.Errorf("event_callback_ibex: check perm fail: %v, event: %+v", err, event)
|
||||
logger.Errorf("%s", err)
|
||||
return 0, err
|
||||
}
|
||||
|
||||
if !can {
|
||||
logger.Errorf("event_callback_ibex: user(%s) no permission, event: %+v", tpl.UpdateBy, event)
|
||||
return
|
||||
err = fmt.Errorf("event_callback_ibex: user(%s) no permission, event: %+v", tpl.UpdateBy, event)
|
||||
logger.Errorf("%s", err)
|
||||
return 0, err
|
||||
}
|
||||
|
||||
tagsMap := make(map[string]string)
|
||||
@@ -133,11 +136,16 @@ func CallIbex(ctx *ctx.Context, id int64, host string,
|
||||
|
||||
tags, err := json.Marshal(tagsMap)
|
||||
if err != nil {
|
||||
logger.Errorf("event_callback_ibex: failed to marshal tags to json: %v, event: %+v", tagsMap, event)
|
||||
return
|
||||
err = fmt.Errorf("event_callback_ibex: failed to marshal tags to json: %v, event: %+v", tagsMap, event)
|
||||
logger.Errorf("%s", err)
|
||||
return 0, err
|
||||
}
|
||||
|
||||
// call ibex
|
||||
taskArgs := tpl.Args
|
||||
if args != "" {
|
||||
taskArgs = args
|
||||
}
|
||||
in := models.TaskForm{
|
||||
Title: tpl.Title + " FH: " + host,
|
||||
Account: tpl.Account,
|
||||
@@ -146,7 +154,7 @@ func CallIbex(ctx *ctx.Context, id int64, host string,
|
||||
Timeout: tpl.Timeout,
|
||||
Pause: tpl.Pause,
|
||||
Script: tpl.Script,
|
||||
Args: tpl.Args,
|
||||
Args: taskArgs,
|
||||
Stdin: string(tags),
|
||||
Action: "start",
|
||||
Creator: tpl.UpdateBy,
|
||||
@@ -156,8 +164,9 @@ func CallIbex(ctx *ctx.Context, id int64, host string,
|
||||
|
||||
id, err = TaskAdd(in, tpl.UpdateBy, ctx.IsCenter)
|
||||
if err != nil {
|
||||
logger.Errorf("event_callback_ibex: call ibex fail: %v, event: %+v", err, event)
|
||||
return
|
||||
err = fmt.Errorf("event_callback_ibex: call ibex fail: %v, event: %+v", err, event)
|
||||
logger.Errorf("%s", err)
|
||||
return 0, err
|
||||
}
|
||||
|
||||
// write db
|
||||
@@ -178,11 +187,14 @@ func CallIbex(ctx *ctx.Context, id int64, host string,
|
||||
}
|
||||
|
||||
if err = record.Add(ctx); err != nil {
|
||||
logger.Errorf("event_callback_ibex: persist task_record fail: %v, event: %+v", err, event)
|
||||
err = fmt.Errorf("event_callback_ibex: persist task_record fail: %v, event: %+v", err, event)
|
||||
logger.Errorf("%s", err)
|
||||
return id, err
|
||||
}
|
||||
return id, nil
|
||||
}
|
||||
|
||||
func canDoIbex(username string, tpl *models.TaskTpl, host string, targetCache *memsto.TargetCacheType, userCache *memsto.UserCacheType) (bool, error) {
|
||||
func CanDoIbex(username string, tpl *models.TaskTpl, host string, targetCache *memsto.TargetCacheType, userCache *memsto.UserCacheType) (bool, error) {
|
||||
user := userCache.GetByUsername(username)
|
||||
if user != nil && user.IsAdmin() {
|
||||
return true, nil
|
||||
|
||||
@@ -13,10 +13,53 @@ import (
|
||||
"github.com/ccfos/nightingale/v6/alert/astats"
|
||||
"github.com/ccfos/nightingale/v6/models"
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
"github.com/ccfos/nightingale/v6/pkg/poster"
|
||||
|
||||
"github.com/toolkits/pkg/logger"
|
||||
)
|
||||
|
||||
// webhookClientCache 缓存 http.Client,避免每次请求都创建新的 Client 导致连接泄露
|
||||
var webhookClientCache sync.Map // key: clientKey (string), value: *http.Client
|
||||
|
||||
// 相同配置的 webhook 会复用同一个 Client
|
||||
func getWebhookClient(webhook *models.Webhook) *http.Client {
|
||||
clientKey := webhook.Hash()
|
||||
|
||||
if client, ok := webhookClientCache.Load(clientKey); ok {
|
||||
return client.(*http.Client)
|
||||
}
|
||||
|
||||
// 创建新的 Client
|
||||
transport := &http.Transport{
|
||||
TLSClientConfig: &tls.Config{InsecureSkipVerify: webhook.SkipVerify},
|
||||
MaxIdleConns: 100,
|
||||
MaxIdleConnsPerHost: 10,
|
||||
IdleConnTimeout: 90 * time.Second,
|
||||
}
|
||||
|
||||
if poster.UseProxy(webhook.Url) {
|
||||
transport.Proxy = http.ProxyFromEnvironment
|
||||
}
|
||||
|
||||
timeout := webhook.Timeout
|
||||
if timeout <= 0 {
|
||||
timeout = 10
|
||||
}
|
||||
|
||||
newClient := &http.Client{
|
||||
Timeout: time.Duration(timeout) * time.Second,
|
||||
Transport: transport,
|
||||
}
|
||||
|
||||
// 使用 LoadOrStore 确保并发安全,避免重复创建
|
||||
actual, loaded := webhookClientCache.LoadOrStore(clientKey, newClient)
|
||||
if loaded {
|
||||
return actual.(*http.Client)
|
||||
}
|
||||
|
||||
return newClient
|
||||
}
|
||||
|
||||
func sendWebhook(webhook *models.Webhook, event interface{}, stats *astats.Stats) (bool, string, error) {
|
||||
channel := "webhook"
|
||||
if webhook.Type == models.RuleCallback {
|
||||
@@ -55,25 +98,13 @@ func sendWebhook(webhook *models.Webhook, event interface{}, stats *astats.Stats
|
||||
req.Header.Set(conf.Headers[i], conf.Headers[i+1])
|
||||
}
|
||||
}
|
||||
insecureSkipVerify := false
|
||||
if webhook != nil {
|
||||
insecureSkipVerify = webhook.SkipVerify
|
||||
}
|
||||
|
||||
if conf.Client == nil {
|
||||
logger.Warningf("event_%s, event:%s, url: [%s], error: [%s]", channel, string(bs), conf.Url, "client is nil")
|
||||
conf.Client = &http.Client{
|
||||
Timeout: time.Duration(conf.Timeout) * time.Second,
|
||||
Transport: &http.Transport{
|
||||
TLSClientConfig: &tls.Config{InsecureSkipVerify: insecureSkipVerify},
|
||||
},
|
||||
}
|
||||
}
|
||||
// 使用全局 Client 缓存,避免每次请求都创建新的 Client 导致连接泄露
|
||||
client := getWebhookClient(conf)
|
||||
|
||||
stats.AlertNotifyTotal.WithLabelValues(channel).Inc()
|
||||
var resp *http.Response
|
||||
var body []byte
|
||||
resp, err = conf.Client.Do(req)
|
||||
resp, err = client.Do(req)
|
||||
|
||||
if err != nil {
|
||||
stats.AlertNotifyErrorTotal.WithLabelValues(channel).Inc()
|
||||
|
||||
@@ -55,4 +55,10 @@ var Plugins = []Plugin{
|
||||
Type: "opensearch",
|
||||
TypeName: "OpenSearch",
|
||||
},
|
||||
{
|
||||
Id: 10,
|
||||
Category: "logging",
|
||||
Type: "victorialogs",
|
||||
TypeName: "VictoriaLogs",
|
||||
},
|
||||
}
|
||||
|
||||
@@ -211,8 +211,8 @@ func (rt *Router) Config(r *gin.Engine) {
|
||||
pages.GET("/datasource/brief", rt.auth(), rt.user(), rt.datasourceBriefs)
|
||||
pages.POST("/datasource/query", rt.auth(), rt.user(), rt.datasourceQuery)
|
||||
|
||||
pages.POST("/ds-query", rt.auth(), rt.QueryData)
|
||||
pages.POST("/logs-query", rt.auth(), rt.QueryLogV2)
|
||||
pages.POST("/ds-query", rt.auth(), rt.user(), rt.QueryData)
|
||||
pages.POST("/logs-query", rt.auth(), rt.user(), rt.QueryLogV2)
|
||||
|
||||
pages.POST("/tdengine-databases", rt.auth(), rt.tdengineDatabases)
|
||||
pages.POST("/tdengine-tables", rt.auth(), rt.tdengineTables)
|
||||
@@ -569,6 +569,14 @@ func (rt *Router) Config(r *gin.Engine) {
|
||||
pages.GET("/pagerduty-service-list/:id", rt.auth(), rt.user(), rt.pagerDutyNotifyServicesGet)
|
||||
pages.GET("/notify-channel-config", rt.auth(), rt.user(), rt.notifyChannelGetBy)
|
||||
pages.GET("/notify-channel-config/idents", rt.notifyChannelIdentsGet)
|
||||
|
||||
// saved view 查询条件保存相关路由
|
||||
pages.GET("/saved-views", rt.auth(), rt.user(), rt.savedViewGets)
|
||||
pages.POST("/saved-views", rt.auth(), rt.user(), rt.savedViewAdd)
|
||||
pages.PUT("/saved-view/:id", rt.auth(), rt.user(), rt.savedViewPut)
|
||||
pages.DELETE("/saved-view/:id", rt.auth(), rt.user(), rt.savedViewDel)
|
||||
pages.POST("/saved-view/:id/favorite", rt.auth(), rt.user(), rt.savedViewFavoriteAdd)
|
||||
pages.DELETE("/saved-view/:id/favorite", rt.auth(), rt.user(), rt.savedViewFavoriteDel)
|
||||
}
|
||||
|
||||
r.GET("/api/n9e/versions", func(c *gin.Context) {
|
||||
|
||||
@@ -10,6 +10,7 @@ import (
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/datasource/opensearch"
|
||||
"github.com/ccfos/nightingale/v6/dskit/clickhouse"
|
||||
@@ -229,6 +230,37 @@ func (rt *Router) datasourceUpsert(c *gin.Context) {
|
||||
}
|
||||
}
|
||||
|
||||
if req.PluginType == models.ELASTICSEARCH {
|
||||
skipAuto := false
|
||||
// 若用户输入了version(version字符串存在且不为空),则不自动获取
|
||||
if req.SettingsJson != nil {
|
||||
if v, ok := req.SettingsJson["version"]; ok {
|
||||
switch vv := v.(type) {
|
||||
case string:
|
||||
if strings.TrimSpace(vv) != "" {
|
||||
skipAuto = true
|
||||
}
|
||||
default:
|
||||
if strings.TrimSpace(fmt.Sprint(vv)) != "" {
|
||||
skipAuto = true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if !skipAuto {
|
||||
version, err := getElasticsearchVersion(req, 10*time.Second)
|
||||
if err != nil {
|
||||
logger.Warningf("failed to get elasticsearch version: %v", err)
|
||||
} else {
|
||||
if req.SettingsJson == nil {
|
||||
req.SettingsJson = make(map[string]interface{})
|
||||
}
|
||||
req.SettingsJson["version"] = version
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if req.Id == 0 {
|
||||
req.CreatedBy = username
|
||||
req.Status = "enabled"
|
||||
@@ -423,3 +455,82 @@ func (rt *Router) datasourceQuery(c *gin.Context) {
|
||||
}
|
||||
ginx.NewRender(c).Data(req, err)
|
||||
}
|
||||
|
||||
// getElasticsearchVersion 该函数尝试从提供的Elasticsearch数据源中获取版本号,遍历所有URL,
|
||||
// 直到成功获取版本号或所有URL均尝试失败为止。
|
||||
func getElasticsearchVersion(ds models.Datasource, timeout time.Duration) (string, error) {
|
||||
client := &http.Client{
|
||||
Timeout: timeout,
|
||||
Transport: &http.Transport{
|
||||
TLSClientConfig: &tls.Config{
|
||||
InsecureSkipVerify: ds.HTTPJson.TLS.SkipTlsVerify,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
urls := make([]string, 0)
|
||||
if len(ds.HTTPJson.Urls) > 0 {
|
||||
urls = append(urls, ds.HTTPJson.Urls...)
|
||||
}
|
||||
if ds.HTTPJson.Url != "" {
|
||||
urls = append(urls, ds.HTTPJson.Url)
|
||||
}
|
||||
if len(urls) == 0 {
|
||||
return "", fmt.Errorf("no url provided")
|
||||
}
|
||||
|
||||
var lastErr error
|
||||
for _, raw := range urls {
|
||||
baseURL := strings.TrimRight(raw, "/") + "/"
|
||||
req, err := http.NewRequest("GET", baseURL, nil)
|
||||
if err != nil {
|
||||
lastErr = err
|
||||
continue
|
||||
}
|
||||
|
||||
if ds.AuthJson.BasicAuthUser != "" {
|
||||
req.SetBasicAuth(ds.AuthJson.BasicAuthUser, ds.AuthJson.BasicAuthPassword)
|
||||
}
|
||||
|
||||
for k, v := range ds.HTTPJson.Headers {
|
||||
req.Header.Set(k, v)
|
||||
}
|
||||
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
lastErr = err
|
||||
continue
|
||||
}
|
||||
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
resp.Body.Close()
|
||||
if err != nil {
|
||||
lastErr = err
|
||||
continue
|
||||
}
|
||||
|
||||
if resp.StatusCode != 200 {
|
||||
lastErr = fmt.Errorf("request to %s failed with status: %d body:%s", baseURL, resp.StatusCode, string(body))
|
||||
continue
|
||||
}
|
||||
|
||||
var result map[string]interface{}
|
||||
if err := json.Unmarshal(body, &result); err != nil {
|
||||
lastErr = err
|
||||
continue
|
||||
}
|
||||
|
||||
if version, ok := result["version"].(map[string]interface{}); ok {
|
||||
if number, ok := version["number"].(string); ok && number != "" {
|
||||
return number, nil
|
||||
}
|
||||
}
|
||||
|
||||
lastErr = fmt.Errorf("version not found in response from %s", baseURL)
|
||||
}
|
||||
|
||||
if lastErr != nil {
|
||||
return "", lastErr
|
||||
}
|
||||
return "", fmt.Errorf("failed to get elasticsearch version")
|
||||
}
|
||||
|
||||
@@ -1,18 +1,23 @@
|
||||
package router
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sort"
|
||||
"sync"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/dscache"
|
||||
"github.com/ccfos/nightingale/v6/dskit/doris"
|
||||
"github.com/ccfos/nightingale/v6/models"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/toolkits/pkg/ginx"
|
||||
"github.com/toolkits/pkg/logger"
|
||||
)
|
||||
|
||||
func CheckDsPerm(c *gin.Context, dsId int64, cate string, q interface{}) bool {
|
||||
type CheckDsPermFunc func(c *gin.Context, dsId int64, cate string, q interface{}) bool
|
||||
|
||||
var CheckDsPerm CheckDsPermFunc = func(c *gin.Context, dsId int64, cate string, q interface{}) bool {
|
||||
// todo: 后续需要根据 cate 判断是否需要权限
|
||||
return true
|
||||
}
|
||||
@@ -107,10 +112,13 @@ func (rt *Router) QueryLogBatch(c *gin.Context) {
|
||||
}
|
||||
|
||||
func QueryDataConcurrently(anonymousAccess bool, ctx *gin.Context, f models.QueryParam) ([]models.DataResp, error) {
|
||||
var resp []models.DataResp
|
||||
var mu sync.Mutex
|
||||
var wg sync.WaitGroup
|
||||
var errs []error
|
||||
var (
|
||||
resp []models.DataResp
|
||||
mu sync.Mutex
|
||||
wg sync.WaitGroup
|
||||
errs []error
|
||||
rCtx = ctx.Request.Context()
|
||||
)
|
||||
|
||||
for _, q := range f.Queries {
|
||||
if !anonymousAccess && !CheckDsPerm(ctx, f.DatasourceId, f.Cate, q) {
|
||||
@@ -122,12 +130,17 @@ func QueryDataConcurrently(anonymousAccess bool, ctx *gin.Context, f models.Quer
|
||||
logger.Warningf("cluster:%d not exists", f.DatasourceId)
|
||||
return nil, fmt.Errorf("cluster not exists")
|
||||
}
|
||||
|
||||
vCtx := rCtx
|
||||
if f.Cate == models.DORIS {
|
||||
vCtx = context.WithValue(vCtx, doris.NoNeedCheckMaxRow, true)
|
||||
}
|
||||
|
||||
wg.Add(1)
|
||||
go func(query interface{}) {
|
||||
defer wg.Done()
|
||||
|
||||
data, err := plug.QueryData(ctx.Request.Context(), query)
|
||||
data, err := plug.QueryData(vCtx, query)
|
||||
if err != nil {
|
||||
logger.Warningf("query data error: req:%+v err:%v", query, err)
|
||||
mu.Lock()
|
||||
|
||||
144
center/router/router_saved_view.go
Normal file
144
center/router/router_saved_view.go
Normal file
@@ -0,0 +1,144 @@
|
||||
package router
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/models"
|
||||
"github.com/ccfos/nightingale/v6/pkg/slice"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/toolkits/pkg/ginx"
|
||||
)
|
||||
|
||||
func (rt *Router) savedViewGets(c *gin.Context) {
|
||||
page := ginx.QueryStr(c, "page", "")
|
||||
|
||||
me := c.MustGet("user").(*models.User)
|
||||
|
||||
lst, err := models.SavedViewGets(rt.Ctx, page)
|
||||
if err != nil {
|
||||
ginx.NewRender(c).Data(nil, err)
|
||||
return
|
||||
}
|
||||
|
||||
userGids, err := models.MyGroupIds(rt.Ctx, me.Id)
|
||||
if err != nil {
|
||||
ginx.NewRender(c).Data(nil, err)
|
||||
return
|
||||
}
|
||||
|
||||
favoriteMap, err := models.SavedViewFavoriteGetByUserId(rt.Ctx, me.Id)
|
||||
if err != nil {
|
||||
ginx.NewRender(c).Data(nil, err)
|
||||
return
|
||||
}
|
||||
|
||||
favoriteViews := make([]models.SavedView, 0)
|
||||
normalViews := make([]models.SavedView, 0)
|
||||
|
||||
for _, view := range lst {
|
||||
visible := view.CreateBy == me.Username ||
|
||||
view.PublicCate == 2 ||
|
||||
(view.PublicCate == 1 && slice.HaveIntersection[int64](userGids, view.Gids))
|
||||
|
||||
if !visible {
|
||||
continue
|
||||
}
|
||||
|
||||
view.IsFavorite = favoriteMap[view.Id]
|
||||
|
||||
// 收藏的排前面
|
||||
if view.IsFavorite {
|
||||
favoriteViews = append(favoriteViews, view)
|
||||
} else {
|
||||
normalViews = append(normalViews, view)
|
||||
}
|
||||
}
|
||||
|
||||
ginx.NewRender(c).Data(append(favoriteViews, normalViews...), nil)
|
||||
}
|
||||
|
||||
func (rt *Router) savedViewAdd(c *gin.Context) {
|
||||
var f models.SavedView
|
||||
ginx.BindJSON(c, &f)
|
||||
|
||||
me := c.MustGet("user").(*models.User)
|
||||
f.Id = 0
|
||||
f.CreateBy = me.Username
|
||||
f.UpdateBy = me.Username
|
||||
|
||||
err := models.SavedViewAdd(rt.Ctx, &f)
|
||||
ginx.NewRender(c).Data(f.Id, err)
|
||||
}
|
||||
|
||||
func (rt *Router) savedViewPut(c *gin.Context) {
|
||||
id := ginx.UrlParamInt64(c, "id")
|
||||
|
||||
view, err := models.SavedViewGetById(rt.Ctx, id)
|
||||
if err != nil {
|
||||
ginx.NewRender(c).Data(nil, err)
|
||||
return
|
||||
}
|
||||
if view == nil {
|
||||
ginx.NewRender(c, http.StatusNotFound).Message("saved view not found")
|
||||
return
|
||||
}
|
||||
|
||||
me := c.MustGet("user").(*models.User)
|
||||
// 只有创建者可以更新
|
||||
if view.CreateBy != me.Username && !me.IsAdmin() {
|
||||
ginx.NewRender(c, http.StatusForbidden).Message("forbidden")
|
||||
return
|
||||
}
|
||||
|
||||
var f models.SavedView
|
||||
ginx.BindJSON(c, &f)
|
||||
|
||||
view.Name = f.Name
|
||||
view.Filter = f.Filter
|
||||
view.PublicCate = f.PublicCate
|
||||
view.Gids = f.Gids
|
||||
|
||||
err = models.SavedViewUpdate(rt.Ctx, view, me.Username)
|
||||
ginx.NewRender(c).Message(err)
|
||||
}
|
||||
|
||||
func (rt *Router) savedViewDel(c *gin.Context) {
|
||||
id := ginx.UrlParamInt64(c, "id")
|
||||
|
||||
view, err := models.SavedViewGetById(rt.Ctx, id)
|
||||
if err != nil {
|
||||
ginx.NewRender(c).Data(nil, err)
|
||||
return
|
||||
}
|
||||
if view == nil {
|
||||
ginx.NewRender(c, http.StatusNotFound).Message("saved view not found")
|
||||
return
|
||||
}
|
||||
|
||||
me := c.MustGet("user").(*models.User)
|
||||
// 只有创建者或管理员可以删除
|
||||
if view.CreateBy != me.Username && !me.IsAdmin() {
|
||||
ginx.NewRender(c, http.StatusForbidden).Message("forbidden")
|
||||
return
|
||||
}
|
||||
|
||||
err = models.SavedViewDel(rt.Ctx, id)
|
||||
ginx.NewRender(c).Message(err)
|
||||
}
|
||||
|
||||
func (rt *Router) savedViewFavoriteAdd(c *gin.Context) {
|
||||
id := ginx.UrlParamInt64(c, "id")
|
||||
me := c.MustGet("user").(*models.User)
|
||||
|
||||
err := models.UserViewFavoriteAdd(rt.Ctx, id, me.Id)
|
||||
ginx.NewRender(c).Message(err)
|
||||
}
|
||||
|
||||
func (rt *Router) savedViewFavoriteDel(c *gin.Context) {
|
||||
id := ginx.UrlParamInt64(c, "id")
|
||||
me := c.MustGet("user").(*models.User)
|
||||
|
||||
err := models.UserViewFavoriteDel(rt.Ctx, id, me.Id)
|
||||
ginx.NewRender(c).Message(err)
|
||||
}
|
||||
@@ -26,6 +26,10 @@ const (
|
||||
FieldId FixedField = "_id"
|
||||
)
|
||||
|
||||
// LabelSeparator 用于分隔多个标签的分隔符
|
||||
// 使用 ASCII 控制字符 Record Separator (0x1E),避免与用户数据中的 "--" 冲突
|
||||
const LabelSeparator = "\x1e"
|
||||
|
||||
type Query struct {
|
||||
Ref string `json:"ref" mapstructure:"ref"`
|
||||
IndexType string `json:"index_type" mapstructure:"index_type"` // 普通索引:index 索引模式:index_pattern
|
||||
@@ -128,7 +132,7 @@ func TransferData(metric, ref string, m map[string][][]float64) []models.DataRes
|
||||
}
|
||||
|
||||
data.Metric["__name__"] = model.LabelValue(metric)
|
||||
labels := strings.Split(k, "--")
|
||||
labels := strings.Split(k, LabelSeparator)
|
||||
for _, label := range labels {
|
||||
arr := strings.SplitN(label, "=", 2)
|
||||
if len(arr) == 2 {
|
||||
@@ -197,7 +201,7 @@ func GetBuckets(labelKey string, keys []string, arr []interface{}, metrics *Metr
|
||||
case json.Number, string:
|
||||
if !getTs {
|
||||
if labels != "" {
|
||||
newlabels = fmt.Sprintf("%s--%s=%v", labels, labelKey, keyValue)
|
||||
newlabels = fmt.Sprintf("%s%s%s=%v", labels, LabelSeparator, labelKey, keyValue)
|
||||
} else {
|
||||
newlabels = fmt.Sprintf("%s=%v", labelKey, keyValue)
|
||||
}
|
||||
|
||||
@@ -67,6 +67,13 @@ func init() {
|
||||
PluginType: "pgsql",
|
||||
PluginTypeName: "PostgreSQL",
|
||||
}
|
||||
|
||||
DatasourceTypes[7] = DatasourceType{
|
||||
Id: 7,
|
||||
Category: "logging",
|
||||
PluginType: "victorialogs",
|
||||
PluginTypeName: "VictoriaLogs",
|
||||
}
|
||||
}
|
||||
|
||||
type NewDatasourceFn func(settings map[string]interface{}) (Datasource, error)
|
||||
|
||||
@@ -4,12 +4,13 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/datasource"
|
||||
"github.com/ccfos/nightingale/v6/dskit/doris"
|
||||
"github.com/ccfos/nightingale/v6/dskit/types"
|
||||
"github.com/ccfos/nightingale/v6/pkg/macros"
|
||||
"github.com/ccfos/nightingale/v6/models"
|
||||
"github.com/ccfos/nightingale/v6/pkg/macros"
|
||||
|
||||
"github.com/mitchellh/mapstructure"
|
||||
"github.com/toolkits/pkg/logger"
|
||||
@@ -38,6 +39,8 @@ type QueryParam struct {
|
||||
To int64 `json:"to" mapstructure:"to"`
|
||||
TimeField string `json:"time_field" mapstructure:"time_field"`
|
||||
TimeFormat string `json:"time_format" mapstructure:"time_format"`
|
||||
Interval int64 `json:"interval" mapstructure:"interval"` // 查询时间间隔(秒)
|
||||
Offset int `json:"offset" mapstructure:"offset"` // 延迟计算,不在使用通用配置delay
|
||||
}
|
||||
|
||||
func (d *Doris) InitClient() error {
|
||||
@@ -146,13 +149,46 @@ func (d *Doris) QueryData(ctx context.Context, query interface{}) ([]models.Data
|
||||
return nil, fmt.Errorf("valueKey is required")
|
||||
}
|
||||
|
||||
items, err := d.QueryTimeseries(context.TODO(), &doris.QueryParam{
|
||||
// 设置默认 interval
|
||||
if dorisQueryParam.Interval == 0 {
|
||||
dorisQueryParam.Interval = 60
|
||||
}
|
||||
|
||||
// 计算时间范围
|
||||
now := time.Now().Unix()
|
||||
var start, end int64
|
||||
if dorisQueryParam.To != 0 && dorisQueryParam.From != 0 {
|
||||
end = dorisQueryParam.To
|
||||
start = dorisQueryParam.From
|
||||
} else {
|
||||
end = now
|
||||
start = end - dorisQueryParam.Interval
|
||||
}
|
||||
|
||||
if dorisQueryParam.Offset != 0 {
|
||||
end -= int64(dorisQueryParam.Offset)
|
||||
start -= int64(dorisQueryParam.Offset)
|
||||
}
|
||||
|
||||
dorisQueryParam.From = start
|
||||
dorisQueryParam.To = end
|
||||
|
||||
if strings.Contains(dorisQueryParam.SQL, "$__") {
|
||||
var err error
|
||||
dorisQueryParam.SQL, err = macros.Macro(dorisQueryParam.SQL, dorisQueryParam.From, dorisQueryParam.To)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
items, err := d.QueryTimeseries(ctx, &doris.QueryParam{
|
||||
Database: dorisQueryParam.Database,
|
||||
Sql: dorisQueryParam.SQL,
|
||||
Keys: types.Keys{
|
||||
ValueKey: dorisQueryParam.Keys.ValueKey,
|
||||
LabelKey: dorisQueryParam.Keys.LabelKey,
|
||||
TimeKey: dorisQueryParam.Keys.TimeKey,
|
||||
Offset: dorisQueryParam.Offset,
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
@@ -180,6 +216,18 @@ func (d *Doris) QueryLog(ctx context.Context, query interface{}) ([]interface{},
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
// 记录规则预览场景下,只传了interval, 没有传From和To
|
||||
now := time.Now().Unix()
|
||||
if dorisQueryParam.To == 0 && dorisQueryParam.From == 0 && dorisQueryParam.Interval != 0 {
|
||||
dorisQueryParam.To = now
|
||||
dorisQueryParam.From = now - dorisQueryParam.Interval
|
||||
}
|
||||
|
||||
if dorisQueryParam.Offset != 0 {
|
||||
dorisQueryParam.To -= int64(dorisQueryParam.Offset)
|
||||
dorisQueryParam.From -= int64(dorisQueryParam.Offset)
|
||||
}
|
||||
|
||||
if strings.Contains(dorisQueryParam.SQL, "$__") {
|
||||
var err error
|
||||
dorisQueryParam.SQL, err = macros.Macro(dorisQueryParam.SQL, dorisQueryParam.From, dorisQueryParam.To)
|
||||
|
||||
@@ -110,25 +110,6 @@ func (e *Elasticsearch) InitClient() error {
|
||||
return err
|
||||
}
|
||||
|
||||
if e.Client != nil {
|
||||
for _, addr := range e.Nodes {
|
||||
if addr == "" {
|
||||
continue
|
||||
}
|
||||
if ver, verr := e.Client.ElasticsearchVersion(addr); verr == nil {
|
||||
logger.Infof("detected elasticsearch version from %s: %s", addr, ver)
|
||||
e.Version = ver
|
||||
e.Addr = addr
|
||||
break
|
||||
} else {
|
||||
logger.Debugf("detect version failed from %s: %v", addr, verr)
|
||||
}
|
||||
}
|
||||
if e.Version == "" {
|
||||
logger.Warning("failed to detect elasticsearch version from configured nodes, keep configured version")
|
||||
}
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -190,10 +171,6 @@ func (e *Elasticsearch) Validate(ctx context.Context) (err error) {
|
||||
e.Timeout = 60000
|
||||
}
|
||||
|
||||
if !strings.HasPrefix(e.Version, "6") && !strings.HasPrefix(e.Version, "7") {
|
||||
return fmt.Errorf("version must be 6.0+ or 7.0+")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
339
datasource/victorialogs/victorialogs.go
Normal file
339
datasource/victorialogs/victorialogs.go
Normal file
@@ -0,0 +1,339 @@
|
||||
package victorialogs
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/datasource"
|
||||
"github.com/ccfos/nightingale/v6/dskit/victorialogs"
|
||||
"github.com/ccfos/nightingale/v6/models"
|
||||
|
||||
"github.com/mitchellh/mapstructure"
|
||||
"github.com/prometheus/common/model"
|
||||
)
|
||||
|
||||
const (
|
||||
VictoriaLogsType = "victorialogs"
|
||||
)
|
||||
|
||||
// VictoriaLogs 数据源实现
|
||||
type VictoriaLogs struct {
|
||||
victorialogs.VictoriaLogs `json:",inline" mapstructure:",squash"`
|
||||
}
|
||||
|
||||
// Query 查询参数
|
||||
type Query struct {
|
||||
Query string `json:"query" mapstructure:"query"` // LogsQL 查询语句
|
||||
Start int64 `json:"start" mapstructure:"start"` // 开始时间(秒)
|
||||
End int64 `json:"end" mapstructure:"end"` // 结束时间(秒)
|
||||
Time int64 `json:"time" mapstructure:"time"` // 单点时间(秒)- 用于告警
|
||||
Step string `json:"step" mapstructure:"step"` // 步长,如 "1m", "5m"
|
||||
Limit int `json:"limit" mapstructure:"limit"` // 限制返回数量
|
||||
Ref string `json:"ref" mapstructure:"ref"` // 变量引用名(如 A、B)
|
||||
}
|
||||
|
||||
// IsInstantQuery 判断是否为即时查询(告警场景)
|
||||
func (q *Query) IsInstantQuery() bool {
|
||||
return q.Time > 0 || (q.Start >= 0 && q.Start == q.End)
|
||||
}
|
||||
|
||||
// init registers this implementation under the "victorialogs" type so the
// datasource framework can instantiate it by name.
func init() {
	datasource.RegisterDatasource(VictoriaLogsType, new(VictoriaLogs))
}
|
||||
|
||||
// Init 初始化配置
|
||||
func (vl *VictoriaLogs) Init(settings map[string]interface{}) (datasource.Datasource, error) {
|
||||
newest := new(VictoriaLogs)
|
||||
err := mapstructure.Decode(settings, newest)
|
||||
return newest, err
|
||||
}
|
||||
|
||||
// InitClient 初始化客户端
|
||||
func (vl *VictoriaLogs) InitClient() error {
|
||||
if err := vl.InitHTTPClient(); err != nil {
|
||||
return fmt.Errorf("failed to init victorialogs http client: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Validate 参数验证
|
||||
func (vl *VictoriaLogs) Validate(ctx context.Context) error {
|
||||
if vl.VictorialogsAddr == "" {
|
||||
return fmt.Errorf("victorialogs.addr is required")
|
||||
}
|
||||
|
||||
// 验证 URL 格式
|
||||
_, err := url.Parse(vl.VictorialogsAddr)
|
||||
if err != nil {
|
||||
return fmt.Errorf("invalid victorialogs.addr: %w", err)
|
||||
}
|
||||
|
||||
// 必须同时提供用户名和密码
|
||||
if (vl.VictorialogsBasic.VictorialogsUser != "" && vl.VictorialogsBasic.VictorialogsPass == "") ||
|
||||
(vl.VictorialogsBasic.VictorialogsUser == "" && vl.VictorialogsBasic.VictorialogsPass != "") {
|
||||
return fmt.Errorf("both username and password must be provided")
|
||||
}
|
||||
|
||||
// 设置默认值
|
||||
if vl.Timeout == 0 {
|
||||
vl.Timeout = 10000 // 默认 10 秒
|
||||
}
|
||||
|
||||
if vl.MaxQueryRows == 0 {
|
||||
vl.MaxQueryRows = 1000
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Equal 验证是否相等
|
||||
func (vl *VictoriaLogs) Equal(other datasource.Datasource) bool {
|
||||
o, ok := other.(*VictoriaLogs)
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
|
||||
return vl.VictorialogsAddr == o.VictorialogsAddr &&
|
||||
vl.VictorialogsBasic.VictorialogsUser == o.VictorialogsBasic.VictorialogsUser &&
|
||||
vl.VictorialogsBasic.VictorialogsPass == o.VictorialogsBasic.VictorialogsPass &&
|
||||
vl.VictorialogsTls.SkipTlsVerify == o.VictorialogsTls.SkipTlsVerify &&
|
||||
vl.Timeout == o.Timeout &&
|
||||
reflect.DeepEqual(vl.Headers, o.Headers)
|
||||
}
|
||||
|
||||
// QueryLog 日志查询
|
||||
func (vl *VictoriaLogs) QueryLog(ctx context.Context, queryParam interface{}) ([]interface{}, int64, error) {
|
||||
param := new(Query)
|
||||
if err := mapstructure.Decode(queryParam, param); err != nil {
|
||||
return nil, 0, fmt.Errorf("decode query param failed: %w", err)
|
||||
}
|
||||
|
||||
logs, err := vl.Query(ctx, param.Query, param.Start, param.End, param.Limit)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
// 转换为 interface{} 数组
|
||||
result := make([]interface{}, len(logs))
|
||||
for i, log := range logs {
|
||||
result[i] = log
|
||||
}
|
||||
|
||||
// 调用 HitsLogs 获取真实的 total
|
||||
total, err := vl.HitsLogs(ctx, param.Query, param.Start, param.End)
|
||||
if err != nil {
|
||||
// 如果获取 total 失败,使用当前结果数量
|
||||
total = int64(len(logs))
|
||||
}
|
||||
|
||||
return result, total, nil
|
||||
}
|
||||
|
||||
// QueryData 指标数据查询
|
||||
func (vl *VictoriaLogs) QueryData(ctx context.Context, queryParam interface{}) ([]models.DataResp, error) {
|
||||
param := new(Query)
|
||||
if err := mapstructure.Decode(queryParam, param); err != nil {
|
||||
return nil, fmt.Errorf("decode query param failed: %w", err)
|
||||
}
|
||||
|
||||
// 判断使用哪个 API
|
||||
if param.IsInstantQuery() {
|
||||
return vl.queryDataInstant(ctx, param)
|
||||
}
|
||||
return vl.queryDataRange(ctx, param)
|
||||
}
|
||||
|
||||
// queryDataInstant 告警场景,调用 /select/logsql/stats_query
|
||||
func (vl *VictoriaLogs) queryDataInstant(ctx context.Context, param *Query) ([]models.DataResp, error) {
|
||||
queryTime := param.Time
|
||||
if queryTime == 0 {
|
||||
queryTime = param.End // 如果没有 time,使用 end 作为查询时间点
|
||||
}
|
||||
if queryTime == 0 {
|
||||
queryTime = time.Now().Unix()
|
||||
}
|
||||
|
||||
result, err := vl.StatsQuery(ctx, param.Query, queryTime)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return convertPrometheusInstantToDataResp(result, param.Ref), nil
|
||||
}
|
||||
|
||||
// queryDataRange 看图场景,调用 /select/logsql/stats_query_range
|
||||
func (vl *VictoriaLogs) queryDataRange(ctx context.Context, param *Query) ([]models.DataResp, error) {
|
||||
step := param.Step
|
||||
if step == "" {
|
||||
// 根据时间范围计算合适的步长
|
||||
duration := param.End - param.Start
|
||||
if duration <= 3600 {
|
||||
step = "1m" // 1 小时内,1 分钟步长
|
||||
} else if duration <= 86400 {
|
||||
step = "5m" // 1 天内,5 分钟步长
|
||||
} else {
|
||||
step = "1h" // 超过 1 天,1 小时步长
|
||||
}
|
||||
}
|
||||
|
||||
result, err := vl.StatsQueryRange(ctx, param.Query, param.Start, param.End, step)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return convertPrometheusRangeToDataResp(result, param.Ref), nil
|
||||
}
|
||||
|
||||
// convertPrometheusInstantToDataResp 将 Prometheus Instant Query 格式转换为 DataResp
|
||||
func convertPrometheusInstantToDataResp(resp *victorialogs.PrometheusResponse, ref string) []models.DataResp {
|
||||
var dataResps []models.DataResp
|
||||
|
||||
for _, item := range resp.Data.Result {
|
||||
dataResp := models.DataResp{
|
||||
Ref: ref,
|
||||
}
|
||||
|
||||
// 转换 Metric
|
||||
dataResp.Metric = make(model.Metric)
|
||||
for k, v := range item.Metric {
|
||||
dataResp.Metric[model.LabelName(k)] = model.LabelValue(v)
|
||||
}
|
||||
|
||||
if len(item.Value) == 2 {
|
||||
// [timestamp, value]
|
||||
timestamp := item.Value[0].(float64)
|
||||
value, _ := strconv.ParseFloat(item.Value[1].(string), 64)
|
||||
|
||||
dataResp.Values = [][]float64{
|
||||
{timestamp, value},
|
||||
}
|
||||
}
|
||||
|
||||
dataResps = append(dataResps, dataResp)
|
||||
}
|
||||
|
||||
return dataResps
|
||||
}
|
||||
|
||||
// convertPrometheusRangeToDataResp 将 Prometheus Range Query 格式转换为 DataResp
|
||||
func convertPrometheusRangeToDataResp(resp *victorialogs.PrometheusResponse, ref string) []models.DataResp {
|
||||
var dataResps []models.DataResp
|
||||
|
||||
for _, item := range resp.Data.Result {
|
||||
dataResp := models.DataResp{
|
||||
Ref: ref,
|
||||
}
|
||||
|
||||
// 转换 Metric
|
||||
dataResp.Metric = make(model.Metric)
|
||||
for k, v := range item.Metric {
|
||||
dataResp.Metric[model.LabelName(k)] = model.LabelValue(v)
|
||||
}
|
||||
|
||||
var values [][]float64
|
||||
for _, v := range item.Values {
|
||||
if len(v) == 2 {
|
||||
timestamp := v[0].(float64)
|
||||
value, _ := strconv.ParseFloat(v[1].(string), 64)
|
||||
|
||||
values = append(values, []float64{timestamp, value})
|
||||
}
|
||||
}
|
||||
|
||||
dataResp.Values = values
|
||||
dataResps = append(dataResps, dataResp)
|
||||
}
|
||||
|
||||
return dataResps
|
||||
}
|
||||
|
||||
// MakeLogQuery 构造日志查询参数
|
||||
func (vl *VictoriaLogs) MakeLogQuery(ctx context.Context, query interface{}, eventTags []string, start, end int64) (interface{}, error) {
|
||||
q := &Query{
|
||||
Start: start,
|
||||
End: end,
|
||||
Limit: 1000,
|
||||
}
|
||||
|
||||
// 如果 query 是字符串,直接使用
|
||||
if queryStr, ok := query.(string); ok {
|
||||
q.Query = queryStr
|
||||
} else if queryMap, ok := query.(map[string]interface{}); ok {
|
||||
// 如果是 map,尝试提取 query 字段
|
||||
if qStr, exists := queryMap["query"]; exists {
|
||||
q.Query = fmt.Sprintf("%v", qStr)
|
||||
}
|
||||
if limit, exists := queryMap["limit"]; exists {
|
||||
if limitInt, ok := limit.(int); ok {
|
||||
q.Limit = limitInt
|
||||
} else if limitFloat, ok := limit.(float64); ok {
|
||||
q.Limit = int(limitFloat)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return q, nil
|
||||
}
|
||||
|
||||
// MakeTSQuery 构造时序查询参数
|
||||
func (vl *VictoriaLogs) MakeTSQuery(ctx context.Context, query interface{}, eventTags []string, start, end int64) (interface{}, error) {
|
||||
q := &Query{
|
||||
Start: start,
|
||||
End: end,
|
||||
}
|
||||
|
||||
// 如果 query 是字符串,直接使用
|
||||
if queryStr, ok := query.(string); ok {
|
||||
q.Query = queryStr
|
||||
} else if queryMap, ok := query.(map[string]interface{}); ok {
|
||||
// 如果是 map,提取相关字段
|
||||
if qStr, exists := queryMap["query"]; exists {
|
||||
q.Query = fmt.Sprintf("%v", qStr)
|
||||
}
|
||||
if step, exists := queryMap["step"]; exists {
|
||||
q.Step = fmt.Sprintf("%v", step)
|
||||
}
|
||||
}
|
||||
|
||||
return q, nil
|
||||
}
|
||||
|
||||
// QueryMapData 用于告警事件生成时获取额外数据
|
||||
func (vl *VictoriaLogs) QueryMapData(ctx context.Context, query interface{}) ([]map[string]string, error) {
|
||||
param := new(Query)
|
||||
if err := mapstructure.Decode(query, param); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// 扩大查询范围,解决时间滞后问题
|
||||
if param.End > 0 && param.Start > 0 {
|
||||
param.Start = param.Start - 30
|
||||
}
|
||||
|
||||
// 限制只取 1 条
|
||||
param.Limit = 1
|
||||
|
||||
logs, _, err := vl.QueryLog(ctx, param)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var result []map[string]string
|
||||
for _, log := range logs {
|
||||
if logMap, ok := log.(map[string]interface{}); ok {
|
||||
strMap := make(map[string]string)
|
||||
for k, v := range logMap {
|
||||
strMap[k] = fmt.Sprintf("%v", v)
|
||||
}
|
||||
result = append(result, strMap)
|
||||
break // 只取第一条
|
||||
}
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
@@ -5,7 +5,7 @@ WORKDIR /app
|
||||
ADD n9e /app/
|
||||
ADD etc /app/etc/
|
||||
ADD integrations /app/integrations/
|
||||
RUN pip install requests
|
||||
RUN pip install requests Jinja2
|
||||
|
||||
EXPOSE 17000
|
||||
|
||||
|
||||
@@ -57,3 +57,29 @@ func (cs *Cache) Get(cate string, dsId int64) (datasource.Datasource, bool) {
|
||||
|
||||
return cs.datas[cate][dsId], true
|
||||
}
|
||||
|
||||
func (cs *Cache) Delete(cate string, dsId int64) {
|
||||
cs.mutex.Lock()
|
||||
defer cs.mutex.Unlock()
|
||||
if _, found := cs.datas[cate]; !found {
|
||||
return
|
||||
}
|
||||
delete(cs.datas[cate], dsId)
|
||||
|
||||
logger.Debugf("delete plugin:%s %d from cache", cate, dsId)
|
||||
}
|
||||
|
||||
// GetAllIds 返回缓存中所有数据源的 ID,按类型分组
|
||||
func (cs *Cache) GetAllIds() map[string][]int64 {
|
||||
cs.mutex.RLock()
|
||||
defer cs.mutex.RUnlock()
|
||||
result := make(map[string][]int64)
|
||||
for cate, dsMap := range cs.datas {
|
||||
ids := make([]int64, 0, len(dsMap))
|
||||
for dsId := range dsMap {
|
||||
ids = append(ids, dsId)
|
||||
}
|
||||
result[cate] = ids
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
@@ -14,6 +14,7 @@ import (
|
||||
_ "github.com/ccfos/nightingale/v6/datasource/mysql"
|
||||
_ "github.com/ccfos/nightingale/v6/datasource/opensearch"
|
||||
_ "github.com/ccfos/nightingale/v6/datasource/postgresql"
|
||||
_ "github.com/ccfos/nightingale/v6/datasource/victorialogs"
|
||||
"github.com/ccfos/nightingale/v6/dskit/tdengine"
|
||||
"github.com/ccfos/nightingale/v6/models"
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
@@ -172,7 +173,10 @@ func esN9eToDatasourceInfo(ds *datasource.DatasourceInfo, item models.Datasource
|
||||
}
|
||||
|
||||
func PutDatasources(items []datasource.DatasourceInfo) {
|
||||
// 记录当前有效的数据源 ID,按类型分组
|
||||
validIds := make(map[string]map[int64]struct{})
|
||||
ids := make([]int64, 0)
|
||||
|
||||
for _, item := range items {
|
||||
if item.Type == "prometheus" {
|
||||
continue
|
||||
@@ -201,6 +205,12 @@ func PutDatasources(items []datasource.DatasourceInfo) {
|
||||
}
|
||||
ids = append(ids, item.Id)
|
||||
|
||||
// 记录有效的数据源 ID
|
||||
if _, ok := validIds[typ]; !ok {
|
||||
validIds[typ] = make(map[int64]struct{})
|
||||
}
|
||||
validIds[typ][item.Id] = struct{}{}
|
||||
|
||||
// 异步初始化 client 不然数据源同步的会很慢
|
||||
go func() {
|
||||
defer func() {
|
||||
@@ -212,5 +222,19 @@ func PutDatasources(items []datasource.DatasourceInfo) {
|
||||
}()
|
||||
}
|
||||
|
||||
// 删除 items 中不存在但 DsCache 中存在的数据源
|
||||
cachedIds := DsCache.GetAllIds()
|
||||
for cate, dsIds := range cachedIds {
|
||||
for _, dsId := range dsIds {
|
||||
if _, ok := validIds[cate]; !ok {
|
||||
// 该类型在 items 中完全不存在,删除缓存中的所有该类型数据源
|
||||
DsCache.Delete(cate, dsId)
|
||||
} else if _, ok := validIds[cate][dsId]; !ok {
|
||||
// 该数据源 ID 在 items 中不存在,删除
|
||||
DsCache.Delete(cate, dsId)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
logger.Debugf("get plugin by type success Ids:%v", ids)
|
||||
}
|
||||
|
||||
@@ -18,13 +18,21 @@ import (
|
||||
"github.com/mitchellh/mapstructure"
|
||||
)
|
||||
|
||||
const (
|
||||
ShowIndexFieldIndexType = "index_type"
|
||||
ShowIndexFieldColumnName = "column_name"
|
||||
ShowIndexKeyName = "key_name"
|
||||
|
||||
SQLShowIndex = "SHOW INDEX FROM "
|
||||
)
|
||||
|
||||
// Doris struct to hold connection details and the connection object
|
||||
type Doris struct {
|
||||
Addr string `json:"doris.addr" mapstructure:"doris.addr"` // fe mysql endpoint
|
||||
FeAddr string `json:"doris.fe_addr" mapstructure:"doris.fe_addr"` // fe http endpoint
|
||||
User string `json:"doris.user" mapstructure:"doris.user"` //
|
||||
Password string `json:"doris.password" mapstructure:"doris.password"` //
|
||||
Timeout int `json:"doris.timeout" mapstructure:"doris.timeout"`
|
||||
Timeout int `json:"doris.timeout" mapstructure:"doris.timeout"` // ms
|
||||
MaxIdleConns int `json:"doris.max_idle_conns" mapstructure:"doris.max_idle_conns"`
|
||||
MaxOpenConns int `json:"doris.max_open_conns" mapstructure:"doris.max_open_conns"`
|
||||
ConnMaxLifetime int `json:"doris.conn_max_lifetime" mapstructure:"doris.conn_max_lifetime"`
|
||||
@@ -119,7 +127,7 @@ func (d *Doris) createTimeoutContext(ctx context.Context) (context.Context, cont
|
||||
if timeout == 0 {
|
||||
timeout = 60
|
||||
}
|
||||
return context.WithTimeout(ctx, time.Duration(timeout)*time.Second)
|
||||
return context.WithTimeout(ctx, time.Duration(timeout)*time.Millisecond)
|
||||
}
|
||||
|
||||
// ShowDatabases lists all databases in Doris
|
||||
@@ -312,6 +320,88 @@ func (d *Doris) DescTable(ctx context.Context, database, table string) ([]*types
|
||||
return columns, nil
|
||||
}
|
||||
|
||||
// TableIndexInfo describes one row of "SHOW INDEX" output: the indexed
// column, the index (key) name, and the index type.
type TableIndexInfo struct {
	ColumnName string `json:"column_name"`
	IndexName  string `json:"index_name"`
	IndexType  string `json:"index_type"`
}
|
||||
|
||||
// ShowIndexes returns index metadata for the given table via "SHOW INDEX".
// Columns of the result set are matched case-insensitively by header name,
// so differing header casing across server versions is tolerated. Rows
// without a column name are dropped.
func (d *Doris) ShowIndexes(ctx context.Context, database, table string) ([]TableIndexInfo, error) {
	if database == "" || table == "" {
		return nil, fmt.Errorf("database and table names cannot be empty")
	}

	// Apply the configured query timeout.
	tCtx, cancel := d.createTimeoutContext(ctx)
	defer cancel()

	// NOTE(review): the handle is not closed here — presumably NewConn
	// returns a pooled connection owned elsewhere; confirm.
	db, err := d.NewConn(tCtx, database)
	if err != nil {
		return nil, err
	}

	querySQL := fmt.Sprintf("%s `%s`.`%s`", SQLShowIndex, database, table)
	rows, err := db.QueryContext(tCtx, querySQL)
	if err != nil {
		return nil, fmt.Errorf("failed to query indexes: %w", err)
	}
	defer rows.Close()

	columns, err := rows.Columns()
	if err != nil {
		return nil, fmt.Errorf("failed to get columns: %w", err)
	}
	count := len(columns)

	// Pre-map the positions of the three columns of interest; -1 means the
	// column is absent from this server's SHOW INDEX output.
	colIdx := map[string]int{
		ShowIndexKeyName:         -1,
		ShowIndexFieldColumnName: -1,
		ShowIndexFieldIndexType:  -1,
	}
	for i, col := range columns {
		lCol := strings.ToLower(col)
		if lCol == ShowIndexKeyName || lCol == ShowIndexFieldColumnName || lCol == ShowIndexFieldIndexType {
			colIdx[lCol] = i
		}
	}

	var result []TableIndexInfo
	for rows.Next() {
		// sql.RawBytes accepts any column type and converts to bytes,
		// avoiding per-type assertions when scanning.
		scanArgs := make([]interface{}, count)
		values := make([]sql.RawBytes, count)
		for i := range values {
			scanArgs[i] = &values[i]
		}

		if err = rows.Scan(scanArgs...); err != nil {
			return nil, err
		}

		info := TableIndexInfo{}
		if i := colIdx[ShowIndexFieldColumnName]; i != -1 && i < count {
			info.ColumnName = string(values[i])
		}
		if i := colIdx[ShowIndexKeyName]; i != -1 && i < count {
			info.IndexName = string(values[i])
		}
		if i := colIdx[ShowIndexFieldIndexType]; i != -1 && i < count {
			info.IndexType = string(values[i])
		}

		// Rows with no column name carry no useful index info.
		if info.ColumnName != "" {
			result = append(result, info)
		}
	}

	if err = rows.Err(); err != nil {
		return nil, fmt.Errorf("error iterating rows: %w", err)
	}

	return result, nil
}
|
||||
|
||||
// SelectRows selects rows from a specified table in Doris based on a given query with MaxQueryRows check
|
||||
func (d *Doris) SelectRows(ctx context.Context, database, table, query string) ([]map[string]interface{}, error) {
|
||||
sql := fmt.Sprintf("SELECT * FROM %s.%s", database, table)
|
||||
|
||||
@@ -10,13 +10,14 @@ const (
|
||||
TimeseriesAggregationTimestamp = "__ts__"
|
||||
)
|
||||
|
||||
// QueryLogs 查询日志
|
||||
// TODO: 待测试, MAP/ARRAY/STRUCT/JSON 等类型能否处理
|
||||
func (d *Doris) QueryLogs(ctx context.Context, query *QueryParam) ([]map[string]interface{}, error) {
|
||||
// 等同于 Query()
|
||||
return d.Query(ctx, query)
|
||||
return d.Query(ctx, query, true)
|
||||
}
|
||||
|
||||
// 本质是查询时序数据, 取第一组, SQL由上层封装, 不再做复杂的解析和截断
|
||||
// QueryHistogram 本质是查询时序数据, 取第一组, SQL由上层封装, 不再做复杂的解析和截断
|
||||
func (d *Doris) QueryHistogram(ctx context.Context, query *QueryParam) ([][]float64, error) {
|
||||
values, err := d.QueryTimeseries(ctx, query)
|
||||
if err != nil {
|
||||
|
||||
@@ -15,6 +15,10 @@ const (
|
||||
TimeFieldFormatDateTime = "datetime"
|
||||
)
|
||||
|
||||
type noNeedCheckMaxRowKey struct{}
|
||||
|
||||
var NoNeedCheckMaxRow = noNeedCheckMaxRowKey{}
|
||||
|
||||
// 不再拼接SQL, 完全信赖用户的输入
|
||||
type QueryParam struct {
|
||||
Database string `json:"database"`
|
||||
@@ -39,7 +43,7 @@ var (
|
||||
)
|
||||
|
||||
// Query executes a given SQL query in Doris and returns the results with MaxQueryRows check
|
||||
func (d *Doris) Query(ctx context.Context, query *QueryParam) ([]map[string]interface{}, error) {
|
||||
func (d *Doris) Query(ctx context.Context, query *QueryParam, checkMaxRow bool) ([]map[string]interface{}, error) {
|
||||
// 校验SQL的合法性, 过滤掉 write请求
|
||||
sqlItem := strings.Split(strings.ToUpper(query.Sql), " ")
|
||||
for _, item := range sqlItem {
|
||||
@@ -48,10 +52,12 @@ func (d *Doris) Query(ctx context.Context, query *QueryParam) ([]map[string]inte
|
||||
}
|
||||
}
|
||||
|
||||
// 检查查询结果行数
|
||||
err := d.CheckMaxQueryRows(ctx, query.Database, query.Sql)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
if checkMaxRow {
|
||||
// 检查查询结果行数
|
||||
err := d.CheckMaxQueryRows(ctx, query.Database, query.Sql)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
rows, err := d.ExecQuery(ctx, query.Database, query.Sql)
|
||||
@@ -63,8 +69,12 @@ func (d *Doris) Query(ctx context.Context, query *QueryParam) ([]map[string]inte
|
||||
|
||||
// QueryTimeseries executes a time series data query using the given parameters with MaxQueryRows check
|
||||
func (d *Doris) QueryTimeseries(ctx context.Context, query *QueryParam) ([]types.MetricValues, error) {
|
||||
// 使用 Query 方法执行查询,Query方法内部已包含MaxQueryRows检查
|
||||
rows, err := d.Query(ctx, query)
|
||||
// 默认需要检查,除非调用方声明不需要检查
|
||||
checkMaxRow := true
|
||||
if noCheck, ok := ctx.Value(NoNeedCheckMaxRow).(bool); ok && noCheck {
|
||||
checkMaxRow = false
|
||||
}
|
||||
rows, err := d.Query(ctx, query, checkMaxRow)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -158,7 +158,10 @@ func FormatMetricValues(keys types.Keys, rows []map[string]interface{}, ignoreDe
|
||||
}
|
||||
|
||||
if !exists {
|
||||
ts = float64(time.Now().Unix()) // Default to current time if not specified
|
||||
// Default to current time if not specified
|
||||
// 大多数情况下offset为空
|
||||
// 对于记录规则延迟计算的情况,统计值的时间戳需要有偏移,以便跟统计值对应
|
||||
ts = float64(time.Now().Unix()) - float64(keys.Offset)
|
||||
}
|
||||
|
||||
valuePair := []float64{ts, value}
|
||||
@@ -257,6 +260,14 @@ func parseTimeFromString(str, format string) (time.Time, error) {
|
||||
if parsedTime, err := time.Parse(time.RFC3339, str); err == nil {
|
||||
return parsedTime, nil
|
||||
}
|
||||
|
||||
if parsedTime, err := time.Parse(time.DateTime, str); err == nil {
|
||||
return parsedTime, nil
|
||||
}
|
||||
|
||||
if parsedTime, err := time.Parse("2006-01-02 15:04:05.000000", str); err == nil {
|
||||
return parsedTime, nil
|
||||
}
|
||||
if parsedTime, err := time.Parse(time.RFC3339Nano, str); err == nil {
|
||||
return parsedTime, nil
|
||||
}
|
||||
|
||||
@@ -48,4 +48,5 @@ type Keys struct {
|
||||
LabelKey string `json:"labelKey" mapstructure:"labelKey"` // 多个用空格分隔
|
||||
TimeKey string `json:"timeKey" mapstructure:"timeKey"`
|
||||
TimeFormat string `json:"timeFormat" mapstructure:"timeFormat"` // not used anymore
|
||||
Offset int `json:"offset" mapstructure:"offset"`
|
||||
}
|
||||
|
||||
304
dskit/victorialogs/victorialogs.go
Normal file
304
dskit/victorialogs/victorialogs.go
Normal file
@@ -0,0 +1,304 @@
|
||||
package victorialogs
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
// VictoriaLogs is a minimal HTTP client for the VictoriaLogs LogsQL select
// API, configured from flat "victorialogs.*" settings keys.
type VictoriaLogs struct {
	// VictorialogsAddr is the server base URL, e.g. "http://host:9428".
	VictorialogsAddr string `json:"victorialogs.addr" mapstructure:"victorialogs.addr"`
	// VictorialogsBasic holds optional HTTP basic-auth credentials.
	VictorialogsBasic struct {
		VictorialogsUser string `json:"victorialogs.user" mapstructure:"victorialogs.user"`
		VictorialogsPass string `json:"victorialogs.password" mapstructure:"victorialogs.password"`
		// IsEncrypt marks whether the stored password is encrypted.
		// NOTE(review): nothing in this file decrypts it — confirm upstream handling.
		IsEncrypt bool `json:"victorialogs.is_encrypt" mapstructure:"victorialogs.is_encrypt"`
	} `json:"victorialogs.basic" mapstructure:"victorialogs.basic"`
	// VictorialogsTls controls certificate verification for HTTPS endpoints.
	VictorialogsTls struct {
		SkipTlsVerify bool `json:"victorialogs.tls.skip_tls_verify" mapstructure:"victorialogs.tls.skip_tls_verify"`
	} `json:"victorialogs.tls" mapstructure:"victorialogs.tls"`
	// Headers are extra HTTP headers attached to every request.
	Headers map[string]string `json:"victorialogs.headers" mapstructure:"victorialogs.headers"`
	// Timeout is the whole-request timeout in milliseconds (0 = 60s default).
	Timeout     int64  `json:"victorialogs.timeout" mapstructure:"victorialogs.timeout"` // millis
	ClusterName string `json:"victorialogs.cluster_name" mapstructure:"victorialogs.cluster_name"`
	// MaxQueryRows is the default row limit used when a query supplies none.
	MaxQueryRows int `json:"victorialogs.max_query_rows" mapstructure:"victorialogs.max_query_rows"`

	// HTTPClient is built by InitHTTPClient; excluded from (de)serialization.
	HTTPClient *http.Client `json:"-" mapstructure:"-"`
}
|
||||
|
||||
// LogEntry is one decoded NDJSON log record, keyed by field name.
type LogEntry map[string]interface{}

// PrometheusResponse mirrors the Prometheus-compatible envelope returned by
// the stats_query / stats_query_range endpoints.
type PrometheusResponse struct {
	Status string         `json:"status"` // "success" or an error status
	Data   PrometheusData `json:"data"`
	Error  string         `json:"error,omitempty"` // set when Status != "success"
}

// PrometheusData is the payload portion of a PrometheusResponse.
type PrometheusData struct {
	ResultType string           `json:"resultType"`
	Result     []PrometheusItem `json:"result"`
}

// PrometheusItem is one labelled series in a response; Value is populated
// for instant queries and Values for range queries.
type PrometheusItem struct {
	Metric map[string]string `json:"metric"`
	Value  []interface{}     `json:"value,omitempty"`  // [timestamp, value]
	Values [][]interface{}   `json:"values,omitempty"` // [[timestamp, value], ...]
}

// HitsResult is the response shape of /select/logsql/hits; each bucket
// carries the number of matching log entries.
type HitsResult struct {
	Hits []struct {
		Total int64 `json:"total"`
	}
}
|
||||
|
||||
// InitHTTPClient 初始化 HTTP 客户端
|
||||
func (vl *VictoriaLogs) InitHTTPClient() error {
|
||||
transport := &http.Transport{
|
||||
MaxIdleConns: 100,
|
||||
MaxIdleConnsPerHost: 10,
|
||||
IdleConnTimeout: 90 * time.Second,
|
||||
TLSClientConfig: &tls.Config{
|
||||
InsecureSkipVerify: vl.VictorialogsTls.SkipTlsVerify,
|
||||
},
|
||||
}
|
||||
|
||||
timeout := time.Duration(vl.Timeout) * time.Millisecond
|
||||
if timeout == 0 {
|
||||
timeout = 60 * time.Second
|
||||
}
|
||||
|
||||
vl.HTTPClient = &http.Client{
|
||||
Transport: transport,
|
||||
Timeout: timeout,
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Query 执行日志查询
|
||||
// GET/POST /select/logsql/query?query=<query>&start=<start>&end=<end>&limit=<limit>
|
||||
func (vl *VictoriaLogs) Query(ctx context.Context, query string, start, end int64, limit int) ([]LogEntry, error) {
|
||||
params := url.Values{}
|
||||
params.Set("query", query)
|
||||
|
||||
if start > 0 {
|
||||
params.Set("start", strconv.FormatInt(start, 10))
|
||||
}
|
||||
if end > 0 {
|
||||
params.Set("end", strconv.FormatInt(end, 10))
|
||||
}
|
||||
if limit > 0 {
|
||||
params.Set("limit", strconv.Itoa(limit))
|
||||
} else {
|
||||
params.Set("limit", strconv.Itoa(vl.MaxQueryRows)) // 默认 1000 条
|
||||
}
|
||||
|
||||
endpoint := fmt.Sprintf("%s/select/logsql/query", vl.VictorialogsAddr)
|
||||
|
||||
resp, err := vl.doRequest(ctx, "POST", endpoint, params)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("read response body failed: %w", err)
|
||||
}
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return nil, fmt.Errorf("query failed: status=%d, body=%s", resp.StatusCode, string(body))
|
||||
}
|
||||
|
||||
// VictoriaLogs returns NDJSON format (one JSON object per line)
|
||||
var logs []LogEntry
|
||||
scanner := bufio.NewScanner(strings.NewReader(string(body)))
|
||||
for scanner.Scan() {
|
||||
line := scanner.Text()
|
||||
if line == "" {
|
||||
continue
|
||||
}
|
||||
var entry LogEntry
|
||||
if err := json.Unmarshal([]byte(line), &entry); err != nil {
|
||||
return nil, fmt.Errorf("decode log entry failed: %w, line=%s", err, line)
|
||||
}
|
||||
logs = append(logs, entry)
|
||||
}
|
||||
if err := scanner.Err(); err != nil {
|
||||
return nil, fmt.Errorf("scan response failed: %w", err)
|
||||
}
|
||||
|
||||
return logs, nil
|
||||
}
|
||||
|
||||
// StatsQuery 执行统计查询(单点时间)
|
||||
// POST /select/logsql/stats_query?query=<query>&time=<time>
|
||||
func (vl *VictoriaLogs) StatsQuery(ctx context.Context, query string, time int64) (*PrometheusResponse, error) {
|
||||
params := url.Values{}
|
||||
params.Set("query", query)
|
||||
|
||||
if time > 0 {
|
||||
params.Set("time", strconv.FormatInt(time, 10))
|
||||
}
|
||||
|
||||
endpoint := fmt.Sprintf("%s/select/logsql/stats_query", vl.VictorialogsAddr)
|
||||
|
||||
resp, err := vl.doRequest(ctx, "POST", endpoint, params)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("read response body failed: %w", err)
|
||||
}
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return nil, fmt.Errorf("stats query failed: status=%d, body=%s", resp.StatusCode, string(body))
|
||||
}
|
||||
|
||||
var result PrometheusResponse
|
||||
if err := json.Unmarshal(body, &result); err != nil {
|
||||
return nil, fmt.Errorf("decode response failed: %w, body=%s", err, string(body))
|
||||
}
|
||||
|
||||
if result.Status != "success" {
|
||||
return nil, fmt.Errorf("query failed: %s", result.Error)
|
||||
}
|
||||
|
||||
return &result, nil
|
||||
}
|
||||
|
||||
// StatsQueryRange 执行统计查询(时间范围)
|
||||
// POST /select/logsql/stats_query_range?query=<query>&start=<start>&end=<end>&step=<step>
|
||||
func (vl *VictoriaLogs) StatsQueryRange(ctx context.Context, query string, start, end int64, step string) (*PrometheusResponse, error) {
|
||||
params := url.Values{}
|
||||
params.Set("query", query)
|
||||
|
||||
if start > 0 {
|
||||
params.Set("start", strconv.FormatInt(start, 10))
|
||||
}
|
||||
if end > 0 {
|
||||
params.Set("end", strconv.FormatInt(end, 10))
|
||||
}
|
||||
if step != "" {
|
||||
params.Set("step", step)
|
||||
}
|
||||
|
||||
endpoint := fmt.Sprintf("%s/select/logsql/stats_query_range", vl.VictorialogsAddr)
|
||||
|
||||
resp, err := vl.doRequest(ctx, "POST", endpoint, params)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("read response body failed: %w", err)
|
||||
}
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return nil, fmt.Errorf("stats query range failed: status=%d, body=%s", resp.StatusCode, string(body))
|
||||
}
|
||||
|
||||
var result PrometheusResponse
|
||||
if err := json.Unmarshal(body, &result); err != nil {
|
||||
return nil, fmt.Errorf("decode response failed: %w, body=%s", err, string(body))
|
||||
}
|
||||
|
||||
if result.Status != "success" {
|
||||
return nil, fmt.Errorf("query failed: %s", result.Error)
|
||||
}
|
||||
|
||||
return &result, nil
|
||||
}
|
||||
|
||||
// HitsLogs 返回查询命中的日志数量,用于计算 total
|
||||
// POST /select/logsql/hits?query=<query>&start=<start>&end=<end>
|
||||
func (vl *VictoriaLogs) HitsLogs(ctx context.Context, query string, start, end int64) (int64, error) {
|
||||
params := url.Values{}
|
||||
params.Set("query", query)
|
||||
|
||||
if start > 0 {
|
||||
params.Set("start", strconv.FormatInt(start, 10))
|
||||
}
|
||||
if end > 0 {
|
||||
params.Set("end", strconv.FormatInt(end, 10))
|
||||
}
|
||||
|
||||
endpoint := fmt.Sprintf("%s/select/logsql/hits", vl.VictorialogsAddr)
|
||||
|
||||
resp, err := vl.doRequest(ctx, "POST", endpoint, params)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("read response body failed: %w", err)
|
||||
}
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return 0, fmt.Errorf("hits query failed: status=%d, body=%s", resp.StatusCode, string(body))
|
||||
}
|
||||
|
||||
var result HitsResult
|
||||
if err := json.Unmarshal(body, &result); err != nil {
|
||||
return 0, fmt.Errorf("decode response failed: %w, body=%s", err, string(body))
|
||||
}
|
||||
|
||||
if len(result.Hits) == 0 {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
return result.Hits[0].Total, nil
|
||||
}
|
||||
|
||||
// doRequest 执行 HTTP 请求
|
||||
func (vl *VictoriaLogs) doRequest(ctx context.Context, method, endpoint string, params url.Values) (*http.Response, error) {
|
||||
var req *http.Request
|
||||
var err error
|
||||
|
||||
if method == "GET" {
|
||||
fullURL := endpoint
|
||||
if len(params) > 0 {
|
||||
fullURL = fmt.Sprintf("%s?%s", endpoint, params.Encode())
|
||||
}
|
||||
req, err = http.NewRequestWithContext(ctx, method, fullURL, nil)
|
||||
} else {
|
||||
// POST with form data
|
||||
req, err = http.NewRequestWithContext(ctx, method, endpoint, strings.NewReader(params.Encode()))
|
||||
if err == nil {
|
||||
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
|
||||
}
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create request failed: %w", err)
|
||||
}
|
||||
|
||||
if vl.VictorialogsBasic.VictorialogsUser != "" {
|
||||
req.SetBasicAuth(vl.VictorialogsBasic.VictorialogsUser, vl.VictorialogsBasic.VictorialogsPass)
|
||||
}
|
||||
|
||||
// Custom Headers
|
||||
for k, v := range vl.Headers {
|
||||
req.Header.Set(k, v)
|
||||
}
|
||||
|
||||
return vl.HTTPClient.Do(req)
|
||||
}
|
||||
136
dskit/victorialogs/victorialogs_test.go
Normal file
136
dskit/victorialogs/victorialogs_test.go
Normal file
@@ -0,0 +1,136 @@
|
||||
package victorialogs
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
// v is the shared fixture for the tests below; it targets a locally
// running VictoriaLogs instance, so most of these are integration tests
// that fail without one.
var v = VictoriaLogs{
	VictorialogsAddr: "http://127.0.0.1:9428",
	Headers:          make(map[string]string),
	Timeout:          10000, // 10 seconds in milliseconds
}
|
||||
|
||||
// TestVictoriaLogs_InitHTTPClient verifies that InitHTTPClient builds a
// usable client; unlike the other tests here it needs no running server.
func TestVictoriaLogs_InitHTTPClient(t *testing.T) {
	if err := v.InitHTTPClient(); err != nil {
		t.Fatalf("InitHTTPClient failed: %v", err)
	}
	if v.HTTPClient == nil {
		t.Fatal("HTTPClient should not be nil after initialization")
	}
}
|
||||
|
||||
func TestVictoriaLogs_Query(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
if err := v.InitHTTPClient(); err != nil {
|
||||
t.Fatalf("InitHTTPClient failed: %v", err)
|
||||
}
|
||||
|
||||
// Query logs with basic query
|
||||
now := time.Now().UnixNano()
|
||||
start := now - int64(time.Hour) // 1 hour ago
|
||||
end := now
|
||||
|
||||
logs, err := v.Query(ctx, "*", start, end, 10)
|
||||
if err != nil {
|
||||
t.Fatalf("Query failed: %v", err)
|
||||
}
|
||||
t.Logf("Query returned %d log entries", len(logs))
|
||||
for i, log := range logs {
|
||||
t.Logf("Log[%d]: %v", i, log)
|
||||
}
|
||||
}
|
||||
|
||||
func TestVictoriaLogs_StatsQuery(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
if err := v.InitHTTPClient(); err != nil {
|
||||
t.Fatalf("InitHTTPClient failed: %v", err)
|
||||
}
|
||||
|
||||
// Stats query with count
|
||||
now := time.Now().UnixNano()
|
||||
result, err := v.StatsQuery(ctx, "* | stats count() as total", now)
|
||||
if err != nil {
|
||||
t.Fatalf("StatsQuery failed: %v", err)
|
||||
}
|
||||
t.Logf("StatsQuery result: status=%s, resultType=%s", result.Status, result.Data.ResultType)
|
||||
for i, item := range result.Data.Result {
|
||||
t.Logf("Result[%d]: metric=%v, value=%v", i, item.Metric, item.Value)
|
||||
}
|
||||
}
|
||||
|
||||
func TestVictoriaLogs_StatsQueryRange(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
if err := v.InitHTTPClient(); err != nil {
|
||||
t.Fatalf("InitHTTPClient failed: %v", err)
|
||||
}
|
||||
|
||||
// Stats query range
|
||||
now := time.Now().UnixNano()
|
||||
start := now - int64(time.Hour) // 1 hour ago
|
||||
end := now
|
||||
|
||||
result, err := v.StatsQueryRange(ctx, "* | stats count() as total", start, end, "5m")
|
||||
if err != nil {
|
||||
t.Fatalf("StatsQueryRange failed: %v", err)
|
||||
}
|
||||
t.Logf("StatsQueryRange result: status=%s, resultType=%s", result.Status, result.Data.ResultType)
|
||||
for i, item := range result.Data.Result {
|
||||
t.Logf("Result[%d]: metric=%v, values count=%d", i, item.Metric, len(item.Values))
|
||||
}
|
||||
}
|
||||
|
||||
func TestVictoriaLogs_HitsLogs(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
if err := v.InitHTTPClient(); err != nil {
|
||||
t.Fatalf("InitHTTPClient failed: %v", err)
|
||||
}
|
||||
|
||||
// Get total hits count
|
||||
now := time.Now().UnixNano()
|
||||
start := now - int64(time.Hour) // 1 hour ago
|
||||
end := now
|
||||
|
||||
count, err := v.HitsLogs(ctx, "*", start, end)
|
||||
if err != nil {
|
||||
t.Fatalf("HitsLogs failed: %v", err)
|
||||
}
|
||||
t.Logf("HitsLogs total count: %d", count)
|
||||
}
|
||||
|
||||
func TestVictoriaLogs_QueryWithFilter(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
if err := v.InitHTTPClient(); err != nil {
|
||||
t.Fatalf("InitHTTPClient failed: %v", err)
|
||||
}
|
||||
|
||||
// Query with a filter condition
|
||||
now := time.Now().UnixNano()
|
||||
start := now - int64(time.Hour)
|
||||
end := now
|
||||
|
||||
logs, err := v.Query(ctx, "_stream:{app=\"test\"}", start, end, 5)
|
||||
if err != nil {
|
||||
t.Fatalf("Query with filter failed: %v", err)
|
||||
}
|
||||
t.Logf("Query with filter returned %d log entries", len(logs))
|
||||
}
|
||||
|
||||
func TestVictoriaLogs_StatsQueryByField(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
if err := v.InitHTTPClient(); err != nil {
|
||||
t.Fatalf("InitHTTPClient failed: %v", err)
|
||||
}
|
||||
|
||||
// Stats query grouped by field
|
||||
now := time.Now().UnixNano()
|
||||
result, err := v.StatsQuery(ctx, "* | stats by (level) count() as cnt", now)
|
||||
if err != nil {
|
||||
t.Fatalf("StatsQuery by field failed: %v", err)
|
||||
}
|
||||
t.Logf("StatsQuery by field result: status=%s", result.Status)
|
||||
for i, item := range result.Data.Result {
|
||||
t.Logf("Result[%d]: metric=%v, value=%v", i, item.Metric, item.Value)
|
||||
}
|
||||
}
|
||||
@@ -33,7 +33,8 @@ const (
|
||||
DORIS = "doris"
|
||||
OPENSEARCH = "opensearch"
|
||||
|
||||
CLICKHOUSE = "ck"
|
||||
CLICKHOUSE = "ck"
|
||||
VICTORIALOGS = "victorialogs"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -1219,7 +1220,8 @@ func (ar *AlertRule) IsInnerRule() bool {
|
||||
ar.Cate == MYSQL ||
|
||||
ar.Cate == POSTGRESQL ||
|
||||
ar.Cate == DORIS ||
|
||||
ar.Cate == OPENSEARCH
|
||||
ar.Cate == OPENSEARCH ||
|
||||
ar.Cate == VICTORIALOGS
|
||||
}
|
||||
|
||||
func (ar *AlertRule) GetRuleType() string {
|
||||
|
||||
@@ -76,6 +76,7 @@ func (bc *BuiltinComponent) Add(ctx *ctx.Context, username string) error {
|
||||
bc.CreatedAt = now
|
||||
bc.UpdatedAt = now
|
||||
bc.CreatedBy = username
|
||||
bc.UpdatedBy = username
|
||||
return Insert(ctx, bc)
|
||||
}
|
||||
|
||||
|
||||
@@ -1,6 +1,9 @@
|
||||
package models
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
@@ -48,9 +51,70 @@ func EsIndexPatternDel(ctx *ctx.Context, ids []int64) error {
|
||||
if len(ids) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// 检查是否有告警规则引用了这些 index pattern
|
||||
for _, id := range ids {
|
||||
alertRules, err := GetAlertRulesByEsIndexPatternId(ctx, id)
|
||||
if err != nil {
|
||||
return errors.WithMessage(err, "failed to check alert rules")
|
||||
}
|
||||
if len(alertRules) > 0 {
|
||||
names := make([]string, 0, len(alertRules))
|
||||
for _, rule := range alertRules {
|
||||
names = append(names, rule.Name)
|
||||
}
|
||||
return errors.Errorf("index pattern(id=%d) is used by alert rules: %s", id, strings.Join(names, ", "))
|
||||
}
|
||||
}
|
||||
|
||||
return DB(ctx).Where("id in ?", ids).Delete(new(EsIndexPattern)).Error
|
||||
}
|
||||
|
||||
// GetAlertRulesByEsIndexPatternId 获取引用了指定 index pattern 的告警规则
|
||||
func GetAlertRulesByEsIndexPatternId(ctx *ctx.Context, indexPatternId int64) ([]*AlertRule, error) {
|
||||
// index_pattern 存储在 rule_config JSON 字段的 queries 数组中
|
||||
// 格式如: {"queries":[{"index_type":"index_pattern","index_pattern":123,...}]}
|
||||
// 先用 LIKE 粗筛,再在代码中精确过滤
|
||||
pattern := fmt.Sprintf(`%%"index_pattern":%d%%`, indexPatternId)
|
||||
|
||||
var candidates []*AlertRule
|
||||
err := DB(ctx).Where("rule_config LIKE ?", pattern).Find(&candidates).Error
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// 精确过滤:解析 JSON 检查 index_pattern 字段值是否精确匹配
|
||||
var alertRules []*AlertRule
|
||||
for _, rule := range candidates {
|
||||
if ruleUsesIndexPattern(rule.RuleConfig, indexPatternId) {
|
||||
alertRules = append(alertRules, rule)
|
||||
}
|
||||
}
|
||||
|
||||
return alertRules, nil
|
||||
}
|
||||
|
||||
// ruleUsesIndexPattern reports whether ruleConfig (a rule_config JSON string)
// references the index pattern with the given id via queries[].index_pattern.
// Malformed JSON is treated as "no reference".
func ruleUsesIndexPattern(ruleConfig string, indexPatternId int64) bool {
	var parsed struct {
		Queries []struct {
			IndexPattern int64 `json:"index_pattern"`
		} `json:"queries"`
	}

	if json.Unmarshal([]byte(ruleConfig), &parsed) != nil {
		return false
	}

	for i := range parsed.Queries {
		if parsed.Queries[i].IndexPattern == indexPatternId {
			return true
		}
	}
	return false
}
|
||||
|
||||
func (ei *EsIndexPattern) Update(ctx *ctx.Context, eip EsIndexPattern) error {
|
||||
if ei.Name != eip.Name || ei.DatasourceId != eip.DatasourceId {
|
||||
exists, err := EsIndexPatternExists(ctx, ei.Id, eip.DatasourceId, eip.Name)
|
||||
|
||||
@@ -68,7 +68,8 @@ func MigrateTables(db *gorm.DB) error {
|
||||
&Board{}, &BoardBusigroup{}, &Users{}, &SsoConfig{}, &models.BuiltinMetric{},
|
||||
&models.MetricFilter{}, &models.NotificationRecord{}, &models.TargetBusiGroup{},
|
||||
&models.UserToken{}, &models.DashAnnotation{}, MessageTemplate{}, NotifyRule{}, NotifyChannelConfig{}, &EsIndexPatternMigrate{},
|
||||
&models.EventPipeline{}, &models.EmbeddedProduct{}, &models.SourceToken{}}
|
||||
&models.EventPipeline{}, &models.EmbeddedProduct{}, &models.SourceToken{},
|
||||
&models.SavedView{}, &models.UserViewFavorite{}}
|
||||
|
||||
if isPostgres(db) {
|
||||
dts = append(dts, &models.PostgresBuiltinComponent{})
|
||||
|
||||
@@ -31,7 +31,8 @@ type Webhook struct {
|
||||
RetryCount int `json:"retry_count"`
|
||||
RetryInterval int `json:"retry_interval"`
|
||||
Batch int `json:"batch"`
|
||||
Client *http.Client `json:"-"`
|
||||
|
||||
Client *http.Client `json:"-"`
|
||||
}
|
||||
|
||||
func (w *Webhook) Hash() string {
|
||||
|
||||
174
models/saved_view.go
Normal file
174
models/saved_view.go
Normal file
@@ -0,0 +1,174 @@
|
||||
package models
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
)
|
||||
|
||||
// Errors returned by saved-view validation and lookup.
var (
	ErrSavedViewNameEmpty     = errors.New("saved view name is blank")
	ErrSavedViewPageEmpty     = errors.New("saved view page is blank")
	ErrSavedViewNotFound      = errors.New("saved view not found")
	ErrSavedViewNameDuplicate = errors.New("saved view name already exists in this page")
)

// SavedView is a user-saved filter configuration bound to a UI page.
type SavedView struct {
	Id         int64   `json:"id" gorm:"primaryKey;autoIncrement"`
	Name       string  `json:"name" gorm:"type:varchar(255);not null"`
	Page       string  `json:"page" gorm:"type:varchar(64);not null;index"`
	Filter     string  `json:"filter" gorm:"type:text"`
	PublicCate int     `json:"public_cate" gorm:"default:0"` // 0: self, 1: team, 2: all
	Gids       []int64 `json:"gids" gorm:"column:gids;type:text;serializer:json"`
	CreateAt   int64   `json:"create_at" gorm:"type:bigint;not null;default:0"`
	CreateBy   string  `json:"create_by" gorm:"type:varchar(64);index"`
	UpdateAt   int64   `json:"update_at" gorm:"type:bigint;not null;default:0"`
	UpdateBy   string  `json:"update_by" gorm:"type:varchar(64)"`

	// IsFavorite is filled at query time for the requesting user; not persisted.
	IsFavorite bool `json:"is_favorite" gorm:"-"`
}

// TableName maps SavedView to its database table.
func (SavedView) TableName() string {
	return "saved_view"
}

// Verify trims the view name and checks the required fields, returning a
// sentinel error on the first violation. Note that Page is validated but
// not trimmed.
func (sv *SavedView) Verify() error {
	sv.Name = strings.TrimSpace(sv.Name)
	switch {
	case sv.Name == "":
		return ErrSavedViewNameEmpty
	case sv.Page == "":
		return ErrSavedViewPageEmpty
	}
	return nil
}
|
||||
|
||||
func SavedViewCheckDuplicateName(c *ctx.Context, page, name string, excludeId int64) error {
|
||||
var count int64
|
||||
session := DB(c).Model(&SavedView{}).Where("page = ? AND name = ? AND public_cate = 2", page, name)
|
||||
if excludeId > 0 {
|
||||
session = session.Where("id != ?", excludeId)
|
||||
}
|
||||
if err := session.Count(&count).Error; err != nil {
|
||||
return err
|
||||
}
|
||||
if count > 0 {
|
||||
return ErrSavedViewNameDuplicate
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func SavedViewAdd(c *ctx.Context, sv *SavedView) error {
|
||||
if err := sv.Verify(); err != nil {
|
||||
return err
|
||||
}
|
||||
// 当 PublicCate 为 all(2) 时,检查同一个 page 下 name 是否重复
|
||||
if sv.PublicCate == 2 {
|
||||
if err := SavedViewCheckDuplicateName(c, sv.Page, sv.Name, 0); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
now := time.Now().Unix()
|
||||
sv.CreateAt = now
|
||||
sv.UpdateAt = now
|
||||
return Insert(c, sv)
|
||||
}
|
||||
|
||||
func SavedViewUpdate(c *ctx.Context, sv *SavedView, username string) error {
|
||||
if err := sv.Verify(); err != nil {
|
||||
return err
|
||||
}
|
||||
// 当 PublicCate 为 all(2) 时,检查同一个 page 下 name 是否重复(排除自身)
|
||||
if sv.PublicCate == 2 {
|
||||
if err := SavedViewCheckDuplicateName(c, sv.Page, sv.Name, sv.Id); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
sv.UpdateAt = time.Now().Unix()
|
||||
sv.UpdateBy = username
|
||||
return DB(c).Model(sv).Select("name", "filter", "public_cate", "gids", "update_at", "update_by").Updates(sv).Error
|
||||
}
|
||||
|
||||
func SavedViewDel(c *ctx.Context, id int64) error {
|
||||
// 先删除收藏关联
|
||||
if err := DB(c).Where("view_id = ?", id).Delete(&UserViewFavorite{}).Error; err != nil {
|
||||
return err
|
||||
}
|
||||
return DB(c).Where("id = ?", id).Delete(&SavedView{}).Error
|
||||
}
|
||||
|
||||
func SavedViewGetById(c *ctx.Context, id int64) (*SavedView, error) {
|
||||
var sv SavedView
|
||||
err := DB(c).Where("id = ?", id).First(&sv).Error
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &sv, nil
|
||||
}
|
||||
|
||||
func SavedViewGets(c *ctx.Context, page string) ([]SavedView, error) {
|
||||
var views []SavedView
|
||||
|
||||
session := DB(c).Where("page = ?", page)
|
||||
|
||||
if err := session.Order("update_at DESC").Find(&views).Error; err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return views, nil
|
||||
}
|
||||
|
||||
func SavedViewFavoriteGetByUserId(c *ctx.Context, userId int64) (map[int64]bool, error) {
|
||||
var favorites []UserViewFavorite
|
||||
if err := DB(c).Where("user_id = ?", userId).Find(&favorites).Error; err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
result := make(map[int64]bool)
|
||||
for _, f := range favorites {
|
||||
result[f.ViewId] = true
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// UserViewFavorite links a user to a saved view they have marked as favorite.
type UserViewFavorite struct {
	Id       int64 `json:"id" gorm:"primaryKey;autoIncrement"`
	ViewId   int64 `json:"view_id" gorm:"index"`
	UserId   int64 `json:"user_id" gorm:"index"`
	CreateAt int64 `json:"create_at"`
}

// TableName maps UserViewFavorite to its database table.
func (UserViewFavorite) TableName() string {
	return "user_view_favorite"
}
|
||||
|
||||
func UserViewFavoriteAdd(c *ctx.Context, viewId, userId int64) error {
|
||||
var count int64
|
||||
if err := DB(c).Model(&SavedView{}).Where("id = ?", viewId).Count(&count).Error; err != nil {
|
||||
return err
|
||||
}
|
||||
if count == 0 {
|
||||
return ErrSavedViewNotFound
|
||||
}
|
||||
|
||||
if err := DB(c).Model(&UserViewFavorite{}).Where("view_id = ? AND user_id = ?", viewId, userId).Count(&count).Error; err != nil {
|
||||
return err
|
||||
}
|
||||
if count > 0 {
|
||||
return nil // 已收藏,直接返回成功
|
||||
}
|
||||
|
||||
fav := &UserViewFavorite{
|
||||
ViewId: viewId,
|
||||
UserId: userId,
|
||||
CreateAt: time.Now().Unix(),
|
||||
}
|
||||
return DB(c).Create(fav).Error
|
||||
}
|
||||
|
||||
func UserViewFavoriteDel(c *ctx.Context, viewId, userId int64) error {
|
||||
return DB(c).Where("view_id = ? AND user_id = ?", viewId, userId).Delete(&UserViewFavorite{}).Error
|
||||
}
|
||||
@@ -1,12 +1,14 @@
|
||||
package models
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
"unicode"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
"github.com/ccfos/nightingale/v6/pkg/ormx"
|
||||
@@ -142,6 +144,42 @@ func (u *User) CheckGroupPermission(ctx *ctx.Context, groupIds []int64) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// stripInvisibleChars drops invisible Unicode characters (zero-width spaces,
// control characters, and other non-printable runes) from s, keeping
// printable runes plus the common whitespace characters space, tab, CR, LF.
func stripInvisibleChars(s string) string {
	keep := func(r rune) rune {
		switch r {
		case ' ', '\t', '\n', '\r':
			return r
		}
		if unicode.IsPrint(r) {
			return r
		}
		return -1 // strings.Map drops runes mapped to a negative value
	}
	return strings.Map(keep, s)
}
|
||||
|
||||
// stripInvisibleCharsFromContacts removes invisible characters from Contacts JSON values
|
||||
func stripInvisibleCharsFromContacts(contacts ormx.JSONObj) ormx.JSONObj {
|
||||
if len(contacts) == 0 {
|
||||
return contacts
|
||||
}
|
||||
|
||||
var contactsMap map[string]string
|
||||
if err := json.Unmarshal(contacts, &contactsMap); err != nil {
|
||||
return contacts
|
||||
}
|
||||
|
||||
for k, v := range contactsMap {
|
||||
contactsMap[k] = stripInvisibleChars(v)
|
||||
}
|
||||
|
||||
result, err := json.Marshal(contactsMap)
|
||||
if err != nil {
|
||||
return contacts
|
||||
}
|
||||
|
||||
return ormx.JSONObj(result)
|
||||
}
|
||||
|
||||
func (u *User) Verify() error {
|
||||
u.Username = strings.TrimSpace(u.Username)
|
||||
|
||||
@@ -165,6 +203,9 @@ func (u *User) Verify() error {
|
||||
return errors.New("Email invalid")
|
||||
}
|
||||
|
||||
// Strip invisible characters from Contacts values
|
||||
u.Contacts = stripInvisibleCharsFromContacts(u.Contacts)
|
||||
|
||||
if u.Phone != "" {
|
||||
return u.EncryptPhone()
|
||||
}
|
||||
|
||||
@@ -201,6 +201,11 @@ var I18N = `{
|
||||
"Some recovery scripts still in the BusiGroup": "业务组中仍有自愈脚本",
|
||||
"Some target busigroups still in the BusiGroup": "业务组中仍有监控对象",
|
||||
|
||||
"saved view not found": "保存的视图不存在",
|
||||
"saved view name is blank": "视图名称不能为空",
|
||||
"saved view page is blank": "视图页面不能为空",
|
||||
"saved view name already exists in this page": "该页面下已存在同名的公开视图",
|
||||
|
||||
"---------zh_CN--------": "---------zh_CN--------"
|
||||
},
|
||||
"zh_HK": {
|
||||
@@ -405,6 +410,11 @@ var I18N = `{
|
||||
"Some recovery scripts still in the BusiGroup": "業務組中仍有自愈腳本",
|
||||
"Some target busigroups still in the BusiGroup": "業務組中仍有監控對象",
|
||||
|
||||
"saved view not found": "保存的視圖不存在",
|
||||
"saved view name is blank": "視圖名稱不能為空",
|
||||
"saved view page is blank": "視圖頁面不能為空",
|
||||
"saved view name already exists in this page": "該頁面下已存在同名的公開視圖",
|
||||
|
||||
"---------zh_HK--------": "---------zh_HK--------"
|
||||
},
|
||||
"ja_JP": {
|
||||
@@ -606,6 +616,11 @@ var I18N = `{
|
||||
"Some recovery scripts still in the BusiGroup": "ビジネスグループにまだ自己回復スクリプトがあります",
|
||||
"Some target busigroups still in the BusiGroup": "ビジネスグループにまだ監視対象があります",
|
||||
|
||||
"saved view not found": "保存されたビューが見つかりません",
|
||||
"saved view name is blank": "ビュー名を空にすることはできません",
|
||||
"saved view page is blank": "ビューページを空にすることはできません",
|
||||
"saved view name already exists in this page": "このページには同名の公開ビューが既に存在します",
|
||||
|
||||
"---------ja_JP--------": "---------ja_JP--------"
|
||||
},
|
||||
"ru_RU": {
|
||||
@@ -807,6 +822,11 @@ var I18N = `{
|
||||
"Some recovery scripts still in the BusiGroup": "В бизнес-группе еще есть скрипты самоисцеления",
|
||||
"Some target busigroups still in the BusiGroup": "В бизнес-группе еще есть объекты мониторинга",
|
||||
|
||||
"saved view not found": "Сохраненный вид не найден",
|
||||
"saved view name is blank": "Название вида не может быть пустым",
|
||||
"saved view page is blank": "Страница вида не может быть пустой",
|
||||
"saved view name already exists in this page": "На этой странице уже существует публичный вид с таким названием",
|
||||
|
||||
"---------ru_RU--------": "---------ru_RU--------"
|
||||
}
|
||||
}`
|
||||
|
||||
@@ -85,15 +85,15 @@ func (pc *PromClientMap) loadFromDatabase() {
|
||||
var internalAddr string
|
||||
for k, v := range ds.SettingsJson {
|
||||
if strings.Contains(k, "write_addr") {
|
||||
writeAddr = v.(string)
|
||||
writeAddr = strings.TrimSpace(v.(string))
|
||||
} else if strings.Contains(k, "internal_addr") && v.(string) != "" {
|
||||
internalAddr = v.(string)
|
||||
internalAddr = strings.TrimSpace(v.(string))
|
||||
}
|
||||
}
|
||||
|
||||
po := PromOption{
|
||||
ClusterName: ds.Name,
|
||||
Url: ds.HTTPJson.Url,
|
||||
Url: strings.TrimSpace(ds.HTTPJson.Url),
|
||||
WriteAddr: writeAddr,
|
||||
BasicAuthUser: ds.AuthJson.BasicAuthUser,
|
||||
BasicAuthPass: ds.AuthJson.BasicAuthPassword,
|
||||
|
||||
Reference in New Issue
Block a user