Spring 事务源码解析(后篇)

缘起

【4】中已经完成了从Spring IOC层面如何引入事务的问题, 本文接着完成从拦截器栈角度完成AOP源码分析的问题.

分析

下面的源码编号是接着【4】的.

其实源码17的第11行跟进去

org.springframework.transaction.interceptor.TransactionAspectSupport.createTransactionIfNecessary(PlatformTransactionManager, TransactionAttribute, String)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@SuppressWarnings("serial")
protected TransactionInfo createTransactionIfNecessary(
PlatformTransactionManager tm, TransactionAttribute txAttr, final String joinpointIdentification) {

// If no name specified, apply method identification as transaction name.
if (txAttr != null && txAttr.getName() == null) {
txAttr = new DelegatingTransactionAttribute(txAttr) {
@Override
public String getName() {
return joinpointIdentification;
}
};
} // 第一件事情: 封装txAttr(一个RuleBasedTransactionAttribute,它实现了TransactionAttribute接口)为DelegatingTransactionAttribute(它也实现了TransactionAttribute接口),为什么要封装? 我的理解是因为提供了可以获取joinpointIdentification的功能

TransactionStatus status = null;
if (txAttr != null) {
if (tm != null) {
status = tm.getTransaction(txAttr); // 第二件事情: 获取事务
}
else {
if (logger.isDebugEnabled()) {
logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
"] because no transaction manager has been configured");
}
}
}
return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status); // 第三件事情: 综合前两件事情得到TransactionInfo,说白了,TransactionInfo就是第二步得到的TransactionStatus和TransactionAttribute的综合体
}

​ 源码32

源码32的第一件事情没什么好讲的, 来看第二件事情, 即源码32的第18行——在(封装为DelegatingTransactionAttribute)txAttr和事务管理器tm(这里的实现是 DataSourceTransactionManager)都非空的情况下,调用事务管理器的getTransaction接口(此接口由PlatformTransactionManager接口定义). 跟进此方法. 来到

org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(TransactionDefinition)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
@Override
public final TransactionStatus getTransaction (TransactionDefinition definition) throws TransactionException {
Object transaction = doGetTransaction(); // 第一件事情: 获取对应的事务实例,获取的是txObject

// Cache debug flag to avoid repeated checks.
boolean debugEnabled = logger.isDebugEnabled();

if (definition == null) {
// Use defaults if no transaction definition given.
definition = new DefaultTransactionDefinition();
}

if (isExistingTransaction(transaction)) { // 判断当前线程是否存在事务
// Existing transaction found -> check propagation behavior to find out how to behave.
return handleExistingTransaction(definition, transaction, debugEnabled); // 如果当前线程已经存在事务,则转向嵌套事务的处理
}

// Check definition settings for new transaction.
if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
} // 事务超时的处理

// No existing transaction found -> check propagation behavior to find out how to proceed.
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
}
else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
SuspendedResourcesHolder suspendedResources = suspend(null);
if (debugEnabled) {
logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
}
try {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
catch (RuntimeException ex) {
resume(null, suspendedResources);
throw ex;
}
catch (Error err) {
resume(null, suspendedResources);
throw err;
}
}
else {
// Create "empty" transaction: no actual transaction, but potentially synchronization.
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
logger.warn("Custom isolation level specified but no actual transaction initiated; " +
"isolation level will effectively be ignored: " + definition);
}
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
}
}

​ 源码33

下面就上面的代码详细的分析. 其实Spring的架构就是这样——一个比较简端的函数入口(例如这里的源码33),只是定义处理框架,具体的业务交给一些子函数取处理,这是Spring框架的特点.

来到源码33的第三行,跟进去

1
2
3
4
5
6
7
8
9
@Override
protected Object doGetTransaction() {
DataSourceTransactionObject txObject = new DataSourceTransactionObject();
txObject.setSavepointAllowed(isNestedTransactionAllowed());
ConnectionHolder conHolder =
(ConnectionHolder) TransactionSynchronizationManager.getResource(this.dataSource); // 从当前线程获取connectionHolder
txObject.setConnectionHolder(conHolder, false);
return txObject;
}

​ 源码34

其中DataSourceTransactionObject是

1
DataSource transaction object, representing a ConnectionHolder.Used as transaction object by DataSourceTransactionManager.

即DataSourceTransactionObject是一个数据源事务对象,代表一个连接持有器,被DataSourceTransactionManager这种事务管理器使用.

第四行代码说的是txObject是否允许设置回滚点取决于是否允许嵌套事务(如果不允许嵌套 事务的话,显然就不允许设置回滚点,回滚点的应用场景之一就是有嵌套事务的存在). 而DataSourceTransactionManager的nestedTransactionAllowed是true,这是DataSourceTransactionManager的无参构造器

1
2
3
4
5
6
7
8
9
10
11
12
13
public DataSourceTransactionManager() {
setNestedTransactionAllowed(true);
}

/**
* Create a new DataSourceTransactionManager instance.
* @param dataSource JDBC DataSource to manage transactions for
*/
public DataSourceTransactionManager(DataSource dataSource) {
this();
setDataSource(dataSource);
afterPropertiesSet();
}

而我们在项目的【1】的MainConfig中调用的就是上面的构造器. (其实nestedTransactionAllowed是DataSourceTransactionManager的父类AbstractPlatformTransactionManager的属性,默认是false的, 只是被DataSourceTransactionManager在其构造函数中修改了).

源码34的第五行的逻辑是

org.springframework.transaction.support.TransactionSynchronizationManager.getResource(Object)

1
2
3
4
5
6
7
8
9
10
11
12
/**
* Retrieve a resource for the given key that is bound to the current thread.
*/
public static Object getResource(Object key) {
Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
Object value = doGetResource(actualKey);
if (value != null && logger.isTraceEnabled()) {
logger.trace("Retrieved value [" + value + "] for key [" + actualKey + "] bound to thread [" +
Thread.currentThread().getName() + "]");
}
return value;
}

