当前位置: 首页 > news >正文

RabbitMQ从入门到入土

同步与异步

同步调用

优势:

  • 时效性强,等到结果后就返回

问题:

  • 扩展性差

  • 性能下降

  • 级联失败问题

异步调用

优势:

  • 耦合度低,扩展性强

  • 无需等待,性能好

  • 故障隔离,下游服务故障不影响上游

  • 缓存消息,削峰填谷

问题:

  • 不能立刻获得结果,时效性差

  • 不确定下游业务执行是否成功

  • 业务安全依赖于Broker的可靠性

Broker:(代理,常指一种中间件,用于协调和管理不同组件之间的通信、交互、服务调用)

异步调用基于消息推送的方式,一般包含3个角色。

  • 消息发送者

  • 消息代理:管理、暂存、转发消息

  • 消息接收者

MQ技术选型

MQ:消息队列,就是存放消息的队列,也就是异步调用中的Broker。

RabbitMQ、ActiveMQ、Rocket MQ、Kafka对比

RabbitMQActiveMQRocketMQKafka
公司/社区RabbitApache阿里Apache
开发语言ErlangJavaJavaScala&Java
协议支持AMQP,XMPP,SMTP,STOMPOpenWire,STOMP,REST,XMPP,AMQP自定义协议自定义协议
可用性一般
单机吞吐量一般非常高
消息延迟微秒级毫秒级毫秒级毫秒以内
消息可靠性一般一般
  • 追求可用性:Kafka、 RocketMQ 、RabbitMQ

  • 追求可靠性:RabbitMQ、RocketMQ

  • 追求吞吐能力:RocketMQ、Kafka

  • 追求消息低延迟:RabbitMQ、Kafka

个人认为:如果在项目中使用的是RabbitMQ,面试官问你为啥使用这个可以从以下入手:

  • 可用性高

  • 消息可靠性高

  • 虽然单机吞吐量一般,但是消息延迟低。为什么消息延迟低呢?

    • RabbitMQ之所以被认为能够提供较低的消息延迟,主要归因于以下几个因素:

      1. 高性能的底层实现:RabbitMQ是用Erlang语言编写的,Erlang专为高并发、分布式系统设计,提供了轻量级进程和共享无锁数据结构,这使得RabbitMQ在处理大量并发连接和消息时表现得非常高效。

      2. 零拷贝技术:RabbitMQ利用操作系统提供的零拷贝特性,减少数据在内核空间和用户空间之间的复制次数,从而加快消息传输速度,降低延迟。

      3. 多路复用的TCP连接:通过使用Channel(信道)这一概念,RabbitMQ可以在单个TCP连接上复用多个逻辑连接,减少了网络连接的开销,提升了通信效率。

      4. 可配置的消息优先级:RabbitMQ允许为消息设置优先级,这在某些场景下可以帮助紧急或高优先级的消息更快地被消费,减少它们的等待时间。

      5. 社区支持和成熟度:作为一个成熟的开源项目,RabbitMQ拥有活跃的开发者社区和丰富的文档资源,这意味着它经过了广泛的测试和优化,能够提供稳定的低延迟表现。

好,现在开始来介绍我们的RabbitMQ了

RabbitMQ

认识

整体架构和核心概念如下:

  • publisher:消息发布者

  • consumer:消息消费者

  • queue:队列,存储消息

  • exchange:交换机:转发消息

image-20240215142856543

Java客户端使用步骤

我们要知道的是RabbitMQ是根据AMQP协议来实现的:

AMQP:用于应用程序之间传递业务消息的开放标准

Spring AMQP:基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息

使用之前你要从官网进行下载RabbitMQ这个中间件,并启动它。

下载地址:https://github.com/rabbitmq/rabbitmq-server/releases/tag/v3.13.3

