Using RabbitMQ in Java

Contents

  • Preface
  • I. Setup and Configuration
    • 1. Dependency
    • 2. Configuration
  • II. Usage
    • 1. Queues
    • 2. Publish/Subscribe
      • 2.1 fanout (broadcast)
      • 2.2 direct (routing)
      • 2.3 Topics
      • 2.4 Headers
  • Summary


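Preface

Message queues are commonly used for decoupling services, smoothing traffic peaks, and asynchronous communication. RabbitMQ is a widely used, stable open-source broker. In this article we pull RabbitMQ in through the Spring Boot starter, walk through the messaging patterns it supports, and use a few simple examples to show which pattern fits which business scenario. A link to installation instructions is at the end.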


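I. Setup and Configuration

1. Dependency

Spring AMQP (Spring's support for the Advanced Message Queuing Protocol) consists of two modules: spring-amqp is the base abstraction, and spring-rabbit is the RabbitMQ implementation.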

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2. Configuration

For the available settings, see RabbitProperties.java.

spring:
  rabbitmq:
    host: 192.168.137.192
    port: 5672
    username: guest
    password: guest
    virtualHost: /

II. Usage

1. Queues

RabbitConfiguration

package com.student.rabbit.queue;

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Create by zjg on 2024/3/9
 */
@Configuration
public class RabbitConfiguration {

    protected final String queueName = "queue";

    // Declares the queue named "queue"
    @Bean
    public Queue queue() {
        return new Queue(this.queueName);
    }
}

Producer

package rabbit.queue;

import com.student.SpringbootStart;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * Create by zjg on 2024/3/9
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = SpringbootStart.class)
public class Producer {

    @Autowired
    private RabbitTemplate template;

    @Autowired
    private Queue queue;

    AtomicInteger count = new AtomicInteger(0);

    // Sends "Hello 1" .. "Hello 10" to the queue, using the queue name as the routing key
    @Test
    public void send() {
        for (int i = 0; i < 10; i++) {
            String message = "Hello " + count.incrementAndGet();
            template.convertAndSend(queue.getName(), message);
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}

Consumer

package com.student.rabbit.queue;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Create by zjg on 2024/3/9
 */
@Component
public class Consumer {

    private static final Logger log = LoggerFactory.getLogger(Consumer.class);

    protected final String queueName = "queue";

    // Two listeners on the same queue compete for its messages
    @RabbitListener(queues = queueName)
    public void receive1(String message) {
        log.debug("receive1:" + message);
    }

    @RabbitListener(queues = queueName)
    public void receive2(String message) {
        log.debug("receive2:" + message);
    }
}

The two listeners share a single queue, so the 10 messages were split between them: each listener consumed 5.
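
To make the 5/5 split concrete, here is a broker-free sketch of round-robin dispatch. It is only an illustration under the assumption that both listeners are idle and equally ready; the class and method names (RoundRobinSketch, dispatch) are made up for this example and are not part of Spring AMQP or RabbitMQ.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, broker-free illustration of round-robin dispatch.
class RoundRobinSketch {

    // Hands message i to consumer (i mod consumerCount), which is how the
    // deliveries alternate when every consumer is equally ready.
    static List<List<String>> dispatch(List<String> messages, int consumerCount) {
        List<List<String>> received = new ArrayList<>();
        for (int c = 0; c < consumerCount; c++) {
            received.add(new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            received.get(i % consumerCount).add(messages.get(i));
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> messages = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            messages.add("Hello " + i);
        }
        List<List<String>> received = dispatch(messages, 2);
        System.out.println("receive1: " + received.get(0));
        System.out.println("receive2: " + received.get(1));
    }
}
```

With a real broker the exact split can differ, for example when one listener is slower or when prefetch settings let one listener buffer more messages than the other.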

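2. Publish/Subscribe

There are four exchange types: fanout, direct, topic, and headers. The sections below cover how to use each one and how they differ.

2.1 fanout (broadcast)

P (the producer) sends messages to X (the exchange), and X delivers a copy to every queue bound to it.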

RabbitFanoutConfiguration
We define AnonymousQueue beans; each one creates a non-durable, exclusive, auto-delete queue with a generated name.

package com.student.rabbit.fanout;

import org.springframework.amqp.core.AnonymousQueue;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Create by zjg on 2024/3/10
 */
@Configuration
public class RabbitFanoutConfiguration {

    @Bean
    public FanoutExchange fanout() {
        return new FanoutExchange("sys.fanout");
    }

    private static class ReceiverConfig {

        @Bean
        public Queue fanoutQueue1() {
            return new AnonymousQueue();
        }

        @Bean
        public Queue fanoutQueue2() {
            return new AnonymousQueue();
        }

        // A fanout binding needs no routing key
        @Bean
        public Binding bindingFanout1(FanoutExchange fanout, Queue fanoutQueue1) {
            return BindingBuilder.bind(fanoutQueue1).to(fanout);
        }

        @Bean
        public Binding bindingFanout2(FanoutExchange fanout, Queue fanoutQueue2) {
            return BindingBuilder.bind(fanoutQueue2).to(fanout);
        }
    }
}

FanoutProducer

package rabbit.fanout;

import com.student.SpringbootStart;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * Create by zjg on 2024/3/10
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = SpringbootStart.class)
public class FanoutProducer {

    @Autowired
    private RabbitTemplate template;

    @Autowired
    private FanoutExchange fanout;

    // Publishes 10 messages to the fanout exchange; the routing key is left empty
    @Test
    public void send() {
        AtomicInteger count = new AtomicInteger(0);
        for (int i = 0; i < 10; i++) {
            String message = "Hello " + count.incrementAndGet();
            template.convertAndSend(fanout.getName(), "", message);
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}

FanoutConsumer

package com.student.rabbit.fanout;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Create by zjg on 2024/3/10
 */
@Component
public class FanoutConsumer {

    private static final Logger log = LoggerFactory.getLogger(FanoutConsumer.class);

    // The SpEL expression resolves the generated name of the anonymous queue bean
    @RabbitListener(queues = "#{fanoutQueue1.name}")
    public void receive1(String message) {
        log.debug("receive1:" + message);
    }

    @RabbitListener(queues = "#{fanoutQueue2.name}")
    public void receive2(String message) {
        log.debug("receive2:" + message);
    }
}

10 messages were sent in total, and each queue consumed all 10.
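
The difference from the shared-queue case can be sketched without a broker: a fanout exchange copies every message into each bound queue. The class below (FanoutSketch and its methods) is a hypothetical in-memory model written for this article, not a RabbitMQ or Spring API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of a fanout exchange.
class FanoutSketch {

    // queue name -> messages delivered to that queue
    private final Map<String, List<String>> queues = new LinkedHashMap<>();

    // Binds a queue to the exchange; fanout bindings carry no routing key.
    void bind(String queueName) {
        queues.putIfAbsent(queueName, new ArrayList<>());
    }

    // The routing key is accepted but ignored: every bound queue gets a copy.
    void publish(String routingKey, String message) {
        for (List<String> queue : queues.values()) {
            queue.add(message);
        }
    }

    List<String> messagesIn(String queueName) {
        return queues.get(queueName);
    }

    public static void main(String[] args) {
        FanoutSketch exchange = new FanoutSketch();
        exchange.bind("fanoutQueue1");
        exchange.bind("fanoutQueue2");
        for (int i = 1; i <= 10; i++) {
            exchange.publish("", "Hello " + i);
        }
        System.out.println("fanoutQueue1 size: " + exchange.messagesIn("fanoutQueue1").size());
        System.out.println("fanoutQueue2 size: " + exchange.messagesIn("fanoutQueue2").size());
    }
}
```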

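2.2 direct (routing)

A direct exchange delivers messages according to their routing keys, which makes it flexible: each consumer binds only the keys for the messages it needs.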

RabbitDirectConfiguration

package com.student.rabbit.direct;

import org.springframework.amqp.core.AnonymousQueue;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Create by zjg on 2024/3/10
 */
@Configuration
public class RabbitDirectConfiguration {

    @Bean
    public DirectExchange direct() {
        return new DirectExchange("sys.direct");
    }

    private static class ReceiverConfig {

        @Bean
        public Queue directQueue1() {
            return new AnonymousQueue();
        }

        @Bean
        public Queue directQueue2() {
            return new AnonymousQueue();
        }

        // directQueue1 receives "orange" and "black"
        @Bean
        public Binding bindingDirect1a(DirectExchange direct, Queue directQueue1) {
            return BindingBuilder.bind(directQueue1).to(direct).with("orange");
        }

        @Bean
        public Binding bindingDirect1b(DirectExchange direct, Queue directQueue1) {
            return BindingBuilder.bind(directQueue1).to(direct).with("black");
        }

        // directQueue2 receives "green" and "black"
        @Bean
        public Binding bindingDirect2a(DirectExchange direct, Queue directQueue2) {
            return BindingBuilder.bind(directQueue2).to(direct).with("green");
        }

        @Bean
        public Binding bindingDirect2b(DirectExchange direct, Queue directQueue2) {
            return BindingBuilder.bind(directQueue2).to(direct).with("black");
        }
    }
}

DirectProducer

package rabbit.direct;

import com.student.SpringbootStart;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

/**
 * Create by zjg on 2024/3/10
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = SpringbootStart.class)
public class DirectProducer {

    @Autowired
    private RabbitTemplate template;

    @Autowired
    private DirectExchange direct;

    private final String[] keys = {"orange", "black", "green"};

    // Sends one message per routing key
    @Test
    public void send() {
        for (String key : keys) {
            String message = "Hello to " + key;
            template.convertAndSend(direct.getName(), key, message);
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}

DirectConsumer

package com.student.rabbit.direct;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Create by zjg on 2024/3/10
 */
@Component
public class DirectConsumer {

    private static final Logger log = LoggerFactory.getLogger(DirectConsumer.class);

    @RabbitListener(queues = "#{directQueue1.name}")
    public void receive1(String message) {
        log.debug("receive1:" + message);
    }

    @RabbitListener(queues = "#{directQueue2.name}")
    public void receive2(String message) {
        log.debug("receive2:" + message);
    }
}

Three messages were sent in total. Both queues are bound to black, so the black message was consumed twice, once per queue.
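
The routing rule behind this result is an exact match between the message's routing key and the binding key. The following broker-free sketch models that lookup with the same bindings as the configuration above; DirectSketch and its methods are hypothetical names used only for illustration.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical in-memory model of direct-exchange routing.
class DirectSketch {

    // binding key -> names of the queues bound with that key
    private final Map<String, Set<String>> bindings = new HashMap<>();

    void bind(String queueName, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, k -> new TreeSet<>()).add(queueName);
    }

    // A message reaches exactly the queues whose binding key equals its routing key.
    Set<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, Collections.emptySet());
    }

    // Same bindings as RabbitDirectConfiguration in this article.
    static DirectSketch articleBindings() {
        DirectSketch exchange = new DirectSketch();
        exchange.bind("directQueue1", "orange");
        exchange.bind("directQueue1", "black");
        exchange.bind("directQueue2", "green");
        exchange.bind("directQueue2", "black");
        return exchange;
    }

    public static void main(String[] args) {
        DirectSketch exchange = articleBindings();
        for (String key : new String[] {"orange", "black", "green"}) {
            System.out.println(key + " -> " + exchange.route(key));
        }
    }
}
```

In this model a routing key with no matching binding routes to no queue at all.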

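2.3 Topics

The topic pattern builds on routing by adding wildcard matching on the routingKey. Words in a routing key are separated by dots.
* (star) matches exactly one word.
# (hash) matches zero or more words.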
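
The two wildcard rules can be written down as a small matcher. This is a simplified sketch for illustration only, not the broker's actual implementation; TopicMatchSketch and matches are made-up names. The patterns and keys in main are the ones used in this section's examples.

```java
// Hypothetical matcher illustrating topic wildcard semantics.
class TopicMatchSketch {

    // Returns true when the dot-separated routing key satisfies the binding pattern.
    static boolean matches(String pattern, String routingKey) {
        String[] patternWords = pattern.split("\\.");
        String[] keyWords = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.");
        return matches(patternWords, 0, keyWords, 0);
    }

    private static boolean matches(String[] p, int pi, String[] k, int ki) {
        if (pi == p.length) {
            // Pattern exhausted: succeed only if the key is exhausted too.
            return ki == k.length;
        }
        if (p[pi].equals("#")) {
            // '#' absorbs zero or more words: try every possible continuation point.
            for (int next = ki; next <= k.length; next++) {
                if (matches(p, pi + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (ki == k.length) {
            // Pattern still expects a word but the key has none left.
            return false;
        }
        // '*' consumes exactly one word; a literal must equal the current word.
        if (p[pi].equals("*") || p[pi].equals(k[ki])) {
            return matches(p, pi + 1, k, ki + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        String[] patterns = {"*.orange.*", "*.*.rabbit", "lazy.#", "quick.brown.*"};
        String[] keys = {"quick.orange.rabbit", "lazy.orange.elephant", "quick.orange.fox",
                "lazy.brown.fox", "lazy.pink.rabbit", "quick.brown.fox"};
        for (String key : keys) {
            for (String pattern : patterns) {
                if (matches(pattern, key)) {
                    System.out.println(key + " matches " + pattern);
                }
            }
        }
    }
}
```

Note that lazy.# also matches the single-word key lazy, because # may stand for zero words, while *.orange.* never matches a key with fewer or more than three words.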

RabbitTopicConfiguration

package com.student.rabbit.topic;

import org.springframework.amqp.core.AnonymousQueue;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Create by zjg on 2024/3/10
 */
@Configuration
public class RabbitTopicConfiguration {

    @Bean
    public TopicExchange topic() {
        return new TopicExchange("sys.topic");
    }

    private static class ReceiverConfig {

        @Bean
        public Queue topicQueue1() {
            return new AnonymousQueue();
        }

        @Bean
        public Queue topicQueue2() {
            return new AnonymousQueue();
        }

        // topicQueue1: middle word "orange", or three words ending in "rabbit"
        @Bean
        public Binding bindingTopic1a(TopicExchange topic, Queue topicQueue1) {
            return BindingBuilder.bind(topicQueue1).to(topic).with("*.orange.*");
        }

        @Bean
        public Binding bindingTopic1b(TopicExchange topic, Queue topicQueue1) {
            return BindingBuilder.bind(topicQueue1).to(topic).with("*.*.rabbit");
        }

        // topicQueue2: anything starting with "lazy", or "quick.brown" plus one word
        @Bean
        public Binding bindingTopic2a(TopicExchange topic, Queue topicQueue2) {
            return BindingBuilder.bind(topicQueue2).to(topic).with("lazy.#");
        }

        @Bean
        public Binding bindingTopic2b(TopicExchange topic, Queue topicQueue2) {
            return BindingBuilder.bind(topicQueue2).to(topic).with("quick.brown.*");
        }
    }
}

TopicProducer

package rabbit.topic;

import com.student.SpringbootStart;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

/**
 * Create by zjg on 2024/3/10
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = SpringbootStart.class)
public class TopicProducer {

    @Autowired
    private RabbitTemplate template;

    @Autowired
    private TopicExchange topic;

    private final String[] keys = {"quick.orange.rabbit", "lazy.orange.elephant", "quick.orange.fox",
            "lazy.brown.fox", "lazy.pink.rabbit", "quick.brown.fox"};

    // Sends one message per routing key
    @Test
    public void send() {
        for (String key : keys) {
            String message = "Hello to " + key;
            template.convertAndSend(topic.getName(), key, message);
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}

TopicConsumer

package com.student.rabbit.topic;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Create by zjg on 2024/3/10
 */
@Component
public class TopicConsumer {

    private static final Logger log = LoggerFactory.getLogger(TopicConsumer.class);

    @RabbitListener(queues = "#{topicQueue1.name}")
    public void receive1(String message) {
        log.debug("receive1:" + message);
    }

    @RabbitListener(queues = "#{topicQueue2.name}")
    public void receive2(String message) {
        log.debug("receive2:" + message);
    }
}

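Queue 1 received the messages whose middle word is orange or whose last word is rabbit; queue 2 received the messages that start with lazy or with quick.brown.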

2.4 Headers

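I could not find a tutorial for the headers pattern in the official documentation, but the classes are still in the package, so I will cover it here anyway.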

RabbitHeadersConfiguration

package com.student.rabbit.headers;

import org.springframework.amqp.core.AnonymousQueue;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.HeadersExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * Create by zjg on 2024/3/10
 */
@Configuration
public class RabbitHeadersConfiguration {

    @Bean
    public HeadersExchange headers() {
        return new HeadersExchange("sys.headers");
    }

    private static class ReceiverConfig {

        @Bean
        public Queue headersQueue1() {
            return new AnonymousQueue();
        }

        @Bean
        public Queue headersQueue2() {
            return new AnonymousQueue();
        }

        @Bean
        public Queue headersQueue3() {
            return new AnonymousQueue();
        }

        // headersQueue1: header user must equal "sys"
        @Bean
        public Binding bindingHeaders1(HeadersExchange headers, Queue headersQueue1) {
            Map<String, Object> headerValue = new HashMap<>();
            headerValue.put("user", "sys");
            return BindingBuilder.bind(headersQueue1).to(headers).whereAll(headerValue).match();
        }

        // headersQueue2: header user must equal "admin"
        @Bean
        public Binding bindingHeaders2(HeadersExchange headers, Queue headersQueue2) {
            Map<String, Object> headerValue = new HashMap<>();
            headerValue.put("user", "admin");
            return BindingBuilder.bind(headersQueue2).to(headers).whereAll(headerValue).match();
        }

        // headersQueue3: header user only has to be present
        @Bean
        public Binding bindingHeaders3(HeadersExchange headers, Queue headersQueue3) {
            return BindingBuilder.bind(headersQueue3).to(headers).where("user").exists();
        }
    }
}

HeadersProducer

package rabbit.headers;

import com.student.SpringbootStart;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.HeadersExchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

/**
 * Create by zjg on 2024/3/10
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = SpringbootStart.class)
public class HeadersProducer {

    @Autowired
    private RabbitTemplate template;

    @Autowired
    private HeadersExchange headers;

    private final String[] keys = {"sys", "admin"};

    // Sends one message per user value; routing is driven by the "user" header, not the routing key
    @Test
    public void send() {
        for (String key : keys) {
            String body = "Hello to " + key;
            MessageProperties messageProperties = new MessageProperties();
            messageProperties.setHeader("user", key);
            Message message = MessageBuilder.withBody(body.getBytes())
                    .andProperties(messageProperties)
                    .build();
            template.send(headers.getName(), "", message);
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}

HeadersConsumer

package com.student.rabbit.headers;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Create by zjg on 2024/3/10
 */
@Component
public class HeadersConsumer {

    private static final Logger log = LoggerFactory.getLogger(HeadersConsumer.class);

    @RabbitListener(queues = "#{headersQueue1.name}")
    public void receive1(Message message) {
        log.debug("receive1:" + new String(message.getBody()));
    }

    @RabbitListener(queues = "#{headersQueue2.name}")
    public void receive2(Message message) {
        log.debug("receive2:" + new String(message.getBody()));
    }

    @RabbitListener(queues = "#{headersQueue3.name}")
    public void receive3(Message message) {
        log.debug("receive3:" + new String(message.getBody()));
    }
}

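The first queue receives the sys message, the second queue receives the admin message, and the third queue receives every message that carries a user header.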
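
The two kinds of binding used here, "all listed headers must equal these values" and "this header must exist", can be modeled without a broker. The sketch below is a simplified illustration; HeadersSketch, matchesAll and exists are hypothetical helper names and not part of Spring AMQP.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical model of the two header-matching rules used in this section.
class HeadersSketch {

    // Models whereAll(map).match(): every required header must be present with an equal value.
    static boolean matchesAll(Map<String, Object> required, Map<String, Object> messageHeaders) {
        for (Map.Entry<String, Object> entry : required.entrySet()) {
            if (!messageHeaders.containsKey(entry.getKey())
                    || !Objects.equals(entry.getValue(), messageHeaders.get(entry.getKey()))) {
                return false;
            }
        }
        return true;
    }

    // Models where(key).exists(): the header only has to be present, whatever its value.
    static boolean exists(String key, Map<String, Object> messageHeaders) {
        return messageHeaders.containsKey(key);
    }

    public static void main(String[] args) {
        Map<String, Object> sysBinding = new HashMap<>();
        sysBinding.put("user", "sys");
        Map<String, Object> adminBinding = new HashMap<>();
        adminBinding.put("user", "admin");

        for (String user : new String[] {"sys", "admin"}) {
            Map<String, Object> messageHeaders = new HashMap<>();
            messageHeaders.put("user", user);
            System.out.println("user=" + user
                    + " queue1=" + matchesAll(sysBinding, messageHeaders)
                    + " queue2=" + matchesAll(adminBinding, messageHeaders)
                    + " queue3=" + exists("user", messageHeaders));
        }
    }
}
```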


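Summary

For installation, see here
Official documentation
Official website
For other projects, refer to the official examples
The road ahead is long and far; I will search high and low.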
