当前位置: 首页 > news >正文

Spark 核心编程之 RDD 介绍

一、Spark 分布式计算模拟

Driver 端将数据拆分成 n 个 Task 发送给 Executor,n 为 Executor 个数,Task 包含数据和计算逻辑,Executor 接收到 Task 后进行计算并将计算后的结果返回给 Driver

在这里插入图片描述

  • 定义封装整体数据和逻辑的资源类

    class Resource extends Serializable {// 数据集val datas: List[Int] = List(1, 2, 3, 4)// 计算逻辑val logic: Int => Int = _ * 2
    }
    
  • 定义 Task 类

    class Task extends Serializable {// 持有的数据var data: List[Int] = _// 持有的逻辑var logic: Int => Int = _// 计算方法def compute() = {data.map(logic)}
    }
    
  • 定义 Driver 类

    /*负责与 Executor 通信并将准备好的 Task 发送给 Executor
    */
    object Driver {def main(args: Array[String]): Unit = {// 1.建立与 Executor 的连接val client1 = new Socket("localhost", 8888)val client2 = new Socket("localhost", 9999)// 2.封装 Taskval resource = new Resource()val task1 = new Task()task1.data = resource.datas.take(2)task1.logic = resource.logicval task2 = new Task()task2.data = resource.datas.takeRight(2)task2.logic = resource.logic// 3.发送 Taskval objOut1 = new ObjectOutputStream(client1.getOutputStream)objOut1.writeObject(task1)objOut1.close()objOut1.flush()client1.close()val objOut2 = new ObjectOutputStream(client2.getOutputStream)objOut2.writeObject(task2)objOut2.close()objOut2.flush()client2.close()println("客户端数据发送完毕")}
    }
    
  • 定义两个 Executor 类

    /*负责接收 Driver 发送过来的 Task 并计算出结果
    */
    object Executor1 {def main(args: Array[String]): Unit = {// 1.启动服务并等待客户端连接val server = new ServerSocket(8888)println("服务端[8888]启动,等待客户端连接...")val client = server.accept()// 2.接收 Taskval objIn = new ObjectInputStream(client.getInputStream)val task = objIn.readObject()// 3.执行计算并输出结果val result = task.compute()println("Executor[8888]计算结果为:" + result)objIn.close()client.close()server.close()}
    }
    
    /*负责接收 Driver 发送过来的 Task 并计算出结果
    */
    object Executor2 {def main(args: Array[String]): Unit = {// 1.启动服务并等待客户端连接val server = new ServerSocket(9999)println("服务端[9999]启动,等待客户端连接...")val client = server.accept()// 2.接收 Taskval objIn = new ObjectInputStream(client.getInputStream)val task = objIn.readObject()// 3.执行计算并输出结果val result = task.compute()println("Executor[9999]计算结果为:" + result)objIn.close()client.close()server.close()}
    }
    

二、RDD 介绍

Resilient Distributed Dataset,简称 RDD,弹性分布式数据集

1. 概念

  • RDD 是 Spark 中最基本的数据处理模型,在代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合
  • 弹性的特点:
    • 存储的弹性:内存与磁盘的自动切换
    • 容错的弹性:数据丢失可以自动恢复
    • 计算的弹性:计算出错重试机制
    • 分片的弹性:可根据需要重新分片
  • 分布式:数据存储在大数据集群不同节点上
  • 数据集:RDD 封装了计算逻辑,并不保存数据
  • 数据抽象:RDD 是一个抽象类,需要子类具体实现
  • 不可变:RDD 封装了计算逻辑,是不可以改变的,想要改变,只能产生新的 RDD,在新的 RDD 里面封装计算逻辑
  • 可分区、并行计算

2. 实现原理

2.1 IO 的基本实现原理

IO 的实现体现了装饰者设计模式思想,实现了对类的功能的增强

  • 字节流

    InputStream in = new FileInputStream("filePath");
    int i = -1;
    while((i = in.read()) != -1) { // 每读取一个字节就打印输出一个字节System.out.println(i);
    }
    

    在这里插入图片描述

  • 缓冲字节流

    InputStream in = new BufferedInputStream(new FileInputStream("filePath"));
    int i = -1;
    while((i = in.read()) != -1) { // 每读取一个字节就放进缓存中,当超过缓存阈值就全部输出System.out.println(i);
    }
    

    在这里插入图片描述

  • 字符流

    Reader in = new BufferedReader(new InputStreamReader(new FileInputStream("filePath"), "UTF-8"));String s = null;
    while((s = in.readLine()) != null) { // 每读取一个字节就放入转换区,满足大小就转换成一个字符放入缓存区,超过缓存区阈值就将全部字符输出System.out.println(s);
    }
    

    在这里插入图片描述

2.2 RDD 与 IO 的关系
// 以 wordCount 案例
val lines: RDD[String] = sc.textFile("datas")
val words: RDD[String] = lines.flatMap(_.split(" "))
val wordToOne: RDD[(String, Int)] = words.map((_, 1))
val wordToCount: RDD[(String, Int)] = wordToOne.reduceByKey(_ + _)
val wordArray: Array[(String, Int)] = wordToCount.collect()
wordArray.foreach(println)

在这里插入图片描述

  • RDD 的数据处理方式类似于 IO 流,也包含了装饰者设计模式思想
  • RDD 的数据只有在调用 collect 方法时才会真正地执行业务逻辑操作,之前的操作都是对 RDD 功能的扩展
  • RDD 是不保存数据的,但是 IO 流的缓存区可以临时保存一部分数据

3. 核心属性

  • 分区列表:用于执行任务时并行计算,是实现分布式计算的重要属性

    protected def getPartitions: Array[Partition]
    
  • 分区计算函数:Spark 在计算时,是使用分区函数对每一个分区进行计算

    def compute(split: Partition, context: TaskContext): Iterator[T]
    
  • RDD 之间的依赖关系:RDD 是计算模型的封装,当需求中需要将多个计算模型进行组合时,就需要将多个 RDD 建立依赖关系

    protected def getDependencies: Seq[Dependency[_]] = deps
    
  • 分区器:可选,当数据为 KV 类型数据时,可以通过设定分区器自定义数据的分区

    @transient val partitioner: Option[Partitioner] = None
    
  • 首选位置:可选,计算数据时,可以根据计算节点的状态选择不同的节点位置进行计算,可以判断发送到哪个节点计算效率最优

    protected def getPreferredLocations(split: Partition): Seq[String] = Nil

4. 执行原理

在这里插入图片描述

  • 启动 Yarn 集群环境(ResourceManager 和 NodeManager)
  • Spark 通过申请资源创建调度节点(Driver)和计算节点(Executor)
  • Spark 框架根据需求将计算逻辑根据分区划分成不同的任务(task)
  • 调度节点将任务根据计算节点状态发送到对应的计算节点进行计算
  • 总结:RDD 在整个流程中主要用于将逻辑进行封装,并生成 Task 发送给 Executor 节点执行计算

5. RDD 创建

object TestRDDCreate {def main(args: Array[String]): Unit = {// 1.创建 spark 连接对象val sparkConf = new SparkConf.setMaster("local[*]").setAppName("RDD")val sc = new SparkContext(sparkConf)// 2.创建 RDD// 2.1 从集合(内存)中创建 RDD:parallelize() 和 makeRDD()val seq: Seq[Int] = Seq[Int](1,2,3,4)val rdd1: RDD[Int] = sc.parallelize(seq)val rdd2: RDD[Int] = sc.makeRDD(seq) // 推荐,makeRDD底层实现是调用parallelize方法rdd1.collect.foreach(println)rdd2.collect.foreach(println)println("=================")// 2.2 从外部存储(文件)创建 RDD:本地的文件系统、Hadoop支持的数据集(如HDFS、HBase等)// 2.2.1 使用本地文件的绝对路径或相对路径(相对于项目的根目录)创建val rdd3: RDD[String] = sc.textFile("D:\\data\\1.txt")// val rdd3: RDD[String] = sc.textFile("input/1.txt")// 2.2.2 使用目录创建(读取目录下所有文件)val rdd4 = sc.textFile("D:\\data") // 以行为单位读取,结果是内容字符串val rdd41 = sc.wholeTextFiles("D:\\data") // 以文件为单位读取,结果是二元组,第一个元素为文件路径,第二个元素为文件内容// 2.2.3 使用路径通配符创建val rdd5 = sc.textFile("input/1*.txt")// 2.2.4 使用分布式文件系统路径val rdd6 = sc.textFile("hdfs://hadoop102:8020/data/word.txt")println("=================")// 2.3 从其他 RDD 创建:通过一个 RDD 运算完后,再产生新的 RDD// 2.4 使用 new 直接创建 RDD:一般由 Spark 框架自身使用// 3.关闭连接sc.stop()}
}

6. RDD 并行度与分区

6.1 分区规则
  • RDD 有分区列表属性,可以将一个作业切分成多个任务后,发送给不同的 Executor 节点并行计算,而能够同时计算的任务数量称之为并行度

  • RDD 在创建时可以指定分区个数

    // makeRDD 方法的第二个参数表示分区的数量,其默认值为 defaultParallelism 方法的返回值;
    // defaultParallelism 方法底层最终执行返回的是 scheduler.conf.getInt("spark.default.parallelism", totalCores) 的返回值;
    // 首先会在 SparkConf 中获取 key="spark.default.parallelism" 的值,获取到则返回;如果获取不到则返回 totalCores 的值,即当前运行环境(机器)的最大可用 CPU 核数val rdd1 = sc.makeRDD(List(1,2,3,4), 2)// 将数据保存成对应个数的分区文件
    rdd1.saveAsTextFile("output1") // 目录下有 2 个分区文件// textFile 方法的第二个参数表示最小的分区数量;其默认值为 defaultMinPartitions 方法的返回值;
    // defaultMinPartitions 方法实现为:math.min(defaultParallelism, 2),如果没指定 "spark.default.parallelism" 的值,则最小分区数为 2
    // 可以通过第二个参数指定最小分区数
    // 由于 spark 读取文件底层使用的是 Hadoop 的 TextInputFormat,所以其分区计算使用的是 TextInputFormat 的 getSplits 方法:
    // 首先获取文件的总字节数:totalSize (如 7 字节);再根据指定的最小分区数计算出每个分区存储的字节大小:goalSize = totalSize/(numSplits == 0 ? 1 : numSplits) = 7/2 = 3;再根据 1.1 倍原则判断剩余的大小是否需要创建新分区:7 - 7/3 = 1,由于 1/3 + 1 > 1.1,所以需要创建新分区,即 7/3 + 1 = 3 个分区 
    val rdd2 = sc.textFile("input/1.txt", 2)rdd2.saveAsTextFile("output2") // 目录下有 3 个分区文件
    
  • 建立 Spark 连接时可以指定并行度配置

    // local 表示单核运行;local[n] 表示指定核数运行;local[*] 表示最大核数运行
    val sparkConf = new SparkConf.setMaster("local[*]").setAppName("spark")
    sparkConf.set("spark.default.parallelism", "5")val sc = new SparkContext(sparkConf)
    
6.2 分区数据分配规则
  • makeRDD 创建:集合(内存)数据

    // 以 5 个元素的集合和 3 个分区创建 RDD
    val list = List(1,2,3,4,5)
    val rdd = sc.makeRDD(list, 3)// makeRDD 底层调用 parallelize
    parallelize(list, 3)// parallelize 中创建 ParallelCollectionRDD
    new ParallelCollectionRDD(.., list, 3, ..)// ParallelCollectionRDD 中有核心属性分区列表
    def getPartitions: Array[Partition] = {// 调用 slice 方法slice(list, 3)
    }// slice 方法中有分配的核心方法 positions
    def positions(length: Int, numSlice: Int): Iterator[(Int,Int)] = { // 5, 3(0 until numSlices).iterator.map { // (0, 1, 2)i => { // 0 -- 1 -- 2val start = ((i * length) / numSlices).toInt // 0 -- 1 -- 3val end = (((i + 1) * length) / numSlices).toInt // 1 -- 3 -- 5(start, end) // (0,1) -- (1,3) -- (3,5)}}
    }// slice 中会对数据集进行类型模式匹配判断
    case _ => { // 调用 position 方法再映射positions(list.toArray.length, 3).map { // 5, 3case (start,end) => list.toArray.slice(start, end) // slice(from,until)// (0,1) -> (1,2,3,4,5) -> 1// (1,3) -> (1,2,3,4,5) -> 2,3// (3,5) -> (1,2,3,4,5) -> 4,5}
    }// 结论:3 个分区文件中的数据分配:【1】,【2,3】,【4,5】
    
  • textFile 创建:文件数据

    /**读取的文件内容:1.txt,@表示换行符的位置1@@2@@3
    */
    // 创建文件读取的 RDD
    val rdd = sc.textFile("input/1.txt", 2) // 由于文件为 7 字节,所以分区数为 3,每个分区存储 3 字节// 1.Spark文件读取是使用 hadoop 的方式以行为单位读取,与字节数无关
    // 2.Spark是以偏移量的形式来读取一行数据,且偏移量不会被重复读取
    /*1@@ -> 偏移量:0122@@ -> 偏移量:3453 -> 偏移量:6
    */
    // 3.每个分区所包含的偏移量范围:(起始偏移量 ~ 起始偏移量 + 分区字节数)
    /*分区0:[0 - 0 + 3 = 3]  -> 1@@2分区1:[3 - 3 + 3 = 6]    -> 3分区2:[6 - 6 + 3 = 9]    -> 
    */// 结论:3 个分区文件中的数据分配:【1, 2】,【3】,【】
    

相关文章:

  • 独家首发 | 基于 KAN、KAN卷积的轴承故障诊断模型
  • 【SpringMVC】_简单示例计算器
  • PMP认证与NPDP认证哪个含金量高?
  • Redis 和 Mysql 如何保证两者数据一致性
  • Android下HWC以及drm_hwcomposer普法(下)
  • 2024盘古石初赛(服务器部分)
  • 前后端联调小细节
  • Go 如何通过 Kafka 客户端库 生产与消费消息
  • 2023年西安交通大学校赛(E-雪中楼)
  • 【深度学习】YOLOv10实战:20行代码将笔记本摄像头改装成目标检测监控
  • 实现样式一键切换
  • 超融合架构下,虚拟机高可用机制如何构建?
  • android 应用安装目录
  • 跨境电商多店铺:怎么管理?风险如何规避?
  • 微信小程序-页面配置
  • ES6系统学习----从Apollo Client看解构赋值
  • Otto开发初探——微服务依赖管理新利器
  • RxJS: 简单入门
  • sessionStorage和localStorage
  • spring + angular 实现导出excel
  • ubuntu 下nginx安装 并支持https协议
  • 爱情 北京女病人
  • 产品三维模型在线预览
  • 关于使用markdown的方法(引自CSDN教程)
  • 基于Volley网络库实现加载多种网络图片(包括GIF动态图片、圆形图片、普通图片)...
  • 排序(1):冒泡排序
  • 巧用 TypeScript (一)
  • 十年未变!安全,谁之责?(下)
  • 腾讯大梁:DevOps最后一棒,有效构建海量运营的持续反馈能力
  • 为视图添加丝滑的水波纹
  • Mac 上flink的安装与启动
  • ​【数据结构与算法】冒泡排序:简单易懂的排序算法解析
  • ​html.parser --- 简单的 HTML 和 XHTML 解析器​
  • !!【OpenCV学习】计算两幅图像的重叠区域
  • #我与Java虚拟机的故事#连载19:等我技术变强了,我会去看你的 ​
  • (1)(1.19) TeraRanger One/EVO测距仪
  • (2)(2.4) TerraRanger Tower/Tower EVO(360度)
  • (2024)docker-compose实战 (8)部署LAMP项目(最终版)
  • (DenseNet)Densely Connected Convolutional Networks--Gao Huang
  • (二)基于wpr_simulation 的Ros机器人运动控制,gazebo仿真
  • (附源码)springboot 校园学生兼职系统 毕业设计 742122
  • (附源码)springboot社区居家养老互助服务管理平台 毕业设计 062027
  • (十)T检验-第一部分
  • (十八)devops持续集成开发——使用docker安装部署jenkins流水线服务
  • (十三)Java springcloud B2B2C o2o多用户商城 springcloud架构 - SSO单点登录之OAuth2.0 根据token获取用户信息(4)...
  • (四)React组件、useState、组件样式
  • (转)C语言家族扩展收藏 (转)C语言家族扩展
  • (转)EXC_BREAKPOINT僵尸错误
  • (转)Oracle 9i 数据库设计指引全集(1)
  • (转)Sql Server 保留几位小数的两种做法
  • (转载)Google Chrome调试JS
  • (转载)微软数据挖掘算法:Microsoft 时序算法(5)
  • .h头文件 .lib动态链接库文件 .dll 动态链接库
  • .net core 实现redis分片_基于 Redis 的分布式任务调度框架 earth-frost
  • .NET简谈设计模式之(单件模式)