当前位置: 首页 > news >正文

activeMQ使用总结 (spring 配置)

2019独角兽企业重金招聘Python工程师标准>>> hot3.png

1 引用包

通过maven方式,应用activemq依赖包,pom.xml 添加如下信息,

    <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-all</artifactId>
      <version>5.9.0</version>
    </dependency>

 

2 配置文件

配置server.properties文件如下,

#activity config
activity.mq.brokerURL=failover:(tcp://127.0.0.1:61616)

3 bean配置

配置相关bean,包括监听,消息发送,以及broker,queue/topic ,如下spring-activemq.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:amq="http://activemq.apache.org/schema/core" xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
       http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.9.0.xsd">

    <context:annotation-config/>

    <!-- 配置JMS连接工厂 -->
    <bean id="providerConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="${activity.mq.brokerURL}" />
        <property name="useAsyncSend" value="true" />
        <property name="clientID" value="providerClientConnect" />
        <property name="redeliveryPolicy" ref="activeMQRedeliveryPolicy"/>
    </bean>

    <amq:redeliveryPolicy id="activeMQRedeliveryPolicy" destination="#queueDestination"  maximumRedeliveries="10"/>

    <!-- 定义消息Destination -->
    <bean id="topicDestination"  class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="SpringTopic"/>
    </bean>
    <bean id="queueDestination"  class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="signIncomeQueue"/>
    </bean>
    <!-- 消息发送者客户端 -->
    <bean id="providerJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="providerConnectionFactory" />
        <!--<property name="defaultDestination" ref="topicDestination" />-->
        <property name="defaultDestination" ref="queueDestination" />
        <!-- 开启订阅模式 -->
        <property name="pubSubDomain" value="false"/>
        <!--<property name="receiveTimeout" value="10000" />-->
        <!-- deliveryMode, priority, timeToLive 的开关要生效,必须配置为true,默认false-->
        <property name="explicitQosEnabled" value="true"/>
        <property name="sessionAcknowledgeModeName" value="CLIENT_ACKNOWLEDGE"/>
        <!-- 发送模式
             DeliveryMode.NON_PERSISTENT=1:非持久 ;
             DeliveryMode.PERSISTENT=2:持久
        -->
        <property name="deliveryMode" value="2"/>
    </bean>

    <!-- 配置消息消费监听者 -->
    <bean id="consumerMessageListener" class="com.company.project.service.mq.ConsumerMessageListener" />

    <bean id="consumerListenerClient" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="providerConnectionFactory" />
        <property name="concurrentConsumers" value="10"/>
        <!--<property name="concurrency" value="10-20"/>-->
        <!-- 开启订阅模式 -->
        <property name="pubSubDomain" value="true"/>
        <!--<property name="destination" ref="topicDestination" />-->
        <property name="destination" ref="topicDestination" />
        <property name="subscriptionDurable" value="true"/>
        <!---这里是设置接收客户端的ID,在持久化时,但这个客户端不在线时,消息就存在数据库里,直到被这个ID的客户端消费掉-->
        <property name="clientId" value="consumerClient"/>
        <property name="messageListener" ref="consumerMessageListener" />
        <!-- 消息应答方式
             Session.AUTO_ACKNOWLEDGE  消息自动签收
             Session.CLIENT_ACKNOWLEDGE  客户端调用acknowledge方法手动签收
             Session.DUPS_OK_ACKNOWLEDGE 不必必须签收,消息可能会重复发送
        -->
        <property name="sessionAcknowledgeMode" value="2"/>
    </bean>

</beans>

4 发送接口

发送时,java最常用的有两种格式,textMessage和mapMessage,

package com.company.project.service.impl;

import com.alibaba.fastjson.JSON;
import com.hisense.hitv.service.IQueueService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;

import javax.jms.*;
import java.util.HashMap;
import java.util.Map;

@Service
public class QueueServiceImpl implements IQueueService {

    @Autowired
    private JmsTemplate jmsTemplate;

    public boolean pushMessage2QueueIncome(String uid, Integer incomeType, Integer incomeValue, Integer productCode) {
        sendMqMessageIncome(null, uid, incomeType, incomeValue, productCode);
        return false;
    }

   
    /**
     * 说明:发送的时候如果这里没有显示的指定destination.将用spring xml中配置的destination
     * @param destination
     */
    private void sendMqMessageIncome(Destination destination, final String uid,final Integer incomeType,
                                     final Integer incomeValue,final Integer productCode){
        if(null == destination){
            destination = jmsTemplate.getDefaultDestination();
        }
        jmsTemplate.send(destination, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                Map param = new HashMap();
                MapMessage mapMessage = session.createMapMessage();
                mapMessage.setString("customerId", uid);
                param.put("customerId", uid);
                mapMessage.setInt("incomeType", incomeType);
                param.put("incomeType", incomeType);
                mapMessage.setInt("incomeValue", incomeValue);
                param.put("incomeValue", incomeValue);
                mapMessage.setInt("productCode",productCode);
                param.put("productCode", productCode);
                TextMessage message= session.createTextMessage();
                message.setText(JSON.toJSONString(param));
                return message;
            }
        });
    }
}

