Compare commits

...

29 Commits

Author SHA1 Message Date
Yening Qin
33cb7d5c6c Merge branch 'release-11' into fix-proxy-urls 2024-11-15 16:54:43 +08:00
ning
14e5ea414d code refactor 2024-11-15 16:39:49 +08:00
710leo
9173bf1668 fix: proxy api parse url 2024-11-13 23:11:57 +08:00
Yening Qin
d2ff106ac8 refactor: event notify (#2286) 2024-11-13 19:49:33 +08:00
710leo
d4afdb2b6e refactor: change log 2024-11-06 22:34:30 +08:00
flashbo
2befc8b0f1 refactor: migrate bg label (#2269) 2024-11-06 21:48:29 +08:00
Yening Qin
14fd2eb26d refactor: update tdengine query (#2270) 2024-11-06 20:27:21 +08:00
ning
0a938518d7 refactor: target_busi_group table name 2024-11-06 13:00:35 +08:00
ning
0eed5afa7e refactor: update target_busi_group character 2024-11-05 14:46:41 +08:00
Yening Qin
f82eaf0a1f refactor: optimize tdentine (#2262) 2024-11-04 17:33:18 +08:00
ning
f03278d68d refactor: append tags 2024-11-04 16:43:39 +08:00
shardingHe
7d1e143f60 docs: sync configurations for bind & ldap (#2253)
Co-authored-by: shardingHe <wangzihe@flashcat.cloud>
2024-11-02 16:49:49 +08:00
ning
078a0c7b1c refactor: prom query log 2024-11-01 15:28:23 +08:00
flashbo
d9cac65a18 refactor: improve prom_rule import (#2251) 2024-10-30 14:28:00 +08:00
ning
dd025ca87c refactor: migrate db and host_miss tag append 2024-10-30 14:20:16 +08:00
ning
04734b8940 Merge branch 'main' of github.com:ccfos/nightingale 2024-10-29 12:09:50 +08:00
ning
bf7bcf4196 docs: update notify tpl 2024-10-29 12:09:26 +08:00
ulricqin
16195abb89 Update docker-compose.yaml 2024-10-29 12:08:40 +08:00
ning
3f4891d65d refactor: event queue push 2024-10-28 20:51:21 +08:00
Yening Qin
102549c6a1 refactor: webhook send event (#2248)
Co-authored-by: Xu Bin <140785332+Reditiny@users.noreply.github.com>
2024-10-28 20:33:29 +08:00
Yening Qin
5213b1d7f1 refactor: es update config (#2247)
Co-authored-by: flashbo <36443248+lwb0214@users.noreply.github.com>
2024-10-28 20:32:45 +08:00
Yening Qin
24de97fb1e refactor: update default engine name (#2245) 2024-10-28 15:50:52 +08:00
ning
9c2cf679e0 refactor: center set default engine_name 2024-10-28 13:37:55 +08:00
Yening Qin
2aa4941010 refactor: optimize recover notify(#2242)
Co-authored-by: Xu Bin <140785332+Reditiny@users.noreply.github.com>
2024-10-25 16:53:44 +08:00
flashbo
a812f14442 refactor: record notify for callback (#2231) 2024-10-25 16:50:12 +08:00
flashbo
4fb7e8e2b5 refactor: fill group names in target (#2241) 2024-10-25 16:30:09 +08:00
ulricqin
113ad67104 Update README.md 2024-10-25 12:10:28 +08:00
flashbo
49d843540a refactor: add ExtraInfoMap in alert event (#2240) 2024-10-25 11:03:56 +08:00
Yening Qin
21f0e3310f fix: event relabel when target_label is blank (#2228)
Co-authored-by: Xu Bin <140785332+Reditiny@users.noreply.github.com>
2024-10-24 14:09:41 +08:00
42 changed files with 887 additions and 138 deletions

View File

@@ -38,6 +38,7 @@
- 👉 [文档中心](https://flashcat.cloud/docs/) | [下载中心](https://flashcat.cloud/download/nightingale/)
- ❤️ [报告 Bug](https://github.com/ccfos/nightingale/issues/new?assignees=&labels=&projects=&template=question.yml)
- 为了提供更快速的访问体验,上述文档和下载站点托管于 [FlashcatCloud](https://flashcat.cloud)
- 💡 前后端代码分离,前端代码仓库:[https://github.com/n9e/fe](https://github.com/n9e/fe)
## 功能特点

View File

@@ -139,6 +139,12 @@ func (e *Dispatch) HandleEventNotify(event *models.AlertCurEvent, isSubscribe bo
if rule == nil {
return
}
if e.blockEventNotify(rule, event) {
logger.Infof("block event notify: rule_id:%d event:%+v", rule.Id, event)
return
}
fillUsers(event, e.userCache, e.userGroupCache)
var (
@@ -175,6 +181,25 @@ func (e *Dispatch) HandleEventNotify(event *models.AlertCurEvent, isSubscribe bo
}
}
// blockEventNotify reports whether the notification for event should be
// suppressed. It blocks host-rule events whose target host is no longer in
// the target cache, and recovery events whose rule configuration changed
// after the alert fired.
func (e *Dispatch) blockEventNotify(rule *models.AlertRule, event *models.AlertCurEvent) bool {
	// For host rules, drop the event if its target host was deleted
	// (i.e. the ident is missing from the target cache).
	if rule.GetRuleType() == models.HOST {
		if host, ok := e.targetCache.Get(event.TagsMap["ident"]); !ok || host == nil {
			return true
		}
	}

	// For recovery notifications, drop the event if the stored rule hash no
	// longer matches the current rule configuration.
	return event.IsRecovered && event.RuleHash != rule.Hash()
}
func (e *Dispatch) handleSubs(event *models.AlertCurEvent) {
// handle alert subscribes
subscribes := make([]*models.AlertSubscribe, 0)

View File

@@ -3,6 +3,7 @@ package eval
import (
"context"
"encoding/json"
"errors"
"fmt"
"math"
"reflect"
@@ -117,21 +118,29 @@ func (arw *AlertRuleWorker) Eval() {
arw.processor.Stats.CounterRuleEval.WithLabelValues().Inc()
typ := cachedRule.GetRuleType()
var anomalyPoints []common.AnomalyPoint
var recoverPoints []common.AnomalyPoint
var (
anomalyPoints []common.AnomalyPoint
recoverPoints []common.AnomalyPoint
err error
)
switch typ {
case models.PROMETHEUS:
anomalyPoints = arw.GetPromAnomalyPoint(cachedRule.RuleConfig)
anomalyPoints, err = arw.GetPromAnomalyPoint(cachedRule.RuleConfig)
case models.HOST:
anomalyPoints = arw.GetHostAnomalyPoint(cachedRule.RuleConfig)
anomalyPoints, err = arw.GetHostAnomalyPoint(cachedRule.RuleConfig)
case models.TDENGINE:
anomalyPoints, recoverPoints = arw.GetTdengineAnomalyPoint(cachedRule, arw.processor.DatasourceId())
anomalyPoints, recoverPoints, err = arw.GetTdengineAnomalyPoint(cachedRule, arw.processor.DatasourceId())
case models.LOKI:
anomalyPoints = arw.GetPromAnomalyPoint(cachedRule.RuleConfig)
anomalyPoints, err = arw.GetPromAnomalyPoint(cachedRule.RuleConfig)
default:
return
}
if err != nil {
logger.Errorf("rule_eval:%s get anomaly point err:%s", arw.Key(), err.Error())
return
}
if arw.processor == nil {
logger.Warningf("rule_eval:%s processor is nil", arw.Key())
return
@@ -179,7 +188,7 @@ func (arw *AlertRuleWorker) Stop() {
close(arw.quit)
}
func (arw *AlertRuleWorker) GetPromAnomalyPoint(ruleConfig string) []common.AnomalyPoint {
func (arw *AlertRuleWorker) GetPromAnomalyPoint(ruleConfig string) ([]common.AnomalyPoint, error) {
var lst []common.AnomalyPoint
var severity int
@@ -187,13 +196,13 @@ func (arw *AlertRuleWorker) GetPromAnomalyPoint(ruleConfig string) []common.Anom
if err := json.Unmarshal([]byte(ruleConfig), &rule); err != nil {
logger.Errorf("rule_eval:%s rule_config:%s, error:%v", arw.Key(), ruleConfig, err)
arw.processor.Stats.CounterRuleEvalErrorTotal.WithLabelValues(fmt.Sprintf("%v", arw.processor.DatasourceId()), GET_RULE_CONFIG).Inc()
return lst
return lst, err
}
if rule == nil {
logger.Errorf("rule_eval:%s rule_config:%s, error:rule is nil", arw.Key(), ruleConfig)
arw.processor.Stats.CounterRuleEvalErrorTotal.WithLabelValues(fmt.Sprintf("%v", arw.processor.DatasourceId()), GET_RULE_CONFIG).Inc()
return lst
return lst, errors.New("rule is nil")
}
arw.inhibit = rule.Inhibit
@@ -224,7 +233,7 @@ func (arw *AlertRuleWorker) GetPromAnomalyPoint(ruleConfig string) []common.Anom
logger.Errorf("rule_eval:%s promql:%s, error:%v", arw.Key(), promql, err)
arw.processor.Stats.CounterQueryDataErrorTotal.WithLabelValues(fmt.Sprintf("%d", arw.datasourceId)).Inc()
arw.processor.Stats.CounterRuleEvalErrorTotal.WithLabelValues(fmt.Sprintf("%v", arw.processor.DatasourceId()), QUERY_DATA).Inc()
continue
return lst, err
}
if len(warnings) > 0 {
@@ -241,10 +250,10 @@ func (arw *AlertRuleWorker) GetPromAnomalyPoint(ruleConfig string) []common.Anom
}
lst = append(lst, points...)
}
return lst
return lst, nil
}
func (arw *AlertRuleWorker) GetTdengineAnomalyPoint(rule *models.AlertRule, dsId int64) ([]common.AnomalyPoint, []common.AnomalyPoint) {
func (arw *AlertRuleWorker) GetTdengineAnomalyPoint(rule *models.AlertRule, dsId int64) ([]common.AnomalyPoint, []common.AnomalyPoint, error) {
// 获取查询和规则判断条件
points := []common.AnomalyPoint{}
recoverPoints := []common.AnomalyPoint{}
@@ -252,7 +261,7 @@ func (arw *AlertRuleWorker) GetTdengineAnomalyPoint(rule *models.AlertRule, dsId
if ruleConfig == "" {
logger.Warningf("rule_eval:%d promql is blank", rule.Id)
arw.processor.Stats.CounterRuleEvalErrorTotal.WithLabelValues(fmt.Sprintf("%v", arw.processor.DatasourceId()), GET_RULE_CONFIG).Inc()
return points, recoverPoints
return points, recoverPoints, errors.New("rule config is nil")
}
var ruleQuery models.RuleQuery
@@ -261,7 +270,7 @@ func (arw *AlertRuleWorker) GetTdengineAnomalyPoint(rule *models.AlertRule, dsId
logger.Warningf("rule_eval:%d promql parse error:%s", rule.Id, err.Error())
arw.processor.Stats.CounterRuleEvalErrorTotal.WithLabelValues(fmt.Sprintf("%v", arw.processor.DatasourceId())).Inc()
arw.processor.Stats.CounterRuleEvalErrorTotal.WithLabelValues(fmt.Sprintf("%v", arw.processor.DatasourceId()), GET_RULE_CONFIG).Inc()
return points, recoverPoints
return points, recoverPoints, err
}
arw.inhibit = ruleQuery.Inhibit
@@ -288,7 +297,7 @@ func (arw *AlertRuleWorker) GetTdengineAnomalyPoint(rule *models.AlertRule, dsId
logger.Warningf("rule_eval rid:%d query data error: %v", rule.Id, err)
arw.processor.Stats.CounterQueryDataErrorTotal.WithLabelValues(fmt.Sprintf("%d", arw.datasourceId)).Inc()
arw.processor.Stats.CounterRuleEvalErrorTotal.WithLabelValues(fmt.Sprintf("%v", arw.processor.DatasourceId()), QUERY_DATA).Inc()
continue
return points, recoverPoints, err
}
// 此条日志很重要,是告警判断的现场值
logger.Debugf("rule_eval rid:%d req:%+v resp:%+v", rule.Id, query, series)
@@ -305,10 +314,10 @@ func (arw *AlertRuleWorker) GetTdengineAnomalyPoint(rule *models.AlertRule, dsId
points, recoverPoints = GetAnomalyPoint(rule.Id, ruleQuery, seriesTagIndexes, seriesStore)
}
return points, recoverPoints
return points, recoverPoints, nil
}
func (arw *AlertRuleWorker) GetHostAnomalyPoint(ruleConfig string) []common.AnomalyPoint {
func (arw *AlertRuleWorker) GetHostAnomalyPoint(ruleConfig string) ([]common.AnomalyPoint, error) {
var lst []common.AnomalyPoint
var severity int
@@ -316,13 +325,13 @@ func (arw *AlertRuleWorker) GetHostAnomalyPoint(ruleConfig string) []common.Anom
if err := json.Unmarshal([]byte(ruleConfig), &rule); err != nil {
logger.Errorf("rule_eval:%s rule_config:%s, error:%v", arw.Key(), ruleConfig, err)
arw.processor.Stats.CounterRuleEvalErrorTotal.WithLabelValues(fmt.Sprintf("%v", arw.processor.DatasourceId()), GET_RULE_CONFIG).Inc()
return lst
return lst, err
}
if rule == nil {
logger.Errorf("rule_eval:%s rule_config:%s, error:rule is nil", arw.Key(), ruleConfig)
arw.processor.Stats.CounterRuleEvalErrorTotal.WithLabelValues(fmt.Sprintf("%v", arw.processor.DatasourceId()), GET_RULE_CONFIG).Inc()
return lst
return lst, errors.New("rule is nil")
}
arw.inhibit = rule.Inhibit
@@ -370,7 +379,6 @@ func (arw *AlertRuleWorker) GetHostAnomalyPoint(ruleConfig string) []common.Anom
targets := arw.processor.TargetCache.Gets(missTargets)
for _, target := range targets {
m := make(map[string]string)
target.FillTagsMap()
for k, v := range target.TagsMap {
m[k] = v
}
@@ -417,7 +425,6 @@ func (arw *AlertRuleWorker) GetHostAnomalyPoint(ruleConfig string) []common.Anom
m := make(map[string]string)
target, exists := arw.processor.TargetCache.Get(host)
if exists {
target.FillTagsMap()
for k, v := range target.TagsMap {
m[k] = v
}
@@ -449,7 +456,7 @@ func (arw *AlertRuleWorker) GetHostAnomalyPoint(ruleConfig string) []common.Anom
}
}
}
return lst
return lst, nil
}
func GetAnomalyPoint(ruleId int64, ruleQuery models.RuleQuery, seriesTagIndexes map[string]map[uint64][]uint64, seriesStore map[uint64]models.DataResp) ([]common.AnomalyPoint, []common.AnomalyPoint) {

View File

@@ -138,6 +138,9 @@ func (p *Processor) Handle(anomalyPoints []common.AnomalyPoint, from string, inh
return
}
// 在 rule 变化之前取到 ruleHash
ruleHash := p.rule.Hash()
p.rule = cachedRule
now := time.Now().Unix()
alertingKeys := map[string]struct{}{}
@@ -145,7 +148,7 @@ func (p *Processor) Handle(anomalyPoints []common.AnomalyPoint, from string, inh
// 根据 event 的 tag 将 events 分组,处理告警抑制的情况
eventsMap := make(map[string][]*models.AlertCurEvent)
for _, anomalyPoint := range anomalyPoints {
event := p.BuildEvent(anomalyPoint, from, now)
event := p.BuildEvent(anomalyPoint, from, now, ruleHash)
// 如果 event 被 mute 了,本质也是 fire 的状态,这里无论如何都添加到 alertingKeys 中,防止 fire 的事件自动恢复了
hash := event.Hash
alertingKeys[hash] = struct{}{}
@@ -175,7 +178,7 @@ func (p *Processor) Handle(anomalyPoints []common.AnomalyPoint, from string, inh
}
}
func (p *Processor) BuildEvent(anomalyPoint common.AnomalyPoint, from string, now int64) *models.AlertCurEvent {
func (p *Processor) BuildEvent(anomalyPoint common.AnomalyPoint, from string, now int64, ruleHash string) *models.AlertCurEvent {
p.fillTags(anomalyPoint)
p.mayHandleIdent()
hash := Hash(p.rule.Id, p.datasourceId, anomalyPoint)
@@ -214,9 +217,11 @@ func (p *Processor) BuildEvent(anomalyPoint common.AnomalyPoint, from string, no
event.ExtraConfig = p.rule.ExtraConfigJSON
event.PromQl = anomalyPoint.Query
event.RecoverConfig = anomalyPoint.RecoverConfig
event.RuleHash = ruleHash
if p.target != "" {
if pt, exist := p.TargetCache.Get(p.target); exist {
pt.GroupNames = p.BusiGroupCache.GetNamesByBusiGroupIds(pt.GroupIds)
event.Target = pt
} else {
logger.Infof("Target[ident: %s] doesn't exist in cache.", p.target)
@@ -538,6 +543,7 @@ func (p *Processor) RecoverAlertCurEventFromDb() {
event.DB2Mem()
target, exists := p.TargetCache.Get(event.TargetIdent)
if exists {
target.GroupNames = p.BusiGroupCache.GetNamesByBusiGroupIds(target.GroupIds)
event.Target = target
}

View File

@@ -129,16 +129,7 @@ func (c *DefaultCallBacker) CallBack(ctx CallBackContext) {
return
}
ctx.Stats.AlertNotifyTotal.WithLabelValues("rule_callback").Inc()
resp, code, err := poster.PostJSON(ctx.CallBackURL, 5*time.Second, event, 3)
if err != nil {
logger.Errorf("event_callback_fail(rule_id=%d url=%s), event:%+v, resp: %s, err: %v, code: %d",
event.RuleId, ctx.CallBackURL, event, string(resp), err, code)
ctx.Stats.AlertNotifyErrorTotal.WithLabelValues("rule_callback").Inc()
} else {
logger.Infof("event_callback_succ(rule_id=%d url=%s), event:%+v, resp: %s, code: %d",
event.RuleId, ctx.CallBackURL, event, string(resp), code)
}
doSendAndRecord(ctx.Ctx, ctx.CallBackURL, ctx.CallBackURL, event, "callback", ctx.Stats, event)
}
func doSendAndRecord(ctx *ctx.Context, url, token string, body interface{}, channel string,
@@ -195,8 +186,8 @@ func PushCallbackEvent(ctx *ctx.Context, webhook *models.Webhook, event *models.
if queue == nil {
queue = &WebhookQueue{
list: NewSafeListLimited(QueueMaxSize),
closeCh: make(chan struct{}),
eventQueue: NewSafeEventQueue(QueueMaxSize),
closeCh: make(chan struct{}),
}
CallbackEventQueueLock.Lock()
@@ -206,8 +197,8 @@ func PushCallbackEvent(ctx *ctx.Context, webhook *models.Webhook, event *models.
StartConsumer(ctx, queue, webhook.Batch, webhook, stats)
}
succ := queue.list.PushFront(event)
succ := queue.eventQueue.Push(event)
if !succ {
logger.Warningf("Write channel(%s) full, current channel size: %d event:%v", webhook.Url, queue.list.Len(), event)
logger.Warningf("Write channel(%s) full, current channel size: %d event:%v", webhook.Url, queue.eventQueue.Len(), event)
}
}

View File

@@ -99,8 +99,6 @@ func (ds *DingtalkSender) CallBack(ctx CallBackContext) {
doSendAndRecord(ctx.Ctx, ctx.CallBackURL, ctx.CallBackURL, body,
"callback", ctx.Stats, ctx.Events[0])
ctx.Stats.AlertNotifyTotal.WithLabelValues("rule_callback").Inc()
}
// extract urls and ats from Users

View File

@@ -202,7 +202,11 @@ func startEmailSender(ctx *ctx.Context, smtp aconf.SMTPConfig) {
}
for _, to := range m.mail.GetHeader("To") {
NotifyRecord(ctx, m.event, models.Email, to, "", err)
msg := ""
if err == nil {
msg = "ok"
}
NotifyRecord(ctx, m.event, models.Email, to, msg, err)
}
size++

View File

@@ -56,7 +56,6 @@ func (fs *FeishuSender) CallBack(ctx CallBackContext) {
doSendAndRecord(ctx.Ctx, ctx.CallBackURL, ctx.CallBackURL, body, "callback",
ctx.Stats, ctx.Events[0])
ctx.Stats.AlertNotifyTotal.WithLabelValues("rule_callback").Inc()
}
func (fs *FeishuSender) Send(ctx MessageContext) {

View File

@@ -29,28 +29,28 @@ func (lk *LarkSender) CallBack(ctx CallBackContext) {
doSendAndRecord(ctx.Ctx, ctx.CallBackURL, ctx.CallBackURL, body, "callback",
ctx.Stats, ctx.Events[0])
ctx.Stats.AlertNotifyTotal.WithLabelValues("rule_callback").Inc()
}
func (lk *LarkSender) Send(ctx MessageContext) {
if len(ctx.Users) == 0 || len(ctx.Events) == 0 {
return
}
urls := lk.extract(ctx.Users)
urls, tokens := lk.extract(ctx.Users)
message := BuildTplMessage(models.Lark, lk.tpl, ctx.Events)
for _, url := range urls {
for i, url := range urls {
body := feishu{
Msgtype: "text",
Content: feishuContent{
Text: message,
},
}
doSend(url, body, models.Lark, ctx.Stats)
doSendAndRecord(ctx.Ctx, url, tokens[i], body, models.Lark, ctx.Stats, ctx.Events[0])
}
}
func (lk *LarkSender) extract(users []*models.User) []string {
func (lk *LarkSender) extract(users []*models.User) ([]string, []string) {
urls := make([]string, 0, len(users))
tokens := make([]string, 0, len(users))
for _, user := range users {
if token, has := user.ExtractToken(models.Lark); has {
@@ -59,7 +59,8 @@ func (lk *LarkSender) extract(users []*models.User) []string {
url = "https://open.larksuite.com/open-apis/bot/v2/hook/" + token
}
urls = append(urls, url)
tokens = append(tokens, token)
}
}
return urls
return urls, tokens
}

View File

@@ -64,7 +64,7 @@ func (fs *LarkCardSender) Send(ctx MessageContext) {
if len(ctx.Users) == 0 || len(ctx.Events) == 0 {
return
}
urls, _ := fs.extract(ctx.Users)
urls, tokens := fs.extract(ctx.Users)
message := BuildTplMessage(models.LarkCard, fs.tpl, ctx.Events)
color := "red"
lowerUnicode := strings.ToLower(message)
@@ -80,14 +80,14 @@ func (fs *LarkCardSender) Send(ctx MessageContext) {
body.Card.Header.Template = color
body.Card.Elements[0].Text.Content = message
body.Card.Elements[2].Elements[0].Content = SendTitle
for _, url := range urls {
doSend(url, body, models.LarkCard, ctx.Stats)
for i, url := range urls {
doSendAndRecord(ctx.Ctx, url, tokens[i], body, models.LarkCard, ctx.Stats, ctx.Events[0])
}
}
func (fs *LarkCardSender) extract(users []*models.User) ([]string, []string) {
urls := make([]string, 0, len(users))
ats := make([]string, 0)
tokens := make([]string, 0)
for i := range users {
if token, has := users[i].ExtractToken(models.Lark); has {
url := token
@@ -95,7 +95,8 @@ func (fs *LarkCardSender) extract(users []*models.User) ([]string, []string) {
url = "https://open.larksuite.com/open-apis/bot/v2/hook/" + strings.TrimSpace(token)
}
urls = append(urls, url)
tokens = append(tokens, token)
}
}
return urls, ats
return urls, tokens
}

View File

@@ -43,7 +43,7 @@ func (ms *MmSender) Send(ctx MessageContext) {
Text: message,
Tokens: urls,
Stats: ctx.Stats,
}, ctx.Events[0])
}, ctx.Events[0], models.Mm)
}
func (ms *MmSender) CallBack(ctx CallBackContext) {
@@ -56,9 +56,7 @@ func (ms *MmSender) CallBack(ctx CallBackContext) {
Text: message,
Tokens: []string{ctx.CallBackURL},
Stats: ctx.Stats,
}, ctx.Events[0])
ctx.Stats.AlertNotifyTotal.WithLabelValues("rule_callback").Inc()
}, ctx.Events[0], "callback")
}
func (ms *MmSender) extract(users []*models.User) []string {
@@ -71,11 +69,12 @@ func (ms *MmSender) extract(users []*models.User) []string {
return tokens
}
func SendMM(ctx *ctx.Context, message MatterMostMessage, event *models.AlertCurEvent) {
func SendMM(ctx *ctx.Context, message MatterMostMessage, event *models.AlertCurEvent, channel string) {
for i := 0; i < len(message.Tokens); i++ {
u, err := url.Parse(message.Tokens[i])
if err != nil {
logger.Errorf("mm_sender: failed to parse error=%v", err)
NotifyRecord(ctx, event, channel, message.Tokens[i], "", err)
continue
}
@@ -104,7 +103,7 @@ func SendMM(ctx *ctx.Context, message MatterMostMessage, event *models.AlertCurE
Username: username,
Text: txt + message.Text,
}
doSendAndRecord(ctx, ur, message.Tokens[i], body, models.Mm, message.Stats, event)
doSendAndRecord(ctx, ur, message.Tokens[i], body, channel, message.Stats, event)
}
}
}

View File

@@ -1,6 +1,7 @@
package sender
import (
"errors"
"html/template"
"strings"
@@ -40,9 +41,7 @@ func (ts *TelegramSender) CallBack(ctx CallBackContext) {
Text: message,
Tokens: []string{ctx.CallBackURL},
Stats: ctx.Stats,
}, ctx.Events[0])
ctx.Stats.AlertNotifyTotal.WithLabelValues("rule_callback").Inc()
}, ctx.Events[0], "callback")
}
func (ts *TelegramSender) Send(ctx MessageContext) {
@@ -56,7 +55,7 @@ func (ts *TelegramSender) Send(ctx MessageContext) {
Text: message,
Tokens: tokens,
Stats: ctx.Stats,
}, ctx.Events[0])
}, ctx.Events[0], models.Telegram)
}
func (ts *TelegramSender) extract(users []*models.User) []string {
@@ -69,10 +68,11 @@ func (ts *TelegramSender) extract(users []*models.User) []string {
return tokens
}
func SendTelegram(ctx *ctx.Context, message TelegramMessage, event *models.AlertCurEvent) {
func SendTelegram(ctx *ctx.Context, message TelegramMessage, event *models.AlertCurEvent, channel string) {
for i := 0; i < len(message.Tokens); i++ {
if !strings.Contains(message.Tokens[i], "/") && !strings.HasPrefix(message.Tokens[i], "https://") {
logger.Errorf("telegram_sender: result=fail invalid token=%s", message.Tokens[i])
NotifyRecord(ctx, event, channel, message.Tokens[i], "", errors.New("invalid token"))
continue
}
var url string
@@ -93,6 +93,6 @@ func SendTelegram(ctx *ctx.Context, message TelegramMessage, event *models.Alert
Text: message.Text,
}
doSendAndRecord(ctx, url, message.Tokens[i], body, models.Telegram, message.Stats, event)
doSendAndRecord(ctx, url, message.Tokens[i], body, channel, message.Stats, event)
}
}

View File

@@ -121,8 +121,8 @@ var EventQueueLock sync.RWMutex
const QueueMaxSize = 100000
type WebhookQueue struct {
list *SafeListLimited
closeCh chan struct{}
eventQueue *SafeEventQueue
closeCh chan struct{}
}
func PushEvent(ctx *ctx.Context, webhook *models.Webhook, event *models.AlertCurEvent, stats *astats.Stats) {
@@ -132,8 +132,8 @@ func PushEvent(ctx *ctx.Context, webhook *models.Webhook, event *models.AlertCur
if queue == nil {
queue = &WebhookQueue{
list: NewSafeListLimited(QueueMaxSize),
closeCh: make(chan struct{}),
eventQueue: NewSafeEventQueue(QueueMaxSize),
closeCh: make(chan struct{}),
}
EventQueueLock.Lock()
@@ -143,10 +143,10 @@ func PushEvent(ctx *ctx.Context, webhook *models.Webhook, event *models.AlertCur
StartConsumer(ctx, queue, webhook.Batch, webhook, stats)
}
succ := queue.list.PushFront(event)
succ := queue.eventQueue.Push(event)
if !succ {
stats.AlertNotifyErrorTotal.WithLabelValues("push_event_queue").Inc()
logger.Warningf("Write channel(%s) full, current channel size: %d event:%v", webhook.Url, queue.list.Len(), event)
logger.Warningf("Write channel(%s) full, current channel size: %d event:%v", webhook.Url, queue.eventQueue.Len(), event)
}
}
@@ -157,7 +157,7 @@ func StartConsumer(ctx *ctx.Context, queue *WebhookQueue, popSize int, webhook *
logger.Infof("event queue:%v closed", queue)
return
default:
events := queue.list.PopBack(popSize)
events := queue.eventQueue.PopN(popSize)
if len(events) == 0 {
time.Sleep(time.Millisecond * 400)
continue

View File

@@ -0,0 +1,109 @@
package sender
import (
"container/list"
"sync"
"github.com/ccfos/nightingale/v6/models"
)
// Severity levels recognized by SafeEventQueue; High drains before Middle,
// and Middle before Low.
const (
	High   = 1
	Middle = 2
	Low    = 3
)

// SafeEventQueue is a concurrency-safe, bounded, three-level priority queue
// of alert events. Events are kept in per-severity FIFO lists and are always
// drained in severity order (High, then Middle, then Low).
type SafeEventQueue struct {
	lock        sync.RWMutex
	maxSize     int // upper bound on the total number of queued events
	queueHigh   *list.List
	queueMiddle *list.List
	queueLow    *list.List
}

// NewSafeEventQueue returns an empty queue bounded at maxSize events.
func NewSafeEventQueue(maxSize int) *SafeEventQueue {
	q := &SafeEventQueue{maxSize: maxSize} // zero-value RWMutex is ready to use
	q.queueHigh = list.New()
	q.queueMiddle = list.New()
	q.queueLow = list.New()
	return q
}
// Len returns the total number of queued events across all three severity
// levels. It is safe for concurrent use.
func (spq *SafeEventQueue) Len() int {
	spq.lock.RLock()
	defer spq.lock.RUnlock()
	// Reuse the lock-free helper while holding the read lock.
	return spq.len()
}
// len returns the combined length of the three severity lists. It takes no
// lock; the caller must already hold spq.lock. Do not call from outside this
// file.
func (spq *SafeEventQueue) len() int {
	total := spq.queueHigh.Len()
	total += spq.queueMiddle.Len()
	total += spq.queueLow.Len()
	return total
}
// Push appends event to the FIFO list for its severity level and reports
// whether it was accepted. It returns false when the queue is already at
// capacity or when event.Severity is not one of High, Middle or Low.
// It is safe for concurrent use.
func (spq *SafeEventQueue) Push(event *models.AlertCurEvent) bool {
	spq.lock.Lock()
	defer spq.lock.Unlock()

	// Capacity check. The previous code used "for spq.len() > spq.maxSize"
	// as an if-statement and its ">" comparison let the queue grow to
	// maxSize+1 elements; ">=" keeps the post-push total at or below
	// maxSize, matching the Len() <= maxSize expectation of the tests.
	if spq.len() >= spq.maxSize {
		return false
	}

	switch event.Severity {
	case High:
		spq.queueHigh.PushBack(event)
	case Middle:
		spq.queueMiddle.PushBack(event)
	case Low:
		spq.queueLow.PushBack(event)
	default:
		// Unknown severity: reject rather than mis-file the event.
		return false
	}
	return true
}
// pop removes and returns the oldest event from the highest non-empty
// severity list, or nil when the queue is empty. It takes no lock; the
// caller must already hold spq.lock. Do not call from outside this file.
func (spq *SafeEventQueue) pop() *models.AlertCurEvent {
	if spq.len() == 0 {
		return nil
	}

	// Drain strictly in priority order: High, then Middle, then Low.
	var raw interface{}
	switch {
	case spq.queueHigh.Len() > 0:
		raw = spq.queueHigh.Remove(spq.queueHigh.Front())
	case spq.queueMiddle.Len() > 0:
		raw = spq.queueMiddle.Remove(spq.queueMiddle.Front())
	default:
		raw = spq.queueLow.Remove(spq.queueLow.Front())
	}

	// Defensive type assertion; only *models.AlertCurEvent is ever stored.
	if event, ok := raw.(*models.AlertCurEvent); ok {
		return event
	}
	return nil
}
// Pop removes and returns the highest-priority event, or nil when the queue
// is empty. It is safe for concurrent use.
func (spq *SafeEventQueue) Pop() *models.AlertCurEvent {
	spq.lock.Lock()
	defer spq.lock.Unlock()
	event := spq.pop()
	return event
}
// PopN removes and returns up to n events, always draining higher severities
// first. It returns fewer than n events (possibly none) when the queue runs
// empty. It is safe for concurrent use.
func (spq *SafeEventQueue) PopN(n int) []*models.AlertCurEvent {
	spq.lock.Lock()
	defer spq.lock.Unlock()

	events := make([]*models.AlertCurEvent, 0, n)
	for i := 0; i < n && spq.len() > 0; i++ {
		if event := spq.pop(); event != nil {
			events = append(events, event)
		}
	}
	return events
}

View File

@@ -0,0 +1,157 @@
package sender
import (
"sync"
"testing"
"time"
"github.com/ccfos/nightingale/v6/models"
"github.com/stretchr/testify/assert"
)
// TestSafePriorityQueue_ConcurrentPushPop verifies that concurrent Push and
// Pop calls neither race nor lose events: after numGoroutines goroutines
// push numEvents events each, Len reports the full total, and concurrent
// popping drains the queue back to empty.
func TestSafePriorityQueue_ConcurrentPushPop(t *testing.T) {
	spq := NewSafeEventQueue(100000)
	var wg sync.WaitGroup
	numGoroutines := 100
	numEvents := 1000
	// Concurrent pushes: severities cycle through the valid range 1..3.
	wg.Add(numGoroutines)
	for i := 0; i < numGoroutines; i++ {
		go func(goroutineID int) {
			defer wg.Done()
			for j := 0; j < numEvents; j++ {
				event := &models.AlertCurEvent{
					Severity:    goroutineID%3 + 1,
					TriggerTime: time.Now().UnixNano(),
				}
				spq.Push(event)
			}
		}(i)
	}
	wg.Wait()
	// All pushes fit under maxSize, so none may have been dropped.
	expectedLen := numGoroutines * numEvents
	assert.Equal(t, expectedLen, spq.Len(), "Queue length mismatch after concurrent pushes")
	// Concurrent pops: each goroutine pops until the queue reports empty.
	wg.Add(numGoroutines)
	for i := 0; i < numGoroutines; i++ {
		go func() {
			defer wg.Done()
			for {
				event := spq.Pop()
				if event == nil {
					return
				}
			}
		}()
	}
	wg.Wait()
	// Every pushed event must have been popped exactly once.
	assert.Equal(t, 0, spq.Len(), "Queue should be empty after concurrent pops")
}
// TestSafePriorityQueue_ConcurrentPopMax verifies that concurrent PopN calls
// never return more than the requested batch size and that the remaining
// queue length accounts for every removed event.
func TestSafePriorityQueue_ConcurrentPopMax(t *testing.T) {
	spq := NewSafeEventQueue(100000)
	// Seed the queue with 1000 events spread over the three severities.
	for i := 0; i < 1000; i++ {
		spq.Push(&models.AlertCurEvent{
			Severity:    i%3 + 1,
			TriggerTime: time.Now().UnixNano(),
		})
	}
	var wg sync.WaitGroup
	numGoroutines := 10
	popMax := 100
	// Concurrent PopN: each goroutine requests at most popMax events.
	wg.Add(numGoroutines)
	for i := 0; i < numGoroutines; i++ {
		go func() {
			defer wg.Done()
			events := spq.PopN(popMax)
			assert.LessOrEqual(t, len(events), popMax, "PopN exceeded maximum")
		}()
	}
	wg.Wait()
	// Verify the remaining length, clamped at zero if more was requested
	// than was queued.
	expectedRemaining := 1000 - (numGoroutines * popMax)
	if expectedRemaining < 0 {
		expectedRemaining = 0
	}
	assert.Equal(t, expectedRemaining, spq.Len(), "Queue length mismatch after concurrent PopN")
}
// TestSafePriorityQueue_ConcurrentPushPopWithDifferentSeverities verifies
// that after concurrent pushes of mixed severities, no event is lost and
// events drain in non-decreasing severity order (High=1 first, Low=3 last).
func TestSafePriorityQueue_ConcurrentPushPopWithDifferentSeverities(t *testing.T) {
	spq := NewSafeEventQueue(100000)
	var wg sync.WaitGroup
	numGoroutines := 50
	numEvents := 500
	// Concurrently push events with different priorities.
	wg.Add(numGoroutines)
	for i := 0; i < numGoroutines; i++ {
		go func(goroutineID int) {
			defer wg.Done()
			for j := 0; j < numEvents; j++ {
				event := &models.AlertCurEvent{
					Severity:    goroutineID%3 + 1, // simulate the different severity levels
					TriggerTime: time.Now().UnixNano(),
				}
				spq.Push(event)
			}
		}(i)
	}
	wg.Wait()
	// All pushes fit under maxSize, so none may have been dropped.
	expectedLen := numGoroutines * numEvents
	assert.Equal(t, expectedLen, spq.Len(), "Queue length mismatch after concurrent pushes")
	// Drain sequentially and check events come out in priority order.
	var lastEvent *models.AlertCurEvent
	for spq.Len() > 0 {
		event := spq.Pop()
		if lastEvent != nil {
			assert.LessOrEqual(t, lastEvent.Severity, event.Severity, "Events are not in correct priority order")
		}
		lastEvent = event
	}
}
// TestSafePriorityQueue_ExceedMaxSize verifies that the queue enforces its
// capacity: pushes beyond maxSize are rejected and Len never exceeds maxSize.
func TestSafePriorityQueue_ExceedMaxSize(t *testing.T) {
	spq := NewSafeEventQueue(5)

	// Push more events than the queue can hold. Severities cycle through the
	// valid range 1..3, matching the other tests; the previous "i % 3"
	// produced severity 0, which Push rejects as invalid, so those pushes
	// never exercised the capacity check this test is meant to cover.
	for i := 0; i < 10; i++ {
		spq.Push(&models.AlertCurEvent{
			Severity:    i%3 + 1,
			TriggerTime: int64(i),
		})
	}

	// The queue must never grow past maxSize.
	assert.LessOrEqual(t, spq.Len(), spq.maxSize)

	// Drain whatever was retained.
	expectedEvents := 5
	if spq.Len() < 5 {
		expectedEvents = spq.Len()
	}

	// Every retained event must carry a valid severity.
	for i := 0; i < expectedEvents; i++ {
		event := spq.Pop()
		if event != nil {
			assert.GreaterOrEqual(t, event.Severity, High)
			assert.LessOrEqual(t, event.Severity, Low)
		}
	}
}

View File

@@ -39,7 +39,6 @@ func (ws *WecomSender) CallBack(ctx CallBackContext) {
doSendAndRecord(ctx.Ctx, ctx.CallBackURL, ctx.CallBackURL, body, "callback",
ctx.Stats, ctx.Events[0])
ctx.Stats.AlertNotifyTotal.WithLabelValues("rule_callback").Inc()
}
func (ws *WecomSender) Send(ctx MessageContext) {

View File

@@ -48,6 +48,10 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
cconf.MergeOperationConf()
if config.Alert.Heartbeat.EngineName == "" {
config.Alert.Heartbeat.EngineName = "default"
}
logxClean, err := logx.Init(config.Log)
if err != nil {
return nil, err
@@ -116,7 +120,11 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
redis, sso, ctx, metas, idents, targetCache, userCache, userGroupCache)
pushgwRouter := pushgwrt.New(config.HTTP, config.Pushgw, config.Alert, targetCache, busiGroupCache, idents, metas, writers, ctx)
go models.MigrateBg(ctx, pushgwRouter.Pushgw.BusiGroupLabelKey)
go func() {
if models.CanMigrateBg(ctx) {
models.MigrateBg(ctx, pushgwRouter.Pushgw.BusiGroupLabelKey)
}
}()
r := httpx.GinEngine(config.Global.RunMode, config.HTTP, configCvalCache.PrintBodyPaths, configCvalCache.PrintAccessLog)

View File

@@ -122,12 +122,14 @@ func (rt *Router) datasourceUpsert(c *gin.Context) {
}
func DatasourceCheck(ds models.Datasource) error {
if ds.HTTPJson.Url == "" {
return fmt.Errorf("url is empty")
}
if ds.PluginType != models.ELASTICSEARCH {
if ds.HTTPJson.Url == "" {
return fmt.Errorf("url is empty")
}
if !strings.HasPrefix(ds.HTTPJson.Url, "http") {
return fmt.Errorf("url must start with http or https")
if !strings.HasPrefix(ds.HTTPJson.Url, "http") {
return fmt.Errorf("url must start with http or https")
}
}
client := &http.Client{
@@ -138,11 +140,11 @@ func DatasourceCheck(ds models.Datasource) error {
},
}
fullURL := ds.HTTPJson.Url
req, err := http.NewRequest("GET", fullURL, nil)
var fullURL string
req, err := ds.HTTPJson.NewReq(&fullURL)
if err != nil {
logger.Errorf("Error creating request: %v", err)
return fmt.Errorf("request url:%s failed", fullURL)
return fmt.Errorf("request urls:%v failed", ds.HTTPJson.GetUrls())
}
if ds.PluginType == models.PROMETHEUS {

View File

@@ -7,7 +7,6 @@ import (
"net"
"net/http"
"net/http/httputil"
"net/url"
"strings"
"sync"
"time"
@@ -112,9 +111,9 @@ func (rt *Router) dsProxy(c *gin.Context) {
return
}
target, err := url.Parse(ds.HTTPJson.Url)
target, err := ds.HTTPJson.ParseUrl()
if err != nil {
c.String(http.StatusInternalServerError, "invalid url: %s", ds.HTTPJson.Url)
c.String(http.StatusInternalServerError, "invalid urls: %s", ds.HTTPJson.GetUrls())
return
}

View File

@@ -1,5 +1,3 @@
version: "3.7"
networks:
nightingale:
driver: bridge

3
go.mod
View File

@@ -32,6 +32,7 @@ require (
github.com/rakyll/statik v0.1.7
github.com/redis/go-redis/v9 v9.0.2
github.com/spaolacci/murmur3 v1.1.0
github.com/stretchr/testify v1.8.4
github.com/tidwall/gjson v1.14.0
github.com/toolkits/pkg v1.3.8
golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1
@@ -44,6 +45,8 @@ require (
gorm.io/gorm v1.25.7-0.20240204074919-46816ad31dde
)
require github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
require (
github.com/Azure/go-ntlmssp v0.0.0-20220621081337-cb9428e4ac1e // indirect
github.com/beorn7/perks v1.0.1 // indirect

2
go.sum
View File

@@ -234,6 +234,7 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pquerna/cachecontrol v0.1.0 h1:yJMy84ti9h/+OEWa752kBTKv4XC30OtVVHYv/8cTqKc=
github.com/pquerna/cachecontrol v0.1.0/go.mod h1:NrUG3Z7Rdu85UNR3vm7SOsl1nFIeSiQnrHV5K9mBcUI=
github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g=
@@ -288,6 +289,7 @@ github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.3/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/tidwall/gjson v1.14.0 h1:6aeJ0bzojgWLa82gDQHcx3S0Lr/O51I9bJ5nv6JFx5w=
github.com/tidwall/gjson v1.14.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA=

View File

@@ -1,6 +1,6 @@
## Appdynamics
## AppDynamics
Appdynamics 采集插件, 采集 Appdynamics 数据
AppDynamics 采集插件, 采集 AppDynamics 数据
## Configuration

View File

@@ -0,0 +1,8 @@
[[instances]]
urls = [
# "http://localhost:8053/xml/v3",
]
gather_memory_contexts = true
gather_views = true
timeout = "5s"
# labels={app="bind"}

Binary file not shown.

After

Width:  |  Height:  |  Size: 6.9 KiB

View File

@@ -0,0 +1,13 @@
forked from [telegraf/bind](https://github.com/influxdata/telegraf/tree/master/plugins/inputs/bind)
配置示例
```
[[instances]]
urls = [
#"http://localhost:8053/xml/v3",
]
timeout = "5s"
gather_memory_contexts = true
gather_views = true
```

View File

@@ -0,0 +1,3 @@
## canal
canal 默认提供了 prometheus 格式指标的接口 [Prometheus-QuickStart](https://github.com/alibaba/canal/wiki/Prometheus-QuickStart) ,所以可以直接通过[ prometheus 插件](https://flashcat.cloud/docs/content/flashcat-monitor/categraf/plugin/prometheus)采集。

View File

@@ -0,0 +1,37 @@
# # collect interval
# interval = 15
[[instances]]
# # append some labels for series
# labels = { region="cloud", product="n9e" }
# # interval = global.interval * interval_times
# interval_times = 1
## Server to monitor
## The scheme determines the mode to use for connection with
## ldap://... -- unencrypted (non-TLS) connection
## ldaps://... -- TLS connection
## starttls://... -- StartTLS connection
## If no port is given, the default ports, 389 for ldap and starttls and
## 636 for ldaps, are used.
#server = "ldap://localhost"
## Server dialect, can be "openldap" or "389ds"
# dialect = "openldap"
# DN and password to bind with
## If bind_dn is empty an anonymous bind is performed.
bind_dn = ""
bind_password = ""
## Reverse the field names constructed from the monitoring DN
# reverse_field_names = false
## Optional TLS Config
# use_tls = false
# tls_ca = "/etc/categraf/ca.pem"
# tls_cert = "/etc/categraf/cert.pem"
# tls_key = "/etc/categraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false

Binary file not shown.

After

Width:  |  Height:  |  Size: 7.1 KiB

View File

@@ -0,0 +1,113 @@
# LDAP Input Plugin
This plugin gathers metrics from LDAP servers' monitoring (`cn=Monitor`)
backend. Currently this plugin supports [OpenLDAP](https://www.openldap.org/)
and [389ds](https://www.port389.org/) servers.
To use this plugin you must enable the monitoring backend/plugin of your LDAP
server. See
[OpenLDAP](https://www.openldap.org/devel/admin/monitoringslapd.html) or 389ds
documentation for details.
## Metrics
Depending on the server dialect, different metrics are produced. The metrics
are usually named according to the selected dialect.
### Tags
- server -- Server name or IP
- port -- Port used for connecting
## Example Output
Using the `openldap` dialect
```text
openldap_modify_operations_completed agent_hostname=zy-fat port=389 server=localhost 0
openldap_referrals_statistics agent_hostname=zy-fat port=389 server=localhost 0
openldap_unbind_operations_initiated agent_hostname=zy-fat port=389 server=localhost 0
openldap_delete_operations_completed agent_hostname=zy-fat port=389 server=localhost 0
openldap_extended_operations_completed agent_hostname=zy-fat port=389 server=localhost 0
openldap_pdu_statistics agent_hostname=zy-fat port=389 server=localhost 42
openldap_starting_threads agent_hostname=zy-fat port=389 server=localhost 0
openldap_active_threads agent_hostname=zy-fat port=389 server=localhost 1
openldap_uptime_time agent_hostname=zy-fat port=389 server=localhost 102
openldap_bytes_statistics agent_hostname=zy-fat port=389 server=localhost 3176
openldap_compare_operations_completed agent_hostname=zy-fat port=389 server=localhost 0
openldap_bind_operations_completed agent_hostname=zy-fat port=389 server=localhost 1
openldap_total_connections agent_hostname=zy-fat port=389 server=localhost 1002
openldap_search_operations_completed agent_hostname=zy-fat port=389 server=localhost 1
openldap_abandon_operations_initiated agent_hostname=zy-fat port=389 server=localhost 0
openldap_add_operations_initiated agent_hostname=zy-fat port=389 server=localhost 0
openldap_open_threads agent_hostname=zy-fat port=389 server=localhost 1
openldap_add_operations_completed agent_hostname=zy-fat port=389 server=localhost 0
openldap_operations_initiated agent_hostname=zy-fat port=389 server=localhost 3
openldap_write_waiters agent_hostname=zy-fat port=389 server=localhost 0
openldap_entries_statistics agent_hostname=zy-fat port=389 server=localhost 41
openldap_modrdn_operations_completed agent_hostname=zy-fat port=389 server=localhost 0
openldap_pending_threads agent_hostname=zy-fat port=389 server=localhost 0
openldap_max_pending_threads agent_hostname=zy-fat port=389 server=localhost 0
openldap_bind_operations_initiated agent_hostname=zy-fat port=389 server=localhost 1
openldap_max_file_descriptors_connections agent_hostname=zy-fat port=389 server=localhost 1024
openldap_compare_operations_initiated agent_hostname=zy-fat port=389 server=localhost 0
openldap_search_operations_initiated agent_hostname=zy-fat port=389 server=localhost 2
openldap_modrdn_operations_initiated agent_hostname=zy-fat port=389 server=localhost 0
openldap_read_waiters agent_hostname=zy-fat port=389 server=localhost 1
openldap_backload_threads agent_hostname=zy-fat port=389 server=localhost 1
openldap_current_connections agent_hostname=zy-fat port=389 server=localhost 1
openldap_unbind_operations_completed agent_hostname=zy-fat port=389 server=localhost 0
openldap_delete_operations_initiated agent_hostname=zy-fat port=389 server=localhost 0
openldap_extended_operations_initiated agent_hostname=zy-fat port=389 server=localhost 0
openldap_modify_operations_initiated agent_hostname=zy-fat port=389 server=localhost 0
openldap_max_threads agent_hostname=zy-fat port=389 server=localhost 16
openldap_abandon_operations_completed agent_hostname=zy-fat port=389 server=localhost 0
openldap_operations_completed agent_hostname=zy-fat port=389 server=localhost 2
openldap_database_2_databases agent_hostname=zy-fat port=389 server=localhost 0
```
Using the `389ds` dialect
```text
389ds_current_connections_at_max_threads agent_hostname=zy-fat port=389 server=localhost 0
389ds_connections_max_threads agent_hostname=zy-fat port=389 server=localhost 0
389ds_add_operations agent_hostname=zy-fat port=389 server=localhost 0
389ds_dtablesize agent_hostname=zy-fat port=389 server=localhost 63936
389ds_strongauth_binds agent_hostname=zy-fat port=389 server=localhost 13
389ds_modrdn_operations agent_hostname=zy-fat port=389 server=localhost 0
389ds_maxthreads_per_conn_hits agent_hostname=zy-fat port=389 server=localhost 0
389ds_current_connections agent_hostname=zy-fat port=389 server=localhost 2
389ds_security_errors agent_hostname=zy-fat port=389 server=localhost 0
389ds_entries_sent agent_hostname=zy-fat port=389 server=localhost 13
389ds_cache_entries agent_hostname=zy-fat port=389 server=localhost 0
389ds_backends agent_hostname=zy-fat port=389 server=localhost 0
389ds_threads agent_hostname=zy-fat port=389 server=localhost 17
389ds_connections agent_hostname=zy-fat port=389 server=localhost 2
389ds_read_operations agent_hostname=zy-fat port=389 server=localhost 0
389ds_entries_returned agent_hostname=zy-fat port=389 server=localhost 13
389ds_unauth_binds agent_hostname=zy-fat port=389 server=localhost 0
389ds_search_operations agent_hostname=zy-fat port=389 server=localhost 14
389ds_simpleauth_binds agent_hostname=zy-fat port=389 server=localhost 0
389ds_operations_completed agent_hostname=zy-fat port=389 server=localhost 51
389ds_connections_in_max_threads agent_hostname=zy-fat port=389 server=localhost 0
389ds_modify_operations agent_hostname=zy-fat port=389 server=localhost 0
389ds_wholesubtree_search_operations agent_hostname=zy-fat port=389 server=localhost 1
389ds_read_waiters agent_hostname=zy-fat port=389 server=localhost 0
389ds_compare_operations agent_hostname=zy-fat port=389 server=localhost 0
389ds_errors agent_hostname=zy-fat port=389 server=localhost 13
389ds_in_operations agent_hostname=zy-fat port=389 server=localhost 52
389ds_total_connections agent_hostname=zy-fat port=389 server=localhost 15
389ds_cache_hits agent_hostname=zy-fat port=389 server=localhost 0
389ds_list_operations agent_hostname=zy-fat port=389 server=localhost 0
389ds_referrals_returned agent_hostname=zy-fat port=389 server=localhost 0
389ds_copy_entries agent_hostname=zy-fat port=389 server=localhost 0
389ds_operations_initiated agent_hostname=zy-fat port=389 server=localhost 52
389ds_chainings agent_hostname=zy-fat port=389 server=localhost 0
389ds_bind_security_errors agent_hostname=zy-fat port=389 server=localhost 0
389ds_onelevel_search_operations agent_hostname=zy-fat port=389 server=localhost 0
389ds_bytes_sent agent_hostname=zy-fat port=389 server=localhost 1702
389ds_bytes_received agent_hostname=zy-fat port=389 server=localhost 0
389ds_referrals agent_hostname=zy-fat port=389 server=localhost 0
389ds_delete_operations agent_hostname=zy-fat port=389 server=localhost 0
389ds_anonymous_binds agent_hostname=zy-fat port=389 server=localhost 0
```

View File

@@ -60,6 +60,18 @@ func (c *BusiGroupCacheType) GetByBusiGroupId(id int64) *models.BusiGroup {
return c.ugs[id]
}
// GetNamesByBusiGroupIds returns the names of the cached busi groups whose
// ids appear in ids. Ids that are not present in the cache are silently
// skipped, so the result may contain fewer entries than the input.
func (c *BusiGroupCacheType) GetNamesByBusiGroupIds(ids []int64) []string {
	c.RLock()
	defer c.RUnlock()

	result := make([]string, 0, len(ids))
	for _, groupID := range ids {
		group, ok := c.ugs[groupID]
		if !ok {
			continue
		}
		result = append(result, group.Name)
	}
	return result
}
func (c *BusiGroupCacheType) SyncBusiGroups() {
err := c.syncBusiGroups()
if err != nil {

View File

@@ -69,6 +69,8 @@ type AlertCurEvent struct {
ExtraInfo []string `json:"extra_info" gorm:"-"`
Target *Target `json:"target" gorm:"-"`
RecoverConfig RecoverConfig `json:"recover_config" gorm:"-"`
RuleHash string `json:"rule_hash" gorm:"-"`
ExtraInfoMap []map[string]string `json:"extra_info_map" gorm:"-"`
}
func (e *AlertCurEvent) TableName() string {

View File

@@ -23,8 +23,9 @@ const (
HOST = "host"
LOKI = "loki"
PROMETHEUS = "prometheus"
TDENGINE = "tdengine"
PROMETHEUS = "prometheus"
TDENGINE = "tdengine"
ELASTICSEARCH = "elasticsearch"
)
const (
@@ -1126,3 +1127,7 @@ func InsertAlertRule(ctx *ctx.Context, ars []*AlertRule) error {
}
return DB(ctx).Create(ars).Error
}
// Hash returns an MD5 fingerprint of the rule derived from its id,
// datasource ids and rule config, used to detect whether a rule changed.
func (ar *AlertRule) Hash() string {
	key := fmt.Sprintf("%d_%s_%s", ar.Id, ar.DatasourceIds, ar.RuleConfig)
	return str.MD5(key)
}

View File

@@ -2,7 +2,9 @@ package models
import (
"encoding/json"
"math/rand"
"net/http"
"net/url"
"strings"
"time"
@@ -50,6 +52,7 @@ type HTTP struct {
TLS TLS `json:"tls"`
MaxIdleConnsPerHost int `json:"max_idle_conns_per_host"`
Url string `json:"url"`
Urls []string `json:"urls"`
Headers map[string]string `json:"headers"`
}
@@ -68,6 +71,49 @@ func (h HTTP) IsLoki() bool {
return false
}
// GetUrls returns the configured datasource urls in random order for simple
// client-side load balancing. When the multi-url list is empty it falls back
// to the single Url field. The receiver's slices are never mutated: a copy is
// shuffled and returned.
func (h HTTP) GetUrls() []string {
	candidates := h.Urls
	if len(candidates) == 0 {
		candidates = []string{h.Url}
	}

	// Shuffle a copy so the original configuration stays untouched.
	shuffled := make([]string, len(candidates))
	copy(shuffled, candidates)

	// Fisher-Yates shuffle.
	for i := len(shuffled) - 1; i > 0; i-- {
		j := rand.Intn(i + 1)
		shuffled[i], shuffled[j] = shuffled[j], shuffled[i]
	}
	return shuffled
}
// NewReq builds a GET request against the first datasource url (in the
// shuffled order produced by GetUrls) that http.NewRequest accepts. On
// success the chosen url is written through reqUrl. If every candidate
// fails, the last request/error pair is returned.
func (h HTTP) NewReq(reqUrl *string) (*http.Request, error) {
	var (
		req *http.Request
		err error
	)
	for _, candidate := range h.GetUrls() {
		req, err = http.NewRequest("GET", candidate, nil)
		if err != nil {
			continue
		}
		*reqUrl = candidate
		return req, nil
	}
	return req, err
}
// ParseUrl parses one of the configured datasource urls (picked at random by
// GetUrls) into a *url.URL. It returns an error when no url is configured or
// when the chosen url is malformed.
func (h HTTP) ParseUrl() (*url.URL, error) {
	candidates := h.GetUrls()
	if len(candidates) == 0 {
		return nil, errors.New("no urls")
	}
	return url.Parse(candidates[0])
}
type TLS struct {
SkipTlsVerify bool `json:"skip_tls_verify"`
}
@@ -300,6 +346,10 @@ func (ds *Datasource) DB2FE() error {
ds.HTTPJson.MaxIdleConnsPerHost = 100
}
if ds.PluginType == ELASTICSEARCH && len(ds.HTTPJson.Urls) == 0 {
ds.HTTPJson.Urls = []string{ds.HTTPJson.Url}
}
if ds.Auth != "" {
err := json.Unmarshal([]byte(ds.Auth), &ds.AuthJson)
if err != nil {

View File

@@ -283,7 +283,7 @@ var TplMap = map[string]string{
{{- end}}
{{- end}}
{{$domain := "http://请联系管理员修改通知模板将域名替换为实际的域名" }}
[事件详情]({{$domain}}/alert-his-events/{{.Id}})|[屏蔽1小时]({{$domain}}/alert-mutes/add?busiGroup={{.GroupId}}&cate={{.Cate}}&datasource_ids={{.DatasourceId}}&prod={{.RuleProd}}{{range $key, $value := .TagsMap}}&tags={{$key}}%3D{{$value}}{{end}})|[查看曲线]({{$domain}}/metric/explorer?data_source_id={{.DatasourceId}}&data_source_name=prometheus&mode=graph&prom_ql={{.PromQl}})`,
[事件详情]({{$domain}}/alert-his-events/{{.Id}})|[屏蔽1小时]({{$domain}}/alert-mutes/add?busiGroup={{.GroupId}}&cate={{.Cate}}&datasource_ids={{.DatasourceId}}&prod={{.RuleProd}}{{range $key, $value := .TagsMap}}&tags={{$key}}%3D{{$value}}{{end}})|[查看曲线]({{$domain}}/metric/explorer?data_source_id={{.DatasourceId}}&data_source_name=prometheus&mode=graph&prom_ql={{.PromQl|escape}})`,
Email: `<!DOCTYPE html>
<html lang="en">
<head>
@@ -529,7 +529,7 @@ var TplMap = map[string]string{
{{if .RuleNote }}**告警描述:** **{{.RuleNote}}**{{end}}
{{- end -}}
{{$domain := "http://请联系管理员修改通知模板将域名替换为实际的域名" }}
[事件详情]({{$domain}}/alert-his-events/{{.Id}})|[屏蔽1小时]({{$domain}}/alert-mutes/add?busiGroup={{.GroupId}}&cate={{.Cate}}&datasource_ids={{.DatasourceId}}&prod={{.RuleProd}}{{range $key, $value := .TagsMap}}&tags={{$key}}%3D{{$value}}{{end}})|[查看曲线]({{$domain}}/metric/explorer?data_source_id={{.DatasourceId}}&data_source_name=prometheus&mode=graph&prom_ql={{.PromQl}})`,
[事件详情]({{$domain}}/alert-his-events/{{.Id}})|[屏蔽1小时]({{$domain}}/alert-mutes/add?busiGroup={{.GroupId}}&cate={{.Cate}}&datasource_ids={{.DatasourceId}}&prod={{.RuleProd}}{{range $key, $value := .TagsMap}}&tags={{$key}}%3D{{$value}}{{end}})|[查看曲线]({{$domain}}/metric/explorer?data_source_id={{.DatasourceId}}&data_source_name=prometheus&mode=graph&prom_ql={{.PromQl|escape}})`,
EmailSubject: `{{if .IsRecovered}}Recovered{{else}}Triggered{{end}}: {{.RuleName}} {{.TagsJSON}}`,
Mm: `级别状态: S{{.Severity}} {{if .IsRecovered}}Recovered{{else}}Triggered{{end}}
规则名称: {{.RuleName}}{{if .RuleNote}}
@@ -557,7 +557,7 @@ var TplMap = map[string]string{
{{$time_duration := sub now.Unix .FirstTriggerTime }}{{if .IsRecovered}}{{$time_duration = sub .LastEvalTime .FirstTriggerTime }}{{end}}**距离首次告警**: {{humanizeDurationInterface $time_duration}}
**发送时间**: {{timestamp}}
{{$domain := "http://请联系管理员修改通知模板将域名替换为实际的域名" }}
[事件详情]({{$domain}}/alert-his-events/{{.Id}})|[屏蔽1小时]({{$domain}}/alert-mutes/add?busiGroup={{.GroupId}}&cate={{.Cate}}&datasource_ids={{.DatasourceId}}&prod={{.RuleProd}}{{range $key, $value := .TagsMap}}&tags={{$key}}%3D{{$value}}{{end}})|[查看曲线]({{$domain}}/metric/explorer?data_source_id={{.DatasourceId}}&data_source_name=prometheus&mode=graph&prom_ql={{.PromQl}})`,
[事件详情]({{$domain}}/alert-his-events/{{.Id}})|[屏蔽1小时]({{$domain}}/alert-mutes/add?busiGroup={{.GroupId}}&cate={{.Cate}}&datasource_ids={{.DatasourceId}}&prod={{.RuleProd}}{{range $key, $value := .TagsMap}}&tags={{$key}}%3D{{$value}}{{end}})|[查看曲线]({{$domain}}/metric/explorer?data_source_id={{.DatasourceId}}&data_source_name=prometheus&mode=graph&prom_ql={{.PromQl|escape}})`,
Lark: `级别状态: S{{.Severity}} {{if .IsRecovered}}Recovered{{else}}Triggered{{end}}
规则名称: {{.RuleName}}{{if .RuleNote}}
规则备注: {{.RuleNote}}{{end}}
@@ -588,5 +588,5 @@ var TplMap = map[string]string{
{{if .RuleNote }}**告警描述:** **{{.RuleNote}}**{{end}}
{{- end -}}
{{$domain := "http://请联系管理员修改通知模板将域名替换为实际的域名" }}
[事件详情]({{$domain}}/alert-his-events/{{.Id}})|[屏蔽1小时]({{$domain}}/alert-mutes/add?busiGroup={{.GroupId}}&cate={{.Cate}}&datasource_ids={{.DatasourceId}}&prod={{.RuleProd}}{{range $key, $value := .TagsMap}}&tags={{$key}}%3D{{$value}}{{end}})|[查看曲线]({{$domain}}/metric/explorer?data_source_id={{.DatasourceId}}&data_source_name=prometheus&mode=graph&prom_ql={{.PromQl}})`,
[事件详情]({{$domain}}/alert-his-events/{{.Id}})|[屏蔽1小时]({{$domain}}/alert-mutes/add?busiGroup={{.GroupId}}&cate={{.Cate}}&datasource_ids={{.DatasourceId}}&prod={{.RuleProd}}{{range $key, $value := .TagsMap}}&tags={{$key}}%3D{{$value}}{{end}})|[查看曲线]({{$domain}}/metric/explorer?data_source_id={{.DatasourceId}}&data_source_name=prometheus&mode=graph&prom_ql={{.PromQl|escape}})`,
}

View File

@@ -36,6 +36,7 @@ func ConvertAlert(rule PromRule, interval string, datasouceIds []int64, disabled
appendTags := []string{}
severity := 2
ruleName := rule.Alert
if len(rule.Labels) > 0 {
for k, v := range rule.Labels {
if k != "severity" {
@@ -49,12 +50,13 @@ func ConvertAlert(rule PromRule, interval string, datasouceIds []int64, disabled
case "info":
severity = 3
}
ruleName += "-" + v
}
}
}
return AlertRule{
Name: rule.Alert,
Name: ruleName,
Severity: severity,
DatasourceIdsJson: datasouceIds,
Disabled: disabled,

View File

@@ -34,15 +34,16 @@ type Target struct {
OS string `json:"os" gorm:"column:os"`
HostTags []string `json:"host_tags" gorm:"serializer:json"`
UnixTime int64 `json:"unixtime" gorm:"-"`
Offset int64 `json:"offset" gorm:"-"`
TargetUp float64 `json:"target_up" gorm:"-"`
MemUtil float64 `json:"mem_util" gorm:"-"`
CpuNum int `json:"cpu_num" gorm:"-"`
CpuUtil float64 `json:"cpu_util" gorm:"-"`
Arch string `json:"arch" gorm:"-"`
RemoteAddr string `json:"remote_addr" gorm:"-"`
GroupIds []int64 `json:"group_ids" gorm:"-"`
UnixTime int64 `json:"unixtime" gorm:"-"`
Offset int64 `json:"offset" gorm:"-"`
TargetUp float64 `json:"target_up" gorm:"-"`
MemUtil float64 `json:"mem_util" gorm:"-"`
CpuNum int `json:"cpu_num" gorm:"-"`
CpuUtil float64 `json:"cpu_util" gorm:"-"`
Arch string `json:"arch" gorm:"-"`
RemoteAddr string `json:"remote_addr" gorm:"-"`
GroupIds []int64 `json:"group_ids" gorm:"-"`
GroupNames []string `json:"group_names" gorm:"-"`
}
func (t *Target) TableName() string {
@@ -563,19 +564,34 @@ func (m *Target) UpdateFieldsMap(ctx *ctx.Context, fields map[string]interface{}
return DB(ctx).Model(m).Updates(fields).Error
}
func MigrateBg(ctx *ctx.Context, bgLabelKey string) {
// 1. 判断是否已经完成迁移
// 1. 是否可以进行 busi_group 迁移
func CanMigrateBg(ctx *ctx.Context) bool {
// 1.1 检查 target 表是否为空
var cnt int64
if err := DB(ctx).Model(&Target{}).Count(&cnt).Error; err != nil {
log.Println("failed to get target table count, err:", err)
return false
}
if cnt == 0 {
log.Println("target table is empty, skip migration.")
return false
}
// 1.2 判断是否已经完成迁移
var maxGroupId int64
if err := DB(ctx).Model(&Target{}).Select("MAX(group_id)").Scan(&maxGroupId).Error; err != nil {
log.Println("failed to get max group_id from target table, err:", err)
return
return false
}
if maxGroupId == 0 {
log.Println("migration bgid has been completed.")
return
return false
}
return true
}
func MigrateBg(ctx *ctx.Context, bgLabelKey string) {
err := DoMigrateBg(ctx, bgLabelKey)
if err != nil {
log.Println("failed to migrate bgid, err:", err)

View File

@@ -80,7 +80,7 @@ func GetByUrl[T any](url string, cfg conf.CenterApi) (T, error) {
var dataResp DataResponse[T]
err = json.Unmarshal(body, &dataResp)
if err != nil {
return dat, fmt.Errorf("failed to decode response: %w", err)
return dat, fmt.Errorf("failed to decode:%s response: %w", string(body), err)
}
if dataResp.Err != "" {

View File

@@ -697,15 +697,18 @@ func (h *httpAPI) LabelValues(ctx context.Context, label string, matchs []string
}
func (h *httpAPI) Query(ctx context.Context, query string, ts time.Time) (model.Value, Warnings, error) {
var err error
var warnings Warnings
var value model.Value
for i := 0; i < 3; i++ {
value, warnings, err := h.query(ctx, query, ts)
value, warnings, err = h.query(ctx, query, ts)
if err == nil {
return value, warnings, nil
}
time.Sleep(100 * time.Millisecond)
}
return nil, nil, errors.New("query failed")
return nil, warnings, err
}
func (h *httpAPI) query(ctx context.Context, query string, ts time.Time) (model.Value, Warnings, error) {

View File

@@ -20,6 +20,12 @@ func (rt *Router) AppendLabels(pt *prompb.TimeSeries, target *models.Target, bgC
for key, value := range target.TagsMap {
if index, has := labelKeys[key]; has {
// e.g. busigroup=cloud
if _, has := labelKeys[rt.Pushgw.BusiGroupLabelKey]; has {
// busigroup key already exists, skip
continue
}
// overwrite labels
if rt.Pushgw.LabelRewrite {
pt.Labels[index].Value = value

View File

@@ -154,6 +154,11 @@ func relabel(lset []prompb.Label, cfg *pconf.RelabelConfig) []prompb.Label {
}
func handleReplace(lb *LabelBuilder, regx *regexp.Regexp, cfg *pconf.RelabelConfig, val string, lset []prompb.Label) []prompb.Label {
// replace 如果没有 target_label直接返回原标签
if len(cfg.TargetLabel) == 0 {
return lb.labels()
}
// 如果没有 source_labels直接设置标签新增标签
if len(cfg.SourceLabels) == 0 {
lb.set(cfg.TargetLabel, cfg.Replacement)

View File

@@ -185,11 +185,23 @@ func (tc *tdengineClient) QueryTable(query string) (APIResponse, error) {
}
defer resp.Body.Close()
// 限制响应体大小为10MB
maxSize := int64(10 * 1024 * 1024) // 10MB
limitedReader := http.MaxBytesReader(nil, resp.Body, maxSize)
if resp.StatusCode != http.StatusOK {
return apiResp, fmt.Errorf("HTTP error, status: %s", resp.Status)
}
err = json.NewDecoder(resp.Body).Decode(&apiResp)
return apiResp, err
err = json.NewDecoder(limitedReader).Decode(&apiResp)
if err != nil {
if strings.Contains(err.Error(), "http: request body too large") {
return apiResp, fmt.Errorf("response body exceeds 10MB limit")
}
return apiResp, err
}
return apiResp, nil
}
func (tc *tdengineClient) QueryLog(query interface{}) (APIResponse, error) {
@@ -245,15 +257,23 @@ func TimeFormat(src APIResponse, timeFormat string) APIResponse {
tsIdx := -1
for colIndex, colData := range src.ColumnMeta {
// 类型参考 https://docs.taosdata.com/taos-sql/data-type/
colType, ok := colData[1].(string)
if !ok {
// 处理v2版本数字类型和v3版本字符串类型
switch t := colData[1].(type) {
case float64:
// v2版本数字类型映射
if int(t) == 9 { // TIMESTAMP type in v2
tsIdx = colIndex
break
}
case string:
// v3版本直接使用字符串类型
if t == "TIMESTAMP" {
tsIdx = colIndex
break
}
default:
logger.Warningf("unexpected column type: %v", colData[1])
return src
}
if colType == "TIMESTAMP" {
tsIdx = colIndex
break
continue
}
}
@@ -262,15 +282,19 @@ func TimeFormat(src APIResponse, timeFormat string) APIResponse {
}
for i := range src.Data {
ts, ok := src.Data[i][tsIdx].(string)
if !ok {
logger.Warningf("unexpected timestamp type: %v", src.Data[i][tsIdx])
continue
}
var t time.Time
var err error
t, err := time.Parse(time.RFC3339Nano, ts)
if err != nil {
logger.Warningf("parse %v timestamp failed: %v", src.Data[i], err)
switch tsVal := src.Data[i][tsIdx].(type) {
case string:
// 尝试解析不同格式的时间字符串
t, err = parseTimeString(tsVal)
if err != nil {
logger.Warningf("parse timestamp string failed: %v, value: %v", err, tsVal)
continue
}
default:
logger.Warningf("unexpected timestamp type: %T, value: %v", tsVal, tsVal)
continue
}
@@ -279,7 +303,73 @@ func TimeFormat(src APIResponse, timeFormat string) APIResponse {
return src
}
func (tc *tdengineClient) Query(query interface{}) ([]models.DataResp, error) {
func parseTimeString(ts string) (time.Time, error) {
// 尝试不同的时间格式
formats := []string{
// 标准格式
time.Layout, // "01/02 03:04:05PM '06 -0700"
time.ANSIC, // "Mon Jan _2 15:04:05 2006"
time.UnixDate, // "Mon Jan _2 15:04:05 MST 2006"
time.RubyDate, // "Mon Jan 02 15:04:05 -0700 2006"
time.RFC822, // "02 Jan 06 15:04 MST"
time.RFC822Z, // "02 Jan 06 15:04 -0700"
time.RFC850, // "Monday, 02-Jan-06 15:04:05 MST"
time.RFC1123, // "Mon, 02 Jan 2006 15:04:05 MST"
time.RFC1123Z, // "Mon, 02 Jan 2006 15:04:05 -0700"
time.RFC3339, // "2006-01-02T15:04:05Z07:00"
time.RFC3339Nano, // "2006-01-02T15:04:05.999999999Z07:00"
time.Kitchen, // "3:04PM"
// 实用时间戳格式
time.Stamp, // "Jan _2 15:04:05"
time.StampMilli, // "Jan _2 15:04:05.000"
time.StampMicro, // "Jan _2 15:04:05.000000"
time.StampNano, // "Jan _2 15:04:05.000000000"
time.DateTime, // "2006-01-02 15:04:05"
time.DateOnly, // "2006-01-02"
time.TimeOnly, // "15:04:05"
// 常用自定义格式
"2006-01-02T15:04:05", // 无时区的ISO格式
"2006-01-02T15:04:05.000Z",
"2006-01-02T15:04:05Z",
"2006-01-02 15:04:05.999999999", // 纳秒
"2006-01-02 15:04:05.999999", // 微秒
"2006-01-02 15:04:05.999", // 毫秒
"2006/01/02",
"20060102",
"01/02/2006",
"2006年01月02日",
"2006年01月02日 15:04:05",
}
var lastErr error
for _, format := range formats {
t, err := time.Parse(format, ts)
if err == nil {
return t, nil
}
lastErr = err
}
// 尝试解析 Unix 时间戳
if timestamp, err := strconv.ParseInt(ts, 10, 64); err == nil {
switch len(ts) {
case 10: // 秒
return time.Unix(timestamp, 0), nil
case 13: // 毫秒
return time.Unix(timestamp/1000, (timestamp%1000)*1000000), nil
case 16: // 微秒
return time.Unix(timestamp/1000000, (timestamp%1000000)*1000), nil
case 19: // 纳秒
return time.Unix(timestamp/1000000000, timestamp%1000000000), nil
}
}
return time.Time{}, fmt.Errorf("failed to parse time with any format: %v", lastErr)
}
func (tc *tdengineClient) Query(query interface{}, delay ...int) ([]models.DataResp, error) {
b, err := json.Marshal(query)
if err != nil {
return nil, err
@@ -294,9 +384,14 @@ func (tc *tdengineClient) Query(query interface{}) ([]models.DataResp, error) {
q.Interval = 60
}
delaySec := 0
if len(delay) > 0 {
delaySec = delay[0]
}
if q.From == "" {
// 2023-09-21T05:37:30.000Z format
to := time.Now().Unix()
to := time.Now().Unix() - int64(delaySec)
q.To = time.Unix(to, 0).UTC().Format(time.RFC3339)
from := to - q.Interval
q.From = time.Unix(from, 0).UTC().Format(time.RFC3339)
@@ -368,9 +463,45 @@ func (tc *tdengineClient) GetColumns(database, table string) ([]Column, error) {
return columns, err
}
for _, row := range data.ColumnMeta {
var colType string
switch t := row[1].(type) {
case float64:
// v2版本数字类型映射
switch int(t) {
case 1:
colType = "BOOL"
case 2:
colType = "TINYINT"
case 3:
colType = "SMALLINT"
case 4:
colType = "INT"
case 5:
colType = "BIGINT"
case 6:
colType = "FLOAT"
case 7:
colType = "DOUBLE"
case 8:
colType = "BINARY"
case 9:
colType = "TIMESTAMP"
case 10:
colType = "NCHAR"
default:
colType = "UNKNOWN"
}
case string:
// v3版本直接使用字符串类型
colType = t
default:
logger.Warningf("unexpected column type format: %v", row[1])
colType = "UNKNOWN"
}
column := Column{
Name: row[0].(string),
Type: row[1].(string),
Type: colType,
Size: int(row[2].(float64)),
}
columns = append(columns, column)
@@ -454,9 +585,44 @@ func ConvertToTStData(src APIResponse, key Keys, ref string) ([]models.DataResp,
tsIdx := -1
for colIndex, colData := range src.ColumnMeta {
// 类型参考 https://docs.taosdata.com/taos-sql/data-type/
colName := colData[0].(string)
colType := colData[1].(string)
var colType string
// 处理v2版本数字类型和v3版本字符串类型
switch t := colData[1].(type) {
case float64:
// v2版本数字类型映射
switch int(t) {
case 1:
colType = "BOOL"
case 2:
colType = "TINYINT"
case 3:
colType = "SMALLINT"
case 4:
colType = "INT"
case 5:
colType = "BIGINT"
case 6:
colType = "FLOAT"
case 7:
colType = "DOUBLE"
case 8:
colType = "BINARY"
case 9:
colType = "TIMESTAMP"
case 10:
colType = "NCHAR"
default:
colType = "UNKNOWN"
}
case string:
// v3版本直接使用字符串类型
colType = t
default:
logger.Warningf("unexpected column type format: %v", colData[1])
continue
}
switch colType {
case "TIMESTAMP":
tsIdx = colIndex
@@ -470,7 +636,6 @@ func ConvertToTStData(src APIResponse, key Keys, ref string) ([]models.DataResp,
} else {
metricIdxMap[colName] = colIndex
}
default:
if len(labelMap) > 0 {
if _, ok := labelMap[colName]; !ok {
@@ -505,7 +670,7 @@ func ConvertToTStData(src APIResponse, key Keys, ref string) ([]models.DataResp,
metric[model.MetricNameLabel] = model.LabelValue(metricName)
// transfer 2022-06-29T05:52:16.603Z to unix timestamp
t, err := time.Parse(time.RFC3339, row[tsIdx].(string))
t, err := parseTimeString(row[tsIdx].(string))
if err != nil {
logger.Warningf("parse %v timestamp failed: %v", row, err)
continue