一、引言

1.随便说说

ConcurrentHashMap在jdk1.8之前,即1.7的时候,采用的是分段锁+数组+链表的技术

但是这样会存在一些问题,如下:

1.分段锁存在性能瓶颈,不能支持很高的并发量

2.链表解决冲突,在极端情况下会导致链表过长查询性能下降

因此,本文主要介绍jdk1.8实现的ConcurrentHashMap源码

此外,像哈希表中的get,set,size等方法过于简单,此处不会进行分析

仅对put,扩容,计算元素数量做详细介绍

2.简单比较

相比1.7版本的ConcurrentHashMap,1.8版本的ConcurrentHashMap进行了一次很好的优化,主要在以下方面:

1.分段锁 -> CAS

不再采用分段锁技术,插入时仅锁某一个槽,并且使用CAS进行元素操作,极大的提升了并发性能

2.数组+链表 -> 数组+链表+红黑树

引入红黑树解决哈希冲突,提升了插入和查询效率

二、参数解析

1.容量相关参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/**
* 容器最大容量
*/
private static final int MAXIMUM_CAPACITY = 1 << 30;

/**
* 默认初始化容量
*/
private static final int DEFAULT_CAPACITY = 16;

/**
* 数组最大长度
*/
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

/**
* 默认并发级别
*/
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;

/**
* 负载因子,用于计算扩容阈值
*/
private static final float LOAD_FACTOR = 0.75f;

/**
* 链表树化阈值
*/
static final int TREEIFY_THRESHOLD = 8;

/**
* 树退化为链表阈值
*/
static final int UNTREEIFY_THRESHOLD = 6;

/**
* 链表树化时,数组最小长度阈值
*/
static final int MIN_TREEIFY_CAPACITY = 64;

2.扩容相关参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* 表示resizeStamp中记录扩容状态和版本号信息所占用的位数
**/
private static int RESIZE_STAMP_BITS = 16;

/**
* 能帮助扩容的最大线程数量
*/
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;

/**
* 配合resizeStamp记录扩容状态和版本号
*/
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;

3.状态相关参数

1
2
3
4
static final int MOVED     = -1; // 表示该槽位元素已迁移
static final int TREEBIN = -2; // 表示该槽位元素已树化
static final int RESERVED = -3; // 槽位已占有,但未分配
static final int HASH_BITS = 0x7fffffff; // 用于计算元素哈希槽位置

4.Node结构

1
2
3
4
5
6
7
8
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
......
......
}

保存在槽与链表中的节点数据结构

5.工具方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 转换哈希码,使其尽量均匀
static final int spread(int h) {
// 哈希码右移16位与自己异或散列后,再和HASH_BITS与运算再次散列
return (h ^ (h >>> 16)) & HASH_BITS;
}

// 主要作用是根据给定的初始容量,计算出一个大于等于该初始容量的2的整数次幂值
private static final int tableSizeFor(int c) {
int n = c - 1;
n |= n >>> 1;
n |= n >>> 2;
n |= n >>> 4;
n |= n >>> 8;
n |= n >>> 16;
return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}

// 比较两个元素大小关系
@SuppressWarnings({"rawtypes","unchecked"}) // for cast to Comparable
static int compareComparables(Class<?> kc, Object k, Object x) {
return (x == null || x.getClass() != kc ? 0 :
((Comparable)k).compareTo(x));
}

6.元素操作方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 获取数组对应位置头元素
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}

// casTabAt方法的作用是将数组tab中下标为i的节点的值从c更新为v
// 但只有在节点值为c时才会更新成功
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}

// setTabAt方法的作用是将数组tab中下标为i的节点的值直接设置为v
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}

7.全局变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 数组,即数组+链表/红黑树中的数组
transient volatile Node<K,V>[] table;

// 扩容时的新数组
private transient volatile Node<K,V>[] nextTable;

// 使用cas修改指,统计元素数量
private transient volatile long baseCount;

