当前位置: 首页 > news >正文

【微服务40】分布式事务Seata源码解析八:AT模式下本地事务的执行流程

文章目录

  • 一、前言
  • 二、本地事务SQL执行流程
    • 1、DataSourceProxy 数据库资源代理入口
      • 1)哪里使用了DataSourceProxy?
      • 2)SeataDataSourceProxy
    • 2、本地事务SQL的执行流程(execute)
      • 1)执行本地事务SQL的入口
      • 2)执行本地事务SQL逻辑
        • 1> 构建before image
        • 2> 执行SQL
        • 3> 构建after image
        • 4> 预处理undo log
    • 3、本地事务SQL的提交(commit)
      • 1)LockRetryPolicy重试机制
      • 2)本地事务提交流程
  • 三、总结

一、前言

至此,seata系列的内容包括:

  1. can not get cluster name in registry config ‘service.vgroupMapping.xx‘, please make sure registry问题解决;
  2. Seata Failed to get available servers: endpoint format should like ip:port 报错原因/解决方案汇总版(看完本文必解决问题)
  3. Seata json decode exception, Cannot construct instance of java.time.LocalDateTime报错原因/解决方案最全汇总版
  4. 【微服务 31】超细的Spring Cloud 整合Seata实现分布式事务(排坑版)
  5. 【微服务 32】Spring Cloud整合Seata、Nacos实现分布式事务案例(巨细排坑版)【云原生】
  6. 【微服务33】分布式事务Seata源码解析一:在IDEA中启动Seata Server
  7. 【微服务34】分布式事务Seata源码解析二:Seata Server启动时都做了什么
  8. 【微服务35】分布式事务Seata源码解析三:从Spring Boot特性来看Seata Client 启动时都做了什么
  9. 【微服务36】分布式事务Seata源码解析四:图解Seata Client 如何与Seata Server建立连接、通信
  10. 【微服务37】分布式事务Seata源码解析五:@GlobalTransactional如何开启全局事务
  11. 【微服务38】分布式事务Seata源码解析六:全局/分支事务分布式ID如何生成?序列号超了怎么办?时钟回拨问题如何处理?
  12. 【微服务39】分布式事务Seata源码解析七:图解Seata事务执行流程之开启全局事务

Seata最核心的全局事务执行流程,前面我们已经聊到了Seata全局事务的开启,本文接着聊Seata全局事务中执行具体业务操作时,DB操作是如何执行的(含:全局锁keys、undologs的构建(AT模式))?

在这里插入图片描述

二、本地事务SQL执行流程

全局事务的整体执行流程体现在TransactionalTemplate#execute()方法中:

在这里插入图片描述

具体代码 和 注释:

