一、背景
本文主要介绍了spring多线程事务的解决方案,心急的小伙伴可以跳过上面的理论介绍分析部分直接看最终解决方案。
在我们日常的业务活动中,经常会出现大规模的修改插入操作,比如在3.0的活动赛事创建,涉及到十几张表的插入(一张表可能插入一行或者多行数据),由于单线程模型的关系,所有的sql都是串行,即后面的sql必须都要等到前面的sql执行完成才能继续。但是在很多场景下,sql的执行顺序并不影响业务的结果,面对这样的场景,我们很自然的想到了使用异步的方式去处理,可是我们同时又希望整个创建操作是事务性的,即要全部成功,要么全部失败,但是单纯的使用异步线程并不能达到我们理想的效果。
这个时候,我们需要一种多线程下保证事务的解决方案。
代码片段,大量的同步保存操作
public void much(){//业务操作1doBusiness1();//业务操作2doBusiness2();//业务操作3doBusiness3();//业务操作4doBusiness4();}private void doBusiness1() {//执行sql1//执行sql2//执行sql3//执行sql4}
每个业务操作可以是相关联的,也有可能是完全无关的,但如果做成异步的话我们就无法保证事务,怎么去解决这个问题呢?
二、理论先行
1.事务介绍
我们先确定spring事务的本质是什么,spring本身不支持事务,spring实现事务只是对我们原有的业务逻辑做了一层包装,他替我们决定了什么时候开启事务,什么情况下应该向数据库提交,什么时候回滚,及实现我们设置的一些事务参数,包括回滚的条件,传播类型等。
我们所熟知的spring事务有两种主流的解决方式,一种是声明式事务,一种是编程式事务。
先来讲我们最常用的声明式事务。
1.1声明式事务
声明式事务就是我们最常用的@Transactional注解,通常我们只需要在我们想交由spring控制的事务方法上加上注解即可,这个注解有一些重要的参数,由于不是本文重点,就不在此展开。这是一个经典的spring的aop实现,为了弄清楚在加上@Transactional注解后spring到底为我们做了什么,我们可以从两方面入手,一是spring如何给我们生成相应的代理对象,二是这个代理对象为我们做了什么。
事务的开始是由@EnableTransactionManagement 注解产生,这个注解在运行时会导入TransactionManagementConfigurationSelector这个类,这个类本质上是一个ImportSelector,他根据adviceMode将特定的配置类导入进去,分别为AutoProxyRegistrar 后置处理器和ProxyTransactionManagementConfiguration Advisor。
AutoProxyRegistrar 实现了ImportBeanDefinitionRegistrar 重写了registerBeanDefinitions 方法
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {boolean candidateFound = false;Set annTypes = importingClassMetadata.getAnnotationTypes();for (String annType : annTypes) {// ...AopConfigUtils.registerAutoProxyCreatorIfNecessary(registry);}// ...}@Nullablepublic static BeanDefinition registerAutoProxyCreatorIfNecessary(BeanDefinitionRegistry registry, @Nullable Object source) {return registerOrEscalateApcAsRequired(InfrastructureAdvisorAutoProxyCreator.class, registry, source);}
该方法最终注入了InfrastructureAdvisorAutoProxyCreator。InfrastructureAdvisorAutoProxyCreator这个类就是一个bean的后置处理器,最终的作用就是处理需要的代理对象。
public Object postProcessAfterInitialization(@Nullable Object bean, String beanName) {if (bean != null) {Object cacheKey = getCacheKey(bean.getClass(), beanName);if (this.earlyProxyReferences.remove(cacheKey) != bean) {return wrapIfNecessary(bean, beanName, cacheKey);}}return bean;}protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {// ...// 拿当前bean去匹配容器中的 Advisors,如果找到符合的就生成代理对象// Create proxy if we have advice.Object[] specificInterceptors = getAdvicesAndAdvisorsForBean(bean.getClass(), beanName, null);if (specificInterceptors != DO_NOT_PROXY) {this.advisedBeans.put(cacheKey, Boolean.TRUE);Object proxy = createProxy(bean.getClass(), beanName, specificInterceptors, new SingletonTargetSource(bean));this.proxyTypes.put(cacheKey, proxy.getClass());return proxy;}this.advisedBeans.put(cacheKey, Boolean.FALSE);return bean;}
ProxyTransactionManagementConfiguration的作用就是来生成具体的Advisor,他注册了三个bean,
该类主要完成以下几个任务:
创建TransactionAttributeSource对象:用于解析@Transactional注解并生成事务属性。
创建TransactionInterceptor对象:用于创建事务通知,将事务属性应用到目标方法,这其实就是一个事务模板,如下所示
protected Object invokeWithinTransaction(Method method, @Nullable Class targetClass, final InvocationCallback invocation) throws Throwable {//TransactionAttributeSource内部保存着当前类某个方法对应的TransactionAttribute---事务属性源//可以看做是一个存放TransactionAttribute与method方法映射的池子TransactionAttributeSource tas = getTransactionAttributeSource();//获取当前事务方法对应的TransactionAttributefinal TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);//定位TransactionManagerfinal TransactionManager tm = determineTransactionManager(txAttr);.....//类型转换为局部事务管理器PlatformTransactionManager ptm = asPlatformTransactionManager(tm);final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {//TransactionManager根据TransactionAttribute创建事务后返回//TransactionInfo封装了当前事务的信息--包括TransactionStatusTransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);Object retVal;try {//继续执行过滤器链---过滤链最终会调用目标方法//因此可以理解为这里是调用目标方法retVal = invocation.proceedWithInvocation();}catch (Throwable ex) {//目标方法抛出异常则进行判断是否需要回滚completeTransactionAfterThrowing(txInfo, ex);throw ex;}finally {//清除当前事务信息cleanupTransactionInfo(txInfo);}...//正常返回,那么就正常提交事务呗(当然还是需要判断TransactionStatus状态先)commitTransactionAfterReturning(txInfo);return retVal;}...
创建TransactionAdvisor对象:将事务通知和切点(Pointcut)组合成Advisor。
创建TransactionAttributeSourceAdvisor对象:将事务属性和切点组合成Advisor
@Configuration(proxyBeanMethods = false)@Role(BeanDefinition.ROLE_INFRASTRUCTURE)public class ProxyTransactionManagementConfiguration extends AbstractTransactionManagementConfiguration {@Bean(name = TransactionManagementConfigUtils.TRANSACTION_ADVISOR_BEAN_NAME)@Role(BeanDefinition.ROLE_INFRASTRUCTURE)public BeanFactoryTransactionAttributeSourceAdvisor transactionAdvisor(TransactionAttributeSource transactionAttributeSource, TransactionInterceptor transactionInterceptor) {BeanFactoryTransactionAttributeSourceAdvisor advisor = new BeanFactoryTransactionAttributeSourceAdvisor();advisor.setTransactionAttributeSource(transactionAttributeSource);advisor.setAdvice(transactionInterceptor);if (this.enableTx != null) {advisor.setOrder(this.enableTx.getNumber("order"));}return advisor;}@Bean@Role(BeanDefinition.ROLE_INFRASTRUCTURE)public TransactionAttributeSource transactionAttributeSource() {// TransactionAttributeSource 是一个接口,具体注入的是 Annotationxxxxreturn new AnnotationTransactionAttributeSource();}@Bean@Role(BeanDefinition.ROLE_INFRASTRUCTURE)public TransactionInterceptor transactionInterceptor(TransactionAttributeSource transactionAttributeSource) {TransactionInterceptor interceptor = new TransactionInterceptor();interceptor.setTransactionAttributeSource(transactionAttributeSource);if (this.txManager != null) {interceptor.setTransactionManager(this.txManager);}return interceptor;}}
@Nullableprivate TransactionAttributeSource transactionAttributeSource;private final TransactionAttributeSourcePointcut pointcut = new TransactionAttributeSourcePointcut() {@Override@Nullableprotected TransactionAttributeSource getTransactionAttributeSource() {return transactionAttributeSource;}};/** * Set the transaction attribute source which is used to find transaction * attributes. This should usually be identical to the source reference * set on the transaction interceptor itself. * @see TransactionInterceptor#setTransactionAttributeSource */public void setTransactionAttributeSource(TransactionAttributeSource transactionAttributeSource) {this.transactionAttributeSource = transactionAttributeSource;}/** * Set the {@link ClassFilter} to use for this pointcut. * Default is {@link ClassFilter#TRUE}. */public void setClassFilter(ClassFilter classFilter) {this.pointcut.setClassFilter(classFilter);}@Overridepublic Pointcut getPointcut() {return this.pointcut;}
可以见到里面已经包含了pointcut,这就能将我们需要被增加的事务方法找出。
ProxyTransactionManagementConfiguration负责将需要包装的bean和方法找出并包装成advisor,InfrastructureAdvisorAutoProxyCreator根据advisor生成相应的代理对象。
小结:InfrastructureAdvisorAutoProxyCreator遍历容器中的bean,尝试去自动代理,匹配的工作就交由advisor中的point,如果匹配成功就为其创建代理对象,这个代理对象中放入了TransactionInterceptor拦截器
,等到相关方法调用时,调用的是代理对象的方法,然后通过责任链模式通过TransactionInterceptor处理,以此来进行事务的操作。
声明式事务的介绍先到这里,接下来我们来介绍下编程式事务。
1.2编程式事务
编程式事务的核心就是将spring为我们做好的那些步骤拆出来,交由开发者去控制事务何时开启、提交、回滚,他的运行本质和声明式事务并没有两样。
模板如下
public class TransactionMain {public static void main(String[] args) throws ClassNotFoundException, SQLException {test();} private static void test() {DataSource dataSource = getDS();JdbcTransactionManager jtm = new JdbcTransactionManager(dataSource);//JdbcTransactionManager根据TransactionDefinition信息来进行一些连接属性的设置//包括隔离级别和传播行为等DefaultTransactionDefinition transactionDef = new DefaultTransactionDefinition();//开启一个新事务---此时autocommit已经被设置为了false,并且当前没有事务,这里创建的是一个新事务TransactionStatus ts = jtm.getTransaction(transactionDef);//进行业务逻辑操作try {update(dataSource);jtm.commit(ts);}catch (Exception e){jtm.rollback(ts);System.out.println("发生异常,我已回滚");}}private static void update(DataSource dataSource) throws Exception {JdbcTemplate jt = new JdbcTemplate();jt.setDataSource(dataSource);jt.update("UPDATE Department SET Dname=\"大忽悠\" WHERE id=6");throw new Exception("我是来捣乱的");} }
三、方案探索
1.直接使用多线程
我们在开启代码中事务,并在业务逻辑中直接使用多线程,是否能保证事务?
@Transactionalpublic void testDirect() {new Thread(()->{Per per = new Per();per.setName("t1");perService.save(per);}).start();new Thread(()->{Per per1 = new Per();per1.setName("t2");perService.save(per1);throw new RuntimeException("Exception test");}).start();}
显然,这种方式并不能保证事务,哪怕加上了事务注解,因为子线程抛出的异常并不能在主线程中捕获,也不能被其他线程感知到。
2.事务模板中使用多线程
package com.user.util; import lombok.RequiredArgsConstructor;import org.springframework.jdbc.datasource.DataSourceTransactionManager;import org.springframework.stereotype.Component;import org.springframework.transaction.TransactionStatus;import org.springframework.transaction.support.DefaultTransactionDefinition; import javax.sql.DataSource;import java.util.ArrayList;import java.util.List;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;import java.util.concurrent.Executor;import java.util.concurrent.atomic.AtomicBoolean;@Component @RequiredArgsConstructor public class MultiplyThreadTransactionManager { /** * 如果是多数据源的情况下,需要指定具体是哪一个数据源*/private final DataSource dataSource; public void runAsyncButWaitUntilAllDown(List tasks, Executor executor) {if(executor==null){throw new IllegalArgumentException("线程池不能为空");}DataSourceTransactionManager transactionManager = getTransactionManager();//是否发生了异常AtomicBoolean ex=new AtomicBoolean(); List taskFutureList=new ArrayList(tasks.size());List transactionStatusList=new ArrayList(tasks.size()); tasks.forEach(task->{taskFutureList.add(CompletableFuture.runAsync(() -> {try{//1.开启新事务transactionStatusList.add(openNewTransaction(transactionManager));//2.异步任务执行task.run();}catch (Throwable throwable){//打印异常throwable.printStackTrace();//其中某个异步任务执行出现了异常,进行标记ex.set(Boolean.TRUE);//其他任务还没执行的不需要执行了taskFutureList.forEach(completableFuture -> completableFuture.cancel(true));}}, executor));}); try {//阻塞直到所有任务全部执行结束---如果有任务被取消,这里会抛出异常滴,需要捕获CompletableFuture.allOf(taskFutureList.toArray(new CompletableFuture[]{})).get();} catch (InterruptedException | ExecutionException e) {e.printStackTrace();} //发生了异常则进行回滚操作,否则提交if(ex.get()){System.out.println("发生异常,全部事务回滚");transactionStatusList.forEach(transactionManager::rollback);}else {System.out.println("全部事务正常提交");transactionStatusList.forEach(transactionManager::commit);}}private TransactionStatus openNewTransaction(DataSourceTransactionManager transactionManager) { //JdbcTransactionManager根据TransactionDefinition信息来进行一些连接属性的设置 //包括隔离级别和传播行为等 DefaultTransactionDefinition transactionDef = new DefaultTransactionDefinition(); //开启一个新事务---此时autocommit已经被设置为了false,并且当前没有事务,这里创建的是一个新事务 return transactionManager.getTransaction(transactionDef); }private DataSourceTransactionManager getTransactionManager() { return new DataSourceTransactionManager(dataSource); } }
测试
public void test(){List tasks=new ArrayList(); tasks.add(()->{Per per = new Per();per.setName("t1");perService.save(per);});tasks.add(()->{Per per = new Per();per.setName("t2");perService.save(per);});multiplyThreadTransactionManager.runAsyncButWaitUntilAllDown(tasks, Executors.newCachedThreadPool()); }
执行结果
java.lang.IllegalStateException: No value for key [HikariDataSource (HikariPool-1)] bound to threadat org.springframework.transaction.support.TransactionSynchronizationManager.unbindResource(TransactionSynchronizationManager.java:198) ~[spring-tx-5.3.10.jar:5.3.10]at org.springframework.jdbc.datasource.DataSourceTransactionManager.doCleanupAfterCompletion(DataSourceTransactionManager.java:371) ~[spring-jdbc-5.3.10.jar:5.3.10]at org.springframework.transaction.support.AbstractPlatformTransactionManager.cleanupAfterCompletion(AbstractPlatformTransactionManager.java:992) ~[spring-tx-5.3.10.jar:5.3.10]at org.springframework.transaction.suppoAbstractPlatformTransactionrt.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager
结果报了这个错,这个错误信息室找不到绑定在线程上的key为HikariDataSource的资源,因为事务资源都是绑定在线程上的,当事务提交或者回滚时,他需要寻找绑定在当前线程上的资源,如果找不到,就会报错。
原理剖析:
首先我们找到绑定线程资源的关键方法org.springframework.transaction.support.TransactionSynchronizationManager#bindResource
/** * Bind the given resource for the given key to the current thread. * @param key the key to bind the value to (usually the resource factory) * @param value the value to bind (usually the active resource object) * @throws IllegalStateException if there is already a value bound to the thread * @see ResourceTransactionManager#getResourceFactory() */public static void bindResource(Object key, Object value) throws IllegalStateException {Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);Assert.notNull(value, "Value must not be null");Map
根据debug会发现,spring在开启事务时会自动为我们调用这个方法,绑定key为HikariDataSource,value为ConnectionHolder到threadlocal中。第二次sql执行时会绑定key为DefaultSqlSessionFactory,value为DefaultSqlSessionFactory。
既然讲到了事务资源的绑定时机,下面就顺便讲一下这两种资源在何时释放。我们再回顾一下事务的执行流程及机制。spring处理事务的原理就是基于aop,每个需要实现事务的方法都要通过TransactionInterceptor这个拦截器,通过这个拦截器去实现事务增强。
@Override@Nullablepublic Object invoke(MethodInvocation invocation) throws Throwable {// Work out the target class: may be {@code null}.// The TransactionAttributeSource should be passed the target class// as well as the method, which may be from an interface.Class
进而可以推断,rollback操作也是同样的道理,有兴趣的小伙伴可以自己debug一下,继续走下去,观察事务管理器为我们做了什么。
@Overridepublic final void commit(TransactionStatus status) throws TransactionException {if (status.isCompleted()) {throw new IllegalTransactionStateException("Transaction is already completed - do not call commit or rollback more than once per transaction");}DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;if (defStatus.isLocalRollbackOnly()) {if (defStatus.isDebug()) {logger.debug("Transactional code has requested rollback");}processRollback(defStatus, false);return;}if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {if (defStatus.isDebug()) {logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");}processRollback(defStatus, true);return;}processCommit(defStatus);}
到此,spring的整个事务流程就已经非常清晰了,我们想要实现多事务管理的方法也找到了,难就是去控制事务的资源。只要我们拿到了相应的事务资源,然后在创建自己的事务管理器控制事务何时提交或者回滚,这样我们就可以实现一个多线程同时提交回滚,类似于二阶段提交的操作,来达到多线程事务的统一。
3.多线程事务管理器
不多说,直接上代码看最终版本
package com.controller;import lombok.Builder;import lombok.RequiredArgsConstructor;import lombok.extern.slf4j.Slf4j;import org.slf4j.MDC;import org.springframework.jdbc.datasource.DataSourceTransactionManager;import org.springframework.stereotype.Component;import org.springframework.transaction.TransactionStatus;import org.springframework.transaction.support.DefaultTransactionDefinition;import org.springframework.transaction.support.TransactionSynchronization;import org.springframework.transaction.support.TransactionSynchronizationManager;import org.springframework.util.CollectionUtils;import javax.sql.DataSource;import java.util.*;import java.util.concurrent.*;import java.util.concurrent.atomic.AtomicBoolean;import java.util.concurrent.atomic.AtomicInteger;/** * 多线程事务管理 */@Component@Slf4j@RequiredArgsConstructorpublic class MultiplyThreadTransactionManager {/** * 如果是多数据源的情况下,需要指定具体是哪一个数据源 */private final DataSource dataSource;private final static ThreadLocal immediatelyCommitFlag = new ThreadLocal();private final static ThreadLocal<List> transactionStatusListThreadLocal = new ThreadLocal();private final static ThreadLocal<List> transactionResourcesthreadLocal = new ThreadLocal();private final static ThreadLocal<Map
验证
@Transactionalpublic String test(Integer par) {log.info("get(" + par + ")");if (par == 3 || par == 5 || par == 6) {Per per2 = new Per();per2.setName("t3");per2.setGrou(Thread.currentThread().getName());perService.save(per2);}List list = new ArrayList();list.add(() -> {Per per = new Per();per.setName("t1");per.setGrou(Thread.currentThread().getName());log.info("任务开始save");perService.save(per);log.info("任务完成save");if (par == 1) {throw new RuntimeException();}});list.add(() -> {Per per1 = new Per();per1.setName("t2");per1.setGrou(Thread.currentThread().getName());log.info("任务开始save");perService.save(per1);log.info("任务完成save");if (par == 2) {throw new RuntimeException();}});log.info("runAsyncButWaitUntilAllDown start");multiplyThreadTransactionManager.runAsyncButWaitUntilAllDown(list, false);if (par == 4 || par == 5 || par == 6) {Per per3 = new Per();per3.setName("t4");per3.setGrou(Thread.currentThread().getName());perService.save(per3);if (par == 6) {throw new RuntimeException();}}log.info("multiplyThreadTransactionCommit start");multiplyThreadTransactionManager.multiplyThreadTransactionCommit();return "ss";}
有兴趣的小伙伴可以做进一步测试。
在本公司项目组中利用这个机制优化现有的业务,性能提升了约70%。
比起网上常见的多线程事务管理器,主要做了如下增强
1.支持在已存在事务下运行。
在很多场景下,我们可能会遇到多线程事务外还存在其他事物的场景下,我们需要支持兼容多种事务环境。
2.支持自定义提交时机。
有时候我们不希望事务立马提交,希望他能够和外围事务保持一致,这时候可以将runAsyncButWaitUntilAllDown的immediatelyCommit参数写成false,并手动调用multiplyThreadTransactionCommit方法去主动提交。
我们需要注意的地方,任何事都是有舍有得的,耗时的显著降低是因为利用了更多的资源,比如线程资源和数据库连接资源,尤其是数据库连接资源,更是额外宝贵,我们一定要合理评估我们的每一项决策是否有意义,风险和回报是否成正比。
还有一点需要注意,在极高并发的情况下,多线程事务容易造成死锁,因为当主事务开启的情况下,他要为他下面的子线程事务开启连接,当连接不够时就容易造成循环等待。一个比较好的做法是提前获得所有连接,并设置一个合理的超时时间
如果有小伙伴遇到了其他相关疑问,或者使用此代码发现了问题,欢迎留言讨论,共同进步。
文章转载自:doFix
原文链接:https://www.cnblogs.com/fix200/p/18066537
体验地址:引迈 - JNPF快速开发平台_低代码开发平台_零代码开发平台_流程设计器_表单引擎_工作流引擎_软件架构