当前位置: 首页 > news >正文

Spark SQL支持DataFrame操作的数据源

DataFrame提供统一接口加载和保存数据源中的数据,包括:结构化数据、Parquet文件、JSON文件、Hive表,以及通过JDBC连接外部数据源。一个DataFrame可以作为普通的RDD操作,也可以通过(registerTempTable)注册成一个临时表,支持在临时表的数据上运行SQL查询操作。

一、数据源加载保存操作

DataFrame数据源默认文件为Parquet格式,可以通过spark.sql.sources.default参数进行重新修改。
不论何种格式的数据源均采取统一API、read和write进行操作,代码如下:

// 读取parquet格式数据
val df =sqlContext.read.load("file:///$SPARK_HOME/examples/src/main/resources/users.parquet")
// 从DataFrame写数据并保存成Parquet格式
df.write.save("saveusers.parquet")

1,指定选项

Spark支持通过完全限定名称(如org.apache.spark.sql.parquet)指定数据源的附加选项,内置数据源可以使用短名称(json、parquet、jdbc),Spark SQL支持通过format将任何类型的DataFrames转换成其他类型。

val df = sqlContext.read.format("json").load("file:///$SPARK_HOME examples/src/main/resources/people.json")
df.select("name", "age").write.format("parquet").save("namesAndAges.parquet")

2,保存模式

可以通过配置SaveMode指定如何处理现有数据,实现保存模式不使用任何锁定,而且不是原子操作;因此,多路数据写入相同位置是不安全的。当执行overwrite时,写入新数据之前原来数据将被删除。
在这里插入图片描述

3,保存持久表

当使用HiveContext时,DataFrames通过saveAsTable命令保存为持久表使用,与registerTempTable命令不同,saveAsTable实现Dataframe的内容,并创建一个指向Hive Metastore中数据的指针。即使Spark程序重新启动,连接相同Metastore的数据不会发生变化。
默认情况下saveAsTable将创建一个“管理表”,这意味着数据的位置将由Metastore控制,当表被删除时,管理表将表数据自动删除。

二、Parquet文件

Parquet是一种支持多种数据处理系统的存储格式,Spark SQL提供了读写Parquet文件,并且自动保存原始数据的模式。

1,Parquet文件优点

(1)高效,Parquet采取列式存储避免读入不需要的数据,具有极好的性能和GC。
(2)方便的压缩和解压缩,并具有极好的压缩比例。
(3)可以直接固化为Parquet文件,也可以直接读取Parquet文件,具有比磁盘更好的缓存效果。
Spark SQL对读写Parquet文件提供支持,方便加载Parquet文件数据到DataFrame,供Spark SQL操作,也可以将DataFrame写入Parquet文件,并自动保留原始Scheme架构。
在外部数据源方面,Spark对Parquet的支持有了很大的加强,更快的metadata discovery和schema merging;同时能够读取其他工具或者库生成的非标准合法的Parquet文件;以及更快、更鲁棒的动态分区插入。

2,加载数据编程

通过sqlContext.implicits._隐式转换一个RDD为DataFrame,并将DataFrame保存为Parquet文件;加载保存的Parquet文件,重新构建一个DataFrame,注册成临时表,供SQL查询使用。