官方使用地址:RabbitMQ Tutorials | RabbitMQ

  1. 引入Spring-amqp的依赖

    <dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.13.0</version> <!-- 或者使用最新版本 -->
    </dependency>

    2、配置RabbitMQ服务端信息

    spring:rabbitmq:host: 127.0.0.1 #ipport: 5672      #端口username: guest #账号password: guest #密码virtualHost:    #链接的虚拟主机addresses: 127.0.0.1:5672     #多个以逗号分隔,与host功能一样。requestedHeartbeat: 60 #指定心跳超时,单位秒,0为不指定;默认60spublisherConfirms: true  #发布确认机制是否启用#确认消息已发送到交换机(Exchange)#publisher-confirm-type参数有三个可选值:#SIMPLE:会触发回调方法,相当于单个确认(发一条确认一条)。#CORRELATED:消息从生产者发送到交换机后触发回调方法。#NONE(默认):关闭发布确认模式。#publisher-confirm-type: correlated #发布确认机制是否启用 高版本Springboot使用替换掉publisher-confirms:truepublisherReturns: true #发布返回是否启用connectionTimeout: #链接超时。单位ms。0表示无穷大不超时

    3、利用RabbitTemplate发送消息

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    ​
    public class Producer {
    ​private static final String QUEUE_NAME = "hello";
    ​public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost"); // 如果RabbitMQ不在本地,请修改为主机地址try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.queueDeclare(QUEUE_NAME, false, false, false, null);String message = "Hello World!";channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));System.out.println(" [x] Sent '" + message + "'");}}
    }

    4、利用@RabbitListener注解声明要监听的队列,监听消息

    import com.rabbitmq.client.*;
    ​
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    ​
    public class Consumer {
    ​private static final String QUEUE_NAME = "hello";
    ​public static void main(String[] argv) throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost"); // 如果RabbitMQ不在本地,请修改为主机地址Connection connection = factory.newConnection();Channel channel = connection.createChannel();
    ​DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");};channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    ​System.out.println(" [*] Waiting for messages. To exit press CTRL+C");}
    }

WorkQueue

  • 一个队列绑定多个消费者,加快消息处理速度。

  • 同一个消息只会被一个消费者处理

  • 通过设置prefetch来控制消费者预取的消息数量,处理完一条再处理下一条,实现能者多劳

spring:rabbitmq:listener:simple:prefetch: 1 #每次只能获取一条消息,处理完才能获取下一条消息

Java声明队列和交换机

SpringAMQP提供了几个类,用来声明队列、交换机及其绑定关系:

  • Queue:用于声明队列,可以用工厂类QueueBuilder构建

  • Exchange:用于声明交换机,可以用工厂类ExchangeBuilder构建

  • Binding:用于声明队列和交换机绑 定关系,可以用工厂类BindingBuilder构建

在consumer中创建一个类,声明队列和交换机:

1、通过配置实现

package cn.itcast.mq.config;
​
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
​
@Configuration
public class FanoutConfig {/*** 声明交换机* @return Fanout类型交换机*/@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("itcast.fanout");}
​/*** 第1个队列*/@Beanpublic Queue fanoutQueue1(){return new Queue("fanout.queue1");}
​/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}
​/*** 第2个队列*/@Beanpublic Queue fanoutQueue2(){return new Queue("fanout.queue2");}
​/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
}
​
2、通过注解实现@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1", durable="true"),exchange = @Exchange(name = "test.direct", type = ExchangeTypes.DIRECT),key = {"red", "blue"}
))
public void listenDirectQueue1(String msg) throws Exception{System.out.println("消费者1收到了 direct.queue1的消息:【" + msg + "】");
}

交换机

消息一般都是通过exchange来发送消息的,而不是直接发送到队列中的。

常用交换机的类型有以下三种:

  • Fanout(广播):将消息发送给所有跟该交换机绑定了的queue(就是每个人都能收到)

  • Direct(定向):消息根据规则路由到指定的queue

  • Topic(话题):根据类别来进行发送消息(类似Direct)

