Compare commits

...

17 Commits

Author SHA1 Message Date
ning
ec2ed3dedf refactor: init alert 2023-05-25 14:37:42 +08:00
ning
053a5b860b change get datasource 2023-05-23 20:52:09 +08:00
ning
c1eafd35f2 change some logs 2023-05-23 19:43:55 +08:00
ning
b0e9ac15d8 Merge branch 'main' of ssh://github.com/ccfos/nightingale into get-config-by-api 2023-05-23 13:54:07 +08:00
ning
987c0c86af refactor: get ops and metrics 2023-05-22 16:06:34 +08:00
ning
b75cd7a16f add role perm list and change get datasource 2023-05-22 15:12:27 +08:00
ning
a57fe3231f refactor get from api 2023-05-18 21:06:18 +08:00
ning
9870fdce4c update AlertCurEventGetByRuleIdAndDsId 2023-05-18 20:36:21 +08:00
ning
a7b9323318 refactor get alert rules 2023-05-18 19:43:24 +08:00
ning
a38e87d29c fix get user members 2023-05-18 18:42:13 +08:00
ning
b2669d3a38 code refactor 2023-05-18 18:18:33 +08:00
ning
806d50c5f6 change pushgw update target 2023-05-18 18:10:51 +08:00
ning
7b664323d3 add hearbeat 2023-05-18 17:46:30 +08:00
ning
23189f81e3 change event persist 2023-05-18 16:46:53 +08:00
ning
edfdb5b8fe fix sync datasource 2023-05-18 15:51:14 +08:00
ning
43f6266c2f add service api 2023-05-18 15:17:26 +08:00
ning
77b4d6d044 get alert mute by api 2023-05-17 19:45:49 +08:00
63 changed files with 1001 additions and 210 deletions

2
.gitignore vendored
View File

@@ -42,7 +42,7 @@ _test
/docker/n9e
/docker/mysqldata
/docker/experience_pg_vm/pgdata
/etc.local
/etc.local*
.alerts
.idea

View File

