当前位置: 首页 > news >正文

【ActiveMQ】- 发布/订阅模式

2019独角兽企业重金招聘Python工程师标准>>> hot3.png

publish/subscribe

特点:A发送的消息可以被所有监听A的对象的接收,就好比学校的广播,所有的学生都可以收听校园广播信息。


消息生产者:

package com.zhiwei.advanced.mq.activemq.sp;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 发布/订阅消息模型 测试顺序:先订阅才能收到消息
 */
public class JMSProducer {

	private final static String user = ActiveMQConnection.DEFAULT_USER; // 默认用户名
	private final static String password = ActiveMQConnection.DEFAULT_PASSWORD; // 默认密码
	private final static String brokeURL = ActiveMQConnection.DEFAULT_BROKER_URL; // 链接地址

	public static void main(String[] args) throws Exception {

		 // 连接工厂
		ConnectionFactory factory = new ActiveMQConnectionFactory(JMSProducer.user, JMSProducer.password,JMSProducer.brokeURL);
		
		// 创建连接
		Connection connection = factory.createConnection(); 
		
		// 启动连接
		connection.start(); 
		
		// 接受或发送消息的线程
		Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
		
		Destination destination = session.createTopic("FirstTopic"); // 创建消息主题:Destination子类:Queue/Topic

		MessageProducer messageProducer = session.createProducer(destination); // 创建消息生产者

		// 发送文本消息
		for (int i = 0; i < 10; i++) {
			
			TextMessage message = session.createTextMessage("JMS Provider发送消息:" + i);
			
			System.out.println("JMS Provider发送消息:" + i);
			
			messageProducer.send(message);
		}

		// 启用事务时session需要提交
		session.commit();
		session.close();
		connection.close();
	}

}

消息消费者1:

package com.zhiwei.advanced.mq.activemq.sp;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 发布/订阅消息模型
 * 
 * 特別注意:发布订阅消息模型必须先客户端监听,然后主题发送消息
 */
public class JMSConsumer01{

	private final static String user = ActiveMQConnection.DEFAULT_USER; // 默认用户名
	private final static String password = ActiveMQConnection.DEFAULT_PASSWORD; // 默认密码
	private final static String brokeURL = ActiveMQConnection.DEFAULT_BROKER_URL; // 链接地址

	public static void main(String[] args) throws Exception {

		ConnectionFactory factory = new ActiveMQConnectionFactory(JMSConsumer01.user, JMSConsumer01.password,JMSConsumer01.brokeURL); // 链接工厂
		Connection connection = factory.createConnection(); // 连接
		connection.start(); // 启动连接
	
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);// 接受或发送消息的线程:消费不需事务
		Destination destination = session.createTopic("FirstTopic"); // 创建连接消息主题:Destination子类:Queue/Topic

		MessageConsumer messageConsumer = session.createConsumer(destination); // 创建消息生产者
        messageConsumer.setMessageListener(new JMSListener());   //注册消息监听 :阻塞监听    
	}

}

消息消费者2:

package com.zhiwei.advanced.mq.activemq.sp;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 发布/订阅消息模型
 */
public class JMSConsumer02{

	private final static String user = ActiveMQConnection.DEFAULT_USER; // 默认用户名
	private final static String password = ActiveMQConnection.DEFAULT_PASSWORD; // 默认密码
	private final static String brokeURL = ActiveMQConnection.DEFAULT_BROKER_URL; // 链接地址

	public static void main(String[] args) throws Exception {

		ConnectionFactory factory = new ActiveMQConnectionFactory(JMSConsumer02.user, JMSConsumer02.password,JMSConsumer02.brokeURL); // 链接工厂
		Connection connection = factory.createConnection(); // 连接
		connection.start(); // 启动连接
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);// 接受或发送消息的线程:消费不需事务
		Destination destination = session.createTopic("FirstTopic"); // 创建连接消息主题:Destination子类:Queue/Topic

