当前位置: 首页 > news >正文

Spark:数据倾斜处理一般从什么地方入手

参考:https://zhidao.baidu.com/question/308753793256679684.html

 触发shuffle的常见算子:distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition等。

要解决数据倾斜的问题,首先要定位数据倾斜发生在什么地方,首先是哪个stage,直接在Web UI上看就可以,然后查看运行耗时的task,查看数据是否倾斜了!

根据这个task,根据stage划分原理,推算出数据倾斜发生在哪个shuffle类算子上。 查看导致数据倾斜的key的数据分布情况 根据执行操作的不同,可以有很多种查看key分布的方式: 
1,如果是Spark SQL中的group by、join语句导致的数据倾斜,那么就查询一下SQL中使用的表的key分布情况。
2,如果是Spark RDD执行shuffle算子导致的数据倾斜,那么可以在Spark作业中加入查看key分布的代码,比如RDD.countByKey()。然后对统计出来各个key出现的次数,collect、take到客户端打印一下,就可以看到key的分布情况。

 

比如针对wordCount案例,最后的reduceByKey算子导致了数据倾斜:

val sampledPairs = pairs.sample(false,0.1) 

//对pairs采样10%
val sampledWordCounts = sampledPairs.countByKey() 
sampledWordCounts.foreach(println(_)) 

数据倾斜的解决办法


解决方案一:使用Hive ETL预处理数据 适用场景:导致数据倾斜的是Hive表,Hive表中的数据本身很不均匀,业务场景需要频繁使用Spark对Hive表执行某个分析操作。 实现思路:提前将join等操作执行,进行Hive阶段的ETL。将导致数据倾斜的shuffle前置。 
优缺点:实现简单,Spark作业性能提升,但是Hive ETL还是会发生数据倾斜,导致Hive ETL的速度很慢。

实践经验:将数据倾斜提前到上游的Hive ETL,每天就执行一次,慢就慢点吧。


解决方案二:过滤少数导致倾斜的key 适用场景:少数几个key导致数据倾斜,而且对计算本身影响并不大的话。 实现思路:比如Spark SQL中直接用where条件过滤掉这些key,如果是RDD的话,用filter算子过滤掉这些key。如果是动态判断哪些key的数据量最多然后再进行过滤,那么可以使用sample算子对RDD进行采样,然后计算出每个key的数量,取数据量最多的key过滤掉即可。 
优缺点:实现简单,效果也好。缺点是一般情况下导致倾斜的key还是很多的,不会是少数。 


解决方案三:提高shuffle操作的并行度 适用场景:直接面对数据倾斜的简单解决方案。 
实现思路:对RDD执行shuffle算子时,给shuffle算子传入一个参数,比如reduceByKey(1000),该参数就设置了这个shuffle算子执行的shuffle read task的数量。对于Spark SQL中的shuffle类语句,比如group by,join等,需要设置一个参数,即spark.sql.shuffle.partitions,该参数默认值是200,对于很多场景来说有点过小。 优缺点:简单能缓解,缺点是没有根除问题,效果有限。 


解决方案四:两阶段聚合(局部聚合+全局聚合) 适用场景:对RDD执行reduceByKey等聚合类shuffle算子或者在Spark SQL中使用group by语句进行分组聚合时,比较适合这种方案。 实现思路:先局部聚合,给每个key打一个小范围的随机数,比如10以内的随机数,相当于分成10份,一个task分成10个task。聚合聚合后,去掉key上的随机数前缀,再次进行全局聚合操作。 优缺点:大幅度缓解数据倾斜,缺点是仅适用于聚合类的shuffle操作。 


解决方案五:将reduce join转为map join

相关文章:

  • MapReduce:中map和reduce的数量设置问题
  • MapReduce: 计数器(Counter)
  • Hive:HiveQL中如何排查数据倾斜问题
  • Java:字符序列:String,StringBuilder,StringBuffer三者的区别
  • Hive:分区和分桶
  • sql:Oracle:驱动表是什么?
  • MapReduce:原理之Word Count 以及Java实现
  • Hive:数据倾斜调优/解决方案总结
  • Spark:对数据倾斜的八种处理方法
  • Spark:spark集群中什么是cpu-core 内核?RDD分区个数?集群的节点个数?及三者与并行度的关系
  • Spark:通过sample算子找出导致数据倾斜的key
  • MapReduce:用通俗易懂的大白话讲解MapReduce原理
  • Spark:任务中如何确定spark分区数、task数目、core个数、worker节点个数、excutor数量
  • JVM :内存初学 (堆(heap)、栈(stack)和方法区(method) ) perfect
  • BI测试
  • [nginx文档翻译系列] 控制nginx
  • 【附node操作实例】redis简明入门系列—字符串类型
  • angular学习第一篇-----环境搭建
  • Apache Spark Streaming 使用实例
  • eclipse的离线汉化
  • Java读取Properties文件的六种方法
  • java中具有继承关系的类及其对象初始化顺序
  • jquery cookie
  • mysql 5.6 原生Online DDL解析
  • React 快速上手 - 06 容器组件、展示组件、操作组件
  • React组件设计模式(一)
  • Redash本地开发环境搭建
  • sublime配置文件
  • vue 个人积累(使用工具,组件)
  • 订阅Forge Viewer所有的事件
  • 理清楚Vue的结构
  • 如何设计一个比特币钱包服务
  • 设计模式走一遍---观察者模式
  • 适配iPhoneX、iPhoneXs、iPhoneXs Max、iPhoneXr 屏幕尺寸及安全区域
  • 我的zsh配置, 2019最新方案
  • 异步
  • 用jQuery怎么做到前后端分离
  • 远离DoS攻击 Windows Server 2016发布DNS政策
  • HanLP分词命名实体提取详解
  • 完善智慧办公建设,小熊U租获京东数千万元A+轮融资 ...
  • ###51单片机学习(1)-----单片机烧录软件的使用,以及如何建立一个工程项目
  • #1015 : KMP算法
  • #NOIP 2014# day.1 生活大爆炸版 石头剪刀布
  • $Django python中使用redis, django中使用(封装了),redis开启事务(管道)
  • ( 10 )MySQL中的外键
  • ( 用例图)定义了系统的功能需求,它是从系统的外部看系统功能,并不描述系统内部对功能的具体实现
  • (06)Hive——正则表达式
  • (1)常见O(n^2)排序算法解析
  • (1/2)敏捷实践指南 Agile Practice Guide ([美] Project Management institute 著)
  • (M)unity2D敌人的创建、人物属性设置,遇敌掉血
  • (笔试题)合法字符串
  • (二)斐波那契Fabonacci函数
  • (仿QQ聊天消息列表加载)wp7 listbox 列表项逐一加载的一种实现方式,以及加入渐显动画...
  • (附源码)springboot人体健康检测微信小程序 毕业设计 012142
  • (附源码)计算机毕业设计SSM基于java的云顶博客系统