Compare commits

..

1 Commits

Author SHA1 Message Date
ning
ff40fe3518 refactor stats 2023-07-20 16:36:40 +08:00
15 changed files with 119 additions and 118 deletions

View File

@@ -10,23 +10,11 @@ const (
)
type Stats struct {
CounterSampleTotal *prometheus.CounterVec
CounterAlertsTotal *prometheus.CounterVec
GaugeAlertQueueSize prometheus.Gauge
GaugeSampleQueueSize *prometheus.GaugeVec
RequestDuration *prometheus.HistogramVec
ForwardDuration *prometheus.HistogramVec
CounterAlertsTotal *prometheus.CounterVec
GaugeAlertQueueSize prometheus.Gauge
}
func NewSyncStats() *Stats {
// 从各个接收接口接收到的监控数据总量
CounterSampleTotal := prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "samples_received_total",
Help: "Total number samples received.",
}, []string{"cluster", "channel"})
// 产生的告警总量
CounterAlertsTotal := prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
@@ -43,51 +31,13 @@ func NewSyncStats() *Stats {
Help: "The size of alert queue.",
})
// 数据转发队列,各个队列的长度
GaugeSampleQueueSize := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "sample_queue_size",
Help: "The size of sample queue.",
}, []string{"cluster", "channel_number"})
// 一些重要的请求,比如接收数据的请求,应该统计一下延迟情况
RequestDuration := prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Buckets: []float64{.01, .1, 1},
Name: "http_request_duration_seconds",
Help: "HTTP request latencies in seconds.",
}, []string{"code", "path", "method"},
)
// 发往后端TSDB延迟如何
ForwardDuration := prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Buckets: []float64{.1, 1, 10},
Name: "forward_duration_seconds",
Help: "Forward samples to TSDB. latencies in seconds.",
}, []string{"cluster", "channel_number"},
)
prometheus.MustRegister(
CounterSampleTotal,
CounterAlertsTotal,
GaugeAlertQueueSize,
GaugeSampleQueueSize,
RequestDuration,
ForwardDuration,
)
return &Stats{
CounterSampleTotal: CounterSampleTotal,
CounterAlertsTotal: CounterAlertsTotal,
GaugeAlertQueueSize: GaugeAlertQueueSize,
GaugeSampleQueueSize: GaugeSampleQueueSize,
RequestDuration: RequestDuration,
ForwardDuration: ForwardDuration,
CounterAlertsTotal: CounterAlertsTotal,
GaugeAlertQueueSize: GaugeAlertQueueSize,
}
}

View File

