当前位置: 首页 > news >正文

(二开)Flink 修改源码拓展 SQL 语法

1、Flink 扩展 calcite 中的语法解析
1)定义需要的 SqlNode 节点类-以 SqlShowCatalogs 为例
a)类位置

flink/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowCatalogs.java

在这里插入图片描述

核心方法

@Override
public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {writer.keyword("SHOW CATALOGS");}
b)类血缘

在这里插入图片描述

2)修改 includes 目录下的 .ftl 文件,在 parserImpls.ftl 文件中添加语法逻辑
a)文件位置

在这里插入图片描述

b)语法示例
/**
* Parse a "Show Catalogs" metadata query command.
*/
SqlShowCatalogs SqlShowCatalogs() :
{
}
{<SHOW> <CATALOGS>{return new SqlShowCatalogs(getPos());}
}
3)将 Calcite 源码中的 config.fmpp 文件复制到项目的 src/main/codegen 目录下,修改内容,来声明扩展的部分
a)文件位置

在这里插入图片描述

b)config.fmpp 内容
data: {# 解析器文件路径parser: tdd(../data/Parser.tdd)
}# 扩展文件的目录
freemarkerLinks: {includes: includes/
}
c)Parser.tdd 部分内容
# 生成的解析器包路径
package: "org.apache.flink.sql.parser.impl",
# 解析器名称
class: "FlinkSqlParserImpl",
# 引入的依赖类
"org.apache.flink.sql.parser.dql.SqlShowCatalogs"
# 新的关键字
keywords: ["CATALOGS"]
# 新增的语法解析方法
statementParserMethods: ["SqlShowCatalogs()"]
# 包含的扩展语法文件
implementationFiles: ["parserImpls.ftl"]
4)编译模板文件和语法文件

在这里插入图片描述

5)配置扩展的解析器类
withParserFactory(FlinkSqlParserImpl.FACTORY)
2、自定义扩展 Flink 的 Parser 语法
1)定义 SqlNode 类
package org.apache.flink.sql.parser.dql;import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.SqlSpecialOperator;
import org.apache.calcite.sql.SqlWriter;
import org.apache.calcite.sql.parser.SqlParserPos;import java.util.Collections;
import java.util.List;/** XSHOW CATALOGS sql call. */
public class SqlXShowCatalogs extends SqlCall {public static final SqlSpecialOperator OPERATOR =new SqlSpecialOperator("XSHOW CATALOGS", SqlKind.OTHER);public SqlXShowCatalogs(SqlParserPos pos) {super(pos);}@Overridepublic SqlOperator getOperator() {return OPERATOR;}@Overridepublic List<SqlNode> getOperandList() {return Collections.emptyList();}@Overridepublic void unparse(SqlWriter writer, int leftPrec, int rightPrec) {writer.keyword("XSHOW CATALOGS");}
}
2)修改 includes 目录下的 parserImpls.ftl 文件
/**
* Parse a "XShow Catalogs" metadata query command.
*/
SqlXShowCatalogs SqlXShowCatalogs() :
{
}
{<XSHOW> <CATALOGS>{return new SqlXShowCatalogs(getPos());}
}
3)修改 Parser.tdd 文件,新增-声明拓展的部分
imports:"org.apache.flink.sql.parser.dql.SqlXShowCatalogs"keywords:"XSHOW"statementParserMethods:"SqlXShowCatalogs()"
4)重新编译
 mvn generate-resources
5)执行测试用例

可以看到,自定义 SQL 的报错,由解析失败,变为了校验失败。

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;public class CustomFlinkSql {public static void main(String[] args) throws Exception {TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().useBlinkPlanner().build());// 拓展自定义语法 xshow catalogs 前// SQL parse failed. Non-query expression encountered in illegal contexttEnv.executeSql("xshow catalogs").print();// 拓展自定义语法 xshow catalogs 后// SQL validation failed. org.apache.flink.sql.parser.dql.SqlXShowCatalogs cannot be cast to org.apache.calcite.sql.SqlBasicCall}
}
6)查看生成的扩展解析器类

可以看到,在 FlinkSqlParserImpl 中,自定义的解析语法已经生成了。

在这里插入图片描述

3、validate 概述

在向 Flink 中添加完自定义的解析规则后,报错信息如下:

SQL validation failed. org.apache.flink.sql.parser.dql.SqlXShowCatalogs cannot be cast to org.apache.calcite.sql.SqlBasicCall
修改 validate 部分的代码
1)FlinkPlannerImpl#validate

作用:校验 SqlNode ,如果是 show catalogs 语法时直接返回。

在这里插入图片描述

sqlNode.isInstanceOf[SqlXShowCatalogs]
2)SqlToOperationConverter#convert

作用:将校验过的 SqlNode 转换为 Operator。

在这里插入图片描述

else if (validated instanceof SqlXShowCatalogs) {return Optional.of(converter.convertXShowCatalogs((SqlXShowCatalogs) validated));
}
3)SqlToOperationConverter#convertXShowCatalogs
/** Convert SHOW CATALOGS statement. */
private Operation convertXShowCatalogs(SqlXShowCatalogs sqlXShowCatalogs) {return new XShowCatalogsOperation();
}
4)XShowCatalogsOperation
package org.apache.flink.table.operations;public class XShowCatalogsOperation implements ShowOperation {@Overridepublic String asSummaryString() {return "SHOW CATALOGS";}
}
4、执行测试用例
package org.apache.flink.table.examples.java.custom;import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;public class CustomFlinkSql {public static void main(String[] args) throws Exception {TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().useBlinkPlanner().build());// FlinkSQL原本支持的语法tEnv.executeSql("show catalogs").print();// 自定义语法tEnv.executeSql("xshow catalogs").print();}
}

