当前位置: 首页 > news >正文

Flink / Scala - AllWindowFunction 与 KeyedProcessFunction 处理 TopK 问题

目录

一.引言

二.辅助类

1.数据源

2.数据类

三.AllWindowProcessFunction 处理 TopK

四.keyedProcessFunction 处理 TopK

1.AddSource

2.keyBy + Aggregate

3.KeyedProcessFunction 取 TopK

五.总结


一.引言

Flink 流式任务中除了常见的 UV、PV 统计外,还有针对 TimeWindow 的 TopK 问题,下面基于 AllWindowFunction 与 KeyedProcessFunction 讲解如何使用 Flink 解决 TopK 问题。

二.辅助类

1.数据源

随机生成用户 ID 与浏览 URL 及其对应的时间戳,没生成一条数据 Sleep 1s,便于观察。

import org.apache.flink.streaming.api.functions.source.SourceFunction

import scala.util.Random

class ClickHouse extends SourceFunction[Event] {

  var running: Boolean = true

  val random: Random = scala.util.Random
  val urlList: Array[String] = Array("www.a.com", "www.b.com", "www.c.com", "www.d.com", "www.e.com")

  override def run(ctx: SourceFunction.SourceContext[Event]): Unit = {
    while (running) {
      val user = random.nextInt(256).toString
      val url = urlList(random.nextInt(urlList.length))
      ctx.collect(Event(user, url, System.currentTimeMillis()))
      // 定时生成
      Thread.sleep(1000)
    }
  }

  override def cancel(): Unit = {
    running = false
  }

}

2.数据类

// 用户浏览行为
case class Event(user: String, url: String, timeStamp: Long)

case class UrlViewCount(url: String, count: Long, windowStart: Long, windowEnd: Long)

DataStream[T] 数据源生成类为 case class Event,其包含用户 ID 、URL 以及时间戳,UrlViewCount 类为窗口聚合后生成的数据类,其包含当前对应窗口的起始时间 start、end 以及对应的 URL 与浏览量 Count。

三.AllWindowProcessFunction 处理 TopK

使用 windowAll 实现简单,思路清晰,可以将一段时间内的 URL 浏览内容全部收集在一起然后处理,但是缺点是 windowAll 会强制并行度为1,这在分布式场景下是不推荐的。

import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.collection.mutable


object TopNWithAllWindow {

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 读取数据源并解析事件时间
    val eventStream = env.addSource(new ClickHouse)
      .assignTimestampsAndWatermarks(
        WatermarkStrategy
          .forMonotonousTimestamps[Event]()
          .withTimestampAssigner(new SerializableTimestampAssigner[Event] {
            override def extractTimestamp(event: Event, l: Long): Long = event.timeStamp
          }))

    val topK: Int = 2

    // 10s统计窗口,5s滑动时间
    eventStream.windowAll(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
        .process(new ProcessAllWindowFunction[Event , String, TimeWindow] {

          override def process(context: this.Context, elements: Iterable[Event], out: Collector[String]): Unit = {

            // 统计每个 URL 的 Count
            val allCountInfo = new mutable.HashMap[String, Long]()

            elements.iterator.foreach(elem => {
              if (allCountInfo.contains(elem.url)) {
                allCountInfo(elem.url) += 1
              } else {
                allCountInfo(elem.url) = 1
              }
            })

            // 取 TopK
            allCountInfo.toArray.sortBy(-_._2).slice(0, topK).zipWithIndex.foreach{ case ((url, count), index) => {
              val log = s"URL: $url TopK: ${index + 1} Count: $count"
              out.collect(log)
            }}
          }
        }).print()

    env.execute()

  }

}

allWindow 集中在一个节点执行,过多数据量会造成处理延时或缓存压力过大,运行程序可以得到如下结果:

1> URL: www.a.com TopK: 1 Count: 1
2> URL: www.c.com TopK: 2 Count: 1
3> URL: www.c.com TopK: 1 Count: 3
4> URL: www.a.com TopK: 2 Count: 2

