当前位置: 首页 > news >正文

Spark算子实战Java版,学到了

(一)概述

算子从功能上可以分为Transformations转换算子和Action行动算子。转换算子用来做数据的转换操作,比如map、flatMap、reduceByKey等都是转换算子,这类算子通过懒加载执行。行动算子的作用是触发执行,比如foreach、collect、count等都是行动算子,只有程序运行到行动算子时,转换算子才会去执行。

本文将介绍开发过程中常用的转换算子和行动算子,Spark代码基于Java编写,前置代码如下:

public class SparkTransformationTest {
    public static void main(String[] args) {
        // 前置准备
        SparkConf conf = new SparkConf();
        conf.setMaster("local[1]");
        conf.setAppName("SPARK ES");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> javaRdd = sc.parallelize(Arrays.asList("a", "b", "c", "d", "e"));
    }
}

(二)转换算子

map

map(func):通过函数func传递的每个元素,返回一个新的RDD。

JavaRDD<Object> map = javaRdd.map((Function<String, Object>) 
        item -> "new" + item);
map.foreach(x -> System.out.println(x));

返回一个新的RDD,数据是newa、newb、newc、newd、newe

filter

filter(func):筛选通过func处理后返回 true 的元素,返回一个新的RDD。

JavaRDD<String> filter = javaRdd.filter(item -> item.equals("a") || item.equals("b"));
filter.foreach(x -> System.out.println(x));

返回的新RDD数据是a和b。

flatMap

flatMap(func):类似于 map,但每个输入项可以映射到 0 个或更多输出项。

JavaRDD<String> javaRdd = sc.parallelize(Arrays.asList("a,b", "c,d,e", "f,g"));
JavaRDD<String> flatMap = javaRdd.flatMap((FlatMapFunction<String, String>) 
        s -> Arrays.asList(s.split(",")).iterator());
flatMap.foreach(x -> System.out.println(x));

入参只有3个,经过flatMap映射后返回了长度为7的RDD。

mapPartitions

mapPartitions(func):类似于map,但该函数是在RDD每个partition上单独运行,因此入参会是Iterator<Object>

JavaRDD<String> mapPartitions = javaRdd.mapPartitions((FlatMapFunction<Iterator<String>, String>) stringIterator -> {
    ArrayList<String> list = new ArrayList<>();
    while (stringIterator.hasNext()) {
        list.add(stringIterator.next());
    }
    return list.iterator();
});
mapPartitions.foreach(x -> System.out.println(x));

除了是对Iterator进行处理之外,其他的都和map一样。

union

union(otherDataset):返回一个新数据集,包含两个数据集合的并集。

JavaRDD<String> newJavaRdd = sc.parallelize(Arrays.asList("1", "2", "3", "4", "5"));
JavaRDD<String> unionRdd = javaRdd.union(newJavaRdd);
unionRdd.foreach(x-> System.out.println(x));

intersection

intersection(otherDataset):返回一个新数据集,包含两个数据集合的交集。

JavaRDD<String> newJavaRdd = sc.parallelize(Arrays.asList("a", "b", "3", "4", "5"));
JavaRDD<String> intersectionRdd = javaRdd.intersection(newJavaRdd);
intersectionRdd.foreach(x-> System.out.println(x));

groupByKey

groupByKey ([ numPartitions ]):在 (K, V) 对的数据集上调用时,返回 (K, Iterable) 对的数据集,可以传递一个可选numPartitions参数来设置不同数量的任务。

这里需要了解Java中的另外一种RDD,JavaPairRDD。JavaPairRDD是一种key-value类型的RDD,groupByKey就是针对JavaPairRDD的API。

JavaRDD<String> rdd = sc.parallelize(Arrays.asList("a:1", "a:2", "b:1", "c:3"));
JavaPairRDD<String, Integer> javaPairRDD = rdd.mapToPair(s -> {
    String[] split = s.split(":");
    return new Tuple2<>(split[0], Integer.parseInt(split[1]));
});
JavaPairRDD<String, Iterable<Integer>> groupByKey = javaPairRDD.groupByKey();
groupByKey.foreach(x -> System.out.println(x._1()+x._2()));

