Java并发编程异步操作Future和FutureTask

简介: 生活是一个洗礼自己的过程,这个洗礼并不是传统意义上的洗礼,传统意义上的洗礼通常认为这个人的思想得到洗礼,灵魂得到洗礼,十分的清新脱俗,不世故,不圆滑,而现实的洗礼实则是让一个人褪去幼稚,褪去无知,让你变得点头哈腰,圆滑世故,我们都是动物,需要物质满足,更需要欲望填补,所以,变成自己小时候唾骂的对象也是可以理解,不过这是一个选择,你可以进行选择,只是在物欲横流的时代,多数人没有这种选择的权力!

码农在囧途


生活是一个洗礼自己的过程,这个洗礼并不是传统意义上的洗礼,传统意义上的洗礼通常认为这个人的思想得到洗礼,灵魂得到洗礼,十分的清新脱俗,不世故,不圆滑,而现实的洗礼实则是让一个人褪去幼稚,褪去无知,让你变得点头哈腰,圆滑世故,我们都是动物,需要物质满足,更需要欲望填补,所以,变成自己小时候唾骂的对象也是可以理解,不过这是一个选择,你可以进行选择,只是在物欲横流的时代,多数人没有这种选择的权力!


Future和FutureTask


Future是一个接口,FutureTask是一个类,实现RunnableFuture接口,RunnableFuture接口继承Future接口。


Future接口的方法


V get() :获取异步执行的结果,如果没有返回结果,此方法会阻塞直到异步计算完成。


V get(Long timeout , TimeUnit unit) :获取异步执行结果,如果没有结果可用,此方法会阻塞,但是会有时间限制,如果阻塞时间超过设定的timeout时间,该方法将抛出异常。


boolean isDone() :如果任务执行结束,无论是正常结束或是中途取消还是发生异常,都返回true。


boolean isCancelled() :如果任务完成前被取消,则返回true。


boolean cancel(boolean mayInterruptRunning) :如果任务还没开始,执行cancel(...)方法将返回false;如果任务已经启动, 执行cancel(true)方法将以中断执行此任务线程的方式来试图停止任务,如果停止成功,返回true;当任务已经启动, 执行cancel(false)方法将不会对正在执行的任务线程产生影响(让线程正常执行到完成),此时返回false;当任务已经完成, 执行cancel(...)方法将返回false。mayInterruptRunning参数表示是否中断执行中的线程。


Future是一个接口,因此我们不能直接创建对象,需要配合线程池一起使用,FutureTask我们可以直接创建对象。


Future的使用


Future代表异步执行的结果,也就是说异步执行完毕后,结果保存在Future里, 我们在使用线程池submit()时需要传入Callable接口,线程池的返回值为一个Future,而Future则保存了执行的结果 ,可通过Futureget()方法取出结果,如果线程池使用的是execute()方法,则传入的是Runnable接口无返回值。


如下我们使用Future模拟下单操作,用户下单后保存订单信息扣减库存增加积分发送短信通知,这么多个任务如果使用同步执行,那么效率就会 比较低,用户体验不好,一般我们会采用消息队列来达到异步的效果,今天我们就不用消息队列,而是使用Future接口来实现异步。


