Flink / Scala - Metrics 使用与详解
一.引言
Flink 公开了一套度量系统,允许开发者收集运行中的数据并展示到外部系统,例如终端和监控页面,1.15.x 下 Flink 支持四套监控指标:
- A.Counter - 计数器,针对最基本的计数需求,类似 Accumulator 累加器
- B.Gauges - 仪表,针对自定义 T 的累加需求,结果通过 T.toString() 展示
- C.Histograms - 直方图,针对数值型数据统计直方图并在监控页面展示
- D.Meters - 计量 - 通过 mark 的标点方式并进行时间粒度的流量统计
用户可以在调用扩展 RichFunction 的任何用户自定义函数中调用
getRuntimeContext().getMetricGroup()
该方法会返回一个 MetricGroup 对象,其类似于 Map,通过用户自定义 String 索引用户自定义的 metric 指标,下面介绍几种 metric 指标的使用方法。
二.准备工作
1.测试 Source 逻辑
下述 demo 均使用下述数据源,该数据源继承 RichSourceFunction,从 start = 0 开始,每5s生产100 个连续数据并累加 start,循环往复。
class SourceFromCollection extends RichSourceFunction[InputData] {
private var isRunning = true
var start = 0
override def run(ctx: SourceFunction.SourceContext[InputData]): Unit = {
while ( {
isRunning
}) {
(start to (start + 100)).foreach(num => {
ctx.collect(InputData(num))
})
start += 100
TimeUnit.SECONDS.sleep(5)
}
}
override def cancel(): Unit = {
isRunning = false
}
}
2.测试 ProcessFunction 逻辑
数据的生产类型也很简单,即对生产的每个数字包装为 InputData 类,其包含一个 num 变量,后续 DataStream 基于该变量 keyBy 并生成后续的 keyedStream 并通过 ProcessFunction 处理。
case class InputData(num: Int)
val env = StreamExecutionEnvironment.getExecutionEnvironment
val keyedStream = env.addSource(new SourceFromCollection()).setParallelism(1).keyBy(_.num)
ProcessFunction 处理逻辑:
这里对于每一个到达 processFunction 的 element 随机生成 filterState 【odd-奇数】与 【even-偶数】,只有当前 element 与对应 filterState 匹配才会向下游 collect,最后将产出的数据 print 触发逻辑,后续统计指标的逻辑将在 ProcessFunction 中展示。
// 连接两个流
keyedStream.process(new KeyedProcessFunction[Int, InputData, String] {
override def processElement(inputData: InputData, context: KeyedProcessFunction[Int, InputData, String]#Context, out: Collector[String]): Unit = {
val filterState = if (Math.random() < 0.5) {
"odd"
} else {
"even"
}
// 奇数模式
if (filterState == "odd" && inputData.num % 2 != 0) {
out.collect(inputData.num.toString)
}
// 偶数模式
if (filterState == "even" && inputData.num % 2 == 0) {
out.collect(inputData.num.toString)
}
}
}).setParallelism(2).print()
3.监控页面如何查看 metrics
在对应监控页面选择要查看的 operator ,在右侧选择 Metrics,可以通过 Addd Metric 选择指标并依次添加到图示的展示页面,Flink 内置了多个监控指标参考,如果我们自定义了 Metric 也可以在 Add Meteic 搜索框中搜索并展示
下面选择了 3 个系统统计量, currentInputWatermark - 当前输入水印、isBackPressured - 是否背压、和 numBytesInPerSec - 每 s byte 传输量,同学们也可以选择更多自己需要查看的统计量展示:
三.Counter
1.简介
Couter 用于统计数量,可以通过 inc 或者 inc(long var1) 达到递增、通过 dec 或者 dec(long var1) 达到递减的目的,只需要通过 counter(String name) 即可初始化对应 Counter。
class MyMapper extends RichMapFunction[String,String] {
@transient private var counter: Counter = _
override def open(parameters: Configuration): Unit = {
counter = getRuntimeContext()
.getMetricGroup()
.counter("myCounter")
}
override def map(value: String): String = {
counter.inc()
value
}
}
Tips:
Counter 并非全局性统计量,和 spark 的 Accumulator 累加器有不同,Counter 只统计对应 operator 算子内的计数,例如 process(new ProcessFunction).setParallelism(2) 设置并行度2,则会初始化 counter_0 和 counter_1 分别累计 ProcessFunction_0 和 ProcessFunction_1 内的统计量。除此之外,你也可以继承 Counter 接口自定义统计逻辑。
2.使用
在 ProcessFunction 中调用 open 函数初始化计数器 Counter,这里初始化 evenCounter、oddCounter 和 allCounter 分别统计当前 ProcessFunction 发出的 偶数、奇数和总数的数量,可以在代码中显式的调用 counter.getCount() 方法打印计数器的值,也可以直接在监控页面查看对应的 Counter 指标。
keyedStream.process(new KeyedProcessFunction[Int, InputData, String] {
// counter
var evenCounter: Counter = _
var oddCounter: Counter = _
var allCounter: Counter = _
var taskId: Int = _
override def open(parameters: Configuration): Unit = {
taskId = getRuntimeContext.getIndexOfThisSubtask
// counter 计数
this.evenCounter = getRuntimeContext.getMetricGroup.counter("odd")
this.oddCounter = getRuntimeContext.getMetricGroup.counter("even")
this.allCounter = getRuntimeContext.getMetricGroup.counter("all")
}
override def processElement(inputData: InputData, context: KeyedProcessFunction[Int, InputData, String]#Context, out: Collector[String]): Unit = {
val filterState = if (Math.random() < 0.5) {
"odd"
} else {
"even"
}
// 奇数模式
if (filterState == "odd" && inputData.num % 2 != 0) {
oddCounter.inc()
out.collect(inputData.num.toString)
}
// 偶数模式
if (filterState == "even" && inputData.num % 2 == 0) {
evenCounter.inc()
out.collect(inputData.num.toString)
}
allCounter.inc(1)
println(s"TaskId: $taskId Even: ${evenCounter.getCount} Odd: ${oddCounter.getCount} All: ${allCounter.getCount}")
}
}).setParallelism(2).print()
在监控中找到 0.keyedProcess.odd 、0.keyedProcess.even 和 o.keyedProcess.all 即可实时查看奇数、偶数和总数的指标,这里由于 filterState 采用 50% 的概率生成且奇偶数也是 50% 的概率,所以奇数或者偶数的 collect 概率为 0.5*0.5=0.25,这也和指标中的大致数据相匹配:
四.Guage
1.简介
Guage 本质上也是累积的统计量,与 Counter 相比,必须创建一个实现了 org.apache.flink.metrics.Gauge 接口的类,这从 Guage 的管方代码可以看出,只需实现 getValue 方法,而特质 T 代表其可以使用一切类型,而不局限于 Long,如果需要打印 getValue 需要调用 T.toString 方法,所以建议将自己需要展示的信息通过重写 toString 方法展示,从而更好地展示。
new class MyMapper extends RichMapFunction[String,String] {
@transient private var valueToExpose = 0
override def open(parameters: Configuration): Unit = {
getRuntimeContext()
.getMetricGroup()
.gauge[Int, ScalaGauge[Int]]("MyGauge", ScalaGauge[Int]( () => valueToExpose ) )
}
override def map(value: String): String = {
valueToExpose += 1
value
}
}
上述 Demo 使用了接口的简易实现方法,最终实现了一个 Int 类型的 Counter。
2.使用
在 ProcessFunction 的 open 方法中初始化 Guage 并在 processElement 方法中实现调用即可,下面换成 String 做简单展示。
keyedStream.process(new KeyedProcessFunction[Int, InputData, String] {
// guage
var valueToExpose = ""
var taskId: Int = _
override def open(parameters: Configuration): Unit = {
taskId = getRuntimeContext.getIndexOfThisSubtask
// Gauge 仪表
getRuntimeContext.getMetricGroup.gauge[String, ScalaGauge[String]]("MyGauge", ScalaGauge[String]( () => valueToExpose ) )
}
override def processElement(inputData: InputData, context: KeyedProcessFunction[Int, InputData, String]#Context, out: Collector[String]): Unit = {
val filterState = if (Math.random() < 0.5) {
"odd"
} else {
"even"
}
// 奇数模式
if (filterState == "odd" && inputData.num % 2 != 0) {
out.collect(inputData.num.toString)
}
// 偶数模式
if (filterState == "even" && inputData.num % 2 == 0) {
out.collect(inputData.num.toString)
}
valueToExpose += "-"
println(s"TaskId: $taskId Even: ${evenCounter.getCount} Odd: ${oddCounter.getCount} All: ${allCounter.getCount} value: $valueToExpose")
}
}).setParallelism(2).print()
对 T:String 进行累加,除此之外也可以基于自己的统计类,通过 ScalaGuage 的简式写法返回或者继承 Guage 的接口实现 getValue 均可。
五.Histogram
1.简介
直方图是常见的统计量,用于直观的展示数据的变化,直方图主要需要实现 update 方法更新直方图需要展示的数据,Flink 默认不提供 Histogram 方法但提供了 Wrapper 封装,允许开发者使用 Codahale/DropWizard 直方图,使用时需要添加 pom 依赖,官方给出的 1.16-SNAPSHOT 版本并未在 maven 环境中下载到,所以这里选用了低版本。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-metrics-dropwizard</artifactId>
<version>1.14.5</version>
<scope>provided</scope>
</dependency>
class MyMapper extends RichMapFunction[Long, Long] {
@transient private var histogram: Histogram = _
override def open(config: Configuration): Unit = {
com.codahale.metrics.Histogram dropwizardHistogram =
new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500))
histogram = getRuntimeContext()
.getMetricGroup()
.histogram("myHistogram", new DropwizardHistogramWrapper(dropwizardHistogram))
}
override def map(value: Long): Long = {
histogram.update(value)
value
}
}
和上面的 Counter 和 Guage 相同,需要在 open 方法内通过 getRuntimeContext.getMetricGroup 实现初始化。
2.使用
在 ProcessFunction 中统计总输出的直方图,这里直方图的数字借用了 allCoutner.getCount 的数值。
keyedStream.process(new KeyedProcessFunction[Int, InputData, String] {
// counter
var allCounter: Counter = _
// histogram
var histogram: Histogram = _
var taskId: Int = _
override def open(parameters: Configuration): Unit = {
taskId = getRuntimeContext.getIndexOfThisSubtask
// counter 计数
this.allCounter = getRuntimeContext.getMetricGroup.counter("all")
// Histogram 直方图
val dropwizardHistogram: com.codahale.metrics.Histogram = new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500))
histogram = getRuntimeContext.getMetricGroup.histogram("myHistogram", new DropwizardHistogramWrapper(dropwizardHistogram))
}
override def processElement(inputData: InputData, context: KeyedProcessFunction[Int, InputData, String]#Context, out: Collector[String]): Unit = {
val filterState = if (Math.random() < 0.5) {
"odd"
} else {
"even"
}
// 奇数模式
if (filterState == "odd" && inputData.num % 2 != 0) {
out.collect(inputData.num.toString)
}
// 偶数模式
if (filterState == "even" && inputData.num % 2 == 0) {
out.collect(inputData.num.toString)
}
allCounter.inc(1)
// 更新直方图数值
histogram.update(allCounter.getCount)
println(s"TaskId: $taskId Even: ${evenCounter.getCount} Odd: ${oddCounter.getCount} All: ${allCounter.getCount} value: $valueToExpose")
}
}).setParallelism(2).print()
为 Histogram 添加数值后,Flink Metrics 会为该统计指标提供 Mean、Max、Min 等相关统计直方图,下图分别为 Max、Min 和 Mean:
除此之外,还可以选择 p 分位数更细粒度查看指标:
六.Meter
1.简介
meter 又称计量器,该指标主要测算平均吞吐量,通过 markEvent 方法可以注册,代表时间的发生,如果有多个事件同时发生可以使用 markEvent(long n) 实现注册,使用 Meter 同样需要引入 dropwizard 依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-metrics-dropwizard</artifactId>
<version>1.14.5</version>
<scope>provided</scope>
</dependency>
class MyMapper extends RichMapFunction[Long,Long] {
@transient private var meter: Meter = _
override def open(config: Configuration): Unit = {
meter = getRuntimeContext()
.getMetricGroup()
.meter("myMeter", new MyMeter())
}
override def map(value: Long): Long = {
meter.markEvent()
value
}
}
2.使用
使用 Meter 统计奇数数字的平均吞吐量。
keyedStream.process(new KeyedProcessFunction[Int, InputData, String] {
var meter: Meter = _
var taskId: Int = _
override def open(parameters: Configuration): Unit = {
taskId = getRuntimeContext.getIndexOfThisSubtask
// Meter 平均吞吐
val dropwizardMeter: com.codahale.metrics.Meter = new com.codahale.metrics.Meter()
meter = getRuntimeContext.getMetricGroup.meter("myMeter", new DropwizardMeterWrapper(dropwizardMeter))
}
override def processElement(inputData: InputData, context: KeyedProcessFunction[Int, InputData, String]#Context, out: Collector[String]): Unit = {
val filterState = if (Math.random() < 0.5) {
"odd"
} else {
"even"
}
// 奇数模式
if (filterState == "odd" && inputData.num % 2 != 0) {
meter.markEvent()
out.collect(inputData.num.toString)
}
// 偶数模式
if (filterState == "even" && inputData.num % 2 == 0) {
out.collect(inputData.num.toString)
}
println(s"TaskId: $taskId Even: ${evenCounter.getCount} Odd: ${oddCounter.getCount} All: ${allCounter.getCount} value: $valueToExpose")
}
}).setParallelism(2).print()
这里统计量与 parallelism 即 TaskId 相关联,所以分别有 0.keyedProcess.myMeter 和 1.keyedProcess.myMeter,这其实对应 TaskId 和 OperatorName 以及相应 Metric 标识:
七.总结
除了 Counters、Gauges、Histograms 和 Meters 外,Flink 内部自带了很多相关指标监控任务的运行,主要分类为有:
- CPU 处理器
- Memory 内存
- Threads 线程
- GarbageCollection 垃圾回收
- ClassLoader 类加载器
- Network 网络
- Default shuffle service 默认 shuffle 服务
这里涵盖的指标非常多,可以通过 Add Metric 下拉框查看对应 TaskId 以及 OperatorName 的相关上述监控指标,这里展示了 Window Operator 下 WaterMark,isBackPressured 以及 numBytesPerSec 的基本指标。