当前位置: 首页 > news >正文

Java版Flink使用指南——自定义无界流生成器

大纲

  • 新建工程
    • 自定义无界流
  • 使用
  • 打包、提交、运行
  • 工程代码

在《Java版Flink使用指南——从RabbitMQ中队列中接入消息流》一文中,我们让外部组件RabbitMQ充当了无界流的数据源,使得Flink进行了流式处理。在《Java版Flink使用指南——将消息写入到RabbitMQ的队列中》一文中,我们使用了Flink自带的数据生成器,生成了有限数据,从而让Flink以批处理形式运行了该任务。
本文我们将自定义一个无界流生成器,以方便后续测试。

新建工程

我们新建一个名字叫UnboundedStreamGenerator的工程。
Archetype:org.apache.flink:flink-quickstart-java
版本:1.19.1
在这里插入图片描述

自定义无界流

新建src/main/java/org/example/generator/UnBoundedStreamGenerator.java
然后UnBoundedStreamGenerator实现RichSourceFunction接口

public abstract class RichSourceFunction<OUT> extends AbstractRichFunctionimplements SourceFunction<OUT> {private static final long serialVersionUID = 1L;
}

主要实现SourceFunction接口的run和cancel方法。run方法用来获取获取,cancel方法用于终止任务。

package org.example.generator;import org.apache.flink.streaming.api.functions.source.RichSourceFunction;public class UnBoundedStreamGenerator extends RichSourceFunction<Long> {private volatile boolean isRunning = true;@Overridepublic void run(SourceContext<Long> ctx) throws Exception {long count = 0L;while (isRunning) {Thread.sleep(1000); // Simulate delayctx.collect(count++); // Emit data}}@Overridepublic void cancel() {isRunning = false;System.out.println("UnBoundedStreamGenerator canceled");}
}

在run方法中,我们每隔一秒产生一条数据,且这个数字自增。

使用

我们使用addSource方法,将该无界流生成器添加成数据源。然后将其输出到日志。

/** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements.  See the NOTICE file* distributed with this work for additional information* regarding copyright ownership.  The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License.  You may obtain a copy of the License at**     http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package org.example;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.example.generator.UnBoundedStreamGenerator;/*** Skeleton for a Flink DataStream Job.** <p>For a tutorial how to write a Flink application, check the* tutorials and examples on the <a href="https://flink.apache.org">Flink Website</a>.** <p>To package your application into a JAR file for execution, run* 'mvn clean package' on the command line.** <p>If you change the name of the main class (with the public static void main(String[] args))* method, change the respective entry in the POM.xml file (simply search for 'mainClass').*/
public class DataStreamJob {public static void main(String[] args) throws Exception {// Sets up the execution environment, which is the main entry point// to building Flink applications.final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.addSource(new UnBoundedStreamGenerator()).name("Custom Stream Source").setParallelism(1) .print(); // For demonstration, print the stream to stdout// Execute program, beginning computation.env.execute("Flink Java API Skeleton");}
}

打包、提交、运行

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
使用下面命令查看日志输出

tail -f log/*

在这里插入图片描述
然后我们在后台点击Cancel Job
在这里插入图片描述
可以看到输出
在这里插入图片描述

工程代码

https://github.com/f304646673/FlinkDemo

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 【爬虫】解析爬取的数据
  • [1]从概念到实践:电商智能助手在AI Agent技术驱动下的落地实战案例深度剖析(AI Agent技术打造个性化、智能化的用户助手)
  • 基于React 实现井字棋
  • vue vite+three在线编辑模型导入导出
  • Emacs有什么优点,用Emacs写程序真的比IDE更方便吗?
  • S5730 OSPF: 配置 OSPF 进程和区域
  • 硬盘模式vmd怎么改ahci_电脑vmd改ahci模式详细步骤
  • Visual Studio编译优化选项
  • PPTP、L2TP、IPSec、IPS 有什么区别?
  • 星网安全产品线成立 引领卫星互联网解决方案创新
  • 美团到家平台业务探索
  • [终端安全]-8 隐私保护和隐私计算技术
  • Apache Seata Mac下的Seata Demo环境搭建
  • 华为如何做成数字化转型?
  • 设计模式使用场景实现示例及优缺点(结构型模式——享元模式)
  • Apache Spark Streaming 使用实例
  • iOS仿今日头条、壁纸应用、筛选分类、三方微博、颜色填充等源码
  • JAVA SE 6 GC调优笔记
  • Javascript编码规范
  • JavaScript设计模式之工厂模式
  • RxJS 实现摩斯密码(Morse) 【内附脑图】
  • Terraform入门 - 3. 变更基础设施
  • 对话 CTO〡听神策数据 CTO 曹犟描绘数据分析行业的无限可能
  • 如何设计一个微型分布式架构?
  • 如何用Ubuntu和Xen来设置Kubernetes?
  • 入口文件开始,分析Vue源码实现
  • 使用 @font-face
  • 数据库写操作弃用“SELECT ... FOR UPDATE”解决方案
  • 责任链模式的两种实现
  • 2017年360最后一道编程题
  • Python 之网络式编程
  • 阿里云ACE认证学习知识点梳理
  • 通过调用文摘列表API获取文摘
  • ​ssh-keyscan命令--Linux命令应用大词典729个命令解读
  • ​力扣解法汇总1802. 有界数组中指定下标处的最大值
  • #微信小程序:微信小程序常见的配置传旨
  • (11)MSP430F5529 定时器B
  • (2)Java 简介
  • (4) PIVOT 和 UPIVOT 的使用
  • (day 12)JavaScript学习笔记(数组3)
  • (floyd+补集) poj 3275
  • (html5)在移动端input输入搜索项后 输入法下面为什么不想百度那样出现前往? 而我的出现的是换行...
  • (笔试题)合法字符串
  • (纯JS)图片裁剪
  • (个人笔记质量不佳)SQL 左连接、右连接、内连接的区别
  • (四)【Jmeter】 JMeter的界面布局与组件概述
  • (一)【Jmeter】JDK及Jmeter的安装部署及简单配置
  • (转)EOS中账户、钱包和密钥的关系
  • .MSSQLSERVER 导入导出 命令集--堪称经典,值得借鉴!
  • .net 4.0 A potentially dangerous Request.Form value was detected from the client 的解决方案
  • .Net Core与存储过程(一)
  • .NET 自定义中间件 判断是否存在 AllowAnonymousAttribute 特性 来判断是否需要身份验证
  • .NET/C# 使用 #if 和 Conditional 特性来按条件编译代码的不同原理和适用场景
  • .NET程序员迈向卓越的必由之路
  • .NET和.COM和.CN域名区别