Compare commits

..

1 Commits

Author SHA1 Message Date
Ulric Qin
93ff325f72 fix execution of notify script 2025-07-06 08:38:45 +08:00
16 changed files with 32 additions and 222 deletions

View File

@@ -595,10 +595,6 @@ func (e *Dispatch) handleSub(sub *models.AlertSubscribe, event models.AlertCurEv
return
}
if !sub.MatchCate(event.Cate) {
return
}
if !common.MatchTags(event.TagsMap, sub.ITags) {
return
}

View File

@@ -8,7 +8,6 @@ import (
"io"
"net/http"
"net/url"
"strconv"
"strings"
"text/template"
"time"
@@ -144,11 +143,7 @@ func (c *AISummaryConfig) generateAISummary(eventInfo string) (string, error) {
// 合并自定义参数
for k, v := range c.CustomParams {
converted, err := convertCustomParam(v)
if err != nil {
return "", fmt.Errorf("failed to convert custom param %s: %v", k, err)
}
reqParams[k] = converted
reqParams[k] = v
}
// 序列化请求体
@@ -201,44 +196,3 @@ func (c *AISummaryConfig) generateAISummary(eventInfo string) (string, error) {
return chatResp.Choices[0].Message.Content, nil
}
// convertCustomParam 将前端传入的参数转换为正确的类型
func convertCustomParam(value interface{}) (interface{}, error) {
if value == nil {
return nil, nil
}
// 如果是字符串,尝试转换为其他类型
if str, ok := value.(string); ok {
// 尝试转换为数字
if f, err := strconv.ParseFloat(str, 64); err == nil {
// 检查是否为整数
if f == float64(int64(f)) {
return int64(f), nil
}
return f, nil
}
// 尝试转换为布尔值
if b, err := strconv.ParseBool(str); err == nil {
return b, nil
}
// 尝试解析为JSON数组
if strings.HasPrefix(strings.TrimSpace(str), "[") {
var arr []interface{}
if err := json.Unmarshal([]byte(str), &arr); err == nil {
return arr, nil
}
}
// 尝试解析为JSON对象
if strings.HasPrefix(strings.TrimSpace(str), "{") {
var obj map[string]interface{}
if err := json.Unmarshal([]byte(str), &obj); err == nil {
return obj, nil
}
}
}
return value, nil
}

View File

@@ -67,73 +67,3 @@ func TestAISummaryConfig_Process(t *testing.T) {
t.Logf("原始注释: %v", result.AnnotationsJSON["description"])
t.Logf("AI总结: %s", result.AnnotationsJSON["ai_summary"])
}
func TestConvertCustomParam(t *testing.T) {
tests := []struct {
name string
input interface{}
expected interface{}
hasError bool
}{
{
name: "nil value",
input: nil,
expected: nil,
hasError: false,
},
{
name: "string number to int64",
input: "123",
expected: int64(123),
hasError: false,
},
{
name: "string float to float64",
input: "123.45",
expected: 123.45,
hasError: false,
},
{
name: "string boolean to bool",
input: "true",
expected: true,
hasError: false,
},
{
name: "string false to bool",
input: "false",
expected: false,
hasError: false,
},
{
name: "JSON array string to slice",
input: `["a", "b", "c"]`,
expected: []interface{}{"a", "b", "c"},
hasError: false,
},
{
name: "JSON object string to map",
input: `{"key": "value", "num": 123}`,
expected: map[string]interface{}{"key": "value", "num": float64(123)},
hasError: false,
},
{
name: "plain string remains string",
input: "hello world",
expected: "hello world",
hasError: false,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
converted, err := convertCustomParam(test.input)
if test.hasError {
assert.Error(t, err)
return
}
assert.NoError(t, err)
assert.Equal(t, test.expected, converted)
})
}
}

View File

@@ -187,7 +187,7 @@ func Init(ctx *ctx.Context, builtinIntegrationsDir string) {
CreatedBy: SYSTEM,
UpdatedBy: SYSTEM,
}
BuiltinPayloadInFile.AddBuiltinPayload(&builtinAlert)
BuiltinPayloadInFile.addBuiltinPayload(&builtinAlert)
}
}
@@ -245,7 +245,7 @@ func Init(ctx *ctx.Context, builtinIntegrationsDir string) {
CreatedBy: SYSTEM,
UpdatedBy: SYSTEM,
}
BuiltinPayloadInFile.AddBuiltinPayload(&builtinDashboard)
BuiltinPayloadInFile.addBuiltinPayload(&builtinDashboard)
}
} else if err != nil {
logger.Warningf("read builtin component dash dir fail %s %v", component.Ident, err)
@@ -314,7 +314,7 @@ func NewBuiltinPayloadInFileType() *BuiltinPayloadInFileType {
}
}
func (b *BuiltinPayloadInFileType) AddBuiltinPayload(bp *models.BuiltinPayload) {
func (b *BuiltinPayloadInFileType) addBuiltinPayload(bp *models.BuiltinPayload) {
if _, exists := b.Data[bp.ComponentID]; !exists {
b.Data[bp.ComponentID] = make(map[string]map[string][]*models.BuiltinPayload)
}

View File

@@ -543,20 +543,6 @@ func (rt *Router) alertRulePutFields(c *gin.Context) {
}
}
// 检测是否是批量更新通知规则的字段,如果是清理掉旧版本的配置
for k := range f.Fields {
if k == "notify_rule_ids" {
f.Fields["notify_version"] = 1
f.Fields["notify_channels"] = ""
f.Fields["notify_groups"] = ""
f.Fields["callbacks"] = ""
}
if k == "notify_channels" {
f.Fields["notify_version"] = 0
}
}
for k, v := range f.Fields {
// 检查 v 是否为各种切片类型
switch v.(type) {

View File

@@ -288,7 +288,6 @@ func (rt *Router) alertSubscribePut(c *gin.Context) {
"busi_groups",
"note",
"notify_rule_ids",
"notify_version",
))
}

View File

@@ -149,12 +149,6 @@ func (rt *Router) recordingRulePutFields(c *gin.Context) {
f.Fields["datasource_queries"] = string(bytes)
}
if datasourceIds, ok := f.Fields["datasource_ids"]; ok {
bytes, err := json.Marshal(datasourceIds)
ginx.Dangerous(err)
f.Fields["datasource_ids"] = string(bytes)
}
for i := 0; i < len(f.Ids); i++ {
ar, err := models.RecordingRuleGetById(rt.Ctx, f.Ids[i])
ginx.Dangerous(err)

View File

@@ -8,7 +8,6 @@ import (
"github.com/ccfos/nightingale/v6/datasource"
"github.com/ccfos/nightingale/v6/dskit/doris"
"github.com/ccfos/nightingale/v6/dskit/types"
"github.com/ccfos/nightingale/v6/pkg/macros"
"github.com/ccfos/nightingale/v6/models"
"github.com/mitchellh/mapstructure"
@@ -28,16 +27,11 @@ type Doris struct {
}
type QueryParam struct {
Ref string `json:"ref" mapstructure:"ref"`
Database string `json:"database" mapstructure:"database"`
Table string `json:"table" mapstructure:"table"`
SQL string `json:"sql" mapstructure:"sql"`
Keys datasource.Keys `json:"keys" mapstructure:"keys"`
Limit int `json:"limit" mapstructure:"limit"`
From int64 `json:"from" mapstructure:"from"`
To int64 `json:"to" mapstructure:"to"`
TimeField string `json:"time_field" mapstructure:"time_field"`
TimeFormat string `json:"time_format" mapstructure:"time_format"`
Ref string `json:"ref" mapstructure:"ref"`
Database string `json:"database" mapstructure:"database"`
Table string `json:"table" mapstructure:"table"`
SQL string `json:"sql" mapstructure:"sql"`
Keys datasource.Keys `json:"keys" mapstructure:"keys"`
}
func (d *Doris) InitClient() error {
@@ -72,7 +66,7 @@ func (d *Doris) Validate(ctx context.Context) error {
func (d *Doris) Equal(p datasource.Datasource) bool {
newest, ok := p.(*Doris)
if !ok {
logger.Errorf("unexpected plugin type, expected is doris")
logger.Errorf("unexpected plugin type, expected is ck")
return false
}
@@ -180,14 +174,6 @@ func (d *Doris) QueryLog(ctx context.Context, query interface{}) ([]interface{},
return nil, 0, err
}
if strings.Contains(dorisQueryParam.SQL, "$__") {
var err error
dorisQueryParam.SQL, err = macros.Macro(dorisQueryParam.SQL, dorisQueryParam.From, dorisQueryParam.To)
if err != nil {
return nil, 0, err
}
}
items, err := d.QueryLogs(ctx, &doris.QueryParam{
Database: dorisQueryParam.Database,
Sql: dorisQueryParam.SQL,
@@ -201,7 +187,7 @@ func (d *Doris) QueryLog(ctx context.Context, query interface{}) ([]interface{},
logs = append(logs, items[i])
}
return logs, int64(len(logs)), nil
return logs, 0, nil
}
func (d *Doris) DescribeTable(ctx context.Context, query interface{}) ([]*types.ColumnProperty, error) {

View File

@@ -956,7 +956,7 @@ CREATE TABLE notify_rule (
id bigserial PRIMARY KEY,
name varchar(255) NOT NULL,
description text,
enable boolean DEFAULT false,
enable smallint NOT NULL DEFAULT 0,
user_group_ids varchar(255) NOT NULL DEFAULT '',
notify_configs text,
pipeline_configs text,
@@ -971,7 +971,7 @@ CREATE TABLE notify_channel (
name varchar(255) NOT NULL,
ident varchar(255) NOT NULL,
description text,
enable boolean DEFAULT false,
enable smallint NOT NULL DEFAULT 0,
param_config text,
request_type varchar(50) NOT NULL,
request_config text,

View File

@@ -22,8 +22,6 @@ import (
var FromAPIHook func()
var DatasourceProcessHook func(items []datasource.DatasourceInfo) []datasource.DatasourceInfo
func Init(ctx *ctx.Context, fromAPI bool) {
go getDatasourcesFromDBLoop(ctx, fromAPI)
}
@@ -102,10 +100,6 @@ func getDatasourcesFromDBLoop(ctx *ctx.Context, fromAPI bool) {
atomic.StoreInt64(&PromDefaultDatasourceId, 0)
}
if DatasourceProcessHook != nil {
dss = DatasourceProcessHook(dss)
}
PutDatasources(dss)
} else {
FromAPIHook()

View File

@@ -20,8 +20,8 @@ import (
// Doris struct to hold connection details and the connection object
type Doris struct {
Addr string `json:"doris.addr" mapstructure:"doris.addr"` // fe mysql endpoint
FeAddr string `json:"doris.fe_addr" mapstructure:"doris.fe_addr"` // fe http endpoint
Addr string `json:"doris.addr" mapstructure:"doris.addr"` // be node
FeAddr string `json:"doris.fe_addr" mapstructure:"doris.fe_addr"` // fe node
User string `json:"doris.user" mapstructure:"doris.user"` //
Password string `json:"doris.password" mapstructure:"doris.password"` //
Timeout int `json:"doris.timeout" mapstructure:"doris.timeout"`

View File

@@ -240,17 +240,17 @@ func (ncc *NotifyChannelCacheType) startHttpChannel(chID int64, channel *models.
go ncc.startNotifyConsumer(chID, queue, quitCh)
}
logger.Debugf("started %d notify consumers for channel %d", concurrency, chID)
logger.Infof("started %d notify consumers for channel %d", concurrency, chID)
}
// 启动通知消费者协程
func (ncc *NotifyChannelCacheType) startNotifyConsumer(channelID int64, queue *list.SafeListLimited, quitCh chan struct{}) {
logger.Debugf("starting notify consumer for channel %d", channelID)
logger.Infof("starting notify consumer for channel %d", channelID)
for {
select {
case <-quitCh:
logger.Debugf("notify consumer for channel %d stopped", channelID)
logger.Infof("notify consumer for channel %d stopped", channelID)
return
default:
// 从队列中取出任务
@@ -502,11 +502,7 @@ func (ncc *NotifyChannelCacheType) startEmailSender(chID int64, smtp *models.SMT
m.Mail.GetHeader("Subject"), m.Mail.GetHeader("To"))
}
// 记录通知详情
if ncc.notifyRecordFunc != nil {
target := strings.Join(m.Mail.GetHeader("To"), ",")
ncc.notifyRecordFunc(ncc.ctx, m.Events, m.NotifyRuleId, "Email", target, "success", err)
}
// sender.NotifyRecord(ncc.ctx, m.Events, m.NotifyRuleId, models.Email, strings.Join(m.Mail.GetHeader("To"), ","), "", err)
size++
if size >= conf.Batch {

View File

@@ -116,18 +116,7 @@ func (s *AlertSubscribe) Verify() error {
return errors.New("severities is required")
}
if s.NotifyVersion == 1 {
if len(s.NotifyRuleIds) == 0 {
return errors.New("no notify rules selected")
}
s.UserGroupIds = ""
s.RedefineChannels = 0
s.NewChannels = ""
s.RedefineWebhooks = 0
s.Webhooks = ""
s.RedefineSeverity = 0
s.NewSeverity = 0
if len(s.NotifyRuleIds) > 0 {
return nil
}
@@ -143,8 +132,8 @@ func (s *AlertSubscribe) Verify() error {
}
}
if s.NotifyVersion == 0 {
s.NotifyRuleIds = []int64{}
if s.NotifyVersion == 1 && len(s.NotifyRuleIds) == 0 {
return errors.New("no notify rules selected")
}
return nil
@@ -392,17 +381,6 @@ func (s *AlertSubscribe) MatchProd(prod string) bool {
return s.Prod == prod
}
func (s *AlertSubscribe) MatchCate(cate string) bool {
if s.Cate == "" {
return true
}
if s.Cate == "host" {
return cate == "host"
}
return true
}
func (s *AlertSubscribe) MatchCluster(dsId int64) bool {
// 没有配置数据源, 或者事件不需要关联数据源
// do not match any datasource or event not related to datasource

View File

@@ -72,10 +72,8 @@ func MigrateTables(db *gorm.DB) error {
if isPostgres(db) {
dts = append(dts, &models.PostgresBuiltinComponent{})
DropUniqueFiledLimit(db, &models.PostgresBuiltinComponent{}, "idx_ident", "idx_ident")
} else {
dts = append(dts, &models.BuiltinComponent{})
DropUniqueFiledLimit(db, &models.BuiltinComponent{}, "idx_ident", "idx_ident")
}
if !db.Migrator().HasColumn(&imodels.TaskSchedulerHealth{}, "scheduler") {
@@ -126,17 +124,11 @@ func MigrateTables(db *gorm.DB) error {
DropUniqueFiledLimit(db, &Configs{}, "ckey", "configs_ckey_key")
// 删除 builtin_metrics 表的 idx_collector_typ_name 唯一索引
DropUniqueFiledLimit(db, &models.BuiltinMetric{}, "idx_collector_typ_name", "idx_collector_typ_name")
InsertPermPoints(db)
return nil
}
func DropUniqueFiledLimit(db *gorm.DB, dst interface{}, uniqueFiled string, pgUniqueFiled string) { // UNIQUE KEY (`ckey`)
// 先检查表是否存在,如果不存在则直接返回
if !db.Migrator().HasTable(dst) {
return
}
if db.Migrator().HasIndex(dst, uniqueFiled) {
err := db.Migrator().DropIndex(dst, uniqueFiled) //mysql DROP INDEX
if err != nil {

View File

@@ -922,6 +922,11 @@ func (ncc *NotifyChannelConfig) ValidateFlashDutyRequestConfig() error {
}
func (ncc *NotifyChannelConfig) Update(ctx *ctx.Context, ref NotifyChannelConfig) error {
// ref.FE2DB()
if ncc.Ident != ref.Ident {
return errors.New("cannot update ident")
}
ref.ID = ncc.ID
ref.CreateAt = ncc.CreateAt
ref.CreateBy = ncc.CreateBy

View File

@@ -8,11 +8,11 @@ import (
type UserToken struct {
Id int64 `json:"id" gorm:"primaryKey"`
Username string `json:"username" gorm:"type:varchar(255); not null; default ''"`
TokenName string `json:"token_name" gorm:"type:varchar(255); not null; default ''"`
Token string `json:"token" gorm:"type:varchar(255); not null; default ''"`
CreateAt int64 `json:"create_at" gorm:"type:bigint; not null; default 0"`
LastUsed int64 `json:"last_used" gorm:"type:bigint; not null; default 0"`
Username string `json:"username" gorm:"type:varchar(255) not null default ''"`
TokenName string `json:"token_name" gorm:"type:varchar(255) not null default ''"`
Token string `json:"token" gorm:"type:varchar(255) not null default ''"`
CreateAt int64 `json:"create_at" gorm:"type:bigint not null default 0"`
LastUsed int64 `json:"last_used" gorm:"type:bigint not null default 0"`
}
func (UserToken) TableName() string {