当前位置: 首页 > news >正文

Spark编程实验三:Spark SQL编程

目录

一、目的与要求

二、实验内容

三、实验步骤

1、Spark SQL基本操作

2、编程实现将RDD转换为DataFrame

3、编程实现利用DataFrame读写MySQL的数据

四、结果分析与实验体会


一、目的与要求

1、通过实验掌握Spark SQL的基本编程方法;
2、熟悉RDD到DataFrame的转化方法;
3、熟悉利用Spark SQL管理来自不同数据源的数据。

二、实验内容

1、Spark SQL基本操作

        将下列JSON格式数据复制到Linux系统中,并保存命名为employee.json。

{ "id":1 , "name":"Ella" , "age":36 }

{ "id":2, "name":"Bob","age":29 }

{ "id":3 , "name":"Jack","age":29 }

{ "id":4 , "name":"Jim","age":28 }

{ "id":5 , "name":"Damon" }

{ "id":5 , "name":"Damon" }

为employee.json创建DataFrame,并写出Python语句完成下列操作:
(1)查询所有数据;
(2)查询所有数据,并去除重复的数据;
(3)查询所有数据,打印时去除id字段;
(4)筛选出age>30的记录;
(5)将数据按age分组;
(6)将数据按name升序排列;
(7)取出前3行数据;
(8)查询所有记录的name列,并为其取别名为username;
(9)查询年龄age的平均值;
(10)查询年龄age的最小值。

2、编程实现将RDD转换为DataFrame

        源文件内容如下(包含id,name,age):

1,Ella,36

2,Bob,29

3,Jack,29

将数据复制保存到Linux系统中,命名为employee.txt,实现从RDD转换得到DataFrame,并按“id:1,name:Ella,age:36”的格式打印出DataFrame的所有数据。请写出程序代码。

3、编程实现利用DataFrame读写MySQL的数据

(1)在MySQL数据库中新建数据库sparktest,再创建表employee,包含如表所示的两行数据。

(2)配置Spark通过JDBC连接数据库MySQL,编程实现利用DataFrame插入如表所示的三行数据到MySQL中,最后打印出age的最大值和age的总和。

三、实验步骤

1、Spark SQL基本操作

        将下列JSON格式数据复制到Linux系统中,并保存命名为employee.json。

{ "id":1 , "name":"Ella" , "age":36 }

{ "id":2, "name":"Bob","age":29 }

{ "id":3 , "name":"Jack","age":29 }

{ "id":4 , "name":"Jim","age":28 }

{ "id":5 , "name":"Damon" }

{ "id":5 , "name":"Damon" }

为employee.json创建DataFrame,并写出Python语句完成下列操作:

>>> spark=SparkSession.builder.getOrCreate()
>>> df = spark.read.json("file:///home/zhc/mycode/sparksql/employee.json")

(1)查询所有数据;

>>> df.show()

(2)查询所有数据,并去除重复的数据;

>>> df.distinct().show()

(3)查询所有数据,打印时去除id字段;

>>> df.drop("id").show()

(4)筛选出age>30的记录;

>>> df.filter(df.age > 30).show()

(5)将数据按age分组;

>>> df.groupBy("age").count().show()

(6)将数据按name升序排列;

>>> df.sort(df.name.asc()).show()

(7)取出前3行数据;

>>> df.take(3)

(8)查询所有记录的name列,并为其取别名为username;

>>> df.select(df.name.alias("username")).show()

(9)查询年龄age的平均值;

>>> df.agg({"age": "mean"}).show()

(10)查询年龄age的最小值。

>>> df.agg({"age": "min"}).show()

2、编程实现将RDD转换为DataFrame

        源文件内容如下(包含id,name,age):

1,Ella,36

2,Bob,29

3,Jack,29

将数据复制保存到Linux系统中,命名为employee.txt,实现从RDD转换得到DataFrame,并按“id:1,name:Ella,age:36”的格式打印出DataFrame的所有数据。请写出程序代码。

首先,在“/home/zhc/mycode/sparksql”目录下创建文件employee.txt

​​​​​​​[root@bigdata sparksql]# vi employee.txt

然后,在该目录下新建一个py文件命名为rddtodf.py,然后写入如下py程序:

[root@bigdata sparksql]# vi rddtodf.py
#/home/zhc/mycode/sparksql/rddtodf.py
from pyspark.conf import SparkConf
from pyspark.sql.session import SparkSession
from pyspark import SparkContext
from pyspark.sql.types import Row
from pyspark.sql import SQLContext
if __name__ == "__main__":sc = SparkContext("local","Simple App")spark=SparkSession(sc)peopleRDD = spark.sparkContext.textFile("file:home/zhc/mycode/sparksql/employee.txt")rowRDD = peopleRDD.map(lambda line : line.split(",")).map(lambda attributes : Row(int(attributes[0]),attributes[1],int(attributes[2]))).toDF()rowRDD.createOrReplaceTempView("employee")personsDF = spark.sql("select * from employee")personsDF.rdd.map(lambda t : "id:"+str(t[0])+","+"Name:"+t[1]+","+"age:"+str(t[2])).foreach(print)

最后,运行该程序:

[root@bigdata sparksql]# python3 rddtodf.py

3、编程实现利用DataFrame读写MySQL的数据

(1)在MySQL数据库中新建数据库sparktest,再创建表employee,包含如表所示的两行数据。

(2)配置Spark通过JDBC连接数据库MySQL,编程实现利用DataFrame插入如表所示的三行数据到MySQL中,最后打印出age的最大值和age的总和。

首先,启动mysql服务并进入到mysql数据库中:

[root@bigdata sparksql]# systemctl start mysqld.service
[root@bigdata sparksql]# mysql -u root -p

然后开始接下来的操作。

(1)在MySQL数据库中新建数据库sparktest,再创建表employee,包含如表所示的两行数据。

mysql> create database sparktest;
mysql> use sparktest;
mysql> create table employee (id int(4), name char(20), gender char(4), age int(4));
mysql> insert into employee values(1,'Alice','F',22);
mysql> insert into employee values(2,'John','M',25);

(2)配置Spark通过JDBC连接数据库MySQL,编程实现利用DataFrame插入如表所示的三行数据到MySQL中,最后打印出age的最大值和age的总和。

首先,在“/home/zhc/mycode/sparksql”目录下面新建一个py程序并命名为mysqltest.py。

[root@bigdata sparksql]# vi mysqltest.py

接着,写入如下py程序: 

#/home/zhc/mycode/sparksql/mysqltest.py
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark import SparkContext,SparkConf
from pyspark.sql import SparkSession
spark = SparkSession.builder.config(conf = SparkConf()).getOrCreate()
#下面设置模式信息
schema = StructType([StructField("id",IntegerType(),True),StructField("name", StringType(), True),StructField("gender", StringType(), True),StructField("age",IntegerType(), True)])
employeeRDD = spark.sparkContext.parallelize(["3 Mary F 26","4 Tom M 23","5 zhanghc M 21"]).map(lambda x:x.split(" "))
#下面创建Row对象,每个Row对象都是rowRDD中的一行
rowRDD = employeeRDD.map(lambda p:Row(int(p[0].strip()), p[1].strip(), p[2].strip(), int(p[3].strip())))
#建立起Row对象和模式之间的对应关系,也就是把数据和模式对应起来
employeeDF = spark.createDataFrame(rowRDD, schema)
#写入数据库
prop = {}
prop['user'] = 'root'
prop['password'] = 'MYsql123!'
prop['driver'] = "com.mysql.jdbc.Driver"
employeeDF.write.jdbc("jdbc:mysql://localhost:3306/sparktest?useSSL=false",'employee','append', prop)
employeeDF.collect()
employeeDF.agg({"age": "max"}).show()
employeeDF.agg({"age": "sum"}).show()

然后,直接运行该py程序即可得到结果:

[root@bigdata sparksql]# python3 mysqltest.py

最后,到MySQL Shell中,即可查看employee表中的所有信息。

mysql> select * from employee;

