Redian新闻
>
Spring在多线程环境下如何确保事务一致性

Spring在多线程环境下如何确保事务一致性

公众号新闻

点击上方“芋道源码”,选择“设为星标

管她前浪,还是后浪?

能浪的浪,才是好浪!

每天 10:33 更新文章,每天掉亿点点头发...

源码精品专栏

 

来源:blog.csdn.net/m0_53157173

/article/details/127423286


问题在现

我先把问题抛出来,大家就明白本文目的在于解决什么样的业务痛点了:

public void removeAuthorityModuleSeq(Integer authorityModuleId, IAuthorityService iAuthorityService, IRoleAuthorityService iRoleAuthorityService) {
    //1.查询出当前资源模块下所有资源,查询出来后进行删除
    deleteAuthoritiesOfCurrentAuthorityModule(authorityModuleId, iAuthorityService, iRoleAuthorityService);
    //2.查询出当前资源模块下所有子模块,递归查询,当删除完所有子模块下的资源后,再删除所有子模块,最终删除当前资源模块
    deleteSonAuthorityModuleUnderCurrentAuthorityModule(authorityModuleId, iAuthorityService, iRoleAuthorityService);
    //3.删除当前资源模块
    removeById(authorityModuleId);
}

如果我希望将步骤1和步骤2并行执行,然后确保步骤1和步骤2执行成功后,再执行步骤3,等到步骤3执行完毕后,再提交全部事务,这个需求该如何实现呢?

基于 Spring Boot + MyBatis Plus + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能

  • 项目地址:https://github.com/YunaiV/ruoyi-vue-pro
  • 视频教程:https://doc.iocoder.cn/video/

如何解决异步执行

上面需求第一点是: 如何让任务异步并行执行,如何实现二元依赖呢?

说到异步执行,很多小伙伴首先想到Spring中提供的@Async注解,但是Spring提供的异步执行任务能力并不足以解决我们当前的需求。

@Async注解原理简单来说,就是扫描IOC中的bean,给方法上标注有@Async注解的bean进行代理,代理的核心是添加一个MethodInterceptorAsyncExecutionInterceptor,该方法拦截器负责将方法真正的执行包装为任务,放入线程池中执行。

下面我们先使用CompletableFuture来完成我们第一步需求:

public void removeAuthorityModuleSeq(Integer authorityModuleId, IAuthorityService iAuthorityService, IRoleAuthorityService iRoleAuthorityService) {
    CompletableFuture.runAsync(()->{
        //两个并行执行的任务
        CompletableFuture<Void> future1 = CompletableFuture.runAsync(() ->
                deleteAuthoritiesOfCurrentAuthorityModule(authorityModuleId, iAuthorityService, iRoleAuthorityService),executor);
        CompletableFuture<Void> future2 = CompletableFuture.runAsync(() ->
                deleteSonAuthorityModuleUnderCurrentAuthorityModule(authorityModuleId, iAuthorityService, iRoleAuthorityService), executor);
        //等待两个并行任务执行完后,再执行最后一个步骤
        CompletableFuture.allOf(future1,future2).thenRun(()->removeById(authorityModuleId)); 
    },executor);
}

基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能

  • 项目地址:https://github.com/YunaiV/yudao-cloud
  • 视频教程:https://doc.iocoder.cn/video/

多线程环境下如何确保事务一致性

我们已经完成了任务的异步执行化,那么又如何确保多线程环境下的事务一致性问题呢?

public void removeAuthorityModuleSeq(Integer authorityModuleId, IAuthorityService iAuthorityService, IRoleAuthorityService iRoleAuthorityService) {
    CompletableFuture.runAsync(()->{
        //两个并行执行的任务
        CompletableFuture<Void> future1 = CompletableFuture.runAsync(() ->
                deleteAuthoritiesOfCurrentAuthorityModule(authorityModuleId, iAuthorityService, iRoleAuthorityService),executor);
        CompletableFuture<Void> future2 = CompletableFuture.runAsync(() ->
                deleteSonAuthorityModuleUnderCurrentAuthorityModule(authorityModuleId, iAuthorityService, iRoleAuthorityService), executor);
        //等待两个并行任务执行完后,再执行最后一个步骤
        CompletableFuture.allOf(future1,future2).thenRun(()->removeById(authorityModuleId));
    },executor);
}

