当前位置: 首页 > news >正文

RDD 专项练习

RDD 专项练习

现有分数信息文件 scores.txt

班级ID 姓名 年龄 性别 科目 成绩
12 张三 25 男 chinese 50
12 张三 25 男 math 60
12 张三 25 男 english 70
12 李四 20 男 chinese 50
12 李四 20 男 math 50
12 李四 20 男 english 50
12 王芳 19 女 chinese 70
12 王芳 19 女 math 70
12 王芳 19 女 english 70
13 张大三 25 男 chinese 60
13 张大三 25 男 math 60
13 张大三 25 男 english 70
13 李大四 20 男 chinese 50
13 李大四 20 男 math 60
13 李大四 20 男 english 50
13 王小芳 19 女 chinese 70
13 王小芳 19 女 math 80
13 王小芳 19 女 english 70
需求如下:
1、一共有多少人参加考试?
2、一共有多少个大于、小于、等于20岁的人参加考试?
3、分别有多个男生和女生参加考试?
4、各个班有多少人参加考试?
5、语文和数学科目的平均成绩是多少?
6、单个人平均成绩是多少?
7、各班平均成绩是多少?
8、各班男女生平均总成绩是多少?
9、全校语文成绩最高分是多少?
10、各班各个科目最高和最低成绩是多少?
11、总成绩大于 150 分的 12 班的女生有几个?

一、预处理

定义一个内部case类用于存储分数数据
创建SparkContext对象
读取数据文件,跳过第一行(标题行),并映射为Score对象(附加年龄类型)

import org.apache.spark.rdd.RDD  
import org.apache.spark.{SparkConf, SparkContext}  object SparkRDD {  // 定义一个内部case类用于存储分数数据  private case class Score(classId: Int, name: String, age: Int, gender: String, subject: String, score: Int, _type: String)  def main(args: Array[String]): Unit = {  // 创建Spark配置对象  val conf = new SparkConf()  .setAppName("spark_rdd") // 设置应用名称  .setMaster("local[4]") // 设置运行模式为本地模式,并分配4个核心  // 获取或创建SparkContext对象  val sc = SparkContext.getOrCreate(conf)  // 指定数据文件路径  val path = "file:///D:\\myOwnProject\\spark_first\\data\\scores.txt"  // 读取数据文件,跳过第一行(标题行),并映射为Score对象  val scores: RDD[Score] = sc.textFile(path, 4) // 读取文件,分区数为4  .mapPartitionsWithIndex { // 对每个分区应用索引和迭代器  case (index, iterator) => if (index == 0) iterator.drop(1) else iterator // 如果是第一个分区,则跳过第一行  }  .mapPartitions( // 对每个分区应用映射操作  _.map(line => { // 对分区中的每一行进行处理  val a = line.split("\\s+") // 按空白字符分割每行  val age = a(2).toInt // 将年龄字段转换为整数  val _type = age match { // 根据年龄设置类型  case age if age > 20 => "GT20" // 年龄大于20  case age if age == 20 => "EQ20" // 年龄等于20  case age if age < 20 => "LT20" // 年龄小于20  }  // 构造Score对象  Score(a(0).toInt, a(1), a(2).toInt, a(3), a(4), a(5).toInt, _type)  })  ).cache() // 将RDD缓存到内存中,优化:RDD结果被不断使用// 打印所有处理后的Score对象  scores.foreach(println)  //程序结束时停止SparkContext  sc.stop()  }  
}

二、处理需求

1、一共有多少人参加考试?

val count1 = scores.mapPartitions(_.map(_.name))	// scores如上预处理,下同.distinct().count()println(s"${count1} 人参加考试")			//6人参加考试

2、一共有多少个大于、小于、等于20岁的人参加考试?

val map = Map(("GT20", "20岁以上"), ("EQ20", "20岁"), ("LT20", "20岁以下"))
scores.mapPartitions(_.map(s => ((s.name, s._type), 1))).groupByKey().mapPartitions(_.map(s => (s._1._2, 1))).reduceByKey(_ + _).foreach(s => println(s"${map.get(s._1).get}的人数为${s._2}"))
/*
20岁以上的人数为2
20岁以下的人数为2
20岁的人数为2
*/

3、分别有多个男生和女生参加考试?

scores.mapPartitions(_.map(s => ((s.name, s.gender), 1))).groupByKey().mapPartitions(_.map(s => (s._1._2, 1))).reduceByKey(_ + _).foreach(s => println(s"${s._1}生参加考试的人数为${s._2}"))
/*
男生参加考试的人数为4
女生参加考试的人数为2
*/

4、各个班有多少人参加考试?

注意同班,同名去重

方法一:groupByKey 去重

scores.mapPartitions(_.map(s => ((s.name, s.classId), 1))).groupByKey().mapPartitions(_.map(s => (s._1._2, 1))).reduceByKey(_ + _).foreach(s => println(s"${s._1}班参加考试的人数为${s._2}"))
/*
12班参加考试的人数为3
13班参加考试的人数为3
*/

方法二:distinct去重(不推荐)

scores.mapPartitions(_.map(t=>(t.classId, t.name))).distinct().map(s => (s._1, 1)).reduceByKey(_+_).foreach(s => println(s"${s._1}班参加考试的人数为${s._2}"))

5、语文数学科目的平均成绩是多少?

