当前位置: 首页 > news >正文

Flink / Scala - Metrics 使用与详解

一.引言

Flink 公开了一套度量系统,允许开发者收集运行中的数据并展示到外部系统,例如终端和监控页面,1.15.x 下 Flink 支持四套监控指标:

  • A.Counter - 计数器,针对最基本的计数需求,类似 Accumulator 累加器
  • B.Gauges - 仪表,针对自定义 T 的累加需求,结果通过 T.toString() 展示
  • C.Histograms - 直方图,针对数值型数据统计直方图并在监控页面展示
  • D.Meters - 计量 - 通过 mark 的标点方式并进行时间粒度的流量统计

用户可以在调用扩展 RichFunction 的任何用户自定义函数中调用

getRuntimeContext().getMetricGroup()

该方法会返回一个 MetricGroup 对象,其类似于 Map,通过用户自定义 String 索引用户自定义的 metric 指标,下面介绍几种 metric 指标的使用方法。

二.准备工作

1.测试 Source 逻辑

下述 demo 均使用下述数据源,该数据源继承 RichSourceFunction,从 start = 0 开始,每5s生产100 个连续数据并累加 start,循环往复。

class SourceFromCollection extends RichSourceFunction[InputData] {
  private var isRunning = true
  var start = 0

  override def run(ctx: SourceFunction.SourceContext[InputData]): Unit = {
    while ( {
      isRunning
    }) {
      (start to (start + 100)).foreach(num => {
        ctx.collect(InputData(num))
      })
      start += 100
      TimeUnit.SECONDS.sleep(5)
    }
  }

  override def cancel(): Unit = {
    isRunning = false
  }
}

2.测试 ProcessFunction 逻辑

数据的生产类型也很简单,即对生产的每个数字包装为 InputData 类,其包含一个 num 变量,后续 DataStream 基于该变量 keyBy 并生成后续的 keyedStream 并通过 ProcessFunction 处理。

case class InputData(num: Int)

val env = StreamExecutionEnvironment.getExecutionEnvironment
val keyedStream = env.addSource(new SourceFromCollection()).setParallelism(1).keyBy(_.num)

ProcessFunction 处理逻辑:

这里对于每一个到达 processFunction 的 element 随机生成 filterState 【odd-奇数】与 【even-偶数】,只有当前 element 与对应 filterState 匹配才会向下游 collect,最后将产出的数据 print 触发逻辑,后续统计指标的逻辑将在 ProcessFunction 中展示。

    // 连接两个流
    keyedStream.process(new KeyedProcessFunction[Int, InputData, String] {

      override def processElement(inputData: InputData, context: KeyedProcessFunction[Int, InputData, String]#Context, out: Collector[String]): Unit = {
        val filterState = if (Math.random() < 0.5) {
          "odd"
        } else {
          "even"
        }
        // 奇数模式
        if (filterState == "odd" && inputData.num % 2 != 0) {
          out.collect(inputData.num.toString)
        }
        // 偶数模式
        if (filterState == "even" && inputData.num % 2 == 0) {
          out.collect(inputData.num.toString)
        }

      }
    }).setParallelism(2).print()

3.监控页面如何查看 metrics

在对应监控页面选择要查看的 operator ,在右侧选择 Metrics,可以通过 Addd Metric 选择指标并依次添加到图示的展示页面,Flink 内置了多个监控指标参考,如果我们自定义了 Metric 也可以在 Add Meteic 搜索框中搜索并展示

下面选择了 3 个系统统计量, currentInputWatermark - 当前输入水印、isBackPressured - 是否背压、和 numBytesInPerSec - 每 s byte 传输量,同学们也可以选择更多自己需要查看的统计量展示:

三.Counter

1.简介

Couter 用于统计数量,可以通过 inc 或者 inc(long var1) 达到递增、通过 dec 或者 dec(long var1) 达到递减的目的,只需要通过 counter(String name) 即可初始化对应 Counter。

class MyMapper extends RichMapFunction[String,String] {
  @transient private var counter: Counter = _

  override def open(parameters: Configuration): Unit = {
    counter = getRuntimeContext()
      .getMetricGroup()
      .counter("myCounter")
  }

  override def map(value: String): String = {
    counter.inc()
    value
  }
}

Tips:

Counter 并非全局性统计量,和 spark 的 Accumulator 累加器有不同,Counter 只统计对应 operator 算子内的计数,例如 process(new ProcessFunction).setParallelism(2) 设置并行度2,则会初始化 counter_0 和 counter_1 分别累计 ProcessFunction_0 和 ProcessFunction_1 内的统计量。除此之外,你也可以继承 Counter 接口自定义统计逻辑。

2.使用