四、结果分析与实验体会

        Spark SQL是Apache Spark中用于处理结构化数据的模块。它提供了一种类似于SQL的编程接口,可以用于查询和分析数据。通过实验掌握了Spark SQL的基本编程方法,SparkSession支持从不同的数据源加载数据,并把数据转换成DataFrame,并且支持把DataFrame转换成SQLContext自身中的表,然后使用SQL语句来操作数据。
        在使用Spark SQL之前,需要创建一个SparkSession对象。可以使用SparkSession的read方法加载数据。可以使用DataFrame的createOrReplaceTempView方法将DataFrame注册为一个临时视图。可以使用SparkSession的sql方法执行SQL查询。除了使用SQL查询外,还可以使用DataFrame的API进行数据操作和转换。可以使用DataFrame的write方法将数据写入外部存储。在使用完SparkSession后,应该调用其close方法来关闭SparkSession。
        最后,还掌握了RDD到DataFrame的转化方法,并可以利用Spark SQL管理来自不同数据源的数据。

相关文章:

  • GrayLog日志平台的基本使用-ssh接入Dashboards展示
  • 使用python netmiko模块批量配置Cisco、华为、H3C路由器交换机(支持 telnet 和 ssh 方式)
  • QT 输入框输入限制 正则表达式限制 整理
  • Midjourney v6 正式发布,AI创新工坊同步更新
  • 与擎创科技共建一体化“数智”运维体系,实现数字化转型
  • 在k8s中将gitlab-runner的运行pod调度到指定节点
  • 【学习笔记】Java函数式编程03 Stream流-终结操作
  • 在ajax中使用callback
  • CNVD原创漏洞审核和处理流程
  • Python in Visual Studio Code 2023年12月发布
  • 基于 Linux 的批量上传本地 Git 仓库到 Github 的实践
  • Dash中的callback的使用 多input 6
  • 人工智能_机器学习078_聚类算法_概念介绍_聚类升维_降维_各类聚类算法_有监督机器学习_无监督机器学习---人工智能工作笔记0118
  • 个性化定制的知识付费小程序,为用户提供个性化的知识服务,知识付费saas租户平台
  • Maven 项目依赖仓库配置详解:pom.xml 中的 repositories 与 Maven 配置文件的调用顺序
  • 自己简单写的 事件订阅机制
  • [数据结构]链表的实现在PHP中
  • [译]前端离线指南(上)
  • C++11: atomic 头文件
  • Cumulo 的 ClojureScript 模块已经成型
  • js ES6 求数组的交集,并集,还有差集
  • PHP 使用 Swoole - TaskWorker 实现异步操作 Mysql
  • php的插入排序,通过双层for循环
  • Puppeteer:浏览器控制器
  • React16时代,该用什么姿势写 React ?
  • 从0到1:PostCSS 插件开发最佳实践
  • - 概述 - 《设计模式(极简c++版)》
  • 构建工具 - 收藏集 - 掘金
  • 利用阿里云 OSS 搭建私有 Docker 仓库
  • 少走弯路,给Java 1~5 年程序员的建议
  • 实战:基于Spring Boot快速开发RESTful风格API接口
  • 阿里云ACE认证之理解CDN技术
  • ​configparser --- 配置文件解析器​
  • ​flutter 代码混淆
  • ​LeetCode解法汇总2696. 删除子串后的字符串最小长度
  • ​卜东波研究员:高观点下的少儿计算思维
  • ​软考-高级-信息系统项目管理师教程 第四版【第23章-组织通用管理-思维导图】​
  • # 达梦数据库知识点
  • #我与Java虚拟机的故事#连载11: JVM学习之路
  • (04)odoo视图操作
  • (1/2) 为了理解 UWP 的启动流程,我从零开始创建了一个 UWP 程序
  • (11)MATLAB PCA+SVM 人脸识别
  • (android 地图实战开发)3 在地图上显示当前位置和自定义银行位置
  • (NSDate) 时间 (time )比较
  • (poj1.3.2)1791(构造法模拟)
  • (板子)A* astar算法,AcWing第k短路+八数码 带注释
  • (机器学习-深度学习快速入门)第三章机器学习-第二节:机器学习模型之线性回归
  • (介绍与使用)物联网NodeMCUESP8266(ESP-12F)连接新版onenet mqtt协议实现上传数据(温湿度)和下发指令(控制LED灯)
  • (免费领源码)python#django#mysql公交线路查询系统85021- 计算机毕业设计项目选题推荐
  • (一)UDP基本编程步骤
  • (已解决)报错:Could not load the Qt platform plugin “xcb“
  • ***原理与防范
  • .aanva
  • .NET简谈互操作(五:基础知识之Dynamic平台调用)
  • .net流程开发平台的一些难点(1)