当前位置: 首页 > news >正文

spark.sql

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, mean, rank, row_number, desc
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, StringType, IntegerType# 初始化 SparkSession 对象
spark = SparkSession.builder \.appName("Example PySpark Script with TempView and SQL") \.getOrCreate()# 定义数据结构
schema = StructType([StructField("name", StringType(), True),StructField("age", IntegerType(), True),StructField("city", StringType(), True)
])# 创建第一个 DataFrame
data1 = [("Alice", 34, "New York"),("Bob", 45, "Los Angeles"),("Cathy", 29, "San Francisco"),("David", 32, "Chicago"),("Eve", 27, "Seattle")
]
df1 = spark.createDataFrame(data=data1, schema=schema)# 创建第二个 DataFrame
data2 = [("Frank", 30, "New York"),("Grace", 38, "Los Angeles"),("Hannah", 25, "San Francisco"),("Ian", 42, "Chicago"),("Jack", 28, "Seattle")
]
df2 = spark.createDataFrame(data=data2, schema=schema)# 查看 DataFrame 结构
df1.printSchema()
df2.printSchema()# 使用 filter 过滤年龄大于等于 30 的记录
filtered_df1 = df1.filter(col("age") >= 30)
filtered_df2 = df2.filter(col("age") >= 30)# 使用 group by 计算每个城市的平均年龄
grouped_df1 = filtered_df1.groupBy("city").agg(count("name").alias("count"),mean("age").alias("avg_age")
)grouped_df2 = filtered_df2.groupBy("city").agg(count("name").alias("count"),mean("age").alias("avg_age")
)# 合并两个 DataFrame
merged_df = grouped_df1.union(grouped_df2)# 从合并后的 DataFrame 中随机抽取 50% 的样本
sampled_df = merged_df.sample(withReplacement=False, fraction=0.5)# 限制结果集的大小为 10 条记录
limited_df = sampled_df.limit(10)# 使用窗口函数进行排名
window_spec = Window.partitionBy("city").orderBy(desc("avg_age"))
ranked_df = limited_df.withColumn("rank", rank().over(window_spec)).withColumn("row_number", row_number().over(window_spec))# 将 DataFrame 注册为临时视图
ranked_df.createOrReplaceTempView("ranked_cities")# 使用 SQL 查询
sql_query = """
SELECT city, count, avg_age, rank, row_number
FROM ranked_cities
WHERE rank <= 2
"""# 执行 SQL 查询
sql_results = spark.sql(sql_query)# 显示结果
sql_results.show(truncate=False)# 关闭 SparkSession
spark.stop()

在 PySpark 中,createOrReplaceTempView 方法可以将 DataFrame 注册为临时视图(temporary view),这样就可以使用 SQL 查询来操作 DataFrame。临时视图只在当前 SparkSession 的生命周期内有效,并且在同一 SparkSession 中可以被多次替换。

我们可以在之前的示例中加入 createOrReplaceTempView,以便使用 SQL 查询来完成一些操作。

代码解释

  1. 创建 DataFrame:定义数据结构,并创建两个 DataFrame。
  2. 使用 filter:过滤符合条件的记录。
  3. 使用 group by:按字段进行分组聚合。
  4. 使用 union:将两个 DataFrame 合并。
  5. 使用 sample:从 DataFrame 中随机抽取样本。
  6. 使用 limit:限制结果集的大小。
  7. 使用窗口函数:添加窗口函数来执行复杂的分析。
  8. 使用 createOrReplaceTempView:注册临时视图。
  9. 使用 SQL 查询:执行 SQL 查询。 

在 PySpark 中,执行 SQL 查询可能会比直接使用 DataFrame API 慢一些,原因在于以下几个方面:

  1. SQL 解析和优化:当使用 SQL 查询时,PySpark 需要解析 SQL 语句,将其转换成逻辑计划,然后进行优化,最终生成物理执行计划。这个过程可能需要一些时间,尤其是在复杂的查询中。

  2. Shuffle 操作:如果 SQL 查询涉及 shuffle 操作(例如 group by、join 等),那么数据需要重新分区和排序,这会导致额外的计算开销和磁盘 I/O。在你的例子中,虽然没有涉及 shuffle 操作,但如果查询复杂度增加,shuffle 可能成为瓶颈。

  3. 数据序列化和反序列化:在执行 SQL 查询时,数据可能需要多次序列化和反序列化,这也会影响性能。

  4. 执行计划缓存:对于重复执行的查询,执行计划可以被缓存,从而加速后续执行。但是,对于一次性查询,这种缓存带来的好处有限。

  5. 数据量:如果数据量很大,即使是简单的筛选操作也可能花费一定的时间。

