《深入探索Java并发编程&从锁到并发工具的深入解析》(下)

本文涉及的产品
云解析 DNS,旗舰版 1个月
全局流量管理 GTM,标准版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: 《深入探索Java并发编程&从锁到并发工具的深入解析》(下)

《深入探索Java并发编程&从锁到并发工具的深入解析》(上)+https://developer.aliyun.com/article/1625012

CyclickBarrier

其实CyclickBarrier 的本质就是一个加法计数器

一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 很有用。因为该 barrier 在释放等待线程后可以重用,所以称它为循环 的 barrier。

package org.example;
import java.util.concurrent.CyclicBarrier;
/**
 * @author linghu
 * @date 2023/12/20 10:32
 */
public class CyclickBarrierDemo {
    public static void main(String[] args) {
        /**
         * 集齐7颗龙珠召唤神龙
         */
        //召唤龙珠的线程
        CyclicBarrier cyclicBarrier = new CyclicBarrier(7,()->{
            System.out.println("召唤神龙~");
        });
        for (int i=1;i<=7;i++){
            int temp=i;
            new Thread(()->{
                System.out.println(Thread.currentThread().getName()+"搜集"+temp+"颗龙珠");
                try {
                    cyclicBarrier.await();//等待计数器到7,然后往下执行~
                } catch (Exception e) {
                    e.printStackTrace();
                }
            },String.valueOf(i)).start();
        }
    }
}

如上代码运行结果如下:

总结
  • cyclicBarrier.await() 是 Java 中的一个方法,用于让当前线程等待其他线程到达屏障(CyclicBarrier)时再继续执行。当所有线程都调用了 await() 方法后,屏障才会打开,允许所有线程继续执行。这个方法通常用在多线程编程中,以确保所有线程都达到某个同步点后再继续执行。

Semaphore

在操作系统中我们学过信号量,学过PV操作。其实这里的 Semaphore信号量就是用来做线程限流操作的。

我们看如下代码:

package org.example;
import sun.awt.windows.ThemeReader;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
/**
 * @author linghu
 * @date 2023/12/20 10:58
 */
public class SemaphoreDemo {
    public static void main(String[] args) {
        //线程数量:停车位!用来限流
        Semaphore semaphore = new Semaphore(3);
        for (int i=1;i<=6;i++){
            new Thread(()->{
                try {
                    semaphore.acquire();//得到
                    System.out.println(Thread.currentThread().getName()+"抢到车位");
                    TimeUnit.SECONDS.sleep(2);//让车多停一会
                    System.out.println(Thread.currentThread().getName()+"离开车位");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    semaphore.release();//释放
                }
            },String.valueOf(i)).start();
        }
    }
}

在上面代码中,我们对6个线程-车进行限流,只有3个车位,6个车去抢车位!

总结
  • semaphore.acquire();//得到,等待释放为止
  • semaphore.release();//释放信号量以后会唤醒等待中的线程

关于PV操作,其实可以看我这个视频课程:

《【信号量pv操作】信号量进程同步互斥问题》

读写锁

所谓的读写锁(Readers-Writer Lock),顾名思义就是将一个锁拆分为读锁和写锁两个锁。其中读锁允许多个线程同时获得,而写锁则是互斥锁,不允许多个线程同时获得写锁,并且写操作和读操作也是互斥的。

为什么要读写锁?

Synchronized 和 ReentrantLock 都是独占锁,即在同一时刻只有一个线程获取到锁。然而在有些业务场景中,我们大多在读取数据,很少写入数据,这种情况下,如果仍使用独占锁,效率将及其低下。针对这种情况,Java提供了读写锁——ReentrantReadWriteLock。

主要解决:对共享资源有读和写的操作,且写操作没有读操作那么频繁的场景。

案例

如下分别读写线程6次执行:

package org.example;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
 * @author linghu
 * @date 2023/12/20 11:50
 */
public class ReadWriteLockDemo {
    public static void main(String[] args) {
        MyCache2 myCache2 = new MyCache2();
        int num=6;
        for (int i=1;i<=6;i++){
            int finall=i;
            //6个线程开始写
            new Thread(()->{
                myCache2.write(String.valueOf(finall),String.valueOf(finall));
            },String.valueOf(i)).start();
            //6个线程开始读
            new Thread(()->{
                myCache2.read(String.valueOf(finall));
            },String.valueOf(i)).start();
        }
    }
}
class MyCache2{
    private volatile Map<String, String> map=new HashMap<>();
    private ReadWriteLock lock=new ReentrantReadWriteLock();
    public void write(String key,String value){
        lock.writeLock().lock();//写锁
        try {
            System.out.println(Thread.currentThread().getName()+"线程开始写入");
            map.put(key,value);
            System.out.println(Thread.currentThread().getName()+"线程写入OK");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.writeLock().unlock();//释放写锁
        }
    }
    public void read(String key){
        lock.readLock().lock();//读锁
        try {
            System.out.println(Thread.currentThread().getName()+"线程开始读取");
            map.get(key);
            System.out.println(Thread.currentThread().getName()+"线程读取OK");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.readLock().unlock();//释放读锁
        }
    }
}

