当前位置: 首页 > news >正文

SpringBoot优雅的封装不同研发环境下(环境隔离)RocketMq自动ack和手动ack

1. RocketMq的maven依赖版本:

     <dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.3.0</version></dependency>

2.RocketMq的yml文件:

# 自定义属性
system:environment:# 隔离环境名称,拼接到topic后,xxx_topic_tianxin,默认空字符串name: dev# 启动隔离,会自动在topic上拼接激活的配置文件,达到自动隔离的效果# 默认为true,配置类:EnvironmentIsolationConfigisolation: true
rocketmq:# 多个NameServer,host:port;host:port,RocketMQPropertiesnameServer: 你的NameServerproducer:# 发o送同一类消息的设置为同一个grup,保证唯一group: logistics_group# 发送消息失败重试次数,默认2retryTimesWhenSendFailed: 2# 异步消息重试此处,默认2retryTimesWhenSendAsyncFailed: 2# 发送消息超时时间,默认3000sendMessageTimeout: 10000# 消息最大长度,默认1024 * 1024 * 4(默认4M)maxMessageSize: 4096# 压缩消息阈值,默认4k(1024 * 4)compressMessageBodyThreshold: 4096# 是否在内部发送失败时重试另一个broker,默认falseretryNextServer: false# access-keyaccessKey: 你的access-key# secret-keysecretKey: 你的secret-key# 是否启用消息跟踪,默认falseenableMsgTrace: false# 消息跟踪主题的名称值。如果不进行配置,可以使用默认的跟踪主题名称customizedTraceTopic: RMQ_SYS_TRACE_TOPICconsumer:# 指定消费组group: logistics_group#广播消费模式 CLUSTERING(集群消费)、BROADCASTING(广播消费)messageModel: CLUSTERING#设置消费超时时间(分钟)consumeTimeout: 1# 最大重试次数,默认16maxReconsumeTimes: 3# 其他配置参考属性类

3 IsolationConfig读取yml文件配置

package com.logistics.common.rocketMq.config;
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;/*** @author: 吴顺杰* @create: 2024-06-18 10:01* @Description:* RocketMQ多环境隔离配置* 原理:对于每个配置的Bean在实例化前,拿到Bean的监听器注解把group或者topic改掉*/
@Configuration
@Data
public class IsolationConfig {@Value("${system.environment.isolation:true}")private boolean enabledIsolation;@Value("${system.environment.name:''}")private String environmentName;@Value("${rocketmq.nameServer:''}")private String nameServer;@Value("${rocketmq.consumer.group:''}")private String group;@Value("${rocketmq.consumer.messageModel:''}")private String messageModel;@Value("${rocketmq.consumer.consumeTimeout:''}")private int consumeTimeout;@Value("${rocketmq.consumer.maxReconsumeTimes:''}")private int maxReconsumeTimes;
}

4.RocketMQ序列化器处理RocketMqConfig文件

主要是为了解决RocketMQ Jackson不支持Java时间类型配置