在Spring环境下说到事务控制,大家第一反应就想到使用@Transactional注解解决问题,但是这里显然行不通,为什么行不通呢?

我还是简单的对Spring事务实现原理进行一番概括:

事务王国回顾

事务管理大体分为三个流程: 事务创建 ,事务执行,事务结束

事务创建涉及到一些属性的配置,如:

  • 事务的隔离级别
  • 事务的传播行为
  • 事务的超时时间
  • 是否为只读事务

由于涉及属性颇多,并且后期还有可能进行扩展,因此必须通过一个类来封装这些属性,在Spring中对应TransactionDefinition

有了事务相关属性定义后,我们就可以利用TransactionDefinition来创建一个事务了,在Spring中局部事务由PlatformTransactionManager负责管理,创建事务也是由PlatformTransactionManager负责提供:

 TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
   throws TransactionException
;

如果我们希望追踪事务的状态,例如: 事务已完成,事务回滚等,那么就需要一个事务状态类贯穿当前事务的执行流程,在Spring中由TransactionStatus负责完成。

对于常见的数据源而言,通常需要记录的事务状态有如下几点:

  • 当前事务是否是新事务
  • 当前事务是否结束
  • 当前事务是否需要回滚(通过标记来判断,因此我也可以在业务流程中手动设置标记为true,来让事务在没有发生异常的情况下进行回滚)
  • 当前事务是否设置了回滚点(savePoint)

事务的执行过程就是具体业务代码的执行流程,这里就不多说了。

事务的结束分为两种情况: 需要进行事务回滚或者事务正常提交,如果是事务回滚,还需要判断TransactionStatus 中的savePoint是否被设置了。

事务实现方式回顾

Spring中常见的事务实现方式有两种: 编程式和声明式。

编程式事务使用是本文重点,因此这里按下不表,我们先来复习一下声明式事务的使用

声明式事务就是使用我们常见的@Transactional注解完成的,声明式事务优点就在于让事务代码与业务代码解耦,通过Spring中提供的声明式事务使用,我们也可以发觉我们只需要编写业务代码即可,而事务的管理基本不需要我们操心,Spring就像使用了魔法一样,帮我们自动完成了。

之所以那么神奇,本质还是依靠Spring框架提供的Bean生命周期相关回调接口和AOP结合完成的,简述如下:

  • 通过自动代理创建器依次尝试为每个放入容器中的bean尝试进行代理
  • 尝试进行代理的过程对于事务管理来说,就是利用事务管理涉及到的增强器advisor,即TransactionAttributeSourceAdvisor
  • 判断当前增强器是否能够应用与当前bean上,怎么判断呢? —> advisor内部的pointCut喽 !
  • 如果能够应用,那么好,为当前bean创建代理对象返回,并且往代理对象内部添加一个TransactionInterceptor拦截器。
  • 此时我们再从容器中获取,拿到的就是代理对象了,当我们调用代理对象的方法时,首先要经过代理对象内部拦截器链的处理,处理完后,最终才会调用被代理对象的方法。(这里其实就是责任链模式的应用)

对于被事务增强器TransactionAttributeSourceAdvisor代理的bean而言,代理对象内部会存在一个TransactionInterceptor,该拦截器内部构造了一个事务执行的模板流程:

protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
   final InvocationCallback invocation)
 throws Throwable 
{
  //TransactionAttributeSource内部保存着当前类某个方法对应的TransactionAttribute---事务属性源
  //可以看做是一个存放TransactionAttribute与method方法映射的池子
  TransactionAttributeSource tas = getTransactionAttributeSource();
  //获取当前事务方法对应的TransactionAttribute
  final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
  //定位TransactionManager
  final TransactionManager tm = determineTransactionManager(txAttr);
        .....
        //类型转换为局部事务管理器
  PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
  final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

  if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
   //TransactionManager根据TransactionAttribute创建事务后返回
   //TransactionInfo封装了当前事务的信息--包括TransactionStatus
   TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);

   Object retVal;
   try {
    //继续执行过滤器链---过滤链最终会调用目标方法
    //因此可以理解为这里是调用目标方法
    retVal = invocation.proceedWithInvocation();
   }
   catch (Throwable ex) {
    //目标方法抛出异常则进行判断是否需要回滚
    completeTransactionAfterThrowing(txInfo, ex);
    throw ex;
   }
   finally {
       //清除当前事务信息
    cleanupTransactionInfo(txInfo);
   }
            ...
            //正常返回,那么就正常提交事务呗(当然还是需要判断TransactionStatus状态先)
   commitTransactionAfterReturning(txInfo);
   return retVal;
  }
  ...

编程式事务

还记得本文一开始提出的业务需求吗?

不清楚,可以回看一下,在上文,我们已经解决了任务异步并行执行的难题,下面我们需要解决的就是如何确保Spring在多线程环境下也能保持事务一致性。

通过上文对Spring事务基础和声明式事务的原理回顾,相信大家也发现了,声明式事务并不能解决我们当前的问题,那么就只能求助于编程式事务了。

那么编程式事务是什么样子呢?

其实上面TransactionInterceptor给出的那套模板流程,就是编程式事务使用的模范案例,我们可以简化上面的模板流程,简单使用如下:

public class TransactionMain {
    public static void main(String[] args) throws ClassNotFoundException, SQLException {
        test();
    }

    private static void test() {
        DataSource dataSource = getDS();
        JdbcTransactionManager jtm = new JdbcTransactionManager(dataSource);
        //JdbcTransactionManager根据TransactionDefinition信息来进行一些连接属性的设置
        //包括隔离级别和传播行为等
        DefaultTransactionDefinition transactionDef = new DefaultTransactionDefinition();
        //开启一个新事务---此时autocommit已经被设置为了false,并且当前没有事务,这里创建的是一个新事务
        TransactionStatus ts = jtm.getTransaction(transactionDef);
        //进行业务逻辑操作
        try {
            update(dataSource);
            jtm.commit(ts);
        }catch (Exception e){
            jtm.rollback(ts);
            System.out.println("发生异常,我已回滚");
        }
    }

    private static void update(DataSource dataSource) throws Exception {
        JdbcTemplate jt = new JdbcTemplate();
        jt.setDataSource(dataSource);
        jt.update("UPDATE Department SET Dname=\"大忽悠\" WHERE id=6");
        throw new Exception("我是来捣乱的");
    }
}

利用编程式事务解决问题

我们明白了编程式事务的使用,相信大家也都知道问题如何解决了,下面我给出一份看似正确的解决方案:

package com.user.util;

import lombok.RequiredArgsConstructor;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.stereotype.Component;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;

import javax.sql.DataSource;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * 多线程事务一致性管理 <br>
 * 声明式事务管理无法完成,此时我们只能采用初期的编程式事务管理才行
 * @author 大忽悠
 * @create 2022/10/19 21:34
 */

@Component
@RequiredArgsConstructor
public class MultiplyThreadTransactionManager {
    /**
     * 如果是多数据源的情况下,需要指定具体是哪一个数据源
     */

    private final DataSource dataSource;

    /**
     * 执行的是无返回值的任务
     * @param tasks 异步执行的任务列表
     * @param executor 异步执行任务需要用到的线程池,考虑到线程池需要隔离,这里强制要求传
     */

