当前位置: 首页 > news >正文

Streaming源码解读 mapWithState-2

2019独角兽企业重金招聘Python工程师标准>>> hot3.png

本文重点关注mapWithState,假设spark shell启动没问题,明白Receiver和RDD生成的过程,不明白的建议从此文开始学习,或关注YY课堂:每天20:00免费现场授课频道68917580。

案例:累计单词出现的次数,持续性的更新计数。

因为是持续性的计数,因此比较高效的算法是计算完一批数据之后将每个单词的计数保存起来,在下一批数据来之后,再做增量更新。

先在终端运行:

root@master:~# nc -lk 9999

 启动spark-shell,输入如下代码:

// 累计单词出现的次数,持续性的更新计数
    import org.apache.spark.streaming.{Durations, State, StateSpec, StreamingContext}
    import org.apache.spark.{SparkContext, SparkConf}
    sc.setCheckpointDir(".")
    val ssc = new StreamingContext(sc, Durations.seconds(5L))
    ssc.socketTextStream("localhost", 9999).
      flatMap(_.split(" ")).
      map((_, 1)).
      reduceByKey(_ + _).
      mapWithState(
        StateSpec.function(
          (word: String, countOfThisBatch: Option[Int], state: State[Int]) => {
            val newState = countOfThisBatch.getOrElse(0) + state.getOption().getOrElse(0)
            state.update(newState)
            (word, newState)
          }
        )
      ).
      print
    ssc.start

输入:183855_BQcS_120395.png

输出:183043_WaBO_120395.png

183103_rKF4_120395.png183811_ZQmH_120395.png

和updateStateByKey的结果略有差别,mapWithState只对增量的数据做操作,并且只返回增量的数据。

而updateStateByKey是返回全量的数据,而且在前文的源码解析中发现,需要cogroup一次。

 

按照案例解析源码:

看下MapWithStateDStreamImpl的实例化过程。

  1. 实例化一个InternalMapWithStateDStream,
  2. 将上一个DStream作为parent创建此InternalMapWithStateDStream,同时将创建的DStream当作当前的DStream的parent。
// MapWithStateDStream.scala line 57
  private val internalStream =
    new InternalMapWithStateDStream[KeyType, ValueType, StateType, MappedType](dataStream, spec)

// MapWithStateDStream.scala line 62
  override def dependencies: List[DStream[_]] = List(internalStream)

再看下InternalMapWithStateDStream的实例化,

StorageLevel设置为MEMORY_ONLY,而DStream默认的是MEMORY_ONLY_SER

// InternalMapWithStateDStream.scala line 109
persist(StorageLevel.MEMORY_ONLY)

// DStream.scala line 165
// DStream默认的storage level
  /** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
  def persist(): DStream[T] = persist(StorageLevel.MEMORY_ONLY_SER)

checkpoint的设置为true

// MapWithStateDStream.scala line 120
  /** Enable automatic checkpointing */
  override val mustCheckpoint = true

checkpointDuration的设置,若未设置checkpointDuration,则使用滑动窗口*默认倍数(10)。也就是每10个滑动窗口时长做一次checkpoint。

// MapWithStateDStream.scala line 123
  /** Override the default checkpoint duration */
  override def initialize(time: Time): Unit = {
    if (checkpointDuration == null) {
      checkpointDuration = slideDuration * DEFAULT_CHECKPOINT_DURATION_MULTIPLIER
    }
    super.initialize(time)
  }

// InternalMapWithStateDStream.scala line 168
private[streaming] object InternalMapWithStateDStream {
  private val DEFAULT_CHECKPOINT_DURATION_MULTIPLIER = 10
}

至此,DStream的lineage如下:

SocketInputDStream -> FlatMappedDStream -> MappedDStream -> ShuffledDStream -> InternalMapWithStateDStream -> MapWithStateDStream?-> ForEachDStream

 

静态的DAG已经分析完,接下来从运行时回溯:

首先回溯到的是MapWithStateDStreamImpl.compute,依赖父DStream,InternalMapWithStateDStream.compute。

