mirror of
https://github.com/ccfos/nightingale.git
synced 2026-03-02 22:19:10 +00:00
Compare commits
61 Commits
fix-exec-s
...
v8.3.1
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
21478fcf3d | ||
|
|
a87c856299 | ||
|
|
ba035a446d | ||
|
|
bf840e6bb2 | ||
|
|
cd01092aed | ||
|
|
e202fd50c8 | ||
|
|
f0e5062485 | ||
|
|
861fe96de5 | ||
|
|
5b66ada96d | ||
|
|
d5a98debff | ||
|
|
4977052a67 | ||
|
|
dcc461e587 | ||
|
|
f5ce1733bb | ||
|
|
436cf25409 | ||
|
|
038f68b0b7 | ||
|
|
96ef1895b7 | ||
|
|
eeaa7b46f1 | ||
|
|
dc525352f1 | ||
|
|
98a3fe9375 | ||
|
|
74b0f802ec | ||
|
|
85bd3148d5 | ||
|
|
0931fa9603 | ||
|
|
65cdb2da9e | ||
|
|
9ad6514af6 | ||
|
|
302c6549e4 | ||
|
|
a3122270e6 | ||
|
|
1245c453bb | ||
|
|
9c5ccf0c8f | ||
|
|
cd468af250 | ||
|
|
2d3449c0ec | ||
|
|
e15bdbce92 | ||
|
|
3890243d42 | ||
|
|
37fb4ee867 | ||
|
|
6db63eafc1 | ||
|
|
1e9cbfc316 | ||
|
|
4f95554fe3 | ||
|
|
8eba9aa92f | ||
|
|
6ba74b8e21 | ||
|
|
8ea4632681 | ||
|
|
f958f27de1 | ||
|
|
1bdfa3e032 | ||
|
|
143880cd46 | ||
|
|
38f0b4f1bb | ||
|
|
2bccd5be99 | ||
|
|
7b328b3eaa | ||
|
|
8bd5b90e94 | ||
|
|
96629e284f | ||
|
|
67d2875690 | ||
|
|
238895a1f8 | ||
|
|
fb341b645d | ||
|
|
2d84fd8cf3 | ||
|
|
2611f87c41 | ||
|
|
a5b7aa7a26 | ||
|
|
0714a0f8f1 | ||
|
|
063cc750e1 | ||
|
|
b2a912d72f | ||
|
|
4ba745f442 | ||
|
|
fa7d46ecad | ||
|
|
a5a43df44f | ||
|
|
fbf1d68b84 | ||
|
|
ca712f62a4 |
3
.gitignore
vendored
3
.gitignore
vendored
@@ -58,6 +58,9 @@ _test
|
||||
.idea
|
||||
.index
|
||||
.vscode
|
||||
.issue
|
||||
.cursor
|
||||
.claude
|
||||
.DS_Store
|
||||
.cache-loader
|
||||
.payload
|
||||
|
||||
@@ -595,6 +595,10 @@ func (e *Dispatch) handleSub(sub *models.AlertSubscribe, event models.AlertCurEv
|
||||
return
|
||||
}
|
||||
|
||||
if !sub.MatchCate(event.Cate) {
|
||||
return
|
||||
}
|
||||
|
||||
if !common.MatchTags(event.TagsMap, sub.ITags) {
|
||||
return
|
||||
}
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"strings"
|
||||
"text/template"
|
||||
"time"
|
||||
@@ -143,7 +144,11 @@ func (c *AISummaryConfig) generateAISummary(eventInfo string) (string, error) {
|
||||
|
||||
// 合并自定义参数
|
||||
for k, v := range c.CustomParams {
|
||||
reqParams[k] = v
|
||||
converted, err := convertCustomParam(v)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to convert custom param %s: %v", k, err)
|
||||
}
|
||||
reqParams[k] = converted
|
||||
}
|
||||
|
||||
// 序列化请求体
|
||||
@@ -196,3 +201,44 @@ func (c *AISummaryConfig) generateAISummary(eventInfo string) (string, error) {
|
||||
|
||||
return chatResp.Choices[0].Message.Content, nil
|
||||
}
|
||||
|
||||
// convertCustomParam 将前端传入的参数转换为正确的类型
|
||||
func convertCustomParam(value interface{}) (interface{}, error) {
|
||||
if value == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// 如果是字符串,尝试转换为其他类型
|
||||
if str, ok := value.(string); ok {
|
||||
// 尝试转换为数字
|
||||
if f, err := strconv.ParseFloat(str, 64); err == nil {
|
||||
// 检查是否为整数
|
||||
if f == float64(int64(f)) {
|
||||
return int64(f), nil
|
||||
}
|
||||
return f, nil
|
||||
}
|
||||
|
||||
// 尝试转换为布尔值
|
||||
if b, err := strconv.ParseBool(str); err == nil {
|
||||
return b, nil
|
||||
}
|
||||
|
||||
// 尝试解析为JSON数组
|
||||
if strings.HasPrefix(strings.TrimSpace(str), "[") {
|
||||
var arr []interface{}
|
||||
if err := json.Unmarshal([]byte(str), &arr); err == nil {
|
||||
return arr, nil
|
||||
}
|
||||
}
|
||||
|
||||
// 尝试解析为JSON对象
|
||||
if strings.HasPrefix(strings.TrimSpace(str), "{") {
|
||||
var obj map[string]interface{}
|
||||
if err := json.Unmarshal([]byte(str), &obj); err == nil {
|
||||
return obj, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
return value, nil
|
||||
}
|
||||
|
||||
@@ -67,3 +67,73 @@ func TestAISummaryConfig_Process(t *testing.T) {
|
||||
t.Logf("原始注释: %v", result.AnnotationsJSON["description"])
|
||||
t.Logf("AI总结: %s", result.AnnotationsJSON["ai_summary"])
|
||||
}
|
||||
|
||||
func TestConvertCustomParam(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
input interface{}
|
||||
expected interface{}
|
||||
hasError bool
|
||||
}{
|
||||
{
|
||||
name: "nil value",
|
||||
input: nil,
|
||||
expected: nil,
|
||||
hasError: false,
|
||||
},
|
||||
{
|
||||
name: "string number to int64",
|
||||
input: "123",
|
||||
expected: int64(123),
|
||||
hasError: false,
|
||||
},
|
||||
{
|
||||
name: "string float to float64",
|
||||
input: "123.45",
|
||||
expected: 123.45,
|
||||
hasError: false,
|
||||
},
|
||||
{
|
||||
name: "string boolean to bool",
|
||||
input: "true",
|
||||
expected: true,
|
||||
hasError: false,
|
||||
},
|
||||
{
|
||||
name: "string false to bool",
|
||||
input: "false",
|
||||
expected: false,
|
||||
hasError: false,
|
||||
},
|
||||
{
|
||||
name: "JSON array string to slice",
|
||||
input: `["a", "b", "c"]`,
|
||||
expected: []interface{}{"a", "b", "c"},
|
||||
hasError: false,
|
||||
},
|
||||
{
|
||||
name: "JSON object string to map",
|
||||
input: `{"key": "value", "num": 123}`,
|
||||
expected: map[string]interface{}{"key": "value", "num": float64(123)},
|
||||
hasError: false,
|
||||
},
|
||||
{
|
||||
name: "plain string remains string",
|
||||
input: "hello world",
|
||||
expected: "hello world",
|
||||
hasError: false,
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
converted, err := convertCustomParam(test.input)
|
||||
if test.hasError {
|
||||
assert.Error(t, err)
|
||||
return
|
||||
}
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, test.expected, converted)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -141,7 +141,7 @@ func updateSmtp(ctx *ctx.Context, ncc *memsto.NotifyConfigCacheType) {
|
||||
func startEmailSender(ctx *ctx.Context, smtp aconf.SMTPConfig) {
|
||||
conf := smtp
|
||||
if conf.Host == "" || conf.Port == 0 {
|
||||
logger.Warning("SMTP configurations invalid")
|
||||
logger.Debug("SMTP configurations invalid")
|
||||
<-mailQuit
|
||||
return
|
||||
}
|
||||
|
||||
@@ -187,7 +187,7 @@ func Init(ctx *ctx.Context, builtinIntegrationsDir string) {
|
||||
CreatedBy: SYSTEM,
|
||||
UpdatedBy: SYSTEM,
|
||||
}
|
||||
BuiltinPayloadInFile.addBuiltinPayload(&builtinAlert)
|
||||
BuiltinPayloadInFile.AddBuiltinPayload(&builtinAlert)
|
||||
|
||||
}
|
||||
}
|
||||
@@ -245,7 +245,7 @@ func Init(ctx *ctx.Context, builtinIntegrationsDir string) {
|
||||
CreatedBy: SYSTEM,
|
||||
UpdatedBy: SYSTEM,
|
||||
}
|
||||
BuiltinPayloadInFile.addBuiltinPayload(&builtinDashboard)
|
||||
BuiltinPayloadInFile.AddBuiltinPayload(&builtinDashboard)
|
||||
}
|
||||
} else if err != nil {
|
||||
logger.Warningf("read builtin component dash dir fail %s %v", component.Ident, err)
|
||||
@@ -314,7 +314,7 @@ func NewBuiltinPayloadInFileType() *BuiltinPayloadInFileType {
|
||||
}
|
||||
}
|
||||
|
||||
func (b *BuiltinPayloadInFileType) addBuiltinPayload(bp *models.BuiltinPayload) {
|
||||
func (b *BuiltinPayloadInFileType) AddBuiltinPayload(bp *models.BuiltinPayload) {
|
||||
if _, exists := b.Data[bp.ComponentID]; !exists {
|
||||
b.Data[bp.ComponentID] = make(map[string]map[string][]*models.BuiltinPayload)
|
||||
}
|
||||
@@ -390,9 +390,10 @@ func filterByQuery(payloads []*models.BuiltinPayload, query string) []*models.Bu
|
||||
return payloads
|
||||
}
|
||||
|
||||
queryLower := strings.ToLower(query)
|
||||
var filtered []*models.BuiltinPayload
|
||||
for _, p := range payloads {
|
||||
if strings.Contains(p.Name, query) || strings.Contains(p.Tags, query) {
|
||||
if strings.Contains(strings.ToLower(p.Name), queryLower) || strings.Contains(strings.ToLower(p.Tags), queryLower) {
|
||||
filtered = append(filtered, p)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -290,6 +290,15 @@ func (rt *Router) alertRuleAddByImport(c *gin.Context) {
|
||||
models.DataSourceQueryAll,
|
||||
}
|
||||
}
|
||||
|
||||
// 将导入的规则统一转为新版本的通知规则配置
|
||||
lst[i].NotifyVersion = 1
|
||||
lst[i].NotifyChannelsJSON = []string{}
|
||||
lst[i].NotifyGroupsJSON = []string{}
|
||||
lst[i].NotifyChannels = ""
|
||||
lst[i].NotifyGroups = ""
|
||||
lst[i].Callbacks = ""
|
||||
lst[i].CallbacksJSON = []string{}
|
||||
}
|
||||
|
||||
bgid := ginx.UrlParamInt64(c, "id")
|
||||
@@ -308,19 +317,52 @@ func (rt *Router) alertRuleAddByImportPromRule(c *gin.Context) {
|
||||
var f promRuleForm
|
||||
ginx.Dangerous(c.BindJSON(&f))
|
||||
|
||||
// 首先尝试解析带 groups 的格式
|
||||
var pr struct {
|
||||
Groups []models.PromRuleGroup `yaml:"groups"`
|
||||
}
|
||||
err := yaml.Unmarshal([]byte(f.Payload), &pr)
|
||||
if err != nil {
|
||||
ginx.Bomb(http.StatusBadRequest, "invalid yaml format, please use the example format. err: %v", err)
|
||||
|
||||
var groups []models.PromRuleGroup
|
||||
|
||||
if err != nil || len(pr.Groups) == 0 {
|
||||
// 如果解析失败或没有 groups,尝试解析规则数组格式
|
||||
var rules []models.PromRule
|
||||
err = yaml.Unmarshal([]byte(f.Payload), &rules)
|
||||
if err != nil {
|
||||
// 最后尝试解析单个规则格式
|
||||
var singleRule models.PromRule
|
||||
err = yaml.Unmarshal([]byte(f.Payload), &singleRule)
|
||||
if err != nil {
|
||||
ginx.Bomb(http.StatusBadRequest, "invalid yaml format. err: %v", err)
|
||||
}
|
||||
|
||||
// 验证单个规则是否有效
|
||||
if singleRule.Alert == "" && singleRule.Record == "" {
|
||||
ginx.Bomb(http.StatusBadRequest, "input yaml is empty or invalid")
|
||||
}
|
||||
|
||||
rules = []models.PromRule{singleRule}
|
||||
}
|
||||
|
||||
// 验证规则数组是否为空
|
||||
if len(rules) == 0 {
|
||||
ginx.Bomb(http.StatusBadRequest, "input yaml contains no rules")
|
||||
}
|
||||
|
||||
// 将规则数组包装成 group
|
||||
groups = []models.PromRuleGroup{
|
||||
{
|
||||
Name: "imported_rules",
|
||||
Rules: rules,
|
||||
},
|
||||
}
|
||||
} else {
|
||||
// 使用已解析的 groups
|
||||
groups = pr.Groups
|
||||
}
|
||||
|
||||
if len(pr.Groups) == 0 {
|
||||
ginx.Bomb(http.StatusBadRequest, "input yaml is empty")
|
||||
}
|
||||
|
||||
lst := models.DealPromGroup(pr.Groups, f.DatasourceQueries, f.Disabled)
|
||||
lst := models.DealPromGroup(groups, f.DatasourceQueries, f.Disabled)
|
||||
username := c.MustGet("username").(string)
|
||||
bgid := ginx.UrlParamInt64(c, "id")
|
||||
ginx.NewRender(c).Data(rt.alertRuleAdd(lst, username, bgid, c.GetHeader("X-Language")), nil)
|
||||
@@ -465,8 +507,8 @@ func (rt *Router) alertRulePutFields(c *gin.Context) {
|
||||
ginx.Bomb(http.StatusBadRequest, "fields empty")
|
||||
}
|
||||
|
||||
f.Fields["update_by"] = c.MustGet("username").(string)
|
||||
f.Fields["update_at"] = time.Now().Unix()
|
||||
updateBy := c.MustGet("username").(string)
|
||||
updateAt := time.Now().Unix()
|
||||
|
||||
for i := 0; i < len(f.Ids); i++ {
|
||||
ar, err := models.AlertRuleGetById(rt.Ctx, f.Ids[i])
|
||||
@@ -483,7 +525,6 @@ func (rt *Router) alertRulePutFields(c *gin.Context) {
|
||||
b, err := json.Marshal(originRule)
|
||||
ginx.Dangerous(err)
|
||||
ginx.Dangerous(ar.UpdateFieldsMap(rt.Ctx, map[string]interface{}{"rule_config": string(b)}))
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
@@ -496,7 +537,6 @@ func (rt *Router) alertRulePutFields(c *gin.Context) {
|
||||
b, err := json.Marshal(ar.AnnotationsJSON)
|
||||
ginx.Dangerous(err)
|
||||
ginx.Dangerous(ar.UpdateFieldsMap(rt.Ctx, map[string]interface{}{"annotations": string(b)}))
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
@@ -509,7 +549,6 @@ func (rt *Router) alertRulePutFields(c *gin.Context) {
|
||||
b, err := json.Marshal(ar.AnnotationsJSON)
|
||||
ginx.Dangerous(err)
|
||||
ginx.Dangerous(ar.UpdateFieldsMap(rt.Ctx, map[string]interface{}{"annotations": string(b)}))
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
@@ -519,7 +558,6 @@ func (rt *Router) alertRulePutFields(c *gin.Context) {
|
||||
callback := callbacks.(string)
|
||||
if !strings.Contains(ar.Callbacks, callback) {
|
||||
ginx.Dangerous(ar.UpdateFieldsMap(rt.Ctx, map[string]interface{}{"callbacks": ar.Callbacks + " " + callback}))
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -529,7 +567,6 @@ func (rt *Router) alertRulePutFields(c *gin.Context) {
|
||||
if callbacks, has := f.Fields["callbacks"]; has {
|
||||
callback := callbacks.(string)
|
||||
ginx.Dangerous(ar.UpdateFieldsMap(rt.Ctx, map[string]interface{}{"callbacks": strings.ReplaceAll(ar.Callbacks, callback, "")}))
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
@@ -539,7 +576,6 @@ func (rt *Router) alertRulePutFields(c *gin.Context) {
|
||||
bytes, err := json.Marshal(datasourceQueries)
|
||||
ginx.Dangerous(err)
|
||||
ginx.Dangerous(ar.UpdateFieldsMap(rt.Ctx, map[string]interface{}{"datasource_queries": bytes}))
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
@@ -555,6 +591,12 @@ func (rt *Router) alertRulePutFields(c *gin.Context) {
|
||||
ginx.Dangerous(ar.UpdateColumn(rt.Ctx, k, v))
|
||||
}
|
||||
}
|
||||
|
||||
// 统一更新更新时间和更新人,只有更新时间变了,告警规则才会被引擎拉取
|
||||
ginx.Dangerous(ar.UpdateFieldsMap(rt.Ctx, map[string]interface{}{
|
||||
"update_by": updateBy,
|
||||
"update_at": updateAt,
|
||||
}))
|
||||
}
|
||||
|
||||
ginx.NewRender(c).Message(nil)
|
||||
|
||||
@@ -288,6 +288,7 @@ func (rt *Router) alertSubscribePut(c *gin.Context) {
|
||||
"busi_groups",
|
||||
"note",
|
||||
"notify_rule_ids",
|
||||
"notify_version",
|
||||
))
|
||||
}
|
||||
|
||||
|
||||
@@ -14,6 +14,7 @@ import (
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/toolkits/pkg/ginx"
|
||||
"github.com/toolkits/pkg/i18n"
|
||||
"github.com/toolkits/pkg/logger"
|
||||
)
|
||||
|
||||
@@ -102,7 +103,7 @@ func (rt *Router) datasourceUpsert(c *gin.Context) {
|
||||
|
||||
if !req.ForceSave {
|
||||
if req.PluginType == models.PROMETHEUS || req.PluginType == models.LOKI || req.PluginType == models.TDENGINE {
|
||||
err = DatasourceCheck(req)
|
||||
err = DatasourceCheck(c, req)
|
||||
if err != nil {
|
||||
Dangerous(c, err)
|
||||
return
|
||||
@@ -173,7 +174,7 @@ func (rt *Router) datasourceUpsert(c *gin.Context) {
|
||||
Render(c, nil, err)
|
||||
}
|
||||
|
||||
func DatasourceCheck(ds models.Datasource) error {
|
||||
func DatasourceCheck(c *gin.Context, ds models.Datasource) error {
|
||||
if ds.PluginType == models.PROMETHEUS || ds.PluginType == models.LOKI || ds.PluginType == models.TDENGINE {
|
||||
if ds.HTTPJson.Url == "" {
|
||||
return fmt.Errorf("url is empty")
|
||||
@@ -232,6 +233,10 @@ func DatasourceCheck(ds models.Datasource) error {
|
||||
req, err = http.NewRequest("GET", fullURL, nil)
|
||||
if err != nil {
|
||||
logger.Errorf("Error creating request: %v", err)
|
||||
if !strings.Contains(ds.HTTPJson.Url, "/loki") {
|
||||
lang := c.GetHeader("X-Language")
|
||||
return fmt.Errorf(i18n.Sprintf(lang, "/loki suffix is miss, please add /loki to the url: %s", ds.HTTPJson.Url+"/loki"))
|
||||
}
|
||||
return fmt.Errorf("request url:%s failed: %v", fullURL, err)
|
||||
}
|
||||
}
|
||||
@@ -253,6 +258,10 @@ func DatasourceCheck(ds models.Datasource) error {
|
||||
|
||||
if resp.StatusCode != 200 {
|
||||
logger.Errorf("Error making request: %v\n", resp.StatusCode)
|
||||
if resp.StatusCode == 404 && ds.PluginType == models.LOKI && !strings.Contains(ds.HTTPJson.Url, "/loki") {
|
||||
lang := c.GetHeader("X-Language")
|
||||
return fmt.Errorf(i18n.Sprintf(lang, "/loki suffix is miss, please add /loki to the url: %s", ds.HTTPJson.Url+"/loki"))
|
||||
}
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
return fmt.Errorf("request url:%s failed code:%d body:%s", fullURL, resp.StatusCode, string(body))
|
||||
}
|
||||
|
||||
@@ -161,6 +161,9 @@ func (rt *Router) notifyTest(c *gin.Context) {
|
||||
}
|
||||
|
||||
resp, err := SendNotifyChannelMessage(rt.Ctx, rt.UserCache, rt.UserGroupCache, f.NotifyConfig, events)
|
||||
if resp == "" {
|
||||
resp = "success"
|
||||
}
|
||||
ginx.NewRender(c).Data(resp, err)
|
||||
}
|
||||
|
||||
|
||||
@@ -149,6 +149,12 @@ func (rt *Router) recordingRulePutFields(c *gin.Context) {
|
||||
f.Fields["datasource_queries"] = string(bytes)
|
||||
}
|
||||
|
||||
if datasourceIds, ok := f.Fields["datasource_ids"]; ok {
|
||||
bytes, err := json.Marshal(datasourceIds)
|
||||
ginx.Dangerous(err)
|
||||
f.Fields["datasource_ids"] = string(bytes)
|
||||
}
|
||||
|
||||
for i := 0; i < len(f.Ids); i++ {
|
||||
ar, err := models.RecordingRuleGetById(rt.Ctx, f.Ids[i])
|
||||
ginx.Dangerous(err)
|
||||
|
||||
@@ -271,7 +271,10 @@ func MakeLogQuery(ctx context.Context, query interface{}, eventTags []string, st
|
||||
}
|
||||
|
||||
for i := 0; i < len(eventTags); i++ {
|
||||
eventTags[i] = strings.Replace(eventTags[i], "=", ":", 1)
|
||||
arr := strings.SplitN(eventTags[i], "=", 2)
|
||||
if len(arr) == 2 {
|
||||
eventTags[i] = fmt.Sprintf("%s:%s", arr[0], strconv.Quote(arr[1]))
|
||||
}
|
||||
}
|
||||
|
||||
if len(eventTags) > 0 {
|
||||
@@ -295,7 +298,10 @@ func MakeTSQuery(ctx context.Context, query interface{}, eventTags []string, sta
|
||||
}
|
||||
|
||||
for i := 0; i < len(eventTags); i++ {
|
||||
eventTags[i] = strings.Replace(eventTags[i], "=", ":", 1)
|
||||
arr := strings.SplitN(eventTags[i], "=", 2)
|
||||
if len(arr) == 2 {
|
||||
eventTags[i] = fmt.Sprintf("%s:%s", arr[0], strconv.Quote(arr[1]))
|
||||
}
|
||||
}
|
||||
|
||||
if len(eventTags) > 0 {
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"github.com/ccfos/nightingale/v6/datasource"
|
||||
"github.com/ccfos/nightingale/v6/dskit/doris"
|
||||
"github.com/ccfos/nightingale/v6/dskit/types"
|
||||
"github.com/ccfos/nightingale/v6/pkg/macros"
|
||||
"github.com/ccfos/nightingale/v6/models"
|
||||
|
||||
"github.com/mitchellh/mapstructure"
|
||||
@@ -27,11 +28,16 @@ type Doris struct {
|
||||
}
|
||||
|
||||
type QueryParam struct {
|
||||
Ref string `json:"ref" mapstructure:"ref"`
|
||||
Database string `json:"database" mapstructure:"database"`
|
||||
Table string `json:"table" mapstructure:"table"`
|
||||
SQL string `json:"sql" mapstructure:"sql"`
|
||||
Keys datasource.Keys `json:"keys" mapstructure:"keys"`
|
||||
Ref string `json:"ref" mapstructure:"ref"`
|
||||
Database string `json:"database" mapstructure:"database"`
|
||||
Table string `json:"table" mapstructure:"table"`
|
||||
SQL string `json:"sql" mapstructure:"sql"`
|
||||
Keys datasource.Keys `json:"keys" mapstructure:"keys"`
|
||||
Limit int `json:"limit" mapstructure:"limit"`
|
||||
From int64 `json:"from" mapstructure:"from"`
|
||||
To int64 `json:"to" mapstructure:"to"`
|
||||
TimeField string `json:"time_field" mapstructure:"time_field"`
|
||||
TimeFormat string `json:"time_format" mapstructure:"time_format"`
|
||||
}
|
||||
|
||||
func (d *Doris) InitClient() error {
|
||||
@@ -66,7 +72,7 @@ func (d *Doris) Validate(ctx context.Context) error {
|
||||
func (d *Doris) Equal(p datasource.Datasource) bool {
|
||||
newest, ok := p.(*Doris)
|
||||
if !ok {
|
||||
logger.Errorf("unexpected plugin type, expected is ck")
|
||||
logger.Errorf("unexpected plugin type, expected is doris")
|
||||
return false
|
||||
}
|
||||
|
||||
@@ -174,6 +180,14 @@ func (d *Doris) QueryLog(ctx context.Context, query interface{}) ([]interface{},
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
if strings.Contains(dorisQueryParam.SQL, "$__") {
|
||||
var err error
|
||||
dorisQueryParam.SQL, err = macros.Macro(dorisQueryParam.SQL, dorisQueryParam.From, dorisQueryParam.To)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
}
|
||||
|
||||
items, err := d.QueryLogs(ctx, &doris.QueryParam{
|
||||
Database: dorisQueryParam.Database,
|
||||
Sql: dorisQueryParam.SQL,
|
||||
@@ -187,7 +201,7 @@ func (d *Doris) QueryLog(ctx context.Context, query interface{}) ([]interface{},
|
||||
logs = append(logs, items[i])
|
||||
}
|
||||
|
||||
return logs, 0, nil
|
||||
return logs, int64(len(logs)), nil
|
||||
}
|
||||
|
||||
func (d *Doris) DescribeTable(ctx context.Context, query interface{}) ([]*types.ColumnProperty, error) {
|
||||
|
||||
@@ -23,7 +23,7 @@ const (
|
||||
)
|
||||
|
||||
var (
|
||||
regx = "(?i)from\\s+([a-zA-Z0-9_]+)\\.([a-zA-Z0-9_]+)\\.([a-zA-Z0-9_]+)"
|
||||
regx = `(?i)from\s+((?:"[^"]+"|[a-zA-Z0-9_]+))\.((?:"[^"]+"|[a-zA-Z0-9_]+))\.((?:"[^"]+"|[a-zA-Z0-9_]+))`
|
||||
)
|
||||
|
||||
func init() {
|
||||
@@ -162,6 +162,7 @@ func (p *PostgreSQL) QueryData(ctx context.Context, query interface{}) ([]models
|
||||
return nil, err
|
||||
}
|
||||
|
||||
postgresqlQueryParam.SQL = formatSQLDatabaseNameWithRegex(postgresqlQueryParam.SQL)
|
||||
if strings.Contains(postgresqlQueryParam.SQL, "$__") {
|
||||
var err error
|
||||
postgresqlQueryParam.SQL, err = macros.Macro(postgresqlQueryParam.SQL, postgresqlQueryParam.From, postgresqlQueryParam.To)
|
||||
@@ -229,6 +230,7 @@ func (p *PostgreSQL) QueryLog(ctx context.Context, query interface{}) ([]interfa
|
||||
p.Shards[0].DB = db
|
||||
}
|
||||
|
||||
postgresqlQueryParam.SQL = formatSQLDatabaseNameWithRegex(postgresqlQueryParam.SQL)
|
||||
if strings.Contains(postgresqlQueryParam.SQL, "$__") {
|
||||
var err error
|
||||
postgresqlQueryParam.SQL, err = macros.Macro(postgresqlQueryParam.SQL, postgresqlQueryParam.From, postgresqlQueryParam.To)
|
||||
@@ -280,7 +282,17 @@ func parseDBName(sql string) (db string, err error) {
|
||||
if len(matches) != 4 {
|
||||
return "", fmt.Errorf("no valid table name in format database.schema.table found")
|
||||
}
|
||||
return matches[1], nil
|
||||
return strings.Trim(matches[1], `"`), nil
|
||||
}
|
||||
|
||||
// formatSQLDatabaseNameWithRegex 只对 dbname.scheme.tabname 格式进行数据库名称格式化,转为 "dbname".scheme.tabname
|
||||
// 在pgsql中,大小写是通过"" 双引号括起来区分的,默认pg都是转为小写的,所以这里转为 "dbname".scheme."tabname"
|
||||
func formatSQLDatabaseNameWithRegex(sql string) string {
|
||||
// 匹配 from dbname.scheme.table_name 的模式
|
||||
// 使用捕获组来精确匹配数据库名称,确保后面跟着scheme和table
|
||||
re := regexp.MustCompile(`(?i)\bfrom\s+([a-zA-Z_][a-zA-Z0-9_]*)\s*\.\s*([a-zA-Z_][a-zA-Z0-9_]*)\s*\.\s*([a-zA-Z_][a-zA-Z0-9_]*)`)
|
||||
|
||||
return re.ReplaceAllString(sql, `from "$1"."$2"."$3"`)
|
||||
}
|
||||
|
||||
func extractColumns(sql string) ([]string, error) {
|
||||
|
||||
@@ -956,7 +956,7 @@ CREATE TABLE notify_rule (
|
||||
id bigserial PRIMARY KEY,
|
||||
name varchar(255) NOT NULL,
|
||||
description text,
|
||||
enable smallint NOT NULL DEFAULT 0,
|
||||
enable boolean DEFAULT false,
|
||||
user_group_ids varchar(255) NOT NULL DEFAULT '',
|
||||
notify_configs text,
|
||||
pipeline_configs text,
|
||||
@@ -971,7 +971,7 @@ CREATE TABLE notify_channel (
|
||||
name varchar(255) NOT NULL,
|
||||
ident varchar(255) NOT NULL,
|
||||
description text,
|
||||
enable smallint NOT NULL DEFAULT 0,
|
||||
enable boolean DEFAULT false,
|
||||
param_config text,
|
||||
request_type varchar(50) NOT NULL,
|
||||
request_config text,
|
||||
|
||||
@@ -20,8 +20,8 @@ import (
|
||||
|
||||
// Doris struct to hold connection details and the connection object
|
||||
type Doris struct {
|
||||
Addr string `json:"doris.addr" mapstructure:"doris.addr"` // be node
|
||||
FeAddr string `json:"doris.fe_addr" mapstructure:"doris.fe_addr"` // fe node
|
||||
Addr string `json:"doris.addr" mapstructure:"doris.addr"` // fe mysql endpoint
|
||||
FeAddr string `json:"doris.fe_addr" mapstructure:"doris.fe_addr"` // fe http endpoint
|
||||
User string `json:"doris.user" mapstructure:"doris.user"` //
|
||||
Password string `json:"doris.password" mapstructure:"doris.password"` //
|
||||
Timeout int `json:"doris.timeout" mapstructure:"doris.timeout"`
|
||||
|
||||
@@ -115,14 +115,14 @@ func (m *MySQL) NewConn(ctx context.Context, database string) (*gorm.DB, error)
|
||||
}()
|
||||
|
||||
dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8&parseTime=True", shard.User, shard.Password, shard.Addr, database)
|
||||
|
||||
return sqlbase.NewDB(
|
||||
db, err = sqlbase.NewDB(
|
||||
ctx,
|
||||
mysql.Open(dsn),
|
||||
shard.MaxIdleConns,
|
||||
shard.MaxOpenConns,
|
||||
time.Duration(shard.ConnMaxLifetime)*time.Second,
|
||||
)
|
||||
return db, err
|
||||
}
|
||||
|
||||
func (m *MySQL) ShowDatabases(ctx context.Context) ([]string, error) {
|
||||
|
||||
8
go.mod
8
go.mod
@@ -1,6 +1,6 @@
|
||||
module github.com/ccfos/nightingale/v6
|
||||
|
||||
go 1.22
|
||||
go 1.23.0
|
||||
|
||||
require (
|
||||
github.com/BurntSushi/toml v1.4.0
|
||||
@@ -13,7 +13,7 @@ require (
|
||||
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc
|
||||
github.com/dgrijalva/jwt-go v3.2.0+incompatible
|
||||
github.com/expr-lang/expr v1.16.1
|
||||
github.com/flashcatcloud/ibex v1.3.5
|
||||
github.com/flashcatcloud/ibex v1.3.6
|
||||
github.com/gin-contrib/pprof v1.4.0
|
||||
github.com/gin-gonic/gin v1.9.1
|
||||
github.com/glebarez/sqlite v1.11.0
|
||||
@@ -47,7 +47,7 @@ require (
|
||||
github.com/tidwall/gjson v1.14.2
|
||||
github.com/toolkits/pkg v1.3.8
|
||||
golang.org/x/exp v0.0.0-20231006140011-7918f672742d
|
||||
golang.org/x/oauth2 v0.23.0
|
||||
golang.org/x/oauth2 v0.27.0
|
||||
gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df
|
||||
gopkg.in/yaml.v2 v2.4.0
|
||||
gorm.io/driver/clickhouse v0.6.1
|
||||
@@ -160,3 +160,5 @@ require (
|
||||
)
|
||||
|
||||
replace golang.org/x/exp v0.0.0-20231006140011-7918f672742d => golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1
|
||||
|
||||
// replace github.com/flashcatcloud/ibex => ../github.com/flashcatcloud/ibex
|
||||
|
||||
8
go.sum
8
go.sum
@@ -89,8 +89,8 @@ github.com/fatih/camelcase v1.0.0 h1:hxNvNX/xYBp0ovncs8WyWZrOrpBNub/JfaMvbURyft8
|
||||
github.com/fatih/camelcase v1.0.0/go.mod h1:yN2Sb0lFhZJUdVvtELVWefmrXpuZESvPmqwoZc+/fpc=
|
||||
github.com/fatih/structs v1.1.0 h1:Q7juDM0QtcnhCpeyLGQKyg4TOIghuNXrkL32pHAUMxo=
|
||||
github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M=
|
||||
github.com/flashcatcloud/ibex v1.3.5 h1:8GOOf5+aJT0TP/MC6izz7CO5JKJSdKVFBwL0vQp93Nc=
|
||||
github.com/flashcatcloud/ibex v1.3.5/go.mod h1:T8hbMUySK2q6cXUaYp0AUVeKkU9Od2LjzwmB5lmTRBM=
|
||||
github.com/flashcatcloud/ibex v1.3.6 h1:lJShPFxcZksmkB0w99a3uROGB+Fie1NsqOlkAdar12A=
|
||||
github.com/flashcatcloud/ibex v1.3.6/go.mod h1:iTU1dKT9TnDNllRPRHUOjXe+HDTQkPH2TeaucHtSuh4=
|
||||
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
|
||||
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
|
||||
github.com/gabriel-vasile/mimetype v1.4.2 h1:w5qFW6JKBz9Y393Y4q372O9A7cUSequkh1Q7OhCmWKU=
|
||||
@@ -416,8 +416,8 @@ golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc=
|
||||
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
|
||||
golang.org/x/net v0.34.0 h1:Mb7Mrk043xzHgnRM88suvJFwzVrRfHEHJEl5/71CKw0=
|
||||
golang.org/x/net v0.34.0/go.mod h1:di0qlW3YNM5oh6GqDGQr92MyTozJPmybPK4Ev/Gm31k=
|
||||
golang.org/x/oauth2 v0.23.0 h1:PbgcYx2W7i4LvjJWEbf0ngHV6qJYr86PkAV3bXdLEbs=
|
||||
golang.org/x/oauth2 v0.23.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI=
|
||||
golang.org/x/oauth2 v0.27.0 h1:da9Vo7/tDv5RH/7nZDz1eMGS/q1Vv1N/7FCrBhI9I3M=
|
||||
golang.org/x/oauth2 v0.27.0/go.mod h1:onh5ek6nERTohokkhCD/y2cV4Do3fxFHFuAejCkRWT8=
|
||||
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
1873
integrations/Java/dashboards/jmx_by_kubernetes.json
Normal file
1873
integrations/Java/dashboards/jmx_by_kubernetes.json
Normal file
File diff suppressed because it is too large
Load Diff
2024
integrations/MinIO/dashboards/new-version.json
Normal file
2024
integrations/MinIO/dashboards/new-version.json
Normal file
File diff suppressed because it is too large
Load Diff
@@ -240,17 +240,17 @@ func (ncc *NotifyChannelCacheType) startHttpChannel(chID int64, channel *models.
|
||||
go ncc.startNotifyConsumer(chID, queue, quitCh)
|
||||
}
|
||||
|
||||
logger.Infof("started %d notify consumers for channel %d", concurrency, chID)
|
||||
logger.Debugf("started %d notify consumers for channel %d", concurrency, chID)
|
||||
}
|
||||
|
||||
// 启动通知消费者协程
|
||||
func (ncc *NotifyChannelCacheType) startNotifyConsumer(channelID int64, queue *list.SafeListLimited, quitCh chan struct{}) {
|
||||
logger.Infof("starting notify consumer for channel %d", channelID)
|
||||
logger.Debugf("starting notify consumer for channel %d", channelID)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-quitCh:
|
||||
logger.Infof("notify consumer for channel %d stopped", channelID)
|
||||
logger.Debugf("notify consumer for channel %d stopped", channelID)
|
||||
return
|
||||
default:
|
||||
// 从队列中取出任务
|
||||
@@ -448,7 +448,7 @@ func (ncc *NotifyChannelCacheType) startEmailSender(chID int64, smtp *models.SMT
|
||||
logger.Warning("SMTP configurations invalid")
|
||||
return
|
||||
}
|
||||
logger.Infof("start email sender... conf.Host:%+v,conf.Port:%+v", conf.Host, conf.Port)
|
||||
logger.Debugf("start email sender... conf.Host:%+v,conf.Port:%+v", conf.Host, conf.Port)
|
||||
|
||||
d := gomail.NewDialer(conf.Host, conf.Port, conf.Username, conf.Password)
|
||||
if conf.InsecureSkipVerify {
|
||||
@@ -502,7 +502,11 @@ func (ncc *NotifyChannelCacheType) startEmailSender(chID int64, smtp *models.SMT
|
||||
m.Mail.GetHeader("Subject"), m.Mail.GetHeader("To"))
|
||||
}
|
||||
|
||||
// sender.NotifyRecord(ncc.ctx, m.Events, m.NotifyRuleId, models.Email, strings.Join(m.Mail.GetHeader("To"), ","), "", err)
|
||||
// 记录通知详情
|
||||
if ncc.notifyRecordFunc != nil {
|
||||
target := strings.Join(m.Mail.GetHeader("To"), ",")
|
||||
ncc.notifyRecordFunc(ncc.ctx, m.Events, m.NotifyRuleId, "Email", target, "success", err)
|
||||
}
|
||||
size++
|
||||
|
||||
if size >= conf.Batch {
|
||||
|
||||
@@ -735,6 +735,15 @@ func (ar *AlertRule) UpdateColumn(ctx *ctx.Context, column string, value interfa
|
||||
return DB(ctx).Model(ar).Updates(updates).Error
|
||||
}
|
||||
|
||||
if column == "notify_groups" || column == "notify_channels" {
|
||||
updates := map[string]interface{}{
|
||||
column: value,
|
||||
"notify_version": 0,
|
||||
"notify_rule_ids": []int64{},
|
||||
}
|
||||
return DB(ctx).Model(ar).Updates(updates).Error
|
||||
}
|
||||
|
||||
return DB(ctx).Model(ar).UpdateColumn(column, value).Error
|
||||
}
|
||||
|
||||
@@ -892,7 +901,8 @@ func (ar *AlertRule) FE2DB() error {
|
||||
}
|
||||
ar.AlgoParams = string(algoParamsByte)
|
||||
|
||||
if ar.RuleConfigJson == nil {
|
||||
// 老的规则,是 PromQl 和 Severity 字段,新版的规则,使用 RuleConfig 字段
|
||||
if ar.RuleConfigJson == nil || len(ar.PromQl) > 0 {
|
||||
query := PromQuery{
|
||||
PromQl: ar.PromQl,
|
||||
Severity: ar.Severity,
|
||||
@@ -1008,11 +1018,8 @@ func AlertRuleExists(ctx *ctx.Context, id, groupId int64, name string) (bool, er
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if len(lst) == 0 {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
return false, nil
|
||||
return len(lst) > 0, nil
|
||||
}
|
||||
|
||||
func GetAlertRuleIdsByTaskId(ctx *ctx.Context, taskId int64) ([]int64, error) {
|
||||
|
||||
@@ -116,7 +116,18 @@ func (s *AlertSubscribe) Verify() error {
|
||||
return errors.New("severities is required")
|
||||
}
|
||||
|
||||
if len(s.NotifyRuleIds) > 0 {
|
||||
if s.NotifyVersion == 1 {
|
||||
if len(s.NotifyRuleIds) == 0 {
|
||||
return errors.New("no notify rules selected")
|
||||
}
|
||||
|
||||
s.UserGroupIds = ""
|
||||
s.RedefineChannels = 0
|
||||
s.NewChannels = ""
|
||||
s.RedefineWebhooks = 0
|
||||
s.Webhooks = ""
|
||||
s.RedefineSeverity = 0
|
||||
s.NewSeverity = 0
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -132,8 +143,8 @@ func (s *AlertSubscribe) Verify() error {
|
||||
}
|
||||
}
|
||||
|
||||
if s.NotifyVersion == 1 && len(s.NotifyRuleIds) == 0 {
|
||||
return errors.New("no notify rules selected")
|
||||
if s.NotifyVersion == 0 {
|
||||
s.NotifyRuleIds = []int64{}
|
||||
}
|
||||
|
||||
return nil
|
||||
@@ -381,6 +392,17 @@ func (s *AlertSubscribe) MatchProd(prod string) bool {
|
||||
return s.Prod == prod
|
||||
}
|
||||
|
||||
func (s *AlertSubscribe) MatchCate(cate string) bool {
|
||||
if s.Cate == "" {
|
||||
return true
|
||||
}
|
||||
|
||||
if s.Cate == "host" {
|
||||
return cate == "host"
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (s *AlertSubscribe) MatchCluster(dsId int64) bool {
|
||||
// 没有配置数据源, 或者事件不需要关联数据源
|
||||
// do not match any datasource or event not related to datasource
|
||||
|
||||
@@ -249,12 +249,7 @@ var NewTplMap = map[string]string{
|
||||
{{- end}}
|
||||
{{end}}
|
||||
{{$domain := "http://127.0.0.1:17000" }}
|
||||
{{$mutelink := print $domain "/alert-mutes/add?busiGroup=" $event.GroupId "&cate=" $event.Cate "&datasource_ids=" $event.DatasourceId "&prod=" $event.RuleProd}}
|
||||
{{- range $key, $value := $event.TagsMap}}
|
||||
{{- $encodedValue := $value | urlquery }}
|
||||
{{- $mutelink = print $mutelink "&tags=" $key "%3D" $encodedValue}}
|
||||
{{- end}}
|
||||
[事件详情]({{$domain}}/alert-his-events/{{$event.Id}}) | [屏蔽1小时]({{$mutelink}}) | [查看曲线]({{$domain}}/metric/explorer?data_source_id={{$event.DatasourceId}}&data_source_name=prometheus&mode=graph&prom_ql={{$event.PromQl|urlquery}})`,
|
||||
[事件详情]({{$domain}}/alert-his-events/{{$event.Id}}) | [屏蔽1小时]({{$domain}}/alert-mutes/add?__event_id={{$event.Id}}){{if eq $event.Cate "prometheus"}} | [查看曲线]({{$domain}}/metric/explorer?__event_id={{$event.Id}}&mode=graph}}){{end}}`,
|
||||
Email: `<!DOCTYPE html>
|
||||
<html lang="en">
|
||||
<head>
|
||||
@@ -483,8 +478,8 @@ var NewTplMap = map[string]string{
|
||||
{{if $event.IsRecovered}}恢复时间:{{timeformat $event.LastEvalTime}}{{else}}触发时间: {{timeformat $event.TriggerTime}}
|
||||
触发时值: {{$event.TriggerValue}}{{end}}
|
||||
发送时间: {{timestamp}}{{$domain := "http://127.0.0.1:17000" }}
|
||||
事件详情: {{$domain}}/alert-his-events/{{$event.Id}}{{$muteUrl := print $domain "/alert-mutes/add?busiGroup=" $event.GroupId "&cate=" $event.Cate "&datasource_ids=" $event.DatasourceId "&prod=" $event.RuleProd}}{{range $key, $value := $event.TagsMap}}{{$muteUrl = print $muteUrl "&tags=" $key "%3D" $value}}{{end}}
|
||||
屏蔽1小时: {{ unescaped $muteUrl }}`,
|
||||
事件详情: {{$domain}}/alert-his-events/{{$event.Id}}
|
||||
屏蔽1小时: {{$domain}}/alert-mutes/add?__event_id={{$event.Id}}`,
|
||||
FeishuCard: `{{- if $event.IsRecovered -}}
|
||||
{{- if ne $event.Cate "host" -}}
|
||||
**告警集群:** {{$event.Cluster}}{{end}}
|
||||
@@ -511,7 +506,7 @@ var NewTplMap = map[string]string{
|
||||
{{- end}}
|
||||
{{- end}}
|
||||
{{$domain := "http://请联系管理员修改通知模板将域名替换为实际的域名" }}
|
||||
[事件详情]({{$domain}}/alert-his-events/{{$event.Id}})|[屏蔽1小时]({{$domain}}/alert-mutes/add?busiGroup={{$event.GroupId}}&cate={{$event.Cate}}&datasource_ids={{$event.DatasourceId}}&prod={{$event.RuleProd}}{{range $key, $value := $event.TagsMap}}&tags={{$key}}%3D{{$value}}{{end}})|[查看曲线]({{$domain}}/metric/explorer?data_source_id={{$event.DatasourceId}}&data_source_name=prometheus&mode=graph&prom_ql={{$event.PromQl|escape}})`,
|
||||
[事件详情]({{$domain}}/alert-his-events/{{$event.Id}})|[屏蔽1小时]({{$domain}}/alert-mutes/add?__event_id={{$event.Id}}){{if eq $event.Cate "prometheus"}}|[查看曲线]({{$domain}}/metric/explorer?__event_id={{$event.Id}}&mode=graph}}){{end}}`,
|
||||
EmailSubject: `{{if $event.IsRecovered}}Recovered{{else}}Triggered{{end}}: {{$event.RuleName}} {{$event.TagsJSON}}`,
|
||||
Mm: `级别状态: S{{$event.Severity}} {{if $event.IsRecovered}}Recovered{{else}}Triggered{{end}}
|
||||
规则名称: {{$event.RuleName}}{{if $event.RuleNote}}
|
||||
@@ -540,7 +535,7 @@ var NewTplMap = map[string]string{
|
||||
{{$time_duration := sub now.Unix $event.FirstTriggerTime }}{{if $event.IsRecovered}}{{$time_duration = sub $event.LastEvalTime $event.FirstTriggerTime }}{{end}}**距离首次告警**: {{humanizeDurationInterface $time_duration}}
|
||||
**发送时间**: {{timestamp}}
|
||||
{{$domain := "http://请联系管理员修改通知模板将域名替换为实际的域名" }}
|
||||
[事件详情]({{$domain}}/alert-his-events/{{$event.Id}})|[屏蔽1小时]({{$domain}}/alert-mutes/add?busiGroup={{$event.GroupId}}&cate={{$event.Cate}}&datasource_ids={{$event.DatasourceId}}&prod={{$event.RuleProd}}{{range $key, $value := $event.TagsMap}}&tags={{$key}}%3D{{$value}}{{end}})|[查看曲线]({{$domain}}/metric/explorer?data_source_id={{$event.DatasourceId}}&data_source_name=prometheus&mode=graph&prom_ql={{$event.PromQl|escape}})`,
|
||||
[事件详情]({{$domain}}/alert-his-events/{{$event.Id}})|[屏蔽1小时]({{$domain}}/alert-mutes/add?__event_id={{$event.Id}}){{if eq $event.Cate "prometheus"}}|[查看曲线]({{$domain}}/metric/explorer?__event_id={{$event.Id}}&mode=graph}}){{end}}`,
|
||||
Lark: `级别状态: S{{$event.Severity}} {{if $event.IsRecovered}}Recovered{{else}}Triggered{{end}}
|
||||
规则名称: {{$event.RuleName}}{{if $event.RuleNote}}
|
||||
规则备注: {{$event.RuleNote}}{{end}}
|
||||
@@ -550,7 +545,7 @@ var NewTplMap = map[string]string{
|
||||
发送时间: {{timestamp}}
|
||||
{{$domain := "http://请联系管理员修改通知模板将域名替换为实际的域名" }}
|
||||
事件详情: {{$domain}}/alert-his-events/{{$event.Id}}
|
||||
屏蔽1小时: {{$domain}}/alert-mutes/add?busiGroup={{$event.GroupId}}&cate={{$event.Cate}}&datasource_ids={{$event.DatasourceId}}&prod={{$event.RuleProd}}{{range $key, $value := $event.TagsMap}}&tags={{$key}}%3D{{$value}}{{end}}`,
|
||||
屏蔽1小时: {{$domain}}/alert-mutes/add?__event_id={{$event.Id}}`,
|
||||
LarkCard: `{{ if $event.IsRecovered }}
|
||||
{{- if ne $event.Cate "host"}}
|
||||
**告警集群:** {{$event.Cluster}}{{end}}
|
||||
@@ -573,7 +568,7 @@ var NewTplMap = map[string]string{
|
||||
{{if $event.RuleNote }}**告警描述:** **{{$event.RuleNote}}**{{end}}
|
||||
{{- end -}}
|
||||
{{$domain := "http://请联系管理员修改通知模板将域名替换为实际的域名" }}
|
||||
[事件详情]({{$domain}}/alert-his-events/{{$event.Id}})|[屏蔽1小时]({{$domain}}/alert-mutes/add?busiGroup={{$event.GroupId}}&cate={{$event.Cate}}&datasource_ids={{$event.DatasourceId}}&prod={{$event.RuleProd}}{{range $key, $value := $event.TagsMap}}&tags={{$key}}%3D{{$value}}{{end}})|[查看曲线]({{$domain}}/metric/explorer?data_source_id={{$event.DatasourceId}}&data_source_name=prometheus&mode=graph&prom_ql={{$event.PromQl|escape}})`,
|
||||
[事件详情]({{$domain}}/alert-his-events/{{$event.Id}})|[屏蔽1小时]({{$domain}}/alert-mutes/add?__event_id={{$event.Id}}){{if eq $event.Cate "prometheus"}}|[查看曲线]({{$domain}}/metric/explorer?__event_id={{$event.Id}}&mode=graph}}){{end}}`,
|
||||
SlackWebhook: `{{ if $event.IsRecovered }}
|
||||
{{- if ne $event.Cate "host"}}
|
||||
*Alarm cluster:* {{$event.Cluster}}{{end}}
|
||||
@@ -600,8 +595,8 @@ var NewTplMap = map[string]string{
|
||||
|
||||
{{$domain := "http://127.0.0.1:17000" }}
|
||||
<{{$domain}}/alert-his-events/{{$event.Id}}|Event Details>
|
||||
<{{$domain}}/alert-mutes/add?busiGroup={{$event.GroupId}}&cate={{$event.Cate}}&datasource_ids={{$event.DatasourceId}}&prod={{$event.RuleProd}}{{range $key, $value := $event.TagsMap}}&tags={{$key}}%3D{{$value}}{{end}}|Block for 1 hour>
|
||||
<{{$domain}}/metric/explorer?data_source_id={{$event.DatasourceId}}&data_source_name=prometheus&mode=graph&prom_ql={{$event.PromQl|escape}}|View Curve>`,
|
||||
<{{$domain}}/alert-mutes/add?__event_id={{$event.Id}}|Block for 1 hour>
|
||||
<{{$domain}}/metric/explorer?__event_id={{$event.Id}}&mode=graph}}|View Curve>`,
|
||||
Discord: `**Level Status**: {{if $event.IsRecovered}}S{{$event.Severity}} Recovered{{else}}S{{$event.Severity}} Triggered{{end}}
|
||||
**Rule Title**: {{$event.RuleName}}{{if $event.RuleNote}}
|
||||
**Rule Note**: {{$event.RuleNote}}{{end}}{{if $event.TargetIdent}}
|
||||
@@ -613,12 +608,7 @@ var NewTplMap = map[string]string{
|
||||
**Send Time**: {{timestamp}}
|
||||
|
||||
{{$domain := "http://127.0.0.1:17000" }}
|
||||
{{$mutelink := print $domain "/alert-mutes/add?busiGroup=" $event.GroupId "&cate=" $event.Cate "&datasource_ids=" $event.DatasourceId "&prod=" $event.RuleProd}}
|
||||
{{- range $key, $value := $event.TagsMap}}
|
||||
{{- $encodedValue := $value | urlquery }}
|
||||
{{- $mutelink = print $mutelink "&tags=" $key "%3D" $encodedValue}}
|
||||
{{- end}}
|
||||
[Event Details]({{$domain}}/alert-his-events/{{$event.Id}}) | [Silence 1h]({{$mutelink}}) | [View Graph]({{$domain}}/metric/explorer?data_source_id={{$event.DatasourceId}}&data_source_name=prometheus&mode=graph&prom_ql={{$event.PromQl|urlquery}})`,
|
||||
[Event Details]({{$domain}}/alert-his-events/{{$event.Id}}) | [Silence 1h]({{$domain}}/alert-mutes/add?__event_id={{$event.Id}}) | [View Graph]({{$domain}}/metric/explorer?__event_id={{$event.Id}}&mode=graph}})`,
|
||||
|
||||
MattermostWebhook: `{{ if $event.IsRecovered }}
|
||||
{{- if ne $event.Cate "host"}}
|
||||
@@ -640,7 +630,7 @@ var NewTplMap = map[string]string{
|
||||
{{if $event.RuleNote }}**Alarm description:** **{{$event.RuleNote}}**{{end}}
|
||||
{{- end -}}
|
||||
{{$domain := "http://127.0.0.1:17000" }}
|
||||
[Event Details]({{$domain}}/alert-his-events/{{$event.Id}})|[Block for 1 hour]({{$domain}}/alert-mutes/add?busiGroup={{$event.GroupId}}&cate={{$event.Cate}}&datasource_ids={{$event.DatasourceId}}&prod={{$event.RuleProd}}{{range $key, $value := $event.TagsMap}}&tags={{$key}}%3D{{$value}}{{end}})|[View Curve]({{$domain}}/metric/explorer?data_source_id={{$event.DatasourceId}}&data_source_name=prometheus&mode=graph&prom_ql={{$event.PromQl|escape}})`,
|
||||
[Event Details]({{$domain}}/alert-his-events/{{$event.Id}})|[Block for 1 hour]({{$domain}}/alert-mutes/add?__event_id={{$event.Id}})|[View Curve]({{$domain}}/metric/explorer?__event_id={{$event.Id}}&mode=graph}})`,
|
||||
FeishuApp: `{{- if $event.IsRecovered -}}
|
||||
{{- if ne $event.Cate "host" -}}
|
||||
**告警集群:** {{$event.Cluster}}{{end}}
|
||||
|
||||
@@ -72,8 +72,10 @@ func MigrateTables(db *gorm.DB) error {
|
||||
|
||||
if isPostgres(db) {
|
||||
dts = append(dts, &models.PostgresBuiltinComponent{})
|
||||
DropUniqueFiledLimit(db, &models.PostgresBuiltinComponent{}, "idx_ident", "idx_ident")
|
||||
} else {
|
||||
dts = append(dts, &models.BuiltinComponent{})
|
||||
DropUniqueFiledLimit(db, &models.BuiltinComponent{}, "idx_ident", "idx_ident")
|
||||
}
|
||||
|
||||
if !db.Migrator().HasColumn(&imodels.TaskSchedulerHealth{}, "scheduler") {
|
||||
@@ -124,11 +126,17 @@ func MigrateTables(db *gorm.DB) error {
|
||||
DropUniqueFiledLimit(db, &Configs{}, "ckey", "configs_ckey_key")
|
||||
// 删除 builtin_metrics 表的 idx_collector_typ_name 唯一索引
|
||||
DropUniqueFiledLimit(db, &models.BuiltinMetric{}, "idx_collector_typ_name", "idx_collector_typ_name")
|
||||
|
||||
InsertPermPoints(db)
|
||||
return nil
|
||||
}
|
||||
|
||||
func DropUniqueFiledLimit(db *gorm.DB, dst interface{}, uniqueFiled string, pgUniqueFiled string) { // UNIQUE KEY (`ckey`)
|
||||
// 先检查表是否存在,如果不存在则直接返回
|
||||
if !db.Migrator().HasTable(dst) {
|
||||
return
|
||||
}
|
||||
|
||||
if db.Migrator().HasIndex(dst, uniqueFiled) {
|
||||
err := db.Migrator().DropIndex(dst, uniqueFiled) //mysql DROP INDEX
|
||||
if err != nil {
|
||||
|
||||
@@ -2,7 +2,6 @@ package models
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/hmac"
|
||||
"crypto/sha256"
|
||||
"crypto/tls"
|
||||
@@ -21,10 +20,10 @@ import (
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"syscall"
|
||||
"time"
|
||||
"unicode/utf8"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/pkg/cmdx"
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
"github.com/ccfos/nightingale/v6/pkg/poster"
|
||||
"github.com/ccfos/nightingale/v6/pkg/tplx"
|
||||
@@ -33,7 +32,6 @@ import (
|
||||
"github.com/pkg/errors"
|
||||
"github.com/toolkits/pkg/file"
|
||||
"github.com/toolkits/pkg/logger"
|
||||
"github.com/toolkits/pkg/sys"
|
||||
"gopkg.in/gomail.v2"
|
||||
)
|
||||
|
||||
@@ -95,6 +93,9 @@ type UserInfo struct {
|
||||
type FlashDutyRequestConfig struct {
|
||||
Proxy string `json:"proxy"`
|
||||
IntegrationUrl string `json:"integration_url"`
|
||||
Timeout int `json:"timeout"` // 超时时间(毫秒)
|
||||
RetryTimes int `json:"retry_times"` // 重试次数
|
||||
RetrySleep int `json:"retry_sleep"` // 重试等待时间(毫秒)
|
||||
}
|
||||
|
||||
// ParamItem 自定义参数项
|
||||
@@ -196,10 +197,8 @@ func (ncc *NotifyChannelConfig) SendScript(events []*AlertCurEvent, tpl map[stri
|
||||
cmd.Stdout = &buf
|
||||
cmd.Stderr = &buf
|
||||
|
||||
err := startCmd(cmd)
|
||||
if err != nil {
|
||||
return "", "", fmt.Errorf("failed to start script: %v", err)
|
||||
}
|
||||
err, isTimeout := cmdx.RunTimeout(cmd, time.Duration(config.Timeout)*time.Millisecond)
|
||||
logger.Infof("event_script_notify_result: exec %s output: %s isTimeout: %v err: %v stdin: %s", fpath, buf.String(), isTimeout, err, string(getStdinBytes(events, tpl, params, sendtos)))
|
||||
|
||||
res := buf.String()
|
||||
|
||||
@@ -218,8 +217,6 @@ func (ncc *NotifyChannelConfig) SendScript(events []*AlertCurEvent, tpl map[stri
|
||||
res = res[:validLen] + "..."
|
||||
}
|
||||
|
||||
err, isTimeout := sys.WrapTimeout(cmd, time.Duration(config.Timeout)*time.Second)
|
||||
logger.Infof("event_script_notify_result: exec %s output: %s isTimeout: %v err: %v", fpath, buf.String(), isTimeout, err)
|
||||
if isTimeout {
|
||||
if err == nil {
|
||||
return cmd.String(), res, errors.New("timeout and killed process")
|
||||
@@ -257,11 +254,6 @@ func getStdinBytes(events []*AlertCurEvent, tpl map[string]interface{}, params m
|
||||
return jsonBytes
|
||||
}
|
||||
|
||||
func startCmd(c *exec.Cmd) error {
|
||||
c.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
|
||||
return c.Start()
|
||||
}
|
||||
|
||||
func NotifyChannelStatistics(ctx *ctx.Context) (*Statistics, error) {
|
||||
if !ctx.IsCenter {
|
||||
s, err := poster.GetByUrls[*Statistics](ctx, "/v1/n9e/statistic?name=notify_channel")
|
||||
@@ -325,9 +317,20 @@ func GetHTTPClient(nc *NotifyChannelConfig) (*http.Client, error) {
|
||||
}
|
||||
|
||||
httpConfig := nc.RequestConfig.HTTPRequestConfig
|
||||
if httpConfig.Timeout == 0 {
|
||||
httpConfig.Timeout = 10000
|
||||
|
||||
// 对于 FlashDuty 类型,优先使用 FlashDuty 配置中的超时时间
|
||||
timeout := httpConfig.Timeout
|
||||
if nc.RequestType == "flashduty" && nc.RequestConfig.FlashDutyRequestConfig != nil {
|
||||
flashDutyTimeout := nc.RequestConfig.FlashDutyRequestConfig.Timeout
|
||||
if flashDutyTimeout > 0 {
|
||||
timeout = flashDutyTimeout
|
||||
}
|
||||
}
|
||||
|
||||
if timeout == 0 {
|
||||
timeout = 10000 // HTTP 默认 10 秒
|
||||
}
|
||||
|
||||
if httpConfig.Concurrency == 0 {
|
||||
httpConfig.Concurrency = 5
|
||||
}
|
||||
@@ -357,18 +360,78 @@ func GetHTTPClient(nc *NotifyChannelConfig) (*http.Client, error) {
|
||||
Proxy: proxyFunc,
|
||||
TLSClientConfig: tlsConfig,
|
||||
DialContext: (&net.Dialer{
|
||||
Timeout: time.Duration(httpConfig.Timeout) * time.Millisecond,
|
||||
Timeout: time.Duration(timeout) * time.Millisecond,
|
||||
}).DialContext,
|
||||
}
|
||||
|
||||
client := &http.Client{
|
||||
Transport: transport,
|
||||
Timeout: time.Duration(httpConfig.Timeout) * time.Millisecond,
|
||||
Timeout: time.Duration(timeout) * time.Millisecond,
|
||||
}
|
||||
|
||||
return client, nil
|
||||
}
|
||||
|
||||
func (ncc *NotifyChannelConfig) makeHTTPRequest(httpConfig *HTTPRequestConfig, url string, headers map[string]string, parameters map[string]string, body []byte) (*http.Request, error) {
|
||||
req, err := http.NewRequest(httpConfig.Method, url, bytes.NewBuffer(body))
|
||||
if err != nil {
|
||||
logger.Errorf("failed to create request: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
query := req.URL.Query()
|
||||
// 设置请求头 腾讯云短信、语音特殊处理
|
||||
if ncc.Ident == "tx-sms" || ncc.Ident == "tx-voice" {
|
||||
headers = ncc.setTxHeader(headers, body)
|
||||
for key, value := range headers {
|
||||
req.Header.Add(key, value)
|
||||
}
|
||||
} else if ncc.Ident == "ali-sms" || ncc.Ident == "ali-voice" {
|
||||
req, err = http.NewRequest(httpConfig.Method, url, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
query, headers = ncc.getAliQuery(ncc.Ident, query, httpConfig.Request.Parameters["AccessKeyId"], httpConfig.Request.Parameters["AccessKeySecret"], parameters)
|
||||
for key, value := range headers {
|
||||
req.Header.Set(key, value)
|
||||
}
|
||||
} else {
|
||||
for key, value := range headers {
|
||||
req.Header.Add(key, value)
|
||||
}
|
||||
}
|
||||
|
||||
if ncc.Ident != "ali-sms" && ncc.Ident != "ali-voice" {
|
||||
for key, value := range parameters {
|
||||
query.Add(key, value)
|
||||
}
|
||||
}
|
||||
|
||||
req.URL.RawQuery = query.Encode()
|
||||
// 记录完整的请求信息
|
||||
logger.Debugf("URL: %v, Method: %s, Headers: %+v, params: %+v, Body: %s", req.URL, req.Method, req.Header, query, string(body))
|
||||
|
||||
return req, nil
|
||||
}
|
||||
|
||||
func (ncc *NotifyChannelConfig) makeFlashDutyRequest(url string, bodyBytes []byte, flashDutyChannelID int64) (*http.Request, error) {
|
||||
req, err := http.NewRequest("POST", url, bytes.NewBuffer(bodyBytes))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// 设置 URL 参数
|
||||
query := req.URL.Query()
|
||||
if flashDutyChannelID != 0 {
|
||||
// 如果 flashduty 有配置协作空间(channel_id),则传入 channel_id 参数
|
||||
query.Add("channel_id", strconv.FormatInt(flashDutyChannelID, 10))
|
||||
}
|
||||
req.URL.RawQuery = query.Encode()
|
||||
req.Header.Add("Content-Type", "application/json")
|
||||
return req, nil
|
||||
}
|
||||
|
||||
func (ncc *NotifyChannelConfig) SendFlashDuty(events []*AlertCurEvent, flashDutyChannelID int64, client *http.Client) (string, error) {
|
||||
// todo 每一个 channel 批量发送事件
|
||||
if client == nil {
|
||||
@@ -380,46 +443,57 @@ func (ncc *NotifyChannelConfig) SendFlashDuty(events []*AlertCurEvent, flashDuty
|
||||
return "", err
|
||||
}
|
||||
|
||||
req, err := http.NewRequest("POST", ncc.RequestConfig.FlashDutyRequestConfig.IntegrationUrl, bytes.NewBuffer(body))
|
||||
if err != nil {
|
||||
logger.Errorf("failed to create request: %v, event: %v", err, events)
|
||||
return "", err
|
||||
url := ncc.RequestConfig.FlashDutyRequestConfig.IntegrationUrl
|
||||
|
||||
retrySleep := time.Second
|
||||
if ncc.RequestConfig.FlashDutyRequestConfig.RetrySleep > 0 {
|
||||
retrySleep = time.Duration(ncc.RequestConfig.FlashDutyRequestConfig.RetrySleep) * time.Millisecond
|
||||
}
|
||||
|
||||
// 设置 URL 参数
|
||||
query := req.URL.Query()
|
||||
if flashDutyChannelID != 0 {
|
||||
// 如果 flashduty 有配置协作空间(channel_id),则传入 channel_id 参数
|
||||
query.Add("channel_id", strconv.FormatInt(flashDutyChannelID, 10))
|
||||
retryTimes := 3
|
||||
if ncc.RequestConfig.FlashDutyRequestConfig.RetryTimes > 0 {
|
||||
retryTimes = ncc.RequestConfig.FlashDutyRequestConfig.RetryTimes
|
||||
}
|
||||
req.URL.RawQuery = query.Encode()
|
||||
req.Header.Add("Content-Type", "application/json")
|
||||
|
||||
// 重试机制
|
||||
for i := 0; i <= 3; i++ {
|
||||
logger.Infof("send flashduty req:%+v body:%+v", req, string(body))
|
||||
// 把最后一次错误保存下来,后面返回,让用户在页面上也可以看到
|
||||
var lastErrorMessage string
|
||||
for i := 0; i <= retryTimes; i++ {
|
||||
req, err := ncc.makeFlashDutyRequest(url, body, flashDutyChannelID)
|
||||
if err != nil {
|
||||
logger.Errorf("send_flashduty: failed to create request. url=%s request_body=%s error=%v", url, string(body), err)
|
||||
return fmt.Sprintf("failed to create request. error: %v", err), err
|
||||
}
|
||||
|
||||
// 直接使用客户端发送请求,超时时间已经在 client 中设置
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
logger.Errorf("send flashduty req:%+v err:%v", req, err)
|
||||
time.Sleep(time.Duration(100) * time.Millisecond)
|
||||
logger.Errorf("send_flashduty: http_call=fail url=%s request_body=%s error=%v times=%d", url, string(body), err, i+1)
|
||||
if i < retryTimes {
|
||||
// 重试等待时间,后面要放到页面上配置
|
||||
time.Sleep(retrySleep)
|
||||
}
|
||||
lastErrorMessage = err.Error()
|
||||
continue
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
// 读取响应
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
logger.Errorf("failed to read response: %v, event: %v", err, events)
|
||||
// 走到这里,说明请求 Flashduty 成功,不管 Flashduty 返回了什么结果,都不判断,仅保存,给用户查看即可
|
||||
// 比如服务端返回 5xx,也不要重试,重试可能会导致服务端数据有问题。告警事件这样的东西,没有那么关键,只要最终能在 UI 上看到调用结果就行
|
||||
var resBody []byte
|
||||
if resp.Body != nil {
|
||||
defer resp.Body.Close()
|
||||
|
||||
resBody, err = io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
logger.Errorf("send_flashduty: failed to read response. request_body=%s, error=%v", string(body), err)
|
||||
resBody = []byte("failed to read response. error: " + err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
logger.Infof("send flashduty req:%+v resp:%+v body:%+v err:%v", req, resp, string(body), err)
|
||||
if resp.StatusCode == http.StatusOK {
|
||||
return string(body), nil
|
||||
}
|
||||
time.Sleep(time.Duration(100) * time.Millisecond)
|
||||
logger.Infof("send_flashduty: http_call=succ url=%s request_body=%s response_code=%d response_body=%s times=%d", url, string(body), resp.StatusCode, string(resBody), i+1)
|
||||
return fmt.Sprintf("status_code:%d, response:%s", resp.StatusCode, string(resBody)), nil
|
||||
}
|
||||
|
||||
return "", errors.New("failed to send request")
|
||||
return lastErrorMessage, errors.New("failed to send request")
|
||||
}
|
||||
|
||||
func (ncc *NotifyChannelConfig) SendHTTP(events []*AlertCurEvent, tpl map[string]interface{}, params map[string]string, sendtos []string, client *http.Client) (string, error) {
|
||||
@@ -457,54 +531,21 @@ func (ncc *NotifyChannelConfig) SendHTTP(events []*AlertCurEvent, tpl map[string
|
||||
url, headers, parameters := ncc.replaceVariables(fullTpl)
|
||||
logger.Infof("url: %v, headers: %v, parameters: %v", url, headers, parameters)
|
||||
|
||||
req, err := http.NewRequest(httpConfig.Method, url, bytes.NewBuffer(body))
|
||||
if err != nil {
|
||||
logger.Errorf("failed to create request: %v, event: %v", err, events)
|
||||
return "", err
|
||||
}
|
||||
|
||||
query := req.URL.Query()
|
||||
// 设置请求头 腾讯云短信、语音特殊处理
|
||||
if ncc.Ident == "tx-sms" || ncc.Ident == "tx-voice" {
|
||||
headers = ncc.setTxHeader(headers, body)
|
||||
for key, value := range headers {
|
||||
req.Header.Add(key, value)
|
||||
}
|
||||
} else if ncc.Ident == "ali-sms" || ncc.Ident == "ali-voice" {
|
||||
req, err = http.NewRequest(httpConfig.Method, url, nil)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
query, headers = ncc.getAliQuery(ncc.Ident, query, httpConfig.Request.Parameters["AccessKeyId"], httpConfig.Request.Parameters["AccessKeySecret"], parameters)
|
||||
for key, value := range headers {
|
||||
req.Header.Set(key, value)
|
||||
}
|
||||
} else {
|
||||
for key, value := range headers {
|
||||
req.Header.Add(key, value)
|
||||
}
|
||||
}
|
||||
|
||||
if ncc.Ident != "ali-sms" && ncc.Ident != "ali-voice" {
|
||||
for key, value := range parameters {
|
||||
query.Add(key, value)
|
||||
}
|
||||
}
|
||||
|
||||
req.URL.RawQuery = query.Encode()
|
||||
// 记录完整的请求信息
|
||||
logger.Debugf("URL: %v, Method: %s, Headers: %+v, params: %+v, Body: %s", req.URL, req.Method, req.Header, query, string(body))
|
||||
|
||||
// 重试机制
|
||||
for i := 0; i <= httpConfig.RetryTimes; i++ {
|
||||
var lastErrorMessage string
|
||||
for i := 0; i < httpConfig.RetryTimes; i++ {
|
||||
var resp *http.Response
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(httpConfig.Timeout)*time.Millisecond)
|
||||
resp, err = client.Do(req.WithContext(ctx))
|
||||
cancel() // 确保释放资源
|
||||
req, err := ncc.makeHTTPRequest(httpConfig, url, headers, parameters, body)
|
||||
if err != nil {
|
||||
logger.Errorf("send_http: failed to create request. url=%s request_body=%s error=%v", url, string(body), err)
|
||||
return fmt.Sprintf("failed to create request. error: %v", err), err
|
||||
}
|
||||
|
||||
resp, err = client.Do(req)
|
||||
if err != nil {
|
||||
logger.Errorf("send_http: failed to send http notify. url=%s request_body=%s error=%v", url, string(body), err)
|
||||
lastErrorMessage = err.Error()
|
||||
time.Sleep(time.Duration(httpConfig.RetryInterval) * time.Second)
|
||||
logger.Errorf("send http request failed to send http notify: %v", err)
|
||||
continue
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
@@ -512,11 +553,9 @@ func (ncc *NotifyChannelConfig) SendHTTP(events []*AlertCurEvent, tpl map[string
|
||||
// 读取响应
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
logger.Debugf("send http request: %+v, response: %+v, body: %+v", req, resp, string(body))
|
||||
|
||||
if err != nil {
|
||||
logger.Errorf("failed to send http notify: %v", err)
|
||||
logger.Errorf("send_http: failed to read response. url=%s request_body=%s error=%v", url, string(body), err)
|
||||
}
|
||||
|
||||
if resp.StatusCode == http.StatusOK {
|
||||
return string(body), nil
|
||||
}
|
||||
@@ -524,8 +563,7 @@ func (ncc *NotifyChannelConfig) SendHTTP(events []*AlertCurEvent, tpl map[string
|
||||
return "", fmt.Errorf("failed to send request, status code: %d, body: %s", resp.StatusCode, string(body))
|
||||
}
|
||||
|
||||
return "", err
|
||||
|
||||
return lastErrorMessage, errors.New("all retries failed, last error: " + lastErrorMessage)
|
||||
}
|
||||
|
||||
// getAliQuery 获取阿里云API的查询参数和请求头
|
||||
@@ -932,11 +970,6 @@ func (ncc *NotifyChannelConfig) ValidateFlashDutyRequestConfig() error {
|
||||
}
|
||||
|
||||
func (ncc *NotifyChannelConfig) Update(ctx *ctx.Context, ref NotifyChannelConfig) error {
|
||||
// ref.FE2DB()
|
||||
if ncc.Ident != ref.Ident {
|
||||
return errors.New("cannot update ident")
|
||||
}
|
||||
|
||||
ref.ID = ncc.ID
|
||||
ref.CreateAt = ncc.CreateAt
|
||||
ref.CreateBy = ncc.CreateBy
|
||||
@@ -1436,6 +1469,8 @@ var NotiChMap = []*NotifyChannelConfig{
|
||||
},
|
||||
FlashDutyRequestConfig: &FlashDutyRequestConfig{
|
||||
IntegrationUrl: "flashduty integration url",
|
||||
Timeout: 5000, // 默认5秒超时
|
||||
RetryTimes: 3, // 默认重试3次
|
||||
},
|
||||
},
|
||||
},
|
||||
|
||||
@@ -232,7 +232,7 @@ var TplMap = map[string]string{
|
||||
{{- end}}
|
||||
{{- end}}
|
||||
{{$domain := "http://请联系管理员修改通知模板将域名替换为实际的域名" }}
|
||||
[事件详情]({{$domain}}/alert-his-events/{{.Id}})|[屏蔽1小时]({{$domain}}/alert-mutes/add?busiGroup={{.GroupId}}&cate={{.Cate}}&datasource_ids={{.DatasourceId}}&prod={{.RuleProd}}{{range $key, $value := .TagsMap}}&tags={{$key}}%3D{{$value}}{{end}})|[查看曲线]({{$domain}}/metric/explorer?data_source_id={{.DatasourceId}}&data_source_name=prometheus&mode=graph&prom_ql={{.PromQl|escape}})`,
|
||||
[事件详情]({{$domain}}/alert-his-events/{{.Id}})|[屏蔽1小时]({{$domain}}/alert-mutes/add?__event_id={{.Id}}){{if eq .Cate "prometheus"}}|[查看曲线]({{$domain}}/metric/explorer?__event_id={{.Id}}&mode=graph}}){{end}}`,
|
||||
Email: `<!DOCTYPE html>
|
||||
<html lang="en">
|
||||
<head>
|
||||
@@ -459,7 +459,7 @@ var TplMap = map[string]string{
|
||||
发送时间: {{timestamp}}
|
||||
{{$domain := "http://请联系管理员修改通知模板将域名替换为实际的域名" }}
|
||||
事件详情: {{$domain}}/alert-his-events/{{.Id}}
|
||||
屏蔽1小时: {{$domain}}/alert-mutes/add?busiGroup={{.GroupId}}&cate={{.Cate}}&datasource_ids={{.DatasourceId}}&prod={{.RuleProd}}{{range $key, $value := .TagsMap}}&tags={{$key}}%3D{{$value}}{{end}}`,
|
||||
屏蔽1小时: {{$domain}}/alert-mutes/add?__event_id={{.Id}}`,
|
||||
FeishuCard: `{{ if .IsRecovered }}
|
||||
{{- if ne .Cate "host"}}
|
||||
**告警集群:** {{.Cluster}}{{end}}
|
||||
@@ -478,7 +478,7 @@ var TplMap = map[string]string{
|
||||
{{if .RuleNote }}**告警描述:** **{{.RuleNote}}**{{end}}
|
||||
{{- end -}}
|
||||
{{$domain := "http://请联系管理员修改通知模板将域名替换为实际的域名" }}
|
||||
[事件详情]({{$domain}}/alert-his-events/{{.Id}})|[屏蔽1小时]({{$domain}}/alert-mutes/add?busiGroup={{.GroupId}}&cate={{.Cate}}&datasource_ids={{.DatasourceId}}&prod={{.RuleProd}}{{range $key, $value := .TagsMap}}&tags={{$key}}%3D{{$value}}{{end}})|[查看曲线]({{$domain}}/metric/explorer?data_source_id={{.DatasourceId}}&data_source_name=prometheus&mode=graph&prom_ql={{.PromQl|escape}})`,
|
||||
[事件详情]({{$domain}}/alert-his-events/{{.Id}})|[屏蔽1小时]({{$domain}}/alert-mutes/add?__event_id={{.Id}}){{if eq .Cate "prometheus"}}|[查看曲线]({{$domain}}/metric/explorer?__event_id={{.Id}}&mode=graph}}){{end}}`,
|
||||
EmailSubject: `{{if .IsRecovered}}Recovered{{else}}Triggered{{end}}: {{.RuleName}} {{.TagsJSON}}`,
|
||||
Mm: `级别状态: S{{.Severity}} {{if .IsRecovered}}Recovered{{else}}Triggered{{end}}
|
||||
规则名称: {{.RuleName}}{{if .RuleNote}}
|
||||
@@ -506,7 +506,7 @@ var TplMap = map[string]string{
|
||||
{{$time_duration := sub now.Unix .FirstTriggerTime }}{{if .IsRecovered}}{{$time_duration = sub .LastEvalTime .FirstTriggerTime }}{{end}}**距离首次告警**: {{humanizeDurationInterface $time_duration}}
|
||||
**发送时间**: {{timestamp}}
|
||||
{{$domain := "http://请联系管理员修改通知模板将域名替换为实际的域名" }}
|
||||
[事件详情]({{$domain}}/alert-his-events/{{.Id}})|[屏蔽1小时]({{$domain}}/alert-mutes/add?busiGroup={{.GroupId}}&cate={{.Cate}}&datasource_ids={{.DatasourceId}}&prod={{.RuleProd}}{{range $key, $value := .TagsMap}}&tags={{$key}}%3D{{$value}}{{end}})|[查看曲线]({{$domain}}/metric/explorer?data_source_id={{.DatasourceId}}&data_source_name=prometheus&mode=graph&prom_ql={{.PromQl|escape}})`,
|
||||
[事件详情]({{$domain}}/alert-his-events/{{.Id}})|[屏蔽1小时]({{$domain}}/alert-mutes/add?__event_id={{.Id}}){{if eq .Cate "prometheus"}}|[查看曲线]({{$domain}}/metric/explorer?__event_id={{.Id}}&mode=graph}}){{end}}`,
|
||||
Lark: `级别状态: S{{.Severity}} {{if .IsRecovered}}Recovered{{else}}Triggered{{end}}
|
||||
规则名称: {{.RuleName}}{{if .RuleNote}}
|
||||
规则备注: {{.RuleNote}}{{end}}
|
||||
@@ -516,7 +516,7 @@ var TplMap = map[string]string{
|
||||
发送时间: {{timestamp}}
|
||||
{{$domain := "http://请联系管理员修改通知模板将域名替换为实际的域名" }}
|
||||
事件详情: {{$domain}}/alert-his-events/{{.Id}}
|
||||
屏蔽1小时: {{$domain}}/alert-mutes/add?busiGroup={{.GroupId}}&cate={{.Cate}}&datasource_ids={{.DatasourceId}}&prod={{.RuleProd}}{{range $key, $value := .TagsMap}}&tags={{$key}}%3D{{$value}}{{end}}`,
|
||||
屏蔽1小时: {{$domain}}/alert-mutes/add?__event_id={{.Id}}`,
|
||||
LarkCard: `{{ if .IsRecovered }}
|
||||
{{- if ne .Cate "host"}}
|
||||
**告警集群:** {{.Cluster}}{{end}}
|
||||
@@ -537,5 +537,5 @@ var TplMap = map[string]string{
|
||||
{{if .RuleNote }}**告警描述:** **{{.RuleNote}}**{{end}}
|
||||
{{- end -}}
|
||||
{{$domain := "http://请联系管理员修改通知模板将域名替换为实际的域名" }}
|
||||
[事件详情]({{$domain}}/alert-his-events/{{.Id}})|[屏蔽1小时]({{$domain}}/alert-mutes/add?busiGroup={{.GroupId}}&cate={{.Cate}}&datasource_ids={{.DatasourceId}}&prod={{.RuleProd}}{{range $key, $value := .TagsMap}}&tags={{$key}}%3D{{$value}}{{end}})|[查看曲线]({{$domain}}/metric/explorer?data_source_id={{.DatasourceId}}&data_source_name=prometheus&mode=graph&prom_ql={{.PromQl|escape}})`,
|
||||
[事件详情]({{$domain}}/alert-his-events/{{.Id}})|[屏蔽1小时]({{$domain}}/alert-mutes/add?__event_id={{.Id}}){{if eq .Cate "prometheus"}}|[查看曲线]({{$domain}}/metric/explorer?__event_id={{.Id}}&mode=graph}}){{end}}`,
|
||||
}
|
||||
|
||||
@@ -27,8 +27,13 @@ func convertInterval(interval string) int {
|
||||
duration, err := time.ParseDuration(interval)
|
||||
if err != nil {
|
||||
logger.Errorf("Error parsing interval `%s`, err: %v", interval, err)
|
||||
return 0
|
||||
return 60
|
||||
}
|
||||
|
||||
if duration.Seconds() == 0 {
|
||||
duration = 60 * time.Second
|
||||
}
|
||||
|
||||
return int(duration.Seconds())
|
||||
}
|
||||
|
||||
@@ -57,17 +62,12 @@ func ConvertAlert(rule PromRule, interval string, datasouceQueries []DatasourceQ
|
||||
}
|
||||
|
||||
ar := AlertRule{
|
||||
Name: rule.Alert,
|
||||
Severity: severity,
|
||||
Disabled: disabled,
|
||||
PromForDuration: convertInterval(rule.For),
|
||||
PromQl: rule.Expr,
|
||||
PromEvalInterval: convertInterval(interval),
|
||||
EnableStimeJSON: "00:00",
|
||||
EnableEtimeJSON: "23:59",
|
||||
EnableDaysOfWeekJSON: []string{
|
||||
"1", "2", "3", "4", "5", "6", "0",
|
||||
},
|
||||
Name: rule.Alert,
|
||||
Severity: severity,
|
||||
Disabled: disabled,
|
||||
PromForDuration: convertInterval(rule.For),
|
||||
PromQl: rule.Expr,
|
||||
CronPattern: fmt.Sprintf("@every %ds", convertInterval(interval)),
|
||||
EnableInBG: AlertRuleEnableInGlobalBG,
|
||||
NotifyRecovered: AlertRuleNotifyRecovered,
|
||||
NotifyRepeatStep: AlertRuleNotifyRepeatStep60Min,
|
||||
@@ -75,6 +75,8 @@ func ConvertAlert(rule PromRule, interval string, datasouceQueries []DatasourceQ
|
||||
AnnotationsJSON: annotations,
|
||||
AppendTagsJSON: appendTags,
|
||||
DatasourceQueries: datasouceQueries,
|
||||
NotifyVersion: 1,
|
||||
NotifyRuleIds: []int64{},
|
||||
}
|
||||
|
||||
return ar
|
||||
@@ -86,7 +88,7 @@ func DealPromGroup(promRule []PromRuleGroup, dataSourceQueries []DatasourceQuery
|
||||
for _, group := range promRule {
|
||||
interval := group.Interval
|
||||
if interval == "" {
|
||||
interval = "15s"
|
||||
interval = "60s"
|
||||
}
|
||||
for _, rule := range group.Rules {
|
||||
if rule.Alert != "" {
|
||||
|
||||
@@ -124,7 +124,7 @@ func TargetStatistics(ctx *ctx.Context) (*Statistics, error) {
|
||||
|
||||
func TargetDel(ctx *ctx.Context, idents []string, deleteHook TargetDeleteHookFunc) error {
|
||||
if len(idents) == 0 {
|
||||
panic("idents empty")
|
||||
return errors.New("idents cannot be empty")
|
||||
}
|
||||
|
||||
return DB(ctx).Transaction(func(tx *gorm.DB) error {
|
||||
|
||||
@@ -8,11 +8,11 @@ import (
|
||||
|
||||
type UserToken struct {
|
||||
Id int64 `json:"id" gorm:"primaryKey"`
|
||||
Username string `json:"username" gorm:"type:varchar(255) not null default ''"`
|
||||
TokenName string `json:"token_name" gorm:"type:varchar(255) not null default ''"`
|
||||
Token string `json:"token" gorm:"type:varchar(255) not null default ''"`
|
||||
CreateAt int64 `json:"create_at" gorm:"type:bigint not null default 0"`
|
||||
LastUsed int64 `json:"last_used" gorm:"type:bigint not null default 0"`
|
||||
Username string `json:"username" gorm:"type:varchar(255); not null; default ''"`
|
||||
TokenName string `json:"token_name" gorm:"type:varchar(255); not null; default ''"`
|
||||
Token string `json:"token" gorm:"type:varchar(255); not null; default ''"`
|
||||
CreateAt int64 `json:"create_at" gorm:"type:bigint; not null; default 0"`
|
||||
LastUsed int64 `json:"last_used" gorm:"type:bigint; not null; default 0"`
|
||||
}
|
||||
|
||||
func (UserToken) TableName() string {
|
||||
|
||||
37
pkg/cmdx/cmd_notwindows.go
Normal file
37
pkg/cmdx/cmd_notwindows.go
Normal file
@@ -0,0 +1,37 @@
|
||||
//go:build !windows
|
||||
// +build !windows
|
||||
|
||||
package cmdx
|
||||
|
||||
import (
|
||||
"os/exec"
|
||||
"syscall"
|
||||
"time"
|
||||
)
|
||||
|
||||
func CmdWait(cmd *exec.Cmd, timeout time.Duration) (error, bool) {
|
||||
var err error
|
||||
|
||||
done := make(chan error)
|
||||
go func() {
|
||||
done <- cmd.Wait()
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-time.After(timeout):
|
||||
go func() {
|
||||
<-done // allow goroutine to exit
|
||||
}()
|
||||
|
||||
// IMPORTANT: cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} is necessary before cmd.Start()
|
||||
err = syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL)
|
||||
return err, true
|
||||
case err = <-done:
|
||||
return err, false
|
||||
}
|
||||
}
|
||||
|
||||
func CmdStart(cmd *exec.Cmd) error {
|
||||
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
|
||||
return cmd.Start()
|
||||
}
|
||||
35
pkg/cmdx/cmd_windows.go
Normal file
35
pkg/cmdx/cmd_windows.go
Normal file
@@ -0,0 +1,35 @@
|
||||
//go:build windows
|
||||
// +build windows
|
||||
|
||||
package cmdx
|
||||
|
||||
import (
|
||||
"os/exec"
|
||||
"syscall"
|
||||
"time"
|
||||
)
|
||||
|
||||
func CmdWait(cmd *exec.Cmd, timeout time.Duration) (error, bool) {
|
||||
var err error
|
||||
|
||||
done := make(chan error)
|
||||
go func() {
|
||||
done <- cmd.Wait()
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-time.After(timeout):
|
||||
go func() {
|
||||
<-done // allow goroutine to exit
|
||||
}()
|
||||
|
||||
err = cmd.Process.Signal(syscall.SIGKILL)
|
||||
return err, true
|
||||
case err = <-done:
|
||||
return err, false
|
||||
}
|
||||
}
|
||||
|
||||
func CmdStart(cmd *exec.Cmd) error {
|
||||
return cmd.Start()
|
||||
}
|
||||
15
pkg/cmdx/cmdx.go
Normal file
15
pkg/cmdx/cmdx.go
Normal file
@@ -0,0 +1,15 @@
|
||||
package cmdx
|
||||
|
||||
import (
|
||||
"os/exec"
|
||||
"time"
|
||||
)
|
||||
|
||||
func RunTimeout(cmd *exec.Cmd, timeout time.Duration) (error, bool) {
|
||||
err := CmdStart(cmd)
|
||||
if err != nil {
|
||||
return err, false
|
||||
}
|
||||
|
||||
return CmdWait(cmd, timeout)
|
||||
}
|
||||
@@ -89,7 +89,7 @@ func diffMap(m1, m2 map[int64]*models.User) []models.User {
|
||||
func updateUser(appKey string, m1, m2 map[int64]*models.User) {
|
||||
for i := range m1 {
|
||||
if _, ok := m2[i]; ok {
|
||||
if m1[i].Email != m2[i].Email || m1[i].Phone != m2[i].Phone || m1[i].Username != m2[i].Username {
|
||||
if m1[i].Email != m2[i].Email || !PhoneIsSame(m1[i].Phone, m2[i].Phone) || m1[i].Username != m2[i].Username {
|
||||
var flashdutyUser User
|
||||
|
||||
flashdutyUser = User{
|
||||
@@ -110,6 +110,30 @@ func updateUser(appKey string, m1, m2 map[int64]*models.User) {
|
||||
}
|
||||
}
|
||||
|
||||
func PhoneIsSame(phone1, phone2 string) bool {
|
||||
// 兼容不同国家/地区前缀,例如 +86、+1、+44 等,以及包含空格或短横线的格式
|
||||
normalize := func(p string) string {
|
||||
p = strings.TrimSpace(p)
|
||||
p = strings.ReplaceAll(p, " ", "")
|
||||
p = strings.ReplaceAll(p, "-", "")
|
||||
p = strings.TrimPrefix(p, "+")
|
||||
return p
|
||||
}
|
||||
|
||||
p1 := normalize(phone1)
|
||||
p2 := normalize(phone2)
|
||||
|
||||
if p1 == p2 {
|
||||
return true
|
||||
}
|
||||
|
||||
// 如果长度相差不超过 3 且较长的以较短的结尾,则认为是相同号码(忽略最多 3 位国家区号差异)
|
||||
if len(p1) > len(p2) {
|
||||
return len(p1)-len(p2) <= 3 && strings.HasSuffix(p1, p2)
|
||||
}
|
||||
return len(p2)-len(p1) <= 3 && strings.HasSuffix(p2, p1)
|
||||
}
|
||||
|
||||
type User struct {
|
||||
Email string `json:"email,omitempty"`
|
||||
Phone string `json:"phone,omitempty"`
|
||||
|
||||
67
pkg/flashduty/sync_user_test.go
Normal file
67
pkg/flashduty/sync_user_test.go
Normal file
@@ -0,0 +1,67 @@
|
||||
package flashduty
|
||||
|
||||
import "testing"
|
||||
|
||||
func TestPhoneIsSame(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
phone1 string
|
||||
phone2 string
|
||||
same bool
|
||||
}{
|
||||
{
|
||||
name: "blank",
|
||||
phone1: "",
|
||||
phone2: "",
|
||||
same: true,
|
||||
},
|
||||
{
|
||||
name: "China +86 prefix",
|
||||
phone1: "+8613812345678",
|
||||
phone2: "13812345678",
|
||||
same: true,
|
||||
},
|
||||
{
|
||||
name: "China +86 with spaces and hyphens",
|
||||
phone1: "+86 138-1234-5678",
|
||||
phone2: "13812345678",
|
||||
same: true,
|
||||
},
|
||||
{
|
||||
name: "USA +1 prefix",
|
||||
phone1: "+1 234-567-8900",
|
||||
phone2: "2345678900",
|
||||
same: true,
|
||||
},
|
||||
{
|
||||
name: "UK +44 prefix",
|
||||
phone1: "+442078765432",
|
||||
phone2: "2078765432",
|
||||
same: true,
|
||||
},
|
||||
{
|
||||
name: "India +91 prefix",
|
||||
phone1: "+919876543210",
|
||||
phone2: "9876543210",
|
||||
same: true,
|
||||
},
|
||||
{
|
||||
name: "Germany +49 prefix",
|
||||
phone1: "+4915123456789",
|
||||
phone2: "15123456789",
|
||||
same: true,
|
||||
},
|
||||
{
|
||||
name: "Different numbers",
|
||||
phone1: "+8613812345678",
|
||||
phone2: "13812345679",
|
||||
same: false,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
if got := PhoneIsSame(tt.phone1, tt.phone2); got != tt.same {
|
||||
t.Errorf("%s: expected %v, got %v", tt.name, tt.same, got)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -71,6 +71,7 @@ var I18N = `{
|
||||
"no notify groups selected": "未选择通知组",
|
||||
"all users missing notify channel configurations: %v": "所有用户缺少通知渠道配置: %v",
|
||||
"event match subscribe and notify settings ok": "事件匹配订阅规则,通知设置正常",
|
||||
"/loki suffix is miss, please add /loki to the url: %s": "缺少/loki后缀,请在URL中添加/loki:%s",
|
||||
|
||||
"event time not match time filter": "事件时间不匹配时间过滤器",
|
||||
"event severity not match severity filter": "事件等级不匹配等级过滤器",
|
||||
@@ -261,6 +262,7 @@ var I18N = `{
|
||||
"no notify groups selected": "未選擇通知組",
|
||||
"all users missing notify channel configurations: %v": "所有用戶缺少通知渠道配置: %v",
|
||||
"event match subscribe and notify settings ok": "事件匹配訂閱規則,通知設置正常",
|
||||
"/loki suffix is miss, please add /loki to the url: %s": "缺少/loki後綴,請在URL中添加/loki:%s",
|
||||
|
||||
"event time not match time filter": "事件時間不匹配時間過濾器",
|
||||
"event severity not match severity filter": "事件等級不匹配等級過濾器",
|
||||
@@ -448,6 +450,7 @@ var I18N = `{
|
||||
"no notify groups selected": "通知グループが選択されていません",
|
||||
"all users missing notify channel configurations: %v": "すべてのユーザーに通知チャンネル設定がありません: %v",
|
||||
"event match subscribe and notify settings ok": "イベントがサブスクライブルールに一致し、通知設定が正常です",
|
||||
"/loki suffix is miss, please add /loki to the url: %s": "/lokiサフィックスがありません。URLに/lokiを追加してください: %s",
|
||||
|
||||
"event time not match time filter": "イベント時間が時間フィルタと一致しません",
|
||||
"event severity not match severity filter": "イベント等級が等級フィルタと一致しません",
|
||||
@@ -635,6 +638,7 @@ var I18N = `{
|
||||
"no notify groups selected": "Группы уведомлений не выбраны",
|
||||
"all users missing notify channel configurations: %v": "У всех пользователей отсутствуют настройки каналов уведомлений: %v",
|
||||
"event match subscribe and notify settings ok": "Событие соответствует правилу подписки, настройки уведомлений в порядке",
|
||||
"/loki suffix is miss, please add /loki to the url: %s": "Отсутствует суффикс /loki, пожалуйста, добавьте /loki к URL: %s",
|
||||
|
||||
"event time not match time filter": "Время события не соответствует временному фильтру",
|
||||
"event severity not match severity filter": "Уровень события не соответствует фильтру уровня",
|
||||
|
||||
@@ -117,7 +117,7 @@ func (pc *PromClientMap) loadFromDatabase() {
|
||||
continue
|
||||
}
|
||||
|
||||
logger.Info("setClientFromPromOption success: ", dsId)
|
||||
logger.Infof("setClientFromPromOption success, datasourceId: %d", dsId)
|
||||
PromOptions.Set(dsId, po)
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -13,6 +13,7 @@ import (
|
||||
"github.com/ccfos/nightingale/v6/pushgw/pstat"
|
||||
"github.com/ccfos/nightingale/v6/storage"
|
||||
|
||||
"github.com/toolkits/pkg/concurrent/semaphore"
|
||||
"github.com/toolkits/pkg/logger"
|
||||
"github.com/toolkits/pkg/slice"
|
||||
)
|
||||
@@ -23,6 +24,7 @@ type Set struct {
|
||||
redis storage.Redis
|
||||
ctx *ctx.Context
|
||||
configs pconf.Pushgw
|
||||
sema *semaphore.Semaphore
|
||||
}
|
||||
|
||||
func New(ctx *ctx.Context, redis storage.Redis, configs pconf.Pushgw) *Set {
|
||||
@@ -32,6 +34,7 @@ func New(ctx *ctx.Context, redis storage.Redis, configs pconf.Pushgw) *Set {
|
||||
ctx: ctx,
|
||||
configs: configs,
|
||||
}
|
||||
set.sema = semaphore.NewSemaphore(configs.UpdateTargetByUrlConcurrency)
|
||||
|
||||
set.Init()
|
||||
return set
|
||||
@@ -113,8 +116,26 @@ func (s *Set) UpdateTargets(lst []string, now int64) error {
|
||||
Lst: lst,
|
||||
Now: now,
|
||||
}
|
||||
err := poster.PostByUrls(s.ctx, "/v1/n9e/target-update", t)
|
||||
return err
|
||||
|
||||
if !s.sema.TryAcquire() {
|
||||
logger.Warningf("update_targets: update target by url concurrency limit, skip update target: %v", lst)
|
||||
return nil // 达到并发上限,放弃请求,只是页面上的机器时间不更新,不影响机器失联告警,降级处理下
|
||||
}
|
||||
|
||||
go func() {
|
||||
defer s.sema.Release()
|
||||
// 修改为异步发送,防止机器太多,每个请求耗时比较长导致机器心跳时间更新不及时
|
||||
err := poster.PostByUrls(s.ctx, "/v1/n9e/target-update", t)
|
||||
if err != nil {
|
||||
logger.Errorf("failed to post target update: %v", err)
|
||||
}
|
||||
}()
|
||||
return nil
|
||||
}
|
||||
|
||||
if s.configs.UpdateDBTargetTimestampDisable {
|
||||
// 如果 mysql 压力太大,关闭更新 db 的操作
|
||||
return nil
|
||||
}
|
||||
|
||||
// there are some idents not found in db, so insert them
|
||||
@@ -133,16 +154,34 @@ func (s *Set) UpdateTargets(lst []string, now int64) error {
|
||||
}
|
||||
|
||||
// 从批量更新一批机器的时间戳,改成逐台更新,是为了避免批量更新时,mysql的锁竞争问题
|
||||
for i := 0; i < len(exists); i++ {
|
||||
err = s.ctx.DB.Exec("UPDATE target SET update_at = ? WHERE ident = ?", now, exists[i]).Error
|
||||
if err != nil {
|
||||
logger.Error("upsert_target: failed to update target:", exists[i], "error:", err)
|
||||
start := time.Now()
|
||||
duration := time.Since(start).Seconds()
|
||||
if len(exists) > 0 {
|
||||
sema := semaphore.NewSemaphore(s.configs.UpdateDBTargetConcurrency)
|
||||
wg := sync.WaitGroup{}
|
||||
for i := 0; i < len(exists); i++ {
|
||||
sema.Acquire()
|
||||
wg.Add(1)
|
||||
go func(ident string) {
|
||||
defer sema.Release()
|
||||
defer wg.Done()
|
||||
s.updateDBTargetTs(ident, now)
|
||||
}(exists[i])
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
pstat.DBOperationLatency.WithLabelValues("update_targets_ts").Observe(duration)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Set) updateDBTargetTs(ident string, now int64) {
|
||||
err := s.ctx.DB.Exec("UPDATE target SET update_at = ? WHERE ident = ?", now, ident).Error
|
||||
if err != nil {
|
||||
logger.Error("update_target: failed to update target:", ident, "error:", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Set) updateTargetsUpdateTs(lst []string, now int64, redis storage.Redis) error {
|
||||
if redis == nil {
|
||||
return fmt.Errorf("redis is nil")
|
||||
|
||||
@@ -18,6 +18,10 @@ type Pushgw struct {
|
||||
UpdateTargetRetryIntervalMills int64
|
||||
UpdateTargetTimeoutMills int64
|
||||
UpdateTargetBatchSize int
|
||||
UpdateDBTargetConcurrency int
|
||||
UpdateDBTargetTimestampDisable bool
|
||||
PushConcurrency int
|
||||
UpdateTargetByUrlConcurrency int
|
||||
|
||||
BusiGroupLabelKey string
|
||||
IdentMetrics []string
|
||||
@@ -124,6 +128,18 @@ func (p *Pushgw) PreCheck() {
|
||||
p.UpdateTargetBatchSize = 20
|
||||
}
|
||||
|
||||
if p.UpdateDBTargetConcurrency <= 0 {
|
||||
p.UpdateDBTargetConcurrency = 16
|
||||
}
|
||||
|
||||
if p.PushConcurrency <= 0 {
|
||||
p.PushConcurrency = 16
|
||||
}
|
||||
|
||||
if p.UpdateTargetByUrlConcurrency <= 0 {
|
||||
p.UpdateTargetByUrlConcurrency = 10
|
||||
}
|
||||
|
||||
if p.BusiGroupLabelKey == "" {
|
||||
p.BusiGroupLabelKey = "busigroup"
|
||||
}
|
||||
|
||||
@@ -105,6 +105,17 @@ var (
|
||||
},
|
||||
[]string{"operation", "status"},
|
||||
)
|
||||
|
||||
DBOperationLatency = prometheus.NewHistogramVec(
|
||||
prometheus.HistogramOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
Name: "db_operation_latency_seconds",
|
||||
Help: "Histogram of latencies for DB operations",
|
||||
Buckets: []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5},
|
||||
},
|
||||
[]string{"operation"},
|
||||
)
|
||||
)
|
||||
|
||||
func init() {
|
||||
@@ -121,5 +132,6 @@ func init() {
|
||||
GaugeSampleQueueSize,
|
||||
CounterPushQueueOverLimitTotal,
|
||||
RedisOperationLatency,
|
||||
DBOperationLatency,
|
||||
)
|
||||
}
|
||||
|
||||
@@ -157,10 +157,11 @@ func (w WriterType) Post(req []byte, headers ...map[string]string) error {
|
||||
}
|
||||
|
||||
type WritersType struct {
|
||||
pushgw pconf.Pushgw
|
||||
backends map[string]Writer
|
||||
queues map[string]*IdentQueue
|
||||
AllQueueLen atomic.Value
|
||||
pushgw pconf.Pushgw
|
||||
backends map[string]Writer
|
||||
queues map[string]*IdentQueue
|
||||
AllQueueLen atomic.Value
|
||||
PushConcurrency atomic.Int64
|
||||
sync.RWMutex
|
||||
}
|
||||
|
||||
@@ -210,6 +211,27 @@ func (ws *WritersType) Put(name string, writer Writer) {
|
||||
ws.backends[name] = writer
|
||||
}
|
||||
|
||||
func (ws *WritersType) isCriticalBackend(key string) bool {
|
||||
backend, exists := ws.backends[key]
|
||||
if !exists {
|
||||
return false
|
||||
}
|
||||
|
||||
// 使用类型断言判断
|
||||
switch backend.(type) {
|
||||
case WriterType:
|
||||
// HTTP Writer 作为关键后端
|
||||
return true
|
||||
case KafkaWriterType:
|
||||
// Kafka Writer 作为非关键后端
|
||||
return false
|
||||
default:
|
||||
// 未知类型,保守起见作为关键后端
|
||||
logger.Warningf("Unknown backend type: %T, treating as critical", backend)
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
func (ws *WritersType) CleanExpQueue() {
|
||||
for {
|
||||
ws.Lock()
|
||||
@@ -278,12 +300,64 @@ func (ws *WritersType) StartConsumer(identQueue *IdentQueue) {
|
||||
continue
|
||||
}
|
||||
for key := range ws.backends {
|
||||
ws.backends[key].Write(key, series)
|
||||
|
||||
if ws.isCriticalBackend(key) {
|
||||
ws.backends[key].Write(key, series)
|
||||
} else {
|
||||
// 像 kafka 这种 writer 使用异步写入,防止因为写入太慢影响主流程
|
||||
ws.writeToNonCriticalBackend(key, series)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (ws *WritersType) writeToNonCriticalBackend(key string, series []prompb.TimeSeries) {
|
||||
// 原子性地检查并增加并发数
|
||||
currentConcurrency := ws.PushConcurrency.Add(1)
|
||||
|
||||
if currentConcurrency > int64(ws.pushgw.PushConcurrency) {
|
||||
// 超过限制,立即减少计数并丢弃
|
||||
ws.PushConcurrency.Add(-1)
|
||||
logger.Warningf("push concurrency limit exceeded, current: %d, limit: %d, dropping %d series for backend: %s",
|
||||
currentConcurrency-1, ws.pushgw.PushConcurrency, len(series), key)
|
||||
pstat.CounterWirteErrorTotal.WithLabelValues(key).Add(float64(len(series)))
|
||||
return
|
||||
}
|
||||
|
||||
// 深拷贝数据,确保并发安全
|
||||
seriesCopy := ws.deepCopySeries(series)
|
||||
|
||||
// 启动goroutine处理
|
||||
go func(backendKey string, data []prompb.TimeSeries) {
|
||||
defer func() {
|
||||
ws.PushConcurrency.Add(-1)
|
||||
if r := recover(); r != nil {
|
||||
logger.Errorf("panic in non-critical backend %s: %v", backendKey, r)
|
||||
}
|
||||
}()
|
||||
|
||||
ws.backends[backendKey].Write(backendKey, data)
|
||||
}(key, seriesCopy)
|
||||
}
|
||||
|
||||
// 完整的深拷贝方法
|
||||
func (ws *WritersType) deepCopySeries(series []prompb.TimeSeries) []prompb.TimeSeries {
|
||||
seriesCopy := make([]prompb.TimeSeries, len(series))
|
||||
|
||||
for i := range series {
|
||||
seriesCopy[i] = series[i]
|
||||
|
||||
if len(series[i].Samples) > 0 {
|
||||
samples := make([]prompb.Sample, len(series[i].Samples))
|
||||
copy(samples, series[i].Samples)
|
||||
seriesCopy[i].Samples = samples
|
||||
}
|
||||
}
|
||||
|
||||
return seriesCopy
|
||||
}
|
||||
|
||||
func (ws *WritersType) Init() error {
|
||||
ws.AllQueueLen.Store(int64(0))
|
||||
|
||||
|
||||
Reference in New Issue
Block a user