ConcurrentHashMap
数据结构
ConcurrentHashMap的数据结构与HashMap基本类似, 区别在于:
1、内部在数据写入时加了同步机制(分段锁)保证线程安全,读操作是无锁操作;
2、扩容的粒度更细、可并发进行:JDK 1.7 中各 segment 的扩容互不影响、可以并发执行;JDK 1.8 中扩容时老数据的转移由多个线程协作完成,这样扩容的效率更高
ConcurrentHashMap 线程安全的具体实现方式
JDK 1.7
ConcurrentHashMap基于ReentrantLock实现分段锁
将数据分为一段一段的存储,然后给每一段数据配一把锁,底层数据结构:Segment 数组 + HashEntry 数组 ,Segment 实现了 ReentrantLock,所以 Segment 是一种可重入锁,扮演锁的角色
源码分析:
构造函数:
/**
 * The default initial capacity for this table,
 * used when not otherwise specified in a constructor.
 */
static final int DEFAULT_INITIAL_CAPACITY = 16;

/**
 * The default load factor for this table, used when not
 * otherwise specified in a constructor.
 */
static final float DEFAULT_LOAD_FACTOR = 0.75f;

/**
 * The default concurrency level for this table, used when not
 * otherwise specified in a constructor.
 * This is the size of the Segment array.
 */
static final int DEFAULT_CONCURRENCY_LEVEL = 16;

// Maximum number of segments allowed (fix: the original note said
// "max data a segment can hold", which is wrong — this bounds the
// segment-array size, not the data per segment).
static final int MAX_SEGMENTS = 1 << 16; // slightly conservative

// Maximum total capacity.
static final int MAXIMUM_CAPACITY = 1 << 30;

// Minimum capacity of the per-segment HashEntry table.
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;

// NOTE: DEFAULT_INITIAL_CAPACITY combined with DEFAULT_CONCURRENCY_LEVEL
// determines roughly how many entries each segment holds:
// approximately DEFAULT_INITIAL_CAPACITY / DEFAULT_CONCURRENCY_LEVEL.

/**
 * Creates a new, empty map with default initial capacity,
 * load factor and concurrency level.
 */
public ConcurrentHashMap() {
    this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR,
         DEFAULT_CONCURRENCY_LEVEL);
}

/**
 * Creates a new, empty map with the specified initial
 * capacity, load factor and concurrency level.
 */
