当前位置: 首页 > news >正文

SparkSQL---编程模型的操作,数据加载与落地及自定义函数的使用

一、SparkSQL编程模型的创建与转化

1、DataFrame的构建

people.txt数据:
1 zhangsan 20
2 lisi 29
3 wangwu 25
4 zhaoliu 30
5 tianqi 35
6 kobe 40
people.json数据:在SparkSQL—简介及RDD V.S DataFrame V.S Dataset编程模型详解里

1、从Spark数据源进行创建

 	//创建程序入口val spark = SparkSession.builder().appName("dataFrame").master("local[*]").getOrCreate()val sc = spark.sparkContext//设置日志级别sc.setLogLevel("WARN")//加载数据val dataFrame = spark.read.format("json").load("F:\\test\\people.json")//展示数据dataFrame.show()

2、从RDD进行转换

	//创建程序入口val spark = SparkSession.builder().appName("dataFrame").master("local[*]").getOrCreate()val sc = spark.sparkContext//设置日志级别sc.setLogLevel("WARN")//导包import spark.implicits._//加载文件val file :RDD[String] = sc.textFile("F:\\test\\person.txt")//按照分隔符进行切分val filemap :RDD[Array[String]] = file.map(_.split(" "))//指定数据类型val tran :RDD[(Int,String,Int)] = filemap.map(x=>(x(0).toInt,x(1),x(2).toInt))//带参数的是指定表头名字val dataFrame2=tran.toDF("id","name","age")

3、通过反射创建DataFrame

	case class Person(id:Int,name:String,age:Int)//样例类反射获取列名创建DataFrame//加载文件val file :RDD[String] = sc.textFile("F:\\test\\person.txt")//按照分隔符进行切分val filemap :RDD[Array[String]] = file.map(_.split(" "))//指定数据类型val tran= filemap.map(x1=>Person(x1(0).toInt,x1(1),x1(2).toInt))//将rdd转换为DataFrameval dataFrame1 = tran.toDF()

4、动态编程

    //数据和结构分离加载的方式动态创建dataFrame//加载数据val row:RDD[Row] = sc.parallelize(List(Row(1, "李伟", 1, 180.0),Row(2, "汪松伟", 2, 179.0),Row(3, "常洪浩", 1, 183.0),Row(4, "麻宁娜", 0, 168.0)))//指定schema/*val structType = StructType(List(StructField("id", DataTypes.IntegerType, false),StructField("name", DataTypes.StringType, false),StructField("sex", DataTypes.IntegerType, false),StructField("height", DataTypes.DoubleType, false)))*/val structType = new StructType().add("id","Int").add("name","string").add("sex","Int").add("height","Double")//创建DataFrameval dataFrame3 = spark.createDataFrame(row,structType)//Row:代表的是二维表中的一行记录,或者就是一个Java对象//StructType:是该二维表的元数据信息,是StructField的集合//StructField:是该二维表中某一个字段/列的元数据信息(主要包括,列名,类型,是否可以为null)
2、Dataset的构建
case class Student(id: Int, name: String, sex: Int, age: Int)
object Create_DataSet {def main(args: Array[String]): Unit = {//创建程序入口val spark = SparkSession.builder().appName("dataSet").master("local[*]").getOrCreate()//设置日志级别val sc = spark.sparkContextsc.setLogLevel("WARN")//导包import spark.implicits._//加载数据val list = List(new Student(1, "王盛芃", 1, 19),new Student(2, "李金宝", 1, 49),new Student(3, "张海波", 1, 39),new Student(4, "张文悦", 0, 29))//创建DataSetval ds = spark.createDataset[Student](list)//展示输出ds.show()}

注:在创建Dataset的时候,需要注意数据的格式,必须使用case class,或者基本数据类型,同时需要通过import spark.implicts._来完成数据类型的编码,而抽取出对应的元数据信息,否则编译无法通过。

3、RDD和DataFrame以及DataSet的互相转换
//创建程序入口val spark = SparkSession.builder().appName("transform").master("local[*]").getOrCreate()//调用sparkContextval sc = spark.sparkContext//设置日志级别sc.setLogLevel("WARN")//导包import spark.implicits._//加载数据val file = sc.textFile("F:\\test\\person.txt")//切分val fileMap = file.map(_.split(" "))//指定数据类型val tran = fileMap.map(x=>(x(0).toInt,x(1),x(2).toInt))//----------三者之间的转换--------//rdd=>DFval dataFrame = tran.toDF("id","name","age")//rdd=>DSval dataSet = tran.toDS()//DS=>rdddataSet.rdd//DF=>rdddataFrame.rdd//DF=>DSval ds = dataFrame.as[(Int,String,Int)]//DS=>DFval df = ds.toDF()

二、SparkSQL统一数据加载与落地

1、数据加载
    //创建程序入口val spark = SparkSession.builder().appName("load").master("local[*]").getOrCreate()//调用sparkContextval sc = spark.sparkContext//设置日志级别sc.setLogLevel("WARN")//加载数据//第一种方式:spark.read.format(数据文件格式).load(path),默认加载的文件格式为parquetspark.read.format("parquet").load("F:\\test\\parquet").show()spark.read.format("json").load("F:\\test\\json").show()spark.read.format("csv").load("F:\\test\\csv").show()//加载数据库中的表的数据spark.read.format("jdbc").option("url","jdbc:mysql://localhost:3306/mydata").option("user","root").option("password","root").option("dbtable","person").load().show()//第二种方式spark.read.parquet("F:\\test\\parquet").show()spark.read.json("F:\\test\\json").show()spark.read.csv("F:\\test\\csv").show()//加载数据库中的表val pro =new Properties()pro.put("user","root")pro.put("password","root")spark.read.jdbc("jdbc:mysql://localhost:3306/mydata","person",pro).show()
2、数据落地
	//创建程序入口val spark = SparkSession.builder().appName("save").master("local[*]").getOrCreate()//调用sparkContextval sc = spark.sparkContext//设置日志级别sc.setLogLevel("WARN")//加载数据val dataFrame = spark.read.json("F:\\test\\people.json")//数据落地//第一种方式,save的默认格式也是parquetdataFrame.write.format("parquet").save("F:\\test\\parquet")dataFrame.write.format("json").save("F:\\test\\json")dataFrame.write.format("csv").save("F:\\test\\csv")*///将数据保存到数据库dataFrame.write.format("jdbc").option("url","jdbc:mysql://localhost:3306/mydata").option("user","root").option("password","root").option("dbtable","person").save()//第二种方式dataFrame.write.parquet("F:\\test\\parquet")dataFrame.write.json("F:\\test\\json")dataFrame.write.csv("F:\\test\\csv")//保存到数据库val pro = new Properties()pro.setProperty("user","root")pro.setProperty("password","root")dataFrame.write.jdbc("jdbc:mysql://localhost:3306/mydata","person",pro)

三、自定义函数的使用

