mirror of
https://github.com/ccfos/nightingale.git
synced 2026-03-02 22:19:10 +00:00
Compare commits
17 Commits
v8.2.2
...
get-config
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ec2ed3dedf | ||
|
|
053a5b860b | ||
|
|
c1eafd35f2 | ||
|
|
b0e9ac15d8 | ||
|
|
987c0c86af | ||
|
|
b75cd7a16f | ||
|
|
a57fe3231f | ||
|
|
9870fdce4c | ||
|
|
a7b9323318 | ||
|
|
a38e87d29c | ||
|
|
b2669d3a38 | ||
|
|
806d50c5f6 | ||
|
|
7b664323d3 | ||
|
|
23189f81e3 | ||
|
|
edfdb5b8fe | ||
|
|
43f6266c2f | ||
|
|
77b4d6d044 |
2
.gitignore
vendored
2
.gitignore
vendored
@@ -42,7 +42,7 @@ _test
|
||||
/docker/n9e
|
||||
/docker/mysqldata
|
||||
/docker/experience_pg_vm/pgdata
|
||||
/etc.local
|
||||
/etc.local*
|
||||
|
||||
.alerts
|
||||
.idea
|
||||
|
||||
@@ -23,7 +23,6 @@ import (
|
||||
"github.com/ccfos/nightingale/v6/prom"
|
||||
"github.com/ccfos/nightingale/v6/pushgw/pconf"
|
||||
"github.com/ccfos/nightingale/v6/pushgw/writer"
|
||||
"github.com/ccfos/nightingale/v6/storage"
|
||||
)
|
||||
|
||||
func Initialize(configDir string, cryptoKey string) (func(), error) {
|
||||
@@ -37,21 +36,12 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
db, err := storage.New(config.DB)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ctx := ctx.NewContext(context.Background(), db)
|
||||
|
||||
redis, err := storage.NewRedis(config.Redis)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ctx := ctx.NewContext(context.Background(), nil, false, config.CenterApi)
|
||||
|
||||
syncStats := memsto.NewSyncStats()
|
||||
alertStats := astats.NewSyncStats()
|
||||
|
||||
targetCache := memsto.NewTargetCache(ctx, syncStats, redis)
|
||||
targetCache := memsto.NewTargetCache(ctx, syncStats, nil)
|
||||
busiGroupCache := memsto.NewBusiGroupCache(ctx, syncStats)
|
||||
alertMuteCache := memsto.NewAlertMuteCache(ctx, syncStats)
|
||||
alertRuleCache := memsto.NewAlertRuleCache(ctx, syncStats)
|
||||
@@ -62,7 +52,7 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
|
||||
|
||||
externalProcessors := process.NewExternalProcessors()
|
||||
|
||||
Start(config.Alert, config.Pushgw, syncStats, alertStats, externalProcessors, targetCache, busiGroupCache, alertMuteCache, alertRuleCache, notifyConfigCache, dsCache, ctx, promClients, false)
|
||||
Start(config.Alert, config.Pushgw, syncStats, alertStats, externalProcessors, targetCache, busiGroupCache, alertMuteCache, alertRuleCache, notifyConfigCache, dsCache, ctx, promClients)
|
||||
|
||||
r := httpx.GinEngine(config.Global.RunMode, config.HTTP)
|
||||
rt := router.New(config.HTTP, config.Alert, alertMuteCache, targetCache, busiGroupCache, alertStats, ctx, externalProcessors)
|
||||
@@ -77,7 +67,7 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
|
||||
}
|
||||
|
||||
func Start(alertc aconf.Alert, pushgwc pconf.Pushgw, syncStats *memsto.Stats, alertStats *astats.Stats, externalProcessors *process.ExternalProcessorsType, targetCache *memsto.TargetCacheType, busiGroupCache *memsto.BusiGroupCacheType,
|
||||
alertMuteCache *memsto.AlertMuteCacheType, alertRuleCache *memsto.AlertRuleCacheType, notifyConfigCache *memsto.NotifyConfigCacheType, datasourceCache *memsto.DatasourceCacheType, ctx *ctx.Context, promClients *prom.PromClientMap, isCenter bool) {
|
||||
alertMuteCache *memsto.AlertMuteCacheType, alertRuleCache *memsto.AlertRuleCacheType, notifyConfigCache *memsto.NotifyConfigCacheType, datasourceCache *memsto.DatasourceCacheType, ctx *ctx.Context, promClients *prom.PromClientMap) {
|
||||
userCache := memsto.NewUserCache(ctx, syncStats)
|
||||
userGroupCache := memsto.NewUserGroupCache(ctx, syncStats)
|
||||
alertSubscribeCache := memsto.NewAlertSubscribeCache(ctx, syncStats)
|
||||
@@ -85,12 +75,12 @@ func Start(alertc aconf.Alert, pushgwc pconf.Pushgw, syncStats *memsto.Stats, al
|
||||
|
||||
go models.InitNotifyConfig(ctx, alertc.Alerting.TemplatesDir)
|
||||
|
||||
naming := naming.NewNaming(ctx, alertc.Heartbeat, isCenter)
|
||||
naming := naming.NewNaming(ctx, alertc.Heartbeat)
|
||||
|
||||
writers := writer.NewWriters(pushgwc)
|
||||
record.NewScheduler(alertc, recordingRuleCache, promClients, writers, alertStats)
|
||||
|
||||
eval.NewScheduler(isCenter, alertc, externalProcessors, alertRuleCache, targetCache, busiGroupCache, alertMuteCache, datasourceCache, promClients, naming, ctx, alertStats)
|
||||
eval.NewScheduler(alertc, externalProcessors, alertRuleCache, targetCache, busiGroupCache, alertMuteCache, datasourceCache, promClients, naming, ctx, alertStats)
|
||||
|
||||
dp := dispatch.NewDispatch(alertRuleCache, userCache, userGroupCache, alertSubscribeCache, targetCache, notifyConfigCache, alertc.Alerting, ctx)
|
||||
consumer := dispatch.NewConsumer(alertc.Alerting, ctx, dp)
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"github.com/ccfos/nightingale/v6/alert/queue"
|
||||
"github.com/ccfos/nightingale/v6/models"
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
"github.com/ccfos/nightingale/v6/pkg/poster"
|
||||
|
||||
"github.com/toolkits/pkg/concurrent/semaphore"
|
||||
"github.com/toolkits/pkg/logger"
|
||||
@@ -82,78 +83,17 @@ func (e *Consumer) consumeOne(event *models.AlertCurEvent) {
|
||||
}
|
||||
|
||||
func (e *Consumer) persist(event *models.AlertCurEvent) {
|
||||
has, err := models.AlertCurEventExists(e.ctx, "hash=?", event.Hash)
|
||||
if err != nil {
|
||||
logger.Errorf("event_persist_check_exists_fail: %v rule_id=%d hash=%s", err, event.RuleId, event.Hash)
|
||||
return
|
||||
}
|
||||
|
||||
his := event.ToHis(e.ctx)
|
||||
|
||||
// 不管是告警还是恢复,全量告警里都要记录
|
||||
if err := his.Add(e.ctx); err != nil {
|
||||
logger.Errorf(
|
||||
"event_persist_his_fail: %v rule_id=%d cluster:%s hash=%s tags=%v timestamp=%d value=%s",
|
||||
err,
|
||||
event.RuleId,
|
||||
event.Cluster,
|
||||
event.Hash,
|
||||
event.TagsJSON,
|
||||
event.TriggerTime,
|
||||
event.TriggerValue,
|
||||
)
|
||||
}
|
||||
|
||||
if has {
|
||||
// 活跃告警表中有记录,删之
|
||||
err = models.AlertCurEventDelByHash(e.ctx, event.Hash)
|
||||
if !e.ctx.IsCenter {
|
||||
event.DB2FE()
|
||||
err := poster.PostByUrls(e.ctx, "/v1/n9e/event-persist", event)
|
||||
if err != nil {
|
||||
logger.Errorf("event_del_cur_fail: %v hash=%s", err, event.Hash)
|
||||
return
|
||||
logger.Errorf("event%+v persist err:%v", event, err)
|
||||
}
|
||||
|
||||
if !event.IsRecovered {
|
||||
// 恢复事件,从活跃告警列表彻底删掉,告警事件,要重新加进来新的event
|
||||
// use his id as cur id
|
||||
event.Id = his.Id
|
||||
if event.Id > 0 {
|
||||
if err := event.Add(e.ctx); err != nil {
|
||||
logger.Errorf(
|
||||
"event_persist_cur_fail: %v rule_id=%d cluster:%s hash=%s tags=%v timestamp=%d value=%s",
|
||||
err,
|
||||
event.RuleId,
|
||||
event.Cluster,
|
||||
event.Hash,
|
||||
event.TagsJSON,
|
||||
event.TriggerTime,
|
||||
event.TriggerValue,
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
if event.IsRecovered {
|
||||
// alert_cur_event表里没有数据,表示之前没告警,结果现在报了恢复,神奇....理论上不应该出现的
|
||||
return
|
||||
}
|
||||
|
||||
// use his id as cur id
|
||||
event.Id = his.Id
|
||||
if event.Id > 0 {
|
||||
if err := event.Add(e.ctx); err != nil {
|
||||
logger.Errorf(
|
||||
"event_persist_cur_fail: %v rule_id=%d cluster:%s hash=%s tags=%v timestamp=%d value=%s",
|
||||
err,
|
||||
event.RuleId,
|
||||
event.Cluster,
|
||||
event.Hash,
|
||||
event.TagsJSON,
|
||||
event.TriggerTime,
|
||||
event.TriggerValue,
|
||||
)
|
||||
}
|
||||
err := models.EventPersist(e.ctx, event)
|
||||
if err != nil {
|
||||
logger.Errorf("event%+v persist err:%v", event, err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -199,7 +199,7 @@ func (e *Dispatch) Send(rule *models.AlertRule, event *models.AlertCurEvent, not
|
||||
}
|
||||
|
||||
// handle event callbacks
|
||||
sender.SendCallbacks(e.ctx, notifyTarget.ToCallbackList(), event, e.targetCache, e.notifyConfigCache.GetIbex())
|
||||
sender.SendCallbacks(e.ctx, notifyTarget.ToCallbackList(), event, e.targetCache, e.userCache, e.notifyConfigCache.GetIbex())
|
||||
|
||||
// handle global webhooks
|
||||
sender.SendWebhooks(notifyTarget.ToWebhookList(), event)
|
||||
|
||||
@@ -16,7 +16,6 @@ import (
|
||||
)
|
||||
|
||||
type Scheduler struct {
|
||||
isCenter bool
|
||||
// key: hash
|
||||
alertRules map[string]*AlertRuleWorker
|
||||
|
||||
@@ -38,11 +37,10 @@ type Scheduler struct {
|
||||
stats *astats.Stats
|
||||
}
|
||||
|
||||
func NewScheduler(isCenter bool, aconf aconf.Alert, externalProcessors *process.ExternalProcessorsType, arc *memsto.AlertRuleCacheType, targetCache *memsto.TargetCacheType,
|
||||
func NewScheduler(aconf aconf.Alert, externalProcessors *process.ExternalProcessorsType, arc *memsto.AlertRuleCacheType, targetCache *memsto.TargetCacheType,
|
||||
busiGroupCache *memsto.BusiGroupCacheType, alertMuteCache *memsto.AlertMuteCacheType, datasourceCache *memsto.DatasourceCacheType, promClients *prom.PromClientMap, naming *naming.Naming,
|
||||
ctx *ctx.Context, stats *astats.Stats) *Scheduler {
|
||||
scheduler := &Scheduler{
|
||||
isCenter: isCenter,
|
||||
aconf: aconf,
|
||||
alertRules: make(map[string]*AlertRuleWorker),
|
||||
|
||||
@@ -108,7 +106,7 @@ func (s *Scheduler) syncAlertRules() {
|
||||
alertRule := NewAlertRuleWorker(rule, dsId, processor, s.promClients, s.ctx)
|
||||
alertRuleWorkers[alertRule.Hash()] = alertRule
|
||||
}
|
||||
} else if rule.IsHostRule() && s.isCenter {
|
||||
} else if rule.IsHostRule() && s.ctx.IsCenter {
|
||||
// all host rule will be processed by center instance
|
||||
if !naming.DatasourceHashRing.IsHit(naming.HostDatasource, fmt.Sprintf("%d", rule.Id), s.aconf.Heartbeat.Endpoint) {
|
||||
continue
|
||||
|
||||
@@ -109,7 +109,7 @@ func (arw *AlertRuleWorker) Eval() {
|
||||
}
|
||||
|
||||
func (arw *AlertRuleWorker) Stop() {
|
||||
logger.Infof("%s stopped", arw.Key())
|
||||
logger.Infof("rule_eval %s stopped", arw.Key())
|
||||
close(arw.quit)
|
||||
}
|
||||
|
||||
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
"github.com/ccfos/nightingale/v6/alert/aconf"
|
||||
"github.com/ccfos/nightingale/v6/models"
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
"github.com/ccfos/nightingale/v6/pkg/poster"
|
||||
|
||||
"github.com/toolkits/pkg/logger"
|
||||
)
|
||||
@@ -16,14 +17,12 @@ import (
|
||||
type Naming struct {
|
||||
ctx *ctx.Context
|
||||
heartbeatConfig aconf.HeartbeatConfig
|
||||
isCenter bool
|
||||
}
|
||||
|
||||
func NewNaming(ctx *ctx.Context, heartbeat aconf.HeartbeatConfig, isCenter bool) *Naming {
|
||||
func NewNaming(ctx *ctx.Context, heartbeat aconf.HeartbeatConfig) *Naming {
|
||||
naming := &Naming{
|
||||
ctx: ctx,
|
||||
heartbeatConfig: heartbeat,
|
||||
isCenter: isCenter,
|
||||
}
|
||||
naming.Heartbeats()
|
||||
return naming
|
||||
@@ -45,6 +44,10 @@ func (n *Naming) Heartbeats() error {
|
||||
}
|
||||
|
||||
func (n *Naming) loopDeleteInactiveInstances() {
|
||||
if !n.ctx.IsCenter {
|
||||
return
|
||||
}
|
||||
|
||||
interval := time.Duration(10) * time.Minute
|
||||
for {
|
||||
time.Sleep(interval)
|
||||
@@ -112,7 +115,7 @@ func (n *Naming) heartbeat() error {
|
||||
localss[datasourceIds[i]] = newss
|
||||
}
|
||||
|
||||
if n.isCenter {
|
||||
if n.ctx.IsCenter {
|
||||
// 如果是中心节点,还需要处理 host 类型的告警规则,host 类型告警规则,和数据源无关,想复用下数据源的 hash ring,想用一个虚假的数据源 id 来处理
|
||||
// if is center node, we need to handle host type alerting rules, host type alerting rules are not related to datasource, we want to reuse the hash ring of datasource, we want to use a fake datasource id to handle it
|
||||
err := models.AlertingEngineHeartbeatWithCluster(n.ctx, n.heartbeatConfig.Endpoint, n.heartbeatConfig.EngineName, HostDatasource)
|
||||
@@ -146,6 +149,11 @@ func (n *Naming) ActiveServers(datasourceId int64) ([]string, error) {
|
||||
return nil, fmt.Errorf("cluster is empty")
|
||||
}
|
||||
|
||||
if !n.ctx.IsCenter {
|
||||
lst, err := poster.GetByUrls[[]string](n.ctx, "/v1/n9e/servers-active?dsid="+fmt.Sprintf("%d", datasourceId))
|
||||
return lst, err
|
||||
}
|
||||
|
||||
// 30秒内有心跳,就认为是活的
|
||||
return models.AlertingEngineGetsInstances(n.ctx, "datasource_id = ? and clock > ?", datasourceId, time.Now().Unix()-30)
|
||||
}
|
||||
|
||||
@@ -118,7 +118,7 @@ func (p *Processor) Handle(anomalyPoints []common.AnomalyPoint, from string, inh
|
||||
logger.Errorf("rule not found %+v", anomalyPoints)
|
||||
return
|
||||
}
|
||||
|
||||
p.rule = cachedRule
|
||||
now := time.Now().Unix()
|
||||
alertingKeys := map[string]struct{}{}
|
||||
|
||||
@@ -337,7 +337,7 @@ func (p *Processor) pushEventToQueue(e *models.AlertCurEvent) {
|
||||
func (p *Processor) RecoverAlertCurEventFromDb() {
|
||||
p.pendings = NewAlertCurEventMap(nil)
|
||||
|
||||
curEvents, err := models.AlertCurEventGetByRuleIdAndCluster(p.ctx, p.rule.Id, p.datasourceId)
|
||||
curEvents, err := models.AlertCurEventGetByRuleIdAndDsId(p.ctx, p.rule.Id, p.datasourceId)
|
||||
if err != nil {
|
||||
logger.Errorf("recover event from db for rule:%s failed, err:%s", p.Key(), err)
|
||||
p.fires = NewAlertCurEventMap(nil)
|
||||
|
||||
@@ -48,6 +48,7 @@ func (rt *Router) Config(r *gin.Engine) {
|
||||
service.Use(gin.BasicAuth(rt.HTTP.Alert.BasicAuth))
|
||||
}
|
||||
service.POST("/event", rt.pushEventToQueue)
|
||||
service.POST("/event-persist", rt.eventPersist)
|
||||
service.POST("/make-event", rt.makeEvent)
|
||||
}
|
||||
|
||||
|
||||
@@ -83,6 +83,13 @@ func (rt *Router) pushEventToQueue(c *gin.Context) {
|
||||
ginx.NewRender(c).Message(nil)
|
||||
}
|
||||
|
||||
func (rt *Router) eventPersist(c *gin.Context) {
|
||||
var event *models.AlertCurEvent
|
||||
ginx.BindJSON(c, &event)
|
||||
event.FE2DB()
|
||||
ginx.NewRender(c).Message(models.EventPersist(rt.Ctx, event))
|
||||
}
|
||||
|
||||
type eventForm struct {
|
||||
Alert bool `json:"alert"`
|
||||
AnomalyPoints []common.AnomalyPoint `json:"vectors"`
|
||||
|
||||
@@ -15,7 +15,7 @@ import (
|
||||
"github.com/toolkits/pkg/logger"
|
||||
)
|
||||
|
||||
func SendCallbacks(ctx *ctx.Context, urls []string, event *models.AlertCurEvent, targetCache *memsto.TargetCacheType, ibexConf aconf.Ibex) {
|
||||
func SendCallbacks(ctx *ctx.Context, urls []string, event *models.AlertCurEvent, targetCache *memsto.TargetCacheType, userCache *memsto.UserCacheType, ibexConf aconf.Ibex) {
|
||||
for _, url := range urls {
|
||||
if url == "" {
|
||||
continue
|
||||
@@ -23,7 +23,7 @@ func SendCallbacks(ctx *ctx.Context, urls []string, event *models.AlertCurEvent,
|
||||
|
||||
if strings.HasPrefix(url, "${ibex}") {
|
||||
if !event.IsRecovered {
|
||||
handleIbex(ctx, url, event, targetCache, ibexConf)
|
||||
handleIbex(ctx, url, event, targetCache, userCache, ibexConf)
|
||||
}
|
||||
continue
|
||||
}
|
||||
@@ -60,7 +60,7 @@ type TaskCreateReply struct {
|
||||
Dat int64 `json:"dat"` // task.id
|
||||
}
|
||||
|
||||
func handleIbex(ctx *ctx.Context, url string, event *models.AlertCurEvent, targetCache *memsto.TargetCacheType, ibexConf aconf.Ibex) {
|
||||
func handleIbex(ctx *ctx.Context, url string, event *models.AlertCurEvent, targetCache *memsto.TargetCacheType, userCache *memsto.UserCacheType, ibexConf aconf.Ibex) {
|
||||
arr := strings.Split(url, "/")
|
||||
|
||||
var idstr string
|
||||
@@ -103,7 +103,7 @@ func handleIbex(ctx *ctx.Context, url string, event *models.AlertCurEvent, targe
|
||||
|
||||
// check perm
|
||||
// tpl.GroupId - host - account 三元组校验权限
|
||||
can, err := canDoIbex(ctx, tpl.UpdateBy, tpl, host, targetCache)
|
||||
can, err := canDoIbex(ctx, tpl.UpdateBy, tpl, host, targetCache, userCache)
|
||||
if err != nil {
|
||||
logger.Errorf("event_callback_ibex: check perm fail: %v", err)
|
||||
return
|
||||
@@ -176,12 +176,8 @@ func handleIbex(ctx *ctx.Context, url string, event *models.AlertCurEvent, targe
|
||||
}
|
||||
}
|
||||
|
||||
func canDoIbex(ctx *ctx.Context, username string, tpl *models.TaskTpl, host string, targetCache *memsto.TargetCacheType) (bool, error) {
|
||||
user, err := models.UserGetByUsername(ctx, username)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
func canDoIbex(ctx *ctx.Context, username string, tpl *models.TaskTpl, host string, targetCache *memsto.TargetCacheType, userCache *memsto.UserCacheType) (bool, error) {
|
||||
user := userCache.GetByUsername(username)
|
||||
if user != nil && user.IsAdmin() {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
@@ -1,18 +1,12 @@
|
||||
package cconf
|
||||
|
||||
import (
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
type Center struct {
|
||||
Plugins []Plugin
|
||||
BasicAuth gin.Accounts
|
||||
MetricsYamlFile string
|
||||
OpsYamlFile string
|
||||
BuiltinIntegrationsDir string
|
||||
I18NHeaderKey string
|
||||
MetricDesc MetricDescType
|
||||
TargetMetrics map[string]string
|
||||
AnonymousAccess AnonymousAccess
|
||||
}
|
||||
|
||||
|
||||
@@ -4,7 +4,6 @@ import (
|
||||
"path"
|
||||
|
||||
"github.com/toolkits/pkg/file"
|
||||
"github.com/toolkits/pkg/runner"
|
||||
)
|
||||
|
||||
// metricDesc , As load map happens before read map, there is no necessary to use concurrent map for metric desc store
|
||||
@@ -33,10 +32,10 @@ func GetMetricDesc(lang, metric string) string {
|
||||
return MetricDesc.CommonDesc[metric]
|
||||
}
|
||||
|
||||
func LoadMetricsYaml(metricsYamlFile string) error {
|
||||
func LoadMetricsYaml(configDir, metricsYamlFile string) error {
|
||||
fp := metricsYamlFile
|
||||
if fp == "" {
|
||||
fp = path.Join(runner.Cwd, "etc", "metrics.yaml")
|
||||
fp = path.Join(configDir, "metrics.yaml")
|
||||
}
|
||||
if !file.IsExist(fp) {
|
||||
return nil
|
||||
|
||||
@@ -4,7 +4,6 @@ import (
|
||||
"path"
|
||||
|
||||
"github.com/toolkits/pkg/file"
|
||||
"github.com/toolkits/pkg/runner"
|
||||
)
|
||||
|
||||
var Operations = Operation{}
|
||||
@@ -19,10 +18,10 @@ type Ops struct {
|
||||
Ops []string `yaml:"ops" json:"ops"`
|
||||
}
|
||||
|
||||
func LoadOpsYaml(opsYamlFile string) error {
|
||||
func LoadOpsYaml(configDir string, opsYamlFile string) error {
|
||||
fp := opsYamlFile
|
||||
if fp == "" {
|
||||
fp = path.Join(runner.Cwd, "etc", "ops.yaml")
|
||||
fp = path.Join(configDir, "ops.yaml")
|
||||
}
|
||||
if !file.IsExist(fp) {
|
||||
return nil
|
||||
|
||||
@@ -33,8 +33,8 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
|
||||
return nil, fmt.Errorf("failed to init config: %v", err)
|
||||
}
|
||||
|
||||
cconf.LoadMetricsYaml(config.Center.MetricsYamlFile)
|
||||
cconf.LoadOpsYaml(config.Center.OpsYamlFile)
|
||||
cconf.LoadMetricsYaml(configDir, config.Center.MetricsYamlFile)
|
||||
cconf.LoadOpsYaml(configDir, config.Center.OpsYamlFile)
|
||||
|
||||
logxClean, err := logx.Init(config.Log)
|
||||
if err != nil {
|
||||
@@ -47,7 +47,7 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ctx := ctx.NewContext(context.Background(), db)
|
||||
ctx := ctx.NewContext(context.Background(), db, true)
|
||||
models.InitRoot(ctx)
|
||||
|
||||
redis, err := storage.NewRedis(config.Redis)
|
||||
@@ -56,7 +56,7 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
|
||||
}
|
||||
|
||||
metas := metas.New(redis)
|
||||
idents := idents.New(db)
|
||||
idents := idents.New(ctx)
|
||||
|
||||
syncStats := memsto.NewSyncStats()
|
||||
alertStats := astats.NewSyncStats()
|
||||
@@ -73,7 +73,7 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
|
||||
promClients := prom.NewPromClient(ctx, config.Alert.Heartbeat)
|
||||
|
||||
externalProcessors := process.NewExternalProcessors()
|
||||
alert.Start(config.Alert, config.Pushgw, syncStats, alertStats, externalProcessors, targetCache, busiGroupCache, alertMuteCache, alertRuleCache, notifyConfigCache, dsCache, ctx, promClients, true)
|
||||
alert.Start(config.Alert, config.Pushgw, syncStats, alertStats, externalProcessors, targetCache, busiGroupCache, alertMuteCache, alertRuleCache, notifyConfigCache, dsCache, ctx, promClients)
|
||||
|
||||
writers := writer.NewWriters(config.Pushgw)
|
||||
|
||||
|
||||
@@ -148,6 +148,7 @@ func (rt *Router) Config(r *gin.Engine) {
|
||||
pages.GET("/auth/callback", rt.loginCallback)
|
||||
pages.GET("/auth/callback/cas", rt.loginCallbackCas)
|
||||
pages.GET("/auth/callback/oauth", rt.loginCallbackOAuth)
|
||||
pages.GET("/auth/perms", rt.allPerms)
|
||||
|
||||
pages.GET("/metrics/desc", rt.metricsDescGetFile)
|
||||
pages.POST("/metrics/desc", rt.metricsDescGetMap)
|
||||
@@ -303,7 +304,7 @@ func (rt *Router) Config(r *gin.Engine) {
|
||||
|
||||
pages.GET("/role/:id/ops", rt.auth(), rt.admin(), rt.operationOfRole)
|
||||
pages.PUT("/role/:id/ops", rt.auth(), rt.admin(), rt.roleBindOperation)
|
||||
pages.GET("operation", rt.operations)
|
||||
pages.GET("/operation", rt.operations)
|
||||
|
||||
pages.GET("/notify-tpls", rt.auth(), rt.admin(), rt.notifyTplGets)
|
||||
pages.PUT("/notify-tpl/content", rt.auth(), rt.admin(), rt.notifyTplUpdateContent)
|
||||
@@ -339,7 +340,10 @@ func (rt *Router) Config(r *gin.Engine) {
|
||||
service.POST("/users", rt.userAddPost)
|
||||
service.GET("/users", rt.userFindAll)
|
||||
|
||||
service.GET("/targets", rt.targetGets)
|
||||
service.GET("/user-groups", rt.userGroupGetsByService)
|
||||
service.GET("/user-group-members", rt.userGroupMemberGetsByService)
|
||||
|
||||
service.GET("/targets", rt.targetGetsByService)
|
||||
service.GET("/targets/tags", rt.targetGetTags)
|
||||
service.POST("/targets/tags", rt.targetBindTagsByService)
|
||||
service.DELETE("/targets/tags", rt.targetUnbindTagsByService)
|
||||
@@ -351,16 +355,29 @@ func (rt *Router) Config(r *gin.Engine) {
|
||||
service.GET("/alert-rule/:arid", rt.alertRuleGet)
|
||||
service.GET("/alert-rules", rt.alertRulesGetByService)
|
||||
|
||||
service.GET("/alert-subscribes", rt.alertSubscribeGetsByService)
|
||||
|
||||
service.GET("/busi-groups", rt.busiGroupGetsByService)
|
||||
|
||||
service.GET("/datasources", rt.datasourceGetsByService)
|
||||
service.GET("/datasource-ids", rt.getDatasourceIds)
|
||||
service.POST("/server-heartbeat", rt.serverHeartbeat)
|
||||
service.GET("/servers-active", rt.serversActive)
|
||||
|
||||
service.GET("/recording-rules", rt.recordingRuleGetsByService)
|
||||
|
||||
service.GET("/alert-mutes", rt.alertMuteGets)
|
||||
service.POST("/alert-mutes", rt.alertMuteAddByService)
|
||||
service.DELETE("/alert-mutes", rt.alertMuteDel)
|
||||
|
||||
service.GET("/alert-cur-events", rt.alertCurEventsList)
|
||||
service.GET("/alert-cur-events-get-by-rid", rt.alertCurEventsGetByRid)
|
||||
service.GET("/alert-his-events", rt.alertHisEventsList)
|
||||
service.GET("/alert-his-event/:eid", rt.alertHisEventGet)
|
||||
|
||||
service.GET("/config/:id", rt.configGet)
|
||||
service.GET("/configs", rt.configsGet)
|
||||
service.GET("/config", rt.configGetByKey)
|
||||
service.PUT("/configs", rt.configsPut)
|
||||
service.POST("/configs", rt.configsPost)
|
||||
service.DELETE("/configs", rt.configsDel)
|
||||
@@ -368,7 +385,11 @@ func (rt *Router) Config(r *gin.Engine) {
|
||||
service.POST("/conf-prop/encrypt", rt.confPropEncrypt)
|
||||
service.POST("/conf-prop/decrypt", rt.confPropDecrypt)
|
||||
|
||||
service.GET("/datasource-ids", rt.getDatasourceIds)
|
||||
service.GET("/statistic", rt.statistic)
|
||||
|
||||
service.GET("/notify-tpls", rt.notifyTplGets)
|
||||
|
||||
service.POST("/task-record-add", rt.taskRecordAdd)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -128,6 +128,13 @@ func (rt *Router) alertCurEventsCardDetails(c *gin.Context) {
|
||||
ginx.NewRender(c).Data(list, err)
|
||||
}
|
||||
|
||||
// alertCurEventsGetByRid
|
||||
func (rt *Router) alertCurEventsGetByRid(c *gin.Context) {
|
||||
rid := ginx.QueryInt64(c, "rid")
|
||||
dsId := ginx.QueryInt64(c, "dsid")
|
||||
ginx.NewRender(c).Data(models.AlertCurEventGetByRuleIdAndDsId(rt.Ctx, rid, dsId))
|
||||
}
|
||||
|
||||
// 列表方式,拉取活跃告警
|
||||
func (rt *Router) alertCurEventsList(c *gin.Context) {
|
||||
stime, etime := getTimeRange(c)
|
||||
|
||||
@@ -27,7 +27,12 @@ func (rt *Router) alertRuleGets(c *gin.Context) {
|
||||
}
|
||||
|
||||
func (rt *Router) alertRulesGetByService(c *gin.Context) {
|
||||
prods := strings.Split(ginx.QueryStr(c, "prods", ""), ",")
|
||||
prods := []string{}
|
||||
prodStr := ginx.QueryStr(c, "prods", "")
|
||||
if prodStr != "" {
|
||||
prods = strings.Split(ginx.QueryStr(c, "prods", ""), ",")
|
||||
}
|
||||
|
||||
query := ginx.QueryStr(c, "query", "")
|
||||
algorithm := ginx.QueryStr(c, "algorithm", "")
|
||||
cluster := ginx.QueryStr(c, "cluster", "")
|
||||
|
||||
@@ -110,3 +110,8 @@ func (rt *Router) alertSubscribeDel(c *gin.Context) {
|
||||
|
||||
ginx.NewRender(c).Message(models.AlertSubscribeDel(rt.Ctx, f.Ids))
|
||||
}
|
||||
|
||||
func (rt *Router) alertSubscribeGetsByService(c *gin.Context) {
|
||||
lst, err := models.AlertSubscribeGetsByService(rt.Ctx)
|
||||
ginx.NewRender(c).Data(lst, err)
|
||||
}
|
||||
|
||||
@@ -123,6 +123,11 @@ func (rt *Router) busiGroupGets(c *gin.Context) {
|
||||
ginx.NewRender(c).Data(lst, err)
|
||||
}
|
||||
|
||||
func (rt *Router) busiGroupGetsByService(c *gin.Context) {
|
||||
lst, err := models.BusiGroupGetAll(rt.Ctx)
|
||||
ginx.NewRender(c).Data(lst, err)
|
||||
}
|
||||
|
||||
// 这个接口只有在活跃告警页面才调用,获取各个BG的活跃告警数量
|
||||
func (rt *Router) busiGroupAlertingsGets(c *gin.Context) {
|
||||
ids := ginx.QueryStr(c, "ids", "")
|
||||
|
||||
@@ -20,6 +20,11 @@ func (rt *Router) configGet(c *gin.Context) {
|
||||
ginx.NewRender(c).Data(configs, err)
|
||||
}
|
||||
|
||||
func (rt *Router) configGetByKey(c *gin.Context) {
|
||||
config, err := models.ConfigsGet(rt.Ctx, ginx.QueryStr(c, "key"))
|
||||
ginx.NewRender(c).Data(config, err)
|
||||
}
|
||||
|
||||
func (rt *Router) configsDel(c *gin.Context) {
|
||||
var f idsForm
|
||||
ginx.BindJSON(c, &f)
|
||||
|
||||
@@ -36,6 +36,12 @@ func (rt *Router) datasourceList(c *gin.Context) {
|
||||
Render(c, list, err)
|
||||
}
|
||||
|
||||
func (rt *Router) datasourceGetsByService(c *gin.Context) {
|
||||
typ := ginx.QueryStr(c, "typ", "")
|
||||
lst, err := models.GetDatasourcesGetsBy(rt.Ctx, typ, "", "", "")
|
||||
ginx.NewRender(c).Data(lst, err)
|
||||
}
|
||||
|
||||
type datasourceBrief struct {
|
||||
Id int64 `json:"id"`
|
||||
Name string `json:"name"`
|
||||
|
||||
@@ -17,6 +17,41 @@ import (
|
||||
|
||||
const defaultLimit = 300
|
||||
|
||||
func (rt *Router) statistic(c *gin.Context) {
|
||||
name := ginx.QueryStr(c, "name")
|
||||
var model interface{}
|
||||
var err error
|
||||
var statistics *models.Statistics
|
||||
switch name {
|
||||
case "alert_mute":
|
||||
model = models.AlertMute{}
|
||||
case "alert_rule":
|
||||
model = models.AlertRule{}
|
||||
case "alert_subscribe":
|
||||
model = models.AlertSubscribe{}
|
||||
case "busi_group":
|
||||
model = models.BusiGroup{}
|
||||
case "recording_rule":
|
||||
model = models.RecordingRule{}
|
||||
case "target":
|
||||
model = models.Target{}
|
||||
case "user":
|
||||
model = models.User{}
|
||||
case "user_group":
|
||||
model = models.UserGroup{}
|
||||
case "datasource":
|
||||
// datasource update_at is different from others
|
||||
statistics, err = models.DatasourceStatistics(rt.Ctx)
|
||||
ginx.NewRender(c).Data(statistics, err)
|
||||
return
|
||||
default:
|
||||
ginx.Bomb(http.StatusBadRequest, "invalid name")
|
||||
}
|
||||
|
||||
statistics, err = models.StatisticsGet(rt.Ctx, model)
|
||||
ginx.NewRender(c).Data(statistics, err)
|
||||
}
|
||||
|
||||
func queryDatasourceIds(c *gin.Context) []int64 {
|
||||
datasourceIds := ginx.QueryStr(c, "datasource_ids", "")
|
||||
datasourceIds = strings.ReplaceAll(datasourceIds, ",", " ")
|
||||
|
||||
@@ -21,7 +21,7 @@ func (rt *Router) alertMuteGetsByBG(c *gin.Context) {
|
||||
|
||||
func (rt *Router) alertMuteGets(c *gin.Context) {
|
||||
prods := strings.Fields(ginx.QueryStr(c, "prods", ""))
|
||||
bgid := ginx.QueryInt64(c, "bgid", 0)
|
||||
bgid := ginx.QueryInt64(c, "bgid", -1)
|
||||
query := ginx.QueryStr(c, "query", "")
|
||||
lst, err := models.AlertMuteGets(rt.Ctx, prods, bgid, query)
|
||||
|
||||
|
||||
@@ -19,6 +19,11 @@ func (rt *Router) recordingRuleGets(c *gin.Context) {
|
||||
ginx.NewRender(c).Data(ars, err)
|
||||
}
|
||||
|
||||
func (rt *Router) recordingRuleGetsByService(c *gin.Context) {
|
||||
ars, err := models.RecordingRuleEnabledGets(rt.Ctx)
|
||||
ginx.NewRender(c).Data(ars, err)
|
||||
}
|
||||
|
||||
func (rt *Router) recordingRuleGet(c *gin.Context) {
|
||||
rrid := ginx.UrlParamInt64(c, "rrid")
|
||||
|
||||
|
||||
@@ -83,3 +83,18 @@ func (rt *Router) roleGets(c *gin.Context) {
|
||||
lst, err := models.RoleGetsAll(rt.Ctx)
|
||||
ginx.NewRender(c).Data(lst, err)
|
||||
}
|
||||
|
||||
func (rt *Router) allPerms(c *gin.Context) {
|
||||
roles, err := models.RoleGetsAll(rt.Ctx)
|
||||
ginx.Dangerous(err)
|
||||
m := make(map[string][]string)
|
||||
for _, r := range roles {
|
||||
lst, err := models.OperationsOfRole(rt.Ctx, strings.Fields(r.Name))
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
m[r.Name] = lst
|
||||
}
|
||||
|
||||
ginx.NewRender(c).Data(m, err)
|
||||
}
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
package router
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/models"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
@@ -16,3 +18,17 @@ func (rt *Router) serverClustersGet(c *gin.Context) {
|
||||
list, err := models.AlertingEngineGetsClusters(rt.Ctx, "")
|
||||
ginx.NewRender(c).Data(list, err)
|
||||
}
|
||||
|
||||
func (rt *Router) serverHeartbeat(c *gin.Context) {
|
||||
var req models.HeartbeatInfo
|
||||
ginx.BindJSON(c, &req)
|
||||
err := models.AlertingEngineHeartbeatWithCluster(rt.Ctx, req.Instance, req.EngineCluster, req.DatasourceId)
|
||||
ginx.NewRender(c).Message(err)
|
||||
}
|
||||
|
||||
func (rt *Router) serversActive(c *gin.Context) {
|
||||
datasourceId := ginx.QueryInt64(c, "dsid")
|
||||
|
||||
servers, err := models.AlertingEngineGetsInstances(rt.Ctx, "datasource_id = ? and clock > ?", datasourceId, time.Now().Unix()-30)
|
||||
ginx.NewRender(c).Data(servers, err)
|
||||
}
|
||||
|
||||
@@ -101,6 +101,11 @@ func (rt *Router) targetGets(c *gin.Context) {
|
||||
}, nil)
|
||||
}
|
||||
|
||||
func (rt *Router) targetGetsByService(c *gin.Context) {
|
||||
lst, err := models.TargetGetsAll(rt.Ctx)
|
||||
ginx.NewRender(c).Data(lst, err)
|
||||
}
|
||||
|
||||
func (rt *Router) targetGetTags(c *gin.Context) {
|
||||
idents := ginx.QueryStr(c, "idents", "")
|
||||
idents = strings.ReplaceAll(idents, ",", " ")
|
||||
|
||||
@@ -120,6 +120,12 @@ func (f *taskForm) HandleFH(fh string) {
|
||||
f.Title = f.Title + " FH: " + fh
|
||||
}
|
||||
|
||||
func (rt *Router) taskRecordAdd(c *gin.Context) {
|
||||
var f *models.TaskRecord
|
||||
ginx.BindJSON(c, &f)
|
||||
ginx.NewRender(c).Message(f.Add(rt.Ctx))
|
||||
}
|
||||
|
||||
func (rt *Router) taskAdd(c *gin.Context) {
|
||||
var f taskForm
|
||||
ginx.BindJSON(c, &f)
|
||||
|
||||
@@ -12,19 +12,8 @@ import (
|
||||
)
|
||||
|
||||
func (rt *Router) userFindAll(c *gin.Context) {
|
||||
limit := ginx.QueryInt(c, "limit", 20)
|
||||
query := ginx.QueryStr(c, "query", "")
|
||||
|
||||
total, err := models.UserTotal(rt.Ctx, query)
|
||||
ginx.Dangerous(err)
|
||||
|
||||
list, err := models.UserGets(rt.Ctx, query, limit, ginx.Offset(c, limit))
|
||||
ginx.Dangerous(err)
|
||||
|
||||
ginx.NewRender(c).Data(gin.H{
|
||||
"list": list,
|
||||
"total": total,
|
||||
}, nil)
|
||||
list, err := models.UserGetAll(rt.Ctx)
|
||||
ginx.NewRender(c).Data(list, err)
|
||||
}
|
||||
|
||||
func (rt *Router) userGets(c *gin.Context) {
|
||||
|
||||
@@ -29,6 +29,17 @@ func (rt *Router) userGroupGets(c *gin.Context) {
|
||||
ginx.NewRender(c).Data(lst, err)
|
||||
}
|
||||
|
||||
func (rt *Router) userGroupGetsByService(c *gin.Context) {
|
||||
lst, err := models.UserGroupGetAll(rt.Ctx)
|
||||
ginx.NewRender(c).Data(lst, err)
|
||||
}
|
||||
|
||||
// user group member get by service
|
||||
func (rt *Router) userGroupMemberGetsByService(c *gin.Context) {
|
||||
members, err := models.UserGroupMemberGetAll(rt.Ctx)
|
||||
ginx.NewRender(c).Data(members, err)
|
||||
}
|
||||
|
||||
type userGroupForm struct {
|
||||
Name string `json:"name" binding:"required"`
|
||||
Note string `json:"note"`
|
||||
|
||||
@@ -18,7 +18,7 @@ func Upgrade(configFile string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
ctx := ctx.NewContext(context.Background(), db)
|
||||
ctx := ctx.NewContext(context.Background(), db, false)
|
||||
for _, cluster := range config.Clusters {
|
||||
count, err := models.GetDatasourcesCountBy(ctx, "", "", cluster.Name)
|
||||
if err != nil {
|
||||
|
||||
18
conf/conf.go
18
conf/conf.go
@@ -14,20 +14,28 @@ import (
|
||||
"github.com/ccfos/nightingale/v6/pkg/ormx"
|
||||
"github.com/ccfos/nightingale/v6/pushgw/pconf"
|
||||
"github.com/ccfos/nightingale/v6/storage"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
type ConfigType struct {
|
||||
Global GlobalConfig
|
||||
Log logx.Config
|
||||
HTTP httpx.Config
|
||||
DB ormx.DBConfig
|
||||
Redis storage.RedisConfig
|
||||
Global GlobalConfig
|
||||
Log logx.Config
|
||||
HTTP httpx.Config
|
||||
DB ormx.DBConfig
|
||||
Redis storage.RedisConfig
|
||||
CenterApi CenterApi
|
||||
|
||||
Pushgw pconf.Pushgw
|
||||
Alert aconf.Alert
|
||||
Center cconf.Center
|
||||
}
|
||||
|
||||
type CenterApi struct {
|
||||
Addrs []string
|
||||
BasicAuth gin.Accounts
|
||||
}
|
||||
|
||||
type GlobalConfig struct {
|
||||
RunMode string
|
||||
}
|
||||
|
||||
62
etc/alert.toml.example
Normal file
62
etc/alert.toml.example
Normal file
@@ -0,0 +1,62 @@
|
||||
[Global]
|
||||
RunMode = "release"
|
||||
|
||||
[CenterApi]
|
||||
Addrs = ["http://127.0.0.1:17000"]
|
||||
[CenterApi.BasicAuth]
|
||||
user001 = "ccc26da7b9aba533cbb263a36c07dcc5"
|
||||
|
||||
[Alert]
|
||||
[Alert.Heartbeat]
|
||||
# auto detect if blank
|
||||
IP = ""
|
||||
# unit ms
|
||||
Interval = 1000
|
||||
EngineName = "default"
|
||||
|
||||
[Log]
|
||||
# log write dir
|
||||
Dir = "logs"
|
||||
# log level: DEBUG INFO WARNING ERROR
|
||||
Level = "DEBUG"
|
||||
# stdout, stderr, file
|
||||
Output = "stdout"
|
||||
# # rotate by time
|
||||
# KeepHours = 4
|
||||
# # rotate by size
|
||||
# RotateNum = 3
|
||||
# # unit: MB
|
||||
# RotateSize = 256
|
||||
|
||||
[HTTP]
|
||||
# http listening address
|
||||
Host = "0.0.0.0"
|
||||
# http listening port
|
||||
Port = 17001
|
||||
# https cert file path
|
||||
CertFile = ""
|
||||
# https key file path
|
||||
KeyFile = ""
|
||||
# whether print access log
|
||||
PrintAccessLog = false
|
||||
# whether enable pprof
|
||||
PProf = false
|
||||
# expose prometheus /metrics?
|
||||
ExposeMetrics = true
|
||||
# http graceful shutdown timeout, unit: s
|
||||
ShutdownTimeout = 30
|
||||
# max content length: 64M
|
||||
MaxContentLength = 67108864
|
||||
# http server read timeout, unit: s
|
||||
ReadTimeout = 20
|
||||
# http server write timeout, unit: s
|
||||
WriteTimeout = 40
|
||||
# http server idle timeout, unit: s
|
||||
IdleTimeout = 120
|
||||
|
||||
[HTTP.Alert]
|
||||
Enable = true
|
||||
[HTTP.Alert.BasicAuth]
|
||||
user001 = "ccc26da7b9aba533cbb263a36c07dcc5"
|
||||
|
||||
|
||||
103
etc/pushgw.toml.example
Normal file
103
etc/pushgw.toml.example
Normal file
@@ -0,0 +1,103 @@
|
||||
[Global]
|
||||
RunMode = "release"
|
||||
|
||||
[CenterApi]
|
||||
Addrs = ["http://127.0.0.1:17000"]
|
||||
[CenterApi.BasicAuth]
|
||||
user001 = "ccc26da7b9aba533cbb263a36c07dcc5"
|
||||
|
||||
[Pushgw]
|
||||
# use target labels in database instead of in series
|
||||
LabelRewrite = true
|
||||
# # default busigroup key name
|
||||
# BusiGroupLabelKey = "busigroup"
|
||||
# ForceUseServerTS = false
|
||||
|
||||
# [Pushgw.DebugSample]
|
||||
# ident = "xx"
|
||||
# __name__ = "xx"
|
||||
|
||||
# [Pushgw.WriterOpt]
|
||||
# # Writer Options
|
||||
# QueueCount = 1000
|
||||
# QueueMaxSize = 1000000
|
||||
# QueuePopSize = 1000
|
||||
# # ident or metric
|
||||
# ShardingKey = "ident"
|
||||
|
||||
[[Pushgw.Writers]]
|
||||
# Url = "http://127.0.0.1:8480/insert/0/prometheus/api/v1/write"
|
||||
Url = "http://127.0.0.1:9090/api/v1/write"
|
||||
# Basic auth username
|
||||
BasicAuthUser = ""
|
||||
# Basic auth password
|
||||
BasicAuthPass = ""
|
||||
# timeout settings, unit: ms
|
||||
Headers = ["X-From", "n9e"]
|
||||
Timeout = 10000
|
||||
DialTimeout = 3000
|
||||
TLSHandshakeTimeout = 30000
|
||||
ExpectContinueTimeout = 1000
|
||||
IdleConnTimeout = 90000
|
||||
# time duration, unit: ms
|
||||
KeepAlive = 30000
|
||||
MaxConnsPerHost = 0
|
||||
MaxIdleConns = 100
|
||||
MaxIdleConnsPerHost = 100
|
||||
## Optional TLS Config
|
||||
# UseTLS = false
|
||||
# TLSCA = "/etc/n9e/ca.pem"
|
||||
# TLSCert = "/etc/n9e/cert.pem"
|
||||
# TLSKey = "/etc/n9e/key.pem"
|
||||
# InsecureSkipVerify = false
|
||||
# [[Writers.WriteRelabels]]
|
||||
# Action = "replace"
|
||||
# SourceLabels = ["__address__"]
|
||||
# Regex = "([^:]+)(?::\\d+)?"
|
||||
# Replacement = "$1:80"
|
||||
# TargetLabel = "__address__"
|
||||
|
||||
[Log]
|
||||
# log write dir
|
||||
Dir = "logs"
|
||||
# log level: DEBUG INFO WARNING ERROR
|
||||
Level = "DEBUG"
|
||||
# stdout, stderr, file
|
||||
Output = "stdout"
|
||||
# # rotate by time
|
||||
# KeepHours = 4
|
||||
# # rotate by size
|
||||
# RotateNum = 3
|
||||
# # unit: MB
|
||||
# RotateSize = 256
|
||||
|
||||
[HTTP]
|
||||
# http listening address
|
||||
Host = "0.0.0.0"
|
||||
# http listening port
|
||||
Port = 17000
|
||||
# https cert file path
|
||||
CertFile = ""
|
||||
# https key file path
|
||||
KeyFile = ""
|
||||
# whether print access log
|
||||
PrintAccessLog = false
|
||||
# whether enable pprof
|
||||
PProf = false
|
||||
# expose prometheus /metrics?
|
||||
ExposeMetrics = true
|
||||
# http graceful shutdown timeout, unit: s
|
||||
ShutdownTimeout = 30
|
||||
# max content length: 64M
|
||||
MaxContentLength = 67108864
|
||||
# http server read timeout, unit: s
|
||||
ReadTimeout = 20
|
||||
# http server write timeout, unit: s
|
||||
WriteTimeout = 40
|
||||
# http server idle timeout, unit: s
|
||||
IdleTimeout = 120
|
||||
|
||||
[HTTP.Pushgw]
|
||||
Enable = true
|
||||
# [HTTP.Pushgw.BasicAuth]
|
||||
# user001 = "ccc26da7b9aba533cbb263a36c07dcc5"
|
||||
@@ -144,14 +144,17 @@ func (tc *TargetCacheType) syncTargets() error {
|
||||
return errors.WithMessage(err, "failed to call TargetGetsAll")
|
||||
}
|
||||
|
||||
metaMap := tc.GetHostMetas(lst)
|
||||
|
||||
m := make(map[string]*models.Target)
|
||||
for i := 0; i < len(lst); i++ {
|
||||
lst[i].FillTagsMap()
|
||||
if meta, ok := metaMap[lst[i].Ident]; ok {
|
||||
lst[i].FillMeta(meta)
|
||||
if tc.ctx.IsCenter {
|
||||
metaMap := tc.GetHostMetas(lst)
|
||||
for i := 0; i < len(lst); i++ {
|
||||
if meta, ok := metaMap[lst[i].Ident]; ok {
|
||||
lst[i].FillMeta(meta)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for i := 0; i < len(lst); i++ {
|
||||
m[lst[i].Ident] = lst[i]
|
||||
}
|
||||
|
||||
|
||||
@@ -58,6 +58,17 @@ func (uc *UserCacheType) GetByUserId(id int64) *models.User {
|
||||
return uc.users[id]
|
||||
}
|
||||
|
||||
func (uc *UserCacheType) GetByUsername(name string) *models.User {
|
||||
uc.RLock()
|
||||
defer uc.RUnlock()
|
||||
for _, v := range uc.users {
|
||||
if v.Username == name {
|
||||
return v
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (uc *UserCacheType) GetByUserIds(ids []int64) []*models.User {
|
||||
set := make(map[int64]struct{})
|
||||
|
||||
|
||||
@@ -10,6 +10,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
"github.com/ccfos/nightingale/v6/pkg/poster"
|
||||
"github.com/ccfos/nightingale/v6/pkg/tplx"
|
||||
"github.com/toolkits/pkg/logger"
|
||||
)
|
||||
@@ -220,7 +221,7 @@ func (e *AlertCurEvent) ToHis(ctx *ctx.Context) *AlertHisEvent {
|
||||
}
|
||||
}
|
||||
|
||||
func (e *AlertCurEvent) DB2FE(ctx *ctx.Context) {
|
||||
func (e *AlertCurEvent) DB2FE() {
|
||||
e.NotifyChannelsJSON = strings.Fields(e.NotifyChannels)
|
||||
e.NotifyGroupsJSON = strings.Fields(e.NotifyGroups)
|
||||
e.CallbacksJSON = strings.Fields(e.Callbacks)
|
||||
@@ -229,6 +230,18 @@ func (e *AlertCurEvent) DB2FE(ctx *ctx.Context) {
|
||||
json.Unmarshal([]byte(e.RuleConfig), &e.RuleConfigJson)
|
||||
}
|
||||
|
||||
func (e *AlertCurEvent) FE2DB() {
|
||||
e.NotifyChannels = strings.Join(e.NotifyChannelsJSON, " ")
|
||||
e.NotifyGroups = strings.Join(e.NotifyGroupsJSON, " ")
|
||||
e.Callbacks = strings.Join(e.CallbacksJSON, " ")
|
||||
e.Tags = strings.Join(e.TagsJSON, ",,")
|
||||
b, _ := json.Marshal(e.AnnotationsJSON)
|
||||
e.Annotations = string(b)
|
||||
|
||||
b, _ = json.Marshal(e.RuleConfigJson)
|
||||
e.RuleConfig = string(b)
|
||||
}
|
||||
|
||||
func (e *AlertCurEvent) DB2Mem() {
|
||||
e.IsRecovered = false
|
||||
e.NotifyGroupsJSON = strings.Fields(e.NotifyGroups)
|
||||
@@ -356,7 +369,7 @@ func AlertCurEventGets(ctx *ctx.Context, prods []string, bgid, stime, etime int6
|
||||
|
||||
if err == nil {
|
||||
for i := 0; i < len(lst); i++ {
|
||||
lst[i].DB2FE(ctx)
|
||||
lst[i].DB2FE()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -390,7 +403,7 @@ func AlertCurEventGet(ctx *ctx.Context, where string, args ...interface{}) (*Ale
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
lst[0].DB2FE(ctx)
|
||||
lst[0].DB2FE()
|
||||
lst[0].FillNotifyGroups(ctx, make(map[int64]*UserGroup))
|
||||
|
||||
return lst[0], nil
|
||||
@@ -435,14 +448,19 @@ func AlertCurEventGetByIds(ctx *ctx.Context, ids []int64) ([]*AlertCurEvent, err
|
||||
err := DB(ctx).Where("id in ?", ids).Order("id desc").Find(&lst).Error
|
||||
if err == nil {
|
||||
for i := 0; i < len(lst); i++ {
|
||||
lst[i].DB2FE(ctx)
|
||||
lst[i].DB2FE()
|
||||
}
|
||||
}
|
||||
|
||||
return lst, err
|
||||
}
|
||||
|
||||
func AlertCurEventGetByRuleIdAndCluster(ctx *ctx.Context, ruleId int64, datasourceId int64) ([]*AlertCurEvent, error) {
|
||||
func AlertCurEventGetByRuleIdAndDsId(ctx *ctx.Context, ruleId int64, datasourceId int64) ([]*AlertCurEvent, error) {
|
||||
if !ctx.IsCenter {
|
||||
lst, err := poster.GetByUrls[[]*AlertCurEvent](ctx, "/v1/n9e/alert-cur-events-get-by-rid?rid="+strconv.FormatInt(ruleId, 10)+"&dsid="+strconv.FormatInt(datasourceId, 10))
|
||||
return lst, err
|
||||
}
|
||||
|
||||
var lst []*AlertCurEvent
|
||||
err := DB(ctx).Where("rule_id=? and datasource_id = ?", ruleId, datasourceId).Find(&lst).Error
|
||||
return lst, err
|
||||
|
||||
@@ -2,6 +2,7 @@ package models
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
@@ -61,7 +62,7 @@ func (e *AlertHisEvent) Add(ctx *ctx.Context) error {
|
||||
return Insert(ctx, e)
|
||||
}
|
||||
|
||||
func (e *AlertHisEvent) DB2FE(ctx *ctx.Context) {
|
||||
func (e *AlertHisEvent) DB2FE() {
|
||||
e.NotifyChannelsJSON = strings.Fields(e.NotifyChannels)
|
||||
e.NotifyGroupsJSON = strings.Fields(e.NotifyGroups)
|
||||
e.CallbacksJSON = strings.Fields(e.Callbacks)
|
||||
@@ -182,7 +183,7 @@ func AlertHisEventGets(ctx *ctx.Context, prods []string, bgid, stime, etime int6
|
||||
|
||||
if err == nil {
|
||||
for i := 0; i < len(lst); i++ {
|
||||
lst[i].DB2FE(ctx)
|
||||
lst[i].DB2FE()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -200,7 +201,7 @@ func AlertHisEventGet(ctx *ctx.Context, where string, args ...interface{}) (*Ale
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
lst[0].DB2FE(ctx)
|
||||
lst[0].DB2FE()
|
||||
lst[0].FillNotifyGroups(ctx, make(map[int64]*UserGroup))
|
||||
|
||||
return lst[0], nil
|
||||
@@ -259,3 +260,53 @@ func AlertHisEventUpgradeToV6(ctx *ctx.Context, dsm map[string]Datasource) error
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func EventPersist(ctx *ctx.Context, event *AlertCurEvent) error {
|
||||
has, err := AlertCurEventExists(ctx, "hash=?", event.Hash)
|
||||
if err != nil {
|
||||
return fmt.Errorf("event_persist_check_exists_fail: %v rule_id=%d hash=%s", err, event.RuleId, event.Hash)
|
||||
}
|
||||
|
||||
his := event.ToHis(ctx)
|
||||
|
||||
// 不管是告警还是恢复,全量告警里都要记录
|
||||
if err := his.Add(ctx); err != nil {
|
||||
return fmt.Errorf("add his event error:%v", err)
|
||||
}
|
||||
|
||||
if has {
|
||||
// 活跃告警表中有记录,删之
|
||||
err = AlertCurEventDelByHash(ctx, event.Hash)
|
||||
if err != nil {
|
||||
return fmt.Errorf("event_del_cur_fail: %v hash=%s", err, event.Hash)
|
||||
}
|
||||
|
||||
if !event.IsRecovered {
|
||||
// 恢复事件,从活跃告警列表彻底删掉,告警事件,要重新加进来新的event
|
||||
// use his id as cur id
|
||||
event.Id = his.Id
|
||||
if event.Id > 0 {
|
||||
if err := event.Add(ctx); err != nil {
|
||||
return fmt.Errorf("add cur event err:%v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
if event.IsRecovered {
|
||||
// alert_cur_event表里没有数据,表示之前没告警,结果现在报了恢复,神奇....理论上不应该出现的
|
||||
return nil
|
||||
}
|
||||
|
||||
// use his id as cur id
|
||||
event.Id = his.Id
|
||||
if event.Id > 0 {
|
||||
if err := event.Add(ctx); err != nil {
|
||||
return fmt.Errorf("add cur event error:%v", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
"github.com/ccfos/nightingale/v6/pkg/ormx"
|
||||
"github.com/ccfos/nightingale/v6/pkg/poster"
|
||||
"github.com/toolkits/pkg/logger"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
@@ -78,7 +79,15 @@ func AlertMuteGet(ctx *ctx.Context, where string, args ...interface{}) (*AlertMu
|
||||
}
|
||||
|
||||
func AlertMuteGets(ctx *ctx.Context, prods []string, bgid int64, query string) (lst []AlertMute, err error) {
|
||||
session := DB(ctx).Where("group_id = ? and prod in (?)", bgid, prods)
|
||||
session := DB(ctx)
|
||||
|
||||
if bgid != -1 {
|
||||
session = session.Where("group_id = ?", bgid)
|
||||
}
|
||||
|
||||
if len(prods) > 0 {
|
||||
session = session.Where("prod in (?)", prods)
|
||||
}
|
||||
|
||||
if query != "" {
|
||||
arr := strings.Fields(query)
|
||||
@@ -220,6 +229,12 @@ func AlertMuteDel(ctx *ctx.Context, ids []int64) error {
|
||||
}
|
||||
|
||||
func AlertMuteStatistics(ctx *ctx.Context) (*Statistics, error) {
|
||||
var stats []*Statistics
|
||||
if !ctx.IsCenter {
|
||||
s, err := poster.GetByUrls[*Statistics](ctx, "/v1/n9e/statistic?name=alert_mute")
|
||||
return s, err
|
||||
}
|
||||
|
||||
// clean expired first
|
||||
buf := int64(30)
|
||||
err := DB(ctx).Where("etime < ? and mute_time_type = 0", time.Now().Unix()-buf).Delete(new(AlertMute)).Error
|
||||
@@ -229,7 +244,6 @@ func AlertMuteStatistics(ctx *ctx.Context) (*Statistics, error) {
|
||||
|
||||
session := DB(ctx).Model(&AlertMute{}).Select("count(*) as total", "max(update_at) as last_updated")
|
||||
|
||||
var stats []*Statistics
|
||||
err = session.Find(&stats).Error
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -240,9 +254,20 @@ func AlertMuteStatistics(ctx *ctx.Context) (*Statistics, error) {
|
||||
|
||||
func AlertMuteGetsAll(ctx *ctx.Context) ([]*AlertMute, error) {
|
||||
// get my cluster's mutes
|
||||
var lst []*AlertMute
|
||||
if !ctx.IsCenter {
|
||||
lst, err := poster.GetByUrls[[]*AlertMute](ctx, "/v1/n9e/alert-mutes")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for i := 0; i < len(lst); i++ {
|
||||
lst[i].FE2DB()
|
||||
}
|
||||
return lst, err
|
||||
}
|
||||
|
||||
session := DB(ctx).Model(&AlertMute{})
|
||||
|
||||
var lst []*AlertMute
|
||||
err := session.Find(&lst).Error
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
"github.com/ccfos/nightingale/v6/pkg/poster"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/toolkits/pkg/logger"
|
||||
@@ -643,6 +644,17 @@ func AlertRuleGets(ctx *ctx.Context, groupId int64) ([]AlertRule, error) {
|
||||
}
|
||||
|
||||
func AlertRuleGetsAll(ctx *ctx.Context) ([]*AlertRule, error) {
|
||||
if !ctx.IsCenter {
|
||||
lst, err := poster.GetByUrls[[]*AlertRule](ctx, "/v1/n9e/alert-rules?disabled=0")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for i := 0; i < len(lst); i++ {
|
||||
lst[i].FE2DB()
|
||||
}
|
||||
return lst, err
|
||||
}
|
||||
|
||||
session := DB(ctx).Where("disabled = ?", 0)
|
||||
|
||||
var lst []*AlertRule
|
||||
@@ -662,7 +674,11 @@ func AlertRuleGetsAll(ctx *ctx.Context) ([]*AlertRule, error) {
|
||||
}
|
||||
|
||||
func AlertRulesGetsBy(ctx *ctx.Context, prods []string, query, algorithm, cluster string, cates []string, disabled int) ([]*AlertRule, error) {
|
||||
session := DB(ctx).Where("prod in (?)", prods)
|
||||
session := DB(ctx)
|
||||
|
||||
if len(prods) > 0 {
|
||||
session = session.Where("prod in (?)", prods)
|
||||
}
|
||||
|
||||
if query != "" {
|
||||
arr := strings.Fields(query)
|
||||
@@ -734,6 +750,11 @@ func AlertRuleGetName(ctx *ctx.Context, id int64) (string, error) {
|
||||
}
|
||||
|
||||
func AlertRuleStatistics(ctx *ctx.Context) (*Statistics, error) {
|
||||
if !ctx.IsCenter {
|
||||
s, err := poster.GetByUrls[*Statistics](ctx, "/v1/n9e/statistic?name=alert_rule")
|
||||
return s, err
|
||||
}
|
||||
|
||||
session := DB(ctx).Model(&AlertRule{}).Select("count(*) as total", "max(update_at) as last_updated").Where("disabled = ?", 0)
|
||||
|
||||
var stats []*Statistics
|
||||
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
"github.com/ccfos/nightingale/v6/pkg/ormx"
|
||||
"github.com/ccfos/nightingale/v6/pkg/poster"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/toolkits/pkg/logger"
|
||||
)
|
||||
@@ -52,6 +53,18 @@ func AlertSubscribeGets(ctx *ctx.Context, groupId int64) (lst []AlertSubscribe,
|
||||
return
|
||||
}
|
||||
|
||||
func AlertSubscribeGetsByService(ctx *ctx.Context) (lst []AlertSubscribe, err error) {
|
||||
err = DB(ctx).Find(&lst).Error
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
for i := range lst {
|
||||
lst[i].DB2FE()
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func AlertSubscribeGet(ctx *ctx.Context, where string, args ...interface{}) (*AlertSubscribe, error) {
|
||||
var lst []*AlertSubscribe
|
||||
err := DB(ctx).Where(where, args...).Find(&lst).Error
|
||||
@@ -262,6 +275,11 @@ func AlertSubscribeDel(ctx *ctx.Context, ids []int64) error {
|
||||
}
|
||||
|
||||
func AlertSubscribeStatistics(ctx *ctx.Context) (*Statistics, error) {
|
||||
if !ctx.IsCenter {
|
||||
s, err := poster.GetByUrls[*Statistics](ctx, "/v1/n9e/statistic?name=alert_subscribe")
|
||||
return s, err
|
||||
}
|
||||
|
||||
session := DB(ctx).Model(&AlertSubscribe{}).Select("count(*) as total", "max(update_at) as last_updated")
|
||||
|
||||
var stats []*Statistics
|
||||
@@ -274,6 +292,17 @@ func AlertSubscribeStatistics(ctx *ctx.Context) (*Statistics, error) {
|
||||
}
|
||||
|
||||
func AlertSubscribeGetsAll(ctx *ctx.Context) ([]*AlertSubscribe, error) {
|
||||
if !ctx.IsCenter {
|
||||
lst, err := poster.GetByUrls[[]*AlertSubscribe](ctx, "/v1/n9e/alert-subscribes")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for i := 0; i < len(lst); i++ {
|
||||
lst[i].FE2DB()
|
||||
}
|
||||
return lst, err
|
||||
}
|
||||
|
||||
// get my cluster's subscribes
|
||||
session := DB(ctx).Model(&AlertSubscribe{})
|
||||
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
"github.com/ccfos/nightingale/v6/pkg/poster"
|
||||
)
|
||||
|
||||
type AlertingEngines struct {
|
||||
@@ -128,7 +129,23 @@ func AlertingEngineGetsInstances(ctx *ctx.Context, where string, args ...interfa
|
||||
return arr, err
|
||||
}
|
||||
|
||||
type HeartbeatInfo struct {
|
||||
Instance string `json:"instance"`
|
||||
EngineCluster string `json:"engine_cluster"`
|
||||
DatasourceId int64 `json:"datasource_id"`
|
||||
}
|
||||
|
||||
func AlertingEngineHeartbeatWithCluster(ctx *ctx.Context, instance, cluster string, datasourceId int64) error {
|
||||
if !ctx.IsCenter {
|
||||
info := HeartbeatInfo{
|
||||
Instance: instance,
|
||||
EngineCluster: cluster,
|
||||
DatasourceId: datasourceId,
|
||||
}
|
||||
err := poster.PostByUrls(ctx, "/v1/n9e/server-heartbeat", info)
|
||||
return err
|
||||
}
|
||||
|
||||
var total int64
|
||||
err := DB(ctx).Model(new(AlertingEngines)).Where("instance=? and engine_cluster = ? and datasource_id=?", instance, cluster, datasourceId).Count(&total).Error
|
||||
if err != nil {
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
"github.com/ccfos/nightingale/v6/pkg/poster"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"gorm.io/gorm"
|
||||
@@ -64,9 +65,17 @@ func (bg *BusiGroup) FillUserGroups(ctx *ctx.Context) error {
|
||||
|
||||
func BusiGroupGetMap(ctx *ctx.Context) (map[int64]*BusiGroup, error) {
|
||||
var lst []*BusiGroup
|
||||
err := DB(ctx).Find(&lst).Error
|
||||
if err != nil {
|
||||
return nil, err
|
||||
var err error
|
||||
if !ctx.IsCenter {
|
||||
lst, err = poster.GetByUrls[[]*BusiGroup](ctx, "/v1/n9e/busi-groups")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
err = DB(ctx).Find(&lst).Error
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
ret := make(map[int64]*BusiGroup)
|
||||
@@ -77,6 +86,12 @@ func BusiGroupGetMap(ctx *ctx.Context) (map[int64]*BusiGroup, error) {
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
func BusiGroupGetAll(ctx *ctx.Context) ([]*BusiGroup, error) {
|
||||
var lst []*BusiGroup
|
||||
err := DB(ctx).Find(&lst).Error
|
||||
return lst, err
|
||||
}
|
||||
|
||||
func BusiGroupGet(ctx *ctx.Context, where string, args ...interface{}) (*BusiGroup, error) {
|
||||
var lst []*BusiGroup
|
||||
err := DB(ctx).Where(where, args...).Find(&lst).Error
|
||||
@@ -324,6 +339,11 @@ func BusiGroupAdd(ctx *ctx.Context, name string, labelEnable int, labelValue str
|
||||
}
|
||||
|
||||
func BusiGroupStatistics(ctx *ctx.Context) (*Statistics, error) {
|
||||
if !ctx.IsCenter {
|
||||
s, err := poster.GetByUrls[*Statistics](ctx, "/v1/n9e/statistic?name=busi_group")
|
||||
return s, err
|
||||
}
|
||||
|
||||
session := DB(ctx).Model(&BusiGroup{}).Select("count(*) as total", "max(update_at) as last_updated")
|
||||
|
||||
var stats []*Statistics
|
||||
|
||||
@@ -46,6 +46,18 @@ type Statistics struct {
|
||||
LastUpdated int64 `gorm:"last_updated"`
|
||||
}
|
||||
|
||||
func StatisticsGet[T any](ctx *ctx.Context, model T) (*Statistics, error) {
|
||||
var stats []*Statistics
|
||||
session := DB(ctx).Model(model).Select("count(*) as total", "max(update_at) as last_updated")
|
||||
|
||||
err := session.Find(&stats).Error
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return stats[0], nil
|
||||
}
|
||||
|
||||
func MatchDatasource(ids []int64, id int64) bool {
|
||||
if id == DatasourceIdAll {
|
||||
return true
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
"github.com/ccfos/nightingale/v6/pkg/poster"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/toolkits/pkg/runner"
|
||||
@@ -43,6 +44,13 @@ func InitSalt(ctx *ctx.Context) {
|
||||
}
|
||||
|
||||
func ConfigsGet(ctx *ctx.Context, ckey string) (string, error) {
|
||||
if !ctx.IsCenter {
|
||||
if !ctx.IsCenter {
|
||||
s, err := poster.GetByUrls[string](ctx, "/v1/n9e/config?key="+ckey)
|
||||
return s, err
|
||||
}
|
||||
}
|
||||
|
||||
var lst []string
|
||||
err := DB(ctx).Model(&Configs{}).Where("ckey=?", ckey).Pluck("cval", &lst).Error
|
||||
if err != nil {
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
"github.com/ccfos/nightingale/v6/pkg/poster"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/toolkits/pkg/logger"
|
||||
"github.com/toolkits/pkg/str"
|
||||
@@ -112,6 +113,17 @@ func (ds *Datasource) Get(ctx *ctx.Context) error {
|
||||
}
|
||||
|
||||
func GetDatasources(ctx *ctx.Context) ([]Datasource, error) {
|
||||
if !ctx.IsCenter {
|
||||
lst, err := poster.GetByUrls[[]Datasource](ctx, "/v1/n9e/datasources")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for i := 0; i < len(lst); i++ {
|
||||
lst[i].FE2DB()
|
||||
}
|
||||
return lst, nil
|
||||
}
|
||||
|
||||
var dss []Datasource
|
||||
err := DB(ctx).Find(&dss).Error
|
||||
|
||||
@@ -123,6 +135,11 @@ func GetDatasources(ctx *ctx.Context) ([]Datasource, error) {
|
||||
}
|
||||
|
||||
func GetDatasourceIdsByEngineName(ctx *ctx.Context, engineName string) ([]int64, error) {
|
||||
if !ctx.IsCenter {
|
||||
lst, err := poster.GetByUrls[[]int64](ctx, "/v1/n9e/datasource-ids?name="+engineName)
|
||||
return lst, err
|
||||
}
|
||||
|
||||
var dss []Datasource
|
||||
var ids []int64
|
||||
err := DB(ctx).Where("cluster_name = ?", engineName).Find(&dss).Error
|
||||
@@ -267,19 +284,32 @@ func (ds *Datasource) DB2FE() error {
|
||||
|
||||
func DatasourceGetMap(ctx *ctx.Context) (map[int64]*Datasource, error) {
|
||||
var lst []*Datasource
|
||||
err := DB(ctx).Find(&lst).Error
|
||||
if err != nil {
|
||||
return nil, err
|
||||
var err error
|
||||
if !ctx.IsCenter {
|
||||
lst, err = poster.GetByUrls[[]*Datasource](ctx, "/v1/n9e/datasources")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for i := 0; i < len(lst); i++ {
|
||||
lst[i].FE2DB()
|
||||
}
|
||||
} else {
|
||||
err := DB(ctx).Find(&lst).Error
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for i := 0; i < len(lst); i++ {
|
||||
err := lst[i].DB2FE()
|
||||
if err != nil {
|
||||
logger.Warningf("get ds:%+v err:%v", lst[i], err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ret := make(map[int64]*Datasource)
|
||||
for i := 0; i < len(lst); i++ {
|
||||
err := lst[i].DB2FE()
|
||||
if err != nil {
|
||||
logger.Warningf("get ds:%+v err:%v", lst[i], err)
|
||||
continue
|
||||
}
|
||||
|
||||
ret[lst[i].Id] = lst[i]
|
||||
}
|
||||
|
||||
@@ -287,6 +317,11 @@ func DatasourceGetMap(ctx *ctx.Context) (map[int64]*Datasource, error) {
|
||||
}
|
||||
|
||||
func DatasourceStatistics(ctx *ctx.Context) (*Statistics, error) {
|
||||
if !ctx.IsCenter {
|
||||
s, err := poster.GetByUrls[*Statistics](ctx, "/v1/n9e/statistic?name=datasource")
|
||||
return s, err
|
||||
}
|
||||
|
||||
session := DB(ctx).Model(&Datasource{}).Select("count(*) as total", "max(updated_at) as last_updated")
|
||||
|
||||
var stats []*Statistics
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"strings"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
"github.com/ccfos/nightingale/v6/pkg/poster"
|
||||
"github.com/ccfos/nightingale/v6/pkg/tplx"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
@@ -59,6 +60,11 @@ func NotifyTplCountByChannel(c *ctx.Context, channel string) (int64, error) {
|
||||
}
|
||||
|
||||
func NotifyTplGets(c *ctx.Context) ([]*NotifyTpl, error) {
|
||||
if !c.IsCenter {
|
||||
lst, err := poster.GetByUrls[[]*NotifyTpl](c, "/v1/n9e/notify-tpls")
|
||||
return lst, err
|
||||
}
|
||||
|
||||
var lst []*NotifyTpl
|
||||
err := DB(c).Find(&lst).Error
|
||||
return lst, err
|
||||
@@ -88,6 +94,10 @@ func ListTpls(c *ctx.Context) (map[string]*template.Template, error) {
|
||||
}
|
||||
|
||||
func InitNotifyConfig(c *ctx.Context, tplDir string) {
|
||||
if !c.IsCenter {
|
||||
return
|
||||
}
|
||||
|
||||
// init notify channel
|
||||
cval, err := ConfigsGet(c, NOTIFYCHANNEL)
|
||||
if err != nil {
|
||||
|
||||
@@ -7,6 +7,8 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
"github.com/ccfos/nightingale/v6/pkg/poster"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prometheus/common/model"
|
||||
"github.com/toolkits/pkg/logger"
|
||||
@@ -197,7 +199,33 @@ func RecordingRuleGetById(ctx *ctx.Context, id int64) (*RecordingRule, error) {
|
||||
return RecordingRuleGet(ctx, "id=?", id)
|
||||
}
|
||||
|
||||
func RecordingRuleEnabledGets(ctx *ctx.Context) ([]*RecordingRule, error) {
|
||||
session := DB(ctx)
|
||||
|
||||
var lst []*RecordingRule
|
||||
err := session.Where("disabled = ?", 0).Find(&lst).Error
|
||||
if err != nil {
|
||||
return lst, err
|
||||
}
|
||||
|
||||
for i := 0; i < len(lst); i++ {
|
||||
lst[i].DB2FE(ctx)
|
||||
}
|
||||
return lst, nil
|
||||
}
|
||||
|
||||
func RecordingRuleGetsByCluster(ctx *ctx.Context) ([]*RecordingRule, error) {
|
||||
if !ctx.IsCenter {
|
||||
lst, err := poster.GetByUrls[[]*RecordingRule](ctx, "/v1/n9e/recording-rules")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for i := 0; i < len(lst); i++ {
|
||||
lst[i].FE2DB()
|
||||
}
|
||||
return lst, err
|
||||
}
|
||||
|
||||
session := DB(ctx).Where("disabled = ?", 0)
|
||||
|
||||
var lst []*RecordingRule
|
||||
@@ -217,6 +245,11 @@ func RecordingRuleGetsByCluster(ctx *ctx.Context) ([]*RecordingRule, error) {
|
||||
}
|
||||
|
||||
func RecordingRuleStatistics(ctx *ctx.Context) (*Statistics, error) {
|
||||
if !ctx.IsCenter {
|
||||
s, err := poster.GetByUrls[*Statistics](ctx, "/v1/n9e/statistic?name=recording_rule")
|
||||
return s, err
|
||||
}
|
||||
|
||||
session := DB(ctx).Model(&RecordingRule{}).Select("count(*) as total", "max(update_at) as last_updated")
|
||||
|
||||
var stats []*Statistics
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
"github.com/ccfos/nightingale/v6/pkg/poster"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"gorm.io/gorm"
|
||||
@@ -19,7 +20,7 @@ type Target struct {
|
||||
Note string `json:"note"`
|
||||
Tags string `json:"-"`
|
||||
TagsJSON []string `json:"tags" gorm:"-"`
|
||||
TagsMap map[string]string `json:"-" gorm:"-"` // internal use, append tags to series
|
||||
TagsMap map[string]string `json:"tags_maps" gorm:"-"` // internal use, append tags to series
|
||||
UpdateAt int64 `json:"update_at"`
|
||||
|
||||
UnixTime int64 `json:"unixtime" gorm:"-"`
|
||||
@@ -59,6 +60,11 @@ func (t *Target) FillGroup(ctx *ctx.Context, cache map[int64]*BusiGroup) error {
|
||||
}
|
||||
|
||||
func TargetStatistics(ctx *ctx.Context) (*Statistics, error) {
|
||||
if !ctx.IsCenter {
|
||||
s, err := poster.GetByUrls[*Statistics](ctx, "/v1/n9e/statistic?name=target")
|
||||
return s, err
|
||||
}
|
||||
|
||||
var stats []*Statistics
|
||||
err := DB(ctx).Model(&Target{}).Select("count(*) as total", "max(update_at) as last_updated").Find(&stats).Error
|
||||
if err != nil {
|
||||
@@ -164,8 +170,16 @@ func TargetFilterQueryBuild(ctx *ctx.Context, query map[string]interface{}, limi
|
||||
}
|
||||
|
||||
func TargetGetsAll(ctx *ctx.Context) ([]*Target, error) {
|
||||
if !ctx.IsCenter {
|
||||
lst, err := poster.GetByUrls[[]*Target](ctx, "/v1/n9e/targets")
|
||||
return lst, err
|
||||
}
|
||||
|
||||
var lst []*Target
|
||||
err := DB(ctx).Model(&Target{}).Find(&lst).Error
|
||||
for i := 0; i < len(lst); i++ {
|
||||
lst[i].FillTagsMap()
|
||||
}
|
||||
return lst, err
|
||||
}
|
||||
|
||||
|
||||
@@ -1,6 +1,9 @@
|
||||
package models
|
||||
|
||||
import "github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
import (
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
"github.com/ccfos/nightingale/v6/pkg/poster"
|
||||
)
|
||||
|
||||
type TaskRecord struct {
|
||||
Id int64 `json:"id" gorm:"primaryKey"`
|
||||
@@ -27,6 +30,11 @@ func (r *TaskRecord) TableName() string {
|
||||
|
||||
// create task
|
||||
func (r *TaskRecord) Add(ctx *ctx.Context) error {
|
||||
if !ctx.IsCenter {
|
||||
err := poster.PostByUrls(ctx, "/v1/n9e/task-record-add", r)
|
||||
return err
|
||||
}
|
||||
|
||||
return Insert(ctx, r)
|
||||
}
|
||||
|
||||
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
"github.com/ccfos/nightingale/v6/pkg/ldapx"
|
||||
"github.com/ccfos/nightingale/v6/pkg/ormx"
|
||||
"github.com/ccfos/nightingale/v6/pkg/poster"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/tidwall/gjson"
|
||||
@@ -347,12 +348,18 @@ func UserGets(ctx *ctx.Context, query string, limit, offset int) ([]User, error)
|
||||
for i := 0; i < len(users); i++ {
|
||||
users[i].RolesLst = strings.Fields(users[i].Roles)
|
||||
users[i].Admin = users[i].IsAdmin()
|
||||
users[i].Password = ""
|
||||
}
|
||||
|
||||
return users, nil
|
||||
}
|
||||
|
||||
func UserGetAll(ctx *ctx.Context) ([]*User, error) {
|
||||
if !ctx.IsCenter {
|
||||
lst, err := poster.GetByUrls[[]*User](ctx, "/v1/n9e/users")
|
||||
return lst, err
|
||||
}
|
||||
|
||||
var lst []*User
|
||||
err := DB(ctx).Find(&lst).Error
|
||||
if err == nil {
|
||||
@@ -429,6 +436,11 @@ func (u *User) CheckPerm(ctx *ctx.Context, operation string) (bool, error) {
|
||||
}
|
||||
|
||||
func UserStatistics(ctx *ctx.Context) (*Statistics, error) {
|
||||
if !ctx.IsCenter {
|
||||
s, err := poster.GetByUrls[*Statistics](ctx, "/v1/n9e/statistic?name=user")
|
||||
return s, err
|
||||
}
|
||||
|
||||
session := DB(ctx).Model(&User{}).Select("count(*) as total", "max(update_at) as last_updated")
|
||||
|
||||
var stats []*Statistics
|
||||
|
||||
@@ -4,6 +4,8 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
"github.com/ccfos/nightingale/v6/pkg/poster"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/toolkits/pkg/str"
|
||||
"gorm.io/gorm"
|
||||
@@ -111,6 +113,11 @@ func UserGroupGetByIds(ctx *ctx.Context, ids []int64) ([]UserGroup, error) {
|
||||
}
|
||||
|
||||
func UserGroupGetAll(ctx *ctx.Context) ([]*UserGroup, error) {
|
||||
if !ctx.IsCenter {
|
||||
lst, err := poster.GetByUrls[[]*UserGroup](ctx, "/v1/n9e/users")
|
||||
return lst, err
|
||||
}
|
||||
|
||||
var lst []*UserGroup
|
||||
err := DB(ctx).Find(&lst).Error
|
||||
return lst, err
|
||||
@@ -139,6 +146,11 @@ func (ug *UserGroup) DelMembers(ctx *ctx.Context, userIds []int64) error {
|
||||
}
|
||||
|
||||
func UserGroupStatistics(ctx *ctx.Context) (*Statistics, error) {
|
||||
if !ctx.IsCenter {
|
||||
s, err := poster.GetByUrls[*Statistics](ctx, "/v1/n9e/statistic?name=user_group")
|
||||
return s, err
|
||||
}
|
||||
|
||||
session := DB(ctx).Model(&UserGroup{}).Select("count(*) as total", "max(update_at) as last_updated")
|
||||
|
||||
var stats []*Statistics
|
||||
|
||||
@@ -1,6 +1,9 @@
|
||||
package models
|
||||
|
||||
import "github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
import (
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
"github.com/ccfos/nightingale/v6/pkg/poster"
|
||||
)
|
||||
|
||||
type UserGroupMember struct {
|
||||
GroupId int64
|
||||
@@ -54,8 +57,13 @@ func UserGroupMemberDel(ctx *ctx.Context, groupId int64, userIds []int64) error
|
||||
return DB(ctx).Where("group_id = ? and user_id in ?", groupId, userIds).Delete(&UserGroupMember{}).Error
|
||||
}
|
||||
|
||||
func UserGroupMemberGetAll(ctx *ctx.Context) ([]UserGroupMember, error) {
|
||||
var lst []UserGroupMember
|
||||
func UserGroupMemberGetAll(ctx *ctx.Context) ([]*UserGroupMember, error) {
|
||||
if !ctx.IsCenter {
|
||||
lst, err := poster.GetByUrls[[]*UserGroupMember](ctx, "/v1/n9e/user-group-members")
|
||||
return lst, err
|
||||
}
|
||||
|
||||
var lst []*UserGroupMember
|
||||
err := DB(ctx).Find(&lst).Error
|
||||
return lst, err
|
||||
}
|
||||
|
||||
@@ -3,18 +3,29 @@ package ctx
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/conf"
|
||||
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
type Context struct {
|
||||
DB *gorm.DB
|
||||
Ctx context.Context
|
||||
DB *gorm.DB
|
||||
CenterApi conf.CenterApi
|
||||
Ctx context.Context
|
||||
IsCenter bool
|
||||
}
|
||||
|
||||
func NewContext(ctx context.Context, db *gorm.DB) *Context {
|
||||
func NewContext(ctx context.Context, db *gorm.DB, isCenter bool, centerApis ...conf.CenterApi) *Context {
|
||||
var api conf.CenterApi
|
||||
if len(centerApis) > 0 {
|
||||
api = centerApis[0]
|
||||
}
|
||||
|
||||
return &Context{
|
||||
Ctx: ctx,
|
||||
DB: db,
|
||||
Ctx: ctx,
|
||||
DB: db,
|
||||
CenterApi: api,
|
||||
IsCenter: isCenter,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -2,14 +2,160 @@ package poster
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"math/rand"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/toolkits/pkg/logger"
|
||||
"github.com/toolkits/pkg/net/httplib"
|
||||
)
|
||||
|
||||
type DataResponse[T any] struct {
|
||||
Dat T `json:"dat"`
|
||||
Err string `json:"err"`
|
||||
}
|
||||
|
||||
func GetByUrls[T any](ctx *ctx.Context, path string) (T, error) {
|
||||
var err error
|
||||
addrs := ctx.CenterApi.Addrs
|
||||
|
||||
rand.Shuffle(len(addrs), func(i, j int) { addrs[i], addrs[j] = addrs[j], addrs[i] })
|
||||
for _, addr := range addrs {
|
||||
url := fmt.Sprintf("%s%s", addr, path)
|
||||
dat, e := GetByUrl[T](url, ctx.CenterApi.BasicAuth)
|
||||
if e != nil {
|
||||
err = e
|
||||
logger.Warningf("failed to get data from center, url: %s, err: %v", url, err)
|
||||
continue
|
||||
}
|
||||
return dat, nil
|
||||
}
|
||||
|
||||
var dat T
|
||||
return dat, err
|
||||
}
|
||||
|
||||
func GetByUrl[T any](url string, basicAuth gin.Accounts) (T, error) {
|
||||
var dat T
|
||||
req := httplib.Get(url).SetTimeout(time.Duration(3000) * time.Millisecond)
|
||||
|
||||
if len(basicAuth) > 0 {
|
||||
var token string
|
||||
for username, password := range basicAuth {
|
||||
token = base64.StdEncoding.EncodeToString([]byte(username + ":" + password))
|
||||
}
|
||||
|
||||
if len(token) > 0 {
|
||||
req = req.Header("Authorization", "Basic "+token)
|
||||
}
|
||||
}
|
||||
|
||||
resp, err := req.Response()
|
||||
if err != nil {
|
||||
return dat, fmt.Errorf("failed to fetch from url: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return dat, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
|
||||
}
|
||||
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return dat, fmt.Errorf("failed to read response body: %w", err)
|
||||
}
|
||||
|
||||
var dataResp DataResponse[T]
|
||||
err = json.Unmarshal(body, &dataResp)
|
||||
if err != nil {
|
||||
return dat, fmt.Errorf("failed to decode response: %w", err)
|
||||
}
|
||||
|
||||
if dataResp.Err != "" {
|
||||
return dat, fmt.Errorf("error from server: %s", dataResp.Err)
|
||||
}
|
||||
|
||||
logger.Debugf("get data from %s, data: %+v", url, dataResp.Dat)
|
||||
return dataResp.Dat, nil
|
||||
}
|
||||
|
||||
type PostResponse struct {
|
||||
Err string `json:"err"`
|
||||
}
|
||||
|
||||
func PostByUrls(ctx *ctx.Context, path string, v interface{}) (err error) {
|
||||
addrs := ctx.CenterApi.Addrs
|
||||
|
||||
rand.Shuffle(len(addrs), func(i, j int) { addrs[i], addrs[j] = addrs[j], addrs[i] })
|
||||
for _, addr := range addrs {
|
||||
url := fmt.Sprintf("%s%s", addr, path)
|
||||
err = PostByUrl(url, ctx.CenterApi.BasicAuth, v)
|
||||
if err == nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func PostByUrl(url string, basicAuth gin.Accounts, v interface{}) (err error) {
|
||||
var bs []byte
|
||||
bs, err = json.Marshal(v)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
bf := bytes.NewBuffer(bs)
|
||||
client := http.Client{
|
||||
Timeout: 10 * time.Second,
|
||||
}
|
||||
|
||||
req, err := http.NewRequest("POST", url, bf)
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
|
||||
if len(basicAuth) > 0 {
|
||||
var token string
|
||||
for username, password := range basicAuth {
|
||||
token = base64.StdEncoding.EncodeToString([]byte(username + ":" + password))
|
||||
}
|
||||
if len(token) > 0 {
|
||||
req.Header.Set("Authorization", "Basic "+token)
|
||||
}
|
||||
}
|
||||
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to fetch from url: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
|
||||
}
|
||||
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to read response body: %w", err)
|
||||
}
|
||||
|
||||
var dataResp PostResponse
|
||||
err = json.Unmarshal(body, &dataResp)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to decode response: %w body:%s", err, string(body))
|
||||
}
|
||||
|
||||
if dataResp.Err != "" {
|
||||
return fmt.Errorf("error from server: %s", dataResp.Err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func PostJSON(url string, timeout time.Duration, v interface{}, retries ...int) (response []byte, code int, err error) {
|
||||
var bs []byte
|
||||
|
||||
|
||||
@@ -42,7 +42,7 @@ func (w WriterType) Write(items []*prompb.TimeSeries, headers ...map[string]stri
|
||||
}
|
||||
|
||||
if err := w.Post(snappy.Encode(nil, data), headers...); err != nil {
|
||||
logger.Warningf("post to %s got error: %v", w.Opts.Url, err)
|
||||
logger.Warningf("%v post to %s got error: %v", w.Opts, w.Opts.Url, err)
|
||||
logger.Debug("example timeseries:", items[0].String())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -30,6 +30,10 @@ func (po *PromOption) Equal(target PromOption) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
if po.WriteAddr != target.WriteAddr {
|
||||
return false
|
||||
}
|
||||
|
||||
if po.Timeout != target.Timeout {
|
||||
return false
|
||||
}
|
||||
|
||||
@@ -10,6 +10,7 @@ import (
|
||||
"github.com/ccfos/nightingale/v6/alert/aconf"
|
||||
"github.com/ccfos/nightingale/v6/models"
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
"github.com/ccfos/nightingale/v6/pkg/poster"
|
||||
"github.com/ccfos/nightingale/v6/pkg/prom"
|
||||
|
||||
"github.com/prometheus/client_golang/api"
|
||||
@@ -42,11 +43,25 @@ type PromSetting struct {
|
||||
}
|
||||
|
||||
func (pc *PromClientMap) loadFromDatabase() {
|
||||
datasources, err := models.GetDatasourcesGetsBy(pc.ctx, models.PROMETHEUS, "", "", "")
|
||||
if err != nil {
|
||||
logger.Errorf("failed to get datasources, error: %v", err)
|
||||
return
|
||||
var datasources []*models.Datasource
|
||||
var err error
|
||||
if !pc.ctx.IsCenter {
|
||||
datasources, err = poster.GetByUrls[[]*models.Datasource](pc.ctx, "/v1/n9e/datasources?typ="+models.PROMETHEUS)
|
||||
if err != nil {
|
||||
logger.Errorf("failed to get datasources, error: %v", err)
|
||||
return
|
||||
}
|
||||
for i := 0; i < len(datasources); i++ {
|
||||
datasources[i].FE2DB()
|
||||
}
|
||||
} else {
|
||||
datasources, err = models.GetDatasourcesGetsBy(pc.ctx, models.PROMETHEUS, "", "", "")
|
||||
if err != nil {
|
||||
logger.Errorf("failed to get datasources, error: %v", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
newCluster := make(map[int64]struct{})
|
||||
for _, ds := range datasources {
|
||||
dsId := ds.Id
|
||||
|
||||
@@ -4,21 +4,23 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
"github.com/ccfos/nightingale/v6/pkg/poster"
|
||||
|
||||
"github.com/toolkits/pkg/logger"
|
||||
"github.com/toolkits/pkg/slice"
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
type Set struct {
|
||||
sync.Mutex
|
||||
items map[string]struct{}
|
||||
db *gorm.DB
|
||||
ctx *ctx.Context
|
||||
}
|
||||
|
||||
func New(db *gorm.DB) *Set {
|
||||
func New(ctx *ctx.Context) *Set {
|
||||
set := &Set{
|
||||
items: make(map[string]struct{}),
|
||||
db: db,
|
||||
ctx: ctx,
|
||||
}
|
||||
|
||||
set.Init()
|
||||
@@ -68,7 +70,7 @@ func (s *Set) updateTimestamp(items map[string]struct{}) {
|
||||
lst = append(lst, ident)
|
||||
num++
|
||||
if num == 100 {
|
||||
if err := s.updateTargets(lst, now); err != nil {
|
||||
if err := s.UpdateTargets(lst, now); err != nil {
|
||||
logger.Errorf("failed to update targets: %v", err)
|
||||
}
|
||||
lst = lst[:0]
|
||||
@@ -76,18 +78,32 @@ func (s *Set) updateTimestamp(items map[string]struct{}) {
|
||||
}
|
||||
}
|
||||
|
||||
if err := s.updateTargets(lst, now); err != nil {
|
||||
if err := s.UpdateTargets(lst, now); err != nil {
|
||||
logger.Errorf("failed to update targets: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Set) updateTargets(lst []string, now int64) error {
|
||||
type TargetUpdate struct {
|
||||
Lst []string `json:"lst"`
|
||||
Now int64 `json:"now"`
|
||||
}
|
||||
|
||||
func (s *Set) UpdateTargets(lst []string, now int64) error {
|
||||
if !s.ctx.IsCenter {
|
||||
t := TargetUpdate{
|
||||
Lst: lst,
|
||||
Now: now,
|
||||
}
|
||||
err := poster.PostByUrls(s.ctx, "/v1/n9e/target-update", t)
|
||||
return err
|
||||
}
|
||||
|
||||
count := int64(len(lst))
|
||||
if count == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
ret := s.db.Table("target").Where("ident in ?", lst).Update("update_at", now)
|
||||
ret := s.ctx.DB.Table("target").Where("ident in ?", lst).Update("update_at", now)
|
||||
if ret.Error != nil {
|
||||
return ret.Error
|
||||
}
|
||||
@@ -98,14 +114,14 @@ func (s *Set) updateTargets(lst []string, now int64) error {
|
||||
|
||||
// there are some idents not found in db, so insert them
|
||||
var exists []string
|
||||
err := s.db.Table("target").Where("ident in ?", lst).Pluck("ident", &exists).Error
|
||||
err := s.ctx.DB.Table("target").Where("ident in ?", lst).Pluck("ident", &exists).Error
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
news := slice.SubString(lst, exists)
|
||||
for i := 0; i < len(news); i++ {
|
||||
err = s.db.Exec("INSERT INTO target(ident, update_at) VALUES(?, ?)", news[i], now).Error
|
||||
err = s.ctx.DB.Exec("INSERT INTO target(ident, update_at) VALUES(?, ?)", news[i], now).Error
|
||||
if err != nil {
|
||||
logger.Error("failed to insert target:", news[i], "error:", err)
|
||||
}
|
||||
|
||||
@@ -12,7 +12,6 @@ import (
|
||||
"github.com/ccfos/nightingale/v6/pushgw/idents"
|
||||
"github.com/ccfos/nightingale/v6/pushgw/router"
|
||||
"github.com/ccfos/nightingale/v6/pushgw/writer"
|
||||
"github.com/ccfos/nightingale/v6/storage"
|
||||
)
|
||||
|
||||
type PushgwProvider struct {
|
||||
@@ -31,13 +30,9 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
db, err := storage.New(config.DB)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ctx := ctx.NewContext(context.Background(), db)
|
||||
ctx := ctx.NewContext(context.Background(), nil, false, config.CenterApi)
|
||||
|
||||
idents := idents.New(db)
|
||||
idents := idents.New(ctx)
|
||||
|
||||
stats := memsto.NewSyncStats()
|
||||
|
||||
|
||||
@@ -59,10 +59,12 @@ func (rt *Router) Config(r *gin.Engine) {
|
||||
r.POST("/opentsdb/put", auth, rt.openTSDBPut)
|
||||
r.POST("/openfalcon/push", auth, rt.falconPush)
|
||||
r.POST("/prometheus/v1/write", auth, rt.remoteWrite)
|
||||
r.POST("/v1/n9e/target-update", auth, rt.targetUpdate)
|
||||
} else {
|
||||
// no need basic auth
|
||||
r.POST("/opentsdb/put", rt.openTSDBPut)
|
||||
r.POST("/openfalcon/push", rt.falconPush)
|
||||
r.POST("/prometheus/v1/write", rt.remoteWrite)
|
||||
r.POST("/v1/n9e/target-update", rt.targetUpdate)
|
||||
}
|
||||
}
|
||||
|
||||
14
pushgw/router/router_target.go
Normal file
14
pushgw/router/router_target.go
Normal file
@@ -0,0 +1,14 @@
|
||||
package router
|
||||
|
||||
import (
|
||||
"github.com/ccfos/nightingale/v6/pushgw/idents"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/toolkits/pkg/ginx"
|
||||
)
|
||||
|
||||
func (rt *Router) targetUpdate(c *gin.Context) {
|
||||
var f idents.TargetUpdate
|
||||
ginx.BindJSON(c, &f)
|
||||
|
||||
ginx.NewRender(c).Message(rt.IdentSet.UpdateTargets(f.Lst, f.Now))
|
||||
}
|
||||
Reference in New Issue
Block a user