Compare commits

...

9 Commits

Author SHA1 Message Date
smx_Morgan
65a1ca95ff refactor: event notify (#2818) 2025-08-12 16:53:32 +08:00
Ulric Qin
9c5ccf0c8f fix: update update_at when batch-updating-rules 2025-08-06 20:21:57 +08:00
Ulric Qin
cd468af250 refactor batch updating rules 2025-08-06 17:16:02 +08:00
Ulric Qin
2d3449c0ec code refactor for batch updating 2025-08-06 15:45:36 +08:00
ning
e15bdbce92 refactor: optimize import prom alert rule 2025-08-06 11:09:53 +08:00
ning
3890243d42 fix: new mysql db client 2025-08-04 18:40:04 +08:00
ning
37fb4ee867 add case-insensitive search for builtin payload filtering 2025-08-04 16:31:28 +08:00
ning
6db63eafc1 refactor: change import prom rule 2025-08-01 19:00:06 +08:00
ning
1e9cbfc316 fix: event query log 2025-08-01 16:47:14 +08:00
11 changed files with 155 additions and 69 deletions

1
.issue Symbolic link
View File

@@ -0,0 +1 @@
/Users/ning/qinyening.com/issue/n9e

View File

@@ -390,9 +390,10 @@ func filterByQuery(payloads []*models.BuiltinPayload, query string) []*models.Bu
return payloads
}
queryLower := strings.ToLower(query)
var filtered []*models.BuiltinPayload
for _, p := range payloads {
if strings.Contains(p.Name, query) || strings.Contains(p.Tags, query) {
if strings.Contains(strings.ToLower(p.Name), queryLower) || strings.Contains(strings.ToLower(p.Tags), queryLower) {
filtered = append(filtered, p)
}
}

View File

@@ -308,19 +308,52 @@ func (rt *Router) alertRuleAddByImportPromRule(c *gin.Context) {
var f promRuleForm
ginx.Dangerous(c.BindJSON(&f))
// 首先尝试解析带 groups 的格式
var pr struct {
Groups []models.PromRuleGroup `yaml:"groups"`
}
err := yaml.Unmarshal([]byte(f.Payload), &pr)
if err != nil {
ginx.Bomb(http.StatusBadRequest, "invalid yaml format, please use the example format. err: %v", err)
var groups []models.PromRuleGroup
if err != nil || len(pr.Groups) == 0 {
// 如果解析失败或没有 groups尝试解析规则数组格式
var rules []models.PromRule
err = yaml.Unmarshal([]byte(f.Payload), &rules)
if err != nil {
// 最后尝试解析单个规则格式
var singleRule models.PromRule
err = yaml.Unmarshal([]byte(f.Payload), &singleRule)
if err != nil {
ginx.Bomb(http.StatusBadRequest, "invalid yaml format. err: %v", err)
}
// 验证单个规则是否有效
if singleRule.Alert == "" && singleRule.Record == "" {
ginx.Bomb(http.StatusBadRequest, "input yaml is empty or invalid")
}
rules = []models.PromRule{singleRule}
}
// 验证规则数组是否为空
if len(rules) == 0 {
ginx.Bomb(http.StatusBadRequest, "input yaml contains no rules")
}
// 将规则数组包装成 group
groups = []models.PromRuleGroup{
{
Name: "imported_rules",
Rules: rules,
},
}
} else {
// 使用已解析的 groups
groups = pr.Groups
}
if len(pr.Groups) == 0 {
ginx.Bomb(http.StatusBadRequest, "input yaml is empty")
}
lst := models.DealPromGroup(pr.Groups, f.DatasourceQueries, f.Disabled)
lst := models.DealPromGroup(groups, f.DatasourceQueries, f.Disabled)
username := c.MustGet("username").(string)
bgid := ginx.UrlParamInt64(c, "id")
ginx.NewRender(c).Data(rt.alertRuleAdd(lst, username, bgid, c.GetHeader("X-Language")), nil)
@@ -465,8 +498,8 @@ func (rt *Router) alertRulePutFields(c *gin.Context) {
ginx.Bomb(http.StatusBadRequest, "fields empty")
}
f.Fields["update_by"] = c.MustGet("username").(string)
f.Fields["update_at"] = time.Now().Unix()
updateBy := c.MustGet("username").(string)
updateAt := time.Now().Unix()
for i := 0; i < len(f.Ids); i++ {
ar, err := models.AlertRuleGetById(rt.Ctx, f.Ids[i])
@@ -483,7 +516,6 @@ func (rt *Router) alertRulePutFields(c *gin.Context) {
b, err := json.Marshal(originRule)
ginx.Dangerous(err)
ginx.Dangerous(ar.UpdateFieldsMap(rt.Ctx, map[string]interface{}{"rule_config": string(b)}))
continue
}
}
@@ -496,7 +528,6 @@ func (rt *Router) alertRulePutFields(c *gin.Context) {
b, err := json.Marshal(ar.AnnotationsJSON)
ginx.Dangerous(err)
ginx.Dangerous(ar.UpdateFieldsMap(rt.Ctx, map[string]interface{}{"annotations": string(b)}))
continue
}
}
@@ -509,7 +540,6 @@ func (rt *Router) alertRulePutFields(c *gin.Context) {
b, err := json.Marshal(ar.AnnotationsJSON)
ginx.Dangerous(err)
ginx.Dangerous(ar.UpdateFieldsMap(rt.Ctx, map[string]interface{}{"annotations": string(b)}))
continue
}
}
@@ -519,7 +549,6 @@ func (rt *Router) alertRulePutFields(c *gin.Context) {
callback := callbacks.(string)
if !strings.Contains(ar.Callbacks, callback) {
ginx.Dangerous(ar.UpdateFieldsMap(rt.Ctx, map[string]interface{}{"callbacks": ar.Callbacks + " " + callback}))
continue
}
}
}
@@ -529,7 +558,6 @@ func (rt *Router) alertRulePutFields(c *gin.Context) {
if callbacks, has := f.Fields["callbacks"]; has {
callback := callbacks.(string)
ginx.Dangerous(ar.UpdateFieldsMap(rt.Ctx, map[string]interface{}{"callbacks": strings.ReplaceAll(ar.Callbacks, callback, "")}))
continue
}
}
@@ -539,21 +567,6 @@ func (rt *Router) alertRulePutFields(c *gin.Context) {
bytes, err := json.Marshal(datasourceQueries)
ginx.Dangerous(err)
ginx.Dangerous(ar.UpdateFieldsMap(rt.Ctx, map[string]interface{}{"datasource_queries": bytes}))
continue
}
}
// 检测是否是批量更新通知规则的字段,如果是清理掉旧版本的配置
for k := range f.Fields {
if k == "notify_rule_ids" {
f.Fields["notify_version"] = 1
f.Fields["notify_channels"] = ""
f.Fields["notify_groups"] = ""
f.Fields["callbacks"] = ""
}
if k == "notify_channels" {
f.Fields["notify_version"] = 0
}
}
@@ -569,6 +582,12 @@ func (rt *Router) alertRulePutFields(c *gin.Context) {
ginx.Dangerous(ar.UpdateColumn(rt.Ctx, k, v))
}
}
// 统一更新更新时间和更新人,只有更新时间变了,告警规则才会被引擎拉取
ginx.Dangerous(ar.UpdateFieldsMap(rt.Ctx, map[string]interface{}{
"update_by": updateBy,
"update_at": updateAt,
}))
}
ginx.NewRender(c).Message(nil)

