spring-tx的事务拦截逻辑在TransactionInterceptor类,本文将详细分析其实现方式。
事务拦截器TransactionInterceptor
spring-tx的事务拦截逻辑在TransactionInterceptor类,它实现了MethodInterceptor接口。
MethodInterceptor接口
MethodInterceptor接口的实现类封装aop切面拦截逻辑:
public interface MethodInterceptor extends Interceptor {/** * Implement this method to perform extra treatments before and after the invocation. */Object invoke(MethodInvocation invocation) throws Throwable;}
TransactionInterceptor类
TransactionInterceptor类封装了事务拦截逻辑:
public class TransactionInterceptor extends TransactionAspectSupport implements MethodInterceptor, Serializable {// ...@Overridepublic Object invoke(MethodInvocation invocation) throws Throwable {// Work out the target class: may be {@code null}.// The TransactionAttributeSource should be passed the target class// as well as the method, which may be from an interface.Class targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);// Adapt to TransactionAspectSupport's invokeWithinTransaction...return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);}// ...}
事务逻辑在父类TransactionAspectSupport的invokeWithinTransaction方法中。
invokeWithinTransaction方法
protected Object invokeWithinTransaction(Method method, @Nullable Class targetClass,final InvocationCallback invocation) throws Throwable {// 如果方法没有被Transactional注解标注,则返回null// 返回的是AnnotationTransactionAttributeSource对象// 用于获取Transactional注解相关属性,// 比如rollbackOn, propagationBehavior, isolationLevel等TransactionAttributeSource tas = getTransactionAttributeSource();TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);// 获取事务管理器// 后面会转型成PlatformTransactionManager对象,可以开启事务、提交、回滚 TransactionManager tm = determineTransactionManager(txAttr);// Reactive事务,略// 转型成PlatformTransactionManager对象,可以开启事务、提交、回滚PlatformTransactionManager ptm = asPlatformTransactionManager(tm);// 获取事务方法的唯一标识// 格式为"类名.方法名"final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {// 创建事务TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);Object retVal;try {// This is an around advice: Invoke the next interceptor in the chain.// This will normally result in a target object being invoked.retVal = invocation.proceedWithInvocation();} catch (Throwable ex) {// Handle a throwable, completing the transaction.// We may commit or roll back, depending on the configuration.completeTransactionAfterThrowing(txInfo, ex);throw ex;} finally {// Reset the TransactionInfo ThreadLocal.// Call this in all cases: exception or normal return!cleanupTransactionInfo(txInfo);}if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {// Set rollback-only in case of Vavr failure matching our rollback rules...TransactionStatus status = txInfo.getTransactionStatus();if (status != null && txAttr != null) {retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);}}// 提交事务commitTransactionAfterReturning(txInfo);return retVal;} else {// CallbackPreferringPlatformTransactionManager事务管理器逻辑,略}}
创建事务
Create a transaction if necessary based on the given TransactionAttribute. Allows callers to perform custom TransactionAttribute lookups through the TransactionAttributeSource.
protected TransactionInfo createTransactionIfNecessary(PlatformTransactionManager tm,TransactionAttribute txAttr, final String joinpointIdentification) {// If no name specified, apply method identification as transaction name.if (txAttr != null && txAttr.getName() == null) {txAttr = new DelegatingTransactionAttribute(txAttr) {@Overridepublic String getName() {return joinpointIdentification;}};}TransactionStatus status = null;if (txAttr != null) {if (tm != null) {// 创建新事务或者返回已存在事务, 这取决于传播级别。// 隔离级别或超时等参数只在新事务时生效,已存在事务会忽略这些参数。status = tm.getTransaction(txAttr);}}// 使用指定的事务属性和TransactionStatus创建TransactionInforeturn prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);}
getTransaction(txAttr)
这个方法创建新事务或者返回已存在事务,这取决于传播级别。隔离级别或超时等参数只在新事务时生效,已存在事务会忽略这些参数。
public final TransactionStatus getTransaction(TransactionDefinition definition)throws TransactionException {// Use defaults if no transaction definition given.TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());// DataSourceTransactionManager实现类返回DataSourceTransactionObject对象// DataSourceTransactionObject对象封装着数据库连接、previousIsolationLevel、readOnly、savepointAllowed等Object transaction = doGetTransaction();boolean debugEnabled = logger.isDebugEnabled();if (isExistingTransaction(transaction)) {// Existing transaction found -> check propagation behavior to find out how to behave.return handleExistingTransaction(def, transaction, debugEnabled);}// No existing transaction found -> check propagation behavior to find out how to proceed.if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {// 传播级别设置为PROPAGATION_MANDATORY时,如果当前没有事务,则抛出异常throw new IllegalTransactionStateException("No existing transaction found for transaction marked with propagation 'mandatory'");} else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {SuspendedResourcesHolder suspendedResources = suspend(null);// Creating new transactiontry {return startTransaction(def, transaction, debugEnabled, suspendedResources);} catch (RuntimeException | Error ex) {resume(null, suspendedResources);throw ex;}} else {// Create "empty" transaction: no actual transaction, but potentially synchronization.if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {logger.warn("Custom isolation level specified but no actual transaction initiated; " +"isolation level will effectively be ignored: " + def);}boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);}}
doGetTransaction()
protected Object doGetTransaction() {// DataSourceTransactionObject封装着数据库连接、previousIsolationLevel、readOnly、savepointAllowed等DataSourceTransactionObject txObject = new DataSourceTransactionObject();// 是否允许设置保存点,NESTED传播级别时使用,DataSourceTransactionManager类型该属性为truetxObject.setSavepointAllowed(isNestedTransactionAllowed());// 从ThreadLocal获取当前线程上绑定的ConnectionHolder// ConnectionHolder对象保存着数据库连接// 业务方法第一次执行时为nullConnectionHolder conHolder =(ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());txObject.setConnectionHolder(conHolder, false);return txObject;}
当前有事务handleExistingTransaction方法
if (isExistingTransaction(transaction)) {// Existing transaction found -> check propagation behavior to find out how to behave.return handleExistingTransaction(def, transaction, debugEnabled);}
isExistingTransaction方法判断当前是否存在事务:
protected boolean isExistingTransaction(Object transaction) {DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;// 判断存在数据库连接且开启了事务return (txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive());}
handleExistingTransaction方法:
private TransactionStatus handleExistingTransaction(TransactionDefinition definition, Object transaction, boolean debugEnabled)throws TransactionException {// 传播级别为NEVERif (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {// 传播级别设置为NEVER时,如果当前有事务,则抛出异常throw new IllegalTransactionStateException("Existing transaction found for transaction marked with propagation 'never'");}// 传播级别为NOT_SUPPORTEDif (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {// 挂起当前事务Object suspendedResources = suspend(transaction);boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);return prepareTransactionStatus(definition, null, false, newSynchronization, debugEnabled, suspendedResources);}// 传播级别为REQUIRES_NEWif (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {// 挂起当前事务,然后创建新事务SuspendedResourcesHolder suspendedResources = suspend(transaction);try {return startTransaction(definition, transaction, debugEnabled, suspendedResources);} catch (RuntimeException | Error beginEx) {resumeAfterBeginException(transaction, suspendedResources, beginEx);throw beginEx;}}// 传播级别为NESTEDif (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {if (!isNestedTransactionAllowed()) {throw new NestedTransactionNotSupportedException("Transaction manager does not allow nested transactions by default - " +"specify 'nestedTransactionAllowed' property with value 'true'");}// Creating nested transactionif (useSavepointForNestedTransaction()) {// Create savepoint within existing Spring-managed transaction,// through the SavepointManager API implemented by TransactionStatus.// Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.DefaultTransactionStatus status =prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);status.createAndHoldSavepoint();return status;}else {// Nested transaction through nested begin and commit/rollback calls.// Usually only for JTA: Spring synchronization might get activated here// in case of a pre-existing JTA transaction.return startTransaction(definition, transaction, debugEnabled, null);}}// 传播级别为SUPPORTS/REQUIREDif (isValidateExistingTransaction()) {if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {Constants isoConstants = DefaultTransactionDefinition.constants;throw new IllegalTransactionStateException( "Participating transaction with definition [" + definition + "] specifies isolation level which is incompatible with existing transaction: " +(currentIsolationLevel != null ?isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :"(unknown)"));}}if (!definition.isReadOnly()) {if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {throw new IllegalTransactionStateException("Participating transaction with definition [" +definition + "] is not marked as read-only but existing transaction is");}}}boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);}
传播级别为NEVER
传播级别设置为NEVER时,如果当前有事务,抛出异常:
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {throw new IllegalTransactionStateException("Existing transaction found for transaction marked with propagation 'never'");}
传播级别为NOT_SUPPORTED
挂起当前事务,业务方法以无事务方式执行:
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {// 挂起当前事务Object suspendedResources = suspend(transaction);boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);return prepareTransactionStatus(definition, null, false, newSynchronization, debugEnabled, suspendedResources);}
prepareTransactionStatus方法:
- 创建DefaultTransactionStatus对象,把SuspendedResources封装进去,以便后续恢复旧事务
- 使用TransactionSynchronizationManager将事务属性绑定到当前线程
- 初始化当前线程TransactionSynchronization集
由于传播级别为NOT_SUPPORTED所以此处不会开启事务。
传播级别为REQUIRES_NEW
挂起当前事务,创建新事务:
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {// 挂起当前事务SuspendedResourcesHolder suspendedResources = suspend(transaction);try {// 开启新事务return startTransaction(definition, transaction, debugEnabled, suspendedResources);} catch (RuntimeException | Error beginEx) {resumeAfterBeginException(transaction, suspendedResources, beginEx);throw beginEx;}}
开启新事务:
private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,boolean debugEnabled, SuspendedResourcesHolder suspendedResources) {// 值为trueboolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);DefaultTransactionStatus status = newTransactionStatus(definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);// 开启新事务doBegin(transaction, definition);// 初始化当前线程TransactionSynchronization集prepareSynchronization(status, definition);return status;}protected void doBegin(Object transaction, TransactionDefinition definition) {DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;Connection con = null;try {if (!txObject.hasConnectionHolder() ||txObject.getConnectionHolder().isSynchronizedWithTransaction()) {// 打开一个新连接Connection newCon = obtainDataSource().getConnection();txObject.setConnectionHolder(new ConnectionHolder(newCon), true);}txObject.getConnectionHolder().setSynchronizedWithTransaction(true);con = txObject.getConnectionHolder().getConnection();Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);txObject.setPreviousIsolationLevel(previousIsolationLevel);txObject.setReadOnly(definition.isReadOnly());// Switch to manual commit if necessary. This is very expensive in some JDBC drivers,// so we don't want to do it unnecessarily (for example if we've explicitly// configured the connection pool to set it already).if (con.getAutoCommit()) {txObject.setMustRestoreAutoCommit(true);// 设置手动提交con.setAutoCommit(false);}// The default implementation executes a "SET TRANSACTION READ ONLY" statement // if the "enforceReadOnly" flag is set to true and the transaction definition // indicates a read-only transaction.prepareTransactionalConnection(con, definition);txObject.getConnectionHolder().setTransactionActive(true);int timeout = determineTimeout(definition);if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {txObject.getConnectionHolder().setTimeoutInSeconds(timeout);}// Bind the connection holder to the thread.if (txObject.isNewConnectionHolder()) {TransactionSynchronizationManager .bindResource(obtainDataSource(), txObject.getConnectionHolder());}} catch (Throwable ex) {if (txObject.isNewConnectionHolder()) {DataSourceUtils.releaseConnection(con, obtainDataSource());txObject.setConnectionHolder(null, false);}throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);}}
传播级别为NESTED
为当前连接设置保存点,如果业务方法出现异常,会回滚到该保存点位置:
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {if (!isNestedTransactionAllowed()) {throw new NestedTransactionNotSupportedException("Transaction manager does not allow nested transactions by default - " +"specify 'nestedTransactionAllowed' property with value 'true'");}// Creating nested transaction// 默认就是trueif (useSavepointForNestedTransaction()) {// Create savepoint within existing Spring-managed transaction,// through the SavepointManager API implemented by TransactionStatus.// Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.DefaultTransactionStatus status =prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);// 设置保存点status.createAndHoldSavepoint();return status;} else {// Nested transaction through nested begin and commit/rollback calls.// Usually only for JTA: Spring synchronization might get activated here// in case of a pre-existing JTA transaction.return startTransaction(definition, transaction, debugEnabled, null);}}
设置保存点:
public void createAndHoldSavepoint() throws TransactionException {setSavepoint(getSavepointManager().createSavepoint());}// JdbcTransactionObjectSupport#createSavepointpublic Object createSavepoint() throws TransactionException {ConnectionHolder conHolder = getConnectionHolderForSavepoint();try {if (!conHolder.supportsSavepoints()) {throw new NestedTransactionNotSupportedException("不支持");}if (conHolder.isRollbackOnly()) {throw new CannotCreateTransactionException("只读");}// 使用jdbc设置保存点return conHolder.createSavepoint();} catch (SQLException ex) {throw new CannotCreateTransactionException("Could not create JDBC savepoint", ex);}}
传播级别为SUPPORTS/REQUIRED/MANDATORY
// 默认falseif (isValidateExistingTransaction()) {if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {Constants isoConstants = DefaultTransactionDefinition.constants;throw new IllegalTransactionStateException( "Participating transaction with definition [" + definition + "] specifies isolation level which is incompatible with existing transaction: " +(currentIsolationLevel != null ?isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :"(unknown)"));}}if (!definition.isReadOnly()) {if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {throw new IllegalTransactionStateException("Participating transaction with definition [" +definition + "] is not marked as read-only but existing transaction is");}}}boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
事务挂起
把当前线程上绑定的资源、事务配置信息移除封装到SuspendedResourcesHolder对象,传递给新创建的TransactionStatus对象,以便在业务方法执行结束后恢复旧事务:
protected final SuspendedResourcesHolder suspend(Object transaction) throws TransactionException {// 判断当前线程的Set已经存在// TransactionSynchronization: 事务回调同步器,定义了事务挂起、恢复等方法// 例如mybatis-spring中有SqlSessionSynchronization实现类if (TransactionSynchronizationManager.isSynchronizationActive()) {List suspendedSynchronizations = doSuspendSynchronization();try {Object suspendedResources = null;if (transaction != null) {// 挂起事务suspendedResources = doSuspend(transaction);}// 清除当前线程事务配置参数:事务名、只读属性、隔离级别等String name = TransactionSynchronizationManager.getCurrentTransactionName();TransactionSynchronizationManager.setCurrentTransactionName(null);boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();TransactionSynchronizationManager.setActualTransactionActive(false);// 把当前线程的事务相关信息封装起来以便后续恢复return new SuspendedResourcesHolder(suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);} catch (RuntimeException | Error ex) {// doSuspend failed - original transaction is still active...doResumeSynchronization(suspendedSynchronizations);throw ex;}} else if (transaction != null) {// Transaction active but no synchronization active.Object suspendedResources = doSuspend(transaction);return new SuspendedResourcesHolder(suspendedResources);} else {// Neither transaction nor synchronization active.return null;}}private List doSuspendSynchronization() {List suspendedSynchronizations =TransactionSynchronizationManager.getSynchronizations();// 挂起所有的TransactionSynchronization// 比如SqlSessionSynchronization实现类会清除当前线程的SessionFactoryfor (TransactionSynchronization synchronization : suspendedSynchronizations) {synchronization.suspend();}// 清除线程上的TransactionSynchronization集TransactionSynchronizationManager.clearSynchronization();return suspendedSynchronizations;}protected Object doSuspend(Object transaction) {DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;txObject.setConnectionHolder(null);// ConnectionHolder对象return TransactionSynchronizationManager.unbindResource(obtainDataSource());}
事务恢复
protected final void resume(Object transaction, SuspendedResourcesHolder resourcesHolder)throws TransactionException {if (resourcesHolder != null) {Object suspendedResources = resourcesHolder.suspendedResources;if (suspendedResources != null) {// 恢复之前挂起的是ConnectionHolder对象doResume(transaction, suspendedResources);}List suspendedSynchronizations = resourcesHolder.suspendedSynchronizations;if (suspendedSynchronizations != null) {TransactionSynchronizationManager.setActualTransactionActive(resourcesHolder.wasActive);TransactionSynchronizationManager .setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel);TransactionSynchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly);TransactionSynchronizationManager.setCurrentTransactionName(resourcesHolder.name);doResumeSynchronization(suspendedSynchronizations);}}}protected void doResume(Object transaction, Object suspendedResources) {// 恢复之前挂起的是ConnectionHolder对象TransactionSynchronizationManager.bindResource(obtainDataSource(), suspendedResources);}
当前无事务
// No existing transaction found -> check propagation behavior to find out how to proceed.if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {// 传播级别设置为PROPAGATION_MANDATORY时,如果当前没有事务,则抛出异常throw new IllegalTransactionStateException("No existing transaction found for transaction marked with propagation 'mandatory'");} else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {SuspendedResourcesHolder suspendedResources = suspend(null);// Creating new transactiontry {return startTransaction(def, transaction, debugEnabled, suspendedResources);} catch (RuntimeException | Error ex) {resume(null, suspendedResources);throw ex;}} else {// Create "empty" transaction: no actual transaction, but potentially synchronization.if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {logger.warn("Custom isolation level specified but no actual transaction initiated; " +"isolation level will effectively be ignored: " + def);}boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);}
创建TransactionInfo
protected TransactionInfo prepareTransactionInfo(PlatformTransactionManager tm,TransactionAttribute txAttr, String joinpointIdentification,TransactionStatus status) {TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);if (txAttr != null) {// The transaction manager will flag an error if an incompatible tx already exists.txInfo.newTransactionStatus(status);} else {// The TransactionInfo.hasTransaction() method will return false. We created it only// to preserve the integrity of the ThreadLocal stack maintained in this class.}// We always bind the TransactionInfo to the thread, even if we didn't create// a new transaction here. This guarantees that the TransactionInfo stack// will be managed correctly even if no transaction was created by this aspect.txInfo.bindToThread();return txInfo;}
异常处理
protected void completeTransactionAfterThrowing(TransactionInfo txInfo, Throwable ex) {if (txInfo != null && txInfo.getTransactionStatus() != null) {if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {try {txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());} catch (TransactionSystemException ex2) {logger.error("Application exception overridden by rollback exception", ex);ex2.initApplicationException(ex);throw ex2;} catch (RuntimeException | Error ex2) {logger.error("Application exception overridden by rollback exception", ex);throw ex2;}} else {// We don't roll back on this exception.// Will still roll back if TransactionStatus.isRollbackOnly() is true.try {txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());} catch (TransactionSystemException ex2) {logger.error("Application exception overridden by commit exception", ex);ex2.initApplicationException(ex);throw ex2;} catch (RuntimeException | Error ex2) {logger.error("Application exception overridden by commit exception", ex);throw ex2;}}}}
回滚:
public final void rollback(TransactionStatus status) throws TransactionException {if (status.isCompleted()) {throw new IllegalTransactionStateException("Transaction is already completed");}DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;processRollback(defStatus, false);}private void processRollback(DefaultTransactionStatus status, boolean unexpected) {try {boolean unexpectedRollback = unexpected;try {triggerBeforeCompletion(status);if (status.hasSavepoint()) {// 回滚到指定保存点status.rollbackToHeldSavepoint();} else if (status.isNewTransaction()) {// 事务回滚doRollback(status);} else {// Participating in larger transactionif (status.hasTransaction()) {if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {// 设置rollback-onlydoSetRollbackOnly(status);}}// Unexpected rollback only matters here if we're asked to fail earlyif (!isFailEarlyOnGlobalRollbackOnly()) {unexpectedRollback = false;}}} catch (RuntimeException | Error ex) {triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);throw ex;}triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);// Raise UnexpectedRollbackException if we had a global rollback-only markerif (unexpectedRollback) {throw new UnexpectedRollbackException("Transaction rolled back because it has been marked as rollback-only");}} finally {// 这里面有恢复挂起事务的逻辑cleanupAfterCompletion(status);}}// 回滚到指定保存点public void rollbackToSavepoint(Object savepoint) throws TransactionException {ConnectionHolder conHolder = getConnectionHolderForSavepoint();try {conHolder.getConnection().rollback((Savepoint) savepoint);conHolder.resetRollbackOnly();} catch (Throwable ex) {throw new TransactionSystemException("Could not roll back to JDBC savepoint", ex);}}// 释放保存点public void releaseSavepoint(Object savepoint) throws TransactionException {ConnectionHolder conHolder = getConnectionHolderForSavepoint();try {conHolder.getConnection().releaseSavepoint((Savepoint) savepoint);} catch (Throwable ex) {logger.debug("Could not explicitly release JDBC savepoint", ex);}}// 事务回滚protected void doRollback(DefaultTransactionStatus status) {DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();Connection con = txObject.getConnectionHolder().getConnection();try {con.rollback();} catch (SQLException ex) {throw new TransactionSystemException("Could not roll back JDBC transaction", ex);}}private void cleanupAfterCompletion(DefaultTransactionStatus status) {status.setCompleted();if (status.isNewSynchronization()) {TransactionSynchronizationManager.clear();}if (status.isNewTransaction()) {// 恢复连接的事务属性,比如自动提交方式、隔离级别、只读属性等// 将连接归还给数据源,清除ConnectionHolder的conndoCleanupAfterCompletion(status.getTransaction());}if (status.getSuspendedResources() != null) {// 恢复之前挂起的事务Object transaction = (status.hasTransaction() ? status.getTransaction() : null);resume(transaction, (SuspendedResourcesHolder) status.getSuspendedResources());}}
事务清理
private void restoreThreadLocalStatus() {// Use stack to restore old transaction TransactionInfo.// Will be null if none was set.transactionInfoHolder.set(this.oldTransactionInfo);}
事务提交
protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {if (txInfo != null && txInfo.getTransactionStatus() != null) {txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());}}
提交:
public final void commit(TransactionStatus status) throws TransactionException {if (status.isCompleted()) {throw new IllegalTransactionStateException("Transaction is already completed");}DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;if (defStatus.isLocalRollbackOnly()) {// Transactional code has requested rollback// 回滚processRollback(defStatus, false);return;}if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {// Global transaction is marked as rollback-only but transactional code requested commit// 回滚processRollback(defStatus, true);return;}// 提交事务processCommit(defStatus);}// 提交事务private void processCommit(DefaultTransactionStatus status) throws TransactionException {try {boolean beforeCompletionInvoked = false;try {boolean unexpectedRollback = false;prepareForCommit(status);triggerBeforeCommit(status);triggerBeforeCompletion(status);beforeCompletionInvoked = true;if (status.hasSavepoint()) {unexpectedRollback = status.isGlobalRollbackOnly();// 释放保存点status.releaseHeldSavepoint();} else if (status.isNewTransaction()) {unexpectedRollback = status.isGlobalRollbackOnly();// 提交事务doCommit(status);} else if (isFailEarlyOnGlobalRollbackOnly()) {unexpectedRollback = status.isGlobalRollbackOnly();}// Throw UnexpectedRollbackException if we have a global rollback-only// marker but still didn't get a corresponding exception from commit.if (unexpectedRollback) {throw new UnexpectedRollbackException("Transaction silently rolled back because it has been marked as rollback-only");}} catch (UnexpectedRollbackException ex) {// can only be caused by doCommittriggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);throw ex;} catch (TransactionException ex) {// can only be caused by doCommitif (isRollbackOnCommitFailure()) {doRollbackOnCommitException(status, ex);} else {triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);}throw ex;} catch (RuntimeException | Error ex) {if (!beforeCompletionInvoked) {triggerBeforeCompletion(status);}doRollbackOnCommitException(status, ex);throw ex;}// Trigger afterCommit callbacks, with an exception thrown there// propagated to callers but the transaction still considered as committed.try {triggerAfterCommit(status);} finally {triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);}} finally {cleanupAfterCompletion(status);}}