2019独角兽企业重金招聘Python工程师标准>>>
初始化
dispatcher的初始化是在main的reload方法里面,代码在cmd/alertmanager/main.go
// reload builds a new dispatcher and starts it in its own goroutine.
//
// NOTE(review): this is an abridged excerpt; the enclosing function and any
// steps elided by the article are not visible here.
reload := func() (err error) {
	// Hand the alert provider, the route built from conf.Route, the
	// notification pipeline, the marker and the timeout function to the
	// dispatcher constructor.
	disp = dispatch.NewDispatcher(alerts, dispatch.NewRoute(conf.Route, nil), pipeline, marker, timeoutFunc)

	// Run is started on a dedicated goroutine so that reload itself returns.
	go disp.Run()

	// The closure declares an error result, so it must return explicitly.
	return nil
}
接下来就是dispatch/dispatch.go的Run()
// Run subscribes to the dispatcher's alert provider and feeds the resulting
// iterator into the internal processing loop.
func (d *Dispatcher) Run() {
	it := d.alerts.Subscribe()
	d.run(it)
}
我们再回头看一下Subscribe()返回的是什么,当前版本的接口实现在provider/mem/mem.go
func (a *Alerts) Subscribe() provider.AlertIterator {
ch = make(chan *types.Alert, 200)
alerts, err := a.getPending()
go func() {
for _, a := range alerts {
select {
case ch <- a:
case <-done:
return
}
}
}
return provider.NewAlertIterator(ch, done, err)
}
// NewAlertIterator bundles an alert channel, a done channel and an error into
// an AlertIterator.
func NewAlertIterator(ch <-chan *types.Alert, done chan struct{}, err error) AlertIterator {
	it := new(alertIterator)
	it.ch = ch
	it.done = done
	it.err = err
	return it
}
接下来回到dispatch/dispatch.go的run(),这里就是dispatcher真正逻辑处理的地方
func (d *Dispatcher) run(it provider.AlertIterator) {
cleanup := time.NewTicker(30 * time.Second)
for {
select {
case alert, ok := <-it.Next():
for _, r := range d.route.Match(alert.Labels) {
d.processAlert(alert, r)
}
case <-cleanup.C:
for _, groups := range d.aggrGroups {
for _, ag := range groups {
if ag.empty() { //每隔30秒,清理alert为空的group
ag.stop()
delete(groups, ag.fingerprint())
}
}
}
case <-d.ctx.Done():
return
}
}
告警处理
从上面run()的代码可以看出,一旦alert满足配置里面的route规则,则会调用processAlert()方法。
路由匹配
先来看一下route如何匹配的,主要是route.Match(),代码在dispatch/route.go
// Match walks the routing tree rooted at r and returns the routes that apply
// to the given label set.
//
// It returns nil when r's own matchers reject the label set. Otherwise the
// child routes are tried in order and their matches collected; if no child
// matches, r itself is the single result.
func (r *Route) Match(lset model.LabelSet) []*Route {
	if !r.Matchers.Match(lset) {
		return nil
	}

	var all []*Route
	for _, cr := range r.Routes {
		matches := cr.Match(lset)
		all = append(all, matches...)
		// Continue is the `continue` value of the route configuration (false
		// by default according to the article). Once a child matched and does
		// not ask to continue, the remaining siblings are skipped.
		if matches != nil && !cr.Continue {
			break
		}
	}

	// No child route matched: fall back to the current route.
	if len(all) == 0 {
		all = append(all, r)
	}
	return all
}
告警处理
route匹配完成,将对告警进行处理,第一步是在processAlert()方法
// processAlert files the alert into the aggregation group of the given route
// that corresponds to the alert's group-by labels. The group is created and
// its run loop started the first time a label combination is seen.
func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) {
	// Reduce the alert's labels to the ones the route groups by.
	group := model.LabelSet{}
	for ln, lv := range alert.Labels {
		if _, grouped := route.RouteOpts.GroupBy[ln]; !grouped {
			continue
		}
		group[ln] = lv
	}
	fp := group.Fingerprint()

	// Look up (or lazily create) the per-route table of aggregation groups.
	groups, known := d.aggrGroups[route]
	if !known {
		groups = map[model.Fingerprint]*aggrGroup{}
		d.aggrGroups[route] = groups
	}

	ag, exists := groups[fp]
	if !exists {
		// Create the aggregation group. Per the article, ag.next is primed
		// with the configured group_wait, i.e. the delay before the first
		// pass of aggrGroup.run — not visible in this excerpt.
		ag = newAggrGroup(d.ctx, group, route, d.timeout)
		groups[fp] = ag

		// The notify function pushes the alerts through the dispatcher's
		// stage and reports whether that succeeded.
		notifyFn := func(ctx context.Context, alerts ...*types.Alert) bool {
			_, _, err := d.stage.Exec(ctx, alerts...)
			if err != nil {
				log.Errorf("Notify for %d alerts failed: %s", len(alerts), err)
				return false
			}
			return true
		}
		// Start the aggregation group's loop.
		go ag.run(notifyFn)
	}

	// Add the alert to the aggregation group's alerts.
	ag.insert(alert)
}
// insert stores the alert in the group under its fingerprint. If the group has
// not sent anything yet and the alert's start time plus GroupWait already lies
// in the past, the group's timer is reset to fire immediately.
func (ag *aggrGroup) insert(alert *types.Alert) {
	ag.alerts[alert.Fingerprint()] = alert

	if ag.hasSent {
		return
	}
	waitUntil := alert.StartsAt.Add(ag.opts.GroupWait)
	if waitUntil.Before(time.Now()) {
		ag.next.Reset(0)
	}
}
我们可以看出,alert处理又转交到ag.run()方法中
// run is the aggregation group's loop. Each time the group's timer fires it
// builds a notification context, re-arms the timer with GroupInterval and
// flushes the group's alerts through nf. The loop exits when the group's
// context is cancelled.
//
// NOTE(review): ag.done is created here but never closed within this excerpt;
// confirm against the full source whether other code waits on it.
func (ag *aggrGroup) run(nf notifyFunc) {
	ag.done = make(chan struct{})

	for {
		select {
		case now := <-ag.next.C:
			// Bound this flush by a timeout derived from the configured
			// group_interval.
			ctx, cancel := context.WithTimeout(ag.ctx, ag.timeout(ag.opts.GroupInterval))

			// Attach the tick time and the group's identifying data.
			ctx = notify.WithNow(ctx, now)
			ctx = notify.WithGroupKey(ctx, ag.GroupKey())
			ctx = notify.WithGroupLabels(ctx, ag.labels)
			ctx = notify.WithReceiverName(ctx, ag.opts.Receiver)
			// Carry the configured repeat_interval along in the context.
			ctx = notify.WithRepeatInterval(ctx, ag.opts.RepeatInterval)

			// Schedule the next flush one group_interval from now.
			ag.next.Reset(ag.opts.GroupInterval)

			ag.flush(func(alerts ...*types.Alert) bool {
				return nf(ctx, alerts...)
			})

			// Release the timeout context as soon as the flush has returned;
			// deferring would pile up one pending cancel per iteration.
			cancel()

		case <-ag.ctx.Done():
			return
		}
	}
}
接下来alert处理又转交到ag.flush()方法中
// flush passes a snapshot of the group's alerts to notify. When notify reports
// success, resolved alerts from the snapshot are removed from the group and
// the group is marked as having sent. An empty group is a no-op.
func (ag *aggrGroup) flush(notify func(...*types.Alert) bool) {
	if ag.empty() {
		return
	}

	// Snapshot the current alerts both as a map (for the cleanup below) and as
	// a slice (for the notify call).
	size := len(ag.alerts)
	snapshot := make(map[model.Fingerprint]*types.Alert, size)
	batch := make([]*types.Alert, 0, size)
	for fp, alert := range ag.alerts {
		snapshot[fp] = alert
		batch = append(batch, alert)
	}

	if !notify(batch...) {
		return
	}

	for fp, sent := range snapshot {
		if !sent.Resolved() {
			continue
		}
		// The alert is resolved: delete it, but only if the group still holds
		// the very same alert that was sent.
		if current := ag.alerts[fp]; current == sent {
			delete(ag.alerts, fp)
		}
	}
	ag.hasSent = true
}
这里有notify func(...*types.Alert) bool这个函数参数:它是ag.run()里对nf的一层封装,而nf则是在调用ag.run()时传入的
// Start the aggregation group with a notify function that runs the alerts
// through the dispatcher's stage and reports whether that succeeded.
go ag.run(func(ctx context.Context, alerts ...*types.Alert) bool {
	if _, _, err := d.stage.Exec(ctx, alerts...); err != nil {
		return false
	}
	return true
})
dispatcher的stage在dispatcher初始化时已经指定,是notify.BuildPipeline()的返回值,代码在cmd/alertmanager/main.go
// Build the notification pipeline from the configured receivers and the
// supporting components: template, wait function, inhibitor, silences,
// notification log and marker.
pipeline = notify.BuildPipeline(
conf.Receivers,
tmpl,
waitFunc,
inhibitor,
silences,
notificationLog,
marker,
)
// The pipeline is passed to the dispatcher as its third constructor argument.
// NOTE(review): presumably this is what the dispatcher later calls as
// d.stage — confirm against NewDispatcher.
disp = dispatch.NewDispatcher(alerts, dispatch.NewRoute(conf.Route, nil), pipeline, marker, timeoutFunc)
接下来就由notify来处理alert,将在notify的分析里面进行详解。