当前位置: 首页 > news >正文

详解 Spark SQL 代码开发之用户自定义函数

一、UDF

一进一出函数

/**语法:SparkSession.udf.register(func_name: String, op: T => K)
*/
object TestSparkSqlUdf {def main(args: Array[String]): Unit = {// 创建 sparksql 环境对象val conf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")val spark = SparkSession.builder().config(conf).getOrCreate()// 引入环境对象中的隐式转换import spark.implicits._val df: DataFrame = spark.read.json("data/user.json")/*需求:给 username 字段的每个值添加前缀*/spark.udf.register("prefixName", name => "Name: " + name)df.createOrReplaceTempView("user")spark.sql("select prefixName(username), age from user").show()// 关闭环境spark.close()}}

二、UDAF

多进一出函数,即聚合函数

1. 弱类型函数

/**自定义步骤:1.继承 UserDefinedAggregateFunction 抽象类(已过时)2.重写 8 个方法
*/
object TestSparkSqlUdaf {def main(args: Array[String]): Unit = {// 创建 sparksql 环境对象val conf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")val spark = SparkSession.builder().config(conf).getOrCreate()// 引入环境对象中的隐式转换import spark.implicits._val df: DataFrame = spark.read.json("data/user.json")/*需求:自定义求年龄平均值的udaf函数*/val myAvgUdaf = new MyAvgUdaf()spark.udf.register("ageAvg", myAvgUdaf)df.createOrReplaceTempView("user")spark.sql("select ageAvg(age) from user").show()// 关闭环境spark.close()}}// 自定义聚合函数类,实现求年龄平均值
class MyAvgUdaf extends UserDefinedAggregateFunction {// 输入数据的结构类型def inputSchema: StructType = {// StructType 是样例类StructType(Array(// StructField 是样例类,必传参数 name: String, dataType: DataTypeStructField("age", LongType)))} // 缓冲区的结构类型def bufferSchema: StructType = {StructType(Array(StructField("totalAge", LongType),StructField("count", LongType)))}// 输出数据的结构类型def dataType: DataType = DoubleType// 函数稳定性def deterministic: Boolean = true// 缓冲区初始化def initialize(buffer: MutableAggregationBuffer): Unit = {buffer.update(0, 0L)buffer.update(1, 0L)}// 接收输入数据更新缓冲区数据def update(buffer: MutableAggregationBuffer, input: Row): Unit = {val totalAge = buffer.getLong(0)val count = buffer.getLong(1)val age = input.getLong(0)buffer.update(0, totalAge + age)buffer.update(1, count + 1)}// 合并缓冲区def merge(buffer1: MutableAggregationBuffer,buffer2: Row): Unit = {buffer1.update(0, buffer1.getLong(0) + buffer2.getLong(0))buffer1.update(1, buffer1.getLong(1) + buffer2.getLong(1))}// 计算最终结果def evaluate(buffer: Row): Any = {buffer.getLong(0).toDouble/buffer.getLong(1)}}

2. 强类型函数

2.1 Spark3.0 之前
/**自定义步骤:1.继承 Aggregator 抽象类,定义泛型IN:输入数据类型BUF:缓冲区类型OUT:输出数据类型2.重写 6 个方法
*/
object TestSparkSqlUdaf1 {def main(args: Array[String]): Unit = {// 创建 sparksql 环境对象val conf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")val spark = SparkSession.builder().config(conf).getOrCreate()// 引入环境对象中的隐式转换import spark.implicits._val df: DataFrame = spark.read.json("data/user.json")/*需求:自定义求年龄平均值的udaf函数*/// Spark3.0 之前的强类型UDAF函数必须在 DSL 语法中使用val ds = df.as[User]// 将UDAF函数对象转换成 DSL 语法中的查询列val col: TypedColumn[User, Double] = new MyAvgUdaf().toColumnds.select(col).show()// 关闭环境spark.close()}}// 定义封装输入的一行数据的类
case class User(username: String, age: Long)// 定义缓冲区类
case class Buff(var totalAge: Long, var count: Long)// 自定义聚合函数类,实现求年龄平均值
class MyAvgUdaf extends Aggregator[User, Buff, Long] {// 缓冲区初始化override def zero: Buff = Buff(0L, 0L)// 根据输入数据更新缓冲区数据override def reduce(buff: Buff, in: User): Buff = {buff.totalAge = buff.totalAge + in.agebuff.count = buff.count + 1buff}// 合并缓冲区override def merge(buff1: Buff, buff2: Buff): Buff = {buff1.totalAge = buff1.totalAge + buff2.totalAgebuff1.count = buff1.count + buff2.countbuff1}// 计算最终结果override def finish(buff: Buff): Double = {buff.totalAge.toDouble/buff.count} //DataSet 默认的编解码器,用于序列化,固定写法//自定义类型是 product // 缓冲区编码操作override def bufferEncoder: Encoder[Buff] = Encoders.product// 输出数据编码操作// 自带类型根据类型选择override def outputEncoder: Encoder[Double] = Encoders.scalaDouble}
2.2 Spark3.0 之后
/**自定义步骤:1.继承 Aggregator 抽象类,定义泛型IN:输入数据类型BUF:缓冲区类型OUT:输出数据类型2.重写 6 个方法
*/
object TestSparkSqlUdaf1 {def main(args: Array[String]): Unit = {// 创建 sparksql 环境对象val conf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")val spark = SparkSession.builder().config(conf).getOrCreate()// 引入环境对象中的隐式转换import spark.implicits._val df: DataFrame = spark.read.json("data/user.json")/*需求:自定义求年龄平均值的udaf函数*/// Spark3.0 之后的强类型UDAF可以在 SQL 语法中使用val myAvgUdaf = new MyAvgUdaf()// 注册函数时需要使用 functions.udaf(func) 包装转换spark.udf.register("ageAvg", functions.udaf(myAvgUdaf))df.createOrReplaceTempView("user")spark.sql("select ageAvg(age) from user").show()// 关闭环境spark.close()}}// 定义缓冲区类
case class Buff(var totalAge: Long, var count: Long)// 自定义聚合函数类,实现求年龄平均值
class MyAvgUdaf extends Aggregator[Long, Buff, Long] {// 缓冲区初始化override def zero: Buff = Buff(0L, 0L)// 根据输入数据更新缓冲区数据override def reduce(buff: Buff, in: Long): Buff = {buff.totalAge = buff.totalAge + inbuff.count = buff.count + 1buff}// 合并缓冲区override def merge(buff1: Buff, buff2: Buff): Buff = {buff1.totalAge = buff1.totalAge + buff2.totalAgebuff1.count = buff1.count + buff2.countbuff1}// 计算最终结果override def finish(buff: Buff): Double = {buff.totalAge.toDouble/buff.count} //DataSet 默认的编解码器,用于序列化,固定写法//自定义类型是 product // 缓冲区编码操作override def bufferEncoder: Encoder[Buff] = Encoders.product// 输出数据编码操作// 自带类型根据类型选择override def outputEncoder: Encoder[Double] = Encoders.scalaDouble}

相关文章:

  • Flink状态State | 大数据技术
  • elementUI - 折叠以及多选的组件
  • Java1.8+ idea hbuilder+ uniapp、vue上门家政小程序APP源码开发
  • 【Spring Cloud】微服务日志收集系统-ELK+Kafka
  • AndroidFlutter混合开发
  • 一个小时搞定JAVA面向对象(4)——继承
  • LeetCode-239.滑动窗口最大值
  • 用增之Google
  • 24、Linux网络端口
  • 详解redis配置文件
  • SQL常用语句--模糊查询LIKE
  • Android 编译 C 文件报错 fatal error: ‘jni.h‘ file not found
  • 网络安全中攻击溯源方法
  • 对人脸图像进行性别和年龄的判断
  • 结构体指针
  • 【vuex入门系列02】mutation接收单个参数和多个参数
  • 3.7、@ResponseBody 和 @RestController
  • Create React App 使用
  • github从入门到放弃(1)
  • java8 Stream Pipelines 浅析
  • javascript面向对象之创建对象
  • JavaSE小实践1:Java爬取斗图网站的所有表情包
  • maven工程打包jar以及java jar命令的classpath使用
  • python学习笔记 - ThreadLocal
  • Spring技术内幕笔记(2):Spring MVC 与 Web
  • Vue 动态创建 component
  • 分布式事物理论与实践
  • 关于Flux,Vuex,Redux的思考
  • 计算机常识 - 收藏集 - 掘金
  • 看完九篇字体系列的文章,你还觉得我是在说字体?
  • 使用Gradle第一次构建Java程序
  • 双管齐下,VMware的容器新战略
  • 我的业余项目总结
  • 原生JS动态加载JS、CSS文件及代码脚本
  • 《码出高效》学习笔记与书中错误记录
  • postgresql行列转换函数
  • # centos7下FFmpeg环境部署记录
  • #{} 和 ${}区别
  • #laravel部署安装报错loadFactoriesFrom是undefined method #
  • #宝哥教你#查看jquery绑定的事件函数
  • #职场发展#其他
  • $.ajax中的eval及dataType
  • $nextTick的使用场景介绍
  • (+4)2.2UML建模图
  • (delphi11最新学习资料) Object Pascal 学习笔记---第2章第五节(日期和时间)
  • (Forward) Music Player: From UI Proposal to Code
  • (六)Flink 窗口计算
  • (论文阅读26/100)Weakly-supervised learning with convolutional neural networks
  • (免费领源码)Python#MySQL图书馆管理系统071718-计算机毕业设计项目选题推荐
  • (学习日记)2024.01.19
  • (转)Linux整合apache和tomcat构建Web服务器
  • **登录+JWT+异常处理+拦截器+ThreadLocal-开发思想与代码实现**
  • .net 8 发布了,试下微软最近强推的MAUI
  • .Net 8.0 新的变化
  • .NET Core 通过 Ef Core 操作 Mysql