mirror of
https://github.com/ccfos/nightingale.git
synced 2026-03-05 07:29:03 +00:00
Compare commits
2 Commits
optimize-c
...
feat-es-su
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0916922abe | ||
|
|
5ba5096da2 |
@@ -9,6 +9,7 @@ import (
|
||||
"github.com/ccfos/nightingale/v6/memsto"
|
||||
"github.com/ccfos/nightingale/v6/models"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/toolkits/pkg/logger"
|
||||
)
|
||||
|
||||
@@ -135,7 +136,8 @@ func EventMuteStrategy(event *models.AlertCurEvent, alertMuteCache *memsto.Alert
|
||||
}
|
||||
|
||||
for i := 0; i < len(mutes); i++ {
|
||||
if MatchMute(event, mutes[i]) {
|
||||
matched, _ := MatchMute(event, mutes[i])
|
||||
if matched {
|
||||
return true, mutes[i].Id
|
||||
}
|
||||
}
|
||||
@@ -144,9 +146,9 @@ func EventMuteStrategy(event *models.AlertCurEvent, alertMuteCache *memsto.Alert
|
||||
}
|
||||
|
||||
// MatchMute 如果传入了clock这个可选参数,就表示使用这个clock表示的时间,否则就从event的字段中取TriggerTime
|
||||
func MatchMute(event *models.AlertCurEvent, mute *models.AlertMute, clock ...int64) bool {
|
||||
func MatchMute(event *models.AlertCurEvent, mute *models.AlertMute, clock ...int64) (bool, error) {
|
||||
if mute.Disabled == 1 {
|
||||
return false
|
||||
return false, errors.New("mute is disabled")
|
||||
}
|
||||
|
||||
// 如果不是全局的,判断 匹配的 datasource id
|
||||
@@ -158,13 +160,13 @@ func MatchMute(event *models.AlertCurEvent, mute *models.AlertMute, clock ...int
|
||||
|
||||
// 判断 event.datasourceId 是否包含在 idm 中
|
||||
if _, has := idm[event.DatasourceId]; !has {
|
||||
return false
|
||||
return false, errors.New("datasource id not match")
|
||||
}
|
||||
}
|
||||
|
||||
if mute.MuteTimeType == models.TimeRange {
|
||||
if !mute.IsWithinTimeRange(event.TriggerTime) {
|
||||
return false
|
||||
return false, errors.New("event trigger time not within mute time range")
|
||||
}
|
||||
} else if mute.MuteTimeType == models.Periodic {
|
||||
ts := event.TriggerTime
|
||||
@@ -173,11 +175,11 @@ func MatchMute(event *models.AlertCurEvent, mute *models.AlertMute, clock ...int
|
||||
}
|
||||
|
||||
if !mute.IsWithinPeriodicMute(ts) {
|
||||
return false
|
||||
return false, errors.New("event trigger time not within periodic mute range")
|
||||
}
|
||||
} else {
|
||||
logger.Warningf("mute time type invalid, %d", mute.MuteTimeType)
|
||||
return false
|
||||
return false, errors.New("mute time type invalid")
|
||||
}
|
||||
|
||||
var matchSeverity bool
|
||||
@@ -193,12 +195,14 @@ func MatchMute(event *models.AlertCurEvent, mute *models.AlertMute, clock ...int
|
||||
}
|
||||
|
||||
if !matchSeverity {
|
||||
return false
|
||||
return false, errors.New("event severity not match mute severity")
|
||||
}
|
||||
|
||||
if mute.ITags == nil || len(mute.ITags) == 0 {
|
||||
return true
|
||||
return true, nil
|
||||
}
|
||||
|
||||
return common.MatchTags(event.TagsMap, mute.ITags)
|
||||
if !common.MatchTags(event.TagsMap, mute.ITags) {
|
||||
return false, errors.New("event tags not match mute tags")
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
|
||||
@@ -372,6 +372,8 @@ func (rt *Router) Config(r *gin.Engine) {
|
||||
pages.POST("/relabel-test", rt.auth(), rt.user(), rt.relabelTest)
|
||||
pages.POST("/busi-group/:id/alert-rules/clone", rt.auth(), rt.user(), rt.perm("/alert-rules/add"), rt.bgrw(), rt.cloneToMachine)
|
||||
pages.POST("/busi-groups/alert-rules/clones", rt.auth(), rt.user(), rt.perm("/alert-rules/add"), rt.batchAlertRuleClone)
|
||||
pages.POST("/busi-group/alert-rules/notify-tryrun", rt.auth(), rt.user(), rt.perm("/alert-rules/add"), rt.alertRuleNotifyTryRun)
|
||||
pages.POST("/busi-group/alert-rules/enable-tryrun", rt.auth(), rt.user(), rt.perm("/alert-rules/add"), rt.alertRuleEnableTryRun)
|
||||
|
||||
pages.GET("/busi-groups/recording-rules", rt.auth(), rt.user(), rt.perm("/recording-rules"), rt.recordingRuleGetsByGids)
|
||||
pages.GET("/busi-group/:id/recording-rules", rt.auth(), rt.user(), rt.perm("/recording-rules"), rt.recordingRuleGets)
|
||||
@@ -397,6 +399,7 @@ func (rt *Router) Config(r *gin.Engine) {
|
||||
pages.POST("/busi-group/:id/alert-subscribes", rt.auth(), rt.user(), rt.perm("/alert-subscribes/add"), rt.bgrw(), rt.alertSubscribeAdd)
|
||||
pages.PUT("/busi-group/:id/alert-subscribes", rt.auth(), rt.user(), rt.perm("/alert-subscribes/put"), rt.bgrw(), rt.alertSubscribePut)
|
||||
pages.DELETE("/busi-group/:id/alert-subscribes", rt.auth(), rt.user(), rt.perm("/alert-subscribes/del"), rt.bgrw(), rt.alertSubscribeDel)
|
||||
pages.POST("/alert-subscribe/alert-subscribes-tryrun", rt.auth(), rt.user(), rt.perm("/alert-subscribes/add"), rt.alertSubscribeTryRun)
|
||||
|
||||
pages.GET("/alert-cur-event/:eid", rt.alertCurEventGet)
|
||||
pages.GET("/alert-his-event/:eid", rt.alertHisEventGet)
|
||||
|
||||
@@ -11,6 +11,7 @@ import (
|
||||
|
||||
"gopkg.in/yaml.v2"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/alert/mute"
|
||||
"github.com/ccfos/nightingale/v6/models"
|
||||
"github.com/ccfos/nightingale/v6/pkg/strx"
|
||||
"github.com/ccfos/nightingale/v6/pushgw/pconf"
|
||||
@@ -18,6 +19,7 @@ import (
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/jinzhu/copier"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prometheus/prometheus/prompb"
|
||||
"github.com/toolkits/pkg/ginx"
|
||||
"github.com/toolkits/pkg/i18n"
|
||||
@@ -157,6 +159,120 @@ func (rt *Router) alertRuleAddByFE(c *gin.Context) {
|
||||
ginx.NewRender(c).Data(reterr, nil)
|
||||
}
|
||||
|
||||
type AlertRuleTryRunForm struct {
|
||||
EventId int64 `json:"event_id" binding:"required"`
|
||||
AlertRuleConfig models.AlertRule `json:"alert_rule_config" binding:"required"`
|
||||
}
|
||||
|
||||
func (rt *Router) alertRuleNotifyTryRun(c *gin.Context) {
|
||||
// check notify channels of old version
|
||||
var f AlertRuleTryRunForm
|
||||
ginx.BindJSON(c, &f)
|
||||
|
||||
hisEvent, err := models.AlertHisEventGetById(rt.Ctx, f.EventId)
|
||||
ginx.Dangerous(err)
|
||||
|
||||
if hisEvent == nil {
|
||||
ginx.Bomb(http.StatusNotFound, "event not found")
|
||||
}
|
||||
|
||||
curEvent := *hisEvent.ToCur()
|
||||
curEvent.SetTagsMap()
|
||||
|
||||
if f.AlertRuleConfig.NotifyVersion == 1 {
|
||||
for _, id := range f.AlertRuleConfig.NotifyRuleIds {
|
||||
notifyRule, err := models.GetNotifyRule(rt.Ctx, id)
|
||||
ginx.Dangerous(err)
|
||||
for _, notifyConfig := range notifyRule.NotifyConfigs {
|
||||
_, err = SendNotifyChannelMessage(rt.Ctx, rt.UserCache, rt.UserGroupCache, notifyConfig, []*models.AlertCurEvent{&curEvent})
|
||||
ginx.Dangerous(err)
|
||||
}
|
||||
}
|
||||
|
||||
ginx.NewRender(c).Data("notification test ok", nil)
|
||||
return
|
||||
}
|
||||
|
||||
if len(f.AlertRuleConfig.NotifyChannelsJSON) == 0 {
|
||||
ginx.Bomb(http.StatusOK, "no notify channels selected")
|
||||
}
|
||||
|
||||
if len(f.AlertRuleConfig.NotifyGroupsJSON) == 0 {
|
||||
ginx.Bomb(http.StatusOK, "no notify groups selected")
|
||||
}
|
||||
|
||||
ancs := make([]string, 0, len(curEvent.NotifyChannelsJSON))
|
||||
ugids := f.AlertRuleConfig.NotifyGroupsJSON
|
||||
ngids := make([]int64, 0)
|
||||
for i := 0; i < len(ugids); i++ {
|
||||
if gid, err := strconv.ParseInt(ugids[i], 10, 64); err == nil {
|
||||
ngids = append(ngids, gid)
|
||||
}
|
||||
}
|
||||
userGroups := rt.UserGroupCache.GetByUserGroupIds(ngids)
|
||||
uids := make([]int64, 0)
|
||||
for i := range userGroups {
|
||||
uids = append(uids, userGroups[i].UserIds...)
|
||||
}
|
||||
users := rt.UserCache.GetByUserIds(uids)
|
||||
for _, NotifyChannels := range curEvent.NotifyChannelsJSON {
|
||||
flag := true
|
||||
// ignore non-default channels
|
||||
switch NotifyChannels {
|
||||
case models.Dingtalk, models.Wecom, models.Feishu, models.Mm,
|
||||
models.Telegram, models.Email, models.FeishuCard:
|
||||
// do nothing
|
||||
default:
|
||||
continue
|
||||
}
|
||||
// default channels
|
||||
for ui := range users {
|
||||
if _, b := users[ui].ExtractToken(NotifyChannels); b {
|
||||
flag = false
|
||||
break
|
||||
}
|
||||
}
|
||||
if flag {
|
||||
ancs = append(ancs, NotifyChannels)
|
||||
}
|
||||
}
|
||||
if len(ancs) > 0 {
|
||||
ginx.Dangerous(errors.New(fmt.Sprintf("All users are missing notify channel configurations. Please check for missing tokens (each channel should be configured with at least one user). %v", ancs)))
|
||||
}
|
||||
|
||||
ginx.NewRender(c).Data("notification test ok", nil)
|
||||
}
|
||||
|
||||
func (rt *Router) alertRuleEnableTryRun(c *gin.Context) {
|
||||
// check notify channels of old version
|
||||
var f AlertRuleTryRunForm
|
||||
ginx.BindJSON(c, &f)
|
||||
|
||||
hisEvent, err := models.AlertHisEventGetById(rt.Ctx, f.EventId)
|
||||
ginx.Dangerous(err)
|
||||
|
||||
if hisEvent == nil {
|
||||
ginx.Bomb(http.StatusNotFound, "event not found")
|
||||
}
|
||||
|
||||
curEvent := *hisEvent.ToCur()
|
||||
curEvent.SetTagsMap()
|
||||
|
||||
if f.AlertRuleConfig.Disabled == 1 {
|
||||
ginx.Bomb(http.StatusOK, "rule is disabled")
|
||||
}
|
||||
|
||||
if mute.TimeSpanMuteStrategy(&f.AlertRuleConfig, &curEvent) {
|
||||
ginx.Bomb(http.StatusOK, "event is not match for period of time")
|
||||
}
|
||||
|
||||
if mute.BgNotMatchMuteStrategy(&f.AlertRuleConfig, &curEvent, rt.TargetCache) {
|
||||
ginx.Bomb(http.StatusOK, "event target busi group not match rule busi group")
|
||||
}
|
||||
|
||||
ginx.NewRender(c).Data("event is effective", nil)
|
||||
}
|
||||
|
||||
func (rt *Router) alertRuleAddByImport(c *gin.Context) {
|
||||
username := c.MustGet("username").(string)
|
||||
|
||||
|
||||
@@ -1,13 +1,18 @@
|
||||
package router
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/alert/common"
|
||||
"github.com/ccfos/nightingale/v6/models"
|
||||
"github.com/ccfos/nightingale/v6/pkg/strx"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/toolkits/pkg/ginx"
|
||||
)
|
||||
|
||||
@@ -104,6 +109,124 @@ func (rt *Router) alertSubscribeAdd(c *gin.Context) {
|
||||
ginx.NewRender(c).Message(f.Add(rt.Ctx))
|
||||
}
|
||||
|
||||
type SubscribeTryRunForm struct {
|
||||
EventId int64 `json:"event_id" binding:"required"`
|
||||
SubscribeConfig models.AlertSubscribe `json:"subscribe_config" binding:"required"`
|
||||
}
|
||||
|
||||
func (rt *Router) alertSubscribeTryRun(c *gin.Context) {
|
||||
var f SubscribeTryRunForm
|
||||
ginx.BindJSON(c, &f)
|
||||
|
||||
hisEvent, err := models.AlertHisEventGetById(rt.Ctx, f.EventId)
|
||||
ginx.Dangerous(err)
|
||||
|
||||
if hisEvent == nil {
|
||||
ginx.Bomb(http.StatusNotFound, "event not found")
|
||||
}
|
||||
|
||||
curEvent := *hisEvent.ToCur()
|
||||
curEvent.SetTagsMap()
|
||||
|
||||
// 先判断匹配条件
|
||||
if !f.SubscribeConfig.MatchCluster(curEvent.DatasourceId) {
|
||||
ginx.Dangerous(errors.New("Datasource mismatch"))
|
||||
}
|
||||
|
||||
// 匹配 tag
|
||||
f.SubscribeConfig.Parse()
|
||||
if !common.MatchTags(curEvent.TagsMap, f.SubscribeConfig.ITags) {
|
||||
ginx.Dangerous(errors.New("Tags mismatch"))
|
||||
}
|
||||
|
||||
// 匹配group name
|
||||
if !common.MatchGroupsName(curEvent.GroupName, f.SubscribeConfig.IBusiGroups) {
|
||||
ginx.Dangerous(errors.New("Group name mismatch"))
|
||||
}
|
||||
|
||||
// 检查严重级别(Severity)匹配
|
||||
if len(f.SubscribeConfig.SeveritiesJson) != 0 {
|
||||
match := false
|
||||
for _, s := range f.SubscribeConfig.SeveritiesJson {
|
||||
if s == curEvent.Severity || s == 0 {
|
||||
match = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !match {
|
||||
ginx.Dangerous(errors.New("Severity mismatch"))
|
||||
}
|
||||
}
|
||||
|
||||
// 新版本通知规则
|
||||
if f.SubscribeConfig.NotifyVersion == 1 {
|
||||
for _, id := range f.SubscribeConfig.NotifyRuleIds {
|
||||
notifyRule, err := models.GetNotifyRule(rt.Ctx, id)
|
||||
ginx.Dangerous(err)
|
||||
|
||||
for _, notifyConfig := range notifyRule.NotifyConfigs {
|
||||
_, err = SendNotifyChannelMessage(rt.Ctx, rt.UserCache, rt.UserGroupCache, notifyConfig, []*models.AlertCurEvent{&curEvent})
|
||||
ginx.Dangerous(err)
|
||||
}
|
||||
}
|
||||
|
||||
ginx.NewRender(c).Data("notification test ok", nil)
|
||||
return
|
||||
}
|
||||
|
||||
// 旧版通知方式
|
||||
f.SubscribeConfig.ModifyEvent(&curEvent)
|
||||
if len(curEvent.NotifyChannelsJSON) == 0 {
|
||||
ginx.Bomb(http.StatusBadRequest, "no notify channels selected")
|
||||
}
|
||||
|
||||
if len(curEvent.NotifyGroupsJSON) == 0 {
|
||||
ginx.Bomb(http.StatusOK, "no notify groups selected")
|
||||
}
|
||||
|
||||
ancs := make([]string, 0, len(curEvent.NotifyChannelsJSON))
|
||||
ugids := strings.Fields(f.SubscribeConfig.UserGroupIds)
|
||||
ngids := make([]int64, 0)
|
||||
for i := 0; i < len(ugids); i++ {
|
||||
if gid, err := strconv.ParseInt(ugids[i], 10, 64); err == nil {
|
||||
ngids = append(ngids, gid)
|
||||
}
|
||||
}
|
||||
|
||||
userGroups := rt.UserGroupCache.GetByUserGroupIds(ngids)
|
||||
uids := make([]int64, 0)
|
||||
for i := range userGroups {
|
||||
uids = append(uids, userGroups[i].UserIds...)
|
||||
}
|
||||
users := rt.UserCache.GetByUserIds(uids)
|
||||
for _, NotifyChannels := range curEvent.NotifyChannelsJSON {
|
||||
flag := true
|
||||
// ignore non-default channels
|
||||
switch NotifyChannels {
|
||||
case models.Dingtalk, models.Wecom, models.Feishu, models.Mm,
|
||||
models.Telegram, models.Email, models.FeishuCard:
|
||||
// do nothing
|
||||
default:
|
||||
continue
|
||||
}
|
||||
// default channels
|
||||
for ui := range users {
|
||||
if _, b := users[ui].ExtractToken(NotifyChannels); b {
|
||||
flag = false
|
||||
break
|
||||
}
|
||||
}
|
||||
if flag {
|
||||
ancs = append(ancs, NotifyChannels)
|
||||
}
|
||||
}
|
||||
if len(ancs) > 0 {
|
||||
ginx.Dangerous(errors.New(fmt.Sprintf("All users are missing notify channel configurations. Please check for missing tokens (each channel should be configured with at least one user). %v", ancs)))
|
||||
}
|
||||
|
||||
ginx.NewRender(c).Data("notification test ok", nil)
|
||||
}
|
||||
|
||||
func (rt *Router) alertSubscribePut(c *gin.Context) {
|
||||
var fs []models.AlertSubscribe
|
||||
ginx.BindJSON(c, &fs)
|
||||
|
||||
@@ -95,7 +95,9 @@ func (rt *Router) alertMuteTryRun(c *gin.Context) {
|
||||
f.AlertMute.Btime = 0 // 最小可能值(如 Unix 时间戳起点)
|
||||
f.AlertMute.Etime = math.MaxInt64 // 最大可能值(int64 上限)
|
||||
|
||||
if !mute.MatchMute(&curEvent, &f.AlertMute) {
|
||||
match, err := mute.MatchMute(&curEvent, &f.AlertMute)
|
||||
ginx.Dangerous(err)
|
||||
if !match {
|
||||
ginx.NewRender(c).Data("not match", nil)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -6,11 +6,12 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/alert/dispatch"
|
||||
"github.com/ccfos/nightingale/v6/memsto"
|
||||
"github.com/ccfos/nightingale/v6/models"
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
"github.com/ccfos/nightingale/v6/pkg/slice"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/toolkits/pkg/ginx"
|
||||
"github.com/toolkits/pkg/logger"
|
||||
)
|
||||
@@ -161,79 +162,89 @@ func (rt *Router) notifyTest(c *gin.Context) {
|
||||
ginx.Bomb(http.StatusBadRequest, "not events applicable")
|
||||
}
|
||||
|
||||
notifyChannels, err := models.NotifyChannelGets(rt.Ctx, f.NotifyConfig.ChannelID, "", "", -1)
|
||||
ginx.Dangerous(err)
|
||||
resp, err := SendNotifyChannelMessage(rt.Ctx, rt.UserCache, rt.UserGroupCache, f.NotifyConfig, events)
|
||||
ginx.NewRender(c).Data(resp, err)
|
||||
}
|
||||
|
||||
func SendNotifyChannelMessage(ctx *ctx.Context, userCache *memsto.UserCacheType, userGroup *memsto.UserGroupCacheType, notifyConfig models.NotifyConfig, events []*models.AlertCurEvent) (string, error) {
|
||||
notifyChannels, err := models.NotifyChannelGets(ctx, notifyConfig.ChannelID, "", "", -1)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to get notify channels: %v", err)
|
||||
}
|
||||
|
||||
if len(notifyChannels) == 0 {
|
||||
ginx.Bomb(http.StatusBadRequest, "notify channel not found")
|
||||
return "", fmt.Errorf("notify channel not found")
|
||||
}
|
||||
|
||||
notifyChannel := notifyChannels[0]
|
||||
|
||||
if !notifyChannel.Enable {
|
||||
ginx.Bomb(http.StatusBadRequest, "notify channel not enabled, please enable it first")
|
||||
return "", fmt.Errorf("notify channel not enabled, please enable it first")
|
||||
}
|
||||
|
||||
tplContent := make(map[string]interface{})
|
||||
if notifyChannel.RequestType != "flashtudy" {
|
||||
messageTemplates, err := models.MessageTemplateGets(rt.Ctx, f.NotifyConfig.TemplateID, "", "")
|
||||
ginx.Dangerous(err)
|
||||
if notifyChannel.RequestType != "flashduty" {
|
||||
messageTemplates, err := models.MessageTemplateGets(ctx, notifyConfig.TemplateID, "", "")
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to get message templates: %v", err)
|
||||
}
|
||||
|
||||
if len(messageTemplates) == 0 {
|
||||
ginx.Bomb(http.StatusBadRequest, "message template not found")
|
||||
return "", fmt.Errorf("message template not found")
|
||||
}
|
||||
tplContent = messageTemplates[0].RenderEvent(events)
|
||||
}
|
||||
|
||||
var contactKey string
|
||||
if notifyChannel.ParamConfig != nil && notifyChannel.ParamConfig.UserInfo != nil {
|
||||
contactKey = notifyChannel.ParamConfig.UserInfo.ContactKey
|
||||
}
|
||||
|
||||
sendtos, flashDutyChannelIDs, customParams := dispatch.GetNotifyConfigParams(&f.NotifyConfig, contactKey, rt.UserCache, rt.UserGroupCache)
|
||||
sendtos, flashDutyChannelIDs, customParams := dispatch.GetNotifyConfigParams(¬ifyConfig, contactKey, userCache, userGroup)
|
||||
|
||||
var resp string
|
||||
switch notifyChannel.RequestType {
|
||||
case "flashduty":
|
||||
client, err := models.GetHTTPClient(notifyChannel)
|
||||
ginx.Dangerous(err)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to get http client: %v", err)
|
||||
}
|
||||
|
||||
for i := range flashDutyChannelIDs {
|
||||
resp, err = notifyChannel.SendFlashDuty(events, flashDutyChannelIDs[i], client)
|
||||
if err != nil {
|
||||
break
|
||||
return "", fmt.Errorf("failed to send flashduty notify: %v", err)
|
||||
}
|
||||
}
|
||||
logger.Infof("channel_name: %v, event:%+v, tplContent:%s, customParams:%v, respBody: %v, err: %v", notifyChannel.Name, events[0], tplContent, customParams, resp, err)
|
||||
ginx.NewRender(c).Data(resp, err)
|
||||
return resp, nil
|
||||
case "http":
|
||||
client, err := models.GetHTTPClient(notifyChannel)
|
||||
ginx.Dangerous(err)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to get http client: %v", err)
|
||||
}
|
||||
|
||||
if notifyChannel.RequestConfig == nil {
|
||||
ginx.Bomb(http.StatusBadRequest, "request config not found")
|
||||
return "", fmt.Errorf("request config is nil")
|
||||
}
|
||||
|
||||
if notifyChannel.RequestConfig.HTTPRequestConfig == nil {
|
||||
ginx.Bomb(http.StatusBadRequest, "http request config not found")
|
||||
return "", fmt.Errorf("http request config is nil")
|
||||
}
|
||||
|
||||
if dispatch.NeedBatchContacts(notifyChannel.RequestConfig.HTTPRequestConfig) || len(sendtos) == 0 {
|
||||
resp, err = notifyChannel.SendHTTP(events, tplContent, customParams, sendtos, client)
|
||||
logger.Infof("channel_name: %v, event:%+v, sendtos:%+v, tplContent:%s, customParams:%v, respBody: %v, err: %v", notifyChannel.Name, events[0], sendtos, tplContent, customParams, resp, err)
|
||||
if err != nil {
|
||||
logger.Errorf("failed to send http notify: %v", err)
|
||||
return "", fmt.Errorf("failed to send http notify: %v", err)
|
||||
}
|
||||
ginx.NewRender(c).Data(resp, err)
|
||||
return resp, nil
|
||||
} else {
|
||||
for i := range sendtos {
|
||||
resp, err = notifyChannel.SendHTTP(events, tplContent, customParams, []string{sendtos[i]}, client)
|
||||
logger.Infof("channel_name: %v, event:%+v, tplContent:%s, customParams:%v, sendto:%+v, respBody: %v, err: %v", notifyChannel.Name, events[0], tplContent, customParams, sendtos[i], resp, err)
|
||||
if err != nil {
|
||||
logger.Errorf("failed to send http notify: %v", err)
|
||||
ginx.NewRender(c).Message(err)
|
||||
return
|
||||
return "", fmt.Errorf("failed to send http notify: %v", err)
|
||||
}
|
||||
}
|
||||
ginx.NewRender(c).Message(err)
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
case "smtp":
|
||||
@@ -241,14 +252,17 @@ func (rt *Router) notifyTest(c *gin.Context) {
|
||||
ginx.Bomb(http.StatusBadRequest, "No valid email address in the user and team")
|
||||
}
|
||||
err := notifyChannel.SendEmailNow(events, tplContent, sendtos)
|
||||
ginx.NewRender(c).Message(err)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to send email notify: %v", err)
|
||||
}
|
||||
return resp, nil
|
||||
case "script":
|
||||
resp, _, err := notifyChannel.SendScript(events, tplContent, customParams, sendtos)
|
||||
logger.Infof("channel_name: %v, event:%+v, tplContent:%s, customParams:%v, respBody: %v, err: %v", notifyChannel.Name, events[0], tplContent, customParams, resp, err)
|
||||
ginx.NewRender(c).Data(resp, err)
|
||||
return resp, err
|
||||
default:
|
||||
logger.Errorf("unsupported request type: %v", notifyChannel.RequestType)
|
||||
ginx.NewRender(c).Message(errors.New("unsupported request type"))
|
||||
return "", fmt.Errorf("unsupported request type")
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,17 +1,23 @@
|
||||
package eslike
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/md5"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/araddon/dateparse"
|
||||
"github.com/bitly/go-simplejson"
|
||||
"github.com/ccfos/nightingale/v6/dskit/sqlbase"
|
||||
"github.com/ccfos/nightingale/v6/memsto"
|
||||
"github.com/ccfos/nightingale/v6/models"
|
||||
"github.com/elastic/go-elasticsearch/v9"
|
||||
"github.com/elastic/go-elasticsearch/v9/esapi"
|
||||
"github.com/mitchellh/mapstructure"
|
||||
"github.com/olivere/elastic/v7"
|
||||
"github.com/prometheus/common/model"
|
||||
@@ -37,6 +43,11 @@ type Query struct {
|
||||
|
||||
Timeout int `json:"timeout" mapstructure:"timeout"`
|
||||
MaxShard int `json:"max_shard" mapstructure:"max_shard"`
|
||||
|
||||
QueryType string `json:"query_type" mapstructure:"query_type"`
|
||||
Query string `json:"query" mapstructure:"query"`
|
||||
CustomParams map[string]interface{} `json:"custom_params" mapstructure:"custom_params"`
|
||||
MaxQueryRows int `json:"max_query_rows" mapstructure:"max_query_rows"`
|
||||
}
|
||||
|
||||
type MetricAggr struct {
|
||||
@@ -548,6 +559,152 @@ func QueryData(ctx context.Context, queryParam interface{}, cliTimeout int64, ve
|
||||
return items, nil
|
||||
}
|
||||
|
||||
func QuerySQLData(ctx context.Context, queryParam interface{}, cliTimeout int64, version string, client *elasticsearch.Client) ([]models.DataResp, error) {
|
||||
param := new(Query)
|
||||
if err := mapstructure.Decode(queryParam, param); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if param.Timeout == 0 {
|
||||
param.Timeout = int(cliTimeout) / 1000
|
||||
}
|
||||
|
||||
// Prepare SQL query request
|
||||
query := map[string]interface{}{
|
||||
"query": param.Query,
|
||||
}
|
||||
|
||||
for k, v := range param.CustomParams {
|
||||
query[k] = v
|
||||
}
|
||||
|
||||
// Add timeout if specified
|
||||
if param.Timeout > 0 {
|
||||
query["request_timeout"] = fmt.Sprintf("%ds", param.Timeout)
|
||||
}
|
||||
|
||||
// Execute SQL query
|
||||
queryBytes, err := json.Marshal(query)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to marshal SQL query: %v", err)
|
||||
}
|
||||
|
||||
// Create a new request with context
|
||||
req := esapi.SQLQueryRequest{
|
||||
Body: bytes.NewReader(queryBytes),
|
||||
}
|
||||
|
||||
// Execute the request with context
|
||||
res, err := req.Do(ctx, client)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to execute SQL query: %v", err)
|
||||
}
|
||||
defer res.Body.Close()
|
||||
|
||||
// Parse response
|
||||
var result map[string]interface{}
|
||||
if err := json.NewDecoder(res.Body).Decode(&result); err != nil {
|
||||
return nil, fmt.Errorf("failed to parse SQL response: %v", err)
|
||||
}
|
||||
|
||||
// Check for errors in response
|
||||
if res.IsError() {
|
||||
return nil, fmt.Errorf("SQL query error: %s", result["error"].(map[string]interface{})["reason"])
|
||||
}
|
||||
|
||||
// Extract columns and rows
|
||||
columns := result["columns"].([]interface{})
|
||||
rows := result["rows"].([]interface{})
|
||||
|
||||
var dataResps []models.DataResp
|
||||
dataMap := make(map[string]*models.DataResp)
|
||||
|
||||
for _, row := range rows {
|
||||
rowData := row.([]interface{})
|
||||
labels := make(map[string]string)
|
||||
metricValue := make(map[string]float64)
|
||||
metricTs := make(map[string]float64)
|
||||
|
||||
// Process each column based on its role
|
||||
for i, col := range columns {
|
||||
colName := col.(map[string]interface{})["name"].(string)
|
||||
colType := col.(map[string]interface{})["type"].(string)
|
||||
value := rowData[i]
|
||||
|
||||
if colType == "datetime" {
|
||||
if ts, err := sqlbase.ParseTime(value, time.RFC3339Nano); err == nil {
|
||||
metricTs[colName] = float64(ts.Unix())
|
||||
} else {
|
||||
logger.Warningf("parse time error:%v", err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
switch v := value.(type) {
|
||||
case float64:
|
||||
metricValue[colName] = v
|
||||
case int64:
|
||||
metricValue[colName] = float64(v)
|
||||
default:
|
||||
labels[colName] = fmt.Sprintf("%v", value)
|
||||
}
|
||||
}
|
||||
|
||||
// Process metric values
|
||||
for metricName, value := range metricValue {
|
||||
metrics := make(model.Metric)
|
||||
var labelsStr []string
|
||||
|
||||
// Add labels
|
||||
for k, v := range labels {
|
||||
metrics[model.LabelName(k)] = model.LabelValue(v)
|
||||
labelsStr = append(labelsStr, fmt.Sprintf("%s=%s", k, v))
|
||||
}
|
||||
|
||||
// Add metric name
|
||||
metrics["__name__"] = model.LabelValue(metricName)
|
||||
labelsStr = append(labelsStr, fmt.Sprintf("__name__=%s", metricName))
|
||||
|
||||
// Create hash key for labels
|
||||
sort.Strings(labelsStr)
|
||||
labelsStrHash := fmt.Sprintf("%x", md5.Sum([]byte(strings.Join(labelsStr, ","))))
|
||||
|
||||
// Get timestamp
|
||||
var ts float64
|
||||
for id, timestamp := range metricTs {
|
||||
ts = timestamp
|
||||
if id == "time" {
|
||||
break
|
||||
}
|
||||
}
|
||||
if ts == 0 {
|
||||
ts = float64(time.Now().Unix())
|
||||
}
|
||||
|
||||
// Create or update DataResp
|
||||
valuePair := []float64{ts, value}
|
||||
if existing, ok := dataMap[labelsStrHash]; ok {
|
||||
existing.Values = append(existing.Values, valuePair)
|
||||
} else {
|
||||
dataResp := models.DataResp{
|
||||
Ref: param.Ref,
|
||||
Metric: metrics,
|
||||
Values: [][]float64{valuePair},
|
||||
}
|
||||
dataMap[labelsStrHash] = &dataResp
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Convert map to slice and sort values
|
||||
for _, v := range dataMap {
|
||||
sort.Slice(v.Values, func(i, j int) bool { return v.Values[i][0] < v.Values[j][0] })
|
||||
dataResps = append(dataResps, *v)
|
||||
}
|
||||
|
||||
return dataResps, nil
|
||||
}
|
||||
|
||||
func HitFilter(typ string) bool {
|
||||
switch typ {
|
||||
case "keyword", "date", "long", "integer", "short", "byte", "double", "float", "half_float", "scaled_float", "unsigned_long":
|
||||
@@ -651,3 +808,83 @@ func QueryLog(ctx context.Context, queryParam interface{}, timeout int64, versio
|
||||
|
||||
return ret, total, nil
|
||||
}
|
||||
|
||||
func QuerySQLLog(ctx context.Context, queryParam interface{}, timeout int64, version string, client *elasticsearch.Client) ([]interface{}, int64, error) {
|
||||
param := new(Query)
|
||||
if err := mapstructure.Decode(queryParam, param); err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
if param.Timeout == 0 {
|
||||
param.Timeout = int(timeout) / 1000
|
||||
}
|
||||
|
||||
// Prepare SQL query request
|
||||
query := map[string]interface{}{
|
||||
"query": param.Query,
|
||||
}
|
||||
|
||||
for k, v := range param.CustomParams {
|
||||
query[k] = v
|
||||
}
|
||||
|
||||
// Add timeout if specified
|
||||
if param.Timeout > 0 {
|
||||
query["request_timeout"] = fmt.Sprintf("%ds", param.Timeout)
|
||||
}
|
||||
|
||||
// Add max rows limit
|
||||
if param.MaxQueryRows > 0 {
|
||||
query["fetch_size"] = param.MaxQueryRows
|
||||
}
|
||||
|
||||
// Execute SQL query
|
||||
queryBytes, err := json.Marshal(query)
|
||||
if err != nil {
|
||||
return nil, 0, fmt.Errorf("failed to marshal SQL query: %v", err)
|
||||
}
|
||||
|
||||
// Create a new request with context
|
||||
req := esapi.SQLQueryRequest{
|
||||
Body: bytes.NewReader(queryBytes),
|
||||
}
|
||||
|
||||
// Execute the request with context
|
||||
res, err := req.Do(ctx, client)
|
||||
if err != nil {
|
||||
return nil, 0, fmt.Errorf("failed to execute SQL query: %v", err)
|
||||
}
|
||||
defer res.Body.Close()
|
||||
|
||||
// Parse response
|
||||
var result map[string]interface{}
|
||||
if err := json.NewDecoder(res.Body).Decode(&result); err != nil {
|
||||
return nil, 0, fmt.Errorf("failed to parse SQL response: %v", err)
|
||||
}
|
||||
|
||||
// Check for errors in response
|
||||
if res.IsError() {
|
||||
return nil, 0, fmt.Errorf("SQL query error: %s", result["error"].(map[string]interface{})["reason"])
|
||||
}
|
||||
|
||||
// Extract columns and rows
|
||||
columns := result["columns"].([]interface{})
|
||||
rows := result["rows"].([]interface{})
|
||||
|
||||
// Convert rows to interface slice
|
||||
var ret []interface{}
|
||||
for _, row := range rows {
|
||||
rowData := row.([]interface{})
|
||||
rowMap := make(map[string]interface{})
|
||||
|
||||
// Convert row data to a map
|
||||
for i, col := range columns {
|
||||
colName := col.(map[string]interface{})["name"].(string)
|
||||
rowMap[colName] = rowData[i]
|
||||
}
|
||||
|
||||
ret = append(ret, rowMap)
|
||||
}
|
||||
|
||||
return ret, 0, nil
|
||||
}
|
||||
|
||||
154
datasource/commons/eslike/eslike_test.go
Normal file
154
datasource/commons/eslike/eslike_test.go
Normal file
@@ -0,0 +1,154 @@
|
||||
package eslike
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/dskit/sqlbase"
|
||||
"github.com/elastic/go-elasticsearch/v9"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
// TestConfig holds configuration for ES tests
|
||||
type TestConfig struct {
|
||||
ESAddress string
|
||||
Username string
|
||||
Password string
|
||||
}
|
||||
|
||||
var testConfig = TestConfig{
|
||||
ESAddress: "http://localhost:9200", // Default test ES address
|
||||
Username: "elastic", // Add your test ES username
|
||||
Password: "*", // Add your test ES password
|
||||
}
|
||||
|
||||
// setupTestESClient creates a real ES client for integration tests
|
||||
func setupTestESClient(t *testing.T) *elasticsearch.Client {
|
||||
cfg := elasticsearch.Config{
|
||||
Addresses: []string{testConfig.ESAddress},
|
||||
}
|
||||
if testConfig.Username != "" && testConfig.Password != "" {
|
||||
cfg.Username = testConfig.Username
|
||||
cfg.Password = testConfig.Password
|
||||
}
|
||||
|
||||
client, err := elasticsearch.NewClient(cfg)
|
||||
assert.NoError(t, err)
|
||||
return client
|
||||
}
|
||||
|
||||
func TestQuerySQLData(t *testing.T) {
|
||||
client := setupTestESClient(t)
|
||||
if client == nil {
|
||||
t.Skip("Skipping test: ES client not available")
|
||||
}
|
||||
|
||||
// Test query parameters
|
||||
query := &Query{
|
||||
Query: "SELECT * FROM library",
|
||||
Timeout: 30,
|
||||
QueryType: "SQL",
|
||||
CustomParams: make(map[string]interface{}),
|
||||
MaxQueryRows: 2,
|
||||
}
|
||||
|
||||
// Execute query
|
||||
results, err := QuerySQLData(context.Background(), query, 30, "7.0", client)
|
||||
assert.NoError(t, err)
|
||||
assert.NotNil(t, results)
|
||||
|
||||
// Print results
|
||||
fmt.Printf("\nQuerySQLData Results:\n")
|
||||
for i, result := range results {
|
||||
fmt.Printf("Result %d:\n", i+1)
|
||||
fmt.Printf(" Ref: %s\n", result.Ref)
|
||||
fmt.Printf(" Metric: %v\n", result.Metric)
|
||||
fmt.Printf(" Values: %v\n", result.Values)
|
||||
if result.Query != "" {
|
||||
fmt.Printf(" Query: %s\n", result.Query)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestQuerySQLLog(t *testing.T) {
|
||||
client := setupTestESClient(t)
|
||||
if client == nil {
|
||||
t.Skip("Skipping test: ES client not available")
|
||||
}
|
||||
|
||||
// Test query parameters
|
||||
query := &Query{
|
||||
Query: "SELECT * FROM library",
|
||||
Timeout: 30,
|
||||
QueryType: "SQL",
|
||||
CustomParams: make(map[string]interface{}),
|
||||
MaxQueryRows: 2,
|
||||
}
|
||||
|
||||
// Execute query
|
||||
results, total, err := QuerySQLLog(context.Background(), query, 30, "7.0", client)
|
||||
assert.NoError(t, err)
|
||||
assert.NotNil(t, results)
|
||||
assert.GreaterOrEqual(t, total, int64(0))
|
||||
|
||||
// Print results
|
||||
fmt.Printf("\nQuerySQLLog Results:\n")
|
||||
fmt.Printf("Total: %d\n", total)
|
||||
for i, result := range results {
|
||||
fmt.Printf("Result %d:\n", i+1)
|
||||
// Pretty print the result map
|
||||
if resultMap, ok := result.(map[string]interface{}); ok {
|
||||
prettyJSON, err := json.MarshalIndent(resultMap, "", " ")
|
||||
if err == nil {
|
||||
fmt.Printf(" %s\n", string(prettyJSON))
|
||||
} else {
|
||||
fmt.Printf(" %v\n", resultMap)
|
||||
}
|
||||
} else {
|
||||
fmt.Printf(" %v\n", result)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseTime(t *testing.T) {
|
||||
testCases := []struct {
|
||||
name string
|
||||
input string
|
||||
expected time.Time
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "RFC3339Nano format",
|
||||
input: "2024-01-01T00:00:00.000Z",
|
||||
expected: time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC),
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "Unix timestamp",
|
||||
input: "1704067200",
|
||||
expected: time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC),
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
name: "Invalid format",
|
||||
input: "invalid-time",
|
||||
expected: time.Time{},
|
||||
wantErr: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
result, err := sqlbase.ParseTime(tc.input, time.RFC3339Nano)
|
||||
if tc.wantErr {
|
||||
assert.Error(t, err)
|
||||
} else {
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, tc.expected.UTC(), result.UTC())
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -17,6 +17,7 @@ import (
|
||||
"github.com/ccfos/nightingale/v6/models"
|
||||
"github.com/ccfos/nightingale/v6/pkg/tlsx"
|
||||
|
||||
"github.com/elastic/go-elasticsearch/v9"
|
||||
"github.com/mitchellh/mapstructure"
|
||||
"github.com/olivere/elastic/v7"
|
||||
"github.com/toolkits/pkg/logger"
|
||||
@@ -27,18 +28,20 @@ const (
|
||||
)
|
||||
|
||||
type Elasticsearch struct {
|
||||
Addr string `json:"es.addr" mapstructure:"es.addr"`
|
||||
Nodes []string `json:"es.nodes" mapstructure:"es.nodes"`
|
||||
Timeout int64 `json:"es.timeout" mapstructure:"es.timeout"` // millis
|
||||
Basic BasicAuth `json:"es.basic" mapstructure:"es.basic"`
|
||||
TLS TLS `json:"es.tls" mapstructure:"es.tls"`
|
||||
Version string `json:"es.version" mapstructure:"es.version"`
|
||||
Headers map[string]string `json:"es.headers" mapstructure:"es.headers"`
|
||||
MinInterval int `json:"es.min_interval" mapstructure:"es.min_interval"` // seconds
|
||||
MaxShard int `json:"es.max_shard" mapstructure:"es.max_shard"`
|
||||
ClusterName string `json:"es.cluster_name" mapstructure:"es.cluster_name"`
|
||||
EnableWrite bool `json:"es.enable_write" mapstructure:"es.enable_write"` // 允许写操作
|
||||
Client *elastic.Client `json:"es.client" mapstructure:"es.client"`
|
||||
Addr string `json:"es.addr" mapstructure:"es.addr"`
|
||||
Nodes []string `json:"es.nodes" mapstructure:"es.nodes"`
|
||||
Timeout int64 `json:"es.timeout" mapstructure:"es.timeout"` // millis
|
||||
Basic BasicAuth `json:"es.basic" mapstructure:"es.basic"`
|
||||
TLS TLS `json:"es.tls" mapstructure:"es.tls"`
|
||||
Version string `json:"es.version" mapstructure:"es.version"`
|
||||
Headers map[string]string `json:"es.headers" mapstructure:"es.headers"`
|
||||
MinInterval int `json:"es.min_interval" mapstructure:"es.min_interval"` // seconds
|
||||
MaxShard int `json:"es.max_shard" mapstructure:"es.max_shard"`
|
||||
ClusterName string `json:"es.cluster_name" mapstructure:"es.cluster_name"`
|
||||
EnableWrite bool `json:"es.enable_write" mapstructure:"es.enable_write"` // 允许写操作
|
||||
Client *elastic.Client `json:"es.client" mapstructure:"es.client"`
|
||||
NewClient *elasticsearch.Client `json:"es.new_client" mapstructure:"es.new_client"`
|
||||
MaxQueryRows int `json:"es.max_query_rows" mapstructure:"es.max_query_rows"`
|
||||
}
|
||||
|
||||
type TLS struct {
|
||||
@@ -106,6 +109,28 @@ func (e *Elasticsearch) InitClient() error {
|
||||
options = append(options, elastic.SetHealthcheck(false))
|
||||
|
||||
e.Client, err = elastic.NewClient(options...)
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
cfg := elasticsearch.Config{
|
||||
Addresses: e.Nodes,
|
||||
Transport: transport,
|
||||
Header: http.Header{},
|
||||
}
|
||||
|
||||
if e.Basic.Username != "" && e.Basic.Password != "" {
|
||||
cfg.Username = e.Basic.Username
|
||||
cfg.Password = e.Basic.Password
|
||||
}
|
||||
|
||||
for k, v := range e.Headers {
|
||||
cfg.Header[k] = []string{v}
|
||||
}
|
||||
|
||||
e.NewClient, err = elasticsearch.NewClient(cfg)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -183,6 +208,14 @@ func (e *Elasticsearch) MakeTSQuery(ctx context.Context, query interface{}, even
|
||||
}
|
||||
|
||||
func (e *Elasticsearch) QueryData(ctx context.Context, queryParam interface{}) ([]models.DataResp, error) {
|
||||
param := new(eslike.Query)
|
||||
if err := mapstructure.Decode(queryParam, param); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if param.QueryType == "SQL" {
|
||||
return eslike.QuerySQLData(ctx, param, e.Timeout, e.Version, e.NewClient)
|
||||
}
|
||||
|
||||
search := func(ctx context.Context, indices []string, source interface{}, timeout int, maxShard int) (*elastic.SearchResult, error) {
|
||||
return e.Client.Search().
|
||||
@@ -250,6 +283,20 @@ func (e *Elasticsearch) QueryFields(indexs []string) ([]string, error) {
|
||||
}
|
||||
|
||||
func (e *Elasticsearch) QueryLog(ctx context.Context, queryParam interface{}) ([]interface{}, int64, error) {
|
||||
param := new(eslike.Query)
|
||||
if err := mapstructure.Decode(queryParam, param); err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
if param.QueryType == "SQL" {
|
||||
if param.CustomParams == nil {
|
||||
param.CustomParams = make(map[string]interface{})
|
||||
}
|
||||
if e.MaxQueryRows > 0 {
|
||||
param.MaxQueryRows = e.MaxQueryRows
|
||||
}
|
||||
return eslike.QuerySQLLog(ctx, param, e.Timeout, e.Version, e.NewClient)
|
||||
}
|
||||
|
||||
search := func(ctx context.Context, indices []string, source interface{}, timeout int, maxShard int) (*elastic.SearchResult, error) {
|
||||
// 应该是之前为了获取 fields 字段,做的这个兼容
|
||||
|
||||
@@ -142,6 +142,7 @@ func esN9eToDatasourceInfo(ds *datasource.DatasourceInfo, item models.Datasource
|
||||
ds.Settings["es.min_interval"] = item.SettingsJson["min_interval"]
|
||||
ds.Settings["es.max_shard"] = item.SettingsJson["max_shard"]
|
||||
ds.Settings["es.enable_write"] = item.SettingsJson["enable_write"]
|
||||
ds.Settings["es.max_query_rows"] = item.SettingsJson["max_query_rows"]
|
||||
}
|
||||
|
||||
// for opensearch
|
||||
|
||||
15
go.mod
15
go.mod
@@ -1,6 +1,8 @@
|
||||
module github.com/ccfos/nightingale/v6
|
||||
|
||||
go 1.22
|
||||
go 1.23
|
||||
|
||||
toolchain go1.24.3
|
||||
|
||||
require (
|
||||
github.com/BurntSushi/toml v1.4.0
|
||||
@@ -12,6 +14,7 @@ require (
|
||||
github.com/coreos/go-oidc v2.2.1+incompatible
|
||||
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc
|
||||
github.com/dgrijalva/jwt-go v3.2.0+incompatible
|
||||
github.com/elastic/go-elasticsearch/v9 v9.0.0
|
||||
github.com/expr-lang/expr v1.16.1
|
||||
github.com/flashcatcloud/ibex v1.3.5
|
||||
github.com/gin-contrib/pprof v1.4.0
|
||||
@@ -68,8 +71,8 @@ require (
|
||||
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
|
||||
github.com/segmentio/asm v1.2.0 // indirect
|
||||
github.com/shopspring/decimal v1.4.0 // indirect
|
||||
go.opentelemetry.io/otel v1.32.0 // indirect
|
||||
go.opentelemetry.io/otel/trace v1.32.0 // indirect
|
||||
go.opentelemetry.io/otel v1.35.0 // indirect
|
||||
go.opentelemetry.io/otel/trace v1.35.0 // indirect
|
||||
)
|
||||
|
||||
require (
|
||||
@@ -79,7 +82,10 @@ require (
|
||||
github.com/eapache/go-resiliency v1.7.0 // indirect
|
||||
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect
|
||||
github.com/eapache/queue v1.1.0 // indirect
|
||||
github.com/elastic/elastic-transport-go/v8 v8.7.0 // indirect
|
||||
github.com/glebarez/go-sqlite v1.21.2 // indirect
|
||||
github.com/go-logr/logr v1.4.2 // indirect
|
||||
github.com/go-logr/stdr v1.2.2 // indirect
|
||||
github.com/hashicorp/errwrap v1.1.0 // indirect
|
||||
github.com/hashicorp/go-multierror v1.1.1 // indirect
|
||||
github.com/hashicorp/go-uuid v1.0.3 // indirect
|
||||
@@ -92,10 +98,11 @@ require (
|
||||
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
|
||||
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
|
||||
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
|
||||
github.com/rogpeppe/go-internal v1.13.1 // indirect
|
||||
github.com/valyala/fastrand v1.1.0 // indirect
|
||||
github.com/valyala/histogram v1.2.0 // indirect
|
||||
github.com/yuin/gopher-lua v1.1.1 // indirect
|
||||
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
|
||||
go.opentelemetry.io/otel/metric v1.35.0 // indirect
|
||||
golang.org/x/sync v0.10.0 // indirect
|
||||
modernc.org/libc v1.22.5 // indirect
|
||||
modernc.org/mathutil v1.5.0 // indirect
|
||||
|
||||
27
go.sum
27
go.sum
@@ -70,6 +70,10 @@ github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4A
|
||||
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0=
|
||||
github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
|
||||
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
|
||||
github.com/elastic/elastic-transport-go/v8 v8.7.0 h1:OgTneVuXP2uip4BA658Xi6Hfw+PeIOod2rY3GVMGoVE=
|
||||
github.com/elastic/elastic-transport-go/v8 v8.7.0/go.mod h1:YLHer5cj0csTzNFXoNQ8qhtGY1GTvSqPnKWKaqQE3Hk=
|
||||
github.com/elastic/go-elasticsearch/v9 v9.0.0 h1:krpgPeJ2lC8apkaw6B58gKDYJq5eUhP8AMwpPt01Q/U=
|
||||
github.com/elastic/go-elasticsearch/v9 v9.0.0/go.mod h1:2PB5YQPpY5tWbF65MRqzEXA31PZOdXCkloQSOZtU14I=
|
||||
github.com/expr-lang/expr v1.16.1 h1:Na8CUcMdyGbnNpShY7kzcHCU7WqxuL+hnxgHZ4vaz/A=
|
||||
github.com/expr-lang/expr v1.16.1/go.mod h1:uCkhfG+x7fcZ5A5sXHKuQ07jGZRl6J0FCAaf2k4PtVQ=
|
||||
github.com/fatih/camelcase v1.0.0 h1:hxNvNX/xYBp0ovncs8WyWZrOrpBNub/JfaMvbURyft8=
|
||||
@@ -106,6 +110,11 @@ github.com/go-ldap/ldap/v3 v3.4.4 h1:qPjipEpt+qDa6SI/h1fzuGWoRUY+qqQ9sOZq67/PYUs
|
||||
github.com/go-ldap/ldap/v3 v3.4.4/go.mod h1:fe1MsuN5eJJ1FeLT/LEBVdWfNWKh459R7aXgXtJC+aI=
|
||||
github.com/go-logfmt/logfmt v0.6.0 h1:wGYYu3uicYdqXVgoYbvnkrPVXkuLM1p1ifugDMEdRi4=
|
||||
github.com/go-logfmt/logfmt v0.6.0/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs=
|
||||
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
|
||||
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
|
||||
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
|
||||
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
|
||||
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
|
||||
github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4=
|
||||
github.com/go-playground/assert/v2 v2.2.0 h1:JvknZsQTYeFEAhQwI4qEt9cyV5ONwRHC+lYKSsYSR8s=
|
||||
github.com/go-playground/assert/v2 v2.2.0/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4=
|
||||
@@ -139,8 +148,8 @@ github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
|
||||
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
|
||||
github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
|
||||
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
|
||||
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
|
||||
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
|
||||
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
|
||||
github.com/google/pprof v0.0.0-20230705174524-200ffdc848b8 h1:n6vlPhxsA+BW/XsS5+uqi7GyzaLa5MH7qlSLBZtRdiA=
|
||||
github.com/google/pprof v0.0.0-20230705174524-200ffdc848b8/go.mod h1:Jh3hGz2jkYak8qXPD19ryItVnUgpgeqzdkY/D0EaeuA=
|
||||
@@ -350,10 +359,16 @@ github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5t
|
||||
github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M=
|
||||
github.com/yuin/gopher-lua v1.1.1/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw=
|
||||
go.mongodb.org/mongo-driver v1.11.4/go.mod h1:PTSz5yu21bkT/wXpkS7WR5f0ddqw5quethTUn9WM+2g=
|
||||
go.opentelemetry.io/otel v1.32.0 h1:WnBN+Xjcteh0zdk01SVqV55d/m62NJLJdIyb4y/WO5U=
|
||||
go.opentelemetry.io/otel v1.32.0/go.mod h1:00DCVSB0RQcnzlwyTfqtxSm+DRr9hpYrHjNGiBHVQIg=
|
||||
go.opentelemetry.io/otel/trace v1.32.0 h1:WIC9mYrXf8TmY/EXuULKc8hR17vE+Hjv2cssQDe03fM=
|
||||
go.opentelemetry.io/otel/trace v1.32.0/go.mod h1:+i4rkvCraA+tG6AzwloGaCtkx53Fa+L+V8e9a7YvhT8=
|
||||
go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA=
|
||||
go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A=
|
||||
go.opentelemetry.io/otel v1.35.0 h1:xKWKPxrxB6OtMCbmMY021CqC45J+3Onta9MqjhnusiQ=
|
||||
go.opentelemetry.io/otel v1.35.0/go.mod h1:UEqy8Zp11hpkUrL73gSlELM0DupHoiq72dR+Zqel/+Y=
|
||||
go.opentelemetry.io/otel/metric v1.35.0 h1:0znxYu2SNyuMSQT4Y9WDWej0VpcsxkuklLa4/siN90M=
|
||||
go.opentelemetry.io/otel/metric v1.35.0/go.mod h1:nKVFgxBZ2fReX6IlyW28MgZojkoAkJGaE8CpgeAU3oE=
|
||||
go.opentelemetry.io/otel/sdk v1.24.0 h1:YMPPDNymmQN3ZgczicBY3B6sf9n62Dlj9pWD3ucgoDw=
|
||||
go.opentelemetry.io/otel/sdk v1.24.0/go.mod h1:KVrIYw6tEubO9E96HQpcmpTKDVn9gdv35HoYiQWGDFg=
|
||||
go.opentelemetry.io/otel/trace v1.35.0 h1:dPpEfJu1sDIqruz7BHFG3c7528f6ddfSWfFDVt/xgMs=
|
||||
go.opentelemetry.io/otel/trace v1.35.0/go.mod h1:WUk7DtFp1Aw2MkvqGdwiXYDZZNvA/1J8o6xRXLrIkyc=
|
||||
go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
|
||||
go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
|
||||
go.uber.org/automaxprocs v1.4.0/go.mod h1:/mTEdr7LvHhs0v7mjdxDreTz1OG5zdZGqgOnhWiR/+Q=
|
||||
|
||||
Reference in New Issue
Block a user