Fouout交换机

这里定义了一个生产者发送消息:

    @Testpublic void testSendFanout(){String exchangeName = "test.fanout";String msg = "hello, everyone!";rabbitTemplate.convertAndSend(exchangeName,null,msg);}

下面是两个消费者

package cn.itcast.mq.listener;
​
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
​
import java.time.LocalTime;
​
@Component
public class SpringRabbitListener {@RabbitListener(queues = "fanout.queue1")public void listenFanout1(String msg) throws InterruptedException {System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());Thread.sleep(20);}
​@RabbitListener(queues = "fanout.queue2")public void listenFanout2(String msg) throws InterruptedException {System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());Thread.sleep(200);}
​
}
​

这里是接收到的消息,每个都接收到了。

消费者2........接收到消息:【hello, everyone!】18:50:53.628336200
消费者1接收到消息:【hello, everyone!】18:50:53.628336200

定向交换机

Direct Exchange会将收到的消息根据规则路由到指定的Queue,因此称为定向路由。

  • 每一个Queue都与Exchange设置一个BindingKey

  • 发布者发送消息时,指定消息的RoutingKey

  • Exchange将消息路由到BindingKey与消息RoutingKey一直的队列

添加配置类,将交换机和队列进行绑定:

@Configuration
@EnableRabbit
public class RabbitConfig {
​@BeanDirectExchange directExchange() {return new DirectExchange("test.direct");}
​@BeanQueue redQueue() {return new Queue("redQueue", false); // false 表示队列不是持久化的}
​@BeanBinding bindingRed(DirectExchange directExchange, Queue redQueue) {return BindingBuilder.bind(redQueue()).to(directExchange).with("red"); // 绑定键为"red"}
}

发送端:

  @Testpublic void testSendFanout(){String exchangeName = "test.direct";String msg = "hello, everyone!";rabbitTemplate.convertAndSend(exchangeName,"red",msg);}   //这个red是以及绑定了的BindingKey

消费端:

@Service
public class FanoutReceiver {
​@RabbitListener(queues = "redQueue")public void receiveMessage(String message) {System.out.println("Received from redQueue: " + message);}
}

话题交换机

TopicExchange与DirectExchange类似,区别在于routingKey可以是多个单词的列表,并且以 . 分隔。例如:

China.news (代表中国新闻这个列表)

Queue与Exchange指定BindingKey时可以使用通配符:

  • #: 代指0个或多个单词

  • *:代指一个单词

配置类代码:

@Configuration
@EnableRabbit
public class RabbitConfig {
​@BeanTopicExchange topicExchange() {return new TopicExchange("test.topic");}
​@BeanQueue chinaNewsQueue() {return new Queue("china.news.queue", false);}
​@BeanQueue chinaSportsQueue() {return new Queue("china.sports.queue", false);}
​@BeanBinding bindingChinaNews(TopicExchange topicExchange, Queue chinaNewsQueue) {return BindingBuilder.bind(chinaNewsQueue()).to(topicExchange).with("China.news"); }
​@BeanBinding bindingChinaSports(TopicExchange topicExchange, Queue chinaSportsQueue) {return BindingBuilder.bind(chinaSportsQueue()).to(topicExchange).with("China.sports.*"); // 匹配以"China.sports."开头的所有Routing Key}
}

生产者代码:

@Test
public void testSendTopic() {String exchangeName = "test.topic";String routingKey = "China.news"; // 使用符合Topic规则的Routing KeyString msg = "Breaking news from China!";rabbitTemplate.convertAndSend(exchangeName, routingKey, msg);
}

消费者:

@Service
public class TopicReceiver {
​@RabbitListener(queues = "china.news.queue")public void receiveNews(String message) {System.out.println("Received news from 'China.news': " + message);}
​@RabbitListener(queues = "china.sports.queue")public void receiveSports(String message) {System.out.println("Received sports update from 'China.sports.*': " + message);}
}

消息转换器

Spring会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。只不过,默认情况下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在下列问题:

  • 数据体积过大

  • 有安全漏洞

  • 可读性差

所以,我们就希望让消息体的体积更小、可读性更高,因此可以使用JSON方式来做序列化和反序列化。

1、引入依赖

<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId><version>2.9.10</version>
</dependency>

2、配置Bean

@Bean
public MessageConverter jsonMessageConverter(){return new Jackson2JsonMessageConverter();
}

可靠性

发送者的可靠性

生产者重连

有时候由于网络波动,出现连接MQ失败情况。

我们可以通过配置开启连接失败后的重连机制:

spring:rabbitmq:connection-timeout: 1s #设置MQ的连接超时时间template:retry:enabled: true #开启超时重试机制inital-interval: 1000ms #失败后的初始等待时间multiplier: 1 #失败后下次等待时长的倍数,下次等待时长 = initial-interval * multiplermax-attempts: 3 #最大重试次数

注意:

  • 当网络不稳定的时候,利用重试机制可以有效的提高消息发送成功率,但是SpringAMQP的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是呗阻塞的,会影响性能。

  • 如果对于业务性能有要求的时候,建议禁用重试机制。如果一定要用,请进行适当的配置。

生产者确认

生产者确认的作用:为了让生产者知道消息是否被消费成功。

RabbitMQ的Publisher Confirm 和Publisher Return两种确认机制:

  • ConfirmCallback:当RabbitMQ成功处理消息时调用

  • ReturnCallback:当消息无法路由到任何队列时调用,例如由于交换器找不到匹配的队列,此时会返回消息及原因。

开启确认机制后,以下两种情况会返回消息被接收的ACK:

  • 消息被投递到匹配的队列

  • 持久化消息写入磁盘

注意这一种情况:

当消息被投递到MQ后,但是路由失败(没有匹配规则的队列),一样会返回ACK

实现生产者确认
  1. 配置实现:

    spring:rabbitmq:publisher-confirm-type: correlatedpublisher-returns: true#publisher-confirm-type有3中模式:
    #- none:关闭confirm机制
    #- simple:同步阻塞等待MQ的回执消息
    #- correlated:MQ异步回调方式返回回执消息

  2. import com.rabbitmq.client.*;
    ​
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.TimeoutException;
    import java.util.concurrent.atomic.AtomicInteger;
    ​
    public class RabbitMQProducerConfirmAndReturn {
    ​private static final String QUEUE_NAME = "my_queue";private static final String EXCHANGE_NAME = "my_exchange";private static final String ROUTING_KEY = "my_routing_key";
    ​public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost"); // 根据实际情况设置RabbitMQ服务器地址Connection connection = factory.newConnection();Channel channel = connection.createChannel();
    ​// 开启发布确认channel.confirmSelect();
    ​AtomicInteger outstandingConfirms = new AtomicInteger(0);
    ​// 添加ConfirmCallbackchannel.addConfirmListener((deliveryTag, multiple) -> {System.out.println("Confirmed delivery for message with tag: " + deliveryTag);outstandingConfirms.decrementAndGet();}, (deliveryTag, multiple) -> {System.out.println("Negative acknowledgement received for message with tag: " + deliveryTag);// 这里可以根据需要处理未确认消息的逻辑});
    ​// 添加ReturnCallbackchannel.addReturnListener((replyCode, replyText, exchange, routingKey, properties, body) -> {System.out.println("Returned message: " + new String(body));Map<String, Object> headers = properties.getHeaders();if (headers != null && headers.containsKey("messageId")) {String messageId = (String) headers.get("messageId");System.out.println("Message with ID " + messageId + " was not routed.");// 这里可以添加消息未路由的重试逻辑}});
    ​try {String customMessageId = "msg-id-123";sendMessageWithId(channel, EXCHANGE_NAME, ROUTING_KEY, "Hello, World!".getBytes(), customMessageId);} finally {channel.close();connection.close();}}
    ​private static void sendMessageWithId(Channel channel, String exchange, String routingKey, byte[] messageBody, String messageId) throws IOException {Map<String, Object> headers = new HashMap<>();headers.put("messageId", messageId); // 自定义消息ID作为header
    ​BasicProperties props = new BasicProperties.Builder().deliveryMode(2) // 持久化消息.headers(headers).build();
    ​channel.basicPublish(exchange, routingKey, props, messageBody);outstandingConfirms.incrementAndGet(); // 记录待确认的消息数System.out.println("Published message with ID " + messageId);}
    }

MQ的可靠性

数据持久化

默认情况,Rabbit会将收到的消息存在内存中。

RabbitMQ实现数据持久化(存放在磁盘中)有3个方面:

  • 交换机持久化

    • 声明的时候将 durable属性配置为true

  • 队列持久化

    • 声明的时候将 durable属性配置为true

  • 消息持久化

    • 设置消息属性中的deliveryMode2来标记消息为持久化

注意事项:

  • 持久化消息并不保证零丢失,因为它们在内存中排队等待写入磁盘时仍有可能因系统崩溃而丢失。

  • 持久化会增加消息发布的延迟,因为消息必须等待被写入磁盘。

  • 队列和交换器的持久化并不会自动持久化其中的消息,消息的持久化需要单独设置。

  • 如果之前声明的队列或交换器是非持久化的,需要先删除原有队列或交换器,然后重新声明为持久化版本,否则会遇到错误。

  • 为了确保消息不丢失,除了持久化之外,还需要考虑消费者确认(Ack)机制,以及可能的死信队列和重试策略。

Lazy Queue

惰性队列特征:

  1. 接收到消息直接存入磁盘,非内存(内存中只保留最近的消息,默认2048条)

  2. 消费者要消费消息才会从磁盘读取消息并加载到内存中

  3. 支持数百万条的消息存储

在3.12版本后,所有的队列都是Lazy Queue模式,无法更改。

实现方式
  1. Java代码声明中实现

    @Bean
    public Queue lazyQueue() {return QueueBuilder.durable("lazy.queue")  //持久化.lazy()                 //开启lazy模式.build();
    }

  2. 消费端实现:

    @RabbitListener(queuesToDeclare = @Queue(name = "lazy.queue",arguments = @Argument(name = "x-queue-mode", value = "lazy")))//lazy开启了消息持久化
    public void listenLazyQueue(String msg){log.info("接收到lazy.queue的消息:{}",msg);
    }

消费者的确认机制

  • 为了确定消费者是否成功处理消息而提出的消费者确认机制。

  • 当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态:

    • ack:成功处理消息,RabbitMQ从队列中删除该消息

    • nack:消息处理失败,RabbitMQ需要再次投递消息

    • reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息(消费者无法处理接收到的消息时;消费者在处理消息的过程中发生了异常)

实现:

SpringAMQP已经实现了消息确认功能,运行通过配置文件选择ACK处理方式,有3种方式:

  • none:不处理。即消息投递后,直接返回ack,不安全,不建议使用

  • manual :手动模式。需要自己在业务种调用api,发送ack或者reject,存在业务入侵,但是更灵活

  • auto:自动模式。SpringAMQP利用AOP对消息处理逻辑进行环绕增强,当业务正常执行,返回ack,出现异常

    • 业务异常,nack

    • 消息处理或校验异常,reject

spring:rabbitmq:listener:simple:prefetch: 1acknowledge-mode: manual

建议一般采用manual实现。

失败重试机制

  • 问题:消费者出现异常后,消息会不断的重新入队,发送给消费者,这样无限循环,导致mq消息处理飙升。

  • 解决方案:利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列

spring:rabbitmq:listener:simple:prefetch: 1acknowledge-mode: autoretry:enable: trueinitial-interval: 1000ms #初始失败等待时长multiplier: 1max-attempts: 3 #最大重试次数statuless: true #true无状态,false有状态。如果业务中包含事务,这里改成false

如果重试次数耗尽的时候,如果消息依然失败,还有兜底的策略,可以实现MessageRecoverer接口实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息(默认这种方式)

  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队

  • RepublishMessageRecoverer:重试耗尽,将失败消息发送给指定交换机(专门用来处理失败消息的,死信交换机)

RepublishMessageRecoverer的示例:

  1. 定义接收失败消息的交换机、队列,并绑定关系。此处实现略。

  2. 定义RepublishMessageRecoverer

    @ConditionalOnProperty(prefix = "spring.rabbitmq.retry",name = "enable", havingValue = "true")
    //可以在配置类上加入这个代码,只有上述条件满足的时候,配置才实现
    ​
    ​
    //这里还有配置交换机、队列、以及进行绑定
    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate, " error.direct", "error");
    }

业务幂等性

幂等:一个数学概念。f(x) = f( f(x) )。意思就是:同一个业务,执行一次或多次,对业务的影响是一致的。(比如说删除和查询)

唯一消息id

给每一消息都设置一个唯一id,利用唯一id区分是否重复消息。

