ForkJoin(分支合并)
fork():分支(拆分) join():合并 是ForkJoin种两个真实的方法
什么是ForkJoin?
它可以根据程序进行调节 下面代码会举例
ForkJoin是JDK1.7之后出现的,一般用来并行执行任务,提高效率,尤其是大数据量的情况下,它是一个线程并发成多个去操作的,主要思想是把大任务拆分为若干个小任务,然后由小任务分别去执行,最后汇总结果
ForkJoin的特点:工作窃取
这个里面维护的都是双端队列(可以从两个方向自由插入队列去执行任务,而不是传统的只能从左往右)
工作窃取:比如现在有两个线程线程A和线程B,他们分别执行子任务,如果线程A还没有执行完,但是线程B已经执行完毕,此时线程B不会等待线程A执行完毕,而是会把线程A没有执行完的任务偷过来执行,从而提高效率,当然也有弊端,就是线程B执行完毕去抢线程A的资源,但是线程A也执行到这了,就会造成竞争,当然利大于弊,只有大数据量的情况下才会使用它
操作ForkJoin,如何使用它
大概步骤如下所示
1 ForkJoinPool 通过它来执行
2 新建一个计算任务 通过 ForkJoinPool.execute(ForkJoinTask task)
3 计算类继承 RecursiveTask (extends RecursiveTask)
4 通过ForkJoinTask下的 RecursiveAction(递归事件 没有返回值 ) RecursiveTask(递归任务 有返回值)
5 调用ForkJoinTask的submit方法或者execute方法去执行(提交任务) ( submit是异步的 有返回值 execute是同步的 没有返回值)
代码测试
package com.wyh.ForkJoin; /** * @program: JUC * @description: 分支合并 求和计算 * @author: 魏一鹤 * @createDate: 2022-03-07 22:06 **/ import java.util.concurrent.RecursiveTask; /** * 如何使用ForkJoin 其实就以下步骤 * 1 ForkJoinPool 通过它来执行 * 2 新建一个计算任务 通过 ForkJoinPool.execute(ForkJoinTask task) * 3 计算类继承 RecursiveTask extends RecursiveTask * 4 通过ForkJoinTask下的 RecursiveAction(递归事件 没有返回值 ) RecursiveTask(递归任务 有返回值) **/ //继承RecursiveTask递归任务 public class ForkJoinDemo01 extends RecursiveTask<Long> { //最小值 用long数据类型要比int大 int最多到20亿 private long min; //最大值 private long max; //构造器 public ForkJoinDemo01(long min,long max) { this.min = min; this.max = max; } //临界值 超过这个值分成两个任务 1万 //它可以根据程序进行调节 private long temp=10000L; //实现递归任务的compute计算方法 @Override protected Long compute() { //如果最大值减去最小值小于临界值 走普通计算 不需要分支合并 if((max-min)<temp){ //用于计算总数的变量 Long sum=0L; //循环1亿之间的数求和 for (Long i = min; i <= max; i++) { sum+=i; } //计算总数完毕 进行返回 return sum; }//否则走分支合并 ForkJoin else{ //计算最大值和最小值的中间值 long mid = (max + min) / 2; //创建计算任务 //把一个大任务拆分为两个小任务 //第一个任务 ForkJoinDemo01 task1 = new ForkJoinDemo01(min, mid); task1.fork(); //拆分任务 把任务压入线程队列 //第二个任务 ForkJoinDemo01 task2 = new ForkJoinDemo01(mid+1,max); task2.fork(); //拆分任务 把任务压入线程队列 //合并结果 return task1.join() + task2.join(); } } }
不同的方式去求和
- 异步回调(future 准确来说是CompletableFuture)
发起这个请求后,不一定非要等待结果(不用一直阻塞等待结果)和服务器和客户端通信的ajax一样的道理
如何使用?
- 使用Future准确来说是CompletableFuture来操作
- 使用CompletableFuture的supplyAsync(有返回类型回调)或者runAsync(没有返回值回调)去异步回调任务 通过get方法阻塞获取执行结果 get需要抛异常
- 使用completableFuture的whenComplete(t,u)成功回调 t=正常的返回结果 u=如果出错就是异常信息
- 使用completableFuture的exceptionally(e) e是失败回调返回的错误信息 通过e.getMessage()来接受
代码测试
有/没有返回值的异步回调 成功回调/异步回调
package com.wyh.ForkJoin; import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinTask; import java.util.stream.LongStream; /** * @program: JUC * @description: ForkJoin测试 * @author: 魏一鹤 * @createDate: 2022-03-07 22:39 **/ public class ForkJoinTest { public static void main(String[] args) throws ExecutionException, InterruptedException { test1(); //sum=500000000500000000,使用时间:272 test2(); //sum=500000000500000000,使用时间:4772 test3(); //sum=500000000500000000,耗费时间:113 } //方式1 普通求和 public static void test1(){ //求和变量 long sum=0; //开始时间 long start = System.currentTimeMillis(); for (int i = 1; i <= 10_0000_0000; i++) { //求和 sum+=i; } //结束时间 long end = System.currentTimeMillis(); System.out.println("sum="+sum+",使用时间:"+(end-start)); //sum=500000000500000000,使用时间:263 } //方式2 使用ForkJoin分支合并 public static void test2() throws ExecutionException, InterruptedException { //开始时间 long start = System.currentTimeMillis(); //创建ForkJoinPool池 ForkJoinPool forkJoinPool = new ForkJoinPool(); //创建ForkJoinTask任务 ForkJoinTask<Long> task = new ForkJoinDemo01(0L, 10_0000_0000); //执行/提交 ForkJoinTask任务 //forkJoinPool.submit() 提交任务 submit是异步的 有返回值 // forkJoinPool.execute(task); 执行任务 execute是同步的 没有返回值 ForkJoinTask<Long> submit = forkJoinPool.submit(task); //提交任务 //获取结果 get会阻塞等待结果 需要抛出异常 Long sum = submit.get(); //结束时间 long end = System.currentTimeMillis(); System.out.println("sum="+sum+",使用时间:"+(end-start)); //sum=500000000500000000,使用时间:5278 } //方式3 并行流 public static void test3(){ //开始时间 long start = System.currentTimeMillis(); //计算交给Stream流 parallel:并行 reduce获取结果 开始值,二进制操作结果 long sum = LongStream.rangeClosed(0, 10_0000_0000).parallel().reduce(0, Long::sum); //结束时间 long end = System.currentTimeMillis(); System.out.println("sum="+sum+",耗费时间:"+(end-start)); //sum=500000000500000000,耗费时间:113 } }
package com.wyh.async; /** * @program: JUC * @description: 异步回调 * @author: 魏一鹤 * @createDate: 2022-03-08 23:38 **/ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; /** * 和ajax是一样的 * 异步执行 * 成功回调 * 失败回调 **/ public class asyncDemo01 { public static void main(String[] args) throws ExecutionException, InterruptedException { //没有返回值的异步回调 runAsync //发起一个请求 runAsync执行异步任务 参数是一个Runnable线程 可以使用lambda表达式 // CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> { //休眠2s //try { // TimeUnit.SECONDS.sleep(2); //} catch (InterruptedException e) { // e.printStackTrace(); //} //System.out.println(Thread.currentThread().getName()+"runAsync->Void"); //1111111 //ForkJoinPool.commonPool-worker-9runAsync->Void // }); // System.out.println("1111111"); //阻塞获取线程执行结果 get需要抛异常 // completableFuture.get(); //有返回值的异步回调 supplyAsync //有返回值只有两种情况 要么成功要么失败 成功返回成功数据 失败返回错误信息 CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(()->{ System.out.println(Thread.currentThread().getName()+"supplyAsync->String"); //return和泛型数据类型一样的结果 参数类型自定义 //故意让代码报错执行错误回调 int a=10/0; return "1024"; }); //whenComplete 执行成功的操作 System.out.println(completableFuture.whenComplete((t,u)->{ System.out.println("t:"+t); //t=正常的返回结果 1024 System.out.println("u:"+u); //如果出错就是异常信息 java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero //exceptionally执行失败的操作 }).exceptionally((e)->{ //打印异常信息 System.out.println(e.getMessage()); //失败的返回值 return "400"; }).get()); } }
//成功回调结果
ForkJoinPool.commonPool-worker-9supplyAsync->String
t:1024
u:null
1024
//失败回调结果
ForkJoinPool.commonPool-worker-9supplyAsync->String
t:null
u:java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
java.lang.ArithmeticException: / by zero
400