​ 源码35

和org.springframework.transaction.support.TransactionSynchronizationManager.doGetResource(Object)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* Actually check the value of the resource that is bound for the given key.
*/
private static Object doGetResource(Object actualKey) {
Map<Object, Object> map = resources.get();
if (map == null) {
return null;
}
Object value = map.get(actualKey);
// Transparently remove ResourceHolder that was marked as void...
if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
map.remove(actualKey);
// Remove entire ThreadLocal if empty...
if (map.isEmpty()) {
resources.remove();
}
value = null;
}
return value;
}

​ 源码36

源码34 的第六行的dataSource是com.mchange.v2.c3p0.ComboPooledDataSource数据源,所以源码35、36中的key、actualKey也都是以数据源作为键. 源码36第一次来肯定map是null的. 所以源码36 的第七行就直接返回null了.源码35第八行也就是直接返回null了. 注意,源码35行的日志很有意思——

1
2
Retrieved value [" + value + "] for key [" + actualKey + "] bound to thread [" +
Thread.currentThread().getName() + "]

这明显的意思就是如果当前线程绑定了resource的话,就直接复用了. 其实源码35的注释也写清楚了.

回到源码34第七行,简单的设置了conHolder为null以及newConnectionHolder为false

最后源码34返回了txObject这个DataSourceTransactionManager中的DataSourceTransactionObject. 来到源码33的13行,判断是否存在事务,isExistingTransaction方法的逻辑很简单,就是

1
2
3
4
5
@Override
protected boolean isExistingTransaction(Object transaction) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
return (txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive());
}

而这里的transaction入参就是源码33第三行得到的那个txObject. 那txObject.hasConnectionHolder()已经是false了,所以自然返回false. 即当前线程不存在事务(为啥说当前线程? 因为源码34到36的resources是一个ThreadLocal变量).

来到源码33的第19行,即事务超时的处理,definition就是源码32第七行封装过的txAttr,它作为DelegatingTransactionAttribute的是实现了TransactionDefinition接口的. 而definition.getTimeout() <与TransactionDefinition.TIMEOUT_DEFAULT都是-1,所以来到了源码33第24行. 这一行的意思是如果事务定义(definition其实就是封装过的txAttr)强制当前有事务,但是现在源码33第13行已经判定了没有事务

所以就不符合要求,就会抛出异常. 但是现在 txAttr中是PROPAGATION_REQUIRED,所以不会抛出异常. 来到源码33的28行,显然符合判断,即对于 PROPAGATION_REQUIRED、PROPAGATION_REQUIRES_NEW、PROPAGATION_NESTED都是需要新建事务的(因为源码33第13行已经判定了当前)

源码33的第31行是空挂起,因为传入的是null, 所以其实这个函数什么也没做.

即返回的suspendedResources为null(没有任何事务资源被挂起)

源码33 的36行和37行不是很清楚得到什么(估计要到源码33的40行才清楚). 但是结果是newSynchronization为true.因为getTransactionSynchronization()得到的是0

下面来到重头戏——源码33的39行——doBegin(transaction, definition);, 第一个参数是txObject,第二个参数是封装过的txAttr. 可以说事务是从这个函数开始的 ,因为在这个函数中已经开始尝试了对数据库连接的获
取,当然,在获取数据库连接的同时 , 一些必要的设置也是需要同步设置的

下面跟进这个函数,总体而言,这个函数构造 transaction , 包括设置ConnectionHolder 、 隔离级别、 timeout
并且如果是新连接 , 则绑定到当前线程

org.springframework.jdbc.datasource.DataSourceTransactionManager.doBegin(Object, TransactionDefinition)

这个方法是org.springframework.transaction.support.AbstractPlatformTransactionManager定义的抽象方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
/**
* This implementation sets the isolation level but ignores the timeout.
*/
@Override
protected void doBegin(Object transaction, TransactionDefinition definition) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
Connection con = null;

try {
if (!txObject.hasConnectionHolder() ||
txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
Connection newCon = this.dataSource.getConnection();
if (logger.isDebugEnabled()) {
logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
}
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}

txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
con = txObject.getConnectionHolder().getConnection();

Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
txObject.setPreviousIsolationLevel(previousIsolationLevel);

// Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
// so we don't want to do it unnecessarily (for example if we've explicitly
// configured the connection pool to set it already).
if (con.getAutoCommit()) {
txObject.setMustRestoreAutoCommit(true);
if (logger.isDebugEnabled()) {
logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
}
con.setAutoCommit(false);
}

prepareTransactionalConnection(con, definition);
txObject.getConnectionHolder().setTransactionActive(true);

int timeout = determineTimeout(definition);
if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
}

// Bind the connection holder to the thread.
if (txObject.isNewConnectionHolder()) {
TransactionSynchronizationManager.bindResource(getDataSource(), txObject.getConnectionHolder());
}
}

catch (Throwable ex) {
if (txObject.isNewConnectionHolder()) {
DataSourceUtils.releaseConnection(con, this.dataSource);
txObject.setConnectionHolder(null, false);
}
throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
}
}

​ 源码37

第六行首先获取txAttr, 然后用他看看当前线程有没有ConnectionHolder,显然现在是没有的. 则来到第12行——从数据源(【1】中用的是C3P0的数据源)获取一个连接. 这个不在我们Spring考察的范围中. 于是我们获取到了

newCon=com.mchange.v2.c3p0.impl.NewProxyConnection@18a136ac 这样一个(新)连接.

看源码37的第16行,我们将用这个新的连接用ConnectionHolder封装一下,所谓封装的源码如下,是ConnectionHolder的构造器

