Compare commits

...

27 Commits

Author SHA1 Message Date
Yening Qin
21478fcf3d fix: send http notify retry (#2849) 2025-08-30 01:57:32 +08:00
ulricqin
a87c856299 fix: call flashduty to push event (#2848) 2025-08-30 00:06:53 +08:00
ning
ba035a446d refactor: change some log 2025-08-29 16:32:57 +08:00
ning
bf840e6bb2 docs: update dashboard tpl 2025-08-29 10:14:59 +08:00
ning
cd01092aed refactor: update alert rule import api 2025-08-28 19:44:05 +08:00
ning
e202fd50c8 refactor: datasource api 2025-08-28 16:48:04 +08:00
ning
f0e5062485 refactor: optimize edge ident update 2025-08-28 16:24:33 +08:00
ning
861fe96de5 add UpdateDBTargetTimestampDisable 2025-08-27 19:12:56 +08:00
Ulric Qin
5b66ada96d Merge branch 'main' of https://github.com/ccfos/nightingale 2025-08-27 10:06:19 +08:00
Ulric Qin
d5a98debff upgrade ibex 2025-08-27 10:06:11 +08:00
ning
4977052a67 Merge branch 'main' of github.com:ccfos/nightingale 2025-08-22 15:03:17 +08:00
ning
dcc461e587 refactor: push writer support sync 2025-08-22 15:00:49 +08:00
Ulric Qin
f5ce1733bb update minio dashboard 2025-08-21 18:58:47 +08:00
Ulric Qin
436cf25409 Merge branch 'main' of https://github.com/ccfos/nightingale 2025-08-21 10:34:56 +08:00
Ulric Qin
038f68b0b7 add minio dashboard for new version 2025-08-21 10:34:50 +08:00
ning
96ef1895b7 refactor: event_script_notify_result log add stdin 2025-08-20 14:24:28 +08:00
zjxpsetp
eeaa7b46f1 update es dashboard for categraf version bigger than 0.3.102 2025-08-19 23:43:54 +08:00
zjxpsetp
dc525352f1 Merge remote-tracking branch 'origin/main' 2025-08-19 17:36:12 +08:00
zjxpsetp
98a3fe9375 update jmx dashboard in kubernetes 2025-08-19 17:35:55 +08:00
ning
74b0f802ec Merge branch 'main' of github.com:ccfos/nightingale 2025-08-18 11:19:07 +08:00
ning
85bd3148d5 refactor: add update db metric 2025-08-18 11:18:52 +08:00
ning
0931fa9603 fix: target update ts 2025-08-18 11:18:39 +08:00
zjxpsetp
65cdb2da9e 更新 jmx 的仪表盘,新的jmx Exporter 指标和之前有一些差别 2025-08-17 17:23:39 +08:00
ning
9ad6514af6 refactor: ds query api 2025-08-13 11:50:01 +08:00
ning
302c6549e4 refactor: ds query api 2025-08-13 11:12:57 +08:00
ning
a3122270e6 refactor: ds query api 2025-08-13 11:02:00 +08:00
Yening Qin
1245c453bb refactor: send flashduty (#2824) 2025-08-12 20:55:23 +08:00
15 changed files with 8815 additions and 1104 deletions

View File

@@ -141,7 +141,7 @@ func updateSmtp(ctx *ctx.Context, ncc *memsto.NotifyConfigCacheType) {
func startEmailSender(ctx *ctx.Context, smtp aconf.SMTPConfig) {
conf := smtp
if conf.Host == "" || conf.Port == 0 {
logger.Warning("SMTP configurations invalid")
logger.Debug("SMTP configurations invalid")
<-mailQuit
return
}

View File

@@ -290,6 +290,15 @@ func (rt *Router) alertRuleAddByImport(c *gin.Context) {
models.DataSourceQueryAll,
}
}
// 将导入的规则统一转为新版本的通知规则配置
lst[i].NotifyVersion = 1
lst[i].NotifyChannelsJSON = []string{}
lst[i].NotifyGroupsJSON = []string{}
lst[i].NotifyChannels = ""
lst[i].NotifyGroups = ""
lst[i].Callbacks = ""
lst[i].CallbacksJSON = []string{}
}
bgid := ginx.UrlParamInt64(c, "id")

4
go.mod
View File

@@ -13,7 +13,7 @@ require (
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/expr-lang/expr v1.16.1
github.com/flashcatcloud/ibex v1.3.5
github.com/flashcatcloud/ibex v1.3.6
github.com/gin-contrib/pprof v1.4.0
github.com/gin-gonic/gin v1.9.1
github.com/glebarez/sqlite v1.11.0
@@ -160,3 +160,5 @@ require (
)
replace golang.org/x/exp v0.0.0-20231006140011-7918f672742d => golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1
// replace github.com/flashcatcloud/ibex => ../github.com/flashcatcloud/ibex

4
go.sum
View File

@@ -89,8 +89,8 @@ github.com/fatih/camelcase v1.0.0 h1:hxNvNX/xYBp0ovncs8WyWZrOrpBNub/JfaMvbURyft8
github.com/fatih/camelcase v1.0.0/go.mod h1:yN2Sb0lFhZJUdVvtELVWefmrXpuZESvPmqwoZc+/fpc=
github.com/fatih/structs v1.1.0 h1:Q7juDM0QtcnhCpeyLGQKyg4TOIghuNXrkL32pHAUMxo=
github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M=
github.com/flashcatcloud/ibex v1.3.5 h1:8GOOf5+aJT0TP/MC6izz7CO5JKJSdKVFBwL0vQp93Nc=
github.com/flashcatcloud/ibex v1.3.5/go.mod h1:T8hbMUySK2q6cXUaYp0AUVeKkU9Od2LjzwmB5lmTRBM=
github.com/flashcatcloud/ibex v1.3.6 h1:lJShPFxcZksmkB0w99a3uROGB+Fie1NsqOlkAdar12A=
github.com/flashcatcloud/ibex v1.3.6/go.mod h1:iTU1dKT9TnDNllRPRHUOjXe+HDTQkPH2TeaucHtSuh4=
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
github.com/gabriel-vasile/mimetype v1.4.2 h1:w5qFW6JKBz9Y393Y4q372O9A7cUSequkh1Q7OhCmWKU=

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -448,7 +448,7 @@ func (ncc *NotifyChannelCacheType) startEmailSender(chID int64, smtp *models.SMT
logger.Warning("SMTP configurations invalid")
return
}
logger.Infof("start email sender... conf.Host:%+v,conf.Port:%+v", conf.Host, conf.Port)
logger.Debugf("start email sender... conf.Host:%+v,conf.Port:%+v", conf.Host, conf.Port)
d := gomail.NewDialer(conf.Host, conf.Port, conf.Username, conf.Password)
if conf.InsecureSkipVerify {

View File

@@ -2,7 +2,6 @@ package models
import (
"bytes"
"context"
"crypto/hmac"
"crypto/sha256"
"crypto/tls"
@@ -94,6 +93,9 @@ type UserInfo struct {
type FlashDutyRequestConfig struct {
Proxy string `json:"proxy"`
IntegrationUrl string `json:"integration_url"`
Timeout int `json:"timeout"` // 超时时间(毫秒)
RetryTimes int `json:"retry_times"` // 重试次数
RetrySleep int `json:"retry_sleep"` // 重试等待时间(毫秒)
}
// ParamItem 自定义参数项
@@ -196,7 +198,7 @@ func (ncc *NotifyChannelConfig) SendScript(events []*AlertCurEvent, tpl map[stri
cmd.Stderr = &buf
err, isTimeout := cmdx.RunTimeout(cmd, time.Duration(config.Timeout)*time.Millisecond)
logger.Infof("event_script_notify_result: exec %s output: %s isTimeout: %v err: %v", fpath, buf.String(), isTimeout, err)
logger.Infof("event_script_notify_result: exec %s output: %s isTimeout: %v err: %v stdin: %s", fpath, buf.String(), isTimeout, err, string(getStdinBytes(events, tpl, params, sendtos)))
res := buf.String()
@@ -315,9 +317,20 @@ func GetHTTPClient(nc *NotifyChannelConfig) (*http.Client, error) {
}
httpConfig := nc.RequestConfig.HTTPRequestConfig
if httpConfig.Timeout == 0 {
httpConfig.Timeout = 10000
// 对于 FlashDuty 类型,优先使用 FlashDuty 配置中的超时时间
timeout := httpConfig.Timeout
if nc.RequestType == "flashduty" && nc.RequestConfig.FlashDutyRequestConfig != nil {
flashDutyTimeout := nc.RequestConfig.FlashDutyRequestConfig.Timeout
if flashDutyTimeout > 0 {
timeout = flashDutyTimeout
}
}
if timeout == 0 {
timeout = 10000 // HTTP 默认 10 秒
}
if httpConfig.Concurrency == 0 {
httpConfig.Concurrency = 5
}
@@ -347,18 +360,78 @@ func GetHTTPClient(nc *NotifyChannelConfig) (*http.Client, error) {
Proxy: proxyFunc,
TLSClientConfig: tlsConfig,
DialContext: (&net.Dialer{
Timeout: time.Duration(httpConfig.Timeout) * time.Millisecond,
Timeout: time.Duration(timeout) * time.Millisecond,
}).DialContext,
}
client := &http.Client{
Transport: transport,
Timeout: time.Duration(httpConfig.Timeout) * time.Millisecond,
Timeout: time.Duration(timeout) * time.Millisecond,
}
return client, nil
}
func (ncc *NotifyChannelConfig) makeHTTPRequest(httpConfig *HTTPRequestConfig, url string, headers map[string]string, parameters map[string]string, body []byte) (*http.Request, error) {
req, err := http.NewRequest(httpConfig.Method, url, bytes.NewBuffer(body))
if err != nil {
logger.Errorf("failed to create request: %v", err)
return nil, err
}
query := req.URL.Query()
// 设置请求头 腾讯云短信、语音特殊处理
if ncc.Ident == "tx-sms" || ncc.Ident == "tx-voice" {
headers = ncc.setTxHeader(headers, body)
for key, value := range headers {
req.Header.Add(key, value)
}
} else if ncc.Ident == "ali-sms" || ncc.Ident == "ali-voice" {
req, err = http.NewRequest(httpConfig.Method, url, nil)
if err != nil {
return nil, err
}
query, headers = ncc.getAliQuery(ncc.Ident, query, httpConfig.Request.Parameters["AccessKeyId"], httpConfig.Request.Parameters["AccessKeySecret"], parameters)
for key, value := range headers {
req.Header.Set(key, value)
}
} else {
for key, value := range headers {
req.Header.Add(key, value)
}
}
if ncc.Ident != "ali-sms" && ncc.Ident != "ali-voice" {
for key, value := range parameters {
query.Add(key, value)
}
}
req.URL.RawQuery = query.Encode()
// 记录完整的请求信息
logger.Debugf("URL: %v, Method: %s, Headers: %+v, params: %+v, Body: %s", req.URL, req.Method, req.Header, query, string(body))
return req, nil
}
func (ncc *NotifyChannelConfig) makeFlashDutyRequest(url string, bodyBytes []byte, flashDutyChannelID int64) (*http.Request, error) {
req, err := http.NewRequest("POST", url, bytes.NewBuffer(bodyBytes))
if err != nil {
return nil, err
}
// 设置 URL 参数
query := req.URL.Query()
if flashDutyChannelID != 0 {
// 如果 flashduty 有配置协作空间(channel_id),则传入 channel_id 参数
query.Add("channel_id", strconv.FormatInt(flashDutyChannelID, 10))
}
req.URL.RawQuery = query.Encode()
req.Header.Add("Content-Type", "application/json")
return req, nil
}
func (ncc *NotifyChannelConfig) SendFlashDuty(events []*AlertCurEvent, flashDutyChannelID int64, client *http.Client) (string, error) {
// todo 每一个 channel 批量发送事件
if client == nil {
@@ -370,46 +443,57 @@ func (ncc *NotifyChannelConfig) SendFlashDuty(events []*AlertCurEvent, flashDuty
return "", err
}
req, err := http.NewRequest("POST", ncc.RequestConfig.FlashDutyRequestConfig.IntegrationUrl, bytes.NewBuffer(body))
if err != nil {
logger.Errorf("failed to create request: %v, event: %v", err, events)
return "", err
url := ncc.RequestConfig.FlashDutyRequestConfig.IntegrationUrl
retrySleep := time.Second
if ncc.RequestConfig.FlashDutyRequestConfig.RetrySleep > 0 {
retrySleep = time.Duration(ncc.RequestConfig.FlashDutyRequestConfig.RetrySleep) * time.Millisecond
}
// 设置 URL 参数
query := req.URL.Query()
if flashDutyChannelID != 0 {
// 如果 flashduty 有配置协作空间(channel_id),则传入 channel_id 参数
query.Add("channel_id", strconv.FormatInt(flashDutyChannelID, 10))
retryTimes := 3
if ncc.RequestConfig.FlashDutyRequestConfig.RetryTimes > 0 {
retryTimes = ncc.RequestConfig.FlashDutyRequestConfig.RetryTimes
}
req.URL.RawQuery = query.Encode()
req.Header.Add("Content-Type", "application/json")
// 重试机制
for i := 0; i <= 3; i++ {
logger.Infof("send flashduty req:%+v body:%+v", req, string(body))
// 把最后一次错误保存下来,后面返回,让用户在页面上也可以看到
var lastErrorMessage string
for i := 0; i <= retryTimes; i++ {
req, err := ncc.makeFlashDutyRequest(url, body, flashDutyChannelID)
if err != nil {
logger.Errorf("send_flashduty: failed to create request. url=%s request_body=%s error=%v", url, string(body), err)
return fmt.Sprintf("failed to create request. error: %v", err), err
}
// 直接使用客户端发送请求,超时时间已经在 client 中设置
resp, err := client.Do(req)
if err != nil {
logger.Errorf("send flashduty req:%+v err:%v", req, err)
time.Sleep(time.Duration(100) * time.Millisecond)
logger.Errorf("send_flashduty: http_call=fail url=%s request_body=%s error=%v times=%d", url, string(body), err, i+1)
if i < retryTimes {
// 重试等待时间,后面要放到页面上配置
time.Sleep(retrySleep)
}
lastErrorMessage = err.Error()
continue
}
defer resp.Body.Close()
// 读取响应
body, err := io.ReadAll(resp.Body)
if err != nil {
logger.Errorf("failed to read response: %v, event: %v", err, events)
// 走到这里,说明请求 Flashduty 成功,不管 Flashduty 返回了什么结果,都不判断,仅保存,给用户查看即可
// 比如服务端返回 5xx也不要重试重试可能会导致服务端数据有问题。告警事件这样的东西没有那么关键只要最终能在 UI 上看到调用结果就行
var resBody []byte
if resp.Body != nil {
defer resp.Body.Close()
resBody, err = io.ReadAll(resp.Body)
if err != nil {
logger.Errorf("send_flashduty: failed to read response. request_body=%s, error=%v", string(body), err)
resBody = []byte("failed to read response. error: " + err.Error())
}
}
logger.Infof("send flashduty req:%+v resp:%+v body:%+v err:%v", req, resp, string(body), err)
if resp.StatusCode == http.StatusOK {
return string(body), nil
}
time.Sleep(time.Duration(100) * time.Millisecond)
logger.Infof("send_flashduty: http_call=succ url=%s request_body=%s response_code=%d response_body=%s times=%d", url, string(body), resp.StatusCode, string(resBody), i+1)
return fmt.Sprintf("status_code:%d, response:%s", resp.StatusCode, string(resBody)), nil
}
return "", errors.New("failed to send request")
return lastErrorMessage, errors.New("failed to send request")
}
func (ncc *NotifyChannelConfig) SendHTTP(events []*AlertCurEvent, tpl map[string]interface{}, params map[string]string, sendtos []string, client *http.Client) (string, error) {
@@ -447,54 +531,21 @@ func (ncc *NotifyChannelConfig) SendHTTP(events []*AlertCurEvent, tpl map[string
url, headers, parameters := ncc.replaceVariables(fullTpl)
logger.Infof("url: %v, headers: %v, parameters: %v", url, headers, parameters)
req, err := http.NewRequest(httpConfig.Method, url, bytes.NewBuffer(body))
if err != nil {
logger.Errorf("failed to create request: %v, event: %v", err, events)
return "", err
}
query := req.URL.Query()
// 设置请求头 腾讯云短信、语音特殊处理
if ncc.Ident == "tx-sms" || ncc.Ident == "tx-voice" {
headers = ncc.setTxHeader(headers, body)
for key, value := range headers {
req.Header.Add(key, value)
}
} else if ncc.Ident == "ali-sms" || ncc.Ident == "ali-voice" {
req, err = http.NewRequest(httpConfig.Method, url, nil)
if err != nil {
return "", err
}
query, headers = ncc.getAliQuery(ncc.Ident, query, httpConfig.Request.Parameters["AccessKeyId"], httpConfig.Request.Parameters["AccessKeySecret"], parameters)
for key, value := range headers {
req.Header.Set(key, value)
}
} else {
for key, value := range headers {
req.Header.Add(key, value)
}
}
if ncc.Ident != "ali-sms" && ncc.Ident != "ali-voice" {
for key, value := range parameters {
query.Add(key, value)
}
}
req.URL.RawQuery = query.Encode()
// 记录完整的请求信息
logger.Debugf("URL: %v, Method: %s, Headers: %+v, params: %+v, Body: %s", req.URL, req.Method, req.Header, query, string(body))
// 重试机制
for i := 0; i <= httpConfig.RetryTimes; i++ {
var lastErrorMessage string
for i := 0; i < httpConfig.RetryTimes; i++ {
var resp *http.Response
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(httpConfig.Timeout)*time.Millisecond)
resp, err = client.Do(req.WithContext(ctx))
cancel() // 确保释放资源
req, err := ncc.makeHTTPRequest(httpConfig, url, headers, parameters, body)
if err != nil {
logger.Errorf("send_http: failed to create request. url=%s request_body=%s error=%v", url, string(body), err)
return fmt.Sprintf("failed to create request. error: %v", err), err
}
resp, err = client.Do(req)
if err != nil {
logger.Errorf("send_http: failed to send http notify. url=%s request_body=%s error=%v", url, string(body), err)
lastErrorMessage = err.Error()
time.Sleep(time.Duration(httpConfig.RetryInterval) * time.Second)
logger.Errorf("send http request failed to send http notify: %v", err)
continue
}
defer resp.Body.Close()
@@ -502,11 +553,9 @@ func (ncc *NotifyChannelConfig) SendHTTP(events []*AlertCurEvent, tpl map[string
// 读取响应
body, err := io.ReadAll(resp.Body)
logger.Debugf("send http request: %+v, response: %+v, body: %+v", req, resp, string(body))
if err != nil {
logger.Errorf("failed to send http notify: %v", err)
logger.Errorf("send_http: failed to read response. url=%s request_body=%s error=%v", url, string(body), err)
}
if resp.StatusCode == http.StatusOK {
return string(body), nil
}
@@ -514,8 +563,7 @@ func (ncc *NotifyChannelConfig) SendHTTP(events []*AlertCurEvent, tpl map[string
return "", fmt.Errorf("failed to send request, status code: %d, body: %s", resp.StatusCode, string(body))
}
return "", err
return lastErrorMessage, errors.New("all retries failed, last error: " + lastErrorMessage)
}
// getAliQuery 获取阿里云API的查询参数和请求头
@@ -1421,6 +1469,8 @@ var NotiChMap = []*NotifyChannelConfig{
},
FlashDutyRequestConfig: &FlashDutyRequestConfig{
IntegrationUrl: "flashduty integration url",
Timeout: 5000, // 默认5秒超时
RetryTimes: 3, // 默认重试3次
},
},
},

View File

@@ -117,7 +117,7 @@ func (pc *PromClientMap) loadFromDatabase() {
continue
}
logger.Info("setClientFromPromOption success: ", dsId)
logger.Infof("setClientFromPromOption success, datasourceId: %d", dsId)
PromOptions.Set(dsId, po)
continue
}

View File

@@ -13,6 +13,7 @@ import (
"github.com/ccfos/nightingale/v6/pushgw/pstat"
"github.com/ccfos/nightingale/v6/storage"
"github.com/toolkits/pkg/concurrent/semaphore"
"github.com/toolkits/pkg/logger"
"github.com/toolkits/pkg/slice"
)
@@ -23,6 +24,7 @@ type Set struct {
redis storage.Redis
ctx *ctx.Context
configs pconf.Pushgw
sema *semaphore.Semaphore
}
func New(ctx *ctx.Context, redis storage.Redis, configs pconf.Pushgw) *Set {
@@ -32,6 +34,7 @@ func New(ctx *ctx.Context, redis storage.Redis, configs pconf.Pushgw) *Set {
ctx: ctx,
configs: configs,
}
set.sema = semaphore.NewSemaphore(configs.UpdateTargetByUrlConcurrency)
set.Init()
return set
@@ -113,8 +116,26 @@ func (s *Set) UpdateTargets(lst []string, now int64) error {
Lst: lst,
Now: now,
}
err := poster.PostByUrls(s.ctx, "/v1/n9e/target-update", t)
return err
if !s.sema.TryAcquire() {
logger.Warningf("update_targets: update target by url concurrency limit, skip update target: %v", lst)
return nil // 达到并发上限,放弃请求,只是页面上的机器时间不更新,不影响机器失联告警,降级处理下
}
go func() {
defer s.sema.Release()
// 修改为异步发送,防止机器太多,每个请求耗时比较长导致机器心跳时间更新不及时
err := poster.PostByUrls(s.ctx, "/v1/n9e/target-update", t)
if err != nil {
logger.Errorf("failed to post target update: %v", err)
}
}()
return nil
}
if s.configs.UpdateDBTargetTimestampDisable {
// 如果 mysql 压力太大,关闭更新 db 的操作
return nil
}
// there are some idents not found in db, so insert them
@@ -133,16 +154,34 @@ func (s *Set) UpdateTargets(lst []string, now int64) error {
}
// 从批量更新一批机器的时间戳改成逐台更新是为了避免批量更新时mysql的锁竞争问题
for i := 0; i < len(exists); i++ {
err = s.ctx.DB.Exec("UPDATE target SET update_at = ? WHERE ident = ?", now, exists[i]).Error
if err != nil {
logger.Error("upsert_target: failed to update target:", exists[i], "error:", err)
start := time.Now()
duration := time.Since(start).Seconds()
if len(exists) > 0 {
sema := semaphore.NewSemaphore(s.configs.UpdateDBTargetConcurrency)
wg := sync.WaitGroup{}
for i := 0; i < len(exists); i++ {
sema.Acquire()
wg.Add(1)
go func(ident string) {
defer sema.Release()
defer wg.Done()
s.updateDBTargetTs(ident, now)
}(exists[i])
}
wg.Wait()
}
pstat.DBOperationLatency.WithLabelValues("update_targets_ts").Observe(duration)
return nil
}
func (s *Set) updateDBTargetTs(ident string, now int64) {
err := s.ctx.DB.Exec("UPDATE target SET update_at = ? WHERE ident = ?", now, ident).Error
if err != nil {
logger.Error("update_target: failed to update target:", ident, "error:", err)
}
}
func (s *Set) updateTargetsUpdateTs(lst []string, now int64, redis storage.Redis) error {
if redis == nil {
return fmt.Errorf("redis is nil")

View File

@@ -18,6 +18,10 @@ type Pushgw struct {
UpdateTargetRetryIntervalMills int64
UpdateTargetTimeoutMills int64
UpdateTargetBatchSize int
UpdateDBTargetConcurrency int
UpdateDBTargetTimestampDisable bool
PushConcurrency int
UpdateTargetByUrlConcurrency int
BusiGroupLabelKey string
IdentMetrics []string
@@ -124,6 +128,18 @@ func (p *Pushgw) PreCheck() {
p.UpdateTargetBatchSize = 20
}
if p.UpdateDBTargetConcurrency <= 0 {
p.UpdateDBTargetConcurrency = 16
}
if p.PushConcurrency <= 0 {
p.PushConcurrency = 16
}
if p.UpdateTargetByUrlConcurrency <= 0 {
p.UpdateTargetByUrlConcurrency = 10
}
if p.BusiGroupLabelKey == "" {
p.BusiGroupLabelKey = "busigroup"
}

View File

@@ -105,6 +105,17 @@ var (
},
[]string{"operation", "status"},
)
DBOperationLatency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "db_operation_latency_seconds",
Help: "Histogram of latencies for DB operations",
Buckets: []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5},
},
[]string{"operation"},
)
)
func init() {
@@ -121,5 +132,6 @@ func init() {
GaugeSampleQueueSize,
CounterPushQueueOverLimitTotal,
RedisOperationLatency,
DBOperationLatency,
)
}

View File

@@ -157,10 +157,11 @@ func (w WriterType) Post(req []byte, headers ...map[string]string) error {
}
type WritersType struct {
pushgw pconf.Pushgw
backends map[string]Writer
queues map[string]*IdentQueue
AllQueueLen atomic.Value
pushgw pconf.Pushgw
backends map[string]Writer
queues map[string]*IdentQueue
AllQueueLen atomic.Value
PushConcurrency atomic.Int64
sync.RWMutex
}
@@ -210,6 +211,27 @@ func (ws *WritersType) Put(name string, writer Writer) {
ws.backends[name] = writer
}
func (ws *WritersType) isCriticalBackend(key string) bool {
backend, exists := ws.backends[key]
if !exists {
return false
}
// 使用类型断言判断
switch backend.(type) {
case WriterType:
// HTTP Writer 作为关键后端
return true
case KafkaWriterType:
// Kafka Writer 作为非关键后端
return false
default:
// 未知类型,保守起见作为关键后端
logger.Warningf("Unknown backend type: %T, treating as critical", backend)
return true
}
}
func (ws *WritersType) CleanExpQueue() {
for {
ws.Lock()
@@ -278,12 +300,64 @@ func (ws *WritersType) StartConsumer(identQueue *IdentQueue) {
continue
}
for key := range ws.backends {
ws.backends[key].Write(key, series)
if ws.isCriticalBackend(key) {
ws.backends[key].Write(key, series)
} else {
// 像 kafka 这种 writer 使用异步写入,防止因为写入太慢影响主流程
ws.writeToNonCriticalBackend(key, series)
}
}
}
}
}
func (ws *WritersType) writeToNonCriticalBackend(key string, series []prompb.TimeSeries) {
// 原子性地检查并增加并发数
currentConcurrency := ws.PushConcurrency.Add(1)
if currentConcurrency > int64(ws.pushgw.PushConcurrency) {
// 超过限制,立即减少计数并丢弃
ws.PushConcurrency.Add(-1)
logger.Warningf("push concurrency limit exceeded, current: %d, limit: %d, dropping %d series for backend: %s",
currentConcurrency-1, ws.pushgw.PushConcurrency, len(series), key)
pstat.CounterWirteErrorTotal.WithLabelValues(key).Add(float64(len(series)))
return
}
// 深拷贝数据,确保并发安全
seriesCopy := ws.deepCopySeries(series)
// 启动goroutine处理
go func(backendKey string, data []prompb.TimeSeries) {
defer func() {
ws.PushConcurrency.Add(-1)
if r := recover(); r != nil {
logger.Errorf("panic in non-critical backend %s: %v", backendKey, r)
}
}()
ws.backends[backendKey].Write(backendKey, data)
}(key, seriesCopy)
}
// 完整的深拷贝方法
func (ws *WritersType) deepCopySeries(series []prompb.TimeSeries) []prompb.TimeSeries {
seriesCopy := make([]prompb.TimeSeries, len(series))
for i := range series {
seriesCopy[i] = series[i]
if len(series[i].Samples) > 0 {
samples := make([]prompb.Sample, len(series[i].Samples))
copy(samples, series[i].Samples)
seriesCopy[i].Samples = samples
}
}
return seriesCopy
}
func (ws *WritersType) Init() error {
ws.AllQueueLen.Store(int64(0))