当前位置: 首页 > news >正文

Spark RDD算子

Spark RDD算子

转换算子(Transformation Operators)

类别算子名称简要介绍
映射类算子map对RDD中的每个元素进行操作,返回一个新的RDD
flatMap类似于map,但每个输入元素可映射到0或多个输出元素
mapPartitions对RDD的每个分区中的元素进行操作,返回一个新的RDD
mapPartitionsWithIndex类似于mapPartitions,但提供了分区索引
mapValues对键值对RDD中的每个值进行操作,返回一个新的键值对RDD
flatMapValues对键值对RDD中的每个值进行操作,每个值可映射到0或多个输出元素
过滤类算子filter过滤出满足条件的元素,返回一个新的RDD
distinct去除重复元素,返回一个新的RDD
分组类算子groupBy根据指定的函数对RDD中的元素进行分组,返回一个新的RDD
groupByKey对键值对RDD中的值进行分组,返回一个新的键值对RDD
reduceByKey对具有相同键的值进行合并,返回一个新的键值对RDD
aggregateByKey使用指定的聚合函数和中间结果进行聚合,返回一个新的键值对RDD
combineByKey类似于aggregateByKey,但允许更灵活的聚合过程
cogroup对两个或多个键值对RDD进行分组,返回一个新的RDD
排序类算子sortByKey根据键对键值对RDD进行排序,返回一个新的键值对RDD
sortBy根据指定的函数对RDD进行排序,返回一个新的RDD
repartitionAndSortWithinPartitions对RDD进行重新分区并在每个分区内排序,返回一个新的RDD
连接类算子union合并两个RDD,返回一个新的RDD
intersection计算两个RDD的交集,返回一个新的RDD
subtract从一个RDD中移除另一个RDD中的元素,返回一个新的RDD
cartesian计算两个RDD的笛卡尔积,返回一个新的RDD
join根据键连接两个键值对RDD,返回一个新的键值对RDD
leftOuterJoin左外连接两个键值对RDD,返回一个新的键值对RDD
rightOuterJoin右外连接两个键值对RDD,返回一个新的键值对RDD
fullOuterJoin全外连接两个键值对RDD,返回一个新的键值对RDD
分区类算子sample随机采样RDD中的元素,返回一个新的RDD
randomSplit随机划分RDD中的元素,返回一个RDD数组
pipe将RDD的每个分区作为外部进程的输入,返回一个新的RDD
coalesce减少RDD的分区数,返回一个新的RDD
repartition增加或减少RDD的分区数,并进行数据洗牌,返回一个新的RDD
partitionBy根据指定的分区器对键值对RDD进行分区,返回一个新的键值对RDD

行动算子(Action Operators)

类别算子名称简要介绍
聚合类算子reduce对RDD中的元素进行归约操作,返回一个结果
fold类似于reduce,但带有初始值
aggregate使用指定的聚合函数和中间结果对RDD进行聚合,返回一个结果
计数类算子count计算RDD中的元素数量,返回一个长整型结果
countByKey计算每个键的出现次数,返回一个键值对Map
countByValue计算每个值的出现次数,返回一个值对Map
收集类算子collect将RDD的所有元素收集到驱动程序,返回一个数组
collectAsMap将键值对RDD的所有元素收集到驱动程序,返回一个Map
take获取RDD的前n个元素,返回一个列表
takeSample随机获取RDD的样本,返回一个列表
takeOrdered获取排序后的RDD的前n个元素,返回一个列表
获取类算子first获取RDD的第一个元素,返回一个元素
top获取排序后的RDD的前n个元素,返回一个列表
遍历类算子foreach对RDD中的每个元素执行操作,不返回结果
foreachPartition对RDD的每个分区中的元素执行操作,不返回结果
保存类算子saveAsTextFile将RDD保存到文本文件中
saveAsSequenceFile将RDD保存为序列文件
saveAsObjectFile将RDD保存为对象文件
saveAsHadoopFile将RDD保存为Hadoop文件
saveAsNewAPIHadoopFile将RDD保存为新API的Hadoop文件
saveAsHadoopDataset将RDD保存为Hadoop数据集

