当前位置: 首页 > news >正文

工作中常用的RabbitMQ实践

目录

1.前置知识

准备工作

2.导入依赖

3.生产者

4.消费者

5.验证

验证Direct

验证Fanout

验证Topic



1.前置知识

rabbitmq有五种工作模式;按照有无交换机分为两大类

无交换机的:简单队列(一对一,单生产单消费)、工作队列(工作队列有轮训分发和公平分发两种模式)

有交换机:发布-订阅、路由模式、主题模式

准备工作

安装rabbitmq,并成功启动

2.导入依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

3.生产者

生产端项目结构:
 

逻辑:生产者只对交换机进行生产,至于队列绑定等放在消费端进行执行

BusinessConfig

定义了三个不同类型的交换机

direct类型:(当生产者往该交换机发送消息时,他必须指定固定的routingkey,当routingkey值为空,他也会匹配routingkey为空的队列)

fanout类型:(当生产者往该交换机发送消息时,他所绑定的队列都会收到消息,routingkey即使写了也会忽略,一般为空字符串)

Topic类型:(当生产者往该交换机发送消息时,他并不像direct指定固定的routingkey,可以进行模糊匹配,当该routingkey为空时,他会匹配routingkey为空的队列)

package com.zsp.quartz.queue;import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Configuration;/*** @Author: ZhangSP* @Date: 2023/12/7  14:05*/
public class BusinessConfig {// 声明direct交换机public static final String EXCHANGE_DIRECT= "exchange_direct_inform";// 声明fanout交换机public static final String EXCHANGE_FANOUT= "exchange_fanout_inform";// 声明topic交换机public static final String EXCHANGE_TOPIC= "exchange_topic_inform";
}

TestProducer

生产消息