package com.logistics.common.rocketMq.config;import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.converter.MessageConverter;import java.util.List;/*** RocketMQ序列化器处理** @author 吴顺杰* @since 2024/8/04*/
@Configuration
public class RocketMqConfig {/*** 解决RocketMQ Jackson不支持Java时间类型配置*/@Bean@Primarypublic RocketMQMessageConverter createRocketMQMessageConverter() {RocketMQMessageConverter converter = new RocketMQMessageConverter();CompositeMessageConverter compositeMessageConverter = (CompositeMessageConverter) converter.getMessageConverter();List<MessageConverter> messageConverterList = compositeMessageConverter.getConverters();for (MessageConverter messageConverter : messageConverterList) {if (messageConverter instanceof MappingJackson2MessageConverter) {MappingJackson2MessageConverter jackson2MessageConverter = (MappingJackson2MessageConverter) messageConverter;ObjectMapper objectMapper = jackson2MessageConverter.getObjectMapper();// 增加Java8时间模块支持,实体类可以传递LocalDate/LocalDateTimeobjectMapper.registerModules(new JavaTimeModule());}}return converter;}
}

JSON工具类封装

package com.logistics.common.rocketMq.utils;import com.alibaba.fastjson.JSONObject;/*** JSON工具类* 像工具类这种,建议一定要二次封装,避免出现漏洞时可以快速替换** @author 吴顺杰* @since 2024/6/16*/
public class JsonUtil {private JsonUtil() {}public static String toJson(Object value) {return JSONObject.toJSONString(value);}public static <T> T toObject(String jsonStr, Class<T> clazz) {return JSONObject.parseObject(jsonStr, clazz);}
}

5.rocketMq生产者封装

调度任务生产者:LogisticsAddDispatchMqProducer

package com.logistics.business.rocketMq.producer;import com.logistics.common.exception.base.BaseException;
import com.logistics.common.rocketMq.constant.AddDispatchMqContant;
import com.logistics.common.rocketMq.domain.AddDispatchMqMessage;
import com.logistics.common.rocketMq.template.RocketMqTemplate;
import com.logistics.common.utils.spring.SpringUtils;
import lombok.extern.slf4j.Slf4j;/*** 新增调度任务生产者MQ队列*/
@Slf4j
public class LogisticsAddDispatchMqProducer {private static RocketMqTemplate rocketMqTemplate = SpringUtils.getBean(RocketMqTemplate.class);public static void sendAddDispatchMqMessage(AddDispatchMqMessage message) {log.info("新增调度任务成功发送新增调度消息MQ,内容: {}", message);try {rocketMqTemplate.asyncSend(AddDispatchMqContant.ADD_DISPATCH_TOPIC, AddDispatchMqContant.ADD_DISPATCH_TAG, message);} catch (Exception e) {log.error("新增调度任务成功发送新增调度消息MQ失败,内容: {}", message, e);throw new BaseException("新增调度任务成功发送新增调度消息MQ失败,请联系管理员");}}
}

RocketMQ模板类

package com.logistics.common.rocketMq.template;import com.alibaba.fastjson.JSONObject;
import com.logistics.common.rocketMq.config.IsolationConfig;
import com.logistics.common.rocketMq.constant.RocketMqSysConstant;
import com.logistics.common.rocketMq.domain.BaseMqMessage;
import com.logistics.common.rocketMq.utils.JsonUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;import javax.annotation.Resource;/*** RocketMQ模板类**/
@Component
@Slf4j
public class RocketMqTemplate {private static final Logger LOGGER = LoggerFactory.getLogger(RocketMqTemplate.class);@Resource(name = "rocketMQTemplate")private RocketMQTemplate template;@Resourceprivate IsolationConfig isolationConfig;/*** 获取模板,如果封装的方法不够提供原生的使用方式*/public RocketMQTemplate getTemplate() {return template;}/*** 构建目的地*/public String buildDestination(String topic, String tag) {return topic + RocketMqSysConstant.DELIMITER + tag;}/*** 发送同步消息*/public <T extends BaseMqMessage> SendResult syncSend(String topic, String tag, T message) {// 开启消息隔离情况下获取隔离配置,隔离topic,根据自己的需求隔离group或者tagif (isolationConfig.isEnabledIsolation() && StringUtils.hasText(isolationConfig.getEnvironmentName())) {topic = topic + "_" + isolationConfig.getEnvironmentName();}// 设置业务键,此处根据公共的参数进行处理Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build();String buildDestination = buildDestination(topic, tag);SendResult sendResult = template.syncSend(buildDestination, sendMessage);// 此处为了方便查看给日志转了json,根据选择选择日志记录方式,例如ELK采集LOGGER.info("[{}]同步消息[{}]发送结果[{}]", buildDestination, JsonUtil.toJson(message), JSONObject.toJSON(sendResult));return sendResult;}/*** 发送异步消息** @param topic* @param tag* @param message* @param <T>*/public <T extends BaseMqMessage> void asyncSend(String topic, String tag, T message) {// 开启消息隔离情况下获取隔离配置,隔离topic,根据自己的需求隔离group或者tagif (isolationConfig.isEnabledIsolation() && StringUtils.hasText(isolationConfig.getEnvironmentName())) {topic = topic + "_" + isolationConfig.getEnvironmentName();}// 设置业务键,此处根据公共的参数进行处理Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build();String buildDestination = buildDestination(topic, tag);template.asyncSend(buildDestination, sendMessage, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {LOGGER.info("[{}]MQ异步消息[{}]发送成功结果[{}]", buildDestination, JsonUtil.toJson(message), JSONObject.toJSON(sendResult));if (sendResult.getSendStatus() != SendStatus.SEND_OK) {//可以存入数据库做处理log.error("Consumer replies message failed. SendStatus: {}", sendResult.getSendStatus());}}@Overridepublic void onException(Throwable throwable) {LOGGER.info("[{}]MQ异步消息[{}]发送失败结果[{}]", buildDestination, JsonUtil.toJson(message), JSONObject.toJSON(throwable.getMessage()));//可以存入数据库做处理}});}/*** 发送延迟消息** @param message* @param delayLevel* @param <T>* @return*/public <T extends BaseMqMessage> SendResult syncDelaySend(String topic, String tag, T message, int delayLevel) {// 开启消息隔离情况下获取隔离配置,隔离topic,根据自己的需求隔离group或者tagif (isolationConfig.isEnabledIsolation() && StringUtils.hasText(isolationConfig.getEnvironmentName())) {topic = topic + "_" + isolationConfig.getEnvironmentName();}Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build();String destination = buildDestination(topic, tag);SendResult sendResult = template.syncSend(destination, sendMessage, 3000, delayLevel);LOGGER.info("[{}]延迟等级[{}]消息[{}]发送结果[{}]", destination, delayLevel, JsonUtil.toJson(message), JsonUtil.toJson(sendResult));return sendResult;}}

AddDispatchMqMessage消息实体

package com.logistics.common.rocketMq.domain;import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;/*** 新增调度任务发送队列消息体*/
@Data
@Builder
@NoArgsConstructor
public class AddDispatchMqMessage extends BaseMqMessage {/*** 调度id*/private Long dispatchId;public AddDispatchMqMessage(Long dispatchId) {this.dispatchId = dispatchId;}
}

AddDispatchMqContant类

package com.logistics.common.rocketMq.constant;/*** 新增调度任务MQ队列*/
public class AddDispatchMqContant {/*** 消费主题*/public static final String ADD_DISPATCH_TOPIC = "add_dispatch_topic";/*** 消费标签*/public static final String ADD_DISPATCH_TAG = "add_dispatch_tag";/*** 消费组*/public static final String ADD_DISPATCH_GROUP = "add_dispatch_group";
}

6.rocketMq消费者封装

ACK简介
在实际使用RocketMQ的时候我们并不能保证每次发送的消息都刚好能被消费者一次性正常消费成功,可能会存在需要多次消费才能成功或者一直消费失败的情况,那作为发送者该做如何处理呢?

RocketMQ提供了ack机制,以保证消息能够被正常消费。发送者为了保证消息肯定消费成功,只有使用方明确表示消费成功,RocketMQ才会认为消息消费成功。中途断电,抛出异常等都不会认为成功——即都会重新投递。

1.手动ack封装

新增增调度任务消费者启动监听类

package com.logistics.business.rocketMq.listener;import com.logistics.business.rocketMq.comsumer.LogisticsAddDispatchMqComsumer;
import com.logistics.common.rocketMq.config.IsolationConfig;
import com.logistics.common.rocketMq.constant.AddDispatchMqContant;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;import javax.annotation.PostConstruct;
import javax.annotation.Resource;/*** 新增调度任务消费者启动监听类*/
@Component
@Slf4j
public class RetryLogisticsAddDispatchListener {@Resourceprivate IsolationConfig isolationConfig;@Resourceprivate LogisticsAddDispatchMqComsumer logisticsAddDispatchMqComsumer;private final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();@PostConstructpublic void start() {try {//启动环境隔离String topic = AddDispatchMqContant.ADD_DISPATCH_TOPIC;if (isolationConfig.isEnabledIsolation() && StringUtils.hasText(isolationConfig.getEnvironmentName())) {consumer.setConsumerGroup(AddDispatchMqContant.ADD_DISPATCH_GROUP + "_" + isolationConfig.getEnvironmentName());topic = topic + "_" + isolationConfig.getEnvironmentName();} else {consumer.setConsumerGroup(AddDispatchMqContant.ADD_DISPATCH_GROUP);}consumer.setNamesrvAddr(isolationConfig.getNameServer());//设置集群消费模式consumer.setMessageModel(MessageModel.valueOf(isolationConfig.getMessageModel()));//设置消费超时时间(分钟)consumer.setConsumeTimeout(isolationConfig.getConsumeTimeout());//最大重试次数consumer.setMaxReconsumeTimes(isolationConfig.getMaxReconsumeTimes());//订阅主题consumer.subscribe(topic, AddDispatchMqContant.ADD_DISPATCH_TAG);//注册消息监听器consumer.registerMessageListener(logisticsAddDispatchMqComsumer);//启动消费端consumer.start();log.info("新增调度任务消费者MQ监听队列启动成功");} catch (MQClientException e) {e.printStackTrace();}}
}

LogisticsAddDispatchMqComsumer注册消息监听器

package com.logistics.business.rocketMq.comsumer;import com.logistics.common.rocketMq.domain.AddDispatchMqMessage;
import com.logistics.common.rocketMq.utils.JsonUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;import java.nio.charset.StandardCharsets;
import java.util.List;/*** 新增调度任务消费者MQ队列*/
@Slf4j
@Component
public class LogisticsAddDispatchMqComsumer implements MessageListenerConcurrently {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {if (CollectionUtils.isEmpty(msgs)) {return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}MessageExt message = msgs.get(0);try {String messageBody = new String(message.getBody(), StandardCharsets.UTF_8);AddDispatchMqMessage addDispatchMqMessage = JsonUtil.toObject(messageBody, AddDispatchMqMessage.class);System.out.println("messageId: " + message.getMsgId() + ",topic: " +message.getTopic() + ",addDispatchMqMessage: " + addDispatchMqMessage);System.out.println(1 / 0);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;} catch (Exception e) {return ConsumeConcurrentlyStatus.RECONSUME_LATER;}}
}

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS,mq的偏移量才会下移。也就是手动ack,也只有手动返回CONSUME_SUCCESS,消息体才会偏移。

return ConsumeConcurrentlyStatus.RECONSUME_LATER;

返回 ConsumeConcurrentlyStatus.RECONSUME_LATER; mq会默认重试16次,每次执行间隔

不等。最长好像是2个多小时,具体多少自己看官方文档,一般线上环境设置重试五次失败就进入死信队列了,我这里设置的是重试三次

onsumer Started.
date=Fri Aug 05 14:08:52 CST 2022 *******
msg=0A28A4923EC018B4AAC217A272330000
date=Fri Aug 05 14:08:52 CST 2022
ReconsumeTimes=0                        '第一次处理'date=Fri Aug 05 14:09:02 CST 2022 *******
msg=0A28A4923EC018B4AAC217A272330000
date=Fri Aug 05 14:09:02 CST 2022
ReconsumeTimes=1                       '第2次处理 与第一次间隔10s'date=Fri Aug 05 14:09:33 CST 2022 *******
msg=0A28A4923EC018B4AAC217A272330000
date=Fri Aug 05 14:09:33 CST 2022
ReconsumeTimes=2						'第3次处理 与第2次间隔20s'date=Fri Aug 05 14:10:33 CST 2022 *******
msg=0A28A4923EC018B4AAC217A272330000
date=Fri Aug 05 14:10:33 CST 2022
ReconsumeTimes=3                       '第4次处理 与第3次间隔1m'

2.自动ack封装

BaseMqMessageListener封装

package com.zhjt.rocketmq.listener;import com.zhjt.rocketmq.constant.RocketMqSysConstant;
import com.zhjt.rocketmq.domain.BaseMqMessage;
import com.zhjt.rocketmq.template.RocketMqTemplate;
import com.zhjt.rocketmq.utils.JsonUtil;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;import javax.annotation.Resource;
import java.time.Instant;
import java.util.Objects;/*** 抽象消息监听器,封装了所有公共处理业务,如* 1、基础日志记录* 2、异常处理* 3、消息重试* 4、警告通知* 5、....** @author 吴顺杰* @since 2024/6/17*/
public abstract class BaseMqMessageListener<T extends BaseMqMessage> {/*** 这里的日志记录器是哪个子类的就会被哪个子类的类进行初始化*/protected final Logger logger = LoggerFactory.getLogger(this.getClass());@Resourceprivate RocketMqTemplate rocketMqTemplate;/*** 消息者名称** @return 消费者名称*/protected abstract String consumerName();/*** 消息处理** @param message 待处理消息* @throws Exception 消费异常*/protected abstract void handleMessage(T message) throws Exception;/*** 超过重试次数消息,需要启用isRetry** @param message 待处理消息*/protected abstract void overMaxRetryTimesMessage(T message);/*** 是否过滤消息,例如某些** @param message 待处理消息* @return true: 本次消息被过滤,false:不过滤*/protected boolean isFilter(T message) {return false;}/*** 是否异常时重复发送** @return true: 消息重试,false:不重试*/protected abstract boolean isRetry();/*** 消费异常时是否抛出异常** @return true: 抛出异常,false:消费异常(如果没有开启重试则消息会被自动ack)*/protected abstract boolean isThrowException();/*** 最大重试此处** @return 最大重试次数,默认10次*/protected int maxRetryTimes() {return 10;}/*** isRetry开启时,重新入队延迟时间** @return -1:立即入队重试*/protected int retryDelayLevel() {return -1;}/*** 由父类来完成基础的日志和调配,下面的只是提供一个思路*/public void dispatchMessage(T message) {MDC.put(RocketMqSysConstant.TRACE_ID, message.getTraceId());// 基础日志记录被父类处理了logger.info("[{}]消费者收到消息[{}]", consumerName(), JsonUtil.toJson(message));if (isFilter(message)) {logger.info("消息不满足消费条件,已过滤");return;}// 超过最大重试次数时调用子类方法处理if (message.getRetryTimes() > maxRetryTimes()) {overMaxRetryTimesMessage(message);return;}try {long start = Instant.now().toEpochMilli();handleMessage(message);long end = Instant.now().toEpochMilli();logger.info("消息消费成功,耗时[{}ms]", (end - start));} catch (Exception e) {logger.error("消息消费异常", e);// 是捕获异常还是抛出,由子类决定if (isThrowException()) {throw new RuntimeException(e);}if (isRetry()) {// 获取子类RocketMQMessageListener注解拿到topic和tagRocketMQMessageListener annotation = this.getClass().getAnnotation(RocketMQMessageListener.class);if (Objects.nonNull(annotation)) {message.setSource(message.getSource() + "消息重试");message.setRetryTimes(message.getRetryTimes() + 1);SendResult sendResult;try {// 如果消息发送不成功,则再次重新发送,如果发送异常则抛出由MQ再次处理(异常时不走延迟消息)// 此处捕获之后,相当于此条消息被消息完成然后重新发送新的消息sendResult = rocketMqTemplate.send(annotation.topic(), annotation.selectorExpression(), message, retryDelayLevel());} catch (Exception ex) {throw new RuntimeException(ex);}// 发送失败的处理就是不进行ACK,由RocketMQ重试if (sendResult.getSendStatus() != SendStatus.SEND_OK) {throw new RuntimeException("重试消息发送失败");}}}}}
}

IsolationConfigNew文件里面的方法:

package com.zhjt.rocketmq.config;/*** @author: 吴顺杰* @create: 2024-06-18 10:01* @Description:*/import org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.context.annotation.Configuration;
import org.springframework.lang.NonNull;
import org.springframework.util.StringUtils;/*** RocketMQ多环境隔离配置* 原理:对于每个配置的Bean在实例化前,拿到Bean的监听器注解把group或者topic改掉** @author tianxincoord@163.com* @since 2022/5/18*/
@Configuration
public class IsolationConfigNew implements BeanPostProcessor {@Value("${system.environment.isolation:true}")private boolean enabledIsolation;@Value("${system.environment.name:''}")private String environmentName;@Overridepublic Object postProcessBeforeInitialization(@NonNull Object bean,@NonNull String beanName) throws BeansException {// DefaultRocketMQListenerContainer是监听器实现类if (bean instanceof DefaultRocketMQListenerContainer) {DefaultRocketMQListenerContainer container = (DefaultRocketMQListenerContainer) bean;// 开启消息隔离情况下获取隔离配置,隔离topic,根据自己的需求隔离group或者tagif (enabledIsolation && StringUtils.hasText(environmentName)) {container.setTopic(String.join("_", container.getTopic(), environmentName));container.setConsumerGroup(String.join("_", container.getConsumerGroup(), environmentName));}return container;}return bean;}
}

消费者监听实现LogisticsAddDispatchMqComsumer2

package com.logistics.business.rocketMq.comsumer;import com.logistics.common.rocketMq.constant.AddDispatchMqContant;
import com.logistics.common.rocketMq.domain.AddDispatchMqMessage;
import com.logistics.common.rocketMq.listener.BaseMqMessageListener;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;/*** 新增调度任务消费者MQ队列*/
@Slf4j
@Component
@RocketMQMessageListener(topic = AddDispatchMqContant.ADD_DISPATCH_TOPIC,consumerGroup = AddDispatchMqContant.ADD_DISPATCH_GROUP,selectorExpression = AddDispatchMqContant.ADD_DISPATCH_GROUP,// 指定消费者线程数,默认64,生产中请注意配置,避免过大或者过小consumeThreadMax = 21
)
public class LogisticsAddDispatchMqComsumer2 extends BaseMqMessageListener<AddDispatchMqMessage>implements RocketMQListener<AddDispatchMqMessage> {/*** 此处只是说明封装的思想,更多还是要根据业务操作决定* 内功心法有了,无论什么招式都可以发挥最大威力*/@Overrideprotected String consumerName() {return "RocketMq监听消息";}@Overridepublic void onMessage(AddDispatchMqMessage message) {// 注意,此时这里没有直接处理业务,而是先委派给父类做基础操作,然后父类做完基础操作后会调用子类的实际处理类型super.dispatchMessage(message);}@Overrideprotected void handleMessage(AddDispatchMqMessage message) throws Exception {// 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试// 业务异常直接抛出异常即可 否则捕获异常没有抛出 无法进行重试log.info("RocketMq监消息消费数据:{}", message);}@Overrideprotected void overMaxRetryTimesMessage(AddDispatchMqMessage message) {// 当超过指定重试次数消息时此处方法会被调用// 生产中可以进行回退或其他业务操作}@Overrideprotected boolean isRetry() {return true;}@Overrideprotected int maxRetryTimes() {// 指定需要的重试次数,超过重试次数overMaxRetryTimesMessage会被调用return 5;}@Overrideprotected boolean isThrowException() {// 是否抛出异常,到消费异常时是被父类拦截处理还是直接抛出异常return false;}
}

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • Python爬虫—常用的网络爬虫工具推荐
  • 3.服务注册_服务发现
  • 口语笔记——非谓语动词
  • 【代码随想录训练营第42期 Day38打卡 - 动态规划Part6 - LeetCode 322. 零钱兑换 279.完全平方数 139.单词拆分
  • MyBatis-Plus联表查询(mybatis-plus-join)
  • 1.ftp简介
  • C++程序调用SetWindowsHookEx全局拦截键盘按键消息和窗口消息的Hook实例分享
  • 相机掉帧采集速度慢怎么办巨型帧9014
  • [随便学学]在doker服务器中实现ssh免密登陆
  • 力扣经典题目之->相同的树(递归判断两颗二叉树是否相同)
  • SpringBoot 一文复习知识点概览
  • 一个干净的python项目(没连数据库啥的)
  • 华为 2024 届校园招聘-硬件通⽤/单板开发——第一套(部分题目分享,完整版带答案,共十套)
  • 游泳耳机哪个牌子好?四大爆款游泳耳机实测,优缺点秒懂!
  • flutter之image_picker上传图片
  • JS 中的深拷贝与浅拷贝
  • CSS居中完全指南——构建CSS居中决策树
  • golang中接口赋值与方法集
  • HTTP传输编码增加了传输量,只为解决这一个问题 | 实用 HTTP
  • javascript数组去重/查找/插入/删除
  • java中的hashCode
  • JS正则表达式精简教程(JavaScript RegExp 对象)
  • PHP CLI应用的调试原理
  • React的组件模式
  • React中的“虫洞”——Context
  • vue-router 实现分析
  • 电商搜索引擎的架构设计和性能优化
  • 聊聊flink的TableFactory
  • 你真的知道 == 和 equals 的区别吗?
  • 如何优雅地使用 Sublime Text
  • 适配iPhoneX、iPhoneXs、iPhoneXs Max、iPhoneXr 屏幕尺寸及安全区域
  • 用jQuery怎么做到前后端分离
  • 掌握面试——弹出框的实现(一道题中包含布局/js设计模式)
  • Spring Batch JSON 支持
  • 格斗健身潮牌24KiCK获近千万Pre-A轮融资,用户留存高达9个月 ...
  • 如何在招聘中考核.NET架构师
  • 直播平台建设千万不要忘记流媒体服务器的存在 ...
  • ​一帧图像的Android之旅 :应用的首个绘制请求
  • ​用户画像从0到100的构建思路
  • #define与typedef区别
  • #pragma once与条件编译
  • $ is not function   和JQUERY 命名 冲突的解说 Jquer问题 (
  • ()、[]、{}、(())、[[]]命令替换
  • (1)(1.13) SiK无线电高级配置(六)
  • (2/2) 为了理解 UWP 的启动流程,我从零开始创建了一个 UWP 程序
  • (floyd+补集) poj 3275
  • (leetcode学习)236. 二叉树的最近公共祖先
  • (Matlab)使用竞争神经网络实现数据聚类
  • (PySpark)RDD实验实战——取最大数出现的次数
  • (附源码)springboot宠物管理系统 毕业设计 121654
  • (三)终结任务
  • (淘宝无限适配)手机端rem布局详解(转载非原创)
  • (学习日记)2024.01.19
  • (一)SvelteKit教程:hello world
  • *p=a是把a的值赋给p,p=a是把a的地址赋给p。