强一致性分布式事务XA 浅析

一、前言

分布式事务:分布式条件下,多个节点操作的整体事务一致性。

特别是在微服务场景下,业务 A 和业务 B 关联,事务 A 成功,事务 B 失败,由于跨系统, 就会导致不被感知。 此时从整体来看,数据是不一致的。

分布式事务中的两大基本理论:CAP 理论 与 Base 理论。

分布式事务解决方案可以分为:

  • 强一致性分布式事务解决方案:基于 CAP 理论
  • 最终一致性分布式事务解决方案:基于 Base 理论

图片[1] - 强一致性分布式事务XA 浅析 - MaxSSL

  1. 应用程序(AP:参与 DTP 分布式事务模型的应用程序。
  2. 事务管理器(TM:负责协调和管理 DTP 模型中的事务,为应用程序提供编程接口,同时管理资源管理器。
  3. 资源管理器(RM:数据库管理系统或消息服务管理器。

(2)2PC 模型

两阶段提交(Two-phase Commit, 2PC)算法,经常用来实现分布式事务。

2PC 模型两阶段执行流程:

图片[2] - 强一致性分布式事务XA 浅析 - MaxSSL

  1. CanCommit 阶段:询问是否能够执行事务。
  2. PreCommit 阶段:执行事务操作。
  3. doCommit / doRollback 阶段:正式提交事务。

3PC 模型主要解决了 单点故障问题,并减少了事务执行过程中产生的阻塞现象。

二、XA 强一致性分布式事务原理

XA 规范:

  • xa_start: 负责开启或者恢复一个事务分支,并且管理 XID 到调用线程。

  • xa_end: 负责取消当前线程与事务分支的关联。

  • xa_prepare: 询问 RM 是否准备好提交事务分支。

  • —————— 第一阶段提交 —————————

    如果是单机,可以直接跳过 prepare 和第二阶段,输入 one phase commit 事务id 直接进行提交即可。

  • xa_commit: 通知 RM 提交事务分支。

  • xa_rollback: 通知 RM 回滚事务分支。

  • xa_recover: 需要恢复的 XA 事务。

  • —————— 第二阶段提交 —————————

XA 二阶段提交:

  • 一阶段:执行 XA PREPARE 语句。
  • 二阶段:执行 XA COMMIT/ROLLBACK 语句。

XA 协议存在的问题

  1. 同步阻塞问题:一般情况下,不需要调高隔离级别,XA 默认不会改变隔离级别

    全局事务内部包含了多个独立的事务分支,这一组事务分支要不都成功,要不都失败。各个事务分支的 ACID 特性共同构成了全局事务的 ACID 特性。也就是将单个事务分支的支持的 ACID 特性提升一个层次(up a level)到分布式事务的范畴。即使在非分布事务中(即本地事务),如果对操作读很敏感,我们也需要将事务隔离级别设置为 SERIALIZABLE,而对于分布式事务来说,更是如此,可重复读隔离级别不足以保证分布式事务一致性。也就是说,如果我们使用 MySQL 来支持 XA 分布式事务的话,那么最好将事务隔离级别设置为 SERIALIZABLE,地球人都知道 SERIALIZABLE(串行化)是四个事务隔离级别中最高的一个级别,也是执行效率最低的一个级别

  2. 单点故障成熟的 XA 框架需要考虑 TM 的高可用性

    由于协调者的重要性,一旦协调者 TM 发生故障,参与者 RM 会一直阻塞下去。尤其在第二阶段,协调者发生故障,那么所有的参与者还都处于锁定事务资源的状态中,而无法继续完成事务操作。(如果是协调者挂掉,可以重新选举一个协调者,但是无法解决因为协调者宕机导致的参与者处于阻塞状态的问题)

  3. 数据不一致极端情况下,一定有事务失败问题,需要监控和人工处理

    在二阶段提交的阶段二中,当协调者向参与者发送 commit 请求之后,发生了局部网络异常或者在发送 commit 请求过程中协调者发生了故障,这回导致只有一部分参与者接受到了 commit 请求。而在这部分参与者接到 commit 请求之后就会执行 commit 操作。但是其他部分未接到 commit 请求的机器则无法执行事务提交。于是整个分布式系统便出现了数据不一致性的现象。

解决 XA 存在的问题

解决 XA 数据不一致方案:

  1. 日志存储:记录 XA 事务在每个流程中的执行状态。
  2. 自定义事务恢复:通过 XA recovery 命令从资源管理器中获取需要被恢复的事务记录,然后根据 XID 匹配应用程序中存储的日志,根据事务状态进行提交或回滚。

解决事务管理器的单点故障方案:

  1. 去中心化部署:事务管理器嵌套在应用程序里面,不再单独部署。

图片[3] - 强一致性分布式事务XA 浅析 - MaxSSL

(1)MySQLXA 规范的支持

MySQL5.0.3 开始支持 InnoDB 引擎的 XA 分布式事务, MySQL Connector/J5.0.0 版本开始支持 XA

图片[4] - 强一致性分布式事务XA 浅析 - MaxSSL

完整的 XA 事务处理过程:

图片[5] - 强一致性分布式事务XA 浅析 - MaxSSL

MySQL XA 的问题

MySQL <5.7 版本会出现的问题:

  1. 已经 prepare (预提交)的事务,在客户端退出或者服务宕机的时候,二阶段提交 的事务会被回滚。

  2. 在服务器故障重启提交后,相应的 Binlog 被丢失

MySQL 5.6 版本在客户端退出的时候,自动把已经 prepare 的事务回滚了,那么 MySQL 为什么要这样做?

这主要取决于 MysQL 的内部实现,MySQL 5.7 以前的版本,对于 prepare 的事务,MySQL 是不会记录 binlog 的(官方说是减少 fsync,起到了优化的作用)。只有当分布式事务提交的时候才会把前面的操作写入 binloq 信息,所以对于 binloq 来说,分布式事务与普通的事务没有区别,而 prepare 以前的操作信息都保存在连接的 IO CACHE 中,如果这个时候客户端退出了,以前的 binloq 信息都会被丢失,再次重连后允许提交的话,会造成 Binloq 丢失,从而造成主从数据的不一致,所以官方在客户端退出的时候直接把已经 prepare 的事务都回滚了!

MySQL >5.7 版本的优化:

MySQL 对于分布式事务,在 prepare 的时候就完成了写 Binlog 的操作,通过新增一种叫 XA-preparelog-event 的 event 类型来实现,这是与以前版本的主要区别(以前版本 prepare 时不写 Binlog)

(2)手动通过 JDBC 操作 MySQL XA 事务

MySQL Connector/J 从 5.0.0 版本之后开始直接提供对 XA 的支持,也就是提供了 Java 版本 XA 接口的实现。 意味着可以直接通过 Java 代码来执行 MySQL XA 事务。

模拟下订单减库存:

  1. 下订单:创建订单
  2. 扣库存:更新库存数量
-- 数据库如下CREATE DATABASE tx_msg_order;CREATE TABLE `order`  (  `id` bigint(20) NOT NULL COMMENT '主键',  `create_time` datetime(0) NULL DEFAULT CURRENT_TIMESTAMP(0) COMMENT '创建时间',  `order_no` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT '' COMMENT '订单编号',  `product_id` bigint(20) NULL DEFAULT 0 COMMENT '商品id',  `pay_count` int(11) NULL DEFAULT NULL COMMENT '购买数量',  PRIMARY KEY (`id`) USING BTREE) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci COMMENT = '模拟订单' ROW_FORMAT = Dynamic;CREATE DATABASE tx_msg_stock;CREATE TABLE `stock`  (  `id` bigint(11) NOT NULL COMMENT '主键id',  `product_id` bigint(20) NULL DEFAULT 0 COMMENT '商品id',  `total_count` int(11) NULL DEFAULT 0 COMMENT '商品总库存',  PRIMARY KEY (`id`) USING BTREE) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci COMMENT = '模拟库存' ROW_FORMAT = Dynamic;

代码如下:

public class Test {    public static void main(String[] args) throws SQLException {        // 创建订单库 RM实例        Connection orderConnection = DriverManager.getConnection(                "jdbc:mysql://127.0.0.1:3306/tx_msg_order?useUnicode=true&characterEncoding=UTF-8&useOldAliasMetadataBehavior=true&autoReconnect=true&failOverReadOnly=false&useSSL=false",                "test", "test");        // 这里的这个true参数,是说打印出来XA分布式事务的一些日志        XAConnection orderXAConnection = new MysqlXAConnection(                (com.mysql.jdbc.Connection)orderConnection, true);        // 这个XAResource其实你可以认为是RM(Resource Manager)的一个代码中的对象实例        XAResource orderResource = orderXAConnection.getXAResource();        // 创建库存库 的RM实例        Connection stockConnection = DriverManager.getConnection(                "jdbc:mysql://127.0.0.1:3306/tx_msg_stock?useUnicode=true&characterEncoding=UTF-8&useOldAliasMetadataBehavior=true&autoReconnect=true&failOverReadOnly=false&useSSL=false",                "test", "test");        XAConnection stockXAConnection = new MysqlXAConnection(                (com.mysql.jdbc.Connection)stockConnection, true);        XAResource stockResource = stockXAConnection.getXAResource();        // 下面俩东西是分布式事务id(txid)的构成部分        byte[] gtrid = "g12345".getBytes();        int formatId = 1;        try {            // 这是说在分布式事务中的订单库的子事务的标识            // 我们在订单库要执行的操作隶属于分布式事务的一个子事务,子事务有自己的一个标识            byte[] bqual1 = "b00001".getBytes();            Xid xid1 = new MysqlXid(gtrid, bqual1, formatId); // 这个xid代表了订单库中的子事务            // 这就是说通过START和END两个操作,定义好了分布式事务中,订单库中要执行的SQL语句            // 但是这里的SQL绝对不会执行的,只是说先定义好我要在分布式事务中,这个数据库里要执行哪些SQL语句            orderResource.start(xid1, XAResource.TMNOFLAGS);            PreparedStatement orderPreparedStatement = orderConnection.prepareStatement(                    "INSERT INTO `order` (id, create_time, order_no, product_id,  pay_count) " +                            "VALUES (1, NOW(), 1, 1, 1)");            orderPreparedStatement.execute();            orderResource.end(xid1, XAResource.TMSUCCESS);            // 这是说在分布式事务中的库存库的子事务的标识            // 大家看下,库存库的子事务的xid中的,gtrid和formatId是一样的,bqual是不一样的            // 在一个分布式事务中,涉及到多个数据库的子事务,每个子事务的txid,有一部分是一样的,一部分是不一样的            byte[] bqual2 = "b00002".getBytes();            Xid xid2 = new MysqlXid(gtrid, bqual2, formatId);            // 这就是说通过START和END两个操作,定义好了分布式事务中,库存库中要执行的SQL语句            stockResource.start(xid2, XAResource.TMNOFLAGS);            PreparedStatement stockPreparedStatement = stockConnection.prepareStatement(                    "UPDATE stock SET total_count = total_count - 1 where id = 1");            stockPreparedStatement.execute();            stockResource.end(xid2, XAResource.TMSUCCESS);            // 到这里为止,其实还啥都没干呢,不过就是定义了分布式事务中的两个库要执行的SQL语句罢了            // 2PC的阶段一:向两个库都发送prepare消息,执行事务中的SQL语句,但是不提交            int orderPrepareResult = orderResource.prepare(xid1);            int stockPrepareResult = stockResource.prepare(xid2);            // 2PC的阶段二:两个库都发送commit消息,提交事务            // 如果两个库对prepare都返回ok,那么就全部commit,对每个库都发送commit消息,完成自己本地事务的提交            if (orderPrepareResult == XAResource.XA_OK                    && stockPrepareResult == XAResource.XA_OK) {                orderResource.commit(xid1, false);                stockResource.commit(xid2, false);            } else {                // 如果如果不是所有库都对prepare返回ok,那么就全部rollback                orderResource.rollback(xid1);                stockResource.rollback(xid2);            }        } catch (XAException e) {            e.printStackTrace();        }    }}

日志输出:

Thu Jul 07 14:41:56 CST 2022 DEBUG: Executing XA statement: XA START 0x673132333435,0x623030303031,0x1Thu Jul 07 14:41:56 CST 2022 DEBUG: Executing XA statement: XA END 0x673132333435,0x623030303031,0x1Thu Jul 07 14:41:56 CST 2022 DEBUG: Executing XA statement: XA START 0x673132333435,0x623030303032,0x1Thu Jul 07 14:41:56 CST 2022 DEBUG: Executing XA statement: XA END 0x673132333435,0x623030303032,0x1Thu Jul 07 14:41:56 CST 2022 DEBUG: Executing XA statement: XA PREPARE 0x673132333435,0x623030303031,0x1Thu Jul 07 14:41:56 CST 2022 DEBUG: Executing XA statement: XA PREPARE 0x673132333435,0x623030303032,0x1Thu Jul 07 14:41:56 CST 2022 DEBUG: Executing XA statement: XA COMMIT 0x673132333435,0x623030303031,0x1Thu Jul 07 14:41:56 CST 2022 DEBUG: Executing XA statement: XA COMMIT 0x673132333435,0x623030303032,0x1

查看数据库:

mysql> use tx_msg_order;mysql> select * from `order`;+---------------+---------------------+---------------+------------+-----------+| id            | create_time         | order_no      | product_id | pay_count |+---------------+---------------------+---------------+------------+-----------+|             1 | 2022-07-07 06:41:56 | 1             |          1 |         1 |+---------------+---------------------+---------------+------------+-----------+mysql> use tx_msg_stock;mysql> select * from stock;+----+------------+-------------+| id | product_id | total_count |+----+------------+-------------+|  1 |       1001 |        9999 ||  2 |          1 |        9995 |+----+------------+-------------+2 rows in set (0.00 sec)

(3)JTA 事务

JTA (Java Transaction API):为 J2EE 平台提供了分布式事务服务的能力。

JTA 规范是 XA 规范的 Java 版,即把 XA 规范中规定的 DTP 模型交互接口抽象成 Java 接口中的方法,并规定每个方法要实现什么样的功能。

JTA 定义的主要接口,位于 javax.transaction 包中:

  • Transaction 接口:让应用程序得以控制事务的开始、挂起、提交、回滚等。由 Java 客户端程序或EJB 调用。
  • TransactionManager 接口:用于应用服务器管理事务状态
  • Transaction 接口:用于执行相关事务操作
  • XAResource 接口:用于在分布式事务环境下,协调事务管理器和资源管理器的工作
  • Xid 接口:为事务标识符的 Java 映射

采用 JTA+ Atomikos 分布式事务框架:底层思想也是 2PC 原理

  • JTA :主要提供了事务管理器,即分布式事务流程管控的机制
  • Atomikos 框架:提供了分布式事务的 DataSource 数据源的支持

浅析 Atomikos 源码框架:

图片[6] - 强一致性分布式事务XA 浅析 - MaxSSL

  1. 测试:调用 createOrder()
# 程序日志输出:开启 logging.level.root=DEBUG复制代码

图片[7] - 强一致性分布式事务XA 浅析 - MaxSSL

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享