1
2
3
4
5
6
7
8
9
10
11
/**
* Create a new ConnectionHolder for the given JDBC Connection,
* wrapping it with a {@link SimpleConnectionHandle},
* assuming that there is no ongoing transaction.
* @param connection the JDBC Connection to hold
* @see SimpleConnectionHandle
* @see #ConnectionHolder(java.sql.Connection, boolean)
*/
public ConnectionHolder(Connection connection) {
this.connectionHandle = new SimpleConnectionHandle(connection);
}

​ 源码38

其实就是给ConnectionHolder的connectionHandle设置为一个connection为内核的SimpleConnectionHandle。

源码37的第16行的第二个参数newConnectionHolder设置为了true,即txObject(一个DataSourceTransactionObject)的属性.

经过源码37的第16行处理之后,txObject中已经有了ConnectionHolder. 即txObject.hasConnectionHolder不再是false了.

源码37的第19行设置了connectionHolder的synchronizedWithTransaction属性为true. 注意,源码37第10行的判断——我们不是每次都要去数据源获取新连接,而是并不是每次都会获取新的连接 , 如果当前线程中的 connectionHolder 已经存在, 则没有必要再次获取,或者 , 对于事务同步标识(即synchronizedWithTransaction)设置为 true 的需要重新获取连接 。

注意,到了这里先小结一下

即txObject里面有connectionHolder,connectionHolder里面有connectionHandler,connectionHandler里面有connection(源码38).而且connectionHolder设置了事务同步标识为synchronizedWithTransaction为true.

源码37的20行拿出了我们刚刚从数据源获取的连接, 并且向txObject的connectionHolder的currentConnection设置了当前的数据库连接(其实就是connectionHolder的connectionHandler中的connection).

来到源码37的第22行. prepareConnectionForTransaction这个名字顾名思义就是为这个事务(使用txAttr,也就是之前封装好的DelegatingTransactionAttribute)设置刚刚获取的连接(connection)的属性,这个connection是和数据库交互的.

讲到这里,基本就明白了——txObject中存放的是连接,而连接根据definition(也就是txAttr)来设置各种属性. 不信? 我们跟进prepareConnectionForTransaction的源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
/**
* Prepare the given Connection with the given transaction semantics.
* @param con the Connection to prepare
* @param definition the transaction definition to apply
* @return the previous isolation level, if any
* @throws SQLException if thrown by JDBC methods
* @see #resetConnectionAfterTransaction
*/
public static Integer prepareConnectionForTransaction(Connection con, TransactionDefinition definition)
throws SQLException {

Assert.notNull(con, "No Connection specified");

// Set read-only flag.
if (definition != null && definition.isReadOnly()) {
try {
if (logger.isDebugEnabled()) {
logger.debug("Setting JDBC Connection [" + con + "] read-only");
}
con.setReadOnly(true);
}
catch (SQLException ex) {
Throwable exToCheck = ex;
while (exToCheck != null) {
if (exToCheck.getClass().getSimpleName().contains("Timeout")) {
// Assume it's a connection timeout that would otherwise get lost: e.g. from JDBC 4.0
throw ex;
}
exToCheck = exToCheck.getCause();
}
// "read-only not supported" SQLException -> ignore, it's just a hint anyway
logger.debug("Could not set JDBC Connection read-only", ex);
}
catch (RuntimeException ex) {
Throwable exToCheck = ex;
while (exToCheck != null) {
if (exToCheck.getClass().getSimpleName().contains("Timeout")) {
// Assume it's a connection timeout that would otherwise get lost: e.g. from Hibernate
throw ex;
}
exToCheck = exToCheck.getCause();
}
// "read-only not supported" UnsupportedOperationException -> ignore, it's just a hint anyway
logger.debug("Could not set JDBC Connection read-only", ex);
}
}

// Apply specific isolation level, if any.
Integer previousIsolationLevel = null;
if (definition != null && definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
if (logger.isDebugEnabled()) {
logger.debug("Changing isolation level of JDBC Connection [" + con + "] to " +
definition.getIsolationLevel());
}
int currentIsolation = con.getTransactionIsolation();
if (currentIsolation != definition.getIsolationLevel()) {
previousIsolationLevel = currentIsolation;
con.setTransactionIsolation(definition.getIsolationLevel());
}
}

return previousIsolationLevel;
}

​ 源码39

第15行对只读事务进行了设置,也就是说如果definition(即txAttr)中设置了当前事务是只读的(即从方法的@Transactional 注解中设置了只读), 则我们就要设置connection为read-only的(源码39的20行).

你看,这不就应验了我刚刚说的——txObject中存放的connection连接根据txAttr中的属性进行设置吗? 所以txAttr是十分重要的——它代表了使用者希望连接具备什么样的属性. 来到源码50行,如果txAttr(即用户希望的)隔离级别不是ISOLATION_DEFAULT(用-1表示)的话,则返回过去的隔离级别并设置当前用户想设置的隔离级别.

注意,这里我多说一句,我不会去跟诸如 setTransactionIsolation 的源码. 因为那里不是Spring的源码. 而是各个数据源或者mysql实现的. 如果用户没有动事务的默认的隔离级别的话,例如本例(【1】)就是这样. 则返回previousIsolationLevel=null. 源码37的23行就对txObject设置了其previousIsolationLevel属性(自然也是null).

源码37的28行判断了当前连接是不是自动提交的,显然是(mysql数据库就是这样),所以会进到源码37 的29行,设置txObject的mustRestoreAutoCommit属性为true. 即原本数据库的此连接是自动提交的(例如mysql), 所以要保存一下. 而且看一下代码中做的事情就知道了——Switching JDBC Connection [“ + con + “] to manual commit。