public class FutureTest {
    final static ExecutorService threadPool = Executors.newCachedThreadPool();
    //保存订单任务
    public static Future<R> saveOrderTask(OrderInfo orderInfo) {
        return threadPool.submit(new Callable<R>() {
            @Override
            public R call() throws Exception {
                return saveOrder(orderInfo);
            }
        });
    }
    //扣减库存任务
    public static Future<R> decreaseStockTask(OrderInfo orderInfo) {
        return threadPool.submit(new Callable<R>() {
            @Override
            public R call() throws Exception {
                return decreaseStockByCommodityId(orderInfo);
            }
        });
    }
    //增加积分任务
    public static Future<R> increaseIntegralTask(OrderInfo orderInfo) {
        return threadPool.submit(new Callable<R>() {
            @Override
            public R call() throws Exception {
                return increaseIntegralByUserId(orderInfo);
            }
        });
    }
    public static void sendMsgToPhone(OrderInfo orderInfo) {
        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println("用户【" + orderInfo.getUserId() + "】,你已下单成功~~~~~~~~");
            }
        });
    }
    //增加积分rpc接口
    public static R increaseIntegralByUserId(OrderInfo orderInfo) {
        System.out.println("增加积分~~~~~~~~");
        integralService.increaseIntegralByUserId(orderInfo.getUserId(),20);
        return new R(200, "增加积分成功", null);
    }
    //扣减库存rpc接口
    public static R decreaseStockByCommodityId(OrderInfo orderInfo) {
        System.out.println("扣减库存~~~~~~~~");
        stockService.decreaseStockByCommodityId(orderInfo.getCommodityId());
        return new R(200, "扣减库存成功", null);
    }
    //保存订单rpc接口
    public static R saveOrder(OrderInfo orderInfo) throws InterruptedException {
        System.out.println("保存订单~~~~~~~~");
        Thread.sleep(2000);
        orderService.insert(orderInfo);
        return new R(200, "保存订单成功", null);
    }
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        OrderInfo orderInfo = new OrderInfo().setId("123455").setUserId("111111").setCommodityId("123321");
        Future<R> orderTask = saveOrderTask(orderInfo);
        Future<R> stockTask = decreaseStockTask(orderInfo);
        Future<R> integralTask = increaseIntegralTask(orderInfo);
        sendMsgToPhone(orderInfo);
        if (orderTask.get().getCode() == 200 && orderTask.isDone()) 
            System.out.println(orderTask.get().getMsg());
        if (stockTask.get().getCode() == 200 && stockTask.isDone()) 
            System.out.println(stockTask.get().getMsg());
        if (integralTask.get().getCode() == 200 && integralTask.isDone()) 
            System.out.println(integralTask.get().getMsg());
        threadPool.shutdownNow();
    }
}


输出


保存订单~~~~~~~~
扣减库存~~~~~~~~
增加积分~~~~~~~~
用户【111111】,你已下单成功~~~~~~~~
保存订单成功
扣减库存成功
增加积分成功


我们在保存订单接口模拟处理业务操作,花费了2s,从输出结果可以看出,其他rpc接口并没有在保存订单时而阻塞,而是同时执行,就达到了异步的效果。


不过我们发现了一个问题,那就是异步返回结果被阻塞了,我明明我扣减库存和增加积分接口很快就返回,但是从输出中却发现扣减库存和增加积分在保存 订单后输出,由此我们可看出Future会阻塞返回结果。


上面我们发送短信到用户手机并没有获取返回结果,所以没有使用Future,使用线程池我们就没有使用Callable接口,而是使用Runnable接口, 方法就是execute(),而不是submit()


execute()和submit()区别


1.execute无返回值,这样就无法知道任务是否执行成功,而submit有返回值。 2.execute抛出异常后无法处理,不能捕捉异常,而submit可以捕获异常;


FutureTask的使用


FutureTaskFuture接口的实现类,我们可以直接创建一个FutureTask对象,下面我们对上面的下单流程就行改造,使用FutureTask 来实现。


/**
 * @author 刘牌
 * @date 2022/3/2617:34
 */