在InternalMapWithStateDStream中,也是先取上一批次产生的RDD,为什么用也呢?因为updateStateByKey也是这么做的。具体做法如下:

  • getOrCompute(validTime - slideDuration):取上一批次生产的RDD,
    • None:创建空RDD
    • Some(rdd):上一批次的有RDD
      • rdd.partitioner != Some(partitioner):若分区器不相同,使用新分区器重新生成RDD
      • rdd:直接返回已经存在的RDD
// MapWithStateDStream.scala line 134
    val prevStateRDD = getOrCompute(validTime - slideDuration) match {
      case Some(rdd) =>
        if (rdd.partitioner != Some(partitioner)) {
          // If the RDD is not partitioned the right way, let us repartition it using the
          // partition index as the key. This is to ensure that state RDD is always partitioned
          // before creating another state RDD using it
          MapWithStateRDD.createFromRDD[K, V, S, E](
            rdd.flatMap { _.stateMap.getAll() }, partitioner, validTime)
        } else {
          rdd
        }
      case None =>
        MapWithStateRDD.createFromPairRDD[K, V, S, E](
          spec.getInitialStateRDD().getOrElse(new EmptyRDD[(K, S)](ssc.sparkContext)),
          partitioner,
          validTime
        )
    }

上述是取到上一批次的状态信息。是从时间维度来切入的。

从另一方面,是从RDD的lineage维度来看,取本批次的依赖的父RDD。

// MapWithStateDStream.scala line 156
// 取本批次的父RDD
    val dataRDD = parent.getOrCompute(validTime).getOrElse {
      context.sparkContext.emptyRDD[(K, V)]
    }

最后将历史状态RDD与本批次的RDD创建新的RDD:MapWithStateRDD

// MapWithStateDStream.scala line 163
    Some(new MapWithStateRDD(
      prevStateRDD, partitionedDataRDD, mappingFunction, validTime, timeoutThresholdTime))

接下来再从运行时开始推导:

首次运行的时候,getOrCompute(validTime - slideDuration) --> None:

// MapWithStateDStream.scala line 145
      case None =>
        MapWithStateRDD.createFromPairRDD[K, V, S, E](
          spec.getInitialStateRDD().getOrElse(new EmptyRDD[(K, S)](ssc.sparkContext)),
          partitioner,
          validTime
        )

先取到spec,此时的spec就是mapWithState接受的参数,是StateSpec类型的,而StateSpec是抽象类,显然传入的是StateSpec的子类StateSpecImpl的实例。

// PairDStreamFunctions.scala line 375
  def mapWithState[StateType: ClassTag, MappedType: ClassTag](
      spec: StateSpec[K, V, StateType, MappedType]
    ): MapWithStateDStream[K, V, StateType, MappedType] = {
    new MapWithStateDStreamImpl[K, V, StateType, MappedType](
      self,
      spec.asInstanceOf[StateSpecImpl[K, V, StateType, MappedType]]
    )
  }

// StateSpec.scala line 73
sealed abstract class StateSpec[KeyType, ValueType, StateType, MappedType] extends Serializable

115627_QFHM_120395.png

也就是说,案例中的如下代码,实际上返回的是一个StateSpecImpl类的实例。

        StateSpec.function(
          (word: String, countOfThisBatch: Option[Int], state: State[Int]) => {
            val newState = countOfThisBatch.getOrElse(0) + state.getOption().getOrElse(0)
            state.update(newState)
            (word, newState)
          }
        )

源码中来验证下,的确是new 了stateSpecImpl返回了。

传入的函数的签名是(KeyType,Option[ValueType],State[StateType])=>MappedType。

在方法内,将函数签名适配成(Time,KeyType,Option[ValueType],State[StateType])=>MappedType。相当于新增了时间维度。

将适配好的函数 作为参数 传入StateSpecImpl的构造。

// StateSpec.scala line 174
  def function[KeyType, ValueType, StateType, MappedType](
      mappingFunction: (KeyType, Option[ValueType], State[StateType]) => MappedType
    ): StateSpec[KeyType, ValueType, StateType, MappedType] = {
    ClosureCleaner.clean(mappingFunction, checkSerializable = true)
    val wrappedFunction =
      (time: Time, key: KeyType, value: Option[ValueType], state: State[StateType]) => {
        Some(mappingFunction(key, value, state))
      }
    new StateSpecImpl(wrappedFunction)
  }