最后源码37的33行设置了连接的自动提交为false——毕竟你要开启事务嘛~ 自然不能自动提交啦.

注意源码37的25行注释——转换手动提交对于某些jdbc驱动性能损耗是十分高的,所以源码37的28行要做一个判断——如果在某些连接池中已经设置了手动提交的话,就没必要进行后面的再设置了.

我们继续,我们来到源码37的36行,prepareTransactionalConnection,顾名思义就是准备事务连接,上面已经对connection进行了只读、隔离级别、自动提交等一系列属性进行了设置. 那么这里这个函数是做什么的?

跟进去

org.springframework.jdbc.datasource.DataSourceTransactionManager.prepareTransactionalConnection(Connection, TransactionDefinition)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
Prepare the transactional {@code Connection} right after transaction begin.
*/
protected void prepareTransactionalConnection(Connection con, TransactionDefinition definition)
throws SQLException {

if (isEnforceReadOnly() && definition.isReadOnly()) {
Statement stmt = con.createStatement();
try {
stmt.executeUpdate("SET TRANSACTION READ ONLY");
}
finally {
stmt.close();
}
}
}

​ 源码40

如果事务被设置为只读的,则整个事务先执行一条sql语句

1
SET TRANSACTION READ ONLY

关于这条sql参见【5】,其实就是设置只读事务. 而现在显然不是只读事务,所以源码40本质上什么也没干就退出了.源码37的第37行设置了当前事务是活跃的. 即txObject的connectionHolder属性的transactionActive属性设置为true(Set whether this holder represents an active, JDBC-managed transaction.). 源码37的第39行得到timeout是-1,即用不超时. 源码37的40行表示如果txAttr(即用户想要的超时时间)和默认的”永不超时”不相等的话,则就要设置失效时间(源码37的41行). 设置的方法是

ResourceHolderSupport中的.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* Set the timeout for this object in seconds.
* @param seconds number of seconds until expiration
*/
public void setTimeoutInSeconds(int seconds) {
setTimeoutInMillis(seconds * 1000L);
}

/**
* Set the timeout for this object in milliseconds.
* @param millis number of milliseconds until expiration
*/
public void setTimeoutInMillis(long millis) {
this.deadline = new Date(System.currentTimeMillis() + millis);
}

注意,源码37 的41行是对ConnectionHolder进行设置的,而ConnectionHolder是ResourceHolderSupport的子类,你懂了吧?

好了,来到了源码37的45行. 做的事情注释也写清楚了——Bind the connection holder to the thread. 绑定txObject中的ConnectionHolder和当前线程. 这里先做了一个判断,问是不是新建的ConnectionHolder?显然嘛~ 显然是新建的嘛 ~ 因为就是看txObject中的newConnectionHolder属性是不是true, 源码34的第七行我们就做了这件事情啦. 则源码37的46行,做的事情就是绑定数据源和txObject(其实就是数据源事务对象)的connectionHolder源码如下

TransactionSynchronizationManager中的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public static void bindResource(Object key, Object value) throws IllegalStateException {
Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
Assert.notNull(value, "Value must not be null");
Map<Object, Object> map = resources.get();
// set ThreadLocal Map if none found
if (map == null) {
map = new HashMap<Object, Object>();
resources.set(map);
}
Object oldValue = map.put(actualKey, value);
// Transparently suppress a ResourceHolder that was marked as void...
if (oldValue instanceof ResourceHolder && ((ResourceHolder) oldValue).isVoid()) {
oldValue = null;
}
if (oldValue != null) {
throw new IllegalStateException("Already value [" + oldValue + "] for key [" +
actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]");
}
if (logger.isTraceEnabled()) {
logger.trace("Bound value [" + value + "] for key [" + actualKey + "] to thread [" +
Thread.currentThread().getName() + "]");
}
}

​ 源码41

注意,我们曾今在源码35中获取resource(getResource), 也是在TransactionSynchronizationManager 这个类中.

源码41的key是c3p0数据源,value是txObject的ConnectionHolder. 源码41的第四行毫无意外的得到了null. 所以第7行map进行了赋值——一个hashmap,然后塞进了这里的<c3p0数据源,txObject的connectionHolder>键值对. 所以这里我们就明白了,源码35中的getResource方法获取的是什么资源——就是txObject的connectionHolder! 而这个connectionHolder中有数据库连接,也有根据txAttr的信息写入的一系列事务信息.

至此,源码33的39行的doBegin的代码已经解析完毕,其实做的事情就是向数据源获取连接之后根据事务信息txAttr对连接进行各种设置. 然后绑定当前线程(确切讲,根据源码41,是一个map绑定了当前线程,map是使用c3p0数据源做key的, 则一根线程来取的时候,先取出map,然后根据数据源取出相应的connectionHolder, 里面有从该数据源获取的并根据txAttr设置好各种事务属性的连接).

来到源码33 的第40行,即prepareSynchronization,顾名思义是处理事务同步的事情.

跟进去

org.springframework.transaction.support.AbstractPlatformTransactionManager.prepareSynchronization(DefaultTransactionStatus, TransactionDefinition) 这是AbstractPlatformTransactionManager的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* Initialize transaction synchronization as appropriate.
*/
protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {
if (status.isNewSynchronization()) {
TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(
definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ?
definition.getIsolationLevel() : null);
TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
TransactionSynchronizationManager.initSynchronization();
}
}

​ 源码42

源码33的36行的作用就知道了——判断事务的同步属性.

上面源码42做的事情就是往一系列的TransactionSynchronizationManager的ThreadLocal属性中塞东西.

第六行塞的是事务是否活跃? doBegin(源码37的37行)中就已经设置为true了.

第10行是塞是否是只读事务

第11行是塞事务的名字,注意,因为传入的definition是txAttr, 而他在源码32的第七行就已经封装成了