运行效果:

总结

如果我们不用锁,多线程的读写会造成数据不可靠的问题。我们可以采用独占锁:Synchronized 和 ReentrantLock保证数据可靠。但是使用更细粒度的锁读写锁(Readers-Writer Lock)效率更高!

BlockQueue

BlockQueueCollection的一个子类,我们把它叫做:阻塞队列!整个队列的家族是下面这幅图这个样子的:

通过上图我们知道了我们重点要学的是BlockQueue

BlockingQueue主要提供了四类方法,如下表所示:

同步队列

同步队列没有容量,也可以视为容量为1的队列。

  • 进去一个元素必须等待取出来以后才能往里面放元素
  • put了一个元素,就必须从里面先take出来,否则不能再put进去值!

代码

如下通过一个代码案例进行说明:

package org.example;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.BlockingQueue;
/**
 * @author linghu
 * @date 2023/12/21 15:16
 */
public class SynchronousQueue {
    public static void main(String[] args) {
        BlockingQueue<String> synchronousQueue=
                new java.util.concurrent.SynchronousQueue<>();
        //往queue中添加元素
        new Thread(()->{
            try {
                System.out.println(Thread.currentThread().getName()+"put 01");
                synchronousQueue.put("1");
                System.out.println(Thread.currentThread().getName()+"put 02");
                synchronousQueue.put("2");
                System.out.println(Thread.currentThread().getName()+"put 03");
                synchronousQueue.put("3");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
        //取出元素
        new Thread(()->{
            try {
                System.out.println(Thread.currentThread().getName()+"take"+synchronousQueue.take());
                System.out.println(Thread.currentThread().getName()+"take"+synchronousQueue.take());
                System.out.println(Thread.currentThread().getName()+"take"+synchronousQueue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }
}

总结:上述代码中,三个线程负责往队列里put元素,但是只能等到对列的元素被take的时候才能往里面put!

执行结果如下:

上图得知总结:01,02在put以后只有take以后才能put03!说明了队列的同步性!

线程池

线程池用到了池化技术,其实在编程中,很多地方都有池化技术的思想,比如数据库连接池,HttpClient 连接池等。池化技术的出现主要是为了解决:资源的利用问题!提高效率,对资源进行复用!

这里我们用了线程池,目的就是 复用线程!。线程复用才可以控制最大并发数,管理线程!

线程池学习原则总结为:

  • 三大方式
  • 七大参数
  • 四种拒绝策略

线程池的三大方法

  • Executors.newSingleThreadExecutor()单个线程的创建
  • Executors.newFixedThreadPool(5)创建一个固定大小的线程池
  • Executors.newCachedThreadPool()可伸缩的线程池创建
package org.example;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * @author linghu
 * @date 2023/12/21 16:35
 * Executors工具类,3大方法
 */
public class Demo {
    public static void main(String[] args) {
        //单个线程池,线程
        ExecutorService threadPool = Executors.newSingleThreadExecutor();
        //固定的线程池大小
//        ExecutorService threadPool = Executors.newFixedThreadPool(5);
        //可伸缩的线程池大小
//        ExecutorService threadPool = Executors.newCachedThreadPool();
        try {
            for (int i=0;i<10;i++){
                //使用线程池创建线程,以前用new Thread()来创建
                threadPool.execute(()->{
                    System.out.println(Thread.currentThread().getName()+"OK");
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //线程池用完,程序结束,关闭线程池
            threadPool.shutdown();
        }
    }
}

运行结果:

七大参数

这里说到七大参数是指我们在自定义线程池的时候用到的七大参数!

七大参数如下:

  • 核心线程数(corePoolSize):2
  • 最大线程数(maximumPoolSize):5
  • 空闲线程存活时间(keepAliveTime):3秒
  • 时间单位(unit):TimeUnit.SECONDS
  • 任务队列(workQueue):使用LinkedBlockingDeque作为任务队列,容量为3
  • 线程工厂(threadFactory):使用默认的线程工厂(Executors.defaultThreadFactory())
  • 拒绝策略(rejectedExecutionHandler):使用AbortPolicy策略,当任务队列已满时,新提交的任务会抛出RejectedExecutionException异常。

接下来我们通过一个银行等待业务的例子讲清楚上面的七大参数。

如上图所示,空闲线程存活时间(keepAliveTime)表示在候客区4、5窗口没人的情况下,3秒以后,这个窗口就会被释放掉,不能让它一直空闲下去,这样非常浪费资源!

接下来就是手动创建一个线程池,代码如下:

//自定义线程池
        ExecutorService threadPool = new ThreadPoolExecutor(
                2,//核心线程数(corePoolSize):2
                5,//最大线程数(maximumPoolSize):5
                3,//空闲线程存活时间(keepAliveTime):3秒
                TimeUnit.SECONDS,//时间单位(unit):TimeUnit.SECONDS
                new LinkedBlockingDeque<>(3),//任务队列(workQueue):使用LinkedBlockingDeque作为任务队列,容量为3
                Executors.defaultThreadFactory(),//线程工厂(threadFactory):使用默认的线程工厂(Executors.defaultThreadFactory())
                new ThreadPoolExecutor.AbortPolicy()//拒绝策略(rejectedExecutionHandler):使用AbortPolicy策略,当任务队列已满时,新提交的任务会抛出RejectedExecutionException异常。
        );

在上述代码的拒绝策略中,我们需要知道拒绝策略有四种:

  • new ThreadPoolExecutor.AbortPolicy();//银行满了,还有人进来,不处理这个人,抛出异常
  • new ThreadPoolExecutor.CallerRunsPolicy();//哪里来的去哪里
  • new ThreadPoolExecutor.DiscardPolicy();//队列满了,丢掉任务,不会抛出异常
  • new ThreadPoolExecutor.DiscardOldestPolicy();//队列满了,尝试去和最早的竞争,也不会抛出异常
package org.example;
import java.util.concurrent.*;
/**
 * @author linghu
 * @date 2023/12/22 10:36
 * 拒绝策略:
 *          new ThreadPoolExecutor.AbortPolicy();//银行满了,还有人进来,不处理这个人,抛出异常
 *         new ThreadPoolExecutor.CallerRunsPolicy();//哪里来的去哪里
 *         new ThreadPoolExecutor.DiscardPolicy();//队列满了,丢掉任务,不会抛出异常
 *         new ThreadPoolExecutor.DiscardOldestPolicy();//队列满了,尝试去和最早的竞争,也不会抛出异常
 */
public class Demo02 {
    public static void main(String[] args) {
        //自定义线程池
        ExecutorService threadPool = new ThreadPoolExecutor(
                2,
                5,
                3,
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(3),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy()
        );
        try {
            for (int i=1;i<=8;i++){
                //使用自己定义的线程池创建了线程
                threadPool.execute(()->{
                    System.out.println(Thread.currentThread().getName()+" OK");
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //关闭线程池
            threadPool.shutdown();
        }
    }
}

我们在如上代码中,创建了线程池,通过线程池创建线程,通过循环创建了8个任务,每个任务打印当前线程的名称和"OK"。这些任务被提交到自定义的线程池中执行。最后,在finally块中关闭线程池。

从上述运行结果来看,线程池中的核心线程就是1和3。最大线程数为5,空闲线程存活时间为3秒,任务队列容量为3,使用默认的线程工厂创建线程,拒绝策略为AbortPolicy

如何设置线程池的大小?

设置线程池的小主要是做调优的工作!

//自定义线程池
        ExecutorService threadPool = new ThreadPoolExecutor(
                2,//核心线程数(corePoolSize):2
                5,//最大线程数(maximumPoolSize):5
                3,//空闲线程存活时间(keepAliveTime):3秒
                TimeUnit.SECONDS,//时间单位(unit):TimeUnit.SECONDS
                new LinkedBlockingDeque<>(3),//任务队列(workQueue):使用LinkedBlockingDeque作为任务队列,容量为3
                Executors.defaultThreadFactory(),//线程工厂(threadFactory):使用默认的线程工厂(Executors.defaultThreadFactory())
                new ThreadPoolExecutor.AbortPolicy()//拒绝策略(rejectedExecutionHandler):使用AbortPolicy策略,当任务队列已满时,新提交的任务会抛出RejectedExecutionException异常。
        );

上述代码中,我们需要知道最大线程数(maximumPoolSize)应该怎么设置?

设置原则为:

  • CPU密集型:电脑的核数是几核,这里就写多少
  • I/O密集型:判断程序中消耗IO线程的数量,然后乘以1倍到2倍即可!
CPU密集型

电脑核数的查看方式有两种:

  • 任务管理器
  • Runtime.getRuntime().availableProcessors()
任务管理器

任务管理器CPU的查看如下:

Runtime.getRuntime().availableProcessors()

其实这个方法的调用如下:

public class Main {
    public static void main(String[] args) {
        int coreCount = Runtime.getRuntime().availableProcessors();
        System.out.println("电脑的核数为: " + coreCount);
    }
}
I/O密集型

这里我们需要知道我们的IO任务是多少,然后大于这个任务1倍~2倍即可。

四大函数接口

顾明思议:函数接口就是只有一个方法的接口,如下面这个接口:

总结:这种函数式接口用的非常多,特别是在很多框架中。

四大函数接口是Java中用于处理不同类型数据的方法,它们分别是:

  1. Consumer:接收一个输入参数并对其执行某种操作,但不返回任何结果。
  2. Function:接收一个输入参数并返回一个结果。
  3. Predicate:接收一个输入参数并返回一个布尔值,表示该参数是否满足某个条件。
  4. Supplier:不接收任何参数,但返回一个结果。
Function函数型接口

Function函数型接口就是有一个输入参数,有一个输出参数。

import java.util.function.Function;
/**
 * @author linghu
 * @date ${DATE} ${TIME}
 */
public class Main {
    public static void main(String[] args) {
        //输出输入的值
//        Function function=new Function<String,String>(){
//            @Override
//            public String apply(String str) {
//                return str;
//            }
//        };
        //只要是函数型接口,就可以用lambda表达式简化
        Function function1=(str)->{
          return str;
        };
        System.out.println(function1.apply("123"));
    }
}
predicate断定型接口

predicate断定型接口就是有一个输入参数,返回值只能是布尔值。

import java.util.function.Predicate;
/**
 * @author linghu
 * @date 2023/12/22 17:01
 */
public class Demo02 {
    public static void main(String[] args) {
//        Predicate<String> predicate = new Predicate<>() {
//            @Override
//            public boolean test(String s) {
//                return s.isEmpty();
//            }
//        };
        Predicate<String> predicate2 =(str)->{
            return str.isEmpty();
        };
        System.out.println(predicate2.test("addd"));
    }
}
Suppier供给型接口

Suppier供给型接口是没有参数,只有返回值

import java.util.function.Supplier;
/**
 * @author linghu
 * @date 2023/12/22 17:11
 */
public class Demo03 {
    public static void main(String[] args) {
        Supplier supplier = new Supplier<>() {
            @Override
            public Integer get() {
                return 1024;
            }
        };
        Supplier supplier2 =()->{
            return 1024;
        };
        System.out.println(supplier2.get());
    }
}
Consumer消费性接口

Consumer消费性接口只有输入,没有返回值。

import java.util.function.Consumer;
/**
 * @author linghu
 * @date 2023/12/22 17:15
 */
public class Demo04 {
    public static void main(String[] args) {
        Consumer<String> consumer = new Consumer<>() {
            @Override
            public void accept(String s) {
                System.out.println(s);
            }
        };
        Consumer<String> consumer2 =(str)->{
            System.out.println(str);
        };
        consumer2.accept("hi");
    }
}

Stream流式计算

Stream 就好像一个高级的迭代器,但只能遍历一次,就好像一江春水向东流;在流的过程中,对流中的元素执行一些操作,比如“过滤掉长度大于 10 的字符串”、“获取每个字符串的首字母”等。

流的操作可以分为两种类型:

1)中间操作,可以有多个,每次返回一个新的流,可进行链式操作。

2)终端操作,只能有一个,每次执行完,这个流也就用光光了,无法执行下一个操作,因此只能放在最后。

@NoArgsConstructor
public class User {
    private int id;
    private String name;
    private int age;
    public User(int i, String a, int i1) {
    }
    public int getId() {
        return id;
    }
    public void setId(int id) {
        this.id = id;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public int getAge() {
        return age;
    }
    public void setAge(int age) {
        this.age = age;
    }
}
package org.example;
import java.util.Arrays;
import java.util.List;
/**
 * 题目要求:
 * 1、ID必须是偶数
 * 2、年龄必须大于23岁
 * 3、用户名转为大写字母
 * 4、用户名字母倒着排序
 * 5、只输出一个用户名
 */
public class Test {
    public static void main(String[] args) {
        User u1 = new User(1, "a", 21);
        User u2 = new User(2, "b", 22);
        User u3 = new User(3, "c", 23);
        User u4 = new User(4, "d", 24);
        User u5 = new User(5, "e", 25);
        //集合是存储
        List<User> list= Arrays.asList(u1,u2,u3,u4,u5);
        //计算交给Stream流
        list.stream()//下面相当于写条件
                .filter(u->{
                    return u.getId()%2==0;
                })
                .filter(u->{
                    return u.getAge()>23;
                })
                .map(u->{
                    return u.getName().toUpperCase();
                })
                .sorted((o1,o2)->{
                    return o2.compareTo(o1);
                })
                .limit(1)
                .forEach(System.out::println);
    }
}

ForkJoin

在JDK中,提供了这样一种功能:它能够将复杂的逻辑拆分成一个个简单的逻辑来并行执行,待每个并行执行的逻辑执行完成后,再将各个结果进行汇总,得出最终的结果数据。有点像Hadoop中的MapReduce。

ForkJoin是由JDK1.7之后提供的多线程并发处理框架。ForkJoin框架的基本思想是分而治之。什么是分而治之?分而治之就是将一个复杂的计算,按照设定的阈值分解成多个计算,然后将各个计算结果进行汇总。相应的,ForkJoin将复杂的计算当做一个任务,而分解的多个计算则是当做一个个子任务来并行执行。

package org.example;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.LongStream;
/**
 * @author linghu
 * @date 2023/12/25 11:18
 */
public class ForkJoinTest {
    private static final long SUM=20_0000_0000;
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        test01();
        test02();
        test03();
    }
    //使用普通方法
    public static void test01(){
        long start=System.currentTimeMillis();
        long sum=0L;
        for (int i=1;i<SUM;i++){
            sum+=i;
        }
        long end=System.currentTimeMillis();
        System.out.println(sum);
        System.out.println("时间:"+(end-start));
        System.out.println("============================");
    }
    //使用ForkJoin方法
    public static void test02() throws ExecutionException, InterruptedException {
        long start=System.currentTimeMillis();
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        ForkJoinTask<Long> task = new ForkJoinDemo(0L, SUM);
        ForkJoinTask<Long> submit = forkJoinPool.submit(task);
        Long aLong = submit.get();
        System.out.println(aLong);
        Long end=System.currentTimeMillis();
        System.out.println("时间:"+(end-start));
        System.out.println("==========================");
    }
    //使用Stream流计算
    public static void test03(){
        long start=System.currentTimeMillis();
        long sum= LongStream.range(0L,2000000000L).parallel().reduce(0,Long::sum);
        System.out.println(sum);
        long end=System.currentTimeMillis();
        System.out.println("时间:"+(end-start));
        System.out.println("==========================");
    }
}
package org.example;
import java.util.Locale;
import java.util.concurrent.RecursiveTask;
/**
 * @author linghu
 * @date 2023/12/25 10:59
 */
public class ForkJoinDemo extends RecursiveTask<Long> {
    private long star;
    private long end;
    //临界值
    private long temp=100000L;
    public ForkJoinDemo(long star, long end) {
        this.star = star;
        this.end = end;
    }
    //计算方法
    @Override
    protected Long compute() {
        if ((end-star)<temp){
            Long sum=0L;
            for (Long i=star;i<end;i++){
                sum+=i;
            }
            return sum;
        }else {
            //使用ForkJoin分而治之计算
            //1、计算平均值
            long middle=(star+end)/2;
            ForkJoinDemo forkJoinDemo1 = new ForkJoinDemo(star, middle);
            //拆分任务,把线程压入线程队列
            forkJoinDemo1.fork();
            ForkJoinDemo forkJoinDemo2 = new ForkJoinDemo(middle,end);
            forkJoinDemo2.fork();
            Long taskSum=forkJoinDemo1.join()+forkJoinDemo2.join();
            return taskSum;
        }
    }
}

ForkJoin特点:工作窃取

工作窃取

实现原理是:双端队列!

异步回调

Java中异步回调执行和前端的 Ajax其实是一样的。但是在Java中的话用的是 CompletableFuture

回调情况分为:

  • 没有返回值的runAsync异步回调
  • 有返回值的异步回调supplyAsync

没有返回值的runAsync异步回调

对于异步回调,其实就是三个过程:

  • 异步执行
  • 成功回调
  • 失败回调

代码如下:

package org.example;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
/**
 * @author linghu
 * @date 2023/12/25 16:11
 * desc: 异步调用:CompleteableFuture
 * //异步执行
 * //成功回调
 * //失败回调
 */
public class Demo01 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //发起一个请求void
        //没有返回值runAsync异步回调
        CompletableFuture<Void> completableFuture=CompletableFuture.runAsync(()->{
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName()+"runAsync=>void");
        });
        System.out.println("1111111111111");
        completableFuture.get();//获取执行结果
    }
}

总结:上面会先发起一个请求,请求休眠,同时打印语句,打印完毕以后回调get这个执行结果

有返回值的异步回调supplyAsync

whenComplete((t, u)有两个参数,一个是t,一个是u:

  • T是正常返回的结果
  • U是抛出异常的错误信息

如果发生了异常,get可以获取 exceptionally((e)返回的错误信息。

package org.example;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
/**
 * @author linghu
 * @date 2023/12/25 16:11
 * desc: 异步调用:CompleteableFuture
 * //异步执行
 * //成功回调
 * //失败回调
 */
public class Demo01 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "supplyAsync=>Integer");
             int i=10/0;
            return 1024;
        });
        System.out.println(completableFuture.whenComplete((t, u) -> {
            System.out.println("t=>" + t);
            System.out.println("u=>" + u);
        }).exceptionally((e) -> {
            System.out.println(e.getMessage());
            return 233;//获取错误的返回结果
        }).get());
    }
}

如上返回的结果:

JMM

JMM是一个概念,不是真实存在的,用来描述Java内存模型。

JMM是一种规范,目的是解决由于多线程通过共享内存进行通信时,存在的本地内存数据不一致、编译器会对代码指令重排序、处理器会对代码乱序执行等带来的问题。

关于JMM的一些同步约定

通过上图总结:

  • 线程解锁前,必须把共享变量立刻撤回主存
  • 线程加锁前,必须读取主存中的最新值到工作内存
  • 加锁和解锁是同一把锁
  • 线程中分为 工作内存主内存

现在如果有两个线程都在对主存进行操作:

通过上图发现,当我们的线程B修改了主存值,并且读取到了工作内存,这个时候线程A不知道,怎么办呢?

于是引入了 volatile

volatile

定义

  • volatile是Java提供的一种轻量级的同步机制
  • Java 语言包含两种内在的同步机制:同步块(或方法)和 volatile 变量。相比于synchronizedsynchronized通常称为重量级锁),volatile更轻量级,因为它不会引起线程上下文的切换和调度。但是volatile 变量的同步性较差(有时它更简单并且开销更低),而且其使用也更容易出错。

1)、可见性

当多个线程访问同一个变量时,一个线程修改了这个变量的值,另外一个线程是可以看到修改的值。

package org.example;
import java.util.concurrent.TimeUnit;
/**
 * @author linghu
 * @date 2023/12/26 16:12
 */
public class JMMDemo01 {
    //如果不加volatile程序会死循环
    //加了volatile是可以保证可见性的
//    private static Integer number=0;
    private volatile static Integer number=0;
    public static void main(String[] args) {
        //main线程
        //子线程1
        new Thread(()->{
            while (number==0){
            }
        }).start();
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //子线程2
        new Thread(()->{
            while (number==0){
            }
        }).start();
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        number=1;
        System.out.println(number);
    }
}

注:synchronized和lock都能保证可见性。

2)、不保证原子性

线程A在执行任务的时候,不能被打扰的,也不能被分割的,要么同时成功,要么同时失败。

package org.example;
/**
 * @author linghu
 * @date 2023/12/26 16:44
 */
public class VDemo02 {
    private static volatile int number=0;
    public synchronized static void add(){
        number++;
    }
    public static void main(String[] args) {
        for (int i=1;i<=20;i++){
            new Thread(()->{
                for (int j=1;j<=1000;j++){
                    add();
                }
            }).start();
        }
        while (Thread.activeCount()>2){
            Thread.yield();
        }
        System.out.println(Thread.currentThread().getName()+",num="+number);
    }
}

总结:模拟多线程环境下对共享变量的操作,并展示线程之间的竞争和协作。

使用Thread.yield()方法让出CPU资源给其他线程

在这个地方,如果不加lock和synchronized,怎么保证原子性?

理论上,num的值应该为:2万。

package org.example;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * @author linghu
 * @date 2023/12/26 16:44
 */
public class VDemo02 {
//    private static volatile AtomicInteger number=new AtomicInteger();
    private static volatile int number=0;
    public static void add(){
        number++;
//        number.incrementAndGet();//底层是CAS保证的原子性,加1操作
    }
    public static void main(String[] args) {
        for (int i=1;i<=20;i++){
            new Thread(()->{
                for (int j=1;j<=1000;j++){
                    add();
                }
            }).start();
        }
        while (Thread.activeCount()>2){
            Thread.yield();
        }
        System.out.println(Thread.currentThread().getName()+",num="+number);
    }
}

解决方案

可以用原子类解决问题,比锁的效率高很多!

package org.example;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * @author linghu
 * @date 2023/12/26 16:44
 */
public class VDemo02 {
    private static volatile AtomicInteger number=new AtomicInteger();
//    private static volatile int number=0;
    public static void add(){
//        number++;
        number.incrementAndGet();//底层是CAS保证的原子性,加1操作
    }
    public static void main(String[] args) {
        for (int i=1;i<=20;i++){
            new Thread(()->{
                for (int j=1;j<=1000;j++){
                    add();
                }
            }).start();
        }
        while (Thread.activeCount()>2){
            Thread.yield();
        }
        System.out.println(Thread.currentThread().getName()+",num="+number);
    }
}

3)、禁止指令重排

计算机并不是按照我们自己写的那样去执行的。

指令重排一般分为以下三种