		MessageConsumer messageConsumer = session.createConsumer(destination); // 创建消息生产者
        messageConsumer.setMessageListener(new JMSListener());   //注册消息监听 :阻塞监听    
	}

}

消息队列监听器:

package com.zhiwei.advanced.mq.activemq.sp;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class JMSListener implements MessageListener{

	@Override
	public void onMessage(Message message) {
		
		if(message instanceof TextMessage){
			try {
				System.out.println(((TextMessage) message).getText());
			} catch (JMSException e) {
				e.printStackTrace();
			}
		}
		
	}
}

消息生产者日志:

消息消费者1日志:

消息消费者2日志:


结论

发布/订阅模式下每个消费者都会消费消息队列的所有消息,这个区别于MQ点对点模式下的每一条消息只能消费者消费一次。

转载于:https://my.oschina.net/yangzhiwei256/blog/3014202

相关文章:

  • 效能改进之项目例会导入实践
  • iOS | NSProxy
  • Java I/O输入输出流
  • conda常用的命令
  • “寒冬”下的金三银四跳槽季来了,帮你客观分析一下局面
  • 零代码玩转数据可视化
  • Dubbo 安装ZooKeeper环境
  • LeetCode刷题——29. Divide Two Integers(Part 1靠自己)
  • 程序猿福利来啦,神目AI开放平台免费送人脸识别SDK啦
  • java异常
  • Go test 命令行参数
  • 观察者模式与发布/订阅模式学习
  • go标准库的学习-runtime
  • java多线程
  • 喜讯:以太坊“君士坦丁堡”升级,截止目前稳定运转
  • [笔记] php常见简单功能及函数
  • 【划重点】MySQL技术内幕:InnoDB存储引擎
  • Apache Zeppelin在Apache Trafodion上的可视化
  • ECMAScript6(0):ES6简明参考手册
  • extract-text-webpack-plugin用法
  • Invalidate和postInvalidate的区别
  • JAVA 学习IO流
  • JavaScript工作原理(五):深入了解WebSockets,HTTP/2和SSE,以及如何选择
  • JAVA并发编程--1.基础概念
  • Redis字符串类型内部编码剖析
  • Spring声明式事务管理之一:五大属性分析
  • Vue 2.3、2.4 知识点小结
  • VUE es6技巧写法(持续更新中~~~)
  • web标准化(下)
  • Windows Containers 大冒险: 容器网络
  • 从零到一:用Phaser.js写意地开发小游戏(Chapter 3 - 加载游戏资源)
  • 利用jquery编写加法运算验证码
  • 漂亮刷新控件-iOS
  • 如何进阶一名有竞争力的程序员?
  • 算法---两个栈实现一个队列
  • 听说你叫Java(二)–Servlet请求
  • 问题之ssh中Host key verification failed的解决
  • 小程序上传图片到七牛云(支持多张上传,预览,删除)
  • 写给高年级小学生看的《Bash 指南》
  • 智能合约Solidity教程-事件和日志(一)
  • 扩展资源服务器解决oauth2 性能瓶颈
  • ​批处理文件中的errorlevel用法
  • #define、const、typedef的差别
  • #if和#ifdef区别
  • #NOIP 2014#day.2 T1 无限网络发射器选址
  • #多叉树深度遍历_结合深度学习的视频编码方法--帧内预测
  • #免费 苹果M系芯片Macbook电脑MacOS使用Bash脚本写入(读写)NTFS硬盘教程
  • (2022版)一套教程搞定k8s安装到实战 | RBAC
  • (6)添加vue-cookie
  • (C语言)求出1,2,5三个数不同个数组合为100的组合个数
  • (pojstep1.3.1)1017(构造法模拟)
  • (笔记)Kotlin——Android封装ViewBinding之二 优化
  • (分享)自己整理的一些简单awk实用语句
  • (附源码)node.js知识分享网站 毕业设计 202038
  • (附源码)计算机毕业设计ssm基于B_S的汽车售后服务管理系统