DelegatingTransactionAttribute(里面实现了getName接口),所以我们现在就知道为毛要封装成DelegatingTransactionAttribute了,因为可以保存事务的名字——就用方法的名字com.yfs.service.UserService.insert来作为当前事务的名字. 这真是一个好办法——不添加属性,而是添加回调来保存一个新的属性.

最后第12行Activate transaction synchronization for the current thread. Called by a transaction manager on transaction begin.

其实prepareSynchronization的作用就是Initialize transaction synchronization as appropriate.

最后源码33的40行返回了DefaultTransactionStatus status.

回到了源码32的27行.

prepareTransactionInfo,顾名思义就是准备TransactionInfo——通过txAttr和status

我们来看看源码

org.springframework.transaction.interceptor.TransactionAspectSupport.prepareTransactionInfo(PlatformTransactionManager, TransactionAttribute, String, TransactionStatus) 注意这是TransactionAspectSupport的方法,而TransactionAspectSupport是TransactionInterceptor的父类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
protected TransactionInfo prepareTransactionInfo(PlatformTransactionManager tm,
TransactionAttribute txAttr, String joinpointIdentification, TransactionStatus status) {

TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
if (txAttr != null) {
// We need a transaction for this method...
if (logger.isTraceEnabled()) {
logger.trace("Getting transaction for [" + txInfo.getJoinpointIdentification() + "]");
}
// The transaction manager will flag an error if an incompatible tx already exists.
txInfo.newTransactionStatus(status);
}
else {
// The TransactionInfo.hasTransaction() method will return false. We created it only
// to preserve the integrity of the ThreadLocal stack maintained in this class.
if (logger.isTraceEnabled()) {
logger.trace("No need to create transaction for [" + joinpointIdentification +
"]: This method is not transactional.");
}
}

// We always bind the TransactionInfo to the thread, even if we didn't create
// a new transaction here. This guarantees that the TransactionInfo stack
// will be managed correctly even if no transaction was created by this aspect.
txInfo.bindToThread();
return txInfo;
}

​ 源码43

上面的源码首先构建了TransactionInfo txInfo,最后txInfo中塞进去了status(即txInfo由TransactionAttribute txAttr和TransactionStatus status联合构成),最后第25行是绑定当前事务信息txInfo进入当前线程, 所谓绑定就是往 TransactionAspectSupport 中的一个threadlocal变量transactionInfoHolder中塞txInfo(事务信息).

org.springframework.transaction.interceptor.TransactionAspectSupport.TransactionInfo.bindToThread()

1
2
3
4
5
6
private void bindToThread() {
// Expose current TransactionStatus, preserving any existing TransactionStatus
// for restoration after this transaction is complete.
this.oldTransactionInfo = transactionInfoHolder.get();
transactionInfoHolder.set(this);
}

最后源码43返回事务信息(一个TransactionInfo).

至此,我们已经分析完源码32了. 回到【4】中的源码17的第17行. 进入TransactionInterceptor的invoke方法中传入的回调. 即源码16的12行,而源码16 的第12行其本质就是让拦截器链继续执行下一个拦截器. 但是【4】中我们分析出来了——拦截器栈只有一个拦截器,就是transactioninterceptor. 所以回到org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(),注意 CglibMethodInvocation是ReflectiveMethodInvocation的子类,就不要觉得奇怪了. 因为拦截器栈的启动就是从CglibMethodInvocation.proceed方法开始的(【1】源码33).

而因为拦截器栈中只有一个拦截器,所以就直接执行目标方法去了

org.springframework.aop.framework.CglibAopProxy.CglibMethodInvocation.invokeJoinpoint()