一、转换算子(Transformation Operators)

1、逐条处理

// val rdd2: RDD[U] = rdd.map(f: T=>U)
val data = Array("word1", "word2", "word3")
// 将单词 word 映射为元组 (word, 1)
val rdd: RDD[(String, Int)] = sc.parallelize(data).map(word => (word, 1))

2、扁平化处理

val sentences = List("Hello world", "Apache Spark is awesome", "FlatMap example")
// 将一个句子拆分为多个单词,1=>多,并将所有的结果元素合并为一个新的 RDD
val rdd: RDD[String] = sc.parallelize(sentences).flatMap(word => word.split(" "))

3、分区内逐行处理

【✔ 】:以分区为单位(分区不变)逐行处理数据
map VS mapPartitions
1、数量:前者一进一出IO数量一致,后者多进多出IO数量不一定一致
2、性能:前者多分区逐条处理,后者各分区并行逐条处理更佳,常时间占用内存易导致OOM,内存小不推荐
3、类型:两者入口和出口类型都必须一致,后者进出都必须是迭代器

// 在同一个 mapPartitions 操作中进行过滤和转换操作,可以减少对数据的多次遍历,从而提高性能。
mapParitions( 
// ≈ 子查询
it.filter(...) // 谓词下缀
)// 分别使用 mapPartitions 和 filter 进行转换和过滤操作,增加计算开销。
mapParitions(...)
fielter(...)     // where
// 一个包含整数的 RDD,过滤出偶数并将其乘以 2。
val data = List(1, 2, 3, 4, 5, 6, 7, 8, 9)
val rdd: RDD[Int] = sc.parallelize(data, 3) // 创建3个分区
// 使用 mapPartitions 进行过滤和转换
val resultRDD: RDD[Int] = rdd.mapPartitions { it =>val filtered = it.filter(_ % 2 == 0) // 过滤偶数filtered.map(_ * 2) // 将每个偶数乘以 2
}// 【分区内逐行处理】:以分区为单位(分区不变)逐行处理数据,并追加分区编号
val rdd2: RDD[U] = rdd.mapPartitionsWithIndex(f:(Int,Iterator[T])=>Iterator[U][,preservePar:Boolean])
// mapPartitionsWithIndex 功能同mapPartitions,区别在于追加了分区编号,一般用于去掉表头和分析调试

4、转内存数组

分区的数据转为同类型的内存数组,分区不变 rdd:RDD[T]

// val rdd2: RDD[Array[T]] = rdd.glom();
// 创建一个包含 1 到 100 的整数 RDD,分为 5 个分区
val rdd = sc.parallelize(1 to 100, 5)
// 使用 glom() 将每个分区的元素合并成一个数组
val glommedRDD = rdd.glom()
// 输出每个分区合并后的数组
glommedRDD.collect().foreach { partition =>println("Partition contains: " + partition.mkString(", "))
}

5、数据过滤

过滤规则 f:T=>Boolean,保留满足条件数据,分区不变,不推荐用
【数据可能倾斜】某些分区的数据量远远超过了其他分区,造成数据分布不均匀

// val rdd2: RDD[T] = rdd.filter(f:T=>Boolean)
// 创建一个包含 1 到 10 的整数 RDD
val rdd = sc.parallelize(1 to 10)
// 使用 filter 过滤出偶数
val evenRDD = rdd.filter(num => num % 2 == 0)

6、数据分组

同键同组同区,同区可多组;打乱shuffle,按f:T=>K规则,分区不变,【数据可能倾斜】

