当前位置: 首页 > news >正文

ActiveMQ

一、  概述

ActiveMQ是apache出的,市面上流行且能力突出的开源消息总线,可以作为JMS中间件。完全支持JMS1.1和J2EE1.4规范。特色是支持多种语言和协议编写客户端;支持spring;支持多种协议,in-VM、TCP、SSL、NIO、UDP、JGroups、JXTA;支持通过JDBC和journal提供高速的消息持久化;从设计上保证了高性能的集群;支持Ajax;支持与Axis的整合;可以调用内嵌的JMS  provider进行测试。

对于消息的传递有两种形式。点到点,即一个生产者和一个消费者一一对应。发布/订阅模式,即一个生产者产生消息并发送后,可以由多个消费者接收。

JMS定义了五种不同的消息正文格式,以及调用的消息类型,允许发送并接收以一些不同形式的数据,提供现有消息格式的一些级别的兼容性。StreamMessage,java原始值的数据流;MapMessage,一套键值对;TextMesage,字符串对象;ObjectMessage,一个序列化的java对象;BytesMessage,一个字节的数据流。

使用消息队列同步索引库、缓存或生成静态页面,相比其他同步方式更简洁、低消耗性能、业务逻辑清晰、解耦合等。

二、  安装

从apache官网下载ActiveMQ。将压缩包上传的服务器,解压。到bin目录下,./activemq  start启动。浏览器访问8161端口,可以访问ActiveMQ。默认用户名和密码都是admin。如果在访问activemq时报503,可以到/etc/hosts下将机器名配置到本地ip映射,全的机器名在/etc/sysconfig/network文件中,127.0.0.1  xxx。重启avtivemq服务,再次访问。

三、  使用

activemq有两种方式,一种是topic,发布订阅,一种是queue,点到点。

queue方式的例子

    @Test

    publicvoid testQueueProducer() throws Exception {

        // 1,创建一个连接工厂对象,指定ip和端口号

        ConnectionFactory factory = newActiveMQConnectionFactory("tcp://192.168.0.121:61616");

        // 2,使用工厂创建一个Connection对象

        Connection connection = factory.createConnection();

        // 3,开启连接,调用Connection对象的start方法

        connection.start();

        // 4,创建一个Session对象

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // 5,使用Session创建一个Destination对象,选择queue方式

        Queue queue = session.createQueue("test-queue");

        // 6,使用Session创建一个Producer

        MessageProducer producer = session.createProducer(queue);

        // 7,创建一个Message对象,可以使用TextMessage

        // TextMessage message = new ActiveMQTextMessage();

        // message.setText("hello avtivemq");

        TextMessage message = session.createTextMessage("hello java");

        // 8,发送消息

        producer.send(message);

        // 9,关闭资源

        producer.close();

        session.close();

        connection.close();

    }



    @Test

    publicvoidtestQueueConsumer() throws Exception {

        // 1,创建一个ConnectionFactory对象连接MQ服务器

        ConnectionFactory factory = newActiveMQConnectionFactory("tcp://192.168.0.121:61616");

        // 2,创建一个连接对象

        Connection connection = factory.createConnection();

        // 3,开启连接

        connection.start();

        // 4,使用Connection对象创建一个Session对象

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // 5,创建一个Destination对象,Queue对象

        Queue queue = session.createQueue("test-queue");

        // 6,使用Session对象创建一个消费者对象

        MessageConsumer consumer = session.createConsumer(queue);

        // 7,接收消息

        consumer.setMessageListener(new MessageListener(){



            @Override

            publicvoid onMessage(Messagemessage) {

                // TODO Auto-generated method stub

                TextMessage message2 = (TextMessage) message;

                try {

                    String text = message2.getText();

                    System.out.println(text);

                } catch (JMSException e) {

                    // TODO Auto-generated catch block

                    e.printStackTrace();

                }

            }

        });

        // 8,等待接收消息

        System.in.read();

        // 9,关闭资源

        consumer.close();

        session.close();

        connection.close();

    }

