当前位置: 首页 > news >正文

ActiveMQ 与 Spring

1. ActiveMQ安装

1.1 下载(版本5.14.5)

点我官网下载

1.2 安装

解压下载的压缩文件到任意目录中(eg. C:\Program Files (x86)\apache-activemq-5.14.5),进入%ACTIVEMQ_HOME%/bin目录,根据自己的系统位数,进入32/64目录,点击activemq.bat启动ActiveMQ;

2. ActiveMQ与Spring整合使用

2.1 在Maven中添加ActiveMQ和JMS相关的pom,如下:
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
            <version>4.2.5.RELEASE</version>
        <!--<version>{spring.version}</version>-->
        </dependency>

        <!-- xbean 如<amq:connectionFactory /> -->
        <dependency>
            <groupId>org.apache.xbean</groupId>
            <artifactId>xbean-spring</artifactId>
            <version>3.16</version>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-core</artifactId>
            <version>5.7.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
            <version>5.12.1</version>
        </dependency>复制代码
2.2 添加配置文件spring-activemq.xml

在配置文件中加入以下配置信息,每个配置信息都有具体的解释:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:amq="http://activemq.apache.org/schema/core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans 
        http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
          http://activemq.apache.org/schema/core 
          http://activemq.apache.org/schema/core/activemq-core.xsd">

    <!--配置连接ActiveMQ的连接基本信息 -->
    <amq:connectionFactory id="amqConnectionFactory"
        brokerURL="tcp://localhost:61616" userName="admin" password="admin" />

    <!-- 配置JMS连接工厂 -->
    <bean id="connectionFactory"
        class="org.springframework.jms.connection.CachingConnectionFactory">
        <constructor-arg ref="amqConnectionFactory" />
        <property name="sessionCacheSize" value="100" />
    </bean>

    <!-- 定义消息队列(Queue) -->
    <bean id="demoQueueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <!-- 设置消息队列的名字 -->
        <constructor-arg>
            <value>testQueue</value>
        </constructor-arg>
    </bean>

    <!-- 配置JMS模板(Queue),Spring提供的JMS工具类,它发送、接收消息。 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="defaultDestination" ref="demoQueueDestination" />
        <property name="receiveTimeout" value="10000" />
        <!-- true是topic,false是queue,默认是false -->
        <property name="pubSubDomain" value="false" />
    </bean>

    <!-- 配置消息队列监听者(Queue) -->
    <!-- 打开监听器,会立即去消费消息(即,起到实时消费通信的作用) -->
    <!-- <bean id="queueMessageListener" class="com.hp.common.listener.QueueMessageListener"></bean> -->

    <!-- 显示注入消息监听容器(Queue),配置连接工厂,监听的目标是demoQueueDestination,监听器是上面定义的监听器 -->
    <!-- <bean id="queueListenerContainer"
        class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="demoQueueDestination" />
        <property name="messageListener" ref="queueMessageListener" />
    </bean>
 -->
</beans>复制代码

注:在配置文件中,一定不要忘记加入ActiveMQ和JMS相关的schema

2.3 创建Producer和Consumer相关的Service

创建ProducerService,用于发送信息到消息中心

@Service
public class ProducerService {

    @Resource(name = "jmsTemplate")
    private JmsTemplate jmsTemplate;

    private Queue queue;

    /**
     * 根据目的地发送消息
     */
    public void sendMessage(Destination destination, final String msg) {
        System.out.println(Thread.currentThread().getName() + " 向队列" + destination.toString()
                + "发送消息------->" + msg);
        jmsTemplate.send(destination, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(msg);
            }
        });
    }
    public String send(String userId, String msg) {
        System.out.println(
                Thread.currentThread().getName() + " 向 " + userId + " 的队列" + userId.toString() + "发送消息------>" + msg);
        queue = new ActiveMQQueue(userId);
        jmsTemplate.send(queue, new MessageCreator() {

            @Override
            public Message createMessage(Session session) throws JMSException {
                TextMessage message=session.createTextMessage(msg);
                message.setStringProperty(userId, msg);
                return message;
            }
        });
        return "发送成功";

    }

    /**
     * 向默认目的地发送消息
     */
    public String sendMessage(final String msg) {
        String destination = jmsTemplate.getDefaultDestinationName();
        System.out
                .println(Thread.currentThread().getName() + " 向队列" + destination + "发送消息---------------------->" + msg);
        jmsTemplate.send(new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(msg);
            }
        });
        return "发送成功";
    }

}复制代码

