mirror of
https://github.com/ccfos/nightingale.git
synced 2026-03-03 22:48:56 +00:00
Compare commits
2 Commits
optimize-c
...
feat-pg-al
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e5b47f3c2f | ||
|
|
52c24c4173 |
@@ -37,4 +37,10 @@ var Plugins = []Plugin{
|
||||
Type: "mysql",
|
||||
TypeName: "MySQL",
|
||||
},
|
||||
{
|
||||
Id: 7,
|
||||
Category: "timeseries",
|
||||
Type: "pgsql",
|
||||
TypeName: "PostgreSQL",
|
||||
},
|
||||
}
|
||||
|
||||
@@ -60,6 +60,13 @@ func init() {
|
||||
PluginType: "mysql",
|
||||
PluginTypeName: "MySQL",
|
||||
}
|
||||
|
||||
DatasourceTypes[6] = DatasourceType{
|
||||
Id: 6,
|
||||
Category: "timeseries",
|
||||
PluginType: "pgsql",
|
||||
PluginTypeName: "PostgreSQL",
|
||||
}
|
||||
}
|
||||
|
||||
type NewDatasrouceFn func(settings map[string]interface{}) (Datasource, error)
|
||||
|
||||
@@ -94,6 +94,26 @@ func (m *MySQL) Equal(p datasource.Datasource) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
if oldShard.MaxQueryRows != newShard.MaxQueryRows {
|
||||
return false
|
||||
}
|
||||
|
||||
if oldShard.Timeout != newShard.Timeout {
|
||||
return false
|
||||
}
|
||||
|
||||
if oldShard.MaxIdleConns != newShard.MaxIdleConns {
|
||||
return false
|
||||
}
|
||||
|
||||
if oldShard.MaxOpenConns != newShard.MaxOpenConns {
|
||||
return false
|
||||
}
|
||||
|
||||
if oldShard.ConnMaxLifetime != newShard.ConnMaxLifetime {
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
|
||||
346
datasource/postgresql/postgresql.go
Normal file
346
datasource/postgresql/postgresql.go
Normal file
@@ -0,0 +1,346 @@
|
||||
package postgresql
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"regexp"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/datasource"
|
||||
"github.com/ccfos/nightingale/v6/pkg/macros"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/dskit/postgres"
|
||||
"github.com/ccfos/nightingale/v6/dskit/sqlbase"
|
||||
"github.com/ccfos/nightingale/v6/dskit/types"
|
||||
"github.com/ccfos/nightingale/v6/models"
|
||||
"github.com/mitchellh/mapstructure"
|
||||
"github.com/toolkits/pkg/logger"
|
||||
)
|
||||
|
||||
const (
|
||||
PostgreSQLType = "pgsql"
|
||||
)
|
||||
|
||||
var (
|
||||
regx = "(?i)from\\s+([a-zA-Z0-9_]+)\\.([a-zA-Z0-9_]+)\\.([a-zA-Z0-9_]+)"
|
||||
)
|
||||
|
||||
func init() {
|
||||
datasource.RegisterDatasource(PostgreSQLType, new(PostgreSQL))
|
||||
}
|
||||
|
||||
type PostgreSQL struct {
|
||||
Shards []*postgres.PostgreSQL `json:"pgsql.shards" mapstructure:"pgsql.shards"`
|
||||
}
|
||||
|
||||
type QueryParam struct {
|
||||
Ref string `json:"ref" mapstructure:"ref"`
|
||||
Database string `json:"database" mapstructure:"database"`
|
||||
Table string `json:"table" mapstructure:"table"`
|
||||
SQL string `json:"sql" mapstructure:"sql"`
|
||||
Keys datasource.Keys `json:"keys" mapstructure:"keys"`
|
||||
From int64 `json:"from" mapstructure:"from"`
|
||||
To int64 `json:"to" mapstructure:"to"`
|
||||
}
|
||||
|
||||
func (p *PostgreSQL) InitClient() error {
|
||||
if len(p.Shards) == 0 {
|
||||
return fmt.Errorf("not found postgresql addr, please check datasource config")
|
||||
}
|
||||
for _, shard := range p.Shards {
|
||||
if db, err := shard.NewConn(context.TODO(), "postgres"); err != nil {
|
||||
defer sqlbase.CloseDB(db)
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *PostgreSQL) Init(settings map[string]interface{}) (datasource.Datasource, error) {
|
||||
newest := new(PostgreSQL)
|
||||
err := mapstructure.Decode(settings, newest)
|
||||
return newest, err
|
||||
}
|
||||
|
||||
func (p *PostgreSQL) Validate(ctx context.Context) error {
|
||||
if len(p.Shards) == 0 || len(strings.TrimSpace(p.Shards[0].Addr)) == 0 {
|
||||
return fmt.Errorf("postgresql addr is invalid, please check datasource setting")
|
||||
}
|
||||
|
||||
if len(strings.TrimSpace(p.Shards[0].User)) == 0 {
|
||||
return fmt.Errorf("postgresql user is invalid, please check datasource setting")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Equal compares whether two objects are the same, used for caching
|
||||
func (p *PostgreSQL) Equal(d datasource.Datasource) bool {
|
||||
newest, ok := d.(*PostgreSQL)
|
||||
if !ok {
|
||||
logger.Errorf("unexpected plugin type, expected is postgresql")
|
||||
return false
|
||||
}
|
||||
|
||||
if len(p.Shards) == 0 || len(newest.Shards) == 0 {
|
||||
return false
|
||||
}
|
||||
|
||||
oldShard := p.Shards[0]
|
||||
newShard := newest.Shards[0]
|
||||
|
||||
if oldShard.Addr != newShard.Addr {
|
||||
return false
|
||||
}
|
||||
|
||||
if oldShard.User != newShard.User {
|
||||
return false
|
||||
}
|
||||
|
||||
if oldShard.Password != newShard.Password {
|
||||
return false
|
||||
}
|
||||
|
||||
if oldShard.MaxQueryRows != newShard.MaxQueryRows {
|
||||
return false
|
||||
}
|
||||
|
||||
if oldShard.Timeout != newShard.Timeout {
|
||||
return false
|
||||
}
|
||||
|
||||
if oldShard.MaxIdleConns != newShard.MaxIdleConns {
|
||||
return false
|
||||
}
|
||||
|
||||
if oldShard.MaxOpenConns != newShard.MaxOpenConns {
|
||||
return false
|
||||
}
|
||||
|
||||
if oldShard.ConnMaxLifetime != newShard.ConnMaxLifetime {
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func (p *PostgreSQL) ShowDatabases(ctx context.Context) ([]string, error) {
|
||||
return p.Shards[0].ShowDatabases(ctx, "")
|
||||
}
|
||||
|
||||
func (p *PostgreSQL) ShowTables(ctx context.Context, database string) ([]string, error) {
|
||||
p.Shards[0].DB = database
|
||||
rets, err := p.Shards[0].ShowTables(ctx, "")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
tables := make([]string, 0, len(rets))
|
||||
for scheme, tabs := range rets {
|
||||
for _, tab := range tabs {
|
||||
tables = append(tables, scheme+"."+tab)
|
||||
}
|
||||
}
|
||||
return tables, nil
|
||||
}
|
||||
|
||||
func (p *PostgreSQL) MakeLogQuery(ctx context.Context, query interface{}, eventTags []string, start, end int64) (interface{}, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (p *PostgreSQL) MakeTSQuery(ctx context.Context, query interface{}, eventTags []string, start, end int64) (interface{}, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (p *PostgreSQL) QueryMapData(ctx context.Context, query interface{}) ([]map[string]string, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (p *PostgreSQL) QueryData(ctx context.Context, query interface{}) ([]models.DataResp, error) {
|
||||
postgresqlQueryParam := new(QueryParam)
|
||||
if err := mapstructure.Decode(query, postgresqlQueryParam); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if strings.Contains(postgresqlQueryParam.SQL, "$__") {
|
||||
var err error
|
||||
postgresqlQueryParam.SQL, err = macros.Macro(postgresqlQueryParam.SQL, postgresqlQueryParam.From, postgresqlQueryParam.To)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
if postgresqlQueryParam.Database != "" {
|
||||
p.Shards[0].DB = postgresqlQueryParam.Database
|
||||
} else {
|
||||
db, err := parseDBName(postgresqlQueryParam.SQL)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
p.Shards[0].DB = db
|
||||
}
|
||||
|
||||
timeout := p.Shards[0].Timeout
|
||||
if timeout == 0 {
|
||||
timeout = 60
|
||||
}
|
||||
timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(timeout)*time.Second)
|
||||
defer cancel()
|
||||
|
||||
items, err := p.Shards[0].QueryTimeseries(timeoutCtx, &sqlbase.QueryParam{
|
||||
Sql: postgresqlQueryParam.SQL,
|
||||
Keys: types.Keys{
|
||||
ValueKey: postgresqlQueryParam.Keys.ValueKey,
|
||||
LabelKey: postgresqlQueryParam.Keys.LabelKey,
|
||||
TimeKey: postgresqlQueryParam.Keys.TimeKey,
|
||||
},
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
logger.Warningf("query:%+v get data err:%v", postgresqlQueryParam, err)
|
||||
return []models.DataResp{}, err
|
||||
}
|
||||
data := make([]models.DataResp, 0)
|
||||
for i := range items {
|
||||
data = append(data, models.DataResp{
|
||||
Ref: postgresqlQueryParam.Ref,
|
||||
Metric: items[i].Metric,
|
||||
Values: items[i].Values,
|
||||
})
|
||||
}
|
||||
|
||||
// parse resp to time series data
|
||||
logger.Infof("req:%+v keys:%+v \n data:%v", postgresqlQueryParam, postgresqlQueryParam.Keys, data)
|
||||
|
||||
return data, nil
|
||||
}
|
||||
|
||||
func (p *PostgreSQL) QueryLog(ctx context.Context, query interface{}) ([]interface{}, int64, error) {
|
||||
postgresqlQueryParam := new(QueryParam)
|
||||
if err := mapstructure.Decode(query, postgresqlQueryParam); err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
if postgresqlQueryParam.Database != "" {
|
||||
p.Shards[0].DB = postgresqlQueryParam.Database
|
||||
} else {
|
||||
db, err := parseDBName(postgresqlQueryParam.SQL)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
p.Shards[0].DB = db
|
||||
}
|
||||
|
||||
if strings.Contains(postgresqlQueryParam.SQL, "$__") {
|
||||
var err error
|
||||
postgresqlQueryParam.SQL, err = macros.Macro(postgresqlQueryParam.SQL, postgresqlQueryParam.From, postgresqlQueryParam.To)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
}
|
||||
|
||||
timeout := p.Shards[0].Timeout
|
||||
if timeout == 0 {
|
||||
timeout = 60
|
||||
}
|
||||
timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(timeout)*time.Second)
|
||||
defer cancel()
|
||||
items, err := p.Shards[0].Query(timeoutCtx, &sqlbase.QueryParam{
|
||||
Sql: postgresqlQueryParam.SQL,
|
||||
})
|
||||
if err != nil {
|
||||
logger.Warningf("query:%+v get data err:%v", postgresqlQueryParam, err)
|
||||
return []interface{}{}, 0, err
|
||||
}
|
||||
logs := make([]interface{}, 0)
|
||||
for i := range items {
|
||||
logs = append(logs, items[i])
|
||||
}
|
||||
|
||||
return logs, 0, nil
|
||||
}
|
||||
|
||||
func (p *PostgreSQL) DescribeTable(ctx context.Context, query interface{}) ([]*types.ColumnProperty, error) {
|
||||
postgresqlQueryParam := new(QueryParam)
|
||||
if err := mapstructure.Decode(query, postgresqlQueryParam); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
p.Shards[0].DB = postgresqlQueryParam.Database
|
||||
pairs := strings.Split(postgresqlQueryParam.Table, ".") // format: scheme.table_name
|
||||
scheme := ""
|
||||
table := postgresqlQueryParam.Table
|
||||
if len(pairs) == 2 {
|
||||
scheme = pairs[0]
|
||||
table = pairs[1]
|
||||
}
|
||||
return p.Shards[0].DescTable(ctx, scheme, table)
|
||||
}
|
||||
|
||||
func parseDBName(sql string) (db string, err error) {
|
||||
re := regexp.MustCompile(regx)
|
||||
matches := re.FindStringSubmatch(sql)
|
||||
if len(matches) != 4 {
|
||||
return "", fmt.Errorf("no valid table name in format database.schema.table found")
|
||||
}
|
||||
return matches[1], nil
|
||||
}
|
||||
|
||||
func extractColumns(sql string) ([]string, error) {
|
||||
// 将 SQL 转换为小写以简化匹配
|
||||
sql = strings.ToLower(sql)
|
||||
|
||||
// 匹配 SELECT 和 FROM 之间的内容
|
||||
re := regexp.MustCompile(`select\s+(.*?)\s+from`)
|
||||
matches := re.FindStringSubmatch(sql)
|
||||
|
||||
if len(matches) < 2 {
|
||||
return nil, fmt.Errorf("no columns found or invalid SQL syntax")
|
||||
}
|
||||
|
||||
// 提取列部分
|
||||
columnsString := matches[1]
|
||||
|
||||
// 分割列
|
||||
columns := splitColumns(columnsString)
|
||||
|
||||
// 清理每个列名
|
||||
for i, col := range columns {
|
||||
columns[i] = strings.TrimSpace(col)
|
||||
}
|
||||
|
||||
return columns, nil
|
||||
}
|
||||
|
||||
func splitColumns(columnsString string) []string {
|
||||
var columns []string
|
||||
var currentColumn strings.Builder
|
||||
parenthesesCount := 0
|
||||
inQuotes := false
|
||||
|
||||
for _, char := range columnsString {
|
||||
switch char {
|
||||
case '(':
|
||||
parenthesesCount++
|
||||
currentColumn.WriteRune(char)
|
||||
case ')':
|
||||
parenthesesCount--
|
||||
currentColumn.WriteRune(char)
|
||||
case '\'', '"':
|
||||
inQuotes = !inQuotes
|
||||
currentColumn.WriteRune(char)
|
||||
case ',':
|
||||
if parenthesesCount == 0 && !inQuotes {
|
||||
columns = append(columns, currentColumn.String())
|
||||
currentColumn.Reset()
|
||||
} else {
|
||||
currentColumn.WriteRune(char)
|
||||
}
|
||||
default:
|
||||
currentColumn.WriteRune(char)
|
||||
}
|
||||
}
|
||||
|
||||
if currentColumn.Len() > 0 {
|
||||
columns = append(columns, currentColumn.String())
|
||||
}
|
||||
|
||||
return columns
|
||||
}
|
||||
@@ -10,6 +10,7 @@ import (
|
||||
_ "github.com/ccfos/nightingale/v6/datasource/ck"
|
||||
"github.com/ccfos/nightingale/v6/datasource/es"
|
||||
_ "github.com/ccfos/nightingale/v6/datasource/mysql"
|
||||
_ "github.com/ccfos/nightingale/v6/datasource/postgresql"
|
||||
"github.com/ccfos/nightingale/v6/dskit/tdengine"
|
||||
"github.com/ccfos/nightingale/v6/models"
|
||||
"github.com/ccfos/nightingale/v6/pkg/ctx"
|
||||
|
||||
207
dskit/postgres/postgres.go
Normal file
207
dskit/postgres/postgres.go
Normal file
@@ -0,0 +1,207 @@
|
||||
// @Author: Ciusyan 5/20/24
|
||||
|
||||
package postgres
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/dskit/pool"
|
||||
"github.com/ccfos/nightingale/v6/dskit/sqlbase"
|
||||
"github.com/ccfos/nightingale/v6/dskit/types"
|
||||
|
||||
_ "github.com/lib/pq" // PostgreSQL driver
|
||||
"github.com/mitchellh/mapstructure"
|
||||
"gorm.io/driver/postgres"
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
type PostgreSQL struct {
|
||||
Shard `json:",inline" mapstructure:",squash"`
|
||||
}
|
||||
|
||||
type Shard struct {
|
||||
Addr string `json:"pgsql.addr" mapstructure:"pgsql.addr"`
|
||||
DB string `json:"pgsql.db" mapstructure:"pgsql.db"`
|
||||
User string `json:"pgsql.user" mapstructure:"pgsql.user"`
|
||||
Password string `json:"pgsql.password" mapstructure:"pgsql.password" `
|
||||
Timeout int `json:"pgsql.timeout" mapstructure:"pgsql.timeout"`
|
||||
MaxIdleConns int `json:"pgsql.max_idle_conns" mapstructure:"pgsql.max_idle_conns"`
|
||||
MaxOpenConns int `json:"pgsql.max_open_conns" mapstructure:"pgsql.max_open_conns"`
|
||||
ConnMaxLifetime int `json:"pgsql.conn_max_lifetime" mapstructure:"pgsql.conn_max_lifetime"`
|
||||
MaxQueryRows int `json:"pgsql.max_query_rows" mapstructure:"pgsql.max_query_rows"`
|
||||
}
|
||||
|
||||
// NewPostgreSQLWithSettings initializes a new PostgreSQL instance with the given settings
|
||||
func NewPostgreSQLWithSettings(ctx context.Context, settings interface{}) (*PostgreSQL, error) {
|
||||
newest := new(PostgreSQL)
|
||||
settingsMap := map[string]interface{}{}
|
||||
|
||||
switch s := settings.(type) {
|
||||
case string:
|
||||
if err := json.Unmarshal([]byte(s), &settingsMap); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
case map[string]interface{}:
|
||||
settingsMap = s
|
||||
case *PostgreSQL:
|
||||
return s, nil
|
||||
case PostgreSQL:
|
||||
return &s, nil
|
||||
case Shard:
|
||||
newest.Shard = s
|
||||
return newest, nil
|
||||
case *Shard:
|
||||
newest.Shard = *s
|
||||
return newest, nil
|
||||
default:
|
||||
return nil, errors.New("unsupported settings type")
|
||||
}
|
||||
|
||||
if err := mapstructure.Decode(settingsMap, newest); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return newest, nil
|
||||
}
|
||||
|
||||
// NewConn establishes a new connection to PostgreSQL
|
||||
func (p *PostgreSQL) NewConn(ctx context.Context, database string) (*gorm.DB, error) {
|
||||
if len(p.DB) == 0 && len(database) == 0 {
|
||||
return nil, errors.New("empty pgsql database") // 兼容阿里实时数仓Holgres, 连接时必须指定db名字
|
||||
}
|
||||
|
||||
if p.Shard.Timeout == 0 {
|
||||
p.Shard.Timeout = 60
|
||||
}
|
||||
|
||||
if p.Shard.MaxIdleConns == 0 {
|
||||
p.Shard.MaxIdleConns = 10
|
||||
}
|
||||
|
||||
if p.Shard.MaxOpenConns == 0 {
|
||||
p.Shard.MaxOpenConns = 100
|
||||
}
|
||||
|
||||
if p.Shard.ConnMaxLifetime == 0 {
|
||||
p.Shard.ConnMaxLifetime = 14400
|
||||
}
|
||||
|
||||
if len(p.Shard.Addr) == 0 {
|
||||
return nil, errors.New("empty fe-node addr")
|
||||
}
|
||||
var keys []string
|
||||
var err error
|
||||
keys = append(keys, p.Shard.Addr)
|
||||
|
||||
keys = append(keys, p.Shard.Password, p.Shard.User)
|
||||
if len(database) > 0 {
|
||||
keys = append(keys, database)
|
||||
}
|
||||
cachedKey := strings.Join(keys, ":")
|
||||
// cache conn with database
|
||||
conn, ok := pool.PoolClient.Load(cachedKey)
|
||||
if ok {
|
||||
return conn.(*gorm.DB), nil
|
||||
}
|
||||
|
||||
var db *gorm.DB
|
||||
defer func() {
|
||||
if db != nil && err == nil {
|
||||
pool.PoolClient.Store(cachedKey, db)
|
||||
}
|
||||
}()
|
||||
|
||||
// Simplified connection logic for PostgreSQL
|
||||
dsn := fmt.Sprintf("postgres://%s:%s@%s/%s?sslmode=disable&TimeZone=Asia/Shanghai", p.Shard.User, p.Shard.Password, p.Shard.Addr, database)
|
||||
db, err = sqlbase.NewDB(
|
||||
ctx,
|
||||
postgres.Open(dsn),
|
||||
p.Shard.MaxIdleConns,
|
||||
p.Shard.MaxOpenConns,
|
||||
time.Duration(p.Shard.ConnMaxLifetime)*time.Second,
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
if db != nil {
|
||||
sqlDB, _ := db.DB()
|
||||
if sqlDB != nil {
|
||||
sqlDB.Close()
|
||||
}
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return db, nil
|
||||
}
|
||||
|
||||
// ShowDatabases lists all databases in PostgreSQL
|
||||
func (p *PostgreSQL) ShowDatabases(ctx context.Context, searchKeyword string) ([]string, error) {
|
||||
db, err := p.NewConn(ctx, "postgres")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
sql := fmt.Sprintf("SELECT datname FROM pg_database WHERE datistemplate = false AND datname LIKE %s",
|
||||
"'%"+searchKeyword+"%'")
|
||||
return sqlbase.ShowDatabases(ctx, db, sql)
|
||||
}
|
||||
|
||||
// ShowTables lists all tables in a given database
|
||||
func (p *PostgreSQL) ShowTables(ctx context.Context, searchKeyword string) (map[string][]string, error) {
|
||||
db, err := p.NewConn(ctx, p.DB)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
sql := fmt.Sprintf("SELECT schemaname, tablename FROM pg_tables WHERE schemaname !='information_schema' and schemaname !='pg_catalog' and tablename LIKE %s",
|
||||
"'%"+searchKeyword+"%'")
|
||||
rets, err := sqlbase.ExecQuery(ctx, db, sql)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
tabs := make(map[string][]string, 3)
|
||||
for _, row := range rets {
|
||||
if val, ok := row["schemaname"].(string); ok {
|
||||
tabs[val] = append(tabs[val], row["tablename"].(string))
|
||||
}
|
||||
}
|
||||
return tabs, nil
|
||||
}
|
||||
|
||||
// DescTable describes the schema of a specified table in PostgreSQL
|
||||
// scheme default: public if not specified
|
||||
func (p *PostgreSQL) DescTable(ctx context.Context, scheme, table string) ([]*types.ColumnProperty, error) {
|
||||
db, err := p.NewConn(ctx, p.DB)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if scheme == "" {
|
||||
scheme = "public"
|
||||
}
|
||||
|
||||
query := fmt.Sprintf("SELECT column_name, data_type, is_nullable, column_default FROM information_schema.columns WHERE table_name = '%s' AND table_schema = '%s'", table, scheme)
|
||||
return sqlbase.DescTable(ctx, db, query)
|
||||
}
|
||||
|
||||
// SelectRows selects rows from a specified table in PostgreSQL based on a given query
|
||||
func (p *PostgreSQL) SelectRows(ctx context.Context, table, where string) ([]map[string]interface{}, error) {
|
||||
db, err := p.NewConn(ctx, p.DB)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return sqlbase.SelectRows(ctx, db, table, where)
|
||||
}
|
||||
|
||||
// ExecQuery executes a SQL query in PostgreSQL
|
||||
func (p *PostgreSQL) ExecQuery(ctx context.Context, sql string) ([]map[string]interface{}, error) {
|
||||
db, err := p.NewConn(ctx, p.DB)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return sqlbase.ExecQuery(ctx, db, sql)
|
||||
}
|
||||
73
dskit/postgres/timeseries.go
Normal file
73
dskit/postgres/timeseries.go
Normal file
@@ -0,0 +1,73 @@
|
||||
package postgres
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/dskit/sqlbase"
|
||||
"github.com/ccfos/nightingale/v6/dskit/types"
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
// Query executes a given SQL query in PostgreSQL and returns the results
|
||||
func (p *PostgreSQL) Query(ctx context.Context, query *sqlbase.QueryParam) ([]map[string]interface{}, error) {
|
||||
db, err := p.NewConn(ctx, p.Shard.DB)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = p.CheckMaxQueryRows(db, ctx, query)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return sqlbase.Query(ctx, db, query)
|
||||
}
|
||||
|
||||
// QueryTimeseries executes a time series data query using the given parameters
|
||||
func (p *PostgreSQL) QueryTimeseries(ctx context.Context, query *sqlbase.QueryParam) ([]types.MetricValues, error) {
|
||||
db, err := p.NewConn(ctx, p.Shard.DB)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = p.CheckMaxQueryRows(db, ctx, query)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return sqlbase.QueryTimeseries(ctx, db, query, true)
|
||||
}
|
||||
|
||||
func (p *PostgreSQL) CheckMaxQueryRows(db *gorm.DB, ctx context.Context, query *sqlbase.QueryParam) error {
|
||||
sql := strings.ReplaceAll(query.Sql, ";", "")
|
||||
checkQuery := &sqlbase.QueryParam{
|
||||
Sql: fmt.Sprintf("SELECT COUNT(*) as count FROM (%s) AS subquery;", sql),
|
||||
}
|
||||
|
||||
res, err := sqlbase.Query(ctx, db, checkQuery)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(res) > 0 {
|
||||
if count, exists := res[0]["count"]; exists {
|
||||
v, err := sqlbase.ParseFloat64Value(count)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
maxQueryRows := p.Shard.MaxQueryRows
|
||||
if maxQueryRows == 0 {
|
||||
maxQueryRows = 500
|
||||
}
|
||||
|
||||
if v > float64(maxQueryRows) {
|
||||
return fmt.Errorf("query result rows count %d exceeds the maximum limit %d", int(v), maxQueryRows)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -9,9 +9,9 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/dskit/types"
|
||||
|
||||
"gorm.io/gorm"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/dskit/types"
|
||||
)
|
||||
|
||||
// NewDB creates a new Gorm DB instance based on the provided gorm.Dialector and configures the connection pool
|
||||
@@ -19,7 +19,7 @@ func NewDB(ctx context.Context, dialector gorm.Dialector, maxIdleConns, maxOpenC
|
||||
// Create a new Gorm DB instance
|
||||
db, err := gorm.Open(dialector, &gorm.Config{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return db, err
|
||||
}
|
||||
|
||||
// Configure the connection pool
|
||||
@@ -35,6 +35,17 @@ func NewDB(ctx context.Context, dialector gorm.Dialector, maxIdleConns, maxOpenC
|
||||
return db.WithContext(ctx), sqlDB.Ping()
|
||||
}
|
||||
|
||||
func CloseDB(db *gorm.DB) error {
|
||||
if db != nil {
|
||||
sqlDb, err := db.DB()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return sqlDb.Close()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ShowTables retrieves a list of all tables in the specified database
|
||||
func ShowTables(ctx context.Context, db *gorm.DB, query string) ([]string, error) {
|
||||
var tables []string
|
||||
@@ -112,7 +123,7 @@ func DescTable(ctx context.Context, db *gorm.DB, query string) ([]*types.ColumnP
|
||||
}
|
||||
|
||||
// Convert the database-specific type to internal type
|
||||
type2, indexable := convertDBType(db.Dialector.Name(), typ)
|
||||
type2, indexable := ConvertDBType(db.Dialector.Name(), typ)
|
||||
columns = append(columns, &types.ColumnProperty{
|
||||
Field: field,
|
||||
Type: typ,
|
||||
@@ -175,7 +186,7 @@ func SelectRows(ctx context.Context, db *gorm.DB, table, query string) ([]map[st
|
||||
}
|
||||
|
||||
// convertDBType converts MySQL or PostgreSQL data types to custom internal types and determines if they are indexable
|
||||
func convertDBType(dialect, dbType string) (string, bool) {
|
||||
func ConvertDBType(dialect, dbType string) (string, bool) {
|
||||
typ := strings.ToLower(dbType)
|
||||
|
||||
// Common type conversions
|
||||
@@ -190,7 +201,7 @@ func convertDBType(dialect, dbType string) (string, bool) {
|
||||
strings.HasPrefix(typ, "char"), strings.HasPrefix(typ, "tinytext"),
|
||||
strings.HasPrefix(typ, "mediumtext"), strings.HasPrefix(typ, "longtext"),
|
||||
strings.HasPrefix(typ, "character varying"), strings.HasPrefix(typ, "nvarchar"),
|
||||
strings.HasPrefix(typ, "nchar"):
|
||||
strings.HasPrefix(typ, "nchar"), strings.HasPrefix(typ, "bpchar"):
|
||||
return types.LogExtractValueTypeText, true
|
||||
|
||||
case strings.HasPrefix(typ, "float"), strings.HasPrefix(typ, "double"),
|
||||
@@ -203,7 +214,7 @@ func convertDBType(dialect, dbType string) (string, bool) {
|
||||
strings.HasPrefix(typ, "time"), strings.HasPrefix(typ, "smalldatetime"):
|
||||
return types.LogExtractValueTypeDate, false
|
||||
|
||||
case strings.HasPrefix(typ, "boolean"), strings.HasPrefix(typ, "bit"):
|
||||
case strings.HasPrefix(typ, "boolean"), strings.HasPrefix(typ, "bit"), strings.HasPrefix(typ, "bool"):
|
||||
return types.LogExtractValueTypeBool, false
|
||||
}
|
||||
|
||||
|
||||
1
go.mod
1
go.mod
@@ -27,6 +27,7 @@ require (
|
||||
github.com/jinzhu/copier v0.4.0
|
||||
github.com/json-iterator/go v1.1.12
|
||||
github.com/koding/multiconfig v0.0.0-20171124222453-69c27309b2d7
|
||||
github.com/lib/pq v1.0.0
|
||||
github.com/mailru/easyjson v0.7.7
|
||||
github.com/mattn/go-isatty v0.0.19
|
||||
github.com/mitchellh/mapstructure v1.5.0
|
||||
|
||||
2
go.sum
2
go.sum
@@ -220,6 +220,8 @@ github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+
|
||||
github.com/leodido/go-urn v1.2.1/go.mod h1:zt4jvISO2HfUBqxjfIshjdMTYS56ZS/qv49ictyFfxY=
|
||||
github.com/leodido/go-urn v1.2.4 h1:XlAE/cm/ms7TE/VMVoduSpNBoyc2dOxHs5MZSwAN63Q=
|
||||
github.com/leodido/go-urn v1.2.4/go.mod h1:7ZrI8mTSeBSHl/UaRyKQW1qZeMgak41ANeCNaVckg+4=
|
||||
github.com/lib/pq v1.0.0 h1:X5PMW56eZitiTeO7tKzZxFCSpbFZJtkMMooicw2us9A=
|
||||
github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
|
||||
github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0=
|
||||
github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
|
||||
github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94=
|
||||
|
||||
@@ -29,6 +29,7 @@ const (
|
||||
TDENGINE = "tdengine"
|
||||
ELASTICSEARCH = "elasticsearch"
|
||||
MYSQL = "mysql"
|
||||
POSTGRESQL = "pgsql"
|
||||
|
||||
CLICKHOUSE = "ck"
|
||||
)
|
||||
@@ -1197,7 +1198,8 @@ func (ar *AlertRule) IsInnerRule() bool {
|
||||
ar.Cate == CLICKHOUSE ||
|
||||
ar.Cate == ELASTICSEARCH ||
|
||||
ar.Prod == LOKI || ar.Cate == LOKI ||
|
||||
ar.Cate == MYSQL
|
||||
ar.Cate == MYSQL ||
|
||||
ar.Cate == POSTGRESQL
|
||||
}
|
||||
|
||||
func (ar *AlertRule) GetRuleType() string {
|
||||
|
||||
Reference in New Issue
Block a user