mirror of
https://github.com/ccfos/nightingale.git
synced 2026-03-03 06:29:16 +00:00
Compare commits
13 Commits
update-lis
...
release-16
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
445b6b2e41 | ||
|
|
fcf07df35f | ||
|
|
196e5241bf | ||
|
|
0d7ee0cfd9 | ||
|
|
aed6927d7e | ||
|
|
61b9ebb1e6 | ||
|
|
da27ca06d6 | ||
|
|
776f100eb9 | ||
|
|
ac8443fd66 | ||
|
|
751d2e8601 | ||
|
|
87855cb184 | ||
|
|
9ddeefea9b | ||
|
|
6b566f5a18 |
@@ -92,7 +92,7 @@
|
||||
|
||||
## 交流渠道
|
||||
- 报告Bug,优先推荐提交[夜莺GitHub Issue](https://github.com/ccfos/nightingale/issues/new?assignees=&labels=kind%2Fbug&projects=&template=bug_report.yml)
|
||||
- 推荐完整浏览[夜莺文档站点](https://flashcat.cloud/docs/content/flashcat-monitor/nightingale-v7/introduction/),了解更多信息
|
||||
- 关注[这个公众号](https://gitlink.org.cn/UlricQin)了解更多夜莺动态和知识
|
||||
- 加我微信:`picobyte`(我已关闭好友验证)拉入微信群,备注:`夜莺互助群`
|
||||
|
||||
## 广受关注
|
||||
|
||||
@@ -115,7 +115,9 @@ func Start(alertc aconf.Alert, pushgwc pconf.Pushgw, syncStats *memsto.Stats, al
|
||||
eval.NewScheduler(alertc, externalProcessors, alertRuleCache, targetCache, targetsOfAlertRulesCache,
|
||||
busiGroupCache, alertMuteCache, datasourceCache, promClients, naming, ctx, alertStats)
|
||||
|
||||
dp := dispatch.NewDispatch(alertRuleCache, userCache, userGroupCache, alertSubscribeCache, targetCache, notifyConfigCache, taskTplsCache, notifyRuleCache, notifyChannelCache, messageTemplateCache, alertc.Alerting, ctx, alertStats)
|
||||
eventProcessorCache := memsto.NewEventProcessorCache(ctx, syncStats)
|
||||
|
||||
dp := dispatch.NewDispatch(alertRuleCache, userCache, userGroupCache, alertSubscribeCache, targetCache, notifyConfigCache, taskTplsCache, notifyRuleCache, notifyChannelCache, messageTemplateCache, eventProcessorCache, alertc.Alerting, ctx, alertStats)
|
||||
consumer := dispatch.NewConsumer(alertc.Alerting, ctx, dp, promClients)
|
||||
|
||||
notifyRecordComsumer := sender.NewNotifyRecordConsumer(ctx)
|
||||
|
||||
@@ -15,6 +15,8 @@ import (
|
||||
"github.com/ccfos/nightingale/v6/alert/aconf"
|
||||
"github.com/ccfos/nightingale/v6/alert/astats"
|
||||
"github.com/ccfos/nightingale/v6/alert/common"
|
||||
"github.com/ccfos/nightingale/v6/alert/pipeline"
|
||||
"github.com/ccfos/nightingale/v6/alert/pipeline/processor/relabel"
|
||||
"github.com/ccfos/nightingale/v6/alert/sender"
|
||||
"github.com/ccfos/nightingale/v6/memsto"
|
||||
"github.com/ccfos/nightingale/v6/models"
|
||||
@@ -35,6 +37,7 @@ type Dispatch struct {
|
||||
notifyRuleCache *memsto.NotifyRuleCacheType
|
||||
notifyChannelCache *memsto.NotifyChannelCacheType
|
||||
messageTemplateCache *memsto.MessageTemplateCacheType
|
||||
eventProcessorCache *memsto.EventProcessorCacheType
|
||||
|
||||
alerting aconf.Alerting
|
||||
|
||||
@@ -54,7 +57,7 @@ type Dispatch struct {
|
||||
func NewDispatch(alertRuleCache *memsto.AlertRuleCacheType, userCache *memsto.UserCacheType, userGroupCache *memsto.UserGroupCacheType,
|
||||
alertSubscribeCache *memsto.AlertSubscribeCacheType, targetCache *memsto.TargetCacheType, notifyConfigCache *memsto.NotifyConfigCacheType,
|
||||
taskTplsCache *memsto.TaskTplCache, notifyRuleCache *memsto.NotifyRuleCacheType, notifyChannelCache *memsto.NotifyChannelCacheType,
|
||||
messageTemplateCache *memsto.MessageTemplateCacheType, alerting aconf.Alerting, ctx *ctx.Context, astats *astats.Stats) *Dispatch {
|
||||
messageTemplateCache *memsto.MessageTemplateCacheType, eventProcessorCache *memsto.EventProcessorCacheType, alerting aconf.Alerting, ctx *ctx.Context, astats *astats.Stats) *Dispatch {
|
||||
notify := &Dispatch{
|
||||
alertRuleCache: alertRuleCache,
|
||||
userCache: userCache,
|
||||
@@ -66,6 +69,7 @@ func NewDispatch(alertRuleCache *memsto.AlertRuleCacheType, userCache *memsto.Us
|
||||
notifyRuleCache: notifyRuleCache,
|
||||
notifyChannelCache: notifyChannelCache,
|
||||
messageTemplateCache: messageTemplateCache,
|
||||
eventProcessorCache: eventProcessorCache,
|
||||
|
||||
alerting: alerting,
|
||||
|
||||
@@ -77,6 +81,9 @@ func NewDispatch(alertRuleCache *memsto.AlertRuleCacheType, userCache *memsto.Us
|
||||
ctx: ctx,
|
||||
Astats: astats,
|
||||
}
|
||||
|
||||
relabel.Init()
|
||||
|
||||
return notify
|
||||
}
|
||||
|
||||
@@ -141,11 +148,14 @@ func (e *Dispatch) reloadTpls() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *Dispatch) HandleEventWithNotifyRule(event *models.AlertCurEvent, isSubscribe bool) {
|
||||
func (e *Dispatch) HandleEventWithNotifyRule(eventOrigin *models.AlertCurEvent) {
|
||||
|
||||
if len(event.NotifyRuleIDs) > 0 {
|
||||
for _, notifyRuleId := range event.NotifyRuleIDs {
|
||||
logger.Infof("notify rule ids: %v, event: %+v", notifyRuleId, event)
|
||||
if len(eventOrigin.NotifyRuleIDs) > 0 {
|
||||
for _, notifyRuleId := range eventOrigin.NotifyRuleIDs {
|
||||
// 深拷贝新的 event,避免并发修改 event 冲突
|
||||
eventCopy := eventOrigin.DeepCopy()
|
||||
|
||||
logger.Infof("notify rule ids: %v, event: %+v", notifyRuleId, eventCopy)
|
||||
notifyRule := e.notifyRuleCache.Get(notifyRuleId)
|
||||
if notifyRule == nil {
|
||||
continue
|
||||
@@ -155,33 +165,110 @@ func (e *Dispatch) HandleEventWithNotifyRule(event *models.AlertCurEvent, isSubs
|
||||
continue
|
||||
}
|
||||
|
||||
var processors []pipeline.Processor
|
||||
for _, pipelineConfig := range notifyRule.PipelineConfigs {
|
||||
if !pipelineConfig.Enable {
|
||||
continue
|
||||
}
|
||||
|
||||
eventPipeline := e.eventProcessorCache.Get(pipelineConfig.PipelineId)
|
||||
if eventPipeline == nil {
|
||||
logger.Warningf("notify_id: %d, event:%+v, processor not found", notifyRuleId, eventCopy)
|
||||
continue
|
||||
}
|
||||
|
||||
if !pipelineApplicable(eventPipeline, eventCopy) {
|
||||
logger.Debugf("notify_id: %d, event:%+v, pipeline_id: %d, not applicable", notifyRuleId, eventCopy, pipelineConfig.PipelineId)
|
||||
continue
|
||||
}
|
||||
|
||||
for _, p := range eventPipeline.Processors {
|
||||
processor, err := pipeline.GetProcessorByType(p.Typ, p.Config)
|
||||
if err != nil {
|
||||
logger.Warningf("notify_id: %d, event:%+v, processor:%+v type not found", notifyRuleId, eventCopy, p)
|
||||
continue
|
||||
}
|
||||
processors = append(processors, processor)
|
||||
}
|
||||
}
|
||||
|
||||
for _, processor := range processors {
|
||||
logger.Infof("before processor notify_id: %d, event:%+v, processor:%+v", notifyRuleId, eventCopy, processor)
|
||||
processor.Process(e.ctx, eventCopy)
|
||||
logger.Infof("after processor notify_id: %d, event:%+v, processor:%+v", notifyRuleId, eventCopy, processor)
|
||||
if eventCopy == nil {
|
||||
logger.Warningf("notify_id: %d, event:%+v, processor:%+v, event is nil", notifyRuleId, eventCopy, processor)
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// notify
|
||||
for i := range notifyRule.NotifyConfigs {
|
||||
if !NotifyRuleApplicable(¬ifyRule.NotifyConfigs[i], event) {
|
||||
if !NotifyRuleApplicable(¬ifyRule.NotifyConfigs[i], eventCopy) {
|
||||
continue
|
||||
}
|
||||
notifyChannel := e.notifyChannelCache.Get(notifyRule.NotifyConfigs[i].ChannelID)
|
||||
messageTemplate := e.messageTemplateCache.Get(notifyRule.NotifyConfigs[i].TemplateID)
|
||||
if notifyChannel == nil {
|
||||
sender.NotifyRecord(e.ctx, []*models.AlertCurEvent{event}, notifyRuleId, fmt.Sprintf("notify_channel_id:%d", notifyRule.NotifyConfigs[i].ChannelID), "", "", errors.New("notify_channel not found"))
|
||||
logger.Warningf("notify_id: %d, event:%+v, channel_id:%d, template_id: %d, notify_channel not found", notifyRuleId, event, notifyRule.NotifyConfigs[i].ChannelID, notifyRule.NotifyConfigs[i].TemplateID)
|
||||
sender.NotifyRecord(e.ctx, []*models.AlertCurEvent{eventCopy}, notifyRuleId, fmt.Sprintf("notify_channel_id:%d", notifyRule.NotifyConfigs[i].ChannelID), "", "", errors.New("notify_channel not found"))
|
||||
logger.Warningf("notify_id: %d, event:%+v, channel_id:%d, template_id: %d, notify_channel not found", notifyRuleId, eventCopy, notifyRule.NotifyConfigs[i].ChannelID, notifyRule.NotifyConfigs[i].TemplateID)
|
||||
continue
|
||||
}
|
||||
|
||||
if notifyChannel.RequestType != "flashduty" && messageTemplate == nil {
|
||||
logger.Warningf("notify_id: %d, channel_name: %v, event:%+v, template_id: %d, message_template not found", notifyRuleId, notifyChannel.Ident, event, notifyRule.NotifyConfigs[i].TemplateID)
|
||||
sender.NotifyRecord(e.ctx, []*models.AlertCurEvent{event}, notifyRuleId, notifyChannel.Name, "", "", errors.New("message_template not found"))
|
||||
logger.Warningf("notify_id: %d, channel_name: %v, event:%+v, template_id: %d, message_template not found", notifyRuleId, notifyChannel.Ident, eventCopy, notifyRule.NotifyConfigs[i].TemplateID)
|
||||
sender.NotifyRecord(e.ctx, []*models.AlertCurEvent{eventCopy}, notifyRuleId, notifyChannel.Name, "", "", errors.New("message_template not found"))
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
// todo go send
|
||||
// todo 聚合 event
|
||||
go e.sendV2([]*models.AlertCurEvent{event}, notifyRuleId, ¬ifyRule.NotifyConfigs[i], notifyChannel, messageTemplate)
|
||||
go e.sendV2([]*models.AlertCurEvent{eventCopy}, notifyRuleId, ¬ifyRule.NotifyConfigs[i], notifyChannel, messageTemplate)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func pipelineApplicable(pipeline *models.EventPipeline, event *models.AlertCurEvent) bool {
|
||||
if pipeline == nil {
|
||||
return true
|
||||
}
|
||||
|
||||
if !pipeline.FilterEnable {
|
||||
return true
|
||||
}
|
||||
|
||||
tagMatch := true
|
||||
if len(pipeline.LabelFilters) > 0 {
|
||||
for i := range pipeline.LabelFilters {
|
||||
if pipeline.LabelFilters[i].Func == "" {
|
||||
pipeline.LabelFilters[i].Func = pipeline.LabelFilters[i].Op
|
||||
}
|
||||
}
|
||||
|
||||
tagFilters, err := models.ParseTagFilter(pipeline.LabelFilters)
|
||||
if err != nil {
|
||||
logger.Errorf("pipeline applicable failed to parse tag filter: %v event:%+v pipeline:%+v", err, event, pipeline)
|
||||
return false
|
||||
}
|
||||
tagMatch = common.MatchTags(event.TagsMap, tagFilters)
|
||||
}
|
||||
|
||||
attributesMatch := true
|
||||
if len(pipeline.AttrFilters) > 0 {
|
||||
tagFilters, err := models.ParseTagFilter(pipeline.AttrFilters)
|
||||
if err != nil {
|
||||
logger.Errorf("pipeline applicable failed to parse tag filter: %v event:%+v pipeline:%+v err:%v", tagFilters, event, pipeline, err)
|
||||
return false
|
||||
}
|
||||
|
||||
attributesMatch = common.MatchTags(event.JsonTagsAndValue(), tagFilters)
|
||||
}
|
||||
|
||||
return tagMatch && attributesMatch
|
||||
}
|
||||
|
||||
func NotifyRuleApplicable(notifyConfig *models.NotifyConfig, event *models.AlertCurEvent) bool {
|
||||
tm := time.Unix(event.TriggerTime, 0)
|
||||
triggerTime := tm.Format("15:04")
|
||||
@@ -359,6 +446,10 @@ func (e *Dispatch) sendV2(events []*models.AlertCurEvent, notifyRuleId int64, no
|
||||
|
||||
switch notifyChannel.RequestType {
|
||||
case "flashduty":
|
||||
if len(flashDutyChannelIDs) == 0 {
|
||||
flashDutyChannelIDs = []int64{0} // 如果 flashduty 通道没有配置,则使用 0, 给 SendFlashDuty 判断使用, 不给 flashduty 传 channel_id 参数
|
||||
}
|
||||
|
||||
for i := range flashDutyChannelIDs {
|
||||
respBody, err := notifyChannel.SendFlashDuty(events, flashDutyChannelIDs[i], e.notifyChannelCache.GetHttpClient(notifyChannel.ID))
|
||||
logger.Infof("notify_id: %d, channel_name: %v, event:%+v, IntegrationUrl: %v dutychannel_id: %v, respBody: %v, err: %v", notifyRuleId, notifyChannel.Name, events[0], notifyChannel.RequestConfig.FlashDutyRequestConfig.IntegrationUrl, flashDutyChannelIDs[i], respBody, err)
|
||||
@@ -448,8 +539,7 @@ func (e *Dispatch) HandleEventNotify(event *models.AlertCurEvent, isSubscribe bo
|
||||
notifyTarget.AndMerge(handler(rule, event, notifyTarget, e))
|
||||
}
|
||||
|
||||
// 处理事件发送,这里用一个goroutine处理一个event的所有发送事件
|
||||
go e.HandleEventWithNotifyRule(event, isSubscribe)
|
||||
go e.HandleEventWithNotifyRule(event)
|
||||
go e.Send(rule, event, notifyTarget, isSubscribe)
|
||||
|
||||
// 如果是不是订阅规则出现的event, 则需要处理订阅规则的event
|
||||
@@ -646,6 +736,11 @@ func (e *Dispatch) HandleIbex(rule *models.AlertRule, event *models.AlertCurEven
|
||||
}
|
||||
json.Unmarshal([]byte(rule.RuleConfig), &ruleConfig)
|
||||
|
||||
if event.IsRecovered {
|
||||
// 恢复事件不需要走故障自愈的逻辑
|
||||
return
|
||||
}
|
||||
|
||||
for _, t := range ruleConfig.TaskTpls {
|
||||
if t.TplId == 0 {
|
||||
continue
|
||||
|
||||
12
alert/pipeline/pipeline.go
Normal file
12
alert/pipeline/pipeline.go
Normal file
@@ -0,0 +1,12 @@
|
||||
package pipeline
|
||||
|
||||
import (
|
||||
"github.com/ccfos/nightingale/v6/models"
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
)
|
||||
|
||||
func Pipeline(ctx *ctx.Context, event *models.AlertCurEvent, processors []Processor) {
|
||||
for _, processor := range processors {
|
||||
processor.Process(ctx, event)
|
||||
}
|
||||
}
|
||||
52
alert/pipeline/processor.go
Normal file
52
alert/pipeline/processor.go
Normal file
@@ -0,0 +1,52 @@
|
||||
package pipeline
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/models"
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
)
|
||||
|
||||
// Processor 是处理器接口,所有处理器类型都需要实现此接口
|
||||
type Processor interface {
|
||||
Init(settings interface{}) (Processor, error) // 初始化配置
|
||||
Process(ctx *ctx.Context, event *models.AlertCurEvent) // 处理告警事件
|
||||
}
|
||||
|
||||
// NewProcessorFn 创建处理器的函数类型
|
||||
type NewProcessorFn func(settings interface{}) (Processor, error)
|
||||
|
||||
// 处理器注册表,存储各种类型处理器的构造函数
|
||||
var processorRegister = map[string]NewProcessorFn{}
|
||||
|
||||
// // ProcessorTypes 存储所有支持的处理器类型
|
||||
// var Processors map[int64]models.Processor
|
||||
|
||||
// func init() {
|
||||
// Processors = make(map[int64]models.Processor)
|
||||
// }
|
||||
|
||||
// RegisterProcessor 注册处理器类型
|
||||
func RegisterProcessor(typ string, p Processor) {
|
||||
if _, found := processorRegister[typ]; found {
|
||||
return
|
||||
}
|
||||
processorRegister[typ] = p.Init
|
||||
}
|
||||
|
||||
// GetProcessorByType 根据类型获取处理器实例
|
||||
func GetProcessorByType(typ string, settings interface{}) (Processor, error) {
|
||||
typ = strings.TrimSpace(typ)
|
||||
fn, found := processorRegister[typ]
|
||||
if !found {
|
||||
return nil, fmt.Errorf("processor type %s not found", typ)
|
||||
}
|
||||
|
||||
processor, err := fn(settings)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return processor, nil
|
||||
}
|
||||
124
alert/pipeline/processor/relabel/relabel.go
Normal file
124
alert/pipeline/processor/relabel/relabel.go
Normal file
@@ -0,0 +1,124 @@
|
||||
package relabel
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"regexp"
|
||||
"strings"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/alert/pipeline"
|
||||
"github.com/ccfos/nightingale/v6/models"
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
"github.com/ccfos/nightingale/v6/pushgw/pconf"
|
||||
"github.com/ccfos/nightingale/v6/pushgw/writer"
|
||||
|
||||
"github.com/prometheus/common/model"
|
||||
"github.com/prometheus/prometheus/prompb"
|
||||
)
|
||||
|
||||
// RelabelConfig
|
||||
type RelabelConfig struct {
|
||||
SourceLabels []string `json:"source_labels"`
|
||||
Separator string `json:"separator"`
|
||||
Regex string `json:"regex"`
|
||||
RegexCompiled *regexp.Regexp
|
||||
If string `json:"if"`
|
||||
IfRegex *regexp.Regexp
|
||||
Modulus uint64 `json:"modulus"`
|
||||
TargetLabel string `json:"target_label"`
|
||||
Replacement string `json:"replacement"`
|
||||
Action string `json:"action"`
|
||||
}
|
||||
|
||||
func Init() {
|
||||
}
|
||||
|
||||
func init() {
|
||||
pipeline.RegisterProcessor("relabel", &RelabelConfig{})
|
||||
}
|
||||
|
||||
func (r *RelabelConfig) Init(settings interface{}) (pipeline.Processor, error) {
|
||||
b, err := json.Marshal(settings)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
newProcessor := &RelabelConfig{}
|
||||
err = json.Unmarshal(b, &newProcessor)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return newProcessor, nil
|
||||
}
|
||||
|
||||
const (
|
||||
REPLACE_DOT = "___"
|
||||
)
|
||||
|
||||
func (r *RelabelConfig) Process(ctx *ctx.Context, event *models.AlertCurEvent) {
|
||||
sourceLabels := make([]model.LabelName, len(r.SourceLabels))
|
||||
for i := range r.SourceLabels {
|
||||
sourceLabels[i] = model.LabelName(strings.ReplaceAll(r.SourceLabels[i], ".", REPLACE_DOT))
|
||||
}
|
||||
|
||||
relabelConfigs := []*pconf.RelabelConfig{
|
||||
{
|
||||
SourceLabels: sourceLabels,
|
||||
Separator: r.Separator,
|
||||
Regex: r.Regex,
|
||||
RegexCompiled: r.RegexCompiled,
|
||||
If: r.If,
|
||||
IfRegex: r.IfRegex,
|
||||
Modulus: r.Modulus,
|
||||
TargetLabel: r.TargetLabel,
|
||||
Replacement: r.Replacement,
|
||||
Action: r.Action,
|
||||
},
|
||||
}
|
||||
|
||||
EventRelabel(event, relabelConfigs)
|
||||
}
|
||||
|
||||
func EventRelabel(event *models.AlertCurEvent, relabelConfigs []*pconf.RelabelConfig) {
|
||||
labels := make([]prompb.Label, len(event.TagsJSON))
|
||||
event.OriginalTagsJSON = make([]string, len(event.TagsJSON))
|
||||
for i, tag := range event.TagsJSON {
|
||||
label := strings.SplitN(tag, "=", 2)
|
||||
if len(label) != 2 {
|
||||
continue
|
||||
}
|
||||
event.OriginalTagsJSON[i] = tag
|
||||
|
||||
label[0] = strings.ReplaceAll(string(label[0]), ".", REPLACE_DOT)
|
||||
labels[i] = prompb.Label{Name: label[0], Value: label[1]}
|
||||
}
|
||||
|
||||
for i := 0; i < len(relabelConfigs); i++ {
|
||||
if relabelConfigs[i].Replacement == "" {
|
||||
relabelConfigs[i].Replacement = "$1"
|
||||
}
|
||||
|
||||
if relabelConfigs[i].Separator == "" {
|
||||
relabelConfigs[i].Separator = ";"
|
||||
}
|
||||
|
||||
if relabelConfigs[i].Regex == "" {
|
||||
relabelConfigs[i].Regex = "(.*)"
|
||||
}
|
||||
|
||||
for j := range relabelConfigs[i].SourceLabels {
|
||||
relabelConfigs[i].SourceLabels[j] = model.LabelName(strings.ReplaceAll(string(relabelConfigs[i].SourceLabels[j]), ".", REPLACE_DOT))
|
||||
}
|
||||
}
|
||||
|
||||
gotLabels := writer.Process(labels, relabelConfigs...)
|
||||
event.TagsJSON = make([]string, len(gotLabels))
|
||||
event.TagsMap = make(map[string]string, len(gotLabels))
|
||||
for i, label := range gotLabels {
|
||||
label.Name = strings.ReplaceAll(string(label.Name), REPLACE_DOT, ".")
|
||||
event.TagsJSON[i] = fmt.Sprintf("%s=%s", label.Name, label.Value)
|
||||
event.TagsMap[label.Name] = label.Value
|
||||
}
|
||||
event.Tags = strings.Join(event.TagsJSON, ",,")
|
||||
}
|
||||
@@ -14,14 +14,13 @@ import (
|
||||
"github.com/ccfos/nightingale/v6/alert/common"
|
||||
"github.com/ccfos/nightingale/v6/alert/dispatch"
|
||||
"github.com/ccfos/nightingale/v6/alert/mute"
|
||||
"github.com/ccfos/nightingale/v6/alert/pipeline/processor/relabel"
|
||||
"github.com/ccfos/nightingale/v6/alert/queue"
|
||||
"github.com/ccfos/nightingale/v6/memsto"
|
||||
"github.com/ccfos/nightingale/v6/models"
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
"github.com/ccfos/nightingale/v6/pkg/tplx"
|
||||
"github.com/ccfos/nightingale/v6/pushgw/writer"
|
||||
|
||||
"github.com/prometheus/prometheus/prompb"
|
||||
"github.com/robfig/cron/v3"
|
||||
"github.com/toolkits/pkg/logger"
|
||||
"github.com/toolkits/pkg/str"
|
||||
@@ -61,11 +60,9 @@ type Processor struct {
|
||||
pendingsUseByRecover *AlertCurEventMap
|
||||
inhibit bool
|
||||
|
||||
tagsMap map[string]string
|
||||
tagsArr []string
|
||||
target string
|
||||
targetNote string
|
||||
groupName string
|
||||
tagsMap map[string]string
|
||||
tagsArr []string
|
||||
groupName string
|
||||
|
||||
alertRuleCache *memsto.AlertRuleCacheType
|
||||
TargetCache *memsto.TargetCacheType
|
||||
@@ -196,7 +193,7 @@ func (p *Processor) Handle(anomalyPoints []models.AnomalyPoint, from string, inh
|
||||
|
||||
func (p *Processor) BuildEvent(anomalyPoint models.AnomalyPoint, from string, now int64, ruleHash string) *models.AlertCurEvent {
|
||||
p.fillTags(anomalyPoint)
|
||||
p.mayHandleIdent()
|
||||
|
||||
hash := Hash(p.rule.Id, p.datasourceId, anomalyPoint)
|
||||
ds := p.datasourceCache.GetById(p.datasourceId)
|
||||
var dsName string
|
||||
@@ -216,8 +213,6 @@ func (p *Processor) BuildEvent(anomalyPoint models.AnomalyPoint, from string, no
|
||||
event.DatasourceId = p.datasourceId
|
||||
event.Cluster = dsName
|
||||
event.Hash = hash
|
||||
event.TargetIdent = p.target
|
||||
event.TargetNote = p.targetNote
|
||||
event.TriggerValue = anomalyPoint.ReadableValue()
|
||||
event.TriggerValues = anomalyPoint.Values
|
||||
event.TriggerValuesJson = models.EventTriggerValues{ValuesWithUnit: anomalyPoint.ValuesUnit}
|
||||
@@ -249,15 +244,6 @@ func (p *Processor) BuildEvent(anomalyPoint models.AnomalyPoint, from string, no
|
||||
logger.Warningf("unmarshal annotations json failed: %v, rule: %d", err, p.rule.Id)
|
||||
}
|
||||
|
||||
if p.target != "" {
|
||||
if pt, exist := p.TargetCache.Get(p.target); exist {
|
||||
pt.GroupNames = p.BusiGroupCache.GetNamesByBusiGroupIds(pt.GroupIds)
|
||||
event.Target = pt
|
||||
} else {
|
||||
logger.Infof("Target[ident: %s] doesn't exist in cache.", p.target)
|
||||
}
|
||||
}
|
||||
|
||||
if event.TriggerValues != "" && strings.Count(event.TriggerValues, "$") > 1 {
|
||||
// TriggerValues 有多个变量,将多个变量都放到 TriggerValue 中
|
||||
event.TriggerValue = event.TriggerValues
|
||||
@@ -271,6 +257,19 @@ func (p *Processor) BuildEvent(anomalyPoint models.AnomalyPoint, from string, no
|
||||
|
||||
// 生成事件之后,立马进程 relabel 处理
|
||||
Relabel(p.rule, event)
|
||||
|
||||
// 放到 Relabel(p.rule, event) 下面,为了处理 relabel 之后,标签里才出现 ident 的情况
|
||||
p.mayHandleIdent(event)
|
||||
|
||||
if event.TargetIdent != "" {
|
||||
if pt, exist := p.TargetCache.Get(event.TargetIdent); exist {
|
||||
pt.GroupNames = p.BusiGroupCache.GetNamesByBusiGroupIds(pt.GroupIds)
|
||||
event.Target = pt
|
||||
} else {
|
||||
logger.Infof("fill event target error, ident: %s doesn't exist in cache.", event.TargetIdent)
|
||||
}
|
||||
}
|
||||
|
||||
return event
|
||||
}
|
||||
|
||||
@@ -279,44 +278,15 @@ func Relabel(rule *models.AlertRule, event *models.AlertCurEvent) {
|
||||
return
|
||||
}
|
||||
|
||||
if len(rule.EventRelabelConfig) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// need to keep the original label
|
||||
event.OriginalTags = event.Tags
|
||||
event.OriginalTagsJSON = make([]string, len(event.TagsJSON))
|
||||
|
||||
labels := make([]prompb.Label, len(event.TagsJSON))
|
||||
for i, tag := range event.TagsJSON {
|
||||
label := strings.SplitN(tag, "=", 2)
|
||||
event.OriginalTagsJSON[i] = tag
|
||||
labels[i] = prompb.Label{Name: label[0], Value: label[1]}
|
||||
if len(rule.EventRelabelConfig) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
for i := 0; i < len(rule.EventRelabelConfig); i++ {
|
||||
if rule.EventRelabelConfig[i].Replacement == "" {
|
||||
rule.EventRelabelConfig[i].Replacement = "$1"
|
||||
}
|
||||
|
||||
if rule.EventRelabelConfig[i].Separator == "" {
|
||||
rule.EventRelabelConfig[i].Separator = ";"
|
||||
}
|
||||
|
||||
if rule.EventRelabelConfig[i].Regex == "" {
|
||||
rule.EventRelabelConfig[i].Regex = "(.*)"
|
||||
}
|
||||
}
|
||||
|
||||
// relabel process
|
||||
relabels := writer.Process(labels, rule.EventRelabelConfig...)
|
||||
event.TagsJSON = make([]string, len(relabels))
|
||||
event.TagsMap = make(map[string]string, len(relabels))
|
||||
for i, label := range relabels {
|
||||
event.TagsJSON[i] = fmt.Sprintf("%s=%s", label.Name, label.Value)
|
||||
event.TagsMap[label.Name] = label.Value
|
||||
}
|
||||
event.Tags = strings.Join(event.TagsJSON, ",,")
|
||||
relabel.EventRelabel(event, rule.EventRelabelConfig)
|
||||
}
|
||||
|
||||
func (p *Processor) HandleRecover(alertingKeys map[string]struct{}, now int64, inhibit bool) {
|
||||
@@ -641,19 +611,19 @@ func (p *Processor) fillTags(anomalyPoint models.AnomalyPoint) {
|
||||
p.tagsArr = labelMapToArr(tagsMap)
|
||||
}
|
||||
|
||||
func (p *Processor) mayHandleIdent() {
|
||||
func (p *Processor) mayHandleIdent(event *models.AlertCurEvent) {
|
||||
// handle ident
|
||||
if ident, has := p.tagsMap["ident"]; has {
|
||||
if ident, has := event.TagsMap["ident"]; has {
|
||||
if target, exists := p.TargetCache.Get(ident); exists {
|
||||
p.target = target.Ident
|
||||
p.targetNote = target.Note
|
||||
event.TargetIdent = target.Ident
|
||||
event.TargetNote = target.Note
|
||||
} else {
|
||||
p.target = ident
|
||||
p.targetNote = ""
|
||||
event.TargetIdent = ident
|
||||
event.TargetNote = ""
|
||||
}
|
||||
} else {
|
||||
p.target = ""
|
||||
p.targetNote = ""
|
||||
event.TargetIdent = ""
|
||||
event.TargetNote = ""
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -30,12 +30,14 @@ type IbexCallBacker struct {
|
||||
|
||||
func (c *IbexCallBacker) CallBack(ctx CallBackContext) {
|
||||
if len(ctx.CallBackURL) == 0 || len(ctx.Events) == 0 {
|
||||
logger.Warningf("event_callback_ibex: url or events is empty, url: %s, events: %+v", ctx.CallBackURL, ctx.Events)
|
||||
return
|
||||
}
|
||||
|
||||
event := ctx.Events[0]
|
||||
|
||||
if event.IsRecovered {
|
||||
logger.Infof("event_callback_ibex: event is recovered, event: %+v", event)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -43,8 +45,9 @@ func (c *IbexCallBacker) CallBack(ctx CallBackContext) {
|
||||
}
|
||||
|
||||
func (c *IbexCallBacker) handleIbex(ctx *ctx.Context, url string, event *models.AlertCurEvent) {
|
||||
logger.Infof("event_callback_ibex: url: %s, event: %+v", url, event)
|
||||
if imodels.DB() == nil && ctx.IsCenter {
|
||||
logger.Warning("event_callback_ibex: db is nil")
|
||||
logger.Warningf("event_callback_ibex: db is nil, event: %+v", event)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -63,17 +66,23 @@ func (c *IbexCallBacker) handleIbex(ctx *ctx.Context, url string, event *models.
|
||||
|
||||
id, err := strconv.ParseInt(idstr, 10, 64)
|
||||
if err != nil {
|
||||
logger.Errorf("event_callback_ibex: failed to parse url: %s", url)
|
||||
logger.Errorf("event_callback_ibex: failed to parse url: %s event: %+v", url, event)
|
||||
return
|
||||
}
|
||||
|
||||
if host == "" {
|
||||
// 用户在callback url中没有传入host,就从event中解析
|
||||
host = event.TargetIdent
|
||||
|
||||
if host == "" {
|
||||
if ident, has := event.TagsMap["ident"]; has {
|
||||
host = ident
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if host == "" {
|
||||
logger.Error("event_callback_ibex: failed to get host")
|
||||
logger.Errorf("event_callback_ibex: failed to get host, id: %d, event: %+v", id, event)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -83,21 +92,23 @@ func (c *IbexCallBacker) handleIbex(ctx *ctx.Context, url string, event *models.
|
||||
func CallIbex(ctx *ctx.Context, id int64, host string,
|
||||
taskTplCache *memsto.TaskTplCache, targetCache *memsto.TargetCacheType,
|
||||
userCache *memsto.UserCacheType, event *models.AlertCurEvent) {
|
||||
logger.Infof("event_callback_ibex: id: %d, host: %s, event: %+v", id, host, event)
|
||||
|
||||
tpl := taskTplCache.Get(id)
|
||||
if tpl == nil {
|
||||
logger.Errorf("event_callback_ibex: no such tpl(%d)", id)
|
||||
logger.Errorf("event_callback_ibex: no such tpl(%d), event: %+v", id, event)
|
||||
return
|
||||
}
|
||||
// check perm
|
||||
// tpl.GroupId - host - account 三元组校验权限
|
||||
can, err := canDoIbex(tpl.UpdateBy, tpl, host, targetCache, userCache)
|
||||
if err != nil {
|
||||
logger.Errorf("event_callback_ibex: check perm fail: %v", err)
|
||||
logger.Errorf("event_callback_ibex: check perm fail: %v, event: %+v", err, event)
|
||||
return
|
||||
}
|
||||
|
||||
if !can {
|
||||
logger.Errorf("event_callback_ibex: user(%s) no permission", tpl.UpdateBy)
|
||||
logger.Errorf("event_callback_ibex: user(%s) no permission, event: %+v", tpl.UpdateBy, event)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -122,7 +133,7 @@ func CallIbex(ctx *ctx.Context, id int64, host string,
|
||||
|
||||
tags, err := json.Marshal(tagsMap)
|
||||
if err != nil {
|
||||
logger.Errorf("event_callback_ibex: failed to marshal tags to json: %v", tagsMap)
|
||||
logger.Errorf("event_callback_ibex: failed to marshal tags to json: %v, event: %+v", tagsMap, event)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -145,7 +156,7 @@ func CallIbex(ctx *ctx.Context, id int64, host string,
|
||||
|
||||
id, err = TaskAdd(in, tpl.UpdateBy, ctx.IsCenter)
|
||||
if err != nil {
|
||||
logger.Errorf("event_callback_ibex: call ibex fail: %v", err)
|
||||
logger.Errorf("event_callback_ibex: call ibex fail: %v, event: %+v", err, event)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -167,7 +178,7 @@ func CallIbex(ctx *ctx.Context, id int64, host string,
|
||||
}
|
||||
|
||||
if err = record.Add(ctx); err != nil {
|
||||
logger.Errorf("event_callback_ibex: persist task_record fail: %v", err)
|
||||
logger.Errorf("event_callback_ibex: persist task_record fail: %v, event: %+v", err, event)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -187,7 +198,7 @@ func canDoIbex(username string, tpl *models.TaskTpl, host string, targetCache *m
|
||||
|
||||
func TaskAdd(f models.TaskForm, authUser string, isCenter bool) (int64, error) {
|
||||
if storage.Cache == nil {
|
||||
logger.Warning("event_callback_ibex: redis cache is nil")
|
||||
logger.Warningf("event_callback_ibex: redis cache is nil, task: %+v", f)
|
||||
return 0, fmt.Errorf("redis cache is nil")
|
||||
}
|
||||
|
||||
|
||||
@@ -142,6 +142,13 @@ ops:
|
||||
cname: View Logs
|
||||
- name: "/log/index-patterns"
|
||||
cname: View Index Patterns
|
||||
- name: "/log/index-patterns/add"
|
||||
cname: Add Index Pattern
|
||||
- name: "/log/index-patterns/put"
|
||||
cname: Modify Index Pattern
|
||||
- name: "/log/index-patterns/del"
|
||||
cname: Delete Index Pattern
|
||||
|
||||
|
||||
- name: alert
|
||||
cname: Alert Rules
|
||||
@@ -323,6 +330,18 @@ ops:
|
||||
- name: "/notification-rules/del"
|
||||
cname: Delete Notify Rules
|
||||
|
||||
- name: event-pipelines
|
||||
cname: Event Pipelines
|
||||
ops:
|
||||
- name: "/event-pipelines"
|
||||
cname: View Event Pipelines
|
||||
- name: "/event-pipelines/add"
|
||||
cname: Add Event Pipeline
|
||||
- name: "/event-pipelines/put"
|
||||
cname: Modify Event Pipeline
|
||||
- name: "/event-pipelines/del"
|
||||
cname: Delete Event Pipeline
|
||||
|
||||
- name: notify-channels
|
||||
cname: Notify Channels
|
||||
ops:
|
||||
|
||||
@@ -481,9 +481,9 @@ func (rt *Router) Config(r *gin.Engine) {
|
||||
|
||||
pages.GET("/es-index-pattern", rt.auth(), rt.esIndexPatternGet)
|
||||
pages.GET("/es-index-pattern-list", rt.auth(), rt.esIndexPatternGetList)
|
||||
pages.POST("/es-index-pattern", rt.auth(), rt.admin(), rt.esIndexPatternAdd)
|
||||
pages.PUT("/es-index-pattern", rt.auth(), rt.admin(), rt.esIndexPatternPut)
|
||||
pages.DELETE("/es-index-pattern", rt.auth(), rt.admin(), rt.esIndexPatternDel)
|
||||
pages.POST("/es-index-pattern", rt.auth(), rt.user(), rt.perm("/log/index-patterns/add"), rt.esIndexPatternAdd)
|
||||
pages.PUT("/es-index-pattern", rt.auth(), rt.user(), rt.perm("/log/index-patterns/put"), rt.esIndexPatternPut)
|
||||
pages.DELETE("/es-index-pattern", rt.auth(), rt.user(), rt.perm("/log/index-patterns/del"), rt.esIndexPatternDel)
|
||||
|
||||
pages.GET("/embedded-dashboards", rt.auth(), rt.user(), rt.perm("/embedded-dashboards"), rt.embeddedDashboardsGet)
|
||||
pages.PUT("/embedded-dashboards", rt.auth(), rt.user(), rt.perm("/embedded-dashboards/put"), rt.embeddedDashboardsPut)
|
||||
@@ -527,6 +527,16 @@ func (rt *Router) Config(r *gin.Engine) {
|
||||
pages.GET("/notify-rules", rt.auth(), rt.user(), rt.perm("/notification-rules"), rt.notifyRulesGet)
|
||||
pages.POST("/notify-rule/test", rt.auth(), rt.user(), rt.perm("/notification-rules"), rt.notifyTest)
|
||||
pages.GET("/notify-rule/custom-params", rt.auth(), rt.user(), rt.perm("/notification-rules"), rt.notifyRuleCustomParamsGet)
|
||||
pages.POST("/notify-rule/event-pipelines-tryrun", rt.auth(), rt.user(), rt.perm("/notification-rules/add"), rt.tryRunEventProcessorByNotifyRule)
|
||||
|
||||
// 事件Pipeline相关路由
|
||||
pages.GET("/event-pipelines", rt.auth(), rt.user(), rt.perm("/event-pipelines"), rt.eventPipelinesList)
|
||||
pages.POST("/event-pipeline", rt.auth(), rt.user(), rt.perm("/event-pipelines/add"), rt.addEventPipeline)
|
||||
pages.PUT("/event-pipeline", rt.auth(), rt.user(), rt.perm("/event-pipelines/put"), rt.updateEventPipeline)
|
||||
pages.GET("/event-pipeline/:id", rt.auth(), rt.user(), rt.perm("/event-pipelines"), rt.getEventPipeline)
|
||||
pages.DELETE("/event-pipelines", rt.auth(), rt.user(), rt.perm("/event-pipelines/del"), rt.deleteEventPipelines)
|
||||
pages.POST("/event-pipeline-tryrun", rt.auth(), rt.user(), rt.perm("/event-pipelines"), rt.tryRunEventPipeline)
|
||||
pages.POST("/event-processor-tryrun", rt.auth(), rt.user(), rt.perm("/event-pipelines"), rt.tryRunEventProcessor)
|
||||
|
||||
pages.POST("/notify-channel-configs", rt.auth(), rt.user(), rt.perm("/notification-channels/add"), rt.notifyChannelsAdd)
|
||||
pages.DELETE("/notify-channel-configs", rt.auth(), rt.user(), rt.perm("/notification-channels/del"), rt.notifyChannelsDel)
|
||||
@@ -647,6 +657,7 @@ func (rt *Router) Config(r *gin.Engine) {
|
||||
|
||||
service.GET("/message-templates", rt.messageTemplateGets)
|
||||
|
||||
service.GET("/event-pipelines", rt.eventPipelinesListByService)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -123,7 +123,7 @@ func (rt *Router) datasourceUpsert(c *gin.Context) {
|
||||
}
|
||||
err = req.Add(rt.Ctx)
|
||||
} else {
|
||||
err = req.Update(rt.Ctx, "name", "description", "cluster_name", "settings", "http", "auth", "updated_by", "updated_at", "is_default")
|
||||
err = req.Update(rt.Ctx, "name", "identifier", "description", "cluster_name", "settings", "http", "auth", "updated_by", "updated_at", "is_default")
|
||||
}
|
||||
|
||||
Render(c, nil, err)
|
||||
|
||||
217
center/router/router_event_pipeline.go
Normal file
217
center/router/router_event_pipeline.go
Normal file
@@ -0,0 +1,217 @@
|
||||
package router
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/alert/pipeline"
|
||||
"github.com/ccfos/nightingale/v6/models"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/toolkits/pkg/ginx"
|
||||
)
|
||||
|
||||
// 获取事件Pipeline列表
|
||||
func (rt *Router) eventPipelinesList(c *gin.Context) {
|
||||
me := c.MustGet("user").(*models.User)
|
||||
pipelines, err := models.ListEventPipelines(rt.Ctx)
|
||||
ginx.Dangerous(err)
|
||||
|
||||
allTids := make([]int64, 0)
|
||||
for _, pipeline := range pipelines {
|
||||
allTids = append(allTids, pipeline.TeamIds...)
|
||||
}
|
||||
ugMap, err := models.UserGroupIdAndNameMap(rt.Ctx, allTids)
|
||||
ginx.Dangerous(err)
|
||||
for _, pipeline := range pipelines {
|
||||
for _, tid := range pipeline.TeamIds {
|
||||
pipeline.TeamNames = append(pipeline.TeamNames, ugMap[tid])
|
||||
}
|
||||
}
|
||||
|
||||
gids, err := models.MyGroupIdsMap(rt.Ctx, me.Id)
|
||||
ginx.Dangerous(err)
|
||||
|
||||
if me.IsAdmin() {
|
||||
ginx.NewRender(c).Data(pipelines, nil)
|
||||
return
|
||||
}
|
||||
|
||||
res := make([]*models.EventPipeline, 0)
|
||||
for _, pipeline := range pipelines {
|
||||
for _, tid := range pipeline.TeamIds {
|
||||
if _, ok := gids[tid]; ok {
|
||||
res = append(res, pipeline)
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ginx.NewRender(c).Data(res, nil)
|
||||
}
|
||||
|
||||
// 获取单个事件Pipeline详情
|
||||
func (rt *Router) getEventPipeline(c *gin.Context) {
|
||||
me := c.MustGet("user").(*models.User)
|
||||
id := ginx.UrlParamInt64(c, "id")
|
||||
pipeline, err := models.GetEventPipeline(rt.Ctx, id)
|
||||
ginx.Dangerous(err)
|
||||
ginx.Dangerous(me.CheckGroupPermission(rt.Ctx, pipeline.TeamIds))
|
||||
|
||||
err = pipeline.FillTeamNames(rt.Ctx)
|
||||
ginx.Dangerous(err)
|
||||
|
||||
ginx.NewRender(c).Data(pipeline, nil)
|
||||
}
|
||||
|
||||
// 创建事件Pipeline
|
||||
func (rt *Router) addEventPipeline(c *gin.Context) {
|
||||
var pipeline models.EventPipeline
|
||||
ginx.BindJSON(c, &pipeline)
|
||||
|
||||
user := c.MustGet("user").(*models.User)
|
||||
now := time.Now().Unix()
|
||||
pipeline.CreateBy = user.Username
|
||||
pipeline.CreateAt = now
|
||||
pipeline.UpdateAt = now
|
||||
pipeline.UpdateBy = user.Username
|
||||
|
||||
err := pipeline.Verify()
|
||||
if err != nil {
|
||||
ginx.Bomb(http.StatusBadRequest, err.Error())
|
||||
}
|
||||
|
||||
ginx.Dangerous(user.CheckGroupPermission(rt.Ctx, pipeline.TeamIds))
|
||||
err = models.CreateEventPipeline(rt.Ctx, &pipeline)
|
||||
ginx.NewRender(c).Message(err)
|
||||
}
|
||||
|
||||
// 更新事件Pipeline
|
||||
func (rt *Router) updateEventPipeline(c *gin.Context) {
|
||||
var f models.EventPipeline
|
||||
ginx.BindJSON(c, &f)
|
||||
|
||||
me := c.MustGet("user").(*models.User)
|
||||
f.UpdateBy = me.Username
|
||||
f.UpdateAt = time.Now().Unix()
|
||||
|
||||
pipeline, err := models.GetEventPipeline(rt.Ctx, f.ID)
|
||||
if err != nil {
|
||||
ginx.Bomb(http.StatusNotFound, "No such event pipeline")
|
||||
}
|
||||
ginx.Dangerous(me.CheckGroupPermission(rt.Ctx, pipeline.TeamIds))
|
||||
|
||||
ginx.NewRender(c).Message(pipeline.Update(rt.Ctx, &f))
|
||||
}
|
||||
|
||||
// 删除事件Pipeline
|
||||
func (rt *Router) deleteEventPipelines(c *gin.Context) {
|
||||
var f struct {
|
||||
Ids []int64 `json:"ids"`
|
||||
}
|
||||
ginx.BindJSON(c, &f)
|
||||
|
||||
if len(f.Ids) == 0 {
|
||||
ginx.Bomb(http.StatusBadRequest, "ids required")
|
||||
}
|
||||
|
||||
me := c.MustGet("user").(*models.User)
|
||||
for _, id := range f.Ids {
|
||||
pipeline, err := models.GetEventPipeline(rt.Ctx, id)
|
||||
ginx.Dangerous(err)
|
||||
ginx.Dangerous(me.CheckGroupPermission(rt.Ctx, pipeline.TeamIds))
|
||||
}
|
||||
|
||||
err := models.DeleteEventPipelines(rt.Ctx, f.Ids)
|
||||
ginx.NewRender(c).Message(err)
|
||||
}
|
||||
|
||||
// 测试事件Pipeline
|
||||
func (rt *Router) tryRunEventPipeline(c *gin.Context) {
|
||||
var f struct {
|
||||
EventId int64 `json:"event_id"`
|
||||
PipelineConfig models.EventPipeline `json:"pipeline_config"`
|
||||
}
|
||||
ginx.BindJSON(c, &f)
|
||||
|
||||
hisEvent, err := models.AlertHisEventGetById(rt.Ctx, f.EventId)
|
||||
if err != nil || hisEvent == nil {
|
||||
ginx.Bomb(http.StatusBadRequest, "event not found")
|
||||
}
|
||||
event := hisEvent.ToCur()
|
||||
|
||||
for _, p := range f.PipelineConfig.Processors {
|
||||
processor, err := pipeline.GetProcessorByType(p.Typ, p.Config)
|
||||
if err != nil {
|
||||
ginx.Bomb(http.StatusBadRequest, "processor %+v type not found, error: %s", p, err.Error())
|
||||
}
|
||||
processor.Process(rt.Ctx, event)
|
||||
}
|
||||
|
||||
ginx.NewRender(c).Data(event, nil)
|
||||
}
|
||||
|
||||
// 测试事件处理器
|
||||
func (rt *Router) tryRunEventProcessor(c *gin.Context) {
|
||||
var f struct {
|
||||
EventId int64 `json:"event_id"`
|
||||
ProcessorConfig models.Processor `json:"processor_config"`
|
||||
}
|
||||
ginx.BindJSON(c, &f)
|
||||
|
||||
hisEvent, err := models.AlertHisEventGetById(rt.Ctx, f.EventId)
|
||||
if err != nil || hisEvent == nil {
|
||||
ginx.Bomb(http.StatusBadRequest, "event not found")
|
||||
}
|
||||
event := hisEvent.ToCur()
|
||||
|
||||
processor, err := pipeline.GetProcessorByType(f.ProcessorConfig.Typ, f.ProcessorConfig.Config)
|
||||
if err != nil {
|
||||
ginx.Bomb(http.StatusBadRequest, "processor type: %s, error: %s", f.ProcessorConfig.Typ, err.Error())
|
||||
}
|
||||
processor.Process(rt.Ctx, event)
|
||||
|
||||
ginx.NewRender(c).Data(event, nil)
|
||||
}
|
||||
|
||||
func (rt *Router) tryRunEventProcessorByNotifyRule(c *gin.Context) {
|
||||
var f struct {
|
||||
EventId int64 `json:"event_id"`
|
||||
PipelineConfigs []models.PipelineConfig `json:"pipeline_configs"`
|
||||
}
|
||||
ginx.BindJSON(c, &f)
|
||||
|
||||
hisEvent, err := models.AlertHisEventGetById(rt.Ctx, f.EventId)
|
||||
if err != nil || hisEvent == nil {
|
||||
ginx.Bomb(http.StatusBadRequest, "event not found")
|
||||
}
|
||||
event := hisEvent.ToCur()
|
||||
|
||||
pids := make([]int64, 0)
|
||||
for _, pc := range f.PipelineConfigs {
|
||||
if pc.Enable {
|
||||
pids = append(pids, pc.PipelineId)
|
||||
}
|
||||
}
|
||||
|
||||
pipelines, err := models.GetEventPipelinesByIds(rt.Ctx, pids)
|
||||
if err != nil {
|
||||
ginx.Bomb(http.StatusBadRequest, "processors not found")
|
||||
}
|
||||
|
||||
for _, pl := range pipelines {
|
||||
for _, p := range pl.Processors {
|
||||
processor, err := pipeline.GetProcessorByType(p.Typ, p.Config)
|
||||
if err != nil {
|
||||
ginx.Bomb(http.StatusBadRequest, "processor %+v type not found", p)
|
||||
}
|
||||
processor.Process(rt.Ctx, event)
|
||||
}
|
||||
}
|
||||
|
||||
ginx.NewRender(c).Data(event, nil)
|
||||
}
|
||||
|
||||
func (rt *Router) eventPipelinesListByService(c *gin.Context) {
|
||||
pipelines, err := models.ListEventPipelines(rt.Ctx)
|
||||
ginx.NewRender(c).Data(pipelines, err)
|
||||
}
|
||||
@@ -40,6 +40,10 @@ func (rt *Router) statistic(c *gin.Context) {
|
||||
model = models.NotifyRule{}
|
||||
case "notify_channel":
|
||||
model = models.NotifyChannel{}
|
||||
case "event_pipeline":
|
||||
statistics, err = models.EventPipelineStatistics(rt.Ctx)
|
||||
ginx.NewRender(c).Data(statistics, err)
|
||||
return
|
||||
case "datasource":
|
||||
// datasource update_at is different from others
|
||||
statistics, err = models.DatasourceStatistics(rt.Ctx)
|
||||
|
||||
@@ -3,6 +3,7 @@ package router
|
||||
import (
|
||||
"fmt"
|
||||
"sort"
|
||||
"sync"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/dscache"
|
||||
"github.com/ccfos/nightingale/v6/models"
|
||||
@@ -38,71 +39,116 @@ type LogResp struct {
|
||||
List []interface{} `json:"list"`
|
||||
}
|
||||
|
||||
func (rt *Router) QueryLogBatch(c *gin.Context) {
|
||||
var f QueryFrom
|
||||
ginx.BindJSON(c, &f)
|
||||
|
||||
func QueryLogBatchConcurrently(anonymousAccess bool, ctx *gin.Context, f QueryFrom) (LogResp, error) {
|
||||
var resp LogResp
|
||||
var errMsg string
|
||||
var mu sync.Mutex
|
||||
var wg sync.WaitGroup
|
||||
var errs []error
|
||||
|
||||
for _, q := range f.Queries {
|
||||
if !rt.Center.AnonymousAccess.PromQuerier && !CheckDsPerm(c, q.Did, q.DsCate, q) {
|
||||
ginx.Bomb(200, "no permission")
|
||||
if !anonymousAccess && !CheckDsPerm(ctx, q.Did, q.DsCate, q) {
|
||||
return LogResp{}, fmt.Errorf("no permission")
|
||||
}
|
||||
|
||||
plug, exists := dscache.DsCache.Get(q.DsCate, q.Did)
|
||||
if !exists {
|
||||
logger.Warningf("cluster:%d not exists query:%+v", q.Did, q)
|
||||
ginx.Bomb(200, "cluster not exists")
|
||||
return LogResp{}, fmt.Errorf("cluster not exists")
|
||||
}
|
||||
|
||||
data, total, err := plug.QueryLog(c.Request.Context(), q.Query)
|
||||
if err != nil {
|
||||
errMsg += fmt.Sprintf("query data error: %v query:%v\n ", err, q)
|
||||
logger.Warningf("query data error: %v query:%v", err, q)
|
||||
continue
|
||||
}
|
||||
wg.Add(1)
|
||||
go func(query Query) {
|
||||
defer wg.Done()
|
||||
|
||||
m := make(map[string]interface{})
|
||||
m["ref"] = q.Ref
|
||||
m["ds_id"] = q.Did
|
||||
m["ds_cate"] = q.DsCate
|
||||
m["data"] = data
|
||||
resp.List = append(resp.List, m)
|
||||
resp.Total += total
|
||||
data, total, err := plug.QueryLog(ctx.Request.Context(), query.Query)
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
if err != nil {
|
||||
errMsg := fmt.Sprintf("query data error: %v query:%v\n ", err, query)
|
||||
logger.Warningf(errMsg)
|
||||
errs = append(errs, err)
|
||||
return
|
||||
}
|
||||
|
||||
m := make(map[string]interface{})
|
||||
m["ref"] = query.Ref
|
||||
m["ds_id"] = query.Did
|
||||
m["ds_cate"] = query.DsCate
|
||||
m["data"] = data
|
||||
|
||||
resp.List = append(resp.List, m)
|
||||
resp.Total += total
|
||||
}(q)
|
||||
}
|
||||
|
||||
if errMsg != "" || len(resp.List) == 0 {
|
||||
ginx.Bomb(200, errMsg)
|
||||
wg.Wait()
|
||||
|
||||
if len(errs) > 0 {
|
||||
return LogResp{}, errs[0]
|
||||
}
|
||||
|
||||
if len(resp.List) == 0 {
|
||||
return LogResp{}, fmt.Errorf("no data")
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (rt *Router) QueryLogBatch(c *gin.Context) {
|
||||
var f QueryFrom
|
||||
ginx.BindJSON(c, &f)
|
||||
|
||||
resp, err := QueryLogBatchConcurrently(rt.Center.AnonymousAccess.PromQuerier, c, f)
|
||||
if err != nil {
|
||||
ginx.Bomb(200, "err:%v", err)
|
||||
}
|
||||
|
||||
ginx.NewRender(c).Data(resp, nil)
|
||||
}
|
||||
|
||||
func (rt *Router) QueryData(c *gin.Context) {
|
||||
var f models.QueryParam
|
||||
ginx.BindJSON(c, &f)
|
||||
|
||||
func QueryDataConcurrently(anonymousAccess bool, ctx *gin.Context, f models.QueryParam) ([]models.DataResp, error) {
|
||||
var resp []models.DataResp
|
||||
var err error
|
||||
var mu sync.Mutex
|
||||
var wg sync.WaitGroup
|
||||
var errs []error
|
||||
|
||||
for _, q := range f.Querys {
|
||||
if !rt.Center.AnonymousAccess.PromQuerier && !CheckDsPerm(c, f.DatasourceId, f.Cate, q) {
|
||||
ginx.Bomb(403, "no permission")
|
||||
if !anonymousAccess && !CheckDsPerm(ctx, f.DatasourceId, f.Cate, q) {
|
||||
return nil, fmt.Errorf("no permission")
|
||||
}
|
||||
|
||||
plug, exists := dscache.DsCache.Get(f.Cate, f.DatasourceId)
|
||||
if !exists {
|
||||
logger.Warningf("cluster:%d not exists", f.DatasourceId)
|
||||
ginx.Bomb(200, "cluster not exists")
|
||||
return nil, fmt.Errorf("cluster not exists")
|
||||
}
|
||||
var datas []models.DataResp
|
||||
datas, err = plug.QueryData(c.Request.Context(), q)
|
||||
if err != nil {
|
||||
logger.Warningf("query data error: req:%+v err:%v", q, err)
|
||||
ginx.Bomb(200, "err:%v", err)
|
||||
}
|
||||
logger.Debugf("query data: req:%+v resp:%+v", q, datas)
|
||||
resp = append(resp, datas...)
|
||||
|
||||
wg.Add(1)
|
||||
go func(query interface{}) {
|
||||
defer wg.Done()
|
||||
|
||||
datas, err := plug.QueryData(ctx.Request.Context(), query)
|
||||
if err != nil {
|
||||
logger.Warningf("query data error: req:%+v err:%v", query, err)
|
||||
mu.Lock()
|
||||
errs = append(errs, err)
|
||||
mu.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
logger.Debugf("query data: req:%+v resp:%+v", query, datas)
|
||||
mu.Lock()
|
||||
resp = append(resp, datas...)
|
||||
mu.Unlock()
|
||||
}(q)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
||||
if len(errs) > 0 {
|
||||
return nil, errs[0]
|
||||
}
|
||||
|
||||
// 面向API的统一处理
|
||||
// 按照 .Metric 排序
|
||||
// 确保仪表盘中相同图例的曲线颜色相同
|
||||
@@ -115,41 +161,80 @@ func (rt *Router) QueryData(c *gin.Context) {
|
||||
})
|
||||
}
|
||||
|
||||
ginx.NewRender(c).Data(resp, err)
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (rt *Router) QueryData(c *gin.Context) {
|
||||
var f models.QueryParam
|
||||
ginx.BindJSON(c, &f)
|
||||
|
||||
resp, err := QueryDataConcurrently(rt.Center.AnonymousAccess.PromQuerier, c, f)
|
||||
if err != nil {
|
||||
ginx.Bomb(200, "err:%v", err)
|
||||
}
|
||||
|
||||
ginx.NewRender(c).Data(resp, nil)
|
||||
}
|
||||
|
||||
// QueryLogConcurrently 并发查询日志
|
||||
func QueryLogConcurrently(anonymousAccess bool, ctx *gin.Context, f models.QueryParam) (LogResp, error) {
|
||||
var resp LogResp
|
||||
var mu sync.Mutex
|
||||
var wg sync.WaitGroup
|
||||
var errs []error
|
||||
|
||||
for _, q := range f.Querys {
|
||||
if !anonymousAccess && !CheckDsPerm(ctx, f.DatasourceId, f.Cate, q) {
|
||||
return LogResp{}, fmt.Errorf("no permission")
|
||||
}
|
||||
|
||||
plug, exists := dscache.DsCache.Get(f.Cate, f.DatasourceId)
|
||||
if !exists {
|
||||
logger.Warningf("cluster:%d not exists query:%+v", f.DatasourceId, f)
|
||||
return LogResp{}, fmt.Errorf("cluster not exists")
|
||||
}
|
||||
|
||||
wg.Add(1)
|
||||
go func(query interface{}) {
|
||||
defer wg.Done()
|
||||
|
||||
data, total, err := plug.QueryLog(ctx.Request.Context(), query)
|
||||
logger.Debugf("query log: req:%+v resp:%+v", query, data)
|
||||
if err != nil {
|
||||
errMsg := fmt.Sprintf("query data error: %v query:%v\n ", err, query)
|
||||
logger.Warningf(errMsg)
|
||||
mu.Lock()
|
||||
errs = append(errs, err)
|
||||
mu.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
mu.Lock()
|
||||
resp.List = append(resp.List, data...)
|
||||
resp.Total += total
|
||||
mu.Unlock()
|
||||
}(q)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
||||
if len(errs) > 0 {
|
||||
return LogResp{}, errs[0]
|
||||
}
|
||||
|
||||
if len(resp.List) == 0 {
|
||||
return LogResp{}, fmt.Errorf("no data")
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (rt *Router) QueryLogV2(c *gin.Context) {
|
||||
var f models.QueryParam
|
||||
ginx.BindJSON(c, &f)
|
||||
|
||||
var resp LogResp
|
||||
var errMsg string
|
||||
for _, q := range f.Querys {
|
||||
if !rt.Center.AnonymousAccess.PromQuerier && !CheckDsPerm(c, f.DatasourceId, f.Cate, q) {
|
||||
ginx.Bomb(200, "no permission")
|
||||
}
|
||||
|
||||
plug, exists := dscache.DsCache.Get(f.Cate, f.DatasourceId)
|
||||
if !exists {
|
||||
logger.Warningf("cluster:%d not exists query:%+v", f.DatasourceId, f)
|
||||
ginx.Bomb(200, "cluster not exists")
|
||||
}
|
||||
|
||||
data, total, err := plug.QueryLog(c.Request.Context(), q)
|
||||
if err != nil {
|
||||
errMsg += fmt.Sprintf("query data error: %v query:%v\n ", err, q)
|
||||
logger.Warningf("query data error: %v query:%v", err, q)
|
||||
continue
|
||||
}
|
||||
resp.List = append(resp.List, data...)
|
||||
resp.Total += total
|
||||
}
|
||||
|
||||
if errMsg != "" || len(resp.List) == 0 {
|
||||
ginx.Bomb(200, errMsg)
|
||||
}
|
||||
|
||||
ginx.NewRender(c).Data(resp, nil)
|
||||
resp, err := QueryLogConcurrently(rt.Center.AnonymousAccess.PromQuerier, c, f)
|
||||
ginx.NewRender(c).Data(resp, err)
|
||||
}
|
||||
|
||||
func (rt *Router) QueryLog(c *gin.Context) {
|
||||
|
||||
@@ -24,6 +24,7 @@ type Query struct {
|
||||
Index string `json:"index" mapstructure:"index"`
|
||||
IndexPatternId int64 `json:"index_pattern" mapstructure:"index_pattern"`
|
||||
Filter string `json:"filter" mapstructure:"filter"`
|
||||
Offset int64 `json:"offset" mapstructure:"offset"`
|
||||
MetricAggr MetricAggr `json:"value" mapstructure:"value"`
|
||||
GroupBy []GroupBy `json:"group_by" mapstructure:"group_by"`
|
||||
DateField string `json:"date_field" mapstructure:"date_field"`
|
||||
@@ -372,6 +373,11 @@ func QueryData(ctx context.Context, queryParam interface{}, cliTimeout int64, ve
|
||||
start = start - delay
|
||||
}
|
||||
|
||||
if param.Offset > 0 {
|
||||
end = end - param.Offset
|
||||
start = start - param.Offset
|
||||
}
|
||||
|
||||
q.Gte(time.Unix(start, 0).UnixMilli())
|
||||
q.Lte(time.Unix(end, 0).UnixMilli())
|
||||
q.Format("epoch_millis")
|
||||
|
||||
@@ -537,7 +537,7 @@ CREATE TABLE `builtin_components` (
|
||||
`updated_by` varchar(191) NOT NULL DEFAULT '' COMMENT '''updater''',
|
||||
`disabled` int NOT NULL DEFAULT 0 COMMENT '''is disabled or not''',
|
||||
PRIMARY KEY (`id`),
|
||||
UNIQUE KEY `idx_ident` (`ident`)
|
||||
KEY (`ident`)
|
||||
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
|
||||
|
||||
CREATE TABLE `builtin_payloads` (
|
||||
@@ -793,6 +793,7 @@ CREATE TABLE `notify_rule` (
|
||||
`enable` tinyint(1) not null default 0,
|
||||
`user_group_ids` varchar(255) not null default '',
|
||||
`notify_configs` text,
|
||||
`pipeline_configs` text,
|
||||
`create_at` bigint not null default 0,
|
||||
`create_by` varchar(64) not null default '',
|
||||
`update_at` bigint not null default 0,
|
||||
@@ -833,6 +834,22 @@ CREATE TABLE `message_template` (
|
||||
PRIMARY KEY (`id`)
|
||||
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;
|
||||
|
||||
CREATE TABLE `event_pipeline` (
|
||||
`id` bigint unsigned not null auto_increment,
|
||||
`name` varchar(128) not null,
|
||||
`team_ids` text,
|
||||
`description` varchar(255) not null default '',
|
||||
`filter_enable` tinyint(1) not null default 0,
|
||||
`label_filters` text,
|
||||
`attribute_filters` text,
|
||||
`processors` text,
|
||||
`create_at` bigint not null default 0,
|
||||
`create_by` varchar(64) not null default '',
|
||||
`update_at` bigint not null default 0,
|
||||
`update_by` varchar(64) not null default '',
|
||||
PRIMARY KEY (`id`)
|
||||
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;
|
||||
|
||||
CREATE TABLE `task_meta`
|
||||
(
|
||||
`id` bigint unsigned NOT NULL AUTO_INCREMENT,
|
||||
@@ -2189,4 +2206,4 @@ CREATE TABLE task_host_99
|
||||
UNIQUE KEY `idx_id_host` (`id`, `host`),
|
||||
PRIMARY KEY (`ii`)
|
||||
) ENGINE = InnoDB
|
||||
DEFAULT CHARSET = utf8mb4;
|
||||
DEFAULT CHARSET = utf8mb4;
|
||||
|
||||
@@ -226,3 +226,22 @@ ALTER TABLE `notify_channel` ADD COLUMN `weight` int not null default 0;
|
||||
/* v8.0.0-beta.11 2025-04-10 */
|
||||
ALTER TABLE `es_index_pattern` ADD COLUMN `note` varchar(1024) not null default '';
|
||||
ALTER TABLE `datasource` ADD COLUMN `identifier` varchar(255) not null default '';
|
||||
|
||||
/* v8.0.0-beta.11 2025-05-15 */
|
||||
ALTER TABLE `notify_rule` ADD COLUMN `pipeline_configs` text;
|
||||
|
||||
CREATE TABLE `event_pipeline` (
|
||||
`id` bigint unsigned not null auto_increment,
|
||||
`name` varchar(128) not null,
|
||||
`team_ids` text,
|
||||
`description` varchar(255) not null default '',
|
||||
`filter_enable` tinyint(1) not null default 0,
|
||||
`label_filters` text,
|
||||
`attribute_filters` text,
|
||||
`processors` text,
|
||||
`create_at` bigint not null default 0,
|
||||
`create_by` varchar(64) not null default '',
|
||||
`update_at` bigint not null default 0,
|
||||
`update_by` varchar(64) not null default '',
|
||||
PRIMARY KEY (`id`)
|
||||
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;
|
||||
@@ -47,6 +47,7 @@ var PromDefaultDatasourceId int64
|
||||
func getDatasourcesFromDBLoop(ctx *ctx.Context, fromAPI bool) {
|
||||
for {
|
||||
if !fromAPI {
|
||||
foundDefaultDatasource := false
|
||||
items, err := models.GetDatasources(ctx)
|
||||
if err != nil {
|
||||
logger.Errorf("get datasource from database fail: %v", err)
|
||||
@@ -58,6 +59,7 @@ func getDatasourcesFromDBLoop(ctx *ctx.Context, fromAPI bool) {
|
||||
for _, item := range items {
|
||||
if item.PluginType == "prometheus" && item.IsDefault {
|
||||
atomic.StoreInt64(&PromDefaultDatasourceId, item.Id)
|
||||
foundDefaultDatasource = true
|
||||
}
|
||||
|
||||
logger.Debugf("get datasource: %+v", item)
|
||||
@@ -90,6 +92,12 @@ func getDatasourcesFromDBLoop(ctx *ctx.Context, fromAPI bool) {
|
||||
}
|
||||
dss = append(dss, ds)
|
||||
}
|
||||
|
||||
if !foundDefaultDatasource && atomic.LoadInt64(&PromDefaultDatasourceId) != 0 {
|
||||
logger.Debugf("no default datasource found")
|
||||
atomic.StoreInt64(&PromDefaultDatasourceId, 0)
|
||||
}
|
||||
|
||||
PutDatasources(dss)
|
||||
} else {
|
||||
FromAPIHook()
|
||||
@@ -183,7 +191,14 @@ func PutDatasources(items []datasource.DatasourceInfo) {
|
||||
ids = append(ids, item.Id)
|
||||
|
||||
// 异步初始化 client 不然数据源同步的会很慢
|
||||
go DsCache.Put(typ, item.Id, ds)
|
||||
go func() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
logger.Errorf("panic in datasource item: %+v panic:%v", item, r)
|
||||
}
|
||||
}()
|
||||
DsCache.Put(typ, item.Id, ds)
|
||||
}()
|
||||
}
|
||||
|
||||
logger.Debugf("get plugin by type success Ids:%v", ids)
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
{
|
||||
"name": " Kubernetes-Deployment/ Container",
|
||||
"name": "Kubernetes / Deployment / Container",
|
||||
"tags": "Categraf",
|
||||
"configs": {
|
||||
"panels": [
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
{
|
||||
"id": 0,
|
||||
"group_id": 0,
|
||||
"name": "Kubernetes / Container",
|
||||
"name": "Kubernetes / Pod",
|
||||
"ident": "",
|
||||
"tags": "Categraf",
|
||||
"create_at": 0,
|
||||
@@ -1748,20 +1748,34 @@
|
||||
],
|
||||
"var": [
|
||||
{
|
||||
"definition": "prometheus",
|
||||
"name": "datasource",
|
||||
"type": "datasource"
|
||||
"type": "datasource",
|
||||
"definition": "prometheus",
|
||||
"defaultValue": 40
|
||||
},
|
||||
{
|
||||
"name": "namespace",
|
||||
"type": "query",
|
||||
"hide": false,
|
||||
"datasource": {
|
||||
"cate": "prometheus",
|
||||
"value": "${datasource}"
|
||||
},
|
||||
"definition": "label_values(container_cpu_usage_seconds_total, pod)",
|
||||
"multi": false,
|
||||
"name": "pod_name",
|
||||
"definition": "label_values(container_cpu_usage_seconds_total, namespace)",
|
||||
"reg": "",
|
||||
"type": "query"
|
||||
"multi": false
|
||||
},
|
||||
{
|
||||
"name": "pod_name",
|
||||
"type": "query",
|
||||
"hide": false,
|
||||
"datasource": {
|
||||
"cate": "prometheus",
|
||||
"value": "${datasource}"
|
||||
},
|
||||
"definition": "label_values(container_cpu_usage_seconds_total{namespace=\"$namespace\"}, pod)",
|
||||
"reg": "",
|
||||
"multi": false
|
||||
}
|
||||
],
|
||||
"version": "3.0.0"
|
||||
@@ -1,5 +1,5 @@
|
||||
{
|
||||
"name": " Kubernetes-Statefulset / Container ",
|
||||
"name": "Kubernetes / Statefulset / Container ",
|
||||
"tags": "Categraf",
|
||||
"configs": {
|
||||
"panels": [
|
||||
|
||||
342
integrations/Kubernetes/metrics/k8s-node.json
Normal file
342
integrations/Kubernetes/metrics/k8s-node.json
Normal file
@@ -0,0 +1,342 @@
|
||||
[
|
||||
{
|
||||
"uuid": 1745735239727485700,
|
||||
"collector": "Node",
|
||||
"typ": "Kubernetes",
|
||||
"name": "TCP当前连接数",
|
||||
"unit": "",
|
||||
"note": "节点指标\n类型: *",
|
||||
"lang": "zh_CN",
|
||||
"expression": "node_netstat_Tcp_CurrEstab * on(instance, cluster) group_left(nodename) node_uname_info{nodename=~\"$node_name\"}"
|
||||
},
|
||||
{
|
||||
"uuid": 1745735239701096000,
|
||||
"collector": "Node",
|
||||
"typ": "Kubernetes",
|
||||
"name": "文件描述符使用数",
|
||||
"unit": "",
|
||||
"note": "节点指标\n类型: *",
|
||||
"lang": "zh_CN",
|
||||
"expression": "node_filefd_allocated * on(instance, cluster) group_left(nodename) node_uname_info{nodename=~\"$node_name\"}"
|
||||
},
|
||||
{
|
||||
"uuid": 1745735239704160000,
|
||||
"collector": "Node",
|
||||
"typ": "Kubernetes",
|
||||
"name": "文件描述符最大限制",
|
||||
"unit": "",
|
||||
"note": "节点指标\n类型: *",
|
||||
"lang": "zh_CN",
|
||||
"expression": "node_filefd_maximum * on(instance, cluster) group_left(nodename) node_uname_info{nodename=~\"$node_name\"}"
|
||||
},
|
||||
{
|
||||
"uuid": 1745735239750006800,
|
||||
"collector": "Node",
|
||||
"typ": "Kubernetes",
|
||||
"name": "文件系统inode使用率",
|
||||
"unit": "",
|
||||
"note": "节点指标\n类型: -",
|
||||
"lang": "zh_CN",
|
||||
"expression": "100 - (node_filesystem_files_free * on(instance, cluster) group_left(nodename) node_uname_info{nodename=~\"$node_name\"} / node_filesystem_files * on(instance, cluster) group_left(nodename) node_uname_info{nodename=~\"$node_name\"} * 100)"
|
||||
},
|
||||
{
|
||||
"uuid": 1745735239746991600,
|
||||
"collector": "Node",
|
||||
"typ": "Kubernetes",
|
||||
"name": "文件系统使用率",
|
||||
"unit": "",
|
||||
"note": "节点指标\n类型: -",
|
||||
"lang": "zh_CN",
|
||||
"expression": "100 - ((node_filesystem_avail_bytes * on(instance, cluster) group_left(nodename) node_uname_info{nodename=~\"$node_name\"} * 100) / node_filesystem_size_bytes * on(instance, cluster) group_left(nodename) node_uname_info{nodename=~\"$node_name\"})"
|
||||
},
|
||||
{
|
||||
"uuid": 1745735239753550000,
|
||||
"collector": "Node",
|
||||
"typ": "Kubernetes",
|
||||
"name": "文件系统错误数",
|
||||
"unit": "",
|
||||
"note": "节点指标\n类型: *",
|
||||
"lang": "zh_CN",
|
||||
"expression": "sum(node_filesystem_device_error * on(instance, cluster) group_left(nodename) node_uname_info{nodename=~\"$node_name\"}) by (mountpoint)"
|
||||
},
|
||||
{
|
||||
"uuid": 1745735239743097300,
|
||||
"collector": "Node",
|
||||
"typ": "Kubernetes",
|
||||
"name": "磁盘IO使用率",
|
||||
"unit": "",
|
||||
"note": "节点指标\n类型: *",
|
||||
"lang": "zh_CN",
|
||||
"expression": "rate(node_disk_io_now[5m]) * on(instance, cluster) group_left(nodename) node_uname_info{nodename=~\"$node_name\"}"
|
||||
},
|
||||
{
|
||||
"uuid": 1745735239740169500,
|
||||
"collector": "Node",
|
||||
"typ": "Kubernetes",
|
||||
"name": "磁盘写入IOPS",
|
||||
"unit": "",
|
||||
"note": "节点指标\n类型: *",
|
||||
"lang": "zh_CN",
|
||||
"expression": "rate(node_disk_writes_completed_total[5m]) * on(instance, cluster) group_left(nodename) node_uname_info{nodename=~\"$node_name\"}"
|
||||
},
|
||||
{
|
||||
"uuid": 1745735239734228700,
|
||||
"collector": "Node",
|
||||
"typ": "Kubernetes",
|
||||
"name": "磁盘写入速率",
|
||||
"unit": "",
|
||||
"note": "节点指标\n类型: *",
|
||||
"lang": "zh_CN",
|
||||
"expression": "rate(node_disk_written_bytes_total[5m]) * on(instance, cluster) group_left(nodename) node_uname_info{nodename=~\"$node_name\"}"
|
||||
},
|
||||
{
|
||||
"uuid": 1745735239737122600,
|
||||
"collector": "Node",
|
||||
"typ": "Kubernetes",
|
||||
"name": "磁盘读取IOPS",
|
||||
"unit": "",
|
||||
"note": "节点指标\n类型: *",
|
||||
"lang": "zh_CN",
|
||||
"expression": "rate(node_disk_reads_completed_total[5m]) * on(instance, cluster) group_left(nodename) node_uname_info{nodename=~\"$node_name\"}"
|
||||
},
|
||||
{
|
||||
"uuid": 1745735239730406000,
|
||||
"collector": "Node",
|
||||
"typ": "Kubernetes",
|
||||
"name": "磁盘读取速率",
|
||||
"unit": "",
|
||||
"note": "节点指标\n类型: *",
|
||||
"lang": "zh_CN",
|
||||
"expression": "rate(node_disk_read_bytes_total[5m]) * on(instance, cluster) group_left(nodename) node_uname_info{nodename=~\"$node_name\"}"
|
||||
},
|
||||
{
|
||||
"uuid": 1745735239694202600,
|
||||
"collector": "Node",
|
||||
"typ": "Kubernetes",
|
||||
"name": "系统上下文切换率",
|
||||
"unit": "",
|
||||
"note": "节点指标\n类型: *",
|
||||
"lang": "zh_CN",
|
||||
"expression": "rate(node_context_switches_total[5m]) * on(instance, cluster) group_left(nodename) node_uname_info{nodename=~\"$node_name\"}"
|
||||
},
|
||||
{
|
||||
"uuid": 1745735239697167400,
|
||||
"collector": "Node",
|
||||
"typ": "Kubernetes",
|
||||
"name": "系统中断率",
|
||||
"unit": "",
|
||||
"note": "节点指标\n类型: *",
|
||||
"lang": "zh_CN",
|
||||
"expression": "rate(node_intr_total[5m]) * on(instance, cluster) group_left(nodename) node_uname_info{nodename=~\"$node_name\"}"
|
||||
},
|
||||
{
|
||||
"uuid": 1745735239724650200,
|
||||
"collector": "Node",
|
||||
"typ": "Kubernetes",
|
||||
"name": "网络发送丢包率",
|
||||
"unit": "",
|
||||
"note": "节点指标\n类型: *",
|
||||
"lang": "zh_CN",
|
||||
"expression": "sum(rate(node_network_transmit_drop_total[5m]) * on(instance, cluster) group_left(nodename) node_uname_info{nodename=~\"$node_name\"})"
|
||||
},
|
||||
{
|
||||
"uuid": 1745735239710266000,
|
||||
"collector": "Node",
|
||||
"typ": "Kubernetes",
|
||||
"name": "网络发送带宽",
|
||||
"unit": "",
|
||||
"note": "节点指标\n类型: *",
|
||||
"lang": "zh_CN",
|
||||
"expression": "sum(rate(node_network_transmit_bytes_total[5m]) * on(instance, cluster) group_left(nodename) node_uname_info{nodename=~\"$node_name\"})"
|
||||
},
|
||||
{
|
||||
"uuid": 1745735239716205000,
|
||||
"collector": "Node",
|
||||
"typ": "Kubernetes",
|
||||
"name": "网络发送错误率",
|
||||
"unit": "",
|
||||
"note": "节点指标\n类型: *",
|
||||
"lang": "zh_CN",
|
||||
"expression": "sum(rate(node_network_transmit_errs_total[5m]) * on(instance, cluster) group_left(nodename) node_uname_info{nodename=~\"$node_name\"})"
|
||||
},
|
||||
{
|
||||
"uuid": 1745735239721688800,
|
||||
"collector": "Node",
|
||||
"typ": "Kubernetes",
|
||||
"name": "网络接收丢包率",
|
||||
"unit": "",
|
||||
"note": "节点指标\n类型: *",
|
||||
"lang": "zh_CN",
|
||||
"expression": "sum(rate(node_network_receive_drop_total[5m]) * on(instance, cluster) group_left(nodename) node_uname_info{nodename=~\"$node_name\"})"
|
||||
},
|
||||
{
|
||||
"uuid": 1745735239707241500,
|
||||
"collector": "Node",
|
||||
"typ": "Kubernetes",
|
||||
"name": "网络接收带宽",
|
||||
"unit": "",
|
||||
"note": "节点指标\n类型: *",
|
||||
"lang": "zh_CN",
|
||||
"expression": "sum(rate(node_network_receive_bytes_total[5m]) * on(instance, cluster) group_left(nodename) node_uname_info{nodename=~\"$node_name\"})"
|
||||
},
|
||||
{
|
||||
"uuid": 1745735239713318000,
|
||||
"collector": "Node",
|
||||
"typ": "Kubernetes",
|
||||
"name": "网络接收错误率",
|
||||
"unit": "",
|
||||
"note": "节点指标\n类型: *",
|
||||
"lang": "zh_CN",
|
||||
"expression": "sum(rate(node_network_receive_errs_total[5m]) * on(instance, cluster) group_left(nodename) node_uname_info{nodename=~\"$node_name\"})"
|
||||
},
|
||||
{
|
||||
"uuid": 1745735239783181800,
|
||||
"collector": "Node",
|
||||
"typ": "Kubernetes",
|
||||
"name": "网络连接跟踪条目数",
|
||||
"unit": "",
|
||||
"note": "节点指标\n类型: *",
|
||||
"lang": "zh_CN",
|
||||
"expression": "node_nf_conntrack_entries * on(instance, cluster) group_left(nodename) node_uname_info{nodename=~\"$node_name\"}"
|
||||
},
|
||||
{
|
||||
"uuid": 1745735239786134000,
|
||||
"collector": "Node",
|
||||
"typ": "Kubernetes",
|
||||
"name": "网络连接跟踪限制",
|
||||
"unit": "",
|
||||
"note": "节点指标\n类型: *",
|
||||
"lang": "zh_CN",
|
||||
"expression": "node_nf_conntrack_entries_limit * on(instance, cluster) group_left(nodename) node_uname_info{nodename=~\"$node_name\"}"
|
||||
},
|
||||
{
|
||||
"uuid": 1745735239675145700,
|
||||
"collector": "Node",
|
||||
"typ": "Kubernetes",
|
||||
"name": "节点 CPU 使用率",
|
||||
"unit": "",
|
||||
"note": "节点指标\n类型: by",
|
||||
"lang": "zh_CN",
|
||||
"expression": "sum by (instance) (rate(node_cpu_seconds_total{mode!~\"idle|iowait|steal\"}[5m])) * on(instance) group_left(nodename) node_uname_info{nodename=~\"$node_name\"} *100"
|
||||
},
|
||||
{
|
||||
"uuid": 1745735239691192000,
|
||||
"collector": "Node",
|
||||
"typ": "Kubernetes",
|
||||
"name": "节点15分钟负载",
|
||||
"unit": "",
|
||||
"note": "节点指标\n类型: *",
|
||||
"lang": "zh_CN",
|
||||
"expression": "node_load15 * on(instance, cluster) group_left(nodename) node_uname_info{nodename=~\"$node_name\"}"
|
||||
},
|
||||
{
|
||||
"uuid": 1745735239685264100,
|
||||
"collector": "Node",
|
||||
"typ": "Kubernetes",
|
||||
"name": "节点1分钟负载",
|
||||
"unit": "",
|
||||
"note": "节点指标\n类型: *",
|
||||
"lang": "zh_CN",
|
||||
"expression": "node_load1 * on(instance, cluster) group_left(nodename) node_uname_info{nodename=~\"$node_name\"}"
|
||||
},
|
||||
{
|
||||
"uuid": 1745735239688232700,
|
||||
"collector": "Node",
|
||||
"typ": "Kubernetes",
|
||||
"name": "节点5分钟负载",
|
||||
"unit": "",
|
||||
"note": "节点指标\n类型: *",
|
||||
"lang": "zh_CN",
|
||||
"expression": "node_load5 * on(instance, cluster) group_left(nodename) node_uname_info{nodename=~\"$node_name\"}"
|
||||
},
|
||||
{
|
||||
"uuid": 1745735239776256800,
|
||||
"collector": "Node",
|
||||
"typ": "Kubernetes",
|
||||
"name": "节点Swap使用量",
|
||||
"unit": "",
|
||||
"note": "节点指标\n类型: *",
|
||||
"lang": "zh_CN",
|
||||
"expression": "node_memory_SwapTotal_bytes * on(instance, cluster) group_left(nodename) node_uname_info{nodename=~\"$node_name\"} - node_memory_SwapFree_bytes * on(instance, cluster) group_left(nodename) node_uname_info{nodename=~\"$node_name\"}"
|
||||
},
|
||||
{
|
||||
"uuid": 1745735239779806500,
|
||||
"collector": "Node",
|
||||
"typ": "Kubernetes",
|
||||
"name": "节点Swap总量",
|
||||
"unit": "",
|
||||
"note": "节点指标\n类型: *",
|
||||
"lang": "zh_CN",
|
||||
"expression": "node_memory_SwapTotal_bytes * on(instance, cluster) group_left(nodename) node_uname_info{nodename=~\"$node_name\"}"
|
||||
},
|
||||
{
|
||||
"uuid": 1745735239681529300,
|
||||
"collector": "Node",
|
||||
"typ": "Kubernetes",
|
||||
"name": "节点上运行的Pod数量",
|
||||
"unit": "",
|
||||
"note": "节点指标\n类型: *",
|
||||
"lang": "zh_CN",
|
||||
"expression": "sum(kube_pod_info * on(node) group_left(nodename) node_uname_info{nodename=~\"$node_name\"})"
|
||||
},
|
||||
{
|
||||
"uuid": 1745735239678397700,
|
||||
"collector": "Node",
|
||||
"typ": "Kubernetes",
|
||||
"name": "节点内存使用率",
|
||||
"unit": "",
|
||||
"note": "节点指标\n类型: *",
|
||||
"lang": "zh_CN",
|
||||
"expression": "sum(node_memory_MemTotal_bytes * on(instance, cluster) group_left(nodename) node_uname_info{nodename=~\"$node_name\"} - node_memory_MemAvailable_bytes * on(instance, cluster) group_left(nodename) node_uname_info{nodename=~\"$node_name\"}) / sum(node_memory_MemTotal_bytes * on(instance, cluster) group_left(nodename) node_uname_info{nodename=~\"$node_name\"})"
|
||||
},
|
||||
{
|
||||
"uuid": 1745735239760507400,
|
||||
"collector": "Node",
|
||||
"typ": "Kubernetes",
|
||||
"name": "节点内存详细信息 - 可用",
|
||||
"unit": "",
|
||||
"note": "节点指标\n类型: *",
|
||||
"lang": "zh_CN",
|
||||
"expression": "node_memory_MemAvailable_bytes * on(instance, cluster) group_left(nodename) node_uname_info{nodename=~\"$node_name\"}"
|
||||
},
|
||||
{
|
||||
"uuid": 1745735239756641800,
|
||||
"collector": "Node",
|
||||
"typ": "Kubernetes",
|
||||
"name": "节点内存详细信息 - 总量",
|
||||
"unit": "",
|
||||
"note": "节点指标\n类型: *",
|
||||
"lang": "zh_CN",
|
||||
"expression": "node_memory_MemTotal_bytes * on(instance, cluster) group_left(nodename) node_uname_info{nodename=~\"$node_name\"}"
|
||||
},
|
||||
{
|
||||
"uuid": 1745735239772786200,
|
||||
"collector": "Node",
|
||||
"typ": "Kubernetes",
|
||||
"name": "节点内存详细信息 - 空闲",
|
||||
"unit": "",
|
||||
"note": "节点指标\n类型: *",
|
||||
"lang": "zh_CN",
|
||||
"expression": "node_memory_MemFree_bytes * on(instance, cluster) group_left(nodename) node_uname_info{nodename=~\"$node_name\"}"
|
||||
},
|
||||
{
|
||||
"uuid": 1745735239769542000,
|
||||
"collector": "Node",
|
||||
"typ": "Kubernetes",
|
||||
"name": "节点内存详细信息 - 缓冲区",
|
||||
"unit": "",
|
||||
"note": "节点指标\n类型: *",
|
||||
"lang": "zh_CN",
|
||||
"expression": "node_memory_Buffers_bytes * on(instance, cluster) group_left(nodename) node_uname_info{nodename=~\"$node_name\"}"
|
||||
},
|
||||
{
|
||||
"uuid": 1745735239764136000,
|
||||
"collector": "Node",
|
||||
"typ": "Kubernetes",
|
||||
"name": "节点内存详细信息 - 缓存",
|
||||
"unit": "",
|
||||
"note": "节点指标\n类型: *",
|
||||
"lang": "zh_CN",
|
||||
"expression": "node_memory_Cached_bytes * on(instance, cluster) group_left(nodename) node_uname_info{nodename=~\"$node_name\"}"
|
||||
}
|
||||
]
|
||||
282
integrations/Kubernetes/metrics/k8s-pod.json
Normal file
282
integrations/Kubernetes/metrics/k8s-pod.json
Normal file
@@ -0,0 +1,282 @@
|
||||
[
|
||||
{
|
||||
"uuid": 1745893024149445000,
|
||||
"collector": "Pod",
|
||||
"typ": "Kubernetes",
|
||||
"name": "Inode数量",
|
||||
"unit": "",
|
||||
"note": "Pod自身指标\n类型: pod=~\"$pod_name\",",
|
||||
"lang": "zh_CN",
|
||||
"expression": "sum(container_fs_inodes_total{namespace=\"$namespace\", pod=~\"$pod_name\", image!~\".*pause.*\"}) by (name)"
|
||||
},
|
||||
{
|
||||
"uuid": 1745893024121015300,
|
||||
"collector": "Pod",
|
||||
"typ": "Kubernetes",
|
||||
"name": "不可中断任务数量",
|
||||
"unit": "",
|
||||
"note": "Pod自身指标\n类型: pod=~\"$pod_name\",",
|
||||
"lang": "zh_CN",
|
||||
"expression": "sum(container_tasks_state{namespace=\"$namespace\", pod=~\"$pod_name\", image!~\".*pause.*\", state=\"uninterruptible\"}) by (name)"
|
||||
},
|
||||
{
|
||||
"uuid": 1745893024130551800,
|
||||
"collector": "Pod",
|
||||
"typ": "Kubernetes",
|
||||
"name": "容器cache使用",
|
||||
"unit": "",
|
||||
"note": "Pod自身指标\n类型: pod=~\"$pod_name\",",
|
||||
"lang": "zh_CN",
|
||||
"expression": "(sum(container_memory_cache{namespace=\"$namespace\", pod=~\"$pod_name\", image!~\".*pause.*\"}) by (name))"
|
||||
},
|
||||
{
|
||||
"uuid": 1745893024108569900,
|
||||
"collector": "Pod",
|
||||
"typ": "Kubernetes",
|
||||
"name": "容器CPU Limit",
|
||||
"unit": "",
|
||||
"note": "Pod自身指标\n类型: pod=~\"$pod_name\"}/container_spec_cpu_period{namespace=\"$namespace\",",
|
||||
"lang": "zh_CN",
|
||||
"expression": "(sum(container_spec_cpu_quota{namespace=\"$namespace\", pod=~\"$pod_name\"}/container_spec_cpu_period{namespace=\"$namespace\", pod=~\"$pod_name\"}) by (name))"
|
||||
},
|
||||
{
|
||||
"uuid": 1745893024112672500,
|
||||
"collector": "Pod",
|
||||
"typ": "Kubernetes",
|
||||
"name": "容器CPU load 10",
|
||||
"unit": "",
|
||||
"note": "Pod自身指标\n类型: pod=~\"$pod_name\",",
|
||||
"lang": "zh_CN",
|
||||
"expression": "sum(container_cpu_load_average_10s{namespace=\"$namespace\", pod=~\"$pod_name\", image!~\".*pause.*\"}) by (name)"
|
||||
},
|
||||
{
|
||||
"uuid": 1745893024026246700,
|
||||
"collector": "Pod",
|
||||
"typ": "Kubernetes",
|
||||
"name": "容器CPU使用率",
|
||||
"unit": "",
|
||||
"note": "Pod自身指标\n类型: pod=~\"$pod_name\",",
|
||||
"lang": "zh_CN",
|
||||
"expression": "sum(rate(container_cpu_usage_seconds_total{namespace=\"$namespace\", pod=~\"$pod_name\", image!~\".*pause.*\"}[1m])*100) by(name)"
|
||||
},
|
||||
{
|
||||
"uuid": 1745893024029544000,
|
||||
"collector": "Pod",
|
||||
"typ": "Kubernetes",
|
||||
"name": "容器CPU归一化后使用率",
|
||||
"unit": "",
|
||||
"note": "Pod自身指标\n类型: pod=~\"$pod_name\",",
|
||||
"lang": "zh_CN",
|
||||
"expression": "sum(rate(container_cpu_usage_seconds_total{namespace=\"$namespace\", pod=~\"$pod_name\", image!~\".*pause.*\"}[1m])*100) by(name)/((sum(container_spec_cpu_quota{namespace=\"$namespace\", pod=~\"$pod_name\"}/container_spec_cpu_period{namespace=\"$namespace\", pod=~\"$pod_name\"}) by (name)))"
|
||||
},
|
||||
{
|
||||
"uuid": 1745893024146207700,
|
||||
"collector": "Pod",
|
||||
"typ": "Kubernetes",
|
||||
"name": "容器I/O",
|
||||
"unit": "",
|
||||
"note": "Pod自身指标\n类型: pod=~\"$pod_name\",",
|
||||
"lang": "zh_CN",
|
||||
"expression": "sum(container_fs_io_current{namespace=\"$namespace\", pod=~\"$pod_name\", image!~\".*pause.*\"}) by (name)"
|
||||
},
|
||||
{
|
||||
"uuid": 1745893024136457000,
|
||||
"collector": "Pod",
|
||||
"typ": "Kubernetes",
|
||||
"name": "容器RSS内存使用",
|
||||
"unit": "",
|
||||
"note": "Pod自身指标\n类型: pod=~\"$pod_name\",",
|
||||
"lang": "zh_CN",
|
||||
"expression": "(sum(container_memory_rss{namespace=\"$namespace\", pod=~\"$pod_name\", image!~\".*pause.*\"}) by (name))"
|
||||
},
|
||||
{
|
||||
"uuid": 1745893024139900200,
|
||||
"collector": "Pod",
|
||||
"typ": "Kubernetes",
|
||||
"name": "容器内存 Limit",
|
||||
"unit": "",
|
||||
"note": "Pod自身指标\n类型: pod=~\"$pod_name\",",
|
||||
"lang": "zh_CN",
|
||||
"expression": "sum(container_spec_memory_limit_bytes{namespace=\"$namespace\", pod=~\"$pod_name\", image!~\".*pause.*\"}) by (name)"
|
||||
},
|
||||
{
|
||||
"uuid": 1745893024032984300,
|
||||
"collector": "Pod",
|
||||
"typ": "Kubernetes",
|
||||
"name": "容器内存使用",
|
||||
"unit": "",
|
||||
"note": "Pod自身指标\n类型: pod=~\"$pod_name\",",
|
||||
"lang": "zh_CN",
|
||||
"expression": "(sum(container_memory_usage_bytes{namespace=\"$namespace\", pod=~\"$pod_name\", image!~\".*pause.*\"}) by (name))"
|
||||
},
|
||||
{
|
||||
"uuid": 1745893024127585500,
|
||||
"collector": "Pod",
|
||||
"typ": "Kubernetes",
|
||||
"name": "容器内存使用率",
|
||||
"unit": "",
|
||||
"note": "Pod自身指标\n类型: pod=~\"$pod_name\",",
|
||||
"lang": "zh_CN",
|
||||
"expression": "((sum(container_memory_usage_bytes{namespace=\"$namespace\", pod=~\"$pod_name\", image!~\".*pause.*\"}) by (name)) /(sum(container_spec_memory_limit_bytes{namespace=\"$namespace\", pod=~\"$pod_name\", image!~\".*pause.*\"}) by (name)))*100"
|
||||
},
|
||||
{
|
||||
"uuid": 1745893024093620000,
|
||||
"collector": "Pod",
|
||||
"typ": "Kubernetes",
|
||||
"name": "容器内核态CPU使用率",
|
||||
"unit": "",
|
||||
"note": "Pod自身指标\n类型: pod=~\"$pod_name\",",
|
||||
"lang": "zh_CN",
|
||||
"expression": "sum(rate(container_cpu_system_seconds_total{namespace=\"$namespace\", pod=~\"$pod_name\", image!~\".*pause.*\"}[1m])*100) by(name)"
|
||||
},
|
||||
{
|
||||
"uuid": 1745893024102879000,
|
||||
"collector": "Pod",
|
||||
"typ": "Kubernetes",
|
||||
"name": "容器发生CPU throttle的比率",
|
||||
"unit": "",
|
||||
"note": "Pod自身指标\n类型: pod=~\"$pod_name\",",
|
||||
"lang": "zh_CN",
|
||||
"expression": "sum(rate(container_cpu_cfs_throttled_periods_total{namespace=\"$namespace\", pod=~\"$pod_name\", image!~\".*pause.*\"}[1m]))by(name) *100"
|
||||
},
|
||||
{
|
||||
"uuid": 1745893024143177000,
|
||||
"collector": "Pod",
|
||||
"typ": "Kubernetes",
|
||||
"name": "容器发生OOM次数",
|
||||
"unit": "",
|
||||
"note": "Pod自身指标\n类型: pod=~\"$pod_name\",",
|
||||
"lang": "zh_CN",
|
||||
"expression": "sum(container_oom_events_total{namespace=\"$namespace\", pod=~\"$pod_name\", image!~\".*pause.*\"}) by (name)"
|
||||
},
|
||||
{
|
||||
"uuid": 1745893024083942000,
|
||||
"collector": "Pod",
|
||||
"typ": "Kubernetes",
|
||||
"name": "容器启动时长(小时)",
|
||||
"unit": "",
|
||||
"note": "Pod自身指标\n类型: pod=~\"$pod_name\",",
|
||||
"lang": "zh_CN",
|
||||
"expression": "sum((time()-container_start_time_seconds{namespace=\"$namespace\", pod=~\"$pod_name\", image!~\".*pause.*\"})) by (name)"
|
||||
},
|
||||
{
|
||||
"uuid": 1745893024152466200,
|
||||
"collector": "Pod",
|
||||
"typ": "Kubernetes",
|
||||
"name": "容器已使用的文件系统大小",
|
||||
"unit": "",
|
||||
"note": "Pod自身指标\n类型: pod=~\"$pod_name\",",
|
||||
"lang": "zh_CN",
|
||||
"expression": "sum(container_fs_usage_bytes{namespace=\"$namespace\", pod=~\"$pod_name\", image!~\".*pause.*\"}) by (name)"
|
||||
},
|
||||
{
|
||||
"uuid": 1745893024097849600,
|
||||
"collector": "Pod",
|
||||
"typ": "Kubernetes",
|
||||
"name": "容器用户态CPU使用率",
|
||||
"unit": "",
|
||||
"note": "Pod自身指标\n类型: pod=~\"$pod_name\",",
|
||||
"lang": "zh_CN",
|
||||
"expression": "sum(rate(container_cpu_user_seconds_total{namespace=\"$namespace\", pod=~\"$pod_name\", image!~\".*pause.*\"}[1m])*100) by(name)"
|
||||
},
|
||||
{
|
||||
"uuid": 1745893024036896800,
|
||||
"collector": "Pod",
|
||||
"typ": "Kubernetes",
|
||||
"name": "文件系统写入速率",
|
||||
"unit": "",
|
||||
"note": "Pod自身指标\n类型: pod=~\"$pod_name\",",
|
||||
"lang": "zh_CN",
|
||||
"expression": "sum(rate(container_fs_writes_bytes_total{namespace=\"$namespace\", pod=~\"$pod_name\", image!~\".*pause.*\"}[1m])) by(name)"
|
||||
},
|
||||
{
|
||||
"uuid": 1745893024057722000,
|
||||
"collector": "Pod",
|
||||
"typ": "Kubernetes",
|
||||
"name": "文件系统读取速率",
|
||||
"unit": "",
|
||||
"note": "Pod自身指标\n类型: pod=~\"$pod_name\",",
|
||||
"lang": "zh_CN",
|
||||
"expression": "sum(rate(container_fs_reads_bytes_total{namespace=\"$namespace\", pod=~\"$pod_name\", image!~\".*pause.*\"}[1m])) by(name)"
|
||||
},
|
||||
{
|
||||
"uuid": 1745893024166898000,
|
||||
"collector": "Pod",
|
||||
"typ": "Kubernetes",
|
||||
"name": "网络发送丢包数",
|
||||
"unit": "",
|
||||
"note": "Pod自身指标\n类型: pod=~\"$pod_name\"}[1m]))",
|
||||
"lang": "zh_CN",
|
||||
"expression": "sum(rate(container_network_transmit_packets_dropped_total{namespace=\"$namespace\", pod=~\"$pod_name\"}[1m])) by(name, interface)"
|
||||
},
|
||||
{
|
||||
"uuid": 1745893024160266500,
|
||||
"collector": "Pod",
|
||||
"typ": "Kubernetes",
|
||||
"name": "网络发送数据包",
|
||||
"unit": "",
|
||||
"note": "Pod自身指标\n类型: pod=~\"$pod_name\"}[1m]))",
|
||||
"lang": "zh_CN",
|
||||
"expression": "sum(rate(container_network_transmit_packets_total{namespace=\"$namespace\", pod=~\"$pod_name\"}[1m])) by(name, interface)"
|
||||
},
|
||||
{
|
||||
"uuid": 1745893024069935000,
|
||||
"collector": "Pod",
|
||||
"typ": "Kubernetes",
|
||||
"name": "网络发送速率",
|
||||
"unit": "",
|
||||
"note": "Pod自身指标\n类型: pod=~\"$pod_name\"}[1m]))",
|
||||
"lang": "zh_CN",
|
||||
"expression": "sum(rate(container_network_transmit_bytes_total{namespace=\"$namespace\", pod=~\"$pod_name\"}[1m])) by(name, interface)"
|
||||
},
|
||||
{
|
||||
"uuid": 1745893024163721700,
|
||||
"collector": "Pod",
|
||||
"typ": "Kubernetes",
|
||||
"name": "网络发送错误数",
|
||||
"unit": "",
|
||||
"note": "Pod自身指标\n类型: pod=~\"$pod_name\"}[1m]))",
|
||||
"lang": "zh_CN",
|
||||
"expression": "sum(rate(container_network_transmit_errors_total{namespace=\"$namespace\", pod=~\"$pod_name\"}[1m])) by(name, interface)"
|
||||
},
|
||||
{
|
||||
"uuid": 1745893024173485600,
|
||||
"collector": "Pod",
|
||||
"typ": "Kubernetes",
|
||||
"name": "网络接收丢包数",
|
||||
"unit": "",
|
||||
"note": "Pod自身指标\n类型: pod=~\"$pod_name\"}[1m]))",
|
||||
"lang": "zh_CN",
|
||||
"expression": "sum(rate(container_network_receive_packets_dropped_total{namespace=\"$namespace\", pod=~\"$pod_name\"}[1m])) by(name, interface)"
|
||||
},
|
||||
{
|
||||
"uuid": 1745893024156389600,
|
||||
"collector": "Pod",
|
||||
"typ": "Kubernetes",
|
||||
"name": "网络接收数据包数",
|
||||
"unit": "",
|
||||
"note": "Pod自身指标\n类型: pod=~\"$pod_name\"}[1m]))",
|
||||
"lang": "zh_CN",
|
||||
"expression": "sum(rate(container_network_receive_packets_total{namespace=\"$namespace\", pod=~\"$pod_name\"}[1m])) by(name, interface)"
|
||||
},
|
||||
{
|
||||
"uuid": 1745893024075864800,
|
||||
"collector": "Pod",
|
||||
"typ": "Kubernetes",
|
||||
"name": "网络接收速率",
|
||||
"unit": "",
|
||||
"note": "Pod自身指标\n类型: pod=~\"$pod_name\"}[1m]))",
|
||||
"lang": "zh_CN",
|
||||
"expression": "sum(rate(container_network_receive_bytes_total{namespace=\"$namespace\", pod=~\"$pod_name\"}[1m])) by(name, interface)"
|
||||
},
|
||||
{
|
||||
"uuid": 1745893024170233300,
|
||||
"collector": "Pod",
|
||||
"typ": "Kubernetes",
|
||||
"name": "网络接收错误数",
|
||||
"unit": "",
|
||||
"note": "Pod自身指标\n类型: pod=~\"$pod_name\"}[1m]))",
|
||||
"lang": "zh_CN",
|
||||
"expression": "sum(rate(container_network_receive_errors_total{namespace=\"$namespace\", pod=~\"$pod_name\"}[1m])) by(name, interface)"
|
||||
}
|
||||
]
|
||||
8
integrations/Linux/collect/ntp/ntp.toml
Normal file
8
integrations/Linux/collect/ntp/ntp.toml
Normal file
@@ -0,0 +1,8 @@
|
||||
# # collect interval
|
||||
# interval = 15
|
||||
|
||||
# # ntp servers
|
||||
# ntp_servers = ["ntp.aliyun.com"]
|
||||
|
||||
# # response time out seconds
|
||||
# timeout = 5
|
||||
@@ -53,4 +53,8 @@ nr_alloc_batch = 0
|
||||
|
||||
## arp_package
|
||||
|
||||
统计 ARP 包的数量,该插件依赖 cgo,如果需要该插件需要下载 `with-cgo` 的 categraf 发布包。
|
||||
统计 ARP 包的数量,该插件依赖 cgo,如果需要该插件需要下载 `with-cgo` 的 categraf 发布包。
|
||||
|
||||
## ntp
|
||||
|
||||
监控机器时间偏移量,只需要给出 ntp 服务端地址,Categraf 就会周期性去请求,对比本机时间,得到偏移量,监控指标是 ntp_offset_ms 顾名思义,单位是毫秒,一般这个值不能超过 1000
|
||||
|
||||
140
memsto/event_processor_cache.go
Normal file
140
memsto/event_processor_cache.go
Normal file
@@ -0,0 +1,140 @@
|
||||
package memsto
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/dumper"
|
||||
"github.com/ccfos/nightingale/v6/models"
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/toolkits/pkg/logger"
|
||||
)
|
||||
|
||||
type EventProcessorCacheType struct {
|
||||
statTotal int64
|
||||
statLastUpdated int64
|
||||
ctx *ctx.Context
|
||||
stats *Stats
|
||||
|
||||
sync.RWMutex
|
||||
processors map[int64]*models.EventPipeline // key: pipeline id
|
||||
}
|
||||
|
||||
func NewEventProcessorCache(ctx *ctx.Context, stats *Stats) *EventProcessorCacheType {
|
||||
epc := &EventProcessorCacheType{
|
||||
statTotal: -1,
|
||||
statLastUpdated: -1,
|
||||
ctx: ctx,
|
||||
stats: stats,
|
||||
processors: make(map[int64]*models.EventPipeline),
|
||||
}
|
||||
epc.SyncEventProcessors()
|
||||
return epc
|
||||
}
|
||||
|
||||
func (epc *EventProcessorCacheType) Reset() {
|
||||
epc.Lock()
|
||||
defer epc.Unlock()
|
||||
|
||||
epc.statTotal = -1
|
||||
epc.statLastUpdated = -1
|
||||
epc.processors = make(map[int64]*models.EventPipeline)
|
||||
}
|
||||
|
||||
func (epc *EventProcessorCacheType) StatChanged(total, lastUpdated int64) bool {
|
||||
if epc.statTotal == total && epc.statLastUpdated == lastUpdated {
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func (epc *EventProcessorCacheType) Set(m map[int64]*models.EventPipeline, total, lastUpdated int64) {
|
||||
epc.Lock()
|
||||
epc.processors = m
|
||||
epc.Unlock()
|
||||
|
||||
// only one goroutine used, so no need lock
|
||||
epc.statTotal = total
|
||||
epc.statLastUpdated = lastUpdated
|
||||
}
|
||||
|
||||
func (epc *EventProcessorCacheType) Get(processorId int64) *models.EventPipeline {
|
||||
epc.RLock()
|
||||
defer epc.RUnlock()
|
||||
return epc.processors[processorId]
|
||||
}
|
||||
|
||||
func (epc *EventProcessorCacheType) GetProcessorIds() []int64 {
|
||||
epc.RLock()
|
||||
defer epc.RUnlock()
|
||||
|
||||
count := len(epc.processors)
|
||||
list := make([]int64, 0, count)
|
||||
for processorId := range epc.processors {
|
||||
list = append(list, processorId)
|
||||
}
|
||||
|
||||
return list
|
||||
}
|
||||
|
||||
func (epc *EventProcessorCacheType) SyncEventProcessors() {
|
||||
err := epc.syncEventProcessors()
|
||||
if err != nil {
|
||||
fmt.Println("failed to sync event processors:", err)
|
||||
exit(1)
|
||||
}
|
||||
|
||||
go epc.loopSyncEventProcessors()
|
||||
}
|
||||
|
||||
func (epc *EventProcessorCacheType) loopSyncEventProcessors() {
|
||||
duration := time.Duration(9000) * time.Millisecond
|
||||
for {
|
||||
time.Sleep(duration)
|
||||
if err := epc.syncEventProcessors(); err != nil {
|
||||
logger.Warning("failed to sync event processors:", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (epc *EventProcessorCacheType) syncEventProcessors() error {
|
||||
start := time.Now()
|
||||
|
||||
stat, err := models.EventPipelineStatistics(epc.ctx)
|
||||
if err != nil {
|
||||
dumper.PutSyncRecord("event_processors", start.Unix(), -1, -1, "failed to query statistics: "+err.Error())
|
||||
return errors.WithMessage(err, "failed to exec StatisticsGet for EventPipeline")
|
||||
}
|
||||
|
||||
if !epc.StatChanged(stat.Total, stat.LastUpdated) {
|
||||
epc.stats.GaugeCronDuration.WithLabelValues("sync_event_processors").Set(0)
|
||||
epc.stats.GaugeSyncNumber.WithLabelValues("sync_event_processors").Set(0)
|
||||
dumper.PutSyncRecord("event_processors", start.Unix(), -1, -1, "not changed")
|
||||
return nil
|
||||
}
|
||||
|
||||
lst, err := models.ListEventPipelines(epc.ctx)
|
||||
if err != nil {
|
||||
dumper.PutSyncRecord("event_processors", start.Unix(), -1, -1, "failed to query records: "+err.Error())
|
||||
return errors.WithMessage(err, "failed to exec ListEventPipelines")
|
||||
}
|
||||
|
||||
m := make(map[int64]*models.EventPipeline)
|
||||
for i := 0; i < len(lst); i++ {
|
||||
m[lst[i].ID] = lst[i]
|
||||
}
|
||||
|
||||
epc.Set(m, stat.Total, stat.LastUpdated)
|
||||
|
||||
ms := time.Since(start).Milliseconds()
|
||||
epc.stats.GaugeCronDuration.WithLabelValues("sync_event_processors").Set(float64(ms))
|
||||
epc.stats.GaugeSyncNumber.WithLabelValues("sync_event_processors").Set(float64(len(m)))
|
||||
logger.Infof("timer: sync event processors done, cost: %dms, number: %d", ms, len(m))
|
||||
dumper.PutSyncRecord("event_processors", start.Unix(), ms, len(m), "success")
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -41,30 +41,30 @@ type AlertCurEvent struct {
|
||||
CallbacksJSON []string `json:"callbacks" gorm:"-"` // for fe
|
||||
RunbookUrl string `json:"runbook_url"`
|
||||
NotifyRecovered int `json:"notify_recovered"`
|
||||
NotifyChannels string `json:"-"` // for db
|
||||
NotifyChannelsJSON []string `json:"notify_channels" gorm:"-"` // for fe
|
||||
NotifyGroups string `json:"-"` // for db
|
||||
NotifyGroupsJSON []string `json:"notify_groups" gorm:"-"` // for fe
|
||||
NotifyGroupsObj []*UserGroup `json:"notify_groups_obj" gorm:"-"` // for fe
|
||||
NotifyChannels string `json:"-"` // for db
|
||||
NotifyChannelsJSON []string `json:"notify_channels,omitempty" gorm:"-"` // for fe
|
||||
NotifyGroups string `json:"-"` // for db
|
||||
NotifyGroupsJSON []string `json:"notify_groups,omitempty" gorm:"-"` // for fe
|
||||
NotifyGroupsObj []*UserGroup `json:"notify_groups_obj,omitempty" gorm:"-"` // for fe
|
||||
TargetIdent string `json:"target_ident"`
|
||||
TargetNote string `json:"target_note"`
|
||||
TriggerTime int64 `json:"trigger_time"`
|
||||
TriggerValue string `json:"trigger_value"`
|
||||
TriggerValues string `json:"trigger_values" gorm:"-"`
|
||||
TriggerValuesJson EventTriggerValues `json:"trigger_values_json" gorm:"-"`
|
||||
Tags string `json:"-"` // for db
|
||||
TagsJSON []string `json:"tags" gorm:"-"` // for fe
|
||||
TagsMap map[string]string `json:"tags_map" gorm:"-"` // for internal usage
|
||||
OriginalTags string `json:"-"` // for db
|
||||
OriginalTagsJSON []string `json:"original_tags" gorm:"-"` // for fe
|
||||
Annotations string `json:"-"` //
|
||||
AnnotationsJSON map[string]string `json:"annotations" gorm:"-"` // for fe
|
||||
IsRecovered bool `json:"is_recovered" gorm:"-"` // for notify.py
|
||||
NotifyUsersObj []*User `json:"notify_users_obj" gorm:"-"` // for notify.py
|
||||
LastEvalTime int64 `json:"last_eval_time" gorm:"-"` // for notify.py 上次计算的时间
|
||||
LastSentTime int64 `json:"last_sent_time" gorm:"-"` // 上次发送时间
|
||||
NotifyCurNumber int `json:"notify_cur_number"` // notify: current number
|
||||
FirstTriggerTime int64 `json:"first_trigger_time"` // 连续告警的首次告警时间
|
||||
Tags string `json:"-"` // for db
|
||||
TagsJSON []string `json:"tags" gorm:"-"` // for fe
|
||||
TagsMap map[string]string `json:"tags_map" gorm:"-"` // for internal usage
|
||||
OriginalTags string `json:"-"` // for db
|
||||
OriginalTagsJSON []string `json:"original_tags" gorm:"-"` // for fe
|
||||
Annotations string `json:"-"` //
|
||||
AnnotationsJSON map[string]string `json:"annotations" gorm:"-"` // for fe
|
||||
IsRecovered bool `json:"is_recovered" gorm:"-"` // for notify.py
|
||||
NotifyUsersObj []*User `json:"notify_users_obj,omitempty" gorm:"-"` // for notify.py
|
||||
LastEvalTime int64 `json:"last_eval_time" gorm:"-"` // for notify.py 上次计算的时间
|
||||
LastSentTime int64 `json:"last_sent_time" gorm:"-"` // 上次发送时间
|
||||
NotifyCurNumber int `json:"notify_cur_number"` // notify: current number
|
||||
FirstTriggerTime int64 `json:"first_trigger_time"` // 连续告警的首次告警时间
|
||||
ExtraConfig interface{} `json:"extra_config" gorm:"-"`
|
||||
Status int `json:"status" gorm:"-"`
|
||||
Claimant string `json:"claimant" gorm:"-"`
|
||||
@@ -869,3 +869,100 @@ func AlertCurEventStatistics(ctx *ctx.Context, stime time.Time) map[string]inter
|
||||
|
||||
return res
|
||||
}
|
||||
|
||||
func (e *AlertCurEvent) DeepCopy() *AlertCurEvent {
|
||||
eventCopy := *e
|
||||
|
||||
// 复制指针字段
|
||||
if e.NotifyGroupsObj != nil {
|
||||
eventCopy.NotifyGroupsObj = make([]*UserGroup, len(e.NotifyGroupsObj))
|
||||
for i, group := range e.NotifyGroupsObj {
|
||||
if group != nil {
|
||||
groupCopy := *group
|
||||
eventCopy.NotifyGroupsObj[i] = &groupCopy
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if e.NotifyUsersObj != nil {
|
||||
eventCopy.NotifyUsersObj = make([]*User, len(e.NotifyUsersObj))
|
||||
for i, user := range e.NotifyUsersObj {
|
||||
if user != nil {
|
||||
userCopy := *user
|
||||
eventCopy.NotifyUsersObj[i] = &userCopy
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if e.Target != nil {
|
||||
targetCopy := *e.Target
|
||||
eventCopy.Target = &targetCopy
|
||||
}
|
||||
|
||||
// 复制切片字段
|
||||
if e.CallbacksJSON != nil {
|
||||
eventCopy.CallbacksJSON = make([]string, len(e.CallbacksJSON))
|
||||
copy(eventCopy.CallbacksJSON, e.CallbacksJSON)
|
||||
}
|
||||
|
||||
if e.NotifyChannelsJSON != nil {
|
||||
eventCopy.NotifyChannelsJSON = make([]string, len(e.NotifyChannelsJSON))
|
||||
copy(eventCopy.NotifyChannelsJSON, e.NotifyChannelsJSON)
|
||||
}
|
||||
|
||||
if e.NotifyGroupsJSON != nil {
|
||||
eventCopy.NotifyGroupsJSON = make([]string, len(e.NotifyGroupsJSON))
|
||||
copy(eventCopy.NotifyGroupsJSON, e.NotifyGroupsJSON)
|
||||
}
|
||||
|
||||
if e.TagsJSON != nil {
|
||||
eventCopy.TagsJSON = make([]string, len(e.TagsJSON))
|
||||
copy(eventCopy.TagsJSON, e.TagsJSON)
|
||||
}
|
||||
|
||||
if e.TagsMap != nil {
|
||||
eventCopy.TagsMap = make(map[string]string, len(e.TagsMap))
|
||||
for k, v := range e.TagsMap {
|
||||
eventCopy.TagsMap[k] = v
|
||||
}
|
||||
}
|
||||
|
||||
if e.OriginalTagsJSON != nil {
|
||||
eventCopy.OriginalTagsJSON = make([]string, len(e.OriginalTagsJSON))
|
||||
copy(eventCopy.OriginalTagsJSON, e.OriginalTagsJSON)
|
||||
}
|
||||
|
||||
if e.AnnotationsJSON != nil {
|
||||
eventCopy.AnnotationsJSON = make(map[string]string, len(e.AnnotationsJSON))
|
||||
for k, v := range e.AnnotationsJSON {
|
||||
eventCopy.AnnotationsJSON[k] = v
|
||||
}
|
||||
}
|
||||
|
||||
if e.ExtraInfo != nil {
|
||||
eventCopy.ExtraInfo = make([]string, len(e.ExtraInfo))
|
||||
copy(eventCopy.ExtraInfo, e.ExtraInfo)
|
||||
}
|
||||
|
||||
if e.ExtraInfoMap != nil {
|
||||
eventCopy.ExtraInfoMap = make([]map[string]string, len(e.ExtraInfoMap))
|
||||
for i, m := range e.ExtraInfoMap {
|
||||
if m != nil {
|
||||
eventCopy.ExtraInfoMap[i] = make(map[string]string, len(m))
|
||||
for k, v := range m {
|
||||
eventCopy.ExtraInfoMap[i][k] = v
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if e.NotifyRuleIDs != nil {
|
||||
eventCopy.NotifyRuleIDs = make([]int64, len(e.NotifyRuleIDs))
|
||||
copy(eventCopy.NotifyRuleIDs, e.NotifyRuleIDs)
|
||||
}
|
||||
|
||||
eventCopy.RuleConfigJson = e.RuleConfigJson
|
||||
eventCopy.ExtraConfig = e.ExtraConfig
|
||||
|
||||
return &eventCopy
|
||||
}
|
||||
|
||||
@@ -5,8 +5,6 @@ import (
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
"text/template"
|
||||
"text/template/parse"
|
||||
"time"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
@@ -479,19 +477,10 @@ func (ar *AlertRule) Verify() error {
|
||||
return errors.New("name is blank")
|
||||
}
|
||||
|
||||
t, err := template.New("test").Parse(ar.Name)
|
||||
if err != nil {
|
||||
if str.Dangerous(ar.Name) {
|
||||
return errors.New("Name has invalid characters")
|
||||
}
|
||||
|
||||
for _, node := range t.Tree.Root.Nodes {
|
||||
if tn := node.(*parse.TextNode); tn != nil {
|
||||
if str.Dangerous(tn.String()) {
|
||||
return fmt.Errorf("Name has invalid characters: %s", tn.String())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if ar.Prod == "" {
|
||||
ar.Prod = METRIC
|
||||
}
|
||||
|
||||
178
models/event_pipeline.go
Normal file
178
models/event_pipeline.go
Normal file
@@ -0,0 +1,178 @@
|
||||
package models
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
"github.com/ccfos/nightingale/v6/pkg/poster"
|
||||
)
|
||||
|
||||
// EventPipeline 事件Pipeline模型
|
||||
type EventPipeline struct {
|
||||
ID int64 `json:"id" gorm:"primaryKey"`
|
||||
Name string `json:"name" gorm:"type:varchar(128)"`
|
||||
TeamIds []int64 `json:"team_ids" gorm:"type:text;serializer:json"`
|
||||
TeamNames []string `json:"team_names" gorm:"-"`
|
||||
Description string `json:"description" gorm:"type:varchar(255)"`
|
||||
FilterEnable bool `json:"filter_enable" gorm:"type:tinyint(1)"`
|
||||
LabelFilters []TagFilter `json:"label_filters" gorm:"type:text;serializer:json"`
|
||||
AttrFilters []TagFilter `json:"attribute_filters" gorm:"type:text;serializer:json"`
|
||||
Processors []Processor `json:"processors" gorm:"type:text;serializer:json"`
|
||||
CreateAt int64 `json:"create_at" gorm:"type:bigint"`
|
||||
CreateBy string `json:"create_by" gorm:"type:varchar(64)"`
|
||||
UpdateAt int64 `json:"update_at" gorm:"type:bigint"`
|
||||
UpdateBy string `json:"update_by" gorm:"type:varchar(64)"`
|
||||
}
|
||||
|
||||
type Processor struct {
|
||||
Typ string `json:"typ"`
|
||||
Config interface{} `json:"config"`
|
||||
}
|
||||
|
||||
func (e *EventPipeline) TableName() string {
|
||||
return "event_pipeline"
|
||||
}
|
||||
|
||||
func (e *EventPipeline) Verify() error {
|
||||
if e.Name == "" {
|
||||
return errors.New("name cannot be empty")
|
||||
}
|
||||
|
||||
if len(e.TeamIds) == 0 {
|
||||
return errors.New("team_ids cannot be empty")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *EventPipeline) DB2FE() {
|
||||
if e.TeamIds == nil {
|
||||
e.TeamIds = make([]int64, 0)
|
||||
}
|
||||
if e.LabelFilters == nil {
|
||||
e.LabelFilters = make([]TagFilter, 0)
|
||||
}
|
||||
if e.AttrFilters == nil {
|
||||
e.AttrFilters = make([]TagFilter, 0)
|
||||
}
|
||||
if e.Processors == nil {
|
||||
e.Processors = make([]Processor, 0)
|
||||
}
|
||||
}
|
||||
|
||||
// CreateEventPipeline 创建事件Pipeline
|
||||
func CreateEventPipeline(ctx *ctx.Context, pipeline *EventPipeline) error {
|
||||
return DB(ctx).Create(pipeline).Error
|
||||
}
|
||||
|
||||
// GetEventPipeline 获取单个事件Pipeline
|
||||
func GetEventPipeline(ctx *ctx.Context, id int64) (*EventPipeline, error) {
|
||||
var pipeline EventPipeline
|
||||
err := DB(ctx).Where("id = ?", id).First(&pipeline).Error
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
pipeline.DB2FE()
|
||||
return &pipeline, nil
|
||||
}
|
||||
|
||||
func GetEventPipelinesByIds(ctx *ctx.Context, ids []int64) ([]*EventPipeline, error) {
|
||||
var pipelines []*EventPipeline
|
||||
err := DB(ctx).Where("id in ?", ids).Find(&pipelines).Error
|
||||
return pipelines, err
|
||||
}
|
||||
|
||||
// UpdateEventPipeline persists the given pipeline in full
// (gorm Save writes every field of the record).
func UpdateEventPipeline(ctx *ctx.Context, pipeline *EventPipeline) error {
	return DB(ctx).Save(pipeline).Error
}
|
||||
|
||||
// DeleteEventPipeline removes the event pipeline with the given
// primary key. Deleting a nonexistent id is not an error.
func DeleteEventPipeline(ctx *ctx.Context, id int64) error {
	return DB(ctx).Delete(&EventPipeline{}, id).Error
}
|
||||
|
||||
// ListEventPipelines 获取事件Pipeline列表
|
||||
func ListEventPipelines(ctx *ctx.Context) ([]*EventPipeline, error) {
|
||||
if !ctx.IsCenter {
|
||||
pipelines, err := poster.GetByUrls[[]*EventPipeline](ctx, "/v1/n9e/event-pipelines")
|
||||
return pipelines, err
|
||||
}
|
||||
|
||||
var pipelines []*EventPipeline
|
||||
err := DB(ctx).Order("name desc").Find(&pipelines).Error
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, p := range pipelines {
|
||||
p.DB2FE()
|
||||
}
|
||||
|
||||
return pipelines, nil
|
||||
}
|
||||
|
||||
// DeleteEventPipelines 批量删除事件Pipeline
|
||||
func DeleteEventPipelines(ctx *ctx.Context, ids []int64) error {
|
||||
return DB(ctx).Where("id in ?", ids).Delete(&EventPipeline{}).Error
|
||||
}
|
||||
|
||||
// Update 更新事件Pipeline
|
||||
func (e *EventPipeline) Update(ctx *ctx.Context, ref *EventPipeline) error {
|
||||
ref.ID = e.ID
|
||||
ref.CreateAt = e.CreateAt
|
||||
ref.CreateBy = e.CreateBy
|
||||
ref.UpdateAt = time.Now().Unix()
|
||||
|
||||
err := ref.Verify()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return DB(ctx).Model(e).Select("*").Updates(*ref).Error
|
||||
}
|
||||
|
||||
// FillTeamNames 填充团队名称
|
||||
func (e *EventPipeline) FillTeamNames(ctx *ctx.Context) error {
|
||||
e.TeamNames = make([]string, 0, len(e.TeamIds))
|
||||
if len(e.TeamIds) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
teamMap, err := UserGroupIdAndNameMap(ctx, e.TeamIds)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// 按原始TeamIds顺序填充TeamNames
|
||||
for _, tid := range e.TeamIds {
|
||||
if name, exists := teamMap[tid]; exists {
|
||||
e.TeamNames = append(e.TeamNames, name)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func EventPipelineStatistics(ctx *ctx.Context) (*Statistics, error) {
|
||||
if !ctx.IsCenter {
|
||||
s, err := poster.GetByUrls[*Statistics](ctx, "/v1/n9e/statistic?name=event_pipeline")
|
||||
return s, err
|
||||
}
|
||||
|
||||
session := DB(ctx).Model(&EventPipeline{}).Select("count(*) as total", "max(update_at) as last_updated")
|
||||
|
||||
var stats []*Statistics
|
||||
err := session.Find(&stats).Error
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(stats) == 0 {
|
||||
return nil, fmt.Errorf("no event pipeline found")
|
||||
}
|
||||
|
||||
return stats[0], nil
|
||||
}
|
||||
@@ -67,7 +67,8 @@ func MigrateTables(db *gorm.DB) error {
|
||||
&TaskRecord{}, &ChartShare{}, &Target{}, &Configs{}, &Datasource{}, &NotifyTpl{},
|
||||
&Board{}, &BoardBusigroup{}, &Users{}, &SsoConfig{}, &models.BuiltinMetric{},
|
||||
&models.MetricFilter{}, &models.NotificaitonRecord{}, &models.TargetBusiGroup{},
|
||||
&models.UserToken{}, &models.DashAnnotation{}, MessageTemplate{}, NotifyRule{}, NotifyChannelConfig{}, &EsIndexPatternMigrate{}}
|
||||
&models.UserToken{}, &models.DashAnnotation{}, MessageTemplate{}, NotifyRule{}, NotifyChannelConfig{}, &EsIndexPatternMigrate{},
|
||||
&models.EventPipeline{}}
|
||||
|
||||
if isPostgres(db) {
|
||||
dts = append(dts, &models.PostgresBuiltinComponent{})
|
||||
@@ -227,6 +228,26 @@ func InsertPermPoints(db *gorm.DB) {
|
||||
Operation: "/notification-rules/del",
|
||||
})
|
||||
|
||||
ops = append(ops, models.RoleOperation{
|
||||
RoleName: "Standard",
|
||||
Operation: "/event-pipelines",
|
||||
})
|
||||
|
||||
ops = append(ops, models.RoleOperation{
|
||||
RoleName: "Standard",
|
||||
Operation: "/event-pipelines/add",
|
||||
})
|
||||
|
||||
ops = append(ops, models.RoleOperation{
|
||||
RoleName: "Standard",
|
||||
Operation: "/event-pipelines/put",
|
||||
})
|
||||
|
||||
ops = append(ops, models.RoleOperation{
|
||||
RoleName: "Standard",
|
||||
Operation: "/event-pipelines/del",
|
||||
})
|
||||
|
||||
for _, op := range ops {
|
||||
var count int64
|
||||
|
||||
@@ -414,16 +435,17 @@ func (t *MessageTemplate) TableName() string {
|
||||
}
|
||||
|
||||
type NotifyRule struct {
|
||||
ID int64 `gorm:"column:id;primaryKey;autoIncrement"`
|
||||
Name string `gorm:"column:name;type:varchar(255);not null"`
|
||||
Description string `gorm:"column:description;type:text"`
|
||||
Enable bool `gorm:"column:enable;not null;default:false"`
|
||||
UserGroupIds []int64 `gorm:"column:user_group_ids;type:varchar(255)"`
|
||||
NotifyConfigs []models.NotifyConfig `gorm:"column:notify_configs;type:text"`
|
||||
CreateAt int64 `gorm:"column:create_at;not null;default:0"`
|
||||
CreateBy string `gorm:"column:create_by;type:varchar(64);not null;default:''"`
|
||||
UpdateAt int64 `gorm:"column:update_at;not null;default:0"`
|
||||
UpdateBy string `gorm:"column:update_by;type:varchar(64);not null;default:''"`
|
||||
ID int64 `gorm:"column:id;primaryKey;autoIncrement"`
|
||||
Name string `gorm:"column:name;type:varchar(255);not null"`
|
||||
Description string `gorm:"column:description;type:text"`
|
||||
Enable bool `gorm:"column:enable;not null;default:false"`
|
||||
UserGroupIds []int64 `gorm:"column:user_group_ids;type:varchar(255)"`
|
||||
NotifyConfigs []models.NotifyConfig `gorm:"column:notify_configs;type:text"`
|
||||
PipelineConfigs []models.PipelineConfig `gorm:"column:pipeline_configs;type:text"`
|
||||
CreateAt int64 `gorm:"column:create_at;not null;default:0"`
|
||||
CreateBy string `gorm:"column:create_by;type:varchar(64);not null;default:''"`
|
||||
UpdateAt int64 `gorm:"column:update_at;not null;default:0"`
|
||||
UpdateBy string `gorm:"column:update_by;type:varchar(64);not null;default:''"`
|
||||
}
|
||||
|
||||
func (r *NotifyRule) TableName() string {
|
||||
|
||||
@@ -388,7 +388,10 @@ func (ncc *NotifyChannelConfig) SendFlashDuty(events []*AlertCurEvent, flashDuty
|
||||
|
||||
// 设置 URL 参数
|
||||
query := req.URL.Query()
|
||||
query.Add("channel_id", strconv.FormatInt(flashDutyChannelID, 10))
|
||||
if flashDutyChannelID != 0 {
|
||||
// 如果 flashduty 有配置协作空间(channel_id),则传入 channel_id 参数
|
||||
query.Add("channel_id", strconv.FormatInt(flashDutyChannelID, 10))
|
||||
}
|
||||
req.URL.RawQuery = query.Encode()
|
||||
req.Header.Add("Content-Type", "application/json")
|
||||
|
||||
|
||||
@@ -15,6 +15,8 @@ type NotifyRule struct {
|
||||
Enable bool `json:"enable"` // 启用状态
|
||||
UserGroupIds []int64 `json:"user_group_ids" gorm:"serializer:json"` // 告警组ID
|
||||
|
||||
PipelineConfigs []PipelineConfig `json:"pipeline_configs" gorm:"serializer:json"`
|
||||
|
||||
// 通知配置
|
||||
NotifyConfigs []NotifyConfig `json:"notify_configs" gorm:"serializer:json"`
|
||||
|
||||
@@ -24,6 +26,11 @@ type NotifyRule struct {
|
||||
UpdateBy string `json:"update_by"`
|
||||
}
|
||||
|
||||
// PipelineConfig associates an event pipeline with a notify rule:
// PipelineId identifies the pipeline and Enable records whether the
// association is currently active.
type PipelineConfig struct {
	PipelineId int64 `json:"pipeline_id"`
	Enable     bool  `json:"enable"`
}
|
||||
|
||||
// TableName tells gorm which table backs NotifyRule.
func (r *NotifyRule) TableName() string {
	return "notify_rule"
}
|
||||
|
||||
@@ -121,6 +121,25 @@ func (u *User) IsAdmin() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// has group permission
|
||||
func (u *User) CheckGroupPermission(ctx *ctx.Context, groupIds []int64) error {
|
||||
if !u.IsAdmin() {
|
||||
ids, err := MyGroupIdsMap(ctx, u.Id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, id := range groupIds {
|
||||
if _, ok := ids[id]; ok {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
return errors.New("no permission")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (u *User) Verify() error {
|
||||
u.Username = strings.TrimSpace(u.Username)
|
||||
|
||||
|
||||
@@ -113,6 +113,19 @@ func UserGroupGetByIds(ctx *ctx.Context, ids []int64) ([]UserGroup, error) {
|
||||
return lst, err
|
||||
}
|
||||
|
||||
func UserGroupIdAndNameMap(ctx *ctx.Context, ids []int64) (map[int64]string, error) {
|
||||
lst, err := UserGroupGetByIds(ctx, ids)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
m := make(map[int64]string)
|
||||
for _, ug := range lst {
|
||||
m[ug.Id] = ug.Name
|
||||
}
|
||||
return m, nil
|
||||
}
|
||||
|
||||
func UserGroupGetAll(ctx *ctx.Context) ([]*UserGroup, error) {
|
||||
if !ctx.IsCenter {
|
||||
lst, err := poster.GetByUrls[[]*UserGroup](ctx, "/v1/n9e/user-groups")
|
||||
|
||||
@@ -22,6 +22,20 @@ func MyGroupIds(ctx *ctx.Context, userId int64) ([]int64, error) {
|
||||
return ids, err
|
||||
}
|
||||
|
||||
func MyGroupIdsMap(ctx *ctx.Context, userId int64) (map[int64]struct{}, error) {
|
||||
ids, err := MyGroupIds(ctx, userId)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
res := make(map[int64]struct{}, len(ids))
|
||||
for _, id := range ids {
|
||||
res[id] = struct{}{}
|
||||
}
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
// my business group ids
|
||||
func MyBusiGroupIds(ctx *ctx.Context, userId int64) ([]int64, error) {
|
||||
groupIds, err := MyGroupIds(ctx, userId)
|
||||
|
||||
@@ -74,6 +74,9 @@ var I18N = `{
|
||||
"Log Analysis": "日志分析",
|
||||
"View Logs": "查看日志",
|
||||
"View Index Patterns": "查看索引模式",
|
||||
"Add Index Pattern": "添加索引模式",
|
||||
"Modify Index Pattern": "修改索引模式",
|
||||
"Delete Index Pattern": "删除索引模式",
|
||||
"Alert Rules": "告警规则",
|
||||
"View Alert Rules": "查看告警规则",
|
||||
"Add Alert Rule": "添加告警规则",
|
||||
@@ -227,6 +230,9 @@ var I18N = `{
|
||||
"Log Analysis": "日志分析",
|
||||
"View Logs": "查看日志",
|
||||
"View Index Patterns": "查看索引模式",
|
||||
"Add Index Pattern": "添加索引模式",
|
||||
"Modify Index Pattern": "修改索引模式",
|
||||
"Delete Index Pattern": "删除索引模式",
|
||||
"Alert Rules": "告警规则",
|
||||
"View Alert Rules": "查看告警规则",
|
||||
"Add Alert Rule": "添加告警规则",
|
||||
@@ -391,6 +397,9 @@ var I18N = `{
|
||||
"Log Analysis": "日誌分析",
|
||||
"View Logs": "查看日誌",
|
||||
"View Index Patterns": "查看索引模式",
|
||||
"Add Index Pattern": "添加索引模式",
|
||||
"Modify Index Pattern": "修改索引模式",
|
||||
"Delete Index Pattern": "刪除索引模式",
|
||||
"Alert Rules": "告警規則",
|
||||
"View Alert Rules": "查看告警規則",
|
||||
"Add Alert Rule": "添加告警規則",
|
||||
@@ -545,6 +554,9 @@ var I18N = `{
|
||||
"Log Analysis": "ログ分析",
|
||||
"View Logs": "ログの表示",
|
||||
"View Index Patterns": "インデックスパターンの表示",
|
||||
"Add Index Pattern": "インデックスパターンの追加",
|
||||
"Modify Index Pattern": "インデックスパターンの修正",
|
||||
"Delete Index Pattern": "インデックスパターンの削除",
|
||||
"Alert Rules": "アラートルール",
|
||||
"View Alert Rules": "アラートルールの表示",
|
||||
"Add Alert Rule": "アラートルールの追加",
|
||||
|
||||
Reference in New Issue
Block a user