Compare commits

...

13 Commits

Author SHA1 Message Date
ning
445b6b2e41 refactor: build event tags 2025-05-28 16:10:22 +08:00
ning
fcf07df35f refactor: datasource init add recover 2025-05-28 14:13:54 +08:00
ning
196e5241bf refactor: handle ibex 2025-05-28 13:13:09 +08:00
ning
0d7ee0cfd9 fix: ibex after event relabel 2025-05-28 12:24:48 +08:00
ning
aed6927d7e refactor: update send duty 2025-05-27 15:50:26 +08:00
ning
61b9ebb1e6 Merge branch 'release-16' of github.com:ccfos/nightingale into release-16 2025-05-26 19:57:42 +08:00
ning
da27ca06d6 refactor: update relabel processor 2025-05-26 19:57:00 +08:00
shardingHe
776f100eb9 docs: add config for ntp (#2691) 2025-05-21 16:25:05 +08:00
Yening Qin
ac8443fd66 feat: add event pipeline (#2680) 2025-05-15 20:17:27 +08:00
Yening Qin
751d2e8601 fix: alert rule verify (#2667) 2025-05-13 18:46:55 +08:00
Yening Qin
87855cb184 fix: default ds id update (#2665) 2025-05-13 15:44:52 +08:00
Yening Qin
9ddeefea9b update k8s dashboards (#2662) 2025-05-13 14:56:36 +08:00
Yening Qin
6b566f5a18 add offset, index-pattern-ops, log-query-api (#2656) 2025-05-12 15:38:19 +08:00
36 changed files with 2014 additions and 211 deletions

View File

@@ -92,7 +92,7 @@
## 交流渠道
- 报告Bug优先推荐提交[夜莺GitHub Issue](https://github.com/ccfos/nightingale/issues/new?assignees=&labels=kind%2Fbug&projects=&template=bug_report.yml)
- 推荐完整浏览[夜莺文档站点](https://flashcat.cloud/docs/content/flashcat-monitor/nightingale-v7/introduction/)了解更多信息
- 关注[这个公众号](https://gitlink.org.cn/UlricQin)了解更多夜莺动态和知识
- 加我微信:`picobyte`(我已关闭好友验证)拉入微信群,备注:`夜莺互助群`
## 广受关注

View File

@@ -115,7 +115,9 @@ func Start(alertc aconf.Alert, pushgwc pconf.Pushgw, syncStats *memsto.Stats, al
eval.NewScheduler(alertc, externalProcessors, alertRuleCache, targetCache, targetsOfAlertRulesCache,
busiGroupCache, alertMuteCache, datasourceCache, promClients, naming, ctx, alertStats)
dp := dispatch.NewDispatch(alertRuleCache, userCache, userGroupCache, alertSubscribeCache, targetCache, notifyConfigCache, taskTplsCache, notifyRuleCache, notifyChannelCache, messageTemplateCache, alertc.Alerting, ctx, alertStats)
eventProcessorCache := memsto.NewEventProcessorCache(ctx, syncStats)
dp := dispatch.NewDispatch(alertRuleCache, userCache, userGroupCache, alertSubscribeCache, targetCache, notifyConfigCache, taskTplsCache, notifyRuleCache, notifyChannelCache, messageTemplateCache, eventProcessorCache, alertc.Alerting, ctx, alertStats)
consumer := dispatch.NewConsumer(alertc.Alerting, ctx, dp, promClients)
notifyRecordComsumer := sender.NewNotifyRecordConsumer(ctx)

View File

@@ -15,6 +15,8 @@ import (
"github.com/ccfos/nightingale/v6/alert/aconf"
"github.com/ccfos/nightingale/v6/alert/astats"
"github.com/ccfos/nightingale/v6/alert/common"
"github.com/ccfos/nightingale/v6/alert/pipeline"
"github.com/ccfos/nightingale/v6/alert/pipeline/processor/relabel"
"github.com/ccfos/nightingale/v6/alert/sender"
"github.com/ccfos/nightingale/v6/memsto"
"github.com/ccfos/nightingale/v6/models"
@@ -35,6 +37,7 @@ type Dispatch struct {
notifyRuleCache *memsto.NotifyRuleCacheType
notifyChannelCache *memsto.NotifyChannelCacheType
messageTemplateCache *memsto.MessageTemplateCacheType
eventProcessorCache *memsto.EventProcessorCacheType
alerting aconf.Alerting
@@ -54,7 +57,7 @@ type Dispatch struct {
func NewDispatch(alertRuleCache *memsto.AlertRuleCacheType, userCache *memsto.UserCacheType, userGroupCache *memsto.UserGroupCacheType,
alertSubscribeCache *memsto.AlertSubscribeCacheType, targetCache *memsto.TargetCacheType, notifyConfigCache *memsto.NotifyConfigCacheType,
taskTplsCache *memsto.TaskTplCache, notifyRuleCache *memsto.NotifyRuleCacheType, notifyChannelCache *memsto.NotifyChannelCacheType,
messageTemplateCache *memsto.MessageTemplateCacheType, alerting aconf.Alerting, ctx *ctx.Context, astats *astats.Stats) *Dispatch {
messageTemplateCache *memsto.MessageTemplateCacheType, eventProcessorCache *memsto.EventProcessorCacheType, alerting aconf.Alerting, ctx *ctx.Context, astats *astats.Stats) *Dispatch {
notify := &Dispatch{
alertRuleCache: alertRuleCache,
userCache: userCache,
@@ -66,6 +69,7 @@ func NewDispatch(alertRuleCache *memsto.AlertRuleCacheType, userCache *memsto.Us
notifyRuleCache: notifyRuleCache,
notifyChannelCache: notifyChannelCache,
messageTemplateCache: messageTemplateCache,
eventProcessorCache: eventProcessorCache,
alerting: alerting,
@@ -77,6 +81,9 @@ func NewDispatch(alertRuleCache *memsto.AlertRuleCacheType, userCache *memsto.Us
ctx: ctx,
Astats: astats,
}
relabel.Init()
return notify
}
@@ -141,11 +148,14 @@ func (e *Dispatch) reloadTpls() error {
return nil
}
func (e *Dispatch) HandleEventWithNotifyRule(event *models.AlertCurEvent, isSubscribe bool) {
func (e *Dispatch) HandleEventWithNotifyRule(eventOrigin *models.AlertCurEvent) {
if len(event.NotifyRuleIDs) > 0 {
for _, notifyRuleId := range event.NotifyRuleIDs {
logger.Infof("notify rule ids: %v, event: %+v", notifyRuleId, event)
if len(eventOrigin.NotifyRuleIDs) > 0 {
for _, notifyRuleId := range eventOrigin.NotifyRuleIDs {
// 深拷贝新的 event,避免并发修改 event 冲突
eventCopy := eventOrigin.DeepCopy()
logger.Infof("notify rule ids: %v, event: %+v", notifyRuleId, eventCopy)
notifyRule := e.notifyRuleCache.Get(notifyRuleId)
if notifyRule == nil {
continue
@@ -155,33 +165,110 @@ func (e *Dispatch) HandleEventWithNotifyRule(event *models.AlertCurEvent, isSubs
continue
}
var processors []pipeline.Processor
for _, pipelineConfig := range notifyRule.PipelineConfigs {
if !pipelineConfig.Enable {
continue
}
eventPipeline := e.eventProcessorCache.Get(pipelineConfig.PipelineId)
if eventPipeline == nil {
logger.Warningf("notify_id: %d, event:%+v, processor not found", notifyRuleId, eventCopy)
continue
}
if !pipelineApplicable(eventPipeline, eventCopy) {
logger.Debugf("notify_id: %d, event:%+v, pipeline_id: %d, not applicable", notifyRuleId, eventCopy, pipelineConfig.PipelineId)
continue
}
for _, p := range eventPipeline.Processors {
processor, err := pipeline.GetProcessorByType(p.Typ, p.Config)
if err != nil {
logger.Warningf("notify_id: %d, event:%+v, processor:%+v type not found", notifyRuleId, eventCopy, p)
continue
}
processors = append(processors, processor)
}
}
for _, processor := range processors {
logger.Infof("before processor notify_id: %d, event:%+v, processor:%+v", notifyRuleId, eventCopy, processor)
processor.Process(e.ctx, eventCopy)
logger.Infof("after processor notify_id: %d, event:%+v, processor:%+v", notifyRuleId, eventCopy, processor)
if eventCopy == nil {
logger.Warningf("notify_id: %d, event:%+v, processor:%+v, event is nil", notifyRuleId, eventCopy, processor)
break
}
}
// notify
for i := range notifyRule.NotifyConfigs {
if !NotifyRuleApplicable(&notifyRule.NotifyConfigs[i], event) {
if !NotifyRuleApplicable(&notifyRule.NotifyConfigs[i], eventCopy) {
continue
}
notifyChannel := e.notifyChannelCache.Get(notifyRule.NotifyConfigs[i].ChannelID)
messageTemplate := e.messageTemplateCache.Get(notifyRule.NotifyConfigs[i].TemplateID)
if notifyChannel == nil {
sender.NotifyRecord(e.ctx, []*models.AlertCurEvent{event}, notifyRuleId, fmt.Sprintf("notify_channel_id:%d", notifyRule.NotifyConfigs[i].ChannelID), "", "", errors.New("notify_channel not found"))
logger.Warningf("notify_id: %d, event:%+v, channel_id:%d, template_id: %d, notify_channel not found", notifyRuleId, event, notifyRule.NotifyConfigs[i].ChannelID, notifyRule.NotifyConfigs[i].TemplateID)
sender.NotifyRecord(e.ctx, []*models.AlertCurEvent{eventCopy}, notifyRuleId, fmt.Sprintf("notify_channel_id:%d", notifyRule.NotifyConfigs[i].ChannelID), "", "", errors.New("notify_channel not found"))
logger.Warningf("notify_id: %d, event:%+v, channel_id:%d, template_id: %d, notify_channel not found", notifyRuleId, eventCopy, notifyRule.NotifyConfigs[i].ChannelID, notifyRule.NotifyConfigs[i].TemplateID)
continue
}
if notifyChannel.RequestType != "flashduty" && messageTemplate == nil {
logger.Warningf("notify_id: %d, channel_name: %v, event:%+v, template_id: %d, message_template not found", notifyRuleId, notifyChannel.Ident, event, notifyRule.NotifyConfigs[i].TemplateID)
sender.NotifyRecord(e.ctx, []*models.AlertCurEvent{event}, notifyRuleId, notifyChannel.Name, "", "", errors.New("message_template not found"))
logger.Warningf("notify_id: %d, channel_name: %v, event:%+v, template_id: %d, message_template not found", notifyRuleId, notifyChannel.Ident, eventCopy, notifyRule.NotifyConfigs[i].TemplateID)
sender.NotifyRecord(e.ctx, []*models.AlertCurEvent{eventCopy}, notifyRuleId, notifyChannel.Name, "", "", errors.New("message_template not found"))
continue
}
// todo go send
// todo 聚合 event
go e.sendV2([]*models.AlertCurEvent{event}, notifyRuleId, &notifyRule.NotifyConfigs[i], notifyChannel, messageTemplate)
go e.sendV2([]*models.AlertCurEvent{eventCopy}, notifyRuleId, &notifyRule.NotifyConfigs[i], notifyChannel, messageTemplate)
}
}
}
}
// pipelineApplicable reports whether eventPipeline's filters allow it to be
// applied to event. A nil pipeline, or one with filtering disabled, always
// applies. Both the label filters (matched against TagsMap) and the attribute
// filters (matched against JsonTagsAndValue) must match.
func pipelineApplicable(pipeline *models.EventPipeline, event *models.AlertCurEvent) bool {
	if pipeline == nil {
		return true
	}

	if !pipeline.FilterEnable {
		return true
	}

	tagMatch := true
	if len(pipeline.LabelFilters) > 0 {
		// Clone the filters before normalizing them: the pipeline comes from a
		// shared cache, and mutating its slice elements in place would race
		// with (and leak into) concurrent callers.
		labelFilters := append(pipeline.LabelFilters[:0:0], pipeline.LabelFilters...)
		for i := range labelFilters {
			if labelFilters[i].Func == "" {
				// Fall back to the legacy Op field when Func is unset.
				labelFilters[i].Func = labelFilters[i].Op
			}
		}

		tagFilters, err := models.ParseTagFilter(labelFilters)
		if err != nil {
			logger.Errorf("pipeline applicable failed to parse tag filter: %v event:%+v pipeline:%+v", err, event, pipeline)
			return false
		}
		tagMatch = common.MatchTags(event.TagsMap, tagFilters)
	}

	attributesMatch := true
	if len(pipeline.AttrFilters) > 0 {
		tagFilters, err := models.ParseTagFilter(pipeline.AttrFilters)
		if err != nil {
			// Log the actual error (the original passed tagFilters, which is
			// nil on failure, into the error slot of the format string).
			logger.Errorf("pipeline applicable failed to parse attr filter: %v event:%+v pipeline:%+v", err, event, pipeline)
			return false
		}
		attributesMatch = common.MatchTags(event.JsonTagsAndValue(), tagFilters)
	}

	return tagMatch && attributesMatch
}
func NotifyRuleApplicable(notifyConfig *models.NotifyConfig, event *models.AlertCurEvent) bool {
tm := time.Unix(event.TriggerTime, 0)
triggerTime := tm.Format("15:04")
@@ -359,6 +446,10 @@ func (e *Dispatch) sendV2(events []*models.AlertCurEvent, notifyRuleId int64, no
switch notifyChannel.RequestType {
case "flashduty":
if len(flashDutyChannelIDs) == 0 {
flashDutyChannelIDs = []int64{0} // 如果 flashduty 通道没有配置,则使用 0, 给 SendFlashDuty 判断使用, 不给 flashduty 传 channel_id 参数
}
for i := range flashDutyChannelIDs {
respBody, err := notifyChannel.SendFlashDuty(events, flashDutyChannelIDs[i], e.notifyChannelCache.GetHttpClient(notifyChannel.ID))
logger.Infof("notify_id: %d, channel_name: %v, event:%+v, IntegrationUrl: %v dutychannel_id: %v, respBody: %v, err: %v", notifyRuleId, notifyChannel.Name, events[0], notifyChannel.RequestConfig.FlashDutyRequestConfig.IntegrationUrl, flashDutyChannelIDs[i], respBody, err)
@@ -448,8 +539,7 @@ func (e *Dispatch) HandleEventNotify(event *models.AlertCurEvent, isSubscribe bo
notifyTarget.AndMerge(handler(rule, event, notifyTarget, e))
}
// 处理事件发送,这里用一个goroutine处理一个event的所有发送事件
go e.HandleEventWithNotifyRule(event, isSubscribe)
go e.HandleEventWithNotifyRule(event)
go e.Send(rule, event, notifyTarget, isSubscribe)
// 如果不是订阅规则产生的event, 则需要处理订阅规则的event
@@ -646,6 +736,11 @@ func (e *Dispatch) HandleIbex(rule *models.AlertRule, event *models.AlertCurEven
}
json.Unmarshal([]byte(rule.RuleConfig), &ruleConfig)
if event.IsRecovered {
// 恢复事件不需要走故障自愈的逻辑
return
}
for _, t := range ruleConfig.TaskTpls {
if t.TplId == 0 {
continue

View File

@@ -0,0 +1,12 @@
package pipeline
import (
"github.com/ccfos/nightingale/v6/models"
"github.com/ccfos/nightingale/v6/pkg/ctx"
)
// Pipeline runs each processor in order against the given event.
func Pipeline(ctx *ctx.Context, event *models.AlertCurEvent, processors []Processor) {
	for i := range processors {
		processors[i].Process(ctx, event)
	}
}

View File

@@ -0,0 +1,52 @@
package pipeline
import (
"fmt"
"strings"
"github.com/ccfos/nightingale/v6/models"
"github.com/ccfos/nightingale/v6/pkg/ctx"
)
// Processor is the interface implemented by every event-processor type.
type Processor interface {
	Init(settings interface{}) (Processor, error)          // build a new configured instance from raw settings
	Process(ctx *ctx.Context, event *models.AlertCurEvent) // apply this processor to an alert event in place
}

// NewProcessorFn is the constructor signature stored in the registry for
// each processor type.
type NewProcessorFn func(settings interface{}) (Processor, error)

// processorRegister maps a processor type name to its constructor.
var processorRegister = map[string]NewProcessorFn{}

// // ProcessorTypes 存储所有支持的处理器类型
// var Processors map[int64]models.Processor

// func init() {
// 	Processors = make(map[int64]models.Processor)
// }
// RegisterProcessor records the constructor for a processor type.
// The first registration for a given type wins; later ones are ignored.
func RegisterProcessor(typ string, p Processor) {
	if _, exists := processorRegister[typ]; !exists {
		processorRegister[typ] = p.Init
	}
}
// GetProcessorByType builds a processor instance of the given (whitespace
// trimmed) type, configured with settings. Unknown types yield an error.
func GetProcessorByType(typ string, settings interface{}) (Processor, error) {
	typ = strings.TrimSpace(typ)

	fn, found := processorRegister[typ]
	if !found {
		return nil, fmt.Errorf("processor type %s not found", typ)
	}

	return fn(settings)
}

View File

@@ -0,0 +1,124 @@
package relabel
import (
"encoding/json"
"fmt"
"regexp"
"strings"
"github.com/ccfos/nightingale/v6/alert/pipeline"
"github.com/ccfos/nightingale/v6/models"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/pushgw/pconf"
"github.com/ccfos/nightingale/v6/pushgw/writer"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/prompb"
)
// RelabelConfig
type RelabelConfig struct {
SourceLabels []string `json:"source_labels"`
Separator string `json:"separator"`
Regex string `json:"regex"`
RegexCompiled *regexp.Regexp
If string `json:"if"`
IfRegex *regexp.Regexp
Modulus uint64 `json:"modulus"`
TargetLabel string `json:"target_label"`
Replacement string `json:"replacement"`
Action string `json:"action"`
}
// Init is a no-op entry point. Calling it from another package forces this
// package to be imported, which runs init() below and registers the
// "relabel" processor.
func Init() {
}

func init() {
	// Register the relabel processor type with the pipeline registry.
	pipeline.RegisterProcessor("relabel", &RelabelConfig{})
}
// Init builds a new RelabelConfig from raw settings (typically a
// map[string]interface{} decoded from a pipeline definition) by
// round-tripping through JSON. It implements pipeline.Processor.
func (r *RelabelConfig) Init(settings interface{}) (pipeline.Processor, error) {
	b, err := json.Marshal(settings)
	if err != nil {
		return nil, err
	}

	newProcessor := &RelabelConfig{}
	// Unmarshal into the pointer itself; the original passed &newProcessor,
	// a **RelabelConfig, which json tolerates but is needlessly indirect.
	if err := json.Unmarshal(b, newProcessor); err != nil {
		return nil, err
	}

	return newProcessor, nil
}
const (
	// REPLACE_DOT temporarily substitutes "." in label names while relabel
	// rules run — presumably because Prometheus label names disallow dots
	// (TODO confirm) — and is reverted after processing in EventRelabel.
	REPLACE_DOT = "___"
)
// Process applies this relabel rule to event's tags. It adapts the
// processor's own config into a pushgw pconf.RelabelConfig and delegates to
// EventRelabel. Dots in source label names are swapped with REPLACE_DOT
// first; EventRelabel restores them after the rules run.
func (r *RelabelConfig) Process(ctx *ctx.Context, event *models.AlertCurEvent) {
	sourceLabels := make([]model.LabelName, len(r.SourceLabels))
	for i := range r.SourceLabels {
		sourceLabels[i] = model.LabelName(strings.ReplaceAll(r.SourceLabels[i], ".", REPLACE_DOT))
	}

	relabelConfigs := []*pconf.RelabelConfig{
		{
			SourceLabels:  sourceLabels,
			Separator:     r.Separator,
			Regex:         r.Regex,
			RegexCompiled: r.RegexCompiled,
			If:            r.If,
			IfRegex:       r.IfRegex,
			Modulus:       r.Modulus,
			TargetLabel:   r.TargetLabel,
			Replacement:   r.Replacement,
			Action:        r.Action,
		},
	}

	EventRelabel(event, relabelConfigs)
}
// EventRelabel applies Prometheus-style relabel configs to an alert event's
// tags, rewriting TagsJSON, TagsMap and Tags in place. The pre-relabel tags
// are preserved in OriginalTagsJSON. Dots in label names are swapped with
// REPLACE_DOT while the rules run, then restored.
func EventRelabel(event *models.AlertCurEvent, relabelConfigs []*pconf.RelabelConfig) {
	// Collect tags as prompb labels with append, skipping malformed entries.
	// The original make-and-index pattern left zero-valued labels in the
	// slice (and empty strings in OriginalTagsJSON) for tags lacking "=".
	labels := make([]prompb.Label, 0, len(event.TagsJSON))
	event.OriginalTagsJSON = make([]string, len(event.TagsJSON))
	for i, tag := range event.TagsJSON {
		// Record the original tag even when it is malformed.
		event.OriginalTagsJSON[i] = tag

		kv := strings.SplitN(tag, "=", 2)
		if len(kv) != 2 {
			continue
		}

		name := strings.ReplaceAll(kv[0], ".", REPLACE_DOT)
		labels = append(labels, prompb.Label{Name: name, Value: kv[1]})
	}

	// Fill in Prometheus relabel defaults and sanitize dotted source labels.
	for i := 0; i < len(relabelConfigs); i++ {
		if relabelConfigs[i].Replacement == "" {
			relabelConfigs[i].Replacement = "$1"
		}

		if relabelConfigs[i].Separator == "" {
			relabelConfigs[i].Separator = ";"
		}

		if relabelConfigs[i].Regex == "" {
			relabelConfigs[i].Regex = "(.*)"
		}

		for j := range relabelConfigs[i].SourceLabels {
			relabelConfigs[i].SourceLabels[j] = model.LabelName(strings.ReplaceAll(string(relabelConfigs[i].SourceLabels[j]), ".", REPLACE_DOT))
		}
	}

	gotLabels := writer.Process(labels, relabelConfigs...)

	// Rebuild the event tags from the relabeled output, restoring dots.
	event.TagsJSON = make([]string, 0, len(gotLabels))
	event.TagsMap = make(map[string]string, len(gotLabels))
	for _, label := range gotLabels {
		name := strings.ReplaceAll(string(label.Name), REPLACE_DOT, ".")
		event.TagsJSON = append(event.TagsJSON, fmt.Sprintf("%s=%s", name, label.Value))
		event.TagsMap[name] = label.Value
	}

	// ",," is the tag-join separator used elsewhere for event.Tags.
	event.Tags = strings.Join(event.TagsJSON, ",,")
}

View File

@@ -14,14 +14,13 @@ import (
"github.com/ccfos/nightingale/v6/alert/common"
"github.com/ccfos/nightingale/v6/alert/dispatch"
"github.com/ccfos/nightingale/v6/alert/mute"
"github.com/ccfos/nightingale/v6/alert/pipeline/processor/relabel"
"github.com/ccfos/nightingale/v6/alert/queue"
"github.com/ccfos/nightingale/v6/memsto"
"github.com/ccfos/nightingale/v6/models"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/pkg/tplx"
"github.com/ccfos/nightingale/v6/pushgw/writer"
"github.com/prometheus/prometheus/prompb"
"github.com/robfig/cron/v3"
"github.com/toolkits/pkg/logger"
"github.com/toolkits/pkg/str"
@@ -61,11 +60,9 @@ type Processor struct {
pendingsUseByRecover *AlertCurEventMap
inhibit bool
tagsMap map[string]string
tagsArr []string
target string
targetNote string
groupName string
tagsMap map[string]string
tagsArr []string
groupName string
alertRuleCache *memsto.AlertRuleCacheType
TargetCache *memsto.TargetCacheType
@@ -196,7 +193,7 @@ func (p *Processor) Handle(anomalyPoints []models.AnomalyPoint, from string, inh
func (p *Processor) BuildEvent(anomalyPoint models.AnomalyPoint, from string, now int64, ruleHash string) *models.AlertCurEvent {
p.fillTags(anomalyPoint)
p.mayHandleIdent()
hash := Hash(p.rule.Id, p.datasourceId, anomalyPoint)
ds := p.datasourceCache.GetById(p.datasourceId)
var dsName string
@@ -216,8 +213,6 @@ func (p *Processor) BuildEvent(anomalyPoint models.AnomalyPoint, from string, no
event.DatasourceId = p.datasourceId
event.Cluster = dsName
event.Hash = hash
event.TargetIdent = p.target
event.TargetNote = p.targetNote
event.TriggerValue = anomalyPoint.ReadableValue()
event.TriggerValues = anomalyPoint.Values
event.TriggerValuesJson = models.EventTriggerValues{ValuesWithUnit: anomalyPoint.ValuesUnit}
@@ -249,15 +244,6 @@ func (p *Processor) BuildEvent(anomalyPoint models.AnomalyPoint, from string, no
logger.Warningf("unmarshal annotations json failed: %v, rule: %d", err, p.rule.Id)
}
if p.target != "" {
if pt, exist := p.TargetCache.Get(p.target); exist {
pt.GroupNames = p.BusiGroupCache.GetNamesByBusiGroupIds(pt.GroupIds)
event.Target = pt
} else {
logger.Infof("Target[ident: %s] doesn't exist in cache.", p.target)
}
}
if event.TriggerValues != "" && strings.Count(event.TriggerValues, "$") > 1 {
// TriggerValues 有多个变量,将多个变量都放到 TriggerValue 中
event.TriggerValue = event.TriggerValues
@@ -271,6 +257,19 @@ func (p *Processor) BuildEvent(anomalyPoint models.AnomalyPoint, from string, no
// 生成事件之后,立马进程 relabel 处理
Relabel(p.rule, event)
// 放到 Relabel(p.rule, event) 下面,为了处理 relabel 之后,标签里才出现 ident 的情况
p.mayHandleIdent(event)
if event.TargetIdent != "" {
if pt, exist := p.TargetCache.Get(event.TargetIdent); exist {
pt.GroupNames = p.BusiGroupCache.GetNamesByBusiGroupIds(pt.GroupIds)
event.Target = pt
} else {
logger.Infof("fill event target error, ident: %s doesn't exist in cache.", event.TargetIdent)
}
}
return event
}
@@ -279,44 +278,15 @@ func Relabel(rule *models.AlertRule, event *models.AlertCurEvent) {
return
}
if len(rule.EventRelabelConfig) == 0 {
return
}
// need to keep the original label
event.OriginalTags = event.Tags
event.OriginalTagsJSON = make([]string, len(event.TagsJSON))
labels := make([]prompb.Label, len(event.TagsJSON))
for i, tag := range event.TagsJSON {
label := strings.SplitN(tag, "=", 2)
event.OriginalTagsJSON[i] = tag
labels[i] = prompb.Label{Name: label[0], Value: label[1]}
if len(rule.EventRelabelConfig) == 0 {
return
}
for i := 0; i < len(rule.EventRelabelConfig); i++ {
if rule.EventRelabelConfig[i].Replacement == "" {
rule.EventRelabelConfig[i].Replacement = "$1"
}
if rule.EventRelabelConfig[i].Separator == "" {
rule.EventRelabelConfig[i].Separator = ";"
}
if rule.EventRelabelConfig[i].Regex == "" {
rule.EventRelabelConfig[i].Regex = "(.*)"
}
}
// relabel process
relabels := writer.Process(labels, rule.EventRelabelConfig...)
event.TagsJSON = make([]string, len(relabels))
event.TagsMap = make(map[string]string, len(relabels))
for i, label := range relabels {
event.TagsJSON[i] = fmt.Sprintf("%s=%s", label.Name, label.Value)
event.TagsMap[label.Name] = label.Value
}
event.Tags = strings.Join(event.TagsJSON, ",,")
relabel.EventRelabel(event, rule.EventRelabelConfig)
}
func (p *Processor) HandleRecover(alertingKeys map[string]struct{}, now int64, inhibit bool) {
@@ -641,19 +611,19 @@ func (p *Processor) fillTags(anomalyPoint models.AnomalyPoint) {
p.tagsArr = labelMapToArr(tagsMap)
}
func (p *Processor) mayHandleIdent() {
func (p *Processor) mayHandleIdent(event *models.AlertCurEvent) {
// handle ident
if ident, has := p.tagsMap["ident"]; has {
if ident, has := event.TagsMap["ident"]; has {
if target, exists := p.TargetCache.Get(ident); exists {
p.target = target.Ident
p.targetNote = target.Note
event.TargetIdent = target.Ident
event.TargetNote = target.Note
} else {
p.target = ident
p.targetNote = ""
event.TargetIdent = ident
event.TargetNote = ""
}
} else {
p.target = ""
p.targetNote = ""
event.TargetIdent = ""
event.TargetNote = ""
}
}

View File

@@ -30,12 +30,14 @@ type IbexCallBacker struct {
func (c *IbexCallBacker) CallBack(ctx CallBackContext) {
if len(ctx.CallBackURL) == 0 || len(ctx.Events) == 0 {
logger.Warningf("event_callback_ibex: url or events is empty, url: %s, events: %+v", ctx.CallBackURL, ctx.Events)
return
}
event := ctx.Events[0]
if event.IsRecovered {
logger.Infof("event_callback_ibex: event is recovered, event: %+v", event)
return
}
@@ -43,8 +45,9 @@ func (c *IbexCallBacker) CallBack(ctx CallBackContext) {
}
func (c *IbexCallBacker) handleIbex(ctx *ctx.Context, url string, event *models.AlertCurEvent) {
logger.Infof("event_callback_ibex: url: %s, event: %+v", url, event)
if imodels.DB() == nil && ctx.IsCenter {
logger.Warning("event_callback_ibex: db is nil")
logger.Warningf("event_callback_ibex: db is nil, event: %+v", event)
return
}
@@ -63,17 +66,23 @@ func (c *IbexCallBacker) handleIbex(ctx *ctx.Context, url string, event *models.
id, err := strconv.ParseInt(idstr, 10, 64)
if err != nil {
logger.Errorf("event_callback_ibex: failed to parse url: %s", url)
logger.Errorf("event_callback_ibex: failed to parse url: %s event: %+v", url, event)
return
}
if host == "" {
// 用户在callback url中没有传入host就从event中解析
host = event.TargetIdent
if host == "" {
if ident, has := event.TagsMap["ident"]; has {
host = ident
}
}
}
if host == "" {
logger.Error("event_callback_ibex: failed to get host")
logger.Errorf("event_callback_ibex: failed to get host, id: %d, event: %+v", id, event)
return
}
@@ -83,21 +92,23 @@ func (c *IbexCallBacker) handleIbex(ctx *ctx.Context, url string, event *models.
func CallIbex(ctx *ctx.Context, id int64, host string,
taskTplCache *memsto.TaskTplCache, targetCache *memsto.TargetCacheType,
userCache *memsto.UserCacheType, event *models.AlertCurEvent) {
logger.Infof("event_callback_ibex: id: %d, host: %s, event: %+v", id, host, event)
tpl := taskTplCache.Get(id)
if tpl == nil {
logger.Errorf("event_callback_ibex: no such tpl(%d)", id)
logger.Errorf("event_callback_ibex: no such tpl(%d), event: %+v", id, event)
return
}
// check perm
// tpl.GroupId - host - account 三元组校验权限
can, err := canDoIbex(tpl.UpdateBy, tpl, host, targetCache, userCache)
if err != nil {
logger.Errorf("event_callback_ibex: check perm fail: %v", err)
logger.Errorf("event_callback_ibex: check perm fail: %v, event: %+v", err, event)
return
}
if !can {
logger.Errorf("event_callback_ibex: user(%s) no permission", tpl.UpdateBy)
logger.Errorf("event_callback_ibex: user(%s) no permission, event: %+v", tpl.UpdateBy, event)
return
}
@@ -122,7 +133,7 @@ func CallIbex(ctx *ctx.Context, id int64, host string,
tags, err := json.Marshal(tagsMap)
if err != nil {
logger.Errorf("event_callback_ibex: failed to marshal tags to json: %v", tagsMap)
logger.Errorf("event_callback_ibex: failed to marshal tags to json: %v, event: %+v", tagsMap, event)
return
}
@@ -145,7 +156,7 @@ func CallIbex(ctx *ctx.Context, id int64, host string,
id, err = TaskAdd(in, tpl.UpdateBy, ctx.IsCenter)
if err != nil {
logger.Errorf("event_callback_ibex: call ibex fail: %v", err)
logger.Errorf("event_callback_ibex: call ibex fail: %v, event: %+v", err, event)
return
}
@@ -167,7 +178,7 @@ func CallIbex(ctx *ctx.Context, id int64, host string,
}
if err = record.Add(ctx); err != nil {
logger.Errorf("event_callback_ibex: persist task_record fail: %v", err)
logger.Errorf("event_callback_ibex: persist task_record fail: %v, event: %+v", err, event)
}
}
@@ -187,7 +198,7 @@ func canDoIbex(username string, tpl *models.TaskTpl, host string, targetCache *m
func TaskAdd(f models.TaskForm, authUser string, isCenter bool) (int64, error) {
if storage.Cache == nil {
logger.Warning("event_callback_ibex: redis cache is nil")
logger.Warningf("event_callback_ibex: redis cache is nil, task: %+v", f)
return 0, fmt.Errorf("redis cache is nil")
}

View File

@@ -142,6 +142,13 @@ ops:
cname: View Logs
- name: "/log/index-patterns"
cname: View Index Patterns
- name: "/log/index-patterns/add"
cname: Add Index Pattern
- name: "/log/index-patterns/put"
cname: Modify Index Pattern
- name: "/log/index-patterns/del"
cname: Delete Index Pattern
- name: alert
cname: Alert Rules
@@ -323,6 +330,18 @@ ops:
- name: "/notification-rules/del"
cname: Delete Notify Rules
- name: event-pipelines
cname: Event Pipelines
ops:
- name: "/event-pipelines"
cname: View Event Pipelines
- name: "/event-pipelines/add"
cname: Add Event Pipeline
- name: "/event-pipelines/put"
cname: Modify Event Pipeline
- name: "/event-pipelines/del"
cname: Delete Event Pipeline
- name: notify-channels
cname: Notify Channels
ops:

View File

@@ -481,9 +481,9 @@ func (rt *Router) Config(r *gin.Engine) {
pages.GET("/es-index-pattern", rt.auth(), rt.esIndexPatternGet)
pages.GET("/es-index-pattern-list", rt.auth(), rt.esIndexPatternGetList)
pages.POST("/es-index-pattern", rt.auth(), rt.admin(), rt.esIndexPatternAdd)
pages.PUT("/es-index-pattern", rt.auth(), rt.admin(), rt.esIndexPatternPut)
pages.DELETE("/es-index-pattern", rt.auth(), rt.admin(), rt.esIndexPatternDel)
pages.POST("/es-index-pattern", rt.auth(), rt.user(), rt.perm("/log/index-patterns/add"), rt.esIndexPatternAdd)
pages.PUT("/es-index-pattern", rt.auth(), rt.user(), rt.perm("/log/index-patterns/put"), rt.esIndexPatternPut)
pages.DELETE("/es-index-pattern", rt.auth(), rt.user(), rt.perm("/log/index-patterns/del"), rt.esIndexPatternDel)
pages.GET("/embedded-dashboards", rt.auth(), rt.user(), rt.perm("/embedded-dashboards"), rt.embeddedDashboardsGet)
pages.PUT("/embedded-dashboards", rt.auth(), rt.user(), rt.perm("/embedded-dashboards/put"), rt.embeddedDashboardsPut)
@@ -527,6 +527,16 @@ func (rt *Router) Config(r *gin.Engine) {
pages.GET("/notify-rules", rt.auth(), rt.user(), rt.perm("/notification-rules"), rt.notifyRulesGet)
pages.POST("/notify-rule/test", rt.auth(), rt.user(), rt.perm("/notification-rules"), rt.notifyTest)
pages.GET("/notify-rule/custom-params", rt.auth(), rt.user(), rt.perm("/notification-rules"), rt.notifyRuleCustomParamsGet)
pages.POST("/notify-rule/event-pipelines-tryrun", rt.auth(), rt.user(), rt.perm("/notification-rules/add"), rt.tryRunEventProcessorByNotifyRule)
// 事件Pipeline相关路由
pages.GET("/event-pipelines", rt.auth(), rt.user(), rt.perm("/event-pipelines"), rt.eventPipelinesList)
pages.POST("/event-pipeline", rt.auth(), rt.user(), rt.perm("/event-pipelines/add"), rt.addEventPipeline)
pages.PUT("/event-pipeline", rt.auth(), rt.user(), rt.perm("/event-pipelines/put"), rt.updateEventPipeline)
pages.GET("/event-pipeline/:id", rt.auth(), rt.user(), rt.perm("/event-pipelines"), rt.getEventPipeline)
pages.DELETE("/event-pipelines", rt.auth(), rt.user(), rt.perm("/event-pipelines/del"), rt.deleteEventPipelines)
pages.POST("/event-pipeline-tryrun", rt.auth(), rt.user(), rt.perm("/event-pipelines"), rt.tryRunEventPipeline)
pages.POST("/event-processor-tryrun", rt.auth(), rt.user(), rt.perm("/event-pipelines"), rt.tryRunEventProcessor)
pages.POST("/notify-channel-configs", rt.auth(), rt.user(), rt.perm("/notification-channels/add"), rt.notifyChannelsAdd)
pages.DELETE("/notify-channel-configs", rt.auth(), rt.user(), rt.perm("/notification-channels/del"), rt.notifyChannelsDel)
@@ -647,6 +657,7 @@ func (rt *Router) Config(r *gin.Engine) {
service.GET("/message-templates", rt.messageTemplateGets)
service.GET("/event-pipelines", rt.eventPipelinesListByService)
}
}

View File

@@ -123,7 +123,7 @@ func (rt *Router) datasourceUpsert(c *gin.Context) {
}
err = req.Add(rt.Ctx)
} else {
err = req.Update(rt.Ctx, "name", "description", "cluster_name", "settings", "http", "auth", "updated_by", "updated_at", "is_default")
err = req.Update(rt.Ctx, "name", "identifier", "description", "cluster_name", "settings", "http", "auth", "updated_by", "updated_at", "is_default")
}
Render(c, nil, err)

View File

@@ -0,0 +1,217 @@
package router
import (
"net/http"
"time"
"github.com/ccfos/nightingale/v6/alert/pipeline"
"github.com/ccfos/nightingale/v6/models"
"github.com/gin-gonic/gin"
"github.com/toolkits/pkg/ginx"
)
// eventPipelinesList returns the event pipelines visible to the requester:
// admins see everything; other users only see pipelines whose team ids
// intersect their own group memberships. Team names are filled for display.
func (rt *Router) eventPipelinesList(c *gin.Context) {
	me := c.MustGet("user").(*models.User)

	pipelines, err := models.ListEventPipelines(rt.Ctx)
	ginx.Dangerous(err)

	// Resolve all referenced team ids to names with one batch query.
	// Note: the local name avoids shadowing the imported pipeline package.
	allTids := make([]int64, 0)
	for _, pl := range pipelines {
		allTids = append(allTids, pl.TeamIds...)
	}

	ugMap, err := models.UserGroupIdAndNameMap(rt.Ctx, allTids)
	ginx.Dangerous(err)

	for _, pl := range pipelines {
		for _, tid := range pl.TeamIds {
			pl.TeamNames = append(pl.TeamNames, ugMap[tid])
		}
	}

	gids, err := models.MyGroupIdsMap(rt.Ctx, me.Id)
	ginx.Dangerous(err)

	if me.IsAdmin() {
		ginx.NewRender(c).Data(pipelines, nil)
		return
	}

	// Keep only pipelines sharing at least one team with the requester.
	visible := make([]*models.EventPipeline, 0)
	for _, pl := range pipelines {
		for _, tid := range pl.TeamIds {
			if _, ok := gids[tid]; ok {
				visible = append(visible, pl)
				break
			}
		}
	}

	ginx.NewRender(c).Data(visible, nil)
}
// getEventPipeline returns one event pipeline by its url id, after verifying
// the requester has permission on the pipeline's teams.
func (rt *Router) getEventPipeline(c *gin.Context) {
	me := c.MustGet("user").(*models.User)

	pl, err := models.GetEventPipeline(rt.Ctx, ginx.UrlParamInt64(c, "id"))
	ginx.Dangerous(err)

	ginx.Dangerous(me.CheckGroupPermission(rt.Ctx, pl.TeamIds))

	// Populate team names for display before rendering.
	ginx.Dangerous(pl.FillTeamNames(rt.Ctx))

	ginx.NewRender(c).Data(pl, nil)
}
// addEventPipeline validates and stores a new event pipeline, stamping the
// requester as creator/updater.
func (rt *Router) addEventPipeline(c *gin.Context) {
	var pl models.EventPipeline
	ginx.BindJSON(c, &pl)

	user := c.MustGet("user").(*models.User)

	// Stamp audit fields.
	now := time.Now().Unix()
	pl.CreateBy = user.Username
	pl.CreateAt = now
	pl.UpdateBy = user.Username
	pl.UpdateAt = now

	if err := pl.Verify(); err != nil {
		ginx.Bomb(http.StatusBadRequest, err.Error())
	}

	ginx.Dangerous(user.CheckGroupPermission(rt.Ctx, pl.TeamIds))

	ginx.NewRender(c).Message(models.CreateEventPipeline(rt.Ctx, &pl))
}
// updateEventPipeline overwrites an existing event pipeline with the posted
// body, after checking the requester's permission on the stored pipeline's
// teams.
func (rt *Router) updateEventPipeline(c *gin.Context) {
	var f models.EventPipeline
	ginx.BindJSON(c, &f)

	me := c.MustGet("user").(*models.User)
	// Stamp audit fields on the incoming payload.
	f.UpdateBy = me.Username
	f.UpdateAt = time.Now().Unix()

	// NOTE(review): any lookup error — not just "no rows" — is reported as
	// 404 here; confirm this conflation is intended.
	pipeline, err := models.GetEventPipeline(rt.Ctx, f.ID)
	if err != nil {
		ginx.Bomb(http.StatusNotFound, "No such event pipeline")
	}

	// Permission is checked against the stored pipeline's teams, not the
	// posted ones.
	ginx.Dangerous(me.CheckGroupPermission(rt.Ctx, pipeline.TeamIds))

	ginx.NewRender(c).Message(pipeline.Update(rt.Ctx, &f))
}
// deleteEventPipelines removes the pipelines with the posted ids. The
// requester must have permission on every target pipeline's teams before
// anything is deleted.
func (rt *Router) deleteEventPipelines(c *gin.Context) {
	var f struct {
		Ids []int64 `json:"ids"`
	}
	ginx.BindJSON(c, &f)

	if len(f.Ids) == 0 {
		ginx.Bomb(http.StatusBadRequest, "ids required")
	}

	me := c.MustGet("user").(*models.User)
	// Permission-check every target first so the delete is all-or-nothing.
	for _, id := range f.Ids {
		pipeline, err := models.GetEventPipeline(rt.Ctx, id)
		ginx.Dangerous(err)
		ginx.Dangerous(me.CheckGroupPermission(rt.Ctx, pipeline.TeamIds))
	}

	err := models.DeleteEventPipelines(rt.Ctx, f.Ids)
	ginx.NewRender(c).Message(err)
}
// tryRunEventPipeline dry-runs a posted pipeline definition against a
// historical alert event and returns the transformed event; nothing is
// persisted.
func (rt *Router) tryRunEventPipeline(c *gin.Context) {
	var f struct {
		EventId        int64                `json:"event_id"`
		PipelineConfig models.EventPipeline `json:"pipeline_config"`
	}
	ginx.BindJSON(c, &f)

	hisEvent, err := models.AlertHisEventGetById(rt.Ctx, f.EventId)
	if err != nil || hisEvent == nil {
		ginx.Bomb(http.StatusBadRequest, "event not found")
	}
	// Convert the stored historical event into the current-event shape the
	// processors operate on.
	event := hisEvent.ToCur()

	for _, p := range f.PipelineConfig.Processors {
		processor, err := pipeline.GetProcessorByType(p.Typ, p.Config)
		if err != nil {
			ginx.Bomb(http.StatusBadRequest, "processor %+v type not found, error: %s", p, err.Error())
		}
		processor.Process(rt.Ctx, event)
	}

	ginx.NewRender(c).Data(event, nil)
}
// tryRunEventProcessor dry-runs a single processor configuration against a
// historical alert event and returns the transformed event.
func (rt *Router) tryRunEventProcessor(c *gin.Context) {
	var req struct {
		EventId         int64            `json:"event_id"`
		ProcessorConfig models.Processor `json:"processor_config"`
	}
	ginx.BindJSON(c, &req)

	hisEvent, err := models.AlertHisEventGetById(rt.Ctx, req.EventId)
	if err != nil || hisEvent == nil {
		ginx.Bomb(http.StatusBadRequest, "event not found")
	}

	event := hisEvent.ToCur()

	proc, err := pipeline.GetProcessorByType(req.ProcessorConfig.Typ, req.ProcessorConfig.Config)
	if err != nil {
		ginx.Bomb(http.StatusBadRequest, "processor type: %s, error: %s", req.ProcessorConfig.Typ, err.Error())
	}

	proc.Process(rt.Ctx, event)
	ginx.NewRender(c).Data(event, nil)
}
// tryRunEventProcessorByNotifyRule dry-runs the pipelines referenced by a
// notify rule's pipeline configs against a historical alert event and
// returns the transformed event. Only enabled pipeline configs take part.
func (rt *Router) tryRunEventProcessorByNotifyRule(c *gin.Context) {
	var f struct {
		EventId         int64                   `json:"event_id"`
		PipelineConfigs []models.PipelineConfig `json:"pipeline_configs"`
	}
	ginx.BindJSON(c, &f)

	hisEvent, err := models.AlertHisEventGetById(rt.Ctx, f.EventId)
	if err != nil || hisEvent == nil {
		ginx.Bomb(http.StatusBadRequest, "event not found")
	}
	event := hisEvent.ToCur()

	// collect the ids of the enabled pipelines only
	pids := make([]int64, 0, len(f.PipelineConfigs))
	for _, pc := range f.PipelineConfigs {
		if pc.Enable {
			pids = append(pids, pc.PipelineId)
		}
	}

	pipelines, err := models.GetEventPipelinesByIds(rt.Ctx, pids)
	if err != nil {
		// surface the underlying error so the caller can tell why the
		// lookup failed, instead of a bare "processors not found"
		ginx.Bomb(http.StatusBadRequest, "pipelines not found, error: %s", err.Error())
	}

	for _, pl := range pipelines {
		for _, p := range pl.Processors {
			processor, err := pipeline.GetProcessorByType(p.Typ, p.Config)
			if err != nil {
				// include the error detail, consistent with tryRunEventPipeline
				ginx.Bomb(http.StatusBadRequest, "processor %+v type not found, error: %s", p, err.Error())
			}
			processor.Process(rt.Ctx, event)
		}
	}

	ginx.NewRender(c).Data(event, nil)
}
// eventPipelinesListByService returns every event pipeline without any
// per-user permission filtering — presumably for internal service-to-service
// calls; NOTE(review): confirm against the route registration that this is
// not exposed to regular users.
func (rt *Router) eventPipelinesListByService(c *gin.Context) {
	pipelines, err := models.ListEventPipelines(rt.Ctx)
	ginx.NewRender(c).Data(pipelines, err)
}

View File

@@ -40,6 +40,10 @@ func (rt *Router) statistic(c *gin.Context) {
model = models.NotifyRule{}
case "notify_channel":
model = models.NotifyChannel{}
case "event_pipeline":
statistics, err = models.EventPipelineStatistics(rt.Ctx)
ginx.NewRender(c).Data(statistics, err)
return
case "datasource":
// datasource update_at is different from others
statistics, err = models.DatasourceStatistics(rt.Ctx)

View File

@@ -3,6 +3,7 @@ package router
import (
"fmt"
"sort"
"sync"
"github.com/ccfos/nightingale/v6/dscache"
"github.com/ccfos/nightingale/v6/models"
@@ -38,71 +39,116 @@ type LogResp struct {
List []interface{} `json:"list"`
}
func (rt *Router) QueryLogBatch(c *gin.Context) {
var f QueryFrom
ginx.BindJSON(c, &f)
func QueryLogBatchConcurrently(anonymousAccess bool, ctx *gin.Context, f QueryFrom) (LogResp, error) {
var resp LogResp
var errMsg string
var mu sync.Mutex
var wg sync.WaitGroup
var errs []error
for _, q := range f.Queries {
if !rt.Center.AnonymousAccess.PromQuerier && !CheckDsPerm(c, q.Did, q.DsCate, q) {
ginx.Bomb(200, "no permission")
if !anonymousAccess && !CheckDsPerm(ctx, q.Did, q.DsCate, q) {
return LogResp{}, fmt.Errorf("no permission")
}
plug, exists := dscache.DsCache.Get(q.DsCate, q.Did)
if !exists {
logger.Warningf("cluster:%d not exists query:%+v", q.Did, q)
ginx.Bomb(200, "cluster not exists")
return LogResp{}, fmt.Errorf("cluster not exists")
}
data, total, err := plug.QueryLog(c.Request.Context(), q.Query)
if err != nil {
errMsg += fmt.Sprintf("query data error: %v query:%v\n ", err, q)
logger.Warningf("query data error: %v query:%v", err, q)
continue
}
wg.Add(1)
go func(query Query) {
defer wg.Done()
m := make(map[string]interface{})
m["ref"] = q.Ref
m["ds_id"] = q.Did
m["ds_cate"] = q.DsCate
m["data"] = data
resp.List = append(resp.List, m)
resp.Total += total
data, total, err := plug.QueryLog(ctx.Request.Context(), query.Query)
mu.Lock()
defer mu.Unlock()
if err != nil {
errMsg := fmt.Sprintf("query data error: %v query:%v\n ", err, query)
logger.Warningf(errMsg)
errs = append(errs, err)
return
}
m := make(map[string]interface{})
m["ref"] = query.Ref
m["ds_id"] = query.Did
m["ds_cate"] = query.DsCate
m["data"] = data
resp.List = append(resp.List, m)
resp.Total += total
}(q)
}
if errMsg != "" || len(resp.List) == 0 {
ginx.Bomb(200, errMsg)
wg.Wait()
if len(errs) > 0 {
return LogResp{}, errs[0]
}
if len(resp.List) == 0 {
return LogResp{}, fmt.Errorf("no data")
}
return resp, nil
}
func (rt *Router) QueryLogBatch(c *gin.Context) {
var f QueryFrom
ginx.BindJSON(c, &f)
resp, err := QueryLogBatchConcurrently(rt.Center.AnonymousAccess.PromQuerier, c, f)
if err != nil {
ginx.Bomb(200, "err:%v", err)
}
ginx.NewRender(c).Data(resp, nil)
}
func (rt *Router) QueryData(c *gin.Context) {
var f models.QueryParam
ginx.BindJSON(c, &f)
func QueryDataConcurrently(anonymousAccess bool, ctx *gin.Context, f models.QueryParam) ([]models.DataResp, error) {
var resp []models.DataResp
var err error
var mu sync.Mutex
var wg sync.WaitGroup
var errs []error
for _, q := range f.Querys {
if !rt.Center.AnonymousAccess.PromQuerier && !CheckDsPerm(c, f.DatasourceId, f.Cate, q) {
ginx.Bomb(403, "no permission")
if !anonymousAccess && !CheckDsPerm(ctx, f.DatasourceId, f.Cate, q) {
return nil, fmt.Errorf("no permission")
}
plug, exists := dscache.DsCache.Get(f.Cate, f.DatasourceId)
if !exists {
logger.Warningf("cluster:%d not exists", f.DatasourceId)
ginx.Bomb(200, "cluster not exists")
return nil, fmt.Errorf("cluster not exists")
}
var datas []models.DataResp
datas, err = plug.QueryData(c.Request.Context(), q)
if err != nil {
logger.Warningf("query data error: req:%+v err:%v", q, err)
ginx.Bomb(200, "err:%v", err)
}
logger.Debugf("query data: req:%+v resp:%+v", q, datas)
resp = append(resp, datas...)
wg.Add(1)
go func(query interface{}) {
defer wg.Done()
datas, err := plug.QueryData(ctx.Request.Context(), query)
if err != nil {
logger.Warningf("query data error: req:%+v err:%v", query, err)
mu.Lock()
errs = append(errs, err)
mu.Unlock()
return
}
logger.Debugf("query data: req:%+v resp:%+v", query, datas)
mu.Lock()
resp = append(resp, datas...)
mu.Unlock()
}(q)
}
wg.Wait()
if len(errs) > 0 {
return nil, errs[0]
}
// 面向API的统一处理
// 按照 .Metric 排序
// 确保仪表盘中相同图例的曲线颜色相同
@@ -115,41 +161,80 @@ func (rt *Router) QueryData(c *gin.Context) {
})
}
ginx.NewRender(c).Data(resp, err)
return resp, nil
}
func (rt *Router) QueryData(c *gin.Context) {
var f models.QueryParam
ginx.BindJSON(c, &f)
resp, err := QueryDataConcurrently(rt.Center.AnonymousAccess.PromQuerier, c, f)
if err != nil {
ginx.Bomb(200, "err:%v", err)
}
ginx.NewRender(c).Data(resp, nil)
}
// QueryLogConcurrently 并发查询日志
func QueryLogConcurrently(anonymousAccess bool, ctx *gin.Context, f models.QueryParam) (LogResp, error) {
var resp LogResp
var mu sync.Mutex
var wg sync.WaitGroup
var errs []error
for _, q := range f.Querys {
if !anonymousAccess && !CheckDsPerm(ctx, f.DatasourceId, f.Cate, q) {
return LogResp{}, fmt.Errorf("no permission")
}
plug, exists := dscache.DsCache.Get(f.Cate, f.DatasourceId)
if !exists {
logger.Warningf("cluster:%d not exists query:%+v", f.DatasourceId, f)
return LogResp{}, fmt.Errorf("cluster not exists")
}
wg.Add(1)
go func(query interface{}) {
defer wg.Done()
data, total, err := plug.QueryLog(ctx.Request.Context(), query)
logger.Debugf("query log: req:%+v resp:%+v", query, data)
if err != nil {
errMsg := fmt.Sprintf("query data error: %v query:%v\n ", err, query)
logger.Warningf(errMsg)
mu.Lock()
errs = append(errs, err)
mu.Unlock()
return
}
mu.Lock()
resp.List = append(resp.List, data...)
resp.Total += total
mu.Unlock()
}(q)
}
wg.Wait()
if len(errs) > 0 {
return LogResp{}, errs[0]
}
if len(resp.List) == 0 {
return LogResp{}, fmt.Errorf("no data")
}
return resp, nil
}
func (rt *Router) QueryLogV2(c *gin.Context) {
var f models.QueryParam
ginx.BindJSON(c, &f)
var resp LogResp
var errMsg string
for _, q := range f.Querys {
if !rt.Center.AnonymousAccess.PromQuerier && !CheckDsPerm(c, f.DatasourceId, f.Cate, q) {
ginx.Bomb(200, "no permission")
}
plug, exists := dscache.DsCache.Get(f.Cate, f.DatasourceId)
if !exists {
logger.Warningf("cluster:%d not exists query:%+v", f.DatasourceId, f)
ginx.Bomb(200, "cluster not exists")
}
data, total, err := plug.QueryLog(c.Request.Context(), q)
if err != nil {
errMsg += fmt.Sprintf("query data error: %v query:%v\n ", err, q)
logger.Warningf("query data error: %v query:%v", err, q)
continue
}
resp.List = append(resp.List, data...)
resp.Total += total
}
if errMsg != "" || len(resp.List) == 0 {
ginx.Bomb(200, errMsg)
}
ginx.NewRender(c).Data(resp, nil)
resp, err := QueryLogConcurrently(rt.Center.AnonymousAccess.PromQuerier, c, f)
ginx.NewRender(c).Data(resp, err)
}
func (rt *Router) QueryLog(c *gin.Context) {

View File

@@ -24,6 +24,7 @@ type Query struct {
Index string `json:"index" mapstructure:"index"`
IndexPatternId int64 `json:"index_pattern" mapstructure:"index_pattern"`
Filter string `json:"filter" mapstructure:"filter"`
Offset int64 `json:"offset" mapstructure:"offset"`
MetricAggr MetricAggr `json:"value" mapstructure:"value"`
GroupBy []GroupBy `json:"group_by" mapstructure:"group_by"`
DateField string `json:"date_field" mapstructure:"date_field"`
@@ -372,6 +373,11 @@ func QueryData(ctx context.Context, queryParam interface{}, cliTimeout int64, ve
start = start - delay
}
if param.Offset > 0 {
end = end - param.Offset
start = start - param.Offset
}
q.Gte(time.Unix(start, 0).UnixMilli())
q.Lte(time.Unix(end, 0).UnixMilli())
q.Format("epoch_millis")

View File

@@ -537,7 +537,7 @@ CREATE TABLE `builtin_components` (
`updated_by` varchar(191) NOT NULL DEFAULT '' COMMENT '''updater''',
`disabled` int NOT NULL DEFAULT 0 COMMENT '''is disabled or not''',
PRIMARY KEY (`id`),
UNIQUE KEY `idx_ident` (`ident`)
KEY (`ident`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
CREATE TABLE `builtin_payloads` (
@@ -793,6 +793,7 @@ CREATE TABLE `notify_rule` (
`enable` tinyint(1) not null default 0,
`user_group_ids` varchar(255) not null default '',
`notify_configs` text,
`pipeline_configs` text,
`create_at` bigint not null default 0,
`create_by` varchar(64) not null default '',
`update_at` bigint not null default 0,
@@ -833,6 +834,22 @@ CREATE TABLE `message_template` (
PRIMARY KEY (`id`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;
CREATE TABLE `event_pipeline` (
`id` bigint unsigned not null auto_increment,
`name` varchar(128) not null,
`team_ids` text,
`description` varchar(255) not null default '',
`filter_enable` tinyint(1) not null default 0,
`label_filters` text,
`attribute_filters` text,
`processors` text,
`create_at` bigint not null default 0,
`create_by` varchar(64) not null default '',
`update_at` bigint not null default 0,
`update_by` varchar(64) not null default '',
PRIMARY KEY (`id`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;
CREATE TABLE `task_meta`
(
`id` bigint unsigned NOT NULL AUTO_INCREMENT,
@@ -2189,4 +2206,4 @@ CREATE TABLE task_host_99
UNIQUE KEY `idx_id_host` (`id`, `host`),
PRIMARY KEY (`ii`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;
DEFAULT CHARSET = utf8mb4;

View File

@@ -226,3 +226,22 @@ ALTER TABLE `notify_channel` ADD COLUMN `weight` int not null default 0;
/* v8.0.0-beta.11 2025-04-10 */
ALTER TABLE `es_index_pattern` ADD COLUMN `note` varchar(1024) not null default '';
ALTER TABLE `datasource` ADD COLUMN `identifier` varchar(255) not null default '';
/* v8.0.0-beta.11 2025-05-15 */
ALTER TABLE `notify_rule` ADD COLUMN `pipeline_configs` text;
CREATE TABLE `event_pipeline` (
`id` bigint unsigned not null auto_increment,
`name` varchar(128) not null,
`team_ids` text,
`description` varchar(255) not null default '',
`filter_enable` tinyint(1) not null default 0,
`label_filters` text,
`attribute_filters` text,
`processors` text,
`create_at` bigint not null default 0,
`create_by` varchar(64) not null default '',
`update_at` bigint not null default 0,
`update_by` varchar(64) not null default '',
PRIMARY KEY (`id`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;

View File

@@ -47,6 +47,7 @@ var PromDefaultDatasourceId int64
func getDatasourcesFromDBLoop(ctx *ctx.Context, fromAPI bool) {
for {
if !fromAPI {
foundDefaultDatasource := false
items, err := models.GetDatasources(ctx)
if err != nil {
logger.Errorf("get datasource from database fail: %v", err)
@@ -58,6 +59,7 @@ func getDatasourcesFromDBLoop(ctx *ctx.Context, fromAPI bool) {
for _, item := range items {
if item.PluginType == "prometheus" && item.IsDefault {
atomic.StoreInt64(&PromDefaultDatasourceId, item.Id)
foundDefaultDatasource = true
}
logger.Debugf("get datasource: %+v", item)
@@ -90,6 +92,12 @@ func getDatasourcesFromDBLoop(ctx *ctx.Context, fromAPI bool) {
}
dss = append(dss, ds)
}
if !foundDefaultDatasource && atomic.LoadInt64(&PromDefaultDatasourceId) != 0 {
logger.Debugf("no default datasource found")
atomic.StoreInt64(&PromDefaultDatasourceId, 0)
}
PutDatasources(dss)
} else {
FromAPIHook()
@@ -183,7 +191,14 @@ func PutDatasources(items []datasource.DatasourceInfo) {
ids = append(ids, item.Id)
// 异步初始化 client 不然数据源同步的会很慢
go DsCache.Put(typ, item.Id, ds)
go func() {
defer func() {
if r := recover(); r != nil {
logger.Errorf("panic in datasource item: %+v panic:%v", item, r)
}
}()
DsCache.Put(typ, item.Id, ds)
}()
}
logger.Debugf("get plugin by type success Ids:%v", ids)

View File

@@ -1,5 +1,5 @@
{
"name": " Kubernetes-Deployment/ Container",
"name": "Kubernetes / Deployment / Container",
"tags": "Categraf",
"configs": {
"panels": [

View File

@@ -1,7 +1,7 @@
{
"id": 0,
"group_id": 0,
"name": "Kubernetes / Container",
"name": "Kubernetes / Pod",
"ident": "",
"tags": "Categraf",
"create_at": 0,
@@ -1748,20 +1748,34 @@
],
"var": [
{
"definition": "prometheus",
"name": "datasource",
"type": "datasource"
"type": "datasource",
"definition": "prometheus",
"defaultValue": 40
},
{
"name": "namespace",
"type": "query",
"hide": false,
"datasource": {
"cate": "prometheus",
"value": "${datasource}"
},
"definition": "label_values(container_cpu_usage_seconds_total, pod)",
"multi": false,
"name": "pod_name",
"definition": "label_values(container_cpu_usage_seconds_total, namespace)",
"reg": "",
"type": "query"
"multi": false
},
{
"name": "pod_name",
"type": "query",
"hide": false,
"datasource": {
"cate": "prometheus",
"value": "${datasource}"
},
"definition": "label_values(container_cpu_usage_seconds_total{namespace=\"$namespace\"}, pod)",
"reg": "",
"multi": false
}
],
"version": "3.0.0"

View File

@@ -1,5 +1,5 @@
{
"name": " Kubernetes-Statefulset / Container ",
"name": "Kubernetes / Statefulset / Container ",
"tags": "Categraf",
"configs": {
"panels": [

View File

@@ -0,0 +1,342 @@
[
{
"uuid": 1745735239727485700,
"collector": "Node",
"typ": "Kubernetes",
"name": "TCP当前连接数",
"unit": "",
"note": "节点指标\n类型: *",
"lang": "zh_CN",
"expression": "node_netstat_Tcp_CurrEstab * on(instance, cluster) group_left(nodename) node_uname_info{nodename=~\"$node_name\"}"
},
{
"uuid": 1745735239701096000,
"collector": "Node",
"typ": "Kubernetes",
"name": "文件描述符使用数",
"unit": "",
"note": "节点指标\n类型: *",
"lang": "zh_CN",
"expression": "node_filefd_allocated * on(instance, cluster) group_left(nodename) node_uname_info{nodename=~\"$node_name\"}"
},
{
"uuid": 1745735239704160000,
"collector": "Node",
"typ": "Kubernetes",
"name": "文件描述符最大限制",
"unit": "",
"note": "节点指标\n类型: *",
"lang": "zh_CN",
"expression": "node_filefd_maximum * on(instance, cluster) group_left(nodename) node_uname_info{nodename=~\"$node_name\"}"
},
{
"uuid": 1745735239750006800,
"collector": "Node",
"typ": "Kubernetes",
"name": "文件系统inode使用率",
"unit": "",
"note": "节点指标\n类型: -",
"lang": "zh_CN",
"expression": "100 - (node_filesystem_files_free * on(instance, cluster) group_left(nodename) node_uname_info{nodename=~\"$node_name\"} / node_filesystem_files * on(instance, cluster) group_left(nodename) node_uname_info{nodename=~\"$node_name\"} * 100)"
},
{
"uuid": 1745735239746991600,
"collector": "Node",
"typ": "Kubernetes",
"name": "文件系统使用率",
"unit": "",
"note": "节点指标\n类型: -",
"lang": "zh_CN",
"expression": "100 - ((node_filesystem_avail_bytes * on(instance, cluster) group_left(nodename) node_uname_info{nodename=~\"$node_name\"} * 100) / node_filesystem_size_bytes * on(instance, cluster) group_left(nodename) node_uname_info{nodename=~\"$node_name\"})"
},
{
"uuid": 1745735239753550000,
"collector": "Node",
"typ": "Kubernetes",
"name": "文件系统错误数",
"unit": "",
"note": "节点指标\n类型: *",
"lang": "zh_CN",
"expression": "sum(node_filesystem_device_error * on(instance, cluster) group_left(nodename) node_uname_info{nodename=~\"$node_name\"}) by (mountpoint)"
},
{
"uuid": 1745735239743097300,
"collector": "Node",
"typ": "Kubernetes",
"name": "磁盘IO使用率",
"unit": "",
"note": "节点指标\n类型: *",
"lang": "zh_CN",
"expression": "rate(node_disk_io_now[5m]) * on(instance, cluster) group_left(nodename) node_uname_info{nodename=~\"$node_name\"}"
},
{
"uuid": 1745735239740169500,
"collector": "Node",
"typ": "Kubernetes",
"name": "磁盘写入IOPS",
"unit": "",
"note": "节点指标\n类型: *",
"lang": "zh_CN",
"expression": "rate(node_disk_writes_completed_total[5m]) * on(instance, cluster) group_left(nodename) node_uname_info{nodename=~\"$node_name\"}"
},
{
"uuid": 1745735239734228700,
"collector": "Node",
"typ": "Kubernetes",
"name": "磁盘写入速率",
"unit": "",
"note": "节点指标\n类型: *",
"lang": "zh_CN",
"expression": "rate(node_disk_written_bytes_total[5m]) * on(instance, cluster) group_left(nodename) node_uname_info{nodename=~\"$node_name\"}"
},
{
"uuid": 1745735239737122600,
"collector": "Node",
"typ": "Kubernetes",
"name": "磁盘读取IOPS",
"unit": "",
"note": "节点指标\n类型: *",
"lang": "zh_CN",
"expression": "rate(node_disk_reads_completed_total[5m]) * on(instance, cluster) group_left(nodename) node_uname_info{nodename=~\"$node_name\"}"
},
{
"uuid": 1745735239730406000,
"collector": "Node",
"typ": "Kubernetes",
"name": "磁盘读取速率",
"unit": "",
"note": "节点指标\n类型: *",
"lang": "zh_CN",
"expression": "rate(node_disk_read_bytes_total[5m]) * on(instance, cluster) group_left(nodename) node_uname_info{nodename=~\"$node_name\"}"
},
{
"uuid": 1745735239694202600,
"collector": "Node",
"typ": "Kubernetes",
"name": "系统上下文切换率",
"unit": "",
"note": "节点指标\n类型: *",
"lang": "zh_CN",
"expression": "rate(node_context_switches_total[5m]) * on(instance, cluster) group_left(nodename) node_uname_info{nodename=~\"$node_name\"}"
},
{
"uuid": 1745735239697167400,
"collector": "Node",
"typ": "Kubernetes",
"name": "系统中断率",
"unit": "",
"note": "节点指标\n类型: *",
"lang": "zh_CN",
"expression": "rate(node_intr_total[5m]) * on(instance, cluster) group_left(nodename) node_uname_info{nodename=~\"$node_name\"}"
},
{
"uuid": 1745735239724650200,
"collector": "Node",
"typ": "Kubernetes",
"name": "网络发送丢包率",
"unit": "",
"note": "节点指标\n类型: *",
"lang": "zh_CN",
"expression": "sum(rate(node_network_transmit_drop_total[5m]) * on(instance, cluster) group_left(nodename) node_uname_info{nodename=~\"$node_name\"})"
},
{
"uuid": 1745735239710266000,
"collector": "Node",
"typ": "Kubernetes",
"name": "网络发送带宽",
"unit": "",
"note": "节点指标\n类型: *",
"lang": "zh_CN",
"expression": "sum(rate(node_network_transmit_bytes_total[5m]) * on(instance, cluster) group_left(nodename) node_uname_info{nodename=~\"$node_name\"})"
},
{
"uuid": 1745735239716205000,
"collector": "Node",
"typ": "Kubernetes",
"name": "网络发送错误率",
"unit": "",
"note": "节点指标\n类型: *",
"lang": "zh_CN",
"expression": "sum(rate(node_network_transmit_errs_total[5m]) * on(instance, cluster) group_left(nodename) node_uname_info{nodename=~\"$node_name\"})"
},
{
"uuid": 1745735239721688800,
"collector": "Node",
"typ": "Kubernetes",
"name": "网络接收丢包率",
"unit": "",
"note": "节点指标\n类型: *",
"lang": "zh_CN",
"expression": "sum(rate(node_network_receive_drop_total[5m]) * on(instance, cluster) group_left(nodename) node_uname_info{nodename=~\"$node_name\"})"
},
{
"uuid": 1745735239707241500,
"collector": "Node",
"typ": "Kubernetes",
"name": "网络接收带宽",
"unit": "",
"note": "节点指标\n类型: *",
"lang": "zh_CN",
"expression": "sum(rate(node_network_receive_bytes_total[5m]) * on(instance, cluster) group_left(nodename) node_uname_info{nodename=~\"$node_name\"})"
},
{
"uuid": 1745735239713318000,
"collector": "Node",
"typ": "Kubernetes",
"name": "网络接收错误率",
"unit": "",
"note": "节点指标\n类型: *",
"lang": "zh_CN",
"expression": "sum(rate(node_network_receive_errs_total[5m]) * on(instance, cluster) group_left(nodename) node_uname_info{nodename=~\"$node_name\"})"
},
{
"uuid": 1745735239783181800,
"collector": "Node",
"typ": "Kubernetes",
"name": "网络连接跟踪条目数",
"unit": "",
"note": "节点指标\n类型: *",
"lang": "zh_CN",
"expression": "node_nf_conntrack_entries * on(instance, cluster) group_left(nodename) node_uname_info{nodename=~\"$node_name\"}"
},
{
"uuid": 1745735239786134000,
"collector": "Node",
"typ": "Kubernetes",
"name": "网络连接跟踪限制",
"unit": "",
"note": "节点指标\n类型: *",
"lang": "zh_CN",
"expression": "node_nf_conntrack_entries_limit * on(instance, cluster) group_left(nodename) node_uname_info{nodename=~\"$node_name\"}"
},
{
"uuid": 1745735239675145700,
"collector": "Node",
"typ": "Kubernetes",
"name": "节点 CPU 使用率",
"unit": "",
"note": "节点指标\n类型: by",
"lang": "zh_CN",
"expression": "sum by (instance) (rate(node_cpu_seconds_total{mode!~\"idle|iowait|steal\"}[5m])) * on(instance) group_left(nodename) node_uname_info{nodename=~\"$node_name\"} *100"
},
{
"uuid": 1745735239691192000,
"collector": "Node",
"typ": "Kubernetes",
"name": "节点15分钟负载",
"unit": "",
"note": "节点指标\n类型: *",
"lang": "zh_CN",
"expression": "node_load15 * on(instance, cluster) group_left(nodename) node_uname_info{nodename=~\"$node_name\"}"
},
{
"uuid": 1745735239685264100,
"collector": "Node",
"typ": "Kubernetes",
"name": "节点1分钟负载",
"unit": "",
"note": "节点指标\n类型: *",
"lang": "zh_CN",
"expression": "node_load1 * on(instance, cluster) group_left(nodename) node_uname_info{nodename=~\"$node_name\"}"
},
{
"uuid": 1745735239688232700,
"collector": "Node",
"typ": "Kubernetes",
"name": "节点5分钟负载",
"unit": "",
"note": "节点指标\n类型: *",
"lang": "zh_CN",
"expression": "node_load5 * on(instance, cluster) group_left(nodename) node_uname_info{nodename=~\"$node_name\"}"
},
{
"uuid": 1745735239776256800,
"collector": "Node",
"typ": "Kubernetes",
"name": "节点Swap使用量",
"unit": "",
"note": "节点指标\n类型: *",
"lang": "zh_CN",
"expression": "node_memory_SwapTotal_bytes * on(instance, cluster) group_left(nodename) node_uname_info{nodename=~\"$node_name\"} - node_memory_SwapFree_bytes * on(instance, cluster) group_left(nodename) node_uname_info{nodename=~\"$node_name\"}"
},
{
"uuid": 1745735239779806500,
"collector": "Node",
"typ": "Kubernetes",
"name": "节点Swap总量",
"unit": "",
"note": "节点指标\n类型: *",
"lang": "zh_CN",
"expression": "node_memory_SwapTotal_bytes * on(instance, cluster) group_left(nodename) node_uname_info{nodename=~\"$node_name\"}"
},
{
"uuid": 1745735239681529300,
"collector": "Node",
"typ": "Kubernetes",
"name": "节点上运行的Pod数量",
"unit": "",
"note": "节点指标\n类型: *",
"lang": "zh_CN",
"expression": "sum(kube_pod_info * on(node) group_left(nodename) node_uname_info{nodename=~\"$node_name\"})"
},
{
"uuid": 1745735239678397700,
"collector": "Node",
"typ": "Kubernetes",
"name": "节点内存使用率",
"unit": "",
"note": "节点指标\n类型: *",
"lang": "zh_CN",
"expression": "sum(node_memory_MemTotal_bytes * on(instance, cluster) group_left(nodename) node_uname_info{nodename=~\"$node_name\"} - node_memory_MemAvailable_bytes * on(instance, cluster) group_left(nodename) node_uname_info{nodename=~\"$node_name\"}) / sum(node_memory_MemTotal_bytes * on(instance, cluster) group_left(nodename) node_uname_info{nodename=~\"$node_name\"})"
},
{
"uuid": 1745735239760507400,
"collector": "Node",
"typ": "Kubernetes",
"name": "节点内存详细信息 - 可用",
"unit": "",
"note": "节点指标\n类型: *",
"lang": "zh_CN",
"expression": "node_memory_MemAvailable_bytes * on(instance, cluster) group_left(nodename) node_uname_info{nodename=~\"$node_name\"}"
},
{
"uuid": 1745735239756641800,
"collector": "Node",
"typ": "Kubernetes",
"name": "节点内存详细信息 - 总量",
"unit": "",
"note": "节点指标\n类型: *",
"lang": "zh_CN",
"expression": "node_memory_MemTotal_bytes * on(instance, cluster) group_left(nodename) node_uname_info{nodename=~\"$node_name\"}"
},
{
"uuid": 1745735239772786200,
"collector": "Node",
"typ": "Kubernetes",
"name": "节点内存详细信息 - 空闲",
"unit": "",
"note": "节点指标\n类型: *",
"lang": "zh_CN",
"expression": "node_memory_MemFree_bytes * on(instance, cluster) group_left(nodename) node_uname_info{nodename=~\"$node_name\"}"
},
{
"uuid": 1745735239769542000,
"collector": "Node",
"typ": "Kubernetes",
"name": "节点内存详细信息 - 缓冲区",
"unit": "",
"note": "节点指标\n类型: *",
"lang": "zh_CN",
"expression": "node_memory_Buffers_bytes * on(instance, cluster) group_left(nodename) node_uname_info{nodename=~\"$node_name\"}"
},
{
"uuid": 1745735239764136000,
"collector": "Node",
"typ": "Kubernetes",
"name": "节点内存详细信息 - 缓存",
"unit": "",
"note": "节点指标\n类型: *",
"lang": "zh_CN",
"expression": "node_memory_Cached_bytes * on(instance, cluster) group_left(nodename) node_uname_info{nodename=~\"$node_name\"}"
}
]

View File

@@ -0,0 +1,282 @@
[
{
"uuid": 1745893024149445000,
"collector": "Pod",
"typ": "Kubernetes",
"name": "Inode数量",
"unit": "",
"note": "Pod自身指标\n类型: pod=~\"$pod_name\",",
"lang": "zh_CN",
"expression": "sum(container_fs_inodes_total{namespace=\"$namespace\", pod=~\"$pod_name\", image!~\".*pause.*\"}) by (name)"
},
{
"uuid": 1745893024121015300,
"collector": "Pod",
"typ": "Kubernetes",
"name": "不可中断任务数量",
"unit": "",
"note": "Pod自身指标\n类型: pod=~\"$pod_name\",",
"lang": "zh_CN",
"expression": "sum(container_tasks_state{namespace=\"$namespace\", pod=~\"$pod_name\", image!~\".*pause.*\", state=\"uninterruptible\"}) by (name)"
},
{
"uuid": 1745893024130551800,
"collector": "Pod",
"typ": "Kubernetes",
"name": "容器cache使用",
"unit": "",
"note": "Pod自身指标\n类型: pod=~\"$pod_name\",",
"lang": "zh_CN",
"expression": "(sum(container_memory_cache{namespace=\"$namespace\", pod=~\"$pod_name\", image!~\".*pause.*\"}) by (name))"
},
{
"uuid": 1745893024108569900,
"collector": "Pod",
"typ": "Kubernetes",
"name": "容器CPU Limit",
"unit": "",
"note": "Pod自身指标\n类型: pod=~\"$pod_name\"}/container_spec_cpu_period{namespace=\"$namespace\",",
"lang": "zh_CN",
"expression": "(sum(container_spec_cpu_quota{namespace=\"$namespace\", pod=~\"$pod_name\"}/container_spec_cpu_period{namespace=\"$namespace\", pod=~\"$pod_name\"}) by (name))"
},
{
"uuid": 1745893024112672500,
"collector": "Pod",
"typ": "Kubernetes",
"name": "容器CPU load 10",
"unit": "",
"note": "Pod自身指标\n类型: pod=~\"$pod_name\",",
"lang": "zh_CN",
"expression": "sum(container_cpu_load_average_10s{namespace=\"$namespace\", pod=~\"$pod_name\", image!~\".*pause.*\"}) by (name)"
},
{
"uuid": 1745893024026246700,
"collector": "Pod",
"typ": "Kubernetes",
"name": "容器CPU使用率",
"unit": "",
"note": "Pod自身指标\n类型: pod=~\"$pod_name\",",
"lang": "zh_CN",
"expression": "sum(rate(container_cpu_usage_seconds_total{namespace=\"$namespace\", pod=~\"$pod_name\", image!~\".*pause.*\"}[1m])*100) by(name)"
},
{
"uuid": 1745893024029544000,
"collector": "Pod",
"typ": "Kubernetes",
"name": "容器CPU归一化后使用率",
"unit": "",
"note": "Pod自身指标\n类型: pod=~\"$pod_name\",",
"lang": "zh_CN",
"expression": "sum(rate(container_cpu_usage_seconds_total{namespace=\"$namespace\", pod=~\"$pod_name\", image!~\".*pause.*\"}[1m])*100) by(name)/((sum(container_spec_cpu_quota{namespace=\"$namespace\", pod=~\"$pod_name\"}/container_spec_cpu_period{namespace=\"$namespace\", pod=~\"$pod_name\"}) by (name)))"
},
{
"uuid": 1745893024146207700,
"collector": "Pod",
"typ": "Kubernetes",
"name": "容器I/O",
"unit": "",
"note": "Pod自身指标\n类型: pod=~\"$pod_name\",",
"lang": "zh_CN",
"expression": "sum(container_fs_io_current{namespace=\"$namespace\", pod=~\"$pod_name\", image!~\".*pause.*\"}) by (name)"
},
{
"uuid": 1745893024136457000,
"collector": "Pod",
"typ": "Kubernetes",
"name": "容器RSS内存使用",
"unit": "",
"note": "Pod自身指标\n类型: pod=~\"$pod_name\",",
"lang": "zh_CN",
"expression": "(sum(container_memory_rss{namespace=\"$namespace\", pod=~\"$pod_name\", image!~\".*pause.*\"}) by (name))"
},
{
"uuid": 1745893024139900200,
"collector": "Pod",
"typ": "Kubernetes",
"name": "容器内存 Limit",
"unit": "",
"note": "Pod自身指标\n类型: pod=~\"$pod_name\",",
"lang": "zh_CN",
"expression": "sum(container_spec_memory_limit_bytes{namespace=\"$namespace\", pod=~\"$pod_name\", image!~\".*pause.*\"}) by (name)"
},
{
"uuid": 1745893024032984300,
"collector": "Pod",
"typ": "Kubernetes",
"name": "容器内存使用",
"unit": "",
"note": "Pod自身指标\n类型: pod=~\"$pod_name\",",
"lang": "zh_CN",
"expression": "(sum(container_memory_usage_bytes{namespace=\"$namespace\", pod=~\"$pod_name\", image!~\".*pause.*\"}) by (name))"
},
{
"uuid": 1745893024127585500,
"collector": "Pod",
"typ": "Kubernetes",
"name": "容器内存使用率",
"unit": "",
"note": "Pod自身指标\n类型: pod=~\"$pod_name\",",
"lang": "zh_CN",
"expression": "((sum(container_memory_usage_bytes{namespace=\"$namespace\", pod=~\"$pod_name\", image!~\".*pause.*\"}) by (name)) /(sum(container_spec_memory_limit_bytes{namespace=\"$namespace\", pod=~\"$pod_name\", image!~\".*pause.*\"}) by (name)))*100"
},
{
"uuid": 1745893024093620000,
"collector": "Pod",
"typ": "Kubernetes",
"name": "容器内核态CPU使用率",
"unit": "",
"note": "Pod自身指标\n类型: pod=~\"$pod_name\",",
"lang": "zh_CN",
"expression": "sum(rate(container_cpu_system_seconds_total{namespace=\"$namespace\", pod=~\"$pod_name\", image!~\".*pause.*\"}[1m])*100) by(name)"
},
{
"uuid": 1745893024102879000,
"collector": "Pod",
"typ": "Kubernetes",
"name": "容器发生CPU throttle的比率",
"unit": "",
"note": "Pod自身指标\n类型: pod=~\"$pod_name\",",
"lang": "zh_CN",
"expression": "sum(rate(container_cpu_cfs_throttled_periods_total{namespace=\"$namespace\", pod=~\"$pod_name\", image!~\".*pause.*\"}[1m]))by(name) *100"
},
{
"uuid": 1745893024143177000,
"collector": "Pod",
"typ": "Kubernetes",
"name": "容器发生OOM次数",
"unit": "",
"note": "Pod自身指标\n类型: pod=~\"$pod_name\",",
"lang": "zh_CN",
"expression": "sum(container_oom_events_total{namespace=\"$namespace\", pod=~\"$pod_name\", image!~\".*pause.*\"}) by (name)"
},
{
"uuid": 1745893024083942000,
"collector": "Pod",
"typ": "Kubernetes",
"name": "容器启动时长(小时)",
"unit": "",
"note": "Pod自身指标\n类型: pod=~\"$pod_name\",",
"lang": "zh_CN",
"expression": "sum((time()-container_start_time_seconds{namespace=\"$namespace\", pod=~\"$pod_name\", image!~\".*pause.*\"})) by (name)"
},
{
"uuid": 1745893024152466200,
"collector": "Pod",
"typ": "Kubernetes",
"name": "容器已使用的文件系统大小",
"unit": "",
"note": "Pod自身指标\n类型: pod=~\"$pod_name\",",
"lang": "zh_CN",
"expression": "sum(container_fs_usage_bytes{namespace=\"$namespace\", pod=~\"$pod_name\", image!~\".*pause.*\"}) by (name)"
},
{
"uuid": 1745893024097849600,
"collector": "Pod",
"typ": "Kubernetes",
"name": "容器用户态CPU使用率",
"unit": "",
"note": "Pod自身指标\n类型: pod=~\"$pod_name\",",
"lang": "zh_CN",
"expression": "sum(rate(container_cpu_user_seconds_total{namespace=\"$namespace\", pod=~\"$pod_name\", image!~\".*pause.*\"}[1m])*100) by(name)"
},
{
"uuid": 1745893024036896800,
"collector": "Pod",
"typ": "Kubernetes",
"name": "文件系统写入速率",
"unit": "",
"note": "Pod自身指标\n类型: pod=~\"$pod_name\",",
"lang": "zh_CN",
"expression": "sum(rate(container_fs_writes_bytes_total{namespace=\"$namespace\", pod=~\"$pod_name\", image!~\".*pause.*\"}[1m])) by(name)"
},
{
"uuid": 1745893024057722000,
"collector": "Pod",
"typ": "Kubernetes",
"name": "文件系统读取速率",
"unit": "",
"note": "Pod自身指标\n类型: pod=~\"$pod_name\",",
"lang": "zh_CN",
"expression": "sum(rate(container_fs_reads_bytes_total{namespace=\"$namespace\", pod=~\"$pod_name\", image!~\".*pause.*\"}[1m])) by(name)"
},
{
"uuid": 1745893024166898000,
"collector": "Pod",
"typ": "Kubernetes",
"name": "网络发送丢包数",
"unit": "",
"note": "Pod自身指标\n类型: pod=~\"$pod_name\"}[1m]))",
"lang": "zh_CN",
"expression": "sum(rate(container_network_transmit_packets_dropped_total{namespace=\"$namespace\", pod=~\"$pod_name\"}[1m])) by(name, interface)"
},
{
"uuid": 1745893024160266500,
"collector": "Pod",
"typ": "Kubernetes",
"name": "网络发送数据包",
"unit": "",
"note": "Pod自身指标\n类型: pod=~\"$pod_name\"}[1m]))",
"lang": "zh_CN",
"expression": "sum(rate(container_network_transmit_packets_total{namespace=\"$namespace\", pod=~\"$pod_name\"}[1m])) by(name, interface)"
},
{
"uuid": 1745893024069935000,
"collector": "Pod",
"typ": "Kubernetes",
"name": "网络发送速率",
"unit": "",
"note": "Pod自身指标\n类型: pod=~\"$pod_name\"}[1m]))",
"lang": "zh_CN",
"expression": "sum(rate(container_network_transmit_bytes_total{namespace=\"$namespace\", pod=~\"$pod_name\"}[1m])) by(name, interface)"
},
{
"uuid": 1745893024163721700,
"collector": "Pod",
"typ": "Kubernetes",
"name": "网络发送错误数",
"unit": "",
"note": "Pod自身指标\n类型: pod=~\"$pod_name\"}[1m]))",
"lang": "zh_CN",
"expression": "sum(rate(container_network_transmit_errors_total{namespace=\"$namespace\", pod=~\"$pod_name\"}[1m])) by(name, interface)"
},
{
"uuid": 1745893024173485600,
"collector": "Pod",
"typ": "Kubernetes",
"name": "网络接收丢包数",
"unit": "",
"note": "Pod自身指标\n类型: pod=~\"$pod_name\"}[1m]))",
"lang": "zh_CN",
"expression": "sum(rate(container_network_receive_packets_dropped_total{namespace=\"$namespace\", pod=~\"$pod_name\"}[1m])) by(name, interface)"
},
{
"uuid": 1745893024156389600,
"collector": "Pod",
"typ": "Kubernetes",
"name": "网络接收数据包数",
"unit": "",
"note": "Pod自身指标\n类型: pod=~\"$pod_name\"}[1m]))",
"lang": "zh_CN",
"expression": "sum(rate(container_network_receive_packets_total{namespace=\"$namespace\", pod=~\"$pod_name\"}[1m])) by(name, interface)"
},
{
"uuid": 1745893024075864800,
"collector": "Pod",
"typ": "Kubernetes",
"name": "网络接收速率",
"unit": "",
"note": "Pod自身指标\n类型: pod=~\"$pod_name\"}[1m]))",
"lang": "zh_CN",
"expression": "sum(rate(container_network_receive_bytes_total{namespace=\"$namespace\", pod=~\"$pod_name\"}[1m])) by(name, interface)"
},
{
"uuid": 1745893024170233300,
"collector": "Pod",
"typ": "Kubernetes",
"name": "网络接收错误数",
"unit": "",
"note": "Pod自身指标\n类型: pod=~\"$pod_name\"}[1m]))",
"lang": "zh_CN",
"expression": "sum(rate(container_network_receive_errors_total{namespace=\"$namespace\", pod=~\"$pod_name\"}[1m])) by(name, interface)"
}
]

View File

@@ -0,0 +1,8 @@
# # collect interval
# interval = 15
# # ntp servers
# ntp_servers = ["ntp.aliyun.com"]
# # response timeout in seconds
# timeout = 5

View File

@@ -53,4 +53,8 @@ nr_alloc_batch = 0
## arp_package
统计 ARP 包的数量,该插件依赖 cgo如果需要该插件需要下载 `with-cgo` 的 categraf 发布包。
统计 ARP 包的数量,该插件依赖 cgo如果需要该插件需要下载 `with-cgo` 的 categraf 发布包。
## ntp
监控机器时间偏移量,只需要给出 ntp 服务端地址Categraf 就会周期性去请求,对比本机时间,得到偏移量,监控指标是 ntp_offset_ms 顾名思义,单位是毫秒,一般这个值不能超过 1000

View File

@@ -0,0 +1,140 @@
package memsto
import (
"fmt"
"sync"
"time"
"github.com/ccfos/nightingale/v6/dumper"
"github.com/ccfos/nightingale/v6/models"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/pkg/errors"
"github.com/toolkits/pkg/logger"
)
// EventProcessorCacheType is an in-memory cache of event pipelines, keyed by
// pipeline id, refreshed periodically from the database by a single sync
// goroutine.
type EventProcessorCacheType struct {
	statTotal       int64 // last seen row count; -1 means "never synced"
	statLastUpdated int64 // last seen max(update_at); -1 means "never synced"
	ctx             *ctx.Context
	stats           *Stats // gauges for sync duration / synced item count

	sync.RWMutex // guards processors
	processors   map[int64]*models.EventPipeline // key: pipeline id
}
// NewEventProcessorCache builds the cache, performs one blocking initial sync
// (which exits the process on failure), and starts the background refresher.
func NewEventProcessorCache(ctx *ctx.Context, stats *Stats) *EventProcessorCacheType {
	cache := &EventProcessorCacheType{
		ctx:             ctx,
		stats:           stats,
		statTotal:       -1,
		statLastUpdated: -1,
		processors:      map[int64]*models.EventPipeline{},
	}
	cache.SyncEventProcessors()
	return cache
}
// Reset puts the cache back into its initial "never synced" state, so the
// next sync pass is forced to reload everything.
func (epc *EventProcessorCacheType) Reset() {
	epc.Lock()
	defer epc.Unlock()

	epc.statTotal = -1
	epc.statLastUpdated = -1
	epc.processors = map[int64]*models.EventPipeline{}
}
// StatChanged reports whether the given table statistics differ from the
// snapshot taken at the last successful sync.
func (epc *EventProcessorCacheType) StatChanged(total, lastUpdated int64) bool {
	return epc.statTotal != total || epc.statLastUpdated != lastUpdated
}
// Set swaps in a freshly loaded pipeline map and records the statistics
// snapshot it corresponds to.
func (epc *EventProcessorCacheType) Set(m map[int64]*models.EventPipeline, total, lastUpdated int64) {
	epc.Lock()
	defer epc.Unlock()

	epc.processors = m
	// The stat fields are only ever written by the single sync goroutine, so
	// they do not strictly need the lock; holding it here is harmless.
	epc.statTotal = total
	epc.statLastUpdated = lastUpdated
}
// Get returns the cached pipeline for the given id, or nil if absent.
func (epc *EventProcessorCacheType) Get(processorId int64) *models.EventPipeline {
	epc.RLock()
	p := epc.processors[processorId]
	epc.RUnlock()
	return p
}
// GetProcessorIds returns the ids of all cached pipelines (unordered).
func (epc *EventProcessorCacheType) GetProcessorIds() []int64 {
	epc.RLock()
	defer epc.RUnlock()

	ids := make([]int64, 0, len(epc.processors))
	for id := range epc.processors {
		ids = append(ids, id)
	}
	return ids
}
// SyncEventProcessors performs one blocking initial sync — exiting the
// process on failure, since the cache is required at startup — and then
// launches the periodic background refresh loop.
func (epc *EventProcessorCacheType) SyncEventProcessors() {
	if err := epc.syncEventProcessors(); err != nil {
		fmt.Println("failed to sync event processors:", err)
		exit(1)
	}
	go epc.loopSyncEventProcessors()
}
// loopSyncEventProcessors refreshes the cache every 9 seconds, forever.
// Failures are logged and retried on the next tick.
func (epc *EventProcessorCacheType) loopSyncEventProcessors() {
	const interval = 9 * time.Second // same as 9000ms in the original
	for {
		time.Sleep(interval)
		if err := epc.syncEventProcessors(); err != nil {
			logger.Warning("failed to sync event processors:", err)
		}
	}
}
// syncEventProcessors refreshes the cache from the database. It first reads
// the cheap (count, max(update_at)) statistics and returns early when nothing
// changed since the last sync; otherwise it reloads the full pipeline list
// and swaps it in via Set. Each outcome is exported through the stats gauges
// and recorded in the dumper for troubleshooting.
func (epc *EventProcessorCacheType) syncEventProcessors() error {
	start := time.Now()
	stat, err := models.EventPipelineStatistics(epc.ctx)
	if err != nil {
		dumper.PutSyncRecord("event_processors", start.Unix(), -1, -1, "failed to query statistics: "+err.Error())
		return errors.WithMessage(err, "failed to exec StatisticsGet for EventPipeline")
	}
	// Nothing changed since the last sync: report a zero-cost pass and keep
	// the current map untouched.
	if !epc.StatChanged(stat.Total, stat.LastUpdated) {
		epc.stats.GaugeCronDuration.WithLabelValues("sync_event_processors").Set(0)
		epc.stats.GaugeSyncNumber.WithLabelValues("sync_event_processors").Set(0)
		dumper.PutSyncRecord("event_processors", start.Unix(), -1, -1, "not changed")
		return nil
	}
	lst, err := models.ListEventPipelines(epc.ctx)
	if err != nil {
		dumper.PutSyncRecord("event_processors", start.Unix(), -1, -1, "failed to query records: "+err.Error())
		return errors.WithMessage(err, "failed to exec ListEventPipelines")
	}
	// Re-index by pipeline id and publish together with the stats snapshot.
	m := make(map[int64]*models.EventPipeline)
	for i := 0; i < len(lst); i++ {
		m[lst[i].ID] = lst[i]
	}
	epc.Set(m, stat.Total, stat.LastUpdated)
	ms := time.Since(start).Milliseconds()
	epc.stats.GaugeCronDuration.WithLabelValues("sync_event_processors").Set(float64(ms))
	epc.stats.GaugeSyncNumber.WithLabelValues("sync_event_processors").Set(float64(len(m)))
	logger.Infof("timer: sync event processors done, cost: %dms, number: %d", ms, len(m))
	dumper.PutSyncRecord("event_processors", start.Unix(), ms, len(m), "success")
	return nil
}

View File

@@ -41,30 +41,30 @@ type AlertCurEvent struct {
CallbacksJSON []string `json:"callbacks" gorm:"-"` // for fe
RunbookUrl string `json:"runbook_url"`
NotifyRecovered int `json:"notify_recovered"`
NotifyChannels string `json:"-"` // for db
NotifyChannelsJSON []string `json:"notify_channels" gorm:"-"` // for fe
NotifyGroups string `json:"-"` // for db
NotifyGroupsJSON []string `json:"notify_groups" gorm:"-"` // for fe
NotifyGroupsObj []*UserGroup `json:"notify_groups_obj" gorm:"-"` // for fe
NotifyChannels string `json:"-"` // for db
NotifyChannelsJSON []string `json:"notify_channels,omitempty" gorm:"-"` // for fe
NotifyGroups string `json:"-"` // for db
NotifyGroupsJSON []string `json:"notify_groups,omitempty" gorm:"-"` // for fe
NotifyGroupsObj []*UserGroup `json:"notify_groups_obj,omitempty" gorm:"-"` // for fe
TargetIdent string `json:"target_ident"`
TargetNote string `json:"target_note"`
TriggerTime int64 `json:"trigger_time"`
TriggerValue string `json:"trigger_value"`
TriggerValues string `json:"trigger_values" gorm:"-"`
TriggerValuesJson EventTriggerValues `json:"trigger_values_json" gorm:"-"`
Tags string `json:"-"` // for db
TagsJSON []string `json:"tags" gorm:"-"` // for fe
TagsMap map[string]string `json:"tags_map" gorm:"-"` // for internal usage
OriginalTags string `json:"-"` // for db
OriginalTagsJSON []string `json:"original_tags" gorm:"-"` // for fe
Annotations string `json:"-"` //
AnnotationsJSON map[string]string `json:"annotations" gorm:"-"` // for fe
IsRecovered bool `json:"is_recovered" gorm:"-"` // for notify.py
NotifyUsersObj []*User `json:"notify_users_obj" gorm:"-"` // for notify.py
LastEvalTime int64 `json:"last_eval_time" gorm:"-"` // for notify.py 上次计算的时间
LastSentTime int64 `json:"last_sent_time" gorm:"-"` // 上次发送时间
NotifyCurNumber int `json:"notify_cur_number"` // notify: current number
FirstTriggerTime int64 `json:"first_trigger_time"` // 连续告警的首次告警时间
Tags string `json:"-"` // for db
TagsJSON []string `json:"tags" gorm:"-"` // for fe
TagsMap map[string]string `json:"tags_map" gorm:"-"` // for internal usage
OriginalTags string `json:"-"` // for db
OriginalTagsJSON []string `json:"original_tags" gorm:"-"` // for fe
Annotations string `json:"-"` //
AnnotationsJSON map[string]string `json:"annotations" gorm:"-"` // for fe
IsRecovered bool `json:"is_recovered" gorm:"-"` // for notify.py
NotifyUsersObj []*User `json:"notify_users_obj,omitempty" gorm:"-"` // for notify.py
LastEvalTime int64 `json:"last_eval_time" gorm:"-"` // for notify.py 上次计算的时间
LastSentTime int64 `json:"last_sent_time" gorm:"-"` // 上次发送时间
NotifyCurNumber int `json:"notify_cur_number"` // notify: current number
FirstTriggerTime int64 `json:"first_trigger_time"` // 连续告警的首次告警时间
ExtraConfig interface{} `json:"extra_config" gorm:"-"`
Status int `json:"status" gorm:"-"`
Claimant string `json:"claimant" gorm:"-"`
@@ -869,3 +869,100 @@ func AlertCurEventStatistics(ctx *ctx.Context, stime time.Time) map[string]inter
return res
}
// DeepCopy returns a copy of the event in which the pointer, slice, and map
// fields are duplicated, so the copy can be mutated (e.g. by an event
// pipeline) without affecting the original. Struct elements are copied one
// level deep. NOTE(review): the interface-typed fields RuleConfigJson and
// ExtraConfig are assigned directly, so the copy shares whatever they
// reference — confirm callers never mutate those through the copy.
func (e *AlertCurEvent) DeepCopy() *AlertCurEvent {
	eventCopy := *e
	// copy pointer fields
	if e.NotifyGroupsObj != nil {
		eventCopy.NotifyGroupsObj = make([]*UserGroup, len(e.NotifyGroupsObj))
		for i, group := range e.NotifyGroupsObj {
			if group != nil {
				groupCopy := *group
				eventCopy.NotifyGroupsObj[i] = &groupCopy
			}
		}
	}
	if e.NotifyUsersObj != nil {
		eventCopy.NotifyUsersObj = make([]*User, len(e.NotifyUsersObj))
		for i, user := range e.NotifyUsersObj {
			if user != nil {
				userCopy := *user
				eventCopy.NotifyUsersObj[i] = &userCopy
			}
		}
	}
	if e.Target != nil {
		targetCopy := *e.Target
		eventCopy.Target = &targetCopy
	}
	// copy slice fields
	if e.CallbacksJSON != nil {
		eventCopy.CallbacksJSON = make([]string, len(e.CallbacksJSON))
		copy(eventCopy.CallbacksJSON, e.CallbacksJSON)
	}
	if e.NotifyChannelsJSON != nil {
		eventCopy.NotifyChannelsJSON = make([]string, len(e.NotifyChannelsJSON))
		copy(eventCopy.NotifyChannelsJSON, e.NotifyChannelsJSON)
	}
	if e.NotifyGroupsJSON != nil {
		eventCopy.NotifyGroupsJSON = make([]string, len(e.NotifyGroupsJSON))
		copy(eventCopy.NotifyGroupsJSON, e.NotifyGroupsJSON)
	}
	if e.TagsJSON != nil {
		eventCopy.TagsJSON = make([]string, len(e.TagsJSON))
		copy(eventCopy.TagsJSON, e.TagsJSON)
	}
	if e.TagsMap != nil {
		eventCopy.TagsMap = make(map[string]string, len(e.TagsMap))
		for k, v := range e.TagsMap {
			eventCopy.TagsMap[k] = v
		}
	}
	if e.OriginalTagsJSON != nil {
		eventCopy.OriginalTagsJSON = make([]string, len(e.OriginalTagsJSON))
		copy(eventCopy.OriginalTagsJSON, e.OriginalTagsJSON)
	}
	if e.AnnotationsJSON != nil {
		eventCopy.AnnotationsJSON = make(map[string]string, len(e.AnnotationsJSON))
		for k, v := range e.AnnotationsJSON {
			eventCopy.AnnotationsJSON[k] = v
		}
	}
	if e.ExtraInfo != nil {
		eventCopy.ExtraInfo = make([]string, len(e.ExtraInfo))
		copy(eventCopy.ExtraInfo, e.ExtraInfo)
	}
	if e.ExtraInfoMap != nil {
		eventCopy.ExtraInfoMap = make([]map[string]string, len(e.ExtraInfoMap))
		for i, m := range e.ExtraInfoMap {
			if m != nil {
				eventCopy.ExtraInfoMap[i] = make(map[string]string, len(m))
				for k, v := range m {
					eventCopy.ExtraInfoMap[i][k] = v
				}
			}
		}
	}
	if e.NotifyRuleIDs != nil {
		eventCopy.NotifyRuleIDs = make([]int64, len(e.NotifyRuleIDs))
		copy(eventCopy.NotifyRuleIDs, e.NotifyRuleIDs)
	}
	// interface fields: shared by reference (see NOTE above)
	eventCopy.RuleConfigJson = e.RuleConfigJson
	eventCopy.ExtraConfig = e.ExtraConfig
	return &eventCopy
}

View File

@@ -5,8 +5,6 @@ import (
"fmt"
"strconv"
"strings"
"text/template"
"text/template/parse"
"time"
"github.com/ccfos/nightingale/v6/pkg/ctx"
@@ -479,19 +477,10 @@ func (ar *AlertRule) Verify() error {
return errors.New("name is blank")
}
t, err := template.New("test").Parse(ar.Name)
if err != nil {
if str.Dangerous(ar.Name) {
return errors.New("Name has invalid characters")
}
for _, node := range t.Tree.Root.Nodes {
if tn := node.(*parse.TextNode); tn != nil {
if str.Dangerous(tn.String()) {
return fmt.Errorf("Name has invalid characters: %s", tn.String())
}
}
}
if ar.Prod == "" {
ar.Prod = METRIC
}

178
models/event_pipeline.go Normal file
View File

@@ -0,0 +1,178 @@
package models
import (
"errors"
"fmt"
"time"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/pkg/poster"
)
// EventPipeline is the event-pipeline model: an ordered list of processors,
// optionally guarded by label/attribute filters, owned by one or more teams.
type EventPipeline struct {
	ID           int64       `json:"id" gorm:"primaryKey"`
	Name         string      `json:"name" gorm:"type:varchar(128)"`
	TeamIds      []int64     `json:"team_ids" gorm:"type:text;serializer:json"`
	TeamNames    []string    `json:"team_names" gorm:"-"` // derived from TeamIds via FillTeamNames; not persisted
	Description  string      `json:"description" gorm:"type:varchar(255)"`
	FilterEnable bool        `json:"filter_enable" gorm:"type:tinyint(1)"` // whether the filters below are applied
	LabelFilters []TagFilter `json:"label_filters" gorm:"type:text;serializer:json"`
	AttrFilters  []TagFilter `json:"attribute_filters" gorm:"type:text;serializer:json"`
	Processors   []Processor `json:"processors" gorm:"type:text;serializer:json"`
	CreateAt     int64       `json:"create_at" gorm:"type:bigint"`
	CreateBy     string      `json:"create_by" gorm:"type:varchar(64)"`
	UpdateAt     int64       `json:"update_at" gorm:"type:bigint"`
	UpdateBy     string      `json:"update_by" gorm:"type:varchar(64)"`
}
// Processor is one step of a pipeline: a type discriminator plus an opaque,
// type-specific configuration payload.
type Processor struct {
	Typ    string      `json:"typ"`
	Config interface{} `json:"config"`
}
// TableName maps EventPipeline onto the event_pipeline table.
func (e *EventPipeline) TableName() string {
	const table = "event_pipeline"
	return table
}
// Verify validates the required fields before the pipeline is persisted.
func (e *EventPipeline) Verify() error {
	switch {
	case e.Name == "":
		return errors.New("name cannot be empty")
	case len(e.TeamIds) == 0:
		return errors.New("team_ids cannot be empty")
	default:
		return nil
	}
}
// DB2FE normalizes nil slice fields to empty slices so the frontend always
// receives JSON arrays instead of null.
func (e *EventPipeline) DB2FE() {
	if e.TeamIds == nil {
		e.TeamIds = []int64{}
	}
	if e.LabelFilters == nil {
		e.LabelFilters = []TagFilter{}
	}
	if e.AttrFilters == nil {
		e.AttrFilters = []TagFilter{}
	}
	if e.Processors == nil {
		e.Processors = []Processor{}
	}
}
// CreateEventPipeline inserts a new pipeline row.
func CreateEventPipeline(c *ctx.Context, pipeline *EventPipeline) error {
	err := DB(c).Create(pipeline).Error
	return err
}
// GetEventPipeline fetches a single pipeline by id and normalizes it for the
// frontend.
func GetEventPipeline(ctx *ctx.Context, id int64) (*EventPipeline, error) {
	pipeline := new(EventPipeline)
	if err := DB(ctx).Where("id = ?", id).First(pipeline).Error; err != nil {
		return nil, err
	}
	pipeline.DB2FE()
	return pipeline, nil
}
// GetEventPipelinesByIds fetches the pipelines whose ids appear in ids and
// normalizes each record for the frontend.
func GetEventPipelinesByIds(ctx *ctx.Context, ids []int64) ([]*EventPipeline, error) {
	// Short-circuit the empty case instead of issuing an `IN (NULL)` query.
	if len(ids) == 0 {
		return []*EventPipeline{}, nil
	}

	var pipelines []*EventPipeline
	err := DB(ctx).Where("id in ?", ids).Find(&pipelines).Error
	if err != nil {
		return nil, err
	}

	// Consistency with GetEventPipeline / ListEventPipelines: normalize nil
	// slice fields so callers never serialize JSON null for them.
	for _, p := range pipelines {
		p.DB2FE()
	}
	return pipelines, nil
}
// UpdateEventPipeline persists every field of the given pipeline.
func UpdateEventPipeline(c *ctx.Context, pipeline *EventPipeline) error {
	err := DB(c).Save(pipeline).Error
	return err
}
// DeleteEventPipeline removes the pipeline with the given id.
func DeleteEventPipeline(c *ctx.Context, id int64) error {
	err := DB(c).Delete(&EventPipeline{}, id).Error
	return err
}
// ListEventPipelines returns all pipelines ordered by name (descending). On
// non-center nodes the list is fetched from the center over HTTP; records
// from that path are returned as-is — NOTE(review): they skip DB2FE, confirm
// that is intended.
func ListEventPipelines(ctx *ctx.Context) ([]*EventPipeline, error) {
	if !ctx.IsCenter {
		return poster.GetByUrls[[]*EventPipeline](ctx, "/v1/n9e/event-pipelines")
	}

	var pipelines []*EventPipeline
	if err := DB(ctx).Order("name desc").Find(&pipelines).Error; err != nil {
		return nil, err
	}
	for _, p := range pipelines {
		p.DB2FE()
	}
	return pipelines, nil
}
// DeleteEventPipelines removes every pipeline whose id appears in ids.
func DeleteEventPipelines(c *ctx.Context, ids []int64) error {
	err := DB(c).Where("id in ?", ids).Delete(&EventPipeline{}).Error
	return err
}
// Update overwrites this pipeline with ref, preserving its identity and
// creation metadata and stamping a fresh update time, after validating ref.
func (e *EventPipeline) Update(ctx *ctx.Context, ref *EventPipeline) error {
	ref.ID = e.ID
	ref.CreateAt = e.CreateAt
	ref.CreateBy = e.CreateBy
	ref.UpdateAt = time.Now().Unix()

	if err := ref.Verify(); err != nil {
		return err
	}
	// Select("*") forces gorm to write zero-valued fields as well.
	return DB(ctx).Model(e).Select("*").Updates(*ref).Error
}
// FillTeamNames resolves TeamIds into TeamNames, preserving the TeamIds
// order; ids that no longer resolve to a user group are silently skipped.
func (e *EventPipeline) FillTeamNames(ctx *ctx.Context) error {
	e.TeamNames = make([]string, 0, len(e.TeamIds))
	if len(e.TeamIds) == 0 {
		return nil
	}

	nameByID, err := UserGroupIdAndNameMap(ctx, e.TeamIds)
	if err != nil {
		return err
	}
	for _, id := range e.TeamIds {
		if name, ok := nameByID[id]; ok {
			e.TeamNames = append(e.TeamNames, name)
		}
	}
	return nil
}
// EventPipelineStatistics returns (count, max(update_at)) for the table,
// letting caches detect changes cheaply. Non-center nodes ask the center.
func EventPipelineStatistics(ctx *ctx.Context) (*Statistics, error) {
	if !ctx.IsCenter {
		return poster.GetByUrls[*Statistics](ctx, "/v1/n9e/statistic?name=event_pipeline")
	}

	var stats []*Statistics
	err := DB(ctx).Model(&EventPipeline{}).
		Select("count(*) as total", "max(update_at) as last_updated").
		Find(&stats).Error
	if err != nil {
		return nil, err
	}
	if len(stats) == 0 {
		return nil, fmt.Errorf("no event pipeline found")
	}
	return stats[0], nil
}

View File

@@ -67,7 +67,8 @@ func MigrateTables(db *gorm.DB) error {
&TaskRecord{}, &ChartShare{}, &Target{}, &Configs{}, &Datasource{}, &NotifyTpl{},
&Board{}, &BoardBusigroup{}, &Users{}, &SsoConfig{}, &models.BuiltinMetric{},
&models.MetricFilter{}, &models.NotificaitonRecord{}, &models.TargetBusiGroup{},
&models.UserToken{}, &models.DashAnnotation{}, MessageTemplate{}, NotifyRule{}, NotifyChannelConfig{}, &EsIndexPatternMigrate{}}
&models.UserToken{}, &models.DashAnnotation{}, MessageTemplate{}, NotifyRule{}, NotifyChannelConfig{}, &EsIndexPatternMigrate{},
&models.EventPipeline{}}
if isPostgres(db) {
dts = append(dts, &models.PostgresBuiltinComponent{})
@@ -227,6 +228,26 @@ func InsertPermPoints(db *gorm.DB) {
Operation: "/notification-rules/del",
})
ops = append(ops, models.RoleOperation{
RoleName: "Standard",
Operation: "/event-pipelines",
})
ops = append(ops, models.RoleOperation{
RoleName: "Standard",
Operation: "/event-pipelines/add",
})
ops = append(ops, models.RoleOperation{
RoleName: "Standard",
Operation: "/event-pipelines/put",
})
ops = append(ops, models.RoleOperation{
RoleName: "Standard",
Operation: "/event-pipelines/del",
})
for _, op := range ops {
var count int64
@@ -414,16 +435,17 @@ func (t *MessageTemplate) TableName() string {
}
type NotifyRule struct {
ID int64 `gorm:"column:id;primaryKey;autoIncrement"`
Name string `gorm:"column:name;type:varchar(255);not null"`
Description string `gorm:"column:description;type:text"`
Enable bool `gorm:"column:enable;not null;default:false"`
UserGroupIds []int64 `gorm:"column:user_group_ids;type:varchar(255)"`
NotifyConfigs []models.NotifyConfig `gorm:"column:notify_configs;type:text"`
CreateAt int64 `gorm:"column:create_at;not null;default:0"`
CreateBy string `gorm:"column:create_by;type:varchar(64);not null;default:''"`
UpdateAt int64 `gorm:"column:update_at;not null;default:0"`
UpdateBy string `gorm:"column:update_by;type:varchar(64);not null;default:''"`
ID int64 `gorm:"column:id;primaryKey;autoIncrement"`
Name string `gorm:"column:name;type:varchar(255);not null"`
Description string `gorm:"column:description;type:text"`
Enable bool `gorm:"column:enable;not null;default:false"`
UserGroupIds []int64 `gorm:"column:user_group_ids;type:varchar(255)"`
NotifyConfigs []models.NotifyConfig `gorm:"column:notify_configs;type:text"`
PipelineConfigs []models.PipelineConfig `gorm:"column:pipeline_configs;type:text"`
CreateAt int64 `gorm:"column:create_at;not null;default:0"`
CreateBy string `gorm:"column:create_by;type:varchar(64);not null;default:''"`
UpdateAt int64 `gorm:"column:update_at;not null;default:0"`
UpdateBy string `gorm:"column:update_by;type:varchar(64);not null;default:''"`
}
func (r *NotifyRule) TableName() string {

View File

@@ -388,7 +388,10 @@ func (ncc *NotifyChannelConfig) SendFlashDuty(events []*AlertCurEvent, flashDuty
// 设置 URL 参数
query := req.URL.Query()
query.Add("channel_id", strconv.FormatInt(flashDutyChannelID, 10))
if flashDutyChannelID != 0 {
// 如果 flashduty 有配置协作空间(channel_id),则传入 channel_id 参数
query.Add("channel_id", strconv.FormatInt(flashDutyChannelID, 10))
}
req.URL.RawQuery = query.Encode()
req.Header.Add("Content-Type", "application/json")

View File

@@ -15,6 +15,8 @@ type NotifyRule struct {
Enable bool `json:"enable"` // 启用状态
UserGroupIds []int64 `json:"user_group_ids" gorm:"serializer:json"` // 告警组ID
PipelineConfigs []PipelineConfig `json:"pipeline_configs" gorm:"serializer:json"`
// 通知配置
NotifyConfigs []NotifyConfig `json:"notify_configs" gorm:"serializer:json"`
@@ -24,6 +26,11 @@ type NotifyRule struct {
UpdateBy string `json:"update_by"`
}
type PipelineConfig struct {
PipelineId int64 `json:"pipeline_id"`
Enable bool `json:"enable"`
}
func (r *NotifyRule) TableName() string {
return "notify_rule"
}

View File

@@ -121,6 +121,25 @@ func (u *User) IsAdmin() bool {
return false
}
// CheckGroupPermission returns nil when the user is an admin or belongs to
// at least one of the given user groups; otherwise it returns a
// "no permission" error.
func (u *User) CheckGroupPermission(ctx *ctx.Context, groupIds []int64) error {
	if u.IsAdmin() {
		return nil
	}

	mine, err := MyGroupIdsMap(ctx, u.Id)
	if err != nil {
		return err
	}
	for _, gid := range groupIds {
		if _, ok := mine[gid]; ok {
			return nil
		}
	}
	return errors.New("no permission")
}
func (u *User) Verify() error {
u.Username = strings.TrimSpace(u.Username)

View File

@@ -113,6 +113,19 @@ func UserGroupGetByIds(ctx *ctx.Context, ids []int64) ([]UserGroup, error) {
return lst, err
}
// UserGroupIdAndNameMap returns an id -> name map for the given user-group
// ids; ids with no matching group are simply absent from the map.
func UserGroupIdAndNameMap(ctx *ctx.Context, ids []int64) (map[int64]string, error) {
	groups, err := UserGroupGetByIds(ctx, ids)
	if err != nil {
		return nil, err
	}

	m := make(map[int64]string, len(groups))
	for i := range groups {
		m[groups[i].Id] = groups[i].Name
	}
	return m, nil
}
func UserGroupGetAll(ctx *ctx.Context) ([]*UserGroup, error) {
if !ctx.IsCenter {
lst, err := poster.GetByUrls[[]*UserGroup](ctx, "/v1/n9e/user-groups")

View File

@@ -22,6 +22,20 @@ func MyGroupIds(ctx *ctx.Context, userId int64) ([]int64, error) {
return ids, err
}
// MyGroupIdsMap returns the set of user-group ids the user belongs to,
// as a map usable for O(1) membership checks.
func MyGroupIdsMap(ctx *ctx.Context, userId int64) (map[int64]struct{}, error) {
	ids, err := MyGroupIds(ctx, userId)
	if err != nil {
		return nil, err
	}

	set := make(map[int64]struct{}, len(ids))
	for _, id := range ids {
		set[id] = struct{}{}
	}
	return set, nil
}
// my business group ids
func MyBusiGroupIds(ctx *ctx.Context, userId int64) ([]int64, error) {
groupIds, err := MyGroupIds(ctx, userId)

View File

@@ -74,6 +74,9 @@ var I18N = `{
"Log Analysis": "日志分析",
"View Logs": "查看日志",
"View Index Patterns": "查看索引模式",
"Add Index Pattern": "添加索引模式",
"Modify Index Pattern": "修改索引模式",
"Delete Index Pattern": "删除索引模式",
"Alert Rules": "告警规则",
"View Alert Rules": "查看告警规则",
"Add Alert Rule": "添加告警规则",
@@ -227,6 +230,9 @@ var I18N = `{
"Log Analysis": "日志分析",
"View Logs": "查看日志",
"View Index Patterns": "查看索引模式",
"Add Index Pattern": "添加索引模式",
"Modify Index Pattern": "修改索引模式",
"Delete Index Pattern": "删除索引模式",
"Alert Rules": "告警规则",
"View Alert Rules": "查看告警规则",
"Add Alert Rule": "添加告警规则",
@@ -391,6 +397,9 @@ var I18N = `{
"Log Analysis": "日誌分析",
"View Logs": "查看日誌",
"View Index Patterns": "查看索引模式",
"Add Index Pattern": "添加索引模式",
"Modify Index Pattern": "修改索引模式",
"Delete Index Pattern": "刪除索引模式",
"Alert Rules": "告警規則",
"View Alert Rules": "查看告警規則",
"Add Alert Rule": "添加告警規則",
@@ -545,6 +554,9 @@ var I18N = `{
"Log Analysis": "ログ分析",
"View Logs": "ログの表示",
"View Index Patterns": "インデックスパターンの表示",
"Add Index Pattern": "インデックスパターンの追加",
"Modify Index Pattern": "インデックスパターンの修正",
"Delete Index Pattern": "インデックスパターンの削除",
"Alert Rules": "アラートルール",
"View Alert Rules": "アラートルールの表示",
"Add Alert Rule": "アラートルールの追加",