当前位置: 首页 > news >正文

Flink 流转表,表转流,watermark设置

流转表

首先创建一个流


@Data
@AllArgsConstructor
@NoArgsConstructor
public static class Nan {private String xing;private String name;private Long ts;
}StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);DataStreamSource<String> sourceNan = env.socketTextStream("hdp01", 1111);
DataStreamSource<String> sourceNv = env.socketTextStream("hdp01", 2222);System.setProperty("java.net.preferIPv4Stack", "true");SingleOutputStreamOperator<Nan> beanNan = sourceNan.map(new MapFunction<String, Nan>() {@Overridepublic Nan map(String s) throws Exception {try {String[] split = s.split(",");return new Nan(split[0].substring(0, 1), split[1], Long.parseLong(split[2]));} catch (Exception e) {return null;}}
}).filter(Objects::nonNull).assignTimestampsAndWatermarks(WatermarkStrategy.<Nan>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<Nan>() {@Overridepublic long extractTimestamp(Nan nan, long l) {return nan.getTs();}
})).returns(TypeInformation.of(Nan.class));

创建watermark

流转表的时候有一个点要注意,watermark必须要重新指定,否则会丢失,常用的方式如下
创建watermark,有两步,
第一步:必须要依据一个字段来创建watermark,这个字段必须是timestamp_ltz(3)的类型。
第二步:根据时间戳字段生成watermark
时间戳字段有两种获取方式
1、根据一个bigint字段进行转换
2、在流转表,且流上设置了watermark的情况下,根据内置属性rowtime创建,这个rowtime是流转表时暴露出来的事件时间
watermark也有两种获取方式
1、根据时间戳字段重新创建watermark
2、在流转表,且流上设置了watermark的情况下,沿用流上的watermark

下面是两种场景,只要记住第一种就行了,其实第二种没什么用。

1、 根据一个bigint字段进行创建时间戳字段,然后重新创建watermark

tenv.createTemporaryView("nan", beanNan, Schema.newBuilder().column("xing", DataTypes.STRING()).column("name", DataTypes.STRING()).column("ts", DataTypes.BIGINT()).columnByExpression("rt", "to_timestamp_ltz(ts,3)") // 根据一个bigint字段进行转换.watermark("rt", "rt - interval '1' second ") // 重新创建watermark.build());

2、根据内置属性rowtime创建时间戳字段,然后沿用流上的watermark

tenv.createTemporaryView("nan1", beanNan, Schema.newBuilder().column("xing", DataTypes.STRING()).column("name", DataTypes.STRING()).column("ts", DataTypes.BIGINT()).columnByMetadata("rt", DataTypes.TIMESTAMP_LTZ(3),"rowtime") // 根据内置属性rowtime创建.watermark("rt", "source_watermark()") // 沿用流的watermark “source_watermark 等于 rt - interval '1' second”.build());
TableResult tableResult = tenv.executeSql("select *,current_watermark(rt) from nan");
tableResult.print();

表转流

首先创建一个表

 String source = "CREATE TABLE person (  " +"  xing STRING,  " +"  name STRING,  " +"  ts BIGINT,  " +"  rt as to_timestamp_ltz(ts,3),  " +"  watermark for rt as rt - interval '1' second  " +") WITH (  " +" 'connector' = 'kafka',  " +" 'topic' = 'flink_topic',  " +" 'properties.bootstrap.servers' = '172.16.10.139:9092',  " +" 'properties.group.id' = 'testGroup',  " +" 'scan.startup.mode' = 'latest-offset', " +" 'format' = 'json'  " +")";tenv.executeSql(source);

创建watermark

表转流,可以沿用流上的watermark,不需要额外声明

DataStream<Row> dataStream = tenv.toDataStream(table);dataStream.process(new ProcessFunction<Row, Object>() {@Overridepublic void processElement(Row value, ProcessFunction<Row, Object>.Context ctx, Collector<Object> out) throws Exception {System.out.println(value+" watermark=>"+ctx.timerService().currentWatermark());}
});
env.execute();

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • pytest参数化多种用法总结
  • Python在QtSide6(PyQt)上加载网页使用OpenCV进行图像处理
  • PyQtGraph库的基本使用
  • 集合及数据结构第九节————树和二叉树
  • SSL/TLS协议信息泄露漏洞修复
  • C++初学(14)
  • 回顾前面刷过的算法(8)
  • Java-希尔排序算法介绍、应用场景和示例代码
  • spingboot实现常规增删改查
  • erlang学习:gen_server书上案例22.6练习题4
  • jmeter通过参数文件、循环组件实现多账号登陆
  • npm install 报错解决记录
  • Golang 使用redis stream实现一个实时推送功能
  • Groupings sets详解
  • 东方银行--用 MinIO 和 Dremio 替代 Hadoop
  • co.js - 让异步代码同步化
  • CSS居中完全指南——构建CSS居中决策树
  • express + mock 让前后台并行开发
  • HTML中设置input等文本框为不可操作
  • HTTP--网络协议分层,http历史(二)
  • JAVA并发编程--1.基础概念
  • Python - 闭包Closure
  • Python利用正则抓取网页内容保存到本地
  • uni-app项目数字滚动
  • Vue.js 移动端适配之 vw 解决方案
  • 构建工具 - 收藏集 - 掘金
  • 后端_ThinkPHP5
  • 利用阿里云 OSS 搭建私有 Docker 仓库
  • 如何打造100亿SDK累计覆盖量的大数据系统
  • 试着探索高并发下的系统架构面貌
  • 阿里云重庆大学大数据训练营落地分享
  • 数据库巡检项
  • 完善智慧办公建设,小熊U租获京东数千万元A+轮融资 ...
  • ​虚拟化系列介绍(十)
  • (1/2)敏捷实践指南 Agile Practice Guide ([美] Project Management institute 著)
  • (C语言)求出1,2,5三个数不同个数组合为100的组合个数
  • (Redis使用系列) SpringBoot中Redis的RedisConfig 二
  • (分布式缓存)Redis持久化
  • (十二)python网络爬虫(理论+实战)——实战:使用BeautfulSoup解析baidu热搜新闻数据
  • (十一)图像的罗伯特梯度锐化
  • (限时免费)震惊!流落人间的haproxy宝典被找到了!一切玄妙尽在此处!
  • (译)计算距离、方位和更多经纬度之间的点
  • (转)Linux下编译安装log4cxx
  • .net core webapi 部署iis_一键部署VS插件:让.NET开发者更幸福
  • .NET Core WebAPI中使用Log4net 日志级别分类并记录到数据库
  • .NET Framework 3.5安装教程
  • .net MVC中使用angularJs刷新页面数据列表
  • .NET Reactor简单使用教程
  • .Net Remoting(分离服务程序实现) - Part.3
  • .NET面试题(二)
  • .NET应用UI框架DevExpress XAF v24.1 - 可用性进一步增强
  • .NET与 java通用的3DES加密解密方法
  • @Transactional 参数详解
  • [ 隧道技术 ] 反弹shell的集中常见方式(二)bash反弹shell
  • [.NET 即时通信SignalR] 认识SignalR (一)