当前位置: 首页 > news >正文

java消息队列ActiveMQ

安装

前置条件

  • activemq的运行依赖于jdk,需要提前安装jdk
  • 如果已经安装了jdk,需要根据jdk的版本来选择对应的版本进行安装activemq
  • 版本对应在官网上,使用java -version 看jdk的版本
  • 注意:jdk和mq的版本不一致会报错,电脑的命名不能用中文

启动

  • 有的版本bin目录会有2个目录win32和win64,有的只有win64
  • 打开bin目录
  • 点击activemq.bat,访问网址 http://127.0.0.1:8161/admin/

在这里插入图片描述

使用服务进行启动

  • 上述方式使用bat文件启动,只要cmd窗口一关,就会关闭
  • 自带安装服务和卸载服务的bat文件(使用管理员权限运行)

在这里插入图片描述

关闭或开启验证

  • 更改完配置之后记得重启一下服务

在这里插入图片描述
在这里插入图片描述

消息中间件的应用场景

  • 异步处理
  • 应用解耦
  • 流量削峰

场景

用户注册需要三个功能:写数据库、发送邮件、注册短信

异步处理

串行

在这里插入图片描述

并行

在这里插入图片描述

消息中间件

在这里插入图片描述

对比

在这里插入图片描述

应用解耦

在这里插入图片描述

流量削峰

在这里插入图片描述

中间件对比

  • activemq:java、万级吞吐量、毫秒级响应速度、可用性高(主从架构),很多公司在用
  • rabbitmq:erlang,其他同上,管理界面丰富
  • rocketmq:java,10万级别,可用性很高(分布式)
  • kafka:消息查询和追溯没有提供,大数据应用广泛

jms消息模型

  • 两种模型
  • P2P(point to point):点对点模型(queue模型)
  • P/S(publish/subscribe):发布订阅模型(topic模型)

点对点

生产者消费者之间的消息往来

在这里插入图片描述

特点

  • 每个消息只能被一个消费者消费,消费之后就直接消失了
  • 生产者和消费者没有直接依赖关系,不管消费者有没有运行,都不妨碍生产者将消息发给队列
  • 消费者成功接受消息之后需要向队列回应应答成功

在这里插入图片描述

发布订阅模型

三个角色

  • 发布者
  • 订阅者
  • 主题
  • 发布者将消息发布到主题,只有订阅了主题的订阅者才能接收到消息
  • topic来实现消息的发布
  • 当一个消息被发布,n个订阅者都可以得到消息的拷贝

在这里插入图片描述

特点

  • 一个主题可以被多个订阅者订阅
  • 存在时间的先后顺序,消费者需要先订阅,生产者才能发布消息
  • 订阅者必须保持持续运行状态,才能接收到生产者的消息

JMS API

在这里插入图片描述

在这里插入图片描述

访问地址

  • http协议:http://127.0.0.1:8161/(网页监控)
  • tcp协议:tcp://127.0.0.1:61616(java后端访问)

JMS原始方式

点对点queue

引入依赖

  • log4j的依赖也需要引入一下
    <dependencies><!--   引入原生的依赖  5.18版本不受activemq版本的影响   --><dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-all</artifactId><version>6.1.0</version></dependency><!--   引入log4j的依赖,不引入会报错     --><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api</artifactId><version>2.17.1</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>2.17.1</version></dependency></dependencies>

生产者

  • 后端使用的端口是tcp://localhost:61616
