Compare commits

...

1 Commits

Author SHA1 Message Date
ning
ff40fe3518 refactor stats 2023-07-20 16:36:40 +08:00
10 changed files with 109 additions and 89 deletions

View File

@@ -10,23 +10,11 @@ const (
)
type Stats struct {
CounterSampleTotal *prometheus.CounterVec
CounterAlertsTotal *prometheus.CounterVec
GaugeAlertQueueSize prometheus.Gauge
GaugeSampleQueueSize *prometheus.GaugeVec
RequestDuration *prometheus.HistogramVec
ForwardDuration *prometheus.HistogramVec
CounterAlertsTotal *prometheus.CounterVec
GaugeAlertQueueSize prometheus.Gauge
}
func NewSyncStats() *Stats {
// 从各个接收接口接收到的监控数据总量
CounterSampleTotal := prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "samples_received_total",
Help: "Total number samples received.",
}, []string{"cluster", "channel"})
// 产生的告警总量
CounterAlertsTotal := prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
@@ -43,51 +31,13 @@ func NewSyncStats() *Stats {
Help: "The size of alert queue.",
})
// 数据转发队列,各个队列的长度
GaugeSampleQueueSize := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "sample_queue_size",
Help: "The size of sample queue.",
}, []string{"cluster", "channel_number"})
// 一些重要的请求,比如接收数据的请求,应该统计一下延迟情况
RequestDuration := prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Buckets: []float64{.01, .1, 1},
Name: "http_request_duration_seconds",
Help: "HTTP request latencies in seconds.",
}, []string{"code", "path", "method"},
)
// 发往后端TSDB延迟如何
ForwardDuration := prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Buckets: []float64{.1, 1, 10},
Name: "forward_duration_seconds",
Help: "Forward samples to TSDB. latencies in seconds.",
}, []string{"cluster", "channel_number"},
)
prometheus.MustRegister(
CounterSampleTotal,
CounterAlertsTotal,
GaugeAlertQueueSize,
GaugeSampleQueueSize,
RequestDuration,
ForwardDuration,
)
return &Stats{
CounterSampleTotal: CounterSampleTotal,
CounterAlertsTotal: CounterAlertsTotal,
GaugeAlertQueueSize: GaugeAlertQueueSize,
GaugeSampleQueueSize: GaugeSampleQueueSize,
RequestDuration: RequestDuration,
ForwardDuration: ForwardDuration,
CounterAlertsTotal: CounterAlertsTotal,
GaugeAlertQueueSize: GaugeAlertQueueSize,
}
}

55
pushgw/pstats/stat.go Normal file
View File

@@ -0,0 +1,55 @@
package pstats
import "github.com/prometheus/client_golang/prometheus"
const (
namespace = "n9e"
subsystem = "server"
)
var (
CounterSampleTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "samples_received_total",
Help: "Total number samples received.",
}, []string{"channel"})
CounterPushSampleTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "samples_push_total",
Help: "Total number samples push to tsdb.",
}, []string{"url"})
// 一些重要的请求,比如接收数据的请求,应该统计一下延迟情况
RequestDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Buckets: []float64{.01, .1, 1},
Name: "http_request_duration_seconds",
Help: "HTTP request latencies in seconds.",
}, []string{"code", "path", "method"},
)
// 发往后端TSDB延迟如何
ForwardDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Buckets: []float64{.1, 1, 10},
Name: "forward_duration_seconds",
Help: "Forward samples to TSDB. latencies in seconds.",
}, []string{"url"},
)
)
func RegisterMetrics() {
prometheus.MustRegister(
CounterSampleTotal,
CounterPushSampleTotal,
RequestDuration,
ForwardDuration,
)
}

View File