  1. 每一条消息都生成一个唯一id,与消息一起投递给消费者

  2. 消费者接收到消息后处理自己的业务,业务完成将消息id保存到数据库

  3. 如果下次收到相同消息,去数据库中判断是否存储,存在则重复消息放弃处理

@Bean
public MessageConverter messageConverter(){//1、定义消息转换器Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();//2、配置启动自动创建消息id,用于识别不同消息,也可以在业务中基于id判断是否重复消息jjmc.setCreateMessageIds(true);
}

这个消息id是保存在header中的

@RabbitListener(queues = "yourQueueName")
public void listen(Message message) {MessageProperties messageProperties = message.getMessageProperties();String messageId = messageProperties.getMessageId(); // 获取消息IDif (messageId != null) {System.out.println("Received message with ID: " + messageId);// 进行幂等性检查或其他基于ID的处理逻辑}// 其他消息处理逻辑...
}

业务判断

结合实际业务逻辑做判断。

比如说一个订单业务,我们要防止订单状态修改后不再继续被修改,就可以对订单业务进行判断:

  • 如果订单是未支付状态,才变成已支付状态

  • 如果订单是已支付状态,状态不变

延迟消息

生产者发送消息时指定一个时间,消费者不会立刻收到消息,而是在指定消息后才收到消息。

死信交换机

死信:当队列中的消息满足下列情况之一后,就成为了死信。(dead letter)