package com.zsp.quartz.queue;import com.alibaba.fastjson.JSON;
import com.zsp.quartz.entity.User;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;@SpringBootTest
@RunWith(SpringRunner.class)
public class TestProducer {@AutowiredRabbitTemplate rabbitTemplate;@Testpublic void Producer_topics_springbootTest() {//使用rabbitTemplate发送消息String message = "";User user = new User();user.setName("张三");user.setEmail("anjduahsd");message = JSON.toJSONString(user);// directrabbitTemplate.convertAndSend(BusinessConfig.EXCHANGE_DIRECT,"",message);// fanoutrabbitTemplate.convertAndSend(BusinessConfig.EXCHANGE_FANOUT,"",message);// topicrabbitTemplate.convertAndSend(BusinessConfig.EXCHANGE_TOPIC,"",message);}
}

4.消费者

消费者目录结构:

BusinessConfig内容解析:

①定义交换机类型

②配置交换机与队列的绑定关系

③通过容器工厂声明队列

package com.zsp.consumer.queue;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.Connection;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Configuration;import javax.annotation.PostConstruct;/*** @Author: ZhangSP* @Date: 2023/12/7  14:05*/
@Slf4j
@Configuration
public class BusinessConfig {// 声明directpublic static final String EXCHANGE_DIRECT= "exchange_direct_inform";public static final String QUEUE_DIRECT_EMAIL = "queue_direct_inform_email";public static final String QUEUE_DIRECT_SMS = "queue_direct_inform_sms";public void BindDirectEmail(Channel channel) {try {channel.exchangeDeclare(EXCHANGE_DIRECT, BuiltinExchangeType.DIRECT.getType(), true);channel.queueDeclare(QUEUE_DIRECT_EMAIL, true, false, false, null);channel.queueBind(QUEUE_DIRECT_EMAIL, EXCHANGE_DIRECT, "");} catch (Exception e) {log.error("声明Direct->email队列时失败", e);}}public void BindDirectSms(Channel channel) {try {channel.exchangeDeclare(EXCHANGE_DIRECT, BuiltinExchangeType.DIRECT.getType(), true);channel.queueDeclare(QUEUE_DIRECT_SMS, true, false, false, null);channel.queueBind(QUEUE_DIRECT_SMS, EXCHANGE_DIRECT, "123");} catch (Exception e) {log.error("声明Direct->sms失败", e);}}// 声明fanoutpublic static final String EXCHANGE_FANOUT= "exchange_fanout_inform";public static final String QUEUE_FANOUT_EMAIL = "queue_fanout_inform_email";public static final String QUEUE_FANOUT_SMS = "queue_fanout_inform_sms";public void BindFanoutEmail(Channel channel) {try {channel.exchangeDeclare(EXCHANGE_FANOUT, BuiltinExchangeType.FANOUT.getType(), true);channel.queueDeclare(QUEUE_FANOUT_EMAIL, true, false, false, null);channel.queueBind(QUEUE_FANOUT_EMAIL, EXCHANGE_FANOUT, "");} catch (Exception e) {log.error("声明Fanout->email队列时失败", e);}}public void BindFanoutSms(Channel channel) {try {channel.exchangeDeclare(EXCHANGE_FANOUT, BuiltinExchangeType.FANOUT.getType(), true);channel.queueDeclare(QUEUE_FANOUT_SMS, true, false, false, null);channel.queueBind(QUEUE_FANOUT_SMS, EXCHANGE_FANOUT,"");} catch (Exception e) {log.error("声明Fanout->sms失败", e);}}// 声明topicpublic static final String EXCHANGE_TOPIC= "exchange_topic_inform";public static final String QUEUE_TOPIC_EMAIL = "queue_topic_inform_email";public static final String QUEUE_TOPIC_SMS = "queue_topic_inform_sms";public static final String ROUTINGKEY_EMAIL="inform.#.email.#";public static final String ROUTINGKEY_SMS="inform.#.sms.#";public void BindTopicEmail(Channel channel) {try {channel.exchangeDeclare(EXCHANGE_TOPIC, BuiltinExchangeType.TOPIC.getType(),true);channel.queueDeclare(QUEUE_TOPIC_EMAIL, true, false, false, null);channel.queueBind(QUEUE_TOPIC_EMAIL, EXCHANGE_TOPIC, ROUTINGKEY_EMAIL);} catch (Exception e) {log.error("声明Topic->email队列时失败", e);}}public void BindTopicSms(Channel channel) {try {channel.exchangeDeclare(EXCHANGE_TOPIC, BuiltinExchangeType.TOPIC.getType(),true);channel.queueDeclare(QUEUE_TOPIC_SMS, true, false, false, null);channel.queueBind(QUEUE_TOPIC_SMS, EXCHANGE_TOPIC,"");} catch (Exception e) {log.error("声明Topic->sms失败", e);}}// 声明队列@Autowired@Qualifier(value = "zspConnectionFactory")private ConnectionFactory connectionFactory;@PostConstructpublic void shengmingQueue() {try {Connection connection = connectionFactory.createConnection();Channel channel = connection.createChannel(false);BindDirectEmail(channel);BindDirectSms(channel);BindFanoutEmail(channel);BindFanoutSms(channel);BindTopicEmail(channel);BindTopicSms(channel);} catch (Exception e) {log.error("业务实例声明绑定队列报错:",e);}}
}

RabbitFactory内容解析:

①创建自定义连接工厂

②通过@Qualifier准确注入连接工厂,创建个性化容器工厂

package com.zsp.consumer.queue;import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
@EnableRabbit
public class RabbitFactory {@Bean("zspConnectionFactory")public ConnectionFactory connectionFactory() {CachingConnectionFactory connectionFactory = new CachingConnectionFactory();// 设置RabbitMQ的连接信息,如主机名、端口号、用户名和密码等connectionFactory.setHost("localhost");connectionFactory.setPort(5672);connectionFactory.setUsername("root");connectionFactory.setPassword("root");return connectionFactory;}@Bean("rabbitListenerContainerFactory")public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(@Qualifier("zspConnectionFactory") ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setConcurrentConsumers(5);factory.setMaxConcurrentConsumers(10);return factory;}
}

ReceiveHandler内容解析:

监听绑定的队列消息

package com.zsp.consumer.queue;import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class ReceiveHandler {//监听自定义的Direct队列@RabbitListener(queues = BusinessConfig.QUEUE_DIRECT_SMS, containerFactory = "rabbitListenerContainerFactory")public void directSMS(String msg, Message message, Channel channel) {JSONObject jsonObject = JSONObject.parseObject(msg);System.out.println("Direct队列->sms队列" + jsonObject);}@RabbitListener(queues = BusinessConfig.QUEUE_DIRECT_EMAIL, containerFactory = "rabbitListenerContainerFactory")public void directEmail(String msg, Message message, Channel channel) {JSONObject jsonObject = JSONObject.parseObject(msg);System.out.println("Direct队列->email队列" + jsonObject);}//监听自定义的Fanout队列@RabbitListener(queues = BusinessConfig.QUEUE_FANOUT_SMS, containerFactory = "rabbitListenerContainerFactory")public void FanoutSMS(String msg, Message message, Channel channel) {JSONObject jsonObject = JSONObject.parseObject(msg);System.out.println("Fanout队列->sms队列" + jsonObject);}@RabbitListener(queues = BusinessConfig.QUEUE_FANOUT_EMAIL, containerFactory = "rabbitListenerContainerFactory")public void FanoutEmail(String msg, Message message, Channel channel) {JSONObject jsonObject = JSONObject.parseObject(msg);System.out.println("Fanout队列->email队列" + jsonObject);}//监听自定义的Topic队列@RabbitListener(queues = BusinessConfig.QUEUE_TOPIC_SMS, containerFactory = "rabbitListenerContainerFactory")public void TopicSMS(String msg, Message message, Channel channel) {JSONObject jsonObject = JSONObject.parseObject(msg);System.out.println("Topic队列->sms队列" + jsonObject);}@RabbitListener(queues = BusinessConfig.QUEUE_TOPIC_EMAIL, containerFactory = "rabbitListenerContainerFactory")public void TopicEmail(String msg, Message message, Channel channel) {JSONObject jsonObject = JSONObject.parseObject(msg);System.out.println("Topic队列->email队列" + jsonObject);}
}

5.验证

先启动消费者端,然后执行TestProducer

验证Direct

1.向routingkey为空的队列发消息

我们在消费者端配置了routingkey为空的队列,叫做 QUEUE_DIRECT_EMAIL

因此会打印出下面这条记录

2.向routingkey为123的队列发消息

我们在消费者端配置了routingkey为123的队列,叫做 QUEUE_DIRECT_SMS

因此会打出下面这条记录

验证Fanout

谁跟我绑定了,我都发

验证Topic

模糊匹配routingkey

匹配sms队列

会把下面这个打印出来

需要注意的是如果我们没有自定义容器工厂的话,这个containerFactory可以不写
简单理解就是实例,也就是rabbitmq服务地址是在哪里,实例包括了域名、端口、账号、密码等。

相关文章:

  • GPT-4 变懒了?官方回复
  • Linux 网络协议
  • 秋招春招,我没有拿到一个offer怎么办?
  • 关于IDEA中maven的作用以及如何配置MAVEN
  • springboot(ssm滁州市特产销售系统 特产商城系统Java系统
  • SQLMap介绍
  • 低多边形3D建模石头材质纹理贴图
  • 【微服务】springboot整合quartz使用详解
  • 无人零售店,凭借黑科技引领,它的前景如何?
  • GDPU 数据结构 天码行空13
  • 用户登录权限
  • 【C++】C++中的String类详解及模拟实现示例
  • 【QT】Qt常用数值输入和显示控件
  • 【GlobalMapper精品教程】067:基于无人机航拍照片快速创建正射影像图
  • zookeeper常用接口
  • SegmentFault for Android 3.0 发布
  • ES2017异步函数现已正式可用
  • JavaScript异步流程控制的前世今生
  • Protobuf3语言指南
  • React的组件模式
  • Swoft 源码剖析 - 代码自动更新机制
  • 阿里云ubuntu14.04 Nginx反向代理Nodejs
  • 从零搭建Koa2 Server
  • 设计模式 开闭原则
  • 设计模式走一遍---观察者模式
  • 使用iElevator.js模拟segmentfault的文章标题导航
  • 世界编程语言排行榜2008年06月(ActionScript 挺进20强)
  • 试着探索高并发下的系统架构面貌
  • 推荐一个React的管理后台框架
  • CMake 入门1/5:基于阿里云 ECS搭建体验环境
  • 支付宝花15年解决的这个问题,顶得上做出十个支付宝 ...
  • #传输# #传输数据判断#
  • #我与Java虚拟机的故事#连载02:“小蓝”陪伴的日日夜夜
  • (C语言)二分查找 超详细
  • (ISPRS,2021)具有遥感知识图谱的鲁棒深度对齐网络用于零样本和广义零样本遥感图像场景分类
  • (js)循环条件满足时终止循环
  • (ZT)北大教授朱青生给学生的一封信:大学,更是一个科学的保证
  • (仿QQ聊天消息列表加载)wp7 listbox 列表项逐一加载的一种实现方式,以及加入渐显动画...
  • (非本人原创)史记·柴静列传(r4笔记第65天)
  • (区间dp) (经典例题) 石子合并
  • (太强大了) - Linux 性能监控、测试、优化工具
  • (一)eclipse Dynamic web project 工程目录以及文件路径问题
  • (一)utf8mb4_general_ci 和 utf8mb4_unicode_ci 适用排序和比较规则场景
  • (原创)可支持最大高度的NestedScrollView
  • (转)AS3正则:元子符,元序列,标志,数量表达符
  • (转)创业的注意事项
  • .bat批处理(五):遍历指定目录下资源文件并更新
  • .gitignore
  • .NET CF命令行调试器MDbg入门(一)
  • .NET Core 成都线下面基会拉开序幕
  • .net 重复调用webservice_Java RMI 远程调用详解,优劣势说明
  • .NET/C# 编译期间能确定的相同字符串,在运行期间是相同的实例
  • .netcore 如何获取系统中所有session_ASP.NET Core如何解决分布式Session一致性问题
  • .Net各种迷惑命名解释
  • .net中的Queue和Stack