当前位置: 首页 > news >正文

(二)PySpark3:SparkSQL编程

目录

一、SparkSQL介绍

二、创建DataFrame

1、通过ToDF方法

2、通过createDataFrame方法

3、通过读取文件或数据库

三、保存DataFrame

四、DataFrame API

1、显示数据

2、统计信息

3、类RDD操作

4、类Excel操作

5、类SQL表操作

五、DataFrame+SQL

1、注册视图

2、操作Hive表

六、总结


一、SparkSQL介绍

Spark SQL是Apache Spark生态系统的一个关键组件,专注于处理和分析结构化和半结构化的大规模数据。Spark SQL建立在Spark核心之上,为用户提供了高效且易用的数据处理接口,从而将关系型和非关系型数据融入到分布式计算环境中。

核心概念之一是DataFrame API,它提供了一个高级的、面向数据的抽象,允许用户以声明性的方式处理数据。DataFrame是一个分布式的、具有表格结构的数据集,类似于传统数据库中的表。通过DataFrame API,用户可以执行各种数据操作,包括过滤、转换、聚合、连接等,而无需深入了解底层的分布式计算模型。

RDD,DataFrame和DataSet对比:

RDD可以存储任何类型的数据,包括结构化数据、半结构化数据和非结构化数据,RDD的操作更接近底层,更适合对数据进行底层控制和自定义处理。

DataFrame构建在RDD之上,提供了更高级的抽象,是一个分布式的、以列为主的数据集合,类似于关系型数据库中的表。DataFrame可以通过多种数据源进行创建,包括结构化数据源(如JSON、CSV、Parquet)和Hive表,并且提供了丰富的SQL和DataFrameAPI,可以方便地进行数据处理和分析。

DataSet在DataFrame基础上进一步增加了数据类型信息,可以通过编程语言的类型系统来检查错误,并提供更好的编译时类型检查。

DataFrame和DataSet都支持SQL交互式查询,可以和 Hive无缝衔接。DataSet只有Scala语言和Java语言接口中才支持,在Python和R语言接口只支持DataFrame。

二、创建DataFrame

首先导包:

import pandas as pd 
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql import Row
pd.DataFrame.iteritems = pd.DataFrame.items

1、通过ToDF方法

可以将RDD用toDF方法转换成DataFrame。

rdd = sc.parallelize([(1,"James",27,1000),(2,"Bob",22,500),(3,"Alice",25,800),(4,"Jon",29,1200)])
df = rdd.toDF(["id","name","age","sal"])
df.show()

输出结果:

+---+-----+---+----+
| id| name|age| sal|
+---+-----+---+----+
|  1|James| 27|1000|
|  2|  Bob| 22| 500|
|  3|Alice| 25| 800|
|  4|  Jon| 29|1200|
+---+-----+---+----+

2、通过createDataFrame方法

可以将Pandas.DataFrame转换成pyspark中的DataFrame,也可直接对数据列表、schema进行转换。

#将pandas.DataFrame转换为pyspark.DataFrame
pdf = pd.DataFrame([(1,"James",27,1000),(2,"Bob",22,500),(3,"Alice",25,800),(4,"Jon",29,1200)],columns=["id","name","age","sal"])
df = spark.createDataFrame(pdf)
print(type(df))#将list转换为pyspark.DataFrame
df = spark.createDataFrame([(1,"James",27,1000),(2,"Bob",22,500),(3,"Alice",25,800),(4,"Jon",29,1200)],["id","name","age","sal"])
print(type(df))#根据指定rdd和schema创建pyspark.DataFrame
schema = StructType([StructField("id", IntegerType(), nullable = False),StructField("name", StringType(), nullable = True),StructField("age", IntegerType(), nullable = True),StructField("sal", FloatType(), nullable = True),])rdd = sc.parallelize([Row(1,"James",27,1000),Row(2,"Bob",22,500),Row(3,"Alice",25,800),Row(4,"Jon",29,1200),])df = spark.createDataFrame(rdd, schema)
print(type(df))

输出结果:

<class 'pyspark.sql.dataframe.DataFrame'>
<class 'pyspark.sql.dataframe.DataFrame'>

3、通过读取文件或数据库

可以通过读取json、csv等文件,或hive、mysql数据表得到DataFrame。