// val rdd2: RDD[(K,Iterable[T])] = rdd.groupBy(f:T=>K)
// val rdd2: RDD[(K,Iterable[T])] = rdd.groupBy(f:T=>K, partioner:Partitioner)
// val rdd2: RDD[(K,Iterable[T])] = rdd.groupBy(f:T=>K, numPartitions:Int)// 定义一个自定义分区器,根据奇偶数进行分区
class OddEvenPartitioner(numPartitions: Int) extends Partitioner {override def numPartitions: Int = numPartitionsoverride def getPartition(key: Any): Int = {val k = key.toStringif (k == "even") 0 else 1}
}	
// 创建一个包含 1 到 10 的整数 RDD
val rdd = sc.parallelize(1 to 10)
// 使用 groupBy 方法,根据奇偶数进行分组
val groupedRDD = rdd.groupBy(num => if (num % 2 == 0) "even" else "odd", new OddEvenPartitioner(2))

7、数据抽样

函数名:sample
参数:
withReplacement:Boolean 是否有放回抽样
fraction:Double 抽样率
seed:Long 随机种子,默认为当前时间戳(一般缺省)
无放回抽样: sample(false, 0.4) => 抽样40%的数据,40条左右
有放回抽样: sample(true, 0.4) => 每条数据被抽取的概率为40% (可能有重复的元素)

// val rdd2: RDD[T] = rdd.sample(withReplacement:Boolean, fraction:Double, seed:Long)
// 创建一个包含 1 到 10 的整数 RDD
val rdd = sc.parallelize(1 to 10)
// 使用 sample 方法,从 RDD 中有放回随机抽样抽中每个数概率为0.9 
val sampledRDD = rdd.sample(withReplacement = true, fraction = 0.9)

8、数据去重

采用该方法去重,数据规模比较大的情况下,数据压力比较大,因为数据需要在不同的分区间比较
一般采用分组的方式,将去重字段作为分组字段,在不同的分区内并行去重

val rdd2: RDD[T] = rdd.distinct()
// numPartitions: Int 设定去重后的分区数
// 隐式参数 order: Ordering[T] 用于指定元素类型 T 的排序方式,以便在对元素进行去重时进行比较。
val rdd2: RDD[T] = rdd.distinct(numPartitions: Int)(implicit order: Ording[T] = null)

9、数据排序

处理数据f:T=>K,升降序asc: Boolean,分区数numPartitions:Int
默认排序前后分区一致,【有shuffle】,除非重新设定 numPartitions
全局排序,多分区间交换数据,效率较低。优化见 PairRDD
若:K为基本类型,则无需提供第二参数列表中的隐式参数 ord: Ordering[K]
若:K为自定义类型,则必须提供第二参数

case class Student(stu_id: Int, stu_name: String, stu_class: String)
val stuData = Seq(Student(1, "李明", "ccc"),Student(2, "小刚", "bbb"),Student(3, "小红", "aaa"),Student(4, "张三", "aaa")
)

sortBy

// val rdd: RDD[T] = rdd.sortBy(f:T=>K,asc:Boolean,numPartitions:Int)(implicit ord: Ordering[K], ctag: ClassTag[K])val rdd = sc.parallelize(stuData)
// 使用 sortBy 方法对 RDD 按照 stu_class 进行排序,升序,指定分区数为 2
val sortedRdd: RDD[Student] = rdd.sortBy(stu => stu.stu_class, ascending = true, 2)// 按照班级升序排序,如果班级相同则按照学生 ID 降序
val sortedRdd: RDD[Student] = rdd.sortBy(s => s, ascending = true, numPartitions = 2)(Ordering.by(s => (s.stu_class, -s.stu_id)),ClassTag(classOf[Student])    // 显式地传递了 ClassTag 参数 ClassTag(classOf[Student]),以确保在运行时能获取 Student 类型信息。
)

sortByKey

//  按照班级升序排序,如果班级相同则按照学生 ID 降序
val rdd = sc.parallelize(stuData)
// 将 RDD 转换为键值对 RDD,其中键是 (stu_class, -stu_id)
val pairRDD: RDD[((String, Int), Student)] = rdd.map(student => ((student.stu_class, -student.stu_id), student))
// 使用 sortByKey 方法对键进行排序
val sortedPairRDD: RDD[((String, Int), Student)] = pairRDD.sortByKey()
// 取出排序后的 Student 对象
val sortedRDD: RDD[Student] = sortedPairRDD.map(_._2)

