mirror of
https://github.com/ccfos/nightingale.git
synced 2026-03-09 09:29:05 +00:00
Compare commits
3 Commits
feat-pg-al
...
optimize-c
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f6a857f030 | ||
|
|
85786d985d | ||
|
|
cff211364a |
@@ -185,8 +185,8 @@ func (e *Dispatch) HandleEventWithNotifyRule(eventOrigin *models.AlertCurEvent)
|
||||
|
||||
for _, processor := range processors {
|
||||
logger.Infof("before processor notify_id: %d, event:%+v, processor:%+v", notifyRuleId, eventCopy, processor)
|
||||
eventCopy = processor.Process(e.ctx, eventCopy)
|
||||
logger.Infof("after processor notify_id: %d, event:%+v, processor:%+v", notifyRuleId, eventCopy, processor)
|
||||
eventCopy, res, err := processor.Process(e.ctx, eventCopy)
|
||||
logger.Infof("after processor notify_id: %d, event:%+v, processor:%+v, res:%v, err:%v", notifyRuleId, eventCopy, processor, res, err)
|
||||
if eventCopy == nil {
|
||||
logger.Warningf("notify_id: %d, event:%+v, processor:%+v, event is nil", notifyRuleId, eventCopy, processor)
|
||||
break
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package pipeline
|
||||
|
||||
import (
|
||||
_ "github.com/ccfos/nightingale/v6/alert/pipeline/processor/aisummary"
|
||||
_ "github.com/ccfos/nightingale/v6/alert/pipeline/processor/callback"
|
||||
_ "github.com/ccfos/nightingale/v6/alert/pipeline/processor/eventdrop"
|
||||
_ "github.com/ccfos/nightingale/v6/alert/pipeline/processor/eventupdate"
|
||||
|
||||
198
alert/pipeline/processor/aisummary/ai_summary.go
Normal file
198
alert/pipeline/processor/aisummary/ai_summary.go
Normal file
@@ -0,0 +1,198 @@
|
||||
package aisummary
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/tls"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
"text/template"
|
||||
"time"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/alert/pipeline/processor/callback"
|
||||
"github.com/ccfos/nightingale/v6/alert/pipeline/processor/common"
|
||||
"github.com/ccfos/nightingale/v6/models"
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
"github.com/ccfos/nightingale/v6/pkg/tplx"
|
||||
)
|
||||
|
||||
const (
|
||||
HTTP_STATUS_SUCCESS_MAX = 299
|
||||
)
|
||||
|
||||
// AISummaryConfig 配置结构体
|
||||
type AISummaryConfig struct {
|
||||
callback.HTTPConfig
|
||||
ModelName string `json:"model_name"`
|
||||
APIKey string `json:"api_key"`
|
||||
PromptTemplate string `json:"prompt_template"`
|
||||
CustomParams map[string]interface{} `json:"custom_params"`
|
||||
}
|
||||
|
||||
type Message struct {
|
||||
Role string `json:"role"`
|
||||
Content string `json:"content"`
|
||||
}
|
||||
|
||||
type ChatCompletionResponse struct {
|
||||
Choices []struct {
|
||||
Message struct {
|
||||
Content string `json:"content"`
|
||||
} `json:"message"`
|
||||
} `json:"choices"`
|
||||
}
|
||||
|
||||
func init() {
|
||||
models.RegisterProcessor("ai_summary", &AISummaryConfig{})
|
||||
}
|
||||
|
||||
func (c *AISummaryConfig) Init(settings interface{}) (models.Processor, error) {
|
||||
result, err := common.InitProcessor[*AISummaryConfig](settings)
|
||||
return result, err
|
||||
}
|
||||
|
||||
func (c *AISummaryConfig) Process(ctx *ctx.Context, event *models.AlertCurEvent) (*models.AlertCurEvent, string, error) {
|
||||
if c.Client == nil {
|
||||
if err := c.initHTTPClient(); err != nil {
|
||||
return event, "", fmt.Errorf("failed to initialize HTTP client: %v processor: %v", err, c)
|
||||
}
|
||||
}
|
||||
|
||||
// 准备告警事件信息
|
||||
eventInfo, err := c.prepareEventInfo(event)
|
||||
if err != nil {
|
||||
return event, "", fmt.Errorf("failed to prepare event info: %v processor: %v", err, c)
|
||||
}
|
||||
|
||||
// 调用AI模型生成总结
|
||||
summary, err := c.generateAISummary(eventInfo)
|
||||
if err != nil {
|
||||
return event, "", fmt.Errorf("failed to generate AI summary: %v processor: %v", err, c)
|
||||
}
|
||||
|
||||
// 将总结添加到annotations字段
|
||||
if event.AnnotationsJSON == nil {
|
||||
event.AnnotationsJSON = make(map[string]string)
|
||||
}
|
||||
event.AnnotationsJSON["ai_summary"] = summary
|
||||
|
||||
// 更新Annotations字段
|
||||
b, err := json.Marshal(event.AnnotationsJSON)
|
||||
if err != nil {
|
||||
return event, "", fmt.Errorf("failed to marshal annotations: %v processor: %v", err, c)
|
||||
}
|
||||
event.Annotations = string(b)
|
||||
|
||||
return event, "", nil
|
||||
}
|
||||
|
||||
func (c *AISummaryConfig) initHTTPClient() error {
|
||||
transport := &http.Transport{
|
||||
TLSClientConfig: &tls.Config{InsecureSkipVerify: c.SkipSSLVerify},
|
||||
}
|
||||
|
||||
if c.Proxy != "" {
|
||||
proxyURL, err := url.Parse(c.Proxy)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to parse proxy url: %v", err)
|
||||
}
|
||||
transport.Proxy = http.ProxyURL(proxyURL)
|
||||
}
|
||||
|
||||
c.Client = &http.Client{
|
||||
Timeout: time.Duration(c.Timeout) * time.Millisecond,
|
||||
Transport: transport,
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *AISummaryConfig) prepareEventInfo(event *models.AlertCurEvent) (string, error) {
|
||||
var defs = []string{
|
||||
"{{$event := .}}",
|
||||
}
|
||||
|
||||
text := strings.Join(append(defs, c.PromptTemplate), "")
|
||||
t, err := template.New("prompt").Funcs(template.FuncMap(tplx.TemplateFuncMap)).Parse(text)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to parse prompt template: %v", err)
|
||||
}
|
||||
|
||||
var body bytes.Buffer
|
||||
err = t.Execute(&body, event)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to execute prompt template: %v", err)
|
||||
}
|
||||
|
||||
return body.String(), nil
|
||||
}
|
||||
|
||||
func (c *AISummaryConfig) generateAISummary(eventInfo string) (string, error) {
|
||||
// 构建基础请求参数
|
||||
reqParams := map[string]interface{}{
|
||||
"model": c.ModelName,
|
||||
"messages": []Message{
|
||||
{
|
||||
Role: "user",
|
||||
Content: eventInfo,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
// 合并自定义参数
|
||||
for k, v := range c.CustomParams {
|
||||
reqParams[k] = v
|
||||
}
|
||||
|
||||
// 序列化请求体
|
||||
jsonData, err := json.Marshal(reqParams)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to marshal request body: %v", err)
|
||||
}
|
||||
|
||||
// 创建HTTP请求
|
||||
req, err := http.NewRequest("POST", c.URL, bytes.NewBuffer(jsonData))
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to create request: %v", err)
|
||||
}
|
||||
|
||||
// 设置请求头
|
||||
req.Header.Set("Authorization", "Bearer "+c.APIKey)
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
for k, v := range c.Headers {
|
||||
req.Header.Set(k, v)
|
||||
}
|
||||
|
||||
// 发送请求
|
||||
resp, err := c.Client.Do(req)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to send request: %v", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
// 检查响应状态码
|
||||
if resp.StatusCode > HTTP_STATUS_SUCCESS_MAX {
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
return "", fmt.Errorf("unexpected status code: %d, body: %s", resp.StatusCode, string(body))
|
||||
}
|
||||
|
||||
// 读取响应
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to read response body: %v", err)
|
||||
}
|
||||
|
||||
// 解析响应
|
||||
var chatResp ChatCompletionResponse
|
||||
if err := json.Unmarshal(body, &chatResp); err != nil {
|
||||
return "", fmt.Errorf("failed to unmarshal response: %v", err)
|
||||
}
|
||||
|
||||
if len(chatResp.Choices) == 0 {
|
||||
return "", fmt.Errorf("no response from AI model")
|
||||
}
|
||||
|
||||
return chatResp.Choices[0].Message.Content, nil
|
||||
}
|
||||
69
alert/pipeline/processor/aisummary/ai_summary_test.go
Normal file
69
alert/pipeline/processor/aisummary/ai_summary_test.go
Normal file
@@ -0,0 +1,69 @@
|
||||
package aisummary
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/alert/pipeline/processor/callback"
|
||||
"github.com/ccfos/nightingale/v6/models"
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestAISummaryConfig_Process(t *testing.T) {
|
||||
// 创建测试配置
|
||||
config := &AISummaryConfig{
|
||||
HTTPConfig: callback.HTTPConfig{
|
||||
URL: "https://generativelanguage.googleapis.com/v1beta/openai/chat/completions",
|
||||
Timeout: 30000,
|
||||
SkipSSLVerify: true,
|
||||
Headers: map[string]string{
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
},
|
||||
ModelName: "gemini-2.0-flash",
|
||||
APIKey: "*",
|
||||
PromptTemplate: "告警规则:{{$event.RuleName}}\n严重程度:{{$event.Severity}}",
|
||||
CustomParams: map[string]interface{}{
|
||||
"temperature": 0.7,
|
||||
"max_tokens": 2000,
|
||||
"top_p": 0.9,
|
||||
},
|
||||
}
|
||||
|
||||
// 创建测试事件
|
||||
event := &models.AlertCurEvent{
|
||||
RuleName: "Test Rule",
|
||||
Severity: 1,
|
||||
TagsMap: map[string]string{
|
||||
"host": "test-host",
|
||||
},
|
||||
AnnotationsJSON: map[string]string{
|
||||
"description": "Test alert",
|
||||
},
|
||||
}
|
||||
|
||||
// 测试模板处理
|
||||
eventInfo, err := config.prepareEventInfo(event)
|
||||
assert.NoError(t, err)
|
||||
assert.Contains(t, eventInfo, "Test Rule")
|
||||
assert.Contains(t, eventInfo, "1")
|
||||
|
||||
// 测试配置初始化
|
||||
processor, err := config.Init(config)
|
||||
assert.NoError(t, err)
|
||||
assert.NotNil(t, processor)
|
||||
|
||||
// 测试处理函数
|
||||
result, _, err := processor.Process(&ctx.Context{}, event)
|
||||
assert.NoError(t, err)
|
||||
assert.NotNil(t, result)
|
||||
assert.NotEmpty(t, result.AnnotationsJSON["ai_summary"])
|
||||
|
||||
// 展示处理结果
|
||||
t.Log("\n=== 处理结果 ===")
|
||||
t.Logf("告警规则: %s", result.RuleName)
|
||||
t.Logf("严重程度: %d", result.Severity)
|
||||
t.Logf("标签: %v", result.TagsMap)
|
||||
t.Logf("原始注释: %v", result.AnnotationsJSON["description"])
|
||||
t.Logf("AI总结: %s", result.AnnotationsJSON["ai_summary"])
|
||||
}
|
||||
@@ -3,6 +3,7 @@ package callback
|
||||
import (
|
||||
"crypto/tls"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
@@ -42,7 +43,7 @@ func (c *CallbackConfig) Init(settings interface{}) (models.Processor, error) {
|
||||
return result, err
|
||||
}
|
||||
|
||||
func (c *CallbackConfig) Process(ctx *ctx.Context, event *models.AlertCurEvent) *models.AlertCurEvent {
|
||||
func (c *CallbackConfig) Process(ctx *ctx.Context, event *models.AlertCurEvent) (*models.AlertCurEvent, string, error) {
|
||||
if c.Client == nil {
|
||||
transport := &http.Transport{
|
||||
TLSClientConfig: &tls.Config{InsecureSkipVerify: c.SkipSSLVerify},
|
||||
@@ -51,7 +52,7 @@ func (c *CallbackConfig) Process(ctx *ctx.Context, event *models.AlertCurEvent)
|
||||
if c.Proxy != "" {
|
||||
proxyURL, err := url.Parse(c.Proxy)
|
||||
if err != nil {
|
||||
logger.Errorf("failed to parse proxy url: %v", err)
|
||||
return event, "", fmt.Errorf("failed to parse proxy url: %v processor: %v", err, c)
|
||||
} else {
|
||||
transport.Proxy = http.ProxyURL(proxyURL)
|
||||
}
|
||||
@@ -71,14 +72,12 @@ func (c *CallbackConfig) Process(ctx *ctx.Context, event *models.AlertCurEvent)
|
||||
|
||||
body, err := json.Marshal(event)
|
||||
if err != nil {
|
||||
logger.Errorf("failed to marshal event: %v", err)
|
||||
return event
|
||||
return event, "", fmt.Errorf("failed to marshal event: %v processor: %v", err, c)
|
||||
}
|
||||
|
||||
req, err := http.NewRequest("POST", c.URL, strings.NewReader(string(body)))
|
||||
if err != nil {
|
||||
logger.Errorf("failed to create request: %v event: %v", err, event)
|
||||
return event
|
||||
return event, "", fmt.Errorf("failed to create request: %v processor: %v", err, c)
|
||||
}
|
||||
|
||||
for k, v := range headers {
|
||||
@@ -91,16 +90,14 @@ func (c *CallbackConfig) Process(ctx *ctx.Context, event *models.AlertCurEvent)
|
||||
|
||||
resp, err := c.Client.Do(req)
|
||||
if err != nil {
|
||||
logger.Errorf("failed to send request: %v event: %v", err, event)
|
||||
return event
|
||||
return event, "", fmt.Errorf("failed to send request: %v processor: %v", err, c)
|
||||
}
|
||||
|
||||
b, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
logger.Errorf("failed to read response body: %v event: %v", err, event)
|
||||
return event
|
||||
return event, "", fmt.Errorf("failed to read response body: %v processor: %v", err, c)
|
||||
}
|
||||
|
||||
logger.Infof("response body: %s", string(b))
|
||||
return event
|
||||
logger.Debugf("callback processor response body: %s", string(b))
|
||||
return event, "callback success", nil
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ package eventdrop
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"strings"
|
||||
texttemplate "text/template"
|
||||
|
||||
@@ -25,7 +26,7 @@ func (c *EventDropConfig) Init(settings interface{}) (models.Processor, error) {
|
||||
return result, err
|
||||
}
|
||||
|
||||
func (c *EventDropConfig) Process(ctx *ctx.Context, event *models.AlertCurEvent) *models.AlertCurEvent {
|
||||
func (c *EventDropConfig) Process(ctx *ctx.Context, event *models.AlertCurEvent) (*models.AlertCurEvent, string, error) {
|
||||
// 使用背景是可以根据此处理器,实现对事件进行更加灵活的过滤的逻辑
|
||||
// 在标签过滤和属性过滤都不满足需求时可以使用
|
||||
// 如果模板执行结果为 true,则删除该事件
|
||||
@@ -40,22 +41,20 @@ func (c *EventDropConfig) Process(ctx *ctx.Context, event *models.AlertCurEvent)
|
||||
|
||||
tpl, err := texttemplate.New("eventdrop").Funcs(tplx.TemplateFuncMap).Parse(text)
|
||||
if err != nil {
|
||||
logger.Errorf("processor failed to parse template: %v event: %v", err, event)
|
||||
return event
|
||||
return event, "", fmt.Errorf("processor failed to parse template: %v processor: %v", err, c)
|
||||
}
|
||||
|
||||
var body bytes.Buffer
|
||||
if err = tpl.Execute(&body, event); err != nil {
|
||||
logger.Errorf("processor failed to execute template: %v event: %v", err, event)
|
||||
return event
|
||||
return event, "", fmt.Errorf("processor failed to execute template: %v processor: %v", err, c)
|
||||
}
|
||||
|
||||
result := strings.TrimSpace(body.String())
|
||||
logger.Infof("processor eventdrop result: %v", result)
|
||||
if result == "true" {
|
||||
logger.Infof("processor eventdrop drop event: %v", event)
|
||||
return nil
|
||||
return event, "drop event success", nil
|
||||
}
|
||||
|
||||
return event
|
||||
return event, "drop event failed", nil
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ package eventupdate
|
||||
import (
|
||||
"crypto/tls"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
@@ -30,7 +31,7 @@ func (c *EventUpdateConfig) Init(settings interface{}) (models.Processor, error)
|
||||
return result, err
|
||||
}
|
||||
|
||||
func (c *EventUpdateConfig) Process(ctx *ctx.Context, event *models.AlertCurEvent) *models.AlertCurEvent {
|
||||
func (c *EventUpdateConfig) Process(ctx *ctx.Context, event *models.AlertCurEvent) (*models.AlertCurEvent, string, error) {
|
||||
if c.Client == nil {
|
||||
transport := &http.Transport{
|
||||
TLSClientConfig: &tls.Config{InsecureSkipVerify: c.SkipSSLVerify},
|
||||
@@ -39,7 +40,7 @@ func (c *EventUpdateConfig) Process(ctx *ctx.Context, event *models.AlertCurEven
|
||||
if c.Proxy != "" {
|
||||
proxyURL, err := url.Parse(c.Proxy)
|
||||
if err != nil {
|
||||
logger.Errorf("failed to parse proxy url: %v", err)
|
||||
return event, "", fmt.Errorf("failed to parse proxy url: %v processor: %v", err, c)
|
||||
} else {
|
||||
transport.Proxy = http.ProxyURL(proxyURL)
|
||||
}
|
||||
@@ -59,14 +60,12 @@ func (c *EventUpdateConfig) Process(ctx *ctx.Context, event *models.AlertCurEven
|
||||
|
||||
body, err := json.Marshal(event)
|
||||
if err != nil {
|
||||
logger.Errorf("failed to marshal event: %v", err)
|
||||
return event
|
||||
return event, "", fmt.Errorf("failed to marshal event: %v processor: %v", err, c)
|
||||
}
|
||||
|
||||
req, err := http.NewRequest("POST", c.URL, strings.NewReader(string(body)))
|
||||
if err != nil {
|
||||
logger.Errorf("failed to create request: %v event: %v", err, event)
|
||||
return event
|
||||
return event, "", fmt.Errorf("failed to create request: %v processor: %v", err, c)
|
||||
}
|
||||
|
||||
for k, v := range headers {
|
||||
@@ -79,22 +78,19 @@ func (c *EventUpdateConfig) Process(ctx *ctx.Context, event *models.AlertCurEven
|
||||
|
||||
resp, err := c.Client.Do(req)
|
||||
if err != nil {
|
||||
logger.Errorf("failed to send request: %v event: %v", err, event)
|
||||
return event
|
||||
return event, "", fmt.Errorf("failed to send request: %v processor: %v", err, c)
|
||||
}
|
||||
|
||||
b, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
logger.Errorf("failed to read response body: %v event: %v", err, event)
|
||||
return event
|
||||
return nil, "", fmt.Errorf("failed to read response body: %v processor: %v", err, c)
|
||||
}
|
||||
logger.Infof("response body: %s", string(b))
|
||||
logger.Debugf("event update processor response body: %s", string(b))
|
||||
|
||||
err = json.Unmarshal(b, &event)
|
||||
if err != nil {
|
||||
logger.Errorf("failed to unmarshal response body: %v event: %v", err, event)
|
||||
return event
|
||||
return event, "", fmt.Errorf("failed to unmarshal response body: %v processor: %v", err, c)
|
||||
}
|
||||
|
||||
return event
|
||||
return event, "", nil
|
||||
}
|
||||
|
||||
@@ -42,7 +42,7 @@ func (r *RelabelConfig) Init(settings interface{}) (models.Processor, error) {
|
||||
return result, err
|
||||
}
|
||||
|
||||
func (r *RelabelConfig) Process(ctx *ctx.Context, event *models.AlertCurEvent) *models.AlertCurEvent {
|
||||
func (r *RelabelConfig) Process(ctx *ctx.Context, event *models.AlertCurEvent) (*models.AlertCurEvent, string, error) {
|
||||
sourceLabels := make([]model.LabelName, len(r.SourceLabels))
|
||||
for i := range r.SourceLabels {
|
||||
sourceLabels[i] = model.LabelName(strings.ReplaceAll(r.SourceLabels[i], ".", REPLACE_DOT))
|
||||
@@ -64,7 +64,7 @@ func (r *RelabelConfig) Process(ctx *ctx.Context, event *models.AlertCurEvent) *
|
||||
}
|
||||
|
||||
EventRelabel(event, relabelConfigs)
|
||||
return event
|
||||
return event, "", nil
|
||||
}
|
||||
|
||||
func EventRelabel(event *models.AlertCurEvent, relabelConfigs []*pconf.RelabelConfig) {
|
||||
|
||||
@@ -37,4 +37,10 @@ var Plugins = []Plugin{
|
||||
Type: "mysql",
|
||||
TypeName: "MySQL",
|
||||
},
|
||||
{
|
||||
Id: 7,
|
||||
Category: "timeseries",
|
||||
Type: "pgsql",
|
||||
TypeName: "PostgreSQL",
|
||||
},
|
||||
}
|
||||
|
||||
@@ -8,7 +8,6 @@ import (
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/toolkits/pkg/ginx"
|
||||
"github.com/toolkits/pkg/logger"
|
||||
)
|
||||
|
||||
// 获取事件Pipeline列表
|
||||
@@ -145,9 +144,17 @@ func (rt *Router) tryRunEventPipeline(c *gin.Context) {
|
||||
if err != nil {
|
||||
ginx.Bomb(http.StatusBadRequest, "get processor: %+v err: %+v", p, err)
|
||||
}
|
||||
event = processor.Process(rt.Ctx, event)
|
||||
event, _, err = processor.Process(rt.Ctx, event)
|
||||
if err != nil {
|
||||
ginx.Bomb(http.StatusBadRequest, "processor: %+v err: %+v", p, err)
|
||||
}
|
||||
|
||||
if event == nil {
|
||||
ginx.Bomb(http.StatusBadRequest, "event is dropped")
|
||||
ginx.NewRender(c).Data(map[string]interface{}{
|
||||
"event": event,
|
||||
"result": "event is dropped",
|
||||
}, nil)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
@@ -172,13 +179,15 @@ func (rt *Router) tryRunEventProcessor(c *gin.Context) {
|
||||
if err != nil {
|
||||
ginx.Bomb(http.StatusBadRequest, "get processor err: %+v", err)
|
||||
}
|
||||
event = processor.Process(rt.Ctx, event)
|
||||
logger.Infof("processor %+v result: %+v", f.ProcessorConfig, event)
|
||||
if event == nil {
|
||||
ginx.Bomb(http.StatusBadRequest, "event is dropped")
|
||||
event, res, err := processor.Process(rt.Ctx, event)
|
||||
if err != nil {
|
||||
ginx.Bomb(http.StatusBadRequest, "processor err: %+v", err)
|
||||
}
|
||||
|
||||
ginx.NewRender(c).Data(event, nil)
|
||||
ginx.NewRender(c).Data(map[string]interface{}{
|
||||
"event": event,
|
||||
"result": res,
|
||||
}, nil)
|
||||
}
|
||||
|
||||
func (rt *Router) tryRunEventProcessorByNotifyRule(c *gin.Context) {
|
||||
@@ -212,9 +221,17 @@ func (rt *Router) tryRunEventProcessorByNotifyRule(c *gin.Context) {
|
||||
if err != nil {
|
||||
ginx.Bomb(http.StatusBadRequest, "get processor: %+v err: %+v", p, err)
|
||||
}
|
||||
event = processor.Process(rt.Ctx, event)
|
||||
|
||||
event, _, err := processor.Process(rt.Ctx, event)
|
||||
if err != nil {
|
||||
ginx.Bomb(http.StatusBadRequest, "processor: %+v err: %+v", p, err)
|
||||
}
|
||||
if event == nil {
|
||||
ginx.Bomb(http.StatusBadRequest, "event is dropped")
|
||||
ginx.NewRender(c).Data(map[string]interface{}{
|
||||
"event": event,
|
||||
"result": "event is dropped",
|
||||
}, nil)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -60,6 +60,13 @@ func init() {
|
||||
PluginType: "mysql",
|
||||
PluginTypeName: "MySQL",
|
||||
}
|
||||
|
||||
DatasourceTypes[6] = DatasourceType{
|
||||
Id: 6,
|
||||
Category: "timeseries",
|
||||
PluginType: "pgsql",
|
||||
PluginTypeName: "PostgreSQL",
|
||||
}
|
||||
}
|
||||
|
||||
type NewDatasrouceFn func(settings map[string]interface{}) (Datasource, error)
|
||||
|
||||
@@ -94,6 +94,26 @@ func (m *MySQL) Equal(p datasource.Datasource) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
if oldShard.MaxQueryRows != newShard.MaxQueryRows {
|
||||
return false
|
||||
}
|
||||
|
||||
if oldShard.Timeout != newShard.Timeout {
|
||||
return false
|
||||
}
|
||||
|
||||
if oldShard.MaxIdleConns != newShard.MaxIdleConns {
|
||||
return false
|
||||
}
|
||||
|
||||
if oldShard.MaxOpenConns != newShard.MaxOpenConns {
|
||||
return false
|
||||
}
|
||||
|
||||
if oldShard.ConnMaxLifetime != newShard.ConnMaxLifetime {
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
|
||||
346
datasource/postgresql/postgresql.go
Normal file
346
datasource/postgresql/postgresql.go
Normal file
@@ -0,0 +1,346 @@
|
||||
package postgresql
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"regexp"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/datasource"
|
||||
"github.com/ccfos/nightingale/v6/pkg/macros"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/dskit/postgres"
|
||||
"github.com/ccfos/nightingale/v6/dskit/sqlbase"
|
||||
"github.com/ccfos/nightingale/v6/dskit/types"
|
||||
"github.com/ccfos/nightingale/v6/models"
|
||||
"github.com/mitchellh/mapstructure"
|
||||
"github.com/toolkits/pkg/logger"
|
||||
)
|
||||
|
||||
const (
|
||||
PostgreSQLType = "pgsql"
|
||||
)
|
||||
|
||||
var (
|
||||
regx = "(?i)from\\s+([a-zA-Z0-9_]+)\\.([a-zA-Z0-9_]+)\\.([a-zA-Z0-9_]+)"
|
||||
)
|
||||
|
||||
func init() {
|
||||
datasource.RegisterDatasource(PostgreSQLType, new(PostgreSQL))
|
||||
}
|
||||
|
||||
type PostgreSQL struct {
|
||||
Shards []*postgres.PostgreSQL `json:"pgsql.shards" mapstructure:"pgsql.shards"`
|
||||
}
|
||||
|
||||
type QueryParam struct {
|
||||
Ref string `json:"ref" mapstructure:"ref"`
|
||||
Database string `json:"database" mapstructure:"database"`
|
||||
Table string `json:"table" mapstructure:"table"`
|
||||
SQL string `json:"sql" mapstructure:"sql"`
|
||||
Keys datasource.Keys `json:"keys" mapstructure:"keys"`
|
||||
From int64 `json:"from" mapstructure:"from"`
|
||||
To int64 `json:"to" mapstructure:"to"`
|
||||
}
|
||||
|
||||
func (p *PostgreSQL) InitClient() error {
|
||||
if len(p.Shards) == 0 {
|
||||
return fmt.Errorf("not found postgresql addr, please check datasource config")
|
||||
}
|
||||
for _, shard := range p.Shards {
|
||||
if db, err := shard.NewConn(context.TODO(), "postgres"); err != nil {
|
||||
defer sqlbase.CloseDB(db)
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *PostgreSQL) Init(settings map[string]interface{}) (datasource.Datasource, error) {
|
||||
newest := new(PostgreSQL)
|
||||
err := mapstructure.Decode(settings, newest)
|
||||
return newest, err
|
||||
}
|
||||
|
||||
func (p *PostgreSQL) Validate(ctx context.Context) error {
|
||||
if len(p.Shards) == 0 || len(strings.TrimSpace(p.Shards[0].Addr)) == 0 {
|
||||
return fmt.Errorf("postgresql addr is invalid, please check datasource setting")
|
||||
}
|
||||
|
||||
if len(strings.TrimSpace(p.Shards[0].User)) == 0 {
|
||||
return fmt.Errorf("postgresql user is invalid, please check datasource setting")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Equal compares whether two objects are the same, used for caching
|
||||
func (p *PostgreSQL) Equal(d datasource.Datasource) bool {
|
||||
newest, ok := d.(*PostgreSQL)
|
||||
if !ok {
|
||||
logger.Errorf("unexpected plugin type, expected is postgresql")
|
||||
return false
|
||||
}
|
||||
|
||||
if len(p.Shards) == 0 || len(newest.Shards) == 0 {
|
||||
return false
|
||||
}
|
||||
|
||||
oldShard := p.Shards[0]
|
||||
newShard := newest.Shards[0]
|
||||
|
||||
if oldShard.Addr != newShard.Addr {
|
||||
return false
|
||||
}
|
||||
|
||||
if oldShard.User != newShard.User {
|
||||
return false
|
||||
}
|
||||
|
||||
if oldShard.Password != newShard.Password {
|
||||
return false
|
||||
}
|
||||
|
||||
if oldShard.MaxQueryRows != newShard.MaxQueryRows {
|
||||
return false
|
||||
}
|
||||
|
||||
if oldShard.Timeout != newShard.Timeout {
|
||||
return false
|
||||
}
|
||||
|
||||
if oldShard.MaxIdleConns != newShard.MaxIdleConns {
|
||||
return false
|
||||
}
|
||||
|
||||
if oldShard.MaxOpenConns != newShard.MaxOpenConns {
|
||||
return false
|
||||
}
|
||||
|
||||
if oldShard.ConnMaxLifetime != newShard.ConnMaxLifetime {
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func (p *PostgreSQL) ShowDatabases(ctx context.Context) ([]string, error) {
|
||||
return p.Shards[0].ShowDatabases(ctx, "")
|
||||
}
|
||||
|
||||
func (p *PostgreSQL) ShowTables(ctx context.Context, database string) ([]string, error) {
|
||||
p.Shards[0].DB = database
|
||||
rets, err := p.Shards[0].ShowTables(ctx, "")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
tables := make([]string, 0, len(rets))
|
||||
for scheme, tabs := range rets {
|
||||
for _, tab := range tabs {
|
||||
tables = append(tables, scheme+"."+tab)
|
||||
}
|
||||
}
|
||||
return tables, nil
|
||||
}
|
||||
|
||||
func (p *PostgreSQL) MakeLogQuery(ctx context.Context, query interface{}, eventTags []string, start, end int64) (interface{}, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (p *PostgreSQL) MakeTSQuery(ctx context.Context, query interface{}, eventTags []string, start, end int64) (interface{}, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (p *PostgreSQL) QueryMapData(ctx context.Context, query interface{}) ([]map[string]string, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (p *PostgreSQL) QueryData(ctx context.Context, query interface{}) ([]models.DataResp, error) {
|
||||
postgresqlQueryParam := new(QueryParam)
|
||||
if err := mapstructure.Decode(query, postgresqlQueryParam); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if strings.Contains(postgresqlQueryParam.SQL, "$__") {
|
||||
var err error
|
||||
postgresqlQueryParam.SQL, err = macros.Macro(postgresqlQueryParam.SQL, postgresqlQueryParam.From, postgresqlQueryParam.To)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
if postgresqlQueryParam.Database != "" {
|
||||
p.Shards[0].DB = postgresqlQueryParam.Database
|
||||
} else {
|
||||
db, err := parseDBName(postgresqlQueryParam.SQL)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
p.Shards[0].DB = db
|
||||
}
|
||||
|
||||
timeout := p.Shards[0].Timeout
|
||||
if timeout == 0 {
|
||||
timeout = 60
|
||||
}
|
||||
timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(timeout)*time.Second)
|
||||
defer cancel()
|
||||
|
||||
items, err := p.Shards[0].QueryTimeseries(timeoutCtx, &sqlbase.QueryParam{
|
||||
Sql: postgresqlQueryParam.SQL,
|
||||
Keys: types.Keys{
|
||||
ValueKey: postgresqlQueryParam.Keys.ValueKey,
|
||||
LabelKey: postgresqlQueryParam.Keys.LabelKey,
|
||||
TimeKey: postgresqlQueryParam.Keys.TimeKey,
|
||||
},
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
logger.Warningf("query:%+v get data err:%v", postgresqlQueryParam, err)
|
||||
return []models.DataResp{}, err
|
||||
}
|
||||
data := make([]models.DataResp, 0)
|
||||
for i := range items {
|
||||
data = append(data, models.DataResp{
|
||||
Ref: postgresqlQueryParam.Ref,
|
||||
Metric: items[i].Metric,
|
||||
Values: items[i].Values,
|
||||
})
|
||||
}
|
||||
|
||||
// parse resp to time series data
|
||||
logger.Infof("req:%+v keys:%+v \n data:%v", postgresqlQueryParam, postgresqlQueryParam.Keys, data)
|
||||
|
||||
return data, nil
|
||||
}
|
||||
|
||||
func (p *PostgreSQL) QueryLog(ctx context.Context, query interface{}) ([]interface{}, int64, error) {
|
||||
postgresqlQueryParam := new(QueryParam)
|
||||
if err := mapstructure.Decode(query, postgresqlQueryParam); err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
if postgresqlQueryParam.Database != "" {
|
||||
p.Shards[0].DB = postgresqlQueryParam.Database
|
||||
} else {
|
||||
db, err := parseDBName(postgresqlQueryParam.SQL)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
p.Shards[0].DB = db
|
||||
}
|
||||
|
||||
if strings.Contains(postgresqlQueryParam.SQL, "$__") {
|
||||
var err error
|
||||
postgresqlQueryParam.SQL, err = macros.Macro(postgresqlQueryParam.SQL, postgresqlQueryParam.From, postgresqlQueryParam.To)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
}
|
||||
|
||||
timeout := p.Shards[0].Timeout
|
||||
if timeout == 0 {
|
||||
timeout = 60
|
||||
}
|
||||
timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(timeout)*time.Second)
|
||||
defer cancel()
|
||||
items, err := p.Shards[0].Query(timeoutCtx, &sqlbase.QueryParam{
|
||||
Sql: postgresqlQueryParam.SQL,
|
||||
})
|
||||
if err != nil {
|
||||
logger.Warningf("query:%+v get data err:%v", postgresqlQueryParam, err)
|
||||
return []interface{}{}, 0, err
|
||||
}
|
||||
logs := make([]interface{}, 0)
|
||||
for i := range items {
|
||||
logs = append(logs, items[i])
|
||||
}
|
||||
|
||||
return logs, 0, nil
|
||||
}
|
||||
|
||||
func (p *PostgreSQL) DescribeTable(ctx context.Context, query interface{}) ([]*types.ColumnProperty, error) {
|
||||
postgresqlQueryParam := new(QueryParam)
|
||||
if err := mapstructure.Decode(query, postgresqlQueryParam); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
p.Shards[0].DB = postgresqlQueryParam.Database
|
||||
pairs := strings.Split(postgresqlQueryParam.Table, ".") // format: scheme.table_name
|
||||
scheme := ""
|
||||
table := postgresqlQueryParam.Table
|
||||
if len(pairs) == 2 {
|
||||
scheme = pairs[0]
|
||||
table = pairs[1]
|
||||
}
|
||||
return p.Shards[0].DescTable(ctx, scheme, table)
|
||||
}
|
||||
|
||||
func parseDBName(sql string) (db string, err error) {
|
||||
re := regexp.MustCompile(regx)
|
||||
matches := re.FindStringSubmatch(sql)
|
||||
if len(matches) != 4 {
|
||||
return "", fmt.Errorf("no valid table name in format database.schema.table found")
|
||||
}
|
||||
return matches[1], nil
|
||||
}
|
||||
|
||||
func extractColumns(sql string) ([]string, error) {
|
||||
// 将 SQL 转换为小写以简化匹配
|
||||
sql = strings.ToLower(sql)
|
||||
|
||||
// 匹配 SELECT 和 FROM 之间的内容
|
||||
re := regexp.MustCompile(`select\s+(.*?)\s+from`)
|
||||
matches := re.FindStringSubmatch(sql)
|
||||
|
||||
if len(matches) < 2 {
|
||||
return nil, fmt.Errorf("no columns found or invalid SQL syntax")
|
||||
}
|
||||
|
||||
// 提取列部分
|
||||
columnsString := matches[1]
|
||||
|
||||
// 分割列
|
||||
columns := splitColumns(columnsString)
|
||||
|
||||
// 清理每个列名
|
||||
for i, col := range columns {
|
||||
columns[i] = strings.TrimSpace(col)
|
||||
}
|
||||
|
||||
return columns, nil
|
||||
}
|
||||
|
||||
func splitColumns(columnsString string) []string {
|
||||
var columns []string
|
||||
var currentColumn strings.Builder
|
||||
parenthesesCount := 0
|
||||
inQuotes := false
|
||||
|
||||
for _, char := range columnsString {
|
||||
switch char {
|
||||
case '(':
|
||||
parenthesesCount++
|
||||
currentColumn.WriteRune(char)
|
||||
case ')':
|
||||
parenthesesCount--
|
||||
currentColumn.WriteRune(char)
|
||||
case '\'', '"':
|
||||
inQuotes = !inQuotes
|
||||
currentColumn.WriteRune(char)
|
||||
case ',':
|
||||
if parenthesesCount == 0 && !inQuotes {
|
||||
columns = append(columns, currentColumn.String())
|
||||
currentColumn.Reset()
|
||||
} else {
|
||||
currentColumn.WriteRune(char)
|
||||
}
|
||||
default:
|
||||
currentColumn.WriteRune(char)
|
||||
}
|
||||
}
|
||||
|
||||
if currentColumn.Len() > 0 {
|
||||
columns = append(columns, currentColumn.String())
|
||||
}
|
||||
|
||||
return columns
|
||||
}
|
||||
@@ -10,6 +10,7 @@ import (
|
||||
_ "github.com/ccfos/nightingale/v6/datasource/ck"
|
||||
"github.com/ccfos/nightingale/v6/datasource/es"
|
||||
_ "github.com/ccfos/nightingale/v6/datasource/mysql"
|
||||
_ "github.com/ccfos/nightingale/v6/datasource/postgresql"
|
||||
"github.com/ccfos/nightingale/v6/dskit/tdengine"
|
||||
"github.com/ccfos/nightingale/v6/models"
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
|
||||
207
dskit/postgres/postgres.go
Normal file
207
dskit/postgres/postgres.go
Normal file
@@ -0,0 +1,207 @@
|
||||
// @Author: Ciusyan 5/20/24
|
||||
|
||||
package postgres
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/dskit/pool"
|
||||
"github.com/ccfos/nightingale/v6/dskit/sqlbase"
|
||||
"github.com/ccfos/nightingale/v6/dskit/types"
|
||||
|
||||
_ "github.com/lib/pq" // PostgreSQL driver
|
||||
"github.com/mitchellh/mapstructure"
|
||||
"gorm.io/driver/postgres"
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
type PostgreSQL struct {
|
||||
Shard `json:",inline" mapstructure:",squash"`
|
||||
}
|
||||
|
||||
type Shard struct {
|
||||
Addr string `json:"pgsql.addr" mapstructure:"pgsql.addr"`
|
||||
DB string `json:"pgsql.db" mapstructure:"pgsql.db"`
|
||||
User string `json:"pgsql.user" mapstructure:"pgsql.user"`
|
||||
Password string `json:"pgsql.password" mapstructure:"pgsql.password" `
|
||||
Timeout int `json:"pgsql.timeout" mapstructure:"pgsql.timeout"`
|
||||
MaxIdleConns int `json:"pgsql.max_idle_conns" mapstructure:"pgsql.max_idle_conns"`
|
||||
MaxOpenConns int `json:"pgsql.max_open_conns" mapstructure:"pgsql.max_open_conns"`
|
||||
ConnMaxLifetime int `json:"pgsql.conn_max_lifetime" mapstructure:"pgsql.conn_max_lifetime"`
|
||||
MaxQueryRows int `json:"pgsql.max_query_rows" mapstructure:"pgsql.max_query_rows"`
|
||||
}
|
||||
|
||||
// NewPostgreSQLWithSettings initializes a new PostgreSQL instance with the given settings
|
||||
func NewPostgreSQLWithSettings(ctx context.Context, settings interface{}) (*PostgreSQL, error) {
|
||||
newest := new(PostgreSQL)
|
||||
settingsMap := map[string]interface{}{}
|
||||
|
||||
switch s := settings.(type) {
|
||||
case string:
|
||||
if err := json.Unmarshal([]byte(s), &settingsMap); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
case map[string]interface{}:
|
||||
settingsMap = s
|
||||
case *PostgreSQL:
|
||||
return s, nil
|
||||
case PostgreSQL:
|
||||
return &s, nil
|
||||
case Shard:
|
||||
newest.Shard = s
|
||||
return newest, nil
|
||||
case *Shard:
|
||||
newest.Shard = *s
|
||||
return newest, nil
|
||||
default:
|
||||
return nil, errors.New("unsupported settings type")
|
||||
}
|
||||
|
||||
if err := mapstructure.Decode(settingsMap, newest); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return newest, nil
|
||||
}
|
||||
|
||||
// NewConn establishes a new connection to PostgreSQL
|
||||
func (p *PostgreSQL) NewConn(ctx context.Context, database string) (*gorm.DB, error) {
|
||||
if len(p.DB) == 0 && len(database) == 0 {
|
||||
return nil, errors.New("empty pgsql database") // 兼容阿里实时数仓Holgres, 连接时必须指定db名字
|
||||
}
|
||||
|
||||
if p.Shard.Timeout == 0 {
|
||||
p.Shard.Timeout = 60
|
||||
}
|
||||
|
||||
if p.Shard.MaxIdleConns == 0 {
|
||||
p.Shard.MaxIdleConns = 10
|
||||
}
|
||||
|
||||
if p.Shard.MaxOpenConns == 0 {
|
||||
p.Shard.MaxOpenConns = 100
|
||||
}
|
||||
|
||||
if p.Shard.ConnMaxLifetime == 0 {
|
||||
p.Shard.ConnMaxLifetime = 14400
|
||||
}
|
||||
|
||||
if len(p.Shard.Addr) == 0 {
|
||||
return nil, errors.New("empty fe-node addr")
|
||||
}
|
||||
var keys []string
|
||||
var err error
|
||||
keys = append(keys, p.Shard.Addr)
|
||||
|
||||
keys = append(keys, p.Shard.Password, p.Shard.User)
|
||||
if len(database) > 0 {
|
||||
keys = append(keys, database)
|
||||
}
|
||||
cachedKey := strings.Join(keys, ":")
|
||||
// cache conn with database
|
||||
conn, ok := pool.PoolClient.Load(cachedKey)
|
||||
if ok {
|
||||
return conn.(*gorm.DB), nil
|
||||
}
|
||||
|
||||
var db *gorm.DB
|
||||
defer func() {
|
||||
if db != nil && err == nil {
|
||||
pool.PoolClient.Store(cachedKey, db)
|
||||
}
|
||||
}()
|
||||
|
||||
// Simplified connection logic for PostgreSQL
|
||||
dsn := fmt.Sprintf("postgres://%s:%s@%s/%s?sslmode=disable&TimeZone=Asia/Shanghai", p.Shard.User, p.Shard.Password, p.Shard.Addr, database)
|
||||
db, err = sqlbase.NewDB(
|
||||
ctx,
|
||||
postgres.Open(dsn),
|
||||
p.Shard.MaxIdleConns,
|
||||
p.Shard.MaxOpenConns,
|
||||
time.Duration(p.Shard.ConnMaxLifetime)*time.Second,
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
if db != nil {
|
||||
sqlDB, _ := db.DB()
|
||||
if sqlDB != nil {
|
||||
sqlDB.Close()
|
||||
}
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return db, nil
|
||||
}
|
||||
|
||||
// ShowDatabases lists all databases in PostgreSQL
|
||||
func (p *PostgreSQL) ShowDatabases(ctx context.Context, searchKeyword string) ([]string, error) {
|
||||
db, err := p.NewConn(ctx, "postgres")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
sql := fmt.Sprintf("SELECT datname FROM pg_database WHERE datistemplate = false AND datname LIKE %s",
|
||||
"'%"+searchKeyword+"%'")
|
||||
return sqlbase.ShowDatabases(ctx, db, sql)
|
||||
}
|
||||
|
||||
// ShowTables lists all tables in a given database
|
||||
func (p *PostgreSQL) ShowTables(ctx context.Context, searchKeyword string) (map[string][]string, error) {
|
||||
db, err := p.NewConn(ctx, p.DB)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
sql := fmt.Sprintf("SELECT schemaname, tablename FROM pg_tables WHERE schemaname !='information_schema' and schemaname !='pg_catalog' and tablename LIKE %s",
|
||||
"'%"+searchKeyword+"%'")
|
||||
rets, err := sqlbase.ExecQuery(ctx, db, sql)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
tabs := make(map[string][]string, 3)
|
||||
for _, row := range rets {
|
||||
if val, ok := row["schemaname"].(string); ok {
|
||||
tabs[val] = append(tabs[val], row["tablename"].(string))
|
||||
}
|
||||
}
|
||||
return tabs, nil
|
||||
}
|
||||
|
||||
// DescTable describes the schema of a specified table in PostgreSQL
|
||||
// scheme default: public if not specified
|
||||
func (p *PostgreSQL) DescTable(ctx context.Context, scheme, table string) ([]*types.ColumnProperty, error) {
|
||||
db, err := p.NewConn(ctx, p.DB)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if scheme == "" {
|
||||
scheme = "public"
|
||||
}
|
||||
|
||||
query := fmt.Sprintf("SELECT column_name, data_type, is_nullable, column_default FROM information_schema.columns WHERE table_name = '%s' AND table_schema = '%s'", table, scheme)
|
||||
return sqlbase.DescTable(ctx, db, query)
|
||||
}
|
||||
|
||||
// SelectRows selects rows from a specified table in PostgreSQL based on a given query
|
||||
func (p *PostgreSQL) SelectRows(ctx context.Context, table, where string) ([]map[string]interface{}, error) {
|
||||
db, err := p.NewConn(ctx, p.DB)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return sqlbase.SelectRows(ctx, db, table, where)
|
||||
}
|
||||
|
||||
// ExecQuery executes a SQL query in PostgreSQL
|
||||
func (p *PostgreSQL) ExecQuery(ctx context.Context, sql string) ([]map[string]interface{}, error) {
|
||||
db, err := p.NewConn(ctx, p.DB)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return sqlbase.ExecQuery(ctx, db, sql)
|
||||
}
|
||||
73
dskit/postgres/timeseries.go
Normal file
73
dskit/postgres/timeseries.go
Normal file
@@ -0,0 +1,73 @@
|
||||
package postgres
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/dskit/sqlbase"
|
||||
"github.com/ccfos/nightingale/v6/dskit/types"
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
// Query executes a given SQL query in PostgreSQL and returns the results
|
||||
func (p *PostgreSQL) Query(ctx context.Context, query *sqlbase.QueryParam) ([]map[string]interface{}, error) {
|
||||
db, err := p.NewConn(ctx, p.Shard.DB)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = p.CheckMaxQueryRows(db, ctx, query)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return sqlbase.Query(ctx, db, query)
|
||||
}
|
||||
|
||||
// QueryTimeseries executes a time series data query using the given parameters
|
||||
func (p *PostgreSQL) QueryTimeseries(ctx context.Context, query *sqlbase.QueryParam) ([]types.MetricValues, error) {
|
||||
db, err := p.NewConn(ctx, p.Shard.DB)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = p.CheckMaxQueryRows(db, ctx, query)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return sqlbase.QueryTimeseries(ctx, db, query, true)
|
||||
}
|
||||
|
||||
func (p *PostgreSQL) CheckMaxQueryRows(db *gorm.DB, ctx context.Context, query *sqlbase.QueryParam) error {
|
||||
sql := strings.ReplaceAll(query.Sql, ";", "")
|
||||
checkQuery := &sqlbase.QueryParam{
|
||||
Sql: fmt.Sprintf("SELECT COUNT(*) as count FROM (%s) AS subquery;", sql),
|
||||
}
|
||||
|
||||
res, err := sqlbase.Query(ctx, db, checkQuery)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(res) > 0 {
|
||||
if count, exists := res[0]["count"]; exists {
|
||||
v, err := sqlbase.ParseFloat64Value(count)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
maxQueryRows := p.Shard.MaxQueryRows
|
||||
if maxQueryRows == 0 {
|
||||
maxQueryRows = 500
|
||||
}
|
||||
|
||||
if v > float64(maxQueryRows) {
|
||||
return fmt.Errorf("query result rows count %d exceeds the maximum limit %d", int(v), maxQueryRows)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -9,9 +9,9 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/dskit/types"
|
||||
|
||||
"gorm.io/gorm"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/dskit/types"
|
||||
)
|
||||
|
||||
// NewDB creates a new Gorm DB instance based on the provided gorm.Dialector and configures the connection pool
|
||||
@@ -19,7 +19,7 @@ func NewDB(ctx context.Context, dialector gorm.Dialector, maxIdleConns, maxOpenC
|
||||
// Create a new Gorm DB instance
|
||||
db, err := gorm.Open(dialector, &gorm.Config{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return db, err
|
||||
}
|
||||
|
||||
// Configure the connection pool
|
||||
@@ -35,6 +35,17 @@ func NewDB(ctx context.Context, dialector gorm.Dialector, maxIdleConns, maxOpenC
|
||||
return db.WithContext(ctx), sqlDB.Ping()
|
||||
}
|
||||
|
||||
func CloseDB(db *gorm.DB) error {
|
||||
if db != nil {
|
||||
sqlDb, err := db.DB()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return sqlDb.Close()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ShowTables retrieves a list of all tables in the specified database
|
||||
func ShowTables(ctx context.Context, db *gorm.DB, query string) ([]string, error) {
|
||||
var tables []string
|
||||
@@ -112,7 +123,7 @@ func DescTable(ctx context.Context, db *gorm.DB, query string) ([]*types.ColumnP
|
||||
}
|
||||
|
||||
// Convert the database-specific type to internal type
|
||||
type2, indexable := convertDBType(db.Dialector.Name(), typ)
|
||||
type2, indexable := ConvertDBType(db.Dialector.Name(), typ)
|
||||
columns = append(columns, &types.ColumnProperty{
|
||||
Field: field,
|
||||
Type: typ,
|
||||
@@ -175,7 +186,7 @@ func SelectRows(ctx context.Context, db *gorm.DB, table, query string) ([]map[st
|
||||
}
|
||||
|
||||
// convertDBType converts MySQL or PostgreSQL data types to custom internal types and determines if they are indexable
|
||||
func convertDBType(dialect, dbType string) (string, bool) {
|
||||
func ConvertDBType(dialect, dbType string) (string, bool) {
|
||||
typ := strings.ToLower(dbType)
|
||||
|
||||
// Common type conversions
|
||||
@@ -190,7 +201,7 @@ func convertDBType(dialect, dbType string) (string, bool) {
|
||||
strings.HasPrefix(typ, "char"), strings.HasPrefix(typ, "tinytext"),
|
||||
strings.HasPrefix(typ, "mediumtext"), strings.HasPrefix(typ, "longtext"),
|
||||
strings.HasPrefix(typ, "character varying"), strings.HasPrefix(typ, "nvarchar"),
|
||||
strings.HasPrefix(typ, "nchar"):
|
||||
strings.HasPrefix(typ, "nchar"), strings.HasPrefix(typ, "bpchar"):
|
||||
return types.LogExtractValueTypeText, true
|
||||
|
||||
case strings.HasPrefix(typ, "float"), strings.HasPrefix(typ, "double"),
|
||||
@@ -203,7 +214,7 @@ func convertDBType(dialect, dbType string) (string, bool) {
|
||||
strings.HasPrefix(typ, "time"), strings.HasPrefix(typ, "smalldatetime"):
|
||||
return types.LogExtractValueTypeDate, false
|
||||
|
||||
case strings.HasPrefix(typ, "boolean"), strings.HasPrefix(typ, "bit"):
|
||||
case strings.HasPrefix(typ, "boolean"), strings.HasPrefix(typ, "bit"), strings.HasPrefix(typ, "bool"):
|
||||
return types.LogExtractValueTypeBool, false
|
||||
}
|
||||
|
||||
|
||||
1
go.mod
1
go.mod
@@ -27,6 +27,7 @@ require (
|
||||
github.com/jinzhu/copier v0.4.0
|
||||
github.com/json-iterator/go v1.1.12
|
||||
github.com/koding/multiconfig v0.0.0-20171124222453-69c27309b2d7
|
||||
github.com/lib/pq v1.0.0
|
||||
github.com/mailru/easyjson v0.7.7
|
||||
github.com/mattn/go-isatty v0.0.19
|
||||
github.com/mitchellh/mapstructure v1.5.0
|
||||
|
||||
2
go.sum
2
go.sum
@@ -220,6 +220,8 @@ github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+
|
||||
github.com/leodido/go-urn v1.2.1/go.mod h1:zt4jvISO2HfUBqxjfIshjdMTYS56ZS/qv49ictyFfxY=
|
||||
github.com/leodido/go-urn v1.2.4 h1:XlAE/cm/ms7TE/VMVoduSpNBoyc2dOxHs5MZSwAN63Q=
|
||||
github.com/leodido/go-urn v1.2.4/go.mod h1:7ZrI8mTSeBSHl/UaRyKQW1qZeMgak41ANeCNaVckg+4=
|
||||
github.com/lib/pq v1.0.0 h1:X5PMW56eZitiTeO7tKzZxFCSpbFZJtkMMooicw2us9A=
|
||||
github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
|
||||
github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0=
|
||||
github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
|
||||
github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94=
|
||||
|
||||
@@ -29,6 +29,7 @@ const (
|
||||
TDENGINE = "tdengine"
|
||||
ELASTICSEARCH = "elasticsearch"
|
||||
MYSQL = "mysql"
|
||||
POSTGRESQL = "pgsql"
|
||||
|
||||
CLICKHOUSE = "ck"
|
||||
)
|
||||
@@ -1197,7 +1198,8 @@ func (ar *AlertRule) IsInnerRule() bool {
|
||||
ar.Cate == CLICKHOUSE ||
|
||||
ar.Cate == ELASTICSEARCH ||
|
||||
ar.Prod == LOKI || ar.Cate == LOKI ||
|
||||
ar.Cate == MYSQL
|
||||
ar.Cate == MYSQL ||
|
||||
ar.Cate == POSTGRESQL
|
||||
}
|
||||
|
||||
func (ar *AlertRule) GetRuleType() string {
|
||||
|
||||
@@ -8,8 +8,12 @@ import (
|
||||
)
|
||||
|
||||
type Processor interface {
|
||||
Init(settings interface{}) (Processor, error) // 初始化配置
|
||||
Process(ctx *ctx.Context, event *AlertCurEvent) *AlertCurEvent // 处理告警事件
|
||||
Init(settings interface{}) (Processor, error) // 初始化配置
|
||||
Process(ctx *ctx.Context, event *AlertCurEvent) (*AlertCurEvent, string, error)
|
||||
// 处理器有三种情况:
|
||||
// 1. 处理成功,返回处理后的事件
|
||||
// 2. 处理成功,不需要返回处理后端事件,只返回处理结果,将处理结果放到 string 中,比如 eventdrop callback 处理器
|
||||
// 3. 处理失败,返回错误,将错误放到 error 中
|
||||
}
|
||||
|
||||
type NewProcessorFn func(settings interface{}) (Processor, error)
|
||||
|
||||
Reference in New Issue
Block a user