当前位置: 首页 > news >正文

alertmanager源码阅读 - dispatcher

2019独角兽企业重金招聘Python工程师标准>>> hot3.png

初始化

dispacher的初始化是在main的reload方法里面cmd/alertmanager/main.go

reload := func() (err error) {
    disp = dispatch.NewDispatcher(alerts, dispatch.NewRoute(conf.Route, nil), pipeline, marker, timeoutFunc)
    go disp.Run()
}

接下来就是dispatch/dispatch.go的Run()

func (d *Dispatcher) Run() {
    d.run(d.alerts.Subscribe())
}

我们再回头看一下Subscribe()返回的是什么,当前版本的接口实现在provider/mem/mem.go

func (a *Alerts) Subscribe() provider.AlertIterator {
    ch   = make(chan *types.Alert, 200)
    alerts, err := a.getPending()
    go func() {
        for _, a := range alerts {
			select {
			case ch <- a:
			case <-done:
				return
			}
		}
    }
    return provider.NewAlertIterator(ch, done, err)
}
func NewAlertIterator(ch <-chan *types.Alert, done chan struct{}, err error) AlertIterator {
    return &alertIterator{
		ch:   ch,
		done: done,
		err:  err,
	}
}

接下来回到dispatch/dispatch.go的run(),这里就是dispatcher真正逻辑处理的地方

func (d *Dispatcher) run(it provider.AlertIterator) {
    cleanup := time.NewTicker(30 * time.Second)
    for {
		select {
            case alert, ok := <-it.Next():
                for _, r := range d.route.Match(alert.Labels) {
                    d.processAlert(alert, r)
                }
            case <-cleanup.C:
                for _, groups := range d.aggrGroups {
                    for _, ag := range groups {
                        if ag.empty() {  //每隔30秒,清理alert为空的group
                            ag.stop()
                            delete(groups, ag.fingerprint())
                        }
                    }
                }
            case <-d.ctx.Done():
			    return
        }
}

告警处理

从上面run()的代码可以看出,一旦alert满足配置里面的route规则,则会调用processAlert()方法。

路由匹配

先来看一下route如何匹配的,主要是route.Match(),代码在dispatch/route.go

func (r *Route) Match(lset model.LabelSet) []*Route {
    if !r.Matchers.Match(lset) {
		return nil
	}
    for _, cr := range r.Routes {
		matches := cr.Match(lset)
        all = append(all, matches...)
		if matches != nil && !cr.Continue { //此处的continue为配置route项下的continue值,默认为false,即如果已经匹配到第一个route规则,就返回。
			break
		}
	}
    if len(all) == 0 { 
		all = append(all, r) //如果子路由没有匹配,使用默认的路由
        }
	return all
    }
}

告警处理

route匹配完成,将对告警进行处理,第一步是在processAlert()方法

func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) {
	group := model.LabelSet{}
	for ln, lv := range alert.Labels {
		if _, ok := route.RouteOpts.GroupBy[ln]; ok {
			group[ln] = lv
		}
	}
	fp := group.Fingerprint()
	groups, ok := d.aggrGroups[route]
	if !ok {
		groups = map[model.Fingerprint]*aggrGroup{}
		d.aggrGroups[route] = groups
	}
	ag, ok := groups[fp]
	if !ok {
		ag = newAggrGroup(d.ctx, group, route, d.timeout)   //新建AggrGroup,ag.next会用到配置里面的group_wait,即首次启动后多少秒执行第一次aggrGroup.run()里面的逻辑
		groups[fp] = ag
		go ag.run(func(ctx context.Context, alerts ...*types.Alert) bool { //启动AggrGroup
			_, _, err := d.stage.Exec(ctx, alerts...)
			if err != nil {
				log.Errorf("Notify for %d alerts failed: %s", len(alerts), err)
			}
			return err == nil
		})
	}
	ag.insert(alert) //将alert插入到AggrGroup的alerts中
}

func (ag *aggrGroup) insert(alert *types.Alert) {
    ag.alerts[alert.Fingerprint()] = alert
    if !ag.hasSent && alert.StartsAt.Add(ag.opts.GroupWait).Before(time.Now()) {
		ag.next.Reset(0)
	}
}

我们可以看出,alert处理又转交到ag.run()方法中

func (ag *aggrGroup) run(nf notifyFunc) {
    ag.done = make(chan struct{})
    for {
		select {
		case now := <-ag.next.C:
            ctx, cancel := context.WithTimeout(ag.ctx, ag.timeout(ag.opts.GroupInterval)) //根据配置里的group_interval,设置group一次的timeout时间
            ctx = notify.WithNow(ctx, now)
            ctx = notify.WithGroupKey(ctx, ag.GroupKey())
			ctx = notify.WithGroupLabels(ctx, ag.labels)
			ctx = notify.WithReceiverName(ctx, ag.opts.Receiver)
			ctx = notify.WithRepeatInterval(ctx, ag.opts.RepeatInterval)  //根据配置里的repeat_interval,设置notify重复发送的时间
            ag.next.Reset(ag.opts.GroupInterval)
            ag.flush(func(alerts ...*types.Alert) bool {
				return nf(ctx, alerts...)
			})
        case <-ag.ctx.Done():
			return
		}
    }
}

