当前位置: 首页 > news >正文

【SpringCloud】RabbitMQ——五种方式实现发送和接收消息

SpringAMQP

SpringAMQP是基于RabbitMQ封装的一套模板,并且还利用SpringBoot对其实现了自动装配。

SpringAmqp的官方地址:https://spring.io/projects/spring-amqp

SpringAMQP提供了三个功能:

  • 自动声明队列、交换机及其绑定关系
  • 基于注解的监听模式,异步接收消息
  • 封装了RabbitTemplate工具,用于发送消息

1.Basic Queue 简单队列模型

1.1引入依赖

<!--AMQP依赖,包含RabbitMQ-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

1.2消息发送

在publisher服务的application.yml中添加配置

spring:rabbitmq:host: 192.168.150.101 # 主机名port: 5672 # 端口virtual-host: / # 虚拟主机username: haojiale # 用户名password: 123321 # 密码

在publisher服务中编写测试类,并利用RabbitTemplate实现消息发送

package com.example.mq.spring;import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSimpleQueue() {// 队列名称String queueName = "simple.queue";// 消息String message = "hello, spring amqp!";// 发送消息rabbitTemplate.convertAndSend(queueName, message);}
}

1.3消息接收

在consumer服务的application.yml中添加的配置内容同消息发送一样

在consumer服务中编写监听器,消费消息

package com.example.mq.listener;import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class SpringRabbitListener {@RabbitListener(queues = "simple.queue")public void listenSimpleQueueMessage(String msg) throws InterruptedException {System.out.println("spring 消费者接收到消息:【" + msg + "】");}
}

2.WorkQueue

WorkQueue也被称为任务模型。就是让多个消费者绑定到一个队列,共同消费队列中的消息
在这里插入图片描述

当消息处理比较耗时的时候,可能生产消息的速度远远大于消息的消费速度,消息就会堆积越来越多,无法及时处理,使用work模型,多个消费者共同处理消息提升消费速度

2.1消息发送

循环发送消息,模拟大量消息堆积现象

在publisher服务中添加一个测试方法:

/*** workQueue* 向队列中不停发送消息,模拟消息堆积。*/
@Test
public void testWorkQueue() throws InterruptedException {// 队列名称String queueName = "simple.queue";// 消息String message = "hello, message_";for (int i = 0; i < 50; i++) {// 发送消息rabbitTemplate.convertAndSend(queueName, message + i);Thread.sleep(20);}
}

2.2消息接收

模拟多个消费者绑定同一个队列,在consumer服务中添加2个新的方法:

@RabbitListener(queues = "simple.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());Thread.sleep(20);
}@RabbitListener(queues = "simple.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());Thread.sleep(200);
}

执行测试方法结果是消费者1很快处理完了自己的25条消息,消费者2却在缓慢处理自己的25条消息,也就是说消息是平均分配给每个消费者,并没有考虑到消费者的处理能力是存在问题的。

解决

修改consumer服务的application.yml文件配置,通过设置prefetch来控制消费者预取的消息数量

spring:rabbitmq:listener:simple:prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

3.发布/订阅

发布订阅的模型图如图:
在这里插入图片描述

Exchange只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

3.1 Fanout

在这里插入图片描述

3.1.1声明队列和交换机

在这里插入图片描述

在consumer服务中声明队列和交换机

package com.example.mq.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class FanoutConfig {/*** 声明交换机* @return Fanout类型交换机*/@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("example.fanout");}/*** 第1个队列*/@Beanpublic Queue fanoutQueue1(){return new Queue("fanout.queue1");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}/*** 第2个队列*/@Beanpublic Queue fanoutQueue2(){return new Queue("fanout.queue2");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
}

3.1.2消息发送

在publisher服务中编写测试方法发送消息

@Test
public void testFanoutExchange() {// 队列名称String exchangeName = "example.fanout";// 消息String message = "hello, everyone!";rabbitTemplate.convertAndSend(exchangeName, "", message);
}

3.1.3消息接收

在consumer服务中添加两个方法,作为消费者:

@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
}@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
}

3.1.4总结

交换机的作用:

  • 接收publisher发送的消息
  • 将消息按照规则路由到与之绑定的队列
  • 不能缓存消息,路由失败情况下消息丢失
  • FanoutExchange会将消息路由到每个绑定的队列

声明队列、交换机、绑定关系的Bean是什么?

  • Queue
  • FanoutExchange
  • Binding

3.2 Direct

在这里插入图片描述

在Direct模型下:

  • 队列与交换机的绑定不能是任意绑定,而是要指定一个RoutingKey(路由key)
  • 消息的发送方在向Exchange发送消息时,也必须指定消息的RoutingKey
  • Exchange不会再把消息交给每一个绑定的队列,而是根据消息的RoutingKey进行判断,只有队列的RountingKey与消息的RoutingKey完全一致,才会收到消息

3.2.1消息发送

在这里插入图片描述

在publisher服务中添加测试方法:

@Test
public void testSendDirectExchange() {// 交换机名称String exchangeName = "itcast.direct";// 消息String message = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";// 发送消息rabbitTemplate.convertAndSend(exchangeName, "red", message);
}

3.2.2消息接收——基于注解声明交换机和队列