// StateSpec.scala line 230
private[streaming]
case class StateSpecImpl[K, V, S, T](
    function: (Time, K, Option[V], State[S]) => Option[T]) extends StateSpec[K, V, S, T]

看下StateSpecImpl类的继承结构。

163003_h9D2_120395.png

父类MapWithStateDStream类的定义,只定义了一个方法,stateSnapshots()。

// MapWithStateDStream.scala line 43
sealed abstract class MapWithStateDStream[KeyType, ValueType, StateType, MappedType: ClassTag](
    ssc: StreamingContext) extends DStream[MappedType](ssc) {

  /** Return a pair DStream where each RDD is the snapshot of the state of all the keys. */
  def stateSnapshots(): DStream[(KeyType, StateType)]
}

再看MapWithStateDStream.scala line 145的分支。

最初都是null,则返回空RDD。

// MapWithStateDStream.scala line 147
spec.getInitialStateRDD().getOrElse(new EmptyRDD[(K, S)](ssc.sparkContext))

// StateSpec.scala line 269
private[streaming] def getInitialStateRDD(): Option[RDD[(K, S)]] = Option(initialStateRDD)

// StateSpec.scala line 237
  @volatile private var initialStateRDD: RDD[(K, S)] = null

没有历史状态时,使用PairRDD创建MapWithStateRDD

// MapWithStateDStream.scala line 146
        MapWithStateRDD.createFromPairRDD[K, V, S, E](
          spec.getInitialStateRDD().getOrElse(new EmptyRDD[(K, S)](ssc.sparkContext)),
          partitioner,
          validTime
        )
  • 按照分区器分区,这个很重要。上面有提到,如果分区器不一致,需要重新算的步骤。
  • 每一个分区一个MapWithStateRDDRecord,将KV保存成对应的StateMap中,并且分区器是延续的。
  • 创建新的MapWithStateRDD并返回。
// MapWithStateRDD.scala line 183
  def createFromPairRDD[K: ClassTag, V: ClassTag, S: ClassTag, E: ClassTag](
      pairRDD: RDD[(K, S)],
      partitioner: Partitioner,
      updateTime: Time): MapWithStateRDD[K, V, S, E] = {
// 使用分区器创建StateMap,每个分区一个MapWithStateRDDRecord
    val stateRDD = pairRDD.partitionBy(partitioner).mapPartitions ({ iterator =>
      val stateMap = StateMap.create[K, S](SparkEnv.get.conf)
      iterator.foreach { case (key, state) => stateMap.put(key, state, updateTime.milliseconds) }
      Iterator(MapWithStateRDDRecord(stateMap, Seq.empty[E]))
    }, preservesPartitioning = true)
// 使用相同的分区器创建空RDD
    val emptyDataRDD = pairRDD.sparkContext.emptyRDD[(K, V)].partitionBy(partitioner)
// 适配方法签名
    val noOpFunc = (time: Time, key: K, value: Option[V], state: State[S]) => None

    new MapWithStateRDD[K, V, S, E](
      stateRDD, emptyDataRDD, noOpFunc, updateTime, None)
  }

StateMap的创建

// MapWithStateRDD.scala line 189
val stateMap = StateMap.create[K, S](SparkEnv.get.conf)

// StateMap.scala line 59
  def create[K: ClassTag, S: ClassTag](conf: SparkConf): StateMap[K, S] = {
    val deltaChainThreshold = conf.getInt("spark.streaming.sessionByKey.deltaChainThreshold",
      DELTA_CHAIN_LENGTH_THRESHOLD)
    new OpenHashMapBasedStateMap[K, S](deltaChainThreshold)
  }

// StateMap.scala line 28
/** Internal interface for defining the map that keeps track of sessions. */
private[streaming] abstract class StateMap[K: ClassTag, S: ClassTag] extends Serializable

142200_SBZ1_120395.png

构造OpenHashMapBasedStateMap,内部使用OpenHashMap数据结构。

