当前位置: 首页 > news >正文

【算子2】spark(四):spark core:trans算子中key-value类型的算子使用说明

文章目录

    • 1. Key-Value类型的算子分类
    • 2. 常用算子使用举例
      • partitionBy:通过key进行分区
      • groupBykey:通过key进行分组
      • combineByKey
      • join
      • cogroup(otherDataset, [numTasks])

1. Key-Value类型的算子分类

一、(输入分区与输出分区)一对一

mapValues(func): 针对于(K,V)形式的类型只对V进行操作
sortByKey([ascending], [numTasks]): 按照key值进行排序

 
二、RDD聚集

reduceByKey(func): 将相同key的value值进行收集,例如求出每个key的平均值
combineByKey(func): 对数据进行分区内和分区间的操作
partitionBy(func): 通过key值进行分区;分区逻辑可以是默认可以是自定类(继承Partitioner

 

三、连接

join(otherDataset, [numTasks])): 类似于mysql中的外键关联,关联要素key相同
leftOutJoin()\rightOutJoin:
cogroup(otherDataset): 与join类似,但不同的是,没有相同的key时,同样可以进行联合。

 
 

2. 常用算子使用举例

partitionBy:通过key进行分区

println("分区数"+arrayRDD.partitions.size)
//分区逻辑可以是默认可以是自定类(继承Partitioner)
val partitionByRDD: RDD[(Int, String)] = arrayRDD.partitionBy(new HashPartitioner(2))
println("分区数"+partitionByRDD.partitions.size)

 

groupBykey:通过key进行分组

相同的key分为一组

//将数据变成Tuple2
val ArrayRDD: RDD[String] = sc.makeRDD(Array("one", "two", "two", "three", "three", "three"))
val mapRDD: RDD[(String, Int)] = ArrayRDD.map((_,1))

//groupByKey
val groupByKeyRDD: RDD[(String, Iterable[Int])] = mapRDD.groupByKey()

//map对Tuple2的CompactBuffer进行求和操作
val sumRDD: RDD[(String, Int)] = groupByKeyRDD.map(t=>(t._1,t._2.sum))
sumRDD.collect().foreach(println)

 

combineByKey

对数据进行分区内和分区间的操作

val combineByKeyRDD: RDD[(String, (Int, Int))] = ListRDD.combineByKey(
      (_, 1),
      (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
      (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
    )
    combineByKeyRDD.collect().foreach(println)
    //求平均:
    val avrRDD: RDD[(String, Int)] = combineByKeyRDD.map {
      case (x, y) => (x, y._1 / y._2)
    }
    avrRDD.collect().foreach(println)

在这里插入图片描述

 

join

在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD

1)创建第一个pairRDD
scala> val rdd = sc.parallelize(Array((1,"a"),(2,"b"),(3,"c")))2)创建第二个pairRDD
scala> val rdd1 = sc.parallelize(Array((1,4),(2,5),(3,6)))3)join操作并打印结果
scala> rdd.join(rdd1).collect()
res13: Array[(Int, (String, Int))] = Array((1,(a,4)), (2,(b,5)), (3,(c,6)))

 

cogroup(otherDataset, [numTasks])

作用:在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable,Iterable))类型的RDD
需求:创建两个pairRDD,并将key相同的数据聚合到一个迭代器。

1)创建第一个pairRDD
scala> val rdd = sc.parallelize(Array((1,"a"),(6,"b"),(2,"c"),(5,"d")))2)创建第二个pairRDD
scala> val rdd1 = sc.parallelize(Array((1,"a"),(5,"b"),(5,"b")))3)cogroup两个RDD并打印结果
scala> rdd.cogroup(rdd1).collect()

与join类似,但不同的是,没有相同的key时,同样可以进行联合。
在这里插入图片描述

相关文章:

  • 9.25
  • codeforces-1734C - Removing Smallest Multiples
  • Java IO流的“四大家族”
  • 源码编译perl5遇到的问题汇总
  • 63 岁老工程师设计一屏双计算器软件工具,一起看看?
  • python实现自动换桌面壁纸恶搞程序【带源码】--------- 2.程序调试和打包
  • 抛开去中心化叙事 我们需要DAO的4个理由
  • 【Android入门】5、Broadcast 广播、Kotlin 的高阶函数、泛型、委托
  • clickhouse
  • 【周赛复盘】力扣第 312 场单周赛
  • QT通过QSS文件样式表设置改变窗体与按钮背景外观
  • kotlin基础知识
  • Keras学习记录之模型
  • LeetCode 0329. 矩阵中的最长递增路径
  • JavaEE:线程安全问题的原因和解决方案
  • 网络传输文件的问题
  • 【EOS】Cleos基础
  • avalon2.2的VM生成过程
  • Brief introduction of how to 'Call, Apply and Bind'
  • create-react-app做的留言板
  • Fabric架构演变之路
  • input实现文字超出省略号功能
  • JavaScript设计模式系列一:工厂模式
  • jdbc就是这么简单
  • mysql常用命令汇总
  • mysql外键的使用
  • php ci框架整合银盛支付
  • scrapy学习之路4(itemloder的使用)
  • SpiderData 2019年2月25日 DApp数据排行榜
  • 阿里云ubuntu14.04 Nginx反向代理Nodejs
  • 飞驰在Mesos的涡轮引擎上
  • 构建二叉树进行数值数组的去重及优化
  • 前端每日实战:61# 视频演示如何用纯 CSS 创作一只咖啡壶
  • 新书推荐|Windows黑客编程技术详解
  • 没有任何编程基础可以直接学习python语言吗?学会后能够做什么? ...
  • ​【C语言】长篇详解,字符系列篇3-----strstr,strtok,strerror字符串函数的使用【图文详解​】
  • ​【已解决】npm install​卡主不动的情况
  • #NOIP 2014#Day.2 T3 解方程
  • #多叉树深度遍历_结合深度学习的视频编码方法--帧内预测
  • (4)logging(日志模块)
  • (52)只出现一次的数字III
  • (6)添加vue-cookie
  • (九十四)函数和二维数组
  • (六)激光线扫描-三维重建
  • (没学懂,待填坑)【动态规划】数位动态规划
  • (五) 一起学 Unix 环境高级编程 (APUE) 之 进程环境
  • (一)【Jmeter】JDK及Jmeter的安装部署及简单配置
  • (原創) 人會胖會瘦,都是自我要求的結果 (日記)
  • (转)eclipse内存溢出设置 -Xms212m -Xmx804m -XX:PermSize=250M -XX:MaxPermSize=356m
  • .NET Framework与.NET Framework SDK有什么不同?
  • .NET 动态调用WebService + WSE + UsernameToken
  • .NET 服务 ServiceController
  • .Net环境下的缓存技术介绍
  • @GlobalLock注解作用与原理解析
  • [ C++ ] STL_stack(栈)queue(队列)使用及其重要接口模拟实现