当前位置: 首页 > news >正文

Spark RDD学习: aggregate函数

最近在做项目的时候遇到了Spark RDD里面的一个aggregate函数,觉得它的用法挺有意思的,在此记录一下。

Spark 文档中对 aggregate的函数定义如下:

def aggregate[U](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) 
=> U)(implicit arg0: ClassTag[U]): U

注释:

Aggregate the elements of each partition, and then the results for 
all the partitions, using given combine functions and a neutral 
"zero value". 
This function can return a different result type, U, 
than the type of this RDD, T. 
Thus, we need one operation for merging a T into an U 
and one operation for merging two U's, as in 
Scala.TraversableOnce. Both of these functions are allowed to 
modify and return their first argument instead of creating a new U 
to avoid memory allocation. 

aggregate函数首先对每个分区里面的元素进行聚合,然后用combine函数将每个分区的结果和初始值(zeroValue)进行combine操作。这个操作返回的类型不需要和RDD中元素类型一致,所以在使用 aggregate()时,需要提供我们期待的返回类型的初始值,然后通过一个函数把RDD中的元素累加起来??放入累加器?。考虑到每个节点是在本地进行累加的,最终还需要提供第二个函数来将累加器两两合并。

其中seqOp操作会聚合各分区中的元素,然后combOp操作会把所有分区的聚合结果再次聚合,两个操作的初始值都是zeroValue. seqOp的操作是遍历分区中的所有元素(T),第一个T跟zeroValue做操作,结果再作为与第二个T做操作的zeroValue,直到遍历完整个分区。combOp操作是把各分区聚合的结果,再聚合。aggregate函数返回一个跟RDD不同类型的值。因此,需要一个操作seqOp来把分区中的元素T合并成一个U,另外一个操作combOp把所有U聚合。

下面举一个利用aggreated求平均数的例子:

val rdd = List(1,2,3,4)
val input = sc.parallelize(rdd)
val result = input.aggregate((0,0))(
(acc,value) => (acc._1 + value, acc._2 + 1),
(acc1,acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
)
result: (Int, Int) = (10, 4)
val avg = result._1 / result._2
avg: Int = 2.5

程序的详细过程大概如下:

  1. 首先定义一个初始值 (0, 0),即我们期待的返回类型的初始值。

  2. (acc,value) => (acc._1 + value, acc._2 + 1)value是函数定义里面的T,这里是List里面的元素。所以acc._1 + value, acc._2 + 1的过程如下:

    1. 0+1, 0+1

    2. 1+2, 1+1

    3. 3+3, 2+1

    4. 6+4, 3+1

  3. 结果为 (10,4)。在实际Spark执行中是分布式计算,可能会把List分成多个分区,假如3个,p1(1,2), p2(3), p3(4),经过计算各分区的的结果 (3,2), (3,1), (4,1),这样,执行 (acc1,acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2) 就是 (3+3+4,2+1+1)(10,4),然后再计算平均值。

相关文章:

  • 科技产品也要讲时尚 P9红蓝新色彰显独特风格
  • 在Windows 10下启用旧的照片查看器
  • 自制ssl凭证
  • jquery.Ajax回调成功后数据赋值给全局变量的问题
  • 课堂笔记 layout 布局、手风琴accordion、选项卡tabs
  • Angularjs 数据处理的几个重要方法
  • Sqoop 产生背景(一)
  • Wamp下Apache2.4.x局域网访问403的解决办法
  • MyBatis 向Sql语句中动态传参数#183;动态SQL拼接
  • zabbix图形中文乱码的问题
  • 业务人员自助BI分析不够用,还要自助数据准备?
  • finnal 评论 II
  • MySQL字符串转日期类型
  • 行业动态
  • 新浪微博客户端(52)-长按或滑动显示表情
  • CSS中外联样式表代表的含义
  • Docker 笔记(2):Dockerfile
  • fetch 从初识到应用
  • Go 语言编译器的 //go: 详解
  • java8 Stream Pipelines 浅析
  • Javascript弹出层-初探
  • js中forEach回调同异步问题
  • Mac转Windows的拯救指南
  • nginx 配置多 域名 + 多 https
  • QQ浏览器x5内核的兼容性问题
  • socket.io+express实现聊天室的思考(三)
  • Vim 折腾记
  • 大快搜索数据爬虫技术实例安装教学篇
  • 回顾2016
  • 聚类分析——Kmeans
  • 新手搭建网站的主要流程
  • 学习JavaScript数据结构与算法 — 树
  • 【云吞铺子】性能抖动剖析(二)
  • 湖北分布式智能数据采集方法有哪些?
  • ​​​​​​​​​​​​​​Γ函数
  • ​【已解决】npm install​卡主不动的情况
  • ​LeetCode解法汇总1276. 不浪费原料的汉堡制作方案
  • #《AI中文版》V3 第 1 章 概述
  • #include到底该写在哪
  • #我与Java虚拟机的故事#连载15:完整阅读的第一本技术书籍
  • $HTTP_POST_VARS['']和$_POST['']的区别
  • (175)FPGA门控时钟技术
  • (2015)JS ES6 必知的十个 特性
  • (day 2)JavaScript学习笔记(基础之变量、常量和注释)
  • (delphi11最新学习资料) Object Pascal 学习笔记---第5章第5节(delphi中的指针)
  • (二十四)Flask之flask-session组件
  • (分布式缓存)Redis持久化
  • (切换多语言)vantUI+vue-i18n进行国际化配置及新增没有的语言包
  • (详细版)Vary: Scaling up the Vision Vocabulary for Large Vision-Language Models
  • (学习日记)2024.04.04:UCOSIII第三十二节:计数信号量实验
  • (转)C#调用WebService 基础
  • *Algs4-1.5.25随机网格的倍率测试-(未读懂题)
  • .“空心村”成因分析及解决对策122344
  • .NET Framework 服务实现监控可观测性最佳实践
  • .NET Project Open Day(2011.11.13)