public class PlaceOrderFutureTaskTest {
    final static ExecutorService threadPool = Executors.newCachedThreadPool();
    //保存订单任务
    public static FutureTask<R> saveOrderTask(OrderInfo orderInfo){
        return new FutureTask<>(new Callable<R>() {
            @Override
            public R call() throws Exception {
                return saveOrder(orderInfo);
            }
        });
    }
    //扣减库存任务
    public static FutureTask<R> decreaseStockTask(OrderInfo orderInfo){
        return new FutureTask<>(new Callable<R>() {
            @Override
            public R call() throws Exception {
                return decreaseStockByCommodityId(orderInfo);
            }
        });
    }
    //增加积分任务
    public static FutureTask<R> increaseIntegralTask(OrderInfo orderInfo){
        return new FutureTask<>(new Callable<R>() {
            @Override
            public R call() throws Exception {
                return increaseIntegralByUserId(orderInfo);
            }
        });
    }
    public static void sendMsgToPhone(OrderInfo orderInfo){
        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println("用户【"+orderInfo.getUserId()+"】,你已下单成功~~~~~~~~");
            }
        });
    }
    //增加积分rpc接口
    public static R increaseIntegralByUserId(OrderInfo orderInfo){
        System.out.println("增加积分~~~~~~~~");
        integralService.increaseIntegralByUserId(orderInfo.getUserId(),20);
        return new R(200,"增加积分成功",null);
    }
    //扣减库存rpc接口
    public static R decreaseStockByCommodityId(OrderInfo orderInfo){
        System.out.println("扣减库存~~~~~~~~");
        stockService.decreaseStockByCommodityId(orderInfo.getCommodityId());
        return new R(200,"扣减库存成功",null);
    }
    //保存订单rpc接口
    public static R saveOrder(OrderInfo orderInfo) throws InterruptedException {
        System.out.println("保存订单~~~~~~~~");
        Thread.sleep(2000);
        orderService.insert(orderInfo);
        return new R(200,"保存订单成功",null);
    }
    
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        OrderInfo orderInfo = new OrderInfo().setId("123455").setUserId("111111").setCommodityId("123321");
        FutureTask<R> orderTask = saveOrderTask(orderInfo);
        FutureTask<R> stockTask = decreaseStockTask(orderInfo);
        FutureTask<R> integralTask = increaseIntegralTask(orderInfo);
        threadPool.submit(orderTask);
        threadPool.submit(stockTask);
        threadPool.submit(integralTask);
        sendMsgToPhone(orderInfo);
        if (orderTask.get().getCode() == 200 && orderTask.isDone()) 
            System.out.println(orderTask.get().getMsg());
        if (stockTask.get().getCode() == 200 && stockTask.isDone()) 
            System.out.println(stockTask.get().getMsg());
        if (integralTask.get().getCode() == 200 && integralTask.isDone()) 
            System.out.println(integralTask.get().getMsg());
        threadPool.shutdownNow();
    }
}


输出


保存订单~~~~~~~~
扣减库存~~~~~~~~
增加积分~~~~~~~~
用户【111111】,你已下单成功~~~~~~~~
保存订单成功
扣减库存成功
增加积分成功


额~~~,从代码中我们看出其实没啥区别,就是一个接口和实现类的不同写法而已,从输入也可以看出和上面的Future一样,由此可知FutureTask获取结果也是 阻塞的。


从上面的流程中可以看出,FutureFutureTask能够实现异步,但是获取结果却是同步的,这缺陷也是显而易见,如果遇到耗时的任务,那么获取返回值的时候 其他任务就会被阻塞,只能排队慢慢来,在高并发的场景下不适合,那有没有解决方案呢,肯定有的,那就是CompletableFuture,我们后面继续介绍,本章我们 就不对其进行介绍。


今天的分享就到这里,感谢你的观看,我们下期见。

