我看JAVA 之 Thread & ThreadLocal
注:基于jdk11
Thread
Thread是程序里执行的一个线程。JVM允许一个应用程序中可以有多个线程并发执行。
每一个线程都有一个优先级,高优先级的线程优于低优先级的线程先执行。同时,线程还可以被标记为守护线程。线程在被创建的时候优先级默认等同于创建者的优先级。
创建一个Thread通常有如下几种方式:
-
继承Thread类,重写run()方法
public class ConcreteThread extends Thread(){ public void run() { .... } } new ConcreteThread().start()
-
实现Runnable接口,重写run()方法
public class ConcreteThread implements Runnable(){ public void run() { .... } } new ConcreteThread().start()
-
匿名类方式
new Thread(new Runnable() { public void run() { .... } }).start()
实现了如下接口
- Runnable 被FunctionalInterface注解的接口,定义了public abstract void run()方法供子类去实现。
几个重要的成员变量
- private volatile String name; 被volatile修饰的name,每个线程必须有一个唯一的名字,方便调试,一般为 Thread-nextThreadNum()
- private boolean daemon = false; 是否守护进程,默认为否
- private boolean stillborn = false;
- private long eetop;
- private Runnable target; 执行目标
- private ThreadGroup group; 线程组,默认为ecurity.getThreadGroup() 或 父线程所在组
- private ClassLoader contextClassLoader;
- private AccessControlContext inheritedAccessControlContext;
- private static int threadInitNumber; 与Thread-拼接构成线程默认名称,private static synchronized int nextThreadNum()对其递增threadInitNumber++
- ThreadLocal.ThreadLocalMap threadLocals = null;
- ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
- private final long stackSize; 为当前线程申请的栈空间,默认为0,取决于vm设计实现,有些vm会直接忽略此配置
- private long nativeParkEventPointer;
- private final long tid; 当前线程ID
- private static long threadSeqNumber; 线程id计数器,private static synchronized long nextThreadID()对其递增++threadSeqNumber
- private volatile int threadStatus;
- volatile Object parkBlocker;
- private volatile Interruptible blocker;
- private final Object blockerLock = new Object();
- public static final int MIN_PRIORITY = 1; 线程可以设置的最小优先级
public static final int NORM_PRIORITY = 5;线程默认优先级
public static final int MAX_PRIORITY = 10;线程可以设置的最大优先级
线程的优先级会对应到不同操作系统的优先级,JVM不一定设置的优先级进行线程调度 - 异常处理相关
//当前线程异常处理handler,由volatile修饰
private volatile UncaughtExceptionHandler uncaughtExceptionHandler;
//所有线程缺省异常处理handler,由static volatile修饰
private static volatile UncaughtExceptionHandler defaultUncaughtExceptionHandler;
几个重要的方法
-
启动线程,JVM会调用当前线程的run方法
public synchronized void start() { /** * This method is not invoked for the main method thread or "system" * group threads created/set up by the VM. Any new functionality added * to this method in the future may have to also be added to the VM. * * A zero status value corresponds to state "NEW". */ if (threadStatus != 0) throw new IllegalThreadStateException(); /* Notify the group that this thread is about to be started * so that it can be added to the group's list of threads * and the group's unstarted count can be decremented. */ group.add(this); boolean started = false; try { start0(); started = true; } finally { try { if (!started) { group.threadStartFailed(this); } } catch (Throwable ignore) { /* do nothing. If start0 threw a Throwable then it will be passed up the call stack */ } } } private native void start0();
-
停止线程,已过时
@Deprecated(since="1.2") public final void stop() { SecurityManager security = System.getSecurityManager(); if (security != null) { checkAccess(); if (this != Thread.currentThread()) { security.checkPermission(SecurityConstants.STOP_THREAD_PERMISSION); } } // A zero status value corresponds to "NEW", it can't change to // not-NEW because we hold the lock. if (threadStatus != 0) { resume(); // Wake up thread if it was suspended; no-op otherwise } // The VM can handle all thread states stop0(new ThreadDeath()); } private native void stop0(Object o);
stop线程容易导致出现如下两种情况:
- 立即停止run()方法中剩余工作(包括在catch或finally语句),并抛出ThreadDeath异常(通常情况下此异常不需要显示的捕获),可能会导致一些清理性工作的得不到执行,如文件流,数据库连接等的关闭。
- 会立即释放该线程所持有的所有的锁,导致数据得不到同步的处理,出现数据不一致的问题
-
中断
public void interrupt() { if (this != Thread.currentThread()) { checkAccess(); // thread may be blocked in an I/O operation synchronized (blockerLock) { Interruptible b = blocker; if (b != null) { interrupt0(); // set interrupt status b.interrupt(this); return; } } } // set interrupt status interrupt0(); } private native void interrupt0(); public static boolean interrupted() { return currentThread().isInterrupted(true); } private native boolean isInterrupted(boolean ClearInterrupted);
-
join 插队并阻塞当前执行线程,使用 loop + wait 的方式实现
public final void join() throws InterruptedException { join(0); } public final synchronized void join(long millis) throws InterruptedException { long base = System.currentTimeMillis(); long now = 0; if (millis < 0) { throw new IllegalArgumentException("timeout value is negative"); } if (millis == 0) { while (isAlive()) { wait(0); } } else { while (isAlive()) { long delay = millis - now; if (delay <= 0) { break; } wait(delay); now = System.currentTimeMillis() - base; } } } public final native boolean isAlive();
-
suspend 与 resume 要成对出现,如果A线程访问某个资源x时suspend(),那么没有任何线程可以访问资源x直到A线程被resume()
@Deprecated(since="1.2") public final void suspend() { checkAccess(); suspend0(); } @Deprecated(since="1.2") public final void resume() { checkAccess(); resume0(); }
线程状态及状态转换
-
状态定义
public enum State { NEW, RUNNABLE, BLOCKED, WAITING, TIMED_WAITING, TERMINATED; }
- 状态图
-
例子:
package chapter02; public class TestThread { public static void main(String [] args) throws InterruptedException { final Thread thread0 = new Thread(new Runnable() { @Override public void run() { System.out.println("进入run"); try { System.out.printf("enter run(), thread0' state: %s\n", Thread.currentThread().getState()); Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); System.out.println("异常处理"); System.out.printf("on catch interrupt, thread0 isInterrupted or not ? %s \n", Thread.currentThread().isInterrupted()); System.out.printf("on catch interrupt, thread0' state: %s\n", Thread.currentThread().getState()); return; } System.out.println("退出run"); } }); Thread thread1 = new Thread(new Runnable() { @Override public void run() { System.out.println("进入thread1's run"); try { Thread.sleep(1000); System.out.printf("before interrupt, thread0 isInterrupted or nott ? %s \n", thread0.isInterrupted()); System.out.printf("enter thread1's run(), thread0' state: %s\n", thread0.getState()); Thread.sleep(1000); thread0.interrupt(); System.out.printf("after interrupt, thread0 isInterrupted or not ? %s \n", thread0.isInterrupted()); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("退出thread1's run"); } }); System.out.printf("after new(), thread0' state: %s\n", thread0.getState()); thread0.start(); System.out.printf("after start(), thread0' state: %s\n", thread0.getState()); thread1.start(); thread0.join(); System.out.printf("after join(), thread0' state: %s\n", thread0.getState()); System.out.println("退出"); } }
打印结果如下:
after new(), thread0' state: NEW after start(), thread0' state: RUNNABLE 进入run enter run(), thread0' state: RUNNABLE 进入thread1's run before interrupt, thread0 isInterrupted or nott ? false enter thread1's run(), thread0' state: TIMED_WAITING after interrupt, thread0 isInterrupted or not ? false 退出thread1's run 异常处理 on catch interrupt, thread0 isInterrupted or not ? false on catch interrupt, thread0' state: RUNNABLE after join(), thread0' state: TERMINATED 退出
异常捕获
- 说明:
//当前线程异常处理handler,由volatile修饰
private volatile UncaughtExceptionHandler uncaughtExceptionHandler;
//所有线程缺省异常处理handler,由static volatile修饰
private static volatile UncaughtExceptionHandler defaultUncaughtExceptionHandler; -
例子:
package chapter02; public class TestThread { public static void main(String [] args) throws InterruptedException { //全局异常处理器 Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { @Override public void uncaughtException(Thread t, Throwable e) { System.out.println("-" + Thread.currentThread().getName()); String threadName = t.getName(); System.out.printf("global exception handler >> : current thread's name is %s, ", threadName); System.out.printf("the error is %s \n",e.getLocalizedMessage()); } }); final Thread thread0 = new Thread(new Runnable() { @Override public void run() { System.out.println("进入thread0's run"); System.out.printf("enter run(), thread0' state: %s\n", Thread.currentThread().getState()); try { Thread.sleep(5000); } catch (InterruptedException e) { // e.printStackTrace(); // System.out.println("异常处理"); // System.out.printf("on catch interrupt, thread0 isInterrupted or not ? %s \n", Thread.currentThread().isInterrupted()); // System.out.printf("on catch interrupt, thread0' state: %s\n", Thread.currentThread().getState()); // // return; throw new RuntimeException(e); } System.out.println("退出thread0's run"); } }); //thread0异常处理器 thread0.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { @Override public void uncaughtException(Thread t, Throwable e) { System.out.println("-" + Thread.currentThread().getName()); String threadName = t.getName(); System.out.printf("thread0 exception handler >> : current thread's name is %s, ", threadName); System.out.printf("the error is %s \n",e.getLocalizedMessage()); } }); Thread thread1 = new Thread(new Runnable() { @Override public void run() { System.out.println("进入thread1's run"); try { Thread.sleep(1000); System.out.printf("before interrupt, thread0 isInterrupted or nott ? %s \n", thread0.isInterrupted()); System.out.printf("enter thread1's run(), thread0' state: %s\n", thread0.getState()); Thread.sleep(1000); thread0.interrupt(); System.out.printf("after interrupt, thread0 isInterrupted or not ? %s \n", thread0.isInterrupted()); Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("退出thread1's run"); } }); System.out.printf("after new(), thread0' state: %s\n", thread0.getState()); thread0.start(); System.out.printf("after start(), thread0' state: %s\n", thread0.getState()); thread1.setDaemon(true); thread1.start(); thread0.join(); thread1.join(); System.out.printf("after join(), thread0' state: %s\n", thread0.getState()); System.out.println("退出"); } }
打印结果如下:
after new(), thread0' state: NEW after start(), thread0' state: RUNNABLE 进入thread0's run enter run(), thread0' state: RUNNABLE 进入thread1's run before interrupt, thread0 isInterrupted or nott ? false enter thread1's run(), thread0' state: TIMED_WAITING after interrupt, thread0 isInterrupted or not ? true -Thread-0 thread0 exception handler >> : current thread's name is Thread-0, the error is java.lang.InterruptedException: sleep interrupted 退出thread1's run after join(), thread0' state: TERMINATED 退出
ThreadLocal
- 说明:
jdk1.2开始,为解决多线程程序的并发问题提供了一种新的思路ThreadLocal。使用这个工具类可以很简洁地编写出优美的多线程
程序,ThreadLocal并不是一个Thread,而是Thread的局部变量。 -
源码:
public class ThreadLocal<T> { private final int threadLocalHashCode = nextHashCode(); private static AtomicInteger nextHashCode = new AtomicInteger(); private static final int HASH_INCREMENT = 0x61c88647; private static int nextHashCode() { return nextHashCode.getAndAdd(HASH_INCREMENT); } protected T initialValue() { return null; } public static <S> ThreadLocal<S> withInitial(Supplier<? extends S> supplier) { return new SuppliedThreadLocal<>(supplier); } public ThreadLocal() { } public T get() { Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) { ThreadLocalMap.Entry e = map.getEntry(this); if (e != null) { @SuppressWarnings("unchecked") T result = (T)e.value; return result; } } return setInitialValue(); } boolean isPresent() { Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); return map != null && map.getEntry(this) != null; } private T setInitialValue() { T value = initialValue(); Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) { map.set(this, value); } else { createMap(t, value); } if (this instanceof TerminatingThreadLocal) { TerminatingThreadLocal.register((TerminatingThreadLocal<?>) this); } return value; } //设置线程本地值,如果已经存在覆盖,否则为当前线程创建新的ThreadLocalMap,赋值给当前线程的threadLocals局部变量 public void set(T value) { Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) { map.set(this, value); } else { createMap(t, value); } } //删除本地值,不调用此方法在线程销毁后jvm也会回收,调用此方法后,如果多次访问get()方法可能导致多次触发initialValue() public void remove() { ThreadLocalMap m = getMap(Thread.currentThread()); if (m != null) { m.remove(this); } } ThreadLocalMap getMap(Thread t) { return t.threadLocals; } void createMap(Thread t, T firstValue) { t.threadLocals = new ThreadLocalMap(this, firstValue); } static ThreadLocalMap createInheritedMap(ThreadLocalMap parentMap) { return new ThreadLocalMap(parentMap); } T childValue(T parentValue) { throw new UnsupportedOperationException(); } static final class SuppliedThreadLocal<T> extends ThreadLocal<T> { private final Supplier<? extends T> supplier; SuppliedThreadLocal(Supplier<? extends T> supplier) { this.supplier = Objects.requireNonNull(supplier); } @Override protected T initialValue() { return supplier.get(); } } static class ThreadLocalMap { /** * The entries in this hash map extend WeakReference, using * its main ref field as the key (which is always a * ThreadLocal object). Note that null keys (i.e. entry.get() * == null) mean that the key is no longer referenced, so the * entry can be expunged from table. Such entries are referred to * as "stale entries" in the code that follows. */ static class Entry extends WeakReference<ThreadLocal<?>> { /** The value associated with this ThreadLocal. */ Object value; Entry(ThreadLocal<?> k, Object v) { super(k); value = v; } } private static final int INITIAL_CAPACITY = 16; private Entry[] table; private int size = 0; private int threshold; // Default to 0 private void setThreshold(int len) { threshold = len * 2 / 3; } private static int nextIndex(int i, int len) { return ((i + 1 < len) ? i + 1 : 0); } private static int prevIndex(int i, int len) { return ((i - 1 >= 0) ? i - 1 : len - 1); } ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) { table = new Entry[INITIAL_CAPACITY]; int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1); table[i] = new Entry(firstKey, firstValue); size = 1; setThreshold(INITIAL_CAPACITY); } private ThreadLocalMap(ThreadLocalMap parentMap) { Entry[] parentTable = parentMap.table; int len = parentTable.length; setThreshold(len); table = new Entry[len]; for (Entry e : parentTable) { if (e != null) { @SuppressWarnings("unchecked") ThreadLocal<Object> key = (ThreadLocal<Object>) e.get(); if (key != null) { Object value = key.childValue(e.value); Entry c = new Entry(key, value); int h = key.threadLocalHashCode & (len - 1); while (table[h] != null) h = nextIndex(h, len); table[h] = c; size++; } } } } private Entry getEntry(ThreadLocal<?> key) { int i = key.threadLocalHashCode & (table.length - 1); Entry e = table[i]; if (e != null && e.get() == key) return e; else return getEntryAfterMiss(key, i, e); } private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) { Entry[] tab = table; int len = tab.length; while (e != null) { ThreadLocal<?> k = e.get(); if (k == key) return e; if (k == null) expungeStaleEntry(i); else i = nextIndex(i, len); e = tab[i]; } return null; } private void set(ThreadLocal<?> key, Object value) { // We don't use a fast path as with get() because it is at // least as common to use set() to create new entries as // it is to replace existing ones, in which case, a fast // path would fail more often than not. Entry[] tab = table; int len = tab.length; int i = key.threadLocalHashCode & (len-1); for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) { ThreadLocal<?> k = e.get(); if (k == key) { e.value = value; return; } if (k == null) { replaceStaleEntry(key, value, i); return; } } tab[i] = new Entry(key, value); int sz = ++size; if (!cleanSomeSlots(i, sz) && sz >= threshold) rehash(); } private void remove(ThreadLocal<?> key) { Entry[] tab = table; int len = tab.length; int i = key.threadLocalHashCode & (len-1); for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) { if (e.get() == key) { e.clear(); expungeStaleEntry(i); return; } } } private void replaceStaleEntry(ThreadLocal<?> key, Object value, int staleSlot) { Entry[] tab = table; int len = tab.length; Entry e; // Back up to check for prior stale entry in current run. // We clean out whole runs at a time to avoid continual // incremental rehashing due to garbage collector freeing // up refs in bunches (i.e., whenever the collector runs). int slotToExpunge = staleSlot; for (int i = prevIndex(staleSlot, len); (e = tab[i]) != null; i = prevIndex(i, len)) if (e.get() == null) slotToExpunge = i; // Find either the key or trailing null slot of run, whichever // occurs first for (int i = nextIndex(staleSlot, len); (e = tab[i]) != null; i = nextIndex(i, len)) { ThreadLocal<?> k = e.get(); // If we find key, then we need to swap it // with the stale entry to maintain hash table order. // The newly stale slot, or any other stale slot // encountered above it, can then be sent to expungeStaleEntry // to remove or rehash all of the other entries in run. if (k == key) { e.value = value; tab[i] = tab[staleSlot]; tab[staleSlot] = e; // Start expunge at preceding stale entry if it exists if (slotToExpunge == staleSlot) slotToExpunge = i; cleanSomeSlots(expungeStaleEntry(slotToExpunge), len); return; } // If we didn't find stale entry on backward scan, the // first stale entry seen while scanning for key is the // first still present in the run. if (k == null && slotToExpunge == staleSlot) slotToExpunge = i; } // If key not found, put new entry in stale slot tab[staleSlot].value = null; tab[staleSlot] = new Entry(key, value); // If there are any other stale entries in run, expunge them if (slotToExpunge != staleSlot) cleanSomeSlots(expungeStaleEntry(slotToExpunge), len); } /** * Expunge a stale entry by rehashing any possibly colliding entries * lying between staleSlot and the next null slot. This also expunges * any other stale entries encountered before the trailing null. See * Knuth, Section 6.4 * * @param staleSlot index of slot known to have null key * @return the index of the next null slot after staleSlot * (all between staleSlot and this slot will have been checked * for expunging). */ private int expungeStaleEntry(int staleSlot) { Entry[] tab = table; int len = tab.length; // expunge entry at staleSlot tab[staleSlot].value = null; tab[staleSlot] = null; size--; // Rehash until we encounter null Entry e; int i; for (i = nextIndex(staleSlot, len); (e = tab[i]) != null; i = nextIndex(i, len)) { ThreadLocal<?> k = e.get(); if (k == null) { e.value = null; tab[i] = null; size--; } else { int h = k.threadLocalHashCode & (len - 1); if (h != i) { tab[i] = null; // Unlike Knuth 6.4 Algorithm R, we must scan until // null because multiple entries could have been stale. while (tab[h] != null) h = nextIndex(h, len); tab[h] = e; } } } return i; } private boolean cleanSomeSlots(int i, int n) { boolean removed = false; Entry[] tab = table; int len = tab.length; do { i = nextIndex(i, len); Entry e = tab[i]; if (e != null && e.get() == null) { n = len; removed = true; i = expungeStaleEntry(i); } } while ( (n >>>= 1) != 0); return removed; } private void rehash() { expungeStaleEntries(); // Use lower threshold for doubling to avoid hysteresis if (size >= threshold - threshold / 4) resize(); } private void resize() { Entry[] oldTab = table; int oldLen = oldTab.length; int newLen = oldLen * 2; Entry[] newTab = new Entry[newLen]; int count = 0; for (Entry e : oldTab) { if (e != null) { ThreadLocal<?> k = e.get(); if (k == null) { e.value = null; // Help the GC } else { int h = k.threadLocalHashCode & (newLen - 1); while (newTab[h] != null) h = nextIndex(h, newLen); newTab[h] = e; count++; } } } setThreshold(newLen); size = count; table = newTab; } private void expungeStaleEntries() { Entry[] tab = table; int len = tab.length; for (int j = 0; j < len; j++) { Entry e = tab[j]; if (e != null && e.get() == null) expungeStaleEntry(j); } } } }