Compare commits

...

23 Commits

Author SHA1 Message Date
ning
df9ba52e71 network bg label migrate 2024-11-08 10:32:19 +08:00
ning
ba6d2b664d fix build 2024-11-05 20:14:13 +08:00
Yening Qin
2ddcf507f9 update td query (#2267) 2024-11-05 20:01:11 +08:00
Yening Qin
17cc588a9d refactor: update pushgw append label (#2263) 2024-11-04 18:06:15 +08:00
Yening Qin
e9c7eef546 optimize tdentine (#2261) 2024-11-04 17:32:46 +08:00
Yening Qin
7451ad2e23 update default engine name (#2244) 2024-10-28 15:07:17 +08:00
ulricqin
31b3434e87 Update README.md 2024-10-22 14:19:33 +08:00
ning
2576a0f815 fix: edge get all configs 2024-10-21 19:30:13 +08:00
ning
0ac4bc7421 docs: update linux dashboard tpl 2024-10-21 18:07:52 +08:00
ning
95e6ea98f4 refactor: prom client query api add retry 2024-10-21 17:57:31 +08:00
ning
dc60c74c0d docs: update automq dashboard tpl 2024-10-21 16:50:36 +08:00
ning
a15adc196d docs: update linux dashboard tpl 2024-10-21 16:35:53 +08:00
ning
f89ef04e85 refactor: optimize code robustness 2024-10-21 14:54:48 +08:00
Yening Qin
f55cd9b32e feat: config access log in web (#2227) 2024-10-21 12:11:19 +08:00
Xu Bin
305a898f8b feat: alert recover ckeck (#2226) 2024-10-21 12:07:54 +08:00
Yening Qin
60c31d8eb2 feat: support query set opration (#2225) 2024-10-20 21:18:12 +08:00
ning
7da49a8c68 refactor: update go.mod 2024-10-20 14:04:31 +08:00
flashbo
65b1410b09 refactor: support output logs to one file (#2209) 2024-10-20 14:02:44 +08:00
ning
3901671c0e docs: update n9e.sql 2024-10-18 15:24:33 +08:00
Xu Bin
9c02937e81 refactor: alert mute retain (#2223) 2024-10-18 12:08:31 +08:00
flashbo
0a255ee33a fix: unbind bgids when delete target (#2219) 2024-10-16 10:00:08 +08:00
Xu Bin
8dc198b4b1 fix: smtp update (#2213) 2024-10-12 11:37:14 +08:00
Yening Qin
9696f63a71 rename tpl name 2024-10-11 16:23:57 +08:00
39 changed files with 727 additions and 182 deletions

View File

@@ -90,7 +90,7 @@
- 推荐搜索关注夜莺公众号,第一时间获取社区动态:`夜莺监控Nightingale`
- 日常问题交流:
- QQ群730841964
- [加入微信群](https://download.flashcat.cloud/ulric/20241008153952.png),如果二维码过期了,可以联系我(我的微信:`picobyte`)拉群,备注: `夜莺互助群`
- [加入微信群](https://download.flashcat.cloud/ulric/20241022141621.png),如果二维码过期了,可以联系我(我的微信:`picobyte`)拉群,备注: `夜莺互助群`
## 广受关注
[![Stargazers over time](https://api.star-history.com/svg?repos=ccfos/nightingale&type=Date)](https://star-history.com/#ccfos/nightingale&Date)

View File

@@ -62,6 +62,7 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
userCache := memsto.NewUserCache(ctx, syncStats)
userGroupCache := memsto.NewUserGroupCache(ctx, syncStats)
taskTplsCache := memsto.NewTaskTplCache(ctx)
configCvalCache := memsto.NewCvalCache(ctx, syncStats)
promClients := prom.NewPromClient(ctx)
tdengineClients := tdengine.NewTdengineClient(ctx, config.Alert.Heartbeat)
@@ -70,7 +71,8 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
Start(config.Alert, config.Pushgw, syncStats, alertStats, externalProcessors, targetCache, busiGroupCache, alertMuteCache, alertRuleCache, notifyConfigCache, taskTplsCache, dsCache, ctx, promClients, tdengineClients, userCache, userGroupCache)
r := httpx.GinEngine(config.Global.RunMode, config.HTTP)
r := httpx.GinEngine(config.Global.RunMode, config.HTTP,
configCvalCache.PrintBodyPaths, configCvalCache.PrintAccessLog)
rt := router.New(config.HTTP, config.Alert, alertMuteCache, targetCache, busiGroupCache, alertStats, ctx, externalProcessors)
if config.Ibex.Enable {

View File

@@ -5,18 +5,20 @@ import (
"math"
"strings"
"github.com/ccfos/nightingale/v6/models"
"github.com/prometheus/common/model"
)
type AnomalyPoint struct {
Key string `json:"key"`
Labels model.Metric `json:"labels"`
Timestamp int64 `json:"timestamp"`
Value float64 `json:"value"`
Severity int `json:"severity"`
Triggered bool `json:"triggered"`
Query string `json:"query"`
Values string `json:"values"`
Key string `json:"key"`
Labels model.Metric `json:"labels"`
Timestamp int64 `json:"timestamp"`
Value float64 `json:"value"`
Severity int `json:"severity"`
Triggered bool `json:"triggered"`
Query string `json:"query"`
Values string `json:"values"`
RecoverConfig models.RecoverConfig `json:"recover_config"`
}
func NewAnomalyPoint(key string, labels map[string]string, ts int64, value float64, severity int) AnomalyPoint {

View File

@@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"math"
"reflect"
"sort"
"strings"
"time"
@@ -160,13 +161,13 @@ func (arw *AlertRuleWorker) Eval() {
now := time.Now().Unix()
for _, point := range pointsMap {
str := fmt.Sprintf("%v", point.Value)
arw.processor.RecoverSingle(process.Hash(cachedRule.Id, arw.processor.DatasourceId(), point), now, &str)
arw.processor.RecoverSingle(true, process.Hash(cachedRule.Id, arw.processor.DatasourceId(), point), now, &str)
}
} else {
now := time.Now().Unix()
for _, point := range recoverPoints {
str := fmt.Sprintf("%v", point.Value)
arw.processor.RecoverSingle(process.Hash(cachedRule.Id, arw.processor.DatasourceId(), point), now, &str)
arw.processor.RecoverSingle(true, process.Hash(cachedRule.Id, arw.processor.DatasourceId(), point), now, &str)
}
}
@@ -267,7 +268,7 @@ func (arw *AlertRuleWorker) GetTdengineAnomalyPoint(rule *models.AlertRule, dsId
if len(ruleQuery.Queries) > 0 {
seriesStore := make(map[uint64]models.DataResp)
// 将不同查询的 hash 索引分组存放
seriesTagIndexes := make([]map[uint64][]uint64, 0)
seriesTagIndexes := make(map[string]map[uint64][]uint64)
for _, query := range ruleQuery.Queries {
seriesTagIndex := make(map[uint64][]uint64)
@@ -281,7 +282,7 @@ func (arw *AlertRuleWorker) GetTdengineAnomalyPoint(rule *models.AlertRule, dsId
continue
}
series, err := cli.Query(query)
series, err := cli.Query(query,0)
arw.processor.Stats.CounterQueryDataTotal.WithLabelValues(fmt.Sprintf("%d", arw.datasourceId)).Inc()
if err != nil {
logger.Warningf("rule_eval rid:%d query data error: %v", rule.Id, err)
@@ -292,7 +293,13 @@ func (arw *AlertRuleWorker) GetTdengineAnomalyPoint(rule *models.AlertRule, dsId
// 此条日志很重要,是告警判断的现场值
logger.Debugf("rule_eval rid:%d req:%+v resp:%+v", rule.Id, query, series)
MakeSeriesMap(series, seriesTagIndex, seriesStore)
seriesTagIndexes = append(seriesTagIndexes, seriesTagIndex)
ref, err := GetQueryRef(query)
if err != nil {
logger.Warningf("rule_eval rid:%d query ref error: %v query:%+v", rule.Id, err, query)
arw.processor.Stats.CounterRuleEvalErrorTotal.WithLabelValues(fmt.Sprintf("%v", arw.processor.DatasourceId()), GET_RULE_CONFIG).Inc()
continue
}
seriesTagIndexes[ref] = seriesTagIndex
}
points, recoverPoints = GetAnomalyPoint(rule.Id, ruleQuery, seriesTagIndexes, seriesStore)
@@ -445,7 +452,7 @@ func (arw *AlertRuleWorker) GetHostAnomalyPoint(ruleConfig string) []common.Anom
return lst
}
func GetAnomalyPoint(ruleId int64, ruleQuery models.RuleQuery, seriesTagIndexes []map[uint64][]uint64, seriesStore map[uint64]models.DataResp) ([]common.AnomalyPoint, []common.AnomalyPoint) {
func GetAnomalyPoint(ruleId int64, ruleQuery models.RuleQuery, seriesTagIndexes map[string]map[uint64][]uint64, seriesStore map[uint64]models.DataResp) ([]common.AnomalyPoint, []common.AnomalyPoint) {
points := []common.AnomalyPoint{}
recoverPoints := []common.AnomalyPoint{}
@@ -459,59 +466,7 @@ func GetAnomalyPoint(ruleId int64, ruleQuery models.RuleQuery, seriesTagIndexes
for _, trigger := range ruleQuery.Triggers {
// seriesTagIndex 的 key 仅做分组使用,value 为每组 series 的 hash
seriesTagIndex := make(map[uint64][]uint64)
if len(trigger.Joins) == 0 {
// 没有 join 条件,走原逻辑
last := seriesTagIndexes[0]
for i := 1; i < len(seriesTagIndexes); i++ {
last = originalJoin(last, seriesTagIndexes[i])
}
seriesTagIndex = last
} else {
// 有 join 条件,按条件依次合并
if len(seriesTagIndexes) != len(trigger.Joins)+1 {
logger.Errorf("rule_eval rid:%d queries' count: %d not match join condition's count: %d", ruleId, len(seriesTagIndexes), len(trigger.Joins))
continue
}
last := seriesTagIndexes[0]
lastRehashed := rehashSet(last, seriesStore, trigger.Joins[0].On)
for i := range trigger.Joins {
cur := seriesTagIndexes[i+1]
switch trigger.Joins[i].JoinType {
case "original":
last = originalJoin(last, cur)
case "none":
last = noneJoin(last, cur)
case "cartesian":
last = cartesianJoin(last, cur)
case "inner_join":
curRehashed := rehashSet(cur, seriesStore, trigger.Joins[i].On)
lastRehashed = onJoin(lastRehashed, curRehashed, Inner)
last = flatten(lastRehashed)
case "left_join":
curRehashed := rehashSet(cur, seriesStore, trigger.Joins[i].On)
lastRehashed = onJoin(lastRehashed, curRehashed, Left)
last = flatten(lastRehashed)
case "right_join":
curRehashed := rehashSet(cur, seriesStore, trigger.Joins[i].On)
lastRehashed = onJoin(curRehashed, lastRehashed, Right)
last = flatten(lastRehashed)
case "left_exclude":
curRehashed := rehashSet(cur, seriesStore, trigger.Joins[i].On)
lastRehashed = exclude(lastRehashed, curRehashed)
last = flatten(lastRehashed)
case "right_exclude":
curRehashed := rehashSet(cur, seriesStore, trigger.Joins[i].On)
lastRehashed = exclude(curRehashed, lastRehashed)
last = flatten(lastRehashed)
default:
logger.Warningf("rule_eval rid:%d join type:%s not support", ruleId, trigger.Joins[i].JoinType)
}
}
seriesTagIndex = last
}
seriesTagIndex := ProcessJoins(ruleId, trigger, seriesTagIndexes, seriesStore)
for _, seriesHash := range seriesTagIndex {
sort.Slice(seriesHash, func(i, j int) bool {
@@ -558,23 +513,37 @@ func GetAnomalyPoint(ruleId int64, ruleQuery models.RuleQuery, seriesTagIndexes
}
point := common.AnomalyPoint{
Key: sample.MetricName(),
Labels: sample.Metric,
Timestamp: int64(ts),
Value: value,
Values: values,
Severity: trigger.Severity,
Triggered: isTriggered,
Query: fmt.Sprintf("query:%+v trigger:%+v", ruleQuery.Queries, trigger),
Key: sample.MetricName(),
Labels: sample.Metric,
Timestamp: int64(ts),
Value: value,
Values: values,
Severity: trigger.Severity,
Triggered: isTriggered,
Query: fmt.Sprintf("query:%+v trigger:%+v", ruleQuery.Queries, trigger),
RecoverConfig: trigger.RecoverConfig,
}
if sample.Query != "" {
point.Query = sample.Query
}
// 恢复条件判断经过讨论是只在表达式模式下支持,表达式模式会通过 isTriggered 判断是告警点还是恢复点
// 1. 不设置恢复判断,满足恢复条件产生 recoverPoint 恢复,无数据不产生 anomalyPoint 恢复
// 2. 设置满足条件才恢复,仅可通过产生 recoverPoint 恢复,不能通过不产生 anomalyPoint 恢复
// 3. 设置无数据不恢复,仅可通过产生 recoverPoint 恢复,不产生 anomalyPoint 恢复
if isTriggered {
points = append(points, point)
} else {
switch trigger.RecoverConfig.JudgeType {
case models.Origin:
// 对齐原实现 do nothing
case models.RecoverOnCondition:
// 额外判断恢复条件,满足才恢复
fulfill := parser.Calc(trigger.RecoverConfig.RecoverExp, m)
if !fulfill {
continue
}
}
recoverPoints = append(recoverPoints, point)
}
}
@@ -607,7 +576,7 @@ func flatten(rehashed map[uint64][][]uint64) map[uint64][]uint64 {
// [[A3{data_base=2, table=board}B2{data_base=2, table=alert}][A4{data_base=2, table=alert}B2{data_base=2, table=alert}]]
func onJoin(reHashTagIndex1 map[uint64][][]uint64, reHashTagIndex2 map[uint64][][]uint64, joinType JoinType) map[uint64][][]uint64 {
reHashTagIndex := make(map[uint64][][]uint64)
for rehash, _ := range reHashTagIndex1 {
for rehash := range reHashTagIndex1 {
if _, ok := reHashTagIndex2[rehash]; ok {
// 若有 rehash 相同的记录,两两合并
for i1 := range reHashTagIndex1[rehash] {
@@ -650,6 +619,7 @@ func rehashSet(seriesTagIndex1 map[uint64][]uint64, seriesStore map[uint64]model
if !exists {
continue
}
rehash := hash.GetTargetTagHash(series.Metric, on)
if _, ok := reHashTagIndex[rehash]; !ok {
reHashTagIndex[rehash] = make([][]uint64, 0)
@@ -741,3 +711,100 @@ func mergeNewArray(arg ...[]uint64) []uint64 {
}
return res
}
// ProcessJoins merges the per-query series indexes of one trigger into a
// single index that maps a group hash to the hashes of the series in that
// group.
//
// seriesTagIndexes maps a query ref to the index built for that query. When
// the trigger declares no joins, all indexes are folded together with
// originalJoin. Otherwise the index named by trigger.JoinRef is the initial
// left side, and every entry of trigger.Joins is applied in order against the
// index named by its Ref.
//
// It returns an empty map when seriesTagIndexes is empty, and nil when there
// are fewer query indexes than the joins require.
func ProcessJoins(ruleId int64, trigger models.Trigger, seriesTagIndexes map[string]map[uint64][]uint64, seriesStore map[uint64]models.DataResp) map[uint64][]uint64 {
	last := make(map[uint64][]uint64)
	if len(seriesTagIndexes) == 0 {
		return last
	}

	if len(trigger.Joins) == 0 {
		// Map iteration order is randomized in Go, so fold the indexes in
		// sorted ref order to keep the merge reproducible across evaluations.
		refs := make([]string, 0, len(seriesTagIndexes))
		for ref := range seriesTagIndexes {
			refs = append(refs, ref)
		}
		sort.Strings(refs)

		last = seriesTagIndexes[refs[0]]
		for _, ref := range refs[1:] {
			last = originalJoin(last, seriesTagIndexes[ref])
		}
		return last
	}

	// Joins are declared: merge step by step, one join condition at a time.
	if len(seriesTagIndexes) < len(trigger.Joins)+1 {
		logger.Errorf("rule_eval rid:%d queries' count: %d not match join condition's count: %d", ruleId, len(seriesTagIndexes), len(trigger.Joins))
		return nil
	}

	last = seriesTagIndexes[trigger.JoinRef]
	// The left side is rehashed once up front, using the first join's keys.
	lastRehashed := rehashSet(last, seriesStore, trigger.Joins[0].On)
	for i := range trigger.Joins {
		cur := seriesTagIndexes[trigger.Joins[i].Ref]
		switch trigger.Joins[i].JoinType {
		case "original":
			last = originalJoin(last, cur)
		case "none":
			last = noneJoin(last, cur)
		case "cartesian":
			last = cartesianJoin(last, cur)
		case "inner_join":
			curRehashed := rehashSet(cur, seriesStore, trigger.Joins[i].On)
			lastRehashed = onJoin(lastRehashed, curRehashed, Inner)
			last = flatten(lastRehashed)
		case "left_join":
			curRehashed := rehashSet(cur, seriesStore, trigger.Joins[i].On)
			lastRehashed = onJoin(lastRehashed, curRehashed, Left)
			last = flatten(lastRehashed)
		case "right_join":
			// Operands are swapped: the current query becomes the first argument.
			curRehashed := rehashSet(cur, seriesStore, trigger.Joins[i].On)
			lastRehashed = onJoin(curRehashed, lastRehashed, Right)
			last = flatten(lastRehashed)
		case "left_exclude":
			curRehashed := rehashSet(cur, seriesStore, trigger.Joins[i].On)
			lastRehashed = exclude(lastRehashed, curRehashed)
			last = flatten(lastRehashed)
		case "right_exclude":
			curRehashed := rehashSet(cur, seriesStore, trigger.Joins[i].On)
			lastRehashed = exclude(curRehashed, lastRehashed)
			last = flatten(lastRehashed)
		default:
			// Unknown join types are reported and skipped; the running result is kept.
			logger.Warningf("rule_eval rid:%d join type:%s not support", ruleId, trigger.Joins[i].JoinType)
		}
	}

	return last
}
// GetQueryRef extracts the ref identifier from a query definition.
//
// A map[string]interface{} query is read through its "ref" key. Any other
// value is inspected by reflection and must be a struct, or a pointer to one,
// that has a string field named Ref. An error is returned when the ref is
// missing or is not a string.
func GetQueryRef(query interface{}) (string, error) {
	// Map-shaped queries carry the ref under the "ref" key.
	if fields, isMap := query.(map[string]interface{}); isMap {
		raw, found := fields["ref"]
		if !found {
			return "", fmt.Errorf("query 中没有找到 ref 字段")
		}
		name, isString := raw.(string)
		if !isString {
			return "", fmt.Errorf("ref 字段不是字符串类型")
		}
		return name, nil
	}

	// Anything else is handled as a struct, looking through one pointer level.
	target := reflect.Indirect(reflect.ValueOf(query))
	if target.Kind() != reflect.Struct {
		return "", fmt.Errorf("query not a struct or map")
	}

	switch field := target.FieldByName("Ref"); {
	case !field.IsValid():
		return "", fmt.Errorf("not find ref field")
	case field.Kind() != reflect.String:
		return "", fmt.Errorf("ref not a string")
	default:
		return field.String(), nil
	}
}

View File

@@ -170,7 +170,9 @@ func (p *Processor) Handle(anomalyPoints []common.AnomalyPoint, from string, inh
p.handleEvent(events)
}
p.HandleRecover(alertingKeys, now, inhibit)
if from == "inner" {
p.HandleRecover(alertingKeys, now, inhibit)
}
}
func (p *Processor) BuildEvent(anomalyPoint common.AnomalyPoint, from string, now int64) *models.AlertCurEvent {
@@ -211,6 +213,7 @@ func (p *Processor) BuildEvent(anomalyPoint common.AnomalyPoint, from string, no
event.Severity = anomalyPoint.Severity
event.ExtraConfig = p.rule.ExtraConfigJSON
event.PromQl = anomalyPoint.Query
event.RecoverConfig = anomalyPoint.RecoverConfig
if p.target != "" {
if pt, exist := p.TargetCache.Get(p.target); exist {
@@ -290,7 +293,7 @@ func (p *Processor) HandleRecover(alertingKeys map[string]struct{}, now int64, i
}
hashArr := make([]string, 0, len(alertingKeys))
for hash := range p.fires.GetAll() {
for hash, _ := range p.fires.GetAll() {
if _, has := alertingKeys[hash]; has {
continue
}
@@ -309,7 +312,7 @@ func (p *Processor) HandleRecoverEvent(hashArr []string, now int64, inhibit bool
if !inhibit {
for _, hash := range hashArr {
p.RecoverSingle(hash, now, nil)
p.RecoverSingle(false, hash, now, nil)
}
return
}
@@ -337,11 +340,11 @@ func (p *Processor) HandleRecoverEvent(hashArr []string, now int64, inhibit bool
}
for _, event := range eventMap {
p.RecoverSingle(event.Hash, now, nil)
p.RecoverSingle(false, event.Hash, now, nil)
}
}
func (p *Processor) RecoverSingle(hash string, now int64, value *string, values ...string) {
func (p *Processor) RecoverSingle(byRecover bool, hash string, now int64, value *string, values ...string) {
cachedRule := p.rule
if cachedRule == nil {
return
@@ -367,6 +370,12 @@ func (p *Processor) RecoverSingle(hash string, now int64, value *string, values
}
}
// 如果设置了恢复条件,则不能在此处恢复,必须依靠 recoverPoint 来恢复
if event.RecoverConfig.JudgeType != models.Origin && !byRecover {
logger.Debugf("rule_eval:%s event:%v not recover", p.Key(), event)
return
}
if value != nil {
event.TriggerValue = *value
if len(values) > 0 {

View File

@@ -129,7 +129,7 @@ func (rt *Router) makeEvent(c *gin.Context) {
} else {
for _, vector := range events[i].AnomalyPoints {
readableString := vector.ReadableValue()
go ruleWorker.RecoverSingle(process.Hash(events[i].RuleId, events[i].DatasourceId, vector), vector.Timestamp, &readableString)
go ruleWorker.RecoverSingle(false, process.Hash(events[i].RuleId, events[i].DatasourceId, vector), vector.Timestamp, &readableString)
}
}
}

View File

@@ -123,7 +123,7 @@ func InitEmailSender(ctx *ctx.Context, ncc *memsto.NotifyConfigCacheType) {
mailch = make(chan *EmailContext, 100000)
go updateSmtp(ctx, ncc)
smtpConfig = ncc.GetSMTP()
startEmailSender(ctx, smtpConfig)
go startEmailSender(ctx, smtpConfig)
}
func updateSmtp(ctx *ctx.Context, ncc *memsto.NotifyConfigCacheType) {
@@ -143,6 +143,7 @@ func startEmailSender(ctx *ctx.Context, smtp aconf.SMTPConfig) {
conf := smtp
if conf.Host == "" || conf.Port == 0 {
logger.Warning("SMTP configurations invalid")
<-mailQuit
return
}
logger.Infof("start email sender... conf.Host:%+v,conf.Port:%+v", conf.Host, conf.Port)

View File

@@ -185,6 +185,11 @@ func canDoIbex(username string, tpl *models.TaskTpl, host string, targetCache *m
}
func TaskAdd(f models.TaskForm, authUser string, isCenter bool) (int64, error) {
if storage.Cache == nil {
logger.Warning("event_callback_ibex: redis cache is nil")
return 0, fmt.Errorf("redis cache is nil")
}
hosts := cleanHosts(f.Hosts)
if len(hosts) == 0 {
return 0, fmt.Errorf("arg(hosts) empty")

View File

@@ -48,6 +48,10 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
cconf.MergeOperationConf()
if config.Alert.Heartbeat.EngineName == "" {
config.Alert.Heartbeat.EngineName = "default"
}
logxClean, err := logx.Init(config.Log)
if err != nil {
return nil, err
@@ -95,6 +99,7 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
userCache := memsto.NewUserCache(ctx, syncStats)
userGroupCache := memsto.NewUserGroupCache(ctx, syncStats)
taskTplCache := memsto.NewTaskTplCache(ctx)
configCvalCache := memsto.NewCvalCache(ctx, syncStats)
sso := sso.Init(config.Center, ctx, configCache)
promClients := prom.NewPromClient(ctx)
@@ -115,9 +120,13 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
redis, sso, ctx, metas, idents, targetCache, userCache, userGroupCache)
pushgwRouter := pushgwrt.New(config.HTTP, config.Pushgw, config.Alert, targetCache, busiGroupCache, idents, metas, writers, ctx)
go models.MigrateBg(ctx, pushgwRouter.Pushgw.BusiGroupLabelKey)
go func() {
if models.CanMigrateBg(ctx) {
models.MigrateBg(ctx, pushgwRouter.Pushgw.BusiGroupLabelKey)
}
}()
r := httpx.GinEngine(config.Global.RunMode, config.HTTP)
r := httpx.GinEngine(config.Global.RunMode, config.HTTP, configCvalCache.PrintBodyPaths, configCvalCache.PrintAccessLog)
centerRouter.Config(r)
alertrtRouter.Config(r)

View File

@@ -552,6 +552,7 @@ func (rt *Router) Config(r *gin.Engine) {
service.GET("/config/:id", rt.configGet)
service.GET("/configs", rt.configsGet)
service.GET("/config", rt.configGetByKey)
service.GET("/all-configs", rt.configGetAll)
service.PUT("/configs", rt.configsPut)
service.POST("/configs", rt.configsPost)
service.DELETE("/configs", rt.configsDel)

View File

@@ -24,6 +24,11 @@ func (rt *Router) configGet(c *gin.Context) {
ginx.NewRender(c).Data(configs, err)
}
// configGetAll renders the result of models.ConfigsGetAll, or its error, as
// the response.
func (rt *Router) configGetAll(c *gin.Context) {
	all, err := models.ConfigsGetAll(rt.Ctx)
	render := ginx.NewRender(c)
	render.Data(all, err)
}
func (rt *Router) configGetByKey(c *gin.Context) {
config, err := models.ConfigsGet(rt.Ctx, ginx.QueryStr(c, "key"))
ginx.NewRender(c).Data(config, err)

View File

@@ -45,6 +45,10 @@ func (rt *Router) statistic(c *gin.Context) {
statistics, err = models.ConfigsUserVariableStatistics(rt.Ctx)
ginx.NewRender(c).Data(statistics, err)
return
case "cval":
statistics, err = models.ConfigCvalStatistics(rt.Ctx)
ginx.NewRender(c).Data(statistics, err)
return
default:
ginx.Bomb(http.StatusBadRequest, "invalid name")
}

View File

@@ -50,7 +50,8 @@ func (rt *Router) alertMuteGets(c *gin.Context) {
prods := strings.Fields(ginx.QueryStr(c, "prods", ""))
bgid := ginx.QueryInt64(c, "bgid", -1)
query := ginx.QueryStr(c, "query", "")
lst, err := models.AlertMuteGets(rt.Ctx, prods, bgid, query)
disabled := ginx.QueryInt(c, "disabled", -1)
lst, err := models.AlertMuteGets(rt.Ctx, prods, bgid, disabled, query)
ginx.NewRender(c).Data(lst, err)
}

View File

@@ -82,7 +82,7 @@ func (rt *Router) QueryData(c *gin.Context) {
var err error
tdClient := rt.TdendgineClients.GetCli(f.DatasourceId)
for _, q := range f.Querys {
datas, err := tdClient.Query(q)
datas, err := tdClient.Query(q, 0)
ginx.Dangerous(err)
resp = append(resp, datas...)
}

View File

@@ -52,11 +52,13 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
targetCache := memsto.NewTargetCache(ctx, syncStats, redis)
busiGroupCache := memsto.NewBusiGroupCache(ctx, syncStats)
configCvalCache := memsto.NewCvalCache(ctx, syncStats)
idents := idents.New(ctx, redis)
metas := metas.New(redis)
writers := writer.NewWriters(config.Pushgw)
pushgwRouter := pushgwrt.New(config.HTTP, config.Pushgw, config.Alert, targetCache, busiGroupCache, idents, metas, writers, ctx)
r := httpx.GinEngine(config.Global.RunMode, config.HTTP)
r := httpx.GinEngine(config.Global.RunMode, config.HTTP, configCvalCache.PrintBodyPaths, configCvalCache.PrintAccessLog)
pushgwRouter.Config(r)
if !config.Alert.Disable {

View File

@@ -363,6 +363,7 @@ CREATE TABLE `target` (
`ident` varchar(191) not null comment 'target id',
`note` varchar(255) not null default '' comment 'append to alert event as field',
`tags` varchar(512) not null default '' comment 'append to series data as tags, split by space, append external space at suffix',
`host_tags` varchar(512) not null default '' comment 'append to series data as tags, split by space, append external space at suffix',
`host_ip` varchar(15) default '' COMMENT 'IPv4 string',
`agent_version` varchar(255) default '' COMMENT 'agent version',
`engine_name` varchar(255) default '' COMMENT 'engine_name',
@@ -725,6 +726,15 @@ CREATE TABLE `metric_filter` (
KEY `idx_name` (`name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- target_busi_group associates a target (by ident) with a group id
-- (presumably a business group; confirm against the busi_group table).
-- The unique key idx_target_group lets each (target_ident, group_id) pair
-- appear at most once.
CREATE TABLE `target_busi_group` (
`id` bigint NOT NULL AUTO_INCREMENT,
`target_ident` varchar(191) NOT NULL,
`group_id` bigint NOT NULL,
`update_at` bigint NOT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `idx_target_group` (`target_ident`,`group_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
CREATE TABLE `task_meta`
(
`id` bigint unsigned NOT NULL AUTO_INCREMENT,

2
go.mod
View File

@@ -33,7 +33,7 @@ require (
github.com/redis/go-redis/v9 v9.0.2
github.com/spaolacci/murmur3 v1.1.0
github.com/tidwall/gjson v1.14.0
github.com/toolkits/pkg v1.3.6
github.com/toolkits/pkg v1.3.8
golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1
golang.org/x/oauth2 v0.10.0
gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df

4
go.sum
View File

@@ -294,8 +294,8 @@ github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA=
github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs=
github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/toolkits/pkg v1.3.6 h1:47e1amsY6mJmcnF3Y2lIpkJXfoYY2RmgI09PtwdAEMU=
github.com/toolkits/pkg v1.3.6/go.mod h1:M9ecwFGW1vxCTUFM9sr2ZjXSKb04N+1sTQ6SA3RNAIU=
github.com/toolkits/pkg v1.3.8 h1:2yamC20c5mHRtbcGiLY99Lm/2mVitFn6onE8KKvMT1o=
github.com/toolkits/pkg v1.3.8/go.mod h1:M9ecwFGW1vxCTUFM9sr2ZjXSKb04N+1sTQ6SA3RNAIU=
github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI=
github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08=
github.com/ugorji/go v1.2.7/go.mod h1:nF9osbDWLy6bDVv/Rtoh6QgnvNDpmCalQV5urGCCS6M=

View File

Before

Width:  |  Height:  |  Size: 2.8 KiB

After

Width:  |  Height:  |  Size: 2.8 KiB

View File

@@ -584,7 +584,7 @@
"links": [
{
"title": "下钻",
"url": "/dashboards/automq-group-metrics?TSDB=${DS_PROMETHEUS}\u0026cluster_id=${cluster_id}\u0026group_id=${__field.labels.consumer_group}\u0026partition=all\u0026topic=${__field.labels.topic}"
"url": "/built-in-components/dashboard/detail?__uuid__=1717556327172992000&TSDB=${DS_PROMETHEUS}\u0026cluster_id=${cluster_id}\u0026group_id=${__field.labels.consumer_group}\u0026partition=all\u0026topic=${__field.labels.topic}"
}
],
"showHeader": true
@@ -669,7 +669,7 @@
"links": [
{
"title": "下钻",
"url": "/dashboards/automq-topic-metrics?TSDB=${DS_PROMETHEUS}\u0026cluster_id=${cluster_id}\u0026topic=${__field.labels.topic}"
"url": "/built-in-components/dashboard/detail?__uuid__=1717556327174664000&TSDB=${DS_PROMETHEUS}\u0026cluster_id=${cluster_id}\u0026topic=${__field.labels.topic}"
}
],
"showHeader": true
@@ -781,7 +781,7 @@
"links": [
{
"title": "下钻",
"url": "/dashboards/automq-broker-metrics?DS_PROMETHEUS=${DS_PROMETHEUS}\u0026cluster_id=${cluster_id}\u0026node_id=${__field.labels.instance}"
"url": "/built-in-components/dashboard/detail?__uuid__=1717556327159415000&DS_PROMETHEUS=${DS_PROMETHEUS}\u0026cluster_id=${cluster_id}\u0026node_id=${__field.labels.instance}"
}
],
"showHeader": true

View File

@@ -1,13 +1,6 @@
{
"id": 0,
"group_id": 0,
"name": "机器台账表格视图",
"ident": "",
"tags": "",
"create_at": 0,
"create_by": "",
"update_at": 0,
"update_by": "",
"configs": {
"links": [
{
@@ -28,7 +21,7 @@
"colorRange": [
"thresholds"
],
"detailUrl": "/dashboards-built-in/detail?__built-in-cate=Linux\u0026__built-in-name=Linux%20Host%20by%20Categraf%20v2\u0026ident=${__field.labels.ident}",
"detailUrl": "/built-in-components/dashboard/detail?__uuid__=1717556327744505000&ident=${__field.labels.ident}",
"textMode": "valueAndName",
"valueField": "Value"
},
@@ -98,7 +91,7 @@
"colorRange": [
"thresholds"
],
"detailUrl": "/dashboards-built-in/detail?__built-in-cate=Linux\u0026__built-in-name=Linux%20Host%20by%20Categraf%20v2\u0026ident=${__field.labels.ident}",
"detailUrl": "/built-in-components/dashboard/detail?__uuid__=1717556327744505000&ident=${__field.labels.ident}",
"textMode": "valueAndName",
"valueField": "Value"
},
@@ -171,13 +164,16 @@
"linkMode": "appendLinkColumn",
"links": [
{
"targetBlank": true,
"title": "详情",
"url": "/dashboards-built-in/detail?__built-in-cate=Linux\u0026__built-in-name=Linux%20Host%20by%20Categraf%20v2\u0026ident=${__field.labels.ident}"
"url": "/built-in-components/dashboard/detail?__uuid__=1717556327744505000&ident=${__field.labels.ident}"
}
],
"nowrap": false,
"showHeader": true,
"sortColumn": "ident",
"sortOrder": "ascend"
"sortOrder": "ascend",
"tableLayout": "fixed"
},
"datasourceCate": "prometheus",
"datasourceValue": "${prom}",
@@ -385,10 +381,5 @@
],
"version": "3.0.0"
},
"public": 0,
"public_cate": 0,
"bgids": null,
"built_in": 0,
"hide": 0,
"uuid": 1717556327742611000
}

150
memsto/config_cval_cache.go Normal file
View File

@@ -0,0 +1,150 @@
package memsto
import (
"encoding/json"
"log"
"sync"
"time"
"github.com/ccfos/nightingale/v6/dumper"
"github.com/ccfos/nightingale/v6/models"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/pkg/errors"
"github.com/toolkits/pkg/logger"
)
// CvalCache is an in-memory copy of config key/value pairs that is refreshed
// periodically from the configs store.
type CvalCache struct {
// statTotal and statLastUpdated record the statistics of the last load; they
// are compared against fresh statistics to decide whether a reload is needed.
statTotal int64
statLastUpdated int64
ctx *ctx.Context
// stats receives the sync duration and entry-count gauges.
stats *Stats
// mu guards cvals and the two stat fields in Set, Get and GetLastUpdateTime.
mu sync.RWMutex
// cvals maps a config key (Ckey) to its value (Cval).
cvals map[string]string
}
// NewCvalCache builds a CvalCache, performs the initial synchronous load and
// starts the background refresh loop before returning the cache.
//
// Both statistics start at -1 so the first load never looks unchanged.
func NewCvalCache(ctx *ctx.Context, stats *Stats) *CvalCache {
	cache := new(CvalCache)
	cache.statTotal = -1
	cache.statLastUpdated = -1
	cache.ctx = ctx
	cache.stats = stats
	cache.cvals = make(map[string]string)

	cache.initSyncConfigs()
	return cache
}
// initSyncConfigs runs the first sync in the foreground and terminates the
// process if it fails; on success it launches the periodic sync goroutine.
func (c *CvalCache) initSyncConfigs() {
	if err := c.syncConfigs(); err != nil {
		log.Fatalln("failed to sync configs:", err)
	}

	go c.loopSyncConfigs()
}
// loopSyncConfigs re-syncs the cache forever, sleeping 9000ms before each
// attempt. Failures are logged as warnings and do not stop the loop.
func (c *CvalCache) loopSyncConfigs() {
	const interval = 9000 * time.Millisecond

	for {
		time.Sleep(interval)

		err := c.syncConfigs()
		if err == nil {
			continue
		}
		logger.Warning("failed to sync configs:", err)
	}
}
// syncConfigs refreshes the cache when the configs store has changed.
//
// It fetches the statistics (total and last-updated) first and returns early
// when they match the values recorded by the previous load. Otherwise it
// loads the records returned by models.ConfigsGetAll, stores them through
// Set, and publishes the elapsed time and entry count to the stats gauges and
// the dumper. A failed query is recorded in the dumper and returned wrapped.
func (c *CvalCache) syncConfigs() error {
	start := time.Now()

	stat, err := models.ConfigCvalStatistics(c.ctx)
	if err != nil {
		dumper.PutSyncRecord("cvals", start.Unix(), -1, -1, "failed to query statistics: "+err.Error())
		return errors.WithMessage(err, "failed to call ConfigCvalStatistics")
	}

	if !c.statChanged(stat.Total, stat.LastUpdated) {
		c.stats.GaugeCronDuration.WithLabelValues("sync_cvals").Set(0)
		c.stats.GaugeSyncNumber.WithLabelValues("sync_cvals").Set(0)
		dumper.PutSyncRecord("cvals", start.Unix(), -1, -1, "not changed")
		return nil
	}

	cvals, err := models.ConfigsGetAll(c.ctx)
	if err != nil {
		dumper.PutSyncRecord("cvals", start.Unix(), -1, -1, "failed to query records: "+err.Error())
		// Name the call that actually failed (previously reported as ConfigsGet).
		return errors.WithMessage(err, "failed to call ConfigsGetAll")
	}

	c.Set(cvals, stat.Total, stat.LastUpdated)
	ms := time.Since(start).Milliseconds()

	// cvals is guarded by mu everywhere else, so read its size under the lock
	// once and reuse the same number for both reports.
	c.mu.RLock()
	count := len(c.cvals)
	c.mu.RUnlock()

	c.stats.GaugeCronDuration.WithLabelValues("sync_cvals").Set(float64(ms))
	c.stats.GaugeSyncNumber.WithLabelValues("sync_cvals").Set(float64(count))

	logger.Infof("timer: sync cvals done, cost: %dms", ms)
	dumper.PutSyncRecord("cvals", start.Unix(), ms, count, "success")

	return nil
}
// statChanged reports whether the given statistics differ from the ones
// recorded by the last load.
func (c *CvalCache) statChanged(total int64, updated int64) bool {
	return c.statTotal != total || c.statLastUpdated != updated
}
// Set replaces the cached key/value pairs with cvals and records the
// statistics that describe this load.
//
// The map is rebuilt rather than updated in place, so keys that are no longer
// present in cvals are dropped instead of lingering in the cache.
func (c *CvalCache) Set(cvals []*models.Configs, total int64, updated int64) {
	// Build the replacement before taking the lock to keep the write section short.
	fresh := make(map[string]string, len(cvals))
	for _, cfg := range cvals {
		fresh[cfg.Ckey] = cfg.Cval
	}

	c.mu.Lock()
	defer c.mu.Unlock()

	c.statTotal = total
	c.statLastUpdated = updated
	c.cvals = fresh
}
// Get returns the cached value for ckey, or the empty string when the key is
// not cached. The lookup happens under the read lock.
func (c *CvalCache) Get(ckey string) string {
	c.mu.RLock()
	val := c.cvals[ckey]
	c.mu.RUnlock()

	return val
}
// GetLastUpdateTime returns the last-updated statistic recorded by the most
// recent Set call, read under the read lock.
func (c *CvalCache) GetLastUpdateTime() int64 {
	c.mu.RLock()
	updated := c.statLastUpdated
	c.mu.RUnlock()

	return updated
}
// SiteInfo holds the fields decoded from the JSON stored under the
// "site_info" config key.
type SiteInfo struct {
// PrintBodyPaths is exposed as a set by CvalCache.PrintBodyPaths
// (presumably request paths whose bodies are logged; confirm in httpx).
PrintBodyPaths []string `json:"print_body_paths"`
// PrintAccessLog is exposed by CvalCache.PrintAccessLog
// (presumably toggles HTTP access logging; confirm in httpx).
PrintAccessLog bool `json:"print_access_log"`
}
// GetSiteInfo decodes the cached "site_info" config value into a SiteInfo.
//
// A zero-valued SiteInfo is returned when the key is absent or empty. A value
// that fails to decode is logged and the partially filled struct is returned.
// The result is never nil.
func (c *CvalCache) GetSiteInfo() *SiteInfo {
	// No lock is taken here: Get acquires the read lock itself. Holding it
	// around that call would be a recursive read lock, which sync.RWMutex
	// prohibits because a writer queued between the two acquisitions blocks
	// the second RLock forever.
	si := SiteInfo{}

	if siteInfoStr := c.Get("site_info"); siteInfoStr != "" {
		if err := json.Unmarshal([]byte(siteInfoStr), &si); err != nil {
			logger.Errorf("Failed to unmarshal site info: %v", err)
		}
	}

	return &si
}
// PrintBodyPaths returns the PrintBodyPaths list of the current site info as
// a set. A new map is built on every call.
func (c *CvalCache) PrintBodyPaths() map[string]struct{} {
	paths := c.GetSiteInfo().PrintBodyPaths

	set := make(map[string]struct{}, len(paths))
	for i := range paths {
		set[paths[i]] = struct{}{}
	}

	return set
}
// PrintAccessLog returns the PrintAccessLog flag of the current site info.
func (c *CvalCache) PrintAccessLog() bool {
	info := c.GetSiteInfo()
	return info.PrintAccessLog
}

View File

@@ -68,6 +68,7 @@ type AlertCurEvent struct {
SubRuleId int64 `json:"sub_rule_id" gorm:"-"`
ExtraInfo []string `json:"extra_info" gorm:"-"`
Target *Target `json:"target" gorm:"-"`
RecoverConfig RecoverConfig `json:"recover_config" gorm:"-"`
}
func (e *AlertCurEvent) TableName() string {

View File

@@ -108,7 +108,7 @@ func AlertMuteGet(ctx *ctx.Context, where string, args ...interface{}) (*AlertMu
return lst[0], err
}
func AlertMuteGets(ctx *ctx.Context, prods []string, bgid int64, query string) (lst []AlertMute, err error) {
func AlertMuteGets(ctx *ctx.Context, prods []string, bgid int64, disabled int, query string) (lst []AlertMute, err error) {
session := DB(ctx)
if bgid != -1 {
@@ -119,6 +119,14 @@ func AlertMuteGets(ctx *ctx.Context, prods []string, bgid int64, query string) (
session = session.Where("prod in (?)", prods)
}
if disabled != -1 {
if disabled == 0 {
session = session.Where("disabled = 0")
} else {
session = session.Where("disabled = 1")
}
}
if query != "" {
arr := strings.Fields(query)
for i := 0; i < len(arr); i++ {
@@ -287,16 +295,9 @@ func AlertMuteStatistics(ctx *ctx.Context) (*Statistics, error) {
return s, err
}
// clean expired first
buf := int64(30)
err := DB(ctx).Where("etime < ? and mute_time_type = 0", time.Now().Unix()-buf).Delete(new(AlertMute)).Error
if err != nil {
return nil, err
}
session := DB(ctx).Model(&AlertMute{}).Select("count(*) as total", "max(update_at) as last_updated")
err = session.Find(&stats).Error
err := session.Find(&stats).Error
if err != nil {
return nil, err
}
@@ -308,7 +309,7 @@ func AlertMuteGetsAll(ctx *ctx.Context) ([]*AlertMute, error) {
// get my cluster's mutes
var lst []*AlertMute
if !ctx.IsCenter {
lst, err := poster.GetByUrls[[]*AlertMute](ctx, "/v1/n9e/alert-mutes")
lst, err := poster.GetByUrls[[]*AlertMute](ctx, "/v1/n9e/alert-mutes?disabled=0")
if err != nil {
return nil, err
}
@@ -318,7 +319,7 @@ func AlertMuteGetsAll(ctx *ctx.Context) ([]*AlertMute, error) {
return lst, err
}
session := DB(ctx).Model(&AlertMute{})
session := DB(ctx).Model(&AlertMute{}).Where("disabled = 0")
err := session.Find(&lst).Error
if err != nil {

View File

@@ -128,6 +128,19 @@ type PromRuleConfig struct {
AlgoParams interface{} `json:"algo_params"`
}
// RecoverJudge identifies the strategy used to decide that an alert has
// recovered. It is serialized as its integer value.
// NOTE(review): the meaning of each strategy is inferred from the constant
// names only — confirm against the rule evaluator.
type RecoverJudge int

// Supported RecoverJudge values. The numeric values are part of the stored
// rule configuration and must not be renumbered.
const (
	// Origin is the zero value (0), i.e. what an absent "judge_type" decodes to.
	Origin RecoverJudge = iota
	// RecoverWithoutData has the value 1.
	RecoverWithoutData
	// RecoverOnCondition has the value 2; presumably paired with
	// RecoverConfig.RecoverExp — TODO confirm.
	RecoverOnCondition
)

// RecoverConfig carries the recovery settings attached to a query or trigger.
type RecoverConfig struct {
	// JudgeType selects the recovery strategy.
	JudgeType RecoverJudge `json:"judge_type"`
	// RecoverExp is an expression string stored alongside the strategy.
	RecoverExp string `json:"recover_exp"`
}
type HostRuleConfig struct {
Queries []HostQuery `json:"queries"`
Triggers []HostTrigger `json:"triggers"`
@@ -135,8 +148,9 @@ type HostRuleConfig struct {
}
type PromQuery struct {
PromQl string `json:"prom_ql"`
Severity int `json:"severity"`
PromQl string `json:"prom_ql"`
Severity int `json:"severity"`
RecoverConfig RecoverConfig `json:"recover_config"`
}
type HostTrigger struct {
@@ -163,10 +177,13 @@ type Trigger struct {
Duration int `json:"duration,omitempty"`
Percent int `json:"percent,omitempty"`
Joins []Join `json:"joins"`
JoinRef string `json:"join_ref"`
RecoverConfig RecoverConfig `json:"recover_config"`
}
type Join struct {
JoinType string `json:"join_type"`
Ref string `json:"ref"`
On []string `json:"on"`
}

View File

@@ -106,10 +106,8 @@ func InitRSAPassWord(ctx *ctx.Context) (string, error) {
func ConfigsGet(ctx *ctx.Context, ckey string) (string, error) { //select built-in type configs
if !ctx.IsCenter {
if !ctx.IsCenter {
s, err := poster.GetByUrls[string](ctx, "/v1/n9e/config?key="+ckey)
return s, err
}
s, err := poster.GetByUrls[string](ctx, "/v1/n9e/config?key="+ckey)
return s, err
}
var lst []string
@@ -125,6 +123,22 @@ func ConfigsGet(ctx *ctx.Context, ckey string) (string, error) { //select built-
return "", nil
}
// ConfigsGetAll returns the ckey/cval columns of every built-in config row,
// i.e. rows with a non-empty ckey and external = 0.
//
// When ctx.IsCenter is false the list is obtained through poster.GetByUrls
// from "/v1/n9e/all-configs" instead of the local database.
func ConfigsGetAll(ctx *ctx.Context) ([]*Configs, error) {
	if !ctx.IsCenter {
		return poster.GetByUrls[[]*Configs](ctx, "/v1/n9e/all-configs")
	}

	var configs []*Configs
	query := DB(ctx).Model(&Configs{}).
		Select("ckey, cval").
		Where("ckey!='' and external=? ", 0)
	if err := query.Find(&configs).Error; err != nil {
		return nil, errors.WithMessage(err, "failed to query configs")
	}

	return configs, nil
}
func ConfigsSet(ctx *ctx.Context, ckey, cval string) error {
return ConfigsSetWithUname(ctx, ckey, cval, "default")
}
@@ -355,3 +369,19 @@ func ConfigUserVariableGetDecryptMap(context *ctx.Context, privateKey []byte, pa
return ret, nil
}
// ConfigCvalStatistics returns the row count and the latest update_at of the
// built-in config rows (non-empty ckey, external = 0).
//
// When context.IsCenter is false the statistics are obtained through
// poster.GetByUrls from "/v1/n9e/statistic?name=cval" instead of the local
// database.
func ConfigCvalStatistics(context *ctx.Context) (*Statistics, error) {
	if context.IsCenter {
		// Only built-in configs are counted.
		query := DB(context).Model(&Configs{}).
			Select("count(*) as total", "max(update_at) as last_updated").
			Where("ckey!='' and external=? ", 0)

		var rows []*Statistics
		if err := query.Find(&rows).Error; err != nil {
			return nil, err
		}
		return rows[0], nil
	}

	return poster.GetByUrls[*Statistics](context, "/v1/n9e/statistic?name=cval")
}

View File

@@ -28,7 +28,7 @@ func MigrateIbexTables(db *gorm.DB) {
db = db.Set("gorm:table_options", tableOptions)
}
dts := []interface{}{&imodels.TaskMeta{}, &imodels.TaskScheduler{}, &imodels.TaskSchedulerHealth{}, &imodels.TaskHostDoing{}, &imodels.TaskAction{}}
dts := []interface{}{&imodels.TaskMeta{}, &imodels.TaskScheduler{}, &imodels.TaskSchedulerHealth{}, &TaskHostDoing{}, &imodels.TaskAction{}}
for _, dt := range dts {
err := db.AutoMigrate(dt)
if err != nil {
@@ -280,3 +280,15 @@ type BuiltinPayloads struct {
UUID int64 `json:"uuid" gorm:"type:bigint;not null;index:idx_uuid;comment:'uuid of payload'"`
ComponentID int64 `json:"component_id" gorm:"type:bigint;index:idx_component,sort:asc;not null;default:0;comment:'component_id of payload'"`
}
// TaskHostDoing is the model for the "task_host_doing" table. It is the type
// handed to AutoMigrate by MigrateIbexTables for that table, so the field
// order and gorm tags below define the migrated schema.
type TaskHostDoing struct {
// Id is indexed but explicitly declared as not being the primary key.
Id int64 `gorm:"column:id;index;primaryKey:false"`
// Host is an indexed, non-null column of at most 128 characters.
Host string `gorm:"column:host;size:128;not null;index"`
// Clock is a non-null integer column defaulting to 0.
// NOTE(review): presumably a unix timestamp — confirm against the writers.
Clock int64 `gorm:"column:clock;not null;default:0"`
// Action is a non-null column of at most 16 characters.
Action string `gorm:"column:action;size:16;not null"`
// AlertTriggered carries the gorm "-" tag, so it has no column of its own.
AlertTriggered bool `gorm:"-"`
}
// TableName returns the fixed table name "task_host_doing".
func (TaskHostDoing) TableName() string {
return "task_host_doing"
}

View File

@@ -133,6 +133,10 @@ func TargetDel(ctx *ctx.Context, idents []string, deleteHook TargetDeleteHookFun
if txErr != nil {
return txErr
}
txErr = TargetDeleteBgids(ctx, idents)
if txErr != nil {
return txErr
}
return nil
})
}
@@ -559,6 +563,32 @@ func (m *Target) UpdateFieldsMap(ctx *ctx.Context, fields map[string]interface{}
return DB(ctx).Model(m).Updates(fields).Error
}
// CanMigrateBg reports whether the target table still needs the busi-group
// migration. It returns false when either query fails, when the target table
// is empty, or when MAX(group_id) over the target table is 0; every false
// result is accompanied by a log line explaining the reason.
func CanMigrateBg(ctx *ctx.Context) bool {
	// Step 1: an empty target table means there is nothing to migrate.
	var total int64
	err := DB(ctx).Model(&Target{}).Count(&total).Error
	switch {
	case err != nil:
		log.Println("failed to get target table count, err:", err)
		return false
	case total == 0:
		log.Println("target table is empty, skip migration.")
		return false
	}

	// Step 2: a maximum group_id of 0 is treated as "migration already done".
	var maxGroupId int64
	err = DB(ctx).Model(&Target{}).Select("MAX(group_id)").Scan(&maxGroupId).Error
	switch {
	case err != nil:
		log.Println("failed to get max group_id from target table, err:", err)
		return false
	case maxGroupId == 0:
		log.Println("migration bgid has been completed.")
		return false
	}

	return true
}
func MigrateBg(ctx *ctx.Context, bgLabelKey string) {
// 1. 判断是否已经完成迁移
var maxGroupId int64

View File

@@ -7,6 +7,7 @@ import (
"io"
"net/http"
"os"
"strings"
"time"
"github.com/gin-gonic/gin"
@@ -41,14 +42,21 @@ type LoggerConfig struct {
// Output is a writer where logs are written.
// Optional. Default value is gin.DefaultWriter.
Output io.Writer
PrintBody bool
Output io.Writer
PrintAccessLog func() bool
PrintBodyPaths func() map[string]struct{}
// SkipPaths is a url path array which logs are not written.
// Optional.
SkipPaths []string
}
// ContainsPath reports whether the path part of the given request URI
// (everything before the first '?') is present in the set returned by
// PrintBodyPaths.
//
// It returns false when the receiver is nil or PrintBodyPaths has not been
// set, so a LoggerConfig built without that field never causes a nil-func
// call from this method.
func (c *LoggerConfig) ContainsPath(path string) bool {
	if c == nil || c.PrintBodyPaths == nil {
		return false
	}

	// Drop the query string; only the path itself is used as the lookup key.
	path, _, _ = strings.Cut(path, "?")

	_, exist := c.PrintBodyPaths()[path]
	return exist
}
// LogFormatter gives the signature of the formatter function passed to LoggerWithFormatter
type LogFormatter func(params LogFormatterParams) string
@@ -255,6 +263,11 @@ func LoggerWithConfig(conf LoggerConfig) gin.HandlerFunc {
}
return func(c *gin.Context) {
if !conf.PrintAccessLog() {
c.Next()
return
}
// Start timer
start := time.Now()
path := c.Request.URL.Path
@@ -271,7 +284,7 @@ func LoggerWithConfig(conf LoggerConfig) gin.HandlerFunc {
}
c.Writer = bodyWriter
if conf.PrintBody {
if conf.ContainsPath(c.Request.RequestURI) {
buf, _ := io.ReadAll(c.Request.Body)
rdr1 = io.NopCloser(bytes.NewBuffer(buf))
rdr2 = io.NopCloser(bytes.NewBuffer(buf))
@@ -309,7 +322,7 @@ func LoggerWithConfig(conf LoggerConfig) gin.HandlerFunc {
// fmt.Fprint(out, formatter(param))
logger.Info(formatter(param))
if conf.PrintBody {
if conf.ContainsPath(c.Request.RequestURI) {
respBody := readBody(bytes.NewReader(bodyWriter.body.Bytes()), c.Writer.Header().Get("Content-Encoding"))
reqBody := readBody(rdr1, c.Request.Header.Get("Content-Encoding"))
logger.Debugf("path:%s req body:%s resp:%s", path, reqBody, respBody)

View File

@@ -70,10 +70,12 @@ type JWTAuth struct {
RedisKeyPrefix string
}
func GinEngine(mode string, cfg Config) *gin.Engine {
func GinEngine(mode string, cfg Config, printBodyPaths func() map[string]struct{},
printAccessLog func() bool) *gin.Engine {
gin.SetMode(mode)
loggerMid := aop.Logger(aop.LoggerConfig{PrintBody: cfg.PrintBody})
loggerMid := aop.Logger(aop.LoggerConfig{PrintAccessLog: printAccessLog,
PrintBodyPaths: printBodyPaths})
recoveryMid := aop.Recovery()
if strings.ToLower(mode) == "release" {
@@ -84,10 +86,7 @@ func GinEngine(mode string, cfg Config) *gin.Engine {
r.Use(recoveryMid)
// whether print access log
if cfg.PrintAccessLog {
r.Use(loggerMid)
}
r.Use(loggerMid)
if cfg.PProf {
pprof.Register(r, "/api/debug/pprof")

View File

@@ -8,12 +8,13 @@ import (
)
type Config struct {
Dir string
Level string
Output string
KeepHours uint
RotateNum int
RotateSize uint64
Dir string
Level string
Output string
KeepHours uint
RotateNum int
RotateSize uint64
OutputToOneFile bool
}
func Init(c Config) (func(), error) {
@@ -35,6 +36,7 @@ func Init(c Config) (func(), error) {
} else {
return nil, errors.New("KeepHours and Rotatenum both are 0")
}
lb.OutputToOneFile(c.OutputToOneFile)
logger.SetLogging(c.Level, lb)
}

View File

@@ -208,7 +208,7 @@ func (s *SsoClient) exchangeUser(code string) (*CallbackOutput, error) {
if err != nil {
return nil, fmt.Errorf("failed to exchange token: %s", err)
}
userInfo, err := s.getUserInfo(s.UserInfoAddr, oauth2Token.AccessToken, s.TranTokenMethod)
userInfo, err := s.getUserInfo(s.Config.ClientID, s.UserInfoAddr, oauth2Token.AccessToken, s.TranTokenMethod)
if err != nil {
logger.Errorf("failed to get user info: %s", err)
return nil, fmt.Errorf("failed to get user info: %s", err)
@@ -223,10 +223,10 @@ func (s *SsoClient) exchangeUser(code string) (*CallbackOutput, error) {
}, nil
}
func (s *SsoClient) getUserInfo(UserInfoAddr, accessToken string, TranTokenMethod string) ([]byte, error) {
func (s *SsoClient) getUserInfo(ClientId, UserInfoAddr, accessToken string, TranTokenMethod string) ([]byte, error) {
var req *http.Request
if TranTokenMethod == "formdata" {
body := bytes.NewBuffer([]byte("access_token=" + accessToken))
body := bytes.NewBuffer([]byte("access_token=" + accessToken + "&client_id=" + ClientId))
r, err := http.NewRequest("POST", UserInfoAddr, body)
if err != nil {
return nil, err
@@ -234,7 +234,7 @@ func (s *SsoClient) getUserInfo(UserInfoAddr, accessToken string, TranTokenMetho
r.Header.Add("Content-Type", "application/x-www-form-urlencoded")
req = r
} else if TranTokenMethod == "querystring" {
r, err := http.NewRequest("GET", UserInfoAddr+"?access_token="+accessToken, nil)
r, err := http.NewRequest("GET", UserInfoAddr+"?access_token="+accessToken+"&client_id="+ClientId, nil)
if err != nil {
return nil, err
}
@@ -246,6 +246,7 @@ func (s *SsoClient) getUserInfo(UserInfoAddr, accessToken string, TranTokenMetho
return nil, err
}
r.Header.Add("Authorization", "Bearer "+accessToken)
r.Header.Add("client_id", ClientId)
req = r
}

View File

@@ -80,14 +80,15 @@ func GetByUrl[T any](url string, cfg conf.CenterApi) (T, error) {
var dataResp DataResponse[T]
err = json.Unmarshal(body, &dataResp)
if err != nil {
return dat, fmt.Errorf("failed to decode response: %w", err)
return dat, fmt.Errorf("failed to decode:%s response: %w", string(body), err)
}
if dataResp.Err != "" {
return dat, fmt.Errorf("error from server: %s", dataResp.Err)
}
logger.Debugf("get data from %s, data: %+v", url, dataResp.Dat)
prettyData, _ := json.Marshal(dataResp.Dat)
logger.Debugf("get data from %s, data: %s", url, string(prettyData))
return dataResp.Dat, nil
}
@@ -176,9 +177,9 @@ func PostByUrl[T any](url string, cfg conf.CenterApi, v interface{}) (t T, err e
return t, fmt.Errorf("error from server: %s", dataResp.Err)
}
logger.Debugf("get data from %s, data: %+v", url, dataResp.Dat)
prettyData, _ := json.Marshal(dataResp.Dat)
logger.Debugf("get data from %s, data: %s", url, string(prettyData))
return dataResp.Dat, nil
}
func PostJSON(url string, timeout time.Duration, v interface{}, retries ...int) (response []byte, code int, err error) {

View File

@@ -697,6 +697,18 @@ func (h *httpAPI) LabelValues(ctx context.Context, label string, matchs []string
}
// Query runs an instant query, retrying on failure. It makes at most three
// attempts with a 100ms pause between them and returns the first successful
// result.
//
// If every attempt fails, the error and warnings of the last attempt are
// returned so callers can see why the query failed. If ctx is cancelled while
// waiting between attempts, Query stops retrying and returns the most recent
// error immediately.
func (h *httpAPI) Query(ctx context.Context, query string, ts time.Time) (model.Value, Warnings, error) {
	const (
		maxAttempts = 3
		retryDelay  = 100 * time.Millisecond
	)

	var (
		warnings Warnings
		err      error
	)
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		var value model.Value
		value, warnings, err = h.query(ctx, query, ts)
		if err == nil {
			return value, warnings, nil
		}

		// No point in waiting after the final attempt.
		if attempt == maxAttempts {
			break
		}

		// Wait before retrying, but give up early if the caller is gone.
		timer := time.NewTimer(retryDelay)
		select {
		case <-ctx.Done():
			timer.Stop()
			return nil, warnings, err
		case <-timer.C:
		}
	}

	return nil, warnings, err
}
func (h *httpAPI) query(ctx context.Context, query string, ts time.Time) (model.Value, Warnings, error) {
u := h.client.URL(epQuery, nil)
q := u.Query()

View File

@@ -48,10 +48,11 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
busiGroupCache := memsto.NewBusiGroupCache(ctx, stats)
targetCache := memsto.NewTargetCache(ctx, stats, nil)
configCvalCache := memsto.NewCvalCache(ctx, stats)
writers := writer.NewWriters(config.Pushgw)
r := httpx.GinEngine(config.Global.RunMode, config.HTTP)
r := httpx.GinEngine(config.Global.RunMode, config.HTTP, configCvalCache.PrintBodyPaths, configCvalCache.PrintAccessLog)
rt := router.New(config.HTTP, config.Pushgw, config.Alert, targetCache, busiGroupCache, idents, metas, writers, ctx)
rt.Config(r)

View File

@@ -20,6 +20,12 @@ func (rt *Router) AppendLabels(pt *prompb.TimeSeries, target *models.Target, bgC
for key, value := range target.TagsMap {
if index, has := labelKeys[key]; has {
// e.g. busigroup=cloud
if _, has := labelKeys[rt.Pushgw.BusiGroupLabelKey]; has {
// busigroup key already exists, skip
continue
}
// overwrite labels
if rt.Pushgw.LabelRewrite {
pt.Labels[index].Value = value

View File

@@ -185,11 +185,23 @@ func (tc *tdengineClient) QueryTable(query string) (APIResponse, error) {
}
defer resp.Body.Close()
// 限制响应体大小为10MB
maxSize := int64(10 * 1024 * 1024) // 10MB
limitedReader := http.MaxBytesReader(nil, resp.Body, maxSize)
if resp.StatusCode != http.StatusOK {
return apiResp, fmt.Errorf("HTTP error, status: %s", resp.Status)
}
err = json.NewDecoder(resp.Body).Decode(&apiResp)
return apiResp, err
err = json.NewDecoder(limitedReader).Decode(&apiResp)
if err != nil {
if strings.Contains(err.Error(), "http: request body too large") {
return apiResp, fmt.Errorf("response body exceeds 10MB limit")
}
return apiResp, err
}
return apiResp, nil
}
func (tc *tdengineClient) QueryLog(query interface{}) (APIResponse, error) {
@@ -245,15 +257,23 @@ func TimeFormat(src APIResponse, timeFormat string) APIResponse {
tsIdx := -1
for colIndex, colData := range src.ColumnMeta {
// 类型参考 https://docs.taosdata.com/taos-sql/data-type/
colType, ok := colData[1].(string)
if !ok {
// 处理v2版本数字类型和v3版本字符串类型
switch t := colData[1].(type) {
case float64:
// v2版本数字类型映射
if int(t) == 9 { // TIMESTAMP type in v2
tsIdx = colIndex
break
}
case string:
// v3版本直接使用字符串类型
if t == "TIMESTAMP" {
tsIdx = colIndex
break
}
default:
logger.Warningf("unexpected column type: %v", colData[1])
return src
}
if colType == "TIMESTAMP" {
tsIdx = colIndex
break
continue
}
}
@@ -262,15 +282,19 @@ func TimeFormat(src APIResponse, timeFormat string) APIResponse {
}
for i := range src.Data {
ts, ok := src.Data[i][tsIdx].(string)
if !ok {
logger.Warningf("unexpected timestamp type: %v", src.Data[i][tsIdx])
continue
}
var t time.Time
var err error
t, err := time.Parse(time.RFC3339Nano, ts)
if err != nil {
logger.Warningf("parse %v timestamp failed: %v", src.Data[i], err)
switch tsVal := src.Data[i][tsIdx].(type) {
case string:
// 尝试解析不同格式的时间字符串
t, err = parseTimeString(tsVal)
if err != nil {
logger.Warningf("parse timestamp string failed: %v, value: %v", err, tsVal)
continue
}
default:
logger.Warningf("unexpected timestamp type: %T, value: %v", tsVal, tsVal)
continue
}
@@ -279,7 +303,73 @@ func TimeFormat(src APIResponse, timeFormat string) APIResponse {
return src
}
func (tc *tdengineClient) Query(query interface{}) ([]models.DataResp, error) {
// parseTimeLayouts lists the layouts parseTimeString tries, in order. It is
// built once at package level because parseTimeString is called per data row.
//
// The RFC 3339 layouts come first: they are the formats the callers used
// before parseTimeString existed, so the common case succeeds on the first
// attempt. No other layout in the list can match an RFC 3339 string, so this
// ordering does not change any result.
var parseTimeLayouts = []string{
	// Expected formats, e.g. "2022-06-29T05:52:16.603Z".
	time.RFC3339Nano, // "2006-01-02T15:04:05.999999999Z07:00"
	time.RFC3339,     // "2006-01-02T15:04:05Z07:00"

	// Standard library layouts.
	time.Layout,   // "01/02 03:04:05PM '06 -0700"
	time.ANSIC,    // "Mon Jan _2 15:04:05 2006"
	time.UnixDate, // "Mon Jan _2 15:04:05 MST 2006"
	time.RubyDate, // "Mon Jan 02 15:04:05 -0700 2006"
	time.RFC822,   // "02 Jan 06 15:04 MST"
	time.RFC822Z,  // "02 Jan 06 15:04 -0700"
	time.RFC850,   // "Monday, 02-Jan-06 15:04:05 MST"
	time.RFC1123,  // "Mon, 02 Jan 2006 15:04:05 MST"
	time.RFC1123Z, // "Mon, 02 Jan 2006 15:04:05 -0700"
	time.Kitchen,  // "3:04PM"

	// Handy timestamp layouts.
	time.Stamp,      // "Jan _2 15:04:05"
	time.StampMilli, // "Jan _2 15:04:05.000"
	time.StampMicro, // "Jan _2 15:04:05.000000"
	time.StampNano,  // "Jan _2 15:04:05.000000000"
	time.DateTime,   // "2006-01-02 15:04:05"
	time.DateOnly,   // "2006-01-02"
	time.TimeOnly,   // "15:04:05"

	// Common custom layouts.
	"2006-01-02T15:04:05", // ISO format without a time zone
	"2006-01-02T15:04:05.000Z",
	"2006-01-02T15:04:05Z",
	"2006-01-02 15:04:05.999999999", // nanoseconds
	"2006-01-02 15:04:05.999999",    // microseconds
	"2006-01-02 15:04:05.999",       // milliseconds
	"2006/01/02",
	"20060102",
	"01/02/2006",
	"2006年01月02日",
	"2006年01月02日 15:04:05",
}

// parseTimeString parses ts as a point in time.
//
// It first tries every layout in parseTimeLayouts and returns the first
// successful parse. If none matches and ts is a base-10 integer, it is read as
// a Unix timestamp whose unit is chosen by the string length: 10 characters
// for seconds, 13 for milliseconds, 16 for microseconds and 19 for
// nanoseconds.
//
// When nothing matches, the returned error wraps the message of the last
// layout attempt and the time.Time result is the zero value.
func parseTimeString(ts string) (time.Time, error) {
	var lastErr error
	for _, layout := range parseTimeLayouts {
		t, err := time.Parse(layout, ts)
		if err == nil {
			return t, nil
		}
		lastErr = err
	}

	// Fall back to a numeric Unix timestamp.
	if timestamp, err := strconv.ParseInt(ts, 10, 64); err == nil {
		switch len(ts) {
		case 10: // seconds
			return time.Unix(timestamp, 0), nil
		case 13: // milliseconds
			return time.Unix(timestamp/1000, (timestamp%1000)*1000000), nil
		case 16: // microseconds
			return time.Unix(timestamp/1000000, (timestamp%1000000)*1000), nil
		case 19: // nanoseconds
			return time.Unix(timestamp/1000000000, timestamp%1000000000), nil
		}
	}

	return time.Time{}, fmt.Errorf("failed to parse time with any format: %v", lastErr)
}
func (tc *tdengineClient) Query(query interface{}, delay int) ([]models.DataResp, error) {
b, err := json.Marshal(query)
if err != nil {
return nil, err
@@ -296,9 +386,9 @@ func (tc *tdengineClient) Query(query interface{}) ([]models.DataResp, error) {
if q.From == "" {
// 2023-09-21T05:37:30.000Z format
to := time.Now().Unix()
to := time.Now().Unix() - int64(delay)
q.To = time.Unix(to, 0).UTC().Format(time.RFC3339)
from := to - q.Interval
from := to - q.Interval - int64(delay)
q.From = time.Unix(from, 0).UTC().Format(time.RFC3339)
}
@@ -368,9 +458,45 @@ func (tc *tdengineClient) GetColumns(database, table string) ([]Column, error) {
return columns, err
}
for _, row := range data.ColumnMeta {
var colType string
switch t := row[1].(type) {
case float64:
// v2版本数字类型映射
switch int(t) {
case 1:
colType = "BOOL"
case 2:
colType = "TINYINT"
case 3:
colType = "SMALLINT"
case 4:
colType = "INT"
case 5:
colType = "BIGINT"
case 6:
colType = "FLOAT"
case 7:
colType = "DOUBLE"
case 8:
colType = "BINARY"
case 9:
colType = "TIMESTAMP"
case 10:
colType = "NCHAR"
default:
colType = "UNKNOWN"
}
case string:
// v3版本直接使用字符串类型
colType = t
default:
logger.Warningf("unexpected column type format: %v", row[1])
colType = "UNKNOWN"
}
column := Column{
Name: row[0].(string),
Type: row[1].(string),
Type: colType,
Size: int(row[2].(float64)),
}
columns = append(columns, column)
@@ -454,9 +580,44 @@ func ConvertToTStData(src APIResponse, key Keys, ref string) ([]models.DataResp,
tsIdx := -1
for colIndex, colData := range src.ColumnMeta {
// 类型参考 https://docs.taosdata.com/taos-sql/data-type/
colName := colData[0].(string)
colType := colData[1].(string)
var colType string
// 处理v2版本数字类型和v3版本字符串类型
switch t := colData[1].(type) {
case float64:
// v2版本数字类型映射
switch int(t) {
case 1:
colType = "BOOL"
case 2:
colType = "TINYINT"
case 3:
colType = "SMALLINT"
case 4:
colType = "INT"
case 5:
colType = "BIGINT"
case 6:
colType = "FLOAT"
case 7:
colType = "DOUBLE"
case 8:
colType = "BINARY"
case 9:
colType = "TIMESTAMP"
case 10:
colType = "NCHAR"
default:
colType = "UNKNOWN"
}
case string:
// v3版本直接使用字符串类型
colType = t
default:
logger.Warningf("unexpected column type format: %v", colData[1])
continue
}
switch colType {
case "TIMESTAMP":
tsIdx = colIndex
@@ -470,7 +631,6 @@ func ConvertToTStData(src APIResponse, key Keys, ref string) ([]models.DataResp,
} else {
metricIdxMap[colName] = colIndex
}
default:
if len(labelMap) > 0 {
if _, ok := labelMap[colName]; !ok {
@@ -505,7 +665,7 @@ func ConvertToTStData(src APIResponse, key Keys, ref string) ([]models.DataResp,
metric[model.MetricNameLabel] = model.LabelValue(metricName)
// transfer 2022-06-29T05:52:16.603Z to unix timestamp
t, err := time.Parse(time.RFC3339, row[tsIdx].(string))
t, err := parseTimeString(row[tsIdx].(string))
if err != nil {
logger.Warningf("parse %v timestamp failed: %v", row, err)
continue