mirror of
https://github.com/ccfos/nightingale.git
synced 2026-03-03 14:38:55 +00:00
Compare commits
9 Commits
get-config
...
refactor_h
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7094665c25 | ||
|
|
f1a5c2065c | ||
|
|
6b9ceda9c1 | ||
|
|
7390d42e62 | ||
|
|
a35f879dc0 | ||
|
|
3fd4ea4853 | ||
|
|
20f0a9d16d | ||
|
|
5d4151983a | ||
|
|
83b5f12474 |
2
.gitignore
vendored
2
.gitignore
vendored
@@ -42,7 +42,7 @@ _test
|
||||
/docker/n9e
|
||||
/docker/mysqldata
|
||||
/docker/experience_pg_vm/pgdata
|
||||
/etc.local
|
||||
/etc.local*
|
||||
|
||||
.alerts
|
||||
.idea
|
||||
|
||||
@@ -2,6 +2,7 @@ before:
|
||||
hooks:
|
||||
# You may remove this if you don't use go modules.
|
||||
- go mod tidy
|
||||
- go install github.com/rakyll/statik
|
||||
|
||||
snapshot:
|
||||
name_template: '{{ .Tag }}'
|
||||
|
||||
9
Makefile
9
Makefile
@@ -1,4 +1,4 @@
|
||||
.PHONY: start build
|
||||
.PHONY: prebuild start build
|
||||
|
||||
ROOT:=$(shell pwd -P)
|
||||
GIT_COMMIT:=$(shell git --work-tree ${ROOT} rev-parse 'HEAD^{commit}')
|
||||
@@ -6,6 +6,11 @@ _GIT_VERSION:=$(shell git --work-tree ${ROOT} describe --tags --abbrev=14 "${GIT
|
||||
TAG=$(shell echo "${_GIT_VERSION}" | awk -F"-" '{print $$1}')
|
||||
RELEASE_VERSION:="$(TAG)-$(GIT_COMMIT)"
|
||||
|
||||
prebuild:
|
||||
echo "begin download and embed the front-end file..."
|
||||
sh fe.sh
|
||||
echo "front-end file download and embedding completed."
|
||||
|
||||
all: build
|
||||
|
||||
build:
|
||||
@@ -17,7 +22,7 @@ build-alert:
|
||||
build-pushgw:
|
||||
go build -ldflags "-w -s -X github.com/ccfos/nightingale/v6/pkg/version.Version=$(RELEASE_VERSION)" -o n9e-pushgw ./cmd/pushgw/main.go
|
||||
|
||||
build-cli:
|
||||
build-cli:
|
||||
go build -ldflags "-w -s -X github.com/ccfos/nightingale/v6/pkg/version.Version=$(RELEASE_VERSION)" -o n9e-cli ./cmd/cli/main.go
|
||||
|
||||
run:
|
||||
|
||||
@@ -23,7 +23,6 @@ import (
|
||||
"github.com/ccfos/nightingale/v6/prom"
|
||||
"github.com/ccfos/nightingale/v6/pushgw/pconf"
|
||||
"github.com/ccfos/nightingale/v6/pushgw/writer"
|
||||
"github.com/ccfos/nightingale/v6/storage"
|
||||
)
|
||||
|
||||
func Initialize(configDir string, cryptoKey string) (func(), error) {
|
||||
@@ -37,21 +36,12 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
db, err := storage.New(config.DB)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ctx := ctx.NewContext(context.Background(), db)
|
||||
|
||||
redis, err := storage.NewRedis(config.Redis)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ctx := ctx.NewContext(context.Background(), nil, false, config.CenterApi)
|
||||
|
||||
syncStats := memsto.NewSyncStats()
|
||||
alertStats := astats.NewSyncStats()
|
||||
|
||||
targetCache := memsto.NewTargetCache(ctx, syncStats, redis)
|
||||
targetCache := memsto.NewTargetCache(ctx, syncStats, nil)
|
||||
busiGroupCache := memsto.NewBusiGroupCache(ctx, syncStats)
|
||||
alertMuteCache := memsto.NewAlertMuteCache(ctx, syncStats)
|
||||
alertRuleCache := memsto.NewAlertRuleCache(ctx, syncStats)
|
||||
@@ -62,7 +52,7 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
|
||||
|
||||
externalProcessors := process.NewExternalProcessors()
|
||||
|
||||
Start(config.Alert, config.Pushgw, syncStats, alertStats, externalProcessors, targetCache, busiGroupCache, alertMuteCache, alertRuleCache, notifyConfigCache, dsCache, ctx, promClients, false)
|
||||
Start(config.Alert, config.Pushgw, syncStats, alertStats, externalProcessors, targetCache, busiGroupCache, alertMuteCache, alertRuleCache, notifyConfigCache, dsCache, ctx, promClients)
|
||||
|
||||
r := httpx.GinEngine(config.Global.RunMode, config.HTTP)
|
||||
rt := router.New(config.HTTP, config.Alert, alertMuteCache, targetCache, busiGroupCache, alertStats, ctx, externalProcessors)
|
||||
@@ -77,7 +67,7 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
|
||||
}
|
||||
|
||||
func Start(alertc aconf.Alert, pushgwc pconf.Pushgw, syncStats *memsto.Stats, alertStats *astats.Stats, externalProcessors *process.ExternalProcessorsType, targetCache *memsto.TargetCacheType, busiGroupCache *memsto.BusiGroupCacheType,
|
||||
alertMuteCache *memsto.AlertMuteCacheType, alertRuleCache *memsto.AlertRuleCacheType, notifyConfigCache *memsto.NotifyConfigCacheType, datasourceCache *memsto.DatasourceCacheType, ctx *ctx.Context, promClients *prom.PromClientMap, isCenter bool) {
|
||||
alertMuteCache *memsto.AlertMuteCacheType, alertRuleCache *memsto.AlertRuleCacheType, notifyConfigCache *memsto.NotifyConfigCacheType, datasourceCache *memsto.DatasourceCacheType, ctx *ctx.Context, promClients *prom.PromClientMap) {
|
||||
userCache := memsto.NewUserCache(ctx, syncStats)
|
||||
userGroupCache := memsto.NewUserGroupCache(ctx, syncStats)
|
||||
alertSubscribeCache := memsto.NewAlertSubscribeCache(ctx, syncStats)
|
||||
@@ -85,12 +75,12 @@ func Start(alertc aconf.Alert, pushgwc pconf.Pushgw, syncStats *memsto.Stats, al
|
||||
|
||||
go models.InitNotifyConfig(ctx, alertc.Alerting.TemplatesDir)
|
||||
|
||||
naming := naming.NewNaming(ctx, alertc.Heartbeat, isCenter)
|
||||
naming := naming.NewNaming(ctx, alertc.Heartbeat)
|
||||
|
||||
writers := writer.NewWriters(pushgwc)
|
||||
record.NewScheduler(alertc, recordingRuleCache, promClients, writers, alertStats)
|
||||
|
||||
eval.NewScheduler(isCenter, alertc, externalProcessors, alertRuleCache, targetCache, busiGroupCache, alertMuteCache, datasourceCache, promClients, naming, ctx, alertStats)
|
||||
eval.NewScheduler(alertc, externalProcessors, alertRuleCache, targetCache, busiGroupCache, alertMuteCache, datasourceCache, promClients, naming, ctx, alertStats)
|
||||
|
||||
dp := dispatch.NewDispatch(alertRuleCache, userCache, userGroupCache, alertSubscribeCache, targetCache, notifyConfigCache, alertc.Alerting, ctx)
|
||||
consumer := dispatch.NewConsumer(alertc.Alerting, ctx, dp)
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"github.com/ccfos/nightingale/v6/alert/queue"
|
||||
"github.com/ccfos/nightingale/v6/models"
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
"github.com/ccfos/nightingale/v6/pkg/poster"
|
||||
|
||||
"github.com/toolkits/pkg/concurrent/semaphore"
|
||||
"github.com/toolkits/pkg/logger"
|
||||
@@ -82,78 +83,17 @@ func (e *Consumer) consumeOne(event *models.AlertCurEvent) {
|
||||
}
|
||||
|
||||
func (e *Consumer) persist(event *models.AlertCurEvent) {
|
||||
has, err := models.AlertCurEventExists(e.ctx, "hash=?", event.Hash)
|
||||
if err != nil {
|
||||
logger.Errorf("event_persist_check_exists_fail: %v rule_id=%d hash=%s", err, event.RuleId, event.Hash)
|
||||
return
|
||||
}
|
||||
|
||||
his := event.ToHis(e.ctx)
|
||||
|
||||
// 不管是告警还是恢复,全量告警里都要记录
|
||||
if err := his.Add(e.ctx); err != nil {
|
||||
logger.Errorf(
|
||||
"event_persist_his_fail: %v rule_id=%d cluster:%s hash=%s tags=%v timestamp=%d value=%s",
|
||||
err,
|
||||
event.RuleId,
|
||||
event.Cluster,
|
||||
event.Hash,
|
||||
event.TagsJSON,
|
||||
event.TriggerTime,
|
||||
event.TriggerValue,
|
||||
)
|
||||
}
|
||||
|
||||
if has {
|
||||
// 活跃告警表中有记录,删之
|
||||
err = models.AlertCurEventDelByHash(e.ctx, event.Hash)
|
||||
if !e.ctx.IsCenter {
|
||||
event.DB2FE()
|
||||
err := poster.PostByUrls(e.ctx, "/v1/n9e/event-persist", event)
|
||||
if err != nil {
|
||||
logger.Errorf("event_del_cur_fail: %v hash=%s", err, event.Hash)
|
||||
return
|
||||
logger.Errorf("event%+v persist err:%v", event, err)
|
||||
}
|
||||
|
||||
if !event.IsRecovered {
|
||||
// 恢复事件,从活跃告警列表彻底删掉,告警事件,要重新加进来新的event
|
||||
// use his id as cur id
|
||||
event.Id = his.Id
|
||||
if event.Id > 0 {
|
||||
if err := event.Add(e.ctx); err != nil {
|
||||
logger.Errorf(
|
||||
"event_persist_cur_fail: %v rule_id=%d cluster:%s hash=%s tags=%v timestamp=%d value=%s",
|
||||
err,
|
||||
event.RuleId,
|
||||
event.Cluster,
|
||||
event.Hash,
|
||||
event.TagsJSON,
|
||||
event.TriggerTime,
|
||||
event.TriggerValue,
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
if event.IsRecovered {
|
||||
// alert_cur_event表里没有数据,表示之前没告警,结果现在报了恢复,神奇....理论上不应该出现的
|
||||
return
|
||||
}
|
||||
|
||||
// use his id as cur id
|
||||
event.Id = his.Id
|
||||
if event.Id > 0 {
|
||||
if err := event.Add(e.ctx); err != nil {
|
||||
logger.Errorf(
|
||||
"event_persist_cur_fail: %v rule_id=%d cluster:%s hash=%s tags=%v timestamp=%d value=%s",
|
||||
err,
|
||||
event.RuleId,
|
||||
event.Cluster,
|
||||
event.Hash,
|
||||
event.TagsJSON,
|
||||
event.TriggerTime,
|
||||
event.TriggerValue,
|
||||
)
|
||||
}
|
||||
err := models.EventPersist(e.ctx, event)
|
||||
if err != nil {
|
||||
logger.Errorf("event%+v persist err:%v", event, err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -199,7 +199,7 @@ func (e *Dispatch) Send(rule *models.AlertRule, event *models.AlertCurEvent, not
|
||||
}
|
||||
|
||||
// handle event callbacks
|
||||
sender.SendCallbacks(e.ctx, notifyTarget.ToCallbackList(), event, e.targetCache, e.notifyConfigCache.GetIbex())
|
||||
sender.SendCallbacks(e.ctx, notifyTarget.ToCallbackList(), event, e.targetCache, e.userCache, e.notifyConfigCache.GetIbex())
|
||||
|
||||
// handle global webhooks
|
||||
sender.SendWebhooks(notifyTarget.ToWebhookList(), event)
|
||||
|
||||
@@ -16,7 +16,6 @@ import (
|
||||
)
|
||||
|
||||
type Scheduler struct {
|
||||
isCenter bool
|
||||
// key: hash
|
||||
alertRules map[string]*AlertRuleWorker
|
||||
|
||||
@@ -38,11 +37,10 @@ type Scheduler struct {
|
||||
stats *astats.Stats
|
||||
}
|
||||
|
||||
func NewScheduler(isCenter bool, aconf aconf.Alert, externalProcessors *process.ExternalProcessorsType, arc *memsto.AlertRuleCacheType, targetCache *memsto.TargetCacheType,
|
||||
func NewScheduler(aconf aconf.Alert, externalProcessors *process.ExternalProcessorsType, arc *memsto.AlertRuleCacheType, targetCache *memsto.TargetCacheType,
|
||||
busiGroupCache *memsto.BusiGroupCacheType, alertMuteCache *memsto.AlertMuteCacheType, datasourceCache *memsto.DatasourceCacheType, promClients *prom.PromClientMap, naming *naming.Naming,
|
||||
ctx *ctx.Context, stats *astats.Stats) *Scheduler {
|
||||
scheduler := &Scheduler{
|
||||
isCenter: isCenter,
|
||||
aconf: aconf,
|
||||
alertRules: make(map[string]*AlertRuleWorker),
|
||||
|
||||
@@ -108,7 +106,7 @@ func (s *Scheduler) syncAlertRules() {
|
||||
alertRule := NewAlertRuleWorker(rule, dsId, processor, s.promClients, s.ctx)
|
||||
alertRuleWorkers[alertRule.Hash()] = alertRule
|
||||
}
|
||||
} else if rule.IsHostRule() && s.isCenter {
|
||||
} else if rule.IsHostRule() && s.ctx.IsCenter {
|
||||
// all host rule will be processed by center instance
|
||||
if !naming.DatasourceHashRing.IsHit(naming.HostDatasource, fmt.Sprintf("%d", rule.Id), s.aconf.Heartbeat.Endpoint) {
|
||||
continue
|
||||
|
||||
@@ -109,7 +109,7 @@ func (arw *AlertRuleWorker) Eval() {
|
||||
}
|
||||
|
||||
func (arw *AlertRuleWorker) Stop() {
|
||||
logger.Infof("%s stopped", arw.Key())
|
||||
logger.Infof("rule_eval %s stopped", arw.Key())
|
||||
close(arw.quit)
|
||||
}
|
||||
|
||||
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
"github.com/ccfos/nightingale/v6/alert/aconf"
|
||||
"github.com/ccfos/nightingale/v6/models"
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
"github.com/ccfos/nightingale/v6/pkg/poster"
|
||||
|
||||
"github.com/toolkits/pkg/logger"
|
||||
)
|
||||
@@ -16,14 +17,12 @@ import (
|
||||
type Naming struct {
|
||||
ctx *ctx.Context
|
||||
heartbeatConfig aconf.HeartbeatConfig
|
||||
isCenter bool
|
||||
}
|
||||
|
||||
func NewNaming(ctx *ctx.Context, heartbeat aconf.HeartbeatConfig, isCenter bool) *Naming {
|
||||
func NewNaming(ctx *ctx.Context, heartbeat aconf.HeartbeatConfig) *Naming {
|
||||
naming := &Naming{
|
||||
ctx: ctx,
|
||||
heartbeatConfig: heartbeat,
|
||||
isCenter: isCenter,
|
||||
}
|
||||
naming.Heartbeats()
|
||||
return naming
|
||||
@@ -45,6 +44,10 @@ func (n *Naming) Heartbeats() error {
|
||||
}
|
||||
|
||||
func (n *Naming) loopDeleteInactiveInstances() {
|
||||
if !n.ctx.IsCenter {
|
||||
return
|
||||
}
|
||||
|
||||
interval := time.Duration(10) * time.Minute
|
||||
for {
|
||||
time.Sleep(interval)
|
||||
@@ -112,7 +115,7 @@ func (n *Naming) heartbeat() error {
|
||||
localss[datasourceIds[i]] = newss
|
||||
}
|
||||
|
||||
if n.isCenter {
|
||||
if n.ctx.IsCenter {
|
||||
// 如果是中心节点,还需要处理 host 类型的告警规则,host 类型告警规则,和数据源无关,想复用下数据源的 hash ring,想用一个虚假的数据源 id 来处理
|
||||
// if is center node, we need to handle host type alerting rules, host type alerting rules are not related to datasource, we want to reuse the hash ring of datasource, we want to use a fake datasource id to handle it
|
||||
err := models.AlertingEngineHeartbeatWithCluster(n.ctx, n.heartbeatConfig.Endpoint, n.heartbeatConfig.EngineName, HostDatasource)
|
||||
@@ -146,6 +149,11 @@ func (n *Naming) ActiveServers(datasourceId int64) ([]string, error) {
|
||||
return nil, fmt.Errorf("cluster is empty")
|
||||
}
|
||||
|
||||
if !n.ctx.IsCenter {
|
||||
lst, err := poster.GetByUrls[[]string](n.ctx, "/v1/n9e/servers-active?dsid="+fmt.Sprintf("%d", datasourceId))
|
||||
return lst, err
|
||||
}
|
||||
|
||||
// 30秒内有心跳,就认为是活的
|
||||
return models.AlertingEngineGetsInstances(n.ctx, "datasource_id = ? and clock > ?", datasourceId, time.Now().Unix()-30)
|
||||
}
|
||||
|
||||
@@ -118,7 +118,7 @@ func (p *Processor) Handle(anomalyPoints []common.AnomalyPoint, from string, inh
|
||||
logger.Errorf("rule not found %+v", anomalyPoints)
|
||||
return
|
||||
}
|
||||
|
||||
p.rule = cachedRule
|
||||
now := time.Now().Unix()
|
||||
alertingKeys := map[string]struct{}{}
|
||||
|
||||
@@ -337,7 +337,7 @@ func (p *Processor) pushEventToQueue(e *models.AlertCurEvent) {
|
||||
func (p *Processor) RecoverAlertCurEventFromDb() {
|
||||
p.pendings = NewAlertCurEventMap(nil)
|
||||
|
||||
curEvents, err := models.AlertCurEventGetByRuleIdAndCluster(p.ctx, p.rule.Id, p.datasourceId)
|
||||
curEvents, err := models.AlertCurEventGetByRuleIdAndDsId(p.ctx, p.rule.Id, p.datasourceId)
|
||||
if err != nil {
|
||||
logger.Errorf("recover event from db for rule:%s failed, err:%s", p.Key(), err)
|
||||
p.fires = NewAlertCurEventMap(nil)
|
||||
|
||||
@@ -39,15 +39,16 @@ func New(httpConfig httpx.Config, alert aconf.Alert, amc *memsto.AlertMuteCacheT
|
||||
}
|
||||
|
||||
func (rt *Router) Config(r *gin.Engine) {
|
||||
if !rt.HTTP.Alert.Enable {
|
||||
if !rt.HTTP.APIForService.Enable {
|
||||
return
|
||||
}
|
||||
|
||||
service := r.Group("/v1/n9e")
|
||||
if len(rt.HTTP.Alert.BasicAuth) > 0 {
|
||||
service.Use(gin.BasicAuth(rt.HTTP.Alert.BasicAuth))
|
||||
if len(rt.HTTP.APIForService.BasicAuth) > 0 {
|
||||
service.Use(gin.BasicAuth(rt.HTTP.APIForService.BasicAuth))
|
||||
}
|
||||
service.POST("/event", rt.pushEventToQueue)
|
||||
service.POST("/event-persist", rt.eventPersist)
|
||||
service.POST("/make-event", rt.makeEvent)
|
||||
}
|
||||
|
||||
|
||||
@@ -83,6 +83,13 @@ func (rt *Router) pushEventToQueue(c *gin.Context) {
|
||||
ginx.NewRender(c).Message(nil)
|
||||
}
|
||||
|
||||
func (rt *Router) eventPersist(c *gin.Context) {
|
||||
var event *models.AlertCurEvent
|
||||
ginx.BindJSON(c, &event)
|
||||
event.FE2DB()
|
||||
ginx.NewRender(c).Message(models.EventPersist(rt.Ctx, event))
|
||||
}
|
||||
|
||||
type eventForm struct {
|
||||
Alert bool `json:"alert"`
|
||||
AnomalyPoints []common.AnomalyPoint `json:"vectors"`
|
||||
|
||||
@@ -15,7 +15,7 @@ import (
|
||||
"github.com/toolkits/pkg/logger"
|
||||
)
|
||||
|
||||
func SendCallbacks(ctx *ctx.Context, urls []string, event *models.AlertCurEvent, targetCache *memsto.TargetCacheType, ibexConf aconf.Ibex) {
|
||||
func SendCallbacks(ctx *ctx.Context, urls []string, event *models.AlertCurEvent, targetCache *memsto.TargetCacheType, userCache *memsto.UserCacheType, ibexConf aconf.Ibex) {
|
||||
for _, url := range urls {
|
||||
if url == "" {
|
||||
continue
|
||||
@@ -23,7 +23,7 @@ func SendCallbacks(ctx *ctx.Context, urls []string, event *models.AlertCurEvent,
|
||||
|
||||
if strings.HasPrefix(url, "${ibex}") {
|
||||
if !event.IsRecovered {
|
||||
handleIbex(ctx, url, event, targetCache, ibexConf)
|
||||
handleIbex(ctx, url, event, targetCache, userCache, ibexConf)
|
||||
}
|
||||
continue
|
||||
}
|
||||
@@ -34,9 +34,9 @@ func SendCallbacks(ctx *ctx.Context, urls []string, event *models.AlertCurEvent,
|
||||
|
||||
resp, code, err := poster.PostJSON(url, 5*time.Second, event, 3)
|
||||
if err != nil {
|
||||
logger.Errorf("event_callback(rule_id=%d url=%s) fail, resp: %s, err: %v, code: %d", event.RuleId, url, string(resp), err, code)
|
||||
logger.Errorf("event_callback_fail(rule_id=%d url=%s), resp: %s, err: %v, code: %d", event.RuleId, url, string(resp), err, code)
|
||||
} else {
|
||||
logger.Infof("event_callback(rule_id=%d url=%s) succ, resp: %s, code: %d", event.RuleId, url, string(resp), code)
|
||||
logger.Infof("event_callback_succ(rule_id=%d url=%s), resp: %s, code: %d", event.RuleId, url, string(resp), code)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -60,7 +60,7 @@ type TaskCreateReply struct {
|
||||
Dat int64 `json:"dat"` // task.id
|
||||
}
|
||||
|
||||
func handleIbex(ctx *ctx.Context, url string, event *models.AlertCurEvent, targetCache *memsto.TargetCacheType, ibexConf aconf.Ibex) {
|
||||
func handleIbex(ctx *ctx.Context, url string, event *models.AlertCurEvent, targetCache *memsto.TargetCacheType, userCache *memsto.UserCacheType, ibexConf aconf.Ibex) {
|
||||
arr := strings.Split(url, "/")
|
||||
|
||||
var idstr string
|
||||
@@ -103,7 +103,7 @@ func handleIbex(ctx *ctx.Context, url string, event *models.AlertCurEvent, targe
|
||||
|
||||
// check perm
|
||||
// tpl.GroupId - host - account 三元组校验权限
|
||||
can, err := canDoIbex(ctx, tpl.UpdateBy, tpl, host, targetCache)
|
||||
can, err := canDoIbex(ctx, tpl.UpdateBy, tpl, host, targetCache, userCache)
|
||||
if err != nil {
|
||||
logger.Errorf("event_callback_ibex: check perm fail: %v", err)
|
||||
return
|
||||
@@ -176,12 +176,8 @@ func handleIbex(ctx *ctx.Context, url string, event *models.AlertCurEvent, targe
|
||||
}
|
||||
}
|
||||
|
||||
func canDoIbex(ctx *ctx.Context, username string, tpl *models.TaskTpl, host string, targetCache *memsto.TargetCacheType) (bool, error) {
|
||||
user, err := models.UserGetByUsername(ctx, username)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
func canDoIbex(ctx *ctx.Context, username string, tpl *models.TaskTpl, host string, targetCache *memsto.TargetCacheType, userCache *memsto.UserCacheType) (bool, error) {
|
||||
user := userCache.GetByUsername(username)
|
||||
if user != nil && user.IsAdmin() {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
@@ -35,7 +35,7 @@ func alertingCallScript(stdinBytes []byte, notifyScript models.NotifyScript) {
|
||||
if file.IsExist(fpath) {
|
||||
oldContent, err := file.ToString(fpath)
|
||||
if err != nil {
|
||||
logger.Errorf("event_notify: read script file err: %v", err)
|
||||
logger.Errorf("event_script_notify_fail: read script file err: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -47,13 +47,13 @@ func alertingCallScript(stdinBytes []byte, notifyScript models.NotifyScript) {
|
||||
if rewrite {
|
||||
_, err := file.WriteString(fpath, config.Content)
|
||||
if err != nil {
|
||||
logger.Errorf("event_notify: write script file err: %v", err)
|
||||
logger.Errorf("event_script_notify_fail: write script file err: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
err = os.Chmod(fpath, 0777)
|
||||
if err != nil {
|
||||
logger.Errorf("event_notify: chmod script file err: %v", err)
|
||||
logger.Errorf("event_script_notify_fail: chmod script file err: %v", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
@@ -70,7 +70,7 @@ func alertingCallScript(stdinBytes []byte, notifyScript models.NotifyScript) {
|
||||
|
||||
err := startCmd(cmd)
|
||||
if err != nil {
|
||||
logger.Errorf("event_notify: run cmd err: %v", err)
|
||||
logger.Errorf("event_script_notify_fail: run cmd err: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -78,20 +78,20 @@ func alertingCallScript(stdinBytes []byte, notifyScript models.NotifyScript) {
|
||||
|
||||
if isTimeout {
|
||||
if err == nil {
|
||||
logger.Errorf("event_notify: timeout and killed process %s", fpath)
|
||||
logger.Errorf("event_script_notify_fail: timeout and killed process %s", fpath)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
logger.Errorf("event_notify: kill process %s occur error %v", fpath, err)
|
||||
logger.Errorf("event_script_notify_fail: kill process %s occur error %v", fpath, err)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
logger.Errorf("event_notify: exec script %s occur error: %v, output: %s", fpath, err, buf.String())
|
||||
logger.Errorf("event_script_notify_fail: exec script %s occur error: %v, output: %s", fpath, err, buf.String())
|
||||
return
|
||||
}
|
||||
|
||||
logger.Infof("event_notify: exec %s output: %s", fpath, buf.String())
|
||||
logger.Infof("event_script_notify_ok: exec %s output: %s", fpath, buf.String())
|
||||
}
|
||||
|
||||
@@ -53,7 +53,7 @@ func SendWebhooks(webhooks []*models.Webhook, event *models.AlertCurEvent) {
|
||||
var resp *http.Response
|
||||
resp, err = client.Do(req)
|
||||
if err != nil {
|
||||
logger.Warningf("WebhookCallError, ruleId: [%d], eventId: [%d], url: [%s], error: [%s]", event.RuleId, event.Id, conf.Url, err)
|
||||
logger.Errorf("event_webhook_fail, ruleId: [%d], eventId: [%d], url: [%s], error: [%s]", event.RuleId, event.Id, conf.Url, err)
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -63,6 +63,6 @@ func SendWebhooks(webhooks []*models.Webhook, event *models.AlertCurEvent) {
|
||||
body, _ = ioutil.ReadAll(resp.Body)
|
||||
}
|
||||
|
||||
logger.Debugf("alertingWebhook done, url: %s, response code: %d, body: %s", conf.Url, resp.StatusCode, string(body))
|
||||
logger.Debugf("event_webhook_succ, url: %s, response code: %d, body: %s", conf.Url, resp.StatusCode, string(body))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,18 +1,12 @@
|
||||
package cconf
|
||||
|
||||
import (
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
type Center struct {
|
||||
Plugins []Plugin
|
||||
BasicAuth gin.Accounts
|
||||
MetricsYamlFile string
|
||||
OpsYamlFile string
|
||||
BuiltinIntegrationsDir string
|
||||
I18NHeaderKey string
|
||||
MetricDesc MetricDescType
|
||||
TargetMetrics map[string]string
|
||||
AnonymousAccess AnonymousAccess
|
||||
}
|
||||
|
||||
|
||||
@@ -4,7 +4,6 @@ import (
|
||||
"path"
|
||||
|
||||
"github.com/toolkits/pkg/file"
|
||||
"github.com/toolkits/pkg/runner"
|
||||
)
|
||||
|
||||
// metricDesc , As load map happens before read map, there is no necessary to use concurrent map for metric desc store
|
||||
@@ -33,10 +32,10 @@ func GetMetricDesc(lang, metric string) string {
|
||||
return MetricDesc.CommonDesc[metric]
|
||||
}
|
||||
|
||||
func LoadMetricsYaml(metricsYamlFile string) error {
|
||||
func LoadMetricsYaml(configDir, metricsYamlFile string) error {
|
||||
fp := metricsYamlFile
|
||||
if fp == "" {
|
||||
fp = path.Join(runner.Cwd, "etc", "metrics.yaml")
|
||||
fp = path.Join(configDir, "metrics.yaml")
|
||||
}
|
||||
if !file.IsExist(fp) {
|
||||
return nil
|
||||
|
||||
@@ -4,7 +4,6 @@ import (
|
||||
"path"
|
||||
|
||||
"github.com/toolkits/pkg/file"
|
||||
"github.com/toolkits/pkg/runner"
|
||||
)
|
||||
|
||||
var Operations = Operation{}
|
||||
@@ -19,10 +18,10 @@ type Ops struct {
|
||||
Ops []string `yaml:"ops" json:"ops"`
|
||||
}
|
||||
|
||||
func LoadOpsYaml(opsYamlFile string) error {
|
||||
func LoadOpsYaml(configDir string, opsYamlFile string) error {
|
||||
fp := opsYamlFile
|
||||
if fp == "" {
|
||||
fp = path.Join(runner.Cwd, "etc", "ops.yaml")
|
||||
fp = path.Join(configDir, "ops.yaml")
|
||||
}
|
||||
if !file.IsExist(fp) {
|
||||
return nil
|
||||
|
||||
@@ -33,8 +33,8 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
|
||||
return nil, fmt.Errorf("failed to init config: %v", err)
|
||||
}
|
||||
|
||||
cconf.LoadMetricsYaml(config.Center.MetricsYamlFile)
|
||||
cconf.LoadOpsYaml(config.Center.OpsYamlFile)
|
||||
cconf.LoadMetricsYaml(configDir, config.Center.MetricsYamlFile)
|
||||
cconf.LoadOpsYaml(configDir, config.Center.OpsYamlFile)
|
||||
|
||||
logxClean, err := logx.Init(config.Log)
|
||||
if err != nil {
|
||||
@@ -47,7 +47,7 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ctx := ctx.NewContext(context.Background(), db)
|
||||
ctx := ctx.NewContext(context.Background(), db, true)
|
||||
models.InitRoot(ctx)
|
||||
|
||||
redis, err := storage.NewRedis(config.Redis)
|
||||
@@ -56,7 +56,7 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
|
||||
}
|
||||
|
||||
metas := metas.New(redis)
|
||||
idents := idents.New(db)
|
||||
idents := idents.New(ctx)
|
||||
|
||||
syncStats := memsto.NewSyncStats()
|
||||
alertStats := astats.NewSyncStats()
|
||||
@@ -73,7 +73,7 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
|
||||
promClients := prom.NewPromClient(ctx, config.Alert.Heartbeat)
|
||||
|
||||
externalProcessors := process.NewExternalProcessors()
|
||||
alert.Start(config.Alert, config.Pushgw, syncStats, alertStats, externalProcessors, targetCache, busiGroupCache, alertMuteCache, alertRuleCache, notifyConfigCache, dsCache, ctx, promClients, true)
|
||||
alert.Start(config.Alert, config.Pushgw, syncStats, alertStats, externalProcessors, targetCache, busiGroupCache, alertMuteCache, alertRuleCache, notifyConfigCache, dsCache, ctx, promClients)
|
||||
|
||||
writers := writer.NewWriters(config.Pushgw)
|
||||
|
||||
|
||||
@@ -3,8 +3,6 @@ package router
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"path"
|
||||
"runtime"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
@@ -12,15 +10,17 @@ import (
|
||||
"github.com/ccfos/nightingale/v6/center/cstats"
|
||||
"github.com/ccfos/nightingale/v6/center/metas"
|
||||
"github.com/ccfos/nightingale/v6/center/sso"
|
||||
_ "github.com/ccfos/nightingale/v6/front/statik"
|
||||
"github.com/ccfos/nightingale/v6/memsto"
|
||||
"github.com/ccfos/nightingale/v6/pkg/aop"
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
"github.com/ccfos/nightingale/v6/pkg/httpx"
|
||||
"github.com/ccfos/nightingale/v6/prom"
|
||||
"github.com/ccfos/nightingale/v6/storage"
|
||||
"github.com/toolkits/pkg/runner"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/rakyll/statik/fs"
|
||||
"github.com/toolkits/pkg/logger"
|
||||
)
|
||||
|
||||
type Router struct {
|
||||
@@ -89,38 +89,32 @@ func languageDetector(i18NHeaderKey string) gin.HandlerFunc {
|
||||
}
|
||||
}
|
||||
|
||||
func (rt *Router) configNoRoute(r *gin.Engine) {
|
||||
func (rt *Router) configNoRoute(r *gin.Engine, fs *http.FileSystem) {
|
||||
r.NoRoute(func(c *gin.Context) {
|
||||
arr := strings.Split(c.Request.URL.Path, ".")
|
||||
suffix := arr[len(arr)-1]
|
||||
|
||||
switch suffix {
|
||||
case "png", "jpeg", "jpg", "svg", "ico", "gif", "css", "js", "html", "htm", "gz", "zip", "map":
|
||||
cwdarr := []string{"/"}
|
||||
if runtime.GOOS == "windows" {
|
||||
cwdarr[0] = ""
|
||||
}
|
||||
cwdarr = append(cwdarr, strings.Split(runner.Cwd, "/")...)
|
||||
cwdarr = append(cwdarr, "pub")
|
||||
cwdarr = append(cwdarr, strings.Split(c.Request.URL.Path, "/")...)
|
||||
c.File(path.Join(cwdarr...))
|
||||
c.FileFromFS(c.Request.URL.Path, *fs)
|
||||
default:
|
||||
cwdarr := []string{"/"}
|
||||
if runtime.GOOS == "windows" {
|
||||
cwdarr[0] = ""
|
||||
}
|
||||
cwdarr = append(cwdarr, strings.Split(runner.Cwd, "/")...)
|
||||
cwdarr = append(cwdarr, "pub")
|
||||
cwdarr = append(cwdarr, "index.html")
|
||||
c.File(path.Join(cwdarr...))
|
||||
c.FileFromFS("/", *fs)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func (rt *Router) Config(r *gin.Engine) {
|
||||
|
||||
r.Use(stat())
|
||||
r.Use(languageDetector(rt.Center.I18NHeaderKey))
|
||||
r.Use(aop.Recovery())
|
||||
|
||||
statikFS, err := fs.New()
|
||||
if err != nil {
|
||||
logger.Errorf("cannot create statik fs: %v", err)
|
||||
}
|
||||
r.StaticFS("/pub", statikFS)
|
||||
|
||||
pagesPrefix := "/api/n9e"
|
||||
pages := r.Group(pagesPrefix)
|
||||
{
|
||||
@@ -148,6 +142,7 @@ func (rt *Router) Config(r *gin.Engine) {
|
||||
pages.GET("/auth/callback", rt.loginCallback)
|
||||
pages.GET("/auth/callback/cas", rt.loginCallbackCas)
|
||||
pages.GET("/auth/callback/oauth", rt.loginCallbackOAuth)
|
||||
pages.GET("/auth/perms", rt.allPerms)
|
||||
|
||||
pages.GET("/metrics/desc", rt.metricsDescGetFile)
|
||||
pages.POST("/metrics/desc", rt.metricsDescGetMap)
|
||||
@@ -303,7 +298,7 @@ func (rt *Router) Config(r *gin.Engine) {
|
||||
|
||||
pages.GET("/role/:id/ops", rt.auth(), rt.admin(), rt.operationOfRole)
|
||||
pages.PUT("/role/:id/ops", rt.auth(), rt.admin(), rt.roleBindOperation)
|
||||
pages.GET("operation", rt.operations)
|
||||
pages.GET("/operation", rt.operations)
|
||||
|
||||
pages.GET("/notify-tpls", rt.auth(), rt.admin(), rt.notifyTplGets)
|
||||
pages.PUT("/notify-tpl/content", rt.auth(), rt.admin(), rt.notifyTplUpdateContent)
|
||||
@@ -329,17 +324,20 @@ func (rt *Router) Config(r *gin.Engine) {
|
||||
pages.PUT("/notify-config", rt.auth(), rt.admin(), rt.notifyConfigPut)
|
||||
}
|
||||
|
||||
if rt.HTTP.Service.Enable {
|
||||
if rt.HTTP.APIForService.Enable {
|
||||
service := r.Group("/v1/n9e")
|
||||
if len(rt.HTTP.Service.BasicAuth) > 0 {
|
||||
service.Use(gin.BasicAuth(rt.HTTP.Service.BasicAuth))
|
||||
if len(rt.HTTP.APIForService.BasicAuth) > 0 {
|
||||
service.Use(gin.BasicAuth(rt.HTTP.APIForService.BasicAuth))
|
||||
}
|
||||
{
|
||||
service.Any("/prometheus/*url", rt.dsProxy)
|
||||
service.POST("/users", rt.userAddPost)
|
||||
service.GET("/users", rt.userFindAll)
|
||||
|
||||
service.GET("/targets", rt.targetGets)
|
||||
service.GET("/user-groups", rt.userGroupGetsByService)
|
||||
service.GET("/user-group-members", rt.userGroupMemberGetsByService)
|
||||
|
||||
service.GET("/targets", rt.targetGetsByService)
|
||||
service.GET("/targets/tags", rt.targetGetTags)
|
||||
service.POST("/targets/tags", rt.targetBindTagsByService)
|
||||
service.DELETE("/targets/tags", rt.targetUnbindTagsByService)
|
||||
@@ -351,16 +349,29 @@ func (rt *Router) Config(r *gin.Engine) {
|
||||
service.GET("/alert-rule/:arid", rt.alertRuleGet)
|
||||
service.GET("/alert-rules", rt.alertRulesGetByService)
|
||||
|
||||
service.GET("/alert-subscribes", rt.alertSubscribeGetsByService)
|
||||
|
||||
service.GET("/busi-groups", rt.busiGroupGetsByService)
|
||||
|
||||
service.GET("/datasources", rt.datasourceGetsByService)
|
||||
service.GET("/datasource-ids", rt.getDatasourceIds)
|
||||
service.POST("/server-heartbeat", rt.serverHeartbeat)
|
||||
service.GET("/servers-active", rt.serversActive)
|
||||
|
||||
service.GET("/recording-rules", rt.recordingRuleGetsByService)
|
||||
|
||||
service.GET("/alert-mutes", rt.alertMuteGets)
|
||||
service.POST("/alert-mutes", rt.alertMuteAddByService)
|
||||
service.DELETE("/alert-mutes", rt.alertMuteDel)
|
||||
|
||||
service.GET("/alert-cur-events", rt.alertCurEventsList)
|
||||
service.GET("/alert-cur-events-get-by-rid", rt.alertCurEventsGetByRid)
|
||||
service.GET("/alert-his-events", rt.alertHisEventsList)
|
||||
service.GET("/alert-his-event/:eid", rt.alertHisEventGet)
|
||||
|
||||
service.GET("/config/:id", rt.configGet)
|
||||
service.GET("/configs", rt.configsGet)
|
||||
service.GET("/config", rt.configGetByKey)
|
||||
service.PUT("/configs", rt.configsPut)
|
||||
service.POST("/configs", rt.configsPost)
|
||||
service.DELETE("/configs", rt.configsDel)
|
||||
@@ -368,21 +379,26 @@ func (rt *Router) Config(r *gin.Engine) {
|
||||
service.POST("/conf-prop/encrypt", rt.confPropEncrypt)
|
||||
service.POST("/conf-prop/decrypt", rt.confPropDecrypt)
|
||||
|
||||
service.GET("/datasource-ids", rt.getDatasourceIds)
|
||||
service.GET("/statistic", rt.statistic)
|
||||
|
||||
service.GET("/notify-tpls", rt.notifyTplGets)
|
||||
|
||||
service.POST("/task-record-add", rt.taskRecordAdd)
|
||||
}
|
||||
}
|
||||
|
||||
if rt.HTTP.Heartbeat.Enable {
|
||||
if rt.HTTP.APIForAgent.Enable {
|
||||
heartbeat := r.Group("/v1/n9e")
|
||||
{
|
||||
if len(rt.HTTP.Heartbeat.BasicAuth) > 0 {
|
||||
heartbeat.Use(gin.BasicAuth(rt.HTTP.Heartbeat.BasicAuth))
|
||||
if len(rt.HTTP.APIForAgent.BasicAuth) > 0 {
|
||||
heartbeat.Use(gin.BasicAuth(rt.HTTP.APIForAgent.BasicAuth))
|
||||
}
|
||||
heartbeat.POST("/heartbeat", rt.heartbeat)
|
||||
}
|
||||
}
|
||||
|
||||
rt.configNoRoute(r)
|
||||
rt.configNoRoute(r, &statikFS)
|
||||
|
||||
}
|
||||
|
||||
func Render(c *gin.Context, data, msg interface{}) {
|
||||
|
||||
@@ -128,6 +128,13 @@ func (rt *Router) alertCurEventsCardDetails(c *gin.Context) {
|
||||
ginx.NewRender(c).Data(list, err)
|
||||
}
|
||||
|
||||
// alertCurEventsGetByRid
|
||||
func (rt *Router) alertCurEventsGetByRid(c *gin.Context) {
|
||||
rid := ginx.QueryInt64(c, "rid")
|
||||
dsId := ginx.QueryInt64(c, "dsid")
|
||||
ginx.NewRender(c).Data(models.AlertCurEventGetByRuleIdAndDsId(rt.Ctx, rid, dsId))
|
||||
}
|
||||
|
||||
// 列表方式,拉取活跃告警
|
||||
func (rt *Router) alertCurEventsList(c *gin.Context) {
|
||||
stime, etime := getTimeRange(c)
|
||||
|
||||
@@ -27,7 +27,12 @@ func (rt *Router) alertRuleGets(c *gin.Context) {
|
||||
}
|
||||
|
||||
func (rt *Router) alertRulesGetByService(c *gin.Context) {
|
||||
prods := strings.Split(ginx.QueryStr(c, "prods", ""), ",")
|
||||
prods := []string{}
|
||||
prodStr := ginx.QueryStr(c, "prods", "")
|
||||
if prodStr != "" {
|
||||
prods = strings.Split(ginx.QueryStr(c, "prods", ""), ",")
|
||||
}
|
||||
|
||||
query := ginx.QueryStr(c, "query", "")
|
||||
algorithm := ginx.QueryStr(c, "algorithm", "")
|
||||
cluster := ginx.QueryStr(c, "cluster", "")
|
||||
|
||||
@@ -110,3 +110,8 @@ func (rt *Router) alertSubscribeDel(c *gin.Context) {
|
||||
|
||||
ginx.NewRender(c).Message(models.AlertSubscribeDel(rt.Ctx, f.Ids))
|
||||
}
|
||||
|
||||
func (rt *Router) alertSubscribeGetsByService(c *gin.Context) {
|
||||
lst, err := models.AlertSubscribeGetsByService(rt.Ctx)
|
||||
ginx.NewRender(c).Data(lst, err)
|
||||
}
|
||||
|
||||
@@ -123,6 +123,11 @@ func (rt *Router) busiGroupGets(c *gin.Context) {
|
||||
ginx.NewRender(c).Data(lst, err)
|
||||
}
|
||||
|
||||
func (rt *Router) busiGroupGetsByService(c *gin.Context) {
|
||||
lst, err := models.BusiGroupGetAll(rt.Ctx)
|
||||
ginx.NewRender(c).Data(lst, err)
|
||||
}
|
||||
|
||||
// 这个接口只有在活跃告警页面才调用,获取各个BG的活跃告警数量
|
||||
func (rt *Router) busiGroupAlertingsGets(c *gin.Context) {
|
||||
ids := ginx.QueryStr(c, "ids", "")
|
||||
|
||||
@@ -20,6 +20,11 @@ func (rt *Router) configGet(c *gin.Context) {
|
||||
ginx.NewRender(c).Data(configs, err)
|
||||
}
|
||||
|
||||
func (rt *Router) configGetByKey(c *gin.Context) {
|
||||
config, err := models.ConfigsGet(rt.Ctx, ginx.QueryStr(c, "key"))
|
||||
ginx.NewRender(c).Data(config, err)
|
||||
}
|
||||
|
||||
func (rt *Router) configsDel(c *gin.Context) {
|
||||
var f idsForm
|
||||
ginx.BindJSON(c, &f)
|
||||
|
||||
@@ -36,6 +36,12 @@ func (rt *Router) datasourceList(c *gin.Context) {
|
||||
Render(c, list, err)
|
||||
}
|
||||
|
||||
func (rt *Router) datasourceGetsByService(c *gin.Context) {
|
||||
typ := ginx.QueryStr(c, "typ", "")
|
||||
lst, err := models.GetDatasourcesGetsBy(rt.Ctx, typ, "", "", "")
|
||||
ginx.NewRender(c).Data(lst, err)
|
||||
}
|
||||
|
||||
type datasourceBrief struct {
|
||||
Id int64 `json:"id"`
|
||||
Name string `json:"name"`
|
||||
|
||||
@@ -17,6 +17,41 @@ import (
|
||||
|
||||
const defaultLimit = 300
|
||||
|
||||
func (rt *Router) statistic(c *gin.Context) {
|
||||
name := ginx.QueryStr(c, "name")
|
||||
var model interface{}
|
||||
var err error
|
||||
var statistics *models.Statistics
|
||||
switch name {
|
||||
case "alert_mute":
|
||||
model = models.AlertMute{}
|
||||
case "alert_rule":
|
||||
model = models.AlertRule{}
|
||||
case "alert_subscribe":
|
||||
model = models.AlertSubscribe{}
|
||||
case "busi_group":
|
||||
model = models.BusiGroup{}
|
||||
case "recording_rule":
|
||||
model = models.RecordingRule{}
|
||||
case "target":
|
||||
model = models.Target{}
|
||||
case "user":
|
||||
model = models.User{}
|
||||
case "user_group":
|
||||
model = models.UserGroup{}
|
||||
case "datasource":
|
||||
// datasource update_at is different from others
|
||||
statistics, err = models.DatasourceStatistics(rt.Ctx)
|
||||
ginx.NewRender(c).Data(statistics, err)
|
||||
return
|
||||
default:
|
||||
ginx.Bomb(http.StatusBadRequest, "invalid name")
|
||||
}
|
||||
|
||||
statistics, err = models.StatisticsGet(rt.Ctx, model)
|
||||
ginx.NewRender(c).Data(statistics, err)
|
||||
}
|
||||
|
||||
func queryDatasourceIds(c *gin.Context) []int64 {
|
||||
datasourceIds := ginx.QueryStr(c, "datasource_ids", "")
|
||||
datasourceIds = strings.ReplaceAll(datasourceIds, ",", " ")
|
||||
|
||||
@@ -21,7 +21,7 @@ func (rt *Router) alertMuteGetsByBG(c *gin.Context) {
|
||||
|
||||
func (rt *Router) alertMuteGets(c *gin.Context) {
|
||||
prods := strings.Fields(ginx.QueryStr(c, "prods", ""))
|
||||
bgid := ginx.QueryInt64(c, "bgid", 0)
|
||||
bgid := ginx.QueryInt64(c, "bgid", -1)
|
||||
query := ginx.QueryStr(c, "query", "")
|
||||
lst, err := models.AlertMuteGets(rt.Ctx, prods, bgid, query)
|
||||
|
||||
|
||||
@@ -30,9 +30,12 @@ func (rt *Router) webhookPuts(c *gin.Context) {
|
||||
var webhooks []models.Webhook
|
||||
ginx.BindJSON(c, &webhooks)
|
||||
for i := 0; i < len(webhooks); i++ {
|
||||
for k, v := range webhooks[i].HeaderMap {
|
||||
webhooks[i].Headers = append(webhooks[i].Headers, k)
|
||||
webhooks[i].Headers = append(webhooks[i].Headers, v)
|
||||
webhooks[i].Headers = []string{}
|
||||
if len(webhooks[i].HeaderMap) > 0 {
|
||||
for k, v := range webhooks[i].HeaderMap {
|
||||
webhooks[i].Headers = append(webhooks[i].Headers, k)
|
||||
webhooks[i].Headers = append(webhooks[i].Headers, v)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -19,6 +19,11 @@ func (rt *Router) recordingRuleGets(c *gin.Context) {
|
||||
ginx.NewRender(c).Data(ars, err)
|
||||
}
|
||||
|
||||
func (rt *Router) recordingRuleGetsByService(c *gin.Context) {
|
||||
ars, err := models.RecordingRuleEnabledGets(rt.Ctx)
|
||||
ginx.NewRender(c).Data(ars, err)
|
||||
}
|
||||
|
||||
func (rt *Router) recordingRuleGet(c *gin.Context) {
|
||||
rrid := ginx.UrlParamInt64(c, "rrid")
|
||||
|
||||
|
||||
@@ -83,3 +83,18 @@ func (rt *Router) roleGets(c *gin.Context) {
|
||||
lst, err := models.RoleGetsAll(rt.Ctx)
|
||||
ginx.NewRender(c).Data(lst, err)
|
||||
}
|
||||
|
||||
func (rt *Router) allPerms(c *gin.Context) {
|
||||
roles, err := models.RoleGetsAll(rt.Ctx)
|
||||
ginx.Dangerous(err)
|
||||
m := make(map[string][]string)
|
||||
for _, r := range roles {
|
||||
lst, err := models.OperationsOfRole(rt.Ctx, strings.Fields(r.Name))
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
m[r.Name] = lst
|
||||
}
|
||||
|
||||
ginx.NewRender(c).Data(m, err)
|
||||
}
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
package router
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/models"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
@@ -16,3 +18,17 @@ func (rt *Router) serverClustersGet(c *gin.Context) {
|
||||
list, err := models.AlertingEngineGetsClusters(rt.Ctx, "")
|
||||
ginx.NewRender(c).Data(list, err)
|
||||
}
|
||||
|
||||
func (rt *Router) serverHeartbeat(c *gin.Context) {
|
||||
var req models.HeartbeatInfo
|
||||
ginx.BindJSON(c, &req)
|
||||
err := models.AlertingEngineHeartbeatWithCluster(rt.Ctx, req.Instance, req.EngineCluster, req.DatasourceId)
|
||||
ginx.NewRender(c).Message(err)
|
||||
}
|
||||
|
||||
func (rt *Router) serversActive(c *gin.Context) {
|
||||
datasourceId := ginx.QueryInt64(c, "dsid")
|
||||
|
||||
servers, err := models.AlertingEngineGetsInstances(rt.Ctx, "datasource_id = ? and clock > ?", datasourceId, time.Now().Unix()-30)
|
||||
ginx.NewRender(c).Data(servers, err)
|
||||
}
|
||||
|
||||
@@ -101,6 +101,11 @@ func (rt *Router) targetGets(c *gin.Context) {
|
||||
}, nil)
|
||||
}
|
||||
|
||||
func (rt *Router) targetGetsByService(c *gin.Context) {
|
||||
lst, err := models.TargetGetsAll(rt.Ctx)
|
||||
ginx.NewRender(c).Data(lst, err)
|
||||
}
|
||||
|
||||
func (rt *Router) targetGetTags(c *gin.Context) {
|
||||
idents := ginx.QueryStr(c, "idents", "")
|
||||
idents = strings.ReplaceAll(idents, ",", " ")
|
||||
|
||||
@@ -120,6 +120,12 @@ func (f *taskForm) HandleFH(fh string) {
|
||||
f.Title = f.Title + " FH: " + fh
|
||||
}
|
||||
|
||||
func (rt *Router) taskRecordAdd(c *gin.Context) {
|
||||
var f *models.TaskRecord
|
||||
ginx.BindJSON(c, &f)
|
||||
ginx.NewRender(c).Message(f.Add(rt.Ctx))
|
||||
}
|
||||
|
||||
func (rt *Router) taskAdd(c *gin.Context) {
|
||||
var f taskForm
|
||||
ginx.BindJSON(c, &f)
|
||||
|
||||
@@ -12,19 +12,8 @@ import (
|
||||
)
|
||||
|
||||
func (rt *Router) userFindAll(c *gin.Context) {
|
||||
limit := ginx.QueryInt(c, "limit", 20)
|
||||
query := ginx.QueryStr(c, "query", "")
|
||||
|
||||
total, err := models.UserTotal(rt.Ctx, query)
|
||||
ginx.Dangerous(err)
|
||||
|
||||
list, err := models.UserGets(rt.Ctx, query, limit, ginx.Offset(c, limit))
|
||||
ginx.Dangerous(err)
|
||||
|
||||
ginx.NewRender(c).Data(gin.H{
|
||||
"list": list,
|
||||
"total": total,
|
||||
}, nil)
|
||||
list, err := models.UserGetAll(rt.Ctx)
|
||||
ginx.NewRender(c).Data(list, err)
|
||||
}
|
||||
|
||||
func (rt *Router) userGets(c *gin.Context) {
|
||||
|
||||
@@ -29,6 +29,17 @@ func (rt *Router) userGroupGets(c *gin.Context) {
|
||||
ginx.NewRender(c).Data(lst, err)
|
||||
}
|
||||
|
||||
func (rt *Router) userGroupGetsByService(c *gin.Context) {
|
||||
lst, err := models.UserGroupGetAll(rt.Ctx)
|
||||
ginx.NewRender(c).Data(lst, err)
|
||||
}
|
||||
|
||||
// user group member get by service
|
||||
func (rt *Router) userGroupMemberGetsByService(c *gin.Context) {
|
||||
members, err := models.UserGroupMemberGetAll(rt.Ctx)
|
||||
ginx.NewRender(c).Data(members, err)
|
||||
}
|
||||
|
||||
type userGroupForm struct {
|
||||
Name string `json:"name" binding:"required"`
|
||||
Note string `json:"note"`
|
||||
|
||||
@@ -18,7 +18,7 @@ func Upgrade(configFile string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
ctx := ctx.NewContext(context.Background(), db)
|
||||
ctx := ctx.NewContext(context.Background(), db, false)
|
||||
for _, cluster := range config.Clusters {
|
||||
count, err := models.GetDatasourcesCountBy(ctx, "", "", cluster.Name)
|
||||
if err != nil {
|
||||
|
||||
18
conf/conf.go
18
conf/conf.go
@@ -14,20 +14,28 @@ import (
|
||||
"github.com/ccfos/nightingale/v6/pkg/ormx"
|
||||
"github.com/ccfos/nightingale/v6/pushgw/pconf"
|
||||
"github.com/ccfos/nightingale/v6/storage"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
type ConfigType struct {
|
||||
Global GlobalConfig
|
||||
Log logx.Config
|
||||
HTTP httpx.Config
|
||||
DB ormx.DBConfig
|
||||
Redis storage.RedisConfig
|
||||
Global GlobalConfig
|
||||
Log logx.Config
|
||||
HTTP httpx.Config
|
||||
DB ormx.DBConfig
|
||||
Redis storage.RedisConfig
|
||||
CenterApi CenterApi
|
||||
|
||||
Pushgw pconf.Pushgw
|
||||
Alert aconf.Alert
|
||||
Center cconf.Center
|
||||
}
|
||||
|
||||
type CenterApi struct {
|
||||
Addrs []string
|
||||
BasicAuth gin.Accounts
|
||||
}
|
||||
|
||||
type GlobalConfig struct {
|
||||
RunMode string
|
||||
}
|
||||
|
||||
@@ -14,39 +14,22 @@ func decryptConfig(config *ConfigType, cryptoKey string) error {
|
||||
|
||||
config.DB.DSN = decryptDsn
|
||||
|
||||
for k := range config.HTTP.Alert.BasicAuth {
|
||||
decryptPwd, err := secu.DealWithDecrypt(config.HTTP.Alert.BasicAuth[k], cryptoKey)
|
||||
for k := range config.HTTP.APIForService.BasicAuth {
|
||||
decryptPwd, err := secu.DealWithDecrypt(config.HTTP.APIForService.BasicAuth[k], cryptoKey)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to decrypt http basic auth password: %s", err)
|
||||
}
|
||||
|
||||
config.HTTP.Alert.BasicAuth[k] = decryptPwd
|
||||
config.HTTP.APIForService.BasicAuth[k] = decryptPwd
|
||||
}
|
||||
|
||||
for k := range config.HTTP.Pushgw.BasicAuth {
|
||||
decryptPwd, err := secu.DealWithDecrypt(config.HTTP.Pushgw.BasicAuth[k], cryptoKey)
|
||||
for k := range config.HTTP.APIForAgent.BasicAuth {
|
||||
decryptPwd, err := secu.DealWithDecrypt(config.HTTP.APIForAgent.BasicAuth[k], cryptoKey)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to decrypt http basic auth password: %s", err)
|
||||
}
|
||||
|
||||
config.HTTP.Pushgw.BasicAuth[k] = decryptPwd
|
||||
}
|
||||
|
||||
for k := range config.HTTP.Heartbeat.BasicAuth {
|
||||
decryptPwd, err := secu.DealWithDecrypt(config.HTTP.Heartbeat.BasicAuth[k], cryptoKey)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to decrypt http basic auth password: %s", err)
|
||||
}
|
||||
|
||||
config.HTTP.Heartbeat.BasicAuth[k] = decryptPwd
|
||||
}
|
||||
|
||||
for k := range config.HTTP.Service.BasicAuth {
|
||||
decryptPwd, err := secu.DealWithDecrypt(config.HTTP.Service.BasicAuth[k], cryptoKey)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to decrypt http basic auth password: %s", err)
|
||||
}
|
||||
config.HTTP.Service.BasicAuth[k] = decryptPwd
|
||||
config.HTTP.APIForAgent.BasicAuth[k] = decryptPwd
|
||||
}
|
||||
|
||||
for i, v := range config.Pushgw.Writers {
|
||||
|
||||
@@ -4,8 +4,7 @@ FROM python:3-slim
|
||||
WORKDIR /app
|
||||
ADD n9e /app
|
||||
ADD http://download.flashcat.cloud/wait /wait
|
||||
RUN mkdir -p /app/pub && chmod +x /wait
|
||||
ADD pub /app/pub/
|
||||
RUN chmod +x /wait
|
||||
RUN chmod +x n9e
|
||||
|
||||
EXPOSE 17000
|
||||
|
||||
@@ -7,7 +7,6 @@ ADD etc /app/
|
||||
ADD integrations /app/integrations/
|
||||
ADD --chmod=755 https://github.com/ufoscout/docker-compose-wait/releases/download/2.11.0/wait_x86_64 /wait
|
||||
RUN chmod +x /wait
|
||||
ADD pub /app/pub/
|
||||
|
||||
EXPOSE 17000
|
||||
|
||||
|
||||
@@ -6,7 +6,6 @@ WORKDIR /app
|
||||
ADD n9e /app/
|
||||
ADD etc /app/
|
||||
ADD integrations /app/integrations/
|
||||
ADD pub /app/pub/
|
||||
COPY --chmod=755 --from=toolbox /toolbox/wait_aarch64 /wait
|
||||
|
||||
EXPOSE 17000
|
||||
|
||||
@@ -10,7 +10,6 @@ echo "tag: ${tag}"
|
||||
|
||||
rm -rf n9e pub
|
||||
cp ../n9e .
|
||||
cp -r ../pub .
|
||||
|
||||
docker build -t nightingale:${tag} .
|
||||
|
||||
|
||||
60
etc/alert.toml.example
Normal file
60
etc/alert.toml.example
Normal file
@@ -0,0 +1,60 @@
|
||||
[Global]
|
||||
RunMode = "release"
|
||||
|
||||
[CenterApi]
|
||||
Addrs = ["http://127.0.0.1:17000"]
|
||||
[CenterApi.BasicAuth]
|
||||
user001 = "ccc26da7b9aba533cbb263a36c07dcc5"
|
||||
|
||||
[Alert]
|
||||
[Alert.Heartbeat]
|
||||
# auto detect if blank
|
||||
IP = ""
|
||||
# unit ms
|
||||
Interval = 1000
|
||||
EngineName = "default02"
|
||||
|
||||
[Log]
|
||||
# log write dir
|
||||
Dir = "logs"
|
||||
# log level: DEBUG INFO WARNING ERROR
|
||||
Level = "DEBUG"
|
||||
# stdout, stderr, file
|
||||
Output = "stdout"
|
||||
# # rotate by time
|
||||
# KeepHours = 4
|
||||
# # rotate by size
|
||||
# RotateNum = 3
|
||||
# # unit: MB
|
||||
# RotateSize = 256
|
||||
|
||||
[HTTP]
|
||||
# http listening address
|
||||
Host = "0.0.0.0"
|
||||
# http listening port
|
||||
Port = 17001
|
||||
# https cert file path
|
||||
CertFile = ""
|
||||
# https key file path
|
||||
KeyFile = ""
|
||||
# whether print access log
|
||||
PrintAccessLog = false
|
||||
# whether enable pprof
|
||||
PProf = false
|
||||
# expose prometheus /metrics?
|
||||
ExposeMetrics = true
|
||||
# http graceful shutdown timeout, unit: s
|
||||
ShutdownTimeout = 30
|
||||
# max content length: 64M
|
||||
MaxContentLength = 67108864
|
||||
# http server read timeout, unit: s
|
||||
ReadTimeout = 20
|
||||
# http server write timeout, unit: s
|
||||
WriteTimeout = 40
|
||||
# http server idle timeout, unit: s
|
||||
IdleTimeout = 120
|
||||
|
||||
[HTTP.Alert]
|
||||
Enable = true
|
||||
[HTTP.Alert.BasicAuth]
|
||||
user001 = "ccc26da7b9aba533cbb263a36c07dcc5"
|
||||
@@ -41,22 +41,12 @@ WriteTimeout = 40
|
||||
# http server idle timeout, unit: s
|
||||
IdleTimeout = 120
|
||||
|
||||
[HTTP.Pushgw]
|
||||
[HTTP.APIForAgent]
|
||||
Enable = true
|
||||
# [HTTP.Pushgw.BasicAuth]
|
||||
# user001 = "ccc26da7b9aba533cbb263a36c07dcc5"
|
||||
|
||||
[HTTP.Alert]
|
||||
Enable = true
|
||||
[HTTP.Alert.BasicAuth]
|
||||
user001 = "ccc26da7b9aba533cbb263a36c07dcc5"
|
||||
|
||||
[HTTP.Heartbeat]
|
||||
Enable = true
|
||||
# [HTTP.Heartbeat.BasicAuth]
|
||||
# user001 = "ccc26da7b9aba533cbb263a36c07dcc5"
|
||||
|
||||
[HTTP.Service]
|
||||
[HTTP.APIForService]
|
||||
Enable = true
|
||||
[HTTP.Service.BasicAuth]
|
||||
user001 = "ccc26da7b9aba533cbb263a36c07dcc5"
|
||||
|
||||
103
etc/pushgw.toml.example
Normal file
103
etc/pushgw.toml.example
Normal file
@@ -0,0 +1,103 @@
|
||||
[Global]
|
||||
RunMode = "release"
|
||||
|
||||
[CenterApi]
|
||||
Addrs = ["http://127.0.0.1:17000"]
|
||||
[CenterApi.BasicAuth]
|
||||
user001 = "ccc26da7b9aba533cbb263a36c07dcc5"
|
||||
|
||||
[Pushgw]
|
||||
# use target labels in database instead of in series
|
||||
LabelRewrite = true
|
||||
# # default busigroup key name
|
||||
# BusiGroupLabelKey = "busigroup"
|
||||
# ForceUseServerTS = false
|
||||
|
||||
# [Pushgw.DebugSample]
|
||||
# ident = "xx"
|
||||
# __name__ = "xx"
|
||||
|
||||
# [Pushgw.WriterOpt]
|
||||
# # Writer Options
|
||||
# QueueCount = 1000
|
||||
# QueueMaxSize = 1000000
|
||||
# QueuePopSize = 1000
|
||||
# # ident or metric
|
||||
# ShardingKey = "ident"
|
||||
|
||||
[[Pushgw.Writers]]
|
||||
# Url = "http://127.0.0.1:8480/insert/0/prometheus/api/v1/write"
|
||||
Url = "http://127.0.0.1:9090/api/v1/write"
|
||||
# Basic auth username
|
||||
BasicAuthUser = ""
|
||||
# Basic auth password
|
||||
BasicAuthPass = ""
|
||||
# timeout settings, unit: ms
|
||||
Headers = ["X-From", "n9e"]
|
||||
Timeout = 10000
|
||||
DialTimeout = 3000
|
||||
TLSHandshakeTimeout = 30000
|
||||
ExpectContinueTimeout = 1000
|
||||
IdleConnTimeout = 90000
|
||||
# time duration, unit: ms
|
||||
KeepAlive = 30000
|
||||
MaxConnsPerHost = 0
|
||||
MaxIdleConns = 100
|
||||
MaxIdleConnsPerHost = 100
|
||||
## Optional TLS Config
|
||||
# UseTLS = false
|
||||
# TLSCA = "/etc/n9e/ca.pem"
|
||||
# TLSCert = "/etc/n9e/cert.pem"
|
||||
# TLSKey = "/etc/n9e/key.pem"
|
||||
# InsecureSkipVerify = false
|
||||
# [[Writers.WriteRelabels]]
|
||||
# Action = "replace"
|
||||
# SourceLabels = ["__address__"]
|
||||
# Regex = "([^:]+)(?::\\d+)?"
|
||||
# Replacement = "$1:80"
|
||||
# TargetLabel = "__address__"
|
||||
|
||||
[Log]
|
||||
# log write dir
|
||||
Dir = "logs"
|
||||
# log level: DEBUG INFO WARNING ERROR
|
||||
Level = "DEBUG"
|
||||
# stdout, stderr, file
|
||||
Output = "stdout"
|
||||
# # rotate by time
|
||||
# KeepHours = 4
|
||||
# # rotate by size
|
||||
# RotateNum = 3
|
||||
# # unit: MB
|
||||
# RotateSize = 256
|
||||
|
||||
[HTTP]
|
||||
# http listening address
|
||||
Host = "0.0.0.0"
|
||||
# http listening port
|
||||
Port = 17000
|
||||
# https cert file path
|
||||
CertFile = ""
|
||||
# https key file path
|
||||
KeyFile = ""
|
||||
# whether print access log
|
||||
PrintAccessLog = false
|
||||
# whether enable pprof
|
||||
PProf = false
|
||||
# expose prometheus /metrics?
|
||||
ExposeMetrics = true
|
||||
# http graceful shutdown timeout, unit: s
|
||||
ShutdownTimeout = 30
|
||||
# max content length: 64M
|
||||
MaxContentLength = 67108864
|
||||
# http server read timeout, unit: s
|
||||
ReadTimeout = 20
|
||||
# http server write timeout, unit: s
|
||||
WriteTimeout = 40
|
||||
# http server idle timeout, unit: s
|
||||
IdleTimeout = 120
|
||||
|
||||
[HTTP.Pushgw]
|
||||
Enable = true
|
||||
# [HTTP.Pushgw.BasicAuth]
|
||||
# user001 = "ccc26da7b9aba533cbb263a36c07dcc5"
|
||||
7
fe.sh
7
fe.sh
@@ -8,3 +8,10 @@ curl -o n9e-fe-${VERSION}.tar.gz -L https://github.com/n9e/fe/releases/download/
|
||||
tar zxvf n9e-fe-${VERSION}.tar.gz
|
||||
|
||||
cp ./docker/initsql/a-n9e.sql n9e.sql
|
||||
|
||||
# Embed files into a Go executable
|
||||
statik -src=./pub -dest=./front
|
||||
|
||||
# rm the fe file
|
||||
rm n9e-fe-${VERSION}.tar.gz
|
||||
rm -r ./pub
|
||||
14
front/statik/statik.go
Normal file
14
front/statik/statik.go
Normal file
@@ -0,0 +1,14 @@
|
||||
// Code generated by statik. DO NOT EDIT.
|
||||
|
||||
package statik
|
||||
|
||||
import (
|
||||
"github.com/rakyll/statik/fs"
|
||||
)
|
||||
|
||||
|
||||
func init() {
|
||||
data := "PK\x05\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
|
||||
fs.Register(data)
|
||||
}
|
||||
|
||||
1
go.mod
1
go.mod
@@ -23,6 +23,7 @@ require (
|
||||
github.com/prometheus/client_golang v1.14.0
|
||||
github.com/prometheus/common v0.39.0
|
||||
github.com/prometheus/prometheus v2.5.0+incompatible
|
||||
github.com/rakyll/statik v0.1.7
|
||||
github.com/redis/go-redis/v9 v9.0.2
|
||||
github.com/tidwall/gjson v1.14.0
|
||||
github.com/toolkits/pkg v1.3.3
|
||||
|
||||
2
go.sum
2
go.sum
@@ -232,6 +232,8 @@ github.com/prometheus/procfs v0.8.0 h1:ODq8ZFEaYeCaZOJlZZdJA2AbQR98dSHSM1KW/You5
|
||||
github.com/prometheus/procfs v0.8.0/go.mod h1:z7EfXMXOkbkqb9IINtpCn86r/to3BnA0uaxHdg830/4=
|
||||
github.com/prometheus/prometheus v2.5.0+incompatible h1:7QPitgO2kOFG8ecuRn9O/4L9+10He72rVRJvMXrE9Hg=
|
||||
github.com/prometheus/prometheus v2.5.0+incompatible/go.mod h1:oAIUtOny2rjMX0OWN5vPR5/q/twIROJvdqnQKDdil/s=
|
||||
github.com/rakyll/statik v0.1.7 h1:OF3QCZUuyPxuGEP7B4ypUa7sB/iHtqOTDYZXGM8KOdQ=
|
||||
github.com/rakyll/statik v0.1.7/go.mod h1:AlZONWzMtEnMs7W4e/1LURLiI49pIMmp6V9Unghqrcc=
|
||||
github.com/redis/go-redis/v9 v9.0.2 h1:BA426Zqe/7r56kCcvxYLWe1mkaz71LKF77GwgFzSxfE=
|
||||
github.com/redis/go-redis/v9 v9.0.2/go.mod h1:/xDTe9EF1LM61hek62Poq2nzQSGj0xSrEtEHbBQevps=
|
||||
github.com/robfig/go-cache v0.0.0-20130306151617-9fc39e0dbf62/go.mod h1:65XQgovT59RWatovFwnwocoUxiI/eENTnOY5GK3STuY=
|
||||
|
||||
@@ -144,14 +144,17 @@ func (tc *TargetCacheType) syncTargets() error {
|
||||
return errors.WithMessage(err, "failed to call TargetGetsAll")
|
||||
}
|
||||
|
||||
metaMap := tc.GetHostMetas(lst)
|
||||
|
||||
m := make(map[string]*models.Target)
|
||||
for i := 0; i < len(lst); i++ {
|
||||
lst[i].FillTagsMap()
|
||||
if meta, ok := metaMap[lst[i].Ident]; ok {
|
||||
lst[i].FillMeta(meta)
|
||||
if tc.ctx.IsCenter {
|
||||
metaMap := tc.GetHostMetas(lst)
|
||||
for i := 0; i < len(lst); i++ {
|
||||
if meta, ok := metaMap[lst[i].Ident]; ok {
|
||||
lst[i].FillMeta(meta)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for i := 0; i < len(lst); i++ {
|
||||
m[lst[i].Ident] = lst[i]
|
||||
}
|
||||
|
||||
|
||||
@@ -58,6 +58,17 @@ func (uc *UserCacheType) GetByUserId(id int64) *models.User {
|
||||
return uc.users[id]
|
||||
}
|
||||
|
||||
func (uc *UserCacheType) GetByUsername(name string) *models.User {
|
||||
uc.RLock()
|
||||
defer uc.RUnlock()
|
||||
for _, v := range uc.users {
|
||||
if v.Username == name {
|
||||
return v
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (uc *UserCacheType) GetByUserIds(ids []int64) []*models.User {
|
||||
set := make(map[int64]struct{})
|
||||
|
||||
|
||||
@@ -10,6 +10,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
"github.com/ccfos/nightingale/v6/pkg/poster"
|
||||
"github.com/ccfos/nightingale/v6/pkg/tplx"
|
||||
"github.com/toolkits/pkg/logger"
|
||||
)
|
||||
@@ -220,7 +221,7 @@ func (e *AlertCurEvent) ToHis(ctx *ctx.Context) *AlertHisEvent {
|
||||
}
|
||||
}
|
||||
|
||||
func (e *AlertCurEvent) DB2FE(ctx *ctx.Context) {
|
||||
func (e *AlertCurEvent) DB2FE() {
|
||||
e.NotifyChannelsJSON = strings.Fields(e.NotifyChannels)
|
||||
e.NotifyGroupsJSON = strings.Fields(e.NotifyGroups)
|
||||
e.CallbacksJSON = strings.Fields(e.Callbacks)
|
||||
@@ -229,6 +230,18 @@ func (e *AlertCurEvent) DB2FE(ctx *ctx.Context) {
|
||||
json.Unmarshal([]byte(e.RuleConfig), &e.RuleConfigJson)
|
||||
}
|
||||
|
||||
func (e *AlertCurEvent) FE2DB() {
|
||||
e.NotifyChannels = strings.Join(e.NotifyChannelsJSON, " ")
|
||||
e.NotifyGroups = strings.Join(e.NotifyGroupsJSON, " ")
|
||||
e.Callbacks = strings.Join(e.CallbacksJSON, " ")
|
||||
e.Tags = strings.Join(e.TagsJSON, ",,")
|
||||
b, _ := json.Marshal(e.AnnotationsJSON)
|
||||
e.Annotations = string(b)
|
||||
|
||||
b, _ = json.Marshal(e.RuleConfigJson)
|
||||
e.RuleConfig = string(b)
|
||||
}
|
||||
|
||||
func (e *AlertCurEvent) DB2Mem() {
|
||||
e.IsRecovered = false
|
||||
e.NotifyGroupsJSON = strings.Fields(e.NotifyGroups)
|
||||
@@ -356,7 +369,7 @@ func AlertCurEventGets(ctx *ctx.Context, prods []string, bgid, stime, etime int6
|
||||
|
||||
if err == nil {
|
||||
for i := 0; i < len(lst); i++ {
|
||||
lst[i].DB2FE(ctx)
|
||||
lst[i].DB2FE()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -390,7 +403,7 @@ func AlertCurEventGet(ctx *ctx.Context, where string, args ...interface{}) (*Ale
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
lst[0].DB2FE(ctx)
|
||||
lst[0].DB2FE()
|
||||
lst[0].FillNotifyGroups(ctx, make(map[int64]*UserGroup))
|
||||
|
||||
return lst[0], nil
|
||||
@@ -435,14 +448,19 @@ func AlertCurEventGetByIds(ctx *ctx.Context, ids []int64) ([]*AlertCurEvent, err
|
||||
err := DB(ctx).Where("id in ?", ids).Order("id desc").Find(&lst).Error
|
||||
if err == nil {
|
||||
for i := 0; i < len(lst); i++ {
|
||||
lst[i].DB2FE(ctx)
|
||||
lst[i].DB2FE()
|
||||
}
|
||||
}
|
||||
|
||||
return lst, err
|
||||
}
|
||||
|
||||
func AlertCurEventGetByRuleIdAndCluster(ctx *ctx.Context, ruleId int64, datasourceId int64) ([]*AlertCurEvent, error) {
|
||||
func AlertCurEventGetByRuleIdAndDsId(ctx *ctx.Context, ruleId int64, datasourceId int64) ([]*AlertCurEvent, error) {
|
||||
if !ctx.IsCenter {
|
||||
lst, err := poster.GetByUrls[[]*AlertCurEvent](ctx, "/v1/n9e/alert-cur-events-get-by-rid?rid="+strconv.FormatInt(ruleId, 10)+"&dsid="+strconv.FormatInt(datasourceId, 10))
|
||||
return lst, err
|
||||
}
|
||||
|
||||
var lst []*AlertCurEvent
|
||||
err := DB(ctx).Where("rule_id=? and datasource_id = ?", ruleId, datasourceId).Find(&lst).Error
|
||||
return lst, err
|
||||
|
||||
@@ -2,6 +2,7 @@ package models
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
@@ -61,7 +62,7 @@ func (e *AlertHisEvent) Add(ctx *ctx.Context) error {
|
||||
return Insert(ctx, e)
|
||||
}
|
||||
|
||||
func (e *AlertHisEvent) DB2FE(ctx *ctx.Context) {
|
||||
func (e *AlertHisEvent) DB2FE() {
|
||||
e.NotifyChannelsJSON = strings.Fields(e.NotifyChannels)
|
||||
e.NotifyGroupsJSON = strings.Fields(e.NotifyGroups)
|
||||
e.CallbacksJSON = strings.Fields(e.Callbacks)
|
||||
@@ -182,7 +183,7 @@ func AlertHisEventGets(ctx *ctx.Context, prods []string, bgid, stime, etime int6
|
||||
|
||||
if err == nil {
|
||||
for i := 0; i < len(lst); i++ {
|
||||
lst[i].DB2FE(ctx)
|
||||
lst[i].DB2FE()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -200,7 +201,7 @@ func AlertHisEventGet(ctx *ctx.Context, where string, args ...interface{}) (*Ale
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
lst[0].DB2FE(ctx)
|
||||
lst[0].DB2FE()
|
||||
lst[0].FillNotifyGroups(ctx, make(map[int64]*UserGroup))
|
||||
|
||||
return lst[0], nil
|
||||
@@ -259,3 +260,53 @@ func AlertHisEventUpgradeToV6(ctx *ctx.Context, dsm map[string]Datasource) error
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func EventPersist(ctx *ctx.Context, event *AlertCurEvent) error {
|
||||
has, err := AlertCurEventExists(ctx, "hash=?", event.Hash)
|
||||
if err != nil {
|
||||
return fmt.Errorf("event_persist_check_exists_fail: %v rule_id=%d hash=%s", err, event.RuleId, event.Hash)
|
||||
}
|
||||
|
||||
his := event.ToHis(ctx)
|
||||
|
||||
// 不管是告警还是恢复,全量告警里都要记录
|
||||
if err := his.Add(ctx); err != nil {
|
||||
return fmt.Errorf("add his event error:%v", err)
|
||||
}
|
||||
|
||||
if has {
|
||||
// 活跃告警表中有记录,删之
|
||||
err = AlertCurEventDelByHash(ctx, event.Hash)
|
||||
if err != nil {
|
||||
return fmt.Errorf("event_del_cur_fail: %v hash=%s", err, event.Hash)
|
||||
}
|
||||
|
||||
if !event.IsRecovered {
|
||||
// 恢复事件,从活跃告警列表彻底删掉,告警事件,要重新加进来新的event
|
||||
// use his id as cur id
|
||||
event.Id = his.Id
|
||||
if event.Id > 0 {
|
||||
if err := event.Add(ctx); err != nil {
|
||||
return fmt.Errorf("add cur event err:%v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
if event.IsRecovered {
|
||||
// alert_cur_event表里没有数据,表示之前没告警,结果现在报了恢复,神奇....理论上不应该出现的
|
||||
return nil
|
||||
}
|
||||
|
||||
// use his id as cur id
|
||||
event.Id = his.Id
|
||||
if event.Id > 0 {
|
||||
if err := event.Add(ctx); err != nil {
|
||||
return fmt.Errorf("add cur event error:%v", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
"github.com/ccfos/nightingale/v6/pkg/ormx"
|
||||
"github.com/ccfos/nightingale/v6/pkg/poster"
|
||||
"github.com/toolkits/pkg/logger"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
@@ -78,7 +79,15 @@ func AlertMuteGet(ctx *ctx.Context, where string, args ...interface{}) (*AlertMu
|
||||
}
|
||||
|
||||
func AlertMuteGets(ctx *ctx.Context, prods []string, bgid int64, query string) (lst []AlertMute, err error) {
|
||||
session := DB(ctx).Where("group_id = ? and prod in (?)", bgid, prods)
|
||||
session := DB(ctx)
|
||||
|
||||
if bgid != -1 {
|
||||
session = session.Where("group_id = ?", bgid)
|
||||
}
|
||||
|
||||
if len(prods) > 0 {
|
||||
session = session.Where("prod in (?)", prods)
|
||||
}
|
||||
|
||||
if query != "" {
|
||||
arr := strings.Fields(query)
|
||||
@@ -220,6 +229,12 @@ func AlertMuteDel(ctx *ctx.Context, ids []int64) error {
|
||||
}
|
||||
|
||||
func AlertMuteStatistics(ctx *ctx.Context) (*Statistics, error) {
|
||||
var stats []*Statistics
|
||||
if !ctx.IsCenter {
|
||||
s, err := poster.GetByUrls[*Statistics](ctx, "/v1/n9e/statistic?name=alert_mute")
|
||||
return s, err
|
||||
}
|
||||
|
||||
// clean expired first
|
||||
buf := int64(30)
|
||||
err := DB(ctx).Where("etime < ? and mute_time_type = 0", time.Now().Unix()-buf).Delete(new(AlertMute)).Error
|
||||
@@ -229,7 +244,6 @@ func AlertMuteStatistics(ctx *ctx.Context) (*Statistics, error) {
|
||||
|
||||
session := DB(ctx).Model(&AlertMute{}).Select("count(*) as total", "max(update_at) as last_updated")
|
||||
|
||||
var stats []*Statistics
|
||||
err = session.Find(&stats).Error
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -240,9 +254,20 @@ func AlertMuteStatistics(ctx *ctx.Context) (*Statistics, error) {
|
||||
|
||||
func AlertMuteGetsAll(ctx *ctx.Context) ([]*AlertMute, error) {
|
||||
// get my cluster's mutes
|
||||
var lst []*AlertMute
|
||||
if !ctx.IsCenter {
|
||||
lst, err := poster.GetByUrls[[]*AlertMute](ctx, "/v1/n9e/alert-mutes")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for i := 0; i < len(lst); i++ {
|
||||
lst[i].FE2DB()
|
||||
}
|
||||
return lst, err
|
||||
}
|
||||
|
||||
session := DB(ctx).Model(&AlertMute{})
|
||||
|
||||
var lst []*AlertMute
|
||||
err := session.Find(&lst).Error
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
"github.com/ccfos/nightingale/v6/pkg/poster"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/toolkits/pkg/logger"
|
||||
@@ -113,16 +114,17 @@ type Trigger struct {
|
||||
Severity int `json:"severity"`
|
||||
}
|
||||
|
||||
func GetHostsQuery(queries []HostQuery) map[string]interface{} {
|
||||
var query = make(map[string]interface{})
|
||||
func GetHostsQuery(queries []HostQuery) []map[string]interface{} {
|
||||
var query []map[string]interface{}
|
||||
for _, q := range queries {
|
||||
m := make(map[string]interface{})
|
||||
switch q.Key {
|
||||
case "group_ids":
|
||||
ids := ParseInt64(q.Values)
|
||||
if q.Op == "==" {
|
||||
query["group_id in (?)"] = ids
|
||||
m["group_id in (?)"] = ids
|
||||
} else {
|
||||
query["group_id not in (?)"] = ids
|
||||
m["group_id not in (?)"] = ids
|
||||
}
|
||||
case "tags":
|
||||
lst := []string{}
|
||||
@@ -133,12 +135,16 @@ func GetHostsQuery(queries []HostQuery) map[string]interface{} {
|
||||
lst = append(lst, v.(string))
|
||||
}
|
||||
if q.Op == "==" {
|
||||
blank := " "
|
||||
for _, tag := range lst {
|
||||
query["tags like ?"] = "%" + tag + "%"
|
||||
m["tags like ?"+blank] = "%" + tag + "%"
|
||||
blank += " "
|
||||
}
|
||||
} else {
|
||||
blank := " "
|
||||
for _, tag := range lst {
|
||||
query["tags not like ?"] = "%" + tag + "%"
|
||||
m["tags not like ?"+blank] = "%" + tag + "%"
|
||||
blank += " "
|
||||
}
|
||||
}
|
||||
case "hosts":
|
||||
@@ -150,11 +156,12 @@ func GetHostsQuery(queries []HostQuery) map[string]interface{} {
|
||||
lst = append(lst, v.(string))
|
||||
}
|
||||
if q.Op == "==" {
|
||||
query["ident in (?)"] = lst
|
||||
m["ident in (?)"] = lst
|
||||
} else {
|
||||
query["ident not in (?)"] = lst
|
||||
m["ident not in (?)"] = lst
|
||||
}
|
||||
}
|
||||
query = append(query, m)
|
||||
}
|
||||
return query
|
||||
}
|
||||
@@ -643,6 +650,17 @@ func AlertRuleGets(ctx *ctx.Context, groupId int64) ([]AlertRule, error) {
|
||||
}
|
||||
|
||||
func AlertRuleGetsAll(ctx *ctx.Context) ([]*AlertRule, error) {
|
||||
if !ctx.IsCenter {
|
||||
lst, err := poster.GetByUrls[[]*AlertRule](ctx, "/v1/n9e/alert-rules?disabled=0")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for i := 0; i < len(lst); i++ {
|
||||
lst[i].FE2DB()
|
||||
}
|
||||
return lst, err
|
||||
}
|
||||
|
||||
session := DB(ctx).Where("disabled = ?", 0)
|
||||
|
||||
var lst []*AlertRule
|
||||
@@ -662,7 +680,11 @@ func AlertRuleGetsAll(ctx *ctx.Context) ([]*AlertRule, error) {
|
||||
}
|
||||
|
||||
func AlertRulesGetsBy(ctx *ctx.Context, prods []string, query, algorithm, cluster string, cates []string, disabled int) ([]*AlertRule, error) {
|
||||
session := DB(ctx).Where("prod in (?)", prods)
|
||||
session := DB(ctx)
|
||||
|
||||
if len(prods) > 0 {
|
||||
session = session.Where("prod in (?)", prods)
|
||||
}
|
||||
|
||||
if query != "" {
|
||||
arr := strings.Fields(query)
|
||||
@@ -734,6 +756,11 @@ func AlertRuleGetName(ctx *ctx.Context, id int64) (string, error) {
|
||||
}
|
||||
|
||||
func AlertRuleStatistics(ctx *ctx.Context) (*Statistics, error) {
|
||||
if !ctx.IsCenter {
|
||||
s, err := poster.GetByUrls[*Statistics](ctx, "/v1/n9e/statistic?name=alert_rule")
|
||||
return s, err
|
||||
}
|
||||
|
||||
session := DB(ctx).Model(&AlertRule{}).Select("count(*) as total", "max(update_at) as last_updated").Where("disabled = ?", 0)
|
||||
|
||||
var stats []*Statistics
|
||||
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
"github.com/ccfos/nightingale/v6/pkg/ormx"
|
||||
"github.com/ccfos/nightingale/v6/pkg/poster"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/toolkits/pkg/logger"
|
||||
)
|
||||
@@ -52,6 +53,18 @@ func AlertSubscribeGets(ctx *ctx.Context, groupId int64) (lst []AlertSubscribe,
|
||||
return
|
||||
}
|
||||
|
||||
func AlertSubscribeGetsByService(ctx *ctx.Context) (lst []AlertSubscribe, err error) {
|
||||
err = DB(ctx).Find(&lst).Error
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
for i := range lst {
|
||||
lst[i].DB2FE()
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func AlertSubscribeGet(ctx *ctx.Context, where string, args ...interface{}) (*AlertSubscribe, error) {
|
||||
var lst []*AlertSubscribe
|
||||
err := DB(ctx).Where(where, args...).Find(&lst).Error
|
||||
@@ -262,6 +275,11 @@ func AlertSubscribeDel(ctx *ctx.Context, ids []int64) error {
|
||||
}
|
||||
|
||||
func AlertSubscribeStatistics(ctx *ctx.Context) (*Statistics, error) {
|
||||
if !ctx.IsCenter {
|
||||
s, err := poster.GetByUrls[*Statistics](ctx, "/v1/n9e/statistic?name=alert_subscribe")
|
||||
return s, err
|
||||
}
|
||||
|
||||
session := DB(ctx).Model(&AlertSubscribe{}).Select("count(*) as total", "max(update_at) as last_updated")
|
||||
|
||||
var stats []*Statistics
|
||||
@@ -274,6 +292,17 @@ func AlertSubscribeStatistics(ctx *ctx.Context) (*Statistics, error) {
|
||||
}
|
||||
|
||||
func AlertSubscribeGetsAll(ctx *ctx.Context) ([]*AlertSubscribe, error) {
|
||||
if !ctx.IsCenter {
|
||||
lst, err := poster.GetByUrls[[]*AlertSubscribe](ctx, "/v1/n9e/alert-subscribes")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for i := 0; i < len(lst); i++ {
|
||||
lst[i].FE2DB()
|
||||
}
|
||||
return lst, err
|
||||
}
|
||||
|
||||
// get my cluster's subscribes
|
||||
session := DB(ctx).Model(&AlertSubscribe{})
|
||||
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
"github.com/ccfos/nightingale/v6/pkg/poster"
|
||||
)
|
||||
|
||||
type AlertingEngines struct {
|
||||
@@ -128,7 +129,23 @@ func AlertingEngineGetsInstances(ctx *ctx.Context, where string, args ...interfa
|
||||
return arr, err
|
||||
}
|
||||
|
||||
type HeartbeatInfo struct {
|
||||
Instance string `json:"instance"`
|
||||
EngineCluster string `json:"engine_cluster"`
|
||||
DatasourceId int64 `json:"datasource_id"`
|
||||
}
|
||||
|
||||
func AlertingEngineHeartbeatWithCluster(ctx *ctx.Context, instance, cluster string, datasourceId int64) error {
|
||||
if !ctx.IsCenter {
|
||||
info := HeartbeatInfo{
|
||||
Instance: instance,
|
||||
EngineCluster: cluster,
|
||||
DatasourceId: datasourceId,
|
||||
}
|
||||
err := poster.PostByUrls(ctx, "/v1/n9e/server-heartbeat", info)
|
||||
return err
|
||||
}
|
||||
|
||||
var total int64
|
||||
err := DB(ctx).Model(new(AlertingEngines)).Where("instance=? and engine_cluster = ? and datasource_id=?", instance, cluster, datasourceId).Count(&total).Error
|
||||
if err != nil {
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
"github.com/ccfos/nightingale/v6/pkg/poster"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"gorm.io/gorm"
|
||||
@@ -64,9 +65,17 @@ func (bg *BusiGroup) FillUserGroups(ctx *ctx.Context) error {
|
||||
|
||||
func BusiGroupGetMap(ctx *ctx.Context) (map[int64]*BusiGroup, error) {
|
||||
var lst []*BusiGroup
|
||||
err := DB(ctx).Find(&lst).Error
|
||||
if err != nil {
|
||||
return nil, err
|
||||
var err error
|
||||
if !ctx.IsCenter {
|
||||
lst, err = poster.GetByUrls[[]*BusiGroup](ctx, "/v1/n9e/busi-groups")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
err = DB(ctx).Find(&lst).Error
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
ret := make(map[int64]*BusiGroup)
|
||||
@@ -77,6 +86,12 @@ func BusiGroupGetMap(ctx *ctx.Context) (map[int64]*BusiGroup, error) {
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
func BusiGroupGetAll(ctx *ctx.Context) ([]*BusiGroup, error) {
|
||||
var lst []*BusiGroup
|
||||
err := DB(ctx).Find(&lst).Error
|
||||
return lst, err
|
||||
}
|
||||
|
||||
func BusiGroupGet(ctx *ctx.Context, where string, args ...interface{}) (*BusiGroup, error) {
|
||||
var lst []*BusiGroup
|
||||
err := DB(ctx).Where(where, args...).Find(&lst).Error
|
||||
@@ -324,6 +339,11 @@ func BusiGroupAdd(ctx *ctx.Context, name string, labelEnable int, labelValue str
|
||||
}
|
||||
|
||||
func BusiGroupStatistics(ctx *ctx.Context) (*Statistics, error) {
|
||||
if !ctx.IsCenter {
|
||||
s, err := poster.GetByUrls[*Statistics](ctx, "/v1/n9e/statistic?name=busi_group")
|
||||
return s, err
|
||||
}
|
||||
|
||||
session := DB(ctx).Model(&BusiGroup{}).Select("count(*) as total", "max(update_at) as last_updated")
|
||||
|
||||
var stats []*Statistics
|
||||
|
||||
@@ -46,6 +46,18 @@ type Statistics struct {
|
||||
LastUpdated int64 `gorm:"last_updated"`
|
||||
}
|
||||
|
||||
func StatisticsGet[T any](ctx *ctx.Context, model T) (*Statistics, error) {
|
||||
var stats []*Statistics
|
||||
session := DB(ctx).Model(model).Select("count(*) as total", "max(update_at) as last_updated")
|
||||
|
||||
err := session.Find(&stats).Error
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return stats[0], nil
|
||||
}
|
||||
|
||||
func MatchDatasource(ids []int64, id int64) bool {
|
||||
if id == DatasourceIdAll {
|
||||
return true
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
"github.com/ccfos/nightingale/v6/pkg/poster"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/toolkits/pkg/runner"
|
||||
@@ -43,6 +44,13 @@ func InitSalt(ctx *ctx.Context) {
|
||||
}
|
||||
|
||||
func ConfigsGet(ctx *ctx.Context, ckey string) (string, error) {
|
||||
if !ctx.IsCenter {
|
||||
if !ctx.IsCenter {
|
||||
s, err := poster.GetByUrls[string](ctx, "/v1/n9e/config?key="+ckey)
|
||||
return s, err
|
||||
}
|
||||
}
|
||||
|
||||
var lst []string
|
||||
err := DB(ctx).Model(&Configs{}).Where("ckey=?", ckey).Pluck("cval", &lst).Error
|
||||
if err != nil {
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
"github.com/ccfos/nightingale/v6/pkg/poster"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/toolkits/pkg/logger"
|
||||
"github.com/toolkits/pkg/str"
|
||||
@@ -112,6 +113,17 @@ func (ds *Datasource) Get(ctx *ctx.Context) error {
|
||||
}
|
||||
|
||||
func GetDatasources(ctx *ctx.Context) ([]Datasource, error) {
|
||||
if !ctx.IsCenter {
|
||||
lst, err := poster.GetByUrls[[]Datasource](ctx, "/v1/n9e/datasources")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for i := 0; i < len(lst); i++ {
|
||||
lst[i].FE2DB()
|
||||
}
|
||||
return lst, nil
|
||||
}
|
||||
|
||||
var dss []Datasource
|
||||
err := DB(ctx).Find(&dss).Error
|
||||
|
||||
@@ -123,6 +135,11 @@ func GetDatasources(ctx *ctx.Context) ([]Datasource, error) {
|
||||
}
|
||||
|
||||
func GetDatasourceIdsByEngineName(ctx *ctx.Context, engineName string) ([]int64, error) {
|
||||
if !ctx.IsCenter {
|
||||
lst, err := poster.GetByUrls[[]int64](ctx, "/v1/n9e/datasource-ids?name="+engineName)
|
||||
return lst, err
|
||||
}
|
||||
|
||||
var dss []Datasource
|
||||
var ids []int64
|
||||
err := DB(ctx).Where("cluster_name = ?", engineName).Find(&dss).Error
|
||||
@@ -267,19 +284,32 @@ func (ds *Datasource) DB2FE() error {
|
||||
|
||||
func DatasourceGetMap(ctx *ctx.Context) (map[int64]*Datasource, error) {
|
||||
var lst []*Datasource
|
||||
err := DB(ctx).Find(&lst).Error
|
||||
if err != nil {
|
||||
return nil, err
|
||||
var err error
|
||||
if !ctx.IsCenter {
|
||||
lst, err = poster.GetByUrls[[]*Datasource](ctx, "/v1/n9e/datasources")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for i := 0; i < len(lst); i++ {
|
||||
lst[i].FE2DB()
|
||||
}
|
||||
} else {
|
||||
err := DB(ctx).Find(&lst).Error
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for i := 0; i < len(lst); i++ {
|
||||
err := lst[i].DB2FE()
|
||||
if err != nil {
|
||||
logger.Warningf("get ds:%+v err:%v", lst[i], err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ret := make(map[int64]*Datasource)
|
||||
for i := 0; i < len(lst); i++ {
|
||||
err := lst[i].DB2FE()
|
||||
if err != nil {
|
||||
logger.Warningf("get ds:%+v err:%v", lst[i], err)
|
||||
continue
|
||||
}
|
||||
|
||||
ret[lst[i].Id] = lst[i]
|
||||
}
|
||||
|
||||
@@ -287,6 +317,11 @@ func DatasourceGetMap(ctx *ctx.Context) (map[int64]*Datasource, error) {
|
||||
}
|
||||
|
||||
func DatasourceStatistics(ctx *ctx.Context) (*Statistics, error) {
|
||||
if !ctx.IsCenter {
|
||||
s, err := poster.GetByUrls[*Statistics](ctx, "/v1/n9e/statistic?name=datasource")
|
||||
return s, err
|
||||
}
|
||||
|
||||
session := DB(ctx).Model(&Datasource{}).Select("count(*) as total", "max(updated_at) as last_updated")
|
||||
|
||||
var stats []*Statistics
|
||||
|
||||
@@ -16,6 +16,7 @@ type Webhook struct {
|
||||
HeaderMap map[string]string `json:"headers"`
|
||||
Headers []string `json:"headers_str"`
|
||||
SkipVerify bool `json:"skip_verify"`
|
||||
Note string `json:"note"`
|
||||
}
|
||||
|
||||
type NotifyScript struct {
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"strings"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
"github.com/ccfos/nightingale/v6/pkg/poster"
|
||||
"github.com/ccfos/nightingale/v6/pkg/tplx"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
@@ -59,6 +60,11 @@ func NotifyTplCountByChannel(c *ctx.Context, channel string) (int64, error) {
|
||||
}
|
||||
|
||||
func NotifyTplGets(c *ctx.Context) ([]*NotifyTpl, error) {
|
||||
if !c.IsCenter {
|
||||
lst, err := poster.GetByUrls[[]*NotifyTpl](c, "/v1/n9e/notify-tpls")
|
||||
return lst, err
|
||||
}
|
||||
|
||||
var lst []*NotifyTpl
|
||||
err := DB(c).Find(&lst).Error
|
||||
return lst, err
|
||||
@@ -88,6 +94,10 @@ func ListTpls(c *ctx.Context) (map[string]*template.Template, error) {
|
||||
}
|
||||
|
||||
func InitNotifyConfig(c *ctx.Context, tplDir string) {
|
||||
if !c.IsCenter {
|
||||
return
|
||||
}
|
||||
|
||||
// init notify channel
|
||||
cval, err := ConfigsGet(c, NOTIFYCHANNEL)
|
||||
if err != nil {
|
||||
|
||||
@@ -7,6 +7,8 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
"github.com/ccfos/nightingale/v6/pkg/poster"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prometheus/common/model"
|
||||
"github.com/toolkits/pkg/logger"
|
||||
@@ -197,7 +199,33 @@ func RecordingRuleGetById(ctx *ctx.Context, id int64) (*RecordingRule, error) {
|
||||
return RecordingRuleGet(ctx, "id=?", id)
|
||||
}
|
||||
|
||||
func RecordingRuleEnabledGets(ctx *ctx.Context) ([]*RecordingRule, error) {
|
||||
session := DB(ctx)
|
||||
|
||||
var lst []*RecordingRule
|
||||
err := session.Where("disabled = ?", 0).Find(&lst).Error
|
||||
if err != nil {
|
||||
return lst, err
|
||||
}
|
||||
|
||||
for i := 0; i < len(lst); i++ {
|
||||
lst[i].DB2FE(ctx)
|
||||
}
|
||||
return lst, nil
|
||||
}
|
||||
|
||||
func RecordingRuleGetsByCluster(ctx *ctx.Context) ([]*RecordingRule, error) {
|
||||
if !ctx.IsCenter {
|
||||
lst, err := poster.GetByUrls[[]*RecordingRule](ctx, "/v1/n9e/recording-rules")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for i := 0; i < len(lst); i++ {
|
||||
lst[i].FE2DB()
|
||||
}
|
||||
return lst, err
|
||||
}
|
||||
|
||||
session := DB(ctx).Where("disabled = ?", 0)
|
||||
|
||||
var lst []*RecordingRule
|
||||
@@ -217,6 +245,11 @@ func RecordingRuleGetsByCluster(ctx *ctx.Context) ([]*RecordingRule, error) {
|
||||
}
|
||||
|
||||
func RecordingRuleStatistics(ctx *ctx.Context) (*Statistics, error) {
|
||||
if !ctx.IsCenter {
|
||||
s, err := poster.GetByUrls[*Statistics](ctx, "/v1/n9e/statistic?name=recording_rule")
|
||||
return s, err
|
||||
}
|
||||
|
||||
session := DB(ctx).Model(&RecordingRule{}).Select("count(*) as total", "max(update_at) as last_updated")
|
||||
|
||||
var stats []*Statistics
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
"github.com/ccfos/nightingale/v6/pkg/poster"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"gorm.io/gorm"
|
||||
@@ -19,7 +20,7 @@ type Target struct {
|
||||
Note string `json:"note"`
|
||||
Tags string `json:"-"`
|
||||
TagsJSON []string `json:"tags" gorm:"-"`
|
||||
TagsMap map[string]string `json:"-" gorm:"-"` // internal use, append tags to series
|
||||
TagsMap map[string]string `json:"tags_maps" gorm:"-"` // internal use, append tags to series
|
||||
UpdateAt int64 `json:"update_at"`
|
||||
|
||||
UnixTime int64 `json:"unixtime" gorm:"-"`
|
||||
@@ -59,6 +60,11 @@ func (t *Target) FillGroup(ctx *ctx.Context, cache map[int64]*BusiGroup) error {
|
||||
}
|
||||
|
||||
func TargetStatistics(ctx *ctx.Context) (*Statistics, error) {
|
||||
if !ctx.IsCenter {
|
||||
s, err := poster.GetByUrls[*Statistics](ctx, "/v1/n9e/statistic?name=target")
|
||||
return s, err
|
||||
}
|
||||
|
||||
var stats []*Statistics
|
||||
err := DB(ctx).Model(&Target{}).Select("count(*) as total", "max(update_at) as last_updated").Find(&stats).Error
|
||||
if err != nil {
|
||||
@@ -117,7 +123,7 @@ func TargetGets(ctx *ctx.Context, bgid int64, dsIds []int64, query string, limit
|
||||
}
|
||||
|
||||
// 根据 groupids, tags, hosts 查询 targets
|
||||
func TargetGetsByFilter(ctx *ctx.Context, query map[string]interface{}, limit, offset int) ([]*Target, error) {
|
||||
func TargetGetsByFilter(ctx *ctx.Context, query []map[string]interface{}, limit, offset int) ([]*Target, error) {
|
||||
var lst []*Target
|
||||
session := TargetFilterQueryBuild(ctx, query, limit, offset)
|
||||
err := session.Order("ident").Find(&lst).Error
|
||||
@@ -130,12 +136,12 @@ func TargetGetsByFilter(ctx *ctx.Context, query map[string]interface{}, limit, o
|
||||
return lst, err
|
||||
}
|
||||
|
||||
func TargetCountByFilter(ctx *ctx.Context, query map[string]interface{}) (int64, error) {
|
||||
func TargetCountByFilter(ctx *ctx.Context, query []map[string]interface{}) (int64, error) {
|
||||
session := TargetFilterQueryBuild(ctx, query, 0, 0)
|
||||
return Count(session)
|
||||
}
|
||||
|
||||
func MissTargetGetsByFilter(ctx *ctx.Context, query map[string]interface{}, ts int64) ([]*Target, error) {
|
||||
func MissTargetGetsByFilter(ctx *ctx.Context, query []map[string]interface{}, ts int64) ([]*Target, error) {
|
||||
var lst []*Target
|
||||
session := TargetFilterQueryBuild(ctx, query, 0, 0)
|
||||
session = session.Where("update_at < ?", ts)
|
||||
@@ -144,16 +150,20 @@ func MissTargetGetsByFilter(ctx *ctx.Context, query map[string]interface{}, ts i
|
||||
return lst, err
|
||||
}
|
||||
|
||||
func MissTargetCountByFilter(ctx *ctx.Context, query map[string]interface{}, ts int64) (int64, error) {
|
||||
func MissTargetCountByFilter(ctx *ctx.Context, query []map[string]interface{}, ts int64) (int64, error) {
|
||||
session := TargetFilterQueryBuild(ctx, query, 0, 0)
|
||||
session = session.Where("update_at < ?", ts)
|
||||
return Count(session)
|
||||
}
|
||||
|
||||
func TargetFilterQueryBuild(ctx *ctx.Context, query map[string]interface{}, limit, offset int) *gorm.DB {
|
||||
func TargetFilterQueryBuild(ctx *ctx.Context, query []map[string]interface{}, limit, offset int) *gorm.DB {
|
||||
session := DB(ctx).Model(&Target{})
|
||||
for k, v := range query {
|
||||
session = session.Where(k, v)
|
||||
for _, q := range query {
|
||||
tx := DB(ctx).Model(&Target{})
|
||||
for k, v := range q {
|
||||
tx = tx.Or(k, v)
|
||||
}
|
||||
session = session.Where(tx)
|
||||
}
|
||||
|
||||
if limit > 0 {
|
||||
@@ -164,8 +174,16 @@ func TargetFilterQueryBuild(ctx *ctx.Context, query map[string]interface{}, limi
|
||||
}
|
||||
|
||||
func TargetGetsAll(ctx *ctx.Context) ([]*Target, error) {
|
||||
if !ctx.IsCenter {
|
||||
lst, err := poster.GetByUrls[[]*Target](ctx, "/v1/n9e/targets")
|
||||
return lst, err
|
||||
}
|
||||
|
||||
var lst []*Target
|
||||
err := DB(ctx).Model(&Target{}).Find(&lst).Error
|
||||
for i := 0; i < len(lst); i++ {
|
||||
lst[i].FillTagsMap()
|
||||
}
|
||||
return lst, err
|
||||
}
|
||||
|
||||
|
||||
@@ -1,6 +1,9 @@
|
||||
package models
|
||||
|
||||
import "github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
import (
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
"github.com/ccfos/nightingale/v6/pkg/poster"
|
||||
)
|
||||
|
||||
type TaskRecord struct {
|
||||
Id int64 `json:"id" gorm:"primaryKey"`
|
||||
@@ -27,6 +30,11 @@ func (r *TaskRecord) TableName() string {
|
||||
|
||||
// create task
|
||||
func (r *TaskRecord) Add(ctx *ctx.Context) error {
|
||||
if !ctx.IsCenter {
|
||||
err := poster.PostByUrls(ctx, "/v1/n9e/task-record-add", r)
|
||||
return err
|
||||
}
|
||||
|
||||
return Insert(ctx, r)
|
||||
}
|
||||
|
||||
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
"github.com/ccfos/nightingale/v6/pkg/ldapx"
|
||||
"github.com/ccfos/nightingale/v6/pkg/ormx"
|
||||
"github.com/ccfos/nightingale/v6/pkg/poster"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/tidwall/gjson"
|
||||
@@ -347,12 +348,18 @@ func UserGets(ctx *ctx.Context, query string, limit, offset int) ([]User, error)
|
||||
for i := 0; i < len(users); i++ {
|
||||
users[i].RolesLst = strings.Fields(users[i].Roles)
|
||||
users[i].Admin = users[i].IsAdmin()
|
||||
users[i].Password = ""
|
||||
}
|
||||
|
||||
return users, nil
|
||||
}
|
||||
|
||||
func UserGetAll(ctx *ctx.Context) ([]*User, error) {
|
||||
if !ctx.IsCenter {
|
||||
lst, err := poster.GetByUrls[[]*User](ctx, "/v1/n9e/users")
|
||||
return lst, err
|
||||
}
|
||||
|
||||
var lst []*User
|
||||
err := DB(ctx).Find(&lst).Error
|
||||
if err == nil {
|
||||
@@ -429,6 +436,11 @@ func (u *User) CheckPerm(ctx *ctx.Context, operation string) (bool, error) {
|
||||
}
|
||||
|
||||
func UserStatistics(ctx *ctx.Context) (*Statistics, error) {
|
||||
if !ctx.IsCenter {
|
||||
s, err := poster.GetByUrls[*Statistics](ctx, "/v1/n9e/statistic?name=user")
|
||||
return s, err
|
||||
}
|
||||
|
||||
session := DB(ctx).Model(&User{}).Select("count(*) as total", "max(update_at) as last_updated")
|
||||
|
||||
var stats []*Statistics
|
||||
|
||||
@@ -4,6 +4,8 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
"github.com/ccfos/nightingale/v6/pkg/poster"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/toolkits/pkg/str"
|
||||
"gorm.io/gorm"
|
||||
@@ -111,6 +113,11 @@ func UserGroupGetByIds(ctx *ctx.Context, ids []int64) ([]UserGroup, error) {
|
||||
}
|
||||
|
||||
func UserGroupGetAll(ctx *ctx.Context) ([]*UserGroup, error) {
|
||||
if !ctx.IsCenter {
|
||||
lst, err := poster.GetByUrls[[]*UserGroup](ctx, "/v1/n9e/users")
|
||||
return lst, err
|
||||
}
|
||||
|
||||
var lst []*UserGroup
|
||||
err := DB(ctx).Find(&lst).Error
|
||||
return lst, err
|
||||
@@ -139,6 +146,11 @@ func (ug *UserGroup) DelMembers(ctx *ctx.Context, userIds []int64) error {
|
||||
}
|
||||
|
||||
func UserGroupStatistics(ctx *ctx.Context) (*Statistics, error) {
|
||||
if !ctx.IsCenter {
|
||||
s, err := poster.GetByUrls[*Statistics](ctx, "/v1/n9e/statistic?name=user_group")
|
||||
return s, err
|
||||
}
|
||||
|
||||
session := DB(ctx).Model(&UserGroup{}).Select("count(*) as total", "max(update_at) as last_updated")
|
||||
|
||||
var stats []*Statistics
|
||||
|
||||
@@ -1,6 +1,9 @@
|
||||
package models
|
||||
|
||||
import "github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
import (
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
"github.com/ccfos/nightingale/v6/pkg/poster"
|
||||
)
|
||||
|
||||
type UserGroupMember struct {
|
||||
GroupId int64
|
||||
@@ -54,8 +57,13 @@ func UserGroupMemberDel(ctx *ctx.Context, groupId int64, userIds []int64) error
|
||||
return DB(ctx).Where("group_id = ? and user_id in ?", groupId, userIds).Delete(&UserGroupMember{}).Error
|
||||
}
|
||||
|
||||
func UserGroupMemberGetAll(ctx *ctx.Context) ([]UserGroupMember, error) {
|
||||
var lst []UserGroupMember
|
||||
func UserGroupMemberGetAll(ctx *ctx.Context) ([]*UserGroupMember, error) {
|
||||
if !ctx.IsCenter {
|
||||
lst, err := poster.GetByUrls[[]*UserGroupMember](ctx, "/v1/n9e/user-group-members")
|
||||
return lst, err
|
||||
}
|
||||
|
||||
var lst []*UserGroupMember
|
||||
err := DB(ctx).Find(&lst).Error
|
||||
return lst, err
|
||||
}
|
||||
|
||||
@@ -3,18 +3,29 @@ package ctx
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/conf"
|
||||
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
type Context struct {
|
||||
DB *gorm.DB
|
||||
Ctx context.Context
|
||||
DB *gorm.DB
|
||||
CenterApi conf.CenterApi
|
||||
Ctx context.Context
|
||||
IsCenter bool
|
||||
}
|
||||
|
||||
func NewContext(ctx context.Context, db *gorm.DB) *Context {
|
||||
func NewContext(ctx context.Context, db *gorm.DB, isCenter bool, centerApis ...conf.CenterApi) *Context {
|
||||
var api conf.CenterApi
|
||||
if len(centerApis) > 0 {
|
||||
api = centerApis[0]
|
||||
}
|
||||
|
||||
return &Context{
|
||||
Ctx: ctx,
|
||||
DB: db,
|
||||
Ctx: ctx,
|
||||
DB: db,
|
||||
CenterApi: api,
|
||||
IsCenter: isCenter,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -31,28 +31,11 @@ type Config struct {
|
||||
IdleTimeout int
|
||||
JWTAuth JWTAuth
|
||||
ProxyAuth ProxyAuth
|
||||
Alert Alert
|
||||
Pushgw Pushgw
|
||||
Heartbeat Heartbeat
|
||||
Service Service
|
||||
APIForAgent BasicAuths
|
||||
APIForService BasicAuths
|
||||
}
|
||||
|
||||
type Alert struct {
|
||||
BasicAuth gin.Accounts
|
||||
Enable bool
|
||||
}
|
||||
|
||||
type Pushgw struct {
|
||||
BasicAuth gin.Accounts
|
||||
Enable bool
|
||||
}
|
||||
|
||||
type Heartbeat struct {
|
||||
BasicAuth gin.Accounts
|
||||
Enable bool
|
||||
}
|
||||
|
||||
type Service struct {
|
||||
type BasicAuths struct {
|
||||
BasicAuth gin.Accounts
|
||||
Enable bool
|
||||
}
|
||||
|
||||
@@ -2,14 +2,160 @@ package poster
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"math/rand"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/toolkits/pkg/logger"
|
||||
"github.com/toolkits/pkg/net/httplib"
|
||||
)
|
||||
|
||||
type DataResponse[T any] struct {
|
||||
Dat T `json:"dat"`
|
||||
Err string `json:"err"`
|
||||
}
|
||||
|
||||
func GetByUrls[T any](ctx *ctx.Context, path string) (T, error) {
|
||||
var err error
|
||||
addrs := ctx.CenterApi.Addrs
|
||||
|
||||
rand.Shuffle(len(addrs), func(i, j int) { addrs[i], addrs[j] = addrs[j], addrs[i] })
|
||||
for _, addr := range addrs {
|
||||
url := fmt.Sprintf("%s%s", addr, path)
|
||||
dat, e := GetByUrl[T](url, ctx.CenterApi.BasicAuth)
|
||||
if e != nil {
|
||||
err = e
|
||||
logger.Warningf("failed to get data from center, url: %s, err: %v", url, err)
|
||||
continue
|
||||
}
|
||||
return dat, nil
|
||||
}
|
||||
|
||||
var dat T
|
||||
return dat, err
|
||||
}
|
||||
|
||||
func GetByUrl[T any](url string, basicAuth gin.Accounts) (T, error) {
|
||||
var dat T
|
||||
req := httplib.Get(url).SetTimeout(time.Duration(3000) * time.Millisecond)
|
||||
|
||||
if len(basicAuth) > 0 {
|
||||
var token string
|
||||
for username, password := range basicAuth {
|
||||
token = base64.StdEncoding.EncodeToString([]byte(username + ":" + password))
|
||||
}
|
||||
|
||||
if len(token) > 0 {
|
||||
req = req.Header("Authorization", "Basic "+token)
|
||||
}
|
||||
}
|
||||
|
||||
resp, err := req.Response()
|
||||
if err != nil {
|
||||
return dat, fmt.Errorf("failed to fetch from url: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return dat, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
|
||||
}
|
||||
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return dat, fmt.Errorf("failed to read response body: %w", err)
|
||||
}
|
||||
|
||||
var dataResp DataResponse[T]
|
||||
err = json.Unmarshal(body, &dataResp)
|
||||
if err != nil {
|
||||
return dat, fmt.Errorf("failed to decode response: %w", err)
|
||||
}
|
||||
|
||||
if dataResp.Err != "" {
|
||||
return dat, fmt.Errorf("error from server: %s", dataResp.Err)
|
||||
}
|
||||
|
||||
logger.Debugf("get data from %s, data: %+v", url, dataResp.Dat)
|
||||
return dataResp.Dat, nil
|
||||
}
|
||||
|
||||
type PostResponse struct {
|
||||
Err string `json:"err"`
|
||||
}
|
||||
|
||||
func PostByUrls(ctx *ctx.Context, path string, v interface{}) (err error) {
|
||||
addrs := ctx.CenterApi.Addrs
|
||||
|
||||
rand.Shuffle(len(addrs), func(i, j int) { addrs[i], addrs[j] = addrs[j], addrs[i] })
|
||||
for _, addr := range addrs {
|
||||
url := fmt.Sprintf("%s%s", addr, path)
|
||||
err = PostByUrl(url, ctx.CenterApi.BasicAuth, v)
|
||||
if err == nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func PostByUrl(url string, basicAuth gin.Accounts, v interface{}) (err error) {
|
||||
var bs []byte
|
||||
bs, err = json.Marshal(v)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
bf := bytes.NewBuffer(bs)
|
||||
client := http.Client{
|
||||
Timeout: 10 * time.Second,
|
||||
}
|
||||
|
||||
req, err := http.NewRequest("POST", url, bf)
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
|
||||
if len(basicAuth) > 0 {
|
||||
var token string
|
||||
for username, password := range basicAuth {
|
||||
token = base64.StdEncoding.EncodeToString([]byte(username + ":" + password))
|
||||
}
|
||||
if len(token) > 0 {
|
||||
req.Header.Set("Authorization", "Basic "+token)
|
||||
}
|
||||
}
|
||||
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to fetch from url: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
|
||||
}
|
||||
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to read response body: %w", err)
|
||||
}
|
||||
|
||||
var dataResp PostResponse
|
||||
err = json.Unmarshal(body, &dataResp)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to decode response: %w body:%s", err, string(body))
|
||||
}
|
||||
|
||||
if dataResp.Err != "" {
|
||||
return fmt.Errorf("error from server: %s", dataResp.Err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func PostJSON(url string, timeout time.Duration, v interface{}, retries ...int) (response []byte, code int, err error) {
|
||||
var bs []byte
|
||||
|
||||
|
||||
@@ -42,7 +42,7 @@ func (w WriterType) Write(items []*prompb.TimeSeries, headers ...map[string]stri
|
||||
}
|
||||
|
||||
if err := w.Post(snappy.Encode(nil, data), headers...); err != nil {
|
||||
logger.Warningf("post to %s got error: %v", w.Opts.Url, err)
|
||||
logger.Warningf("%v post to %s got error: %v", w.Opts, w.Opts.Url, err)
|
||||
logger.Debug("example timeseries:", items[0].String())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -30,6 +30,10 @@ func (po *PromOption) Equal(target PromOption) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
if po.WriteAddr != target.WriteAddr {
|
||||
return false
|
||||
}
|
||||
|
||||
if po.Timeout != target.Timeout {
|
||||
return false
|
||||
}
|
||||
|
||||
@@ -10,6 +10,7 @@ import (
|
||||
"github.com/ccfos/nightingale/v6/alert/aconf"
|
||||
"github.com/ccfos/nightingale/v6/models"
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
"github.com/ccfos/nightingale/v6/pkg/poster"
|
||||
"github.com/ccfos/nightingale/v6/pkg/prom"
|
||||
|
||||
"github.com/prometheus/client_golang/api"
|
||||
@@ -42,11 +43,25 @@ type PromSetting struct {
|
||||
}
|
||||
|
||||
func (pc *PromClientMap) loadFromDatabase() {
|
||||
datasources, err := models.GetDatasourcesGetsBy(pc.ctx, models.PROMETHEUS, "", "", "")
|
||||
if err != nil {
|
||||
logger.Errorf("failed to get datasources, error: %v", err)
|
||||
return
|
||||
var datasources []*models.Datasource
|
||||
var err error
|
||||
if !pc.ctx.IsCenter {
|
||||
datasources, err = poster.GetByUrls[[]*models.Datasource](pc.ctx, "/v1/n9e/datasources?typ="+models.PROMETHEUS)
|
||||
if err != nil {
|
||||
logger.Errorf("failed to get datasources, error: %v", err)
|
||||
return
|
||||
}
|
||||
for i := 0; i < len(datasources); i++ {
|
||||
datasources[i].FE2DB()
|
||||
}
|
||||
} else {
|
||||
datasources, err = models.GetDatasourcesGetsBy(pc.ctx, models.PROMETHEUS, "", "", "")
|
||||
if err != nil {
|
||||
logger.Errorf("failed to get datasources, error: %v", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
newCluster := make(map[int64]struct{})
|
||||
for _, ds := range datasources {
|
||||
dsId := ds.Id
|
||||
|
||||
@@ -4,21 +4,23 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
"github.com/ccfos/nightingale/v6/pkg/poster"
|
||||
|
||||
"github.com/toolkits/pkg/logger"
|
||||
"github.com/toolkits/pkg/slice"
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
type Set struct {
|
||||
sync.Mutex
|
||||
items map[string]struct{}
|
||||
db *gorm.DB
|
||||
ctx *ctx.Context
|
||||
}
|
||||
|
||||
func New(db *gorm.DB) *Set {
|
||||
func New(ctx *ctx.Context) *Set {
|
||||
set := &Set{
|
||||
items: make(map[string]struct{}),
|
||||
db: db,
|
||||
ctx: ctx,
|
||||
}
|
||||
|
||||
set.Init()
|
||||
@@ -68,7 +70,7 @@ func (s *Set) updateTimestamp(items map[string]struct{}) {
|
||||
lst = append(lst, ident)
|
||||
num++
|
||||
if num == 100 {
|
||||
if err := s.updateTargets(lst, now); err != nil {
|
||||
if err := s.UpdateTargets(lst, now); err != nil {
|
||||
logger.Errorf("failed to update targets: %v", err)
|
||||
}
|
||||
lst = lst[:0]
|
||||
@@ -76,18 +78,32 @@ func (s *Set) updateTimestamp(items map[string]struct{}) {
|
||||
}
|
||||
}
|
||||
|
||||
if err := s.updateTargets(lst, now); err != nil {
|
||||
if err := s.UpdateTargets(lst, now); err != nil {
|
||||
logger.Errorf("failed to update targets: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Set) updateTargets(lst []string, now int64) error {
|
||||
type TargetUpdate struct {
|
||||
Lst []string `json:"lst"`
|
||||
Now int64 `json:"now"`
|
||||
}
|
||||
|
||||
func (s *Set) UpdateTargets(lst []string, now int64) error {
|
||||
if !s.ctx.IsCenter {
|
||||
t := TargetUpdate{
|
||||
Lst: lst,
|
||||
Now: now,
|
||||
}
|
||||
err := poster.PostByUrls(s.ctx, "/v1/n9e/target-update", t)
|
||||
return err
|
||||
}
|
||||
|
||||
count := int64(len(lst))
|
||||
if count == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
ret := s.db.Table("target").Where("ident in ?", lst).Update("update_at", now)
|
||||
ret := s.ctx.DB.Table("target").Where("ident in ?", lst).Update("update_at", now)
|
||||
if ret.Error != nil {
|
||||
return ret.Error
|
||||
}
|
||||
@@ -98,14 +114,14 @@ func (s *Set) updateTargets(lst []string, now int64) error {
|
||||
|
||||
// there are some idents not found in db, so insert them
|
||||
var exists []string
|
||||
err := s.db.Table("target").Where("ident in ?", lst).Pluck("ident", &exists).Error
|
||||
err := s.ctx.DB.Table("target").Where("ident in ?", lst).Pluck("ident", &exists).Error
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
news := slice.SubString(lst, exists)
|
||||
for i := 0; i < len(news); i++ {
|
||||
err = s.db.Exec("INSERT INTO target(ident, update_at) VALUES(?, ?)", news[i], now).Error
|
||||
err = s.ctx.DB.Exec("INSERT INTO target(ident, update_at) VALUES(?, ?)", news[i], now).Error
|
||||
if err != nil {
|
||||
logger.Error("failed to insert target:", news[i], "error:", err)
|
||||
}
|
||||
|
||||
@@ -12,7 +12,6 @@ import (
|
||||
"github.com/ccfos/nightingale/v6/pushgw/idents"
|
||||
"github.com/ccfos/nightingale/v6/pushgw/router"
|
||||
"github.com/ccfos/nightingale/v6/pushgw/writer"
|
||||
"github.com/ccfos/nightingale/v6/storage"
|
||||
)
|
||||
|
||||
type PushgwProvider struct {
|
||||
@@ -31,13 +30,9 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
db, err := storage.New(config.DB)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ctx := ctx.NewContext(context.Background(), db)
|
||||
ctx := ctx.NewContext(context.Background(), nil, false, config.CenterApi)
|
||||
|
||||
idents := idents.New(db)
|
||||
idents := idents.New(ctx)
|
||||
|
||||
stats := memsto.NewSyncStats()
|
||||
|
||||
|
||||
@@ -39,7 +39,7 @@ func New(httpConfig httpx.Config, pushgw pconf.Pushgw, tc *memsto.TargetCacheTyp
|
||||
}
|
||||
|
||||
func (rt *Router) Config(r *gin.Engine) {
|
||||
if !rt.HTTP.Pushgw.Enable {
|
||||
if !rt.HTTP.APIForAgent.Enable {
|
||||
return
|
||||
}
|
||||
|
||||
@@ -53,16 +53,18 @@ func (rt *Router) Config(r *gin.Engine) {
|
||||
r.POST("/datadog/api/v1/metadata", datadogMetadata)
|
||||
r.POST("/datadog/intake/", datadogIntake)
|
||||
|
||||
if len(rt.HTTP.Pushgw.BasicAuth) > 0 {
|
||||
if len(rt.HTTP.APIForAgent.BasicAuth) > 0 {
|
||||
// enable basic auth
|
||||
auth := gin.BasicAuth(rt.HTTP.Pushgw.BasicAuth)
|
||||
auth := gin.BasicAuth(rt.HTTP.APIForAgent.BasicAuth)
|
||||
r.POST("/opentsdb/put", auth, rt.openTSDBPut)
|
||||
r.POST("/openfalcon/push", auth, rt.falconPush)
|
||||
r.POST("/prometheus/v1/write", auth, rt.remoteWrite)
|
||||
r.POST("/v1/n9e/target-update", auth, rt.targetUpdate)
|
||||
} else {
|
||||
// no need basic auth
|
||||
r.POST("/opentsdb/put", rt.openTSDBPut)
|
||||
r.POST("/openfalcon/push", rt.falconPush)
|
||||
r.POST("/prometheus/v1/write", rt.remoteWrite)
|
||||
r.POST("/v1/n9e/target-update", rt.targetUpdate)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -186,9 +186,9 @@ func (r *Router) datadogSeries(c *gin.Context) {
|
||||
apiKey = ""
|
||||
}
|
||||
|
||||
if len(r.HTTP.Pushgw.BasicAuth) > 0 {
|
||||
if len(r.HTTP.APIForAgent.BasicAuth) > 0 {
|
||||
ok := false
|
||||
for _, v := range r.HTTP.Pushgw.BasicAuth {
|
||||
for _, v := range r.HTTP.APIForAgent.BasicAuth {
|
||||
if apiKey == v {
|
||||
ok = true
|
||||
break
|
||||
|
||||
14
pushgw/router/router_target.go
Normal file
14
pushgw/router/router_target.go
Normal file
@@ -0,0 +1,14 @@
|
||||
package router
|
||||
|
||||
import (
|
||||
"github.com/ccfos/nightingale/v6/pushgw/idents"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/toolkits/pkg/ginx"
|
||||
)
|
||||
|
||||
func (rt *Router) targetUpdate(c *gin.Context) {
|
||||
var f idents.TargetUpdate
|
||||
ginx.BindJSON(c, &f)
|
||||
|
||||
ginx.NewRender(c).Message(rt.IdentSet.UpdateTargets(f.Lst, f.Now))
|
||||
}
|
||||
Reference in New Issue
Block a user