@@ -10,6 +10,7 @@ import (
"github.com/ccfos/nightingale/v6/pkg/httpx"
"github.com/ccfos/nightingale/v6/pkg/logx"
"github.com/ccfos/nightingale/v6/pushgw/idents"
"github.com/ccfos/nightingale/v6/pushgw/pstats"
"github.com/ccfos/nightingale/v6/pushgw/router"
"github.com/ccfos/nightingale/v6/pushgw/writer"
)
@@ -35,6 +36,7 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
idents := idents.New(ctx)
stats := memsto.NewSyncStats()
pstats.RegisterMetrics()
busiGroupCache := memsto.NewBusiGroupCache(ctx, stats)
targetCache := memsto.NewTargetCache(ctx, stats, nil)

View File

@@ -1,15 +1,19 @@
package router
import (
"github.com/gin-gonic/gin"
"github.com/prometheus/prometheus/prompb"
"fmt"
"time"
"github.com/ccfos/nightingale/v6/memsto"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/pkg/httpx"
"github.com/ccfos/nightingale/v6/pushgw/idents"
"github.com/ccfos/nightingale/v6/pushgw/pconf"
"github.com/ccfos/nightingale/v6/pushgw/pstats"
"github.com/ccfos/nightingale/v6/pushgw/writer"
"github.com/gin-gonic/gin"
"github.com/prometheus/prometheus/prompb"
)
type EnrichLabelsFunc func(pt *prompb.TimeSeries)
@@ -43,8 +47,6 @@ func (rt *Router) Config(r *gin.Engine) {
return
}
registerMetrics()
// datadog url: http://n9e-pushgw.foo.com/datadog
// use apiKey not basic auth
r.POST("/datadog/api/v1/series", rt.datadogSeries)
@@ -56,17 +58,30 @@ func (rt *Router) Config(r *gin.Engine) {
if len(rt.HTTP.APIForAgent.BasicAuth) > 0 {
// enable basic auth
auth := gin.BasicAuth(rt.HTTP.APIForAgent.BasicAuth)
r.POST("/opentsdb/put", auth, rt.openTSDBPut)
r.POST("/openfalcon/push", auth, rt.falconPush)
r.POST("/prometheus/v1/write", auth, rt.remoteWrite)
r.POST("/opentsdb/put", auth, stat(), rt.openTSDBPut)
r.POST("/openfalcon/push", auth, stat(), rt.falconPush)
r.POST("/prometheus/v1/write", auth, stat(), rt.remoteWrite)
r.POST("/v1/n9e/target-update", auth, rt.targetUpdate)
r.POST("/v1/n9e/edge/heartbeat", auth, rt.heartbeat)
} else {
// no need basic auth
r.POST("/opentsdb/put", rt.openTSDBPut)
r.POST("/openfalcon/push", rt.falconPush)
r.POST("/prometheus/v1/write", rt.remoteWrite)
r.POST("/opentsdb/put", stat(), rt.openTSDBPut)
r.POST("/openfalcon/push", stat(), rt.falconPush)
r.POST("/prometheus/v1/write", stat(), rt.remoteWrite)
r.POST("/v1/n9e/target-update", rt.targetUpdate)
r.POST("/v1/n9e/edge/heartbeat", rt.heartbeat)
}
}
func stat() gin.HandlerFunc {
return func(c *gin.Context) {
start := time.Now()
c.Next()
code := fmt.Sprintf("%d", c.Writer.Status())
method := c.Request.Method
labels := []string{code, c.FullPath(), method}
pstats.RequestDuration.WithLabelValues(labels...).Observe(time.Since(start).Seconds())
}
}

View File