创建ConsumerService,用于接受消息


@Service
public class ConsumerService{

    @Resource(name = "jmsTemplate")
    private JmsTemplate jmsTemplate;

    public String receive(Destination destination) {
        TextMessage textMessage = (TextMessage) jmsTemplate.receive(destination);
        try {
            System.out.println("从队列" + destination.toString() + "收到了消息:\t" + textMessage.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
        return textMessage.toString();
    }

    public String receive(String userId) {
        Queue queue=new ActiveMQQueue(userId+"?consumer.prefetchSize=4");
        Message message = null;
        String property=null;

        try {

             message=jmsTemplate.receive(queue);
             property=message.getStringProperty(userId);
            System.out.println("从队列" + queue.toString() + "收到了消息:\t" + property);
        } catch (JMSException e1) {
            // TODO Auto-generated catch block
            e1.printStackTrace();
        }
        return property;
    }

}复制代码
2.4 添加Controller,用于曝露接口
@Controller
@RequestMapping(value="/mq")
public class MessageController {
    private Logger logger = Logger.getLogger(MessageController.class);

    @Resource(name = "demoQueueDestination")
    private Destination destination;

    @Autowired
    private ProducerService producer;

    @Autowired
    private ConsumerService consumer;


    @RequestMapping(value = "/SendMessage", method = RequestMethod.POST,produces="application/json")
    @ResponseBody
    public void send(@RequestParam(value = "userId",required=false)String userId,@RequestParam(value = "msg")String msg) {
        logger.info(Thread.currentThread().getName() + "------------send to jms Start");
        if (userId==null||"".equals(userId)) {
            producer.sendMessage(destination, msg);
        }else {
            producer.send(userId, msg);
        }

        logger.info(Thread.currentThread().getName() + "------------send to jms End");
    }

    @RequestMapping(value = "/ReceiveMessage", method = RequestMethod.GET)
    @ResponseBody
    public Object receive(@RequestParam(value = "userId",required=false)String userId) {
        logger.info(Thread.currentThread().getName() + "------------receive from jms Start");
        String tm=null;
        if (userId==null||"".equals(userId)) {
             tm = consumer.receive(destination);
        } else {
             tm = consumer.receive(userId);
        }
        logger.info(Thread.currentThread().getName() + "------------receive from jms End");
        return tm.toString();
    }

}复制代码
2.5 配置监听器(ek)

如果在配置文件中打开了监听器的注释,即打开监听器,消费者会立即去消费消息,则还需要添加如下代码:

public class QueueMessageListener implements MessageListener{

    @Override
    public void onMessage(Message message) {
        TextMessage tm=(TextMessage) message;
        try {
            System.out.println("QueueMessageListener监听到了文本消息:\t"
                    + tm.getText());
            //do other work
        } catch (JMSException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}复制代码

3. 测试

启动tomcat,将Javaweb项目运行在tomcat中,通过postman测试接口和方法
接受消息接口:http://localhost:8080/`{project_neme}`/mq/ReceiveMessage?userId={消息队列名称}
发送消息接口:http://localhost:8080/`{project_neme}`/mq/SendMessage?userId={消息队列名称}&msg={参数}

4. 其他场景及技术应用

场景1: 对于mq队列中的消息,系统需要做一些监控或者问题的跟踪,则需要去查看MQ中的数据,但是有需要保证在查看之后不会被删除,因为在P2P模式中,consumer.receive()后消息之后,消息就被消费,MQ不会发送其他consumer,对于这种场景该如何考虑采用ActiveMQ的何种技术去做?
场景2:将使用JDBC持久化的ActiveMQ转换为其他存储方式(文件存储、Kaha、memory),需要做数据迁移,那如何实现?
解决:对于这两种场景,都可以用消息队列中消息查看的方式去实现;
第一个场景,使用ActiveMQ的Browser可以查看未被消费的信息,这样既保证数据不会被消费,也可以实现自己的其他业务;
第二个场景,可以使用Browser将未被消费的信息拿出来,然后再通过produce.send()的方式,将消息发送到其他存储方式的ActiveMQ上;

以下代码实现了使用Browser读出某个队列中未消费的所有消息,并将它们放到list中

public class BrowersService {

    private static final Logger logger=LogManager.getLogger(BrowersService.class);
    //配置文件配置的jmsTemplate
    @Resource(name = "jmsTemplate")
    private JmsTemplate jmsTemplate;

    public void getMessageFromQuese(String queueName){
        List<String> message=jmsTemplate.browse(queueName, new BrowserCallback<List<String>>() {
            @Override
            public List<String> doInJms(Session session, QueueBrowser browser) throws JMSException {
                Enumeration<TextMessage> enumeration=browser.getEnumeration();
                List<String> messages=new ArrayList<>();
                while (enumeration.hasMoreElements()) {
                    TextMessage textMessage = (TextMessage) enumeration.nextElement();
                    logger.info("Message text: "+ textMessage.getText()
                    +" ID: "+textMessage.getJMSMessageID());
                    messages.add(textMessage.getText());
                }
                return messages;
            }
        });
        logger.info("message from browser  "+message);
    }
}复制代码

相关文章:

  • 菜鸟学SQL注入 --- 一个简单的教学案例
  • 块级元素和内联元素的区别与转换
  • SQL Server 2005 触发器应用
  • hadoop-hdfs文件系统的安装与部署
  • 第四篇:数据预处理(一) - 缺失值处理
  • HDU 2586 How far away ?(LCA模板 近期公共祖先啊)
  • Mocha BSM产品亮点——报告管理
  • Spring-data-jpa详解,全方位介绍。
  • 互联网DNS存在重大漏洞 黑客可能控制网络流量
  • 基于 POI 封装 ExcelUtil 精简的 Excel 导入导出
  • 发布Web服务器上的虚拟主机:ISA2006系列之十一
  • NFS部署及优化(二)
  • java枚举与.net中的枚举区别
  • 在Hibernate中配置多对多连接表
  • ionic2 自定义cordova插件开发以及使用 (Android)
  • (十五)java多线程之并发集合ArrayBlockingQueue
  • “大数据应用场景”之隔壁老王(连载四)
  • 4月23日世界读书日 网络营销论坛推荐《正在爆发的营销革命》
  • 8年软件测试工程师感悟——写给还在迷茫中的朋友
  • Android路由框架AnnoRouter:使用Java接口来定义路由跳转
  • avalon2.2的VM生成过程
  • HTTP传输编码增加了传输量,只为解决这一个问题 | 实用 HTTP
  • MYSQL如何对数据进行自动化升级--以如果某数据表存在并且某字段不存在时则执行更新操作为例...
  • vue2.0项目引入element-ui
  • webpack入门学习手记(二)
  • Zepto.js源码学习之二
  • 极限编程 (Extreme Programming) - 发布计划 (Release Planning)
  • 每天10道Java面试题,跟我走,offer有!
  • 模仿 Go Sort 排序接口实现的自定义排序
  • 漂亮刷新控件-iOS
  • 前端学习笔记之观察者模式
  • 强力优化Rancher k8s中国区的使用体验
  • 使用putty远程连接linux
  • 源码之下无秘密 ── 做最好的 Netty 源码分析教程
  • 06-01 点餐小程序前台界面搭建
  • 你对linux中grep命令知道多少?
  • mysql 慢查询分析工具:pt-query-digest 在mac 上的安装使用 ...
  • Prometheus VS InfluxDB
  • 哈罗单车融资几十亿元,蚂蚁金服与春华资本加持 ...
  • 数据库巡检项
  • #Linux(Source Insight安装及工程建立)
  • #我与Java虚拟机的故事#连载08:书读百遍其义自见
  • (C语言)逆序输出字符串
  • (Redis使用系列) Springboot 在redis中使用BloomFilter布隆过滤器机制 六
  • (七)MySQL是如何将LRU链表的使用性能优化到极致的?
  • (完整代码)R语言中利用SVM-RFE机器学习算法筛选关键因子
  • (一)python发送HTTP 请求的两种方式(get和post )
  • (转) SpringBoot:使用spring-boot-devtools进行热部署以及不生效的问题解决
  • (转)linux 命令大全
  • ./include/caffe/util/cudnn.hpp: In function ‘const char* cudnnGetErrorString(cudnnStatus_t)’: ./incl
  • .h头文件 .lib动态链接库文件 .dll 动态链接库
  • .NET Windows:删除文件夹后立即判断,有可能依然存在
  • .NET 中 GetHashCode 的哈希值有多大概率会相同(哈希碰撞)
  • .net 桌面开发 运行一阵子就自动关闭_聊城旋转门家用价格大约是多少,全自动旋转门,期待合作...
  • .NET(C#) Internals: as a developer, .net framework in my eyes