1
2
3
4
5
6
7
@Override
public Object proceed() throws Throwable {
// We start with an index of -1 and increment early.
if (this.currentInterceptorIndex == this.interceptorsAndDynamicMethodMatchers.size() - 1) {
return invokeJoinpoint();
}
...

​ 源码44

即源码44在第五行就执行目标方法并返回了.其中target就是com.yfs.service.UserService@1a760689(因为动态代理中的target就是被包装对象嘛).arguments是[], 毕竟我们的insert方法没有传参(这里写的insert比较简单). 然后就一路来到了(因为动态代理没有java源文件,所以看不了源码)目标方法

1
2
3
4
public void insert() {
userDao.insert();
int i = 1 / 0;
}

因为使用的是spring-jdbcTemplate,所以最后来到了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
protected int update(final PreparedStatementCreator psc, final PreparedStatementSetter pss)
throws DataAccessException {

logger.debug("Executing prepared SQL update");

return execute(psc, new PreparedStatementCallback<Integer>() {
@Override
public Integer doInPreparedStatement(PreparedStatement ps) throws SQLException {
try {
if (pss != null) {
pss.setValues(ps);
}
int rows = ps.executeUpdate(); // ps是com.mchange.v2.c3p0.impl.NewProxyPreparedStatement,即c3p0数据源的preparedstatement,而其实跟进去,里面的inner是com.mysql.jdbc.JDBC42PreparedStatemen. 即数据源c3p0指挥真正干活的是mysql的东西
if (logger.isDebugEnabled()) {
logger.debug("SQL update affected " + rows + " rows");
}
return rows;
}
finally {
if (pss instanceof ParameterDisposer) {
((ParameterDisposer) pss).cleanupParameters();
}
}
}
});
}

​ 源码49

再跟进execute方法

org.springframework.jdbc.core.JdbcTemplate.execute(PreparedStatementCreator, PreparedStatementCallback)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
@Override
public <T> T execute(PreparedStatementCreator psc, PreparedStatementCallback<T> action)
throws DataAccessException {
...
Connection con = DataSourceUtils.getConnection(getDataSource());
PreparedStatement ps = null;
Connection conToUse = con;
if (this.nativeJdbcExtractor != null && // 如果有自定义jdbc提取器
this.nativeJdbcExtractor.isNativeConnectionNecessaryForNativePreparedStatements()) {
conToUse = this.nativeJdbcExtractor.getNativeConnection(con);
}
ps = psc.createPreparedStatement(conToUse);
applyStatementSettings(ps);
PreparedStatement psToUse = ps;
if (this.nativeJdbcExtractor != null) {
psToUse = this.nativeJdbcExtractor.getNativePreparedStatement(ps);
}
T result = action.doInPreparedStatement(psToUse);
handleWarnings(ps);
return result;
...
catch (SQLException ex) {
// Release Connection early, to avoid potential connection pool deadlock
// in the case when the exception translator hasn't been initialized yet.
if (psc instanceof ParameterDisposer) {
((ParameterDisposer) psc).cleanupParameters();
}
String sql = getSql(psc);
psc = null;
JdbcUtils.closeStatement(ps);
ps = null;
DataSourceUtils.releaseConnection(con, getDataSource());
con = null;
throw getExceptionTranslator().translate("PreparedStatementCallback", sql, ex);
}
finally {
if (psc instanceof ParameterDisposer) {
((ParameterDisposer) psc).cleanupParameters();
}
JdbcUtils.closeStatement(ps);
DataSourceUtils.releaseConnection(con, getDataSource());
}

​ 源码45

其中第五行我们考察一下,不消说,getDataSource()得到的肯定是c3p0数据源,

跟两步来到

org.springframework.jdbc.datasource.DataSourceUtils.doGetConnection(DataSource)

1
2
3
4
5
6
7
8
9
/*
Actually obtain a JDBC Connection from the given DataSource.
*/
public static Connection doGetConnection(DataSource dataSource) throws SQLException {
Assert.notNull(dataSource, "No DataSource specified");

ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(dataSource);
...
return conHolder.getConnection();

​ 源码46

而TransactionSynchronizationManager的getResource方法(这是核心代码)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public static Object getResource(Object key) {
Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
Object value = doGetResource(actualKey);
if (value != null && logger.isTraceEnabled()) {
logger.trace("Retrieved value [" + value + "] for key [" + actualKey + "] bound to thread [" +
Thread.currentThread().getName() + "]");
}
return value;
}

/**
* Actually check the value of the resource that is bound for the given key.
*/
private static Object doGetResource(Object actualKey) {
Map<Object, Object> map = resources.get();
if (map == null) {
return null;
}
Object value = map.get(actualKey);
// Transparently remove ResourceHolder that was marked as void...
if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
map.remove(actualKey);
// Remove entire ThreadLocal if empty...
if (map.isEmpty()) {
resources.remove();
}
value = null;
}
return value;
}

​ 源码47

之前源码34的第五行就这样获取过资源(前面说了: 所谓资源就是当前线程绑定的map中根据数据源为key获取的txObject中的connectionHolder),但是那是第一次,所以获取到的是null. doBegin(源码37)执行完之后,当前线程中就有了. 得到了当前线程根据数据源绑定的connectionHolder,里面有数据库连接. 最后源码46 的第九行返回了数据库连接.

回到源码45,第六行,则余下的代码就很顺眼了——创建PreparedStatement,然后执行 PreparedStatement. 最后将执行的结果result返回. 但是源码45有一行代码值得关注,就是第13行. 我们跟进去看看

org.springframework.jdbc.core.JdbcTemplate.applyStatementSettings(Statement)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/*
Prepare the given JDBC Statement (or PreparedStatement or CallableStatement), applying statement settings such as fetch size, max rows, and query timeout.
*/
protected void applyStatementSettings(Statement stmt) throws SQLException {
int fetchSize = getFetchSize();
if (fetchSize != -1) {
stmt.setFetchSize(fetchSize);
}
int maxRows = getMaxRows();
if (maxRows != -1) {
stmt.setMaxRows(maxRows);
}
DataSourceUtils.applyTimeout(stmt, getDataSource(), getQueryTimeout());
}

​ 源码48

而getFetchSize() 这个方法Return the fetch size specified for this JdbcTemplate(默认是-1,Passing ‘-1’ will retrieve all the rows. This is default behavior.也就是默认是一次性把结果集的数据全部取出来,这样就容易造成内存不足 ).

第6行说,如果不是默认的-1,就根据JdbcTemplate配置的fetchSize参数对stmt进行设置. 同样处理maxRows参数. 最后处理timeout. 处理timeout的源码如下

DataSourceUtils的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static void applyTimeout(Statement stmt, DataSource dataSource, int timeout) throws SQLException {
Assert.notNull(stmt, "No Statement specified");
ConnectionHolder holder = null;
if (dataSource != null) {
holder = (ConnectionHolder) TransactionSynchronizationManager.getResource(dataSource);
}
if (holder != null && holder.hasTimeout()) {
// Remaining transaction timeout overrides specified value.
stmt.setQueryTimeout(holder.getTimeToLiveInSeconds());
}
else if (timeout >= 0) {
// No current transaction timeout -> apply specified value.
stmt.setQueryTimeout(timeout);
}
}

其实也就是把当前线程中的connectionHolder中的deadline属性(holder.getTimeToLiveInSeconds源码就是这样写的)赋值给statement. 回到源码45的18行,就会执行源码49第六行中传入的回调. 直至源码49第13行执行PreparedStatement. 然后返回结果后进入源码45的36行的finally块进行处理. 包括源码45的40行的关闭PreparedStatement和释放连接回数据源(就是一个连接池). 最后执行完目标方法之后,因为发现了 1/0 的算术异常,于是回到源码17的第21行,就要回滚事务. 即completeTransactionAfterThrowing方法.

跟进去

org.springframework.transaction.interceptor.TransactionAspectSupport.completeTransactionAfterThrowing(TransactionInfo, Throwable)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
protected void completeTransactionAfterThrowing(TransactionInfo txInfo, Throwable ex) {
if (txInfo != null && txInfo.hasTransaction()) {
if (logger.isTraceEnabled()) {
logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() +
"] after exception: " + ex);
}
if (txInfo.transactionAttribute.rollbackOn(ex)) {
try {
txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
}
catch (TransactionSystemException ex2) {
logger.error("Application exception overridden by rollback exception", ex);
ex2.initApplicationException(ex);
throw ex2;
}
catch (RuntimeException ex2) {
logger.error("Application exception overridden by rollback exception", ex);
throw ex2;
}
catch (Error err) {
logger.error("Application exception overridden by rollback error", ex);
throw err;
}
}
else {
// We don't roll back on this exception.
// Will still roll back if TransactionStatus.isRollbackOnly() is true.
try {
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
catch (TransactionSystemException ex2) {
logger.error("Application exception overridden by commit exception", ex);
ex2.initApplicationException(ex);
throw ex2;
}
catch (RuntimeException ex2) {
logger.error("Application exception overridden by commit exception", ex);
throw ex2;
}
catch (Error err) {
logger.error("Application exception overridden by commit error", ex);
throw err;
}
}
}
}

​ 源码50

源码50第二行的hasTransaction就是判断txInfo的transactionStatus是不是非空. 源码50第七行的就是判断Should we roll back on the given exception? 最后发现判断的依据是(当然,有一些细节,这里就不具体跟了,就是源码50第七行的rollbackOn方法的细节就不跟了)

1
ex instanceof RuntimeException || ex instanceof Error

也就是Spring 事务一般而言,只有运行时异常和错误才会回滚, 其他类型不会回滚(即如果不满足问滚条件即使抛出异常’也同样会提交 ,例如你抛出的是Exception). 虽然不细跟了,但是有一点还是必须要说的, 那就是我们其实可以使用@Transactional注解的rollbackFor属性扩展让遇到我们自定义的异常的时候也可以回滚. 为什么呢? 答案在

源码50的第七行代码的rollbackOn(看来还是要跟一下,囧)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Override
public boolean rollbackOn(Throwable ex) {
if (logger.isTraceEnabled()) {
logger.trace("Applying rules to determine whether transaction should rollback on " + ex);
}

RollbackRuleAttribute winner = null;
int deepest = Integer.MAX_VALUE;

if (this.rollbackRules != null) {
for (RollbackRuleAttribute rule : this.rollbackRules) {
int depth = rule.getDepth(ex);
if (depth >= 0 && depth < deepest) {
deepest = depth;
winner = rule;
}
}
}
...

这里的rollbackRules就是从@Transactional注解的rollbackFor属性提取出来的. 何以为证呢? rollbackRules属性是RuleBasedTransactionAttribute的属性,而RuleBasedTransactionAttribute的初始化是在org.springframework.transaction.annotation.SpringTransactionAnnotationParser.parseTransactionAnnotation(AnnotationAttributes)中完成的.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
protected TransactionAttribute parseTransactionAnnotation(AnnotationAttributes attributes) {
RuleBasedTransactionAttribute rbta = new RuleBasedTransactionAttribute();

Propagation propagation = attributes.getEnum("propagation");
rbta.setPropagationBehavior(propagation.value());
Isolation isolation = attributes.getEnum("isolation");
rbta.setIsolationLevel(isolation.value());
rbta.setTimeout(attributes.getNumber("timeout").intValue());
rbta.setReadOnly(attributes.getBoolean("readOnly"));
rbta.setQualifier(attributes.getString("value"));

List<RollbackRuleAttribute> rollbackRules = new ArrayList<RollbackRuleAttribute>();
for (Class<?> rbRule : attributes.getClassArray("rollbackFor")) {
rollbackRules.add(new RollbackRuleAttribute(rbRule));
}
for (String rbRule : attributes.getStringArray("rollbackForClassName")) {
rollbackRules.add(new RollbackRuleAttribute(rbRule));
}
for (Class<?> rbRule : attributes.getClassArray("noRollbackFor")) {
rollbackRules.add(new NoRollbackRuleAttribute(rbRule));
}
for (String rbRule : attributes.getStringArray("noRollbackForClassName")) {
rollbackRules.add(new NoRollbackRuleAttribute(rbRule));
}
rbta.setRollbackRules(rollbackRules);

return rbta;
}

而SpringTransactionAnnotationParser就是加在AnnotationTransactionAttributeSource,即【4】的源码11的第二个bean. 也就是在这个bean对@Transactional注解进行解析的时候就会构建txAttr。

然后来到源码50的第9行,进行回滚, 就是用事务管理器DataSourceTransactionManager利用当前的TransactionStatus进行回滚, 我们来看看源码

org.springframework.transaction.support.AbstractPlatformTransactionManager.rollback(TransactionStatus)

1
2
3
4
5
6
7
8
9
10
@Override
public final void rollback(TransactionStatus status) throws TransactionException {
if (status.isCompleted()) {
throw new IllegalTransactionStateException(
"Transaction is already completed - do not call commit or rollback more than once per transaction");
}

DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
processRollback(defStatus);
}

​ 源码51

显然,对于已经完成的事务是不能回滚的.

然后来到源码51 的第九行,最后跟进

org.springframework.transaction.support.AbstractPlatformTransactionManager.processRollback(DefaultTransactionStatus)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
/**
* Process an actual rollback.
* The completed flag has already been checked.
* @param status object representing the transaction
* @throws TransactionException in case of rollback failure
*/
private void processRollback(DefaultTransactionStatus status) {
try {
try {
triggerBeforeCompletion(status);
if (status.hasSavepoint()) {
if (status.isDebug()) {
logger.debug("Rolling back transaction to savepoint");
}
status.rollbackToHeldSavepoint(); //如果有保存点,也就是当前事务为单独的线程则会退到保存点
}
else if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction rollback");
}
doRollback(status); //如果 当前事务为独立的新事务 ,则直接 回退
}
else if (status.hasTransaction()) {
if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
if (status.isDebug()) {
logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
}
doSetRollbackOnly(status); // 如果当前事务不是独立的事务,那么只能标记状态 , 等到事务链执行完毕后统一回滚
}
else {
if (status.isDebug()) {
logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
}
}
}
else {
logger.debug("Should roll back transaction but cannot - no transaction available");
}
}
catch (RuntimeException ex) {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
throw ex;
}
catch (Error err) {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
throw err;
}
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
}
finally {
cleanupAfterCompletion(status);
}
}

