JAVA线程&线程池&异步编排

简介: JAVA线程&线程池&异步编排


异步和线程池

初始化线程的方式

继承Thread

主线程

public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 继承Thread
        System.out.println("主方法1.。start。。。");
        Thread01 thread01 = new Thread01();
        thread01.start();
        System.out.println("主方法1.。end。。。");
    }
/**
 * 继承Thread 重新run方法
 */
public static class Thread01 extends Thread{
    @Override
    public void run() {
        System.out.println("当前线程:"+ Thread.currentThread().getId());
        System.out.println("i = " + 10 / 2);
    }
}

输出结果

主方法1.。start。。。
当前线程:12
i = 5
主方法1.。end。。。
实现Runnable接口

主线程

public static void main(String[] args) throws ExecutionException, InterruptedException {
        //实现Runnable接口的方式
        System.out.println("主方法1.。start。。。");
        new Thread(new Runnable01()).start();
        System.out.println("主方法1.。end。。。");
    }
/**
     * 实现Runnable接口的方式
     */
    public static class Runnable01 implements Runnable{
        @Override
        public void run() {
            System.out.println("当前线程:"+ Thread.currentThread().getId());
            System.out.println("i = " + 10 / 2);
        }
    }

结果

主方法1.。start。。。
当前线程:12
i = 5
主方法1.。end。。。
实现Callable接口 + FutureTask (可以拿到返回值,可以处理异常)
public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println("主方法1.。start。。。");
        FutureTask futureTask = new FutureTask<Integer>(new Callable01());
        new Thread(futureTask).start();
        //阻塞等待整个线程完成,获取返回结果
        Integer result = (Integer) futureTask.get();
        System.out.println("异步Callable执行后的返回结果是:"+ result);
        System.out.println("主方法1.。end。。。");
    }
/**
     * 实现Callable接口
     */
    public static class Callable01 implements Callable<Integer> {
        @Override
        public Integer call() throws Exception {
            System.out.println("当前线程:"+ Thread.currentThread().getId());
            System.out.println("i = " + 10 / 2);
            return 10 / 2;
        }
    }

输出

主方法1.。start。。。
当前线程:12
i = 5
异步Callable执行后的返回结果是:5
主方法1.。end。。。
线程池

一般在业务中以上三个方法都不推荐使用,一般是给线程池提交任务就可以了。

主方法

public static ExecutorService executor = Executors.newFixedThreadPool(10);
public static void main(String[] args) throws ExecutionException, InterruptedException {
        //线程池的使用
        System.out.println("主方法1.。start。。。");
        ExecutorService service = Executors.newFixedThreadPool(10);
        Future<?> submit = service.submit(new Runnable01());
        submit.get();
        System.out.println("主方法1.。end。。。");
    }

可以向线程池中直接放入Runnable 或者Callable的任务即可

详解
/**
 * 线程池七大参数:
 *  corePoolSize 初始化的线程数量,等待接收异步请求
 *  maximumPoolSize 最大线程数量,可以控制资源
 *  keepAliveTime  存活时间,如果当前线程数量大于核心数量,那么就会释放空闲的线程,只要线程空闲大于执行的空闲时间
 *  unit  执行线程存活的时间单位
 *  BlockingQueue<Runnable> workQueue 阻塞队列,如果任务有很多,就会将目前多的任务放在队列中,只要有线程空闲
 *  就会去队列中取出新的任务继续执行
 *  threadFactory 线程的创建工厂
 *  RejectedExecutionHandler handler 如果队列满了,按照我们的策略拒绝执行的任务
 *
 *  工作顺序:
 *  1)当线程进来后会直接分配给核心线程中,也就是corePoolSize初始化的线程数
 *  2)当corePoolSize满了以后,默认将线程放到workQueue中等待核心线程调用
 *  3)如果workQueue满了就会再启动新的线程,但是最大数量不能超过maximumPoolSize
 *  4)maximumPoolSize满了如果还有新的线程任务就会调用 handler丢弃策略,
 *  5)如果maximumPoolSize中任务都执行完毕,那么就会等待keepAliveTime指定的空闲
 *  时间后进行销毁。
 *
 */
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
        10,
        100000,
        3,
        TimeUnit.MINUTES,
        new LinkedBlockingDeque<>(100000), //默认是Integer的最大值所以需要修改
        Executors.defaultThreadFactory(),
        new ThreadPoolExecutor.AbortPolicy()
);
threadPoolExecutor.execute(new Runnable01());
总结