  • 编译器优化 重新安排语句的执行顺序
  • 指令并行重排 利用指令级并行技术将多个指令并行执行,如果指令之前没有数据依赖,处理器可以改变对应机器指令的执行顺序
  • 内存系统重排 由于处理使用缓存和读写缓冲区,所以它们是乱序的
package org.example;
import org.openjdk.jol.info.ClassLayout;
/**
 * @author linghu
 * @date 2023/12/27 14:25
 */
public class CountObjectSize {
    int a=10;
    int b=20;
    double c=30.0;
    public static void main(String[] args) {
        CountObjectSize object = new CountObjectSize();
        System.out.println(ClassLayout.parseInstance(object).toPrintable());
    }
}

引入的pom:

<!--查看对象头工具-->
        <!-- https://mvnrepository.com/artifact/org.openjdk.jol/jol-core -->
        <dependency>
            <groupId>org.openjdk.jol</groupId>
            <artifactId>jol-core</artifactId>
            <version>0.16</version>
        </dependency>

总结:通过上面的代码,我们希望代码执行的顺序是:a-b-c;但实际的编译情况是,执行顺序是:a-c-b。

方案

如何禁止指令重排呢?

可以用 Volatile 关键字实现!Volatile中会加一道内存的屏障,这个内存屏障可以保证在这个屏障中的指令顺序。

内存屏障:CPU指令

总结
  • volatile可以保证可见性
  • 不能保证原子性
  • 由于内存屏障,可以保证避免指令重排的现象产生

单例模式

单例模式的文章我之前写过,文章如下:

1)、饿汉式

