Compare commits

...

34 Commits

Author SHA1 Message Date
ning
63672ccaec update i18n 2025-09-19 21:10:56 +08:00
ning
aac8f6b9b7 fix: event value is inf 2025-09-19 19:33:05 +08:00
ning
ecbbc91257 fix: datasource sync 2025-09-19 17:42:36 +08:00
ning
3cbf036557 update migrate 2025-09-17 14:25:53 +08:00
pioneerlfn
4e03b47101 for slice, when marshal, return [] instead null (#2879) 2025-09-15 17:39:08 +08:00
ning
205faa79d7 refactor: pushgw writer support async 2025-09-14 12:39:40 +08:00
ning
103ae30101 update gitignore 2025-09-11 16:04:16 +08:00
ning
85c39e7398 refactor: change datasource log 2025-09-11 16:02:15 +08:00
totoro
a28d4deb21 update(es):update struct name (#2865)
Co-authored-by: zbq <zbq@flashcat.cloud>
2025-09-04 14:06:47 +08:00
totoro
5208d9505d update(es): support search_after (#2862) 2025-09-03 16:39:45 +08:00
ning
4c62e7c22e refactor: push ts to kafka 2025-09-03 11:35:53 +08:00
ning
26fb8a319d change migrate board_busigroup 2025-09-02 14:35:57 +08:00
ning
b645e170bb add addAutoIncrementPrimaryKey 2025-09-01 20:26:19 +08:00
ning
cd31458500 add addAutoIncrementPrimaryKey 2025-09-01 19:11:02 +08:00
ning
e9b5d16a5b add addAutoIncrementPrimaryKey 2025-09-01 18:18:42 +08:00
ning
9dbec79e25 add addAutoIncrementPrimaryKey 2025-09-01 16:12:37 +08:00
ning
4b9e23b592 change ops update 2025-08-28 11:12:00 +08:00
ning
60eb8d989b add UpdateDBTargetTimestampDisable 2025-08-23 13:53:59 +08:00
ning
6618d4314b refactor: push writer support sync 2025-08-22 14:59:49 +08:00
ning
f4e1865fda refactor: event_script_notify_result log add stdin 2025-08-20 14:22:20 +08:00
ning
56d118b96d refactor: add update db metric 2025-08-15 18:54:27 +08:00
ning
6bf1f203e4 fix: target update ts 2025-08-15 18:24:57 +08:00
ning
534e7c3b6b refactor: ds query api 2025-08-13 11:50:25 +08:00
ning
cd9f8bb3df refactor: ds query api 2025-08-13 11:12:33 +08:00
ning
ef6cbb3508 refactor: ds query api 2025-08-13 11:01:35 +08:00
Ulric Qin
643e1c6785 fix: update update_at when batch-updating-rules 2025-08-06 21:04:12 +08:00
Ulric Qin
497c81e774 refactor batch updating rules 2025-08-06 21:04:12 +08:00
Ulric Qin
dc34cd6896 code refactor for batch updating 2025-08-06 21:04:12 +08:00
ning
d4db423dfe fix: new mysql db client 2025-08-04 18:39:43 +08:00
ning
d29334cfc2 add case-insensitive search for builtin payload filtering 2025-08-04 16:25:07 +08:00
Yening Qin
019b490793 fix: event query log (#2813) 2025-08-01 16:24:12 +08:00
ning
bfcfd5f38a fix: pgsql cross database query 2025-07-31 11:50:53 +08:00
Yening Qin
c4a2cb5209 refactor: update duty user sync (#2806) 2025-07-28 14:32:10 +08:00
Yening Qin
e056e01786 code refactor (#2800) 2025-07-23 22:04:47 +08:00
28 changed files with 478 additions and 197 deletions

1
.gitignore vendored
View File

@@ -58,6 +58,7 @@ _test
.idea
.index
.vscode
.claude
.DS_Store
.cache-loader
.payload

1
.issue Symbolic link
View File

@@ -0,0 +1 @@
/Users/ning/qinyening.com/issue/n9e

View File

@@ -390,9 +390,10 @@ func filterByQuery(payloads []*models.BuiltinPayload, query string) []*models.Bu
return payloads
}
queryLower := strings.ToLower(query)
var filtered []*models.BuiltinPayload
for _, p := range payloads {
if strings.Contains(p.Name, query) || strings.Contains(p.Tags, query) {
if strings.Contains(strings.ToLower(p.Name), queryLower) || strings.Contains(strings.ToLower(p.Tags), queryLower) {
filtered = append(filtered, p)
}
}

View File

@@ -465,8 +465,8 @@ func (rt *Router) alertRulePutFields(c *gin.Context) {
ginx.Bomb(http.StatusBadRequest, "fields empty")
}
f.Fields["update_by"] = c.MustGet("username").(string)
f.Fields["update_at"] = time.Now().Unix()
updateBy := c.MustGet("username").(string)
updateAt := time.Now().Unix()
for i := 0; i < len(f.Ids); i++ {
ar, err := models.AlertRuleGetById(rt.Ctx, f.Ids[i])
@@ -483,7 +483,6 @@ func (rt *Router) alertRulePutFields(c *gin.Context) {
b, err := json.Marshal(originRule)
ginx.Dangerous(err)
ginx.Dangerous(ar.UpdateFieldsMap(rt.Ctx, map[string]interface{}{"rule_config": string(b)}))
continue
}
}
@@ -496,7 +495,6 @@ func (rt *Router) alertRulePutFields(c *gin.Context) {
b, err := json.Marshal(ar.AnnotationsJSON)
ginx.Dangerous(err)
ginx.Dangerous(ar.UpdateFieldsMap(rt.Ctx, map[string]interface{}{"annotations": string(b)}))
continue
}
}
@@ -509,7 +507,6 @@ func (rt *Router) alertRulePutFields(c *gin.Context) {
b, err := json.Marshal(ar.AnnotationsJSON)
ginx.Dangerous(err)
ginx.Dangerous(ar.UpdateFieldsMap(rt.Ctx, map[string]interface{}{"annotations": string(b)}))
continue
}
}
@@ -519,7 +516,6 @@ func (rt *Router) alertRulePutFields(c *gin.Context) {
callback := callbacks.(string)
if !strings.Contains(ar.Callbacks, callback) {
ginx.Dangerous(ar.UpdateFieldsMap(rt.Ctx, map[string]interface{}{"callbacks": ar.Callbacks + " " + callback}))
continue
}
}
}
@@ -529,7 +525,6 @@ func (rt *Router) alertRulePutFields(c *gin.Context) {
if callbacks, has := f.Fields["callbacks"]; has {
callback := callbacks.(string)
ginx.Dangerous(ar.UpdateFieldsMap(rt.Ctx, map[string]interface{}{"callbacks": strings.ReplaceAll(ar.Callbacks, callback, "")}))
continue
}
}
@@ -539,21 +534,6 @@ func (rt *Router) alertRulePutFields(c *gin.Context) {
bytes, err := json.Marshal(datasourceQueries)
ginx.Dangerous(err)
ginx.Dangerous(ar.UpdateFieldsMap(rt.Ctx, map[string]interface{}{"datasource_queries": bytes}))
continue
}
}
// 检测是否是批量更新通知规则的字段,如果是清理掉旧版本的配置
for k := range f.Fields {
if k == "notify_rule_ids" {
f.Fields["notify_version"] = 1
f.Fields["notify_channels"] = ""
f.Fields["notify_groups"] = ""
f.Fields["callbacks"] = ""
}
if k == "notify_channels" {
f.Fields["notify_version"] = 0
}
}
@@ -569,6 +549,12 @@ func (rt *Router) alertRulePutFields(c *gin.Context) {
ginx.Dangerous(ar.UpdateColumn(rt.Ctx, k, v))
}
}
// 统一更新更新时间和更新人,只有更新时间变了,告警规则才会被引擎拉取
ginx.Dangerous(ar.UpdateFieldsMap(rt.Ctx, map[string]interface{}{
"update_by": updateBy,
"update_at": updateAt,
}))
}
ginx.NewRender(c).Message(nil)

