Redian新闻
>
SpringBoot 使用线程池如何控制主线程和子线程的事务

SpringBoot 使用线程池如何控制主线程和子线程的事务

公众号新闻

点击上方“芋道源码”,选择“设为星标

管她前浪,还是后浪?

能浪的浪,才是好浪!

每天 10:33 更新文章,每天掉亿点点头发...

源码精品专栏

 
来源:huaweicloud.csdn.net/
63876ff5dacf622b8df8c462.html

一、使用场景

数据库有两张表 t_persont_school 如下:前端传来10000条person数据要插入到t_person,同时要删除t_school表中id为1的数据(为提高效率采用线程池做)

基于 Spring Boot + MyBatis Plus + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能

  • 项目地址:https://github.com/YunaiV/ruoyi-vue-pro
  • 视频教程:https://doc.iocoder.cn/video/

二、思路

1、要保证主线程和子线程使用的同一个sqlSession

2、手动控制提交和回滚

3、将10000条数据均分成10份,每份1000条,创建10个任务,放入线程池执行!

基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能

  • 项目地址:https://github.com/YunaiV/yudao-cloud
  • 视频教程:https://doc.iocoder.cn/video/

三、代码及注释如下:

1、核心业务代码

@Service
public class PersonServiceImpl extends ServiceImpl<PersonMapperPersonimplements IPersonService {

    @Autowired
    private SqlSessionTemplate sqlSessionTemplate;

    @Autowired
    private SchoolMapper schoolMapper;

    private ArrayBlockingQueue queue=new ArrayBlockingQueue(8,true);

    private ThreadPoolExecutor.CallerRunsPolicy policy=new ThreadPoolExecutor.CallerRunsPolicy();

    //1、创建核心线程为10的线程池
    private ThreadPoolExecutor executor = new ThreadPoolExecutor(10,15,10, TimeUnit.SECONDS
            ,queue,policy);


    @Override
    public int insertPerson(Person person) {
        return this.baseMapper.insert(person);
    }

    @Override
    @Transactional
    public void inserPersonBatch(List<Person> list) throws SQLException {

        //2、根据sqlSessionTemplate获取SqlSession工厂
        SqlSessionFactory sqlSessionFactory = sqlSessionTemplate.getSqlSessionFactory();
        SqlSession sqlSession = sqlSessionFactory.openSession();
        //3、获取Connection来手动控制事务
        Connection connection = sqlSession.getConnection();
        try{
            //4、设置手动提交
            connection.setAutoCommit(false);
            //5、获取PersonMapper(此处是由于无法通过this.baseMapper调用自定义的saveBatch方法)
            PersonMapper mapper = sqlSession.getMapper(PersonMapper.class);
            //6、主线程去删除t_school表中id为1的数据
            schoolMapper.deleteById("1");
            //7、将传入List中的10000个数据按1000一组均分成10组
            List<List<Person>> lists = ListUtils.averageAssign(list,1000);
            //8、新建任务列表
            List<Callable<Integer>> callableList = new ArrayList<>();
            //9、根据均分的5组数据分别新建5个Callable任务
            for(int i = 0; i < lists.size(); i++){
                List<Person> insertList = lists.get(i);
                Callable<Integer> callable = new Callable<Integer>() {
                    @Override
                    public Integer call() throws Exception {
                        int n = 0;
                        try{
                            n = mapper.saveBatch(insertList);
                        }catch (Exception e){
                            //插入失败返回0
                            return n;
                        }
                        //插入成功返回成功提交数
                        return n;
                    }
                };
                callableList.add(callable);
            }

            //10、任务放入线程池开始执行
            List<Future<Integer>> futures = executor.invokeAll(callableList);
            //11、对比每个任务的返回值 <= 0 代表执行失败
            for(Future<Integer> future : futures){
                if(future.get() <= 0){
                    //12、只要有一组任务失败回滚整个connection
                    connection.rollback();
                    return;
                }
            }
            //13、主线程和子线程都执行成功 直接提交
            connection.commit();
            System.out.println("添加成功!");

        }catch (Exception e){
            //14、主线程报错回滚
            connection.rollback();
            log.error(e.toString());
            throw new SQLException("出现异常!");
        }
        return;
    }
}

2、PersonMapper中自定义批量插入

<insert id="saveBatch" parameterType="list">
    insert into t_person(id,name,age,addr,classes,school_id)
    values
        <foreach collection
="list" index="index" item="item" separator=",">
            (
             #{item.id},
            #{item.name},
            #{item.age},
            #{item.addr},
            #{item.classes},
            #{item.schoolId}
            )
        </foreach>
</insert>

3、均分List工具类

public class ListUtils {