四.keyedProcessFunction 处理 TopK

windowAll 不能满足工业场景要求,也不能发挥分布式系统的特点,可以通过 keyBy 对数据进行分流,通过 window + paralisim 的方式并行处理,随后聚合得到最终结果,类似于分治的思想。

1.AddSource

初始化 ExecutionEnvironment 并 addSource 添加数据源

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val eventStream = env.addSource(new ClickHouse)
      .assignTimestampsAndWatermarks(
        WatermarkStrategy
          .forMonotonousTimestamps[Event]()
          .withTimestampAssigner(new SerializableTimestampAssigner[Event] {
            override def extractTimestamp(event: Event, l: Long): Long = event.timeStamp
          }))

2.keyBy + Aggregate

基于 URL keyBy 并使用 aggregate (ACC, R) 聚合,UrlViewCountAgg 以及 UrlViewCountResult 的具体实现逻辑可以参考:Flink / Scala - Aggregate 详解与 UV、PV 统计实战。

    val countStream = eventStream.keyBy(_.url)
        .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
        .aggregate(new UrlViewCountAgg, new UrlViewCountResult)

UrlVIewCountResult 函数的 OUT 类型需要由 String 修改为上面提到的 UrlViewCount:

class UrlViewCountResult extends ProcessWindowFunction[Long, UrlViewCount, String, TimeWindow] {

  override def process(url: String, context: Context, elements: Iterable[Long], out: Collector[UrlViewCount]): Unit = {
    val start = context.window.getStart
    val end = context.window.getEnd
//    val viewCount = s"URL: $url Count: ${elements.iterator.next()} Start: $start End: $end"
    val viewCount = UrlViewCount(url, elements.iterator.next(), start, end)
    out.collect(viewCount)
  }
  
}

3.KeyedProcessFunction 取 TopK

这里基于 windowEnd 进行 keyBy 确保同期时间窗口的数据都能汇聚在一个节点,其次使用 ListState 对每个 URL Window 生成的 UrlViewCount 进行汇总。这里使用了 TimerService 对事件进行了注册,TimerService 相关内容可以参考:Flink - Timer 与 TimerService 源码分析与详解,过期时间采用 windowEnd + 1 可以确保所有 URL 对应的时间窗口数据都可以被收集,从而保证结果的相对正确。

    val topK: Int = 2
    val result = countStream.keyBy(_.windowEnd)
        .process(new KeyedProcessFunction[Long, UrlViewCount, String] {
          var urlViewCountListState: ListState[UrlViewCount] = _

          // 初始化 ListState
          override def open(parameters: Configuration): Unit = {
            urlViewCountListState = getRuntimeContext.getListState(
              new ListStateDescriptor[UrlViewCount]("URL_VIEW_COUNT_LIST", classOf[UrlViewCount])
            )
          }

          override def processElement(value: UrlViewCount, ctx: KeyedProcessFunction[Long, UrlViewCount, String]#Context, out: Collector[String]): Unit = {
            // Count 数据添加至列表状态中
            urlViewCountListState.add(value)
            // 注册 window + 1ms 的定时器等待全部数据到齐
            ctx.timerService().registerEventTimeTimer(value.windowEnd + 1)
          }

          override def onTimer(timestamp: Long, ctx:KeyedProcessFunction[Long, UrlViewCount, String]#OnTimerContext, out: Collector[String]): Unit = {

            val allCountInfo = new ArrayBuffer[UrlViewCount]()

            // 获取全部数据
            val it = urlViewCountListState.get().iterator()
            while (it.hasNext) {
              allCountInfo.append(it.next())
            }

            // 清除状态
            urlViewCountListState.clear()

            // 取 TopK
            val topKUrl = allCountInfo.sortBy(-_.count).slice(0, topK)

            // 输出
            topKUrl.zipWithIndex.foreach{ case(urlViewCount, index) =>
              val url = urlViewCount.url
              val count = urlViewCount.count
              val log = s"WindowEnd: ${timestamp - 1} Top: ${index + 1} Url: $url Count: $count"
              out.collect(log)
            }
          }
        }).print()

    env.execute()