View File

@@ -83,7 +83,34 @@ func (rt *Router) datasourceBriefs(c *gin.Context) {
dss = rt.DatasourceCache.DatasourceFilter(dss, user)
}
ginx.NewRender(c).Data(dss, err)
// 实现搜索过滤逻辑
query := ginx.QueryStr(c, "query", "")
var intersection []*models.Datasource
if query != "" {
keywords := strings.Fields(strings.ToLower(query))
if len(keywords) > 0 {
// 将 PluginType 和 name 拼成一个字符串来搜索
for _, ds := range dss {
searchStr := strings.ToLower(ds.PluginType + " " + ds.Name + " ")
matchedCount := 0
// 检查是否所有关键词都匹配(交集逻辑)
for _, keyword := range keywords {
if strings.Contains(searchStr, keyword) {
matchedCount++
}
}
// 只有当所有关键词都匹配时才添加到结果中
if matchedCount == len(keywords) {
intersection = append(intersection, ds)
}
}
}
} else {
// 如果没有查询条件,返回所有数据源
intersection = dss
}
ginx.NewRender(c).Data(intersection, err)
}
func (rt *Router) datasourceUpsert(c *gin.Context) {

View File

@@ -149,6 +149,12 @@ func (rt *Router) recordingRulePutFields(c *gin.Context) {
f.Fields["datasource_queries"] = string(bytes)
}
if datasourceIds, ok := f.Fields["datasource_ids"]; ok {
bytes, err := json.Marshal(datasourceIds)
ginx.Dangerous(err)
f.Fields["datasource_ids"] = string(bytes)
}
for i := 0; i < len(f.Ids); i++ {
ar, err := models.RecordingRuleGetById(rt.Ctx, f.Ids[i])
ginx.Dangerous(err)

View File

@@ -10,12 +10,20 @@ import (
"github.com/araddon/dateparse"
"github.com/bitly/go-simplejson"
"github.com/ccfos/nightingale/v6/memsto"
"github.com/ccfos/nightingale/v6/models"
"github.com/mitchellh/mapstructure"
"github.com/olivere/elastic/v7"
"github.com/prometheus/common/model"
"github.com/toolkits/pkg/logger"
"github.com/ccfos/nightingale/v6/memsto"
"github.com/ccfos/nightingale/v6/models"
)
// FixedField is the name of an Elasticsearch metadata field used as a
// tie-breaker sort key for search_after pagination.
type FixedField string

const (
	FieldIndex FixedField = "_index" // the document's index name
	FieldId    FixedField = "_id"    // the document's id
)
type Query struct {
@@ -37,6 +45,18 @@ type Query struct {
Timeout int `json:"timeout" mapstructure:"timeout"`
MaxShard int `json:"max_shard" mapstructure:"max_shard"`
SearchAfter *SearchAfter `json:"search_after" mapstructure:"search_after"`
}
// SortField names one field of the search_after sort key and its direction.
type SortField struct {
	Field     string `json:"field" mapstructure:"field"`
	Ascending bool   `json:"ascending" mapstructure:"ascending"`
}

// SearchAfter carries the Elasticsearch search_after pagination cursor.
type SearchAfter struct {
	// SortFields lists the sort keys; typically the combination of
	// timestamp:desc, _index:asc and _id:asc, which together form a unique order.
	SortFields []SortField `json:"sort_fields" mapstructure:"sort_fields"`
	// SearchAfter holds the sort-key values of the last document of the
	// previous page, in the same order as SortFields.
	SearchAfter []interface{} `json:"search_after" mapstructure:"search_after"`
}
type MetricAggr struct {
@@ -271,7 +291,10 @@ func MakeLogQuery(ctx context.Context, query interface{}, eventTags []string, st
}
for i := 0; i < len(eventTags); i++ {
eventTags[i] = strings.Replace(eventTags[i], "=", ":", 1)
arr := strings.SplitN(eventTags[i], "=", 2)
if len(arr) == 2 {
eventTags[i] = fmt.Sprintf("%s:%s", arr[0], strconv.Quote(arr[1]))
}
}
if len(eventTags) > 0 {
@@ -295,7 +318,10 @@ func MakeTSQuery(ctx context.Context, query interface{}, eventTags []string, sta
}
for i := 0; i < len(eventTags); i++ {
eventTags[i] = strings.Replace(eventTags[i], "=", ":", 1)
arr := strings.SplitN(eventTags[i], "=", 2)
if len(arr) == 2 {
eventTags[i] = fmt.Sprintf("%s:%s", arr[0], strconv.Quote(arr[1]))
}
}
if len(eventTags) > 0 {
@@ -605,14 +631,27 @@ func QueryLog(ctx context.Context, queryParam interface{}, timeout int64, versio
if param.MaxShard < 1 {
param.MaxShard = maxShard
}
// from+size 分页方式获取日志受es 的max_result_window参数限制默认最多返回1w条日志, 可以使用search_after方式获取更多日志
source := elastic.NewSearchSource().
TrackTotalHits(true).
Query(queryString).
From(param.P).
Size(param.Limit).
Sort(param.DateField, param.Ascending)
Size(param.Limit)
// 是否使用search_after方式
if param.SearchAfter != nil {
// 设置默认排序字段
if len(param.SearchAfter.SortFields) == 0 {
source = source.Sort(param.DateField, param.Ascending).Sort(string(FieldIndex), true).Sort(string(FieldId), true)
} else {
for _, field := range param.SearchAfter.SortFields {
source = source.Sort(field.Field, field.Ascending)
}
}
if len(param.SearchAfter.SearchAfter) > 0 {
source = source.SearchAfter(param.SearchAfter.SearchAfter...)
}
} else {
source = source.From(param.P).Sort(param.DateField, param.Ascending)
}
result, err := search(ctx, indexArr, source, param.Timeout, param.MaxShard)
if err != nil {
logger.Warningf("query data error:%v", err)

View File

@@ -23,7 +23,7 @@ const (
)
var (
regx = "(?i)from\\s+([a-zA-Z0-9_]+)\\.([a-zA-Z0-9_]+)\\.([a-zA-Z0-9_]+)"
regx = `(?i)from\s+((?:"[^"]+"|[a-zA-Z0-9_]+))\.((?:"[^"]+"|[a-zA-Z0-9_]+))\.((?:"[^"]+"|[a-zA-Z0-9_]+))`
)
func init() {
@@ -162,6 +162,7 @@ func (p *PostgreSQL) QueryData(ctx context.Context, query interface{}) ([]models
return nil, err
}
postgresqlQueryParam.SQL = formatSQLDatabaseNameWithRegex(postgresqlQueryParam.SQL)
if strings.Contains(postgresqlQueryParam.SQL, "$__") {
var err error
postgresqlQueryParam.SQL, err = macros.Macro(postgresqlQueryParam.SQL, postgresqlQueryParam.From, postgresqlQueryParam.To)
@@ -229,6 +230,7 @@ func (p *PostgreSQL) QueryLog(ctx context.Context, query interface{}) ([]interfa
p.Shards[0].DB = db
}
postgresqlQueryParam.SQL = formatSQLDatabaseNameWithRegex(postgresqlQueryParam.SQL)
if strings.Contains(postgresqlQueryParam.SQL, "$__") {
var err error
postgresqlQueryParam.SQL, err = macros.Macro(postgresqlQueryParam.SQL, postgresqlQueryParam.From, postgresqlQueryParam.To)
@@ -280,7 +282,17 @@ func parseDBName(sql string) (db string, err error) {
if len(matches) != 4 {
return "", fmt.Errorf("no valid table name in format database.schema.table found")
}
return matches[1], nil
return strings.Trim(matches[1], `"`), nil
}
// dbNameFormatRe matches `from dbname.scheme.tabname` (case-insensitive),
// capturing the three identifiers. Compiled once at package level so the
// per-query hot path does not pay a regexp compilation on every call.
var dbNameFormatRe = regexp.MustCompile(`(?i)\bfrom\s+([a-zA-Z_][a-zA-Z0-9_]*)\s*\.\s*([a-zA-Z_][a-zA-Z0-9_]*)\s*\.\s*([a-zA-Z_][a-zA-Z0-9_]*)`)

// formatSQLDatabaseNameWithRegex rewrites a `from dbname.scheme.tabname`
// reference as `from "dbname"."scheme"."tabname"`. PostgreSQL folds unquoted
// identifiers to lower case, so quoting preserves the identifiers' original
// case. Input without the three-part pattern is returned unchanged.
func formatSQLDatabaseNameWithRegex(sql string) string {
	return dbNameFormatRe.ReplaceAllString(sql, `from "$1"."$2"."$3"`)
}
func extractColumns(sql string) ([]string, error) {

View File

@@ -22,6 +22,8 @@ import (
var FromAPIHook func()
var DatasourceProcessHook func(items []datasource.DatasourceInfo) []datasource.DatasourceInfo
func Init(ctx *ctx.Context, fromAPI bool) {
go getDatasourcesFromDBLoop(ctx, fromAPI)
}
@@ -100,6 +102,10 @@ func getDatasourcesFromDBLoop(ctx *ctx.Context, fromAPI bool) {
atomic.StoreInt64(&PromDefaultDatasourceId, 0)
}
if DatasourceProcessHook != nil {
dss = DatasourceProcessHook(dss)
}
PutDatasources(dss)
} else {
FromAPIHook()
@@ -163,7 +169,7 @@ func PutDatasources(items []datasource.DatasourceInfo) {
ds, err := datasource.GetDatasourceByType(typ, item.Settings)
if err != nil {
logger.Warningf("get plugin:%+v fail: %v", item, err)
logger.Debugf("get plugin:%+v fail: %v", item, err)
continue
}

View File

@@ -129,9 +129,7 @@ func (c *Clickhouse) QueryRows(ctx context.Context, query string) (*sql.Rows, er
// ShowDatabases lists all databases in Clickhouse
func (c *Clickhouse) ShowDatabases(ctx context.Context) ([]string, error) {
var (
res []string
)
res := make([]string, 0)
rows, err := c.QueryRows(ctx, ShowDatabases)
if err != nil {
@@ -151,9 +149,7 @@ func (c *Clickhouse) ShowDatabases(ctx context.Context) ([]string, error) {
// ShowTables lists all tables in a given database
func (c *Clickhouse) ShowTables(ctx context.Context, database string) ([]string, error) {
var (
res []string
)
res := make([]string, 0)
showTables := fmt.Sprintf(ShowTables, database)
rows, err := c.QueryRows(ctx, showTables)

View File

@@ -138,7 +138,7 @@ func (d *Doris) ShowDatabases(ctx context.Context) ([]string, error) {
}
defer rows.Close()
var databases []string
databases := make([]string, 0)
for rows.Next() {
var dbName string
if err := rows.Scan(&dbName); err != nil {
@@ -201,7 +201,7 @@ func (d *Doris) ShowResources(ctx context.Context, resourceType string) ([]strin
}
// 将 map 转换为切片
var resources []string
resources := make([]string, 0)
for name := range distinctName {
resources = append(resources, name)
}
@@ -226,7 +226,7 @@ func (d *Doris) ShowTables(ctx context.Context, database string) ([]string, erro
}
defer rows.Close()
var tables []string
tables := make([]string, 0)
for rows.Next() {
var tableName string
if err := rows.Scan(&tableName); err != nil {

View File

@@ -115,14 +115,14 @@ func (m *MySQL) NewConn(ctx context.Context, database string) (*gorm.DB, error)
}()
dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8&parseTime=True", shard.User, shard.Password, shard.Addr, database)
return sqlbase.NewDB(
db, err = sqlbase.NewDB(
ctx,
mysql.Open(dsn),
shard.MaxIdleConns,
shard.MaxOpenConns,
time.Duration(shard.ConnMaxLifetime)*time.Second,
)
return db, err
}
func (m *MySQL) ShowDatabases(ctx context.Context) ([]string, error) {

View File

@@ -48,7 +48,7 @@ func CloseDB(db *gorm.DB) error {
// ShowTables retrieves a list of all tables in the specified database
func ShowTables(ctx context.Context, db *gorm.DB, query string) ([]string, error) {
var tables []string
tables := make([]string, 0)
rows, err := db.WithContext(ctx).Raw(query).Rows()
if err != nil {

View File

@@ -122,7 +122,7 @@ func (tc *Tdengine) QueryTable(query string) (APIResponse, error) {
}
func (tc *Tdengine) ShowDatabases(context.Context) ([]string, error) {
var databases []string
databases := make([]string, 0)
data, err := tc.QueryTable("show databases")
if err != nil {
return databases, err
@@ -135,7 +135,7 @@ func (tc *Tdengine) ShowDatabases(context.Context) ([]string, error) {
}
func (tc *Tdengine) ShowTables(ctx context.Context, database string) ([]string, error) {
var tables []string
tables := make([]string, 0)
sql := fmt.Sprintf("show %s", database)
data, err := tc.QueryTable(sql)
if err != nil {

View File

@@ -502,7 +502,11 @@ func (ncc *NotifyChannelCacheType) startEmailSender(chID int64, smtp *models.SMT
m.Mail.GetHeader("Subject"), m.Mail.GetHeader("To"))
}
// sender.NotifyRecord(ncc.ctx, m.Events, m.NotifyRuleId, models.Email, strings.Join(m.Mail.GetHeader("To"), ","), "", err)
// 记录通知详情
if ncc.notifyRecordFunc != nil {
target := strings.Join(m.Mail.GetHeader("To"), ",")
ncc.notifyRecordFunc(ncc.ctx, m.Events, m.NotifyRuleId, "Email", target, "success", err)
}
size++
if size >= conf.Batch {

View File

@@ -735,6 +735,15 @@ func (ar *AlertRule) UpdateColumn(ctx *ctx.Context, column string, value interfa
return DB(ctx).Model(ar).Updates(updates).Error
}
if column == "notify_groups" || column == "notify_channels" {
updates := map[string]interface{}{
column: value,
"notify_version": 0,
"notify_rule_ids": []int64{},
}
return DB(ctx).Model(ar).Updates(updates).Error
}
return DB(ctx).Model(ar).UpdateColumn(column, value).Error
}

View File

@@ -13,6 +13,7 @@ import (
"github.com/pkg/errors"
"github.com/toolkits/pkg/logger"
"github.com/toolkits/pkg/str"
"gorm.io/gorm"
)
type Datasource struct {
@@ -140,7 +141,7 @@ func (ds *Datasource) Update(ctx *ctx.Context, selectField interface{}, selectFi
if ds.UpdatedAt == 0 {
ds.UpdatedAt = time.Now().Unix()
}
return DB(ctx).Model(ds).Select(selectField, selectFields...).Updates(ds).Error
return DB(ctx).Model(ds).Session(&gorm.Session{SkipHooks: true}).Select(selectField, selectFields...).Updates(ds).Error
}
func (ds *Datasource) Add(ctx *ctx.Context) error {

View File

@@ -127,7 +127,9 @@ func MigrateTables(db *gorm.DB) error {
// 删除 builtin_metrics 表的 idx_collector_typ_name 唯一索引
DropUniqueFiledLimit(db, &models.BuiltinMetric{}, "idx_collector_typ_name", "idx_collector_typ_name")
InsertPermPoints(db)
// 添加自增主键 ii 列到指定表
// addAutoIncrementPrimaryKey(db)
return nil
}
@@ -167,110 +169,6 @@ func columnHasIndex(db *gorm.DB, dst interface{}, indexColumn string) bool {
return false
}
func InsertPermPoints(db *gorm.DB) {
var ops []models.RoleOperation
ops = append(ops, models.RoleOperation{
RoleName: "Standard",
Operation: "/alert-mutes/put",
})
ops = append(ops, models.RoleOperation{
RoleName: "Standard",
Operation: "/log/index-patterns",
})
ops = append(ops, models.RoleOperation{
RoleName: "Standard",
Operation: "/help/variable-configs",
})
ops = append(ops, models.RoleOperation{
RoleName: "Standard",
Operation: "/ibex-settings",
})
ops = append(ops, models.RoleOperation{
RoleName: "Standard",
Operation: "/notification-templates",
})
ops = append(ops, models.RoleOperation{
RoleName: "Standard",
Operation: "/notification-templates/add",
})
ops = append(ops, models.RoleOperation{
RoleName: "Standard",
Operation: "/notification-templates/put",
})
ops = append(ops, models.RoleOperation{
RoleName: "Standard",
Operation: "/notification-templates/del",
})
ops = append(ops, models.RoleOperation{
RoleName: "Standard",
Operation: "/notification-rules",
})
ops = append(ops, models.RoleOperation{
RoleName: "Standard",
Operation: "/notification-rules/add",
})
ops = append(ops, models.RoleOperation{
RoleName: "Standard",
Operation: "/notification-rules/put",
})
ops = append(ops, models.RoleOperation{
RoleName: "Standard",
Operation: "/notification-rules/del",
})
ops = append(ops, models.RoleOperation{
RoleName: "Standard",
Operation: "/event-pipelines",
})
ops = append(ops, models.RoleOperation{
RoleName: "Standard",
Operation: "/event-pipelines/add",
})
ops = append(ops, models.RoleOperation{
RoleName: "Standard",
Operation: "/event-pipelines/put",
})
ops = append(ops, models.RoleOperation{
RoleName: "Standard",
Operation: "/event-pipelines/del",
})
for _, op := range ops {
var count int64
session := db.Session(&gorm.Session{}).Model(&models.RoleOperation{})
err := session.Where("operation = ? AND role_name = ?", op.Operation, op.RoleName).Count(&count).Error
if err != nil {
logger.Errorf("check role operation exists failed, %v", err)
continue
}
if count > 0 {
continue
}
err = session.Create(&op).Error
if err != nil {
logger.Errorf("insert role operation failed, %v", err)
}
}
}
type AlertRule struct {
ExtraConfig string `gorm:"type:text;column:extra_config"`
CronPattern string `gorm:"type:varchar(64);column:cron_pattern"`
@@ -473,3 +371,43 @@ type NotifyChannelConfig struct {
func (c *NotifyChannelConfig) TableName() string {
return "notify_channel"
}
// addAutoIncrementPrimaryKey adds an auto-increment primary-key column `ii`
// to specific tables. All failures are logged and swallowed: the migration is
// best-effort and must not block startup.
func addAutoIncrementPrimaryKey(db *gorm.DB) {
	// Only run these DDL statements on MySQL; other dialects are left untouched.
	switch db.Dialector.(type) {
	case *mysql.Dialector:
		// Add `ii` as the primary key of task_scheduler_health if the table
		// exists and the column has not been added yet (idempotent re-run).
		if db.Migrator().HasTable("task_scheduler_health") && !db.Migrator().HasColumn("task_scheduler_health", "ii") {
			err := db.Exec("ALTER TABLE `task_scheduler_health` ADD `ii` INT PRIMARY KEY AUTO_INCREMENT").Error
			if err != nil {
				logger.Errorf("failed to add ii column to task_scheduler_health: %v", err)
			}
		}
		// Rebuild the primary key of board_busigroup: drop the existing
		// composite PRIMARY KEY (if any), then add `ii` as the new one.
		if db.Migrator().HasTable("board_busigroup") && !db.Migrator().HasColumn("board_busigroup", "ii") {
			// Use SHOW KEYS to detect an existing primary key; it requires
			// lower privileges than querying information_schema.
			var keyInfo []map[string]interface{}
			err := db.Raw("SHOW KEYS FROM `board_busigroup` WHERE Key_name = 'PRIMARY'").Find(&keyInfo).Error
			if err != nil {
				// NOTE(review): on error we log but still fall through to the
				// ADD COLUMN below with keyInfo empty — presumably intentional
				// best-effort behavior; confirm.
				logger.Errorf("failed to check primary key for board_busigroup: %v", err)
			}
			// Drop the old primary key only when one actually exists,
			// otherwise the DROP statement itself would fail.
			if len(keyInfo) > 0 {
				err = db.Exec("ALTER TABLE `board_busigroup` DROP PRIMARY KEY").Error
				if err != nil {
					logger.Errorf("failed to drop primary key from board_busigroup: %v", err)
				}
			}
			// Add the new auto-increment primary key as the first column.
			err = db.Exec("ALTER TABLE `board_busigroup` ADD COLUMN `ii` INT AUTO_INCREMENT PRIMARY KEY FIRST").Error
			if err != nil {
				logger.Errorf("failed to add new primary key to board_busigroup: %v", err)
			}
		}
	default:
		// Non-MySQL dialects: nothing to do.
	}
}

View File

@@ -196,7 +196,7 @@ func (ncc *NotifyChannelConfig) SendScript(events []*AlertCurEvent, tpl map[stri
cmd.Stderr = &buf
err, isTimeout := cmdx.RunTimeout(cmd, time.Duration(config.Timeout)*time.Millisecond)
logger.Infof("event_script_notify_result: exec %s output: %s isTimeout: %v err: %v", fpath, buf.String(), isTimeout, err)
logger.Infof("event_script_notify_result: exec %s output: %s isTimeout: %v err: %v stdin: %s", fpath, buf.String(), isTimeout, err, string(getStdinBytes(events, tpl, params, sendtos)))
res := buf.String()

View File

@@ -89,7 +89,7 @@ func diffMap(m1, m2 map[int64]*models.User) []models.User {
func updateUser(appKey string, m1, m2 map[int64]*models.User) {
for i := range m1 {
if _, ok := m2[i]; ok {
if m1[i].Email != m2[i].Email || m1[i].Phone != m2[i].Phone || m1[i].Username != m2[i].Username {
if m1[i].Email != m2[i].Email || !PhoneIsSame(m1[i].Phone, m2[i].Phone) || m1[i].Username != m2[i].Username {
var flashdutyUser User
flashdutyUser = User{
@@ -110,6 +110,30 @@ func updateUser(appKey string, m1, m2 map[int64]*models.User) {
}
}
// PhoneIsSame reports whether two phone numbers refer to the same line,
// tolerating formatting differences (spaces, hyphens, a leading "+") and a
// country-code prefix of up to 3 digits, e.g. "+8613812345678" vs "13812345678".
func PhoneIsSame(phone1, phone2 string) bool {
	// normalize strips whitespace, separators and the leading "+" so that
	// only the digits (including any country code) remain.
	normalize := func(p string) string {
		p = strings.TrimSpace(p)
		p = strings.ReplaceAll(p, " ", "")
		p = strings.ReplaceAll(p, "-", "")
		p = strings.TrimPrefix(p, "+")
		return p
	}

	p1 := normalize(phone1)
	p2 := normalize(phone2)
	if p1 == p2 {
		return true
	}

	// Guard against the empty string: strings.HasSuffix(x, "") is always
	// true, so without this check an empty number would spuriously match any
	// number of up to 3 digits.
	if p1 == "" || p2 == "" {
		return false
	}

	// Treat the numbers as the same when the longer one ends with the shorter
	// one and the length difference is at most 3 digits (a country-code prefix).
	if len(p1) > len(p2) {
		return len(p1)-len(p2) <= 3 && strings.HasSuffix(p1, p2)
	}
	return len(p2)-len(p1) <= 3 && strings.HasSuffix(p2, p1)
}
type User struct {
Email string `json:"email,omitempty"`
Phone string `json:"phone,omitempty"`

View File

@@ -0,0 +1,67 @@
package flashduty
import "testing"
// TestPhoneIsSame checks that PhoneIsSame treats numbers as equal across
// country-code prefixes and separator formatting, and still distinguishes
// genuinely different numbers.
func TestPhoneIsSame(t *testing.T) {
	type testCase struct {
		label  string
		phoneA string
		phoneB string
		expect bool
	}

	cases := []testCase{
		{"blank", "", "", true},
		{"China +86 prefix", "+8613812345678", "13812345678", true},
		{"China +86 with spaces and hyphens", "+86 138-1234-5678", "13812345678", true},
		{"USA +1 prefix", "+1 234-567-8900", "2345678900", true},
		{"UK +44 prefix", "+442078765432", "2078765432", true},
		{"India +91 prefix", "+919876543210", "9876543210", true},
		{"Germany +49 prefix", "+4915123456789", "15123456789", true},
		{"Different numbers", "+8613812345678", "13812345679", false},
	}

	for _, tc := range cases {
		if got := PhoneIsSame(tc.phoneA, tc.phoneB); got != tc.expect {
			t.Errorf("%s: expected %v, got %v", tc.label, tc.expect, got)
		}
	}
}

View File

@@ -77,6 +77,7 @@ var I18N = `{
"event tag not match tag filter": "事件标签不匹配标签过滤器",
"event attributes not match attributes filter": "事件属性不匹配属性过滤器",
"failed to parse tag filter: %v": "解析标签过滤器失败: %v",
"Alert history events deletion started": "告警历史事件删除任务已启动",
"Infrastructure": "基础设施",
"Host - View": "机器 - 查看",
@@ -267,6 +268,7 @@ var I18N = `{
"event tag not match tag filter": "事件標籤不匹配標籤過濾器",
"event attributes not match attributes filter": "事件屬性不匹配屬性過濾器",
"failed to parse tag filter: %v": "解析標籤過濾器失敗: %v",
"Alert history events deletion started": "告警歷史事件刪除任務已啟動",
"Infrastructure": "基礎設施",
"Host - View": "機器 - 查看",
@@ -454,6 +456,7 @@ var I18N = `{
"event tag not match tag filter": "イベントタグがタグフィルタと一致しません",
"event attributes not match attributes filter": "イベント属性が属性フィルタと一致しません",
"failed to parse tag filter: %v": "タグフィルタの解析に失敗しました: %v",
"Alert history events deletion started": "アラート履歴イベントの削除タスクが開始されました",
"Infrastructure": "インフラストラクチャ",
"Host - View": "機器 - 閲覧",
@@ -641,6 +644,7 @@ var I18N = `{
"event tag not match tag filter": "Теги события не соответствуют фильтру тегов",
"event attributes not match attributes filter": "Атрибуты события не соответствуют фильтру атрибутов",
"failed to parse tag filter: %v": "Не удалось разобрать фильтр тегов: %v",
"Alert history events deletion started": "Задача удаления исторических событий оповещений запущена",
"Infrastructure": "Инфраструктура",
"Host - View": "Хост - Просмотр",

View File

@@ -71,6 +71,24 @@ func ValueFormatter(unit string, decimals int, value float64) FormattedValue {
}
}
// Handle positive and negative infinity
if math.IsInf(value, 1) {
return FormattedValue{
Value: 9999999999,
Unit: "",
Text: "+Inf",
Stat: 9999999999,
}
}
if math.IsInf(value, -1) {
return FormattedValue{
Value: -9999999999,
Unit: "",
Text: "-Inf",
Stat: -9999999999,
}
}
// 处理时间单位
switch unit {
case "none":

View File

@@ -13,6 +13,7 @@ import (
"github.com/ccfos/nightingale/v6/pushgw/pstat"
"github.com/ccfos/nightingale/v6/storage"
"github.com/toolkits/pkg/concurrent/semaphore"
"github.com/toolkits/pkg/logger"
"github.com/toolkits/pkg/slice"
)
@@ -117,6 +118,11 @@ func (s *Set) UpdateTargets(lst []string, now int64) error {
return err
}
if s.configs.UpdateDBTargetTimestampDisable {
// 如果 mysql 压力太大,关闭更新 db 的操作
return nil
}
// there are some idents not found in db, so insert them
var exists []string
err = s.ctx.DB.Table("target").Where("ident in ?", lst).Pluck("ident", &exists).Error
@@ -133,16 +139,34 @@ func (s *Set) UpdateTargets(lst []string, now int64) error {
}
// 从批量更新一批机器的时间戳改成逐台更新是为了避免批量更新时mysql的锁竞争问题
for i := 0; i < len(exists); i++ {
err = s.ctx.DB.Exec("UPDATE target SET update_at = ? WHERE ident = ?", now, exists[i]).Error
if err != nil {
logger.Error("upsert_target: failed to update target:", exists[i], "error:", err)
start := time.Now()
duration := time.Since(start).Seconds()
if len(exists) > 0 {
sema := semaphore.NewSemaphore(s.configs.UpdateDBTargetConcurrency)
wg := sync.WaitGroup{}
for i := 0; i < len(exists); i++ {
sema.Acquire()
wg.Add(1)
go func(ident string) {
defer sema.Release()
defer wg.Done()
s.updateDBTargetTs(ident, now)
}(exists[i])
}
wg.Wait()
}
pstat.DBOperationLatency.WithLabelValues("update_targets_ts").Observe(duration)
return nil
}
// updateDBTargetTs refreshes the update_at heartbeat column of a single
// target row. Updates are issued one ident at a time (the surrounding caller
// fans these out) instead of as one batch UPDATE, per the caller's note about
// MySQL lock contention. Errors are logged, never returned.
func (s *Set) updateDBTargetTs(ident string, now int64) {
	if err := s.ctx.DB.Exec("UPDATE target SET update_at = ? WHERE ident = ?", now, ident).Error; err != nil {
		logger.Error("update_target: failed to update target:", ident, "error:", err)
	}
}
func (s *Set) updateTargetsUpdateTs(lst []string, now int64, redis storage.Redis) error {
if redis == nil {
return fmt.Errorf("redis is nil")

View File

@@ -18,6 +18,9 @@ type Pushgw struct {
UpdateTargetRetryIntervalMills int64
UpdateTargetTimeoutMills int64
UpdateTargetBatchSize int
UpdateDBTargetConcurrency int
UpdateDBTargetTimestampDisable bool
PushConcurrency int
BusiGroupLabelKey string
IdentMetrics []string
@@ -49,6 +52,7 @@ type WriterOptions struct {
Url string
BasicAuthUser string
BasicAuthPass string
AsyncWrite bool // 如果有多个转发 writer对应不重要的 writer可以设置为 true异步转发提供转发效率
Timeout int64
DialTimeout int64
@@ -124,6 +128,14 @@ func (p *Pushgw) PreCheck() {
p.UpdateTargetBatchSize = 20
}
if p.UpdateDBTargetConcurrency <= 0 {
p.UpdateDBTargetConcurrency = 16
}
if p.PushConcurrency <= 0 {
p.PushConcurrency = 16
}
if p.BusiGroupLabelKey == "" {
p.BusiGroupLabelKey = "busigroup"
}

View File

@@ -105,6 +105,17 @@ var (
},
[]string{"operation", "status"},
)
DBOperationLatency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "db_operation_latency_seconds",
Help: "Histogram of latencies for DB operations",
Buckets: []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5},
},
[]string{"operation"},
)
)
func init() {
@@ -121,5 +132,6 @@ func init() {
GaugeSampleQueueSize,
CounterPushQueueOverLimitTotal,
RedisOperationLatency,
DBOperationLatency,
)
}

View File

@@ -1,27 +0,0 @@
package json
import (
"math"
"unsafe"
jsoniter "github.com/json-iterator/go"
)
func init() {
// 为了处理prom数据中的NaN值
jsoniter.RegisterTypeEncoderFunc("float64", func(ptr unsafe.Pointer, stream *jsoniter.Stream) {
f := *(*float64)(ptr)
if math.IsNaN(f) {
stream.WriteString("null")
} else {
stream.WriteFloat64(f)
}
}, func(ptr unsafe.Pointer) bool {
return true
})
}
func MarshalWithCustomFloat(items interface{}) ([]byte, error) {
var json = jsoniter.ConfigCompatibleWithStandardLibrary
return json.Marshal(items)
}

View File

@@ -3,7 +3,9 @@ package writer
import (
"bytes"
"context"
"encoding/json"
"fmt"
"math"
"net/http"
"strings"
"sync"
@@ -15,7 +17,6 @@ import (
"github.com/ccfos/nightingale/v6/pushgw/kafka"
"github.com/ccfos/nightingale/v6/pushgw/pconf"
"github.com/ccfos/nightingale/v6/pushgw/pstat"
"github.com/ccfos/nightingale/v6/pushgw/writer/json"
"github.com/golang/protobuf/proto"
"github.com/golang/snappy"
@@ -52,8 +53,49 @@ func beforeWrite(key string, items []prompb.TimeSeries, forceUseServerTS bool, e
return proto.Marshal(req)
}
// 如果是 json 格式,将 NaN 值的数据丢弃掉
return json.Marshal(filterNaNSamples(items))
}
return json.MarshalWithCustomFloat(items)
// filterNaNSamples drops every NaN-valued sample from the given time series.
// Filtering is done in place on each Samples slice so no new backing arrays
// are allocated; a series whose samples are all NaN is kept with an empty
// Samples slice rather than being removed from the result.
func filterNaNSamples(items []prompb.TimeSeries) []prompb.TimeSeries {
	// Fast path: scan for a NaN first and hand back the input untouched when
	// there is nothing to filter (the common case).
	containsNaN := func() bool {
		for i := range items {
			for _, s := range items[i].Samples {
				if math.IsNaN(s.Value) {
					return true
				}
			}
		}
		return false
	}
	if !containsNaN() {
		return items
	}

	// Compact each Samples slice in place by appending the survivors onto a
	// zero-length reslice of the same backing array.
	for i := range items {
		kept := items[i].Samples[:0]
		for _, s := range items[i].Samples {
			if !math.IsNaN(s.Value) {
				kept = append(kept, s)
			}
		}
		items[i].Samples = kept
	}
	return items
}
func (w WriterType) Write(key string, items []prompb.TimeSeries, headers ...map[string]string) {
@@ -157,10 +199,11 @@ func (w WriterType) Post(req []byte, headers ...map[string]string) error {
}
type WritersType struct {
pushgw pconf.Pushgw
backends map[string]Writer
queues map[string]*IdentQueue
AllQueueLen atomic.Value
pushgw pconf.Pushgw
backends map[string]Writer
queues map[string]*IdentQueue
AllQueueLen atomic.Value
PushConcurrency atomic.Int64
sync.RWMutex
}
@@ -210,6 +253,31 @@ func (ws *WritersType) Put(name string, writer Writer) {
ws.backends[name] = writer
}
// isCriticalBackend reports whether the backend registered under key must be
// written synchronously on the main consume path. HTTP remote-write backends
// are critical unless explicitly marked AsyncWrite; Kafka backends are always
// non-critical; an unknown implementation is conservatively treated as critical.
func (ws *WritersType) isCriticalBackend(key string) bool {
	backend, ok := ws.backends[key]
	if !ok {
		return false
	}

	switch b := backend.(type) {
	case WriterType:
		// HTTP writer: critical unless it is configured for async forwarding.
		return !b.Opts.AsyncWrite
	case KafkaWriterType:
		// Kafka writer is always handled on the non-critical (async) path.
		return false
	default:
		// Unknown backend type: play it safe and keep it on the critical path.
		logger.Warningf("Unknown backend type: %T, treating as critical", backend)
		return true
	}
}
func (ws *WritersType) CleanExpQueue() {
for {
ws.Lock()
@@ -278,12 +346,64 @@ func (ws *WritersType) StartConsumer(identQueue *IdentQueue) {
continue
}
for key := range ws.backends {
ws.backends[key].Write(key, series)
if ws.isCriticalBackend(key) {
ws.backends[key].Write(key, series)
} else {
// 像 kafka 这种 writer 使用异步写入,防止因为写入太慢影响主流程
ws.writeToNonCriticalBackend(key, series)
}
}
}
}
}
// writeToNonCriticalBackend forwards series to a non-critical backend (e.g.
// Kafka) on a bounded pool of fire-and-forget goroutines, so a slow backend
// cannot stall the main consume loop. When the concurrency cap is reached the
// batch is dropped and counted as a write error.
func (ws *WritersType) writeToNonCriticalBackend(key string, series []prompb.TimeSeries) {
	// Atomically reserve a concurrency slot before doing anything else; the
	// Add/compare pair makes the check-then-act race-free.
	currentConcurrency := ws.PushConcurrency.Add(1)
	if currentConcurrency > int64(ws.pushgw.PushConcurrency) {
		// Over the cap: release the slot immediately and drop this batch.
		ws.PushConcurrency.Add(-1)
		logger.Warningf("push concurrency limit exceeded, current: %d, limit: %d, dropping %d series for backend: %s",
			currentConcurrency-1, ws.pushgw.PushConcurrency, len(series), key)
		pstat.CounterWirteErrorTotal.WithLabelValues(key).Add(float64(len(series)))
		return
	}

	// Deep-copy the samples before handing them to the goroutine, so the
	// caller can safely reuse/mutate the original slice after this returns.
	seriesCopy := ws.deepCopySeries(series)

	// Fire-and-forget worker: always release the slot, and contain panics so
	// a misbehaving backend cannot kill the process.
	go func(backendKey string, data []prompb.TimeSeries) {
		defer func() {
			ws.PushConcurrency.Add(-1)
			if r := recover(); r != nil {
				logger.Errorf("panic in non-critical backend %s: %v", backendKey, r)
			}
		}()
		// NOTE(review): ws.backends is read here without holding ws.RWMutex —
		// presumably safe because backends are only registered at startup;
		// confirm before relying on runtime re-registration.
		ws.backends[backendKey].Write(backendKey, data)
	}(key, seriesCopy)
}
// deepCopySeries returns a copy of series in which every Samples slice is
// cloned, so an async writer and the caller can mutate samples independently.
// NOTE(review): Labels slices are shared with the input, not cloned — the
// async path must not modify labels; confirm that assumption holds.
func (ws *WritersType) deepCopySeries(series []prompb.TimeSeries) []prompb.TimeSeries {
	out := make([]prompb.TimeSeries, len(series))
	copy(out, series)
	for i := range out {
		if n := len(out[i].Samples); n > 0 {
			cloned := make([]prompb.Sample, n)
			copy(cloned, out[i].Samples)
			out[i].Samples = cloned
		}
	}
	return out
}
func (ws *WritersType) Init() error {
ws.AllQueueLen.Store(int64(0))