mirror of
https://github.com/ccfos/nightingale.git
synced 2026-03-03 14:38:55 +00:00
Compare commits
5 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
46083d741d | ||
|
|
3eeb705b39 | ||
|
|
8d87e69ee7 | ||
|
|
3da85d8e28 | ||
|
|
b50410b88a |
@@ -211,8 +211,8 @@ func (rt *Router) Config(r *gin.Engine) {
|
||||
pages.GET("/datasource/brief", rt.auth(), rt.user(), rt.datasourceBriefs)
|
||||
pages.POST("/datasource/query", rt.auth(), rt.user(), rt.datasourceQuery)
|
||||
|
||||
pages.POST("/ds-query", rt.auth(), rt.QueryData)
|
||||
pages.POST("/logs-query", rt.auth(), rt.QueryLogV2)
|
||||
pages.POST("/ds-query", rt.auth(), rt.user(), rt.QueryData)
|
||||
pages.POST("/logs-query", rt.auth(), rt.user(), rt.QueryLogV2)
|
||||
|
||||
pages.POST("/tdengine-databases", rt.auth(), rt.tdengineDatabases)
|
||||
pages.POST("/tdengine-tables", rt.auth(), rt.tdengineTables)
|
||||
|
||||
@@ -1,18 +1,23 @@
|
||||
package router
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sort"
|
||||
"sync"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/dscache"
|
||||
"github.com/ccfos/nightingale/v6/dskit/doris"
|
||||
"github.com/ccfos/nightingale/v6/models"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/toolkits/pkg/ginx"
|
||||
"github.com/toolkits/pkg/logger"
|
||||
)
|
||||
|
||||
func CheckDsPerm(c *gin.Context, dsId int64, cate string, q interface{}) bool {
|
||||
type CheckDsPermFunc func(c *gin.Context, dsId int64, cate string, q interface{}) bool
|
||||
|
||||
var CheckDsPerm CheckDsPermFunc = func(c *gin.Context, dsId int64, cate string, q interface{}) bool {
|
||||
// todo: 后续需要根据 cate 判断是否需要权限
|
||||
return true
|
||||
}
|
||||
@@ -107,10 +112,13 @@ func (rt *Router) QueryLogBatch(c *gin.Context) {
|
||||
}
|
||||
|
||||
func QueryDataConcurrently(anonymousAccess bool, ctx *gin.Context, f models.QueryParam) ([]models.DataResp, error) {
|
||||
var resp []models.DataResp
|
||||
var mu sync.Mutex
|
||||
var wg sync.WaitGroup
|
||||
var errs []error
|
||||
var (
|
||||
resp []models.DataResp
|
||||
mu sync.Mutex
|
||||
wg sync.WaitGroup
|
||||
errs []error
|
||||
rCtx = ctx.Request.Context()
|
||||
)
|
||||
|
||||
for _, q := range f.Queries {
|
||||
if !anonymousAccess && !CheckDsPerm(ctx, f.DatasourceId, f.Cate, q) {
|
||||
@@ -122,12 +130,17 @@ func QueryDataConcurrently(anonymousAccess bool, ctx *gin.Context, f models.Quer
|
||||
logger.Warningf("cluster:%d not exists", f.DatasourceId)
|
||||
return nil, fmt.Errorf("cluster not exists")
|
||||
}
|
||||
|
||||
vCtx := rCtx
|
||||
if f.Cate == models.DORIS {
|
||||
vCtx = context.WithValue(vCtx, doris.NoNeedCheckMaxRow, true)
|
||||
}
|
||||
|
||||
wg.Add(1)
|
||||
go func(query interface{}) {
|
||||
defer wg.Done()
|
||||
|
||||
data, err := plug.QueryData(ctx.Request.Context(), query)
|
||||
data, err := plug.QueryData(vCtx, query)
|
||||
if err != nil {
|
||||
logger.Warningf("query data error: req:%+v err:%v", query, err)
|
||||
mu.Lock()
|
||||
|
||||
@@ -181,7 +181,7 @@ func (d *Doris) QueryData(ctx context.Context, query interface{}) ([]models.Data
|
||||
}
|
||||
}
|
||||
|
||||
items, err := d.QueryTimeseries(context.TODO(), &doris.QueryParam{
|
||||
items, err := d.QueryTimeseries(ctx, &doris.QueryParam{
|
||||
Database: dorisQueryParam.Database,
|
||||
Sql: dorisQueryParam.SQL,
|
||||
Keys: types.Keys{
|
||||
|
||||
@@ -57,3 +57,29 @@ func (cs *Cache) Get(cate string, dsId int64) (datasource.Datasource, bool) {
|
||||
|
||||
return cs.datas[cate][dsId], true
|
||||
}
|
||||
|
||||
func (cs *Cache) Delete(cate string, dsId int64) {
|
||||
cs.mutex.Lock()
|
||||
defer cs.mutex.Unlock()
|
||||
if _, found := cs.datas[cate]; !found {
|
||||
return
|
||||
}
|
||||
delete(cs.datas[cate], dsId)
|
||||
|
||||
logger.Debugf("delete plugin:%s %d from cache", cate, dsId)
|
||||
}
|
||||
|
||||
// GetAllIds 返回缓存中所有数据源的 ID,按类型分组
|
||||
func (cs *Cache) GetAllIds() map[string][]int64 {
|
||||
cs.mutex.RLock()
|
||||
defer cs.mutex.RUnlock()
|
||||
result := make(map[string][]int64)
|
||||
for cate, dsMap := range cs.datas {
|
||||
ids := make([]int64, 0, len(dsMap))
|
||||
for dsId := range dsMap {
|
||||
ids = append(ids, dsId)
|
||||
}
|
||||
result[cate] = ids
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
@@ -173,7 +173,10 @@ func esN9eToDatasourceInfo(ds *datasource.DatasourceInfo, item models.Datasource
|
||||
}
|
||||
|
||||
func PutDatasources(items []datasource.DatasourceInfo) {
|
||||
// 记录当前有效的数据源 ID,按类型分组
|
||||
validIds := make(map[string]map[int64]struct{})
|
||||
ids := make([]int64, 0)
|
||||
|
||||
for _, item := range items {
|
||||
if item.Type == "prometheus" {
|
||||
continue
|
||||
@@ -202,6 +205,12 @@ func PutDatasources(items []datasource.DatasourceInfo) {
|
||||
}
|
||||
ids = append(ids, item.Id)
|
||||
|
||||
// 记录有效的数据源 ID
|
||||
if _, ok := validIds[typ]; !ok {
|
||||
validIds[typ] = make(map[int64]struct{})
|
||||
}
|
||||
validIds[typ][item.Id] = struct{}{}
|
||||
|
||||
// 异步初始化 client 不然数据源同步的会很慢
|
||||
go func() {
|
||||
defer func() {
|
||||
@@ -213,5 +222,19 @@ func PutDatasources(items []datasource.DatasourceInfo) {
|
||||
}()
|
||||
}
|
||||
|
||||
// 删除 items 中不存在但 DsCache 中存在的数据源
|
||||
cachedIds := DsCache.GetAllIds()
|
||||
for cate, dsIds := range cachedIds {
|
||||
for _, dsId := range dsIds {
|
||||
if _, ok := validIds[cate]; !ok {
|
||||
// 该类型在 items 中完全不存在,删除缓存中的所有该类型数据源
|
||||
DsCache.Delete(cate, dsId)
|
||||
} else if _, ok := validIds[cate][dsId]; !ok {
|
||||
// 该数据源 ID 在 items 中不存在,删除
|
||||
DsCache.Delete(cate, dsId)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
logger.Debugf("get plugin by type success Ids:%v", ids)
|
||||
}
|
||||
|
||||
@@ -32,7 +32,7 @@ type Doris struct {
|
||||
FeAddr string `json:"doris.fe_addr" mapstructure:"doris.fe_addr"` // fe http endpoint
|
||||
User string `json:"doris.user" mapstructure:"doris.user"` //
|
||||
Password string `json:"doris.password" mapstructure:"doris.password"` //
|
||||
Timeout int `json:"doris.timeout" mapstructure:"doris.timeout"`
|
||||
Timeout int `json:"doris.timeout" mapstructure:"doris.timeout"` // ms
|
||||
MaxIdleConns int `json:"doris.max_idle_conns" mapstructure:"doris.max_idle_conns"`
|
||||
MaxOpenConns int `json:"doris.max_open_conns" mapstructure:"doris.max_open_conns"`
|
||||
ConnMaxLifetime int `json:"doris.conn_max_lifetime" mapstructure:"doris.conn_max_lifetime"`
|
||||
@@ -127,7 +127,7 @@ func (d *Doris) createTimeoutContext(ctx context.Context) (context.Context, cont
|
||||
if timeout == 0 {
|
||||
timeout = 60
|
||||
}
|
||||
return context.WithTimeout(ctx, time.Duration(timeout)*time.Second)
|
||||
return context.WithTimeout(ctx, time.Duration(timeout)*time.Millisecond)
|
||||
}
|
||||
|
||||
// ShowDatabases lists all databases in Doris
|
||||
|
||||
@@ -10,13 +10,14 @@ const (
|
||||
TimeseriesAggregationTimestamp = "__ts__"
|
||||
)
|
||||
|
||||
// QueryLogs 查询日志
|
||||
// TODO: 待测试, MAP/ARRAY/STRUCT/JSON 等类型能否处理
|
||||
func (d *Doris) QueryLogs(ctx context.Context, query *QueryParam) ([]map[string]interface{}, error) {
|
||||
// 等同于 Query()
|
||||
return d.Query(ctx, query)
|
||||
return d.Query(ctx, query, true)
|
||||
}
|
||||
|
||||
// 本质是查询时序数据, 取第一组, SQL由上层封装, 不再做复杂的解析和截断
|
||||
// QueryHistogram 本质是查询时序数据, 取第一组, SQL由上层封装, 不再做复杂的解析和截断
|
||||
func (d *Doris) QueryHistogram(ctx context.Context, query *QueryParam) ([][]float64, error) {
|
||||
values, err := d.QueryTimeseries(ctx, query)
|
||||
if err != nil {
|
||||
|
||||
@@ -15,6 +15,10 @@ const (
|
||||
TimeFieldFormatDateTime = "datetime"
|
||||
)
|
||||
|
||||
type noNeedCheckMaxRowKey struct{}
|
||||
|
||||
var NoNeedCheckMaxRow = noNeedCheckMaxRowKey{}
|
||||
|
||||
// 不再拼接SQL, 完全信赖用户的输入
|
||||
type QueryParam struct {
|
||||
Database string `json:"database"`
|
||||
@@ -39,7 +43,7 @@ var (
|
||||
)
|
||||
|
||||
// Query executes a given SQL query in Doris and returns the results with MaxQueryRows check
|
||||
func (d *Doris) Query(ctx context.Context, query *QueryParam) ([]map[string]interface{}, error) {
|
||||
func (d *Doris) Query(ctx context.Context, query *QueryParam, checkMaxRow bool) ([]map[string]interface{}, error) {
|
||||
// 校验SQL的合法性, 过滤掉 write请求
|
||||
sqlItem := strings.Split(strings.ToUpper(query.Sql), " ")
|
||||
for _, item := range sqlItem {
|
||||
@@ -48,10 +52,12 @@ func (d *Doris) Query(ctx context.Context, query *QueryParam) ([]map[string]inte
|
||||
}
|
||||
}
|
||||
|
||||
// 检查查询结果行数
|
||||
err := d.CheckMaxQueryRows(ctx, query.Database, query.Sql)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
if checkMaxRow {
|
||||
// 检查查询结果行数
|
||||
err := d.CheckMaxQueryRows(ctx, query.Database, query.Sql)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
rows, err := d.ExecQuery(ctx, query.Database, query.Sql)
|
||||
@@ -63,8 +69,12 @@ func (d *Doris) Query(ctx context.Context, query *QueryParam) ([]map[string]inte
|
||||
|
||||
// QueryTimeseries executes a time series data query using the given parameters with MaxQueryRows check
|
||||
func (d *Doris) QueryTimeseries(ctx context.Context, query *QueryParam) ([]types.MetricValues, error) {
|
||||
// 使用 Query 方法执行查询,Query方法内部已包含MaxQueryRows检查
|
||||
rows, err := d.Query(ctx, query)
|
||||
// 默认需要检查,除非调用方声明不需要检查
|
||||
checkMaxRow := true
|
||||
if noCheck, ok := ctx.Value(NoNeedCheckMaxRow).(bool); ok && noCheck {
|
||||
checkMaxRow = false
|
||||
}
|
||||
rows, err := d.Query(ctx, query, checkMaxRow)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user