public Object execute(TransactionalExecutor business) throws Throwable {
    // 1. Get transactionInfo
    TransactionInfo txInfo = business.getTransactionInfo();
    if (txInfo == null) {
        throw new ShouldNeverHappenException("transactionInfo does not exist");
    }
    // 1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'.
    // 获取当前事务,根据ThreadLocal,获取当前线程本地变量副本中的xid,进而判断是否存在一个全局事务
    // 刚开始一个全局事务时,肯定是没有全局事务的
    GlobalTransaction tx = GlobalTransactionContext.getCurrent();

    // 1.2 Handle the transaction propagation.
    // 从全局事务的配置里 获取事务传播级别,默认是REQUIRED(如果存在则加入,否则开启一个新的)
    Propagation propagation = txInfo.getPropagation();
    SuspendedResourcesHolder suspendedResourcesHolder = null;
    try {
        // 根据事务的隔离级别做不同的处理
        switch (propagation) {
            case NOT_SUPPORTED:
                // If transaction is existing, suspend it.
                if (existingTransaction(tx)) {
                    // 事务存在,则挂起事务(默认将xid从RootContext中移除)
                    suspendedResourcesHolder = tx.suspend();
                }
                // Execute without transaction and return.
                return business.execute();
            case REQUIRES_NEW:
                // If transaction is existing, suspend it, and then begin new transaction.
                if (existingTransaction(tx)) {
                    suspendedResourcesHolder = tx.suspend();
                    tx = GlobalTransactionContext.createNew();
                }
                // Continue and execute with new transaction
                break;
            case SUPPORTS:
                // If transaction is not existing, execute without transaction.
                if (notExistingTransaction(tx)) {
                    return business.execute();
                }
                // Continue and execute with new transaction
                break;
            case REQUIRED:
                // If current transaction is existing, execute with current transaction,
                // else continue and execute with new transaction.
                break;
            case NEVER:
                // If transaction is existing, throw exception.
                if (existingTransaction(tx)) {
                    throw new TransactionException(
                        String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s"
                                , tx.getXid()));
                } else {
                    // Execute without transaction and return.
                    return business.execute();
                }
            case MANDATORY:
                // If transaction is not existing, throw exception.
                if (notExistingTransaction(tx)) {
                    throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
                }
                // Continue and execute with current transaction.
                break;
            default:
                throw new TransactionException("Not Supported Propagation:" + propagation);
        }

        // 1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'.
        if (tx == null) {
            // 创建全局事务(角色为事务发起者),并关联全局事务管理器
            tx = GlobalTransactionContext.createNew();
        }

        // set current tx config to holder
        GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);

        try {
            // 2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC,
            //    else do nothing. Of course, the hooks will still be triggered.
            // 开启全局事务,如果事务角色是'GlobalTransactionRole.Launcher',发送开始事务请求到seata-server(TC)
            beginTransaction(txInfo, tx);

            Object rs;
            try {
                // Do Your Business
                // 执行业务方法,把全局事务ID通过 MVC拦截器 / dubbo filter传递到后面的分支事务;
                // 每个分支事务都会去运行
                rs = business.execute();
            } catch (Throwable ex) {
                // 3. The needed business exception to rollback.
                // 如果全局事务执行发生了异常,则回滚;
                completeTransactionAfterThrowing(txInfo, tx, ex);
                throw ex;
            }

            // 4. everything is fine, commit.
            // 全局事务和分支事务运行无误,提交事务;
            commitTransaction(tx);

            return rs;
        } finally {
            //5. clear
            // 全局事务完成之后做一些清理工作
            resumeGlobalLockConfig(previousConfig);
            triggerAfterCompletion();
            cleanUp();
        }
    } finally {
        // If the transaction is suspended, resume it.
        if (suspendedResourcesHolder != null) {
            // 如果有挂起的全局事务,则恢复全局事务
            tx.resume(suspendedResourcesHolder);
        }
    }
}

在前一篇文章: 【微服务39】分布式事务Seata源码解析七:图解Seata事务执行流程之开启全局事务,我们已经聊到了开启全局事务,本文继续聊开启全局事务之后,本地事务中的SQL执行流程。

在这里插入图片描述

1、DataSourceProxy 数据库资源代理入口

在Spring Cloud 整合Seata实现分布式事务一文中有聊到Spring Cloud 集成Seata 的AT模式,需要写一个配置类DataSourceConfig,其中会注入一个Bean(DataSourceProxy):

在这里插入图片描述

到这里,博主有一个问题:注入DataSourceProxy到Spring容器中之后,哪里会用到它?执行数据增删改查时如何切换到代理数据源?

1)哪里使用了DataSourceProxy?

在这里插入图片描述

从源码来看,有一个Spring AOP抽象类AbstractAutoProxyCreator的子类SeataAutoDataSourceProxyCreator

Spring 通过 AbstractAutoProxyCreator来创建 AOP 代理,其实现了BeanPostProcessor 接口,在 bean 初始化完成之后会创建它的代理,让后将代理对象增加到Spring容器。

在Seata 中,SeataAutoDataSourceProxyCreator的主要作用是为数据源DataSource添加Advisor,当数据源执行操作时,便会进入到SeataAutoDataSourceProxyAdvice类中处理;

在这里插入图片描述

因此,当数据源执行CRUD操作时,由于添加了AOP代理,会进入到SeataAutoDataSourceProxyAdvice#invoke()方法中:

在这里插入图片描述

咦,这里没有DataSourceProxy呀,只有SeataDataSourceProxy,从命名来看,这俩类总感觉有点关系!

2)SeataDataSourceProxy

在这里插入图片描述

从DataSourceProxy类的继承结构来看,DataSourceProxy实现了SeataDataSourceProxy接口;因此SeataAutoDataSourceProxyAdvice#invoke()方法中动态代理类实际就是DataSourceProxy

2、本地事务SQL的执行流程(execute)

1)执行本地事务SQL的入口

在这里插入图片描述
JDBC的执行流程:

  1. 第一步:注册驱动;
  2. 第二步:获取与数据库的连接Connection;
  3. 第三步:获取数据库操作对象Statement;
  4. 第四步:执行sql语句(DQL、DML…),并且返回结果集;
  5. 第五步:处理查询结果集;
  6. 第六步:释放资源、关闭连接;
try {
    //加载数据库驱动
    Class.forName("com.mysql.cj.jdbc.Driver");
} catch (ClassNotFoundException e) {
    // do something
}
Connection conn = DriverManager.getConnection(URL, USER_NAME, PASSWORD);
PreparedStatement pst = conn.prepareStatement("update user set name=? where id = ?");
pst.setString(1, "bobDog");
pst.setInt(2, 1);
int updateRes = pst.executeUpdate();
if (updateRes > 0) {
    System.out.println("更新成功!");
}

Seata代理的数据库资源DataSource底层也是JDBC操作数据库,所以也需要先获取数据库连接Connection、再根据数据库连接获取数据库操作对象Statement、接着再通过Statement#execute()执行SQL。在Seata中的表现为:

  1. 先获取seata代理的数据库连接ConnectionProxy;
    在这里插入图片描述
  2. 再根据ConnectionProxy获取一个数据库操作对象 StatementProxyPreparedStatementProxy
    在这里插入图片描述
  3. 然后再利用数据库操作对象 StatementProxyPreparedStatementProxy 的execute() 或 executeUpdate() 方法执行SQL语句。
    在这里插入图片描述
    在这里插入图片描述

StatementProxyPreparedStatementProxy 增强了所有的execute方法,由ExecuteTemplate选择需要的Executor执行来sql。

下面以常见的更新操作(PreparedStatementProxy#executeUpdate())为例:

ExecuteTemplate#execute()重载方法调用链路如下:

在这里插入图片描述

public static <T, S extends Statement> T execute(List<SQLRecognizer> sqlRecognizers,
                                                 StatementProxy<S> statementProxy,
                                                 StatementCallback<T, S> statementCallback,
                                                 Object... args) throws SQLException {
    // 没获取到全局锁,并且事务模式不是AT
    if (!RootContext.requireGlobalLock() && BranchType.AT != RootContext.getBranchType()) {
        // Just work as original statement
        return statementCallback.execute(statementProxy.getTargetStatement(), args);
    }

    // 获取DB的类型
    String dbType = statementProxy.getConnectionProxy().getDbType();
    if (CollectionUtils.isEmpty(sqlRecognizers)) {
        sqlRecognizers = SQLVisitorFactory.get(
                statementProxy.getTargetSQL(),
                dbType);
    }
    Executor<T> executor;
    if (CollectionUtils.isEmpty(sqlRecognizers)) {
        executor = new PlainExecutor<>(statementProxy, statementCallback);
    } else {
        if (sqlRecognizers.size() == 1) {
            SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
            // 数据库操作类型
            switch (sqlRecognizer.getSQLType()) {
                case INSERT:
                    executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType,
                                new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class},
                                new Object[]{statementProxy, statementCallback, sqlRecognizer});
                    break;
                case UPDATE:
                    executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                    break;
                case DELETE:
                    executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                    break;
                case SELECT_FOR_UPDATE:
                    executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                    break;
                case INSERT_ON_DUPLICATE_UPDATE:
                    switch (dbType) {
                        case JdbcConstants.MYSQL:
                        case JdbcConstants.MARIADB:
                            executor =
                                new MySQLInsertOrUpdateExecutor(statementProxy, statementCallback, sqlRecognizer);
                            break;
                        default:
                            throw new NotSupportYetException(dbType + " not support to INSERT_ON_DUPLICATE_UPDATE");
                    }
                    break;
                default:
                    executor = new PlainExecutor<>(statementProxy, statementCallback);
                    break;
            }
        } else {
            executor = new MultiExecutor<>(statementProxy, statementCallback, sqlRecognizers);
        }
    }
    T rs;
    try {
        // 通过Executor真正的执行
        rs = executor.execute(args);
    } catch (Throwable ex) {
        if (!(ex instanceof SQLException)) {
            // Turn other exception into SQLException
            ex = new SQLException(ex);
        }
        throw (SQLException) ex;
    }
    return rs;
}
  1. 如果当前事务不需要获取全局锁,并且不是AT模式,则以original statement的方式执行。默认Seata Client层面不需要获取全局锁,事务模式是AT模式。
  2. 获取到的DB类型,比如MySQL、Oracle…,博主的项目DBType是MYSQL。
  3. 获取SQL DML类型,并根据DML类型,选择不同的Executor。这里可以看做是策略模式。