在实际的开发过程中要使用线程池进行异步任务的完成,因为用Thread Runnable Callable这些都不能做到对线程的管理,很容易就导致资源浪费和消耗。

使用线程池的好处:

  • 降低资源的消耗
  • 提高响应速度
  • 提高线程的可管理性

CompletableFuture异步编排

需求场景:面对复杂关系的异步任务进行编排处理

使用

CompletableFuture.runAsync() 这个异步方法没有返回值,所以给的是Void类型

CompletableFuture<Void> future = CompletableFuture.runAsync(()->{
            System.out.println("当前线程:"+ Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("i = " + i);
        },executor);

CompletableFuture.supplyAsync()这个异步方法有返回值,返回值是Integer因此可以调用get方法,等异步任务执行结束后获取到返回值即可。

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(()->{
            System.out.println("当前线程:"+ Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("i = " + i);
            return i;
        },executor);
Integer i = future.get();

计算完成时使用函数回调whenComplete

还可以使用链式编程继续在后面追加方法

public class ThreadTest01 {
    //创建一个线程池
    public static ExecutorService executor = Executors.newFixedThreadPool(10);
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println("主方法1.。start。。。");
        //调用有返回值的异步
            CompletableFuture<Integer> future = CompletableFuture. (()->{
            System.out.println("当前线程:"+ Thread.currentThread().getId());
            int i = 10 / 0;
            System.out.println("i = " + i);
            return i;
        },executor).whenComplete( (res,exc) -> {
                System.out.println("异步线程执行完毕返回的结果是:"+ res + ";抛出的异常是:" + exc);
            }).exceptionally( (exc) -> {
                //如果发生异常那么exceptionally可以直接得到异常信息还可以返回一个指定的异常信息
                // 这个返回的异常信息可以让future.get();这里拿到
                return 404;
            });
        // get一方面可以等待异步线程完成后再执行其他业务,还可以获取异步任务的返回结果
        Integer i = future.get();
        System.out.println("主方法1.。end。。。"+ i);
    }

计算完成时使用函数回调handle,handle和handleAsync可以对异步返回结果和异常进行处理

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(()->{
    System.out.println("当前线程:"+ Thread.currentThread().getId());
    int i = 10 / 0;
    System.out.println("i = " + i);
    return i;
},executor).handle( (res,exc) -> {
    if (res != null){
        return res*2;
    }
    if (exc !=null){
        return 404;
    }
    return 0;
});
// get一方面可以等待异步线程完成后再执行其他业务,还可以获取异步任务的返回结果
Integer i = future.get();
System.out.println("主方法1.。end。。。"+ i);

输出

主方法1.。start。。。
当前线程:12
主方法1.。end。。。404
线程串行化方法

thenApply方法,当一个线程依赖另一个线程的时候,获取上一个任务的返回值并返回当前任务的返回值。

thenAccepte 消费上一个线程的处理结果,无返回结果

thenRun 当上一个线程处理完成后,执行thenRun的后续操作

带有Async都是异步的

获取返回值使用future.get()

两任务组合

thenCombine 两个任务都执行,第三个任务可以获取上两个任务的返回值,并处理并返回新的结果

thenAccpteBoth 两个任务都执行,并且第三个任务可以获取上两个任务的返回值,但是不能返回新的处理结果

thenRunBoth 两个任务执行后第三个任务开始执行,并且只能传入Runnable不能返回结果

runAfterEither 两个任务中任何一个任务执行完成后都立即执行第三个任务,因为第三个任务传入的是Runnable接口所以不能有返回值

acceptEither 可以接收两个任务中率先完成的任务的返回值并处理,但是第三个任务不能有返回值

applyToEither 可以接收两个任务中率先完成的任务的返回值并处理,第三个任务处理后可以有返回值

多任务组合

allOf 当所有的异步任务都执行完毕后才能继续往下执行 (消耗最少时间:用时最长的那个任务)

anyOf 当任何一个异步任务执行完毕后就会继续往下执行(消耗最少时间:用时最快的那个任务)

CompletableFuture<String> future01 = CompletableFuture.supplyAsync(() -> {
            System.out.println("future01");
            return "future01.jpg";
        }, executor);
        CompletableFuture<String> future02 = CompletableFuture.supplyAsync(() -> {
            System.out.println("future02");
            return  "商品属性:黑色256";
        }, executor);
        CompletableFuture<String> future03 = CompletableFuture.supplyAsync(() -> {
            System.out.println("future03");
            return "商品分类信息";
        }, executor);
        CompletableFuture allOf = CompletableFuture.allOf(future01,future02,future03);
        //CompletableFuture.anyOf(future01,future02,future03);
        allOf.get();
        System.out.println("end" + future01.get() + future02.get() + future03.get());

最佳实战

@Override
public SkuItemVo item(Long skuId) {
    SkuItemVo skuItemVo = new SkuItemVo();
    CompletableFuture<SkuInfoEntity> infoFuture = CompletableFuture.supplyAsync(() -> {
        //1、sku基本信息获取
        SkuInfoEntity skuInfoEntity = getById(skuId);
        skuItemVo.setInfo(skuInfoEntity);
        return skuInfoEntity;
    }, executor);
    CompletableFuture<Void> ImageFuture = infoFuture.thenRunAsync(() -> {
        //2、sku图片信息
        List<SkuImagesEntity> skuImagesEntities =
                skuImagesService.list(
                        new LambdaQueryWrapper<SkuImagesEntity>().eq(SkuImagesEntity::getSkuId, skuId));
        skuItemVo.setImages(skuImagesEntities);
    }, executor);
    CompletableFuture<Void> saleAttrFuture = infoFuture.thenAcceptAsync((skuInfoEntity) -> {
        Long spuId = skuInfoEntity.getSpuId();
        //3、获取spu的销售属性组合
        List<SkuItemVo.SkuItemSaleAttrVo> saleAttr = skuSaleAttrValueService.getSaleAttrBySpuId(spuId);
        skuItemVo.setSaleAttr(saleAttr);
    }, executor);
    CompletableFuture<Void> spuInfoDescEntityFuture = infoFuture.thenAcceptAsync((skuInfoEntity) -> {
        Long spuId = skuInfoEntity.getSpuId();
        //4、获取spu的介绍
        SpuInfoDescEntity spuInfoDescEntity = spuInfoDescService.getById(spuId);
        skuItemVo.setDesc(spuInfoDescEntity);
    });
    CompletableFuture<Void> voidCompletableFuture = infoFuture.thenAcceptAsync((skuInfoEntity) -> {
        Long spuId = skuInfoEntity.getSpuId();
        Long catalogId = skuInfoEntity.getCatalogId();
        //5、获取spu规格参数信息
        List<SkuItemVo.SpuItemAttrGroupVo> groupAttrs = attrGroupService.getGroupAttrsBySpuCataId(spuId, catalogId);
        skuItemVo.setGroupAttrs(groupAttrs);
    });
    //等所有任务都做完然后再返回
    CompletableFuture<Void> lastFuture =
            CompletableFuture.allOf(ImageFuture, saleAttrFuture, spuInfoDescEntityFuture, voidCompletableFuture);
    try {
        lastFuture.get();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }
    return skuItemVo;
}
相关文章
|
10天前
|
存储 监控 小程序
Java中的线程池优化实践####
本文深入探讨了Java中线程池的工作原理,分析了常见的线程池类型及其适用场景,并通过实际案例展示了如何根据应用需求进行线程池的优化配置。文章首先介绍了线程池的基本概念和核心参数,随后详细阐述了几种常见的线程池实现(如FixedThreadPool、CachedThreadPool、ScheduledThreadPool等)的特点及使用场景。接着,通过一个电商系统订单处理的实际案例,分析了线程池参数设置不当导致的性能问题,并提出了相应的优化策略。最终,总结了线程池优化的最佳实践,旨在帮助开发者更好地利用Java线程池提升应用性能和稳定性。 ####
|
6天前
|
安全 算法 Java
Java多线程编程中的陷阱与最佳实践####
本文探讨了Java多线程编程中常见的陷阱,并介绍了如何通过最佳实践来避免这些问题。我们将从基础概念入手,逐步深入到具体的代码示例,帮助开发者更好地理解和应用多线程技术。无论是初学者还是有经验的开发者,都能从中获得有价值的见解和建议。 ####
|
6天前
|
Java 调度
Java中的多线程编程与并发控制
本文深入探讨了Java编程语言中多线程编程的基础知识和并发控制机制。文章首先介绍了多线程的基本概念,包括线程的定义、生命周期以及在Java中创建和管理线程的方法。接着,详细讲解了Java提供的同步机制,如synchronized关键字、wait()和notify()方法等,以及如何通过这些机制实现线程间的协调与通信。最后,本文还讨论了一些常见的并发问题,例如死锁、竞态条件等,并提供了相应的解决策略。
23 3
|
7天前
|
监控 Java 开发者
深入理解Java中的线程池实现原理及其性能优化####
本文旨在揭示Java中线程池的核心工作机制,通过剖析其背后的设计思想与实现细节,为读者提供一份详尽的线程池性能优化指南。不同于传统的技术教程,本文将采用一种互动式探索的方式,带领大家从理论到实践,逐步揭开线程池高效管理线程资源的奥秘。无论你是Java并发编程的初学者,还是寻求性能调优技巧的资深开发者,都能在本文中找到有价值的内容。 ####
|
12天前
|
缓存 Java 开发者
Java多线程并发编程:同步机制与实践应用
本文深入探讨Java多线程中的同步机制,分析了多线程并发带来的数据不一致等问题,详细介绍了`synchronized`关键字、`ReentrantLock`显式锁及`ReentrantReadWriteLock`读写锁的应用,结合代码示例展示了如何有效解决竞态条件,提升程序性能与稳定性。
37 5
|
10天前
|
监控 Java 数据库连接
Java线程管理:守护线程与用户线程的区分与应用
在Java多线程编程中,线程可以分为守护线程(Daemon Thread)和用户线程(User Thread)。这两种线程在行为和用途上有着明显的区别,了解它们的差异对于编写高效、稳定的并发程序至关重要。
19 2
|
10天前
|
监控 Java 开发者
Java线程管理:守护线程与本地线程的深入剖析
在Java编程语言中,线程是程序执行的最小单元,它们可以并行执行以提高程序的效率和响应性。Java提供了两种特殊的线程类型:守护线程和本地线程。本文将深入探讨这两种线程的区别,并探讨它们在实际开发中的应用。
15 1
|
12天前
|
安全 Java 开发者
Java中的多线程编程:从基础到实践
本文深入探讨了Java多线程编程的核心概念和实践技巧,旨在帮助读者理解多线程的工作原理,掌握线程的创建、管理和同步机制。通过具体示例和最佳实践,本文展示了如何在Java应用中有效地利用多线程技术,提高程序性能和响应速度。
46 1
|
2月前
|
存储 消息中间件 资源调度
C++ 多线程之初识多线程
这篇文章介绍了C++多线程的基本概念,包括进程和线程的定义、并发的实现方式,以及如何在C++中创建和管理线程,包括使用`std::thread`库、线程的join和detach方法,并通过示例代码展示了如何创建和使用多线程。
51 1
C++ 多线程之初识多线程
|
2月前
|
Java 开发者
在Java多线程编程中,创建线程的方法有两种:继承Thread类和实现Runnable接口
【10月更文挑战第20天】在Java多线程编程中,创建线程的方法有两种:继承Thread类和实现Runnable接口。本文揭示了这两种方式的微妙差异和潜在陷阱,帮助你更好地理解和选择适合项目需求的线程创建方式。
23 3