六、线程安全
为什么会出现线程安全问题?
线程不安全:
当多线程并发访问临界资源时,如果破坏原子操作,可能会造成数据不一致。
临界资源:共享资源(同一对象),一次仅允许一个线程使用,才可保证其正确性。
原子操作:不可分割的多步操作,被视作一个整体,其顺序和步骤不可打乱或缺省。
线程安全问题都是由全局变量及静态变量引起的。若每个线程中对全局变量、静态变量只有读操作,而无写操作,一般来说,这个全局变量是线程安全的;若有多个线程同时执行写操作,一般都需要考虑线程同步,否则的话就可能影响线程安全。
示例:
class TicketRunnable implements Runnable{ private int ticket=100; //每个窗口卖票的操作 //窗口 永远开启 @Override public void run() { while(true){//有票可以卖 //出票操作 if(ticket>0){ //使用sleep模拟一下出票时间 //模拟一下出票的时间 try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+"正在卖票:"+ticket--); } } } } public class ThreadSafe { public static void main(String[] args) throws Exception{ TicketRunnable t = new TicketRunnable(); Thread t1 = new Thread(t,"窗口1"); Thread t2 = new Thread(t,"窗口2"); Thread t3 = new Thread(t,"窗口3"); //3个窗口同时卖票 t1.start(); t2.start(); t3.start(); } }
为了保证每个线程都能正常执行原子操作,Java引入了线程同步机制。那么怎么去使用呢?有三种方式完成同步操作:
同步代码块。
同步方法。
锁机制。
同步代码块
语法:
synchronized(临界资源对象){ //对临界资源对象加锁 //代码(原子操作) }
同步锁:
对象的同步锁只是一个概念,可以想象为在对象上标记了一个锁.
锁对象 可以是任意类型。
多个线程对象 要使用同一把锁。
注意:在任何时候,最多允许一个线程拥有同步锁,谁拿到锁就进入代码块,其他的线程只能在外等着(BLOCKED)。
示例:
package com.qf.sync; class Ticket2 implements Runnable{ private int ticket=100; Object lock = new Object(); //每个窗口卖票的操作 //窗口 永远开启 @Override public void run() { while(true){//有票可以卖 synchronized(lock){//synchronized (this) {//this ---当前对象 if(ticket>0){ //出票操作 //使用sleep模拟一下出票时间 try { Thread.sleep(50); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+"正在卖票:"+ticket--); } } } } } public class TicketDemo2 { public static void main(String[] args) { Ticket2 ticket2 = new Ticket2(); Thread t1 = new Thread(ticket2,"窗口1"); Thread t2 = new Thread(ticket2,"窗口2"); Thread t3 = new Thread(ticket2,"窗口3"); //3个窗口同时卖票 t1.start(); t2.start(); t3.start(); } }
同步方法
同步方法 :使用synchronized修饰的方法,就叫做同步方法,保证A线程执行该方法的时候,其他线程只能在方法外等着。
语法:
synchronized 返回值类型 方法名称(形参列表){ //对当前对象(this)加锁 // 代码(原子操作) }
只有拥有对象互斥锁标记的线程,才能进入该对象加锁的同步方法中。
线程退出同步方法时,会释放相应的互斥锁标记。
如果方式是静态,锁是类名.class。
示例:
class Ticket3 implements Runnable{ private int ticket=100; //Object lock = new Object(); //每个窗口卖票的操作 //窗口 永远开启 @Override public void run() { while(true){//有票可以卖 sellTicket(); if(ticket<=0){ break; } } } /** * 锁对象,谁调用这个方法,就是谁 * 隐含锁对象,就是this * * 静态方法,隐含锁对象就是Ticket3.class */ public synchronized void sellTicket(){ if(ticket>0){ //出票操作 //使用sleep模拟一下出票时间 try { Thread.sleep(50); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+"正在卖票:"+ticket--); } } } public class TicketDemo3 { public static void main(String[] args) { Ticket3 ticket3 = new Ticket3(); Thread t1 = new Thread(ticket3,"窗口1"); Thread t2 = new Thread(ticket3,"窗口2"); Thread t3 = new Thread(ticket3,"窗口3"); //3个窗口同时卖票 t1.start(); t2.start(); t3.start(); } }
synchronized注意点
同步锁是谁?
对于非static方法,同步锁就是this。
对于static方法,我们使用当前方法所在类的字节码对象(类名.class)。
Lock
JDK5加入,与synchronized比较,显示定义,结构更灵活。
提供更多实用性方法,功能更强大、性能更优越。
常用方法:
方法名 描述
void lock() 获取锁,如锁被占用,则等待。
boolean tryLock() 尝试获取锁(成功返回true。失败返回false,不阻塞)。
void unlock() 释放锁。
ReentrantLock:
Lock接口的实现类,与synchronized一样具有互斥锁功能。
示例:
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class MyList { //创建锁 private Lock lock = new ReentrantLock(); private String[] str = {"A","B","","",""}; private int count = 2; public void add(String value){ //当没有锁的时候,会出现覆盖的情况 str[count] = value; try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } count++; System.out.println(Thread.currentThread().getName()+"添加了"+value); // lock.lock(); // try { // str[count] = value; // try { // Thread.sleep(100); // } catch (InterruptedException e) { // e.printStackTrace(); // } // count++; // System.out.println(Thread.currentThread().getName()+"添加了"+value); // }finally { // lock.unlock(); // } } public String[] getStr(){ return str; } }
测试:
public class TestMyList { public static void main(String[] args) throws InterruptedException { MyList myList = new MyList(); // Thread t1 =new Thread(new Runnable() { @Override public void run() { myList.add("hello"); } }); t1.start(); Thread t2 = new Thread(new Runnable() { @Override public void run() { myList.add("world"); } }); t2.start(); t1.join(); t2.join(); String[] str = myList.getStr(); for (String s : str) { System.out.println("s:"+s); } } }
七、线程通信
概述
多个线程在处理同一个资源,但是处理的动作(线程的任务)却不相同。
为什么要处理线程间通信
多个线程并发执行时, 在默认情况下CPU是随机切换线程的,当我们需要多个线程来共同完成一件任务,并且我们希望他们有规律的执行, 那么多线程之间需要一些协调通信,以此来帮我们达到多线程共同操作一份数据。
如何保证线程间通信有效利用资源
多个线程在处理同一个资源,并且任务不同时,需要线程通信来帮助解决线程之间对同一个变量的使用或操作。 就是多个线程在操作同一份数据时, 避免对同一共享变量的争夺。也就是我们需要通过一定的手段使各个线程能有效的利用资源。而这种手段即—— 等待唤醒机制。
等待唤醒机制
什么是等待唤醒机制
这是多个线程间的一种协作机制。谈到线程我们经常想到的是线程间的竞争(race),比如去争夺锁,但这并不是故事的全部,线程间也会有协作机制。就好比在公司里你和你的同事们,你们可能存在在晋升时的竞争,但更多时候你们更多是一起合作以完成某些任务。
就是在一个线程进行了规定操作后,就进入等待状态(wait()), 等待其他线程执行完他们的指定代码过后 再将其唤醒(notify());在有多个线程进行等待时, 如果需要,可以使用 notifyAll()来唤醒所有的等待线程。
wait/notify 就是线程间的一种协作机制。
线程通信方法
方法 说明
public final void wait() 释放锁,进入等待队列
public final void wait(long timeout) 在超过指定的时间前,释放锁,进入等待队列
public final void notify() 随机唤醒、通知一个线程
public final void notifyAll() 唤醒、通知所有线程
注意:所有的等待、通知方法必须在对加锁的同步代码块中。
等待唤醒机制就是用于解决线程间通信的问题的,使用到的3个方法的含义如下:
wait:线程不再活动,不再参与调度,进入 wait set(锁池) 中,因此不会浪费 CPU 资源,也不会去竞争锁了,这时的线程状态即是 WAITING。它还要等着别的线程执行一个特别的动作,也即是“通知(notify)”在这个对象上等待的线程从wait set 中释放出来,重新进入到调度队列(ready queue)中
wait(long m):wait方法如果在指定的毫秒之后,还没有被notify唤醒,就会自动醒来
sleep(long m):不会释放锁
notify:则选取所通知对象的 wait set 中的一个线程释放;例如,餐馆有空位置后,等候就餐最久的顾客最先入座。
notifyAll:则释放所通知对象的 wait set 上的全部线程。
示例:
/* 等待唤醒案例: 1,创建一个顾客线程(消费者):告知老板要的包子种类和数量,调用wait方法,放弃cpu的执行,进入等待状态 2,创建一个老板线程(生产者):花了5秒做包子,做好包子之后,调用notify方法,通知顾客吃包子, 注意: 顾客和老板线程必须使用同步代码块包裹起来,保证等待和唤醒只能有一个在执行 同步使用的锁对象必须是唯一的, 只有锁对象才能调用wait方法和notify方法 */ public class Demo1 { public static void main(String[] args) { //创建锁对象,保证唯一 Object obj =new Object(); //创建顾客线程 new Thread(){ @Override public void run() { while(true){ //保证等待和唤醒只能有一个在执行 synchronized (obj){ System.out.println("告知老板要的包子种类和数量"); //进入等待 try { obj.wait(); } catch (InterruptedException e) { e.printStackTrace(); } //唤醒之后执行的代码 System.out.println("拿到包子,开始吃。。。"); System.out.println("---------------------"); } } } }.start(); //创建老板线程 new Thread(){ @Override public void run() { while(true){ //花5秒钟做包子, try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } //保证等待和唤醒只能有一个在执行 synchronized (obj){ System.out.println("包子做好了。。。。"); //做好包子之后,调用notify方法,通知顾客吃包子, obj.notify(); } } } }.start(); } }
注意:
哪怕只通知了一个等待的线程,被通知线程也不能立即恢复执行,因为它当初中断的地方是在同步块内,而此刻它已经不持有锁,所以她需要再次尝试去获取锁(很可能面临其它线程的竞争),成功后才能在当初调用 wait 方法之后的地方恢复执行。
总结如下:
如果能获取锁,线程就从 WAITING 状态变成 RUNNABLE 状态;
否则,从 wait set 出来,又进入 entry set,线程就从 WAITING 状态又变成 BLOCKED 状态
调用wait和notify方法需要注意的细节
wait方法与notify方法必须要由同一个锁对象调用。因为:对应的锁对象可以通过notify唤醒使用同一个锁对象调用的wait方法后的线程。
wait方法与notify方法是属于Object类的方法的。因为:锁对象可以是任意对象,而任意对象的所属类都是继承了Object类的。
wait方法与notify方法必须要在同步代码块或者是同步函数中使用。因为:必须要通过锁对象调用这2个方法。
八、死锁
多个线程同时被阻塞,它们中的一个或者全部都在等待某个资源被释放。由于线程被无限期地阻塞,因此程序不可能正常终止。
如下图所示,线程 A 持有资源 2,线程 B 持有资源 1,他们同时都想申请对方的资源,所以这两个线程就会互相等待而进入死锁状态。
示例:
package com.qf.safe; public class DeadLockDemo { private static Object lock1 = new Object();//锁1,资源1 private static Object lock2 = new Object();//锁2,资源2 public static void main(String[] args) { //启动一个线程 new Thread(new Runnable() { @Override public void run() { synchronized(lock1){ System.out.println(Thread.currentThread().getName()+"拿到了锁1,资源1"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+"等待锁2,资源2"); synchronized (lock2){ System.out.println(Thread.currentThread().getName()+"拿到了锁2,资源2"); } } } },"线程1").start(); //产生死锁的线程 // new Thread(new Runnable() { // @Override // public void run() { // synchronized(lock2){ // System.out.println(Thread.currentThread().getName()+"拿到了锁2,资源2"); // try { // Thread.sleep(1000); // } catch (InterruptedException e) { // e.printStackTrace(); // } // System.out.println(Thread.currentThread().getName()+"等待锁1,资源1"); // synchronized (lock1){ // System.out.println(Thread.currentThread().getName()+"拿到了锁1,资源1"); // } // } // } // },"线程2").start(); } }
线程 A 通过 synchronized (resource1) 获得 resource1 的监视器锁,然后通过Thread.sleep(1000);让线程 A 休眠 1s 为的是让线程 B 得到执行然后获取到 resource2 的监视器锁。线程 A 和线程 B 休眠结束了都开始企图请求获取对方的资源,然后这两个线程就会陷入互相等待的状态,这也就产生了死锁。
破坏死锁
//破坏死锁 new Thread(new Runnable() { @Override public void run() { synchronized(lock1){ System.out.println(Thread.currentThread().getName()+"拿到了锁1,资源1"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+"等待锁2,资源2"); synchronized (lock2){ System.out.println(Thread.currentThread().getName()+"拿到了锁2,资源2"); } } } },"线程2").start();
线程 1 首先获得到 resource1 的监视器锁,这时候线程 2 就获取不到了。然后线程 1 再去获取 resource2 的监视器锁,可以获取到。然后线程 1 释放了对 resource1、resource2 的监视器锁的占用,线程 2 获取到就可以执行了。这样就破坏了破坏循环等待条件,因此避免了死锁。
九、线程池
概述
我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题:
如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间。
那么有没有一种办法使得线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务?
在Java中可以通过线程池来达到这样的效果。
**线程池:**其实就是一个容纳多个线程的容器,其中的线程可以反复使用,省去了频繁创建线程对象的操作,无需反复创建线程而消耗过多资源。
合理利用线程池能够带来三个好处:
降低资源消耗。减少了创建和销毁线程的次数,每个工作线程都可以被重复利用,可执行多个任务。
提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。
提高线程的可管理性。可以根据系统的承受能力,调整线程池中工作线线程的数目,防止因为消耗过多的内存,而把服务器累趴下(每个线程需要大约1MB内存,线程开的越多,消耗的内存也就越大,最后死机)。
线程池的使用
Java里面线程池的顶级接口是java.util.concurrent.Executor,但是严格意义上讲Executor并不是一个线程池,而只是一个执行线程的工具。真正的线程池接口是java.util.concurrent.ExecutorService。
要配置一个线程池是比较复杂的,尤其是对于线程池的原理不是很清楚的情况下,很有可能配置的线程池不是较优的,因此在java.util.concurrent.Executors线程工厂类里面提供了一些静态工厂,生成一些常用的线程池。官方建议使用Executors工程类来创建线程池对象。
Java类库提供了许多静态方法来创建一个线程池:
Executors类中创建线程池的方法如下:
a、newFixedThreadPool 创建一个固定长度的线程池,当到达线程最大数量时,线程池的规模将不再变化。
b、newCachedThreadPool 创建一个可缓存的线程池,如果当前线程池的规模超出了处理需求,将回收空的线程;当需求增加时,会增加线程数量;线程池规模无限制。
c、newSingleThreadPoolExecutor 创建一个单线程的Executor,确保任务对了,串行执行
d、newScheduledThreadPool 创建一个固定长度的线程池,而且以延迟或者定时的方式来执行,类似Timer;
使用线程池中线程对象的步骤:
创建线程池对象。
创建Runnable接口子类对象。(task)
提交Runnable接口子类对象。(take task)
获取到了一个线程池ExecutorService 对象,定义了一个使用线程池对象的方法如下:
public Future submit(Runnable task):获取线程池中的某一个线程对象,并执行
Future接口:用来记录线程任务执行完毕后产生的结果。线程池创建与使用。
关闭线程池(一般不做)。
示例:
package com.qf.threadpool; import java.util.Date; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; class MyThread implements Runnable{ @Override public void run() { System.out.println("我要一个教练"); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("教练来了:"+Thread.currentThread().getName()); System.out.println("教完后,教练回到了游泳池"); } } public class ThreadPoolDemo { public static void main(String[] args) { // //创建一个包含固定数量的线程池对象 // ExecutorService executorService = Executors.newFixedThreadPool(2); // //创建一个包含单条线程的线程池 // ExecutorService executorService = Executors.newSingleThreadExecutor(); // //创建一个带缓冲区的线程池,会根据需求创建线程 // ExecutorService executorService = Executors.newCachedThreadPool(); ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10); //创建Runnable实例对象 MyThread r = new MyThread(); //自己创建线程的方式 // Thread t = new Thread(r); // t.start(); // //从线程池中获取线程对象,然后调用MyThread的run方法 // executorService.submit(r); // //再获取一个线程对象, // executorService.submit(r); // executorService.submit(r); // //注意:submit方法调用后,程序并不终止,因为线程次控制了线程的关闭 // //使用完,又归还到了线程池中, // // //关闭线程池 // executorService.shutdown(); for (int i = 0; i < 10; i++) { scheduledExecutorService.schedule(r,10, TimeUnit.SECONDS);//延迟10秒执行 } scheduledExecutorService.shutdown();;//执行到此处并不会马上关闭连接池 // while(!scheduledExecutorService.isTerminated()){ // // } System.out.println("Main Thread finished at"+new Date()); } }
Callable接口
一般情况下,使用Runnable接口、Thread实现的线程我们都是无法返回结果的。但是如果对一些场合需要线程返回的结果。就要使用用Callable、Future这几个类。Callable只能在ExecutorService的线程池中跑,但有返回结果,也可以通过返回的Future对象查询执行状态。Future 本身也是一种设计模式,它是用来取得异步任务的结果
看看其源码:
public interface Callable { V call() throws Exception; } 1 2 3 它只有一个call方法,并且有一个返回V,是泛型。可以认为这里返回V就是线程返回的结果。 ExecutorService接口:线程池执行调度框架 Future submit(Callable task); Future submit(Runnable task, T result); Future submit(Runnable task); 1 2 3 示例: import java.util.Random; import java.util.concurrent.*; class HandleCallable implements Callable { private String name; public HandleCallable(String name) { this.name = name; } @Override public Integer call() throws Exception { System.out.println("task"+ name + "开始进行计算"); Thread.sleep(3000); int sum = new Random().nextInt(300); int result = 0; for (int i = 0; i < sum; i++) result += i; return result; } } public class FutureTest{ public static void main(String[] args) { System.out.println("main Thread begin at:"+ System.nanoTime()); //创建线程池对象 ExecutorService executor = Executors.newCachedThreadPool(); HandleCallable task1 = new HandleCallable("1"); HandleCallable task2 = new HandleCallable("2"); HandleCallable task3 = new HandleCallable("3"); //执行 Future result1 = executor.submit(task1); Future result2 = executor.submit(task2); Future result3 = executor.submit(task3); executor.shutdown(); try { Thread.sleep(1000); } catch (InterruptedException e1) { e1.printStackTrace(); } //获取到返回的直接 try { System.out.println("task1运行结果:"+result1.get()); System.out.println("task2运行结果:"+result2.get()); System.out.println("task3运行结果:"+result3.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } System.out.println("main Thread finish at:"+ System.nanoTime()); } }
十、线程安全集合
CopyOnWriteArrayList[重点]
线程安全的ArrayList,加强版读写分离。
写有锁,读无锁,读写之间不阻塞,优于读写锁。
写入时,先copy一个容器副本、再添加新元素,最后替换引用。
使用方式与ArrayList无异。
示例:
public class TestCopyOnWriteArrayList { public static void main(String[] args) { //1创建集合 CopyOnWriteArrayList list=new CopyOnWriteArrayList<>(); //2使用多线程操作 ExecutorService es=Executors.newFixedThreadPool(5); //3提交任务 for(int i=0;i<5;i++) { es.submit(new Runnable() { @Override public void run() { for(int j=0;j<10;j++) { list.add(Thread.currentThread().getName()+"...."+new Random().nextInt(1000)); } } }); } //4关闭线程池 es.shutdown(); while(!es.isTerminated()) {} //5打印结果 System.out.println("元素个数:"+list.size()); for (String string : list) { System.out.println(string); } } }
CopyOnWriteArrayList如何做到线程安全的
CopyOnWriteArrayList使用了一种叫写时复制的方法,当有新元素添加到CopyOnWriteArrayList时,先从原有的数组中拷贝一份出来,然后在新的数组做写操作,写完之后,再将原来的数组引用指向到新数组。
当有新元素加入的时候,如下图,创建新数组,并往新数组中加入一个新元素,这个时候,array这个引用仍然是指向原数组的。
当元素在新数组添加成功后,将array这个引用指向新数组。
CopyOnWriteArrayList的整个add操作都是在锁的保护下进行的。 这样做是为了避免在多线程并发add的时候,复制出多个副本出来,把数据搞乱了,导致最终的数组数据不是我们期望的。
CopyOnWriteArrayList的add操作的源代码如下:
public boolean add(E e) { //1、先加锁 final ReentrantLock lock = this.lock; lock.lock(); try { Object[] elements = getArray(); int len = elements.length; //2、拷贝数组 Object[] newElements = Arrays.copyOf(elements, len + 1); //3、将元素加入到新数组中 newElements[len] = e; //4、将array引用指向到新数组 setArray(newElements); return true; } finally { //5、解锁 lock.unlock(); } }
由于所有的写操作都是在新数组进行的,这个时候如果有线程并发的写,则通过锁来控制,如果有线程并发的读,则分几种情况:
1、如果写操作未完成,那么直接读取原数组的数据;
2、如果写操作完成,但是引用还未指向新数组,那么也是读取原数组数据;
3、如果写操作完成,并且引用已经指向了新的数组,那么直接从新数组中读取数据。
可见,CopyOnWriteArrayList的读操作是可以不用加锁的。
CopyOnWriteArraySet
示例:
public class TestCopyOnWriteArraySet { public static void main(String[] args) { //1创建集合 CopyOnWriteArraySet set=new CopyOnWriteArraySet<>(); //2添加元素 set.add("pingguo"); set.add("huawei"); set.add("xiaomi"); set.add("lianxiang"); set.add("pingguo"); //3打印 System.out.println("元素个数:"+set.size()); System.out.println(set.toString()); } }
ConcurrentHashMap[重点]
初始容量默认为16段(Segment),使用分段锁设计。
不对整个Map加锁,而是为每个Segment加锁。
当多个对象存入同一个Segment时,才需要互斥。
最理想状态为16个对象分别存入16个Segment,并行数量16。
使用方式与HashMap无异。
示例:
public class TestConcurrentHashMap { public static void main(String[] args) { //1创建集合 ConcurrentHashMap hashMap=new ConcurrentHashMap(); //2使用多线程添加数据 for(int i=0;i<5;i++) { new Thread(new Runnable() { @Override public void run() { for(int k=0;k<10;k++) { hashMap.put(Thread.currentThread().getName()+"--"+k, k+""); System.out.println(hashMap); } } }).start(); } } }