@SuppressWarnings("unchecked")
public ConcurrentHashMap(int initialCapacity,
                         float loadFactor, int concurrencyLevel) {
    // Validate arguments.
    if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    if (concurrencyLevel > MAX_SEGMENTS)
        concurrencyLevel = MAX_SEGMENTS;
    // Find power-of-two sizes best matching arguments.
    int sshift = 0; // number of shift steps; segment array size == 2^sshift
    int ssize = 1;  // size of the segment array
    while (ssize < concurrencyLevel) {
        ++sshift;
        ssize <<= 1; // guarantees the segment array size is a power of two
    }
    this.segmentShift = 32 - sshift; // high hash bits select the segment
    this.segmentMask = ssize - 1;
    // Clamp capacity to the maximum.
    if (initialCapacity > MAXIMUM_CAPACITY)
        initialCapacity = MAXIMUM_CAPACITY;
    // Per-segment table capacity: smallest power of two that is
    // >= ceil(initialCapacity / ssize), but at least
    // MIN_SEGMENT_TABLE_CAPACITY.
    int c = initialCapacity / ssize;
    if (c * ssize < initialCapacity)
        ++c;
    int cap = MIN_SEGMENT_TABLE_CAPACITY;
    while (cap < c)
        cap <<= 1;
    // Create segments and segments[0]; cap * loadFactor is the rehash
    // threshold of the segment's internal HashEntry table.
    Segment<K,V> s0 =
        new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                         (HashEntry<K,V>[])new HashEntry[cap]);
    // FIX: the transcribed snippet was missing the ';' on this line.
    Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
    // Publish the prototype into segments[0].
    UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
    this.segments = ss;
}
注意:segment 的扩容只与该 segment 自身有关。当一个元素放入 segment 时,会判断当前 segment 内部元素个数是否超过阈值(threshold),如果超过就进行扩容。扩容扩的是 segment 内部的 HashEntry 数组(table),而不是 segment 数组:先创建一个容量翻倍的新数组,再将旧数据迁移到新数组,最后把该 segment 的 table 引用指向新数组。segment 数组本身的长度始终不变。
put方法:
public V put(K key, V value) { Segment<K,V> s; //value不可以为空 if (value == null) throw new NullPointerException(); //取hash值 int hash = hash(key); int j = (hash >>> segmentShift) & segmentMask; if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment 取第j个位置的元素 s = ensureSegment(j); return s.put(key, hash, value, false); } static final class Segment<K,V> extends ReentrantLock implements Serializable { private static final long serialVersionUID = 2249069246763182397L; static final int MAX_SCAN_RETRIES = Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1; /** * The per-segment table. */ transient volatile HashEntry<K,V>[] table; /** * The number of elements. */ transient int count; /** * The total number of mutative operations in this segment. */ transient int modCount; /** * The table is rehashed when its size exceeds(超出) this threshold. */ transient int threshold; /** * The load factor(因子) for the hash table. Even though this value * is same for all segments */ final float loadFactor; /** * The maximum number of times to tryLock in a prescan before * possibly blocking on acquire in preparation for a locked * segment operation. On multiprocessors, using a bounded * number of retries maintains cache acquired while locating * nodes. */ static final int MAX_SCAN_RETRIES = Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1; 说明:Runtime.getRuntime().availableProcessors() 返回Java虚拟机可用的处理器数量。 Segment(float lf, int threshold, HashEntry<K,V>[] tab) { this.loadFactor = lf; this.threshold = threshold; this.table = tab; } //这个放法是写在segment内置对象里面的 final V put(K key, int hash, V value, boolean onlyIfAbsent) { //尝试加锁(segment继承了ReentrantLock),如果获取不到锁调用scanAndLockForPut来获取锁 HashEntry<K,V> node = tryLock() ? 
null : scanAndLockForPut(key, hash, value); V oldValue; try { HashEntry<K,V>[] tab = table;//取得segment数组内部的table int index = (tab.length - 1) & hash;//计算数组的下标 HashEntry<K,V> first = entryAt(tab, index); //遍历当前位置的链表 for (HashEntry<K,V> e = first;;) { if (e != null) { K k; //查看key是否相等 if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { //记录value值 oldValue = e.value; // 如果标记不存在的时候,才进行插入操作 if (!onlyIfAbsent) { e.value = value; ++modCount; } break; } e = e.next; } else { if (node != null) node.setNext(first); else //创建一个新的hashEntry对象 node = new HashEntry<K,V>(hash, key, value, first); int c = count + 1;//count代表segment数组当前位置存储的元素个数 //检查是否满足扩容的条件 if (c > threshold && tab.length < MAXIMUM_CAPACITY) //将表的大小加倍并重新打包条目,同时将给定的节点添加到新表中 rehash(node);//扩容 else //把数组放到segment数组内部的table的第i个位置 setEntryAt(tab, index, node);//保证是内存里的值,而不是线程里面的值 ++modCount; count = c; oldValue = null; break; } } } finally { unlock(); } return oldValue; } /** * Scans for a node containing given key while trying to * acquire lock, creating and returning one if not found。 */ //当put方法尝试获取锁失败后,调用该方法获取锁 private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) { //获取hash对应 HashEntry<K,V> first = entryForHash(this, hash); HashEntry<K,V> e = first; HashEntry<K,V> node = null; int retries = -1; // negative while locating node while (!tryLock()) { HashEntry<K,V> f; // to recheck first below //根据实际情况来new hashEntry对象 if (retries < 0) { //first为空或者遍历到了尾节点k if (e == null) { if (node == null) // speculatively create node //创建hashEntry对象 node = new HashEntry<K,V>(hash, key, value, null); // 将retries设置为0 , 下次循环时就不进入这个分支(if (retries < 0)) retries = 0; } // 遍历到的当前节点中存在相等的key else if (key.equals(e.key)) // 将retries设置为0 , 下次循环时就不进入这个分支(if (retries < 0)) retries = 0; //first不为空并且key不相等,继续遍历下一个节点 else e = e.next; } // 如果retries > 64(处理器数量大于1)或1(处理器数量小于1) else if (++retries > MAX_SCAN_RETRIES) { lock(); break; } //(retries & 1) == 0 retries为偶数的时候成立 else if ((retries & 1) == 0 && //判断节点是否发生了变化 (f = 
entryForHash(this, hash)) != first) { //如果条目改变,重新遍历 e = first = f; // re-traverse if entry changed //重新赋值为-1 retries = -1; } } return node; } } /** * Gets the table entry for the given segment and hash */ @SuppressWarnings("unchecked") static final <K,V> HashEntry<K,V> entryForHash(Segment<K,V> seg, int h) { HashEntry<K,V>[] tab; return (seg == null || (tab = seg.table) == null) ? null : (HashEntry<K,V>) UNSAFE.getObjectVolatile (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE); } //使用volatile读语义获取给定表的第i个元素(如果非null)。 @SuppressWarnings("unchecked") static final <K,V> HashEntry<K,V> entryAt(HashEntry<K,V>[] tab, int i) { return (tab == null) ? null : (HashEntry<K,V>) UNSAFE.getObjectVolatile (tab, ((long)i << TSHIFT) + TBASE); }
注意:
- put方法将键/值对添加到 hashMap 中
- tryLock():先判断当前的这把锁能不能获取到,如果能获取就获取锁并立马返回true,如果不能获取到立即返回false.(不阻塞)
- lock():如果能获取到锁就立即获取,如果不能获取就阻塞直到获取到锁。
- 当调用 tryLock 方法尝试获取锁失败后改用 scanAndLockForPut 获取锁的原因:主要是锁获取方式的选择。scanAndLockForPut 中获取锁的方式是 while (!tryLock()) { //do something },相较于 lock():lock() 获取不到锁会阻塞,不占用 CPU;而 while (!tryLock()) 自旋不阻塞,虽然更占 CPU,但在等锁期间可以顺便做其他事情(例如提前 new 好 HashEntry 对象);当自旋次数超过 MAX_SCAN_RETRIES 后,会切换成 lock() 阻塞等待。
putIfAbsent方法:
/**
 * Inserts the key/value pair only if the key is not already mapped;
 * an existing mapping is left untouched (delegates to Segment.put
 * with onlyIfAbsent == true).
 */
@SuppressWarnings("unchecked")
public V putIfAbsent(K key, V value) {
    Segment<K,V> s;
    // null values are rejected up front
    if (value == null)
        throw new NullPointerException();
    int hash = hash(key);
    // high bits of the hash select the segment index j
    int j = (hash >>> segmentShift) & segmentMask;
    if ((s = (Segment<K,V>)UNSAFE.getObject
         (segments, (j << SSHIFT) + SBASE)) == null)
        s = ensureSegment(j); // lazily create segment j if absent
    // true => do not overwrite an existing value
    return s.put(key, hash, value, true);
}
注意:如果 map 中不存在指定的键,则将指定的键/值对插入;如果键已存在,则不覆盖原值,并返回已存在的 value。
ensureSegment方法
/**
 * Returns the segment for index k, creating it and installing it in
 * the segment array (via CAS) if not already present.
 */
@SuppressWarnings("unchecked")
private Segment<K,V> ensureSegment(int k) {
    // The segment array.
    final Segment<K,V>[] ss = this.segments;
    long u = (k << SSHIFT) + SBASE; // raw offset of slot k
    Segment<K,V> seg;
    // If slot k is already non-null, another thread created the segment;
    // just return it.
    if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
        Segment<K,V> proto = ss[0]; // use segment 0 as prototype
        int cap = proto.table.length; // table length of ss[0]
        float lf = proto.loadFactor;
        // Rehash threshold of the new segment's table: once the segment
        // holds more than this many elements, its table may be doubled.
        int threshold = (int)(cap * lf);
        HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
        // Recheck: the slot may have been filled while we allocated.
        if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
            == null) { // recheck
            // Build the segment object.
            Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
            while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                   == null) {
                // CAS our segment into slot k; on failure, loop and
                // return whichever segment won the race.
                if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
                    break;
            }
        }
    }
    return seg;
}
注意:当多个线程需要在同一个位置生成 segment 对象时,只要有一个线程先(通过 CAS)生成成功,后面的线程直接使用已生成的对象即可。
rehash(扩容)方法
/**
 * The table is rehashed when its size exceeds this threshold.
 * (The value of this field is always <tt>(int)(capacity *
 * loadFactor)</tt>.)
 */
