Spring系列第47篇:spring事务源码解析

star2017 1年前 ⋅ 319 阅读

本文主要内容:Spring编程式事务源码深度解析,理解spring事务的本质

目录

环境

  1. jdk1.8
  2. Spring版本:5.2.3.RELEASE
  3. mysql5.7

回顾一下编程式事务用法

  1. @Test
  2. public void test1() throws Exception {
  3. //定义一个数据源
  4. org.apache.tomcat.jdbc.pool.DataSource dataSource = new org.apache.tomcat.jdbc.pool.DataSource();
  5. dataSource.setDriverClassName("com.mysql.jdbc.Driver");
  6. dataSource.setUrl("jdbc:mysql://localhost:3306/javacode2018?characterEncoding=UTF-8");
  7. dataSource.setUsername("root");
  8. dataSource.setPassword("root123");
  9. dataSource.setInitialSize(5);
  10. //定义一个JdbcTemplate,用来方便执行数据库增删改查
  11. JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
  12. //1.定义事务管理器,给其指定一个数据源(可以把事务管理器想象为一个人,这个人来负责事务的控制操作)
  13. PlatformTransactionManager platformTransactionManager = new DataSourceTransactionManager(dataSource);
  14. //2.定义事务属性:TransactionDefinition,TransactionDefinition可以用来配置事务的属性信息,比如事务隔离级别、事务超时时间、事务传播方式、是否是只读事务等等。
  15. TransactionDefinition transactionDefinition = new DefaultTransactionDefinition();
  16. //3.获取事务:调用platformTransactionManager.getTransaction开启事务操作,得到事务状态(TransactionStatus)对象
  17. TransactionStatus transactionStatus = platformTransactionManager.getTransaction(transactionDefinition);
  18. //4.执行业务操作,下面就执行2个插入操作
  19. try {
  20. System.out.println("before:" + jdbcTemplate.queryForList("SELECT * from t_user"));
  21. jdbcTemplate.update("insert into t_user (name) values (?)", "test1-1");
  22. jdbcTemplate.update("insert into t_user (name) values (?)", "test1-2");
  23. //5.提交事务:platformTransactionManager.commit
  24. platformTransactionManager.commit(transactionStatus);
  25. } catch (Exception e) {
  26. //6.回滚事务:platformTransactionManager.rollback
  27. platformTransactionManager.rollback(transactionStatus);
  28. }
  29. System.out.println("after:" + jdbcTemplate.queryForList("SELECT * from t_user"));
  30. }

编程式事务过程

编程式事务过程,我们简化了一下,如下:

  1. 1、定义事务属性信息:TransactionDefinition transactionDefinition = new DefaultTransactionDefinition();
  2. 2、定义事务管理器:PlatformTransactionManager platformTransactionManager = new DataSourceTransactionManager(dataSource);
  3. 3、获取事务:TransactionStatus transactionStatus = platformTransactionManager.getTransaction(transactionDefinition);
  4. 4、执行sql操作:比如上面通过JdbcTemplate的各种方法执行各种sql操作
  5. 5、提交事务(platformTransactionManager.commit)或者回滚事务(platformTransactionManager.rollback)

下面通过源码来解析上面4步操作,带大家理解原理。

1、定义事务属性信息(TransactionDefinition)

事务启动的过程中需要定义事务的一些配置信息,如:事务传播行为、隔离级别、超时时间、是否是只读事务、事务名称,spring中使用TransactionDefinition接口表示事务定义信息,下面看一下TransactionDefinition接口源码,主要有5个信息

  • 事务传播行为
  • 事务隔离级别
  • 事务超时时间
  • 是否是只读事务
  • 事务名称
  1. public interface TransactionDefinition {
  2. //传播行为:REQUIRED
  3. int PROPAGATION_REQUIRED = 0;
  4. //传播行为:REQUIRED
  5. int PROPAGATION_SUPPORTS = 1;
  6. //传播行为:REQUIRED
  7. int PROPAGATION_MANDATORY = 2;
  8. //传播行为:REQUIRED
  9. int PROPAGATION_REQUIRES_NEW = 3;
  10. //传播行为:REQUIRED
  11. int PROPAGATION_NOT_SUPPORTED = 4;
  12. //传播行为:REQUIRED
  13. int PROPAGATION_NEVER = 5;
  14. //传播行为:REQUIRED
  15. int PROPAGATION_NESTED = 6;
  16. //默认隔离级别
  17. int ISOLATION_DEFAULT = -1;
  18. //隔离级别:读未提交
  19. int ISOLATION_READ_UNCOMMITTED = 1;
  20. //隔离级别:读已提交
  21. int ISOLATION_READ_COMMITTED = 2;
  22. //隔离级别:可重复读
  23. int ISOLATION_REPEATABLE_READ = 4;
  24. //隔离级别:序列化的方式
  25. int ISOLATION_SERIALIZABLE = 8;
  26. //默认超时时间
  27. int TIMEOUT_DEFAULT = -1;
  28. //返回事务传播行为,默认是REQUIRED
  29. default int getPropagationBehavior() {
  30. return PROPAGATION_REQUIRED;
  31. }
  32. //返回事务的隔离级别
  33. default int getIsolationLevel() {
  34. return ISOLATION_DEFAULT;
  35. }
  36. //返回事务超时时间(秒)
  37. default int getTimeout() {
  38. return -1;
  39. }
  40. //是否是只读事务
  41. default boolean isReadOnly() {
  42. return false;
  43. }
  44. //获取事务名称
  45. @Nullable
  46. default String getName() {
  47. return null;
  48. }
  49. //获取默认的事务定义信息
  50. static TransactionDefinition withDefaults() {
  51. return StaticTransactionDefinition.INSTANCE;
  52. }
  53. }

TransactionDefinition接口的实现类,比较多,我们重点关注用的比较多的2个

  • DefaultTransactionDefinition:TransactionDefinition接口的默认的一个实现,编程式事务中通常可以使用这个
  • RuleBasedTransactionAttribute:声明式事务中用到的是这个,这个里面对于事务回滚有一些动态匹配的规则,稍后在声明式事务中去讲。

