ForkJoin(分支合并)

简介: ForkJoin(分支合并)

ForkJoin(分支合并)

fork():分支(拆分)   join():合并 是ForkJoin种两个真实的方法

什么是ForkJoin?

它可以根据程序进行调节 下面代码会举例

ForkJoin是JDK1.7之后出现的,一般用来并行执行任务,提高效率,尤其是大数据量的情况下,它是一个线程并发成多个去操作的,主要思想是把大任务拆分为若干个小任务,然后由小任务分别去执行,最后汇总结果

image.png


ForkJoin的特点:工作窃取

这个里面维护的都是双端队列(可以从两个方向自由插入队列去执行任务,而不是传统的只能从左往右)

工作窃取:比如现在有两个线程线程A和线程B,他们分别执行子任务,如果线程A还没有执行完,但是线程B已经执行完毕,此时线程B不会等待线程A执行完毕,而是会把线程A没有执行完的任务偷过来执行,从而提高效率,当然也有弊端,就是线程B执行完毕去抢线程A的资源,但是线程A也执行到这了,就会造成竞争,当然利大于弊,只有大数据量的情况下才会使用它

操作ForkJoin,如何使用它

大概步骤如下所示

1 ForkJoinPool 通过它来执行

2 新建一个计算任务 通过 ForkJoinPool.execute(ForkJoinTask task)

3 计算类继承 RecursiveTask (extends RecursiveTask)

4 通过ForkJoinTask下的 RecursiveAction(递归事件 没有返回值 )  RecursiveTask(递归任务 有返回值)

5 调用ForkJoinTask的submit方法或者execute方法去执行(提交任务) ( submit是异步的  有返回值 execute是同步的 没有返回值)

代码测试


package com.wyh.ForkJoin;
/**
 * @program: JUC
 * @description: 分支合并  求和计算
 * @author: 魏一鹤
 * @createDate: 2022-03-07 22:06
 **/
import java.util.concurrent.RecursiveTask;
/**
 * 如何使用ForkJoin 其实就以下步骤
 * 1  ForkJoinPool 通过它来执行
 * 2  新建一个计算任务 通过 ForkJoinPool.execute(ForkJoinTask task)
 * 3  计算类继承 RecursiveTask  extends RecursiveTask
 * 4  通过ForkJoinTask下的 RecursiveAction(递归事件 没有返回值 )  RecursiveTask(递归任务 有返回值)
**/
//继承RecursiveTask递归任务
public class ForkJoinDemo01 extends RecursiveTask<Long> {
//最小值 用long数据类型要比int大 int最多到20亿
    private long min;
//最大值
    private long max;
//构造器
    public ForkJoinDemo01(long min,long max) {
this.min = min;
this.max = max;
    }
//临界值 超过这个值分成两个任务  1万
    //它可以根据程序进行调节
    private long temp=10000L;
//实现递归任务的compute计算方法
    @Override
protected Long compute() {
//如果最大值减去最小值小于临界值 走普通计算  不需要分支合并
        if((max-min)<temp){
//用于计算总数的变量
            Long sum=0L;
//循环1亿之间的数求和
            for (Long i = min; i <= max; i++) {
                sum+=i;
            }
//计算总数完毕 进行返回
          return sum;
        }//否则走分支合并 ForkJoin
        else{
//计算最大值和最小值的中间值
            long mid = (max + min) / 2;
//创建计算任务
            //把一个大任务拆分为两个小任务
            //第一个任务
            ForkJoinDemo01 task1 = new ForkJoinDemo01(min, mid);
            task1.fork(); //拆分任务 把任务压入线程队列
            //第二个任务
            ForkJoinDemo01 task2 = new ForkJoinDemo01(mid+1,max);
            task2.fork(); //拆分任务 把任务压入线程队列
            //合并结果
            return  task1.join() + task2.join();
        }
    }
}


不同的方式去求和


  1. 异步回调(future 准确来说是CompletableFuture)

发起这个请求后,不一定非要等待结果(不用一直阻塞等待结果)和服务器和客户端通信的ajax一样的道理