    public void runAsyncButWaitUntilAllDown(List<Runnable> tasks, Executor executor) {
        if(executor==null){
            throw new IllegalArgumentException("线程池不能为空");
        }
        DataSourceTransactionManager transactionManager = getTransactionManager();
        //是否发生了异常
        AtomicBoolean ex=new AtomicBoolean();

        List<CompletableFuture> taskFutureList=new ArrayList<>(tasks.size());
        List<TransactionStatus> transactionStatusList=new ArrayList<>(tasks.size());

        tasks.forEach(task->{
            taskFutureList.add(CompletableFuture.runAsync(
                    () -> {
                        try{
                            //1.开启新事务
                            transactionStatusList.add(openNewTransaction(transactionManager));
                            //2.异步任务执行
                            task.run();
                        }catch (Throwable throwable){
                            //打印异常
                            throwable.printStackTrace();
                            //其中某个异步任务执行出现了异常,进行标记
                            ex.set(Boolean.TRUE);
                            //其他任务还没执行的不需要执行了
                            taskFutureList.forEach(completableFuture -> completableFuture.cancel(true));
                        }
                    }
                    , executor)
            );
        });

        try {
            //阻塞直到所有任务全部执行结束---如果有任务被取消,这里会抛出异常滴,需要捕获
            CompletableFuture.allOf(taskFutureList.toArray(new CompletableFuture[]{})).get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }

        //发生了异常则进行回滚操作,否则提交
        if(ex.get()){
            System.out.println("发生异常,全部事务回滚");
            transactionStatusList.forEach(transactionManager::rollback);
        }else {
            System.out.println("全部事务正常提交");
            transactionStatusList.forEach(transactionManager::commit);
        }
    }

    private TransactionStatus openNewTransaction(DataSourceTransactionManager transactionManager) {
        //JdbcTransactionManager根据TransactionDefinition信息来进行一些连接属性的设置
        //包括隔离级别和传播行为等
        DefaultTransactionDefinition transactionDef = new DefaultTransactionDefinition();
        //开启一个新事务---此时autocommit已经被设置为了false,并且当前没有事务,这里创建的是一个新事务
        return transactionManager.getTransaction(transactionDef);
    }

    private DataSourceTransactionManager getTransactionManager() {
        return new DataSourceTransactionManager(dataSource);
    }
}

大家思考上面的代码存在问题吗?

测试:

public void test(){
    List<Runnable> tasks=new ArrayList<>();

    tasks.add(()->{
       userMapper.deleteById(26);
    });

    tasks.add(()->{
        signMapper.deleteById(10);
    });

    multiplyThreadTransactionManager.runAsyncButWaitUntilAllDown(tasks, Executors.newCachedThreadPool());
}

任务正常都执行完毕,事务进行提交,但是会抛出异常,导致事务回滚:

抓关键字:

No value for key [HikariDataSource (HikariPool-1)] bound to thread [main]
解释: 无法在当前线程绑定的threadLocal中寻找到HikariDataSource作为key,对应关联的资源对象ConnectionHolder

这里需要再次回顾一下Spring事务实现的小细节:

一次事务的完成通常都是默认在当前线程内完成的,又因为一次事务的执行过程中,涉及到对当前数据库连接Connection的操作,因此为了避免将Connection在事务执行过程中来回传递,我们可以将Connextion绑定到当前事务执行线程对应的ThreadLocalMap内部,顺便还可以将一些其他属性也放入其中进行保存,在Spring中,负责保存这些ThreadLocal属性的实现类由TransactionSynchronizationManager承担。

TransactionSynchronizationManager类内部默认提供了下面六个ThreadLocal属性,分别保存当前线程对应的不同事务资源:

   //保存当前事务关联的资源--默认只会在新建事务的时候保存当前获取到的DataSource和当前事务对应Connection的映射关系--当然这里Connection被包装为了ConnectionHolder
 private static final ThreadLocal<Map<Object, Object>> resources =
   new NamedThreadLocal<>("Transactional resources");
    //事务监听者--在事务执行到某个阶段的过程中,会去回调监听者对应的回调接口(典型观察者模式的应用)---默认为空集合
 private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =
   new NamedThreadLocal<>("Transaction synchronizations");
   //见名知意: 存放当前事务名字
 private static final ThreadLocal<String> currentTransactionName =
   new NamedThreadLocal<>("Current transaction name");
   //见名知意: 存放当前事务是否是只读事务
 private static final ThreadLocal<Boolean> currentTransactionReadOnly =
   new NamedThreadLocal<>("Current transaction read-only status");
   //见名知意: 存放当前事务的隔离级别
 private static final ThreadLocal<Integer> currentTransactionIsolationLevel =
   new NamedThreadLocal<>("Current transaction isolation level");
   //见名知意: 存放当前事务是否处于激活状态
 private static final ThreadLocal<Boolean> actualTransactionActive =
   new NamedThreadLocal<>("Actual transaction active");