  • 消费者使用basic.reject或basic.nack声明消费失败,并且消息的requeue参数设置为false

  • 消息是一个过期消息(达到了队列或消息本身设置的过期时间),超时无人消费。

  • 要投递的队列消息堆积满了,最早的消息可能成为死信。

在队列的dead-letter-exchange属性中指定一个A交换机,则该队列中的死信会投递到A交换机中,该A交换机就是死信交换机Dead Letter Exchange,简称DLX)。

image-20240611130025919

插件实现

官方提供了一个插件实现延迟消息,网址如下:Community Plugins | RabbitMQ

下载rabbitmq_delayed_message_exchage

1、声明延迟交换机

1)方式一

 @Beanpublic DirectExchange delayExchange(){return ExchangeBuilder.directExchange("delay.direct").delayed()  //设置delay的属性为true.durable(true).build();}

2)方式二

  
  @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "delay.queue", durable = "true"),exchange = @Exchange(value = "delay.direct",delayed = "true"), //delayed = true 开启延迟交换机key = "delay"))public void listenToDelayMessage(String msg){log.info("接收到delay.queue的延迟消息:{}",msg);}

2、测试:

@Testvoid testSendDelayMessage(){rabbitTemplate.convertAndSend("delay.direct", "delay", "hello", new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {//设置消息延迟时间message.getMessageProperties().setDelay(1000);return message;}});log.info("消息发送成功!");}

消息堆积

为什么会出现消息堆积这种问题呢?

RabbitMQ 消息堆积问题通常发生在生产者发送消息的速度远大于消费者处理消息的速度时,这可能导致队列中消息的累积,直至达到存储上限,进而影响系统性能甚至导致消息丢失。

解决方案

要生成解决方案的前提,我们首先要先确定对应的消息堆积产生场景,对症下药。

  • 消费者处理消息速度太慢

    • 增加消费者数量

    • 优化消费者性能,优化代码,增加资源

    • 消息预取限制(prefetch在配置文件中的配置),以避免一次处理过多消息导致处理缓慢

  • 队列容量太小

    • 增加队列容量

  • 网络故障,导致消息可能丢失,导致消息在队列中堆积

    • 监控 + 告警,确保网络故障发送时能快速发现并解决问题

    • 持久化 + 高可用:确保消息和队列持久化以避免消息丢失,并使用镜像队列提高可用性

  • 消费者故障

    • 使用死信队列:将无法处理的添加到死信队列中,避免阻塞主队列

    • 容错机制:消费者自动重复和错误处理逻辑

  • 队列配置不当

    • 优化队列配置:检测并优化消息确认模式,队列长度限制和其他相关配置

  • 消息太大了,处理时间较长

    • 消息分片:将大型消息分割成小的消息片段,加速处理

  • 业务逻辑复杂或耗时

    • 优化业务逻辑:简化消费者中的业务逻辑,减少处理每个消息所需的时间

  • 消息产生速度快于消费者速度

    • 限流

    • 负载均衡:消费者间公平分配,避免个别消费者加载

  • 其他配置优化

    • 设置消息优先级,确保优先级高的先处理

    • 配置文件描述符的限制,内存使用限制

补充

消息队列的路由模型

消息队列的路由模型指的是消息从生产者到消费者的传递路径和方式。常见的消息队列路由模型包括:

  • 点对点模型(Point-to-Point):消息被发送到一个队列中,只有一个消费者可以接收并处理该消息。

  • 发布-订阅模型(Publish-Subscribe):消息被发送到一个主题(或交换机)中,多个消费者可以订阅该主题,并且每个消费者都可以收到消息并独立处理。

  • 路由模型(Routing):消息根据特定的路由规则被发送到不同的队列中,消费者根据队列接收并处理消息。

消息队列的应用经验(使用场景)

  • 异步通信:在系统内部或者不同系统之间进行异步通信,提高系统的响应速度和吞吐量。

  • 任务调度和削峰填谷:通过消息队列进行任务调度,将请求分散到不同的时间段或者不同的处理节点,避免系统在高峰时期负载过重。

  • 分布式事务:在分布式系统中使用消息队列进行事件的发布和订阅,保证系统的一致性和可靠性。

  • 日志收集和数据分析:通过消息队列将日志数据发送到消息队列中进行集中收集和处理,方便进行数据分析和监控。

消息消费顺序性问题

很多时候,我们的MQ使用并不需要保证顺序消费,比如订单超时等。

但有些时候,业务中可能会存在多个消息需要顺序处理的情况,比如在库存更新场景中,减少库存和增加库存的通知必须按照接收顺序处理,以防止库存数量错误或超卖现象。

那这个时候我们该怎么实现我们消息消费顺序性呢?

目前的方案是:

  • 单一消费者:一个队列绑定一个消费者,采用单活模式实现顺序消费

  • 分区策略:将不同的消息进行分区(Direct),然后不同的区绑定不同的消息队列,可以加大并发

  • 排序:顺序消息ID+手动排序

这里是一个单活模式的例子:

/*** 创建一个 单活模式的队列* @param name* @return queue*/private Queue creatQueue(String name) {HashMap<String, Object> args = new HashMap<>();// x-single-active-consumer 单活模式 队列// 表示是否最多只允许一个消费者消费,如果有多个消费者同时绑定,则只会激活第一个,// 除非第一个消费者被取消或者死亡,才会自动转到下一个消费者。args.put("x-single-active-consumer", true);return new Queue(name, true, false, false, args);}

相关文章:

  • 什么是校园抄表系统?
  • 基于SOA海鸥优化算法的三维曲面最高点搜索matlab仿真
  • ABSD方法论:一种有效的软件开发方法
  • 网络故障排除:保持网络稳定与业务连续
  • esp32s3-gc9a01-lvgl
  • 爬取京东商品图片的Python实现方法
  • 跨国大文件传输需要哪些方面?怎么实现数据快速传输?
  • 堡垒机的自动化运维,快速安全提升运维效率
  • 基于电压矢量变换的锁相环simulink建模与仿真
  • 【大数据-算法】资源调度算法:动态资源分配策略的深入探讨
  • 更适合工程师和研究僧的FPGA专项培训课程
  • 简单聊聊Vue
  • 华为鲲鹏应用开发基础:鲲鹏处理器及关键硬件特性介绍(二)
  • 操作系统真相还原--第七章中断实验BUG--找不到中断向量表
  • 如何使用Python在word文档中创建表格
  • [ JavaScript ] 数据结构与算法 —— 链表
  • 【前端学习】-粗谈选择器
  • 2019.2.20 c++ 知识梳理
  • CSS3 聊天气泡框以及 inherit、currentColor 关键字
  • dva中组件的懒加载
  • gulp 教程
  • NLPIR语义挖掘平台推动行业大数据应用服务
  • React-redux的原理以及使用
  • Redux系列x:源码分析
  • Spring Boot快速入门(一):Hello Spring Boot
  • vue+element后台管理系统,从后端获取路由表,并正常渲染
  • vue学习系列(二)vue-cli
  • 从重复到重用
  • 服务器之间,相同帐号,实现免密钥登录
  • 浅谈JavaScript的面向对象和它的封装、继承、多态
  • 如何在GitHub上创建个人博客
  • 数据可视化之 Sankey 桑基图的实现
  • 学习HTTP相关知识笔记
  • 1.Ext JS 建立web开发工程
  • Semaphore
  • 完善智慧办公建设,小熊U租获京东数千万元A+轮融资 ...
  • ​比特币大跌的 2 个原因
  • (C++)栈的链式存储结构(出栈、入栈、判空、遍历、销毁)(数据结构与算法)
  • (C语言)编写程序将一个4×4的数组进行顺时针旋转90度后输出。
  • (Redis使用系列) SpirngBoot中关于Redis的值的各种方式的存储与取出 三
  • (zhuan) 一些RL的文献(及笔记)
  • (ZT) 理解系统底层的概念是多么重要(by趋势科技邹飞)
  • (笔记)M1使用hombrew安装qemu
  • (顶刊)一个基于分类代理模型的超多目标优化算法
  • (读书笔记)Javascript高级程序设计---ECMAScript基础
  • (分享)自己整理的一些简单awk实用语句
  • (附源码)springboot炼糖厂地磅全自动控制系统 毕业设计 341357
  • (南京观海微电子)——COF介绍
  • (三)mysql_MYSQL(三)
  • (已解决)什么是vue导航守卫
  • (转载)Google Chrome调试JS
  • **Java有哪些悲观锁的实现_乐观锁、悲观锁、Redis分布式锁和Zookeeper分布式锁的实现以及流程原理...
  • ... fatal error LINK1120:1个无法解析的外部命令 的解决办法
  • .bat批处理(五):遍历指定目录下资源文件并更新
  • .NET Core 实现 Redis 批量查询指定格式的Key