如何使用?

  1. 使用Future准确来说是CompletableFuture来操作
  2. 使用CompletableFuture的supplyAsync(有返回类型回调)或者runAsync(没有返回值回调)去异步回调任务 通过get方法阻塞获取执行结果 get需要抛异常
  3. 使用completableFuture的whenComplete(t,u)成功回调 t=正常的返回结果  u=如果出错就是异常信息
  4. 使用completableFuture的exceptionally(e) e是失败回调返回的错误信息 通过e.getMessage()来接受

代码测试  

有/没有返回值的异步回调 成功回调/异步回调


package com.wyh.ForkJoin;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.LongStream;
/**
 * @program: JUC
 * @description: ForkJoin测试
 * @author: 魏一鹤
 * @createDate: 2022-03-07 22:39
 **/
public class ForkJoinTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
test1(); //sum=500000000500000000,使用时间:272
        test2();  //sum=500000000500000000,使用时间:4772
        test3(); //sum=500000000500000000,耗费时间:113
    }
   //方式1 普通求和
    public static void test1(){
//求和变量
        long sum=0;
//开始时间
        long start = System.currentTimeMillis();
for (int i = 1; i <= 10_0000_0000; i++) {
//求和
            sum+=i;
        }
//结束时间
        long end = System.currentTimeMillis();
        System.out.println("sum="+sum+",使用时间:"+(end-start));
//sum=500000000500000000,使用时间:263
    }
   //方式2  使用ForkJoin分支合并
    public static void test2() throws ExecutionException, InterruptedException {
//开始时间
        long start = System.currentTimeMillis();
//创建ForkJoinPool池
        ForkJoinPool forkJoinPool = new ForkJoinPool();
//创建ForkJoinTask任务
        ForkJoinTask<Long> task = new ForkJoinDemo01(0L, 10_0000_0000);
//执行/提交 ForkJoinTask任务
        //forkJoinPool.submit()  提交任务  submit是异步的  有返回值
       // forkJoinPool.execute(task); 执行任务  execute是同步的 没有返回值
        ForkJoinTask<Long> submit = forkJoinPool.submit(task); //提交任务
        //获取结果 get会阻塞等待结果 需要抛出异常
        Long sum = submit.get();
//结束时间
        long end = System.currentTimeMillis();
        System.out.println("sum="+sum+",使用时间:"+(end-start));
//sum=500000000500000000,使用时间:5278
    }
//方式3 并行流
    public static void test3(){
//开始时间
        long start = System.currentTimeMillis();
//计算交给Stream流    parallel:并行 reduce获取结果 开始值,二进制操作结果
        long sum = LongStream.rangeClosed(0, 10_0000_0000).parallel().reduce(0, Long::sum);
//结束时间
        long end = System.currentTimeMillis();
        System.out.println("sum="+sum+",耗费时间:"+(end-start));
//sum=500000000500000000,耗费时间:113
    }
}


package com.wyh.async;
/**
 * @program: JUC
 * @description: 异步回调
 * @author: 魏一鹤
 * @createDate: 2022-03-08 23:38
 **/
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
/**
 * 和ajax是一样的
 * 异步执行
 * 成功回调
 * 失败回调
**/
public class asyncDemo01 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//没有返回值的异步回调 runAsync
        //发起一个请求  runAsync执行异步任务 参数是一个Runnable线程 可以使用lambda表达式
      //  CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
            //休眠2s
            //try {
            //    TimeUnit.SECONDS.sleep(2);
            //} catch (InterruptedException e) {
            //    e.printStackTrace();
            //}
            //System.out.println(Thread.currentThread().getName()+"runAsync->Void");
            //1111111
            //ForkJoinPool.commonPool-worker-9runAsync->Void
      //  });
      //  System.out.println("1111111");
        //阻塞获取线程执行结果 get需要抛异常
      //  completableFuture.get();
        //有返回值的异步回调 supplyAsync
        //有返回值只有两种情况 要么成功要么失败 成功返回成功数据 失败返回错误信息
        CompletableFuture<String> completableFuture =  CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread().getName()+"supplyAsync->String");
