当前位置: 首页 > news >正文

大数据随记 —— DataFrame 与 RDD 之间的相互转换

大数据系列文章:👉 目录 👈

在这里插入图片描述

文章目录

    • ① 通过反射获取 RDD 内的 Scheme
    • ② 通过编程接口执行 Scheme

在 Spark SQL 中有两种方式可以在 DataFrame 和 RDD 中进行转换:

  • ① 利用反射机制,推导包含某种类型的 RDD,通过反射将其转换为指定类型的 DataFrame,适用于提前知道 RDD 的 Schema。
  • ② 通过编程借口与 RDD 进行交互获取 Schema,并动态创建 DataFrame,在运行时决定列及其类型。

DataFrame 中的数据结构信息,即为 Scheme

① 通过反射获取 RDD 内的 Scheme

(使用条件)已知类的 Schema,使用这种基于反射的方法会让代码更加简洁而且效果也更好。在 Scala 中,使用 case class 类型导入 RDD 并转换为 DataFrame,通过 case class 创建 Schema,case class 的参数名称会被利用反射机制作为列名。case class 可以嵌套组合成 Sequences 或者 Array。这种 RDD 可以高效的转换为 DataFrame 并注册为表。

其次,如果需要 RDD 与 DFS 或者 DS 之间互相操作,那么需要引入 import sqlContext.implicits._

  • 这里的 sqlContext 不是包名,而是创建的 SparkSession 对象(这里为 SQLContext 对象)的变量名称,所以必须先创建 SparkSession 对象再导入。
  • 这里 sqlContext 对象不能使用 var 声明,因为 Scala 只支持 val 修饰的对象的引入。

SparkSession 是 Spark 2.0 引入的概念,其封装了 SQLContext 和 HiveContext。

package sparksql  
  
import org.apache.spark.sql.SQLContext  
import org.apache.spark.{SparkConf, SparkContext}  
  
object DataFrametoRDDofReflection {  
  def main(args: Array[String]): Unit = {  
  
  }  

  def method1():Unit = {  
  
    val sparkConf = new SparkConf().setAppName("DataFrametoRDDofReflection").setMaster("local[2]")  
    val sc = new SparkContext(sparkConf)  
    val sqlContext = new SQLContext(sc)  
    
    // 引入 sqlContext.implicits._
    import sqlContext.implicits._  
  
    // 将 RDD 转成 DataFrame    
	/*val people = sc.textFile("people.txt").toDF()*/    
	val people = sc.textFile("people.txt").map(_.split(",")).map(p=>Person(p(0),p(1).trim.toInt)).toDF()  
  
    people.show()  
  
    people.registerTempTable("people")  
    val teenagers = sqlContext.sql("SELECT name,age FROM people WHERE age >= 13 AND age <= 19")  
    teenagers.show()  
  
    // DataFrame 转成 RDD 进行操作:根据索引号取值  
    teenagers.map(t=>"Name:" + t(0)).collect().foreach(println)  
  
    // DataFrame 转成 RDD 进行操作:根据字段名称取值  
    teenagers.map(t=>"Name:" + t.getAs[String]("name")).collect().foreach(println)  
  
    // DataFrame 转成 RDD 进行操作:一次返回多列的值  
    teenagers.map(_.getValuesMap[Any](List("name","age"))).collect().foreach(println)  
  
    sc.stop()  
  
  }

  
  /**  
   * 定义 Person 类  
   * @param name 姓名  
   * @param age 年龄  
   */  
  case class Person(name:String,age:Int)  
  
}

② 通过编程接口执行 Scheme

通过 Spark SQL 的接口创建 RDD 的 Schema,这种方式会让代码比较冗长。这种方法的好处是,在运行时才知道数据的列以及列的类型的情况下,可以动态生成 Schema。可以通过以下三步创建 DataFrame:

  • 第一步将 RDD 转为包含 row 对象的 RDD
  • 第二步基于 structType 类型创建 Schema,与第一步创建的 RDD 想匹配
  • 第三步通过 SQLContext 的 createDataFrame 方法对第一步的 RDD 应用 Schema
package sparksql  
  
import org.apache.spark.sql.SQLContext  
import org.apache.spark.{SparkConf, SparkContext}  
  
object DataFrametoRDDofInterface {  
  
  def main(args: Array[String]): Unit = {  
    method2()  
  }  
  