如下饿汉式单例代码:

package org.example;
/**
 * @author linghu
 * @date 2023/12/28 9:41
 * 饿汉式单例
 */
public class Hungry {
    private byte[] data1=new byte[1024*1024];
    private byte[] data2=new byte[1024*1024];
    private byte[] data3=new byte[1024*1024];
    private byte[] data4=new byte[1024*1024];
    //私有化构造器
    public Hungry() {
    }
    private final static Hungry HUNGRY=new Hungry();
    public static Hungry getInstance(){
        return HUNGRY;
    }
}

2)、DCL懒汉式

package org.example;
/**
 * @author linghu
 * @date 2023/12/27 17:23
 * 懒汉式
 */
public class LazyMan {
    //私有构造器
    public LazyMan() {
        System.out.println(Thread.currentThread().getName()+"OK");
    }
    private static LazyMan lazyMan;
    public static LazyMan getInstance(){
        if (lazyMan==null){
            lazyMan=new LazyMan();
        }
        return lazyMan;
    }
    //多线程合并会有隐患!
    public static void main(String[] args) {
        for (int i=0;i<10;i++){
            new Thread(()->{
                lazyMan.getInstance();
            }).start();
        }
    }
}
package org.example;
/**
 * @author linghu
 * @date 2023/12/27 17:23
 * 懒汉式
 */
