Compare commits

...

5 Commits

Author SHA1 Message Date
ning
46083d741d fix: query data 2025-12-30 19:21:16 +08:00
ning
3eeb705b39 update ds perm check 2025-12-30 16:51:10 +08:00
ning
8d87e69ee7 fix: datasource delete 2025-12-30 16:50:10 +08:00
pioneerlfn
3da85d8e28 fix: doris exec sql timeout unit: s -> ms 2025-12-29 14:27:42 +08:00
pioneerlfn
b50410b88a refactor: update doris query 2025-12-26 16:32:57 +08:00
8 changed files with 93 additions and 20 deletions

View File

@@ -211,8 +211,8 @@ func (rt *Router) Config(r *gin.Engine) {
pages.GET("/datasource/brief", rt.auth(), rt.user(), rt.datasourceBriefs)
pages.POST("/datasource/query", rt.auth(), rt.user(), rt.datasourceQuery)
pages.POST("/ds-query", rt.auth(), rt.QueryData)
pages.POST("/logs-query", rt.auth(), rt.QueryLogV2)
pages.POST("/ds-query", rt.auth(), rt.user(), rt.QueryData)
pages.POST("/logs-query", rt.auth(), rt.user(), rt.QueryLogV2)
pages.POST("/tdengine-databases", rt.auth(), rt.tdengineDatabases)
pages.POST("/tdengine-tables", rt.auth(), rt.tdengineTables)

View File

@@ -1,18 +1,23 @@
package router
import (
"context"
"fmt"
"sort"
"sync"
"github.com/ccfos/nightingale/v6/dscache"
"github.com/ccfos/nightingale/v6/dskit/doris"
"github.com/ccfos/nightingale/v6/models"
"github.com/gin-gonic/gin"
"github.com/toolkits/pkg/ginx"
"github.com/toolkits/pkg/logger"
)
func CheckDsPerm(c *gin.Context, dsId int64, cate string, q interface{}) bool {
type CheckDsPermFunc func(c *gin.Context, dsId int64, cate string, q interface{}) bool
var CheckDsPerm CheckDsPermFunc = func(c *gin.Context, dsId int64, cate string, q interface{}) bool {
// todo: 后续需要根据 cate 判断是否需要权限
return true
}
@@ -107,10 +112,13 @@ func (rt *Router) QueryLogBatch(c *gin.Context) {
}
func QueryDataConcurrently(anonymousAccess bool, ctx *gin.Context, f models.QueryParam) ([]models.DataResp, error) {
var resp []models.DataResp
var mu sync.Mutex
var wg sync.WaitGroup
var errs []error
var (
resp []models.DataResp
mu sync.Mutex
wg sync.WaitGroup
errs []error
rCtx = ctx.Request.Context()
)
for _, q := range f.Queries {
if !anonymousAccess && !CheckDsPerm(ctx, f.DatasourceId, f.Cate, q) {
@@ -122,12 +130,17 @@ func QueryDataConcurrently(anonymousAccess bool, ctx *gin.Context, f models.Quer
logger.Warningf("cluster:%d not exists", f.DatasourceId)
return nil, fmt.Errorf("cluster not exists")
}
vCtx := rCtx
if f.Cate == models.DORIS {
vCtx = context.WithValue(vCtx, doris.NoNeedCheckMaxRow, true)
}
wg.Add(1)
go func(query interface{}) {
defer wg.Done()
data, err := plug.QueryData(ctx.Request.Context(), query)
data, err := plug.QueryData(vCtx, query)
if err != nil {
logger.Warningf("query data error: req:%+v err:%v", query, err)
mu.Lock()

View File

@@ -181,7 +181,7 @@ func (d *Doris) QueryData(ctx context.Context, query interface{}) ([]models.Data
}
}
items, err := d.QueryTimeseries(context.TODO(), &doris.QueryParam{
items, err := d.QueryTimeseries(ctx, &doris.QueryParam{
Database: dorisQueryParam.Database,
Sql: dorisQueryParam.SQL,
Keys: types.Keys{

View File

@@ -57,3 +57,29 @@ func (cs *Cache) Get(cate string, dsId int64) (datasource.Datasource, bool) {
return cs.datas[cate][dsId], true
}
func (cs *Cache) Delete(cate string, dsId int64) {
cs.mutex.Lock()
defer cs.mutex.Unlock()
if _, found := cs.datas[cate]; !found {
return
}
delete(cs.datas[cate], dsId)
logger.Debugf("delete plugin:%s %d from cache", cate, dsId)
}
// GetAllIds 返回缓存中所有数据源的 ID按类型分组
func (cs *Cache) GetAllIds() map[string][]int64 {
cs.mutex.RLock()
defer cs.mutex.RUnlock()
result := make(map[string][]int64)
for cate, dsMap := range cs.datas {
ids := make([]int64, 0, len(dsMap))
for dsId := range dsMap {
ids = append(ids, dsId)
}
result[cate] = ids
}
return result
}

View File

@@ -173,7 +173,10 @@ func esN9eToDatasourceInfo(ds *datasource.DatasourceInfo, item models.Datasource
}
func PutDatasources(items []datasource.DatasourceInfo) {
// 记录当前有效的数据源 ID按类型分组
validIds := make(map[string]map[int64]struct{})
ids := make([]int64, 0)
for _, item := range items {
if item.Type == "prometheus" {
continue
@@ -202,6 +205,12 @@ func PutDatasources(items []datasource.DatasourceInfo) {
}
ids = append(ids, item.Id)
// 记录有效的数据源 ID
if _, ok := validIds[typ]; !ok {
validIds[typ] = make(map[int64]struct{})
}
validIds[typ][item.Id] = struct{}{}
// 异步初始化 client 不然数据源同步的会很慢
go func() {
defer func() {
@@ -213,5 +222,19 @@ func PutDatasources(items []datasource.DatasourceInfo) {
}()
}
// 删除 items 中不存在但 DsCache 中存在的数据源
cachedIds := DsCache.GetAllIds()
for cate, dsIds := range cachedIds {
for _, dsId := range dsIds {
if _, ok := validIds[cate]; !ok {
// 该类型在 items 中完全不存在,删除缓存中的所有该类型数据源
DsCache.Delete(cate, dsId)
} else if _, ok := validIds[cate][dsId]; !ok {
// 该数据源 ID 在 items 中不存在,删除
DsCache.Delete(cate, dsId)
}
}
}
logger.Debugf("get plugin by type success Ids:%v", ids)
}

View File

@@ -32,7 +32,7 @@ type Doris struct {
FeAddr string `json:"doris.fe_addr" mapstructure:"doris.fe_addr"` // fe http endpoint
User string `json:"doris.user" mapstructure:"doris.user"` //
Password string `json:"doris.password" mapstructure:"doris.password"` //
Timeout int `json:"doris.timeout" mapstructure:"doris.timeout"`
Timeout int `json:"doris.timeout" mapstructure:"doris.timeout"` // ms
MaxIdleConns int `json:"doris.max_idle_conns" mapstructure:"doris.max_idle_conns"`
MaxOpenConns int `json:"doris.max_open_conns" mapstructure:"doris.max_open_conns"`
ConnMaxLifetime int `json:"doris.conn_max_lifetime" mapstructure:"doris.conn_max_lifetime"`
@@ -127,7 +127,7 @@ func (d *Doris) createTimeoutContext(ctx context.Context) (context.Context, cont
if timeout == 0 {
timeout = 60
}
return context.WithTimeout(ctx, time.Duration(timeout)*time.Second)
return context.WithTimeout(ctx, time.Duration(timeout)*time.Millisecond)
}
// ShowDatabases lists all databases in Doris

View File

@@ -10,13 +10,14 @@ const (
TimeseriesAggregationTimestamp = "__ts__"
)
// QueryLogs 查询日志
// TODO: 待测试, MAP/ARRAY/STRUCT/JSON 等类型能否处理
func (d *Doris) QueryLogs(ctx context.Context, query *QueryParam) ([]map[string]interface{}, error) {
// 等同于 Query()
return d.Query(ctx, query)
return d.Query(ctx, query, true)
}
// 本质是查询时序数据, 取第一组, SQL由上层封装, 不再做复杂的解析和截断
// QueryHistogram 本质是查询时序数据, 取第一组, SQL由上层封装, 不再做复杂的解析和截断
func (d *Doris) QueryHistogram(ctx context.Context, query *QueryParam) ([][]float64, error) {
values, err := d.QueryTimeseries(ctx, query)
if err != nil {

View File

@@ -15,6 +15,10 @@ const (
TimeFieldFormatDateTime = "datetime"
)
type noNeedCheckMaxRowKey struct{}
var NoNeedCheckMaxRow = noNeedCheckMaxRowKey{}
// 不再拼接SQL, 完全信赖用户的输入
type QueryParam struct {
Database string `json:"database"`
@@ -39,7 +43,7 @@ var (
)
// Query executes a given SQL query in Doris and returns the results with MaxQueryRows check
func (d *Doris) Query(ctx context.Context, query *QueryParam) ([]map[string]interface{}, error) {
func (d *Doris) Query(ctx context.Context, query *QueryParam, checkMaxRow bool) ([]map[string]interface{}, error) {
// 校验SQL的合法性, 过滤掉 write请求
sqlItem := strings.Split(strings.ToUpper(query.Sql), " ")
for _, item := range sqlItem {
@@ -48,10 +52,12 @@ func (d *Doris) Query(ctx context.Context, query *QueryParam) ([]map[string]inte
}
}
// 检查查询结果行数
err := d.CheckMaxQueryRows(ctx, query.Database, query.Sql)
if err != nil {
return nil, err
if checkMaxRow {
// 检查查询结果行数
err := d.CheckMaxQueryRows(ctx, query.Database, query.Sql)
if err != nil {
return nil, err
}
}
rows, err := d.ExecQuery(ctx, query.Database, query.Sql)
@@ -63,8 +69,12 @@ func (d *Doris) Query(ctx context.Context, query *QueryParam) ([]map[string]inte
// QueryTimeseries executes a time series data query using the given parameters with MaxQueryRows check
func (d *Doris) QueryTimeseries(ctx context.Context, query *QueryParam) ([]types.MetricValues, error) {
// 使用 Query 方法执行查询Query方法内部已包含MaxQueryRows检查
rows, err := d.Query(ctx, query)
// 默认需要检查,除非调用方声明不需要检查
checkMaxRow := true
if noCheck, ok := ctx.Value(NoNeedCheckMaxRow).(bool); ok && noCheck {
checkMaxRow = false
}
rows, err := d.Query(ctx, query, checkMaxRow)
if err != nil {
return nil, err
}