// 创建sqlContextval 
sqlContext = new org.apache.spark.sql.SQLContext(sc)
// 隐式转换为一个DataFrame
import sqlContext.implicits._
// 使用case定义schema,实现Person接口
case class Person(name: String, age: Int)
// 读取文件创建一个MappedRDD,并将数据写入Person模式类,隐式转换为DataFrame
val peopleDF = sc.textFile("file:///$SPARK_HOME/examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
// 保存DataFrame,保存为Parquet格式
peopleDF.write.parquet("people.parquet")
// 加载Parquet文件作为DataFrame
val parquetFile = sqlContext.read.parquet("people.parquet")
// 将DataFrame注册为临时表,供SQL查询使用
parquetFile.registerTempTable("parquetTable")
val result = sqlContext.sql("SELECT name FROM parquetTable WHERE age >= 13 AND age <= 19")
result.map(t => "Name: " + t(0)).collect().foreach(println)

3,分区发现(partition discovery)

表分区(table partitioning)是一种常见的优化方法,用于像Hive一样的系统。对于分区表,数据通常存储在不同的目录中,在每个分区目录路径中对分区列的值进行编码。
Parquet数据源能够自动发现和推断分区信息,使用以下目录结构存储以前使用的人口数据到一个分区表,以gender和country作为分区列:

path└──table
        ├── gender=male
        │   ├── ...
        │   ├── country=US
        │   │   └── data.parquet
        │   ├── country=CN
        │   │   └── data.parquet
        │   └── ...
        └── gender=female
			├── ...
			├── country=US
			│   └── data.parquet
			├── country=CN
			│   └── data.parquet
			└── ...

通过路径path/table,使用SQLContext.read的parquet或load命令,Spark SQL自动提取分区信息,返回的DataFrame模式如下:

root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
|-- country: string (nullable = true)

分区列的数据类型是自动映射,支持numeric数据类型和string类型自动推断。

4,模式合并(schema merging)

如同ProtocolBuffer、Avro、Thrift,Parquet也支持模式演进,用户可以从一个简单的模式开始,逐步根据需要添加更多的列。通过这种方式,用户最终得到多个不同但是能相互兼容模式的Parquet文件,Parquet数据源能够自动检测这种情况,进而合并这些文件。
由于模式合并是相对昂贵的操作,在很多情况下并非必须,为了提升性能,在1.5.0版本中默认关闭。

// 隐式转换一个RDD为DataFrame
import sqlContext.implicits._
// 创建一个DataFrame,存储数据到一个分区目录
val df1 = sc.makeRDD(1 to 5).map(i => (i, i * 2)).toDF("single", "double")
df1.write.parquet("data/test_table/key=1")
// 创建一个新DataFrame,存储在一个新的分区目录
val df2 = sc.makeRDD(6 to 10).map(i => (i, i * 3)).toDF("single", "triple")
df2.write.parquet("data/test_table/key=2")
// 读取分区表
val df3 = sqlContext.read.option("mergeSchema", "true").parquet("data/test_table")
df3.printSchema()
// 通过基础DataFrame函数,以树格式打印Schema,包含分区目录下全部的分区列
df3.printSchema()
// root
// |-- single: int (nullable = true)
// |-- double: int (nullable = true)
// |-- triple: int (nullable = true)
// |-- key: int (nullable = true)

Parquet数据源自动从文件路径中发现了key这个分区列,并且正确合并了两个不相同但相容的Schema。值得注意的是,如果最后的查询中查询条件跳过了key=1这个分区,Spark SQL的查询优化器会根据这个查询条件将该分区目录剪掉,完全不扫描该目录中的数据,从而提升查询性能。

5,配置

在SQLContext中使用setConf方法,或在运行时使用SQL命令SET key=value,实现对Parquet文件的配置
在这里插入图片描述

三、JSON数据集

Spark SQL可以自动推断出一个JSON数据集的Schema并作为一个DataFrame加载,通过SQLContext.read.json()方法使用JSON文件创建DataFrame。

// 创建sqlContextval 
sqlContext = new org.apache.spark.sql.SQLContext(sc)
// 设置JSON数据集的路径,可以是单个文件或者一个目录
val path= file:///Spark_Home/examples/src/main/resources/people.json"
val people = sqlContext.read.json(path)
// 打印schema,并显示推断的schema
people.printSchema()
// root
// |-- age: integer (nullable = true)
// |-- name: string (nullable = true)
// 注册DataFrame作为一个临时表
people.registerTempTable("jsonTable")
// 使用sql运行SQL表达式
val teenagers = sqlContext.sql("SELECT name FROM jsonTable WHERE age >= 13 AND age <= 19")

或者通过转换一个JSON对象的RDD[String]创建DataFrame。

val anotherRDD = sc.parallelize("""{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val anotherPeople = sqlContext.read.json(anotherRDD)

四、Hive表

Spark SQL支持从Hive表中读写数据,然而默认版本Spark组件并不包括Hive大量的依赖关系。Hive支持通过添加-Phive和-Phive-thriftserver标志对Spark重新构建一个包括Hive的新组件,Hive的新组件必须分发到所有的Worker节点上,因为Worker节点需要访问Hive的serialization和deserialization库(SerDes),以便于访问存储在Hive中的数据,所以该Hive集合Jar包必须拷贝到所有的Worker节点。
除了基本的SQLContext,Spark SQL还可以创建一个HiveContext,该HiveContext通过基本的SQLContext提供了一系列的方法集,可以使用更完整的HiveQL解析器查询,访问Hive的UDF,并从Hive表读取数据,以及SerDe支持。
在这里插入图片描述

1,示例数据

新建一个kv1.txt文件,数据如下:

238 val_238
86  val_86
311 val_311
27  val_27
165 val_165
409 val_409
255 val_255
278 val_278
98  val_98

2,创建HiveContext

使用Hive,必须先构建一个继承SQLContext的HiveContext对象,并加入在MetaStore中查找表和使用HiveQL写查询功能的支持;可以在conf目录hive-site.xml文件中添加Hive的配置文件,当运行一个YARN集群时,datanucleus jars和hive-site.xml必须在Driver和全部的Executors启动。
一个简单的方法如下:在spark-submit命令行通过–jars参数和–file参数加载,即使hive-site.xml文件没有配置,仍然可以创建一个HiveContext,并会在当前目录下自动地创建metastore_db和warehouse。

使用Scala语言说明HiveContext创建方式:

// SparkContext实例
val sc: SparkContext = ...
// 通过sc创建HiveContext的实例hiveContext
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

3,使用Hive操作数据

使用HiveContext无需单独安装Hive,可以使用spark.sql.dialect选项选择解析查询语句的SQL的特定转化,这个参数可以使用SQLContext上的setConf方法,也可以使用SQL上的SETkey=value命令进行修改。

// 通过HiveContext的sql命令创建表
hiveContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
// 加载数据, $SPARK_HOME指Spark文件安装目录,使用“file:// ...”标识的本地文件,使用“hdfs:// ...”标识的HDFS存储系统的文件
hiveContext.sql("LOAD DATA LOCAL INPATH 'file:///$Spark_Home/examples/src/main/resources/kv1.txt' INTO TABLE src")
// HiveQL的查询表达
hiveContext.sql("FROM src SELECT key,value").collect().foreach(println)
// 使用HiveContext创建表命令
CREATE [EXTERNAL] TABLE[IF NOT EXISTS] table_name
(col_name data_type,)
[PARTITIONED BY(col_name data_type,)]
[[ROW FORMAT row_format]]
[STORED AS file_format]
[LOCATION hdfs_path]

4,Spark支持的Hive特性

(1)Hive查询语句,包括:SELECT、GROUP BY、ORDER BY、CLUSTER BY、SORT BY;
(2)Hive运算符,包括:关系运算符(=、<>、、<>、<、>、>=、<=等)、算术运算符(+、-、*、/、%等)、逻辑运算符(AND、&&、OR、||等)、复杂类型构造函数、数据函数(sign、ln、cos等)、字符串函数(instr、length、printf等);
(3)用户自定义函数(UDF);
(4)用户自定义聚合函数(UDAF);
(5)用户定义的序列化格式(SerDes);
(6)连接操作,包括:JOIN、{LEFT|RIGHT|FULL}OUTER JOIN、LEFT SEMI JOIN、CROSS JOIN;
(7)联合操作(Unions);
(8)子查询:SELECT col FROM(SELECT a+b AS col from t1)t2;
(9)抽样(Sampling);
(10)解释(Explain);
(11)分区表(Partitioned tables);
(12)所有的HiveDDL操作函数,包括:CREATE TABLE、CREATE TABLE AS SELECT、ALTER TABLE;
(13)大多数Hive数据类型TINYINT、SMALLINT、INT、BIGINT、BOOLEAN、FLOAT、DOUBLE、STRING、BINARY、TIMESTAMP、DATE、ARRAY<>、MAP<>、STRUCT<>。

五、通过JDBC连接数据库

Spark SQL还包括一个可以通过JDBC从其他数据库读取数据的数据源,并返回一个DataFrame,在Spark SQL很容易处理,或者Join其他的数据源。除了Scala语言,Java或Python语言也很容易操作而不需要提供一个Class Tag。(不同于Spark SQL JDBC server允许其他应用程序使用Spark SQL运行查询。)

在Spark类路径中包含特定数据库的JDBC驱动程序,如通过Spark Shell连接postgresql命令:

SPARK_CLASSPATH=postgresql-9.3-1102-jdbc41.jar bin/spark-shell
val jdbcDF = sqlContext.load("jdbc", Map(
   "url" -> "jdbc:postgresql:dbserver",
   "dbtable" -> "schema.tablename"))

使用数据源API,加载远程数据库的表作为一个DataFrame和Spark SQL临时表在这里插入图片描述

文章来源:《Spark核心技术与高级应用》 作者:于俊;向海;代其锋;马海平

文章内容仅供学习交流,如有侵犯,联系删除哦!

相关文章:

  • 字符串函数和内存函数
  • 极智AI | GPT4来了,ChatGPT又该升级了
  • 处理机调度与死锁-----计算机操作系统
  • Java中有哪几种方式来创建线程执行任务?
  • 制作INCA和CANape通用的A2L
  • 【华为OD机试真题2023 JAVA】寻找核酸检测点
  • ChatGPT是如何训练得到的?通俗讲解
  • 蓝桥杯历年真题训练
  • 做技术,最忌讳东张西望
  • 大学生考研的意义?
  • 2023年最新最全 VSCode 插件推荐
  • 大屏页面使用transform属性scale进行缩放,高德地图点击事件失效,地图展示模糊
  • Docker圣经:大白话说Docker底层原理,6W字实现Docker自由
  • css实现炫酷充电动画
  • hadoop理论基础(一)
  • JS 中的深拷贝与浅拷贝
  • Angular 响应式表单 基础例子
  • Java程序员幽默爆笑锦集
  • js操作时间(持续更新)
  • laravel5.5 视图共享数据
  • macOS 中 shell 创建文件夹及文件并 VS Code 打开
  • PV统计优化设计
  • windows-nginx-https-本地配置
  • XForms - 更强大的Form
  • 持续集成与持续部署宝典Part 2:创建持续集成流水线
  • 关于Flux,Vuex,Redux的思考
  • 关于字符编码你应该知道的事情
  • 缓存与缓冲
  • 基于Javascript, Springboot的管理系统报表查询页面代码设计
  • 开发了一款写作软件(OSX,Windows),附带Electron开发指南
  • 开源SQL-on-Hadoop系统一览
  • 坑!为什么View.startAnimation不起作用?
  • 前端路由实现-history
  • 融云开发漫谈:你是否了解Go语言并发编程的第一要义?
  • 深度学习在携程攻略社区的应用
  • 通过git安装npm私有模块
  • 一道闭包题引发的思考
  • 用jquery写贪吃蛇
  • 怎么将电脑中的声音录制成WAV格式
  • puppet连载22:define用法
  • ​ ​Redis(五)主从复制:主从模式介绍、配置、拓扑(一主一从结构、一主多从结构、树形主从结构)、原理(复制过程、​​​​​​​数据同步psync)、总结
  • ​2021半年盘点,不想你错过的重磅新书
  • # 数论-逆元
  • #我与Java虚拟机的故事#连载01:人在JVM,身不由己
  • $.ajax()参数及用法
  • (1/2) 为了理解 UWP 的启动流程,我从零开始创建了一个 UWP 程序
  • (libusb) usb口自动刷新
  • (pt可视化)利用torch的make_grid进行张量可视化
  • (ZT) 理解系统底层的概念是多么重要(by趋势科技邹飞)
  • (二)构建dubbo分布式平台-平台功能导图
  • (一)认识微服务
  • (转)总结使用Unity 3D优化游戏运行性能的经验
  • .bat批处理(七):PC端从手机内复制文件到本地
  • .NET Core 中插件式开发实现
  • .Net FrameWork总结