10、交并补差

多个类型 RDD[T]:纵向
交并差操作:数据类型一致,根据元素 equals 认定是否相同
【自定义类型】:必须重写 equals 方法,因为默认等值判断 == 判断地址
拉链操作:要求分区数和分区内的数据量一致

val rdd1: RDD[Int] = sc.makeRDD(Seq(1, 3, 4, 6, 4, 3, 33, 31))
val rdd2: RDD[Int] = sc.makeRDD(Seq(2, 2, 4, 5, 1, 22, 3, 1))
// 【求交集】:重载可重设分区数numPartitions: Int,或定义分区规则par: Partitioner[T]
val rdd3: RDD[T] = rdd1.intersection(rdd2:RDD[T])
val rdd3: RDD[T] = rdd1.intersection(rdd2:RDD[T], numPartitions:Int)
val rdd3: RDD[T] = rdd1.intersection(rdd2:RDD[T], par:Partitioner[T])
// 【求并集】:不去重
val rdd3: RDD[T] = rdd1.union(rdd2:RDD[T])
// 【求差集】:重载可重设分区数numPartitions:Int,或定义分区规则par:Partitioner[T]
val rdd3: RDD[T] = rdd1.subtract(rdd2:RDD[T])
val rdd3: RDD[T] = rdd1.subtract(rdd2:RDD[T], numPartitions:Int)
val rdd3: RDD[T] = rdd1.subtract(rdd2:RDD[T], par:Partitioner[T])

11、拉链操作

val rdd1: RDD[Int] = sc.makeRDD(Seq(1, 3, 4, 6, 4, 3, 33, 31))
val rdd2: RDD[Int] = sc.makeRDD(Seq(2, 2, 4, 5, 1, 22, 3, 1))val rdd2: RDD[(T,U)] = rdd1.zip(rdd2:RDD[U])
val rdd2: RDD[(T,Long)] = rdd1.zipWithIndex()
val rdd2: RDD[(T,Long)] = rdd1.zipWithUniqueId()
// 有三个重载:1+1,1+2,1+3
val rdd2: RDD[V]=rdd.zipPartitions(rddA:RDD[A])(f:(Iterator[T],Iterator[A])=>Iterator[V])
val rdd2: RDD[V]=rdd.zipPartitions(rddA:RDD[A],preserveParitioning:Boolean)(f:(Iterator[T],Iterator[A])=>Iterator[V])

12、键值映射类算子

map

case class Student(stu_id: Int, stu_name: String, stu_class: String)
val stuData = Seq(Student(1, "李明", "ccc"),Student(2, "小刚", "bbb"),Student(3, "小红", "aaa"),Student(4, "张三", "aaa")
)
val rdd = sc.parallelize(stuData)// 使用 map 对 RDD 中的每个 Student 对象的 stu_id 增加 1
val resultRdd1 = rdd.map(stu => stu.copy(stu_id = stu.stu_id + 1))// Student 对象转换为一个包含 (stu_id, stu_name) 元组的 RDD
val resultRdd2 = rdd.map(stu => (stu.stu_id, stu.stu_name))

mapPartitions

val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5, 6), numSlices = 2)
val resultRDD = rdd.mapPartitions(iter => {// 对每个分区的元素求和val sum = iter.sum// 返回求和结果作为新的分区Iterator(sum)
})
resultRDD.collect().foreach(println)	// 输出为6 15

13、键值分组聚合类

reduceByKey