优化建议

为了提高 SQL 查询的性能,可以考虑以下几个优化策略:

  1. 减少 Shuffle:尽量减少涉及 shuffle 的操作,例如使用广播 join 而不是普通的 join。

  2. 缓存 DataFrame:如果你反复使用同一个 DataFrame,可以将其缓存(persist 或 cache)以减少重复计算。

  3. 使用 DataFrame API:尽可能使用 DataFrame API 替代 SQL 查询,因为 DataFrame API 通常更高效。

  4. 索引:虽然 PySpark 本身没有索引的概念,但可以通过预处理数据来减少查询时的数据扫描范围。

  5. 调整配置:调整 Spark 的配置参数,例如增加内存分配、调整 shuffle 的参数等。

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 数仓基础(九):各大公司实时数仓实践
  • Spring Security与Apache Shiro:Java安全框架的比较
  • 电信500M宽带+AX210无线网卡测速
  • charles使用ssl证书抓包https请求失败解决方案
  • python-简单的dos攻击
  • Flutter之CRC校验
  • Unity编辑器开发 Immediate Mode GUI (IMGUI)
  • 多目标应用:基于多目标雾凇算法(MORIME)的移动机器人路径规划研究(提供MATLAB代码)
  • 电脑办公之基础操作(持续更新)
  • 帆软报表使用url访问报表,自定义前端搜索,优化报表展示
  • 使用GPU加速及配置
  • java反射:动态修改注解上属性的值
  • java 实现的单例,在static块中实例化是否会有线程安全问题?
  • golang闭包中变量获取
  • pyro.optim pyro ppl 概率编程 优化器 pytorch
  • [LeetCode] Wiggle Sort
  • android图片蒙层
  • C++11: atomic 头文件
  • echarts花样作死的坑
  • HashMap ConcurrentHashMap
  • input实现文字超出省略号功能
  • Java的Interrupt与线程中断
  • mac修复ab及siege安装
  • python 装饰器(一)
  • 持续集成与持续部署宝典Part 2:创建持续集成流水线
  • 从零开始的无人驾驶 1
  • 利用DataURL技术在网页上显示图片
  • 聊一聊前端的监控
  • 如何学习JavaEE,项目又该如何做?
  • 实现简单的正则表达式引擎
  • 怎么把视频里的音乐提取出来
  • elasticsearch-head插件安装
  • 国内开源镜像站点
  • ​​​​​​​ubuntu16.04 fastreid训练过程
  • ​DB-Engines 11月数据库排名:PostgreSQL坐稳同期涨幅榜冠军宝座
  • # 达梦数据库知识点
  • ###项目技术发展史
  • $con= MySQL有关填空题_2015年计算机二级考试《MySQL》提高练习题(10)
  • ()、[]、{}、(())、[[]]等各种括号的使用
  • (0)Nginx 功能特性
  • (2022 CVPR) Unbiased Teacher v2
  • (2024,LoRA,全量微调,低秩,强正则化,缓解遗忘,多样性)LoRA 学习更少,遗忘更少
  • (3)STL算法之搜索
  • (C)一些题4
  • (PADS学习)第二章:原理图绘制 第一部分
  • (pojstep1.1.1)poj 1298(直叙式模拟)
  • (PyTorch)TCN和RNN/LSTM/GRU结合实现时间序列预测
  • (第61天)多租户架构(CDB/PDB)
  • (附源码)spring boot北京冬奥会志愿者报名系统 毕业设计 150947
  • (附源码)springboot助农电商系统 毕业设计 081919
  • (附源码)ssm基于jsp高校选课系统 毕业设计 291627
  • (函数)颠倒字符串顺序(C语言)
  • (每日持续更新)jdk api之FileFilter基础、应用、实战
  • (十六)视图变换 正交投影 透视投影
  • (终章)[图像识别]13.OpenCV案例 自定义训练集分类器物体检测