当前位置: 首页 > news >正文

大数据-91 Spark 集群 RDD 编程-高阶 RDD广播变量 RDD累加器 Spark程序优化

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(正在更新!)

章节内容

上节完成的内容如下:

  • RDD容错机制
  • RDD分区机制
  • RDD分区器
  • RDD自定义分区器

在这里插入图片描述

广播变量

基本介绍

有时候需要在多个任务之间共享变量,或者在任务(Task)和 Driver Program 之间共享变量。
为了满足这个需求,Spark提供了两种类型的变量。

  • 广播变量(broadcast variable)
  • 累加器(accumulators)
    广播变量、累加器的主要作用是为了优化Spark程序。

广播变量将变量在节点的Executor之间进行共享(由Driver广播),广播变量用来高效分发较大的对象,向所有工作节点(Executor)发送一个较大的只读值,以供一个或多个操作使用。

使用广播变量的过程如下:

  • 对一个类型T的对象调用SparkContext.broadcast创建一个Broadcast[T]对象,任何可序列化的类型都可以这么实现(在Driver端)
  • 通过Value属性访问该对象的值(Executor中)
  • 变量只会被分到各个Executor一次,作为只读值处理

在这里插入图片描述
广播变量的相关参数:

  • spark.broadcast.blockSize(缺省值: 4m)
  • spark.broadcast.checksum(缺省值:true)
  • spark.broadcast.compree(缺省值:true)

变量应用

普通JOIN

在这里插入图片描述

MapSideJoin

在这里插入图片描述

生成数据 test_spark_01.txt

1000;商品1
1001;商品2
1002;商品3
1003;商品4
1004;商品5
1005;商品6
1006;商品7
1007;商品8
1008;商品9

生成数据格式如下:
在这里插入图片描述

生成数据 test_spark_02.txt

10000;订单1;1000
10001;订单2;1001
10002;订单3;1002
10003;订单4;1003
10004;订单5;1004
10005;订单6;1005
10006;订单7;1006
10007;订单8;1007
10008;订单9;1008

生成的数据格式如下:
在这里插入图片描述

编写代码1

我们编写代码进行测试

package icu.wzkimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object JoinDemo {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("JoinDemo").setMaster("local[*]")val sc = new SparkContext(conf)sc.hadoopConfiguration.setLong("fs.local.block.size", 128 * 1024 * 1024)val productRDD: RDD[(String, String)] = sc.textFile("data/test_spark_01.txt").map {line => val fields = line.split(";")(fields(0), line)}val orderRDD: RDD[(String, String)] = sc.textFile("data/test_spark_02.txt", 8).map {line => val fields = line.split(";")(fields(2), line)}val resultRDD = productRDD.join(orderRDD)println(resultRDD.count())Thread.sleep(100000)sc.stop()}}

编译打包1

mvn clean package

并上传到服务器,准备运行
在这里插入图片描述

运行测试1

spark-submit --master local[*] --class icu.wzk.JoinDemo spark-wordcount-1.0-SNAPSHOT.jar

提交任务并执行,注意数据的路径,查看下图:
在这里插入图片描述
运行结果可以查看到,运行了: 2.203100 秒 (取决于你的数据量的多少)
在这里插入图片描述

2024-07-19 10:35:08,808 INFO  [main] scheduler.DAGScheduler (Logging.scala:logInfo(54)) - Job 0 finished: count at JoinDemo.scala:32, took 2.203100 s
200

编写代码2

接下来,我们对比使用 MapSideJoin 的方式

package icu.wzkimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object MapSideJoin {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("MapSideJoin").setMaster("local[*]")val sc = new SparkContext(conf)sc.hadoopConfiguration.setLong("fs.local.block.size", 128 * 1024 * 1024)val productRDD: RDD[(String, String)] = sc.textFile("data/test_spark_01.txt").map {line => val fields = line.split(";")(fields(0), line)}val productBC = sc.broadcast(productRDD.collectAsMap())val orderRDD: RDD[(String, String)] = sc.textFile("data/test_spark_02.txt").map {line => val fields = line.split(";")(fields(2), line)}val resultRDD = orderRDD.map {case (pid, orderInfo) =>val productInfo = productBC.value(pid, (orderInfo, productInfo.getOrElse(pid, null)))}println(resultRDD.count())sc.stop()}}

编译打包2

mvn clean package

编译后上传到服务器准备执行:
在这里插入图片描述

运行测试2

spark-submit --master local[*] --class icu.wzk.MapSideJoin spark-wordcount-1.0-SNAPSHOT.jar

启动我们的程序,并观察结果
在这里插入图片描述
我们可以观察到,这次只用了 0.10078 秒就完成了任务:
在这里插入图片描述

