当前位置: 首页 > news >正文

详解 Flink Table API 和 Flink SQL 之函数

一、系统内置函数

1. 比较函数

API函数表达式示例
Table API===,>,<,!=,>=,<=id===1001,age>18
SQL=,>,<,!=,>=,<=id=‘1001’,age>18

2. 逻辑函数

API函数表达式示例
Table API&&,||,!,.isFalse1>1 && 2>1,true.isFalse
SQLand,or,is false,not1>1 and 2>1,true is false

3. 算术函数

API函数表达式示例
Table API+,-,*,/,n1.power(n2)1 + 1,2.power(2)
SQL+,-,*,/,power(n1,n2)1 + 1,power(2,2)

4. 字符串函数

API函数表达式示例
Table APIstring1 + string2,.upperCase(),.lowerCase(),.charLength()‘a’ + ‘b’,
‘hello’.upperCase()
SQLstring1 || string2,upper(),lower(),char_length()‘a’ || ‘b’,upper(‘hello’)

5. 时间函数

API函数表达式示例
Table API.toDate,.toTimestamp,currentTime(),n.days,n.minutes‘20230107’.toDate,
2.days,10.minutes
SQLDate string,Timestamp string,current_time,interval string rangeDate ‘20230107’,interval ‘2’ hour/second/minute/day

6. 聚合函数

API函数表达式示例
Table API.count,.sum,.sum0id.count,age.sum,sum0 表示求和的所有值都为 null 则返回 0
SQLcount(),sum(),rank(),row_number()count(*),sum(age)

二、用户自定义函数(UDF)

UDF 显著地扩展了查询的表达能力,可以解决一些系统内置函数无法解决的需求。使用步骤为:自定义 UDF 函数类继承 UserDefinedFunction 抽象类;创建 UDF 实例并在环境中调用 registerFunction() 方法注册;在 Table API 或 SQL 中使用

1. 标量函数

Scalar Function,可以将 0、1 或多个标量值,映射到一个新的标量值,一进一出

/**用户自定义标量函数步骤:1.自定义函数类继承 ScalarFunction 抽象类,并在类中定义一个 public 的名为 eval 的方法2.创建自定义函数实例,使用 table 环境对象调用 registerFunction() 方法注册函数3.在 Table API 和 SQL 中使用
*/
public class TestScalarFunction {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);DataStream<String> inputStream = env.readTextFile("./sensor.txt");DataStream<SensorReading> dataStream = InputStream.map(line -> {String[] fields = line.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));});Table sensorTable = tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature as temp");tableEnv.createTemporaryView("sensor", dataTable);//使用自定义的标量函数 hashCode,查询每条数据 id 的hashCode 值//2.创建自定义标量函数实例HashCode hashCode = new HashCode(0.8);//3.在环境中注册函数tableEnv.registerFunction("hashCode", hashCode);//4.使用//4.1 Table APITable resultTable = sensorTable.select("id, ts, hashCode(id)");//4.2 SQLTable resultSqlTable = tableEnv.sqlQuery("select id, ts, hashCode(id) from sensor");tableEnv.toAppendStream(resultTable, Row.class).print("result");tableEnv.toAppendStream(resultSqlTable, Row.class).print("sql");env.execute();}//1.自定义 HashCode 函数,继承 ScalarFunction 类,并定义 eval 方法public static class HashCode extends ScalarFunction {private int factor;public HashCode(int factor) {this.factor = factor;}//该方法必须为 public 且名称必须为 eval,返回值和参数可以自定义public int eval(String str) {return str.hashCode() * factor;}}
}

2. 表函数

Table Function,可以将0、1或多个标量值作为输入参数,可以返回任意数量的行作为输出。一进多出

/**用户自定义表函数步骤:1.自定义函数类继承 TableFunction 抽象类,定义输出泛型,并在类中定义一个 public 的名为 eval,返回值为 void 的方法2.创建自定义函数实例,使用 table 环境对象调用 registerFunction() 方法注册函数3.在 Table API 和 SQL 中使用
*/
public class TestTableFunction {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);DataStream<String> inputStream = env.readTextFile("./sensor.txt");DataStream<SensorReading> dataStream = InputStream.map(line -> {String[] fields = line.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));});Table sensorTable = tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature as temp");tableEnv.createTemporaryView("sensor", dataTable);//使用自定义的表函数 Split,将每条数据的 id 分割并按 (word, length) 输出//2.创建自定义表函数实例Split split = new Split("_");//3.在环境中注册函数tableEnv.registerFunction("split", split);//4.使用//4.1 Table API//一进多出函数要配合 joinLateral 使用,侧写表Table resultTable = sensorTable.joinLateral("split(id) as (word, length)").select("id, ts, word, length");//4.2 SQL//一进多出函数要进行 lateral table 的关联,类似于 lateral viewTable resultSqlTable = tableEnv.sqlQuery("select id, ts, word, length from sensor, lateral table(split(id)) as lt(word, length)");tableEnv.toAppendStream(resultTable, Row.class).print("result");tableEnv.toAppendStream(resultSqlTable, Row.class).print("sql");env.execute();}//1.自定义表函数 Split 继承 TableFunction,定义输出类型为 Tuple2<String, Integer>public static class Split extends TableFunction<Tuple2<String, Integer>> {private String separator = ",";public Split(String separator) {this.separator = separator;}//必须定义一个 public 的返回值为 void 的 eval 方法public void eval(String str) {for(String s : str.split(separator)) {//使用 collect() 方法输出结果collect(new Tuple2<>(s, s.length()));}}}
}