基于@Bean的方式声明队列和交换机比较麻烦,Spring还提供了基于注解方式来声明。

在consumer的SpringRabbitListener中添加两个消费者,同时基于注解来声明队列和交换机:

@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】");
}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){System.out.println("消费者接收到direct.queue2的消息:【" + msg + "】");
}

3.2.3总结

Direct交换机与Fanout交换机的区别?

  • Fanout交换机将消息路由给每一个与之绑定的队列
  • Direct交换机根据RoutingKey判断路由给哪个队列
  • 如果多个队列具有相同的RoutingKey,则与Fanout功能类似

3.3 Topic

Topic类型的Exchang可以让队列在绑定Routing key的时候使用通配符

Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert

通配符规则:

#:匹配一个或多个词

*:匹配不多不少恰好1个词
在这里插入图片描述

举例:

item.#:能够匹配item.spu.insert 或者 item.spu

item.*:只能匹配item.spu

图示:

3.3.1消息发送

在这里插入图片描述

在publisher服务的SpringAmqpTest类中添加测试方法:

@Test
public void testSendTopicExchange() {// 交换机名称String exchangeName = "itcast.topic";// 消息String message = "喜报!孙悟空大战哥斯拉,胜!";// 发送消息rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}

3.3.2消息接收

在consumer服务中添加方法:

@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue1"),exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),key = "china.#"
))
public void listenTopicQueue1(String msg){System.out.println("消费者接收到topic.queue1的消息:【" + msg + "】");
}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue2"),exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),key = "#.news"
))
public void listenTopicQueue2(String msg){System.out.println("消费者接收到topic.queue2的消息:【" + msg + "】");
}

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • Java后端面试题(redis相关2)(day8)
  • Linux---系统安全
  • 【博主推荐】HTML5新闻,博客,官网网站源码文章瀑布流+详情页面
  • Kotlin IntelliJ IDEA 环境搭建
  • Unity | AmplifyShaderEditor插件基础(第二集:模版说明)
  • VSCode 都有哪些值得推荐的插件
  • RxJava在Android中的应用
  • dataX从orcal数据库抽取数据插入gbase 8a数据库 与 OceanBase数据库
  • MySQL-MVCC举例说明
  • 单库平滑迁移至分库分表架构方案
  • 数字营销中的人工智能 --- 完整指南 (By Hubspot)
  • 从0开始搭建vue + flask 旅游景点数据分析系统(九):旅游景点管理之增删改查
  • @Async 异步注解使用
  • 基于YOLOv10深度学习的草莓成熟度检测与识别系统【python源码+Pyqt5界面+数据集+训练代码】目标检测、人工智能
  • C# VideoCapture 多路视频播放
  • 「前端」从UglifyJSPlugin强制开启css压缩探究webpack插件运行机制
  • CSS 专业技巧
  • electron原来这么简单----打包你的react、VUE桌面应用程序
  • iOS动画编程-View动画[ 1 ] 基础View动画
  • Java比较器对数组,集合排序
  • Java教程_软件开发基础
  • Vue实战(四)登录/注册页的实现
  • vue自定义指令实现v-tap插件
  • 翻译 | 老司机带你秒懂内存管理 - 第一部(共三部)
  • 机器人定位导航技术 激光SLAM与视觉SLAM谁更胜一筹?
  • 巧用 TypeScript (一)
  • 如何合理的规划jvm性能调优
  • 算法系列——算法入门之递归分而治之思想的实现
  • 写代码的正确姿势
  • 云大使推广中的常见热门问题
  • 曾刷新两项世界纪录,腾讯优图人脸检测算法 DSFD 正式开源 ...
  • ​你们这样子,耽误我的工作进度怎么办?
  • #进阶:轻量级ORM框架Dapper的使用教程与原理详解
  • #经典论文 异质山坡的物理模型 2 有效导水率
  • #如何使用 Qt 5.6 在 Android 上启用 NFC
  • #微信小程序(布局、渲染层基础知识)
  • (C语言)编写程序将一个4×4的数组进行顺时针旋转90度后输出。
  • (delphi11最新学习资料) Object Pascal 学习笔记---第13章第6节 (嵌套的Finally代码块)
  • (echarts)echarts使用时重新加载数据之前的数据存留在图上的问题
  • (附源码)php新闻发布平台 毕业设计 141646
  • (附源码)springboot家庭财务分析系统 毕业设计641323
  • (附源码)springboot炼糖厂地磅全自动控制系统 毕业设计 341357
  • (排序详解之 堆排序)
  • (十)【Jmeter】线程(Threads(Users))之jp@gc - Stepping Thread Group (deprecated)
  • (算法)Game
  • .“空心村”成因分析及解决对策122344
  • .NET 8 跨平台高性能边缘采集网关
  • .net core 控制台应用程序读取配置文件app.config
  • .NET使用存储过程实现对数据库的增删改查
  • .NET中的Event与Delegates,从Publisher到Subscriber的衔接!
  • .pyc文件还原.py文件_Python什么情况下会生成pyc文件?
  • @RestControllerAdvice异常统一处理类失效原因
  • @Transactional 参数详解
  • [ C++ ] 类和对象( 下 )
  • [boost]使用boost::function和boost::bind产生的down机一例