编程式事务中通常使用DefaultTransactionDefinition,如下:

  1. DefaultTransactionDefinition transactionDefinition = new DefaultTransactionDefinition();
  2. //设置事务传播行为
  3. transactionDefinition.setPropagationBehavior(TransactionDefinition.PROPAGATION_SUPPORTS);
  4. //设置事务隔离级别
  5. transactionDefinition.setIsolationLevel(TransactionDefinition.ISOLATION_READ_COMMITTED);
  6. //设置是否是只读事务
  7. transactionDefinition.setReadOnly(true);
  8. //设置事务超时时间(s),事务超过了指定的时间还未结束,会抛异常
  9. transactionDefinition.setTimeout(5);
  10. //设置事务名称,这个名称可以随便设置,不过最好起个有意义的名字,在debug的过程中会输出
  11. transactionDefinition.setName("class完整类名.方法名称");

下面进入第2步,定义事务管理器。

2、定义事务管理器(PlatformTransactionManager)

事务管理器,这是个非常重要的角色,可以把这货想象为一个人,spring就是靠这个人来管理事务的,负责:获取事务、提交事务、回滚事务,Spring中用PlatformTransactionManager接口表示事务管理器,接口中有三个方法

  1. public interface PlatformTransactionManager {
  2. //通过事务管理器获取一个事务,返回TransactionStatus:内部包含事务的状态信息
  3. TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException;
  4. //根据事务的状态信息提交事务
  5. void commit(TransactionStatus status) throws TransactionException;
  6. //根据事务的状态信息回滚事务
  7. void rollback(TransactionStatus status) throws TransactionException;
  8. }

PlatformTransactionManager有多个实现类,用来应对不同的环境,比如你操作db用的是hibernate或者mybatis,那么用到的事务管理器是不一样的,常见的事务管理器实现有下面几个

JpaTransactionManager:如果你用jpa来操作db,那么需要用这个管理器来帮你控制事务。

DataSourceTransactionManager:如果你用是指定数据源的方式,比如操作数据库用的是:JdbcTemplate、mybatis、ibatis,那么需要用这个管理器来帮你控制事务。

HibernateTransactionManager:如果你用hibernate来操作db,那么需要用这个管理器来帮你控制事务。

JtaTransactionManager:如果你用的是java中的jta来操作db,这种通常是分布式事务,此时需要用这种管理器来控制事务。

我们的案例中使用的是JdbcTemplate来操作db,所以用的是DataSourceTransactionManager这个管理器。

  1. PlatformTransactionManager platformTransactionManager = new DataSourceTransactionManager(dataSource);

下面进入第3步,通过事务管理器来开启事务。

3、获取事务

下面源码我们以REQUIRED中嵌套一个REQUIRED_NEW来进行说明,也就是事务中嵌套一个新的事务。

3.1、getTransaction:获取事务

通过事务管理器的getTransactiongetTransaction(transactionDefinition)方法开启事务,传递一个TransactionDefinition参数

  1. TransactionStatus transactionStatus = platformTransactionManager.getTransaction(transactionDefinition);

事务管理器我们用的是DataSourceTransactionManager,下面我们看一下DataSourceTransactionManager.getTransaction源码

  1. org.springframework.jdbc.datasource.DataSourceTransactionManager
  2. public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {
  3. //事务定义信息,若传入的definition如果为空,取默认的
  4. TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
  5. //@3.1-1:获取事务对象
  6. Object transaction = doGetTransaction();
  7. boolean debugEnabled = logger.isDebugEnabled();
  8. //@3.1-2:当前是否存在事务
  9. if (isExistingTransaction(transaction)) {
  10. //@1-3:如果当前存在事务,走这里
  11. return handleExistingTransaction(def, transaction, debugEnabled);
  12. }
  13. // 当前没有事务,走下面的代码
  14. // 若事务传播级别是PROPAGATION_MANDATORY:要求必须存在事务,若当前没有事务,弹出异常
  15. if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
  16. throw new IllegalTransactionStateException(
  17. "No existing transaction found for transaction marked with propagation 'mandatory'");
  18. } else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
  19. def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
  20. def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
  21. //事务传播行为(PROPAGATION_REQUIRED|PROPAGATION_REQUIRES_NEW|PROPAGATION_NESTED)走这里
  22. //@3.1-4:挂起事务
  23. SuspendedResourcesHolder suspendedResources = suspend(null);
  24. try {
  25. //@3.1-5:是否开启新的事务同步
  26. boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
  27. //@3.1-6:创建事务状态对象DefaultTransactionStatus,DefaultTransactionStatus是TransactionStatus的默认实现
  28. DefaultTransactionStatus status = newTransactionStatus(def, transaction, true, newSynchronization, debugEnabled, suspendedResources);
  29. //@3.1-7:doBegin用于开始事务
  30. doBegin(transaction, def);
  31. //@3.1-8:准备事务同步
  32. prepareSynchronization(status, def);
  33. //@3.1-9:返回事务状态对象
  34. return status;
  35. } catch (RuntimeException | Error ex) {
  36. //@3.1-10:出现(RuntimeException|Error)恢复被挂起的事务
  37. resume(null, suspendedResources);
  38. throw ex;
  39. }
  40. } else {
  41. //@3.1-11:其他事务传播行为的走这里(PROPAGATION_SUPPORTS、PROPAGATION_NOT_SUPPORTED、PROPAGATION_NEVER)
  42. boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
  43. return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
  44. }
  45. }

下面来看一下@3.1-1:doGetTransaction方法,用来获取事务对象

3.2、doGetTransaction:获取事务对象

  1. org.springframework.jdbc.datasource.DataSourceTransactionManager
  2. protected Object doGetTransaction() {
  3. //@3.2-1:创建数据源事务对象
  4. DataSourceTransactionObject txObject = new DataSourceTransactionObject();
  5. //@3.2-2:是否支持内部事务
  6. txObject.setSavepointAllowed(isNestedTransactionAllowed());
  7. //@3.2-4:ConnectionHolder表示jdbc连接持有者,简单理解:数据的连接被丢到ConnectionHolder中了,ConnectionHolder中提供了一些方法来返回里面的连接,此处调用TransactionSynchronizationManager.getResource方法来获取ConnectionHolder对象
  8. ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
  9. //@3.2-5:将conHolder丢到DataSourceTransactionObject中,第二个参数表示是否是一个新的连接,明显不是的吗,新的连接需要通过datasource来获取,通过datasource获取的连接才是新的连接
  10. txObject.setConnectionHolder(conHolder, false);
  11. return txObject;
  12. }