public class LazyMan {
    //私有构造器
    public LazyMan() {
        System.out.println(Thread.currentThread().getName()+"OK");
    }
    private static LazyMan lazyMan;
    //双重检测锁模式的 懒汉式单例 DCL懒汉式
    public static LazyMan getInstance(){
        if (lazyMan==null){
            synchronized (LazyMan.class){
                if (lazyMan==null){
                    lazyMan=new LazyMan();
                }
            }
        }
        return lazyMan;
    }
    //多线程合并会有隐患!
    public static void main(String[] args) {
        for (int i=0;i<10;i++){
            new Thread(()->{
                lazyMan.getInstance();
            }).start();
        }
    }
}

加volatile可以防止指令重排

private volatile static LazyMan lazyMan;

package org.example;
/**
 * @author linghu
 * @date 2023/12/27 17:23
 * 懒汉式
 */
public class LazyMan {
    //私有构造器
    public LazyMan() {
        System.out.println(Thread.currentThread().getName()+"OK");
    }
    private volatile static LazyMan lazyMan;
    //双重检测锁模式的 懒汉式单例 DCL懒汉式
    public static LazyMan getInstance(){
        if (lazyMan==null){
            synchronized (LazyMan.class){
                if (lazyMan==null){
                    lazyMan=new LazyMan();
                }
            }
        }
        return lazyMan;
    }
    //多线程合并会有隐患!
    public static void main(String[] args) {
        for (int i=0;i<10;i++){
            new Thread(()->{
                lazyMan.getInstance();
            }).start();
        }
    }
}

