Compare commits

..

13 Commits

Author SHA1 Message Date
ning
6d6460f138 update inhibit notify record 2025-09-05 18:17:47 +08:00
ning
9ec424f98c update notify aggr 2025-09-05 11:21:31 +08:00
ning
c931f43748 change notify 2025-09-04 17:34:03 +08:00
ning
f525dcf185 add event detail 2025-09-02 17:36:35 +08:00
ning
944ee5b801 refactor: events query 2025-09-01 20:27:51 +08:00
ning
fa6fef1689 update event noitfy 2025-08-26 19:54:49 +08:00
ning
167c8aece6 update notify 2025-08-22 19:12:12 +08:00
ning
378fece50b update notity 2025-08-22 10:08:27 +08:00
ning
bb6da02e7f update event api 2025-08-20 20:05:55 +08:00
ning
349e87ce8e update notify 2025-08-19 10:35:17 +08:00
ning
c37cfaa7ce refactor: update notify record 2025-08-15 13:32:05 +08:00
ning
4c1afb1191 refactor: update event tag filter 2025-08-15 10:28:04 +08:00
ning
14e3fd6fa3 update notify 2025-08-12 11:23:30 +08:00
52 changed files with 1868 additions and 9811 deletions

1
.gitignore vendored
View File

@@ -59,7 +59,6 @@ _test
.index
.vscode
.issue
.issue/*
.cursor
.claude
.DS_Store

1
.issue Symbolic link
View File

@@ -0,0 +1 @@
/Users/ning/qinyening.com/issue/n9e

View File

@@ -204,7 +204,7 @@ func (e *Dispatch) HandleEventWithNotifyRule(eventOrigin *models.AlertCurEvent)
eventCopy, res, err = processor.Process(e.ctx, eventCopy)
if eventCopy == nil {
logger.Warningf("after processor notify_id: %d, event:%+v, processor:%+v, event is nil", notifyRuleId, eventCopy, processor)
sender.NotifyRecord(e.ctx, []*models.AlertCurEvent{eventOrigin}, notifyRuleId, "", "", res, errors.New("drop by processor"))
sender.NotifyRecord(e.ctx, []*models.AlertCurEvent{eventCopy}, notifyRuleId, "", "", res, errors.New("drop by processor"))
break
}
logger.Infof("after processor notify_id: %d, event:%+v, processor:%+v, res:%v, err:%v", notifyRuleId, eventCopy, processor, res, err)
@@ -254,6 +254,7 @@ func shouldSkipNotify(ctx *ctx.Context, event *models.AlertCurEvent, notifyRuleI
// 如果 eventCopy 是恢复事件,且 NotifyRecovered 为 0则不发送通知
return true
}
return false
}

View File

@@ -141,7 +141,7 @@ func updateSmtp(ctx *ctx.Context, ncc *memsto.NotifyConfigCacheType) {
func startEmailSender(ctx *ctx.Context, smtp aconf.SMTPConfig) {
conf := smtp
if conf.Host == "" || conf.Port == 0 {
logger.Debug("SMTP configurations invalid")
logger.Warning("SMTP configurations invalid")
<-mailQuit
return
}

View File

@@ -316,7 +316,6 @@ func (rt *Router) Config(r *gin.Engine) {
pages.GET("/busi-groups/tags", rt.auth(), rt.user(), rt.busiGroupsGetTags)
pages.GET("/targets", rt.auth(), rt.user(), rt.targetGets)
pages.POST("/target-update", rt.auth(), rt.targetUpdate)
pages.GET("/target/extra-meta", rt.auth(), rt.user(), rt.targetExtendInfoByIdent)
pages.POST("/target/list", rt.auth(), rt.user(), rt.targetGetsByHostFilter)
pages.DELETE("/targets", rt.auth(), rt.user(), rt.perm("/targets/del"), rt.targetDel)
@@ -676,10 +675,6 @@ func (rt *Router) Config(r *gin.Engine) {
service.GET("/message-templates", rt.messageTemplateGets)
service.GET("/event-pipelines", rt.eventPipelinesListByService)
// 手机号加密存储配置接口
service.POST("/users/phone/encrypt", rt.usersPhoneEncrypt)
service.POST("/users/phone/decrypt", rt.usersPhoneDecrypt)
}
}

View File

@@ -115,18 +115,7 @@ func (rt *Router) alertHisEventsDelete(c *gin.Context) {
time.Sleep(100 * time.Millisecond) // 防止锁表
}
}()
ginx.NewRender(c).Data("Alert history events deletion started", nil)
}
var TransferEventToCur func(*ctx.Context, *models.AlertHisEvent) *models.AlertCurEvent
func init() {
TransferEventToCur = transferEventToCur
}
func transferEventToCur(ctx *ctx.Context, event *models.AlertHisEvent) *models.AlertCurEvent {
cur := event.ToCur()
return cur
ginx.NewRender(c).Message("Alert history events deletion started")
}
func (rt *Router) alertHisEventGet(c *gin.Context) {
@@ -153,7 +142,7 @@ func (rt *Router) alertHisEventGet(c *gin.Context) {
ginx.Dangerous(err)
event.NotifyRules, err = GetEventNorifyRuleNames(rt.Ctx, event.NotifyRuleIds)
ginx.NewRender(c).Data(TransferEventToCur(rt.Ctx, event), err)
ginx.NewRender(c).Data(event, err)
}
func GetBusinessGroupIds(c *gin.Context, ctx *ctx.Context, onlySelfGroupView bool, myGroups bool) ([]int64, error) {

View File

@@ -35,12 +35,13 @@ func (rt *Router) alertRuleGets(c *gin.Context) {
cache := make(map[int64]*models.UserGroup)
for i := 0; i < len(ars); i++ {
ars[i].FillNotifyGroups(rt.Ctx, cache)
ars[i].FillSeverities()
}
}
ginx.NewRender(c).Data(ars, err)
}
func GetAlertCueEventTimeRange(c *gin.Context) (stime, etime int64) {
func getAlertCueEventTimeRange(c *gin.Context) (stime, etime int64) {
stime = ginx.QueryInt64(c, "stime", 0)
etime = ginx.QueryInt64(c, "etime", 0)
if etime == 0 {
@@ -79,6 +80,7 @@ func (rt *Router) alertRuleGetsByGids(c *gin.Context) {
names := make([]string, 0, len(ars))
for i := 0; i < len(ars); i++ {
ars[i].FillNotifyGroups(rt.Ctx, cache)
ars[i].FillSeverities()
if len(ars[i].DatasourceQueries) != 0 {
ars[i].DatasourceIdsJson = rt.DatasourceCache.GetIDsByDsCateAndQueries(ars[i].Cate, ars[i].DatasourceQueries)
@@ -88,7 +90,7 @@ func (rt *Router) alertRuleGetsByGids(c *gin.Context) {
names = append(names, ars[i].UpdateBy)
}
stime, etime := GetAlertCueEventTimeRange(c)
stime, etime := getAlertCueEventTimeRange(c)
cnt := models.AlertCurEventCountByRuleId(rt.Ctx, rids, stime, etime)
if cnt != nil {
for i := 0; i < len(ars); i++ {
@@ -288,15 +290,6 @@ func (rt *Router) alertRuleAddByImport(c *gin.Context) {
models.DataSourceQueryAll,
}
}
// 将导入的规则统一转为新版本的通知规则配置
lst[i].NotifyVersion = 1
lst[i].NotifyChannelsJSON = []string{}
lst[i].NotifyGroupsJSON = []string{}
lst[i].NotifyChannels = ""
lst[i].NotifyGroups = ""
lst[i].Callbacks = ""
lst[i].CallbacksJSON = []string{}
}
bgid := ginx.UrlParamInt64(c, "id")

View File

@@ -193,9 +193,11 @@ func (rt *Router) eventsMessage(c *gin.Context) {
events[i] = he.ToCur()
}
renderData := make(map[string]interface{})
renderData["events"] = events
defs := models.GetDefs(renderData)
var defs = []string{
"{{$events := .}}",
"{{$event := index . 0}}",
"{{$aggr_key := \"\"}}",
}
ret := make(map[string]string, len(req.Tpl.Content))
for k, v := range req.Tpl.Content {
text := strings.Join(append(defs, v), "")
@@ -206,7 +208,7 @@ func (rt *Router) eventsMessage(c *gin.Context) {
}
var buf bytes.Buffer
err = tpl.Execute(&buf, renderData)
err = tpl.Execute(&buf, events)
if err != nil {
ret[k] = err.Error()
continue

View File

@@ -18,9 +18,7 @@ import (
// Return all, front-end search and paging
func (rt *Router) alertMuteGetsByBG(c *gin.Context) {
bgid := ginx.UrlParamInt64(c, "id")
prods := strings.Fields(ginx.QueryStr(c, "prods", ""))
query := ginx.QueryStr(c, "query", "")
lst, err := models.AlertMuteGets(rt.Ctx, prods, bgid, -1, query)
lst, err := models.AlertMuteGetsByBG(rt.Ctx, bgid)
ginx.NewRender(c).Data(lst, err)
}

View File

@@ -148,8 +148,6 @@ func (rt *Router) dsProxy(c *gin.Context) {
if ds.AuthJson.BasicAuthUser != "" {
req.SetBasicAuth(ds.AuthJson.BasicAuthUser, ds.AuthJson.BasicAuthPassword)
} else {
req.Header.Del("Authorization")
}
headerCount := len(ds.HTTPJson.Headers)

View File

@@ -11,7 +11,6 @@ import (
"github.com/ccfos/nightingale/v6/models"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/pkg/strx"
"github.com/ccfos/nightingale/v6/pushgw/idents"
"github.com/ccfos/nightingale/v6/storage"
"github.com/gin-gonic/gin"
@@ -602,10 +601,3 @@ func (rt *Router) targetsOfHostQuery(c *gin.Context) {
ginx.NewRender(c).Data(lst, nil)
}
func (rt *Router) targetUpdate(c *gin.Context) {
var f idents.TargetUpdate
ginx.BindJSON(c, &f)
ginx.NewRender(c).Message(rt.IdentSet.UpdateTargets(f.Lst, f.Now))
}

View File

@@ -1,7 +1,6 @@
package router
import (
"fmt"
"net/http"
"strings"
@@ -13,7 +12,6 @@ import (
"github.com/gin-gonic/gin"
"github.com/toolkits/pkg/ginx"
"github.com/toolkits/pkg/logger"
"gorm.io/gorm"
)
func (rt *Router) userBusiGroupsGets(c *gin.Context) {
@@ -254,200 +252,3 @@ func (rt *Router) installDateGet(c *gin.Context) {
ginx.NewRender(c).Data(rootUser.CreateAt, nil)
}
// usersPhoneEncrypt 统一手机号加密
func (rt *Router) usersPhoneEncrypt(c *gin.Context) {
users, err := models.UserGetAll(rt.Ctx)
if err != nil {
ginx.NewRender(c).Message(fmt.Errorf("get users failed: %v", err))
return
}
// 获取RSA密钥
_, publicKey, _, err := models.GetRSAKeys(rt.Ctx)
if err != nil {
ginx.NewRender(c).Message(fmt.Errorf("get RSA keys failed: %v", err))
return
}
// 先启用手机号加密功能
err = models.SetPhoneEncryptionEnabled(rt.Ctx, true)
if err != nil {
ginx.NewRender(c).Message(fmt.Errorf("enable phone encryption failed: %v", err))
return
}
// 刷新配置缓存
err = models.RefreshPhoneEncryptionCache(rt.Ctx)
if err != nil {
logger.Errorf("Failed to refresh phone encryption cache: %v", err)
// 回滚配置
models.SetPhoneEncryptionEnabled(rt.Ctx, false)
ginx.NewRender(c).Message(fmt.Errorf("refresh cache failed: %v", err))
return
}
successCount := 0
failCount := 0
var failedUsers []string
// 使用事务处理所有用户的手机号加密
err = models.DB(rt.Ctx).Transaction(func(tx *gorm.DB) error {
// 对每个用户的手机号进行加密
for _, user := range users {
if user.Phone == "" {
continue
}
if isPhoneEncrypted(user.Phone) {
continue
}
encryptedPhone, err := secu.EncryptValue(user.Phone, publicKey)
if err != nil {
logger.Errorf("Failed to encrypt phone for user %s: %v", user.Username, err)
failCount++
failedUsers = append(failedUsers, user.Username)
continue
}
err = tx.Model(&models.User{}).Where("id = ?", user.Id).Update("phone", encryptedPhone).Error
if err != nil {
logger.Errorf("Failed to update phone for user %s: %v", user.Username, err)
failCount++
failedUsers = append(failedUsers, user.Username)
continue
}
successCount++
logger.Debugf("Successfully encrypted phone for user %s", user.Username)
}
// 如果有失败的用户,回滚事务
if failCount > 0 {
return fmt.Errorf("encrypt failed users: %d, failed users: %v", failCount, failedUsers)
}
return nil
})
if err != nil {
// 加密失败,回滚配置
models.SetPhoneEncryptionEnabled(rt.Ctx, false)
models.RefreshPhoneEncryptionCache(rt.Ctx)
ginx.NewRender(c).Message(fmt.Errorf("encrypt phone failed: %v", err))
return
}
ginx.NewRender(c).Data(gin.H{
"success_count": successCount,
"fail_count": failCount,
}, nil)
}
// usersPhoneDecrypt 统一手机号解密
func (rt *Router) usersPhoneDecrypt(c *gin.Context) {
// 先关闭手机号加密功能
err := models.SetPhoneEncryptionEnabled(rt.Ctx, false)
if err != nil {
ginx.NewRender(c).Message(fmt.Errorf("disable phone encryption failed: %v", err))
return
}
// 刷新配置缓存
err = models.RefreshPhoneEncryptionCache(rt.Ctx)
if err != nil {
logger.Errorf("Failed to refresh phone encryption cache: %v", err)
// 回滚配置
models.SetPhoneEncryptionEnabled(rt.Ctx, true)
ginx.NewRender(c).Message(fmt.Errorf("refresh cache failed: %v", err))
return
}
// 获取所有用户(此时加密开关已关闭,直接读取数据库原始数据)
var users []*models.User
err = models.DB(rt.Ctx).Find(&users).Error
if err != nil {
// 回滚配置
models.SetPhoneEncryptionEnabled(rt.Ctx, true)
models.RefreshPhoneEncryptionCache(rt.Ctx)
ginx.NewRender(c).Message(fmt.Errorf("get users failed: %v", err))
return
}
// 获取RSA密钥
privateKey, _, password, err := models.GetRSAKeys(rt.Ctx)
if err != nil {
// 回滚配置
models.SetPhoneEncryptionEnabled(rt.Ctx, true)
models.RefreshPhoneEncryptionCache(rt.Ctx)
ginx.NewRender(c).Message(fmt.Errorf("get RSA keys failed: %v", err))
return
}
successCount := 0
failCount := 0
var failedUsers []string
// 使用事务处理所有用户的手机号解密
err = models.DB(rt.Ctx).Transaction(func(tx *gorm.DB) error {
// 对每个用户的手机号进行解密
for _, user := range users {
if user.Phone == "" {
continue
}
// 检查是否是加密的手机号
if !isPhoneEncrypted(user.Phone) {
continue
}
// 对手机号进行解密
decryptedPhone, err := secu.Decrypt(user.Phone, privateKey, password)
if err != nil {
logger.Errorf("Failed to decrypt phone for user %s: %v", user.Username, err)
failCount++
failedUsers = append(failedUsers, user.Username)
continue
}
// 直接更新数据库中的手机号字段绕过GORM钩子
err = tx.Model(&models.User{}).Where("id = ?", user.Id).Update("phone", decryptedPhone).Error
if err != nil {
logger.Errorf("Failed to update phone for user %s: %v", user.Username, err)
failCount++
failedUsers = append(failedUsers, user.Username)
continue
}
successCount++
logger.Debugf("Successfully decrypted phone for user %s", user.Username)
}
// 如果有失败的用户,回滚事务
if failCount > 0 {
return fmt.Errorf("decrypt failed users: %d, failed users: %v", failCount, failedUsers)
}
return nil
})
if err != nil {
// 解密失败,回滚配置
models.SetPhoneEncryptionEnabled(rt.Ctx, true)
models.RefreshPhoneEncryptionCache(rt.Ctx)
ginx.NewRender(c).Message(fmt.Errorf("decrypt phone failed: %v", err))
return
}
ginx.NewRender(c).Data(gin.H{
"success_count": successCount,
"fail_count": failCount,
}, nil)
}
// isPhoneEncrypted 检查手机号是否已经加密
func isPhoneEncrypted(phone string) bool {
// 检查是否有 "enc:" 前缀标记
return len(phone) > 4 && phone[:4] == "enc:"
}

View File

@@ -10,20 +10,12 @@ import (
"github.com/araddon/dateparse"
"github.com/bitly/go-simplejson"
"github.com/ccfos/nightingale/v6/memsto"
"github.com/ccfos/nightingale/v6/models"
"github.com/mitchellh/mapstructure"
"github.com/olivere/elastic/v7"
"github.com/prometheus/common/model"
"github.com/toolkits/pkg/logger"
"github.com/ccfos/nightingale/v6/memsto"
"github.com/ccfos/nightingale/v6/models"
)
type FixedField string
const (
FieldIndex FixedField = "_index"
FieldId FixedField = "_id"
)
type Query struct {
@@ -45,18 +37,6 @@ type Query struct {
Timeout int `json:"timeout" mapstructure:"timeout"`
MaxShard int `json:"max_shard" mapstructure:"max_shard"`
SearchAfter *SearchAfter `json:"search_after" mapstructure:"search_after"`
}
type SortField struct {
Field string `json:"field" mapstructure:"field"`
Ascending bool `json:"ascending" mapstructure:"ascending"`
}
type SearchAfter struct {
SortFields []SortField `json:"sort_fields" mapstructure:"sort_fields"` // 指定排序字段, 一般是timestamp:desc, _index:asc, _id:asc 三者组合,构成唯一的排序字段
SearchAfter []interface{} `json:"search_after" mapstructure:"search_after"` // 指定排序字段的搜索值搜索值必须和sort_fields的顺序一致为上一次查询的最后一条日志的值
}
type MetricAggr struct {
@@ -631,27 +611,14 @@ func QueryLog(ctx context.Context, queryParam interface{}, timeout int64, versio
if param.MaxShard < 1 {
param.MaxShard = maxShard
}
// from+size 分页方式获取日志受es 的max_result_window参数限制默认最多返回1w条日志, 可以使用search_after方式获取更多日志
source := elastic.NewSearchSource().
TrackTotalHits(true).
Query(queryString).
Size(param.Limit)
// 是否使用search_after方式
if param.SearchAfter != nil {
// 设置默认排序字段
if len(param.SearchAfter.SortFields) == 0 {
source = source.Sort(param.DateField, param.Ascending).Sort(string(FieldIndex), true).Sort(string(FieldId), true)
} else {
for _, field := range param.SearchAfter.SortFields {
source = source.Sort(field.Field, field.Ascending)
}
}
if len(param.SearchAfter.SearchAfter) > 0 {
source = source.SearchAfter(param.SearchAfter.SearchAfter...)
}
} else {
source = source.From(param.P).Sort(param.DateField, param.Ascending)
}
From(param.P).
Size(param.Limit).
Sort(param.DateField, param.Ascending)
result, err := search(ctx, indexArr, source, param.Timeout, param.MaxShard)
if err != nil {
logger.Warningf("query data error:%v", err)

View File

@@ -100,8 +100,7 @@ func (os *OpenSearch) InitClient() error {
Header: headers,
}
// 只要有用户名就添加认证,不依赖 Enable 字段
if os.Basic.Username != "" {
if os.Basic.Enable && os.Basic.Username != "" {
options.Username = os.Basic.Username
options.Password = os.Basic.Password
}
@@ -155,9 +154,8 @@ func (os *OpenSearch) Validate(ctx context.Context) (err error) {
}
}
// 如果提供了用户名,必须同时提供密码
if len(os.Basic.Username) > 0 && len(os.Basic.Password) == 0 {
return fmt.Errorf("password is required when username is provided")
if os.Basic.Enable && (len(os.Basic.Username) == 0 || len(os.Basic.Password) == 0) {
return fmt.Errorf("need a valid user, password")
}
if os.MaxShard == 0 {

View File

@@ -836,8 +836,8 @@ CREATE TABLE `event_pipeline` (
`description` varchar(255) not null default '',
`filter_enable` tinyint(1) not null default 0,
`label_filters` text,
`attr_filters` text,
`processor_configs` text,
`attribute_filters` text,
`processors` text,
`create_at` bigint not null default 0,
`create_by` varchar(64) not null default '',
`update_at` bigint not null default 0,

View File

@@ -235,8 +235,9 @@ CREATE TABLE `event_pipeline` (
`team_ids` text,
`description` varchar(255) not null default '',
`filter_enable` tinyint(1) not null default 0,
`attr_filters` text,
`processor_configs` text,
`label_filters` text,
`attribute_filters` text,
`processors` text,
`create_at` bigint not null default 0,
`create_by` varchar(64) not null default '',
`update_at` bigint not null default 0,

View File

@@ -22,8 +22,6 @@ import (
var FromAPIHook func()
var DatasourceProcessHook func(items []datasource.DatasourceInfo) []datasource.DatasourceInfo
func Init(ctx *ctx.Context, fromAPI bool) {
go getDatasourcesFromDBLoop(ctx, fromAPI)
}
@@ -102,10 +100,6 @@ func getDatasourcesFromDBLoop(ctx *ctx.Context, fromAPI bool) {
atomic.StoreInt64(&PromDefaultDatasourceId, 0)
}
if DatasourceProcessHook != nil {
dss = DatasourceProcessHook(dss)
}
PutDatasources(dss)
} else {
FromAPIHook()
@@ -169,7 +163,7 @@ func PutDatasources(items []datasource.DatasourceInfo) {
ds, err := datasource.GetDatasourceByType(typ, item.Settings)
if err != nil {
logger.Debugf("get plugin:%+v fail: %v", item, err)
logger.Warningf("get plugin:%+v fail: %v", item, err)
continue
}

View File

@@ -129,7 +129,9 @@ func (c *Clickhouse) QueryRows(ctx context.Context, query string) (*sql.Rows, er
// ShowDatabases lists all databases in Clickhouse
func (c *Clickhouse) ShowDatabases(ctx context.Context) ([]string, error) {
res := make([]string, 0)
var (
res []string
)
rows, err := c.QueryRows(ctx, ShowDatabases)
if err != nil {
@@ -149,7 +151,9 @@ func (c *Clickhouse) ShowDatabases(ctx context.Context) ([]string, error) {
// ShowTables lists all tables in a given database
func (c *Clickhouse) ShowTables(ctx context.Context, database string) ([]string, error) {
res := make([]string, 0)
var (
res []string
)
showTables := fmt.Sprintf(ShowTables, database)
rows, err := c.QueryRows(ctx, showTables)

View File

@@ -138,7 +138,7 @@ func (d *Doris) ShowDatabases(ctx context.Context) ([]string, error) {
}
defer rows.Close()
databases := make([]string, 0)
var databases []string
for rows.Next() {
var dbName string
if err := rows.Scan(&dbName); err != nil {
@@ -201,7 +201,7 @@ func (d *Doris) ShowResources(ctx context.Context, resourceType string) ([]strin
}
// 将 map 转换为切片
resources := make([]string, 0)
var resources []string
for name := range distinctName {
resources = append(resources, name)
}
@@ -226,7 +226,7 @@ func (d *Doris) ShowTables(ctx context.Context, database string) ([]string, erro
}
defer rows.Close()
tables := make([]string, 0)
var tables []string
for rows.Next() {
var tableName string
if err := rows.Scan(&tableName); err != nil {

View File

@@ -48,7 +48,7 @@ func CloseDB(db *gorm.DB) error {
// ShowTables retrieves a list of all tables in the specified database
func ShowTables(ctx context.Context, db *gorm.DB, query string) ([]string, error) {
tables := make([]string, 0)
var tables []string
rows, err := db.WithContext(ctx).Raw(query).Rows()
if err != nil {

View File

@@ -122,7 +122,7 @@ func (tc *Tdengine) QueryTable(query string) (APIResponse, error) {
}
func (tc *Tdengine) ShowDatabases(context.Context) ([]string, error) {
databases := make([]string, 0)
var databases []string
data, err := tc.QueryTable("show databases")
if err != nil {
return databases, err
@@ -135,7 +135,7 @@ func (tc *Tdengine) ShowDatabases(context.Context) ([]string, error) {
}
func (tc *Tdengine) ShowTables(ctx context.Context, database string) ([]string, error) {
tables := make([]string, 0)
var tables []string
sql := fmt.Sprintf("show %s", database)
data, err := tc.QueryTable(sql)
if err != nil {

4
go.mod
View File

@@ -13,7 +13,7 @@ require (
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/expr-lang/expr v1.16.1
github.com/flashcatcloud/ibex v1.3.6
github.com/flashcatcloud/ibex v1.3.5
github.com/gin-contrib/pprof v1.4.0
github.com/gin-gonic/gin v1.9.1
github.com/glebarez/sqlite v1.11.0
@@ -160,5 +160,3 @@ require (
)
replace golang.org/x/exp v0.0.0-20231006140011-7918f672742d => golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1
// replace github.com/flashcatcloud/ibex => ../github.com/flashcatcloud/ibex

4
go.sum
View File

@@ -89,8 +89,8 @@ github.com/fatih/camelcase v1.0.0 h1:hxNvNX/xYBp0ovncs8WyWZrOrpBNub/JfaMvbURyft8
github.com/fatih/camelcase v1.0.0/go.mod h1:yN2Sb0lFhZJUdVvtELVWefmrXpuZESvPmqwoZc+/fpc=
github.com/fatih/structs v1.1.0 h1:Q7juDM0QtcnhCpeyLGQKyg4TOIghuNXrkL32pHAUMxo=
github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M=
github.com/flashcatcloud/ibex v1.3.6 h1:lJShPFxcZksmkB0w99a3uROGB+Fie1NsqOlkAdar12A=
github.com/flashcatcloud/ibex v1.3.6/go.mod h1:iTU1dKT9TnDNllRPRHUOjXe+HDTQkPH2TeaucHtSuh4=
github.com/flashcatcloud/ibex v1.3.5 h1:8GOOf5+aJT0TP/MC6izz7CO5JKJSdKVFBwL0vQp93Nc=
github.com/flashcatcloud/ibex v1.3.5/go.mod h1:T8hbMUySK2q6cXUaYp0AUVeKkU9Od2LjzwmB5lmTRBM=
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
github.com/gabriel-vasile/mimetype v1.4.2 h1:w5qFW6JKBz9Y393Y4q372O9A7cUSequkh1Q7OhCmWKU=

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -1,5 +1,5 @@
{
"name": "机器常用指标(如果只想看当前业务组内的机器修改大盘变量 ident 的变量类型为机器标识即可)",
"name": "机器常用指标(使用 Categraf 作为采集器,如果只想看当前业务组内的机器修改大盘变量 ident 的变量类型为机器标识即可)",
"tags": "Categraf",
"ident": "",
"uuid": 1737103014612000,

View File

@@ -1,300 +0,0 @@
{
"name": "Host Table NG",
"tags": "Categraf",
"ident": "",
"uuid": 1756720567064000,
"configs": {
"var": [
{
"name": "prom",
"label": "PROM",
"type": "datasource",
"hide": false,
"definition": "prometheus"
},
{
"name": "ident",
"label": "机器",
"type": "query",
"hide": false,
"datasource": {
"cate": "prometheus",
"value": "${prom}"
},
"definition": "label_values(mem_free, ident)",
"multi": true,
"allOption": true
}
],
"panels": [
{
"type": "tableNG",
"id": "306cab0d-f643-4d86-94d0-248fc05fd8a8",
"layout": {
"h": 10,
"w": 24,
"x": 0,
"y": 0,
"i": "306cab0d-f643-4d86-94d0-248fc05fd8a8",
"isResizable": true
},
"version": "3.1.0",
"datasourceCate": "prometheus",
"datasourceValue": "${prom}",
"targets": [
{
"refId": "A",
"expr": "cpu_usage_active{ident=~\"$ident\"}",
"instant": true
},
{
"expr": "100 - mem_available_percent{ident=~\"$ident\"}",
"__mode__": "__query__",
"refId": "B",
"instant": true
},
{
"expr": "disk_used_percent{path=\"/\", ident=~\"$ident\"}",
"__mode__": "__query__",
"refId": "C",
"instant": true
},
{
"expr": "categraf_info{ident=~\"$ident\"}",
"__mode__": "__query__",
"refId": "D",
"instant": true
}
],
"transformationsNG": [
{
"id": "joinByField",
"options": {
"mode": "outer",
"byField": "ident"
}
},
{
"id": "organize",
"options": {
"fields": [
"ident",
"__time_0",
"__name___0",
"cpu",
"__value_#A",
"__time_1",
"__value_#B",
"__time_2",
"__name___2",
"device",
"fstype",
"mode",
"path",
"__value_#C",
"__time_3",
"__name___3",
"version",
"__value_#D"
],
"renameByName": {
"ident": "机器",
"__value_#A": "CPU利用率%",
"__value_#B": "内存利用率%",
"__value_#C": "根分区利用率%",
"version": "Categraf Version"
},
"excludeByName": {
"__time_0": true,
"__name__": true,
"agent_isp": true,
"agent_region": true,
"cpu": true,
"env": true,
"myenv": true,
"__time_1": true,
"__time_2": true,
"__name___2": true,
"device": true,
"fstype": true,
"mode": true,
"path": true,
"__name___0": true,
"__value_#D": true,
"__time_3": true,
"__name___3": true
},
"indexByName": {
"ident": 0,
"version": 1,
"__time_0": 2,
"__name___0": 3,
"agent_isp": 4,
"agent_region": 5,
"cpu": 6,
"env": 7,
"myenv": 8,
"__value_#A": 9,
"__time_1": 10,
"__value_#B": 11,
"__time_2": 12,
"__name___2": 13,
"device": 14,
"fstype": 15,
"mode": 16,
"path": 17,
"__value_#C": 18,
"__time_3": 19,
"__name___3": 20,
"__value_#D": 21
}
}
}
],
"name": "机器表格样例",
"maxPerRow": 4,
"custom": {
"showHeader": true,
"filterable": true,
"cellOptions": {
"type": "none",
"wrapText": false
}
},
"options": {
"links": [
{
"title": "详情",
"url": "/components/dashboard/detail?__uuid__=1737103014612000&ident=${ident}&prom=${prom}",
"targetBlank": true
}
],
"standardOptions": {
"decimals": 2
}
},
"overrides": [
{
"matcher": {
"id": "byName",
"value": "CPU利用率%"
},
"properties": {
"cellOptions": {
"type": "color-background",
"mode": "lcd",
"valueDisplayMode": "text"
},
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "rgb(255, 101, 107)",
"value": 85,
"type": ""
},
{
"color": "rgba(236, 210, 69, 1)",
"value": 70,
"type": ""
},
{
"color": "rgb(44, 157, 61)",
"value": null,
"type": "base"
}
]
},
"valueMappings": [],
"standardOptions": {
"util": "percent",
"decimals": 2,
"min": 0,
"max": 100
}
}
},
{
"matcher": {
"id": "byName",
"value": "内存利用率%"
},
"properties": {
"cellOptions": {
"type": "gauge",
"mode": "lcd",
"valueDisplayMode": "text"
},
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "rgb(255, 101, 107)",
"value": 80,
"type": ""
},
{
"color": "rgba(236, 210, 69, 1)",
"value": 60,
"type": ""
},
{
"color": "rgb(44, 157, 61)",
"value": null,
"type": "base"
}
]
},
"standardOptions": {
"util": "percent",
"decimals": 2,
"min": 0,
"max": 100
}
}
},
{
"matcher": {
"id": "byName",
"value": "根分区利用率%"
},
"properties": {
"cellOptions": {
"type": "gauge",
"mode": "basic",
"valueDisplayMode": "text"
},
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "rgb(255, 101, 107)",
"value": 90,
"type": ""
},
{
"color": "rgba(236, 210, 69, 1)",
"value": 60,
"type": ""
},
{
"color": "rgb(44, 157, 61)",
"value": null,
"type": "base"
}
]
},
"standardOptions": {
"util": "percent",
"decimals": 2,
"min": 0,
"max": 100
}
}
}
]
}
],
"version": "3.1.0"
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -276,7 +276,6 @@ func (ncc *NotifyChannelCacheType) startNotifyConsumer(channelID int64, queue *l
// processNotifyTask 处理通知任务(仅处理 http 类型)
func (ncc *NotifyChannelCacheType) processNotifyTask(task *NotifyTask) {
httpClient := ncc.GetHttpClient(task.NotifyChannel.ID)
logger.Debugf("processNotifyTask: task: %+v", task)
// 现在只处理 http 类型flashduty 保持直接发送
if task.NotifyChannel.RequestType == "http" {
@@ -295,7 +294,7 @@ func (ncc *NotifyChannelCacheType) processNotifyTask(task *NotifyTask) {
for i := range task.Sendtos {
start := time.Now()
resp, err := task.NotifyChannel.SendHTTP(task.Events, task.TplContent, task.CustomParams, []string{task.Sendtos[i]}, httpClient)
resp = fmt.Sprintf("send_time: %s duration: %d ms %s", time.Now().Format("2006-01-02 15:04:05"), time.Since(start).Milliseconds(), resp)
resp = fmt.Sprintf("duration: %d ms %s", time.Since(start).Milliseconds(), resp)
logger.Infof("notify_id: %d, channel_name: %v, event:%+v, tplContent:%v, customParams:%v, userInfo:%+v, respBody: %v, err: %v",
task.NotifyRuleId, task.NotifyChannel.Name, task.Events[0], task.TplContent, task.CustomParams, task.Sendtos[i], resp, err)
@@ -449,7 +448,7 @@ func (ncc *NotifyChannelCacheType) startEmailSender(chID int64, smtp *models.SMT
logger.Warning("SMTP configurations invalid")
return
}
logger.Debugf("start email sender... conf.Host:%+v,conf.Port:%+v", conf.Host, conf.Port)
logger.Infof("start email sender... conf.Host:%+v,conf.Port:%+v", conf.Host, conf.Port)
d := gomail.NewDialer(conf.Host, conf.Port, conf.Username, conf.Password)
if conf.InsecureSkipVerify {

View File

@@ -83,7 +83,6 @@ type AlertCurEvent struct {
NotifyVersion int `json:"notify_version" gorm:"-"` // 0: old, 1: new
NotifyRules []*EventNotifyRule `json:"notify_rules" gorm:"-"`
RecoverTime int64 `json:"recover_time" gorm:"-"`
}
type EventNotifyRule struct {

View File

@@ -423,10 +423,6 @@ func (e *AlertHisEvent) ToCur() *AlertCurEvent {
NotifyChannelsJSON: e.NotifyChannelsJSON,
NotifyGroupsJSON: e.NotifyGroupsJSON,
OriginalTagsJSON: e.OriginalTagsJSON,
NotifyRuleIds: e.NotifyRuleIds,
NotifyRules: e.NotifyRules,
NotifyVersion: e.NotifyVersion,
RecoverTime: e.RecoverTime,
}
cur.SetTagsMap()

View File

@@ -380,7 +380,8 @@ func GetHostsQuery(queries []HostQuery) []map[string]interface{} {
if q.Op == "==" {
m["target_busi_group.group_id in (?)"] = ids
} else {
m["NOT EXISTS (SELECT 1 FROM target_busi_group tbg WHERE tbg.target_ident = target.ident AND tbg.group_id IN (?))"] = ids
m["target.ident not in (select target_ident "+
"from target_busi_group where group_id in (?))"] = ids
}
case "tags":
lst := []string{}
@@ -986,8 +987,6 @@ func (ar *AlertRule) DB2FE() error {
return err
}
ar.FillSeverities()
return nil
}

View File

@@ -5,7 +5,6 @@ import (
"log"
"os"
"regexp"
"sync"
"time"
"github.com/ccfos/nightingale/v6/pkg/ctx"
@@ -41,68 +40,13 @@ var (
)
const (
SALT = "salt"
RSA_PRIVATE_KEY = "rsa_private_key"
RSA_PUBLIC_KEY = "rsa_public_key"
RSA_PASSWORD = "rsa_password"
JWT_SIGNING_KEY = "jwt_signing_key"
PHONE_ENCRYPTION_ENABLED = "phone_encryption_enabled" // 手机号加密开关
SALT = "salt"
RSA_PRIVATE_KEY = "rsa_private_key"
RSA_PUBLIC_KEY = "rsa_public_key"
RSA_PASSWORD = "rsa_password"
JWT_SIGNING_KEY = "jwt_signing_key"
)
// 手机号加密配置缓存
var (
phoneEncryptionCache struct {
sync.RWMutex
enabled bool
privateKey []byte
publicKey []byte
password string
loaded bool
}
)
// LoadPhoneEncryptionConfig 加载手机号加密配置到缓存
func LoadPhoneEncryptionConfig(ctx *ctx.Context) error {
enabled, err := GetPhoneEncryptionEnabled(ctx)
if err != nil {
return errors.WithMessage(err, "failed to get phone encryption enabled")
}
privateKey, publicKey, password, err := GetRSAKeys(ctx)
if err != nil {
return errors.WithMessage(err, "failed to get RSA keys")
}
phoneEncryptionCache.Lock()
defer phoneEncryptionCache.Unlock()
phoneEncryptionCache.enabled = enabled
phoneEncryptionCache.privateKey = privateKey
phoneEncryptionCache.publicKey = publicKey
phoneEncryptionCache.password = password
phoneEncryptionCache.loaded = true
logger.Debugf("Phone encryption config loaded: enabled=%v", enabled)
return nil
}
// GetPhoneEncryptionConfigFromCache 从缓存获取手机号加密配置
func GetPhoneEncryptionConfigFromCache() (enabled bool, publicKey []byte, privateKey []byte, password string, loaded bool) {
phoneEncryptionCache.RLock()
defer phoneEncryptionCache.RUnlock()
return phoneEncryptionCache.enabled,
phoneEncryptionCache.publicKey,
phoneEncryptionCache.privateKey,
phoneEncryptionCache.password,
phoneEncryptionCache.loaded
}
// RefreshPhoneEncryptionCache 刷新缓存(在修改配置后调用)
func RefreshPhoneEncryptionCache(ctx *ctx.Context) error {
return LoadPhoneEncryptionConfig(ctx)
}
func InitJWTSigningKey(ctx *ctx.Context) string {
val, err := ConfigsGet(ctx, JWT_SIGNING_KEY)
if err != nil {
@@ -254,41 +198,6 @@ func ConfigsGetFlashDutyAppKey(ctx *ctx.Context) (string, error) {
return configs[0].Cval, nil
}
// GetPhoneEncryptionEnabled 获取手机号加密是否开启
func GetPhoneEncryptionEnabled(ctx *ctx.Context) (bool, error) {
val, err := ConfigsGet(ctx, PHONE_ENCRYPTION_ENABLED)
if err != nil {
return false, err
}
return val == "true" || val == "1", nil
}
// SetPhoneEncryptionEnabled 设置手机号加密开关
func SetPhoneEncryptionEnabled(ctx *ctx.Context, enabled bool) error {
val := "false"
if enabled {
val = "true"
}
return ConfigsSet(ctx, PHONE_ENCRYPTION_ENABLED, val)
}
// GetRSAKeys 获取RSA密钥对
func GetRSAKeys(ctx *ctx.Context) (privateKey []byte, publicKey []byte, password string, err error) {
privateKeyVal, err := ConfigsGet(ctx, RSA_PRIVATE_KEY)
if err != nil {
return nil, nil, "", errors.WithMessage(err, "failed to get RSA private key")
}
publicKeyVal, err := ConfigsGet(ctx, RSA_PUBLIC_KEY)
if err != nil {
return nil, nil, "", errors.WithMessage(err, "failed to get RSA public key")
}
passwordVal, err := ConfigsGet(ctx, RSA_PASSWORD)
if err != nil {
return nil, nil, "", errors.WithMessage(err, "failed to get RSA password")
}
return []byte(privateKeyVal), []byte(publicKeyVal), passwordVal, nil
}
func ConfigsSelectByCkey(ctx *ctx.Context, ckey string) ([]Configs, error) {
if !ctx.IsCenter {
return []Configs{}, nil

View File

@@ -13,7 +13,6 @@ import (
"github.com/pkg/errors"
"github.com/toolkits/pkg/logger"
"github.com/toolkits/pkg/str"
"gorm.io/gorm"
)
type Datasource struct {
@@ -141,7 +140,7 @@ func (ds *Datasource) Update(ctx *ctx.Context, selectField interface{}, selectFi
if ds.UpdatedAt == 0 {
ds.UpdatedAt = time.Now().Unix()
}
return DB(ctx).Model(ds).Session(&gorm.Session{SkipHooks: true}).Select(selectField, selectFields...).Updates(ds).Error
return DB(ctx).Model(ds).Select(selectField, selectFields...).Updates(ds).Error
}
func (ds *Datasource) Add(ctx *ctx.Context) error {

View File

@@ -16,7 +16,7 @@ type EventPipeline struct {
TeamIds []int64 `json:"team_ids" gorm:"type:text;serializer:json"`
TeamNames []string `json:"team_names" gorm:"-"`
Description string `json:"description" gorm:"type:varchar(255)"`
FilterEnable bool `json:"filter_enable" gorm:"type:boolean"`
FilterEnable bool `json:"filter_enable" gorm:"type:bigint"`
LabelFilters []TagFilter `json:"label_filters" gorm:"type:text;serializer:json"`
AttrFilters []TagFilter `json:"attribute_filters" gorm:"type:text;serializer:json"`
ProcessorConfigs []ProcessorConfig `json:"processors" gorm:"type:text;serializer:json"`

View File

@@ -5,6 +5,7 @@ import (
"fmt"
"html/template"
"regexp"
"sort"
"strings"
texttemplate "text/template"
"time"
@@ -15,6 +16,7 @@ import (
"github.com/pkg/errors"
"github.com/toolkits/pkg/logger"
"github.com/toolkits/pkg/str"
)
// MessageTemplate 消息模板结构
@@ -209,6 +211,7 @@ func (t MsgTplList) IfUsed(nr *NotifyRule) bool {
const (
DingtalkTitle = `{{if $event.IsRecovered}} Recovered {{else}}Triggered{{end}}: {{$event.RuleName}}`
FeishuCardTitle = `🔔 {{$event.RuleName}}`
FeishuAppTitle = `{{- if $event.IsRecovered }}🔔 ﹝恢复﹞ {{$event.RuleName}}{{- else }}🔔 ﹝告警﹞ {{$event.RuleName}}{{- end -}}`
LarkCardTitle = `🔔 {{$event.RuleName}}`
)
@@ -248,7 +251,7 @@ var NewTplMap = map[string]string{
{{- end}}
{{end}}
{{$domain := "http://127.0.0.1:17000" }}
[事件详情]({{$domain}}/alert-his-events/{{$event.Id}}) | [屏蔽1小时]({{$domain}}/alert-mutes/add?__event_id={{$event.Id}}){{if eq $event.Cate "prometheus"}} | [查看曲线]({{$domain}}/metric/explorer?__event_id={{$event.Id}}&mode=graph){{end}}`,
[事件详情]({{$domain}}/alert-his-events/{{$event.Id}}) | [屏蔽1小时]({{$domain}}/alert-mutes/add?__event_id={{$event.Id}}){{if eq $event.Cate "prometheus"}} | [查看曲线]({{$domain}}/metric/explorer?__event_id={{$event.Id}}&mode=graph}}){{end}}`,
Email: `<!DOCTYPE html>
<html lang="en">
<head>
@@ -505,7 +508,7 @@ var NewTplMap = map[string]string{
{{- end}}
{{- end}}
{{$domain := "http://请联系管理员修改通知模板将域名替换为实际的域名" }}
[事件详情]({{$domain}}/alert-his-events/{{$event.Id}})|[屏蔽1小时]({{$domain}}/alert-mutes/add?__event_id={{$event.Id}}){{if eq $event.Cate "prometheus"}}|[查看曲线]({{$domain}}/metric/explorer?__event_id={{$event.Id}}&mode=graph){{end}}`,
[事件详情]({{$domain}}/alert-his-events/{{$event.Id}})|[屏蔽1小时]({{$domain}}/alert-mutes/add?__event_id={{$event.Id}}){{if eq $event.Cate "prometheus"}}|[查看曲线]({{$domain}}/metric/explorer?__event_id={{$event.Id}}&mode=graph}}){{end}}`,
EmailSubject: `{{if $event.IsRecovered}}Recovered{{else}}Triggered{{end}}: {{$event.RuleName}} {{$event.TagsJSON}}`,
Mm: `级别状态: S{{$event.Severity}} {{if $event.IsRecovered}}Recovered{{else}}Triggered{{end}}
规则名称: {{$event.RuleName}}{{if $event.RuleNote}}
@@ -534,7 +537,7 @@ var NewTplMap = map[string]string{
{{$time_duration := sub now.Unix $event.FirstTriggerTime }}{{if $event.IsRecovered}}{{$time_duration = sub $event.LastEvalTime $event.FirstTriggerTime }}{{end}}**距离首次告警**: {{humanizeDurationInterface $time_duration}}
**发送时间**: {{timestamp}}
{{$domain := "http://请联系管理员修改通知模板将域名替换为实际的域名" }}
[事件详情]({{$domain}}/alert-his-events/{{$event.Id}})|[屏蔽1小时]({{$domain}}/alert-mutes/add?__event_id={{$event.Id}}){{if eq $event.Cate "prometheus"}}|[查看曲线]({{$domain}}/metric/explorer?__event_id={{$event.Id}}&mode=graph){{end}}`,
[事件详情]({{$domain}}/alert-his-events/{{$event.Id}})|[屏蔽1小时]({{$domain}}/alert-mutes/add?__event_id={{$event.Id}}){{if eq $event.Cate "prometheus"}}|[查看曲线]({{$domain}}/metric/explorer?__event_id={{$event.Id}}&mode=graph}}){{end}}`,
Lark: `级别状态: S{{$event.Severity}} {{if $event.IsRecovered}}Recovered{{else}}Triggered{{end}}
规则名称: {{$event.RuleName}}{{if $event.RuleNote}}
规则备注: {{$event.RuleNote}}{{end}}
@@ -567,7 +570,7 @@ var NewTplMap = map[string]string{
{{if $event.RuleNote }}**告警描述:** **{{$event.RuleNote}}**{{end}}
{{- end -}}
{{$domain := "http://请联系管理员修改通知模板将域名替换为实际的域名" }}
[事件详情]({{$domain}}/alert-his-events/{{$event.Id}})|[屏蔽1小时]({{$domain}}/alert-mutes/add?__event_id={{$event.Id}}){{if eq $event.Cate "prometheus"}}|[查看曲线]({{$domain}}/metric/explorer?__event_id={{$event.Id}}&mode=graph){{end}}`,
[事件详情]({{$domain}}/alert-his-events/{{$event.Id}})|[屏蔽1小时]({{$domain}}/alert-mutes/add?__event_id={{$event.Id}}){{if eq $event.Cate "prometheus"}}|[查看曲线]({{$domain}}/metric/explorer?__event_id={{$event.Id}}&mode=graph}}){{end}}`,
SlackWebhook: `{{ if $event.IsRecovered }}
{{- if ne $event.Cate "host"}}
*Alarm cluster:* {{$event.Cluster}}{{end}}
@@ -595,7 +598,7 @@ var NewTplMap = map[string]string{
{{$domain := "http://127.0.0.1:17000" }}
<{{$domain}}/alert-his-events/{{$event.Id}}|Event Details>
<{{$domain}}/alert-mutes/add?__event_id={{$event.Id}}|Block for 1 hour>
<{{$domain}}/metric/explorer?__event_id={{$event.Id}}&mode=graph|View Curve>`,
<{{$domain}}/metric/explorer?__event_id={{$event.Id}}&mode=graph}}|View Curve>`,
Discord: `**Level Status**: {{if $event.IsRecovered}}S{{$event.Severity}} Recovered{{else}}S{{$event.Severity}} Triggered{{end}}
**Rule Title**: {{$event.RuleName}}{{if $event.RuleNote}}
**Rule Note**: {{$event.RuleNote}}{{end}}{{if $event.TargetIdent}}
@@ -607,7 +610,7 @@ var NewTplMap = map[string]string{
**Send Time**: {{timestamp}}
{{$domain := "http://127.0.0.1:17000" }}
[Event Details]({{$domain}}/alert-his-events/{{$event.Id}}) | [Silence 1h]({{$domain}}/alert-mutes/add?__event_id={{$event.Id}}) | [View Graph]({{$domain}}/metric/explorer?__event_id={{$event.Id}}&mode=graph)`,
[Event Details]({{$domain}}/alert-his-events/{{$event.Id}}) | [Silence 1h]({{$domain}}/alert-mutes/add?__event_id={{$event.Id}}) | [View Graph]({{$domain}}/metric/explorer?__event_id={{$event.Id}}&mode=graph}})`,
MattermostWebhook: `{{ if $event.IsRecovered }}
{{- if ne $event.Cate "host"}}
@@ -629,7 +632,26 @@ var NewTplMap = map[string]string{
{{if $event.RuleNote }}**Alarm description:** **{{$event.RuleNote}}**{{end}}
{{- end -}}
{{$domain := "http://127.0.0.1:17000" }}
[Event Details]({{$domain}}/alert-his-events/{{$event.Id}})|[Block for 1 hour]({{$domain}}/alert-mutes/add?__event_id={{$event.Id}})|[View Curve]({{$domain}}/metric/explorer?__event_id={{$event.Id}}&mode=graph)`,
[Event Details]({{$domain}}/alert-his-events/{{$event.Id}})|[Block for 1 hour]({{$domain}}/alert-mutes/add?__event_id={{$event.Id}})|[View Curve]({{$domain}}/metric/explorer?__event_id={{$event.Id}}&mode=graph}})`,
FeishuApp: `{{- if $event.IsRecovered -}}
{{- if ne $event.Cate "host" -}}
**告警集群:** {{$event.Cluster}}{{end}}
**级别状态:** S{{$event.Severity}} Recovered
**告警名称:** {{$event.RuleName}}
**事件标签:** {{$event.TagsJSON}}
**恢复时间:** {{timeformat $event.LastEvalTime}}
**告警描述:** **服务已恢复**
{{- else }}
{{- if ne $event.Cate "host"}}
**告警集群:** {{$event.Cluster}}{{end}}
**级别状态:** S{{$event.Severity}} Triggered
**告警名称:** {{$event.RuleName}}
**事件标签:** {{$event.TagsJSON}}
**触发时间:** {{timeformat $event.TriggerTime}}
**发送时间:** {{timestamp}}
**触发时值:** {{$event.TriggerValue}}
{{if $event.RuleNote }}**告警描述:** **{{$event.RuleNote}}**{{end}}
{{- end -}}`,
}
var MsgTplMap = []MessageTemplate{
@@ -648,6 +670,7 @@ var MsgTplMap = []MessageTemplate{
{Name: "Lark", Ident: Lark, Weight: 5, Content: map[string]string{"content": NewTplMap[Lark]}},
{Name: "Feishu", Ident: Feishu, Weight: 4, Content: map[string]string{"content": NewTplMap[Feishu]}},
{Name: "FeishuCard", Ident: FeishuCard, Weight: 4, Content: map[string]string{"title": FeishuCardTitle, "content": NewTplMap[FeishuCard]}},
{Name: "FeishuApp", Ident: FeishuApp, Weight: 4, Content: map[string]string{"title": FeishuAppTitle, "content": NewTplMap[FeishuApp]}},
{Name: "Wecom", Ident: Wecom, Weight: 3, Content: map[string]string{"content": NewTplMap[Wecom]}},
{Name: "Dingtalk", Ident: Dingtalk, Weight: 2, Content: map[string]string{"title": NewTplMap[EmailSubject], "content": NewTplMap[Dingtalk]}},
{Name: "Email", Ident: Email, Weight: 1, Content: map[string]string{"subject": NewTplMap[EmailSubject], "content": NewTplMap[Email]}},
@@ -693,19 +716,19 @@ func (t *MessageTemplate) Upsert(ctx *ctx.Context, ident string) error {
return tpl.Update(ctx, *t)
}
var GetDefs func(map[string]interface{}) []string
func getDefs(renderData map[string]interface{}) []string {
return []string{
"{{ $events := .events }}",
"{{ $event := index $events 0 }}",
"{{ $labels := $event.TagsMap }}",
"{{ $value := $event.TriggerValue }}",
func GetAggrKey(events []*AlertCurEvent) string {
if len(events) <= 1 {
return ""
}
}
func init() {
GetDefs = getDefs
ids := make([]string, 0)
for i := range len(events) {
ids = append(ids, fmt.Sprintf("%d", events[i].Id))
}
sort.Strings(ids)
idsStr := strings.Join(ids, ",")
logger.Debugf("notify_hook aggr_key: %s, ids: %v", str.MD5(idsStr), ids)
return str.MD5(idsStr)
}
func (t *MessageTemplate) RenderEvent(events []*AlertCurEvent) map[string]interface{} {
@@ -713,13 +736,17 @@ func (t *MessageTemplate) RenderEvent(events []*AlertCurEvent) map[string]interf
return nil
}
renderData := make(map[string]interface{})
renderData["events"] = events
// event 内容渲染到 messageTemplate
tplContent := make(map[string]interface{})
for key, msgTpl := range t.Content {
defs := GetDefs(renderData)
var defs = []string{
"{{ $events := . }}",
"{{ $event := index $events 0 }}",
"{{ $labels := $event.TagsMap }}",
"{{ $value := $event.TriggerValue }}",
}
defs = append(defs, fmt.Sprintf("{{ $aggrkey := \"%s\" }}", GetAggrKey(events)))
var body bytes.Buffer
if t.NotifyChannelIdent == "email" {
@@ -732,7 +759,7 @@ func (t *MessageTemplate) RenderEvent(events []*AlertCurEvent) map[string]interf
}
var body bytes.Buffer
if err = tpl.Execute(&body, renderData); err != nil {
if err = tpl.Execute(&body, events); err != nil {
logger.Errorf("failed to execute template: %v", err)
tplContent[key] = fmt.Sprintf("failed to execute template: %v", err)
continue
@@ -747,7 +774,7 @@ func (t *MessageTemplate) RenderEvent(events []*AlertCurEvent) map[string]interf
continue
}
if err = tpl.Execute(&body, renderData); err != nil {
if err = tpl.Execute(&body, events); err != nil {
logger.Errorf("failed to execute template: %v events: %v", err, events)
continue
}
@@ -768,7 +795,7 @@ func (t *MessageTemplate) RenderEvent(events []*AlertCurEvent) map[string]interf
continue
}
if err = tpl.Execute(&body, renderData); err != nil {
if err = tpl.Execute(&body, events); err != nil {
logger.Errorf("failed to execute template: %v events: %v", err, events)
tplContent[key] = fmt.Sprintf("failed to execute template: %v", err)
continue

View File

@@ -127,6 +127,7 @@ func MigrateTables(db *gorm.DB) error {
// 删除 builtin_metrics 表的 idx_collector_typ_name 唯一索引
DropUniqueFiledLimit(db, &models.BuiltinMetric{}, "idx_collector_typ_name", "idx_collector_typ_name")
InsertPermPoints(db)
return nil
}
@@ -166,6 +167,110 @@ func columnHasIndex(db *gorm.DB, dst interface{}, indexColumn string) bool {
return false
}
func InsertPermPoints(db *gorm.DB) {
var ops []models.RoleOperation
ops = append(ops, models.RoleOperation{
RoleName: "Standard",
Operation: "/alert-mutes/put",
})
ops = append(ops, models.RoleOperation{
RoleName: "Standard",
Operation: "/log/index-patterns",
})
ops = append(ops, models.RoleOperation{
RoleName: "Standard",
Operation: "/help/variable-configs",
})
ops = append(ops, models.RoleOperation{
RoleName: "Standard",
Operation: "/ibex-settings",
})
ops = append(ops, models.RoleOperation{
RoleName: "Standard",
Operation: "/notification-templates",
})
ops = append(ops, models.RoleOperation{
RoleName: "Standard",
Operation: "/notification-templates/add",
})
ops = append(ops, models.RoleOperation{
RoleName: "Standard",
Operation: "/notification-templates/put",
})
ops = append(ops, models.RoleOperation{
RoleName: "Standard",
Operation: "/notification-templates/del",
})
ops = append(ops, models.RoleOperation{
RoleName: "Standard",
Operation: "/notification-rules",
})
ops = append(ops, models.RoleOperation{
RoleName: "Standard",
Operation: "/notification-rules/add",
})
ops = append(ops, models.RoleOperation{
RoleName: "Standard",
Operation: "/notification-rules/put",
})
ops = append(ops, models.RoleOperation{
RoleName: "Standard",
Operation: "/notification-rules/del",
})
ops = append(ops, models.RoleOperation{
RoleName: "Standard",
Operation: "/event-pipelines",
})
ops = append(ops, models.RoleOperation{
RoleName: "Standard",
Operation: "/event-pipelines/add",
})
ops = append(ops, models.RoleOperation{
RoleName: "Standard",
Operation: "/event-pipelines/put",
})
ops = append(ops, models.RoleOperation{
RoleName: "Standard",
Operation: "/event-pipelines/del",
})
for _, op := range ops {
var count int64
session := db.Session(&gorm.Session{}).Model(&models.RoleOperation{})
err := session.Where("operation = ? AND role_name = ?", op.Operation, op.RoleName).Count(&count).Error
if err != nil {
logger.Errorf("check role operation exists failed, %v", err)
continue
}
if count > 0 {
continue
}
err = session.Create(&op).Error
if err != nil {
logger.Errorf("insert role operation failed, %v", err)
}
}
}
type AlertRule struct {
ExtraConfig string `gorm:"type:text;column:extra_config"`
CronPattern string `gorm:"type:varchar(64);column:cron_pattern"`
@@ -261,7 +366,6 @@ type BoardBusigroup struct {
type Users struct {
Belong string `gorm:"column:belong;varchar(16);default:'';comment:belong"`
LastActiveTime int64 `gorm:"column:last_active_time;type:int;default:0;comment:last_active_time"`
Phone string `gorm:"column:phone;type:varchar(1024);not null;default:''"`
}
type SsoConfig struct {

View File

@@ -2,6 +2,7 @@ package models
import (
"bytes"
"context"
"crypto/hmac"
"crypto/sha256"
"crypto/tls"
@@ -93,9 +94,6 @@ type UserInfo struct {
type FlashDutyRequestConfig struct {
Proxy string `json:"proxy"`
IntegrationUrl string `json:"integration_url"`
Timeout int `json:"timeout"` // 超时时间(毫秒)
RetryTimes int `json:"retry_times"` // 重试次数
RetrySleep int `json:"retry_sleep"` // 重试等待时间(毫秒)
}
// ParamItem 自定义参数项
@@ -198,7 +196,7 @@ func (ncc *NotifyChannelConfig) SendScript(events []*AlertCurEvent, tpl map[stri
cmd.Stderr = &buf
err, isTimeout := cmdx.RunTimeout(cmd, time.Duration(config.Timeout)*time.Millisecond)
logger.Infof("event_script_notify_result: exec %s output: %s isTimeout: %v err: %v stdin: %s", fpath, buf.String(), isTimeout, err, string(getStdinBytes(events, tpl, params, sendtos)))
logger.Infof("event_script_notify_result: exec %s output: %s isTimeout: %v err: %v", fpath, buf.String(), isTimeout, err)
res := buf.String()
@@ -317,20 +315,9 @@ func GetHTTPClient(nc *NotifyChannelConfig) (*http.Client, error) {
}
httpConfig := nc.RequestConfig.HTTPRequestConfig
// 对于 FlashDuty 类型,优先使用 FlashDuty 配置中的超时时间
timeout := httpConfig.Timeout
if nc.RequestType == "flashduty" && nc.RequestConfig.FlashDutyRequestConfig != nil {
flashDutyTimeout := nc.RequestConfig.FlashDutyRequestConfig.Timeout
if flashDutyTimeout > 0 {
timeout = flashDutyTimeout
}
if httpConfig.Timeout == 0 {
httpConfig.Timeout = 10000
}
if timeout == 0 {
timeout = 10000 // HTTP 默认 10 秒
}
if httpConfig.Concurrency == 0 {
httpConfig.Concurrency = 5
}
@@ -360,78 +347,18 @@ func GetHTTPClient(nc *NotifyChannelConfig) (*http.Client, error) {
Proxy: proxyFunc,
TLSClientConfig: tlsConfig,
DialContext: (&net.Dialer{
Timeout: time.Duration(timeout) * time.Millisecond,
Timeout: time.Duration(httpConfig.Timeout) * time.Millisecond,
}).DialContext,
}
client := &http.Client{
Transport: transport,
Timeout: time.Duration(timeout) * time.Millisecond,
Timeout: time.Duration(httpConfig.Timeout) * time.Millisecond,
}
return client, nil
}
func (ncc *NotifyChannelConfig) makeHTTPRequest(httpConfig *HTTPRequestConfig, url string, headers map[string]string, parameters map[string]string, body []byte) (*http.Request, error) {
req, err := http.NewRequest(httpConfig.Method, url, bytes.NewBuffer(body))
if err != nil {
logger.Errorf("failed to create request: %v", err)
return nil, err
}
query := req.URL.Query()
// 设置请求头 腾讯云短信、语音特殊处理
if ncc.Ident == "tx-sms" || ncc.Ident == "tx-voice" {
headers = ncc.setTxHeader(headers, body)
for key, value := range headers {
req.Header.Add(key, value)
}
} else if ncc.Ident == "ali-sms" || ncc.Ident == "ali-voice" {
req, err = http.NewRequest(httpConfig.Method, url, nil)
if err != nil {
return nil, err
}
query, headers = ncc.getAliQuery(ncc.Ident, query, httpConfig.Request.Parameters["AccessKeyId"], httpConfig.Request.Parameters["AccessKeySecret"], parameters)
for key, value := range headers {
req.Header.Set(key, value)
}
} else {
for key, value := range headers {
req.Header.Add(key, value)
}
}
if ncc.Ident != "ali-sms" && ncc.Ident != "ali-voice" {
for key, value := range parameters {
query.Add(key, value)
}
}
req.URL.RawQuery = query.Encode()
// 记录完整的请求信息
logger.Debugf("URL: %v, Method: %s, Headers: %+v, params: %+v, Body: %s", req.URL, req.Method, req.Header, query, string(body))
return req, nil
}
func (ncc *NotifyChannelConfig) makeFlashDutyRequest(url string, bodyBytes []byte, flashDutyChannelID int64) (*http.Request, error) {
req, err := http.NewRequest("POST", url, bytes.NewBuffer(bodyBytes))
if err != nil {
return nil, err
}
// 设置 URL 参数
query := req.URL.Query()
if flashDutyChannelID != 0 {
// 如果 flashduty 有配置协作空间(channel_id),则传入 channel_id 参数
query.Add("channel_id", strconv.FormatInt(flashDutyChannelID, 10))
}
req.URL.RawQuery = query.Encode()
req.Header.Add("Content-Type", "application/json")
return req, nil
}
func (ncc *NotifyChannelConfig) SendFlashDuty(events []*AlertCurEvent, flashDutyChannelID int64, client *http.Client) (string, error) {
// todo 每一个 channel 批量发送事件
if client == nil {
@@ -443,57 +370,46 @@ func (ncc *NotifyChannelConfig) SendFlashDuty(events []*AlertCurEvent, flashDuty
return "", err
}
url := ncc.RequestConfig.FlashDutyRequestConfig.IntegrationUrl
retrySleep := time.Second
if ncc.RequestConfig.FlashDutyRequestConfig.RetrySleep > 0 {
retrySleep = time.Duration(ncc.RequestConfig.FlashDutyRequestConfig.RetrySleep) * time.Millisecond
req, err := http.NewRequest("POST", ncc.RequestConfig.FlashDutyRequestConfig.IntegrationUrl, bytes.NewBuffer(body))
if err != nil {
logger.Errorf("failed to create request: %v, event: %v", err, events)
return "", err
}
retryTimes := 3
if ncc.RequestConfig.FlashDutyRequestConfig.RetryTimes > 0 {
retryTimes = ncc.RequestConfig.FlashDutyRequestConfig.RetryTimes
// 设置 URL 参数
query := req.URL.Query()
if flashDutyChannelID != 0 {
// 如果 flashduty 有配置协作空间(channel_id),则传入 channel_id 参数
query.Add("channel_id", strconv.FormatInt(flashDutyChannelID, 10))
}
req.URL.RawQuery = query.Encode()
req.Header.Add("Content-Type", "application/json")
// 把最后一次错误保存下来,后面返回,让用户在页面上也可以看到
var lastErrorMessage string
for i := 0; i <= retryTimes; i++ {
req, err := ncc.makeFlashDutyRequest(url, body, flashDutyChannelID)
if err != nil {
logger.Errorf("send_flashduty: failed to create request. url=%s request_body=%s error=%v", url, string(body), err)
return fmt.Sprintf("failed to create request. error: %v", err), err
}
// 直接使用客户端发送请求,超时时间已经在 client 中设置
// 重试机制
for i := 0; i <= 3; i++ {
logger.Infof("send flashduty req:%+v body:%+v", req, string(body))
resp, err := client.Do(req)
if err != nil {
logger.Errorf("send_flashduty: http_call=fail url=%s request_body=%s error=%v times=%d", url, string(body), err, i+1)
if i < retryTimes {
// 重试等待时间,后面要放到页面上配置
time.Sleep(retrySleep)
}
lastErrorMessage = err.Error()
logger.Errorf("send flashduty req:%+v err:%v", req, err)
time.Sleep(time.Duration(100) * time.Millisecond)
continue
}
defer resp.Body.Close()
// 走到这里,说明请求 Flashduty 成功,不管 Flashduty 返回了什么结果,都不判断,仅保存,给用户查看即可
// 比如服务端返回 5xx也不要重试重试可能会导致服务端数据有问题。告警事件这样的东西没有那么关键只要最终能在 UI 上看到调用结果就行
var resBody []byte
if resp.Body != nil {
defer resp.Body.Close()
resBody, err = io.ReadAll(resp.Body)
if err != nil {
logger.Errorf("send_flashduty: failed to read response. request_body=%s, error=%v", string(body), err)
resBody = []byte("failed to read response. error: " + err.Error())
}
// 读取响应
body, err := io.ReadAll(resp.Body)
if err != nil {
logger.Errorf("failed to read response: %v, event: %v", err, events)
}
logger.Infof("send_flashduty: http_call=succ url=%s request_body=%s response_code=%d response_body=%s times=%d", url, string(body), resp.StatusCode, string(resBody), i+1)
return fmt.Sprintf("status_code:%d, response:%s", resp.StatusCode, string(resBody)), nil
logger.Infof("send flashduty req:%+v resp:%+v body:%+v err:%v", req, resp, string(body), err)
if resp.StatusCode == http.StatusOK {
return string(body), nil
}
time.Sleep(time.Duration(100) * time.Millisecond)
}
return lastErrorMessage, errors.New("failed to send request")
return "", errors.New("failed to send request")
}
func (ncc *NotifyChannelConfig) SendHTTP(events []*AlertCurEvent, tpl map[string]interface{}, params map[string]string, sendtos []string, client *http.Client) (string, error) {
@@ -531,21 +447,54 @@ func (ncc *NotifyChannelConfig) SendHTTP(events []*AlertCurEvent, tpl map[string
url, headers, parameters := ncc.replaceVariables(fullTpl)
logger.Infof("url: %v, headers: %v, parameters: %v", url, headers, parameters)
// 重试机制
var lastErrorMessage string
for i := 0; i < httpConfig.RetryTimes; i++ {
var resp *http.Response
req, err := ncc.makeHTTPRequest(httpConfig, url, headers, parameters, body)
req, err := http.NewRequest(httpConfig.Method, url, bytes.NewBuffer(body))
if err != nil {
logger.Errorf("failed to create request: %v, event: %v", err, events)
return "", err
}
query := req.URL.Query()
// 设置请求头 腾讯云短信、语音特殊处理
if ncc.Ident == "tx-sms" || ncc.Ident == "tx-voice" {
headers = ncc.setTxHeader(headers, body)
for key, value := range headers {
req.Header.Add(key, value)
}
} else if ncc.Ident == "ali-sms" || ncc.Ident == "ali-voice" {
req, err = http.NewRequest(httpConfig.Method, url, nil)
if err != nil {
logger.Errorf("send_http: failed to create request. url=%s request_body=%s error=%v", url, string(body), err)
return fmt.Sprintf("failed to create request. error: %v", err), err
return "", err
}
resp, err = client.Do(req)
query, headers = ncc.getAliQuery(ncc.Ident, query, httpConfig.Request.Parameters["AccessKeyId"], httpConfig.Request.Parameters["AccessKeySecret"], parameters)
for key, value := range headers {
req.Header.Set(key, value)
}
} else {
for key, value := range headers {
req.Header.Add(key, value)
}
}
if ncc.Ident != "ali-sms" && ncc.Ident != "ali-voice" {
for key, value := range parameters {
query.Add(key, value)
}
}
req.URL.RawQuery = query.Encode()
// 记录完整的请求信息
logger.Debugf("URL: %v, Method: %s, Headers: %+v, params: %+v, Body: %s", req.URL, req.Method, req.Header, query, string(body))
// 重试机制
for i := 0; i <= httpConfig.RetryTimes; i++ {
var resp *http.Response
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(httpConfig.Timeout)*time.Millisecond)
resp, err = client.Do(req.WithContext(ctx))
cancel() // 确保释放资源
if err != nil {
logger.Errorf("send_http: failed to send http notify. url=%s request_body=%s error=%v", url, string(body), err)
lastErrorMessage = err.Error()
time.Sleep(time.Duration(httpConfig.RetryInterval) * time.Second)
logger.Errorf("send http request failed to send http notify: %v", err)
continue
}
defer resp.Body.Close()
@@ -553,9 +502,11 @@ func (ncc *NotifyChannelConfig) SendHTTP(events []*AlertCurEvent, tpl map[string
// 读取响应
body, err := io.ReadAll(resp.Body)
logger.Debugf("send http request: %+v, response: %+v, body: %+v", req, resp, string(body))
if err != nil {
logger.Errorf("send_http: failed to read response. url=%s request_body=%s error=%v", url, string(body), err)
logger.Errorf("failed to send http notify: %v", err)
}
if resp.StatusCode == http.StatusOK {
return string(body), nil
}
@@ -563,7 +514,8 @@ func (ncc *NotifyChannelConfig) SendHTTP(events []*AlertCurEvent, tpl map[string
return "", fmt.Errorf("failed to send request, status code: %d, body: %s", resp.StatusCode, string(body))
}
return lastErrorMessage, errors.New("all retries failed, last error: " + lastErrorMessage)
return "", err
}
// getAliQuery 获取阿里云API的查询参数和请求头
@@ -1296,7 +1248,8 @@ var NotiChMap = []*NotifyChannelConfig{
Method: "POST", Headers: map[string]string{"Content-Type": "application/json"},
Timeout: 10000, Concurrency: 5, RetryTimes: 3, RetryInterval: 100,
Request: RequestDetail{
Body: `{"msg_type": "text", "content": {"text": "{{$tpl.content}}"}}`,
Parameters: map[string]string{"token": "{{$params.token}}"},
Body: `{"msg_type": "text", "content": {"text": "{{$tpl.content}}"}}`,
},
},
},
@@ -1318,7 +1271,8 @@ var NotiChMap = []*NotifyChannelConfig{
Method: "POST", Headers: map[string]string{"Content-Type": "application/json"},
Timeout: 10000, Concurrency: 5, RetryTimes: 3, RetryInterval: 100,
Request: RequestDetail{
Body: `{"msg_type": "interactive", "card": {"config": {"wide_screen_mode": true}, "header": {"title": {"content": "{{$tpl.title}}", "tag": "plain_text"}, "template": "{{if $event.IsRecovered}}green{{else}}red{{end}}"}, "elements": [{"tag": "markdown", "content": "{{$tpl.content}}"}]}}`,
Parameters: map[string]string{"token": "{{$params.token}}"},
Body: `{"msg_type": "interactive", "card": {"config": {"wide_screen_mode": true}, "header": {"title": {"content": "{{$tpl.title}}", "tag": "plain_text"}, "template": "{{if $event.IsRecovered}}green{{else}}red{{end}}"}, "elements": [{"tag": "div", "text": {"tag": "lark_md","content": "{{$tpl.content}}"}}]}}`,
},
},
},
@@ -1331,6 +1285,27 @@ var NotiChMap = []*NotifyChannelConfig{
},
},
},
{
Name: "FeishuApp", Ident: FeishuApp, RequestType: "script", Weight: 5, Enable: false,
RequestConfig: &RequestConfig{
ScriptRequestConfig: &ScriptRequestConfig{
Timeout: 10000,
ScriptType: "script",
Script: FeishuAppBody,
},
},
ParamConfig: &NotifyParamConfig{
UserInfo: &UserInfo{
ContactKey: "email",
},
Custom: Params{
Params: []ParamItem{
{Key: "feishuapp_id", CName: "FeiShuAppID", Type: "string"},
{Key: "feishuapp_secret", CName: "FeiShuAppSecret", Type: "string"},
},
},
},
},
{
Name: "Feishu", Ident: Feishu, RequestType: "http", Weight: 5, Enable: true,
RequestConfig: &RequestConfig{
@@ -1360,7 +1335,7 @@ var NotiChMap = []*NotifyChannelConfig{
Method: "POST", Headers: map[string]string{"Content-Type": "application/json"},
Timeout: 10000, Concurrency: 5, RetryTimes: 3, RetryInterval: 100,
Request: RequestDetail{
Body: `{"msg_type": "interactive", "card": {"config": {"wide_screen_mode": true}, "header": {"title": {"content": "{{$tpl.title}}", "tag": "plain_text"}, "template": "{{if $event.IsRecovered}}green{{else}}red{{end}}"}, "elements": [{"tag": "markdown", "content": "{{$tpl.content}}"}]}}`,
Body: `{"msg_type": "interactive", "card": {"config": {"wide_screen_mode": true}, "header": {"title": {"content": "{{$tpl.title}}", "tag": "plain_text"}, "template": "{{if $event.IsRecovered}}green{{else}}red{{end}}"}, "elements": [{"tag": "div", "text": {"tag": "lark_md","content": "{{$tpl.content}}"}}]}}`,
},
},
},
@@ -1446,8 +1421,6 @@ var NotiChMap = []*NotifyChannelConfig{
},
FlashDutyRequestConfig: &FlashDutyRequestConfig{
IntegrationUrl: "flashduty integration url",
Timeout: 5000, // 默认5秒超时
RetryTimes: 3, // 默认重试3次
},
},
},
@@ -1485,3 +1458,460 @@ func (ncc *NotifyChannelConfig) Upsert(ctx *ctx.Context) error {
}
return ch.Update(ctx, *ncc)
}
var FeishuAppBody = `#!/usr/bin/env python
# -*- coding: UTF-8 -*-
import sys
import json
import requests
import logging
import re
import traceback
import os
import copy
from typing import Dict, Any, Optional
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# 常量
RECOVERED = "recovered"
TRIGGERED = "triggered"
def get_access_token(app_id: str, app_secret: str) -> str:
"""获取飞书访问令牌"""
logger.info(f"正在获取飞书访问令牌... AppID: {app_id}")
url = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal"
data = {
"app_id": app_id,
"app_secret": app_secret
}
try:
logger.info(f"发送请求到 {url}")
response = requests.post(
url,
json=data,
timeout=30,
headers={"Content-Type": "application/json"}
)
logger.info(f"收到响应: 状态码={response.status_code}")
if response.status_code != 200:
logger.error(f"获取令牌失败,状态码: {response.status_code}")
logger.error(f"响应内容: {response.text}")
return ""
resp_json = response.json()
if resp_json.get("msg") != "ok":
logger.error(f"飞书获取AccessToken失败: error={resp_json.get('msg')}")
logger.error(f"完整响应: {resp_json}")
return ""
else:
token = resp_json.get("tenant_access_token", "")
logger.info(f"飞书获取AccessToken成功Token长度: {len(token)}")
return token
except Exception as e:
logger.error(f"飞书获取AccessToken异常: error={e}")
logger.error(f"错误详情: {traceback.format_exc()}")
return ""
def get_user_info(token: str, emails: list) -> dict:
"""从飞书API获取用户信息"""
url = "https://open.feishu.cn/open-apis/contact/v3/users/batch_get_id?user_id_type=user_id"
data = {"emails": emails}
try:
response = requests.post(
url,
json=data,
timeout=30,
headers={
"Content-Type": "application/json",
"Authorization": f"Bearer {token}"
}
)
return response.json()
except Exception as e:
logger.error(f"获取用户信息失败: {e}")
return {}
def send_urgent_app(message_id: str, user_id: str, title: str, token: str) -> Optional[Exception]:
"""发送紧急应用通知"""
if not message_id:
return Exception("消息ID为空")
if not user_id:
return Exception("用户ID为空")
user_body = {
"user_id_list": [user_id],
"content": {
"text": f"紧急告警: {title}",
"type": "text"
}
}
url = f"https://open.feishu.cn/open-apis/im/v1/messages/{message_id}/urgent_app?user_id_type=user_id"
try:
response = requests.patch(
url,
json=user_body,
timeout=30,
headers={
"Content-Type": "application/json",
"Authorization": f"Bearer {token}"
}
)
res = response.json()
if response.status_code != 200:
return Exception(f"请求失败,状态码 {response.status_code}")
if res.get("code", -1) != 0:
return Exception(f"飞书拒绝请求: {res.get('msg')}")
return None
except Exception as e:
return e
def create_feishu_app_body(title: str, content: str, color: str, event_url: str) -> dict:
"""创建飞书应用卡片消息体
直接使用传入的模板内容确保使用markdown解析
"""
# 修复双重转义的换行符问题
# 1. 将 \\n 转换为真正的换行符
markdown_content = content.replace('\\n', '\n')
# 2. 处理HTML转义字符
markdown_content = markdown_content.replace('&#34;', '"').replace('&gt;', '>')
# 3. 修复特殊格式问题
# 处理可能导致错误的大括号表达式
markdown_content = markdown_content.replace('{map[', '{ map[')
markdown_content = markdown_content.replace('map[v:{', 'map[v:{ ')
# 创建消息卡片
app_body = {
"config": {
"wide_screen_mode": True,
"enable_forward": True
},
"header": {
"title": {
"tag": "plain_text",
"content": title
},
"template": color
},
"elements": [
{
"tag": "markdown",
"content": markdown_content
},
{
"tag": "hr"
},
{
"tag": "action",
"actions": [
{
"tag": "button",
"text": {
"content": "告警详情",
"tag": "plain_text"
},
"type": "primary",
"url": event_url
}
]
},
{
"tag": "hr"
},
{
"tag": "note",
"elements": [
{
"tag": "lark_md",
"content": title
}
]
}
]
}
# 记录修复后的内容
logger.info(f"飞书卡片标题: {title}")
logger.info(f"飞书卡片颜色: {color}")
logger.info(f"修复后的内容预览: {markdown_content[:100]}...")
return app_body
def extract_data_from_string(stdin_data):
"""从字符串中提取关键数据返回构建的payload"""
payload = {"tpl": {}, "params": {}, "sendto": []}
# 提取tplContent
content_match = re.search(r'tplContent:map\[content:(.*?) title:(.*?)\]', stdin_data)
if content_match:
payload["tpl"]["content"] = content_match.group(1)
payload["tpl"]["title"] = content_match.group(2)
# 提取customParams
params_match = re.search(r'customParams:map\[(.*?)\]', stdin_data)
if params_match:
params_str = params_match.group(1)
# 提取domain_url
domain_match = re.search(r'domain_url:(.*?)(?: |$)', params_str)
if domain_match:
payload["params"]["domain_url"] = domain_match.group(1)
# 提取feishuapp_id
app_id_match = re.search(r'feishuapp_id:(.*?)(?: |$)', params_str)
if app_id_match:
payload["params"]["feishuapp_id"] = app_id_match.group(1)
# 提取feishuapp_secret
secret_match = re.search(r'feishuapp_secret:(.*?)(?:\s|$)', params_str)
if secret_match:
payload["params"]["feishuapp_secret"] = secret_match.group(1)
# 检查是否有err字段
err_match = re.search(r'err:(.*?)(?:,|\s|$)', stdin_data)
if err_match:
error_msg = err_match.group(1)
logger.error(f"检测到脚本错误: {error_msg}")
# 不设置默认发送目标,允许为空
return payload
def send_feishu_app(payload) -> None:
"""
发送飞书应用通知
Args:
payload: 包含告警信息的字典
"""
try:
# 提取必要参数
app_id = payload.get('params', {}).get('feishuapp_id')
app_secret = payload.get('params', {}).get('feishuapp_secret')
domain_url = "https://your-n9e-addr.com"
# 从sendto获取通知人列表
sendtos = payload.get('sendtos', [])
if isinstance(sendtos, str):
# 如果sendto是字符串按逗号分割
sendtos = [s.strip() for s in sendtos.split(',') if s.strip()]
# 检查必要参数
if not app_id or not app_secret:
logger.error("未提供有效的飞书应用凭证 (app_id 或 app_secret)")
return
# 检查发送目标,如果为空则直接返回
if not sendtos:
logger.warning("未提供发送目标,无法发送消息")
return
logger.info(f"发送目标: {sendtos}")
# 提取事件信息 - 优先使用单个event如果没有则使用events中的第一个
event = payload.get('event', {})
if not event and payload.get('events') and len(payload.get('events', [])) > 0:
event = payload.get('events')[0]
# 获取通知内容 - 使用已渲染的模板内容
content = payload.get('tpl', {}).get('content', '未找到告警内容')
title = payload.get('tpl', {}).get('title', '告警通知')
# 获取访问令牌
token = get_access_token(app_id, app_secret)
if not token:
logger.error("获取飞书访问令牌失败,无法继续")
return
# 获取用户信息
user_info = get_user_info(token, sendtos)
# 创建邮箱到用户ID的映射
user_id_map = {}
if user_info and "data" in user_info and "user_list" in user_info["data"]:
for user in user_info["data"]["user_list"]:
if user.get("email"):
user_id_map[user["email"]] = user.get("user_id", "")
# 提取事件信息
event_id = event.get('id', 0)
rule_name = event.get('rule_name', title)
severity = event.get('severity', 1) # 默认为严重级别
# 设置颜色和标题 - 根据事件是否已恢复或严重性级别
color = "red" # 默认严重告警使用红色
send_title = title
# 根据事件状态确定颜色
if "Recovered" in content:
color = "green" # 已恢复告警使用绿色
elif severity == 1:
color = "red" # 严重告警
elif severity == 2:
color = "orange" # 警告
elif severity == 3:
color = "blue" # 信息
event_url = f"{domain_url}/alert-his-events/{event_id}"
# 为每个接收者发送消息
for recipient in sendtos:
if not recipient:
continue
# 确定receive_id_type
if recipient.startswith("ou_"):
receive_type = "open_id"
elif recipient.startswith("on_"):
receive_type = "union_id"
elif recipient.startswith("oc_"):
receive_type = "chat_id"
elif "@" in recipient:
receive_type = "email"
else:
receive_type = "user_id"
fs_url = f"https://open.feishu.cn/open-apis/im/v1/messages?receive_id_type={receive_type}"
# 创建卡片消息体 - 直接使用模板生成的内容在create_feishu_app_body中处理转义问题
app_body = create_feishu_app_body(send_title, content, color, event_url)
content_str = json.dumps(app_body)
body = {
"msg_type": "interactive",
"receive_id": recipient,
"content": content_str
}
# 发送消息
try:
response = requests.post(
fs_url,
json=body,
timeout=30,
headers={
"Content-Type": "application/json",
"Authorization": f"Bearer {token}"
}
)
send_res = response.json()
if response.status_code != 200 or send_res.get("code", -1) != 0:
logger.error(f"飞书消息发送失败: 状态码={response.status_code}, 错误={send_res.get('msg', '未知错误')}")
continue
message_id = send_res.get("data", {}).get("message_id", "")
logger.info(f"发送成功 → {recipient} [消息ID: {message_id}]")
# 如果是高严重性,发送紧急消息
if severity == 1 and "Recovered" not in content:
user_id = user_id_map.get(recipient, "")
if user_id:
err = send_urgent_app(message_id, user_id, send_title, token)
if err:
logger.error(f"加急通知失败: {err}")
else:
logger.info(f"已发送加急通知 → {recipient}")
else:
logger.warning(f"无法发送加急: 未找到用户ID (email={recipient})")
except Exception as e:
logger.error(f"发送异常: {e}")
logger.error(f"错误详情: {traceback.format_exc()}")
except Exception as e:
logger.error(f"处理异常: {e}")
logger.error(f"错误详情: {traceback.format_exc()}")
def main():
"""主函数:读取输入数据,解析并发送飞书通知"""
try:
logger.info("开始执行飞书应用告警脚本")
payload = None
# 读取标准输入
try:
stdin_data = sys.stdin.read()
# 保存安全处理后的原始输入
try:
with open(".raw_input", 'w') as f:
sanitized_data = stdin_data.replace(r'feishuapp_secret:[^ ]*', 'feishuapp_secret:[REDACTED]')
f.write(sanitized_data)
except:
pass
# 优先尝试解析JSON
try:
payload = json.loads(stdin_data)
except json.JSONDecodeError:
# JSON解析失败尝试字符串提取
if "tplContent" in stdin_data:
payload = extract_data_from_string(stdin_data)
logger.info("从原始文本提取数据成功")
else:
logger.error("无法识别的数据格式")
payload = {
"tpl": {"content": "无法解析输入数据", "title": "告警通知"},
"params": {},
"sendto": []
}
except Exception as e:
logger.error(f"读取输入失败: {e}")
payload = {
"tpl": {"content": "读取输入失败", "title": "告警通知"},
"params": {},
"sendto": []
}
# 保存处理后的payload隐藏敏感信息
try:
with open(".payload", 'w') as f:
safe_payload = copy.deepcopy(payload)
if 'params' in safe_payload and 'feishuapp_secret' in safe_payload['params']:
safe_payload['params']['feishuapp_secret'] = '[REDACTED]'
f.write(json.dumps(safe_payload, indent=4))
except:
pass
# 处理发送
send_feishu_app(payload)
except Exception as e:
logger.error(f"处理异常: {e}")
logger.error(f"错误详情: {traceback.format_exc()}")
sys.exit(1) # 确保错误状态正确传递
# 脚本入口点 - 只有一个入口点
if __name__ == "__main__":
main()
`

View File

@@ -22,7 +22,7 @@ type NotifyRule struct {
// 通知配置
NotifyConfigs []NotifyConfig `json:"notify_configs" gorm:"serializer:json"`
ExtraConfig interface{} `json:"extra_config,omitempty" gorm:"serializer:json"`
ExtraConfig interface{} `json:"extra_config" gorm:"serializer:json"`
CreateAt int64 `json:"create_at"`
CreateBy string `json:"create_by"`
@@ -196,12 +196,7 @@ func (r *NotifyRule) Update(ctx *ctx.Context, ref NotifyRule) error {
if err != nil {
return err
}
db := DB(ctx).Model(r).Select("*")
if ref.ExtraConfig == nil {
db = db.Omit("ExtraConfig")
}
return db.Updates(ref).Error
return DB(ctx).Model(r).Select("*").Updates(ref).Error
}
func (r *NotifyRule) DB2FE() {

View File

@@ -12,8 +12,8 @@ import (
"github.com/pkg/errors"
"github.com/toolkits/pkg/container/set"
"github.com/toolkits/pkg/logger"
"github.com/toolkits/pkg/slice"
"github.com/toolkits/pkg/logger"
"gorm.io/gorm"
)
@@ -228,9 +228,6 @@ func TargetTotal(ctx *ctx.Context, options ...BuildTargetWhereOption) (int64, er
func TargetGets(ctx *ctx.Context, limit, offset int, order string, desc bool, options ...BuildTargetWhereOption) ([]*Target, error) {
var lst []*Target
order = validateOrderField(order, "ident")
if desc {
order += " desc"
} else {
@@ -664,7 +661,7 @@ func CanMigrateBg(ctx *ctx.Context) bool {
return false
}
if cnt == 0 {
logger.Debug("target table is empty, skip migration.")
log.Println("target table is empty, skip migration.")
return false
}

View File

@@ -3,7 +3,6 @@ package models
import (
"fmt"
"os"
"regexp"
"strconv"
"strings"
"time"
@@ -11,7 +10,6 @@ import (
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/pkg/ormx"
"github.com/ccfos/nightingale/v6/pkg/poster"
"github.com/ccfos/nightingale/v6/pkg/secu"
"github.com/ccfos/nightingale/v6/storage"
"github.com/redis/go-redis/v9"
@@ -28,6 +26,7 @@ const (
Wecom = "wecom"
Feishu = "feishu"
FeishuCard = "feishucard"
FeishuApp = "feishuapp"
Discord = "discord"
MattermostWebhook = "mattermostwebhook"
MattermostBot = "mattermostbot"
@@ -164,10 +163,6 @@ func (u *User) Verify() error {
return errors.New("Email invalid")
}
if u.Phone != "" {
return u.EncryptPhone()
}
return nil
}
@@ -327,7 +322,6 @@ func UserGet(ctx *ctx.Context, where string, args ...interface{}) (*User, error)
lst[0].RolesLst = strings.Fields(lst[0].Roles)
lst[0].Admin = lst[0].IsAdmin()
lst[0].DecryptPhone() // 解密手机号
return lst[0], nil
}
@@ -342,7 +336,6 @@ func UsersGet(ctx *ctx.Context, where string, args ...interface{}) ([]*User, err
for _, user := range lst {
user.RolesLst = strings.Fields(user.Roles)
user.Admin = user.IsAdmin()
user.DecryptPhone() // 解密手机号
}
return lst, nil
@@ -556,47 +549,6 @@ func UserTotal(ctx *ctx.Context, query string, stime, etime int64) (num int64, e
return num, nil
}
var (
// 预编译正则表达式,避免重复编译
whitespaceRegex = regexp.MustCompile(`\s+`)
validOrderRegex = regexp.MustCompile(`^[a-zA-Z_][a-zA-Z0-9_]*(\.[a-zA-Z_][a-zA-Z0-9_]*)?$`)
)
func validateOrderField(order string, defaultField string) string {
// 空值检查
if order == "" {
return defaultField
}
// 长度检查
if len(order) > 64 {
logger.Warningf("SQL injection attempt detected: order field too long (%d chars)", len(order))
return defaultField
}
// 移除所有空白字符
order = whitespaceRegex.ReplaceAllString(order, "")
if order == "" {
return defaultField
}
// 检查危险字符
orderLower := strings.ToLower(order)
if strings.ContainsAny(order, "();,'\"` --/*\\=+-*/><|&^~") ||
strings.Contains(orderLower, "0x") || strings.Contains(orderLower, "0b") {
logger.Warningf("SQL injection attempt detected: contains dangerous characters")
return defaultField
}
// 使用正则表达式验证格式:只允许字母开头的字段名,可选择性包含表名
if !validOrderRegex.MatchString(order) {
logger.Warningf("SQL injection attempt detected: invalid order field format")
return defaultField
}
return order
}
func UserGets(ctx *ctx.Context, query string, limit, offset int, stime, etime int64,
order string, desc bool, usernames, phones, emails []string) ([]User, error) {
@@ -606,8 +558,6 @@ func UserGets(ctx *ctx.Context, query string, limit, offset int, stime, etime in
session = session.Where("last_active_time between ? and ?", stime, etime)
}
order = validateOrderField(order, "username")
if desc {
order = order + " desc"
} else {
@@ -643,7 +593,6 @@ func UserGets(ctx *ctx.Context, query string, limit, offset int, stime, etime in
users[i].RolesLst = strings.Fields(users[i].Roles)
users[i].Admin = users[i].IsAdmin()
users[i].Password = ""
users[i].DecryptPhone() // 解密手机号
// query for user group information
var userGroupIDs []int64
@@ -685,7 +634,6 @@ func UserGetAll(ctx *ctx.Context) ([]*User, error) {
for i := 0; i < len(lst); i++ {
lst[i].RolesLst = strings.Fields(lst[i].Roles)
lst[i].Admin = lst[i].IsAdmin()
lst[i].DecryptPhone() // 解密手机号
}
}
return lst, err
@@ -702,7 +650,6 @@ func UserGetsByIds(ctx *ctx.Context, ids []int64) ([]User, error) {
for i := 0; i < len(lst); i++ {
lst[i].RolesLst = strings.Fields(lst[i].Roles)
lst[i].Admin = lst[i].IsAdmin()
lst[i].DecryptPhone() // 解密手机号
}
}
@@ -1024,60 +971,3 @@ func (u *User) AddUserAndGroups(ctx *ctx.Context, coverTeams bool) error {
return nil
}
func (u *User) EncryptPhone() (err error) {
// 从缓存获取手机号加密配置
enabled, publicKey, _, _, loaded := GetPhoneEncryptionConfigFromCache()
if !loaded {
// 如果缓存未加载,记录日志但不阻止保存
logger.Infof("Phone encryption config cache not loaded, user: %s", u.Username)
return nil
}
// 检查是否启用了手机号加密
if enabled && u.Phone != "" {
// 检查手机号是否已经加密(避免重复加密)
if len(u.Phone) > 4 && u.Phone[:4] == "enc:" {
// 已经加密,跳过
return nil
}
encryptedPhone, err := secu.EncryptValue(u.Phone, publicKey)
if err != nil {
logger.Warningf("Failed to encrypt phone: %v, user: %s", err, u.Username)
return nil
}
u.Phone = encryptedPhone
}
return nil
}
// DecryptPhone 解密用户手机号(如果已加密)
func (u *User) DecryptPhone() {
if u.Phone == "" {
return
}
// 检查手机号是否是加密格式(有 "enc:" 前缀)
if len(u.Phone) <= 4 || u.Phone[:4] != "enc:" {
// 不是加密格式,不需要解密
return
}
// 从缓存获取手机号加密配置
enabled, _, privateKey, password, loaded := GetPhoneEncryptionConfigFromCache()
if !loaded || !enabled {
// 如果缓存未加载或未启用加密,不解密
return
}
// 对手机号进行解密
decryptedPhone, err := secu.Decrypt(u.Phone, privateKey, password)
if err != nil {
// 如果解密失败,记录错误但保持原样
logger.Warningf("Failed to decrypt phone for user %s: %v", u.Username, err)
return
}
u.Phone = decryptedPhone
}

View File

@@ -12,11 +12,6 @@ import (
)
func Decrypt(cipherText string, privateKeyByte []byte, password string) (decrypted string, err error) {
// 移除 "enc:" 前缀(如果存在)
if len(cipherText) > 4 && cipherText[:4] == "enc:" {
cipherText = cipherText[4:]
}
decodeCipher, _ := base64.StdEncoding.DecodeString(cipherText)
//pem解码
block, _ := pem.Decode(privateKeyByte)
@@ -58,8 +53,7 @@ func EncryptValue(value string, publicKeyData []byte) (string, error) {
if err != nil {
return "", fmt.Errorf("failed to encrypt value: %v", err)
}
// 添加 "enc:" 前缀标记这是加密数据
return "enc:" + BASE64StdEncode(ciphertext), nil
return BASE64StdEncode(ciphertext), nil
}
func GenerateRsaKeyPair(password string) (privateByte, publicByte []byte, err error) {

View File

@@ -739,25 +739,3 @@ func JsonMarshal(v interface{}) template.HTML {
}
return template.HTML(string(json))
}
func MapDifference(firstMap, secondMap map[string]string) (map[string]string, error) {
// 创建结果 map
result := make(map[string]string)
// 遍历第一个 map将不在第二个 map 中的键值对添加到结果中
for key, value := range firstMap {
if _, exists := secondMap[key]; !exists {
result[key] = value
}
}
return result, nil
}
func TagsMapToStr(m map[string]string) string {
strs := []string{}
for key, value := range m {
strs = append(strs, key+"="+value)
}
sort.Strings(strs)
return strings.Join(strs, ",")
}

View File

@@ -62,8 +62,6 @@ var TemplateFuncMap = template.FuncMap{
"batchContactsAtsInFeishuEmail": BatchContactsAtsInFeishuEmail,
"batchContactsAtsInFeishuId": BatchContactsAtsInFeishuId,
"jsonMarshal": JsonMarshal,
"mapDifference": MapDifference,
"tagsMapToStr": TagsMapToStr,
}
// NewTemplateFuncMap copy on write for TemplateFuncMap

View File

@@ -71,24 +71,6 @@ func ValueFormatter(unit string, decimals int, value float64) FormattedValue {
}
}
// Handle positive and negative infinity
if math.IsInf(value, 1) {
return FormattedValue{
Value: 9999999999,
Unit: "",
Text: "+Inf",
Stat: 9999999999,
}
}
if math.IsInf(value, -1) {
return FormattedValue{
Value: -9999999999,
Unit: "",
Text: "-Inf",
Stat: -9999999999,
}
}
// 处理时间单位
switch unit {
case "none":

View File

@@ -117,7 +117,7 @@ func (pc *PromClientMap) loadFromDatabase() {
continue
}
logger.Infof("setClientFromPromOption success, datasourceId: %d", dsId)
logger.Info("setClientFromPromOption success: ", dsId)
PromOptions.Set(dsId, po)
continue
}

View File

@@ -13,7 +13,6 @@ import (
"github.com/ccfos/nightingale/v6/pushgw/pstat"
"github.com/ccfos/nightingale/v6/storage"
"github.com/toolkits/pkg/concurrent/semaphore"
"github.com/toolkits/pkg/logger"
"github.com/toolkits/pkg/slice"
)
@@ -24,7 +23,6 @@ type Set struct {
redis storage.Redis
ctx *ctx.Context
configs pconf.Pushgw
sema *semaphore.Semaphore
}
func New(ctx *ctx.Context, redis storage.Redis, configs pconf.Pushgw) *Set {
@@ -34,7 +32,6 @@ func New(ctx *ctx.Context, redis storage.Redis, configs pconf.Pushgw) *Set {
ctx: ctx,
configs: configs,
}
set.sema = semaphore.NewSemaphore(configs.UpdateTargetByUrlConcurrency)
set.Init()
return set
@@ -116,26 +113,8 @@ func (s *Set) UpdateTargets(lst []string, now int64) error {
Lst: lst,
Now: now,
}
if !s.sema.TryAcquire() {
logger.Warningf("update_targets: update target by url concurrency limit, skip update target: %v", lst)
return nil // 达到并发上限,放弃请求,只是页面上的机器时间不更新,不影响机器失联告警,降级处理下
}
go func() {
defer s.sema.Release()
// 修改为异步发送,防止机器太多,每个请求耗时比较长导致机器心跳时间更新不及时
err := poster.PostByUrls(s.ctx, "/v1/n9e/target-update", t)
if err != nil {
logger.Errorf("failed to post target update: %v", err)
}
}()
return nil
}
if s.configs.UpdateDBTargetTimestampDisable {
// 如果 mysql 压力太大,关闭更新 db 的操作
return nil
err := poster.PostByUrls(s.ctx, "/v1/n9e/target-update", t)
return err
}
// there are some idents not found in db, so insert them
@@ -154,34 +133,16 @@ func (s *Set) UpdateTargets(lst []string, now int64) error {
}
// 从批量更新一批机器的时间戳改成逐台更新是为了避免批量更新时mysql的锁竞争问题
start := time.Now()
duration := time.Since(start).Seconds()
if len(exists) > 0 {
sema := semaphore.NewSemaphore(s.configs.UpdateDBTargetConcurrency)
wg := sync.WaitGroup{}
for i := 0; i < len(exists); i++ {
sema.Acquire()
wg.Add(1)
go func(ident string) {
defer sema.Release()
defer wg.Done()
s.updateDBTargetTs(ident, now)
}(exists[i])
for i := 0; i < len(exists); i++ {
err = s.ctx.DB.Exec("UPDATE target SET update_at = ? WHERE ident = ?", now, exists[i]).Error
if err != nil {
logger.Error("upsert_target: failed to update target:", exists[i], "error:", err)
}
wg.Wait()
}
pstat.DBOperationLatency.WithLabelValues("update_targets_ts").Observe(duration)
return nil
}
func (s *Set) updateDBTargetTs(ident string, now int64) {
err := s.ctx.DB.Exec("UPDATE target SET update_at = ? WHERE ident = ?", now, ident).Error
if err != nil {
logger.Error("update_target: failed to update target:", ident, "error:", err)
}
}
func (s *Set) updateTargetsUpdateTs(lst []string, now int64, redis storage.Redis) error {
if redis == nil {
return fmt.Errorf("redis is nil")

View File

@@ -18,10 +18,6 @@ type Pushgw struct {
UpdateTargetRetryIntervalMills int64
UpdateTargetTimeoutMills int64
UpdateTargetBatchSize int
UpdateDBTargetConcurrency int
UpdateDBTargetTimestampDisable bool
PushConcurrency int
UpdateTargetByUrlConcurrency int
BusiGroupLabelKey string
IdentMetrics []string
@@ -53,7 +49,6 @@ type WriterOptions struct {
Url string
BasicAuthUser string
BasicAuthPass string
AsyncWrite bool // 如果有多个转发 writer对应不重要的 writer可以设置为 true异步转发提供转发效率
Timeout int64
DialTimeout int64
@@ -129,18 +124,6 @@ func (p *Pushgw) PreCheck() {
p.UpdateTargetBatchSize = 20
}
if p.UpdateDBTargetConcurrency <= 0 {
p.UpdateDBTargetConcurrency = 16
}
if p.PushConcurrency <= 0 {
p.PushConcurrency = 16
}
if p.UpdateTargetByUrlConcurrency <= 0 {
p.UpdateTargetByUrlConcurrency = 10
}
if p.BusiGroupLabelKey == "" {
p.BusiGroupLabelKey = "busigroup"
}

View File

@@ -105,17 +105,6 @@ var (
},
[]string{"operation", "status"},
)
DBOperationLatency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "db_operation_latency_seconds",
Help: "Histogram of latencies for DB operations",
Buckets: []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5},
},
[]string{"operation"},
)
)
func init() {
@@ -132,6 +121,5 @@ func init() {
GaugeSampleQueueSize,
CounterPushQueueOverLimitTotal,
RedisOperationLatency,
DBOperationLatency,
)
}

View File

@@ -0,0 +1,27 @@
package json
import (
"math"
"unsafe"
jsoniter "github.com/json-iterator/go"
)
func init() {
// 为了处理prom数据中的NaN值
jsoniter.RegisterTypeEncoderFunc("float64", func(ptr unsafe.Pointer, stream *jsoniter.Stream) {
f := *(*float64)(ptr)
if math.IsNaN(f) {
stream.WriteString("null")
} else {
stream.WriteFloat64(f)
}
}, func(ptr unsafe.Pointer) bool {
return true
})
}
func MarshalWithCustomFloat(items interface{}) ([]byte, error) {
var json = jsoniter.ConfigCompatibleWithStandardLibrary
return json.Marshal(items)
}

View File

@@ -3,9 +3,7 @@ package writer
import (
"bytes"
"context"
"encoding/json"
"fmt"
"math"
"net/http"
"strings"
"sync"
@@ -17,6 +15,7 @@ import (
"github.com/ccfos/nightingale/v6/pushgw/kafka"
"github.com/ccfos/nightingale/v6/pushgw/pconf"
"github.com/ccfos/nightingale/v6/pushgw/pstat"
"github.com/ccfos/nightingale/v6/pushgw/writer/json"
"github.com/golang/protobuf/proto"
"github.com/golang/snappy"
@@ -53,49 +52,8 @@ func beforeWrite(key string, items []prompb.TimeSeries, forceUseServerTS bool, e
return proto.Marshal(req)
}
// 如果是 json 格式,将 NaN 值的数据丢弃掉
return json.Marshal(filterNaNSamples(items))
}
func filterNaNSamples(items []prompb.TimeSeries) []prompb.TimeSeries {
// 早期检查如果没有NaN值直接返回原始数据
hasNaN := false
for i := range items {
for j := range items[i].Samples {
if math.IsNaN(items[i].Samples[j].Value) {
hasNaN = true
break
}
}
if hasNaN {
break
}
}
if !hasNaN {
return items
}
// 有NaN值时进行过滤原地修改以减少内存分配
for i := range items {
samples := items[i].Samples
validCount := 0
// 原地过滤 samples避免额外的内存分配
for j := range samples {
if !math.IsNaN(samples[j].Value) {
if validCount != j {
samples[validCount] = samples[j]
}
validCount++
}
}
// 保留所有时间序列即使没有有效样本此时Samples为空
items[i].Samples = samples[:validCount]
}
return items
return json.MarshalWithCustomFloat(items)
}
func (w WriterType) Write(key string, items []prompb.TimeSeries, headers ...map[string]string) {
@@ -199,11 +157,10 @@ func (w WriterType) Post(req []byte, headers ...map[string]string) error {
}
type WritersType struct {
pushgw pconf.Pushgw
backends map[string]Writer
queues map[string]*IdentQueue
AllQueueLen atomic.Value
PushConcurrency atomic.Int64
pushgw pconf.Pushgw
backends map[string]Writer
queues map[string]*IdentQueue
AllQueueLen atomic.Value
sync.RWMutex
}
@@ -253,31 +210,6 @@ func (ws *WritersType) Put(name string, writer Writer) {
ws.backends[name] = writer
}
func (ws *WritersType) isCriticalBackend(key string) bool {
backend, exists := ws.backends[key]
if !exists {
return false
}
// 使用类型断言判断
switch backend.(type) {
case WriterType:
if backend.(WriterType).Opts.AsyncWrite {
return false
}
// HTTP Writer 作为关键后端
return true
case KafkaWriterType:
// Kafka Writer 作为非关键后端
return false
default:
// 未知类型,保守起见作为关键后端
logger.Warningf("Unknown backend type: %T, treating as critical", backend)
return true
}
}
func (ws *WritersType) CleanExpQueue() {
for {
ws.Lock()
@@ -346,64 +278,12 @@ func (ws *WritersType) StartConsumer(identQueue *IdentQueue) {
continue
}
for key := range ws.backends {
if ws.isCriticalBackend(key) {
ws.backends[key].Write(key, series)
} else {
// 像 kafka 这种 writer 使用异步写入,防止因为写入太慢影响主流程
ws.writeToNonCriticalBackend(key, series)
}
ws.backends[key].Write(key, series)
}
}
}
}
func (ws *WritersType) writeToNonCriticalBackend(key string, series []prompb.TimeSeries) {
// 原子性地检查并增加并发数
currentConcurrency := ws.PushConcurrency.Add(1)
if currentConcurrency > int64(ws.pushgw.PushConcurrency) {
// 超过限制,立即减少计数并丢弃
ws.PushConcurrency.Add(-1)
logger.Warningf("push concurrency limit exceeded, current: %d, limit: %d, dropping %d series for backend: %s",
currentConcurrency-1, ws.pushgw.PushConcurrency, len(series), key)
pstat.CounterWirteErrorTotal.WithLabelValues(key).Add(float64(len(series)))
return
}
// 深拷贝数据,确保并发安全
seriesCopy := ws.deepCopySeries(series)
// 启动goroutine处理
go func(backendKey string, data []prompb.TimeSeries) {
defer func() {
ws.PushConcurrency.Add(-1)
if r := recover(); r != nil {
logger.Errorf("panic in non-critical backend %s: %v", backendKey, r)
}
}()
ws.backends[backendKey].Write(backendKey, data)
}(key, seriesCopy)
}
// 完整的深拷贝方法
func (ws *WritersType) deepCopySeries(series []prompb.TimeSeries) []prompb.TimeSeries {
seriesCopy := make([]prompb.TimeSeries, len(series))
for i := range series {
seriesCopy[i] = series[i]
if len(series[i].Samples) > 0 {
samples := make([]prompb.Sample, len(series[i].Samples))
copy(samples, series[i].Samples)
seriesCopy[i].Samples = samples
}
}
return seriesCopy
}
func (ws *WritersType) Init() error {
ws.AllQueueLen.Store(int64(0))