下面来看一下@3.2-4的代码,这个是重点了,这个用到了一个新的类TransactionSynchronizationManager:事务同步管理器,什么叫同步?一个事务过程中,被调用的方法都在一个线程中串行执行,就是同步;这个类中用到了很多ThreadLocal,用来在线程中存储事务相关的一些信息,,来瞅一眼

  1. public abstract class TransactionSynchronizationManager {
  2. //存储事务资源信息
  3. private static final ThreadLocal<Map<Object, Object>> resources =
  4. new NamedThreadLocal<>("Transactional resources");
  5. //存储事务过程中的一些回调接口(TransactionSynchronization接口,这个可以在事务的过程中给开发者提供一些回调用的)
  6. private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =
  7. new NamedThreadLocal<>("Transaction synchronizations");
  8. //存储当前正在运行的事务的名称
  9. private static final ThreadLocal<String> currentTransactionName =
  10. new NamedThreadLocal<>("Current transaction name");
  11. //存储当前正在运行的事务是否是只读的
  12. private static final ThreadLocal<Boolean> currentTransactionReadOnly =
  13. new NamedThreadLocal<>("Current transaction read-only status");
  14. //存储当前正在运行的事务的隔离级别
  15. private static final ThreadLocal<Integer> currentTransactionIsolationLevel =
  16. new NamedThreadLocal<>("Current transaction isolation level");
  17. //存储当前正在运行的事务是否是活动状态,事务启动的时候会被激活
  18. private static final ThreadLocal<Boolean> actualTransactionActive =
  19. new NamedThreadLocal<>("Actual transaction active");
  20. //还有很多静态方法,主要是用来操作上面这些ThreadLocal的,这里就不列出的,大家可以去看看
  21. }

下面来看TransactionSynchronizationManager.getResource代码

  1. ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());

obtainDataSource()会返回事务管理器中的datasource对象

  1. protected DataSource obtainDataSource() {
  2. DataSource dataSource = getDataSource();
  3. Assert.state(dataSource != null, "No DataSource set");
  4. return dataSource;
  5. }

下面看TransactionSynchronizationManager.getResource的源码

  1. public static Object getResource(Object key) {
  2. //通过TransactionSynchronizationUtils.unwrapResourceIfNecessary(key)获取一个actualKey,我们传入的是datasouce,实际上最后actualKey和传入的datasource是一个对象
  3. Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
  4. //调用doGetResource方法获取对应的value
  5. Object value = doGetResource(actualKey);
  6. return value;
  7. }

doGetResource(actualKey)方法如下,内部会从resources这个ThreadLocal中获取获取数据ConnectionHolder对象,到目前为止,根本没有看到向resource中放入过数据,获取获取到的conHolder肯定是null了

  1. static final ThreadLocal<Map<Object, Object>> resources = new NamedThreadLocal<>("Transactional resources");
  2. private static Object doGetResource(Object actualKey) {
  3. Map<Object, Object> map = resources.get();
  4. if (map == null) {
  5. return null;
  6. }
  7. Object value = map.get(actualKey);
  8. return value;
  9. }

TransactionSynchronizationManager.getResource:可以理解为从resource ThreadLocal中查找transactionManager.datasource绑定的ConnectionHolder对象

到此,Object transaction = doGetTransaction()方法执行完毕,下面我们再回到getTransaction方法,第一次进来,上下文中是没有事务的,所以会走下面的@3.1-4的代码,当前没有事务,导致没有事务需要挂起,所以suspend方法内部可以先忽略

  1. org.springframework.transaction.support.AbstractPlatformTransactionManager#getTransaction
  2. public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
  3. throws TransactionException {
  4. //事务定义信息,若传入的definition如果为空,取默认的
  5. TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
  6. //@3.1-1:获取事务对象
  7. Object transaction = doGetTransaction();
  8. if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
  9. def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
  10. def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
  11. //事务传播行为(PROPAGATION_REQUIRED|PROPAGATION_REQUIRES_NEW|PROPAGATION_NESTED)走这里
  12. //@3.1-4:挂起事务
  13. SuspendedResourcesHolder suspendedResources = suspend(null);
  14. try {
  15. //@3.1-5:是否开启新的事务同步
  16. boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
  17. //@3.1-6:创建事务状态对象DefaultTransactionStatus,DefaultTransactionStatus是TransactionStatus的默认实现
  18. DefaultTransactionStatus status = newTransactionStatus(
  19. def, transaction, true, newSynchronization, debugEnabled, suspendedResources);
  20. //@3.1-7:doBegin用于开始事务
  21. doBegin(transaction, def);
  22. //@3.1-8:准备事务同步
  23. prepareSynchronization(status, def);
  24. //@3.1-9:返回事务状态对象
  25. return status;
  26. } catch (RuntimeException | Error ex) {
  27. //@3.1-10:出现(RuntimeException|Error)恢复被挂起的事务
  28. resume(null, suspendedResources);
  29. throw ex;
  30. }
  31. }
  32. }

然后会执行下面代码

  1. //@3.1-5:是否开启新的事务同步,事务同步是干嘛的,是spring在事务过程中给开发者预留的一些扩展点,稍后细说;大家先这么理解,每个新的事务newSynchronization都是true,开一个一个新的事务就会启动一个新的同步
  2. boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
  3. //@3.1-6:创建事务状态对象DefaultTransactionStatus,DefaultTransactionStatus是TransactionStatus的默认实现
  4. DefaultTransactionStatus status = newTransactionStatus(def, transaction, true, newSynchronization, debugEnabled, suspendedResources);
  5. //@3.1-7:doBegin用于开始事务
  6. doBegin(transaction, def);
  7. //@3.1-8:准备事务同步
  8. prepareSynchronization(status, def);
  9. //@3.1-9:返回事务状态对象
  10. return status;

上面过程我们重点来说一下@3.1-7 和 @3.1-8

