Compare commits

...

7 Commits

Author SHA1 Message Date
Yening Qin
60545cf4c6 Merge branch 'main' into config-access-log 2024-10-21 12:11:06 +08:00
ning
08c762ecae config-access-log 2024-10-20 21:51:16 +08:00
ning
bde4f2f715 update GetQueryRef 2024-10-20 21:17:12 +08:00
710leo
8922b7e94a code refactor 2024-10-09 20:08:46 +08:00
710leo
fc0b374cc3 refactor 2024-10-09 11:06:48 +08:00
710leo
311185e799 refactor 2024-10-08 20:30:29 +08:00
710leo
74f3836733 refactor: alert query data set operations 2024-10-08 20:16:08 +08:00
10 changed files with 216 additions and 19 deletions

View File

@@ -62,6 +62,7 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
userCache := memsto.NewUserCache(ctx, syncStats)
userGroupCache := memsto.NewUserGroupCache(ctx, syncStats)
taskTplsCache := memsto.NewTaskTplCache(ctx)
configCvalCache := memsto.NewCvalCache(ctx, syncStats)
promClients := prom.NewPromClient(ctx)
tdengineClients := tdengine.NewTdengineClient(ctx, config.Alert.Heartbeat)
@@ -70,7 +71,8 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
Start(config.Alert, config.Pushgw, syncStats, alertStats, externalProcessors, targetCache, busiGroupCache, alertMuteCache, alertRuleCache, notifyConfigCache, taskTplsCache, dsCache, ctx, promClients, tdengineClients, userCache, userGroupCache)
r := httpx.GinEngine(config.Global.RunMode, config.HTTP)
r := httpx.GinEngine(config.Global.RunMode, config.HTTP,
configCvalCache.PrintBodyPaths, configCvalCache.PrintAccessLog)
rt := router.New(config.HTTP, config.Alert, alertMuteCache, targetCache, busiGroupCache, alertStats, ctx, externalProcessors)
if config.Ibex.Enable {

View File

@@ -95,6 +95,7 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
userCache := memsto.NewUserCache(ctx, syncStats)
userGroupCache := memsto.NewUserGroupCache(ctx, syncStats)
taskTplCache := memsto.NewTaskTplCache(ctx)
configCvalCache := memsto.NewCvalCache(ctx, syncStats)
sso := sso.Init(config.Center, ctx, configCache)
promClients := prom.NewPromClient(ctx)
@@ -117,7 +118,7 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
go models.MigrateBg(ctx, pushgwRouter.Pushgw.BusiGroupLabelKey)
r := httpx.GinEngine(config.Global.RunMode, config.HTTP)
r := httpx.GinEngine(config.Global.RunMode, config.HTTP, configCvalCache.PrintBodyPaths, configCvalCache.PrintAccessLog)
centerRouter.Config(r)
alertrtRouter.Config(r)

View File

@@ -45,6 +45,10 @@ func (rt *Router) statistic(c *gin.Context) {
statistics, err = models.ConfigsUserVariableStatistics(rt.Ctx)
ginx.NewRender(c).Data(statistics, err)
return
case "cval":
statistics, err = models.ConfigCvalStatistics(rt.Ctx)
ginx.NewRender(c).Data(statistics, err)
return
default:
ginx.Bomb(http.StatusBadRequest, "invalid name")
}

View File

@@ -52,11 +52,13 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
targetCache := memsto.NewTargetCache(ctx, syncStats, redis)
busiGroupCache := memsto.NewBusiGroupCache(ctx, syncStats)
configCvalCache := memsto.NewCvalCache(ctx, syncStats)
idents := idents.New(ctx, redis)
metas := metas.New(redis)
writers := writer.NewWriters(config.Pushgw)
pushgwRouter := pushgwrt.New(config.HTTP, config.Pushgw, config.Alert, targetCache, busiGroupCache, idents, metas, writers, ctx)
r := httpx.GinEngine(config.Global.RunMode, config.HTTP)
r := httpx.GinEngine(config.Global.RunMode, config.HTTP, configCvalCache.PrintBodyPaths, configCvalCache.PrintAccessLog)
pushgwRouter.Config(r)
if !config.Alert.Disable {

150
memsto/config_cval_cache.go Normal file
View File

@@ -0,0 +1,150 @@
package memsto
import (
"encoding/json"
"log"
"sync"
"time"
"github.com/ccfos/nightingale/v6/dumper"
"github.com/ccfos/nightingale/v6/models"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/pkg/errors"
"github.com/toolkits/pkg/logger"
)
type CvalCache struct {
statTotal int64
statLastUpdated int64
ctx *ctx.Context
stats *Stats
mu sync.RWMutex
cvals map[string]string
}
func NewCvalCache(ctx *ctx.Context, stats *Stats) *CvalCache {
cvalCache := &CvalCache{
statTotal: -1,
statLastUpdated: -1,
ctx: ctx,
stats: stats,
cvals: make(map[string]string),
}
cvalCache.initSyncConfigs()
return cvalCache
}
func (c *CvalCache) initSyncConfigs() {
err := c.syncConfigs()
if err != nil {
log.Fatalln("failed to sync configs:", err)
}
go c.loopSyncConfigs()
}
func (c *CvalCache) loopSyncConfigs() {
duration := time.Duration(9000) * time.Millisecond
for {
time.Sleep(duration)
if err := c.syncConfigs(); err != nil {
logger.Warning("failed to sync configs:", err)
}
}
}
func (c *CvalCache) syncConfigs() error {
start := time.Now()
stat, err := models.ConfigCvalStatistics(c.ctx)
if err != nil {
dumper.PutSyncRecord("cvals", start.Unix(), -1, -1, "failed to query statistics: "+err.Error())
return errors.WithMessage(err, "failed to call ConfigCvalStatistics")
}
if !c.statChanged(stat.Total, stat.LastUpdated) {
c.stats.GaugeCronDuration.WithLabelValues("sync_cvals").Set(0)
c.stats.GaugeSyncNumber.WithLabelValues("sync_cvals").Set(0)
dumper.PutSyncRecord("cvals", start.Unix(), -1, -1, "not changed")
return nil
}
cvals, err := models.ConfigsGetAll(c.ctx)
if err != nil {
dumper.PutSyncRecord("cvals", start.Unix(), -1, -1, "failed to query records: "+err.Error())
return errors.WithMessage(err, "failed to call ConfigsGet")
}
c.Set(cvals, stat.Total, stat.LastUpdated)
ms := time.Since(start).Milliseconds()
c.stats.GaugeCronDuration.WithLabelValues("sync_cvals").Set(float64(ms))
c.stats.GaugeSyncNumber.WithLabelValues("sync_cvals").Set(float64(len(c.cvals)))
logger.Infof("timer: sync cvals done, cost: %dms", ms)
dumper.PutSyncRecord("cvals", start.Unix(), ms, len(c.cvals), "success")
return nil
}
func (c *CvalCache) statChanged(total int64, updated int64) bool {
if c.statTotal == total && c.statLastUpdated == updated {
return false
}
return true
}
func (c *CvalCache) Set(cvals []*models.Configs, total int64, updated int64) {
c.mu.Lock()
defer c.mu.Unlock()
c.statTotal = total
c.statLastUpdated = updated
for _, cfg := range cvals {
c.cvals[cfg.Ckey] = cfg.Cval
}
}
func (c *CvalCache) Get(ckey string) string {
c.mu.RLock()
defer c.mu.RUnlock()
return c.cvals[ckey]
}
func (c *CvalCache) GetLastUpdateTime() int64 {
c.mu.RLock()
defer c.mu.RUnlock()
return c.statLastUpdated
}
type SiteInfo struct {
PrintBodyPaths []string `json:"print_body_paths"`
PrintAccessLog bool `json:"print_access_log"`
}
func (c *CvalCache) GetSiteInfo() *SiteInfo {
c.mu.RLock()
defer c.mu.RUnlock()
si := SiteInfo{}
if siteInfoStr := c.Get("site_info"); siteInfoStr != "" {
if err := json.Unmarshal([]byte(siteInfoStr), &si); err != nil {
logger.Errorf("Failed to unmarshal site info: %v", err)
}
}
return &si
}
func (c *CvalCache) PrintBodyPaths() map[string]struct{} {
printBodyPaths := c.GetSiteInfo().PrintBodyPaths
pbp := make(map[string]struct{}, len(printBodyPaths))
for _, p := range printBodyPaths {
pbp[p] = struct{}{}
}
return pbp
}
func (c *CvalCache) PrintAccessLog() bool {
return c.GetSiteInfo().PrintAccessLog
}

View File

@@ -177,8 +177,8 @@ type Trigger struct {
Duration int `json:"duration,omitempty"`
Percent int `json:"percent,omitempty"`
Joins []Join `json:"joins"`
RecoverConfig RecoverConfig `json:"recover_config"`
JoinRef string `json:"join_ref"`
RecoverConfig RecoverConfig `json:"recover_config"`
}
type Join struct {

View File

@@ -106,10 +106,8 @@ func InitRSAPassWord(ctx *ctx.Context) (string, error) {
func ConfigsGet(ctx *ctx.Context, ckey string) (string, error) { //select built-in type configs
if !ctx.IsCenter {
if !ctx.IsCenter {
s, err := poster.GetByUrls[string](ctx, "/v1/n9e/config?key="+ckey)
return s, err
}
s, err := poster.GetByUrls[string](ctx, "/v1/n9e/config?key="+ckey)
return s, err
}
var lst []string
@@ -125,6 +123,17 @@ func ConfigsGet(ctx *ctx.Context, ckey string) (string, error) { //select built-
return "", nil
}
func ConfigsGetAll(ctx *ctx.Context) ([]*Configs, error) { // select built-in type configs
var lst []*Configs
err := DB(ctx).Model(&Configs{}).Select("ckey, cval").
Where("ckey!='' and external=? ", 0).Find(&lst).Error
if err != nil {
return nil, errors.WithMessage(err, "failed to query configs")
}
return lst, nil
}
func ConfigsSet(ctx *ctx.Context, ckey, cval string) error {
return ConfigsSetWithUname(ctx, ckey, cval, "default")
}
@@ -355,3 +364,19 @@ func ConfigUserVariableGetDecryptMap(context *ctx.Context, privateKey []byte, pa
return ret, nil
}
func ConfigCvalStatistics(context *ctx.Context) (*Statistics, error) {
if !context.IsCenter {
return poster.GetByUrls[*Statistics](context, "/v1/n9e/statistic?name=cval")
}
session := DB(context).Model(&Configs{}).Select("count(*) as total",
"max(update_at) as last_updated").Where("ckey!='' and external=? ", 0) // built-in config
var stats []*Statistics
err := session.Find(&stats).Error
if err != nil {
return nil, err
}
return stats[0], nil
}

View File

@@ -7,6 +7,7 @@ import (
"io"
"net/http"
"os"
"strings"
"time"
"github.com/gin-gonic/gin"
@@ -41,14 +42,21 @@ type LoggerConfig struct {
// Output is a writer where logs are written.
// Optional. Default value is gin.DefaultWriter.
Output io.Writer
PrintBody bool
Output io.Writer
PrintAccessLog func() bool
PrintBodyPaths func() map[string]struct{}
// SkipPaths is a url path array which logs are not written.
// Optional.
SkipPaths []string
}
func (c *LoggerConfig) ContainsPath(path string) bool {
path = strings.Split(path, "?")[0]
_, exist := c.PrintBodyPaths()[path]
return exist
}
// LogFormatter gives the signature of the formatter function passed to LoggerWithFormatter
type LogFormatter func(params LogFormatterParams) string
@@ -255,6 +263,11 @@ func LoggerWithConfig(conf LoggerConfig) gin.HandlerFunc {
}
return func(c *gin.Context) {
if !conf.PrintAccessLog() {
c.Next()
return
}
// Start timer
start := time.Now()
path := c.Request.URL.Path
@@ -271,7 +284,7 @@ func LoggerWithConfig(conf LoggerConfig) gin.HandlerFunc {
}
c.Writer = bodyWriter
if conf.PrintBody {
if conf.ContainsPath(c.Request.RequestURI) {
buf, _ := io.ReadAll(c.Request.Body)
rdr1 = io.NopCloser(bytes.NewBuffer(buf))
rdr2 = io.NopCloser(bytes.NewBuffer(buf))
@@ -309,7 +322,7 @@ func LoggerWithConfig(conf LoggerConfig) gin.HandlerFunc {
// fmt.Fprint(out, formatter(param))
logger.Info(formatter(param))
if conf.PrintBody {
if conf.ContainsPath(c.Request.RequestURI) {
respBody := readBody(bytes.NewReader(bodyWriter.body.Bytes()), c.Writer.Header().Get("Content-Encoding"))
reqBody := readBody(rdr1, c.Request.Header.Get("Content-Encoding"))
logger.Debugf("path:%s req body:%s resp:%s", path, reqBody, respBody)

View File

@@ -70,10 +70,12 @@ type JWTAuth struct {
RedisKeyPrefix string
}
func GinEngine(mode string, cfg Config) *gin.Engine {
func GinEngine(mode string, cfg Config, printBodyPaths func() map[string]struct{},
printAccessLog func() bool) *gin.Engine {
gin.SetMode(mode)
loggerMid := aop.Logger(aop.LoggerConfig{PrintBody: cfg.PrintBody})
loggerMid := aop.Logger(aop.LoggerConfig{PrintAccessLog: printAccessLog,
PrintBodyPaths: printBodyPaths})
recoveryMid := aop.Recovery()
if strings.ToLower(mode) == "release" {
@@ -84,10 +86,7 @@ func GinEngine(mode string, cfg Config) *gin.Engine {
r.Use(recoveryMid)
// whether print access log
if cfg.PrintAccessLog {
r.Use(loggerMid)
}
r.Use(loggerMid)
if cfg.PProf {
pprof.Register(r, "/api/debug/pprof")

View File

@@ -48,10 +48,11 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
busiGroupCache := memsto.NewBusiGroupCache(ctx, stats)
targetCache := memsto.NewTargetCache(ctx, stats, nil)
configCvalCache := memsto.NewCvalCache(ctx, stats)
writers := writer.NewWriters(config.Pushgw)
r := httpx.GinEngine(config.Global.RunMode, config.HTTP)
r := httpx.GinEngine(config.Global.RunMode, config.HTTP, configCvalCache.PrintBodyPaths, configCvalCache.PrintAccessLog)
rt := router.New(config.HTTP, config.Pushgw, config.Alert, targetCache, busiGroupCache, idents, metas, writers, ctx)
rt.Config(r)