3. 聚合函数

Aggregate Function,可以把一个表中的数据,聚合成一个标量值,多进一出

/**用户自定义聚合函数步骤:1.自定义函数类继承 AggregateFunction 抽象类,定义输出和累加器的泛型,实现 createAccumulator()、accumulate()和getValue()方法2.创建自定义函数实例,使用 table 环境对象调用 registerFunction() 方法注册函数3.在 Table API 和 SQL 中使用
*/
public class TestAggregateFunction {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);DataStream<String> inputStream = env.readTextFile("./sensor.txt");DataStream<SensorReading> dataStream = InputStream.map(line -> {String[] fields = line.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));});Table sensorTable = tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature as temp");tableEnv.createTemporaryView("sensor", dataTable);//使用自定义的聚合函数 AvgTemp,按 id 分组后求最新的平均温度值//2.创建自定义聚合函数实例AvgTemp avgTemp = new AvgTemp();//3.在环境中注册函数tableEnv.registerFunction("avgTemp", avgTemp);//4.使用//4.1 Table API//聚合函数必须在 groupBy 后的 aggregate 方法使用Table resultTable = sensorTable.groupBy("id").aggregate("avgTemp(temp) as avgtemp").select("id, avgtemp");//4.2 SQLTable resultSqlTable = tableEnv.sqlQuery("select id, avgTemp(temp) from sensor group by id");tableEnv.toRetractStream(resultTable, Row.class).print("result");tableEnv.toRetractStream(resultSqlTable, Row.class).print("sql");env.execute();}//1.自定义聚合函数 AvgTemp 继承 AggregateFunction 类,定义输出类型为 Double,中间累加器类型为 Tuple2<Double, Integer> 并实现方法public static class AvgTemp extends AggregateFunction<Double, Tuple2<Double, Integer>> {@Overridepublic Double getValue(Tuple2<Double, Integer> accumulator) {return accumulator.f0 / accumulator.f1;}@Overridepublic Tuple2<Double, Integer> createAccumulator() {return new Tuple2<>(0.0, 0);}//必须定义一个 accumulate 方法,参数列表必须为 (accumulator, data),返回值为 voidpublic void accumulate(Tuple2<Double, Integer> accumulator, Double temp) {accumulator.f0 += temp;accumulator.f1 += 1;}}
}

4. 表聚合函数

Table Aggregate Function,可以把一个表中数据,聚合为具有多行和多列的结果表,多进多出