3.3、@3.1-7:doBegin 开启事务

  1. org.springframework.jdbc.datasource.DataSourceTransactionManager#doBegin
  2. protected void doBegin(Object transaction, TransactionDefinition definition) {
  3. //数据源事务对象
  4. DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
  5. //数据库连接
  6. Connection con = null;
  7. try {
  8. //txObject.hasConnectionHolder()用来判断txObject.connectionHolder!=null,现在肯定是null,所以txObject.hasConnectionHolder()返回false
  9. if (!txObject.hasConnectionHolder() ||
  10. txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
  11. //调用transactionManager.datasource.getConnection()获取一个数据库连接
  12. Connection newCon = obtainDataSource().getConnection();
  13. //将数据库连接丢到一个ConnectionHolder中,放到txObject中,注意第2个参数是true,表示第一个参数的ConnectionHolder是新创建的
  14. txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
  15. }
  16. //连接中启动事务同步
  17. txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
  18. //获取连接
  19. con = txObject.getConnectionHolder().getConnection();
  20. //获取隔离级别
  21. Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
  22. //设置隔离级别
  23. txObject.setPreviousIsolationLevel(previousIsolationLevel);
  24. //设置是否是只读
  25. txObject.setReadOnly(definition.isReadOnly());
  26. //判断连接是否是自动提交的,如果是自动提交的将其置为手动提交
  27. if (con.getAutoCommit()) {
  28. //在txObject中存储一下连接自动提交老的值,用于在事务执行完毕之后,还原一下Connection的autoCommit的值
  29. txObject.setMustRestoreAutoCommit(true);
  30. //设置手动提交
  31. con.setAutoCommit(false);
  32. }
  33. //准备事务连接
  34. prepareTransactionalConnection(con, definition);
  35. //设置事务活动开启
  36. txObject.getConnectionHolder().setTransactionActive(true);
  37. //根据事务定义信息获取事务超时时间
  38. int timeout = determineTimeout(definition);
  39. if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
  40. //设置连接的超时时间
  41. txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
  42. }
  43. //txObject中的ConnectionHolder是否是一个新的,确实是新的,所以这个地方返回true
  44. if (txObject.isNewConnectionHolder()) {
  45. //将datasource->ConnectionHolder丢到resource ThreadLocal的map中
  46. TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
  47. }
  48. }
  49. }

重点来看一下下面这段代码

  1. TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());

源码

  1. org.springframework.transaction.support.TransactionSynchronizationManager#bindResource
  2. public static void bindResource(Object key, Object value) throws IllegalStateException {
  3. Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
  4. Map<Object, Object> map = resources.get();
  5. if (map == null) {
  6. map = new HashMap<>();
  7. resources.set(map);
  8. }
  9. map.put(actualKey, value);
  10. }

上面这段代码执行完毕之后,datasource->ConnectionHoloder(conn)被放到resources Threadloca的map中了。

3.4、@3.1-8:prepareSynchronization 准备事务同步

  1. //@3.1-8:准备事务同步
  2. prepareSynchronization(status, def);

源码如下,大家看一下,这个方法主要的作用是,开启一个新事务的时候,会将事务的状态、隔离级别、是否是只读事务、事务名称丢到TransactionSynchronizationManager中的各种对应的ThreadLocal中,方便在当前线程中共享这些数据。

  1. org.springframework.transaction.support.AbstractPlatformTransactionManager#prepareSynchronization
  2. protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {
  3. //如果是一个新的事务,status.isNewSynchronization()将返回true
  4. if (status.isNewSynchronization()) {
  5. TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
  6. TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(
  7. definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ?
  8. definition.getIsolationLevel() : null);
  9. TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
  10. TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
  11. //@3.4-1:初始化事务同步
  12. TransactionSynchronizationManager.initSynchronization();
  13. }
  14. }

@3.4-1:初始化事务同步

  1. org.springframework.transaction.support.TransactionSynchronizationManager
  2. private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =
  3. new NamedThreadLocal<>("Transaction synchronizations");
  4. //获取同步是否启动,新事务第一次进来synchronizations.get()是null,所以这个方法返回的是false
  5. public static boolean isSynchronizationActive() {
  6. return (synchronizations.get() != null);
  7. }
  8. //初始化事务同步,主要就是在synchronizations ThreadLocal中放一个LinkedHashSet
  9. public static void initSynchronization() throws IllegalStateException {
  10. if (isSynchronizationActive()) {
  11. throw new IllegalStateException("Cannot activate transaction synchronization - already active");
  12. }
  13. synchronizations.set(new LinkedHashSet<>());
  14. }

3.5、小结

获取事务的过程已经结束了,我们来看一下这个过程中做的一些关键的事情

  1. 1、获取db连接:从事务管理器的datasource中调用getConnection获取一个新的数据库连接,将连接置为手动提交
  2. 2、将datasource关联连接丢到ThreadLocal中:将第一步中获取到的连丢到ConnectionHolder中,然后将事务管理器的datasource->ConnectionHolder丢到了resource ThreadLocal中,这样我们可以通过datasourceThreadLocal中获取到关联的数据库连接
  3. 3、准备事务同步:将事务的一些信息放到ThreadLocal

4、事务方法中执行增删改查

以下面这个插入操作来看一下这个插入是如何参与到spring事务中的。

  1. jdbcTemplate.update("insert into t_user (name) values (?)", "test1-1");

最终会进入到jdbctemplate#execute方法里,无用代码我们给剔除,重点内部关注下面获取连接的方法

  1. org.springframework.jdbc.core.JdbcTemplate#execute(org.springframework.jdbc.core.PreparedStatementCreator, org.springframework.jdbc.core.PreparedStatementCallback<T>){
  2. //获取数据库连接
  3. Connection con = DataSourceUtils.getConnection(obtainDataSource());
  4. //通过conn执行db操作
  5. }

obtainDataSource()会返回jdbctemplate.datasource对象,下面重点来看DataSourceUtils.getConnection源码,最终会进入下面这个方法

  1. org.springframework.jdbc.datasource.DataSourceUtils#doGetConnection
  2. public static Connection doGetConnection(DataSource dataSource) throws SQLException {
  3. //用jdbctemplate.datSource从TransactionSynchronizationManager的resouce ThreadLocal中获取对应的ConnectionHolder对象,在前面获取事务环节中,transactionManager.datasource->ConnectionHolder被丢到resouce ThreadLocal,而jdbctemplate.datSource和transactionManager.datasource是同一个对象,所以是可以获取到ConnectionHolder的,此时就会使用事务开启是的数据库连接
  4. ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(dataSource);
  5. //conHolder不为空 && conHolder中有数据库连接对象
  6. if (conHolder != null && (conHolder.hasConnection() || conHolder.isSynchronizedWithTransaction())) {
  7. //返回conHolder中的数据库连接对象
  8. return conHolder.getConnection();
  9. }
  10. //如果上面获取不到连接,会走这里,这里将会调用jdbctemplate.datasource.getConnection()从数据源中获取一个新的db连接
  11. Connection con = fetchConnection(dataSource);
  12. //将连接返回
  13. return con;
  14. }