  def method2(): Unit = {  
    val sparkConf = new SparkConf().setAppName("DataFrametoRDDofInterface").setMaster("local[2]")  
    val sc = new SparkContext(sparkConf)  
    val sqlContext = new SQLContext(sc)  
  
    import sqlContext.implicits._  
    
    val people = sc.textFile("people.txt")  
  
    // 以字符串的方式定义 DataFrame 的 Schema 信息  
    val schemaString = "name age"  
  
    // 导入所需要的类  
    import org.apache.spark.sql.Row  
    import org.apache.spark.sql.types.{StructType,StructField,StringType}  
  
    // 根据自定义的字符串 schema 信息产生 DataFrame 的 Schema    
    val schema = StructType(  
      schemaString.split(" ").map(fieldName=>StructField(fieldName,StringType,true)))  
    
    // 将 RDD 转换成 Row    
    val rowRDD = people.map(_.split(",")).map(p=>Row(p(0),p(1).trim))  
  
    // 将 Schema 作用到 RDD 上  
    val peopleDataFrame = sqlContext.createDataFrame(rowRDD,schema)  
  
    // 将 DataFrame 注册成临时表  
    peopleDataFrame.registerTempTable("people")  
  
    // 获取 name 字段的值  
    val results = sqlContext.sql("SELECT name FROM people")  
    results.map(t => "Name" + t(0)).collect().foreach(println) 
     
    sc.stop()  
    
  }  
}

在这里插入图片描述

相关文章:

  • React 学习笔记 2022-08
  • 【实用工具系列之爬虫】python爬取资讯数据
  • 易基因|植物育种:ChIP-seq(组蛋白)揭示H3K36me修饰影响温度诱导的植物可变剪接和开花
  • java计算机毕业设计计算机组成原理教学演示软件源码+数据库+系统+lw文档+mybatis+运行部署
  • Elasticsearch:简体繁体转换分词器 - STConvert analysis
  • C语言经典算法实例1:求二维数组最大最小值
  • 【C++入门基础】命名空间 | 缺省参数 | 函数重载 | 引用 | 内联函数
  • VMware安装与配置Linux 虚拟机
  • Code For Better 谷歌开发者之声——我与Android同成长
  • 【深度学习】(五)目标检测——下篇
  • web前端面试高频考点——Vue3.0新增API(生命周期,ref、toRef 和 toRefs 的理解和最佳使用方式)
  • 管理经济学--重点
  • Java学习路线(个人学习总结)
  • 【数据结构】跳表SkipList代码解析(C++)
  • 中医治疗特发性震颤,哪些食物不能吃?
  • 【技术性】Search知识
  • 【跃迁之路】【444天】程序员高效学习方法论探索系列(实验阶段201-2018.04.25)...
  • HTTP那些事
  • IP路由与转发
  • JavaScript-Array类型
  • JavaScript标准库系列——Math对象和Date对象(二)
  • JSONP原理
  • React组件设计模式(一)
  • spring学习第二天
  • SQL 难点解决:记录的引用
  • windows下使用nginx调试简介
  • 分布式任务队列Celery
  • 汉诺塔算法
  • 计算机常识 - 收藏集 - 掘金
  • 两列自适应布局方案整理
  • 聊聊redis的数据结构的应用
  • 每个JavaScript开发人员应阅读的书【1】 - JavaScript: The Good Parts
  • 漂亮刷新控件-iOS
  • 扫描识别控件Dynamic Web TWAIN v12.2发布,改进SSL证书
  • 系统认识JavaScript正则表达式
  • 在Mac OS X上安装 Ruby运行环境
  • 自动记录MySQL慢查询快照脚本
  • mysql 慢查询分析工具:pt-query-digest 在mac 上的安装使用 ...
  • #13 yum、编译安装与sed命令的使用
  • ${ }的特别功能
  • $redis-setphp_redis Set命令,php操作Redis Set函数介绍
  • (C++17) optional的使用
  • (pojstep1.1.1)poj 1298(直叙式模拟)
  • (ZT)出版业改革:该死的死,该生的生
  • (二刷)代码随想录第16天|104.二叉树的最大深度 559.n叉树的最大深度● 111.二叉树的最小深度● 222.完全二叉树的节点个数
  • (附源码)springboot美食分享系统 毕业设计 612231
  • (免费领源码)python#django#mysql公交线路查询系统85021- 计算机毕业设计项目选题推荐
  • (四)docker:为mysql和java jar运行环境创建同一网络,容器互联
  • (五) 一起学 Unix 环境高级编程 (APUE) 之 进程环境
  • (转)全文检索技术学习(三)——Lucene支持中文分词
  • * 论文笔记 【Wide Deep Learning for Recommender Systems】
  • .[hudsonL@cock.li].mkp勒索病毒数据怎么处理|数据解密恢复
  • .htaccess 强制https 单独排除某个目录
  • .NET BackgroundWorker
  • .net CHARTING图表控件下载地址