Spring事务源码解析之内嵌事务

缘起

【1】中基本分析了Sping 事务的流程. 但是留了一个问题——内嵌事务没有分析,【1】都是基于独立事务进行分析的. 遂本文针对【2】给的小例子进行内嵌事务的源码分析.

分析

首先定位需要调试的源码位置.

根据【1】的源码33的第13行. 我们只需要在如下代码

1
2
3
4
5
6
7
8
9
10
11
@Transactional(propagation = Propagation.REQUIRED)
public void insert_parent() {
System.out.println(this.getClass().getName()); // com.yfs.service.UserService 注意,不会打印出代理类,而是被代理对象
userDao.insert("parent");
try {
((UserService) AopContext.currentProxy()).insert_child(); // 通过AopContext从当前线程获取代理对象调用事务方法就会走事务代理,
// 则此时insert_child上的@Transactional注解就生效了
} catch (Exception e) {
e.printStackTrace();
}
}

​ 业务代码1

的第6行打断点(即第4行的事务已经进行了), 调用第二个事务方法的时候打断点. 也就是第二次来到了【1】的源码33的第13行. 则此时isExistingTransaction(transaction)就不再是false了,而是true. 则就会进入到【1】的源码33的第15行. 【1】的源码33的第15行的注释写的很贴切

1
2
// Existing transaction found -> check propagation behavior to find out how to behave.
return handleExistingTransaction(definition, transaction, debugEnabled);

​ 源码1

如果找到了当前事务的话(其实就是txObject中有没有connectionHolder,现在显然是有的),

definition就是DelegatingTransactionAttribute封装过的txAttr, 而transaction就是txObject. 跟进此方法来到

org.springframework.transaction.support.AbstractPlatformTransactionManager.handleExistingTransaction(TransactionDefinition, Object, boolean)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
/**
* Create a TransactionStatus for an existing transaction.
*/
private TransactionStatus handleExistingTransaction(
TransactionDefinition definition, Object transaction, boolean debugEnabled)
throws TransactionException {

if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
throw new IllegalTransactionStateException(
"Existing transaction found for transaction marked with propagation 'never'");
}

if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
if (debugEnabled) {
logger.debug("Suspending current transaction");
}
Object suspendedResources = suspend(transaction);
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(
definition, null, false, newSynchronization, debugEnabled, suspendedResources);
}

if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
if (debugEnabled) {
logger.debug("Suspending current transaction, creating new transaction with name [" +
definition.getName() + "]");
}
SuspendedResourcesHolder suspendedResources = suspend(transaction);
try {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
catch (RuntimeException beginEx) {
resumeAfterBeginException(transaction, suspendedResources, beginEx);
throw beginEx;
}
catch (Error beginErr) {
resumeAfterBeginException(transaction, suspendedResources, beginErr);
throw beginErr;
}
}

if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
if (!isNestedTransactionAllowed()) {
throw new NestedTransactionNotSupportedException(
"Transaction manager does not allow nested transactions by default - " +
"specify 'nestedTransactionAllowed' property with value 'true'");
}
if (debugEnabled) {
logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
}
if (useSavepointForNestedTransaction()) {
// Create savepoint within existing Spring-managed transaction,
// through the SavepointManager API implemented by TransactionStatus.
// Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
DefaultTransactionStatus status =
prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
status.createAndHoldSavepoint();
return status;
}
else {
// Nested transaction through nested begin and commit/rollback calls.
// Usually only for JTA: Spring synchronization might get activated here
// in case of a pre-existing JTA transaction.
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, null);
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
}

// Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
if (debugEnabled) {
logger.debug("Participating in existing transaction");
}
if (isValidateExistingTransaction()) {
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
Constants isoConstants = DefaultTransactionDefinition.constants;
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] specifies isolation level which is incompatible with existing transaction: " +
(currentIsolationLevel != null ?
isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
"(unknown)"));
}
}
if (!definition.isReadOnly()) {
if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] is not marked as read-only but existing transaction is");
}
}
}
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}

​ 源码2

入参definition是封装成DelegatingTransactionDefinition的txAttr,里面有事务的信息(说白了就是方法上@Transactional注解的信息), 自然有propagation属性的信息. 源码2主要就是根据这个属性的信息进行处理的.

