当前位置: 首页 > news >正文

大数据(9e)Flink侧输出流

文章目录

  • 概述
  • 环境
  • OutputTag介绍
    • 实现分流
    • 处理迟到数据
  • 处理关窗之后到达的数据

概述

窗口允许迟到的数据,但仍有数据在关窗后到达
Flink提供了侧输出流(sideOutput)来处理关窗之后到达的数据

环境

WIN10+IDEA+JDK1.8+FLINK1.14

<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <flink.version>1.14.6</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>

OutputTag介绍

OutputTag是一种命名标记,用于标记算子中的侧输出

实现分流

ctx.output:向由OutputTag标识的侧输出发出记录

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class Hi {
    public static void main(String[] args) throws Exception {
        //创建执行环境,设置并行度
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        //定义输出标签
        OutputTag<Integer> o1 = new OutputTag<Integer>("除以3余1") {};
        OutputTag<Integer> o2 = new OutputTag<Integer>("除以3余2") {};
        //创建流
        SingleOutputStreamOperator<Integer> d = env.fromElements(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
        //处理
        SingleOutputStreamOperator<Integer> s = d.process(new ProcessFunction<Integer, Integer>() {
            @Override
            public void processElement(Integer value, Context ctx, Collector<Integer> out) {
                //分流
                if (value % 3 == 2) {
                    ctx.output(o2, value); //ctx.output:向由OutputTag标识的侧输出发出记录
                } else if (value % 3 == 1) {
                    ctx.output(o1, value); //ctx.output:向由OutputTag标识的侧输出发出记录
                } else {
                    out.collect(value);
                }
            }
        });
        //输出
        s.print("被3整除");
        s.getSideOutput(o1).print(o1.getId());
        s.getSideOutput(o2).print(o2.getId());
        //环境执行
        env.execute();
    }
}
测试结果
被3整除> 0
除以3余1> 1
除以3余2> 2
被3整除> 3
除以3余1> 4
除以3余2> 5
被3整除> 6
除以3余1> 7
除以3余2> 8
被3整除> 9

处理迟到数据

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class Hi {
    public static void main(String[] args) throws Exception {
        //创建执行环境,设置并行度
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        //定义测输出流的输出标签
        OutputTag<String> outputTag = new OutputTag<String>("迟到标签") {};
        //创建流,添加自定义数据源
        SingleOutputStreamOperator<String> d = env.addSource(new SourceFunction<String>() {
            @Override
            public void run(SourceContext<String> ctx) {
                //发送水位线
                ctx.emitWatermark(new Watermark(1999L));
                //发送2条数据,其中1条迟到
                ctx.collectWithTimestamp("1998", 1998L);
                ctx.collectWithTimestamp("2000", 2000L);
            }
            @Override
            public void cancel() {}
        });
        //处理
        SingleOutputStreamOperator<String> s = d.process(new ProcessFunction<String, String>() {
            @Override
            public void processElement(String value, Context ctx, Collector<String> out) {
                //获取水位线
                long watermark = ctx.timerService().currentWatermark();
                //判断是否迟到
                if (ctx.timestamp() > watermark) {
                    //冇迟到
                    out.collect(value);
                } else {
                    //迟到:向outputTag发送数据
                    ctx.output(outputTag, value);
                }
            }
        });
        //输出
        s.print("主流输出");
        s.getSideOutput(outputTag).print("侧输出");
        //环境执行
        env.execute();
    }
}
发送1999水位线,然后发送两条数据,测试结果如下
侧输出> 1998
主流输出> 2000

处理关窗之后到达的数据

开窗后.sideOutputLateData(outputTag)

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

public class Hi {
    public static void main(String[] args) throws Exception {
        //创建执行环境,设置并行度
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        //定义测输出流的输出标签
        OutputTag<String> outputTag = new OutputTag<String>("迟到标签") {};
        //创建流,添加自定义数据源
        SingleOutputStreamOperator<String> d = env.addSource(new SourceFunction<String>() {
            @Override
            public void run(SourceContext<String> ctx) {
                ctx.collectWithTimestamp("a", 4000L);
                ctx.collectWithTimestamp("b", 5000L);
                ctx.emitWatermark(new Watermark(5999L)); //发送水位线,触发【3000~5999】的窗口关闭
                ctx.collectWithTimestamp("c", 5000L);
                ctx.collectWithTimestamp("d", 5000L);
                ctx.collectWithTimestamp("e", 6000L);
                ctx.collectWithTimestamp("f", 7000L);
            }
            @Override
            public void cancel() {}
        });
        //处理
        SingleOutputStreamOperator<String> s = d
                //事件时间滚动窗口
                .windowAll(TumblingEventTimeWindows.of(Time.seconds(3L)))
                //侧输出
                .sideOutputLateData(outputTag)
                //拼接字符串
                .reduce((a, b) -> a + "," + b);
        //输出
        s.print("主流输出");
        s.getSideOutput(outputTag).print("侧输出");
        //环境执行
        env.execute();
    }
}
中途发送水位线,触发关窗,测试结果如下
主流输出> a,b
侧输出> c
侧输出> d
主流输出> e,f

相关文章:

  • GAN生成漫画脸
  • 【Linux】权限管理
  • 图数据结构之字典实现(Python版)
  • RK3588移植-opencv交叉编译aarch64
  • [世界杯]根据赔率计算各种组合可能性与赔率
  • 【linux】进程控制详述
  • 【数据结构】二叉树详解(下篇)
  • 世界杯小组赛频繁爆冷?这或许是强队的谋略 一分钟带你了解2022卡塔尔世界杯赛制
  • Linux 普通用户执行 docker 命令
  • 纯代码方式杀死指定进程名的进程(LinuxWindows)
  • Chapter9.4:线性系统的状态空间分析与综合(下)
  • 【Pandas数据处理100例】(九十九):Pandas使用at_time()筛选出特定时间点的数据行
  • 线程池ThreadPoolExecutor
  • 图神经网络
  • Allegro调丝印规范操作指导
  • 【399天】跃迁之路——程序员高效学习方法论探索系列(实验阶段156-2018.03.11)...
  • Apache Pulsar 2.1 重磅发布
  • C++类中的特殊成员函数
  • JAVA_NIO系列——Channel和Buffer详解
  • Map集合、散列表、红黑树介绍
  • Netty+SpringBoot+FastDFS+Html5实现聊天App(六)
  • Node项目之评分系统(二)- 数据库设计
  • Otto开发初探——微服务依赖管理新利器
  • overflow: hidden IE7无效
  • tab.js分享及浏览器兼容性问题汇总
  • windows下mongoDB的环境配置
  • 基于axios的vue插件,让http请求更简单
  • 基于Javascript, Springboot的管理系统报表查询页面代码设计
  • 前端
  • 前端技术周刊 2018-12-10:前端自动化测试
  • 如何利用MongoDB打造TOP榜小程序
  • 设计模式走一遍---观察者模式
  • 长三角G60科创走廊智能驾驶产业联盟揭牌成立,近80家企业助力智能驾驶行业发展 ...
  • 整理一些计算机基础知识!
  • ​linux启动进程的方式
  • #LLM入门|Prompt#1.7_文本拓展_Expanding
  • #Lua:Lua调用C++生成的DLL库
  • $(selector).each()和$.each()的区别
  • (附源码)node.js知识分享网站 毕业设计 202038
  • .[hudsonL@cock.li].mkp勒索病毒数据怎么处理|数据解密恢复
  • .Mobi域名介绍
  • .MyFile@waifu.club.wis.mkp勒索病毒数据怎么处理|数据解密恢复
  • .net 前台table如何加一列下拉框_如何用Word编辑参考文献
  • .NET 药厂业务系统 CPU爆高分析
  • .NET8.0 AOT 经验分享 FreeSql/FreeRedis/FreeScheduler 均已通过测试
  • @cacheable 是否缓存成功_Spring Cache缓存注解
  • @Transactional 详解
  • @四年级家长,这条香港优才计划+华侨生联考捷径,一定要看!
  • [ vulhub漏洞复现篇 ] ThinkPHP 5.0.23-Rce
  • [2018][note]用于超快偏振开关和动态光束分裂的all-optical有源THz超表——
  • [2669]2-2 Time类的定义
  • [APUE]进程关系(下)
  • [BZOJ1010] [HNOI2008] 玩具装箱toy (斜率优化)
  • [BZOJ5125]小Q的书架(决策单调性+分治DP+树状数组)
  • [CDOJ 1343] 卿学姐失恋了