当前位置: 首页 > news >正文

ActiveMQ(二)

八:Spring整合Activemq

生产者与消费者的依赖:

<dependencies>
    <!--activemq核心依赖包 -->
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-all</artifactId>
        <version>5.17.0</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.13.3</version>
    </dependency>
    <!-- activemq连接池-->
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-pool</artifactId>
        <version>5.17.0</version>
    </dependency>
    <!-- spring支持jms的包 -->
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-jms</artifactId>
        <version>5.3.22</version>
    </dependency>
    <!-- spring相关依赖包-->
    <dependency>
        <groupId>org.apache.xbean</groupId>
        <artifactId>xbean-spring</artifactId>
        <version>3.16</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-aop</artifactId>
        <version>5.3.6</version>
    </dependency>
    <!-- spring核心依赖-->
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-core</artifactId>
        <version>5.3.22</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
        <version>5.3.20</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-aop</artifactId>
        <version>5.3.6</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-orm</artifactId>
        <version>5.3.22</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-test</artifactId>
        <version>5.3.6</version>
    </dependency>
    <!-- junit依赖 -->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
        <scope>test</scope>
    </dependency>
</dependencies>

生产者与消费者共同的Spring配置文件:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd">
<!--    1.开启包的自动扫描-->
    <context:component-scan base-package="com.qf.activemq"/>
<!--    2.配置生产者-->
    <bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
        <property name="connectionFactory">
<!--            配置这个Connection连接工厂-->
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="brokerURL" value="xxx"/>
            </bean>
        </property>
        <property name="maxConnections" value="100"/>
    </bean>
<!--    目的地bean配置Queue-->
    <bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg index="0" value="my-spring-queue"/>
    </bean>
<!--    目的地的bean配置Topic-->
    <bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg index="0" value="my-spring-topic"/>
    </bean>
<!--    Spring提供的JMS类-->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<!--        代入连接工厂-->
        <property name="connectionFactory" ref="connectionFactory"/>
<!--        代入目的地-->
        <property name="defaultDestination" ref="destinationTopic"/>
<!--        消息自动转换器-->
        <property name="messageConverter">
            <bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
        </property>
    </bean>
</beans>

生产者的实现:

public class Producer   {
    @Autowired
    private JmsTemplate jmsTemplate ;
    /**
     * 发送消息
     */
    public void send(String message) {
        jmsTemplate.send(new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                TextMessage textMessage = session.createTextMessage(message) ;
                return textMessage ;
            }
        });
    }
}

消费者的实现:

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration("classpath:spring-activemq.xml")
public class TestConsumer {
​
    @Autowired
    private JmsTemplate jmsTemplate ;
​
    @Test
    public void testReceive() {
        while (true) {
            //receiveAndConvert()表示接收到消息并且获取到内容
            String context = (String) jmsTemplate.receiveAndConvert();
            System.out.println("接收到的消息为:" + context);
        }
    }
}

设置监听器