    public static <T> List<List<T>> averageAssign(List<T> source, int limit) {
        if (null == source || source.isEmpty()) {
            return Collections.emptyList();
        }
        List<List<T>> result = new ArrayList<>();
        int listCount = (source.size() - 1) / limit + 1;
        int remaider = source.size() % listCount; // (先计算出余数)
        int number = source.size() / listCount; // 然后是商
        int offset = 0;// 偏移量
        for (int i = 0; i < listCount; i++) {
            List<T> value;
            if (remaider > 0) {
                value = source.subList(i * number + offset, (i + 1) * number + offset + 1);
                remaider--;
                offset++;
            } else {
                value = source.subList(i * number + offset, (i + 1) * number + offset);
            }
            result.add(value);
        }
        return result;
    }
}

四、测试验证:

controller层如下:传入10000条数据

@GetMapping("/addBatch")
public void addBatch() {
    List<Person> list = new ArrayList<>();
    for(int i = 1; i <= 10000; i++){
        Person p = new Person();
        p.setId(i);
        p.setName("张三" + i);
        p.setAge(i);
        p.setAddr("重庆");
        p.setClasses("一班");
        p.setSchoolId(i);
        list.add(p);
    }
    try{
        this.iPersonService.inserPersonBatch(list);
    }catch (Exception e){
        e.printStackTrace();
    }
}

1、情况1:子线程中有一个执行失败

t_person表主键唯一  10000条Person数据id按1—10000设置

如图t_person表中已经有一条id为201的数据 所以线程池中有一个任务执行会失败!

我们打断点来看:此时已经分配好10个任务

如下图:插入id为201的数据时失败,线程池第一个任务执行失败返回0,其余全部成功返回1000

执行rollback回滚

执行完毕观察数据库:

t_school表数据没有被删,

t_person表数据也没有变化

2、情况2、删除 t_person表中id为201的数据重新插入

此时10个任务全部执行成功:

执行commit

执行完毕观察数据库:

t_school表数据已被删除

t_person表中10000条数据也成功插入:

3、情况3:主线程报错就不演示了

以上测试成功!



欢迎加入我的知识星球,一起探讨架构,交流源码。加入方式,长按下方二维码噢

已在知识星球更新源码解析如下:

最近更新《芋道 SpringBoot 2.X 入门》系列,已经 101 余篇,覆盖了 MyBatis、Redis、MongoDB、ES、分库分表、读写分离、SpringMVC、Webflux、权限、WebSocket、Dubbo、RabbitMQ、RocketMQ、Kafka、性能测试等等内容。

提供近 3W 行代码的 SpringBoot 示例,以及超 4W 行代码的电商微服务项目。

获取方式:点“在看”,关注公众号并回复 666 领取,更多内容陆续奉上。

文章有帮助的话,在看,转发吧。

谢谢支持哟 (*^__^*)

微信扫码关注该文公众号作者

戳这里提交新闻线索和高质量文章给我们。
相关阅读
SpringBoot + MDC 实现全链路调用日志跟踪深入剖析 Spring Boot 的 SPI 机制SpringBoot+Vue 实现网页版人脸登录、人脸识别,逼格很高!!!SpringBoot 应用的新命令行界面:JustSpringBoot 启动优化实践!SpringBoot 如何快速过滤出一次请求的所有日志?SpringBoot 整合 ChatGPT API 项目实战字节:SpringBoot 启动流程SpringBoot 实现 MySQL 百万级数据量导出并避免 OOM 的解决方案ThreadLocal 搭配线程池使用造成内存泄漏的原因和解决方案SpringBoot + Druid,完美监控 MySQL 性能用这4招 优雅的实现Spring Boot 异步线程间数据传递Springboot代码混淆,别再让代码在线上进行裸奔这是我见过最好的SpringBoot系统!SpringBoot+Flowable 快速实现工作流,so easy!Spring for Apache Kafka 3.0 和 Spring for RabbitMQ 3.0 发布德国安全吗?ThreadLocal 父子线程之间该如何传递数据?SpringBoot 多数据源及事务解决方案SpringBoot 实现 Excel 导入导出,百万数据量,性能爆表!K8s + SpringBoot实现零宕机发布SpringBoot+Prometheus+Grafana 实现自定义监控别再自己瞎写工具类了,SpringBoot 内置工具类应有尽有,建议收藏!!利用 Nacos 实现了一个动态化线程池,非常实用!动态可监控线程池,你还没用起来吗?妥了...还是SpringBoot够牛逼!SpringBoot 我随手封装了一个万能的导出excel工具,传什么都能导出巴黎,巴黎(1)SpringBoot 中 MybatisX 插件的简单使用教程(超详细!!)SpringBoot 在打包部署的时候打包成 jar 和 war 有什么不同?我家的超级足球迷银座蹓7000字+24张图带你彻底弄懂线程池天赋“易昺(bǐng)”,创造历史!朋友圈阴性已然清零
logo
联系我们隐私协议©2024 redian.news
Redian新闻
Redian.news刊载任何文章,不代表同意其说法或描述,仅为提供更多信息,也不构成任何建议。文章信息的合法性及真实性由其作者负责,与Redian.news及其运营公司无关。欢迎投稿,如发现稿件侵权,或作者不愿在本网发表文章,请版权拥有者通知本网处理。