//return和泛型数据类型一样的结果 参数类型自定义
            //故意让代码报错执行错误回调
            int a=10/0;
return "1024";
        });
//whenComplete 执行成功的操作
        System.out.println(completableFuture.whenComplete((t,u)->{
            System.out.println("t:"+t); //t=正常的返回结果 1024
            System.out.println("u:"+u);  //如果出错就是异常信息 java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
            //exceptionally执行失败的操作
        }).exceptionally((e)->{
//打印异常信息
            System.out.println(e.getMessage());
//失败的返回值
            return "400";
        }).get());
    }
}


//成功回调结果

ForkJoinPool.commonPool-worker-9supplyAsync->String

t:1024

u:null

1024

//失败回调结果

ForkJoinPool.commonPool-worker-9supplyAsync->String

t:null

u:java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero

java.lang.ArithmeticException: / by zero

400

目录
相关文章
|
7月前
|
Java
java线程之分支合并框架
java线程之分支合并框架
|
6月前
|
存储 Java 索引
(十二)彻悟并发之JUC分治思想产物-ForkJoin分支合并框架原理剖析下篇
在《(十二)彻悟并发之JUC分治思想产物-ForkJoin分支合并框架原理剖析上篇》中,我们曾初步了解了ForkJoin分支合并框架的使用,也分析框架的成员构成以及任务提交和创建工作的原理实现,在本篇则会对框架的任务执行、任务扫描、线程挂起、结果合并以及任务窃取的源码实现进行分析。
|
6月前
|
存储 监控 Java
(十一)彻悟并发之JUC分治思想产物-ForkJoin分支合并框架原理剖析上篇
在上篇文章《深入理解并发之Java线程池、工作原理、复用原理及源码分析》中,曾详细谈到了Java的线程池框架。在其中也说到了JDK提供的四种原生线程池以及自定义线程池,而本文则再来详细谈谈JDK1.7中新推出的线程池:ForkJoinPool。
|
8月前
|
存储 安全 Java
Java线程池ThreadPoolExcutor源码解读详解08-阻塞队列之LinkedBlockingDeque
**摘要:** 本文分析了Java中的LinkedBlockingDeque,它是一个基于链表实现的双端阻塞队列,具有并发安全性。LinkedBlockingDeque可以作为有界队列使用,容量由构造函数指定,默认为Integer.MAX_VALUE。队列操作包括在头部和尾部的插入与删除,这些操作由锁和Condition来保证线程安全。例如,`linkFirst()`和`linkLast()`用于在队首和队尾插入元素,而`unlinkFirst()`和`unlinkLast()`则用于删除首尾元素。队列的插入和删除方法根据队列是否满或空,可能会阻塞或唤醒等待的线程,这些操作通过`notFul
329 5
|
8月前
|
并行计算 安全 Java
CyclicBarrier(循环屏障)源码解读与使用
CyclicBarrier(循环屏障)源码解读与使用
|
开发工具 git
一图弄懂Git rebase
两种合并分支的方式:merge,rebase With the rebase command, you can take all the changes that were committed on one branch and replay them on a different branch
145 0
一图弄懂Git rebase
|
安全 开发工具 git
如何理解git rebase?
如何理解git rebase?
如何理解git rebase?
|
存储 C语言
关于分支语句这件事
分类👏 C语言是一门程序化的语言,无非就三种结构:顺序结构,选择结构和循环结构,而语句是指由分号隔开的语句。C语言中体现为分支语句和循环语句。
关于分支语句这件事
|
算法 开发工具 git
|
存储 缓存 Java
90%的人(包括我)都以为会用ThreadPoolExecutor了,看了这10张图再说吧!
90%的人(包括我)都以为会用ThreadPoolExecutor了,看了这10张图再说吧!
208 0
90%的人(包括我)都以为会用ThreadPoolExecutor了,看了这10张图再说吧!