/**用户自定义表聚合函数步骤:1.自定义函数类继承 TableAggregateFunction 抽象类,定义输出和累加器的泛型,实现 createAccumulator()、accumulate()和 emitValue()方法2.创建自定义函数实例,使用 table 环境对象调用 registerFunction() 方法注册函数3.在 Table API 和 SQL 中使用
*/
public class TestAggregateFunction {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);DataStream<String> inputStream = env.readTextFile("./sensor.txt");DataStream<SensorReading> dataStream = InputStream.map(line -> {String[] fields = line.split(",");return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));});Table sensorTable = tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature as temp");tableEnv.createTemporaryView("sensor", dataTable);//使用自定义的表聚合函数 Top2Temp,按 id 分组后求最新的 top2 温度值//2.创建自定义表聚合函数实例Top2Temp top2Temp = new Top2Temp();//3.在环境中注册函数tableEnv.registerFunction("top2Temp", top2Temp);//4.使用//Table API//表聚合函数必须在 groupBy 后的 flatAggregate 方法使用Table resultTable = sensorTable.groupBy("id").flatAggregate("top2Temp(temp) as (temp, irank)").select("id, temp, irank");//表聚合函数目前不能在 SQL 中使用tableEnv.toRetractStream(resultTable, Row.class).print("result");env.execute();}//定义一个 Accumulatorpublic static class Top2TempAcc {private Double highestTemp = Double.MIN_VALUE;private Double secondHighestTemp = Double.MIN_VALUE;public Double getHighestTemp() {return highestTemp;}public void setHighestTemp(Double highestTemp) {this.highestTemp=highestTemp;}public Double getSecondHighestTemp() {return secondHighestTemp;}public void setSecondHighestTemp(Double secondHighestTemp) {this.secondHighestTemp=secondHighestTemp;}}//1.自定义表聚合函数 Top2Temp 继承 TableAggregateFunction 类,定义输出类型为 Tuple2<Double, Integer>,中间累加器类型为自定义的 Top2TempAcc 类并实现方法public static class Top2Temp extends TableAggregateFunction<Tuple2<Double, Integer>, Top2TempAcc> {@Overridepublic void emitValue(Top2TempAcc acc, Collector<Tuple2<Double, Integer>> out) {out.collect(new Tuple2<>(acc.getHighestTemp(), 1));out.collect(new Tuple2<>(acc.getSecondHighestTemp(), 2));}@Overridepublic Top2TempAcc createAccumulator() {return new Top2TempAcc();}//必须定义一个 accumulate 方法,参数列表必须为 (accumulator, data),返回值为 voidpublic void accumulate(Top2TempAcc acc, Double temp) {if(acc.getHighestTemp() < temp) {acc.setSecondHighestTemp(acc.getHighestTemp());acc.setHighestTemp(temp);} else if(acc.getSecondHighestTemp() < temp) {acc.setSecondHighestTemp(temp);}}}
}

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 计算机网络(3) 字节顺序:网络字节序与IPv4
  • Stack详解(含动画演示)
  • Hutool有哪些常用方法
  • 服务架构的设计原则
  • DS1338/PT7C4338串行实时时钟-国产兼容RS4C1338
  • 如何免费用 Qwen2 辅助你翻译与数据分析?
  • Excel根据身份证号提取信息
  • C语言详解(预编译)
  • App推广效果分析,Xinstall助力精准优化
  • 【wiki知识库】06.文档管理页面的添加--前端Vue部分
  • 记录pytest中场景执行的token异常处理问题
  • 加油卡APP开发,汽车加油便捷新方式
  • C++:调整数组顺序使奇数位于偶数前面【面试】
  • 创新共享经济:探索Web3对新商业模式的启迪
  • 【Python入门与进阶】Python函数的定义与使用
  • [js高手之路]搞清楚面向对象,必须要理解对象在创建过程中的内存表示
  • 《微软的软件测试之道》成书始末、出版宣告、补充致谢名单及相关信息
  • 【划重点】MySQL技术内幕:InnoDB存储引擎
  • android 一些 utils
  • axios请求、和返回数据拦截,统一请求报错提示_012
  • AzureCon上微软宣布了哪些容器相关的重磅消息
  • classpath对获取配置文件的影响
  • Go 语言编译器的 //go: 详解
  • mac修复ab及siege安装
  • session共享问题解决方案
  • vue数据传递--我有特殊的实现技巧
  • 服务器从安装到部署全过程(二)
  • 计算机在识别图像时“看到”了什么?
  • 技术攻略】php设计模式(一):简介及创建型模式
  • 两列自适应布局方案整理
  • 使用iElevator.js模拟segmentfault的文章标题导航
  • 限制Java线程池运行线程以及等待线程数量的策略
  • ​Kaggle X光肺炎检测比赛第二名方案解析 | CVPR 2020 Workshop
  • # SpringBoot 如何让指定的Bean先加载
  • ### RabbitMQ五种工作模式:
  • #if等命令的学习
  • $.ajax,axios,fetch三种ajax请求的区别
  • (10)Linux冯诺依曼结构操作系统的再次理解
  • (2)从源码角度聊聊Jetpack Navigator的工作流程
  • (2022 CVPR) Unbiased Teacher v2
  • (arch)linux 转换文件编码格式
  • (cljs/run-at (JSVM. :browser) 搭建刚好可用的开发环境!)
  • (Matlab)使用竞争神经网络实现数据聚类
  • (ZT)出版业改革:该死的死,该生的生
  • (八十八)VFL语言初步 - 实现布局
  • (保姆级教程)Mysql中索引、触发器、存储过程、存储函数的概念、作用,以及如何使用索引、存储过程,代码操作演示
  • (附源码)计算机毕业设计SSM教师教学质量评价系统
  • (官网安装) 基于CentOS 7安装MangoDB和MangoDB Shell
  • (南京观海微电子)——I3C协议介绍
  • (深度全面解析)ChatGPT的重大更新给创业者带来了哪些红利机会
  • . ./ bash dash source 这五种执行shell脚本方式 区别
  • ./configure,make,make install的作用(转)
  • .a文件和.so文件
  • .net 4.0发布后不能正常显示图片问题
  • .NET 的程序集加载上下文