当前位置: 首页 > news >正文

Java版Flink使用指南——将消息写入到RabbitMQ的队列中

大纲

  • 新建工程
    • 新增依赖
  • 编码
    • 自动产生数据
    • 写入RabbitMQ
  • 测试
  • 工程代码

在 《Java版Flink使用指南——从RabbitMQ中队列中接入消息流》一文中,我们介绍了如何使用Java在Flink中读取RabbitMQ中的数据,并将其写入日志中。本文将通过代码产生一些数据,然后将它们写入到另外一个RabbitMQ队列中。

新建工程

我们在IntelliJ中新建一个工程SinkToRabbitMQ。
Archetype填入:org.apache.flink:flink-quickstart-java
版本填入与Flink的版本:1.19.1
在这里插入图片描述

新增依赖

在pom.xml中新增RabbitMQ连接器

		<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-rabbitmq</artifactId><version>3.0.1-1.17</version></dependency>

编码

自动产生数据

这段代码将产生两个字符串数据,后续这些数据会被写入到RabbitMQ的队列中。

List<String> data = new ArrayList<>();
data.add("Hello, World!");
data.add("Hello, Flink!");
DataStream<String> stream = env.fromCollection(data);

写入RabbitMQ

不同于《Java版Flink使用指南——从RabbitMQ中队列中接入消息流》中创建RMQSource用来接收RabbitMQ队列中数据,这次我们创建RMQSink用来发布数据。

String sinkQueueName = "data.to.rbtmq"; // name of the queue to send data to
String host = "172.21.112.140"; // IP of the rabbitmq server
int port = 5672;
String username = "admin";
String password = "fangliang";
String virtualHost = "/";
int parallelism = 1;RMQConnectionConfig rmqConnectionConfig = new RMQConnectionConfig.Builder().setHost(host).setPort(port).setUserName(username).setPassword(password).setVirtualHost(virtualHost).build();RMQSink<String> stringRMQSink = new RMQSink<>(rmqConnectionConfig, sinkQueueName, new SimpleStringSchema());
stream.addSink(stringRMQSink).name(username + "'s sink to " + sinkQueueName).setParallelism(parallelism);	

测试

打包、提交并运行任务
在这里插入图片描述
然后在RabbitMQ的后台可以看到收到两条消息
在这里插入图片描述
其内容也是我们之前在代码中生成的内容
在这里插入图片描述

工程代码

https://github.com/f304646673/FlinkDemo

相关文章:

  • Spring——自动装配Bean
  • 编程语言一般学几种语言:探索编程语言的广度与深度
  • 商家为何疯狂送钱?用户如何省钱?一篇文章带你了解!
  • Qt图形编辑类使用总结
  • # 数仓建模:如何构建主题宽表模型?
  • 2.4G芯片开发的遥控玩具方案介绍 东莞酷得
  • Ubuntu中sqlite的使用方法
  • vue3+vite搭建第一个cesium项目详细步骤及环境配置(附源码)
  • Vue3 + Vite项目使用SVG图片
  • OLED柔性显示屏的金线封装胶
  • 【开源合规】开源许可证风险场景详细解读
  • 响应式设计的双璧:WebKit 支持 CSS Flexbox 和 Grid 布局深度解析
  • 强引用?软引用?弱引用?虚引用?一文带你彻底搞懂!!
  • 演唱会售票系统(Springboot+MySQL+Mybatis+BootStrap)
  • flask使用定时任务flask_apscheduler(APScheduler)
  • [原]深入对比数据科学工具箱:Python和R 非结构化数据的结构化
  • “Material Design”设计规范在 ComponentOne For WinForm 的全新尝试!
  • 【JavaScript】通过闭包创建具有私有属性的实例对象
  • Android Volley源码解析
  • Logstash 参考指南(目录)
  • maya建模与骨骼动画快速实现人工鱼
  • miaov-React 最佳入门
  • Protobuf3语言指南
  • select2 取值 遍历 设置默认值
  • socket.io+express实现聊天室的思考(三)
  • spring boot下thymeleaf全局静态变量配置
  • Terraform入门 - 3. 变更基础设施
  • WePY 在小程序性能调优上做出的探究
  • Zsh 开发指南(第十四篇 文件读写)
  • 从零开始的无人驾驶 1
  • 从输入URL到页面加载发生了什么
  • 关于for循环的简单归纳
  • 前端存储 - localStorage
  • 腾讯大梁:DevOps最后一棒,有效构建海量运营的持续反馈能力
  • Spring第一个helloWorld
  • ​探讨元宇宙和VR虚拟现实之间的区别​
  • # 消息中间件 RocketMQ 高级功能和源码分析(七)
  • #HarmonyOS:基础语法
  • $LayoutParams cannot be cast to android.widget.RelativeLayout$LayoutParams
  • (07)Hive——窗口函数详解
  • (2)空速传感器
  • (2009.11版)《网络管理员考试 考前冲刺预测卷及考点解析》复习重点
  • (附源码)ssm考试题库管理系统 毕业设计 069043
  • (附源码)ssm失物招领系统 毕业设计 182317
  • (含笔试题)深度解析数据在内存中的存储
  • (十六)Flask之蓝图
  • (未解决)jmeter报错之“请在微信客户端打开链接”
  • (一)搭建springboot+vue前后端分离项目--前端vue搭建
  • (转) RFS+AutoItLibrary测试web对话框
  • ./configure,make,make install的作用(转)
  • .net 反编译_.net反编译的相关问题
  • .NET/C# 使用 ConditionalWeakTable 附加字段(CLR 版本的附加属性,也可用用来当作弱引用字典 WeakDictionary)
  • .sys文件乱码_python vscode输出乱码
  • @component注解的分类
  • [20171101]rman to destination.txt