当前位置: 首页 > news >正文

制作一个rabbitmq-sdk以及rabbitmq消费者实现定时上下线功能

目录结构

在这里插入图片描述

pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>com.aasee</groupId><artifactId>sms-modules</artifactId><version>3.6.3</version></parent><artifactId>sms-rabbitmq-starter</artifactId><name>sms-rabbitmq-starter</name><description>rabbitmq-sdk</description><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><!-- SpringBoot Boot rabbitmq --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>com.alibaba.fastjson2</groupId><artifactId>fastjson2</artifactId></dependency><!--lombok--><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><!-- skywalking --><dependency><groupId>org.apache.skywalking</groupId><artifactId>apm-toolkit-logback-1.x</artifactId><version>9.3.0</version></dependency><dependency><groupId>org.apache.skywalking</groupId><artifactId>apm-toolkit-trace</artifactId><version>9.3.0</version></dependency></dependencies>
</project>
com.aasee.rabbitmq.configure.Callback
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
@Slf4j
public class Callback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {/**** @param correlationData correlation data for the callback. 相关配置信息* @param ack true for ack, false for nack exchange交换机 是否成功收到了消息。true代表成功,false代表失败* @param cause An optional cause, for nack, when available, otherwise null.  失败原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {log.info("confirm方法被执行了!!!!! correlationData:{}",correlationData);if (ack){// 接受成功log.info("接受消息成功! correlationDataId: {} ,cause: {} " ,correlationData.getId(), cause);}else {// 接受失败log.error("接受消息失败! correlationDataId: {} ,cause: {} " ,correlationData.getId(), cause);}}/*** 回退模式:当消息发送给Exchange后, Exchange路由到Queue失败时 才会执行 ReturnCallBack* 步骤:*  1. 开启回退模式: publisher-returns: true #是否开启发送端消息抵达队列的确认*  2. 设置ReturnCallBack*  3. 设置Exchange处理消息的模式:*      1. 如果消息没有路由到Queue,则丢弃信息(默认)*      2. 如果消息没有路由到Queue,返回给消息发送方ReturnCallBack  设置mandatory为true**//**** @param returned the returned message and metadata.*/@Overridepublic void returnedMessage(ReturnedMessage returned) {log.info("return机制方法被执行了。。。。。");log.info("消息(message):" + returned.getMessage());log.info("退回原因代码(replyCode):" + returned.getReplyCode());log.info("退回原因(replyText):" + returned.getReplyText());log.info("交换机(exchange):" + returned.getExchange());log.info("路由Key(routingKey):" + returned.getRoutingKey());// TODO 处理未到Queue的数据(或者使用备份交换机)}
}
com.aasee.rabbitmq.configure.CustomMessageInterceptor
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.stereotype.Component;/*** 自定义消息拦截器** @author Aasee* @date 2024-03-11*/
@Component
public class CustomMessageInterceptor implements MessagePostProcessor {@Overridepublic Message postProcessMessage(Message message) {// 获取原始消息属性MessageProperties properties = message.getMessageProperties();// 设置新的消息头部信息(如果有需求)
//        properties.setHeader("loginUser", SecurityUtils.getLoginUser());return new Message(message.getBody(), properties);}
}
com.aasee.rabbitmq.configure.RabbitMQConfig
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** RabbitMQ配置** @author Aasee* @date 2024-02-24*/
@Configuration
public class RabbitMQConfig {/*** rabbitmq Redis 防止重复消费键*/public final static String RABBITMQ_REDIS_KEY="RabbitMq_Send_Sms:";public final static String RABBITMQ_REISSUE_KEY="RabbitMq_Reissue_Sms:";public final static String RABBITMQ_CALLBACK_KEY="RabbitMq_Callback_Sms:";//--------------交换机名称-----------------------------------public static final String EXCHANGE_NAME = "aasee_topic_exchange";//---------------队列名称------------------------------------public static final String QUEUE_NAME = "aasee_queue";public static final String SMS_QUEUE_NAME = "cloud_sms_queue";public static final String REISSUE_QUEUE_NAME = "sms_reissue_queue";public static final String CALLBACK_QUEUE_NAME = "callback_sms_queue";//--------------路由键名称-----------------------------------public static final String ROUTING_KEY = "aasee.#";public static final String SMS_ROUTING_KEY = "sms.#";public static final String REISSUE_ROUTING_KEY = "reissue.#";public static final String CALLBACK_ROUTING_KEY = "callback.#";//------------------------交换机--------------------------// 交换机@Bean(value = "aaseeExchange")public Exchange bootExchange(){return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();}//------------------------队列--------------------------// 队列@Bean("aaseeQueue")public Queue bootQueue(){return QueueBuilder.durable(QUEUE_NAME).build();}// SMS 队列@Bean("cloudSmsQueue")public Queue smsQueue(){return QueueBuilder.durable(SMS_QUEUE_NAME).build();}// SMS 补发 队列@Bean("smsReissueQueue")public Queue smsReissueQueue(){return QueueBuilder.durable(REISSUE_QUEUE_NAME).build();}// 短信回调 队列@Bean("callbackSmsQueue")public Queue callbackSmsQueue(){return QueueBuilder.durable(CALLBACK_QUEUE_NAME).build();}//-------------------路由绑定-------------------------// 队列和交换机绑定关系 Binding/*1. 知道哪个队列2. 知道哪个交换机3. routing key*/// 路由绑定@Beanpublic Binding bingQueueExchange(@Qualifier("aaseeQueue") Queue queue, @Qualifier("aaseeExchange") Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY).noargs();}// SMS 路由绑定@Beanpublic Binding bingSmsQueueExchange(@Qualifier("cloudSmsQueue") Queue queue, @Qualifier("aaseeExchange") Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with(SMS_ROUTING_KEY).noargs();}// 补发 路由绑定@Beanpublic Binding bingSmsReissueQueueExchange(@Qualifier("smsReissueQueue") Queue queue, @Qualifier("aaseeExchange") Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with(REISSUE_ROUTING_KEY).noargs();}// 短信回调 路由绑定@Beanpublic Binding bingCallbackSmsQueueExchange(@Qualifier("callbackSmsQueue") Queue queue, @Qualifier("aaseeExchange") Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with(CALLBACK_ROUTING_KEY).noargs();}// Mq模板类@Bean//设置rabbitTemplate的scope为:prototype
//    @Scope("prototype")public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate template = new RabbitTemplate(connectionFactory);template.setBeforePublishPostProcessors(new CustomMessageInterceptor());//成功回调template.setConfirmCallback(new Callback());// 开启mandatory模式(开启失败回调)template.setMandatory(true);//失败回调template.setReturnsCallback(new Callback());return template;}}
com.aasee.rabbitmq.service.RabbitMqService
import com.alibaba.fastjson2.JSON;
import com.aasee.rabbitmq.configure.RabbitMQConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.util.UUID;/*** Rabbit MQ 工具类** @author Aasee* @date 2024-03-11*/
//@SuppressWarnings(value = {"unchecked", "rawtypes"})
@Component
@Slf4j
public class RabbitMqService {@Autowiredpublic RabbitTemplate rabbitTemplate;/*** 发送 MQ 消息** @param exchange             交换机* @param routingKey           路由键* @param value                放入mq中的消息体(需要对象)* @param messagePostProcessor 消息后处理器(自定义处理)* @param correlationData      相关数据(用于传递唯一标识,跟踪绑定数据信息)*/public <T> void sendMqMessage(String exchange, String routingKey,T value, MessagePostProcessor messagePostProcessor,CorrelationData correlationData) {// 推送到Mq中rabbitTemplate.convertAndSend(exchange, routingKey, JSON.toJSONString(value), messagePostProcessor,correlationData);}/*** 发送 MQ 消息** @param exchange             交换机* @param routingKey           路由键* @param value                放入mq中的消息体(需要对象)* @param messagePostProcessor 消息后处理器(自定义处理)*/public <T> void sendMqMessage(String exchange, String routingKey,T value, MessagePostProcessor messagePostProcessor) {// 唯一标识用于判断消息身份和内容String messageId = UUID.randomUUID().toString();CorrelationData correlationData = new CorrelationData(messageId);// 推送到Mq中String jsonString = JSON.toJSONString(value);log.info("发送 MQ 消息! exchange: {} , routingKey: {} , messageId: {} , jsonString: {}",exchange,routingKey,messageId,jsonString);rabbitTemplate.convertAndSend(exchange, routingKey, jsonString, messagePostProcessor,correlationData);}/*** 发送短信信息** @param value                短信信息* @param messagePostProcessor 消息后处理器(自定义处理)*/public <T> void sendSmsMessage(T value, MessagePostProcessor messagePostProcessor) {// 推送到Mq中rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, RabbitMQConfig.SMS_ROUTING_KEY, JSON.toJSONString(value), messagePostProcessor);}
}
spring.factories
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\com.aasee.rabbitmq.configure.RabbitMQConfig, \com.aasee.rabbitmq.service.RabbitMqService, \com.aasee.rabbitmq.configure.CustomMessageInterceptor

以上就是sdk的所有配置内容,使用bean的自动装配原理让这个sdk被引入时可以自动被spring托管配置。

接下来就是展示如何使用sdk的,同时展示nacos配置或者说application.yml配置

引入SDK
<dependency><groupId>com.aasee</groupId><artifactId>sms-rabbitmq-starter</artifactId><version>3.6.3</version>
</dependency>

可以把这个sdk放到阿里云制品仓库,或者自建私服,又或是直接托管到maven中央仓库,这样你的小伙伴们就能直接引入你的sdk

application.yml
# rabbitmq 配置rabbitmq:host: xx.xx.xxx.xxxvirtual-host: /cloudsmsTestusername: rootpassword: xxxxxxport: 5672publisher-confirm-type: correlatedpublisher-returns: true #是否开启发送端消息抵达队列的确认template:mandatory: true # 只要消息没有正确抵达队列,以异步方式优先执行我们自己设置的回调,设置交换机处理失败消息的模式listener:simple:acknowledge-mode: manualprefetch: 1 #更改为每次读取1条消息,在消费者未回执确认之前,不在进行下一条消息的投送
定时上下线

在开发中我发现了一个有趣的需求,定时上线消费,定时下线停止消费,生产者可以持续往队列里发送消息,但是消费者则可以在指定时间,或者通过手动的方式上下线,以下是具体实现方法

com.aasee.smsconsumer.scheduler.ConsumerScheduler
import com.aasee.common.core.constant.ChannelConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;@Component
@Slf4j
public class ConsumerScheduler implements ApplicationListener<ContextRefreshedEvent> {@Autowiredprivate ApplicationContext applicationContext;@Autowiredprivate RabbitListenerEndpointRegistry registry;// 每天早上8点启动消费者// todo 创建接口用于手动启动@Scheduled(cron = "0 0 8 * * ?")public void startListening() {
//        RabbitListenerEndpointRegistry registry = applicationContext.getBean(RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME, RabbitListenerEndpointRegistry.class);startConsumer();}public void startConsumer() {if (!registry.getListenerContainer(ChannelConstant.SMSCONSUMER).isRunning()) {registry.getListenerContainer(ChannelConstant.SMSCONSUMER).start();log.info("smsConsumer 开启监听");}if (!registry.getListenerContainer(ChannelConstant.REISSUECONSUMER).isRunning()) {registry.getListenerContainer(ChannelConstant.REISSUECONSUMER).start();log.info("reissueConsumer 开启监听");}}// 每天晚上7点停止消费者@Scheduled(cron = "0 0 19 * * ?")public void stopListening() {
//        RabbitListenerEndpointRegistry registry = applicationContext.getBean(RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME, RabbitListenerEndpointRegistry.class);if (registry.getListenerContainer(ChannelConstant.SMSCONSUMER).isRunning()) {registry.getListenerContainer(ChannelConstant.SMSCONSUMER).stop();log.info("smsConsumer 停止监听");}if (registry.getListenerContainer(ChannelConstant.REISSUECONSUMER).isRunning()) {registry.getListenerContainer(ChannelConstant.REISSUECONSUMER).stop();log.info("reissueConsumer 停止监听");}}// 这个是为了每次启动或者重启服务后马上开始消费,可以取消@Overridepublic void onApplicationEvent(ContextRefreshedEvent event) {log.info("服务启动开启消费!");startConsumer();}
}
注意

如何你们存在这种定时上下线的需求,需要在**@RabbitListener**注解上加多一个参数 autoStartup = “false” ,这样消费者就不会自动消费消息了

@RabbitListener(id = ChannelConstant.REISSUECONSUMER,queues = RabbitMQConfig.REISSUE_QUEUE_NAME,autoStartup = "false")

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 怎么让Nginx可以访问某一IP的每个后台controller接口
  • Html css样式总结
  • go语言 结构体
  • 低代码平台:数据筛选功能的全新变革
  • [Redis] 渐进式遍历+使用jedis操作Redis+使用Spring操作Redis
  • 7款国内AI搜索引擎大全网站
  • Project Online 专业版部署方案
  • 周末总结(2024/09/22)
  • 使用 Fairseq 进行音频预训练:Train a wav2vec 2.0 base model配置与实现
  • 【C语言零基础入门篇 - 16】:栈和队列
  • IT行业中的工作生活平衡探讨
  • LeetcodeLCR 116. 省份数量
  • java项目之常规应急物资管理系统(源码+文档)
  • Study Plan For Algorithms - Part36
  • 如何在Chrome最新浏览器中调用ActiveX控件?
  • 9月CHINA-PUB-OPENDAY技术沙龙——IPHONE
  • @jsonView过滤属性
  • Android系统模拟器绘制实现概述
  • express.js的介绍及使用
  • github指令
  • PV统计优化设计
  • select2 取值 遍历 设置默认值
  • 看完九篇字体系列的文章,你还觉得我是在说字体?
  • 如何解决微信端直接跳WAP端
  • 通过获取异步加载JS文件进度实现一个canvas环形loading图
  • 为视图添加丝滑的水波纹
  • 问:在指定的JSON数据中(最外层是数组)根据指定条件拿到匹配到的结果
  • 一天一个设计模式之JS实现——适配器模式
  • 移动端 h5开发相关内容总结(三)
  • 译自由幺半群
  • 关于Android全面屏虚拟导航栏的适配总结
  • 正则表达式-基础知识Review
  • #DBA杂记1
  • #我与Java虚拟机的故事#连载12:一本书带我深入Java领域
  • $.ajax()
  • $emit传递多个参数_PPC和MIPS指令集下二进制代码中函数参数个数的识别方法
  • (Redis使用系列) SpringBoot中Redis的RedisConfig 二
  • (附源码)ssm基于web技术的医务志愿者管理系统 毕业设计 100910
  • (十六)串口UART
  • (一)模式识别——基于SVM的道路分割实验(附资源)
  • (转)大道至简,职场上做人做事做管理
  • (转)淘淘商城系列——使用Spring来管理Redis单机版和集群版
  • (转载)OpenStack Hacker养成指南
  • .Net Attribute详解(上)-Attribute本质以及一个简单示例
  • .NET CLR基本术语
  • .NET Core 中的路径问题
  • .Net 基于MiniExcel的导入功能接口示例
  • .NET/C# 使窗口永不激活(No Activate 永不获得焦点)
  • .net最好用的JSON类Newtonsoft.Json获取多级数据SelectToken
  • /etc/X11/xorg.conf 文件被误改后进不了图形化界面
  • @PreAuthorize注解
  • @取消转义
  • [ vulhub漏洞复现篇 ] struts2远程代码执行漏洞 S2-005 (CVE-2010-1870)
  • [ 常用工具篇 ] POC-bomber 漏洞检测工具安装及使用详解
  • [C#学习笔记]Newtonsoft.Json