mirror of
https://github.com/ccfos/nightingale.git
synced 2026-03-06 07:59:14 +00:00
Compare commits
24 Commits
v5.14.4
...
v5.14.5-fi
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
96f3cfa065 | ||
|
|
144f0ad795 | ||
|
|
1375ff1435 | ||
|
|
26dc03146b | ||
|
|
e8378c6858 | ||
|
|
da182f1b05 | ||
|
|
cad0d3cf0f | ||
|
|
fec1e686f4 | ||
|
|
a357f11164 | ||
|
|
d03ba4c4d0 | ||
|
|
d531178c9b | ||
|
|
174df1495c | ||
|
|
ffe423148d | ||
|
|
926559c9a7 | ||
|
|
136642f126 | ||
|
|
a054828fcc | ||
|
|
e46e946689 | ||
|
|
cf083c543b | ||
|
|
2e1508fdd3 | ||
|
|
954543a5b2 | ||
|
|
71a402c33c | ||
|
|
e30a5a316f | ||
|
|
0c9b7de391 | ||
|
|
063b6f63df |
2430
etc/dashboards/vmware_by_vsphere-monitor.json
Normal file
2430
etc/dashboards/vmware_by_vsphere-monitor.json
Normal file
File diff suppressed because it is too large
Load Diff
@@ -9,7 +9,7 @@ ClusterName = "Default"
|
||||
BusiGroupLabelKey = "busigroup"
|
||||
|
||||
# sleep x seconds, then start judge engine
|
||||
EngineDelay = 60
|
||||
EngineDelay = 30
|
||||
|
||||
DisableUsageReport = true
|
||||
|
||||
|
||||
8
go.mod
8
go.mod
@@ -6,7 +6,7 @@ require (
|
||||
github.com/coreos/go-oidc v2.2.1+incompatible
|
||||
github.com/dgrijalva/jwt-go v3.2.0+incompatible
|
||||
github.com/gin-contrib/pprof v1.3.0
|
||||
github.com/gin-gonic/gin v1.7.4
|
||||
github.com/gin-gonic/gin v1.7.7
|
||||
github.com/go-ldap/ldap/v3 v3.4.1
|
||||
github.com/go-redis/redis/v9 v9.0.0-rc.1
|
||||
github.com/gogo/protobuf v1.3.2
|
||||
@@ -24,7 +24,7 @@ require (
|
||||
github.com/prometheus/common v0.32.1
|
||||
github.com/prometheus/prometheus v2.5.0+incompatible
|
||||
github.com/tidwall/gjson v1.14.0
|
||||
github.com/toolkits/pkg v1.3.1-0.20220824084030-9f9f830a05d5
|
||||
github.com/toolkits/pkg v1.3.3
|
||||
github.com/urfave/cli/v2 v2.3.0
|
||||
golang.org/x/oauth2 v0.0.0-20210514164344-f6687ab2804c
|
||||
gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df
|
||||
@@ -74,10 +74,10 @@ require (
|
||||
github.com/tidwall/pretty v1.2.0 // indirect
|
||||
github.com/ugorji/go/codec v1.1.7 // indirect
|
||||
go.uber.org/automaxprocs v1.4.0 // indirect
|
||||
golang.org/x/crypto v0.0.0-20210817164053-32db794688a5 // indirect
|
||||
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 // indirect
|
||||
golang.org/x/net v0.0.0-20220722155237-a158d28d115b // indirect
|
||||
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f // indirect
|
||||
golang.org/x/text v0.3.7 // indirect
|
||||
golang.org/x/text v0.3.8 // indirect
|
||||
google.golang.org/appengine v1.6.6 // indirect
|
||||
google.golang.org/genproto v0.0.0-20211007155348-82e027067bd4 // indirect
|
||||
google.golang.org/grpc v1.41.0 // indirect
|
||||
|
||||
20
go.sum
20
go.sum
@@ -97,8 +97,8 @@ github.com/gin-contrib/pprof v1.3.0/go.mod h1:waMjT1H9b179t3CxuG1cV3DHpga6ybizwf
|
||||
github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE=
|
||||
github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI=
|
||||
github.com/gin-gonic/gin v1.6.2/go.mod h1:75u5sXoLsGZoRN5Sgbi1eraJ4GU3++wFwWzhwvtwp4M=
|
||||
github.com/gin-gonic/gin v1.7.4 h1:QmUZXrvJ9qZ3GfWvQ+2wnW/1ePrTEJqPKMYEU3lD/DM=
|
||||
github.com/gin-gonic/gin v1.7.4/go.mod h1:jD2toBW3GZUr5UMcdrwQA10I7RuaFOl/SGeDjXkfUtY=
|
||||
github.com/gin-gonic/gin v1.7.7 h1:3DoBmSbJbZAWqXJC3SLjAPfutPJJRN1U5pALB7EeTTs=
|
||||
github.com/gin-gonic/gin v1.7.7/go.mod h1:axIBovoeJpVj8S3BwE0uPMTeReE4+AfFtqpqaZ1qq1U=
|
||||
github.com/go-asn1-ber/asn1-ber v1.5.1 h1:pDbRAunXzIUXfx4CB2QJFv5IuPiuoW+sWvr/Us009o8=
|
||||
github.com/go-asn1-ber/asn1-ber v1.5.1/go.mod h1:hEBeB/ic+5LoWskz+yKT7vGhhPYkProFKoKdwZRWMe0=
|
||||
github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU=
|
||||
@@ -372,8 +372,8 @@ github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA=
|
||||
github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
|
||||
github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs=
|
||||
github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
|
||||
github.com/toolkits/pkg v1.3.1-0.20220824084030-9f9f830a05d5 h1:kMCwr2gNHjHEVgw+uNVdiPbGadj4TekbIfrTXElZeI0=
|
||||
github.com/toolkits/pkg v1.3.1-0.20220824084030-9f9f830a05d5/go.mod h1:PvTBg/UxazPgBz6VaCM7FM7kJldjfVrsuN6k4HT/VuY=
|
||||
github.com/toolkits/pkg v1.3.3 h1:qpQAQ18Jr47dv4NcBALlH0ad7L2PuqSh5K+nJKNg5lU=
|
||||
github.com/toolkits/pkg v1.3.3/go.mod h1:USXArTJlz1f1DCnQHNPYugO8GPkr1NRhP4eYQZQVshk=
|
||||
github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw=
|
||||
github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs=
|
||||
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
|
||||
@@ -383,6 +383,7 @@ github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de
|
||||
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
||||
github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
||||
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
||||
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
|
||||
github.com/zenazn/goji v0.9.0/go.mod h1:7S9M489iMyHBNxwZnk9/EHS098H4/F6TATF2mIxtB1Q=
|
||||
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
|
||||
go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
|
||||
@@ -415,8 +416,9 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh
|
||||
golang.org/x/crypto v0.0.0-20201203163018-be400aefbc4c/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I=
|
||||
golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
|
||||
golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
|
||||
golang.org/x/crypto v0.0.0-20210817164053-32db794688a5 h1:HWj/xjIHfjYU5nVXpTM0s39J9CbLn7Cc5a7IC5rwsMQ=
|
||||
golang.org/x/crypto v0.0.0-20210817164053-32db794688a5/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
|
||||
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 h1:7I4JAnoQBe7ZtJcBaYHi5UtiO8tQHbUSXxL+pnGRANg=
|
||||
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
|
||||
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
|
||||
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
|
||||
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
|
||||
@@ -447,6 +449,7 @@ golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzB
|
||||
golang.org/x/mod v0.1.1-0.20191107180719-034126e5016b/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=
|
||||
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
|
||||
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
|
||||
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
|
||||
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
@@ -499,6 +502,7 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ
|
||||
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
@@ -544,10 +548,12 @@ golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBc
|
||||
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f h1:v4INt8xihDGvnrfjMDVXGxw9wrfxYyCjk0KbXjhR55s=
|
||||
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
|
||||
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
||||
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
|
||||
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
@@ -556,8 +562,9 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk=
|
||||
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
|
||||
golang.org/x/text v0.3.8 h1:nAL+RVCQ9uMn3vJZbV+MRnydTJFPf8qqY42YiA6MrqY=
|
||||
golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
|
||||
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
@@ -608,6 +615,7 @@ golang.org/x/tools v0.0.0-20200729194436-6467de6f59a7/go.mod h1:njjCfa9FT2d7l9Bc
|
||||
golang.org/x/tools v0.0.0-20200804011535-6c149bb5ef0d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
|
||||
golang.org/x/tools v0.0.0-20200825202427-b303f430e36d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
|
||||
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
|
||||
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
|
||||
golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
|
||||
@@ -3,9 +3,9 @@ package models
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"html/template"
|
||||
"strconv"
|
||||
"strings"
|
||||
"text/template"
|
||||
|
||||
"github.com/didi/nightingale/v5/src/pkg/tplx"
|
||||
)
|
||||
@@ -77,7 +77,7 @@ func (e *AlertCurEvent) ParseRule(field string) error {
|
||||
}
|
||||
|
||||
text := strings.Join(append(defs, f), "")
|
||||
t, err := template.New(fmt.Sprint(e.RuleId)).Funcs(tplx.TemplateFuncMap).Parse(text)
|
||||
t, err := template.New(fmt.Sprint(e.RuleId)).Funcs(template.FuncMap(tplx.TemplateFuncMap)).Parse(text)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -136,7 +136,9 @@ func (m *AlertMute) Add() error {
|
||||
if err := m.Verify(); err != nil {
|
||||
return err
|
||||
}
|
||||
m.CreateAt = time.Now().Unix()
|
||||
now := time.Now().Unix()
|
||||
m.CreateAt = now
|
||||
m.UpdateAt = now
|
||||
return Insert(m)
|
||||
}
|
||||
|
||||
|
||||
@@ -14,45 +14,50 @@ import (
|
||||
)
|
||||
|
||||
type AlertRule struct {
|
||||
Id int64 `json:"id" gorm:"primaryKey"`
|
||||
GroupId int64 `json:"group_id"` // busi group id
|
||||
Cate string `json:"cate"` // alert rule cate (prometheus|elasticsearch)
|
||||
Cluster string `json:"cluster"` // take effect by clusters, seperated by space
|
||||
Name string `json:"name"` // rule name
|
||||
Note string `json:"note"` // will sent in notify
|
||||
Prod string `json:"prod"` // product empty means n9e
|
||||
Algorithm string `json:"algorithm"` // algorithm (''|holtwinters), empty means threshold
|
||||
AlgoParams string `json:"-" gorm:"algo_params"` // params algorithm need
|
||||
AlgoParamsJson interface{} `json:"algo_params" gorm:"-"` //
|
||||
Delay int `json:"delay"` // Time (in seconds) to delay evaluation
|
||||
Severity int `json:"severity"` // 1: Emergency 2: Warning 3: Notice
|
||||
Disabled int `json:"disabled"` // 0: enabled, 1: disabled
|
||||
PromForDuration int `json:"prom_for_duration"` // prometheus for, unit:s
|
||||
PromQl string `json:"prom_ql"` // just one ql
|
||||
PromEvalInterval int `json:"prom_eval_interval"` // unit:s
|
||||
EnableStime string `json:"enable_stime"` // e.g. 00:00
|
||||
EnableEtime string `json:"enable_etime"` // e.g. 23:59
|
||||
EnableDaysOfWeek string `json:"-"` // split by space: 0 1 2 3 4 5 6
|
||||
EnableDaysOfWeekJSON []string `json:"enable_days_of_week" gorm:"-"` // for fe
|
||||
EnableInBG int `json:"enable_in_bg"` // 0: global 1: enable one busi-group
|
||||
NotifyRecovered int `json:"notify_recovered"` // whether notify when recovery
|
||||
NotifyChannels string `json:"-"` // split by space: sms voice email dingtalk wecom
|
||||
NotifyChannelsJSON []string `json:"notify_channels" gorm:"-"` // for fe
|
||||
NotifyGroups string `json:"-"` // split by space: 233 43
|
||||
NotifyGroupsObj []UserGroup `json:"notify_groups_obj" gorm:"-"` // for fe
|
||||
NotifyGroupsJSON []string `json:"notify_groups" gorm:"-"` // for fe
|
||||
NotifyRepeatStep int `json:"notify_repeat_step"` // notify repeat interval, unit: min
|
||||
NotifyMaxNumber int `json:"notify_max_number"` // notify: max number
|
||||
RecoverDuration int64 `json:"recover_duration"` // unit: s
|
||||
Callbacks string `json:"-"` // split by space: http://a.com/api/x http://a.com/api/y'
|
||||
CallbacksJSON []string `json:"callbacks" gorm:"-"` // for fe
|
||||
RunbookUrl string `json:"runbook_url"` // sop url
|
||||
AppendTags string `json:"-"` // split by space: service=n9e mod=api
|
||||
AppendTagsJSON []string `json:"append_tags" gorm:"-"` // for fe
|
||||
CreateAt int64 `json:"create_at"`
|
||||
CreateBy string `json:"create_by"`
|
||||
UpdateAt int64 `json:"update_at"`
|
||||
UpdateBy string `json:"update_by"`
|
||||
Id int64 `json:"id" gorm:"primaryKey"`
|
||||
GroupId int64 `json:"group_id"` // busi group id
|
||||
Cate string `json:"cate"` // alert rule cate (prometheus|elasticsearch)
|
||||
Cluster string `json:"cluster"` // take effect by clusters, seperated by space
|
||||
Name string `json:"name"` // rule name
|
||||
Note string `json:"note"` // will sent in notify
|
||||
Prod string `json:"prod"` // product empty means n9e
|
||||
Algorithm string `json:"algorithm"` // algorithm (''|holtwinters), empty means threshold
|
||||
AlgoParams string `json:"-" gorm:"algo_params"` // params algorithm need
|
||||
AlgoParamsJson interface{} `json:"algo_params" gorm:"-"` //
|
||||
Delay int `json:"delay"` // Time (in seconds) to delay evaluation
|
||||
Severity int `json:"severity"` // 1: Emergency 2: Warning 3: Notice
|
||||
Disabled int `json:"disabled"` // 0: enabled, 1: disabled
|
||||
PromForDuration int `json:"prom_for_duration"` // prometheus for, unit:s
|
||||
PromQl string `json:"prom_ql"` // just one ql
|
||||
PromEvalInterval int `json:"prom_eval_interval"` // unit:s
|
||||
EnableStime string `json:"-"` // split by space: "00:00 10:00 12:00"
|
||||
EnableStimeJSON string `json:"enable_stime" gorm:"-"` // for fe
|
||||
EnableStimesJSON []string `json:"enable_stimes" gorm:"-"` // for fe
|
||||
EnableEtime string `json:"-"` // split by space: "00:00 10:00 12:00"
|
||||
EnableEtimeJSON string `json:"enable_etime" gorm:"-"` // for fe
|
||||
EnableEtimesJSON []string `json:"enable_etimes" gorm:"-"` // for fe
|
||||
EnableDaysOfWeek string `json:"-"` // eg: "0 1 2 3 4 5 6 ; 0 1 2"
|
||||
EnableDaysOfWeekJSON []string `json:"enable_days_of_week" gorm:"-"` // for fe
|
||||
EnableDaysOfWeeksJSON [][]string `json:"enable_days_of_weeks" gorm:"-"` // for fe
|
||||
EnableInBG int `json:"enable_in_bg"` // 0: global 1: enable one busi-group
|
||||
NotifyRecovered int `json:"notify_recovered"` // whether notify when recovery
|
||||
NotifyChannels string `json:"-"` // split by space: sms voice email dingtalk wecom
|
||||
NotifyChannelsJSON []string `json:"notify_channels" gorm:"-"` // for fe
|
||||
NotifyGroups string `json:"-"` // split by space: 233 43
|
||||
NotifyGroupsObj []UserGroup `json:"notify_groups_obj" gorm:"-"` // for fe
|
||||
NotifyGroupsJSON []string `json:"notify_groups" gorm:"-"` // for fe
|
||||
NotifyRepeatStep int `json:"notify_repeat_step"` // notify repeat interval, unit: min
|
||||
NotifyMaxNumber int `json:"notify_max_number"` // notify: max number
|
||||
RecoverDuration int64 `json:"recover_duration"` // unit: s
|
||||
Callbacks string `json:"-"` // split by space: http://a.com/api/x http://a.com/api/y'
|
||||
CallbacksJSON []string `json:"callbacks" gorm:"-"` // for fe
|
||||
RunbookUrl string `json:"runbook_url"` // sop url
|
||||
AppendTags string `json:"-"` // split by space: service=n9e mod=api
|
||||
AppendTagsJSON []string `json:"append_tags" gorm:"-"` // for fe
|
||||
CreateAt int64 `json:"create_at"`
|
||||
CreateBy string `json:"create_by"`
|
||||
UpdateAt int64 `json:"update_at"`
|
||||
UpdateBy string `json:"update_by"`
|
||||
}
|
||||
|
||||
func (ar *AlertRule) TableName() string {
|
||||
@@ -224,7 +229,29 @@ func (ar *AlertRule) FillNotifyGroups(cache map[int64]*UserGroup) error {
|
||||
}
|
||||
|
||||
func (ar *AlertRule) FE2DB() error {
|
||||
ar.EnableDaysOfWeek = strings.Join(ar.EnableDaysOfWeekJSON, " ")
|
||||
if len(ar.EnableStimesJSON) > 0 {
|
||||
ar.EnableStime = strings.Join(ar.EnableStimesJSON, " ")
|
||||
ar.EnableEtime = strings.Join(ar.EnableEtimesJSON, " ")
|
||||
} else {
|
||||
ar.EnableStime = ar.EnableStimeJSON
|
||||
ar.EnableEtime = ar.EnableEtimeJSON
|
||||
}
|
||||
|
||||
if len(ar.EnableDaysOfWeeksJSON) > 0 {
|
||||
for i := 0; i < len(ar.EnableDaysOfWeeksJSON); i++ {
|
||||
if len(ar.EnableDaysOfWeeksJSON) == 1 {
|
||||
ar.EnableDaysOfWeek = strings.Join(ar.EnableDaysOfWeeksJSON[i], " ")
|
||||
} else {
|
||||
if i == len(ar.EnableDaysOfWeeksJSON)-1 {
|
||||
ar.EnableDaysOfWeek += strings.Join(ar.EnableDaysOfWeeksJSON[i], " ")
|
||||
} else {
|
||||
ar.EnableDaysOfWeek += strings.Join(ar.EnableDaysOfWeeksJSON[i], " ") + ";"
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
ar.EnableDaysOfWeek = strings.Join(ar.EnableDaysOfWeekJSON, " ")
|
||||
}
|
||||
ar.NotifyChannels = strings.Join(ar.NotifyChannelsJSON, " ")
|
||||
ar.NotifyGroups = strings.Join(ar.NotifyGroupsJSON, " ")
|
||||
ar.Callbacks = strings.Join(ar.CallbacksJSON, " ")
|
||||
@@ -239,7 +266,21 @@ func (ar *AlertRule) FE2DB() error {
|
||||
}
|
||||
|
||||
func (ar *AlertRule) DB2FE() {
|
||||
ar.EnableDaysOfWeekJSON = strings.Fields(ar.EnableDaysOfWeek)
|
||||
ar.EnableStimesJSON = strings.Fields(ar.EnableStime)
|
||||
ar.EnableEtimesJSON = strings.Fields(ar.EnableEtime)
|
||||
if len(ar.EnableEtimesJSON) > 0 {
|
||||
ar.EnableStimeJSON = ar.EnableStimesJSON[0]
|
||||
ar.EnableEtimeJSON = ar.EnableEtimesJSON[0]
|
||||
}
|
||||
|
||||
cache := strings.Split(ar.EnableDaysOfWeek, ";")
|
||||
for i := 0; i < len(cache); i++ {
|
||||
ar.EnableDaysOfWeeksJSON = append(ar.EnableDaysOfWeeksJSON, strings.Fields(cache[i]))
|
||||
}
|
||||
if len(ar.EnableDaysOfWeeksJSON) > 0 {
|
||||
ar.EnableDaysOfWeekJSON = ar.EnableDaysOfWeeksJSON[0]
|
||||
}
|
||||
|
||||
ar.NotifyChannelsJSON = strings.Fields(ar.NotifyChannels)
|
||||
ar.NotifyGroupsJSON = strings.Fields(ar.NotifyGroups)
|
||||
ar.CallbacksJSON = strings.Fields(ar.Callbacks)
|
||||
@@ -425,3 +466,38 @@ func AlertRuleStatistics(cluster string) (*Statistics, error) {
|
||||
|
||||
return stats[0], nil
|
||||
}
|
||||
|
||||
func (ar *AlertRule) IsPrometheusRule() bool {
|
||||
return ar.Algorithm == "" && (ar.Cate == "" || strings.ToLower(ar.Cate) == "prometheus")
|
||||
}
|
||||
|
||||
func (ar *AlertRule) GenerateNewEvent() *AlertCurEvent {
|
||||
event := &AlertCurEvent{}
|
||||
ar.UpdateEvent(event)
|
||||
return event
|
||||
}
|
||||
|
||||
func (ar *AlertRule) UpdateEvent(event *AlertCurEvent) {
|
||||
if event == nil {
|
||||
return
|
||||
}
|
||||
event.GroupId = ar.GroupId
|
||||
event.Cate = ar.Cate
|
||||
event.RuleId = ar.Id
|
||||
event.RuleName = ar.Name
|
||||
event.RuleNote = ar.Note
|
||||
event.RuleProd = ar.Prod
|
||||
event.RuleAlgo = ar.Algorithm
|
||||
event.Severity = ar.Severity
|
||||
event.PromForDuration = ar.PromForDuration
|
||||
event.PromQl = ar.PromQl
|
||||
event.PromEvalInterval = ar.PromEvalInterval
|
||||
event.Callbacks = ar.Callbacks
|
||||
event.CallbacksJSON = ar.CallbacksJSON
|
||||
event.RunbookUrl = ar.RunbookUrl
|
||||
event.NotifyRecovered = ar.NotifyRecovered
|
||||
event.NotifyChannels = ar.NotifyChannels
|
||||
event.NotifyChannelsJSON = ar.NotifyChannelsJSON
|
||||
event.NotifyGroups = ar.NotifyGroups
|
||||
event.NotifyGroupsJSON = ar.NotifyGroupsJSON
|
||||
}
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
package conv
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math"
|
||||
"strings"
|
||||
|
||||
"github.com/prometheus/common/model"
|
||||
)
|
||||
@@ -13,6 +15,12 @@ type Vector struct {
|
||||
Value float64 `json:"value"`
|
||||
}
|
||||
|
||||
func (v *Vector) ReadableValue() string {
|
||||
ret := fmt.Sprintf("%.5f", v.Value)
|
||||
ret = strings.TrimRight(ret, "0")
|
||||
return strings.TrimRight(ret, ".")
|
||||
}
|
||||
|
||||
func ConvertVectors(value model.Value) (lst []Vector) {
|
||||
if value == nil {
|
||||
return
|
||||
|
||||
@@ -1,8 +1,10 @@
|
||||
package config
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/didi/nightingale/v5/src/models"
|
||||
"github.com/didi/nightingale/v5/src/pkg/prom"
|
||||
)
|
||||
|
||||
@@ -11,9 +13,12 @@ type PromClientMap struct {
|
||||
Clients map[string]prom.API
|
||||
}
|
||||
|
||||
var ReaderClients *PromClientMap = &PromClientMap{Clients: make(map[string]prom.API)}
|
||||
var ReaderClients = &PromClientMap{Clients: make(map[string]prom.API)}
|
||||
|
||||
func (pc *PromClientMap) Set(clusterName string, c prom.API) {
|
||||
if c == nil {
|
||||
return
|
||||
}
|
||||
pc.Lock()
|
||||
defer pc.Unlock()
|
||||
pc.Clients[clusterName] = c
|
||||
@@ -38,10 +43,6 @@ func (pc *PromClientMap) GetCli(cluster string) prom.API {
|
||||
}
|
||||
|
||||
func (pc *PromClientMap) IsNil(cluster string) bool {
|
||||
if pc == nil {
|
||||
return true
|
||||
}
|
||||
|
||||
pc.RLock()
|
||||
defer pc.RUnlock()
|
||||
|
||||
@@ -53,6 +54,30 @@ func (pc *PromClientMap) IsNil(cluster string) bool {
|
||||
return c == nil
|
||||
}
|
||||
|
||||
// Hit 根据当前有效的cluster和规则的cluster配置计算有效的cluster列表
|
||||
func (pc *PromClientMap) Hit(cluster string) []string {
|
||||
pc.RLock()
|
||||
defer pc.RUnlock()
|
||||
clusters := make([]string, 0, len(pc.Clients))
|
||||
if cluster == models.ClusterAll {
|
||||
for c := range pc.Clients {
|
||||
clusters = append(clusters, c)
|
||||
}
|
||||
return clusters
|
||||
}
|
||||
|
||||
ruleClusters := strings.Fields(cluster)
|
||||
for c := range pc.Clients {
|
||||
for _, rc := range ruleClusters {
|
||||
if rc == c {
|
||||
clusters = append(clusters, c)
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
return clusters
|
||||
}
|
||||
|
||||
func (pc *PromClientMap) Reset() {
|
||||
pc.Lock()
|
||||
defer pc.Unlock()
|
||||
|
||||
@@ -1,6 +1,10 @@
|
||||
package config
|
||||
|
||||
import "sync"
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/didi/nightingale/v5/src/pkg/tls"
|
||||
)
|
||||
|
||||
type PromOption struct {
|
||||
ClusterName string
|
||||
@@ -11,6 +15,9 @@ type PromOption struct {
|
||||
Timeout int64
|
||||
DialTimeout int64
|
||||
|
||||
UseTLS bool
|
||||
tls.ClientConfig
|
||||
|
||||
MaxIdleConnsPerHost int
|
||||
|
||||
Headers []string
|
||||
@@ -65,12 +72,6 @@ func (pos *PromOptionsStruct) Set(clusterName string, po PromOption) {
|
||||
pos.Unlock()
|
||||
}
|
||||
|
||||
func (pos *PromOptionsStruct) Sets(clusterName string, po PromOption) {
|
||||
pos.Lock()
|
||||
pos.Data = map[string]PromOption{clusterName: po}
|
||||
pos.Unlock()
|
||||
}
|
||||
|
||||
func (pos *PromOptionsStruct) Del(clusterName string) {
|
||||
pos.Lock()
|
||||
delete(pos.Data, clusterName)
|
||||
|
||||
@@ -73,7 +73,7 @@ func loadFromDatabase() {
|
||||
}
|
||||
|
||||
if cval == "" {
|
||||
logger.Warningf("ckey: %s is empty", ckey)
|
||||
logger.Debugf("ckey: %s is empty", ckey)
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -92,7 +92,7 @@ func loadFromDatabase() {
|
||||
}
|
||||
|
||||
logger.Info("setClientFromPromOption success: ", cluster)
|
||||
PromOptions.Sets(cluster, po)
|
||||
PromOptions.Set(cluster, po)
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -103,7 +103,7 @@ func loadFromDatabase() {
|
||||
continue
|
||||
}
|
||||
|
||||
PromOptions.Sets(cluster, po)
|
||||
PromOptions.Set(cluster, po)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -119,17 +119,28 @@ func loadFromDatabase() {
|
||||
}
|
||||
|
||||
func newClientFromPromOption(po PromOption) (api.Client, error) {
|
||||
transport := &http.Transport{
|
||||
// TLSClientConfig: tlsConfig,
|
||||
Proxy: http.ProxyFromEnvironment,
|
||||
DialContext: (&net.Dialer{
|
||||
Timeout: time.Duration(po.DialTimeout) * time.Millisecond,
|
||||
}).DialContext,
|
||||
ResponseHeaderTimeout: time.Duration(po.Timeout) * time.Millisecond,
|
||||
MaxIdleConnsPerHost: po.MaxIdleConnsPerHost,
|
||||
}
|
||||
|
||||
if po.UseTLS {
|
||||
tlsConfig, err := po.TLSConfig()
|
||||
if err != nil {
|
||||
logger.Errorf("new cluster %s fail: %v", po.Url, err)
|
||||
return nil, err
|
||||
}
|
||||
transport.TLSClientConfig = tlsConfig
|
||||
}
|
||||
|
||||
return api.NewClient(api.Config{
|
||||
Address: po.Url,
|
||||
RoundTripper: &http.Transport{
|
||||
// TLSClientConfig: tlsConfig,
|
||||
Proxy: http.ProxyFromEnvironment,
|
||||
DialContext: (&net.Dialer{
|
||||
Timeout: time.Duration(po.DialTimeout) * time.Millisecond,
|
||||
}).DialContext,
|
||||
ResponseHeaderTimeout: time.Duration(po.Timeout) * time.Millisecond,
|
||||
MaxIdleConnsPerHost: po.MaxIdleConnsPerHost,
|
||||
},
|
||||
Address: po.Url,
|
||||
RoundTripper: transport,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -142,11 +153,17 @@ func setClientFromPromOption(clusterName string, po PromOption) error {
|
||||
return fmt.Errorf("prometheus url is blank")
|
||||
}
|
||||
|
||||
if strings.HasPrefix(po.Url, "https") {
|
||||
po.UseTLS = true
|
||||
po.InsecureSkipVerify = true
|
||||
}
|
||||
|
||||
cli, err := newClientFromPromOption(po)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to newClientFromPromOption: %v", err)
|
||||
}
|
||||
|
||||
logger.Debugf("setClientFromPromOption: %s, %+v", clusterName, po)
|
||||
ReaderClients.Set(clusterName, prom.NewAPI(cli, prom.ClientOptions{
|
||||
BasicAuthUser: po.BasicAuthUser,
|
||||
BasicAuthPass: po.BasicAuthPass,
|
||||
|
||||
@@ -76,9 +76,10 @@ func persist(event *models.AlertCurEvent) {
|
||||
// 不管是告警还是恢复,全量告警里都要记录
|
||||
if err := his.Add(); err != nil {
|
||||
logger.Errorf(
|
||||
"event_persist_his_fail: %v rule_id=%d hash=%s tags=%v timestamp=%d value=%s",
|
||||
"event_persist_his_fail: %v rule_id=%d cluster:%s hash=%s tags=%v timestamp=%d value=%s",
|
||||
err,
|
||||
event.RuleId,
|
||||
event.Cluster,
|
||||
event.Hash,
|
||||
event.TagsJSON,
|
||||
event.TriggerTime,
|
||||
@@ -101,9 +102,10 @@ func persist(event *models.AlertCurEvent) {
|
||||
if event.Id > 0 {
|
||||
if err := event.Add(); err != nil {
|
||||
logger.Errorf(
|
||||
"event_persist_cur_fail: %v rule_id=%d hash=%s tags=%v timestamp=%d value=%s",
|
||||
"event_persist_cur_fail: %v rule_id=%d cluster:%s hash=%s tags=%v timestamp=%d value=%s",
|
||||
err,
|
||||
event.RuleId,
|
||||
event.Cluster,
|
||||
event.Hash,
|
||||
event.TagsJSON,
|
||||
event.TriggerTime,
|
||||
@@ -126,9 +128,10 @@ func persist(event *models.AlertCurEvent) {
|
||||
if event.Id > 0 {
|
||||
if err := event.Add(); err != nil {
|
||||
logger.Errorf(
|
||||
"event_persist_cur_fail: %v rule_id=%d hash=%s tags=%v timestamp=%d value=%s",
|
||||
"event_persist_cur_fail: %v rule_id=%d cluster:%s hash=%s tags=%v timestamp=%d value=%s",
|
||||
err,
|
||||
event.RuleId,
|
||||
event.Cluster,
|
||||
event.Hash,
|
||||
event.TagsJSON,
|
||||
event.TriggerTime,
|
||||
|
||||
@@ -1,33 +0,0 @@
|
||||
package engine
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/didi/nightingale/v5/src/models"
|
||||
)
|
||||
|
||||
func isNoneffective(timestamp int64, alertRule *models.AlertRule) bool {
|
||||
if alertRule.Disabled == 1 {
|
||||
return true
|
||||
}
|
||||
|
||||
tm := time.Unix(timestamp, 0)
|
||||
triggerTime := tm.Format("15:04")
|
||||
triggerWeek := strconv.Itoa(int(tm.Weekday()))
|
||||
|
||||
if alertRule.EnableStime <= alertRule.EnableEtime {
|
||||
if triggerTime < alertRule.EnableStime || triggerTime > alertRule.EnableEtime {
|
||||
return true
|
||||
}
|
||||
} else {
|
||||
if triggerTime < alertRule.EnableStime && triggerTime > alertRule.EnableEtime {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
alertRule.EnableDaysOfWeek = strings.Replace(alertRule.EnableDaysOfWeek, "7", "0", 1)
|
||||
|
||||
return !strings.Contains(alertRule.EnableDaysOfWeek, triggerWeek)
|
||||
}
|
||||
@@ -5,13 +5,15 @@ import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/toolkits/pkg/logger"
|
||||
|
||||
"github.com/didi/nightingale/v5/src/server/common/sender"
|
||||
"github.com/didi/nightingale/v5/src/server/config"
|
||||
promstat "github.com/didi/nightingale/v5/src/server/stat"
|
||||
"github.com/toolkits/pkg/container/list"
|
||||
"github.com/toolkits/pkg/logger"
|
||||
)
|
||||
|
||||
var EventQueue = list.NewSafeListLimited(10000000)
|
||||
|
||||
func Start(ctx context.Context) error {
|
||||
err := reloadTpls()
|
||||
if err != nil {
|
||||
@@ -22,7 +24,9 @@ func Start(ctx context.Context) error {
|
||||
go loopConsume(ctx)
|
||||
|
||||
// filter my rules and start worker
|
||||
go loopFilterRules(ctx)
|
||||
//go loopFilterRules(ctx)
|
||||
|
||||
go ruleHolder.LoopSyncRules(ctx)
|
||||
|
||||
go reportQueueSize()
|
||||
|
||||
|
||||
@@ -17,11 +17,12 @@ func LogEvent(event *models.AlertCurEvent, location string, err ...error) {
|
||||
}
|
||||
|
||||
logger.Infof(
|
||||
"event(%s %s) %s: rule_id=%d %v%s@%d %s",
|
||||
"event(%s %s) %s: rule_id=%d cluster:%s %v%s@%d %s",
|
||||
event.Hash,
|
||||
status,
|
||||
location,
|
||||
event.RuleId,
|
||||
event.Cluster,
|
||||
event.TagsJSON,
|
||||
event.TriggerValue,
|
||||
event.TriggerTime,
|
||||
|
||||
@@ -1,90 +0,0 @@
|
||||
package engine
|
||||
|
||||
import (
|
||||
"strings"
|
||||
|
||||
"github.com/didi/nightingale/v5/src/models"
|
||||
"github.com/didi/nightingale/v5/src/server/memsto"
|
||||
)
|
||||
|
||||
// 如果传入了clock这个可选参数,就表示使用这个clock表示的时间,否则就从event的字段中取TriggerTime
|
||||
func IsMuted(event *models.AlertCurEvent, clock ...int64) bool {
|
||||
mutes, has := memsto.AlertMuteCache.Gets(event.GroupId)
|
||||
if !has || len(mutes) == 0 {
|
||||
return false
|
||||
}
|
||||
|
||||
for i := 0; i < len(mutes); i++ {
|
||||
if matchMute(event, mutes[i], clock...) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
func matchMute(event *models.AlertCurEvent, mute *models.AlertMute, clock ...int64) bool {
|
||||
if mute.Disabled == 1 {
|
||||
return false
|
||||
}
|
||||
|
||||
ts := event.TriggerTime
|
||||
if len(clock) > 0 {
|
||||
ts = clock[0]
|
||||
}
|
||||
|
||||
// 如果不是全局的,判断 cluster
|
||||
if mute.Cluster != models.ClusterAll {
|
||||
// event.Cluster 是一个字符串,可能是多个cluster的组合,比如"cluster1 cluster2"
|
||||
clusters := strings.Fields(mute.Cluster)
|
||||
cm := make(map[string]struct{}, len(clusters))
|
||||
for i := 0; i < len(clusters); i++ {
|
||||
cm[clusters[i]] = struct{}{}
|
||||
}
|
||||
|
||||
// 判断event.Cluster是否包含在cm中
|
||||
if _, has := cm[event.Cluster]; !has {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
if ts < mute.Btime || ts > mute.Etime {
|
||||
return false
|
||||
}
|
||||
|
||||
return matchTags(event.TagsMap, mute.ITags)
|
||||
}
|
||||
|
||||
func matchTag(value string, filter models.TagFilter) bool {
|
||||
switch filter.Func {
|
||||
case "==":
|
||||
return filter.Value == value
|
||||
case "!=":
|
||||
return filter.Value != value
|
||||
case "in":
|
||||
_, has := filter.Vset[value]
|
||||
return has
|
||||
case "not in":
|
||||
_, has := filter.Vset[value]
|
||||
return !has
|
||||
case "=~":
|
||||
return filter.Regexp.MatchString(value)
|
||||
case "!~":
|
||||
return !filter.Regexp.MatchString(value)
|
||||
}
|
||||
// unexpect func
|
||||
return false
|
||||
}
|
||||
|
||||
func matchTags(eventTagsMap map[string]string, itags []models.TagFilter) bool {
|
||||
for _, filter := range itags {
|
||||
value, has := eventTagsMap[filter.Key]
|
||||
if !has {
|
||||
return false
|
||||
}
|
||||
if !matchTag(value, filter) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
202
src/server/engine/mute_strategy.go
Normal file
202
src/server/engine/mute_strategy.go
Normal file
@@ -0,0 +1,202 @@
|
||||
package engine
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/toolkits/pkg/logger"
|
||||
|
||||
"github.com/didi/nightingale/v5/src/models"
|
||||
"github.com/didi/nightingale/v5/src/server/memsto"
|
||||
)
|
||||
|
||||
var AlertMuteStrategies = AlertMuteStrategiesType{
|
||||
&TimeNonEffectiveMuteStrategy{},
|
||||
&IdentNotExistsMuteStrategy{},
|
||||
&BgNotMatchMuteStrategy{},
|
||||
&EventMuteStrategy{},
|
||||
}
|
||||
|
||||
type AlertMuteStrategiesType []AlertMuteStrategy
|
||||
|
||||
func (ss AlertMuteStrategiesType) IsMuted(rule *models.AlertRule, event *models.AlertCurEvent) bool {
|
||||
for _, s := range ss {
|
||||
if s.IsMuted(rule, event) {
|
||||
logger.Debugf("[%T] mute: rule:%+v event:%+v", s, rule, event)
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// AlertMuteStrategy 是过滤event的抽象,当返回true时,表示该告警时间由于某些原因不需要告警
|
||||
type AlertMuteStrategy interface {
|
||||
IsMuted(rule *models.AlertRule, event *models.AlertCurEvent) bool
|
||||
}
|
||||
|
||||
// TimeNonEffectiveMuteStrategy 根据规则配置的告警时间过滤,如果产生的告警不在规则配置的告警时间内,则不告警
|
||||
type TimeNonEffectiveMuteStrategy struct{}
|
||||
|
||||
// IsMuted reports whether the event should be suppressed because the
// rule is disabled, or because the trigger time falls outside every
// configured effective window (weekday + "HH:MM" start/end pairs).
// Time-of-day comparisons rely on lexicographic ordering of the fixed
// "15:04" format.
func (s *TimeNonEffectiveMuteStrategy) IsMuted(rule *models.AlertRule, event *models.AlertCurEvent) bool {
	if rule.Disabled == 1 {
		logger.Debugf("[%T] mute: rule_disabled:%d cluster:%s", s, rule.Id, event.Cluster)
		return true
	}

	tm := time.Unix(event.TriggerTime, 0)
	triggerTime := tm.Format("15:04")
	// Weekday as "0".."6"; rule config uses "7" for Sunday, normalized below.
	triggerWeek := strconv.Itoa(int(tm.Weekday()))

	enableStime := strings.Fields(rule.EnableStime)
	enableEtime := strings.Fields(rule.EnableEtime)
	enableDaysOfWeek := strings.Split(rule.EnableDaysOfWeek, ";")
	length := len(enableDaysOfWeek)
	// enableStime, enableEtime and enableDaysOfWeek are expected to have the
	// same length, so iterating over one of them is enough.
	// NOTE(review): that invariant is assumed, not checked here — a shorter
	// stime/etime list would index out of range; verify upstream validation.
	for i := 0; i < length; i++ {
		// Map Sunday "7" (config convention) to Go's "0".
		enableDaysOfWeek[i] = strings.Replace(enableDaysOfWeek[i], "7", "0", 1)
		if !strings.Contains(enableDaysOfWeek[i], triggerWeek) {
			continue
		}
		if enableStime[i] <= enableEtime[i] {
			// Window within one day, e.g. 09:00-18:00.
			if triggerTime < enableStime[i] || triggerTime > enableEtime[i] {
				continue
			}
		} else {
			// Window crossing midnight, e.g. 22:00-06:00.
			if triggerTime < enableStime[i] && triggerTime > enableEtime[i] {
				continue
			}
		}
		// The trigger time falls inside one of the configured effective
		// windows, so the event is not muted.
		return false
	}
	return true
}
|
||||
|
||||
// IdentNotExistsMuteStrategy 根据ident是否存在过滤,如果ident不存在,则target_up的告警直接过滤掉
|
||||
type IdentNotExistsMuteStrategy struct{}
|
||||
|
||||
func (s *IdentNotExistsMuteStrategy) IsMuted(rule *models.AlertRule, event *models.AlertCurEvent) bool {
|
||||
ident, has := event.TagsMap["ident"]
|
||||
if !has {
|
||||
return false
|
||||
}
|
||||
_, exists := memsto.TargetCache.Get(ident)
|
||||
// 如果是target_up的告警,且ident已经不存在了,直接过滤掉
|
||||
// 这里的判断有点太粗暴了,但是目前没有更好的办法
|
||||
if !exists && strings.Contains(rule.PromQl, "target_up") {
|
||||
logger.Debugf("[%T] mute: rule_eval:%d cluster:%s ident:%s", s, rule.Id, event.Cluster, ident)
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// BgNotMatchMuteStrategy 当规则开启只在bg内部告警时,对于非bg内部的机器过滤
|
||||
type BgNotMatchMuteStrategy struct{}
|
||||
|
||||
func (s *BgNotMatchMuteStrategy) IsMuted(rule *models.AlertRule, event *models.AlertCurEvent) bool {
|
||||
// 没有开启BG内部告警,直接不过滤
|
||||
if rule.EnableInBG == 0 {
|
||||
return false
|
||||
}
|
||||
|
||||
ident, has := event.TagsMap["ident"]
|
||||
if !has {
|
||||
return false
|
||||
}
|
||||
|
||||
target, exists := memsto.TargetCache.Get(ident)
|
||||
// 对于包含ident的告警事件,check一下ident所属bg和rule所属bg是否相同
|
||||
// 如果告警规则选择了只在本BG生效,那其他BG的机器就不能因此规则产生告警
|
||||
if exists && target.GroupId != rule.GroupId {
|
||||
logger.Debugf("[%T] mute: rule_eval:%d cluster:%s", s, rule.Id, event.Cluster)
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
type EventMuteStrategy struct{}
|
||||
|
||||
var EventMuteStra = new(EventMuteStrategy)
|
||||
|
||||
func (s *EventMuteStrategy) IsMuted(rule *models.AlertRule, event *models.AlertCurEvent) bool {
|
||||
mutes, has := memsto.AlertMuteCache.Gets(event.GroupId)
|
||||
if !has || len(mutes) == 0 {
|
||||
return false
|
||||
}
|
||||
|
||||
for i := 0; i < len(mutes); i++ {
|
||||
if matchMute(event, mutes[i]) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
// matchMute 如果传入了clock这个可选参数,就表示使用这个clock表示的时间,否则就从event的字段中取TriggerTime
|
||||
func matchMute(event *models.AlertCurEvent, mute *models.AlertMute, clock ...int64) bool {
|
||||
if mute.Disabled == 1 {
|
||||
return false
|
||||
}
|
||||
|
||||
ts := event.TriggerTime
|
||||
if len(clock) > 0 {
|
||||
ts = clock[0]
|
||||
}
|
||||
|
||||
// 如果不是全局的,判断 cluster
|
||||
if mute.Cluster != models.ClusterAll {
|
||||
// mute.Cluster 是一个字符串,可能是多个cluster的组合,比如"cluster1 cluster2"
|
||||
clusters := strings.Fields(mute.Cluster)
|
||||
cm := make(map[string]struct{}, len(clusters))
|
||||
for i := 0; i < len(clusters); i++ {
|
||||
cm[clusters[i]] = struct{}{}
|
||||
}
|
||||
|
||||
// 判断event.Cluster是否包含在cm中
|
||||
if _, has := cm[event.Cluster]; !has {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
if ts < mute.Btime || ts > mute.Etime {
|
||||
return false
|
||||
}
|
||||
|
||||
return matchTags(event.TagsMap, mute.ITags)
|
||||
}
|
||||
|
||||
func matchTag(value string, filter models.TagFilter) bool {
|
||||
switch filter.Func {
|
||||
case "==":
|
||||
return filter.Value == value
|
||||
case "!=":
|
||||
return filter.Value != value
|
||||
case "in":
|
||||
_, has := filter.Vset[value]
|
||||
return has
|
||||
case "not in":
|
||||
_, has := filter.Vset[value]
|
||||
return !has
|
||||
case "=~":
|
||||
return filter.Regexp.MatchString(value)
|
||||
case "!~":
|
||||
return !filter.Regexp.MatchString(value)
|
||||
}
|
||||
// unexpect func
|
||||
return false
|
||||
}
|
||||
|
||||
func matchTags(eventTagsMap map[string]string, itags []models.TagFilter) bool {
|
||||
for _, filter := range itags {
|
||||
value, has := eventTagsMap[filter.Key]
|
||||
if !has {
|
||||
return false
|
||||
}
|
||||
if !matchTag(value, filter) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
@@ -1,5 +0,0 @@
|
||||
package engine
|
||||
|
||||
import "github.com/toolkits/pkg/container/list"
|
||||
|
||||
var EventQueue = list.NewSafeListLimited(10000000)
|
||||
166
src/server/engine/rule.go
Normal file
166
src/server/engine/rule.go
Normal file
@@ -0,0 +1,166 @@
|
||||
package engine
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/didi/nightingale/v5/src/server/config"
|
||||
"github.com/didi/nightingale/v5/src/server/memsto"
|
||||
"github.com/didi/nightingale/v5/src/server/naming"
|
||||
)
|
||||
|
||||
type RuleContext interface {
|
||||
Key() string
|
||||
Hash() string
|
||||
Prepare()
|
||||
Start()
|
||||
Eval()
|
||||
Stop()
|
||||
}
|
||||
|
||||
var ruleHolder = &RuleHolder{
|
||||
alertRules: make(map[string]RuleContext),
|
||||
recordRules: make(map[string]RuleContext),
|
||||
externalAlertRules: make(map[string]*AlertRuleContext),
|
||||
}
|
||||
|
||||
type RuleHolder struct {
|
||||
externalLock sync.RWMutex
|
||||
|
||||
// key: hash
|
||||
alertRules map[string]RuleContext
|
||||
// key: hash
|
||||
recordRules map[string]RuleContext
|
||||
|
||||
// key: key
|
||||
externalAlertRules map[string]*AlertRuleContext
|
||||
}
|
||||
|
||||
func (rh *RuleHolder) LoopSyncRules(ctx context.Context) {
|
||||
time.Sleep(time.Duration(config.C.EngineDelay) * time.Second)
|
||||
duration := 9000 * time.Millisecond
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-time.After(duration):
|
||||
rh.SyncAlertRules()
|
||||
rh.SyncRecordRules()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (rh *RuleHolder) SyncAlertRules() {
|
||||
ids := memsto.AlertRuleCache.GetRuleIds()
|
||||
alertRules := make(map[string]RuleContext)
|
||||
externalAllRules := make(map[string]*AlertRuleContext)
|
||||
for _, id := range ids {
|
||||
rule := memsto.AlertRuleCache.Get(id)
|
||||
if rule == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
// 如果 rule 不是通过 prometheus engine 来告警的,则创建为 externalRule
|
||||
if !rule.IsPrometheusRule() {
|
||||
ruleClusters := strings.Fields(rule.Cluster)
|
||||
for _, cluster := range ruleClusters {
|
||||
// hash ring not hit
|
||||
if !naming.ClusterHashRing.IsHit(cluster, fmt.Sprintf("%d", rule.Id), config.C.Heartbeat.Endpoint) {
|
||||
continue
|
||||
}
|
||||
|
||||
externalRule := NewAlertRuleContext(rule, cluster)
|
||||
externalAllRules[externalRule.Key()] = externalRule
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
ruleClusters := config.ReaderClients.Hit(rule.Cluster)
|
||||
for _, cluster := range ruleClusters {
|
||||
// hash ring not hit
|
||||
if !naming.ClusterHashRing.IsHit(cluster, fmt.Sprintf("%d", rule.Id), config.C.Heartbeat.Endpoint) {
|
||||
continue
|
||||
}
|
||||
|
||||
alertRule := NewAlertRuleContext(rule, cluster)
|
||||
alertRules[alertRule.Hash()] = alertRule
|
||||
}
|
||||
}
|
||||
|
||||
for hash, rule := range alertRules {
|
||||
if _, has := rh.alertRules[hash]; !has {
|
||||
rule.Prepare()
|
||||
rule.Start()
|
||||
rh.alertRules[hash] = rule
|
||||
}
|
||||
}
|
||||
|
||||
for hash, rule := range rh.alertRules {
|
||||
if _, has := alertRules[hash]; !has {
|
||||
rule.Stop()
|
||||
delete(rh.alertRules, hash)
|
||||
}
|
||||
}
|
||||
|
||||
for hash, rule := range externalAllRules {
|
||||
rh.externalLock.Lock()
|
||||
if _, has := rh.externalAlertRules[hash]; !has {
|
||||
rule.Prepare()
|
||||
rh.externalAlertRules[hash] = rule
|
||||
}
|
||||
rh.externalLock.Unlock()
|
||||
}
|
||||
|
||||
rh.externalLock.Lock()
|
||||
for hash := range rh.externalAlertRules {
|
||||
if _, has := externalAllRules[hash]; !has {
|
||||
delete(rh.externalAlertRules, hash)
|
||||
}
|
||||
}
|
||||
rh.externalLock.Unlock()
|
||||
}
|
||||
|
||||
func (rh *RuleHolder) SyncRecordRules() {
|
||||
ids := memsto.RecordingRuleCache.GetRuleIds()
|
||||
recordRules := make(map[string]RuleContext)
|
||||
for _, id := range ids {
|
||||
rule := memsto.RecordingRuleCache.Get(id)
|
||||
if rule == nil {
|
||||
continue
|
||||
}
|
||||
ruleClusters := config.ReaderClients.Hit(rule.Cluster)
|
||||
for _, cluster := range ruleClusters {
|
||||
if !naming.ClusterHashRing.IsHit(cluster, fmt.Sprintf("%d", rule.Id), config.C.Heartbeat.Endpoint) {
|
||||
continue
|
||||
}
|
||||
recordRule := NewRecordRuleContext(rule, cluster)
|
||||
recordRules[recordRule.Hash()] = recordRule
|
||||
}
|
||||
}
|
||||
|
||||
for hash, rule := range recordRules {
|
||||
if _, has := rh.recordRules[hash]; !has {
|
||||
rule.Prepare()
|
||||
rule.Start()
|
||||
rh.recordRules[hash] = rule
|
||||
}
|
||||
}
|
||||
|
||||
for hash, rule := range rh.recordRules {
|
||||
if _, has := recordRules[hash]; !has {
|
||||
rule.Stop()
|
||||
delete(rh.recordRules, hash)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func GetExternalAlertRule(cluster string, id int64) (*AlertRuleContext, bool) {
|
||||
key := fmt.Sprintf("alert-%s-%d", cluster, id)
|
||||
ruleHolder.externalLock.RLock()
|
||||
defer ruleHolder.externalLock.RUnlock()
|
||||
rule, has := ruleHolder.externalAlertRules[key]
|
||||
return rule, has
|
||||
}
|
||||
300
src/server/engine/rule_alert.go
Normal file
300
src/server/engine/rule_alert.go
Normal file
@@ -0,0 +1,300 @@
|
||||
package engine
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/common/model"
|
||||
"github.com/toolkits/pkg/logger"
|
||||
"github.com/toolkits/pkg/str"
|
||||
|
||||
"github.com/didi/nightingale/v5/src/models"
|
||||
"github.com/didi/nightingale/v5/src/pkg/prom"
|
||||
"github.com/didi/nightingale/v5/src/server/common/conv"
|
||||
"github.com/didi/nightingale/v5/src/server/config"
|
||||
"github.com/didi/nightingale/v5/src/server/memsto"
|
||||
promstat "github.com/didi/nightingale/v5/src/server/stat"
|
||||
)
|
||||
|
||||
type AlertRuleContext struct {
|
||||
cluster string
|
||||
quit chan struct{}
|
||||
|
||||
rule *models.AlertRule
|
||||
fires *AlertCurEventMap
|
||||
pendings *AlertCurEventMap
|
||||
}
|
||||
|
||||
func NewAlertRuleContext(rule *models.AlertRule, cluster string) *AlertRuleContext {
|
||||
return &AlertRuleContext{
|
||||
cluster: cluster,
|
||||
quit: make(chan struct{}),
|
||||
rule: rule,
|
||||
}
|
||||
}
|
||||
|
||||
// RuleFromCache fetches the freshest copy of this rule from the cache;
// returns nil when the rule no longer exists.
func (arc *AlertRuleContext) RuleFromCache() *models.AlertRule {
	return memsto.AlertRuleCache.Get(arc.rule.Id)
}
|
||||
|
||||
// Key identifies this worker by cluster and rule id; used for logging
// and as the map key for external rules.
func (arc *AlertRuleContext) Key() string {
	return fmt.Sprintf("alert-%s-%d", arc.cluster, arc.rule.Id)
}
|
||||
|
||||
// Hash fingerprints the fields whose change requires a worker restart
// (id, eval interval, promql, cluster); used to diff running workers.
func (arc *AlertRuleContext) Hash() string {
	return str.MD5(fmt.Sprintf("%d_%d_%s_%s",
		arc.rule.Id,
		arc.rule.PromEvalInterval,
		arc.rule.PromQl,
		arc.cluster,
	))
}
|
||||
|
||||
// Prepare restores firing state from the database before the worker
// starts, so restarts do not lose currently-firing alerts.
func (arc *AlertRuleContext) Prepare() {
	arc.recoverAlertCurEventFromDb()
}
|
||||
|
||||
// Start launches the evaluation goroutine, which runs Eval every
// PromEvalInterval seconds (defaulting to 10) until Stop closes quit.
func (arc *AlertRuleContext) Start() {
	logger.Infof("eval:%s started", arc.Key())
	interval := arc.rule.PromEvalInterval
	if interval <= 0 {
		interval = 10
	}
	go func() {
		for {
			select {
			case <-arc.quit:
				return
			default:
				// Eval first, then sleep: the sleep only delays the next
				// iteration, so quit is re-checked once per cycle.
				arc.Eval()
				time.Sleep(time.Duration(interval) * time.Second)
			}
		}
	}()
}
|
||||
|
||||
// Eval runs one evaluation cycle: query the cluster's reader with the
// rule's promql and feed the resulting vectors into HandleVectors.
// Non-prometheus rules skip the query but still call HandleVectors with
// an empty result so stale firing events can recover.
func (arc *AlertRuleContext) Eval() {
	promql := strings.TrimSpace(arc.rule.PromQl)
	if promql == "" {
		logger.Errorf("rule_eval:%s promql is blank", arc.Key())
		return
	}

	if config.ReaderClients.IsNil(arc.cluster) {
		logger.Errorf("rule_eval:%s error reader client is nil", arc.Key())
		return
	}

	readerClient := config.ReaderClients.GetCli(arc.cluster)

	var value model.Value
	var err error

	cachedRule := arc.RuleFromCache()
	if cachedRule == nil {
		logger.Errorf("rule_eval:%s rule not found", arc.Key())
		return
	}

	// If a single goroutine executed this we could simply assign cachedRule
	// to arc.rule, but external rules also go through HandleVectors and
	// RecoverSingle, so the rule is re-fetched from cache wherever needed.
	// arc.rule = cachedRule

	// If the cached rule switched away from the prometheus type, there is
	// no point querying prometheus anymore.
	if cachedRule.IsPrometheusRule() {
		var warnings prom.Warnings
		value, warnings, err = readerClient.Query(context.Background(), promql, time.Now())
		if err != nil {
			logger.Errorf("rule_eval:%s promql:%s, error:%v", arc.Key(), promql, err)
			//notifyToMaintainer(err, "failed to query prometheus")
			Report(QueryPrometheusError)
			return
		}

		if len(warnings) > 0 {
			logger.Errorf("rule_eval:%s promql:%s, warnings:%v", arc.Key(), promql, warnings)
			return
		}
		logger.Debugf("rule_eval:%s promql:%s, value:%v", arc.Key(), promql, value)
	}
	arc.HandleVectors(conv.ConvertVectors(value), "inner")
}
|
||||
|
||||
// HandleVectors turns each triggered vector into an event, applies the
// mute strategies, and finally recovers fired events whose vectors no
// longer appear. from distinguishes the inner engine ("inner") from
// externally pushed vectors.
func (arc *AlertRuleContext) HandleVectors(vectors []conv.Vector, from string) {
	// Rule settings (receivers, callbacks, ...) may have changed without a
	// worker restart — such edits do not change Hash() — yet they do affect
	// event handling, so always take the freshest rule from the cache.
	cachedRule := arc.RuleFromCache()
	if cachedRule == nil {
		logger.Errorf("rule_eval:%s rule not found", arc.Key())
		return
	}
	now := time.Now().Unix()
	alertingKeys := map[string]struct{}{}
	for _, vector := range vectors {
		alertVector := NewAlertVector(arc, cachedRule, vector, from)
		event := alertVector.BuildEvent(now)
		// A muted event is still essentially firing; record its key
		// unconditionally so a firing event is not auto-recovered while muted.
		alertingKeys[alertVector.Hash()] = struct{}{}
		if AlertMuteStrategies.IsMuted(cachedRule, event) {
			logger.Debugf("rule_eval:%s event:%+v is muted", arc.Key(), event)
			continue
		}
		arc.handleEvent(event)
	}

	arc.HandleRecover(alertingKeys, now)
}
|
||||
|
||||
func (arc *AlertRuleContext) HandleRecover(alertingKeys map[string]struct{}, now int64) {
|
||||
for _, hash := range arc.pendings.Keys() {
|
||||
if _, has := alertingKeys[hash]; has {
|
||||
continue
|
||||
}
|
||||
arc.pendings.Delete(hash)
|
||||
}
|
||||
|
||||
for hash := range arc.fires.GetAll() {
|
||||
if _, has := alertingKeys[hash]; has {
|
||||
continue
|
||||
}
|
||||
arc.RecoverSingle(hash, now, nil)
|
||||
}
|
||||
}
|
||||
|
||||
// RecoverSingle recovers one fired event identified by hash, honoring
// the rule's RecoverDuration grace period. value, when non-nil,
// overrides the trigger value shown on the recovery event.
func (arc *AlertRuleContext) RecoverSingle(hash string, now int64, value *string) {
	cachedRule := arc.RuleFromCache()
	if cachedRule == nil {
		logger.Errorf("rule_eval:%s rule not found", arc.Key())
		return
	}
	event, has := arc.fires.Get(hash)
	if !has {
		return
	}
	// A configured observation window delays recovery: the event must stay
	// quiet for RecoverDuration seconds before it may recover.
	if cachedRule.RecoverDuration > 0 && now-event.LastEvalTime < cachedRule.RecoverDuration {
		return
	}
	if value != nil {
		event.TriggerValue = *value
	}

	// No vector crossed the threshold, so assume this series recovered.
	// We cannot distinguish "data exists but below threshold" from "points
	// were simply lost", unfortunately.
	arc.fires.Delete(hash)
	arc.pendings.Delete(hash)

	// The recovery may be caused by an edited promql, so the event should
	// carry the latest promql (and, in fact, all current rule fields) to
	// avoid confusing users.
	cachedRule.UpdateEvent(event)
	event.IsRecovered = true
	event.LastEvalTime = now
	arc.pushEventToQueue(event)
}
|
||||
|
||||
// handleEvent fires the event immediately when no "for" duration is
// configured; otherwise it tracks the event in pendings and fires only
// after the condition has held for PromForDuration seconds.
func (arc *AlertRuleContext) handleEvent(event *models.AlertCurEvent) {
	if event == nil {
		logger.Debugf("rule_eval:%s event:%+v is nil", arc.Key(), event)
		return
	}
	if event.PromForDuration == 0 {
		arc.fireEvent(event)
		return
	}

	// preTriggerTime is when this series first started pending.
	var preTriggerTime int64
	preEvent, has := arc.pendings.Get(event.Hash)
	if has {
		arc.pendings.UpdateLastEvalTime(event.Hash, event.LastEvalTime)
		preTriggerTime = preEvent.TriggerTime
	} else {
		arc.pendings.Set(event.Hash, event)
		preTriggerTime = event.TriggerTime
	}

	// Adding one eval interval lets the event fire on the evaluation that
	// reaches the "for" boundary rather than one interval later.
	if event.LastEvalTime-preTriggerTime+int64(event.PromEvalInterval) >= int64(event.PromForDuration) {
		arc.fireEvent(event)
	}
}
|
||||
|
||||
// fireEvent pushes a firing event to the queue, applying the rule's
// repeat-notification policy (NotifyRepeatStep minutes between sends,
// NotifyMaxNumber total sends; 0 means disabled / unlimited).
func (arc *AlertRuleContext) fireEvent(event *models.AlertCurEvent) {
	// As arc.rule maybe outdated, use rule from cache
	cachedRule := arc.RuleFromCache()
	if cachedRule == nil {
		logger.Errorf("rule_eval:%s event:%+v is nil", arc.Key(), event)
		return
	}
	if fired, has := arc.fires.Get(event.Hash); has {
		arc.fires.UpdateLastEvalTime(event.Hash, event.LastEvalTime)

		if cachedRule.NotifyRepeatStep == 0 {
			// Repeat notification disabled: already notified once, nothing to do.
			logger.Debugf("rule_eval:%s event:%+v nothing to do", arc.Key(), event)
			return
		}

		// Already notified before; only notify again once the repeat window
		// (NotifyRepeatStep minutes) has passed since the last send.
		if event.LastEvalTime > fired.LastSentTime+int64(cachedRule.NotifyRepeatStep)*60 {
			if cachedRule.NotifyMaxNumber == 0 {
				// A max-send count of 0 means unlimited: keep sending.
				event.NotifyCurNumber = fired.NotifyCurNumber + 1
				event.FirstTriggerTime = fired.FirstTriggerTime
				arc.pushEventToQueue(event)
			} else {
				// Bounded sends: check whether the limit was already reached.
				if fired.NotifyCurNumber >= cachedRule.NotifyMaxNumber {
					logger.Debugf("rule_eval:%s event:%+v notify to max number", arc.Key(), event)
					return
				} else {
					event.NotifyCurNumber = fired.NotifyCurNumber + 1
					event.FirstTriggerTime = fired.FirstTriggerTime
					arc.pushEventToQueue(event)
				}
			}
		}
	} else {
		// First time this series fires: send the first notification.
		event.NotifyCurNumber = 1
		event.FirstTriggerTime = event.TriggerTime
		arc.pushEventToQueue(event)
	}
}
|
||||
|
||||
func (arc *AlertRuleContext) pushEventToQueue(event *models.AlertCurEvent) {
|
||||
if !event.IsRecovered {
|
||||
event.LastSentTime = event.LastEvalTime
|
||||
arc.fires.Set(event.Hash, event)
|
||||
}
|
||||
|
||||
promstat.CounterAlertsTotal.WithLabelValues(event.Cluster).Inc()
|
||||
LogEvent(event, "push_queue")
|
||||
if !EventQueue.PushFront(event) {
|
||||
logger.Warningf("event_push_queue: queue is full, event:%+v", event)
|
||||
}
|
||||
}
|
||||
|
||||
// Stop signals the evaluation goroutine started by Start to exit.
func (arc *AlertRuleContext) Stop() {
	logger.Infof("%s stopped", arc.Key())
	close(arc.quit)
}
|
||||
|
||||
func (arc *AlertRuleContext) recoverAlertCurEventFromDb() {
|
||||
arc.pendings = NewAlertCurEventMap(nil)
|
||||
|
||||
curEvents, err := models.AlertCurEventGetByRuleIdAndCluster(arc.rule.Id, arc.cluster)
|
||||
if err != nil {
|
||||
logger.Errorf("recover event from db for rule:%s failed, err:%s", arc.Key(), err)
|
||||
arc.fires = NewAlertCurEventMap(nil)
|
||||
return
|
||||
}
|
||||
|
||||
fireMap := make(map[string]*models.AlertCurEvent)
|
||||
for _, event := range curEvents {
|
||||
event.DB2Mem()
|
||||
fireMap[event.Hash] = event
|
||||
}
|
||||
|
||||
arc.fires = NewAlertCurEventMap(fireMap)
|
||||
}
|
||||
189
src/server/engine/rule_helper.go
Normal file
189
src/server/engine/rule_helper.go
Normal file
@@ -0,0 +1,189 @@
|
||||
package engine
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/toolkits/pkg/str"
|
||||
|
||||
"github.com/didi/nightingale/v5/src/models"
|
||||
"github.com/didi/nightingale/v5/src/server/common/conv"
|
||||
"github.com/didi/nightingale/v5/src/server/memsto"
|
||||
)
|
||||
|
||||
type AlertCurEventMap struct {
|
||||
sync.RWMutex
|
||||
Data map[string]*models.AlertCurEvent
|
||||
}
|
||||
|
||||
func (a *AlertCurEventMap) SetAll(data map[string]*models.AlertCurEvent) {
|
||||
a.Lock()
|
||||
defer a.Unlock()
|
||||
a.Data = data
|
||||
}
|
||||
|
||||
func (a *AlertCurEventMap) Set(key string, value *models.AlertCurEvent) {
|
||||
a.Lock()
|
||||
defer a.Unlock()
|
||||
a.Data[key] = value
|
||||
}
|
||||
|
||||
func (a *AlertCurEventMap) Get(key string) (*models.AlertCurEvent, bool) {
|
||||
a.RLock()
|
||||
defer a.RUnlock()
|
||||
event, exists := a.Data[key]
|
||||
return event, exists
|
||||
}
|
||||
|
||||
func (a *AlertCurEventMap) UpdateLastEvalTime(key string, lastEvalTime int64) {
|
||||
a.Lock()
|
||||
defer a.Unlock()
|
||||
event, exists := a.Data[key]
|
||||
if !exists {
|
||||
return
|
||||
}
|
||||
event.LastEvalTime = lastEvalTime
|
||||
}
|
||||
|
||||
func (a *AlertCurEventMap) Delete(key string) {
|
||||
a.Lock()
|
||||
defer a.Unlock()
|
||||
delete(a.Data, key)
|
||||
}
|
||||
|
||||
func (a *AlertCurEventMap) Keys() []string {
|
||||
a.RLock()
|
||||
defer a.RUnlock()
|
||||
keys := make([]string, 0, len(a.Data))
|
||||
for k := range a.Data {
|
||||
keys = append(keys, k)
|
||||
}
|
||||
return keys
|
||||
}
|
||||
|
||||
func (a *AlertCurEventMap) GetAll() map[string]*models.AlertCurEvent {
|
||||
a.RLock()
|
||||
defer a.RUnlock()
|
||||
return a.Data
|
||||
}
|
||||
|
||||
func NewAlertCurEventMap(data map[string]*models.AlertCurEvent) *AlertCurEventMap {
|
||||
if data == nil {
|
||||
return &AlertCurEventMap{
|
||||
Data: make(map[string]*models.AlertCurEvent),
|
||||
}
|
||||
}
|
||||
return &AlertCurEventMap{
|
||||
Data: data,
|
||||
}
|
||||
}
|
||||
|
||||
// AlertVector 包含一个告警事件的告警上下文
|
||||
type AlertVector struct {
|
||||
Ctx *AlertRuleContext
|
||||
Rule *models.AlertRule
|
||||
Vector conv.Vector
|
||||
From string
|
||||
|
||||
tagsMap map[string]string
|
||||
tagsArr []string
|
||||
target string
|
||||
targetNote string
|
||||
groupName string
|
||||
}
|
||||
|
||||
// NewAlertVector wraps one triggered vector with its rule and worker
// context, then precomputes tags, target info and business group so
// BuildEvent can assemble the event cheaply. A nil rule falls back to
// the worker's own (possibly stale) rule.
func NewAlertVector(ctx *AlertRuleContext, rule *models.AlertRule, vector conv.Vector, from string) *AlertVector {
	if rule == nil {
		rule = ctx.rule
	}
	av := &AlertVector{
		Ctx:    ctx,
		Rule:   rule,
		Vector: vector,
		From:   from,
	}
	av.fillTags()
	av.mayHandleIdent()
	av.mayHandleGroup()
	return av
}
|
||||
|
||||
// Hash identifies one alerting series: rule id + series key + cluster.
// It keys the fires/pendings maps and the event dedup logic.
func (av *AlertVector) Hash() string {
	return str.MD5(fmt.Sprintf("%d_%s_%s", av.Rule.Id, av.Vector.Key, av.Ctx.cluster))
}
|
||||
|
||||
func (av *AlertVector) fillTags() {
|
||||
// handle series tags
|
||||
tagsMap := make(map[string]string)
|
||||
for label, value := range av.Vector.Labels {
|
||||
tagsMap[string(label)] = string(value)
|
||||
}
|
||||
|
||||
// handle rule tags
|
||||
for _, tag := range av.Rule.AppendTagsJSON {
|
||||
arr := strings.SplitN(tag, "=", 2)
|
||||
tagsMap[arr[0]] = arr[1]
|
||||
}
|
||||
|
||||
tagsMap["rulename"] = av.Rule.Name
|
||||
av.tagsMap = tagsMap
|
||||
|
||||
// handle tagsArr
|
||||
av.tagsArr = labelMapToArr(tagsMap)
|
||||
}
|
||||
|
||||
func (av *AlertVector) mayHandleIdent() {
|
||||
// handle ident
|
||||
if ident, has := av.tagsMap["ident"]; has {
|
||||
if target, exists := memsto.TargetCache.Get(ident); exists {
|
||||
av.target = target.Ident
|
||||
av.targetNote = target.Note
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (av *AlertVector) mayHandleGroup() {
|
||||
// handle bg
|
||||
bg := memsto.BusiGroupCache.GetByBusiGroupId(av.Rule.GroupId)
|
||||
if bg != nil {
|
||||
av.groupName = bg.Name
|
||||
}
|
||||
}
|
||||
|
||||
// BuildEvent materializes an AlertCurEvent from the vector and the
// precomputed tag/target/group context. now is used as the eval time
// only for events produced by the inner engine; externally pushed
// events keep their own trigger timestamp.
func (av *AlertVector) BuildEvent(now int64) *models.AlertCurEvent {
	event := av.Rule.GenerateNewEvent()
	event.TriggerTime = av.Vector.Timestamp
	event.TagsMap = av.tagsMap
	event.Cluster = av.Ctx.cluster
	event.Hash = av.Hash()
	event.TargetIdent = av.target
	event.TargetNote = av.targetNote
	event.TriggerValue = av.Vector.ReadableValue()
	event.TagsJSON = av.tagsArr
	event.GroupName = av.groupName
	// ",," is the tag separator convention used across the codebase.
	event.Tags = strings.Join(av.tagsArr, ",,")
	event.IsRecovered = false

	if av.From == "inner" {
		event.LastEvalTime = now
	} else {
		event.LastEvalTime = event.TriggerTime
	}
	return event
}
|
||||
|
||||
// labelMapToArr renders a label map as a deterministic, sorted list
// of "key=value" strings.
func labelMapToArr(m map[string]string) []string {
	pairs := make([]string, 0, len(m))
	for k, v := range m {
		pairs = append(pairs, k+"="+v)
	}
	// Sorting zero or one elements is a no-op, so sort unconditionally.
	sort.Strings(pairs)
	return pairs
}
|
||||
100
src/server/engine/rule_record.go
Normal file
100
src/server/engine/rule_record.go
Normal file
@@ -0,0 +1,100 @@
|
||||
package engine
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/toolkits/pkg/logger"
|
||||
"github.com/toolkits/pkg/str"
|
||||
|
||||
"github.com/didi/nightingale/v5/src/models"
|
||||
"github.com/didi/nightingale/v5/src/server/common/conv"
|
||||
"github.com/didi/nightingale/v5/src/server/config"
|
||||
"github.com/didi/nightingale/v5/src/server/writer"
|
||||
)
|
||||
|
||||
type RecordRuleContext struct {
|
||||
cluster string
|
||||
quit chan struct{}
|
||||
|
||||
rule *models.RecordingRule
|
||||
}
|
||||
|
||||
func NewRecordRuleContext(rule *models.RecordingRule, cluster string) *RecordRuleContext {
|
||||
return &RecordRuleContext{
|
||||
cluster: cluster,
|
||||
quit: make(chan struct{}),
|
||||
rule: rule,
|
||||
}
|
||||
}
|
||||
|
||||
// Key identifies this recording worker by cluster and rule id,
// primarily for logging.
func (rrc *RecordRuleContext) Key() string {
	return fmt.Sprintf("record-%s-%d", rrc.cluster, rrc.rule.Id)
}
|
||||
|
||||
// Hash fingerprints the fields whose change requires a worker restart
// (id, eval interval, promql, cluster); used to diff running workers.
func (rrc *RecordRuleContext) Hash() string {
	return str.MD5(fmt.Sprintf("%d_%d_%s_%s",
		rrc.rule.Id,
		rrc.rule.PromEvalInterval,
		rrc.rule.PromQl,
		rrc.cluster,
	))
}
|
||||
|
||||
// Prepare is a no-op: recording rules keep no state to restore.
func (rrc *RecordRuleContext) Prepare() {}
|
||||
|
||||
// Start launches the evaluation goroutine, which runs Eval every
// PromEvalInterval seconds (defaulting to 10) until Stop closes quit.
func (rrc *RecordRuleContext) Start() {
	logger.Infof("eval:%s started", rrc.Key())
	interval := rrc.rule.PromEvalInterval
	if interval <= 0 {
		interval = 10
	}
	go func() {
		for {
			select {
			case <-rrc.quit:
				return
			default:
				// Eval first, then sleep, so quit is re-checked each cycle.
				rrc.Eval()
				time.Sleep(time.Duration(interval) * time.Second)
			}
		}
	}()
}
|
||||
|
||||
func (rrc *RecordRuleContext) Eval() {
|
||||
promql := strings.TrimSpace(rrc.rule.PromQl)
|
||||
if promql == "" {
|
||||
logger.Errorf("eval:%s promql is blank", rrc.Key())
|
||||
return
|
||||
}
|
||||
|
||||
if config.ReaderClients.IsNil(rrc.cluster) {
|
||||
logger.Errorf("eval:%s reader client is nil", rrc.Key())
|
||||
return
|
||||
}
|
||||
|
||||
value, warnings, err := config.ReaderClients.GetCli(rrc.cluster).Query(context.Background(), promql, time.Now())
|
||||
if err != nil {
|
||||
logger.Errorf("eval:%d promql:%s, error:%v", rrc.Key(), promql, err)
|
||||
return
|
||||
}
|
||||
|
||||
if len(warnings) > 0 {
|
||||
logger.Errorf("eval:%d promql:%s, warnings:%v", rrc.Key(), promql, warnings)
|
||||
return
|
||||
}
|
||||
ts := conv.ConvertToTimeSeries(value, rrc.rule)
|
||||
if len(ts) != 0 {
|
||||
for _, v := range ts {
|
||||
writer.Writers.PushSample(rrc.rule.Name, v, rrc.cluster)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (rrc *RecordRuleContext) Stop() {
|
||||
logger.Infof("%s stopped", rrc.Key())
|
||||
close(rrc.quit)
|
||||
}
|
||||
@@ -1,830 +0,0 @@
|
||||
package engine
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/didi/nightingale/v5/src/server/writer"
|
||||
"github.com/prometheus/common/model"
|
||||
"github.com/toolkits/pkg/logger"
|
||||
"github.com/toolkits/pkg/str"
|
||||
|
||||
"github.com/didi/nightingale/v5/src/models"
|
||||
"github.com/didi/nightingale/v5/src/pkg/prom"
|
||||
"github.com/didi/nightingale/v5/src/server/common/conv"
|
||||
"github.com/didi/nightingale/v5/src/server/config"
|
||||
"github.com/didi/nightingale/v5/src/server/memsto"
|
||||
"github.com/didi/nightingale/v5/src/server/naming"
|
||||
promstat "github.com/didi/nightingale/v5/src/server/stat"
|
||||
)
|
||||
|
||||
func loopFilterRules(ctx context.Context) {
|
||||
// wait for samples
|
||||
time.Sleep(time.Duration(config.C.EngineDelay) * time.Second)
|
||||
|
||||
duration := time.Duration(9000) * time.Millisecond
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-time.After(duration):
|
||||
filterRules()
|
||||
filterRecordingRules()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 一个规则可能会在多个集群中生效,所以这里要把规则拆分成多个,此结构记录 id 和 cluster 的对应关系
|
||||
type RuleSimpleInfo struct {
|
||||
Id int64
|
||||
Cluster string
|
||||
}
|
||||
|
||||
func filterRules() {
|
||||
ids := memsto.AlertRuleCache.GetRuleIds()
|
||||
logger.Debugf("AlertRuleCache.GetRuleIds success, ids.len: %d", len(ids))
|
||||
|
||||
count := len(ids)
|
||||
mines := make([]*RuleSimpleInfo, 0, count)
|
||||
|
||||
for i := 0; i < count; i++ {
|
||||
rule := memsto.AlertRuleCache.Get(ids[i])
|
||||
if rule == nil {
|
||||
logger.Debugf("AlertRuleCache.Get(%d) failed", ids[i])
|
||||
continue
|
||||
}
|
||||
|
||||
var clusters []string
|
||||
if rule.Cluster == models.ClusterAll {
|
||||
clusters = config.ReaderClients.GetClusterNames()
|
||||
} else {
|
||||
clusters = strings.Fields(rule.Cluster)
|
||||
}
|
||||
|
||||
for _, cluster := range clusters {
|
||||
if config.ReaderClients.IsNil(cluster) {
|
||||
// 没有这个集群的配置,跳过
|
||||
continue
|
||||
}
|
||||
|
||||
node, err := naming.ClusterHashRing.GetNode(cluster, fmt.Sprint(ids[i]))
|
||||
if err != nil {
|
||||
logger.Warningf("rid:%d cluster:%s failed to get node from hashring:%v", ids[i], cluster, err)
|
||||
continue
|
||||
}
|
||||
|
||||
if node == config.C.Heartbeat.Endpoint {
|
||||
mines = append(mines, &RuleSimpleInfo{Id: ids[i], Cluster: cluster})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Workers.Build(mines)
|
||||
RuleEvalForExternal.Build()
|
||||
}
|
||||
|
||||
type RuleEval struct {
|
||||
cluster string
|
||||
rule *models.AlertRule
|
||||
fires *AlertCurEventMap
|
||||
pendings *AlertCurEventMap
|
||||
quit chan struct{}
|
||||
}
|
||||
|
||||
type AlertCurEventMap struct {
|
||||
sync.RWMutex
|
||||
Data map[string]*models.AlertCurEvent
|
||||
}
|
||||
|
||||
func (a *AlertCurEventMap) SetAll(data map[string]*models.AlertCurEvent) {
|
||||
a.Lock()
|
||||
defer a.Unlock()
|
||||
a.Data = data
|
||||
}
|
||||
|
||||
func (a *AlertCurEventMap) Set(key string, value *models.AlertCurEvent) {
|
||||
a.Lock()
|
||||
defer a.Unlock()
|
||||
a.Data[key] = value
|
||||
}
|
||||
|
||||
func (a *AlertCurEventMap) Get(key string) (*models.AlertCurEvent, bool) {
|
||||
a.RLock()
|
||||
defer a.RUnlock()
|
||||
event, exists := a.Data[key]
|
||||
return event, exists
|
||||
}
|
||||
|
||||
func (a *AlertCurEventMap) UpdateLastEvalTime(key string, lastEvalTime int64) {
|
||||
a.Lock()
|
||||
defer a.Unlock()
|
||||
event, exists := a.Data[key]
|
||||
if !exists {
|
||||
return
|
||||
}
|
||||
event.LastEvalTime = lastEvalTime
|
||||
}
|
||||
|
||||
func (a *AlertCurEventMap) Delete(key string) {
|
||||
a.Lock()
|
||||
defer a.Unlock()
|
||||
delete(a.Data, key)
|
||||
}
|
||||
|
||||
func (a *AlertCurEventMap) Keys() []string {
|
||||
a.RLock()
|
||||
defer a.RUnlock()
|
||||
keys := make([]string, 0, len(a.Data))
|
||||
for k := range a.Data {
|
||||
keys = append(keys, k)
|
||||
}
|
||||
return keys
|
||||
}
|
||||
|
||||
func (a *AlertCurEventMap) GetAll() map[string]*models.AlertCurEvent {
|
||||
a.RLock()
|
||||
defer a.RUnlock()
|
||||
return a.Data
|
||||
}
|
||||
|
||||
func NewAlertCurEventMap() *AlertCurEventMap {
|
||||
return &AlertCurEventMap{
|
||||
Data: make(map[string]*models.AlertCurEvent),
|
||||
}
|
||||
}
|
||||
|
||||
func (r *RuleEval) Stop() {
|
||||
logger.Infof("rule_eval:%d stopping", r.RuleID())
|
||||
close(r.quit)
|
||||
}
|
||||
|
||||
func (r *RuleEval) RuleID() int64 {
|
||||
return r.rule.Id
|
||||
}
|
||||
|
||||
func (r *RuleEval) Start() {
|
||||
logger.Infof("rule_eval:%d started", r.RuleID())
|
||||
for {
|
||||
select {
|
||||
case <-r.quit:
|
||||
// logger.Infof("rule_eval:%d stopped", r.RuleID())
|
||||
return
|
||||
default:
|
||||
r.Work()
|
||||
logger.Debugf("rule executed, rule_eval:%d", r.RuleID())
|
||||
interval := r.rule.PromEvalInterval
|
||||
if interval <= 0 {
|
||||
interval = 10
|
||||
}
|
||||
time.Sleep(time.Duration(interval) * time.Second)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *RuleEval) Work() {
|
||||
promql := strings.TrimSpace(r.rule.PromQl)
|
||||
if promql == "" {
|
||||
logger.Errorf("rule_eval:%d promql is blank", r.RuleID())
|
||||
return
|
||||
}
|
||||
|
||||
if config.ReaderClients.IsNil(r.cluster) {
|
||||
logger.Error("reader client is nil")
|
||||
return
|
||||
}
|
||||
|
||||
readerClient := config.ReaderClients.GetCli(r.cluster)
|
||||
|
||||
var value model.Value
|
||||
var err error
|
||||
if r.rule.Algorithm == "" && (r.rule.Cate == "" || strings.ToLower(r.rule.Cate) == "prometheus") {
|
||||
var warnings prom.Warnings
|
||||
value, warnings, err = readerClient.Query(context.Background(), promql, time.Now())
|
||||
if err != nil {
|
||||
logger.Errorf("rule_eval:%d promql:%s, error:%v", r.RuleID(), promql, err)
|
||||
//notifyToMaintainer(err, "failed to query prometheus")
|
||||
Report(QueryPrometheusError)
|
||||
return
|
||||
}
|
||||
|
||||
if len(warnings) > 0 {
|
||||
logger.Errorf("rule_eval:%d promql:%s, warnings:%v", r.RuleID(), promql, warnings)
|
||||
return
|
||||
}
|
||||
logger.Debugf("rule_eval:%d promql:%s, value:%v", r.RuleID(), promql, value)
|
||||
}
|
||||
|
||||
r.Judge(r.cluster, conv.ConvertVectors(value))
|
||||
}
|
||||
|
||||
type WorkersType struct {
|
||||
rules map[string]*RuleEval
|
||||
recordRules map[string]RecordingRuleEval
|
||||
}
|
||||
|
||||
var Workers = &WorkersType{rules: make(map[string]*RuleEval), recordRules: make(map[string]RecordingRuleEval)}
|
||||
|
||||
func (ws *WorkersType) Build(ris []*RuleSimpleInfo) {
|
||||
rules := make(map[string]*RuleSimpleInfo)
|
||||
|
||||
for i := 0; i < len(ris); i++ {
|
||||
rule := memsto.AlertRuleCache.Get(ris[i].Id)
|
||||
if rule == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
hash := str.MD5(fmt.Sprintf("%d_%d_%s_%s",
|
||||
rule.Id,
|
||||
rule.PromEvalInterval,
|
||||
rule.PromQl,
|
||||
ris[i].Cluster,
|
||||
))
|
||||
|
||||
rules[hash] = ris[i]
|
||||
}
|
||||
|
||||
// stop old
|
||||
for hash := range Workers.rules {
|
||||
if _, has := rules[hash]; !has {
|
||||
Workers.rules[hash].Stop()
|
||||
delete(Workers.rules, hash)
|
||||
}
|
||||
}
|
||||
|
||||
// start new
|
||||
for hash := range rules {
|
||||
if _, has := Workers.rules[hash]; has {
|
||||
// already exists
|
||||
continue
|
||||
}
|
||||
|
||||
elst, err := models.AlertCurEventGetByRuleIdAndCluster(rules[hash].Id, rules[hash].Cluster)
|
||||
if err != nil {
|
||||
logger.Errorf("worker_build: AlertCurEventGetByRule failed: %v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
firemap := make(map[string]*models.AlertCurEvent)
|
||||
for i := 0; i < len(elst); i++ {
|
||||
elst[i].DB2Mem()
|
||||
firemap[elst[i].Hash] = elst[i]
|
||||
}
|
||||
fires := NewAlertCurEventMap()
|
||||
fires.SetAll(firemap)
|
||||
re := &RuleEval{
|
||||
rule: memsto.AlertRuleCache.Get(rules[hash].Id),
|
||||
quit: make(chan struct{}),
|
||||
fires: fires,
|
||||
pendings: NewAlertCurEventMap(),
|
||||
cluster: rules[hash].Cluster,
|
||||
}
|
||||
|
||||
go re.Start()
|
||||
Workers.rules[hash] = re
|
||||
}
|
||||
}
|
||||
|
||||
func (ws *WorkersType) BuildRe(ris []*RuleSimpleInfo) {
|
||||
rules := make(map[string]*RuleSimpleInfo)
|
||||
|
||||
for i := 0; i < len(ris); i++ {
|
||||
rule := memsto.RecordingRuleCache.Get(ris[i].Id)
|
||||
if rule == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
hash := str.MD5(fmt.Sprintf("%d_%d_%s_%s",
|
||||
rule.Id,
|
||||
rule.PromEvalInterval,
|
||||
rule.PromQl,
|
||||
ris[i].Cluster,
|
||||
))
|
||||
|
||||
rules[hash] = ris[i]
|
||||
}
|
||||
|
||||
// stop old
|
||||
for hash := range Workers.recordRules {
|
||||
if _, has := rules[hash]; !has {
|
||||
Workers.recordRules[hash].Stop()
|
||||
delete(Workers.recordRules, hash)
|
||||
}
|
||||
}
|
||||
|
||||
// start new
|
||||
for hash := range rules {
|
||||
if _, has := Workers.recordRules[hash]; has {
|
||||
// already exists
|
||||
continue
|
||||
}
|
||||
re := RecordingRuleEval{
|
||||
rule: memsto.RecordingRuleCache.Get(rules[hash].Id),
|
||||
quit: make(chan struct{}),
|
||||
cluster: rules[hash].Cluster,
|
||||
}
|
||||
|
||||
go re.Start()
|
||||
Workers.recordRules[hash] = re
|
||||
}
|
||||
}
|
||||
|
||||
func (r *RuleEval) Judge(clusterName string, vectors []conv.Vector) {
|
||||
now := time.Now().Unix()
|
||||
|
||||
alertingKeys, ruleExists := r.MakeNewEvent("inner", now, clusterName, vectors)
|
||||
if !ruleExists {
|
||||
return
|
||||
}
|
||||
|
||||
// handle recovered events
|
||||
r.recoverRule(alertingKeys, now)
|
||||
}
|
||||
|
||||
func (r *RuleEval) MakeNewEvent(from string, now int64, clusterName string, vectors []conv.Vector) (map[string]struct{}, bool) {
|
||||
// 有可能rule的一些配置已经发生变化,比如告警接收人、callbacks等
|
||||
// 这些信息的修改是不会引起worker restart的,但是确实会影响告警处理逻辑
|
||||
// 所以,这里直接从memsto.AlertRuleCache中获取并覆盖
|
||||
curRule := memsto.AlertRuleCache.Get(r.rule.Id)
|
||||
if curRule == nil {
|
||||
return map[string]struct{}{}, false
|
||||
}
|
||||
|
||||
r.rule = curRule
|
||||
|
||||
count := len(vectors)
|
||||
alertingKeys := make(map[string]struct{})
|
||||
for i := 0; i < count; i++ {
|
||||
// compute hash
|
||||
hash := str.MD5(fmt.Sprintf("%d_%s_%s", r.rule.Id, vectors[i].Key, r.cluster))
|
||||
alertingKeys[hash] = struct{}{}
|
||||
|
||||
// rule disabled in this time span?
|
||||
if isNoneffective(vectors[i].Timestamp, r.rule) {
|
||||
logger.Debugf("event_disabled: rule_eval:%d rule:%v timestamp:%d", r.rule.Id, r.rule, vectors[i].Timestamp)
|
||||
continue
|
||||
}
|
||||
|
||||
// handle series tags
|
||||
tagsMap := make(map[string]string)
|
||||
for label, value := range vectors[i].Labels {
|
||||
tagsMap[string(label)] = string(value)
|
||||
}
|
||||
|
||||
// handle rule tags
|
||||
for _, tag := range r.rule.AppendTagsJSON {
|
||||
arr := strings.SplitN(tag, "=", 2)
|
||||
tagsMap[arr[0]] = arr[1]
|
||||
}
|
||||
|
||||
tagsMap["rulename"] = r.rule.Name
|
||||
|
||||
// handle target note
|
||||
targetIdent, has := tagsMap["ident"]
|
||||
targetNote := ""
|
||||
if has {
|
||||
target, exists := memsto.TargetCache.Get(string(targetIdent))
|
||||
if exists {
|
||||
targetNote = target.Note
|
||||
|
||||
// 对于包含ident的告警事件,check一下ident所属bg和rule所属bg是否相同
|
||||
// 如果告警规则选择了只在本BG生效,那其他BG的机器就不能因此规则产生告警
|
||||
if r.rule.EnableInBG == 1 && target.GroupId != r.rule.GroupId {
|
||||
logger.Debugf("event_enable_in_bg: rule_eval:%d", r.rule.Id)
|
||||
continue
|
||||
}
|
||||
} else if strings.Contains(r.rule.PromQl, "target_up") {
|
||||
// target 已经不存在了,可能是被删除了
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
event := &models.AlertCurEvent{
|
||||
TriggerTime: vectors[i].Timestamp,
|
||||
TagsMap: tagsMap,
|
||||
GroupId: r.rule.GroupId,
|
||||
RuleName: r.rule.Name,
|
||||
Cluster: clusterName,
|
||||
}
|
||||
|
||||
bg := memsto.BusiGroupCache.GetByBusiGroupId(r.rule.GroupId)
|
||||
if bg != nil {
|
||||
event.GroupName = bg.Name
|
||||
}
|
||||
|
||||
// isMuted need TriggerTime RuleName TagsMap and clusterName
|
||||
if IsMuted(event) {
|
||||
logger.Infof("event_muted: rule_id=%d %s", r.rule.Id, vectors[i].Key)
|
||||
continue
|
||||
}
|
||||
|
||||
tagsArr := labelMapToArr(tagsMap)
|
||||
sort.Strings(tagsArr)
|
||||
|
||||
event.Cate = r.rule.Cate
|
||||
event.Hash = hash
|
||||
event.RuleId = r.rule.Id
|
||||
event.RuleName = r.rule.Name
|
||||
event.RuleNote = r.rule.Note
|
||||
event.RuleProd = r.rule.Prod
|
||||
event.RuleAlgo = r.rule.Algorithm
|
||||
event.Severity = r.rule.Severity
|
||||
event.PromForDuration = r.rule.PromForDuration
|
||||
event.PromQl = r.rule.PromQl
|
||||
event.PromEvalInterval = r.rule.PromEvalInterval
|
||||
event.Callbacks = r.rule.Callbacks
|
||||
event.CallbacksJSON = r.rule.CallbacksJSON
|
||||
event.RunbookUrl = r.rule.RunbookUrl
|
||||
event.NotifyRecovered = r.rule.NotifyRecovered
|
||||
event.NotifyChannels = r.rule.NotifyChannels
|
||||
event.NotifyChannelsJSON = r.rule.NotifyChannelsJSON
|
||||
event.NotifyGroups = r.rule.NotifyGroups
|
||||
event.NotifyGroupsJSON = r.rule.NotifyGroupsJSON
|
||||
event.TargetIdent = string(targetIdent)
|
||||
event.TargetNote = targetNote
|
||||
event.TriggerValue = readableValue(vectors[i].Value)
|
||||
event.TagsJSON = tagsArr
|
||||
event.Tags = strings.Join(tagsArr, ",,")
|
||||
event.IsRecovered = false
|
||||
event.LastEvalTime = now
|
||||
if from != "inner" {
|
||||
event.LastEvalTime = event.TriggerTime
|
||||
}
|
||||
|
||||
r.handleNewEvent(event)
|
||||
|
||||
}
|
||||
|
||||
return alertingKeys, true
|
||||
}
|
||||
|
||||
func readableValue(value float64) string {
|
||||
ret := fmt.Sprintf("%.5f", value)
|
||||
ret = strings.TrimRight(ret, "0")
|
||||
return strings.TrimRight(ret, ".")
|
||||
}
|
||||
|
||||
func labelMapToArr(m map[string]string) []string {
|
||||
numLabels := len(m)
|
||||
|
||||
labelStrings := make([]string, 0, numLabels)
|
||||
for label, value := range m {
|
||||
labelStrings = append(labelStrings, fmt.Sprintf("%s=%s", label, value))
|
||||
}
|
||||
|
||||
if numLabels > 1 {
|
||||
sort.Strings(labelStrings)
|
||||
}
|
||||
|
||||
return labelStrings
|
||||
}
|
||||
|
||||
func (r *RuleEval) handleNewEvent(event *models.AlertCurEvent) {
|
||||
if event.PromForDuration == 0 {
|
||||
r.fireEvent(event)
|
||||
return
|
||||
}
|
||||
|
||||
var preTriggerTime int64
|
||||
preEvent, has := r.pendings.Get(event.Hash)
|
||||
if has {
|
||||
r.pendings.UpdateLastEvalTime(event.Hash, event.LastEvalTime)
|
||||
preTriggerTime = preEvent.TriggerTime
|
||||
} else {
|
||||
r.pendings.Set(event.Hash, event)
|
||||
preTriggerTime = event.TriggerTime
|
||||
}
|
||||
|
||||
if event.LastEvalTime-preTriggerTime+int64(event.PromEvalInterval) >= int64(event.PromForDuration) {
|
||||
r.fireEvent(event)
|
||||
}
|
||||
}
|
||||
|
||||
func (r *RuleEval) fireEvent(event *models.AlertCurEvent) {
|
||||
if fired, has := r.fires.Get(event.Hash); has {
|
||||
r.fires.UpdateLastEvalTime(event.Hash, event.LastEvalTime)
|
||||
|
||||
if r.rule.NotifyRepeatStep == 0 {
|
||||
// 说明不想重复通知,那就直接返回了,nothing to do
|
||||
return
|
||||
}
|
||||
|
||||
// 之前发送过告警了,这次是否要继续发送,要看是否过了通道静默时间
|
||||
if event.LastEvalTime > fired.LastSentTime+int64(r.rule.NotifyRepeatStep)*60 {
|
||||
if r.rule.NotifyMaxNumber == 0 {
|
||||
// 最大可以发送次数如果是0,表示不想限制最大发送次数,一直发即可
|
||||
event.NotifyCurNumber = fired.NotifyCurNumber + 1
|
||||
event.FirstTriggerTime = fired.FirstTriggerTime
|
||||
r.pushEventToQueue(event)
|
||||
} else {
|
||||
// 有最大发送次数的限制,就要看已经发了几次了,是否达到了最大发送次数
|
||||
if fired.NotifyCurNumber >= r.rule.NotifyMaxNumber {
|
||||
return
|
||||
} else {
|
||||
event.NotifyCurNumber = fired.NotifyCurNumber + 1
|
||||
event.FirstTriggerTime = fired.FirstTriggerTime
|
||||
r.pushEventToQueue(event)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
} else {
|
||||
event.NotifyCurNumber = 1
|
||||
event.FirstTriggerTime = event.TriggerTime
|
||||
r.pushEventToQueue(event)
|
||||
}
|
||||
}
|
||||
|
||||
func (r *RuleEval) recoverRule(alertingKeys map[string]struct{}, now int64) {
|
||||
for _, hash := range r.pendings.Keys() {
|
||||
if _, has := alertingKeys[hash]; has {
|
||||
continue
|
||||
}
|
||||
r.pendings.Delete(hash)
|
||||
}
|
||||
|
||||
for hash, event := range r.fires.GetAll() {
|
||||
if _, has := alertingKeys[hash]; has {
|
||||
continue
|
||||
}
|
||||
|
||||
r.recoverEvent(hash, event, now)
|
||||
}
|
||||
}
|
||||
|
||||
func (r *RuleEval) RecoverEvent(hash string, now int64, value float64) {
|
||||
curRule := memsto.AlertRuleCache.Get(r.rule.Id)
|
||||
if curRule == nil {
|
||||
return
|
||||
}
|
||||
r.rule = curRule
|
||||
|
||||
r.pendings.Delete(hash)
|
||||
event, has := r.fires.Get(hash)
|
||||
if !has {
|
||||
return
|
||||
}
|
||||
|
||||
event.TriggerValue = fmt.Sprintf("%.5f", value)
|
||||
r.recoverEvent(hash, event, now)
|
||||
}
|
||||
|
||||
func (r *RuleEval) recoverEvent(hash string, event *models.AlertCurEvent, now int64) {
|
||||
// 如果配置了留观时长,就不能立马恢复了
|
||||
if r.rule.RecoverDuration > 0 && now-event.LastEvalTime < r.rule.RecoverDuration {
|
||||
return
|
||||
}
|
||||
|
||||
// 没查到触发阈值的vector,姑且就认为这个vector的值恢复了
|
||||
// 我确实无法分辨,是prom中有值但是未满足阈值所以没返回,还是prom中确实丢了一些点导致没有数据可以返回,尴尬
|
||||
r.fires.Delete(hash)
|
||||
r.pendings.Delete(hash)
|
||||
|
||||
event.IsRecovered = true
|
||||
event.LastEvalTime = now
|
||||
// 可能是因为调整了promql才恢复的,所以事件里边要体现最新的promql,否则用户会比较困惑
|
||||
// 当然,其实rule的各个字段都可能发生变化了,都更新一下吧
|
||||
event.RuleName = r.rule.Name
|
||||
event.RuleNote = r.rule.Note
|
||||
event.RuleProd = r.rule.Prod
|
||||
event.RuleAlgo = r.rule.Algorithm
|
||||
event.Severity = r.rule.Severity
|
||||
event.PromForDuration = r.rule.PromForDuration
|
||||
event.PromQl = r.rule.PromQl
|
||||
event.PromEvalInterval = r.rule.PromEvalInterval
|
||||
event.Callbacks = r.rule.Callbacks
|
||||
event.CallbacksJSON = r.rule.CallbacksJSON
|
||||
event.RunbookUrl = r.rule.RunbookUrl
|
||||
event.NotifyRecovered = r.rule.NotifyRecovered
|
||||
event.NotifyChannels = r.rule.NotifyChannels
|
||||
event.NotifyChannelsJSON = r.rule.NotifyChannelsJSON
|
||||
event.NotifyGroups = r.rule.NotifyGroups
|
||||
event.NotifyGroupsJSON = r.rule.NotifyGroupsJSON
|
||||
r.pushEventToQueue(event)
|
||||
}
|
||||
|
||||
func (r *RuleEval) pushEventToQueue(event *models.AlertCurEvent) {
|
||||
if !event.IsRecovered {
|
||||
event.LastSentTime = event.LastEvalTime
|
||||
r.fires.Set(event.Hash, event)
|
||||
}
|
||||
|
||||
promstat.CounterAlertsTotal.WithLabelValues(event.Cluster).Inc()
|
||||
LogEvent(event, "push_queue")
|
||||
if !EventQueue.PushFront(event) {
|
||||
logger.Warningf("event_push_queue: queue is full")
|
||||
}
|
||||
}
|
||||
|
||||
func filterRecordingRules() {
|
||||
ids := memsto.RecordingRuleCache.GetRuleIds()
|
||||
|
||||
count := len(ids)
|
||||
mines := make([]*RuleSimpleInfo, 0, count)
|
||||
|
||||
for i := 0; i < count; i++ {
|
||||
rule := memsto.RecordingRuleCache.Get(ids[i])
|
||||
if rule == nil {
|
||||
logger.Debugf("rule %d not found", ids[i])
|
||||
continue
|
||||
}
|
||||
|
||||
var clusters []string
|
||||
if rule.Cluster == models.ClusterAll {
|
||||
clusters = config.ReaderClients.GetClusterNames()
|
||||
} else {
|
||||
clusters = strings.Fields(rule.Cluster)
|
||||
}
|
||||
|
||||
for _, cluster := range clusters {
|
||||
if config.ReaderClients.IsNil(cluster) {
|
||||
// 没有这个集群的配置,跳过
|
||||
continue
|
||||
}
|
||||
|
||||
node, err := naming.ClusterHashRing.GetNode(cluster, fmt.Sprint(ids[i]))
|
||||
if err != nil {
|
||||
logger.Warning("failed to get node from hashring:", err)
|
||||
continue
|
||||
}
|
||||
|
||||
if node == config.C.Heartbeat.Endpoint {
|
||||
mines = append(mines, &RuleSimpleInfo{Id: ids[i], Cluster: cluster})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Workers.BuildRe(mines)
|
||||
}
|
||||
|
||||
type RecordingRuleEval struct {
|
||||
cluster string
|
||||
rule *models.RecordingRule
|
||||
quit chan struct{}
|
||||
}
|
||||
|
||||
func (r RecordingRuleEval) Stop() {
|
||||
logger.Infof("recording_rule_eval:%d stopping", r.RuleID())
|
||||
close(r.quit)
|
||||
}
|
||||
|
||||
func (r RecordingRuleEval) RuleID() int64 {
|
||||
return r.rule.Id
|
||||
}
|
||||
|
||||
func (r RecordingRuleEval) Start() {
|
||||
logger.Infof("recording_rule_eval:%d started", r.RuleID())
|
||||
for {
|
||||
select {
|
||||
case <-r.quit:
|
||||
// logger.Infof("rule_eval:%d stopped", r.RuleID())
|
||||
return
|
||||
default:
|
||||
r.Work()
|
||||
interval := r.rule.PromEvalInterval
|
||||
if interval <= 0 {
|
||||
interval = 10
|
||||
}
|
||||
time.Sleep(time.Duration(interval) * time.Second)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r RecordingRuleEval) Work() {
|
||||
promql := strings.TrimSpace(r.rule.PromQl)
|
||||
if promql == "" {
|
||||
logger.Errorf("recording_rule_eval:%d promql is blank", r.RuleID())
|
||||
return
|
||||
}
|
||||
|
||||
if config.ReaderClients.IsNil(r.cluster) {
|
||||
log.Println("reader client is nil")
|
||||
return
|
||||
}
|
||||
|
||||
value, warnings, err := config.ReaderClients.GetCli(r.cluster).Query(context.Background(), promql, time.Now())
|
||||
if err != nil {
|
||||
logger.Errorf("recording_rule_eval:%d promql:%s, error:%v", r.RuleID(), promql, err)
|
||||
return
|
||||
}
|
||||
|
||||
if len(warnings) > 0 {
|
||||
logger.Errorf("recording_rule_eval:%d promql:%s, warnings:%v", r.RuleID(), promql, warnings)
|
||||
return
|
||||
}
|
||||
ts := conv.ConvertToTimeSeries(value, r.rule)
|
||||
if len(ts) != 0 {
|
||||
for _, v := range ts {
|
||||
writer.Writers.PushSample(r.rule.Name, v, r.cluster)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type RuleEvalForExternalType struct {
|
||||
sync.RWMutex
|
||||
rules map[string]RuleEval // key: hash of ruleid_promevalinterval_promql_cluster
|
||||
}
|
||||
|
||||
var RuleEvalForExternal = RuleEvalForExternalType{rules: make(map[string]RuleEval)}
|
||||
|
||||
func (re *RuleEvalForExternalType) Build() {
|
||||
rids := memsto.AlertRuleCache.GetRuleIds()
|
||||
rules := make(map[string]*RuleSimpleInfo)
|
||||
|
||||
for i := 0; i < len(rids); i++ {
|
||||
rule := memsto.AlertRuleCache.Get(rids[i])
|
||||
if rule == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
var clusters []string
|
||||
if rule.Cluster == models.ClusterAll {
|
||||
clusters = config.ReaderClients.GetClusterNames()
|
||||
} else {
|
||||
clusters = strings.Fields(rule.Cluster)
|
||||
}
|
||||
|
||||
for _, cluster := range clusters {
|
||||
hash := str.MD5(fmt.Sprintf("%d_%d_%s_%s",
|
||||
rule.Id,
|
||||
rule.PromEvalInterval,
|
||||
rule.PromQl,
|
||||
cluster,
|
||||
))
|
||||
re.Lock()
|
||||
rules[hash] = &RuleSimpleInfo{
|
||||
Id: rule.Id,
|
||||
Cluster: cluster,
|
||||
}
|
||||
re.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
// stop old
|
||||
for oldHash := range re.rules {
|
||||
if _, has := rules[oldHash]; !has {
|
||||
re.Lock()
|
||||
delete(re.rules, oldHash)
|
||||
re.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
// start new
|
||||
re.Lock()
|
||||
defer re.Unlock()
|
||||
for hash, ruleSimple := range rules {
|
||||
if _, has := re.rules[hash]; has {
|
||||
// already exists
|
||||
continue
|
||||
}
|
||||
|
||||
elst, err := models.AlertCurEventGetByRuleIdAndCluster(ruleSimple.Id, ruleSimple.Cluster)
|
||||
if err != nil {
|
||||
logger.Errorf("worker_build: AlertCurEventGetByRule failed: %v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
firemap := make(map[string]*models.AlertCurEvent)
|
||||
for i := 0; i < len(elst); i++ {
|
||||
elst[i].DB2Mem()
|
||||
firemap[elst[i].Hash] = elst[i]
|
||||
}
|
||||
fires := NewAlertCurEventMap()
|
||||
fires.SetAll(firemap)
|
||||
newRe := RuleEval{
|
||||
rule: memsto.AlertRuleCache.Get(ruleSimple.Id),
|
||||
quit: make(chan struct{}),
|
||||
fires: fires,
|
||||
pendings: NewAlertCurEventMap(),
|
||||
cluster: ruleSimple.Cluster,
|
||||
}
|
||||
|
||||
re.rules[hash] = newRe
|
||||
}
|
||||
}
|
||||
|
||||
func (re *RuleEvalForExternalType) Get(rid int64, cluster string) (RuleEval, bool) {
|
||||
re.RLock()
|
||||
defer re.RUnlock()
|
||||
rule := memsto.AlertRuleCache.Get(rid)
|
||||
if rule == nil {
|
||||
return RuleEval{}, false
|
||||
}
|
||||
|
||||
hash := str.MD5(fmt.Sprintf("%d_%d_%s_%s",
|
||||
rule.Id,
|
||||
rule.PromEvalInterval,
|
||||
rule.PromQl,
|
||||
cluster,
|
||||
))
|
||||
|
||||
if ret, has := re.rules[hash]; has {
|
||||
return ret, has
|
||||
}
|
||||
|
||||
return RuleEval{}, false
|
||||
}
|
||||
@@ -48,6 +48,15 @@ func (chr *ClusterHashRingType) GetNode(cluster, pk string) (string, error) {
|
||||
return chr.Rings[cluster].Get(pk)
|
||||
}
|
||||
|
||||
func (chr *ClusterHashRingType) IsHit(cluster string, pk string, currentNode string) bool {
|
||||
node, err := chr.GetNode(cluster, pk)
|
||||
if err != nil {
|
||||
logger.Debugf("cluster:%s pk:%s failed to get node from hashring:%v", cluster, pk, err)
|
||||
return false
|
||||
}
|
||||
return node == currentNode
|
||||
}
|
||||
|
||||
func (chr *ClusterHashRingType) Set(cluster string, r *consistent.Consistent) {
|
||||
chr.RLock()
|
||||
defer chr.RUnlock()
|
||||
|
||||
@@ -14,7 +14,6 @@ import (
|
||||
"github.com/didi/nightingale/v5/src/pkg/aop"
|
||||
"github.com/didi/nightingale/v5/src/server/config"
|
||||
"github.com/didi/nightingale/v5/src/server/naming"
|
||||
|
||||
promstat "github.com/didi/nightingale/v5/src/server/stat"
|
||||
)
|
||||
|
||||
|
||||
@@ -5,15 +5,17 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/didi/nightingale/v5/src/models"
|
||||
"github.com/didi/nightingale/v5/src/server/common/conv"
|
||||
"github.com/didi/nightingale/v5/src/server/engine"
|
||||
promstat "github.com/didi/nightingale/v5/src/server/stat"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/toolkits/pkg/ginx"
|
||||
"github.com/toolkits/pkg/logger"
|
||||
"github.com/toolkits/pkg/str"
|
||||
|
||||
"github.com/didi/nightingale/v5/src/models"
|
||||
"github.com/didi/nightingale/v5/src/pkg/poster"
|
||||
"github.com/didi/nightingale/v5/src/server/common/conv"
|
||||
"github.com/didi/nightingale/v5/src/server/config"
|
||||
"github.com/didi/nightingale/v5/src/server/engine"
|
||||
"github.com/didi/nightingale/v5/src/server/naming"
|
||||
promstat "github.com/didi/nightingale/v5/src/server/stat"
|
||||
)
|
||||
|
||||
func pushEventToQueue(c *gin.Context) {
|
||||
@@ -38,8 +40,7 @@ func pushEventToQueue(c *gin.Context) {
|
||||
event.TagsMap[arr[0]] = arr[1]
|
||||
}
|
||||
|
||||
// isMuted only need TriggerTime RuleName and TagsMap
|
||||
if engine.IsMuted(event) {
|
||||
if engine.EventMuteStra.IsMuted(nil, event) {
|
||||
logger.Infof("event_muted: rule_id=%d %s", event.RuleId, event.Hash)
|
||||
ginx.NewRender(c).Message(nil)
|
||||
return
|
||||
@@ -87,34 +88,60 @@ type eventForm struct {
|
||||
func judgeEvent(c *gin.Context) {
|
||||
var form eventForm
|
||||
ginx.BindJSON(c, &form)
|
||||
re, exists := engine.RuleEvalForExternal.Get(form.RuleId, form.Cluster)
|
||||
ruleContext, exists := engine.GetExternalAlertRule(form.Cluster, form.RuleId)
|
||||
if !exists {
|
||||
ginx.Bomb(200, "rule not exists")
|
||||
}
|
||||
re.Judge(form.Cluster, form.Vectors)
|
||||
ruleContext.HandleVectors(form.Vectors, "http")
|
||||
ginx.NewRender(c).Message(nil)
|
||||
}
|
||||
|
||||
func makeEvent(c *gin.Context) {
|
||||
var events []*eventForm
|
||||
ginx.BindJSON(c, &events)
|
||||
now := time.Now().Unix()
|
||||
//now := time.Now().Unix()
|
||||
for i := 0; i < len(events); i++ {
|
||||
re, exists := engine.RuleEvalForExternal.Get(events[i].RuleId, events[i].Cluster)
|
||||
node, err := naming.ClusterHashRing.GetNode(events[i].Cluster, fmt.Sprintf("%d", events[i].RuleId))
|
||||
if err != nil {
|
||||
logger.Warningf("event:%+v get node err:%v", events[i], err)
|
||||
ginx.Bomb(200, "event node not exists")
|
||||
}
|
||||
|
||||
if node != config.C.Heartbeat.Endpoint {
|
||||
err := forwardEvent(events[i], node)
|
||||
if err != nil {
|
||||
logger.Warningf("event:%+v forward err:%v", events[i], err)
|
||||
ginx.Bomb(200, "event forward error")
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
ruleContext, exists := engine.GetExternalAlertRule(events[i].Cluster, events[i].RuleId)
|
||||
logger.Debugf("handle event:%+v exists:%v", events[i], exists)
|
||||
if !exists {
|
||||
ginx.Bomb(200, "rule not exists")
|
||||
}
|
||||
|
||||
if events[i].Alert {
|
||||
go re.MakeNewEvent("http", now, events[i].Cluster, events[i].Vectors)
|
||||
go ruleContext.HandleVectors(events[i].Vectors, "http")
|
||||
} else {
|
||||
for _, vector := range events[i].Vectors {
|
||||
hash := str.MD5(fmt.Sprintf("%d_%s_%s", events[i].RuleId, vector.Key, events[i].Cluster))
|
||||
now := vector.Timestamp
|
||||
go re.RecoverEvent(hash, now, vector.Value)
|
||||
alertVector := engine.NewAlertVector(ruleContext, nil, vector, "http")
|
||||
readableString := vector.ReadableValue()
|
||||
go ruleContext.RecoverSingle(alertVector.Hash(), vector.Timestamp, &readableString)
|
||||
}
|
||||
}
|
||||
}
|
||||
ginx.NewRender(c).Message(nil)
|
||||
}
|
||||
|
||||
// event 不归本实例处理,转发给对应的实例
|
||||
func forwardEvent(event *eventForm, instance string) error {
|
||||
ur := fmt.Sprintf("http://%s/v1/n9e/make-event", instance)
|
||||
res, code, err := poster.PostJSON(ur, time.Second*5, []*eventForm{event}, 3)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
logger.Infof("forward event: result=succ url=%s code=%d event:%v response=%s", ur, code, event, string(res))
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -154,7 +154,7 @@ func (ws *WritersType) PushSample(ident string, v interface{}, clusters ...strin
|
||||
if ok {
|
||||
succ := c.PushFront(v)
|
||||
if !succ {
|
||||
logger.Warningf("Write channel(%s) full, current channel size: %d", ident, c.Len())
|
||||
logger.Warningf("Write cluster:%s channel(%s) full, current channel size: %d", cluster, ident, c.Len())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -141,6 +141,7 @@ func configRoute(r *gin.Engine, version string) {
|
||||
pages.GET("/auth/callback", loginCallback)
|
||||
pages.GET("/auth/callback/cas", loginCallbackCas)
|
||||
pages.GET("/auth/callback/oauth", loginCallbackOAuth)
|
||||
pages.GET("/auth/perms", allPerms)
|
||||
|
||||
pages.GET("/metrics/desc", metricsDescGetFile)
|
||||
pages.POST("/metrics/desc", metricsDescGetMap)
|
||||
|
||||
@@ -214,11 +214,19 @@ func loginCallback(c *gin.Context) {
|
||||
|
||||
if user != nil {
|
||||
if config.C.OIDC.CoverAttributes {
|
||||
user.Nickname = ret.Nickname
|
||||
user.Email = ret.Email
|
||||
user.Phone = ret.Phone
|
||||
user.UpdateAt = time.Now().Unix()
|
||||
if ret.Nickname != "" {
|
||||
user.Nickname = ret.Nickname
|
||||
}
|
||||
|
||||
if ret.Email != "" {
|
||||
user.Email = ret.Email
|
||||
}
|
||||
|
||||
if ret.Phone != "" {
|
||||
user.Phone = ret.Phone
|
||||
}
|
||||
|
||||
user.UpdateAt = time.Now().Unix()
|
||||
user.Update("email", "nickname", "phone", "update_at")
|
||||
}
|
||||
} else {
|
||||
@@ -316,9 +324,18 @@ func loginCallbackCas(c *gin.Context) {
|
||||
ginx.Dangerous(err)
|
||||
if user != nil {
|
||||
if config.C.CAS.CoverAttributes {
|
||||
user.Nickname = ret.Nickname
|
||||
user.Email = ret.Email
|
||||
user.Phone = ret.Phone
|
||||
if ret.Nickname != "" {
|
||||
user.Nickname = ret.Nickname
|
||||
}
|
||||
|
||||
if ret.Email != "" {
|
||||
user.Email = ret.Email
|
||||
}
|
||||
|
||||
if ret.Phone != "" {
|
||||
user.Phone = ret.Phone
|
||||
}
|
||||
|
||||
user.UpdateAt = time.Now().Unix()
|
||||
ginx.Dangerous(user.Update("email", "nickname", "phone", "update_at"))
|
||||
}
|
||||
@@ -409,11 +426,19 @@ func loginCallbackOAuth(c *gin.Context) {
|
||||
|
||||
if user != nil {
|
||||
if config.C.OAuth.CoverAttributes {
|
||||
user.Nickname = ret.Nickname
|
||||
user.Email = ret.Email
|
||||
user.Phone = ret.Phone
|
||||
user.UpdateAt = time.Now().Unix()
|
||||
if ret.Nickname != "" {
|
||||
user.Nickname = ret.Nickname
|
||||
}
|
||||
|
||||
if ret.Email != "" {
|
||||
user.Email = ret.Email
|
||||
}
|
||||
|
||||
if ret.Phone != "" {
|
||||
user.Phone = ret.Phone
|
||||
}
|
||||
|
||||
user.UpdateAt = time.Now().Unix()
|
||||
user.Update("email", "nickname", "phone", "update_at")
|
||||
}
|
||||
} else {
|
||||
|
||||
@@ -19,3 +19,18 @@ func permsGets(c *gin.Context) {
|
||||
lst, err := models.OperationsOfRole(strings.Fields(user.Roles))
|
||||
ginx.NewRender(c).Data(lst, err)
|
||||
}
|
||||
|
||||
func allPerms(c *gin.Context) {
|
||||
roles, err := models.RoleGetsAll()
|
||||
ginx.Dangerous(err)
|
||||
m := make(map[string][]string)
|
||||
for _, r := range roles {
|
||||
lst, err := models.OperationsOfRole(strings.Fields(r.Name))
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
m[r.Name] = lst
|
||||
}
|
||||
|
||||
ginx.NewRender(c).Data(m, err)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user