mirror of
https://github.com/ccfos/nightingale.git
synced 2026-03-03 06:29:16 +00:00
Compare commits
6 Commits
fix-exec-s
...
stable
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
df9ba52e71 | ||
|
|
ba6d2b664d | ||
|
|
2ddcf507f9 | ||
|
|
17cc588a9d | ||
|
|
e9c7eef546 | ||
|
|
7451ad2e23 |
@@ -282,7 +282,7 @@ func (arw *AlertRuleWorker) GetTdengineAnomalyPoint(rule *models.AlertRule, dsId
|
||||
continue
|
||||
}
|
||||
|
||||
series, err := cli.Query(query)
|
||||
series, err := cli.Query(query,0)
|
||||
arw.processor.Stats.CounterQueryDataTotal.WithLabelValues(fmt.Sprintf("%d", arw.datasourceId)).Inc()
|
||||
if err != nil {
|
||||
logger.Warningf("rule_eval rid:%d query data error: %v", rule.Id, err)
|
||||
|
||||
@@ -48,6 +48,10 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
|
||||
|
||||
cconf.MergeOperationConf()
|
||||
|
||||
if config.Alert.Heartbeat.EngineName == "" {
|
||||
config.Alert.Heartbeat.EngineName = "default"
|
||||
}
|
||||
|
||||
logxClean, err := logx.Init(config.Log)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -116,7 +120,11 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
|
||||
redis, sso, ctx, metas, idents, targetCache, userCache, userGroupCache)
|
||||
pushgwRouter := pushgwrt.New(config.HTTP, config.Pushgw, config.Alert, targetCache, busiGroupCache, idents, metas, writers, ctx)
|
||||
|
||||
go models.MigrateBg(ctx, pushgwRouter.Pushgw.BusiGroupLabelKey)
|
||||
go func() {
|
||||
if models.CanMigrateBg(ctx) {
|
||||
models.MigrateBg(ctx, pushgwRouter.Pushgw.BusiGroupLabelKey)
|
||||
}
|
||||
}()
|
||||
|
||||
r := httpx.GinEngine(config.Global.RunMode, config.HTTP, configCvalCache.PrintBodyPaths, configCvalCache.PrintAccessLog)
|
||||
|
||||
|
||||
@@ -82,7 +82,7 @@ func (rt *Router) QueryData(c *gin.Context) {
|
||||
var err error
|
||||
tdClient := rt.TdendgineClients.GetCli(f.DatasourceId)
|
||||
for _, q := range f.Querys {
|
||||
datas, err := tdClient.Query(q)
|
||||
datas, err := tdClient.Query(q, 0)
|
||||
ginx.Dangerous(err)
|
||||
resp = append(resp, datas...)
|
||||
}
|
||||
|
||||
@@ -563,6 +563,32 @@ func (m *Target) UpdateFieldsMap(ctx *ctx.Context, fields map[string]interface{}
|
||||
return DB(ctx).Model(m).Updates(fields).Error
|
||||
}
|
||||
|
||||
func CanMigrateBg(ctx *ctx.Context) bool {
|
||||
// 1.1 检查 target 表是否为空
|
||||
var cnt int64
|
||||
if err := DB(ctx).Model(&Target{}).Count(&cnt).Error; err != nil {
|
||||
log.Println("failed to get target table count, err:", err)
|
||||
return false
|
||||
}
|
||||
if cnt == 0 {
|
||||
log.Println("target table is empty, skip migration.")
|
||||
return false
|
||||
}
|
||||
|
||||
// 1.2 判断是否已经完成迁移
|
||||
var maxGroupId int64
|
||||
if err := DB(ctx).Model(&Target{}).Select("MAX(group_id)").Scan(&maxGroupId).Error; err != nil {
|
||||
log.Println("failed to get max group_id from target table, err:", err)
|
||||
return false
|
||||
}
|
||||
|
||||
if maxGroupId == 0 {
|
||||
log.Println("migration bgid has been completed.")
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func MigrateBg(ctx *ctx.Context, bgLabelKey string) {
|
||||
// 1. 判断是否已经完成迁移
|
||||
var maxGroupId int64
|
||||
|
||||
@@ -80,14 +80,15 @@ func GetByUrl[T any](url string, cfg conf.CenterApi) (T, error) {
|
||||
var dataResp DataResponse[T]
|
||||
err = json.Unmarshal(body, &dataResp)
|
||||
if err != nil {
|
||||
return dat, fmt.Errorf("failed to decode response: %w", err)
|
||||
return dat, fmt.Errorf("failed to decode:%s response: %w", string(body), err)
|
||||
}
|
||||
|
||||
if dataResp.Err != "" {
|
||||
return dat, fmt.Errorf("error from server: %s", dataResp.Err)
|
||||
}
|
||||
|
||||
logger.Debugf("get data from %s, data: %+v", url, dataResp.Dat)
|
||||
prettyData, _ := json.Marshal(dataResp.Dat)
|
||||
logger.Debugf("get data from %s, data: %s", url, string(prettyData))
|
||||
return dataResp.Dat, nil
|
||||
}
|
||||
|
||||
@@ -176,9 +177,9 @@ func PostByUrl[T any](url string, cfg conf.CenterApi, v interface{}) (t T, err e
|
||||
return t, fmt.Errorf("error from server: %s", dataResp.Err)
|
||||
}
|
||||
|
||||
logger.Debugf("get data from %s, data: %+v", url, dataResp.Dat)
|
||||
prettyData, _ := json.Marshal(dataResp.Dat)
|
||||
logger.Debugf("get data from %s, data: %s", url, string(prettyData))
|
||||
return dataResp.Dat, nil
|
||||
|
||||
}
|
||||
|
||||
func PostJSON(url string, timeout time.Duration, v interface{}, retries ...int) (response []byte, code int, err error) {
|
||||
|
||||
@@ -20,6 +20,12 @@ func (rt *Router) AppendLabels(pt *prompb.TimeSeries, target *models.Target, bgC
|
||||
|
||||
for key, value := range target.TagsMap {
|
||||
if index, has := labelKeys[key]; has {
|
||||
// e.g. busigroup=cloud
|
||||
if _, has := labelKeys[rt.Pushgw.BusiGroupLabelKey]; has {
|
||||
// busigroup key already exists, skip
|
||||
continue
|
||||
}
|
||||
|
||||
// overwrite labels
|
||||
if rt.Pushgw.LabelRewrite {
|
||||
pt.Labels[index].Value = value
|
||||
|
||||
@@ -185,11 +185,23 @@ func (tc *tdengineClient) QueryTable(query string) (APIResponse, error) {
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
// 限制响应体大小为10MB
|
||||
maxSize := int64(10 * 1024 * 1024) // 10MB
|
||||
limitedReader := http.MaxBytesReader(nil, resp.Body, maxSize)
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return apiResp, fmt.Errorf("HTTP error, status: %s", resp.Status)
|
||||
}
|
||||
err = json.NewDecoder(resp.Body).Decode(&apiResp)
|
||||
return apiResp, err
|
||||
|
||||
err = json.NewDecoder(limitedReader).Decode(&apiResp)
|
||||
if err != nil {
|
||||
if strings.Contains(err.Error(), "http: request body too large") {
|
||||
return apiResp, fmt.Errorf("response body exceeds 10MB limit")
|
||||
}
|
||||
return apiResp, err
|
||||
}
|
||||
|
||||
return apiResp, nil
|
||||
}
|
||||
|
||||
func (tc *tdengineClient) QueryLog(query interface{}) (APIResponse, error) {
|
||||
@@ -245,15 +257,23 @@ func TimeFormat(src APIResponse, timeFormat string) APIResponse {
|
||||
tsIdx := -1
|
||||
for colIndex, colData := range src.ColumnMeta {
|
||||
// 类型参考 https://docs.taosdata.com/taos-sql/data-type/
|
||||
colType, ok := colData[1].(string)
|
||||
if !ok {
|
||||
// 处理v2版本数字类型和v3版本字符串类型
|
||||
switch t := colData[1].(type) {
|
||||
case float64:
|
||||
// v2版本数字类型映射
|
||||
if int(t) == 9 { // TIMESTAMP type in v2
|
||||
tsIdx = colIndex
|
||||
break
|
||||
}
|
||||
case string:
|
||||
// v3版本直接使用字符串类型
|
||||
if t == "TIMESTAMP" {
|
||||
tsIdx = colIndex
|
||||
break
|
||||
}
|
||||
default:
|
||||
logger.Warningf("unexpected column type: %v", colData[1])
|
||||
return src
|
||||
}
|
||||
|
||||
if colType == "TIMESTAMP" {
|
||||
tsIdx = colIndex
|
||||
break
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
@@ -262,15 +282,19 @@ func TimeFormat(src APIResponse, timeFormat string) APIResponse {
|
||||
}
|
||||
|
||||
for i := range src.Data {
|
||||
ts, ok := src.Data[i][tsIdx].(string)
|
||||
if !ok {
|
||||
logger.Warningf("unexpected timestamp type: %v", src.Data[i][tsIdx])
|
||||
continue
|
||||
}
|
||||
var t time.Time
|
||||
var err error
|
||||
|
||||
t, err := time.Parse(time.RFC3339Nano, ts)
|
||||
if err != nil {
|
||||
logger.Warningf("parse %v timestamp failed: %v", src.Data[i], err)
|
||||
switch tsVal := src.Data[i][tsIdx].(type) {
|
||||
case string:
|
||||
// 尝试解析不同格式的时间字符串
|
||||
t, err = parseTimeString(tsVal)
|
||||
if err != nil {
|
||||
logger.Warningf("parse timestamp string failed: %v, value: %v", err, tsVal)
|
||||
continue
|
||||
}
|
||||
default:
|
||||
logger.Warningf("unexpected timestamp type: %T, value: %v", tsVal, tsVal)
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -279,7 +303,73 @@ func TimeFormat(src APIResponse, timeFormat string) APIResponse {
|
||||
return src
|
||||
}
|
||||
|
||||
func (tc *tdengineClient) Query(query interface{}) ([]models.DataResp, error) {
|
||||
func parseTimeString(ts string) (time.Time, error) {
|
||||
// 尝试不同的时间格式
|
||||
formats := []string{
|
||||
// 标准格式
|
||||
time.Layout, // "01/02 03:04:05PM '06 -0700"
|
||||
time.ANSIC, // "Mon Jan _2 15:04:05 2006"
|
||||
time.UnixDate, // "Mon Jan _2 15:04:05 MST 2006"
|
||||
time.RubyDate, // "Mon Jan 02 15:04:05 -0700 2006"
|
||||
time.RFC822, // "02 Jan 06 15:04 MST"
|
||||
time.RFC822Z, // "02 Jan 06 15:04 -0700"
|
||||
time.RFC850, // "Monday, 02-Jan-06 15:04:05 MST"
|
||||
time.RFC1123, // "Mon, 02 Jan 2006 15:04:05 MST"
|
||||
time.RFC1123Z, // "Mon, 02 Jan 2006 15:04:05 -0700"
|
||||
time.RFC3339, // "2006-01-02T15:04:05Z07:00"
|
||||
time.RFC3339Nano, // "2006-01-02T15:04:05.999999999Z07:00"
|
||||
time.Kitchen, // "3:04PM"
|
||||
|
||||
// 实用时间戳格式
|
||||
time.Stamp, // "Jan _2 15:04:05"
|
||||
time.StampMilli, // "Jan _2 15:04:05.000"
|
||||
time.StampMicro, // "Jan _2 15:04:05.000000"
|
||||
time.StampNano, // "Jan _2 15:04:05.000000000"
|
||||
time.DateTime, // "2006-01-02 15:04:05"
|
||||
time.DateOnly, // "2006-01-02"
|
||||
time.TimeOnly, // "15:04:05"
|
||||
|
||||
// 常用自定义格式
|
||||
"2006-01-02T15:04:05", // 无时区的ISO格式
|
||||
"2006-01-02T15:04:05.000Z",
|
||||
"2006-01-02T15:04:05Z",
|
||||
"2006-01-02 15:04:05.999999999", // 纳秒
|
||||
"2006-01-02 15:04:05.999999", // 微秒
|
||||
"2006-01-02 15:04:05.999", // 毫秒
|
||||
"2006/01/02",
|
||||
"20060102",
|
||||
"01/02/2006",
|
||||
"2006年01月02日",
|
||||
"2006年01月02日 15:04:05",
|
||||
}
|
||||
|
||||
var lastErr error
|
||||
for _, format := range formats {
|
||||
t, err := time.Parse(format, ts)
|
||||
if err == nil {
|
||||
return t, nil
|
||||
}
|
||||
lastErr = err
|
||||
}
|
||||
|
||||
// 尝试解析 Unix 时间戳
|
||||
if timestamp, err := strconv.ParseInt(ts, 10, 64); err == nil {
|
||||
switch len(ts) {
|
||||
case 10: // 秒
|
||||
return time.Unix(timestamp, 0), nil
|
||||
case 13: // 毫秒
|
||||
return time.Unix(timestamp/1000, (timestamp%1000)*1000000), nil
|
||||
case 16: // 微秒
|
||||
return time.Unix(timestamp/1000000, (timestamp%1000000)*1000), nil
|
||||
case 19: // 纳秒
|
||||
return time.Unix(timestamp/1000000000, timestamp%1000000000), nil
|
||||
}
|
||||
}
|
||||
|
||||
return time.Time{}, fmt.Errorf("failed to parse time with any format: %v", lastErr)
|
||||
}
|
||||
|
||||
func (tc *tdengineClient) Query(query interface{}, delay int) ([]models.DataResp, error) {
|
||||
b, err := json.Marshal(query)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -296,9 +386,9 @@ func (tc *tdengineClient) Query(query interface{}) ([]models.DataResp, error) {
|
||||
|
||||
if q.From == "" {
|
||||
// 2023-09-21T05:37:30.000Z format
|
||||
to := time.Now().Unix()
|
||||
to := time.Now().Unix() - int64(delay)
|
||||
q.To = time.Unix(to, 0).UTC().Format(time.RFC3339)
|
||||
from := to - q.Interval
|
||||
from := to - q.Interval - int64(delay)
|
||||
q.From = time.Unix(from, 0).UTC().Format(time.RFC3339)
|
||||
}
|
||||
|
||||
@@ -368,9 +458,45 @@ func (tc *tdengineClient) GetColumns(database, table string) ([]Column, error) {
|
||||
return columns, err
|
||||
}
|
||||
for _, row := range data.ColumnMeta {
|
||||
var colType string
|
||||
switch t := row[1].(type) {
|
||||
case float64:
|
||||
// v2版本数字类型映射
|
||||
switch int(t) {
|
||||
case 1:
|
||||
colType = "BOOL"
|
||||
case 2:
|
||||
colType = "TINYINT"
|
||||
case 3:
|
||||
colType = "SMALLINT"
|
||||
case 4:
|
||||
colType = "INT"
|
||||
case 5:
|
||||
colType = "BIGINT"
|
||||
case 6:
|
||||
colType = "FLOAT"
|
||||
case 7:
|
||||
colType = "DOUBLE"
|
||||
case 8:
|
||||
colType = "BINARY"
|
||||
case 9:
|
||||
colType = "TIMESTAMP"
|
||||
case 10:
|
||||
colType = "NCHAR"
|
||||
default:
|
||||
colType = "UNKNOWN"
|
||||
}
|
||||
case string:
|
||||
// v3版本直接使用字符串类型
|
||||
colType = t
|
||||
default:
|
||||
logger.Warningf("unexpected column type format: %v", row[1])
|
||||
colType = "UNKNOWN"
|
||||
}
|
||||
|
||||
column := Column{
|
||||
Name: row[0].(string),
|
||||
Type: row[1].(string),
|
||||
Type: colType,
|
||||
Size: int(row[2].(float64)),
|
||||
}
|
||||
columns = append(columns, column)
|
||||
@@ -454,9 +580,44 @@ func ConvertToTStData(src APIResponse, key Keys, ref string) ([]models.DataResp,
|
||||
|
||||
tsIdx := -1
|
||||
for colIndex, colData := range src.ColumnMeta {
|
||||
// 类型参考 https://docs.taosdata.com/taos-sql/data-type/
|
||||
colName := colData[0].(string)
|
||||
colType := colData[1].(string)
|
||||
var colType string
|
||||
// 处理v2版本数字类型和v3版本字符串类型
|
||||
switch t := colData[1].(type) {
|
||||
case float64:
|
||||
// v2版本数字类型映射
|
||||
switch int(t) {
|
||||
case 1:
|
||||
colType = "BOOL"
|
||||
case 2:
|
||||
colType = "TINYINT"
|
||||
case 3:
|
||||
colType = "SMALLINT"
|
||||
case 4:
|
||||
colType = "INT"
|
||||
case 5:
|
||||
colType = "BIGINT"
|
||||
case 6:
|
||||
colType = "FLOAT"
|
||||
case 7:
|
||||
colType = "DOUBLE"
|
||||
case 8:
|
||||
colType = "BINARY"
|
||||
case 9:
|
||||
colType = "TIMESTAMP"
|
||||
case 10:
|
||||
colType = "NCHAR"
|
||||
default:
|
||||
colType = "UNKNOWN"
|
||||
}
|
||||
case string:
|
||||
// v3版本直接使用字符串类型
|
||||
colType = t
|
||||
default:
|
||||
logger.Warningf("unexpected column type format: %v", colData[1])
|
||||
continue
|
||||
}
|
||||
|
||||
switch colType {
|
||||
case "TIMESTAMP":
|
||||
tsIdx = colIndex
|
||||
@@ -470,7 +631,6 @@ func ConvertToTStData(src APIResponse, key Keys, ref string) ([]models.DataResp,
|
||||
} else {
|
||||
metricIdxMap[colName] = colIndex
|
||||
}
|
||||
|
||||
default:
|
||||
if len(labelMap) > 0 {
|
||||
if _, ok := labelMap[colName]; !ok {
|
||||
@@ -505,7 +665,7 @@ func ConvertToTStData(src APIResponse, key Keys, ref string) ([]models.DataResp,
|
||||
metric[model.MetricNameLabel] = model.LabelValue(metricName)
|
||||
|
||||
// transfer 2022-06-29T05:52:16.603Z to unix timestamp
|
||||
t, err := time.Parse(time.RFC3339, row[tsIdx].(string))
|
||||
t, err := parseTimeString(row[tsIdx].(string))
|
||||
if err != nil {
|
||||
logger.Warningf("parse %v timestamp failed: %v", row, err)
|
||||
continue
|
||||
|
||||
Reference in New Issue
Block a user