当前位置: 首页 > news >正文

Spring配置activemq异步消息监听器

<!-- 创建工厂连接 -->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL"
            value="failover:(tcp://127.0.0.1:61616)?initialReconnectDelay=100" />
        <property name="useAsyncSend" value="false" />
        <property name="dispatchAsync" value="true" />
    </bean>
    <bean id="pooledJmsConnectionFactory"
        class="org.springframework.jms.connection.CachingConnectionFactory">
        <constructor-arg ref="targetConnectionFactory" />
    </bean>
    
    <bean id="queueDestinationIng" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg>
            <value>order.ing</value>
        </constructor-arg>
    </bean>
    
    <!-- 消息监听器 -->
    <bean id="ingMessageListener" class="com.bypay.listener.IngListener" />

    <!-- 消息监听容器 -->
    <bean id="jmsContainerIng"
        class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="pooledJmsConnectionFactory" />
        <property name="destination" ref="queueDestinationIng" />
        <property name="messageListener" ref="ingMessageListener" />
        <property name="sessionTransacted" value="false"></property>
        <property name="concurrency" value="10-15"></property>
    </bean>

在上述配置中DefaultMessageListenerContainer主要的属性列表如下:

    messageListener: 消息侦听器,必选属性。

 

    taskExecutor: 任务调度器,可以使用线程池开并发的消费消息。如果开发者不指定,spring将会采用默认的TaskExecutor(SimpleAsyncTaskExecutor,类似于CachedThreadPool)。

 

    concurrentConsumers: 消费者的最大个数,因为在spring中messageListener实例是单例的,比如上文中的orderListener(备注:它实现了MessageListener接口),所以spring-jms不能自作主张的创建多个messageListener实例来并发消费。所以spring在内部,创建了多个MessageConsumer实例,并使用consumer.receive()方法以阻塞的方式来获取消息,当获取消息后,在执行messageListener.onMessage()方法;concurrentConsumers属性就是为了指定spring内部可以创建MessageConsumer的最大个数;当messageConsumer实例被创建后,将会封装在一个Runner接口并交给taskExecutor来调度;如果consumer在一直没有收到消息,则会被置为“idle”并从consumer列表中移除;如果所有的consumer都处于active状态,则会创建新的consumer实例直到达到maxConcurrentConsumers个数上限。通常taskExecutor的线程池容量稍大于concurrentConsumer。

 

    maxMessagesPerTask: 每个consumer所消费的消息个数,因为每个consumer都会独占一个Thread[consumer.receive()是阻塞的],当consumer消费maxMessagesPerTask个消息后,它就会退出线程,由taskExecutor重新调度。

 

    receiveTimeout: 内部的consumer在receive方法中阻塞的时间。默认为1秒。

 

    recoveryInterval:  当消息消费时,底层connection异常而无法继续,listener需要等待恢复的时间间隔。默认为5000ms。

 

    concurrency: “concurrentConsumers”与“maxConcurrentConsumers”两个参数的简写方式,格式为“5-10”,则表示concurrentConsumers为5,maxConcurrentConsumers为10。

 

    sessionTransacted: Session是否为事务类型。默认为false。

 

    messageSelector: 消息选择器。如果你希望此listener只接受某种特性的消息,可以通过指定selector的方式来过滤消息。

 

    pubSubDomain: 此消费通道是否为Topic,默认为“false”。所有与Topic有关的属性,只有在pubSubDomain为true的情况下生效。

 

    pubSubNoLocal: 对于Topic而言,此消费者是否消费本地消息。所谓本地消息,就是当Consumer与Producer公用底层一个Connection时,那么Producer发送的消息,相对于此Consumer就是本地消息。在pubSubDomain为true时有效。

 

    subscriptionDurable: 是否为“耐久性”订阅者。在pubSubDomain为true时有效。默认为false。

 

    durableSubscriptionName: 耐久订阅者名称,每个clientId下可以有多个耐久订阅者,但是他们必须有不同的名字。默认为className。

 

    errorHandler: 当listener.onMessage方法抛出异常时,异常该如何处理。

 

    autoStartup: 消费者是否自动启动,默认为true,那么在messageContainer实例化后,将会启动consumer(即调用Connection.start());如果为false,那些开发者需要在合适的时机手动启动。

    clientId: 对于Topic订阅者而言,此参数必备。

 

    sessionAcknowledgeMode: ACK MODE,默认为AUTO。spring-jms做了一件非常遗憾的事情,如果指定了sessionTransacted为true,那么在调用listener.onMessage()方法之后,则会立即提交事务(session.commit()),即使开发者使用了sessionAwareMessageListener,所以开发者无法实现基于事务的“批量”确认机制。如果开发者指定为CLIENT_ACK,那么spring-JMS将会在onMessage方法返回后立即调用message.acknowlege()方法,所以开发者自己是否确认以及何时确认,将没有意义,如果不希望spring来确认消息,只能在onMessage方法中通过抛出异常的方式。 其中“1”表示AUTO_ACKNOWLEDGE,“2”为CLIENT_ACKNOWLEDGE = 2,“3”为 DUPS_OK_ACKNOWLEDGE = 3。

