当前位置: 首页 > news >正文

学习ActiveMQ(三):发布/订阅模式(topic)演示

 

  1.在这个项目中新增两个java类,主题生产者和主题消费者:

  2.和点对点的代码差别并不大,所以将消费者和生产者的分别代码拷入新增的java类中,再修改就好了。

appProducerTopic代码:标红字体是做出了修改,由创建队列改为了创建主题。
package com.liu.jms;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class appProducerTopic {

    private static final String url = "tcp://127.0.0.1:61616";//actvemq的服务器tcp连接方式
    private static final String topicName = "topic-test";//定义主题的名称

    public static void main(String[] args) throws  JMSException {
        //1.创建connectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
        //2.创建connection
        Connection connection = connectionFactory.createConnection();
        //3.启动连接
        connection.start();
        //4.创建session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.创建destination
        Destination destination = session.createTopic(topicName);
        //6.创建生产者
        MessageProducer producer = session.createProducer(destination);

        for (int i = 0; i < 100; i++) {

            TextMessage textMessage = session.createTextMessage("test" + i);
            //7.发送消息
            producer.send(textMessage);

            System.out.println("发送消息" + textMessage.getText());

        }
        //8.关闭连接
        connection.close();
    }
}
appConsumerTopic代码:标红字体是做出了修改,由创建队列改为了创建主题。
package com.liu.jms;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class appConsumerTopic {

    private static final String url = "tcp://127.0.0.1:61616";
    private static final String topicName = "topic-test";//定义主题的名称

    public static void main(String[] args) throws  JMSException {
        //1.创建connectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
        //2.创建connection
        Connection connection = connectionFactory.createConnection();
        //3.启动连接
        connection.start();
        //4.创建session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.创建destination
        Destination destination = session.createTopic(topicName);
        //6.创建消费者
        MessageConsumer consumer = session.createConsumer(destination);
        //7.创建一个监听器
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {

                TextMessage textMessage = (TextMessage)message;
                try {
                    System.out.println("接收到的消息:" + textMessage.getText());

                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        //8.关闭连接(监听器是异步的还没有监听到消息的时候,就关闭连接了)
        //connection.close();
    }
}

  3.测试

  首先启动消费者这个java类,观察控制台,如下图:

 

  接着启动生产者的java类,观察控制台,如下图:生产了一百条消息。

 

  此时切换至消费的控制台,观察控制台,如下图:已经打印出了一百条消息了,说明消费者已经接受到全部一百条消息。

   6.打开activemq的控制台查看topics:(http://127.0.0.1:8161/admin/topics.jsp)如下图所示:有一个名字是我们设置的queue-test的主题,消费者也有一个就是我们创建的那个消费者类,主题中有一百条消息,被移除了一百条,也就是上面所说的,消费者接收到了这100条全部的消息。

  7.那么如果我启动了两个订阅相同的消费者呢?为了结果能清晰,重启activemq服务,关掉之前的Java类启动,然后启动两边消费者,再启动一个生产者。如下图:生产者生产了100条消息。

 

  8.分别看看两个消费者的接收消息,如下两张图:两个消费者都接受到了一模一样的100条消息。

  9.总结:主题订阅发布模式,有多个消费的订阅相同时,消费者不会相互相互影响,都会分别接收到生产者的全部消息。

转载于:https://www.cnblogs.com/liuyuan1227/p/10740493.html

相关文章:

  • 【本人秃顶程序员】Java面试 32个核心必考点完全解析
  • 加一度分享竞价工作自检清单,升职加薪指日可待
  • windows下nginx的安装及使用(转载)
  • Mobx初探
  • 【云吞铺子】性能抖动剖析(二)
  • react-nativeAndroid打包报错:Daemon: AAPT2 aapt2-3.2.1-4818971-osx Daemon #0
  • 查看文件权限
  • RocketMQ快速上手
  • 4-4 重构get请求+格式化响应数据
  • Redis安装部署踩坑
  • “僕は存在していなかった”-22/7-CG音乐视频制作
  • 数位dp-Bomb
  • TNW-微信公众号各种消息交互
  • 221 springCloud 学习
  • 2018-2019-2 网络对抗技术 20165324 Exp6:信息收集与漏洞扫描
  • [ 一起学React系列 -- 8 ] React中的文件上传
  • 10个最佳ES6特性 ES7与ES8的特性
  • CODING 缺陷管理功能正式开始公测
  • go append函数以及写入
  • iOS动画编程-View动画[ 1 ] 基础View动画
  • Java超时控制的实现
  • Java的Interrupt与线程中断
  • 订阅Forge Viewer所有的事件
  • 多线程事务回滚
  • 基于OpenResty的Lua Web框架lor0.0.2预览版发布
  • 解析带emoji和链接的聊天系统消息
  • 浅谈JavaScript的面向对象和它的封装、继承、多态
  • 区块链共识机制优缺点对比都是什么
  • 一些关于Rust在2019年的思考
  • [Shell 脚本] 备份网站文件至OSS服务(纯shell脚本无sdk) ...
  • Nginx实现动静分离
  • ​io --- 处理流的核心工具​
  • ​渐进式Web应用PWA的未来
  • ![CDATA[ ]] 是什么东东
  • (1)安装hadoop之虚拟机准备(配置IP与主机名)
  • (done) NLP “bag-of-words“ 方法 (带有二元分类和多元分类两个例子)词袋模型、BoW
  • (done) 两个矩阵 “相似” 是什么意思?
  • (五)MySQL的备份及恢复
  • (一) springboot详细介绍
  • (一)ClickHouse 中的 `MaterializedMySQL` 数据库引擎的使用方法、设置、特性和限制。
  • (原創) 如何動態建立二維陣列(多維陣列)? (.NET) (C#)
  • (转)如何上传第三方jar包至Maven私服让maven项目可以使用第三方jar包
  • .Net core 6.0 升8.0
  • .NET 中 GetHashCode 的哈希值有多大概率会相同(哈希碰撞)
  • .NetCore项目nginx发布
  • .NET微信公众号开发-2.0创建自定义菜单
  • @31省区市高考时间表来了,祝考试成功
  • @select 怎么写存储过程_你知道select语句和update语句分别是怎么执行的吗?
  • @SuppressWarnings注解
  • @开发者,一文搞懂什么是 C# 计时器!
  • [AIGC] MySQL存储引擎详解
  • [BT]BUUCTF刷题第9天(3.27)
  • [C#]猫叫人醒老鼠跑 C#的委托及事件
  • [Django开源学习 1]django-vue-admin
  • [elastic 8.x]java客户端连接elasticsearch与操作索引与文档