最终输出结果:

a[1, 2]
b[1]
c[3]

reduceByKey

reduceByKey(func, [numPartitions]):在 (K, V) 对的数据集上调用时,返回 (K, V) 对的数据集,其中每个键的值使用给定的 reduce 函数func聚合。和groupByKey不同的地方在于reduceByKey对value进行了聚合处理。

JavaRDD<String> rdd = sc.parallelize(Arrays.asList("a:1", "a:2", "b:1", "c:3"));
JavaPairRDD<String, Integer> javaPairRDD = rdd.mapToPair(s -> {
    String[] split = s.split(":");
    return new Tuple2<>(split[0], Integer.parseInt(split[1]));
});
JavaPairRDD<String, Integer> reduceRdd = javaPairRDD.reduceByKey((Function2<Integer, Integer, Integer>) (x, y) -> x + y);
reduceRdd.foreach(x -> System.out.println(x._1()+x._2()));

最终输出结果:

a3
b1
c3

aggregateByKey

aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions]):aggregateByKey这个算子相比上面这些会复杂很多,主要参数有zeroValue、seqOp、combOp,numPartitions可选。

zeroValue是该算子设置的初始值,seqOp函数是将rdd中的value值和zeroValue进行处理,combOp是将相同key的数据进行处理。

JavaRDD<String> rdd = sc.parallelize(Arrays.asList("a:1", "a:2", "b:1", "c:3"));
JavaPairRDD<String, Integer> javaPairRDD = rdd.mapToPair(s -> {
    String[] split = s.split(":");
    return new Tuple2<>(split[0], Integer.parseInt(split[1]));
});
JavaPairRDD<String, Integer> aggregateRdd = javaPairRDD.aggregateByKey(1,
        // seqOp函数中的第一个入参是 zeroValue,第二个入参是rdd的value,这里对所有的value+1
        (Function2<Integer, Integer, Integer>) (v1, v2) -> v1 + v2,
        // combOp函数是对同一个key的value进行处理,这里对相同key的value进行相加
        (Function2<Integer, Integer, Integer>) (v1, v2) -> v1 + v2);
aggregateRdd.foreach(x -> System.out.println(x._1()+":"+x._2()));

最终输出结果如下:

a:4
b:2
c:4

(三)行动算子

reduce

reduce(func):使用函数func聚合数据集的元素(它接受两个参数并返回一个)。
下面这段代码对所有rdd进行相加:

String reduce = javaRdd.reduce((Function2<String, String, String>) (v1, v2) -> {
    System.out.println(v1 + ":" + v2);
    return v1+v2;
});
System.out.println("result:"+reduce);

最终结果如下,从结果可以看出,每次对v1都是上一次reduce运行之后的结果:

a:b
ab:c
abc:d
abcd:e
result:abcde

collect()

collect():将driver中的所有元素数据通过集合的方式返回,适合小数据量的场景,大数据量会导致内存溢出。

JavaRDD<String> javaRdd = sc.parallelize(Arrays.asList("a", "b", "c", "d", "e"));
List<String> collect = javaRdd.collect();

count()

count():返回一个RDD中元素的数量。

JavaRDD<String> javaRdd = sc.parallelize(Arrays.asList("a", "b", "c", "d", "e"));
long count = javaRdd.count();

first()

first():返回第一个元素。

JavaRDD<String> javaRdd = sc.parallelize(Arrays.asList("a", "b", "c", "d", "e"));
String first = javaRdd.first();

take

take(n):返回前N个元素,take(1)=first()。

JavaRDD<String> javaRdd = sc.parallelize(Arrays.asList("a", "b", "c", "d", "e"));
List<String> take = javaRdd.take(3);

takeOrdered

takeOrdered(n, [ordering]):返回自然排序的前N个元素,或者指定排序方法后的前N个元素。首先写一个排序类。

