当前位置: 首页 > news >正文

大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(正在更新!)

章节内容

上节我们完成了如下的内容:

  • SparkSQL 核心操作
  • Action操作 详细解释+测试案例
  • Transformation操作 详细解释+测试案例

在这里插入图片描述

SQL 语句

总体而言:SparkSQL语HQL兼容;与HQL相比,SparkSQL更简洁。
SparkSQL是Apache Spark框架中的一个模块,专门用于处理结构化和半结构化数据。它提供了对数据进行查询、处理和分析的高级接口。

SparkSQL的核心特点包括:

  • DataFrame API:SparkSQL提供了DataFrame API,它是一种以行和列为结构的数据集,与关系数据库中的表非常相似。DataFrame支持多种数据源,如Hive、Parquet、JSON、JDBC等,可以轻松地将数据导入并进行操作。
  • SQL查询:SparkSQL允许用户通过标准的SQL语法查询DataFrame,这使得数据分析师和工程师可以使用他们熟悉的SQL语言来处理大数据。SparkSQL会自动将SQL查询转换为底层的RDD操作,从而在分布式环境中执行。
  • 与Hive集成:SparkSQL可以与Hive无缝集成,使用Hive的元数据和查询引擎。它支持HiveQL(Hive Query Language)语法,并且能够直接访问Hive中的数据。
  • 性能优化:SparkSQL采用了多种优化技术,如Catalyst查询优化器和Tungsten物理执行引擎。这些优化技术能够自动生成高效的执行计划,提高查询的执行速度。

数据样例

// 数据
1 1,2,3
2 2,3
3 1,2// 需要实现如下的效果
1 1
1 2
1 3
2 2
2 3
3 1
3 2

编写代码

package icu.wzkimport org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.Encoderscase class Info(id: String, tags: String)object SparkSql01 {def main(args: Array[String]): Unit = {val sparkSession = SparkSession.builder().appName("SparkSQLDemo").master("local[*]").getOrCreate()val sc = sparkSession.sparkContextsc.setLogLevel("WARN")val arr = Array("1 1,2,3", "2 2,3", "3 1,2")val rdd: RDD[Info] = sc.makeRDD(arr).map{line => val fields: Array[String] = line.split("\\s+")Info(fields(0), fields(1))}import sparkSession.implicits._implicit val infoEncoder = Encoders.product[Info]val ds: Dataset[Info] = sparkSession.createDataset(rdd)ds.createOrReplaceTempView("t1")sparkSession.sql("""| select id, tag| from t1| lateral view explode(split(tags, ",")) t2 as tag|""".stripMargin).showsparkSession.sql("""| select id, explode(split(tags, ","))| from t1|""".stripMargin).showsparkSession.close()}}

运行测试

控制台输出结果为:

+---+---+
| id|tag|
+---+---+
|  1|  1|
|  1|  2|
|  1|  3|
|  2|  2|
|  2|  3|
|  3|  1|
|  3|  2|
+---+---++---+---+
| id|col|
+---+---+
|  1|  1|
|  1|  2|
|  1|  3|
|  2|  2|
|  2|  3|
|  3|  1|
|  3|  2|
+---+---+

运行结果

运行结果如下图所示:
在这里插入图片描述

输入与输出

SparkSQL 内建支持的数据源包括:

  • Parquet (默认数据源)
  • JSON
  • CSV
  • Avro
  • Images
  • BinaryFiles(Spark 3.0)

简单介绍一下,Parquet 是一种列式存储格式,专门为大数据处理和分析而设计。

  • 列式存储:Parquet 采用列式存储格式,这意味着同一列的数据存储在一起。这样可以极大地提高查询性能,尤其是当查询只涉及少量列时。
  • 高效压缩:由于同一列的数据具有相似性,Parquet 能够更高效地进行压缩,节省存储空间。
  • 支持复杂数据类型:Parquet 支持嵌套的数据结构,包括嵌套列表、映射和结构体,这使得它非常适合处理复杂的、半结构化的数据。
  • 跨平台:Parquet 是一种开放标准,支持多种编程语言和数据处理引擎,包括 Apache Spark、Hadoop、Impala 等。

在这里插入图片描述

Parquet

特点:Parquet是一种列式存储格式,特别适合大规模数据的存储和处理。它支持压缩和嵌套数据结构,因此在存储效率和读取性能方面表现优异。