// reduceByKey + foldByKey + aggregateByKey 都调用 combineByKeyClassTag
// 【按键聚合值】: combiner和reduce的值类型相同,计算规则相同
// group by + combiner + reduce
val pairRdd2:RDD[(K,V)] = pairRdd.reduceByKey(f:(V,V)=>V)
val pairRdd2:RDD[(K,V)] = pairRdd.reduceByKey(f:(V,V)=>V, numPartitions:Int)
val pairRdd2:RDD[(K,V)] = pairRdd.reduceByKey(partitioner:Partitioner, f:(V,V)=>V)val scoresRDD = sc.parallelize(Seq(("Alice", 80),("Bob", 90),("Alice", 85),("Bob", 95),("Alice", 75)
))
val totalScoresRDD = scoresRDD.reduceByKey((score1, score2) => score1 + score2)
// 简写 val totalScoresRDD = scoresRDD.reduceByKey(_+_)
totalScoresRDD.collect().foreach(println)	// 输出(Alice,240) (Bob,185)

foldByKey

// 【按键聚合值】: combiner和reduce的值类型相同,计算规则相同,带初值
val pairRdd2:RDD[(K,V)] = pairRdd.foldByKey(initV:V)(inParOp:(V,V)=>V)
val pairRdd2:RDD[(K,V)] = pairRdd.foldByKey(initV:V,numPartitions:Int)(inParOp:(V,V)=>V)
val pairRdd2:RDD[(K,V)] = pairRdd.foldByKey(initV:V,partitioner:Partitioner)(inParOp:(V,V)=>V)
// 数据同上 reduceByKey
val initialScores = 0
val totalScoresRDD = scoresRDD.foldByKey(initialScores)((score1, score2) => score1 + score2)

aggregateByKey

