Compare commits

..

20 Commits

Author SHA1 Message Date
huangjie
bec92fdc60 refactor: feishu sso support default usergroup (#3077) 2026-02-10 17:49:58 +08:00
Yening Qin
671f14092c update list api (#3075) 2026-02-09 20:26:18 +08:00
liufuniu
99d6ba9508 fix: doris datasource equal (#3073) 2026-02-05 17:46:05 +08:00
liufuniu
47f3eae09d refactor: update doris datasource (#3070) 2026-02-05 13:42:32 +08:00
liufuniu
5e89c670a8 doris:add write user (#3066) 2026-02-04 17:27:26 +08:00
ning
e1cc37c753 refactor: recording rule api 2026-02-02 16:29:24 +08:00
ning
2be94f592c refactor: change ident meta mset 2026-02-02 15:14:37 +08:00
ning
5babc4310a refactor: recording rule api 2026-02-02 14:42:02 +08:00
ning
f968fcd593 Merge branch 'release-21' of github.com:ccfos/nightingale into release-21 2026-01-28 10:41:24 +08:00
ning
4dc7035550 brain fix get datasource 2026-01-28 10:28:26 +08:00
huangjie
2a2b46ca7b feishu userid (#3058)
Co-authored-by: jie210 <huangjie@flashcat.com>
2026-01-26 20:20:45 +08:00
ning
ed96ab9d5b optimize drop sample 2026-01-23 15:26:46 +08:00
Yening Qin
2e2bbd6aeb Update workflow (#3051) 2026-01-22 19:45:41 +08:00
ning
c93694a2a9 refactor: update init metrics tpl 2026-01-21 19:45:57 +08:00
ning
cfb8c3b66a refactor: update doris check max rows 2026-01-21 16:03:04 +08:00
ning
cb5e62b7bb fix save workflow execution 2026-01-20 21:28:51 +08:00
yuansheng
ebfde8d6a0 refactor: record_rule support writeback_enabled (#3048) 2026-01-20 19:32:09 +08:00
ning
b4dcaebf83 refactor: update doris check max rows 2026-01-20 16:34:50 +08:00
huangjie
fa491e313a sso add feishu (#3046) 2026-01-19 14:12:38 +08:00
ning
4fe2b5042f refactor: update trigger value 2026-01-14 19:41:32 +08:00
63 changed files with 434 additions and 566 deletions

View File

@@ -844,7 +844,7 @@ func (arw *AlertRuleWorker) GetHostAnomalyPoint(ruleConfig string) ([]models.Ano
}
m["ident"] = target.Ident
lst = append(lst, models.NewAnomalyPoint(trigger.Type, m, now, float64(now-target.BeatTime), trigger.Severity))
lst = append(lst, models.NewAnomalyPoint(trigger.Type, m, now, float64(now-target.UpdateAt), trigger.Severity))
}
case "offset":
idents, exists := arw.Processor.TargetsOfAlertRuleCache.Get(arw.Processor.EngineName, arw.Rule.Id)
@@ -873,7 +873,7 @@ func (arw *AlertRuleWorker) GetHostAnomalyPoint(ruleConfig string) ([]models.Ano
continue
}
if target, exists := targetMap[ident]; exists {
if now-target.BeatTime > 120 {
if now-target.UpdateAt > 120 {
// means this target is not a active host, do not check offset
continue
}

View File

@@ -36,6 +36,7 @@ func (rt *Router) alertRuleGets(c *gin.Context) {
for i := 0; i < len(ars); i++ {
ars[i].FillNotifyGroups(rt.Ctx, cache)
}
models.FillUpdateByNicknames(rt.Ctx, ars)
}
ginx.NewRender(c).Data(ars, err)
}
@@ -76,7 +77,6 @@ func (rt *Router) alertRuleGetsByGids(c *gin.Context) {
if err == nil {
cache := make(map[int64]*models.UserGroup)
rids := make([]int64, 0, len(ars))
names := make([]string, 0, len(ars))
for i := 0; i < len(ars); i++ {
ars[i].FillNotifyGroups(rt.Ctx, cache)
@@ -85,7 +85,6 @@ func (rt *Router) alertRuleGetsByGids(c *gin.Context) {
}
rids = append(rids, ars[i].Id)
names = append(names, ars[i].UpdateBy)
}
stime, etime := GetAlertCueEventTimeRange(c)
@@ -96,14 +95,7 @@ func (rt *Router) alertRuleGetsByGids(c *gin.Context) {
}
}
users := models.UserMapGet(rt.Ctx, "username in (?)", names)
if users != nil {
for i := 0; i < len(ars); i++ {
if user, exist := users[ars[i].UpdateBy]; exist {
ars[i].UpdateByNickname = user.Nickname
}
}
}
models.FillUpdateByNicknames(rt.Ctx, ars)
}
ginx.NewRender(c).Data(ars, err)
}
@@ -135,6 +127,7 @@ func (rt *Router) alertRulesGetByService(c *gin.Context) {
ars[i].DatasourceIdsJson = rt.DatasourceCache.GetIDsByDsCateAndQueries(ars[i].Cate, ars[i].DatasourceQueries)
}
}
models.FillUpdateByNicknames(rt.Ctx, ars)
}
ginx.NewRender(c).Data(ars, err)
}

View File

@@ -30,6 +30,7 @@ func (rt *Router) alertSubscribeGets(c *gin.Context) {
ginx.Dangerous(lst[i].FillDatasourceIds(rt.Ctx))
ginx.Dangerous(lst[i].DB2FE())
}
models.FillUpdateByNicknames(rt.Ctx, lst)
ginx.NewRender(c).Data(lst, err)
}
@@ -66,6 +67,7 @@ func (rt *Router) alertSubscribeGetsByGids(c *gin.Context) {
ginx.Dangerous(lst[i].FillDatasourceIds(rt.Ctx))
ginx.Dangerous(lst[i].DB2FE())
}
models.FillUpdateByNicknames(rt.Ctx, lst)
ginx.NewRender(c).Data(lst, err)
}

View File

@@ -260,6 +260,9 @@ func (rt *Router) boardGets(c *gin.Context) {
query := ginx.QueryStr(c, "query", "")
boards, err := models.BoardGetsByGroupId(rt.Ctx, bgid, query)
if err == nil {
models.FillUpdateByNicknames(rt.Ctx, boards)
}
ginx.NewRender(c).Data(boards, err)
}
@@ -273,6 +276,9 @@ func (rt *Router) publicBoardGets(c *gin.Context) {
ginx.Dangerous(err)
boards, err := models.BoardGets(rt.Ctx, "", "public=1 and (public_cate in (?) or id in (?))", []int64{0, 1}, boardIds)
if err == nil {
models.FillUpdateByNicknames(rt.Ctx, boards)
}
ginx.NewRender(c).Data(boards, err)
}
@@ -312,6 +318,7 @@ func (rt *Router) boardGetsByGids(c *gin.Context) {
boards[i].Bgids = ids
}
}
models.FillUpdateByNicknames(rt.Ctx, boards)
ginx.NewRender(c).Data(boards, err)
}

View File

@@ -27,6 +27,8 @@ func (rt *Router) metricFilterGets(c *gin.Context) {
}
}
models.FillUpdateByNicknames(rt.Ctx, arr)
ginx.NewRender(c).Data(arr, err)
}

View File

@@ -119,6 +119,9 @@ func (rt *Router) busiGroupGets(c *gin.Context) {
if len(lst) == 0 {
lst = []models.BusiGroup{}
}
if err == nil {
models.FillUpdateByNicknames(rt.Ctx, lst)
}
ginx.NewRender(c).Data(lst, err)
}

View File

@@ -15,6 +15,9 @@ func (rt *Router) configsGet(c *gin.Context) {
prefix := ginx.QueryStr(c, "prefix", "")
limit := ginx.QueryInt(c, "limit", 10)
configs, err := models.ConfigsGets(rt.Ctx, prefix, limit, ginx.Offset(c, limit))
if err == nil {
models.FillUpdateByNicknames(rt.Ctx, configs)
}
ginx.NewRender(c).Data(configs, err)
}

View File

@@ -276,7 +276,7 @@ func (rt *Router) datasourceUpsert(c *gin.Context) {
}
err = req.Add(rt.Ctx)
} else {
err = req.Update(rt.Ctx, "name", "identifier", "description", "cluster_name", "settings", "http", "auth", "updated_by", "updated_at", "is_default", "weight")
err = req.Update(rt.Ctx, "name", "identifier", "description", "cluster_name", "settings", "http", "auth", "updated_by", "updated_at", "is_default")
}
Render(c, nil, err)

View File