@@ -8,6 +8,8 @@ import (
"net/http"
"strings"
"github.com/ccfos/nightingale/v6/pushgw/pstats"
"github.com/gin-gonic/gin"
easyjson "github.com/mailru/easyjson"
"github.com/prometheus/common/model"
@@ -266,7 +268,7 @@ func (r *Router) datadogSeries(c *gin.Context) {
}
if succ > 0 {
CounterSampleTotal.WithLabelValues("datadog").Add(float64(succ))
pstats.CounterSampleTotal.WithLabelValues("datadog").Add(float64(succ))
r.IdentSet.MSet(ids)
}

View File

@@ -8,6 +8,8 @@ import (
"strings"
"time"
"github.com/ccfos/nightingale/v6/pushgw/pstats"
"github.com/gin-gonic/gin"
"github.com/mailru/easyjson"
"github.com/prometheus/common/model"
@@ -216,7 +218,7 @@ func (rt *Router) falconPush(c *gin.Context) {
}
if succ > 0 {
CounterSampleTotal.WithLabelValues("openfalcon").Add(float64(succ))
pstats.CounterSampleTotal.WithLabelValues("openfalcon").Add(float64(succ))
rt.IdentSet.MSet(ids)
}

View File

@@ -8,6 +8,8 @@ import (
"strings"
"time"
"github.com/ccfos/nightingale/v6/pushgw/pstats"
"github.com/gin-gonic/gin"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/prompb"
@@ -211,7 +213,7 @@ func (rt *Router) openTSDBPut(c *gin.Context) {
}
if succ > 0 {
CounterSampleTotal.WithLabelValues("opentsdb").Add(float64(succ))
pstats.CounterSampleTotal.WithLabelValues("opentsdb").Add(float64(succ))
rt.IdentSet.MSet(ids)
}

View File

@@ -5,6 +5,8 @@ import (
"io/ioutil"
"net/http"
"github.com/ccfos/nightingale/v6/pushgw/pstats"
"github.com/gin-gonic/gin"
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
@@ -111,7 +113,7 @@ func (rt *Router) remoteWrite(c *gin.Context) {
}
}
CounterSampleTotal.WithLabelValues("prometheus").Add(float64(count))
pstats.CounterSampleTotal.WithLabelValues("prometheus").Add(float64(count))
rt.IdentSet.MSet(ids)
}

View File

@@ -1,18 +0,0 @@
package router
import "github.com/prometheus/client_golang/prometheus"
var (
CounterSampleTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "n9e",
Subsystem: "pushgw",
Name: "samples_received_total",
Help: "Total number samples received.",
}, []string{"channel"})
)
func registerMetrics() {
prometheus.MustRegister(
CounterSampleTotal,
)
}

View File

@@ -10,6 +10,7 @@ import (
"time"
"github.com/ccfos/nightingale/v6/pushgw/pconf"
"github.com/ccfos/nightingale/v6/pushgw/pstats"
"github.com/golang/protobuf/proto"
"github.com/golang/snappy"
@@ -37,7 +38,7 @@ func (w WriterType) writeRelabel(items []*prompb.TimeSeries) []*prompb.TimeSerie
return ritems
}
func (w WriterType) Write(items []*prompb.TimeSeries, sema *semaphore.Semaphore, headers ...map[string]string) {
func (w WriterType) Write(url string, items []*prompb.TimeSeries, sema *semaphore.Semaphore, headers ...map[string]string) {
defer sema.Release()
if len(items) == 0 {
return
@@ -48,6 +49,11 @@ func (w WriterType) Write(items []*prompb.TimeSeries, sema *semaphore.Semaphore,
return
}
start := time.Now()
defer func() {
pstats.ForwardDuration.WithLabelValues(url).Observe(time.Since(start).Seconds())
}()
if w.ForceUseServerTS {
ts := time.Now().UnixMilli()
for i := 0; i < len(items); i++ {
@@ -205,13 +211,15 @@ func (ws *WritersType) StartConsumer(identQueue *IdentQueue) {
return
default:
series := identQueue.list.PopBack(ws.pushgw.WriterOpt.QueuePopSize)
if len(series) == 0 {
count := len(series)
if count == 0 {
time.Sleep(time.Millisecond * 400)
continue
}
for key := range ws.backends {
ws.sema.Acquire()
go ws.backends[key].Write(series, ws.sema)
pstats.CounterPushSampleTotal.WithLabelValues(key).Add(float64(count))
go ws.backends[key].Write(key, series, ws.sema)
}
}
}