那么上面抛出的异常的原因也就很清楚了,无法在main线程找到当前事务对应的资源,原因如下:

开启新事务时,事务相关资源都被绑定到了thread-cache-pool-1线程对应的threadLocalMap内部,而当执行事务提交代码时,commit内部需要从TransactionSynchronizationManager中获取当前事务的资源,显然我们无法从main线程对应的threadLocalMap中获取到对应的事务资源,这也就是异常抛出的原因。

问题分析完了,那么如何解决问题呢?

这里给出一个我首先想到的简单粗暴的方法—CopyTransactionResource—将事务资源在两个线程间来回复制

这里给出解决后问题后的代码示例:

package com.user.util;

import lombok.Builder;
import lombok.RequiredArgsConstructor;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.stereotype.Component;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import javax.sql.DataSource;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * 多线程事务一致性管理 <br>
 * 声明式事务管理无法完成,此时我们只能采用初期的编程式事务管理才行
 * @author 大忽悠
 * @create 2022/10/19 21:34
 */

@Component
@RequiredArgsConstructor
public class MultiplyThreadTransactionManager {
    /**
     * 如果是多数据源的情况下,需要指定具体是哪一个数据源
     */

    private final DataSource dataSource;

    /**
     * 执行的是无返回值的任务
     * @param tasks 异步执行的任务列表
     * @param executor 异步执行任务需要用到的线程池,考虑到线程池需要隔离,这里强制要求传
     */

    public void runAsyncButWaitUntilAllDown(List<Runnable> tasks, Executor executor) {
        if(executor==null){
            throw new IllegalArgumentException("线程池不能为空");
        }
        DataSourceTransactionManager transactionManager = getTransactionManager();
        //是否发生了异常
        AtomicBoolean ex=new AtomicBoolean();

        List<CompletableFuture> taskFutureList=new ArrayList<>(tasks.size());
        List<TransactionStatus> transactionStatusList=new ArrayList<>(tasks.size());
        List<TransactionResource> transactionResources=new ArrayList<>(tasks.size());

        tasks.forEach(task->{
            taskFutureList.add(CompletableFuture.runAsync(
                    () -> {
                        try{
                            //1.开启新事务
                            transactionStatusList.add(openNewTransaction(transactionManager));
                            //2.copy事务资源
                         transactionResources.add(TransactionResource.copyTransactionResource());
                            //3.异步任务执行
                            task.run();
                        }catch (Throwable throwable){
                            //打印异常
                            throwable.printStackTrace();
                            //其中某个异步任务执行出现了异常,进行标记
                            ex.set(Boolean.TRUE);
                            //其他任务还没执行的不需要执行了
                            taskFutureList.forEach(completableFuture -> completableFuture.cancel(true));
                        }
                    }
                    , executor)
            );
        });

        try {
            //阻塞直到所有任务全部执行结束---如果有任务被取消,这里会抛出异常滴,需要捕获
            CompletableFuture.allOf(taskFutureList.toArray(new CompletableFuture[]{})).get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }

        //发生了异常则进行回滚操作,否则提交
        if(ex.get()){
            System.out.println("发生异常,全部事务回滚");
            for (int i = 0; i < tasks.size(); i++) {
                transactionResources.get(i).autoWiredTransactionResource();
                transactionManager.rollback(transactionStatusList.get(i));
                transactionResources.get(i).removeTransactionResource();
            }
        }else {
            System.out.println("全部事务正常提交");
            for (int i = 0; i < tasks.size(); i++) {
                transactionResources.get(i).autoWiredTransactionResource();
                transactionManager.commit(transactionStatusList.get(i));
                transactionResources.get(i).removeTransactionResource();
            }
        }
    }

