mirror of
https://github.com/ccfos/nightingale.git
synced 2026-03-04 06:59:00 +00:00
Compare commits
7 Commits
dev22
...
release-13
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b4c16b98f3 | ||
|
|
30519fe909 | ||
|
|
5b8fb144ac | ||
|
|
696f058975 | ||
|
|
6195904715 | ||
|
|
47ace00ffa | ||
|
|
d8feeec345 |
@@ -20,6 +20,7 @@ import (
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
"github.com/ccfos/nightingale/v6/pkg/hash"
|
||||
"github.com/ccfos/nightingale/v6/pkg/parser"
|
||||
"github.com/ccfos/nightingale/v6/pkg/poster"
|
||||
promsdk "github.com/ccfos/nightingale/v6/pkg/prom"
|
||||
promql2 "github.com/ccfos/nightingale/v6/pkg/promql"
|
||||
"github.com/ccfos/nightingale/v6/pkg/unit"
|
||||
@@ -48,6 +49,8 @@ type AlertRuleWorker struct {
|
||||
|
||||
HostAndDeviceIdentCache sync.Map
|
||||
|
||||
LastSeriesStore map[uint64]models.DataResp
|
||||
|
||||
DeviceIdentHook func(arw *AlertRuleWorker, paramQuery models.ParamQuery) ([]string, error)
|
||||
}
|
||||
|
||||
@@ -84,6 +87,7 @@ func NewAlertRuleWorker(rule *models.AlertRule, datasourceId int64, Processor *p
|
||||
DeviceIdentHook: func(arw *AlertRuleWorker, paramQuery models.ParamQuery) ([]string, error) {
|
||||
return nil, nil
|
||||
},
|
||||
LastSeriesStore: make(map[uint64]models.DataResp),
|
||||
}
|
||||
|
||||
interval := rule.PromEvalInterval
|
||||
@@ -533,7 +537,7 @@ func (arw *AlertRuleWorker) getParamPermutation(paramVal map[string]models.Param
|
||||
if len(query) == 0 {
|
||||
paramsKeyAllLabel, err := getParamKeyAllLabel(varToLabel[paramKey], originPromql, readerClient)
|
||||
if err != nil {
|
||||
logger.Errorf("rule_eval:%s, fail to getParamKeyAllLabel, error:%v", arw.Key(), paramQuery.Query, err)
|
||||
logger.Errorf("rule_eval:%s, fail to getParamKeyAllLabel, error:%v query:%s", arw.Key(), err, paramQuery.Query)
|
||||
}
|
||||
params = paramsKeyAllLabel
|
||||
} else {
|
||||
@@ -630,17 +634,27 @@ func (arw *AlertRuleWorker) getHostIdents(paramQuery models.ParamQuery) ([]strin
|
||||
return nil, err
|
||||
}
|
||||
|
||||
hostsQuery := models.GetHostsQuery(queries)
|
||||
session := models.TargetFilterQueryBuild(arw.Ctx, hostsQuery, 0, 0)
|
||||
var lst []*models.Target
|
||||
err = session.Find(&lst).Error
|
||||
if err != nil {
|
||||
return nil, err
|
||||
if !arw.Ctx.IsCenter {
|
||||
lst, err := poster.PostByUrlsWithResp[[]*models.Target](arw.Ctx, "/v1/n9e/targets-of-host-query", queries)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for i := range lst {
|
||||
params = append(params, lst[i].Ident)
|
||||
}
|
||||
} else {
|
||||
hostsQuery := models.GetHostsQuery(queries)
|
||||
session := models.TargetFilterQueryBuild(arw.Ctx, hostsQuery, 0, 0)
|
||||
var lst []*models.Target
|
||||
err = session.Find(&lst).Error
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for i := range lst {
|
||||
params = append(params, lst[i].Ident)
|
||||
}
|
||||
}
|
||||
for i := range lst {
|
||||
params = append(params, lst[i].Ident)
|
||||
}
|
||||
arw.HostAndDeviceIdentCache.Store(cacheKey, params)
|
||||
|
||||
return params, nil
|
||||
}
|
||||
|
||||
@@ -819,122 +833,6 @@ func (arw *AlertRuleWorker) GetHostAnomalyPoint(ruleConfig string) ([]models.Ano
|
||||
return lst, nil
|
||||
}
|
||||
|
||||
func GetAnomalyPoint(ruleId int64, ruleQuery models.RuleQuery, seriesTagIndexes map[string]map[uint64][]uint64, seriesStore map[uint64]models.DataResp) ([]models.AnomalyPoint, []models.AnomalyPoint) {
|
||||
points := []models.AnomalyPoint{}
|
||||
recoverPoints := []models.AnomalyPoint{}
|
||||
|
||||
if len(ruleQuery.Triggers) == 0 {
|
||||
return points, recoverPoints
|
||||
}
|
||||
|
||||
if len(seriesTagIndexes) == 0 {
|
||||
return points, recoverPoints
|
||||
}
|
||||
|
||||
unitMap := make(map[string]string)
|
||||
for _, query := range ruleQuery.Queries {
|
||||
ref, unit, err := GetQueryRefAndUnit(query)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
unitMap[ref] = unit
|
||||
}
|
||||
|
||||
for _, trigger := range ruleQuery.Triggers {
|
||||
// seriesTagIndex 的 key 仅做分组使用,value 为每组 series 的 hash
|
||||
seriesTagIndex := ProcessJoins(ruleId, trigger, seriesTagIndexes, seriesStore)
|
||||
|
||||
for _, seriesHash := range seriesTagIndex {
|
||||
valuesUnitMap := make(map[string]unit.FormattedValue)
|
||||
|
||||
sort.Slice(seriesHash, func(i, j int) bool {
|
||||
return seriesHash[i] < seriesHash[j]
|
||||
})
|
||||
|
||||
m := make(map[string]interface{})
|
||||
var ts int64
|
||||
var sample models.DataResp
|
||||
var value float64
|
||||
for _, serieHash := range seriesHash {
|
||||
series, exists := seriesStore[serieHash]
|
||||
if !exists {
|
||||
logger.Warningf("rule_eval rid:%d series:%+v not found", ruleId, series)
|
||||
continue
|
||||
}
|
||||
t, v, exists := series.Last()
|
||||
if !exists {
|
||||
logger.Warningf("rule_eval rid:%d series:%+v value not found", ruleId, series)
|
||||
continue
|
||||
}
|
||||
|
||||
if !strings.Contains(trigger.Exp, "$"+series.Ref) {
|
||||
// 表达式中不包含该变量
|
||||
continue
|
||||
}
|
||||
|
||||
if u, exists := unitMap[series.Ref]; exists {
|
||||
valuesUnitMap[series.Ref] = unit.ValueFormatter(u, 2, v)
|
||||
}
|
||||
|
||||
m["$"+series.Ref] = v
|
||||
m["$"+series.Ref+"."+series.MetricName()] = v
|
||||
ts = int64(t)
|
||||
sample = series
|
||||
value = v
|
||||
}
|
||||
isTriggered := parser.Calc(trigger.Exp, m)
|
||||
// 此条日志很重要,是告警判断的现场值
|
||||
logger.Infof("rule_eval rid:%d trigger:%+v exp:%s res:%v m:%v", ruleId, trigger, trigger.Exp, isTriggered, m)
|
||||
|
||||
var values string
|
||||
for k, v := range m {
|
||||
if !strings.Contains(k, ".") {
|
||||
continue
|
||||
}
|
||||
values += fmt.Sprintf("%s:%v ", k, v)
|
||||
}
|
||||
|
||||
point := models.AnomalyPoint{
|
||||
Key: sample.MetricName(),
|
||||
Labels: sample.Metric,
|
||||
Timestamp: int64(ts),
|
||||
Value: value,
|
||||
Values: values,
|
||||
Severity: trigger.Severity,
|
||||
Triggered: isTriggered,
|
||||
Query: fmt.Sprintf("query:%+v trigger:%+v", ruleQuery.Queries, trigger),
|
||||
RecoverConfig: trigger.RecoverConfig,
|
||||
ValuesUnit: valuesUnitMap,
|
||||
}
|
||||
|
||||
if sample.Query != "" {
|
||||
point.Query = sample.Query
|
||||
}
|
||||
// 恢复条件判断经过讨论是只在表达式模式下支持,表达式模式会通过 isTriggered 判断是告警点还是恢复点
|
||||
// 1. 不设置恢复判断,满足恢复条件产生 recoverPoint 恢复,无数据不产生 anomalyPoint 恢复
|
||||
// 2. 设置满足条件才恢复,仅可通过产生 recoverPoint 恢复,不能通过不产生 anomalyPoint 恢复
|
||||
// 3. 设置无数据不恢复,仅可通过产生 recoverPoint 恢复,不产生 anomalyPoint 恢复
|
||||
if isTriggered {
|
||||
points = append(points, point)
|
||||
} else {
|
||||
switch trigger.RecoverConfig.JudgeType {
|
||||
case models.Origin:
|
||||
// 对齐原实现 do nothing
|
||||
case models.RecoverOnCondition:
|
||||
// 额外判断恢复条件,满足才恢复
|
||||
fulfill := parser.Calc(trigger.RecoverConfig.RecoverExp, m)
|
||||
if !fulfill {
|
||||
continue
|
||||
}
|
||||
}
|
||||
recoverPoints = append(recoverPoints, point)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return points, recoverPoints
|
||||
}
|
||||
|
||||
func flatten(rehashed map[uint64][][]uint64) map[uint64][]uint64 {
|
||||
seriesTagIndex := make(map[uint64][]uint64)
|
||||
var i uint64
|
||||
@@ -1486,107 +1384,158 @@ func (arw *AlertRuleWorker) GetAnomalyPoint(rule *models.AlertRule, dsId int64)
|
||||
unitMap[ref] = unit
|
||||
}
|
||||
|
||||
// 判断
|
||||
for _, trigger := range ruleQuery.Triggers {
|
||||
seriesTagIndex := ProcessJoins(rule.Id, trigger, seriesTagIndexes, seriesStore)
|
||||
for _, seriesHash := range seriesTagIndex {
|
||||
valuesUnitMap := make(map[string]unit.FormattedValue)
|
||||
if !ruleQuery.ExpTriggerDisable {
|
||||
for _, trigger := range ruleQuery.Triggers {
|
||||
seriesTagIndex := ProcessJoins(rule.Id, trigger, seriesTagIndexes, seriesStore)
|
||||
for _, seriesHash := range seriesTagIndex {
|
||||
valuesUnitMap := make(map[string]unit.FormattedValue)
|
||||
|
||||
sort.Slice(seriesHash, func(i, j int) bool {
|
||||
return seriesHash[i] < seriesHash[j]
|
||||
})
|
||||
sort.Slice(seriesHash, func(i, j int) bool {
|
||||
return seriesHash[i] < seriesHash[j]
|
||||
})
|
||||
|
||||
m := make(map[string]interface{})
|
||||
var ts int64
|
||||
var sample models.DataResp
|
||||
var value float64
|
||||
for _, serieHash := range seriesHash {
|
||||
series, exists := seriesStore[serieHash]
|
||||
if !exists {
|
||||
logger.Warningf("rule_eval rid:%d series:%+v not found", rule.Id, series)
|
||||
continue
|
||||
}
|
||||
t, v, exists := series.Last()
|
||||
if !exists {
|
||||
logger.Warningf("rule_eval rid:%d series:%+v value not found", rule.Id, series)
|
||||
continue
|
||||
}
|
||||
|
||||
if !strings.Contains(trigger.Exp, "$"+series.Ref) {
|
||||
// 表达式中不包含该变量
|
||||
continue
|
||||
}
|
||||
|
||||
m["$"+series.Ref] = v
|
||||
m["$"+series.Ref+"."+series.MetricName()] = v
|
||||
for k, v := range series.Metric {
|
||||
if k == "__name__" {
|
||||
m := make(map[string]interface{})
|
||||
var ts int64
|
||||
var sample models.DataResp
|
||||
var value float64
|
||||
for _, serieHash := range seriesHash {
|
||||
series, exists := seriesStore[serieHash]
|
||||
if !exists {
|
||||
logger.Warningf("rule_eval rid:%d series:%+v not found", rule.Id, series)
|
||||
continue
|
||||
}
|
||||
t, v, exists := series.Last()
|
||||
if !exists {
|
||||
logger.Warningf("rule_eval rid:%d series:%+v value not found", rule.Id, series)
|
||||
continue
|
||||
}
|
||||
|
||||
if !strings.Contains(trigger.Exp, "$"+series.Ref+"."+string(k)) {
|
||||
// 过滤掉表达式中不包含的标签
|
||||
if !strings.Contains(trigger.Exp, "$"+series.Ref) {
|
||||
// 表达式中不包含该变量
|
||||
continue
|
||||
}
|
||||
|
||||
m["$"+series.Ref+"."+string(k)] = string(v)
|
||||
m["$"+series.Ref] = v
|
||||
m["$"+series.Ref+"."+series.MetricName()] = v
|
||||
for k, v := range series.Metric {
|
||||
if k == "__name__" {
|
||||
continue
|
||||
}
|
||||
|
||||
if !strings.Contains(trigger.Exp, "$"+series.Ref+"."+string(k)) {
|
||||
// 过滤掉表达式中不包含的标签
|
||||
continue
|
||||
}
|
||||
|
||||
m["$"+series.Ref+"."+string(k)] = string(v)
|
||||
}
|
||||
|
||||
if u, exists := unitMap[series.Ref]; exists {
|
||||
valuesUnitMap["$"+series.Ref+"."+series.MetricName()] = unit.ValueFormatter(u, 2, v)
|
||||
}
|
||||
|
||||
ts = int64(t)
|
||||
sample = series
|
||||
value = v
|
||||
logger.Infof("rule_eval rid:%d origin series labels:%+v", rule.Id, series.Metric)
|
||||
}
|
||||
|
||||
if u, exists := unitMap[series.Ref]; exists {
|
||||
valuesUnitMap["$"+series.Ref+"."+series.MetricName()] = unit.ValueFormatter(u, 2, v)
|
||||
isTriggered := parser.CalcWithRid(trigger.Exp, m, rule.Id)
|
||||
// 此条日志很重要,是告警判断的现场值
|
||||
logger.Infof("rule_eval rid:%d trigger:%+v exp:%s res:%v m:%v", rule.Id, trigger, trigger.Exp, isTriggered, m)
|
||||
|
||||
var values string
|
||||
for k, v := range m {
|
||||
if !strings.Contains(k, ".") {
|
||||
continue
|
||||
}
|
||||
|
||||
switch v.(type) {
|
||||
case float64:
|
||||
values += fmt.Sprintf("%s:%.3f ", k, v)
|
||||
case string:
|
||||
values += fmt.Sprintf("%s:%s ", k, v)
|
||||
}
|
||||
}
|
||||
|
||||
ts = int64(t)
|
||||
sample = series
|
||||
value = v
|
||||
logger.Infof("rule_eval rid:%d origin series labels:%+v", rule.Id, series.Metric)
|
||||
}
|
||||
|
||||
isTriggered := parser.CalcWithRid(trigger.Exp, m, rule.Id)
|
||||
// 此条日志很重要,是告警判断的现场值
|
||||
logger.Infof("rule_eval rid:%d trigger:%+v exp:%s res:%v m:%v", rule.Id, trigger, trigger.Exp, isTriggered, m)
|
||||
|
||||
var values string
|
||||
for k, v := range m {
|
||||
if !strings.Contains(k, ".") {
|
||||
continue
|
||||
point := models.AnomalyPoint{
|
||||
Key: sample.MetricName(),
|
||||
Labels: sample.Metric,
|
||||
Timestamp: int64(ts),
|
||||
Value: value,
|
||||
Values: values,
|
||||
Severity: trigger.Severity,
|
||||
Triggered: isTriggered,
|
||||
Query: fmt.Sprintf("query:%+v trigger:%+v", ruleQuery.Queries, trigger),
|
||||
RecoverConfig: trigger.RecoverConfig,
|
||||
ValuesUnit: valuesUnitMap,
|
||||
}
|
||||
|
||||
switch v.(type) {
|
||||
case float64:
|
||||
values += fmt.Sprintf("%s:%.3f ", k, v)
|
||||
case string:
|
||||
values += fmt.Sprintf("%s:%s ", k, v)
|
||||
if isTriggered {
|
||||
points = append(points, point)
|
||||
} else {
|
||||
switch trigger.RecoverConfig.JudgeType {
|
||||
case models.Origin:
|
||||
// do nothing
|
||||
case models.RecoverOnCondition:
|
||||
fulfill := parser.CalcWithRid(trigger.RecoverConfig.RecoverExp, m, rule.Id)
|
||||
if !fulfill {
|
||||
continue
|
||||
}
|
||||
}
|
||||
recoverPoints = append(recoverPoints, point)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
point := models.AnomalyPoint{
|
||||
Key: sample.MetricName(),
|
||||
Labels: sample.Metric,
|
||||
Timestamp: int64(ts),
|
||||
Value: value,
|
||||
Values: values,
|
||||
Severity: trigger.Severity,
|
||||
Triggered: isTriggered,
|
||||
Query: fmt.Sprintf("query:%+v trigger:%+v", ruleQuery.Queries, trigger),
|
||||
RecoverConfig: trigger.RecoverConfig,
|
||||
ValuesUnit: valuesUnitMap,
|
||||
}
|
||||
if ruleQuery.NodataTrigger.Enable {
|
||||
|
||||
if isTriggered {
|
||||
points = append(points, point)
|
||||
} else {
|
||||
switch trigger.RecoverConfig.JudgeType {
|
||||
case models.Origin:
|
||||
// do nothing
|
||||
case models.RecoverOnCondition:
|
||||
fulfill := parser.CalcWithRid(trigger.RecoverConfig.RecoverExp, m, rule.Id)
|
||||
if !fulfill {
|
||||
now := time.Now().Unix()
|
||||
|
||||
// 使用 arw.LastSeriesStore 检查上次查询结果
|
||||
if len(arw.LastSeriesStore) > 0 {
|
||||
// 遍历上次的曲线数据
|
||||
for hash, lastSeries := range arw.LastSeriesStore {
|
||||
|
||||
if ruleQuery.NodataTrigger.ResolveAfterEnable {
|
||||
lastTs, _, exists := lastSeries.Last()
|
||||
if !exists {
|
||||
continue
|
||||
}
|
||||
|
||||
// 检查是否超过 resolve_after 时间
|
||||
if now-int64(lastTs) > int64(ruleQuery.NodataTrigger.ResolveAfter) {
|
||||
logger.Infof("rule_eval rid:%d series:%+v resolve after %d seconds now:%d lastTs:%d", rule.Id, lastSeries, ruleQuery.NodataTrigger.ResolveAfter, now, int64(lastTs))
|
||||
delete(arw.LastSeriesStore, hash)
|
||||
continue
|
||||
}
|
||||
}
|
||||
recoverPoints = append(recoverPoints, point)
|
||||
|
||||
// 检查是否在本次查询结果中存在
|
||||
if _, exists := seriesStore[hash]; !exists {
|
||||
// 生成无数据告警点
|
||||
point := models.AnomalyPoint{
|
||||
Key: lastSeries.MetricName(),
|
||||
Labels: lastSeries.Metric,
|
||||
Timestamp: now,
|
||||
Value: 0,
|
||||
Values: fmt.Sprintf("nodata since %v", time.Unix(now, 0).Format("2006-01-02 15:04:05")),
|
||||
Severity: ruleQuery.NodataTrigger.Severity,
|
||||
Triggered: true,
|
||||
Query: fmt.Sprintf("nodata check for %s", lastSeries.LabelsString()),
|
||||
TriggerType: models.TriggerTypeNodata,
|
||||
}
|
||||
points = append(points, point)
|
||||
logger.Infof("rule_eval rid:%d nodata point:%+v", rule.Id, point)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// 更新 arw.LastSeriesStore
|
||||
for hash, series := range seriesStore {
|
||||
arw.LastSeriesStore[hash] = series
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -224,6 +224,15 @@ func (p *Processor) BuildEvent(anomalyPoint models.AnomalyPoint, from string, no
|
||||
event.RecoverConfig = anomalyPoint.RecoverConfig
|
||||
event.RuleHash = ruleHash
|
||||
|
||||
if anomalyPoint.TriggerType == models.TriggerTypeNodata {
|
||||
event.TriggerValue = "nodata"
|
||||
ruleConfig := models.RuleQuery{}
|
||||
json.Unmarshal([]byte(p.rule.RuleConfig), &ruleConfig)
|
||||
ruleConfig.TriggerType = anomalyPoint.TriggerType
|
||||
b, _ := json.Marshal(ruleConfig)
|
||||
event.RuleConfig = string(b)
|
||||
}
|
||||
|
||||
if err := json.Unmarshal([]byte(p.rule.Annotations), &event.AnnotationsJSON); err != nil {
|
||||
event.AnnotationsJSON = make(map[string]string) // 解析失败时使用空 map
|
||||
logger.Warningf("unmarshal annotations json failed: %v, rule: %d", err, p.rule.Id)
|
||||
|
||||
@@ -92,7 +92,6 @@ func (s *Set) updateMeta(items map[string]models.HostMeta) {
|
||||
|
||||
func (s *Set) updateTargets(m map[string]models.HostMeta) error {
|
||||
if s.redis == nil {
|
||||
logger.Warningf("redis is nil")
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -549,6 +549,8 @@ func (rt *Router) Config(r *gin.Engine) {
|
||||
service.PUT("/targets/note", rt.targetUpdateNoteByService)
|
||||
service.PUT("/targets/bgid", rt.targetUpdateBgidByService)
|
||||
|
||||
service.POST("/targets-of-host-query", rt.targetsOfHostQuery)
|
||||
|
||||
service.POST("/alert-rules", rt.alertRuleAddByService)
|
||||
service.POST("/alert-rule-add", rt.alertRuleAddOneByService)
|
||||
service.DELETE("/alert-rules", rt.alertRuleDelByService)
|
||||
|
||||
@@ -102,9 +102,29 @@ func (rt *Router) builtinMetricsDefaultTypes(c *gin.Context) {
|
||||
func (rt *Router) builtinMetricsTypes(c *gin.Context) {
|
||||
collector := ginx.QueryStr(c, "collector", "")
|
||||
query := ginx.QueryStr(c, "query", "")
|
||||
disabled := ginx.QueryInt(c, "disabled", -1)
|
||||
lang := c.GetHeader("X-Language")
|
||||
|
||||
ginx.NewRender(c).Data(models.BuiltinMetricTypes(rt.Ctx, lang, collector, query))
|
||||
metricTypeList, err := models.BuiltinMetricTypes(rt.Ctx, lang, collector, query)
|
||||
ginx.Dangerous(err)
|
||||
|
||||
componentList, err := models.BuiltinComponentGets(rt.Ctx, "", disabled)
|
||||
ginx.Dangerous(err)
|
||||
|
||||
// 创建一个 map 来存储 componentList 中的类型
|
||||
componentTypes := make(map[string]struct{})
|
||||
for _, comp := range componentList {
|
||||
componentTypes[comp.Ident] = struct{}{}
|
||||
}
|
||||
|
||||
filteredMetricTypeList := make([]string, 0)
|
||||
for _, metricType := range metricTypeList {
|
||||
if _, exists := componentTypes[metricType]; exists {
|
||||
filteredMetricTypeList = append(filteredMetricTypeList, metricType)
|
||||
}
|
||||
}
|
||||
|
||||
ginx.NewRender(c).Data(filteredMetricTypeList, nil)
|
||||
}
|
||||
|
||||
func (rt *Router) builtinMetricsCollectors(c *gin.Context) {
|
||||
|
||||
@@ -406,16 +406,15 @@ func haveNeverGroupedIdent(ctx *ctx.Context, idents []string) (bool, error) {
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
|
||||
if len(bgids) <= 0 {
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return false, nil
|
||||
}
|
||||
|
||||
|
||||
func (rt *Router) targetBindBgids(c *gin.Context) {
|
||||
var f targetBgidsForm
|
||||
var err error
|
||||
@@ -571,3 +570,18 @@ func (rt *Router) targetsOfAlertRule(c *gin.Context) {
|
||||
|
||||
ginx.NewRender(c).Data(ret, err)
|
||||
}
|
||||
|
||||
func (rt *Router) targetsOfHostQuery(c *gin.Context) {
|
||||
var queries []models.HostQuery
|
||||
ginx.BindJSON(c, &queries)
|
||||
|
||||
hostsQuery := models.GetHostsQuery(queries)
|
||||
session := models.TargetFilterQueryBuild(rt.Ctx, hostsQuery, 0, 0)
|
||||
var lst []*models.Target
|
||||
err := session.Find(&lst).Error
|
||||
if err != nil {
|
||||
ginx.Bomb(http.StatusInternalServerError, err.Error())
|
||||
}
|
||||
|
||||
ginx.NewRender(c).Data(lst, nil)
|
||||
}
|
||||
|
||||
@@ -171,7 +171,7 @@ func PutDatasources(items []datasource.DatasourceInfo) {
|
||||
|
||||
ds, err := datasource.GetDatasourceByType(typ, item.Settings)
|
||||
if err != nil {
|
||||
logger.Warningf("get plugin:%+v fail: %v", item, err)
|
||||
logger.Debugf("get plugin:%+v fail: %v", item, err)
|
||||
continue
|
||||
}
|
||||
|
||||
|
||||
@@ -161,9 +161,9 @@ type PromRuleConfig struct {
|
||||
type RecoverJudge int
|
||||
|
||||
const (
|
||||
Origin RecoverJudge = 0
|
||||
RecoverWithoutData RecoverJudge = 1
|
||||
RecoverOnCondition RecoverJudge = 2
|
||||
Origin RecoverJudge = 0
|
||||
NotRecoverWhenNoData RecoverJudge = 1
|
||||
RecoverOnCondition RecoverJudge = 2
|
||||
)
|
||||
|
||||
type RecoverConfig struct {
|
||||
@@ -194,10 +194,20 @@ type HostTrigger struct {
|
||||
}
|
||||
|
||||
type RuleQuery struct {
|
||||
Version string `json:"version"`
|
||||
Inhibit bool `json:"inhibit"`
|
||||
Queries []interface{} `json:"queries"`
|
||||
Triggers []Trigger `json:"triggers"`
|
||||
Version string `json:"version"`
|
||||
Inhibit bool `json:"inhibit"`
|
||||
Queries []interface{} `json:"queries"`
|
||||
ExpTriggerDisable bool `json:"exp_trigger_disable"`
|
||||
Triggers []Trigger `json:"triggers"`
|
||||
NodataTrigger NodataTrigger `json:"nodata_trigger"`
|
||||
TriggerType TriggerType `json:"trigger_type,omitempty"`
|
||||
}
|
||||
|
||||
type NodataTrigger struct {
|
||||
Enable bool `json:"enable"`
|
||||
Severity int `json:"severity"`
|
||||
ResolveAfterEnable bool `json:"resolve_after_enable"`
|
||||
ResolveAfter int `json:"resolve_after"` // 单位秒
|
||||
}
|
||||
|
||||
type Trigger struct {
|
||||
@@ -1175,8 +1185,6 @@ func (ar *AlertRule) UpdateEvent(event *AlertCurEvent) {
|
||||
event.RuleProd = ar.Prod
|
||||
event.RuleAlgo = ar.Algorithm
|
||||
event.PromForDuration = ar.PromForDuration
|
||||
event.RuleConfig = ar.RuleConfig
|
||||
event.RuleConfigJson = ar.RuleConfigJson
|
||||
event.Callbacks = ar.Callbacks
|
||||
event.CallbacksJSON = ar.CallbacksJSON
|
||||
event.RunbookUrl = ar.RunbookUrl
|
||||
|
||||
@@ -20,8 +20,16 @@ type AnomalyPoint struct {
|
||||
Values string `json:"values"`
|
||||
ValuesUnit map[string]unit.FormattedValue `json:"values_unit"`
|
||||
RecoverConfig RecoverConfig `json:"recover_config"`
|
||||
TriggerType TriggerType `json:"trigger_type"`
|
||||
}
|
||||
|
||||
type TriggerType string
|
||||
|
||||
const (
|
||||
TriggerTypeNormal TriggerType = "normal"
|
||||
TriggerTypeNodata TriggerType = "nodata"
|
||||
)
|
||||
|
||||
func NewAnomalyPoint(key string, labels map[string]string, ts int64, value float64, severity int) AnomalyPoint {
|
||||
anomalyPointLabels := make(model.Metric)
|
||||
for k, v := range labels {
|
||||
|
||||
@@ -162,11 +162,6 @@ var entries = []CheckEntry{
|
||||
ErrorMessage: "Some recovery scripts still in the BusiGroup",
|
||||
FieldName: "group_id",
|
||||
},
|
||||
{
|
||||
Entry: &TaskRecord{},
|
||||
ErrorMessage: "Some Task Record records still in the BusiGroup",
|
||||
FieldName: "group_id",
|
||||
},
|
||||
{
|
||||
Entry: &TargetBusiGroup{},
|
||||
ErrorMessage: "Some target busigroups still in the BusiGroup",
|
||||
|
||||
@@ -53,6 +53,12 @@ func (d *DataResp) MetricName() string {
|
||||
return string(metric)
|
||||
}
|
||||
|
||||
// labels 转换为 string
|
||||
func (d *DataResp) LabelsString() string {
|
||||
labels := d.Metric
|
||||
return labels.String()
|
||||
}
|
||||
|
||||
type RelationKey struct {
|
||||
LeftKey string `json:"left_key"`
|
||||
RightKey string `json:"right_key"`
|
||||
|
||||
@@ -23,24 +23,31 @@ type DataResponse[T any] struct {
|
||||
}
|
||||
|
||||
func GetByUrls[T any](ctx *ctx.Context, path string) (T, error) {
|
||||
var err error
|
||||
addrs := ctx.CenterApi.Addrs
|
||||
if len(addrs) == 0 {
|
||||
var dat T
|
||||
return dat, fmt.Errorf("no center api addresses configured")
|
||||
}
|
||||
|
||||
rand.Shuffle(len(addrs), func(i, j int) { addrs[i], addrs[j] = addrs[j], addrs[i] })
|
||||
for _, addr := range addrs {
|
||||
url := fmt.Sprintf("%s%s", addr, path)
|
||||
dat, e := GetByUrl[T](url, ctx.CenterApi)
|
||||
if e != nil {
|
||||
err = e
|
||||
// 随机选择起始位置
|
||||
startIdx := rand.Intn(len(addrs))
|
||||
|
||||
// 从随机位置开始遍历所有地址
|
||||
var dat T
|
||||
var err error
|
||||
for i := 0; i < len(addrs); i++ {
|
||||
idx := (startIdx + i) % len(addrs)
|
||||
url := fmt.Sprintf("%s%s", addrs[idx], path)
|
||||
|
||||
dat, err = GetByUrl[T](url, ctx.CenterApi)
|
||||
if err != nil {
|
||||
logger.Warningf("failed to get data from center, url: %s, err: %v", url, err)
|
||||
continue
|
||||
}
|
||||
return dat, nil
|
||||
}
|
||||
|
||||
var dat T
|
||||
err = fmt.Errorf("failed to get data from center, path= %s, ctx.CenterApi.Addrs= %v", path, addrs)
|
||||
return dat, err
|
||||
return dat, fmt.Errorf("failed to get data from center, path= %s, addrs= %v err: %v", path, addrs, err)
|
||||
}
|
||||
|
||||
func GetByUrl[T any](url string, cfg conf.CenterApi) (T, error) {
|
||||
@@ -97,25 +104,32 @@ func GetByUrl[T any](url string, cfg conf.CenterApi) (T, error) {
|
||||
return dataResp.Dat, nil
|
||||
}
|
||||
|
||||
func PostByUrls(ctx *ctx.Context, path string, v interface{}) (err error) {
|
||||
func PostByUrls(ctx *ctx.Context, path string, v interface{}) error {
|
||||
addrs := ctx.CenterApi.Addrs
|
||||
|
||||
rand.Shuffle(len(addrs), func(i, j int) { addrs[i], addrs[j] = addrs[j], addrs[i] })
|
||||
for _, addr := range addrs {
|
||||
url := fmt.Sprintf("%s%s", addr, path)
|
||||
|
||||
_, err = PostByUrl[interface{}](url, ctx.CenterApi, v)
|
||||
if err == nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if len(addrs) < 1 {
|
||||
err = fmt.Errorf("submission of the POST request from the center has failed, "+
|
||||
if len(addrs) == 0 {
|
||||
return fmt.Errorf("submission of the POST request from the center has failed, "+
|
||||
"path= %s, v= %v, ctx.CenterApi.Addrs= %v", path, v, addrs)
|
||||
}
|
||||
return
|
||||
|
||||
// 随机选择起始位置
|
||||
startIdx := rand.Intn(len(addrs))
|
||||
|
||||
// 从随机位置开始遍历所有地址
|
||||
for i := 0; i < len(addrs); i++ {
|
||||
idx := (startIdx + i) % len(addrs)
|
||||
url := fmt.Sprintf("%s%s", addrs[idx], path)
|
||||
|
||||
_, err := PostByUrl[interface{}](url, ctx.CenterApi, v)
|
||||
if err != nil {
|
||||
logger.Warningf("failed to post data to center, url: %s, err: %v", url, err)
|
||||
continue
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
return fmt.Errorf("failed to post data to center, path= %s, addrs= %v", path, addrs)
|
||||
}
|
||||
|
||||
func PostByUrlsWithResp[T any](ctx *ctx.Context, path string, v interface{}) (t T, err error) {
|
||||
addrs := ctx.CenterApi.Addrs
|
||||
if len(addrs) < 1 {
|
||||
@@ -123,14 +137,24 @@ func PostByUrlsWithResp[T any](ctx *ctx.Context, path string, v interface{}) (t
|
||||
"path= %s, v= %v, ctx.CenterApi.Addrs= %v", path, v, addrs)
|
||||
return
|
||||
}
|
||||
rand.Shuffle(len(addrs), func(i, j int) { addrs[i], addrs[j] = addrs[j], addrs[i] })
|
||||
for _, addr := range addrs {
|
||||
t, err = PostByUrl[T](fmt.Sprintf("%s%s", addr, path), ctx.CenterApi, v)
|
||||
if err == nil {
|
||||
break
|
||||
|
||||
// 随机选择起始位置
|
||||
startIdx := rand.Intn(len(addrs))
|
||||
|
||||
// 从随机位置开始遍历所有地址
|
||||
for i := 0; i < len(addrs); i++ {
|
||||
idx := (startIdx + i) % len(addrs)
|
||||
url := fmt.Sprintf("%s%s", addrs[idx], path)
|
||||
|
||||
t, err = PostByUrl[T](url, ctx.CenterApi, v)
|
||||
if err != nil {
|
||||
logger.Warningf("failed to post data to center, url: %s, err: %v", url, err)
|
||||
continue
|
||||
}
|
||||
return t, nil
|
||||
}
|
||||
return
|
||||
|
||||
return t, fmt.Errorf("failed to post data to center, path= %s, addrs= %v err: %v", path, addrs, err)
|
||||
}
|
||||
|
||||
func PostByUrl[T any](url string, cfg conf.CenterApi, v interface{}) (t T, err error) {
|
||||
|
||||
@@ -96,7 +96,7 @@ type TargetUpdate struct {
|
||||
|
||||
func (s *Set) UpdateTargets(lst []string, now int64) error {
|
||||
err := updateTargetsUpdateTs(lst, now, s.redis)
|
||||
if err != nil {
|
||||
if err != nil && s.ctx.IsCenter {
|
||||
logger.Errorf("failed to update targets:%v update_ts: %v", lst, err)
|
||||
}
|
||||
|
||||
|
||||
@@ -28,6 +28,9 @@ type WriterGlobalOpt struct {
|
||||
QueuePopSize int
|
||||
AllQueueMaxSize int
|
||||
AllQueueMaxSizeInterval int
|
||||
RetryCount int
|
||||
RetryInterval int64
|
||||
OverLimitStatusCode int
|
||||
}
|
||||
|
||||
type WriterOptions struct {
|
||||
@@ -80,13 +83,25 @@ func (p *Pushgw) PreCheck() {
|
||||
}
|
||||
|
||||
if p.WriterOpt.AllQueueMaxSize <= 0 {
|
||||
p.WriterOpt.AllQueueMaxSize = 10000000
|
||||
p.WriterOpt.AllQueueMaxSize = 5000000
|
||||
}
|
||||
|
||||
if p.WriterOpt.AllQueueMaxSizeInterval <= 0 {
|
||||
p.WriterOpt.AllQueueMaxSizeInterval = 200
|
||||
}
|
||||
|
||||
if p.WriterOpt.RetryCount <= 0 {
|
||||
p.WriterOpt.RetryCount = 1000
|
||||
}
|
||||
|
||||
if p.WriterOpt.RetryInterval <= 0 {
|
||||
p.WriterOpt.RetryInterval = 1
|
||||
}
|
||||
|
||||
if p.WriterOpt.OverLimitStatusCode <= 0 {
|
||||
p.WriterOpt.OverLimitStatusCode = 499
|
||||
}
|
||||
|
||||
if p.WriteConcurrency <= 0 {
|
||||
p.WriteConcurrency = 5000
|
||||
}
|
||||
|
||||
@@ -145,38 +145,39 @@ func matchSample(filterMap, sampleMap map[string]string) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (rt *Router) ForwardByIdent(clientIP string, ident string, v *prompb.TimeSeries) {
|
||||
func (rt *Router) ForwardByIdent(clientIP string, ident string, v *prompb.TimeSeries) error {
|
||||
v = rt.BeforePush(clientIP, v)
|
||||
if v == nil {
|
||||
return
|
||||
return nil
|
||||
}
|
||||
|
||||
IdentStats.Increment(ident, 1)
|
||||
if rt.DropSample(v) {
|
||||
CounterDropSampleTotal.WithLabelValues(ident).Inc()
|
||||
return
|
||||
return nil
|
||||
}
|
||||
|
||||
count := IdentStats.Get(ident)
|
||||
if count > rt.Pushgw.IdentDropThreshold {
|
||||
CounterDropSampleTotal.WithLabelValues(ident).Inc()
|
||||
return
|
||||
// 单个 ident 的样本数超过阈值,不算异常,不影响其他机器的上报,不返回 err
|
||||
return nil
|
||||
}
|
||||
|
||||
rt.Writers.PushSample(ident, *v)
|
||||
return rt.Writers.PushSample(ident, *v)
|
||||
}
|
||||
|
||||
func (rt *Router) ForwardByMetric(clientIP string, metric string, v *prompb.TimeSeries) {
|
||||
func (rt *Router) ForwardByMetric(clientIP string, metric string, v *prompb.TimeSeries) error {
|
||||
v = rt.BeforePush(clientIP, v)
|
||||
rt.debugSample(clientIP, v)
|
||||
if v == nil {
|
||||
return
|
||||
return nil
|
||||
}
|
||||
|
||||
IdentStats.Increment(metric, 1)
|
||||
if rt.DropSample(v) {
|
||||
CounterDropSampleTotal.WithLabelValues(metric).Inc()
|
||||
return
|
||||
return nil
|
||||
}
|
||||
|
||||
var hashkey string
|
||||
@@ -185,7 +186,8 @@ func (rt *Router) ForwardByMetric(clientIP string, metric string, v *prompb.Time
|
||||
} else {
|
||||
hashkey = metric[0:1]
|
||||
}
|
||||
rt.Writers.PushSample(hashkey, *v)
|
||||
|
||||
return rt.Writers.PushSample(hashkey, *v)
|
||||
}
|
||||
|
||||
func (rt *Router) BeforePush(clientIP string, v *prompb.TimeSeries) *prompb.TimeSeries {
|
||||
|
||||
@@ -1,6 +1,9 @@
|
||||
package router
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/prometheus/prometheus/prompb"
|
||||
|
||||
@@ -30,6 +33,20 @@ type Router struct {
|
||||
HeartbeartApi string
|
||||
}
|
||||
|
||||
func stat() gin.HandlerFunc {
|
||||
return func(c *gin.Context) {
|
||||
start := time.Now()
|
||||
c.Next()
|
||||
|
||||
code := fmt.Sprintf("%d", c.Writer.Status())
|
||||
method := c.Request.Method
|
||||
labels := []string{"pushgw", code, c.FullPath(), method}
|
||||
|
||||
RequestCounter.WithLabelValues(labels...).Inc()
|
||||
RequestDuration.WithLabelValues(labels...).Observe(float64(time.Since(start).Seconds()))
|
||||
}
|
||||
}
|
||||
|
||||
func New(httpConfig httpx.Config, pushgw pconf.Pushgw, aconf aconf.Alert, tc *memsto.TargetCacheType, bg *memsto.BusiGroupCacheType,
|
||||
idents *idents.Set, metas *metas.Set,
|
||||
writers *writer.WritersType, ctx *ctx.Context) *Router {
|
||||
@@ -61,6 +78,7 @@ func (rt *Router) Config(r *gin.Engine) {
|
||||
registerMetrics()
|
||||
go rt.ReportIdentStats()
|
||||
|
||||
r.Use(stat())
|
||||
// datadog url: http://n9e-pushgw.foo.com/datadog
|
||||
// use apiKey not basic auth
|
||||
r.POST("/datadog/api/v1/series", rt.datadogSeries)
|
||||
|
||||
@@ -257,9 +257,14 @@ func (r *Router) datadogSeries(c *gin.Context) {
|
||||
}
|
||||
|
||||
if ident != "" {
|
||||
r.ForwardByIdent(c.ClientIP(), ident, pt)
|
||||
err = r.ForwardByIdent(c.ClientIP(), ident, pt)
|
||||
} else {
|
||||
r.ForwardByMetric(c.ClientIP(), item.Metric, pt)
|
||||
err = r.ForwardByMetric(c.ClientIP(), item.Metric, pt)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
c.String(r.Pushgw.WriterOpt.OverLimitStatusCode, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
succ++
|
||||
|
||||
@@ -207,9 +207,14 @@ func (rt *Router) falconPush(c *gin.Context) {
|
||||
}
|
||||
|
||||
if ident != "" {
|
||||
rt.ForwardByIdent(c.ClientIP(), ident, pt)
|
||||
err = rt.ForwardByIdent(c.ClientIP(), ident, pt)
|
||||
} else {
|
||||
rt.ForwardByMetric(c.ClientIP(), arr[i].Metric, pt)
|
||||
err = rt.ForwardByMetric(c.ClientIP(), arr[i].Metric, pt)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
c.String(rt.Pushgw.WriterOpt.OverLimitStatusCode, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
succ++
|
||||
|
||||
@@ -202,9 +202,14 @@ func (rt *Router) openTSDBPut(c *gin.Context) {
|
||||
}
|
||||
|
||||
if host != "" {
|
||||
rt.ForwardByIdent(c.ClientIP(), host, pt)
|
||||
err = rt.ForwardByIdent(c.ClientIP(), host, pt)
|
||||
} else {
|
||||
rt.ForwardByMetric(c.ClientIP(), arr[i].Metric, pt)
|
||||
err = rt.ForwardByMetric(c.ClientIP(), arr[i].Metric, pt)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
c.String(rt.Pushgw.WriterOpt.OverLimitStatusCode, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
succ++
|
||||
|
||||
@@ -1,15 +1,18 @@
|
||||
package router
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/pushgw/writer"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"github.com/golang/snappy"
|
||||
"github.com/prometheus/prometheus/prompb"
|
||||
"github.com/toolkits/pkg/ginx"
|
||||
"github.com/toolkits/pkg/logger"
|
||||
)
|
||||
|
||||
func extractMetricFromTimeSeries(s *prompb.TimeSeries) string {
|
||||
@@ -99,6 +102,15 @@ func duplicateLabelKey(series *prompb.TimeSeries) bool {
|
||||
}
|
||||
|
||||
func (rt *Router) remoteWrite(c *gin.Context) {
|
||||
curLen := rt.Writers.AllQueueLen.Load().(int)
|
||||
if curLen > rt.Pushgw.WriterOpt.AllQueueMaxSize {
|
||||
err := fmt.Errorf("write queue full, metric count over limit: %d", curLen)
|
||||
logger.Warning(err)
|
||||
writer.CounterPushQueueOverLimitTotal.Inc()
|
||||
c.String(rt.Pushgw.WriterOpt.OverLimitStatusCode, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
req, err := DecodeWriteRequest(c.Request.Body)
|
||||
if err != nil {
|
||||
c.String(http.StatusBadRequest, err.Error())
|
||||
@@ -138,15 +150,23 @@ func (rt *Router) remoteWrite(c *gin.Context) {
|
||||
ids[ident] = struct{}{}
|
||||
}
|
||||
|
||||
var err error
|
||||
if len(ident) > 0 {
|
||||
rt.ForwardByIdent(c.ClientIP(), ident, &req.Timeseries[i])
|
||||
err = rt.ForwardByIdent(c.ClientIP(), ident, &req.Timeseries[i])
|
||||
} else {
|
||||
rt.ForwardByMetric(c.ClientIP(), extractMetricFromTimeSeries(&req.Timeseries[i]), &req.Timeseries[i])
|
||||
err = rt.ForwardByMetric(c.ClientIP(), extractMetricFromTimeSeries(&req.Timeseries[i]), &req.Timeseries[i])
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
c.String(rt.Pushgw.WriterOpt.OverLimitStatusCode, err.Error())
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
CounterSampleTotal.WithLabelValues("prometheus").Add(float64(count))
|
||||
rt.IdentSet.MSet(ids)
|
||||
|
||||
c.String(200, "")
|
||||
}
|
||||
|
||||
// DecodeWriteRequest from an io.Reader into a prompb.WriteRequest, handling
|
||||
|
||||
@@ -8,6 +8,8 @@ const (
|
||||
)
|
||||
|
||||
var (
|
||||
labels = []string{"service", "code", "path", "method"}
|
||||
|
||||
CounterSampleTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
@@ -28,6 +30,24 @@ var (
|
||||
Name: "sample_received_by_ident",
|
||||
Help: "Number of sample push by ident.",
|
||||
}, []string{"host_ident"})
|
||||
|
||||
RequestCounter = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
Name: "http_request_count_total",
|
||||
Help: "Total number of HTTP requests made.",
|
||||
}, labels,
|
||||
)
|
||||
|
||||
RequestDuration = prometheus.NewHistogramVec(
|
||||
prometheus.HistogramOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
Name: "http_request_duration_seconds",
|
||||
Help: "HTTP request latencies in seconds.",
|
||||
}, labels,
|
||||
)
|
||||
)
|
||||
|
||||
func registerMetrics() {
|
||||
@@ -35,5 +55,7 @@ func registerMetrics() {
|
||||
CounterSampleTotal,
|
||||
CounterDropSampleTotal,
|
||||
CounterSampleReceivedByIdent,
|
||||
RequestCounter,
|
||||
RequestDuration,
|
||||
)
|
||||
}
|
||||
|
||||
@@ -28,6 +28,13 @@ var (
|
||||
}, []string{"host_ident"},
|
||||
)
|
||||
|
||||
GaugeAllQueueSize = prometheus.NewGauge(prometheus.GaugeOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
Name: "all_queue_size",
|
||||
Help: "The size of all queue.",
|
||||
})
|
||||
|
||||
CounterWirteTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
@@ -48,6 +55,13 @@ var (
|
||||
Name: "push_queue_error_total",
|
||||
Help: "Number of push queue error.",
|
||||
}, []string{"host_ident"})
|
||||
|
||||
CounterPushQueueOverLimitTotal = prometheus.NewCounter(prometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
Name: "push_queue_over_limit_error_total",
|
||||
Help: "Number of push queue over limit.",
|
||||
})
|
||||
)
|
||||
|
||||
func init() {
|
||||
@@ -57,5 +71,7 @@ func init() {
|
||||
CounterWirteErrorTotal,
|
||||
CounterPushQueueErrorTotal,
|
||||
GaugeSampleQueueSize,
|
||||
CounterPushQueueOverLimitTotal,
|
||||
GaugeAllQueueSize,
|
||||
)
|
||||
}
|
||||
|
||||
@@ -25,6 +25,8 @@ type WriterType struct {
|
||||
Opts pconf.WriterOptions
|
||||
ForceUseServerTS bool
|
||||
Client api.Client
|
||||
RetryCount int
|
||||
RetryInterval int64 // 单位秒
|
||||
}
|
||||
|
||||
func (w WriterType) writeRelabel(items []prompb.TimeSeries) []prompb.TimeSeries {
|
||||
@@ -76,21 +78,32 @@ func (w WriterType) Write(key string, items []prompb.TimeSeries, headers ...map[
|
||||
return
|
||||
}
|
||||
|
||||
if err := w.Post(snappy.Encode(nil, data), headers...); err != nil {
|
||||
for i := 0; i < w.RetryCount; i++ {
|
||||
err := w.Post(snappy.Encode(nil, data), headers...)
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
|
||||
CounterWirteErrorTotal.WithLabelValues(key).Add(float64(len(items)))
|
||||
logger.Warningf("post to %s got error: %v", w.Opts.Url, err)
|
||||
logger.Warning("example timeseries:", items[0].String())
|
||||
logger.Warningf("post to %s got error: %v in %d times", w.Opts.Url, err, i)
|
||||
|
||||
if i == 0 {
|
||||
logger.Warning("example timeseries:", items[0].String())
|
||||
}
|
||||
|
||||
time.Sleep(time.Duration(w.RetryInterval) * time.Second)
|
||||
}
|
||||
}
|
||||
|
||||
func (w WriterType) Post(req []byte, headers ...map[string]string) error {
|
||||
urls := strings.Split(w.Opts.Url, ",")
|
||||
var err error
|
||||
var newRequestErr error
|
||||
var httpReq *http.Request
|
||||
for _, url := range urls {
|
||||
httpReq, err = http.NewRequest("POST", url, bytes.NewReader(req))
|
||||
if err != nil {
|
||||
logger.Warningf("create remote write:%s request got error: %s", url, err.Error())
|
||||
httpReq, newRequestErr = http.NewRequest("POST", url, bytes.NewReader(req))
|
||||
if newRequestErr != nil {
|
||||
logger.Warningf("create remote write:%s request got error: %s", url, newRequestErr.Error())
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -126,12 +139,18 @@ func (w WriterType) Post(req []byte, headers ...map[string]string) error {
|
||||
continue
|
||||
}
|
||||
|
||||
if resp.StatusCode >= 400 {
|
||||
if resp.StatusCode >= 400 && resp.StatusCode < 500 {
|
||||
logger.Warningf("push data with remote write:%s request got status code: %v, response body: %s", url, resp.StatusCode, string(body))
|
||||
continue
|
||||
}
|
||||
|
||||
if resp.StatusCode >= 500 {
|
||||
err = fmt.Errorf("push data with remote write:%s request got status code: %v, response body: %s", url, resp.StatusCode, string(body))
|
||||
logger.Warning(err)
|
||||
continue
|
||||
}
|
||||
|
||||
err = nil
|
||||
break
|
||||
}
|
||||
|
||||
@@ -142,7 +161,7 @@ type WritersType struct {
|
||||
pushgw pconf.Pushgw
|
||||
backends map[string]WriterType
|
||||
queues map[string]*IdentQueue
|
||||
allQueueLen atomic.Value
|
||||
AllQueueLen atomic.Value
|
||||
sync.RWMutex
|
||||
}
|
||||
|
||||
@@ -159,6 +178,8 @@ func (ws *WritersType) ReportQueueStats(ident string, identQueue *IdentQueue) (i
|
||||
if count > ws.pushgw.IdentStatsThreshold {
|
||||
GaugeSampleQueueSize.WithLabelValues(ident).Set(float64(count))
|
||||
}
|
||||
|
||||
GaugeAllQueueSize.Set(float64(ws.AllQueueLen.Load().(int)))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -170,7 +191,7 @@ func (ws *WritersType) SetAllQueueLen() {
|
||||
curMetricLen += q.list.Len()
|
||||
}
|
||||
ws.RUnlock()
|
||||
ws.allQueueLen.Store(curMetricLen)
|
||||
ws.AllQueueLen.Store(curMetricLen)
|
||||
time.Sleep(time.Duration(ws.pushgw.WriterOpt.AllQueueMaxSizeInterval) * time.Millisecond)
|
||||
}
|
||||
}
|
||||
@@ -180,7 +201,7 @@ func NewWriters(pushgwConfig pconf.Pushgw) *WritersType {
|
||||
backends: make(map[string]WriterType),
|
||||
queues: make(map[string]*IdentQueue),
|
||||
pushgw: pushgwConfig,
|
||||
allQueueLen: atomic.Value{},
|
||||
AllQueueLen: atomic.Value{},
|
||||
}
|
||||
|
||||
writers.Init()
|
||||
@@ -215,7 +236,7 @@ func (ws *WritersType) CleanExpQueue() {
|
||||
}
|
||||
}
|
||||
|
||||
func (ws *WritersType) PushSample(ident string, v interface{}) {
|
||||
func (ws *WritersType) PushSample(ident string, v interface{}) error {
|
||||
ws.RLock()
|
||||
identQueue := ws.queues[ident]
|
||||
ws.RUnlock()
|
||||
@@ -235,11 +256,12 @@ func (ws *WritersType) PushSample(ident string, v interface{}) {
|
||||
}
|
||||
|
||||
identQueue.ts = time.Now().Unix()
|
||||
curLen := ws.allQueueLen.Load().(int)
|
||||
curLen := ws.AllQueueLen.Load().(int)
|
||||
if curLen > ws.pushgw.WriterOpt.AllQueueMaxSize {
|
||||
logger.Warningf("Write %+v full, metric count over limit: %d", v, curLen)
|
||||
CounterPushQueueErrorTotal.WithLabelValues(ident).Inc()
|
||||
return
|
||||
err := fmt.Errorf("write queue full, metric count over limit: %d", curLen)
|
||||
logger.Warning(err)
|
||||
CounterPushQueueOverLimitTotal.Inc()
|
||||
return err
|
||||
}
|
||||
|
||||
succ := identQueue.list.PushFront(v)
|
||||
@@ -247,6 +269,7 @@ func (ws *WritersType) PushSample(ident string, v interface{}) {
|
||||
logger.Warningf("Write channel(%s) full, current channel size: %d", ident, identQueue.list.Len())
|
||||
CounterPushQueueErrorTotal.WithLabelValues(ident).Inc()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ws *WritersType) StartConsumer(identQueue *IdentQueue) {
|
||||
@@ -270,7 +293,7 @@ func (ws *WritersType) StartConsumer(identQueue *IdentQueue) {
|
||||
|
||||
func (ws *WritersType) Init() error {
|
||||
opts := ws.pushgw.Writers
|
||||
ws.allQueueLen.Store(0)
|
||||
ws.AllQueueLen.Store(0)
|
||||
|
||||
for i := 0; i < len(opts); i++ {
|
||||
tlsConf, err := opts[i].ClientConfig.TLSConfig()
|
||||
@@ -310,6 +333,8 @@ func (ws *WritersType) Init() error {
|
||||
Opts: opts[i],
|
||||
Client: cli,
|
||||
ForceUseServerTS: ws.pushgw.ForceUseServerTS,
|
||||
RetryCount: ws.pushgw.WriterOpt.RetryCount,
|
||||
RetryInterval: ws.pushgw.WriterOpt.RetryInterval,
|
||||
}
|
||||
|
||||
ws.Put(opts[i].Url, writer)
|
||||
|
||||
Reference in New Issue
Block a user