	val spark = SparkSession.builder().appName("UDF").master("local[*]").getOrCreate()val sc = spark.sparkContextsc.setLogLevel("WARN")//案例//加载文件val dataFrame = spark.read.json("file:\\F:\\test\\people.json")//sql查询风格//首先将数据注册为一张表dataFrame.createOrReplaceTempView("people")//赋予函数功能val fun=(x:String)=>{x.toUpperCase()}//注册函数spark.udf.register("upper",fun)//使用sql风格查询spark.sql("select name, upper(name) from people").show()

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 最新全国各省市水系矢量数据(2024年更新)
  • 【教学类-73-02】20240805广口瓶(宽口瓶)02
  • 类和对象——日期类实现
  • 从一到无穷大 #32 TimeCloth,云上的快速 Point-in-Time Recovery
  • [Meachines] [Easy] shocker CGI-BIN Shell Shock + Perl权限提升
  • python rsa如何安装
  • live2d C++ sdk 分析
  • 高仲富:49岁搞AI,白天种菜卖菜,晚上学数学搞程序
  • 都2024年了,SQL语句还需要手写吗?
  • 【限免】通信信号与干扰信号【附MATLAB代码】
  • python 图片爬虫记录
  • 实时数仓分层架构详解
  • 【Python】常用的pdf提取库介绍对比
  • 电子电气架构 --- SOVD在域控制器的应用
  • 缓存一致性问题
  • [译] 理解数组在 PHP 内部的实现(给PHP开发者的PHP源码-第四部分)
  • 〔开发系列〕一次关于小程序开发的深度总结
  • Android开源项目规范总结
  • canvas 绘制双线技巧
  • ES6 ...操作符
  • Java 9 被无情抛弃,Java 8 直接升级到 Java 10!!
  • Mysql数据库的条件查询语句
  • NSTimer学习笔记
  • PhantomJS 安装
  • Redux系列x:源码分析
  • Terraform入门 - 3. 变更基础设施
  • VirtualBox 安装过程中出现 Running VMs found 错误的解决过程
  • 得到一个数组中任意X个元素的所有组合 即C(n,m)
  • 基于Volley网络库实现加载多种网络图片(包括GIF动态图片、圆形图片、普通图片)...
  • 精益 React 学习指南 (Lean React)- 1.5 React 与 DOM
  • 利用jquery编写加法运算验证码
  • 盘点那些不知名却常用的 Git 操作
  • 使用 5W1H 写出高可读的 Git Commit Message
  • media数据库操作,可以进行增删改查,实现回收站,隐私照片功能 SharedPreferences存储地址:
  • 选择阿里云数据库HBase版十大理由
  • ​Kaggle X光肺炎检测比赛第二名方案解析 | CVPR 2020 Workshop
  • ​必胜客礼品卡回收多少钱,回收平台哪家好
  • ‌移动管家手机智能控制汽车系统
  • (003)SlickEdit Unity的补全
  • (13)Latex:基于ΤΕΧ的自动排版系统——写论文必备
  • (20)docke容器
  • (c语言)strcpy函数用法
  • (Java数据结构)ArrayList
  • (初研) Sentence-embedding fine-tune notebook
  • (二)基于wpr_simulation 的Ros机器人运动控制,gazebo仿真
  • (附源码)springboot电竞专题网站 毕业设计 641314
  • (三)Pytorch快速搭建卷积神经网络模型实现手写数字识别(代码+详细注解)
  • (四)鸿鹄云架构一服务注册中心
  • (一)使用IDEA创建Maven项目和Maven使用入门(配图详解)
  • (一)为什么要选择C++
  • (原創) 未来三学期想要修的课 (日記)
  • (转) SpringBoot:使用spring-boot-devtools进行热部署以及不生效的问题解决
  • (转)ABI是什么
  • (转)shell中括号的特殊用法 linux if多条件判断
  • (转)使用VMware vSphere标准交换机设置网络连接