Compare commits

...

1 Commits

Author SHA1 Message Date
ning
18c9f93986 code refactor 2025-06-12 10:55:15 +08:00

View File

@@ -14,6 +14,7 @@ import (
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/mitchellh/mapstructure"
"github.com/toolkits/pkg/logger"
"github.com/toolkits/pkg/net/httplib"
ckDriver "gorm.io/driver/clickhouse"
"gorm.io/gorm"
@@ -26,11 +27,15 @@ const (
)
type Clickhouse struct {
Nodes []string `json:"ck.nodes" mapstructure:"ck.nodes"`
User string `json:"ck.user" mapstructure:"ck.user"`
Password string `json:"ck.password" mapstructure:"ck.password"`
Timeout int `json:"ck.timeout" mapstructure:"ck.timeout"`
MaxQueryRows int `json:"ck.max_query_rows" mapstructure:"ck.max_query_rows"`
Nodes []string `json:"ck.nodes" mapstructure:"ck.nodes"`
User string `json:"ck.user" mapstructure:"ck.user"`
Password string `json:"ck.password" mapstructure:"ck.password"`
Protocol string `json:"ck.protocol" mapstructure:"ck.protocol"`
Timeout int `json:"ck.timeout" mapstructure:"ck.timeout"`
MaxQueryRows int `json:"ck.max_query_rows" mapstructure:"ck.max_query_rows"`
MaxIdleConns int `json:"ck.max_idle_conns" mapstructure:"ck.max_idle_conns"`
MaxOpenConns int `json:"ck.max_open_conns" mapstructure:"ck.max_open_conns"`
ConnMaxLifetime int64 `json:"ck.conn_max_lifetime" mapstructure:"ck.conn_max_lifetime"`
Client *gorm.DB `json:"-"`
ClientByHTTP *sql.DB `json:"-"`
@@ -45,57 +50,66 @@ func (c *Clickhouse) InitCli() error {
return fmt.Errorf("not found ck shard, please check datasource config")
}
addr := c.Nodes[0]
url := addr
if !strings.HasPrefix(url, "http://") {
url = "http://" + url
}
resp, err := httplib.Get(url).SetTimeout(time.Second * 1).Response()
// 忽略HTTP Code错误, 因为可能不是HTTP协议
if err != nil {
return err
}
defer resp.Body.Close()
// HTTP 协议
if resp.StatusCode == 200 {
jsonBytes, _ := io.ReadAll(resp.Body)
if len(jsonBytes) > 0 && strings.Contains(strings.ToLower(string(jsonBytes)), "ok.") {
ckconn := clickhouse.OpenDB(&clickhouse.Options{
Addr: []string{addr},
Auth: clickhouse.Auth{
Username: c.User,
Password: c.Password,
},
Settings: clickhouse.Settings{
"max_execution_time": 60,
},
DialTimeout: 10 * time.Second,
Protocol: clickhouse.HTTP,
})
if ckconn == nil {
return errors.New("db conn failed")
protocol := strings.ToLower(c.Protocol)
if protocol == "http" || protocol == "" {
url := addr
if !strings.HasPrefix(url, "http://") {
url = "http://" + url
}
resp, err := httplib.Get(url).SetTimeout(time.Second * 1).Response()
// 忽略HTTP Code错误, 因为可能不是HTTP协议
if err != nil {
logger.Errorf("get clickhouse http status failed, url: %s, err: %v", url, err)
} else {
defer resp.Body.Close()
if resp.StatusCode == 200 {
jsonBytes, _ := io.ReadAll(resp.Body)
if len(jsonBytes) > 0 && strings.Contains(strings.ToLower(string(jsonBytes)), "ok.") {
// 说明是 http 协议
ckconn := clickhouse.OpenDB(&clickhouse.Options{
Addr: []string{addr},
Auth: clickhouse.Auth{
Username: c.User,
Password: c.Password,
},
Settings: clickhouse.Settings{
"max_execution_time": 60,
},
DialTimeout: 10 * time.Second,
Protocol: clickhouse.HTTP,
})
if ckconn == nil {
return errors.New("db conn failed")
}
c.ClientByHTTP = ckconn
return nil
}
}
c.ClientByHTTP = ckconn
return nil
}
}
db, err := gorm.Open(
ckDriver.New(
ckDriver.Config{
DSN: fmt.Sprintf(ckDataSource,
c.User, c.Password, addr),
DisableDatetimePrecision: true,
DontSupportRenameColumn: true,
SkipInitializeWithVersion: false,
}),
)
if err != nil {
return err
if protocol == "clickhouse" || protocol == "" {
db, err := gorm.Open(
ckDriver.New(
ckDriver.Config{
DSN: fmt.Sprintf(ckDataSource,
c.User, c.Password, addr),
DisableDatetimePrecision: true,
DontSupportRenameColumn: true,
SkipInitializeWithVersion: false,
}),
)
if err != nil {
return err
}
c.Client = db
return nil
}
c.Client = db
return nil
return fmt.Errorf("unsupported clickhouse protocol: %s", c.Protocol)
}
const (