Spark Rdd之mapToPair,flatMapToPair
文章目录
- spark.api.java
- mapToPair
- flatMapToPair
- spark.api.scala
spark.api.java
mapToPair
此函数会对一个RDD中的每个元素调用f函数,调用f函数后会进行一定的操作把每个元素都转换成一个<K,V>类型的对象
- 源码
/**
* Return a new RDD by applying a function to all elements of this RDD.
*/
def mapToPair[K2, V2](f: PairFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {
def cm: ClassTag[(K2, V2)] = implicitly[ClassTag[(K2, V2)]]
new JavaPairRDD(rdd.map[(K2, V2)](f)(cm))(fakeClassTag[K2], fakeClassTag[V2])
}
- demo
val rdd: RDD[String] = sc.makeRDD(Seq("java", "scala", "rdd"))
val javaPairRdd: JavaPairRDD[String, Int] = rdd.toJavaRDD().mapToPair(f => {
(f, 1)
})
javaPairRdd.foreach(println)
//(java,1)
//(scala,1)
//(rdd,1)
flatMapToPair
函数相对于mapToPair多了个flatMap的功能,它能够将元素拆分后在转换成键值对
- 源码
/**
* Return a new RDD by first applying a function to all elements of this
* RDD, and then flattening the results.
*/
def flatMapToPair[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {
def fn: (T) => Iterator[(K2, V2)] = (x: T) => f.call(x).asScala
def cm: ClassTag[(K2, V2)] = implicitly[ClassTag[(K2, V2)]]
JavaPairRDD.fromRDD(rdd.flatMap(fn)(cm))(fakeClassTag[K2], fakeClassTag[V2])
}
- demo
val flatMapPairRdd: JavaPairRDD[String, Int] = rdd.toJavaRDD().flatMapToPair(f => {
val arrayList = new util.ArrayList[(String, Int)]()
arrayList.add((f, 1))
arrayList.iterator()
})
flatMapPairRdd.foreach(println)
//(java,1)
//(scala,1)
//(rdd,1)
spark.api.scala
scala 无这两个算子,以下是代替方案(麻烦),建议还是转javaRdd吧
maptoPair = map
flatMapToPair = flatMap + map