因为示例是Update类型,所以最终选择的Executor是UpdateExecutor。

2)执行本地事务SQL逻辑

UpdateExecutor#execute()方法中会执行本地事务SQL,UpdateExecutor的类继承图如下:

在这里插入图片描述

除了数据更新前后的Image构造体现在UpdateExecutor类的方法中,其余方法均在其父类BaseTransactionalExecutor中,包括execute()方法。

@Override
public T execute(Object... args) throws Throwable {
    // 从全局事务上下文中获取xid
    String xid = RootContext.getXID();
    if (xid != null) {
        // 将xid绑定到ConnectionContext中,后续提交本地事务时会用到
        statementProxy.getConnectionProxy().bind(xid);
    }

    // RootContext.requireGlobalLock()检查是否需要全局锁,默认不需要
    statementProxy.getConnectionProxy().setGlobalLockRequire(RootContext.requireGlobalLock());
    return doExecute(args);
}

开始执行本地事务SQL时:

  1. 首先从全局事务上下文RootContext中获取到xid,如果存在全局事务xid,则将xid绑定到数据库连接的上下文ConnectionContext中;
  2. 从全局事务上下文RootContext获取是否全局锁标识,默认不需要;如果需要获取全局锁,则将数据库连接上下文ConnectionContext中的isGlobalLockRequire设置为true;
  3. 调用doExecute()方法真正开始执行SQL;

UpdateExecutor#doExecutor()方法:

在这里插入图片描述

开启了全局事务之后,DML语句的本地事务不会自动提交。

即使自动提交没有关闭,AbstractDMLBaseExecutor#doExecute(Object… args)方法中也会先将其关闭,然后再以非自动提交的方式执行SQL,走ConnectionProxy提交本地事务,然后再将自动提交设置为true;这一块逻辑体现在executeAutoCommitTrue()方法中:

protected T executeAutoCommitTrue(Object[] args) throws Throwable {
    ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
    try {
        connectionProxy.changeAutoCommit();
        return new LockRetryPolicy(connectionProxy).execute(() -> {
            T result = executeAutoCommitFalse(args);
            connectionProxy.commit();
            return result;
        });
    } catch (Exception e) {
        // when exception occur in finally,this exception will lost, so just print it here
        LOGGER.error("execute executeAutoCommitTrue error:{}", e.getMessage(), e);
        if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) {
            connectionProxy.getTargetConnection().rollback();
        }
        throw e;
    } finally {
        connectionProxy.getContext().reset();
        connectionProxy.setAutoCommit(true);
    }
}

正常情况下都是直接以非自动提交的方式执行,即执行executeAutoCommitFalse()方法:

protected T executeAutoCommitFalse(Object[] args) throws Exception {
    if (!JdbcConstants.MYSQL.equalsIgnoreCase(getDbType()) && isMultiPk()) {
        throw new NotSupportYetException("multi pk only support mysql!");
    }
    // 根据SQL语句构建before image,目标SQL执行之前的数据镜像:从数据库根据ID主键等信息查询出更新前的数据;
    TableRecords beforeImage = beforeImage();
    // 真正的去执行SQL语句,但是本地事务还没有提交
    T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
    int updateCount = statementProxy.getUpdateCount();
    if (updateCount > 0) {
        // 目标SQL执行之后的数据镜像:从数据库根据ID主键等信息查询出更新后的数据;
        TableRecords afterImage = afterImage(beforeImage);
        // 准备好undo log数据
        prepareUndoLog(beforeImage, afterImage);
    }
    return result;
}

在这里插入图片描述

由于AbstractDMLBaseExecutor提供了公用的executeAutoCommitFalse()给Insert、Delete、Update类型的Executor使用,所以无论是Insert、Delete还是Update操作都会走AbstractDMLBaseExecutor#executeAutoCommitFalse()方法执行SQL。不过MySQL的MySQLInsertOrUpdateExecutor是个个例,其执行SQL的逻辑由自己实现(有兴趣可以自己看一下MySQLInsertOrUpdateExecutor)。

以非自动提交执行SQL的流程如下:

  1. beforeImage() – 根据SQL语句构建before image,查询目标sql执行前的数据快照;
    • Update、Delete操作从数据库根据ID主键等信息查询出更新前的数据;
    • Insert操作直接返回空的TableRecords,其中只包含TableMeta,没有数据记录;
  2. 执行SQL语句,但是本地事务还没有提交;
  3. afterImage() – 构建after image,查询目标SQL执行之后的数据快照;
    • Insert、Update操作从数据库根据ID主键等信息查询出更新后的数据;
    • Delete操作直接返回空的TableRecords,其中只包含TableMeta,没有数据记录;
  4. prepareUndoLog(beforeImage, afterImage) --> 将before image 和 after image合并作为回滚日志undo log,保存到当前数据库连接上下文ConnectionContext中。
    其中还包括构建当前本地事务要占用所有全局锁key信息,然后将其保存到当前数据库连接上下文ConnectionContext中。

下面就这几步展开看一看;

1> 构建before image

此处依旧以Update为例:

在这里插入图片描述