    private TransactionStatus openNewTransaction(DataSourceTransactionManager transactionManager) {
        //JdbcTransactionManager根据TransactionDefinition信息来进行一些连接属性的设置
        //包括隔离级别和传播行为等
        DefaultTransactionDefinition transactionDef = new DefaultTransactionDefinition();
        //开启一个新事务---此时autocommit已经被设置为了false,并且当前没有事务,这里创建的是一个新事务
        return transactionManager.getTransaction(transactionDef);
    }

    private DataSourceTransactionManager getTransactionManager() {
        return new DataSourceTransactionManager(dataSource);
    }

    /**
     * 保存当前事务资源,用于线程间的事务资源COPY操作
     */

    @Builder
    private static class TransactionResource{
        //事务结束后默认会移除集合中的DataSource作为key关联的资源记录
        private  Map<Object, Object> resources = new HashMap<>();

        //下面五个属性会在事务结束后被自动清理,无需我们手动清理
        private  Set<TransactionSynchronization> synchronizations =new HashSet<>();

        private  String currentTransactionName;

        private Boolean currentTransactionReadOnly;

        private Integer currentTransactionIsolationLevel;

        private Boolean actualTransactionActive;

        public static TransactionResource copyTransactionResource(){
            return TransactionResource.builder()
                    //返回的是不可变集合
                    .resources(TransactionSynchronizationManager.getResourceMap())
                    //如果需要注册事务监听者,这里记得修改--我们这里不需要,就采用默认负责--spring事务内部默认也是这个值
                    .synchronizations(new LinkedHashSet<>())
                    .currentTransactionName(TransactionSynchronizationManager.getCurrentTransactionName())
                    .currentTransactionReadOnly(TransactionSynchronizationManager.isCurrentTransactionReadOnly())
                    .currentTransactionIsolationLevel(TransactionSynchronizationManager.getCurrentTransactionIsolationLevel())
                    .actualTransactionActive(TransactionSynchronizationManager.isActualTransactionActive())
                    .build();
        }

        public void autoWiredTransactionResource(){
             resources.forEach(TransactionSynchronizationManager::bindResource);
             //如果需要注册事务监听者,这里记得修改--我们这里不需要,就采用默认负责--spring事务内部默认也是这个值
             TransactionSynchronizationManager.initSynchronization();
             TransactionSynchronizationManager.setActualTransactionActive(actualTransactionActive);
             TransactionSynchronizationManager.setCurrentTransactionName(currentTransactionName);
             TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(currentTransactionIsolationLevel);
             TransactionSynchronizationManager.setCurrentTransactionReadOnly(currentTransactionReadOnly);
        }

        public void removeTransactionResource() {
            //事务结束后默认会移除集合中的DataSource作为key关联的资源记录
            //DataSource如果重复移除,unbindResource时会因为不存在此key关联的事务资源而报错
            resources.keySet().forEach(key->{
                if(!(key instanceof  DataSource)){
                    TransactionSynchronizationManager.unbindResource(key);
                }
            });
        }
    }
}

增加异常抛出,测试是否能够保证多线程间的事务一致性:

@SpringBootTest(classes = UserMain.class)
public class Test 
{
    @Resource
    private UserMapper userMapper;
    @Resource
    private SignMapper signMapper;
    @Resource
    private MultiplyThreadTransactionManager multiplyThreadTransactionManager;

    @SneakyThrows
    @org.junit.jupiter.api.Test
    public void test()
{
        List<Runnable> tasks=new ArrayList<>();

        tasks.add(()->{
                userMapper.deleteById(26);
                throw new RuntimeException("我就要抛出异常!");
        });

        tasks.add(()->{
            signMapper.deleteById(10);
        });

        multiplyThreadTransactionManager.runAsyncButWaitUntilAllDown(tasks, Executors.newCachedThreadPool());
    }

}