public class Consumer implements MessageListener {
    /**
     * 监听到消息后,对消息的处理
     * @param message
     */
    @Override
    public void onMessage(Message message) {
        if (message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage) message ;
            try {
                System.out.println(textMessage.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}

九:Springboot整合ActiveMQ

1.queue中实现生产者和消费者

1.引入相同的依赖

2.生产者的编写

配置类:

制造类:

application.yml :

3.消费者的编写

2.topic中实现生产者和消费者

生产者:

1.

2.

消费者:

十:ActiveMQ的传输协议

1.各协议介绍

2.配置协议

这里我们的要求是进行配置一个NIO协议

1.

2.所以我们要进行配置一个NIO协议

3.

3.使用auto+协议

代码演示:

1.在activemq.xml配置文件中只进行配置这一个auto+nio

2.

十一:ActiveMQ的持久化机制

十二:ActiveMQ集群搭建

当master主机宕机之后,某一台slave会变为master主机

总结:

(1)集群中的三台broker共享一个kahadb文件系统。也就是说虽然broker有三台,但是持久化的日志文件只有一份。

(2)集群中只有master身份的broker负责对消息进行处理,也就是说对于整个集群,只有master能够提供对外服务,接收消息和发送消息

的服务

(3)当master出现宕机,此时会在slave从机中自动选出新的master主机,对外进行提供服务,实现对activemq的高可用

(4)针对于jdbc的高可用,除了将持久化方案改为jdbc外,还可以进行配置mysql的主从集群,实现mysql的高可用

十三:ActiveMQ的高级特性

1.同步投递和异步投递

同步投递:

(1)生产者投递一个Message给到broker,在生产者接收到broker返回的ack确认之前,都会一直阻塞而不会执行下面的一系列逻辑操作

(2)当生产者接收到broker的ack确认机制之后,会停止阻塞而执行之后的逻辑

异步投递:

(1) 当生产者进行投递一个Message给到broker,投递完成之后,不需要broker返回ack确认,即可被视为成功 然后执行之后的逻辑

区别:

(1)ActiveMQ支持同步和异步两种方式进行将消息发送到broker。

(2)同步和异步的主要区别为:发送Message的效率高低以及是否成功可靠投递

对于同步发送来说:

1.不会丢失消息,消息的可靠性高。因为对于同步向broker发送Message来说,我们需要broker进行返回一个ack表示确认。

2.但是相对异步发送Message效率较低,需要阻塞进行broker确认。

对于异步发送来说:

1.无需broker返回ack确认,因此效率较高。

2.但是有可能丢失Message数据。分析:生产者发送一个Message给broker,对于生产者而言,只要消息发送出去那么就算是

发送成功。但是有可能broker发生网络动荡,那么第一个Message就接收不成功。但是对于生产者而言 第一个Message已经发送成功,

再向broker发送时,是进行发送第二个Message。此时就会导致丢失了第一个Message数据

(3)ActiveMQ默认使用的是异步发送

2.异步投递的实现

(1)对于ActiveMQ中的异步投递,我们使用的是回调方法机制

(2) 生产者进行投递Message给broker,投递出去之后,不阻塞,继续执行之后的逻辑

(3) 当broker成功接收到了Message时,那么就回调onSuccess方法。当broker没有接收到Message,就会回调onException方法

代码实现:

public class ProducerDemo01 {
​
    //指明activemq的地址
    public static final String URL = "tcp://192.168.204.134:61616" ;
    //指明destination目的地
    public static final String QUEUE_NAME = "my_queue_1" ;
​
    public static void main(String[] args) throws Exception{
        //1.获得连接工厂
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL) ;
        //设置异步投递的方式
        connectionFactory.setUseAsyncSend(true);
        //2.获得连接对象
        Connection connection = connectionFactory.createConnection();
        //3.开启连接
        connection.start();
        //4.从连接对象中获得一个Session会话,该Session是MQ与消息生产者之间开启的会话
        Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE) ;
        //5.创建队列对象
        Queue queue = session.createQueue(QUEUE_NAME) ;
        // 6.创建ActiveMQMessageProducer对象
        ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(queue) ;
        for (int i = 0;i<3;i++) {
            TextMessage message = session.createTextMessage("hello" + i) ;
            producer.send(message, new AsyncCallback() {
                //生产者发送Message,broker确认成功之后回调的方法
                @Override
                public void onSuccess() {
                    System.out.println("发送成功");
                }
                //生产者发送Message,broker确认失败之后回调的方法
                @Override
                public void onException(JMSException e) {
                    System.out.println("发送失败");
                }
            });
        }
        //9.关闭连接
        connection.close();
    }
}

3.延时投递

演示:

1.找到conf文件夹下的activemq.xml文件 并且进入修改

2.修改保存之后,进行重启activemq服务器

3.

4.消息的重试机制

重试机制流程分析:

(1) 生产者投递一个Message到broker所在的队列queue或topic中

(2)消费者进行消费Message的时候,不进行开启事务,但是一直不进行手动commit提交。所以一条Message会一直存留在broker对应的队列当中从而重复被进行消费读取。

(3) 但是消息也具有重试机制,重试机制默认的阙值为6,也就是最多被重试消费6次,也就是一条Message最多被消费7次。

(4) 当被消费满7次,也就是达到阈值6时,但是还没有被打包为事务进行手动提交时,消费者就会发送一个"poison ack"给broker,则

Message就会变成一个毒消息。该Message会被从broker对应的队列拿出放到死信队列中去。

(5) 等到消费者还想继续消费这个Message第8次时,发现broker的队列中找不到该Message。

代码演示:

1.如果什么也不设置,默认重试的阙值为6

2.重新进行设置阙值

5.死信队列

死信队列不是说访问不了的队列,反而可以结合这个队列进行开展一些业务。

eg:订单超过支付时间应该被取消

(1) 上游创建订单时,将消息发送到等待支付的队列

(2) 消息的超时时间为30分钟,如果三十分钟还没有支付,则该消息进入到死信队列

(3) 对于死信队列也配有专门的消费者来处理死信队列的消息,处理方案即是把死信队列中的订单状态改为"已取消"

演示开启死信队列的步骤:

1.编辑activemq.xml配置文件

2.生产者:要确定broker对应死信队列的名字:DLQ.*

3.

6.幂等性消费

(1) 当在一些业务场景出现非幂等性情况

(2) 我们进行用户的注册,由于出现了网络动荡的问题,因此用户点击了多次,所以向broker发送了多条关于同一个的用户id的Message

(3)消费者进行消费接收broker中的Message,然后存储到MySql数据库中。但是同一个用户 相同的id怎么可以存储多份呢?

(4) 为了解决这个问题,保证幂等性,有两种方法:1.mysql中进行插入业务id作为主键,主键是唯一的,一次只可以插入一条。2.使用

redis或zookeeper的分布式锁。

(5) 我们通常使用的是redis的分布式锁,因为为了尽可能避免影响Mysql数据库的性能。

基于redis的分布式锁的实现原理:每当一个id进入之后,我们都会进行上一把以id号为唯一标识的id锁。当相同id的记录再一次过来时,

由于已经上了该id对应的锁,所以就不可以再允许通过该id对应的记录了。所以就不会再把该id对应的记录存储到数据库中了。

相关文章:

  • 某大学ipv6和ipv4结合的校园网规划设计
  • 【程序员表白大师】html七夕脱单必看源码制作
  • 车载VPA形象发展史:谁是第一个吃螃蟹的人?
  • 22.9.30 喜迎暑假多校联赛第二场(欢乐AK找回自信)ABDEFH
  • C++----智能指针
  • SpringMVC处理Ajax请求及处理和响应json格式的数据
  • 论文复现(一)
  • 龙芯+复旦微FPGA全国产VPX高速数据采集卡解决方案
  • 前端blob数据
  • Jenkins+ant+mysql 自动化构建脚本文件输出日志
  • Unity 渲染YUV数据 ---- 以Unity渲染Android Camera数据为例子
  • 高德骨子里还是个「理工男」
  • Vue指令学习 | 零基础入门
  • promise执行顺序面试题令我头秃,你能作对几道
  • 基于springboot的图书管理系统设计与实现
  • 【跃迁之路】【585天】程序员高效学习方法论探索系列(实验阶段342-2018.09.13)...
  • 【跃迁之路】【699天】程序员高效学习方法论探索系列(实验阶段456-2019.1.19)...
  • 【跃迁之路】【733天】程序员高效学习方法论探索系列(实验阶段490-2019.2.23)...
  • 8年软件测试工程师感悟——写给还在迷茫中的朋友
  • css的样式优先级
  • ES10 特性的完整指南
  • k8s 面向应用开发者的基础命令
  • mysql innodb 索引使用指南
  • PHP 程序员也能做的 Java 开发 30分钟使用 netty 轻松打造一个高性能 websocket 服务...
  • Sass 快速入门教程
  • uni-app项目数字滚动
  • vue脚手架vue-cli
  • Web Storage相关
  • Web标准制定过程
  • 闭包--闭包之tab栏切换(四)
  • 从0到1:PostCSS 插件开发最佳实践
  • 前端面试题总结
  • 全栈开发——Linux
  • 异步
  • Oracle Portal 11g Diagnostics using Remote Diagnostic Agent (RDA) [ID 1059805.
  • 京东物流联手山西图灵打造智能供应链,让阅读更有趣 ...
  • 如何正确理解,内页权重高于首页?
  • ​​​​​​​​​​​​​​Γ函数
  • #define MODIFY_REG(REG, CLEARMASK, SETMASK)
  • (delphi11最新学习资料) Object Pascal 学习笔记---第8章第2节(共同的基类)
  • (非本人原创)我们工作到底是为了什么?​——HP大中华区总裁孙振耀退休感言(r4笔记第60天)...
  • (附源码)springboot宠物管理系统 毕业设计 121654
  • (附源码)ssm户外用品商城 毕业设计 112346
  • (生成器)yield与(迭代器)generator
  • (转)scrum常见工具列表
  • (转)Unity3DUnity3D在android下调试
  • (最全解法)输入一个整数,输出该数二进制表示中1的个数。
  • .NET 8.0 中有哪些新的变化?
  • .NET Core WebAPI中使用Log4net 日志级别分类并记录到数据库
  • .NET Core6.0 MVC+layui+SqlSugar 简单增删改查
  • .NET Framework 服务实现监控可观测性最佳实践
  • .NetCore Flurl.Http 升级到4.0后 https 无法建立SSL连接
  • .Net小白的大学四年,内含面经
  • .Net中wcf服务生成及调用
  • .vimrc php,修改home目录下的.vimrc文件,vim配置php高亮显示