mirror of
https://github.com/ccfos/nightingale.git
synced 2026-03-02 22:19:10 +00:00
Compare commits
34 Commits
v8.2.2
...
release-18
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
63672ccaec | ||
|
|
aac8f6b9b7 | ||
|
|
ecbbc91257 | ||
|
|
3cbf036557 | ||
|
|
4e03b47101 | ||
|
|
205faa79d7 | ||
|
|
103ae30101 | ||
|
|
85c39e7398 | ||
|
|
a28d4deb21 | ||
|
|
5208d9505d | ||
|
|
4c62e7c22e | ||
|
|
26fb8a319d | ||
|
|
b645e170bb | ||
|
|
cd31458500 | ||
|
|
e9b5d16a5b | ||
|
|
9dbec79e25 | ||
|
|
4b9e23b592 | ||
|
|
60eb8d989b | ||
|
|
6618d4314b | ||
|
|
f4e1865fda | ||
|
|
56d118b96d | ||
|
|
6bf1f203e4 | ||
|
|
534e7c3b6b | ||
|
|
cd9f8bb3df | ||
|
|
ef6cbb3508 | ||
|
|
643e1c6785 | ||
|
|
497c81e774 | ||
|
|
dc34cd6896 | ||
|
|
d4db423dfe | ||
|
|
d29334cfc2 | ||
|
|
019b490793 | ||
|
|
bfcfd5f38a | ||
|
|
c4a2cb5209 | ||
|
|
e056e01786 |
1
.gitignore
vendored
1
.gitignore
vendored
@@ -58,6 +58,7 @@ _test
|
||||
.idea
|
||||
.index
|
||||
.vscode
|
||||
.claude
|
||||
.DS_Store
|
||||
.cache-loader
|
||||
.payload
|
||||
|
||||
@@ -390,9 +390,10 @@ func filterByQuery(payloads []*models.BuiltinPayload, query string) []*models.Bu
|
||||
return payloads
|
||||
}
|
||||
|
||||
queryLower := strings.ToLower(query)
|
||||
var filtered []*models.BuiltinPayload
|
||||
for _, p := range payloads {
|
||||
if strings.Contains(p.Name, query) || strings.Contains(p.Tags, query) {
|
||||
if strings.Contains(strings.ToLower(p.Name), queryLower) || strings.Contains(strings.ToLower(p.Tags), queryLower) {
|
||||
filtered = append(filtered, p)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -465,8 +465,8 @@ func (rt *Router) alertRulePutFields(c *gin.Context) {
|
||||
ginx.Bomb(http.StatusBadRequest, "fields empty")
|
||||
}
|
||||
|
||||
f.Fields["update_by"] = c.MustGet("username").(string)
|
||||
f.Fields["update_at"] = time.Now().Unix()
|
||||
updateBy := c.MustGet("username").(string)
|
||||
updateAt := time.Now().Unix()
|
||||
|
||||
for i := 0; i < len(f.Ids); i++ {
|
||||
ar, err := models.AlertRuleGetById(rt.Ctx, f.Ids[i])
|
||||
@@ -483,7 +483,6 @@ func (rt *Router) alertRulePutFields(c *gin.Context) {
|
||||
b, err := json.Marshal(originRule)
|
||||
ginx.Dangerous(err)
|
||||
ginx.Dangerous(ar.UpdateFieldsMap(rt.Ctx, map[string]interface{}{"rule_config": string(b)}))
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
@@ -496,7 +495,6 @@ func (rt *Router) alertRulePutFields(c *gin.Context) {
|
||||
b, err := json.Marshal(ar.AnnotationsJSON)
|
||||
ginx.Dangerous(err)
|
||||
ginx.Dangerous(ar.UpdateFieldsMap(rt.Ctx, map[string]interface{}{"annotations": string(b)}))
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
@@ -509,7 +507,6 @@ func (rt *Router) alertRulePutFields(c *gin.Context) {
|
||||
b, err := json.Marshal(ar.AnnotationsJSON)
|
||||
ginx.Dangerous(err)
|
||||
ginx.Dangerous(ar.UpdateFieldsMap(rt.Ctx, map[string]interface{}{"annotations": string(b)}))
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
@@ -519,7 +516,6 @@ func (rt *Router) alertRulePutFields(c *gin.Context) {
|
||||
callback := callbacks.(string)
|
||||
if !strings.Contains(ar.Callbacks, callback) {
|
||||
ginx.Dangerous(ar.UpdateFieldsMap(rt.Ctx, map[string]interface{}{"callbacks": ar.Callbacks + " " + callback}))
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -529,7 +525,6 @@ func (rt *Router) alertRulePutFields(c *gin.Context) {
|
||||
if callbacks, has := f.Fields["callbacks"]; has {
|
||||
callback := callbacks.(string)
|
||||
ginx.Dangerous(ar.UpdateFieldsMap(rt.Ctx, map[string]interface{}{"callbacks": strings.ReplaceAll(ar.Callbacks, callback, "")}))
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
@@ -539,21 +534,6 @@ func (rt *Router) alertRulePutFields(c *gin.Context) {
|
||||
bytes, err := json.Marshal(datasourceQueries)
|
||||
ginx.Dangerous(err)
|
||||
ginx.Dangerous(ar.UpdateFieldsMap(rt.Ctx, map[string]interface{}{"datasource_queries": bytes}))
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// 检测是否是批量更新通知规则的字段,如果是清理掉旧版本的配置
|
||||
for k := range f.Fields {
|
||||
if k == "notify_rule_ids" {
|
||||
f.Fields["notify_version"] = 1
|
||||
f.Fields["notify_channels"] = ""
|
||||
f.Fields["notify_groups"] = ""
|
||||
f.Fields["callbacks"] = ""
|
||||
}
|
||||
|
||||
if k == "notify_channels" {
|
||||
f.Fields["notify_version"] = 0
|
||||
}
|
||||
}
|
||||
|
||||
@@ -569,6 +549,12 @@ func (rt *Router) alertRulePutFields(c *gin.Context) {
|
||||
ginx.Dangerous(ar.UpdateColumn(rt.Ctx, k, v))
|
||||
}
|
||||
}
|
||||
|
||||
// 统一更新更新时间和更新人,只有更新时间变了,告警规则才会被引擎拉取
|
||||
ginx.Dangerous(ar.UpdateFieldsMap(rt.Ctx, map[string]interface{}{
|
||||
"update_by": updateBy,
|
||||
"update_at": updateAt,
|
||||
}))
|
||||
}
|
||||
|
||||
ginx.NewRender(c).Message(nil)
|
||||
|
||||
@@ -83,7 +83,34 @@ func (rt *Router) datasourceBriefs(c *gin.Context) {
|
||||
dss = rt.DatasourceCache.DatasourceFilter(dss, user)
|
||||
}
|
||||
|
||||
ginx.NewRender(c).Data(dss, err)
|
||||
// 实现搜索过滤逻辑
|
||||
query := ginx.QueryStr(c, "query", "")
|
||||
var intersection []*models.Datasource
|
||||
if query != "" {
|
||||
keywords := strings.Fields(strings.ToLower(query))
|
||||
if len(keywords) > 0 {
|
||||
// 将 PluginType 和 name 拼成一个字符串来搜索
|
||||
for _, ds := range dss {
|
||||
searchStr := strings.ToLower(ds.PluginType + " " + ds.Name + " ")
|
||||
matchedCount := 0
|
||||
// 检查是否所有关键词都匹配(交集逻辑)
|
||||
for _, keyword := range keywords {
|
||||
if strings.Contains(searchStr, keyword) {
|
||||
matchedCount++
|
||||
}
|
||||
}
|
||||
// 只有当所有关键词都匹配时才添加到结果中
|
||||
if matchedCount == len(keywords) {
|
||||
intersection = append(intersection, ds)
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// 如果没有查询条件,返回所有数据源
|
||||
intersection = dss
|
||||
}
|
||||
|
||||
ginx.NewRender(c).Data(intersection, err)
|
||||
}
|
||||
|
||||
func (rt *Router) datasourceUpsert(c *gin.Context) {
|
||||
|
||||
@@ -149,6 +149,12 @@ func (rt *Router) recordingRulePutFields(c *gin.Context) {
|
||||
f.Fields["datasource_queries"] = string(bytes)
|
||||
}
|
||||
|
||||
if datasourceIds, ok := f.Fields["datasource_ids"]; ok {
|
||||
bytes, err := json.Marshal(datasourceIds)
|
||||
ginx.Dangerous(err)
|
||||
f.Fields["datasource_ids"] = string(bytes)
|
||||
}
|
||||
|
||||
for i := 0; i < len(f.Ids); i++ {
|
||||
ar, err := models.RecordingRuleGetById(rt.Ctx, f.Ids[i])
|
||||
ginx.Dangerous(err)
|
||||
|
||||
@@ -10,12 +10,20 @@ import (
|
||||
|
||||
"github.com/araddon/dateparse"
|
||||
"github.com/bitly/go-simplejson"
|
||||
"github.com/ccfos/nightingale/v6/memsto"
|
||||
"github.com/ccfos/nightingale/v6/models"
|
||||
"github.com/mitchellh/mapstructure"
|
||||
"github.com/olivere/elastic/v7"
|
||||
"github.com/prometheus/common/model"
|
||||
"github.com/toolkits/pkg/logger"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/memsto"
|
||||
"github.com/ccfos/nightingale/v6/models"
|
||||
)
|
||||
|
||||
type FixedField string
|
||||
|
||||
const (
|
||||
FieldIndex FixedField = "_index"
|
||||
FieldId FixedField = "_id"
|
||||
)
|
||||
|
||||
type Query struct {
|
||||
@@ -37,6 +45,18 @@ type Query struct {
|
||||
|
||||
Timeout int `json:"timeout" mapstructure:"timeout"`
|
||||
MaxShard int `json:"max_shard" mapstructure:"max_shard"`
|
||||
|
||||
SearchAfter *SearchAfter `json:"search_after" mapstructure:"search_after"`
|
||||
}
|
||||
|
||||
type SortField struct {
|
||||
Field string `json:"field" mapstructure:"field"`
|
||||
Ascending bool `json:"ascending" mapstructure:"ascending"`
|
||||
}
|
||||
|
||||
type SearchAfter struct {
|
||||
SortFields []SortField `json:"sort_fields" mapstructure:"sort_fields"` // 指定排序字段, 一般是timestamp:desc, _index:asc, _id:asc 三者组合,构成唯一的排序字段
|
||||
SearchAfter []interface{} `json:"search_after" mapstructure:"search_after"` // 指定排序字段的搜索值,搜索值必须和sort_fields的顺序一致,为上一次查询的最后一条日志的值
|
||||
}
|
||||
|
||||
type MetricAggr struct {
|
||||
@@ -271,7 +291,10 @@ func MakeLogQuery(ctx context.Context, query interface{}, eventTags []string, st
|
||||
}
|
||||
|
||||
for i := 0; i < len(eventTags); i++ {
|
||||
eventTags[i] = strings.Replace(eventTags[i], "=", ":", 1)
|
||||
arr := strings.SplitN(eventTags[i], "=", 2)
|
||||
if len(arr) == 2 {
|
||||
eventTags[i] = fmt.Sprintf("%s:%s", arr[0], strconv.Quote(arr[1]))
|
||||
}
|
||||
}
|
||||
|
||||
if len(eventTags) > 0 {
|
||||
@@ -295,7 +318,10 @@ func MakeTSQuery(ctx context.Context, query interface{}, eventTags []string, sta
|
||||
}
|
||||
|
||||
for i := 0; i < len(eventTags); i++ {
|
||||
eventTags[i] = strings.Replace(eventTags[i], "=", ":", 1)
|
||||
arr := strings.SplitN(eventTags[i], "=", 2)
|
||||
if len(arr) == 2 {
|
||||
eventTags[i] = fmt.Sprintf("%s:%s", arr[0], strconv.Quote(arr[1]))
|
||||
}
|
||||
}
|
||||
|
||||
if len(eventTags) > 0 {
|
||||
@@ -605,14 +631,27 @@ func QueryLog(ctx context.Context, queryParam interface{}, timeout int64, versio
|
||||
if param.MaxShard < 1 {
|
||||
param.MaxShard = maxShard
|
||||
}
|
||||
|
||||
// from+size 分页方式获取日志,受es 的max_result_window参数限制,默认最多返回1w条日志, 可以使用search_after方式获取更多日志
|
||||
source := elastic.NewSearchSource().
|
||||
TrackTotalHits(true).
|
||||
Query(queryString).
|
||||
From(param.P).
|
||||
Size(param.Limit).
|
||||
Sort(param.DateField, param.Ascending)
|
||||
|
||||
Size(param.Limit)
|
||||
// 是否使用search_after方式
|
||||
if param.SearchAfter != nil {
|
||||
// 设置默认排序字段
|
||||
if len(param.SearchAfter.SortFields) == 0 {
|
||||
source = source.Sort(param.DateField, param.Ascending).Sort(string(FieldIndex), true).Sort(string(FieldId), true)
|
||||
} else {
|
||||
for _, field := range param.SearchAfter.SortFields {
|
||||
source = source.Sort(field.Field, field.Ascending)
|
||||
}
|
||||
}
|
||||
if len(param.SearchAfter.SearchAfter) > 0 {
|
||||
source = source.SearchAfter(param.SearchAfter.SearchAfter...)
|
||||
}
|
||||
} else {
|
||||
source = source.From(param.P).Sort(param.DateField, param.Ascending)
|
||||
}
|
||||
result, err := search(ctx, indexArr, source, param.Timeout, param.MaxShard)
|
||||
if err != nil {
|
||||
logger.Warningf("query data error:%v", err)
|
||||
|
||||
@@ -23,7 +23,7 @@ const (
|
||||
)
|
||||
|
||||
var (
|
||||
regx = "(?i)from\\s+([a-zA-Z0-9_]+)\\.([a-zA-Z0-9_]+)\\.([a-zA-Z0-9_]+)"
|
||||
regx = `(?i)from\s+((?:"[^"]+"|[a-zA-Z0-9_]+))\.((?:"[^"]+"|[a-zA-Z0-9_]+))\.((?:"[^"]+"|[a-zA-Z0-9_]+))`
|
||||
)
|
||||
|
||||
func init() {
|
||||
@@ -162,6 +162,7 @@ func (p *PostgreSQL) QueryData(ctx context.Context, query interface{}) ([]models
|
||||
return nil, err
|
||||
}
|
||||
|
||||
postgresqlQueryParam.SQL = formatSQLDatabaseNameWithRegex(postgresqlQueryParam.SQL)
|
||||
if strings.Contains(postgresqlQueryParam.SQL, "$__") {
|
||||
var err error
|
||||
postgresqlQueryParam.SQL, err = macros.Macro(postgresqlQueryParam.SQL, postgresqlQueryParam.From, postgresqlQueryParam.To)
|
||||
@@ -229,6 +230,7 @@ func (p *PostgreSQL) QueryLog(ctx context.Context, query interface{}) ([]interfa
|
||||
p.Shards[0].DB = db
|
||||
}
|
||||
|
||||
postgresqlQueryParam.SQL = formatSQLDatabaseNameWithRegex(postgresqlQueryParam.SQL)
|
||||
if strings.Contains(postgresqlQueryParam.SQL, "$__") {
|
||||
var err error
|
||||
postgresqlQueryParam.SQL, err = macros.Macro(postgresqlQueryParam.SQL, postgresqlQueryParam.From, postgresqlQueryParam.To)
|
||||
@@ -280,7 +282,17 @@ func parseDBName(sql string) (db string, err error) {
|
||||
if len(matches) != 4 {
|
||||
return "", fmt.Errorf("no valid table name in format database.schema.table found")
|
||||
}
|
||||
return matches[1], nil
|
||||
return strings.Trim(matches[1], `"`), nil
|
||||
}
|
||||
|
||||
// formatSQLDatabaseNameWithRegex 只对 dbname.scheme.tabname 格式进行数据库名称格式化,转为 "dbname".scheme.tabname
|
||||
// 在pgsql中,大小写是通过"" 双引号括起来区分的,默认pg都是转为小写的,所以这里转为 "dbname".scheme."tabname"
|
||||
func formatSQLDatabaseNameWithRegex(sql string) string {
|
||||
// 匹配 from dbname.scheme.table_name 的模式
|
||||
// 使用捕获组来精确匹配数据库名称,确保后面跟着scheme和table
|
||||
re := regexp.MustCompile(`(?i)\bfrom\s+([a-zA-Z_][a-zA-Z0-9_]*)\s*\.\s*([a-zA-Z_][a-zA-Z0-9_]*)\s*\.\s*([a-zA-Z_][a-zA-Z0-9_]*)`)
|
||||
|
||||
return re.ReplaceAllString(sql, `from "$1"."$2"."$3"`)
|
||||
}
|
||||
|
||||
func extractColumns(sql string) ([]string, error) {
|
||||
|
||||
@@ -22,6 +22,8 @@ import (
|
||||
|
||||
var FromAPIHook func()
|
||||
|
||||
var DatasourceProcessHook func(items []datasource.DatasourceInfo) []datasource.DatasourceInfo
|
||||
|
||||
func Init(ctx *ctx.Context, fromAPI bool) {
|
||||
go getDatasourcesFromDBLoop(ctx, fromAPI)
|
||||
}
|
||||
@@ -100,6 +102,10 @@ func getDatasourcesFromDBLoop(ctx *ctx.Context, fromAPI bool) {
|
||||
atomic.StoreInt64(&PromDefaultDatasourceId, 0)
|
||||
}
|
||||
|
||||
if DatasourceProcessHook != nil {
|
||||
dss = DatasourceProcessHook(dss)
|
||||
}
|
||||
|
||||
PutDatasources(dss)
|
||||
} else {
|
||||
FromAPIHook()
|
||||
@@ -163,7 +169,7 @@ func PutDatasources(items []datasource.DatasourceInfo) {
|
||||
|
||||
ds, err := datasource.GetDatasourceByType(typ, item.Settings)
|
||||
if err != nil {
|
||||
logger.Warningf("get plugin:%+v fail: %v", item, err)
|
||||
logger.Debugf("get plugin:%+v fail: %v", item, err)
|
||||
continue
|
||||
}
|
||||
|
||||
|
||||
@@ -129,9 +129,7 @@ func (c *Clickhouse) QueryRows(ctx context.Context, query string) (*sql.Rows, er
|
||||
|
||||
// ShowDatabases lists all databases in Clickhouse
|
||||
func (c *Clickhouse) ShowDatabases(ctx context.Context) ([]string, error) {
|
||||
var (
|
||||
res []string
|
||||
)
|
||||
res := make([]string, 0)
|
||||
|
||||
rows, err := c.QueryRows(ctx, ShowDatabases)
|
||||
if err != nil {
|
||||
@@ -151,9 +149,7 @@ func (c *Clickhouse) ShowDatabases(ctx context.Context) ([]string, error) {
|
||||
|
||||
// ShowTables lists all tables in a given database
|
||||
func (c *Clickhouse) ShowTables(ctx context.Context, database string) ([]string, error) {
|
||||
var (
|
||||
res []string
|
||||
)
|
||||
res := make([]string, 0)
|
||||
|
||||
showTables := fmt.Sprintf(ShowTables, database)
|
||||
rows, err := c.QueryRows(ctx, showTables)
|
||||
|
||||
@@ -138,7 +138,7 @@ func (d *Doris) ShowDatabases(ctx context.Context) ([]string, error) {
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var databases []string
|
||||
databases := make([]string, 0)
|
||||
for rows.Next() {
|
||||
var dbName string
|
||||
if err := rows.Scan(&dbName); err != nil {
|
||||
@@ -201,7 +201,7 @@ func (d *Doris) ShowResources(ctx context.Context, resourceType string) ([]strin
|
||||
}
|
||||
|
||||
// 将 map 转换为切片
|
||||
var resources []string
|
||||
resources := make([]string, 0)
|
||||
for name := range distinctName {
|
||||
resources = append(resources, name)
|
||||
}
|
||||
@@ -226,7 +226,7 @@ func (d *Doris) ShowTables(ctx context.Context, database string) ([]string, erro
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var tables []string
|
||||
tables := make([]string, 0)
|
||||
for rows.Next() {
|
||||
var tableName string
|
||||
if err := rows.Scan(&tableName); err != nil {
|
||||
|
||||
@@ -115,14 +115,14 @@ func (m *MySQL) NewConn(ctx context.Context, database string) (*gorm.DB, error)
|
||||
}()
|
||||
|
||||
dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8&parseTime=True", shard.User, shard.Password, shard.Addr, database)
|
||||
|
||||
return sqlbase.NewDB(
|
||||
db, err = sqlbase.NewDB(
|
||||
ctx,
|
||||
mysql.Open(dsn),
|
||||
shard.MaxIdleConns,
|
||||
shard.MaxOpenConns,
|
||||
time.Duration(shard.ConnMaxLifetime)*time.Second,
|
||||
)
|
||||
return db, err
|
||||
}
|
||||
|
||||
func (m *MySQL) ShowDatabases(ctx context.Context) ([]string, error) {
|
||||
|
||||
@@ -48,7 +48,7 @@ func CloseDB(db *gorm.DB) error {
|
||||
|
||||
// ShowTables retrieves a list of all tables in the specified database
|
||||
func ShowTables(ctx context.Context, db *gorm.DB, query string) ([]string, error) {
|
||||
var tables []string
|
||||
tables := make([]string, 0)
|
||||
|
||||
rows, err := db.WithContext(ctx).Raw(query).Rows()
|
||||
if err != nil {
|
||||
|
||||
@@ -122,7 +122,7 @@ func (tc *Tdengine) QueryTable(query string) (APIResponse, error) {
|
||||
}
|
||||
|
||||
func (tc *Tdengine) ShowDatabases(context.Context) ([]string, error) {
|
||||
var databases []string
|
||||
databases := make([]string, 0)
|
||||
data, err := tc.QueryTable("show databases")
|
||||
if err != nil {
|
||||
return databases, err
|
||||
@@ -135,7 +135,7 @@ func (tc *Tdengine) ShowDatabases(context.Context) ([]string, error) {
|
||||
}
|
||||
|
||||
func (tc *Tdengine) ShowTables(ctx context.Context, database string) ([]string, error) {
|
||||
var tables []string
|
||||
tables := make([]string, 0)
|
||||
sql := fmt.Sprintf("show %s", database)
|
||||
data, err := tc.QueryTable(sql)
|
||||
if err != nil {
|
||||
|
||||
@@ -502,7 +502,11 @@ func (ncc *NotifyChannelCacheType) startEmailSender(chID int64, smtp *models.SMT
|
||||
m.Mail.GetHeader("Subject"), m.Mail.GetHeader("To"))
|
||||
}
|
||||
|
||||
// sender.NotifyRecord(ncc.ctx, m.Events, m.NotifyRuleId, models.Email, strings.Join(m.Mail.GetHeader("To"), ","), "", err)
|
||||
// 记录通知详情
|
||||
if ncc.notifyRecordFunc != nil {
|
||||
target := strings.Join(m.Mail.GetHeader("To"), ",")
|
||||
ncc.notifyRecordFunc(ncc.ctx, m.Events, m.NotifyRuleId, "Email", target, "success", err)
|
||||
}
|
||||
size++
|
||||
|
||||
if size >= conf.Batch {
|
||||
|
||||
@@ -735,6 +735,15 @@ func (ar *AlertRule) UpdateColumn(ctx *ctx.Context, column string, value interfa
|
||||
return DB(ctx).Model(ar).Updates(updates).Error
|
||||
}
|
||||
|
||||
if column == "notify_groups" || column == "notify_channels" {
|
||||
updates := map[string]interface{}{
|
||||
column: value,
|
||||
"notify_version": 0,
|
||||
"notify_rule_ids": []int64{},
|
||||
}
|
||||
return DB(ctx).Model(ar).Updates(updates).Error
|
||||
}
|
||||
|
||||
return DB(ctx).Model(ar).UpdateColumn(column, value).Error
|
||||
}
|
||||
|
||||
|
||||
@@ -13,6 +13,7 @@ import (
|
||||
"github.com/pkg/errors"
|
||||
"github.com/toolkits/pkg/logger"
|
||||
"github.com/toolkits/pkg/str"
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
type Datasource struct {
|
||||
@@ -140,7 +141,7 @@ func (ds *Datasource) Update(ctx *ctx.Context, selectField interface{}, selectFi
|
||||
if ds.UpdatedAt == 0 {
|
||||
ds.UpdatedAt = time.Now().Unix()
|
||||
}
|
||||
return DB(ctx).Model(ds).Select(selectField, selectFields...).Updates(ds).Error
|
||||
return DB(ctx).Model(ds).Session(&gorm.Session{SkipHooks: true}).Select(selectField, selectFields...).Updates(ds).Error
|
||||
}
|
||||
|
||||
func (ds *Datasource) Add(ctx *ctx.Context) error {
|
||||
|
||||
@@ -127,7 +127,9 @@ func MigrateTables(db *gorm.DB) error {
|
||||
// 删除 builtin_metrics 表的 idx_collector_typ_name 唯一索引
|
||||
DropUniqueFiledLimit(db, &models.BuiltinMetric{}, "idx_collector_typ_name", "idx_collector_typ_name")
|
||||
|
||||
InsertPermPoints(db)
|
||||
// 添加自增主键 ii 列到指定表
|
||||
// addAutoIncrementPrimaryKey(db)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -167,110 +169,6 @@ func columnHasIndex(db *gorm.DB, dst interface{}, indexColumn string) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func InsertPermPoints(db *gorm.DB) {
|
||||
var ops []models.RoleOperation
|
||||
ops = append(ops, models.RoleOperation{
|
||||
RoleName: "Standard",
|
||||
Operation: "/alert-mutes/put",
|
||||
})
|
||||
|
||||
ops = append(ops, models.RoleOperation{
|
||||
RoleName: "Standard",
|
||||
Operation: "/log/index-patterns",
|
||||
})
|
||||
|
||||
ops = append(ops, models.RoleOperation{
|
||||
RoleName: "Standard",
|
||||
Operation: "/help/variable-configs",
|
||||
})
|
||||
|
||||
ops = append(ops, models.RoleOperation{
|
||||
RoleName: "Standard",
|
||||
Operation: "/ibex-settings",
|
||||
})
|
||||
|
||||
ops = append(ops, models.RoleOperation{
|
||||
RoleName: "Standard",
|
||||
Operation: "/notification-templates",
|
||||
})
|
||||
|
||||
ops = append(ops, models.RoleOperation{
|
||||
RoleName: "Standard",
|
||||
Operation: "/notification-templates/add",
|
||||
})
|
||||
|
||||
ops = append(ops, models.RoleOperation{
|
||||
RoleName: "Standard",
|
||||
Operation: "/notification-templates/put",
|
||||
})
|
||||
|
||||
ops = append(ops, models.RoleOperation{
|
||||
RoleName: "Standard",
|
||||
Operation: "/notification-templates/del",
|
||||
})
|
||||
|
||||
ops = append(ops, models.RoleOperation{
|
||||
RoleName: "Standard",
|
||||
Operation: "/notification-rules",
|
||||
})
|
||||
|
||||
ops = append(ops, models.RoleOperation{
|
||||
RoleName: "Standard",
|
||||
Operation: "/notification-rules/add",
|
||||
})
|
||||
|
||||
ops = append(ops, models.RoleOperation{
|
||||
RoleName: "Standard",
|
||||
Operation: "/notification-rules/put",
|
||||
})
|
||||
|
||||
ops = append(ops, models.RoleOperation{
|
||||
RoleName: "Standard",
|
||||
Operation: "/notification-rules/del",
|
||||
})
|
||||
|
||||
ops = append(ops, models.RoleOperation{
|
||||
RoleName: "Standard",
|
||||
Operation: "/event-pipelines",
|
||||
})
|
||||
|
||||
ops = append(ops, models.RoleOperation{
|
||||
RoleName: "Standard",
|
||||
Operation: "/event-pipelines/add",
|
||||
})
|
||||
|
||||
ops = append(ops, models.RoleOperation{
|
||||
RoleName: "Standard",
|
||||
Operation: "/event-pipelines/put",
|
||||
})
|
||||
|
||||
ops = append(ops, models.RoleOperation{
|
||||
RoleName: "Standard",
|
||||
Operation: "/event-pipelines/del",
|
||||
})
|
||||
|
||||
for _, op := range ops {
|
||||
var count int64
|
||||
|
||||
session := db.Session(&gorm.Session{}).Model(&models.RoleOperation{})
|
||||
err := session.Where("operation = ? AND role_name = ?", op.Operation, op.RoleName).Count(&count).Error
|
||||
|
||||
if err != nil {
|
||||
logger.Errorf("check role operation exists failed, %v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
if count > 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
err = session.Create(&op).Error
|
||||
if err != nil {
|
||||
logger.Errorf("insert role operation failed, %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type AlertRule struct {
|
||||
ExtraConfig string `gorm:"type:text;column:extra_config"`
|
||||
CronPattern string `gorm:"type:varchar(64);column:cron_pattern"`
|
||||
@@ -473,3 +371,43 @@ type NotifyChannelConfig struct {
|
||||
func (c *NotifyChannelConfig) TableName() string {
|
||||
return "notify_channel"
|
||||
}
|
||||
|
||||
// addAutoIncrementPrimaryKey 为指定表添加自增主键 ii 列
|
||||
func addAutoIncrementPrimaryKey(db *gorm.DB) {
|
||||
// 只在 MySQL 数据库上执行这些操作
|
||||
switch db.Dialector.(type) {
|
||||
case *mysql.Dialector:
|
||||
// 为 task_scheduler_health 表添加 ii 列作为主键
|
||||
if db.Migrator().HasTable("task_scheduler_health") && !db.Migrator().HasColumn("task_scheduler_health", "ii") {
|
||||
err := db.Exec("ALTER TABLE `task_scheduler_health` ADD `ii` INT PRIMARY KEY AUTO_INCREMENT").Error
|
||||
if err != nil {
|
||||
logger.Errorf("failed to add ii column to task_scheduler_health: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// 为 board_busigroup 表重构主键
|
||||
if db.Migrator().HasTable("board_busigroup") && !db.Migrator().HasColumn("board_busigroup", "ii") {
|
||||
// 使用 SHOW KEYS 检查主键是否存在(权限要求更低)
|
||||
var keyInfo []map[string]interface{}
|
||||
err := db.Raw("SHOW KEYS FROM `board_busigroup` WHERE Key_name = 'PRIMARY'").Find(&keyInfo).Error
|
||||
if err != nil {
|
||||
logger.Errorf("failed to check primary key for board_busigroup: %v", err)
|
||||
}
|
||||
|
||||
// 只有当主键存在时才删除
|
||||
if len(keyInfo) > 0 {
|
||||
err = db.Exec("ALTER TABLE `board_busigroup` DROP PRIMARY KEY").Error
|
||||
if err != nil {
|
||||
logger.Errorf("failed to drop primary key from board_busigroup: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// 添加新的自增主键列
|
||||
err = db.Exec("ALTER TABLE `board_busigroup` ADD COLUMN `ii` INT AUTO_INCREMENT PRIMARY KEY FIRST").Error
|
||||
if err != nil {
|
||||
logger.Errorf("failed to add new primary key to board_busigroup: %v", err)
|
||||
}
|
||||
}
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
@@ -196,7 +196,7 @@ func (ncc *NotifyChannelConfig) SendScript(events []*AlertCurEvent, tpl map[stri
|
||||
cmd.Stderr = &buf
|
||||
|
||||
err, isTimeout := cmdx.RunTimeout(cmd, time.Duration(config.Timeout)*time.Millisecond)
|
||||
logger.Infof("event_script_notify_result: exec %s output: %s isTimeout: %v err: %v", fpath, buf.String(), isTimeout, err)
|
||||
logger.Infof("event_script_notify_result: exec %s output: %s isTimeout: %v err: %v stdin: %s", fpath, buf.String(), isTimeout, err, string(getStdinBytes(events, tpl, params, sendtos)))
|
||||
|
||||
res := buf.String()
|
||||
|
||||
|
||||
@@ -89,7 +89,7 @@ func diffMap(m1, m2 map[int64]*models.User) []models.User {
|
||||
func updateUser(appKey string, m1, m2 map[int64]*models.User) {
|
||||
for i := range m1 {
|
||||
if _, ok := m2[i]; ok {
|
||||
if m1[i].Email != m2[i].Email || m1[i].Phone != m2[i].Phone || m1[i].Username != m2[i].Username {
|
||||
if m1[i].Email != m2[i].Email || !PhoneIsSame(m1[i].Phone, m2[i].Phone) || m1[i].Username != m2[i].Username {
|
||||
var flashdutyUser User
|
||||
|
||||
flashdutyUser = User{
|
||||
@@ -110,6 +110,30 @@ func updateUser(appKey string, m1, m2 map[int64]*models.User) {
|
||||
}
|
||||
}
|
||||
|
||||
func PhoneIsSame(phone1, phone2 string) bool {
|
||||
// 兼容不同国家/地区前缀,例如 +86、+1、+44 等,以及包含空格或短横线的格式
|
||||
normalize := func(p string) string {
|
||||
p = strings.TrimSpace(p)
|
||||
p = strings.ReplaceAll(p, " ", "")
|
||||
p = strings.ReplaceAll(p, "-", "")
|
||||
p = strings.TrimPrefix(p, "+")
|
||||
return p
|
||||
}
|
||||
|
||||
p1 := normalize(phone1)
|
||||
p2 := normalize(phone2)
|
||||
|
||||
if p1 == p2 {
|
||||
return true
|
||||
}
|
||||
|
||||
// 如果长度相差不超过 3 且较长的以较短的结尾,则认为是相同号码(忽略最多 3 位国家区号差异)
|
||||
if len(p1) > len(p2) {
|
||||
return len(p1)-len(p2) <= 3 && strings.HasSuffix(p1, p2)
|
||||
}
|
||||
return len(p2)-len(p1) <= 3 && strings.HasSuffix(p2, p1)
|
||||
}
|
||||
|
||||
type User struct {
|
||||
Email string `json:"email,omitempty"`
|
||||
Phone string `json:"phone,omitempty"`
|
||||
|
||||
67
pkg/flashduty/sync_user_test.go
Normal file
67
pkg/flashduty/sync_user_test.go
Normal file
@@ -0,0 +1,67 @@
|
||||
package flashduty
|
||||
|
||||
import "testing"
|
||||
|
||||
func TestPhoneIsSame(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
phone1 string
|
||||
phone2 string
|
||||
same bool
|
||||
}{
|
||||
{
|
||||
name: "blank",
|
||||
phone1: "",
|
||||
phone2: "",
|
||||
same: true,
|
||||
},
|
||||
{
|
||||
name: "China +86 prefix",
|
||||
phone1: "+8613812345678",
|
||||
phone2: "13812345678",
|
||||
same: true,
|
||||
},
|
||||
{
|
||||
name: "China +86 with spaces and hyphens",
|
||||
phone1: "+86 138-1234-5678",
|
||||
phone2: "13812345678",
|
||||
same: true,
|
||||
},
|
||||
{
|
||||
name: "USA +1 prefix",
|
||||
phone1: "+1 234-567-8900",
|
||||
phone2: "2345678900",
|
||||
same: true,
|
||||
},
|
||||
{
|
||||
name: "UK +44 prefix",
|
||||
phone1: "+442078765432",
|
||||
phone2: "2078765432",
|
||||
same: true,
|
||||
},
|
||||
{
|
||||
name: "India +91 prefix",
|
||||
phone1: "+919876543210",
|
||||
phone2: "9876543210",
|
||||
same: true,
|
||||
},
|
||||
{
|
||||
name: "Germany +49 prefix",
|
||||
phone1: "+4915123456789",
|
||||
phone2: "15123456789",
|
||||
same: true,
|
||||
},
|
||||
{
|
||||
name: "Different numbers",
|
||||
phone1: "+8613812345678",
|
||||
phone2: "13812345679",
|
||||
same: false,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
if got := PhoneIsSame(tt.phone1, tt.phone2); got != tt.same {
|
||||
t.Errorf("%s: expected %v, got %v", tt.name, tt.same, got)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -77,6 +77,7 @@ var I18N = `{
|
||||
"event tag not match tag filter": "事件标签不匹配标签过滤器",
|
||||
"event attributes not match attributes filter": "事件属性不匹配属性过滤器",
|
||||
"failed to parse tag filter: %v": "解析标签过滤器失败: %v",
|
||||
"Alert history events deletion started": "告警历史事件删除任务已启动",
|
||||
|
||||
"Infrastructure": "基础设施",
|
||||
"Host - View": "机器 - 查看",
|
||||
@@ -267,6 +268,7 @@ var I18N = `{
|
||||
"event tag not match tag filter": "事件標籤不匹配標籤過濾器",
|
||||
"event attributes not match attributes filter": "事件屬性不匹配屬性過濾器",
|
||||
"failed to parse tag filter: %v": "解析標籤過濾器失敗: %v",
|
||||
"Alert history events deletion started": "告警歷史事件刪除任務已啟動",
|
||||
|
||||
"Infrastructure": "基礎設施",
|
||||
"Host - View": "機器 - 查看",
|
||||
@@ -454,6 +456,7 @@ var I18N = `{
|
||||
"event tag not match tag filter": "イベントタグがタグフィルタと一致しません",
|
||||
"event attributes not match attributes filter": "イベント属性が属性フィルタと一致しません",
|
||||
"failed to parse tag filter: %v": "タグフィルタの解析に失敗しました: %v",
|
||||
"Alert history events deletion started": "アラート履歴イベントの削除タスクが開始されました",
|
||||
|
||||
"Infrastructure": "インフラストラクチャ",
|
||||
"Host - View": "機器 - 閲覧",
|
||||
@@ -641,6 +644,7 @@ var I18N = `{
|
||||
"event tag not match tag filter": "Теги события не соответствуют фильтру тегов",
|
||||
"event attributes not match attributes filter": "Атрибуты события не соответствуют фильтру атрибутов",
|
||||
"failed to parse tag filter: %v": "Не удалось разобрать фильтр тегов: %v",
|
||||
"Alert history events deletion started": "Задача удаления исторических событий оповещений запущена",
|
||||
|
||||
"Infrastructure": "Инфраструктура",
|
||||
"Host - View": "Хост - Просмотр",
|
||||
|
||||
@@ -71,6 +71,24 @@ func ValueFormatter(unit string, decimals int, value float64) FormattedValue {
|
||||
}
|
||||
}
|
||||
|
||||
// Handle positive and negative infinity
|
||||
if math.IsInf(value, 1) {
|
||||
return FormattedValue{
|
||||
Value: 9999999999,
|
||||
Unit: "",
|
||||
Text: "+Inf",
|
||||
Stat: 9999999999,
|
||||
}
|
||||
}
|
||||
if math.IsInf(value, -1) {
|
||||
return FormattedValue{
|
||||
Value: -9999999999,
|
||||
Unit: "",
|
||||
Text: "-Inf",
|
||||
Stat: -9999999999,
|
||||
}
|
||||
}
|
||||
|
||||
// 处理时间单位
|
||||
switch unit {
|
||||
case "none":
|
||||
|
||||
@@ -13,6 +13,7 @@ import (
|
||||
"github.com/ccfos/nightingale/v6/pushgw/pstat"
|
||||
"github.com/ccfos/nightingale/v6/storage"
|
||||
|
||||
"github.com/toolkits/pkg/concurrent/semaphore"
|
||||
"github.com/toolkits/pkg/logger"
|
||||
"github.com/toolkits/pkg/slice"
|
||||
)
|
||||
@@ -117,6 +118,11 @@ func (s *Set) UpdateTargets(lst []string, now int64) error {
|
||||
return err
|
||||
}
|
||||
|
||||
if s.configs.UpdateDBTargetTimestampDisable {
|
||||
// 如果 mysql 压力太大,关闭更新 db 的操作
|
||||
return nil
|
||||
}
|
||||
|
||||
// there are some idents not found in db, so insert them
|
||||
var exists []string
|
||||
err = s.ctx.DB.Table("target").Where("ident in ?", lst).Pluck("ident", &exists).Error
|
||||
@@ -133,16 +139,34 @@ func (s *Set) UpdateTargets(lst []string, now int64) error {
|
||||
}
|
||||
|
||||
// 从批量更新一批机器的时间戳,改成逐台更新,是为了避免批量更新时,mysql的锁竞争问题
|
||||
for i := 0; i < len(exists); i++ {
|
||||
err = s.ctx.DB.Exec("UPDATE target SET update_at = ? WHERE ident = ?", now, exists[i]).Error
|
||||
if err != nil {
|
||||
logger.Error("upsert_target: failed to update target:", exists[i], "error:", err)
|
||||
start := time.Now()
|
||||
duration := time.Since(start).Seconds()
|
||||
if len(exists) > 0 {
|
||||
sema := semaphore.NewSemaphore(s.configs.UpdateDBTargetConcurrency)
|
||||
wg := sync.WaitGroup{}
|
||||
for i := 0; i < len(exists); i++ {
|
||||
sema.Acquire()
|
||||
wg.Add(1)
|
||||
go func(ident string) {
|
||||
defer sema.Release()
|
||||
defer wg.Done()
|
||||
s.updateDBTargetTs(ident, now)
|
||||
}(exists[i])
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
pstat.DBOperationLatency.WithLabelValues("update_targets_ts").Observe(duration)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Set) updateDBTargetTs(ident string, now int64) {
|
||||
err := s.ctx.DB.Exec("UPDATE target SET update_at = ? WHERE ident = ?", now, ident).Error
|
||||
if err != nil {
|
||||
logger.Error("update_target: failed to update target:", ident, "error:", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Set) updateTargetsUpdateTs(lst []string, now int64, redis storage.Redis) error {
|
||||
if redis == nil {
|
||||
return fmt.Errorf("redis is nil")
|
||||
|
||||
@@ -18,6 +18,9 @@ type Pushgw struct {
|
||||
UpdateTargetRetryIntervalMills int64
|
||||
UpdateTargetTimeoutMills int64
|
||||
UpdateTargetBatchSize int
|
||||
UpdateDBTargetConcurrency int
|
||||
UpdateDBTargetTimestampDisable bool
|
||||
PushConcurrency int
|
||||
|
||||
BusiGroupLabelKey string
|
||||
IdentMetrics []string
|
||||
@@ -49,6 +52,7 @@ type WriterOptions struct {
|
||||
Url string
|
||||
BasicAuthUser string
|
||||
BasicAuthPass string
|
||||
AsyncWrite bool // 如果有多个转发 writer,对应不重要的 writer,可以设置为 true,异步转发提供转发效率
|
||||
|
||||
Timeout int64
|
||||
DialTimeout int64
|
||||
@@ -124,6 +128,14 @@ func (p *Pushgw) PreCheck() {
|
||||
p.UpdateTargetBatchSize = 20
|
||||
}
|
||||
|
||||
if p.UpdateDBTargetConcurrency <= 0 {
|
||||
p.UpdateDBTargetConcurrency = 16
|
||||
}
|
||||
|
||||
if p.PushConcurrency <= 0 {
|
||||
p.PushConcurrency = 16
|
||||
}
|
||||
|
||||
if p.BusiGroupLabelKey == "" {
|
||||
p.BusiGroupLabelKey = "busigroup"
|
||||
}
|
||||
|
||||
@@ -105,6 +105,17 @@ var (
|
||||
},
|
||||
[]string{"operation", "status"},
|
||||
)
|
||||
|
||||
DBOperationLatency = prometheus.NewHistogramVec(
|
||||
prometheus.HistogramOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
Name: "db_operation_latency_seconds",
|
||||
Help: "Histogram of latencies for DB operations",
|
||||
Buckets: []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5},
|
||||
},
|
||||
[]string{"operation"},
|
||||
)
|
||||
)
|
||||
|
||||
func init() {
|
||||
@@ -121,5 +132,6 @@ func init() {
|
||||
GaugeSampleQueueSize,
|
||||
CounterPushQueueOverLimitTotal,
|
||||
RedisOperationLatency,
|
||||
DBOperationLatency,
|
||||
)
|
||||
}
|
||||
|
||||
@@ -1,27 +0,0 @@
|
||||
package json
|
||||
|
||||
import (
|
||||
"math"
|
||||
"unsafe"
|
||||
|
||||
jsoniter "github.com/json-iterator/go"
|
||||
)
|
||||
|
||||
func init() {
|
||||
// 为了处理prom数据中的NaN值
|
||||
jsoniter.RegisterTypeEncoderFunc("float64", func(ptr unsafe.Pointer, stream *jsoniter.Stream) {
|
||||
f := *(*float64)(ptr)
|
||||
if math.IsNaN(f) {
|
||||
stream.WriteString("null")
|
||||
} else {
|
||||
stream.WriteFloat64(f)
|
||||
}
|
||||
}, func(ptr unsafe.Pointer) bool {
|
||||
return true
|
||||
})
|
||||
}
|
||||
|
||||
func MarshalWithCustomFloat(items interface{}) ([]byte, error) {
|
||||
var json = jsoniter.ConfigCompatibleWithStandardLibrary
|
||||
return json.Marshal(items)
|
||||
}
|
||||
@@ -3,7 +3,9 @@ package writer
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"math"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
@@ -15,7 +17,6 @@ import (
|
||||
"github.com/ccfos/nightingale/v6/pushgw/kafka"
|
||||
"github.com/ccfos/nightingale/v6/pushgw/pconf"
|
||||
"github.com/ccfos/nightingale/v6/pushgw/pstat"
|
||||
"github.com/ccfos/nightingale/v6/pushgw/writer/json"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/golang/snappy"
|
||||
@@ -52,8 +53,49 @@ func beforeWrite(key string, items []prompb.TimeSeries, forceUseServerTS bool, e
|
||||
|
||||
return proto.Marshal(req)
|
||||
}
|
||||
// 如果是 json 格式,将 NaN 值的数据丢弃掉
|
||||
return json.Marshal(filterNaNSamples(items))
|
||||
}
|
||||
|
||||
return json.MarshalWithCustomFloat(items)
|
||||
func filterNaNSamples(items []prompb.TimeSeries) []prompb.TimeSeries {
|
||||
// 早期检查:如果没有NaN值,直接返回原始数据
|
||||
hasNaN := false
|
||||
for i := range items {
|
||||
for j := range items[i].Samples {
|
||||
if math.IsNaN(items[i].Samples[j].Value) {
|
||||
hasNaN = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if hasNaN {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if !hasNaN {
|
||||
return items
|
||||
}
|
||||
|
||||
// 有NaN值时进行过滤,原地修改以减少内存分配
|
||||
for i := range items {
|
||||
samples := items[i].Samples
|
||||
validCount := 0
|
||||
|
||||
// 原地过滤 samples,避免额外的内存分配
|
||||
for j := range samples {
|
||||
if !math.IsNaN(samples[j].Value) {
|
||||
if validCount != j {
|
||||
samples[validCount] = samples[j]
|
||||
}
|
||||
validCount++
|
||||
}
|
||||
}
|
||||
|
||||
// 保留所有时间序列,即使没有有效样本(此时Samples为空)
|
||||
items[i].Samples = samples[:validCount]
|
||||
}
|
||||
|
||||
return items
|
||||
}
|
||||
|
||||
func (w WriterType) Write(key string, items []prompb.TimeSeries, headers ...map[string]string) {
|
||||
@@ -157,10 +199,11 @@ func (w WriterType) Post(req []byte, headers ...map[string]string) error {
|
||||
}
|
||||
|
||||
type WritersType struct {
|
||||
pushgw pconf.Pushgw
|
||||
backends map[string]Writer
|
||||
queues map[string]*IdentQueue
|
||||
AllQueueLen atomic.Value
|
||||
pushgw pconf.Pushgw
|
||||
backends map[string]Writer
|
||||
queues map[string]*IdentQueue
|
||||
AllQueueLen atomic.Value
|
||||
PushConcurrency atomic.Int64
|
||||
sync.RWMutex
|
||||
}
|
||||
|
||||
@@ -210,6 +253,31 @@ func (ws *WritersType) Put(name string, writer Writer) {
|
||||
ws.backends[name] = writer
|
||||
}
|
||||
|
||||
func (ws *WritersType) isCriticalBackend(key string) bool {
|
||||
backend, exists := ws.backends[key]
|
||||
if !exists {
|
||||
return false
|
||||
}
|
||||
|
||||
// 使用类型断言判断
|
||||
switch backend.(type) {
|
||||
case WriterType:
|
||||
if backend.(WriterType).Opts.AsyncWrite {
|
||||
return false
|
||||
}
|
||||
|
||||
// HTTP Writer 作为关键后端
|
||||
return true
|
||||
case KafkaWriterType:
|
||||
// Kafka Writer 作为非关键后端
|
||||
return false
|
||||
default:
|
||||
// 未知类型,保守起见作为关键后端
|
||||
logger.Warningf("Unknown backend type: %T, treating as critical", backend)
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
func (ws *WritersType) CleanExpQueue() {
|
||||
for {
|
||||
ws.Lock()
|
||||
@@ -278,12 +346,64 @@ func (ws *WritersType) StartConsumer(identQueue *IdentQueue) {
|
||||
continue
|
||||
}
|
||||
for key := range ws.backends {
|
||||
ws.backends[key].Write(key, series)
|
||||
|
||||
if ws.isCriticalBackend(key) {
|
||||
ws.backends[key].Write(key, series)
|
||||
} else {
|
||||
// 像 kafka 这种 writer 使用异步写入,防止因为写入太慢影响主流程
|
||||
ws.writeToNonCriticalBackend(key, series)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (ws *WritersType) writeToNonCriticalBackend(key string, series []prompb.TimeSeries) {
|
||||
// 原子性地检查并增加并发数
|
||||
currentConcurrency := ws.PushConcurrency.Add(1)
|
||||
|
||||
if currentConcurrency > int64(ws.pushgw.PushConcurrency) {
|
||||
// 超过限制,立即减少计数并丢弃
|
||||
ws.PushConcurrency.Add(-1)
|
||||
logger.Warningf("push concurrency limit exceeded, current: %d, limit: %d, dropping %d series for backend: %s",
|
||||
currentConcurrency-1, ws.pushgw.PushConcurrency, len(series), key)
|
||||
pstat.CounterWirteErrorTotal.WithLabelValues(key).Add(float64(len(series)))
|
||||
return
|
||||
}
|
||||
|
||||
// 深拷贝数据,确保并发安全
|
||||
seriesCopy := ws.deepCopySeries(series)
|
||||
|
||||
// 启动goroutine处理
|
||||
go func(backendKey string, data []prompb.TimeSeries) {
|
||||
defer func() {
|
||||
ws.PushConcurrency.Add(-1)
|
||||
if r := recover(); r != nil {
|
||||
logger.Errorf("panic in non-critical backend %s: %v", backendKey, r)
|
||||
}
|
||||
}()
|
||||
|
||||
ws.backends[backendKey].Write(backendKey, data)
|
||||
}(key, seriesCopy)
|
||||
}
|
||||
|
||||
// 完整的深拷贝方法
|
||||
func (ws *WritersType) deepCopySeries(series []prompb.TimeSeries) []prompb.TimeSeries {
|
||||
seriesCopy := make([]prompb.TimeSeries, len(series))
|
||||
|
||||
for i := range series {
|
||||
seriesCopy[i] = series[i]
|
||||
|
||||
if len(series[i].Samples) > 0 {
|
||||
samples := make([]prompb.Sample, len(series[i].Samples))
|
||||
copy(samples, series[i].Samples)
|
||||
seriesCopy[i].Samples = samples
|
||||
}
|
||||
}
|
||||
|
||||
return seriesCopy
|
||||
}
|
||||
|
||||
func (ws *WritersType) Init() error {
|
||||
ws.AllQueueLen.Store(int64(0))
|
||||
|
||||
|
||||
Reference in New Issue
Block a user