点个关注、订阅一下专栏(https://blog.csdn.net/saintmm/category_11953405.html),具体细节见下下篇文章(【微服务42】分布式事务Seata源码解析十:AT模式下如何构建undo log日志)

2> 执行SQL

在这里插入图片描述
最终使用源Statement执行SQL;

3> 构建after image

执行完SQL之后,再构建SQL查询出当前最新的数据记录作为after image;

在这里插入图片描述

点个关注、订阅一下专栏(https://blog.csdn.net/saintmm/category_11953405.html),具体细节见下下篇文章(【微服务42】分布式事务Seata源码解析十:AT模式下如何构建undo log日志)

4> 预处理undo log

将before image 和 after image合并作为回滚日志undo log,存储到当前数据库连接上下文ConnectionContext中。

protected void prepareUndoLog(TableRecords beforeImage, TableRecords afterImage) throws SQLException {
    if (beforeImage.getRows().isEmpty() && afterImage.getRows().isEmpty()) {
        return;
    }
    if (SQLType.UPDATE == sqlRecognizer.getSQLType()) {
        if (beforeImage.getRows().size() != afterImage.getRows().size()) {
            throw new ShouldNeverHappenException("Before image size is not equaled to after image size, probably because you updated the primary keys.");
        }
    }
    ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();

    TableRecords lockKeyRecords = sqlRecognizer.getSQLType() == SQLType.DELETE ? beforeImage : afterImage;
    // 1、构建全局锁key信息,针对更新的一批数据主键ID构建这批数据的全局锁key
    // 例如:table_name:id_1101
    String lockKeys = buildLockKey(lockKeyRecords);
    if (null != lockKeys) {
        // 将lockKeys信息保存到ConnectionContext中,在注册分支事务时,再将全局锁信息放入到TC中进行检查、存储
        connectionProxy.appendLockKey(lockKeys);

        // 2、构建undo log
        SQLUndoLog sqlUndoLog = buildUndoItem(beforeImage, afterImage);
        // 将undo log信息保存到ConnectionContext中
        connectionProxy.appendUndoLog(sqlUndoLog);
    }
}

点个关注、订阅一下专栏(https://blog.csdn.net/saintmm/category_11953405.html),具体细节见下下篇文章(【微服务42】分布式事务Seata源码解析十:AT模式下如何构建undo log日志)

由于关闭了AutoCommit,所以在Statement.execute()执行完SQL之后,需要“手动”提交本地事务。

3、本地事务SQL的提交(commit)

回到ConnectionProxy#commit()方法,这里是“手动”提交本地事务的入口;

@Override
public void commit() throws SQLException {
    try {
        // 由LockRetryPolicy负责提交事务,LockRetryPolicy中包含全局锁的概念,支持retry重试策略
        lockRetryPolicy.execute(() -> {
            doCommit();
            return null;
        });
    } catch (SQLException e) {
        if (targetConnection != null && !getAutoCommit() && !getContext().isAutoCommitChanged()) {
            rollback();
        }
        throw e;
    } catch (Exception e) {
        throw new SQLException(e);
    }
}

本地事务的提交又会委托给LockRetryPolicy的execute方法来执行;

1)LockRetryPolicy重试机制

LockRetryPolicy是ConnectionProxy的静态内部类,其中包含了全局锁的概念,支持retry策略,当出现全局锁冲突时支持多次重试获取全局锁。

在这里插入图片描述

默认情况下execute()方法中:

  • LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT为TRUE,可以通过配置client.rm.lock.retryPolicyBranchRollbackOnConflict=false属性改变;
  • connection.getContext().isAutoCommitChanged()为FALSE;

所以默认情况下,都会走重试获取全局锁的逻辑:doRetryOnLockConflict()方法。(当然可以选择开启自动提交事务、并设置属性client.rm.lock.retryPolicyBranchRollbackOnConflict=true,这样便不会走重试获取全局锁逻辑。)

protected <T> T doRetryOnLockConflict(Callable<T> callable) throws Exception {
    LockRetryController lockRetryController = new LockRetryController();
    while (true) {
        try {
            return callable.call();
        } catch (LockConflictException lockConflict) {
            // 出现全局锁冲突,回滚本地事务
            onException(lockConflict);
            // AbstractDMLBaseExecutor#executeAutoCommitTrue the local lock is released
            if (connection.getContext().isAutoCommitChanged()
                    && lockConflict.getCode() == TransactionExceptionCode.LockKeyConflictFailFast) {
                lockConflict.setCode(TransactionExceptionCode.LockKeyConflict);
            }
            // 线程睡眠10ms,然后再重试,超过重试次数,抛出异常结束流程
            lockRetryController.sleep(lockConflict);
        } catch (Exception e) {
            // 出现非全局锁冲突的异常,则直接报错返回
            onException(e);
            throw e;
        }
    }
}

doRetryOnLockConflict()方法中:

  • 如果因为全局锁冲突导致提交本地事务失败,先回滚本地事务,然后会判断重试次数(lockRetryTimes,默认30次)再进行重试,重试之前会让线程睡眠一段时间(lockRetryInterval,默认10ms)。如果重试次数已经够了,则直接抛出异常结束流程。
    在这里插入图片描述
  • 如果因为其他异常(包括超过重试次数)导致提交本地事务失败,直接回滚本地事务、抛出异常结束流程。

上面的ConnectionProxy#onException()方法中负责回滚本地事务、清理当前连接的ConnectionContext中的undo log信息、全局锁keys信息;

在这里插入图片描述

了解完了全局锁冲突引起的重试机制,下面接着看本地事务的提交流程。

2)本地事务提交流程

在这里插入图片描述

LockRetryPolicy#execute()方法中会运行方法的入参Callable,在ConnectionProxy#commit()方法中传入的到LockRetryPolicy#execute()方法中的Callable为:

() -> {
        doCommit();
        return null;
    }

doCommit()方法:

private void doCommit() throws SQLException {
    // 当前DML操作在全局事务中时,判定条件:ConnectionContext中包含xid
    if (context.inGlobalTransaction()) {
        processGlobalTransactionCommit();
    } else if (context.isGlobalLockRequire()) {
        // 如果使用了@GlobalLock,需要获取全局锁
        processLocalCommitWithGlobalLocks();
    } else {
        // 不在分布式事务中,则以原生connection提交本地事务
        targetConnection.commit();
    }
}

doCommit()方法中分三种情况进行不同的处理:

  1. 如果当前DML操作在全局事务中,即:当前连接的ConnectionContext中包含xid,则以处理全局事务方式(processGlobalTransactionCommit()提交本地事务;
  2. 如果使用了@GlobalLock,需要获取全局锁,再以原生connection提交本地事务;
    在这里插入图片描述
  3. 否则如果事务不在分布式事务中,则以原生connection提交本地事务;
    在这里插入图片描述

正常我们使用分布式事务,一般肯定是要以全局事务的方式执行DML操作;即:默认会进入到processGlobalTransactionCommit():

private void processGlobalTransactionCommit() throws SQLException {
    try {
        // 向远程的TC中注册分支事务,并检查、增加全局行锁
        register();
    } catch (TransactionException e) {
        // 出现异常时,回滚本地事务 再重试。
        // 大多数情况是因为全局锁冲突走到这里。
        recognizeLockKeyConflictException(e, context.buildLockKeys());
    }
    try {
        // 回滚日志管理组件,持久化undo log
        UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
        // 提交本地事务
        targetConnection.commit();
    } catch (Throwable ex) {
        LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
        // 上报分支事务执行失败,用于监控
        report(false);
        throw new SQLException(ex);
    }
    // 上报分支事务执行成功,默认不会上报
    if (IS_REPORT_SUCCESS_ENABLE) {
        report(true);
    }
    // 重置连接的ConnectionContext
    context.reset();
}

以全局事务的方式提交本地事务会做四件事:

  1. 通过netty请求TC,注册分支事务,并检查、增加全局行锁;
    • 如果出现异常,则回滚本地事务。若异常类型为全局锁冲突LockConflictException,则进入重试策略;其他异常类型则直接抛出SQLException
  2. 将执行SQL时保存到ConnectionContext中的undo log 回滚日志 保存到DB;
  3. 提交本地事务,真正将业务数据和回滚日志 持久化到DB;
  4. 向TC上报本地事务提交结果;
    • 如果持久化undo log 或 提交本地事务出现异常,则上报分支事务执行失败;
    • 如果本地事务提交成功,上报分支事务执行成功;默认并不会上报。

最后,清空当前数据库连接的ConnectionContext。

点个关注、订阅一下专栏(https://blog.csdn.net/saintmm/category_11953405.html);

  • 分支事务的注册细节见下一篇文章(【微服务41】分布式事务Seata源码解析九:分支事务如何注册到全局事务);

  • undo log持久化细节 见下下篇文章(【微服务42】分布式事务Seata源码解析十:AT模式下如何构建undo log日志);

三、总结

AT模式下本地事务的SQL执行流程,即RM的分支事务执行流程,主要包括一下几步:

  • 开始执行本地事务的SQL之前,从全局事务上下文RootContext中获取到xid,然后将xid绑定到数据库连接的上下文ConnectionContext中;
  1. 构建before image,查询目标sql执行前的数据快照;
  2. 执行目标SQL语句,但是本地事务还没有提交;
  3. 构建after image,查询目标SQL执行之后的数据快照;
  4. 将before image 和 after image合并作为回滚日志undo log,保存到当前数据库连接上下文ConnectionContext中;
  5. 构建当前本地事务要占用所有全局锁key信息,然后将其保存到当前数据库连接上下文ConnectionContext中;
  6. 通过netty请求TC,注册分支事务,并检查、增加全局行锁;这里可能会出现全局锁冲突 导致注册分支事务失败,所以有一个重试机制;
  7. 将执行SQL时保存到ConnectionContext中的undo log 回滚日志 保存到DB(undo_log表);
  8. 提交本地事务;
  9. 向TC上报本地事务提交结果;
  10. 最后清空当前数据库连接的ConnectionContext,恢复现场。

整个SQL提交可以理解为两阶段提交:

  • 一阶段:先注册分支事务,检查全局锁。
  • 二阶段:插入undolog、提交本地事务。

相关文章:

  • 面试汇总(一)
  • 分布式任务调度XXL-JOB-第二章-SpringBoot集成XXL-JOB
  • Linux系统下查看被杀死进程的信息
  • 粒子群算法PSO求解最大值和最小值案例(超详细注释)
  • LeetCode每日一题——902. 最大为 N 的数字组合
  • Java学习--JDBC
  • C++中GDAL批量创建多个栅格图像文件并批量写入数据
  • 基于maven的spring项目实现登录注册(SSM)
  • 【C++】动态内存管理
  • 波士顿动力再惊艳!机器人大秀男团舞,举手投足人味满满,多次转卖后展示新标签...
  • Python 代码托管到码云平台,原来这么简单
  • 【MATLAB教程案例26】图像特征点提取算法matlab仿真与分析——sift,surf,kaze,corner,BRISK等
  • 前端深拷贝与浅拷贝(附实现方法)
  • C#工业生产线MES系统,源代码分享
  • 2022软考高项十大领域知识整理(四)-人力资源管理、干系人管理、采购管理
  • Flannel解读
  • HTTP那些事
  • JavaScript 一些 DOM 的知识点
  • JavaScript实现分页效果
  • Java比较器对数组,集合排序
  • nfs客户端进程变D,延伸linux的lock
  • nodejs:开发并发布一个nodejs包
  • Odoo domain写法及运用
  • React-flux杂记
  • 面试总结JavaScript篇
  • 在Mac OS X上安装 Ruby运行环境
  • elasticsearch-head插件安装
  • 翻译 | The Principles of OOD 面向对象设计原则
  • ​MySQL主从复制一致性检测
  • !! 2.对十份论文和报告中的关于OpenCV和Android NDK开发的总结
  • ### Error querying database. Cause: com.mysql.jdbc.exceptions.jdbc4.CommunicationsException
  • (2)nginx 安装、启停
  • (编程语言界的丐帮 C#).NET MD5 HASH 哈希 加密 与JAVA 互通
  • (附源码)springboot 智能停车场系统 毕业设计065415
  • (深入.Net平台的软件系统分层开发).第一章.上机练习.20170424
  • (原创) cocos2dx使用Curl连接网络(客户端)
  • (转)JVM内存分配 -Xms128m -Xmx512m -XX:PermSize=128m -XX:MaxPermSize=512m
  • **Java有哪些悲观锁的实现_乐观锁、悲观锁、Redis分布式锁和Zookeeper分布式锁的实现以及流程原理...
  • .helper勒索病毒的最新威胁:如何恢复您的数据?
  • .net framework4与其client profile版本的区别
  • .net 写了一个支持重试、熔断和超时策略的 HttpClient 实例池
  • .NET/C# 获取一个正在运行的进程的命令行参数
  • .net获取当前url各种属性(文件名、参数、域名 等)的方法
  • .NET使用HttpClient以multipart/form-data形式post上传文件及其相关参数
  • .net下简单快捷的数值高低位切换
  • @KafkaListener注解详解(一)| 常用参数详解
  • [ 常用工具篇 ] AntSword 蚁剑安装及使用详解
  • [APUE]进程关系(下)
  • [BUG] Authentication Error
  • [BZOJ1053][HAOI2007]反素数ant
  • [CSDN首发]鱿鱼游戏的具体玩法详细介绍
  • [English]英语积累本
  • [Hadoop in China 2011] 蒋建平:探秘基于Hadoop的华为共有云
  • [HTML]Web前端开发技术12(HTML5、CSS3、JavaScript )——喵喵画网页
  • [Linux] CE知识随笔含Ansible、防火墙、VIM、其他服务