public class MyComparator implements Comparator<String>, Serializable {
    @Override
    public int compare(String o1, String o2) {
        return o2.compareTo(o1);
    }
}

接着是调用方式:

JavaRDD<String> javaRdd = sc.parallelize(Arrays.asList("a", "b", "c", "d", "e"));
List<String> take = javaRdd.takeOrdered(3, new MyComparator());

foreach

foreach(func):该函数对数据集的每个RDD运行func函数,foreach算子在上面的代码中已经使用到,这里不再做代码案例展示。

(四)总结

Spark的开发可以用Java或者Scala,Spark本身使用Scala编写,具体使用哪种语言进行开发需要根据项目情况考虑时间和学习成本。具体的API都可以在Spark官网查询:https://spark.apache.org/docs/2.3.0/rdd-programming-guide.html

相关文章:

  • 精益 React 学习指南 (Lean React)- 1.5 React 与 DOM
  • 【设计模式】快速理解装饰者模式,及其在JDK源码中的应用
  • 你真的了解Maven吗?
  • 【转】Xcode常用快捷键与技巧分享
  • 【设计模式】快速理解观察者模式,原来它还有这么多其他名字
  • linux实际应用小技巧
  • 时间类有多复杂,JDK竟设计了三版
  • AOP之PostSharp5-LocationInterceptionAspect
  • 如何快速学习一门新技术
  • 模拟实现部分库函数(strcpy,strcmp,strcat,strstr,memcpy,memmove,memset)
  • 组成原理(一):计算机是如何组成的
  • JDK9相比于JDK8,究竟变强了多少
  • Hive之分区(Partitions)和桶(Buckets)
  • 列式存储?OLAP?ClickHouse究竟是何方神圣
  • 分享Open-E DSS V7 应用系列十篇!
  • ES6指北【2】—— 箭头函数
  • 【Redis学习笔记】2018-06-28 redis命令源码学习1
  • chrome扩展demo1-小时钟
  • golang 发送GET和POST示例
  • HTML-表单
  • interface和setter,getter
  • Intervention/image 图片处理扩展包的安装和使用
  • Java编程基础24——递归练习
  • Python_OOP
  • QQ浏览器x5内核的兼容性问题
  • REST架构的思考
  • Spring Cloud Alibaba迁移指南(一):一行代码从 Hystrix 迁移到 Sentinel
  • 大快搜索数据爬虫技术实例安装教学篇
  • 快速体验 Sentinel 集群限流功能,只需简单几步
  • 批量截取pdf文件
  • 悄悄地说一个bug
  • 最简单的无缝轮播
  • k8s使用glusterfs实现动态持久化存储
  • 翻译 | The Principles of OOD 面向对象设计原则
  • ### Error querying database. Cause: com.mysql.jdbc.exceptions.jdbc4.CommunicationsException
  • #Linux(Source Insight安装及工程建立)
  • #微信小程序:微信小程序常见的配置传旨
  • $ git push -u origin master 推送到远程库出错
  • (175)FPGA门控时钟技术
  • (LeetCode 49)Anagrams
  • (初研) Sentence-embedding fine-tune notebook
  • (动态规划)5. 最长回文子串 java解决
  • (每日持续更新)jdk api之FileReader基础、应用、实战
  • (十六)一篇文章学会Java的常用API
  • (一)Spring Cloud 直击微服务作用、架构应用、hystrix降级
  • .NET CORE Aws S3 使用
  • .Net 高效开发之不可错过的实用工具
  • .NET/C# 推荐一个我设计的缓存类型(适合缓存反射等耗性能的操作,附用法)
  • .NET分布式缓存Memcached从入门到实战
  • .net生成的类,跨工程调用显示注释
  • .net中我喜欢的两种验证码
  • .ui文件相关
  • [2016.7 Day.4] T1 游戏 [正解:二分图 偏解:奇葩贪心+模拟?(不知如何称呼不过居然比std还快)]
  • [BZOJ4016][FJOI2014]最短路径树问题
  • [C++] Windows中字符串函数的种类