5 监听接口

监听通过onMessage接口实现,监听broker推送消息

package com.company.project.service.mq;


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.listener.SessionAwareMessageListener;

import javax.jms.*;


public class ConsumerMessageListener implements SessionAwareMessageListener{
    private static Logger logger = LoggerFactory.getLogger(ConsumerMessageListener.class);

    public void onMessage(Message message, Session session) throws JMSException {
        MapMessage tm = (MapMessage) message;

        try {

            logger.info("---------消息消费---------");
            logger.info("消息ID:\t" + tm.getJMSMessageID());
        } catch (JMSException e) {
            session.recover();//唤起重传 
            e.printStackTrace();
        }
    }
}

 

转载于:https://my.oschina.net/u/2441175/blog/806460

相关文章:

  • 并发 并行 同步 异步 多线程的区别
  • web服务器内置对象,或者说是ServletAPI的实例
  • 2016年NK冬季训练赛 民间题解
  • Tips
  • ratina 视网膜屏幕解决方案大全
  • rtmp拉流测试工具
  • cmd中java -jar *.jar 提示Unable to access jarfile *.jar或Windows不能用鼠标双击运行jar文件怎么办解决方案...
  • gulp同步执行任务
  • HBase内置过滤器的一些总结
  • 【VBA编程】09.使用Excle集合对象
  • 树莓派上Java程序作为linux服务并开机自动启动
  • tracert与pathping
  • 线程池及并发编程基础总结
  • Ztree当节点没有下级时不显示下拉图标
  • Bootstrap表单验证插件bootstrapValidator使用方法整理
  • [iOS]Core Data浅析一 -- 启用Core Data
  • 【跃迁之路】【444天】程序员高效学习方法论探索系列(实验阶段201-2018.04.25)...
  • 30秒的PHP代码片段(1)数组 - Array
  • Essential Studio for ASP.NET Web Forms 2017 v2,新增自定义树形网格工具栏
  • JavaScript 基础知识 - 入门篇(一)
  • js ES6 求数组的交集,并集,还有差集
  • Mocha测试初探
  • mysql常用命令汇总
  • MySQL几个简单SQL的优化
  • Nacos系列:Nacos的Java SDK使用
  • Octave 入门
  • python docx文档转html页面
  • Python学习笔记 字符串拼接
  • Redis的resp协议
  • vagrant 添加本地 box 安装 laravel homestead
  • 解析带emoji和链接的聊天系统消息
  • 如何使用 OAuth 2.0 将 LinkedIn 集成入 iOS 应用
  • 设计模式走一遍---观察者模式
  • 深度解析利用ES6进行Promise封装总结
  • 终端用户监控:真实用户监控还是模拟监控?
  • d²y/dx²; 偏导数问题 请问f1 f2是什么意思
  • elasticsearch-head插件安装
  • ​ssh-keyscan命令--Linux命令应用大词典729个命令解读
  • ​渐进式Web应用PWA的未来
  • ​软考-高级-系统架构设计师教程(清华第2版)【第1章-绪论-思维导图】​
  • #LLM入门|Prompt#1.7_文本拓展_Expanding
  • #我与Java虚拟机的故事#连载15:完整阅读的第一本技术书籍
  • $forceUpdate()函数
  • (Java岗)秋招打卡!一本学历拿下美团、阿里、快手、米哈游offer
  • (NO.00004)iOS实现打砖块游戏(九):游戏中小球与反弹棒的碰撞
  • (二)pulsar安装在独立的docker中,python测试
  • (翻译)terry crowley: 写给程序员
  • (附源码)计算机毕业设计ssm基于B_S的汽车售后服务管理系统
  • (六)Hibernate的二级缓存
  • (论文阅读40-45)图像描述1
  • (每日持续更新)jdk api之StringBufferInputStream基础、应用、实战
  • (十六)串口UART
  • (一)插入排序
  • (轉貼) 蒼井そら挑戰筋肉擂台 (Misc)
  • (状压dp)uva 10817 Headmaster's Headache