Compare commits

...

7 Commits

Author SHA1 Message Date
ning
b4c16b98f3 fix: edge eval with var 2025-03-17 15:51:11 +08:00
Yening Qin
30519fe909 refactor: optimize post func (#2455) 2025-02-08 18:27:56 +08:00
ning
5b8fb144ac refactor: print log 2025-01-22 20:17:22 +08:00
Yening Qin
696f058975 feat: es alert support nodata alert (#2446) 2025-01-21 18:03:47 +08:00
Yening Qin
6195904715 refactor: optimize data recv and push 2025-01-16 15:52:57 +08:00
Yening Qin
47ace00ffa refactor data recv limit (#2430) 2025-01-14 16:17:27 +08:00
Yening Qin
d8feeec345 refactor: change metric type filter (#2426) 2025-01-10 17:32:57 +08:00
23 changed files with 459 additions and 292 deletions

View File

@@ -20,6 +20,7 @@ import (
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/pkg/hash"
"github.com/ccfos/nightingale/v6/pkg/parser"
"github.com/ccfos/nightingale/v6/pkg/poster"
promsdk "github.com/ccfos/nightingale/v6/pkg/prom"
promql2 "github.com/ccfos/nightingale/v6/pkg/promql"
"github.com/ccfos/nightingale/v6/pkg/unit"
@@ -48,6 +49,8 @@ type AlertRuleWorker struct {
HostAndDeviceIdentCache sync.Map
LastSeriesStore map[uint64]models.DataResp
DeviceIdentHook func(arw *AlertRuleWorker, paramQuery models.ParamQuery) ([]string, error)
}
@@ -84,6 +87,7 @@ func NewAlertRuleWorker(rule *models.AlertRule, datasourceId int64, Processor *p
DeviceIdentHook: func(arw *AlertRuleWorker, paramQuery models.ParamQuery) ([]string, error) {
return nil, nil
},
LastSeriesStore: make(map[uint64]models.DataResp),
}
interval := rule.PromEvalInterval
@@ -533,7 +537,7 @@ func (arw *AlertRuleWorker) getParamPermutation(paramVal map[string]models.Param
if len(query) == 0 {
paramsKeyAllLabel, err := getParamKeyAllLabel(varToLabel[paramKey], originPromql, readerClient)
if err != nil {
logger.Errorf("rule_eval:%s, fail to getParamKeyAllLabel, error:%v", arw.Key(), paramQuery.Query, err)
logger.Errorf("rule_eval:%s, fail to getParamKeyAllLabel, error:%v query:%s", arw.Key(), err, paramQuery.Query)
}
params = paramsKeyAllLabel
} else {
@@ -630,17 +634,27 @@ func (arw *AlertRuleWorker) getHostIdents(paramQuery models.ParamQuery) ([]strin
return nil, err
}
hostsQuery := models.GetHostsQuery(queries)
session := models.TargetFilterQueryBuild(arw.Ctx, hostsQuery, 0, 0)
var lst []*models.Target
err = session.Find(&lst).Error
if err != nil {
return nil, err
if !arw.Ctx.IsCenter {
lst, err := poster.PostByUrlsWithResp[[]*models.Target](arw.Ctx, "/v1/n9e/targets-of-host-query", queries)
if err != nil {
return nil, err
}
for i := range lst {
params = append(params, lst[i].Ident)
}
} else {
hostsQuery := models.GetHostsQuery(queries)
session := models.TargetFilterQueryBuild(arw.Ctx, hostsQuery, 0, 0)
var lst []*models.Target
err = session.Find(&lst).Error
if err != nil {
return nil, err
}
for i := range lst {
params = append(params, lst[i].Ident)
}
}
for i := range lst {
params = append(params, lst[i].Ident)
}
arw.HostAndDeviceIdentCache.Store(cacheKey, params)
return params, nil
}
@@ -819,122 +833,6 @@ func (arw *AlertRuleWorker) GetHostAnomalyPoint(ruleConfig string) ([]models.Ano
return lst, nil
}
// GetAnomalyPoint evaluates every trigger in ruleQuery against the grouped
// series data and returns (anomaly points, recover points).
//
// seriesTagIndexes maps a grouping key to groups of series hashes; seriesStore
// maps a series hash to its data. For each trigger and each series group, the
// trigger expression is evaluated over the latest values; a true result yields
// an anomaly point, a false result may yield a recover point depending on the
// trigger's recover configuration.
func GetAnomalyPoint(ruleId int64, ruleQuery models.RuleQuery, seriesTagIndexes map[string]map[uint64][]uint64, seriesStore map[uint64]models.DataResp) ([]models.AnomalyPoint, []models.AnomalyPoint) {
	points := []models.AnomalyPoint{}
	recoverPoints := []models.AnomalyPoint{}
	// Nothing to evaluate without triggers or without any series groups.
	if len(ruleQuery.Triggers) == 0 {
		return points, recoverPoints
	}
	if len(seriesTagIndexes) == 0 {
		return points, recoverPoints
	}
	// Map each query ref to its display unit for value formatting below.
	unitMap := make(map[string]string)
	for _, query := range ruleQuery.Queries {
		ref, unit, err := GetQueryRefAndUnit(query)
		if err != nil {
			continue
		}
		unitMap[ref] = unit
	}
	for _, trigger := range ruleQuery.Triggers {
		// The key of seriesTagIndex is only used for grouping; each value is
		// the list of series hashes belonging to that group.
		seriesTagIndex := ProcessJoins(ruleId, trigger, seriesTagIndexes, seriesStore)
		for _, seriesHash := range seriesTagIndex {
			valuesUnitMap := make(map[string]unit.FormattedValue)
			sort.Slice(seriesHash, func(i, j int) bool {
				return seriesHash[i] < seriesHash[j]
			})
			m := make(map[string]interface{})
			var ts int64
			var sample models.DataResp
			var value float64
			for _, serieHash := range seriesHash {
				series, exists := seriesStore[serieHash]
				if !exists {
					logger.Warningf("rule_eval rid:%d series:%+v not found", ruleId, series)
					continue
				}
				t, v, exists := series.Last()
				if !exists {
					logger.Warningf("rule_eval rid:%d series:%+v value not found", ruleId, series)
					continue
				}
				if !strings.Contains(trigger.Exp, "$"+series.Ref) {
					// The expression does not reference this variable; skip it.
					continue
				}
				if u, exists := unitMap[series.Ref]; exists {
					valuesUnitMap[series.Ref] = unit.ValueFormatter(u, 2, v)
				}
				// Expose the value both as $ref and as $ref.<metric name>.
				m["$"+series.Ref] = v
				m["$"+series.Ref+"."+series.MetricName()] = v
				// NOTE(review): ts/sample/value keep only the LAST series of the
				// group that passed the filters above — confirm this is intended.
				ts = int64(t)
				sample = series
				value = v
			}
			isTriggered := parser.Calc(trigger.Exp, m)
			// This log line is important: it records the live values used for
			// the alert decision.
			logger.Infof("rule_eval rid:%d trigger:%+v exp:%s res:%v m:%v", ruleId, trigger, trigger.Exp, isTriggered, m)
			// Render only the "$ref.metric" entries (keys containing a dot)
			// into the human-readable values string.
			var values string
			for k, v := range m {
				if !strings.Contains(k, ".") {
					continue
				}
				values += fmt.Sprintf("%s:%v ", k, v)
			}
			point := models.AnomalyPoint{
				Key:           sample.MetricName(),
				Labels:        sample.Metric,
				Timestamp:     int64(ts),
				Value:         value,
				Values:        values,
				Severity:      trigger.Severity,
				Triggered:     isTriggered,
				Query:         fmt.Sprintf("query:%+v trigger:%+v", ruleQuery.Queries, trigger),
				RecoverConfig: trigger.RecoverConfig,
				ValuesUnit:    valuesUnitMap,
			}
			// Prefer the series' own query string when available.
			if sample.Query != "" {
				point.Query = sample.Query
			}
			// Recovery judgment is, by design, only supported in expression mode;
			// expression mode uses isTriggered to decide anomaly vs recover:
			// 1. No recover condition set: meeting the recover condition emits a
			//    recoverPoint; absence of data does not emit an anomalyPoint to recover.
			// 2. "Recover only when condition met": recovery happens only via a
			//    recoverPoint, never via the absence of an anomalyPoint.
			// 3. "Do not recover on no data": recovery happens only via a
			//    recoverPoint; no anomalyPoint is produced for recovery.
			if isTriggered {
				points = append(points, point)
			} else {
				switch trigger.RecoverConfig.JudgeType {
				case models.Origin:
					// Align with the original implementation: do nothing.
				case models.RecoverOnCondition:
					// Additionally evaluate the recover condition; only
					// recover when it holds.
					fulfill := parser.Calc(trigger.RecoverConfig.RecoverExp, m)
					if !fulfill {
						continue
					}
				}
				recoverPoints = append(recoverPoints, point)
			}
		}
	}
	return points, recoverPoints
}
func flatten(rehashed map[uint64][][]uint64) map[uint64][]uint64 {
seriesTagIndex := make(map[uint64][]uint64)
var i uint64
@@ -1486,107 +1384,158 @@ func (arw *AlertRuleWorker) GetAnomalyPoint(rule *models.AlertRule, dsId int64)
unitMap[ref] = unit
}
// 判断
for _, trigger := range ruleQuery.Triggers {
seriesTagIndex := ProcessJoins(rule.Id, trigger, seriesTagIndexes, seriesStore)
for _, seriesHash := range seriesTagIndex {
valuesUnitMap := make(map[string]unit.FormattedValue)
if !ruleQuery.ExpTriggerDisable {
for _, trigger := range ruleQuery.Triggers {
seriesTagIndex := ProcessJoins(rule.Id, trigger, seriesTagIndexes, seriesStore)
for _, seriesHash := range seriesTagIndex {
valuesUnitMap := make(map[string]unit.FormattedValue)
sort.Slice(seriesHash, func(i, j int) bool {
return seriesHash[i] < seriesHash[j]
})
sort.Slice(seriesHash, func(i, j int) bool {
return seriesHash[i] < seriesHash[j]
})
m := make(map[string]interface{})
var ts int64
var sample models.DataResp
var value float64
for _, serieHash := range seriesHash {
series, exists := seriesStore[serieHash]
if !exists {
logger.Warningf("rule_eval rid:%d series:%+v not found", rule.Id, series)
continue
}
t, v, exists := series.Last()
if !exists {
logger.Warningf("rule_eval rid:%d series:%+v value not found", rule.Id, series)
continue
}
if !strings.Contains(trigger.Exp, "$"+series.Ref) {
// 表达式中不包含该变量
continue
}
m["$"+series.Ref] = v
m["$"+series.Ref+"."+series.MetricName()] = v
for k, v := range series.Metric {
if k == "__name__" {
m := make(map[string]interface{})
var ts int64
var sample models.DataResp
var value float64
for _, serieHash := range seriesHash {
series, exists := seriesStore[serieHash]
if !exists {
logger.Warningf("rule_eval rid:%d series:%+v not found", rule.Id, series)
continue
}
t, v, exists := series.Last()
if !exists {
logger.Warningf("rule_eval rid:%d series:%+v value not found", rule.Id, series)
continue
}
if !strings.Contains(trigger.Exp, "$"+series.Ref+"."+string(k)) {
// 过滤掉表达式中不包含的标签
if !strings.Contains(trigger.Exp, "$"+series.Ref) {
// 表达式中不包含该变量
continue
}
m["$"+series.Ref+"."+string(k)] = string(v)
m["$"+series.Ref] = v
m["$"+series.Ref+"."+series.MetricName()] = v
for k, v := range series.Metric {
if k == "__name__" {
continue
}
if !strings.Contains(trigger.Exp, "$"+series.Ref+"."+string(k)) {
// 过滤掉表达式中不包含的标签
continue
}
m["$"+series.Ref+"."+string(k)] = string(v)
}
if u, exists := unitMap[series.Ref]; exists {
valuesUnitMap["$"+series.Ref+"."+series.MetricName()] = unit.ValueFormatter(u, 2, v)
}
ts = int64(t)
sample = series
value = v
logger.Infof("rule_eval rid:%d origin series labels:%+v", rule.Id, series.Metric)
}
if u, exists := unitMap[series.Ref]; exists {
valuesUnitMap["$"+series.Ref+"."+series.MetricName()] = unit.ValueFormatter(u, 2, v)
isTriggered := parser.CalcWithRid(trigger.Exp, m, rule.Id)
// 此条日志很重要,是告警判断的现场值
logger.Infof("rule_eval rid:%d trigger:%+v exp:%s res:%v m:%v", rule.Id, trigger, trigger.Exp, isTriggered, m)
var values string
for k, v := range m {
if !strings.Contains(k, ".") {
continue
}
switch v.(type) {
case float64:
values += fmt.Sprintf("%s:%.3f ", k, v)
case string:
values += fmt.Sprintf("%s:%s ", k, v)
}
}
ts = int64(t)
sample = series
value = v
logger.Infof("rule_eval rid:%d origin series labels:%+v", rule.Id, series.Metric)
}
isTriggered := parser.CalcWithRid(trigger.Exp, m, rule.Id)
// 此条日志很重要,是告警判断的现场值
logger.Infof("rule_eval rid:%d trigger:%+v exp:%s res:%v m:%v", rule.Id, trigger, trigger.Exp, isTriggered, m)
var values string
for k, v := range m {
if !strings.Contains(k, ".") {
continue
point := models.AnomalyPoint{
Key: sample.MetricName(),
Labels: sample.Metric,
Timestamp: int64(ts),
Value: value,
Values: values,
Severity: trigger.Severity,
Triggered: isTriggered,
Query: fmt.Sprintf("query:%+v trigger:%+v", ruleQuery.Queries, trigger),
RecoverConfig: trigger.RecoverConfig,
ValuesUnit: valuesUnitMap,
}
switch v.(type) {
case float64:
values += fmt.Sprintf("%s:%.3f ", k, v)
case string:
values += fmt.Sprintf("%s:%s ", k, v)
if isTriggered {
points = append(points, point)
} else {
switch trigger.RecoverConfig.JudgeType {
case models.Origin:
// do nothing
case models.RecoverOnCondition:
fulfill := parser.CalcWithRid(trigger.RecoverConfig.RecoverExp, m, rule.Id)
if !fulfill {
continue
}
}
recoverPoints = append(recoverPoints, point)
}
}
}
}
point := models.AnomalyPoint{
Key: sample.MetricName(),
Labels: sample.Metric,
Timestamp: int64(ts),
Value: value,
Values: values,
Severity: trigger.Severity,
Triggered: isTriggered,
Query: fmt.Sprintf("query:%+v trigger:%+v", ruleQuery.Queries, trigger),
RecoverConfig: trigger.RecoverConfig,
ValuesUnit: valuesUnitMap,
}
if ruleQuery.NodataTrigger.Enable {
if isTriggered {
points = append(points, point)
} else {
switch trigger.RecoverConfig.JudgeType {
case models.Origin:
// do nothing
case models.RecoverOnCondition:
fulfill := parser.CalcWithRid(trigger.RecoverConfig.RecoverExp, m, rule.Id)
if !fulfill {
now := time.Now().Unix()
// 使用 arw.LastSeriesStore 检查上次查询结果
if len(arw.LastSeriesStore) > 0 {
// 遍历上次的曲线数据
for hash, lastSeries := range arw.LastSeriesStore {
if ruleQuery.NodataTrigger.ResolveAfterEnable {
lastTs, _, exists := lastSeries.Last()
if !exists {
continue
}
// 检查是否超过 resolve_after 时间
if now-int64(lastTs) > int64(ruleQuery.NodataTrigger.ResolveAfter) {
logger.Infof("rule_eval rid:%d series:%+v resolve after %d seconds now:%d lastTs:%d", rule.Id, lastSeries, ruleQuery.NodataTrigger.ResolveAfter, now, int64(lastTs))
delete(arw.LastSeriesStore, hash)
continue
}
}
recoverPoints = append(recoverPoints, point)
// 检查是否在本次查询结果中存在
if _, exists := seriesStore[hash]; !exists {
// 生成无数据告警点
point := models.AnomalyPoint{
Key: lastSeries.MetricName(),
Labels: lastSeries.Metric,
Timestamp: now,
Value: 0,
Values: fmt.Sprintf("nodata since %v", time.Unix(now, 0).Format("2006-01-02 15:04:05")),
Severity: ruleQuery.NodataTrigger.Severity,
Triggered: true,
Query: fmt.Sprintf("nodata check for %s", lastSeries.LabelsString()),
TriggerType: models.TriggerTypeNodata,
}
points = append(points, point)
logger.Infof("rule_eval rid:%d nodata point:%+v", rule.Id, point)
}
}
}
// 更新 arw.LastSeriesStore
for hash, series := range seriesStore {
arw.LastSeriesStore[hash] = series
}
}
}

View File

@@ -224,6 +224,15 @@ func (p *Processor) BuildEvent(anomalyPoint models.AnomalyPoint, from string, no
event.RecoverConfig = anomalyPoint.RecoverConfig
event.RuleHash = ruleHash
if anomalyPoint.TriggerType == models.TriggerTypeNodata {
event.TriggerValue = "nodata"
ruleConfig := models.RuleQuery{}
json.Unmarshal([]byte(p.rule.RuleConfig), &ruleConfig)
ruleConfig.TriggerType = anomalyPoint.TriggerType
b, _ := json.Marshal(ruleConfig)
event.RuleConfig = string(b)
}
if err := json.Unmarshal([]byte(p.rule.Annotations), &event.AnnotationsJSON); err != nil {
event.AnnotationsJSON = make(map[string]string) // 解析失败时使用空 map
logger.Warningf("unmarshal annotations json failed: %v, rule: %d", err, p.rule.Id)

View File

@@ -92,7 +92,6 @@ func (s *Set) updateMeta(items map[string]models.HostMeta) {
func (s *Set) updateTargets(m map[string]models.HostMeta) error {
if s.redis == nil {
logger.Warningf("redis is nil")
return nil
}

View File

@@ -549,6 +549,8 @@ func (rt *Router) Config(r *gin.Engine) {
service.PUT("/targets/note", rt.targetUpdateNoteByService)
service.PUT("/targets/bgid", rt.targetUpdateBgidByService)
service.POST("/targets-of-host-query", rt.targetsOfHostQuery)
service.POST("/alert-rules", rt.alertRuleAddByService)
service.POST("/alert-rule-add", rt.alertRuleAddOneByService)
service.DELETE("/alert-rules", rt.alertRuleDelByService)

View File

@@ -102,9 +102,29 @@ func (rt *Router) builtinMetricsDefaultTypes(c *gin.Context) {
// builtinMetricsTypes returns the builtin metric types, filtered down to
// those that correspond to a known builtin component (matched by component
// Ident). The optional "disabled" query parameter is forwarded to the
// component lookup (-1 means no filter).
func (rt *Router) builtinMetricsTypes(c *gin.Context) {
	collector := ginx.QueryStr(c, "collector", "")
	query := ginx.QueryStr(c, "query", "")
	disabled := ginx.QueryInt(c, "disabled", -1)
	lang := c.GetHeader("X-Language")

	// NOTE: the response must be rendered exactly once, after filtering; an
	// earlier unfiltered render call here was stale and has been removed.
	metricTypeList, err := models.BuiltinMetricTypes(rt.Ctx, lang, collector, query)
	ginx.Dangerous(err)

	componentList, err := models.BuiltinComponentGets(rt.Ctx, "", disabled)
	ginx.Dangerous(err)

	// Build a set of component idents for O(1) membership checks.
	componentTypes := make(map[string]struct{}, len(componentList))
	for _, comp := range componentList {
		componentTypes[comp.Ident] = struct{}{}
	}

	// Keep only the metric types that have a matching component.
	filteredMetricTypeList := make([]string, 0, len(metricTypeList))
	for _, metricType := range metricTypeList {
		if _, exists := componentTypes[metricType]; exists {
			filteredMetricTypeList = append(filteredMetricTypeList, metricType)
		}
	}

	ginx.NewRender(c).Data(filteredMetricTypeList, nil)
}
func (rt *Router) builtinMetricsCollectors(c *gin.Context) {

View File

@@ -406,16 +406,15 @@ func haveNeverGroupedIdent(ctx *ctx.Context, idents []string) (bool, error) {
if err != nil {
return false, err
}
if len(bgids) <= 0 {
return true, nil
}
}
}
return false, nil
}
func (rt *Router) targetBindBgids(c *gin.Context) {
var f targetBgidsForm
var err error
@@ -571,3 +570,18 @@ func (rt *Router) targetsOfAlertRule(c *gin.Context) {
ginx.NewRender(c).Data(ret, err)
}
// targetsOfHostQuery resolves a list of host queries into the matching
// targets and renders them as JSON. It backs the center API route
// /v1/n9e/targets-of-host-query so non-center nodes can resolve host idents
// through the center.
func (rt *Router) targetsOfHostQuery(c *gin.Context) {
	var queries []models.HostQuery
	ginx.BindJSON(c, &queries)
	// Translate the host queries into a target filter and run it.
	hostsQuery := models.GetHostsQuery(queries)
	session := models.TargetFilterQueryBuild(rt.Ctx, hostsQuery, 0, 0)
	var lst []*models.Target
	err := session.Find(&lst).Error
	if err != nil {
		// NOTE(review): assumes ginx.Bomb aborts the request (no fallthrough
		// to the render below) — confirm against the ginx implementation.
		ginx.Bomb(http.StatusInternalServerError, err.Error())
	}
	ginx.NewRender(c).Data(lst, nil)
}

View File

@@ -171,7 +171,7 @@ func PutDatasources(items []datasource.DatasourceInfo) {
ds, err := datasource.GetDatasourceByType(typ, item.Settings)
if err != nil {
logger.Warningf("get plugin:%+v fail: %v", item, err)
logger.Debugf("get plugin:%+v fail: %v", item, err)
continue
}

View File

@@ -161,9 +161,9 @@ type PromRuleConfig struct {
type RecoverJudge int
const (
Origin RecoverJudge = 0
RecoverWithoutData RecoverJudge = 1
RecoverOnCondition RecoverJudge = 2
Origin RecoverJudge = 0
NotRecoverWhenNoData RecoverJudge = 1
RecoverOnCondition RecoverJudge = 2
)
type RecoverConfig struct {
@@ -194,10 +194,20 @@ type HostTrigger struct {
}
type RuleQuery struct {
Version string `json:"version"`
Inhibit bool `json:"inhibit"`
Queries []interface{} `json:"queries"`
Triggers []Trigger `json:"triggers"`
Version string `json:"version"`
Inhibit bool `json:"inhibit"`
Queries []interface{} `json:"queries"`
ExpTriggerDisable bool `json:"exp_trigger_disable"`
Triggers []Trigger `json:"triggers"`
NodataTrigger NodataTrigger `json:"nodata_trigger"`
TriggerType TriggerType `json:"trigger_type,omitempty"`
}
// NodataTrigger configures alerting on series that stop reporting data.
type NodataTrigger struct {
	Enable             bool `json:"enable"`               // whether nodata alerting is enabled
	Severity           int  `json:"severity"`             // severity assigned to generated nodata points
	ResolveAfterEnable bool `json:"resolve_after_enable"` // whether to stop alerting on series missing longer than ResolveAfter
	ResolveAfter       int  `json:"resolve_after"`        // unit: seconds
}
type Trigger struct {
@@ -1175,8 +1185,6 @@ func (ar *AlertRule) UpdateEvent(event *AlertCurEvent) {
event.RuleProd = ar.Prod
event.RuleAlgo = ar.Algorithm
event.PromForDuration = ar.PromForDuration
event.RuleConfig = ar.RuleConfig
event.RuleConfigJson = ar.RuleConfigJson
event.Callbacks = ar.Callbacks
event.CallbacksJSON = ar.CallbacksJSON
event.RunbookUrl = ar.RunbookUrl

View File

@@ -20,8 +20,16 @@ type AnomalyPoint struct {
Values string `json:"values"`
ValuesUnit map[string]unit.FormattedValue `json:"values_unit"`
RecoverConfig RecoverConfig `json:"recover_config"`
TriggerType TriggerType `json:"trigger_type"`
}
// TriggerType distinguishes how an anomaly point was produced.
type TriggerType string

const (
	// TriggerTypeNormal marks points produced by regular trigger-expression evaluation.
	TriggerTypeNormal TriggerType = "normal"
	// TriggerTypeNodata marks points produced by the no-data check.
	TriggerTypeNodata TriggerType = "nodata"
)
func NewAnomalyPoint(key string, labels map[string]string, ts int64, value float64, severity int) AnomalyPoint {
anomalyPointLabels := make(model.Metric)
for k, v := range labels {

View File

@@ -162,11 +162,6 @@ var entries = []CheckEntry{
ErrorMessage: "Some recovery scripts still in the BusiGroup",
FieldName: "group_id",
},
{
Entry: &TaskRecord{},
ErrorMessage: "Some Task Record records still in the BusiGroup",
FieldName: "group_id",
},
{
Entry: &TargetBusiGroup{},
ErrorMessage: "Some target busigroups still in the BusiGroup",

View File

@@ -53,6 +53,12 @@ func (d *DataResp) MetricName() string {
return string(metric)
}
// LabelsString renders the series' label set as a single string.
func (d *DataResp) LabelsString() string {
	return d.Metric.String()
}
type RelationKey struct {
LeftKey string `json:"left_key"`
RightKey string `json:"right_key"`

View File

@@ -23,24 +23,31 @@ type DataResponse[T any] struct {
}
func GetByUrls[T any](ctx *ctx.Context, path string) (T, error) {
var err error
addrs := ctx.CenterApi.Addrs
if len(addrs) == 0 {
var dat T
return dat, fmt.Errorf("no center api addresses configured")
}
rand.Shuffle(len(addrs), func(i, j int) { addrs[i], addrs[j] = addrs[j], addrs[i] })
for _, addr := range addrs {
url := fmt.Sprintf("%s%s", addr, path)
dat, e := GetByUrl[T](url, ctx.CenterApi)
if e != nil {
err = e
// 随机选择起始位置
startIdx := rand.Intn(len(addrs))
// 从随机位置开始遍历所有地址
var dat T
var err error
for i := 0; i < len(addrs); i++ {
idx := (startIdx + i) % len(addrs)
url := fmt.Sprintf("%s%s", addrs[idx], path)
dat, err = GetByUrl[T](url, ctx.CenterApi)
if err != nil {
logger.Warningf("failed to get data from center, url: %s, err: %v", url, err)
continue
}
return dat, nil
}
var dat T
err = fmt.Errorf("failed to get data from center, path= %s, ctx.CenterApi.Addrs= %v", path, addrs)
return dat, err
return dat, fmt.Errorf("failed to get data from center, path= %s, addrs= %v err: %v", path, addrs, err)
}
func GetByUrl[T any](url string, cfg conf.CenterApi) (T, error) {
@@ -97,25 +104,32 @@ func GetByUrl[T any](url string, cfg conf.CenterApi) (T, error) {
return dataResp.Dat, nil
}
func PostByUrls(ctx *ctx.Context, path string, v interface{}) (err error) {
func PostByUrls(ctx *ctx.Context, path string, v interface{}) error {
addrs := ctx.CenterApi.Addrs
rand.Shuffle(len(addrs), func(i, j int) { addrs[i], addrs[j] = addrs[j], addrs[i] })
for _, addr := range addrs {
url := fmt.Sprintf("%s%s", addr, path)
_, err = PostByUrl[interface{}](url, ctx.CenterApi, v)
if err == nil {
return
}
}
if len(addrs) < 1 {
err = fmt.Errorf("submission of the POST request from the center has failed, "+
if len(addrs) == 0 {
return fmt.Errorf("submission of the POST request from the center has failed, "+
"path= %s, v= %v, ctx.CenterApi.Addrs= %v", path, v, addrs)
}
return
// 随机选择起始位置
startIdx := rand.Intn(len(addrs))
// 从随机位置开始遍历所有地址
for i := 0; i < len(addrs); i++ {
idx := (startIdx + i) % len(addrs)
url := fmt.Sprintf("%s%s", addrs[idx], path)
_, err := PostByUrl[interface{}](url, ctx.CenterApi, v)
if err != nil {
logger.Warningf("failed to post data to center, url: %s, err: %v", url, err)
continue
}
return nil
}
return fmt.Errorf("failed to post data to center, path= %s, addrs= %v", path, addrs)
}
func PostByUrlsWithResp[T any](ctx *ctx.Context, path string, v interface{}) (t T, err error) {
addrs := ctx.CenterApi.Addrs
if len(addrs) < 1 {
@@ -123,14 +137,24 @@ func PostByUrlsWithResp[T any](ctx *ctx.Context, path string, v interface{}) (t
"path= %s, v= %v, ctx.CenterApi.Addrs= %v", path, v, addrs)
return
}
rand.Shuffle(len(addrs), func(i, j int) { addrs[i], addrs[j] = addrs[j], addrs[i] })
for _, addr := range addrs {
t, err = PostByUrl[T](fmt.Sprintf("%s%s", addr, path), ctx.CenterApi, v)
if err == nil {
break
// 随机选择起始位置
startIdx := rand.Intn(len(addrs))
// 从随机位置开始遍历所有地址
for i := 0; i < len(addrs); i++ {
idx := (startIdx + i) % len(addrs)
url := fmt.Sprintf("%s%s", addrs[idx], path)
t, err = PostByUrl[T](url, ctx.CenterApi, v)
if err != nil {
logger.Warningf("failed to post data to center, url: %s, err: %v", url, err)
continue
}
return t, nil
}
return
return t, fmt.Errorf("failed to post data to center, path= %s, addrs= %v err: %v", path, addrs, err)
}
func PostByUrl[T any](url string, cfg conf.CenterApi, v interface{}) (t T, err error) {

View File

@@ -96,7 +96,7 @@ type TargetUpdate struct {
func (s *Set) UpdateTargets(lst []string, now int64) error {
err := updateTargetsUpdateTs(lst, now, s.redis)
if err != nil {
if err != nil && s.ctx.IsCenter {
logger.Errorf("failed to update targets:%v update_ts: %v", lst, err)
}

View File

@@ -28,6 +28,9 @@ type WriterGlobalOpt struct {
QueuePopSize int
AllQueueMaxSize int
AllQueueMaxSizeInterval int
RetryCount int
RetryInterval int64
OverLimitStatusCode int
}
type WriterOptions struct {
@@ -80,13 +83,25 @@ func (p *Pushgw) PreCheck() {
}
if p.WriterOpt.AllQueueMaxSize <= 0 {
p.WriterOpt.AllQueueMaxSize = 10000000
p.WriterOpt.AllQueueMaxSize = 5000000
}
if p.WriterOpt.AllQueueMaxSizeInterval <= 0 {
p.WriterOpt.AllQueueMaxSizeInterval = 200
}
if p.WriterOpt.RetryCount <= 0 {
p.WriterOpt.RetryCount = 1000
}
if p.WriterOpt.RetryInterval <= 0 {
p.WriterOpt.RetryInterval = 1
}
if p.WriterOpt.OverLimitStatusCode <= 0 {
p.WriterOpt.OverLimitStatusCode = 499
}
if p.WriteConcurrency <= 0 {
p.WriteConcurrency = 5000
}

View File

@@ -145,38 +145,39 @@ func matchSample(filterMap, sampleMap map[string]string) bool {
return true
}
func (rt *Router) ForwardByIdent(clientIP string, ident string, v *prompb.TimeSeries) {
func (rt *Router) ForwardByIdent(clientIP string, ident string, v *prompb.TimeSeries) error {
v = rt.BeforePush(clientIP, v)
if v == nil {
return
return nil
}
IdentStats.Increment(ident, 1)
if rt.DropSample(v) {
CounterDropSampleTotal.WithLabelValues(ident).Inc()
return
return nil
}
count := IdentStats.Get(ident)
if count > rt.Pushgw.IdentDropThreshold {
CounterDropSampleTotal.WithLabelValues(ident).Inc()
return
// 单个 ident 的样本数超过阈值,不算异常,不影响其他机器的上报,不返回 err
return nil
}
rt.Writers.PushSample(ident, *v)
return rt.Writers.PushSample(ident, *v)
}
func (rt *Router) ForwardByMetric(clientIP string, metric string, v *prompb.TimeSeries) {
func (rt *Router) ForwardByMetric(clientIP string, metric string, v *prompb.TimeSeries) error {
v = rt.BeforePush(clientIP, v)
rt.debugSample(clientIP, v)
if v == nil {
return
return nil
}
IdentStats.Increment(metric, 1)
if rt.DropSample(v) {
CounterDropSampleTotal.WithLabelValues(metric).Inc()
return
return nil
}
var hashkey string
@@ -185,7 +186,8 @@ func (rt *Router) ForwardByMetric(clientIP string, metric string, v *prompb.Time
} else {
hashkey = metric[0:1]
}
rt.Writers.PushSample(hashkey, *v)
return rt.Writers.PushSample(hashkey, *v)
}
func (rt *Router) BeforePush(clientIP string, v *prompb.TimeSeries) *prompb.TimeSeries {

View File

@@ -1,6 +1,9 @@
package router
import (
"fmt"
"time"
"github.com/gin-gonic/gin"
"github.com/prometheus/prometheus/prompb"
@@ -30,6 +33,20 @@ type Router struct {
HeartbeartApi string
}
// stat returns a gin middleware that records per-request Prometheus metrics:
// a request counter and a latency histogram, labeled with the fixed service
// name "pushgw", the response status code, the matched route path and the
// HTTP method.
func stat() gin.HandlerFunc {
	return func(c *gin.Context) {
		start := time.Now()
		c.Next()

		code := fmt.Sprintf("%d", c.Writer.Status())
		method := c.Request.Method
		labels := []string{"pushgw", code, c.FullPath(), method}
		RequestCounter.WithLabelValues(labels...).Inc()
		// Duration.Seconds() already returns float64; the previous explicit
		// float64(...) conversion was redundant.
		RequestDuration.WithLabelValues(labels...).Observe(time.Since(start).Seconds())
	}
}
func New(httpConfig httpx.Config, pushgw pconf.Pushgw, aconf aconf.Alert, tc *memsto.TargetCacheType, bg *memsto.BusiGroupCacheType,
idents *idents.Set, metas *metas.Set,
writers *writer.WritersType, ctx *ctx.Context) *Router {
@@ -61,6 +78,7 @@ func (rt *Router) Config(r *gin.Engine) {
registerMetrics()
go rt.ReportIdentStats()
r.Use(stat())
// datadog url: http://n9e-pushgw.foo.com/datadog
// use apiKey not basic auth
r.POST("/datadog/api/v1/series", rt.datadogSeries)

View File

@@ -257,9 +257,14 @@ func (r *Router) datadogSeries(c *gin.Context) {
}
if ident != "" {
r.ForwardByIdent(c.ClientIP(), ident, pt)
err = r.ForwardByIdent(c.ClientIP(), ident, pt)
} else {
r.ForwardByMetric(c.ClientIP(), item.Metric, pt)
err = r.ForwardByMetric(c.ClientIP(), item.Metric, pt)
}
if err != nil {
c.String(r.Pushgw.WriterOpt.OverLimitStatusCode, err.Error())
return
}
succ++

View File

@@ -207,9 +207,14 @@ func (rt *Router) falconPush(c *gin.Context) {
}
if ident != "" {
rt.ForwardByIdent(c.ClientIP(), ident, pt)
err = rt.ForwardByIdent(c.ClientIP(), ident, pt)
} else {
rt.ForwardByMetric(c.ClientIP(), arr[i].Metric, pt)
err = rt.ForwardByMetric(c.ClientIP(), arr[i].Metric, pt)
}
if err != nil {
c.String(rt.Pushgw.WriterOpt.OverLimitStatusCode, err.Error())
return
}
succ++

View File

@@ -202,9 +202,14 @@ func (rt *Router) openTSDBPut(c *gin.Context) {
}
if host != "" {
rt.ForwardByIdent(c.ClientIP(), host, pt)
err = rt.ForwardByIdent(c.ClientIP(), host, pt)
} else {
rt.ForwardByMetric(c.ClientIP(), arr[i].Metric, pt)
err = rt.ForwardByMetric(c.ClientIP(), arr[i].Metric, pt)
}
if err != nil {
c.String(rt.Pushgw.WriterOpt.OverLimitStatusCode, err.Error())
return
}
succ++

View File

@@ -1,15 +1,18 @@
package router
import (
"fmt"
"io"
"io/ioutil"
"net/http"
"github.com/ccfos/nightingale/v6/pushgw/writer"
"github.com/gin-gonic/gin"
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/prometheus/prometheus/prompb"
"github.com/toolkits/pkg/ginx"
"github.com/toolkits/pkg/logger"
)
func extractMetricFromTimeSeries(s *prompb.TimeSeries) string {
@@ -99,6 +102,15 @@ func duplicateLabelKey(series *prompb.TimeSeries) bool {
}
func (rt *Router) remoteWrite(c *gin.Context) {
curLen := rt.Writers.AllQueueLen.Load().(int)
if curLen > rt.Pushgw.WriterOpt.AllQueueMaxSize {
err := fmt.Errorf("write queue full, metric count over limit: %d", curLen)
logger.Warning(err)
writer.CounterPushQueueOverLimitTotal.Inc()
c.String(rt.Pushgw.WriterOpt.OverLimitStatusCode, err.Error())
return
}
req, err := DecodeWriteRequest(c.Request.Body)
if err != nil {
c.String(http.StatusBadRequest, err.Error())
@@ -138,15 +150,23 @@ func (rt *Router) remoteWrite(c *gin.Context) {
ids[ident] = struct{}{}
}
var err error
if len(ident) > 0 {
rt.ForwardByIdent(c.ClientIP(), ident, &req.Timeseries[i])
err = rt.ForwardByIdent(c.ClientIP(), ident, &req.Timeseries[i])
} else {
rt.ForwardByMetric(c.ClientIP(), extractMetricFromTimeSeries(&req.Timeseries[i]), &req.Timeseries[i])
err = rt.ForwardByMetric(c.ClientIP(), extractMetricFromTimeSeries(&req.Timeseries[i]), &req.Timeseries[i])
}
if err != nil {
c.String(rt.Pushgw.WriterOpt.OverLimitStatusCode, err.Error())
return
}
}
CounterSampleTotal.WithLabelValues("prometheus").Add(float64(count))
rt.IdentSet.MSet(ids)
c.String(200, "")
}
// DecodeWriteRequest from an io.Reader into a prompb.WriteRequest, handling

View File

@@ -8,6 +8,8 @@ const (
)
var (
labels = []string{"service", "code", "path", "method"}
CounterSampleTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
@@ -28,6 +30,24 @@ var (
Name: "sample_received_by_ident",
Help: "Number of sample push by ident.",
}, []string{"host_ident"})
RequestCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "http_request_count_total",
Help: "Total number of HTTP requests made.",
}, labels,
)
RequestDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "http_request_duration_seconds",
Help: "HTTP request latencies in seconds.",
}, labels,
)
)
func registerMetrics() {
@@ -35,5 +55,7 @@ func registerMetrics() {
CounterSampleTotal,
CounterDropSampleTotal,
CounterSampleReceivedByIdent,
RequestCounter,
RequestDuration,
)
}

View File

@@ -28,6 +28,13 @@ var (
}, []string{"host_ident"},
)
GaugeAllQueueSize = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "all_queue_size",
Help: "The size of all queue.",
})
CounterWirteTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
@@ -48,6 +55,13 @@ var (
Name: "push_queue_error_total",
Help: "Number of push queue error.",
}, []string{"host_ident"})
CounterPushQueueOverLimitTotal = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "push_queue_over_limit_error_total",
Help: "Number of push queue over limit.",
})
)
func init() {
@@ -57,5 +71,7 @@ func init() {
CounterWirteErrorTotal,
CounterPushQueueErrorTotal,
GaugeSampleQueueSize,
CounterPushQueueOverLimitTotal,
GaugeAllQueueSize,
)
}

View File

@@ -25,6 +25,8 @@ type WriterType struct {
Opts pconf.WriterOptions
ForceUseServerTS bool
Client api.Client
RetryCount int
RetryInterval int64 // 单位秒
}
func (w WriterType) writeRelabel(items []prompb.TimeSeries) []prompb.TimeSeries {
@@ -76,21 +78,32 @@ func (w WriterType) Write(key string, items []prompb.TimeSeries, headers ...map[
return
}
if err := w.Post(snappy.Encode(nil, data), headers...); err != nil {
for i := 0; i < w.RetryCount; i++ {
err := w.Post(snappy.Encode(nil, data), headers...)
if err == nil {
break
}
CounterWirteErrorTotal.WithLabelValues(key).Add(float64(len(items)))
logger.Warningf("post to %s got error: %v", w.Opts.Url, err)
logger.Warning("example timeseries:", items[0].String())
logger.Warningf("post to %s got error: %v in %d times", w.Opts.Url, err, i)
if i == 0 {
logger.Warning("example timeseries:", items[0].String())
}
time.Sleep(time.Duration(w.RetryInterval) * time.Second)
}
}
func (w WriterType) Post(req []byte, headers ...map[string]string) error {
urls := strings.Split(w.Opts.Url, ",")
var err error
var newRequestErr error
var httpReq *http.Request
for _, url := range urls {
httpReq, err = http.NewRequest("POST", url, bytes.NewReader(req))
if err != nil {
logger.Warningf("create remote write:%s request got error: %s", url, err.Error())
httpReq, newRequestErr = http.NewRequest("POST", url, bytes.NewReader(req))
if newRequestErr != nil {
logger.Warningf("create remote write:%s request got error: %s", url, newRequestErr.Error())
continue
}
@@ -126,12 +139,18 @@ func (w WriterType) Post(req []byte, headers ...map[string]string) error {
continue
}
if resp.StatusCode >= 400 {
if resp.StatusCode >= 400 && resp.StatusCode < 500 {
logger.Warningf("push data with remote write:%s request got status code: %v, response body: %s", url, resp.StatusCode, string(body))
continue
}
if resp.StatusCode >= 500 {
err = fmt.Errorf("push data with remote write:%s request got status code: %v, response body: %s", url, resp.StatusCode, string(body))
logger.Warning(err)
continue
}
err = nil
break
}
@@ -142,7 +161,7 @@ type WritersType struct {
pushgw pconf.Pushgw
backends map[string]WriterType
queues map[string]*IdentQueue
allQueueLen atomic.Value
AllQueueLen atomic.Value
sync.RWMutex
}
@@ -159,6 +178,8 @@ func (ws *WritersType) ReportQueueStats(ident string, identQueue *IdentQueue) (i
if count > ws.pushgw.IdentStatsThreshold {
GaugeSampleQueueSize.WithLabelValues(ident).Set(float64(count))
}
GaugeAllQueueSize.Set(float64(ws.AllQueueLen.Load().(int)))
}
}
@@ -170,7 +191,7 @@ func (ws *WritersType) SetAllQueueLen() {
curMetricLen += q.list.Len()
}
ws.RUnlock()
ws.allQueueLen.Store(curMetricLen)
ws.AllQueueLen.Store(curMetricLen)
time.Sleep(time.Duration(ws.pushgw.WriterOpt.AllQueueMaxSizeInterval) * time.Millisecond)
}
}
@@ -180,7 +201,7 @@ func NewWriters(pushgwConfig pconf.Pushgw) *WritersType {
backends: make(map[string]WriterType),
queues: make(map[string]*IdentQueue),
pushgw: pushgwConfig,
allQueueLen: atomic.Value{},
AllQueueLen: atomic.Value{},
}
writers.Init()
@@ -215,7 +236,7 @@ func (ws *WritersType) CleanExpQueue() {
}
}
func (ws *WritersType) PushSample(ident string, v interface{}) {
func (ws *WritersType) PushSample(ident string, v interface{}) error {
ws.RLock()
identQueue := ws.queues[ident]
ws.RUnlock()
@@ -235,11 +256,12 @@ func (ws *WritersType) PushSample(ident string, v interface{}) {
}
identQueue.ts = time.Now().Unix()
curLen := ws.allQueueLen.Load().(int)
curLen := ws.AllQueueLen.Load().(int)
if curLen > ws.pushgw.WriterOpt.AllQueueMaxSize {
logger.Warningf("Write %+v full, metric count over limit: %d", v, curLen)
CounterPushQueueErrorTotal.WithLabelValues(ident).Inc()
return
err := fmt.Errorf("write queue full, metric count over limit: %d", curLen)
logger.Warning(err)
CounterPushQueueOverLimitTotal.Inc()
return err
}
succ := identQueue.list.PushFront(v)
@@ -247,6 +269,7 @@ func (ws *WritersType) PushSample(ident string, v interface{}) {
logger.Warningf("Write channel(%s) full, current channel size: %d", ident, identQueue.list.Len())
CounterPushQueueErrorTotal.WithLabelValues(ident).Inc()
}
return nil
}
func (ws *WritersType) StartConsumer(identQueue *IdentQueue) {
@@ -270,7 +293,7 @@ func (ws *WritersType) StartConsumer(identQueue *IdentQueue) {
func (ws *WritersType) Init() error {
opts := ws.pushgw.Writers
ws.allQueueLen.Store(0)
ws.AllQueueLen.Store(0)
for i := 0; i < len(opts); i++ {
tlsConf, err := opts[i].ClientConfig.TLSConfig()
@@ -310,6 +333,8 @@ func (ws *WritersType) Init() error {
Opts: opts[i],
Client: cli,
ForceUseServerTS: ws.pushgw.ForceUseServerTS,
RetryCount: ws.pushgw.WriterOpt.RetryCount,
RetryInterval: ws.pushgw.WriterOpt.RetryInterval,
}
ws.Put(opts[i].Url, writer)