​ 源码52

源码52的注释写的也很清楚了——真正的回滚.

然后判断有没有回滚点. 我这里是没有的,但是符合第17条判断,所以进入到源码52的第21行.

跟进

org.springframework.jdbc.datasource.DataSourceTransactionManager.doRollback(DefaultTransactionStatus)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Override
protected void doRollback(DefaultTransactionStatus status) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
Connection con = txObject.getConnectionHolder().getConnection();
if (status.isDebug()) {
logger.debug("Rolling back JDBC transaction on Connection [" + con + "]");
}
try {
con.rollback();
}
catch (SQLException ex) {
throw new TransactionSystemException("Could not roll back JDBC transaction", ex);
}
}

​ 源码53

从源码33的第37行知道TransactionStatus中有transaction(即一个txObject,看源码33的37行). 最后拿出txObject中的数据库连接——就是当前线程绑定的数据库连接,执行rollback——这才是Spring层面最底层的——通过com.mchange.v2.c3p0.impl.NewProxyConnection和数据库打交道.

这样就实现了回滚事务.

下面来看看如何提交事务. 也就是如果

将示例代码中的 int i = 1/0 删掉. 让事务正常提交的话.

则执行的就是源码17 的第27行. 跟进

org.springframework.transaction.interceptor.TransactionAspectSupport.commitTransactionAfterReturning(TransactionInfo)