累加器

基本介绍

累加器的作用:可以实现一个变量在不同的Executor端能保持状态的累加。
累加器在Driver端定义、读取,在Executor中完成累加。
累加器也是Lazy的,需要Action触发:Action触发一次,执行一次;触发多次,执行多次。

Spark内置了三种类型的累加器,分别是:

  • LongAccumulator 用来累加整数型
  • DoubleAccumulator 用来累加浮点型
  • CollectionAccumulator 用来累加集合元素

运行测试

我们可以在 SparkShell 中进行一些简单的测试,目前我在 h122 节点上,启动SparkShell

spark-shell --master local[*]

启动的主界面如下:
在这里插入图片描述
写入如下的内容进行测试:

val data = sc.makeRDD("hadoop spark hive hbase java scala hello world spark scala java hive".split("\\s+"))
val acc1 = sc.longAccumulator("totalNum1")
val acc2 = sc.doubleAccumulator("totalNum2")
val acc3 = sc.collectionAccumulator[String]("allwords")

我们进行测试的结果如下图所示:
在这里插入图片描述
继续编写一段进行测试:

val rdd = data.map{word => acc1.add(word.length); acc2.add(word.length); acc3.add(word); word}
rdd.count
rdd.collectprintln(acc1.value)
println(acc2.value)
println(acc3.value)

我们进行测试的结果如下:
在这里插入图片描述

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • FastAPI vs Flask: 专业对比与选择
  • 使用 setResponseStatus 函数设置响应状态码
  • Prometheus Operator部署管理
  • SQL每日一题-0823(难度提升题)
  • SpringCloud Gateway及 Springboot 服务 跨域配置
  • gcc: leaf function/non-leaf function;末节函数,叶子函数
  • conda虚拟环境中安装cuda和cudnn
  • Milvus向量数据库-磁盘索引简介
  • HTTP 414错误问题
  • 第三课《排序》
  • 【html+css 绚丽Loading】 10个Loading合集(1)
  • 如何利用命令模式实现一个手游后端架构
  • 通过主成分分析实现检测金融中的异常交易模式
  • 学习node.js 十 redis的基本语法
  • Dashboard Interface 应用
  • ES6指北【2】—— 箭头函数
  • hexo+github搭建个人博客
  • 【跃迁之路】【735天】程序员高效学习方法论探索系列(实验阶段492-2019.2.25)...
  • 2019.2.20 c++ 知识梳理
  • android图片蒙层
  • Docker 笔记(1):介绍、镜像、容器及其基本操作
  • github从入门到放弃(1)
  • Java 11 发布计划来了,已确定 3个 新特性!!
  • Java-详解HashMap
  • Magento 1.x 中文订单打印乱码
  • opencv python Meanshift 和 Camshift
  • Redis中的lru算法实现
  • select2 取值 遍历 设置默认值
  • Spring核心 Bean的高级装配
  • 阿里云应用高可用服务公测发布
  • 湖南卫视:中国白领因网络偷菜成当代最寂寞的人?
  • 使用 Node.js 的 nodemailer 模块发送邮件(支持 QQ、163 等、支持附件)
  • 一个SAP顾问在美国的这些年
  • hi-nginx-1.3.4编译安装
  • ​secrets --- 生成管理密码的安全随机数​
  • ​埃文科技受邀出席2024 “数据要素×”生态大会​
  • # include “ “ 和 # include < >两者的区别
  • #define与typedef区别
  • #我与Java虚拟机的故事#连载13:有这本书就够了
  • ${ }的特别功能
  • (Demo分享)利用原生JavaScript-随机数-实现做一个烟花案例
  • (第三期)书生大模型实战营——InternVL(冷笑话大师)部署微调实践
  • (附源码)python房屋租赁管理系统 毕业设计 745613
  • (七)glDrawArry绘制
  • (十八)用JAVA编写MP3解码器——迷你播放器
  • (贪心) LeetCode 45. 跳跃游戏 II
  • (未解决)jmeter报错之“请在微信客户端打开链接”
  • (已解决)报错:Could not load the Qt platform plugin “xcb“
  • (轉貼) 蒼井そら挑戰筋肉擂台 (Misc)
  • ... 是什么 ?... 有什么用处?
  • .[hudsonL@cock.li].mkp勒索加密数据库完美恢复---惜分飞
  • .net core + vue 搭建前后端分离的框架
  • .Net Core webapi RestFul 统一接口数据返回格式
  • .NET Core WebAPI中使用swagger版本控制,添加注释
  • .Net Remoting(分离服务程序实现) - Part.3