使用方式:spark.read.parquet(“path/to/data”) 读取Parquet文件;df.write.parquet(“path/to/output”) 将DataFrame保存为Parquet格式。

JSON

特点:JSON是一种轻量级的数据交换格式,广泛用于Web应用程序和NoSQL数据库中。SparkSQL能够解析和生成JSON格式的数据,并支持嵌套结构。

使用方式:spark.read.json(“path/to/data”) 读取JSON文件;df.write.json(“path/to/output”) 将DataFrame保存为JSON格式。

CSV

特点:CSV(逗号分隔值)是最常见的平面文本格式之一,简单易用,但不支持嵌套结构。SparkSQL支持读取和写入CSV文件,并提供了处理缺失值、指定分隔符等功能。

使用方式:spark.read.csv(“path/to/data”) 读取CSV文件;df.write.csv(“path/to/output”) 将DataFrame保存为CSV格式。

Avro

特点:Avro是一种行式存储格式,适合大规模数据的序列化。它支持丰富的数据结构和模式演化,通常用于Hadoop生态系统中的数据存储和传输。

使用方式:spark.read.format(“avro”).load(“path/to/data”) 读取Avro文件;df.write.format(“avro”).save(“path/to/output”) 将DataFrame保存为Avro格式。

ORC

特点:ORC(Optimized Row Columnar)是一种高效的列式存储格式,专为大数据处理而设计,支持高压缩率和快速读取性能。它在存储空间和I/O性能方面表现优越。

使用方式:spark.read.orc(“path/to/data”) 读取ORC文件;df.write.orc(“path/to/output”) 将DataFrame保存为ORC格式。

Hive Tables

特点:SparkSQL能够无缝集成Hive,直接访问Hive元数据,并对Hive表进行查询。它支持HiveQL语法,并能够利用Hive的存储格式和结构。

使用方式:通过spark.sql(“SELECT * FROM hive_table”)查询Hive表;也可以使用saveAsTable将DataFrame写入Hive表。

JDBC/ODBC

特点:SparkSQL支持通过JDBC/ODBC接口连接关系型数据库,如MySQL、PostgreSQL、Oracle等。它允许从数据库读取数据并将结果写回数据库。