最后遍历 ListState 的数据并排序获得 TopK,最后记得将 ListState 清除掉,避免存储的占用。

五.总结

相比于 windowAll,keyBy 之后再聚合的方式可以充分发挥分布式环境的特点,但是由于 URL 的浏览存在热点 hotspot 的情况,例如 www.a.com 浏览了 1w次,而 www.b.com 浏览了 10 次,很明显产生了数据倾斜,虽然使用了分布式,但是对应 www.a.com 的节点会压力过大,这时候需要在 keyBy 的时候进行调整,例如给 URL 增加随机数即 URL + "_" + Random,使其均匀分布在各个节点上,在 keyedProcessFunction 处在 split[0] 获取 URL,完成总的统计。

相关文章:

  • C++20之Concpet(概念部分,之二)
  • 【Spirng】@Component和@Configuration和@Bean的区别
  • 跟着江南一点雨学习springmvc(3)
  • 安卓手机使用Tasker实现应用级功能,屏幕翻译v9,翻译复制贴图
  • 一篇文章吃透 CSS3 属性: transition过渡 与 transform动画
  • 通讯录的动态版本
  • Docker搭建Kafka集群
  • WPS增加正则处理函数,简直如虎添翼
  • opencloudos容器镜像优化
  • 二.go语言条件与循环
  • 高阶函数1
  • 电子信息考研择校
  • 互联网数据管理平台
  • 本科行政管理毕业论文什么题目好写点?
  • kmp の 笔记
  • 【翻译】babel对TC39装饰器草案的实现
  • Android路由框架AnnoRouter:使用Java接口来定义路由跳转
  • Angular Elements 及其运作原理
  • CODING 缺陷管理功能正式开始公测
  • JAVA多线程机制解析-volatilesynchronized
  • rabbitmq延迟消息示例
  • React-Native - 收藏集 - 掘金
  • SQLServer插入数据
  • WebSocket使用
  • 阿里云应用高可用服务公测发布
  • 解决iview多表头动态更改列元素发生的错误
  • 容器服务kubernetes弹性伸缩高级用法
  • 少走弯路,给Java 1~5 年程序员的建议
  • 为视图添加丝滑的水波纹
  • 哈罗单车融资几十亿元,蚂蚁金服与春华资本加持 ...
  • ​Linux Ubuntu环境下使用docker构建spark运行环境(超级详细)
  • #{}和${}的区别?
  • #etcd#安装时出错
  • #控制台大学课堂点名问题_课堂随机点名
  • (8)Linux使用C语言读取proc/stat等cpu使用数据
  • (zz)子曾经曰过:先有司,赦小过,举贤才
  • (待修改)PyG安装步骤
  • (十)【Jmeter】线程(Threads(Users))之jp@gc - Stepping Thread Group (deprecated)
  • (一)appium-desktop定位元素原理
  • (原創) 如何讓IE7按第二次Ctrl + Tab時,回到原來的索引標籤? (Web) (IE) (OS) (Windows)...
  • (转载)在C#用WM_COPYDATA消息来实现两个进程之间传递数据
  • .Net 4.0并行库实用性演练
  • .net core开源商城系统源码,支持可视化布局小程序
  • .NET8.0 AOT 经验分享 FreeSql/FreeRedis/FreeScheduler 均已通过测试
  • .net开发引用程序集提示没有强名称的解决办法
  • .NET框架
  • .vimrc php,修改home目录下的.vimrc文件,vim配置php高亮显示
  • @GetMapping和@RequestMapping的区别
  • @RequestMapping 的作用是什么?
  • @SpringBootApplication 包含的三个注解及其含义
  • @staticmethod和@classmethod的作用与区别
  • [@Controller]4 详解@ModelAttribute
  • []我的函数库
  • [2]十道算法题【Java实现】
  • [BetterExplained]书写是为了更好的思考(转载)