ShardingSphere分布式事务
ShardingJDBC支持的分布式事务方式有三种 LOCAL, XA , BASE,这三种事务的实现都不需要修改代码。
1)Local本地事务
本地事务通过Spring的@Transaction注解来实现,原生的@Transaction不具备分布式事务的功能,但是如果你使用了ShardingSphere的话,@Transaction就会增强,增强后支持分布式事务。
缺点:
- 对于跨数据库的情况下,无法保证事务的一致性。
- 在同一个事务下跨数据库操作时,第一个数据库操作正常,第二个数据库操作失败报错了,并不会进行回滚的。
但是,如果引入了ShardingSphere时,增强后的@Transaction 能保证 同一个事务下的事务一致性,对于出错的事务会及时进行回滚。
回滚实验
环境说明:
这里有2个数据库 user、user2,每个数据库中各有一张course_2表,而插入的数据中userid的value都是1,由于user2的course_2表中 userid字段我们设置成UNIQUE 唯一性,这样在user2数据库的course_2表插入数据时必然会报错,如果报错后 回滚成功,说明 可以保证分布式事务的数据一致性。
没开启事务时
1 2 3 4 5 6 7 8 9 10 11
| @Test
void testAddCourse() { for (int i = 0; i < 10; i++) { Course entity = new Course(); entity.setCname("name"+i); entity.setCstatus("1"); entity.setUserid(1L); courseMapper.insert(entity); } }
|
执行结果:
由于user2的course_2表userid字段设置了唯一索引,那么在往user2的course_2表添加数据时候必然会报错,但是并没有进行数据回滚。
开启事务并执行方法
1 2 3 4 5 6 7 8 9 10 11 12
| @Test
@Transactional void testAddCourse() { for (int i = 0; i < 10; i++) { Course entity = new Course(); entity.setCname("name"+i); entity.setCstatus("1"); entity.setUserid(1L); courseMapper.insert(entity); } }
|
如图所示,报错后 2个数据库都回滚了,说明 分布式事务生效了。
2)XA事务
- XA是通过ShardingJDBC作为事务协调者来实现 分库分表的分布式事务。
- XA是一种分布式事务规范,而实现XA事务的方式有很多,如narayana,这里使用的是Atomokios ,ShardingSphere默认的事务管理器。
回滚实验
1、引入Maven依赖
1 2 3 4 5 6 7 8 9 10 11 12 13
| <dependency> <groupId>org.apache.shardingsphere</groupId> <artifactId>sharding-jdbc-spring-boot-starter</artifactId> <version>4.1.1</version> </dependency>
<dependency> <groupId>org.apache.shardingsphere</groupId> <artifactId>sharding-transaction-xa-core</artifactId> <version>4.1.1</version> </dependency>
|
2、配置事务管理器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| package com.example.shardingjdbc.config;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.datasource.DataSourceTransactionManager; import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.annotation.EnableTransactionManagement;
import javax.sql.DataSource;
@Configuration @EnableTransactionManagement public class TransactionConfiguration {
@Bean public PlatformTransactionManager txManager(final DataSource dataSource) { return new DataSourceTransactionManager(dataSource); } @Bean public JdbcTemplate jdbcTemplate(final DataSource dataSource) { return new JdbcTemplate(dataSource); } }
|
1、配置@EnableTransactionManagement注解,启用事务管理
2、注入TransactionManager对象,其中对于这个事务管理器的重点就是要使用ShardingDatasource
3、执行测试方法
1 2 3 4 5 6 7 8 9 10 11 12
| @Test @Transactional @ShardingTransactionType(TransactionType.XA) void testAddCourse() { for (int i = 0; i < 10; i++) { Course entity = new Course(); entity.setCname("name"+i); entity.setCstatus("1"); entity.setUserid(1L); courseMapper.insert(entity); } }
|
重点在@ShardingTransactionType注解中声明XA类型的事务
执行结果
如图所示,由于user2的course_2表userid字段设置了唯一索引,那么在往user2的course_2表添加数据时候必然会报错,但是数据进行了回滚,XA事务测试成功。
注意:一定要配置XA事务管理器,否则事务会失效。
3)BASE柔性事务
柔性事务是基于Seata的AT模式进行二阶段提交来实现事务管理的,官方建议Seata注册到Nacos中,所以使用Seata之前要先部署Nacos。
涉及到的工具及下载地址:
nacos
nacos官网:https://nacos.io/zh-cn/
服务端2.X:https://files.javaxing.com/nacos-server-2.2.1.tar.gz
seata
源码:https://files.javaxing.com/seata-1.4.1.tar.gz
服务端:https://files.javaxing.com/seata-server-1.4.0.tar.gz
1.部署Nacos
下载压缩包并解压
1 2
| [root@S2 nacos]# wget https://files.javaxing.com/nacos-server-2.2.1.tar.gz [root@S2 nacos]# tar -xvf nacos-server-2.2.1.tar.gz
|
进入nacos文件夹并修改配置文件application.properties
1 2 3
| [root@S2 ~]# cd nacos [root@S2 nacos]# cd conf [root@S2 conf]# vim application.properties
|
搜索关键字 secret.key ,设置secret key,规则是加密密钥长度 >=256 bits 或者 >= 32 bytes,这里我们添加50位随机字母
1 2 3 4
| nacos.core.auth.plugin.nacos.token.secret.key=v2V3y2n3pGS7HjdacadEzk4IoJf3LYVyWooJbqqd7HxCpgu8jeO
# 在vim编辑器下保存文件,可以使用:x 或 :wq :x
|
以单机实例运行nacos
注意,nacos是用java写的,运行nacos之前要确保本机有jdk 8的环境,并且设置了 环境变量,否则会运行失败
1
| [root@S2 bin]# sh startup.sh -m standalone
|
查看运行日志
1
| [root@S2 bin]# tail -f /root/nacos/logs/start.out
|
运行结果显示successfully 代表nacos运行成功,默认端口号 8848,登录地址:http://ip:8848/nacos 账号和密码默认都是:nacos
2.部署Seata
上面有提供seata源码和服务端的下载地址,此时我们需要解压seata源码,复制里面的2个文件到seata服务端,因为我们需要seata为我们提供的注册配置信息。
复制 script/config-center/config.txt 和 script/config-center/nacos/nacos-config.sh 到seata目录
解压seata服务端
把上述2个文件,按以下的顺序复制到seata的目录中。
seata-server.sh 复制到 seata/bin
config.txt 复制到 seata 根目录
初始化seata配置到nacos注册中心
1
| sh nacos-config.sh localhost
|
localhost 是指 nacos注册中心的IP地址
初始化结果:
nacos注册中心:
3.修改seata配置文件
创建registry.conf文件,指定seata的注册中心信息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96
| registry { # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa type = "nacos" loadBalance = "RandomLoadBalance" loadBalanceVirtualNodes = 10
# 配置nacos注册信息,这里用的nacos,那么其他的注册中心都可以不用配置,也可以删掉 nacos { application = "seata-server" serverAddr = "10.211.55.14:8848" namespace = "public" group = "SEATA_GROUP" cluster = "default" username = "nacos" password = "nacos" } eureka { serviceUrl = "http://localhost:8761/eureka" application = "default" weight = "1" } redis { serverAddr = "192.168.65.232:6379" db = 0 password = "123456" cluster = "default" timeout = 0 } zk { cluster = "default" serverAddr = "127.0.0.1:2181" sessionTimeout = 6000 connectTimeout = 2000 username = "" password = "" } consul { cluster = "default" serverAddr = "127.0.0.1:8500" } etcd3 { cluster = "default" serverAddr = "http://localhost:2379" } sofa { serverAddr = "127.0.0.1:9603" application = "default" region = "DEFAULT_ZONE" datacenter = "DefaultDataCenter" cluster = "default" group = "SEATA_GROUP" addressWaitTime = "3000" } file { name = "file.conf" } }
config { # file、nacos 、apollo、zk、consul、etcd3 type = "nacos"
# 配置中心信息 nacos { application = "seata-server" serverAddr = "10.211.55.14:8848" namespace = "29ccf18e-e559-4a01-b5d4-61bad4a89ffd" group = "SEATA_GROUP" cluster = "default" username = "nacos" password = "nacos" } consul { serverAddr = "127.0.0.1:8500" } apollo { appId = "seata-server" apolloMeta = "http://192.168.1.204:8801" namespace = "application" apolloAccesskeySecret = "" } zk { serverAddr = "127.0.0.1:2181" sessionTimeout = 6000 connectTimeout = 2000 username = "" password = "" } etcd3 { serverAddr = "http://localhost:2379" } file { name = "file.conf" } }
|
这个配置里,是将seata的服务注册到nacos上,配置也从nacos上获取。
registry部分对应seata注册到nacos上的服务,config部分对应seata注册到nacos上的配置。
4.启动seata
1 2 3 4
| sh seata-server.sh -p $LISTEN_PORT -m $STORE_MODE -h $IP(此参数可选)
# 例如 sh seata-server.sh -p 18848 -m file -h 10.211.55.14
|
$LISTEN_PORT: seata服务端的端口号,默认8848,如果有nacos的话记得修改成其他端口号,避免和nacos端口号冲突
$STORE_MODE: 事务操作记录存储模式:file、db。可以在registry.conf文件中配置,默认file。
$IP(可选参数): 用于多 IP 环境下指定 Seata-Server 注册服务的IP。单网卡不需要配置。
5.在代码中启用事务
先引入相关依赖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| <dependency> <groupId>org.apache.shardingsphere</groupId> <artifactId>sharding-transaction-base-seata-at</artifactId> <version>4.1.1</version> </dependency> <dependency> <groupId>io.seata</groupId> <artifactId>seata-all</artifactId> <version>1.4.0</version> </dependency> <dependency> <groupId>com.alibaba.nacos</groupId> <artifactId>nacos-client</artifactId> <version>1.4.1</version> </dependency>
|
注意,seata依赖的版本要与seata服务端一致,否则会出现很多不可预料的问题。
复制seata.conf文件到classpath目录
在classpath下增加seata.conf,ShardingSphere的SeataATShardingTransactionManager会读取这个配置文件。
1 2 3 4
| client { application.id = boot-test transaction.service.group = my_test_tx_group }
|
注意:application.id可以随意配置,但是transaction.service.group这个事务组不能随意配,需要在server端进行配置。对应 service.vgroupMapping.my_test_tx_group key =default 这个key中的后面一部分。
复制registry.conf文件到classpath目录
注意:该registry.conf文件要与服务端一致
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96
| registry { # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa type = "nacos" loadBalance = "RandomLoadBalance" loadBalanceVirtualNodes = 10
# 配置nacos注册信息,这里用的nacos,那么其他的注册中心都可以不用配置,也可以删掉 nacos { application = "seata-server" serverAddr = "10.211.55.14:8848" namespace = "public" group = "SEATA_GROUP" cluster = "default" username = "nacos" password = "nacos" } eureka { serviceUrl = "http://localhost:8761/eureka" application = "default" weight = "1" } redis { serverAddr = "192.168.65.232:6379" db = 0 password = "123456" cluster = "default" timeout = 0 } zk { cluster = "default" serverAddr = "127.0.0.1:2181" sessionTimeout = 6000 connectTimeout = 2000 username = "" password = "" } consul { cluster = "default" serverAddr = "127.0.0.1:8500" } etcd3 { cluster = "default" serverAddr = "http://localhost:2379" } sofa { serverAddr = "127.0.0.1:9603" application = "default" region = "DEFAULT_ZONE" datacenter = "DefaultDataCenter" cluster = "default" group = "SEATA_GROUP" addressWaitTime = "3000" } file { name = "file.conf" } }
config { # file、nacos 、apollo、zk、consul、etcd3 type = "nacos"
# 配置中心信息 nacos { application = "seata-server" serverAddr = "10.211.55.14:8848" namespace = "29ccf18e-e559-4a01-b5d4-61bad4a89ffd" group = "SEATA_GROUP" cluster = "default" username = "nacos" password = "nacos" } consul { serverAddr = "127.0.0.1:8500" } apollo { appId = "seata-server" apolloMeta = "http://192.168.1.204:8801" namespace = "application" apolloAccesskeySecret = "" } zk { serverAddr = "127.0.0.1:2181" sessionTimeout = 6000 connectTimeout = 2000 username = "" password = "" } etcd3 { serverAddr = "http://localhost:2379" } file { name = "file.conf" } }
|
6.测试代码
1 2 3 4 5 6 7 8 9 10 11 12
| @Test @Transactional @ShardingTransactionType(TransactionType.BASE) void testAddCourse() { for (int i = 0; i < 10; i++) { Course entity = new Course(); entity.setCname("name"+i); entity.setCstatus("1"); entity.setUserid(1L); courseMapper.insert(entity); } }
|
测试结果:
回滚成功:
ShardingJDBC分布式事务特性
1)XA事务
XA是由X/Open组织提出的分布式事务的规范,现在主流的数据库都已经实现了XA事务,但是值得注意的是,MySQL只有InnoDB引擎才支持XA事务。
1.MySQL的XA事务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| # 开启事务,test是事务ID,将事务置于ACTIVE状态 XA START 'test';
# 执行SQL INSERT into course_2 VALUES(10,'test',33,30);
# 结束事务 XA END 'test'; # 预提交事务 XA PREPARE 'test';
# 当我们进行预提交后,可以考虑是否要提交事务,还是回滚事务
# 提交事务 XA COMMIT 'test'; # 回滚事务 XA ROLLBACK 'test';
|
XA事务中,事务是有事务状态的,如果对于一个ACTIVE状态的事务进行COMMIT提交,XA就会两阶段锁死,mysql就会抛出异常
ERROR 1399 (XAE07): XAER_RMFAIL: The command cannot be executed when global transaction is in the ACTIVE state
正确的执行顺序:开启事务(start)-> 执行SQL -> 结束事务(end) -> 预提交事务(PREPARE) -> 提交或回滚事务(commit、rollback)
2.JDBC XA事务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
| public class MysqlXAConnectionTest { public static void main(String[] args) throws SQLException { boolean logXaCommands = true; Connection conn1 = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "root"); XAConnection xaConn1 = new MysqlXAConnection((com.mysql.jdbc.Connection) conn1, logXaCommands); XAResource rm1 = xaConn1.getXAResource(); Connection conn2 = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root","root"); XAConnection xaConn2 = new MysqlXAConnection((com.mysql.jdbc.Connection) conn2, logXaCommands); XAResource rm2 = xaConn2.getXAResource(); byte[] gtrid = "g12345".getBytes(); int formatId = 1; try { byte[] bqual1 = "b00001".getBytes(); Xid xid1 = new MysqlXid(gtrid, bqual1, formatId); rm1.start(xid1, XAResource.TMNOFLAGS); PreparedStatement ps1 = conn1.prepareStatement("INSERT into user(name) VALUES ('tianshouzhi')"); ps1.execute(); rm1.end(xid1, XAResource.TMSUCCESS); byte[] bqual2 = "b00002".getBytes(); Xid xid2 = new MysqlXid(gtrid, bqual2, formatId); rm2.start(xid2, XAResource.TMNOFLAGS); PreparedStatement ps2 = conn2.prepareStatement("INSERT into user(name) VALUES ('wangxiaoxiao')"); ps2.execute(); rm2.end(xid2, XAResource.TMSUCCESS); int rm1_prepare = rm1.prepare(xid1); int rm2_prepare = rm2.prepare(xid2); boolean onePhase = false; if (rm1_prepare == XAResource.XA_OK && rm2_prepare == XAResource.XA_OK ) { rm1.commit(xid1, onePhase); rm2.commit(xid2, onePhase); } else { rm1.rollback(xid1); rm1.rollback(xid2); } } catch (XAException e) { e.printStackTrace(); } }
|
gtrid 是全局事务标识符
bqual 是一个分支限定符,如果没有提供默认值就是空字符串
formatID 是一个数字,用于标记gtrid和bqual值的格式,这是一个正整数,最小为0,默认值就是1。
3.XA缺点
- XA事务无法自动提交
- 如果在第一阶段或第二阶段时抛了异常,就会出现事务锁死的情况,其他线程就无法正常使用事务
- XA事务效率非常低下,全局事务的状态都需要持久化。性能非常低下,通常耗时能达到本地事务的10倍。
- XA事务在提交前出现故障的话,很难将问题隔离开。
2)Base柔性事务
柔性事务是指 Basic Available(基本可用)、Soft-state(软状态/柔性事务)、Eventual Consistency(最终一致性)。
核心思想:既然无法保证分布式事务实时的强一致性,那就采用合适的方式来保证最终结果一致性(即使过程中数据不一致,最终结果一致也可以接受)。
如:客户下订单时需要处理的信息有订单信息、物流信息、地址信息,在处理的过程中允许信息不一致,但是当订单信息处理完之后必须保证数据一致性。
柔性事务几种处理模式:
- 最大努力通知型
- 分布式事务的参与方都将自己的处理结果通知给其他参与者,只能说尽力而为,不能确保结果一致性,适用于很多跨公司、流程复杂的场景。
- 如:客户在电商网站下单后,电商系统调用了支付系统获得支付地址,而客户支付成功后,支付系统告知电商系统需要修改订单状态为成功,至于电商系统是否最终会修改订单状态为成功,无法保证
- 补偿性
- 不保证事务的实时一致性,对于未一致的数据在事后进行补偿。
- 如电商系统和支付系统,在一定的账期(时间)内允许不一致,但是通过定期对账(数据对比),尽量保证双方最终的一致性。
- 异步确保型
- 典型的场景就是RocketMQ的事务消息机制。通过不断的异步确认,保证分布式事务的最终一致性。
- 两阶段型
- 第一阶段准备阶段:分布式事务的各个参与方都提交自己的本地事务,并且锁定相关的资源。
- 第二阶段提交阶段:由一个第三方的事务协调者综合处理各方的事务执行情况,通知各个参与方统一进行事务提交或者回退。
- 三阶段型
- 二阶段的缺点
- 由于两阶段协议在准备阶段需要锁定资源,例如在数据库中,就是要加行锁,防止其他事务来修改数据,但是这样会导致在第二个阶段数据无法正常回滚。
- 三阶段询问阶段
- 三阶段协议会在两阶段的准备阶段之前加一个询问阶段,在这一阶段,事务协调者只是询问各个参与方是否做好了准备。
- 如:对于redis,就判断redis是否建立好了连接,数据库则是是否建立好了JDBC连接等等
- 二阶段和三阶段的区别
- 二阶段会锁资源,而三阶段不锁资源,并且对事务的一致性更强
- 二阶段对代码入侵较小,三阶段对代码入侵比较大,所有业务都按照三阶段的要求改造成TCC的模式。所以三阶段适合于一些对分布式事务准确性和时效性要求非常高的场景
- 三阶段的示例
- 银行转账示例,A需要向B支付100元。使用TCC,在try阶段,通常会要求给订单设定一个状态UPDATING,同时A减少100元,B增加100元,并且将A需要减少的100元与B需要增加的100元这两个数据都单独记录下来,相当于锁定库存。这样可以用来实现类似锁资源的效果。然后在后续的confirm或者cancel操作中,将事务最终进行对齐。在这一步,首先需要修改订单状态,然后修改A和B的账户。这里注意,给A和B调整的账户都需要从锁定的资源中取,而不能凭空修改账户的数据。
- SAGA模式
- 由分布式事务的各个参与方自己提供正向事务操作命令以及对应的反向回滚命令。事务协调者可以在各个参与方提交事务后,随时协调各个事务参与方进行回滚。
- 每个SAGA事务都会包含T1,T2,T3….Tn一个步骤队列,并且每个操作都会预先携带回滚命令,而SAGA事务要保证所有步骤都成功,如果有一个步骤失败了,所有的参与方都从该步骤开始往前回滚。
- 例如对于客户扣款100块钱的操作,电商网站和支付宝都提供扣减客户100块钱的操作作为正向事务,同时也提供给客户加100块钱余额的操作作为逆向操作(备用的,如果步骤失败了回滚的时候用)。
- 这样事务协调者可以在检查电商网站和支付宝的扣款行为后,随时通知他们进行回滚。
- 适合于事务流程比较长,参与方比较多的场景。
ShardingSphere柔性事务总结
ShardingSphere中对于柔性事务的支持,更多的基于Seata的AT模式,来实现的两阶段提交。
虽然XA和AT都是基于两阶段协议提供的实现,但是AT模式相比XA模式,简化了对于资源锁的要求,在大部分的业务场景下,AT模式比XA模式性能稍高。
ShardingProxy分布式事务
如果想使用XA事务,直接调整配置就可以了,因为ShardingProxy默认就集成了XA的jar包,支持XA事务,默认事务管理器是Atomikos。