在【2】中,insert_child 上的propagation属性的值是REQUIRES_NEW.

则会进入到源码2的第23行,控制台打印日志

1
DEBUG - Suspending current transaction, creating new transaction with name [com.yfs.service.UserService.insert_child]

说的也很清楚了——挂起当前事务(即insert_parent产生的事务),创建一个新的事务,其名称为com.yfs.service.UserService.insert_child,至于为什么能得到这个名称,就是txAttr封装成 DelegatingTransactionAttribute 的理由, 这个在【1】中也谈到了的.

我们来到了源码2的第28行. 此行的作用是挂起当前事务.

而跟进去,

org.springframework.transaction.support.AbstractPlatformTransactionManager.suspend(Object)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
protected final SuspendedResourcesHolder suspend(Object transaction) throws TransactionException {
if (TransactionSynchronizationManager.isSynchronizationActive()) {
List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
try {
Object suspendedResources = null;
if (transaction != null) {
suspendedResources = doSuspend(transaction);
}
String name = TransactionSynchronizationManager.getCurrentTransactionName();
TransactionSynchronizationManager.setCurrentTransactionName(null);
boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
TransactionSynchronizationManager.setActualTransactionActive(false);
return new SuspendedResourcesHolder(
suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
}
...

​ 源码3

其实做的事情是比较简单的——记录当前事务,清空threadlocal变量然后用记录好的当前事务信息封装成SuspendedResourcesHolder返回. 其实就是记下了当前事务,因为你现在要执行别的事务去了. 然后源码2的29-35行开启新的事务返回(即取代了【1】的源码33的第41行返回的TransactionStatus ).

注意哈,源码2的72行将打印日志

1
DEBUG - Acquired Connection [com.mchange.v2.c3p0.impl.NewProxyConnection@40844aab] for JDBC transaction

也就是对于REQUIRES_NEW级别的propagation,处理方式是新获取一个数据库连接. 那么出现异常进行回滚的时候走的自然和【1】中分析的一样,完全不影响之前的insert_parent事务的提交. 这是因为业务代码1已经自己捕捉了异常(如果不捕捉则因为抛出的是运行时异常(算术异常),则就会触发父事务insert_parent的回滚,则两次插入都会失败).

这里注意哈,进入业务代码1的时候会走一次拦截器栈,调用业务代码1的第六行也会走一次拦截器栈. 这两次形成栈关系. 而且处于不同的数据库连接(从而是不同的事务——这不就是传播属性REQUIRES_NEW的含义么?). 所以

【3】中源码17从insert_child会从completeTransactionAfterThrowing(也就是21行)退出,而insert_parent会从commitTransactionAfterReturning(也就是27行)退出. 而且他们的txInfo是2个不一样的对象(因为新的数据库连接,【1】的源码32的第27行prepareTransactionInfo返回的TransactionInfo 都是不同的对象, 里面的数据库连接也不同, 完全是2个事务).

而如果将insert_child的propagation属性改成 REQUIRED的话,则源码2就会直接走102行复用之前的definition,和transaction. 也就是复用了之前的事务,是同一个数据库连接. 自然可以回滚啦~

而且,如果insert_parent和insert_child都是REQUIRED的话,而且insert_child抓了异常, 则依旧两次插入都不会成功(不抓异常就2条更不会成功). 原因是insert_child 用了insert_parent的事务(即数据库连接),所以就算insert_child 抓了异常,但是它已经做了回滚操作了(和数据库交互了),所以自然insert_parent也不能插入成功. REQUIRES_NEW和REQUIRED的最大区别就在于前者新拿了一个数据库连接,而后者是复用的.

参考

【1】https://yfsyfs.github.io/2019/06/18/Spring-%E4%BA%8B%E5%8A%A1%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90%EF%BC%88%E5%90%8E%E7%AF%87%EF%BC%89/

【2】https://github.com/yfsyfs/backend/tree/master/spring-annotation-transaction-child-parent-correct

【3】https://yfsyfs.github.io/2019/06/17/Spring-%E4%BA%8B%E5%8A%A1%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90/