SpringBoot 使用线程池如何控制主线程和子线程的事务

简介: SpringBoot 使用线程池如何控制主线程和子线程的事务


一、使用场景

数据库有两张表 t_persont_school 如下:前端传来10000条person数据要插入到t_person,同时要删除t_school表中id为1的数据(为提高效率采用线程池做)

基于 Spring Boot + MyBatis Plus + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能

二、思路

1、要保证主线程和子线程使用的同一个sqlSession

2、手动控制提交和回滚

3、将10000条数据均分成10份,每份1000条,创建10个任务,放入线程池执行!

基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能

三、代码及注释如下:

1、核心业务代码

@Service
public class PersonServiceImpl extends ServiceImpl<PersonMapper, Person> implements IPersonService {
    @Autowired
    private SqlSessionTemplate sqlSessionTemplate;
    @Autowired
    private SchoolMapper schoolMapper;
    private ArrayBlockingQueue queue=new ArrayBlockingQueue(8,true);
    private ThreadPoolExecutor.CallerRunsPolicy policy=new ThreadPoolExecutor.CallerRunsPolicy();
    //1、创建核心线程为10的线程池
    private ThreadPoolExecutor executor = new ThreadPoolExecutor(10,15,10, TimeUnit.SECONDS
            ,queue,policy);
    @Override
    public int insertPerson(Person person) {
        return this.baseMapper.insert(person);
    }
    @Override
    @Transactional
    public void inserPersonBatch(List<Person> list) throws SQLException {
        //2、根据sqlSessionTemplate获取SqlSession工厂
        SqlSessionFactory sqlSessionFactory = sqlSessionTemplate.getSqlSessionFactory();
        SqlSession sqlSession = sqlSessionFactory.openSession();
        //3、获取Connection来手动控制事务
        Connection connection = sqlSession.getConnection();
        try{
            //4、设置手动提交
            connection.setAutoCommit(false);
            //5、获取PersonMapper(此处是由于无法通过this.baseMapper调用自定义的saveBatch方法)
            PersonMapper mapper = sqlSession.getMapper(PersonMapper.class);
            //6、主线程去删除t_school表中id为1的数据
            schoolMapper.deleteById("1");
            //7、将传入List中的10000个数据按1000一组均分成10组
            List<List<Person>> lists = ListUtils.averageAssign(list,1000);
            //8、新建任务列表
            List<Callable<Integer>> callableList = new ArrayList<>();
            //9、根据均分的5组数据分别新建5个Callable任务
            for(int i = 0; i < lists.size(); i++){
                List<Person> insertList = lists.get(i);
                Callable<Integer> callable = new Callable<Integer>() {
                    @Override
                    public Integer call() throws Exception {
                        int n = 0;
                        try{
                            n = mapper.saveBatch(insertList);
                        }catch (Exception e){
                            //插入失败返回0
                            return n;
                        }
                        //插入成功返回成功提交数
                        return n;
                    }
                };
                callableList.add(callable);
            }
            //10、任务放入线程池开始执行
            List<Future<Integer>> futures = executor.invokeAll(callableList);
            //11、对比每个任务的返回值 <= 0 代表执行失败
            for(Future<Integer> future : futures){
                if(future.get() <= 0){
                    //12、只要有一组任务失败回滚整个connection
                    connection.rollback();
                    return;
                }
            }
            //13、主线程和子线程都执行成功 直接提交
            connection.commit();
            System.out.println("添加成功!");
        }catch (Exception e){
            //14、主线程报错回滚
            connection.rollback();
            log.error(e.toString());
            throw new SQLException("出现异常!");
        }
        return;
    }
}

2、PersonMapper中自定义批量插入

<insert id="saveBatch" parameterType="list">
    insert into t_person(id,name,age,addr,classes,school_id)
    values
        <foreach collection="list" index="index" item="item" separator=",">
            (
             #{item.id},
            #{item.name},
            #{item.age},
            #{item.addr},
            #{item.classes},
            #{item.schoolId}
            )
        </foreach>
</insert>

3、均分List工具类

public class ListUtils {
    public static <T> List<List<T>> averageAssign(List<T> source, int limit) {
        if (null == source || source.isEmpty()) {
            return Collections.emptyList();
        }
        List<List<T>> result = new ArrayList<>();
        int listCount = (source.size() - 1) / limit + 1;
        int remaider = source.size() % listCount; // (先计算出余数)
        int number = source.size() / listCount; // 然后是商
        int offset = 0;// 偏移量
        for (int i = 0; i < listCount; i++) {
            List<T> value;
            if (remaider > 0) {
                value = source.subList(i * number + offset, (i + 1) * number + offset + 1);
                remaider--;
                offset++;
            } else {
                value = source.subList(i * number + offset, (i + 1) * number + offset);
            }
            result.add(value);
        }
        return result;
    }
}

四、测试验证:

controller层如下:传入10000条数据

@GetMapping("/addBatch")
public void addBatch() {
    List<Person> list = new ArrayList<>();
    for(int i = 1; i <= 10000; i++){
        Person p = new Person();
        p.setId(i);
        p.setName("张三" + i);
        p.setAge(i);
        p.setAddr("重庆");
        p.setClasses("一班");
        p.setSchoolId(i);
        list.add(p);
    }
    try{
        this.iPersonService.inserPersonBatch(list);
    }catch (Exception e){
        e.printStackTrace();
    }
}