topic方式的例子

         @Test

         public voidtestTopicProducer() throws Exception {

                   // 1,创建一个连接工厂对象,指定ip和端口号

                   ConnectionFactoryfactory = new ActiveMQConnectionFactory("tcp://192.168.0.121:61616");

                   // 2,使用工厂创建一个Connection对象

                   Connection connection= factory.createConnection();

                   // 3,开启连接,调用Connection对象的start方法

                   connection.start();

                   // 4,创建一个Session对象

                   Session session= connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

                   // 5,使用Session创建一个Destination对象,选择topic方式

                   Topic topic = session.createTopic("test-topic");

                   // 6,使用Session创建一个Producer

                   MessageProducer producer= session.createProducer(topic);

                   // 7,创建一个Message对象,可以使用TextMessage

                   // TextMessagemessage = new ActiveMQTextMessage();

                   //message.setText("hello avtivemq");

                   TextMessage message= session.createTextMessage("hello topic");

                   // 8,发送消息

                   producer.send(message);

                   // 9,关闭资源

                   producer.close();

                   session.close();

                   connection.close();

         }



         @Test

         public voidtestTopicConsumer() throws Exception {

                   // 1,创建一个ConnectionFactory对象连接MQ服务器

                   ConnectionFactoryfactory = new ActiveMQConnectionFactory("tcp://192.168.0.121:61616");

                   // 2,创建一个连接对象

                   Connection connection= factory.createConnection();

                   // 3,开启连接

                   connection.start();

                   // 4,使用Connection对象创建一个Session对象

                   Session session= connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

                   // 5,创建一个Destination对象,topic对象

                   Topic topic = session.createTopic("test-topic");

                   // 6,使用Session对象创建一个消费者对象

                   MessageConsumer consumer= session.createConsumer(topic);

                   // 7,接收消息

                   consumer.setMessageListener(newMessageListener() {



                            @Override

                            public voidonMessage(Message message) {

                                     //TODO Auto-generated method stub

                                     TextMessagemessage2 = (TextMessage) message;

                                     try{

                                               Stringtext = message2.getText();

                                               System.out.println(text);

                                     }catch (JMSException e) {

                                               //TODO Auto-generated catch block

                                               e.printStackTrace();

                                     }

                            }

                   });

                   System.out.println("topic消费者2监听中...");

                   // 8,等待接收消息

                   System.in.read();

                   // 9,关闭资源

                   consumer.close();

                   session.close();

                   connection.close();

         }

queue方式和topic方式的区别,queue方式,消息发送后,如果没有被消费会保存在服务器,只有当消息被消费后才清除。topic方式,消息发送后,不管有没有被消费都不会保存在服务器。使用topic方式,也可以实现发送后保存到服务器,需要客户端配置clientid,做消息订阅。

四、  ActiveMQ整合spring

在maven工程下,需要引入activiemq相关的jar的依赖。

   <dependency>

                   <groupId>org.springframework</groupId>

                   <artifactId>spring-jms</artifactId>

         </dependency>

         <dependency>

                   <groupId>org.springframework</groupId>

                   <artifactId>spring-context-support</artifactId>

         </dependency>

         <dependency>

                   <groupId>org.apache.activemq</groupId>

                   <artifactId>activemq-all</artifactId>

         </dependency>

在资源目录下配置spring整合ActiveMQ的配置文件。

      <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 -->

                   <bean id="targetConnectionFactory"class="org.apache.activemq.ActiveMQConnectionFactory">

                            <propertyname="brokerURL" value="tcp://192.168.0.121:61616" />

                   </bean>

                   <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->

                   <bean id="connectionFactory"

                            class="org.springframework.jms.connection.SingleConnectionFactory">

                            <!--目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->

                            <propertyname="targetConnectionFactory" ref="targetConnectionFactory"/>

                   </bean>

                   <!-- 配置生产者 -->

                   <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->

                   <bean id="jmsTemplate"class="org.springframework.jms.core.JmsTemplate">

                            <!--这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->

                            <propertyname="connectionFactory" ref="connectionFactory" />

                   </bean>

                  

                   <!--这个是队列目的地,点对点的 -->

                   <bean id="queueDestination"class="org.apache.activemq.command.ActiveMQQueue">

                            <constructor-arg>

                                     <value>spring-queue</value>

                            </constructor-arg>

                   </bean>

                   <!--这个是主题目的地,一对多的 -->

                   <bean id="topicDestination"class="org.apache.activemq.command.ActiveMQTopic">

                            <constructor-argvalue="topic" />

                   </bean>