// StateMap.scala line 79
/** Implementation of StateMap based on Spark's [[org.apache.spark.util.collection.OpenHashMap]] */
private[streaming] class OpenHashMapBasedStateMap[K: ClassTag, S: ClassTag](
    @transient @volatile var parentStateMap: StateMap[K, S],
    initialCapacity: Int = DEFAULT_INITIAL_CAPACITY,
    deltaChainThreshold: Int = DELTA_CHAIN_LENGTH_THRESHOLD
  ) extends StateMap[K, S] { self =>

// ...
// line 99
  @transient @volatile private var deltaMap = new OpenHashMap[K, StateInfo[S]](initialCapacity)
// ...
}

看下OpenHashMap的介绍

  • 快速的Map实现,比java.util.HashMap快5倍
  • key可以为null
  • 不能删除
  • 内部是使用高速的HashSet实现:OpenHashSet作为keySet,也是非null,不能删除
// OpenHashMap.scala line 34
/**
 * :: DeveloperApi ::
 * A fast hash map implementation for nullable keys. This hash map supports insertions and updates,
 * but not deletions. This map is about 5X faster than java.util.HashMap, while using much less
 * space overhead.
 *
 * Under the hood, it uses our OpenHashSet implementation.
 */
@DeveloperApi
private[spark]
class OpenHashMap[K : ClassTag, @specialized(Long, Int, Double) V: ClassTag](
    initialCapacity: Int)
  extends Iterable[(K, V)]
  with Serializable

再回到 MapWithStateRDD.scala line 190

// MapWithStateRDD.scala line 190
iterator.foreach { case (key, state) => stateMap.put(key, state, updateTime.milliseconds) }

若key不存在,则将state和time封装成StateInfo,作为map的value。

若key存在,则取出StateInfo,更新state和time,且将deleted设置为false。

// StateMap.scala line 142
  /** Add or update state */
  override def put(key: K, state: S, updateTime: Long): Unit = {
    val stateInfo = deltaMap(key)
    if (stateInfo != null) {
      stateInfo.update(state, updateTime)
    } else {
      deltaMap.update(key, new StateInfo(state, updateTime))
    }
  }

// StateMap.scala line 320
  /** Internal class to represent the state information */
  case class StateInfo[S](
      var data: S = null.asInstanceOf[S],
      var updateTime: Long = -1,
      var deleted: Boolean = false) {

    def markDeleted(): Unit = {
      deleted = true
    }

    def update(newData: S, newUpdateTime: Long): Unit = {
      data = newData
      updateTime = newUpdateTime
      deleted = false
    }
  }

从上述的deleted属性,也可以理解OpenHashapBasedStateMap对于需要删除的state的做法:是将对应的value,即StateInfo对象的deleted设置为null,而不是将key从map中删除来实现的。

封装成MapWithStateRDDRecord 类型的case class后,作为唯一的一个元素,加入迭代器。