可以得出一个结论:如果要让最终执行的sql受spring事务控制,那么事务管理器中datasource对象必须和jdbctemplate.datasource是同一个,这个结论在其他文章中说过很多次了,这里大家算是搞明白了吧。

5、提交事务

调用事务管理器的commit方法,提交事务

  1. platformTransactionManager.commit(transactionStatus);

commit源码

  1. org.springframework.transaction.support.AbstractPlatformTransactionManager#commit
  2. public final void commit(TransactionStatus status) throws TransactionException {
  3. //事务是否已经完成,此时还未完成,如果事务完成了,再来调用commit方法会报错
  4. if (status.isCompleted()) {
  5. throw new IllegalTransactionStateException(
  6. "Transaction is already completed - do not call commit or rollback more than once per transaction");
  7. }
  8. //事务状态
  9. DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
  10. //defStatus.rollbackOnly是否是true,如果是true,说明事务状态被标注了需要回滚,此时走回滚逻辑
  11. if (defStatus.isLocalRollbackOnly()) {
  12. //走回滚逻辑
  13. processRollback(defStatus, false);
  14. return;
  15. }
  16. //提交事务过程
  17. processCommit(defStatus);
  18. }

processCommit源码

  1. org.springframework.transaction.support.AbstractPlatformTransactionManager#processCommit
  2. private void processCommit(DefaultTransactionStatus status) throws TransactionException {
  3. try {
  4. try {
  5. //提交之前的回调(给开发提供的扩展点)
  6. triggerBeforeCommit(status);
  7. //事务完成之前的回调(给开发提供的扩展点)
  8. triggerBeforeCompletion(status);
  9. //是否是新事务,如果是新事务,将执行提交操作,比如传播行为是REQUIRED中嵌套了一个REQUIRED,那么内部的事务就不是新的事务,外部的事务是新事务
  10. if (status.isNewTransaction()) {
  11. //@5-1:执行提交操作
  12. doCommit(status);
  13. }
  14. } catch (UnexpectedRollbackException ex) {
  15. //事务完成之后执行的回调(给开发提供的扩展点)
  16. triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
  17. throw ex;
  18. } catch (RuntimeException | Error ex) {
  19. //提交过程中有异常,执行回滚操作
  20. doRollbackOnCommitException(status, ex);
  21. throw ex;
  22. }
  23. try {
  24. //事务commit之后,执行一些回调(给开发提供的扩展点)
  25. triggerAfterCommit(status);
  26. } finally {
  27. //事务完成之后,执行一些回调(给开发提供的扩展点)
  28. triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
  29. }
  30. } finally {
  31. //事务执行完毕之后,执行一些清理操作
  32. cleanupAfterCompletion(status);
  33. }
  34. }

上面这个方法看起来挺长的,重点会做3件事情:

1、给开发提供的扩展点:以trigger开头的方法,是留给开发的扩展点,可以在事务执行的过程中执行一些回调,主要是在事务提交之前,提交之后,回滚之前,回滚之后,可以执行一些回调,也就是事务同步要干的事情,这个扩展点稍后说。

2、通过connection执行commit操作,对应上面的 @5-1 代码:doCommit(status);

3、完成之后执行清理操作:finally中执行 cleanupAfterCompletion(status);

来看看doCommit(status)方法,内部主要就是调用connection的commit()提交事务,如下:

  1. org.springframework.jdbc.datasource.DataSourceTransactionManager#doCommit
  2. protected void doCommit(DefaultTransactionStatus status) {
  3. DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
  4. //从ConnectionHolder中获取Connection
  5. Connection con = txObject.getConnectionHolder().getConnection();
  6. //执行commit,提交数据库事务
  7. con.commit();
  8. }

cleanupAfterCompletion(status):清理操作

  1. org.springframework.transaction.support.AbstractPlatformTransactionManager#cleanupAfterCompletion
  2. private void cleanupAfterCompletion(DefaultTransactionStatus status) {
  3. //将事务状态置为已完成
  4. status.setCompleted();
  5. //是否是新的事务同步
  6. if (status.isNewSynchronization()) {
  7. //将TransactionSynchronizationManager中的那些ThreadLoca中的数据都清除,会调用ThreadLocal的remove()方法清除数据
  8. TransactionSynchronizationManager.clear();
  9. }
  10. //是否是新事务
  11. if (status.isNewTransaction()) {
  12. //执行清理操作
  13. doCleanupAfterCompletion(status.getTransaction());
  14. }
  15. //是否有被挂起的事务
  16. if (status.getSuspendedResources() != null) {
  17. Object transaction = (status.hasTransaction() ? status.getTransaction() : null);
  18. //恢复被挂起的事务
  19. resume(transaction, (SuspendedResourcesHolder) status.getSuspendedResources());
  20. }
  21. }

doCleanupAfterCompletion源码

  1. org.springframework.jdbc.datasource.DataSourceTransactionManager#doCleanupAfterCompletion
  2. protected void doCleanupAfterCompletion(Object transaction) {
  3. DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
  4. //是否是一个新的ConnectionHolder,如果是新的事务,那么ConnectionHolder是新的
  5. if (txObject.isNewConnectionHolder()) {
  6. //将transactionManager.datasource->ConnectionHolder从resource Threadlocal中干掉
  7. TransactionSynchronizationManager.unbindResource(obtainDataSource());
  8. }
  9. //下面重置Connection,将Connection恢复到最原始的状态
  10. Connection con = txObject.getConnectionHolder().getConnection();
  11. try {
  12. if (txObject.isMustRestoreAutoCommit()) {
  13. //自动提交
  14. con.setAutoCommit(true);
  15. }
  16. //恢复connction的隔离级别、是否是只读事务
  17. DataSourceUtils.resetConnectionAfterTransaction(
  18. con, txObject.getPreviousIsolationLevel(), txObject.isReadOnly());
  19. } catch (Throwable ex) {
  20. logger.debug("Could not reset JDBC Connection after transaction", ex);
  21. }
  22. //是否是新的连接
  23. if (txObject.isNewConnectionHolder()) {
  24. //释放连接,内部会调用conn.close()方法
  25. DataSourceUtils.releaseConnection(con, this.dataSource);
  26. }
  27. //还原ConnectionHoloder到最初的状态
  28. txObject.getConnectionHolder().clear();
  29. }

