当前位置: 首页 > news >正文

大数据-95 Spark 集群 SparkSQL Action与Transformation操作 详细解释与测试案例

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(正在更新!)

章节内容

上节完成的内容如下:

  • SparkSession
  • RDD、DataFrame、DataSet
  • 三者之间互相转换 详细解释
    在这里插入图片描述

核心操作

Transformation(转换操作)

定义:
Transformation是懒执行的操作,意味着这些操作在调用时并不会立即执行计算,而是会生成一个新的数据集(或RDD),它们描述了从输入数据到输出数据的转换逻辑。Transformation的计算会被延迟,直到遇到一个Action操作时才会真正触发执行。

常见操作:

  • select(): 从DataFrame中选择列。
  • filter(): 过滤掉不符合条件的行。
  • join(): 连接两个DataFrame。
  • groupBy(): 对数据进行分组。
  • agg(): 聚合操作。

Action(行动操作)

定义:
Action操作会触发Spark的计算并返回结果。与Transformation不同,Action操作会执行整个计算逻辑,并产生最终的输出,如将结果写入外部存储或将数据返回给驱动程序。

常见操作:

  • show(): 显示DataFrame的内容。
  • collect(): 将DataFrame的数据收集到驱动程序上,作为本地集合返回。
  • count(): 计算DataFrame中的行数。
  • write(): 将DataFrame的数据写入外部存储(如HDFS、S3、数据库等)。
  • take(): 返回DataFrame的前n行数据。

Action操作

与RDD类似的操作

  • show
  • collect
  • collectAsList
  • head
  • first
  • count
  • take
  • takeAsList
  • reduce

与结构相关

  • printSchema
  • explain
  • columns
  • dtypes
  • col

生成数据

保存并上传到服务器上

EMPNO,ENAME,JOB,MGR,HIREDATE,SAL,COMM,DEPTNO
7369,SMITH,CLERK,7902,2001-01-02 22:12:13,800,,20
7499,ALLEN,SALESMAN,7698,2002-01-02 22:12:13,1600,300,30
7521,WARD,SALESMAN,7698,2003-01-02 22:12:13,1250,500,30
7566,JONES,MANAGER,7839,2004-01-02 22:12:13,2975,,20
7654,MARTIN,SALESMAN,7698,2005-01-02 22:12:13,1250,1400,30
7698,BLAKE,MANAGER,7839,2005-04-02 22:12:13,2850,,30
7782,CLARK,MANAGER,7839,2006-03-02 22:12:13,2450,,10
7788,SCOTT,ANALYST,7566,2007-03-02 22:12:13,3000,,20
7839,KING,PRESIDENT,,2006-03-02 22:12:13,5000,,10
7844,TURNER,SALESMAN,7698,2009-07-02 22:12:13,1500,0,30
7876,ADAMS,CLERK,7788,2010-05-02 22:12:13,1100,,20
7900,JAMES,CLERK,7698,2011-06-02 22:12:13,950,,30
7902,FORD,ANALYST,7566,2011-07-02 22:12:13,3000,,20
7934,MILLER,CLERK,7782,2012-11-02 22:12:13,1300,,10

写入内容如下图所示:
在这里插入图片描述

测试运行

我们进入 spark-shell 进行测试

// 处理头,使用自动类型推断
val df1 = spark.read.option("header", true).option("infershema", "true").csv("test_spark_03.txt")df1.count
// 缺省显示20行
df1.union(df1).show()
// 显示2行
df1.show(2)

执行结果如下图所示:
在这里插入图片描述
继续进行测试:

// 不截断字符
df1.toJSON.show(false)
// 显示10行 不截断字符
df1.toJSON.show(10, false)

运行结果如下图所示:
在这里插入图片描述
继续进行测试:

// collect 返回数组 Array[Row]
val c1 = df1.collect()
// collectAsList 返回List Lits[Row]
val c2 = df1.collectAsList()// 返回 Row
val h1 = df1.head()
val f1 = df1.first()// 返回 Array[Row]
val h2 = df1.head(3)
val f2 = df1.take(3)// 返回 List[Row]
val t2 = df1.takeAsList(2)

运行结果如下图所示:
在这里插入图片描述
继续进行测试:

// 结构属性
// 查看列名
df1.columns
// 查看列名和类型
df1.dtypes
// 查看执行计划
df1.explain()
// 获取某个列
df1.col("ENAME")
// 常用
df1.printSchema

运行结果如下图所示:
在这里插入图片描述

Transformation 操作

  • RDD 类似的操作
  • 持久化/缓存 与 checkpoint
  • select
  • where
  • group by / 聚合
  • order by
  • join
  • 集合操作
  • 空值操作(函数)
  • 函数