最后返会 MapWithStateRDD实例,包含之前的状态RDD和本批次的分区后的数据RDD

  1. 之前的状态RDD:RDD[MapWithStateRDDRecord[K, S, E]
  2. 本批次的数据RDD:RDD[(K, V)]

在本案例中,K是String,S是Int,E是(String,Int),V是Int。

 

创建MapWithStateRDD实例,RDD依赖的RDD有两个,一个是之前的历史状态RDD,一个是当前批次的RDD。

// MapWithStateRDD.scala line 119
private[streaming] class MapWithStateRDD[K: ClassTag, V: ClassTag, S: ClassTag, E: ClassTag](
    private var prevStateRDD: RDD[MapWithStateRDDRecord[K, S, E]],
    private var partitionedDataRDD: RDD[(K, V)],
    mappingFunction: (Time, K, Option[V], State[S]) => Option[E],
    batchTime: Time,
    timeoutThresholdTime: Option[Long]
  ) extends RDD[MapWithStateRDDRecord[K, S, E]](
    partitionedDataRDD.sparkContext,
    List(
      new OneToOneDependency[MapWithStateRDDRecord[K, S, E]](prevStateRDD),
      new OneToOneDependency(partitionedDataRDD))
  )

同样,当非首次时,见MapWithStateDStream.scala line 135

则会判断,若分区器有变化,则会重新创建RDD,因为分区器是要被后续沿用的。若分区器没修改,则直接返回。

此处的rdd是MapWithStateRDDRecord[K,S,V],

// MapWithStateDStream.scala line 140
          MapWithStateRDD.createFromRDD[K, V, S, E](
            rdd.flatMap { _.stateMap.getAll() }, partitioner, validTime)
MapWithStateRDD.createFromRDD[K, V, S, E](rdd.flatMap { _.stateMap.getAll() }, partitioner, validTime):
  1. _.stateMap.getAll():将历史的状态过滤掉本次有的key,在加上本次更新中非删除的数据,就是所有的状态了。
    1. 162720_VnqQ_120395.png
  2. rdd.flatMap:就是扁平化,很简单的RDD转换。输出为MapPartitionsRDD[(K, S, Long)],本案例中,K:String,S:Int。Long代表的是updateTime。
  3. MapWithStateRDD.createFromRDD:
    1. val pairRDD = rdd.map { x => (x._1, (x._2, x._3)) }:转换成key:(state,updateTime)格式,类似StateInfo case class
    2. 按照分区器分区后,将rdd中的数据装入新的StateMap中。
    3. 剩余做法和 createFromPairRDD做法一致。
// MapWithStateRDD.scala line 202
  def createFromRDD[K: ClassTag, V: ClassTag, S: ClassTag, E: ClassTag](
      rdd: RDD[(K, S, Long)],
      partitioner: Partitioner,
      updateTime: Time): MapWithStateRDD[K, V, S, E] = {

    val pairRDD = rdd.map { x => (x._1, (x._2, x._3)) }
    val stateRDD = pairRDD.partitionBy(partitioner).mapPartitions({ iterator =>
      val stateMap = StateMap.create[K, S](SparkEnv.get.conf)
      iterator.foreach { case (key, (state, updateTime)) =>
        stateMap.put(key, state, updateTime)
      }
      Iterator(MapWithStateRDDRecord(stateMap, Seq.empty[E]))
    }, preservesPartitioning = true)

    val emptyDataRDD = pairRDD.sparkContext.emptyRDD[(K, V)].partitionBy(partitioner)

    val noOpFunc = (time: Time, key: K, value: Option[V], state: State[S]) => None

    new MapWithStateRDD[K, V, S, E](
      stateRDD, emptyDataRDD, noOpFunc, updateTime, None)
  }

那么在RDD计算时,MapWithStateRDD.scala line 144 ,调用 MapWithStateRDDRecord.updateRecordWithData方法,得到MapWithStateRDDRecord,并封装成迭代器。

看下MapWithStateRDDRecord.updateRecordWithData:

  1. prevRecord.map { _.stateMap.copy() }. getOrElse { new EmptyStateMap[K, S]() }:先取出历史状态的RDD
  2. 对本批数据中的key,依次遍历。
    1. wrappedState.wrap(newStateMap.get(key)):只是用历史状态做初始化。设置defined flags:172518_aPfo_120395.png
    2. val returned = mappingFunction(batchTime, key, Some(value), wrappedState):执行自定义的函数;也就是案例中的代码段:
      1. 本案例中逻辑是次数相加:将历史状态和当前数据做聚合
      2. 再将状态更新。173041_gU3V_120395.png
      3. 返回MappedType,即(单词,计数):(String,Int)。
                (word: String, countOfThisBatch: Option[Int], state: State[Int]) => {
                  val newState = countOfThisBatch.getOrElse(0) + state.getOption().getOrElse(0)
                  state.update(newState)
                  (word, newState)
                }
    3. 封装成迭代器返回
  3. 移除超时数据(按配置可选)
  4. 封装成MapWithStateRDDRecord后返回。

 

那么至此,InternalMapWithStateDStream的内部已经分析完,下面进入到依赖InternalMapWithStateDStream的MapWithStateDStream。

174100_tVzY_120395.png

此时_.flagMap[MappedType]中,_=MapWithStateRDD

{_.mappedData}中_=174339_dNlD_120395.png,此处的mappedData为每次匹配的key调用自定义函数后的返回,本案例中为 (word, newState)。也就是说,状态更新和函数返回是两码事。考虑下:为什么mappedData是一个Seq类型,是因为,在本批次中,同一个单词可能会出现多次,则每次都会调用下自定义的函数。

将本章的案例的reduceByKey注释,并在同一批输入中重复一个单词,就可以很好的解释。

 

updateStateByKey和MapWithState的异同:

相同点:都是使用历史状态和增量数据合并的逻辑来实现的。

差异:

  1. 在保存历史状态的实现中
    1. updateStateByKey是复用自身RDD,通过RDD来实现历史状态的保存的。
    2. mapWithState是通过内部的RDD来实现的。是保存于MapWithStateRDDRecord
  2. 在性能方便
    1. updateStateByKey是对全量历史数据与增量数据进行cogroup,性能肯定低。
    2. mapWithState只是更新增量数据部分。性能相对较高。
  3. ​​​​​​​返回数据范围
    1. updateStateByKey​​​​​​​返回的是全量数据
    2. mapWithState只返回更新的数据部分。

 

其他差异请读者留言,一起补充完整。

databricks官方demo

 

 

转载于:https://my.oschina.net/corleone/blog/684215

相关文章:

  • 你真的知道 == 和 equals 的区别吗?
  • 关于错误提示:此实现不是 Windows 平台 FIPS 验证的加密算法的一部分的解决方案...
  • 【c++进阶:c++ algorithm的常用函数】
  • (转) RFS+AutoItLibrary测试web对话框
  • 刚刚接触python的感想
  • linux下使用hiredis异步API实现sub/pub消息订阅和发布的功能
  • 第70届(2019)IMO中国国家队选拔考试试题
  • xml编码格式在tomcat下运行出错,eclipse里面直接运行没有问题
  • 微服务架构 SpringCloud(二)Eureka(服务注册和服务发现基础篇)
  • Android图片缓存之Glide进阶
  • css(二):高度与文档流,line-box与box,宽度,position与
  • SQL Server2005作业执行失败的解决办法
  • 进度条14
  • 阿里云RDS PostgreSQL GPU加速规格(支持GIS时空加速)发布
  • 怎么隐藏MathType标尺
  • 【跃迁之路】【733天】程序员高效学习方法论探索系列(实验阶段490-2019.2.23)...
  • ABAP的include关键字,Java的import, C的include和C4C ABSL 的import比较
  • Apache Pulsar 2.1 重磅发布
  • docker-consul
  • gulp 教程
  • iOS 系统授权开发
  • Java 内存分配及垃圾回收机制初探
  • JAVA之继承和多态
  • js面向对象
  • MySQL-事务管理(基础)
  • Nacos系列:Nacos的Java SDK使用
  • PHP 小技巧
  • php的插入排序,通过双层for循环
  • Spring核心 Bean的高级装配
  • SQLServer插入数据
  • Storybook 5.0正式发布:有史以来变化最大的版本\n
  • Webpack入门之遇到的那些坑,系列示例Demo
  • 模型微调
  • 入口文件开始,分析Vue源码实现
  • 阿里云IoT边缘计算助力企业零改造实现远程运维 ...
  • ​Java并发新构件之Exchanger
  • ​香农与信息论三大定律
  • (3)llvm ir转换过程
  • (ctrl.obj) : error LNK2038: 检测到“RuntimeLibrary”的不匹配项: 值“MDd_DynamicDebug”不匹配值“
  • (Demo分享)利用原生JavaScript-随机数-实现做一个烟花案例
  • (附源码)springboot码头作业管理系统 毕业设计 341654
  • (附源码)springboot美食分享系统 毕业设计 612231
  • (附源码)ssm高校志愿者服务系统 毕业设计 011648
  • (转)MVC3 类型“System.Web.Mvc.ModelClientValidationRule”同时存在
  • (转载)CentOS查看系统信息|CentOS查看命令
  • .[hudsonL@cock.li].mkp勒索病毒数据怎么处理|数据解密恢复
  • .MSSQLSERVER 导入导出 命令集--堪称经典,值得借鉴!
  • .net CHARTING图表控件下载地址
  • .net mvc 获取url中controller和action
  • .Net Web窗口页属性
  • .Net+SQL Server企业应用性能优化笔记4——精确查找瓶颈
  • .NETCORE 开发登录接口MFA谷歌多因子身份验证
  • .so文件(linux系统)
  • [8481302]博弈论 斯坦福game theory stanford week 1
  • [corCTF 2022] CoRJail: From Null Byte Overflow To Docker Escape