事务都进行了回滚,数据库数据没变。

小结

本文给出的只是一个方法,为了实现多线程事务一致性,我们还有很多方法,例如和本文一样的思想,直接利用JDBC提供的API来手动控制事务提交和回滚,或者可以尝试采用分布式事务的思路来解决问题。

大家之所以会被这个问题难住,主要是因为对Spring框架提供的便捷声明式事务支持中毒太深,以至于脑海中对事务的认知完全停留在@Transactional注解的层面,多了解底层基础设施,才能做到遇事不慌。



欢迎加入我的知识星球,一起探讨架构,交流源码。加入方式,长按下方二维码噢

已在知识星球更新源码解析如下:

最近更新《芋道 SpringBoot 2.X 入门》系列,已经 101 余篇,覆盖了 MyBatis、Redis、MongoDB、ES、分库分表、读写分离、SpringMVC、Webflux、权限、WebSocket、Dubbo、RabbitMQ、RocketMQ、Kafka、性能测试等等内容。

提供近 3W 行代码的 SpringBoot 示例,以及超 4W 行代码的电商微服务项目。

获取方式:点“在看”,关注公众号并回复 666 领取,更多内容陆续奉上。

文章有帮助的话,在看,转发吧。

谢谢支持哟 (*^__^*)

微信扫码关注该文公众号作者

戳这里提交新闻线索和高质量文章给我们。
相关阅读
《女の生きがい》面试难题:多线程事务怎么回滚?说用 @Transactional 可以回去等通知了!Python社区变天:可去除全局解释器锁GIL,真正多线程要来了Spring循环依赖那些事儿(含Spring详细流程图)In Northeast China, a New ‘Fangcang’ Hospital Sparks UproarSpring 6.1 M4发布,已兼容虚拟线程和JDK 21大温房产销量与价格一路飙升,高利率环境下竟然不降反涨?微服务架构中的数据一致性:解决方案与实践保姆级教程:Spring Cloud 集成 Seata 分布式事务深度解析乳腺癌中原发灶和转移灶之间HER2低表达的不一致性CPU缓存一致性协议MESIDubbo负载均衡策略之 一致性哈希缓存数据一致性探究Python社区大变天!可去除全局解释器锁GIL,真正多线程要来了!城市里手离方向盘的司机越来越多,如何确保这不是一场灾难?CCKS 2023 | 开放环境下的知识图谱挑战赛上线,奖金等你来拿!Redis和MySQL双写一致性如何保证?这个方案够优雅!竟然是个大外宣缩水超70%!2023-24财年澳洲州担配额削减的环境下,未来到底要选什么专业?重磅!Spring 6.1 M4发布,已兼容虚拟线程和JDK 21大量图片,请在WIFI环境下观看!“越南小腾讯”VNG在美国申请IPO,计划发行近2200万股Python 社区变天:可去除全局解释器锁 GIL,真正多线程要来了细数线程池的10个坑,面试线程不怕不怕啦技术派中的缓存一致性解决方案In-Context-Learning在更大的语言模型上表现不同多线程如何实现事务回滚?一招帮你搞定!记一次容器环境下出现Address not available推荐35款 SpringBoot/SpringCloud 开源项目,附源码在这个大环境下,我是如何找工作的为了你走遍草原 第十二章要命的屁股---东南大学泄密之事的警醒一。明月初升离大海,求下联。不确定环境下,如何实现“退休自由”?清华AM: 用离子凝胶电极在近生理环境下研究蛋白质隧穿结
logo
联系我们隐私协议©2024 redian.news
Redian新闻
Redian.news刊载任何文章,不代表同意其说法或描述,仅为提供更多信息,也不构成任何建议。文章信息的合法性及真实性由其作者负责,与Redian.news及其运营公司无关。欢迎投稿,如发现稿件侵权,或作者不愿在本网发表文章,请版权拥有者通知本网处理。