测试

测试消息生产者

         @Test

         public void sendMessage() throwsException {

                   // 初始化spring容器

                   ApplicationContextcontext = new ClassPathXmlApplicationContext(

                                     "classpath:spring/applicationContext-activemq.xml");

                   // 获得JmsTemplate对象

                   JmsTemplate template= context.getBean(JmsTemplate.class);

                   // 获得Destination对象

                   Destination destination= (Destination) context.getBean("queueDestination");

                   // 发送消息

                   template.send(destination,new MessageCreator() {



                            @Override

                            publicMessage createMessage(Session session) throws JMSException {

                                     returnsession.createTextMessage("hello spring activemq queue");

                            }

                   });

         }

测试消息接收者,需要配置消息侦听的bean。

先编写一个消息侦听的实现类

public class MyMessageListener implements MessageListener {



         @Override

         public voidonMessage(Message message) {

                   // TODOAuto-generated method stub

                   TextMessage message2= (TextMessage) message;

                   try {

                            String text= message2.getText();

                            System.out.println(text);

                   } catch(JMSException e) {

                            // TODOAuto-generated catch block

                            e.printStackTrace();

                   }

         }



}

在spring中装配消息侦听和配置默认消息侦听容器

     <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 -->

         <bean id="targetConnectionFactory"class="org.apache.activemq.ActiveMQConnectionFactory">

                   <property name="brokerURL"value="tcp://192.168.0.121:61616" />

         </bean>

         <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->

         <bean id="connectionFactory"

                   class="org.springframework.jms.connection.SingleConnectionFactory">

                   <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->

                   <property name="targetConnectionFactory"ref="targetConnectionFactory" />

         </bean>

         <!-- 配置生产者 -->

         <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->

         <bean id="jmsTemplate"class="org.springframework.jms.core.JmsTemplate">

                   <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->

                   <property name="connectionFactory"ref="connectionFactory" />

         </bean>

        

         <!--这个是队列目的地,点对点的 -->

         <bean id="queueDestination"class="org.apache.activemq.command.ActiveMQQueue">

                   <constructor-arg>

                            <value>spring-queue</value>

                   </constructor-arg>

         </bean>

         <!--这个是主题目的地,一对多的 -->

         <bean id="topicDestination"class="org.apache.activemq.command.ActiveMQTopic">

                   <constructor-argvalue="topic" />

         </bean>

      <!-- 自定义的消息侦听实现类 -->

      <bean id="myMessageListener" class="com.ten.e3shop.search.message.MyMessageListener"></bean>

      <!-- 消息侦听容器 -->

      <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">

                   <property name="connectionFactory"ref="connectionFactory" />

                   <property name="destination"ref="queueDestination" />

                   <property name="messageListener"ref="myMessageListener" />

         </bean>

测试接收消息

         @Test

         public void testConsumer()throws Exception {

                   // 初始化spring容器

                   ApplicationContextcontext = new ClassPathXmlApplicationContext(

                                     "classpath:spring/applicationContext-activemq.xml");

                   MyMessageListenerlistener = context.getBean(MyMessageListener.class);

                   System.in.read();

         }

五、  同步索引库的实现例子

创建一个侦听添加商品信息的实现类,注意为了等待数据插入事务提交,做了1s的等待。

public class ItemAddMessageListener implements MessageListener {



         @Autowired

         private ItemMapper itemMapper;

         @Autowired

         private SolrServer solrServer;



         @Override

         public voidonMessage(Message message) {

                   // TODOAuto-generated method stub

                   TextMessage textMessage= (TextMessage) message;

                   try {

                            // 接收商品id

                            String text= textMessage.getText();

                            long itemId= Long.parseLong(text);

                            // 等待插入数据库事务提交,否则还没有插入数据库,查询得到null,使用null报异常

                            Thread.sleep(1000);

                            // 根据id查询商品信息

                            SearchItemitem = itemMapper.getItemById(itemId);

                            // 创建一个文档对象

                            SolrInputDocumentdocument = new SolrInputDocument();

                            // 将商品信息写入文档的域

                            document.addField("id",item.getId());

                            document.addField("item_title",item.getTitle());

                            document.addField("item_sell_point",item.getSell_point());

                            document.addField("item_price",item.getPrice());

                            document.addField("item_image",item.getImage());

                            document.addField("item_category_name",item.getCategory_name());

                            // 将文档写入索引库

                            solrServer.add(document);

                            // 提交

                            solrServer.commit();

                   } catch(Exception e) {

                            // TODOAuto-generated catch block

                            e.printStackTrace();

                   }

         }



}

