Compare commits

...

9 Commits

Author SHA1 Message Date
Ulric Qin
7094665c25 refactor basic auth configurations: merge HTTP.Pushgw and HTTP.Heartbeat to HTTP.APIForAgent; merge HTTP.Alert and HTTP.Service to HTTP.APIForService 2023-06-01 16:12:50 +08:00
ning
f1a5c2065c change alert.toml.example 2023-06-01 14:35:47 +08:00
Yening Qin
6b9ceda9c1 fix: host filter (#1557)
* fix host filter
2023-06-01 14:16:47 +08:00
ning
7390d42e62 refactor: change Makefile 2023-05-31 14:39:31 +08:00
ning
a35f879dc0 refactor: change event notify log 2023-05-31 14:19:41 +08:00
xtan
3fd4ea4853 feat: embed front-end files into n9e executable (#1556)
* feat: embed front-end files into n9e executable
2023-05-31 10:30:01 +08:00
ning
20f0a9d16d fix: webhook update note 2023-05-26 15:41:16 +08:00
ning
5d4151983a refactor: init alert 2023-05-25 14:42:18 +08:00
Yening Qin
83b5f12474 refactor: n9e-alert and n9e-pushgw sync config by http api (#1545)
* get alert mute by api

* add service api

* fix sync datasource

* change event persist

* add heartbeat

* change pushgw update target

* code refactor

* fix get user members

* refactor get alert rules

* update AlertCurEventGetByRuleIdAndDsId

* refactor get from api

* add role perm list and change get datasource

* refactor: get ops and metrics

* change some logs

* change get datasource
2023-05-23 20:53:04 +08:00
81 changed files with 1117 additions and 337 deletions

2
.gitignore vendored
View File

@@ -42,7 +42,7 @@ _test
/docker/n9e
/docker/mysqldata
/docker/experience_pg_vm/pgdata
/etc.local
/etc.local*
.alerts
.idea

View File

@@ -2,6 +2,7 @@ before:
hooks:
# You may remove this if you don't use go modules.
- go mod tidy
- go install github.com/rakyll/statik
snapshot:
name_template: '{{ .Tag }}'

View File

@@ -1,4 +1,4 @@
.PHONY: start build
.PHONY: prebuild start build
ROOT:=$(shell pwd -P)
GIT_COMMIT:=$(shell git --work-tree ${ROOT} rev-parse 'HEAD^{commit}')
@@ -6,6 +6,11 @@ _GIT_VERSION:=$(shell git --work-tree ${ROOT} describe --tags --abbrev=14 "${GIT
TAG=$(shell echo "${_GIT_VERSION}" | awk -F"-" '{print $$1}')
RELEASE_VERSION:="$(TAG)-$(GIT_COMMIT)"
prebuild:
echo "begin download and embed the front-end file..."
sh fe.sh
echo "front-end file download and embedding completed."
all: build
build:
@@ -17,7 +22,7 @@ build-alert:
build-pushgw:
go build -ldflags "-w -s -X github.com/ccfos/nightingale/v6/pkg/version.Version=$(RELEASE_VERSION)" -o n9e-pushgw ./cmd/pushgw/main.go
build-cli:
build-cli:
go build -ldflags "-w -s -X github.com/ccfos/nightingale/v6/pkg/version.Version=$(RELEASE_VERSION)" -o n9e-cli ./cmd/cli/main.go
run:

View File

@@ -23,7 +23,6 @@ import (
"github.com/ccfos/nightingale/v6/prom"
"github.com/ccfos/nightingale/v6/pushgw/pconf"
"github.com/ccfos/nightingale/v6/pushgw/writer"
"github.com/ccfos/nightingale/v6/storage"
)
func Initialize(configDir string, cryptoKey string) (func(), error) {
@@ -37,21 +36,12 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
return nil, err
}
db, err := storage.New(config.DB)
if err != nil {
return nil, err
}
ctx := ctx.NewContext(context.Background(), db)
redis, err := storage.NewRedis(config.Redis)
if err != nil {
return nil, err
}
ctx := ctx.NewContext(context.Background(), nil, false, config.CenterApi)
syncStats := memsto.NewSyncStats()
alertStats := astats.NewSyncStats()
targetCache := memsto.NewTargetCache(ctx, syncStats, redis)
targetCache := memsto.NewTargetCache(ctx, syncStats, nil)
busiGroupCache := memsto.NewBusiGroupCache(ctx, syncStats)
alertMuteCache := memsto.NewAlertMuteCache(ctx, syncStats)
alertRuleCache := memsto.NewAlertRuleCache(ctx, syncStats)
@@ -62,7 +52,7 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
externalProcessors := process.NewExternalProcessors()
Start(config.Alert, config.Pushgw, syncStats, alertStats, externalProcessors, targetCache, busiGroupCache, alertMuteCache, alertRuleCache, notifyConfigCache, dsCache, ctx, promClients, false)
Start(config.Alert, config.Pushgw, syncStats, alertStats, externalProcessors, targetCache, busiGroupCache, alertMuteCache, alertRuleCache, notifyConfigCache, dsCache, ctx, promClients)
r := httpx.GinEngine(config.Global.RunMode, config.HTTP)
rt := router.New(config.HTTP, config.Alert, alertMuteCache, targetCache, busiGroupCache, alertStats, ctx, externalProcessors)
@@ -77,7 +67,7 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
}
func Start(alertc aconf.Alert, pushgwc pconf.Pushgw, syncStats *memsto.Stats, alertStats *astats.Stats, externalProcessors *process.ExternalProcessorsType, targetCache *memsto.TargetCacheType, busiGroupCache *memsto.BusiGroupCacheType,
alertMuteCache *memsto.AlertMuteCacheType, alertRuleCache *memsto.AlertRuleCacheType, notifyConfigCache *memsto.NotifyConfigCacheType, datasourceCache *memsto.DatasourceCacheType, ctx *ctx.Context, promClients *prom.PromClientMap, isCenter bool) {
alertMuteCache *memsto.AlertMuteCacheType, alertRuleCache *memsto.AlertRuleCacheType, notifyConfigCache *memsto.NotifyConfigCacheType, datasourceCache *memsto.DatasourceCacheType, ctx *ctx.Context, promClients *prom.PromClientMap) {
userCache := memsto.NewUserCache(ctx, syncStats)
userGroupCache := memsto.NewUserGroupCache(ctx, syncStats)
alertSubscribeCache := memsto.NewAlertSubscribeCache(ctx, syncStats)
@@ -85,12 +75,12 @@ func Start(alertc aconf.Alert, pushgwc pconf.Pushgw, syncStats *memsto.Stats, al
go models.InitNotifyConfig(ctx, alertc.Alerting.TemplatesDir)
naming := naming.NewNaming(ctx, alertc.Heartbeat, isCenter)
naming := naming.NewNaming(ctx, alertc.Heartbeat)
writers := writer.NewWriters(pushgwc)
record.NewScheduler(alertc, recordingRuleCache, promClients, writers, alertStats)
eval.NewScheduler(isCenter, alertc, externalProcessors, alertRuleCache, targetCache, busiGroupCache, alertMuteCache, datasourceCache, promClients, naming, ctx, alertStats)
eval.NewScheduler(alertc, externalProcessors, alertRuleCache, targetCache, busiGroupCache, alertMuteCache, datasourceCache, promClients, naming, ctx, alertStats)
dp := dispatch.NewDispatch(alertRuleCache, userCache, userGroupCache, alertSubscribeCache, targetCache, notifyConfigCache, alertc.Alerting, ctx)
consumer := dispatch.NewConsumer(alertc.Alerting, ctx, dp)

View File

@@ -8,6 +8,7 @@ import (
"github.com/ccfos/nightingale/v6/alert/queue"
"github.com/ccfos/nightingale/v6/models"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/pkg/poster"
"github.com/toolkits/pkg/concurrent/semaphore"
"github.com/toolkits/pkg/logger"
@@ -82,78 +83,17 @@ func (e *Consumer) consumeOne(event *models.AlertCurEvent) {
}
func (e *Consumer) persist(event *models.AlertCurEvent) {
has, err := models.AlertCurEventExists(e.ctx, "hash=?", event.Hash)
if err != nil {
logger.Errorf("event_persist_check_exists_fail: %v rule_id=%d hash=%s", err, event.RuleId, event.Hash)
return
}
his := event.ToHis(e.ctx)
// 不管是告警还是恢复,全量告警里都要记录
if err := his.Add(e.ctx); err != nil {
logger.Errorf(
"event_persist_his_fail: %v rule_id=%d cluster:%s hash=%s tags=%v timestamp=%d value=%s",
err,
event.RuleId,
event.Cluster,
event.Hash,
event.TagsJSON,
event.TriggerTime,
event.TriggerValue,
)
}
if has {
// 活跃告警表中有记录,删之
err = models.AlertCurEventDelByHash(e.ctx, event.Hash)
if !e.ctx.IsCenter {
event.DB2FE()
err := poster.PostByUrls(e.ctx, "/v1/n9e/event-persist", event)
if err != nil {
logger.Errorf("event_del_cur_fail: %v hash=%s", err, event.Hash)
return
logger.Errorf("event%+v persist err:%v", event, err)
}
if !event.IsRecovered {
// 恢复事件从活跃告警列表彻底删掉告警事件要重新加进来新的event
// use his id as cur id
event.Id = his.Id
if event.Id > 0 {
if err := event.Add(e.ctx); err != nil {
logger.Errorf(
"event_persist_cur_fail: %v rule_id=%d cluster:%s hash=%s tags=%v timestamp=%d value=%s",
err,
event.RuleId,
event.Cluster,
event.Hash,
event.TagsJSON,
event.TriggerTime,
event.TriggerValue,
)
}
}
}
return
}
if event.IsRecovered {
// alert_cur_event表里没有数据表示之前没告警结果现在报了恢复神奇....理论上不应该出现的
return
}
// use his id as cur id
event.Id = his.Id
if event.Id > 0 {
if err := event.Add(e.ctx); err != nil {
logger.Errorf(
"event_persist_cur_fail: %v rule_id=%d cluster:%s hash=%s tags=%v timestamp=%d value=%s",
err,
event.RuleId,
event.Cluster,
event.Hash,
event.TagsJSON,
event.TriggerTime,
event.TriggerValue,
)
}
err := models.EventPersist(e.ctx, event)
if err != nil {
logger.Errorf("event%+v persist err:%v", event, err)
}
}

View File

@@ -199,7 +199,7 @@ func (e *Dispatch) Send(rule *models.AlertRule, event *models.AlertCurEvent, not
}
// handle event callbacks
sender.SendCallbacks(e.ctx, notifyTarget.ToCallbackList(), event, e.targetCache, e.notifyConfigCache.GetIbex())
sender.SendCallbacks(e.ctx, notifyTarget.ToCallbackList(), event, e.targetCache, e.userCache, e.notifyConfigCache.GetIbex())
// handle global webhooks
sender.SendWebhooks(notifyTarget.ToWebhookList(), event)

View File

@@ -16,7 +16,6 @@ import (
)
type Scheduler struct {
isCenter bool
// key: hash
alertRules map[string]*AlertRuleWorker
@@ -38,11 +37,10 @@ type Scheduler struct {
stats *astats.Stats
}
func NewScheduler(isCenter bool, aconf aconf.Alert, externalProcessors *process.ExternalProcessorsType, arc *memsto.AlertRuleCacheType, targetCache *memsto.TargetCacheType,
func NewScheduler(aconf aconf.Alert, externalProcessors *process.ExternalProcessorsType, arc *memsto.AlertRuleCacheType, targetCache *memsto.TargetCacheType,
busiGroupCache *memsto.BusiGroupCacheType, alertMuteCache *memsto.AlertMuteCacheType, datasourceCache *memsto.DatasourceCacheType, promClients *prom.PromClientMap, naming *naming.Naming,
ctx *ctx.Context, stats *astats.Stats) *Scheduler {
scheduler := &Scheduler{
isCenter: isCenter,
aconf: aconf,
alertRules: make(map[string]*AlertRuleWorker),
@@ -108,7 +106,7 @@ func (s *Scheduler) syncAlertRules() {
alertRule := NewAlertRuleWorker(rule, dsId, processor, s.promClients, s.ctx)
alertRuleWorkers[alertRule.Hash()] = alertRule
}
} else if rule.IsHostRule() && s.isCenter {
} else if rule.IsHostRule() && s.ctx.IsCenter {
// all host rule will be processed by center instance
if !naming.DatasourceHashRing.IsHit(naming.HostDatasource, fmt.Sprintf("%d", rule.Id), s.aconf.Heartbeat.Endpoint) {
continue

View File

@@ -109,7 +109,7 @@ func (arw *AlertRuleWorker) Eval() {
}
func (arw *AlertRuleWorker) Stop() {
logger.Infof("%s stopped", arw.Key())
logger.Infof("rule_eval %s stopped", arw.Key())
close(arw.quit)
}

View File

@@ -9,6 +9,7 @@ import (
"github.com/ccfos/nightingale/v6/alert/aconf"
"github.com/ccfos/nightingale/v6/models"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/pkg/poster"
"github.com/toolkits/pkg/logger"
)
@@ -16,14 +17,12 @@ import (
type Naming struct {
ctx *ctx.Context
heartbeatConfig aconf.HeartbeatConfig
isCenter bool
}
func NewNaming(ctx *ctx.Context, heartbeat aconf.HeartbeatConfig, isCenter bool) *Naming {
func NewNaming(ctx *ctx.Context, heartbeat aconf.HeartbeatConfig) *Naming {
naming := &Naming{
ctx: ctx,
heartbeatConfig: heartbeat,
isCenter: isCenter,
}
naming.Heartbeats()
return naming
@@ -45,6 +44,10 @@ func (n *Naming) Heartbeats() error {
}
func (n *Naming) loopDeleteInactiveInstances() {
if !n.ctx.IsCenter {
return
}
interval := time.Duration(10) * time.Minute
for {
time.Sleep(interval)
@@ -112,7 +115,7 @@ func (n *Naming) heartbeat() error {
localss[datasourceIds[i]] = newss
}
if n.isCenter {
if n.ctx.IsCenter {
// 如果是中心节点,还需要处理 host 类型的告警规则host 类型告警规则,和数据源无关,想复用下数据源的 hash ring想用一个虚假的数据源 id 来处理
// if is center node, we need to handle host type alerting rules, host type alerting rules are not related to datasource, we want to reuse the hash ring of datasource, we want to use a fake datasource id to handle it
err := models.AlertingEngineHeartbeatWithCluster(n.ctx, n.heartbeatConfig.Endpoint, n.heartbeatConfig.EngineName, HostDatasource)
@@ -146,6 +149,11 @@ func (n *Naming) ActiveServers(datasourceId int64) ([]string, error) {
return nil, fmt.Errorf("cluster is empty")
}
if !n.ctx.IsCenter {
lst, err := poster.GetByUrls[[]string](n.ctx, "/v1/n9e/servers-active?dsid="+fmt.Sprintf("%d", datasourceId))
return lst, err
}
// 30秒内有心跳就认为是活的
return models.AlertingEngineGetsInstances(n.ctx, "datasource_id = ? and clock > ?", datasourceId, time.Now().Unix()-30)
}

View File

@@ -118,7 +118,7 @@ func (p *Processor) Handle(anomalyPoints []common.AnomalyPoint, from string, inh
logger.Errorf("rule not found %+v", anomalyPoints)
return
}
p.rule = cachedRule
now := time.Now().Unix()
alertingKeys := map[string]struct{}{}
@@ -337,7 +337,7 @@ func (p *Processor) pushEventToQueue(e *models.AlertCurEvent) {
func (p *Processor) RecoverAlertCurEventFromDb() {
p.pendings = NewAlertCurEventMap(nil)
curEvents, err := models.AlertCurEventGetByRuleIdAndCluster(p.ctx, p.rule.Id, p.datasourceId)
curEvents, err := models.AlertCurEventGetByRuleIdAndDsId(p.ctx, p.rule.Id, p.datasourceId)
if err != nil {
logger.Errorf("recover event from db for rule:%s failed, err:%s", p.Key(), err)
p.fires = NewAlertCurEventMap(nil)

View File

@@ -39,15 +39,16 @@ func New(httpConfig httpx.Config, alert aconf.Alert, amc *memsto.AlertMuteCacheT
}
func (rt *Router) Config(r *gin.Engine) {
if !rt.HTTP.Alert.Enable {
if !rt.HTTP.APIForService.Enable {
return
}
service := r.Group("/v1/n9e")
if len(rt.HTTP.Alert.BasicAuth) > 0 {
service.Use(gin.BasicAuth(rt.HTTP.Alert.BasicAuth))
if len(rt.HTTP.APIForService.BasicAuth) > 0 {
service.Use(gin.BasicAuth(rt.HTTP.APIForService.BasicAuth))
}
service.POST("/event", rt.pushEventToQueue)
service.POST("/event-persist", rt.eventPersist)
service.POST("/make-event", rt.makeEvent)
}

View File

@@ -83,6 +83,13 @@ func (rt *Router) pushEventToQueue(c *gin.Context) {
ginx.NewRender(c).Message(nil)
}
func (rt *Router) eventPersist(c *gin.Context) {
var event *models.AlertCurEvent
ginx.BindJSON(c, &event)
event.FE2DB()
ginx.NewRender(c).Message(models.EventPersist(rt.Ctx, event))
}
type eventForm struct {
Alert bool `json:"alert"`
AnomalyPoints []common.AnomalyPoint `json:"vectors"`

View File

@@ -15,7 +15,7 @@ import (
"github.com/toolkits/pkg/logger"
)
func SendCallbacks(ctx *ctx.Context, urls []string, event *models.AlertCurEvent, targetCache *memsto.TargetCacheType, ibexConf aconf.Ibex) {
func SendCallbacks(ctx *ctx.Context, urls []string, event *models.AlertCurEvent, targetCache *memsto.TargetCacheType, userCache *memsto.UserCacheType, ibexConf aconf.Ibex) {
for _, url := range urls {
if url == "" {
continue
@@ -23,7 +23,7 @@ func SendCallbacks(ctx *ctx.Context, urls []string, event *models.AlertCurEvent,
if strings.HasPrefix(url, "${ibex}") {
if !event.IsRecovered {
handleIbex(ctx, url, event, targetCache, ibexConf)
handleIbex(ctx, url, event, targetCache, userCache, ibexConf)
}
continue
}
@@ -34,9 +34,9 @@ func SendCallbacks(ctx *ctx.Context, urls []string, event *models.AlertCurEvent,
resp, code, err := poster.PostJSON(url, 5*time.Second, event, 3)
if err != nil {
logger.Errorf("event_callback(rule_id=%d url=%s) fail, resp: %s, err: %v, code: %d", event.RuleId, url, string(resp), err, code)
logger.Errorf("event_callback_fail(rule_id=%d url=%s), resp: %s, err: %v, code: %d", event.RuleId, url, string(resp), err, code)
} else {
logger.Infof("event_callback(rule_id=%d url=%s) succ, resp: %s, code: %d", event.RuleId, url, string(resp), code)
logger.Infof("event_callback_succ(rule_id=%d url=%s), resp: %s, code: %d", event.RuleId, url, string(resp), code)
}
}
}
@@ -60,7 +60,7 @@ type TaskCreateReply struct {
Dat int64 `json:"dat"` // task.id
}
func handleIbex(ctx *ctx.Context, url string, event *models.AlertCurEvent, targetCache *memsto.TargetCacheType, ibexConf aconf.Ibex) {
func handleIbex(ctx *ctx.Context, url string, event *models.AlertCurEvent, targetCache *memsto.TargetCacheType, userCache *memsto.UserCacheType, ibexConf aconf.Ibex) {
arr := strings.Split(url, "/")
var idstr string
@@ -103,7 +103,7 @@ func handleIbex(ctx *ctx.Context, url string, event *models.AlertCurEvent, targe
// check perm
// tpl.GroupId - host - account 三元组校验权限
can, err := canDoIbex(ctx, tpl.UpdateBy, tpl, host, targetCache)
can, err := canDoIbex(ctx, tpl.UpdateBy, tpl, host, targetCache, userCache)
if err != nil {
logger.Errorf("event_callback_ibex: check perm fail: %v", err)
return
@@ -176,12 +176,8 @@ func handleIbex(ctx *ctx.Context, url string, event *models.AlertCurEvent, targe
}
}
func canDoIbex(ctx *ctx.Context, username string, tpl *models.TaskTpl, host string, targetCache *memsto.TargetCacheType) (bool, error) {
user, err := models.UserGetByUsername(ctx, username)
if err != nil {
return false, err
}
func canDoIbex(ctx *ctx.Context, username string, tpl *models.TaskTpl, host string, targetCache *memsto.TargetCacheType, userCache *memsto.UserCacheType) (bool, error) {
user := userCache.GetByUsername(username)
if user != nil && user.IsAdmin() {
return true, nil
}

View File

@@ -35,7 +35,7 @@ func alertingCallScript(stdinBytes []byte, notifyScript models.NotifyScript) {
if file.IsExist(fpath) {
oldContent, err := file.ToString(fpath)
if err != nil {
logger.Errorf("event_notify: read script file err: %v", err)
logger.Errorf("event_script_notify_fail: read script file err: %v", err)
return
}
@@ -47,13 +47,13 @@ func alertingCallScript(stdinBytes []byte, notifyScript models.NotifyScript) {
if rewrite {
_, err := file.WriteString(fpath, config.Content)
if err != nil {
logger.Errorf("event_notify: write script file err: %v", err)
logger.Errorf("event_script_notify_fail: write script file err: %v", err)
return
}
err = os.Chmod(fpath, 0777)
if err != nil {
logger.Errorf("event_notify: chmod script file err: %v", err)
logger.Errorf("event_script_notify_fail: chmod script file err: %v", err)
return
}
}
@@ -70,7 +70,7 @@ func alertingCallScript(stdinBytes []byte, notifyScript models.NotifyScript) {
err := startCmd(cmd)
if err != nil {
logger.Errorf("event_notify: run cmd err: %v", err)
logger.Errorf("event_script_notify_fail: run cmd err: %v", err)
return
}
@@ -78,20 +78,20 @@ func alertingCallScript(stdinBytes []byte, notifyScript models.NotifyScript) {
if isTimeout {
if err == nil {
logger.Errorf("event_notify: timeout and killed process %s", fpath)
logger.Errorf("event_script_notify_fail: timeout and killed process %s", fpath)
}
if err != nil {
logger.Errorf("event_notify: kill process %s occur error %v", fpath, err)
logger.Errorf("event_script_notify_fail: kill process %s occur error %v", fpath, err)
}
return
}
if err != nil {
logger.Errorf("event_notify: exec script %s occur error: %v, output: %s", fpath, err, buf.String())
logger.Errorf("event_script_notify_fail: exec script %s occur error: %v, output: %s", fpath, err, buf.String())
return
}
logger.Infof("event_notify: exec %s output: %s", fpath, buf.String())
logger.Infof("event_script_notify_ok: exec %s output: %s", fpath, buf.String())
}

View File

@@ -53,7 +53,7 @@ func SendWebhooks(webhooks []*models.Webhook, event *models.AlertCurEvent) {
var resp *http.Response
resp, err = client.Do(req)
if err != nil {
logger.Warningf("WebhookCallError, ruleId: [%d], eventId: [%d], url: [%s], error: [%s]", event.RuleId, event.Id, conf.Url, err)
logger.Errorf("event_webhook_fail, ruleId: [%d], eventId: [%d], url: [%s], error: [%s]", event.RuleId, event.Id, conf.Url, err)
continue
}
@@ -63,6 +63,6 @@ func SendWebhooks(webhooks []*models.Webhook, event *models.AlertCurEvent) {
body, _ = ioutil.ReadAll(resp.Body)
}
logger.Debugf("alertingWebhook done, url: %s, response code: %d, body: %s", conf.Url, resp.StatusCode, string(body))
logger.Debugf("event_webhook_succ, url: %s, response code: %d, body: %s", conf.Url, resp.StatusCode, string(body))
}
}

View File

@@ -1,18 +1,12 @@
package cconf
import (
"github.com/gin-gonic/gin"
)
type Center struct {
Plugins []Plugin
BasicAuth gin.Accounts
MetricsYamlFile string
OpsYamlFile string
BuiltinIntegrationsDir string
I18NHeaderKey string
MetricDesc MetricDescType
TargetMetrics map[string]string
AnonymousAccess AnonymousAccess
}

View File

@@ -4,7 +4,6 @@ import (
"path"
"github.com/toolkits/pkg/file"
"github.com/toolkits/pkg/runner"
)
// metricDesc , As load map happens before read map, there is no necessary to use concurrent map for metric desc store
@@ -33,10 +32,10 @@ func GetMetricDesc(lang, metric string) string {
return MetricDesc.CommonDesc[metric]
}
func LoadMetricsYaml(metricsYamlFile string) error {
func LoadMetricsYaml(configDir, metricsYamlFile string) error {
fp := metricsYamlFile
if fp == "" {
fp = path.Join(runner.Cwd, "etc", "metrics.yaml")
fp = path.Join(configDir, "metrics.yaml")
}
if !file.IsExist(fp) {
return nil

View File

@@ -4,7 +4,6 @@ import (
"path"
"github.com/toolkits/pkg/file"
"github.com/toolkits/pkg/runner"
)
var Operations = Operation{}
@@ -19,10 +18,10 @@ type Ops struct {
Ops []string `yaml:"ops" json:"ops"`
}
func LoadOpsYaml(opsYamlFile string) error {
func LoadOpsYaml(configDir string, opsYamlFile string) error {
fp := opsYamlFile
if fp == "" {
fp = path.Join(runner.Cwd, "etc", "ops.yaml")
fp = path.Join(configDir, "ops.yaml")
}
if !file.IsExist(fp) {
return nil

View File

@@ -33,8 +33,8 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
return nil, fmt.Errorf("failed to init config: %v", err)
}
cconf.LoadMetricsYaml(config.Center.MetricsYamlFile)
cconf.LoadOpsYaml(config.Center.OpsYamlFile)
cconf.LoadMetricsYaml(configDir, config.Center.MetricsYamlFile)
cconf.LoadOpsYaml(configDir, config.Center.OpsYamlFile)
logxClean, err := logx.Init(config.Log)
if err != nil {
@@ -47,7 +47,7 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
if err != nil {
return nil, err
}
ctx := ctx.NewContext(context.Background(), db)
ctx := ctx.NewContext(context.Background(), db, true)
models.InitRoot(ctx)
redis, err := storage.NewRedis(config.Redis)
@@ -56,7 +56,7 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
}
metas := metas.New(redis)
idents := idents.New(db)
idents := idents.New(ctx)
syncStats := memsto.NewSyncStats()
alertStats := astats.NewSyncStats()
@@ -73,7 +73,7 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
promClients := prom.NewPromClient(ctx, config.Alert.Heartbeat)
externalProcessors := process.NewExternalProcessors()
alert.Start(config.Alert, config.Pushgw, syncStats, alertStats, externalProcessors, targetCache, busiGroupCache, alertMuteCache, alertRuleCache, notifyConfigCache, dsCache, ctx, promClients, true)
alert.Start(config.Alert, config.Pushgw, syncStats, alertStats, externalProcessors, targetCache, busiGroupCache, alertMuteCache, alertRuleCache, notifyConfigCache, dsCache, ctx, promClients)
writers := writer.NewWriters(config.Pushgw)

View File

@@ -3,8 +3,6 @@ package router
import (
"fmt"
"net/http"
"path"
"runtime"
"strings"
"time"
@@ -12,15 +10,17 @@ import (
"github.com/ccfos/nightingale/v6/center/cstats"
"github.com/ccfos/nightingale/v6/center/metas"
"github.com/ccfos/nightingale/v6/center/sso"
_ "github.com/ccfos/nightingale/v6/front/statik"
"github.com/ccfos/nightingale/v6/memsto"
"github.com/ccfos/nightingale/v6/pkg/aop"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/pkg/httpx"
"github.com/ccfos/nightingale/v6/prom"
"github.com/ccfos/nightingale/v6/storage"
"github.com/toolkits/pkg/runner"
"github.com/gin-gonic/gin"
"github.com/rakyll/statik/fs"
"github.com/toolkits/pkg/logger"
)
type Router struct {
@@ -89,38 +89,32 @@ func languageDetector(i18NHeaderKey string) gin.HandlerFunc {
}
}
func (rt *Router) configNoRoute(r *gin.Engine) {
func (rt *Router) configNoRoute(r *gin.Engine, fs *http.FileSystem) {
r.NoRoute(func(c *gin.Context) {
arr := strings.Split(c.Request.URL.Path, ".")
suffix := arr[len(arr)-1]
switch suffix {
case "png", "jpeg", "jpg", "svg", "ico", "gif", "css", "js", "html", "htm", "gz", "zip", "map":
cwdarr := []string{"/"}
if runtime.GOOS == "windows" {
cwdarr[0] = ""
}
cwdarr = append(cwdarr, strings.Split(runner.Cwd, "/")...)
cwdarr = append(cwdarr, "pub")
cwdarr = append(cwdarr, strings.Split(c.Request.URL.Path, "/")...)
c.File(path.Join(cwdarr...))
c.FileFromFS(c.Request.URL.Path, *fs)
default:
cwdarr := []string{"/"}
if runtime.GOOS == "windows" {
cwdarr[0] = ""
}
cwdarr = append(cwdarr, strings.Split(runner.Cwd, "/")...)
cwdarr = append(cwdarr, "pub")
cwdarr = append(cwdarr, "index.html")
c.File(path.Join(cwdarr...))
c.FileFromFS("/", *fs)
}
})
}
func (rt *Router) Config(r *gin.Engine) {
r.Use(stat())
r.Use(languageDetector(rt.Center.I18NHeaderKey))
r.Use(aop.Recovery())
statikFS, err := fs.New()
if err != nil {
logger.Errorf("cannot create statik fs: %v", err)
}
r.StaticFS("/pub", statikFS)
pagesPrefix := "/api/n9e"
pages := r.Group(pagesPrefix)
{
@@ -148,6 +142,7 @@ func (rt *Router) Config(r *gin.Engine) {
pages.GET("/auth/callback", rt.loginCallback)
pages.GET("/auth/callback/cas", rt.loginCallbackCas)
pages.GET("/auth/callback/oauth", rt.loginCallbackOAuth)
pages.GET("/auth/perms", rt.allPerms)
pages.GET("/metrics/desc", rt.metricsDescGetFile)
pages.POST("/metrics/desc", rt.metricsDescGetMap)
@@ -303,7 +298,7 @@ func (rt *Router) Config(r *gin.Engine) {
pages.GET("/role/:id/ops", rt.auth(), rt.admin(), rt.operationOfRole)
pages.PUT("/role/:id/ops", rt.auth(), rt.admin(), rt.roleBindOperation)
pages.GET("operation", rt.operations)
pages.GET("/operation", rt.operations)
pages.GET("/notify-tpls", rt.auth(), rt.admin(), rt.notifyTplGets)
pages.PUT("/notify-tpl/content", rt.auth(), rt.admin(), rt.notifyTplUpdateContent)
@@ -329,17 +324,20 @@ func (rt *Router) Config(r *gin.Engine) {
pages.PUT("/notify-config", rt.auth(), rt.admin(), rt.notifyConfigPut)
}
if rt.HTTP.Service.Enable {
if rt.HTTP.APIForService.Enable {
service := r.Group("/v1/n9e")
if len(rt.HTTP.Service.BasicAuth) > 0 {
service.Use(gin.BasicAuth(rt.HTTP.Service.BasicAuth))
if len(rt.HTTP.APIForService.BasicAuth) > 0 {
service.Use(gin.BasicAuth(rt.HTTP.APIForService.BasicAuth))
}
{
service.Any("/prometheus/*url", rt.dsProxy)
service.POST("/users", rt.userAddPost)
service.GET("/users", rt.userFindAll)
service.GET("/targets", rt.targetGets)
service.GET("/user-groups", rt.userGroupGetsByService)
service.GET("/user-group-members", rt.userGroupMemberGetsByService)
service.GET("/targets", rt.targetGetsByService)
service.GET("/targets/tags", rt.targetGetTags)
service.POST("/targets/tags", rt.targetBindTagsByService)
service.DELETE("/targets/tags", rt.targetUnbindTagsByService)
@@ -351,16 +349,29 @@ func (rt *Router) Config(r *gin.Engine) {
service.GET("/alert-rule/:arid", rt.alertRuleGet)
service.GET("/alert-rules", rt.alertRulesGetByService)
service.GET("/alert-subscribes", rt.alertSubscribeGetsByService)
service.GET("/busi-groups", rt.busiGroupGetsByService)
service.GET("/datasources", rt.datasourceGetsByService)
service.GET("/datasource-ids", rt.getDatasourceIds)
service.POST("/server-heartbeat", rt.serverHeartbeat)
service.GET("/servers-active", rt.serversActive)
service.GET("/recording-rules", rt.recordingRuleGetsByService)
service.GET("/alert-mutes", rt.alertMuteGets)
service.POST("/alert-mutes", rt.alertMuteAddByService)
service.DELETE("/alert-mutes", rt.alertMuteDel)
service.GET("/alert-cur-events", rt.alertCurEventsList)
service.GET("/alert-cur-events-get-by-rid", rt.alertCurEventsGetByRid)
service.GET("/alert-his-events", rt.alertHisEventsList)
service.GET("/alert-his-event/:eid", rt.alertHisEventGet)
service.GET("/config/:id", rt.configGet)
service.GET("/configs", rt.configsGet)
service.GET("/config", rt.configGetByKey)
service.PUT("/configs", rt.configsPut)
service.POST("/configs", rt.configsPost)
service.DELETE("/configs", rt.configsDel)
@@ -368,21 +379,26 @@ func (rt *Router) Config(r *gin.Engine) {
service.POST("/conf-prop/encrypt", rt.confPropEncrypt)
service.POST("/conf-prop/decrypt", rt.confPropDecrypt)
service.GET("/datasource-ids", rt.getDatasourceIds)
service.GET("/statistic", rt.statistic)
service.GET("/notify-tpls", rt.notifyTplGets)
service.POST("/task-record-add", rt.taskRecordAdd)
}
}
if rt.HTTP.Heartbeat.Enable {
if rt.HTTP.APIForAgent.Enable {
heartbeat := r.Group("/v1/n9e")
{
if len(rt.HTTP.Heartbeat.BasicAuth) > 0 {
heartbeat.Use(gin.BasicAuth(rt.HTTP.Heartbeat.BasicAuth))
if len(rt.HTTP.APIForAgent.BasicAuth) > 0 {
heartbeat.Use(gin.BasicAuth(rt.HTTP.APIForAgent.BasicAuth))
}
heartbeat.POST("/heartbeat", rt.heartbeat)
}
}
rt.configNoRoute(r)
rt.configNoRoute(r, &statikFS)
}
func Render(c *gin.Context, data, msg interface{}) {

View File

@@ -128,6 +128,13 @@ func (rt *Router) alertCurEventsCardDetails(c *gin.Context) {
ginx.NewRender(c).Data(list, err)
}
// alertCurEventsGetByRid
func (rt *Router) alertCurEventsGetByRid(c *gin.Context) {
rid := ginx.QueryInt64(c, "rid")
dsId := ginx.QueryInt64(c, "dsid")
ginx.NewRender(c).Data(models.AlertCurEventGetByRuleIdAndDsId(rt.Ctx, rid, dsId))
}
// 列表方式,拉取活跃告警
func (rt *Router) alertCurEventsList(c *gin.Context) {
stime, etime := getTimeRange(c)

View File

@@ -27,7 +27,12 @@ func (rt *Router) alertRuleGets(c *gin.Context) {
}
func (rt *Router) alertRulesGetByService(c *gin.Context) {
prods := strings.Split(ginx.QueryStr(c, "prods", ""), ",")
prods := []string{}
prodStr := ginx.QueryStr(c, "prods", "")
if prodStr != "" {
prods = strings.Split(ginx.QueryStr(c, "prods", ""), ",")
}
query := ginx.QueryStr(c, "query", "")
algorithm := ginx.QueryStr(c, "algorithm", "")
cluster := ginx.QueryStr(c, "cluster", "")

View File

@@ -110,3 +110,8 @@ func (rt *Router) alertSubscribeDel(c *gin.Context) {
ginx.NewRender(c).Message(models.AlertSubscribeDel(rt.Ctx, f.Ids))
}
func (rt *Router) alertSubscribeGetsByService(c *gin.Context) {
lst, err := models.AlertSubscribeGetsByService(rt.Ctx)
ginx.NewRender(c).Data(lst, err)
}

View File

@@ -123,6 +123,11 @@ func (rt *Router) busiGroupGets(c *gin.Context) {
ginx.NewRender(c).Data(lst, err)
}
func (rt *Router) busiGroupGetsByService(c *gin.Context) {
lst, err := models.BusiGroupGetAll(rt.Ctx)
ginx.NewRender(c).Data(lst, err)
}
// 这个接口只有在活跃告警页面才调用获取各个BG的活跃告警数量
func (rt *Router) busiGroupAlertingsGets(c *gin.Context) {
ids := ginx.QueryStr(c, "ids", "")

View File

@@ -20,6 +20,11 @@ func (rt *Router) configGet(c *gin.Context) {
ginx.NewRender(c).Data(configs, err)
}
func (rt *Router) configGetByKey(c *gin.Context) {
config, err := models.ConfigsGet(rt.Ctx, ginx.QueryStr(c, "key"))
ginx.NewRender(c).Data(config, err)
}
func (rt *Router) configsDel(c *gin.Context) {
var f idsForm
ginx.BindJSON(c, &f)

View File

@@ -36,6 +36,12 @@ func (rt *Router) datasourceList(c *gin.Context) {
Render(c, list, err)
}
func (rt *Router) datasourceGetsByService(c *gin.Context) {
typ := ginx.QueryStr(c, "typ", "")
lst, err := models.GetDatasourcesGetsBy(rt.Ctx, typ, "", "", "")
ginx.NewRender(c).Data(lst, err)
}
type datasourceBrief struct {
Id int64 `json:"id"`
Name string `json:"name"`

View File

@@ -17,6 +17,41 @@ import (
const defaultLimit = 300
func (rt *Router) statistic(c *gin.Context) {
name := ginx.QueryStr(c, "name")
var model interface{}
var err error
var statistics *models.Statistics
switch name {
case "alert_mute":
model = models.AlertMute{}
case "alert_rule":
model = models.AlertRule{}
case "alert_subscribe":
model = models.AlertSubscribe{}
case "busi_group":
model = models.BusiGroup{}
case "recording_rule":
model = models.RecordingRule{}
case "target":
model = models.Target{}
case "user":
model = models.User{}
case "user_group":
model = models.UserGroup{}
case "datasource":
// datasource update_at is different from others
statistics, err = models.DatasourceStatistics(rt.Ctx)
ginx.NewRender(c).Data(statistics, err)
return
default:
ginx.Bomb(http.StatusBadRequest, "invalid name")
}
statistics, err = models.StatisticsGet(rt.Ctx, model)
ginx.NewRender(c).Data(statistics, err)
}
func queryDatasourceIds(c *gin.Context) []int64 {
datasourceIds := ginx.QueryStr(c, "datasource_ids", "")
datasourceIds = strings.ReplaceAll(datasourceIds, ",", " ")

View File

@@ -21,7 +21,7 @@ func (rt *Router) alertMuteGetsByBG(c *gin.Context) {
func (rt *Router) alertMuteGets(c *gin.Context) {
prods := strings.Fields(ginx.QueryStr(c, "prods", ""))
bgid := ginx.QueryInt64(c, "bgid", 0)
bgid := ginx.QueryInt64(c, "bgid", -1)
query := ginx.QueryStr(c, "query", "")
lst, err := models.AlertMuteGets(rt.Ctx, prods, bgid, query)

View File

@@ -30,9 +30,12 @@ func (rt *Router) webhookPuts(c *gin.Context) {
var webhooks []models.Webhook
ginx.BindJSON(c, &webhooks)
for i := 0; i < len(webhooks); i++ {
for k, v := range webhooks[i].HeaderMap {
webhooks[i].Headers = append(webhooks[i].Headers, k)
webhooks[i].Headers = append(webhooks[i].Headers, v)
webhooks[i].Headers = []string{}
if len(webhooks[i].HeaderMap) > 0 {
for k, v := range webhooks[i].HeaderMap {
webhooks[i].Headers = append(webhooks[i].Headers, k)
webhooks[i].Headers = append(webhooks[i].Headers, v)
}
}
}

View File

@@ -19,6 +19,11 @@ func (rt *Router) recordingRuleGets(c *gin.Context) {
ginx.NewRender(c).Data(ars, err)
}
// recordingRuleGetsByService is the service-to-service API that returns
// every enabled recording rule.
func (rt *Router) recordingRuleGetsByService(c *gin.Context) {
	rules, err := models.RecordingRuleEnabledGets(rt.Ctx)
	ginx.NewRender(c).Data(rules, err)
}
func (rt *Router) recordingRuleGet(c *gin.Context) {
rrid := ginx.UrlParamInt64(c, "rrid")

View File

@@ -83,3 +83,18 @@ func (rt *Router) roleGets(c *gin.Context) {
lst, err := models.RoleGetsAll(rt.Ctx)
ginx.NewRender(c).Data(lst, err)
}
// allPerms returns a map of role name -> list of operations (permissions)
// for every role known to the system.
func (rt *Router) allPerms(c *gin.Context) {
	roles, err := models.RoleGetsAll(rt.Ctx)
	ginx.Dangerous(err)

	m := make(map[string][]string, len(roles))
	for _, r := range roles {
		ops, err := models.OperationsOfRole(rt.Ctx, strings.Fields(r.Name))
		if err != nil {
			// Best-effort: skip roles whose operations cannot be loaded.
			continue
		}
		m[r.Name] = ops
	}

	// The outer err is guaranteed nil here (Dangerous aborts on error).
	// Pass nil explicitly instead of reusing the shadow-prone err variable.
	ginx.NewRender(c).Data(m, nil)
}

View File

@@ -1,6 +1,8 @@
package router
import (
"time"
"github.com/ccfos/nightingale/v6/models"
"github.com/gin-gonic/gin"
@@ -16,3 +18,17 @@ func (rt *Router) serverClustersGet(c *gin.Context) {
list, err := models.AlertingEngineGetsClusters(rt.Ctx, "")
ginx.NewRender(c).Data(list, err)
}
// serverHeartbeat records a heartbeat reported by a remote alerting engine,
// binding instance, engine cluster and datasource id from the JSON body.
func (rt *Router) serverHeartbeat(c *gin.Context) {
	var info models.HeartbeatInfo
	ginx.BindJSON(c, &info)

	err := models.AlertingEngineHeartbeatWithCluster(rt.Ctx, info.Instance, info.EngineCluster, info.DatasourceId)
	ginx.NewRender(c).Message(err)
}
// serversActive lists alerting-engine instances for the given datasource
// ("dsid" query parameter) whose heartbeat clock is within the last 30s.
func (rt *Router) serversActive(c *gin.Context) {
	dsid := ginx.QueryInt64(c, "dsid")
	cutoff := time.Now().Unix() - 30
	instances, err := models.AlertingEngineGetsInstances(rt.Ctx, "datasource_id = ? and clock > ?", dsid, cutoff)
	ginx.NewRender(c).Data(instances, err)
}

View File

@@ -101,6 +101,11 @@ func (rt *Router) targetGets(c *gin.Context) {
}, nil)
}
// targetGetsByService is the service-to-service API that returns every
// monitored target.
func (rt *Router) targetGetsByService(c *gin.Context) {
	targets, err := models.TargetGetsAll(rt.Ctx)
	ginx.NewRender(c).Data(targets, err)
}
func (rt *Router) targetGetTags(c *gin.Context) {
idents := ginx.QueryStr(c, "idents", "")
idents = strings.ReplaceAll(idents, ",", " ")

View File

@@ -120,6 +120,12 @@ func (f *taskForm) HandleFH(fh string) {
f.Title = f.Title + " FH: " + fh
}
// taskRecordAdd persists a task execution record posted in the request body.
func (rt *Router) taskRecordAdd(c *gin.Context) {
	var record *models.TaskRecord
	ginx.BindJSON(c, &record)
	ginx.NewRender(c).Message(record.Add(rt.Ctx))
}
func (rt *Router) taskAdd(c *gin.Context) {
var f taskForm
ginx.BindJSON(c, &f)

View File

@@ -12,19 +12,8 @@ import (
)
func (rt *Router) userFindAll(c *gin.Context) {
limit := ginx.QueryInt(c, "limit", 20)
query := ginx.QueryStr(c, "query", "")
total, err := models.UserTotal(rt.Ctx, query)
ginx.Dangerous(err)
list, err := models.UserGets(rt.Ctx, query, limit, ginx.Offset(c, limit))
ginx.Dangerous(err)
ginx.NewRender(c).Data(gin.H{
"list": list,
"total": total,
}, nil)
list, err := models.UserGetAll(rt.Ctx)
ginx.NewRender(c).Data(list, err)
}
func (rt *Router) userGets(c *gin.Context) {

View File

@@ -29,6 +29,17 @@ func (rt *Router) userGroupGets(c *gin.Context) {
ginx.NewRender(c).Data(lst, err)
}
// userGroupGetsByService is the service-to-service API that returns every
// user group.
func (rt *Router) userGroupGetsByService(c *gin.Context) {
	groups, err := models.UserGroupGetAll(rt.Ctx)
	ginx.NewRender(c).Data(groups, err)
}
// userGroupMemberGetsByService is the service-to-service API that returns
// every user-group membership record.
func (rt *Router) userGroupMemberGetsByService(c *gin.Context) {
	records, err := models.UserGroupMemberGetAll(rt.Ctx)
	ginx.NewRender(c).Data(records, err)
}
type userGroupForm struct {
Name string `json:"name" binding:"required"`
Note string `json:"note"`

View File

@@ -18,7 +18,7 @@ func Upgrade(configFile string) error {
return err
}
ctx := ctx.NewContext(context.Background(), db)
ctx := ctx.NewContext(context.Background(), db, false)
for _, cluster := range config.Clusters {
count, err := models.GetDatasourcesCountBy(ctx, "", "", cluster.Name)
if err != nil {

View File

@@ -14,20 +14,28 @@ import (
"github.com/ccfos/nightingale/v6/pkg/ormx"
"github.com/ccfos/nightingale/v6/pushgw/pconf"
"github.com/ccfos/nightingale/v6/storage"
"github.com/gin-gonic/gin"
)
type ConfigType struct {
Global GlobalConfig
Log logx.Config
HTTP httpx.Config
DB ormx.DBConfig
Redis storage.RedisConfig
Global GlobalConfig
Log logx.Config
HTTP httpx.Config
DB ormx.DBConfig
Redis storage.RedisConfig
CenterApi CenterApi
Pushgw pconf.Pushgw
Alert aconf.Alert
Center cconf.Center
}
type CenterApi struct {
Addrs []string
BasicAuth gin.Accounts
}
type GlobalConfig struct {
RunMode string
}

View File

@@ -14,39 +14,22 @@ func decryptConfig(config *ConfigType, cryptoKey string) error {
config.DB.DSN = decryptDsn
for k := range config.HTTP.Alert.BasicAuth {
decryptPwd, err := secu.DealWithDecrypt(config.HTTP.Alert.BasicAuth[k], cryptoKey)
for k := range config.HTTP.APIForService.BasicAuth {
decryptPwd, err := secu.DealWithDecrypt(config.HTTP.APIForService.BasicAuth[k], cryptoKey)
if err != nil {
return fmt.Errorf("failed to decrypt http basic auth password: %s", err)
}
config.HTTP.Alert.BasicAuth[k] = decryptPwd
config.HTTP.APIForService.BasicAuth[k] = decryptPwd
}
for k := range config.HTTP.Pushgw.BasicAuth {
decryptPwd, err := secu.DealWithDecrypt(config.HTTP.Pushgw.BasicAuth[k], cryptoKey)
for k := range config.HTTP.APIForAgent.BasicAuth {
decryptPwd, err := secu.DealWithDecrypt(config.HTTP.APIForAgent.BasicAuth[k], cryptoKey)
if err != nil {
return fmt.Errorf("failed to decrypt http basic auth password: %s", err)
}
config.HTTP.Pushgw.BasicAuth[k] = decryptPwd
}
for k := range config.HTTP.Heartbeat.BasicAuth {
decryptPwd, err := secu.DealWithDecrypt(config.HTTP.Heartbeat.BasicAuth[k], cryptoKey)
if err != nil {
return fmt.Errorf("failed to decrypt http basic auth password: %s", err)
}
config.HTTP.Heartbeat.BasicAuth[k] = decryptPwd
}
for k := range config.HTTP.Service.BasicAuth {
decryptPwd, err := secu.DealWithDecrypt(config.HTTP.Service.BasicAuth[k], cryptoKey)
if err != nil {
return fmt.Errorf("failed to decrypt http basic auth password: %s", err)
}
config.HTTP.Service.BasicAuth[k] = decryptPwd
config.HTTP.APIForAgent.BasicAuth[k] = decryptPwd
}
for i, v := range config.Pushgw.Writers {

View File

@@ -4,8 +4,7 @@ FROM python:3-slim
WORKDIR /app
ADD n9e /app
ADD http://download.flashcat.cloud/wait /wait
RUN mkdir -p /app/pub && chmod +x /wait
ADD pub /app/pub/
RUN chmod +x /wait
RUN chmod +x n9e
EXPOSE 17000

View File

@@ -7,7 +7,6 @@ ADD etc /app/
ADD integrations /app/integrations/
ADD --chmod=755 https://github.com/ufoscout/docker-compose-wait/releases/download/2.11.0/wait_x86_64 /wait
RUN chmod +x /wait
ADD pub /app/pub/
EXPOSE 17000

View File

@@ -6,7 +6,6 @@ WORKDIR /app
ADD n9e /app/
ADD etc /app/
ADD integrations /app/integrations/
ADD pub /app/pub/
COPY --chmod=755 --from=toolbox /toolbox/wait_aarch64 /wait
EXPOSE 17000

View File

@@ -10,7 +10,6 @@ echo "tag: ${tag}"
rm -rf n9e pub
cp ../n9e .
cp -r ../pub .
docker build -t nightingale:${tag} .

60
etc/alert.toml.example Normal file
View File

@@ -0,0 +1,60 @@
[Global]
RunMode = "release"
[CenterApi]
Addrs = ["http://127.0.0.1:17000"]
[CenterApi.BasicAuth]
user001 = "ccc26da7b9aba533cbb263a36c07dcc5"
[Alert]
[Alert.Heartbeat]
# auto detect if blank
IP = ""
# unit ms
Interval = 1000
EngineName = "default02"
[Log]
# log write dir
Dir = "logs"
# log level: DEBUG INFO WARNING ERROR
Level = "DEBUG"
# stdout, stderr, file
Output = "stdout"
# # rotate by time
# KeepHours = 4
# # rotate by size
# RotateNum = 3
# # unit: MB
# RotateSize = 256
[HTTP]
# http listening address
Host = "0.0.0.0"
# http listening port
Port = 17001
# https cert file path
CertFile = ""
# https key file path
KeyFile = ""
# whether print access log
PrintAccessLog = false
# whether enable pprof
PProf = false
# expose prometheus /metrics?
ExposeMetrics = true
# http graceful shutdown timeout, unit: s
ShutdownTimeout = 30
# max content length: 64M
MaxContentLength = 67108864
# http server read timeout, unit: s
ReadTimeout = 20
# http server write timeout, unit: s
WriteTimeout = 40
# http server idle timeout, unit: s
IdleTimeout = 120
[HTTP.Alert]
Enable = true
[HTTP.Alert.BasicAuth]
user001 = "ccc26da7b9aba533cbb263a36c07dcc5"

View File

@@ -41,22 +41,12 @@ WriteTimeout = 40
# http server idle timeout, unit: s
IdleTimeout = 120
[HTTP.Pushgw]
[HTTP.APIForAgent]
Enable = true
# [HTTP.Pushgw.BasicAuth]
# user001 = "ccc26da7b9aba533cbb263a36c07dcc5"
[HTTP.Alert]
Enable = true
[HTTP.Alert.BasicAuth]
user001 = "ccc26da7b9aba533cbb263a36c07dcc5"
[HTTP.Heartbeat]
Enable = true
# [HTTP.Heartbeat.BasicAuth]
# user001 = "ccc26da7b9aba533cbb263a36c07dcc5"
[HTTP.Service]
[HTTP.APIForService]
Enable = true
[HTTP.Service.BasicAuth]
user001 = "ccc26da7b9aba533cbb263a36c07dcc5"

103
etc/pushgw.toml.example Normal file
View File

@@ -0,0 +1,103 @@
[Global]
RunMode = "release"
[CenterApi]
Addrs = ["http://127.0.0.1:17000"]
[CenterApi.BasicAuth]
user001 = "ccc26da7b9aba533cbb263a36c07dcc5"
[Pushgw]
# use target labels in database instead of in series
LabelRewrite = true
# # default busigroup key name
# BusiGroupLabelKey = "busigroup"
# ForceUseServerTS = false
# [Pushgw.DebugSample]
# ident = "xx"
# __name__ = "xx"
# [Pushgw.WriterOpt]
# # Writer Options
# QueueCount = 1000
# QueueMaxSize = 1000000
# QueuePopSize = 1000
# # ident or metric
# ShardingKey = "ident"
[[Pushgw.Writers]]
# Url = "http://127.0.0.1:8480/insert/0/prometheus/api/v1/write"
Url = "http://127.0.0.1:9090/api/v1/write"
# Basic auth username
BasicAuthUser = ""
# Basic auth password
BasicAuthPass = ""
# timeout settings, unit: ms
Headers = ["X-From", "n9e"]
Timeout = 10000
DialTimeout = 3000
TLSHandshakeTimeout = 30000
ExpectContinueTimeout = 1000
IdleConnTimeout = 90000
# time duration, unit: ms
KeepAlive = 30000
MaxConnsPerHost = 0
MaxIdleConns = 100
MaxIdleConnsPerHost = 100
## Optional TLS Config
# UseTLS = false
# TLSCA = "/etc/n9e/ca.pem"
# TLSCert = "/etc/n9e/cert.pem"
# TLSKey = "/etc/n9e/key.pem"
# InsecureSkipVerify = false
# [[Writers.WriteRelabels]]
# Action = "replace"
# SourceLabels = ["__address__"]
# Regex = "([^:]+)(?::\\d+)?"
# Replacement = "$1:80"
# TargetLabel = "__address__"
[Log]
# log write dir
Dir = "logs"
# log level: DEBUG INFO WARNING ERROR
Level = "DEBUG"
# stdout, stderr, file
Output = "stdout"
# # rotate by time
# KeepHours = 4
# # rotate by size
# RotateNum = 3
# # unit: MB
# RotateSize = 256
[HTTP]
# http listening address
Host = "0.0.0.0"
# http listening port
Port = 17000
# https cert file path
CertFile = ""
# https key file path
KeyFile = ""
# whether print access log
PrintAccessLog = false
# whether enable pprof
PProf = false
# expose prometheus /metrics?
ExposeMetrics = true
# http graceful shutdown timeout, unit: s
ShutdownTimeout = 30
# max content length: 64M
MaxContentLength = 67108864
# http server read timeout, unit: s
ReadTimeout = 20
# http server write timeout, unit: s
WriteTimeout = 40
# http server idle timeout, unit: s
IdleTimeout = 120
[HTTP.Pushgw]
Enable = true
# [HTTP.Pushgw.BasicAuth]
# user001 = "ccc26da7b9aba533cbb263a36c07dcc5"

7
fe.sh
View File

@@ -8,3 +8,10 @@ curl -o n9e-fe-${VERSION}.tar.gz -L https://github.com/n9e/fe/releases/download/
tar zxvf n9e-fe-${VERSION}.tar.gz
cp ./docker/initsql/a-n9e.sql n9e.sql
# Embed files into a Go executable
statik -src=./pub -dest=./front
# rm the fe file
rm n9e-fe-${VERSION}.tar.gz
rm -r ./pub

14
front/statik/statik.go Normal file
View File

@@ -0,0 +1,14 @@
// Code generated by statik. DO NOT EDIT.
package statik
import (
"github.com/rakyll/statik/fs"
)
func init() {
data := "PK\x05\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
fs.Register(data)
}

1
go.mod
View File

@@ -23,6 +23,7 @@ require (
github.com/prometheus/client_golang v1.14.0
github.com/prometheus/common v0.39.0
github.com/prometheus/prometheus v2.5.0+incompatible
github.com/rakyll/statik v0.1.7
github.com/redis/go-redis/v9 v9.0.2
github.com/tidwall/gjson v1.14.0
github.com/toolkits/pkg v1.3.3

2
go.sum
View File

@@ -232,6 +232,8 @@ github.com/prometheus/procfs v0.8.0 h1:ODq8ZFEaYeCaZOJlZZdJA2AbQR98dSHSM1KW/You5
github.com/prometheus/procfs v0.8.0/go.mod h1:z7EfXMXOkbkqb9IINtpCn86r/to3BnA0uaxHdg830/4=
github.com/prometheus/prometheus v2.5.0+incompatible h1:7QPitgO2kOFG8ecuRn9O/4L9+10He72rVRJvMXrE9Hg=
github.com/prometheus/prometheus v2.5.0+incompatible/go.mod h1:oAIUtOny2rjMX0OWN5vPR5/q/twIROJvdqnQKDdil/s=
github.com/rakyll/statik v0.1.7 h1:OF3QCZUuyPxuGEP7B4ypUa7sB/iHtqOTDYZXGM8KOdQ=
github.com/rakyll/statik v0.1.7/go.mod h1:AlZONWzMtEnMs7W4e/1LURLiI49pIMmp6V9Unghqrcc=
github.com/redis/go-redis/v9 v9.0.2 h1:BA426Zqe/7r56kCcvxYLWe1mkaz71LKF77GwgFzSxfE=
github.com/redis/go-redis/v9 v9.0.2/go.mod h1:/xDTe9EF1LM61hek62Poq2nzQSGj0xSrEtEHbBQevps=
github.com/robfig/go-cache v0.0.0-20130306151617-9fc39e0dbf62/go.mod h1:65XQgovT59RWatovFwnwocoUxiI/eENTnOY5GK3STuY=

View File

@@ -144,14 +144,17 @@ func (tc *TargetCacheType) syncTargets() error {
return errors.WithMessage(err, "failed to call TargetGetsAll")
}
metaMap := tc.GetHostMetas(lst)
m := make(map[string]*models.Target)
for i := 0; i < len(lst); i++ {
lst[i].FillTagsMap()
if meta, ok := metaMap[lst[i].Ident]; ok {
lst[i].FillMeta(meta)
if tc.ctx.IsCenter {
metaMap := tc.GetHostMetas(lst)
for i := 0; i < len(lst); i++ {
if meta, ok := metaMap[lst[i].Ident]; ok {
lst[i].FillMeta(meta)
}
}
}
for i := 0; i < len(lst); i++ {
m[lst[i].Ident] = lst[i]
}

View File

@@ -58,6 +58,17 @@ func (uc *UserCacheType) GetByUserId(id int64) *models.User {
return uc.users[id]
}
// GetByUsername scans the id-keyed user cache for a user with the given
// username, returning nil when no match exists. The read lock is held for
// the duration of the scan.
func (uc *UserCacheType) GetByUsername(name string) *models.User {
	uc.RLock()
	defer uc.RUnlock()

	for _, u := range uc.users {
		if u.Username == name {
			return u
		}
	}
	return nil
}
func (uc *UserCacheType) GetByUserIds(ids []int64) []*models.User {
set := make(map[int64]struct{})

View File

@@ -10,6 +10,7 @@ import (
"time"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/pkg/poster"
"github.com/ccfos/nightingale/v6/pkg/tplx"
"github.com/toolkits/pkg/logger"
)
@@ -220,7 +221,7 @@ func (e *AlertCurEvent) ToHis(ctx *ctx.Context) *AlertHisEvent {
}
}
func (e *AlertCurEvent) DB2FE(ctx *ctx.Context) {
func (e *AlertCurEvent) DB2FE() {
e.NotifyChannelsJSON = strings.Fields(e.NotifyChannels)
e.NotifyGroupsJSON = strings.Fields(e.NotifyGroups)
e.CallbacksJSON = strings.Fields(e.Callbacks)
@@ -229,6 +230,18 @@ func (e *AlertCurEvent) DB2FE(ctx *ctx.Context) {
json.Unmarshal([]byte(e.RuleConfig), &e.RuleConfigJson)
}
func (e *AlertCurEvent) FE2DB() {
e.NotifyChannels = strings.Join(e.NotifyChannelsJSON, " ")
e.NotifyGroups = strings.Join(e.NotifyGroupsJSON, " ")
e.Callbacks = strings.Join(e.CallbacksJSON, " ")
e.Tags = strings.Join(e.TagsJSON, ",,")
b, _ := json.Marshal(e.AnnotationsJSON)
e.Annotations = string(b)
b, _ = json.Marshal(e.RuleConfigJson)
e.RuleConfig = string(b)
}
func (e *AlertCurEvent) DB2Mem() {
e.IsRecovered = false
e.NotifyGroupsJSON = strings.Fields(e.NotifyGroups)
@@ -356,7 +369,7 @@ func AlertCurEventGets(ctx *ctx.Context, prods []string, bgid, stime, etime int6
if err == nil {
for i := 0; i < len(lst); i++ {
lst[i].DB2FE(ctx)
lst[i].DB2FE()
}
}
@@ -390,7 +403,7 @@ func AlertCurEventGet(ctx *ctx.Context, where string, args ...interface{}) (*Ale
return nil, nil
}
lst[0].DB2FE(ctx)
lst[0].DB2FE()
lst[0].FillNotifyGroups(ctx, make(map[int64]*UserGroup))
return lst[0], nil
@@ -435,14 +448,19 @@ func AlertCurEventGetByIds(ctx *ctx.Context, ids []int64) ([]*AlertCurEvent, err
err := DB(ctx).Where("id in ?", ids).Order("id desc").Find(&lst).Error
if err == nil {
for i := 0; i < len(lst); i++ {
lst[i].DB2FE(ctx)
lst[i].DB2FE()
}
}
return lst, err
}
func AlertCurEventGetByRuleIdAndCluster(ctx *ctx.Context, ruleId int64, datasourceId int64) ([]*AlertCurEvent, error) {
func AlertCurEventGetByRuleIdAndDsId(ctx *ctx.Context, ruleId int64, datasourceId int64) ([]*AlertCurEvent, error) {
if !ctx.IsCenter {
lst, err := poster.GetByUrls[[]*AlertCurEvent](ctx, "/v1/n9e/alert-cur-events-get-by-rid?rid="+strconv.FormatInt(ruleId, 10)+"&dsid="+strconv.FormatInt(datasourceId, 10))
return lst, err
}
var lst []*AlertCurEvent
err := DB(ctx).Where("rule_id=? and datasource_id = ?", ruleId, datasourceId).Find(&lst).Error
return lst, err

View File

@@ -2,6 +2,7 @@ package models
import (
"encoding/json"
"fmt"
"strconv"
"strings"
"time"
@@ -61,7 +62,7 @@ func (e *AlertHisEvent) Add(ctx *ctx.Context) error {
return Insert(ctx, e)
}
func (e *AlertHisEvent) DB2FE(ctx *ctx.Context) {
func (e *AlertHisEvent) DB2FE() {
e.NotifyChannelsJSON = strings.Fields(e.NotifyChannels)
e.NotifyGroupsJSON = strings.Fields(e.NotifyGroups)
e.CallbacksJSON = strings.Fields(e.Callbacks)
@@ -182,7 +183,7 @@ func AlertHisEventGets(ctx *ctx.Context, prods []string, bgid, stime, etime int6
if err == nil {
for i := 0; i < len(lst); i++ {
lst[i].DB2FE(ctx)
lst[i].DB2FE()
}
}
@@ -200,7 +201,7 @@ func AlertHisEventGet(ctx *ctx.Context, where string, args ...interface{}) (*Ale
return nil, nil
}
lst[0].DB2FE(ctx)
lst[0].DB2FE()
lst[0].FillNotifyGroups(ctx, make(map[int64]*UserGroup))
return lst[0], nil
@@ -259,3 +260,53 @@ func AlertHisEventUpgradeToV6(ctx *ctx.Context, dsm map[string]Datasource) error
}
return nil
}
// EventPersist writes an alert event to the history table and keeps the
// active-alerts (alert_cur_event) table in sync: firing events are
// (re)inserted with the history row's id, recovered events are removed.
func EventPersist(ctx *ctx.Context, event *AlertCurEvent) error {
	has, err := AlertCurEventExists(ctx, "hash=?", event.Hash)
	if err != nil {
		return fmt.Errorf("event_persist_check_exists_fail: %v rule_id=%d hash=%s", err, event.RuleId, event.Hash)
	}

	his := event.ToHis(ctx)

	// Whether this is an alert or a recovery, it is always recorded in the
	// full (history) event table.
	if err := his.Add(ctx); err != nil {
		return fmt.Errorf("add his event error:%v", err)
	}

	if has {
		// The active-alerts table already has a record for this hash; delete it.
		err = AlertCurEventDelByHash(ctx, event.Hash)
		if err != nil {
			return fmt.Errorf("event_del_cur_fail: %v hash=%s", err, event.Hash)
		}

		if !event.IsRecovered {
			// A recovery removes the event from the active list entirely; a
			// still-firing alert must be re-inserted as a fresh event.
			// use his id as cur id
			event.Id = his.Id
			if event.Id > 0 {
				if err := event.Add(ctx); err != nil {
					return fmt.Errorf("add cur event err:%v", err)
				}
			}
		}

		return nil
	}

	if event.IsRecovered {
		// No row in alert_cur_event means there was no prior alert, yet a
		// recovery arrived — odd, and in theory should not happen.
		return nil
	}

	// use his id as cur id
	event.Id = his.Id
	if event.Id > 0 {
		if err := event.Add(ctx); err != nil {
			return fmt.Errorf("add cur event error:%v", err)
		}
	}

	return nil
}

View File

@@ -9,6 +9,7 @@ import (
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/pkg/ormx"
"github.com/ccfos/nightingale/v6/pkg/poster"
"github.com/toolkits/pkg/logger"
"github.com/pkg/errors"
@@ -78,7 +79,15 @@ func AlertMuteGet(ctx *ctx.Context, where string, args ...interface{}) (*AlertMu
}
func AlertMuteGets(ctx *ctx.Context, prods []string, bgid int64, query string) (lst []AlertMute, err error) {
session := DB(ctx).Where("group_id = ? and prod in (?)", bgid, prods)
session := DB(ctx)
if bgid != -1 {
session = session.Where("group_id = ?", bgid)
}
if len(prods) > 0 {
session = session.Where("prod in (?)", prods)
}
if query != "" {
arr := strings.Fields(query)
@@ -220,6 +229,12 @@ func AlertMuteDel(ctx *ctx.Context, ids []int64) error {
}
func AlertMuteStatistics(ctx *ctx.Context) (*Statistics, error) {
var stats []*Statistics
if !ctx.IsCenter {
s, err := poster.GetByUrls[*Statistics](ctx, "/v1/n9e/statistic?name=alert_mute")
return s, err
}
// clean expired first
buf := int64(30)
err := DB(ctx).Where("etime < ? and mute_time_type = 0", time.Now().Unix()-buf).Delete(new(AlertMute)).Error
@@ -229,7 +244,6 @@ func AlertMuteStatistics(ctx *ctx.Context) (*Statistics, error) {
session := DB(ctx).Model(&AlertMute{}).Select("count(*) as total", "max(update_at) as last_updated")
var stats []*Statistics
err = session.Find(&stats).Error
if err != nil {
return nil, err
@@ -240,9 +254,20 @@ func AlertMuteStatistics(ctx *ctx.Context) (*Statistics, error) {
func AlertMuteGetsAll(ctx *ctx.Context) ([]*AlertMute, error) {
// get my cluster's mutes
var lst []*AlertMute
if !ctx.IsCenter {
lst, err := poster.GetByUrls[[]*AlertMute](ctx, "/v1/n9e/alert-mutes")
if err != nil {
return nil, err
}
for i := 0; i < len(lst); i++ {
lst[i].FE2DB()
}
return lst, err
}
session := DB(ctx).Model(&AlertMute{})
var lst []*AlertMute
err := session.Find(&lst).Error
if err != nil {
return nil, err

View File

@@ -8,6 +8,7 @@ import (
"time"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/pkg/poster"
"github.com/pkg/errors"
"github.com/toolkits/pkg/logger"
@@ -113,16 +114,17 @@ type Trigger struct {
Severity int `json:"severity"`
}
func GetHostsQuery(queries []HostQuery) map[string]interface{} {
var query = make(map[string]interface{})
func GetHostsQuery(queries []HostQuery) []map[string]interface{} {
var query []map[string]interface{}
for _, q := range queries {
m := make(map[string]interface{})
switch q.Key {
case "group_ids":
ids := ParseInt64(q.Values)
if q.Op == "==" {
query["group_id in (?)"] = ids
m["group_id in (?)"] = ids
} else {
query["group_id not in (?)"] = ids
m["group_id not in (?)"] = ids
}
case "tags":
lst := []string{}
@@ -133,12 +135,16 @@ func GetHostsQuery(queries []HostQuery) map[string]interface{} {
lst = append(lst, v.(string))
}
if q.Op == "==" {
blank := " "
for _, tag := range lst {
query["tags like ?"] = "%" + tag + "%"
m["tags like ?"+blank] = "%" + tag + "%"
blank += " "
}
} else {
blank := " "
for _, tag := range lst {
query["tags not like ?"] = "%" + tag + "%"
m["tags not like ?"+blank] = "%" + tag + "%"
blank += " "
}
}
case "hosts":
@@ -150,11 +156,12 @@ func GetHostsQuery(queries []HostQuery) map[string]interface{} {
lst = append(lst, v.(string))
}
if q.Op == "==" {
query["ident in (?)"] = lst
m["ident in (?)"] = lst
} else {
query["ident not in (?)"] = lst
m["ident not in (?)"] = lst
}
}
query = append(query, m)
}
return query
}
@@ -643,6 +650,17 @@ func AlertRuleGets(ctx *ctx.Context, groupId int64) ([]AlertRule, error) {
}
func AlertRuleGetsAll(ctx *ctx.Context) ([]*AlertRule, error) {
if !ctx.IsCenter {
lst, err := poster.GetByUrls[[]*AlertRule](ctx, "/v1/n9e/alert-rules?disabled=0")
if err != nil {
return nil, err
}
for i := 0; i < len(lst); i++ {
lst[i].FE2DB()
}
return lst, err
}
session := DB(ctx).Where("disabled = ?", 0)
var lst []*AlertRule
@@ -662,7 +680,11 @@ func AlertRuleGetsAll(ctx *ctx.Context) ([]*AlertRule, error) {
}
func AlertRulesGetsBy(ctx *ctx.Context, prods []string, query, algorithm, cluster string, cates []string, disabled int) ([]*AlertRule, error) {
session := DB(ctx).Where("prod in (?)", prods)
session := DB(ctx)
if len(prods) > 0 {
session = session.Where("prod in (?)", prods)
}
if query != "" {
arr := strings.Fields(query)
@@ -734,6 +756,11 @@ func AlertRuleGetName(ctx *ctx.Context, id int64) (string, error) {
}
func AlertRuleStatistics(ctx *ctx.Context) (*Statistics, error) {
if !ctx.IsCenter {
s, err := poster.GetByUrls[*Statistics](ctx, "/v1/n9e/statistic?name=alert_rule")
return s, err
}
session := DB(ctx).Model(&AlertRule{}).Select("count(*) as total", "max(update_at) as last_updated").Where("disabled = ?", 0)
var stats []*Statistics

View File

@@ -9,6 +9,7 @@ import (
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/pkg/ormx"
"github.com/ccfos/nightingale/v6/pkg/poster"
"github.com/pkg/errors"
"github.com/toolkits/pkg/logger"
)
@@ -52,6 +53,18 @@ func AlertSubscribeGets(ctx *ctx.Context, groupId int64) (lst []AlertSubscribe,
return
}
func AlertSubscribeGetsByService(ctx *ctx.Context) (lst []AlertSubscribe, err error) {
err = DB(ctx).Find(&lst).Error
if err != nil {
return
}
for i := range lst {
lst[i].DB2FE()
}
return
}
func AlertSubscribeGet(ctx *ctx.Context, where string, args ...interface{}) (*AlertSubscribe, error) {
var lst []*AlertSubscribe
err := DB(ctx).Where(where, args...).Find(&lst).Error
@@ -262,6 +275,11 @@ func AlertSubscribeDel(ctx *ctx.Context, ids []int64) error {
}
func AlertSubscribeStatistics(ctx *ctx.Context) (*Statistics, error) {
if !ctx.IsCenter {
s, err := poster.GetByUrls[*Statistics](ctx, "/v1/n9e/statistic?name=alert_subscribe")
return s, err
}
session := DB(ctx).Model(&AlertSubscribe{}).Select("count(*) as total", "max(update_at) as last_updated")
var stats []*Statistics
@@ -274,6 +292,17 @@ func AlertSubscribeStatistics(ctx *ctx.Context) (*Statistics, error) {
}
func AlertSubscribeGetsAll(ctx *ctx.Context) ([]*AlertSubscribe, error) {
if !ctx.IsCenter {
lst, err := poster.GetByUrls[[]*AlertSubscribe](ctx, "/v1/n9e/alert-subscribes")
if err != nil {
return nil, err
}
for i := 0; i < len(lst); i++ {
lst[i].FE2DB()
}
return lst, err
}
// get my cluster's subscribes
session := DB(ctx).Model(&AlertSubscribe{})

View File

@@ -5,6 +5,7 @@ import (
"time"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/pkg/poster"
)
type AlertingEngines struct {
@@ -128,7 +129,23 @@ func AlertingEngineGetsInstances(ctx *ctx.Context, where string, args ...interfa
return arr, err
}
type HeartbeatInfo struct {
Instance string `json:"instance"`
EngineCluster string `json:"engine_cluster"`
DatasourceId int64 `json:"datasource_id"`
}
func AlertingEngineHeartbeatWithCluster(ctx *ctx.Context, instance, cluster string, datasourceId int64) error {
if !ctx.IsCenter {
info := HeartbeatInfo{
Instance: instance,
EngineCluster: cluster,
DatasourceId: datasourceId,
}
err := poster.PostByUrls(ctx, "/v1/n9e/server-heartbeat", info)
return err
}
var total int64
err := DB(ctx).Model(new(AlertingEngines)).Where("instance=? and engine_cluster = ? and datasource_id=?", instance, cluster, datasourceId).Count(&total).Error
if err != nil {

View File

@@ -5,6 +5,7 @@ import (
"time"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/pkg/poster"
"github.com/pkg/errors"
"gorm.io/gorm"
@@ -64,9 +65,17 @@ func (bg *BusiGroup) FillUserGroups(ctx *ctx.Context) error {
func BusiGroupGetMap(ctx *ctx.Context) (map[int64]*BusiGroup, error) {
var lst []*BusiGroup
err := DB(ctx).Find(&lst).Error
if err != nil {
return nil, err
var err error
if !ctx.IsCenter {
lst, err = poster.GetByUrls[[]*BusiGroup](ctx, "/v1/n9e/busi-groups")
if err != nil {
return nil, err
}
} else {
err = DB(ctx).Find(&lst).Error
if err != nil {
return nil, err
}
}
ret := make(map[int64]*BusiGroup)
@@ -77,6 +86,12 @@ func BusiGroupGetMap(ctx *ctx.Context) (map[int64]*BusiGroup, error) {
return ret, nil
}
// BusiGroupGetAll loads every business group directly from the database.
func BusiGroupGetAll(ctx *ctx.Context) ([]*BusiGroup, error) {
	var groups []*BusiGroup
	err := DB(ctx).Find(&groups).Error
	return groups, err
}
func BusiGroupGet(ctx *ctx.Context, where string, args ...interface{}) (*BusiGroup, error) {
var lst []*BusiGroup
err := DB(ctx).Where(where, args...).Find(&lst).Error
@@ -324,6 +339,11 @@ func BusiGroupAdd(ctx *ctx.Context, name string, labelEnable int, labelValue str
}
func BusiGroupStatistics(ctx *ctx.Context) (*Statistics, error) {
if !ctx.IsCenter {
s, err := poster.GetByUrls[*Statistics](ctx, "/v1/n9e/statistic?name=busi_group")
return s, err
}
session := DB(ctx).Model(&BusiGroup{}).Select("count(*) as total", "max(update_at) as last_updated")
var stats []*Statistics

View File

@@ -46,6 +46,18 @@ type Statistics struct {
LastUpdated int64 `gorm:"last_updated"`
}
// StatisticsGet returns the row count and most recent update_at timestamp
// for the table backing the given model.
func StatisticsGet[T any](ctx *ctx.Context, model T) (*Statistics, error) {
	var stats []*Statistics
	session := DB(ctx).Model(model).Select("count(*) as total", "max(update_at) as last_updated")
	if err := session.Find(&stats).Error; err != nil {
		return nil, err
	}
	// Guard against an empty result set: indexing stats[0] unconditionally
	// would panic with index-out-of-range.
	if len(stats) == 0 {
		return &Statistics{}, nil
	}
	return stats[0], nil
}
func MatchDatasource(ids []int64, id int64) bool {
if id == DatasourceIdAll {
return true

View File

@@ -7,6 +7,7 @@ import (
"time"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/pkg/poster"
"github.com/pkg/errors"
"github.com/toolkits/pkg/runner"
@@ -43,6 +44,13 @@ func InitSalt(ctx *ctx.Context) {
}
func ConfigsGet(ctx *ctx.Context, ckey string) (string, error) {
if !ctx.IsCenter {
if !ctx.IsCenter {
s, err := poster.GetByUrls[string](ctx, "/v1/n9e/config?key="+ckey)
return s, err
}
}
var lst []string
err := DB(ctx).Model(&Configs{}).Where("ckey=?", ckey).Pluck("cval", &lst).Error
if err != nil {

View File

@@ -7,6 +7,7 @@ import (
"time"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/pkg/poster"
"github.com/pkg/errors"
"github.com/toolkits/pkg/logger"
"github.com/toolkits/pkg/str"
@@ -112,6 +113,17 @@ func (ds *Datasource) Get(ctx *ctx.Context) error {
}
func GetDatasources(ctx *ctx.Context) ([]Datasource, error) {
if !ctx.IsCenter {
lst, err := poster.GetByUrls[[]Datasource](ctx, "/v1/n9e/datasources")
if err != nil {
return nil, err
}
for i := 0; i < len(lst); i++ {
lst[i].FE2DB()
}
return lst, nil
}
var dss []Datasource
err := DB(ctx).Find(&dss).Error
@@ -123,6 +135,11 @@ func GetDatasources(ctx *ctx.Context) ([]Datasource, error) {
}
func GetDatasourceIdsByEngineName(ctx *ctx.Context, engineName string) ([]int64, error) {
if !ctx.IsCenter {
lst, err := poster.GetByUrls[[]int64](ctx, "/v1/n9e/datasource-ids?name="+engineName)
return lst, err
}
var dss []Datasource
var ids []int64
err := DB(ctx).Where("cluster_name = ?", engineName).Find(&dss).Error
@@ -267,19 +284,32 @@ func (ds *Datasource) DB2FE() error {
func DatasourceGetMap(ctx *ctx.Context) (map[int64]*Datasource, error) {
var lst []*Datasource
err := DB(ctx).Find(&lst).Error
if err != nil {
return nil, err
var err error
if !ctx.IsCenter {
lst, err = poster.GetByUrls[[]*Datasource](ctx, "/v1/n9e/datasources")
if err != nil {
return nil, err
}
for i := 0; i < len(lst); i++ {
lst[i].FE2DB()
}
} else {
err := DB(ctx).Find(&lst).Error
if err != nil {
return nil, err
}
for i := 0; i < len(lst); i++ {
err := lst[i].DB2FE()
if err != nil {
logger.Warningf("get ds:%+v err:%v", lst[i], err)
continue
}
}
}
ret := make(map[int64]*Datasource)
for i := 0; i < len(lst); i++ {
err := lst[i].DB2FE()
if err != nil {
logger.Warningf("get ds:%+v err:%v", lst[i], err)
continue
}
ret[lst[i].Id] = lst[i]
}
@@ -287,6 +317,11 @@ func DatasourceGetMap(ctx *ctx.Context) (map[int64]*Datasource, error) {
}
func DatasourceStatistics(ctx *ctx.Context) (*Statistics, error) {
if !ctx.IsCenter {
s, err := poster.GetByUrls[*Statistics](ctx, "/v1/n9e/statistic?name=datasource")
return s, err
}
session := DB(ctx).Model(&Datasource{}).Select("count(*) as total", "max(updated_at) as last_updated")
var stats []*Statistics

View File

@@ -16,6 +16,7 @@ type Webhook struct {
HeaderMap map[string]string `json:"headers"`
Headers []string `json:"headers_str"`
SkipVerify bool `json:"skip_verify"`
Note string `json:"note"`
}
type NotifyScript struct {

View File

@@ -8,6 +8,7 @@ import (
"strings"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/pkg/poster"
"github.com/ccfos/nightingale/v6/pkg/tplx"
"github.com/pkg/errors"
@@ -59,6 +60,11 @@ func NotifyTplCountByChannel(c *ctx.Context, channel string) (int64, error) {
}
func NotifyTplGets(c *ctx.Context) ([]*NotifyTpl, error) {
if !c.IsCenter {
lst, err := poster.GetByUrls[[]*NotifyTpl](c, "/v1/n9e/notify-tpls")
return lst, err
}
var lst []*NotifyTpl
err := DB(c).Find(&lst).Error
return lst, err
@@ -88,6 +94,10 @@ func ListTpls(c *ctx.Context) (map[string]*template.Template, error) {
}
func InitNotifyConfig(c *ctx.Context, tplDir string) {
if !c.IsCenter {
return
}
// init notify channel
cval, err := ConfigsGet(c, NOTIFYCHANNEL)
if err != nil {

View File

@@ -7,6 +7,8 @@ import (
"time"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/pkg/poster"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/toolkits/pkg/logger"
@@ -197,7 +199,33 @@ func RecordingRuleGetById(ctx *ctx.Context, id int64) (*RecordingRule, error) {
return RecordingRuleGet(ctx, "id=?", id)
}
// RecordingRuleEnabledGets returns every recording rule whose disabled flag
// is 0, converting each row from its DB representation to the frontend form
// via DB2FE before returning.
func RecordingRuleEnabledGets(ctx *ctx.Context) ([]*RecordingRule, error) {
	var rules []*RecordingRule
	if err := DB(ctx).Where("disabled = ?", 0).Find(&rules).Error; err != nil {
		return rules, err
	}
	for _, rule := range rules {
		rule.DB2FE(ctx)
	}
	return rules, nil
}
func RecordingRuleGetsByCluster(ctx *ctx.Context) ([]*RecordingRule, error) {
if !ctx.IsCenter {
lst, err := poster.GetByUrls[[]*RecordingRule](ctx, "/v1/n9e/recording-rules")
if err != nil {
return nil, err
}
for i := 0; i < len(lst); i++ {
lst[i].FE2DB()
}
return lst, err
}
session := DB(ctx).Where("disabled = ?", 0)
var lst []*RecordingRule
@@ -217,6 +245,11 @@ func RecordingRuleGetsByCluster(ctx *ctx.Context) ([]*RecordingRule, error) {
}
func RecordingRuleStatistics(ctx *ctx.Context) (*Statistics, error) {
if !ctx.IsCenter {
s, err := poster.GetByUrls[*Statistics](ctx, "/v1/n9e/statistic?name=recording_rule")
return s, err
}
session := DB(ctx).Model(&RecordingRule{}).Select("count(*) as total", "max(update_at) as last_updated")
var stats []*Statistics

View File

@@ -6,6 +6,7 @@ import (
"time"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/pkg/poster"
"github.com/pkg/errors"
"gorm.io/gorm"
@@ -19,7 +20,7 @@ type Target struct {
Note string `json:"note"`
Tags string `json:"-"`
TagsJSON []string `json:"tags" gorm:"-"`
TagsMap map[string]string `json:"-" gorm:"-"` // internal use, append tags to series
TagsMap map[string]string `json:"tags_maps" gorm:"-"` // internal use, append tags to series
UpdateAt int64 `json:"update_at"`
UnixTime int64 `json:"unixtime" gorm:"-"`
@@ -59,6 +60,11 @@ func (t *Target) FillGroup(ctx *ctx.Context, cache map[int64]*BusiGroup) error {
}
func TargetStatistics(ctx *ctx.Context) (*Statistics, error) {
if !ctx.IsCenter {
s, err := poster.GetByUrls[*Statistics](ctx, "/v1/n9e/statistic?name=target")
return s, err
}
var stats []*Statistics
err := DB(ctx).Model(&Target{}).Select("count(*) as total", "max(update_at) as last_updated").Find(&stats).Error
if err != nil {
@@ -117,7 +123,7 @@ func TargetGets(ctx *ctx.Context, bgid int64, dsIds []int64, query string, limit
}
// 根据 groupids, tags, hosts 查询 targets
func TargetGetsByFilter(ctx *ctx.Context, query map[string]interface{}, limit, offset int) ([]*Target, error) {
func TargetGetsByFilter(ctx *ctx.Context, query []map[string]interface{}, limit, offset int) ([]*Target, error) {
var lst []*Target
session := TargetFilterQueryBuild(ctx, query, limit, offset)
err := session.Order("ident").Find(&lst).Error
@@ -130,12 +136,12 @@ func TargetGetsByFilter(ctx *ctx.Context, query map[string]interface{}, limit, o
return lst, err
}
func TargetCountByFilter(ctx *ctx.Context, query map[string]interface{}) (int64, error) {
func TargetCountByFilter(ctx *ctx.Context, query []map[string]interface{}) (int64, error) {
session := TargetFilterQueryBuild(ctx, query, 0, 0)
return Count(session)
}
func MissTargetGetsByFilter(ctx *ctx.Context, query map[string]interface{}, ts int64) ([]*Target, error) {
func MissTargetGetsByFilter(ctx *ctx.Context, query []map[string]interface{}, ts int64) ([]*Target, error) {
var lst []*Target
session := TargetFilterQueryBuild(ctx, query, 0, 0)
session = session.Where("update_at < ?", ts)
@@ -144,16 +150,20 @@ func MissTargetGetsByFilter(ctx *ctx.Context, query map[string]interface{}, ts i
return lst, err
}
func MissTargetCountByFilter(ctx *ctx.Context, query map[string]interface{}, ts int64) (int64, error) {
func MissTargetCountByFilter(ctx *ctx.Context, query []map[string]interface{}, ts int64) (int64, error) {
session := TargetFilterQueryBuild(ctx, query, 0, 0)
session = session.Where("update_at < ?", ts)
return Count(session)
}
func TargetFilterQueryBuild(ctx *ctx.Context, query map[string]interface{}, limit, offset int) *gorm.DB {
func TargetFilterQueryBuild(ctx *ctx.Context, query []map[string]interface{}, limit, offset int) *gorm.DB {
session := DB(ctx).Model(&Target{})
for k, v := range query {
session = session.Where(k, v)
for _, q := range query {
tx := DB(ctx).Model(&Target{})
for k, v := range q {
tx = tx.Or(k, v)
}
session = session.Where(tx)
}
if limit > 0 {
@@ -164,8 +174,16 @@ func TargetFilterQueryBuild(ctx *ctx.Context, query map[string]interface{}, limi
}
// TargetGetsAll returns every target. On a non-center node the list is
// fetched from the center over HTTP; on the center it is read from the DB
// and each target's tags string is parsed into its list/map form via
// FillTagsMap.
func TargetGetsAll(ctx *ctx.Context) ([]*Target, error) {
	if !ctx.IsCenter {
		lst, err := poster.GetByUrls[[]*Target](ctx, "/v1/n9e/targets")
		return lst, err
	}

	var lst []*Target
	err := DB(ctx).Model(&Target{}).Find(&lst).Error
	if err != nil {
		// Previously the (possibly partial) list was post-processed and
		// returned even on a query error; fail fast instead.
		return nil, err
	}
	for i := 0; i < len(lst); i++ {
		lst[i].FillTagsMap()
	}
	return lst, nil
}

View File

@@ -1,6 +1,9 @@
package models
import "github.com/ccfos/nightingale/v6/pkg/ctx"
import (
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/pkg/poster"
)
type TaskRecord struct {
Id int64 `json:"id" gorm:"primaryKey"`
@@ -27,6 +30,11 @@ func (r *TaskRecord) TableName() string {
// create task
// Add persists the task record: the center writes it to the DB directly,
// while a non-center node forwards it to the center over HTTP.
func (r *TaskRecord) Add(ctx *ctx.Context) error {
	if ctx.IsCenter {
		return Insert(ctx, r)
	}
	return poster.PostByUrls(ctx, "/v1/n9e/task-record-add", r)
}

View File

@@ -9,6 +9,7 @@ import (
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/pkg/ldapx"
"github.com/ccfos/nightingale/v6/pkg/ormx"
"github.com/ccfos/nightingale/v6/pkg/poster"
"github.com/pkg/errors"
"github.com/tidwall/gjson"
@@ -347,12 +348,18 @@ func UserGets(ctx *ctx.Context, query string, limit, offset int) ([]User, error)
for i := 0; i < len(users); i++ {
users[i].RolesLst = strings.Fields(users[i].Roles)
users[i].Admin = users[i].IsAdmin()
users[i].Password = ""
}
return users, nil
}
func UserGetAll(ctx *ctx.Context) ([]*User, error) {
if !ctx.IsCenter {
lst, err := poster.GetByUrls[[]*User](ctx, "/v1/n9e/users")
return lst, err
}
var lst []*User
err := DB(ctx).Find(&lst).Error
if err == nil {
@@ -429,6 +436,11 @@ func (u *User) CheckPerm(ctx *ctx.Context, operation string) (bool, error) {
}
func UserStatistics(ctx *ctx.Context) (*Statistics, error) {
if !ctx.IsCenter {
s, err := poster.GetByUrls[*Statistics](ctx, "/v1/n9e/statistic?name=user")
return s, err
}
session := DB(ctx).Model(&User{}).Select("count(*) as total", "max(update_at) as last_updated")
var stats []*Statistics

View File

@@ -4,6 +4,8 @@ import (
"time"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/pkg/poster"
"github.com/pkg/errors"
"github.com/toolkits/pkg/str"
"gorm.io/gorm"
@@ -111,6 +113,11 @@ func UserGroupGetByIds(ctx *ctx.Context, ids []int64) ([]UserGroup, error) {
}
func UserGroupGetAll(ctx *ctx.Context) ([]*UserGroup, error) {
if !ctx.IsCenter {
lst, err := poster.GetByUrls[[]*UserGroup](ctx, "/v1/n9e/users")
return lst, err
}
var lst []*UserGroup
err := DB(ctx).Find(&lst).Error
return lst, err
@@ -139,6 +146,11 @@ func (ug *UserGroup) DelMembers(ctx *ctx.Context, userIds []int64) error {
}
func UserGroupStatistics(ctx *ctx.Context) (*Statistics, error) {
if !ctx.IsCenter {
s, err := poster.GetByUrls[*Statistics](ctx, "/v1/n9e/statistic?name=user_group")
return s, err
}
session := DB(ctx).Model(&UserGroup{}).Select("count(*) as total", "max(update_at) as last_updated")
var stats []*Statistics

View File

@@ -1,6 +1,9 @@
package models
import "github.com/ccfos/nightingale/v6/pkg/ctx"
import (
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/pkg/poster"
)
type UserGroupMember struct {
GroupId int64
@@ -54,8 +57,13 @@ func UserGroupMemberDel(ctx *ctx.Context, groupId int64, userIds []int64) error
return DB(ctx).Where("group_id = ? and user_id in ?", groupId, userIds).Delete(&UserGroupMember{}).Error
}
func UserGroupMemberGetAll(ctx *ctx.Context) ([]UserGroupMember, error) {
var lst []UserGroupMember
func UserGroupMemberGetAll(ctx *ctx.Context) ([]*UserGroupMember, error) {
if !ctx.IsCenter {
lst, err := poster.GetByUrls[[]*UserGroupMember](ctx, "/v1/n9e/user-group-members")
return lst, err
}
var lst []*UserGroupMember
err := DB(ctx).Find(&lst).Error
return lst, err
}

View File

@@ -3,18 +3,29 @@ package ctx
import (
"context"
"github.com/ccfos/nightingale/v6/conf"
"gorm.io/gorm"
)
type Context struct {
DB *gorm.DB
Ctx context.Context
DB *gorm.DB
CenterApi conf.CenterApi
Ctx context.Context
IsCenter bool
}
func NewContext(ctx context.Context, db *gorm.DB) *Context {
func NewContext(ctx context.Context, db *gorm.DB, isCenter bool, centerApis ...conf.CenterApi) *Context {
var api conf.CenterApi
if len(centerApis) > 0 {
api = centerApis[0]
}
return &Context{
Ctx: ctx,
DB: db,
Ctx: ctx,
DB: db,
CenterApi: api,
IsCenter: isCenter,
}
}

View File

@@ -31,28 +31,11 @@ type Config struct {
IdleTimeout int
JWTAuth JWTAuth
ProxyAuth ProxyAuth
Alert Alert
Pushgw Pushgw
Heartbeat Heartbeat
Service Service
APIForAgent BasicAuths
APIForService BasicAuths
}
type Alert struct {
BasicAuth gin.Accounts
Enable bool
}
type Pushgw struct {
BasicAuth gin.Accounts
Enable bool
}
type Heartbeat struct {
BasicAuth gin.Accounts
Enable bool
}
type Service struct {
// BasicAuths holds the HTTP basic-auth settings for a route group:
// Enable toggles whether the group's routes are registered at all, and
// BasicAuth is the set of allowed username/password pairs (empty means the
// routes are served without authentication).
type BasicAuths struct {
	BasicAuth gin.Accounts
	Enable bool
}

View File

@@ -2,14 +2,160 @@ package poster
import (
"bytes"
"encoding/base64"
"encoding/json"
"fmt"
"io/ioutil"
"math/rand"
"net/http"
"time"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/gin-gonic/gin"
"github.com/toolkits/pkg/logger"
"github.com/toolkits/pkg/net/httplib"
)
// DataResponse is the JSON envelope returned by center GET endpoints:
// Dat carries the typed payload and Err a server-side error message
// (empty string on success).
type DataResponse[T any] struct {
	Dat T `json:"dat"`
	Err string `json:"err"`
}
// GetByUrls fetches typed data from the center API, trying each configured
// address in random order until one succeeds. It returns the zero value of T
// together with an error when no address is configured or when every
// attempt fails (the error of the last attempt is returned).
func GetByUrls[T any](ctx *ctx.Context, path string) (T, error) {
	var dat T

	// Shuffle a copy: shuffling ctx.CenterApi.Addrs in place would mutate
	// shared configuration and race with concurrent callers.
	addrs := append([]string(nil), ctx.CenterApi.Addrs...)
	if len(addrs) == 0 {
		// Previously this fell through and returned (zero, nil), silently
		// yielding empty data; surface the misconfiguration instead.
		return dat, fmt.Errorf("no center api address configured, path: %s", path)
	}
	rand.Shuffle(len(addrs), func(i, j int) { addrs[i], addrs[j] = addrs[j], addrs[i] })

	var err error
	for _, addr := range addrs {
		url := fmt.Sprintf("%s%s", addr, path)
		dat, e := GetByUrl[T](url, ctx.CenterApi.BasicAuth)
		if e != nil {
			err = e
			logger.Warningf("failed to get data from center, url: %s, err: %v", url, err)
			continue
		}
		return dat, nil
	}

	return dat, err
}
// GetByUrl performs a GET against url with a 3s timeout and decodes the
// standard {dat, err} envelope into a value of type T. When basicAuth is
// non-empty, one of its username/password pairs is sent as an
// "Authorization: Basic" header (with multiple accounts, map iteration
// picks an arbitrary one). A non-200 status, a non-empty "err" field in
// the envelope, or any transport/decoding failure yields an error.
func GetByUrl[T any](url string, basicAuth gin.Accounts) (T, error) {
	var zero T

	request := httplib.Get(url).SetTimeout(3000 * time.Millisecond)

	if len(basicAuth) > 0 {
		token := ""
		for username, password := range basicAuth {
			token = base64.StdEncoding.EncodeToString([]byte(username + ":" + password))
		}
		if token != "" {
			request = request.Header("Authorization", "Basic "+token)
		}
	}

	resp, err := request.Response()
	if err != nil {
		return zero, fmt.Errorf("failed to fetch from url: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return zero, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
	}

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return zero, fmt.Errorf("failed to read response body: %w", err)
	}

	var envelope DataResponse[T]
	if err = json.Unmarshal(body, &envelope); err != nil {
		return zero, fmt.Errorf("failed to decode response: %w", err)
	}
	if envelope.Err != "" {
		return zero, fmt.Errorf("error from server: %s", envelope.Err)
	}

	logger.Debugf("get data from %s, data: %+v", url, envelope.Dat)
	return envelope.Dat, nil
}
// PostResponse is the minimal JSON envelope returned by center POST
// endpoints; only the "err" field is inspected (empty string on success).
type PostResponse struct {
	Err string `json:"err"`
}
// PostByUrls posts v as JSON to the given path on the center API, trying
// each configured address in random order and stopping at the first
// success. On total failure the error of the last attempt is returned;
// with no addresses configured it returns nil (a no-op).
func PostByUrls(ctx *ctx.Context, path string, v interface{}) (err error) {
	// Shuffle a copy: shuffling ctx.CenterApi.Addrs in place would mutate
	// shared configuration and race with concurrent callers.
	addrs := append([]string(nil), ctx.CenterApi.Addrs...)
	rand.Shuffle(len(addrs), func(i, j int) { addrs[i], addrs[j] = addrs[j], addrs[i] })

	for _, addr := range addrs {
		url := fmt.Sprintf("%s%s", addr, path)
		err = PostByUrl(url, ctx.CenterApi.BasicAuth, v)
		if err == nil {
			return
		}
	}

	return
}
// PostByUrl posts v as JSON to url with a 10s timeout, optionally adding an
// "Authorization: Basic" header built from one of the basicAuth accounts
// (with multiple accounts, map iteration picks an arbitrary one). A non-200
// status, a non-empty "err" field in the response envelope, or any
// transport/decoding failure is reported as an error.
func PostByUrl(url string, basicAuth gin.Accounts, v interface{}) (err error) {
	var bs []byte
	bs, err = json.Marshal(v)
	if err != nil {
		return
	}

	bf := bytes.NewBuffer(bs)

	client := http.Client{
		Timeout: 10 * time.Second,
	}

	req, err := http.NewRequest("POST", url, bf)
	if err != nil {
		// Previously unchecked: a malformed url would leave req nil and
		// panic on the Header.Set below.
		return fmt.Errorf("failed to build request: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")

	if len(basicAuth) > 0 {
		var token string
		for username, password := range basicAuth {
			token = base64.StdEncoding.EncodeToString([]byte(username + ":" + password))
		}

		if len(token) > 0 {
			req.Header.Set("Authorization", "Basic "+token)
		}
	}

	resp, err := client.Do(req)
	if err != nil {
		return fmt.Errorf("failed to fetch from url: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
	}

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return fmt.Errorf("failed to read response body: %w", err)
	}

	var dataResp PostResponse
	err = json.Unmarshal(body, &dataResp)
	if err != nil {
		return fmt.Errorf("failed to decode response: %w body:%s", err, string(body))
	}

	if dataResp.Err != "" {
		return fmt.Errorf("error from server: %s", dataResp.Err)
	}

	return nil
}
func PostJSON(url string, timeout time.Duration, v interface{}, retries ...int) (response []byte, code int, err error) {
var bs []byte

View File

@@ -42,7 +42,7 @@ func (w WriterType) Write(items []*prompb.TimeSeries, headers ...map[string]stri
}
if err := w.Post(snappy.Encode(nil, data), headers...); err != nil {
logger.Warningf("post to %s got error: %v", w.Opts.Url, err)
logger.Warningf("%v post to %s got error: %v", w.Opts, w.Opts.Url, err)
logger.Debug("example timeseries:", items[0].String())
}
}

View File

@@ -30,6 +30,10 @@ func (po *PromOption) Equal(target PromOption) bool {
return false
}
if po.WriteAddr != target.WriteAddr {
return false
}
if po.Timeout != target.Timeout {
return false
}

View File

@@ -10,6 +10,7 @@ import (
"github.com/ccfos/nightingale/v6/alert/aconf"
"github.com/ccfos/nightingale/v6/models"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/pkg/poster"
"github.com/ccfos/nightingale/v6/pkg/prom"
"github.com/prometheus/client_golang/api"
@@ -42,11 +43,25 @@ type PromSetting struct {
}
func (pc *PromClientMap) loadFromDatabase() {
datasources, err := models.GetDatasourcesGetsBy(pc.ctx, models.PROMETHEUS, "", "", "")
if err != nil {
logger.Errorf("failed to get datasources, error: %v", err)
return
var datasources []*models.Datasource
var err error
if !pc.ctx.IsCenter {
datasources, err = poster.GetByUrls[[]*models.Datasource](pc.ctx, "/v1/n9e/datasources?typ="+models.PROMETHEUS)
if err != nil {
logger.Errorf("failed to get datasources, error: %v", err)
return
}
for i := 0; i < len(datasources); i++ {
datasources[i].FE2DB()
}
} else {
datasources, err = models.GetDatasourcesGetsBy(pc.ctx, models.PROMETHEUS, "", "", "")
if err != nil {
logger.Errorf("failed to get datasources, error: %v", err)
return
}
}
newCluster := make(map[int64]struct{})
for _, ds := range datasources {
dsId := ds.Id

View File

@@ -4,21 +4,23 @@ import (
"sync"
"time"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/pkg/poster"
"github.com/toolkits/pkg/logger"
"github.com/toolkits/pkg/slice"
"gorm.io/gorm"
)
type Set struct {
sync.Mutex
items map[string]struct{}
db *gorm.DB
ctx *ctx.Context
}
func New(db *gorm.DB) *Set {
func New(ctx *ctx.Context) *Set {
set := &Set{
items: make(map[string]struct{}),
db: db,
ctx: ctx,
}
set.Init()
@@ -68,7 +70,7 @@ func (s *Set) updateTimestamp(items map[string]struct{}) {
lst = append(lst, ident)
num++
if num == 100 {
if err := s.updateTargets(lst, now); err != nil {
if err := s.UpdateTargets(lst, now); err != nil {
logger.Errorf("failed to update targets: %v", err)
}
lst = lst[:0]
@@ -76,18 +78,32 @@ func (s *Set) updateTimestamp(items map[string]struct{}) {
}
}
if err := s.updateTargets(lst, now); err != nil {
if err := s.UpdateTargets(lst, now); err != nil {
logger.Errorf("failed to update targets: %v", err)
}
}
func (s *Set) updateTargets(lst []string, now int64) error {
// TargetUpdate is the payload a non-center node posts to
// /v1/n9e/target-update: Lst holds the target idents to refresh and Now is
// the timestamp to record as their update_at.
type TargetUpdate struct {
	Lst []string `json:"lst"`
	Now int64 `json:"now"`
}
func (s *Set) UpdateTargets(lst []string, now int64) error {
if !s.ctx.IsCenter {
t := TargetUpdate{
Lst: lst,
Now: now,
}
err := poster.PostByUrls(s.ctx, "/v1/n9e/target-update", t)
return err
}
count := int64(len(lst))
if count == 0 {
return nil
}
ret := s.db.Table("target").Where("ident in ?", lst).Update("update_at", now)
ret := s.ctx.DB.Table("target").Where("ident in ?", lst).Update("update_at", now)
if ret.Error != nil {
return ret.Error
}
@@ -98,14 +114,14 @@ func (s *Set) updateTargets(lst []string, now int64) error {
// there are some idents not found in db, so insert them
var exists []string
err := s.db.Table("target").Where("ident in ?", lst).Pluck("ident", &exists).Error
err := s.ctx.DB.Table("target").Where("ident in ?", lst).Pluck("ident", &exists).Error
if err != nil {
return err
}
news := slice.SubString(lst, exists)
for i := 0; i < len(news); i++ {
err = s.db.Exec("INSERT INTO target(ident, update_at) VALUES(?, ?)", news[i], now).Error
err = s.ctx.DB.Exec("INSERT INTO target(ident, update_at) VALUES(?, ?)", news[i], now).Error
if err != nil {
logger.Error("failed to insert target:", news[i], "error:", err)
}

View File

@@ -12,7 +12,6 @@ import (
"github.com/ccfos/nightingale/v6/pushgw/idents"
"github.com/ccfos/nightingale/v6/pushgw/router"
"github.com/ccfos/nightingale/v6/pushgw/writer"
"github.com/ccfos/nightingale/v6/storage"
)
type PushgwProvider struct {
@@ -31,13 +30,9 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
return nil, err
}
db, err := storage.New(config.DB)
if err != nil {
return nil, err
}
ctx := ctx.NewContext(context.Background(), db)
ctx := ctx.NewContext(context.Background(), nil, false, config.CenterApi)
idents := idents.New(db)
idents := idents.New(ctx)
stats := memsto.NewSyncStats()

View File

@@ -39,7 +39,7 @@ func New(httpConfig httpx.Config, pushgw pconf.Pushgw, tc *memsto.TargetCacheTyp
}
func (rt *Router) Config(r *gin.Engine) {
if !rt.HTTP.Pushgw.Enable {
if !rt.HTTP.APIForAgent.Enable {
return
}
@@ -53,16 +53,18 @@ func (rt *Router) Config(r *gin.Engine) {
r.POST("/datadog/api/v1/metadata", datadogMetadata)
r.POST("/datadog/intake/", datadogIntake)
if len(rt.HTTP.Pushgw.BasicAuth) > 0 {
if len(rt.HTTP.APIForAgent.BasicAuth) > 0 {
// enable basic auth
auth := gin.BasicAuth(rt.HTTP.Pushgw.BasicAuth)
auth := gin.BasicAuth(rt.HTTP.APIForAgent.BasicAuth)
r.POST("/opentsdb/put", auth, rt.openTSDBPut)
r.POST("/openfalcon/push", auth, rt.falconPush)
r.POST("/prometheus/v1/write", auth, rt.remoteWrite)
r.POST("/v1/n9e/target-update", auth, rt.targetUpdate)
} else {
// no need basic auth
r.POST("/opentsdb/put", rt.openTSDBPut)
r.POST("/openfalcon/push", rt.falconPush)
r.POST("/prometheus/v1/write", rt.remoteWrite)
r.POST("/v1/n9e/target-update", rt.targetUpdate)
}
}

View File

@@ -186,9 +186,9 @@ func (r *Router) datadogSeries(c *gin.Context) {
apiKey = ""
}
if len(r.HTTP.Pushgw.BasicAuth) > 0 {
if len(r.HTTP.APIForAgent.BasicAuth) > 0 {
ok := false
for _, v := range r.HTTP.Pushgw.BasicAuth {
for _, v := range r.HTTP.APIForAgent.BasicAuth {
if apiKey == v {
ok = true
break

View File

@@ -0,0 +1,14 @@
package router
import (
"github.com/ccfos/nightingale/v6/pushgw/idents"
"github.com/gin-gonic/gin"
"github.com/toolkits/pkg/ginx"
)
// targetUpdate handles POST /v1/n9e/target-update: it decodes the
// idents.TargetUpdate payload sent by a non-center node and applies it via
// IdentSet.UpdateTargets, rendering the result as the standard message
// response.
func (rt *Router) targetUpdate(c *gin.Context) {
	var f idents.TargetUpdate
	ginx.BindJSON(c, &f)
	ginx.NewRender(c).Message(rt.IdentSet.UpdateTargets(f.Lst, f.Now))
}