Compare commits


1 commit

Author: Ulric Qin
Commit: c353a62656 — event.Cluster use target.Cluster instead of rule.Cluster
Date: 2022-08-12 13:12:30 +08:00
57 changed files with 398 additions and 1422 deletions

View File

@@ -22,20 +22,20 @@
## Highlighted Features
- **Out of the box**
  - Supports multiple deployment options such as Docker, Helm Chart, and cloud services, and combines data collection, alerting, and visualization in one product. Built-in dashboards, quick views, and alert rule templates can be imported and used right away, **greatly reducing the cost of building, learning, and operating a cloud-native monitoring system**
- **Professional alerting**
  - Visual configuration and management of alerts, rich alert rules, mute rules and subscription rules, multiple notification channels, alert self-healing, alert event management, and more
- **Cloud native**
  - Build an enterprise-grade cloud-native monitoring system in a turnkey fashion: supports collectors such as [**Categraf**](https://github.com/flashcatcloud/categraf), Telegraf, and Grafana-agent, supports databases such as Prometheus, VictoriaMetrics, M3DB, and ElasticSearch, and can import Grafana dashboards, **integrating seamlessly with the cloud-native ecosystem**
- **High performance, high availability**
  - Thanks to Nightingale's multi-data-source management engine and the well-designed architecture on the engine side, backed by a high-performance time-series database, it can handle collection, storage, and alert analysis for hundreds of millions of time series while saving substantial cost;
  - All Nightingale components scale horizontally with no single point of failure. It has been deployed at thousands of companies and proven in demanding production environments; at many leading internet companies, Nightingale clusters of a hundred machines handle hundreds of millions of time series in heavy production use;
- **Flexible deployment, centralized management**
  - Nightingale can run on a 1-core / 1 GB cloud host, be deployed as a cluster across hundreds of machines, or run inside Kubernetes; time-series databases and alerting engines can also be pushed down into individual data centers and regions, combining edge deployment with centralized, unified management and **solving the problem of fragmented data and the lack of a unified view**
  - Supports multiple deployment options such as Docker and Helm Chart, with built-in dashboards, quick views, and alert rule templates that can be imported and used right away; an active, professional community keeps iterating and folding more best practices into the product
- **Broad compatibility**
  - Supports collectors such as [Categraf](https://github.com/flashcatcloud/categraf), Telegraf, and Grafana-agent, time-series databases such as Prometheus, VictoriaMetrics, and M3DB, and integration with Grafana, fitting seamlessly into the cloud-native ecosystem
  - Combines data collection, visualization, alerting, and data analysis in one product, tightly integrated with the cloud-native ecosystem, providing out-of-the-box enterprise-grade monitoring, analysis, and alerting capabilities;
- **Open community**
  - Hosted by the [CCF Open Source Development Committee](https://www.ccf.org.cn/kyfzwyh/), with sustained investment from [**Flashcat**](https://flashcat.cloud) and many other companies, the active participation of thousands of community users, and the project's clearly defined scope, all of which keep the Nightingale open-source community healthy over the long term. An active, professional community keeps iterating and folding more best practices into the product
  - Hosted by the [CCF Open Source Development Committee](https://www.ccf.org.cn/kyfzwyh/), with sustained investment from [Flashcat](https://flashcat.cloud), the active participation of thousands of community users, and the project's clearly defined scope, all of which keep the Nightingale open-source community healthy over the long term;
- **High performance**
  - Thanks to Nightingale's multi-data-source management engine and the well-designed architecture on the engine side, backed by a high-performance time-series database, it can handle collection, storage, and alert analysis for hundreds of millions of time series while saving substantial cost;
- **High availability**
  - All Nightingale components scale horizontally with no single point of failure. It has been deployed at thousands of companies and proven in demanding production environments; at many leading internet companies, Nightingale clusters of a hundred machines handle billions of time series in heavy production use;
- **Flexible deployment**
  - Nightingale can run on a 1-core / 1 GB cloud host, be deployed as a cluster across hundreds of machines, or run inside Kubernetes; time-series databases and alerting engines can also be pushed down into individual data centers and regions, combining edge deployment with centralized management
> If you hit one or more of the following needs while using Prometheus, we recommend a seamless upgrade to Nightingale:
> If you hit one or more of the following needs while using Prometheus, we recommend upgrading to Nightingale:
- Prometheus, Alertmanager, Grafana, and the other systems feel fragmented, lack a unified view, and are not usable out of the box;
- Managing Prometheus and Alertmanager by editing configuration files has a steep learning curve and makes collaboration difficult;
@@ -50,7 +50,7 @@
> If you are using [Open-Falcon](https://github.com/open-falcon/falcon-plus), we recommend upgrading to Nightingale all the more:
- For a detailed comparison of Open-Falcon and Nightingale, see [Ten characteristics and trends of cloud-native monitoring](https://mp.weixin.qq.com/s?__biz=MzkzNjI5OTM5Nw==&mid=2247483738&idx=1&sn=e8bdbb974a2cd003c1abcc2b5405dd18&chksm=c2a19fb0f5d616a63185cd79277a79a6b80118ef2185890d0683d2bb20451bd9303c78d083c5#rd).
> We recommend [Categraf](https://github.com/flashcatcloud/categraf) as the preferred monitoring data collection agent:
@@ -65,28 +65,28 @@
## Screenshots
<img src="doc/img/intro.gif" width="480">
<img src="doc/img/intro.gif" width="680">
## Architecture
<img src="doc/img/arch-product.png" width="480">
<img src="doc/img/arch-product.png" width="680">
Nightingale can receive monitoring data reported by all kinds of collectors (for example [Categraf](https://github.com/flashcatcloud/categraf), telegraf, grafana-agent, and Prometheus) and write it to several popular time-series databases (Prometheus, M3DB, VictoriaMetrics, Thanos, TDEngine, and others). It provides configuration of alert rules, mute rules, and subscription rules, views over the monitoring data, an alert self-healing mechanism (after an alert fires it can automatically call back a webhook URL or run a script), and storage, management, and grouped views of historical alert events.
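The self-healing callback mentioned above is simply an HTTP POST of the alert event as JSON to the configured webhook URL, so a receiving hook can be very small. Below is a minimal, hypothetical receiver sketch in Go; the field names follow the `AlertCurEvent` JSON tags that appear later in this diff, while the listen port and path are assumptions:

```go
package main

import (
	"encoding/json"
	"log"
	"net/http"
)

// alertEvent decodes only the fields this hook cares about; Nightingale sends
// many more (see the AlertCurEvent model further down in this diff).
type alertEvent struct {
	Id          int64  `json:"id"`
	Cluster     string `json:"cluster"`
	GroupName   string `json:"group_name"`
	IsRecovered bool   `json:"is_recovered"`
}

func main() {
	http.HandleFunc("/n9e/callback", func(w http.ResponseWriter, r *http.Request) {
		var ev alertEvent
		if err := json.NewDecoder(r.Body).Decode(&ev); err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		// a real self-healing hook would trigger a runbook or remediation job here
		log.Printf("event %d from cluster %q (%s), recovered=%v", ev.Id, ev.Cluster, ev.GroupName, ev.IsRecovered)
		w.WriteHeader(http.StatusOK)
	})
	log.Fatal(http.ListenAndServe(":8080", nil))
}
```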
<img src="doc/img/arch-system.png" width="480">
<img src="doc/img/arch-system.png" width="680">
The design of Nightingale v5 is deliberately simple. The core consists of two modules, server and webapi: webapi is stateless, is deployed centrally to serve frontend requests, and writes user configuration to the database; server is the alerting engine and data-forwarding module, normally deployed alongside a time-series database (one server group per TSDB), and each group can run as a single instance or as a cluster. server can receive data reported by Categraf, Telegraf, Grafana-Agent, Datadog-Agent, and Falcon plugins, write it to the backend TSDB, periodically sync alert rules from the database, and query the TSDB to evaluate alerts. Each server group depends on one Redis.
<img src="doc/img/install-vm.png" width="480">
<img src="doc/img/install-vm.png" width="680">
If a standalone time-series database (such as Prometheus) hits a performance bottleneck or offers weak disaster recovery, we recommend [VictoriaMetrics](https://github.com/VictoriaMetrics/VictoriaMetrics). VictoriaMetrics has a fairly simple architecture, excellent performance, and is easy to deploy and operate; its architecture is shown above. For more detailed documentation, please refer to the VictoriaMetrics [website](https://victoriametrics.com/).
If a standalone Prometheus is not fast enough or offers weak disaster recovery, we recommend [VictoriaMetrics](https://github.com/VictoriaMetrics/VictoriaMetrics). VictoriaMetrics has a fairly simple architecture, excellent performance, and is easy to deploy and operate; its architecture is shown above. For more detailed documentation, please refer to the VictoriaMetrics [website](https://victoriametrics.com/).
## Community
For an open-source project to stay vibrant, it needs an open governance structure and a steady stream of developers and users participating together. We are committed to building an open, neutral governance structure and to bringing in more developers from companies, universities, and elsewhere who are interested in and passionate about cloud-native monitoring, so that together we can build a lively Nightingale open-source community. For the draft Nightingale project and community governance structure, see [COMMUNITY GOVERNANCE](./doc/community-governance.md).
For an open-source project to stay vibrant, it needs an open governance structure and a steady stream of developers and users participating together. We are committed to building an open, neutral governance structure and to bringing in more developers from companies, universities, and elsewhere who are interested in and passionate about cloud-native monitoring, so that together we can build a lively Nightingale open-source community. For the draft Nightingale project and community governance structure, see **[COMMUNITY GOVERNANCE](./doc/community-governance.md)**.
**We welcome you to take part in the Nightingale open-source project and community in any way you like, including but not limited to:**
- Improving and extending the documentation => [n9e.github.io](https://n9e.github.io/)
@@ -119,4 +119,4 @@
## Contact Us
We recommend following the Nightingale WeChat official account to get timely product and community updates:
<img src="doc/img/n9e-vx-new.png" width="120">
<img src="doc/img/n9e-vx-new.png" width="180">

View File

@@ -1,5 +0,0 @@
## Active Contributors
- [xiaoziv](https://github.com/xiaoziv)
- [tanxiao1990](https://github.com/tanxiao1990)
- [bbaobelief](https://github.com/bbaobelief)

View File

@@ -1,5 +0,0 @@
## Committers
- [YeningQin](https://github.com/710leo)
- [FeiKong](https://github.com/kongfei605)
- [XiaqingDai](https://github.com/jsers)

View File

@@ -34,12 +34,13 @@ Committer 承担以下一个或多个职责:
Committers are recorded and published in the **[COMMITTERS](./committers.md)** list, receive an electronic certificate issued by **[CCF ODC](https://www.ccf.org.cn/kyfzwyh/)**, and enjoy the various rights and benefits of the Nightingale open-source community.
### Project Management Committee (PMC)
### Project Management Committee Member (PMC Member)
> The Project Management Committee, as a single body, manages and leads the Nightingale project and takes full responsibility for its development. PMC-related information is recorded and published in the file [PMC](./pmc.md).
> PMC Members are elected from among Contributors or Committers. They have write access to the [CCFOS/NIGHTINGALE](https://github.com/ccfos/nightingale) repository, an email address with the ccf.org.cn suffix (coming soon), voting rights on Nightingale community matters, and the right to nominate Committer candidates. The Project Management Committee, as a single body, takes full responsibility for the development of the project. PMC Members are recorded and published in the **[PMC](./pmc.md)** list.
- PMC Members are elected from among Contributors or Committers. They have write access to the [CCFOS/NIGHTINGALE](https://github.com/ccfos/nightingale) repository, an email address with the ccf.org.cn suffix (coming soon), voting rights on Nightingale community matters, and the right to nominate Committer candidates.
- The PMC Chair is appointed by **[CCF ODC](https://www.ccf.org.cn/kyfzwyh/)** from among the PMC Members. The Chair is the communication bridge between CCF ODC and the PMC and carries out specific project-management duties.
### Project Management Committee Chair (PMC Chair)
> The PMC Chair is appointed by **[CCF ODC](https://www.ccf.org.cn/kyfzwyh/)** from among the PMC Members. The Project Management Committee, as a unified body, manages and leads the Nightingale project. The Chair is the communication bridge between CCF ODC and the PMC and carries out specific project-management duties.
## Communication
1. We recommend using the mailing list for feedback and suggestions (to be announced);
@@ -70,4 +71,4 @@ Committer 记录并公示于 **[COMMITTERS](./committers.md)** 列表,并获
2. Before asking a question, please search the existing [github issues](https://github.com/ccfos/nightingale/issues) first
3. We prefer questions asked via github issues: [report a problem here](https://github.com/ccfos/nightingale/issues/new?assignees=&labels=kind%2Fbug&template=bug_report.yml) | [request a feature here](https://github.com/ccfos/nightingale/issues/new?assignees=&labels=kind%2Ffeature&template=enhancement.md)
Finally, we recommend joining the WeChat group to discuss open-ended questions (first add [UlricGO](https://www.gitlink.org.cn/UlricQin/gist/tree/master/self.jpeg) as a friend with the note "夜莺加群 + name + company"; the developer team and experienced, helpful community members answer questions in the group)

View File

@@ -1,5 +0,0 @@
## Contributors
<a href="https://github.com/ccfos/nightingale/graphs/contributors">
<img src="https://contrib.rocks/image?repo=ccfos/nightingale" />
</a>

View File

@@ -1,5 +0,0 @@
## End Users
- [中移动](https://github.com/ccfos/nightingale/issues/897#issuecomment-1086573166)
- [inke](https://github.com/ccfos/nightingale/issues/897#issuecomment-1099840636)
- [方正证券](https://github.com/ccfos/nightingale/issues/897#issuecomment-1110492461)

View File

@@ -1,7 +0,0 @@
## PMC Chair
- [laiwei](https://github.com/laiwei)
## PMC Member
- [UlricQin](https://github.com/UlricQin)

View File

@@ -150,7 +150,7 @@ services:
- /:/hostfs
- /var/run/docker.sock:/var/run/docker.sock
ports:
- "9100:9100/tcp"
- "8094:8094/tcp"
networks:
- nightingale
depends_on:

View File

@@ -52,7 +52,7 @@ insert into user_group_member(group_id, user_id) values(1, 1);
CREATE TABLE `configs` (
`id` bigint unsigned not null auto_increment,
`ckey` varchar(191) not null,
`cval` varchar(4096) not null default '',
`cval` varchar(1024) not null default '',
PRIMARY KEY (`id`),
UNIQUE KEY (`ckey`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;
@@ -226,7 +226,6 @@ CREATE TABLE `chart_share` (
CREATE TABLE `alert_rule` (
`id` bigint unsigned not null auto_increment,
`group_id` bigint not null default 0 comment 'busi group id',
`cate` varchar(128) not null,
`cluster` varchar(128) not null,
`name` varchar(255) not null,
`note` varchar(1024) not null default '',
@@ -265,7 +264,6 @@ CREATE TABLE `alert_mute` (
`id` bigint unsigned not null auto_increment,
`group_id` bigint not null default 0 comment 'busi group id',
`prod` varchar(255) not null default '',
`cate` varchar(128) not null,
`cluster` varchar(128) not null,
`tags` varchar(4096) not null default '' comment 'json,map,tagkey->regexp|value',
`cause` varchar(255) not null default '',
@@ -281,7 +279,6 @@ CREATE TABLE `alert_mute` (
CREATE TABLE `alert_subscribe` (
`id` bigint unsigned not null auto_increment,
`group_id` bigint not null default 0 comment 'busi group id',
`cate` varchar(128) not null,
`cluster` varchar(128) not null,
`rule_id` bigint not null default 0,
`tags` varchar(4096) not null default '' comment 'json,map,tagkey->regexp|value',
@@ -383,7 +380,6 @@ insert into alert_aggr_view(name, rule, cate) values('By RuleName', 'field:rule_
CREATE TABLE `alert_cur_event` (
`id` bigint unsigned not null comment 'use alert_his_event.id',
`cate` varchar(128) not null,
`cluster` varchar(128) not null,
`group_id` bigint unsigned not null comment 'busi group id of rule',
`group_name` varchar(255) not null default '' comment 'busi group name',
@@ -420,7 +416,6 @@ CREATE TABLE `alert_cur_event` (
CREATE TABLE `alert_his_event` (
`id` bigint unsigned not null AUTO_INCREMENT,
`is_recovered` tinyint(1) not null,
`cate` varchar(128) not null,
`cluster` varchar(128) not null,
`group_id` bigint unsigned not null comment 'busi group id of rule',
`group_name` varchar(255) not null default '' comment 'busi group name',
@@ -505,13 +500,3 @@ CREATE TABLE `task_record`
KEY (`create_at`, `group_id`),
KEY (`create_by`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;
CREATE TABLE `alerting_engines`
(
`id` int unsigned NOT NULL AUTO_INCREMENT,
`instance` varchar(128) not null default '' comment 'instance identification, e.g. 10.9.0.9:9090',
`cluster` varchar(128) not null default '' comment 'target reader cluster',
`clock` bigint not null,
PRIMARY KEY (`id`),
UNIQUE KEY (`instance`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;

View File

@@ -54,7 +54,7 @@ insert into user_group_member(group_id, user_id) values(1, 1);
CREATE TABLE configs (
id bigserial,
ckey varchar(191) not null,
cval varchar(4096) not null default ''
cval varchar(1024) not null default ''
) ;
ALTER TABLE configs ADD CONSTRAINT configs_pk PRIMARY KEY (id);
ALTER TABLE configs ADD CONSTRAINT configs_un UNIQUE (ckey);
@@ -580,15 +580,3 @@ CREATE INDEX task_record_create_by_idx ON task_record (create_by);
COMMENT ON COLUMN task_record.id IS 'ibex task id';
COMMENT ON COLUMN task_record.group_id IS 'busi group id';
CREATE TABLE alerting_engines
(
id bigserial NOT NULL,
instance varchar(128) not null default '',
cluster varchar(128) not null default '',
clock bigint not null
) ;
ALTER TABLE alerting_engines ADD CONSTRAINT alerting_engines_pk PRIMARY KEY (id);
ALTER TABLE alerting_engines ADD CONSTRAINT alerting_engines_un UNIQUE (instance);
COMMENT ON COLUMN alerting_engines.instance IS 'instance identification, e.g. 10.9.0.9:9090';
COMMENT ON COLUMN alerting_engines.cluster IS 'target reader cluster';

View File

@@ -91,10 +91,6 @@ zh:
netstat_tcp_time_wait: TIME_WAIT状态的网络链接数
netstat_udp_socket: UDP状态的网络链接数
#[ping]
ping_percent_packet_loss: ping数据包丢失百分比(%)
ping_result_code: ping返回码('0','1')
processes_blocked: 不可中断的睡眠状态下的进程数('U','D','L')
processes_dead: 回收中的进程数('X')
processes_idle: 挂起的空闲进程数('I')
@@ -132,123 +128,6 @@ zh:
http_response_response_time: http响应用时
http_response_result_code: url探测结果0为正常否则url无法访问
# [aws cloudwatch rds]
cloudwatch_aws_rds_bin_log_disk_usage_average: rds 磁盘使用平均值
cloudwatch_aws_rds_bin_log_disk_usage_maximum: rds 磁盘使用量最大值
cloudwatch_aws_rds_bin_log_disk_usage_minimum: rds binlog 磁盘使用量最低
cloudwatch_aws_rds_bin_log_disk_usage_sample_count: rds binlog 磁盘使用情况样本计数
cloudwatch_aws_rds_bin_log_disk_usage_sum: rds binlog 磁盘使用总和
cloudwatch_aws_rds_burst_balance_average: rds 突发余额平均值
cloudwatch_aws_rds_burst_balance_maximum: rds 突发余额最大值
cloudwatch_aws_rds_burst_balance_minimum: rds 突发余额最低
cloudwatch_aws_rds_burst_balance_sample_count: rds 突发平衡样本计数
cloudwatch_aws_rds_burst_balance_sum: rds 突发余额总和
cloudwatch_aws_rds_cpu_utilization_average: rds cpu 利用率平均值
cloudwatch_aws_rds_cpu_utilization_maximum: rds cpu 利用率最大值
cloudwatch_aws_rds_cpu_utilization_minimum: rds cpu 利用率最低
cloudwatch_aws_rds_cpu_utilization_sample_count: rds cpu 利用率样本计数
cloudwatch_aws_rds_cpu_utilization_sum: rds cpu 利用率总和
cloudwatch_aws_rds_database_connections_average: rds 数据库连接平均值
cloudwatch_aws_rds_database_connections_maximum: rds 数据库连接数最大值
cloudwatch_aws_rds_database_connections_minimum: rds 数据库连接最小
cloudwatch_aws_rds_database_connections_sample_count: rds 数据库连接样本数
cloudwatch_aws_rds_database_connections_sum: rds 数据库连接总和
cloudwatch_aws_rds_db_load_average: rds db 平均负载
cloudwatch_aws_rds_db_load_cpu_average: rds db 负载 cpu 平均值
cloudwatch_aws_rds_db_load_cpu_maximum: rds db 负载 cpu 最大值
cloudwatch_aws_rds_db_load_cpu_minimum: rds db 负载 cpu 最小值
cloudwatch_aws_rds_db_load_cpu_sample_count: rds db 加载 CPU 样本数
cloudwatch_aws_rds_db_load_cpu_sum: rds db 加载cpu总和
cloudwatch_aws_rds_db_load_maximum: rds 数据库负载最大值
cloudwatch_aws_rds_db_load_minimum: rds 数据库负载最小值
cloudwatch_aws_rds_db_load_non_cpu_average: rds 加载非 CPU 平均值
cloudwatch_aws_rds_db_load_non_cpu_maximum: rds 加载非 cpu 最大值
cloudwatch_aws_rds_db_load_non_cpu_minimum: rds 加载非 cpu 最小值
cloudwatch_aws_rds_db_load_non_cpu_sample_count: rds 加载非 cpu 样本计数
cloudwatch_aws_rds_db_load_non_cpu_sum: rds 加载非cpu总和
cloudwatch_aws_rds_db_load_sample_count: rds db 加载样本计数
cloudwatch_aws_rds_db_load_sum: rds db 负载总和
cloudwatch_aws_rds_disk_queue_depth_average: rds 磁盘队列深度平均值
cloudwatch_aws_rds_disk_queue_depth_maximum: rds 磁盘队列深度最大值
cloudwatch_aws_rds_disk_queue_depth_minimum: rds 磁盘队列深度最小值
cloudwatch_aws_rds_disk_queue_depth_sample_count: rds 磁盘队列深度样本计数
cloudwatch_aws_rds_disk_queue_depth_sum: rds 磁盘队列深度总和
cloudwatch_aws_rds_ebs_byte_balance__average: rds ebs 字节余额平均值
cloudwatch_aws_rds_ebs_byte_balance__maximum: rds ebs 字节余额最大值
cloudwatch_aws_rds_ebs_byte_balance__minimum: rds ebs 字节余额最低
cloudwatch_aws_rds_ebs_byte_balance__sample_count: rds ebs 字节余额样本数
cloudwatch_aws_rds_ebs_byte_balance__sum: rds ebs 字节余额总和
cloudwatch_aws_rds_ebsio_balance__average: rds ebsio 余额平均值
cloudwatch_aws_rds_ebsio_balance__maximum: rds ebsio 余额最大值
cloudwatch_aws_rds_ebsio_balance__minimum: rds ebsio 余额最低
cloudwatch_aws_rds_ebsio_balance__sample_count: rds ebsio 平衡样本计数
cloudwatch_aws_rds_ebsio_balance__sum: rds ebsio 余额总和
cloudwatch_aws_rds_free_storage_space_average: rds 免费存储空间平均
cloudwatch_aws_rds_free_storage_space_maximum: rds 最大可用存储空间
cloudwatch_aws_rds_free_storage_space_minimum: rds 最低可用存储空间
cloudwatch_aws_rds_free_storage_space_sample_count: rds 可用存储空间样本数
cloudwatch_aws_rds_free_storage_space_sum: rds 免费存储空间总和
cloudwatch_aws_rds_freeable_memory_average: rds 可用内存平均值
cloudwatch_aws_rds_freeable_memory_maximum: rds 最大可用内存
cloudwatch_aws_rds_freeable_memory_minimum: rds 最小可用内存
cloudwatch_aws_rds_freeable_memory_sample_count: rds 可释放内存样本数
cloudwatch_aws_rds_freeable_memory_sum: rds 可释放内存总和
cloudwatch_aws_rds_lvm_read_iops_average: rds lvm 读取 iops 平均值
cloudwatch_aws_rds_lvm_read_iops_maximum: rds lvm 读取 iops 最大值
cloudwatch_aws_rds_lvm_read_iops_minimum: rds lvm 读取 iops 最低
cloudwatch_aws_rds_lvm_read_iops_sample_count: rds lvm 读取 iops 样本计数
cloudwatch_aws_rds_lvm_read_iops_sum: rds lvm 读取 iops 总和
cloudwatch_aws_rds_lvm_write_iops_average: rds lvm 写入 iops 平均值
cloudwatch_aws_rds_lvm_write_iops_maximum: rds lvm 写入 iops 最大值
cloudwatch_aws_rds_lvm_write_iops_minimum: rds lvm 写入 iops 最低
cloudwatch_aws_rds_lvm_write_iops_sample_count: rds lvm 写入 iops 样本计数
cloudwatch_aws_rds_lvm_write_iops_sum: rds lvm 写入 iops 总和
cloudwatch_aws_rds_network_receive_throughput_average: rds 网络接收吞吐量平均
cloudwatch_aws_rds_network_receive_throughput_maximum: rds 网络接收吞吐量最大值
cloudwatch_aws_rds_network_receive_throughput_minimum: rds 网络接收吞吐量最小值
cloudwatch_aws_rds_network_receive_throughput_sample_count: rds 网络接收吞吐量样本计数
cloudwatch_aws_rds_network_receive_throughput_sum: rds 网络接收吞吐量总和
cloudwatch_aws_rds_network_transmit_throughput_average: rds 网络传输吞吐量平均值
cloudwatch_aws_rds_network_transmit_throughput_maximum: rds 网络传输吞吐量最大
cloudwatch_aws_rds_network_transmit_throughput_minimum: rds 网络传输吞吐量最小值
cloudwatch_aws_rds_network_transmit_throughput_sample_count: rds 网络传输吞吐量样本计数
cloudwatch_aws_rds_network_transmit_throughput_sum: rds 网络传输吞吐量总和
cloudwatch_aws_rds_read_iops_average: rds 读取 iops 平均值
cloudwatch_aws_rds_read_iops_maximum: rds 最大读取 iops
cloudwatch_aws_rds_read_iops_minimum: rds 读取 iops 最低
cloudwatch_aws_rds_read_iops_sample_count: rds 读取 iops 样本计数
cloudwatch_aws_rds_read_iops_sum: rds 读取 iops 总和
cloudwatch_aws_rds_read_latency_average: rds 读取延迟平均值
cloudwatch_aws_rds_read_latency_maximum: rds 读取延迟最大值
cloudwatch_aws_rds_read_latency_minimum: rds 最小读取延迟
cloudwatch_aws_rds_read_latency_sample_count: rds 读取延迟样本计数
cloudwatch_aws_rds_read_latency_sum: rds 读取延迟总和
cloudwatch_aws_rds_read_throughput_average: rds 读取吞吐量平均值
cloudwatch_aws_rds_read_throughput_maximum: rds 最大读取吞吐量
cloudwatch_aws_rds_read_throughput_minimum: rds 最小读取吞吐量
cloudwatch_aws_rds_read_throughput_sample_count: rds 读取吞吐量样本计数
cloudwatch_aws_rds_read_throughput_sum: rds 读取吞吐量总和
cloudwatch_aws_rds_swap_usage_average: rds 交换使用平均值
cloudwatch_aws_rds_swap_usage_maximum: rds 交换使用最大值
cloudwatch_aws_rds_swap_usage_minimum: rds 交换使用量最低
cloudwatch_aws_rds_swap_usage_sample_count: rds 交换使用示例计数
cloudwatch_aws_rds_swap_usage_sum: rds 交换使用总和
cloudwatch_aws_rds_write_iops_average: rds 写入 iops 平均值
cloudwatch_aws_rds_write_iops_maximum: rds 写入 iops 最大值
cloudwatch_aws_rds_write_iops_minimum: rds 写入 iops 最低
cloudwatch_aws_rds_write_iops_sample_count: rds 写入 iops 样本计数
cloudwatch_aws_rds_write_iops_sum: rds 写入 iops 总和
cloudwatch_aws_rds_write_latency_average: rds 写入延迟平均值
cloudwatch_aws_rds_write_latency_maximum: rds 最大写入延迟
cloudwatch_aws_rds_write_latency_minimum: rds 写入延迟最小值
cloudwatch_aws_rds_write_latency_sample_count: rds 写入延迟样本计数
cloudwatch_aws_rds_write_latency_sum: rds 写入延迟总和
cloudwatch_aws_rds_write_throughput_average: rds 写入吞吐量平均值
cloudwatch_aws_rds_write_throughput_maximum: rds 最大写入吞吐量
cloudwatch_aws_rds_write_throughput_minimum: rds 写入吞吐量最小值
cloudwatch_aws_rds_write_throughput_sample_count: rds 写入吞吐量样本计数
cloudwatch_aws_rds_write_throughput_sum: rds 写入吞吐量总和
en:
cpu_usage_idle: "CPU idle rate(unit%)"
cpu_usage_active: "CPU usage rate(unit%)"
@@ -489,7 +368,7 @@ redis_last_key_groups_scrape_duration_milliseconds: Duration of the last key gro
redis_last_slow_execution_duration_seconds: The amount of time needed for last slow execution, in seconds.
redis_latest_fork_seconds: The amount of time needed for last fork, in seconds.
redis_lazyfree_pending_objects: The number of objects waiting to be freed (as a result of calling UNLINK, or FLUSHDB and FLUSHALL with the ASYNC option).
redis_master_repl_offset: The server's current replication offset.
redis_mem_clients_normal: Memory used by normal clients.(Gauge)
redis_mem_clients_slaves: Memory used by replica clients - Starting Redis 7.0, replica buffers share memory with the replication backlog, so this field can show 0 when replicas don't trigger an increase of memory usage.
redis_mem_fragmentation_bytes: Delta between used_memory_rss and used_memory. Note that when the total fragmentation bytes is low (few megabytes), a high ratio (e.g. 1.5 and above) is not an indication of an issue.
@@ -729,7 +608,7 @@ kafka_consumer_lag_millis: Current approximation of consumer lag for a ConsumerG
kafka_topic_partition_under_replicated_partition: 1 if Topic/Partition is under Replicated
# [zookeeper_exporter]
zk_znode_count: The total count of znodes stored
zk_ephemerals_count: The number of Ephemerals nodes
zk_watch_count: The number of watchers setup over Zookeeper nodes.
zk_approximate_data_size: Size of data in bytes that a zookeeper server has in its data tree
@@ -741,4 +620,4 @@ zk_open_file_descriptor_count: Number of file descriptors that a zookeeper serve
zk_max_file_descriptor_count: Maximum number of file descriptors that a zookeeper server can open
zk_avg_latency: Average time in milliseconds for requests to be processed
zk_min_latency: Minimum time in milliseconds for a request to be processed
zk_max_latency: Maximum time in milliseconds for a request to be processed

View File

@@ -13,9 +13,6 @@ EngineDelay = 120
DisableUsageReport = false
# config | database
ReaderFrom = "config"
[Log]
# log write dir
Dir = "logs"
@@ -158,8 +155,15 @@ BasicAuthUser = ""
BasicAuthPass = ""
# timeout settings, unit: ms
Timeout = 30000
DialTimeout = 3000
MaxIdleConnsPerHost = 100
DialTimeout = 10000
TLSHandshakeTimeout = 30000
ExpectContinueTimeout = 1000
IdleConnTimeout = 90000
# time duration, unit: ms
KeepAlive = 30000
MaxConnsPerHost = 0
MaxIdleConns = 100
MaxIdleConnsPerHost = 10
[WriterOpt]
# queue channel count

View File

@@ -12,7 +12,6 @@ import (
type AlertCurEvent struct {
Id int64 `json:"id" gorm:"primaryKey"`
Cate string `json:"cate"`
Cluster string `json:"cluster"`
GroupId int64 `json:"group_id"` // busi group id
GroupName string `json:"group_name"` // busi group name
@@ -156,7 +155,6 @@ func (e *AlertCurEvent) ToHis() *AlertHisEvent {
return &AlertHisEvent{
IsRecovered: isRecovered,
Cate: e.Cate,
Cluster: e.Cluster,
GroupId: e.GroupId,
GroupName: e.GroupName,
@@ -251,7 +249,7 @@ func (e *AlertCurEvent) FillNotifyGroups(cache map[int64]*UserGroup) error {
return nil
}
func AlertCurEventTotal(prod string, bgid, stime, etime int64, severity int, clusters, cates []string, query string) (int64, error) {
func AlertCurEventTotal(prod string, bgid, stime, etime int64, severity int, clusters []string, query string) (int64, error) {
session := DB().Model(&AlertCurEvent{}).Where("trigger_time between ? and ? and rule_prod = ?", stime, etime, prod)
if bgid > 0 {
@@ -266,10 +264,6 @@ func AlertCurEventTotal(prod string, bgid, stime, etime int64, severity int, clu
session = session.Where("cluster in ?", clusters)
}
if len(cates) > 0 {
session = session.Where("cate in ?", cates)
}
if query != "" {
arr := strings.Fields(query)
for i := 0; i < len(arr); i++ {
@@ -281,7 +275,7 @@ func AlertCurEventTotal(prod string, bgid, stime, etime int64, severity int, clu
return Count(session)
}
func AlertCurEventGets(prod string, bgid, stime, etime int64, severity int, clusters, cates []string, query string, limit, offset int) ([]AlertCurEvent, error) {
func AlertCurEventGets(prod string, bgid, stime, etime int64, severity int, clusters []string, query string, limit, offset int) ([]AlertCurEvent, error) {
session := DB().Where("trigger_time between ? and ? and rule_prod = ?", stime, etime, prod)
if bgid > 0 {
@@ -296,10 +290,6 @@ func AlertCurEventGets(prod string, bgid, stime, etime int64, severity int, clus
session = session.Where("cluster in ?", clusters)
}
if len(cates) > 0 {
session = session.Where("cate in ?", cates)
}
if query != "" {
arr := strings.Fields(query)
for i := 0; i < len(arr); i++ {

View File

@@ -7,7 +7,6 @@ import (
type AlertHisEvent struct {
Id int64 `json:"id" gorm:"primaryKey"`
Cate string `json:"cate"`
IsRecovered int `json:"is_recovered"`
Cluster string `json:"cluster"`
GroupId int64 `json:"group_id"`
@@ -92,7 +91,7 @@ func (e *AlertHisEvent) FillNotifyGroups(cache map[int64]*UserGroup) error {
return nil
}
func AlertHisEventTotal(prod string, bgid, stime, etime int64, severity int, recovered int, clusters, cates []string, query string) (int64, error) {
func AlertHisEventTotal(prod string, bgid, stime, etime int64, severity int, recovered int, clusters []string, query string) (int64, error) {
session := DB().Model(&AlertHisEvent{}).Where("last_eval_time between ? and ? and rule_prod = ?", stime, etime, prod)
if bgid > 0 {
@@ -111,10 +110,6 @@ func AlertHisEventTotal(prod string, bgid, stime, etime int64, severity int, rec
session = session.Where("cluster in ?", clusters)
}
if len(cates) > 0 {
session = session.Where("cate in ?", cates)
}
if query != "" {
arr := strings.Fields(query)
for i := 0; i < len(arr); i++ {
@@ -126,7 +121,7 @@ func AlertHisEventTotal(prod string, bgid, stime, etime int64, severity int, rec
return Count(session)
}
func AlertHisEventGets(prod string, bgid, stime, etime int64, severity int, recovered int, clusters, cates []string, query string, limit, offset int) ([]AlertHisEvent, error) {
func AlertHisEventGets(prod string, bgid, stime, etime int64, severity int, recovered int, clusters []string, query string, limit, offset int) ([]AlertHisEvent, error) {
session := DB().Where("last_eval_time between ? and ? and rule_prod = ?", stime, etime, prod)
if bgid > 0 {
@@ -145,10 +140,6 @@ func AlertHisEventGets(prod string, bgid, stime, etime int64, severity int, reco
session = session.Where("cluster in ?", clusters)
}
if len(cates) > 0 {
session = session.Where("cate in ?", cates)
}
if query != "" {
arr := strings.Fields(query)
for i := 0; i < len(arr); i++ {

View File

@@ -22,7 +22,6 @@ type TagFilter struct {
type AlertMute struct {
Id int64 `json:"id" gorm:"primaryKey"`
GroupId int64 `json:"group_id"`
Cate string `json:"cate"`
Prod string `json:"prod"` // product empty means n9e
Cluster string `json:"cluster"` // take effect by clusters, separated by space
Tags ormx.JSONArr `json:"tags"`
@@ -72,7 +71,7 @@ func (m *AlertMute) Verify() error {
}
if m.Etime <= m.Btime {
return fmt.Errorf("oops... etime(%d) <= btime(%d)", m.Etime, m.Btime)
return fmt.Errorf("Oops... etime(%d) <= btime(%d)", m.Etime, m.Btime)
}
if err := m.Parse(); err != nil {

View File

@@ -16,7 +16,6 @@ import (
type AlertRule struct {
Id int64 `json:"id" gorm:"primaryKey"`
GroupId int64 `json:"group_id"` // busi group id
Cate string `json:"cate"` // alert rule cate (prometheus|elasticsearch)
Cluster string `json:"cluster"` // take effect by clusters, separated by space
Name string `json:"name"` // rule name
Note string `json:"note"` // will sent in notify
@@ -337,7 +336,7 @@ func AlertRuleGetsByCluster(cluster string) ([]*AlertRule, error) {
return lr, err
}
func AlertRulesGetsBy(prods []string, query, algorithm, cluster string, cates []string, disabled int) ([]*AlertRule, error) {
func AlertRulesGetsBy(prods []string, query string) ([]*AlertRule, error) {
session := DB().Where("prod in (?)", prods)
if query != "" {
@@ -348,22 +347,6 @@ func AlertRulesGetsBy(prods []string, query, algorithm, cluster string, cates []
}
}
if algorithm != "" {
session = session.Where("algorithm = ?", algorithm)
}
if cluster != "" {
session = session.Where("cluster like ?", "%"+cluster+"%")
}
if len(cates) != 0 {
session = session.Where("cate in (?)", cates)
}
if disabled != -1 {
session = session.Where("disabled = ?", disabled)
}
var lst []*AlertRule
err := session.Find(&lst).Error
if err == nil {

View File

@@ -14,7 +14,6 @@ import (
type AlertSubscribe struct {
Id int64 `json:"id" gorm:"primaryKey"`
GroupId int64 `json:"group_id"`
Cate string `json:"cate"`
Cluster string `json:"cluster"` // take effect by clusters, separated by space
RuleId int64 `json:"rule_id"`
RuleName string `json:"rule_name" gorm:"-"` // for fe

View File

@@ -1,94 +0,0 @@
package models
import "time"
type AlertingEngines struct {
Id int64 `json:"id" gorm:"primaryKey"`
Instance string `json:"instance"`
Cluster string `json:"cluster"` // reader cluster
Clock int64 `json:"clock"`
}
func (e *AlertingEngines) TableName() string {
return "alerting_engines"
}
// UpdateCluster 页面上用户会给各个n9e-server分配要关联的目标集群是什么
func (e *AlertingEngines) UpdateCluster(c string) error {
e.Cluster = c
return DB().Model(e).Select("cluster").Updates(e).Error
}
// AlertingEngineGetCluster 根据实例名获取对应的集群名字
func AlertingEngineGetCluster(instance string) (string, error) {
var objs []AlertingEngines
err := DB().Where("instance=?", instance).Find(&objs).Error
if err != nil {
return "", err
}
if len(objs) == 0 {
return "", nil
}
return objs[0].Cluster, nil
}
// AlertingEngineGets 拉取列表数据,用户要在页面上看到所有 n9e-server 实例列表,然后为其分配 cluster
func AlertingEngineGets(where string, args ...interface{}) ([]*AlertingEngines, error) {
var objs []*AlertingEngines
var err error
session := DB().Order("instance")
if where == "" {
err = session.Find(&objs).Error
} else {
err = session.Where(where, args...).Find(&objs).Error
}
return objs, err
}
func AlertingEngineGet(where string, args ...interface{}) (*AlertingEngines, error) {
lst, err := AlertingEngineGets(where, args...)
if err != nil {
return nil, err
}
if len(lst) == 0 {
return nil, nil
}
return lst[0], nil
}
func AlertingEngineGetsInstances(where string, args ...interface{}) ([]string, error) {
var arr []string
var err error
session := DB().Model(new(AlertingEngines)).Order("instance")
if where == "" {
err = session.Pluck("instance", &arr).Error
} else {
err = session.Where(where, args...).Pluck("instance", &arr).Error
}
return arr, err
}
func AlertingEngineHeartbeat(instance string) error {
var total int64
err := DB().Model(new(AlertingEngines)).Where("instance=?", instance).Count(&total).Error
if err != nil {
return err
}
if total == 0 {
// insert
err = DB().Create(&AlertingEngines{
Instance: instance,
Clock: time.Now().Unix(),
}).Error
} else {
// update
err = DB().Model(new(AlertingEngines)).Where("instance=?", instance).Update("clock", time.Now().Unix()).Error
}
return err
}
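Taken together, the deleted model above implemented a small instance registry: each n9e-server heartbeats its instance name into `alerting_engines`, and an operator later assigns a reader cluster to that instance from the UI. Below is a hypothetical wiring sketch, assuming the `models` package above and an already-initialized database connection:

```go
package main

import (
	"log"
	"time"

	"github.com/didi/nightingale/v5/src/models"
)

// heartbeatLoop mirrors how a server instance would keep its alerting_engines
// row fresh and discover which reader cluster has been assigned to it.
func heartbeatLoop(endpoint string) {
	for {
		if err := models.AlertingEngineHeartbeat(endpoint); err != nil {
			log.Println("heartbeat failed:", err)
		}
		if cluster, err := models.AlertingEngineGetCluster(endpoint); err == nil && cluster != "" {
			log.Println("assigned reader cluster:", cluster)
		}
		time.Sleep(time.Second)
	}
}

func main() {
	heartbeatLoop("10.9.0.9:9090") // instance identification, as in the table comment
}
```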

View File

@@ -119,7 +119,7 @@ func (bg *BusiGroup) Del() error {
return errors.New("Some targets still in the BusiGroup")
}
has, err = Exists(DB().Model(&Board{}).Where("group_id=?", bg.Id))
has, err = Exists(DB().Model(&Dashboard{}).Where("group_id=?", bg.Id))
if err != nil {
return err
}

View File

@@ -116,10 +116,6 @@ func buildTargetWhere(bgid int64, clusters []string, query string) *gorm.DB {
return session
}
func TargetTotalCount() (int64, error) {
return Count(DB().Model(new(Target)))
}
func TargetTotal(bgid int64, clusters []string, query string) (int64, error) {
return Count(buildTargetWhere(bgid, clusters, query))
}

View File

@@ -70,10 +70,6 @@ func MustLoad(fpaths ...string) {
C.EngineDelay = 120
}
if C.ReaderFrom == "" {
C.ReaderFrom = "config"
}
if C.Heartbeat.IP == "" {
// auto detect
// C.Heartbeat.IP = fmt.Sprint(GetOutboundIP())
@@ -85,11 +81,7 @@ func MustLoad(fpaths ...string) {
os.Exit(1)
}
if strings.Contains(hostname, "localhost") {
fmt.Println("Warning! hostname contains substring localhost, setting a more unique hostname is recommended")
}
C.Heartbeat.IP = hostname
C.Heartbeat.IP = hostname + "+" + fmt.Sprint(os.Getpid())
// if C.Heartbeat.IP == "" {
// fmt.Println("heartbeat ip auto got is blank")
@@ -98,6 +90,7 @@ func MustLoad(fpaths ...string) {
}
C.Heartbeat.Endpoint = fmt.Sprintf("%s:%d", C.Heartbeat.IP, C.HTTP.Port)
C.Alerting.RedisPub.ChannelKey = C.Alerting.RedisPub.ChannelPrefix + C.ClusterName
if C.Alerting.Webhook.Enable {
if C.Alerting.Webhook.Timeout == "" {
@@ -187,10 +180,9 @@ type Config struct {
RunMode string
ClusterName string
BusiGroupLabelKey string
AnomalyDataApi []string
EngineDelay int64
DisableUsageReport bool
ReaderFrom string
ForceUseServerTS bool
Log logx.Config
HTTP httpx.Config
BasicAuth gin.Accounts
@@ -202,10 +194,29 @@ type Config struct {
DB ormx.DBConfig
WriterOpt WriterGlobalOpt
Writers []WriterOptions
Reader PromOption
Reader ReaderOptions
Ibex Ibex
}
type ReaderOptions struct {
Url string
BasicAuthUser string
BasicAuthPass string
Timeout int64
DialTimeout int64
TLSHandshakeTimeout int64
ExpectContinueTimeout int64
IdleConnTimeout int64
KeepAlive int64
MaxConnsPerHost int
MaxIdleConns int
MaxIdleConnsPerHost int
Headers []string
}
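The new `ReaderOptions` struct exposes most of `net/http`'s transport knobs (they mirror the TOML `[Reader]` settings earlier in this diff, all in milliseconds). The constructor that consumes them is not part of this diff, so the mapping below is only an assumed sketch of how such options would typically be wired onto an `http.Transport`:

```go
package config // hypothetical placement, next to the ReaderOptions type above

import (
	"net"
	"net/http"
	"time"
)

// transportFromReaderOptions is an assumed helper, not taken from the repo:
// it shows how each ReaderOptions field maps onto the standard http.Transport.
func transportFromReaderOptions(o ReaderOptions) *http.Transport {
	return &http.Transport{
		Proxy: http.ProxyFromEnvironment,
		DialContext: (&net.Dialer{
			Timeout:   time.Duration(o.DialTimeout) * time.Millisecond,
			KeepAlive: time.Duration(o.KeepAlive) * time.Millisecond,
		}).DialContext,
		ResponseHeaderTimeout: time.Duration(o.Timeout) * time.Millisecond,
		TLSHandshakeTimeout:   time.Duration(o.TLSHandshakeTimeout) * time.Millisecond,
		ExpectContinueTimeout: time.Duration(o.ExpectContinueTimeout) * time.Millisecond,
		IdleConnTimeout:       time.Duration(o.IdleConnTimeout) * time.Millisecond,
		MaxConnsPerHost:       o.MaxConnsPerHost,
		MaxIdleConns:          o.MaxIdleConns,
		MaxIdleConnsPerHost:   o.MaxIdleConnsPerHost,
	}
}
```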
type WriterOptions struct {
Url string
BasicAuthUser string
@@ -304,7 +315,7 @@ func (c *Config) IsDebugMode() bool {
// Get preferred outbound ip of this machine
func GetOutboundIP() net.IP {
conn, err := net.Dial("udp", "223.5.5.5:80")
conn, err := net.Dial("udp", "8.8.8.8:80")
if err != nil {
fmt.Println("auto get outbound ip fail:", err)
os.Exit(1)
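For context on the `223.5.5.5` / `8.8.8.8` change above: `GetOutboundIP` relies on a common Go trick where "dialing" a UDP socket sends no packets but still makes the OS choose the local address it would use to reach the target, which can then be read back. A self-contained sketch of the same technique:

```go
package main

import (
	"fmt"
	"net"
)

// outboundIP returns the local IP the OS would use to reach target.
// No traffic is actually sent: UDP "connections" are connectionless.
func outboundIP(target string) (net.IP, error) {
	conn, err := net.Dial("udp", target)
	if err != nil {
		return nil, err
	}
	defer conn.Close()
	return conn.LocalAddr().(*net.UDPAddr).IP, nil
}

func main() {
	ip, err := outboundIP("223.5.5.5:80") // or "8.8.8.8:80", as in the other revision
	if err != nil {
		fmt.Println("auto get outbound ip fail:", err)
		return
	}
	fmt.Println("preferred outbound ip:", ip)
}
```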

View File

@@ -1,59 +0,0 @@
package config
import (
"sync"
"github.com/didi/nightingale/v5/src/pkg/prom"
)
type PromClient struct {
prom.API
ClusterName string
sync.RWMutex
}
var ReaderClient *PromClient = &PromClient{}
func (pc *PromClient) Set(clusterName string, c prom.API) {
pc.Lock()
defer pc.Unlock()
pc.ClusterName = clusterName
pc.API = c
}
func (pc *PromClient) Get() (string, prom.API) {
pc.RLock()
defer pc.RUnlock()
return pc.ClusterName, pc.API
}
func (pc *PromClient) GetClusterName() string {
pc.RLock()
defer pc.RUnlock()
return pc.ClusterName
}
func (pc *PromClient) GetCli() prom.API {
pc.RLock()
defer pc.RUnlock()
return pc.API
}
func (pc *PromClient) IsNil() bool {
if pc == nil {
return true
}
pc.RLock()
defer pc.RUnlock()
return pc.API == nil
}
func (pc *PromClient) Reset() {
pc.Lock()
defer pc.Unlock()
pc.ClusterName = ""
pc.API = nil
}

View File

@@ -1,81 +0,0 @@
package config
import "sync"
type PromOption struct {
Url string
BasicAuthUser string
BasicAuthPass string
Timeout int64
DialTimeout int64
MaxIdleConnsPerHost int
Headers []string
}
func (po *PromOption) Equal(target PromOption) bool {
if po.Url != target.Url {
return false
}
if po.BasicAuthUser != target.BasicAuthUser {
return false
}
if po.BasicAuthPass != target.BasicAuthPass {
return false
}
if po.Timeout != target.Timeout {
return false
}
if po.DialTimeout != target.DialTimeout {
return false
}
if po.MaxIdleConnsPerHost != target.MaxIdleConnsPerHost {
return false
}
if len(po.Headers) != len(target.Headers) {
return false
}
for i := 0; i < len(po.Headers); i++ {
if po.Headers[i] != target.Headers[i] {
return false
}
}
return true
}
type PromOptionsStruct struct {
Data map[string]PromOption
sync.RWMutex
}
func (pos *PromOptionsStruct) Set(clusterName string, po PromOption) {
pos.Lock()
pos.Data[clusterName] = po
pos.Unlock()
}
func (pos *PromOptionsStruct) Sets(clusterName string, po PromOption) {
pos.Lock()
pos.Data = map[string]PromOption{clusterName: po}
pos.Unlock()
}
func (pos *PromOptionsStruct) Get(clusterName string) (PromOption, bool) {
pos.RLock()
defer pos.RUnlock()
ret, has := pos.Data[clusterName]
return ret, has
}
// Data key is cluster name
var PromOptions = &PromOptionsStruct{Data: make(map[string]PromOption)}

View File

@@ -1,131 +0,0 @@
package config
import (
"encoding/json"
"fmt"
"net"
"net/http"
"strings"
"time"
"github.com/didi/nightingale/v5/src/models"
"github.com/didi/nightingale/v5/src/pkg/prom"
"github.com/prometheus/client_golang/api"
"github.com/toolkits/pkg/logger"
)
func InitReader() error {
rf := strings.ToLower(strings.TrimSpace(C.ReaderFrom))
if rf == "" || rf == "config" {
return setClientFromPromOption(C.ClusterName, C.Reader)
}
if rf == "database" {
return initFromDatabase()
}
return fmt.Errorf("invalid configuration ReaderFrom: %s", rf)
}
func initFromDatabase() error {
go func() {
for {
loadFromDatabase()
time.Sleep(time.Second)
}
}()
return nil
}
func loadFromDatabase() {
cluster, err := models.AlertingEngineGetCluster(C.Heartbeat.Endpoint)
if err != nil {
logger.Errorf("failed to get current cluster, error: %v", err)
return
}
if cluster == "" {
ReaderClient.Reset()
logger.Warning("no datasource binded to me")
return
}
ckey := "prom." + cluster + ".option"
cval, err := models.ConfigsGet(ckey)
if err != nil {
logger.Errorf("failed to get ckey: %s, error: %v", ckey, err)
return
}
if cval == "" {
ReaderClient.Reset()
return
}
var po PromOption
err = json.Unmarshal([]byte(cval), &po)
if err != nil {
logger.Errorf("failed to unmarshal PromOption: %s", err)
return
}
if ReaderClient.IsNil() {
// first time
if err = setClientFromPromOption(cluster, po); err != nil {
logger.Errorf("failed to setClientFromPromOption: %v", err)
return
}
PromOptions.Sets(cluster, po)
return
}
localPo, has := PromOptions.Get(cluster)
if !has || !localPo.Equal(po) {
if err = setClientFromPromOption(cluster, po); err != nil {
logger.Errorf("failed to setClientFromPromOption: %v", err)
return
}
PromOptions.Sets(cluster, po)
return
}
}
func newClientFromPromOption(po PromOption) (api.Client, error) {
return api.NewClient(api.Config{
Address: po.Url,
RoundTripper: &http.Transport{
// TLSClientConfig: tlsConfig,
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: time.Duration(po.DialTimeout) * time.Millisecond,
}).DialContext,
ResponseHeaderTimeout: time.Duration(po.Timeout) * time.Millisecond,
MaxIdleConnsPerHost: po.MaxIdleConnsPerHost,
},
})
}
func setClientFromPromOption(clusterName string, po PromOption) error {
if clusterName == "" {
return fmt.Errorf("argument clusterName is blank")
}
if po.Url == "" {
return fmt.Errorf("prometheus url is blank")
}
cli, err := newClientFromPromOption(po)
if err != nil {
return fmt.Errorf("failed to newClientFromPromOption: %v", err)
}
ReaderClient.Set(clusterName, prom.NewAPI(cli, prom.ClientOptions{
BasicAuthUser: po.BasicAuthUser,
BasicAuthPass: po.BasicAuthPass,
Headers: po.Headers,
}))
return nil
}

View File

@@ -2,7 +2,6 @@ package engine
import (
"context"
"fmt"
"time"
"github.com/toolkits/pkg/logger"
@@ -28,18 +27,6 @@ func Start(ctx context.Context) error {
go sender.StartEmailSender()
go initReporter(func(em map[ErrorType]uint64) {
if len(em) == 0 {
return
}
title := fmt.Sprintf("server %s has some errors, please check server logs for detail", config.C.Heartbeat.IP)
msg := ""
for k, v := range em {
msg += fmt.Sprintf("error: %s, count: %d\n", k, v)
}
notifyToMaintainer(title, msg)
})
return nil
}
@@ -53,10 +40,6 @@ func Reload() {
func reportQueueSize() {
for {
time.Sleep(time.Second)
clusterName := config.ReaderClient.GetClusterName()
if clusterName == "" {
continue
}
promstat.GaugeAlertQueueSize.WithLabelValues(clusterName).Set(float64(EventQueue.Len()))
promstat.GaugeAlertQueueSize.WithLabelValues(config.C.ClusterName).Set(float64(EventQueue.Len()))
}
}
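`promstat.GaugeAlertQueueSize`, used in `reportQueueSize` above, is not defined in this diff. A gauge of that shape, labeled by cluster, would typically be declared with `client_golang` along these lines (package, namespace, and metric name are assumptions):

```go
package promstat // hypothetical package, mirroring the import alias above

import "github.com/prometheus/client_golang/prometheus"

// GaugeAlertQueueSize tracks how many alert events are waiting in the notify
// queue, partitioned by the cluster label written in reportQueueSize above.
var GaugeAlertQueueSize = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Namespace: "n9e",    // assumed namespace
		Subsystem: "server", // assumed subsystem
		Name:      "alert_queue_size",
		Help:      "Number of alert events waiting in the notify queue.",
	},
	[]string{"cluster"},
)

func init() {
	prometheus.MustRegister(GaugeAlertQueueSize)
}
```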

View File

@@ -101,13 +101,12 @@ func genNotice(event *models.AlertCurEvent) Notice {
return Notice{Event: event, Tpls: ntpls}
}
func alertingRedisPub(clusterName string, bs []byte) {
channelKey := config.C.Alerting.RedisPub.ChannelPrefix + clusterName
func alertingRedisPub(bs []byte) {
// pub all alerts to redis
if config.C.Alerting.RedisPub.Enable {
err := storage.Redis.Publish(context.Background(), channelKey, bs).Err()
err := storage.Redis.Publish(context.Background(), config.C.Alerting.RedisPub.ChannelKey, bs).Err()
if err != nil {
logger.Errorf("event_notify: redis publish %s err: %v", channelKey, err)
logger.Errorf("event_notify: redis publish %s err: %v", config.C.Alerting.RedisPub.ChannelKey, err)
}
}
}
@@ -250,7 +249,7 @@ func notify(event *models.AlertCurEvent) {
return
}
alertingRedisPub(event.Cluster, stdinBytes)
alertingRedisPub(stdinBytes)
alertingWebhook(event)
handleNotice(notice, stdinBytes)
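On the consuming side, anything subscribed to the RedisPub channel receives the same JSON event bytes that are handed to `handleNotice`. A minimal, self-contained subscriber sketch (assumes go-redis v8; after this change the channel key is `ChannelPrefix + ClusterName`, so the literal below is only an example):

```go
package main

import (
	"context"
	"fmt"

	"github.com/go-redis/redis/v8"
)

func main() {
	rdb := redis.NewClient(&redis.Options{Addr: "127.0.0.1:6379"})
	sub := rdb.Subscribe(context.Background(), "/alerts/Default") // hypothetical ChannelPrefix + ClusterName
	defer sub.Close()

	// each payload is one JSON-encoded alert event published by alertingRedisPub
	for msg := range sub.Channel() {
		fmt.Println("got alert event:", msg.Payload)
	}
}
```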

View File

@@ -19,22 +19,7 @@ type MaintainMessage struct {
Content string `json:"content"`
}
// notify to maintainer to handle the error
func notifyToMaintainer(title, msg string) {
logger.Errorf("notifyToMaintainer, msg: %s", msg)
users := memsto.UserCache.GetMaintainerUsers()
if len(users) == 0 {
return
}
triggerTime := time.Now().Format("2006/01/02 - 15:04:05")
notifyMaintainerWithPlugin(title, msg, triggerTime, users)
notifyMaintainerWithBuiltin(title, msg, triggerTime, users)
}
func notifyMaintainerWithPlugin(title, msg, triggerTime string, users []*models.User) {
func notifyMaintainerWithPlugin(e error, title, triggerTime string, users []*models.User) {
if !config.C.Alerting.CallPlugin.Enable {
return
}
@@ -42,7 +27,7 @@ func notifyMaintainerWithPlugin(title, msg, triggerTime string, users []*models.
stdinBytes, err := json.Marshal(MaintainMessage{
Tos: users,
Title: title,
Content: "Title: " + title + "\nContent: " + msg + "\nTime: " + triggerTime,
Content: "Title: " + title + "\nContent: " + e.Error() + "\nTime: " + triggerTime,
})
if err != nil {
@@ -54,7 +39,22 @@ func notifyMaintainerWithPlugin(title, msg, triggerTime string, users []*models.
logger.Debugf("notify maintainer with plugin done")
}
func notifyMaintainerWithBuiltin(title, msg, triggerTime string, users []*models.User) {
// notify to maintainer to handle the error
func notifyToMaintainer(e error, title string) {
logger.Errorf("notifyToMaintainer, title:%s, error:%v", title, e)
users := memsto.UserCache.GetMaintainerUsers()
if len(users) == 0 {
return
}
triggerTime := time.Now().Format("2006/01/02 - 15:04:05")
notifyMaintainerWithPlugin(e, title, triggerTime, users)
notifyMaintainerWithBuiltin(e, title, triggerTime, users)
}
func notifyMaintainerWithBuiltin(e error, title, triggerTime string, users []*models.User) {
if len(config.C.Alerting.NotifyBuiltinChannels) == 0 {
return
}
@@ -104,13 +104,13 @@ func notifyMaintainerWithBuiltin(title, msg, triggerTime string, users []*models
if len(emailset) == 0 {
continue
}
content := "Title: " + title + "\nContent: " + msg + "\nTime: " + triggerTime
content := "Title: " + title + "\nContent: " + e.Error() + "\nTime: " + triggerTime
sender.WriteEmail(title, content, StringSetKeys(emailset))
case "dingtalk":
if len(dingtalkset) == 0 {
continue
}
content := "**Title: **" + title + "\n**Content: **" + msg + "\n**Time: **" + triggerTime
content := "**Title: **" + title + "\n**Content: **" + e.Error() + "\n**Time: **" + triggerTime
sender.SendDingtalk(sender.DingtalkMessage{
Title: title,
Text: content,
@@ -121,7 +121,7 @@ func notifyMaintainerWithBuiltin(title, msg, triggerTime string, users []*models
if len(wecomset) == 0 {
continue
}
content := "**Title: **" + title + "\n**Content: **" + msg + "\n**Time: **" + triggerTime
content := "**Title: **" + title + "\n**Content: **" + e.Error() + "\n**Time: **" + triggerTime
sender.SendWecom(sender.WecomMessage{
Text: content,
Tokens: StringSetKeys(wecomset),
@@ -131,7 +131,7 @@ func notifyMaintainerWithBuiltin(title, msg, triggerTime string, users []*models
continue
}
content := "Title: " + title + "\nContent: " + msg + "\nTime: " + triggerTime
content := "Title: " + title + "\nContent: " + e.Error() + "\nTime: " + triggerTime
sender.SendFeishu(sender.FeishuMessage{
Text: content,
AtMobiles: phones,

View File

@@ -1,65 +0,0 @@
package engine
import (
"sync"
"time"
)
type ErrorType string
// register new error here
const (
QueryPrometheusError ErrorType = "QueryPrometheusError"
RuntimeError ErrorType = "RuntimeError"
)
type reporter struct {
sync.Mutex
em map[ErrorType]uint64
cb func(em map[ErrorType]uint64)
}
var rp reporter
func initReporter(cb func(em map[ErrorType]uint64)) {
rp = reporter{cb: cb, em: make(map[ErrorType]uint64)}
rp.Start()
}
func Report(errorType ErrorType) {
rp.report(errorType)
}
func (r *reporter) reset() map[ErrorType]uint64 {
r.Lock()
defer r.Unlock()
if len(r.em) == 0 {
return nil
}
oem := r.em
r.em = make(map[ErrorType]uint64)
return oem
}
func (r *reporter) report(errorType ErrorType) {
r.Lock()
defer r.Unlock()
if count, has := r.em[errorType]; has {
r.em[errorType] = count + 1
} else {
r.em[errorType] = 1
}
}
func (r *reporter) Start() {
for {
select {
case <-time.After(time.Minute):
cur := r.reset()
if cur != nil {
r.cb(cur)
}
}
}
}
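Note that `Start()` above loops forever, which is why `engine.go` earlier in this diff launches it with `go initReporter(...)`. A hypothetical usage sketch in the same package (the callback body is illustrative only):

```go
// startErrorReporting wires the reporter up: hot paths call Report(...) cheaply,
// and once a minute the aggregated counts are flushed to the callback.
func startErrorReporting() {
	go initReporter(func(em map[ErrorType]uint64) {
		for k, v := range em {
			logger.Errorf("error: %s, count: %d", k, v)
		}
	})
}

// elsewhere, e.g. after a failed TSDB query:
//   Report(QueryPrometheusError)
```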

View File

@@ -3,15 +3,15 @@ package engine
import (
"context"
"fmt"
"log"
"math/rand"
"sort"
"strings"
"sync"
"time"
"github.com/didi/nightingale/v5/src/server/writer"
"github.com/prometheus/common/model"
"github.com/toolkits/pkg/logger"
"github.com/toolkits/pkg/net/httplib"
"github.com/toolkits/pkg/str"
"github.com/didi/nightingale/v5/src/models"
@@ -20,6 +20,7 @@ import (
"github.com/didi/nightingale/v5/src/server/config"
"github.com/didi/nightingale/v5/src/server/memsto"
"github.com/didi/nightingale/v5/src/server/naming"
"github.com/didi/nightingale/v5/src/server/reader"
promstat "github.com/didi/nightingale/v5/src/server/stat"
)
@@ -59,88 +60,25 @@ func filterRules() {
}
Workers.Build(mines)
RuleEvalForExternal.Build()
}
type RuleEval struct {
rule *models.AlertRule
fires *AlertCurEventMap
pendings *AlertCurEventMap
fires map[string]*models.AlertCurEvent
pendings map[string]*models.AlertCurEvent
quit chan struct{}
}
type AlertCurEventMap struct {
sync.RWMutex
Data map[string]*models.AlertCurEvent
}
func (a *AlertCurEventMap) SetAll(data map[string]*models.AlertCurEvent) {
a.Lock()
defer a.Unlock()
a.Data = data
}
func (a *AlertCurEventMap) Set(key string, value *models.AlertCurEvent) {
a.Lock()
defer a.Unlock()
a.Data[key] = value
}
func (a *AlertCurEventMap) Get(key string) (*models.AlertCurEvent, bool) {
a.RLock()
defer a.RUnlock()
event, exists := a.Data[key]
return event, exists
}
func (a *AlertCurEventMap) UpdateLastEvalTime(key string, lastEvalTime int64) {
a.Lock()
defer a.Unlock()
event, exists := a.Data[key]
if !exists {
return
}
event.LastEvalTime = lastEvalTime
}
func (a *AlertCurEventMap) Delete(key string) {
a.Lock()
defer a.Unlock()
delete(a.Data, key)
}
func (a *AlertCurEventMap) Keys() []string {
a.RLock()
defer a.RUnlock()
keys := make([]string, 0, len(a.Data))
for k := range a.Data {
keys = append(keys, k)
}
return keys
}
func (a *AlertCurEventMap) GetAll() map[string]*models.AlertCurEvent {
a.RLock()
defer a.RUnlock()
return a.Data
}
func NewAlertCurEventMap() *AlertCurEventMap {
return &AlertCurEventMap{
Data: make(map[string]*models.AlertCurEvent),
}
}
func (r *RuleEval) Stop() {
func (r RuleEval) Stop() {
logger.Infof("rule_eval:%d stopping", r.RuleID())
close(r.quit)
}
func (r *RuleEval) RuleID() int64 {
func (r RuleEval) RuleID() int64 {
return r.rule.Id
}
func (r *RuleEval) Start() {
func (r RuleEval) Start() {
logger.Infof("rule_eval:%d started", r.RuleID())
for {
select {
@@ -149,7 +87,7 @@ func (r *RuleEval) Start() {
return
default:
r.Work()
logger.Debugf("rule executed, rule_eval:%d", r.RuleID())
logger.Debugf("rule executed, rule_id=%d", r.RuleID())
interval := r.rule.PromEvalInterval
if interval <= 0 {
interval = 10
@@ -159,29 +97,26 @@ func (r *RuleEval) Start() {
}
}
func (r *RuleEval) Work() {
type AnomalyPoint struct {
Data model.Matrix `json:"data"`
Err string `json:"error"`
}
func (r RuleEval) Work() {
promql := strings.TrimSpace(r.rule.PromQl)
if promql == "" {
logger.Errorf("rule_eval:%d promql is blank", r.RuleID())
return
}
if config.ReaderClient.IsNil() {
logger.Error("reader client is nil")
return
}
clusterName, readerClient := config.ReaderClient.Get()
var value model.Value
var err error
if r.rule.Algorithm == "" && (r.rule.Cate == "" || r.rule.Cate == "prometheus") {
if r.rule.Algorithm == "" {
var warnings prom.Warnings
value, warnings, err = readerClient.Query(context.Background(), promql, time.Now())
value, warnings, err = reader.Client.Query(context.Background(), promql, time.Now())
if err != nil {
logger.Errorf("rule_eval:%d promql:%s, error:%v", r.RuleID(), promql, err)
//notifyToMaintainer(err, "failed to query prometheus")
Report(QueryPrometheusError)
notifyToMaintainer(err, "failed to query prometheus")
return
}
@@ -189,18 +124,34 @@ func (r *RuleEval) Work() {
logger.Errorf("rule_eval:%d promql:%s, warnings:%v", r.RuleID(), promql, warnings)
return
}
logger.Debugf("rule_eval:%d promql:%s, value:%v", r.RuleID(), promql, value)
} else {
var res AnomalyPoint
count := len(config.C.AnomalyDataApi)
for _, i := range rand.Perm(count) {
url := fmt.Sprintf("%s?rid=%d", config.C.AnomalyDataApi[i], r.rule.Id)
err = httplib.Get(url).SetTimeout(time.Duration(3000) * time.Millisecond).ToJSON(&res)
if err != nil {
logger.Errorf("curl %s fail: %v", url, err)
continue
}
if res.Err != "" {
logger.Errorf("curl %s fail: %s", url, res.Err)
continue
}
value = res.Data
logger.Debugf("curl %s get: %+v", url, res.Data)
}
}
r.Judge(clusterName, conv.ConvertVectors(value))
r.judge(conv.ConvertVectors(value))
}
type WorkersType struct {
rules map[string]*RuleEval
rules map[string]RuleEval
recordRules map[string]RecordingRuleEval
}
var Workers = &WorkersType{rules: make(map[string]*RuleEval), recordRules: make(map[string]RecordingRuleEval)}
var Workers = &WorkersType{rules: make(map[string]RuleEval), recordRules: make(map[string]RecordingRuleEval)}
func (ws *WorkersType) Build(rids []int64) {
rules := make(map[string]*models.AlertRule)
@@ -246,13 +197,12 @@ func (ws *WorkersType) Build(rids []int64) {
elst[i].DB2Mem()
firemap[elst[i].Hash] = elst[i]
}
fires := NewAlertCurEventMap()
fires.SetAll(firemap)
re := &RuleEval{
re := RuleEval{
rule: rules[hash],
quit: make(chan struct{}),
fires: fires,
pendings: NewAlertCurEventMap(),
fires: firemap,
pendings: make(map[string]*models.AlertCurEvent),
}
go re.Start()
@@ -307,31 +257,20 @@ func (ws *WorkersType) BuildRe(rids []int64) {
}
}
func (r *RuleEval) Judge(clusterName string, vectors []conv.Vector) {
now := time.Now().Unix()
alertingKeys, ruleExists := r.MakeNewEvent("inner", now, clusterName, vectors)
if !ruleExists {
return
}
// handle recovered events
r.recoverRule(alertingKeys, now)
}
func (r *RuleEval) MakeNewEvent(from string, now int64, clusterName string, vectors []conv.Vector) (map[string]struct{}, bool) {
func (r RuleEval) judge(vectors []conv.Vector) {
// 有可能rule的一些配置已经发生变化比如告警接收人、callbacks等
// 这些信息的修改是不会引起worker restart的但是确实会影响告警处理逻辑
// 所以这里直接从memsto.AlertRuleCache中获取并覆盖
curRule := memsto.AlertRuleCache.Get(r.rule.Id)
if curRule == nil {
return map[string]struct{}{}, false
return
}
r.rule = curRule
count := len(vectors)
alertingKeys := make(map[string]struct{})
now := time.Now().Unix()
for i := 0; i < count; i++ {
// compute hash
hash := str.MD5(fmt.Sprintf("%d_%s", r.rule.Id, vectors[i].Key))
@@ -339,7 +278,6 @@ func (r *RuleEval) MakeNewEvent(from string, now int64, clusterName string, vect
// rule disabled in this time span?
if isNoneffective(vectors[i].Timestamp, r.rule) {
logger.Debugf("event_disabled: rule_eval:%d rule:%v timestamp:%d", r.rule.Id, r.rule, vectors[i].Timestamp)
continue
}
@@ -360,15 +298,16 @@ func (r *RuleEval) MakeNewEvent(from string, now int64, clusterName string, vect
// handle target note
targetIdent, has := vectors[i].Labels["ident"]
targetNote := ""
targetCluster := ""
if has {
target, exists := memsto.TargetCache.Get(string(targetIdent))
if exists {
targetNote = target.Note
targetCluster = target.Cluster
// 对于包含ident的告警事件check一下ident所属bg和rule所属bg是否相同
// 如果告警规则选择了只在本BG生效那其他BG的机器就不能因此规则产生告警
if r.rule.EnableInBG == 1 && target.GroupId != r.rule.GroupId {
logger.Debugf("event_enable_in_bg: rule_eval:%d", r.rule.Id)
continue
}
}
@@ -395,8 +334,7 @@ func (r *RuleEval) MakeNewEvent(from string, now int64, clusterName string, vect
tagsArr := labelMapToArr(tagsMap)
sort.Strings(tagsArr)
event.Cluster = clusterName
event.Cate = r.rule.Cate
event.Cluster = targetCluster
event.Hash = hash
event.RuleId = r.rule.Id
event.RuleName = r.rule.Name
@@ -422,15 +360,12 @@ func (r *RuleEval) MakeNewEvent(from string, now int64, clusterName string, vect
event.Tags = strings.Join(tagsArr, ",,")
event.IsRecovered = false
event.LastEvalTime = now
if from != "inner" {
event.LastEvalTime = event.TriggerTime
}
r.handleNewEvent(event)
}
return alertingKeys, true
// handle recovered events
r.recoverRule(alertingKeys, now)
}
func readableValue(value float64) string {
@@ -454,30 +389,26 @@ func labelMapToArr(m map[string]string) []string {
return labelStrings
}
func (r *RuleEval) handleNewEvent(event *models.AlertCurEvent) {
func (r RuleEval) handleNewEvent(event *models.AlertCurEvent) {
if event.PromForDuration == 0 {
r.fireEvent(event)
return
}
var preTriggerTime int64
preEvent, has := r.pendings.Get(event.Hash)
_, has := r.pendings[event.Hash]
if has {
r.pendings.UpdateLastEvalTime(event.Hash, event.LastEvalTime)
preTriggerTime = preEvent.TriggerTime
r.pendings[event.Hash].LastEvalTime = event.LastEvalTime
} else {
r.pendings.Set(event.Hash, event)
preTriggerTime = event.TriggerTime
r.pendings[event.Hash] = event
}
if event.LastEvalTime-preTriggerTime+int64(event.PromEvalInterval) >= int64(event.PromForDuration) {
if r.pendings[event.Hash].LastEvalTime-r.pendings[event.Hash].TriggerTime+int64(event.PromEvalInterval) >= int64(event.PromForDuration) {
r.fireEvent(event)
}
}
func (r *RuleEval) fireEvent(event *models.AlertCurEvent) {
if fired, has := r.fires.Get(event.Hash); has {
r.fires.UpdateLastEvalTime(event.Hash, event.LastEvalTime)
func (r RuleEval) fireEvent(event *models.AlertCurEvent) {
if fired, has := r.fires[event.Hash]; has {
r.fires[event.Hash].LastEvalTime = event.LastEvalTime
if r.rule.NotifyRepeatStep == 0 {
// 说明不想重复通知那就直接返回了nothing to do
@@ -510,87 +441,66 @@ func (r *RuleEval) fireEvent(event *models.AlertCurEvent) {
}
}
func (r *RuleEval) recoverRule(alertingKeys map[string]struct{}, now int64) {
for _, hash := range r.pendings.Keys() {
if _, has := alertingKeys[hash]; has {
continue
}
r.pendings.Delete(hash)
}
for hash, event := range r.fires.GetAll() {
func (r RuleEval) recoverRule(alertingKeys map[string]struct{}, now int64) {
for hash := range r.pendings {
if _, has := alertingKeys[hash]; has {
continue
}
r.recoverEvent(hash, event, now)
delete(r.pendings, hash)
}
for hash, event := range r.fires {
if _, has := alertingKeys[hash]; has {
continue
}
// 如果配置了留观时长,就不能立马恢复了
if r.rule.RecoverDuration > 0 && now-event.LastEvalTime < r.rule.RecoverDuration {
continue
}
// 没查到触发阈值的vector姑且就认为这个vector的值恢复了
// 我确实无法分辨是prom中有值但是未满足阈值所以没返回还是prom中确实丢了一些点导致没有数据可以返回尴尬
delete(r.fires, hash)
delete(r.pendings, hash)
event.IsRecovered = true
event.LastEvalTime = now
// 可能是因为调整了promql才恢复的所以事件里边要体现最新的promql否则用户会比较困惑
// 当然其实rule的各个字段都可能发生变化了都更新一下吧
event.RuleName = r.rule.Name
event.RuleNote = r.rule.Note
event.RuleProd = r.rule.Prod
event.RuleAlgo = r.rule.Algorithm
event.Severity = r.rule.Severity
event.PromForDuration = r.rule.PromForDuration
event.PromQl = r.rule.PromQl
event.PromEvalInterval = r.rule.PromEvalInterval
event.Callbacks = r.rule.Callbacks
event.CallbacksJSON = r.rule.CallbacksJSON
event.RunbookUrl = r.rule.RunbookUrl
event.NotifyRecovered = r.rule.NotifyRecovered
event.NotifyChannels = r.rule.NotifyChannels
event.NotifyChannelsJSON = r.rule.NotifyChannelsJSON
event.NotifyGroups = r.rule.NotifyGroups
event.NotifyGroupsJSON = r.rule.NotifyGroupsJSON
r.pushEventToQueue(event)
}
}
func (r *RuleEval) RecoverEvent(hash string, now int64, value float64) {
curRule := memsto.AlertRuleCache.Get(r.rule.Id)
if curRule == nil {
return
}
r.rule = curRule
r.pendings.Delete(hash)
event, has := r.fires.Get(hash)
if !has {
return
}
event.TriggerValue = fmt.Sprintf("%.5f", value)
r.recoverEvent(hash, event, now)
}
func (r *RuleEval) recoverEvent(hash string, event *models.AlertCurEvent, now int64) {
// 如果配置了留观时长,就不能立马恢复了
if r.rule.RecoverDuration > 0 && now-event.LastEvalTime < r.rule.RecoverDuration {
return
}
// 没查到触发阈值的vector姑且就认为这个vector的值恢复了
// 我确实无法分辨是prom中有值但是未满足阈值所以没返回还是prom中确实丢了一些点导致没有数据可以返回尴尬
r.fires.Delete(hash)
r.pendings.Delete(hash)
event.IsRecovered = true
event.LastEvalTime = now
// 可能是因为调整了promql才恢复的所以事件里边要体现最新的promql否则用户会比较困惑
// 当然其实rule的各个字段都可能发生变化了都更新一下吧
event.RuleName = r.rule.Name
event.RuleNote = r.rule.Note
event.RuleProd = r.rule.Prod
event.RuleAlgo = r.rule.Algorithm
event.Severity = r.rule.Severity
event.PromForDuration = r.rule.PromForDuration
event.PromQl = r.rule.PromQl
event.PromEvalInterval = r.rule.PromEvalInterval
event.Callbacks = r.rule.Callbacks
event.CallbacksJSON = r.rule.CallbacksJSON
event.RunbookUrl = r.rule.RunbookUrl
event.NotifyRecovered = r.rule.NotifyRecovered
event.NotifyChannels = r.rule.NotifyChannels
event.NotifyChannelsJSON = r.rule.NotifyChannelsJSON
event.NotifyGroups = r.rule.NotifyGroups
event.NotifyGroupsJSON = r.rule.NotifyGroupsJSON
r.pushEventToQueue(event)
}
func (r *RuleEval) pushEventToQueue(event *models.AlertCurEvent) {
func (r RuleEval) pushEventToQueue(event *models.AlertCurEvent) {
if !event.IsRecovered {
event.LastSentTime = event.LastEvalTime
r.fires.Set(event.Hash, event)
r.fires[event.Hash] = event
}
promstat.CounterAlertsTotal.WithLabelValues(event.Cluster).Inc()
promstat.CounterAlertsTotal.WithLabelValues(config.C.ClusterName).Inc()
LogEvent(event, "push_queue")
if !EventQueue.PushFront(event) {
logger.Warningf("event_push_queue: queue is full")
}
}
func filterRecordingRules() {
ids := memsto.RecordingRuleCache.GetRuleIds()
@@ -651,12 +561,7 @@ func (r RecordingRuleEval) Work() {
return
}
if config.ReaderClient.IsNil() {
log.Println("reader client is nil")
return
}
value, warnings, err := config.ReaderClient.GetCli().Query(context.Background(), promql, time.Now())
value, warnings, err := reader.Client.Query(context.Background(), promql, time.Now())
if err != nil {
logger.Errorf("recording_rule_eval:%d promql:%s, error:%v", r.RuleID(), promql, err)
return
@@ -673,82 +578,3 @@ func (r RecordingRuleEval) Work() {
}
}
}
type RuleEvalForExternalType struct {
sync.RWMutex
rules map[int64]RuleEval
}
var RuleEvalForExternal = RuleEvalForExternalType{rules: make(map[int64]RuleEval)}
func (re *RuleEvalForExternalType) Build() {
rids := memsto.AlertRuleCache.GetRuleIds()
rules := make(map[int64]*models.AlertRule)
for i := 0; i < len(rids); i++ {
rule := memsto.AlertRuleCache.Get(rids[i])
if rule == nil {
continue
}
re.Lock()
rules[rule.Id] = rule
re.Unlock()
}
// stop old
for rid := range re.rules {
if _, has := rules[rid]; !has {
re.Lock()
delete(re.rules, rid)
re.Unlock()
}
}
// start new
re.Lock()
defer re.Unlock()
for rid := range rules {
if _, has := re.rules[rid]; has {
// already exists
continue
}
elst, err := models.AlertCurEventGetByRule(rules[rid].Id)
if err != nil {
logger.Errorf("worker_build: AlertCurEventGetByRule failed: %v", err)
continue
}
firemap := make(map[string]*models.AlertCurEvent)
for i := 0; i < len(elst); i++ {
elst[i].DB2Mem()
firemap[elst[i].Hash] = elst[i]
}
fires := NewAlertCurEventMap()
fires.SetAll(firemap)
newRe := RuleEval{
rule: rules[rid],
quit: make(chan struct{}),
fires: fires,
pendings: NewAlertCurEventMap(),
}
re.rules[rid] = newRe
}
}
func (re *RuleEvalForExternalType) Get(rid int64) (RuleEval, bool) {
rule := memsto.AlertRuleCache.Get(rid)
if rule == nil {
return RuleEval{}, false
}
re.RLock()
defer re.RUnlock()
if ret, has := re.rules[rid]; has {
// already exists
return ret, has
}
return RuleEval{}, false
}
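RuleEvalForExternalType.Build follows a reconcile pattern: the rule set from AlertRuleCache is the desired state, and the map of running evaluators is adjusted so the two converge, with each newly started evaluator's fires map pre-seeded from alert_cur_event so that recoveries still match events raised before a restart. A generic sketch of the diff-and-reconcile step, with hypothetical callbacks standing in for starting and stopping a RuleEval:

```go
package main

import "fmt"

// reconcile stops workers whose id no longer appears in desired and starts
// workers for ids that are not yet running. start/stop are callbacks so the
// sketch stays independent of RuleEval; all names here are illustrative.
func reconcile(running map[int64]bool, desired map[int64]bool,
	start func(id int64), stop func(id int64)) {
	for id := range running {
		if !desired[id] {
			stop(id)
			delete(running, id)
		}
	}
	for id := range desired {
		if !running[id] {
			start(id)
			running[id] = true
		}
	}
}

func main() {
	running := map[int64]bool{1: true, 2: true}
	desired := map[int64]bool{2: true, 3: true}
	reconcile(running, desired,
		func(id int64) { fmt.Println("start", id) },
		func(id int64) { fmt.Println("stop", id) })
	fmt.Println(running) // map[2:true 3:true]
}
```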

View File

@@ -41,10 +41,6 @@ func toRedis() {
return
}
if config.ReaderClient.IsNil() {
return
}
now := time.Now().Unix()
// clean old idents
@@ -53,7 +49,7 @@ func toRedis() {
Idents.Remove(key)
} else {
// use now as timestamp to redis
err := storage.Redis.HSet(context.Background(), redisKey(config.ReaderClient.GetClusterName()), key, now).Err()
err := storage.Redis.HSet(context.Background(), redisKey(config.C.ClusterName), key, now).Err()
if err != nil {
logger.Errorf("redis hset idents failed: %v", err)
}
@@ -107,14 +103,8 @@ func pushMetrics() {
return
}
clusterName := config.ReaderClient.GetClusterName()
if clusterName == "" {
logger.Warning("cluster name is blank")
return
}
// get all the target heartbeat timestamp
ret, err := storage.Redis.HGetAll(context.Background(), redisKey(clusterName)).Result()
ret, err := storage.Redis.HGetAll(context.Background(), redisKey(config.C.ClusterName)).Result()
if err != nil {
logger.Errorf("handle_idents: redis hgetall fail: %v", err)
return
@@ -131,7 +121,7 @@ func pushMetrics() {
}
if now-clock > dur {
clearDeadIdent(context.Background(), clusterName, ident)
clearDeadIdent(context.Background(), config.C.ClusterName, ident)
} else {
actives[ident] = struct{}{}
}
@@ -163,7 +153,7 @@ func pushMetrics() {
if !has {
// target not exists
target = &models.Target{
Cluster: clusterName,
Cluster: config.C.ClusterName,
Ident: active,
Tags: "",
TagsJSON: []string{},

View File

@@ -27,15 +27,6 @@ var AlertMuteCache = AlertMuteCacheType{
mutes: make(map[int64][]*models.AlertMute),
}
func (amc *AlertMuteCacheType) Reset() {
amc.Lock()
defer amc.Unlock()
amc.statTotal = -1
amc.statLastUpdated = -1
amc.mutes = make(map[int64][]*models.AlertMute)
}
func (amc *AlertMuteCacheType) StatChanged(total, lastUpdated int64) bool {
if amc.statTotal == total && amc.statLastUpdated == lastUpdated {
return false
@@ -99,26 +90,19 @@ func loopSyncAlertMutes() {
func syncAlertMutes() error {
start := time.Now()
clusterName := config.ReaderClient.GetClusterName()
if clusterName == "" {
AlertMuteCache.Reset()
logger.Warning("cluster name is blank")
return nil
}
stat, err := models.AlertMuteStatistics(clusterName)
stat, err := models.AlertMuteStatistics(config.C.ClusterName)
if err != nil {
return errors.WithMessage(err, "failed to exec AlertMuteStatistics")
}
if !AlertMuteCache.StatChanged(stat.Total, stat.LastUpdated) {
promstat.GaugeCronDuration.WithLabelValues(clusterName, "sync_alert_mutes").Set(0)
promstat.GaugeSyncNumber.WithLabelValues(clusterName, "sync_alert_mutes").Set(0)
promstat.GaugeCronDuration.WithLabelValues(config.C.ClusterName, "sync_alert_mutes").Set(0)
promstat.GaugeSyncNumber.WithLabelValues(config.C.ClusterName, "sync_alert_mutes").Set(0)
logger.Debug("alert mutes not changed")
return nil
}
lst, err := models.AlertMuteGetsByCluster(clusterName)
lst, err := models.AlertMuteGetsByCluster(config.C.ClusterName)
if err != nil {
return errors.WithMessage(err, "failed to exec AlertMuteGetsByCluster")
}
@@ -138,8 +122,8 @@ func syncAlertMutes() error {
AlertMuteCache.Set(oks, stat.Total, stat.LastUpdated)
ms := time.Since(start).Milliseconds()
promstat.GaugeCronDuration.WithLabelValues(clusterName, "sync_alert_mutes").Set(float64(ms))
promstat.GaugeSyncNumber.WithLabelValues(clusterName, "sync_alert_mutes").Set(float64(len(lst)))
promstat.GaugeCronDuration.WithLabelValues(config.C.ClusterName, "sync_alert_mutes").Set(float64(ms))
promstat.GaugeSyncNumber.WithLabelValues(config.C.ClusterName, "sync_alert_mutes").Set(float64(len(lst)))
logger.Infof("timer: sync mutes done, cost: %dms, number: %d", ms, len(lst))
return nil
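Every memsto sync loop in this change follows the same short-circuit: fetch a cheap (total, last_updated) statistic first, and skip the full reload plus gauge updates when neither value moved. A minimal sketch of that pattern, with placeholder closures standing in for the models/promstat calls:

```go
package main

import "fmt"

type stats struct{ Total, LastUpdated int64 }

// cache remembers the statistics it last synced against; StatChanged is the
// same comparison used by AlertMuteCache, AlertRuleCache, TargetCache, etc.
type cache struct{ statTotal, statLastUpdated int64 }

func (c *cache) StatChanged(total, lastUpdated int64) bool {
	if c.statTotal == total && c.statLastUpdated == lastUpdated {
		return false
	}
	return true
}

// sync only reloads the full record set when the statistics moved.
func (c *cache) sync(fetchStat func() stats, fetchAll func() int) {
	st := fetchStat()
	if !c.StatChanged(st.Total, st.LastUpdated) {
		fmt.Println("not changed, skip full reload")
		return
	}
	n := fetchAll()
	c.statTotal, c.statLastUpdated = st.Total, st.LastUpdated
	fmt.Println("reloaded", n, "records")
}

func main() {
	c := &cache{}
	c.sync(func() stats { return stats{10, 100} }, func() int { return 10 }) // reloads
	c.sync(func() stats { return stats{10, 100} }, func() int { return 10 }) // skips
}
```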

View File

@@ -27,15 +27,6 @@ var AlertRuleCache = AlertRuleCacheType{
rules: make(map[int64]*models.AlertRule),
}
func (arc *AlertRuleCacheType) Reset() {
arc.Lock()
defer arc.Unlock()
arc.statTotal = -1
arc.statLastUpdated = -1
arc.rules = make(map[int64]*models.AlertRule)
}
func (arc *AlertRuleCacheType) StatChanged(total, lastUpdated int64) bool {
if arc.statTotal == total && arc.statLastUpdated == lastUpdated {
return false
@@ -96,26 +87,19 @@ func loopSyncAlertRules() {
func syncAlertRules() error {
start := time.Now()
clusterName := config.ReaderClient.GetClusterName()
if clusterName == "" {
AlertRuleCache.Reset()
logger.Warning("cluster name is blank")
return nil
}
stat, err := models.AlertRuleStatistics(clusterName)
stat, err := models.AlertRuleStatistics(config.C.ClusterName)
if err != nil {
return errors.WithMessage(err, "failed to exec AlertRuleStatistics")
}
if !AlertRuleCache.StatChanged(stat.Total, stat.LastUpdated) {
promstat.GaugeCronDuration.WithLabelValues(clusterName, "sync_alert_rules").Set(0)
promstat.GaugeSyncNumber.WithLabelValues(clusterName, "sync_alert_rules").Set(0)
promstat.GaugeCronDuration.WithLabelValues(config.C.ClusterName, "sync_alert_rules").Set(0)
promstat.GaugeSyncNumber.WithLabelValues(config.C.ClusterName, "sync_alert_rules").Set(0)
logger.Debug("alert rules not changed")
return nil
}
lst, err := models.AlertRuleGetsByCluster(clusterName)
lst, err := models.AlertRuleGetsByCluster(config.C.ClusterName)
if err != nil {
return errors.WithMessage(err, "failed to exec AlertRuleGetsByCluster")
}
@@ -128,8 +112,8 @@ func syncAlertRules() error {
AlertRuleCache.Set(m, stat.Total, stat.LastUpdated)
ms := time.Since(start).Milliseconds()
promstat.GaugeCronDuration.WithLabelValues(clusterName, "sync_alert_rules").Set(float64(ms))
promstat.GaugeSyncNumber.WithLabelValues(clusterName, "sync_alert_rules").Set(float64(len(m)))
promstat.GaugeCronDuration.WithLabelValues(config.C.ClusterName, "sync_alert_rules").Set(float64(ms))
promstat.GaugeSyncNumber.WithLabelValues(config.C.ClusterName, "sync_alert_rules").Set(float64(len(m)))
logger.Infof("timer: sync rules done, cost: %dms, number: %d", ms, len(m))
return nil

View File

@@ -27,15 +27,6 @@ var AlertSubscribeCache = AlertSubscribeCacheType{
subs: make(map[int64][]*models.AlertSubscribe),
}
func (c *AlertSubscribeCacheType) Reset() {
c.Lock()
defer c.Unlock()
c.statTotal = -1
c.statLastUpdated = -1
c.subs = make(map[int64][]*models.AlertSubscribe)
}
func (c *AlertSubscribeCacheType) StatChanged(total, lastUpdated int64) bool {
if c.statTotal == total && c.statLastUpdated == lastUpdated {
return false
@@ -102,26 +93,19 @@ func loopSyncAlertSubscribes() {
func syncAlertSubscribes() error {
start := time.Now()
clusterName := config.ReaderClient.GetClusterName()
if clusterName == "" {
AlertSubscribeCache.Reset()
logger.Warning("cluster name is blank")
return nil
}
stat, err := models.AlertSubscribeStatistics(clusterName)
stat, err := models.AlertSubscribeStatistics(config.C.ClusterName)
if err != nil {
return errors.WithMessage(err, "failed to exec AlertSubscribeStatistics")
}
if !AlertSubscribeCache.StatChanged(stat.Total, stat.LastUpdated) {
promstat.GaugeCronDuration.WithLabelValues(clusterName, "sync_alert_subscribes").Set(0)
promstat.GaugeSyncNumber.WithLabelValues(clusterName, "sync_alert_subscribes").Set(0)
promstat.GaugeCronDuration.WithLabelValues(config.C.ClusterName, "sync_alert_subscribes").Set(0)
promstat.GaugeSyncNumber.WithLabelValues(config.C.ClusterName, "sync_alert_subscribes").Set(0)
logger.Debug("alert subscribes not changed")
return nil
}
lst, err := models.AlertSubscribeGetsByCluster(clusterName)
lst, err := models.AlertSubscribeGetsByCluster(config.C.ClusterName)
if err != nil {
return errors.WithMessage(err, "failed to exec AlertSubscribeGetsByCluster")
}
@@ -141,8 +125,8 @@ func syncAlertSubscribes() error {
AlertSubscribeCache.Set(subs, stat.Total, stat.LastUpdated)
ms := time.Since(start).Milliseconds()
promstat.GaugeCronDuration.WithLabelValues(clusterName, "sync_alert_subscribes").Set(float64(ms))
promstat.GaugeSyncNumber.WithLabelValues(clusterName, "sync_alert_subscribes").Set(float64(len(lst)))
promstat.GaugeCronDuration.WithLabelValues(config.C.ClusterName, "sync_alert_subscribes").Set(float64(ms))
promstat.GaugeSyncNumber.WithLabelValues(config.C.ClusterName, "sync_alert_subscribes").Set(float64(len(lst)))
logger.Infof("timer: sync subscribes done, cost: %dms, number: %d", ms, len(lst))
return nil

View File

@@ -79,14 +79,9 @@ func syncBusiGroups() error {
return errors.WithMessage(err, "failed to exec BusiGroupStatistics")
}
clusterName := config.ReaderClient.GetClusterName()
if !BusiGroupCache.StatChanged(stat.Total, stat.LastUpdated) {
if clusterName != "" {
promstat.GaugeCronDuration.WithLabelValues(clusterName, "sync_busi_groups").Set(0)
promstat.GaugeSyncNumber.WithLabelValues(clusterName, "sync_busi_groups").Set(0)
}
promstat.GaugeCronDuration.WithLabelValues(config.C.ClusterName, "sync_busi_groups").Set(0)
promstat.GaugeSyncNumber.WithLabelValues(config.C.ClusterName, "sync_busi_groups").Set(0)
logger.Debug("busi_group not changed")
return nil
}
@@ -99,11 +94,8 @@ func syncBusiGroups() error {
BusiGroupCache.Set(m, stat.Total, stat.LastUpdated)
ms := time.Since(start).Milliseconds()
if clusterName != "" {
promstat.GaugeCronDuration.WithLabelValues(clusterName, "sync_busi_groups").Set(float64(ms))
promstat.GaugeSyncNumber.WithLabelValues(clusterName, "sync_busi_groups").Set(float64(len(m)))
}
promstat.GaugeCronDuration.WithLabelValues(config.C.ClusterName, "sync_busi_groups").Set(float64(ms))
promstat.GaugeSyncNumber.WithLabelValues(config.C.ClusterName, "sync_busi_groups").Set(float64(len(m)))
logger.Infof("timer: sync busi groups done, cost: %dms, number: %d", ms, len(m))
return nil

View File

@@ -26,15 +26,6 @@ var RecordingRuleCache = RecordingRuleCacheType{
rules: make(map[int64]*models.RecordingRule),
}
func (rrc *RecordingRuleCacheType) Reset() {
rrc.Lock()
defer rrc.Unlock()
rrc.statTotal = -1
rrc.statLastUpdated = -1
rrc.rules = make(map[int64]*models.RecordingRule)
}
func (rrc *RecordingRuleCacheType) StatChanged(total, lastUpdated int64) bool {
if rrc.statTotal == total && rrc.statLastUpdated == lastUpdated {
return false
@@ -95,26 +86,19 @@ func loopSyncRecordingRules() {
func syncRecordingRules() error {
start := time.Now()
clusterName := config.ReaderClient.GetClusterName()
if clusterName == "" {
RecordingRuleCache.Reset()
logger.Warning("cluster name is blank")
return nil
}
stat, err := models.RecordingRuleStatistics(clusterName)
stat, err := models.RecordingRuleStatistics(config.C.ClusterName)
if err != nil {
return errors.WithMessage(err, "failed to exec RecordingRuleStatistics")
}
if !RecordingRuleCache.StatChanged(stat.Total, stat.LastUpdated) {
promstat.GaugeCronDuration.WithLabelValues(clusterName, "sync_recording_rules").Set(0)
promstat.GaugeSyncNumber.WithLabelValues(clusterName, "sync_recording_rules").Set(0)
promstat.GaugeCronDuration.WithLabelValues(config.C.ClusterName, "sync_recording_rules").Set(0)
promstat.GaugeSyncNumber.WithLabelValues(config.C.ClusterName, "sync_recording_rules").Set(0)
logger.Debug("recoding rules not changed")
return nil
}
lst, err := models.RecordingRuleGetsByCluster(clusterName)
lst, err := models.RecordingRuleGetsByCluster(config.C.ClusterName)
if err != nil {
return errors.WithMessage(err, "failed to exec RecordingRuleGetsByCluster")
}
@@ -127,8 +111,8 @@ func syncRecordingRules() error {
RecordingRuleCache.Set(m, stat.Total, stat.LastUpdated)
ms := time.Since(start).Milliseconds()
promstat.GaugeCronDuration.WithLabelValues(clusterName, "sync_recording_rules").Set(float64(ms))
promstat.GaugeSyncNumber.WithLabelValues(clusterName, "sync_recording_rules").Set(float64(len(m)))
promstat.GaugeCronDuration.WithLabelValues(config.C.ClusterName, "sync_recording_rules").Set(float64(ms))
promstat.GaugeSyncNumber.WithLabelValues(config.C.ClusterName, "sync_recording_rules").Set(float64(len(m)))
logger.Infof("timer: sync recording rules done, cost: %dms, number: %d", ms, len(m))
return nil

View File

@@ -31,15 +31,6 @@ var TargetCache = TargetCacheType{
targets: make(map[string]*models.Target),
}
func (tc *TargetCacheType) Reset() {
tc.Lock()
defer tc.Unlock()
tc.statTotal = -1
tc.statLastUpdated = -1
tc.targets = make(map[string]*models.Target)
}
func (tc *TargetCacheType) StatChanged(total, lastUpdated int64) bool {
if tc.statTotal == total && tc.statLastUpdated == lastUpdated {
return false
@@ -103,26 +94,19 @@ func loopSyncTargets() {
func syncTargets() error {
start := time.Now()
clusterName := config.ReaderClient.GetClusterName()
if clusterName == "" {
TargetCache.Reset()
logger.Warning("cluster name is blank")
return nil
}
stat, err := models.TargetStatistics(clusterName)
stat, err := models.TargetStatistics(config.C.ClusterName)
if err != nil {
return errors.WithMessage(err, "failed to exec TargetStatistics")
}
if !TargetCache.StatChanged(stat.Total, stat.LastUpdated) {
promstat.GaugeCronDuration.WithLabelValues(clusterName, "sync_targets").Set(0)
promstat.GaugeSyncNumber.WithLabelValues(clusterName, "sync_targets").Set(0)
promstat.GaugeCronDuration.WithLabelValues(config.C.ClusterName, "sync_targets").Set(0)
promstat.GaugeSyncNumber.WithLabelValues(config.C.ClusterName, "sync_targets").Set(0)
logger.Debug("targets not changed")
return nil
}
lst, err := models.TargetGetsByCluster(clusterName)
lst, err := models.TargetGetsByCluster(config.C.ClusterName)
if err != nil {
return errors.WithMessage(err, "failed to exec TargetGetsByCluster")
}
@@ -145,8 +129,8 @@ func syncTargets() error {
TargetCache.Set(m, stat.Total, stat.LastUpdated)
ms := time.Since(start).Milliseconds()
promstat.GaugeCronDuration.WithLabelValues(clusterName, "sync_targets").Set(float64(ms))
promstat.GaugeSyncNumber.WithLabelValues(clusterName, "sync_targets").Set(float64(len(lst)))
promstat.GaugeCronDuration.WithLabelValues(config.C.ClusterName, "sync_targets").Set(float64(ms))
promstat.GaugeSyncNumber.WithLabelValues(config.C.ClusterName, "sync_targets").Set(float64(len(lst)))
logger.Infof("timer: sync targets done, cost: %dms, number: %d", ms, len(lst))
return nil

View File

@@ -124,14 +124,9 @@ func syncUsers() error {
return errors.WithMessage(err, "failed to exec UserStatistics")
}
clusterName := config.ReaderClient.GetClusterName()
if !UserCache.StatChanged(stat.Total, stat.LastUpdated) {
if clusterName != "" {
promstat.GaugeCronDuration.WithLabelValues(clusterName, "sync_users").Set(0)
promstat.GaugeSyncNumber.WithLabelValues(clusterName, "sync_users").Set(0)
}
promstat.GaugeCronDuration.WithLabelValues(config.C.ClusterName, "sync_users").Set(0)
promstat.GaugeSyncNumber.WithLabelValues(config.C.ClusterName, "sync_users").Set(0)
logger.Debug("users not changed")
return nil
}
@@ -149,11 +144,8 @@ func syncUsers() error {
UserCache.Set(m, stat.Total, stat.LastUpdated)
ms := time.Since(start).Milliseconds()
if clusterName != "" {
promstat.GaugeCronDuration.WithLabelValues(clusterName, "sync_users").Set(float64(ms))
promstat.GaugeSyncNumber.WithLabelValues(clusterName, "sync_users").Set(float64(len(m)))
}
promstat.GaugeCronDuration.WithLabelValues(config.C.ClusterName, "sync_users").Set(float64(ms))
promstat.GaugeSyncNumber.WithLabelValues(config.C.ClusterName, "sync_users").Set(float64(len(m)))
logger.Infof("timer: sync users done, cost: %dms, number: %d", ms, len(m))
return nil

View File

@@ -106,14 +106,9 @@ func syncUserGroups() error {
return errors.WithMessage(err, "failed to exec UserGroupStatistics")
}
clusterName := config.ReaderClient.GetClusterName()
if !UserGroupCache.StatChanged(stat.Total, stat.LastUpdated) {
if clusterName != "" {
promstat.GaugeCronDuration.WithLabelValues(clusterName, "sync_user_groups").Set(0)
promstat.GaugeSyncNumber.WithLabelValues(clusterName, "sync_user_groups").Set(0)
}
promstat.GaugeCronDuration.WithLabelValues(config.C.ClusterName, "sync_user_groups").Set(0)
promstat.GaugeSyncNumber.WithLabelValues(config.C.ClusterName, "sync_user_groups").Set(0)
logger.Debug("user_group not changed")
return nil
}
@@ -150,11 +145,8 @@ func syncUserGroups() error {
UserGroupCache.Set(m, stat.Total, stat.LastUpdated)
ms := time.Since(start).Milliseconds()
if clusterName != "" {
promstat.GaugeCronDuration.WithLabelValues(clusterName, "sync_user_groups").Set(float64(ms))
promstat.GaugeSyncNumber.WithLabelValues(clusterName, "sync_user_groups").Set(float64(len(m)))
}
promstat.GaugeCronDuration.WithLabelValues(config.C.ClusterName, "sync_user_groups").Set(float64(ms))
promstat.GaugeSyncNumber.WithLabelValues(config.C.ClusterName, "sync_user_groups").Set(float64(len(m)))
logger.Infof("timer: sync user groups done, cost: %dms, number: %d", ms, len(m))
return nil

View File

@@ -4,45 +4,57 @@ import (
"context"
"fmt"
"sort"
"strconv"
"strings"
"time"
"github.com/toolkits/pkg/logger"
"github.com/didi/nightingale/v5/src/models"
"github.com/didi/nightingale/v5/src/server/config"
"github.com/didi/nightingale/v5/src/storage"
)
// local servers
var localss string
func Heartbeat(ctx context.Context) error {
if err := heartbeat(); err != nil {
if err := heartbeat(ctx); err != nil {
fmt.Println("failed to heartbeat:", err)
return err
}
go loopHeartbeat()
go loopHeartbeat(ctx)
return nil
}
func loopHeartbeat() {
func loopHeartbeat(ctx context.Context) {
interval := time.Duration(config.C.Heartbeat.Interval) * time.Millisecond
for {
time.Sleep(interval)
if err := heartbeat(); err != nil {
if err := heartbeat(ctx); err != nil {
logger.Warning(err)
}
}
}
func heartbeat() error {
err := models.AlertingEngineHeartbeat(config.C.Heartbeat.Endpoint)
// hash struct:
// /server/heartbeat/Default -> {
// 10.2.3.4:19000 => $timestamp
// 10.2.3.5:19000 => $timestamp
// }
func redisKey(cluster string) string {
return fmt.Sprintf("/server/heartbeat/%s", cluster)
}
func heartbeat(ctx context.Context) error {
now := time.Now().Unix()
key := redisKey(config.C.ClusterName)
err := storage.Redis.HSet(ctx, key, config.C.Heartbeat.Endpoint, now).Err()
if err != nil {
return err
}
servers, err := ActiveServers()
servers, err := ActiveServers(ctx, config.C.ClusterName)
if err != nil {
return err
}
@@ -57,12 +69,37 @@ func heartbeat() error {
return nil
}
func ActiveServers() ([]string, error) {
cluster, err := models.AlertingEngineGetCluster(config.C.Heartbeat.Endpoint)
func clearDeadServer(ctx context.Context, cluster, endpoint string) {
key := redisKey(cluster)
err := storage.Redis.HDel(ctx, key, endpoint).Err()
if err != nil {
logger.Warningf("failed to hdel %s %s, error: %v", key, endpoint, err)
}
}
func ActiveServers(ctx context.Context, cluster string) ([]string, error) {
ret, err := storage.Redis.HGetAll(ctx, redisKey(cluster)).Result()
if err != nil {
return nil, err
}
// a heartbeat within the last 30 seconds means the server is considered alive
return models.AlertingEngineGetsInstances("cluster = ? and clock > ?", cluster, time.Now().Unix()-30)
now := time.Now().Unix()
dur := int64(20)
actives := make([]string, 0, len(ret))
for endpoint, clockstr := range ret {
clock, err := strconv.ParseInt(clockstr, 10, 64)
if err != nil {
continue
}
if now-clock > dur {
clearDeadServer(ctx, cluster, endpoint)
continue
}
actives = append(actives, endpoint)
}
return actives, nil
}
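ActiveServers returns every endpoint in the cluster's heartbeat hash that has reported within the cutoff, pruning stale entries as it goes. The leader.go change in the next file consumes this list in IamLeader, and the `sort` import there suggests the usual coordination-free scheme: sort the endpoints and treat the first one as the leader, so every server reaches the same decision. A sketch of that selection, under the assumption that this is how IamLeader uses the list:

```go
package main

import (
	"fmt"
	"sort"
)

// iAmLeader reports whether local is the leader among the active endpoints.
// Sorting makes the choice deterministic on every server, so no extra
// coordination is needed. This mirrors the presumed IamLeader logic; the
// exact implementation is not shown in this hunk.
func iAmLeader(actives []string, local string) bool {
	if len(actives) == 0 {
		return false
	}
	sort.Strings(actives)
	return actives[0] == local
}

func main() {
	actives := []string{"10.2.3.5:19000", "10.2.3.4:19000"}
	fmt.Println(iAmLeader(actives, "10.2.3.4:19000")) // true
	fmt.Println(iAmLeader(actives, "10.2.3.5:19000")) // false
}
```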

View File

@@ -1,6 +1,7 @@
package naming
import (
"context"
"sort"
"github.com/didi/nightingale/v5/src/server/config"
@@ -8,7 +9,7 @@ import (
)
func IamLeader() (bool, error) {
servers, err := ActiveServers()
servers, err := ActiveServers(context.Background(), config.C.ClusterName)
if err != nil {
logger.Errorf("failed to get active servers: %v", err)
return false, err

View File

@@ -0,0 +1,46 @@
package reader
import (
"net"
"net/http"
"time"
"github.com/didi/nightingale/v5/src/pkg/prom"
"github.com/didi/nightingale/v5/src/server/config"
"github.com/prometheus/client_golang/api"
)
var Client prom.API
func Init(opts config.ReaderOptions) error {
cli, err := api.NewClient(api.Config{
Address: opts.Url,
RoundTripper: &http.Transport{
// TLSClientConfig: tlsConfig,
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: time.Duration(opts.DialTimeout) * time.Millisecond,
KeepAlive: time.Duration(opts.KeepAlive) * time.Millisecond,
}).DialContext,
ResponseHeaderTimeout: time.Duration(opts.Timeout) * time.Millisecond,
TLSHandshakeTimeout: time.Duration(opts.TLSHandshakeTimeout) * time.Millisecond,
ExpectContinueTimeout: time.Duration(opts.ExpectContinueTimeout) * time.Millisecond,
MaxConnsPerHost: opts.MaxConnsPerHost,
MaxIdleConns: opts.MaxIdleConns,
MaxIdleConnsPerHost: opts.MaxIdleConnsPerHost,
IdleConnTimeout: time.Duration(opts.IdleConnTimeout) * time.Millisecond,
},
})
if err != nil {
return err
}
Client = prom.NewAPI(cli, prom.ClientOptions{
BasicAuthUser: opts.BasicAuthUser,
BasicAuthPass: opts.BasicAuthPass,
Headers: opts.Headers,
})
return nil
}
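Once Init has run, the package-level reader.Client is what every instant query in this diff goes through (recording-rule evaluation, the query-promql route, the usage report). A minimal usage sketch matching those call sites; it assumes config.C has already been loaded by server start-up, and the reduction of the result to a single float via model.Vector is illustrative rather than the project's conv helper:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/prometheus/common/model"

	"github.com/didi/nightingale/v5/src/server/config"
	"github.com/didi/nightingale/v5/src/server/reader"
)

// queryScalar runs an instant query through the shared reader.Client and
// returns the first sample's value.
func queryScalar(promql string) (float64, error) {
	value, warnings, err := reader.Client.Query(context.Background(), promql, time.Now())
	if err != nil {
		return 0, err
	}
	if len(warnings) > 0 {
		return 0, fmt.Errorf("query returned warnings: %v", warnings)
	}
	vec, ok := value.(model.Vector)
	if !ok || len(vec) == 0 {
		return 0, fmt.Errorf("empty result for %q", promql)
	}
	return float64(vec[0].Value), nil
}

func main() {
	// in n9e-server this happens inside initialize(); here we assume the
	// configuration has already been parsed into config.C
	if err := reader.Init(config.C.Reader); err != nil {
		panic(err)
	}
	v, err := queryScalar("up")
	fmt.Println(v, err)
}
```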

View File

@@ -18,7 +18,7 @@ import (
promstat "github.com/didi/nightingale/v5/src/server/stat"
)
func New(version string, reloadFunc func()) *gin.Engine {
func New(version string) *gin.Engine {
gin.SetMode(config.C.RunMode)
loggerMid := aop.Logger()
@@ -37,12 +37,12 @@ func New(version string, reloadFunc func()) *gin.Engine {
r.Use(loggerMid)
}
configRoute(r, version, reloadFunc)
configRoute(r, version)
return r
}
func configRoute(r *gin.Engine, version string, reloadFunc func()) {
func configRoute(r *gin.Engine, version string) {
if config.C.HTTP.PProf {
pprof.Register(r, "/api/debug/pprof")
}
@@ -63,13 +63,8 @@ func configRoute(r *gin.Engine, version string, reloadFunc func()) {
c.String(200, version)
})
r.POST("/-/reload", func(c *gin.Context) {
reloadFunc()
c.String(200, "reload success")
})
r.GET("/servers/active", func(c *gin.Context) {
lst, err := naming.ActiveServers()
lst, err := naming.ActiveServers(c.Request.Context(), config.C.ClusterName)
ginx.NewRender(c).Data(lst, err)
})
@@ -103,8 +98,6 @@ func configRoute(r *gin.Engine, version string, reloadFunc func()) {
service := r.Group("/v1/n9e")
service.POST("/event", pushEventToQueue)
service.POST("/make-event", makeEvent)
service.POST("/judge-event", judgeEvent)
}
func stat() gin.HandlerFunc {

View File

@@ -269,10 +269,7 @@ func datadogSeries(c *gin.Context) {
}
if succ > 0 {
cn := config.ReaderClient.GetClusterName()
if cn != "" {
promstat.CounterSampleTotal.WithLabelValues(cn, "datadog").Add(float64(succ))
}
promstat.CounterSampleTotal.WithLabelValues(config.C.ClusterName, "datadog").Add(float64(succ))
idents.Idents.MSet(ids)
}

View File

@@ -3,10 +3,8 @@ package router
import (
"fmt"
"strings"
"time"
"github.com/didi/nightingale/v5/src/models"
"github.com/didi/nightingale/v5/src/server/common/conv"
"github.com/didi/nightingale/v5/src/server/config"
"github.com/didi/nightingale/v5/src/server/engine"
promstat "github.com/didi/nightingale/v5/src/server/stat"
@@ -14,7 +12,6 @@ import (
"github.com/gin-gonic/gin"
"github.com/toolkits/pkg/ginx"
"github.com/toolkits/pkg/logger"
"github.com/toolkits/pkg/str"
)
func pushEventToQueue(c *gin.Context) {
@@ -63,11 +60,7 @@ func pushEventToQueue(c *gin.Context) {
event.NotifyChannels = strings.Join(event.NotifyChannelsJSON, " ")
event.NotifyGroups = strings.Join(event.NotifyGroupsJSON, " ")
cn := config.ReaderClient.GetClusterName()
if cn != "" {
promstat.CounterAlertsTotal.WithLabelValues(cn).Inc()
}
promstat.CounterAlertsTotal.WithLabelValues(config.C.ClusterName).Inc()
engine.LogEvent(event, "http_push_queue")
if !engine.EventQueue.PushFront(event) {
msg := fmt.Sprintf("event:%+v push_queue err: queue is full", event)
@@ -76,45 +69,3 @@ func pushEventToQueue(c *gin.Context) {
}
ginx.NewRender(c).Message(nil)
}
type eventForm struct {
Alert bool `json:"alert"`
Vectors []conv.Vector `json:"vectors"`
RuleId int64 `json:"rule_id"`
Cluster string `json:"cluster"`
}
func judgeEvent(c *gin.Context) {
var form eventForm
ginx.BindJSON(c, &form)
re, exists := engine.RuleEvalForExternal.Get(form.RuleId)
if !exists {
ginx.Bomb(200, "rule not exists")
}
re.Judge(form.Cluster, form.Vectors)
ginx.NewRender(c).Message(nil)
}
func makeEvent(c *gin.Context) {
var events []*eventForm
ginx.BindJSON(c, &events)
now := time.Now().Unix()
for i := 0; i < len(events); i++ {
re, exists := engine.RuleEvalForExternal.Get(events[i].RuleId)
logger.Debugf("handle event:%+v exists:%v", events[i], exists)
if !exists {
ginx.Bomb(200, "rule not exists")
}
if events[i].Alert {
go re.MakeNewEvent("http", now, events[i].Cluster, events[i].Vectors)
} else {
for _, vector := range events[i].Vectors {
hash := str.MD5(fmt.Sprintf("%d_%s", events[i].RuleId, vector.Key))
now := vector.Timestamp
go re.RecoverEvent(hash, now, vector.Value)
}
}
}
ginx.NewRender(c).Message(nil)
}
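makeEvent (and judgeEvent) let an external computation engine feed events over HTTP instead of the built-in PromQL evaluation: the caller posts an array of eventForm records, and each one either raises a new event (alert=true) or recovers the existing one by hash. A hedged sketch of such a caller follows; the top-level JSON keys come from eventForm above, while the field names inside each vector (key/value/timestamp) and the server address are assumptions, not confirmed by this diff.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
	"time"
)

func main() {
	// One record of the /v1/n9e/make-event payload. "alert", "vectors",
	// "rule_id" and "cluster" match eventForm's json tags; the vector field
	// names are illustrative guesses, check conv.Vector before relying on them.
	payload := []map[string]interface{}{
		{
			"alert":   true,
			"rule_id": int64(42),
			"cluster": "Default",
			"vectors": []map[string]interface{}{
				{"key": `cpu_usage_active{ident="host01"}`, "value": 97.5, "timestamp": time.Now().Unix()},
			},
		},
	}

	body, _ := json.Marshal(payload)
	// the n9e-server address is illustrative
	resp, err := http.Post("http://127.0.0.1:19000/v1/n9e/make-event", "application/json", bytes.NewReader(body))
	if err != nil {
		fmt.Println("push failed:", err)
		return
	}
	defer resp.Body.Close()
	fmt.Println("status:", resp.Status)
}
```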

View File

@@ -214,11 +214,7 @@ func falconPush(c *gin.Context) {
}
if succ > 0 {
cn := config.ReaderClient.GetClusterName()
if cn != "" {
promstat.CounterSampleTotal.WithLabelValues(cn, "openfalcon").Add(float64(succ))
}
promstat.CounterSampleTotal.WithLabelValues(config.C.ClusterName, "openfalcon").Add(float64(succ))
idents.Idents.MSet(ids)
}

View File

@@ -208,10 +208,7 @@ func handleOpenTSDB(c *gin.Context) {
}
if succ > 0 {
cn := config.ReaderClient.GetClusterName()
if cn != "" {
promstat.CounterSampleTotal.WithLabelValues(cn, "opentsdb").Add(float64(succ))
}
promstat.CounterSampleTotal.WithLabelValues(config.C.ClusterName, "opentsdb").Add(float64(succ))
idents.Idents.MSet(ids)
}

View File

@@ -17,6 +17,7 @@ import (
"github.com/didi/nightingale/v5/src/server/config"
"github.com/didi/nightingale/v5/src/server/idents"
"github.com/didi/nightingale/v5/src/server/memsto"
"github.com/didi/nightingale/v5/src/server/reader"
promstat "github.com/didi/nightingale/v5/src/server/stat"
"github.com/didi/nightingale/v5/src/server/writer"
)
@@ -37,12 +38,7 @@ func queryPromql(c *gin.Context) {
var f promqlForm
ginx.BindJSON(c, &f)
if config.ReaderClient.IsNil() {
c.String(500, "reader client is nil")
return
}
value, warnings, err := config.ReaderClient.GetCli().Query(c.Request.Context(), f.PromQL, time.Now())
value, warnings, err := reader.Client.Query(c.Request.Context(), f.PromQL, time.Now())
if err != nil {
c.String(500, "promql:%s error:%v", f.PromQL, err)
return
@@ -146,11 +142,7 @@ func remoteWrite(c *gin.Context) {
writer.Writers.PushSample(metric, req.Timeseries[i])
}
cn := config.ReaderClient.GetClusterName()
if cn != "" {
promstat.CounterSampleTotal.WithLabelValues(cn, "prometheus").Add(float64(count))
}
promstat.CounterSampleTotal.WithLabelValues(config.C.ClusterName, "prometheus").Add(float64(count))
idents.Idents.MSet(ids)
}

View File

@@ -18,6 +18,7 @@ import (
"github.com/didi/nightingale/v5/src/server/idents"
"github.com/didi/nightingale/v5/src/server/memsto"
"github.com/didi/nightingale/v5/src/server/naming"
"github.com/didi/nightingale/v5/src/server/reader"
"github.com/didi/nightingale/v5/src/server/router"
"github.com/didi/nightingale/v5/src/server/stat"
"github.com/didi/nightingale/v5/src/server/usage"
@@ -75,7 +76,9 @@ EXIT:
break EXIT
case syscall.SIGHUP:
// reload configuration?
reload()
logger.Info("start reload configs")
engine.Reload()
logger.Info("reload configs finished")
default:
break EXIT
}
@@ -124,7 +127,7 @@ func (s Server) initialize() (func(), error) {
}
// init prometheus remote reader
if err = config.InitReader(); err != nil {
if err = reader.Init(config.C.Reader); err != nil {
return fns.Ret(), err
}
@@ -144,7 +147,7 @@ func (s Server) initialize() (func(), error) {
stat.Init()
// init http server
r := router.New(s.Version, reload)
r := router.New(s.Version)
httpClean := httpx.Init(config.C.HTTP, r)
fns.Add(httpClean)
@@ -174,9 +177,3 @@ func (fs *Functions) Ret() func() {
}
}
}
func reload() {
logger.Info("start reload configs")
engine.Reload()
logger.Info("reload configs finished")
}
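With the HTTP /-/reload route dropped on one side of this change, configuration reload is driven purely by the signal loop shown in the EXIT switch above. A self-contained miniature of that wiring, with a print statement standing in for engine.Reload:

```go
package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
)

func main() {
	sc := make(chan os.Signal, 1)
	signal.Notify(sc, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)

EXIT:
	for {
		sig := <-sc
		switch sig {
		case syscall.SIGHUP:
			// in n9e-server this is where engine.Reload() is invoked
			fmt.Println("start reload configs")
			fmt.Println("reload configs finished")
		default:
			// any other handled signal ends the loop and triggers shutdown
			break EXIT
		}
	}
	fmt.Println("shutting down")
}
```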

View File

@@ -2,6 +2,7 @@ package usage
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io/ioutil"
@@ -11,6 +12,8 @@ import (
"github.com/didi/nightingale/v5/src/models"
"github.com/didi/nightingale/v5/src/pkg/version"
"github.com/didi/nightingale/v5/src/server/common/conv"
"github.com/didi/nightingale/v5/src/server/reader"
)
const (
@@ -26,6 +29,24 @@ type Usage struct {
Version string `json:"version"`
}
func getSamples() (float64, error) {
value, warns, err := reader.Client.Query(context.Background(), request, time.Now())
if err != nil {
return 0, err
}
if len(warns) > 0 {
return 0, fmt.Errorf("occur some warnings: %v", warns)
}
lst := conv.ConvertVectors(value)
if len(lst) == 0 {
return 0, fmt.Errorf("convert result is empty")
}
return lst[0].Value, nil
}
func Report() {
for {
time.Sleep(time.Minute * 10)
@@ -34,7 +55,7 @@ func Report() {
}
func report() {
tnum, err := models.TargetTotalCount()
sps, err := getSamples()
if err != nil {
return
}
@@ -44,7 +65,7 @@ func report() {
return
}
unum, err := models.UserTotal("")
num, err := models.UserTotal("")
if err != nil {
return
}
@@ -52,8 +73,8 @@ func report() {
maintainer := "blank"
u := Usage{
Samples: float64(tnum),
Users: float64(unum),
Samples: sps,
Users: float64(num),
Hostname: hostname,
Maintainer: maintainer,
Version: version.VERSION,

View File

@@ -49,22 +49,9 @@ func (w WriterType) Write(index int, items []*prompb.TimeSeries, headers ...map[
start := time.Now()
defer func() {
cn := config.ReaderClient.GetClusterName()
if cn != "" {
promstat.ForwardDuration.WithLabelValues(cn, fmt.Sprint(index)).Observe(time.Since(start).Seconds())
}
promstat.ForwardDuration.WithLabelValues(config.C.ClusterName, fmt.Sprint(index)).Observe(time.Since(start).Seconds())
}()
if config.C.ForceUseServerTS {
ts := start.UnixMilli()
for i := 0; i < len(items); i++ {
if len(items[i].Samples) == 0 {
continue
}
items[i].Samples[0].Timestamp = ts
}
}
req := &prompb.WriteRequest{
Timeseries: items,
}
@@ -253,16 +240,11 @@ func Init(opts []config.WriterOptions, globalOpt config.WriterGlobalOpt) error {
}
func reportChanSize() {
clusterName := config.ReaderClient.GetClusterName()
if clusterName == "" {
return
}
for {
time.Sleep(time.Second * 3)
for i, c := range Writers.chans {
size := len(c)
promstat.GaugeSampleQueueSize.WithLabelValues(clusterName, fmt.Sprint(i)).Set(float64(size))
promstat.GaugeSampleQueueSize.WithLabelValues(config.C.ClusterName, fmt.Sprint(i)).Set(float64(size))
}
}
}

View File

@@ -113,6 +113,7 @@ type ClusterOptions struct {
Timeout int64
DialTimeout int64
KeepAlive int64
UseTLS bool
tls.ClientConfig

View File

@@ -11,7 +11,6 @@ import (
"sync"
"time"
"github.com/didi/nightingale/v5/src/models"
"github.com/didi/nightingale/v5/src/pkg/prom"
"github.com/didi/nightingale/v5/src/webapi/config"
"github.com/prometheus/client_golang/api"
@@ -30,44 +29,10 @@ type ClustersType struct {
mutex *sync.RWMutex
}
type PromOption struct {
Url string
User string
Pass string
Headers []string
Timeout int64
DialTimeout int64
MaxIdleConnsPerHost int
}
func (cs *ClustersType) Put(name string, cluster *ClusterType) {
cs.mutex.Lock()
defer cs.mutex.Unlock()
cs.datas[name] = cluster
// Also persist the datasource config to the DB, so that n9e-server can read it from there directly
po := PromOption{
Url: cluster.Opts.Prom,
User: cluster.Opts.BasicAuthUser,
Pass: cluster.Opts.BasicAuthPass,
Headers: cluster.Opts.Headers,
Timeout: cluster.Opts.Timeout,
DialTimeout: cluster.Opts.DialTimeout,
MaxIdleConnsPerHost: cluster.Opts.MaxIdleConnsPerHost,
}
bs, err := json.Marshal(po)
if err != nil {
logger.Fatal("failed to marshal PromOption:", err)
return
}
key := "prom." + name + ".option"
err = models.ConfigsSet(key, string(bs))
if err != nil {
logger.Fatal("failed to set PromOption ", key, " to database, error: ", err)
}
cs.mutex.Unlock()
}
func (cs *ClustersType) Get(name string) (*ClusterType, bool) {

View File

@@ -290,9 +290,6 @@ func configRoute(r *gin.Engine, version string) {
pages.POST("/busi-group/:id/tasks", auth(), user(), perm("/job-tasks/add"), bgrw(), taskAdd)
pages.GET("/busi-group/:id/task/*url", auth(), user(), perm("/job-tasks"), taskProxy)
pages.PUT("/busi-group/:id/task/*url", auth(), user(), perm("/job-tasks/put"), bgrw(), taskProxy)
pages.GET("/servers", auth(), admin(), serversGet)
pages.PUT("/server/:id", auth(), admin(), serverBindCluster)
}
service := r.Group("/v1/n9e")
@@ -321,6 +318,5 @@ func configRoute(r *gin.Engine, version string) {
service.GET("/alert-cur-events", alertCurEventsList)
service.GET("/alert-his-events", alertHisEventsList)
service.GET("/alert-his-event/:eid", alertHisEventGet)
}
}

View File

@@ -46,14 +46,9 @@ func alertCurEventsCard(c *gin.Context) {
clusters := queryClusters(c)
rules := parseAggrRules(c)
prod := ginx.QueryStr(c, "prod", "")
cate := ginx.QueryStr(c, "cate", "$all")
cates := []string{}
if cate != "$all" {
cates = strings.Split(cate, ",")
}
// Fetch at most 50000 events; pulling more than that is pointless anyway
list, err := models.AlertCurEventGets(prod, busiGroupId, stime, etime, severity, clusters, cates, query, 50000, 0)
list, err := models.AlertCurEventGets(prod, busiGroupId, stime, etime, severity, clusters, query, 50000, 0)
ginx.Dangerous(err)
cardmap := make(map[string]*AlertCard)
@@ -128,16 +123,11 @@ func alertCurEventsList(c *gin.Context) {
busiGroupId := ginx.QueryInt64(c, "bgid", 0)
clusters := queryClusters(c)
prod := ginx.QueryStr(c, "prod", "")
cate := ginx.QueryStr(c, "cate", "$all")
cates := []string{}
if cate != "$all" {
cates = strings.Split(cate, ",")
}
total, err := models.AlertCurEventTotal(prod, busiGroupId, stime, etime, severity, clusters, cates, query)
total, err := models.AlertCurEventTotal(prod, busiGroupId, stime, etime, severity, clusters, query)
ginx.Dangerous(err)
list, err := models.AlertCurEventGets(prod, busiGroupId, stime, etime, severity, clusters, cates, query, limit, ginx.Offset(c, limit))
list, err := models.AlertCurEventGets(prod, busiGroupId, stime, etime, severity, clusters, query, limit, ginx.Offset(c, limit))
ginx.Dangerous(err)
cache := make(map[int64]*models.UserGroup)

View File

@@ -1,7 +1,6 @@
package router
import (
"strings"
"time"
"github.com/gin-gonic/gin"
@@ -36,16 +35,11 @@ func alertHisEventsList(c *gin.Context) {
busiGroupId := ginx.QueryInt64(c, "bgid", 0)
clusters := queryClusters(c)
prod := ginx.QueryStr(c, "prod", "")
cate := ginx.QueryStr(c, "cate", "$all")
cates := []string{}
if cate != "$all" {
cates = strings.Split(cate, ",")
}
total, err := models.AlertHisEventTotal(prod, busiGroupId, stime, etime, severity, recovered, clusters, cates, query)
total, err := models.AlertHisEventTotal(prod, busiGroupId, stime, etime, severity, recovered, clusters, query)
ginx.Dangerous(err)
list, err := models.AlertHisEventGets(prod, busiGroupId, stime, etime, severity, recovered, clusters, cates, query, limit, ginx.Offset(c, limit))
list, err := models.AlertHisEventGets(prod, busiGroupId, stime, etime, severity, recovered, clusters, query, limit, ginx.Offset(c, limit))
ginx.Dangerous(err)
cache := make(map[int64]*models.UserGroup)

View File

@@ -26,18 +26,10 @@ func alertRuleGets(c *gin.Context) {
}
func alertRulesGetByService(c *gin.Context) {
prods := strings.Split(ginx.QueryStr(c, "prods", ""), ",")
prods := strings.Fields(ginx.QueryStr(c, "prods", ""))
query := ginx.QueryStr(c, "query", "")
algorithm := ginx.QueryStr(c, "algorithm", "")
cluster := ginx.QueryStr(c, "cluster", "")
cate := ginx.QueryStr(c, "cate", "$all")
cates := []string{}
if cate != "$all" {
cates = strings.Split(cate, ",")
}
disabled := ginx.QueryInt(c, "disabled", -1)
ars, err := models.AlertRulesGetsBy(prods, query, algorithm, cluster, cates, disabled)
ars, err := models.AlertRulesGetsBy(prods, query)
if err == nil {
cache := make(map[int64]*models.UserGroup)
for i := 0; i < len(ars); i++ {

View File

@@ -1,35 +0,0 @@
package router
import (
"github.com/didi/nightingale/v5/src/models"
"github.com/gin-gonic/gin"
"github.com/toolkits/pkg/ginx"
)
// Frontend: fetch the list of servers
func serversGet(c *gin.Context) {
list, err := models.AlertingEngineGets("")
ginx.NewRender(c).Data(list, err)
}
type serverBindClusterForm struct {
Cluster string `json:"cluster"`
}
// A user assigns a cluster to an n9e-server; the assignment can also be cleared by setting cluster to an empty string.
// A cleared cluster means the server is effectively out of service — probably about to be decommissioned, or kept only as a forwarder.
func serverBindCluster(c *gin.Context) {
id := ginx.UrlParamInt64(c, "id")
ae, err := models.AlertingEngineGet("id = ?", id)
ginx.Dangerous(err)
if ae == nil {
ginx.Dangerous("no such server")
}
var f serverBindClusterForm
ginx.BindJSON(c, &f)
ginx.NewRender(c).Message(ae.UpdateCluster(f.Cluster))
}