// 状态机,用于标识当前哈希表状态---十分重要
// sizeCtl == -N(N > 1)表示正在扩容,已经有N个线程正在参与扩容操作
// sizeCtl == -1 表示正在初始化
// sizeCtl == 0 表示当前哈希表没有被初始化,初始化后为 初始容量 * 0.75
// sizeCtl > 0 由上面可知,表示扩容阈值
private transient volatile int sizeCtl;

// 指定了当前线程在进行扩容操作时,需要转移元素的下标位置
// 由spread方法和当前线程的线程ID计算得出的
private transient volatile int transferIndex;

// 用于标识当前是否有线程正在对ConcurrentHashMap进行扩容操作
private transient volatile int cellsBusy;

// 用该数组记录容器元素数,减少冲突
private transient volatile CounterCell[] counterCells;

三、put操作

1.源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
// 入口方法,实际上是调用putVal实现的put操作
public V put(K key, V value) {
return putVal(key, value, false);
}

/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
// ConcurrentHashMap不支持key和value为null的情况,防止出现二义性
// 比如某个key本来不存在,get后是null
// 但是使用containsKey判断时,另一个线程插入了key为null,就出现二义性
if (key == null || value == null) throw new NullPointerException();
// 获取散列后的hash值
int hash = spread(key.hashCode());
// 统计槽内元素数量,判断要不要树化
int binCount = 0;
// 死循环,不搞进去不结束
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 发现还未初始化,进行数组初始化
if (tab == null || (n = tab.length) == 0)
tab = initTable();
// 已初始化,对应槽是空的,直接放进去
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
// 已初始化,对应槽非空,但是正在进行扩容,于是线程也加入扩容大军
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
// 已初始化,对应槽非空,也没在扩容,那就进去扩容
else {
V oldVal = null;
// 锁住这个槽,插入后再释放
synchronized (f) {
// 确认下f没被其他线程干掉,有点儿双重校验锁的味道~
if (tabAt(tab, i) == f) {
// 节点哈希值为负数就不能用链表了,非负才走链表逻辑
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
// key相等,覆盖
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
// 走到尾部了,尾插放进去
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
// 走树的插入逻辑
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
// 有插入成功元素
if (binCount != 0) {
// 达到树化的链表阈值
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
// 统计哈希表元素数量,并且判断是否扩容
addCount(1L, binCount);
return null;
}

2.小结

通过上述流程,可以总结出来put流程大体如下

1.判空

2.启用一个死循环,直到binCount != 0为止,即插入成功

3.是否初始化,未初始化先调用初始化方法

4.判断槽是否空,空的直接放进去

5.槽不空,判断是否在扩容中,扩容中就加入扩容队伍

6.槽不空,未在扩容,那就锁住这个槽执行插入

7.判断槽中是链表还是树,执行向下走的流程

8.key是否存在?存在就覆盖

9.key不存在,那就插入,链表是尾插

10.如果key存在采用的覆盖,就返回旧值

11.元素统计加上

四、初始化

1.源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
// 未被初始化
while ((tab = table) == null || tab.length == 0) {
// 已有其他线程扩容中,先尝试放弃cpu给其他线程用
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
// 尝试将SIZECTL的值从sc修改为-1,一般是0到-1
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
// 再判断一次,tab还没被其他线程初始化,就进行初始化
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);
}
} finally {
// 初始化后,sizeCtl设置为扩容阈值
sizeCtl = sc;
}
break;
}
}
return tab;
}

2.小结

1.启动一个循环进行初始化

2.使用sizeCtl控制初始化状态

3.如果sizeCtl为负值,就是在初始化/扩容中,当前线程临时歇着就行

4.sizeCtl大于等于0,尝试将sizeCtl设置为-1,表示初始化中

5.再判断一次tab是否为空,防止其他线程已经初始化

6.初始化,将sizeCtl设置为扩容阈值

三、扩容流程

1.源码