目录
相关文章
|
13天前
|
存储 缓存 Java
Java 并发编程——volatile 关键字解析
本文介绍了Java线程中的`volatile`关键字及其与`synchronized`锁的区别。`volatile`保证了变量的可见性和一定的有序性,但不能保证原子性。它通过内存屏障实现,避免指令重排序,确保线程间数据一致。相比`synchronized`,`volatile`性能更优,适用于简单状态标记和某些特定场景,如单例模式中的双重检查锁定。文中还解释了Java内存模型的基本概念,包括主内存、工作内存及并发编程中的原子性、可见性和有序性。
Java 并发编程——volatile 关键字解析
|
17天前
|
算法 Java 调度
java并发编程中Monitor里的waitSet和EntryList都是做什么的
在Java并发编程中,Monitor内部包含两个重要队列:等待集(Wait Set)和入口列表(Entry List)。Wait Set用于线程的条件等待和协作,线程调用`wait()`后进入此集合,通过`notify()`或`notifyAll()`唤醒。Entry List则管理锁的竞争,未能获取锁的线程在此排队,等待锁释放后重新竞争。理解两者区别有助于设计高效的多线程程序。 - **Wait Set**:线程调用`wait()`后进入,等待条件满足被唤醒,需重新竞争锁。 - **Entry List**:多个线程竞争锁时,未获锁的线程在此排队,等待锁释放后获取锁继续执行。
50 12
|
14天前
|
存储 安全 Java
Java多线程编程秘籍:各种方案一网打尽,不要错过!
Java 中实现多线程的方式主要有四种:继承 Thread 类、实现 Runnable 接口、实现 Callable 接口和使用线程池。每种方式各有优缺点,适用于不同的场景。继承 Thread 类最简单,实现 Runnable 接口更灵活,Callable 接口支持返回结果,线程池则便于管理和复用线程。实际应用中可根据需求选择合适的方式。此外,还介绍了多线程相关的常见面试问题及答案,涵盖线程概念、线程安全、线程池等知识点。
96 2
|
30天前
|
安全 算法 Java
Java多线程编程中的陷阱与最佳实践####
本文探讨了Java多线程编程中常见的陷阱,并介绍了如何通过最佳实践来避免这些问题。我们将从基础概念入手,逐步深入到具体的代码示例,帮助开发者更好地理解和应用多线程技术。无论是初学者还是有经验的开发者,都能从中获得有价值的见解和建议。 ####
|
30天前
|
Java 调度
Java中的多线程编程与并发控制
本文深入探讨了Java编程语言中多线程编程的基础知识和并发控制机制。文章首先介绍了多线程的基本概念,包括线程的定义、生命周期以及在Java中创建和管理线程的方法。接着,详细讲解了Java提供的同步机制,如synchronized关键字、wait()和notify()方法等,以及如何通过这些机制实现线程间的协调与通信。最后,本文还讨论了一些常见的并发问题,例如死锁、竞态条件等,并提供了相应的解决策略。
51 3
|
Java API UED
Java 并发专题 :FutureTask 实现预加载数据 在线看电子书、浏览器浏览网页等
转自:http://blog.csdn.net/lmj623565791/article/details/26817403  继续并发专题~ FutureTask 有点类似Runnable,都可以通过Thread来启动,不过FutureTask可以返回执行完毕的数据,并且FutureTask的get方法支持阻塞。 由于:FutureTask可以返回执行完毕的数据,
1105 0
|
11天前
|
Java
Java—多线程实现生产消费者
本文介绍了多线程实现生产消费者模式的三个版本。Version1包含四个类:`Producer`(生产者)、`Consumer`(消费者)、`Resource`(公共资源)和`TestMain`(测试类)。通过`synchronized`和`wait/notify`机制控制线程同步,但存在多个生产者或消费者时可能出现多次生产和消费的问题。 Version2将`if`改为`while`,解决了多次生产和消费的问题,但仍可能因`notify()`随机唤醒线程而导致死锁。因此,引入了`notifyAll()`来唤醒所有等待线程,但这会带来性能问题。
Java—多线程实现生产消费者
|
13天前
|
安全 Java Kotlin
Java多线程——synchronized、volatile 保障可见性
Java多线程中,`synchronized` 和 `volatile` 关键字用于保障可见性。`synchronized` 保证原子性、可见性和有序性,通过锁机制确保线程安全;`volatile` 仅保证可见性和有序性,不保证原子性。代码示例展示了如何使用 `synchronized` 和 `volatile` 解决主线程无法感知子线程修改共享变量的问题。总结:`volatile` 确保不同线程对共享变量操作的可见性,使一个线程修改后,其他线程能立即看到最新值。
|
13天前
|
消息中间件 缓存 安全
Java多线程是什么
Java多线程简介:本文介绍了Java中常见的线程池类型,包括`newCachedThreadPool`(适用于短期异步任务)、`newFixedThreadPool`(适用于固定数量的长期任务)、`newScheduledThreadPool`(支持定时和周期性任务)以及`newSingleThreadExecutor`(保证任务顺序执行)。同时,文章还讲解了Java中的锁机制,如`synchronized`关键字、CAS操作及其实现方式,并详细描述了可重入锁`ReentrantLock`和读写锁`ReadWriteLock`的工作原理与应用场景。
|
14天前
|
安全 Java 编译器
深入理解Java中synchronized三种使用方式:助您写出线程安全的代码
`synchronized` 是 Java 中的关键字,用于实现线程同步,确保多个线程互斥访问共享资源。它通过内置的监视器锁机制,防止多个线程同时执行被 `synchronized` 修饰的方法或代码块。`synchronized` 可以修饰非静态方法、静态方法和代码块,分别锁定实例对象、类对象或指定的对象。其底层原理基于 JVM 的指令和对象的监视器,JDK 1.6 后引入了偏向锁、轻量级锁等优化措施,提高了性能。
37 3