转载于:https://www.cnblogs.com/wangjian1990/p/6689703.html

相关文章:

  • HTML起步——学习2
  • 1.Zabbix 3.0 基础
  • bzoj4823[CQOI2017]老C的方块
  • 工资计算(用SQL来计算)
  • 电梯演说模板练习
  • 优达学城-并行编程-Unit2 硬件内存
  • ajax,json
  • 修饰符
  • 网络中数据的传输过程
  • 如何把你的Windows PC变成瘦客户机
  • codevs 1086 栈(Catalan数)
  • Java设计模式---策略模式
  • 2017BUPT校赛 H-Black-white Tree
  • hibernate之复合主键作为外键的相关配置
  • js中match函数方法
  • [js高手之路]搞清楚面向对象,必须要理解对象在创建过程中的内存表示
  • 4月23日世界读书日 网络营销论坛推荐《正在爆发的营销革命》
  • Angular数据绑定机制
  • ES6简单总结(搭配简单的讲解和小案例)
  • ES学习笔记(10)--ES6中的函数和数组补漏
  • JavaScript 是如何工作的:WebRTC 和对等网络的机制!
  • JavaScript创建对象的四种方式
  • MySQL主从复制读写分离及奇怪的问题
  • Puppeteer:浏览器控制器
  • SpiderData 2019年2月23日 DApp数据排行榜
  • 表单中readonly的input等标签,禁止光标进入(focus)的几种方式
  • 得到一个数组中任意X个元素的所有组合 即C(n,m)
  • 观察者模式实现非直接耦合
  • 如何正确配置 Ubuntu 14.04 服务器?
  • 详解NodeJs流之一
  • 因为阿里,他们成了“杭漂”
  • 怎么将电脑中的声音录制成WAV格式
  • const的用法,特别是用在函数前面与后面的区别
  • 带你开发类似Pokemon Go的AR游戏
  • ​【原创】基于SSM的酒店预约管理系统(酒店管理系统毕业设计)
  • #define,static,const,三种常量的区别
  • #define用法
  • $ is not function   和JQUERY 命名 冲突的解说 Jquer问题 (
  • (Forward) Music Player: From UI Proposal to Code
  • (solr系列:一)使用tomcat部署solr服务
  • (笔试题)合法字符串
  • (四)JPA - JQPL 实现增删改查
  • (一)使用IDEA创建Maven项目和Maven使用入门(配图详解)
  • ***php进行支付宝开发中return_url和notify_url的区别分析
  • .describe() python_Python-Win32com-Excel
  • .NET 表达式计算:Expression Evaluator
  • .NET6使用MiniExcel根据数据源横向导出头部标题及数据
  • .NET与 java通用的3DES加密解密方法
  • @Autowired和@Resource的区别
  • @SuppressWarnings(unchecked)代码的作用
  • [ C++ ] STL---仿函数与priority_queue
  • [ HTML + CSS + Javascript ] 复盘尝试制作 2048 小游戏时遇到的问题
  • [ 攻防演练演示篇 ] 利用通达OA 文件上传漏洞上传webshell获取主机权限
  • [1159]adb判断手机屏幕状态并点亮屏幕
  • [2019.3.20]BZOJ4573 [Zjoi2016]大森林