@@ -106,7 +106,7 @@ func (rt *Router) configNoRoute(r *gin.Engine, fs *http.FileSystem) {
suffix := arr[len(arr)-1]
switch suffix {
case "png", "jpeg", "jpg", "svg", "ico", "gif", "css", "js", "html", "htm", "gz", "zip", "map", "ttf":
case "png", "jpeg", "jpg", "svg", "ico", "gif", "css", "js", "html", "htm", "gz", "zip", "map":
if !rt.Center.UseFileAssets {
c.FileFromFS(c.Request.URL.Path, *fs)
} else {
@@ -367,7 +367,7 @@ func (rt *Router) Config(r *gin.Engine) {
pages.PUT("/notify-config", rt.auth(), rt.admin(), rt.notifyConfigPut)
pages.GET("/es-index-pattern", rt.auth(), rt.esIndexPatternGet)
pages.GET("/es-index-pattern-list", rt.auth(), rt.esIndexPatternGetList)
pages.GET("/es-index-pattern-list", rt.auth(), rt.esIndexPatternGetAll)
pages.POST("/es-index-pattern", rt.auth(), rt.admin(), rt.esIndexPatternAdd)
pages.PUT("/es-index-pattern", rt.auth(), rt.admin(), rt.esIndexPatternPut)
pages.DELETE("/es-index-pattern", rt.auth(), rt.admin(), rt.esIndexPatternDel)

View File

@@ -57,17 +57,8 @@ func (rt *Router) esIndexPatternDel(c *gin.Context) {
}
// ES Index Pattern列表
func (rt *Router) esIndexPatternGetList(c *gin.Context) {
datasourceId := ginx.QueryInt64(c, "datasource_id", 0)
var lst []*models.EsIndexPattern
var err error
if datasourceId != 0 {
lst, err = models.EsIndexPatternGets(rt.Ctx, "datasource_id = ?", datasourceId)
} else {
lst, err = models.EsIndexPatternGets(rt.Ctx, "")
}
func (rt *Router) esIndexPatternGetAll(c *gin.Context) {
lst, err := models.EsIndexPatternGets(rt.Ctx, "")
ginx.NewRender(c).Data(lst, err)
}

View File

@@ -15,7 +15,6 @@ import (
"github.com/gin-gonic/gin"
"github.com/prometheus/common/model"
"github.com/toolkits/pkg/ginx"
"github.com/toolkits/pkg/logger"
)
type queryFormItem struct {
@@ -33,14 +32,10 @@ type batchQueryForm struct {
func (rt *Router) promBatchQueryRange(c *gin.Context) {
var f batchQueryForm
ginx.Dangerous(c.BindJSON(&f))
var lst []model.Value
cli := rt.PromClients.GetCli(f.DatasourceId)
if cli == nil {
logger.Warningf("no such datasource id: %d", f.DatasourceId)
ginx.NewRender(c).Data(lst, nil)
return
}
var lst []model.Value
for _, item := range f.Queries {
r := pkgprom.Range{
@@ -72,14 +67,9 @@ func (rt *Router) promBatchQueryInstant(c *gin.Context) {
var f batchInstantForm
ginx.Dangerous(c.BindJSON(&f))
var lst []model.Value
cli := rt.PromClients.GetCli(f.DatasourceId)
if cli == nil {
logger.Warningf("no such datasource id: %d", f.DatasourceId)
ginx.NewRender(c).Data(lst, nil)
return
}
var lst []model.Value
for _, item := range f.Queries {
resp, _, err := cli.Query(context.Background(), item.Query, time.Unix(item.Time, 0))

View File

@@ -6,7 +6,7 @@ ADD n9e /app/
ADD etc /app/
ADD integrations /app/integrations/
ADD --chmod=755 https://github.com/ufoscout/docker-compose-wait/releases/download/2.11.0/wait_x86_64 /wait
RUN chmod +x /wait && pip install requests
RUN chmod +x /wait
EXPOSE 17000

View File

@@ -632,5 +632,5 @@ CREATE TABLE `es_index_pattern` (
`update_at` bigint default '0',
`update_by` varchar(64) default '',
PRIMARY KEY (`id`),
UNIQUE KEY (`datasource_id`, `name`)
UNIQUE KEY (`name`, `datasource_id`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;

55
pushgw/pstats/stat.go Normal file
View File

@@ -0,0 +1,55 @@
package pstats
import "github.com/prometheus/client_golang/prometheus"
const (
namespace = "n9e"
subsystem = "server"
)
var (
CounterSampleTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "samples_received_total",
Help: "Total number samples received.",
}, []string{"channel"})
CounterPushSampleTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "samples_push_total",
Help: "Total number samples push to tsdb.",
}, []string{"url"})
// 一些重要的请求,比如接收数据的请求,应该统计一下延迟情况
RequestDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Buckets: []float64{.01, .1, 1},
Name: "http_request_duration_seconds",
Help: "HTTP request latencies in seconds.",
}, []string{"code", "path", "method"},
)
// 发往后端TSDB延迟如何
ForwardDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Buckets: []float64{.1, 1, 10},
Name: "forward_duration_seconds",
Help: "Forward samples to TSDB. latencies in seconds.",
}, []string{"url"},
)
)
func RegisterMetrics() {
prometheus.MustRegister(
CounterSampleTotal,
CounterPushSampleTotal,
RequestDuration,
ForwardDuration,
)
}

View File

@@ -10,6 +10,7 @@ import (
"github.com/ccfos/nightingale/v6/pkg/httpx"
"github.com/ccfos/nightingale/v6/pkg/logx"
"github.com/ccfos/nightingale/v6/pushgw/idents"
"github.com/ccfos/nightingale/v6/pushgw/pstats"
"github.com/ccfos/nightingale/v6/pushgw/router"
"github.com/ccfos/nightingale/v6/pushgw/writer"
)
@@ -35,6 +36,7 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
idents := idents.New(ctx)
stats := memsto.NewSyncStats()
pstats.RegisterMetrics()
busiGroupCache := memsto.NewBusiGroupCache(ctx, stats)
targetCache := memsto.NewTargetCache(ctx, stats, nil)

View File

@@ -1,15 +1,19 @@
package router
import (
"github.com/gin-gonic/gin"
"github.com/prometheus/prometheus/prompb"
"fmt"
"time"
"github.com/ccfos/nightingale/v6/memsto"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/pkg/httpx"
"github.com/ccfos/nightingale/v6/pushgw/idents"
"github.com/ccfos/nightingale/v6/pushgw/pconf"
"github.com/ccfos/nightingale/v6/pushgw/pstats"
"github.com/ccfos/nightingale/v6/pushgw/writer"
"github.com/gin-gonic/gin"
"github.com/prometheus/prometheus/prompb"
)
type EnrichLabelsFunc func(pt *prompb.TimeSeries)
@@ -43,8 +47,6 @@ func (rt *Router) Config(r *gin.Engine) {
return
}
registerMetrics()
// datadog url: http://n9e-pushgw.foo.com/datadog
// use apiKey not basic auth
r.POST("/datadog/api/v1/series", rt.datadogSeries)
@@ -56,17 +58,30 @@ func (rt *Router) Config(r *gin.Engine) {
if len(rt.HTTP.APIForAgent.BasicAuth) > 0 {
// enable basic auth
auth := gin.BasicAuth(rt.HTTP.APIForAgent.BasicAuth)
r.POST("/opentsdb/put", auth, rt.openTSDBPut)
r.POST("/openfalcon/push", auth, rt.falconPush)
r.POST("/prometheus/v1/write", auth, rt.remoteWrite)
r.POST("/opentsdb/put", auth, stat(), rt.openTSDBPut)
r.POST("/openfalcon/push", auth, stat(), rt.falconPush)
r.POST("/prometheus/v1/write", auth, stat(), rt.remoteWrite)
r.POST("/v1/n9e/target-update", auth, rt.targetUpdate)
r.POST("/v1/n9e/edge/heartbeat", auth, rt.heartbeat)
} else {
// no need basic auth
r.POST("/opentsdb/put", rt.openTSDBPut)
r.POST("/openfalcon/push", rt.falconPush)
r.POST("/prometheus/v1/write", rt.remoteWrite)
r.POST("/opentsdb/put", stat(), rt.openTSDBPut)
r.POST("/openfalcon/push", stat(), rt.falconPush)
r.POST("/prometheus/v1/write", stat(), rt.remoteWrite)
r.POST("/v1/n9e/target-update", rt.targetUpdate)
r.POST("/v1/n9e/edge/heartbeat", rt.heartbeat)
}
}
func stat() gin.HandlerFunc {
return func(c *gin.Context) {
start := time.Now()
c.Next()
code := fmt.Sprintf("%d", c.Writer.Status())
method := c.Request.Method
labels := []string{code, c.FullPath(), method}
pstats.RequestDuration.WithLabelValues(labels...).Observe(time.Since(start).Seconds())
}
}

View File

@@ -8,6 +8,8 @@ import (
"net/http"
"strings"
"github.com/ccfos/nightingale/v6/pushgw/pstats"
"github.com/gin-gonic/gin"
easyjson "github.com/mailru/easyjson"
"github.com/prometheus/common/model"
@@ -266,7 +268,7 @@ func (r *Router) datadogSeries(c *gin.Context) {
}
if succ > 0 {
CounterSampleTotal.WithLabelValues("datadog").Add(float64(succ))
pstats.CounterSampleTotal.WithLabelValues("datadog").Add(float64(succ))
r.IdentSet.MSet(ids)
}

View File

@@ -8,6 +8,8 @@ import (
"strings"
"time"
"github.com/ccfos/nightingale/v6/pushgw/pstats"
"github.com/gin-gonic/gin"
"github.com/mailru/easyjson"
"github.com/prometheus/common/model"
@@ -216,7 +218,7 @@ func (rt *Router) falconPush(c *gin.Context) {
}
if succ > 0 {
CounterSampleTotal.WithLabelValues("openfalcon").Add(float64(succ))
pstats.CounterSampleTotal.WithLabelValues("openfalcon").Add(float64(succ))
rt.IdentSet.MSet(ids)
}

View File

@@ -8,6 +8,8 @@ import (
"strings"
"time"
"github.com/ccfos/nightingale/v6/pushgw/pstats"
"github.com/gin-gonic/gin"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/prompb"
@@ -211,7 +213,7 @@ func (rt *Router) openTSDBPut(c *gin.Context) {
}
if succ > 0 {
CounterSampleTotal.WithLabelValues("opentsdb").Add(float64(succ))
pstats.CounterSampleTotal.WithLabelValues("opentsdb").Add(float64(succ))
rt.IdentSet.MSet(ids)
}

View File

@@ -5,6 +5,8 @@ import (
"io/ioutil"
"net/http"
"github.com/ccfos/nightingale/v6/pushgw/pstats"
"github.com/gin-gonic/gin"
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
@@ -111,7 +113,7 @@ func (rt *Router) remoteWrite(c *gin.Context) {
}
}
CounterSampleTotal.WithLabelValues("prometheus").Add(float64(count))
pstats.CounterSampleTotal.WithLabelValues("prometheus").Add(float64(count))
rt.IdentSet.MSet(ids)
}

View File

@@ -1,18 +0,0 @@
package router
import "github.com/prometheus/client_golang/prometheus"
var (
CounterSampleTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "n9e",
Subsystem: "pushgw",
Name: "samples_received_total",
Help: "Total number samples received.",
}, []string{"channel"})
)
func registerMetrics() {
prometheus.MustRegister(
CounterSampleTotal,
)
}

View File

@@ -10,6 +10,7 @@ import (
"time"
"github.com/ccfos/nightingale/v6/pushgw/pconf"
"github.com/ccfos/nightingale/v6/pushgw/pstats"
"github.com/golang/protobuf/proto"
"github.com/golang/snappy"
@@ -37,7 +38,7 @@ func (w WriterType) writeRelabel(items []*prompb.TimeSeries) []*prompb.TimeSerie
return ritems
}
func (w WriterType) Write(items []*prompb.TimeSeries, sema *semaphore.Semaphore, headers ...map[string]string) {
func (w WriterType) Write(url string, items []*prompb.TimeSeries, sema *semaphore.Semaphore, headers ...map[string]string) {
defer sema.Release()
if len(items) == 0 {
return
@@ -48,6 +49,11 @@ func (w WriterType) Write(items []*prompb.TimeSeries, sema *semaphore.Semaphore,
return
}
start := time.Now()
defer func() {
pstats.ForwardDuration.WithLabelValues(url).Observe(time.Since(start).Seconds())
}()
if w.ForceUseServerTS {
ts := time.Now().UnixMilli()
for i := 0; i < len(items); i++ {
@@ -205,13 +211,15 @@ func (ws *WritersType) StartConsumer(identQueue *IdentQueue) {
return
default:
series := identQueue.list.PopBack(ws.pushgw.WriterOpt.QueuePopSize)
if len(series) == 0 {
count := len(series)
if count == 0 {
time.Sleep(time.Millisecond * 400)
continue
}
for key := range ws.backends {
ws.sema.Acquire()
go ws.backends[key].Write(series, ws.sema)
pstats.CounterPushSampleTotal.WithLabelValues(key).Add(float64(count))
go ws.backends[key].Write(key, series, ws.sema)
}
}
}