View File

@@ -221,7 +221,6 @@ func SendNotifyChannelMessage(ctx *ctx.Context, userCache *memsto.UserCacheType,
if err != nil {
return "", fmt.Errorf("failed to get http client: %v", err)
}
if notifyChannel.RequestConfig == nil {
return "", fmt.Errorf("request config is nil")
}

View File

@@ -271,7 +271,10 @@ func MakeLogQuery(ctx context.Context, query interface{}, eventTags []string, st
}
for i := 0; i < len(eventTags); i++ {
eventTags[i] = strings.Replace(eventTags[i], "=", ":", 1)
arr := strings.SplitN(eventTags[i], "=", 2)
if len(arr) == 2 {
eventTags[i] = fmt.Sprintf("%s:%s", arr[0], strconv.Quote(arr[1]))
}
}
if len(eventTags) > 0 {
@@ -295,7 +298,10 @@ func MakeTSQuery(ctx context.Context, query interface{}, eventTags []string, sta
}
for i := 0; i < len(eventTags); i++ {
eventTags[i] = strings.Replace(eventTags[i], "=", ":", 1)
arr := strings.SplitN(eventTags[i], "=", 2)
if len(arr) == 2 {
eventTags[i] = fmt.Sprintf("%s:%s", arr[0], strconv.Quote(arr[1]))
}
}
if len(eventTags) > 0 {

View File

@@ -115,14 +115,14 @@ func (m *MySQL) NewConn(ctx context.Context, database string) (*gorm.DB, error)
}()
dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8&parseTime=True", shard.User, shard.Password, shard.Addr, database)
return sqlbase.NewDB(
db, err = sqlbase.NewDB(
ctx,
mysql.Open(dsn),
shard.MaxIdleConns,
shard.MaxOpenConns,
time.Duration(shard.ConnMaxLifetime)*time.Second,
)
return db, err
}
func (m *MySQL) ShowDatabases(ctx context.Context) ([]string, error) {

View File

@@ -33,6 +33,8 @@ type NotifyTask struct {
// NotifyRecordFunc 通知记录函数类型
type NotifyRecordFunc func(ctx *ctx.Context, events []*models.AlertCurEvent, notifyRuleId int64, channelName, target, resp string, err error)
type NotifyCallbackFunc func(ctx *ctx.Context, ident string, params map[string]string)
type NotifyChannelCacheType struct {
statTotal int64
statLastUpdated int64
@@ -52,13 +54,15 @@ type NotifyChannelCacheType struct {
// 通知记录回调函数
notifyRecordFunc NotifyRecordFunc
NotifyCallback NotifyCallbackFunc
}
func NewNotifyChannelCache(ctx *ctx.Context, stats *Stats) *NotifyChannelCacheType {
func NewNotifyChannelCache(c *ctx.Context, stats *Stats) *NotifyChannelCacheType {
ncc := &NotifyChannelCacheType{
statTotal: -1,
statLastUpdated: -1,
ctx: ctx,
ctx: c,
stats: stats,
channels: make(map[int64]*models.NotifyChannelConfig),
channelsQueue: make(map[int64]*list.SafeListLimited),
@@ -66,6 +70,9 @@ func NewNotifyChannelCache(ctx *ctx.Context, stats *Stats) *NotifyChannelCacheTy
httpClient: make(map[int64]*http.Client),
smtpCh: make(map[int64]chan *models.EmailContext),
smtpQuitCh: make(map[int64]chan struct{}),
NotifyCallback: func(ctx *ctx.Context, ident string, params map[string]string) {
},
}
ncc.SyncNotifyChannels()
@@ -168,6 +175,8 @@ func (ncc *NotifyChannelCacheType) addOrUpdateChannels(newChannels map[int64]*mo
if newChannel.RequestType == "http" {
ncc.startHttpChannel(chID, newChannel)
}
go ncc.NotifyCallback(ncc.ctx, newChannel.Ident, newChannel.RequestConfig.HTTPRequestConfig.Request.Parameters)
case "smtp":
// 创建SMTP发送器
if newChannel.RequestConfig != nil && newChannel.RequestConfig.SMTPRequestConfig != nil {
@@ -305,6 +314,7 @@ func (ncc *NotifyChannelCacheType) processNotifyTask(task *NotifyTask) {
}
}
}
}
// 判断是否需要批量发送联系人

View File

@@ -735,6 +735,15 @@ func (ar *AlertRule) UpdateColumn(ctx *ctx.Context, column string, value interfa
return DB(ctx).Model(ar).Updates(updates).Error
}
if column == "notify_groups" || column == "notify_channels" {
updates := map[string]interface{}{
column: value,
"notify_version": 0,
"notify_rule_ids": []int64{},
}
return DB(ctx).Model(ar).Updates(updates).Error
}
return DB(ctx).Model(ar).UpdateColumn(column, value).Error
}

View File

@@ -714,6 +714,18 @@ func (t *MessageTemplate) Upsert(ctx *ctx.Context, ident string) error {
return tpl.Update(ctx, *t)
}
func (t *MessageTemplate) UpsertForce(ctx *ctx.Context, ident string) error {
tpl, err := MessageTemplateGet(ctx, "ident = ?", ident)
if err != nil {
return errors.WithMessage(err, "failed to get message tpl")
}
if tpl == nil {
return Insert(ctx, t)
}
return tpl.Update(ctx, *t)
}
func (t *MessageTemplate) RenderEvent(events []*AlertCurEvent) map[string]interface{} {
if t == nil {
return nil

View File

@@ -42,6 +42,12 @@ type EmailContext struct {
Mail *gomail.Message
}
var NotifyHookQuery HookQuery = func(head map[string]string, ident string, query url.Values, params map[string]string) (url.Values, map[string]string, bool) {
return nil, nil, false
}
type HookQuery func(head map[string]string, ident string, query url.Values, params map[string]string) (url.Values, map[string]string, bool)
// NotifyChannelConfig 通知媒介
type NotifyChannelConfig struct {
ID int64 `json:"id" gorm:"primaryKey"`
@@ -454,21 +460,36 @@ func (ncc *NotifyChannelConfig) SendHTTP(events []*AlertCurEvent, tpl map[string
}
query := req.URL.Query()
// 设置请求头 腾讯云短信、语音特殊处理
if ncc.Ident == "tx-sms" || ncc.Ident == "tx-voice" {
headers = ncc.setTxHeader(headers, body)
for key, value := range headers {
req.Header.Add(key, value)
}
} else if ncc.Ident == "ali-sms" || ncc.Ident == "ali-voice" {
req, err = http.NewRequest(httpConfig.Method, url, nil)
if err != nil {
return "", err
query, headers, getQueryOk := NotifyHookQuery(ncc.RequestConfig.HTTPRequestConfig.Headers, ncc.Ident, query, parameters)
if !getQueryOk {
// 设置请求头 腾讯云短信、语音特殊处理
if ncc.Ident == "tx-sms" || ncc.Ident == "tx-voice" {
headers = ncc.setTxHeader(headers, body)
for key, value := range headers {
req.Header.Add(key, value)
}
} else if ncc.Ident == "ali-sms" || ncc.Ident == "ali-voice" {
req, err = http.NewRequest(httpConfig.Method, url, nil)
if err != nil {
return "", err
}
query, headers = ncc.getAliQuery(ncc.Ident, query, httpConfig.Request.Parameters["AccessKeyId"], httpConfig.Request.Parameters["AccessKeySecret"], parameters)
for key, value := range headers {
req.Header.Set(key, value)
}
} else {
for key, value := range headers {
req.Header.Add(key, value)
}
}
query, headers = ncc.getAliQuery(ncc.Ident, query, httpConfig.Request.Parameters["AccessKeyId"], httpConfig.Request.Parameters["AccessKeySecret"], parameters)
for key, value := range headers {
req.Header.Set(key, value)
if ncc.Ident != "ali-sms" && ncc.Ident != "ali-voice" {
for key, value := range parameters {
query.Add(key, value)
}
}
} else {
for key, value := range headers {
@@ -476,12 +497,6 @@ func (ncc *NotifyChannelConfig) SendHTTP(events []*AlertCurEvent, tpl map[string
}
}
if ncc.Ident != "ali-sms" && ncc.Ident != "ali-voice" {
for key, value := range parameters {
query.Add(key, value)
}
}
req.URL.RawQuery = query.Encode()
// 记录完整的请求信息
logger.Debugf("URL: %v, Method: %s, Headers: %+v, params: %+v, Body: %s", req.URL, req.Method, req.Header, query, string(body))
@@ -1459,6 +1474,20 @@ func (ncc *NotifyChannelConfig) Upsert(ctx *ctx.Context) error {
return ch.Update(ctx, *ncc)
}
// UpsertForce 强制更新,用于初始化时覆盖现有记录
func (ncc *NotifyChannelConfig) UpsertForce(ctx *ctx.Context) error {
ch, err := NotifyChannelGet(ctx, "name = ?", ncc.Name)
if err != nil {
return errors.WithMessage(err, "notify channel init failed to get message tpl")
}
if ch == nil {
return Insert(ctx, ncc)
}
return ch.Update(ctx, *ncc)
}
var FeishuAppBody = `#!/usr/bin/env python
# -*- coding: UTF-8 -*-

View File

@@ -27,8 +27,13 @@ func convertInterval(interval string) int {
duration, err := time.ParseDuration(interval)
if err != nil {
logger.Errorf("Error parsing interval `%s`, err: %v", interval, err)
return 0
return 60
}
if duration.Seconds() == 0 {
duration = 60 * time.Second
}
return int(duration.Seconds())
}
@@ -57,17 +62,12 @@ func ConvertAlert(rule PromRule, interval string, datasouceQueries []DatasourceQ
}
ar := AlertRule{
Name: rule.Alert,
Severity: severity,
Disabled: disabled,
PromForDuration: convertInterval(rule.For),
PromQl: rule.Expr,
PromEvalInterval: convertInterval(interval),
EnableStimeJSON: "00:00",
EnableEtimeJSON: "23:59",
EnableDaysOfWeekJSON: []string{
"1", "2", "3", "4", "5", "6", "0",
},
Name: rule.Alert,
Severity: severity,
Disabled: disabled,
PromForDuration: convertInterval(rule.For),
PromQl: rule.Expr,
CronPattern: fmt.Sprintf("@every %ds", convertInterval(interval)),
EnableInBG: AlertRuleEnableInGlobalBG,
NotifyRecovered: AlertRuleNotifyRecovered,
NotifyRepeatStep: AlertRuleNotifyRepeatStep60Min,
@@ -88,7 +88,7 @@ func DealPromGroup(promRule []PromRuleGroup, dataSourceQueries []DatasourceQuery
for _, group := range promRule {
interval := group.Interval
if interval == "" {
interval = "15s"
interval = "60s"
}
for _, rule := range group.Rules {
if rule.Alert != "" {