mirror of
https://github.com/ccfos/nightingale.git
synced 2026-03-02 22:19:10 +00:00
Compare commits
7 Commits
es-sql-ale
...
config-acc
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
60545cf4c6 | ||
|
|
08c762ecae | ||
|
|
bde4f2f715 | ||
|
|
8922b7e94a | ||
|
|
fc0b374cc3 | ||
|
|
311185e799 | ||
|
|
74f3836733 |
@@ -62,6 +62,7 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
|
||||
userCache := memsto.NewUserCache(ctx, syncStats)
|
||||
userGroupCache := memsto.NewUserGroupCache(ctx, syncStats)
|
||||
taskTplsCache := memsto.NewTaskTplCache(ctx)
|
||||
configCvalCache := memsto.NewCvalCache(ctx, syncStats)
|
||||
|
||||
promClients := prom.NewPromClient(ctx)
|
||||
tdengineClients := tdengine.NewTdengineClient(ctx, config.Alert.Heartbeat)
|
||||
@@ -70,7 +71,8 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
|
||||
|
||||
Start(config.Alert, config.Pushgw, syncStats, alertStats, externalProcessors, targetCache, busiGroupCache, alertMuteCache, alertRuleCache, notifyConfigCache, taskTplsCache, dsCache, ctx, promClients, tdengineClients, userCache, userGroupCache)
|
||||
|
||||
r := httpx.GinEngine(config.Global.RunMode, config.HTTP)
|
||||
r := httpx.GinEngine(config.Global.RunMode, config.HTTP,
|
||||
configCvalCache.PrintBodyPaths, configCvalCache.PrintAccessLog)
|
||||
rt := router.New(config.HTTP, config.Alert, alertMuteCache, targetCache, busiGroupCache, alertStats, ctx, externalProcessors)
|
||||
|
||||
if config.Ibex.Enable {
|
||||
|
||||
@@ -95,6 +95,7 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
|
||||
userCache := memsto.NewUserCache(ctx, syncStats)
|
||||
userGroupCache := memsto.NewUserGroupCache(ctx, syncStats)
|
||||
taskTplCache := memsto.NewTaskTplCache(ctx)
|
||||
configCvalCache := memsto.NewCvalCache(ctx, syncStats)
|
||||
|
||||
sso := sso.Init(config.Center, ctx, configCache)
|
||||
promClients := prom.NewPromClient(ctx)
|
||||
@@ -117,7 +118,7 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
|
||||
|
||||
go models.MigrateBg(ctx, pushgwRouter.Pushgw.BusiGroupLabelKey)
|
||||
|
||||
r := httpx.GinEngine(config.Global.RunMode, config.HTTP)
|
||||
r := httpx.GinEngine(config.Global.RunMode, config.HTTP, configCvalCache.PrintBodyPaths, configCvalCache.PrintAccessLog)
|
||||
|
||||
centerRouter.Config(r)
|
||||
alertrtRouter.Config(r)
|
||||
|
||||
@@ -45,6 +45,10 @@ func (rt *Router) statistic(c *gin.Context) {
|
||||
statistics, err = models.ConfigsUserVariableStatistics(rt.Ctx)
|
||||
ginx.NewRender(c).Data(statistics, err)
|
||||
return
|
||||
case "cval":
|
||||
statistics, err = models.ConfigCvalStatistics(rt.Ctx)
|
||||
ginx.NewRender(c).Data(statistics, err)
|
||||
return
|
||||
default:
|
||||
ginx.Bomb(http.StatusBadRequest, "invalid name")
|
||||
}
|
||||
|
||||
@@ -52,11 +52,13 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
|
||||
|
||||
targetCache := memsto.NewTargetCache(ctx, syncStats, redis)
|
||||
busiGroupCache := memsto.NewBusiGroupCache(ctx, syncStats)
|
||||
configCvalCache := memsto.NewCvalCache(ctx, syncStats)
|
||||
idents := idents.New(ctx, redis)
|
||||
metas := metas.New(redis)
|
||||
writers := writer.NewWriters(config.Pushgw)
|
||||
pushgwRouter := pushgwrt.New(config.HTTP, config.Pushgw, config.Alert, targetCache, busiGroupCache, idents, metas, writers, ctx)
|
||||
r := httpx.GinEngine(config.Global.RunMode, config.HTTP)
|
||||
r := httpx.GinEngine(config.Global.RunMode, config.HTTP, configCvalCache.PrintBodyPaths, configCvalCache.PrintAccessLog)
|
||||
|
||||
pushgwRouter.Config(r)
|
||||
|
||||
if !config.Alert.Disable {
|
||||
|
||||
150
memsto/config_cval_cache.go
Normal file
150
memsto/config_cval_cache.go
Normal file
@@ -0,0 +1,150 @@
|
||||
package memsto
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"log"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/dumper"
|
||||
"github.com/ccfos/nightingale/v6/models"
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/toolkits/pkg/logger"
|
||||
)
|
||||
|
||||
type CvalCache struct {
|
||||
statTotal int64
|
||||
statLastUpdated int64
|
||||
ctx *ctx.Context
|
||||
stats *Stats
|
||||
|
||||
mu sync.RWMutex
|
||||
cvals map[string]string
|
||||
}
|
||||
|
||||
func NewCvalCache(ctx *ctx.Context, stats *Stats) *CvalCache {
|
||||
cvalCache := &CvalCache{
|
||||
statTotal: -1,
|
||||
statLastUpdated: -1,
|
||||
ctx: ctx,
|
||||
stats: stats,
|
||||
cvals: make(map[string]string),
|
||||
}
|
||||
cvalCache.initSyncConfigs()
|
||||
return cvalCache
|
||||
}
|
||||
|
||||
func (c *CvalCache) initSyncConfigs() {
|
||||
err := c.syncConfigs()
|
||||
if err != nil {
|
||||
log.Fatalln("failed to sync configs:", err)
|
||||
}
|
||||
|
||||
go c.loopSyncConfigs()
|
||||
}
|
||||
|
||||
func (c *CvalCache) loopSyncConfigs() {
|
||||
duration := time.Duration(9000) * time.Millisecond
|
||||
for {
|
||||
time.Sleep(duration)
|
||||
if err := c.syncConfigs(); err != nil {
|
||||
logger.Warning("failed to sync configs:", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *CvalCache) syncConfigs() error {
|
||||
start := time.Now()
|
||||
|
||||
stat, err := models.ConfigCvalStatistics(c.ctx)
|
||||
if err != nil {
|
||||
dumper.PutSyncRecord("cvals", start.Unix(), -1, -1, "failed to query statistics: "+err.Error())
|
||||
return errors.WithMessage(err, "failed to call ConfigCvalStatistics")
|
||||
}
|
||||
|
||||
if !c.statChanged(stat.Total, stat.LastUpdated) {
|
||||
c.stats.GaugeCronDuration.WithLabelValues("sync_cvals").Set(0)
|
||||
c.stats.GaugeSyncNumber.WithLabelValues("sync_cvals").Set(0)
|
||||
dumper.PutSyncRecord("cvals", start.Unix(), -1, -1, "not changed")
|
||||
return nil
|
||||
}
|
||||
|
||||
cvals, err := models.ConfigsGetAll(c.ctx)
|
||||
if err != nil {
|
||||
dumper.PutSyncRecord("cvals", start.Unix(), -1, -1, "failed to query records: "+err.Error())
|
||||
return errors.WithMessage(err, "failed to call ConfigsGet")
|
||||
}
|
||||
|
||||
c.Set(cvals, stat.Total, stat.LastUpdated)
|
||||
|
||||
ms := time.Since(start).Milliseconds()
|
||||
c.stats.GaugeCronDuration.WithLabelValues("sync_cvals").Set(float64(ms))
|
||||
c.stats.GaugeSyncNumber.WithLabelValues("sync_cvals").Set(float64(len(c.cvals)))
|
||||
|
||||
logger.Infof("timer: sync cvals done, cost: %dms", ms)
|
||||
dumper.PutSyncRecord("cvals", start.Unix(), ms, len(c.cvals), "success")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *CvalCache) statChanged(total int64, updated int64) bool {
|
||||
if c.statTotal == total && c.statLastUpdated == updated {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (c *CvalCache) Set(cvals []*models.Configs, total int64, updated int64) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
c.statTotal = total
|
||||
c.statLastUpdated = updated
|
||||
for _, cfg := range cvals {
|
||||
c.cvals[cfg.Ckey] = cfg.Cval
|
||||
}
|
||||
}
|
||||
|
||||
func (c *CvalCache) Get(ckey string) string {
|
||||
c.mu.RLock()
|
||||
defer c.mu.RUnlock()
|
||||
return c.cvals[ckey]
|
||||
}
|
||||
|
||||
func (c *CvalCache) GetLastUpdateTime() int64 {
|
||||
c.mu.RLock()
|
||||
defer c.mu.RUnlock()
|
||||
return c.statLastUpdated
|
||||
}
|
||||
|
||||
type SiteInfo struct {
|
||||
PrintBodyPaths []string `json:"print_body_paths"`
|
||||
PrintAccessLog bool `json:"print_access_log"`
|
||||
}
|
||||
|
||||
func (c *CvalCache) GetSiteInfo() *SiteInfo {
|
||||
c.mu.RLock()
|
||||
defer c.mu.RUnlock()
|
||||
|
||||
si := SiteInfo{}
|
||||
if siteInfoStr := c.Get("site_info"); siteInfoStr != "" {
|
||||
if err := json.Unmarshal([]byte(siteInfoStr), &si); err != nil {
|
||||
logger.Errorf("Failed to unmarshal site info: %v", err)
|
||||
}
|
||||
}
|
||||
return &si
|
||||
}
|
||||
|
||||
func (c *CvalCache) PrintBodyPaths() map[string]struct{} {
|
||||
printBodyPaths := c.GetSiteInfo().PrintBodyPaths
|
||||
pbp := make(map[string]struct{}, len(printBodyPaths))
|
||||
for _, p := range printBodyPaths {
|
||||
pbp[p] = struct{}{}
|
||||
}
|
||||
return pbp
|
||||
}
|
||||
|
||||
func (c *CvalCache) PrintAccessLog() bool {
|
||||
return c.GetSiteInfo().PrintAccessLog
|
||||
}
|
||||
@@ -177,8 +177,8 @@ type Trigger struct {
|
||||
Duration int `json:"duration,omitempty"`
|
||||
Percent int `json:"percent,omitempty"`
|
||||
Joins []Join `json:"joins"`
|
||||
RecoverConfig RecoverConfig `json:"recover_config"`
|
||||
JoinRef string `json:"join_ref"`
|
||||
RecoverConfig RecoverConfig `json:"recover_config"`
|
||||
}
|
||||
|
||||
type Join struct {
|
||||
|
||||
@@ -106,10 +106,8 @@ func InitRSAPassWord(ctx *ctx.Context) (string, error) {
|
||||
|
||||
func ConfigsGet(ctx *ctx.Context, ckey string) (string, error) { //select built-in type configs
|
||||
if !ctx.IsCenter {
|
||||
if !ctx.IsCenter {
|
||||
s, err := poster.GetByUrls[string](ctx, "/v1/n9e/config?key="+ckey)
|
||||
return s, err
|
||||
}
|
||||
s, err := poster.GetByUrls[string](ctx, "/v1/n9e/config?key="+ckey)
|
||||
return s, err
|
||||
}
|
||||
|
||||
var lst []string
|
||||
@@ -125,6 +123,17 @@ func ConfigsGet(ctx *ctx.Context, ckey string) (string, error) { //select built-
|
||||
return "", nil
|
||||
}
|
||||
|
||||
func ConfigsGetAll(ctx *ctx.Context) ([]*Configs, error) { // select built-in type configs
|
||||
var lst []*Configs
|
||||
err := DB(ctx).Model(&Configs{}).Select("ckey, cval").
|
||||
Where("ckey!='' and external=? ", 0).Find(&lst).Error
|
||||
if err != nil {
|
||||
return nil, errors.WithMessage(err, "failed to query configs")
|
||||
}
|
||||
|
||||
return lst, nil
|
||||
}
|
||||
|
||||
func ConfigsSet(ctx *ctx.Context, ckey, cval string) error {
|
||||
return ConfigsSetWithUname(ctx, ckey, cval, "default")
|
||||
}
|
||||
@@ -355,3 +364,19 @@ func ConfigUserVariableGetDecryptMap(context *ctx.Context, privateKey []byte, pa
|
||||
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
func ConfigCvalStatistics(context *ctx.Context) (*Statistics, error) {
|
||||
if !context.IsCenter {
|
||||
return poster.GetByUrls[*Statistics](context, "/v1/n9e/statistic?name=cval")
|
||||
}
|
||||
|
||||
session := DB(context).Model(&Configs{}).Select("count(*) as total",
|
||||
"max(update_at) as last_updated").Where("ckey!='' and external=? ", 0) // built-in config
|
||||
|
||||
var stats []*Statistics
|
||||
err := session.Find(&stats).Error
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return stats[0], nil
|
||||
}
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"io"
|
||||
"net/http"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
@@ -41,14 +42,21 @@ type LoggerConfig struct {
|
||||
|
||||
// Output is a writer where logs are written.
|
||||
// Optional. Default value is gin.DefaultWriter.
|
||||
Output io.Writer
|
||||
PrintBody bool
|
||||
Output io.Writer
|
||||
PrintAccessLog func() bool
|
||||
PrintBodyPaths func() map[string]struct{}
|
||||
|
||||
// SkipPaths is a url path array which logs are not written.
|
||||
// Optional.
|
||||
SkipPaths []string
|
||||
}
|
||||
|
||||
func (c *LoggerConfig) ContainsPath(path string) bool {
|
||||
path = strings.Split(path, "?")[0]
|
||||
_, exist := c.PrintBodyPaths()[path]
|
||||
return exist
|
||||
}
|
||||
|
||||
// LogFormatter gives the signature of the formatter function passed to LoggerWithFormatter
|
||||
type LogFormatter func(params LogFormatterParams) string
|
||||
|
||||
@@ -255,6 +263,11 @@ func LoggerWithConfig(conf LoggerConfig) gin.HandlerFunc {
|
||||
}
|
||||
|
||||
return func(c *gin.Context) {
|
||||
if !conf.PrintAccessLog() {
|
||||
c.Next()
|
||||
return
|
||||
}
|
||||
|
||||
// Start timer
|
||||
start := time.Now()
|
||||
path := c.Request.URL.Path
|
||||
@@ -271,7 +284,7 @@ func LoggerWithConfig(conf LoggerConfig) gin.HandlerFunc {
|
||||
}
|
||||
c.Writer = bodyWriter
|
||||
|
||||
if conf.PrintBody {
|
||||
if conf.ContainsPath(c.Request.RequestURI) {
|
||||
buf, _ := io.ReadAll(c.Request.Body)
|
||||
rdr1 = io.NopCloser(bytes.NewBuffer(buf))
|
||||
rdr2 = io.NopCloser(bytes.NewBuffer(buf))
|
||||
@@ -309,7 +322,7 @@ func LoggerWithConfig(conf LoggerConfig) gin.HandlerFunc {
|
||||
|
||||
// fmt.Fprint(out, formatter(param))
|
||||
logger.Info(formatter(param))
|
||||
if conf.PrintBody {
|
||||
if conf.ContainsPath(c.Request.RequestURI) {
|
||||
respBody := readBody(bytes.NewReader(bodyWriter.body.Bytes()), c.Writer.Header().Get("Content-Encoding"))
|
||||
reqBody := readBody(rdr1, c.Request.Header.Get("Content-Encoding"))
|
||||
logger.Debugf("path:%s req body:%s resp:%s", path, reqBody, respBody)
|
||||
|
||||
@@ -70,10 +70,12 @@ type JWTAuth struct {
|
||||
RedisKeyPrefix string
|
||||
}
|
||||
|
||||
func GinEngine(mode string, cfg Config) *gin.Engine {
|
||||
func GinEngine(mode string, cfg Config, printBodyPaths func() map[string]struct{},
|
||||
printAccessLog func() bool) *gin.Engine {
|
||||
gin.SetMode(mode)
|
||||
|
||||
loggerMid := aop.Logger(aop.LoggerConfig{PrintBody: cfg.PrintBody})
|
||||
loggerMid := aop.Logger(aop.LoggerConfig{PrintAccessLog: printAccessLog,
|
||||
PrintBodyPaths: printBodyPaths})
|
||||
recoveryMid := aop.Recovery()
|
||||
|
||||
if strings.ToLower(mode) == "release" {
|
||||
@@ -84,10 +86,7 @@ func GinEngine(mode string, cfg Config) *gin.Engine {
|
||||
|
||||
r.Use(recoveryMid)
|
||||
|
||||
// whether print access log
|
||||
if cfg.PrintAccessLog {
|
||||
r.Use(loggerMid)
|
||||
}
|
||||
r.Use(loggerMid)
|
||||
|
||||
if cfg.PProf {
|
||||
pprof.Register(r, "/api/debug/pprof")
|
||||
|
||||
@@ -48,10 +48,11 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
|
||||
|
||||
busiGroupCache := memsto.NewBusiGroupCache(ctx, stats)
|
||||
targetCache := memsto.NewTargetCache(ctx, stats, nil)
|
||||
configCvalCache := memsto.NewCvalCache(ctx, stats)
|
||||
|
||||
writers := writer.NewWriters(config.Pushgw)
|
||||
|
||||
r := httpx.GinEngine(config.Global.RunMode, config.HTTP)
|
||||
r := httpx.GinEngine(config.Global.RunMode, config.HTTP, configCvalCache.PrintBodyPaths, configCvalCache.PrintAccessLog)
|
||||
rt := router.New(config.HTTP, config.Pushgw, config.Alert, targetCache, busiGroupCache, idents, metas, writers, ctx)
|
||||
rt.Config(r)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user