当前位置: 首页 > news >正文

Java版Flink使用指南——从RabbitMQ中队列中接入消息流

大纲

  • 创建RabbitMQ队列
  • 新建工程
    • 新增依赖
    • 编码
      • 设置数据源配置
      • 读取、处理数据
      • 完整代码
    • 打包、上传和运行任务
    • 测试
  • 工程代码

在《Java版Flink使用指南——安装Flink和使用IntelliJ制作任务包》一文中,我们完成了第一个小型Demo的编写。例子中的数据是代码预先指定的。而现实中,数据往往来源于外部。本文我们将尝试Flink从RabbitMQ中读取数据,然后输出到日志中。
关于RabbitMQ的知识可以参阅《RabbitMQ实践》。

创建RabbitMQ队列

我们创建一个Classic队列data.from.rbtmq。注意要选择Durable类型,这是后续用的默认连接器的限制。
具体方法见《RabbitMQ实践——在管理后台测试消息收发功能》。
在这里插入图片描述

后续我们将在后台通过默认交换器,给这个队列新增消息。

新建工程

我们在IntelliJ中新建一个工程DataFromRabbitMQ。
Archetype填入:org.apache.flink:flink-quickstart-java。
版本填入与Flink的版本:1.19.1
在这里插入图片描述

新增依赖

在pom.xml中新增RabbitMQ连接器

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-rabbitmq</artifactId><version>3.0.1-1.17</version>
</dependency>

编码

设置数据源配置

String queueName = "data.from.rbtmq";
String host = "172.21.112.140"; // IP of the rabbitmq server
int port = 5672;
String username = "admin";
String password = "fangliang";
String virtualHost = "/";
int parallelism = 1;// create a RabbitMQ source
RMQConnectionConfig rmqConnectionConfig = new RMQConnectionConfig.Builder().setHost(host).setPort(port).setUserName(username).setPassword(password).setVirtualHost(virtualHost).build();RMQSource<String> rmqSource = new RMQSource<>(rmqConnectionConfig, queueName, true, new SimpleStringSchema());

读取、处理数据

下面代码通过addSource添加RabbitMQ数据源。注意,不能使用fromSource方法,是因为RMQSource没有实现SourceFunction方法。

final DataStream<String> stream = env.addSource(rmqSource).name(username + "'s source from " + queueName).setParallelism(parallelism);stream.print().name(username + "'s data from " + queueName);

完整代码

/** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements.  See the NOTICE file* distributed with this work for additional information* regarding copyright ownership.  The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License.  You may obtain a copy of the License at**     http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package org.example;import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;/*** Skeleton for a Flink DataStream Job.** <p>For a tutorial how to write a Flink application, check the* tutorials and examples on the <a href="https://flink.apache.org">Flink Website</a>.** <p>To package your application into a JAR file for execution, run* 'mvn clean package' on the command line.** <p>If you change the name of the main class (with the public static void main(String[] args))* method, change the respective entry in the POM.xml file (simply search for 'mainClass').*/
public class DataStreamJob {public static void main(String[] args) throws Exception {// Sets up the execution environment, which is the main entry point// to building Flink applications.final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();String queueName = "data.from.rbtmq";String host = "172.21.112.140"; // IP of the rabbitmq serverint port = 5672;String username = "admin";String password = "fangliang";String virtualHost = "/";int parallelism = 1;// create a RabbitMQ sourceRMQConnectionConfig rmqConnectionConfig = new RMQConnectionConfig.Builder().setHost(host).setPort(port).setUserName(username).setPassword(password).setVirtualHost(virtualHost).build();RMQSource<String> rmqSource = new RMQSource<>(rmqConnectionConfig, queueName, true, new SimpleStringSchema());final DataStream<String> stream = env.addSource(rmqSource).name(username + "'s source from " + queueName).setParallelism(parallelism);stream.print().name(username + "'s data from " + queueName);env.execute("Flink Java API Skeleton");}
}

打包、上传和运行任务

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

测试

在RabbitMQ后台的默认交换器中,发布一条消息到data.from.rbtmq
在这里插入图片描述
然后使用下面指令可以看到Flink读取到消息并执行了print方法

tail log/flink-*-taskexecutor-*.out

==> flink-fangliang-taskexecutor-0-fangliang.out <==
data from http://172.21.112.140:15672/#/exchanges/%2F/amq.default

工程代码

https://github.com/f304646673/FlinkDemo

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 卷积神经网络——LeNet——FashionMNIST
  • tensorflow之欠拟合与过拟合,正则化缓解
  • Google Hacking
  • server nat表和会话表的作用及NAT地址转换详细
  • Linux 一键部署Mysql 8.4.1 LTS
  • 深度学习Day-24:ResNeXt-50算法思考
  • Kafka学习
  • PS怎么给图片打马赛克
  • 从 ArcMap 迁移到 ArcGIS Pro
  • linux创建定时任务
  • react启用mobx @decorators装饰器语法
  • nigix的下载使用
  • LeetCode K次取反后最大化的数组和(贪心算法)
  • 【深度学习基础】MAC pycharm 专业版安装与激活
  • 【银河麒麟服务器操作系统】系统夯死分析及处理建议
  • Angular6错误 Service: No provider for Renderer2
  • CAP理论的例子讲解
  • CentOS6 编译安装 redis-3.2.3
  • java2019面试题北京
  • Java读取Properties文件的六种方法
  • Ruby 2.x 源代码分析:扩展 概述
  • vue从入门到进阶:计算属性computed与侦听器watch(三)
  • 闭包--闭包之tab栏切换(四)
  • 测试开发系类之接口自动化测试
  • 对超线程几个不同角度的解释
  • 番外篇1:在Windows环境下安装JDK
  • 优秀架构师必须掌握的架构思维
  • 原创:新手布局福音!微信小程序使用flex的一些基础样式属性(一)
  • 扩展资源服务器解决oauth2 性能瓶颈
  • # Java NIO(一)FileChannel
  • # windows 运行框输入mrt提示错误:Windows 找不到文件‘mrt‘。请确定文件名是否正确后,再试一次
  • $().each和$.each的区别
  • $(this) 和 this 关键字在 jQuery 中有何不同?
  • (env: Windows,mp,1.06.2308310; lib: 3.2.4) uniapp微信小程序
  • (八)光盘的挂载与解挂、挂载CentOS镜像、rpm安装软件详细学习笔记
  • (不用互三)AI绘画工具应该如何选择
  • (草履虫都可以看懂的)PyQt子窗口向主窗口传递参数,主窗口接收子窗口信号、参数。
  • (纯JS)图片裁剪
  • (分布式缓存)Redis分片集群
  • (九)信息融合方式简介
  • (六)Hibernate的二级缓存
  • (三十)Flask之wtforms库【剖析源码上篇】
  • (算法)求1到1亿间的质数或素数
  • (学习日记)2024.04.10:UCOSIII第三十八节:事件实验
  • (一)硬件制作--从零开始自制linux掌上电脑(F1C200S) <嵌入式项目>
  • (原创)可支持最大高度的NestedScrollView
  • .NET Core SkiaSharp 替代 System.Drawing.Common 的一些用法
  • .NET I/O 学习笔记:对文件和目录进行解压缩操作
  • .Net Web项目创建比较不错的参考文章
  • .NET 动态调用WebService + WSE + UsernameToken
  • .net6 core Worker Service项目,使用Exchange Web Services (EWS) 分页获取电子邮件收件箱列表,邮件信息字段
  • .NET程序集编辑器/调试器 dnSpy 使用介绍
  • [Angular 基础] - 自定义指令,深入学习 directive
  • [AR]Vumark(下一代条形码)
  • [bzoj1038][ZJOI2008]瞭望塔