Compare commits

..

1 Commits

Author SHA1 Message Date
Ulric Qin
93ff325f72 fix execution of notify script 2025-07-06 08:38:45 +08:00
36 changed files with 214 additions and 669 deletions

1
.gitignore vendored
View File

@@ -58,7 +58,6 @@ _test
.idea
.index
.vscode
.claude
.DS_Store
.cache-loader
.payload

1
.issue
View File

@@ -1 +0,0 @@
/Users/ning/qinyening.com/issue/n9e

View File

@@ -595,10 +595,6 @@ func (e *Dispatch) handleSub(sub *models.AlertSubscribe, event models.AlertCurEv
return
}
if !sub.MatchCate(event.Cate) {
return
}
if !common.MatchTags(event.TagsMap, sub.ITags) {
return
}

View File

@@ -8,7 +8,6 @@ import (
"io"
"net/http"
"net/url"
"strconv"
"strings"
"text/template"
"time"
@@ -144,11 +143,7 @@ func (c *AISummaryConfig) generateAISummary(eventInfo string) (string, error) {
// 合并自定义参数
for k, v := range c.CustomParams {
converted, err := convertCustomParam(v)
if err != nil {
return "", fmt.Errorf("failed to convert custom param %s: %v", k, err)
}
reqParams[k] = converted
reqParams[k] = v
}
// 序列化请求体
@@ -201,44 +196,3 @@ func (c *AISummaryConfig) generateAISummary(eventInfo string) (string, error) {
return chatResp.Choices[0].Message.Content, nil
}
// convertCustomParam 将前端传入的参数转换为正确的类型
func convertCustomParam(value interface{}) (interface{}, error) {
if value == nil {
return nil, nil
}
// 如果是字符串,尝试转换为其他类型
if str, ok := value.(string); ok {
// 尝试转换为数字
if f, err := strconv.ParseFloat(str, 64); err == nil {
// 检查是否为整数
if f == float64(int64(f)) {
return int64(f), nil
}
return f, nil
}
// 尝试转换为布尔值
if b, err := strconv.ParseBool(str); err == nil {
return b, nil
}
// 尝试解析为JSON数组
if strings.HasPrefix(strings.TrimSpace(str), "[") {
var arr []interface{}
if err := json.Unmarshal([]byte(str), &arr); err == nil {
return arr, nil
}
}
// 尝试解析为JSON对象
if strings.HasPrefix(strings.TrimSpace(str), "{") {
var obj map[string]interface{}
if err := json.Unmarshal([]byte(str), &obj); err == nil {
return obj, nil
}
}
}
return value, nil
}

View File

@@ -67,73 +67,3 @@ func TestAISummaryConfig_Process(t *testing.T) {
t.Logf("原始注释: %v", result.AnnotationsJSON["description"])
t.Logf("AI总结: %s", result.AnnotationsJSON["ai_summary"])
}
func TestConvertCustomParam(t *testing.T) {
tests := []struct {
name string
input interface{}
expected interface{}
hasError bool
}{
{
name: "nil value",
input: nil,
expected: nil,
hasError: false,
},
{
name: "string number to int64",
input: "123",
expected: int64(123),
hasError: false,
},
{
name: "string float to float64",
input: "123.45",
expected: 123.45,
hasError: false,
},
{
name: "string boolean to bool",
input: "true",
expected: true,
hasError: false,
},
{
name: "string false to bool",
input: "false",
expected: false,
hasError: false,
},
{
name: "JSON array string to slice",
input: `["a", "b", "c"]`,
expected: []interface{}{"a", "b", "c"},
hasError: false,
},
{
name: "JSON object string to map",
input: `{"key": "value", "num": 123}`,
expected: map[string]interface{}{"key": "value", "num": float64(123)},
hasError: false,
},
{
name: "plain string remains string",
input: "hello world",
expected: "hello world",
hasError: false,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
converted, err := convertCustomParam(test.input)
if test.hasError {
assert.Error(t, err)
return
}
assert.NoError(t, err)
assert.Equal(t, test.expected, converted)
})
}
}

View File

@@ -187,7 +187,7 @@ func Init(ctx *ctx.Context, builtinIntegrationsDir string) {
CreatedBy: SYSTEM,
UpdatedBy: SYSTEM,
}
BuiltinPayloadInFile.AddBuiltinPayload(&builtinAlert)
BuiltinPayloadInFile.addBuiltinPayload(&builtinAlert)
}
}
@@ -245,7 +245,7 @@ func Init(ctx *ctx.Context, builtinIntegrationsDir string) {
CreatedBy: SYSTEM,
UpdatedBy: SYSTEM,
}
BuiltinPayloadInFile.AddBuiltinPayload(&builtinDashboard)
BuiltinPayloadInFile.addBuiltinPayload(&builtinDashboard)
}
} else if err != nil {
logger.Warningf("read builtin component dash dir fail %s %v", component.Ident, err)
@@ -314,7 +314,7 @@ func NewBuiltinPayloadInFileType() *BuiltinPayloadInFileType {
}
}
func (b *BuiltinPayloadInFileType) AddBuiltinPayload(bp *models.BuiltinPayload) {
func (b *BuiltinPayloadInFileType) addBuiltinPayload(bp *models.BuiltinPayload) {
if _, exists := b.Data[bp.ComponentID]; !exists {
b.Data[bp.ComponentID] = make(map[string]map[string][]*models.BuiltinPayload)
}
@@ -390,10 +390,9 @@ func filterByQuery(payloads []*models.BuiltinPayload, query string) []*models.Bu
return payloads
}
queryLower := strings.ToLower(query)
var filtered []*models.BuiltinPayload
for _, p := range payloads {
if strings.Contains(strings.ToLower(p.Name), queryLower) || strings.Contains(strings.ToLower(p.Tags), queryLower) {
if strings.Contains(p.Name, query) || strings.Contains(p.Tags, query) {
filtered = append(filtered, p)
}
}

View File

@@ -465,8 +465,8 @@ func (rt *Router) alertRulePutFields(c *gin.Context) {
ginx.Bomb(http.StatusBadRequest, "fields empty")
}
updateBy := c.MustGet("username").(string)
updateAt := time.Now().Unix()
f.Fields["update_by"] = c.MustGet("username").(string)
f.Fields["update_at"] = time.Now().Unix()
for i := 0; i < len(f.Ids); i++ {
ar, err := models.AlertRuleGetById(rt.Ctx, f.Ids[i])
@@ -483,6 +483,7 @@ func (rt *Router) alertRulePutFields(c *gin.Context) {
b, err := json.Marshal(originRule)
ginx.Dangerous(err)
ginx.Dangerous(ar.UpdateFieldsMap(rt.Ctx, map[string]interface{}{"rule_config": string(b)}))
continue
}
}
@@ -495,6 +496,7 @@ func (rt *Router) alertRulePutFields(c *gin.Context) {
b, err := json.Marshal(ar.AnnotationsJSON)
ginx.Dangerous(err)
ginx.Dangerous(ar.UpdateFieldsMap(rt.Ctx, map[string]interface{}{"annotations": string(b)}))
continue
}
}
@@ -507,6 +509,7 @@ func (rt *Router) alertRulePutFields(c *gin.Context) {
b, err := json.Marshal(ar.AnnotationsJSON)
ginx.Dangerous(err)
ginx.Dangerous(ar.UpdateFieldsMap(rt.Ctx, map[string]interface{}{"annotations": string(b)}))
continue
}
}
@@ -516,6 +519,7 @@ func (rt *Router) alertRulePutFields(c *gin.Context) {
callback := callbacks.(string)
if !strings.Contains(ar.Callbacks, callback) {
ginx.Dangerous(ar.UpdateFieldsMap(rt.Ctx, map[string]interface{}{"callbacks": ar.Callbacks + " " + callback}))
continue
}
}
}
@@ -525,6 +529,7 @@ func (rt *Router) alertRulePutFields(c *gin.Context) {
if callbacks, has := f.Fields["callbacks"]; has {
callback := callbacks.(string)
ginx.Dangerous(ar.UpdateFieldsMap(rt.Ctx, map[string]interface{}{"callbacks": strings.ReplaceAll(ar.Callbacks, callback, "")}))
continue
}
}
@@ -534,6 +539,7 @@ func (rt *Router) alertRulePutFields(c *gin.Context) {
bytes, err := json.Marshal(datasourceQueries)
ginx.Dangerous(err)
ginx.Dangerous(ar.UpdateFieldsMap(rt.Ctx, map[string]interface{}{"datasource_queries": bytes}))
continue
}
}
@@ -549,12 +555,6 @@ func (rt *Router) alertRulePutFields(c *gin.Context) {
ginx.Dangerous(ar.UpdateColumn(rt.Ctx, k, v))
}
}
// 统一更新更新时间和更新人,只有更新时间变了,告警规则才会被引擎拉取
ginx.Dangerous(ar.UpdateFieldsMap(rt.Ctx, map[string]interface{}{
"update_by": updateBy,
"update_at": updateAt,
}))
}
ginx.NewRender(c).Message(nil)

View File

@@ -288,7 +288,6 @@ func (rt *Router) alertSubscribePut(c *gin.Context) {
"busi_groups",
"note",
"notify_rule_ids",
"notify_version",
))
}

View File

@@ -83,34 +83,7 @@ func (rt *Router) datasourceBriefs(c *gin.Context) {
dss = rt.DatasourceCache.DatasourceFilter(dss, user)
}
// 实现搜索过滤逻辑
query := ginx.QueryStr(c, "query", "")
var intersection []*models.Datasource
if query != "" {
keywords := strings.Fields(strings.ToLower(query))
if len(keywords) > 0 {
// 将 PluginType 和 name 拼成一个字符串来搜索
for _, ds := range dss {
searchStr := strings.ToLower(ds.PluginType + " " + ds.Name + " ")
matchedCount := 0
// 检查是否所有关键词都匹配(交集逻辑)
for _, keyword := range keywords {
if strings.Contains(searchStr, keyword) {
matchedCount++
}
}
// 只有当所有关键词都匹配时才添加到结果中
if matchedCount == len(keywords) {
intersection = append(intersection, ds)
}
}
}
} else {
// 如果没有查询条件,返回所有数据源
intersection = dss
}
ginx.NewRender(c).Data(intersection, err)
ginx.NewRender(c).Data(dss, err)
}
func (rt *Router) datasourceUpsert(c *gin.Context) {

View File

@@ -149,12 +149,6 @@ func (rt *Router) recordingRulePutFields(c *gin.Context) {
f.Fields["datasource_queries"] = string(bytes)
}
if datasourceIds, ok := f.Fields["datasource_ids"]; ok {
bytes, err := json.Marshal(datasourceIds)
ginx.Dangerous(err)
f.Fields["datasource_ids"] = string(bytes)
}
for i := 0; i < len(f.Ids); i++ {
ar, err := models.RecordingRuleGetById(rt.Ctx, f.Ids[i])
ginx.Dangerous(err)

View File

@@ -10,20 +10,12 @@ import (
"github.com/araddon/dateparse"
"github.com/bitly/go-simplejson"
"github.com/ccfos/nightingale/v6/memsto"
"github.com/ccfos/nightingale/v6/models"
"github.com/mitchellh/mapstructure"
"github.com/olivere/elastic/v7"
"github.com/prometheus/common/model"
"github.com/toolkits/pkg/logger"
"github.com/ccfos/nightingale/v6/memsto"
"github.com/ccfos/nightingale/v6/models"
)
type FixedField string
const (
FieldIndex FixedField = "_index"
FieldId FixedField = "_id"
)
type Query struct {
@@ -45,18 +37,6 @@ type Query struct {
Timeout int `json:"timeout" mapstructure:"timeout"`
MaxShard int `json:"max_shard" mapstructure:"max_shard"`
SearchAfter *SearchAfter `json:"search_after" mapstructure:"search_after"`
}
type SortField struct {
Field string `json:"field" mapstructure:"field"`
Ascending bool `json:"ascending" mapstructure:"ascending"`
}
type SearchAfter struct {
SortFields []SortField `json:"sort_fields" mapstructure:"sort_fields"` // 指定排序字段, 一般是timestamp:desc, _index:asc, _id:asc 三者组合,构成唯一的排序字段
SearchAfter []interface{} `json:"search_after" mapstructure:"search_after"` // 指定排序字段的搜索值搜索值必须和sort_fields的顺序一致为上一次查询的最后一条日志的值
}
type MetricAggr struct {
@@ -291,10 +271,7 @@ func MakeLogQuery(ctx context.Context, query interface{}, eventTags []string, st
}
for i := 0; i < len(eventTags); i++ {
arr := strings.SplitN(eventTags[i], "=", 2)
if len(arr) == 2 {
eventTags[i] = fmt.Sprintf("%s:%s", arr[0], strconv.Quote(arr[1]))
}
eventTags[i] = strings.Replace(eventTags[i], "=", ":", 1)
}
if len(eventTags) > 0 {
@@ -318,10 +295,7 @@ func MakeTSQuery(ctx context.Context, query interface{}, eventTags []string, sta
}
for i := 0; i < len(eventTags); i++ {
arr := strings.SplitN(eventTags[i], "=", 2)
if len(arr) == 2 {
eventTags[i] = fmt.Sprintf("%s:%s", arr[0], strconv.Quote(arr[1]))
}
eventTags[i] = strings.Replace(eventTags[i], "=", ":", 1)
}
if len(eventTags) > 0 {
@@ -631,27 +605,14 @@ func QueryLog(ctx context.Context, queryParam interface{}, timeout int64, versio
if param.MaxShard < 1 {
param.MaxShard = maxShard
}
// from+size 分页方式获取日志受es 的max_result_window参数限制默认最多返回1w条日志, 可以使用search_after方式获取更多日志
source := elastic.NewSearchSource().
TrackTotalHits(true).
Query(queryString).
Size(param.Limit)
// 是否使用search_after方式
if param.SearchAfter != nil {
// 设置默认排序字段
if len(param.SearchAfter.SortFields) == 0 {
source = source.Sort(param.DateField, param.Ascending).Sort(string(FieldIndex), true).Sort(string(FieldId), true)
} else {
for _, field := range param.SearchAfter.SortFields {
source = source.Sort(field.Field, field.Ascending)
}
}
if len(param.SearchAfter.SearchAfter) > 0 {
source = source.SearchAfter(param.SearchAfter.SearchAfter...)
}
} else {
source = source.From(param.P).Sort(param.DateField, param.Ascending)
}
From(param.P).
Size(param.Limit).
Sort(param.DateField, param.Ascending)
result, err := search(ctx, indexArr, source, param.Timeout, param.MaxShard)
if err != nil {
logger.Warningf("query data error:%v", err)

View File

@@ -8,7 +8,6 @@ import (
"github.com/ccfos/nightingale/v6/datasource"
"github.com/ccfos/nightingale/v6/dskit/doris"
"github.com/ccfos/nightingale/v6/dskit/types"
"github.com/ccfos/nightingale/v6/pkg/macros"
"github.com/ccfos/nightingale/v6/models"
"github.com/mitchellh/mapstructure"
@@ -28,16 +27,11 @@ type Doris struct {
}
type QueryParam struct {
Ref string `json:"ref" mapstructure:"ref"`
Database string `json:"database" mapstructure:"database"`
Table string `json:"table" mapstructure:"table"`
SQL string `json:"sql" mapstructure:"sql"`
Keys datasource.Keys `json:"keys" mapstructure:"keys"`
Limit int `json:"limit" mapstructure:"limit"`
From int64 `json:"from" mapstructure:"from"`
To int64 `json:"to" mapstructure:"to"`
TimeField string `json:"time_field" mapstructure:"time_field"`
TimeFormat string `json:"time_format" mapstructure:"time_format"`
Ref string `json:"ref" mapstructure:"ref"`
Database string `json:"database" mapstructure:"database"`
Table string `json:"table" mapstructure:"table"`
SQL string `json:"sql" mapstructure:"sql"`
Keys datasource.Keys `json:"keys" mapstructure:"keys"`
}
func (d *Doris) InitClient() error {
@@ -72,7 +66,7 @@ func (d *Doris) Validate(ctx context.Context) error {
func (d *Doris) Equal(p datasource.Datasource) bool {
newest, ok := p.(*Doris)
if !ok {
logger.Errorf("unexpected plugin type, expected is doris")
logger.Errorf("unexpected plugin type, expected is ck")
return false
}
@@ -180,14 +174,6 @@ func (d *Doris) QueryLog(ctx context.Context, query interface{}) ([]interface{},
return nil, 0, err
}
if strings.Contains(dorisQueryParam.SQL, "$__") {
var err error
dorisQueryParam.SQL, err = macros.Macro(dorisQueryParam.SQL, dorisQueryParam.From, dorisQueryParam.To)
if err != nil {
return nil, 0, err
}
}
items, err := d.QueryLogs(ctx, &doris.QueryParam{
Database: dorisQueryParam.Database,
Sql: dorisQueryParam.SQL,
@@ -201,7 +187,7 @@ func (d *Doris) QueryLog(ctx context.Context, query interface{}) ([]interface{},
logs = append(logs, items[i])
}
return logs, int64(len(logs)), nil
return logs, 0, nil
}
func (d *Doris) DescribeTable(ctx context.Context, query interface{}) ([]*types.ColumnProperty, error) {

View File

@@ -23,7 +23,7 @@ const (
)
var (
regx = `(?i)from\s+((?:"[^"]+"|[a-zA-Z0-9_]+))\.((?:"[^"]+"|[a-zA-Z0-9_]+))\.((?:"[^"]+"|[a-zA-Z0-9_]+))`
regx = "(?i)from\\s+([a-zA-Z0-9_]+)\\.([a-zA-Z0-9_]+)\\.([a-zA-Z0-9_]+)"
)
func init() {
@@ -162,7 +162,6 @@ func (p *PostgreSQL) QueryData(ctx context.Context, query interface{}) ([]models
return nil, err
}
postgresqlQueryParam.SQL = formatSQLDatabaseNameWithRegex(postgresqlQueryParam.SQL)
if strings.Contains(postgresqlQueryParam.SQL, "$__") {
var err error
postgresqlQueryParam.SQL, err = macros.Macro(postgresqlQueryParam.SQL, postgresqlQueryParam.From, postgresqlQueryParam.To)
@@ -230,7 +229,6 @@ func (p *PostgreSQL) QueryLog(ctx context.Context, query interface{}) ([]interfa
p.Shards[0].DB = db
}
postgresqlQueryParam.SQL = formatSQLDatabaseNameWithRegex(postgresqlQueryParam.SQL)
if strings.Contains(postgresqlQueryParam.SQL, "$__") {
var err error
postgresqlQueryParam.SQL, err = macros.Macro(postgresqlQueryParam.SQL, postgresqlQueryParam.From, postgresqlQueryParam.To)
@@ -282,17 +280,7 @@ func parseDBName(sql string) (db string, err error) {
if len(matches) != 4 {
return "", fmt.Errorf("no valid table name in format database.schema.table found")
}
return strings.Trim(matches[1], `"`), nil
}
// formatSQLDatabaseNameWithRegex 只对 dbname.scheme.tabname 格式进行数据库名称格式化,转为 "dbname".scheme.tabname
// 在pgsql中大小写是通过"" 双引号括起来区分的,默认pg都是转为小写的所以这里转为 "dbname".scheme."tabname"
func formatSQLDatabaseNameWithRegex(sql string) string {
// 匹配 from dbname.scheme.table_name 的模式
// 使用捕获组来精确匹配数据库名称确保后面跟着scheme和table
re := regexp.MustCompile(`(?i)\bfrom\s+([a-zA-Z_][a-zA-Z0-9_]*)\s*\.\s*([a-zA-Z_][a-zA-Z0-9_]*)\s*\.\s*([a-zA-Z_][a-zA-Z0-9_]*)`)
return re.ReplaceAllString(sql, `from "$1"."$2"."$3"`)
return matches[1], nil
}
func extractColumns(sql string) ([]string, error) {

View File

@@ -956,7 +956,7 @@ CREATE TABLE notify_rule (
id bigserial PRIMARY KEY,
name varchar(255) NOT NULL,
description text,
enable boolean DEFAULT false,
enable smallint NOT NULL DEFAULT 0,
user_group_ids varchar(255) NOT NULL DEFAULT '',
notify_configs text,
pipeline_configs text,
@@ -971,7 +971,7 @@ CREATE TABLE notify_channel (
name varchar(255) NOT NULL,
ident varchar(255) NOT NULL,
description text,
enable boolean DEFAULT false,
enable smallint NOT NULL DEFAULT 0,
param_config text,
request_type varchar(50) NOT NULL,
request_config text,

View File

@@ -22,8 +22,6 @@ import (
var FromAPIHook func()
var DatasourceProcessHook func(items []datasource.DatasourceInfo) []datasource.DatasourceInfo
func Init(ctx *ctx.Context, fromAPI bool) {
go getDatasourcesFromDBLoop(ctx, fromAPI)
}
@@ -102,10 +100,6 @@ func getDatasourcesFromDBLoop(ctx *ctx.Context, fromAPI bool) {
atomic.StoreInt64(&PromDefaultDatasourceId, 0)
}
if DatasourceProcessHook != nil {
dss = DatasourceProcessHook(dss)
}
PutDatasources(dss)
} else {
FromAPIHook()
@@ -169,7 +163,7 @@ func PutDatasources(items []datasource.DatasourceInfo) {
ds, err := datasource.GetDatasourceByType(typ, item.Settings)
if err != nil {
logger.Debugf("get plugin:%+v fail: %v", item, err)
logger.Warningf("get plugin:%+v fail: %v", item, err)
continue
}

View File

@@ -129,7 +129,9 @@ func (c *Clickhouse) QueryRows(ctx context.Context, query string) (*sql.Rows, er
// ShowDatabases lists all databases in Clickhouse
func (c *Clickhouse) ShowDatabases(ctx context.Context) ([]string, error) {
res := make([]string, 0)
var (
res []string
)
rows, err := c.QueryRows(ctx, ShowDatabases)
if err != nil {
@@ -149,7 +151,9 @@ func (c *Clickhouse) ShowDatabases(ctx context.Context) ([]string, error) {
// ShowTables lists all tables in a given database
func (c *Clickhouse) ShowTables(ctx context.Context, database string) ([]string, error) {
res := make([]string, 0)
var (
res []string
)
showTables := fmt.Sprintf(ShowTables, database)
rows, err := c.QueryRows(ctx, showTables)

View File

@@ -20,8 +20,8 @@ import (
// Doris struct to hold connection details and the connection object
type Doris struct {
Addr string `json:"doris.addr" mapstructure:"doris.addr"` // fe mysql endpoint
FeAddr string `json:"doris.fe_addr" mapstructure:"doris.fe_addr"` // fe http endpoint
Addr string `json:"doris.addr" mapstructure:"doris.addr"` // be node
FeAddr string `json:"doris.fe_addr" mapstructure:"doris.fe_addr"` // fe node
User string `json:"doris.user" mapstructure:"doris.user"` //
Password string `json:"doris.password" mapstructure:"doris.password"` //
Timeout int `json:"doris.timeout" mapstructure:"doris.timeout"`
@@ -138,7 +138,7 @@ func (d *Doris) ShowDatabases(ctx context.Context) ([]string, error) {
}
defer rows.Close()
databases := make([]string, 0)
var databases []string
for rows.Next() {
var dbName string
if err := rows.Scan(&dbName); err != nil {
@@ -201,7 +201,7 @@ func (d *Doris) ShowResources(ctx context.Context, resourceType string) ([]strin
}
// 将 map 转换为切片
resources := make([]string, 0)
var resources []string
for name := range distinctName {
resources = append(resources, name)
}
@@ -226,7 +226,7 @@ func (d *Doris) ShowTables(ctx context.Context, database string) ([]string, erro
}
defer rows.Close()
tables := make([]string, 0)
var tables []string
for rows.Next() {
var tableName string
if err := rows.Scan(&tableName); err != nil {

View File

@@ -115,14 +115,14 @@ func (m *MySQL) NewConn(ctx context.Context, database string) (*gorm.DB, error)
}()
dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8&parseTime=True", shard.User, shard.Password, shard.Addr, database)
db, err = sqlbase.NewDB(
return sqlbase.NewDB(
ctx,
mysql.Open(dsn),
shard.MaxIdleConns,
shard.MaxOpenConns,
time.Duration(shard.ConnMaxLifetime)*time.Second,
)
return db, err
}
func (m *MySQL) ShowDatabases(ctx context.Context) ([]string, error) {

View File

@@ -48,7 +48,7 @@ func CloseDB(db *gorm.DB) error {
// ShowTables retrieves a list of all tables in the specified database
func ShowTables(ctx context.Context, db *gorm.DB, query string) ([]string, error) {
tables := make([]string, 0)
var tables []string
rows, err := db.WithContext(ctx).Raw(query).Rows()
if err != nil {

View File

@@ -122,7 +122,7 @@ func (tc *Tdengine) QueryTable(query string) (APIResponse, error) {
}
func (tc *Tdengine) ShowDatabases(context.Context) ([]string, error) {
databases := make([]string, 0)
var databases []string
data, err := tc.QueryTable("show databases")
if err != nil {
return databases, err
@@ -135,7 +135,7 @@ func (tc *Tdengine) ShowDatabases(context.Context) ([]string, error) {
}
func (tc *Tdengine) ShowTables(ctx context.Context, database string) ([]string, error) {
tables := make([]string, 0)
var tables []string
sql := fmt.Sprintf("show %s", database)
data, err := tc.QueryTable(sql)
if err != nil {

View File

@@ -240,17 +240,17 @@ func (ncc *NotifyChannelCacheType) startHttpChannel(chID int64, channel *models.
go ncc.startNotifyConsumer(chID, queue, quitCh)
}
logger.Debugf("started %d notify consumers for channel %d", concurrency, chID)
logger.Infof("started %d notify consumers for channel %d", concurrency, chID)
}
// 启动通知消费者协程
func (ncc *NotifyChannelCacheType) startNotifyConsumer(channelID int64, queue *list.SafeListLimited, quitCh chan struct{}) {
logger.Debugf("starting notify consumer for channel %d", channelID)
logger.Infof("starting notify consumer for channel %d", channelID)
for {
select {
case <-quitCh:
logger.Debugf("notify consumer for channel %d stopped", channelID)
logger.Infof("notify consumer for channel %d stopped", channelID)
return
default:
// 从队列中取出任务
@@ -502,11 +502,7 @@ func (ncc *NotifyChannelCacheType) startEmailSender(chID int64, smtp *models.SMT
m.Mail.GetHeader("Subject"), m.Mail.GetHeader("To"))
}
// 记录通知详情
if ncc.notifyRecordFunc != nil {
target := strings.Join(m.Mail.GetHeader("To"), ",")
ncc.notifyRecordFunc(ncc.ctx, m.Events, m.NotifyRuleId, "Email", target, "success", err)
}
// sender.NotifyRecord(ncc.ctx, m.Events, m.NotifyRuleId, models.Email, strings.Join(m.Mail.GetHeader("To"), ","), "", err)
size++
if size >= conf.Batch {

View File

@@ -735,15 +735,6 @@ func (ar *AlertRule) UpdateColumn(ctx *ctx.Context, column string, value interfa
return DB(ctx).Model(ar).Updates(updates).Error
}
if column == "notify_groups" || column == "notify_channels" {
updates := map[string]interface{}{
column: value,
"notify_version": 0,
"notify_rule_ids": []int64{},
}
return DB(ctx).Model(ar).Updates(updates).Error
}
return DB(ctx).Model(ar).UpdateColumn(column, value).Error
}

View File

@@ -116,18 +116,7 @@ func (s *AlertSubscribe) Verify() error {
return errors.New("severities is required")
}
if s.NotifyVersion == 1 {
if len(s.NotifyRuleIds) == 0 {
return errors.New("no notify rules selected")
}
s.UserGroupIds = ""
s.RedefineChannels = 0
s.NewChannels = ""
s.RedefineWebhooks = 0
s.Webhooks = ""
s.RedefineSeverity = 0
s.NewSeverity = 0
if len(s.NotifyRuleIds) > 0 {
return nil
}
@@ -143,8 +132,8 @@ func (s *AlertSubscribe) Verify() error {
}
}
if s.NotifyVersion == 0 {
s.NotifyRuleIds = []int64{}
if s.NotifyVersion == 1 && len(s.NotifyRuleIds) == 0 {
return errors.New("no notify rules selected")
}
return nil
@@ -392,17 +381,6 @@ func (s *AlertSubscribe) MatchProd(prod string) bool {
return s.Prod == prod
}
func (s *AlertSubscribe) MatchCate(cate string) bool {
if s.Cate == "" {
return true
}
if s.Cate == "host" {
return cate == "host"
}
return true
}
func (s *AlertSubscribe) MatchCluster(dsId int64) bool {
// 没有配置数据源, 或者事件不需要关联数据源
// do not match any datasource or event not related to datasource

View File

@@ -13,7 +13,6 @@ import (
"github.com/pkg/errors"
"github.com/toolkits/pkg/logger"
"github.com/toolkits/pkg/str"
"gorm.io/gorm"
)
type Datasource struct {
@@ -141,7 +140,7 @@ func (ds *Datasource) Update(ctx *ctx.Context, selectField interface{}, selectFi
if ds.UpdatedAt == 0 {
ds.UpdatedAt = time.Now().Unix()
}
return DB(ctx).Model(ds).Session(&gorm.Session{SkipHooks: true}).Select(selectField, selectFields...).Updates(ds).Error
return DB(ctx).Model(ds).Select(selectField, selectFields...).Updates(ds).Error
}
func (ds *Datasource) Add(ctx *ctx.Context) error {

View File

@@ -72,10 +72,8 @@ func MigrateTables(db *gorm.DB) error {
if isPostgres(db) {
dts = append(dts, &models.PostgresBuiltinComponent{})
DropUniqueFiledLimit(db, &models.PostgresBuiltinComponent{}, "idx_ident", "idx_ident")
} else {
dts = append(dts, &models.BuiltinComponent{})
DropUniqueFiledLimit(db, &models.BuiltinComponent{}, "idx_ident", "idx_ident")
}
if !db.Migrator().HasColumn(&imodels.TaskSchedulerHealth{}, "scheduler") {
@@ -126,19 +124,11 @@ func MigrateTables(db *gorm.DB) error {
DropUniqueFiledLimit(db, &Configs{}, "ckey", "configs_ckey_key")
// 删除 builtin_metrics 表的 idx_collector_typ_name 唯一索引
DropUniqueFiledLimit(db, &models.BuiltinMetric{}, "idx_collector_typ_name", "idx_collector_typ_name")
// 添加自增主键 ii 列到指定表
// addAutoIncrementPrimaryKey(db)
InsertPermPoints(db)
return nil
}
func DropUniqueFiledLimit(db *gorm.DB, dst interface{}, uniqueFiled string, pgUniqueFiled string) { // UNIQUE KEY (`ckey`)
// 先检查表是否存在,如果不存在则直接返回
if !db.Migrator().HasTable(dst) {
return
}
if db.Migrator().HasIndex(dst, uniqueFiled) {
err := db.Migrator().DropIndex(dst, uniqueFiled) //mysql DROP INDEX
if err != nil {
@@ -169,6 +159,110 @@ func columnHasIndex(db *gorm.DB, dst interface{}, indexColumn string) bool {
return false
}
func InsertPermPoints(db *gorm.DB) {
var ops []models.RoleOperation
ops = append(ops, models.RoleOperation{
RoleName: "Standard",
Operation: "/alert-mutes/put",
})
ops = append(ops, models.RoleOperation{
RoleName: "Standard",
Operation: "/log/index-patterns",
})
ops = append(ops, models.RoleOperation{
RoleName: "Standard",
Operation: "/help/variable-configs",
})
ops = append(ops, models.RoleOperation{
RoleName: "Standard",
Operation: "/ibex-settings",
})
ops = append(ops, models.RoleOperation{
RoleName: "Standard",
Operation: "/notification-templates",
})
ops = append(ops, models.RoleOperation{
RoleName: "Standard",
Operation: "/notification-templates/add",
})
ops = append(ops, models.RoleOperation{
RoleName: "Standard",
Operation: "/notification-templates/put",
})
ops = append(ops, models.RoleOperation{
RoleName: "Standard",
Operation: "/notification-templates/del",
})
ops = append(ops, models.RoleOperation{
RoleName: "Standard",
Operation: "/notification-rules",
})
ops = append(ops, models.RoleOperation{
RoleName: "Standard",
Operation: "/notification-rules/add",
})
ops = append(ops, models.RoleOperation{
RoleName: "Standard",
Operation: "/notification-rules/put",
})
ops = append(ops, models.RoleOperation{
RoleName: "Standard",
Operation: "/notification-rules/del",
})
ops = append(ops, models.RoleOperation{
RoleName: "Standard",
Operation: "/event-pipelines",
})
ops = append(ops, models.RoleOperation{
RoleName: "Standard",
Operation: "/event-pipelines/add",
})
ops = append(ops, models.RoleOperation{
RoleName: "Standard",
Operation: "/event-pipelines/put",
})
ops = append(ops, models.RoleOperation{
RoleName: "Standard",
Operation: "/event-pipelines/del",
})
for _, op := range ops {
var count int64
session := db.Session(&gorm.Session{}).Model(&models.RoleOperation{})
err := session.Where("operation = ? AND role_name = ?", op.Operation, op.RoleName).Count(&count).Error
if err != nil {
logger.Errorf("check role operation exists failed, %v", err)
continue
}
if count > 0 {
continue
}
err = session.Create(&op).Error
if err != nil {
logger.Errorf("insert role operation failed, %v", err)
}
}
}
type AlertRule struct {
ExtraConfig string `gorm:"type:text;column:extra_config"`
CronPattern string `gorm:"type:varchar(64);column:cron_pattern"`
@@ -371,43 +465,3 @@ type NotifyChannelConfig struct {
func (c *NotifyChannelConfig) TableName() string {
return "notify_channel"
}
// addAutoIncrementPrimaryKey 为指定表添加自增主键 ii 列
func addAutoIncrementPrimaryKey(db *gorm.DB) {
// 只在 MySQL 数据库上执行这些操作
switch db.Dialector.(type) {
case *mysql.Dialector:
// 为 task_scheduler_health 表添加 ii 列作为主键
if db.Migrator().HasTable("task_scheduler_health") && !db.Migrator().HasColumn("task_scheduler_health", "ii") {
err := db.Exec("ALTER TABLE `task_scheduler_health` ADD `ii` INT PRIMARY KEY AUTO_INCREMENT").Error
if err != nil {
logger.Errorf("failed to add ii column to task_scheduler_health: %v", err)
}
}
// 为 board_busigroup 表重构主键
if db.Migrator().HasTable("board_busigroup") && !db.Migrator().HasColumn("board_busigroup", "ii") {
// 使用 SHOW KEYS 检查主键是否存在(权限要求更低)
var keyInfo []map[string]interface{}
err := db.Raw("SHOW KEYS FROM `board_busigroup` WHERE Key_name = 'PRIMARY'").Find(&keyInfo).Error
if err != nil {
logger.Errorf("failed to check primary key for board_busigroup: %v", err)
}
// 只有当主键存在时才删除
if len(keyInfo) > 0 {
err = db.Exec("ALTER TABLE `board_busigroup` DROP PRIMARY KEY").Error
if err != nil {
logger.Errorf("failed to drop primary key from board_busigroup: %v", err)
}
}
// 添加新的自增主键列
err = db.Exec("ALTER TABLE `board_busigroup` ADD COLUMN `ii` INT AUTO_INCREMENT PRIMARY KEY FIRST").Error
if err != nil {
logger.Errorf("failed to add new primary key to board_busigroup: %v", err)
}
}
default:
}
}

View File

@@ -196,7 +196,7 @@ func (ncc *NotifyChannelConfig) SendScript(events []*AlertCurEvent, tpl map[stri
cmd.Stderr = &buf
err, isTimeout := cmdx.RunTimeout(cmd, time.Duration(config.Timeout)*time.Millisecond)
logger.Infof("event_script_notify_result: exec %s output: %s isTimeout: %v err: %v stdin: %s", fpath, buf.String(), isTimeout, err, string(getStdinBytes(events, tpl, params, sendtos)))
logger.Infof("event_script_notify_result: exec %s output: %s isTimeout: %v err: %v", fpath, buf.String(), isTimeout, err)
res := buf.String()
@@ -922,6 +922,11 @@ func (ncc *NotifyChannelConfig) ValidateFlashDutyRequestConfig() error {
}
func (ncc *NotifyChannelConfig) Update(ctx *ctx.Context, ref NotifyChannelConfig) error {
// ref.FE2DB()
if ncc.Ident != ref.Ident {
return errors.New("cannot update ident")
}
ref.ID = ncc.ID
ref.CreateAt = ncc.CreateAt
ref.CreateBy = ncc.CreateBy

View File

@@ -8,11 +8,11 @@ import (
type UserToken struct {
Id int64 `json:"id" gorm:"primaryKey"`
Username string `json:"username" gorm:"type:varchar(255); not null; default ''"`
TokenName string `json:"token_name" gorm:"type:varchar(255); not null; default ''"`
Token string `json:"token" gorm:"type:varchar(255); not null; default ''"`
CreateAt int64 `json:"create_at" gorm:"type:bigint; not null; default 0"`
LastUsed int64 `json:"last_used" gorm:"type:bigint; not null; default 0"`
Username string `json:"username" gorm:"type:varchar(255) not null default ''"`
TokenName string `json:"token_name" gorm:"type:varchar(255) not null default ''"`
Token string `json:"token" gorm:"type:varchar(255) not null default ''"`
CreateAt int64 `json:"create_at" gorm:"type:bigint not null default 0"`
LastUsed int64 `json:"last_used" gorm:"type:bigint not null default 0"`
}
func (UserToken) TableName() string {

View File

@@ -89,7 +89,7 @@ func diffMap(m1, m2 map[int64]*models.User) []models.User {
func updateUser(appKey string, m1, m2 map[int64]*models.User) {
for i := range m1 {
if _, ok := m2[i]; ok {
if m1[i].Email != m2[i].Email || !PhoneIsSame(m1[i].Phone, m2[i].Phone) || m1[i].Username != m2[i].Username {
if m1[i].Email != m2[i].Email || m1[i].Phone != m2[i].Phone || m1[i].Username != m2[i].Username {
var flashdutyUser User
flashdutyUser = User{
@@ -110,30 +110,6 @@ func updateUser(appKey string, m1, m2 map[int64]*models.User) {
}
}
func PhoneIsSame(phone1, phone2 string) bool {
// 兼容不同国家/地区前缀,例如 +86、+1、+44 等,以及包含空格或短横线的格式
normalize := func(p string) string {
p = strings.TrimSpace(p)
p = strings.ReplaceAll(p, " ", "")
p = strings.ReplaceAll(p, "-", "")
p = strings.TrimPrefix(p, "+")
return p
}
p1 := normalize(phone1)
p2 := normalize(phone2)
if p1 == p2 {
return true
}
// 如果长度相差不超过 3 且较长的以较短的结尾,则认为是相同号码(忽略最多 3 位国家区号差异)
if len(p1) > len(p2) {
return len(p1)-len(p2) <= 3 && strings.HasSuffix(p1, p2)
}
return len(p2)-len(p1) <= 3 && strings.HasSuffix(p2, p1)
}
type User struct {
Email string `json:"email,omitempty"`
Phone string `json:"phone,omitempty"`

View File

@@ -1,67 +0,0 @@
package flashduty
import "testing"
// TestPhoneIsSame exercises PhoneIsSame against a table of number pairs
// covering several country-code formats plus a pair of distinct numbers.
func TestPhoneIsSame(t *testing.T) {
	type phonePair struct {
		name   string
		phone1 string
		phone2 string
		same   bool
	}

	cases := []phonePair{
		{name: "blank", phone1: "", phone2: "", same: true},
		{name: "China +86 prefix", phone1: "+8613812345678", phone2: "13812345678", same: true},
		{name: "China +86 with spaces and hyphens", phone1: "+86 138-1234-5678", phone2: "13812345678", same: true},
		{name: "USA +1 prefix", phone1: "+1 234-567-8900", phone2: "2345678900", same: true},
		{name: "UK +44 prefix", phone1: "+442078765432", phone2: "2078765432", same: true},
		{name: "India +91 prefix", phone1: "+919876543210", phone2: "9876543210", same: true},
		{name: "Germany +49 prefix", phone1: "+4915123456789", phone2: "15123456789", same: true},
		{name: "Different numbers", phone1: "+8613812345678", phone2: "13812345679", same: false},
	}

	for _, tc := range cases {
		got := PhoneIsSame(tc.phone1, tc.phone2)
		if got != tc.same {
			t.Errorf("%s: expected %v, got %v", tc.name, tc.same, got)
		}
	}
}

View File

@@ -77,7 +77,6 @@ var I18N = `{
"event tag not match tag filter": "事件标签不匹配标签过滤器",
"event attributes not match attributes filter": "事件属性不匹配属性过滤器",
"failed to parse tag filter: %v": "解析标签过滤器失败: %v",
"Alert history events deletion started": "告警历史事件删除任务已启动",
"Infrastructure": "基础设施",
"Host - View": "机器 - 查看",
@@ -268,7 +267,6 @@ var I18N = `{
"event tag not match tag filter": "事件標籤不匹配標籤過濾器",
"event attributes not match attributes filter": "事件屬性不匹配屬性過濾器",
"failed to parse tag filter: %v": "解析標籤過濾器失敗: %v",
"Alert history events deletion started": "告警歷史事件刪除任務已啟動",
"Infrastructure": "基礎設施",
"Host - View": "機器 - 查看",
@@ -456,7 +454,6 @@ var I18N = `{
"event tag not match tag filter": "イベントタグがタグフィルタと一致しません",
"event attributes not match attributes filter": "イベント属性が属性フィルタと一致しません",
"failed to parse tag filter: %v": "タグフィルタの解析に失敗しました: %v",
"Alert history events deletion started": "アラート履歴イベントの削除タスクが開始されました",
"Infrastructure": "インフラストラクチャ",
"Host - View": "機器 - 閲覧",
@@ -644,7 +641,6 @@ var I18N = `{
"event tag not match tag filter": "Теги события не соответствуют фильтру тегов",
"event attributes not match attributes filter": "Атрибуты события не соответствуют фильтру атрибутов",
"failed to parse tag filter: %v": "Не удалось разобрать фильтр тегов: %v",
"Alert history events deletion started": "Задача удаления исторических событий оповещений запущена",
"Infrastructure": "Инфраструктура",
"Host - View": "Хост - Просмотр",

View File

@@ -71,24 +71,6 @@ func ValueFormatter(unit string, decimals int, value float64) FormattedValue {
}
}
// Handle positive and negative infinity
if math.IsInf(value, 1) {
return FormattedValue{
Value: 9999999999,
Unit: "",
Text: "+Inf",
Stat: 9999999999,
}
}
if math.IsInf(value, -1) {
return FormattedValue{
Value: -9999999999,
Unit: "",
Text: "-Inf",
Stat: -9999999999,
}
}
// 处理时间单位
switch unit {
case "none":

View File

@@ -13,7 +13,6 @@ import (
"github.com/ccfos/nightingale/v6/pushgw/pstat"
"github.com/ccfos/nightingale/v6/storage"
"github.com/toolkits/pkg/concurrent/semaphore"
"github.com/toolkits/pkg/logger"
"github.com/toolkits/pkg/slice"
)
@@ -118,11 +117,6 @@ func (s *Set) UpdateTargets(lst []string, now int64) error {
return err
}
if s.configs.UpdateDBTargetTimestampDisable {
// 如果 mysql 压力太大,关闭更新 db 的操作
return nil
}
// there are some idents not found in db, so insert them
var exists []string
err = s.ctx.DB.Table("target").Where("ident in ?", lst).Pluck("ident", &exists).Error
@@ -139,34 +133,16 @@ func (s *Set) UpdateTargets(lst []string, now int64) error {
}
// 从批量更新一批机器的时间戳改成逐台更新是为了避免批量更新时mysql的锁竞争问题
start := time.Now()
duration := time.Since(start).Seconds()
if len(exists) > 0 {
sema := semaphore.NewSemaphore(s.configs.UpdateDBTargetConcurrency)
wg := sync.WaitGroup{}
for i := 0; i < len(exists); i++ {
sema.Acquire()
wg.Add(1)
go func(ident string) {
defer sema.Release()
defer wg.Done()
s.updateDBTargetTs(ident, now)
}(exists[i])
for i := 0; i < len(exists); i++ {
err = s.ctx.DB.Exec("UPDATE target SET update_at = ? WHERE ident = ?", now, exists[i]).Error
if err != nil {
logger.Error("upsert_target: failed to update target:", exists[i], "error:", err)
}
wg.Wait()
}
pstat.DBOperationLatency.WithLabelValues("update_targets_ts").Observe(duration)
return nil
}
// updateDBTargetTs stamps the target row identified by ident with the
// given timestamp. Failures are logged but intentionally not propagated:
// a single missed heartbeat update is not fatal to the caller.
func (s *Set) updateDBTargetTs(ident string, now int64) {
	if err := s.ctx.DB.Exec("UPDATE target SET update_at = ? WHERE ident = ?", now, ident).Error; err != nil {
		logger.Error("update_target: failed to update target:", ident, "error:", err)
	}
}
func (s *Set) updateTargetsUpdateTs(lst []string, now int64, redis storage.Redis) error {
if redis == nil {
return fmt.Errorf("redis is nil")

View File

@@ -18,9 +18,6 @@ type Pushgw struct {
UpdateTargetRetryIntervalMills int64
UpdateTargetTimeoutMills int64
UpdateTargetBatchSize int
UpdateDBTargetConcurrency int
UpdateDBTargetTimestampDisable bool
PushConcurrency int
BusiGroupLabelKey string
IdentMetrics []string
@@ -52,7 +49,6 @@ type WriterOptions struct {
Url string
BasicAuthUser string
BasicAuthPass string
AsyncWrite bool // 如果有多个转发 writer对应不重要的 writer可以设置为 true异步转发提供转发效率
Timeout int64
DialTimeout int64
@@ -128,14 +124,6 @@ func (p *Pushgw) PreCheck() {
p.UpdateTargetBatchSize = 20
}
if p.UpdateDBTargetConcurrency <= 0 {
p.UpdateDBTargetConcurrency = 16
}
if p.PushConcurrency <= 0 {
p.PushConcurrency = 16
}
if p.BusiGroupLabelKey == "" {
p.BusiGroupLabelKey = "busigroup"
}

View File

@@ -105,17 +105,6 @@ var (
},
[]string{"operation", "status"},
)
DBOperationLatency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "db_operation_latency_seconds",
Help: "Histogram of latencies for DB operations",
Buckets: []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5},
},
[]string{"operation"},
)
)
func init() {
@@ -132,6 +121,5 @@ func init() {
GaugeSampleQueueSize,
CounterPushQueueOverLimitTotal,
RedisOperationLatency,
DBOperationLatency,
)
}

View File

@@ -0,0 +1,27 @@
package json
import (
"math"
"unsafe"
jsoniter "github.com/json-iterator/go"
)
// init registers a custom jsoniter encoder for float64 so that NaN values
// coming from Prometheus samples serialize as the JSON null literal instead
// of breaking serialization (standard encoding/json rejects NaN).
func init() {
	jsoniter.RegisterTypeEncoderFunc("float64",
		func(ptr unsafe.Pointer, stream *jsoniter.Stream) {
			f := *(*float64)(ptr)
			if math.IsNaN(f) {
				// WriteNil emits the JSON literal null; WriteString("null")
				// would emit the quoted string "null", which downstream
				// consumers would read as a string, not a missing number.
				stream.WriteNil()
			} else {
				stream.WriteFloat64(f)
			}
		},
		func(ptr unsafe.Pointer) bool {
			// Mirror encoding/json's omitempty semantics for float64: only
			// the zero value counts as empty. Returning true unconditionally
			// would make every float64 field carrying an omitempty tag
			// (e.g. prompb sample values) disappear from the output.
			return *(*float64)(ptr) == 0
		})
}
// MarshalWithCustomFloat serializes items using jsoniter's stdlib-compatible
// configuration, which picks up the float64 encoder registered in init.
func MarshalWithCustomFloat(items interface{}) ([]byte, error) {
	cfg := jsoniter.ConfigCompatibleWithStandardLibrary
	return cfg.Marshal(items)
}

View File

@@ -3,9 +3,7 @@ package writer
import (
"bytes"
"context"
"encoding/json"
"fmt"
"math"
"net/http"
"strings"
"sync"
@@ -17,6 +15,7 @@ import (
"github.com/ccfos/nightingale/v6/pushgw/kafka"
"github.com/ccfos/nightingale/v6/pushgw/pconf"
"github.com/ccfos/nightingale/v6/pushgw/pstat"
"github.com/ccfos/nightingale/v6/pushgw/writer/json"
"github.com/golang/protobuf/proto"
"github.com/golang/snappy"
@@ -53,49 +52,8 @@ func beforeWrite(key string, items []prompb.TimeSeries, forceUseServerTS bool, e
return proto.Marshal(req)
}
// 如果是 json 格式,将 NaN 值的数据丢弃掉
return json.Marshal(filterNaNSamples(items))
}
func filterNaNSamples(items []prompb.TimeSeries) []prompb.TimeSeries {
// 早期检查如果没有NaN值直接返回原始数据
hasNaN := false
for i := range items {
for j := range items[i].Samples {
if math.IsNaN(items[i].Samples[j].Value) {
hasNaN = true
break
}
}
if hasNaN {
break
}
}
if !hasNaN {
return items
}
// 有NaN值时进行过滤原地修改以减少内存分配
for i := range items {
samples := items[i].Samples
validCount := 0
// 原地过滤 samples避免额外的内存分配
for j := range samples {
if !math.IsNaN(samples[j].Value) {
if validCount != j {
samples[validCount] = samples[j]
}
validCount++
}
}
// 保留所有时间序列即使没有有效样本此时Samples为空
items[i].Samples = samples[:validCount]
}
return items
return json.MarshalWithCustomFloat(items)
}
func (w WriterType) Write(key string, items []prompb.TimeSeries, headers ...map[string]string) {
@@ -199,11 +157,10 @@ func (w WriterType) Post(req []byte, headers ...map[string]string) error {
}
type WritersType struct {
pushgw pconf.Pushgw
backends map[string]Writer
queues map[string]*IdentQueue
AllQueueLen atomic.Value
PushConcurrency atomic.Int64
pushgw pconf.Pushgw
backends map[string]Writer
queues map[string]*IdentQueue
AllQueueLen atomic.Value
sync.RWMutex
}
@@ -253,31 +210,6 @@ func (ws *WritersType) Put(name string, writer Writer) {
ws.backends[name] = writer
}
// isCriticalBackend reports whether the backend registered under key must be
// written to synchronously. HTTP writers are critical unless explicitly
// configured with AsyncWrite; Kafka writers are always best-effort; unknown
// backend types are conservatively treated as critical.
func (ws *WritersType) isCriticalBackend(key string) bool {
	backend, exists := ws.backends[key]
	if !exists {
		return false
	}

	// Bind the switch variable instead of re-asserting the type inside the
	// case (the original did backend.(WriterType) redundantly).
	switch b := backend.(type) {
	case WriterType:
		// HTTP writer: critical unless configured for asynchronous writes.
		return !b.Opts.AsyncWrite
	case KafkaWriterType:
		// Kafka writer is a non-critical, best-effort backend.
		return false
	default:
		// Unknown type: be conservative and treat it as critical.
		logger.Warningf("Unknown backend type: %T, treating as critical", backend)
		return true
	}
}
func (ws *WritersType) CleanExpQueue() {
for {
ws.Lock()
@@ -346,64 +278,12 @@ func (ws *WritersType) StartConsumer(identQueue *IdentQueue) {
continue
}
for key := range ws.backends {
if ws.isCriticalBackend(key) {
ws.backends[key].Write(key, series)
} else {
// 像 kafka 这种 writer 使用异步写入,防止因为写入太慢影响主流程
ws.writeToNonCriticalBackend(key, series)
}
ws.backends[key].Write(key, series)
}
}
}
}
// writeToNonCriticalBackend asynchronously forwards series to a best-effort
// backend (e.g. Kafka), bounded by the configured PushConcurrency limit.
// When the limit is exceeded the batch is dropped and counted as a write
// error rather than blocking the main write path.
func (ws *WritersType) writeToNonCriticalBackend(key string, series []prompb.TimeSeries) {
	// Atomically reserve a concurrency slot (check-and-increment in one op).
	currentConcurrency := ws.PushConcurrency.Add(1)
	if currentConcurrency > int64(ws.pushgw.PushConcurrency) {
		// Over the limit: release the slot immediately and drop the batch.
		ws.PushConcurrency.Add(-1)
		logger.Warningf("push concurrency limit exceeded, current: %d, limit: %d, dropping %d series for backend: %s",
			currentConcurrency-1, ws.pushgw.PushConcurrency, len(series), key)
		pstat.CounterWirteErrorTotal.WithLabelValues(key).Add(float64(len(series)))
		return
	}

	// Deep-copy the data so the goroutine below does not share mutable
	// sample storage with the caller.
	seriesCopy := ws.deepCopySeries(series)

	// Hand off to a goroutine; the deferred func both releases the
	// concurrency slot and absorbs panics from the backend write.
	// NOTE(review): ws.backends is read here without holding ws.RWMutex —
	// presumably safe because backends are only populated at startup; verify.
	go func(backendKey string, data []prompb.TimeSeries) {
		defer func() {
			ws.PushConcurrency.Add(-1)
			if r := recover(); r != nil {
				logger.Errorf("panic in non-critical backend %s: %v", backendKey, r)
			}
		}()
		ws.backends[backendKey].Write(backendKey, data)
	}(key, seriesCopy)
}
// deepCopySeries returns a copy of series whose Samples slices are
// duplicated, so the result can be handed to another goroutine without the
// two sides sharing mutable sample storage.
func (ws *WritersType) deepCopySeries(series []prompb.TimeSeries) []prompb.TimeSeries {
	out := make([]prompb.TimeSeries, len(series))
	copy(out, series)
	for i := range out {
		if n := len(out[i].Samples); n > 0 {
			dup := make([]prompb.Sample, n)
			copy(dup, out[i].Samples)
			out[i].Samples = dup
		}
	}
	return out
}
func (ws *WritersType) Init() error {
ws.AllQueueLen.Store(int64(0))