在这里插入图片描述

5、总结-FlinkSQL 的执行流程
1、对 SQL 进行校验final SqlNode validated = flinkPlanner.validate(sqlNode);2、预校验重写 Insert 语句3、调用 SqlNode.validate() 进行校验1)如果是:ExtendedSqlNode【SqlCreateHiveTable、SqlCreateTable、SqlTableLike】2)如果是:SqlKind.DDL、SqlKind.INSERT 等,无需校验,直接返回 SqlNode3)如果是:SqlRichExplain4)其它:validator.validate(sqlNode)1.校验作用域和表达式:validateScopedExpression(topNode, scope)a)将 SqlNode 进行规范化重写b)如果SQL是【TOP_LEVEL = concat(QUERY, DML, DDL)】,则在父作用域中注册查询c)校验 validateQuery i)validateFeatureii)validateNamespaceiii)validateModalityiv)validateAccessv)validateSnapshotd)如果SQL不是【TOP_LEVEL = concat(QUERY, DML, DDL)】进行类型推导2.获取校验之后的节点类型2、将 SQLNode 转换为 Operationconverter.convertSqlQuery(validated)1)生成逻辑执行计划 RelNodeRelRoot relational = planner.rel(validated);1.对查询进行转换sqlToRelConverter.convertQuery(validatedSqlNode)2)创建 PlannerQueryOperationnew PlannerQueryOperation(relational.project());3、将 Operation 转换为 List<Transformation<?>>
List<Transformation<?>> transformations = planner.translate(Collections.singletonList(modifyOperation));1)对 RelNode 逻辑执行计划进行优化,获取 optimizedRelNodesval optimizedRelNodes = optimize(relNodes)2)将 optimizedRelNodes 转换为 execGraphval execGraph = translateToExecNodeGraph(optimizedRelNodes)3)将 execGraph 转换为 transformations1.使用代码生成技术生成Function,后续可以反射调用val convertFunc = CodeGenUtils.genToInternalConverter(ctx, inputType)

相关文章:

  • Day 13 python学习笔记
  • Leetcode.274 H 指数
  • 如何使用python快速修改Excel表单中的大量数据
  • 【OJ for Divide and Conquer】OJ题解
  • Mysql第四篇---数据库索引优化与查询优化
  • 基于PHP的仓库库存管理系统设计与实现(源码+lw+部署文档+讲解等)
  • 主定理(一般式)
  • spring框架回顾
  • 05、Python -- 爬取ts文件格式视频思路
  • 高三高考免费试卷真题押题知识点合集
  • Android拖放startDragAndDrop拖拽onDrawShadow动态添加View,Kotlin(3)
  • 多个相同地址的I2C设备,如何挂载在同一条总线上
  • Ansible脚本进阶---playbook
  • Web入门笔记
  • UE5使用Dash插件实现程序化地形场景制作
  • JS 中的深拷贝与浅拷贝
  • C++类中的特殊成员函数
  • Java 23种设计模式 之单例模式 7种实现方式
  • JavaScript工作原理(五):深入了解WebSockets,HTTP/2和SSE,以及如何选择
  • JavaWeb(学习笔记二)
  • jquery cookie
  • JS变量作用域
  • JS数组方法汇总
  • Netty+SpringBoot+FastDFS+Html5实现聊天App(六)
  • socket.io+express实现聊天室的思考(三)
  • 安卓应用性能调试和优化经验分享
  • 半理解系列--Promise的进化史
  • 持续集成与持续部署宝典Part 2:创建持续集成流水线
  • 短视频宝贝=慢?阿里巴巴工程师这样秒开短视频
  • 高程读书笔记 第六章 面向对象程序设计
  • 码农张的Bug人生 - 初来乍到
  • 人脸识别最新开发经验demo
  • 如何用vue打造一个移动端音乐播放器
  • 带你开发类似Pokemon Go的AR游戏
  • ### Cause: com.mysql.jdbc.exceptions.jdbc4.MySQLTr
  • #pragma 指令
  • (1综述)从零开始的嵌入式图像图像处理(PI+QT+OpenCV)实战演练
  • (C语言)fgets与fputs函数详解
  • (Matlab)使用竞争神经网络实现数据聚类
  • (二)PySpark3:SparkSQL编程
  • (分享)自己整理的一些简单awk实用语句
  • (附源码)spring boot智能服药提醒app 毕业设计 102151
  • (剑指Offer)面试题34:丑数
  • (三)centos7案例实战—vmware虚拟机硬盘挂载与卸载
  • (四)【Jmeter】 JMeter的界面布局与组件概述
  • (转)http-server应用
  • (转载)深入super,看Python如何解决钻石继承难题
  • .htaccess配置重写url引擎
  • .net 设置默认首页
  • .NET/C# 获取一个正在运行的进程的命令行参数
  • .NET4.0并行计算技术基础(1)
  • .Net通用分页类(存储过程分页版,可以选择页码的显示样式,且有中英选择)
  • .pings勒索病毒的威胁:如何应对.pings勒索病毒的突袭?
  • .project文件
  • [ linux ] linux 命令英文全称及解释