在 ProcessFunction 中调用 open 函数初始化计数器 Counter,这里初始化 evenCounter、oddCounter 和 allCounter 分别统计当前 ProcessFunction 发出的 偶数、奇数和总数的数量,可以在代码中显式的调用 counter.getCount() 方法打印计数器的值,也可以直接在监控页面查看对应的 Counter 指标。

    keyedStream.process(new KeyedProcessFunction[Int, InputData, String] {

      // counter
      var evenCounter: Counter = _
      var oddCounter: Counter = _
      var allCounter: Counter = _

      var taskId: Int = _

      override def open(parameters: Configuration): Unit = {
        taskId = getRuntimeContext.getIndexOfThisSubtask

        // counter 计数
        this.evenCounter = getRuntimeContext.getMetricGroup.counter("odd")
        this.oddCounter = getRuntimeContext.getMetricGroup.counter("even")
        this.allCounter = getRuntimeContext.getMetricGroup.counter("all")

      }

      override def processElement(inputData: InputData, context: KeyedProcessFunction[Int, InputData, String]#Context, out: Collector[String]): Unit = {
        val filterState = if (Math.random() < 0.5) {
          "odd"
        } else {
          "even"
        }
        // 奇数模式
        if (filterState == "odd" && inputData.num % 2 != 0) {
          oddCounter.inc()
          out.collect(inputData.num.toString)
        }
        // 偶数模式
        if (filterState == "even" && inputData.num % 2 == 0) {
          evenCounter.inc()
          out.collect(inputData.num.toString)
        }

        allCounter.inc(1)
        println(s"TaskId: $taskId Even: ${evenCounter.getCount} Odd: ${oddCounter.getCount} All: ${allCounter.getCount}")

      }
    }).setParallelism(2).print()

在监控中找到 0.keyedProcess.odd 、0.keyedProcess.even 和 o.keyedProcess.all 即可实时查看奇数、偶数和总数的指标,这里由于 filterState 采用 50% 的概率生成且奇偶数也是 50% 的概率,所以奇数或者偶数的 collect 概率为 0.5*0.5=0.25,这也和指标中的大致数据相匹配:

四.Guage

1.简介

Guage 本质上也是累积的统计量,与 Counter 相比,必须创建一个实现了 org.apache.flink.metrics.Gauge 接口的类,这从 Guage 的管方代码可以看出,只需实现 getValue 方法,而特质 T 代表其可以使用一切类型,而不局限于 Long,如果需要打印 getValue 需要调用 T.toString 方法,所以建议将自己需要展示的信息通过重写 toString 方法展示,从而更好地展示。

new class MyMapper extends RichMapFunction[String,String] {
  @transient private var valueToExpose = 0

  override def open(parameters: Configuration): Unit = {
    getRuntimeContext()
      .getMetricGroup()
      .gauge[Int, ScalaGauge[Int]]("MyGauge", ScalaGauge[Int]( () => valueToExpose ) )
  }

  override def map(value: String): String = {
    valueToExpose += 1
    value
  }
}

上述 Demo 使用了接口的简易实现方法,最终实现了一个 Int 类型的 Counter。

2.使用

在 ProcessFunction 的 open 方法中初始化 Guage 并在 processElement 方法中实现调用即可,下面换成 String 做简单展示。 

   keyedStream.process(new KeyedProcessFunction[Int, InputData, String] {

      // guage
      var valueToExpose = ""

      var taskId: Int = _

      override def open(parameters: Configuration): Unit = {
        taskId = getRuntimeContext.getIndexOfThisSubtask

        // Gauge 仪表
        getRuntimeContext.getMetricGroup.gauge[String, ScalaGauge[String]]("MyGauge", ScalaGauge[String]( () => valueToExpose ) )
      }

      override def processElement(inputData: InputData, context: KeyedProcessFunction[Int, InputData, String]#Context, out: Collector[String]): Unit = {
        val filterState = if (Math.random() < 0.5) {
          "odd"
        } else {
          "even"
        }
        // 奇数模式
        if (filterState == "odd" && inputData.num % 2 != 0) {
          out.collect(inputData.num.toString)
        }
        // 偶数模式
        if (filterState == "even" && inputData.num % 2 == 0) {
          out.collect(inputData.num.toString)
        }

        valueToExpose += "-"

        println(s"TaskId: $taskId Even: ${evenCounter.getCount} Odd: ${oddCounter.getCount} All: ${allCounter.getCount} value: $valueToExpose")

      }
    }).setParallelism(2).print()

对 T:String 进行累加,除此之外也可以基于自己的统计类,通过 ScalaGuage 的简式写法返回或者继承 Guage 的接口实现 getValue 均可。 