深入理解CAS

CAS就是比较以前工作内存中的值 和 主存内存中的值,如果这个值是期望的,那么执行操作! 如果不是,就一直循环,使用的是 自旋锁。

package org.example;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * @author linghu
 * @date 2023/12/29 9:54
 */
public class casDemo {
    //CAS:compareAndSet
    public static void main(String[] args) {
        AtomicInteger atomicInteger = new AtomicInteger(2020);
        //如果实际值 和 期望值相同,那么就更新
        //如果实际值 和 期望值不同,那么就不更新
        System.out.println(atomicInteger.compareAndSet(2020, 2021));
        System.out.println(atomicInteger.get());
        //因为期望值是2020,实际值却变成了2021 所以会修改失败!
        atomicInteger.getAndIncrement();//++操作
        System.out.println(atomicInteger.compareAndSet(2020, 2021));
        System.out.println(atomicInteger.get());
    }
}

总结:

  • CAS 是CPU的并发原语。

这段代码演示了如何使用原子操作来安全地更新一个整数值,并展示了compareAndSet方法的工作原理。

缺点:

  • 循环会耗时
  • 一次性只能保证一个共享变量的原子性
  • 它会存在ABA问题【解决方案:原子引用】
目录
相关文章
|
1天前
|
安全 Java
《深入探索Java并发编程&从锁到并发工具的深入解析》(上)
《深入探索Java并发编程&从锁到并发工具的深入解析》(上)
6 0
|
1月前
|
监控 Java 开发者
【并发编程的终极简化】JDK 22结构化并发:让并发编程变得像写代码一样简单!
【9月更文挑战第8天】随着JDK 22的发布,结构化并发为Java编程带来了全新的并发编程体验。它不仅简化了并发编程的复杂性,提高了程序的可靠性和可观察性,还为开发者们提供了更加高效、简单的并发编程方式。我们相信,在未来的发展中,结构化并发将成为Java并发编程的主流方式之一,推动Java编程语言的进一步发展。让我们共同期待Java在并发编程领域的更多创新和突破!
|
1月前
|
存储 监控 算法
深入探究Java线程池:提升并发性能的利器
在当今高度并发的应用开发中,Java线程池作为一种广泛应用的并发编程技术,提供了一种优雅且高效的线程管理方案。本文深入探究Java线程池的相关技术,涵盖其核心概念、优势、常见类型(如FixedThreadPool、CachedThreadPool、SingleThreadExecutor、ScheduledThreadPool、ForkJoinPool及WorkStealingPool)、核心参数配置、异常处理与监控方法,以及性能调优的最佳实践,帮助读者更好地理解和应用线程池,从而提升并发性能。
|
3月前
|
安全 Java 开发者
Java并发编程:理解并发安全与性能优化
在当今软件开发中,Java作为一种广泛使用的编程语言,其并发编程能力显得尤为重要。本文深入探讨了Java中的并发编程,包括如何确保并发安全性以及优化并发程序的性能。通过分析常见的并发问题和解决方案,读者将能够更好地理解如何利用Java的并发工具包来构建可靠和高效的多线程应用程序。 【7月更文挑战第10天】
48 3
|
3月前
|
安全 Java 调度
Java面试题:Java内存优化、多线程安全与并发框架实战,如何在Java应用中实现内存优化?在多线程环境下,如何保证数据的线程安全?使用Java并发工具包中的哪些工具可以帮助解决并发问题?
Java面试题:Java内存优化、多线程安全与并发框架实战,如何在Java应用中实现内存优化?在多线程环境下,如何保证数据的线程安全?使用Java并发工具包中的哪些工具可以帮助解决并发问题?
55 0
|
3月前
|
监控 Java
Java面试题:Java内存、多线程与并发工具包的深度探索,Java内存管理策略及其优化技巧,Java多线程并发控制的工具类与机制,Java并发工具包在实际项目中的应用
Java面试题:Java内存、多线程与并发工具包的深度探索,Java内存管理策略及其优化技巧,Java多线程并发控制的工具类与机制,Java并发工具包在实际项目中的应用
31 0
|
3月前
|
存储 算法 安全
Java面试题:给定一个可能产生内存泄漏的场景,如何诊断并解决?实现一个生产者-消费者模型,使用适当的同步机制与并发工具类,Java并发工具包与框架:性能与调优
Java面试题:给定一个可能产生内存泄漏的场景,如何诊断并解决?实现一个生产者-消费者模型,使用适当的同步机制与并发工具类,Java并发工具包与框架:性能与调优
28 0
|
3月前
|
存储 安全 Java
Java面试题:Java内存管理、多线程与并发框架:一道综合性面试题的深度解析,描述Java内存模型,并解释如何在应用中优化内存使用,阐述Java多线程的创建和管理方式,并讨论线程安全问题
Java面试题:Java内存管理、多线程与并发框架:一道综合性面试题的深度解析,描述Java内存模型,并解释如何在应用中优化内存使用,阐述Java多线程的创建和管理方式,并讨论线程安全问题
35 0
|
3月前
|
Java 测试技术 容器
多线程编程基础与并发问题解决方案
多线程编程基础与并发问题解决方案
|
5月前
|
安全 Go 对象存储
C++多线程编程:并发与同步的实战应用
本文介绍了C++中的多线程编程,包括基础知识和实战应用。C++借助`&lt;thread&gt;`库支持多线程,通过`std::thread`创建线程执行任务。文章探讨了并发与同步的概念,如互斥锁(Mutex)用于保护共享资源,条件变量(Condition Variable)协调线程等待与通知,以及原子操作(Atomic Operations)保证线程安全。实战部分展示了如何使用多线程进行并发计算,利用`std::async`实现异步任务并获取结果。多线程编程能提高效率,但也需注意数据竞争和同步问题,以确保程序的正确性。