Compare commits

...

10 Commits

Author SHA1 Message Date
ning
9a95d55feb optimize drop sample 2026-01-23 13:03:27 +08:00
ning
93f07b5dce fix: event concurrent map writes 2025-12-04 20:06:41 +08:00
ning
665dc7ef47 refactor: update board api 2025-11-14 12:16:03 +08:00
ning
99a5df9b51 fix: user phone encrypt 2025-11-10 18:28:58 +08:00
ning
0a19e02a08 fix prom datasource update 2025-11-07 17:54:12 +08:00
ning
ba913abef5 fix: dsn 2025-11-05 11:25:51 +08:00
ning
f562b6823b refactor: optimize db dsn 2025-11-04 15:01:59 +08:00
Yening Qin
254484b4c4 feat: alert rule add event process (#2920) 2025-10-28 16:17:43 +08:00
ning
d4299bec9d update built in 2025-10-28 11:22:58 +08:00
ning
75505ff763 refactor: update alert mute api 2025-10-15 15:20:51 +08:00
20 changed files with 245 additions and 72 deletions

View File

@@ -118,7 +118,7 @@ func Start(alertc aconf.Alert, pushgwc pconf.Pushgw, syncStats *memsto.Stats, al
eventProcessorCache := memsto.NewEventProcessorCache(ctx, syncStats)
dp := dispatch.NewDispatch(alertRuleCache, userCache, userGroupCache, alertSubscribeCache, targetCache, notifyConfigCache, taskTplsCache, notifyRuleCache, notifyChannelCache, messageTemplateCache, eventProcessorCache, alertc.Alerting, ctx, alertStats)
consumer := dispatch.NewConsumer(alertc.Alerting, ctx, dp, promClients)
consumer := dispatch.NewConsumer(alertc.Alerting, ctx, dp, promClients, alertMuteCache)
notifyRecordComsumer := sender.NewNotifyRecordConsumer(ctx)

View File

@@ -10,6 +10,7 @@ import (
"github.com/ccfos/nightingale/v6/alert/aconf"
"github.com/ccfos/nightingale/v6/alert/common"
"github.com/ccfos/nightingale/v6/alert/queue"
"github.com/ccfos/nightingale/v6/memsto"
"github.com/ccfos/nightingale/v6/models"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/pkg/poster"
@@ -26,10 +27,15 @@ type Consumer struct {
alerting aconf.Alerting
ctx *ctx.Context
dispatch *Dispatch
promClients *prom.PromClientMap
dispatch *Dispatch
promClients *prom.PromClientMap
alertMuteCache *memsto.AlertMuteCacheType
}
type EventMuteHookFunc func(event *models.AlertCurEvent) bool
var EventMuteHook EventMuteHookFunc = func(event *models.AlertCurEvent) bool { return false }
func InitRegisterQueryFunc(promClients *prom.PromClientMap) {
tplx.RegisterQueryFunc(func(datasourceID int64, promql string) model.Value {
if promClients.IsNil(datasourceID) {
@@ -43,12 +49,14 @@ func InitRegisterQueryFunc(promClients *prom.PromClientMap) {
}
// 创建一个 Consumer 实例
func NewConsumer(alerting aconf.Alerting, ctx *ctx.Context, dispatch *Dispatch, promClients *prom.PromClientMap) *Consumer {
func NewConsumer(alerting aconf.Alerting, ctx *ctx.Context, dispatch *Dispatch, promClients *prom.PromClientMap, alertMuteCache *memsto.AlertMuteCacheType) *Consumer {
return &Consumer{
alerting: alerting,
ctx: ctx,
dispatch: dispatch,
promClients: promClients,
alertMuteCache: alertMuteCache,
}
}

View File

@@ -28,6 +28,8 @@ var ShouldSkipNotify func(*ctx.Context, *models.AlertCurEvent, int64) bool
var SendByNotifyRule func(*ctx.Context, *memsto.UserCacheType, *memsto.UserGroupCacheType, *memsto.NotifyChannelCacheType,
[]*models.AlertCurEvent, int64, *models.NotifyConfig, *models.NotifyChannelConfig, *models.MessageTemplate)
var EventProcessorCache *memsto.EventProcessorCacheType
func init() {
ShouldSkipNotify = shouldSkipNotify
SendByNotifyRule = SendNotifyRuleMessage
@@ -90,6 +92,7 @@ func NewDispatch(alertRuleCache *memsto.AlertRuleCacheType, userCache *memsto.Us
}
pipeline.Init()
EventProcessorCache = eventProcessorCache
// 设置通知记录回调函数
notifyChannelCache.SetNotifyRecordFunc(sender.NotifyRecord)
@@ -177,39 +180,7 @@ func (e *Dispatch) HandleEventWithNotifyRule(eventOrigin *models.AlertCurEvent)
eventCopy.NotifyRuleId = notifyRuleId
eventCopy.NotifyRuleName = notifyRule.Name
var processors []models.Processor
for _, pipelineConfig := range notifyRule.PipelineConfigs {
if !pipelineConfig.Enable {
continue
}
eventPipeline := e.eventProcessorCache.Get(pipelineConfig.PipelineId)
if eventPipeline == nil {
logger.Warningf("notify_id: %d, event:%+v, processor not found", notifyRuleId, eventCopy)
continue
}
if !pipelineApplicable(eventPipeline, eventCopy) {
logger.Debugf("notify_id: %d, event:%+v, pipeline_id: %d, not applicable", notifyRuleId, eventCopy, pipelineConfig.PipelineId)
continue
}
processors = append(processors, e.eventProcessorCache.GetProcessorsById(pipelineConfig.PipelineId)...)
}
for _, processor := range processors {
var res string
var err error
logger.Infof("before processor notify_id: %d, event:%+v, processor:%+v", notifyRuleId, eventCopy, processor)
eventCopy, res, err = processor.Process(e.ctx, eventCopy)
if eventCopy == nil {
logger.Warningf("after processor notify_id: %d, event:%+v, processor:%+v, event is nil", notifyRuleId, eventCopy, processor)
sender.NotifyRecord(e.ctx, []*models.AlertCurEvent{eventOrigin}, notifyRuleId, "", "", res, errors.New("drop by processor"))
break
}
logger.Infof("after processor notify_id: %d, event:%+v, processor:%+v, res:%v, err:%v", notifyRuleId, eventCopy, processor, res, err)
}
eventCopy = HandleEventPipeline(notifyRule.PipelineConfigs, eventOrigin, eventCopy, e.eventProcessorCache, e.ctx, notifyRuleId, "notify_rule")
if ShouldSkipNotify(e.ctx, eventCopy, notifyRuleId) {
logger.Infof("notify_id: %d, event:%+v, should skip notify", notifyRuleId, eventCopy)
continue
@@ -257,7 +228,48 @@ func shouldSkipNotify(ctx *ctx.Context, event *models.AlertCurEvent, notifyRuleI
return false
}
func pipelineApplicable(pipeline *models.EventPipeline, event *models.AlertCurEvent) bool {
func HandleEventPipeline(pipelineConfigs []models.PipelineConfig, eventOrigin, event *models.AlertCurEvent, eventProcessorCache *memsto.EventProcessorCacheType, ctx *ctx.Context, id int64, from string) *models.AlertCurEvent {
for _, pipelineConfig := range pipelineConfigs {
if !pipelineConfig.Enable {
continue
}
eventPipeline := eventProcessorCache.Get(pipelineConfig.PipelineId)
if eventPipeline == nil {
logger.Warningf("processor_by_%s_id:%d pipeline_id:%d, event pipeline not found, event: %+v", from, id, pipelineConfig.PipelineId, event)
continue
}
if !PipelineApplicable(eventPipeline, event) {
logger.Debugf("processor_by_%s_id:%d pipeline_id:%d, event pipeline not applicable, event: %+v", from, id, pipelineConfig.PipelineId, event)
continue
}
processors := eventProcessorCache.GetProcessorsById(pipelineConfig.PipelineId)
for _, processor := range processors {
var res string
var err error
logger.Infof("processor_by_%s_id:%d pipeline_id:%d, before processor:%+v, event: %+v", from, id, pipelineConfig.PipelineId, processor, event)
event, res, err = processor.Process(ctx, event)
if event == nil {
logger.Infof("processor_by_%s_id:%d pipeline_id:%d, event dropped, after processor:%+v, event: %+v", from, id, pipelineConfig.PipelineId, processor, eventOrigin)
if from == "notify_rule" {
// alert_rule 获取不到 eventId 记录没有意义
sender.NotifyRecord(ctx, []*models.AlertCurEvent{eventOrigin}, id, "", "", res, fmt.Errorf("processor_by_%s_id:%d pipeline_id:%d, drop by processor", from, id, pipelineConfig.PipelineId))
}
return nil
}
logger.Infof("processor_by_%s_id:%d pipeline_id:%d, after processor:%+v, event: %+v, res:%v, err:%v", from, id, pipelineConfig.PipelineId, processor, event, res, err)
}
}
event.FE2DB()
event.FillTagsMap()
return event
}
func PipelineApplicable(pipeline *models.EventPipeline, event *models.AlertCurEvent) bool {
if pipeline == nil {
return true
}
@@ -268,13 +280,16 @@ func pipelineApplicable(pipeline *models.EventPipeline, event *models.AlertCurEv
tagMatch := true
if len(pipeline.LabelFilters) > 0 {
for i := range pipeline.LabelFilters {
if pipeline.LabelFilters[i].Func == "" {
pipeline.LabelFilters[i].Func = pipeline.LabelFilters[i].Op
// Deep copy to avoid concurrent map writes on cached objects
labelFiltersCopy := make([]models.TagFilter, len(pipeline.LabelFilters))
copy(labelFiltersCopy, pipeline.LabelFilters)
for i := range labelFiltersCopy {
if labelFiltersCopy[i].Func == "" {
labelFiltersCopy[i].Func = labelFiltersCopy[i].Op
}
}
tagFilters, err := models.ParseTagFilter(pipeline.LabelFilters)
tagFilters, err := models.ParseTagFilter(labelFiltersCopy)
if err != nil {
logger.Errorf("pipeline applicable failed to parse tag filter: %v event:%+v pipeline:%+v", err, event, pipeline)
return false
@@ -284,7 +299,11 @@ func pipelineApplicable(pipeline *models.EventPipeline, event *models.AlertCurEv
attributesMatch := true
if len(pipeline.AttrFilters) > 0 {
tagFilters, err := models.ParseTagFilter(pipeline.AttrFilters)
// Deep copy to avoid concurrent map writes on cached objects
attrFiltersCopy := make([]models.TagFilter, len(pipeline.AttrFilters))
copy(attrFiltersCopy, pipeline.AttrFilters)
tagFilters, err := models.ParseTagFilter(attrFiltersCopy)
if err != nil {
logger.Errorf("pipeline applicable failed to parse tag filter: %v event:%+v pipeline:%+v err:%v", tagFilters, event, pipeline, err)
return false
@@ -365,13 +384,16 @@ func NotifyRuleMatchCheck(notifyConfig *models.NotifyConfig, event *models.Alert
tagMatch := true
if len(notifyConfig.LabelKeys) > 0 {
for i := range notifyConfig.LabelKeys {
if notifyConfig.LabelKeys[i].Func == "" {
notifyConfig.LabelKeys[i].Func = notifyConfig.LabelKeys[i].Op
// Deep copy to avoid concurrent map writes on cached objects
labelKeysCopy := make([]models.TagFilter, len(notifyConfig.LabelKeys))
copy(labelKeysCopy, notifyConfig.LabelKeys)
for i := range labelKeysCopy {
if labelKeysCopy[i].Func == "" {
labelKeysCopy[i].Func = labelKeysCopy[i].Op
}
}
tagFilters, err := models.ParseTagFilter(notifyConfig.LabelKeys)
tagFilters, err := models.ParseTagFilter(labelKeysCopy)
if err != nil {
logger.Errorf("notify send failed to parse tag filter: %v event:%+v notify_config:%+v", err, event, notifyConfig)
return fmt.Errorf("failed to parse tag filter: %v", err)
@@ -385,7 +407,11 @@ func NotifyRuleMatchCheck(notifyConfig *models.NotifyConfig, event *models.Alert
attributesMatch := true
if len(notifyConfig.Attributes) > 0 {
tagFilters, err := models.ParseTagFilter(notifyConfig.Attributes)
// Deep copy to avoid concurrent map writes on cached objects
attributesCopy := make([]models.TagFilter, len(notifyConfig.Attributes))
copy(attributesCopy, notifyConfig.Attributes)
tagFilters, err := models.ParseTagFilter(attributesCopy)
if err != nil {
logger.Errorf("notify send failed to parse tag filter: %v event:%+v notify_config:%+v err:%v", tagFilters, event, notifyConfig, err)
return fmt.Errorf("failed to parse tag filter: %v", err)
@@ -496,7 +522,7 @@ func SendNotifyRuleMessage(ctx *ctx.Context, userCache *memsto.UserCacheType, us
start := time.Now()
respBody, err := notifyChannel.SendFlashDuty(events, flashDutyChannelIDs[i], notifyChannelCache.GetHttpClient(notifyChannel.ID))
respBody = fmt.Sprintf("duration: %d ms %s", time.Since(start).Milliseconds(), respBody)
logger.Infof("notify_id: %d, channel_name: %v, event:%+v, IntegrationUrl: %v dutychannel_id: %v, respBody: %v, err: %v", notifyRuleId, notifyChannel.Name, events[0], notifyChannel.RequestConfig.FlashDutyRequestConfig.IntegrationUrl, flashDutyChannelIDs[i], respBody, err)
logger.Infof("duty_sender notify_id: %d, channel_name: %v, event:%+v, IntegrationUrl: %v dutychannel_id: %v, respBody: %v, err: %v", notifyRuleId, notifyChannel.Name, events[0], notifyChannel.RequestConfig.FlashDutyRequestConfig.IntegrationUrl, flashDutyChannelIDs[i], respBody, err)
sender.NotifyRecord(ctx, events, notifyRuleId, notifyChannel.Name, strconv.FormatInt(flashDutyChannelIDs[i], 10), respBody, err)
}
@@ -527,7 +553,7 @@ func SendNotifyRuleMessage(ctx *ctx.Context, userCache *memsto.UserCacheType, us
start := time.Now()
target, res, err := notifyChannel.SendScript(events, tplContent, customParams, sendtos)
res = fmt.Sprintf("duration: %d ms %s", time.Since(start).Milliseconds(), res)
logger.Infof("notify_id: %d, channel_name: %v, event:%+v, tplContent:%s, customParams:%v, target:%s, res:%s, err:%v", notifyRuleId, notifyChannel.Name, events[0], tplContent, customParams, target, res, err)
logger.Infof("script_sender notify_id: %d, channel_name: %v, event:%+v, tplContent:%s, customParams:%v, target:%s, res:%s, err:%v", notifyRuleId, notifyChannel.Name, events[0], tplContent, customParams, target, res, err)
sender.NotifyRecord(ctx, events, notifyRuleId, notifyChannel.Name, target, res, err)
default:
logger.Warningf("notify_id: %d, channel_name: %v, event:%+v send type not found", notifyRuleId, notifyChannel.Name, events[0])

View File

@@ -286,7 +286,7 @@ func (arw *AlertRuleWorker) GetPromAnomalyPoint(ruleConfig string) ([]models.Ano
continue
}
if query.VarEnabled {
if query.VarEnabled && strings.Contains(query.PromQl, "$") {
var anomalyPoints []models.AnomalyPoint
if hasLabelLossAggregator(query) || notExactMatch(query) {
// 若有聚合函数或非精确匹配则需要先填充变量然后查询,这个方式效率较低

View File

@@ -26,8 +26,6 @@ import (
"github.com/toolkits/pkg/str"
)
type EventMuteHookFunc func(event *models.AlertCurEvent) bool
type ExternalProcessorsType struct {
ExternalLock sync.RWMutex
Processors map[string]*Processor
@@ -76,7 +74,6 @@ type Processor struct {
HandleFireEventHook HandleEventFunc
HandleRecoverEventHook HandleEventFunc
EventMuteHook EventMuteHookFunc
ScheduleEntry cron.Entry
PromEvalInterval int
@@ -121,7 +118,6 @@ func NewProcessor(engineName string, rule *models.AlertRule, datasourceId int64,
HandleFireEventHook: func(event *models.AlertCurEvent) {},
HandleRecoverEventHook: func(event *models.AlertCurEvent) {},
EventMuteHook: func(event *models.AlertCurEvent) bool { return false },
}
p.mayHandleGroup()
@@ -155,9 +151,19 @@ func (p *Processor) Handle(anomalyPoints []models.AnomalyPoint, from string, inh
// 如果 event 被 mute 了,本质也是 fire 的状态,这里无论如何都添加到 alertingKeys 中,防止 fire 的事件自动恢复了
hash := event.Hash
alertingKeys[hash] = struct{}{}
// event processor
eventCopy := event.DeepCopy()
event = dispatch.HandleEventPipeline(cachedRule.PipelineConfigs, eventCopy, event, dispatch.EventProcessorCache, p.ctx, cachedRule.Id, "alert_rule")
if event == nil {
logger.Infof("rule_eval:%s is muted drop by pipeline event:%v", p.Key(), eventCopy)
continue
}
// event mute
isMuted, detail, muteId := mute.IsMuted(cachedRule, event, p.TargetCache, p.alertMuteCache)
if isMuted {
logger.Debugf("rule_eval:%s event:%v is muted, detail:%s", p.Key(), event, detail)
logger.Infof("rule_eval:%s is muted, detail:%s event:%v", p.Key(), detail, event)
p.Stats.CounterMuteTotal.WithLabelValues(
fmt.Sprintf("%v", event.GroupName),
fmt.Sprintf("%v", p.rule.Id),
@@ -167,8 +173,8 @@ func (p *Processor) Handle(anomalyPoints []models.AnomalyPoint, from string, inh
continue
}
if p.EventMuteHook(event) {
logger.Debugf("rule_eval:%s event:%v is muted by hook", p.Key(), event)
if dispatch.EventMuteHook(event) {
logger.Infof("rule_eval:%s is muted by hook event:%v", p.Key(), event)
p.Stats.CounterMuteTotal.WithLabelValues(
fmt.Sprintf("%v", event.GroupName),
fmt.Sprintf("%v", p.rule.Id),

View File

@@ -331,6 +331,30 @@ func (b *BuiltinPayloadInFileType) AddBuiltinPayload(bp *models.BuiltinPayload)
b.IndexData[bp.UUID] = bp
}
func (b *BuiltinPayloadInFileType) GetComponentIdentByCate(typ, cate string) string {
for _, source := range b.Data {
if source == nil {
continue
}
typeMap, exists := source[typ]
if !exists {
continue
}
payloads, exists := typeMap[cate]
if !exists {
continue
}
if len(payloads) > 0 {
return payloads[0].Component
}
}
return ""
}
func (b *BuiltinPayloadInFileType) GetBuiltinPayload(typ, cate, query string, componentId uint64) ([]*models.BuiltinPayload, error) {
var result []*models.BuiltinPayload

View File

@@ -681,6 +681,7 @@ func (rt *Router) Config(r *gin.Engine) {
// 手机号加密存储配置接口
service.POST("/users/phone/encrypt", rt.usersPhoneEncrypt)
service.POST("/users/phone/decrypt", rt.usersPhoneDecrypt)
service.POST("/users/phone/refresh-encryption-config", rt.usersPhoneDecryptRefresh)
}
}

View File

@@ -115,6 +115,10 @@ func (rt *Router) boardPureGet(c *gin.Context) {
ginx.Bomb(http.StatusNotFound, "No such dashboard")
}
// 清除创建者和更新者信息
board.CreateBy = ""
board.UpdateBy = ""
ginx.NewRender(c).Data(board, nil)
}

View File

@@ -76,7 +76,9 @@ func (rt *Router) alertMuteAdd(c *gin.Context) {
f.CreateBy = username
f.UpdateBy = username
f.GroupId = ginx.UrlParamInt64(c, "id")
ginx.NewRender(c).Message(f.Add(rt.Ctx))
ginx.Dangerous(f.Add(rt.Ctx))
ginx.NewRender(c).Data(f.Id, nil)
}
type MuteTestForm struct {

View File

@@ -235,6 +235,16 @@ func (rt *Router) userDel(c *gin.Context) {
return
}
// 如果要删除的用户是 admin 角色,检查是否是最后一个 admin
if target.IsAdmin() {
adminCount, err := models.CountAdminUsers(rt.Ctx)
ginx.Dangerous(err)
if adminCount <= 1 {
ginx.Bomb(http.StatusBadRequest, "Cannot delete the last admin user")
}
}
ginx.NewRender(c).Message(target.Del(rt.Ctx))
}
@@ -345,6 +355,16 @@ func (rt *Router) usersPhoneEncrypt(c *gin.Context) {
}, nil)
}
func (rt *Router) usersPhoneDecryptRefresh(c *gin.Context) {
err := models.RefreshPhoneEncryptionCache(rt.Ctx)
if err != nil {
ginx.NewRender(c).Message(fmt.Errorf("refresh phone encryption cache failed: %v", err))
return
}
ginx.NewRender(c).Message(nil)
}
// usersPhoneDecrypt 统一手机号解密
func (rt *Router) usersPhoneDecrypt(c *gin.Context) {
// 先关闭手机号加密功能

View File

@@ -7,6 +7,7 @@ import (
"encoding/json"
"errors"
"fmt"
"net/url"
"strings"
"time"
@@ -117,7 +118,8 @@ func (p *PostgreSQL) NewConn(ctx context.Context, database string) (*gorm.DB, er
}()
// Simplified connection logic for PostgreSQL
dsn := fmt.Sprintf("postgres://%s:%s@%s/%s?sslmode=disable&TimeZone=Asia/Shanghai", p.Shard.User, p.Shard.Password, p.Shard.Addr, database)
dsn := fmt.Sprintf("postgres://%s:%s@%s/%s?sslmode=disable&TimeZone=Asia/Shanghai", url.QueryEscape(p.Shard.User), url.QueryEscape(p.Shard.Password), p.Shard.Addr, database)
db, err = sqlbase.NewDB(
ctx,
postgres.Open(dsn),

View File

@@ -42,6 +42,11 @@ func (c *CvalCache) initSyncConfigs() {
log.Fatalln("failed to sync configs:", err)
}
err = models.RefreshPhoneEncryptionCache(c.ctx)
if err != nil {
logger.Errorf("failed to refresh phone encryption cache: %v", err)
}
go c.loopSyncConfigs()
}

View File

@@ -284,7 +284,7 @@ func (ncc *NotifyChannelCacheType) processNotifyTask(task *NotifyTask) {
start := time.Now()
resp, err := task.NotifyChannel.SendHTTP(task.Events, task.TplContent, task.CustomParams, task.Sendtos, httpClient)
resp = fmt.Sprintf("duration: %d ms %s", time.Since(start).Milliseconds(), resp)
logger.Infof("notify_id: %d, channel_name: %v, event:%+v, tplContent:%v, customParams:%v, userInfo:%+v, respBody: %v, err: %v",
logger.Infof("http_sendernotify_id: %d, channel_name: %v, event:%+v, tplContent:%v, customParams:%v, userInfo:%+v, respBody: %v, err: %v",
task.NotifyRuleId, task.NotifyChannel.Name, task.Events[0], task.TplContent, task.CustomParams, task.Sendtos, resp, err)
// 调用通知记录回调函数
@@ -296,7 +296,7 @@ func (ncc *NotifyChannelCacheType) processNotifyTask(task *NotifyTask) {
start := time.Now()
resp, err := task.NotifyChannel.SendHTTP(task.Events, task.TplContent, task.CustomParams, []string{task.Sendtos[i]}, httpClient)
resp = fmt.Sprintf("send_time: %s duration: %d ms %s", time.Now().Format("2006-01-02 15:04:05"), time.Since(start).Milliseconds(), resp)
logger.Infof("notify_id: %d, channel_name: %v, event:%+v, tplContent:%v, customParams:%v, userInfo:%+v, respBody: %v, err: %v",
logger.Infof("http_sender notify_id: %d, channel_name: %v, event:%+v, tplContent:%v, customParams:%v, userInfo:%+v, respBody: %v, err: %v",
task.NotifyRuleId, task.NotifyChannel.Name, task.Events[0], task.TplContent, task.CustomParams, task.Sendtos[i], resp, err)
// 调用通知记录回调函数

View File

@@ -498,6 +498,23 @@ func (e *AlertCurEvent) FE2DB() {
}
func (e *AlertCurEvent) FillTagsMap() {
e.TagsMap = make(map[string]string)
for i := 0; i < len(e.TagsJSON); i++ {
pair := strings.TrimSpace(e.TagsJSON[i])
if pair == "" {
continue
}
arr := strings.SplitN(pair, "=", 2)
if len(arr) != 2 {
continue
}
e.TagsMap[arr[0]] = arr[1]
}
}
func (e *AlertCurEvent) DB2Mem() {
e.IsRecovered = false
e.NotifyGroupsJSON = strings.Fields(e.NotifyGroups)

View File

@@ -119,6 +119,7 @@ type AlertRule struct {
UpdateByNickname string `json:"update_by_nickname" gorm:"-"` // for fe
CronPattern string `json:"cron_pattern"`
NotifyRuleIds []int64 `json:"notify_rule_ids" gorm:"serializer:json"`
PipelineConfigs []PipelineConfig `json:"pipeline_configs" gorm:"serializer:json"`
NotifyVersion int `json:"notify_version"` // 0: old, 1: new
}

View File

@@ -172,6 +172,7 @@ type AlertRule struct {
DatasourceQueries []models.DatasourceQuery `gorm:"datasource_queries;type:text;serializer:json"` // datasource queries
NotifyRuleIds []int64 `gorm:"column:notify_rule_ids;type:varchar(1024)"`
NotifyVersion int `gorm:"column:notify_version;type:int;default:0"`
PipelineConfigs []models.PipelineConfig `gorm:"column:pipeline_configs;type:text;serializer:json"`
}
type AlertSubscribe struct {
@@ -259,7 +260,7 @@ type BoardBusigroup struct {
}
type Users struct {
Belong string `gorm:"column:belong;varchar(16);default:'';comment:belong"`
Belong string `gorm:"column:belong;type:varchar(16);default:'';comment:belong"`
LastActiveTime int64 `gorm:"column:last_active_time;type:int;default:0;comment:last_active_time"`
Phone string `gorm:"column:phone;type:varchar(1024);not null;default:''"`
}

View File

@@ -369,6 +369,12 @@ func UserGetById(ctx *ctx.Context, id int64) (*User, error) {
return UserGet(ctx, "id=?", id)
}
func CountAdminUsers(ctx *ctx.Context) (int64, error) {
var count int64
err := DB(ctx).Model(&User{}).Where("roles LIKE ?", "%"+AdminRole+"%").Count(&count).Error
return count, err
}
func UsersGetByGroupIds(ctx *ctx.Context, groupIds []int64) ([]User, error) {
if len(groupIds) == 0 {
return nil, nil

View File

@@ -52,6 +52,10 @@ func (po *PromOption) Equal(target PromOption) bool {
return false
}
if po.InsecureSkipVerify != target.InsecureSkipVerify {
return false
}
if len(po.Headers) != len(target.Headers) {
return false
}

View File

@@ -109,21 +109,30 @@ func (rt *Router) debugSample(remoteAddr string, v *prompb.TimeSeries) {
}
func (rt *Router) DropSample(v *prompb.TimeSeries) bool {
filters := rt.Pushgw.DropSample
if len(filters) == 0 {
// 快速路径:检查仅 __name__ 的过滤器 O(1)
if len(rt.dropByNameOnly) > 0 {
for i := 0; i < len(v.Labels); i++ {
if v.Labels[i].Name == "__name__" {
if _, ok := rt.dropByNameOnly[v.Labels[i].Value]; ok {
return true
}
break // __name__ 只会出现一次,找到后直接跳出
}
}
}
// 慢速路径:处理复杂的多条件过滤器
if len(rt.dropComplex) == 0 {
return false
}
labelMap := make(map[string]string)
// 只有复杂过滤器存在时才创建 labelMap
labelMap := make(map[string]string, len(v.Labels))
for i := 0; i < len(v.Labels); i++ {
labelMap[v.Labels[i].Name] = v.Labels[i].Value
}
for _, filter := range filters {
if len(filter) == 0 {
continue
}
for _, filter := range rt.dropComplex {
if matchSample(filter, labelMap) {
return true
}

View File

@@ -6,6 +6,7 @@ import (
"github.com/gin-gonic/gin"
"github.com/prometheus/prometheus/prompb"
"github.com/toolkits/pkg/logger"
"github.com/ccfos/nightingale/v6/alert/aconf"
"github.com/ccfos/nightingale/v6/center/metas"
@@ -33,6 +34,10 @@ type Router struct {
Ctx *ctx.Context
HandleTS HandleTSFunc
HeartbeartApi string
// 预编译的 DropSample 过滤器
dropByNameOnly map[string]struct{} // 仅 __name__ 条件的快速匹配
dropComplex []map[string]string // 多条件的复杂匹配
}
func stat() gin.HandlerFunc {
@@ -51,7 +56,7 @@ func stat() gin.HandlerFunc {
func New(httpConfig httpx.Config, pushgw pconf.Pushgw, aconf aconf.Alert, tc *memsto.TargetCacheType, bg *memsto.BusiGroupCacheType,
idents *idents.Set, metas *metas.Set,
writers *writer.WritersType, ctx *ctx.Context) *Router {
return &Router{
rt := &Router{
HTTP: httpConfig,
Pushgw: pushgw,
Aconf: aconf,
@@ -63,6 +68,38 @@ func New(httpConfig httpx.Config, pushgw pconf.Pushgw, aconf aconf.Alert, tc *me
MetaSet: metas,
HandleTS: func(pt *prompb.TimeSeries) *prompb.TimeSeries { return pt },
}
// 预编译 DropSample 过滤器
rt.initDropSampleFilters()
return rt
}
// initDropSampleFilters 预编译 DropSample 过滤器,将单条件 __name__ 过滤器
// 放入 map 实现 O(1) 查找,多条件过滤器保留原有逻辑
func (rt *Router) initDropSampleFilters() {
rt.dropByNameOnly = make(map[string]struct{})
rt.dropComplex = make([]map[string]string, 0)
for _, filter := range rt.Pushgw.DropSample {
if len(filter) == 0 {
continue
}
// 如果只有一个条件且是 __name__放入快速匹配 map
if len(filter) == 1 {
if name, ok := filter["__name__"]; ok {
rt.dropByNameOnly[name] = struct{}{}
continue
}
}
// 其他情况放入复杂匹配列表
rt.dropComplex = append(rt.dropComplex, filter)
}
logger.Infof("DropSample filters initialized: %d name-only, %d complex",
len(rt.dropByNameOnly), len(rt.dropComplex))
}
func (rt *Router) Config(r *gin.Engine) {