1、情况1:子线程中有一个执行失败

t_person表主键唯一  10000条Person数据id按1—10000设置

如图t_person表中已经有一条id为201的数据 所以线程池中有一个任务执行会失败!

我们打断点来看:此时已经分配好10个任务

如下图:插入id为201的数据时失败,线程池第一个任务执行失败返回0,其余全部成功返回1000

执行rollback回滚

执行完毕观察数据库:

t_school表数据没有被删,

t_person表数据也没有变化

2、情况2、删除 t_person表中id为201的数据重新插入

此时10个任务全部执行成功:

执行commit

执行完毕观察数据库:

t_school表数据已被删除

t_person表中10000条数据也成功插入:

3、情况3:主线程报错就不演示了

以上测试成功!



相关文章
|
并行计算 Java 数据处理
SpringBoot高级并发实践:自定义线程池与@Async异步调用深度解析
SpringBoot高级并发实践:自定义线程池与@Async异步调用深度解析
960 0
|
2月前
|
设计模式 缓存 安全
【JUC】(6)带你了解共享模型之 享元和不可变 模型并初步带你了解并发工具 线程池Pool,文章内还有饥饿问题、设计模式之工作线程的解决于实现
JUC专栏第六篇,本文带你了解两个共享模型:享元和不可变 模型,并初步带你了解并发工具 线程池Pool,文章中还有解决饥饿问题、设计模式之工作线程的实现
193 2
|
10月前
|
存储 监控 Java
【Java并发】【线程池】带你从0-1入门线程池
欢迎来到我的技术博客!我是一名热爱编程的开发者,梦想是编写高端CRUD应用。2025年我正在沉淀中,博客更新速度加快,期待与你一起成长。 线程池是一种复用线程资源的机制,通过预先创建一定数量的线程并管理其生命周期,避免频繁创建/销毁线程带来的性能开销。它解决了线程创建成本高、资源耗尽风险、响应速度慢和任务执行缺乏管理等问题。
484 60
【Java并发】【线程池】带你从0-1入门线程池
|
11月前
|
监控 Kubernetes Java
阿里面试:5000qps访问一个500ms的接口,如何设计线程池的核心线程数、最大线程数? 需要多少台机器?
本文由40岁老架构师尼恩撰写,针对一线互联网企业的高频面试题“如何确定系统的最佳线程数”进行系统化梳理。文章详细介绍了线程池设计的三个核心步骤:理论预估、压测验证和监控调整,并结合实际案例(5000qps、500ms响应时间、4核8G机器)给出具体参数设置建议。此外,还提供了《尼恩Java面试宝典PDF》等资源,帮助读者提升技术能力,顺利通过大厂面试。关注【技术自由圈】公众号,回复“领电子书”获取更多学习资料。
|
8月前
|
Java
线程池是什么?线程池在实际工作中的应用
总的来说,线程池是一种有效的多线程处理方式,它可以提高系统的性能和稳定性。在实际工作中,我们需要根据任务的特性和系统的硬件能力来合理设置线程池的大小,以达到最佳的效果。
252 18
|
Prometheus 监控 Cloud Native
JAVA线程池监控以及动态调整线程池
【10月更文挑战第22天】在 Java 中,线程池的监控和动态调整是非常重要的,它可以帮助我们更好地管理系统资源,提高应用的性能和稳定性。
733 64
|
10月前
|
安全 Java C#
Unity多线程使用(线程池)
在C#中使用线程池需引用`System.Threading`。创建单个线程时,务必在Unity程序停止前关闭线程(如使用`Thread.Abort()`),否则可能导致崩溃。示例代码展示了如何创建和管理线程,确保在线程中执行任务并在主线程中处理结果。完整代码包括线程池队列、主线程检查及线程安全的操作队列管理,确保多线程操作的稳定性和安全性。
|
监控 安全 Java
在 Java 中使用线程池监控以及动态调整线程池时需要注意什么?
【10月更文挑战第22天】在进行线程池的监控和动态调整时,要综合考虑多方面的因素,谨慎操作,以确保线程池能够高效、稳定地运行,满足业务的需求。
317 38
|
算法 NoSQL Java
Springboot3新特性:GraalVM Native Image Support和虚拟线程(从入门到精通)
这篇文章介绍了Spring Boot 3中GraalVM Native Image Support的新特性,提供了将Spring Boot Web项目转换为可执行文件的步骤,并探讨了虚拟线程在Spring Boot中的使用,包括如何配置和启动虚拟线程支持。
1003 9
Springboot3新特性:GraalVM Native Image Support和虚拟线程(从入门到精通)
|
Java
.如何根据 CPU 核心数设计线程池线程数量
IO 密集型:核心数*2 计算密集型: 核心数+1 为什么加 1?即使当计算密集型的线程偶尔由于缺失故障或者其他原因而暂停时,这个额外的线程也能确保 CPU 的时钟周期不会被浪费。
408 4