scores.mapPartitions(_.collect({	 	// 存在非语数外科目,如何过滤case s if s.subject.matches("chinese|math") => (s.subject, s.score)})).groupByKey().map(t => (t._1, t._2.sum * 1.0f / t._2.size)).foreach(s => println(s"${s._1}平均分:${s._2}"))

6、单个人平均成绩是多少?

scores.mapPartitions(_.map(t=>(t.name,t.score))).groupByKey().map(t=>(t._1,t._2.sum*1.0f/t._2.size)).foreach(println)

7、各班平均成绩是多少?

scores.mapPartitions(_.map(t=>(t.classId,t.score))).groupByKey().map(t=>(t._1,t._2.sum*1.0f/t._2.size)).foreach(println)

8、各班男女生平均总成绩是多少?

scores.mapPartitions(_.map(t=>((t.classId,t.gender),t.score))).groupByKey().map(t=>(t._1,t._2.sum*1.0f/t._2.size)).foreach(println)

9、全校语文成绩最高分是多少?

val chineseMax = scores.mapPartitions(_.filter(_.subject.equals("chinese")).map(_.score)).max()
println(s"最高语文成绩为${chineseMax}")

10、各班各个科目最高和最低成绩是多少?

scores.mapPartitions(_.map(s => ((s.subject, s.classId), (s.score, s.score)))
)
.reduceByKey((s1, s2) => (if (s1._1 > s2._1) s1._1 else s2._1, (if (s1._2 < s2._2) s1._2 else s2._2))
)
.foreach(s => println(s"${s._1._2}${s._1._1}科目的最大成绩为${s._2._1},最小成绩为${s._2._2}"))

结果

13班chinese科目的最大成绩为70,最小成绩为50
12班english科目的最大成绩为70,最小成绩为50
12班chinese科目的最大成绩为70,最小成绩为50
13班english科目的最大成绩为70,最小成绩为50
12班math科目的最大成绩为70,最小成绩为50
13班math科目的最大成绩为80,最小成绩为60

11、总成绩大于 150 分的 12 班的女生有几个?

val count2 = scores.mapPartitions(_.filter(s => s.gender == "女" && s.classId == 12).map(s => (s.name, s.score))).reduceByKey(_ + _).filter(s => s._2 > 150).count()
println(s"总成绩大于 150 分的 12 班的女生有${count2}个")

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 提升机器视觉与机器学习软件安全性的实践策略
  • OpenCV距离变换函数distanceTransform的使用
  • 01MFC建立单个文件类型——画线
  • 9. Python的魔法函数
  • DP讨论——适配器模式
  • 使用sklearn的基本流程
  • Qt5 Ubuntu18 QStackedWidget
  • 路由守卫中使用next()跳转到指定路径时会无限循环
  • 【C/C++】【学生成绩管理系统】深度剖析
  • Hadoop-20 Flume 采集数据双写至本地+HDFS中 监控目录变化 3个Agent MemoryChannel Source对比
  • anaconda常用指令学习
  • 企业网络实验dhcp-snooping、ip source check,防非法dhcp服务器、自动获取ip(虚拟机充当DHCP服务器)、禁手动修改IP
  • 【爬虫】Python实现,模拟天眼查登录验证获取token
  • windows电脑的linux虚拟机连接电脑网络的方法
  • 项目收获总结--本地缓存方案选型及使用缓存的坑
  • @jsonView过滤属性
  • Angular 4.x 动态创建组件
  • Docker 笔记(2):Dockerfile
  • Java 多线程编程之:notify 和 wait 用法
  • JavaScript实现分页效果
  • Quartz实现数据同步 | 从0开始构建SpringCloud微服务(3)
  • React-Native - 收藏集 - 掘金
  • React中的“虫洞”——Context
  • springMvc学习笔记(2)
  • Theano - 导数
  • vue--为什么data属性必须是一个函数
  • yii2权限控制rbac之rule详细讲解
  • 案例分享〡三拾众筹持续交付开发流程支撑创新业务
  • 多线程事务回滚
  • 给新手的新浪微博 SDK 集成教程【一】
  • 记录:CentOS7.2配置LNMP环境记录
  • 盘点那些不知名却常用的 Git 操作
  • 配置 PM2 实现代码自动发布
  • 浅析微信支付:申请退款、退款回调接口、查询退款
  • 人脸识别最新开发经验demo
  • 如何编写一个可升级的智能合约
  • 如何设计一个微型分布式架构?
  • 跳前端坑前,先看看这个!!
  • 微信端页面使用-webkit-box和绝对定位时,元素上移的问题
  • 一道闭包题引发的思考
  • 怎么将电脑中的声音录制成WAV格式
  • mysql面试题分组并合并列
  • ​人工智能之父图灵诞辰纪念日,一起来看最受读者欢迎的AI技术好书
  • (33)STM32——485实验笔记
  • (附源码)springboot人体健康检测微信小程序 毕业设计 012142
  • (附源码)ssm失物招领系统 毕业设计 182317
  • (论文阅读23/100)Hierarchical Convolutional Features for Visual Tracking
  • (每日一问)设计模式:设计模式的原则与分类——如何提升代码质量?
  • (深度全面解析)ChatGPT的重大更新给创业者带来了哪些红利机会
  • (五)IO流之ByteArrayInput/OutputStream
  • (学习日记)2024.02.29:UCOSIII第二节
  • (一)kafka实战——kafka源码编译启动
  • (转)关于pipe()的详细解析
  • (轉貼) 2008 Altera 亞洲創新大賽 台灣學生成果傲視全球 [照片花絮] (SOC) (News)
  • (最全解法)输入一个整数,输出该数二进制表示中1的个数。