@@ -23,7 +23,6 @@ import (
"github.com/ccfos/nightingale/v6/prom"
"github.com/ccfos/nightingale/v6/pushgw/pconf"
"github.com/ccfos/nightingale/v6/pushgw/writer"
"github.com/ccfos/nightingale/v6/storage"
)
func Initialize(configDir string, cryptoKey string) (func(), error) {
@@ -37,21 +36,12 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
return nil, err
}
db, err := storage.New(config.DB)
if err != nil {
return nil, err
}
ctx := ctx.NewContext(context.Background(), db)
redis, err := storage.NewRedis(config.Redis)
if err != nil {
return nil, err
}
ctx := ctx.NewContext(context.Background(), nil, false, config.CenterApi)
syncStats := memsto.NewSyncStats()
alertStats := astats.NewSyncStats()
targetCache := memsto.NewTargetCache(ctx, syncStats, redis)
targetCache := memsto.NewTargetCache(ctx, syncStats, nil)
busiGroupCache := memsto.NewBusiGroupCache(ctx, syncStats)
alertMuteCache := memsto.NewAlertMuteCache(ctx, syncStats)
alertRuleCache := memsto.NewAlertRuleCache(ctx, syncStats)
@@ -62,7 +52,7 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
externalProcessors := process.NewExternalProcessors()
Start(config.Alert, config.Pushgw, syncStats, alertStats, externalProcessors, targetCache, busiGroupCache, alertMuteCache, alertRuleCache, notifyConfigCache, dsCache, ctx, promClients, false)
Start(config.Alert, config.Pushgw, syncStats, alertStats, externalProcessors, targetCache, busiGroupCache, alertMuteCache, alertRuleCache, notifyConfigCache, dsCache, ctx, promClients)
r := httpx.GinEngine(config.Global.RunMode, config.HTTP)
rt := router.New(config.HTTP, config.Alert, alertMuteCache, targetCache, busiGroupCache, alertStats, ctx, externalProcessors)
@@ -77,7 +67,7 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
}
func Start(alertc aconf.Alert, pushgwc pconf.Pushgw, syncStats *memsto.Stats, alertStats *astats.Stats, externalProcessors *process.ExternalProcessorsType, targetCache *memsto.TargetCacheType, busiGroupCache *memsto.BusiGroupCacheType,
alertMuteCache *memsto.AlertMuteCacheType, alertRuleCache *memsto.AlertRuleCacheType, notifyConfigCache *memsto.NotifyConfigCacheType, datasourceCache *memsto.DatasourceCacheType, ctx *ctx.Context, promClients *prom.PromClientMap, isCenter bool) {
alertMuteCache *memsto.AlertMuteCacheType, alertRuleCache *memsto.AlertRuleCacheType, notifyConfigCache *memsto.NotifyConfigCacheType, datasourceCache *memsto.DatasourceCacheType, ctx *ctx.Context, promClients *prom.PromClientMap) {
userCache := memsto.NewUserCache(ctx, syncStats)
userGroupCache := memsto.NewUserGroupCache(ctx, syncStats)
alertSubscribeCache := memsto.NewAlertSubscribeCache(ctx, syncStats)
@@ -85,12 +75,12 @@ func Start(alertc aconf.Alert, pushgwc pconf.Pushgw, syncStats *memsto.Stats, al
go models.InitNotifyConfig(ctx, alertc.Alerting.TemplatesDir)
naming := naming.NewNaming(ctx, alertc.Heartbeat, isCenter)
naming := naming.NewNaming(ctx, alertc.Heartbeat)
writers := writer.NewWriters(pushgwc)
record.NewScheduler(alertc, recordingRuleCache, promClients, writers, alertStats)
eval.NewScheduler(isCenter, alertc, externalProcessors, alertRuleCache, targetCache, busiGroupCache, alertMuteCache, datasourceCache, promClients, naming, ctx, alertStats)
eval.NewScheduler(alertc, externalProcessors, alertRuleCache, targetCache, busiGroupCache, alertMuteCache, datasourceCache, promClients, naming, ctx, alertStats)
dp := dispatch.NewDispatch(alertRuleCache, userCache, userGroupCache, alertSubscribeCache, targetCache, notifyConfigCache, alertc.Alerting, ctx)
consumer := dispatch.NewConsumer(alertc.Alerting, ctx, dp)

View File

@@ -8,6 +8,7 @@ import (
"github.com/ccfos/nightingale/v6/alert/queue"
"github.com/ccfos/nightingale/v6/models"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/pkg/poster"
"github.com/toolkits/pkg/concurrent/semaphore"
"github.com/toolkits/pkg/logger"
@@ -82,78 +83,17 @@ func (e *Consumer) consumeOne(event *models.AlertCurEvent) {
}
func (e *Consumer) persist(event *models.AlertCurEvent) {
has, err := models.AlertCurEventExists(e.ctx, "hash=?", event.Hash)
if err != nil {
logger.Errorf("event_persist_check_exists_fail: %v rule_id=%d hash=%s", err, event.RuleId, event.Hash)
return
}
his := event.ToHis(e.ctx)
// 不管是告警还是恢复,全量告警里都要记录
if err := his.Add(e.ctx); err != nil {
logger.Errorf(
"event_persist_his_fail: %v rule_id=%d cluster:%s hash=%s tags=%v timestamp=%d value=%s",
err,
event.RuleId,
event.Cluster,
event.Hash,
event.TagsJSON,
event.TriggerTime,
event.TriggerValue,
)
}
if has {
// 活跃告警表中有记录,删之
err = models.AlertCurEventDelByHash(e.ctx, event.Hash)
if !e.ctx.IsCenter {
event.DB2FE()
err := poster.PostByUrls(e.ctx, "/v1/n9e/event-persist", event)
if err != nil {
logger.Errorf("event_del_cur_fail: %v hash=%s", err, event.Hash)
return
logger.Errorf("event%+v persist err:%v", event, err)
}
if !event.IsRecovered {
// 恢复事件从活跃告警列表彻底删掉告警事件要重新加进来新的event
// use his id as cur id
event.Id = his.Id
if event.Id > 0 {
if err := event.Add(e.ctx); err != nil {
logger.Errorf(
"event_persist_cur_fail: %v rule_id=%d cluster:%s hash=%s tags=%v timestamp=%d value=%s",
err,
event.RuleId,
event.Cluster,
event.Hash,
event.TagsJSON,
event.TriggerTime,
event.TriggerValue,
)
}
}
}
return
}
if event.IsRecovered {
// alert_cur_event表里没有数据表示之前没告警结果现在报了恢复神奇....理论上不应该出现的
return
}
// use his id as cur id
event.Id = his.Id
if event.Id > 0 {
if err := event.Add(e.ctx); err != nil {
logger.Errorf(
"event_persist_cur_fail: %v rule_id=%d cluster:%s hash=%s tags=%v timestamp=%d value=%s",
err,
event.RuleId,
event.Cluster,
event.Hash,
event.TagsJSON,
event.TriggerTime,
event.TriggerValue,
)
}
err := models.EventPersist(e.ctx, event)
if err != nil {
logger.Errorf("event%+v persist err:%v", event, err)
}
}

View File

@@ -199,7 +199,7 @@ func (e *Dispatch) Send(rule *models.AlertRule, event *models.AlertCurEvent, not
}
// handle event callbacks
sender.SendCallbacks(e.ctx, notifyTarget.ToCallbackList(), event, e.targetCache, e.notifyConfigCache.GetIbex())
sender.SendCallbacks(e.ctx, notifyTarget.ToCallbackList(), event, e.targetCache, e.userCache, e.notifyConfigCache.GetIbex())
// handle global webhooks
sender.SendWebhooks(notifyTarget.ToWebhookList(), event)

View File

@@ -16,7 +16,6 @@ import (
)
type Scheduler struct {
isCenter bool
// key: hash
alertRules map[string]*AlertRuleWorker
@@ -38,11 +37,10 @@ type Scheduler struct {
stats *astats.Stats
}
func NewScheduler(isCenter bool, aconf aconf.Alert, externalProcessors *process.ExternalProcessorsType, arc *memsto.AlertRuleCacheType, targetCache *memsto.TargetCacheType,
func NewScheduler(aconf aconf.Alert, externalProcessors *process.ExternalProcessorsType, arc *memsto.AlertRuleCacheType, targetCache *memsto.TargetCacheType,
busiGroupCache *memsto.BusiGroupCacheType, alertMuteCache *memsto.AlertMuteCacheType, datasourceCache *memsto.DatasourceCacheType, promClients *prom.PromClientMap, naming *naming.Naming,
ctx *ctx.Context, stats *astats.Stats) *Scheduler {
scheduler := &Scheduler{
isCenter: isCenter,
aconf: aconf,
alertRules: make(map[string]*AlertRuleWorker),
@@ -108,7 +106,7 @@ func (s *Scheduler) syncAlertRules() {
alertRule := NewAlertRuleWorker(rule, dsId, processor, s.promClients, s.ctx)
alertRuleWorkers[alertRule.Hash()] = alertRule
}
} else if rule.IsHostRule() && s.isCenter {
} else if rule.IsHostRule() && s.ctx.IsCenter {
// all host rule will be processed by center instance
if !naming.DatasourceHashRing.IsHit(naming.HostDatasource, fmt.Sprintf("%d", rule.Id), s.aconf.Heartbeat.Endpoint) {
continue

View File

@@ -109,7 +109,7 @@ func (arw *AlertRuleWorker) Eval() {
}
func (arw *AlertRuleWorker) Stop() {
logger.Infof("%s stopped", arw.Key())
logger.Infof("rule_eval %s stopped", arw.Key())
close(arw.quit)
}

View File

@@ -9,6 +9,7 @@ import (
"github.com/ccfos/nightingale/v6/alert/aconf"
"github.com/ccfos/nightingale/v6/models"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/pkg/poster"
"github.com/toolkits/pkg/logger"
)
@@ -16,14 +17,12 @@ import (
type Naming struct {
ctx *ctx.Context
heartbeatConfig aconf.HeartbeatConfig
isCenter bool
}
func NewNaming(ctx *ctx.Context, heartbeat aconf.HeartbeatConfig, isCenter bool) *Naming {
func NewNaming(ctx *ctx.Context, heartbeat aconf.HeartbeatConfig) *Naming {
naming := &Naming{
ctx: ctx,
heartbeatConfig: heartbeat,
isCenter: isCenter,
}
naming.Heartbeats()
return naming
@@ -45,6 +44,10 @@ func (n *Naming) Heartbeats() error {
}
func (n *Naming) loopDeleteInactiveInstances() {
if !n.ctx.IsCenter {
return
}
interval := time.Duration(10) * time.Minute
for {
time.Sleep(interval)
@@ -112,7 +115,7 @@ func (n *Naming) heartbeat() error {
localss[datasourceIds[i]] = newss
}
if n.isCenter {
if n.ctx.IsCenter {
// 如果是中心节点,还需要处理 host 类型的告警规则host 类型告警规则,和数据源无关,想复用下数据源的 hash ring想用一个虚假的数据源 id 来处理
// if is center node, we need to handle host type alerting rules, host type alerting rules are not related to datasource, we want to reuse the hash ring of datasource, we want to use a fake datasource id to handle it
err := models.AlertingEngineHeartbeatWithCluster(n.ctx, n.heartbeatConfig.Endpoint, n.heartbeatConfig.EngineName, HostDatasource)
@@ -146,6 +149,11 @@ func (n *Naming) ActiveServers(datasourceId int64) ([]string, error) {
return nil, fmt.Errorf("cluster is empty")
}
if !n.ctx.IsCenter {
lst, err := poster.GetByUrls[[]string](n.ctx, "/v1/n9e/servers-active?dsid="+fmt.Sprintf("%d", datasourceId))
return lst, err
}
// 30秒内有心跳就认为是活的
return models.AlertingEngineGetsInstances(n.ctx, "datasource_id = ? and clock > ?", datasourceId, time.Now().Unix()-30)
}

View File

@@ -118,7 +118,7 @@ func (p *Processor) Handle(anomalyPoints []common.AnomalyPoint, from string, inh
logger.Errorf("rule not found %+v", anomalyPoints)
return
}
p.rule = cachedRule
now := time.Now().Unix()
alertingKeys := map[string]struct{}{}
@@ -337,7 +337,7 @@ func (p *Processor) pushEventToQueue(e *models.AlertCurEvent) {
func (p *Processor) RecoverAlertCurEventFromDb() {
p.pendings = NewAlertCurEventMap(nil)
curEvents, err := models.AlertCurEventGetByRuleIdAndCluster(p.ctx, p.rule.Id, p.datasourceId)
curEvents, err := models.AlertCurEventGetByRuleIdAndDsId(p.ctx, p.rule.Id, p.datasourceId)
if err != nil {
logger.Errorf("recover event from db for rule:%s failed, err:%s", p.Key(), err)
p.fires = NewAlertCurEventMap(nil)

View File

@@ -48,6 +48,7 @@ func (rt *Router) Config(r *gin.Engine) {
service.Use(gin.BasicAuth(rt.HTTP.Alert.BasicAuth))
}
service.POST("/event", rt.pushEventToQueue)
service.POST("/event-persist", rt.eventPersist)
service.POST("/make-event", rt.makeEvent)
}

View File

@@ -83,6 +83,13 @@ func (rt *Router) pushEventToQueue(c *gin.Context) {
ginx.NewRender(c).Message(nil)
}
// eventPersist receives an AlertCurEvent posted by an edge alert engine,
// converts its JSON-facing fields back to DB form, and stores it via
// models.EventPersist.
// NOTE(review): a JSON body of `null` would leave the pointer nil before
// FE2DB is called — confirm ginx.BindJSON rejects that case.
func (rt *Router) eventPersist(c *gin.Context) {
	var ev *models.AlertCurEvent
	ginx.BindJSON(c, &ev)

	ev.FE2DB()
	ginx.NewRender(c).Message(models.EventPersist(rt.Ctx, ev))
}
type eventForm struct {
Alert bool `json:"alert"`
AnomalyPoints []common.AnomalyPoint `json:"vectors"`

View File

@@ -15,7 +15,7 @@ import (
"github.com/toolkits/pkg/logger"
)
func SendCallbacks(ctx *ctx.Context, urls []string, event *models.AlertCurEvent, targetCache *memsto.TargetCacheType, ibexConf aconf.Ibex) {
func SendCallbacks(ctx *ctx.Context, urls []string, event *models.AlertCurEvent, targetCache *memsto.TargetCacheType, userCache *memsto.UserCacheType, ibexConf aconf.Ibex) {
for _, url := range urls {
if url == "" {
continue
@@ -23,7 +23,7 @@ func SendCallbacks(ctx *ctx.Context, urls []string, event *models.AlertCurEvent,
if strings.HasPrefix(url, "${ibex}") {
if !event.IsRecovered {
handleIbex(ctx, url, event, targetCache, ibexConf)
handleIbex(ctx, url, event, targetCache, userCache, ibexConf)
}
continue
}
@@ -60,7 +60,7 @@ type TaskCreateReply struct {
Dat int64 `json:"dat"` // task.id
}
func handleIbex(ctx *ctx.Context, url string, event *models.AlertCurEvent, targetCache *memsto.TargetCacheType, ibexConf aconf.Ibex) {
func handleIbex(ctx *ctx.Context, url string, event *models.AlertCurEvent, targetCache *memsto.TargetCacheType, userCache *memsto.UserCacheType, ibexConf aconf.Ibex) {
arr := strings.Split(url, "/")
var idstr string
@@ -103,7 +103,7 @@ func handleIbex(ctx *ctx.Context, url string, event *models.AlertCurEvent, targe
// check perm
// tpl.GroupId - host - account 三元组校验权限
can, err := canDoIbex(ctx, tpl.UpdateBy, tpl, host, targetCache)
can, err := canDoIbex(ctx, tpl.UpdateBy, tpl, host, targetCache, userCache)
if err != nil {
logger.Errorf("event_callback_ibex: check perm fail: %v", err)
return
@@ -176,12 +176,8 @@ func handleIbex(ctx *ctx.Context, url string, event *models.AlertCurEvent, targe
}
}
func canDoIbex(ctx *ctx.Context, username string, tpl *models.TaskTpl, host string, targetCache *memsto.TargetCacheType) (bool, error) {
user, err := models.UserGetByUsername(ctx, username)
if err != nil {
return false, err
}
func canDoIbex(ctx *ctx.Context, username string, tpl *models.TaskTpl, host string, targetCache *memsto.TargetCacheType, userCache *memsto.UserCacheType) (bool, error) {
user := userCache.GetByUsername(username)
if user != nil && user.IsAdmin() {
return true, nil
}

View File

@@ -1,18 +1,12 @@
package cconf
import (
"github.com/gin-gonic/gin"
)
type Center struct {
Plugins []Plugin
BasicAuth gin.Accounts
MetricsYamlFile string
OpsYamlFile string
BuiltinIntegrationsDir string
I18NHeaderKey string
MetricDesc MetricDescType
TargetMetrics map[string]string
AnonymousAccess AnonymousAccess
}

View File

@@ -4,7 +4,6 @@ import (
"path"
"github.com/toolkits/pkg/file"
"github.com/toolkits/pkg/runner"
)
// metricDesc , As load map happens before read map, there is no necessary to use concurrent map for metric desc store
@@ -33,10 +32,10 @@ func GetMetricDesc(lang, metric string) string {
return MetricDesc.CommonDesc[metric]
}
func LoadMetricsYaml(metricsYamlFile string) error {
func LoadMetricsYaml(configDir, metricsYamlFile string) error {
fp := metricsYamlFile
if fp == "" {
fp = path.Join(runner.Cwd, "etc", "metrics.yaml")
fp = path.Join(configDir, "metrics.yaml")
}
if !file.IsExist(fp) {
return nil

View File

@@ -4,7 +4,6 @@ import (
"path"
"github.com/toolkits/pkg/file"
"github.com/toolkits/pkg/runner"
)
var Operations = Operation{}
@@ -19,10 +18,10 @@ type Ops struct {
Ops []string `yaml:"ops" json:"ops"`
}
func LoadOpsYaml(opsYamlFile string) error {
func LoadOpsYaml(configDir string, opsYamlFile string) error {
fp := opsYamlFile
if fp == "" {
fp = path.Join(runner.Cwd, "etc", "ops.yaml")
fp = path.Join(configDir, "ops.yaml")
}
if !file.IsExist(fp) {
return nil

View File

@@ -33,8 +33,8 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
return nil, fmt.Errorf("failed to init config: %v", err)
}
cconf.LoadMetricsYaml(config.Center.MetricsYamlFile)
cconf.LoadOpsYaml(config.Center.OpsYamlFile)
cconf.LoadMetricsYaml(configDir, config.Center.MetricsYamlFile)
cconf.LoadOpsYaml(configDir, config.Center.OpsYamlFile)
logxClean, err := logx.Init(config.Log)
if err != nil {
@@ -47,7 +47,7 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
if err != nil {
return nil, err
}
ctx := ctx.NewContext(context.Background(), db)
ctx := ctx.NewContext(context.Background(), db, true)
models.InitRoot(ctx)
redis, err := storage.NewRedis(config.Redis)
@@ -56,7 +56,7 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
}
metas := metas.New(redis)
idents := idents.New(db)
idents := idents.New(ctx)
syncStats := memsto.NewSyncStats()
alertStats := astats.NewSyncStats()
@@ -73,7 +73,7 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
promClients := prom.NewPromClient(ctx, config.Alert.Heartbeat)
externalProcessors := process.NewExternalProcessors()
alert.Start(config.Alert, config.Pushgw, syncStats, alertStats, externalProcessors, targetCache, busiGroupCache, alertMuteCache, alertRuleCache, notifyConfigCache, dsCache, ctx, promClients, true)
alert.Start(config.Alert, config.Pushgw, syncStats, alertStats, externalProcessors, targetCache, busiGroupCache, alertMuteCache, alertRuleCache, notifyConfigCache, dsCache, ctx, promClients)
writers := writer.NewWriters(config.Pushgw)

View File

@@ -148,6 +148,7 @@ func (rt *Router) Config(r *gin.Engine) {
pages.GET("/auth/callback", rt.loginCallback)
pages.GET("/auth/callback/cas", rt.loginCallbackCas)
pages.GET("/auth/callback/oauth", rt.loginCallbackOAuth)
pages.GET("/auth/perms", rt.allPerms)
pages.GET("/metrics/desc", rt.metricsDescGetFile)
pages.POST("/metrics/desc", rt.metricsDescGetMap)
@@ -303,7 +304,7 @@ func (rt *Router) Config(r *gin.Engine) {
pages.GET("/role/:id/ops", rt.auth(), rt.admin(), rt.operationOfRole)
pages.PUT("/role/:id/ops", rt.auth(), rt.admin(), rt.roleBindOperation)
pages.GET("operation", rt.operations)
pages.GET("/operation", rt.operations)
pages.GET("/notify-tpls", rt.auth(), rt.admin(), rt.notifyTplGets)
pages.PUT("/notify-tpl/content", rt.auth(), rt.admin(), rt.notifyTplUpdateContent)
@@ -339,7 +340,10 @@ func (rt *Router) Config(r *gin.Engine) {
service.POST("/users", rt.userAddPost)
service.GET("/users", rt.userFindAll)
service.GET("/targets", rt.targetGets)
service.GET("/user-groups", rt.userGroupGetsByService)
service.GET("/user-group-members", rt.userGroupMemberGetsByService)
service.GET("/targets", rt.targetGetsByService)
service.GET("/targets/tags", rt.targetGetTags)
service.POST("/targets/tags", rt.targetBindTagsByService)
service.DELETE("/targets/tags", rt.targetUnbindTagsByService)
@@ -351,16 +355,29 @@ func (rt *Router) Config(r *gin.Engine) {
service.GET("/alert-rule/:arid", rt.alertRuleGet)
service.GET("/alert-rules", rt.alertRulesGetByService)
service.GET("/alert-subscribes", rt.alertSubscribeGetsByService)
service.GET("/busi-groups", rt.busiGroupGetsByService)
service.GET("/datasources", rt.datasourceGetsByService)
service.GET("/datasource-ids", rt.getDatasourceIds)
service.POST("/server-heartbeat", rt.serverHeartbeat)
service.GET("/servers-active", rt.serversActive)
service.GET("/recording-rules", rt.recordingRuleGetsByService)
service.GET("/alert-mutes", rt.alertMuteGets)
service.POST("/alert-mutes", rt.alertMuteAddByService)
service.DELETE("/alert-mutes", rt.alertMuteDel)
service.GET("/alert-cur-events", rt.alertCurEventsList)
service.GET("/alert-cur-events-get-by-rid", rt.alertCurEventsGetByRid)
service.GET("/alert-his-events", rt.alertHisEventsList)
service.GET("/alert-his-event/:eid", rt.alertHisEventGet)
service.GET("/config/:id", rt.configGet)
service.GET("/configs", rt.configsGet)
service.GET("/config", rt.configGetByKey)
service.PUT("/configs", rt.configsPut)
service.POST("/configs", rt.configsPost)
service.DELETE("/configs", rt.configsDel)
@@ -368,7 +385,11 @@ func (rt *Router) Config(r *gin.Engine) {
service.POST("/conf-prop/encrypt", rt.confPropEncrypt)
service.POST("/conf-prop/decrypt", rt.confPropDecrypt)
service.GET("/datasource-ids", rt.getDatasourceIds)
service.GET("/statistic", rt.statistic)
service.GET("/notify-tpls", rt.notifyTplGets)
service.POST("/task-record-add", rt.taskRecordAdd)
}
}

View File

@@ -128,6 +128,13 @@ func (rt *Router) alertCurEventsCardDetails(c *gin.Context) {
ginx.NewRender(c).Data(list, err)
}
// alertCurEventsGetByRid returns the active alert events that belong to
// the rule id (`rid`) and datasource id (`dsid`) query parameters.
func (rt *Router) alertCurEventsGetByRid(c *gin.Context) {
	ruleID := ginx.QueryInt64(c, "rid")
	datasourceID := ginx.QueryInt64(c, "dsid")

	events, err := models.AlertCurEventGetByRuleIdAndDsId(rt.Ctx, ruleID, datasourceID)
	ginx.NewRender(c).Data(events, err)
}
// 列表方式,拉取活跃告警
func (rt *Router) alertCurEventsList(c *gin.Context) {
stime, etime := getTimeRange(c)

View File

@@ -27,7 +27,12 @@ func (rt *Router) alertRuleGets(c *gin.Context) {
}
func (rt *Router) alertRulesGetByService(c *gin.Context) {
prods := strings.Split(ginx.QueryStr(c, "prods", ""), ",")
prods := []string{}
prodStr := ginx.QueryStr(c, "prods", "")
if prodStr != "" {
prods = strings.Split(ginx.QueryStr(c, "prods", ""), ",")
}
query := ginx.QueryStr(c, "query", "")
algorithm := ginx.QueryStr(c, "algorithm", "")
cluster := ginx.QueryStr(c, "cluster", "")

View File

@@ -110,3 +110,8 @@ func (rt *Router) alertSubscribeDel(c *gin.Context) {
ginx.NewRender(c).Message(models.AlertSubscribeDel(rt.Ctx, f.Ids))
}
// alertSubscribeGetsByService returns every alert subscription; consumed
// by edge alert engines syncing over the service API.
func (rt *Router) alertSubscribeGetsByService(c *gin.Context) {
	subs, err := models.AlertSubscribeGetsByService(rt.Ctx)
	ginx.NewRender(c).Data(subs, err)
}

View File

@@ -123,6 +123,11 @@ func (rt *Router) busiGroupGets(c *gin.Context) {
ginx.NewRender(c).Data(lst, err)
}
// busiGroupGetsByService returns all business groups; consumed by edge
// components syncing over the service API.
func (rt *Router) busiGroupGetsByService(c *gin.Context) {
	groups, err := models.BusiGroupGetAll(rt.Ctx)
	ginx.NewRender(c).Data(groups, err)
}
// 这个接口只有在活跃告警页面才调用获取各个BG的活跃告警数量
func (rt *Router) busiGroupAlertingsGets(c *gin.Context) {
ids := ginx.QueryStr(c, "ids", "")

View File

@@ -20,6 +20,11 @@ func (rt *Router) configGet(c *gin.Context) {
ginx.NewRender(c).Data(configs, err)
}
// configGetByKey looks up a single configuration entry by the `key`
// query parameter.
func (rt *Router) configGetByKey(c *gin.Context) {
	key := ginx.QueryStr(c, "key")
	cfg, err := models.ConfigsGet(rt.Ctx, key)
	ginx.NewRender(c).Data(cfg, err)
}
func (rt *Router) configsDel(c *gin.Context) {
var f idsForm
ginx.BindJSON(c, &f)

View File

@@ -36,6 +36,12 @@ func (rt *Router) datasourceList(c *gin.Context) {
Render(c, list, err)
}
// datasourceGetsByService lists datasources, optionally filtered by the
// `typ` query parameter; the remaining filter slots are left empty.
func (rt *Router) datasourceGetsByService(c *gin.Context) {
	dsType := ginx.QueryStr(c, "typ", "")
	dsList, err := models.GetDatasourcesGetsBy(rt.Ctx, dsType, "", "", "")
	ginx.NewRender(c).Data(dsList, err)
}
type datasourceBrief struct {
Id int64 `json:"id"`
Name string `json:"name"`

View File

@@ -17,6 +17,41 @@ import (
const defaultLimit = 300
// statistic returns row-count/update statistics for the table selected by
// the `name` query parameter. An unknown name is rejected with 400.
func (rt *Router) statistic(c *gin.Context) {
	name := ginx.QueryStr(c, "name")

	// datasource update_at semantics differ from the other tables, so it
	// has a dedicated query and returns early
	if name == "datasource" {
		stats, err := models.DatasourceStatistics(rt.Ctx)
		ginx.NewRender(c).Data(stats, err)
		return
	}

	var model interface{}
	switch name {
	case "alert_mute":
		model = models.AlertMute{}
	case "alert_rule":
		model = models.AlertRule{}
	case "alert_subscribe":
		model = models.AlertSubscribe{}
	case "busi_group":
		model = models.BusiGroup{}
	case "recording_rule":
		model = models.RecordingRule{}
	case "target":
		model = models.Target{}
	case "user":
		model = models.User{}
	case "user_group":
		model = models.UserGroup{}
	default:
		// Bomb aborts the request; control does not continue past here
		ginx.Bomb(http.StatusBadRequest, "invalid name")
	}

	stats, err := models.StatisticsGet(rt.Ctx, model)
	ginx.NewRender(c).Data(stats, err)
}
func queryDatasourceIds(c *gin.Context) []int64 {
datasourceIds := ginx.QueryStr(c, "datasource_ids", "")
datasourceIds = strings.ReplaceAll(datasourceIds, ",", " ")

View File

@@ -21,7 +21,7 @@ func (rt *Router) alertMuteGetsByBG(c *gin.Context) {
func (rt *Router) alertMuteGets(c *gin.Context) {
prods := strings.Fields(ginx.QueryStr(c, "prods", ""))
bgid := ginx.QueryInt64(c, "bgid", 0)
bgid := ginx.QueryInt64(c, "bgid", -1)
query := ginx.QueryStr(c, "query", "")
lst, err := models.AlertMuteGets(rt.Ctx, prods, bgid, query)

View File

@@ -19,6 +19,11 @@ func (rt *Router) recordingRuleGets(c *gin.Context) {
ginx.NewRender(c).Data(ars, err)
}
func (rt *Router) recordingRuleGetsByService(c *gin.Context) {
ars, err := models.RecordingRuleEnabledGets(rt.Ctx)
ginx.NewRender(c).Data(ars, err)
}
func (rt *Router) recordingRuleGet(c *gin.Context) {
rrid := ginx.UrlParamInt64(c, "rrid")

View File

@@ -83,3 +83,18 @@ func (rt *Router) roleGets(c *gin.Context) {
lst, err := models.RoleGetsAll(rt.Ctx)
ginx.NewRender(c).Data(lst, err)
}
// allPerms returns, for every role, the list of operations that role is
// allowed to perform, keyed by role name.
func (rt *Router) allPerms(c *gin.Context) {
	roles, err := models.RoleGetsAll(rt.Ctx)
	ginx.Dangerous(err)

	m := make(map[string][]string, len(roles))
	for _, r := range roles {
		// best-effort: a role whose operations cannot be loaded is
		// simply omitted from the result
		ops, opsErr := models.OperationsOfRole(rt.Ctx, strings.Fields(r.Name))
		if opsErr != nil {
			continue
		}
		m[r.Name] = ops
	}

	// The original passed the outer `err` here, but Dangerous above already
	// aborted on non-nil, so it is necessarily nil at this point; pass nil
	// explicitly instead of the stale, shadow-prone variable.
	ginx.NewRender(c).Data(m, nil)
}

View File

@@ -1,6 +1,8 @@
package router
import (
"time"
"github.com/ccfos/nightingale/v6/models"
"github.com/gin-gonic/gin"
@@ -16,3 +18,17 @@ func (rt *Router) serverClustersGet(c *gin.Context) {
list, err := models.AlertingEngineGetsClusters(rt.Ctx, "")
ginx.NewRender(c).Data(list, err)
}
// serverHeartbeat registers a heartbeat reported by an alert engine,
// binding the instance to the engine cluster and datasource given in the
// request body.
func (rt *Router) serverHeartbeat(c *gin.Context) {
	var info models.HeartbeatInfo
	ginx.BindJSON(c, &info)

	ginx.NewRender(c).Message(
		models.AlertingEngineHeartbeatWithCluster(rt.Ctx, info.Instance, info.EngineCluster, info.DatasourceId))
}
// serversActive lists the alert-engine instances bound to the datasource
// in `dsid` whose last heartbeat is at most 30 seconds old.
func (rt *Router) serversActive(c *gin.Context) {
	dsID := ginx.QueryInt64(c, "dsid")

	// an instance is considered alive if it heartbeated within 30s
	aliveSince := time.Now().Unix() - 30
	servers, err := models.AlertingEngineGetsInstances(rt.Ctx, "datasource_id = ? and clock > ?", dsID, aliveSince)
	ginx.NewRender(c).Data(servers, err)
}

View File

@@ -101,6 +101,11 @@ func (rt *Router) targetGets(c *gin.Context) {
}, nil)
}
// targetGetsByService returns all targets; consumed by edge components
// syncing over the service API.
func (rt *Router) targetGetsByService(c *gin.Context) {
	targets, err := models.TargetGetsAll(rt.Ctx)
	ginx.NewRender(c).Data(targets, err)
}
func (rt *Router) targetGetTags(c *gin.Context) {
idents := ginx.QueryStr(c, "idents", "")
idents = strings.ReplaceAll(idents, ",", " ")

View File

@@ -120,6 +120,12 @@ func (f *taskForm) HandleFH(fh string) {
f.Title = f.Title + " FH: " + fh
}
// taskRecordAdd stores a task record posted in the request body.
// NOTE(review): a JSON body of `null` would leave the pointer nil before
// Add is called — confirm ginx.BindJSON rejects that case.
func (rt *Router) taskRecordAdd(c *gin.Context) {
	var rec *models.TaskRecord
	ginx.BindJSON(c, &rec)

	ginx.NewRender(c).Message(rec.Add(rt.Ctx))
}
func (rt *Router) taskAdd(c *gin.Context) {
var f taskForm
ginx.BindJSON(c, &f)

View File

@@ -12,19 +12,8 @@ import (
)
func (rt *Router) userFindAll(c *gin.Context) {
limit := ginx.QueryInt(c, "limit", 20)
query := ginx.QueryStr(c, "query", "")
total, err := models.UserTotal(rt.Ctx, query)
ginx.Dangerous(err)
list, err := models.UserGets(rt.Ctx, query, limit, ginx.Offset(c, limit))
ginx.Dangerous(err)
ginx.NewRender(c).Data(gin.H{
"list": list,
"total": total,
}, nil)
list, err := models.UserGetAll(rt.Ctx)
ginx.NewRender(c).Data(list, err)
}
func (rt *Router) userGets(c *gin.Context) {

View File

@@ -29,6 +29,17 @@ func (rt *Router) userGroupGets(c *gin.Context) {
ginx.NewRender(c).Data(lst, err)
}
// userGroupGetsByService returns all user groups; consumed by edge
// components syncing over the service API.
func (rt *Router) userGroupGetsByService(c *gin.Context) {
	groups, err := models.UserGroupGetAll(rt.Ctx)
	ginx.NewRender(c).Data(groups, err)
}
// userGroupMemberGetsByService returns all user-group membership rows;
// consumed by edge components syncing over the service API.
func (rt *Router) userGroupMemberGetsByService(c *gin.Context) {
	memberships, err := models.UserGroupMemberGetAll(rt.Ctx)
	ginx.NewRender(c).Data(memberships, err)
}
type userGroupForm struct {
Name string `json:"name" binding:"required"`
Note string `json:"note"`

View File

@@ -18,7 +18,7 @@ func Upgrade(configFile string) error {
return err
}
ctx := ctx.NewContext(context.Background(), db)
ctx := ctx.NewContext(context.Background(), db, false)
for _, cluster := range config.Clusters {
count, err := models.GetDatasourcesCountBy(ctx, "", "", cluster.Name)
if err != nil {

View File

@@ -14,20 +14,28 @@ import (
"github.com/ccfos/nightingale/v6/pkg/ormx"
"github.com/ccfos/nightingale/v6/pushgw/pconf"
"github.com/ccfos/nightingale/v6/storage"
"github.com/gin-gonic/gin"
)
type ConfigType struct {
Global GlobalConfig
Log logx.Config
HTTP httpx.Config
DB ormx.DBConfig
Redis storage.RedisConfig
Global GlobalConfig
Log logx.Config
HTTP httpx.Config
DB ormx.DBConfig
Redis storage.RedisConfig
CenterApi CenterApi
Pushgw pconf.Pushgw
Alert aconf.Alert
Center cconf.Center
}
// CenterApi describes how edge components (alert/pushgw) reach the center
// node's HTTP API: candidate addresses plus the basic-auth accounts used
// to authenticate the calls.
type CenterApi struct {
Addrs []string
BasicAuth gin.Accounts
}
type GlobalConfig struct {
RunMode string
}

62
etc/alert.toml.example Normal file
View File

@@ -0,0 +1,62 @@
[Global]
RunMode = "release"
[CenterApi]
Addrs = ["http://127.0.0.1:17000"]
[CenterApi.BasicAuth]
user001 = "ccc26da7b9aba533cbb263a36c07dcc5"
[Alert]
[Alert.Heartbeat]
# auto detect if blank
IP = ""
# unit ms
Interval = 1000
EngineName = "default"
[Log]
# log write dir
Dir = "logs"
# log level: DEBUG INFO WARNING ERROR
Level = "DEBUG"
# stdout, stderr, file
Output = "stdout"
# # rotate by time
# KeepHours = 4
# # rotate by size
# RotateNum = 3
# # unit: MB
# RotateSize = 256
[HTTP]
# http listening address
Host = "0.0.0.0"
# http listening port
Port = 17001
# https cert file path
CertFile = ""
# https key file path
KeyFile = ""
# whether print access log
PrintAccessLog = false
# whether enable pprof
PProf = false
# expose prometheus /metrics?
ExposeMetrics = true
# http graceful shutdown timeout, unit: s
ShutdownTimeout = 30
# max content length: 64M
MaxContentLength = 67108864
# http server read timeout, unit: s
ReadTimeout = 20
# http server write timeout, unit: s
WriteTimeout = 40
# http server idle timeout, unit: s
IdleTimeout = 120
[HTTP.Alert]
Enable = true
[HTTP.Alert.BasicAuth]
user001 = "ccc26da7b9aba533cbb263a36c07dcc5"

103
etc/pushgw.toml.example Normal file
View File

@@ -0,0 +1,103 @@
[Global]
RunMode = "release"
[CenterApi]
Addrs = ["http://127.0.0.1:17000"]
[CenterApi.BasicAuth]
user001 = "ccc26da7b9aba533cbb263a36c07dcc5"
[Pushgw]
# use target labels in database instead of in series
LabelRewrite = true
# # default busigroup key name
# BusiGroupLabelKey = "busigroup"
# ForceUseServerTS = false
# [Pushgw.DebugSample]
# ident = "xx"
# __name__ = "xx"
# [Pushgw.WriterOpt]
# # Writer Options
# QueueCount = 1000
# QueueMaxSize = 1000000
# QueuePopSize = 1000
# # ident or metric
# ShardingKey = "ident"
[[Pushgw.Writers]]
# Url = "http://127.0.0.1:8480/insert/0/prometheus/api/v1/write"
Url = "http://127.0.0.1:9090/api/v1/write"
# Basic auth username
BasicAuthUser = ""
# Basic auth password
BasicAuthPass = ""
# timeout settings, unit: ms
Headers = ["X-From", "n9e"]
Timeout = 10000
DialTimeout = 3000
TLSHandshakeTimeout = 30000
ExpectContinueTimeout = 1000
IdleConnTimeout = 90000
# time duration, unit: ms
KeepAlive = 30000
MaxConnsPerHost = 0
MaxIdleConns = 100
MaxIdleConnsPerHost = 100
## Optional TLS Config
# UseTLS = false
# TLSCA = "/etc/n9e/ca.pem"
# TLSCert = "/etc/n9e/cert.pem"
# TLSKey = "/etc/n9e/key.pem"
# InsecureSkipVerify = false
# [[Writers.WriteRelabels]]
# Action = "replace"
# SourceLabels = ["__address__"]
# Regex = "([^:]+)(?::\\d+)?"
# Replacement = "$1:80"
# TargetLabel = "__address__"
[Log]
# log write dir
Dir = "logs"
# log level: DEBUG INFO WARNING ERROR
Level = "DEBUG"
# stdout, stderr, file
Output = "stdout"
# # rotate by time
# KeepHours = 4
# # rotate by size
# RotateNum = 3
# # unit: MB
# RotateSize = 256
[HTTP]
# http listening address
Host = "0.0.0.0"
# http listening port
Port = 17000
# https cert file path
CertFile = ""
# https key file path
KeyFile = ""
# whether print access log
PrintAccessLog = false
# whether enable pprof
PProf = false
# expose prometheus /metrics?
ExposeMetrics = true
# http graceful shutdown timeout, unit: s
ShutdownTimeout = 30
# max content length: 64M
MaxContentLength = 67108864
# http server read timeout, unit: s
ReadTimeout = 20
# http server write timeout, unit: s
WriteTimeout = 40
# http server idle timeout, unit: s
IdleTimeout = 120
[HTTP.Pushgw]
Enable = true
# [HTTP.Pushgw.BasicAuth]
# user001 = "ccc26da7b9aba533cbb263a36c07dcc5"

View File

@@ -144,14 +144,17 @@ func (tc *TargetCacheType) syncTargets() error {
return errors.WithMessage(err, "failed to call TargetGetsAll")
}
metaMap := tc.GetHostMetas(lst)
m := make(map[string]*models.Target)
for i := 0; i < len(lst); i++ {
lst[i].FillTagsMap()
if meta, ok := metaMap[lst[i].Ident]; ok {
lst[i].FillMeta(meta)
if tc.ctx.IsCenter {
metaMap := tc.GetHostMetas(lst)
for i := 0; i < len(lst); i++ {
if meta, ok := metaMap[lst[i].Ident]; ok {
lst[i].FillMeta(meta)
}
}
}
for i := 0; i < len(lst); i++ {
m[lst[i].Ident] = lst[i]
}

View File

@@ -58,6 +58,17 @@ func (uc *UserCacheType) GetByUserId(id int64) *models.User {
return uc.users[id]
}
// GetByUsername returns the cached user whose Username equals name, or nil
// when no cached user matches. The cache is keyed by user id, so this is a
// linear scan over all entries under the read lock.
func (uc *UserCacheType) GetByUsername(name string) *models.User {
	uc.RLock()
	defer uc.RUnlock()
	for _, v := range uc.users {
		if v.Username == name {
			return v
		}
	}
	return nil
}
func (uc *UserCacheType) GetByUserIds(ids []int64) []*models.User {
set := make(map[int64]struct{})

View File

@@ -10,6 +10,7 @@ import (
"time"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/pkg/poster"
"github.com/ccfos/nightingale/v6/pkg/tplx"
"github.com/toolkits/pkg/logger"
)
@@ -220,7 +221,7 @@ func (e *AlertCurEvent) ToHis(ctx *ctx.Context) *AlertHisEvent {
}
}
func (e *AlertCurEvent) DB2FE(ctx *ctx.Context) {
func (e *AlertCurEvent) DB2FE() {
e.NotifyChannelsJSON = strings.Fields(e.NotifyChannels)
e.NotifyGroupsJSON = strings.Fields(e.NotifyGroups)
e.CallbacksJSON = strings.Fields(e.Callbacks)
@@ -229,6 +230,18 @@ func (e *AlertCurEvent) DB2FE(ctx *ctx.Context) {
json.Unmarshal([]byte(e.RuleConfig), &e.RuleConfigJson)
}
// FE2DB serializes the front-end (JSON) fields back into the flat string
// columns stored in the database: notify channels/groups and callbacks are
// space-joined, tags are joined with the ",," separator, and annotations and
// rule config are JSON-encoded.
// NOTE(review): Marshal errors are deliberately discarded — assumes these
// fields always marshal cleanly; confirm.
func (e *AlertCurEvent) FE2DB() {
	e.NotifyChannels = strings.Join(e.NotifyChannelsJSON, " ")
	e.NotifyGroups = strings.Join(e.NotifyGroupsJSON, " ")
	e.Callbacks = strings.Join(e.CallbacksJSON, " ")
	e.Tags = strings.Join(e.TagsJSON, ",,")
	b, _ := json.Marshal(e.AnnotationsJSON)
	e.Annotations = string(b)
	b, _ = json.Marshal(e.RuleConfigJson)
	e.RuleConfig = string(b)
}
func (e *AlertCurEvent) DB2Mem() {
e.IsRecovered = false
e.NotifyGroupsJSON = strings.Fields(e.NotifyGroups)
@@ -356,7 +369,7 @@ func AlertCurEventGets(ctx *ctx.Context, prods []string, bgid, stime, etime int6
if err == nil {
for i := 0; i < len(lst); i++ {
lst[i].DB2FE(ctx)
lst[i].DB2FE()
}
}
@@ -390,7 +403,7 @@ func AlertCurEventGet(ctx *ctx.Context, where string, args ...interface{}) (*Ale
return nil, nil
}
lst[0].DB2FE(ctx)
lst[0].DB2FE()
lst[0].FillNotifyGroups(ctx, make(map[int64]*UserGroup))
return lst[0], nil
@@ -435,14 +448,19 @@ func AlertCurEventGetByIds(ctx *ctx.Context, ids []int64) ([]*AlertCurEvent, err
err := DB(ctx).Where("id in ?", ids).Order("id desc").Find(&lst).Error
if err == nil {
for i := 0; i < len(lst); i++ {
lst[i].DB2FE(ctx)
lst[i].DB2FE()
}
}
return lst, err
}
func AlertCurEventGetByRuleIdAndCluster(ctx *ctx.Context, ruleId int64, datasourceId int64) ([]*AlertCurEvent, error) {
func AlertCurEventGetByRuleIdAndDsId(ctx *ctx.Context, ruleId int64, datasourceId int64) ([]*AlertCurEvent, error) {
if !ctx.IsCenter {
lst, err := poster.GetByUrls[[]*AlertCurEvent](ctx, "/v1/n9e/alert-cur-events-get-by-rid?rid="+strconv.FormatInt(ruleId, 10)+"&dsid="+strconv.FormatInt(datasourceId, 10))
return lst, err
}
var lst []*AlertCurEvent
err := DB(ctx).Where("rule_id=? and datasource_id = ?", ruleId, datasourceId).Find(&lst).Error
return lst, err

View File

@@ -2,6 +2,7 @@ package models
import (
"encoding/json"
"fmt"
"strconv"
"strings"
"time"
@@ -61,7 +62,7 @@ func (e *AlertHisEvent) Add(ctx *ctx.Context) error {
return Insert(ctx, e)
}
func (e *AlertHisEvent) DB2FE(ctx *ctx.Context) {
func (e *AlertHisEvent) DB2FE() {
e.NotifyChannelsJSON = strings.Fields(e.NotifyChannels)
e.NotifyGroupsJSON = strings.Fields(e.NotifyGroups)
e.CallbacksJSON = strings.Fields(e.Callbacks)
@@ -182,7 +183,7 @@ func AlertHisEventGets(ctx *ctx.Context, prods []string, bgid, stime, etime int6
if err == nil {
for i := 0; i < len(lst); i++ {
lst[i].DB2FE(ctx)
lst[i].DB2FE()
}
}
@@ -200,7 +201,7 @@ func AlertHisEventGet(ctx *ctx.Context, where string, args ...interface{}) (*Ale
return nil, nil
}
lst[0].DB2FE(ctx)
lst[0].DB2FE()
lst[0].FillNotifyGroups(ctx, make(map[int64]*UserGroup))
return lst[0], nil
@@ -259,3 +260,53 @@ func AlertHisEventUpgradeToV6(ctx *ctx.Context, dsm map[string]Datasource) error
}
return nil
}
// EventPersist writes an alert event to storage. Every event (firing or
// recovered) is appended to the history table; the current-events table is
// then reconciled so it holds at most one live row per event hash:
//   - existing row + firing event  -> row replaced (new row reuses the his id)
//   - existing row + recovery      -> row deleted
//   - no row + firing event        -> row inserted (reuses the his id)
//   - no row + recovery            -> nothing to do (unexpected state)
func EventPersist(ctx *ctx.Context, event *AlertCurEvent) error {
	has, err := AlertCurEventExists(ctx, "hash=?", event.Hash)
	if err != nil {
		return fmt.Errorf("event_persist_check_exists_fail: %v rule_id=%d hash=%s", err, event.RuleId, event.Hash)
	}
	his := event.ToHis(ctx)
	// Whether this is a firing alert or a recovery, it is always recorded in
	// the full history table.
	if err := his.Add(ctx); err != nil {
		return fmt.Errorf("add his event error:%v", err)
	}
	if has {
		// The active-alerts table already has a row for this hash: delete it.
		err = AlertCurEventDelByHash(ctx, event.Hash)
		if err != nil {
			return fmt.Errorf("event_del_cur_fail: %v hash=%s", err, event.Hash)
		}
		if !event.IsRecovered {
			// A recovery removes the event from the active list for good; a
			// firing event must be re-inserted as a fresh row.
			// use his id as cur id
			event.Id = his.Id
			if event.Id > 0 {
				if err := event.Add(ctx); err != nil {
					return fmt.Errorf("add cur event err:%v", err)
				}
			}
		}
		return nil
	}
	if event.IsRecovered {
		// No row in alert_cur_event means there was no prior alert, yet a
		// recovery arrived — odd; in theory this should never happen.
		return nil
	}
	// use his id as cur id
	event.Id = his.Id
	if event.Id > 0 {
		if err := event.Add(ctx); err != nil {
			return fmt.Errorf("add cur event error:%v", err)
		}
	}
	return nil
}

View File

@@ -9,6 +9,7 @@ import (
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/pkg/ormx"
"github.com/ccfos/nightingale/v6/pkg/poster"
"github.com/toolkits/pkg/logger"
"github.com/pkg/errors"
@@ -78,7 +79,15 @@ func AlertMuteGet(ctx *ctx.Context, where string, args ...interface{}) (*AlertMu
}
func AlertMuteGets(ctx *ctx.Context, prods []string, bgid int64, query string) (lst []AlertMute, err error) {
session := DB(ctx).Where("group_id = ? and prod in (?)", bgid, prods)
session := DB(ctx)
if bgid != -1 {
session = session.Where("group_id = ?", bgid)
}
if len(prods) > 0 {
session = session.Where("prod in (?)", prods)
}
if query != "" {
arr := strings.Fields(query)
@@ -220,6 +229,12 @@ func AlertMuteDel(ctx *ctx.Context, ids []int64) error {
}
func AlertMuteStatistics(ctx *ctx.Context) (*Statistics, error) {
var stats []*Statistics
if !ctx.IsCenter {
s, err := poster.GetByUrls[*Statistics](ctx, "/v1/n9e/statistic?name=alert_mute")
return s, err
}
// clean expired first
buf := int64(30)
err := DB(ctx).Where("etime < ? and mute_time_type = 0", time.Now().Unix()-buf).Delete(new(AlertMute)).Error
@@ -229,7 +244,6 @@ func AlertMuteStatistics(ctx *ctx.Context) (*Statistics, error) {
session := DB(ctx).Model(&AlertMute{}).Select("count(*) as total", "max(update_at) as last_updated")
var stats []*Statistics
err = session.Find(&stats).Error
if err != nil {
return nil, err
@@ -240,9 +254,20 @@ func AlertMuteStatistics(ctx *ctx.Context) (*Statistics, error) {
func AlertMuteGetsAll(ctx *ctx.Context) ([]*AlertMute, error) {
// get my cluster's mutes
var lst []*AlertMute
if !ctx.IsCenter {
lst, err := poster.GetByUrls[[]*AlertMute](ctx, "/v1/n9e/alert-mutes")
if err != nil {
return nil, err
}
for i := 0; i < len(lst); i++ {
lst[i].FE2DB()
}
return lst, err
}
session := DB(ctx).Model(&AlertMute{})
var lst []*AlertMute
err := session.Find(&lst).Error
if err != nil {
return nil, err

View File

@@ -8,6 +8,7 @@ import (
"time"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/pkg/poster"
"github.com/pkg/errors"
"github.com/toolkits/pkg/logger"
@@ -643,6 +644,17 @@ func AlertRuleGets(ctx *ctx.Context, groupId int64) ([]AlertRule, error) {
}
func AlertRuleGetsAll(ctx *ctx.Context) ([]*AlertRule, error) {
if !ctx.IsCenter {
lst, err := poster.GetByUrls[[]*AlertRule](ctx, "/v1/n9e/alert-rules?disabled=0")
if err != nil {
return nil, err
}
for i := 0; i < len(lst); i++ {
lst[i].FE2DB()
}
return lst, err
}
session := DB(ctx).Where("disabled = ?", 0)
var lst []*AlertRule
@@ -662,7 +674,11 @@ func AlertRuleGetsAll(ctx *ctx.Context) ([]*AlertRule, error) {
}
func AlertRulesGetsBy(ctx *ctx.Context, prods []string, query, algorithm, cluster string, cates []string, disabled int) ([]*AlertRule, error) {
session := DB(ctx).Where("prod in (?)", prods)
session := DB(ctx)
if len(prods) > 0 {
session = session.Where("prod in (?)", prods)
}
if query != "" {
arr := strings.Fields(query)
@@ -734,6 +750,11 @@ func AlertRuleGetName(ctx *ctx.Context, id int64) (string, error) {
}
func AlertRuleStatistics(ctx *ctx.Context) (*Statistics, error) {
if !ctx.IsCenter {
s, err := poster.GetByUrls[*Statistics](ctx, "/v1/n9e/statistic?name=alert_rule")
return s, err
}
session := DB(ctx).Model(&AlertRule{}).Select("count(*) as total", "max(update_at) as last_updated").Where("disabled = ?", 0)
var stats []*Statistics

View File

@@ -9,6 +9,7 @@ import (
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/pkg/ormx"
"github.com/ccfos/nightingale/v6/pkg/poster"
"github.com/pkg/errors"
"github.com/toolkits/pkg/logger"
)
@@ -52,6 +53,18 @@ func AlertSubscribeGets(ctx *ctx.Context, groupId int64) (lst []AlertSubscribe,
return
}
// AlertSubscribeGetsByService returns every alert-subscription row,
// unfiltered, with each entry converted to its front-end form via DB2FE.
func AlertSubscribeGetsByService(ctx *ctx.Context) (lst []AlertSubscribe, err error) {
	err = DB(ctx).Find(&lst).Error
	if err != nil {
		return
	}
	for i := range lst {
		lst[i].DB2FE()
	}
	return
}
func AlertSubscribeGet(ctx *ctx.Context, where string, args ...interface{}) (*AlertSubscribe, error) {
var lst []*AlertSubscribe
err := DB(ctx).Where(where, args...).Find(&lst).Error
@@ -262,6 +275,11 @@ func AlertSubscribeDel(ctx *ctx.Context, ids []int64) error {
}
func AlertSubscribeStatistics(ctx *ctx.Context) (*Statistics, error) {
if !ctx.IsCenter {
s, err := poster.GetByUrls[*Statistics](ctx, "/v1/n9e/statistic?name=alert_subscribe")
return s, err
}
session := DB(ctx).Model(&AlertSubscribe{}).Select("count(*) as total", "max(update_at) as last_updated")
var stats []*Statistics
@@ -274,6 +292,17 @@ func AlertSubscribeStatistics(ctx *ctx.Context) (*Statistics, error) {
}
func AlertSubscribeGetsAll(ctx *ctx.Context) ([]*AlertSubscribe, error) {
if !ctx.IsCenter {
lst, err := poster.GetByUrls[[]*AlertSubscribe](ctx, "/v1/n9e/alert-subscribes")
if err != nil {
return nil, err
}
for i := 0; i < len(lst); i++ {
lst[i].FE2DB()
}
return lst, err
}
// get my cluster's subscribes
session := DB(ctx).Model(&AlertSubscribe{})

View File

@@ -5,6 +5,7 @@ import (
"time"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/pkg/poster"
)
type AlertingEngines struct {
@@ -128,7 +129,23 @@ func AlertingEngineGetsInstances(ctx *ctx.Context, where string, args ...interfa
return arr, err
}
// HeartbeatInfo is the payload an edge alerting engine posts to the center's
// /v1/n9e/server-heartbeat endpoint, identifying the reporting instance and
// the engine cluster / datasource it serves.
type HeartbeatInfo struct {
	Instance      string `json:"instance"`
	EngineCluster string `json:"engine_cluster"`
	DatasourceId  int64  `json:"datasource_id"`
}
func AlertingEngineHeartbeatWithCluster(ctx *ctx.Context, instance, cluster string, datasourceId int64) error {
if !ctx.IsCenter {
info := HeartbeatInfo{
Instance: instance,
EngineCluster: cluster,
DatasourceId: datasourceId,
}
err := poster.PostByUrls(ctx, "/v1/n9e/server-heartbeat", info)
return err
}
var total int64
err := DB(ctx).Model(new(AlertingEngines)).Where("instance=? and engine_cluster = ? and datasource_id=?", instance, cluster, datasourceId).Count(&total).Error
if err != nil {

View File

@@ -5,6 +5,7 @@ import (
"time"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/pkg/poster"
"github.com/pkg/errors"
"gorm.io/gorm"
@@ -64,9 +65,17 @@ func (bg *BusiGroup) FillUserGroups(ctx *ctx.Context) error {
func BusiGroupGetMap(ctx *ctx.Context) (map[int64]*BusiGroup, error) {
var lst []*BusiGroup
err := DB(ctx).Find(&lst).Error
if err != nil {
return nil, err
var err error
if !ctx.IsCenter {
lst, err = poster.GetByUrls[[]*BusiGroup](ctx, "/v1/n9e/busi-groups")
if err != nil {
return nil, err
}
} else {
err = DB(ctx).Find(&lst).Error
if err != nil {
return nil, err
}
}
ret := make(map[int64]*BusiGroup)
@@ -77,6 +86,12 @@ func BusiGroupGetMap(ctx *ctx.Context) (map[int64]*BusiGroup, error) {
return ret, nil
}
// BusiGroupGetAll returns every business-group row from the database,
// unfiltered.
func BusiGroupGetAll(ctx *ctx.Context) ([]*BusiGroup, error) {
	var lst []*BusiGroup
	err := DB(ctx).Find(&lst).Error
	return lst, err
}
func BusiGroupGet(ctx *ctx.Context, where string, args ...interface{}) (*BusiGroup, error) {
var lst []*BusiGroup
err := DB(ctx).Where(where, args...).Find(&lst).Error
@@ -324,6 +339,11 @@ func BusiGroupAdd(ctx *ctx.Context, name string, labelEnable int, labelValue str
}
func BusiGroupStatistics(ctx *ctx.Context) (*Statistics, error) {
if !ctx.IsCenter {
s, err := poster.GetByUrls[*Statistics](ctx, "/v1/n9e/statistic?name=busi_group")
return s, err
}
session := DB(ctx).Model(&BusiGroup{}).Select("count(*) as total", "max(update_at) as last_updated")
var stats []*Statistics

View File

@@ -46,6 +46,18 @@ type Statistics struct {
LastUpdated int64 `gorm:"last_updated"`
}
// StatisticsGet returns the row count and the most recent update_at value for
// the table backing model. It assumes the table has an update_at column.
func StatisticsGet[T any](ctx *ctx.Context, model T) (*Statistics, error) {
	var stats []*Statistics
	session := DB(ctx).Model(model).Select("count(*) as total", "max(update_at) as last_updated")
	err := session.Find(&stats).Error
	if err != nil {
		return nil, err
	}
	// An aggregate query normally yields exactly one row, but guard the
	// stats[0] access so an empty result can never panic with an
	// index-out-of-range.
	if len(stats) == 0 {
		return &Statistics{}, nil
	}
	return stats[0], nil
}
func MatchDatasource(ids []int64, id int64) bool {
if id == DatasourceIdAll {
return true

View File

@@ -7,6 +7,7 @@ import (
"time"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/pkg/poster"
"github.com/pkg/errors"
"github.com/toolkits/pkg/runner"
@@ -43,6 +44,13 @@ func InitSalt(ctx *ctx.Context) {
}
func ConfigsGet(ctx *ctx.Context, ckey string) (string, error) {
if !ctx.IsCenter {
if !ctx.IsCenter {
s, err := poster.GetByUrls[string](ctx, "/v1/n9e/config?key="+ckey)
return s, err
}
}
var lst []string
err := DB(ctx).Model(&Configs{}).Where("ckey=?", ckey).Pluck("cval", &lst).Error
if err != nil {

View File

@@ -7,6 +7,7 @@ import (
"time"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/pkg/poster"
"github.com/pkg/errors"
"github.com/toolkits/pkg/logger"
"github.com/toolkits/pkg/str"
@@ -112,6 +113,17 @@ func (ds *Datasource) Get(ctx *ctx.Context) error {
}
func GetDatasources(ctx *ctx.Context) ([]Datasource, error) {
if !ctx.IsCenter {
lst, err := poster.GetByUrls[[]Datasource](ctx, "/v1/n9e/datasources")
if err != nil {
return nil, err
}
for i := 0; i < len(lst); i++ {
lst[i].FE2DB()
}
return lst, nil
}
var dss []Datasource
err := DB(ctx).Find(&dss).Error
@@ -123,6 +135,11 @@ func GetDatasources(ctx *ctx.Context) ([]Datasource, error) {
}
func GetDatasourceIdsByEngineName(ctx *ctx.Context, engineName string) ([]int64, error) {
if !ctx.IsCenter {
lst, err := poster.GetByUrls[[]int64](ctx, "/v1/n9e/datasource-ids?name="+engineName)
return lst, err
}
var dss []Datasource
var ids []int64
err := DB(ctx).Where("cluster_name = ?", engineName).Find(&dss).Error
@@ -267,19 +284,32 @@ func (ds *Datasource) DB2FE() error {
func DatasourceGetMap(ctx *ctx.Context) (map[int64]*Datasource, error) {
var lst []*Datasource
err := DB(ctx).Find(&lst).Error
if err != nil {
return nil, err
var err error
if !ctx.IsCenter {
lst, err = poster.GetByUrls[[]*Datasource](ctx, "/v1/n9e/datasources")
if err != nil {
return nil, err
}
for i := 0; i < len(lst); i++ {
lst[i].FE2DB()
}
} else {
err := DB(ctx).Find(&lst).Error
if err != nil {
return nil, err
}
for i := 0; i < len(lst); i++ {
err := lst[i].DB2FE()
if err != nil {
logger.Warningf("get ds:%+v err:%v", lst[i], err)
continue
}
}
}
ret := make(map[int64]*Datasource)
for i := 0; i < len(lst); i++ {
err := lst[i].DB2FE()
if err != nil {
logger.Warningf("get ds:%+v err:%v", lst[i], err)
continue
}
ret[lst[i].Id] = lst[i]
}
@@ -287,6 +317,11 @@ func DatasourceGetMap(ctx *ctx.Context) (map[int64]*Datasource, error) {
}
func DatasourceStatistics(ctx *ctx.Context) (*Statistics, error) {
if !ctx.IsCenter {
s, err := poster.GetByUrls[*Statistics](ctx, "/v1/n9e/statistic?name=datasource")
return s, err
}
session := DB(ctx).Model(&Datasource{}).Select("count(*) as total", "max(updated_at) as last_updated")
var stats []*Statistics

View File

@@ -8,6 +8,7 @@ import (
"strings"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/pkg/poster"
"github.com/ccfos/nightingale/v6/pkg/tplx"
"github.com/pkg/errors"
@@ -59,6 +60,11 @@ func NotifyTplCountByChannel(c *ctx.Context, channel string) (int64, error) {
}
func NotifyTplGets(c *ctx.Context) ([]*NotifyTpl, error) {
if !c.IsCenter {
lst, err := poster.GetByUrls[[]*NotifyTpl](c, "/v1/n9e/notify-tpls")
return lst, err
}
var lst []*NotifyTpl
err := DB(c).Find(&lst).Error
return lst, err
@@ -88,6 +94,10 @@ func ListTpls(c *ctx.Context) (map[string]*template.Template, error) {
}
func InitNotifyConfig(c *ctx.Context, tplDir string) {
if !c.IsCenter {
return
}
// init notify channel
cval, err := ConfigsGet(c, NOTIFYCHANNEL)
if err != nil {

View File

@@ -7,6 +7,8 @@ import (
"time"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/pkg/poster"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/toolkits/pkg/logger"
@@ -197,7 +199,33 @@ func RecordingRuleGetById(ctx *ctx.Context, id int64) (*RecordingRule, error) {
return RecordingRuleGet(ctx, "id=?", id)
}
// RecordingRuleEnabledGets returns all recording rules whose disabled flag is
// 0, with each row converted to its front-end form via DB2FE.
func RecordingRuleEnabledGets(ctx *ctx.Context) ([]*RecordingRule, error) {
	session := DB(ctx)
	var lst []*RecordingRule
	err := session.Where("disabled = ?", 0).Find(&lst).Error
	if err != nil {
		return lst, err
	}
	for i := 0; i < len(lst); i++ {
		lst[i].DB2FE(ctx)
	}
	return lst, nil
}
func RecordingRuleGetsByCluster(ctx *ctx.Context) ([]*RecordingRule, error) {
if !ctx.IsCenter {
lst, err := poster.GetByUrls[[]*RecordingRule](ctx, "/v1/n9e/recording-rules")
if err != nil {
return nil, err
}
for i := 0; i < len(lst); i++ {
lst[i].FE2DB()
}
return lst, err
}
session := DB(ctx).Where("disabled = ?", 0)
var lst []*RecordingRule
@@ -217,6 +245,11 @@ func RecordingRuleGetsByCluster(ctx *ctx.Context) ([]*RecordingRule, error) {
}
func RecordingRuleStatistics(ctx *ctx.Context) (*Statistics, error) {
if !ctx.IsCenter {
s, err := poster.GetByUrls[*Statistics](ctx, "/v1/n9e/statistic?name=recording_rule")
return s, err
}
session := DB(ctx).Model(&RecordingRule{}).Select("count(*) as total", "max(update_at) as last_updated")
var stats []*Statistics

View File

@@ -6,6 +6,7 @@ import (
"time"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/pkg/poster"
"github.com/pkg/errors"
"gorm.io/gorm"
@@ -19,7 +20,7 @@ type Target struct {
Note string `json:"note"`
Tags string `json:"-"`
TagsJSON []string `json:"tags" gorm:"-"`
TagsMap map[string]string `json:"-" gorm:"-"` // internal use, append tags to series
TagsMap map[string]string `json:"tags_maps" gorm:"-"` // internal use, append tags to series
UpdateAt int64 `json:"update_at"`
UnixTime int64 `json:"unixtime" gorm:"-"`
@@ -59,6 +60,11 @@ func (t *Target) FillGroup(ctx *ctx.Context, cache map[int64]*BusiGroup) error {
}
func TargetStatistics(ctx *ctx.Context) (*Statistics, error) {
if !ctx.IsCenter {
s, err := poster.GetByUrls[*Statistics](ctx, "/v1/n9e/statistic?name=target")
return s, err
}
var stats []*Statistics
err := DB(ctx).Model(&Target{}).Select("count(*) as total", "max(update_at) as last_updated").Find(&stats).Error
if err != nil {
@@ -164,8 +170,16 @@ func TargetFilterQueryBuild(ctx *ctx.Context, query map[string]interface{}, limi
}
func TargetGetsAll(ctx *ctx.Context) ([]*Target, error) {
if !ctx.IsCenter {
lst, err := poster.GetByUrls[[]*Target](ctx, "/v1/n9e/targets")
return lst, err
}
var lst []*Target
err := DB(ctx).Model(&Target{}).Find(&lst).Error
for i := 0; i < len(lst); i++ {
lst[i].FillTagsMap()
}
return lst, err
}

View File

@@ -1,6 +1,9 @@
package models
import "github.com/ccfos/nightingale/v6/pkg/ctx"
import (
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/pkg/poster"
)
type TaskRecord struct {
Id int64 `json:"id" gorm:"primaryKey"`
@@ -27,6 +30,11 @@ func (r *TaskRecord) TableName() string {
// Add persists the task record. On a non-center (edge) node the record is
// forwarded to the center via the /v1/n9e/task-record-add API instead of
// being inserted into the local database.
func (r *TaskRecord) Add(ctx *ctx.Context) error {
	if !ctx.IsCenter {
		err := poster.PostByUrls(ctx, "/v1/n9e/task-record-add", r)
		return err
	}
	return Insert(ctx, r)
}

View File

@@ -9,6 +9,7 @@ import (
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/pkg/ldapx"
"github.com/ccfos/nightingale/v6/pkg/ormx"
"github.com/ccfos/nightingale/v6/pkg/poster"
"github.com/pkg/errors"
"github.com/tidwall/gjson"
@@ -347,12 +348,18 @@ func UserGets(ctx *ctx.Context, query string, limit, offset int) ([]User, error)
for i := 0; i < len(users); i++ {
users[i].RolesLst = strings.Fields(users[i].Roles)
users[i].Admin = users[i].IsAdmin()
users[i].Password = ""
}
return users, nil
}
func UserGetAll(ctx *ctx.Context) ([]*User, error) {
if !ctx.IsCenter {
lst, err := poster.GetByUrls[[]*User](ctx, "/v1/n9e/users")
return lst, err
}
var lst []*User
err := DB(ctx).Find(&lst).Error
if err == nil {
@@ -429,6 +436,11 @@ func (u *User) CheckPerm(ctx *ctx.Context, operation string) (bool, error) {
}
func UserStatistics(ctx *ctx.Context) (*Statistics, error) {
if !ctx.IsCenter {
s, err := poster.GetByUrls[*Statistics](ctx, "/v1/n9e/statistic?name=user")
return s, err
}
session := DB(ctx).Model(&User{}).Select("count(*) as total", "max(update_at) as last_updated")
var stats []*Statistics

View File

@@ -4,6 +4,8 @@ import (
"time"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/pkg/poster"
"github.com/pkg/errors"
"github.com/toolkits/pkg/str"
"gorm.io/gorm"
@@ -111,6 +113,11 @@ func UserGroupGetByIds(ctx *ctx.Context, ids []int64) ([]UserGroup, error) {
}
func UserGroupGetAll(ctx *ctx.Context) ([]*UserGroup, error) {
if !ctx.IsCenter {
lst, err := poster.GetByUrls[[]*UserGroup](ctx, "/v1/n9e/users")
return lst, err
}
var lst []*UserGroup
err := DB(ctx).Find(&lst).Error
return lst, err
@@ -139,6 +146,11 @@ func (ug *UserGroup) DelMembers(ctx *ctx.Context, userIds []int64) error {
}
func UserGroupStatistics(ctx *ctx.Context) (*Statistics, error) {
if !ctx.IsCenter {
s, err := poster.GetByUrls[*Statistics](ctx, "/v1/n9e/statistic?name=user_group")
return s, err
}
session := DB(ctx).Model(&UserGroup{}).Select("count(*) as total", "max(update_at) as last_updated")
var stats []*Statistics

View File

@@ -1,6 +1,9 @@
package models
import "github.com/ccfos/nightingale/v6/pkg/ctx"
import (
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/pkg/poster"
)
type UserGroupMember struct {
GroupId int64
@@ -54,8 +57,13 @@ func UserGroupMemberDel(ctx *ctx.Context, groupId int64, userIds []int64) error
return DB(ctx).Where("group_id = ? and user_id in ?", groupId, userIds).Delete(&UserGroupMember{}).Error
}
func UserGroupMemberGetAll(ctx *ctx.Context) ([]UserGroupMember, error) {
var lst []UserGroupMember
func UserGroupMemberGetAll(ctx *ctx.Context) ([]*UserGroupMember, error) {
if !ctx.IsCenter {
lst, err := poster.GetByUrls[[]*UserGroupMember](ctx, "/v1/n9e/user-group-members")
return lst, err
}
var lst []*UserGroupMember
err := DB(ctx).Find(&lst).Error
return lst, err
}

View File

@@ -3,18 +3,29 @@ package ctx
import (
"context"
"github.com/ccfos/nightingale/v6/conf"
"gorm.io/gorm"
)
type Context struct {
DB *gorm.DB
Ctx context.Context
DB *gorm.DB
CenterApi conf.CenterApi
Ctx context.Context
IsCenter bool
}
func NewContext(ctx context.Context, db *gorm.DB) *Context {
func NewContext(ctx context.Context, db *gorm.DB, isCenter bool, centerApis ...conf.CenterApi) *Context {
var api conf.CenterApi
if len(centerApis) > 0 {
api = centerApis[0]
}
return &Context{
Ctx: ctx,
DB: db,
Ctx: ctx,
DB: db,
CenterApi: api,
IsCenter: isCenter,
}
}

View File

@@ -2,14 +2,160 @@ package poster
import (
"bytes"
"encoding/base64"
"encoding/json"
"fmt"
"io/ioutil"
"math/rand"
"net/http"
"time"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/gin-gonic/gin"
"github.com/toolkits/pkg/logger"
"github.com/toolkits/pkg/net/httplib"
)
type DataResponse[T any] struct {
Dat T `json:"dat"`
Err string `json:"err"`
}
// GetByUrls fetches path from the configured center addresses, trying them in
// random order and returning the first successful result. On total failure it
// returns the zero value of T together with the last error observed.
func GetByUrls[T any](ctx *ctx.Context, path string) (T, error) {
	var err error
	// Shuffle a private copy: shuffling ctx.CenterApi.Addrs in place would
	// mutate shared state and race with concurrent callers.
	addrs := make([]string, len(ctx.CenterApi.Addrs))
	copy(addrs, ctx.CenterApi.Addrs)
	rand.Shuffle(len(addrs), func(i, j int) { addrs[i], addrs[j] = addrs[j], addrs[i] })
	for _, addr := range addrs {
		url := fmt.Sprintf("%s%s", addr, path)
		dat, e := GetByUrl[T](url, ctx.CenterApi.BasicAuth)
		if e != nil {
			err = e
			logger.Warningf("failed to get data from center, url: %s, err: %v", url, err)
			continue
		}
		return dat, nil
	}
	var dat T
	return dat, err
}
// GetByUrl performs a GET against url with a 3-second timeout, optionally
// attaching an HTTP basic-auth header built from basicAuth, then decodes the
// body as a DataResponse[T] and returns its Dat payload. A non-200 status or
// a non-empty server-side Err field is surfaced as an error; on any error the
// zero value of T is returned.
func GetByUrl[T any](url string, basicAuth gin.Accounts) (T, error) {
	var dat T
	req := httplib.Get(url).SetTimeout(time.Duration(3000) * time.Millisecond)
	if len(basicAuth) > 0 {
		var token string
		// NOTE(review): gin.Accounts is a map, so if it ever holds more than
		// one pair the credential used here is arbitrary — confirm a single
		// account is always configured.
		for username, password := range basicAuth {
			token = base64.StdEncoding.EncodeToString([]byte(username + ":" + password))
		}
		if len(token) > 0 {
			req = req.Header("Authorization", "Basic "+token)
		}
	}
	resp, err := req.Response()
	if err != nil {
		return dat, fmt.Errorf("failed to fetch from url: %w", err)
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return dat, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
	}
	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return dat, fmt.Errorf("failed to read response body: %w", err)
	}
	var dataResp DataResponse[T]
	err = json.Unmarshal(body, &dataResp)
	if err != nil {
		return dat, fmt.Errorf("failed to decode response: %w", err)
	}
	if dataResp.Err != "" {
		return dat, fmt.Errorf("error from server: %s", dataResp.Err)
	}
	logger.Debugf("get data from %s, data: %+v", url, dataResp.Dat)
	return dataResp.Dat, nil
}
type PostResponse struct {
Err string `json:"err"`
}
// PostByUrls posts v (JSON-encoded) to path on the configured center
// addresses, trying them in random order and stopping at the first success.
// It returns the last error when every address fails, and nil when the
// address list is empty.
func PostByUrls(ctx *ctx.Context, path string, v interface{}) (err error) {
	// Shuffle a private copy: shuffling ctx.CenterApi.Addrs in place would
	// mutate shared state and race with concurrent callers.
	addrs := make([]string, len(ctx.CenterApi.Addrs))
	copy(addrs, ctx.CenterApi.Addrs)
	rand.Shuffle(len(addrs), func(i, j int) { addrs[i], addrs[j] = addrs[j], addrs[i] })
	for _, addr := range addrs {
		url := fmt.Sprintf("%s%s", addr, path)
		err = PostByUrl(url, ctx.CenterApi.BasicAuth, v)
		if err == nil {
			return
		}
	}
	return
}
// PostByUrl JSON-encodes v and POSTs it to url with a 10-second timeout,
// optionally attaching an HTTP basic-auth header built from basicAuth. A
// non-200 status or a non-empty Err field in the decoded PostResponse is
// surfaced as an error.
func PostByUrl(url string, basicAuth gin.Accounts, v interface{}) (err error) {
	var bs []byte
	bs, err = json.Marshal(v)
	if err != nil {
		return
	}
	bf := bytes.NewBuffer(bs)
	client := http.Client{
		Timeout: 10 * time.Second,
	}
	req, err := http.NewRequest("POST", url, bf)
	// Bug fix: the error was previously ignored; a malformed url would leave
	// req nil and panic on the Header.Set below.
	if err != nil {
		return fmt.Errorf("failed to build request: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")
	if len(basicAuth) > 0 {
		var token string
		// NOTE(review): gin.Accounts is a map; with more than one pair the
		// credential used is arbitrary — confirm a single account is always
		// configured.
		for username, password := range basicAuth {
			token = base64.StdEncoding.EncodeToString([]byte(username + ":" + password))
		}
		if len(token) > 0 {
			req.Header.Set("Authorization", "Basic "+token)
		}
	}
	resp, err := client.Do(req)
	if err != nil {
		return fmt.Errorf("failed to fetch from url: %w", err)
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
	}
	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return fmt.Errorf("failed to read response body: %w", err)
	}
	var dataResp PostResponse
	err = json.Unmarshal(body, &dataResp)
	if err != nil {
		return fmt.Errorf("failed to decode response: %w body:%s", err, string(body))
	}
	if dataResp.Err != "" {
		return fmt.Errorf("error from server: %s", dataResp.Err)
	}
	return nil
}
func PostJSON(url string, timeout time.Duration, v interface{}, retries ...int) (response []byte, code int, err error) {
var bs []byte

View File

@@ -42,7 +42,7 @@ func (w WriterType) Write(items []*prompb.TimeSeries, headers ...map[string]stri
}
if err := w.Post(snappy.Encode(nil, data), headers...); err != nil {
logger.Warningf("post to %s got error: %v", w.Opts.Url, err)
logger.Warningf("%v post to %s got error: %v", w.Opts, w.Opts.Url, err)
logger.Debug("example timeseries:", items[0].String())
}
}

View File

@@ -30,6 +30,10 @@ func (po *PromOption) Equal(target PromOption) bool {
return false
}
if po.WriteAddr != target.WriteAddr {
return false
}
if po.Timeout != target.Timeout {
return false
}

View File

@@ -10,6 +10,7 @@ import (
"github.com/ccfos/nightingale/v6/alert/aconf"
"github.com/ccfos/nightingale/v6/models"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/pkg/poster"
"github.com/ccfos/nightingale/v6/pkg/prom"
"github.com/prometheus/client_golang/api"
@@ -42,11 +43,25 @@ type PromSetting struct {
}
func (pc *PromClientMap) loadFromDatabase() {
datasources, err := models.GetDatasourcesGetsBy(pc.ctx, models.PROMETHEUS, "", "", "")
if err != nil {
logger.Errorf("failed to get datasources, error: %v", err)
return
var datasources []*models.Datasource
var err error
if !pc.ctx.IsCenter {
datasources, err = poster.GetByUrls[[]*models.Datasource](pc.ctx, "/v1/n9e/datasources?typ="+models.PROMETHEUS)
if err != nil {
logger.Errorf("failed to get datasources, error: %v", err)
return
}
for i := 0; i < len(datasources); i++ {
datasources[i].FE2DB()
}
} else {
datasources, err = models.GetDatasourcesGetsBy(pc.ctx, models.PROMETHEUS, "", "", "")
if err != nil {
logger.Errorf("failed to get datasources, error: %v", err)
return
}
}
newCluster := make(map[int64]struct{})
for _, ds := range datasources {
dsId := ds.Id

View File

@@ -4,21 +4,23 @@ import (
"sync"
"time"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/pkg/poster"
"github.com/toolkits/pkg/logger"
"github.com/toolkits/pkg/slice"
"gorm.io/gorm"
)
type Set struct {
sync.Mutex
items map[string]struct{}
db *gorm.DB
ctx *ctx.Context
}
func New(db *gorm.DB) *Set {
func New(ctx *ctx.Context) *Set {
set := &Set{
items: make(map[string]struct{}),
db: db,
ctx: ctx,
}
set.Init()
@@ -68,7 +70,7 @@ func (s *Set) updateTimestamp(items map[string]struct{}) {
lst = append(lst, ident)
num++
if num == 100 {
if err := s.updateTargets(lst, now); err != nil {
if err := s.UpdateTargets(lst, now); err != nil {
logger.Errorf("failed to update targets: %v", err)
}
lst = lst[:0]
@@ -76,18 +78,32 @@ func (s *Set) updateTimestamp(items map[string]struct{}) {
}
}
if err := s.updateTargets(lst, now); err != nil {
if err := s.UpdateTargets(lst, now); err != nil {
logger.Errorf("failed to update targets: %v", err)
}
}
func (s *Set) updateTargets(lst []string, now int64) error {
type TargetUpdate struct {
Lst []string `json:"lst"`
Now int64 `json:"now"`
}
func (s *Set) UpdateTargets(lst []string, now int64) error {
if !s.ctx.IsCenter {
t := TargetUpdate{
Lst: lst,
Now: now,
}
err := poster.PostByUrls(s.ctx, "/v1/n9e/target-update", t)
return err
}
count := int64(len(lst))
if count == 0 {
return nil
}
ret := s.db.Table("target").Where("ident in ?", lst).Update("update_at", now)
ret := s.ctx.DB.Table("target").Where("ident in ?", lst).Update("update_at", now)
if ret.Error != nil {
return ret.Error
}
@@ -98,14 +114,14 @@ func (s *Set) updateTargets(lst []string, now int64) error {
// there are some idents not found in db, so insert them
var exists []string
err := s.db.Table("target").Where("ident in ?", lst).Pluck("ident", &exists).Error
err := s.ctx.DB.Table("target").Where("ident in ?", lst).Pluck("ident", &exists).Error
if err != nil {
return err
}
news := slice.SubString(lst, exists)
for i := 0; i < len(news); i++ {
err = s.db.Exec("INSERT INTO target(ident, update_at) VALUES(?, ?)", news[i], now).Error
err = s.ctx.DB.Exec("INSERT INTO target(ident, update_at) VALUES(?, ?)", news[i], now).Error
if err != nil {
logger.Error("failed to insert target:", news[i], "error:", err)
}

View File

@@ -12,7 +12,6 @@ import (
"github.com/ccfos/nightingale/v6/pushgw/idents"
"github.com/ccfos/nightingale/v6/pushgw/router"
"github.com/ccfos/nightingale/v6/pushgw/writer"
"github.com/ccfos/nightingale/v6/storage"
)
type PushgwProvider struct {
@@ -31,13 +30,9 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
return nil, err
}
db, err := storage.New(config.DB)
if err != nil {
return nil, err
}
ctx := ctx.NewContext(context.Background(), db)
ctx := ctx.NewContext(context.Background(), nil, false, config.CenterApi)
idents := idents.New(db)
idents := idents.New(ctx)
stats := memsto.NewSyncStats()

View File

@@ -59,10 +59,12 @@ func (rt *Router) Config(r *gin.Engine) {
r.POST("/opentsdb/put", auth, rt.openTSDBPut)
r.POST("/openfalcon/push", auth, rt.falconPush)
r.POST("/prometheus/v1/write", auth, rt.remoteWrite)
r.POST("/v1/n9e/target-update", auth, rt.targetUpdate)
} else {
// no need basic auth
r.POST("/opentsdb/put", rt.openTSDBPut)
r.POST("/openfalcon/push", rt.falconPush)
r.POST("/prometheus/v1/write", rt.remoteWrite)
r.POST("/v1/n9e/target-update", rt.targetUpdate)
}
}

View File

@@ -0,0 +1,14 @@
package router
import (
"github.com/ccfos/nightingale/v6/pushgw/idents"
"github.com/gin-gonic/gin"
"github.com/toolkits/pkg/ginx"
)
// targetUpdate handles POST /v1/n9e/target-update: an edge node forwards the
// idents it has seen so the center refreshes their update_at timestamps (and
// inserts unknown idents) via IdentSet.UpdateTargets.
func (rt *Router) targetUpdate(c *gin.Context) {
	var f idents.TargetUpdate
	ginx.BindJSON(c, &f)
	ginx.NewRender(c).Message(rt.IdentSet.UpdateTargets(f.Lst, f.Now))
}