终结一下,清理工作主要做的事情就是释放当前线程占有的一切资源,然后将被挂起的事务恢复

6、回滚事务

回滚的操作和提交的操作差不多的,源码我就不讲了,大家自己去看一下。

7、存在事务的情况如何走?

下面来看另外一个流程,REQUIRED中嵌套一个REQUIRED_NEW,然后走到REQUIRED_NEW的时候,代码是如何运行的?大致的过程如下

1、判断上线文中是否有事务

2、挂起当前事务

3、开启新事务,并执行新事务

4、恢复被挂起的事务

7.1、判断是否有事务:isExistingTransaction

判断上线文中是否有事务,比较简单,如下:

  1. org.springframework.jdbc.datasource.DataSourceTransactionManager#isExistingTransaction
  2. protected boolean isExistingTransaction(Object transaction) {
  3. DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
  4. //txObject.connectionHolder!=null && connectionHolder事务处于开启状态(上面我们介绍过在doBegin开启事务的时候connectionHolder.transactionActive会被置为true)
  5. return (txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive());
  6. }

7.2、若当前存在事务

我们再来看一下获取事务中,有事务如何走

  1. org.springframework.transaction.support.AbstractPlatformTransactionManager#getTransaction
  2. public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {
  3. //获取事务
  4. Object transaction = doGetTransaction();
  5. //是否存在事务
  6. if (isExistingTransaction(transaction)) {
  7. //存在事务会走这里
  8. return handleExistingTransaction(def, transaction, debugEnabled);
  9. }
  10. }

当前存在事务,然后会进入handleExistingTransaction方法

  1. org.springframework.transaction.support.AbstractPlatformTransactionManager#handleExistingTransaction
  2. private TransactionStatus handleExistingTransaction(TransactionDefinition definition, Object transaction, boolean debugEnabled) throws TransactionException {
  3. //当前有事务,被嵌套的事务传播行为是PROPAGATION_NEVER,抛出异常
  4. if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
  5. throw new IllegalTransactionStateException(
  6. "Existing transaction found for transaction marked with propagation 'never'");
  7. }
  8. if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
  9. //当前有事务,被嵌套的事务传播行为是PROPAGATION_NOT_SUPPORTED,那么将先调用suspend将当前事务挂起,然后以无事务的方式运行被嵌套的事务
  10. //挂起当前事务
  11. Object suspendedResources = suspend(transaction);
  12. boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
  13. //以无事务的方式运行
  14. return prepareTransactionStatus(
  15. definition, null, false, newSynchronization, debugEnabled, suspendedResources);
  16. }
  17. if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
  18. //被嵌套的事务传播行为是PROPAGATION_REQUIRES_NEW,那么会先挂起当前事务,然后会重新开启一个新的事务
  19. //挂起当前事务
  20. SuspendedResourcesHolder suspendedResources = suspend(transaction);
  21. try {
  22. //下面的过程我们就不在再介绍了,之前有介绍过
  23. boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
  24. DefaultTransactionStatus status = newTransactionStatus(
  25. definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
  26. doBegin(transaction, definition);
  27. prepareSynchronization(status, definition);
  28. return status;
  29. } catch (RuntimeException | Error beginEx) {
  30. resumeAfterBeginException(transaction, suspendedResources, beginEx);
  31. throw beginEx;
  32. }
  33. }
  34. //其他的传播行为走下面。。。,暂时省略了
  35. }

下面重点看事务挂起和事务的恢复操作。

7.3、事务挂起:suspend

事务挂起调用事务管理器的suspend方法,源码如下,主要做的事情:将当前事务中的一切信息保存到SuspendedResourcesHolder对象中,相当于事务的快照,后面恢复的时候用;然后将事务现场清理干净,主要是将一堆存储在ThreadLocal中的事务数据干掉。

  1. org.springframework.transaction.support.AbstractPlatformTransactionManager#suspend
  2. protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {
  3. //当前事务同步是否被激活,如果是新事务,这个返回的是true
  4. if (TransactionSynchronizationManager.isSynchronizationActive()) {
  5. //挂起事务同步,这个地方会可以通过TransactionSynchronization接口给开发者提供了扩展点,稍后我们会单独介绍TransactionSynchronization接口,这个接口专门用来在事务执行过程中做回调的
  6. List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
  7. try {
  8. Object suspendedResources = null;
  9. if (transaction != null) {
  10. //@1:获取挂起的资源
  11. suspendedResources = doSuspend(transaction);
  12. }
  13. //下面就是获取当前事务的各种信息(name,readyOnly,事务隔离级别,是否被激活)
  14. String name = TransactionSynchronizationManager.getCurrentTransactionName();
  15. TransactionSynchronizationManager.setCurrentTransactionName(null);
  16. boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
  17. TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
  18. Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
  19. TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
  20. boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
  21. TransactionSynchronizationManager.setActualTransactionActive(false);
  22. return new SuspendedResourcesHolder(
  23. suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
  24. }
  25. }
  26. }

下面来看看@1:doSuspend(transaction)源码,主要就是将datasource->connectionHolder从resource ThreadLocal中解绑,然后将connectionHolder返回,下面这个方法实际上返回的就是connectionHolder对象

  1. org.springframework.jdbc.datasource.DataSourceTransactionManager#doSuspend
  2. protected Object doSuspend(Object transaction) {
  3. DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
  4. //将connectionHolder置为null
  5. txObject.setConnectionHolder(null);
  6. //将datasource->connectionHolder从resource ThreadLocal中解绑,并返回被解绑的connectionHolder对象
  7. return TransactionSynchronizationManager.unbindResource(obtainDataSource());
  8. }

此时,当前的事务被挂起了,然后开启一个新的事务,新的事务的过程上面已经介绍过了,下面我们来看事务的恢复过程。

7.4、事务恢复:resume

事务挂起调用事务管理器的resume方法,源码如下,主要做的事情:通过SuspendedResourcesHolder对象中,将被挂起的事务恢复,SuspendedResourcesHolder对象中保存了被挂起的事务所有信息,所以可以通过这个对象来恢复事务。

  1. org.springframework.transaction.support.AbstractPlatformTransactionManager#resume
  2. protected final void resume(@Nullable Object transaction, @Nullable SuspendedResourcesHolder resourcesHolder)
  3. throws TransactionException {
  4. if (resourcesHolder != null) {
  5. Object suspendedResources = resourcesHolder.suspendedResources;
  6. if (suspendedResources != null) {
  7. //恢复被挂起的资源,也就是将datasource->connectionHolder绑定到resource ThreadLocal中
  8. doResume(transaction, suspendedResources);
  9. }
  10. List<TransactionSynchronization> suspendedSynchronizations = resourcesHolder.suspendedSynchronizations;
  11. //下面就是将数据恢复到各种ThreadLocal中
  12. if (suspendedSynchronizations != null) {
  13. TransactionSynchronizationManager.setActualTransactionActive(resourcesHolder.wasActive);
  14. TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel);
  15. TransactionSynchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly);
  16. TransactionSynchronizationManager.setCurrentTransactionName(resourcesHolder.name);
  17. //恢复事务同步(将事务扩展点恢复)
  18. doResumeSynchronization(suspendedSynchronizations);
  19. }
  20. }
  21. }

