mirror of
https://github.com/ccfos/nightingale.git
synced 2026-03-03 14:38:55 +00:00
Compare commits
1 Commits
release-18
...
fix-exec-s
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
93ff325f72 |
1
.gitignore
vendored
1
.gitignore
vendored
@@ -58,7 +58,6 @@ _test
|
||||
.idea
|
||||
.index
|
||||
.vscode
|
||||
.claude
|
||||
.DS_Store
|
||||
.cache-loader
|
||||
.payload
|
||||
|
||||
@@ -595,10 +595,6 @@ func (e *Dispatch) handleSub(sub *models.AlertSubscribe, event models.AlertCurEv
|
||||
return
|
||||
}
|
||||
|
||||
if !sub.MatchCate(event.Cate) {
|
||||
return
|
||||
}
|
||||
|
||||
if !common.MatchTags(event.TagsMap, sub.ITags) {
|
||||
return
|
||||
}
|
||||
|
||||
@@ -8,7 +8,6 @@ import (
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"strings"
|
||||
"text/template"
|
||||
"time"
|
||||
@@ -144,11 +143,7 @@ func (c *AISummaryConfig) generateAISummary(eventInfo string) (string, error) {
|
||||
|
||||
// 合并自定义参数
|
||||
for k, v := range c.CustomParams {
|
||||
converted, err := convertCustomParam(v)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to convert custom param %s: %v", k, err)
|
||||
}
|
||||
reqParams[k] = converted
|
||||
reqParams[k] = v
|
||||
}
|
||||
|
||||
// 序列化请求体
|
||||
@@ -201,44 +196,3 @@ func (c *AISummaryConfig) generateAISummary(eventInfo string) (string, error) {
|
||||
|
||||
return chatResp.Choices[0].Message.Content, nil
|
||||
}
|
||||
|
||||
// convertCustomParam 将前端传入的参数转换为正确的类型
|
||||
func convertCustomParam(value interface{}) (interface{}, error) {
|
||||
if value == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// 如果是字符串,尝试转换为其他类型
|
||||
if str, ok := value.(string); ok {
|
||||
// 尝试转换为数字
|
||||
if f, err := strconv.ParseFloat(str, 64); err == nil {
|
||||
// 检查是否为整数
|
||||
if f == float64(int64(f)) {
|
||||
return int64(f), nil
|
||||
}
|
||||
return f, nil
|
||||
}
|
||||
|
||||
// 尝试转换为布尔值
|
||||
if b, err := strconv.ParseBool(str); err == nil {
|
||||
return b, nil
|
||||
}
|
||||
|
||||
// 尝试解析为JSON数组
|
||||
if strings.HasPrefix(strings.TrimSpace(str), "[") {
|
||||
var arr []interface{}
|
||||
if err := json.Unmarshal([]byte(str), &arr); err == nil {
|
||||
return arr, nil
|
||||
}
|
||||
}
|
||||
|
||||
// 尝试解析为JSON对象
|
||||
if strings.HasPrefix(strings.TrimSpace(str), "{") {
|
||||
var obj map[string]interface{}
|
||||
if err := json.Unmarshal([]byte(str), &obj); err == nil {
|
||||
return obj, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
return value, nil
|
||||
}
|
||||
|
||||
@@ -67,73 +67,3 @@ func TestAISummaryConfig_Process(t *testing.T) {
|
||||
t.Logf("原始注释: %v", result.AnnotationsJSON["description"])
|
||||
t.Logf("AI总结: %s", result.AnnotationsJSON["ai_summary"])
|
||||
}
|
||||
|
||||
func TestConvertCustomParam(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
input interface{}
|
||||
expected interface{}
|
||||
hasError bool
|
||||
}{
|
||||
{
|
||||
name: "nil value",
|
||||
input: nil,
|
||||
expected: nil,
|
||||
hasError: false,
|
||||
},
|
||||
{
|
||||
name: "string number to int64",
|
||||
input: "123",
|
||||
expected: int64(123),
|
||||
hasError: false,
|
||||
},
|
||||
{
|
||||
name: "string float to float64",
|
||||
input: "123.45",
|
||||
expected: 123.45,
|
||||
hasError: false,
|
||||
},
|
||||
{
|
||||
name: "string boolean to bool",
|
||||
input: "true",
|
||||
expected: true,
|
||||
hasError: false,
|
||||
},
|
||||
{
|
||||
name: "string false to bool",
|
||||
input: "false",
|
||||
expected: false,
|
||||
hasError: false,
|
||||
},
|
||||
{
|
||||
name: "JSON array string to slice",
|
||||
input: `["a", "b", "c"]`,
|
||||
expected: []interface{}{"a", "b", "c"},
|
||||
hasError: false,
|
||||
},
|
||||
{
|
||||
name: "JSON object string to map",
|
||||
input: `{"key": "value", "num": 123}`,
|
||||
expected: map[string]interface{}{"key": "value", "num": float64(123)},
|
||||
hasError: false,
|
||||
},
|
||||
{
|
||||
name: "plain string remains string",
|
||||
input: "hello world",
|
||||
expected: "hello world",
|
||||
hasError: false,
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
converted, err := convertCustomParam(test.input)
|
||||
if test.hasError {
|
||||
assert.Error(t, err)
|
||||
return
|
||||
}
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, test.expected, converted)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -187,7 +187,7 @@ func Init(ctx *ctx.Context, builtinIntegrationsDir string) {
|
||||
CreatedBy: SYSTEM,
|
||||
UpdatedBy: SYSTEM,
|
||||
}
|
||||
BuiltinPayloadInFile.AddBuiltinPayload(&builtinAlert)
|
||||
BuiltinPayloadInFile.addBuiltinPayload(&builtinAlert)
|
||||
|
||||
}
|
||||
}
|
||||
@@ -245,7 +245,7 @@ func Init(ctx *ctx.Context, builtinIntegrationsDir string) {
|
||||
CreatedBy: SYSTEM,
|
||||
UpdatedBy: SYSTEM,
|
||||
}
|
||||
BuiltinPayloadInFile.AddBuiltinPayload(&builtinDashboard)
|
||||
BuiltinPayloadInFile.addBuiltinPayload(&builtinDashboard)
|
||||
}
|
||||
} else if err != nil {
|
||||
logger.Warningf("read builtin component dash dir fail %s %v", component.Ident, err)
|
||||
@@ -314,7 +314,7 @@ func NewBuiltinPayloadInFileType() *BuiltinPayloadInFileType {
|
||||
}
|
||||
}
|
||||
|
||||
func (b *BuiltinPayloadInFileType) AddBuiltinPayload(bp *models.BuiltinPayload) {
|
||||
func (b *BuiltinPayloadInFileType) addBuiltinPayload(bp *models.BuiltinPayload) {
|
||||
if _, exists := b.Data[bp.ComponentID]; !exists {
|
||||
b.Data[bp.ComponentID] = make(map[string]map[string][]*models.BuiltinPayload)
|
||||
}
|
||||
@@ -390,10 +390,9 @@ func filterByQuery(payloads []*models.BuiltinPayload, query string) []*models.Bu
|
||||
return payloads
|
||||
}
|
||||
|
||||
queryLower := strings.ToLower(query)
|
||||
var filtered []*models.BuiltinPayload
|
||||
for _, p := range payloads {
|
||||
if strings.Contains(strings.ToLower(p.Name), queryLower) || strings.Contains(strings.ToLower(p.Tags), queryLower) {
|
||||
if strings.Contains(p.Name, query) || strings.Contains(p.Tags, query) {
|
||||
filtered = append(filtered, p)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -465,8 +465,8 @@ func (rt *Router) alertRulePutFields(c *gin.Context) {
|
||||
ginx.Bomb(http.StatusBadRequest, "fields empty")
|
||||
}
|
||||
|
||||
updateBy := c.MustGet("username").(string)
|
||||
updateAt := time.Now().Unix()
|
||||
f.Fields["update_by"] = c.MustGet("username").(string)
|
||||
f.Fields["update_at"] = time.Now().Unix()
|
||||
|
||||
for i := 0; i < len(f.Ids); i++ {
|
||||
ar, err := models.AlertRuleGetById(rt.Ctx, f.Ids[i])
|
||||
@@ -483,6 +483,7 @@ func (rt *Router) alertRulePutFields(c *gin.Context) {
|
||||
b, err := json.Marshal(originRule)
|
||||
ginx.Dangerous(err)
|
||||
ginx.Dangerous(ar.UpdateFieldsMap(rt.Ctx, map[string]interface{}{"rule_config": string(b)}))
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
@@ -495,6 +496,7 @@ func (rt *Router) alertRulePutFields(c *gin.Context) {
|
||||
b, err := json.Marshal(ar.AnnotationsJSON)
|
||||
ginx.Dangerous(err)
|
||||
ginx.Dangerous(ar.UpdateFieldsMap(rt.Ctx, map[string]interface{}{"annotations": string(b)}))
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
@@ -507,6 +509,7 @@ func (rt *Router) alertRulePutFields(c *gin.Context) {
|
||||
b, err := json.Marshal(ar.AnnotationsJSON)
|
||||
ginx.Dangerous(err)
|
||||
ginx.Dangerous(ar.UpdateFieldsMap(rt.Ctx, map[string]interface{}{"annotations": string(b)}))
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
@@ -516,6 +519,7 @@ func (rt *Router) alertRulePutFields(c *gin.Context) {
|
||||
callback := callbacks.(string)
|
||||
if !strings.Contains(ar.Callbacks, callback) {
|
||||
ginx.Dangerous(ar.UpdateFieldsMap(rt.Ctx, map[string]interface{}{"callbacks": ar.Callbacks + " " + callback}))
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -525,6 +529,7 @@ func (rt *Router) alertRulePutFields(c *gin.Context) {
|
||||
if callbacks, has := f.Fields["callbacks"]; has {
|
||||
callback := callbacks.(string)
|
||||
ginx.Dangerous(ar.UpdateFieldsMap(rt.Ctx, map[string]interface{}{"callbacks": strings.ReplaceAll(ar.Callbacks, callback, "")}))
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
@@ -534,6 +539,7 @@ func (rt *Router) alertRulePutFields(c *gin.Context) {
|
||||
bytes, err := json.Marshal(datasourceQueries)
|
||||
ginx.Dangerous(err)
|
||||
ginx.Dangerous(ar.UpdateFieldsMap(rt.Ctx, map[string]interface{}{"datasource_queries": bytes}))
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
@@ -549,12 +555,6 @@ func (rt *Router) alertRulePutFields(c *gin.Context) {
|
||||
ginx.Dangerous(ar.UpdateColumn(rt.Ctx, k, v))
|
||||
}
|
||||
}
|
||||
|
||||
// 统一更新更新时间和更新人,只有更新时间变了,告警规则才会被引擎拉取
|
||||
ginx.Dangerous(ar.UpdateFieldsMap(rt.Ctx, map[string]interface{}{
|
||||
"update_by": updateBy,
|
||||
"update_at": updateAt,
|
||||
}))
|
||||
}
|
||||
|
||||
ginx.NewRender(c).Message(nil)
|
||||
|
||||
@@ -288,7 +288,6 @@ func (rt *Router) alertSubscribePut(c *gin.Context) {
|
||||
"busi_groups",
|
||||
"note",
|
||||
"notify_rule_ids",
|
||||
"notify_version",
|
||||
))
|
||||
}
|
||||
|
||||
|
||||
@@ -83,34 +83,7 @@ func (rt *Router) datasourceBriefs(c *gin.Context) {
|
||||
dss = rt.DatasourceCache.DatasourceFilter(dss, user)
|
||||
}
|
||||
|
||||
// 实现搜索过滤逻辑
|
||||
query := ginx.QueryStr(c, "query", "")
|
||||
var intersection []*models.Datasource
|
||||
if query != "" {
|
||||
keywords := strings.Fields(strings.ToLower(query))
|
||||
if len(keywords) > 0 {
|
||||
// 将 PluginType 和 name 拼成一个字符串来搜索
|
||||
for _, ds := range dss {
|
||||
searchStr := strings.ToLower(ds.PluginType + " " + ds.Name + " ")
|
||||
matchedCount := 0
|
||||
// 检查是否所有关键词都匹配(交集逻辑)
|
||||
for _, keyword := range keywords {
|
||||
if strings.Contains(searchStr, keyword) {
|
||||
matchedCount++
|
||||
}
|
||||
}
|
||||
// 只有当所有关键词都匹配时才添加到结果中
|
||||
if matchedCount == len(keywords) {
|
||||
intersection = append(intersection, ds)
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// 如果没有查询条件,返回所有数据源
|
||||
intersection = dss
|
||||
}
|
||||
|
||||
ginx.NewRender(c).Data(intersection, err)
|
||||
ginx.NewRender(c).Data(dss, err)
|
||||
}
|
||||
|
||||
func (rt *Router) datasourceUpsert(c *gin.Context) {
|
||||
|
||||
@@ -149,12 +149,6 @@ func (rt *Router) recordingRulePutFields(c *gin.Context) {
|
||||
f.Fields["datasource_queries"] = string(bytes)
|
||||
}
|
||||
|
||||
if datasourceIds, ok := f.Fields["datasource_ids"]; ok {
|
||||
bytes, err := json.Marshal(datasourceIds)
|
||||
ginx.Dangerous(err)
|
||||
f.Fields["datasource_ids"] = string(bytes)
|
||||
}
|
||||
|
||||
for i := 0; i < len(f.Ids); i++ {
|
||||
ar, err := models.RecordingRuleGetById(rt.Ctx, f.Ids[i])
|
||||
ginx.Dangerous(err)
|
||||
|
||||
@@ -10,20 +10,12 @@ import (
|
||||
|
||||
"github.com/araddon/dateparse"
|
||||
"github.com/bitly/go-simplejson"
|
||||
"github.com/ccfos/nightingale/v6/memsto"
|
||||
"github.com/ccfos/nightingale/v6/models"
|
||||
"github.com/mitchellh/mapstructure"
|
||||
"github.com/olivere/elastic/v7"
|
||||
"github.com/prometheus/common/model"
|
||||
"github.com/toolkits/pkg/logger"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/memsto"
|
||||
"github.com/ccfos/nightingale/v6/models"
|
||||
)
|
||||
|
||||
type FixedField string
|
||||
|
||||
const (
|
||||
FieldIndex FixedField = "_index"
|
||||
FieldId FixedField = "_id"
|
||||
)
|
||||
|
||||
type Query struct {
|
||||
@@ -45,18 +37,6 @@ type Query struct {
|
||||
|
||||
Timeout int `json:"timeout" mapstructure:"timeout"`
|
||||
MaxShard int `json:"max_shard" mapstructure:"max_shard"`
|
||||
|
||||
SearchAfter *SearchAfter `json:"search_after" mapstructure:"search_after"`
|
||||
}
|
||||
|
||||
type SortField struct {
|
||||
Field string `json:"field" mapstructure:"field"`
|
||||
Ascending bool `json:"ascending" mapstructure:"ascending"`
|
||||
}
|
||||
|
||||
type SearchAfter struct {
|
||||
SortFields []SortField `json:"sort_fields" mapstructure:"sort_fields"` // 指定排序字段, 一般是timestamp:desc, _index:asc, _id:asc 三者组合,构成唯一的排序字段
|
||||
SearchAfter []interface{} `json:"search_after" mapstructure:"search_after"` // 指定排序字段的搜索值,搜索值必须和sort_fields的顺序一致,为上一次查询的最后一条日志的值
|
||||
}
|
||||
|
||||
type MetricAggr struct {
|
||||
@@ -291,10 +271,7 @@ func MakeLogQuery(ctx context.Context, query interface{}, eventTags []string, st
|
||||
}
|
||||
|
||||
for i := 0; i < len(eventTags); i++ {
|
||||
arr := strings.SplitN(eventTags[i], "=", 2)
|
||||
if len(arr) == 2 {
|
||||
eventTags[i] = fmt.Sprintf("%s:%s", arr[0], strconv.Quote(arr[1]))
|
||||
}
|
||||
eventTags[i] = strings.Replace(eventTags[i], "=", ":", 1)
|
||||
}
|
||||
|
||||
if len(eventTags) > 0 {
|
||||
@@ -318,10 +295,7 @@ func MakeTSQuery(ctx context.Context, query interface{}, eventTags []string, sta
|
||||
}
|
||||
|
||||
for i := 0; i < len(eventTags); i++ {
|
||||
arr := strings.SplitN(eventTags[i], "=", 2)
|
||||
if len(arr) == 2 {
|
||||
eventTags[i] = fmt.Sprintf("%s:%s", arr[0], strconv.Quote(arr[1]))
|
||||
}
|
||||
eventTags[i] = strings.Replace(eventTags[i], "=", ":", 1)
|
||||
}
|
||||
|
||||
if len(eventTags) > 0 {
|
||||
@@ -631,27 +605,14 @@ func QueryLog(ctx context.Context, queryParam interface{}, timeout int64, versio
|
||||
if param.MaxShard < 1 {
|
||||
param.MaxShard = maxShard
|
||||
}
|
||||
// from+size 分页方式获取日志,受es 的max_result_window参数限制,默认最多返回1w条日志, 可以使用search_after方式获取更多日志
|
||||
|
||||
source := elastic.NewSearchSource().
|
||||
TrackTotalHits(true).
|
||||
Query(queryString).
|
||||
Size(param.Limit)
|
||||
// 是否使用search_after方式
|
||||
if param.SearchAfter != nil {
|
||||
// 设置默认排序字段
|
||||
if len(param.SearchAfter.SortFields) == 0 {
|
||||
source = source.Sort(param.DateField, param.Ascending).Sort(string(FieldIndex), true).Sort(string(FieldId), true)
|
||||
} else {
|
||||
for _, field := range param.SearchAfter.SortFields {
|
||||
source = source.Sort(field.Field, field.Ascending)
|
||||
}
|
||||
}
|
||||
if len(param.SearchAfter.SearchAfter) > 0 {
|
||||
source = source.SearchAfter(param.SearchAfter.SearchAfter...)
|
||||
}
|
||||
} else {
|
||||
source = source.From(param.P).Sort(param.DateField, param.Ascending)
|
||||
}
|
||||
From(param.P).
|
||||
Size(param.Limit).
|
||||
Sort(param.DateField, param.Ascending)
|
||||
|
||||
result, err := search(ctx, indexArr, source, param.Timeout, param.MaxShard)
|
||||
if err != nil {
|
||||
logger.Warningf("query data error:%v", err)
|
||||
|
||||
@@ -8,7 +8,6 @@ import (
|
||||
"github.com/ccfos/nightingale/v6/datasource"
|
||||
"github.com/ccfos/nightingale/v6/dskit/doris"
|
||||
"github.com/ccfos/nightingale/v6/dskit/types"
|
||||
"github.com/ccfos/nightingale/v6/pkg/macros"
|
||||
"github.com/ccfos/nightingale/v6/models"
|
||||
|
||||
"github.com/mitchellh/mapstructure"
|
||||
@@ -28,16 +27,11 @@ type Doris struct {
|
||||
}
|
||||
|
||||
type QueryParam struct {
|
||||
Ref string `json:"ref" mapstructure:"ref"`
|
||||
Database string `json:"database" mapstructure:"database"`
|
||||
Table string `json:"table" mapstructure:"table"`
|
||||
SQL string `json:"sql" mapstructure:"sql"`
|
||||
Keys datasource.Keys `json:"keys" mapstructure:"keys"`
|
||||
Limit int `json:"limit" mapstructure:"limit"`
|
||||
From int64 `json:"from" mapstructure:"from"`
|
||||
To int64 `json:"to" mapstructure:"to"`
|
||||
TimeField string `json:"time_field" mapstructure:"time_field"`
|
||||
TimeFormat string `json:"time_format" mapstructure:"time_format"`
|
||||
Ref string `json:"ref" mapstructure:"ref"`
|
||||
Database string `json:"database" mapstructure:"database"`
|
||||
Table string `json:"table" mapstructure:"table"`
|
||||
SQL string `json:"sql" mapstructure:"sql"`
|
||||
Keys datasource.Keys `json:"keys" mapstructure:"keys"`
|
||||
}
|
||||
|
||||
func (d *Doris) InitClient() error {
|
||||
@@ -72,7 +66,7 @@ func (d *Doris) Validate(ctx context.Context) error {
|
||||
func (d *Doris) Equal(p datasource.Datasource) bool {
|
||||
newest, ok := p.(*Doris)
|
||||
if !ok {
|
||||
logger.Errorf("unexpected plugin type, expected is doris")
|
||||
logger.Errorf("unexpected plugin type, expected is ck")
|
||||
return false
|
||||
}
|
||||
|
||||
@@ -180,14 +174,6 @@ func (d *Doris) QueryLog(ctx context.Context, query interface{}) ([]interface{},
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
if strings.Contains(dorisQueryParam.SQL, "$__") {
|
||||
var err error
|
||||
dorisQueryParam.SQL, err = macros.Macro(dorisQueryParam.SQL, dorisQueryParam.From, dorisQueryParam.To)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
}
|
||||
|
||||
items, err := d.QueryLogs(ctx, &doris.QueryParam{
|
||||
Database: dorisQueryParam.Database,
|
||||
Sql: dorisQueryParam.SQL,
|
||||
@@ -201,7 +187,7 @@ func (d *Doris) QueryLog(ctx context.Context, query interface{}) ([]interface{},
|
||||
logs = append(logs, items[i])
|
||||
}
|
||||
|
||||
return logs, int64(len(logs)), nil
|
||||
return logs, 0, nil
|
||||
}
|
||||
|
||||
func (d *Doris) DescribeTable(ctx context.Context, query interface{}) ([]*types.ColumnProperty, error) {
|
||||
|
||||
@@ -23,7 +23,7 @@ const (
|
||||
)
|
||||
|
||||
var (
|
||||
regx = `(?i)from\s+((?:"[^"]+"|[a-zA-Z0-9_]+))\.((?:"[^"]+"|[a-zA-Z0-9_]+))\.((?:"[^"]+"|[a-zA-Z0-9_]+))`
|
||||
regx = "(?i)from\\s+([a-zA-Z0-9_]+)\\.([a-zA-Z0-9_]+)\\.([a-zA-Z0-9_]+)"
|
||||
)
|
||||
|
||||
func init() {
|
||||
@@ -162,7 +162,6 @@ func (p *PostgreSQL) QueryData(ctx context.Context, query interface{}) ([]models
|
||||
return nil, err
|
||||
}
|
||||
|
||||
postgresqlQueryParam.SQL = formatSQLDatabaseNameWithRegex(postgresqlQueryParam.SQL)
|
||||
if strings.Contains(postgresqlQueryParam.SQL, "$__") {
|
||||
var err error
|
||||
postgresqlQueryParam.SQL, err = macros.Macro(postgresqlQueryParam.SQL, postgresqlQueryParam.From, postgresqlQueryParam.To)
|
||||
@@ -230,7 +229,6 @@ func (p *PostgreSQL) QueryLog(ctx context.Context, query interface{}) ([]interfa
|
||||
p.Shards[0].DB = db
|
||||
}
|
||||
|
||||
postgresqlQueryParam.SQL = formatSQLDatabaseNameWithRegex(postgresqlQueryParam.SQL)
|
||||
if strings.Contains(postgresqlQueryParam.SQL, "$__") {
|
||||
var err error
|
||||
postgresqlQueryParam.SQL, err = macros.Macro(postgresqlQueryParam.SQL, postgresqlQueryParam.From, postgresqlQueryParam.To)
|
||||
@@ -282,17 +280,7 @@ func parseDBName(sql string) (db string, err error) {
|
||||
if len(matches) != 4 {
|
||||
return "", fmt.Errorf("no valid table name in format database.schema.table found")
|
||||
}
|
||||
return strings.Trim(matches[1], `"`), nil
|
||||
}
|
||||
|
||||
// formatSQLDatabaseNameWithRegex 只对 dbname.scheme.tabname 格式进行数据库名称格式化,转为 "dbname".scheme.tabname
|
||||
// 在pgsql中,大小写是通过"" 双引号括起来区分的,默认pg都是转为小写的,所以这里转为 "dbname".scheme."tabname"
|
||||
func formatSQLDatabaseNameWithRegex(sql string) string {
|
||||
// 匹配 from dbname.scheme.table_name 的模式
|
||||
// 使用捕获组来精确匹配数据库名称,确保后面跟着scheme和table
|
||||
re := regexp.MustCompile(`(?i)\bfrom\s+([a-zA-Z_][a-zA-Z0-9_]*)\s*\.\s*([a-zA-Z_][a-zA-Z0-9_]*)\s*\.\s*([a-zA-Z_][a-zA-Z0-9_]*)`)
|
||||
|
||||
return re.ReplaceAllString(sql, `from "$1"."$2"."$3"`)
|
||||
return matches[1], nil
|
||||
}
|
||||
|
||||
func extractColumns(sql string) ([]string, error) {
|
||||
|
||||
@@ -956,7 +956,7 @@ CREATE TABLE notify_rule (
|
||||
id bigserial PRIMARY KEY,
|
||||
name varchar(255) NOT NULL,
|
||||
description text,
|
||||
enable boolean DEFAULT false,
|
||||
enable smallint NOT NULL DEFAULT 0,
|
||||
user_group_ids varchar(255) NOT NULL DEFAULT '',
|
||||
notify_configs text,
|
||||
pipeline_configs text,
|
||||
@@ -971,7 +971,7 @@ CREATE TABLE notify_channel (
|
||||
name varchar(255) NOT NULL,
|
||||
ident varchar(255) NOT NULL,
|
||||
description text,
|
||||
enable boolean DEFAULT false,
|
||||
enable smallint NOT NULL DEFAULT 0,
|
||||
param_config text,
|
||||
request_type varchar(50) NOT NULL,
|
||||
request_config text,
|
||||
|
||||
@@ -22,8 +22,6 @@ import (
|
||||
|
||||
var FromAPIHook func()
|
||||
|
||||
var DatasourceProcessHook func(items []datasource.DatasourceInfo) []datasource.DatasourceInfo
|
||||
|
||||
func Init(ctx *ctx.Context, fromAPI bool) {
|
||||
go getDatasourcesFromDBLoop(ctx, fromAPI)
|
||||
}
|
||||
@@ -102,10 +100,6 @@ func getDatasourcesFromDBLoop(ctx *ctx.Context, fromAPI bool) {
|
||||
atomic.StoreInt64(&PromDefaultDatasourceId, 0)
|
||||
}
|
||||
|
||||
if DatasourceProcessHook != nil {
|
||||
dss = DatasourceProcessHook(dss)
|
||||
}
|
||||
|
||||
PutDatasources(dss)
|
||||
} else {
|
||||
FromAPIHook()
|
||||
@@ -169,7 +163,7 @@ func PutDatasources(items []datasource.DatasourceInfo) {
|
||||
|
||||
ds, err := datasource.GetDatasourceByType(typ, item.Settings)
|
||||
if err != nil {
|
||||
logger.Debugf("get plugin:%+v fail: %v", item, err)
|
||||
logger.Warningf("get plugin:%+v fail: %v", item, err)
|
||||
continue
|
||||
}
|
||||
|
||||
|
||||
@@ -129,7 +129,9 @@ func (c *Clickhouse) QueryRows(ctx context.Context, query string) (*sql.Rows, er
|
||||
|
||||
// ShowDatabases lists all databases in Clickhouse
|
||||
func (c *Clickhouse) ShowDatabases(ctx context.Context) ([]string, error) {
|
||||
res := make([]string, 0)
|
||||
var (
|
||||
res []string
|
||||
)
|
||||
|
||||
rows, err := c.QueryRows(ctx, ShowDatabases)
|
||||
if err != nil {
|
||||
@@ -149,7 +151,9 @@ func (c *Clickhouse) ShowDatabases(ctx context.Context) ([]string, error) {
|
||||
|
||||
// ShowTables lists all tables in a given database
|
||||
func (c *Clickhouse) ShowTables(ctx context.Context, database string) ([]string, error) {
|
||||
res := make([]string, 0)
|
||||
var (
|
||||
res []string
|
||||
)
|
||||
|
||||
showTables := fmt.Sprintf(ShowTables, database)
|
||||
rows, err := c.QueryRows(ctx, showTables)
|
||||
|
||||
@@ -20,8 +20,8 @@ import (
|
||||
|
||||
// Doris struct to hold connection details and the connection object
|
||||
type Doris struct {
|
||||
Addr string `json:"doris.addr" mapstructure:"doris.addr"` // fe mysql endpoint
|
||||
FeAddr string `json:"doris.fe_addr" mapstructure:"doris.fe_addr"` // fe http endpoint
|
||||
Addr string `json:"doris.addr" mapstructure:"doris.addr"` // be node
|
||||
FeAddr string `json:"doris.fe_addr" mapstructure:"doris.fe_addr"` // fe node
|
||||
User string `json:"doris.user" mapstructure:"doris.user"` //
|
||||
Password string `json:"doris.password" mapstructure:"doris.password"` //
|
||||
Timeout int `json:"doris.timeout" mapstructure:"doris.timeout"`
|
||||
@@ -138,7 +138,7 @@ func (d *Doris) ShowDatabases(ctx context.Context) ([]string, error) {
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
databases := make([]string, 0)
|
||||
var databases []string
|
||||
for rows.Next() {
|
||||
var dbName string
|
||||
if err := rows.Scan(&dbName); err != nil {
|
||||
@@ -201,7 +201,7 @@ func (d *Doris) ShowResources(ctx context.Context, resourceType string) ([]strin
|
||||
}
|
||||
|
||||
// 将 map 转换为切片
|
||||
resources := make([]string, 0)
|
||||
var resources []string
|
||||
for name := range distinctName {
|
||||
resources = append(resources, name)
|
||||
}
|
||||
@@ -226,7 +226,7 @@ func (d *Doris) ShowTables(ctx context.Context, database string) ([]string, erro
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
tables := make([]string, 0)
|
||||
var tables []string
|
||||
for rows.Next() {
|
||||
var tableName string
|
||||
if err := rows.Scan(&tableName); err != nil {
|
||||
|
||||
@@ -115,14 +115,14 @@ func (m *MySQL) NewConn(ctx context.Context, database string) (*gorm.DB, error)
|
||||
}()
|
||||
|
||||
dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8&parseTime=True", shard.User, shard.Password, shard.Addr, database)
|
||||
db, err = sqlbase.NewDB(
|
||||
|
||||
return sqlbase.NewDB(
|
||||
ctx,
|
||||
mysql.Open(dsn),
|
||||
shard.MaxIdleConns,
|
||||
shard.MaxOpenConns,
|
||||
time.Duration(shard.ConnMaxLifetime)*time.Second,
|
||||
)
|
||||
return db, err
|
||||
}
|
||||
|
||||
func (m *MySQL) ShowDatabases(ctx context.Context) ([]string, error) {
|
||||
|
||||
@@ -48,7 +48,7 @@ func CloseDB(db *gorm.DB) error {
|
||||
|
||||
// ShowTables retrieves a list of all tables in the specified database
|
||||
func ShowTables(ctx context.Context, db *gorm.DB, query string) ([]string, error) {
|
||||
tables := make([]string, 0)
|
||||
var tables []string
|
||||
|
||||
rows, err := db.WithContext(ctx).Raw(query).Rows()
|
||||
if err != nil {
|
||||
|
||||
@@ -122,7 +122,7 @@ func (tc *Tdengine) QueryTable(query string) (APIResponse, error) {
|
||||
}
|
||||
|
||||
func (tc *Tdengine) ShowDatabases(context.Context) ([]string, error) {
|
||||
databases := make([]string, 0)
|
||||
var databases []string
|
||||
data, err := tc.QueryTable("show databases")
|
||||
if err != nil {
|
||||
return databases, err
|
||||
@@ -135,7 +135,7 @@ func (tc *Tdengine) ShowDatabases(context.Context) ([]string, error) {
|
||||
}
|
||||
|
||||
func (tc *Tdengine) ShowTables(ctx context.Context, database string) ([]string, error) {
|
||||
tables := make([]string, 0)
|
||||
var tables []string
|
||||
sql := fmt.Sprintf("show %s", database)
|
||||
data, err := tc.QueryTable(sql)
|
||||
if err != nil {
|
||||
|
||||
@@ -240,17 +240,17 @@ func (ncc *NotifyChannelCacheType) startHttpChannel(chID int64, channel *models.
|
||||
go ncc.startNotifyConsumer(chID, queue, quitCh)
|
||||
}
|
||||
|
||||
logger.Debugf("started %d notify consumers for channel %d", concurrency, chID)
|
||||
logger.Infof("started %d notify consumers for channel %d", concurrency, chID)
|
||||
}
|
||||
|
||||
// 启动通知消费者协程
|
||||
func (ncc *NotifyChannelCacheType) startNotifyConsumer(channelID int64, queue *list.SafeListLimited, quitCh chan struct{}) {
|
||||
logger.Debugf("starting notify consumer for channel %d", channelID)
|
||||
logger.Infof("starting notify consumer for channel %d", channelID)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-quitCh:
|
||||
logger.Debugf("notify consumer for channel %d stopped", channelID)
|
||||
logger.Infof("notify consumer for channel %d stopped", channelID)
|
||||
return
|
||||
default:
|
||||
// 从队列中取出任务
|
||||
@@ -502,11 +502,7 @@ func (ncc *NotifyChannelCacheType) startEmailSender(chID int64, smtp *models.SMT
|
||||
m.Mail.GetHeader("Subject"), m.Mail.GetHeader("To"))
|
||||
}
|
||||
|
||||
// 记录通知详情
|
||||
if ncc.notifyRecordFunc != nil {
|
||||
target := strings.Join(m.Mail.GetHeader("To"), ",")
|
||||
ncc.notifyRecordFunc(ncc.ctx, m.Events, m.NotifyRuleId, "Email", target, "success", err)
|
||||
}
|
||||
// sender.NotifyRecord(ncc.ctx, m.Events, m.NotifyRuleId, models.Email, strings.Join(m.Mail.GetHeader("To"), ","), "", err)
|
||||
size++
|
||||
|
||||
if size >= conf.Batch {
|
||||
|
||||
@@ -735,15 +735,6 @@ func (ar *AlertRule) UpdateColumn(ctx *ctx.Context, column string, value interfa
|
||||
return DB(ctx).Model(ar).Updates(updates).Error
|
||||
}
|
||||
|
||||
if column == "notify_groups" || column == "notify_channels" {
|
||||
updates := map[string]interface{}{
|
||||
column: value,
|
||||
"notify_version": 0,
|
||||
"notify_rule_ids": []int64{},
|
||||
}
|
||||
return DB(ctx).Model(ar).Updates(updates).Error
|
||||
}
|
||||
|
||||
return DB(ctx).Model(ar).UpdateColumn(column, value).Error
|
||||
}
|
||||
|
||||
|
||||
@@ -116,18 +116,7 @@ func (s *AlertSubscribe) Verify() error {
|
||||
return errors.New("severities is required")
|
||||
}
|
||||
|
||||
if s.NotifyVersion == 1 {
|
||||
if len(s.NotifyRuleIds) == 0 {
|
||||
return errors.New("no notify rules selected")
|
||||
}
|
||||
|
||||
s.UserGroupIds = ""
|
||||
s.RedefineChannels = 0
|
||||
s.NewChannels = ""
|
||||
s.RedefineWebhooks = 0
|
||||
s.Webhooks = ""
|
||||
s.RedefineSeverity = 0
|
||||
s.NewSeverity = 0
|
||||
if len(s.NotifyRuleIds) > 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -143,8 +132,8 @@ func (s *AlertSubscribe) Verify() error {
|
||||
}
|
||||
}
|
||||
|
||||
if s.NotifyVersion == 0 {
|
||||
s.NotifyRuleIds = []int64{}
|
||||
if s.NotifyVersion == 1 && len(s.NotifyRuleIds) == 0 {
|
||||
return errors.New("no notify rules selected")
|
||||
}
|
||||
|
||||
return nil
|
||||
@@ -392,17 +381,6 @@ func (s *AlertSubscribe) MatchProd(prod string) bool {
|
||||
return s.Prod == prod
|
||||
}
|
||||
|
||||
func (s *AlertSubscribe) MatchCate(cate string) bool {
|
||||
if s.Cate == "" {
|
||||
return true
|
||||
}
|
||||
|
||||
if s.Cate == "host" {
|
||||
return cate == "host"
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (s *AlertSubscribe) MatchCluster(dsId int64) bool {
|
||||
// 没有配置数据源, 或者事件不需要关联数据源
|
||||
// do not match any datasource or event not related to datasource
|
||||
|
||||
@@ -13,7 +13,6 @@ import (
|
||||
"github.com/pkg/errors"
|
||||
"github.com/toolkits/pkg/logger"
|
||||
"github.com/toolkits/pkg/str"
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
type Datasource struct {
|
||||
@@ -141,7 +140,7 @@ func (ds *Datasource) Update(ctx *ctx.Context, selectField interface{}, selectFi
|
||||
if ds.UpdatedAt == 0 {
|
||||
ds.UpdatedAt = time.Now().Unix()
|
||||
}
|
||||
return DB(ctx).Model(ds).Session(&gorm.Session{SkipHooks: true}).Select(selectField, selectFields...).Updates(ds).Error
|
||||
return DB(ctx).Model(ds).Select(selectField, selectFields...).Updates(ds).Error
|
||||
}
|
||||
|
||||
func (ds *Datasource) Add(ctx *ctx.Context) error {
|
||||
|
||||
@@ -72,10 +72,8 @@ func MigrateTables(db *gorm.DB) error {
|
||||
|
||||
if isPostgres(db) {
|
||||
dts = append(dts, &models.PostgresBuiltinComponent{})
|
||||
DropUniqueFiledLimit(db, &models.PostgresBuiltinComponent{}, "idx_ident", "idx_ident")
|
||||
} else {
|
||||
dts = append(dts, &models.BuiltinComponent{})
|
||||
DropUniqueFiledLimit(db, &models.BuiltinComponent{}, "idx_ident", "idx_ident")
|
||||
}
|
||||
|
||||
if !db.Migrator().HasColumn(&imodels.TaskSchedulerHealth{}, "scheduler") {
|
||||
@@ -126,19 +124,11 @@ func MigrateTables(db *gorm.DB) error {
|
||||
DropUniqueFiledLimit(db, &Configs{}, "ckey", "configs_ckey_key")
|
||||
// 删除 builtin_metrics 表的 idx_collector_typ_name 唯一索引
|
||||
DropUniqueFiledLimit(db, &models.BuiltinMetric{}, "idx_collector_typ_name", "idx_collector_typ_name")
|
||||
|
||||
// 添加自增主键 ii 列到指定表
|
||||
// addAutoIncrementPrimaryKey(db)
|
||||
|
||||
InsertPermPoints(db)
|
||||
return nil
|
||||
}
|
||||
|
||||
func DropUniqueFiledLimit(db *gorm.DB, dst interface{}, uniqueFiled string, pgUniqueFiled string) { // UNIQUE KEY (`ckey`)
|
||||
// 先检查表是否存在,如果不存在则直接返回
|
||||
if !db.Migrator().HasTable(dst) {
|
||||
return
|
||||
}
|
||||
|
||||
if db.Migrator().HasIndex(dst, uniqueFiled) {
|
||||
err := db.Migrator().DropIndex(dst, uniqueFiled) //mysql DROP INDEX
|
||||
if err != nil {
|
||||
@@ -169,6 +159,110 @@ func columnHasIndex(db *gorm.DB, dst interface{}, indexColumn string) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func InsertPermPoints(db *gorm.DB) {
|
||||
var ops []models.RoleOperation
|
||||
ops = append(ops, models.RoleOperation{
|
||||
RoleName: "Standard",
|
||||
Operation: "/alert-mutes/put",
|
||||
})
|
||||
|
||||
ops = append(ops, models.RoleOperation{
|
||||
RoleName: "Standard",
|
||||
Operation: "/log/index-patterns",
|
||||
})
|
||||
|
||||
ops = append(ops, models.RoleOperation{
|
||||
RoleName: "Standard",
|
||||
Operation: "/help/variable-configs",
|
||||
})
|
||||
|
||||
ops = append(ops, models.RoleOperation{
|
||||
RoleName: "Standard",
|
||||
Operation: "/ibex-settings",
|
||||
})
|
||||
|
||||
ops = append(ops, models.RoleOperation{
|
||||
RoleName: "Standard",
|
||||
Operation: "/notification-templates",
|
||||
})
|
||||
|
||||
ops = append(ops, models.RoleOperation{
|
||||
RoleName: "Standard",
|
||||
Operation: "/notification-templates/add",
|
||||
})
|
||||
|
||||
ops = append(ops, models.RoleOperation{
|
||||
RoleName: "Standard",
|
||||
Operation: "/notification-templates/put",
|
||||
})
|
||||
|
||||
ops = append(ops, models.RoleOperation{
|
||||
RoleName: "Standard",
|
||||
Operation: "/notification-templates/del",
|
||||
})
|
||||
|
||||
ops = append(ops, models.RoleOperation{
|
||||
RoleName: "Standard",
|
||||
Operation: "/notification-rules",
|
||||
})
|
||||
|
||||
ops = append(ops, models.RoleOperation{
|
||||
RoleName: "Standard",
|
||||
Operation: "/notification-rules/add",
|
||||
})
|
||||
|
||||
ops = append(ops, models.RoleOperation{
|
||||
RoleName: "Standard",
|
||||
Operation: "/notification-rules/put",
|
||||
})
|
||||
|
||||
ops = append(ops, models.RoleOperation{
|
||||
RoleName: "Standard",
|
||||
Operation: "/notification-rules/del",
|
||||
})
|
||||
|
||||
ops = append(ops, models.RoleOperation{
|
||||
RoleName: "Standard",
|
||||
Operation: "/event-pipelines",
|
||||
})
|
||||
|
||||
ops = append(ops, models.RoleOperation{
|
||||
RoleName: "Standard",
|
||||
Operation: "/event-pipelines/add",
|
||||
})
|
||||
|
||||
ops = append(ops, models.RoleOperation{
|
||||
RoleName: "Standard",
|
||||
Operation: "/event-pipelines/put",
|
||||
})
|
||||
|
||||
ops = append(ops, models.RoleOperation{
|
||||
RoleName: "Standard",
|
||||
Operation: "/event-pipelines/del",
|
||||
})
|
||||
|
||||
for _, op := range ops {
|
||||
var count int64
|
||||
|
||||
session := db.Session(&gorm.Session{}).Model(&models.RoleOperation{})
|
||||
err := session.Where("operation = ? AND role_name = ?", op.Operation, op.RoleName).Count(&count).Error
|
||||
|
||||
if err != nil {
|
||||
logger.Errorf("check role operation exists failed, %v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
if count > 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
err = session.Create(&op).Error
|
||||
if err != nil {
|
||||
logger.Errorf("insert role operation failed, %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type AlertRule struct {
|
||||
ExtraConfig string `gorm:"type:text;column:extra_config"`
|
||||
CronPattern string `gorm:"type:varchar(64);column:cron_pattern"`
|
||||
@@ -371,43 +465,3 @@ type NotifyChannelConfig struct {
|
||||
func (c *NotifyChannelConfig) TableName() string {
|
||||
return "notify_channel"
|
||||
}
|
||||
|
||||
// addAutoIncrementPrimaryKey 为指定表添加自增主键 ii 列
|
||||
func addAutoIncrementPrimaryKey(db *gorm.DB) {
|
||||
// 只在 MySQL 数据库上执行这些操作
|
||||
switch db.Dialector.(type) {
|
||||
case *mysql.Dialector:
|
||||
// 为 task_scheduler_health 表添加 ii 列作为主键
|
||||
if db.Migrator().HasTable("task_scheduler_health") && !db.Migrator().HasColumn("task_scheduler_health", "ii") {
|
||||
err := db.Exec("ALTER TABLE `task_scheduler_health` ADD `ii` INT PRIMARY KEY AUTO_INCREMENT").Error
|
||||
if err != nil {
|
||||
logger.Errorf("failed to add ii column to task_scheduler_health: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// 为 board_busigroup 表重构主键
|
||||
if db.Migrator().HasTable("board_busigroup") && !db.Migrator().HasColumn("board_busigroup", "ii") {
|
||||
// 使用 SHOW KEYS 检查主键是否存在(权限要求更低)
|
||||
var keyInfo []map[string]interface{}
|
||||
err := db.Raw("SHOW KEYS FROM `board_busigroup` WHERE Key_name = 'PRIMARY'").Find(&keyInfo).Error
|
||||
if err != nil {
|
||||
logger.Errorf("failed to check primary key for board_busigroup: %v", err)
|
||||
}
|
||||
|
||||
// 只有当主键存在时才删除
|
||||
if len(keyInfo) > 0 {
|
||||
err = db.Exec("ALTER TABLE `board_busigroup` DROP PRIMARY KEY").Error
|
||||
if err != nil {
|
||||
logger.Errorf("failed to drop primary key from board_busigroup: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// 添加新的自增主键列
|
||||
err = db.Exec("ALTER TABLE `board_busigroup` ADD COLUMN `ii` INT AUTO_INCREMENT PRIMARY KEY FIRST").Error
|
||||
if err != nil {
|
||||
logger.Errorf("failed to add new primary key to board_busigroup: %v", err)
|
||||
}
|
||||
}
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
@@ -196,7 +196,7 @@ func (ncc *NotifyChannelConfig) SendScript(events []*AlertCurEvent, tpl map[stri
|
||||
cmd.Stderr = &buf
|
||||
|
||||
err, isTimeout := cmdx.RunTimeout(cmd, time.Duration(config.Timeout)*time.Millisecond)
|
||||
logger.Infof("event_script_notify_result: exec %s output: %s isTimeout: %v err: %v stdin: %s", fpath, buf.String(), isTimeout, err, string(getStdinBytes(events, tpl, params, sendtos)))
|
||||
logger.Infof("event_script_notify_result: exec %s output: %s isTimeout: %v err: %v", fpath, buf.String(), isTimeout, err)
|
||||
|
||||
res := buf.String()
|
||||
|
||||
@@ -922,6 +922,11 @@ func (ncc *NotifyChannelConfig) ValidateFlashDutyRequestConfig() error {
|
||||
}
|
||||
|
||||
func (ncc *NotifyChannelConfig) Update(ctx *ctx.Context, ref NotifyChannelConfig) error {
|
||||
// ref.FE2DB()
|
||||
if ncc.Ident != ref.Ident {
|
||||
return errors.New("cannot update ident")
|
||||
}
|
||||
|
||||
ref.ID = ncc.ID
|
||||
ref.CreateAt = ncc.CreateAt
|
||||
ref.CreateBy = ncc.CreateBy
|
||||
|
||||
@@ -8,11 +8,11 @@ import (
|
||||
|
||||
type UserToken struct {
|
||||
Id int64 `json:"id" gorm:"primaryKey"`
|
||||
Username string `json:"username" gorm:"type:varchar(255); not null; default ''"`
|
||||
TokenName string `json:"token_name" gorm:"type:varchar(255); not null; default ''"`
|
||||
Token string `json:"token" gorm:"type:varchar(255); not null; default ''"`
|
||||
CreateAt int64 `json:"create_at" gorm:"type:bigint; not null; default 0"`
|
||||
LastUsed int64 `json:"last_used" gorm:"type:bigint; not null; default 0"`
|
||||
Username string `json:"username" gorm:"type:varchar(255) not null default ''"`
|
||||
TokenName string `json:"token_name" gorm:"type:varchar(255) not null default ''"`
|
||||
Token string `json:"token" gorm:"type:varchar(255) not null default ''"`
|
||||
CreateAt int64 `json:"create_at" gorm:"type:bigint not null default 0"`
|
||||
LastUsed int64 `json:"last_used" gorm:"type:bigint not null default 0"`
|
||||
}
|
||||
|
||||
func (UserToken) TableName() string {
|
||||
|
||||
@@ -89,7 +89,7 @@ func diffMap(m1, m2 map[int64]*models.User) []models.User {
|
||||
func updateUser(appKey string, m1, m2 map[int64]*models.User) {
|
||||
for i := range m1 {
|
||||
if _, ok := m2[i]; ok {
|
||||
if m1[i].Email != m2[i].Email || !PhoneIsSame(m1[i].Phone, m2[i].Phone) || m1[i].Username != m2[i].Username {
|
||||
if m1[i].Email != m2[i].Email || m1[i].Phone != m2[i].Phone || m1[i].Username != m2[i].Username {
|
||||
var flashdutyUser User
|
||||
|
||||
flashdutyUser = User{
|
||||
@@ -110,30 +110,6 @@ func updateUser(appKey string, m1, m2 map[int64]*models.User) {
|
||||
}
|
||||
}
|
||||
|
||||
func PhoneIsSame(phone1, phone2 string) bool {
|
||||
// 兼容不同国家/地区前缀,例如 +86、+1、+44 等,以及包含空格或短横线的格式
|
||||
normalize := func(p string) string {
|
||||
p = strings.TrimSpace(p)
|
||||
p = strings.ReplaceAll(p, " ", "")
|
||||
p = strings.ReplaceAll(p, "-", "")
|
||||
p = strings.TrimPrefix(p, "+")
|
||||
return p
|
||||
}
|
||||
|
||||
p1 := normalize(phone1)
|
||||
p2 := normalize(phone2)
|
||||
|
||||
if p1 == p2 {
|
||||
return true
|
||||
}
|
||||
|
||||
// 如果长度相差不超过 3 且较长的以较短的结尾,则认为是相同号码(忽略最多 3 位国家区号差异)
|
||||
if len(p1) > len(p2) {
|
||||
return len(p1)-len(p2) <= 3 && strings.HasSuffix(p1, p2)
|
||||
}
|
||||
return len(p2)-len(p1) <= 3 && strings.HasSuffix(p2, p1)
|
||||
}
|
||||
|
||||
type User struct {
|
||||
Email string `json:"email,omitempty"`
|
||||
Phone string `json:"phone,omitempty"`
|
||||
|
||||
@@ -1,67 +0,0 @@
|
||||
package flashduty
|
||||
|
||||
import "testing"
|
||||
|
||||
func TestPhoneIsSame(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
phone1 string
|
||||
phone2 string
|
||||
same bool
|
||||
}{
|
||||
{
|
||||
name: "blank",
|
||||
phone1: "",
|
||||
phone2: "",
|
||||
same: true,
|
||||
},
|
||||
{
|
||||
name: "China +86 prefix",
|
||||
phone1: "+8613812345678",
|
||||
phone2: "13812345678",
|
||||
same: true,
|
||||
},
|
||||
{
|
||||
name: "China +86 with spaces and hyphens",
|
||||
phone1: "+86 138-1234-5678",
|
||||
phone2: "13812345678",
|
||||
same: true,
|
||||
},
|
||||
{
|
||||
name: "USA +1 prefix",
|
||||
phone1: "+1 234-567-8900",
|
||||
phone2: "2345678900",
|
||||
same: true,
|
||||
},
|
||||
{
|
||||
name: "UK +44 prefix",
|
||||
phone1: "+442078765432",
|
||||
phone2: "2078765432",
|
||||
same: true,
|
||||
},
|
||||
{
|
||||
name: "India +91 prefix",
|
||||
phone1: "+919876543210",
|
||||
phone2: "9876543210",
|
||||
same: true,
|
||||
},
|
||||
{
|
||||
name: "Germany +49 prefix",
|
||||
phone1: "+4915123456789",
|
||||
phone2: "15123456789",
|
||||
same: true,
|
||||
},
|
||||
{
|
||||
name: "Different numbers",
|
||||
phone1: "+8613812345678",
|
||||
phone2: "13812345679",
|
||||
same: false,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
if got := PhoneIsSame(tt.phone1, tt.phone2); got != tt.same {
|
||||
t.Errorf("%s: expected %v, got %v", tt.name, tt.same, got)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -77,7 +77,6 @@ var I18N = `{
|
||||
"event tag not match tag filter": "事件标签不匹配标签过滤器",
|
||||
"event attributes not match attributes filter": "事件属性不匹配属性过滤器",
|
||||
"failed to parse tag filter: %v": "解析标签过滤器失败: %v",
|
||||
"Alert history events deletion started": "告警历史事件删除任务已启动",
|
||||
|
||||
"Infrastructure": "基础设施",
|
||||
"Host - View": "机器 - 查看",
|
||||
@@ -268,7 +267,6 @@ var I18N = `{
|
||||
"event tag not match tag filter": "事件標籤不匹配標籤過濾器",
|
||||
"event attributes not match attributes filter": "事件屬性不匹配屬性過濾器",
|
||||
"failed to parse tag filter: %v": "解析標籤過濾器失敗: %v",
|
||||
"Alert history events deletion started": "告警歷史事件刪除任務已啟動",
|
||||
|
||||
"Infrastructure": "基礎設施",
|
||||
"Host - View": "機器 - 查看",
|
||||
@@ -456,7 +454,6 @@ var I18N = `{
|
||||
"event tag not match tag filter": "イベントタグがタグフィルタと一致しません",
|
||||
"event attributes not match attributes filter": "イベント属性が属性フィルタと一致しません",
|
||||
"failed to parse tag filter: %v": "タグフィルタの解析に失敗しました: %v",
|
||||
"Alert history events deletion started": "アラート履歴イベントの削除タスクが開始されました",
|
||||
|
||||
"Infrastructure": "インフラストラクチャ",
|
||||
"Host - View": "機器 - 閲覧",
|
||||
@@ -644,7 +641,6 @@ var I18N = `{
|
||||
"event tag not match tag filter": "Теги события не соответствуют фильтру тегов",
|
||||
"event attributes not match attributes filter": "Атрибуты события не соответствуют фильтру атрибутов",
|
||||
"failed to parse tag filter: %v": "Не удалось разобрать фильтр тегов: %v",
|
||||
"Alert history events deletion started": "Задача удаления исторических событий оповещений запущена",
|
||||
|
||||
"Infrastructure": "Инфраструктура",
|
||||
"Host - View": "Хост - Просмотр",
|
||||
|
||||
@@ -71,24 +71,6 @@ func ValueFormatter(unit string, decimals int, value float64) FormattedValue {
|
||||
}
|
||||
}
|
||||
|
||||
// Handle positive and negative infinity
|
||||
if math.IsInf(value, 1) {
|
||||
return FormattedValue{
|
||||
Value: 9999999999,
|
||||
Unit: "",
|
||||
Text: "+Inf",
|
||||
Stat: 9999999999,
|
||||
}
|
||||
}
|
||||
if math.IsInf(value, -1) {
|
||||
return FormattedValue{
|
||||
Value: -9999999999,
|
||||
Unit: "",
|
||||
Text: "-Inf",
|
||||
Stat: -9999999999,
|
||||
}
|
||||
}
|
||||
|
||||
// 处理时间单位
|
||||
switch unit {
|
||||
case "none":
|
||||
|
||||
@@ -13,7 +13,6 @@ import (
|
||||
"github.com/ccfos/nightingale/v6/pushgw/pstat"
|
||||
"github.com/ccfos/nightingale/v6/storage"
|
||||
|
||||
"github.com/toolkits/pkg/concurrent/semaphore"
|
||||
"github.com/toolkits/pkg/logger"
|
||||
"github.com/toolkits/pkg/slice"
|
||||
)
|
||||
@@ -118,11 +117,6 @@ func (s *Set) UpdateTargets(lst []string, now int64) error {
|
||||
return err
|
||||
}
|
||||
|
||||
if s.configs.UpdateDBTargetTimestampDisable {
|
||||
// 如果 mysql 压力太大,关闭更新 db 的操作
|
||||
return nil
|
||||
}
|
||||
|
||||
// there are some idents not found in db, so insert them
|
||||
var exists []string
|
||||
err = s.ctx.DB.Table("target").Where("ident in ?", lst).Pluck("ident", &exists).Error
|
||||
@@ -139,34 +133,16 @@ func (s *Set) UpdateTargets(lst []string, now int64) error {
|
||||
}
|
||||
|
||||
// 从批量更新一批机器的时间戳,改成逐台更新,是为了避免批量更新时,mysql的锁竞争问题
|
||||
start := time.Now()
|
||||
duration := time.Since(start).Seconds()
|
||||
if len(exists) > 0 {
|
||||
sema := semaphore.NewSemaphore(s.configs.UpdateDBTargetConcurrency)
|
||||
wg := sync.WaitGroup{}
|
||||
for i := 0; i < len(exists); i++ {
|
||||
sema.Acquire()
|
||||
wg.Add(1)
|
||||
go func(ident string) {
|
||||
defer sema.Release()
|
||||
defer wg.Done()
|
||||
s.updateDBTargetTs(ident, now)
|
||||
}(exists[i])
|
||||
for i := 0; i < len(exists); i++ {
|
||||
err = s.ctx.DB.Exec("UPDATE target SET update_at = ? WHERE ident = ?", now, exists[i]).Error
|
||||
if err != nil {
|
||||
logger.Error("upsert_target: failed to update target:", exists[i], "error:", err)
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
pstat.DBOperationLatency.WithLabelValues("update_targets_ts").Observe(duration)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Set) updateDBTargetTs(ident string, now int64) {
|
||||
err := s.ctx.DB.Exec("UPDATE target SET update_at = ? WHERE ident = ?", now, ident).Error
|
||||
if err != nil {
|
||||
logger.Error("update_target: failed to update target:", ident, "error:", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Set) updateTargetsUpdateTs(lst []string, now int64, redis storage.Redis) error {
|
||||
if redis == nil {
|
||||
return fmt.Errorf("redis is nil")
|
||||
|
||||
@@ -18,9 +18,6 @@ type Pushgw struct {
|
||||
UpdateTargetRetryIntervalMills int64
|
||||
UpdateTargetTimeoutMills int64
|
||||
UpdateTargetBatchSize int
|
||||
UpdateDBTargetConcurrency int
|
||||
UpdateDBTargetTimestampDisable bool
|
||||
PushConcurrency int
|
||||
|
||||
BusiGroupLabelKey string
|
||||
IdentMetrics []string
|
||||
@@ -52,7 +49,6 @@ type WriterOptions struct {
|
||||
Url string
|
||||
BasicAuthUser string
|
||||
BasicAuthPass string
|
||||
AsyncWrite bool // 如果有多个转发 writer,对应不重要的 writer,可以设置为 true,异步转发提供转发效率
|
||||
|
||||
Timeout int64
|
||||
DialTimeout int64
|
||||
@@ -128,14 +124,6 @@ func (p *Pushgw) PreCheck() {
|
||||
p.UpdateTargetBatchSize = 20
|
||||
}
|
||||
|
||||
if p.UpdateDBTargetConcurrency <= 0 {
|
||||
p.UpdateDBTargetConcurrency = 16
|
||||
}
|
||||
|
||||
if p.PushConcurrency <= 0 {
|
||||
p.PushConcurrency = 16
|
||||
}
|
||||
|
||||
if p.BusiGroupLabelKey == "" {
|
||||
p.BusiGroupLabelKey = "busigroup"
|
||||
}
|
||||
|
||||
@@ -105,17 +105,6 @@ var (
|
||||
},
|
||||
[]string{"operation", "status"},
|
||||
)
|
||||
|
||||
DBOperationLatency = prometheus.NewHistogramVec(
|
||||
prometheus.HistogramOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
Name: "db_operation_latency_seconds",
|
||||
Help: "Histogram of latencies for DB operations",
|
||||
Buckets: []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5},
|
||||
},
|
||||
[]string{"operation"},
|
||||
)
|
||||
)
|
||||
|
||||
func init() {
|
||||
@@ -132,6 +121,5 @@ func init() {
|
||||
GaugeSampleQueueSize,
|
||||
CounterPushQueueOverLimitTotal,
|
||||
RedisOperationLatency,
|
||||
DBOperationLatency,
|
||||
)
|
||||
}
|
||||
|
||||
27
pushgw/writer/json/json-iterator.go
Normal file
27
pushgw/writer/json/json-iterator.go
Normal file
@@ -0,0 +1,27 @@
|
||||
package json
|
||||
|
||||
import (
|
||||
"math"
|
||||
"unsafe"
|
||||
|
||||
jsoniter "github.com/json-iterator/go"
|
||||
)
|
||||
|
||||
func init() {
|
||||
// 为了处理prom数据中的NaN值
|
||||
jsoniter.RegisterTypeEncoderFunc("float64", func(ptr unsafe.Pointer, stream *jsoniter.Stream) {
|
||||
f := *(*float64)(ptr)
|
||||
if math.IsNaN(f) {
|
||||
stream.WriteString("null")
|
||||
} else {
|
||||
stream.WriteFloat64(f)
|
||||
}
|
||||
}, func(ptr unsafe.Pointer) bool {
|
||||
return true
|
||||
})
|
||||
}
|
||||
|
||||
func MarshalWithCustomFloat(items interface{}) ([]byte, error) {
|
||||
var json = jsoniter.ConfigCompatibleWithStandardLibrary
|
||||
return json.Marshal(items)
|
||||
}
|
||||
@@ -3,9 +3,7 @@ package writer
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"math"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
@@ -17,6 +15,7 @@ import (
|
||||
"github.com/ccfos/nightingale/v6/pushgw/kafka"
|
||||
"github.com/ccfos/nightingale/v6/pushgw/pconf"
|
||||
"github.com/ccfos/nightingale/v6/pushgw/pstat"
|
||||
"github.com/ccfos/nightingale/v6/pushgw/writer/json"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/golang/snappy"
|
||||
@@ -53,49 +52,8 @@ func beforeWrite(key string, items []prompb.TimeSeries, forceUseServerTS bool, e
|
||||
|
||||
return proto.Marshal(req)
|
||||
}
|
||||
// 如果是 json 格式,将 NaN 值的数据丢弃掉
|
||||
return json.Marshal(filterNaNSamples(items))
|
||||
}
|
||||
|
||||
func filterNaNSamples(items []prompb.TimeSeries) []prompb.TimeSeries {
|
||||
// 早期检查:如果没有NaN值,直接返回原始数据
|
||||
hasNaN := false
|
||||
for i := range items {
|
||||
for j := range items[i].Samples {
|
||||
if math.IsNaN(items[i].Samples[j].Value) {
|
||||
hasNaN = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if hasNaN {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if !hasNaN {
|
||||
return items
|
||||
}
|
||||
|
||||
// 有NaN值时进行过滤,原地修改以减少内存分配
|
||||
for i := range items {
|
||||
samples := items[i].Samples
|
||||
validCount := 0
|
||||
|
||||
// 原地过滤 samples,避免额外的内存分配
|
||||
for j := range samples {
|
||||
if !math.IsNaN(samples[j].Value) {
|
||||
if validCount != j {
|
||||
samples[validCount] = samples[j]
|
||||
}
|
||||
validCount++
|
||||
}
|
||||
}
|
||||
|
||||
// 保留所有时间序列,即使没有有效样本(此时Samples为空)
|
||||
items[i].Samples = samples[:validCount]
|
||||
}
|
||||
|
||||
return items
|
||||
return json.MarshalWithCustomFloat(items)
|
||||
}
|
||||
|
||||
func (w WriterType) Write(key string, items []prompb.TimeSeries, headers ...map[string]string) {
|
||||
@@ -199,11 +157,10 @@ func (w WriterType) Post(req []byte, headers ...map[string]string) error {
|
||||
}
|
||||
|
||||
type WritersType struct {
|
||||
pushgw pconf.Pushgw
|
||||
backends map[string]Writer
|
||||
queues map[string]*IdentQueue
|
||||
AllQueueLen atomic.Value
|
||||
PushConcurrency atomic.Int64
|
||||
pushgw pconf.Pushgw
|
||||
backends map[string]Writer
|
||||
queues map[string]*IdentQueue
|
||||
AllQueueLen atomic.Value
|
||||
sync.RWMutex
|
||||
}
|
||||
|
||||
@@ -253,31 +210,6 @@ func (ws *WritersType) Put(name string, writer Writer) {
|
||||
ws.backends[name] = writer
|
||||
}
|
||||
|
||||
func (ws *WritersType) isCriticalBackend(key string) bool {
|
||||
backend, exists := ws.backends[key]
|
||||
if !exists {
|
||||
return false
|
||||
}
|
||||
|
||||
// 使用类型断言判断
|
||||
switch backend.(type) {
|
||||
case WriterType:
|
||||
if backend.(WriterType).Opts.AsyncWrite {
|
||||
return false
|
||||
}
|
||||
|
||||
// HTTP Writer 作为关键后端
|
||||
return true
|
||||
case KafkaWriterType:
|
||||
// Kafka Writer 作为非关键后端
|
||||
return false
|
||||
default:
|
||||
// 未知类型,保守起见作为关键后端
|
||||
logger.Warningf("Unknown backend type: %T, treating as critical", backend)
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
func (ws *WritersType) CleanExpQueue() {
|
||||
for {
|
||||
ws.Lock()
|
||||
@@ -346,64 +278,12 @@ func (ws *WritersType) StartConsumer(identQueue *IdentQueue) {
|
||||
continue
|
||||
}
|
||||
for key := range ws.backends {
|
||||
|
||||
if ws.isCriticalBackend(key) {
|
||||
ws.backends[key].Write(key, series)
|
||||
} else {
|
||||
// 像 kafka 这种 writer 使用异步写入,防止因为写入太慢影响主流程
|
||||
ws.writeToNonCriticalBackend(key, series)
|
||||
}
|
||||
ws.backends[key].Write(key, series)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (ws *WritersType) writeToNonCriticalBackend(key string, series []prompb.TimeSeries) {
|
||||
// 原子性地检查并增加并发数
|
||||
currentConcurrency := ws.PushConcurrency.Add(1)
|
||||
|
||||
if currentConcurrency > int64(ws.pushgw.PushConcurrency) {
|
||||
// 超过限制,立即减少计数并丢弃
|
||||
ws.PushConcurrency.Add(-1)
|
||||
logger.Warningf("push concurrency limit exceeded, current: %d, limit: %d, dropping %d series for backend: %s",
|
||||
currentConcurrency-1, ws.pushgw.PushConcurrency, len(series), key)
|
||||
pstat.CounterWirteErrorTotal.WithLabelValues(key).Add(float64(len(series)))
|
||||
return
|
||||
}
|
||||
|
||||
// 深拷贝数据,确保并发安全
|
||||
seriesCopy := ws.deepCopySeries(series)
|
||||
|
||||
// 启动goroutine处理
|
||||
go func(backendKey string, data []prompb.TimeSeries) {
|
||||
defer func() {
|
||||
ws.PushConcurrency.Add(-1)
|
||||
if r := recover(); r != nil {
|
||||
logger.Errorf("panic in non-critical backend %s: %v", backendKey, r)
|
||||
}
|
||||
}()
|
||||
|
||||
ws.backends[backendKey].Write(backendKey, data)
|
||||
}(key, seriesCopy)
|
||||
}
|
||||
|
||||
// 完整的深拷贝方法
|
||||
func (ws *WritersType) deepCopySeries(series []prompb.TimeSeries) []prompb.TimeSeries {
|
||||
seriesCopy := make([]prompb.TimeSeries, len(series))
|
||||
|
||||
for i := range series {
|
||||
seriesCopy[i] = series[i]
|
||||
|
||||
if len(series[i].Samples) > 0 {
|
||||
samples := make([]prompb.Sample, len(series[i].Samples))
|
||||
copy(samples, series[i].Samples)
|
||||
seriesCopy[i].Samples = samples
|
||||
}
|
||||
}
|
||||
|
||||
return seriesCopy
|
||||
}
|
||||
|
||||
func (ws *WritersType) Init() error {
|
||||
ws.AllQueueLen.Store(int64(0))
|
||||
|
||||
|
||||
Reference in New Issue
Block a user