mirror of
https://github.com/ccfos/nightingale.git
synced 2026-03-02 22:19:10 +00:00
Compare commits
27 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
21478fcf3d | ||
|
|
a87c856299 | ||
|
|
ba035a446d | ||
|
|
bf840e6bb2 | ||
|
|
cd01092aed | ||
|
|
e202fd50c8 | ||
|
|
f0e5062485 | ||
|
|
861fe96de5 | ||
|
|
5b66ada96d | ||
|
|
d5a98debff | ||
|
|
4977052a67 | ||
|
|
dcc461e587 | ||
|
|
f5ce1733bb | ||
|
|
436cf25409 | ||
|
|
038f68b0b7 | ||
|
|
96ef1895b7 | ||
|
|
eeaa7b46f1 | ||
|
|
dc525352f1 | ||
|
|
98a3fe9375 | ||
|
|
74b0f802ec | ||
|
|
85bd3148d5 | ||
|
|
0931fa9603 | ||
|
|
65cdb2da9e | ||
|
|
9ad6514af6 | ||
|
|
302c6549e4 | ||
|
|
a3122270e6 | ||
|
|
1245c453bb |
@@ -141,7 +141,7 @@ func updateSmtp(ctx *ctx.Context, ncc *memsto.NotifyConfigCacheType) {
|
||||
func startEmailSender(ctx *ctx.Context, smtp aconf.SMTPConfig) {
|
||||
conf := smtp
|
||||
if conf.Host == "" || conf.Port == 0 {
|
||||
logger.Warning("SMTP configurations invalid")
|
||||
logger.Debug("SMTP configurations invalid")
|
||||
<-mailQuit
|
||||
return
|
||||
}
|
||||
|
||||
@@ -290,6 +290,15 @@ func (rt *Router) alertRuleAddByImport(c *gin.Context) {
|
||||
models.DataSourceQueryAll,
|
||||
}
|
||||
}
|
||||
|
||||
// 将导入的规则统一转为新版本的通知规则配置
|
||||
lst[i].NotifyVersion = 1
|
||||
lst[i].NotifyChannelsJSON = []string{}
|
||||
lst[i].NotifyGroupsJSON = []string{}
|
||||
lst[i].NotifyChannels = ""
|
||||
lst[i].NotifyGroups = ""
|
||||
lst[i].Callbacks = ""
|
||||
lst[i].CallbacksJSON = []string{}
|
||||
}
|
||||
|
||||
bgid := ginx.UrlParamInt64(c, "id")
|
||||
|
||||
4
go.mod
4
go.mod
@@ -13,7 +13,7 @@ require (
|
||||
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc
|
||||
github.com/dgrijalva/jwt-go v3.2.0+incompatible
|
||||
github.com/expr-lang/expr v1.16.1
|
||||
github.com/flashcatcloud/ibex v1.3.5
|
||||
github.com/flashcatcloud/ibex v1.3.6
|
||||
github.com/gin-contrib/pprof v1.4.0
|
||||
github.com/gin-gonic/gin v1.9.1
|
||||
github.com/glebarez/sqlite v1.11.0
|
||||
@@ -160,3 +160,5 @@ require (
|
||||
)
|
||||
|
||||
replace golang.org/x/exp v0.0.0-20231006140011-7918f672742d => golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1
|
||||
|
||||
// replace github.com/flashcatcloud/ibex => ../github.com/flashcatcloud/ibex
|
||||
|
||||
4
go.sum
4
go.sum
@@ -89,8 +89,8 @@ github.com/fatih/camelcase v1.0.0 h1:hxNvNX/xYBp0ovncs8WyWZrOrpBNub/JfaMvbURyft8
|
||||
github.com/fatih/camelcase v1.0.0/go.mod h1:yN2Sb0lFhZJUdVvtELVWefmrXpuZESvPmqwoZc+/fpc=
|
||||
github.com/fatih/structs v1.1.0 h1:Q7juDM0QtcnhCpeyLGQKyg4TOIghuNXrkL32pHAUMxo=
|
||||
github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M=
|
||||
github.com/flashcatcloud/ibex v1.3.5 h1:8GOOf5+aJT0TP/MC6izz7CO5JKJSdKVFBwL0vQp93Nc=
|
||||
github.com/flashcatcloud/ibex v1.3.5/go.mod h1:T8hbMUySK2q6cXUaYp0AUVeKkU9Od2LjzwmB5lmTRBM=
|
||||
github.com/flashcatcloud/ibex v1.3.6 h1:lJShPFxcZksmkB0w99a3uROGB+Fie1NsqOlkAdar12A=
|
||||
github.com/flashcatcloud/ibex v1.3.6/go.mod h1:iTU1dKT9TnDNllRPRHUOjXe+HDTQkPH2TeaucHtSuh4=
|
||||
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
|
||||
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
|
||||
github.com/gabriel-vasile/mimetype v1.4.2 h1:w5qFW6JKBz9Y393Y4q372O9A7cUSequkh1Q7OhCmWKU=
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
1873
integrations/Java/dashboards/jmx_by_kubernetes.json
Normal file
1873
integrations/Java/dashboards/jmx_by_kubernetes.json
Normal file
File diff suppressed because it is too large
Load Diff
2024
integrations/MinIO/dashboards/new-version.json
Normal file
2024
integrations/MinIO/dashboards/new-version.json
Normal file
File diff suppressed because it is too large
Load Diff
@@ -448,7 +448,7 @@ func (ncc *NotifyChannelCacheType) startEmailSender(chID int64, smtp *models.SMT
|
||||
logger.Warning("SMTP configurations invalid")
|
||||
return
|
||||
}
|
||||
logger.Infof("start email sender... conf.Host:%+v,conf.Port:%+v", conf.Host, conf.Port)
|
||||
logger.Debugf("start email sender... conf.Host:%+v,conf.Port:%+v", conf.Host, conf.Port)
|
||||
|
||||
d := gomail.NewDialer(conf.Host, conf.Port, conf.Username, conf.Password)
|
||||
if conf.InsecureSkipVerify {
|
||||
|
||||
@@ -2,7 +2,6 @@ package models
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/hmac"
|
||||
"crypto/sha256"
|
||||
"crypto/tls"
|
||||
@@ -94,6 +93,9 @@ type UserInfo struct {
|
||||
type FlashDutyRequestConfig struct {
|
||||
Proxy string `json:"proxy"`
|
||||
IntegrationUrl string `json:"integration_url"`
|
||||
Timeout int `json:"timeout"` // 超时时间(毫秒)
|
||||
RetryTimes int `json:"retry_times"` // 重试次数
|
||||
RetrySleep int `json:"retry_sleep"` // 重试等待时间(毫秒)
|
||||
}
|
||||
|
||||
// ParamItem 自定义参数项
|
||||
@@ -196,7 +198,7 @@ func (ncc *NotifyChannelConfig) SendScript(events []*AlertCurEvent, tpl map[stri
|
||||
cmd.Stderr = &buf
|
||||
|
||||
err, isTimeout := cmdx.RunTimeout(cmd, time.Duration(config.Timeout)*time.Millisecond)
|
||||
logger.Infof("event_script_notify_result: exec %s output: %s isTimeout: %v err: %v", fpath, buf.String(), isTimeout, err)
|
||||
logger.Infof("event_script_notify_result: exec %s output: %s isTimeout: %v err: %v stdin: %s", fpath, buf.String(), isTimeout, err, string(getStdinBytes(events, tpl, params, sendtos)))
|
||||
|
||||
res := buf.String()
|
||||
|
||||
@@ -315,9 +317,20 @@ func GetHTTPClient(nc *NotifyChannelConfig) (*http.Client, error) {
|
||||
}
|
||||
|
||||
httpConfig := nc.RequestConfig.HTTPRequestConfig
|
||||
if httpConfig.Timeout == 0 {
|
||||
httpConfig.Timeout = 10000
|
||||
|
||||
// 对于 FlashDuty 类型,优先使用 FlashDuty 配置中的超时时间
|
||||
timeout := httpConfig.Timeout
|
||||
if nc.RequestType == "flashduty" && nc.RequestConfig.FlashDutyRequestConfig != nil {
|
||||
flashDutyTimeout := nc.RequestConfig.FlashDutyRequestConfig.Timeout
|
||||
if flashDutyTimeout > 0 {
|
||||
timeout = flashDutyTimeout
|
||||
}
|
||||
}
|
||||
|
||||
if timeout == 0 {
|
||||
timeout = 10000 // HTTP 默认 10 秒
|
||||
}
|
||||
|
||||
if httpConfig.Concurrency == 0 {
|
||||
httpConfig.Concurrency = 5
|
||||
}
|
||||
@@ -347,18 +360,78 @@ func GetHTTPClient(nc *NotifyChannelConfig) (*http.Client, error) {
|
||||
Proxy: proxyFunc,
|
||||
TLSClientConfig: tlsConfig,
|
||||
DialContext: (&net.Dialer{
|
||||
Timeout: time.Duration(httpConfig.Timeout) * time.Millisecond,
|
||||
Timeout: time.Duration(timeout) * time.Millisecond,
|
||||
}).DialContext,
|
||||
}
|
||||
|
||||
client := &http.Client{
|
||||
Transport: transport,
|
||||
Timeout: time.Duration(httpConfig.Timeout) * time.Millisecond,
|
||||
Timeout: time.Duration(timeout) * time.Millisecond,
|
||||
}
|
||||
|
||||
return client, nil
|
||||
}
|
||||
|
||||
func (ncc *NotifyChannelConfig) makeHTTPRequest(httpConfig *HTTPRequestConfig, url string, headers map[string]string, parameters map[string]string, body []byte) (*http.Request, error) {
|
||||
req, err := http.NewRequest(httpConfig.Method, url, bytes.NewBuffer(body))
|
||||
if err != nil {
|
||||
logger.Errorf("failed to create request: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
query := req.URL.Query()
|
||||
// 设置请求头 腾讯云短信、语音特殊处理
|
||||
if ncc.Ident == "tx-sms" || ncc.Ident == "tx-voice" {
|
||||
headers = ncc.setTxHeader(headers, body)
|
||||
for key, value := range headers {
|
||||
req.Header.Add(key, value)
|
||||
}
|
||||
} else if ncc.Ident == "ali-sms" || ncc.Ident == "ali-voice" {
|
||||
req, err = http.NewRequest(httpConfig.Method, url, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
query, headers = ncc.getAliQuery(ncc.Ident, query, httpConfig.Request.Parameters["AccessKeyId"], httpConfig.Request.Parameters["AccessKeySecret"], parameters)
|
||||
for key, value := range headers {
|
||||
req.Header.Set(key, value)
|
||||
}
|
||||
} else {
|
||||
for key, value := range headers {
|
||||
req.Header.Add(key, value)
|
||||
}
|
||||
}
|
||||
|
||||
if ncc.Ident != "ali-sms" && ncc.Ident != "ali-voice" {
|
||||
for key, value := range parameters {
|
||||
query.Add(key, value)
|
||||
}
|
||||
}
|
||||
|
||||
req.URL.RawQuery = query.Encode()
|
||||
// 记录完整的请求信息
|
||||
logger.Debugf("URL: %v, Method: %s, Headers: %+v, params: %+v, Body: %s", req.URL, req.Method, req.Header, query, string(body))
|
||||
|
||||
return req, nil
|
||||
}
|
||||
|
||||
func (ncc *NotifyChannelConfig) makeFlashDutyRequest(url string, bodyBytes []byte, flashDutyChannelID int64) (*http.Request, error) {
|
||||
req, err := http.NewRequest("POST", url, bytes.NewBuffer(bodyBytes))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// 设置 URL 参数
|
||||
query := req.URL.Query()
|
||||
if flashDutyChannelID != 0 {
|
||||
// 如果 flashduty 有配置协作空间(channel_id),则传入 channel_id 参数
|
||||
query.Add("channel_id", strconv.FormatInt(flashDutyChannelID, 10))
|
||||
}
|
||||
req.URL.RawQuery = query.Encode()
|
||||
req.Header.Add("Content-Type", "application/json")
|
||||
return req, nil
|
||||
}
|
||||
|
||||
func (ncc *NotifyChannelConfig) SendFlashDuty(events []*AlertCurEvent, flashDutyChannelID int64, client *http.Client) (string, error) {
|
||||
// todo 每一个 channel 批量发送事件
|
||||
if client == nil {
|
||||
@@ -370,46 +443,57 @@ func (ncc *NotifyChannelConfig) SendFlashDuty(events []*AlertCurEvent, flashDuty
|
||||
return "", err
|
||||
}
|
||||
|
||||
req, err := http.NewRequest("POST", ncc.RequestConfig.FlashDutyRequestConfig.IntegrationUrl, bytes.NewBuffer(body))
|
||||
if err != nil {
|
||||
logger.Errorf("failed to create request: %v, event: %v", err, events)
|
||||
return "", err
|
||||
url := ncc.RequestConfig.FlashDutyRequestConfig.IntegrationUrl
|
||||
|
||||
retrySleep := time.Second
|
||||
if ncc.RequestConfig.FlashDutyRequestConfig.RetrySleep > 0 {
|
||||
retrySleep = time.Duration(ncc.RequestConfig.FlashDutyRequestConfig.RetrySleep) * time.Millisecond
|
||||
}
|
||||
|
||||
// 设置 URL 参数
|
||||
query := req.URL.Query()
|
||||
if flashDutyChannelID != 0 {
|
||||
// 如果 flashduty 有配置协作空间(channel_id),则传入 channel_id 参数
|
||||
query.Add("channel_id", strconv.FormatInt(flashDutyChannelID, 10))
|
||||
retryTimes := 3
|
||||
if ncc.RequestConfig.FlashDutyRequestConfig.RetryTimes > 0 {
|
||||
retryTimes = ncc.RequestConfig.FlashDutyRequestConfig.RetryTimes
|
||||
}
|
||||
req.URL.RawQuery = query.Encode()
|
||||
req.Header.Add("Content-Type", "application/json")
|
||||
|
||||
// 重试机制
|
||||
for i := 0; i <= 3; i++ {
|
||||
logger.Infof("send flashduty req:%+v body:%+v", req, string(body))
|
||||
// 把最后一次错误保存下来,后面返回,让用户在页面上也可以看到
|
||||
var lastErrorMessage string
|
||||
for i := 0; i <= retryTimes; i++ {
|
||||
req, err := ncc.makeFlashDutyRequest(url, body, flashDutyChannelID)
|
||||
if err != nil {
|
||||
logger.Errorf("send_flashduty: failed to create request. url=%s request_body=%s error=%v", url, string(body), err)
|
||||
return fmt.Sprintf("failed to create request. error: %v", err), err
|
||||
}
|
||||
|
||||
// 直接使用客户端发送请求,超时时间已经在 client 中设置
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
logger.Errorf("send flashduty req:%+v err:%v", req, err)
|
||||
time.Sleep(time.Duration(100) * time.Millisecond)
|
||||
logger.Errorf("send_flashduty: http_call=fail url=%s request_body=%s error=%v times=%d", url, string(body), err, i+1)
|
||||
if i < retryTimes {
|
||||
// 重试等待时间,后面要放到页面上配置
|
||||
time.Sleep(retrySleep)
|
||||
}
|
||||
lastErrorMessage = err.Error()
|
||||
continue
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
// 读取响应
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
logger.Errorf("failed to read response: %v, event: %v", err, events)
|
||||
// 走到这里,说明请求 Flashduty 成功,不管 Flashduty 返回了什么结果,都不判断,仅保存,给用户查看即可
|
||||
// 比如服务端返回 5xx,也不要重试,重试可能会导致服务端数据有问题。告警事件这样的东西,没有那么关键,只要最终能在 UI 上看到调用结果就行
|
||||
var resBody []byte
|
||||
if resp.Body != nil {
|
||||
defer resp.Body.Close()
|
||||
|
||||
resBody, err = io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
logger.Errorf("send_flashduty: failed to read response. request_body=%s, error=%v", string(body), err)
|
||||
resBody = []byte("failed to read response. error: " + err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
logger.Infof("send flashduty req:%+v resp:%+v body:%+v err:%v", req, resp, string(body), err)
|
||||
if resp.StatusCode == http.StatusOK {
|
||||
return string(body), nil
|
||||
}
|
||||
time.Sleep(time.Duration(100) * time.Millisecond)
|
||||
logger.Infof("send_flashduty: http_call=succ url=%s request_body=%s response_code=%d response_body=%s times=%d", url, string(body), resp.StatusCode, string(resBody), i+1)
|
||||
return fmt.Sprintf("status_code:%d, response:%s", resp.StatusCode, string(resBody)), nil
|
||||
}
|
||||
|
||||
return "", errors.New("failed to send request")
|
||||
return lastErrorMessage, errors.New("failed to send request")
|
||||
}
|
||||
|
||||
func (ncc *NotifyChannelConfig) SendHTTP(events []*AlertCurEvent, tpl map[string]interface{}, params map[string]string, sendtos []string, client *http.Client) (string, error) {
|
||||
@@ -447,54 +531,21 @@ func (ncc *NotifyChannelConfig) SendHTTP(events []*AlertCurEvent, tpl map[string
|
||||
url, headers, parameters := ncc.replaceVariables(fullTpl)
|
||||
logger.Infof("url: %v, headers: %v, parameters: %v", url, headers, parameters)
|
||||
|
||||
req, err := http.NewRequest(httpConfig.Method, url, bytes.NewBuffer(body))
|
||||
if err != nil {
|
||||
logger.Errorf("failed to create request: %v, event: %v", err, events)
|
||||
return "", err
|
||||
}
|
||||
|
||||
query := req.URL.Query()
|
||||
// 设置请求头 腾讯云短信、语音特殊处理
|
||||
if ncc.Ident == "tx-sms" || ncc.Ident == "tx-voice" {
|
||||
headers = ncc.setTxHeader(headers, body)
|
||||
for key, value := range headers {
|
||||
req.Header.Add(key, value)
|
||||
}
|
||||
} else if ncc.Ident == "ali-sms" || ncc.Ident == "ali-voice" {
|
||||
req, err = http.NewRequest(httpConfig.Method, url, nil)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
query, headers = ncc.getAliQuery(ncc.Ident, query, httpConfig.Request.Parameters["AccessKeyId"], httpConfig.Request.Parameters["AccessKeySecret"], parameters)
|
||||
for key, value := range headers {
|
||||
req.Header.Set(key, value)
|
||||
}
|
||||
} else {
|
||||
for key, value := range headers {
|
||||
req.Header.Add(key, value)
|
||||
}
|
||||
}
|
||||
|
||||
if ncc.Ident != "ali-sms" && ncc.Ident != "ali-voice" {
|
||||
for key, value := range parameters {
|
||||
query.Add(key, value)
|
||||
}
|
||||
}
|
||||
|
||||
req.URL.RawQuery = query.Encode()
|
||||
// 记录完整的请求信息
|
||||
logger.Debugf("URL: %v, Method: %s, Headers: %+v, params: %+v, Body: %s", req.URL, req.Method, req.Header, query, string(body))
|
||||
|
||||
// 重试机制
|
||||
for i := 0; i <= httpConfig.RetryTimes; i++ {
|
||||
var lastErrorMessage string
|
||||
for i := 0; i < httpConfig.RetryTimes; i++ {
|
||||
var resp *http.Response
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(httpConfig.Timeout)*time.Millisecond)
|
||||
resp, err = client.Do(req.WithContext(ctx))
|
||||
cancel() // 确保释放资源
|
||||
req, err := ncc.makeHTTPRequest(httpConfig, url, headers, parameters, body)
|
||||
if err != nil {
|
||||
logger.Errorf("send_http: failed to create request. url=%s request_body=%s error=%v", url, string(body), err)
|
||||
return fmt.Sprintf("failed to create request. error: %v", err), err
|
||||
}
|
||||
|
||||
resp, err = client.Do(req)
|
||||
if err != nil {
|
||||
logger.Errorf("send_http: failed to send http notify. url=%s request_body=%s error=%v", url, string(body), err)
|
||||
lastErrorMessage = err.Error()
|
||||
time.Sleep(time.Duration(httpConfig.RetryInterval) * time.Second)
|
||||
logger.Errorf("send http request failed to send http notify: %v", err)
|
||||
continue
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
@@ -502,11 +553,9 @@ func (ncc *NotifyChannelConfig) SendHTTP(events []*AlertCurEvent, tpl map[string
|
||||
// 读取响应
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
logger.Debugf("send http request: %+v, response: %+v, body: %+v", req, resp, string(body))
|
||||
|
||||
if err != nil {
|
||||
logger.Errorf("failed to send http notify: %v", err)
|
||||
logger.Errorf("send_http: failed to read response. url=%s request_body=%s error=%v", url, string(body), err)
|
||||
}
|
||||
|
||||
if resp.StatusCode == http.StatusOK {
|
||||
return string(body), nil
|
||||
}
|
||||
@@ -514,8 +563,7 @@ func (ncc *NotifyChannelConfig) SendHTTP(events []*AlertCurEvent, tpl map[string
|
||||
return "", fmt.Errorf("failed to send request, status code: %d, body: %s", resp.StatusCode, string(body))
|
||||
}
|
||||
|
||||
return "", err
|
||||
|
||||
return lastErrorMessage, errors.New("all retries failed, last error: " + lastErrorMessage)
|
||||
}
|
||||
|
||||
// getAliQuery 获取阿里云API的查询参数和请求头
|
||||
@@ -1421,6 +1469,8 @@ var NotiChMap = []*NotifyChannelConfig{
|
||||
},
|
||||
FlashDutyRequestConfig: &FlashDutyRequestConfig{
|
||||
IntegrationUrl: "flashduty integration url",
|
||||
Timeout: 5000, // 默认5秒超时
|
||||
RetryTimes: 3, // 默认重试3次
|
||||
},
|
||||
},
|
||||
},
|
||||
|
||||
@@ -117,7 +117,7 @@ func (pc *PromClientMap) loadFromDatabase() {
|
||||
continue
|
||||
}
|
||||
|
||||
logger.Info("setClientFromPromOption success: ", dsId)
|
||||
logger.Infof("setClientFromPromOption success, datasourceId: %d", dsId)
|
||||
PromOptions.Set(dsId, po)
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -13,6 +13,7 @@ import (
|
||||
"github.com/ccfos/nightingale/v6/pushgw/pstat"
|
||||
"github.com/ccfos/nightingale/v6/storage"
|
||||
|
||||
"github.com/toolkits/pkg/concurrent/semaphore"
|
||||
"github.com/toolkits/pkg/logger"
|
||||
"github.com/toolkits/pkg/slice"
|
||||
)
|
||||
@@ -23,6 +24,7 @@ type Set struct {
|
||||
redis storage.Redis
|
||||
ctx *ctx.Context
|
||||
configs pconf.Pushgw
|
||||
sema *semaphore.Semaphore
|
||||
}
|
||||
|
||||
func New(ctx *ctx.Context, redis storage.Redis, configs pconf.Pushgw) *Set {
|
||||
@@ -32,6 +34,7 @@ func New(ctx *ctx.Context, redis storage.Redis, configs pconf.Pushgw) *Set {
|
||||
ctx: ctx,
|
||||
configs: configs,
|
||||
}
|
||||
set.sema = semaphore.NewSemaphore(configs.UpdateTargetByUrlConcurrency)
|
||||
|
||||
set.Init()
|
||||
return set
|
||||
@@ -113,8 +116,26 @@ func (s *Set) UpdateTargets(lst []string, now int64) error {
|
||||
Lst: lst,
|
||||
Now: now,
|
||||
}
|
||||
err := poster.PostByUrls(s.ctx, "/v1/n9e/target-update", t)
|
||||
return err
|
||||
|
||||
if !s.sema.TryAcquire() {
|
||||
logger.Warningf("update_targets: update target by url concurrency limit, skip update target: %v", lst)
|
||||
return nil // 达到并发上限,放弃请求,只是页面上的机器时间不更新,不影响机器失联告警,降级处理下
|
||||
}
|
||||
|
||||
go func() {
|
||||
defer s.sema.Release()
|
||||
// 修改为异步发送,防止机器太多,每个请求耗时比较长导致机器心跳时间更新不及时
|
||||
err := poster.PostByUrls(s.ctx, "/v1/n9e/target-update", t)
|
||||
if err != nil {
|
||||
logger.Errorf("failed to post target update: %v", err)
|
||||
}
|
||||
}()
|
||||
return nil
|
||||
}
|
||||
|
||||
if s.configs.UpdateDBTargetTimestampDisable {
|
||||
// 如果 mysql 压力太大,关闭更新 db 的操作
|
||||
return nil
|
||||
}
|
||||
|
||||
// there are some idents not found in db, so insert them
|
||||
@@ -133,16 +154,34 @@ func (s *Set) UpdateTargets(lst []string, now int64) error {
|
||||
}
|
||||
|
||||
// 从批量更新一批机器的时间戳,改成逐台更新,是为了避免批量更新时,mysql的锁竞争问题
|
||||
for i := 0; i < len(exists); i++ {
|
||||
err = s.ctx.DB.Exec("UPDATE target SET update_at = ? WHERE ident = ?", now, exists[i]).Error
|
||||
if err != nil {
|
||||
logger.Error("upsert_target: failed to update target:", exists[i], "error:", err)
|
||||
start := time.Now()
|
||||
duration := time.Since(start).Seconds()
|
||||
if len(exists) > 0 {
|
||||
sema := semaphore.NewSemaphore(s.configs.UpdateDBTargetConcurrency)
|
||||
wg := sync.WaitGroup{}
|
||||
for i := 0; i < len(exists); i++ {
|
||||
sema.Acquire()
|
||||
wg.Add(1)
|
||||
go func(ident string) {
|
||||
defer sema.Release()
|
||||
defer wg.Done()
|
||||
s.updateDBTargetTs(ident, now)
|
||||
}(exists[i])
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
pstat.DBOperationLatency.WithLabelValues("update_targets_ts").Observe(duration)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Set) updateDBTargetTs(ident string, now int64) {
|
||||
err := s.ctx.DB.Exec("UPDATE target SET update_at = ? WHERE ident = ?", now, ident).Error
|
||||
if err != nil {
|
||||
logger.Error("update_target: failed to update target:", ident, "error:", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Set) updateTargetsUpdateTs(lst []string, now int64, redis storage.Redis) error {
|
||||
if redis == nil {
|
||||
return fmt.Errorf("redis is nil")
|
||||
|
||||
@@ -18,6 +18,10 @@ type Pushgw struct {
|
||||
UpdateTargetRetryIntervalMills int64
|
||||
UpdateTargetTimeoutMills int64
|
||||
UpdateTargetBatchSize int
|
||||
UpdateDBTargetConcurrency int
|
||||
UpdateDBTargetTimestampDisable bool
|
||||
PushConcurrency int
|
||||
UpdateTargetByUrlConcurrency int
|
||||
|
||||
BusiGroupLabelKey string
|
||||
IdentMetrics []string
|
||||
@@ -124,6 +128,18 @@ func (p *Pushgw) PreCheck() {
|
||||
p.UpdateTargetBatchSize = 20
|
||||
}
|
||||
|
||||
if p.UpdateDBTargetConcurrency <= 0 {
|
||||
p.UpdateDBTargetConcurrency = 16
|
||||
}
|
||||
|
||||
if p.PushConcurrency <= 0 {
|
||||
p.PushConcurrency = 16
|
||||
}
|
||||
|
||||
if p.UpdateTargetByUrlConcurrency <= 0 {
|
||||
p.UpdateTargetByUrlConcurrency = 10
|
||||
}
|
||||
|
||||
if p.BusiGroupLabelKey == "" {
|
||||
p.BusiGroupLabelKey = "busigroup"
|
||||
}
|
||||
|
||||
@@ -105,6 +105,17 @@ var (
|
||||
},
|
||||
[]string{"operation", "status"},
|
||||
)
|
||||
|
||||
DBOperationLatency = prometheus.NewHistogramVec(
|
||||
prometheus.HistogramOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
Name: "db_operation_latency_seconds",
|
||||
Help: "Histogram of latencies for DB operations",
|
||||
Buckets: []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5},
|
||||
},
|
||||
[]string{"operation"},
|
||||
)
|
||||
)
|
||||
|
||||
func init() {
|
||||
@@ -121,5 +132,6 @@ func init() {
|
||||
GaugeSampleQueueSize,
|
||||
CounterPushQueueOverLimitTotal,
|
||||
RedisOperationLatency,
|
||||
DBOperationLatency,
|
||||
)
|
||||
}
|
||||
|
||||
@@ -157,10 +157,11 @@ func (w WriterType) Post(req []byte, headers ...map[string]string) error {
|
||||
}
|
||||
|
||||
type WritersType struct {
|
||||
pushgw pconf.Pushgw
|
||||
backends map[string]Writer
|
||||
queues map[string]*IdentQueue
|
||||
AllQueueLen atomic.Value
|
||||
pushgw pconf.Pushgw
|
||||
backends map[string]Writer
|
||||
queues map[string]*IdentQueue
|
||||
AllQueueLen atomic.Value
|
||||
PushConcurrency atomic.Int64
|
||||
sync.RWMutex
|
||||
}
|
||||
|
||||
@@ -210,6 +211,27 @@ func (ws *WritersType) Put(name string, writer Writer) {
|
||||
ws.backends[name] = writer
|
||||
}
|
||||
|
||||
func (ws *WritersType) isCriticalBackend(key string) bool {
|
||||
backend, exists := ws.backends[key]
|
||||
if !exists {
|
||||
return false
|
||||
}
|
||||
|
||||
// 使用类型断言判断
|
||||
switch backend.(type) {
|
||||
case WriterType:
|
||||
// HTTP Writer 作为关键后端
|
||||
return true
|
||||
case KafkaWriterType:
|
||||
// Kafka Writer 作为非关键后端
|
||||
return false
|
||||
default:
|
||||
// 未知类型,保守起见作为关键后端
|
||||
logger.Warningf("Unknown backend type: %T, treating as critical", backend)
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
func (ws *WritersType) CleanExpQueue() {
|
||||
for {
|
||||
ws.Lock()
|
||||
@@ -278,12 +300,64 @@ func (ws *WritersType) StartConsumer(identQueue *IdentQueue) {
|
||||
continue
|
||||
}
|
||||
for key := range ws.backends {
|
||||
ws.backends[key].Write(key, series)
|
||||
|
||||
if ws.isCriticalBackend(key) {
|
||||
ws.backends[key].Write(key, series)
|
||||
} else {
|
||||
// 像 kafka 这种 writer 使用异步写入,防止因为写入太慢影响主流程
|
||||
ws.writeToNonCriticalBackend(key, series)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (ws *WritersType) writeToNonCriticalBackend(key string, series []prompb.TimeSeries) {
|
||||
// 原子性地检查并增加并发数
|
||||
currentConcurrency := ws.PushConcurrency.Add(1)
|
||||
|
||||
if currentConcurrency > int64(ws.pushgw.PushConcurrency) {
|
||||
// 超过限制,立即减少计数并丢弃
|
||||
ws.PushConcurrency.Add(-1)
|
||||
logger.Warningf("push concurrency limit exceeded, current: %d, limit: %d, dropping %d series for backend: %s",
|
||||
currentConcurrency-1, ws.pushgw.PushConcurrency, len(series), key)
|
||||
pstat.CounterWirteErrorTotal.WithLabelValues(key).Add(float64(len(series)))
|
||||
return
|
||||
}
|
||||
|
||||
// 深拷贝数据,确保并发安全
|
||||
seriesCopy := ws.deepCopySeries(series)
|
||||
|
||||
// 启动goroutine处理
|
||||
go func(backendKey string, data []prompb.TimeSeries) {
|
||||
defer func() {
|
||||
ws.PushConcurrency.Add(-1)
|
||||
if r := recover(); r != nil {
|
||||
logger.Errorf("panic in non-critical backend %s: %v", backendKey, r)
|
||||
}
|
||||
}()
|
||||
|
||||
ws.backends[backendKey].Write(backendKey, data)
|
||||
}(key, seriesCopy)
|
||||
}
|
||||
|
||||
// 完整的深拷贝方法
|
||||
func (ws *WritersType) deepCopySeries(series []prompb.TimeSeries) []prompb.TimeSeries {
|
||||
seriesCopy := make([]prompb.TimeSeries, len(series))
|
||||
|
||||
for i := range series {
|
||||
seriesCopy[i] = series[i]
|
||||
|
||||
if len(series[i].Samples) > 0 {
|
||||
samples := make([]prompb.Sample, len(series[i].Samples))
|
||||
copy(samples, series[i].Samples)
|
||||
seriesCopy[i].Samples = samples
|
||||
}
|
||||
}
|
||||
|
||||
return seriesCopy
|
||||
}
|
||||
|
||||
func (ws *WritersType) Init() error {
|
||||
ws.AllQueueLen.Store(int64(0))
|
||||
|
||||
|
||||
Reference in New Issue
Block a user