五.Histogram

1.简介

直方图是常见的统计量,用于直观的展示数据的变化,直方图主要需要实现 update 方法更新直方图需要展示的数据,Flink 默认不提供 Histogram 方法但提供了 Wrapper 封装,允许开发者使用 Codahale/DropWizard 直方图,使用时需要添加 pom 依赖,官方给出的 1.16-SNAPSHOT 版本并未在 maven 环境中下载到,所以这里选用了低版本。

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-metrics-dropwizard</artifactId>
            <version>1.14.5</version>
            <scope>provided</scope>
        </dependency>
class MyMapper extends RichMapFunction[Long, Long] {
  @transient private var histogram: Histogram = _

  override def open(config: Configuration): Unit = {
    com.codahale.metrics.Histogram dropwizardHistogram =
      new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500))
        
    histogram = getRuntimeContext()
      .getMetricGroup()
      .histogram("myHistogram", new DropwizardHistogramWrapper(dropwizardHistogram))
  }
  
  override def map(value: Long): Long = {
    histogram.update(value)
    value
  }
}

和上面的 Counter 和 Guage 相同,需要在 open 方法内通过 getRuntimeContext.getMetricGroup 实现初始化。

2.使用

在 ProcessFunction 中统计总输出的直方图,这里直方图的数字借用了 allCoutner.getCount 的数值。

    keyedStream.process(new KeyedProcessFunction[Int, InputData, String] {

      // counter
      var allCounter: Counter = _

      // histogram
      var histogram: Histogram = _

      var taskId: Int = _

      override def open(parameters: Configuration): Unit = {
        taskId = getRuntimeContext.getIndexOfThisSubtask

        // counter 计数
        this.allCounter = getRuntimeContext.getMetricGroup.counter("all")

        // Histogram 直方图
        val  dropwizardHistogram: com.codahale.metrics.Histogram = new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500))
        histogram = getRuntimeContext.getMetricGroup.histogram("myHistogram", new DropwizardHistogramWrapper(dropwizardHistogram))
      }

      override def processElement(inputData: InputData, context: KeyedProcessFunction[Int, InputData, String]#Context, out: Collector[String]): Unit = {
        val filterState = if (Math.random() < 0.5) {
          "odd"
        } else {
          "even"
        }
        // 奇数模式
        if (filterState == "odd" && inputData.num % 2 != 0) {
          out.collect(inputData.num.toString)
        }
        // 偶数模式
        if (filterState == "even" && inputData.num % 2 == 0) {
          out.collect(inputData.num.toString)
        }

        allCounter.inc(1)

        // 更新直方图数值
        histogram.update(allCounter.getCount)

        println(s"TaskId: $taskId Even: ${evenCounter.getCount} Odd: ${oddCounter.getCount} All: ${allCounter.getCount} value: $valueToExpose")

      }
    }).setParallelism(2).print()

为 Histogram 添加数值后,Flink Metrics 会为该统计指标提供 Mean、Max、Min 等相关统计直方图,下图分别为 Max、Min 和 Mean:

除此之外,还可以选择 p 分位数更细粒度查看指标:

六.Meter

1.简介

 meter 又称计量器,该指标主要测算平均吞吐量,通过 markEvent 方法可以注册,代表时间的发生,如果有多个事件同时发生可以使用 markEvent(long n) 实现注册,使用 Meter 同样需要引入 dropwizard 依赖:

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-metrics-dropwizard</artifactId>
            <version>1.14.5</version>
            <scope>provided</scope>
        </dependency>
class MyMapper extends RichMapFunction[Long,Long] {
  @transient private var meter: Meter = _

  override def open(config: Configuration): Unit = {
    meter = getRuntimeContext()
      .getMetricGroup()
      .meter("myMeter", new MyMeter())
  }

  override def map(value: Long): Long = {
    meter.markEvent()
    value
  }
}

2.使用

使用 Meter 统计奇数数字的平均吞吐量。

    keyedStream.process(new KeyedProcessFunction[Int, InputData, String] {
      var meter: Meter = _

      var taskId: Int = _

      override def open(parameters: Configuration): Unit = {
        taskId = getRuntimeContext.getIndexOfThisSubtask

        // Meter 平均吞吐
        val dropwizardMeter: com.codahale.metrics.Meter = new com.codahale.metrics.Meter()
        meter = getRuntimeContext.getMetricGroup.meter("myMeter", new DropwizardMeterWrapper(dropwizardMeter))
      }

      override def processElement(inputData: InputData, context: KeyedProcessFunction[Int, InputData, String]#Context, out: Collector[String]): Unit = {
        val filterState = if (Math.random() < 0.5) {
          "odd"
        } else {
          "even"
        }
        // 奇数模式
        if (filterState == "odd" && inputData.num % 2 != 0) {
          meter.markEvent()
          out.collect(inputData.num.toString)
        }
        // 偶数模式
        if (filterState == "even" && inputData.num % 2 == 0) {
          out.collect(inputData.num.toString)
        }

        println(s"TaskId: $taskId Even: ${evenCounter.getCount} Odd: ${oddCounter.getCount} All: ${allCounter.getCount} value: $valueToExpose")

      }
    }).setParallelism(2).print()

