Compare commits

...

1 Commits

Author SHA1 Message Date
ning
47ce119d21 es sql alert 2025-12-11 16:52:57 +08:00
5 changed files with 216 additions and 23 deletions

View File

@@ -1,6 +1,7 @@
package eslike
import (
"bytes"
"context"
"encoding/json"
"fmt"
@@ -10,13 +11,16 @@ import (
"github.com/araddon/dateparse"
"github.com/bitly/go-simplejson"
"github.com/ccfos/nightingale/v6/dskit/sqlbase"
"github.com/ccfos/nightingale/v6/dskit/types"
"github.com/ccfos/nightingale/v6/memsto"
"github.com/ccfos/nightingale/v6/models"
"github.com/elastic/go-elasticsearch/v9"
"github.com/elastic/go-elasticsearch/v9/esapi"
"github.com/mitchellh/mapstructure"
"github.com/olivere/elastic/v7"
"github.com/prometheus/common/model"
"github.com/toolkits/pkg/logger"
"github.com/ccfos/nightingale/v6/memsto"
"github.com/ccfos/nightingale/v6/models"
)
type FixedField string
@@ -47,6 +51,12 @@ type Query struct {
MaxShard int `json:"max_shard" mapstructure:"max_shard"`
SearchAfter *SearchAfter `json:"search_after" mapstructure:"search_after"`
QueryType string `json:"query_type" mapstructure:"query_type"`
Query string `json:"query" mapstructure:"query"`
CustomParams map[string]interface{} `json:"custom_params" mapstructure:"custom_params"`
MaxQueryRows int `json:"max_query_rows" mapstructure:"max_query_rows"`
Keys types.Keys `json:"keys" mapstructure:"keys"`
}
type SortField struct {
@@ -714,3 +724,121 @@ func QueryLog(ctx context.Context, queryParam interface{}, timeout int64, versio
return ret, total, nil
}
// execSQLQuery executes ES SQL query and returns rows as []map[string]interface{}
func execSQLQuery(ctx context.Context, param *Query, client *elasticsearch.Client) ([]map[string]interface{}, error) {
query := map[string]interface{}{
"query": param.Query,
}
for k, v := range param.CustomParams {
query[k] = v
}
if param.Timeout > 0 {
query["request_timeout"] = fmt.Sprintf("%ds", param.Timeout)
}
if param.MaxQueryRows > 0 {
query["fetch_size"] = param.MaxQueryRows
}
queryBytes, err := json.Marshal(query)
if err != nil {
return nil, fmt.Errorf("failed to marshal SQL query: %v", err)
}
req := esapi.SQLQueryRequest{
Body: bytes.NewReader(queryBytes),
}
res, err := req.Do(ctx, client)
if err != nil {
return nil, fmt.Errorf("failed to execute SQL query: %v", err)
}
defer res.Body.Close()
var result map[string]interface{}
if err := json.NewDecoder(res.Body).Decode(&result); err != nil {
return nil, fmt.Errorf("failed to parse SQL response: %v", err)
}
if res.IsError() {
if errObj, ok := result["error"].(map[string]interface{}); ok {
return nil, fmt.Errorf("SQL query error: %s", errObj["reason"])
}
return nil, fmt.Errorf("SQL query error: unknown")
}
columns, _ := result["columns"].([]interface{})
rows, _ := result["rows"].([]interface{})
var rowMaps []map[string]interface{}
for _, row := range rows {
rowData, _ := row.([]interface{})
rowMap := make(map[string]interface{})
for i, col := range columns {
if colObj, ok := col.(map[string]interface{}); ok {
colName, _ := colObj["name"].(string)
if i < len(rowData) {
rowMap[colName] = rowData[i]
}
}
}
rowMaps = append(rowMaps, rowMap)
}
return rowMaps, nil
}
func QuerySQLData(ctx context.Context, queryParam interface{}, cliTimeout int64, version string, client *elasticsearch.Client) ([]models.DataResp, error) {
param := new(Query)
if err := mapstructure.Decode(queryParam, param); err != nil {
return nil, err
}
if param.Timeout == 0 {
param.Timeout = int(cliTimeout) / 1000
}
rowMaps, err := execSQLQuery(ctx, param, client)
if err != nil {
return nil, err
}
metricValues := sqlbase.FormatMetricValues(param.Keys, rowMaps)
var dataResps []models.DataResp
for _, mv := range metricValues {
dataResps = append(dataResps, models.DataResp{
Ref: param.Ref,
Metric: mv.Metric,
Values: mv.Values,
})
}
return dataResps, nil
}
func QuerySQLLog(ctx context.Context, queryParam interface{}, timeout int64, version string, client *elasticsearch.Client) ([]interface{}, int64, error) {
param := new(Query)
if err := mapstructure.Decode(queryParam, param); err != nil {
return nil, 0, err
}
if param.Timeout == 0 {
param.Timeout = int(timeout) / 1000
}
rowMaps, err := execSQLQuery(ctx, param, client)
if err != nil {
return nil, 0, err
}
ret := make([]interface{}, len(rowMaps))
for i, rowMap := range rowMaps {
ret[i] = rowMap
}
return ret, 0, nil
}

View File

@@ -17,6 +17,7 @@ import (
"github.com/ccfos/nightingale/v6/models"
"github.com/ccfos/nightingale/v6/pkg/tlsx"
"github.com/elastic/go-elasticsearch/v9"
"github.com/mitchellh/mapstructure"
"github.com/olivere/elastic/v7"
"github.com/toolkits/pkg/logger"
@@ -27,18 +28,20 @@ const (
)
type Elasticsearch struct {
Addr string `json:"es.addr" mapstructure:"es.addr"`
Nodes []string `json:"es.nodes" mapstructure:"es.nodes"`
Timeout int64 `json:"es.timeout" mapstructure:"es.timeout"` // millis
Basic BasicAuth `json:"es.basic" mapstructure:"es.basic"`
TLS TLS `json:"es.tls" mapstructure:"es.tls"`
Version string `json:"es.version" mapstructure:"es.version"`
Headers map[string]string `json:"es.headers" mapstructure:"es.headers"`
MinInterval int `json:"es.min_interval" mapstructure:"es.min_interval"` // seconds
MaxShard int `json:"es.max_shard" mapstructure:"es.max_shard"`
ClusterName string `json:"es.cluster_name" mapstructure:"es.cluster_name"`
EnableWrite bool `json:"es.enable_write" mapstructure:"es.enable_write"` // 允许写操作
Client *elastic.Client `json:"es.client" mapstructure:"es.client"`
Addr string `json:"es.addr" mapstructure:"es.addr"`
Nodes []string `json:"es.nodes" mapstructure:"es.nodes"`
Timeout int64 `json:"es.timeout" mapstructure:"es.timeout"` // millis
Basic BasicAuth `json:"es.basic" mapstructure:"es.basic"`
TLS TLS `json:"es.tls" mapstructure:"es.tls"`
Version string `json:"es.version" mapstructure:"es.version"`
Headers map[string]string `json:"es.headers" mapstructure:"es.headers"`
MinInterval int `json:"es.min_interval" mapstructure:"es.min_interval"` // seconds
MaxShard int `json:"es.max_shard" mapstructure:"es.max_shard"`
ClusterName string `json:"es.cluster_name" mapstructure:"es.cluster_name"`
EnableWrite bool `json:"es.enable_write" mapstructure:"es.enable_write"` // 允许写操作
Client *elastic.Client `json:"es.client" mapstructure:"es.client"`
NewClient *elasticsearch.Client `json:"es.new_client" mapstructure:"es.new_client"`
MaxQueryRows int `json:"es.max_query_rows" mapstructure:"es.max_query_rows"`
}
type TLS struct {
@@ -110,6 +113,23 @@ func (e *Elasticsearch) InitClient() error {
return err
}
cfg := elasticsearch.Config{
Addresses: e.Nodes,
Transport: transport,
Header: http.Header{},
}
if e.Basic.Username != "" && e.Basic.Password != "" {
cfg.Username = e.Basic.Username
cfg.Password = e.Basic.Password
}
for k, v := range e.Headers {
cfg.Header[k] = []string{v}
}
e.NewClient, err = elasticsearch.NewClient(cfg)
return err
}
@@ -183,6 +203,15 @@ func (e *Elasticsearch) MakeTSQuery(ctx context.Context, query interface{}, even
}
func (e *Elasticsearch) QueryData(ctx context.Context, queryParam interface{}) ([]models.DataResp, error) {
param := new(eslike.Query)
if err := mapstructure.Decode(queryParam, param); err != nil {
return nil, err
}
if param.QueryType == "SQL" {
return eslike.QuerySQLData(ctx, param, e.Timeout, e.Version, e.NewClient)
}
search := func(ctx context.Context, indices []string, source interface{}, timeout int, maxShard int) (*elastic.SearchResult, error) {
return e.Client.Search().
Index(indices...).
@@ -248,6 +277,20 @@ func (e *Elasticsearch) QueryFields(indexes []string) ([]string, error) {
}
func (e *Elasticsearch) QueryLog(ctx context.Context, queryParam interface{}) ([]interface{}, int64, error) {
param := new(eslike.Query)
if err := mapstructure.Decode(queryParam, param); err != nil {
return nil, 0, err
}
if param.QueryType == "SQL" {
if param.CustomParams == nil {
param.CustomParams = make(map[string]interface{})
}
if e.MaxQueryRows > 0 {
param.MaxQueryRows = e.MaxQueryRows
}
return eslike.QuerySQLLog(ctx, param, e.Timeout, e.Version, e.NewClient)
}
search := func(ctx context.Context, indices []string, source interface{}, timeout int, maxShard int) (*elastic.SearchResult, error) {
// 应该是之前为了获取 fields 字段,做的这个兼容

View File

@@ -170,6 +170,7 @@ func esN9eToDatasourceInfo(ds *datasource.DatasourceInfo, item models.Datasource
ds.Settings["es.min_interval"] = item.SettingsJson["min_interval"]
ds.Settings["es.max_shard"] = item.SettingsJson["max_shard"]
ds.Settings["es.enable_write"] = item.SettingsJson["enable_write"]
ds.Settings["es.max_query_rows"] = item.SettingsJson["max_query_rows"]
}
func PutDatasources(items []datasource.DatasourceInfo) {

11
go.mod
View File

@@ -18,6 +18,7 @@ require (
github.com/coreos/go-oidc v2.2.1+incompatible
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/elastic/go-elasticsearch/v9 v9.0.0
github.com/expr-lang/expr v1.16.1
github.com/flashcatcloud/ibex v1.3.6
github.com/gin-contrib/pprof v1.4.0
@@ -75,8 +76,8 @@ require (
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/segmentio/asm v1.2.0 // indirect
github.com/shopspring/decimal v1.4.0 // indirect
go.opentelemetry.io/otel v1.32.0 // indirect
go.opentelemetry.io/otel/trace v1.32.0 // indirect
go.opentelemetry.io/otel v1.35.0 // indirect
go.opentelemetry.io/otel/trace v1.35.0 // indirect
)
require (
@@ -90,7 +91,10 @@ require (
github.com/eapache/go-resiliency v1.7.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/elastic/elastic-transport-go/v8 v8.7.0 // indirect
github.com/glebarez/go-sqlite v1.21.2 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-uuid v1.0.3 // indirect
@@ -103,11 +107,12 @@ require (
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
github.com/rogpeppe/go-internal v1.13.1 // indirect
github.com/tjfoc/gmsm v1.4.1 // indirect
github.com/valyala/fastrand v1.1.0 // indirect
github.com/valyala/histogram v1.2.0 // indirect
github.com/yuin/gopher-lua v1.1.1 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/otel/metric v1.35.0 // indirect
golang.org/x/sync v0.18.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
modernc.org/libc v1.22.5 // indirect

26
go.sum
View File

@@ -144,6 +144,10 @@ github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4A
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0=
github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
github.com/elastic/elastic-transport-go/v8 v8.7.0 h1:OgTneVuXP2uip4BA658Xi6Hfw+PeIOod2rY3GVMGoVE=
github.com/elastic/elastic-transport-go/v8 v8.7.0/go.mod h1:YLHer5cj0csTzNFXoNQ8qhtGY1GTvSqPnKWKaqQE3Hk=
github.com/elastic/go-elasticsearch/v9 v9.0.0 h1:krpgPeJ2lC8apkaw6B58gKDYJq5eUhP8AMwpPt01Q/U=
github.com/elastic/go-elasticsearch/v9 v9.0.0/go.mod h1:2PB5YQPpY5tWbF65MRqzEXA31PZOdXCkloQSOZtU14I=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
@@ -183,6 +187,11 @@ github.com/go-ldap/ldap/v3 v3.4.4 h1:qPjipEpt+qDa6SI/h1fzuGWoRUY+qqQ9sOZq67/PYUs
github.com/go-ldap/ldap/v3 v3.4.4/go.mod h1:fe1MsuN5eJJ1FeLT/LEBVdWfNWKh459R7aXgXtJC+aI=
github.com/go-logfmt/logfmt v0.6.0 h1:wGYYu3uicYdqXVgoYbvnkrPVXkuLM1p1ifugDMEdRi4=
github.com/go-logfmt/logfmt v0.6.0/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4=
github.com/go-playground/assert/v2 v2.2.0 h1:JvknZsQTYeFEAhQwI4qEt9cyV5ONwRHC+lYKSsYSR8s=
github.com/go-playground/assert/v2 v2.2.0/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4=
@@ -232,8 +241,9 @@ github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/pprof v0.0.0-20230705174524-200ffdc848b8 h1:n6vlPhxsA+BW/XsS5+uqi7GyzaLa5MH7qlSLBZtRdiA=
github.com/google/pprof v0.0.0-20230705174524-200ffdc848b8/go.mod h1:Jh3hGz2jkYak8qXPD19ryItVnUgpgeqzdkY/D0EaeuA=
@@ -463,10 +473,16 @@ github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5t
github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M=
github.com/yuin/gopher-lua v1.1.1/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw=
go.mongodb.org/mongo-driver v1.11.4/go.mod h1:PTSz5yu21bkT/wXpkS7WR5f0ddqw5quethTUn9WM+2g=
go.opentelemetry.io/otel v1.32.0 h1:WnBN+Xjcteh0zdk01SVqV55d/m62NJLJdIyb4y/WO5U=
go.opentelemetry.io/otel v1.32.0/go.mod h1:00DCVSB0RQcnzlwyTfqtxSm+DRr9hpYrHjNGiBHVQIg=
go.opentelemetry.io/otel/trace v1.32.0 h1:WIC9mYrXf8TmY/EXuULKc8hR17vE+Hjv2cssQDe03fM=
go.opentelemetry.io/otel/trace v1.32.0/go.mod h1:+i4rkvCraA+tG6AzwloGaCtkx53Fa+L+V8e9a7YvhT8=
go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA=
go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A=
go.opentelemetry.io/otel v1.35.0 h1:xKWKPxrxB6OtMCbmMY021CqC45J+3Onta9MqjhnusiQ=
go.opentelemetry.io/otel v1.35.0/go.mod h1:UEqy8Zp11hpkUrL73gSlELM0DupHoiq72dR+Zqel/+Y=
go.opentelemetry.io/otel/metric v1.35.0 h1:0znxYu2SNyuMSQT4Y9WDWej0VpcsxkuklLa4/siN90M=
go.opentelemetry.io/otel/metric v1.35.0/go.mod h1:nKVFgxBZ2fReX6IlyW28MgZojkoAkJGaE8CpgeAU3oE=
go.opentelemetry.io/otel/sdk v1.24.0 h1:YMPPDNymmQN3ZgczicBY3B6sf9n62Dlj9pWD3ucgoDw=
go.opentelemetry.io/otel/sdk v1.24.0/go.mod h1:KVrIYw6tEubO9E96HQpcmpTKDVn9gdv35HoYiQWGDFg=
go.opentelemetry.io/otel/trace v1.35.0 h1:dPpEfJu1sDIqruz7BHFG3c7528f6ddfSWfFDVt/xgMs=
go.opentelemetry.io/otel/trace v1.35.0/go.mod h1:WUk7DtFp1Aw2MkvqGdwiXYDZZNvA/1J8o6xRXLrIkyc=
go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
go.uber.org/automaxprocs v1.4.0/go.mod h1:/mTEdr7LvHhs0v7mjdxDreTz1OG5zdZGqgOnhWiR/+Q=