I. Introduction

1. Overview

Before JDK 1.8 (that is, in 1.7), ConcurrentHashMap was built on segment locks plus an array of linked lists.
That design has two notable problems:
1. Segment locks are a throughput bottleneck: concurrency is capped by the number of segments.
2. Collisions are resolved with linked lists, so in the worst case a chain grows long and lookups degrade toward O(n).
This article therefore walks through the JDK 1.8 ConcurrentHashMap source.
Straightforward methods such as get and size are not analyzed here;
only put, resizing, and element counting are covered in detail.
2. A quick comparison

Compared with the 1.7 version, the 1.8 ConcurrentHashMap is substantially reworked, mainly in two respects:
1. Segment locks → CAS
Segment locking is gone: an insert locks at most a single bin, and inserts into empty bins use a lock-free CAS, which greatly improves concurrency.
2. Array + linked list → array + linked list + red-black tree
Long collision chains are converted to red-black trees, keeping lookups in a heavily collided bin at O(log n) instead of O(n).
II. Constants and fields

1. Capacity-related constants

```java
private static final int MAXIMUM_CAPACITY = 1 << 30;
private static final int DEFAULT_CAPACITY = 16;
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
private static final float LOAD_FACTOR = 0.75f;
static final int TREEIFY_THRESHOLD = 8;
static final int UNTREEIFY_THRESHOLD = 6;
static final int MIN_TREEIFY_CAPACITY = 64;
```
2. Resize-related constants

```java
private static int RESIZE_STAMP_BITS = 16;
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
```
3. Node-state constants

```java
static final int MOVED     = -1; // hash for forwarding nodes
static final int TREEBIN   = -2; // hash for roots of trees
static final int RESERVED  = -3; // hash for transient reservations
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash
```
4. The Node structure

```java
static class Node<K,V> implements Map.Entry<K,V> {
    final int hash;
    final K key;
    volatile V val;
    volatile Node<K,V> next;
    ......
}
```

Node is the data structure stored in the bins and their linked lists.
5. Utility methods

```java
static final int spread(int h) {
    return (h ^ (h >>> 16)) & HASH_BITS;
}

private static final int tableSizeFor(int c) {
    int n = c - 1;
    n |= n >>> 1;
    n |= n >>> 2;
    n |= n >>> 4;
    n |= n >>> 8;
    n |= n >>> 16;
    return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}

@SuppressWarnings({"rawtypes","unchecked"})
static int compareComparables(Class<?> kc, Object k, Object x) {
    return (x == null || x.getClass() != kc ? 0 : ((Comparable)k).compareTo(x));
}
```
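The two bit-twiddling helpers above can be checked with a small standalone sketch (the class name `BitHelpers` is mine, not from the JDK; the method bodies are copied from the source above):

```java
// Standalone re-implementations of spread() and tableSizeFor() for experimentation.
public class BitHelpers {
    static final int HASH_BITS = 0x7fffffff;
    static final int MAXIMUM_CAPACITY = 1 << 30;

    // Mix high bits into the low bits and force the sign bit to 0,
    // so user hashes never collide with MOVED/TREEBIN/RESERVED.
    static int spread(int h) {
        return (h ^ (h >>> 16)) & HASH_BITS;
    }

    // Round c up to the next power of two (table lengths are always powers of two).
    static int tableSizeFor(int c) {
        int n = c - 1;
        n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16;
        return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
    }

    public static void main(String[] args) {
        System.out.println(tableSizeFor(10)); // 16
        System.out.println(tableSizeFor(16)); // 16
        System.out.println(tableSizeFor(17)); // 32
        System.out.println(spread(-1) >= 0);  // true: sign bit always cleared
    }
}
```

Note the `c - 1` at the start of tableSizeFor: it is what makes an exact power of two map to itself rather than the next power up.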
6. Volatile bin-access methods

```java
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
    return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}

static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                    Node<K,V> c, Node<K,V> v) {
    return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}

static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
    U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}
```
7. Instance fields

```java
transient volatile Node<K,V>[] table;
private transient volatile Node<K,V>[] nextTable;
private transient volatile long baseCount;
private transient volatile int sizeCtl;
private transient volatile int transferIndex;
private transient volatile int cellsBusy;
private transient volatile CounterCell[] counterCells;
```
III. The put operation

1. Source

```java
public V put(K key, V value) {
    return putVal(key, value, false);
}

final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;
        }
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) {
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                              value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            if (binCount != 0) {
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    addCount(1L, binCount);
    return null;
}
```
2. Notes

From the code above, the put flow is roughly:
1. Null-check both key and value (neither may be null).
2. Enter a retry loop that only exits once the insert has succeeded.
3. If the table has not been initialized, call initTable() first.
4. If the target bin is empty, CAS the new node straight in.
5. If the bin is non-empty and its head has hash MOVED, a resize is in progress, so join it via helpTransfer.
6. Otherwise, synchronize on the bin's head node and insert under that lock.
7. Branch on whether the bin holds a linked list (head hash >= 0) or a tree (TreeBin).
8. If the key already exists, overwrite its value (unless onlyIfAbsent).
9. If not, insert it: a list insert goes at the tail.
10. If the chain has reached TREEIFY_THRESHOLD, convert the bin to a tree; if a value was overwritten, return the old one.
11. Otherwise call addCount to record the new element (which may in turn trigger a resize).
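The CAS-then-lock pattern at the heart of putVal (steps 4 and 6 above) can be sketched against a plain AtomicReferenceArray. This is a toy of my own, not the real implementation: it has no resizing, no trees, and no counting, but it shows the two insert paths:

```java
import java.util.concurrent.atomic.AtomicReferenceArray;

// Toy illustration of putVal's two insert paths:
// CAS into an empty bin, or lock the bin head and walk the chain.
public class MiniBinMap {
    static final class Node {
        final int hash; final Object key; volatile Object val; volatile Node next;
        Node(int hash, Object key, Object val, Node next) {
            this.hash = hash; this.key = key; this.val = val; this.next = next;
        }
    }

    final AtomicReferenceArray<Node> table = new AtomicReferenceArray<>(16);

    public Object put(Object key, Object value) {
        int hash = key.hashCode() & 0x7fffffff;
        for (;;) {
            int i = (table.length() - 1) & hash;
            Node f = table.get(i);
            if (f == null) {
                // empty bin: lock-free CAS insert; retry the loop on failure
                if (table.compareAndSet(i, null, new Node(hash, key, value, null)))
                    return null;
            } else {
                synchronized (f) {              // lock only this bin's head
                    if (table.get(i) == f) {    // head unchanged, safe to walk
                        for (Node e = f;; e = e.next) {
                            if (e.hash == hash && e.key.equals(key)) {
                                Object old = e.val; e.val = value; return old;
                            }
                            if (e.next == null) { // tail insert
                                e.next = new Node(hash, key, value, null);
                                return null;
                            }
                        }
                    }
                }   // head changed under us: retry the outer loop
            }
        }
    }

    public Object get(Object key) {
        int hash = key.hashCode() & 0x7fffffff;
        for (Node e = table.get((table.length() - 1) & hash); e != null; e = e.next)
            if (e.hash == hash && e.key.equals(key)) return e.val;
        return null;
    }

    public static void main(String[] args) {
        MiniBinMap m = new MiniBinMap();
        m.put("a", 1);
        System.out.println(m.put("a", 2)); // prints the old value, 1
        System.out.println(m.get("a"));    // prints 2
    }
}
```

The re-check `table.get(i) == f` inside the lock mirrors the real `tabAt(tab, i) == f`: it guards against the head having been removed or forwarded between the read and the lock acquisition.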
IV. Table initialization

1. Source

```java
private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
        if ((sc = sizeCtl) < 0)
            Thread.yield();
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            try {
                if ((tab = table) == null || tab.length == 0) {
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
                    sc = n - (n >>> 2);
                }
            } finally {
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}
```
2. Notes
1. Initialization runs inside a loop, so threads that lose the race keep observing until the table exists.
2. The sizeCtl field controls the initialization state.
3. If sizeCtl is negative, another thread is already initializing (or resizing), so the current thread just yields and waits.
4. If sizeCtl >= 0, the thread tries to CAS it to -1, claiming the right to initialize.
5. After winning the CAS it re-checks that the table is still empty, in case another thread initialized it first.
6. It then allocates the array and stores the next resize threshold, n - (n >>> 2) (i.e., 0.75 * n), back into sizeCtl.
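The same CAS-guarded lazy initialization can be mimicked without Unsafe by letting an AtomicInteger stand in for sizeCtl. The class and method names here are mine, and the sketch covers only the happy path of initTable:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Mimics initTable(): one thread wins the CAS to -1 and initializes;
// losers yield until the table appears.
public class LazyTable {
    private volatile Object[] table;
    private final AtomicInteger sizeCtl = new AtomicInteger(0); // <0: init in progress

    Object[] initTable() {
        Object[] tab;
        while ((tab = table) == null) {
            int sc = sizeCtl.get();
            if (sc < 0)
                Thread.yield();                       // another thread is initializing
            else if (sizeCtl.compareAndSet(sc, -1)) { // claim initialization
                try {
                    if ((tab = table) == null) {      // double-check after winning
                        int n = (sc > 0) ? sc : 16;   // 16 = DEFAULT_CAPACITY
                        table = tab = new Object[n];
                        sc = n - (n >>> 2);           // 0.75 * n as next threshold
                    }
                } finally {
                    sizeCtl.set(sc);                  // publish threshold (or undo -1)
                }
                break;
            }
        }
        return tab;
    }

    int threshold() { return sizeCtl.get(); }

    public static void main(String[] args) {
        LazyTable t = new LazyTable();
        System.out.println(t.initTable().length); // 16
        System.out.println(t.threshold());        // 12, i.e. 0.75 * 16
    }
}
```

The finally block is the subtle part: even if allocation throws, sizeCtl is restored, so other threads are not left yielding forever against a stale -1.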
V. Resizing

1. Source

helpTransfer: how a thread joins an in-progress resize.
```java
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
    Node<K,V>[] nextTab; int sc;
    if (tab != null && (f instanceof ForwardingNode) &&
        (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
        int rs = resizeStamp(tab.length);
        while (nextTab == nextTable && table == tab &&
               (sc = sizeCtl) < 0) {
            if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                sc == rs + MAX_RESIZERS || transferIndex <= 0)
                break;
            if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                transfer(tab, nextTab);
                break;
            }
        }
        return nextTab;
    }
    return table;
}
```
resizeStamp: computing the resize stamp.

Here n is the current table capacity and RESIZE_STAMP_BITS (16) is the width of the stamp. The method takes Integer.numberOfLeadingZeros(n), the count of leading zero bits in the capacity, and ORs it with 1 << (RESIZE_STAMP_BITS - 1), i.e. sets bit 15. The result is a 16-bit stamp whose high bit is 1 and whose low bits encode the capacity's leading-zero count. Since a power-of-two capacity is uniquely identified by its leading-zero count, the stamp tags one particular resize generation; and once the stamp is shifted into the high half of sizeCtl, sizeCtl becomes negative, which is how a resize in progress is signaled.
```java
static final int resizeStamp(int n) {
    return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}
```
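Plugging in a concrete capacity makes the stamp's shape visible. This is a standalone copy of resizeStamp with the JDK's constants (the class name `StampDemo` is mine):

```java
// Standalone copy of resizeStamp() with RESIZE_STAMP_BITS = 16.
public class StampDemo {
    static final int RESIZE_STAMP_BITS = 16;
    static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;

    static int resizeStamp(int n) {
        return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
    }

    public static void main(String[] args) {
        int rs = resizeStamp(16);
        // 16 has 27 leading zeros; 27 | 0x8000 = 0x801b
        System.out.println(Integer.toHexString(rs));                // 801b
        // Shifted into the high half (+2 for the first resizer), sizeCtl goes negative:
        int sizeCtl = (rs << RESIZE_STAMP_SHIFT) + 2;
        System.out.println(sizeCtl < 0);                            // true
        // And the stamp can be recovered from sizeCtl's high half:
        System.out.println((sizeCtl >>> RESIZE_STAMP_SHIFT) == rs); // true
    }
}
```

This is exactly the check helpTransfer performs: `(sc >>> RESIZE_STAMP_SHIFT) != rs` detects a sizeCtl value that belongs to a different capacity, i.e. a different resize generation.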
transfer: the heart of the resize.
```java
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE;
    if (nextTab == null) {
        try {
            @SuppressWarnings("unchecked")
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        nextTable = nextTab;
        transferIndex = n;
    }
    int nextn = nextTab.length;
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    boolean advance = true;
    boolean finishing = false;
    for (int i = 0, bound = 0;;) {
        Node<K,V> f; int fh;
        while (advance) {
            int nextIndex, nextBound;
            if (--i >= bound || finishing)
                advance = false;
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
            else if (U.compareAndSwapInt
                     (this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ?
                                   nextIndex - stride : 0))) {
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        }
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            if (finishing) {
                nextTable = null;
                table = nextTab;
                sizeCtl = (n << 1) - (n >>> 1);
                return;
            }
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    return;
                finishing = advance = true;
                i = n;
            }
        }
        else if ((f = tabAt(tab, i)) == null)
            advance = casTabAt(tab, i, null, fwd);
        else if ((fh = f.hash) == MOVED)
            advance = true;
        else {
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    Node<K,V> ln, hn;
                    if (fh >= 0) {
                        int runBit = fh & n;
                        Node<K,V> lastRun = f;
                        for (Node<K,V> p = f.next; p != null; p = p.next) {
                            int b = p.hash & n;
                            if (b != runBit) {
                                runBit = b;
                                lastRun = p;
                            }
                        }
                        if (runBit == 0) {
                            ln = lastRun;
                            hn = null;
                        }
                        else {
                            hn = lastRun;
                            ln = null;
                        }
                        for (Node<K,V> p = f; p != lastRun; p = p.next) {
                            int ph = p.hash; K pk = p.key; V pv = p.val;
                            if ((ph & n) == 0)
                                ln = new Node<K,V>(ph, pk, pv, ln);
                            else
                                hn = new Node<K,V>(ph, pk, pv, hn);
                        }
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                    else if (f instanceof TreeBin) {
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> lo = null, loTail = null;
                        TreeNode<K,V> hi = null, hiTail = null;
                        int lc = 0, hc = 0;
                        for (Node<K,V> e = t.first; e != null; e = e.next) {
                            int h = e.hash;
                            TreeNode<K,V> p = new TreeNode<K,V>
                                (h, e.key, e.val, null, null);
                            if ((h & n) == 0) {
                                if ((p.prev = loTail) == null)
                                    lo = p;
                                else
                                    loTail.next = p;
                                loTail = p;
                                ++lc;
                            }
                            else {
                                if ((p.prev = hiTail) == null)
                                    hi = p;
                                else
                                    hiTail.next = p;
                                hiTail = p;
                                ++hc;
                            }
                        }
                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                            (hc != 0) ? new TreeBin<K,V>(lo) : t;
                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                            (lc != 0) ? new TreeBin<K,V>(hi) : t;
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                }
            }
        }
    }
}
```
2. Notes

2.1 Some questions first

Q1) Is there a cap on the number of helper threads? Does entering helpTransfer always mean helping?

A: No and no. helpTransfer runs a series of checks to decide whether the current thread may join the resize:
whether a resize is actually in progress (the bin head is a ForwardingNode, nextTable is set, and sizeCtl is negative with a matching stamp);
whether there is still work left to claim (transferIndex > 0);
and whether the number of resizing threads has already reached the limit (MAX_RESIZERS).
Only when every check passes does the thread CAS sizeCtl up by one and call transfer to do actual migration work.
Q2) With several threads resizing at once, how is it guaranteed that no bin is migrated twice and none is skipped?

A:
Work is handed out through the shared transferIndex field, which starts at the old capacity n.
Each thread that enters transfer claims a block by CASing transferIndex downward by stride (at least MIN_TRANSFER_STRIDE slots); a successful CAS gives it exclusive ownership of the bin range [transferIndex - stride, transferIndex - 1], which it then migrates from high index to low.
A thread only touches bins inside ranges it has claimed; when it finishes one range, it tries to claim another, until transferIndex has dropped to 0 and all bins have been handed out.
In other words, the table is carved into blocks, and because each block is claimed by exactly one successful CAS, different threads always work on disjoint blocks: no overlap and no gaps.
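The claiming protocol can be simulated with an AtomicInteger playing the role of transferIndex. This is a hypothetical sketch of my own, run single-threaded so the claimed ranges are deterministic, but the CAS loop is the same shape as the one inside transfer:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Simulates how resizing threads carve the old table into disjoint ranges
// by CASing a shared transferIndex downward by `stride`.
public class RangeClaimDemo {
    static List<int[]> claimAll(int n, int stride) {
        AtomicInteger transferIndex = new AtomicInteger(n);
        List<int[]> ranges = new ArrayList<>();
        for (;;) {
            int nextIndex = transferIndex.get();
            if (nextIndex <= 0)
                break;                               // nothing left to claim
            int nextBound = (nextIndex > stride) ? nextIndex - stride : 0;
            if (transferIndex.compareAndSet(nextIndex, nextBound))
                // the claimer now exclusively owns bins [nextBound, nextIndex - 1]
                ranges.add(new int[]{nextBound, nextIndex - 1});
        }
        return ranges;
    }

    public static void main(String[] args) {
        // A 32-slot table claimed in strides of 16: [16, 31] first, then [0, 15].
        for (int[] r : claimAll(32, 16))
            System.out.println(r[0] + ".." + r[1]);
    }
}
```

With multiple threads, a failed compareAndSet simply means another thread claimed that block first; the loser re-reads transferIndex and tries for the next one, which is why no block is ever handed out twice.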
Q3) What if other threads run insert/delete/update/get operations during a resize?

A: While a bin is actively being copied, its head node is held under synchronized, so writers targeting that bin block until the lock is released.
Once a bin has been fully migrated, its slot in the old table is replaced with a ForwardingNode (fwd), whose hash is MOVED, i.e. -1.
If a put lands on such a bin, then as analyzed earlier the thread is drafted into the resize via helpTransfer: it cannot complete the put against the old table, so it helps finish the migration and then retries the insert against the new table.
If get is called instead, the negative hash sends it into the node's find method; because the node is a ForwardingNode, find follows its nextTable pointer and looks the key up in the new array.
So during a resize, a bin that is mid-copy is locked against writers, while reads never block;
and once a bin has been migrated, get works immediately through the forwarding node, whereas a put must first help the resize finish before it can succeed.
Q4) How do threads tell "resize in progress" from "resize finished"?

As shown above, while migration is in progress every finished bin holds a fwd node, and sizeCtl stays negative.
Once the whole migration completes, table is swapped to the new array, nextTable is cleared, and sizeCtl is reset to the new threshold; from then on operations behave exactly as before, with no special state left to check.
2.2 The migration flow in summary

A thread enters helpTransfer, which decides whether it may join the migration crew.
Threads that qualify call transfer to take part in the resize.
transfer assigns each participating thread a contiguous block of bins; that block is the range the thread migrates.
Within its block, the thread locks each non-empty bin head and moves the nodes over, splitting every bin into a low part (new index i) and a high part (new index i + n).
After a bin has been migrated, a fwd node is placed in its old slot.
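The low/high split that transfer performs on each bin comes down to a single bit: when the capacity doubles from n to 2n, a key either stays at its old index i or moves to i + n, decided by (hash & n). A minimal check (the class name is mine):

```java
// When the table doubles from n to 2n slots, a node at old index i = hash & (n - 1)
// lands at i (if (hash & n) == 0) or at i + n (otherwise) in the new table.
public class SplitDemo {
    static int newIndex(int hash, int n) {
        int i = hash & (n - 1);               // old index
        return ((hash & n) == 0) ? i : i + n; // the extra bit decides low vs high
    }

    public static void main(String[] args) {
        int n = 16;
        // All four hashes collide at old index 5; after doubling
        // they spread over indices 5 and 21 (= 5 + 16).
        for (int hash : new int[]{5, 21, 37, 53})
            System.out.println(hash + " -> " + newIndex(hash, n));
    }
}
```

This is also why no rehashing is needed during a resize: the new index always equals hash & (2n - 1), so it can be computed from the old index plus one bit of the hash.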
VI. Element counting

1. Source

```java
private final void addCount(long x, int check) {
    CounterCell[] as; long b, s;
    if ((as = counterCells) != null ||
        !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
        CounterCell a; long v; int m;
        boolean uncontended = true;
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
            !(uncontended =
              U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
            fullAddCount(x, uncontended);
            return;
        }
        if (check <= 1)
            return;
        s = sumCount();
    }
    if (check >= 0) {
        Node<K,V>[] tab, nt; int n, sc;
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
               (n = tab.length) < MAXIMUM_CAPACITY) {
            int rs = resizeStamp(n);
            if (sc < 0) {
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);
            s = sumCount();
        }
    }
}
```
```java
private final void fullAddCount(long x, boolean wasUncontended) {
    int h;
    if ((h = ThreadLocalRandom.getProbe()) == 0) {
        ThreadLocalRandom.localInit();
        h = ThreadLocalRandom.getProbe();
        wasUncontended = true;
    }
    boolean collide = false;
    for (;;) {
        CounterCell[] as; CounterCell a; int n; long v;
        if ((as = counterCells) != null && (n = as.length) > 0) {
            if ((a = as[(n - 1) & h]) == null) {
                if (cellsBusy == 0) {
                    CounterCell r = new CounterCell(x);
                    if (cellsBusy == 0 &&
                        U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                        boolean created = false;
                        try {
                            CounterCell[] rs; int m, j;
                            if ((rs = counterCells) != null &&
                                (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) {
                                rs[j] = r;
                                created = true;
                            }
                        } finally {
                            cellsBusy = 0;
                        }
                        if (created)
                            break;
                        continue;
                    }
                }
                collide = false;
            }
            else if (!wasUncontended)
                wasUncontended = true;
            else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
                break;
            else if (counterCells != as || n >= NCPU)
                collide = false;
            else if (!collide)
                collide = true;
            else if (cellsBusy == 0 &&
                     U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                try {
                    if (counterCells == as) {
                        CounterCell[] rs = new CounterCell[n << 1];
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i];
                        counterCells = rs;
                    }
                } finally {
                    cellsBusy = 0;
                }
                collide = false;
                continue;
            }
            h = ThreadLocalRandom.advanceProbe(h);
        }
        else if (cellsBusy == 0 && counterCells == as &&
                 U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
            boolean init = false;
            try {
                if (counterCells == as) {
                    CounterCell[] rs = new CounterCell[2];
                    rs[h & 1] = new CounterCell(x);
                    counterCells = rs;
                    init = true;
                }
            } finally {
                cellsBusy = 0;
            }
            if (init)
                break;
        }
        else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
            break;
    }
}
```
2. Notes

In JDK 1.8, because many threads may update the map concurrently, the element count is no longer kept in one shared variable; it is striped across baseCount plus an array of CounterCell objects, the same idea as LongAdder.
After an element is put, addCount is called to bump the count.
It first tries a single CAS on baseCount; a failed CAS is taken as a sign of heavy contention.
Under contention the thread falls back to the CounterCell array, entering fullAddCount when that fast path fails as well.
If the CounterCell array does not exist yet, it is created with length 2 and the increment x is placed into a cell chosen by the thread's random probe value.
If the cell CAS keeps failing and the array is still smaller than the number of CPUs, a new array of twice the capacity is allocated,
the old cells are carried over, and the add is retried.
The total size is then baseCount plus the sum of all cells, which is what sumCount() computes.
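The striping idea can be condensed into a small standalone counter. This is an illustrative sketch of my own with a fixed cell array: unlike the real CounterCell machinery it never grows and has no cellsBusy spin-lock, but the base-then-cell fallback is the same:

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongArray;

// Condensed LongAdder-style striped counter: try the base first,
// fall back to a per-thread cell when the base CAS fails under contention.
public class StripedCounter {
    private final AtomicLong baseCount = new AtomicLong();
    private final AtomicLongArray cells = new AtomicLongArray(8); // fixed size, unlike CHM

    public void add(long x) {
        long b = baseCount.get();
        if (!baseCount.compareAndSet(b, b + x)) {
            // contended: spread the update over a cell picked by the thread id
            int idx = (int) (Thread.currentThread().getId() & (cells.length() - 1));
            cells.addAndGet(idx, x);
        }
    }

    // Like sumCount(): base plus all cells. Under concurrent updates this is
    // a moving snapshot, not an exact instantaneous value.
    public long sum() {
        long s = baseCount.get();
        for (int i = 0; i < cells.length(); ++i)
            s += cells.get(i);
        return s;
    }

    public static void main(String[] args) {
        StripedCounter c = new StripedCounter();
        for (int i = 0; i < 1000; i++) c.add(1);
        System.out.println(c.sum());
    }
}
```

The point of the stripes is that contending threads CAS different memory words instead of all hammering one counter; the cost is that reading the total requires summing, and the sum is only a snapshot, which is exactly the documented weakly consistent behavior of ConcurrentHashMap.size().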
Summary

ConcurrentHashMap leans heavily on CAS combined with fine-grained per-bin locking to stay thread-safe. Resizing and element counting are the genuinely intricate parts; the main flow of putVal itself is quite clear.