// 与 reduceByKey 和 foldByKey 不同,aggregateByKey 允许指定一个初始值和两个聚合函数,可以对每个键的值进行更复杂的聚合操作。
// 【✔ 按键分别执行分区内和分区间计算】: combiner和reduce的值类型可不同,计算规则可不同
val pairRdd2:RDD[(K,U)] = pairRdd.aggregateByKey(initV:U)(inParOp:(U,V)=>U,betParOp:(U,U)=>U)
val pairRdd2:RDD[(K,U)] = pairRdd.aggregateByKey(initV:U,numPartitions:Int)(inParOp:(U,V)=>U,betParOp:(U,U)=>U)
val pairRdd2:RDD[(K,U)] = pairRdd.aggregateByKey(initV:U,partitioner:Partitioner)(inParOp:(U,V)=>U,betParOp:(U,U)=>U)
val rddPair2: RDD[(String, Float)] = rddPair.aggregateByKey(0.0f)(_+_,_+_)val resultRDD = pairRdd.aggregateByKey(zeroValue)((acc, value) => {// 对每个分区中的值进行聚合操作// 初始值为 zeroValue// 返回值将作为每个分区聚合的结果// 这个函数在每个分区内执行// acc 是聚合的累加器,value 是当前处理的值// 返回一个新的累加器},(acc1, acc2) => {// 对多个分区的聚合结果进行合并操作// acc1 和 acc2 是两个分区的聚合结果// 返回合并后的结果}
)
val scoresRDD = sc.parallelize(Seq(("Alice", 80),("Bob", 90),("Alice", 85),("Bob", 95),("Alice", 75)
))
val zeroValue = (0, 0) // 初始值为元组 (sum, count)
val resultRDD = scoresRDD.aggregateByKey(zeroValue)((acc, score) => {// 分区内聚合函数,计算每个学生的总成绩和数量(acc._1 + score, acc._2 + 1)},(acc1, acc2) => {// 分区间聚合函数,合并每个学生的总成绩和数量(acc1._1 + acc2._1, acc1._2 + acc2._2)}
)
// 每个学生的总成绩和数量
resultRDD.foreach(println)	// (Alice,(240,3)) (Bob,(185,2))

combineByKey

// 【按键分别执行分区内和分区间计算】: combiner和reduce的值类型可不同,计算规则可不同
val pairRdd2:RDD[(K,U)] = pairRdd.combineByKey(initV:V=>U,inParOp:(U,V)=>U,betParOp:(U,U)=>U)
val pairRdd2:RDD[(K,U)] = pairRdd.combineByKey(initV:V=>U,inParOp:(U,V)=>U,betParOp:(U,U)=>U,numPartitions:Int)
val pairRdd2:RDD[(K,U)] = pairRdd.combineByKey(initV:V=>U,inParOp:(U,V)=>U,betParOp:(U,U)=>U,partitioner:Partitioner,mapSideCombine:Boolean,serializer:Serializer)val resultRDD = pairRDD.combineByKey((value) => {// 创建组合器函数,用于将每个值转换为另一种形式(通常是一个累加器或缓冲区)// 返回值将作为每个键的第一个值的处理结果},(acc, value) => {// 合并值函数,用于将每个值与累加器或缓冲区进行合并// 返回值将作为每个键的聚合结果},(acc1, acc2) => {// 合并累加器函数,用于合并两个累加器或缓冲区// 返回值将作为多个分区的聚合结果}
)
val scoresRDD = sc.parallelize(Seq(("Alice", 80),("Bob", 90),("Alice", 85),("Bob", 95),("Alice", 75)
))val resultRDD = scoresRDD.combineByKey((score) => (score, 1), // 创建组合器,初始值为 (成绩, 1),表示总成绩和数量(acc, score) => (acc._1 + score, acc._2 + 1), // 合并值,累加总成绩和数量(acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2) // 合并累加器,合并总成绩和数量
)// 计算平均成绩
val avgScoresRDD = resultRDD.mapValues(sumCount => sumCount._1.toDouble / sumCount._2)avgScoresRDD.collect().foreach(println)

groupByKey

// groupByKey 会将具有相同键的元素分组在一起,但是这个操作可能会导致数据倾斜,因为它会将所有具有相同键的元素放在同一个分区中。
// 【✔ 按键分组】
val pairRdd2: RDD[(K, Iterable[V])] = pairRdd.groupByKey()
val pairRdd2: RDD[(K, Iterable[V])] = pairRdd.groupByKey(numPartitions:Int)
val pairRdd2: RDD[(K, Iterable[V])] = pairRdd.groupByKey(partitioner:Partitioner)val scoresRDD = sc.parallelize(Seq(("Alice", 80),("Bob", 90),("Alice", 85),("Bob", 95),("Alice", 75)
))val groupedRDD = scoresRDD.groupByKey()groupedRDD.collect().foreach { case (name, scores) =>println(s"$name: ${scores.mkString(", ")}")
}

14、关联聚合

groupWith

// groupWith 对两个 RDD 中具有相同键的元素进行分组,返回一个新的 RDD,其中每个键对应的值是一个包含两个 RDD 中相同键的元素组成的元组。
// 【多数据集分组】:1VN 同键同组,不同RDD值进入TupleN的不同Iterable
-------------------------------------------------------------------------------
val pairRdd2: RDD[(K, (Iterable[V],Iterable[V1])] = pairRdd.groupWith(otherA: RDD[(K,V1)])
val pairRdd2: RDD[(K, (Iterable[V],Iterable[V1],Iterable[V2])] = pairRdd.groupWith(otherA: RDD[(K,V1)],otherB: RDD[(K,V2)])
val pairRdd2: RDD[(K, (Iterable[V],Iterable[V1],Iterable[V2],Iterable[V3])] = pairRdd.groupWith(otherA: RDD[(K,V1)],otherB: RDD[(K,V2)],otherC: RDD[(K,V3)])val scoresRDD1 = sc.parallelize(Seq(("Alice", 80),("Bob", 90),("Alice", 85),("Bob", 95),("Alice", 75)
))val scoresRDD2 = sc.parallelize(Seq(("Alice", "Good"),("Bob", "Excellent"),("Alice", "Good"),("Bob", "Excellent"),("Alice", "Good")
))val groupedRDD = scoresRDD1.groupWith(scoresRDD2)groupedRDD.collect().foreach { case (name, (scores1, scores2)) =>println(s"$name: (${scores1.mkString(", ")}) | (${scores2.mkString(", ")})")
}

cogroup

// 重载 1+1 1+2 1+3,追加再分区操作
val pairRdd2: RDD[(K, (Iterable[V],Iterable[V1])] = pairRdd.cogroup(otherA: RDD[(K,V1)])
val pairRdd2: RDD[(K, (Iterable[V],Iterable[V1])] = pairRdd.cogroup(otherA: RDD[(K,V1)],numPartitions:Int)
val pairRdd2: RDD[(K, (Iterable[V],Iterable[V1])] = pairRdd.cogroup(otherA: RDD[(K,V1)],partitioner:Partitioner)
// 示例同groupWith,两者区别:cogroup可对分组后的元素进行更复杂的操作,如在分组后对每个键的值进行聚合计算。

15、关联操作

【关联操作】:1V1
横向,根据键做关联
重载:numPartitions:Int 或 partitioner:Partitioner

val pairRdd: RDD[(K, (V, V1))] = pairRdd1.join(pairRdd3:RDD[(K,V1)])
val pairRdd: RDD[(K, (V, Option[V1]))] = pairRdd1.leftOuterJoin(pairRdd3:RDD[(K,V1)])
val pairRdd: RDD[(K, (Option[V]), V1)] = pairRdd1.rightOuterJoin(pairRdd3:RDD[(K,V1)])
val pairRdd: RDD[(K, (Option[V]), Option[V1])] = pairRdd1.fullOuterJoin( pairRdd3:RDD[(K,V1)])// 创建两个 RDD
val rdd1 = sc.parallelize(Seq((1, "Alice"),(2, "Bob"),(3, "Charlie")
))
val rdd2 = sc.parallelize(Seq((1, 25),(3, 30),(4, 35)
))
// join:内连接
val innerJoinRDD = rdd1.join(rdd2)
// leftOuterJoin:左外连接
val leftJoinRDD = rdd1.leftOuterJoin(rdd2)
// rightOuterJoin:右外连接
val rightJoinRDD = rdd1.rightOuterJoin(rdd2)
// fullOuterJoin:全外连接
val fullOuterJoinRDD = rdd1.fullOuterJoin(rdd2)

二、行动算子(Action Operators)

【返回】所有元素分别在分区间和分区内执行【聚集】操作的结果
reduce & fold 分区内和分区间执行相同操作,且类型与元素类型一致
aggregate 分区内和分区间执行不同操作,且类型与元素类型不一致

val rst:T = rdd.reduce(f:(T,T)=>T)
val rst:T = rdd.fold(init:T)(f:(T,T)=>T)
val rst:U = rdd.aggregate(init:U)(f:(U,T)=>U,f:(U,T)=>U)
val array:Array[T] = rdd.collect()	// 返回包含数据集中所有元素的数组val rst:Long = rdd.count()	// 返回数据集中元素数量val rst:Map[K,Long] = pairRdd.countByKey()	// 返回一个包含键值对的 Map,其中键是 RDD 中的键,值是对应键出现的次数。val rst:T = rdd.max()	// 返回数据集中最大值val rst:T = rdd.min()	// 返回数据集中最小值val rst:T = rdd.first()	// 返回数据集中的第一个元素val array:Array[T] = rdd.take(num:Int)	// 返回数据集中的前 num 个元素val array:Array[T] = rdd.takeOrdered(num:Int)(implicit ord:Ordering[T])	// 返回排序后数据集中的前 num 个元素rdd.foreach(f:T=>Unit)	// 遍历迭代
rdd.foreachPartition(f:Iterable[T]=>Unit) // 写数据库操作首选

持久化至文本文件,重载追加压缩功能

import org.apache.hadoop.io.compress.{BZip2Codec, SnappyCodec}
import io.airlift.compress.lzo.LzopCodec
rdd.saveAsTextFile("out_path",classOf[BZip2Codec])
rdd.saveAsTextFile(path:String)
rdd.saveAsTextFile(path:String,codec: Class[_ <: CompressionCodec])
rdd.saveAsObjectFile(path:String)

三、Spark RDD 并行度控制

默认的并行度:200

分区数的体现方式

  • 分区数 numPartitions | numSlices: Int

    • 示例:val rdd = sc.parallelize(data, numPartitions = 5)

    • 分区逻辑

      partitionIndex = fieldName.hashCode() % numPartitions
      
    • 扩展随机字段0 ~ numPartitions

  • 分区器 partitioner: Partitioner (针对键值对 RDD)

    • 默认的分区器HashPartitioner
再分区算子

用于对数据进行重新分配。

  • coalescecoalesce(numPartitions: Int, shuffle: Boolean)
    • 用于减少分区数,通常在不需要洗牌时使用。
    • 示例:val coalescedRDD = rdd.coalesce(2, shuffle = false)
  • repartitionrepartition(numPartitions: Int)
    • 等同于 coalesce(numPartitions, true),用于增加或减少分区数,并进行数据洗牌。
    • 示例:val repartitionedRDD = rdd.repartition(4)

相关文章:

  • 代码随想录算法训练营第三十一天| 455.分发饼干,376. 摆动序列 ,53. 最大子序和
  • 10进制与二、八、十六进制的转换
  • Day25 首页待办事项及备忘录添加功能
  • MFC 使用sapi文字转换为语音
  • 跨域、JSONP、CORS、Spring、Spring Security解决方案
  • maven的install不报错但deploy到nexus报400错误
  • AI大模型探索之路-实战篇16:优化决策流程:Agent智能数据分析平台中Planning功能实践
  • Python 中的装饰器及其作用
  • Oracle数据库连接并访问Microsoft SQL Server数据库
  • Apple开发者macOS设备与描述文件Profile创建完整过程
  • 协程库——面试问题
  • 大模型Prompt-Tuning技术进阶
  • 中介子方程十一
  • 异常(Exception)
  • 【VUE3 element时间选择器默认选择七天】
  • 3.7、@ResponseBody 和 @RestController
  • js正则,这点儿就够用了
  • Phpstorm怎样批量删除空行?
  • Python进阶细节
  • ReactNativeweexDeviceOne对比
  • Spark RDD学习: aggregate函数
  • vagrant 添加本地 box 安装 laravel homestead
  • Vim Clutch | 面向脚踏板编程……
  • Vue.js 移动端适配之 vw 解决方案
  • 道格拉斯-普克 抽稀算法 附javascript实现
  • 复杂数据处理
  • 给初学者:JavaScript 中数组操作注意点
  • 诡异!React stopPropagation失灵
  • 模仿 Go Sort 排序接口实现的自定义排序
  • 配置 PM2 实现代码自动发布
  • 区块链将重新定义世界
  • 驱动程序原理
  • 小程序01:wepy框架整合iview webapp UI
  • 在 Chrome DevTools 中调试 JavaScript 入门
  • NLPIR智能语义技术让大数据挖掘更简单
  • (+3)1.3敏捷宣言与敏捷过程的特点
  • (10)Linux冯诺依曼结构操作系统的再次理解
  • (day 12)JavaScript学习笔记(数组3)
  • (DenseNet)Densely Connected Convolutional Networks--Gao Huang
  • (windows2012共享文件夹和防火墙设置
  • (附源码)spring boot北京冬奥会志愿者报名系统 毕业设计 150947
  • (函数)颠倒字符串顺序(C语言)
  • (四)【Jmeter】 JMeter的界面布局与组件概述
  • (四)图像的%2线性拉伸
  • (一)80c52学习之旅-起始篇
  • (一一四)第九章编程练习
  • (转)微软牛津计划介绍——屌爆了的自然数据处理解决方案(人脸/语音识别,计算机视觉与语言理解)...
  • .a文件和.so文件
  • .net打印*三角形
  • :=
  • @Autowired多个相同类型bean装配问题
  • @html.ActionLink的几种参数格式
  • [ 云计算 | Azure 实践 ] 在 Azure 门户中创建 VM 虚拟机并进行验证
  • [12] 使用 CUDA 进行图像处理
  • [240621] Anthropic 发布了 Claude 3.5 Sonnet AI 助手 | Socket.IO 拒绝服务漏洞