helpTransfer线程入场,加入扩容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
// tab != null表示旧数组不为null,还在扩容中
// ForwardingNode类型表示当前节点正在进行扩容操作
// nextTab = ((ForwardingNode<K,V>)f).nextTable取出新数组,且不为空
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
// 扩容戳,高16位表示扩容次数,低16位表示线程数
int rs = resizeStamp(tab.length);
// 新旧数组都对应的上,sizeCtl小于0表示在扩容中
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
// 判断是否还需要帮助扩容
// (sc >>> RESIZE_STAMP_SHIFT) != rs扩容戳变化
// 表示当前有其他线程正在进行扩容操作,当前线程无法参与扩容操作
// 表示扩容操作已经有足够的线程参与,当前线程无需参与扩容操作
// 表示当前线程已经没有数据需要移动
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
// 使用cas设置sc的值,表示参入扩容线程数+1,注意sc在上一个if中被修改了
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}

resizeStamp获得扩容戳

n 是当前哈希表的容量,RESIZE_STAMP_BITS 是一个常量,表示扩容戳的长度。该方法首先通过 Integer.numberOfLeadingZeros(n) 方法计算出哈希表容量的前导 0 的个数,然后将该值左移 RESIZE_STAMP_BITS - 1 位,最后通过按位或操作将扩容戳的高位设置为 1。这样就可以得到一个长度为 RESIZE_STAMP_BITS 的扩容戳,其中高位为 1,低位为当前哈希表的容量的前导 0 的个数

1
2
3
static final int resizeStamp(int n) {
return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}

扩容的灵魂—transfer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
// 计算每个线程处理的数据的区间大小,最小是16
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
// 新数组为空,还未创建,进行初始化,在原来的基础上扩大两倍
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
// 用来表示已经迁移完的状态,也就是说
// 如果某个old数组的节点完成了迁移,则需要更改成fwd
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
// 计算负责迁移的区间范围,从后往前数
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
// 假设数组长度是32
// 第一次 [16(nextBound),31(i)]
// 第二次 [0,15]
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
// 扩容是否结束
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
// 得到数组下标为31的位置的值
else if ((f = tabAt(tab, i)) == null) // 说明当前数组位置为空
// 直接改成fwd -> 表示迁移完成
advance = casTabAt(tab, i, null, fwd);
// 判断是否已经被处理过了,如果是,则进入下一次区间遍历
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
// 加锁->针对当前要去迁移的节点
// 保证迁移过程中,其他线程调用put()方法时,必须要等待
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
// 链表与树分开处理
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
// 高低位分开迁移
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
// 高低位分开迁移
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}

2.小结

2.1首先解决一些疑问

1)线程可以辅助扩容,有数量上限吗?扔进helpTransfer就会进行辅助扩容吗?

答:在helpTransfer中,会判断当前线程是否能够加入扩容操作

这里会有一系列判断

判断当前是否处于扩容状态中

判断当前是否还需要辅助扩容

判断当前参与扩容的线程数量是否达到了上限等

全部校验通过,才会调用transfer方法进行实际上的扩容操作

2)多个线程同时扩容,如何保证扩容过程中,数据迁移不重复,不迁移漏的?

答:

在transfer方法中,线程进入该方法后会通过计算得出一个transferIndex值,这个值指向自己负责扩容的数组数据结束位置,从后往前进行迁移;

而这个线程也只负责一段连续区域内的数据迁移任务而已,它只会执行自己负责区域内的数据迁移任务,迁移完成之后把transferIndex设置成一个负数,表示迁移完成;

如果没有其他线程进入,这个线程会再获得一个迁移范围,从而进行迁移,直到全部迁移完毕;

即使用分块的模式,让不同的线程负责不同的块,从而避免了扩容区间重叠,遗漏等问题

3)在扩容时,有其他线程执行插入/删除/修改/查询等操作怎么办?

答:在原数组中,迁移过程中,槽都会被锁住,其他线程无法操作,只能阻塞在那里,等待释放锁后再操作

而当一个线程迁移完一段区域内的数据之后,迁移完成的槽,首位置节点都会设置成fwd,这个节点的hash值被设置成了MOVED,即-1

