2019独角兽企业重金招聘Python工程师标准>>>
publish/subscribe
特点:A发送的消息可以被所有监听A的对象的接收,就好比学校的广播,所有的学生都可以收听校园广播信息。
消息生产者:
package com.zhiwei.advanced.mq.activemq.sp;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* 发布/订阅消息模型 测试顺序:先订阅才能收到消息
*/
public class JMSProducer {
private final static String user = ActiveMQConnection.DEFAULT_USER; // 默认用户名
private final static String password = ActiveMQConnection.DEFAULT_PASSWORD; // 默认密码
private final static String brokeURL = ActiveMQConnection.DEFAULT_BROKER_URL; // 链接地址
public static void main(String[] args) throws Exception {
// 连接工厂
ConnectionFactory factory = new ActiveMQConnectionFactory(JMSProducer.user, JMSProducer.password,JMSProducer.brokeURL);
// 创建连接
Connection connection = factory.createConnection();
// 启动连接
connection.start();
// 接受或发送消息的线程
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createTopic("FirstTopic"); // 创建消息主题:Destination子类:Queue/Topic
MessageProducer messageProducer = session.createProducer(destination); // 创建消息生产者
// 发送文本消息
for (int i = 0; i < 10; i++) {
TextMessage message = session.createTextMessage("JMS Provider发送消息:" + i);
System.out.println("JMS Provider发送消息:" + i);
messageProducer.send(message);
}
// 启用事务时session需要提交
session.commit();
session.close();
connection.close();
}
}
消息消费者1:
package com.zhiwei.advanced.mq.activemq.sp;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* 发布/订阅消息模型
*
* 特別注意:发布订阅消息模型必须先客户端监听,然后主题发送消息
*/
public class JMSConsumer01{
private final static String user = ActiveMQConnection.DEFAULT_USER; // 默认用户名
private final static String password = ActiveMQConnection.DEFAULT_PASSWORD; // 默认密码
private final static String brokeURL = ActiveMQConnection.DEFAULT_BROKER_URL; // 链接地址
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ActiveMQConnectionFactory(JMSConsumer01.user, JMSConsumer01.password,JMSConsumer01.brokeURL); // 链接工厂
Connection connection = factory.createConnection(); // 连接
connection.start(); // 启动连接
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);// 接受或发送消息的线程:消费不需事务
Destination destination = session.createTopic("FirstTopic"); // 创建连接消息主题:Destination子类:Queue/Topic
MessageConsumer messageConsumer = session.createConsumer(destination); // 创建消息生产者
messageConsumer.setMessageListener(new JMSListener()); //注册消息监听 :阻塞监听
}
}
消息消费者2:
package com.zhiwei.advanced.mq.activemq.sp;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* 发布/订阅消息模型
*/
public class JMSConsumer02{
private final static String user = ActiveMQConnection.DEFAULT_USER; // 默认用户名
private final static String password = ActiveMQConnection.DEFAULT_PASSWORD; // 默认密码
private final static String brokeURL = ActiveMQConnection.DEFAULT_BROKER_URL; // 链接地址
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ActiveMQConnectionFactory(JMSConsumer02.user, JMSConsumer02.password,JMSConsumer02.brokeURL); // 链接工厂
Connection connection = factory.createConnection(); // 连接
connection.start(); // 启动连接
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);// 接受或发送消息的线程:消费不需事务
Destination destination = session.createTopic("FirstTopic"); // 创建连接消息主题:Destination子类:Queue/Topic
MessageConsumer messageConsumer = session.createConsumer(destination); // 创建消息生产者
messageConsumer.setMessageListener(new JMSListener()); //注册消息监听 :阻塞监听
}
}
消息队列监听器:
package com.zhiwei.advanced.mq.activemq.sp;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
public class JMSListener implements MessageListener{
@Override
public void onMessage(Message message) {
if(message instanceof TextMessage){
try {
System.out.println(((TextMessage) message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
消息生产者日志:
消息消费者1日志:
消息消费者2日志:
结论
发布/订阅模式下每个消费者都会消费消息队列的所有消息,这个区别于MQ点对点模式下的每一条消息只能消费者消费一次。