@@ -13,6 +13,7 @@ import (
func (rt *Router) embeddedProductGets(c *gin.Context) {
products, err := models.EmbeddedProductGets(rt.Ctx)
ginx.Dangerous(err)
models.FillUpdateByNicknames(rt.Ctx, products)
// 获取当前用户可访问的Group ID 列表
me := c.MustGet("user").(*models.User)

View File

@@ -69,6 +69,10 @@ func (rt *Router) esIndexPatternGetList(c *gin.Context) {
lst, err = models.EsIndexPatternGets(rt.Ctx, "")
}
if err == nil {
models.FillUpdateByNicknames(rt.Ctx, lst)
}
ginx.NewRender(c).Data(lst, err)
}

View File

@@ -35,6 +35,7 @@ func (rt *Router) eventPipelinesList(c *gin.Context) {
// 兼容处理:自动填充工作流字段
pipeline.FillWorkflowFields()
}
models.FillUpdateByNicknames(rt.Ctx, pipelines)
gids, err := models.MyGroupIdsMap(rt.Ctx, me.Id)
ginx.Dangerous(err)

View File

@@ -581,7 +581,10 @@ func (rt *Router) loginCallbackFeiShu(c *gin.Context) {
ginx.Dangerous(user.Add(rt.Ctx))
if len(defaultUserGroups) > 0 {
ginx.Dangerous(user.UpdateUserGroup(rt.Ctx, defaultUserGroups))
err = user.AddToUserGroups(rt.Ctx, defaultUserGroups)
if err != nil {
logger.Errorf("sso feishu add user group error %v", ret, err)
}
}
}

View File

@@ -154,6 +154,7 @@ func (rt *Router) messageTemplatesGet(c *gin.Context) {
lst, err := models.MessageTemplatesGetBy(rt.Ctx, notifyChannelIdents)
ginx.Dangerous(err)
models.FillUpdateByNicknames(rt.Ctx, lst)
if me.IsAdmin() {
ginx.NewRender(c).Data(lst, nil)

View File

@@ -22,6 +22,9 @@ func (rt *Router) alertMuteGetsByBG(c *gin.Context) {
query := ginx.QueryStr(c, "query", "")
expired := ginx.QueryInt(c, "expired", -1)
lst, err := models.AlertMuteGets(rt.Ctx, prods, bgid, -1, expired, query)
if err == nil {
models.FillUpdateByNicknames(rt.Ctx, lst)
}
ginx.NewRender(c).Data(lst, err)
}
@@ -47,6 +50,9 @@ func (rt *Router) alertMuteGetsByGids(c *gin.Context) {
}
lst, err := models.AlertMuteGetsByBGIds(rt.Ctx, gids)
if err == nil {
models.FillUpdateByNicknames(rt.Ctx, lst)
}
ginx.NewRender(c).Data(lst, err)
}
@@ -58,6 +64,9 @@ func (rt *Router) alertMuteGets(c *gin.Context) {
disabled := ginx.QueryInt(c, "disabled", -1)
expired := ginx.QueryInt(c, "expired", -1)
lst, err := models.AlertMuteGets(rt.Ctx, prods, bgid, disabled, expired, query)
if err == nil {
models.FillUpdateByNicknames(rt.Ctx, lst)
}
ginx.NewRender(c).Data(lst, err)
}

View File

@@ -118,6 +118,9 @@ func (rt *Router) notifyChannelGetBy(c *gin.Context) {
func (rt *Router) notifyChannelsGet(c *gin.Context) {
lst, err := models.NotifyChannelsGet(rt.Ctx, "", nil)
if err == nil {
models.FillUpdateByNicknames(rt.Ctx, lst)
}
ginx.NewRender(c).Data(lst, err)
}

View File

@@ -118,6 +118,7 @@ func (rt *Router) notifyRulesGet(c *gin.Context) {
lst, err := models.NotifyRulesGet(rt.Ctx, "", nil)
ginx.Dangerous(err)
models.FillUpdateByNicknames(rt.Ctx, lst)
if me.IsAdmin() {
ginx.NewRender(c).Data(lst, nil)
return

View File

@@ -25,11 +25,14 @@ func (rt *Router) notifyTplGets(c *gin.Context) {
m[models.EmailSubject] = struct{}{}
lst, err := models.NotifyTplGets(rt.Ctx)
ginx.Dangerous(err)
for i := 0; i < len(lst); i++ {
if _, exists := m[lst[i].Channel]; exists {
lst[i].BuiltIn = true
}
}
models.FillUpdateByNicknames(rt.Ctx, lst)
ginx.NewRender(c).Data(lst, err)
}
@@ -200,6 +203,9 @@ func (rt *Router) messageTemplateGets(c *gin.Context) {
ident := ginx.QueryStr(c, "ident", "")
tpls, err := models.MessageTemplateGets(rt.Ctx, id, name, ident)
if err == nil {
models.FillUpdateByNicknames(rt.Ctx, tpls)
}
ginx.NewRender(c).Data(tpls, err)
}

View File

@@ -1,16 +1,13 @@
package router
import (
"context"
"fmt"
"sort"
"sync"
"github.com/ccfos/nightingale/v6/alert/eval"
"github.com/ccfos/nightingale/v6/dscache"
"github.com/ccfos/nightingale/v6/dskit/doris"
"github.com/ccfos/nightingale/v6/models"
"github.com/gin-gonic/gin"
"github.com/toolkits/pkg/ginx"
"github.com/toolkits/pkg/logger"
@@ -120,13 +117,10 @@ func (rt *Router) QueryLogBatch(c *gin.Context) {
}
func QueryDataConcurrently(anonymousAccess bool, ctx *gin.Context, f models.QueryParam) ([]models.DataResp, error) {
var (
resp []models.DataResp
mu sync.Mutex
wg sync.WaitGroup
errs []error
rCtx = ctx.Request.Context()
)
var resp []models.DataResp
var mu sync.Mutex
var wg sync.WaitGroup
var errs []error
for _, q := range f.Queries {
if !anonymousAccess && !CheckDsPerm(ctx, f.DatasourceId, f.Cate, q) {
@@ -138,17 +132,12 @@ func QueryDataConcurrently(anonymousAccess bool, ctx *gin.Context, f models.Quer
logger.Warningf("cluster:%d not exists", f.DatasourceId)
return nil, fmt.Errorf("cluster not exists")
}
vCtx := rCtx
if f.Cate == models.DORIS {
vCtx = context.WithValue(vCtx, doris.NoNeedCheckMaxRow, true)
}
wg.Add(1)
go func(query interface{}) {
defer wg.Done()
data, err := plug.QueryData(vCtx, query)
data, err := plug.QueryData(ctx.Request.Context(), query)
if err != nil {
logger.Warningf("query data error: req:%+v err:%v", query, err)
mu.Lock()

View File

@@ -15,6 +15,9 @@ import (
func (rt *Router) recordingRuleGets(c *gin.Context) {
busiGroupId := ginx.UrlParamInt64(c, "id")
ars, err := models.RecordingRuleGets(rt.Ctx, busiGroupId)
if err == nil {
models.FillUpdateByNicknames(rt.Ctx, ars)
}
ginx.NewRender(c).Data(ars, err)
}
@@ -39,6 +42,9 @@ func (rt *Router) recordingRuleGetsByGids(c *gin.Context) {
}
ars, err := models.RecordingRuleGetsByBGIds(rt.Ctx, gids)
if err == nil {
models.FillUpdateByNicknames(rt.Ctx, ars)
}
ginx.NewRender(c).Data(ars, err)
}

View File

@@ -20,6 +20,7 @@ func (rt *Router) savedViewGets(c *gin.Context) {
ginx.NewRender(c).Data(nil, err)
return
}
models.FillUpdateByNicknames(rt.Ctx, lst)
userGids, err := models.MyGroupIds(rt.Ctx, me.Id)
if err != nil {

View File

@@ -38,16 +38,6 @@ func (rt *Router) targetGetsByHostFilter(c *gin.Context) {
total, err := models.TargetCountByFilter(rt.Ctx, query)
ginx.Dangerous(err)
models.FillTargetsBeatTime(rt.Redis, hosts)
now := time.Now().Unix()
for i := 0; i < len(hosts); i++ {
if now-hosts[i].BeatTime < 60 {
hosts[i].TargetUp = 2
} else if now-hosts[i].BeatTime < 180 {
hosts[i].TargetUp = 1
}
}
ginx.NewRender(c).Data(gin.H{
"list": hosts,
"total": total,
@@ -91,24 +81,9 @@ func (rt *Router) targetGets(c *gin.Context) {
models.BuildTargetWhereWithBgids(bgids),
models.BuildTargetWhereWithDsIds(dsIds),
models.BuildTargetWhereWithQuery(query),
models.BuildTargetWhereWithDowntime(downtime),
models.BuildTargetWhereWithHosts(hosts),
}
// downtime 筛选:从缓存获取心跳时间,选择较小的集合用 IN 或 NOT IN 过滤
if downtime != 0 {
downtimeOpt, hasMatch := rt.downtimeFilter(downtime)
if !hasMatch {
ginx.NewRender(c).Data(gin.H{
"list": []*models.Target{},
"total": 0,
}, nil)
return
}
if downtimeOpt != nil {
options = append(options, downtimeOpt)
}
}
total, err := models.TargetTotal(rt.Ctx, options...)
ginx.Dangerous(err)
@@ -127,17 +102,14 @@ func (rt *Router) targetGets(c *gin.Context) {
now := time.Now()
cache := make(map[int64]*models.BusiGroup)
// 从 Redis 补全 BeatTime
models.FillTargetsBeatTime(rt.Redis, list)
var keys []string
for i := 0; i < len(list); i++ {
ginx.Dangerous(list[i].FillGroup(rt.Ctx, cache))
keys = append(keys, models.WrapIdent(list[i].Ident))
if now.Unix()-list[i].BeatTime < 60 {
if now.Unix()-list[i].UpdateAt < 60 {
list[i].TargetUp = 2
} else if now.Unix()-list[i].BeatTime < 180 {
} else if now.Unix()-list[i].UpdateAt < 180 {
list[i].TargetUp = 1
}
}
@@ -176,43 +148,6 @@ func (rt *Router) targetGets(c *gin.Context) {
}, nil)
}
// downtimeFilter 从缓存获取心跳时间,生成 downtime 筛选条件
// 选择匹配集和非匹配集中较小的一方,用 IN 或 NOT IN 来减少 SQL 参数量
// 返回值:
// - option: 筛选条件nil 表示所有 target 都符合条件(无需过滤)
// - hasMatch: 是否有符合条件的 targetfalse 表示无匹配应返回空结果
func (rt *Router) downtimeFilter(downtime int64) (option models.BuildTargetWhereOption, hasMatch bool) {
now := time.Now().Unix()
targets := rt.TargetCache.GetAll()
var matchIdents, nonMatchIdents []string
for _, target := range targets {
matched := false
if downtime > 0 {
matched = target.BeatTime < now-downtime
} else if downtime < 0 {
matched = target.BeatTime > now+downtime
}
if matched {
matchIdents = append(matchIdents, target.Ident)
} else {
nonMatchIdents = append(nonMatchIdents, target.Ident)
}
}
if len(matchIdents) == 0 {
return nil, false
}
if len(nonMatchIdents) == 0 {
return nil, true
}
if len(matchIdents) <= len(nonMatchIdents) {
return models.BuildTargetWhereWithIdents(matchIdents), true
}
return models.BuildTargetWhereExcludeIdents(nonMatchIdents), true
}
func (rt *Router) targetExtendInfoByIdent(c *gin.Context) {
ident := ginx.QueryStr(c, "ident", "")
key := models.WrapExtendIdent(ident)

View File

@@ -25,6 +25,7 @@ func (rt *Router) taskTplGets(c *gin.Context) {
list, err := models.TaskTplGets(rt.Ctx, []int64{groupId}, query, limit, ginx.Offset(c, limit))
ginx.Dangerous(err)
models.FillUpdateByNicknames(rt.Ctx, list)
ginx.NewRender(c).Data(gin.H{
"total": total,
@@ -60,6 +61,7 @@ func (rt *Router) taskTplGetsByGids(c *gin.Context) {
list, err := models.TaskTplGets(rt.Ctx, gids, query, limit, ginx.Offset(c, limit))
ginx.Dangerous(err)
models.FillUpdateByNicknames(rt.Ctx, list)
ginx.NewRender(c).Data(gin.H{
"total": total,

View File

@@ -27,6 +27,9 @@ func (rt *Router) userGroupGets(c *gin.Context) {
me := c.MustGet("user").(*models.User)
lst, err := me.UserGroups(rt.Ctx, limit, query)
if err == nil {
models.FillUpdateByNicknames(rt.Ctx, lst)
}
ginx.NewRender(c).Data(lst, err)
}

View File

@@ -71,10 +71,7 @@ CREATE TABLE `datasource`
`updated_at` bigint not null default 0,
`updated_by` varchar(64) not null default '',
PRIMARY KEY (`id`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;
-- datasource add weight field
alter table `datasource` add `weight` int not null default 0;
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;
CREATE TABLE `builtin_cate` (
`id` bigint unsigned not null auto_increment,

View File

@@ -565,14 +565,6 @@ func QueryData(ctx context.Context, queryParam interface{}, cliTimeout int64, ve
return nil, err
}
// 检查是否有 shard failures有部分数据时仅记录警告继续处理
if shardErr := checkShardFailures(result.Shards, "query_data", searchSourceString); shardErr != nil {
if len(result.Aggregations["ts"]) == 0 {
return nil, shardErr
}
// 有部分数据checkShardFailures 已记录警告,继续处理
}
logger.Debugf("query_data searchSource:%s resp:%s", string(jsonSearchSource), string(result.Aggregations["ts"]))
js, err := simplejson.NewJson(result.Aggregations["ts"])
@@ -610,40 +602,6 @@ func QueryData(ctx context.Context, queryParam interface{}, cliTimeout int64, ve
return items, nil
}
// checkShardFailures 检查 ES 查询结果中的 shard failures返回格式化的错误信息
func checkShardFailures(shards *elastic.ShardsInfo, logPrefix string, queryContext interface{}) error {
if shards == nil || shards.Failed == 0 || len(shards.Failures) == 0 {
return nil
}
var failureReasons []string
for _, failure := range shards.Failures {
reason := ""
if failure.Reason != nil {
if reasonType, ok := failure.Reason["type"].(string); ok {
reason = reasonType
}
if reasonMsg, ok := failure.Reason["reason"].(string); ok {
if reason != "" {
reason += ": " + reasonMsg
} else {
reason = reasonMsg
}
}
}
if reason != "" {
failureReasons = append(failureReasons, fmt.Sprintf("index=%s shard=%d: %s", failure.Index, failure.Shard, reason))
}
}
if len(failureReasons) > 0 {
errMsg := fmt.Sprintf("elasticsearch shard failures (%d/%d failed): %s", shards.Failed, shards.Total, strings.Join(failureReasons, "; "))
logger.Warningf("%s query:%v %s", logPrefix, queryContext, errMsg)
return fmt.Errorf("%s", errMsg)
}
return nil
}
func HitFilter(typ string) bool {
switch typ {
case "keyword", "date", "long", "integer", "short", "byte", "double", "float", "half_float", "scaled_float", "unsigned_long":
@@ -720,27 +678,21 @@ func QueryLog(ctx context.Context, queryParam interface{}, timeout int64, versio
} else {
source = source.From(param.P).Sort(param.DateField, param.Ascending)
}
sourceBytes, _ := json.Marshal(source)
result, err := search(ctx, indexArr, source, param.Timeout, param.MaxShard)
if err != nil {
logger.Warningf("query_log source:%s error:%v", string(sourceBytes), err)
logger.Warningf("query data error:%v", err)
return nil, 0, err
}
// 检查是否有 shard failures有部分数据时仅记录警告继续处理
if shardErr := checkShardFailures(result.Shards, "query_log", string(sourceBytes)); shardErr != nil {
if len(result.Hits.Hits) == 0 {
return nil, 0, shardErr
}
// 有部分数据checkShardFailures 已记录警告,继续处理
}
total := result.TotalHits()
var ret []interface{}
logger.Debugf("query_log source:%s len:%d total:%d", string(sourceBytes), len(result.Hits.Hits), total)
b, _ := json.Marshal(source)
logger.Debugf("query data result query source:%s len:%d total:%d", string(b), len(result.Hits.Hits), total)
resultBytes, _ := json.Marshal(result)
logger.Debugf("query_log source:%s result:%s", string(sourceBytes), string(resultBytes))
logger.Debugf("query data result query source:%s result:%s", string(b), string(resultBytes))
if strings.HasPrefix(version, "6") {
for i := 0; i < len(result.Hits.Hits); i++ {

View File

@@ -133,5 +133,4 @@ type DatasourceInfo struct {
CreatedAt int64 `json:"created_at"`
UpdatedAt int64 `json:"updated_at"`
IsDefault bool `json:"is_default"`
Weight int `json:"weight"`
}

View File

@@ -148,7 +148,7 @@ func (d *Doris) QueryData(ctx context.Context, query interface{}) ([]models.Data
}
}
items, err := d.QueryTimeseries(ctx, &doris.QueryParam{
items, err := d.QueryTimeseries(context.TODO(), &doris.QueryParam{
Database: dorisQueryParam.Database,
Sql: dorisQueryParam.SQL,
Keys: types.Keys{

View File

@@ -738,7 +738,6 @@ CREATE TABLE datasource
http varchar(4096) not null default '',
auth varchar(8192) not null default '',
is_default boolean not null default false,
weight int not null default 0,
created_at bigint not null default 0,
created_by varchar(64) not null default '',
updated_at bigint not null default 0,

View File

@@ -655,7 +655,6 @@ CREATE TABLE `datasource`
`http` varchar(4096) not null default '',
`auth` varchar(8192) not null default '',
`is_default` boolean COMMENT 'is default datasource',
`weight` int not null default 0,
`created_at` bigint not null default 0,
`created_by` varchar(64) not null default '',
`updated_at` bigint not null default 0,

View File

@@ -331,33 +331,3 @@ CREATE TABLE `event_pipeline_execution` (
ALTER TABLE `builtin_metrics` ADD COLUMN `expression_type` varchar(32) NOT NULL DEFAULT 'promql' COMMENT 'expression type: metric_name or promql';
ALTER TABLE `builtin_metrics` ADD COLUMN `metric_type` varchar(191) NOT NULL DEFAULT '' COMMENT 'metric type like counter/gauge';
ALTER TABLE `builtin_metrics` ADD COLUMN `extra_fields` text COMMENT 'custom extra fields';
/* v9 2026-01-16 saved_view */
CREATE TABLE `saved_view` (
`id` bigint NOT NULL AUTO_INCREMENT,
`name` varchar(255) NOT NULL COMMENT 'view name',
`page` varchar(64) NOT NULL COMMENT 'page identifier',
`filter` text COMMENT 'filter config (JSON)',
`public_cate` int NOT NULL DEFAULT 0 COMMENT 'public category: 0-self, 1-team, 2-all',
`gids` text COMMENT 'team group ids (JSON)',
`create_at` bigint NOT NULL DEFAULT 0 COMMENT 'create timestamp',
`create_by` varchar(64) NOT NULL DEFAULT '' COMMENT 'creator',
`update_at` bigint NOT NULL DEFAULT 0 COMMENT 'update timestamp',
`update_by` varchar(64) NOT NULL DEFAULT '' COMMENT 'updater',
PRIMARY KEY (`id`),
KEY `idx_page` (`page`),
KEY `idx_create_by` (`create_by`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='saved views for pages';
CREATE TABLE `user_view_favorite` (
`id` bigint NOT NULL AUTO_INCREMENT,
`view_id` bigint NOT NULL COMMENT 'saved view id',
`user_id` bigint NOT NULL COMMENT 'user id',
`create_at` bigint NOT NULL DEFAULT 0 COMMENT 'create timestamp',
PRIMARY KEY (`id`),
KEY `idx_view_id` (`view_id`),
KEY `idx_user_id` (`user_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='user favorite views';
/* v9 2026-01-20 datasource weight */
ALTER TABLE `datasource` ADD COLUMN `weight` int not null default 0 COMMENT 'weight for sorting';

View File

@@ -589,7 +589,6 @@ CREATE TABLE `datasource`
`http` varchar(4096) not null default '',
`auth` varchar(8192) not null default '',
`is_default` tinyint not null default 0,
`weight` int not null default 0,
`created_at` bigint not null default 0,
`created_by` varchar(64) not null default '',
`updated_at` bigint not null default 0,

View File

@@ -90,7 +90,7 @@ func getDatasourcesFromDBLoop(ctx *ctx.Context, fromAPI bool) {
foundDefaultDatasource = true
}
// logger.Debugf("get datasource: %+v", item)
logger.Debugf("get datasource: %+v", item)
ds := datasource.DatasourceInfo{
Id: item.Id,
Name: item.Name,
@@ -104,7 +104,6 @@ func getDatasourcesFromDBLoop(ctx *ctx.Context, fromAPI bool) {
AuthJson: item.AuthJson,
Status: item.Status,
IsDefault: item.IsDefault,
Weight: item.Weight,
}
if item.PluginType == "elasticsearch" {
@@ -237,5 +236,5 @@ func PutDatasources(items []datasource.DatasourceInfo) {
}
}
// logger.Debugf("get plugin by type success Ids:%v", ids)
logger.Debugf("get plugin by type success Ids:%v", ids)
}

View File

@@ -10,14 +10,13 @@ const (
TimeseriesAggregationTimestamp = "__ts__"
)
// QueryLogs 查询日志
// TODO: 待测试, MAP/ARRAY/STRUCT/JSON 等类型能否处理
func (d *Doris) QueryLogs(ctx context.Context, query *QueryParam) ([]map[string]interface{}, error) {
// 等同于 Query()
return d.Query(ctx, query, true)
return d.Query(ctx, query)
}
// QueryHistogram 本质是查询时序数据, 取第一组, SQL由上层封装, 不再做复杂的解析和截断
// 本质是查询时序数据, 取第一组, SQL由上层封装, 不再做复杂的解析和截断
func (d *Doris) QueryHistogram(ctx context.Context, query *QueryParam) ([][]float64, error) {
values, err := d.QueryTimeseries(ctx, query)
if err != nil {

View File

@@ -15,10 +15,6 @@ const (
TimeFieldFormatDateTime = "datetime"
)
type noNeedCheckMaxRowKey struct{}
var NoNeedCheckMaxRow = noNeedCheckMaxRowKey{}
// 不再拼接SQL, 完全信赖用户的输入
type QueryParam struct {
Database string `json:"database"`
@@ -43,7 +39,7 @@ var (
)
// Query executes a given SQL query in Doris and returns the results with MaxQueryRows check
func (d *Doris) Query(ctx context.Context, query *QueryParam, checkMaxRow bool) ([]map[string]interface{}, error) {
func (d *Doris) Query(ctx context.Context, query *QueryParam) ([]map[string]interface{}, error) {
// 校验SQL的合法性, 过滤掉 write请求
sqlItem := strings.Split(strings.ToUpper(query.Sql), " ")
for _, item := range sqlItem {
@@ -52,12 +48,10 @@ func (d *Doris) Query(ctx context.Context, query *QueryParam, checkMaxRow bool)
}
}
if checkMaxRow {
// 检查查询结果行数
err := d.CheckMaxQueryRows(ctx, query.Database, query.Sql)
if err != nil {
return nil, err
}
// 检查查询结果行数
err := d.CheckMaxQueryRows(ctx, query.Database, query.Sql)
if err != nil {
return nil, err
}
rows, err := d.ExecQuery(ctx, query.Database, query.Sql)
@@ -69,12 +63,8 @@ func (d *Doris) Query(ctx context.Context, query *QueryParam, checkMaxRow bool)
// QueryTimeseries executes a time series data query using the given parameters with MaxQueryRows check
func (d *Doris) QueryTimeseries(ctx context.Context, query *QueryParam) ([]types.MetricValues, error) {
// 默认需要检查,除非调用方声明不需要检查
checkMaxRow := true
if noCheck, ok := ctx.Value(NoNeedCheckMaxRow).(bool); ok && noCheck {
checkMaxRow = false
}
rows, err := d.Query(ctx, query, checkMaxRow)
// 使用 Query 方法执行查询Query方法内部已包含MaxQueryRows检查
rows, err := d.Query(ctx, query)
if err != nil {
return nil, err
}

View File

@@ -27,8 +27,7 @@ type TargetCacheType struct {
redis storage.Redis
sync.RWMutex
targets map[string]*models.Target // key: ident
targetsIndex map[string][]string // key: ip, value: ident list
targets map[string]*models.Target // key: ident
}
func NewTargetCache(ctx *ctx.Context, stats *Stats, redis storage.Redis) *TargetCacheType {
@@ -39,7 +38,6 @@ func NewTargetCache(ctx *ctx.Context, stats *Stats, redis storage.Redis) *Target
stats: stats,
redis: redis,
targets: make(map[string]*models.Target),
targetsIndex: make(map[string][]string),
}
tc.SyncTargets()
@@ -53,7 +51,6 @@ func (tc *TargetCacheType) Reset() {
tc.statTotal = -1
tc.statLastUpdated = -1
tc.targets = make(map[string]*models.Target)
tc.targetsIndex = make(map[string][]string)
}
func (tc *TargetCacheType) StatChanged(total, lastUpdated int64) bool {
@@ -65,17 +62,8 @@ func (tc *TargetCacheType) StatChanged(total, lastUpdated int64) bool {
}
func (tc *TargetCacheType) Set(m map[string]*models.Target, total, lastUpdated int64) {
idx := make(map[string][]string, len(m))
for ident, target := range m {
if _, ok := idx[target.HostIp]; !ok {
idx[target.HostIp] = []string{}
}
idx[target.HostIp] = append(idx[target.HostIp], ident)
}
tc.Lock()
tc.targets = m
tc.targetsIndex = idx
tc.Unlock()
// only one goroutine used, so no need lock
@@ -90,75 +78,6 @@ func (tc *TargetCacheType) Get(ident string) (*models.Target, bool) {
return val, has
}
func (tc *TargetCacheType) GetByIp(ip string) ([]*models.Target, bool) {
tc.RLock()
defer tc.RUnlock()
idents, has := tc.targetsIndex[ip]
if !has {
return nil, false
}
targs := make([]*models.Target, 0, len(idents))
for _, ident := range idents {
if val, has := tc.targets[ident]; has {
targs = append(targs, val)
}
}
return targs, len(targs) > 0
}
func (tc *TargetCacheType) GetAll() []*models.Target {
tc.RLock()
defer tc.RUnlock()
lst := make([]*models.Target, 0, len(tc.targets))
for _, target := range tc.targets {
lst = append(lst, target)
}
return lst
}
// GetAllBeatTime 返回所有 target 的心跳时间 mapkey 为 identvalue 为 BeatTime
func (tc *TargetCacheType) GetAllBeatTime() map[string]int64 {
tc.RLock()
defer tc.RUnlock()
beatTimeMap := make(map[string]int64, len(tc.targets))
for ident, target := range tc.targets {
beatTimeMap[ident] = target.BeatTime
}
return beatTimeMap
}
// refreshBeatTime 从 Redis 刷新缓存中所有 target 的 BeatTime
func (tc *TargetCacheType) refreshBeatTime() {
if tc.redis == nil {
return
}
// 快照 ident 列表,避免持锁访问 Redis
tc.RLock()
idents := make([]string, 0, len(tc.targets))
for ident := range tc.targets {
idents = append(idents, ident)
}
tc.RUnlock()
if len(idents) == 0 {
return
}
beatTimes := models.FetchBeatTimesFromRedis(tc.redis, idents)
if len(beatTimes) == 0 {
return
}
tc.Lock()
for ident, ts := range beatTimes {
if target, ok := tc.targets[ident]; ok {
target.BeatTime = ts
}
}
tc.Unlock()
}
func (tc *TargetCacheType) Gets(idents []string) []*models.Target {
tc.RLock()
defer tc.RUnlock()
@@ -186,7 +105,7 @@ func (tc *TargetCacheType) GetOffsetHost(targets []*models.Target, now, offset i
continue
}
if now-target.BeatTime > 120 {
if now-target.UpdateAt > 120 {
// means this target is not a active host, do not check offset
continue
}
@@ -228,7 +147,6 @@ func (tc *TargetCacheType) syncTargets() error {
}
if !tc.StatChanged(stat.Total, stat.LastUpdated) {
tc.refreshBeatTime()
tc.stats.GaugeCronDuration.WithLabelValues("sync_targets").Set(0)
tc.stats.GaugeSyncNumber.WithLabelValues("sync_targets").Set(0)
dumper.PutSyncRecord("targets", start.Unix(), -1, -1, "not changed")
@@ -252,9 +170,6 @@ func (tc *TargetCacheType) syncTargets() error {
}
}
// 从 Redis 批量获取心跳时间填充 BeatTime
models.FillTargetsBeatTime(tc.redis, lst)
for i := 0; i < len(lst); i++ {
m[lst[i].Ident] = lst[i]
}
@@ -271,18 +186,57 @@ func (tc *TargetCacheType) syncTargets() error {
// get host update time
func (tc *TargetCacheType) GetHostUpdateTime(targets []string) map[string]int64 {
metaMap := make(map[string]int64)
if tc.redis == nil {
return make(map[string]int64)
return metaMap
}
metaMap := models.FetchBeatTimesFromRedis(tc.redis, targets)
num := 0
var keys []string
for i := 0; i < len(targets); i++ {
keys = append(keys, models.WrapIdentUpdateTime(targets[i]))
num++
if num == 100 {
vals := storage.MGet(context.Background(), tc.redis, keys)
for _, value := range vals {
var hostUpdateTime models.HostUpdateTime
if value == nil {
continue
}
err := json.Unmarshal(value, &hostUpdateTime)
if err != nil {
logger.Errorf("failed to unmarshal host meta: %s value:%v", err, value)
continue
}
metaMap[hostUpdateTime.Ident] = hostUpdateTime.UpdateTime
}
keys = keys[:0]
num = 0
}
}
vals := storage.MGet(context.Background(), tc.redis, keys)
for _, value := range vals {
var hostUpdateTime models.HostUpdateTime
if value == nil {
continue
}
err := json.Unmarshal(value, &hostUpdateTime)
if err != nil {
logger.Warningf("failed to unmarshal host err:%v value:%s", err, string(value))
continue
}
metaMap[hostUpdateTime.Ident] = hostUpdateTime.UpdateTime
}
for _, ident := range targets {
if _, ok := metaMap[ident]; !ok {
// if not exists, get from cache
target, exists := tc.Get(ident)
if exists {
metaMap[ident] = target.BeatTime
metaMap[ident] = target.UpdateAt
}
}
}

View File

@@ -192,6 +192,7 @@ type AlertMute struct {
Activated int `json:"activated" gorm:"-"` // 0: not activated, 1: activated
CreateBy string `json:"create_by"`
UpdateBy string `json:"update_by"`
UpdateByNickname string `json:"update_by_nickname" gorm:"-"`
CreateAt int64 `json:"create_at"`
UpdateAt int64 `json:"update_at"`
ITags []TagFilter `json:"-" gorm:"-"` // inner tags

View File

@@ -509,16 +509,10 @@ func (ar *AlertRule) Verify() error {
ar.AppendTags = strings.TrimSpace(ar.AppendTags)
arr := strings.Fields(ar.AppendTags)
appendTagKeys := make(map[string]struct{})
for i := 0; i < len(arr); i++ {
if !strings.Contains(arr[i], "=") {
return fmt.Errorf("AppendTags(%s) invalid", arr[i])
}
pair := strings.SplitN(arr[i], "=", 2)
if _, exists := appendTagKeys[pair[0]]; exists {
return fmt.Errorf("AppendTags has duplicate key: %s", pair[0])
}
appendTagKeys[pair[0]] = struct{}{}
}
gids := strings.Fields(ar.NotifyGroups)

View File

@@ -45,6 +45,7 @@ type AlertSubscribe struct {
CreateAt int64 `json:"create_at"`
UpdateBy string `json:"update_by"`
UpdateAt int64 `json:"update_at"`
UpdateByNickname string `json:"update_by_nickname" gorm:"-"`
ITags []TagFilter `json:"-" gorm:"-"` // inner tags
BusiGroups ormx.JSONArr `json:"busi_groups"`
IBusiGroups []TagFilter `json:"-" gorm:"-"` // inner busiGroups

View File

@@ -19,22 +19,23 @@ const (
)
type Board struct {
Id int64 `json:"id" gorm:"primaryKey"`
GroupId int64 `json:"group_id"`
Name string `json:"name"`
Ident string `json:"ident"`
Tags string `json:"tags"`
Note string `json:"note"`
CreateAt int64 `json:"create_at"`
CreateBy string `json:"create_by"`
UpdateAt int64 `json:"update_at"`
UpdateBy string `json:"update_by"`
Configs string `json:"configs" gorm:"-"`
Public int `json:"public"` // 0: false, 1: true
PublicCate int `json:"public_cate"` // 0: anonymous, 1: login, 2: busi
Bgids []int64 `json:"bgids" gorm:"-"`
BuiltIn int `json:"built_in"` // 0: false, 1: true
Hide int `json:"hide"` // 0: false, 1: true
Id int64 `json:"id" gorm:"primaryKey"`
GroupId int64 `json:"group_id"`
Name string `json:"name"`
Ident string `json:"ident"`
Tags string `json:"tags"`
Note string `json:"note"`
CreateAt int64 `json:"create_at"`
CreateBy string `json:"create_by"`
UpdateAt int64 `json:"update_at"`
UpdateBy string `json:"update_by"`
UpdateByNickname string `json:"update_by_nickname" gorm:"-"`
Configs string `json:"configs" gorm:"-"`
Public int `json:"public"` // 0: false, 1: true
PublicCate int `json:"public_cate"` // 0: anonymous, 1: login, 2: busi
Bgids []int64 `json:"bgids" gorm:"-"`
BuiltIn int `json:"built_in"` // 0: false, 1: true
Hide int `json:"hide"` // 0: false, 1: true
}
func (b *Board) TableName() string {

View File

@@ -9,14 +9,15 @@ import (
)
type MetricFilter struct {
ID int64 `json:"id" gorm:"primaryKey;type:bigint;autoIncrement;comment:'unique identifier'"`
Name string `json:"name" gorm:"type:varchar(191);not null;index:idx_metricfilter_name,sort:asc;comment:'name of metric filter'"`
Configs string `json:"configs" gorm:"type:varchar(4096);not null;comment:'configuration of metric filter'"`
GroupsPerm []GroupPerm `json:"groups_perm" gorm:"type:text;serializer:json;"`
CreateAt int64 `json:"create_at" gorm:"type:bigint;not null;default:0;comment:'create time'"`
CreateBy string `json:"create_by" gorm:"type:varchar(191);not null;default:'';comment:'creator'"`
UpdateAt int64 `json:"update_at" gorm:"type:bigint;not null;default:0;comment:'update time'"`
UpdateBy string `json:"update_by" gorm:"type:varchar(191);not null;default:'';comment:'updater'"`
ID int64 `json:"id" gorm:"primaryKey;type:bigint;autoIncrement;comment:'unique identifier'"`
Name string `json:"name" gorm:"type:varchar(191);not null;index:idx_metricfilter_name,sort:asc;comment:'name of metric filter'"`
Configs string `json:"configs" gorm:"type:varchar(4096);not null;comment:'configuration of metric filter'"`
GroupsPerm []GroupPerm `json:"groups_perm" gorm:"type:text;serializer:json;"`
CreateAt int64 `json:"create_at" gorm:"type:bigint;not null;default:0;comment:'create time'"`
CreateBy string `json:"create_by" gorm:"type:varchar(191);not null;default:'';comment:'creator'"`
UpdateAt int64 `json:"update_at" gorm:"type:bigint;not null;default:0;comment:'update time'"`
UpdateBy string `json:"update_by" gorm:"type:varchar(191);not null;default:'';comment:'updater'"`
UpdateByNickname string `json:"update_by_nickname" gorm:"-"`
}
type GroupPerm struct {

View File

@@ -12,16 +12,17 @@ import (
)
type BusiGroup struct {
Id int64 `json:"id" gorm:"primaryKey"`
Name string `json:"name"`
LabelEnable int `json:"label_enable"`
LabelValue string `json:"label_value"`
CreateAt int64 `json:"create_at"`
CreateBy string `json:"create_by"`
UpdateAt int64 `json:"update_at"`
UpdateBy string `json:"update_by"`
UserGroups []UserGroupWithPermFlag `json:"user_groups" gorm:"-"`
DB *gorm.DB `json:"-" gorm:"-"`
Id int64 `json:"id" gorm:"primaryKey"`
Name string `json:"name"`
LabelEnable int `json:"label_enable"`
LabelValue string `json:"label_value"`
CreateAt int64 `json:"create_at"`
CreateBy string `json:"create_by"`
UpdateAt int64 `json:"update_at"`
UpdateBy string `json:"update_by"`
UpdateByNickname string `json:"update_by_nickname" gorm:"-"`
UserGroups []UserGroupWithPermFlag `json:"user_groups" gorm:"-"`
DB *gorm.DB `json:"-" gorm:"-"`
}
func New(db *gorm.DB) *BusiGroup {

View File

@@ -20,16 +20,17 @@ import (
)
type Configs struct { //ckey+external
Id int64 `json:"id" gorm:"primaryKey"`
Ckey string `json:"ckey"` // Before inserting external configs, check if they are already defined as built-in configs.
Cval string `json:"cval"`
Note string `json:"note"`
External int `json:"external"` //Controls frontend list display: 0 hides built-in (default), 1 shows external
Encrypted int `json:"encrypted"` //Indicates whether the value(cval) is encrypted (1 for ciphertext, 0 for plaintext(default))
CreateAt int64 `json:"create_at"`
CreateBy string `json:"create_by"`
UpdateAt int64 `json:"update_at"`
UpdateBy string `json:"update_by"`
Id int64 `json:"id" gorm:"primaryKey"`
Ckey string `json:"ckey"` // Before inserting external configs, check if they are already defined as built-in configs.
Cval string `json:"cval"`
Note string `json:"note"`
External int `json:"external"` //Controls frontend list display: 0 hides built-in (default), 1 shows external
Encrypted int `json:"encrypted"` //Indicates whether the value(cval) is encrypted (1 for ciphertext, 0 for plaintext(default))
CreateAt int64 `json:"create_at"`
CreateBy string `json:"create_by"`
UpdateAt int64 `json:"update_at"`
UpdateBy string `json:"update_by"`
UpdateByNickname string `json:"update_by_nickname" gorm:"-"`
}
func (Configs) TableName() string {

View File

@@ -7,19 +7,20 @@ import (
)
type DashAnnotation struct {
Id int64 `json:"id" gorm:"primaryKey"`
DashboardId int64 `json:"dashboard_id"`
PanelId string `json:"panel_id"`
Tags string `json:"-"`
TagsJSON []string `json:"tags" gorm:"-"`
Description string `json:"description"`
Config string `json:"config"`
TimeStart int64 `json:"time_start"`
TimeEnd int64 `json:"time_end"`
CreateAt int64 `json:"create_at"`
CreateBy string `json:"create_by"`
UpdateAt int64 `json:"update_at"`
UpdateBy string `json:"update_by"`
Id int64 `json:"id" gorm:"primaryKey"`
DashboardId int64 `json:"dashboard_id"`
PanelId string `json:"panel_id"`
Tags string `json:"-"`
TagsJSON []string `json:"tags" gorm:"-"`
Description string `json:"description"`
Config string `json:"config"`
TimeStart int64 `json:"time_start"`
TimeEnd int64 `json:"time_end"`
CreateAt int64 `json:"create_at"`
CreateBy string `json:"create_by"`
UpdateAt int64 `json:"update_at"`
UpdateBy string `json:"update_by"`
UpdateByNickname string `json:"update_by_nickname" gorm:"-"`
}
func (da *DashAnnotation) TableName() string {

View File

@@ -45,7 +45,6 @@ type Datasource struct {
CreatedBy string `json:"created_by"`
UpdatedBy string `json:"updated_by"`
IsDefault bool `json:"is_default"`
Weight int `json:"weight"`
Transport *http.Transport `json:"-" gorm:"-"`
ForceSave bool `json:"force_save" gorm:"-"`
}

View File

@@ -14,15 +14,16 @@ import (
)
type EmbeddedProduct struct {
ID int64 `json:"id" gorm:"primaryKey"` // 主键
Name string `json:"name" gorm:"column:name;type:varchar(255)"`
URL string `json:"url" gorm:"column:url;type:varchar(255)"`
IsPrivate bool `json:"is_private" gorm:"column:is_private;type:boolean"`
TeamIDs []int64 `json:"team_ids" gorm:"serializer:json"`
CreateAt int64 `json:"create_at" gorm:"column:create_at;not null;default:0"`
CreateBy string `json:"create_by" gorm:"column:create_by;type:varchar(64);not null;default:''"`
UpdateAt int64 `json:"update_at" gorm:"column:update_at;not null;default:0"`
UpdateBy string `json:"update_by" gorm:"column:update_by;type:varchar(64);not null;default:''"`
ID int64 `json:"id" gorm:"primaryKey"` // 主键
Name string `json:"name" gorm:"column:name;type:varchar(255)"`
URL string `json:"url" gorm:"column:url;type:varchar(255)"`
IsPrivate bool `json:"is_private" gorm:"column:is_private;type:boolean"`
TeamIDs []int64 `json:"team_ids" gorm:"serializer:json"`
CreateAt int64 `json:"create_at" gorm:"column:create_at;not null;default:0"`
CreateBy string `json:"create_by" gorm:"column:create_by;type:varchar(64);not null;default:''"`
UpdateAt int64 `json:"update_at" gorm:"column:update_at;not null;default:0"`
UpdateBy string `json:"update_by" gorm:"column:update_by;type:varchar(64);not null;default:''"`
UpdateByNickname string `json:"update_by_nickname" gorm:"-"`
}
func (e *EmbeddedProduct) TableName() string {

View File

@@ -24,6 +24,7 @@ type EsIndexPattern struct {
CreateBy string `json:"create_by"`
UpdateAt int64 `json:"update_at"`
UpdateBy string `json:"update_by"`
UpdateByNickname string `json:"update_by_nickname" gorm:"-"`
CrossClusterEnabled int `json:"cross_cluster_enabled"`
Note string `json:"note"`
}

View File

@@ -32,10 +32,11 @@ type EventPipeline struct {
// 输入参数(工作流级别的配置变量)
Inputs []InputVariable `json:"inputs,omitempty" gorm:"type:text;serializer:json"`
CreateAt int64 `json:"create_at" gorm:"type:bigint"`
CreateBy string `json:"create_by" gorm:"type:varchar(64)"`
UpdateAt int64 `json:"update_at" gorm:"type:bigint"`
UpdateBy string `json:"update_by" gorm:"type:varchar(64)"`
CreateAt int64 `json:"create_at" gorm:"type:bigint"`
CreateBy string `json:"create_by" gorm:"type:varchar(64)"`
UpdateAt int64 `json:"update_at" gorm:"type:bigint"`
UpdateBy string `json:"update_by" gorm:"type:varchar(64)"`
UpdateByNickname string `json:"update_by_nickname" gorm:"-"`
}
type ProcessorConfig struct {

View File

@@ -31,6 +31,7 @@ type MessageTemplate struct {
CreateBy string `json:"create_by"`
UpdateAt int64 `json:"update_at"`
UpdateBy string `json:"update_by"`
UpdateByNickname string `json:"update_by_nickname" gorm:"-"`
}
func MessageTemplateStatistics(ctx *ctx.Context) (*Statistics, error) {

View File

@@ -234,7 +234,6 @@ type Target struct {
type Datasource struct {
IsDefault bool `gorm:"column:is_default;type:boolean;comment:is default datasource"`
Identifier string `gorm:"column:identifier;type:varchar(255);default:'';comment:identifier"`
Weight int `gorm:"column:weight;type:int;default:0;comment:weight for sorting"`
}
type Configs struct {

View File

@@ -57,11 +57,12 @@ type NotifyChannelConfig struct {
RequestType string `json:"request_type"` // http, stmp, script, flashduty
RequestConfig *RequestConfig `json:"request_config,omitempty" gorm:"serializer:json"`
Weight int `json:"weight"` // 权重,根据此字段对内置模板进行排序
CreateAt int64 `json:"create_at"`
CreateBy string `json:"create_by"`
UpdateAt int64 `json:"update_at"`
UpdateBy string `json:"update_by"`
Weight int `json:"weight"` // 权重,根据此字段对内置模板进行排序
CreateAt int64 `json:"create_at"`
CreateBy string `json:"create_by"`
UpdateAt int64 `json:"update_at"`
UpdateBy string `json:"update_by"`
UpdateByNickname string `json:"update_by_nickname" gorm:"-"`
}
func (ncc *NotifyChannelConfig) TableName() string {

View File

@@ -24,10 +24,11 @@ type NotifyRule struct {
NotifyConfigs []NotifyConfig `json:"notify_configs" gorm:"serializer:json"`
ExtraConfig interface{} `json:"extra_config,omitempty" gorm:"serializer:json"`
CreateAt int64 `json:"create_at"`
CreateBy string `json:"create_by"`
UpdateAt int64 `json:"update_at"`
UpdateBy string `json:"update_by"`
CreateAt int64 `json:"create_at"`
CreateBy string `json:"create_by"`
UpdateAt int64 `json:"update_at"`
UpdateBy string `json:"update_by"`
UpdateByNickname string `json:"update_by_nickname" gorm:"-"`
}
type PipelineConfig struct {

View File

@@ -17,15 +17,16 @@ import (
)
type NotifyTpl struct {
Id int64 `json:"id"`
Name string `json:"name"`
Channel string `json:"channel"`
Content string `json:"content"`
BuiltIn bool `json:"built_in" gorm:"-"`
CreateAt int64 `json:"create_at"`
CreateBy string `json:"create_by"`
UpdateAt int64 `json:"update_at"`
UpdateBy string `json:"update_by"`
Id int64 `json:"id"`
Name string `json:"name"`
Channel string `json:"channel"`
Content string `json:"content"`
BuiltIn bool `json:"built_in" gorm:"-"`
CreateAt int64 `json:"create_at"`
CreateBy string `json:"create_by"`
UpdateAt int64 `json:"update_at"`
UpdateBy string `json:"update_by"`
UpdateByNickname string `json:"update_by_nickname" gorm:"-"`
}
func (n *NotifyTpl) TableName() string {

View File

@@ -36,6 +36,7 @@ type RecordingRule struct {
CreateBy string `json:"create_by"`
UpdateAt int64 `json:"update_at"`
UpdateBy string `json:"update_by"`
UpdateByNickname string `json:"update_by_nickname" gorm:"-"`
}
type QueryConfig struct {

View File

@@ -16,16 +16,17 @@ var (
)
type SavedView struct {
Id int64 `json:"id" gorm:"primaryKey;autoIncrement"`
Name string `json:"name" gorm:"type:varchar(255);not null"`
Page string `json:"page" gorm:"type:varchar(64);not null;index"`
Filter string `json:"filter" gorm:"type:text"`
PublicCate int `json:"public_cate" gorm:"default:0"` // 0: self, 1: team, 2: all
Gids []int64 `json:"gids" gorm:"column:gids;type:text;serializer:json"`
CreateAt int64 `json:"create_at" gorm:"type:bigint;not null;default:0"`
CreateBy string `json:"create_by" gorm:"type:varchar(64);index"`
UpdateAt int64 `json:"update_at" gorm:"type:bigint;not null;default:0"`
UpdateBy string `json:"update_by" gorm:"type:varchar(64)"`
Id int64 `json:"id" gorm:"primaryKey;autoIncrement"`
Name string `json:"name" gorm:"type:varchar(255);not null"`
Page string `json:"page" gorm:"type:varchar(64);not null;index"`
Filter string `json:"filter" gorm:"type:text"`
PublicCate int `json:"public_cate" gorm:"default:0"` // 0: self, 1: team, 2: all
Gids []int64 `json:"gids" gorm:"column:gids;type:text;serializer:json"`
CreateAt int64 `json:"create_at" gorm:"type:bigint;not null;default:0"`
CreateBy string `json:"create_by" gorm:"type:varchar(64);index"`
UpdateAt int64 `json:"update_at" gorm:"type:bigint;not null;default:0"`
UpdateBy string `json:"update_by" gorm:"type:varchar(64)"`
UpdateByNickname string `json:"update_by_nickname" gorm:"-"`
// 查询时填充的字段
IsFavorite bool `json:"is_favorite" gorm:"-"`

View File

@@ -1,8 +1,6 @@
package models
import (
"context"
"encoding/json"
"log"
"sort"
"strings"
@@ -10,7 +8,6 @@ import (
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/pkg/poster"
"github.com/ccfos/nightingale/v6/storage"
"golang.org/x/exp/slices"
"github.com/pkg/errors"
@@ -39,7 +36,6 @@ type Target struct {
OS string `json:"os" gorm:"column:os"`
HostTags []string `json:"host_tags" gorm:"serializer:json"`
BeatTime int64 `json:"beat_time" gorm:"-"` // 实时心跳时间,从 Redis 获取
UnixTime int64 `json:"unixtime" gorm:"-"`
Offset int64 `json:"offset" gorm:"-"`
TargetUp float64 `json:"target_up" gorm:"-"`
@@ -101,6 +97,12 @@ func (t *Target) MatchGroupId(gid ...int64) bool {
}
func (t *Target) AfterFind(tx *gorm.DB) (err error) {
delta := time.Now().Unix() - t.UpdateAt
if delta < 60 {
t.TargetUp = 2
} else if delta < 180 {
t.TargetUp = 1
}
t.FillTagsMap()
return
}
@@ -180,24 +182,6 @@ func BuildTargetWhereWithHosts(hosts []string) BuildTargetWhereOption {
}
}
func BuildTargetWhereWithIdents(idents []string) BuildTargetWhereOption {
return func(session *gorm.DB) *gorm.DB {
if len(idents) > 0 {
session = session.Where("ident in (?)", idents)
}
return session
}
}
func BuildTargetWhereExcludeIdents(idents []string) BuildTargetWhereOption {
return func(session *gorm.DB) *gorm.DB {
if len(idents) > 0 {
session = session.Where("ident not in (?)", idents)
}
return session
}
}
func BuildTargetWhereWithQuery(query string) BuildTargetWhereOption {
return func(session *gorm.DB) *gorm.DB {
if query != "" {
@@ -219,6 +203,17 @@ func BuildTargetWhereWithQuery(query string) BuildTargetWhereOption {
}
}
func BuildTargetWhereWithDowntime(downtime int64) BuildTargetWhereOption {
return func(session *gorm.DB) *gorm.DB {
if downtime > 0 {
session = session.Where("target.update_at < ?", time.Now().Unix()-downtime)
} else if downtime < 0 {
session = session.Where("target.update_at > ?", time.Now().Unix()+downtime)
}
return session
}
}
func buildTargetWhere(ctx *ctx.Context, options ...BuildTargetWhereOption) *gorm.DB {
sub := DB(ctx).Model(&Target{}).Distinct("target.ident")
for _, opt := range options {
@@ -269,6 +264,21 @@ func TargetCountByFilter(ctx *ctx.Context, query []map[string]interface{}) (int6
return Count(session)
}
func MissTargetGetsByFilter(ctx *ctx.Context, query []map[string]interface{}, ts int64) ([]*Target, error) {
var lst []*Target
session := TargetFilterQueryBuild(ctx, query, 0, 0)
session = session.Where("update_at < ?", ts)
err := session.Order("ident").Find(&lst).Error
return lst, err
}
func MissTargetCountByFilter(ctx *ctx.Context, query []map[string]interface{}, ts int64) (int64, error) {
session := TargetFilterQueryBuild(ctx, query, 0, 0)
session = session.Where("update_at < ?", ts)
return Count(session)
}
func TargetFilterQueryBuild(ctx *ctx.Context, query []map[string]interface{}, limit, offset int) *gorm.DB {
sub := DB(ctx).Model(&Target{}).Distinct("target.ident").Joins("left join " +
"target_busi_group on target.ident = target_busi_group.target_ident")
@@ -609,66 +619,6 @@ func (t *Target) FillMeta(meta *HostMeta) {
t.RemoteAddr = meta.RemoteAddr
}
// FetchBeatTimesFromRedis 从 Redis 批量获取心跳时间,返回 ident -> updateTime 的映射
func FetchBeatTimesFromRedis(redis storage.Redis, idents []string) map[string]int64 {
result := make(map[string]int64, len(idents))
if redis == nil || len(idents) == 0 {
return result
}
num := 0
var keys []string
for i := 0; i < len(idents); i++ {
keys = append(keys, WrapIdentUpdateTime(idents[i]))
num++
if num == 100 {
fetchBeatTimeBatch(redis, keys, result)
keys = keys[:0]
num = 0
}
}
if len(keys) > 0 {
fetchBeatTimeBatch(redis, keys, result)
}
return result
}
func fetchBeatTimeBatch(redis storage.Redis, keys []string, result map[string]int64) {
vals := storage.MGet(context.Background(), redis, keys)
for _, value := range vals {
if value == nil {
continue
}
var hut HostUpdateTime
if err := json.Unmarshal(value, &hut); err != nil {
logger.Warningf("failed to unmarshal host update time: %v", err)
continue
}
result[hut.Ident] = hut.UpdateTime
}
}
// FillTargetsBeatTime 从 Redis 批量获取心跳时间填充 target.BeatTime
func FillTargetsBeatTime(redis storage.Redis, targets []*Target) {
if len(targets) == 0 {
return
}
idents := make([]string, len(targets))
for i, t := range targets {
idents[i] = t.Ident
}
beatTimes := FetchBeatTimesFromRedis(redis, idents)
for _, t := range targets {
if ts, ok := beatTimes[t.Ident]; ok {
t.BeatTime = ts
}
}
}
func TargetIdents(ctx *ctx.Context, ids []int64) ([]string, error) {
var ret []string

View File

@@ -15,22 +15,23 @@ import (
)
type TaskTpl struct {
Id int64 `json:"id" gorm:"primaryKey"`
GroupId int64 `json:"group_id"`
Title string `json:"title"`
Batch int `json:"batch"`
Tolerance int `json:"tolerance"`
Timeout int `json:"timeout"`
Pause string `json:"pause"`
Script string `json:"script"`
Args string `json:"args"`
Tags string `json:"-"`
TagsJSON []string `json:"tags" gorm:"-"`
Account string `json:"account"`
CreateAt int64 `json:"create_at"`
CreateBy string `json:"create_by"`
UpdateAt int64 `json:"update_at"`
UpdateBy string `json:"update_by"`
Id int64 `json:"id" gorm:"primaryKey"`
GroupId int64 `json:"group_id"`
Title string `json:"title"`
Batch int `json:"batch"`
Tolerance int `json:"tolerance"`
Timeout int `json:"timeout"`
Pause string `json:"pause"`
Script string `json:"script"`
Args string `json:"args"`
Tags string `json:"-"`
TagsJSON []string `json:"tags" gorm:"-"`
Account string `json:"account"`
CreateAt int64 `json:"create_at"`
CreateBy string `json:"create_by"`
UpdateAt int64 `json:"update_at"`
UpdateBy string `json:"update_by"`
UpdateByNickname string `json:"update_by_nickname" gorm:"-"`
}
func (t *TaskTpl) TableName() string {

View File

@@ -4,6 +4,7 @@ import (
"encoding/json"
"fmt"
"os"
"reflect"
"regexp"
"strconv"
"strings"
@@ -315,7 +316,7 @@ func (u *User) UpdatePassword(ctx *ctx.Context, password, updateBy string) error
}).Error
}
func (u *User) UpdateUserGroup(ctx *ctx.Context, userGroupIds []int64) error {
func (u *User) AddToUserGroups(ctx *ctx.Context, userGroupIds []int64) error {
count := len(userGroupIds)
for i := 0; i < count; i++ {
@@ -422,6 +423,80 @@ func UserMapGet(ctx *ctx.Context, where string, args ...interface{}) map[string]
return um
}
// UserNicknameMap returns a deduplicated username -> nickname map.
func UserNicknameMap(ctx *ctx.Context, names []string) map[string]string {
m := make(map[string]string)
if len(names) == 0 {
return m
}
seen := make(map[string]struct{}, len(names))
unique := make([]string, 0, len(names))
for _, name := range names {
if name == "" {
continue
}
if _, ok := seen[name]; ok {
continue
}
seen[name] = struct{}{}
unique = append(unique, name)
}
if len(unique) == 0 {
return m
}
users := UserMapGet(ctx, "username in (?)", unique)
for username, user := range users {
m[username] = user.Nickname
}
return m
}
// FillUpdateByNicknames fills the UpdateByNickname field for each element in items
// by looking up the UpdateBy username. Supports both []T and []*T slices.
func FillUpdateByNicknames[T any](ctx *ctx.Context, items []T) {
if len(items) == 0 {
return
}
elemType := reflect.TypeOf(items).Elem()
isPtr := elemType.Kind() == reflect.Ptr
if isPtr {
elemType = elemType.Elem()
}
updateByField, ok1 := elemType.FieldByName("UpdateBy")
nicknameField, ok2 := elemType.FieldByName("UpdateByNickname")
if !ok1 || !ok2 {
return
}
names := make([]string, 0, len(items))
for i := range items {
v := reflect.ValueOf(&items[i]).Elem()
if isPtr {
if v.IsNil() {
continue
}
v = v.Elem()
}
names = append(names, v.FieldByIndex(updateByField.Index).String())
}
nm := UserNicknameMap(ctx, names)
for i := range items {
v := reflect.ValueOf(&items[i]).Elem()
if isPtr {
if v.IsNil() {
continue
}
v = v.Elem()
}
updateBy := v.FieldByIndex(updateByField.Index).String()
v.FieldByIndex(nicknameField.Index).SetString(nm[updateBy])
}
}
func UserGetByUsername(ctx *ctx.Context, username string) (*User, error) {
return UserGet(ctx, "username=?", username)
}

View File

@@ -12,16 +12,17 @@ import (
)
type UserGroup struct {
Id int64 `json:"id" gorm:"primaryKey"`
Name string `json:"name"`
Note string `json:"note"`
CreateAt int64 `json:"create_at"`
CreateBy string `json:"create_by"`
UpdateAt int64 `json:"update_at"`
UpdateBy string `json:"update_by"`
UserIds []int64 `json:"-" gorm:"-"`
Users []User `json:"users" gorm:"-"`
BusiGroups []*BusiGroup `json:"busi_groups" gorm:"-"`
Id int64 `json:"id" gorm:"primaryKey"`
Name string `json:"name"`
Note string `json:"note"`
CreateAt int64 `json:"create_at"`
CreateBy string `json:"create_by"`
UpdateAt int64 `json:"update_at"`
UpdateBy string `json:"update_by"`
UpdateByNickname string `json:"update_by_nickname" gorm:"-"`
UserIds []int64 `json:"-" gorm:"-"`
Users []User `json:"users" gorm:"-"`
BusiGroups []*BusiGroup `json:"busi_groups" gorm:"-"`
}
func (ug *UserGroup) TableName() string {

View File

@@ -1,7 +1,7 @@
package cfg
import (
"os"
"io/ioutil"
)
type scanner struct {
@@ -23,6 +23,6 @@ func (s *scanner) Data() []byte {
func (s *scanner) Read(file string) {
if s.err == nil {
s.data, s.err = os.ReadFile(file)
s.data, s.err = ioutil.ReadFile(file)
}
}

View File

@@ -531,13 +531,8 @@ func Printf(format string, value interface{}) string {
switch valType {
case reflect.String:
strValue := value.(string)
// Check if it's a value with unit (contains both digits and non-numeric chars like letters or %)
if isValueWithUnit(strValue) {
return strValue
}
// Try converting string to float
if floatValue, err := strconv.ParseFloat(strValue, 64); err == nil {
if floatValue, err := strconv.ParseFloat(value.(string), 64); err == nil {
return fmt.Sprintf(format, floatValue)
}
return fmt.Sprintf(format, value)
@@ -549,32 +544,6 @@ func Printf(format string, value interface{}) string {
}
}
// isValueWithUnit checks if a string is a numeric value with unit
// e.g., "11.5%", "100MB", "10a" returns true
// e.g., "11", "11.11", "-3.14" returns false
func isValueWithUnit(s string) bool {
if s == "" {
return false
}
hasDigit := false
hasUnit := false
for _, r := range s {
if r >= '0' && r <= '9' {
hasDigit = true
} else if r == '.' || r == '-' || r == '+' {
// These are valid numeric characters, not units
continue
} else {
// Any other character (letters, %, etc.) is considered a unit
hasUnit = true
}
}
return hasDigit && hasUnit
}
func floatToTime(v float64) (*time.Time, error) {
if math.IsNaN(v) || math.IsInf(v, 0) {
return nil, errNaNOrInf

View File

@@ -7,7 +7,7 @@ import (
"regexp"
"strings"
templateT "text/template"
"encoding/base64"
"github.com/toolkits/pkg/logger"
)
@@ -64,16 +64,6 @@ var TemplateFuncMap = template.FuncMap{
"jsonMarshal": JsonMarshal,
"mapDifference": MapDifference,
"tagsMapToStr": TagsMapToStr,
"b64enc": func(s string) string {
return base64.StdEncoding.EncodeToString([]byte(s))
},
"b64dec": func(s string) string {
data, err := base64.StdEncoding.DecodeString(s)
if err != nil {
return s
}
return string(data)
},
}
// NewTemplateFuncMap copy on write for TemplateFuncMap

View File

@@ -106,7 +106,6 @@ func (s *Set) UpdateTargets(lst []string, now int64) error {
return nil
}
// 心跳时间只写入 Redis不再写入 MySQL update_at
err := s.updateTargetsUpdateTs(lst, now, s.redis)
if err != nil {
logger.Errorf("update_ts: failed to update targets: %v error: %v", lst, err)
@@ -134,7 +133,12 @@ func (s *Set) UpdateTargets(lst []string, now int64) error {
return nil
}
// 新 target 仍需 INSERT 注册到 MySQL
if s.configs.UpdateDBTargetTimestampDisable {
// 如果 mysql 压力太大,关闭更新 db 的操作
return nil
}
// there are some idents not found in db, so insert them
var exists []string
err = s.ctx.DB.Table("target").Where("ident in ?", lst).Pluck("ident", &exists).Error
if err != nil {
@@ -149,9 +153,35 @@ func (s *Set) UpdateTargets(lst []string, now int64) error {
}
}
// 从批量更新一批机器的时间戳改成逐台更新是为了避免批量更新时mysql的锁竞争问题
start := time.Now()
duration := time.Since(start).Seconds()
if len(exists) > 0 {
sema := semaphore.NewSemaphore(s.configs.UpdateDBTargetConcurrency)
wg := sync.WaitGroup{}
for i := 0; i < len(exists); i++ {
sema.Acquire()
wg.Add(1)
go func(ident string) {
defer sema.Release()
defer wg.Done()
s.updateDBTargetTs(ident, now)
}(exists[i])
}
wg.Wait()
}
pstat.DBOperationLatency.WithLabelValues("update_targets_ts").Observe(duration)
return nil
}
func (s *Set) updateDBTargetTs(ident string, now int64) {
err := s.ctx.DB.Exec("UPDATE target SET update_at = ? WHERE ident = ?", now, ident).Error
if err != nil {
logger.Error("update_target: failed to update target:", ident, "error:", err)
}
}
func (s *Set) updateTargetsUpdateTs(lst []string, now int64, redis storage.Redis) error {
if redis == nil {
logger.Debugf("update_ts: redis is nil")

View File

@@ -18,6 +18,8 @@ type Pushgw struct {
UpdateTargetRetryIntervalMills int64
UpdateTargetTimeoutMills int64
UpdateTargetBatchSize int
UpdateDBTargetConcurrency int
UpdateDBTargetTimestampDisable bool
PushConcurrency int
UpdateTargetByUrlConcurrency int
@@ -33,6 +35,12 @@ type Pushgw struct {
WriterOpt WriterGlobalOpt
Writers []WriterOptions
KafkaWriters []KafkaWriterOptions
// 预处理的字段,用于快速匹配只有 __name__ 的 DropSample 规则
// key: metric name, value: struct{}
DropMetricNames map[string]struct{}
// 包含多个标签的复杂 DropSample 规则
DropSampleComplex []map[string]string
}
type WriterGlobalOpt struct {
@@ -127,6 +135,10 @@ func (p *Pushgw) PreCheck() {
p.UpdateTargetBatchSize = 20
}
if p.UpdateDBTargetConcurrency <= 0 {
p.UpdateDBTargetConcurrency = 16
}
if p.PushConcurrency <= 0 {
p.PushConcurrency = 16
}