mirror of
https://github.com/ccfos/nightingale.git
synced 2026-03-03 06:29:16 +00:00
Compare commits
1 Commits
fix-exec-s
...
refactor-s
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ff40fe3518 |
@@ -10,23 +10,11 @@ const (
|
||||
)
|
||||
|
||||
type Stats struct {
|
||||
CounterSampleTotal *prometheus.CounterVec
|
||||
CounterAlertsTotal *prometheus.CounterVec
|
||||
GaugeAlertQueueSize prometheus.Gauge
|
||||
GaugeSampleQueueSize *prometheus.GaugeVec
|
||||
RequestDuration *prometheus.HistogramVec
|
||||
ForwardDuration *prometheus.HistogramVec
|
||||
CounterAlertsTotal *prometheus.CounterVec
|
||||
GaugeAlertQueueSize prometheus.Gauge
|
||||
}
|
||||
|
||||
func NewSyncStats() *Stats {
|
||||
// 从各个接收接口接收到的监控数据总量
|
||||
CounterSampleTotal := prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
Name: "samples_received_total",
|
||||
Help: "Total number samples received.",
|
||||
}, []string{"cluster", "channel"})
|
||||
|
||||
// 产生的告警总量
|
||||
CounterAlertsTotal := prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
@@ -43,51 +31,13 @@ func NewSyncStats() *Stats {
|
||||
Help: "The size of alert queue.",
|
||||
})
|
||||
|
||||
// 数据转发队列,各个队列的长度
|
||||
GaugeSampleQueueSize := prometheus.NewGaugeVec(prometheus.GaugeOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
Name: "sample_queue_size",
|
||||
Help: "The size of sample queue.",
|
||||
}, []string{"cluster", "channel_number"})
|
||||
|
||||
// 一些重要的请求,比如接收数据的请求,应该统计一下延迟情况
|
||||
RequestDuration := prometheus.NewHistogramVec(
|
||||
prometheus.HistogramOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
Buckets: []float64{.01, .1, 1},
|
||||
Name: "http_request_duration_seconds",
|
||||
Help: "HTTP request latencies in seconds.",
|
||||
}, []string{"code", "path", "method"},
|
||||
)
|
||||
|
||||
// 发往后端TSDB,延迟如何
|
||||
ForwardDuration := prometheus.NewHistogramVec(
|
||||
prometheus.HistogramOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
Buckets: []float64{.1, 1, 10},
|
||||
Name: "forward_duration_seconds",
|
||||
Help: "Forward samples to TSDB. latencies in seconds.",
|
||||
}, []string{"cluster", "channel_number"},
|
||||
)
|
||||
|
||||
prometheus.MustRegister(
|
||||
CounterSampleTotal,
|
||||
CounterAlertsTotal,
|
||||
GaugeAlertQueueSize,
|
||||
GaugeSampleQueueSize,
|
||||
RequestDuration,
|
||||
ForwardDuration,
|
||||
)
|
||||
|
||||
return &Stats{
|
||||
CounterSampleTotal: CounterSampleTotal,
|
||||
CounterAlertsTotal: CounterAlertsTotal,
|
||||
GaugeAlertQueueSize: GaugeAlertQueueSize,
|
||||
GaugeSampleQueueSize: GaugeSampleQueueSize,
|
||||
RequestDuration: RequestDuration,
|
||||
ForwardDuration: ForwardDuration,
|
||||
CounterAlertsTotal: CounterAlertsTotal,
|
||||
GaugeAlertQueueSize: GaugeAlertQueueSize,
|
||||
}
|
||||
}
|
||||
|
||||
55
pushgw/pstats/stat.go
Normal file
55
pushgw/pstats/stat.go
Normal file
@@ -0,0 +1,55 @@
|
||||
package pstats
|
||||
|
||||
import "github.com/prometheus/client_golang/prometheus"
|
||||
|
||||
const (
|
||||
namespace = "n9e"
|
||||
subsystem = "server"
|
||||
)
|
||||
|
||||
var (
|
||||
CounterSampleTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
Name: "samples_received_total",
|
||||
Help: "Total number samples received.",
|
||||
}, []string{"channel"})
|
||||
|
||||
CounterPushSampleTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
Name: "samples_push_total",
|
||||
Help: "Total number samples push to tsdb.",
|
||||
}, []string{"url"})
|
||||
|
||||
// 一些重要的请求,比如接收数据的请求,应该统计一下延迟情况
|
||||
RequestDuration = prometheus.NewHistogramVec(
|
||||
prometheus.HistogramOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
Buckets: []float64{.01, .1, 1},
|
||||
Name: "http_request_duration_seconds",
|
||||
Help: "HTTP request latencies in seconds.",
|
||||
}, []string{"code", "path", "method"},
|
||||
)
|
||||
|
||||
// 发往后端TSDB,延迟如何
|
||||
ForwardDuration = prometheus.NewHistogramVec(
|
||||
prometheus.HistogramOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
Buckets: []float64{.1, 1, 10},
|
||||
Name: "forward_duration_seconds",
|
||||
Help: "Forward samples to TSDB. latencies in seconds.",
|
||||
}, []string{"url"},
|
||||
)
|
||||
)
|
||||
|
||||
func RegisterMetrics() {
|
||||
prometheus.MustRegister(
|
||||
CounterSampleTotal,
|
||||
CounterPushSampleTotal,
|
||||
RequestDuration,
|
||||
ForwardDuration,
|
||||
)
|
||||
}
|
||||
@@ -10,6 +10,7 @@ import (
|
||||
"github.com/ccfos/nightingale/v6/pkg/httpx"
|
||||
"github.com/ccfos/nightingale/v6/pkg/logx"
|
||||
"github.com/ccfos/nightingale/v6/pushgw/idents"
|
||||
"github.com/ccfos/nightingale/v6/pushgw/pstats"
|
||||
"github.com/ccfos/nightingale/v6/pushgw/router"
|
||||
"github.com/ccfos/nightingale/v6/pushgw/writer"
|
||||
)
|
||||
@@ -35,6 +36,7 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
|
||||
idents := idents.New(ctx)
|
||||
|
||||
stats := memsto.NewSyncStats()
|
||||
pstats.RegisterMetrics()
|
||||
|
||||
busiGroupCache := memsto.NewBusiGroupCache(ctx, stats)
|
||||
targetCache := memsto.NewTargetCache(ctx, stats, nil)
|
||||
|
||||
@@ -1,15 +1,19 @@
|
||||
package router
|
||||
|
||||
import (
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/prometheus/prometheus/prompb"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/memsto"
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
"github.com/ccfos/nightingale/v6/pkg/httpx"
|
||||
"github.com/ccfos/nightingale/v6/pushgw/idents"
|
||||
"github.com/ccfos/nightingale/v6/pushgw/pconf"
|
||||
"github.com/ccfos/nightingale/v6/pushgw/pstats"
|
||||
"github.com/ccfos/nightingale/v6/pushgw/writer"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/prometheus/prometheus/prompb"
|
||||
)
|
||||
|
||||
type EnrichLabelsFunc func(pt *prompb.TimeSeries)
|
||||
@@ -43,8 +47,6 @@ func (rt *Router) Config(r *gin.Engine) {
|
||||
return
|
||||
}
|
||||
|
||||
registerMetrics()
|
||||
|
||||
// datadog url: http://n9e-pushgw.foo.com/datadog
|
||||
// use apiKey not basic auth
|
||||
r.POST("/datadog/api/v1/series", rt.datadogSeries)
|
||||
@@ -56,17 +58,30 @@ func (rt *Router) Config(r *gin.Engine) {
|
||||
if len(rt.HTTP.APIForAgent.BasicAuth) > 0 {
|
||||
// enable basic auth
|
||||
auth := gin.BasicAuth(rt.HTTP.APIForAgent.BasicAuth)
|
||||
r.POST("/opentsdb/put", auth, rt.openTSDBPut)
|
||||
r.POST("/openfalcon/push", auth, rt.falconPush)
|
||||
r.POST("/prometheus/v1/write", auth, rt.remoteWrite)
|
||||
r.POST("/opentsdb/put", auth, stat(), rt.openTSDBPut)
|
||||
r.POST("/openfalcon/push", auth, stat(), rt.falconPush)
|
||||
r.POST("/prometheus/v1/write", auth, stat(), rt.remoteWrite)
|
||||
r.POST("/v1/n9e/target-update", auth, rt.targetUpdate)
|
||||
r.POST("/v1/n9e/edge/heartbeat", auth, rt.heartbeat)
|
||||
} else {
|
||||
// no need basic auth
|
||||
r.POST("/opentsdb/put", rt.openTSDBPut)
|
||||
r.POST("/openfalcon/push", rt.falconPush)
|
||||
r.POST("/prometheus/v1/write", rt.remoteWrite)
|
||||
r.POST("/opentsdb/put", stat(), rt.openTSDBPut)
|
||||
r.POST("/openfalcon/push", stat(), rt.falconPush)
|
||||
r.POST("/prometheus/v1/write", stat(), rt.remoteWrite)
|
||||
r.POST("/v1/n9e/target-update", rt.targetUpdate)
|
||||
r.POST("/v1/n9e/edge/heartbeat", rt.heartbeat)
|
||||
}
|
||||
}
|
||||
|
||||
func stat() gin.HandlerFunc {
|
||||
return func(c *gin.Context) {
|
||||
start := time.Now()
|
||||
c.Next()
|
||||
|
||||
code := fmt.Sprintf("%d", c.Writer.Status())
|
||||
method := c.Request.Method
|
||||
labels := []string{code, c.FullPath(), method}
|
||||
|
||||
pstats.RequestDuration.WithLabelValues(labels...).Observe(time.Since(start).Seconds())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,6 +8,8 @@ import (
|
||||
"net/http"
|
||||
"strings"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/pushgw/pstats"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
easyjson "github.com/mailru/easyjson"
|
||||
"github.com/prometheus/common/model"
|
||||
@@ -266,7 +268,7 @@ func (r *Router) datadogSeries(c *gin.Context) {
|
||||
}
|
||||
|
||||
if succ > 0 {
|
||||
CounterSampleTotal.WithLabelValues("datadog").Add(float64(succ))
|
||||
pstats.CounterSampleTotal.WithLabelValues("datadog").Add(float64(succ))
|
||||
r.IdentSet.MSet(ids)
|
||||
}
|
||||
|
||||
|
||||
@@ -8,6 +8,8 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/pushgw/pstats"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/mailru/easyjson"
|
||||
"github.com/prometheus/common/model"
|
||||
@@ -216,7 +218,7 @@ func (rt *Router) falconPush(c *gin.Context) {
|
||||
}
|
||||
|
||||
if succ > 0 {
|
||||
CounterSampleTotal.WithLabelValues("openfalcon").Add(float64(succ))
|
||||
pstats.CounterSampleTotal.WithLabelValues("openfalcon").Add(float64(succ))
|
||||
rt.IdentSet.MSet(ids)
|
||||
}
|
||||
|
||||
|
||||
@@ -8,6 +8,8 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/pushgw/pstats"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/prometheus/common/model"
|
||||
"github.com/prometheus/prometheus/prompb"
|
||||
@@ -211,7 +213,7 @@ func (rt *Router) openTSDBPut(c *gin.Context) {
|
||||
}
|
||||
|
||||
if succ > 0 {
|
||||
CounterSampleTotal.WithLabelValues("opentsdb").Add(float64(succ))
|
||||
pstats.CounterSampleTotal.WithLabelValues("opentsdb").Add(float64(succ))
|
||||
rt.IdentSet.MSet(ids)
|
||||
}
|
||||
|
||||
|
||||
@@ -5,6 +5,8 @@ import (
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/pushgw/pstats"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"github.com/golang/snappy"
|
||||
@@ -111,7 +113,7 @@ func (rt *Router) remoteWrite(c *gin.Context) {
|
||||
}
|
||||
}
|
||||
|
||||
CounterSampleTotal.WithLabelValues("prometheus").Add(float64(count))
|
||||
pstats.CounterSampleTotal.WithLabelValues("prometheus").Add(float64(count))
|
||||
rt.IdentSet.MSet(ids)
|
||||
}
|
||||
|
||||
|
||||
@@ -1,18 +0,0 @@
|
||||
package router
|
||||
|
||||
import "github.com/prometheus/client_golang/prometheus"
|
||||
|
||||
var (
|
||||
CounterSampleTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||
Namespace: "n9e",
|
||||
Subsystem: "pushgw",
|
||||
Name: "samples_received_total",
|
||||
Help: "Total number samples received.",
|
||||
}, []string{"channel"})
|
||||
)
|
||||
|
||||
func registerMetrics() {
|
||||
prometheus.MustRegister(
|
||||
CounterSampleTotal,
|
||||
)
|
||||
}
|
||||
@@ -10,6 +10,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/pushgw/pconf"
|
||||
"github.com/ccfos/nightingale/v6/pushgw/pstats"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/golang/snappy"
|
||||
@@ -37,7 +38,7 @@ func (w WriterType) writeRelabel(items []*prompb.TimeSeries) []*prompb.TimeSerie
|
||||
return ritems
|
||||
}
|
||||
|
||||
func (w WriterType) Write(items []*prompb.TimeSeries, sema *semaphore.Semaphore, headers ...map[string]string) {
|
||||
func (w WriterType) Write(url string, items []*prompb.TimeSeries, sema *semaphore.Semaphore, headers ...map[string]string) {
|
||||
defer sema.Release()
|
||||
if len(items) == 0 {
|
||||
return
|
||||
@@ -48,6 +49,11 @@ func (w WriterType) Write(items []*prompb.TimeSeries, sema *semaphore.Semaphore,
|
||||
return
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
pstats.ForwardDuration.WithLabelValues(url).Observe(time.Since(start).Seconds())
|
||||
}()
|
||||
|
||||
if w.ForceUseServerTS {
|
||||
ts := time.Now().UnixMilli()
|
||||
for i := 0; i < len(items); i++ {
|
||||
@@ -205,13 +211,15 @@ func (ws *WritersType) StartConsumer(identQueue *IdentQueue) {
|
||||
return
|
||||
default:
|
||||
series := identQueue.list.PopBack(ws.pushgw.WriterOpt.QueuePopSize)
|
||||
if len(series) == 0 {
|
||||
count := len(series)
|
||||
if count == 0 {
|
||||
time.Sleep(time.Millisecond * 400)
|
||||
continue
|
||||
}
|
||||
for key := range ws.backends {
|
||||
ws.sema.Acquire()
|
||||
go ws.backends[key].Write(series, ws.sema)
|
||||
pstats.CounterPushSampleTotal.WithLabelValues(key).Add(float64(count))
|
||||
go ws.backends[key].Write(key, series, ws.sema)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user