transient int threshold;

/**
 * The load factor for the hash table. Even though this value
 * is same for all segments, it is replicated to avoid needing
 * links to outer object.
 * @serial
 */
final float loadFactor;

/**
 * Doubles size of table and repacks entries, also adding the
 * given node to new table.
 */
@SuppressWarnings("unchecked")
private void rehash(HashEntry<K,V> node) {
    /*
     * Reclassify nodes in each list to new table. Because we
     * are using power-of-two expansion, the elements from
     * each bin must either stay at same index, or move with a
     * power of two offset. We eliminate unnecessary node
     * creation by catching cases where old nodes can be
     * reused because their next fields won't change.
     * Statistically, at the default threshold, only about
     * one-sixth of them need cloning when a table
     * doubles. The nodes they replace will be garbage
     * collectable as soon as they are no longer referenced by
     * any reader thread that may be in the midst of
     * concurrently traversing table. Entry accesses use plain
     * array indexing because they are followed by volatile
     * table write.
     */
    HashEntry<K,V>[] oldTable = table;
    int oldCapacity = oldTable.length;
    int newCapacity = oldCapacity << 1;          // double the capacity
    threshold = (int)(newCapacity * loadFactor); // recompute threshold
    // Allocate the new table.
    HashEntry<K,V>[] newTable =
        (HashEntry<K,V>[]) new HashEntry[newCapacity];
    // Mask for the new index: idx = hash & sizeMask
    // (NOTE: the old comment said "hash & (sizeMask-1)", which was wrong).
    int sizeMask = newCapacity - 1;
    // Walk every slot of the old table.
    for (int i = 0; i < oldCapacity ; i++) {
        HashEntry<K,V> e = oldTable[i];
        if (e != null) {
            HashEntry<K,V> next = e.next;
            // New slot for the chain head.
            int idx = e.hash & sizeMask;
            if (next == null) // Single node on list
                // Move the single node straight into the new table.
                newTable[idx] = e;
            else { // Reuse consecutive sequence at same slot
                HashEntry<K,V> lastRun = e;
                int lastIdx = idx;
                // Find the longest trailing run of nodes that all map to
                // the same new slot; that run is reused without cloning.
                for (HashEntry<K,V> last = next; last != null; last = last.next) {
                    // New slot of this node.
                    int k = last.hash & sizeMask;
                    if (k != lastIdx) {
                        lastIdx = k;
                        lastRun = last;
                    }
                }
                // Link the reusable tail run directly into the new table.
                newTable[lastIdx] = lastRun;
                // Clone remaining nodes (everything before lastRun),
                // head-inserting each into its new slot.
                for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                    V v = p.value;
                    int h = p.hash;
                    int k = h & sizeMask;
                    // Current head of the destination chain.
                    HashEntry<K,V> n = newTable[k];
                    // Head insertion.
                    newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                }
            }
        }
    }
    int nodeIndex = node.hash & sizeMask; // add the new node
    node.setNext(newTable[nodeIndex]);
    newTable[nodeIndex] = node;
    // Volatile write publishes the new table to readers.
    table = newTable;
}

