Compare commits

...

6 Commits

Author SHA1 Message Date
ning
df9ba52e71 network bg label migrate 2024-11-08 10:32:19 +08:00
ning
ba6d2b664d fix build 2024-11-05 20:14:13 +08:00
Yening Qin
2ddcf507f9 update td query (#2267) 2024-11-05 20:01:11 +08:00
Yening Qin
17cc588a9d refactor: update pushgw append label (#2263) 2024-11-04 18:06:15 +08:00
Yening Qin
e9c7eef546 optimize tdentine (#2261) 2024-11-04 17:32:46 +08:00
Yening Qin
7451ad2e23 update default engine name (#2244) 2024-10-28 15:07:17 +08:00
7 changed files with 234 additions and 33 deletions

View File

@@ -282,7 +282,7 @@ func (arw *AlertRuleWorker) GetTdengineAnomalyPoint(rule *models.AlertRule, dsId
continue
}
series, err := cli.Query(query)
series, err := cli.Query(query,0)
arw.processor.Stats.CounterQueryDataTotal.WithLabelValues(fmt.Sprintf("%d", arw.datasourceId)).Inc()
if err != nil {
logger.Warningf("rule_eval rid:%d query data error: %v", rule.Id, err)

View File

@@ -48,6 +48,10 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
cconf.MergeOperationConf()
if config.Alert.Heartbeat.EngineName == "" {
config.Alert.Heartbeat.EngineName = "default"
}
logxClean, err := logx.Init(config.Log)
if err != nil {
return nil, err
@@ -116,7 +120,11 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
redis, sso, ctx, metas, idents, targetCache, userCache, userGroupCache)
pushgwRouter := pushgwrt.New(config.HTTP, config.Pushgw, config.Alert, targetCache, busiGroupCache, idents, metas, writers, ctx)
go models.MigrateBg(ctx, pushgwRouter.Pushgw.BusiGroupLabelKey)
go func() {
if models.CanMigrateBg(ctx) {
models.MigrateBg(ctx, pushgwRouter.Pushgw.BusiGroupLabelKey)
}
}()
r := httpx.GinEngine(config.Global.RunMode, config.HTTP, configCvalCache.PrintBodyPaths, configCvalCache.PrintAccessLog)

View File

@@ -82,7 +82,7 @@ func (rt *Router) QueryData(c *gin.Context) {
var err error
tdClient := rt.TdendgineClients.GetCli(f.DatasourceId)
for _, q := range f.Querys {
datas, err := tdClient.Query(q)
datas, err := tdClient.Query(q, 0)
ginx.Dangerous(err)
resp = append(resp, datas...)
}

View File

@@ -563,6 +563,32 @@ func (m *Target) UpdateFieldsMap(ctx *ctx.Context, fields map[string]interface{}
return DB(ctx).Model(m).Updates(fields).Error
}
func CanMigrateBg(ctx *ctx.Context) bool {
// 1.1 检查 target 表是否为空
var cnt int64
if err := DB(ctx).Model(&Target{}).Count(&cnt).Error; err != nil {
log.Println("failed to get target table count, err:", err)
return false
}
if cnt == 0 {
log.Println("target table is empty, skip migration.")
return false
}
// 1.2 判断是否已经完成迁移
var maxGroupId int64
if err := DB(ctx).Model(&Target{}).Select("MAX(group_id)").Scan(&maxGroupId).Error; err != nil {
log.Println("failed to get max group_id from target table, err:", err)
return false
}
if maxGroupId == 0 {
log.Println("migration bgid has been completed.")
return false
}
return true
}
func MigrateBg(ctx *ctx.Context, bgLabelKey string) {
// 1. 判断是否已经完成迁移
var maxGroupId int64

View File

@@ -80,14 +80,15 @@ func GetByUrl[T any](url string, cfg conf.CenterApi) (T, error) {
var dataResp DataResponse[T]
err = json.Unmarshal(body, &dataResp)
if err != nil {
return dat, fmt.Errorf("failed to decode response: %w", err)
return dat, fmt.Errorf("failed to decode:%s response: %w", string(body), err)
}
if dataResp.Err != "" {
return dat, fmt.Errorf("error from server: %s", dataResp.Err)
}
logger.Debugf("get data from %s, data: %+v", url, dataResp.Dat)
prettyData, _ := json.Marshal(dataResp.Dat)
logger.Debugf("get data from %s, data: %s", url, string(prettyData))
return dataResp.Dat, nil
}
@@ -176,9 +177,9 @@ func PostByUrl[T any](url string, cfg conf.CenterApi, v interface{}) (t T, err e
return t, fmt.Errorf("error from server: %s", dataResp.Err)
}
logger.Debugf("get data from %s, data: %+v", url, dataResp.Dat)
prettyData, _ := json.Marshal(dataResp.Dat)
logger.Debugf("get data from %s, data: %s", url, string(prettyData))
return dataResp.Dat, nil
}
func PostJSON(url string, timeout time.Duration, v interface{}, retries ...int) (response []byte, code int, err error) {

View File

@@ -20,6 +20,12 @@ func (rt *Router) AppendLabels(pt *prompb.TimeSeries, target *models.Target, bgC
for key, value := range target.TagsMap {
if index, has := labelKeys[key]; has {
// e.g. busigroup=cloud
if _, has := labelKeys[rt.Pushgw.BusiGroupLabelKey]; has {
// busigroup key already exists, skip
continue
}
// overwrite labels
if rt.Pushgw.LabelRewrite {
pt.Labels[index].Value = value

View File

@@ -185,11 +185,23 @@ func (tc *tdengineClient) QueryTable(query string) (APIResponse, error) {
}
defer resp.Body.Close()
// 限制响应体大小为10MB
maxSize := int64(10 * 1024 * 1024) // 10MB
limitedReader := http.MaxBytesReader(nil, resp.Body, maxSize)
if resp.StatusCode != http.StatusOK {
return apiResp, fmt.Errorf("HTTP error, status: %s", resp.Status)
}
err = json.NewDecoder(resp.Body).Decode(&apiResp)
return apiResp, err
err = json.NewDecoder(limitedReader).Decode(&apiResp)
if err != nil {
if strings.Contains(err.Error(), "http: request body too large") {
return apiResp, fmt.Errorf("response body exceeds 10MB limit")
}
return apiResp, err
}
return apiResp, nil
}
func (tc *tdengineClient) QueryLog(query interface{}) (APIResponse, error) {
@@ -245,15 +257,23 @@ func TimeFormat(src APIResponse, timeFormat string) APIResponse {
tsIdx := -1
for colIndex, colData := range src.ColumnMeta {
// 类型参考 https://docs.taosdata.com/taos-sql/data-type/
colType, ok := colData[1].(string)
if !ok {
// 处理v2版本数字类型和v3版本字符串类型
switch t := colData[1].(type) {
case float64:
// v2版本数字类型映射
if int(t) == 9 { // TIMESTAMP type in v2
tsIdx = colIndex
break
}
case string:
// v3版本直接使用字符串类型
if t == "TIMESTAMP" {
tsIdx = colIndex
break
}
default:
logger.Warningf("unexpected column type: %v", colData[1])
return src
}
if colType == "TIMESTAMP" {
tsIdx = colIndex
break
continue
}
}
@@ -262,15 +282,19 @@ func TimeFormat(src APIResponse, timeFormat string) APIResponse {
}
for i := range src.Data {
ts, ok := src.Data[i][tsIdx].(string)
if !ok {
logger.Warningf("unexpected timestamp type: %v", src.Data[i][tsIdx])
continue
}
var t time.Time
var err error
t, err := time.Parse(time.RFC3339Nano, ts)
if err != nil {
logger.Warningf("parse %v timestamp failed: %v", src.Data[i], err)
switch tsVal := src.Data[i][tsIdx].(type) {
case string:
// 尝试解析不同格式的时间字符串
t, err = parseTimeString(tsVal)
if err != nil {
logger.Warningf("parse timestamp string failed: %v, value: %v", err, tsVal)
continue
}
default:
logger.Warningf("unexpected timestamp type: %T, value: %v", tsVal, tsVal)
continue
}
@@ -279,7 +303,73 @@ func TimeFormat(src APIResponse, timeFormat string) APIResponse {
return src
}
func (tc *tdengineClient) Query(query interface{}) ([]models.DataResp, error) {
func parseTimeString(ts string) (time.Time, error) {
// 尝试不同的时间格式
formats := []string{
// 标准格式
time.Layout, // "01/02 03:04:05PM '06 -0700"
time.ANSIC, // "Mon Jan _2 15:04:05 2006"
time.UnixDate, // "Mon Jan _2 15:04:05 MST 2006"
time.RubyDate, // "Mon Jan 02 15:04:05 -0700 2006"
time.RFC822, // "02 Jan 06 15:04 MST"
time.RFC822Z, // "02 Jan 06 15:04 -0700"
time.RFC850, // "Monday, 02-Jan-06 15:04:05 MST"
time.RFC1123, // "Mon, 02 Jan 2006 15:04:05 MST"
time.RFC1123Z, // "Mon, 02 Jan 2006 15:04:05 -0700"
time.RFC3339, // "2006-01-02T15:04:05Z07:00"
time.RFC3339Nano, // "2006-01-02T15:04:05.999999999Z07:00"
time.Kitchen, // "3:04PM"
// 实用时间戳格式
time.Stamp, // "Jan _2 15:04:05"
time.StampMilli, // "Jan _2 15:04:05.000"
time.StampMicro, // "Jan _2 15:04:05.000000"
time.StampNano, // "Jan _2 15:04:05.000000000"
time.DateTime, // "2006-01-02 15:04:05"
time.DateOnly, // "2006-01-02"
time.TimeOnly, // "15:04:05"
// 常用自定义格式
"2006-01-02T15:04:05", // 无时区的ISO格式
"2006-01-02T15:04:05.000Z",
"2006-01-02T15:04:05Z",
"2006-01-02 15:04:05.999999999", // 纳秒
"2006-01-02 15:04:05.999999", // 微秒
"2006-01-02 15:04:05.999", // 毫秒
"2006/01/02",
"20060102",
"01/02/2006",
"2006年01月02日",
"2006年01月02日 15:04:05",
}
var lastErr error
for _, format := range formats {
t, err := time.Parse(format, ts)
if err == nil {
return t, nil
}
lastErr = err
}
// 尝试解析 Unix 时间戳
if timestamp, err := strconv.ParseInt(ts, 10, 64); err == nil {
switch len(ts) {
case 10: // 秒
return time.Unix(timestamp, 0), nil
case 13: // 毫秒
return time.Unix(timestamp/1000, (timestamp%1000)*1000000), nil
case 16: // 微秒
return time.Unix(timestamp/1000000, (timestamp%1000000)*1000), nil
case 19: // 纳秒
return time.Unix(timestamp/1000000000, timestamp%1000000000), nil
}
}
return time.Time{}, fmt.Errorf("failed to parse time with any format: %v", lastErr)
}
func (tc *tdengineClient) Query(query interface{}, delay int) ([]models.DataResp, error) {
b, err := json.Marshal(query)
if err != nil {
return nil, err
@@ -296,9 +386,9 @@ func (tc *tdengineClient) Query(query interface{}) ([]models.DataResp, error) {
if q.From == "" {
// 2023-09-21T05:37:30.000Z format
to := time.Now().Unix()
to := time.Now().Unix() - int64(delay)
q.To = time.Unix(to, 0).UTC().Format(time.RFC3339)
from := to - q.Interval
from := to - q.Interval - int64(delay)
q.From = time.Unix(from, 0).UTC().Format(time.RFC3339)
}
@@ -368,9 +458,45 @@ func (tc *tdengineClient) GetColumns(database, table string) ([]Column, error) {
return columns, err
}
for _, row := range data.ColumnMeta {
var colType string
switch t := row[1].(type) {
case float64:
// v2版本数字类型映射
switch int(t) {
case 1:
colType = "BOOL"
case 2:
colType = "TINYINT"
case 3:
colType = "SMALLINT"
case 4:
colType = "INT"
case 5:
colType = "BIGINT"
case 6:
colType = "FLOAT"
case 7:
colType = "DOUBLE"
case 8:
colType = "BINARY"
case 9:
colType = "TIMESTAMP"
case 10:
colType = "NCHAR"
default:
colType = "UNKNOWN"
}
case string:
// v3版本直接使用字符串类型
colType = t
default:
logger.Warningf("unexpected column type format: %v", row[1])
colType = "UNKNOWN"
}
column := Column{
Name: row[0].(string),
Type: row[1].(string),
Type: colType,
Size: int(row[2].(float64)),
}
columns = append(columns, column)
@@ -454,9 +580,44 @@ func ConvertToTStData(src APIResponse, key Keys, ref string) ([]models.DataResp,
tsIdx := -1
for colIndex, colData := range src.ColumnMeta {
// 类型参考 https://docs.taosdata.com/taos-sql/data-type/
colName := colData[0].(string)
colType := colData[1].(string)
var colType string
// 处理v2版本数字类型和v3版本字符串类型
switch t := colData[1].(type) {
case float64:
// v2版本数字类型映射
switch int(t) {
case 1:
colType = "BOOL"
case 2:
colType = "TINYINT"
case 3:
colType = "SMALLINT"
case 4:
colType = "INT"
case 5:
colType = "BIGINT"
case 6:
colType = "FLOAT"
case 7:
colType = "DOUBLE"
case 8:
colType = "BINARY"
case 9:
colType = "TIMESTAMP"
case 10:
colType = "NCHAR"
default:
colType = "UNKNOWN"
}
case string:
// v3版本直接使用字符串类型
colType = t
default:
logger.Warningf("unexpected column type format: %v", colData[1])
continue
}
switch colType {
case "TIMESTAMP":
tsIdx = colIndex
@@ -470,7 +631,6 @@ func ConvertToTStData(src APIResponse, key Keys, ref string) ([]models.DataResp,
} else {
metricIdxMap[colName] = colIndex
}
default:
if len(labelMap) > 0 {
if _, ok := labelMap[colName]; !ok {
@@ -505,7 +665,7 @@ func ConvertToTStData(src APIResponse, key Keys, ref string) ([]models.DataResp,
metric[model.MetricNameLabel] = model.LabelValue(metricName)
// transfer 2022-06-29T05:52:16.603Z to unix timestamp
t, err := time.Parse(time.RFC3339, row[tsIdx].(string))
t, err := parseTimeString(row[tsIdx].(string))
if err != nil {
logger.Warningf("parse %v timestamp failed: %v", row, err)
continue