这里统计量与 parallelism 即 TaskId 相关联,所以分别有 0.keyedProcess.myMeter 和 1.keyedProcess.myMeter,这其实对应 TaskId 和 OperatorName 以及相应 Metric 标识:

七.总结

除了 Counters、Gauges、Histograms 和 Meters 外,Flink 内部自带了很多相关指标监控任务的运行,主要分类为有:

-  CPU 处理器

-  Memory 内存

- Threads 线程

- GarbageCollection 垃圾回收

- ClassLoader 类加载器

- Network 网络

- Default shuffle service 默认 shuffle 服务

这里涵盖的指标非常多,可以通过 Add Metric 下拉框查看对应 TaskId 以及 OperatorName 的相关上述监控指标,这里展示了 Window Operator 下 WaterMark,isBackPressured 以及 numBytesPerSec 的基本指标。

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 日常开发中比较实用的命令行
  • 阿里云AliGenie开发天猫语音功能-入门篇
  • 【校招VIP】前端布局模块之Flex弹性布局
  • 开放式激光振镜运动控制器(二):ZMC408SCAN激光接口与控制
  • 前端HTML5 +CSS3 4.CSS基础 4 选择器进阶 5 背景相关属性
  • Google Earth Engine(GEE)——sentinel-2 NDVI多时相影像展示
  • Java设计模式之解释器模式
  • 雅思 Band 7+ 预备课程
  • CSS背景(background)
  • Linux上快速安装zookeeper
  • MySQL主从复制架构实现
  • stack使用+模拟实现
  • 软件测试中的集成测试到底是什么?集成的方法又有哪些?
  • centos安装NIS
  • pd.Series().rank()的个人理解
  • @angular/forms 源码解析之双向绑定
  • 【mysql】环境安装、服务启动、密码设置
  • 【译】理解JavaScript:new 关键字
  • JS+CSS实现数字滚动
  • js数组之filter
  • leetcode-27. Remove Element
  • 纯 javascript 半自动式下滑一定高度,导航栏固定
  • 回顾 Swift 多平台移植进度 #2
  • 排序算法之--选择排序
  • 前端攻城师
  • 如何设计一个微型分布式架构?
  • 深度学习中的信息论知识详解
  • 数据可视化之 Sankey 桑基图的实现
  • 源码之下无秘密 ── 做最好的 Netty 源码分析教程
  • 终端用户监控:真实用户监控还是模拟监控?
  • 字符串匹配基础上
  • [地铁译]使用SSD缓存应用数据——Moneta项目: 低成本优化的下一代EVCache ...
  • Mac 上flink的安装与启动
  • 机器人开始自主学习,是人类福祉,还是定时炸弹? ...
  • # 20155222 2016-2017-2 《Java程序设计》第5周学习总结
  • ######## golang各章节终篇索引 ########
  • #vue3 实现前端下载excel文件模板功能
  • $.ajax()
  • ( )的作用是将计算机中的信息传送给用户,计算机应用基础 吉大15春学期《计算机应用基础》在线作业二及答案...
  • (42)STM32——LCD显示屏实验笔记
  • (C#)获取字符编码的类
  • (C语言)共用体union的用法举例
  • (PHP)设置修改 Apache 文件根目录 (Document Root)(转帖)
  • (PyTorch)TCN和RNN/LSTM/GRU结合实现时间序列预测
  • (编程语言界的丐帮 C#).NET MD5 HASH 哈希 加密 与JAVA 互通
  • (附源码)ssm旅游企业财务管理系统 毕业设计 102100
  • (蓝桥杯每日一题)平方末尾及补充(常用的字符串函数功能)
  • (五)Python 垃圾回收机制
  • (原創) 如何將struct塞進vector? (C/C++) (STL)
  • (转载)(官方)UE4--图像编程----着色器开发
  • .NET Core 将实体类转换为 SQL(ORM 映射)
  • .NET Core引入性能分析引导优化
  • .NET 材料检测系统崩溃分析
  • .w文件怎么转成html文件,使用pandoc进行Word与Markdown文件转化
  • /usr/local/nginx/logs/nginx.pid failed (2: No such file or directory)