Compare commits

..

1 Commits

Author SHA1 Message Date
ning
288878a0f9 update oauth2 2026-02-04 17:20:26 +08:00
10 changed files with 86 additions and 166 deletions

View File

@@ -571,19 +571,12 @@ func (rt *Router) loginCallbackFeiShu(c *gin.Context) {
} else {
user = new(models.User)
defaultRoles := []string{}
defaultUserGroups := []int64{}
if rt.Sso.FeiShu != nil && rt.Sso.FeiShu.FeiShuConfig != nil {
defaultRoles = rt.Sso.FeiShu.FeiShuConfig.DefaultRoles
defaultUserGroups = rt.Sso.FeiShu.FeiShuConfig.DefaultUserGroups
}
user.FullSsoFields(feishu.SsoTypeName, ret.Username, ret.Nickname, ret.Phone, ret.Email, defaultRoles)
// create user from feishu
ginx.Dangerous(user.Add(rt.Ctx))
if len(defaultUserGroups) > 0 {
ginx.Dangerous(user.UpdateUserGroup(rt.Ctx, defaultUserGroups))
}
}
// set user login state

View File

@@ -1,16 +1,13 @@
package router
import (
"context"
"fmt"
"sort"
"sync"
"github.com/ccfos/nightingale/v6/alert/eval"
"github.com/ccfos/nightingale/v6/dscache"
"github.com/ccfos/nightingale/v6/dskit/doris"
"github.com/ccfos/nightingale/v6/models"
"github.com/gin-gonic/gin"
"github.com/toolkits/pkg/ginx"
"github.com/toolkits/pkg/logger"
@@ -120,13 +117,10 @@ func (rt *Router) QueryLogBatch(c *gin.Context) {
}
func QueryDataConcurrently(anonymousAccess bool, ctx *gin.Context, f models.QueryParam) ([]models.DataResp, error) {
var (
resp []models.DataResp
mu sync.Mutex
wg sync.WaitGroup
errs []error
rCtx = ctx.Request.Context()
)
var resp []models.DataResp
var mu sync.Mutex
var wg sync.WaitGroup
var errs []error
for _, q := range f.Queries {
if !anonymousAccess && !CheckDsPerm(ctx, f.DatasourceId, f.Cate, q) {
@@ -138,17 +132,12 @@ func QueryDataConcurrently(anonymousAccess bool, ctx *gin.Context, f models.Quer
logger.Warningf("cluster:%d not exists", f.DatasourceId)
return nil, fmt.Errorf("cluster not exists")
}
vCtx := rCtx
if f.Cate == models.DORIS {
vCtx = context.WithValue(vCtx, doris.NoNeedCheckMaxRow, true)
}
wg.Add(1)
go func(query interface{}) {
defer wg.Done()
data, err := plug.QueryData(vCtx, query)
data, err := plug.QueryData(ctx.Request.Context(), query)
if err != nil {
logger.Warningf("query data error: req:%+v err:%v", query, err)
mu.Lock()

View File

@@ -79,19 +79,52 @@ func (d *Doris) Equal(p datasource.Datasource) bool {
return false
}
return d.Addr == newest.Addr &&
d.FeAddr == newest.FeAddr &&
d.User == newest.User &&
d.Password == newest.Password &&
d.EnableWrite == newest.EnableWrite &&
d.UserWrite == newest.UserWrite &&
d.PasswordWrite == newest.PasswordWrite &&
d.MaxQueryRows == newest.MaxQueryRows &&
d.Timeout == newest.Timeout &&
d.MaxIdleConns == newest.MaxIdleConns &&
d.MaxOpenConns == newest.MaxOpenConns &&
d.ConnMaxLifetime == newest.ConnMaxLifetime &&
d.ClusterName == newest.ClusterName
// only compare first shard
if d.Addr != newest.Addr {
return false
}
if d.User != newest.User {
return false
}
if d.Password != newest.Password {
return false
}
if d.EnableWrite != newest.EnableWrite {
return false
}
if d.FeAddr != newest.FeAddr {
return false
}
if d.MaxQueryRows != newest.MaxQueryRows {
return false
}
if d.Timeout != newest.Timeout {
return false
}
if d.MaxIdleConns != newest.MaxIdleConns {
return false
}
if d.MaxOpenConns != newest.MaxOpenConns {
return false
}
if d.ConnMaxLifetime != newest.ConnMaxLifetime {
return false
}
if d.ClusterName != newest.ClusterName {
return false
}
return true
}
func (d *Doris) MakeLogQuery(ctx context.Context, query interface{}, eventTags []string, start, end int64) (interface{}, error) {
@@ -148,7 +181,7 @@ func (d *Doris) QueryData(ctx context.Context, query interface{}) ([]models.Data
}
}
items, err := d.QueryTimeseries(ctx, &doris.QueryParam{
items, err := d.QueryTimeseries(context.TODO(), &doris.QueryParam{
Database: dorisQueryParam.Database,
Sql: dorisQueryParam.SQL,
Keys: types.Keys{

View File

@@ -39,9 +39,6 @@ type Doris struct {
MaxQueryRows int `json:"doris.max_query_rows" mapstructure:"doris.max_query_rows"`
ClusterName string `json:"doris.cluster_name" mapstructure:"doris.cluster_name"`
EnableWrite bool `json:"doris.enable_write" mapstructure:"doris.enable_write"`
// 写用户,用来区分读写用户,减少数据源
UserWrite string `json:"doris.user_write" mapstructure:"doris.user_write"`
PasswordWrite string `json:"doris.password_write" mapstructure:"doris.password_write"`
}
// NewDorisWithSettings initializes a new Doris instance with the given settings
@@ -91,13 +88,13 @@ func (d *Doris) NewConn(ctx context.Context, database string) (*sql.DB, error) {
var keys []string
keys = append(keys, d.Addr)
keys = append(keys, d.User, d.Password)
keys = append(keys, d.Password, d.User)
if len(database) > 0 {
keys = append(keys, database)
}
cachedKey := strings.Join(keys, ":")
cachedkey := strings.Join(keys, ":")
// cache conn with database
conn, ok := pool.PoolClient.Load(cachedKey)
conn, ok := pool.PoolClient.Load(cachedkey)
if ok {
return conn.(*sql.DB), nil
}
@@ -105,7 +102,7 @@ func (d *Doris) NewConn(ctx context.Context, database string) (*sql.DB, error) {
var err error
defer func() {
if db != nil && err == nil {
pool.PoolClient.Store(cachedKey, db)
pool.PoolClient.Store(cachedkey, db)
}
}()
@@ -124,79 +121,6 @@ func (d *Doris) NewConn(ctx context.Context, database string) (*sql.DB, error) {
return db, nil
}
// NewWriteConn establishes a new connection to Doris for write operations
// When EnableWrite is true and UserWrite is configured, it uses the write user credentials
// Otherwise, it reuses the read connection from NewConn
func (d *Doris) NewWriteConn(ctx context.Context, database string) (*sql.DB, error) {
// If write user is not configured, reuse the read connection
if !d.EnableWrite || len(d.UserWrite) == 0 {
return d.NewConn(ctx, database)
}
if len(d.Addr) == 0 {
return nil, errors.New("empty fe-node addr")
}
// Set default values similar to postgres implementation
if d.Timeout == 0 {
d.Timeout = 60000
}
if d.MaxIdleConns == 0 {
d.MaxIdleConns = 10
}
if d.MaxOpenConns == 0 {
d.MaxOpenConns = 100
}
if d.ConnMaxLifetime == 0 {
d.ConnMaxLifetime = 14400
}
if d.MaxQueryRows == 0 {
d.MaxQueryRows = 500
}
// Use write user credentials
user := d.UserWrite
password := d.PasswordWrite
var keys []string
keys = append(keys, d.Addr)
keys = append(keys, user, password)
if len(database) > 0 {
keys = append(keys, database)
}
cachedKey := strings.Join(keys, ":")
// cache conn with database
conn, ok := pool.PoolClient.Load(cachedKey)
if ok {
return conn.(*sql.DB), nil
}
var db *sql.DB
var err error
defer func() {
if db != nil && err == nil {
pool.PoolClient.Store(cachedKey, db)
}
}()
// Simplified connection logic for Doris using MySQL driver
dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8", user, password, d.Addr, database)
db, err = sql.Open("mysql", dsn)
if err != nil {
return nil, err
}
// Set connection pool configuration for write connections
// Use more conservative values since write operations are typically less frequent
writeMaxIdleConns := max(d.MaxIdleConns/5, 2)
writeMaxOpenConns := max(d.MaxOpenConns/10, 5)
db.SetMaxIdleConns(writeMaxIdleConns)
db.SetMaxOpenConns(writeMaxOpenConns)
db.SetConnMaxLifetime(time.Duration(d.ConnMaxLifetime) * time.Second)
return db, nil
}
// createTimeoutContext creates a context with timeout based on Doris configuration
func (d *Doris) createTimeoutContext(ctx context.Context) (context.Context, context.CancelFunc) {
timeout := d.Timeout
@@ -548,7 +472,7 @@ func (d *Doris) ExecContext(ctx context.Context, database string, sql string) er
timeoutCtx, cancel := d.createTimeoutContext(ctx)
defer cancel()
db, err := d.NewWriteConn(timeoutCtx, database)
db, err := d.NewConn(timeoutCtx, database)
if err != nil {
return err
}

View File

@@ -10,14 +10,13 @@ const (
TimeseriesAggregationTimestamp = "__ts__"
)
// QueryLogs 查询日志
// TODO: 待测试, MAP/ARRAY/STRUCT/JSON 等类型能否处理
func (d *Doris) QueryLogs(ctx context.Context, query *QueryParam) ([]map[string]interface{}, error) {
// 等同于 Query()
return d.Query(ctx, query, true)
return d.Query(ctx, query)
}
// QueryHistogram 本质是查询时序数据, 取第一组, SQL由上层封装, 不再做复杂的解析和截断
// 本质是查询时序数据, 取第一组, SQL由上层封装, 不再做复杂的解析和截断
func (d *Doris) QueryHistogram(ctx context.Context, query *QueryParam) ([][]float64, error) {
values, err := d.QueryTimeseries(ctx, query)
if err != nil {

View File

@@ -15,10 +15,6 @@ const (
TimeFieldFormatDateTime = "datetime"
)
type noNeedCheckMaxRowKey struct{}
var NoNeedCheckMaxRow = noNeedCheckMaxRowKey{}
// 不再拼接SQL, 完全信赖用户的输入
type QueryParam struct {
Database string `json:"database"`
@@ -43,7 +39,7 @@ var (
)
// Query executes a given SQL query in Doris and returns the results with MaxQueryRows check
func (d *Doris) Query(ctx context.Context, query *QueryParam, checkMaxRow bool) ([]map[string]interface{}, error) {
func (d *Doris) Query(ctx context.Context, query *QueryParam) ([]map[string]interface{}, error) {
// 校验SQL的合法性, 过滤掉 write请求
sqlItem := strings.Split(strings.ToUpper(query.Sql), " ")
for _, item := range sqlItem {
@@ -52,12 +48,10 @@ func (d *Doris) Query(ctx context.Context, query *QueryParam, checkMaxRow bool)
}
}
if checkMaxRow {
// 检查查询结果行数
err := d.CheckMaxQueryRows(ctx, query.Database, query.Sql)
if err != nil {
return nil, err
}
// 检查查询结果行数
err := d.CheckMaxQueryRows(ctx, query.Database, query.Sql)
if err != nil {
return nil, err
}
rows, err := d.ExecQuery(ctx, query.Database, query.Sql)
@@ -69,12 +63,8 @@ func (d *Doris) Query(ctx context.Context, query *QueryParam, checkMaxRow bool)
// QueryTimeseries executes a time series data query using the given parameters with MaxQueryRows check
func (d *Doris) QueryTimeseries(ctx context.Context, query *QueryParam) ([]types.MetricValues, error) {
// 默认需要检查,除非调用方声明不需要检查
checkMaxRow := true
if noCheck, ok := ctx.Value(NoNeedCheckMaxRow).(bool); ok && noCheck {
checkMaxRow = false
}
rows, err := d.Query(ctx, query, checkMaxRow)
// 使用 Query 方法执行查询Query方法内部已包含MaxQueryRows检查
rows, err := d.Query(ctx, query)
if err != nil {
return nil, err
}

View File

@@ -6,7 +6,7 @@ import (
"fmt"
"github.com/ccfos/nightingale/v6/pkg/ctx"
"github.com/ccfos/nightingale/v6/pkg/poster"
"github.com/ccfos/nightingale/v6/pkg/poster"
"gorm.io/gorm"
)

View File

@@ -315,18 +315,6 @@ func (u *User) UpdatePassword(ctx *ctx.Context, password, updateBy string) error
}).Error
}
func (u *User) UpdateUserGroup(ctx *ctx.Context, userGroupIds []int64) error {
count := len(userGroupIds)
for i := 0; i < count; i++ {
err := UserGroupMemberAdd(ctx, userGroupIds[i], u.Id)
if err != nil {
return err
}
}
return nil
}
func UpdateUserLastActiveTime(ctx *ctx.Context, userId int64, lastActiveTime int64) error {
return DB(ctx).Model(&User{}).Where("id = ?", userId).Updates(map[string]interface{}{
"last_active_time": lastActiveTime,

View File

@@ -32,18 +32,17 @@ type SsoClient struct {
}
type Config struct {
Enable bool `json:"enable"`
AuthURL string `json:"auth_url"`
DisplayName string `json:"display_name"`
AppID string `json:"app_id"`
AppSecret string `json:"app_secret"`
RedirectURL string `json:"redirect_url"`
UsernameField string `json:"username_field"` // name, email, phone
FeiShuEndpoint string `json:"feishu_endpoint"` // 飞书API端点默认为 open.feishu.cn
Proxy string `json:"proxy"`
CoverAttributes bool `json:"cover_attributes"`
DefaultRoles []string `json:"default_roles"`
DefaultUserGroups []int64 `json:"default_user_groups"`
Enable bool `json:"enable"`
AuthURL string `json:"auth_url"`
DisplayName string `json:"display_name"`
AppID string `json:"app_id"`
AppSecret string `json:"app_secret"`
RedirectURL string `json:"redirect_url"`
UsernameField string `json:"username_field"` // name, email, phone
FeiShuEndpoint string `json:"feishu_endpoint"` // 飞书API端点默认为 open.feishu.cn
Proxy string `json:"proxy"`
CoverAttributes bool `json:"cover_attributes"`
DefaultRoles []string `json:"default_roles"`
}
type CallbackOutput struct {
@@ -313,8 +312,6 @@ func (s *SsoClient) Callback(redis storage.Redis, ctx context.Context, code, sta
// 根据UsernameField配置确定username
switch s.FeiShuConfig.UsernameField {
case "userid":
callbackOutput.Username = username
case "name":
if nickname == "" {
return nil, errors.New("feishu user name is empty")

View File

@@ -240,6 +240,13 @@ func (s *SsoClient) getUserInfo(ClientId, UserInfoAddr, accessToken string, Tran
}
r.Header.Add("Authorization", "Bearer "+accessToken)
req = r
} else if TranTokenMethod == "access_token_header" {
r, err := http.NewRequest("GET", UserInfoAddr, nil)
if err != nil {
return nil, err
}
r.Header.Add("access_token", accessToken)
req = r
} else {
r, err := http.NewRequest("GET", UserInfoAddr, nil)
if err != nil {