接下来alert处理又转交到ag.flush()方法中

func (ag *aggrGroup) flush(notify func(...*types.Alert) bool) {
	if ag.empty() {
		return
	}
    var (
		alerts      = make(map[model.Fingerprint]*types.Alert, len(ag.alerts))
		alertsSlice = make([]*types.Alert, 0, len(ag.alerts))
	)
    for fp, alert := range ag.alerts {
		alerts[fp] = alert
		alertsSlice = append(alertsSlice, alert)
	}
    if notify(alertsSlice...) {
        for fp, a := range alerts {
			if a.Resolved() && ag.alerts[fp] == a {
				delete(ag.alerts, fp)  //alert已经解决,删除alert
			}
		}

		ag.hasSent = true
    }
}

这里有notify func(...*types.Alert) bool这个方法参数,它的定义在调用ag.run()时传入的

go ag.run(func(ctx context.Context, alerts ...*types.Alert) bool {
			_, _, err := d.stage.Exec(ctx, alerts...)
			return err == nil
		})

dispatcher的stage在dispatcher初始化时已经指定,是notify.BuildPipeline()返回,代码cmd/alertmanager/main.go

pipeline = notify.BuildPipeline(
			conf.Receivers,
			tmpl,
			waitFunc,
			inhibitor,
			silences,
			notificationLog,
			marker,
		)
disp = dispatch.NewDispatcher(alerts, dispatch.NewRoute(conf.Route, nil), pipeline, marker, timeoutFunc)

接下来就由notify来处理alert,将在notify的分析里面进行详解。

转载于:https://my.oschina.net/sannychan/blog/1602254

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • java8-模拟hadoop
  • 第 13 章 Barman for PostgreSQL
  • spark 源码编译 standalone 模式部署
  • 在华为设备上实施GRE隧道和IPSEC ***
  • 如何在数据库动态建表
  • 十年阿里java架构师的六大设计原则和项目经验
  • 基于 python + WebDriverAgent 的“跳一跳”小程序高分教程
  • json logstash 解析失败 ctrl-code 1
  • 5-2 equal getClass or instanceOf
  • linux kernel编译配置相关
  • 不要在构造函数中抛出异常
  • 老男孩教育教您批量建立nagios配置文件的方法
  • 使用jQuery获取session中存储的list集合
  • 如何理解接口-Java系列
  • vue从入门到进阶:计算属性computed与侦听器watch(三)
  • [case10]使用RSQL实现端到端的动态查询
  • [原]深入对比数据科学工具箱:Python和R 非结构化数据的结构化
  • 2019.2.20 c++ 知识梳理
  • angular组件开发
  • CNN 在图像分割中的简史:从 R-CNN 到 Mask R-CNN
  • CSS居中完全指南——构建CSS居中决策树
  • ES6 学习笔记(一)let,const和解构赋值
  • java取消线程实例
  • jquery cookie
  • Linux编程学习笔记 | Linux IO学习[1] - 文件IO
  • MySQL常见的两种存储引擎:MyISAM与InnoDB的爱恨情仇
  • MySQL数据库运维之数据恢复
  • 从tcpdump抓包看TCP/IP协议
  • 给新手的新浪微博 SDK 集成教程【一】
  • 汉诺塔算法
  • 猴子数据域名防封接口降低小说被封的风险
  • 手机端车牌号码键盘的vue组件
  • 协程
  • 一起参Ember.js讨论、问答社区。
  • 终端用户监控:真实用户监控还是模拟监控?
  • const的用法,特别是用在函数前面与后面的区别
  • 策略 : 一文教你成为人工智能(AI)领域专家
  • ​​​​​​​GitLab 之 GitLab-Runner 安装,配置与问题汇总
  • ​​​​​​​STM32通过SPI硬件读写W25Q64
  • ​secrets --- 生成管理密码的安全随机数​
  • #、%和$符号在OGNL表达式中经常出现
  • #我与Java虚拟机的故事#连载03:面试过的百度,滴滴,快手都问了这些问题
  • (ZT)一个美国文科博士的YardLife
  • (八)Flask之app.route装饰器函数的参数
  • (笔记)M1使用hombrew安装qemu
  • (紀錄)[ASP.NET MVC][jQuery]-2 純手工打造屬於自己的 jQuery GridView (含完整程式碼下載)...
  • (数据大屏)(Hadoop)基于SSM框架的学院校友管理系统的设计与实现+文档
  • (四)软件性能测试
  • (轉貼) VS2005 快捷键 (初級) (.NET) (Visual Studio)
  • (最完美)小米手机6X的Usb调试模式在哪里打开的流程
  • ***利用Ms05002溢出找“肉鸡
  • .babyk勒索病毒解析:恶意更新如何威胁您的数据安全
  • .gitignore文件_Git:.gitignore
  • .Net Core 微服务之Consul(三)-KV存储分布式锁
  • .NET Core/Framework 创建委托以大幅度提高反射调用的性能