spark.read.csv(...): 从 CSV 文件中读取数据。
spark.read.json(...): 从 JSON 文件中读取数据。
spark.read.parquet(...): 从 Parquet 文件中读取数据。
spark.read.text(...): 读取文本文件。
spark.read.format(...): 使用指定的格式读取数据。

读取json文件:

df = spark.read.json("test.json")
df.show()

输出结果:test.json内容如下:

{"id":1,"name":"James","age":27,"sal":1000}
{"id":2,"name":"Bob","age":22,"sal":500}
{"id":3,"name":"Alice","age":25,"sal":800}
{"id":4,"name":"Jon","age":29,"sal":1200}

读取parquet文件:

df = spark.read.parquet("data/users.parquet")
df.show()

读取csv文件:

#方式1
df = spark.read.option("header","true") \.option("inferSchema","true") \.option("delimiter", ",") \.csv("test.csv")#方式2
df = spark.read.format("com.databricks.spark.csv") \.option("header", "true") \.option("inferSchema", "true") \.option("delimiter", ",") \.load("test.csv")#方式3
df = spark.read.csv(path="test.csv", header=True,   #指定将第一行作为列名inferSchema=True, #自动推断出每列的数据类型sep=','  #分隔符)
df.show()

输出结果:

+---+-----+---+----+
| id| name|age| sal|
+---+-----+---+----+
|  1|James| 27|1000|
|  2|  Bob| 22| 500|
|  3|Alice| 25| 800|
|  4|  Jon| 29|1200|
+---+-----+---+----+

读取Hive数据表:

spark.sql("CREATE TABLE IF NOT EXISTS test (id INT, name STRING, age INT, sal FLOAT) USING hive")
spark.sql("LOAD DATA LOCAL INPATH 'data/test.txt' INTO TABLE test")
df = spark.sql("SELECT * FROM test")

三、保存DataFrame

 通过df.write()对DataFrame进行保存。

#保存为csv文件
df.write.format("csv").option("header","true").save("data/test.csv")#保存为json文件
df.write.json("data/test.json")#保存成parquet文件
df.write.parquet("data/test.parquet")
df.write.partitionBy("age").format("parquet").save("data/test.parquet")#保存成hive数据表
df.write.bucketBy(2, "name").sortBy("age").saveAsTable("test")

四、DataFrame API

1、显示数据

①df.collect()

用于将DataFrame中的所有行收集到Driver节点上,并以列表的形式返回这些行。

df = spark.createDataFrame([(1,"James",27,1000),(2,"Bob",22,500),(3,"Alice",25,800),(4,"Jon",29,1200)],["id","name","age","sal"])
df.collect()

输出结果:

[Row(id=1, name='James', age=27, sal=1000),Row(id=2, name='Bob', age=22, sal=500),Row(id=3, name='Alice', age=25, sal=800),Row(id=4, name='Jon', age=29, sal=1200)]

②df.first()

获取第一行数据。

df.first()

输出结果:

Row(id=1, name='James', age=27, sal=1000)

③df.head(n)

获取前n行数据。

df.head(2)

输出结果:

[Row(id=1, name='James', age=27, sal=1000),Row(id=2, name='Bob', age=22, sal=500)]

④df.show(n)

与df.head(n)类似,但是df.show(n)是打印成表格。

df.show(2)

输出结果:

+---+-----+---+----+
| id| name|age| sal|
+---+-----+---+----+
|  1|James| 27|1000|
|  2|  Bob| 22| 500|
+---+-----+---+----+
only showing top 2 rows

⑤df.printSchema()

用于打印DataFrame的模式schema,定义了各列的名称和类型。

df.printSchema()

输出结果:

root|-- id: long (nullable = true)|-- name: string (nullable = true)|-- age: long (nullable = true)|-- sal: long (nullable = true)

2、统计信息

①df.describe()

一般与df.show()连用,用于查看DataFrame的数据分布。

df.describe().show()

输出结果:

+-------+------------------+-----+------------------+----------------+
|summary|                id| name|               age|             sal|
+-------+------------------+-----+------------------+----------------+
|  count|                 4|    4|                 4|               4|
|   mean|               2.5| null|             25.75|           875.0|
| stddev|1.2909944487358056| null|2.9860788111948193|298.607881119482|
|    min|                 1|Alice|                22|             500|
|    max|                 4|  Jon|                29|            1200|
+-------+------------------+-----+------------------+----------------+

若只想查看某一列的数据分布,如:df.describe('age').show()。

②df.count()

返回数据总行数。

df.count()

输出结果:

4

③聚合函数

一些常用的聚合函数如下sum()、mean()、min()、max()、avg()

#求最小工资
df.select(F.min(df['sal'])).show()
#输出结果:
+--------+
|min(sal)|
+--------+
|     500|
+--------+#求最大工资
df.select(F.max(df['sal'])).show()
#输出结果:
+--------+
|max(sal)|
+--------+
|    1200|
+--------+#求总工资
df.select(F.sum(df['sal'])).show()
#输出结果:
+--------+
|sum(sal)|
+--------+
|    3500|
+--------+#求平均工资
df.select(F.avg(df['sal'])).show()
#输出结果:
+--------+
|avg(sal)|
+--------+
|   875.0|
+--------+

同时对多列操作:

df.agg({"name":"count","age":"max","sal":"avg"}).show()#输出:
+-----------+--------+--------+
|count(name)|max(age)|avg(sal)|
+-----------+--------+--------+
|          4|      29|   875.0|
+-----------+--------+--------+

④df.stat.freqItems()

统计值出现的频率。

#统计age、name两列出现频率超过0.25的值
df.stat.freqItems(("age","name"),0.25).show()

输出结果:

+----------------+--------------------+
|   age_freqItems|      name_freqItems|
+----------------+--------------------+
|[29, 22, 25, 27]|[Bob, Jon, Alice,...|
+----------------+--------------------+

3、类RDD操作

可以把DataFrame当做数据类型为Row的RDD来进行操作。

部分操作需要先转换为RDD才能运行,如map、flatMap等等。

部分操作可以直接在DataFrame上进行,如filter、distinct、sample、cache、intersect等等。

①df.map()

#所有人age+1
rdd = df.rdd.map(lambda x:Row(x[2]+1))
rdd.toDF(["age"]).show()

输出结果:

+---+
|age|
+---+
| 28|
| 23|
| 26|
| 30|
+---+

②df.flatMap()

rdd = df.rdd.flatMap(lambda x:x[1].split('o')).map(lambda x:Row(x))
rdd.toDF(["name"]).show()

输出结果:

+-----+
| name|
+-----+
|James|
|    B|
|    b|
|Alice|
|    J|
|    n|
+-----+

③df.filter()

#筛选工资大于800的
df.filter(col("sal")>800).show()+---+-----+---+----+
| id| name|age| sal|
+---+-----+---+----+
|  1|James| 27|1000|
|  4|  Jon| 29|1200|
+---+-----+---+----+#筛选姓名为Bob,以下三种方式输出结果一致
df.filter(col("name")=="Bob").show()
df.filter(df["name"]=="Bob").show()
df.filter("name='Bob'").show()+---+----+---+---+
| id|name|age|sal|
+---+----+---+---+
|  2| Bob| 22|500|
+---+----+---+---+#筛选姓名以J开头的
df.filter(col("name").startswith("J")).show()+---+-----+---+----+
| id| name|age| sal|
+---+-----+---+----+
|  1|James| 27|1000|
|  4|  Jon| 29|1200|
+---+-----+---+----+#筛选除指定值外的其他数据
broads = sc.broadcast(["James","Bob"])
df.filter(~col("name").isin(broads.value)).show() +---+-----+---+----+
| id| name|age| sal|
+---+-----+---+----+
|  3|Alice| 25| 800|
|  4|  Jon| 29|1200|
+---+-----+---+----+

④df.distinct()

#去重
df.distinct().show()

⑤df.cache()

#cache缓存
df.cache()#释放缓存
df.unpersist()

⑥df.sample()

随机抽样。

#withReplacement=False表示无放回,即抽取不重复的数据
#fraction=0.5表示抽样的比例为50%
#seed为随机种子,用于复现
df_sample = df.sample(withReplacement=False, fraction=0.5, seed=2)
df_sample.show()

输出结果:

+---+-----+---+----+
| id| name|age| sal|
+---+-----+---+----+
|  1|James| 27|1000|
|  3|Alice| 25| 800|
+---+-----+---+----+

⑦df.intersect(df)

取两个DataFrame所有交集的行,返回结果不包含重复行

df.intersect(df_sample).show()

输出结果:

+---+-----+---+----+
| id| name|age| sal|
+---+-----+---+----+
|  3|Alice| 25| 800|
|  1|James| 27|1000|
+---+-----+---+----+

⑧df.exceptAll()

求差集。

#从df中移除与df_sample相同的行,返回一个新的DataFrame
df.exceptAll(df_sample).show()

输出结果:

+---+----+---+----+
| id|name|age| sal|
+---+----+---+----+
|  2| Bob| 22| 500|
|  4| Jon| 29|1200|
+---+----+---+----+

4、类Excel操作

①df.withColumn()

增加列。

df = df.withColumn("birthyear",-df["age"]+2024)
df.show()

输出结果:

+---+-----+---+----+---------+
| id| name|age| sal|birthyear|
+---+-----+---+----+---------+
|  1|James| 27|1000|     1997|
|  2|  Bob| 22| 500|     2002|
|  3|Alice| 25| 800|     1999|
|  4|  Jon| 29|1200|     1995|
+---+-----+---+----+---------+

②df.select()

筛选列。

df.select("name","age").show()

输出结果:

+-----+---+
| name|age|
+-----+---+
|James| 27|
|  Bob| 22|
|Alice| 25|
|  Jon| 29|
+-----+---+

③df.drop()

删除列。

#删除一列
df.drop("age").show() #删除多列
df.drop(*["age","birthyear"]).show()

④df.withColumnRenamed()

对列进行重命名。

#对一列进行重命名
df.withColumnRenamed("sal","salary").show() #对多列进行重命名
df.withColumnRenamed("sal","salary").withColumnRenamed("birthyear","year").show() 

⑤df.sort()、df.orderBy()

按照某一列或某几列进行排序。

#按照某一列进行排序
df.sort(df["age"].desc()).show() #降序
df.sort(df["age"].asc()).show()  #升序#按照某几列进行排序
df.orderBy(col("age").asc(), col("sal").desc()).show()

⑥df.na.drop()、df.na.fill()

处理带空值的行。

注意,在填充空值时,只能对相同数据类型的列的空值进行填充。

df = spark.createDataFrame([(1,"James",27,1000),(2,"Bob",22,500),(3,"Alice",25,800),(4,None,29,None)],["id","name","age","sal"])
df.show()+---+-----+---+----+
| id| name|age| sal|
+---+-----+---+----+
|  1|James| 27|1000|
|  2|  Bob| 22| 500|
|  3|Alice| 25| 800|
|  4| null| 29|null|
+---+-----+---+----+#删除带有nan值的行
df.na.drop().show()+---+-----+---+----+
| id| name|age| sal|
+---+-----+---+----+
|  1|James| 27|1000|
|  2|  Bob| 22| 500|
|  3|Alice| 25| 800|
+---+-----+---+----+#填充nan值
df.na.fill("Jon").show()+---+-----+---+----+
| id| name|age| sal|
+---+-----+---+----+
|  1|James| 27|1000|
|  2|  Bob| 22| 500|
|  3|Alice| 25| 800|
|  4|  Jon| 29|null|
+---+-----+---+----+df.na.fill(0).show()+---+-----+---+----+
| id| name|age| sal|
+---+-----+---+----+
|  1|James| 27|1000|
|  2|  Bob| 22| 500|
|  3|Alice| 25| 800|
|  4| null| 29|   0|
+---+-----+---+----+

⑦df.replace()

替换指定的值。

#注意,不能同时对不同数据类型的值进行替换
#例如,这句代码会报错:df.replace({"James": "Jim",1000: 100}).show()
df.replace({"James": "Jim", "Bob":"Bieber" }).show()
df.replace({1000: 100}).show()

⑧df.dropDuplicates()

跟distinct方法不同的是,dropDuplicates方法接收传参,可以根据指定字段去重。

df.dropDuplicates(["name"]).show()

5、类SQL表操作

类SQL表操作主要包括表查询(select,selectExpr,where),表连接(join,union,unionAll),表分组(groupby,agg,pivot)等操作。

①df.select()

df.select()用于对DataFrame进行列的表达式操作,允许使用SQL表达式来筛选、计算列。

#筛选两列,并限制输出前两行
#df.limit(2)与df.take(2)类似,不过limit输出的是DataFrame,take输出的是list
df.select("age","name").limit(2).show()
+---+-----+
|age| name|
+---+-----+
| 27|James|
| 22|  Bob|
+---+-----+#可以对列进行操作
df.select("name",df["age"] + 1).show()
+-----+---------+
| name|(age + 1)|
+-----+---------+
|James|       28|
|  Bob|       23|
|Alice|       26|
|  Jon|       30|
+-----+---------+#通过toDF()对列进行重命名
df.select("name",-df["age"]+2024).toDF("name","birth_year").show()
+-----+----------+
| name|birth_year|
+-----+----------+
|James|      1997|
|  Bob|      2002|
|Alice|      1999|
|  Jon|      1995|
+-----+----------+

②df.selectExpr()

df.selectExpr()用于对DataFrame进行列的表达式操作,允许使用SQL表达式来筛选、计算和重命名列。参数是一个字符串列表,其中每个字符串都是一个SQL表达式。

#创建一个UDF函数
spark.udf.register("getBirthYear",lambda x:2024-x)#调用函数对列进行转换并重命名
df.selectExpr("name", "getBirthYear(age) as birth_year" , "UPPER(name) as NAME" ).show()
+-----+----------+-----+
| name|birth_year| NAME|
+-----+----------+-----+
|James|      1997|JAMES|
|  Bob|      2002|  BOB|
|Alice|      1999|ALICE|
|  Jon|      1995|  JON|
+-----+----------+-----+#使用row_number()函数
df.selectExpr("name","age","row_number() over (order by age desc) as order").show()
+-----+---+-----+
| name|age|order|
+-----+---+-----+
|  Jon| 29|    1|
|James| 27|    2|
|Alice| 25|    3|
|  Bob| 22|    4|
+-----+---+-----+

使用df.selectExpr()还可以将DataFrame转换为复合类型。

#array类型
dfarray = df.selectExpr("name","array(age,sal) as info")
dfarray.selectExpr("name","info[0] as age","info[1] as sal").show()#struct类型
df_struct = df.selectExpr("name","struct(age,sal) as info")
df_struct.selectExpr("name","info.age","info.sal").show()#map类型
df_map = df.selectExpr("name","map('age',age,'sal',sal) as info")
df_map.selectExpr("name","info['age'] as age","info['sal'] as sal").show()#输出结果
+-----+---+----+
| name|age| sal|
+-----+---+----+
|James| 27|1000|
|  Bob| 22| 500|
|Alice| 25| 800|
|  Jon| 29|1200|
+-----+---+----+#构造named_struct类型
df_named_struct = df.selectExpr("name","named_struct('age',age,'sal',sal) as info")
df_named_struct.show() 
+-----+----------+
| name|      info|
+-----+----------+
|James|{27, 1000}|
|  Bob| {22, 500}|
|Alice| {25, 800}|
|  Jon|{29, 1200}|
+-----+----------+#转换为json类型
df_named_struct.selectExpr("name","to_json(info) as json_info").show()
+-----+--------------------+
| name|           json_info|
+-----+--------------------+
|James|{"age":27,"sal":1...|
|  Bob|{"age":22,"sal":500}|
|Alice|{"age":25,"sal":800}|
|  Jon|{"age":29,"sal":1...|
+-----+--------------------+

③df.where()

用法与SQL中的where一致,注意书写时,等于是一个“=”。

df.where("name='Bob' or age=27").show()

输出结果:

+---+-----+---+----+
| id| name|age| sal|
+---+-----+---+----+
|  1|James| 27|1000|
|  2|  Bob| 22| 500|
+---+-----+---+----+

④df.join(df)

df.join(df, on='列名', how='inner')。on参数可以指定连接方式为inner、left、right、outer、semi、full、leftanti、anti"等多种方式。关联的列如果有多列,则传入一个列名list。

scores = spark.createDataFrame([("James","English",90),("James","Math",60),("Bob","Math",50),("Bob","Physics",50),("Alice","Math",70),("Alice","Physics",80),("Jon","English",40),("Jon","Math",80)]) \.toDF("name","subject","score") 
scores.show()df.join(scores,on="name",how="inner").show()
df.join(scores,df["name"]==scores["name"],"inner").show()
+-----+---+---+----+-------+-----+
| name| id|age| sal|subject|score|
+-----+---+---+----+-------+-----+
|Alice|  3| 25| 800|   Math|   70|
|Alice|  3| 25| 800|Physics|   80|
|  Bob|  2| 22| 500|   Math|   50|
|  Bob|  2| 22| 500|Physics|   50|
|James|  1| 27|1000|English|   90|
|James|  1| 27|1000|   Math|   60|
|  Jon|  4| 29|1200|English|   40|
|  Jon|  4| 29|1200|   Math|   80|
+-----+---+---+----+-------+-----+

⑤df.union(df)、df.unionAll(df)

df.where("name='Jon'").union(df.limit(2)).show()df.where("name='Jon'").unionAll(df.limit(2)).show()

输出结果:

+---+-----+---+----+
| id| name|age| sal|
+---+-----+---+----+
|  4|  Jon| 29|1200|
|  1|James| 27|1000|
|  2|  Bob| 22| 500|
+---+-----+---+----+

⑥df.groupBy()

df_join = df.join(scores,on="name",how="inner")
df_join.groupBy("name").mean("score").show()
+-----+----------+
| name|avg(score)|
+-----+----------+
|Alice|      75.0|
|  Bob|      50.0|
|James|      75.0|
|  Jon|      60.0|
+-----+----------+#多列聚合,并重命名
df_join.groupBy("name")\.agg(F.mean("age").alias("mean_age"),F.collect_list("score").alias("scores")).show()
+-----+----------+--------+
| name|mean_score|  scores|
+-----+----------+--------+
|Alice|      75.0|[70, 80]|
|  Bob|      50.0|[50, 50]|
|James|      75.0|[90, 60]|
|  Jon|      60.0|[40, 80]|
+-----+----------+--------+#与以上输出结果一致
df_join.groupBy("name").agg(F.expr("mean(score) as mean_score"),F.expr("collect_list(score) as scores")).show()#数据透视表(行转列)
df_join.groupBy("subject").pivot("name").max("score").show()
+-------+-----+----+-----+----+
|subject|Alice| Bob|James| Jon|
+-------+-----+----+-----+----+
|   Math|   70|  50|   60|  80|
|English| null|null|   90|  40|
|Physics|   80|  50| null|null|
+-------+-----+----+-----+----+

五、DataFrame+SQL

将DataFrame注册为临时表视图或者全局表视图后,可以使用SQL select语句对DataFrame进行操作,从而方便地实现对数据的查询、排序,不过由于DataFrame不可变,所以不支持delete、truncate、update等语句。不过可以通过SparkSQL对Hive表直接进行增删改查等操作。

1、注册视图

当使用createOrReplaceTempView()方法时,会创建一个临时表视图。这个视图只在当前的SparkSession中有效,当会话结束或者程序终止时,该视图也会随之消失。

如果使用createOrReplaceGlobalTempView方法,则会创建一个全局临时表视图。与临时表视图不同,全局临时表在整个Spark应用程序中都是有效的,不会因为单个SparkSession的结束而失效。

无论是临时表视图还是全局表视图,都不会占用额外的内存空间,它们实际上是对现有DataFrame的一种引用或者说是一种命名方式,方便用户通过SQL语句来进行数据操作。

①临时表视图

df.createOrReplaceTempView("test")
query='''select * from testwhere age>26'''
spark.sql(query).show()
+---+-----+---+----+
| id| name|age| sal|
+---+-----+---+----+
|  1|James| 27|1000|
|  4|  Jon| 29|1200|
+---+-----+---+----+

②全局表视图

df.createOrReplaceGlobalTempView("test")
query='''select * from global_temp.testwhere age>26'''
spark.sql(query).show()#创建一个新Session也能使用全局表
spark.newSession().sql(query).show()+---+-----+---+----+
| id| name|age| sal|
+---+-----+---+----+
|  1|James| 27|1000|
|  4|  Jon| 29|1200|
+---+-----+---+----+

2、操作Hive表

①创建表

hsql = """CREATE TABLE IF NOT EXISTS `test`(`name` STRING COMMENT '姓名',`age` INT COMMENT '年龄')PARTITIONED BY ( `sex` STRING  COMMENT '性别')
""".replace("\n"," ")
spark.sql(hsql) 

②删除表

hsql= "DROP TABLE IF EXISTS test" 
spark.sql(hsql) 

③写入表

#动态写入数据到hive分区表
spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
df = spark.createDataFrame([("James",27,"1"),("Bob",22,"1"),("Alice",25,"0"),("Jon",29,"1")]).toDF("name","age","sex")
df.write.mode("overwrite").format("hive")\
.partitionBy("sex").saveAsTable("test")

六、总结

总的来说,Spark SQL是一个功能强大的工具,适合于处理大规模数据集和进行复杂的数据分析。Spark SQL能够访问多种数据源,包括本地数据集、HDFS、Hive、HBase等,并且通过集成类RDD、类Excel、类SQL的数据处理操作,增强了数据处理的易用性和多样性。

相关文章:

  • Python基础语法:基本数据类型(数字类型和布尔类型)
  • 作业1-32 P1059 [NOIP2006 普及组] 明明的随机数
  • Material UI 5 学习01-按钮组件
  • 如何系统的学习Python——Python的基本语法
  • 2025张宇考研数学,百度网盘视频课+36讲PDF讲义+真题
  • 前人砍树型代码写法vue屎山代码
  • ManualResetEvent 在线程中的使用C#
  • 【VTK编译】带PassionReconstruction的编译过程
  • Linux下进程相关概念详解
  • Diffusion Models for Implicit Image Segmentation Ensembles
  • MySQL之索引详解
  • 【RK3288 Android6, T8PRO 快捷按键 gpio 配置上拉输入】
  • ROS从入门到精通4-2:Docker安装ROS、可视化仿真与终端复用
  • 利用MATLAB模拟点电荷的电场分布
  • 通过 python 和 wget 批量下载文件(在Linux/Ubuntu/Debian中测试)
  • -------------------- 第二讲-------- 第一节------在此给出链表的基本操作
  • Angular数据绑定机制
  • css属性的继承、初识值、计算值、当前值、应用值
  • eclipse(luna)创建web工程
  • HashMap剖析之内部结构
  • JavaScript-Array类型
  • LeetCode18.四数之和 JavaScript
  • ubuntu 下nginx安装 并支持https协议
  • webpack项目中使用grunt监听文件变动自动打包编译
  • 持续集成与持续部署宝典Part 2:创建持续集成流水线
  • 初识 beanstalkd
  • 基于webpack 的 vue 多页架构
  • 技术攻略】php设计模式(一):简介及创建型模式
  • 精彩代码 vue.js
  • 如何借助 NoSQL 提高 JPA 应用性能
  • 软件开发学习的5大技巧,你知道吗?
  • 山寨一个 Promise
  • 手机端车牌号码键盘的vue组件
  • 详解移动APP与web APP的区别
  • 2017年360最后一道编程题
  • ​3ds Max插件CG MAGIC图形板块为您提升线条效率!
  • !!Dom4j 学习笔记
  • #pragma multi_compile #pragma shader_feature
  • #QT(串口助手-界面)
  • #在线报价接单​再坚持一下 明天是真的周六.出现货 实单来谈
  • (10)Linux冯诺依曼结构操作系统的再次理解
  • (android 地图实战开发)3 在地图上显示当前位置和自定义银行位置
  • (Oracle)SQL优化技巧(一):分页查询
  • (附源码)springboot优课在线教学系统 毕业设计 081251
  • (解决办法)ASP.NET导出Excel,打开时提示“您尝试打开文件'XXX.xls'的格式与文件扩展名指定文件不一致
  • (蓝桥杯每日一题)平方末尾及补充(常用的字符串函数功能)
  • (一)使用Mybatis实现在student数据库中插入一个学生信息
  • (源码版)2024美国大学生数学建模E题财产保险的可持续模型详解思路+具体代码季节性时序预测SARIMA天气预测建模
  • (转)GCC在C语言中内嵌汇编 asm __volatile__
  • (转)Mysql的优化设置
  • (转)scrum常见工具列表
  • (转)机器学习的数学基础(1)--Dirichlet分布
  • (转载)从 Java 代码到 Java 堆
  • *Django中的Ajax 纯js的书写样式1
  • .net6+aspose.words导出word并转pdf