8、事务执行过程中的回调接口: TransactionSynchronization

8.1、作用

spring事务运行的过程中,给开发者预留了一些扩展点,在事务执行的不同阶段,将回调扩展点中的一些方法。

比如我们想在事务提交之前、提交之后、回滚之前、回滚之后做一些事务,那么可以通过扩展点来实现。

8.2、扩展点的用法

1、定义事务TransactionSynchronization对象

TransactionSynchronization接口中的方法在spring事务执行的过程中会自动被回调

  1. public interface TransactionSynchronization extends Flushable {
  2. //提交状态
  3. int STATUS_COMMITTED = 0;
  4. //回滚状态
  5. int STATUS_ROLLED_BACK = 1;
  6. //状态未知,比如事务提交或者回滚的过程中发生了异常,那么事务的状态是未知的
  7. int STATUS_UNKNOWN = 2;
  8. //事务被挂起的时候会调用被挂起事务中所有TransactionSynchronization的resume方法
  9. default void suspend() {
  10. }
  11. //事务恢复的过程中会调用被恢复的事务中所有TransactionSynchronization的resume方法
  12. default void resume() {
  13. }
  14. //清理操作
  15. @Override
  16. default void flush() {
  17. }
  18. //事务提交之前调用
  19. default void beforeCommit(boolean readOnly) {
  20. }
  21. //事务提交或者回滚之前调用
  22. default void beforeCompletion() {
  23. }
  24. //事务commit之后调用
  25. default void afterCommit() {
  26. }
  27. //事务完成之后调用
  28. default void afterCompletion(int status) {
  29. }
  30. }

2、将TransactionSynchronization注册到当前事务中

通过下面静态方法将事务扩展点TransactionSynchronization注册到当前事务中

  1. TransactionSynchronizationManager.registerSynchronization(transactionSynchronization)

看一下源码,很简单,丢到ThreadLocal中了

  1. private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =
  2. new NamedThreadLocal<>("Transaction synchronizations");
  3. public static void registerSynchronization(TransactionSynchronization synchronization)
  4. throws IllegalStateException {
  5. Set<TransactionSynchronization> synchs = synchronizations.get();
  6. if (synchs == null) {
  7. throw new IllegalStateException("Transaction synchronization is not active");
  8. }
  9. synchs.add(synchronization);
  10. }

当有多个TransactionSynchronization的时候,可以指定其顺序,可以实现org.springframework.core.Ordered接口,来指定顺序,从小大的排序被调用,TransactionSynchronization有个默认适配器TransactionSynchronizationAdapter,这个类实现了Ordered接口,所以,如果我们要使用的时候,直接使用TransactionSynchronizationAdapter这个类。

3、回调扩展点TransactionSynchronization中的方法

TransactionSynchronization中的方法是spring事务管理器自动调用的,本文上面有提交到,事务管理器在事务提交或者事务回滚的过程中,有很多地方会调用trigger开头的方法,这个trigger方法内部就会遍历当前事务中的transactionSynchronization列表,然后调用transactionSynchronization内部的一些指定的方法。

以事务提交的源码为例,来看一下

  1. private void processCommit(DefaultTransactionStatus status) throws TransactionException {
  2. triggerBeforeCommit(status);
  3. triggerBeforeCompletion(status);
  4. //....其他代码省略
  5. }

triggerBeforeCommit(status)源码

  1. protected final void triggerBeforeCommit(DefaultTransactionStatus status) {
  2. if (status.isNewSynchronization()) {
  3. TransactionSynchronizationUtils.triggerBeforeCommit(status.isReadOnly());
  4. }
  5. }

TransactionSynchronizationUtils.triggerBeforeCommit 源码

  1. public static void triggerBeforeCommit(boolean readOnly) {
  2. for (TransactionSynchronization synchronization : TransactionSynchronizationManager.getSynchronizations()) {
  3. synchronization.beforeCommit(readOnly);
  4. }
  5. }

8.3、来个案例

1、执行sql

  1. DROP DATABASE IF EXISTS javacode2018;
  2. CREATE DATABASE if NOT EXISTS javacode2018;
  3. USE javacode2018;
  4. DROP TABLE IF EXISTS t_user;
  5. CREATE TABLE t_user(
  6. id int PRIMARY KEY AUTO_INCREMENT,
  7. name varchar(256) NOT NULL DEFAULT '' COMMENT '姓名'
  8. );

2、案例代码

