Compare commits

...

24 Commits

Author SHA1 Message Date
ning
96f3cfa065 add all perms api 2023-03-16 11:40:00 +08:00
ning
144f0ad795 update pkg version 2023-03-08 17:59:03 +08:00
ning
1375ff1435 delete log 2023-03-01 17:43:34 +08:00
ning
26dc03146b update rule enable time 2023-03-01 16:42:08 +08:00
ning
e8378c6858 update rule enable time 2023-03-01 16:14:16 +08:00
ning
da182f1b05 add log 2023-03-01 15:48:02 +08:00
ning
cad0d3cf0f add log 2023-03-01 15:35:42 +08:00
ning
fec1e686f4 add log 2023-03-01 15:25:55 +08:00
ning
a357f11164 add log 2023-03-01 14:59:07 +08:00
ning
d03ba4c4d0 fix: query https api 2023-01-30 16:44:51 +08:00
kongfei605
d531178c9b convert tplx.Funcmap to text.funcmap (#1352) 2023-01-10 21:02:20 +08:00
ning
174df1495c refactor: change some log level 2023-01-10 19:59:41 +08:00
ning
ffe423148d fix: push event api 2023-01-10 19:08:39 +08:00
ning
926559c9a7 refactor: motify log print 2023-01-10 15:54:55 +08:00
Yening Qin
136642f126 optimize handle external event (#1350)
* optimize handle external event
2023-01-10 13:30:45 +08:00
Ulric Qin
a054828fcc Merge branch 'main' of github.com:ccfos/nightingale 2023-01-06 23:39:49 +08:00
Ulric Qin
e46e946689 code refactor 2023-01-06 23:39:37 +08:00
ning
cf083c543b fix: alert mute sync 2023-01-06 16:22:41 +08:00
xiaoziv
2e1508fdd3 feat: rule engine rewrite (#1340)
* feat: rule engine rewrite

* rename filter to muteStrategy

* rename file

* fix bg strategy match bug

* fix deadlock

* Update mute_strategy.go

* Update rule_helper.go

* use rule from cache

* add comment

* add IdentDeletedMuteStrategy

* rename strategy

* rename eventTags to tagsMap

Co-authored-by: ulricqin <ulricqin@qq.com>
2023-01-06 16:16:22 +08:00
kongfei605
954543a5b2 Parse rules without html escaper (#1345) 2023-01-06 11:02:36 +08:00
Ulric Qin
71a402c33c code refactor 2023-01-06 10:51:33 +08:00
Ulric Qin
e30a5a316f code refactor 2023-01-06 10:50:18 +08:00
jsp-kld
0c9b7de391 Dashboard for VMware (#1331)
by [vsphere-monitor](https://github.com/jsp-kld/vsphere-monitor)
2022-12-20 21:00:59 +08:00
Yening Qin
063b6f63df fix load prom options from database and add more log (#1330)
* add more log
* fix PromOptions set
2022-12-20 11:59:01 +08:00
30 changed files with 3724 additions and 1074 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -9,7 +9,7 @@ ClusterName = "Default"
BusiGroupLabelKey = "busigroup"
# sleep x seconds, then start judge engine
EngineDelay = 60
EngineDelay = 30
DisableUsageReport = true

8
go.mod
View File

@@ -6,7 +6,7 @@ require (
github.com/coreos/go-oidc v2.2.1+incompatible
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/gin-contrib/pprof v1.3.0
github.com/gin-gonic/gin v1.7.4
github.com/gin-gonic/gin v1.7.7
github.com/go-ldap/ldap/v3 v3.4.1
github.com/go-redis/redis/v9 v9.0.0-rc.1
github.com/gogo/protobuf v1.3.2
@@ -24,7 +24,7 @@ require (
github.com/prometheus/common v0.32.1
github.com/prometheus/prometheus v2.5.0+incompatible
github.com/tidwall/gjson v1.14.0
github.com/toolkits/pkg v1.3.1-0.20220824084030-9f9f830a05d5
github.com/toolkits/pkg v1.3.3
github.com/urfave/cli/v2 v2.3.0
golang.org/x/oauth2 v0.0.0-20210514164344-f6687ab2804c
gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df
@@ -74,10 +74,10 @@ require (
github.com/tidwall/pretty v1.2.0 // indirect
github.com/ugorji/go/codec v1.1.7 // indirect
go.uber.org/automaxprocs v1.4.0 // indirect
golang.org/x/crypto v0.0.0-20210817164053-32db794688a5 // indirect
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 // indirect
golang.org/x/net v0.0.0-20220722155237-a158d28d115b // indirect
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/text v0.3.8 // indirect
google.golang.org/appengine v1.6.6 // indirect
google.golang.org/genproto v0.0.0-20211007155348-82e027067bd4 // indirect
google.golang.org/grpc v1.41.0 // indirect

20
go.sum
View File

@@ -97,8 +97,8 @@ github.com/gin-contrib/pprof v1.3.0/go.mod h1:waMjT1H9b179t3CxuG1cV3DHpga6ybizwf
github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE=
github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI=
github.com/gin-gonic/gin v1.6.2/go.mod h1:75u5sXoLsGZoRN5Sgbi1eraJ4GU3++wFwWzhwvtwp4M=
github.com/gin-gonic/gin v1.7.4 h1:QmUZXrvJ9qZ3GfWvQ+2wnW/1ePrTEJqPKMYEU3lD/DM=
github.com/gin-gonic/gin v1.7.4/go.mod h1:jD2toBW3GZUr5UMcdrwQA10I7RuaFOl/SGeDjXkfUtY=
github.com/gin-gonic/gin v1.7.7 h1:3DoBmSbJbZAWqXJC3SLjAPfutPJJRN1U5pALB7EeTTs=
github.com/gin-gonic/gin v1.7.7/go.mod h1:axIBovoeJpVj8S3BwE0uPMTeReE4+AfFtqpqaZ1qq1U=
github.com/go-asn1-ber/asn1-ber v1.5.1 h1:pDbRAunXzIUXfx4CB2QJFv5IuPiuoW+sWvr/Us009o8=
github.com/go-asn1-ber/asn1-ber v1.5.1/go.mod h1:hEBeB/ic+5LoWskz+yKT7vGhhPYkProFKoKdwZRWMe0=
github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU=
@@ -372,8 +372,8 @@ github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA=
github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs=
github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/toolkits/pkg v1.3.1-0.20220824084030-9f9f830a05d5 h1:kMCwr2gNHjHEVgw+uNVdiPbGadj4TekbIfrTXElZeI0=
github.com/toolkits/pkg v1.3.1-0.20220824084030-9f9f830a05d5/go.mod h1:PvTBg/UxazPgBz6VaCM7FM7kJldjfVrsuN6k4HT/VuY=
github.com/toolkits/pkg v1.3.3 h1:qpQAQ18Jr47dv4NcBALlH0ad7L2PuqSh5K+nJKNg5lU=
github.com/toolkits/pkg v1.3.3/go.mod h1:USXArTJlz1f1DCnQHNPYugO8GPkr1NRhP4eYQZQVshk=
github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw=
github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs=
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
@@ -383,6 +383,7 @@ github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
github.com/zenazn/goji v0.9.0/go.mod h1:7S9M489iMyHBNxwZnk9/EHS098H4/F6TATF2mIxtB1Q=
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
@@ -415,8 +416,9 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh
golang.org/x/crypto v0.0.0-20201203163018-be400aefbc4c/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I=
golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20210817164053-32db794688a5 h1:HWj/xjIHfjYU5nVXpTM0s39J9CbLn7Cc5a7IC5rwsMQ=
golang.org/x/crypto v0.0.0-20210817164053-32db794688a5/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 h1:7I4JAnoQBe7ZtJcBaYHi5UtiO8tQHbUSXxL+pnGRANg=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
@@ -447,6 +449,7 @@ golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzB
golang.org/x/mod v0.1.1-0.20191107180719-034126e5016b/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
@@ -499,6 +502,7 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
@@ -544,10 +548,12 @@ golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f h1:v4INt8xihDGvnrfjMDVXGxw9wrfxYyCjk0KbXjhR55s=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@@ -556,8 +562,9 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.3.8 h1:nAL+RVCQ9uMn3vJZbV+MRnydTJFPf8qqY42YiA6MrqY=
golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
@@ -608,6 +615,7 @@ golang.org/x/tools v0.0.0-20200729194436-6467de6f59a7/go.mod h1:njjCfa9FT2d7l9Bc
golang.org/x/tools v0.0.0-20200804011535-6c149bb5ef0d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/tools v0.0.0-20200825202427-b303f430e36d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=

View File

@@ -3,9 +3,9 @@ package models
import (
"bytes"
"fmt"
"html/template"
"strconv"
"strings"
"text/template"
"github.com/didi/nightingale/v5/src/pkg/tplx"
)
@@ -77,7 +77,7 @@ func (e *AlertCurEvent) ParseRule(field string) error {
}
text := strings.Join(append(defs, f), "")
t, err := template.New(fmt.Sprint(e.RuleId)).Funcs(tplx.TemplateFuncMap).Parse(text)
t, err := template.New(fmt.Sprint(e.RuleId)).Funcs(template.FuncMap(tplx.TemplateFuncMap)).Parse(text)
if err != nil {
return err
}

View File

@@ -136,7 +136,9 @@ func (m *AlertMute) Add() error {
if err := m.Verify(); err != nil {
return err
}
m.CreateAt = time.Now().Unix()
now := time.Now().Unix()
m.CreateAt = now
m.UpdateAt = now
return Insert(m)
}

View File

@@ -14,45 +14,50 @@ import (
)
type AlertRule struct {
Id int64 `json:"id" gorm:"primaryKey"`
GroupId int64 `json:"group_id"` // busi group id
Cate string `json:"cate"` // alert rule cate (prometheus|elasticsearch)
Cluster string `json:"cluster"` // take effect by clusters, seperated by space
Name string `json:"name"` // rule name
Note string `json:"note"` // will sent in notify
Prod string `json:"prod"` // product empty means n9e
Algorithm string `json:"algorithm"` // algorithm (''|holtwinters), empty means threshold
AlgoParams string `json:"-" gorm:"algo_params"` // params algorithm need
AlgoParamsJson interface{} `json:"algo_params" gorm:"-"` //
Delay int `json:"delay"` // Time (in seconds) to delay evaluation
Severity int `json:"severity"` // 1: Emergency 2: Warning 3: Notice
Disabled int `json:"disabled"` // 0: enabled, 1: disabled
PromForDuration int `json:"prom_for_duration"` // prometheus for, unit:s
PromQl string `json:"prom_ql"` // just one ql
PromEvalInterval int `json:"prom_eval_interval"` // unit:s
EnableStime string `json:"enable_stime"` // e.g. 00:00
EnableEtime string `json:"enable_etime"` // e.g. 23:59
EnableDaysOfWeek string `json:"-"` // split by space: 0 1 2 3 4 5 6
EnableDaysOfWeekJSON []string `json:"enable_days_of_week" gorm:"-"` // for fe
EnableInBG int `json:"enable_in_bg"` // 0: global 1: enable one busi-group
NotifyRecovered int `json:"notify_recovered"` // whether notify when recovery
NotifyChannels string `json:"-"` // split by space: sms voice email dingtalk wecom
NotifyChannelsJSON []string `json:"notify_channels" gorm:"-"` // for fe
NotifyGroups string `json:"-"` // split by space: 233 43
NotifyGroupsObj []UserGroup `json:"notify_groups_obj" gorm:"-"` // for fe
NotifyGroupsJSON []string `json:"notify_groups" gorm:"-"` // for fe
NotifyRepeatStep int `json:"notify_repeat_step"` // notify repeat interval, unit: min
NotifyMaxNumber int `json:"notify_max_number"` // notify: max number
RecoverDuration int64 `json:"recover_duration"` // unit: s
Callbacks string `json:"-"` // split by space: http://a.com/api/x http://a.com/api/y'
CallbacksJSON []string `json:"callbacks" gorm:"-"` // for fe
RunbookUrl string `json:"runbook_url"` // sop url
AppendTags string `json:"-"` // split by space: service=n9e mod=api
AppendTagsJSON []string `json:"append_tags" gorm:"-"` // for fe
CreateAt int64 `json:"create_at"`
CreateBy string `json:"create_by"`
UpdateAt int64 `json:"update_at"`
UpdateBy string `json:"update_by"`
Id int64 `json:"id" gorm:"primaryKey"`
GroupId int64 `json:"group_id"` // busi group id
Cate string `json:"cate"` // alert rule cate (prometheus|elasticsearch)
Cluster string `json:"cluster"` // take effect by clusters, seperated by space
Name string `json:"name"` // rule name
Note string `json:"note"` // will sent in notify
Prod string `json:"prod"` // product empty means n9e
Algorithm string `json:"algorithm"` // algorithm (''|holtwinters), empty means threshold
AlgoParams string `json:"-" gorm:"algo_params"` // params algorithm need
AlgoParamsJson interface{} `json:"algo_params" gorm:"-"` //
Delay int `json:"delay"` // Time (in seconds) to delay evaluation
Severity int `json:"severity"` // 1: Emergency 2: Warning 3: Notice
Disabled int `json:"disabled"` // 0: enabled, 1: disabled
PromForDuration int `json:"prom_for_duration"` // prometheus for, unit:s
PromQl string `json:"prom_ql"` // just one ql
PromEvalInterval int `json:"prom_eval_interval"` // unit:s
EnableStime string `json:"-"` // split by space: "00:00 10:00 12:00"
EnableStimeJSON string `json:"enable_stime" gorm:"-"` // for fe
EnableStimesJSON []string `json:"enable_stimes" gorm:"-"` // for fe
EnableEtime string `json:"-"` // split by space: "00:00 10:00 12:00"
EnableEtimeJSON string `json:"enable_etime" gorm:"-"` // for fe
EnableEtimesJSON []string `json:"enable_etimes" gorm:"-"` // for fe
EnableDaysOfWeek string `json:"-"` // eg: "0 1 2 3 4 5 6 ; 0 1 2"
EnableDaysOfWeekJSON []string `json:"enable_days_of_week" gorm:"-"` // for fe
EnableDaysOfWeeksJSON [][]string `json:"enable_days_of_weeks" gorm:"-"` // for fe
EnableInBG int `json:"enable_in_bg"` // 0: global 1: enable one busi-group
NotifyRecovered int `json:"notify_recovered"` // whether notify when recovery
NotifyChannels string `json:"-"` // split by space: sms voice email dingtalk wecom
NotifyChannelsJSON []string `json:"notify_channels" gorm:"-"` // for fe
NotifyGroups string `json:"-"` // split by space: 233 43
NotifyGroupsObj []UserGroup `json:"notify_groups_obj" gorm:"-"` // for fe
NotifyGroupsJSON []string `json:"notify_groups" gorm:"-"` // for fe
NotifyRepeatStep int `json:"notify_repeat_step"` // notify repeat interval, unit: min
NotifyMaxNumber int `json:"notify_max_number"` // notify: max number
RecoverDuration int64 `json:"recover_duration"` // unit: s
Callbacks string `json:"-"` // split by space: http://a.com/api/x http://a.com/api/y'
CallbacksJSON []string `json:"callbacks" gorm:"-"` // for fe
RunbookUrl string `json:"runbook_url"` // sop url
AppendTags string `json:"-"` // split by space: service=n9e mod=api
AppendTagsJSON []string `json:"append_tags" gorm:"-"` // for fe
CreateAt int64 `json:"create_at"`
CreateBy string `json:"create_by"`
UpdateAt int64 `json:"update_at"`
UpdateBy string `json:"update_by"`
}
func (ar *AlertRule) TableName() string {
@@ -224,7 +229,29 @@ func (ar *AlertRule) FillNotifyGroups(cache map[int64]*UserGroup) error {
}
func (ar *AlertRule) FE2DB() error {
ar.EnableDaysOfWeek = strings.Join(ar.EnableDaysOfWeekJSON, " ")
if len(ar.EnableStimesJSON) > 0 {
ar.EnableStime = strings.Join(ar.EnableStimesJSON, " ")
ar.EnableEtime = strings.Join(ar.EnableEtimesJSON, " ")
} else {
ar.EnableStime = ar.EnableStimeJSON
ar.EnableEtime = ar.EnableEtimeJSON
}
if len(ar.EnableDaysOfWeeksJSON) > 0 {
for i := 0; i < len(ar.EnableDaysOfWeeksJSON); i++ {
if len(ar.EnableDaysOfWeeksJSON) == 1 {
ar.EnableDaysOfWeek = strings.Join(ar.EnableDaysOfWeeksJSON[i], " ")
} else {
if i == len(ar.EnableDaysOfWeeksJSON)-1 {
ar.EnableDaysOfWeek += strings.Join(ar.EnableDaysOfWeeksJSON[i], " ")
} else {
ar.EnableDaysOfWeek += strings.Join(ar.EnableDaysOfWeeksJSON[i], " ") + ";"
}
}
}
} else {
ar.EnableDaysOfWeek = strings.Join(ar.EnableDaysOfWeekJSON, " ")
}
ar.NotifyChannels = strings.Join(ar.NotifyChannelsJSON, " ")
ar.NotifyGroups = strings.Join(ar.NotifyGroupsJSON, " ")
ar.Callbacks = strings.Join(ar.CallbacksJSON, " ")
@@ -239,7 +266,21 @@ func (ar *AlertRule) FE2DB() error {
}
func (ar *AlertRule) DB2FE() {
ar.EnableDaysOfWeekJSON = strings.Fields(ar.EnableDaysOfWeek)
ar.EnableStimesJSON = strings.Fields(ar.EnableStime)
ar.EnableEtimesJSON = strings.Fields(ar.EnableEtime)
if len(ar.EnableEtimesJSON) > 0 {
ar.EnableStimeJSON = ar.EnableStimesJSON[0]
ar.EnableEtimeJSON = ar.EnableEtimesJSON[0]
}
cache := strings.Split(ar.EnableDaysOfWeek, ";")
for i := 0; i < len(cache); i++ {
ar.EnableDaysOfWeeksJSON = append(ar.EnableDaysOfWeeksJSON, strings.Fields(cache[i]))
}
if len(ar.EnableDaysOfWeeksJSON) > 0 {
ar.EnableDaysOfWeekJSON = ar.EnableDaysOfWeeksJSON[0]
}
ar.NotifyChannelsJSON = strings.Fields(ar.NotifyChannels)
ar.NotifyGroupsJSON = strings.Fields(ar.NotifyGroups)
ar.CallbacksJSON = strings.Fields(ar.Callbacks)
@@ -425,3 +466,38 @@ func AlertRuleStatistics(cluster string) (*Statistics, error) {
return stats[0], nil
}
func (ar *AlertRule) IsPrometheusRule() bool {
return ar.Algorithm == "" && (ar.Cate == "" || strings.ToLower(ar.Cate) == "prometheus")
}
func (ar *AlertRule) GenerateNewEvent() *AlertCurEvent {
event := &AlertCurEvent{}
ar.UpdateEvent(event)
return event
}
func (ar *AlertRule) UpdateEvent(event *AlertCurEvent) {
if event == nil {
return
}
event.GroupId = ar.GroupId
event.Cate = ar.Cate
event.RuleId = ar.Id
event.RuleName = ar.Name
event.RuleNote = ar.Note
event.RuleProd = ar.Prod
event.RuleAlgo = ar.Algorithm
event.Severity = ar.Severity
event.PromForDuration = ar.PromForDuration
event.PromQl = ar.PromQl
event.PromEvalInterval = ar.PromEvalInterval
event.Callbacks = ar.Callbacks
event.CallbacksJSON = ar.CallbacksJSON
event.RunbookUrl = ar.RunbookUrl
event.NotifyRecovered = ar.NotifyRecovered
event.NotifyChannels = ar.NotifyChannels
event.NotifyChannelsJSON = ar.NotifyChannelsJSON
event.NotifyGroups = ar.NotifyGroups
event.NotifyGroupsJSON = ar.NotifyGroupsJSON
}

View File

@@ -1,7 +1,9 @@
package conv
import (
"fmt"
"math"
"strings"
"github.com/prometheus/common/model"
)
@@ -13,6 +15,12 @@ type Vector struct {
Value float64 `json:"value"`
}
func (v *Vector) ReadableValue() string {
ret := fmt.Sprintf("%.5f", v.Value)
ret = strings.TrimRight(ret, "0")
return strings.TrimRight(ret, ".")
}
func ConvertVectors(value model.Value) (lst []Vector) {
if value == nil {
return

View File

@@ -1,8 +1,10 @@
package config
import (
"strings"
"sync"
"github.com/didi/nightingale/v5/src/models"
"github.com/didi/nightingale/v5/src/pkg/prom"
)
@@ -11,9 +13,12 @@ type PromClientMap struct {
Clients map[string]prom.API
}
var ReaderClients *PromClientMap = &PromClientMap{Clients: make(map[string]prom.API)}
var ReaderClients = &PromClientMap{Clients: make(map[string]prom.API)}
func (pc *PromClientMap) Set(clusterName string, c prom.API) {
if c == nil {
return
}
pc.Lock()
defer pc.Unlock()
pc.Clients[clusterName] = c
@@ -38,10 +43,6 @@ func (pc *PromClientMap) GetCli(cluster string) prom.API {
}
func (pc *PromClientMap) IsNil(cluster string) bool {
if pc == nil {
return true
}
pc.RLock()
defer pc.RUnlock()
@@ -53,6 +54,30 @@ func (pc *PromClientMap) IsNil(cluster string) bool {
return c == nil
}
// Hit 根据当前有效的cluster和规则的cluster配置计算有效的cluster列表
func (pc *PromClientMap) Hit(cluster string) []string {
pc.RLock()
defer pc.RUnlock()
clusters := make([]string, 0, len(pc.Clients))
if cluster == models.ClusterAll {
for c := range pc.Clients {
clusters = append(clusters, c)
}
return clusters
}
ruleClusters := strings.Fields(cluster)
for c := range pc.Clients {
for _, rc := range ruleClusters {
if rc == c {
clusters = append(clusters, c)
continue
}
}
}
return clusters
}
func (pc *PromClientMap) Reset() {
pc.Lock()
defer pc.Unlock()

View File

@@ -1,6 +1,10 @@
package config
import "sync"
import (
"sync"
"github.com/didi/nightingale/v5/src/pkg/tls"
)
type PromOption struct {
ClusterName string
@@ -11,6 +15,9 @@ type PromOption struct {
Timeout int64
DialTimeout int64
UseTLS bool
tls.ClientConfig
MaxIdleConnsPerHost int
Headers []string
@@ -65,12 +72,6 @@ func (pos *PromOptionsStruct) Set(clusterName string, po PromOption) {
pos.Unlock()
}
func (pos *PromOptionsStruct) Sets(clusterName string, po PromOption) {
pos.Lock()
pos.Data = map[string]PromOption{clusterName: po}
pos.Unlock()
}
func (pos *PromOptionsStruct) Del(clusterName string) {
pos.Lock()
delete(pos.Data, clusterName)

View File

@@ -73,7 +73,7 @@ func loadFromDatabase() {
}
if cval == "" {
logger.Warningf("ckey: %s is empty", ckey)
logger.Debugf("ckey: %s is empty", ckey)
continue
}
@@ -92,7 +92,7 @@ func loadFromDatabase() {
}
logger.Info("setClientFromPromOption success: ", cluster)
PromOptions.Sets(cluster, po)
PromOptions.Set(cluster, po)
continue
}
@@ -103,7 +103,7 @@ func loadFromDatabase() {
continue
}
PromOptions.Sets(cluster, po)
PromOptions.Set(cluster, po)
}
}
@@ -119,17 +119,28 @@ func loadFromDatabase() {
}
func newClientFromPromOption(po PromOption) (api.Client, error) {
transport := &http.Transport{
// TLSClientConfig: tlsConfig,
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: time.Duration(po.DialTimeout) * time.Millisecond,
}).DialContext,
ResponseHeaderTimeout: time.Duration(po.Timeout) * time.Millisecond,
MaxIdleConnsPerHost: po.MaxIdleConnsPerHost,
}
if po.UseTLS {
tlsConfig, err := po.TLSConfig()
if err != nil {
logger.Errorf("new cluster %s fail: %v", po.Url, err)
return nil, err
}
transport.TLSClientConfig = tlsConfig
}
return api.NewClient(api.Config{
Address: po.Url,
RoundTripper: &http.Transport{
// TLSClientConfig: tlsConfig,
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: time.Duration(po.DialTimeout) * time.Millisecond,
}).DialContext,
ResponseHeaderTimeout: time.Duration(po.Timeout) * time.Millisecond,
MaxIdleConnsPerHost: po.MaxIdleConnsPerHost,
},
Address: po.Url,
RoundTripper: transport,
})
}
@@ -142,11 +153,17 @@ func setClientFromPromOption(clusterName string, po PromOption) error {
return fmt.Errorf("prometheus url is blank")
}
if strings.HasPrefix(po.Url, "https") {
po.UseTLS = true
po.InsecureSkipVerify = true
}
cli, err := newClientFromPromOption(po)
if err != nil {
return fmt.Errorf("failed to newClientFromPromOption: %v", err)
}
logger.Debugf("setClientFromPromOption: %s, %+v", clusterName, po)
ReaderClients.Set(clusterName, prom.NewAPI(cli, prom.ClientOptions{
BasicAuthUser: po.BasicAuthUser,
BasicAuthPass: po.BasicAuthPass,

View File

@@ -76,9 +76,10 @@ func persist(event *models.AlertCurEvent) {
// 不管是告警还是恢复,全量告警里都要记录
if err := his.Add(); err != nil {
logger.Errorf(
"event_persist_his_fail: %v rule_id=%d hash=%s tags=%v timestamp=%d value=%s",
"event_persist_his_fail: %v rule_id=%d cluster:%s hash=%s tags=%v timestamp=%d value=%s",
err,
event.RuleId,
event.Cluster,
event.Hash,
event.TagsJSON,
event.TriggerTime,
@@ -101,9 +102,10 @@ func persist(event *models.AlertCurEvent) {
if event.Id > 0 {
if err := event.Add(); err != nil {
logger.Errorf(
"event_persist_cur_fail: %v rule_id=%d hash=%s tags=%v timestamp=%d value=%s",
"event_persist_cur_fail: %v rule_id=%d cluster:%s hash=%s tags=%v timestamp=%d value=%s",
err,
event.RuleId,
event.Cluster,
event.Hash,
event.TagsJSON,
event.TriggerTime,
@@ -126,9 +128,10 @@ func persist(event *models.AlertCurEvent) {
if event.Id > 0 {
if err := event.Add(); err != nil {
logger.Errorf(
"event_persist_cur_fail: %v rule_id=%d hash=%s tags=%v timestamp=%d value=%s",
"event_persist_cur_fail: %v rule_id=%d cluster:%s hash=%s tags=%v timestamp=%d value=%s",
err,
event.RuleId,
event.Cluster,
event.Hash,
event.TagsJSON,
event.TriggerTime,

View File

@@ -1,33 +0,0 @@
package engine
import (
"strconv"
"strings"
"time"
"github.com/didi/nightingale/v5/src/models"
)
func isNoneffective(timestamp int64, alertRule *models.AlertRule) bool {
if alertRule.Disabled == 1 {
return true
}
tm := time.Unix(timestamp, 0)
triggerTime := tm.Format("15:04")
triggerWeek := strconv.Itoa(int(tm.Weekday()))
if alertRule.EnableStime <= alertRule.EnableEtime {
if triggerTime < alertRule.EnableStime || triggerTime > alertRule.EnableEtime {
return true
}
} else {
if triggerTime < alertRule.EnableStime && triggerTime > alertRule.EnableEtime {
return true
}
}
alertRule.EnableDaysOfWeek = strings.Replace(alertRule.EnableDaysOfWeek, "7", "0", 1)
return !strings.Contains(alertRule.EnableDaysOfWeek, triggerWeek)
}

View File

@@ -5,13 +5,15 @@ import (
"fmt"
"time"
"github.com/toolkits/pkg/logger"
"github.com/didi/nightingale/v5/src/server/common/sender"
"github.com/didi/nightingale/v5/src/server/config"
promstat "github.com/didi/nightingale/v5/src/server/stat"
"github.com/toolkits/pkg/container/list"
"github.com/toolkits/pkg/logger"
)
var EventQueue = list.NewSafeListLimited(10000000)
func Start(ctx context.Context) error {
err := reloadTpls()
if err != nil {
@@ -22,7 +24,9 @@ func Start(ctx context.Context) error {
go loopConsume(ctx)
// filter my rules and start worker
go loopFilterRules(ctx)
//go loopFilterRules(ctx)
go ruleHolder.LoopSyncRules(ctx)
go reportQueueSize()

View File

@@ -17,11 +17,12 @@ func LogEvent(event *models.AlertCurEvent, location string, err ...error) {
}
logger.Infof(
"event(%s %s) %s: rule_id=%d %v%s@%d %s",
"event(%s %s) %s: rule_id=%d cluster:%s %v%s@%d %s",
event.Hash,
status,
location,
event.RuleId,
event.Cluster,
event.TagsJSON,
event.TriggerValue,
event.TriggerTime,

View File

@@ -1,90 +0,0 @@
package engine
import (
"strings"
"github.com/didi/nightingale/v5/src/models"
"github.com/didi/nightingale/v5/src/server/memsto"
)
// 如果传入了clock这个可选参数就表示使用这个clock表示的时间否则就从event的字段中取TriggerTime
func IsMuted(event *models.AlertCurEvent, clock ...int64) bool {
mutes, has := memsto.AlertMuteCache.Gets(event.GroupId)
if !has || len(mutes) == 0 {
return false
}
for i := 0; i < len(mutes); i++ {
if matchMute(event, mutes[i], clock...) {
return true
}
}
return false
}
func matchMute(event *models.AlertCurEvent, mute *models.AlertMute, clock ...int64) bool {
if mute.Disabled == 1 {
return false
}
ts := event.TriggerTime
if len(clock) > 0 {
ts = clock[0]
}
// 如果不是全局的,判断 cluster
if mute.Cluster != models.ClusterAll {
// event.Cluster 是一个字符串可能是多个cluster的组合比如"cluster1 cluster2"
clusters := strings.Fields(mute.Cluster)
cm := make(map[string]struct{}, len(clusters))
for i := 0; i < len(clusters); i++ {
cm[clusters[i]] = struct{}{}
}
// 判断event.Cluster是否包含在cm中
if _, has := cm[event.Cluster]; !has {
return false
}
}
if ts < mute.Btime || ts > mute.Etime {
return false
}
return matchTags(event.TagsMap, mute.ITags)
}
func matchTag(value string, filter models.TagFilter) bool {
switch filter.Func {
case "==":
return filter.Value == value
case "!=":
return filter.Value != value
case "in":
_, has := filter.Vset[value]
return has
case "not in":
_, has := filter.Vset[value]
return !has
case "=~":
return filter.Regexp.MatchString(value)
case "!~":
return !filter.Regexp.MatchString(value)
}
// unexpect func
return false
}
func matchTags(eventTagsMap map[string]string, itags []models.TagFilter) bool {
for _, filter := range itags {
value, has := eventTagsMap[filter.Key]
if !has {
return false
}
if !matchTag(value, filter) {
return false
}
}
return true
}

View File

@@ -0,0 +1,202 @@
package engine
import (
"strconv"
"strings"
"time"
"github.com/toolkits/pkg/logger"
"github.com/didi/nightingale/v5/src/models"
"github.com/didi/nightingale/v5/src/server/memsto"
)
var AlertMuteStrategies = AlertMuteStrategiesType{
&TimeNonEffectiveMuteStrategy{},
&IdentNotExistsMuteStrategy{},
&BgNotMatchMuteStrategy{},
&EventMuteStrategy{},
}
type AlertMuteStrategiesType []AlertMuteStrategy
func (ss AlertMuteStrategiesType) IsMuted(rule *models.AlertRule, event *models.AlertCurEvent) bool {
for _, s := range ss {
if s.IsMuted(rule, event) {
logger.Debugf("[%T] mute: rule:%+v event:%+v", s, rule, event)
return true
}
}
return false
}
// AlertMuteStrategy 是过滤event的抽象,当返回true时,表示该告警时间由于某些原因不需要告警
type AlertMuteStrategy interface {
IsMuted(rule *models.AlertRule, event *models.AlertCurEvent) bool
}
// TimeNonEffectiveMuteStrategy mutes events produced outside the rule's
// configured effective time windows (or when the rule is disabled).
type TimeNonEffectiveMuteStrategy struct{}

// IsMuted returns true when the rule is disabled, or when the event's
// trigger time falls into none of the rule's (days-of-week, start, end)
// window groups. Windows with start > end are treated as wrapping past
// midnight (e.g. 22:00-06:00).
func (s *TimeNonEffectiveMuteStrategy) IsMuted(rule *models.AlertRule, event *models.AlertCurEvent) bool {
	if rule.Disabled == 1 {
		logger.Debugf("[%T] mute: rule_disabled:%d cluster:%s", s, rule.Id, event.Cluster)
		return true
	}

	tm := time.Unix(event.TriggerTime, 0)
	triggerTime := tm.Format("15:04") // HH:MM, compared lexically below
	triggerWeek := strconv.Itoa(int(tm.Weekday()))

	enableStime := strings.Fields(rule.EnableStime)
	enableEtime := strings.Fields(rule.EnableEtime)
	enableDaysOfWeek := strings.Split(rule.EnableDaysOfWeek, ";")

	length := len(enableDaysOfWeek)
	// enableStime, enableEtime and enableDaysOfWeek are written in parallel
	// and assumed equal in length, so iterating one of them is enough.
	for i := 0; i < length; i++ {
		// the UI stores Sunday as 7, Go's Weekday uses 0
		enableDaysOfWeek[i] = strings.Replace(enableDaysOfWeek[i], "7", "0", 1)
		if !strings.Contains(enableDaysOfWeek[i], triggerWeek) {
			continue
		}

		if enableStime[i] <= enableEtime[i] {
			// plain same-day window, e.g. 09:00-18:00
			if triggerTime < enableStime[i] || triggerTime > enableEtime[i] {
				continue
			}
		} else {
			// wrap-around window, e.g. 22:00-06:00: outside only when the
			// time is strictly between end and start
			if triggerTime < enableStime[i] && triggerTime > enableEtime[i] {
				continue
			}
		}

		// the trigger time is inside one effective window: do not mute
		return false
	}

	return true
}
// IdentNotExistsMuteStrategy suppresses "target_up" style alerts whose
// ident no longer exists in the target cache (e.g. the host was removed).
type IdentNotExistsMuteStrategy struct{}

// IsMuted mutes the event when its ident tag is absent from the target
// cache and the rule queries target_up. Admittedly a coarse heuristic, but
// there is no better signal available here.
func (s *IdentNotExistsMuteStrategy) IsMuted(rule *models.AlertRule, event *models.AlertCurEvent) bool {
	ident, has := event.TagsMap["ident"]
	if !has {
		return false
	}
	if _, exists := memsto.TargetCache.Get(ident); exists {
		return false
	}
	if !strings.Contains(rule.PromQl, "target_up") {
		return false
	}
	logger.Debugf("[%T] mute: rule_eval:%d cluster:%s ident:%s", s, rule.Id, event.Cluster, ident)
	return true
}
// BgNotMatchMuteStrategy mutes events whose target belongs to a different
// business group than the rule, when the rule is scoped to its own BG.
type BgNotMatchMuteStrategy struct{}

// IsMuted mutes the event when the rule is BG-scoped (EnableInBG == 1) and
// the event's target exists but lives in another business group.
func (s *BgNotMatchMuteStrategy) IsMuted(rule *models.AlertRule, event *models.AlertCurEvent) bool {
	if rule.EnableInBG == 0 {
		// rule is not BG-scoped; never mute here
		return false
	}

	ident, has := event.TagsMap["ident"]
	if !has {
		return false
	}

	target, exists := memsto.TargetCache.Get(ident)
	if !exists || target.GroupId == rule.GroupId {
		return false
	}

	// target exists but belongs to another BG: a BG-scoped rule must not
	// fire for it
	logger.Debugf("[%T] mute: rule_eval:%d cluster:%s", s, rule.Id, event.Cluster)
	return true
}
// EventMuteStrategy mutes events matching an active alert-mute record
// configured for the event's business group.
type EventMuteStrategy struct{}

// EventMuteStra is a shared instance for callers outside the strategy list.
var EventMuteStra = new(EventMuteStrategy)
// IsMuted reports whether any alert-mute record of the event's business
// group matches this event.
func (s *EventMuteStrategy) IsMuted(rule *models.AlertRule, event *models.AlertCurEvent) bool {
	mutes, has := memsto.AlertMuteCache.Gets(event.GroupId)
	if !has || len(mutes) == 0 {
		return false
	}
	for _, mute := range mutes {
		if matchMute(event, mute) {
			return true
		}
	}
	return false
}
// matchMute reports whether event hits the given mute record. The optional
// clock overrides the timestamp used for the btime/etime range check;
// otherwise the event's TriggerTime is used.
func matchMute(event *models.AlertCurEvent, mute *models.AlertMute, clock ...int64) bool {
	if mute.Disabled == 1 {
		return false
	}

	ts := event.TriggerTime
	if len(clock) > 0 {
		ts = clock[0]
	}

	// unless the mute is global, the event's cluster must be one of the
	// space-separated clusters configured on the mute, e.g. "cluster1 cluster2"
	if mute.Cluster != models.ClusterAll {
		matched := false
		for _, c := range strings.Fields(mute.Cluster) {
			if c == event.Cluster {
				matched = true
				break
			}
		}
		if !matched {
			return false
		}
	}

	if ts < mute.Btime || ts > mute.Etime {
		return false
	}

	return matchTags(event.TagsMap, mute.ITags)
}
// matchTag evaluates a single tag filter against value, dispatching on the
// comparison operator carried in filter.Func.
func matchTag(value string, filter models.TagFilter) bool {
	switch filter.Func {
	case "==":
		return value == filter.Value
	case "!=":
		return value != filter.Value
	case "in":
		_, found := filter.Vset[value]
		return found
	case "not in":
		_, found := filter.Vset[value]
		return !found
	case "=~":
		return filter.Regexp.MatchString(value)
	case "!~":
		return !filter.Regexp.MatchString(value)
	default:
		// unexpected operator: never matches
		return false
	}
}
// matchTags reports whether the event tags satisfy all filters in itags;
// a filter key missing from eventTagsMap fails the whole match.
func matchTags(eventTagsMap map[string]string, itags []models.TagFilter) bool {
	for i := range itags {
		tagValue, present := eventTagsMap[itags[i].Key]
		if !present || !matchTag(tagValue, itags[i]) {
			return false
		}
	}
	return true
}

View File

@@ -1,5 +0,0 @@
package engine
import "github.com/toolkits/pkg/container/list"
var EventQueue = list.NewSafeListLimited(10000000)

166
src/server/engine/rule.go Normal file
View File

@@ -0,0 +1,166 @@
package engine
import (
"context"
"fmt"
"strings"
"sync"
"time"
"github.com/didi/nightingale/v5/src/server/config"
"github.com/didi/nightingale/v5/src/server/memsto"
"github.com/didi/nightingale/v5/src/server/naming"
)
// RuleContext is the lifecycle contract shared by alert-rule and
// recording-rule evaluators managed by RuleHolder.
type RuleContext interface {
	// Key uniquely identifies the (cluster, rule) pair, human readable.
	Key() string
	// Hash changes whenever fields affecting evaluation change.
	Hash() string
	// Prepare restores state needed before Start (e.g. events from DB).
	Prepare()
	// Start launches the periodic evaluation goroutine.
	Start()
	// Eval runs a single evaluation round.
	Eval()
	// Stop terminates the evaluation goroutine.
	Stop()
}
// ruleHolder is the process-wide registry of running rule evaluators.
var ruleHolder = &RuleHolder{
	alertRules:         make(map[string]RuleContext),
	recordRules:        make(map[string]RuleContext),
	externalAlertRules: make(map[string]*AlertRuleContext),
}

// RuleHolder tracks the rule evaluators owned by this node.
type RuleHolder struct {
	// externalLock guards externalAlertRules, which is also read by
	// GetExternalAlertRule from other goroutines.
	externalLock sync.RWMutex

	// key: hash (see AlertRuleContext.Hash)
	alertRules map[string]RuleContext
	// key: hash (see RecordRuleContext.Hash)
	recordRules map[string]RuleContext
	// key: key (see AlertRuleContext.Key)
	externalAlertRules map[string]*AlertRuleContext
}
// LoopSyncRules periodically reconciles the in-memory rule evaluators with
// the rule caches until ctx is cancelled.
func (rh *RuleHolder) LoopSyncRules(ctx context.Context) {
	// give cache loaders a head start before the first sync
	time.Sleep(time.Duration(config.C.EngineDelay) * time.Second)

	const interval = 9000 * time.Millisecond
	for {
		select {
		case <-ctx.Done():
			return
		case <-time.After(interval):
			rh.SyncAlertRules()
			rh.SyncRecordRules()
		}
	}
}
// SyncAlertRules rebuilds the set of alert-rule evaluators owned by this
// node. Prometheus-backed rules are started/stopped as goroutines, while
// non-prometheus ("external") rules are only registered for lookup via
// GetExternalAlertRule. Ownership is decided by the cluster hash ring.
func (rh *RuleHolder) SyncAlertRules() {
	ids := memsto.AlertRuleCache.GetRuleIds()

	alertRules := make(map[string]RuleContext)
	externalAllRules := make(map[string]*AlertRuleContext)

	for _, id := range ids {
		rule := memsto.AlertRuleCache.Get(id)
		if rule == nil {
			continue
		}

		if !rule.IsPrometheusRule() {
			// rules not evaluated through the prometheus engine become
			// external rules, one per configured cluster
			for _, cluster := range strings.Fields(rule.Cluster) {
				// hash ring not hit: another node owns this rule
				if !naming.ClusterHashRing.IsHit(cluster, fmt.Sprintf("%d", rule.Id), config.C.Heartbeat.Endpoint) {
					continue
				}
				externalRule := NewAlertRuleContext(rule, cluster)
				externalAllRules[externalRule.Key()] = externalRule
			}
			continue
		}

		for _, cluster := range config.ReaderClients.Hit(rule.Cluster) {
			// hash ring not hit: another node owns this rule
			if !naming.ClusterHashRing.IsHit(cluster, fmt.Sprintf("%d", rule.Id), config.C.Heartbeat.Endpoint) {
				continue
			}
			alertRule := NewAlertRuleContext(rule, cluster)
			alertRules[alertRule.Hash()] = alertRule
		}
	}

	// start evaluators that appeared
	for hash, rule := range alertRules {
		if _, has := rh.alertRules[hash]; !has {
			rule.Prepare()
			rule.Start()
			rh.alertRules[hash] = rule
		}
	}
	// stop evaluators that disappeared (or whose hash changed)
	for hash, rule := range rh.alertRules {
		if _, has := alertRules[hash]; !has {
			rule.Stop()
			delete(rh.alertRules, hash)
		}
	}

	// FIX: reconcile external rules inside a single critical section
	// instead of locking/unlocking once per map entry — cheaper, and the
	// add and remove passes become atomic with respect to readers.
	rh.externalLock.Lock()
	for key, rule := range externalAllRules {
		if _, has := rh.externalAlertRules[key]; !has {
			rule.Prepare()
			rh.externalAlertRules[key] = rule
		}
	}
	for key := range rh.externalAlertRules {
		if _, has := externalAllRules[key]; !has {
			delete(rh.externalAlertRules, key)
		}
	}
	rh.externalLock.Unlock()
}
// SyncRecordRules reconciles recording-rule evaluators with the cache:
// newly appeared (hash-ring owned) rules are prepared and started,
// vanished ones are stopped and removed.
func (rh *RuleHolder) SyncRecordRules() {
	recordRules := make(map[string]RuleContext)

	for _, id := range memsto.RecordingRuleCache.GetRuleIds() {
		rule := memsto.RecordingRuleCache.Get(id)
		if rule == nil {
			continue
		}
		for _, cluster := range config.ReaderClients.Hit(rule.Cluster) {
			// only evaluate rules the hash ring assigns to this endpoint
			if !naming.ClusterHashRing.IsHit(cluster, fmt.Sprintf("%d", rule.Id), config.C.Heartbeat.Endpoint) {
				continue
			}
			rrc := NewRecordRuleContext(rule, cluster)
			recordRules[rrc.Hash()] = rrc
		}
	}

	// launch rules that just appeared
	for hash, rule := range recordRules {
		if _, running := rh.recordRules[hash]; !running {
			rule.Prepare()
			rule.Start()
			rh.recordRules[hash] = rule
		}
	}

	// retire rules that are gone or whose hash changed
	for hash, rule := range rh.recordRules {
		if _, wanted := recordRules[hash]; !wanted {
			rule.Stop()
			delete(rh.recordRules, hash)
		}
	}
}
// GetExternalAlertRule looks up the external (non-prometheus) rule context
// registered for the given cluster and rule id.
func GetExternalAlertRule(cluster string, id int64) (*AlertRuleContext, bool) {
	ruleHolder.externalLock.RLock()
	defer ruleHolder.externalLock.RUnlock()

	rule, has := ruleHolder.externalAlertRules[fmt.Sprintf("alert-%s-%d", cluster, id)]
	return rule, has
}

View File

@@ -0,0 +1,300 @@
package engine
import (
"context"
"fmt"
"strings"
"time"
"github.com/prometheus/common/model"
"github.com/toolkits/pkg/logger"
"github.com/toolkits/pkg/str"
"github.com/didi/nightingale/v5/src/models"
"github.com/didi/nightingale/v5/src/pkg/prom"
"github.com/didi/nightingale/v5/src/server/common/conv"
"github.com/didi/nightingale/v5/src/server/config"
"github.com/didi/nightingale/v5/src/server/memsto"
promstat "github.com/didi/nightingale/v5/src/server/stat"
)
// AlertRuleContext evaluates one alert rule against one cluster and tracks
// its in-flight alerting state.
type AlertRuleContext struct {
	cluster string
	quit    chan struct{} // closed by Stop to end the eval goroutine

	rule *models.AlertRule
	// fires holds events that fired and have not recovered yet;
	// pendings holds events waiting out PromForDuration.
	fires    *AlertCurEventMap
	pendings *AlertCurEventMap
}
// NewAlertRuleContext binds rule to cluster. The fires/pendings maps are
// populated later by Prepare.
func NewAlertRuleContext(rule *models.AlertRule, cluster string) *AlertRuleContext {
	arc := &AlertRuleContext{
		cluster: cluster,
		quit:    make(chan struct{}),
		rule:    rule,
	}
	return arc
}
// RuleFromCache returns the latest version of this rule from the cache,
// or nil when the rule has been deleted.
func (arc *AlertRuleContext) RuleFromCache() *models.AlertRule {
	return memsto.AlertRuleCache.Get(arc.rule.Id)
}
// Key uniquely identifies this (cluster, rule) pair; it is also the map
// key used for external rules in RuleHolder.
func (arc *AlertRuleContext) Key() string {
	return fmt.Sprintf("alert-%s-%d", arc.cluster, arc.rule.Id)
}
// Hash fingerprints the fields that affect evaluation; when any of them
// changes, the holder restarts this context under a new hash.
func (arc *AlertRuleContext) Hash() string {
	return str.MD5(fmt.Sprintf("%d_%d_%s_%s",
		arc.rule.Id,
		arc.rule.PromEvalInterval,
		arc.rule.PromQl,
		arc.cluster,
	))
}
// Prepare restores previously fired events from the database so alerting
// state survives process restarts.
func (arc *AlertRuleContext) Prepare() {
	arc.recoverAlertCurEventFromDb()
}
// Start launches the evaluation goroutine, which runs Eval every
// PromEvalInterval seconds (minimum 10) until Stop closes arc.quit.
func (arc *AlertRuleContext) Start() {
	logger.Infof("eval:%s started", arc.Key())

	interval := arc.rule.PromEvalInterval
	if interval <= 0 {
		interval = 10
	}

	go func() {
		ticker := time.NewTicker(time.Duration(interval) * time.Second)
		defer ticker.Stop()
		for {
			arc.Eval()
			// FIX: wait for the next tick while also watching quit, so
			// Stop takes effect immediately instead of after a full
			// time.Sleep of up to `interval` seconds.
			select {
			case <-arc.quit:
				return
			case <-ticker.C:
			}
		}
	}()
}
// Eval runs one evaluation round: query the cluster's reader with the
// rule's PromQL and feed the resulting vectors into HandleVectors.
func (arc *AlertRuleContext) Eval() {
	promql := strings.TrimSpace(arc.rule.PromQl)
	if promql == "" {
		logger.Errorf("rule_eval:%s promql is blank", arc.Key())
		return
	}

	if config.ReaderClients.IsNil(arc.cluster) {
		logger.Errorf("rule_eval:%s error reader client is nil", arc.Key())
		return
	}
	readerClient := config.ReaderClients.GetCli(arc.cluster)

	var value model.Value
	var err error

	cachedRule := arc.RuleFromCache()
	if cachedRule == nil {
		logger.Errorf("rule_eval:%s rule not found", arc.Key())
		return
	}

	// If this were always a single goroutine we could assign cachedRule to
	// arc.rule, but external rules also call HandleVectors/RecoverSingle,
	// so the rule is re-fetched from cache wherever it is needed instead.
	// arc.rule = cachedRule

	// If the cached rule is no longer a prometheus rule, there is no point
	// querying prometheus anymore; value stays nil in that case.
	if cachedRule.IsPrometheusRule() {
		var warnings prom.Warnings
		value, warnings, err = readerClient.Query(context.Background(), promql, time.Now())
		if err != nil {
			logger.Errorf("rule_eval:%s promql:%s, error:%v", arc.Key(), promql, err)
			//notifyToMaintainer(err, "failed to query prometheus")
			Report(QueryPrometheusError)
			return
		}

		if len(warnings) > 0 {
			logger.Errorf("rule_eval:%s promql:%s, warnings:%v", arc.Key(), promql, warnings)
			return
		}
		logger.Debugf("rule_eval:%s promql:%s, value:%v", arc.Key(), promql, value)
	}

	arc.HandleVectors(conv.ConvertVectors(value), "inner")
}
// HandleVectors turns query result vectors into alert events, applies the
// mute strategies, and finally recovers events that stopped alerting.
// from is "inner" for the built-in engine; other values mark external pushes.
func (arc *AlertRuleContext) HandleVectors(vectors []conv.Vector, from string) {
	// Rule settings (receivers, callbacks, ...) may change without
	// restarting this worker, so always act on the freshly cached rule.
	cachedRule := arc.RuleFromCache()
	if cachedRule == nil {
		logger.Errorf("rule_eval:%s rule not found", arc.Key())
		return
	}

	now := time.Now().Unix()
	alertingKeys := map[string]struct{}{}
	for _, vector := range vectors {
		alertVector := NewAlertVector(arc, cachedRule, vector, from)
		event := alertVector.BuildEvent(now)

		// A muted event is still essentially firing, so record its key
		// unconditionally — otherwise the recover pass below would
		// auto-recover a fired event that is merely muted right now.
		alertingKeys[alertVector.Hash()] = struct{}{}
		if AlertMuteStrategies.IsMuted(cachedRule, event) {
			logger.Debugf("rule_eval:%s event:%+v is muted", arc.Key(), event)
			continue
		}

		arc.handleEvent(event)
	}

	arc.HandleRecover(alertingKeys, now)
}
// HandleRecover clears pending state and recovers fired events whose hash
// was not produced in the current evaluation round.
func (arc *AlertRuleContext) HandleRecover(alertingKeys map[string]struct{}, now int64) {
	for _, hash := range arc.pendings.Keys() {
		if _, stillAlerting := alertingKeys[hash]; !stillAlerting {
			arc.pendings.Delete(hash)
		}
	}
	for hash := range arc.fires.GetAll() {
		if _, stillAlerting := alertingKeys[hash]; !stillAlerting {
			arc.RecoverSingle(hash, now, nil)
		}
	}
}
// RecoverSingle recovers one fired event identified by hash. The optional
// value, when non-nil, overrides TriggerValue on the recovery event.
func (arc *AlertRuleContext) RecoverSingle(hash string, now int64, value *string) {
	cachedRule := arc.RuleFromCache()
	if cachedRule == nil {
		logger.Errorf("rule_eval:%s rule not found", arc.Key())
		return
	}
	event, has := arc.fires.Get(hash)
	if !has {
		return
	}
	// An observation window is configured: do not recover immediately.
	if cachedRule.RecoverDuration > 0 && now-event.LastEvalTime < cachedRule.RecoverDuration {
		return
	}
	if value != nil {
		event.TriggerValue = *value
	}
	// No vector crossed the threshold for this series, so assume it
	// recovered. (We cannot distinguish "value back below threshold" from
	// "prometheus simply lost some points", unfortunately.)
	arc.fires.Delete(hash)
	arc.pendings.Delete(hash)

	// The recovery may be due to an edited promql, so refresh all rule
	// fields on the event — users would be confused by a stale promql.
	cachedRule.UpdateEvent(event)
	event.IsRecovered = true
	event.LastEvalTime = now
	arc.pushEventToQueue(event)
}
// handleEvent fires the event immediately when no for-duration is set;
// otherwise it tracks the event in pendings until the alert condition has
// held for PromForDuration seconds.
func (arc *AlertRuleContext) handleEvent(event *models.AlertCurEvent) {
	if event == nil {
		logger.Debugf("rule_eval:%s event:%+v is nil", arc.Key(), event)
		return
	}
	if event.PromForDuration == 0 {
		arc.fireEvent(event)
		return
	}

	var preTriggerTime int64
	preEvent, has := arc.pendings.Get(event.Hash)
	if has {
		arc.pendings.UpdateLastEvalTime(event.Hash, event.LastEvalTime)
		preTriggerTime = preEvent.TriggerTime
	} else {
		arc.pendings.Set(event.Hash, event)
		preTriggerTime = event.TriggerTime
	}

	// fire once the condition has held for the configured duration,
	// allowing one eval interval of slack
	if event.LastEvalTime-preTriggerTime+int64(event.PromEvalInterval) >= int64(event.PromForDuration) {
		arc.fireEvent(event)
	}
}
// fireEvent pushes a firing event to the queue, honoring the rule's repeat
// interval (NotifyRepeatStep, minutes) and maximum notification count
// (NotifyMaxNumber, 0 = unlimited) for events that are already firing.
func (arc *AlertRuleContext) fireEvent(event *models.AlertCurEvent) {
	// As arc.rule may be outdated, use the rule from cache.
	cachedRule := arc.RuleFromCache()
	if cachedRule == nil {
		logger.Errorf("rule_eval:%s event:%+v is nil", arc.Key(), event)
		return
	}
	if fired, has := arc.fires.Get(event.Hash); has {
		arc.fires.UpdateLastEvalTime(event.Hash, event.LastEvalTime)

		if cachedRule.NotifyRepeatStep == 0 {
			// repeat notifications disabled: nothing to do
			logger.Debugf("rule_eval:%s event:%+v nothing to do", arc.Key(), event)
			return
		}

		// already notified earlier; re-notify only once the repeat
		// interval (minutes) has elapsed since the last send
		if event.LastEvalTime > fired.LastSentTime+int64(cachedRule.NotifyRepeatStep)*60 {
			if cachedRule.NotifyMaxNumber == 0 {
				// zero means no cap on the number of notifications
				event.NotifyCurNumber = fired.NotifyCurNumber + 1
				event.FirstTriggerTime = fired.FirstTriggerTime
				arc.pushEventToQueue(event)
			} else {
				// capped: stop once the max count has been reached
				if fired.NotifyCurNumber >= cachedRule.NotifyMaxNumber {
					logger.Debugf("rule_eval:%s event:%+v notify to max number", arc.Key(), event)
					return
				} else {
					event.NotifyCurNumber = fired.NotifyCurNumber + 1
					event.FirstTriggerTime = fired.FirstTriggerTime
					arc.pushEventToQueue(event)
				}
			}
		}
	} else {
		// first time this event fires
		event.NotifyCurNumber = 1
		event.FirstTriggerTime = event.TriggerTime
		arc.pushEventToQueue(event)
	}
}
// pushEventToQueue records a firing (non-recovered) event in arc.fires and
// hands the event to the dispatch queue.
func (arc *AlertRuleContext) pushEventToQueue(event *models.AlertCurEvent) {
	if !event.IsRecovered {
		event.LastSentTime = event.LastEvalTime
		arc.fires.Set(event.Hash, event)
	}

	promstat.CounterAlertsTotal.WithLabelValues(event.Cluster).Inc()
	LogEvent(event, "push_queue")

	if ok := EventQueue.PushFront(event); !ok {
		logger.Warningf("event_push_queue: queue is full, event:%+v", event)
	}
}
// Stop terminates the evaluation goroutine started by Start.
func (arc *AlertRuleContext) Stop() {
	logger.Infof("%s stopped", arc.Key())
	close(arc.quit)
}
// recoverAlertCurEventFromDb reloads this rule's un-recovered events from
// the database so firing state survives process restarts.
func (arc *AlertRuleContext) recoverAlertCurEventFromDb() {
	arc.pendings = NewAlertCurEventMap(nil)

	curEvents, err := models.AlertCurEventGetByRuleIdAndCluster(arc.rule.Id, arc.cluster)
	if err != nil {
		logger.Errorf("recover event from db for rule:%s failed, err:%s", arc.Key(), err)
		arc.fires = NewAlertCurEventMap(nil)
		return
	}

	fireMap := make(map[string]*models.AlertCurEvent, len(curEvents))
	for _, event := range curEvents {
		event.DB2Mem()
		fireMap[event.Hash] = event
	}
	arc.fires = NewAlertCurEventMap(fireMap)
}

View File

@@ -0,0 +1,189 @@
package engine
import (
"fmt"
"sort"
"strings"
"sync"
"github.com/toolkits/pkg/str"
"github.com/didi/nightingale/v5/src/models"
"github.com/didi/nightingale/v5/src/server/common/conv"
"github.com/didi/nightingale/v5/src/server/memsto"
)
// AlertCurEventMap is a mutex-guarded map of current alert events keyed by
// event hash.
type AlertCurEventMap struct {
	sync.RWMutex
	Data map[string]*models.AlertCurEvent
}
// SetAll replaces the whole underlying map.
func (a *AlertCurEventMap) SetAll(data map[string]*models.AlertCurEvent) {
	a.Lock()
	defer a.Unlock()
	a.Data = data
}
// Set stores or replaces the event under key.
func (a *AlertCurEventMap) Set(key string, value *models.AlertCurEvent) {
	a.Lock()
	defer a.Unlock()
	a.Data[key] = value
}
// Get returns the event stored under key and whether it exists.
func (a *AlertCurEventMap) Get(key string) (*models.AlertCurEvent, bool) {
	a.RLock()
	defer a.RUnlock()
	event, exists := a.Data[key]
	return event, exists
}
// UpdateLastEvalTime sets LastEvalTime on the stored event, if present.
// NOTE(review): the lock guards the map, but the event struct itself is
// mutated here and may be read elsewhere — confirm all access is from one
// goroutine per rule.
func (a *AlertCurEventMap) UpdateLastEvalTime(key string, lastEvalTime int64) {
	a.Lock()
	defer a.Unlock()
	event, exists := a.Data[key]
	if !exists {
		return
	}
	event.LastEvalTime = lastEvalTime
}
// Delete removes the event under key (no-op when absent).
func (a *AlertCurEventMap) Delete(key string) {
	a.Lock()
	defer a.Unlock()
	delete(a.Data, key)
}
// Keys returns a snapshot of all event hashes currently stored.
func (a *AlertCurEventMap) Keys() []string {
	a.RLock()
	defer a.RUnlock()

	out := make([]string, 0, len(a.Data))
	for hash := range a.Data {
		out = append(out, hash)
	}
	return out
}
// GetAll returns the internal map itself, not a copy.
// NOTE(review): the map escapes the lock, so callers iterate it without
// synchronization — safe only while all mutations happen on the same
// goroutine as the caller; verify before introducing concurrent writers.
func (a *AlertCurEventMap) GetAll() map[string]*models.AlertCurEvent {
	a.RLock()
	defer a.RUnlock()
	return a.Data
}
// NewAlertCurEventMap wraps data in an AlertCurEventMap; a nil data means
// "start empty".
func NewAlertCurEventMap(data map[string]*models.AlertCurEvent) *AlertCurEventMap {
	if data == nil {
		data = make(map[string]*models.AlertCurEvent)
	}
	return &AlertCurEventMap{Data: data}
}
// AlertVector carries the full alerting context for one query result
// vector: the rule being evaluated, the vector itself, and the derived
// tag/target/group metadata used to build the event.
type AlertVector struct {
	Ctx    *AlertRuleContext
	Rule   *models.AlertRule
	Vector conv.Vector
	From   string // "inner" for the built-in engine; other values for external pushes

	tagsMap    map[string]string
	tagsArr    []string
	target     string
	targetNote string
	groupName  string
}
// NewAlertVector builds the alerting context for a single vector; a nil
// rule falls back to the context's own rule. Tag, ident and group metadata
// are derived eagerly so BuildEvent can be called directly afterwards.
func NewAlertVector(ctx *AlertRuleContext, rule *models.AlertRule, vector conv.Vector, from string) *AlertVector {
	if rule == nil {
		rule = ctx.rule
	}
	av := &AlertVector{
		Ctx:    ctx,
		Rule:   rule,
		Vector: vector,
		From:   from,
	}
	av.fillTags()
	av.mayHandleIdent()
	av.mayHandleGroup()
	return av
}
// Hash identifies this (rule, series, cluster) combination; it is the
// event hash used for the pending/fired bookkeeping.
func (av *AlertVector) Hash() string {
	return str.MD5(fmt.Sprintf("%d_%s_%s", av.Rule.Id, av.Vector.Key, av.Ctx.cluster))
}
// fillTags merges series labels and rule append-tags into av.tagsMap and
// av.tagsArr; the "rulename" tag is always present and wins last.
func (av *AlertVector) fillTags() {
	// series labels first
	tagsMap := make(map[string]string)
	for label, value := range av.Vector.Labels {
		tagsMap[string(label)] = string(value)
	}

	// rule-level append tags override series labels on key collision
	for _, tag := range av.Rule.AppendTagsJSON {
		arr := strings.SplitN(tag, "=", 2)
		if len(arr) != 2 {
			// FIX: a malformed tag without '=' previously panicked on
			// arr[1] (index out of range); skip it instead
			continue
		}
		tagsMap[arr[0]] = arr[1]
	}

	tagsMap["rulename"] = av.Rule.Name
	av.tagsMap = tagsMap

	// flattened, sorted "k=v" form used on the event
	av.tagsArr = labelMapToArr(tagsMap)
}
// mayHandleIdent resolves the "ident" tag (when present) against the
// target cache to fill the target ident and its note.
func (av *AlertVector) mayHandleIdent() {
	ident, has := av.tagsMap["ident"]
	if !has {
		return
	}
	target, exists := memsto.TargetCache.Get(ident)
	if !exists {
		return
	}
	av.target = target.Ident
	av.targetNote = target.Note
}
// mayHandleGroup resolves the rule's business-group name, if the group
// still exists in the cache.
func (av *AlertVector) mayHandleGroup() {
	if bg := memsto.BusiGroupCache.GetByBusiGroupId(av.Rule.GroupId); bg != nil {
		av.groupName = bg.Name
	}
}
// BuildEvent materializes an AlertCurEvent from the rule and vector. For
// events from the inner engine, LastEvalTime is "now"; for external events
// it is the vector's own timestamp.
func (av *AlertVector) BuildEvent(now int64) *models.AlertCurEvent {
	event := av.Rule.GenerateNewEvent()
	event.TriggerTime = av.Vector.Timestamp
	event.TagsMap = av.tagsMap
	event.Cluster = av.Ctx.cluster
	event.Hash = av.Hash()
	event.TargetIdent = av.target
	event.TargetNote = av.targetNote
	event.TriggerValue = av.Vector.ReadableValue()
	event.TagsJSON = av.tagsArr
	event.GroupName = av.groupName
	event.Tags = strings.Join(av.tagsArr, ",,")
	event.IsRecovered = false
	if av.From == "inner" {
		event.LastEvalTime = now
	} else {
		event.LastEvalTime = event.TriggerTime
	}
	return event
}
// labelMapToArr flattens a label map into a sorted slice of "key=value"
// strings (sorting a zero- or one-element slice is a no-op).
func labelMapToArr(m map[string]string) []string {
	arr := make([]string, 0, len(m))
	for k, v := range m {
		arr = append(arr, fmt.Sprintf("%s=%s", k, v))
	}
	sort.Strings(arr)
	return arr
}

View File

@@ -0,0 +1,100 @@
package engine
import (
"context"
"fmt"
"strings"
"time"
"github.com/toolkits/pkg/logger"
"github.com/toolkits/pkg/str"
"github.com/didi/nightingale/v5/src/models"
"github.com/didi/nightingale/v5/src/server/common/conv"
"github.com/didi/nightingale/v5/src/server/config"
"github.com/didi/nightingale/v5/src/server/writer"
)
// RecordRuleContext evaluates one recording rule against one cluster.
type RecordRuleContext struct {
	cluster string
	quit    chan struct{} // closed by Stop to end the eval goroutine
	rule    *models.RecordingRule
}
// NewRecordRuleContext binds a recording rule to one cluster.
func NewRecordRuleContext(rule *models.RecordingRule, cluster string) *RecordRuleContext {
	rrc := &RecordRuleContext{
		cluster: cluster,
		quit:    make(chan struct{}),
		rule:    rule,
	}
	return rrc
}
// Key uniquely identifies this (cluster, rule) pair for logging.
func (rrc *RecordRuleContext) Key() string {
	return fmt.Sprintf("record-%s-%d", rrc.cluster, rrc.rule.Id)
}
// Hash fingerprints the fields that affect evaluation; when any of them
// changes, the holder restarts this context under a new hash.
func (rrc *RecordRuleContext) Hash() string {
	return str.MD5(fmt.Sprintf("%d_%d_%s_%s",
		rrc.rule.Id,
		rrc.rule.PromEvalInterval,
		rrc.rule.PromQl,
		rrc.cluster,
	))
}
// Prepare is a no-op: recording rules carry no state to restore.
func (rrc *RecordRuleContext) Prepare() {}
// Start launches the evaluation goroutine, which runs Eval every
// PromEvalInterval seconds (minimum 10) until Stop closes rrc.quit.
func (rrc *RecordRuleContext) Start() {
	logger.Infof("eval:%s started", rrc.Key())

	interval := rrc.rule.PromEvalInterval
	if interval <= 0 {
		interval = 10
	}

	go func() {
		ticker := time.NewTicker(time.Duration(interval) * time.Second)
		defer ticker.Stop()
		for {
			rrc.Eval()
			// FIX: wait for the next tick while also watching quit, so
			// Stop takes effect immediately instead of after a full
			// time.Sleep of up to `interval` seconds.
			select {
			case <-rrc.quit:
				return
			case <-ticker.C:
			}
		}
	}()
}
// Eval runs the recording rule's PromQL once against the cluster's reader
// and pushes the resulting samples to the remote writers.
func (rrc *RecordRuleContext) Eval() {
	promql := strings.TrimSpace(rrc.rule.PromQl)
	if promql == "" {
		logger.Errorf("eval:%s promql is blank", rrc.Key())
		return
	}

	if config.ReaderClients.IsNil(rrc.cluster) {
		logger.Errorf("eval:%s reader client is nil", rrc.Key())
		return
	}

	value, warnings, err := config.ReaderClients.GetCli(rrc.cluster).Query(context.Background(), promql, time.Now())
	if err != nil {
		// FIX: Key() returns a string; the original used %d, which go vet
		// flags and which rendered as "%!d(string=...)" in the log.
		logger.Errorf("eval:%s promql:%s, error:%v", rrc.Key(), promql, err)
		return
	}

	if len(warnings) > 0 {
		// FIX: %d -> %s, same format-verb bug as above
		logger.Errorf("eval:%s promql:%s, warnings:%v", rrc.Key(), promql, warnings)
		return
	}

	// ranging over an empty slice is a no-op, so no length guard is needed
	for _, sample := range conv.ConvertToTimeSeries(value, rrc.rule) {
		writer.Writers.PushSample(rrc.rule.Name, sample, rrc.cluster)
	}
}
// Stop terminates the evaluation goroutine started by Start.
func (rrc *RecordRuleContext) Stop() {
	logger.Infof("%s stopped", rrc.Key())
	close(rrc.quit)
}

View File

@@ -1,830 +0,0 @@
package engine
import (
"context"
"fmt"
"log"
"sort"
"strings"
"sync"
"time"
"github.com/didi/nightingale/v5/src/server/writer"
"github.com/prometheus/common/model"
"github.com/toolkits/pkg/logger"
"github.com/toolkits/pkg/str"
"github.com/didi/nightingale/v5/src/models"
"github.com/didi/nightingale/v5/src/pkg/prom"
"github.com/didi/nightingale/v5/src/server/common/conv"
"github.com/didi/nightingale/v5/src/server/config"
"github.com/didi/nightingale/v5/src/server/memsto"
"github.com/didi/nightingale/v5/src/server/naming"
promstat "github.com/didi/nightingale/v5/src/server/stat"
)
// loopFilterRules (legacy engine) periodically re-partitions alert and
// recording rules across nodes until ctx is cancelled.
func loopFilterRules(ctx context.Context) {
	// wait for samples
	time.Sleep(time.Duration(config.C.EngineDelay) * time.Second)

	duration := time.Duration(9000) * time.Millisecond
	for {
		select {
		case <-ctx.Done():
			return
		case <-time.After(duration):
			filterRules()
			filterRecordingRules()
		}
	}
}
// 一个规则可能会在多个集群中生效,所以这里要把规则拆分成多个,此结构记录 id 和 cluster 的对应关系
type RuleSimpleInfo struct {
Id int64
Cluster string
}
// filterRules (legacy engine) selects the (rule, cluster) pairs that the
// cluster hash ring assigns to this node and rebuilds the workers.
func filterRules() {
	ids := memsto.AlertRuleCache.GetRuleIds()
	logger.Debugf("AlertRuleCache.GetRuleIds success, ids.len: %d", len(ids))

	count := len(ids)
	mines := make([]*RuleSimpleInfo, 0, count)

	for i := 0; i < count; i++ {
		rule := memsto.AlertRuleCache.Get(ids[i])
		if rule == nil {
			logger.Debugf("AlertRuleCache.Get(%d) failed", ids[i])
			continue
		}

		var clusters []string
		if rule.Cluster == models.ClusterAll {
			clusters = config.ReaderClients.GetClusterNames()
		} else {
			clusters = strings.Fields(rule.Cluster)
		}

		for _, cluster := range clusters {
			if config.ReaderClients.IsNil(cluster) {
				// no reader configured for this cluster: skip
				continue
			}
			node, err := naming.ClusterHashRing.GetNode(cluster, fmt.Sprint(ids[i]))
			if err != nil {
				logger.Warningf("rid:%d cluster:%s failed to get node from hashring:%v", ids[i], cluster, err)
				continue
			}
			// keep only the pairs the hash ring assigns to this endpoint
			if node == config.C.Heartbeat.Endpoint {
				mines = append(mines, &RuleSimpleInfo{Id: ids[i], Cluster: cluster})
			}
		}
	}

	Workers.Build(mines)
	RuleEvalForExternal.Build()
}
// RuleEval (legacy engine) evaluates one alert rule in one cluster.
type RuleEval struct {
	cluster  string
	rule     *models.AlertRule
	fires    *AlertCurEventMap // fired, not yet recovered events
	pendings *AlertCurEventMap // events waiting out PromForDuration
	quit     chan struct{}     // closed by Stop
}

// AlertCurEventMap is a mutex-guarded map of current alert events keyed by
// event hash.
type AlertCurEventMap struct {
	sync.RWMutex
	Data map[string]*models.AlertCurEvent
}
// SetAll replaces the whole underlying map.
func (a *AlertCurEventMap) SetAll(data map[string]*models.AlertCurEvent) {
	a.Lock()
	defer a.Unlock()
	a.Data = data
}
// Set stores or replaces the event under key.
func (a *AlertCurEventMap) Set(key string, value *models.AlertCurEvent) {
	a.Lock()
	defer a.Unlock()
	a.Data[key] = value
}
// Get returns the event stored under key and whether it exists.
func (a *AlertCurEventMap) Get(key string) (*models.AlertCurEvent, bool) {
	a.RLock()
	defer a.RUnlock()
	event, exists := a.Data[key]
	return event, exists
}
// UpdateLastEvalTime sets LastEvalTime on the stored event, if present.
func (a *AlertCurEventMap) UpdateLastEvalTime(key string, lastEvalTime int64) {
	a.Lock()
	defer a.Unlock()
	event, exists := a.Data[key]
	if !exists {
		return
	}
	event.LastEvalTime = lastEvalTime
}
// Delete removes the event under key (no-op when absent).
func (a *AlertCurEventMap) Delete(key string) {
	a.Lock()
	defer a.Unlock()
	delete(a.Data, key)
}
// Keys returns a snapshot of all event hashes currently stored.
func (a *AlertCurEventMap) Keys() []string {
	a.RLock()
	defer a.RUnlock()
	keys := make([]string, 0, len(a.Data))
	for k := range a.Data {
		keys = append(keys, k)
	}
	return keys
}
// GetAll returns the internal map itself, not a copy.
// NOTE(review): the map escapes the lock; callers iterate it without
// synchronization — verify all mutations happen on the same goroutine.
func (a *AlertCurEventMap) GetAll() map[string]*models.AlertCurEvent {
	a.RLock()
	defer a.RUnlock()
	return a.Data
}
// NewAlertCurEventMap returns an empty, ready-to-use map.
func NewAlertCurEventMap() *AlertCurEventMap {
	return &AlertCurEventMap{
		Data: make(map[string]*models.AlertCurEvent),
	}
}
// Stop signals the evaluation loop to exit.
func (r *RuleEval) Stop() {
	logger.Infof("rule_eval:%d stopping", r.RuleID())
	close(r.quit)
}
// RuleID returns the id of the rule being evaluated.
func (r *RuleEval) RuleID() int64 {
	return r.rule.Id
}
// Start loops forever, evaluating the rule every PromEvalInterval seconds
// (minimum 10) until quit is closed. Note the sleep is not interruptible,
// so Stop may take up to one interval to take effect.
func (r *RuleEval) Start() {
	logger.Infof("rule_eval:%d started", r.RuleID())
	for {
		select {
		case <-r.quit:
			// logger.Infof("rule_eval:%d stopped", r.RuleID())
			return
		default:
			r.Work()
			logger.Debugf("rule executed, rule_eval:%d", r.RuleID())
			interval := r.rule.PromEvalInterval
			if interval <= 0 {
				interval = 10
			}
			time.Sleep(time.Duration(interval) * time.Second)
		}
	}
}
// Work runs one evaluation: query prometheus (only for plain prometheus
// rules without an anomaly algorithm) and judge the resulting vectors.
func (r *RuleEval) Work() {
	promql := strings.TrimSpace(r.rule.PromQl)
	if promql == "" {
		logger.Errorf("rule_eval:%d promql is blank", r.RuleID())
		return
	}

	if config.ReaderClients.IsNil(r.cluster) {
		logger.Error("reader client is nil")
		return
	}
	readerClient := config.ReaderClients.GetCli(r.cluster)

	var value model.Value
	var err error
	// only plain prometheus rules are queried here; other cates keep a nil value
	if r.rule.Algorithm == "" && (r.rule.Cate == "" || strings.ToLower(r.rule.Cate) == "prometheus") {
		var warnings prom.Warnings
		value, warnings, err = readerClient.Query(context.Background(), promql, time.Now())
		if err != nil {
			logger.Errorf("rule_eval:%d promql:%s, error:%v", r.RuleID(), promql, err)
			//notifyToMaintainer(err, "failed to query prometheus")
			Report(QueryPrometheusError)
			return
		}

		if len(warnings) > 0 {
			logger.Errorf("rule_eval:%d promql:%s, warnings:%v", r.RuleID(), promql, warnings)
			return
		}
		logger.Debugf("rule_eval:%d promql:%s, value:%v", r.RuleID(), promql, value)
	}

	r.Judge(r.cluster, conv.ConvertVectors(value))
}
// WorkersType holds the running alert and recording rule workers, keyed by
// the rule's content hash.
type WorkersType struct {
	rules       map[string]*RuleEval
	recordRules map[string]RecordingRuleEval
}

// Workers is the process-wide worker registry of the legacy engine.
var Workers = &WorkersType{rules: make(map[string]*RuleEval), recordRules: make(map[string]RecordingRuleEval)}
// Build reconciles alert-rule workers with the desired (rule, cluster)
// pairs: stale workers are stopped; new workers restore their fired events
// from the database and start evaluating.
func (ws *WorkersType) Build(ris []*RuleSimpleInfo) {
	rules := make(map[string]*RuleSimpleInfo)

	for i := 0; i < len(ris); i++ {
		rule := memsto.AlertRuleCache.Get(ris[i].Id)
		if rule == nil {
			continue
		}

		// hash changes when any evaluation-affecting field changes
		hash := str.MD5(fmt.Sprintf("%d_%d_%s_%s",
			rule.Id,
			rule.PromEvalInterval,
			rule.PromQl,
			ris[i].Cluster,
		))
		rules[hash] = ris[i]
	}

	// stop old
	for hash := range Workers.rules {
		if _, has := rules[hash]; !has {
			Workers.rules[hash].Stop()
			delete(Workers.rules, hash)
		}
	}

	// start new
	for hash := range rules {
		if _, has := Workers.rules[hash]; has {
			// already exists
			continue
		}

		// restore un-recovered events so firing state survives restarts
		elst, err := models.AlertCurEventGetByRuleIdAndCluster(rules[hash].Id, rules[hash].Cluster)
		if err != nil {
			logger.Errorf("worker_build: AlertCurEventGetByRule failed: %v", err)
			continue
		}

		firemap := make(map[string]*models.AlertCurEvent)
		for i := 0; i < len(elst); i++ {
			elst[i].DB2Mem()
			firemap[elst[i].Hash] = elst[i]
		}

		fires := NewAlertCurEventMap()
		fires.SetAll(firemap)

		re := &RuleEval{
			rule:     memsto.AlertRuleCache.Get(rules[hash].Id),
			quit:     make(chan struct{}),
			fires:    fires,
			pendings: NewAlertCurEventMap(),
			cluster:  rules[hash].Cluster,
		}

		go re.Start()
		Workers.rules[hash] = re
	}
}
// BuildRe reconciles recording-rule workers with the desired
// (rule, cluster) pairs: stale workers stop, new ones start.
func (ws *WorkersType) BuildRe(ris []*RuleSimpleInfo) {
	rules := make(map[string]*RuleSimpleInfo)

	for i := 0; i < len(ris); i++ {
		rule := memsto.RecordingRuleCache.Get(ris[i].Id)
		if rule == nil {
			continue
		}

		// hash changes when any evaluation-affecting field changes
		hash := str.MD5(fmt.Sprintf("%d_%d_%s_%s",
			rule.Id,
			rule.PromEvalInterval,
			rule.PromQl,
			ris[i].Cluster,
		))
		rules[hash] = ris[i]
	}

	// stop old
	for hash := range Workers.recordRules {
		if _, has := rules[hash]; !has {
			Workers.recordRules[hash].Stop()
			delete(Workers.recordRules, hash)
		}
	}

	// start new
	for hash := range rules {
		if _, has := Workers.recordRules[hash]; has {
			// already exists
			continue
		}

		re := RecordingRuleEval{
			rule:    memsto.RecordingRuleCache.Get(rules[hash].Id),
			quit:    make(chan struct{}),
			cluster: rules[hash].Cluster,
		}

		go re.Start()
		Workers.recordRules[hash] = re
	}
}
// Judge builds new events from the query result, then recovers fired
// events that no longer alert.
func (r *RuleEval) Judge(clusterName string, vectors []conv.Vector) {
	now := time.Now().Unix()

	alertingKeys, ruleExists := r.MakeNewEvent("inner", now, clusterName, vectors)
	if !ruleExists {
		return
	}

	// handle recovered events
	r.recoverRule(alertingKeys, now)
}
// MakeNewEvent converts query result vectors into alert events, applying
// effective-time, business-group and mute filtering along the way. It
// returns the set of event hashes currently alerting, and false when the
// rule no longer exists in the cache.
func (r *RuleEval) MakeNewEvent(from string, now int64, clusterName string, vectors []conv.Vector) (map[string]struct{}, bool) {
	// Receivers/callbacks etc. can change without a worker restart, yet
	// they affect alert handling, so refresh the rule from the cache.
	curRule := memsto.AlertRuleCache.Get(r.rule.Id)
	if curRule == nil {
		return map[string]struct{}{}, false
	}
	r.rule = curRule

	count := len(vectors)
	alertingKeys := make(map[string]struct{})
	for i := 0; i < count; i++ {
		// compute hash
		hash := str.MD5(fmt.Sprintf("%d_%s_%s", r.rule.Id, vectors[i].Key, r.cluster))
		alertingKeys[hash] = struct{}{}

		// rule disabled in this time span?
		if isNoneffective(vectors[i].Timestamp, r.rule) {
			logger.Debugf("event_disabled: rule_eval:%d rule:%v timestamp:%d", r.rule.Id, r.rule, vectors[i].Timestamp)
			continue
		}

		// handle series tags
		tagsMap := make(map[string]string)
		for label, value := range vectors[i].Labels {
			tagsMap[string(label)] = string(value)
		}

		// handle rule tags
		for _, tag := range r.rule.AppendTagsJSON {
			arr := strings.SplitN(tag, "=", 2)
			tagsMap[arr[0]] = arr[1]
		}

		tagsMap["rulename"] = r.rule.Name

		// handle target note
		targetIdent, has := tagsMap["ident"]
		targetNote := ""
		if has {
			target, exists := memsto.TargetCache.Get(string(targetIdent))
			if exists {
				targetNote = target.Note

				// For events carrying an ident, check that the target's BG
				// matches the rule's BG: a BG-scoped rule must not alert
				// for machines of other business groups.
				if r.rule.EnableInBG == 1 && target.GroupId != r.rule.GroupId {
					logger.Debugf("event_enable_in_bg: rule_eval:%d", r.rule.Id)
					continue
				}
			} else if strings.Contains(r.rule.PromQl, "target_up") {
				// the target no longer exists, probably deleted
				continue
			}
		}

		event := &models.AlertCurEvent{
			TriggerTime: vectors[i].Timestamp,
			TagsMap:     tagsMap,
			GroupId:     r.rule.GroupId,
			RuleName:    r.rule.Name,
			Cluster:     clusterName,
		}

		bg := memsto.BusiGroupCache.GetByBusiGroupId(r.rule.GroupId)
		if bg != nil {
			event.GroupName = bg.Name
		}

		// IsMuted needs TriggerTime, RuleName, TagsMap and clusterName set
		if IsMuted(event) {
			logger.Infof("event_muted: rule_id=%d %s", r.rule.Id, vectors[i].Key)
			continue
		}

		tagsArr := labelMapToArr(tagsMap)
		sort.Strings(tagsArr)

		// fill the remaining event fields from the (refreshed) rule
		event.Cate = r.rule.Cate
		event.Hash = hash
		event.RuleId = r.rule.Id
		event.RuleName = r.rule.Name
		event.RuleNote = r.rule.Note
		event.RuleProd = r.rule.Prod
		event.RuleAlgo = r.rule.Algorithm
		event.Severity = r.rule.Severity
		event.PromForDuration = r.rule.PromForDuration
		event.PromQl = r.rule.PromQl
		event.PromEvalInterval = r.rule.PromEvalInterval
		event.Callbacks = r.rule.Callbacks
		event.CallbacksJSON = r.rule.CallbacksJSON
		event.RunbookUrl = r.rule.RunbookUrl
		event.NotifyRecovered = r.rule.NotifyRecovered
		event.NotifyChannels = r.rule.NotifyChannels
		event.NotifyChannelsJSON = r.rule.NotifyChannelsJSON
		event.NotifyGroups = r.rule.NotifyGroups
		event.NotifyGroupsJSON = r.rule.NotifyGroupsJSON
		event.TargetIdent = string(targetIdent)
		event.TargetNote = targetNote
		event.TriggerValue = readableValue(vectors[i].Value)
		event.TagsJSON = tagsArr
		event.Tags = strings.Join(tagsArr, ",,")
		event.IsRecovered = false
		// inner-engine events are stamped "now"; external ones keep their
		// own trigger timestamp
		event.LastEvalTime = now
		if from != "inner" {
			event.LastEvalTime = event.TriggerTime
		}

		r.handleNewEvent(event)
	}

	return alertingKeys, true
}
// readableValue renders a float with up to five decimals, trimming
// trailing zeros and any dangling decimal point (e.g. 2.00000 -> "2").
func readableValue(value float64) string {
	s := fmt.Sprintf("%.5f", value)
	s = strings.TrimRight(s, "0")
	return strings.TrimRight(s, ".")
}
// labelMapToArr flattens a label map into "key=value" strings, sorted
// lexically whenever there is more than one label.
func labelMapToArr(m map[string]string) []string {
	arr := make([]string, 0, len(m))
	for k, v := range m {
		arr = append(arr, fmt.Sprintf("%s=%s", k, v))
	}
	if len(arr) > 1 {
		sort.Strings(arr)
	}
	return arr
}
// handleNewEvent routes a freshly-triggered event: rules without a
// "for duration" fire immediately, otherwise the event is parked in the
// pendings map until the condition has held long enough.
func (r *RuleEval) handleNewEvent(event *models.AlertCurEvent) {
	// No PromForDuration configured: fire right away.
	if event.PromForDuration == 0 {
		r.fireEvent(event)
		return
	}

	preTriggerTime := event.TriggerTime
	if pre, has := r.pendings.Get(event.Hash); has {
		// Already pending: refresh its eval time and measure against the
		// first trigger time we recorded.
		r.pendings.UpdateLastEvalTime(event.Hash, event.LastEvalTime)
		preTriggerTime = pre.TriggerTime
	} else {
		r.pendings.Set(event.Hash, event)
	}

	// Fire once the condition has held for PromForDuration, granting one
	// eval interval of slack so a boundary sample is not missed.
	if event.LastEvalTime-preTriggerTime+int64(event.PromEvalInterval) >= int64(event.PromForDuration) {
		r.fireEvent(event)
	}
}
// fireEvent decides whether a triggered event is actually notified, enforcing
// the rule's repeat-notification interval (NotifyRepeatStep, minutes) and the
// optional cap on total notifications (NotifyMaxNumber).
func (r *RuleEval) fireEvent(event *models.AlertCurEvent) {
	if fired, has := r.fires.Get(event.Hash); has {
		r.fires.UpdateLastEvalTime(event.Hash, event.LastEvalTime)

		if r.rule.NotifyRepeatStep == 0 {
			// Repeat notification disabled: the first send was enough, nothing to do.
			return
		}

		// Already notified before; only send again once the repeat interval
		// has elapsed since the last send.
		if event.LastEvalTime > fired.LastSentTime+int64(r.rule.NotifyRepeatStep)*60 {
			if r.rule.NotifyMaxNumber == 0 {
				// Zero means "no cap on notification count": keep sending.
				event.NotifyCurNumber = fired.NotifyCurNumber + 1
				event.FirstTriggerTime = fired.FirstTriggerTime
				r.pushEventToQueue(event)
			} else {
				// A cap is configured: check how many have been sent already.
				if fired.NotifyCurNumber >= r.rule.NotifyMaxNumber {
					return
				} else {
					event.NotifyCurNumber = fired.NotifyCurNumber + 1
					event.FirstTriggerTime = fired.FirstTriggerTime
					r.pushEventToQueue(event)
				}
			}
		}
	} else {
		// First time this event fires.
		event.NotifyCurNumber = 1
		event.FirstTriggerTime = event.TriggerTime
		r.pushEventToQueue(event)
	}
}
// recoverRule reconciles the pending and fired maps against the set of hashes
// that are still alerting: stale pendings are dropped, stale fires recovered.
func (r *RuleEval) recoverRule(alertingKeys map[string]struct{}, now int64) {
	for _, hash := range r.pendings.Keys() {
		if _, stillAlerting := alertingKeys[hash]; !stillAlerting {
			r.pendings.Delete(hash)
		}
	}

	for hash, event := range r.fires.GetAll() {
		if _, stillAlerting := alertingKeys[hash]; !stillAlerting {
			r.recoverEvent(hash, event, now)
		}
	}
}
// RecoverEvent force-recovers a single fired event (used by the external
// event API). The rule is refreshed from cache first so the recovery event
// carries up-to-date rule fields.
func (r *RuleEval) RecoverEvent(hash string, now int64, value float64) {
	curRule := memsto.AlertRuleCache.Get(r.rule.Id)
	if curRule == nil {
		// Rule no longer exists; nothing to recover against.
		return
	}
	r.rule = curRule

	r.pendings.Delete(hash)

	event, has := r.fires.Get(hash)
	if !has {
		return
	}

	// Use the same human-readable formatting as the alerting path
	// (readableValue trims trailing zeros) instead of a raw "%.5f" string,
	// so trigger values look identical in alert and recovery notifications.
	event.TriggerValue = readableValue(value)
	r.recoverEvent(hash, event, now)
}
// recoverEvent marks a previously-fired event as recovered and pushes the
// recovery notification, unless the rule's RecoverDuration "observation
// window" has not yet elapsed since the event's last evaluation.
func (r *RuleEval) recoverEvent(hash string, event *models.AlertCurEvent, now int64) {
	// If an observation window is configured, do not recover immediately.
	if r.rule.RecoverDuration > 0 && now-event.LastEvalTime < r.rule.RecoverDuration {
		return
	}

	// No vector crossed the threshold for this series, so treat it as
	// recovered. NOTE: we cannot distinguish "value exists but is below the
	// threshold" from "datapoints are simply missing in the TSDB".
	r.fires.Delete(hash)
	r.pendings.Delete(hash)

	event.IsRecovered = true
	event.LastEvalTime = now

	// The recovery may have been caused by an edited PromQL, so refresh the
	// event with the rule's latest fields — otherwise the user would see a
	// confusingly stale rule in the recovery notification.
	event.RuleName = r.rule.Name
	event.RuleNote = r.rule.Note
	event.RuleProd = r.rule.Prod
	event.RuleAlgo = r.rule.Algorithm
	event.Severity = r.rule.Severity
	event.PromForDuration = r.rule.PromForDuration
	event.PromQl = r.rule.PromQl
	event.PromEvalInterval = r.rule.PromEvalInterval
	event.Callbacks = r.rule.Callbacks
	event.CallbacksJSON = r.rule.CallbacksJSON
	event.RunbookUrl = r.rule.RunbookUrl
	event.NotifyRecovered = r.rule.NotifyRecovered
	event.NotifyChannels = r.rule.NotifyChannels
	event.NotifyChannelsJSON = r.rule.NotifyChannelsJSON
	event.NotifyGroups = r.rule.NotifyGroups
	event.NotifyGroupsJSON = r.rule.NotifyGroupsJSON

	r.pushEventToQueue(event)
}
// pushEventToQueue records a firing event in the fires map (so repeat-notify
// bookkeeping works), bumps the alert counter, and enqueues the event for
// dispatch.
func (r *RuleEval) pushEventToQueue(event *models.AlertCurEvent) {
	if !event.IsRecovered {
		// Only firing events are tracked; recoveries were removed upstream.
		event.LastSentTime = event.LastEvalTime
		r.fires.Set(event.Hash, event)
	}

	promstat.CounterAlertsTotal.WithLabelValues(event.Cluster).Inc()
	LogEvent(event, "push_queue")

	if ok := EventQueue.PushFront(event); !ok {
		logger.Warningf("event_push_queue: queue is full")
	}
}
// filterRecordingRules walks every recording rule in the cache and, via the
// cluster hashring, selects the (rule, cluster) pairs this node is
// responsible for, handing them to Workers.BuildRe.
func filterRecordingRules() {
	ids := memsto.RecordingRuleCache.GetRuleIds()
	mines := make([]*RuleSimpleInfo, 0, len(ids))

	for _, id := range ids {
		rule := memsto.RecordingRuleCache.Get(id)
		if rule == nil {
			logger.Debugf("rule %d not found", id)
			continue
		}

		// A rule bound to ClusterAll is expanded to every configured reader.
		var clusters []string
		if rule.Cluster == models.ClusterAll {
			clusters = config.ReaderClients.GetClusterNames()
		} else {
			clusters = strings.Fields(rule.Cluster)
		}

		for _, cluster := range clusters {
			if config.ReaderClients.IsNil(cluster) {
				// No reader configured for this cluster, skip it.
				continue
			}

			node, err := naming.ClusterHashRing.GetNode(cluster, fmt.Sprint(id))
			if err != nil {
				logger.Warning("failed to get node from hashring:", err)
				continue
			}

			if node == config.C.Heartbeat.Endpoint {
				mines = append(mines, &RuleSimpleInfo{Id: id, Cluster: cluster})
			}
		}
	}

	Workers.BuildRe(mines)
}
// RecordingRuleEval periodically evaluates one recording rule against one
// cluster and writes the resulting series through the remote writers.
type RecordingRuleEval struct {
	cluster string                // cluster this evaluator queries and writes back to
	rule    *models.RecordingRule // the recording rule being evaluated
	quit    chan struct{}         // closed by Stop() to terminate the Start() loop
}
// Stop terminates the evaluation loop started by Start by closing the quit
// channel. Must be called at most once per evaluator.
func (r RecordingRuleEval) Stop() {
	logger.Infof("recording_rule_eval:%d stopping", r.RuleID())
	close(r.quit)
}
// RuleID returns the id of the underlying recording rule.
func (r RecordingRuleEval) RuleID() int64 {
	return r.rule.Id
}
// Start runs the evaluation loop: Work() once per PromEvalInterval seconds
// (minimum 10) until Stop() closes the quit channel.
func (r RecordingRuleEval) Start() {
	logger.Infof("recording_rule_eval:%d started", r.RuleID())
	for {
		select {
		case <-r.quit:
			return
		default:
		}

		r.Work()

		// Guard against unset/invalid intervals with a 10s floor.
		interval := r.rule.PromEvalInterval
		if interval <= 0 {
			interval = 10
		}
		time.Sleep(time.Duration(interval) * time.Second)
	}
}
// Work performs one evaluation round: query the bound cluster with the rule's
// PromQL and push the converted time series to the remote writers.
func (r RecordingRuleEval) Work() {
	promql := strings.TrimSpace(r.rule.PromQl)
	if promql == "" {
		logger.Errorf("recording_rule_eval:%d promql is blank", r.RuleID())
		return
	}

	if config.ReaderClients.IsNil(r.cluster) {
		// NOTE(review): this uses the stdlib log package instead of the
		// toolkits logger used everywhere else in this file, and omits the
		// rule id / cluster — consider unifying with logger.Warningf.
		log.Println("reader client is nil")
		return
	}

	value, warnings, err := config.ReaderClients.GetCli(r.cluster).Query(context.Background(), promql, time.Now())
	if err != nil {
		logger.Errorf("recording_rule_eval:%d promql:%s, error:%v", r.RuleID(), promql, err)
		return
	}

	// Query warnings abort this round just like a hard error.
	if len(warnings) > 0 {
		logger.Errorf("recording_rule_eval:%d promql:%s, warnings:%v", r.RuleID(), promql, warnings)
		return
	}

	ts := conv.ConvertToTimeSeries(value, r.rule)
	if len(ts) != 0 {
		for _, v := range ts {
			writer.Writers.PushSample(r.rule.Name, v, r.cluster)
		}
	}
}
// RuleEvalForExternalType holds RuleEval instances for rules whose event
// vectors are pushed in by external systems (via the HTTP API) rather than
// produced by the internal evaluation loop. The mutex guards the rules map.
type RuleEvalForExternalType struct {
	sync.RWMutex
	rules map[string]RuleEval // key: hash of ruleid_promevalinterval_promql_cluster
}

// RuleEvalForExternal is the process-wide registry of external rule evaluators.
var RuleEvalForExternal = RuleEvalForExternalType{rules: make(map[string]RuleEval)}
// Build reconciles the registry with the current alert-rule cache: it
// computes the desired set of (rule, cluster) evaluators, removes entries
// that are no longer wanted, and creates new ones (pre-seeding their fired
// events from the database).
//
// Fix: the previous version ranged over re.rules WITHOUT holding the mutex
// (only taking it inside the loop to delete), racing with concurrent readers
// in Get; it also needlessly locked around writes to the function-local
// `rules` map. The whole reconcile now runs under a single write lock.
func (re *RuleEvalForExternalType) Build() {
	rids := memsto.AlertRuleCache.GetRuleIds()

	// Desired evaluators keyed by hash. This map is function-local, so no
	// locking is needed while filling it.
	rules := make(map[string]*RuleSimpleInfo)
	for i := 0; i < len(rids); i++ {
		rule := memsto.AlertRuleCache.Get(rids[i])
		if rule == nil {
			continue
		}

		// A rule bound to ClusterAll is expanded to every configured reader.
		var clusters []string
		if rule.Cluster == models.ClusterAll {
			clusters = config.ReaderClients.GetClusterNames()
		} else {
			clusters = strings.Fields(rule.Cluster)
		}

		for _, cluster := range clusters {
			hash := str.MD5(fmt.Sprintf("%d_%d_%s_%s",
				rule.Id,
				rule.PromEvalInterval,
				rule.PromQl,
				cluster,
			))
			rules[hash] = &RuleSimpleInfo{
				Id:      rule.Id,
				Cluster: cluster,
			}
		}
	}

	re.Lock()
	defer re.Unlock()

	// Stop old: drop evaluators no longer present in the desired set.
	for oldHash := range re.rules {
		if _, has := rules[oldHash]; !has {
			delete(re.rules, oldHash)
		}
	}

	// Start new: create evaluators for hashes not yet registered.
	for hash, ruleSimple := range rules {
		if _, has := re.rules[hash]; has {
			// already exists
			continue
		}

		// Pre-seed the fires map with the rule's currently-active events so
		// repeat-notify/recover bookkeeping survives a rebuild.
		elst, err := models.AlertCurEventGetByRuleIdAndCluster(ruleSimple.Id, ruleSimple.Cluster)
		if err != nil {
			logger.Errorf("worker_build: AlertCurEventGetByRule failed: %v", err)
			continue
		}

		firemap := make(map[string]*models.AlertCurEvent)
		for i := 0; i < len(elst); i++ {
			elst[i].DB2Mem()
			firemap[elst[i].Hash] = elst[i]
		}

		fires := NewAlertCurEventMap()
		fires.SetAll(firemap)

		re.rules[hash] = RuleEval{
			rule:     memsto.AlertRuleCache.Get(ruleSimple.Id),
			quit:     make(chan struct{}),
			fires:    fires,
			pendings: NewAlertCurEventMap(),
			cluster:  ruleSimple.Cluster,
		}
	}
}
// Get looks up the evaluator registered for (rule id, cluster), reporting
// whether one exists. A missing rule in the cache yields (zero, false).
func (re *RuleEvalForExternalType) Get(rid int64, cluster string) (RuleEval, bool) {
	re.RLock()
	defer re.RUnlock()

	rule := memsto.AlertRuleCache.Get(rid)
	if rule == nil {
		return RuleEval{}, false
	}

	// Same hash scheme as Build: ruleid_promevalinterval_promql_cluster.
	hash := str.MD5(fmt.Sprintf("%d_%d_%s_%s",
		rule.Id,
		rule.PromEvalInterval,
		rule.PromQl,
		cluster,
	))

	ret, has := re.rules[hash]
	return ret, has
}

View File

@@ -48,6 +48,15 @@ func (chr *ClusterHashRingType) GetNode(cluster, pk string) (string, error) {
return chr.Rings[cluster].Get(pk)
}
// IsHit reports whether the hashring assigns pk (within cluster) to
// currentNode. A hashring lookup failure is logged at debug level and
// treated as "not mine".
func (chr *ClusterHashRingType) IsHit(cluster string, pk string, currentNode string) bool {
	node, err := chr.GetNode(cluster, pk)
	if err != nil {
		logger.Debugf("cluster:%s pk:%s failed to get node from hashring:%v", cluster, pk, err)
		return false
	}
	return node == currentNode
}
func (chr *ClusterHashRingType) Set(cluster string, r *consistent.Consistent) {
chr.RLock()
defer chr.RUnlock()

View File

@@ -14,7 +14,6 @@ import (
"github.com/didi/nightingale/v5/src/pkg/aop"
"github.com/didi/nightingale/v5/src/server/config"
"github.com/didi/nightingale/v5/src/server/naming"
promstat "github.com/didi/nightingale/v5/src/server/stat"
)

View File

@@ -5,15 +5,17 @@ import (
"strings"
"time"
"github.com/didi/nightingale/v5/src/models"
"github.com/didi/nightingale/v5/src/server/common/conv"
"github.com/didi/nightingale/v5/src/server/engine"
promstat "github.com/didi/nightingale/v5/src/server/stat"
"github.com/gin-gonic/gin"
"github.com/toolkits/pkg/ginx"
"github.com/toolkits/pkg/logger"
"github.com/toolkits/pkg/str"
"github.com/didi/nightingale/v5/src/models"
"github.com/didi/nightingale/v5/src/pkg/poster"
"github.com/didi/nightingale/v5/src/server/common/conv"
"github.com/didi/nightingale/v5/src/server/config"
"github.com/didi/nightingale/v5/src/server/engine"
"github.com/didi/nightingale/v5/src/server/naming"
promstat "github.com/didi/nightingale/v5/src/server/stat"
)
func pushEventToQueue(c *gin.Context) {
@@ -38,8 +40,7 @@ func pushEventToQueue(c *gin.Context) {
event.TagsMap[arr[0]] = arr[1]
}
// isMuted only need TriggerTime RuleName and TagsMap
if engine.IsMuted(event) {
if engine.EventMuteStra.IsMuted(nil, event) {
logger.Infof("event_muted: rule_id=%d %s", event.RuleId, event.Hash)
ginx.NewRender(c).Message(nil)
return
@@ -87,34 +88,60 @@ type eventForm struct {
func judgeEvent(c *gin.Context) {
var form eventForm
ginx.BindJSON(c, &form)
re, exists := engine.RuleEvalForExternal.Get(form.RuleId, form.Cluster)
ruleContext, exists := engine.GetExternalAlertRule(form.Cluster, form.RuleId)
if !exists {
ginx.Bomb(200, "rule not exists")
}
re.Judge(form.Cluster, form.Vectors)
ruleContext.HandleVectors(form.Vectors, "http")
ginx.NewRender(c).Message(nil)
}
func makeEvent(c *gin.Context) {
var events []*eventForm
ginx.BindJSON(c, &events)
now := time.Now().Unix()
//now := time.Now().Unix()
for i := 0; i < len(events); i++ {
re, exists := engine.RuleEvalForExternal.Get(events[i].RuleId, events[i].Cluster)
node, err := naming.ClusterHashRing.GetNode(events[i].Cluster, fmt.Sprintf("%d", events[i].RuleId))
if err != nil {
logger.Warningf("event:%+v get node err:%v", events[i], err)
ginx.Bomb(200, "event node not exists")
}
if node != config.C.Heartbeat.Endpoint {
err := forwardEvent(events[i], node)
if err != nil {
logger.Warningf("event:%+v forward err:%v", events[i], err)
ginx.Bomb(200, "event forward error")
}
continue
}
ruleContext, exists := engine.GetExternalAlertRule(events[i].Cluster, events[i].RuleId)
logger.Debugf("handle event:%+v exists:%v", events[i], exists)
if !exists {
ginx.Bomb(200, "rule not exists")
}
if events[i].Alert {
go re.MakeNewEvent("http", now, events[i].Cluster, events[i].Vectors)
go ruleContext.HandleVectors(events[i].Vectors, "http")
} else {
for _, vector := range events[i].Vectors {
hash := str.MD5(fmt.Sprintf("%d_%s_%s", events[i].RuleId, vector.Key, events[i].Cluster))
now := vector.Timestamp
go re.RecoverEvent(hash, now, vector.Value)
alertVector := engine.NewAlertVector(ruleContext, nil, vector, "http")
readableString := vector.ReadableValue()
go ruleContext.RecoverSingle(alertVector.Hash(), vector.Timestamp, &readableString)
}
}
}
ginx.NewRender(c).Message(nil)
}
// forwardEvent forwards an event this instance does not own (per the
// hashring) to the owning instance's make-event endpoint. Returns the POST
// error, if any; a non-error response is logged with its status code.
func forwardEvent(event *eventForm, instance string) error {
	ur := fmt.Sprintf("http://%s/v1/n9e/make-event", instance)
	res, code, err := poster.PostJSON(ur, time.Second*5, []*eventForm{event}, 3)
	if err != nil {
		return err
	}
	logger.Infof("forward event: result=succ url=%s code=%d event:%v response=%s", ur, code, event, string(res))
	return nil
}

View File

@@ -154,7 +154,7 @@ func (ws *WritersType) PushSample(ident string, v interface{}, clusters ...strin
if ok {
succ := c.PushFront(v)
if !succ {
logger.Warningf("Write channel(%s) full, current channel size: %d", ident, c.Len())
logger.Warningf("Write cluster:%s channel(%s) full, current channel size: %d", cluster, ident, c.Len())
}
}
}

View File

@@ -141,6 +141,7 @@ func configRoute(r *gin.Engine, version string) {
pages.GET("/auth/callback", loginCallback)
pages.GET("/auth/callback/cas", loginCallbackCas)
pages.GET("/auth/callback/oauth", loginCallbackOAuth)
pages.GET("/auth/perms", allPerms)
pages.GET("/metrics/desc", metricsDescGetFile)
pages.POST("/metrics/desc", metricsDescGetMap)

View File

@@ -214,11 +214,19 @@ func loginCallback(c *gin.Context) {
if user != nil {
if config.C.OIDC.CoverAttributes {
user.Nickname = ret.Nickname
user.Email = ret.Email
user.Phone = ret.Phone
user.UpdateAt = time.Now().Unix()
if ret.Nickname != "" {
user.Nickname = ret.Nickname
}
if ret.Email != "" {
user.Email = ret.Email
}
if ret.Phone != "" {
user.Phone = ret.Phone
}
user.UpdateAt = time.Now().Unix()
user.Update("email", "nickname", "phone", "update_at")
}
} else {
@@ -316,9 +324,18 @@ func loginCallbackCas(c *gin.Context) {
ginx.Dangerous(err)
if user != nil {
if config.C.CAS.CoverAttributes {
user.Nickname = ret.Nickname
user.Email = ret.Email
user.Phone = ret.Phone
if ret.Nickname != "" {
user.Nickname = ret.Nickname
}
if ret.Email != "" {
user.Email = ret.Email
}
if ret.Phone != "" {
user.Phone = ret.Phone
}
user.UpdateAt = time.Now().Unix()
ginx.Dangerous(user.Update("email", "nickname", "phone", "update_at"))
}
@@ -409,11 +426,19 @@ func loginCallbackOAuth(c *gin.Context) {
if user != nil {
if config.C.OAuth.CoverAttributes {
user.Nickname = ret.Nickname
user.Email = ret.Email
user.Phone = ret.Phone
user.UpdateAt = time.Now().Unix()
if ret.Nickname != "" {
user.Nickname = ret.Nickname
}
if ret.Email != "" {
user.Email = ret.Email
}
if ret.Phone != "" {
user.Phone = ret.Phone
}
user.UpdateAt = time.Now().Unix()
user.Update("email", "nickname", "phone", "update_at")
}
} else {

View File

@@ -19,3 +19,18 @@ func permsGets(c *gin.Context) {
lst, err := models.OperationsOfRole(strings.Fields(user.Roles))
ginx.NewRender(c).Data(lst, err)
}
// allPerms renders a map of every role name to the operations it grants.
func allPerms(c *gin.Context) {
	roles, err := models.RoleGetsAll()
	ginx.Dangerous(err)
	m := make(map[string][]string)
	for _, r := range roles {
		lst, err := models.OperationsOfRole(strings.Fields(r.Name))
		if err != nil {
			// Best-effort: a role whose operations cannot be loaded is
			// silently omitted from the result.
			continue
		}
		m[r.Name] = lst
	}
	// NOTE(review): the outer err is necessarily nil here (Dangerous above
	// aborts on failure), and per-role errors are swallowed by the continue.
	ginx.NewRender(c).Data(m, err)
}