当前位置: 首页 > news >正文

消息队列实现分布式事务

 业务流转图:

搭建环境:activemq + springboot + mybatis + mysql

1、下载activemq配置activemq配置信息(conf/activemq.xml):

2、建表td_order_event,分别在每个服务对应的每个库创建一张临时流转表记录,这边演示创建两边一模一样的表);

CREATE TABLE `td_order_event` (
  `id` tinyint(10) NOT NULL,
  `order_type` tinyint(10) DEFAULT NULL COMMENT '订单类型(0: 创建,1, 已下单,2,已支付  )',
  `process` varchar(255) DEFAULT NULL,
  `content` varchar(500) DEFAULT NULL,
  `create_time` timestamp NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,
  `update_time` timestamp NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '订单中间事件表',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;


3、分别准备两个服务 、搭建环境导入相关依赖;

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.12</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.28</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.6</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.1.22</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>1.1.22</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
        </dependency>
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>2.0.0</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-tomcat</artifactId>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

4、配置生产端链接信息application.yml, 及代码编写;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.jms.Queue;


@Configuration
public class ActiveConfig {

    @Value("${spring.activemq.broker-url}")
    private String brokerUrl;

    @Bean
    public Queue queue() {
        return new ActiveMQQueue("ActiveMQQueue");
    }

    @Bean
    public ActiveMQConnectionFactory connectionFactory() {
        return new ActiveMQConnectionFactory(brokerUrl);
    }
}
@SpringBootApplication
@MapperScan(value = "com.xxx.serviceorder.dao")
@EnableJms
@EnableScheduling
public class ServiceOrderApplication {

    public static void main(String[] args) {
        SpringApplication.run(ServiceOrderApplication.class, args);
    }

}

4.2 编写sql语句, 分别一个查询语句,更新数据;

 4.3 编写定时任务,监听数据;


import com.alibaba.fastjson.JSONObject;
import com.xckk.serviceorder.dao.TdOrderEventDao;
import com.xckk.serviceorder.entity.TdOrderEvent;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import javax.jms.Queue;
import java.util.Date;
import java.util.List;

@Component
public class Produce {
    @Autowired
    private TdOrderEventDao tdOrderEventDao;

    @Autowired
    private Queue queue;
    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;


    @Scheduled(cron = "0/5 * * * * ?")
    @Transactional(rollbackFor = Exception.class)
    public void task() {
        System.out.println(new Date() +"【开始执行】");

        // 查询新建的中间表
        List<TdOrderEvent> tdOrderEvents = tdOrderEventDao.selectOrderEventByType("0");

        for (TdOrderEvent tdOrderEvent : tdOrderEvents) {
            tdOrderEventDao.updateOrderEventById(tdOrderEvent.getId());

            System.out.println(tdOrderEvent.getId() + "数据修改成功");
            //
            jmsMessagingTemplate.convertAndSend(queue, JSONObject.toJSONString(tdOrderEvent));
        }

    }
}

5、编写消费端代码;

 sql语句,利用主键id,确保消息重复;

 5.1 配置mqbean信息


import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;

@Configuration
public class ActiveConfig {

    @Value("${spring.activemq.broker-url}")
    private String brokerUrl;

    @Value("${spring.activemq.user}")
    private String userName;

    @Value("${spring.activemq.password}")
    private String passWord;


    @Bean
    public ActiveMQConnectionFactory connectionFactory(RedeliveryPolicy redeliveryPolicy) {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(userName, passWord, brokerUrl);
        connectionFactory.setRedeliveryPolicy(redeliveryPolicy);
        return connectionFactory;
    }

    /**
     * 重发配置
     * @return
     */
    @Bean
    public RedeliveryPolicy redeliveryPolicy() {
        RedeliveryPolicy policy = new RedeliveryPolicy();
        return policy;
    }

    @Bean
    public JmsListenerContainerFactory jmsListenerContainerFactory(ActiveMQConnectionFactory activeMQConnectionFactory) {
        DefaultJmsListenerContainerFactory containerFactory = new DefaultJmsListenerContainerFactory();
        containerFactory.setConnectionFactory(activeMQConnectionFactory);
        // 1: 自动确认 2:客户端手动确认 3:自动批量确认 4:事务提交并确认
        containerFactory.setSessionAcknowledgeMode(2);
        return containerFactory;
    }


}

业务处理 :处理失败测重试六次,六次都失败则加入死信队列处理;


import com.alibaba.fastjson.JSONObject;
import com.xckk.servicepay.dao.TdOrderEventDao;
import com.xckk.servicepay.entity.TdOrderEvent;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TextMessage;

@Component
public class ConsumerQueue {

    @Autowired
    private TdOrderEventDao tdOrderEventDao;

    @JmsListener(destination = "ActiveMQQueue", containerFactory = "jmsListenerContainerFactory")
    public void receive(TextMessage textMessage, Session session) throws JMSException {
        System.out.println(" 消费的消息:"+textMessage.getText());
        try {
            String text = textMessage.getText();
            TdOrderEvent tdOrderEvent = JSONObject.toJavaObject(JSONObject.parseObject(text), TdOrderEvent.class);
            tdOrderEventDao.insert(tdOrderEvent);

            textMessage.acknowledge();
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("ActiveMQQueue>>> 异常!!!!");
            session.recover();
        }
    }
    /**
     * 死信队列
     *
     * @param text
     */
    @JmsListener(destination = "DLQ.ActiveMQQueue")
    public void receive(String text) {
        System.out.println("处理失败的数据!!!" + text);
    }
}

验证:

1、启动mq 

2、启动两端服务

3、插入一条测试数据

 

数据处理完毕;

4、测试异常信息(比如mq挂掉、消息重复等等)

相关文章:

  • 前端性能优化指标 + 检测工具
  • CubeMx笔记 --pwm输出+输入捕获
  • 轻松玩转树莓派Pico之一、新手上路
  • 目前我国网络安全人才市场状况
  • Redis源码解读之用RedisAe实现一个简单的HTTP服务器
  • 【极简python】第一章 print与变量
  • HAL库与Cubemx\rt-thread Nano系列教程-01-新建HAL工程及移植RT-Nano到Alios Developer Kit
  • 论文阅读_知识蒸馏_MobileBERT
  • No2.搭建基本的资源端解析token(资源服务端)
  • Vue入门【四】-- 事件机制与双向数据绑定
  • 小型超市管理系统的设计与实现 毕业设计-附源码011136
  • R语言缺失时间序列的填充:补齐时间序列数据中所有缺失的时间索引、使用na.locf函数将缺失值NA替换为前序时刻最近的值
  • 26.STM32 SPI通信接口
  • [JS] node.js 入门
  • 卸载mysq并重新安装教程
  • [ JavaScript ] 数据结构与算法 —— 链表
  • Java比较器对数组,集合排序
  • jquery ajax学习笔记
  • js 实现textarea输入字数提示
  • JS题目及答案整理
  • Lucene解析 - 基本概念
  • maya建模与骨骼动画快速实现人工鱼
  • Python连接Oracle
  • SpringCloud集成分布式事务LCN (一)
  • Vue小说阅读器(仿追书神器)
  • Vue学习第二天
  • 个人博客开发系列:评论功能之GitHub账号OAuth授权
  • 经典排序算法及其 Java 实现
  • 爬虫模拟登陆 SegmentFault
  • 前端工程化(Gulp、Webpack)-webpack
  • 如何胜任知名企业的商业数据分析师?
  • 如何实现 font-size 的响应式
  • 时间复杂度与空间复杂度分析
  • 小程序开发中的那些坑
  • ​马来语翻译中文去哪比较好?
  • #绘制圆心_R语言——绘制一个诚意满满的圆 祝你2021圆圆满满
  • ()、[]、{}、(())、[[]]命令替换
  • (Java实习生)每日10道面试题打卡——JavaWeb篇
  • (Redis使用系列) Springboot 在redis中使用BloomFilter布隆过滤器机制 六
  • (论文阅读32/100)Flowing convnets for human pose estimation in videos
  • (十)【Jmeter】线程(Threads(Users))之jp@gc - Stepping Thread Group (deprecated)
  • (转)chrome浏览器收藏夹(书签)的导出与导入
  • (转载)Linux 多线程条件变量同步
  • (轉貼) 寄發紅帖基本原則(教育部禮儀司頒布) (雜項)
  • .NET CF命令行调试器MDbg入门(四) Attaching to Processes
  • .NET/C# 获取一个正在运行的进程的命令行参数
  • .NET大文件上传知识整理
  • .net对接阿里云CSB服务
  • .net分布式压力测试工具(Beetle.DT)
  • .NET基础篇——反射的奥妙
  • .Net接口调试与案例
  • .NET面试题(二)
  • .net项目IIS、VS 附加进程调试
  • .NET项目中存在多个web.config文件时的加载顺序
  • .net中调用windows performance记录性能信息