使用方式:spark.read.format(“jdbc”).option(“url”, “jdbc:mysql://host/db”).option(“dbtable”, “table”).option(“user”, “username”).option(“password”, “password”).load() 读取数据库表;df.write.format(“jdbc”).option(“url”, “jdbc:mysql://host/db”).option(“dbtable”, “table”).option(“user”, “username”).option(“password”, “password”).save() 将DataFrame写入数据库。

Text Files

特点:SparkSQL可以处理简单的文本文件,每一行被读取为一个字符串。适合用于处理纯文本数据。

使用方式:spark.read.text(“path/to/data”) 读取文本文件;df.write.text(“path/to/output”) 将DataFrame保存为文本格式。

Delta Lake (外部插件)

特点:Delta Lake是一种开源存储层,构建在Parquet格式之上,支持ACID事务、可扩展元数据处理和流批一体的实时数据处理。尽管不是内建的数据源,但它在Spark生态系统中得到了广泛支持。

使用方式:spark.read.format(“delta”).load(“path/to/delta-table”) 读取Delta表;df.write.format(“delta”).save(“path/to/delta-table”) 将DataFrame保存为Delta格式。

测试案例

val df1 =
spark.read.format("parquet").load("data/users.parquet")
// Use Parquet; you can omit format("parquet") if you wish as
it's the default
val df2 = spark.read.load("data/users.parquet")// Use CSV
val df3 = spark.read.format("csv")
.option("inferSchema", "true")
.option("header", "true")
.load("data/people1.csv")// Use JSON
val df4 = spark.read.format("json")
.load("data/emp.json")

此外还支持 JDBC 的方式:

val jdbcDF = sparkSession.read.format("jdbc").option("url", "jdbc:mysql://h122.wzk.icu/spark_test?useSSL=false").option("driver", "com.mysql.jdbc.Driver").option("user", "hive").option("password", "hive@wzk.icu").load()
jdbcDF.show()

访问Hive

在这里插入图片描述

导入依赖

<dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive_2.12</artifactId><version>${spark.version}</version>
</dependency>

hive-site

需要在项目的 Resource 目录下,新增一个 hive-site.xml
备注:最好使用 metastore service连接Hive,使用直接metastore的方式时,SparkSQL程序会修改Hive的版本信息

<configuration><property><name>hive.metastore.uris</name><value>thrift://h122.wzk.icu:9083</value></property>
</configuration>

编写代码

object AccessHive {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName("Demo1").master("local[*]").enableHiveSupport()// 设为true时,Spark使用与Hive相同的约定来编写Parquet数据.config("spark.sql.parquet.writeLegacyFormat", true).getOrCreate()val sc = spark.sparkContextsc.setLogLevel("warn")spark.sql("show databases").showspark.sql("select * from ods.ods_trade_product_info").showval df: DataFrame = spark.table("ods.ods_trade_product_info")df.show()df.write.mode(SaveMode.Append).saveAsTable("ods.ods_trade_product_info_back")spark.table("ods.ods_trade_product_info_back").showspark.close()}
}

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • [ComfyUI]Flux+MiniCPM-V强强联手艺术创意,媲美GPT4V级国产多模态视觉大模型
  • 【C/C++】pointer vs reference
  • JavaScript - Ajax
  • Spring常用的注解有哪些?作用是什么?怎么用?
  • Python 爬虫入门(十二):正则表达式「详细介绍」
  • outlook在“对我发送的邮件应用规则”时只能移动邮件副本的问题和解决方案
  • 缔造“神话”的不止悟空,海信电视也有“画质神话”
  • 【2024年】为Python股票量化分析最新整理的免费股票数据API接口之实时数据
  • ROS机器人专用云台相机防抖摄像头
  • 2024Go语言面试宝典Golang零基础实战项目面试八股力扣算法笔记等
  • 【jvm】jvm方法和栈帧的关系
  • 【后端学前端】纯HTML实现响应式布局
  • 03-JavaScript高阶( 代码)
  • C语言—指针(2)
  • 结合量子技术解决数据传输安全
  • @jsonView过滤属性
  • “寒冬”下的金三银四跳槽季来了,帮你客观分析一下局面
  • 《Java8实战》-第四章读书笔记(引入流Stream)
  • angular组件开发
  • github指令
  • iOS动画编程-View动画[ 1 ] 基础View动画
  • Iterator 和 for...of 循环
  • javascript从右向左截取指定位数字符的3种方法
  • Java方法详解
  • ng6--错误信息小结(持续更新)
  • React系列之 Redux 架构模式
  • spring boot下thymeleaf全局静态变量配置
  • Vue源码解析(二)Vue的双向绑定讲解及实现
  • 力扣(LeetCode)965
  • 前端js -- this指向总结。
  • 使用阿里云发布分布式网站,开发时候应该注意什么?
  • 世界上最简单的无等待算法(getAndIncrement)
  • 我看到的前端
  • 消息队列系列二(IOT中消息队列的应用)
  • 学习使用ExpressJS 4.0中的新Router
  • 用 Swift 编写面向协议的视图
  • 400多位云计算专家和开发者,加入了同一个组织 ...
  • #HarmonyOS:基础语法
  • #我与Java虚拟机的故事#连载05:Java虚拟机的修炼之道
  • (1)Map集合 (2)异常机制 (3)File类 (4)I/O流
  • (12)Hive调优——count distinct去重优化
  • (32位汇编 五)mov/add/sub/and/or/xor/not
  • (day 2)JavaScript学习笔记(基础之变量、常量和注释)
  • (ros//EnvironmentVariables)ros环境变量
  • (二)七种元启发算法(DBO、LO、SWO、COA、LSO、KOA、GRO)求解无人机路径规划MATLAB
  • (附源码)springboot 校园学生兼职系统 毕业设计 742122
  • (附源码)springboot 智能停车场系统 毕业设计065415
  • (四)JPA - JQPL 实现增删改查
  • (正则)提取页面里的img标签
  • (转)Linux整合apache和tomcat构建Web服务器
  • (轉貼) VS2005 快捷键 (初級) (.NET) (Visual Studio)
  • ***测试-HTTP方法
  • .NET CORE 2.0发布后没有 VIEWS视图页面文件
  • .NET Core 通过 Ef Core 操作 Mysql
  • .NET core 自定义过滤器 Filter 实现webapi RestFul 统一接口数据返回格式