因此如果有put进来,根据前面分析,会直接把线程扔给辅助扩容数组,无法put成功,只能等待所有数据都迁移完毕

而如果调用了get方法呢?由于此时hash值小于0,会进入节点的find方法内部

此时,由于节点类型为fwd,所以进入的find方法会进入新建立的nextTable中找数据

因此扩容中,槽上锁,get和put操作都不行

但是某一个槽迁移完毕后,可以get,不可以put

4)如何判断是在扩容状态还是扩容完毕状态的?

由上面可知,迁移部分时,迁移完毕的槽会放进去一个fwd节点

全部迁移完毕后就是一个新的数组了,和之前操作没区别,不用判断状态

2.2总结下迁移流程
  • 进入helpTransfer方法,判断当前线程是否能够进入搬迁大军当中
  • 符合条件的线程,会调用transfer方法进行扩容
  • 扩容时,会为每个线程分配一块儿区域,这个区域就是线程要进行迁移的范围
  • 然后就是加锁搬迁
  • 迁移完毕之后,原数组槽位置放进去一个fwd

四、元素数量统计

1.源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
// 统计元素个数
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
// 会完成CounterCell的初始化以及元素的累加
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
// 是否要做扩容
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
// helpTransfer -> 第一次扩容的场景
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
private final void fullAddCount(long x, boolean wasUncontended) {
int h;
if ((h = ThreadLocalRandom.getProbe()) == 0) {
ThreadLocalRandom.localInit(); // force initialization
h = ThreadLocalRandom.getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) {
CounterCell[] as; CounterCell a; int n; long v;
if ((as = counterCells) != null && (n = as.length) > 0) {
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // Try to attach new Cell
CounterCell r = new CounterCell(x); // Optimistic create
if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean created = false;
try { // Recheck under lock
// 针对已经初始化的数组的某个位置
// 去添加一个CounterCell
CounterCell[] rs; int m, j;
if ((rs = counterCells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
break;
else if (counterCells != as || n >= NCPU)
collide = false; // At max size or stale
else if (!collide)
collide = true;
// 扩容部分
else if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
try {
if (counterCells == as) {// Expand table unless stale
// 一倍扩容
CounterCell[] rs = new CounterCell[n << 1];
// 遍历加入新数组
for (int i = 0; i < n; ++i)
rs[i] = as[i];
counterCells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = ThreadLocalRandom.advanceProbe(h);
}
// 如果CounterCell为空, 保证在初始化过程的线程安全性
else if (cellsBusy == 0 && counterCells == as &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
//一旦cas成功,说明当前线程抢到了锁
boolean init = false;
try { // Initialize table
if (counterCells == as) {
// 初始化长度为2的数组
CounterCell[] rs = new CounterCell[2];
// 把x保存到某个位置
rs[h & 1] = new CounterCell(x);
// 复制给成员变量counterCells
counterCells = rs;
init = true;
}
} finally {
// 释放锁
cellsBusy = 0;
}
if (init)
break;
}
// 最终的情况,直接修改baseCount
else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
break; // Fall back on using base
}
}

2.小结

在jdk1.8中,由于存在多线程操作的情况,因此不再使用一个单独的变量来计算map中的元素数量,而是使用了一个数组CounterCell来进行统计

当put进一个元素后,会调用addCount方法来进行元素数量的增加

  • 首先会尝试对baseCount进行自增,当新增失败后表示竞争激烈
  • 此时会使用CounterCell来进行元素统计,进入fullAddCount
  • 如果CounterCell未初始化,则进行初始化为2,并把新增元素数量x随机放到一个位置
  • 如果此时还无法放进去,即没有空闲的CounterCell,那么就会新建一个CounterCell,容量是2倍
  • 然后将旧的都迁移过去,再尝试把x放进去

大致流程如上

总结

ConcurrentHashMap使用了大量的CAS锁来保证并发安全,主要是扩容与元素统计略显复杂,putVal的主流程还是十分清晰的