/** HashEntry constructor: one node of a bucket's singly-linked chain. */
HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
    this.hash = hash;
    this.key = key;
    this.value = value;
    this.next = next;
}
get方法
/**
 * Returns the value to which the specified key is mapped,
 * or {@code null} if this map contains no mapping for the key.
 * Lock-free: relies on volatile reads of the segment and its entries.
 */
public V get(Object key) {
    Segment<K,V> s; // manually integrate access methods to reduce overhead
    HashEntry<K,V>[] tab;
    // Hash of the key.
    int h = hash(key);
    // Raw offset of the segment selected by the high hash bits.
    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
    if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
        (tab = s.table) != null) {
        // Volatile-read the chain head at the hashed slot, then walk it.
        for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                 (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
             e != null; e = e.next) {
            K k;
            // Reference check first, then hash + equals.
            if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                return e.value;
        }
    }
    return null;
}
remove方法
/** * Removes the key (and its corresponding value) from this map. * This method does nothing if the key is not in the map. */ //移除成功返回移除的旧值 public V remove(Object key) { int hash = hash(key); //Get the segment for the given hash Segment<K,V> s = segmentForHash(hash); return s == null ? null : s.remove(key, hash, null); } /** * Remove; match on key only if value null, else match both. */ final V remove(Object key, int hash, Object value) { //尝试加锁 if (!tryLock()) scanAndLock(key, hash); V oldValue = null; try { HashEntry<K,V>[] tab = table; int index = (tab.length - 1) & hash; //Gets the ith element of given table (if nonnull) with volatile read semantics. HashEntry<K,V> e = entryAt(tab, index); HashEntry<K,V> pred = null; while (e != null) { K k; HashEntry<K,V> next = e.next; if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { V v = e.value; if (value == null || value == v || value.equals(v)) { if (pred == null) setEntryAt(tab, index, next); else pred.setNext(next); ++modCount; --count; oldValue = v; } break; } pred = e; e = next; } } finally { unlock(); } return oldValue; } /** * Scans for a node containing the given key while trying to * acquire lock for a remove or replace operation. Upon * return, guarantees that lock is held. Note that we must * lock even if the key is not found, to ensure sequential * consistency of updates. */ private void scanAndLock(Object key, int hash) { // similar to but simpler than scanAndLockForPut HashEntry<K,V> first = entryForHash(this, hash); HashEntry<K,V> e = first; int retries = -1; while (!tryLock()) { HashEntry<K,V> f; if (retries < 0) { if (e == null || key.equals(e.key)) retries = 0; else e = e.next; } else if (++retries > MAX_SCAN_RETRIES) { lock(); break; } else if ((retries & 1) == 0 && (f = entryForHash(this, hash)) != first) { e = first = f; retries = -1; } } }