在springmvc中配置这个侦听类,并加载到侦听容器。

      <!-- 自定义的消息侦听实现类 -->

      <bean id="itemAddMessageListener" class="com.ten.e3shop.search.message.ItemAddMessageListener">

      </bean>

      <!-- 消息侦听容器 -->

      <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">

                   <property name="connectionFactory"ref="connectionFactory" />

                   <property name="destination"ref="topicDestination" />

                   <property name="messageListener"ref="itemAddMessageListener" />

      </bean>

在添加商品的controller方法中或者service方法中发送消息。如果在controller方法中发送消息,不需要上述接收消息侦听中的等待1s。

            // 发送商品添加消息

                   jmsTemplate.send(topicDestination,new MessageCreator() {



                            @Override

                            publicMessage createMessage(Session session) throws JMSException {

                                     TextMessagemessage = session.createTextMessage(Long.toString(id));

                                     returnmessage;

                            }

                   });

同样的,商品的增和删中,也可以使用ActiveMQ做同步索引。

相关文章:

  • 删除数据库重复记录
  • session共享问题
  • springmvc中ajax响应json报406错误的两种原因
  • 数据库切片
  • tomcat热部署
  • c++
  • linux系统编程
  • proc*c/c++简介
  • [one_demo_14]一个简单的easyui的demo
  • ztree
  • PowerDesigner
  • POI简单介绍
  • shiro
  • ehcache
  • highcharts
  • php的引用
  • 【跃迁之路】【477天】刻意练习系列236(2018.05.28)
  • Android Studio:GIT提交项目到远程仓库
  • Apache的基本使用
  • es6--symbol
  • go语言学习初探(一)
  • Gradle 5.0 正式版发布
  • HomeBrew常规使用教程
  • linux学习笔记
  • Promise面试题2实现异步串行执行
  • python3 使用 asyncio 代替线程
  • redis学习笔记(三):列表、集合、有序集合
  • spring boot下thymeleaf全局静态变量配置
  • ubuntu 下nginx安装 并支持https协议
  • 分布式熔断降级平台aegis
  • 记一次和乔布斯合作最难忘的经历
  • 区块链将重新定义世界
  • 微信小程序--------语音识别(前端自己也能玩)
  • 源码之下无秘密 ── 做最好的 Netty 源码分析教程
  • 字符串匹配基础上
  • 【运维趟坑回忆录 开篇】初入初创, 一脸懵
  • 整理一些计算机基础知识!
  • ​Python 3 新特性:类型注解
  • (22)C#传智:复习,多态虚方法抽象类接口,静态类,String与StringBuilder,集合泛型List与Dictionary,文件类,结构与类的区别
  • (33)STM32——485实验笔记
  • (delphi11最新学习资料) Object Pascal 学习笔记---第8章第5节(封闭类和Final方法)
  • (六)库存超卖案例实战——使用mysql分布式锁解决“超卖”问题
  • (三)Pytorch快速搭建卷积神经网络模型实现手写数字识别(代码+详细注解)
  • (十)【Jmeter】线程(Threads(Users))之jp@gc - Stepping Thread Group (deprecated)
  • (已解决)什么是vue导航守卫
  • (原創) 如何優化ThinkPad X61開機速度? (NB) (ThinkPad) (X61) (OS) (Windows)
  • (转)Windows2003安全设置/维护
  • (最全解法)输入一个整数,输出该数二进制表示中1的个数。
  • .mysql secret在哪_MYSQL基本操作(上)
  • .NET 反射 Reflect
  • .net 验证控件和javaScript的冲突问题
  • .NET/C# 在 64 位进程中读取 32 位进程重定向后的注册表
  • .NET下ASPX编程的几个小问题
  • .NET下的多线程编程—1-线程机制概述
  • .NET项目中存在多个web.config文件时的加载顺序