mirror of
https://github.com/ccfos/nightingale.git
synced 2026-03-02 22:19:10 +00:00
Compare commits
1 Commits
v8.1.0
...
optimize-l
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6ac70925ed |
@@ -92,7 +92,6 @@ func (s *Set) updateMeta(items map[string]models.HostMeta) {
|
||||
|
||||
func (s *Set) updateTargets(m map[string]models.HostMeta) error {
|
||||
if s.redis == nil {
|
||||
logger.Warningf("redis is nil")
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -1,23 +1,26 @@
|
||||
package dscache
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/ccfos/nightingale/v6/datasource"
|
||||
"github.com/toolkits/pkg/logger"
|
||||
)
|
||||
|
||||
type Cache struct {
|
||||
datas map[string]map[int64]datasource.Datasource
|
||||
mutex *sync.RWMutex
|
||||
datas map[string]map[int64]datasource.Datasource
|
||||
datasourceStatus map[string]string
|
||||
mutex *sync.RWMutex
|
||||
}
|
||||
|
||||
var DsCache = Cache{
|
||||
datas: make(map[string]map[int64]datasource.Datasource),
|
||||
mutex: new(sync.RWMutex),
|
||||
datas: make(map[string]map[int64]datasource.Datasource),
|
||||
datasourceStatus: make(map[string]string),
|
||||
mutex: new(sync.RWMutex),
|
||||
}
|
||||
|
||||
func (cs *Cache) Put(cate string, dsId int64, ds datasource.Datasource) {
|
||||
func (cs *Cache) Put(cate string, dsId int64, name string, ds datasource.Datasource) {
|
||||
cs.mutex.Lock()
|
||||
if _, found := cs.datas[cate]; !found {
|
||||
cs.datas[cate] = make(map[int64]datasource.Datasource)
|
||||
@@ -34,11 +37,11 @@ func (cs *Cache) Put(cate string, dsId int64, ds datasource.Datasource) {
|
||||
// InitClient() 在用户配置错误或远端不可用时, 会非常耗时, mutex被长期持有, 导致Get()会超时
|
||||
err := ds.InitClient()
|
||||
if err != nil {
|
||||
logger.Errorf("init plugin:%s %d %+v client fail: %v", cate, dsId, ds, err)
|
||||
cs.SetStatus(name, fmt.Sprintf("%s init plugin:%s %d %+v client fail: %v", time.Now().Format("2006-01-02 15:04:05"), cate, dsId, ds, err))
|
||||
return
|
||||
}
|
||||
|
||||
logger.Debugf("init plugin:%s %d %+v client success", cate, dsId, ds)
|
||||
cs.SetStatus(name, fmt.Sprintf("%s init plugin:%s %d %+v client success", time.Now().Format("2006-01-02 15:04:05"), cate, dsId, ds))
|
||||
cs.mutex.Lock()
|
||||
cs.datas[cate][dsId] = ds
|
||||
cs.mutex.Unlock()
|
||||
@@ -57,3 +60,15 @@ func (cs *Cache) Get(cate string, dsId int64) (datasource.Datasource, bool) {
|
||||
|
||||
return cs.datas[cate][dsId], true
|
||||
}
|
||||
|
||||
func (cs *Cache) GetAllStatus() map[string]string {
|
||||
cs.mutex.RLock()
|
||||
defer cs.mutex.RUnlock()
|
||||
return cs.datasourceStatus
|
||||
}
|
||||
|
||||
func (cs *Cache) SetStatus(cate string, status string) {
|
||||
cs.mutex.Lock()
|
||||
cs.datasourceStatus[cate] = status
|
||||
cs.mutex.Unlock()
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ package dscache
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
@@ -60,7 +61,7 @@ func getDatasourcesFromDBLoop(ctx *ctx.Context, fromAPI bool) {
|
||||
atomic.StoreInt64(&PromDefaultDatasourceId, item.Id)
|
||||
}
|
||||
|
||||
logger.Debugf("get datasource: %+v", item)
|
||||
DsCache.SetStatus(item.Name, fmt.Sprintf("%s get datasource: %+v", time.Now().Format("2006-01-02 15:04:05"), item))
|
||||
ds := datasource.DatasourceInfo{
|
||||
Id: item.Id,
|
||||
Name: item.Name,
|
||||
@@ -153,7 +154,6 @@ func osN9eToDatasourceInfo(ds *datasource.DatasourceInfo, item models.Datasource
|
||||
}
|
||||
|
||||
func PutDatasources(items []datasource.DatasourceInfo) {
|
||||
ids := make([]int64, 0)
|
||||
for _, item := range items {
|
||||
if item.Type == "prometheus" {
|
||||
continue
|
||||
@@ -164,27 +164,25 @@ func PutDatasources(items []datasource.DatasourceInfo) {
|
||||
}
|
||||
|
||||
if item.Name == "" {
|
||||
logger.Warningf("cluster name is empty, ignore %+v", item)
|
||||
DsCache.SetStatus(item.Name, fmt.Sprintf("%s cluster name is empty, ignore %+v", time.Now().Format("2006-01-02 15:04:05"), item))
|
||||
continue
|
||||
}
|
||||
|
||||
typ := strings.ReplaceAll(item.Type, ".logging", "")
|
||||
|
||||
ds, err := datasource.GetDatasourceByType(typ, item.Settings)
|
||||
if err != nil {
|
||||
logger.Warningf("get plugin:%+v fail: %v", item, err)
|
||||
DsCache.SetStatus(item.Name, fmt.Sprintf("%s get plugin:%+v fail: %v", time.Now().Format("2006-01-02 15:04:05"), item, err))
|
||||
continue
|
||||
}
|
||||
|
||||
err = ds.Validate(context.Background())
|
||||
if err != nil {
|
||||
logger.Warningf("get plugin:%+v fail: %v", item, err)
|
||||
DsCache.SetStatus(item.Name, fmt.Sprintf("%s get plugin:%+v fail: %v", time.Now().Format("2006-01-02 15:04:05"), item, err))
|
||||
continue
|
||||
}
|
||||
ids = append(ids, item.Id)
|
||||
|
||||
// 异步初始化 client 不然数据源同步的会很慢
|
||||
go DsCache.Put(typ, item.Id, ds)
|
||||
go DsCache.Put(typ, item.Id, item.Name, ds)
|
||||
}
|
||||
|
||||
logger.Debugf("get plugin by type success Ids:%v", ids)
|
||||
}
|
||||
|
||||
@@ -96,8 +96,8 @@ type TargetUpdate struct {
|
||||
|
||||
func (s *Set) UpdateTargets(lst []string, now int64) error {
|
||||
err := updateTargetsUpdateTs(lst, now, s.redis)
|
||||
if err != nil {
|
||||
logger.Errorf("failed to update targets:%v update_ts: %v", lst, err)
|
||||
if err != nil && s.ctx.IsCenter {
|
||||
logger.Warningf("failed to update targets:%v update_ts: %v", lst, err)
|
||||
}
|
||||
|
||||
if !s.ctx.IsCenter {
|
||||
|
||||
Reference in New Issue
Block a user