代码比较简单,不多解释了,运行测试方法m0

  1. package com.javacode2018.tx.demo9;
  2. import org.junit.Before;
  3. import org.junit.Test;
  4. import org.springframework.jdbc.core.JdbcTemplate;
  5. import org.springframework.jdbc.datasource.DataSourceTransactionManager;
  6. import org.springframework.transaction.PlatformTransactionManager;
  7. import org.springframework.transaction.TransactionDefinition;
  8. import org.springframework.transaction.TransactionStatus;
  9. import org.springframework.transaction.support.DefaultTransactionDefinition;
  10. import org.springframework.transaction.support.TransactionSynchronizationAdapter;
  11. import org.springframework.transaction.support.TransactionSynchronizationManager;
  12. /**
  13. * 公众号:路人甲java,工作10年的前阿里P7,所有文章以系列的方式呈现,带领大家成为java高手,
  14. * 目前已出:java高并发系列、mysq|高手系列、Maven高手系列、mybatis系列、spring系列,
  15. * 正在连载springcloud系列,欢迎关注!
  16. */
  17. public class Demo9Test {
  18. JdbcTemplate jdbcTemplate;
  19. PlatformTransactionManager platformTransactionManager;
  20. @Before
  21. public void before() {
  22. //定义一个数据源
  23. org.apache.tomcat.jdbc.pool.DataSource dataSource = new org.apache.tomcat.jdbc.pool.DataSource();
  24. dataSource.setDriverClassName("com.mysql.jdbc.Driver");
  25. dataSource.setUrl("jdbc:mysql://localhost:3306/javacode2018?characterEncoding=UTF-8");
  26. dataSource.setUsername("root");
  27. dataSource.setPassword("root123");
  28. dataSource.setInitialSize(5);
  29. //定义一个JdbcTemplate,用来方便执行数据库增删改查
  30. this.jdbcTemplate = new JdbcTemplate(dataSource);
  31. this.platformTransactionManager = new DataSourceTransactionManager(dataSource);
  32. this.jdbcTemplate.update("truncate table t_user");
  33. }
  34. @Test
  35. public void m0() throws Exception {
  36. System.out.println("PROPAGATION_REQUIRED start");
  37. //2.定义事务属性:TransactionDefinition,TransactionDefinition可以用来配置事务的属性信息,比如事务隔离级别、事务超时时间、事务传播方式、是否是只读事务等等。
  38. TransactionDefinition transactionDefinition = new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_REQUIRED);
  39. //3.开启事务:调用platformTransactionManager.getTransaction开启事务操作,得到事务状态(TransactionStatus)对象
  40. TransactionStatus transactionStatus = platformTransactionManager.getTransaction(transactionDefinition);
  41. this.addSynchronization("ts-1", 2);
  42. this.addSynchronization("ts-2", 1);
  43. //4.执行业务操作,下面就执行2个插入操作
  44. jdbcTemplate.update("insert into t_user (name) values (?)", "test1-1");
  45. jdbcTemplate.update("insert into t_user (name) values (?)", "test1-2");
  46. this.m1();
  47. //5.提交事务:platformTransactionManager.commit
  48. System.out.println("PROPAGATION_REQUIRED 准备commit");
  49. platformTransactionManager.commit(transactionStatus);
  50. System.out.println("PROPAGATION_REQUIRED commit完毕");
  51. System.out.println("after:" + jdbcTemplate.queryForList("SELECT * from t_user"));
  52. }
  53. public void m1() {
  54. System.out.println("PROPAGATION_REQUIRES_NEW start");
  55. TransactionDefinition transactionDefinition = new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
  56. TransactionStatus transactionStatus = platformTransactionManager.getTransaction(transactionDefinition);
  57. jdbcTemplate.update("insert into t_user (name) values (?)", "test2-1");
  58. jdbcTemplate.update("insert into t_user (name) values (?)", "test2-2");
  59. this.addSynchronization("ts-3", 2);
  60. this.addSynchronization("ts-4", 1);
  61. System.out.println("PROPAGATION_REQUIRES_NEW 准备commit");
  62. platformTransactionManager.commit(transactionStatus);
  63. System.out.println("PROPAGATION_REQUIRES_NEW commit完毕");
  64. }
  65. public void addSynchronization(final String name, final int order) {
  66. if (TransactionSynchronizationManager.isSynchronizationActive()) {
  67. TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
  68. @Override
  69. public int getOrder() {
  70. return order;
  71. }
  72. @Override
  73. public void suspend() {
  74. System.out.println(name + ":suspend");
  75. }
  76. @Override
  77. public void resume() {
  78. System.out.println(name + ":resume");
  79. }
  80. @Override
  81. public void flush() {
  82. System.out.println(name + ":flush");
  83. }
  84. @Override
  85. public void beforeCommit(boolean readOnly) {
  86. System.out.println(name + ":beforeCommit:" + readOnly);
  87. }
  88. @Override
  89. public void beforeCompletion() {
  90. System.out.println(name + ":beforeCompletion");
  91. }
  92. @Override
  93. public void afterCommit() {
  94. System.out.println(name + ":afterCommit");
  95. }
  96. @Override
  97. public void afterCompletion(int status) {
  98. System.out.println(name + ":afterCompletion:" + status);
  99. }
  100. });
  101. }
  102. }
  103. }

3、输出

  1. PROPAGATION_REQUIRED start
  2. PROPAGATION_REQUIRES_NEW start
  3. ts-2:suspend
  4. ts-1:suspend
  5. PROPAGATION_REQUIRES_NEW 准备commit
  6. ts-4:beforeCommit:false
  7. ts-3:beforeCommit:false
  8. ts-4:beforeCompletion
  9. ts-3:beforeCompletion
  10. ts-4:afterCommit
  11. ts-3:afterCommit
  12. ts-4:afterCompletion:0
  13. ts-3:afterCompletion:0
  14. ts-2:resume
  15. ts-1:resume
  16. PROPAGATION_REQUIRES_NEW commit完毕
  17. PROPAGATION_REQUIRED 准备commit
  18. ts-2:beforeCommit:false
  19. ts-1:beforeCommit:false
  20. ts-2:beforeCompletion
  21. ts-1:beforeCompletion
  22. ts-2:afterCommit
  23. ts-1:afterCommit
  24. ts-2:afterCompletion:0
  25. ts-1:afterCompletion:0
  26. PROPAGATION_REQUIRED commit完毕
  27. after:[{id=1, name=test1-1}, {id=2, name=test1-2}, {id=3, name=test2-1}, {id=4, name=test2-2}]

输出配合案例源码,大家理解一下。

总结一下

今天的内容挺长的,辛苦大家了,不过我相信spring事务这块吃透了,会让你收获很多,加油!

事务方面有任何问题的欢迎给我留言,下篇文章将解析声明式事务的源码,敬请期待!!**

案例源码

  1. git地址:
  2. https://gitee.com/javacode2018/spring-series
  3. 本文案例对应源码:
  4. spring-series\lesson-002-tx\src\main\java\com\javacode2018\tx\demo9

本博客所有系列案例代码以后都会放到这个上面,大家watch一下,可以持续关注动态。

最新资料

更多内容请访问:IT源点

全部评论: 0

    我有话说: