ActiveMQ
一、 概述
ActiveMQ是apache出的,市面上流行且能力突出的开源消息总线,可以作为JMS中间件。完全支持JMS1.1和J2EE1.4规范。特色是支持多种语言和协议编写客户端;支持spring;支持多种协议,in-VM、TCP、SSL、NIO、UDP、JGroups、JXTA;支持通过JDBC和journal提供高速的消息持久化;从设计上保证了高性能的集群;支持Ajax;支持与Axis的整合;可以调用内嵌的JMS provider进行测试。
对于消息的传递有两种形式。点到点,即一个生产者和一个消费者一一对应。发布/订阅模式,即一个生产者产生消息并发送后,可以由多个消费者接收。
JMS定义了五种不同的消息正文格式,以及调用的消息类型,允许发送并接收以一些不同形式的数据,提供现有消息格式的一些级别的兼容性。StreamMessage,java原始值的数据流;MapMessage,一套键值对;TextMesage,字符串对象;ObjectMessage,一个序列化的java对象;BytesMessage,一个字节的数据流。
使用消息队列同步索引库、缓存或生成静态页面,相比其他同步方式更简洁、低消耗性能、业务逻辑清晰、解耦合等。
二、 安装
从apache官网下载ActiveMQ。将压缩包上传的服务器,解压。到bin目录下,./activemq start启动。浏览器访问8161端口,可以访问ActiveMQ。默认用户名和密码都是admin。如果在访问activemq时报503,可以到/etc/hosts下将机器名配置到本地ip映射,全的机器名在/etc/sysconfig/network文件中,127.0.0.1 xxx。重启avtivemq服务,再次访问。
三、 使用
activemq有两种方式,一种是topic,发布订阅,一种是queue,点到点。
queue方式的例子
@Test
publicvoid testQueueProducer() throws Exception {
// 1,创建一个连接工厂对象,指定ip和端口号
ConnectionFactory factory = newActiveMQConnectionFactory("tcp://192.168.0.121:61616");
// 2,使用工厂创建一个Connection对象
Connection connection = factory.createConnection();
// 3,开启连接,调用Connection对象的start方法
connection.start();
// 4,创建一个Session对象
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5,使用Session创建一个Destination对象,选择queue方式
Queue queue = session.createQueue("test-queue");
// 6,使用Session创建一个Producer
MessageProducer producer = session.createProducer(queue);
// 7,创建一个Message对象,可以使用TextMessage
// TextMessage message = new ActiveMQTextMessage();
// message.setText("hello avtivemq");
TextMessage message = session.createTextMessage("hello java");
// 8,发送消息
producer.send(message);
// 9,关闭资源
producer.close();
session.close();
connection.close();
}
@Test
publicvoidtestQueueConsumer() throws Exception {
// 1,创建一个ConnectionFactory对象连接MQ服务器
ConnectionFactory factory = newActiveMQConnectionFactory("tcp://192.168.0.121:61616");
// 2,创建一个连接对象
Connection connection = factory.createConnection();
// 3,开启连接
connection.start();
// 4,使用Connection对象创建一个Session对象
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5,创建一个Destination对象,Queue对象
Queue queue = session.createQueue("test-queue");
// 6,使用Session对象创建一个消费者对象
MessageConsumer consumer = session.createConsumer(queue);
// 7,接收消息
consumer.setMessageListener(new MessageListener(){
@Override
publicvoid onMessage(Messagemessage) {
// TODO Auto-generated method stub
TextMessage message2 = (TextMessage) message;
try {
String text = message2.getText();
System.out.println(text);
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
// 8,等待接收消息
System.in.read();
// 9,关闭资源
consumer.close();
session.close();
connection.close();
}
topic方式的例子
@Test
public voidtestTopicProducer() throws Exception {
// 1,创建一个连接工厂对象,指定ip和端口号
ConnectionFactoryfactory = new ActiveMQConnectionFactory("tcp://192.168.0.121:61616");
// 2,使用工厂创建一个Connection对象
Connection connection= factory.createConnection();
// 3,开启连接,调用Connection对象的start方法
connection.start();
// 4,创建一个Session对象
Session session= connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5,使用Session创建一个Destination对象,选择topic方式
Topic topic = session.createTopic("test-topic");
// 6,使用Session创建一个Producer
MessageProducer producer= session.createProducer(topic);
// 7,创建一个Message对象,可以使用TextMessage
// TextMessagemessage = new ActiveMQTextMessage();
//message.setText("hello avtivemq");
TextMessage message= session.createTextMessage("hello topic");
// 8,发送消息
producer.send(message);
// 9,关闭资源
producer.close();
session.close();
connection.close();
}
@Test
public voidtestTopicConsumer() throws Exception {
// 1,创建一个ConnectionFactory对象连接MQ服务器
ConnectionFactoryfactory = new ActiveMQConnectionFactory("tcp://192.168.0.121:61616");
// 2,创建一个连接对象
Connection connection= factory.createConnection();
// 3,开启连接
connection.start();
// 4,使用Connection对象创建一个Session对象
Session session= connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5,创建一个Destination对象,topic对象
Topic topic = session.createTopic("test-topic");
// 6,使用Session对象创建一个消费者对象
MessageConsumer consumer= session.createConsumer(topic);
// 7,接收消息
consumer.setMessageListener(newMessageListener() {
@Override
public voidonMessage(Message message) {
//TODO Auto-generated method stub
TextMessagemessage2 = (TextMessage) message;
try{
Stringtext = message2.getText();
System.out.println(text);
}catch (JMSException e) {
//TODO Auto-generated catch block
e.printStackTrace();
}
}
});
System.out.println("topic消费者2监听中...");
// 8,等待接收消息
System.in.read();
// 9,关闭资源
consumer.close();
session.close();
connection.close();
}
queue方式和topic方式的区别,queue方式,消息发送后,如果没有被消费会保存在服务器,只有当消息被消费后才清除。topic方式,消息发送后,不管有没有被消费都不会保存在服务器。使用topic方式,也可以实现发送后保存到服务器,需要客户端配置clientid,做消息订阅。
四、 ActiveMQ整合spring
在maven工程下,需要引入activiemq相关的jar的依赖。
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
</dependency>
在资源目录下配置spring整合ActiveMQ的配置文件。
<!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 -->
<bean id="targetConnectionFactory"class="org.apache.activemq.ActiveMQConnectionFactory">
<propertyname="brokerURL" value="tcp://192.168.0.121:61616" />
</bean>
<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="connectionFactory"
class="org.springframework.jms.connection.SingleConnectionFactory">
<!--目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
<propertyname="targetConnectionFactory" ref="targetConnectionFactory"/>
</bean>
<!-- 配置生产者 -->
<!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->
<bean id="jmsTemplate"class="org.springframework.jms.core.JmsTemplate">
<!--这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
<propertyname="connectionFactory" ref="connectionFactory" />
</bean>
<!--这个是队列目的地,点对点的 -->
<bean id="queueDestination"class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>spring-queue</value>
</constructor-arg>
</bean>
<!--这个是主题目的地,一对多的 -->
<bean id="topicDestination"class="org.apache.activemq.command.ActiveMQTopic">
<constructor-argvalue="topic" />
</bean>
测试
测试消息生产者
@Test
public void sendMessage() throwsException {
// 初始化spring容器
ApplicationContextcontext = new ClassPathXmlApplicationContext(
"classpath:spring/applicationContext-activemq.xml");
// 获得JmsTemplate对象
JmsTemplate template= context.getBean(JmsTemplate.class);
// 获得Destination对象
Destination destination= (Destination) context.getBean("queueDestination");
// 发送消息
template.send(destination,new MessageCreator() {
@Override
publicMessage createMessage(Session session) throws JMSException {
returnsession.createTextMessage("hello spring activemq queue");
}
});
}
测试消息接收者,需要配置消息侦听的bean。
先编写一个消息侦听的实现类
public class MyMessageListener implements MessageListener {
@Override
public voidonMessage(Message message) {
// TODOAuto-generated method stub
TextMessage message2= (TextMessage) message;
try {
String text= message2.getText();
System.out.println(text);
} catch(JMSException e) {
// TODOAuto-generated catch block
e.printStackTrace();
}
}
}
在spring中装配消息侦听和配置默认消息侦听容器
<!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 -->
<bean id="targetConnectionFactory"class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL"value="tcp://192.168.0.121:61616" />
</bean>
<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="connectionFactory"
class="org.springframework.jms.connection.SingleConnectionFactory">
<!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
<property name="targetConnectionFactory"ref="targetConnectionFactory" />
</bean>
<!-- 配置生产者 -->
<!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->
<bean id="jmsTemplate"class="org.springframework.jms.core.JmsTemplate">
<!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
<property name="connectionFactory"ref="connectionFactory" />
</bean>
<!--这个是队列目的地,点对点的 -->
<bean id="queueDestination"class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>spring-queue</value>
</constructor-arg>
</bean>
<!--这个是主题目的地,一对多的 -->
<bean id="topicDestination"class="org.apache.activemq.command.ActiveMQTopic">
<constructor-argvalue="topic" />
</bean>
<!-- 自定义的消息侦听实现类 -->
<bean id="myMessageListener" class="com.ten.e3shop.search.message.MyMessageListener"></bean>
<!-- 消息侦听容器 -->
<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory"ref="connectionFactory" />
<property name="destination"ref="queueDestination" />
<property name="messageListener"ref="myMessageListener" />
</bean>
测试接收消息
@Test
public void testConsumer()throws Exception {
// 初始化spring容器
ApplicationContextcontext = new ClassPathXmlApplicationContext(
"classpath:spring/applicationContext-activemq.xml");
MyMessageListenerlistener = context.getBean(MyMessageListener.class);
System.in.read();
}
五、 同步索引库的实现例子
创建一个侦听添加商品信息的实现类,注意为了等待数据插入事务提交,做了1s的等待。
public class ItemAddMessageListener implements MessageListener {
@Autowired
private ItemMapper itemMapper;
@Autowired
private SolrServer solrServer;
@Override
public voidonMessage(Message message) {
// TODOAuto-generated method stub
TextMessage textMessage= (TextMessage) message;
try {
// 接收商品id
String text= textMessage.getText();
long itemId= Long.parseLong(text);
// 等待插入数据库事务提交,否则还没有插入数据库,查询得到null,使用null报异常
Thread.sleep(1000);
// 根据id查询商品信息
SearchItemitem = itemMapper.getItemById(itemId);
// 创建一个文档对象
SolrInputDocumentdocument = new SolrInputDocument();
// 将商品信息写入文档的域
document.addField("id",item.getId());
document.addField("item_title",item.getTitle());
document.addField("item_sell_point",item.getSell_point());
document.addField("item_price",item.getPrice());
document.addField("item_image",item.getImage());
document.addField("item_category_name",item.getCategory_name());
// 将文档写入索引库
solrServer.add(document);
// 提交
solrServer.commit();
} catch(Exception e) {
// TODOAuto-generated catch block
e.printStackTrace();
}
}
}
在springmvc中配置这个侦听类,并加载到侦听容器。
<!-- 自定义的消息侦听实现类 -->
<bean id="itemAddMessageListener" class="com.ten.e3shop.search.message.ItemAddMessageListener">
</bean>
<!-- 消息侦听容器 -->
<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory"ref="connectionFactory" />
<property name="destination"ref="topicDestination" />
<property name="messageListener"ref="itemAddMessageListener" />
</bean>
在添加商品的controller方法中或者service方法中发送消息。如果在controller方法中发送消息,不需要上述接收消息侦听中的等待1s。
// 发送商品添加消息
jmsTemplate.send(topicDestination,new MessageCreator() {
@Override
publicMessage createMessage(Session session) throws JMSException {
TextMessagemessage = session.createTextMessage(Long.toString(id));
returnmessage;
}
});
同样的,商品的增和删中,也可以使用ActiveMQ做同步索引。