import jakarta.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;// 队列生产者 点对点
public class Producer {public static void main(String[] args) throws JMSException {// 1. 创建连接工厂 默认使用tcp协议ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");// 2. 创建连接Connection connection = activeMQConnectionFactory.createConnection();// 3. 启动连接connection.start();// 4. 创建会话// 第一个参数:是否开启事务// 第二个参数:消息确认机制Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);// 5. 创建队列Queue queue = session.createQueue("queue01");// 6. 创建消息生产者MessageProducer producer = session.createProducer(queue);// 7. 创建消息 createTextMessage创建文本消息TextMessage textMessage = session.createTextMessage("Hello, ActiveMQ!");// 8. 发送消息producer.send(textMessage);// 9. 关闭资源 从后往前关闭session.close();connection.close();}
}

下图表示发送消息成功

在这里插入图片描述

消费者

一般方式(不常用)
  • 步骤和生产者如出一辙,就是最后变成了receive方法
  • 使用while true,使得消费者一直处在等待
// 点对点消费者
public class Consumer {public static void main(String[] args) throws JMSException {//1. 创建连接工厂ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");//2. 创建连接Connection connection = connectionFactory.createConnection();//3. 启动连接connection.start();//4. 创建会话Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//5. 指定队列Queue queue = session.createQueue("queue01");//6. 创建消息消费者MessageConsumer consumer = session.createConsumer(queue);//7. 接受并且消费消息//receive()是阻塞方法,如果没有消息会一直等待// 一般不会关闭消费者,因为一旦关闭就不能接收消息了while (true) {Message textMessage = consumer.receive();if (textMessage == null) {break;}//判断消息类型if (textMessage instanceof TextMessage) {TextMessage message = (TextMessage) textMessage;System.out.println("接收到消息:" + message.getText());}}}
}
监听器(推荐使用)

使用监听器,更加优雅

public class Listener {public static void main(String[] args) throws JMSException {//1. 创建连接工厂ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");//2. 创建连接Connection connection = connectionFactory.createConnection();//3. 启动连接connection.start();//4. 创建会话Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//5. 指定队列Queue queue = session.createQueue("queue01");//6. 创建消息消费者MessageConsumer consumer = session.createConsumer(queue);//7. 设置监听器 匿名内部类consumer.setMessageListener(new MessageListener() {// 重写onMessage方法 接收消息@Overridepublic void onMessage(Message message) {if (message instanceof TextMessage) {TextMessage textMessage = (TextMessage) message;try {System.out.println("接收到消息:" + textMessage.getText());} catch (JMSException e) {e.printStackTrace();}}}});}
}

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 【Python零基础学习】变量和简单数据类型
  • 【Python】Jupyter Notebook的安装及简单使用
  • 【流媒体协议】RTMP协议概述
  • c++精品小游戏(无错畅玩版)
  • 一文打通pytorch中几个常见的张量操作
  • 第43集《大佛顶首楞严经》
  • 贪吃蛇游戏的实现:C++ 控制台版
  • PyTorch:从零实现一个双向循环神经网络
  • MySQL-MHA高可用配置及故障切换
  • 凸分析与凸优化精解【1】
  • Oracle-OracleConnection
  • JavaScript高阶笔记总结(Xmind格式):第三天
  • 如何在阿里云环境中通过 Jenkins 实现 .NET Core 应用的 Docker 化部署:从 GitLab 拉取代码到自动化 CI/CD 流程的完整指南
  • x264 编码器 SSIM 算法源码分析
  • 【Python】基础语法介绍
  • 【译】JS基础算法脚本:字符串结尾
  • 《剑指offer》分解让复杂问题更简单
  • 2018天猫双11|这就是阿里云!不止有新技术,更有温暖的社会力量
  • js学习笔记
  • MySQL常见的两种存储引擎:MyISAM与InnoDB的爱恨情仇
  • React as a UI Runtime(五、列表)
  • Traffic-Sign Detection and Classification in the Wild 论文笔记
  • use Google search engine
  • 动态魔术使用DBMS_SQL
  • 模型微调
  • 前端面试之闭包
  • [Shell 脚本] 备份网站文件至OSS服务(纯shell脚本无sdk) ...
  • 2017年360最后一道编程题
  • ionic异常记录
  • Nginx惊现漏洞 百万网站面临“拖库”风险
  • zabbix3.2监控linux磁盘IO
  • 选择阿里云数据库HBase版十大理由
  • ​Base64转换成图片,android studio build乱码,找不到okio.ByteString接腾讯人脸识别
  • #NOIP 2014# day.1 生活大爆炸版 石头剪刀布
  • $forceUpdate()函数
  • $GOPATH/go.mod exists but should not goland
  • (2)空速传感器
  • (Mirage系列之二)VMware Horizon Mirage的经典用户用例及真实案例分析
  • (zhuan) 一些RL的文献(及笔记)
  • (附源码)ssm经济信息门户网站 毕业设计 141634
  • (黑马出品_高级篇_01)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式
  • (一)VirtualBox安装增强功能
  • (转)AS3正则:元子符,元序列,标志,数量表达符
  • *1 计算机基础和操作系统基础及几大协议
  • .class文件转换.java_从一个class文件深入理解Java字节码结构
  • .NET CORE 第一节 创建基本的 asp.net core
  • .net core 调用c dll_用C++生成一个简单的DLL文件VS2008
  • .net core 依赖注入的基本用发
  • .NET/C# 使用反射调用含 ref 或 out 参数的方法
  • .project文件
  • @modelattribute注解用postman测试怎么传参_接口测试之问题挖掘
  • @Query中countQuery的介绍
  • @RequestBody与@ResponseBody的使用
  • @Resource和@Autowired的区别
  • @Valid和@NotNull字段校验使用