当前位置: 首页 > news >正文

Flink 对接 Hudi 查询数据,java代码编写

1.pom.xml文件需要引入下面包

    <properties><flink.version>1.15.4</flink.version><hudi.version>0.13.1</hudi.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.12</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-files</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><!-- hudi --><dependency><groupId>org.apache.hudi</groupId><artifactId>hudi-flink1.15-bundle</artifactId><version>${hudi.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.hudi</groupId><artifactId>hudi-flink-client</artifactId><version>0.14.1</version></dependency></dependencies>

2.java代码如下

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.junit.Test;import java.util.List;public class HudiTest {@Testpublic void test01() throws Exception {StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment  tableEnv = StreamTableEnvironment.create(streamEnv);tableEnv.executeSql("CREATE TABLE IF NOT EXISTS table_name (\n" +"    resume_id bigint,\n" +"    update_by STRING,\n" +"    gmt_modified Timestamp ,\n" +"    del_flag int, \n" +"    invite_operation_date STRING,\n" +"    induct_date date ,\n" +"    leave_date date ,\n" +"    PRIMARY KEY (resume_id) NOT ENFORCED\n" +") with (\n" +" 'connector' = 'hudi',\n" +" 'path' = 'hdfs://177.17.17.200:8088/user/hudi/resume_demo/hr_resume',\n" +" 'table.type' = 'MERGE_ON_READ'\n" +")");Table table = tableEnv.sqlQuery("select * from table_name");// 启动 Flink 作业DataStream<Row> dataStream = tableEnv.toDataStream(table);streamEnv.execute();List<Row> rows = dataStream.executeAndCollect(100);//收集100条数据for (Row row : rows) {StringBuilder rowString = new StringBuilder();for (int i = 0; i < row.getArity(); i++) {rowString.append(row.getField(i)).append("|");}System.out.println(rowString.toString());}}
}

3.说明

经过测试,这里的sql中,支持下面的一些sql
简单where条件
limit 10 offset 0
不支持
order by

相关文章:

  • Windows驱动开发系列文章一
  • Deepin Linux 深度 V23 beige 官方源及换镜像源方法。
  • 【MySQL精通之路】优化
  • 一千题,No.0037(组个最小数)
  • 2021职称继续教育--中国共产党的光辉历程及其经验
  • 服务器的远程桌面无法连接,服务器远程桌面无法连接问题处理教程
  • nginx配置文件
  • 分布式事务-TCC
  • 锐捷网络与您相约第七届数字中国建设峰会 共话数字未来
  • RestTemplet 自定义消息转换器总结
  • 香港Web3媒体:Techub News
  • 动手学深度学习(Pytorch版)代码实践-深度学习基础-01基础函数的使用
  • 价值飙升30%,AI PC拉动半导体出货潮
  • 今日好料推荐(大数据湖体系规划)
  • Codeforces Round 947 (Div. 1 + Div. 2) D. Paint the Tree 题解 DFS
  • -------------------- 第二讲-------- 第一节------在此给出链表的基本操作
  • [js高手之路]搞清楚面向对象,必须要理解对象在创建过程中的内存表示
  • 【跃迁之路】【444天】程序员高效学习方法论探索系列(实验阶段201-2018.04.25)...
  • HashMap ConcurrentHashMap
  • JavaScript设计模式与开发实践系列之策略模式
  • java架构面试锦集:开源框架+并发+数据结构+大企必备面试题
  • Js基础知识(四) - js运行原理与机制
  • Linux链接文件
  • nodejs调试方法
  • Python 使用 Tornado 框架实现 WebHook 自动部署 Git 项目
  • python 学习笔记 - Queue Pipes,进程间通讯
  • SpiderData 2019年2月16日 DApp数据排行榜
  • unity如何实现一个固定宽度的orthagraphic相机
  • 代理模式
  • 基于webpack 的 vue 多页架构
  • 学习笔记DL002:AI、机器学习、表示学习、深度学习,第一次大衰退
  • CMake 入门1/5:基于阿里云 ECS搭建体验环境
  • FaaS 的简单实践
  • 阿里云ACE认证之理解CDN技术
  • 东超科技获得千万级Pre-A轮融资,投资方为中科创星 ...
  • 积累各种好的链接
  • ​总结MySQL 的一些知识点:MySQL 选择数据库​
  • (2)空速传感器
  • (附源码)计算机毕业设计ssm高校《大学语文》课程作业在线管理系统
  • (附源码)计算机毕业设计SSM智能化管理的仓库管理
  • (黑马点评)二、短信登录功能实现
  • (回溯) LeetCode 46. 全排列
  • (力扣题库)跳跃游戏II(c++)
  • (十八)三元表达式和列表解析
  • (原)Matlab的svmtrain和svmclassify
  • (终章)[图像识别]13.OpenCV案例 自定义训练集分类器物体检测
  • (转)Java socket中关闭IO流后,发生什么事?(以关闭输出流为例) .
  • .bat批处理(三):变量声明、设置、拼接、截取
  • .describe() python_Python-Win32com-Excel
  • .NET 反射的使用
  • .NET 回调、接口回调、 委托
  • .NET:自动将请求参数绑定到ASPX、ASHX和MVC(菜鸟必看)
  • .NET程序集编辑器/调试器 dnSpy 使用介绍
  • @Autowired自动装配
  • @ModelAttribute使用详解