与RDD类似的操作

  • map
  • filter
  • flatMap
  • mapPartitions
  • sample
  • randomSplit
  • limt
  • distinct
  • dropDuplicates
  • describe

我们进行测试:

val df1 = spark.read.csv("/opt/wzk/data/people1.csv")
// 获取第1列
df1.map(row => row.getAs[String](0)).show// randomSplit 将DF、DS按给定参数分成多份
val df2 = df1.randomSplit(Array(0.5, 0.6, 0.7))
df2(0).count
df2(1).count
df2(2).count

测试结果如下图:
在这里插入图片描述

我们继续进行测试:

// 取10行数据生成新的Dataset
val df2 = df1.limit(10)
// distinct 去重
val df2 = df1.union(df1)
df2.distinct.count// dropDuplicates 按列值去重
df2.dropDuplicates.show
df2.dropDuplicates("_c0").show

执行结果如下图:
在这里插入图片描述

存储相关

  • cacheTable
  • persist
  • checkpoint
  • unpersist
  • cache

备注:Dataset默认的存储级别是 MEMEORY_AND_DISK

spark.sparkContext.setCheckpointDir("hdfs://h121.wzk.icu:9000/checkpoint")df1.show()
df1.checkpoint()
df1.cache()import org.apache.spark.storage.StorageLevel
df1.persist(StorageLevel.MEMORY_ONLY)
df1.count()
df1.unpersist(true)

执行结果如下图所示:
在这里插入图片描述

select相关

  • 列的多种表示
  • select
  • selectExpr

启动 Spark-Shell 继续进行测试

// 这里注意 option("header", "true") 自动解析一下表头
val df1 = spark.read.option("header", "true").csv("/opt/wzk/data/people1.csv")// $ col() 等等 不可以混用!!!(有解决方法,但是建议不混用!!!)
// 可以多种形式获取到列
df1.select($"name", $"age", $"job").show

执行结果如下图所示:
在这里插入图片描述

继续进行测试

df1.select("name", "age", "job").show(3)
df1.select(col("name"), col("age"), col("job")).show(3)
df1.select($"name", $"age"+1000, $"job").show(5)

运行结果如下图所示:
在这里插入图片描述

where相关

接着对上述内容进行测试:

df1.filter("age > 25").show
df1.filter("age > 25 and name == 'wzk18'").show
df1.where("age > 25").show
df1.where("age > 25 and name == 'wzk19'").show

运行测试结果如下图:
在这里插入图片描述

groupBy相关

  • groupBy
  • agg
  • max
  • min
  • avg
  • sum
  • count

进行测试:

// 由于我的字段中没有数值类型的,就不做测试了
df1.groupBy("Job").sum("sal").show
df1.groupBy("Job").max("sal").show
df1.groupBy("Job").min("sal").show
df1.groupBy("Job").avg("sal").show
df1.groupBy("Job").count.show
df1.groupBy("Job").avg("sal").where("avg(sal) > 2000").show
df1.groupBy("Job").avg("sal").where($"avg(sal)" > 2000).show
df1.groupBy("Job").agg("sal"->"max", "sal"->"min", "sal"-
>"avg", "sal"->"sum", "sal"->"count").show
df1.groupBy("deptno").agg("sal"->"max", "sal"->"min", "sal"-
>"avg", "sal"->"sum", "sal"->"count").show

orderBy相关

orderBy == sort

df1.orderBy("name").show(5)
df1.orderBy($"name".asc).show(5)
df1.orderBy(-$"age").show(5)

运行测试的结果如下图所示:
在这里插入图片描述
继续进行测试:

df1.sort("age").show(3)
df1.sort($"age".asc).show(3)
df1.sort(col("age")).show(3)

测试结果如下图所示:
在这里插入图片描述

JOIN相关

// 笛卡尔积
df1.crossJoin(df1).count
// 等值连接(单字段)
df1.join(df1, "name").count
// 等值连接(多字段)
df1.join(df1, Seq("name", "age")).show

运行的测试结果如下图所示:
在这里插入图片描述

这里编写两个case:


// 第一个数据集
case class StudentAge(sno: Int, name: String, age: Int)val lst = List(StudentAge(1,"Alice", 18), StudentAge(2,"Andy", 19), StudentAge(3,"Bob", 17), StudentAge(4,"Justin", 21), StudentAge(5,"Cindy", 20))val ds1 = spark.createDataset(lst)// 第二个数据集
case class StudentHeight(sname: String, height: Int)val rdd = sc.makeRDD(List(StudentHeight("Alice", 160), StudentHeight("Andy", 159), StudentHeight("Bob", 170), StudentHeight("Cindy", 165), StudentHeight("Rose", 160)))val ds2 = rdd.toDS

运行测试的结果如下图所示:
在这里插入图片描述
接下来我们进行连表操作:

// 连表操作 不可以使用 "name"==="sname" !!!
ds1.join(ds2, 'name==='sname).show
ds1.join(ds2, ds1("name")===ds2("sname")).show
ds1.join(ds2, $"name"===$"sname").show
ds1.join(ds2, $"name"===$"sname", "inner").show

测试的运行结果如下图所示:
在这里插入图片描述

在这里插入图片描述

集合相关

val ds3 = ds1.select("name")
val ds4 = ds2.select("sname")// union 求并集、不去重
ds3.union(ds4).show
// unionAll(过时了)与union等价
// intersect 求交
ds3.intersect(ds4).show
// except 求差
ds3.except(ds4).show

运行结果如下图所示:
在这里插入图片描述

空值处理

math.sqrt(-1.0)
math.sqrt(-1.0).inNaN()
df1.show
// 删除所有列的空值和NaN
df1.na.drop.show
// 删除某列的空值和NaN
df1.na.drop(Array("xxx")).show
// 对列进行填充
df1.na.fill(1000).show
df1.na.fill(1000, Array("xxx")).show

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 百度最新旗舰大模型文心4.0 Turbo精调服务上线
  • flutter_markdown 基本使用
  • C语言分析数据在内存中的存储一:(整形在内存中的存储)
  • 手动修改zk类型的kafka offset
  • Django 中render、redirect 和 HttpResponse的区别
  • QPushbutton checked状态下文字显示不全
  • 学懂C++(三十六):深入理解与实现C++进程间通信(IPC)
  • Controller中接收数组参数 post请求中在body中传+post请求中通过表单形式传(x-www-form-urlencoded)
  • Python接口自动化测试框架介绍
  • 使用FFmpeg更改视频播放速度的方法
  • 设计资讯 | 这款受数学方程启发的平板桌:配集成黑胶唱片机和无线充电器
  • 一篇文章带你了解网络安全就业前景,零基础入门到精通,收藏这篇就够了
  • React+Vis.js(03):vis.js设置节点形状
  • 用Scratch编程打造你的策略游戏:《保卫萝卜》入门教程
  • Groovy DSL从入门到项目实战(一)
  • php的引用
  • 2017 年终总结 —— 在路上
  • HashMap剖析之内部结构
  • IE报vuex requires a Promise polyfill in this browser问题解决
  • Intervention/image 图片处理扩展包的安装和使用
  • maven工程打包jar以及java jar命令的classpath使用
  • PV统计优化设计
  • React 快速上手 - 06 容器组件、展示组件、操作组件
  • 反思总结然后整装待发
  • 浮动相关
  • 和 || 运算
  • 入职第二天:使用koa搭建node server是种怎样的体验
  • 深入 Nginx 之配置篇
  • 什么软件可以剪辑音乐?
  • 世界上最简单的无等待算法(getAndIncrement)
  • 学习ES6 变量的解构赋值
  • Java总结 - String - 这篇请使劲喷我
  • 京东物流联手山西图灵打造智能供应链,让阅读更有趣 ...
  • # 达梦数据库知识点
  • #我与Java虚拟机的故事#连载11: JVM学习之路
  • (2)关于RabbitMq 的 Topic Exchange 主题交换机
  • (板子)A* astar算法,AcWing第k短路+八数码 带注释
  • (补充):java各种进制、原码、反码、补码和文本、图像、音频在计算机中的存储方式
  • (超简单)构建高可用网络应用:使用Nginx进行负载均衡与健康检查
  • (附源码)小程序儿童艺术培训机构教育管理小程序 毕业设计 201740
  • (三)Pytorch快速搭建卷积神经网络模型实现手写数字识别(代码+详细注解)
  • (一)Kafka 安全之使用 SASL 进行身份验证 —— JAAS 配置、SASL 配置
  • (原+转)Ubuntu16.04软件中心闪退及wifi消失
  • (转)淘淘商城系列——使用Spring来管理Redis单机版和集群版
  • (最简单,详细,直接上手)uniapp/vue中英文多语言切换
  • *p=a是把a的值赋给p,p=a是把a的地址赋给p。
  • .NET 4.0中的泛型协变和反变
  • .NET CORE 2.0发布后没有 VIEWS视图页面文件
  • .NET 事件模型教程(二)
  • .NET 中选择合适的文件打开模式(CreateNew, Create, Open, OpenOrCreate, Truncate, Append)
  • .NET/C# 推荐一个我设计的缓存类型(适合缓存反射等耗性能的操作,附用法)
  • .NET设计模式(11):组合模式(Composite Pattern)
  • @Import注解详解
  • @RequestParam,@RequestBody和@PathVariable 区别
  • @TableLogic注解说明,以及对增删改查的影响