Compare commits

..

25 Commits

Author SHA1 Message Date
ning
f6a857f030 refactor: optimize event processor 2025-06-12 16:42:46 +08:00
Yening Qin
85786d985d feat: add ai summary event processor (#2734)
Co-authored-by: Haobo Zhang <43698160+haobo8@users.noreply.github.com>
2025-06-12 11:33:59 +08:00
Yening Qin
cff211364a feat: support postgresql alert (#2732) 2025-06-11 17:43:34 +08:00
Ulric Qin
0190b2b432 Merge branch 'main' of https://github.com/ccfos/nightingale 2025-06-11 11:50:47 +08:00
Ulric Qin
d8081129f1 replace blank in append_tags 2025-06-11 11:50:37 +08:00
ning
66d4d0c494 refactor: event api perm check 2025-06-11 11:37:18 +08:00
ning
d936d57863 refactor: event api perm check 2025-06-11 11:30:27 +08:00
ning
d819691b78 refactor: change event processor api log 2025-06-10 16:54:52 +08:00
ning
6f0b415821 refactor: mysql set default maxQueryRows 2025-06-09 17:36:33 +08:00
ning
f482efd9ce refactor: add alert rule func 2025-06-09 10:13:14 +08:00
ning
b39d5a742e refactor: event pipline tryrun api 2025-06-08 23:03:03 +08:00
ning
59c3d62c6b refactor: mysql datasource param 2025-06-06 19:23:49 +08:00
ning
624ae125d5 Merge branch 'main' of github.com:ccfos/nightingale 2025-06-06 19:08:10 +08:00
ning
b9c822b220 refactor: mysql datasource param 2025-06-06 19:07:42 +08:00
smx_Morgan
c13baf3a9d refactor : add smtp notify test (#2723) 2025-06-06 18:07:49 +08:00
ning
bc46ff1912 fix: original_tags is nil 2025-06-06 17:38:27 +08:00
ning
2f7c76c275 refactor: message tpl add 2025-06-06 15:45:25 +08:00
Yening Qin
1edf305952 feat: support mysql alert (#2725) 2025-06-06 15:26:22 +08:00
Ulric Qin
c026a6d2b2 update README 2025-06-06 08:47:19 +08:00
smx_Morgan
1853e89f7c feat: add alert history events delete api (#2720) 2025-06-05 19:02:46 +08:00
zjxpsetp
a41a00fba3 Merge remote-tracking branch 'origin/main' 2025-06-05 00:00:36 +08:00
zjxpsetp
ceb9a1d7ff update JAVA for jvm dashboard by opentelementry 2025-06-04 23:58:26 +08:00
710leo
0b5223acdb docs: update postgres sql 2025-06-04 23:02:30 +08:00
710leo
4b63c6b4b1 refactor: change event_pipeline column type 2025-06-04 22:51:19 +08:00
zjxpsetp
edd024306a update JAVA for jvm dashboard by opentelementry 2025-06-03 23:43:35 +08:00
47 changed files with 3715 additions and 422 deletions

View File

@@ -58,7 +58,7 @@
- 想要把公司的多套监控系统产生的事件聚拢到一个平台,统一做收敛降噪、响应处理、数据分析
- 想要支持人员的排班,践行 On-call 文化,想要支持告警认领、升级(避免遗漏)、协同处理
那夜莺是不合适的,您需要的是 [PagerDuty](https://www.pagerduty.com/) 或 [FlashDuty](https://flashcat.cloud/product/flashcat-duty/) (产品易用,且有免费套餐)这样的 On-call 产品。
那夜莺是不合适的,推荐您选用 [FlashDuty](https://flashcat.cloud/product/flashcat-duty/) 这样的 On-call 产品,产品简单易用,也有免费套餐
## 相关资料 & 交流渠道

View File

@@ -82,10 +82,6 @@ func NewDispatch(alertRuleCache *memsto.AlertRuleCacheType, userCache *memsto.Us
}
pipeline.Init()
// 设置通知记录回调函数
notifyChannelCache.SetNotifyRecordFunc(sender.NotifyRecord)
return notify
}
@@ -189,8 +185,8 @@ func (e *Dispatch) HandleEventWithNotifyRule(eventOrigin *models.AlertCurEvent)
for _, processor := range processors {
logger.Infof("before processor notify_id: %d, event:%+v, processor:%+v", notifyRuleId, eventCopy, processor)
eventCopy = processor.Process(e.ctx, eventCopy)
logger.Infof("after processor notify_id: %d, event:%+v, processor:%+v", notifyRuleId, eventCopy, processor)
eventCopy, res, err := processor.Process(e.ctx, eventCopy)
logger.Infof("after processor notify_id: %d, event:%+v, processor:%+v, res:%v, err:%v", notifyRuleId, eventCopy, processor, res, err)
if eventCopy == nil {
logger.Warningf("notify_id: %d, event:%+v, processor:%+v, event is nil", notifyRuleId, eventCopy, processor)
break
@@ -455,25 +451,30 @@ func (e *Dispatch) sendV2(events []*models.AlertCurEvent, notifyRuleId int64, no
logger.Infof("notify_id: %d, channel_name: %v, event:%+v, IntegrationUrl: %v dutychannel_id: %v, respBody: %v, err: %v", notifyRuleId, notifyChannel.Name, events[0], notifyChannel.RequestConfig.FlashDutyRequestConfig.IntegrationUrl, flashDutyChannelIDs[i], respBody, err)
sender.NotifyRecord(e.ctx, events, notifyRuleId, notifyChannel.Name, strconv.FormatInt(flashDutyChannelIDs[i], 10), respBody, err)
}
return
case "http":
// 使用队列模式处理 http 通知
// 创建通知任务
task := &memsto.NotifyTask{
Events: events,
NotifyRuleId: notifyRuleId,
NotifyChannel: notifyChannel,
TplContent: tplContent,
CustomParams: customParams,
Sendtos: sendtos,
if e.notifyChannelCache.HttpConcurrencyAdd(notifyChannel.ID) {
defer e.notifyChannelCache.HttpConcurrencyDone(notifyChannel.ID)
}
if notifyChannel.RequestConfig == nil {
logger.Warningf("notify_id: %d, channel_name: %v, event:%+v, request config not found", notifyRuleId, notifyChannel.Name, events[0])
}
// 将任务加入队列
success := e.notifyChannelCache.EnqueueNotifyTask(task)
if !success {
logger.Errorf("failed to enqueue notify task for channel %d, notify_id: %d", notifyChannel.ID, notifyRuleId)
// 如果入队失败,记录错误通知
sender.NotifyRecord(e.ctx, events, notifyRuleId, notifyChannel.Name, getSendTarget(customParams, sendtos), "", errors.New("failed to enqueue notify task, queue is full"))
if notifyChannel.RequestConfig.HTTPRequestConfig == nil {
logger.Warningf("notify_id: %d, channel_name: %v, event:%+v, http request config not found", notifyRuleId, notifyChannel.Name, events[0])
}
if NeedBatchContacts(notifyChannel.RequestConfig.HTTPRequestConfig) || len(sendtos) == 0 {
resp, err := notifyChannel.SendHTTP(events, tplContent, customParams, sendtos, e.notifyChannelCache.GetHttpClient(notifyChannel.ID))
logger.Infof("notify_id: %d, channel_name: %v, event:%+v, tplContent:%s, customParams:%v, userInfo:%+v, respBody: %v, err: %v", notifyRuleId, notifyChannel.Name, events[0], tplContent, customParams, sendtos, resp, err)
sender.NotifyRecord(e.ctx, events, notifyRuleId, notifyChannel.Name, getSendTarget(customParams, sendtos), resp, err)
} else {
for i := range sendtos {
resp, err := notifyChannel.SendHTTP(events, tplContent, customParams, []string{sendtos[i]}, e.notifyChannelCache.GetHttpClient(notifyChannel.ID))
logger.Infof("notify_id: %d, channel_name: %v, event:%+v, tplContent:%s, customParams:%v, userInfo:%+v, respBody: %v, err: %v", notifyRuleId, notifyChannel.Name, events[0], tplContent, customParams, sendtos[i], resp, err)
sender.NotifyRecord(e.ctx, events, notifyRuleId, notifyChannel.Name, getSendTarget(customParams, []string{sendtos[i]}), resp, err)
}
}
case "smtp":

View File

@@ -93,7 +93,7 @@ func (s *Scheduler) syncAlertRules() {
}
ruleType := rule.GetRuleType()
if rule.IsPrometheusRule() || rule.IsLokiRule() || rule.IsTdengineRule() || rule.IsClickHouseRule() || rule.IsElasticSearch() {
if rule.IsPrometheusRule() || rule.IsInnerRule() {
datasourceIds := s.datasourceCache.GetIDsByDsCateAndQueries(rule.Cate, rule.DatasourceQueries)
for _, dsId := range datasourceIds {
if !naming.DatasourceHashRing.IsHit(strconv.FormatInt(dsId, 10), fmt.Sprintf("%d", rule.Id), s.aconf.Heartbeat.Endpoint) {

View File

@@ -1,6 +1,7 @@
package pipeline
import (
_ "github.com/ccfos/nightingale/v6/alert/pipeline/processor/aisummary"
_ "github.com/ccfos/nightingale/v6/alert/pipeline/processor/callback"
_ "github.com/ccfos/nightingale/v6/alert/pipeline/processor/eventdrop"
_ "github.com/ccfos/nightingale/v6/alert/pipeline/processor/eventupdate"

View File

@@ -0,0 +1,198 @@
package aisummary
import (
"bytes"
"crypto/tls"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"strings"
"text/template"
"time"
"github.com/ccfos/nightingale/v6/alert/pipeline/processor/callback"
"github.com/ccfos/nightingale/v6/alert/pipeline/processor/common"
"github.com/ccfos/nightingale/v6/models"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/pkg/tplx"
)
const (
HTTP_STATUS_SUCCESS_MAX = 299
)
// AISummaryConfig 配置结构体
type AISummaryConfig struct {
callback.HTTPConfig
ModelName string `json:"model_name"`
APIKey string `json:"api_key"`
PromptTemplate string `json:"prompt_template"`
CustomParams map[string]interface{} `json:"custom_params"`
}
type Message struct {
Role string `json:"role"`
Content string `json:"content"`
}
type ChatCompletionResponse struct {
Choices []struct {
Message struct {
Content string `json:"content"`
} `json:"message"`
} `json:"choices"`
}
func init() {
models.RegisterProcessor("ai_summary", &AISummaryConfig{})
}
func (c *AISummaryConfig) Init(settings interface{}) (models.Processor, error) {
result, err := common.InitProcessor[*AISummaryConfig](settings)
return result, err
}
func (c *AISummaryConfig) Process(ctx *ctx.Context, event *models.AlertCurEvent) (*models.AlertCurEvent, string, error) {
if c.Client == nil {
if err := c.initHTTPClient(); err != nil {
return event, "", fmt.Errorf("failed to initialize HTTP client: %v processor: %v", err, c)
}
}
// 准备告警事件信息
eventInfo, err := c.prepareEventInfo(event)
if err != nil {
return event, "", fmt.Errorf("failed to prepare event info: %v processor: %v", err, c)
}
// 调用AI模型生成总结
summary, err := c.generateAISummary(eventInfo)
if err != nil {
return event, "", fmt.Errorf("failed to generate AI summary: %v processor: %v", err, c)
}
// 将总结添加到annotations字段
if event.AnnotationsJSON == nil {
event.AnnotationsJSON = make(map[string]string)
}
event.AnnotationsJSON["ai_summary"] = summary
// 更新Annotations字段
b, err := json.Marshal(event.AnnotationsJSON)
if err != nil {
return event, "", fmt.Errorf("failed to marshal annotations: %v processor: %v", err, c)
}
event.Annotations = string(b)
return event, "", nil
}
func (c *AISummaryConfig) initHTTPClient() error {
transport := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: c.SkipSSLVerify},
}
if c.Proxy != "" {
proxyURL, err := url.Parse(c.Proxy)
if err != nil {
return fmt.Errorf("failed to parse proxy url: %v", err)
}
transport.Proxy = http.ProxyURL(proxyURL)
}
c.Client = &http.Client{
Timeout: time.Duration(c.Timeout) * time.Millisecond,
Transport: transport,
}
return nil
}
func (c *AISummaryConfig) prepareEventInfo(event *models.AlertCurEvent) (string, error) {
var defs = []string{
"{{$event := .}}",
}
text := strings.Join(append(defs, c.PromptTemplate), "")
t, err := template.New("prompt").Funcs(template.FuncMap(tplx.TemplateFuncMap)).Parse(text)
if err != nil {
return "", fmt.Errorf("failed to parse prompt template: %v", err)
}
var body bytes.Buffer
err = t.Execute(&body, event)
if err != nil {
return "", fmt.Errorf("failed to execute prompt template: %v", err)
}
return body.String(), nil
}
func (c *AISummaryConfig) generateAISummary(eventInfo string) (string, error) {
// 构建基础请求参数
reqParams := map[string]interface{}{
"model": c.ModelName,
"messages": []Message{
{
Role: "user",
Content: eventInfo,
},
},
}
// 合并自定义参数
for k, v := range c.CustomParams {
reqParams[k] = v
}
// 序列化请求体
jsonData, err := json.Marshal(reqParams)
if err != nil {
return "", fmt.Errorf("failed to marshal request body: %v", err)
}
// 创建HTTP请求
req, err := http.NewRequest("POST", c.URL, bytes.NewBuffer(jsonData))
if err != nil {
return "", fmt.Errorf("failed to create request: %v", err)
}
// 设置请求头
req.Header.Set("Authorization", "Bearer "+c.APIKey)
req.Header.Set("Content-Type", "application/json")
for k, v := range c.Headers {
req.Header.Set(k, v)
}
// 发送请求
resp, err := c.Client.Do(req)
if err != nil {
return "", fmt.Errorf("failed to send request: %v", err)
}
defer resp.Body.Close()
// 检查响应状态码
if resp.StatusCode > HTTP_STATUS_SUCCESS_MAX {
body, _ := io.ReadAll(resp.Body)
return "", fmt.Errorf("unexpected status code: %d, body: %s", resp.StatusCode, string(body))
}
// 读取响应
body, err := io.ReadAll(resp.Body)
if err != nil {
return "", fmt.Errorf("failed to read response body: %v", err)
}
// 解析响应
var chatResp ChatCompletionResponse
if err := json.Unmarshal(body, &chatResp); err != nil {
return "", fmt.Errorf("failed to unmarshal response: %v", err)
}
if len(chatResp.Choices) == 0 {
return "", fmt.Errorf("no response from AI model")
}
return chatResp.Choices[0].Message.Content, nil
}

View File

@@ -0,0 +1,69 @@
package aisummary
import (
"testing"
"github.com/ccfos/nightingale/v6/alert/pipeline/processor/callback"
"github.com/ccfos/nightingale/v6/models"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/stretchr/testify/assert"
)
func TestAISummaryConfig_Process(t *testing.T) {
// 创建测试配置
config := &AISummaryConfig{
HTTPConfig: callback.HTTPConfig{
URL: "https://generativelanguage.googleapis.com/v1beta/openai/chat/completions",
Timeout: 30000,
SkipSSLVerify: true,
Headers: map[string]string{
"Content-Type": "application/json",
},
},
ModelName: "gemini-2.0-flash",
APIKey: "*",
PromptTemplate: "告警规则:{{$event.RuleName}}\n严重程度{{$event.Severity}}",
CustomParams: map[string]interface{}{
"temperature": 0.7,
"max_tokens": 2000,
"top_p": 0.9,
},
}
// 创建测试事件
event := &models.AlertCurEvent{
RuleName: "Test Rule",
Severity: 1,
TagsMap: map[string]string{
"host": "test-host",
},
AnnotationsJSON: map[string]string{
"description": "Test alert",
},
}
// 测试模板处理
eventInfo, err := config.prepareEventInfo(event)
assert.NoError(t, err)
assert.Contains(t, eventInfo, "Test Rule")
assert.Contains(t, eventInfo, "1")
// 测试配置初始化
processor, err := config.Init(config)
assert.NoError(t, err)
assert.NotNil(t, processor)
// 测试处理函数
result, _, err := processor.Process(&ctx.Context{}, event)
assert.NoError(t, err)
assert.NotNil(t, result)
assert.NotEmpty(t, result.AnnotationsJSON["ai_summary"])
// 展示处理结果
t.Log("\n=== 处理结果 ===")
t.Logf("告警规则: %s", result.RuleName)
t.Logf("严重程度: %d", result.Severity)
t.Logf("标签: %v", result.TagsMap)
t.Logf("原始注释: %v", result.AnnotationsJSON["description"])
t.Logf("AI总结: %s", result.AnnotationsJSON["ai_summary"])
}

View File

@@ -3,6 +3,7 @@ package callback
import (
"crypto/tls"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
@@ -19,7 +20,7 @@ type HTTPConfig struct {
URL string `json:"url"`
Method string `json:"method,omitempty"`
Body string `json:"body,omitempty"`
Headers map[string]string `json:"headers"`
Headers map[string]string `json:"header"`
AuthUsername string `json:"auth_username"`
AuthPassword string `json:"auth_password"`
Timeout int `json:"timeout"` // 单位:ms
@@ -42,7 +43,7 @@ func (c *CallbackConfig) Init(settings interface{}) (models.Processor, error) {
return result, err
}
func (c *CallbackConfig) Process(ctx *ctx.Context, event *models.AlertCurEvent) *models.AlertCurEvent {
func (c *CallbackConfig) Process(ctx *ctx.Context, event *models.AlertCurEvent) (*models.AlertCurEvent, string, error) {
if c.Client == nil {
transport := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: c.SkipSSLVerify},
@@ -51,7 +52,7 @@ func (c *CallbackConfig) Process(ctx *ctx.Context, event *models.AlertCurEvent)
if c.Proxy != "" {
proxyURL, err := url.Parse(c.Proxy)
if err != nil {
logger.Errorf("failed to parse proxy url: %v", err)
return event, "", fmt.Errorf("failed to parse proxy url: %v processor: %v", err, c)
} else {
transport.Proxy = http.ProxyURL(proxyURL)
}
@@ -71,14 +72,12 @@ func (c *CallbackConfig) Process(ctx *ctx.Context, event *models.AlertCurEvent)
body, err := json.Marshal(event)
if err != nil {
logger.Errorf("failed to marshal event: %v", err)
return event
return event, "", fmt.Errorf("failed to marshal event: %v processor: %v", err, c)
}
req, err := http.NewRequest("POST", c.URL, strings.NewReader(string(body)))
if err != nil {
logger.Errorf("failed to create request: %v event: %v", err, event)
return event
return event, "", fmt.Errorf("failed to create request: %v processor: %v", err, c)
}
for k, v := range headers {
@@ -91,16 +90,14 @@ func (c *CallbackConfig) Process(ctx *ctx.Context, event *models.AlertCurEvent)
resp, err := c.Client.Do(req)
if err != nil {
logger.Errorf("failed to send request: %v event: %v", err, event)
return event
return event, "", fmt.Errorf("failed to send request: %v processor: %v", err, c)
}
b, err := io.ReadAll(resp.Body)
if err != nil {
logger.Errorf("failed to read response body: %v event: %v", err, event)
return event
return event, "", fmt.Errorf("failed to read response body: %v processor: %v", err, c)
}
logger.Infof("response body: %s", string(b))
return event
logger.Debugf("callback processor response body: %s", string(b))
return event, "callback success", nil
}

View File

@@ -2,6 +2,7 @@ package eventdrop
import (
"bytes"
"fmt"
"strings"
texttemplate "text/template"
@@ -25,7 +26,7 @@ func (c *EventDropConfig) Init(settings interface{}) (models.Processor, error) {
return result, err
}
func (c *EventDropConfig) Process(ctx *ctx.Context, event *models.AlertCurEvent) *models.AlertCurEvent {
func (c *EventDropConfig) Process(ctx *ctx.Context, event *models.AlertCurEvent) (*models.AlertCurEvent, string, error) {
// 使用背景是可以根据此处理器,实现对事件进行更加灵活的过滤的逻辑
// 在标签过滤和属性过滤都不满足需求时可以使用
// 如果模板执行结果为 true则删除该事件
@@ -40,22 +41,20 @@ func (c *EventDropConfig) Process(ctx *ctx.Context, event *models.AlertCurEvent)
tpl, err := texttemplate.New("eventdrop").Funcs(tplx.TemplateFuncMap).Parse(text)
if err != nil {
logger.Errorf("processor failed to parse template: %v event: %v", err, event)
return event
return event, "", fmt.Errorf("processor failed to parse template: %v processor: %v", err, c)
}
var body bytes.Buffer
if err = tpl.Execute(&body, event); err != nil {
logger.Errorf("processor failed to execute template: %v event: %v", err, event)
return event
return event, "", fmt.Errorf("processor failed to execute template: %v processor: %v", err, c)
}
result := strings.TrimSpace(body.String())
logger.Infof("processor eventdrop result: %v", result)
if result == "true" {
logger.Infof("processor eventdrop drop event: %v", event)
return nil
return event, "drop event success", nil
}
return event
return event, "drop event failed", nil
}

View File

@@ -3,6 +3,7 @@ package eventupdate
import (
"crypto/tls"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
@@ -30,7 +31,7 @@ func (c *EventUpdateConfig) Init(settings interface{}) (models.Processor, error)
return result, err
}
func (c *EventUpdateConfig) Process(ctx *ctx.Context, event *models.AlertCurEvent) *models.AlertCurEvent {
func (c *EventUpdateConfig) Process(ctx *ctx.Context, event *models.AlertCurEvent) (*models.AlertCurEvent, string, error) {
if c.Client == nil {
transport := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: c.SkipSSLVerify},
@@ -39,7 +40,7 @@ func (c *EventUpdateConfig) Process(ctx *ctx.Context, event *models.AlertCurEven
if c.Proxy != "" {
proxyURL, err := url.Parse(c.Proxy)
if err != nil {
logger.Errorf("failed to parse proxy url: %v", err)
return event, "", fmt.Errorf("failed to parse proxy url: %v processor: %v", err, c)
} else {
transport.Proxy = http.ProxyURL(proxyURL)
}
@@ -59,14 +60,12 @@ func (c *EventUpdateConfig) Process(ctx *ctx.Context, event *models.AlertCurEven
body, err := json.Marshal(event)
if err != nil {
logger.Errorf("failed to marshal event: %v", err)
return event
return event, "", fmt.Errorf("failed to marshal event: %v processor: %v", err, c)
}
req, err := http.NewRequest("POST", c.URL, strings.NewReader(string(body)))
if err != nil {
logger.Errorf("failed to create request: %v event: %v", err, event)
return event
return event, "", fmt.Errorf("failed to create request: %v processor: %v", err, c)
}
for k, v := range headers {
@@ -79,17 +78,19 @@ func (c *EventUpdateConfig) Process(ctx *ctx.Context, event *models.AlertCurEven
resp, err := c.Client.Do(req)
if err != nil {
logger.Errorf("failed to send request: %v event: %v", err, event)
return event
return event, "", fmt.Errorf("failed to send request: %v processor: %v", err, c)
}
b, err := io.ReadAll(resp.Body)
if err != nil {
logger.Errorf("failed to read response body: %v event: %v", err, event)
return event
return nil, "", fmt.Errorf("failed to read response body: %v processor: %v", err, c)
}
logger.Infof("response body: %s", string(b))
logger.Debugf("event update processor response body: %s", string(b))
json.Unmarshal(b, &event)
return event
err = json.Unmarshal(b, &event)
if err != nil {
return event, "", fmt.Errorf("failed to unmarshal response body: %v processor: %v", err, c)
}
return event, "", nil
}

View File

@@ -42,7 +42,7 @@ func (r *RelabelConfig) Init(settings interface{}) (models.Processor, error) {
return result, err
}
func (r *RelabelConfig) Process(ctx *ctx.Context, event *models.AlertCurEvent) *models.AlertCurEvent {
func (r *RelabelConfig) Process(ctx *ctx.Context, event *models.AlertCurEvent) (*models.AlertCurEvent, string, error) {
sourceLabels := make([]model.LabelName, len(r.SourceLabels))
for i := range r.SourceLabels {
sourceLabels[i] = model.LabelName(strings.ReplaceAll(r.SourceLabels[i], ".", REPLACE_DOT))
@@ -64,7 +64,7 @@ func (r *RelabelConfig) Process(ctx *ctx.Context, event *models.AlertCurEvent) *
}
EventRelabel(event, relabelConfigs)
return event
return event, "", nil
}
func EventRelabel(event *models.AlertCurEvent, relabelConfigs []*pconf.RelabelConfig) {

View File

@@ -280,7 +280,7 @@ func Relabel(rule *models.AlertRule, event *models.AlertCurEvent) {
// need to keep the original label
event.OriginalTags = event.Tags
event.OriginalTagsJSON = make([]string, len(event.TagsJSON))
event.OriginalTagsJSON = event.TagsJSON
if len(rule.EventRelabelConfig) == 0 {
return

View File

@@ -31,4 +31,16 @@ var Plugins = []Plugin{
Type: "ck",
TypeName: "ClickHouse",
},
{
Id: 6,
Category: "timeseries",
Type: "mysql",
TypeName: "MySQL",
},
{
Id: 7,
Category: "timeseries",
Type: "pgsql",
TypeName: "PostgreSQL",
},
}

View File

@@ -398,21 +398,16 @@ func (rt *Router) Config(r *gin.Engine) {
pages.PUT("/busi-group/:id/alert-subscribes", rt.auth(), rt.user(), rt.perm("/alert-subscribes/put"), rt.bgrw(), rt.alertSubscribePut)
pages.DELETE("/busi-group/:id/alert-subscribes", rt.auth(), rt.user(), rt.perm("/alert-subscribes/del"), rt.bgrw(), rt.alertSubscribeDel)
if rt.Center.AnonymousAccess.AlertDetail {
pages.GET("/alert-cur-event/:eid", rt.alertCurEventGet)
pages.GET("/alert-his-event/:eid", rt.alertHisEventGet)
pages.GET("/event-notify-records/:eid", rt.notificationRecordList)
} else {
pages.GET("/alert-cur-event/:eid", rt.auth(), rt.user(), rt.alertCurEventGet)
pages.GET("/alert-his-event/:eid", rt.auth(), rt.user(), rt.alertHisEventGet)
pages.GET("/event-notify-records/:eid", rt.auth(), rt.user(), rt.notificationRecordList)
}
pages.GET("/alert-cur-event/:eid", rt.alertCurEventGet)
pages.GET("/alert-his-event/:eid", rt.alertHisEventGet)
pages.GET("/event-notify-records/:eid", rt.notificationRecordList)
// card logic
pages.GET("/alert-cur-events/list", rt.auth(), rt.user(), rt.alertCurEventsList)
pages.GET("/alert-cur-events/card", rt.auth(), rt.user(), rt.alertCurEventsCard)
pages.POST("/alert-cur-events/card/details", rt.auth(), rt.alertCurEventsCardDetails)
pages.GET("/alert-his-events/list", rt.auth(), rt.user(), rt.alertHisEventsList)
pages.DELETE("/alert-his-events", rt.auth(), rt.admin(), rt.alertHisEventsDelete)
pages.DELETE("/alert-cur-events", rt.auth(), rt.user(), rt.perm("/alert-cur-events/del"), rt.alertCurEventDel)
pages.GET("/alert-cur-events/stats", rt.auth(), rt.alertCurEventsStatistics)

View File

@@ -233,6 +233,14 @@ func (rt *Router) checkCurEventBusiGroupRWPermission(c *gin.Context, ids []int64
func (rt *Router) alertCurEventGet(c *gin.Context) {
eid := ginx.UrlParamInt64(c, "eid")
event, err := GetCurEventDetail(rt.Ctx, eid)
hasPermission := HasPermission(rt.Ctx, c, "event", fmt.Sprintf("%d", eid), rt.Center.AnonymousAccess.AlertDetail)
if !hasPermission {
rt.auth()(c)
rt.user()(c)
rt.bgroCheck(c, event.GroupId)
}
ginx.NewRender(c).Data(event, err)
}

View File

@@ -2,6 +2,7 @@ package router
import (
"fmt"
"net/http"
"strings"
"time"
@@ -10,6 +11,7 @@ import (
"github.com/gin-gonic/gin"
"github.com/toolkits/pkg/ginx"
"github.com/toolkits/pkg/logger"
"golang.org/x/exp/slices"
)
@@ -78,16 +80,56 @@ func (rt *Router) alertHisEventsList(c *gin.Context) {
}, nil)
}
type alertHisEventsDeleteForm struct {
Severities []int `json:"severities"`
Timestamp int64 `json:"timestamp" binding:"required"`
}
func (rt *Router) alertHisEventsDelete(c *gin.Context) {
var f alertHisEventsDeleteForm
ginx.BindJSON(c, &f)
// 校验
if f.Timestamp == 0 {
ginx.Bomb(http.StatusBadRequest, "timestamp parameter is required")
return
}
user := c.MustGet("user").(*models.User)
// 启动后台清理任务
go func() {
limit := 100
for {
n, err := models.AlertHisEventBatchDelete(rt.Ctx, f.Timestamp, f.Severities, limit)
if err != nil {
logger.Errorf("Failed to delete alert history events: operator=%s, timestamp=%d, severities=%v, error=%v",
user.Username, f.Timestamp, f.Severities, err)
break
}
logger.Debugf("Successfully deleted alert history events: operator=%s, timestamp=%d, severities=%v, deleted=%d",
user.Username, f.Timestamp, f.Severities, n)
if n < int64(limit) {
break // 已经删完
}
time.Sleep(100 * time.Millisecond) // 防止锁表
}
}()
ginx.NewRender(c).Message("Alert history events deletion started")
}
func (rt *Router) alertHisEventGet(c *gin.Context) {
eid := ginx.UrlParamInt64(c, "eid")
event, err := models.AlertHisEventGetById(rt.Ctx, eid)
ginx.Dangerous(err)
if event == nil {
ginx.Bomb(404, "No such alert event")
}
if !rt.Center.AnonymousAccess.AlertDetail && rt.Center.EventHistoryGroupView {
hasPermission := HasPermission(rt.Ctx, c, "event", fmt.Sprintf("%d", eid), rt.Center.AnonymousAccess.AlertDetail)
if !hasPermission {
rt.auth()(c)
rt.user()(c)
rt.bgroCheck(c, event.GroupId)
}

View File

@@ -148,11 +148,12 @@ func DatasourceCheck(ds models.Datasource) error {
},
}
ds.HTTPJson.Url = strings.TrimRight(ds.HTTPJson.Url, "/")
var fullURL string
req, err := ds.HTTPJson.NewReq(&fullURL)
if err != nil {
logger.Errorf("Error creating request: %v", err)
return fmt.Errorf("request urls:%v failed", ds.HTTPJson.GetUrls())
return fmt.Errorf("request urls:%v failed: %v", ds.HTTPJson.GetUrls(), err)
}
if ds.PluginType == models.PROMETHEUS {
@@ -168,14 +169,14 @@ func DatasourceCheck(ds models.Datasource) error {
req, err = http.NewRequest("GET", fullURL, nil)
if err != nil {
logger.Errorf("Error creating request: %v", err)
return fmt.Errorf("request url:%s failed", fullURL)
return fmt.Errorf("request url:%s failed: %v", fullURL, err)
}
} else if ds.PluginType == models.TDENGINE {
fullURL = fmt.Sprintf("%s/rest/sql", ds.HTTPJson.Url)
req, err = http.NewRequest("POST", fullURL, strings.NewReader("show databases"))
if err != nil {
logger.Errorf("Error creating request: %v", err)
return fmt.Errorf("request url:%s failed", fullURL)
return fmt.Errorf("request url:%s failed: %v", fullURL, err)
}
}
@@ -187,7 +188,7 @@ func DatasourceCheck(ds models.Datasource) error {
req, err = http.NewRequest("GET", fullURL, nil)
if err != nil {
logger.Errorf("Error creating request: %v", err)
return fmt.Errorf("request url:%s failed", fullURL)
return fmt.Errorf("request url:%s failed: %v", fullURL, err)
}
}
@@ -202,7 +203,7 @@ func DatasourceCheck(ds models.Datasource) error {
resp, err := client.Do(req)
if err != nil {
logger.Errorf("Error making request: %v\n", err)
return fmt.Errorf("request url:%s failed", fullURL)
return fmt.Errorf("request url:%s failed: %v", fullURL, err)
}
defer resp.Body.Close()

View File

@@ -8,7 +8,6 @@ import (
"github.com/gin-gonic/gin"
"github.com/toolkits/pkg/ginx"
"github.com/toolkits/pkg/logger"
)
// 获取事件Pipeline列表
@@ -143,11 +142,19 @@ func (rt *Router) tryRunEventPipeline(c *gin.Context) {
for _, p := range f.PipelineConfig.ProcessorConfigs {
processor, err := models.GetProcessorByType(p.Typ, p.Config)
if err != nil {
ginx.Bomb(http.StatusBadRequest, "processor %+v type not found", p)
ginx.Bomb(http.StatusBadRequest, "get processor: %+v err: %+v", p, err)
}
event = processor.Process(rt.Ctx, event)
event, _, err = processor.Process(rt.Ctx, event)
if err != nil {
ginx.Bomb(http.StatusBadRequest, "processor: %+v err: %+v", p, err)
}
if event == nil {
ginx.Bomb(http.StatusBadRequest, "event is nil")
ginx.NewRender(c).Data(map[string]interface{}{
"event": event,
"result": "event is dropped",
}, nil)
return
}
}
@@ -170,15 +177,17 @@ func (rt *Router) tryRunEventProcessor(c *gin.Context) {
processor, err := models.GetProcessorByType(f.ProcessorConfig.Typ, f.ProcessorConfig.Config)
if err != nil {
ginx.Bomb(http.StatusBadRequest, "processor type not found")
ginx.Bomb(http.StatusBadRequest, "get processor err: %+v", err)
}
event = processor.Process(rt.Ctx, event)
logger.Infof("processor %+v result: %+v", f.ProcessorConfig, event)
if event == nil {
ginx.Bomb(http.StatusBadRequest, "event is nil")
event, res, err := processor.Process(rt.Ctx, event)
if err != nil {
ginx.Bomb(http.StatusBadRequest, "processor err: %+v", err)
}
ginx.NewRender(c).Data(event, nil)
ginx.NewRender(c).Data(map[string]interface{}{
"event": event,
"result": res,
}, nil)
}
func (rt *Router) tryRunEventProcessorByNotifyRule(c *gin.Context) {
@@ -210,11 +219,19 @@ func (rt *Router) tryRunEventProcessorByNotifyRule(c *gin.Context) {
for _, p := range pl.ProcessorConfigs {
processor, err := models.GetProcessorByType(p.Typ, p.Config)
if err != nil {
ginx.Bomb(http.StatusBadRequest, "processor %+v type not found", p)
ginx.Bomb(http.StatusBadRequest, "get processor: %+v err: %+v", p, err)
}
event, _, err := processor.Process(rt.Ctx, event)
if err != nil {
ginx.Bomb(http.StatusBadRequest, "processor: %+v err: %+v", p, err)
}
event = processor.Process(rt.Ctx, event)
if event == nil {
ginx.Bomb(http.StatusBadRequest, "event is nil")
ginx.NewRender(c).Data(map[string]interface{}{
"event": event,
"result": "event is dropped",
}, nil)
return
}
}
}

View File

@@ -173,3 +173,38 @@ func Username(c *gin.Context) string {
}
return username
}
func HasPermission(ctx *ctx.Context, c *gin.Context, sourceType, sourceId string, isAnonymousAccess bool) bool {
if sourceType == "event" && isAnonymousAccess {
return true
}
// 尝试从请求中获取 __token 参数
token := ginx.QueryStr(c, "__token", "")
// 如果有 __token 参数,验证其合法性
if token != "" {
return ValidateSourceToken(ctx, sourceType, sourceId, token)
}
return false
}
func ValidateSourceToken(ctx *ctx.Context, sourceType, sourceId, token string) bool {
if token == "" {
return false
}
// 根据源类型、源ID和令牌获取源令牌记录
sourceToken, err := models.GetSourceTokenBySource(ctx, sourceType, sourceId, token)
if err != nil {
return false
}
// 检查令牌是否过期
if sourceToken.IsExpired() {
return false
}
return true
}

View File

@@ -12,7 +12,9 @@ import (
"github.com/ccfos/nightingale/v6/pkg/slice"
"github.com/ccfos/nightingale/v6/pkg/strx"
"github.com/ccfos/nightingale/v6/pkg/tplx"
"github.com/gin-gonic/gin"
"github.com/google/uuid"
"github.com/toolkits/pkg/ginx"
)
@@ -30,6 +32,9 @@ func (rt *Router) messageTemplatesAdd(c *gin.Context) {
ginx.Dangerous(err)
now := time.Now().Unix()
for _, tpl := range lst {
// 生成一个唯一的标识符,以后也不允许修改,前端不需要传这个参数
tpl.Ident = uuid.New().String()
ginx.Dangerous(tpl.Verify())
if !isAdmin && !slice.HaveIntersection(gids, tpl.UserGroupIds) {
ginx.Bomb(http.StatusForbidden, "forbidden")

View File

@@ -237,6 +237,9 @@ func (rt *Router) notifyTest(c *gin.Context) {
}
case "smtp":
if len(sendtos) == 0 {
ginx.Bomb(http.StatusBadRequest, "No valid email address in the user and team")
}
err := notifyChannel.SendEmailNow(events, tplContent, sendtos)
ginx.NewRender(c).Message(err)
case "script":

View File

@@ -53,6 +53,20 @@ func init() {
PluginType: "ck",
PluginTypeName: "ClickHouse",
}
DatasourceTypes[5] = DatasourceType{
Id: 5,
Category: "timeseries",
PluginType: "mysql",
PluginTypeName: "MySQL",
}
DatasourceTypes[6] = DatasourceType{
Id: 6,
Category: "timeseries",
PluginType: "pgsql",
PluginTypeName: "PostgreSQL",
}
}
type NewDatasrouceFn func(settings map[string]interface{}) (Datasource, error)

227
datasource/mysql/mysql.go Normal file
View File

@@ -0,0 +1,227 @@
package mysql
import (
"context"
"fmt"
"strings"
"time"
"github.com/ccfos/nightingale/v6/datasource"
"github.com/ccfos/nightingale/v6/dskit/mysql"
"github.com/ccfos/nightingale/v6/dskit/sqlbase"
"github.com/ccfos/nightingale/v6/dskit/types"
"github.com/ccfos/nightingale/v6/models"
"github.com/ccfos/nightingale/v6/pkg/macros"
"github.com/mitchellh/mapstructure"
"github.com/toolkits/pkg/logger"
)
const (
MySQLType = "mysql"
)
func init() {
datasource.RegisterDatasource(MySQLType, new(MySQL))
}
type MySQL struct {
mysql.MySQL `json:",inline" mapstructure:",squash"`
}
type QueryParam struct {
Ref string `json:"ref" mapstructure:"ref"`
Database string `json:"database" mapstructure:"database"`
Table string `json:"table" mapstructure:"table"`
SQL string `json:"sql" mapstructure:"sql"`
Keys datasource.Keys `json:"keys" mapstructure:"keys"`
From int64 `json:"from" mapstructure:"from"`
To int64 `json:"to" mapstructure:"to"`
}
func (m *MySQL) InitClient() error {
if len(m.Shards) == 0 {
return fmt.Errorf("not found mysql addr, please check datasource config")
}
if _, err := m.NewConn(context.TODO(), ""); err != nil {
return err
}
return nil
}
func (m *MySQL) Init(settings map[string]interface{}) (datasource.Datasource, error) {
newest := new(MySQL)
err := mapstructure.Decode(settings, newest)
return newest, err
}
func (m *MySQL) Validate(ctx context.Context) error {
if len(m.Shards) == 0 || len(strings.TrimSpace(m.Shards[0].Addr)) == 0 {
return fmt.Errorf("mysql addr is invalid, please check datasource setting")
}
if len(strings.TrimSpace(m.Shards[0].User)) == 0 {
return fmt.Errorf("mysql user is invalid, please check datasource setting")
}
return nil
}
// Equal compares whether two objects are the same, used for caching
func (m *MySQL) Equal(p datasource.Datasource) bool {
newest, ok := p.(*MySQL)
if !ok {
logger.Errorf("unexpected plugin type, expected is mysql")
return false
}
if len(m.Shards) == 0 || len(newest.Shards) == 0 {
return false
}
oldShard := m.Shards[0]
newShard := newest.Shards[0]
if oldShard.Addr != newShard.Addr {
return false
}
if oldShard.User != newShard.User {
return false
}
if oldShard.Password != newShard.Password {
return false
}
if oldShard.MaxQueryRows != newShard.MaxQueryRows {
return false
}
if oldShard.Timeout != newShard.Timeout {
return false
}
if oldShard.MaxIdleConns != newShard.MaxIdleConns {
return false
}
if oldShard.MaxOpenConns != newShard.MaxOpenConns {
return false
}
if oldShard.ConnMaxLifetime != newShard.ConnMaxLifetime {
return false
}
return true
}
func (m *MySQL) MakeLogQuery(ctx context.Context, query interface{}, eventTags []string, start, end int64) (interface{}, error) {
return nil, nil
}
func (m *MySQL) MakeTSQuery(ctx context.Context, query interface{}, eventTags []string, start, end int64) (interface{}, error) {
return nil, nil
}
func (m *MySQL) QueryMapData(ctx context.Context, query interface{}) ([]map[string]string, error) {
return nil, nil
}
func (m *MySQL) QueryData(ctx context.Context, query interface{}) ([]models.DataResp, error) {
mysqlQueryParam := new(QueryParam)
if err := mapstructure.Decode(query, mysqlQueryParam); err != nil {
return nil, err
}
if strings.Contains(mysqlQueryParam.SQL, "$__") {
var err error
mysqlQueryParam.SQL, err = macros.Macro(mysqlQueryParam.SQL, mysqlQueryParam.From, mysqlQueryParam.To)
if err != nil {
return nil, err
}
}
if mysqlQueryParam.Keys.ValueKey == "" {
return nil, fmt.Errorf("valueKey is required")
}
timeout := m.Shards[0].Timeout
if timeout == 0 {
timeout = 60
}
timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(timeout)*time.Second)
defer cancel()
items, err := m.QueryTimeseries(timeoutCtx, &sqlbase.QueryParam{
Sql: mysqlQueryParam.SQL,
Keys: types.Keys{
ValueKey: mysqlQueryParam.Keys.ValueKey,
LabelKey: mysqlQueryParam.Keys.LabelKey,
TimeKey: mysqlQueryParam.Keys.TimeKey,
},
})
if err != nil {
logger.Warningf("query:%+v get data err:%v", mysqlQueryParam, err)
return []models.DataResp{}, err
}
data := make([]models.DataResp, 0)
for i := range items {
data = append(data, models.DataResp{
Ref: mysqlQueryParam.Ref,
Metric: items[i].Metric,
Values: items[i].Values,
})
}
return data, nil
}
func (m *MySQL) QueryLog(ctx context.Context, query interface{}) ([]interface{}, int64, error) {
mysqlQueryParam := new(QueryParam)
if err := mapstructure.Decode(query, mysqlQueryParam); err != nil {
return nil, 0, err
}
if strings.Contains(mysqlQueryParam.SQL, "$__") {
var err error
mysqlQueryParam.SQL, err = macros.Macro(mysqlQueryParam.SQL, mysqlQueryParam.From, mysqlQueryParam.To)
if err != nil {
return nil, 0, err
}
}
timeout := m.Shards[0].Timeout
if timeout == 0 {
timeout = 60
}
timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(timeout)*time.Second)
defer cancel()
items, err := m.Query(timeoutCtx, &sqlbase.QueryParam{
Sql: mysqlQueryParam.SQL,
})
if err != nil {
logger.Warningf("query:%+v get data err:%v", mysqlQueryParam, err)
return []interface{}{}, 0, err
}
logs := make([]interface{}, 0)
for i := range items {
logs = append(logs, items[i])
}
return logs, 0, nil
}
func (m *MySQL) DescribeTable(ctx context.Context, query interface{}) ([]*types.ColumnProperty, error) {
mysqlQueryParam := new(QueryParam)
if err := mapstructure.Decode(query, mysqlQueryParam); err != nil {
return nil, err
}
return m.DescTable(ctx, mysqlQueryParam.Database, mysqlQueryParam.Table)
}

View File

@@ -0,0 +1,346 @@
package postgresql
import (
"context"
"fmt"
"regexp"
"strings"
"time"
"github.com/ccfos/nightingale/v6/datasource"
"github.com/ccfos/nightingale/v6/pkg/macros"
"github.com/ccfos/nightingale/v6/dskit/postgres"
"github.com/ccfos/nightingale/v6/dskit/sqlbase"
"github.com/ccfos/nightingale/v6/dskit/types"
"github.com/ccfos/nightingale/v6/models"
"github.com/mitchellh/mapstructure"
"github.com/toolkits/pkg/logger"
)
const (
PostgreSQLType = "pgsql"
)
var (
regx = "(?i)from\\s+([a-zA-Z0-9_]+)\\.([a-zA-Z0-9_]+)\\.([a-zA-Z0-9_]+)"
)
func init() {
datasource.RegisterDatasource(PostgreSQLType, new(PostgreSQL))
}
type PostgreSQL struct {
Shards []*postgres.PostgreSQL `json:"pgsql.shards" mapstructure:"pgsql.shards"`
}
type QueryParam struct {
Ref string `json:"ref" mapstructure:"ref"`
Database string `json:"database" mapstructure:"database"`
Table string `json:"table" mapstructure:"table"`
SQL string `json:"sql" mapstructure:"sql"`
Keys datasource.Keys `json:"keys" mapstructure:"keys"`
From int64 `json:"from" mapstructure:"from"`
To int64 `json:"to" mapstructure:"to"`
}
func (p *PostgreSQL) InitClient() error {
if len(p.Shards) == 0 {
return fmt.Errorf("not found postgresql addr, please check datasource config")
}
for _, shard := range p.Shards {
if db, err := shard.NewConn(context.TODO(), "postgres"); err != nil {
defer sqlbase.CloseDB(db)
return err
}
}
return nil
}
func (p *PostgreSQL) Init(settings map[string]interface{}) (datasource.Datasource, error) {
newest := new(PostgreSQL)
err := mapstructure.Decode(settings, newest)
return newest, err
}
func (p *PostgreSQL) Validate(ctx context.Context) error {
if len(p.Shards) == 0 || len(strings.TrimSpace(p.Shards[0].Addr)) == 0 {
return fmt.Errorf("postgresql addr is invalid, please check datasource setting")
}
if len(strings.TrimSpace(p.Shards[0].User)) == 0 {
return fmt.Errorf("postgresql user is invalid, please check datasource setting")
}
return nil
}
// Equal compares whether two objects are the same, used for caching
func (p *PostgreSQL) Equal(d datasource.Datasource) bool {
newest, ok := d.(*PostgreSQL)
if !ok {
logger.Errorf("unexpected plugin type, expected is postgresql")
return false
}
if len(p.Shards) == 0 || len(newest.Shards) == 0 {
return false
}
oldShard := p.Shards[0]
newShard := newest.Shards[0]
if oldShard.Addr != newShard.Addr {
return false
}
if oldShard.User != newShard.User {
return false
}
if oldShard.Password != newShard.Password {
return false
}
if oldShard.MaxQueryRows != newShard.MaxQueryRows {
return false
}
if oldShard.Timeout != newShard.Timeout {
return false
}
if oldShard.MaxIdleConns != newShard.MaxIdleConns {
return false
}
if oldShard.MaxOpenConns != newShard.MaxOpenConns {
return false
}
if oldShard.ConnMaxLifetime != newShard.ConnMaxLifetime {
return false
}
return true
}
func (p *PostgreSQL) ShowDatabases(ctx context.Context) ([]string, error) {
return p.Shards[0].ShowDatabases(ctx, "")
}
func (p *PostgreSQL) ShowTables(ctx context.Context, database string) ([]string, error) {
p.Shards[0].DB = database
rets, err := p.Shards[0].ShowTables(ctx, "")
if err != nil {
return nil, err
}
tables := make([]string, 0, len(rets))
for scheme, tabs := range rets {
for _, tab := range tabs {
tables = append(tables, scheme+"."+tab)
}
}
return tables, nil
}
func (p *PostgreSQL) MakeLogQuery(ctx context.Context, query interface{}, eventTags []string, start, end int64) (interface{}, error) {
return nil, nil
}
func (p *PostgreSQL) MakeTSQuery(ctx context.Context, query interface{}, eventTags []string, start, end int64) (interface{}, error) {
return nil, nil
}
func (p *PostgreSQL) QueryMapData(ctx context.Context, query interface{}) ([]map[string]string, error) {
return nil, nil
}
func (p *PostgreSQL) QueryData(ctx context.Context, query interface{}) ([]models.DataResp, error) {
postgresqlQueryParam := new(QueryParam)
if err := mapstructure.Decode(query, postgresqlQueryParam); err != nil {
return nil, err
}
if strings.Contains(postgresqlQueryParam.SQL, "$__") {
var err error
postgresqlQueryParam.SQL, err = macros.Macro(postgresqlQueryParam.SQL, postgresqlQueryParam.From, postgresqlQueryParam.To)
if err != nil {
return nil, err
}
}
if postgresqlQueryParam.Database != "" {
p.Shards[0].DB = postgresqlQueryParam.Database
} else {
db, err := parseDBName(postgresqlQueryParam.SQL)
if err != nil {
return nil, err
}
p.Shards[0].DB = db
}
timeout := p.Shards[0].Timeout
if timeout == 0 {
timeout = 60
}
timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(timeout)*time.Second)
defer cancel()
items, err := p.Shards[0].QueryTimeseries(timeoutCtx, &sqlbase.QueryParam{
Sql: postgresqlQueryParam.SQL,
Keys: types.Keys{
ValueKey: postgresqlQueryParam.Keys.ValueKey,
LabelKey: postgresqlQueryParam.Keys.LabelKey,
TimeKey: postgresqlQueryParam.Keys.TimeKey,
},
})
if err != nil {
logger.Warningf("query:%+v get data err:%v", postgresqlQueryParam, err)
return []models.DataResp{}, err
}
data := make([]models.DataResp, 0)
for i := range items {
data = append(data, models.DataResp{
Ref: postgresqlQueryParam.Ref,
Metric: items[i].Metric,
Values: items[i].Values,
})
}
// parse resp to time series data
logger.Infof("req:%+v keys:%+v \n data:%v", postgresqlQueryParam, postgresqlQueryParam.Keys, data)
return data, nil
}
func (p *PostgreSQL) QueryLog(ctx context.Context, query interface{}) ([]interface{}, int64, error) {
postgresqlQueryParam := new(QueryParam)
if err := mapstructure.Decode(query, postgresqlQueryParam); err != nil {
return nil, 0, err
}
if postgresqlQueryParam.Database != "" {
p.Shards[0].DB = postgresqlQueryParam.Database
} else {
db, err := parseDBName(postgresqlQueryParam.SQL)
if err != nil {
return nil, 0, err
}
p.Shards[0].DB = db
}
if strings.Contains(postgresqlQueryParam.SQL, "$__") {
var err error
postgresqlQueryParam.SQL, err = macros.Macro(postgresqlQueryParam.SQL, postgresqlQueryParam.From, postgresqlQueryParam.To)
if err != nil {
return nil, 0, err
}
}
timeout := p.Shards[0].Timeout
if timeout == 0 {
timeout = 60
}
timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(timeout)*time.Second)
defer cancel()
items, err := p.Shards[0].Query(timeoutCtx, &sqlbase.QueryParam{
Sql: postgresqlQueryParam.SQL,
})
if err != nil {
logger.Warningf("query:%+v get data err:%v", postgresqlQueryParam, err)
return []interface{}{}, 0, err
}
logs := make([]interface{}, 0)
for i := range items {
logs = append(logs, items[i])
}
return logs, 0, nil
}
func (p *PostgreSQL) DescribeTable(ctx context.Context, query interface{}) ([]*types.ColumnProperty, error) {
postgresqlQueryParam := new(QueryParam)
if err := mapstructure.Decode(query, postgresqlQueryParam); err != nil {
return nil, err
}
p.Shards[0].DB = postgresqlQueryParam.Database
pairs := strings.Split(postgresqlQueryParam.Table, ".") // format: scheme.table_name
scheme := ""
table := postgresqlQueryParam.Table
if len(pairs) == 2 {
scheme = pairs[0]
table = pairs[1]
}
return p.Shards[0].DescTable(ctx, scheme, table)
}
func parseDBName(sql string) (db string, err error) {
re := regexp.MustCompile(regx)
matches := re.FindStringSubmatch(sql)
if len(matches) != 4 {
return "", fmt.Errorf("no valid table name in format database.schema.table found")
}
return matches[1], nil
}
func extractColumns(sql string) ([]string, error) {
// 将 SQL 转换为小写以简化匹配
sql = strings.ToLower(sql)
// 匹配 SELECT 和 FROM 之间的内容
re := regexp.MustCompile(`select\s+(.*?)\s+from`)
matches := re.FindStringSubmatch(sql)
if len(matches) < 2 {
return nil, fmt.Errorf("no columns found or invalid SQL syntax")
}
// 提取列部分
columnsString := matches[1]
// 分割列
columns := splitColumns(columnsString)
// 清理每个列名
for i, col := range columns {
columns[i] = strings.TrimSpace(col)
}
return columns, nil
}
func splitColumns(columnsString string) []string {
var columns []string
var currentColumn strings.Builder
parenthesesCount := 0
inQuotes := false
for _, char := range columnsString {
switch char {
case '(':
parenthesesCount++
currentColumn.WriteRune(char)
case ')':
parenthesesCount--
currentColumn.WriteRune(char)
case '\'', '"':
inQuotes = !inQuotes
currentColumn.WriteRune(char)
case ',':
if parenthesesCount == 0 && !inQuotes {
columns = append(columns, currentColumn.String())
currentColumn.Reset()
} else {
currentColumn.WriteRune(char)
}
default:
currentColumn.WriteRune(char)
}
}
if currentColumn.Len() > 0 {
columns = append(columns, currentColumn.String())
}
return columns
}

View File

@@ -204,6 +204,7 @@ CREATE TABLE board (
public smallint not null default 0 ,
built_in smallint not null default 0 ,
hide smallint not null default 0 ,
public_cate bigint NOT NULL DEFAULT 0,
create_at bigint not null default 0,
create_by varchar(64) not null default '',
update_at bigint not null default 0,
@@ -217,6 +218,7 @@ COMMENT ON COLUMN board.tags IS 'split by space';
COMMENT ON COLUMN board.public IS '0:false 1:true';
COMMENT ON COLUMN board.built_in IS '0:false 1:true';
COMMENT ON COLUMN board.hide IS '0:false 1:true';
COMMENT ON COLUMN board.public_cate IS '0 anonymous 1 login 2 busi';
-- for dashboard new version
@@ -429,43 +431,31 @@ CREATE TABLE target (
ident varchar(191) not null,
note varchar(255) not null default '',
tags varchar(512) not null default '',
host_tags text,
host_ip varchar(15) default '',
agent_version varchar(255) default '',
engine_name varchar(255) default '',
os varchar(31) default '',
update_at bigint not null default 0,
PRIMARY KEY (id),
UNIQUE (ident)
);
CREATE INDEX ON target (group_id);
CREATE INDEX idx_host_ip ON target (host_ip);
CREATE INDEX idx_agent_version ON target (agent_version);
CREATE INDEX idx_engine_name ON target (engine_name);
CREATE INDEX idx_os ON target (os);
COMMENT ON COLUMN target.group_id IS 'busi group id';
COMMENT ON COLUMN target.ident IS 'target id';
COMMENT ON COLUMN target.note IS 'append to alert event as field';
COMMENT ON COLUMN target.tags IS 'append to series data as tags, split by space, append external space at suffix';
COMMENT ON COLUMN target.host_tags IS 'global labels set in conf file';
COMMENT ON COLUMN target.host_ip IS 'IPv4 string';
COMMENT ON COLUMN target.agent_version IS 'agent version';
COMMENT ON COLUMN target.engine_name IS 'engine_name';
-- case1: target_idents; case2: target_tags
-- CREATE TABLE collect_rule (
-- id bigserial,
-- group_id bigint not null default 0 comment 'busi group id',
-- cluster varchar(128) not null,
-- target_idents varchar(512) not null default '' comment 'ident list, split by space',
-- target_tags varchar(512) not null default '' comment 'filter targets by tags, split by space',
-- name varchar(191) not null default '',
-- note varchar(255) not null default '',
-- step int not null,
-- type varchar(64) not null comment 'e.g. port proc log plugin',
-- data text not null,
-- append_tags varchar(255) not null default '' comment 'split by space: e.g. mod=n9e dept=cloud',
-- create_at bigint not null default 0,
-- create_by varchar(64) not null default '',
-- update_at bigint not null default 0,
-- update_by varchar(64) not null default '',
-- PRIMARY KEY (id),
-- KEY (group_id, type, name)
-- ) ;
COMMENT ON COLUMN target.os IS 'os type';
CREATE TABLE metric_view (
id bigserial,
@@ -734,6 +724,7 @@ CREATE TABLE datasource
(
id serial,
name varchar(191) not null default '',
identifier varchar(255) not null default '',
description varchar(255) not null default '',
category varchar(255) not null default '',
plugin_id int not null default 0,
@@ -751,8 +742,8 @@ CREATE TABLE datasource
updated_by varchar(64) not null default '',
UNIQUE (name),
PRIMARY KEY (id)
) ;
) ;
CREATE TABLE builtin_cate (
id bigserial,
name varchar(191) not null,
@@ -795,10 +786,12 @@ CREATE TABLE es_index_pattern (
create_by varchar(64) default '',
update_at bigint default '0',
update_by varchar(64) default '',
note varchar(4096) not null default '',
PRIMARY KEY (id),
UNIQUE (datasource_id, name)
) ;
COMMENT ON COLUMN es_index_pattern.datasource_id IS 'datasource id';
COMMENT ON COLUMN es_index_pattern.note IS 'description of metric in Chinese';
CREATE TABLE builtin_metrics (
id bigserial,
@@ -813,6 +806,7 @@ CREATE TABLE builtin_metrics (
created_by varchar(191) NOT NULL DEFAULT '',
updated_at bigint NOT NULL DEFAULT 0,
updated_by varchar(191) NOT NULL DEFAULT '',
uuid BIGINT NOT NULL DEFAULT 0,
PRIMARY KEY (id),
UNIQUE (lang, collector, typ, name)
);
@@ -834,6 +828,7 @@ COMMENT ON COLUMN builtin_metrics.created_at IS 'create time';
COMMENT ON COLUMN builtin_metrics.created_by IS 'creator';
COMMENT ON COLUMN builtin_metrics.updated_at IS 'update time';
COMMENT ON COLUMN builtin_metrics.updated_by IS 'updater';
COMMENT ON COLUMN builtin_metrics.uuid IS 'unique identifier';
CREATE TABLE metric_filter (
id BIGSERIAL PRIMARY KEY,
@@ -916,3 +911,115 @@ CREATE TABLE source_token (
);
CREATE INDEX idx_source_token_type_id_token ON source_token (source_type, source_id, token);
CREATE TABLE notification_record (
id BIGSERIAL PRIMARY KEY,
notify_rule_id BIGINT NOT NULL DEFAULT 0,
event_id bigint NOT NULL,
sub_id bigint DEFAULT NULL,
channel varchar(255) NOT NULL,
status bigint DEFAULT NULL,
target varchar(1024) NOT NULL,
details varchar(2048) DEFAULT '',
created_at bigint NOT NULL
);
CREATE INDEX idx_evt ON notification_record (event_id);
COMMENT ON COLUMN notification_record.event_id IS 'event history id';
COMMENT ON COLUMN notification_record.sub_id IS 'subscribed rule id';
COMMENT ON COLUMN notification_record.channel IS 'notification channel name';
COMMENT ON COLUMN notification_record.status IS 'notification status';
COMMENT ON COLUMN notification_record.target IS 'notification target';
COMMENT ON COLUMN notification_record.details IS 'notification other info';
COMMENT ON COLUMN notification_record.created_at IS 'create time';
CREATE TABLE target_busi_group (
id BIGSERIAL PRIMARY KEY,
target_ident varchar(191) NOT NULL,
group_id bigint NOT NULL,
update_at bigint NOT NULL
);
CREATE UNIQUE INDEX idx_target_group ON target_busi_group (target_ident, group_id);
CREATE TABLE user_token (
id BIGSERIAL PRIMARY KEY,
username varchar(255) NOT NULL DEFAULT '',
token_name varchar(255) NOT NULL DEFAULT '',
token varchar(255) NOT NULL DEFAULT '',
create_at bigint NOT NULL DEFAULT 0,
last_used bigint NOT NULL DEFAULT 0
);
CREATE TABLE notify_rule (
id bigserial PRIMARY KEY,
name varchar(255) NOT NULL,
description text,
enable smallint NOT NULL DEFAULT 0,
user_group_ids varchar(255) NOT NULL DEFAULT '',
notify_configs text,
pipeline_configs text,
create_at bigint NOT NULL DEFAULT 0,
create_by varchar(64) NOT NULL DEFAULT '',
update_at bigint NOT NULL DEFAULT 0,
update_by varchar(64) NOT NULL DEFAULT ''
);
CREATE TABLE notify_channel (
id bigserial PRIMARY KEY,
name varchar(255) NOT NULL,
ident varchar(255) NOT NULL,
description text,
enable smallint NOT NULL DEFAULT 0,
param_config text,
request_type varchar(50) NOT NULL,
request_config text,
weight int NOT NULL DEFAULT 0,
create_at bigint NOT NULL DEFAULT 0,
create_by varchar(64) NOT NULL DEFAULT '',
update_at bigint NOT NULL DEFAULT 0,
update_by varchar(64) NOT NULL DEFAULT ''
);
CREATE TABLE message_template (
id bigserial PRIMARY KEY,
name varchar(64) NOT NULL,
ident varchar(64) NOT NULL,
content text,
user_group_ids varchar(64),
notify_channel_ident varchar(64) NOT NULL DEFAULT '',
private int NOT NULL DEFAULT 0,
weight int NOT NULL DEFAULT 0,
create_at bigint NOT NULL DEFAULT 0,
create_by varchar(64) NOT NULL DEFAULT '',
update_at bigint NOT NULL DEFAULT 0,
update_by varchar(64) NOT NULL DEFAULT ''
);
CREATE TABLE event_pipeline (
id bigserial PRIMARY KEY,
name varchar(128) NOT NULL,
team_ids text,
description varchar(255) NOT NULL DEFAULT '',
filter_enable smallint NOT NULL DEFAULT 0,
label_filters text,
attribute_filters text,
processors text,
create_at bigint NOT NULL DEFAULT 0,
create_by varchar(64) NOT NULL DEFAULT '',
update_at bigint NOT NULL DEFAULT 0,
update_by varchar(64) NOT NULL DEFAULT ''
);
CREATE TABLE embedded_product (
id bigserial PRIMARY KEY,
name varchar(255) DEFAULT NULL,
url varchar(255) DEFAULT NULL,
is_private boolean DEFAULT NULL,
team_ids varchar(255),
create_at bigint NOT NULL DEFAULT 0,
create_by varchar(64) NOT NULL DEFAULT '',
update_at bigint NOT NULL DEFAULT 0,
update_by varchar(64) NOT NULL DEFAULT ''
);

View File

@@ -9,6 +9,8 @@ import (
"github.com/ccfos/nightingale/v6/datasource"
_ "github.com/ccfos/nightingale/v6/datasource/ck"
"github.com/ccfos/nightingale/v6/datasource/es"
_ "github.com/ccfos/nightingale/v6/datasource/mysql"
_ "github.com/ccfos/nightingale/v6/datasource/postgresql"
"github.com/ccfos/nightingale/v6/dskit/tdengine"
"github.com/ccfos/nightingale/v6/models"
"github.com/ccfos/nightingale/v6/pkg/ctx"

172
dskit/mysql/mysql.go Normal file
View File

@@ -0,0 +1,172 @@
// @Author: Ciusyan 5/10/24
package mysql
import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"
"time"
"github.com/ccfos/nightingale/v6/dskit/pool"
"github.com/ccfos/nightingale/v6/dskit/sqlbase"
"github.com/ccfos/nightingale/v6/dskit/types"
_ "github.com/go-sql-driver/mysql" // MySQL driver
"github.com/mitchellh/mapstructure"
"gorm.io/driver/mysql"
"gorm.io/gorm"
)
type MySQL struct {
Shards []Shard `json:"mysql.shards" mapstructure:"mysql.shards"`
}
type Shard struct {
Addr string `json:"mysql.addr" mapstructure:"mysql.addr"`
DB string `json:"mysql.db" mapstructure:"mysql.db"`
User string `json:"mysql.user" mapstructure:"mysql.user"`
Password string `json:"mysql.password" mapstructure:"mysql.password"`
Timeout int `json:"mysql.timeout" mapstructure:"mysql.timeout"`
MaxIdleConns int `json:"mysql.max_idle_conns" mapstructure:"mysql.max_idle_conns"`
MaxOpenConns int `json:"mysql.max_open_conns" mapstructure:"mysql.max_open_conns"`
ConnMaxLifetime int `json:"mysql.conn_max_lifetime" mapstructure:"mysql.conn_max_lifetime"`
MaxQueryRows int `json:"mysql.max_query_rows" mapstructure:"mysql.max_query_rows"`
}
func NewMySQLWithSettings(ctx context.Context, settings interface{}) (*MySQL, error) {
newest := new(MySQL)
settingsMap := map[string]interface{}{}
switch s := settings.(type) {
case string:
if err := json.Unmarshal([]byte(s), &settingsMap); err != nil {
return nil, err
}
case map[string]interface{}:
settingsMap = s
default:
return nil, errors.New("unsupported settings type")
}
if err := mapstructure.Decode(settingsMap, newest); err != nil {
return nil, err
}
return newest, nil
}
// NewConn establishes a new connection to MySQL
func (m *MySQL) NewConn(ctx context.Context, database string) (*gorm.DB, error) {
if len(m.Shards) == 0 {
return nil, errors.New("empty pgsql shards")
}
shard := m.Shards[0]
if shard.Timeout == 0 {
shard.Timeout = 300
}
if shard.MaxIdleConns == 0 {
shard.MaxIdleConns = 10
}
if shard.MaxOpenConns == 0 {
shard.MaxOpenConns = 100
}
if shard.ConnMaxLifetime == 0 {
shard.ConnMaxLifetime = 300
}
if shard.MaxQueryRows == 0 {
shard.MaxQueryRows = 100
}
if len(shard.Addr) == 0 {
return nil, errors.New("empty addr")
}
if len(shard.Addr) == 0 {
return nil, errors.New("empty addr")
}
var keys []string
var err error
keys = append(keys, shard.Addr)
keys = append(keys, shard.Password, shard.User)
if len(database) > 0 {
keys = append(keys, database)
}
cachedKey := strings.Join(keys, ":")
// cache conn with database
conn, ok := pool.PoolClient.Load(cachedKey)
if ok {
return conn.(*gorm.DB), nil
}
var db *gorm.DB
defer func() {
if db != nil && err == nil {
pool.PoolClient.Store(cachedKey, db)
}
}()
dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8&parseTime=True", shard.User, shard.Password, shard.Addr, database)
return sqlbase.NewDB(
ctx,
mysql.Open(dsn),
shard.MaxIdleConns,
shard.MaxOpenConns,
time.Duration(shard.ConnMaxLifetime)*time.Second,
)
}
func (m *MySQL) ShowDatabases(ctx context.Context) ([]string, error) {
db, err := m.NewConn(ctx, "")
if err != nil {
return nil, err
}
return sqlbase.ShowDatabases(ctx, db, "SHOW DATABASES")
}
func (m *MySQL) ShowTables(ctx context.Context, database string) ([]string, error) {
db, err := m.NewConn(ctx, database)
if err != nil {
return nil, err
}
return sqlbase.ShowTables(ctx, db, "SHOW TABLES")
}
func (m *MySQL) DescTable(ctx context.Context, database, table string) ([]*types.ColumnProperty, error) {
db, err := m.NewConn(ctx, database)
if err != nil {
return nil, err
}
query := fmt.Sprintf("DESCRIBE %s", table)
return sqlbase.DescTable(ctx, db, query)
}
func (m *MySQL) SelectRows(ctx context.Context, database, table, query string) ([]map[string]interface{}, error) {
db, err := m.NewConn(ctx, database)
if err != nil {
return nil, err
}
return sqlbase.SelectRows(ctx, db, table, query)
}
func (m *MySQL) ExecQuery(ctx context.Context, database string, sql string) ([]map[string]interface{}, error) {
db, err := m.NewConn(ctx, database)
if err != nil {
return nil, err
}
return sqlbase.ExecQuery(ctx, db, sql)
}

129
dskit/mysql/mysql_test.go Normal file
View File

@@ -0,0 +1,129 @@
// @Author: Ciusyan 5/11/24
package mysql
import (
"context"
"testing"
"github.com/stretchr/testify/require"
)
func TestNewMySQLWithSettings(t *testing.T) {
tests := []struct {
name string
settings interface{}
wantErr bool
}{
{
name: "valid string settings",
settings: `{"mysql.addr":"localhost:3306","mysql.user":"root","mysql.password":"root","mysql.maxIdleConns":5,"mysql.maxOpenConns":10,"mysql.connMaxLifetime":30}`,
wantErr: false,
},
{
name: "invalid settings type",
settings: 12345,
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := NewMySQLWithSettings(context.Background(), tt.settings)
if (err != nil) != tt.wantErr {
t.Errorf("NewMySQLWithSettings() error = %v, wantErr %v", err, tt.wantErr)
}
t.Log(got)
})
}
}
func TestNewConn(t *testing.T) {
ctx := context.Background()
settings := `{"mysql.addr":"localhost:3306","mysql.user":"root","mysql.password":"root","mysql.maxIdleConns":5,"mysql.maxOpenConns":10,"mysql.connMaxLifetime":30}`
mysql, err := NewMySQLWithSettings(ctx, settings)
require.NoError(t, err)
tests := []struct {
name string
database string
wantErr bool
}{
{
name: "valid connection",
database: "db1",
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
_, err := mysql.NewConn(ctx, tt.database)
if (err != nil) != tt.wantErr {
t.Errorf("NewConn() error = %v, wantErr %v", err, tt.wantErr)
return
}
})
}
}
func TestShowDatabases(t *testing.T) {
ctx := context.Background()
settings := `{"mysql.addr":"localhost:3306","mysql.user":"root","mysql.password":"root","mysql.maxIdleConns":5,"mysql.maxOpenConns":10,"mysql.connMaxLifetime":30}`
mysql, err := NewMySQLWithSettings(ctx, settings)
require.NoError(t, err)
databases, err := mysql.ShowDatabases(ctx)
require.NoError(t, err)
t.Log(databases)
}
func TestShowTables(t *testing.T) {
ctx := context.Background()
settings := `{"mysql.addr":"localhost:3306","mysql.user":"root","mysql.password":"root","mysql.maxIdleConns":5,"mysql.maxOpenConns":10,"mysql.connMaxLifetime":30}`
mysql, err := NewMySQLWithSettings(ctx, settings)
require.NoError(t, err)
tables, err := mysql.ShowTables(ctx, "db1")
require.NoError(t, err)
t.Log(tables)
}
func TestDescTable(t *testing.T) {
ctx := context.Background()
settings := `{"mysql.addr":"localhost:3306","mysql.user":"root","mysql.password":"root","mysql.maxIdleConns":5,"mysql.maxOpenConns":10,"mysql.connMaxLifetime":30}`
mysql, err := NewMySQLWithSettings(ctx, settings)
require.NoError(t, err)
descTable, err := mysql.DescTable(ctx, "db1", "students")
require.NoError(t, err)
for _, desc := range descTable {
t.Logf("%+v", *desc)
}
}
func TestExecQuery(t *testing.T) {
ctx := context.Background()
settings := `{"mysql.addr":"localhost:3306","mysql.user":"root","mysql.password":"root","mysql.maxIdleConns":5,"mysql.maxOpenConns":10,"mysql.connMaxLifetime":30}`
mysql, err := NewMySQLWithSettings(ctx, settings)
require.NoError(t, err)
rows, err := mysql.ExecQuery(ctx, "db1", "SELECT * FROM students WHERE id = 10008")
require.NoError(t, err)
for _, row := range rows {
t.Log(row)
}
}
func TestSelectRows(t *testing.T) {
ctx := context.Background()
settings := `{"mysql.addr":"localhost:3306","mysql.user":"root","mysql.password":"root","mysql.maxIdleConns":5,"mysql.maxOpenConns":10,"mysql.connMaxLifetime":30}`
mysql, err := NewMySQLWithSettings(ctx, settings)
require.NoError(t, err)
rows, err := mysql.SelectRows(ctx, "db1", "students", "id > 10008")
require.NoError(t, err)
for _, row := range rows {
t.Log(row)
}
}

74
dskit/mysql/timeseries.go Normal file
View File

@@ -0,0 +1,74 @@
package mysql
import (
"context"
"fmt"
"strings"
"github.com/ccfos/nightingale/v6/dskit/sqlbase"
"github.com/ccfos/nightingale/v6/dskit/types"
"gorm.io/gorm"
)
// Query executes a given SQL query in MySQL and returns the results
func (m *MySQL) Query(ctx context.Context, query *sqlbase.QueryParam) ([]map[string]interface{}, error) {
db, err := m.NewConn(ctx, "")
if err != nil {
return nil, err
}
err = m.CheckMaxQueryRows(db, ctx, query)
if err != nil {
return nil, err
}
return sqlbase.Query(ctx, db, query)
}
// QueryTimeseries executes a time series data query using the given parameters
func (m *MySQL) QueryTimeseries(ctx context.Context, query *sqlbase.QueryParam) ([]types.MetricValues, error) {
db, err := m.NewConn(ctx, "")
if err != nil {
return nil, err
}
err = m.CheckMaxQueryRows(db, ctx, query)
if err != nil {
return nil, err
}
return sqlbase.QueryTimeseries(ctx, db, query)
}
func (m *MySQL) CheckMaxQueryRows(db *gorm.DB, ctx context.Context, query *sqlbase.QueryParam) error {
sql := strings.ReplaceAll(query.Sql, ";", "")
checkQuery := &sqlbase.QueryParam{
Sql: fmt.Sprintf("SELECT COUNT(*) as count FROM (%s) AS subquery;", sql),
}
res, err := sqlbase.Query(ctx, db, checkQuery)
if err != nil {
return err
}
if len(res) > 0 {
if count, exists := res[0]["count"]; exists {
v, err := sqlbase.ParseFloat64Value(count)
if err != nil {
return err
}
maxQueryRows := m.Shards[0].MaxQueryRows
if maxQueryRows == 0 {
maxQueryRows = 500
}
if v > float64(maxQueryRows) {
return fmt.Errorf("query result rows count %d exceeds the maximum limit %d", int(v), maxQueryRows)
}
}
}
return nil
}

View File

@@ -0,0 +1,62 @@
// @Author: Ciusyan 5/11/24
package mysql
import (
"context"
"testing"
"github.com/ccfos/nightingale/v6/dskit/sqlbase"
"github.com/ccfos/nightingale/v6/dskit/types"
"github.com/stretchr/testify/require"
)
func TestQuery(t *testing.T) {
ctx := context.Background()
settings := `{"mysql.addr":"localhost:3306","mysql.user":"root","mysql.password":"root","mysql.maxIdleConns":5,"mysql.maxOpenConns":10,"mysql.connMaxLifetime":30}`
mysql, err := NewMySQLWithSettings(ctx, settings)
require.NoError(t, err)
param := &sqlbase.QueryParam{
Sql: "SELECT * FROM students WHERE id > 10900",
Keys: types.Keys{
ValueKey: "",
LabelKey: "",
TimeKey: "",
TimeFormat: "",
},
}
rows, err := mysql.Query(ctx, param)
require.NoError(t, err)
for _, row := range rows {
t.Log(row)
}
}
func TestQueryTimeseries(t *testing.T) {
ctx := context.Background()
settings := `{"mysql.addr":"localhost:3306","mysql.user":"root","mysql.password":"root","mysql.maxIdleConns":5,"mysql.maxOpenConns":10,"mysql.connMaxLifetime":30}`
mysql, err := NewMySQLWithSettings(ctx, settings)
require.NoError(t, err)
// Prepare a test query parameter
param := &sqlbase.QueryParam{
Sql: "SELECT id, grade, student_name, a_grade, update_time FROM students WHERE grade > 20000", // Modify SQL query to select specific columns
Keys: types.Keys{
ValueKey: "grade a_grade", // Set the value key to the column name containing the metric value
LabelKey: "id student_name", // Set the label key to the column name containing the metric label
TimeKey: "update_time", // Set the time key to the column name containing the timestamp
TimeFormat: "2006-01-02 15:04:05 +0000 UTC", // Provide the time format according to the timestamp column's format
},
}
// Execute the query and retrieve the time series data
metricValues, err := mysql.QueryTimeseries(ctx, param)
require.NoError(t, err)
for _, metric := range metricValues {
t.Log(metric)
}
}

37
dskit/pool/pool.go Normal file
View File

@@ -0,0 +1,37 @@
package pool
import (
"bytes"
"sync"
"time"
gc "github.com/patrickmn/go-cache"
)
var (
PoolClient = new(sync.Map)
)
var (
// default cache instance, do not use this if you want to specify the defaultExpiration
DefaultCache = gc.New(time.Hour*24, time.Hour)
)
var (
bytesPool = sync.Pool{
New: func() interface{} { return new(bytes.Buffer) },
}
)
func PoolGetBytesBuffer() *bytes.Buffer {
buf := bytesPool.Get().(*bytes.Buffer)
buf.Reset()
return buf
}
func PoolPutBytesBuffer(buf *bytes.Buffer) {
if buf == nil {
return
}
bytesPool.Put(buf)
}

207
dskit/postgres/postgres.go Normal file
View File

@@ -0,0 +1,207 @@
// @Author: Ciusyan 5/20/24
package postgres
import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"
"time"
"github.com/ccfos/nightingale/v6/dskit/pool"
"github.com/ccfos/nightingale/v6/dskit/sqlbase"
"github.com/ccfos/nightingale/v6/dskit/types"
_ "github.com/lib/pq" // PostgreSQL driver
"github.com/mitchellh/mapstructure"
"gorm.io/driver/postgres"
"gorm.io/gorm"
)
type PostgreSQL struct {
Shard `json:",inline" mapstructure:",squash"`
}
type Shard struct {
Addr string `json:"pgsql.addr" mapstructure:"pgsql.addr"`
DB string `json:"pgsql.db" mapstructure:"pgsql.db"`
User string `json:"pgsql.user" mapstructure:"pgsql.user"`
Password string `json:"pgsql.password" mapstructure:"pgsql.password" `
Timeout int `json:"pgsql.timeout" mapstructure:"pgsql.timeout"`
MaxIdleConns int `json:"pgsql.max_idle_conns" mapstructure:"pgsql.max_idle_conns"`
MaxOpenConns int `json:"pgsql.max_open_conns" mapstructure:"pgsql.max_open_conns"`
ConnMaxLifetime int `json:"pgsql.conn_max_lifetime" mapstructure:"pgsql.conn_max_lifetime"`
MaxQueryRows int `json:"pgsql.max_query_rows" mapstructure:"pgsql.max_query_rows"`
}
// NewPostgreSQLWithSettings initializes a new PostgreSQL instance with the given settings
func NewPostgreSQLWithSettings(ctx context.Context, settings interface{}) (*PostgreSQL, error) {
newest := new(PostgreSQL)
settingsMap := map[string]interface{}{}
switch s := settings.(type) {
case string:
if err := json.Unmarshal([]byte(s), &settingsMap); err != nil {
return nil, err
}
case map[string]interface{}:
settingsMap = s
case *PostgreSQL:
return s, nil
case PostgreSQL:
return &s, nil
case Shard:
newest.Shard = s
return newest, nil
case *Shard:
newest.Shard = *s
return newest, nil
default:
return nil, errors.New("unsupported settings type")
}
if err := mapstructure.Decode(settingsMap, newest); err != nil {
return nil, err
}
return newest, nil
}
// NewConn establishes a new connection to PostgreSQL
func (p *PostgreSQL) NewConn(ctx context.Context, database string) (*gorm.DB, error) {
if len(p.DB) == 0 && len(database) == 0 {
return nil, errors.New("empty pgsql database") // 兼容阿里实时数仓Holgres, 连接时必须指定db名字
}
if p.Shard.Timeout == 0 {
p.Shard.Timeout = 60
}
if p.Shard.MaxIdleConns == 0 {
p.Shard.MaxIdleConns = 10
}
if p.Shard.MaxOpenConns == 0 {
p.Shard.MaxOpenConns = 100
}
if p.Shard.ConnMaxLifetime == 0 {
p.Shard.ConnMaxLifetime = 14400
}
if len(p.Shard.Addr) == 0 {
return nil, errors.New("empty fe-node addr")
}
var keys []string
var err error
keys = append(keys, p.Shard.Addr)
keys = append(keys, p.Shard.Password, p.Shard.User)
if len(database) > 0 {
keys = append(keys, database)
}
cachedKey := strings.Join(keys, ":")
// cache conn with database
conn, ok := pool.PoolClient.Load(cachedKey)
if ok {
return conn.(*gorm.DB), nil
}
var db *gorm.DB
defer func() {
if db != nil && err == nil {
pool.PoolClient.Store(cachedKey, db)
}
}()
// Simplified connection logic for PostgreSQL
dsn := fmt.Sprintf("postgres://%s:%s@%s/%s?sslmode=disable&TimeZone=Asia/Shanghai", p.Shard.User, p.Shard.Password, p.Shard.Addr, database)
db, err = sqlbase.NewDB(
ctx,
postgres.Open(dsn),
p.Shard.MaxIdleConns,
p.Shard.MaxOpenConns,
time.Duration(p.Shard.ConnMaxLifetime)*time.Second,
)
if err != nil {
if db != nil {
sqlDB, _ := db.DB()
if sqlDB != nil {
sqlDB.Close()
}
}
return nil, err
}
return db, nil
}
// ShowDatabases lists all databases in PostgreSQL
func (p *PostgreSQL) ShowDatabases(ctx context.Context, searchKeyword string) ([]string, error) {
db, err := p.NewConn(ctx, "postgres")
if err != nil {
return nil, err
}
sql := fmt.Sprintf("SELECT datname FROM pg_database WHERE datistemplate = false AND datname LIKE %s",
"'%"+searchKeyword+"%'")
return sqlbase.ShowDatabases(ctx, db, sql)
}
// ShowTables lists all tables in a given database
func (p *PostgreSQL) ShowTables(ctx context.Context, searchKeyword string) (map[string][]string, error) {
db, err := p.NewConn(ctx, p.DB)
if err != nil {
return nil, err
}
sql := fmt.Sprintf("SELECT schemaname, tablename FROM pg_tables WHERE schemaname !='information_schema' and schemaname !='pg_catalog' and tablename LIKE %s",
"'%"+searchKeyword+"%'")
rets, err := sqlbase.ExecQuery(ctx, db, sql)
if err != nil {
return nil, err
}
tabs := make(map[string][]string, 3)
for _, row := range rets {
if val, ok := row["schemaname"].(string); ok {
tabs[val] = append(tabs[val], row["tablename"].(string))
}
}
return tabs, nil
}
// DescTable describes the schema of a specified table in PostgreSQL
// scheme default: public if not specified
func (p *PostgreSQL) DescTable(ctx context.Context, scheme, table string) ([]*types.ColumnProperty, error) {
db, err := p.NewConn(ctx, p.DB)
if err != nil {
return nil, err
}
if scheme == "" {
scheme = "public"
}
query := fmt.Sprintf("SELECT column_name, data_type, is_nullable, column_default FROM information_schema.columns WHERE table_name = '%s' AND table_schema = '%s'", table, scheme)
return sqlbase.DescTable(ctx, db, query)
}
// SelectRows selects rows from a specified table in PostgreSQL based on a given query
func (p *PostgreSQL) SelectRows(ctx context.Context, table, where string) ([]map[string]interface{}, error) {
db, err := p.NewConn(ctx, p.DB)
if err != nil {
return nil, err
}
return sqlbase.SelectRows(ctx, db, table, where)
}
// ExecQuery executes a SQL query in PostgreSQL
func (p *PostgreSQL) ExecQuery(ctx context.Context, sql string) ([]map[string]interface{}, error) {
db, err := p.NewConn(ctx, p.DB)
if err != nil {
return nil, err
}
return sqlbase.ExecQuery(ctx, db, sql)
}

View File

@@ -0,0 +1,73 @@
package postgres
import (
"context"
"fmt"
"strings"
"github.com/ccfos/nightingale/v6/dskit/sqlbase"
"github.com/ccfos/nightingale/v6/dskit/types"
"gorm.io/gorm"
)
// Query executes a given SQL query in PostgreSQL and returns the results
func (p *PostgreSQL) Query(ctx context.Context, query *sqlbase.QueryParam) ([]map[string]interface{}, error) {
db, err := p.NewConn(ctx, p.Shard.DB)
if err != nil {
return nil, err
}
err = p.CheckMaxQueryRows(db, ctx, query)
if err != nil {
return nil, err
}
return sqlbase.Query(ctx, db, query)
}
// QueryTimeseries executes a time series data query using the given parameters
func (p *PostgreSQL) QueryTimeseries(ctx context.Context, query *sqlbase.QueryParam) ([]types.MetricValues, error) {
db, err := p.NewConn(ctx, p.Shard.DB)
if err != nil {
return nil, err
}
err = p.CheckMaxQueryRows(db, ctx, query)
if err != nil {
return nil, err
}
return sqlbase.QueryTimeseries(ctx, db, query, true)
}
func (p *PostgreSQL) CheckMaxQueryRows(db *gorm.DB, ctx context.Context, query *sqlbase.QueryParam) error {
sql := strings.ReplaceAll(query.Sql, ";", "")
checkQuery := &sqlbase.QueryParam{
Sql: fmt.Sprintf("SELECT COUNT(*) as count FROM (%s) AS subquery;", sql),
}
res, err := sqlbase.Query(ctx, db, checkQuery)
if err != nil {
return err
}
if len(res) > 0 {
if count, exists := res[0]["count"]; exists {
v, err := sqlbase.ParseFloat64Value(count)
if err != nil {
return err
}
maxQueryRows := p.Shard.MaxQueryRows
if maxQueryRows == 0 {
maxQueryRows = 500
}
if v > float64(maxQueryRows) {
return fmt.Errorf("query result rows count %d exceeds the maximum limit %d", int(v), maxQueryRows)
}
}
}
return nil
}

View File

@@ -9,9 +9,9 @@ import (
"strings"
"time"
"github.com/ccfos/nightingale/v6/dskit/types"
"gorm.io/gorm"
"github.com/ccfos/nightingale/v6/dskit/types"
)
// NewDB creates a new Gorm DB instance based on the provided gorm.Dialector and configures the connection pool
@@ -19,7 +19,7 @@ func NewDB(ctx context.Context, dialector gorm.Dialector, maxIdleConns, maxOpenC
// Create a new Gorm DB instance
db, err := gorm.Open(dialector, &gorm.Config{})
if err != nil {
return nil, err
return db, err
}
// Configure the connection pool
@@ -35,6 +35,17 @@ func NewDB(ctx context.Context, dialector gorm.Dialector, maxIdleConns, maxOpenC
return db.WithContext(ctx), sqlDB.Ping()
}
func CloseDB(db *gorm.DB) error {
if db != nil {
sqlDb, err := db.DB()
if err != nil {
return err
}
return sqlDb.Close()
}
return nil
}
// ShowTables retrieves a list of all tables in the specified database
func ShowTables(ctx context.Context, db *gorm.DB, query string) ([]string, error) {
var tables []string
@@ -112,7 +123,7 @@ func DescTable(ctx context.Context, db *gorm.DB, query string) ([]*types.ColumnP
}
// Convert the database-specific type to internal type
type2, indexable := convertDBType(db.Dialector.Name(), typ)
type2, indexable := ConvertDBType(db.Dialector.Name(), typ)
columns = append(columns, &types.ColumnProperty{
Field: field,
Type: typ,
@@ -175,7 +186,7 @@ func SelectRows(ctx context.Context, db *gorm.DB, table, query string) ([]map[st
}
// convertDBType converts MySQL or PostgreSQL data types to custom internal types and determines if they are indexable
func convertDBType(dialect, dbType string) (string, bool) {
func ConvertDBType(dialect, dbType string) (string, bool) {
typ := strings.ToLower(dbType)
// Common type conversions
@@ -190,7 +201,7 @@ func convertDBType(dialect, dbType string) (string, bool) {
strings.HasPrefix(typ, "char"), strings.HasPrefix(typ, "tinytext"),
strings.HasPrefix(typ, "mediumtext"), strings.HasPrefix(typ, "longtext"),
strings.HasPrefix(typ, "character varying"), strings.HasPrefix(typ, "nvarchar"),
strings.HasPrefix(typ, "nchar"):
strings.HasPrefix(typ, "nchar"), strings.HasPrefix(typ, "bpchar"):
return types.LogExtractValueTypeText, true
case strings.HasPrefix(typ, "float"), strings.HasPrefix(typ, "double"),
@@ -203,7 +214,7 @@ func convertDBType(dialect, dbType string) (string, bool) {
strings.HasPrefix(typ, "time"), strings.HasPrefix(typ, "smalldatetime"):
return types.LogExtractValueTypeDate, false
case strings.HasPrefix(typ, "boolean"), strings.HasPrefix(typ, "bit"):
case strings.HasPrefix(typ, "boolean"), strings.HasPrefix(typ, "bit"), strings.HasPrefix(typ, "bool"):
return types.LogExtractValueTypeBool, false
}

View File

@@ -112,7 +112,8 @@ func FormatMetricValues(keys types.Keys, rows []map[string]interface{}, ignoreDe
metricTs[k] = float64(ts.Unix())
default:
// Default to labels for any unrecognized columns
if !ignore {
if !ignore && keys.LabelKey == "" {
// 只有当 labelKey 为空时,才将剩余的列作为 label
labels[k] = fmt.Sprintf("%v", v)
}
}

4
go.mod
View File

@@ -27,11 +27,13 @@ require (
github.com/jinzhu/copier v0.4.0
github.com/json-iterator/go v1.1.12
github.com/koding/multiconfig v0.0.0-20171124222453-69c27309b2d7
github.com/lib/pq v1.0.0
github.com/mailru/easyjson v0.7.7
github.com/mattn/go-isatty v0.0.19
github.com/mitchellh/mapstructure v1.5.0
github.com/mojocn/base64Captcha v1.3.6
github.com/olivere/elastic/v7 v7.0.32
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/pelletier/go-toml/v2 v2.0.8
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.20.5
@@ -120,7 +122,7 @@ require (
github.com/go-playground/locales v0.14.1 // indirect
github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/go-playground/validator/v10 v10.14.0 // indirect
github.com/go-sql-driver/mysql v1.6.0 // indirect
github.com/go-sql-driver/mysql v1.6.0
github.com/goccy/go-json v0.10.2 // indirect
github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 // indirect
github.com/grafana/regexp v0.0.0-20221122212121-6b5c0a4cb7fd // indirect

4
go.sum
View File

@@ -220,6 +220,8 @@ github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+
github.com/leodido/go-urn v1.2.1/go.mod h1:zt4jvISO2HfUBqxjfIshjdMTYS56ZS/qv49ictyFfxY=
github.com/leodido/go-urn v1.2.4 h1:XlAE/cm/ms7TE/VMVoduSpNBoyc2dOxHs5MZSwAN63Q=
github.com/leodido/go-urn v1.2.4/go.mod h1:7ZrI8mTSeBSHl/UaRyKQW1qZeMgak41ANeCNaVckg+4=
github.com/lib/pq v1.0.0 h1:X5PMW56eZitiTeO7tKzZxFCSpbFZJtkMMooicw2us9A=
github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0=
github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94=
@@ -246,6 +248,8 @@ github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4=
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
github.com/olivere/elastic/v7 v7.0.32 h1:R7CXvbu8Eq+WlsLgxmKVKPox0oOwAE/2T9Si5BnvK6E=
github.com/olivere/elastic/v7 v7.0.32/go.mod h1:c7PVmLe3Fxq77PIfY/bZmxY/TAamBhCzZ8xDOE09a9k=
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
github.com/paulmach/orb v0.11.1 h1:3koVegMC4X/WeiXYz9iswopaTwMem53NzTJuTF20JzU=
github.com/paulmach/orb v0.11.1/go.mod h1:5mULz1xQfs3bmQm63QEJA6lNGujuRafwA5S/EnuLaLU=
github.com/paulmach/protoscan v0.2.1/go.mod h1:SpcSwydNLrxUGSDvXvO0P7g7AuhJ7lcKfDlhJCDw2gY=

File diff suppressed because it is too large Load Diff

View File

Before

Width:  |  Height:  |  Size: 2.5 KiB

After

Width:  |  Height:  |  Size: 2.5 KiB

View File

@@ -141,7 +141,7 @@ func (epc *EventProcessorCacheType) syncEventProcessors() error {
for _, p := range eventPipeline.ProcessorConfigs {
processor, err := models.GetProcessorByType(p.Typ, p.Config)
if err != nil {
logger.Warningf("event_pipeline_id: %d, event:%+v, processor:%+v type not found", eventPipeline.ID, eventPipeline, p)
logger.Warningf("event_pipeline_id: %d, event:%+v, processor:%+v get processor err: %+v", eventPipeline.ID, eventPipeline, p, err)
continue
}

View File

@@ -2,10 +2,8 @@ package memsto
import (
"crypto/tls"
"encoding/json"
"fmt"
"net/http"
"strings"
"sync"
"time"
@@ -16,23 +14,9 @@ import (
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/pkg/errors"
"github.com/toolkits/pkg/container/list"
"github.com/toolkits/pkg/logger"
)
// NotifyTask 表示一个通知发送任务
type NotifyTask struct {
Events []*models.AlertCurEvent
NotifyRuleId int64
NotifyChannel *models.NotifyChannelConfig
TplContent map[string]interface{}
CustomParams map[string]string
Sendtos []string
}
// NotifyRecordFunc 通知记录函数类型
type NotifyRecordFunc func(ctx *ctx.Context, events []*models.AlertCurEvent, notifyRuleId int64, channelName, target, resp string, err error)
type NotifyChannelCacheType struct {
statTotal int64
statLastUpdated int64
@@ -40,18 +24,13 @@ type NotifyChannelCacheType struct {
stats *Stats
sync.RWMutex
channels map[int64]*models.NotifyChannelConfig // key: channel id
channelsQueue map[int64]*list.SafeListLimited
channels map[int64]*models.NotifyChannelConfig // key: channel id
httpConcurrency map[int64]chan struct{}
httpClient map[int64]*http.Client
smtpCh map[int64]chan *models.EmailContext
smtpQuitCh map[int64]chan struct{}
// 队列消费者控制
queueQuitCh map[int64]chan struct{}
// 通知记录回调函数
notifyRecordFunc NotifyRecordFunc
}
func NewNotifyChannelCache(ctx *ctx.Context, stats *Stats) *NotifyChannelCacheType {
@@ -61,20 +40,18 @@ func NewNotifyChannelCache(ctx *ctx.Context, stats *Stats) *NotifyChannelCacheTy
ctx: ctx,
stats: stats,
channels: make(map[int64]*models.NotifyChannelConfig),
channelsQueue: make(map[int64]*list.SafeListLimited),
queueQuitCh: make(map[int64]chan struct{}),
httpClient: make(map[int64]*http.Client),
smtpCh: make(map[int64]chan *models.EmailContext),
smtpQuitCh: make(map[int64]chan struct{}),
}
ncc.SyncNotifyChannels()
return ncc
}
// SetNotifyRecordFunc 设置通知记录回调函数
func (ncc *NotifyChannelCacheType) SetNotifyRecordFunc(fn NotifyRecordFunc) {
ncc.notifyRecordFunc = fn
func (ncc *NotifyChannelCacheType) Reset() {
ncc.Lock()
defer ncc.Unlock()
ncc.statTotal = -1
ncc.statLastUpdated = -1
ncc.channels = make(map[int64]*models.NotifyChannelConfig)
}
func (ncc *NotifyChannelCacheType) StatChanged(total, lastUpdated int64) bool {
@@ -85,253 +62,30 @@ func (ncc *NotifyChannelCacheType) StatChanged(total, lastUpdated int64) bool {
return true
}
func (ncc *NotifyChannelCacheType) Set(m map[int64]*models.NotifyChannelConfig, total, lastUpdated int64) {
func (ncc *NotifyChannelCacheType) Set(m map[int64]*models.NotifyChannelConfig, httpConcurrency map[int64]chan struct{}, httpClient map[int64]*http.Client,
smtpCh map[int64]chan *models.EmailContext, quitCh map[int64]chan struct{}, total, lastUpdated int64) {
ncc.Lock()
defer ncc.Unlock()
for _, k := range ncc.httpConcurrency {
close(k)
}
ncc.httpConcurrency = httpConcurrency
ncc.channels = m
ncc.httpClient = httpClient
ncc.smtpCh = smtpCh
// 1. 处理需要删除的通道
ncc.removeDeletedChannels(m)
for i := range ncc.smtpQuitCh {
close(ncc.smtpQuitCh[i])
}
// 2. 处理新增和更新的通道
ncc.addOrUpdateChannels(m)
ncc.smtpQuitCh = quitCh
ncc.Unlock()
// only one goroutine used, so no need lock
ncc.statTotal = total
ncc.statLastUpdated = lastUpdated
}
// removeDeletedChannels 移除已删除的通道
func (ncc *NotifyChannelCacheType) removeDeletedChannels(newChannels map[int64]*models.NotifyChannelConfig) {
for chID := range ncc.channels {
if _, exists := newChannels[chID]; !exists {
logger.Infof("removing deleted channel %d", chID)
// 停止消费者协程
if quitCh, exists := ncc.queueQuitCh[chID]; exists {
close(quitCh)
delete(ncc.queueQuitCh, chID)
}
// 删除队列
delete(ncc.channelsQueue, chID)
// 删除HTTP客户端
delete(ncc.httpClient, chID)
// 停止SMTP发送器
if quitCh, exists := ncc.smtpQuitCh[chID]; exists {
close(quitCh)
delete(ncc.smtpQuitCh, chID)
delete(ncc.smtpCh, chID)
}
// 删除通道配置
delete(ncc.channels, chID)
}
}
}
// addOrUpdateChannels 添加或更新通道
func (ncc *NotifyChannelCacheType) addOrUpdateChannels(newChannels map[int64]*models.NotifyChannelConfig) {
for chID, newChannel := range newChannels {
oldChannel, exists := ncc.channels[chID]
if exists {
if ncc.channelConfigChanged(oldChannel, newChannel) {
logger.Infof("updating channel %d (new: %t)", chID, !exists)
ncc.stopChannelResources(chID)
} else {
logger.Infof("channel %d config not changed", chID)
continue
}
}
// 更新通道配置
ncc.channels[chID] = newChannel
// 根据类型创建相应的资源
switch newChannel.RequestType {
case "http", "flashduty":
// 创建HTTP客户端
if newChannel.RequestConfig != nil && newChannel.RequestConfig.HTTPRequestConfig != nil {
cli, err := models.GetHTTPClient(newChannel)
if err != nil {
logger.Warningf("failed to create HTTP client for channel %d: %v", chID, err)
} else {
if ncc.httpClient == nil {
ncc.httpClient = make(map[int64]*http.Client)
}
ncc.httpClient[chID] = cli
}
}
// 对于 http 类型,启动队列和消费者
if newChannel.RequestType == "http" {
ncc.startHttpChannel(chID, newChannel)
}
case "smtp":
// 创建SMTP发送器
if newChannel.RequestConfig != nil && newChannel.RequestConfig.SMTPRequestConfig != nil {
ch := make(chan *models.EmailContext)
quit := make(chan struct{})
go ncc.startEmailSender(chID, newChannel.RequestConfig.SMTPRequestConfig, ch, quit)
if ncc.smtpCh == nil {
ncc.smtpCh = make(map[int64]chan *models.EmailContext)
}
if ncc.smtpQuitCh == nil {
ncc.smtpQuitCh = make(map[int64]chan struct{})
}
ncc.smtpCh[chID] = ch
ncc.smtpQuitCh[chID] = quit
}
}
}
}
// channelConfigChanged 检查通道配置是否发生变化
func (ncc *NotifyChannelCacheType) channelConfigChanged(oldChannel, newChannel *models.NotifyChannelConfig) bool {
if oldChannel == nil || newChannel == nil {
return true
}
// check updateat
if oldChannel.UpdateAt != newChannel.UpdateAt {
return true
}
return false
}
// stopChannelResources 停止通道的相关资源
func (ncc *NotifyChannelCacheType) stopChannelResources(chID int64) {
// 停止HTTP消费者协程
if quitCh, exists := ncc.queueQuitCh[chID]; exists {
close(quitCh)
delete(ncc.queueQuitCh, chID)
delete(ncc.channelsQueue, chID)
}
// 停止SMTP发送器
if quitCh, exists := ncc.smtpQuitCh[chID]; exists {
close(quitCh)
delete(ncc.smtpQuitCh, chID)
delete(ncc.smtpCh, chID)
}
}
// startHttpChannel 启动HTTP通道的队列和消费者
func (ncc *NotifyChannelCacheType) startHttpChannel(chID int64, channel *models.NotifyChannelConfig) {
if channel.RequestConfig == nil || channel.RequestConfig.HTTPRequestConfig == nil {
logger.Warningf("notify channel %+v http request config not found", channel)
return
}
// 创建队列
queue := list.NewSafeListLimited(100000)
ncc.channelsQueue[chID] = queue
// 启动消费者协程
quitCh := make(chan struct{})
ncc.queueQuitCh[chID] = quitCh
// 启动指定数量的消费者协程
concurrency := channel.RequestConfig.HTTPRequestConfig.Concurrency
for i := 0; i < concurrency; i++ {
go ncc.startNotifyConsumer(chID, queue, quitCh)
}
logger.Infof("started %d notify consumers for channel %d", concurrency, chID)
}
// 启动通知消费者协程
func (ncc *NotifyChannelCacheType) startNotifyConsumer(channelID int64, queue *list.SafeListLimited, quitCh chan struct{}) {
logger.Infof("starting notify consumer for channel %d", channelID)
for {
select {
case <-quitCh:
logger.Infof("notify consumer for channel %d stopped", channelID)
return
default:
// 从队列中取出任务
task := queue.PopBack()
if task == nil {
// 队列为空,等待一段时间
time.Sleep(100 * time.Millisecond)
continue
}
notifyTask, ok := task.(*NotifyTask)
if !ok {
logger.Errorf("invalid task type in queue for channel %d", channelID)
continue
}
// 处理通知任务
ncc.processNotifyTask(notifyTask)
}
}
}
// processNotifyTask 处理通知任务(仅处理 http 类型)
func (ncc *NotifyChannelCacheType) processNotifyTask(task *NotifyTask) {
httpClient := ncc.GetHttpClient(task.NotifyChannel.ID)
// 现在只处理 http 类型flashduty 保持直接发送
if task.NotifyChannel.RequestType == "http" {
if len(task.Sendtos) == 0 || ncc.needBatchContacts(task.NotifyChannel.RequestConfig.HTTPRequestConfig) {
resp, err := task.NotifyChannel.SendHTTP(task.Events, task.TplContent, task.CustomParams, task.Sendtos, httpClient)
logger.Infof("notify_id: %d, channel_name: %v, event:%+v, tplContent:%v, customParams:%v, userInfo:%+v, respBody: %v, err: %v",
task.NotifyRuleId, task.NotifyChannel.Name, task.Events[0], task.TplContent, task.CustomParams, task.Sendtos, resp, err)
// 调用通知记录回调函数
if ncc.notifyRecordFunc != nil {
ncc.notifyRecordFunc(ncc.ctx, task.Events, task.NotifyRuleId, task.NotifyChannel.Name, ncc.getSendTarget(task.CustomParams, task.Sendtos), resp, err)
}
} else {
for i := range task.Sendtos {
resp, err := task.NotifyChannel.SendHTTP(task.Events, task.TplContent, task.CustomParams, []string{task.Sendtos[i]}, httpClient)
logger.Infof("notify_id: %d, channel_name: %v, event:%+v, tplContent:%v, customParams:%v, userInfo:%+v, respBody: %v, err: %v",
task.NotifyRuleId, task.NotifyChannel.Name, task.Events[0], task.TplContent, task.CustomParams, task.Sendtos[i], resp, err)
// 调用通知记录回调函数
if ncc.notifyRecordFunc != nil {
ncc.notifyRecordFunc(ncc.ctx, task.Events, task.NotifyRuleId, task.NotifyChannel.Name, ncc.getSendTarget(task.CustomParams, []string{task.Sendtos[i]}), resp, err)
}
}
}
}
}
// 判断是否需要批量发送联系人
func (ncc *NotifyChannelCacheType) needBatchContacts(requestConfig *models.HTTPRequestConfig) bool {
if requestConfig == nil {
return false
}
b, _ := json.Marshal(requestConfig)
return strings.Contains(string(b), "$sendtos")
}
// 获取发送目标
func (ncc *NotifyChannelCacheType) getSendTarget(customParams map[string]string, sendtos []string) string {
if len(customParams) == 0 {
return strings.Join(sendtos, ",")
}
values := make([]string, 0)
for _, value := range customParams {
runes := []rune(value)
if len(runes) <= 4 {
values = append(values, value)
} else {
maskedValue := string(runes[:len(runes)-4]) + "****"
values = append(values, maskedValue)
}
}
return strings.Join(values, ",")
}
func (ncc *NotifyChannelCacheType) Get(channelId int64) *models.NotifyChannelConfig {
ncc.RLock()
defer ncc.RUnlock()
@@ -363,25 +117,6 @@ func (ncc *NotifyChannelCacheType) GetChannelIds() []int64 {
return list
}
// 新增:将通知任务加入队列
func (ncc *NotifyChannelCacheType) EnqueueNotifyTask(task *NotifyTask) bool {
ncc.RLock()
queue := ncc.channelsQueue[task.NotifyChannel.ID]
ncc.RUnlock()
if queue == nil {
logger.Errorf("no queue found for channel %d", task.NotifyChannel.ID)
return false
}
success := queue.PushFront(task)
if !success {
logger.Warningf("failed to enqueue notify task for channel %d, queue is full", task.NotifyChannel.ID)
}
return success
}
func (ncc *NotifyChannelCacheType) SyncNotifyChannels() {
err := ncc.syncNotifyChannels()
if err != nil {
@@ -427,8 +162,38 @@ func (ncc *NotifyChannelCacheType) syncNotifyChannels() error {
m[lst[i].ID] = lst[i]
}
// 增量更新:只传递通道配置,让增量更新逻辑按需创建资源
ncc.Set(m, stat.Total, stat.LastUpdated)
httpConcurrency := make(map[int64]chan struct{})
httpClient := make(map[int64]*http.Client)
smtpCh := make(map[int64]chan *models.EmailContext)
quitCh := make(map[int64]chan struct{})
for i := range lst {
// todo 优化变更粒度
switch lst[i].RequestType {
case "http", "flashduty":
if lst[i].RequestConfig == nil || lst[i].RequestConfig.HTTPRequestConfig == nil {
logger.Warningf("notify channel %+v http request config not found", lst[i])
continue
}
cli, _ := models.GetHTTPClient(lst[i])
httpClient[lst[i].ID] = cli
httpConcurrency[lst[i].ID] = make(chan struct{}, lst[i].RequestConfig.HTTPRequestConfig.Concurrency)
for j := 0; j < lst[i].RequestConfig.HTTPRequestConfig.Concurrency; j++ {
httpConcurrency[lst[i].ID] <- struct{}{}
}
case "smtp":
ch := make(chan *models.EmailContext)
quit := make(chan struct{})
go ncc.startEmailSender(lst[i].ID, lst[i].RequestConfig.SMTPRequestConfig, ch, quit)
smtpCh[lst[i].ID] = ch
quitCh[lst[i].ID] = quit
default:
}
}
ncc.Set(m, httpConcurrency, httpClient, smtpCh, quitCh, stat.Total, stat.LastUpdated)
ms := time.Since(start).Milliseconds()
ncc.stats.GaugeCronDuration.WithLabelValues("sync_notify_channels").Set(float64(ms))
@@ -540,3 +305,22 @@ func (ncc *NotifyChannelCacheType) dialSmtp(quitCh chan struct{}, d *gomail.Dial
}
}
}
func (ncc *NotifyChannelCacheType) HttpConcurrencyAdd(channelId int64) bool {
ncc.RLock()
defer ncc.RUnlock()
if _, ok := ncc.httpConcurrency[channelId]; !ok {
return false
}
_, ok := <-ncc.httpConcurrency[channelId]
return ok
}
func (ncc *NotifyChannelCacheType) HttpConcurrencyDone(channelId int64) {
ncc.RLock()
defer ncc.RUnlock()
if _, ok := ncc.httpConcurrency[channelId]; !ok {
return
}
ncc.httpConcurrency[channelId] <- struct{}{}
}

View File

@@ -243,6 +243,15 @@ func AlertHisEventGetById(ctx *ctx.Context, id int64) (*AlertHisEvent, error) {
return AlertHisEventGet(ctx, "id=?", id)
}
func AlertHisEventBatchDelete(ctx *ctx.Context, timestamp int64, severities []int, limit int) (int64, error) {
db := DB(ctx).Where("last_eval_time < ?", timestamp)
if len(severities) > 0 {
db = db.Where("severity IN (?)", severities)
}
res := db.Limit(limit).Delete(&AlertHisEvent{})
return res.RowsAffected, res.Error
}
func (m *AlertHisEvent) UpdateFieldsMap(ctx *ctx.Context, fields map[string]interface{}) error {
return DB(ctx).Model(m).Updates(fields).Error
}

View File

@@ -28,6 +28,8 @@ const (
PROMETHEUS = "prometheus"
TDENGINE = "tdengine"
ELASTICSEARCH = "elasticsearch"
MYSQL = "mysql"
POSTGRESQL = "pgsql"
CLICKHOUSE = "ck"
)
@@ -505,7 +507,7 @@ func (ar *AlertRule) Verify() error {
ar.AppendTags = strings.TrimSpace(ar.AppendTags)
arr := strings.Fields(ar.AppendTags)
for i := 0; i < len(arr); i++ {
if len(strings.Split(arr[i], "=")) != 2 {
if !strings.Contains(arr[i], "=") {
return fmt.Errorf("AppendTags(%s) invalid", arr[i])
}
}
@@ -838,7 +840,6 @@ func (ar *AlertRule) FillNotifyGroups(ctx *ctx.Context, cache map[int64]*UserGro
}
func (ar *AlertRule) FE2DB() error {
if len(ar.EnableStimesJSON) > 0 {
ar.EnableStime = strings.Join(ar.EnableStimesJSON, " ")
ar.EnableEtime = strings.Join(ar.EnableEtimesJSON, " ")
@@ -866,7 +867,16 @@ func (ar *AlertRule) FE2DB() error {
ar.NotifyChannels = strings.Join(ar.NotifyChannelsJSON, " ")
ar.NotifyGroups = strings.Join(ar.NotifyGroupsJSON, " ")
ar.Callbacks = strings.Join(ar.CallbacksJSON, " ")
ar.AppendTags = strings.Join(ar.AppendTagsJSON, " ")
for i := range ar.AppendTagsJSON {
// 后面要把多个标签拼接在一起,所以每个标签里不能有空格
ar.AppendTagsJSON[i] = strings.ReplaceAll(ar.AppendTagsJSON[i], " ", "")
}
if len(ar.AppendTagsJSON) > 0 {
ar.AppendTags = strings.Join(ar.AppendTagsJSON, " ")
}
algoParamsByte, err := json.Marshal(ar.AlgoParamsJson)
if err != nil {
return fmt.Errorf("marshal algo_params err:%v", err)
@@ -1175,12 +1185,21 @@ func (ar *AlertRule) IsLokiRule() bool {
return ar.Prod == LOKI || ar.Cate == LOKI
}
func (ar *AlertRule) IsTdengineRule() bool {
return ar.Cate == TDENGINE
}
func (ar *AlertRule) IsHostRule() bool {
return ar.Prod == HOST
}
func (ar *AlertRule) IsTdengineRule() bool {
return ar.Cate == TDENGINE
func (ar *AlertRule) IsInnerRule() bool {
return ar.Cate == TDENGINE ||
ar.Cate == CLICKHOUSE ||
ar.Cate == ELASTICSEARCH ||
ar.Prod == LOKI || ar.Cate == LOKI ||
ar.Cate == MYSQL ||
ar.Cate == POSTGRESQL
}
func (ar *AlertRule) GetRuleType() string {

View File

@@ -116,6 +116,10 @@ func (s *AlertSubscribe) Verify() error {
return errors.New("severities is required")
}
if len(s.NotifyRuleIds) > 0 {
return nil
}
if s.UserGroupIds != "" && s.NewChannels == "" {
// 如果指定了用户组,那么新告警的通知渠道必须指定,否则容易出现告警规则中没有指定通知渠道,导致订阅通知时,没有通知渠道
return errors.New("new_channels is required")

View File

@@ -16,7 +16,7 @@ type EventPipeline struct {
TeamIds []int64 `json:"team_ids" gorm:"type:text;serializer:json"`
TeamNames []string `json:"team_names" gorm:"-"`
Description string `json:"description" gorm:"type:varchar(255)"`
FilterEnable bool `json:"filter_enable" gorm:"type:tinyint(1)"`
FilterEnable bool `json:"filter_enable" gorm:"type:bigint"`
LabelFilters []TagFilter `json:"label_filters" gorm:"type:text;serializer:json"`
AttrFilters []TagFilter `json:"attribute_filters" gorm:"type:text;serializer:json"`
ProcessorConfigs []ProcessorConfig `json:"processors" gorm:"type:text;serializer:json"`

View File

@@ -8,8 +8,12 @@ import (
)
type Processor interface {
Init(settings interface{}) (Processor, error) // 初始化配置
Process(ctx *ctx.Context, event *AlertCurEvent) *AlertCurEvent // 处理告警事件
Init(settings interface{}) (Processor, error) // 初始化配置
Process(ctx *ctx.Context, event *AlertCurEvent) (*AlertCurEvent, string, error)
// 处理器有三种情况:
// 1. 处理成功,返回处理后的事件
// 2. 处理成功,不需要返回处理后端事件,只返回处理结果,将处理结果放到 string 中,比如 eventdrop callback 处理器
// 3. 处理失败,返回错误,将错误放到 error 中
}
type NewProcessorFn func(settings interface{}) (Processor, error)

View File

@@ -2,6 +2,7 @@ package models
import (
"fmt"
"strings"
"time"
"github.com/toolkits/pkg/logger"
@@ -40,7 +41,7 @@ func ConvertAlert(rule PromRule, interval string, datasouceQueries []DatasourceQ
if len(rule.Labels) > 0 {
for k, v := range rule.Labels {
if k != "severity" {
appendTags = append(appendTags, fmt.Sprintf("%s=%s", k, v))
appendTags = append(appendTags, fmt.Sprintf("%s=%s", strings.ReplaceAll(k, " ", ""), strings.ReplaceAll(v, " ", "")))
} else {
switch v {
case "critical":