摘要: 原创出处 blog.csdn.net/qq_41737716/article/details/90549847 「Static_lin」欢迎转载,保留摘要,谢谢!
前言
阅读此篇文章,你需要有以下知识基础
Java内存模型,可见性问题
CAS
HashMap底层原理
我们知道,在日常开发中使用的HashMap是线程不安全的,而线程安全类HashTable只是简单的在方法上加锁实现线程安全,效率低下,所以在线程安全的环境下我们通常会使用ConcurrentHashMap,但是又为何需要学习ConcurrentHashMap?用不就完事了?我认为学习其源码有两个好处:
更灵活的运用ConcurrentHashMap
欣赏并发编程大师Doug Lea的作品,源码中有很多值得我们学习的并发思想,要意识到,线程安全不仅仅只是加锁
我抛出以下问题:
接下来,带着问题来继续看下去,欣赏并发大师精妙绝伦的并发艺术作品(以下讨论基于JDK1.8)
相关概念
Amdahl定律
此节定律描述均来自《Java并发编程实战》一书
假设F是必须被串行执行的部分,N代表处理器数量,Speedup代表加速比,可以简单理解为CPU使用率
此公式告诉我们,当N趋近无限大,加速比最大趋近于1/F,假设我们的程序有50%的部分需要串行执行,就算处理器数量无限多,最高的加速比只能是2(20%的使用率),如果程序中仅有10%的部分需要串行执行,最高的加速比可以达到9.2(92%的使用率),但我们的程序或多或少都一定会有串行执行的部分,所以F不可能为0,所以,就算有无限多的CPU,加速比也不可能达到10(100%的使用率),下面给一张图来表示串行执行部分占比不同对利用率的影响:
由此我们可以看出,程序中的可伸缩性(提升外部资源即可提升并发性能的比率)是由程序中串行执行部分所影响的,而常见的串行执行有锁竞争(上下文切换消耗、等待、串行)等等,这给了我们一个启发,可以通过减少锁竞争来优化并发性能,而ConcurrentHashMap则使用了锁分段(减小锁范围)、CAS(乐观锁,减小上下文切换开销,无阻塞)等等技术,下面来具体看看吧
初始化数据结构时的线程安全
HashMap的底层数据结构这里简单带过一下,不做过多赘述:
大致是以一个Node对象数组来存放数据,Hash冲突时会形成Node链表,在链表长度超过8,Node数组超过64时会将链表结构转换为红黑树,Node对象:
static class Node <K ,V > implements Map .Entry <K ,V > { final int hash; final K key; volatile V val; volatile Node<K,V> next; ... }
值得注意的是,value和next指针使用了volatile来保证其可见性
在JDK1.8中,初始化ConcurrentHashMap的时候这个Node[]
数组是还未初始化的,会等到第一次put方法调用时才初始化:
final V putVal (K key, V value, boolean onlyIfAbsent) { if (key == null || value == null ) throw new NullPointerException(); int hash = spread(key.hashCode()); int binCount = 0 ; for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0 ) tab = initTable(); ... }
此时是会有并发问题的,如果多个线程同时调用initTable初始化Node数组怎么办?看看大师是如何处理的:
private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0 ) { if ((sc = sizeCtl) < 0 ) Thread.yield(); else if (U.compareAndSwapInt(this , SIZECTL, sc, -1 )) { try { if ((tab = table) == null || tab.length == 0 ) { int n = (sc > 0 ) ? sc : DEFAULT_CAPACITY; @SuppressWarnings ("unchecked" ) Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = tab = nt; sc = n - (n >>> 2 ); } } finally { sizeCtl = sc; } break ; } } return tab; }
table变量使用了volatile来保证每次获取到的都是最新写入的值
transient volatile Node<K,V>[] table;
总结
就算有多个线程同时进行put操作,在初始化数组时使用了乐观锁CAS操作来决定到底是哪个线程有资格进行初始化,其他线程均只能等待。
用到的并发技巧:
volatile变量(sizeCtl):它是一个标记位,用来告诉其他线程这个坑位有没有人在,其线程间的可见性由volatile保证。
CAS操作:CAS操作保证了设置sizeCtl标记位的原子性,保证了只有一个线程能设置成功
put操作的线程安全
直接看代码:
final V putVal (K key, V value, boolean onlyIfAbsent) { if (key == null || value == null ) throw new NullPointerException(); int hash = spread(key.hashCode()); int binCount = 0 ; for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0 ) tab = initTable(); else if ((f = tabAt(tab, i = (n - 1 ) & hash)) == null ) { if (casTabAt(tab, i, null , new Node<K,V>(hash, key, value, null ))) break ; } else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null ; synchronized (f) { if (tabAt(tab, i) == f) { if (fh >= 0 ) { binCount = 1 ; for (Node<K,V> e = f;; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break ; } Node<K,V> pred = e; if ((e = e.next) == null ) { pred.next = new Node<K,V>(hash, key, value, null ); break ; } } } ... }
值得关注的是tabAt(tab, i)方法,其使用Unsafe类volatile的操作volatile式地查看值,保证每次获取到的值都是最新的:
static final <K,V> Node<K,V> tabAt (Node<K,V>[] tab, int i) { return (Node<K,V>)U.getObjectVolatile(tab, ((long )i << ASHIFT) + ABASE); }
虽然上面的table变量加了volatile,但也只能保证其引用的可见性,并不能确保其数组中的对象是否是最新的,所以需要Unsafe类volatile式地拿到最新的Node
总结
由于其减小了锁的粒度,若Hash完美不冲突的情况下,可同时支持n个线程同时put操作,n为Node数组大小,在默认大小16下,可以支持最大同时16个线程无竞争同时操作且线程安全。当hash冲突严重时,Node链表越来越长,将导致严重的锁竞争,此时会进行扩容,将Node进行再散列,下面会介绍扩容的线程安全性。总结一下用到的并发技巧:
减小锁粒度:将Node链表的头节点作为锁,若在默认大小16情况下,将有16把锁,大大减小了锁竞争(上下文切换),就像开头所说,将串行的部分最大化缩小,在理想情况下线程的put操作都为并行操作。同时直接锁住头节点,保证了线程安全
Unsafe的getObjectVolatile方法:此方法确保获取到的值为最新
扩容操作的线程安全
在扩容时,ConcurrentHashMap支持多线程并发扩容,在扩容过程中同时支持get查数据,若有线程put数据,还会帮助一起扩容,这种无阻塞算法,将并行最大化的设计,堪称一绝。
先来看看扩容代码实现:
private final void transfer (Node<K,V>[] tab, Node<K,V>[] nextTab) { int n = tab.length, stride; if ((stride = (NCPU > 1 ) ? (n >>> 3 ) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; if (nextTab == null ) { try { @SuppressWarnings ("unchecked" ) Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1 ]; nextTab = nt; } catch (Throwable ex) { sizeCtl = Integer.MAX_VALUE; return ; } nextTable = nextTab; transferIndex = n; } int nextn = nextTab.length; ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); boolean advance = true ; boolean finishing = false ; for (int i = 0 , bound = 0 ;;) { Node<K,V> f; int fh; while (advance) { int nextIndex, nextBound; if (--i >= bound || finishing) advance = false ; else if ((nextIndex = transferIndex) <= 0 ) { i = -1 ; advance = false ; } else if (U.compareAndSwapInt (this , TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0 ))) { bound = nextBound; i = nextIndex - 1 ; advance = false ; } } if (i < 0 || i >= n || i + n >= nextn) { int sc; if (finishing) { nextTable = null ; table = nextTab; sizeCtl = (n << 1 ) - (n >>> 1 ); return ; } if (U.compareAndSwapInt(this , SIZECTL, sc = sizeCtl, sc - 1 )) { if ((sc - 2 ) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return ; finishing = advance = true ; i = n; } } else if ((f = tabAt(tab, i)) == null ) advance = casTabAt(tab, i, null , fwd); else if ((fh = f.hash) == MOVED) advance = true ; else { synchronized (f) { if (tabAt(tab, i) == f) { Node<K,V> ln, hn; if (fh >= 0 ) { int runBit = fh & n; Node<K,V> lastRun = f; for (Node<K,V> p = f.next; p != null ; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } if (runBit == 0 ) { ln = lastRun; hn = null ; } else { hn = lastRun; ln = null ; } for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0 ) ln = new Node<K,V>(ph, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); } setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true ; } .... } } } } }
这里说一下迁移时为什么要分一个ln(低位Node)、hn(高位Node),首先说一个现象:
我们知道,在put值的时候,首先会计算hash值,再散列到指定的Node数组下标中:
int hash = spread(key.hashCode());(f = tabAt(tab, i = (n - 1 ) & hash);
其中n为Node数组长度,这里假设为16。
假设有一个key进来,它的散列之后的hash=9,那么它的下标值是多少呢?
(16 - 1)和 9 进行与运算 -> 0000 1111 和 0000 1001 结果还是 0000 1001 = 9
假设Node数组需要扩容,我们知道,扩容是将数组长度增加两倍,也就是32,那么下标值会是多少呢?
(32 - 1)和 9 进行与运算 -> 0001 1111 和 0000 1001 结果还是9
此时,我们把散列之后的hash换成20,那么会有怎样的变化呢?
(16 - 1)和 20 进行与运算 -> 0000 1111 和 0001 0100 结果是 0000 0100 = 4
(32 - 1)和 20 进行与运算 -> 0001 1111 和 0001 0100 结果是 0001 0100 = 20
此时细心的读者应该可以发现,如果hash在高X位为1,(X为数组长度的二进制-1的最高位),则扩容时是需要变换在Node数组中的索引值的,不然就hash不到,丢失数据,所以这里在迁移的时候将高X位为1的Node分类为hn,将高X位为0的Node分类为ln。
回到代码中:
for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0 ) ln = new Node<K,V>(ph, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); }
这个操作将高低位组成了两条链表结构,由下图所示:
然后将其CAS操作放入新的Node数组中:
setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn);
其中,低位链表放入原下标处,而高位链表则需要加上原Node数组长度,其中为什么不多赘述,上面已经举例说明了,这样就可以保证高位Node在迁移到新Node数组中依然可以使用hash算法散列到对应下标的数组中去了。
最后将原Node数组中对应下标Node对象设置为fwd标记Node,表示该节点迁移完成,到这里,一个节点的迁移就完成了,将进行下一个节点的迁移,也就是i-1=14下标的Node节点。
扩容时的get操作
假设Node下标为16的Node节点正在迁移,突然有一个线程进来调用get方法,正好key又散列到下标为16的节点,此时怎么办?
public V get (Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; int h = spread(key.hashCode()); if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1 ) & h)) != null ) { if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } else if (eh < 0 ) return (p = e.find(h, key)) != null ? p.val : null ; while ((e = e.next) != null ) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null ; }
重点看有注释的那两行,在get操作的源码中,会判断Node中的hash是否小于0,是否还记得我们的占位Node,其hash为MOVED,为常量值-1,所以此时判断线程正在迁移,委托给fwd占位Node去查找值:
Node<K,V> find (int h, Object k) { outer: for (Node<K,V>[] tab = nextTable;;) { Node<K,V> e; int n; if (k == null || tab == null || (n = tab.length) == 0 || (e = tabAt(tab, (n - 1 ) & h)) == null ) return null ; for (;;) { int eh; K ek; if ((eh = e.hash) == h && ((ek = e.key) == k || (ek != null && k.equals(ek)))) return e; if (eh < 0 ) { if (e instanceof ForwardingNode) { tab = ((ForwardingNode<K,V>)e).nextTable; continue outer; } else return e.find(h, k); } if ((e = e.next) == null ) return null ; } } }
到这里应该可以恍然大悟了,之所以占位Node需要保存新Node数组的引用也是因为这个,它可以支持在迁移的过程中照样不阻塞地查找值,可谓是精妙绝伦的设计。
多线程协助扩容
在put操作时,假设正在迁移,正好有一个线程进来,想要put值到迁移的Node上,怎么办?
final V putVal (K key, V value, boolean onlyIfAbsent) { if (key == null || value == null ) throw new NullPointerException(); int hash = spread(key.hashCode()); int binCount = 0 ; for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0 ) tab = initTable(); else if ((f = tabAt(tab, i = (n - 1 ) & hash)) == null ) { if (casTabAt(tab, i, null , new Node<K,V>(hash, key, value, null ))) break ; } else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); ... } final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) { Node<K,V>[] nextTab; int sc; if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null ) { int rs = resizeStamp(tab.length); while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0 ) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0 ) break ; if (U.compareAndSwapInt(this , SIZECTL, sc, sc + 1 )) { transfer(tab, nextTab); break ; } } return nextTab; } return table; }
此方法涉及大量复杂的位运算,这里不多赘述,只是简单的说几句,此时sizeCtl变量用来标示HashMap正在扩容,当其准备扩容时,会将sizeCtl设置为一个负数,(例如数组长度为16时)其二进制表示为:
1000 0000 0001 1011 0000 0000 0000 0010
无符号位为1,表示负数。其中高16位代表数组长度的一个位算法标示(有点像epoch的作用,表示当前迁移朝代为数组长度X),低16位表示有几个线程正在做迁移,刚开始为2,接下来自增1,线程迁移完会进行减1操作,也就是如果低十六位为2,代表有一个线程正在迁移,如果为3,代表2个线程正在迁移以此类推…
只要数组长度足够长,就可以同时容纳足够多的线程来一起扩容,最大化并行任务,提高性能。
在什么情况下会进行扩容操作?
在put值时,发现Node为占位Node(fwd)时,会协助扩容
在新增节点后,检测到链表长度大于8时
final V putVal (K key, V value, boolean onlyIfAbsent) { ... if (binCount != 0 ) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null ) return oldVal; break ; } ... }
treeifyBin方法会将链表转换为红黑树,增加查找效率,但在这之前,会检查数组长度,若小于64,则会优先做扩容操作:
private final void treeifyBin (Node<K,V>[] tab, int index) { Node<K,V> b; int n, sc; if (tab != null ) { if ((n = tab.length) < MIN_TREEIFY_CAPACITY) tryPresize(n << 1 ); else if ((b = tabAt(tab, index)) != null && b.hash >= 0 ) { synchronized (b) { } } } }
在每次新增节点之后,都会调用addCount方法,检测Node数组大小是否达到阈值:
final V putVal (K key, V value, boolean onlyIfAbsent) { ... addCount(1L , binCount); return null ; } private final void addCount (long x, int check) { CounterCell[] as; long b, s; if ((as = counterCells) != null || !U.compareAndSwapLong(this , BASECOUNT, b = baseCount, s = b + x)) { } if (check >= 0 ) { Node<K,V>[] tab, nt; int n, sc; while (s >= (long )(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { int rs = resizeStamp(n); if (sc < 0 ) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0 ) break ; if (U.compareAndSwapInt(this , SIZECTL, sc, sc + 1 )) transfer(tab, nt); } else if (U.compareAndSwapInt(this , SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2 )) transfer(tab, null ); s = sumCount(); } } }
总结
ConcurrentHashMap运用各类CAS操作,将扩容操作的并发性能实现最大化,在扩容过程中,就算有线程调用get查询方法,也可以安全的查询数据,若有线程进行put操作,还会协助扩容,利用sizeCtl标记位和各种volatile变量进行CAS操作达到多线程之间的通信、协助,在迁移过程中只锁一个Node节点,即保证了线程安全,又提高了并发性能。
统计容器大小的线程安全
ConcurrentHashMap在每次put操作之后都会调用addCount方法,此方法用于统计容器大小且检测容器大小是否达到阈值,若达到阈值需要进行扩容操作,这在上面也是有提到的。这一节重点讨论容器大小的统计是如何做到线程安全且并发性能不低的。
大部分的单机数据查询优化方案都会降低并发性能,就像缓存的存储,在多线程环境下将有并发问题,所以会产生并行或者一系列并发冲突锁竞争的问题,降低了并发性能。类似的,热点数据也有这样的问题,在多线程并发的过程中,热点数据(频繁被访问的变量)是在每一个线程中几乎或多或少都会访问到的数据,这将增加程序中的串行部分,回忆一下开头所描述的,程序中的串行部分将影响并发的可伸缩性,使并发性能下降,这通常会成为并发程序性能的瓶颈。
而在ConcurrentHashMap中,如何快速的统计容器大小更是一个很重要的议题,因为容器内部需要依靠容器大小来考虑是否需要扩容,而在客户端而言需要调用此方法来知道容器有多少个元素,如果处理不好这种热点数据,并发性能将因为这个短板整体性能下降。
试想一下,如果是你,你会如何设计这种热点数据?是加锁,还是进行CAS操作?进入ConcurrentHashMap中,看看大师是如何巧妙的运用了并发技巧,提高热点数据的并发性能。
先用图的方式来看看大致的实现思路:
@sun .misc.Contended static final class CounterCell { volatile long value; CounterCell(long x) { value = x; } }
这是一个粗略的实现,在设计中,使用了分而治之的思想,将每一个计数都分散到各个countCell对象里面(下面称之为桶),使竞争最小化,又使用了CAS操作,就算有竞争,也可以对失败了的线程进行其他的处理。乐观锁的实现方式与悲观锁不同之处就在于乐观锁可以对竞争失败了的线程进行其他策略的处理,而悲观锁只能等待锁释放,所以这里使用CAS操作对竞争失败的线程做了其他处理,很巧妙的运用了CAS乐观锁。
下面看看具体的代码实现吧:
private final void addCount (long x, int check) { CounterCell[] as; long b, s; if ((as = counterCells) != null || !U.compareAndSwapLong(this , BASECOUNT, b = baseCount, s = b + x)) { CounterCell a; long v; int m; boolean uncontended = true ; if (as == null || (m = as.length - 1 ) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { fullAddCount(x, uncontended); return ; } if (check <= 1 ) return ; s = sumCount(); } ... }
假设当前线程为第一个put的线程
先假设当前Map还未被put数据,则addCount一定没有被调用过,当前线程第一个调用addCount方法,则此时countCell一定没有被初始化,为null,则进行如下判断:
if ((as = counterCells) != null || !U.compareAndSwapLong(this , BASECOUNT, b = baseCount, s = b + x))
这里的if判断一定会走第二个判断,先CAS增加变量baseCount的值:
private transient volatile long baseCount;
这个值有什么用呢?我们看看统计容器大小的方法sumCount:
final long sumCount () { CounterCell[] as = counterCells; CounterCell a; long sum = baseCount; if (as != null ) { for (int i = 0 ; i < as.length; ++i) { if ((a = as[i]) != null ) sum += a.value; } } return sum; }
这个方法的大体思路与我们开头那张图差不多,容器的大小其实是分为两部分,开头只说了计数桶的那部分,其实还有一个baseCount,在线程没有竞争的情况下的统计值,换句话说,在增加容量的时候其实是先去做CAS递增baseCount的。
由此可见,统计容器大小其实是用了两种思路:
CAS方式直接递增:在线程竞争不大的时候,直接使用CAS操作递增baseCount值即可,这里说的竞争不大指的是CAS操作不会失败的情况
分而治之桶计数:若出现了CAS操作失败的情况,则证明此时有线程竞争了,计数方式从CAS方式转变为分而治之的桶计数方式
出现了线程竞争导致CAS失败
此时出现了竞争,则不会再用CAS方式来计数了,直接使用桶方式,从上面的addCount方法可以看出来,此时的countCell是为空的,最终一定会进入fullAddCount方法来进行初始化桶:
private final void fullAddCount (long x, boolean wasUncontended) { int h; if ((h = ThreadLocalRandom.getProbe()) == 0 ) { ThreadLocalRandom.localInit(); h = ThreadLocalRandom.getProbe(); wasUncontended = true ; } boolean collide = false ; for (;;) { CounterCell[] as; CounterCell a; int n; long v; ... if ((as = counterCells) != null && (n = as.length) > 0 ) { ... } else if (cellsBusy == 0 && counterCells == as && U.compareAndSwapInt(this , CELLSBUSY, 0 , 1 )) { boolean init = false ; try { if (counterCells == as) { CounterCell[] rs = new CounterCell[2 ]; rs[h & 1 ] = new CounterCell(x); counterCells = rs; init = true ; } } finally { cellsBusy = 0 ; } if (init) break ; } else if (U.compareAndSwapLong(this , BASECOUNT, v = baseCount, v + x)) break ; } }
到这里就完成了计数桶的初始化工作,在之后的计数都将会使用计数桶方式来统计总数
计数桶扩容
从上面的分析中我们知道,计数桶初始化之后长度为2,在竞争大的时候肯定是不够用的,所以一定有计数桶的扩容操作,所以现在就有两个问题了:
什么条件下会进行计数桶的扩容?
扩容操作是怎么样的?
假设此时是用计数桶方式进行计数:
private final void addCount (long x, int check) { CounterCell[] as; long b, s; if ((as = counterCells) != null || !U.compareAndSwapLong(this , BASECOUNT, b = baseCount, s = b + x)) { CounterCell a; long v; int m; boolean uncontended = true ; if (as == null || (m = as.length - 1 ) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { fullAddCount(x, uncontended); return ; } if (check <= 1 ) return ; s = sumCount(); } ... }
进入fullAddCount方法:
private final void fullAddCount (long x, boolean wasUncontended) { int h; if ((h = ThreadLocalRandom.getProbe()) == 0 ) { ThreadLocalRandom.localInit(); h = ThreadLocalRandom.getProbe(); wasUncontended = true ; } boolean collide = false ; for (;;) { CounterCell[] as; CounterCell a; int n; long v; if ((as = counterCells) != null && (n = as.length) > 0 ) { if ((a = as[(n - 1 ) & h]) == null ) { if (cellsBusy == 0 ) { CounterCell r = new CounterCell(x); if (cellsBusy == 0 && U.compareAndSwapInt(this , CELLSBUSY, 0 , 1 )) { boolean created = false ; try { CounterCell[] rs; int m, j; if ((rs = counterCells) != null && (m = rs.length) > 0 && rs[j = (m - 1 ) & h] == null ) { rs[j] = r; created = true ; } } finally { cellsBusy = 0 ; } if (created) break ; continue ; } } collide = false ; } else if (!wasUncontended) wasUncontended = true ; else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x)) break ; else if (counterCells != as || n >= NCPU) collide = false ; else if (!collide) collide = true ; else if (cellsBusy == 0 && U.compareAndSwapInt(this , CELLSBUSY, 0 , 1 )) { try { if (counterCells == as) { CounterCell[] rs = new CounterCell[n << 1 ]; for (int i = 0 ; i < n; ++i) rs[i] = as[i]; counterCells = rs; } } finally { cellsBusy = 0 ; } collide = false ; continue ; } h = ThreadLocalRandom.advanceProbe(h); } else if (cellsBusy == 0 && counterCells == as && U.compareAndSwapInt(this , CELLSBUSY, 0 , 1 )) { } else if (U.compareAndSwapLong(this , BASECOUNT, v = baseCount, v + x)) break ; } }
看到这里,想必已经可以解决上面两个问题了:
什么条件下会进行计数桶的扩容?
答:在CAS操作递增计数桶失败了3次之后,会进行扩容计数桶操作,注意此时同时进行了两次随机定位计数桶来进行CAS递增的,所以此时可以保证大概率是因为计数桶不够用了,才会进行计数桶扩容
扩容操作是怎么样的?
答:计数桶长度增加到两倍长度,数据直接遍历迁移过来,由于计数桶不像HashMap数据结构那么复杂,有hash算法的影响,加上计数桶只是存放一个long类型的计数值而已,所以直接赋值引用即可。
总结
个人感觉,统计容器大小的线程安全与扩容ConcurrentHashMap这两个算得上ConcurrentHashMap中最聪明的两个并发设计了,阅读此源码的我看到这两个操作的设计,都忍不住拍手叫绝,我想,这或许也是一个看源码的乐趣吧,站在巨人的肩膀看巨人的思想。
总结一下计数中用到的并发技巧:
利用CAS递增baseCount值来感知是否存在线程竞争,若竞争不大直接CAS递增baseCount值即可,性能与直接baseCount++差别不大
若存在线程竞争,则初始化计数桶,若此时初始化计数桶的过程中也存在竞争,多个线程同时初始化计数桶,则没有抢到初始化资格的线程直接尝试CAS递增baseCount值的方式完成计数,最大化利用了线程的并行。此时使用计数桶计数,分而治之的方式来计数,此时两个计数桶最大可提供两个线程同时计数,同时使用CAS操作来感知线程竞争,若两个桶情况下CAS操作还是频繁失败(失败3次),则直接扩容计数桶,变为4个计数桶,支持最大同时4个线程并发计数,以此类推…同时使用位运算和随机数的方式"负载均衡"一样的将线程计数请求接近均匀的落在各个计数桶中。
get操作的线程安全
对于get操作,其实没有线程安全的问题,只有可见性的问题,只需要确保get的数据是线程之间可见的即可:
public V get (Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; int h = spread(key.hashCode()); if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1 ) & h)) != null ) { if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } else if (eh < 0 ) return (p = e.find(h, key)) != null ? p.val : null ; while ((e = e.next) != null ) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null ; }
在get操作中除了增加了迁移的判断以外,基本与HashMap的get操作无异,这里不多赘述,值得一提的是这里使用了tabAt方法Unsafe类volatile的方式去获取Node数组中的Node,保证获得到的Node是最新的
static final <K,V> Node<K,V> tabAt (Node<K,V>[] tab, int i) { return (Node<K,V>)U.getObjectVolatile(tab, ((long )i << ASHIFT) + ABASE); }
JDK1.7与1.8的不同实现
JDK1.7的ConcurrentHashMap底层数据结构:
其中1.7的实现也同样采用了分段锁的技术,只不过多个一个segment,一个segment里对应一个小HashMap,其中segment继承了ReentrantLock,充当了锁的角色,一把锁锁一个小HashMap(相当于多个Node),从1.8的实现来看, 锁的粒度从多个Node级别又减小到一个Node级别,再度减小锁竞争,减小程序同步的部分。
总结
不得不说,大师将CAS操作运用的淋漓尽致,相信理解了以上源码的读者也可以学习到大师所运用的并发技巧,不仅仅是在ConcurrentHashMap中,其实在大部分JUC的源码中很多并发思想很值得我们去阅读、学习。