1
2
3
4
5
6
7
8
protected void commitTransactionAfterReturning(TransactionInfo txInfo) {
if (txInfo != null && txInfo.hasTransaction()) {
if (logger.isTraceEnabled()) {
logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
}
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
}

​ 源码54

和回滚其实很类似,我们跟进第6行.——用事务管理器提交事务(根据TransactionStatus)

跟进

org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(TransactionStatus)

1
2
3
4
5
6
7
8
@Override
public final void commit(TransactionStatus status) throws TransactionException {
if (status.isCompleted()) {
throw new IllegalTransactionStateException(
"Transaction is already completed - do not call commit or rollback more than once per transaction");
}
...
processCommit(defStatus);

​ 源码55

事务如果已经完成的话,则抛异常. 否则的话,执行第八行. 跟进

org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(DefaultTransactionStatus)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/**
* Process an actual commit.
* Rollback-only flags have already been checked and applied.
* @param status object representing the transaction
* @throws TransactionException in case of commit failure
*/
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
try {
boolean beforeCompletionInvoked = false;
try {
prepareForCommit(status);
triggerBeforeCommit(status);
triggerBeforeCompletion(status);
beforeCompletionInvoked = true;
boolean globalRollbackOnly = false;
if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) {
globalRollbackOnly = status.isGlobalRollbackOnly();
}
if (status.hasSavepoint()) {
if (status.isDebug()) {
logger.debug("Releasing transaction savepoint");
}
status.releaseHeldSavepoint();
}
else if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction commit");
}
doCommit(status);
}
// Throw UnexpectedRollbackException if we have a global rollback-only
// marker but still didn't get a corresponding exception from commit.
if (globalRollbackOnly) {
throw new UnexpectedRollbackException(
"Transaction silently rolled back because it has been marked as rollback-only");
}
}
...
finally {
cleanupAfterCompletion(status);
}

​ 源码56

因为这是独立的事务则直接提交 ,即直接进入源码56的29行. 跟进

org.springframework.jdbc.datasource.DataSourceTransactionManager.doCommit(DefaultTransactionStatus)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Override
protected void doCommit(DefaultTransactionStatus status) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
Connection con = txObject.getConnectionHolder().getConnection();
if (status.isDebug()) {
logger.debug("Committing JDBC transaction on Connection [" + con + "]");
}
try {
con.commit();
}
catch (SQLException ex) {
throw new TransactionSystemException("Could not commit JDBC transaction", ex);
}
}

​ 源码57

这个和之前讲回滚的时候很像. 都是拿txObject(源码33的第37行).然后获取数据库连接(com.mchange.v2.c3p0.impl.NewProxyConnection),最后用它提交事务. 最后clean事务信息,表现为清空TransactionSynchronizationManager中的一系列的ThreadLocal变量(即源码56 的第40行),解除各种线程绑定、重设连接的属性之后归还数据库连接回连接池.

至此,提交事务的流程也分析完了.

先写到这里吧~ 还有一篇要写事务的嵌套以及如何将包装好的代理放进AopContext. 后面再写.

参考

【1】https://yfsyfs.github.io/2019/06/13/spring-AOP-%E6%B3%A8%E8%A7%A3%E5%8E%9F%E7%90%86/

【2】https://github.com/yfsyfs/backend/tree/master/spring-annotation-transaction

【3】Spring 源码深度解析 第二版 第十章

【4】https://yfsyfs.github.io/2019/06/17/Spring-%E4%BA%8B%E5%8A%A1%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90/

【5】https://blog.csdn.net/firefoxboy/article/details/3023687