0%

Java ConcurrentHashMap源码分析

本文中ConcurrentHashMap源码分析基于jdk1.7,与jdk1.8中ConcurrentHashMap的实现有很大差别,在学完《Java并发编程实战》和《Linux程序设计》之后如果有时间再具体研究jdk1.8中新的锁和并发机制。(http://ifeve.com/jdk1-8-abstractqueuedsynchronizer/)

背景知识

与ConcurrentHashMap类似的Java Conllections还有Hashtable和HashMap,HashMap不是一个线程安全的类,key和value的值都可以是null,ConcurrentHashMap和HashMap都是线程安全的类,但是key和value的值都不能是null。Hashtable保证线程安全的做法是每个可调用的方法都是synchronized的。这意味着每个线程访问Hashtable时都会把整个Hashtable锁起来。ConcurrentHashMap则采用了分段锁(Segment)的机制,使得竞态条件只发生在Segment内,增加了并发度;同时,在HashMap.Entry的基础上进行改进,ConcurrentHashMap.Entry的value和next域都是volatile的,可以保证在多线程环境下对于其他线程的可见性。由于ConcurrentHashMap不是对整个表加锁,在执行size()这些全局性质的方法时需要遍历整个Segment数组。

HashEntry<K, V>类

HashEntry 用来封装散列映射表中的键值对

1
2
3
4
5
6
static final class HashEntry<K, V> {
final int hash;
final K key;
volatile V value;
volatile ConcurrentHashMap.HashEntry<K, V> next;
}

Segment类

每一个 Segment 对象都有一个count对象来表示本Segment中包含的HashEntry对象的个数。这样当需要更新计数器时,不用锁定整个 ConcurrentHashMap。Segment类源码:

1
2
3
4
5
6
7
8
9
10
static final class Segment<K, V> extends ReentrantLock implements Serializable {
transient volatile ConcurrentHashMap.HashEntry<K, V>[] table;
transient int threshold;
final float loadFactor;
Segment(float lf, int threshold, ConcurrentHashMap.HashEntry<K, V>[] tab) {
this.loadFactor = lf;
this.threshold = threshold;
this.table = tab;
}
}

JDK1.7中除了第一个Segment之外,剩余的Segments采用的是延迟初始化的机制:每次put之前都需要检查key对应的Segment是否为null,如果是则调用ensureSegment()以确保对应的Segment被创建。ensureSegment可能在并发环境下被调用,但与想象中不同,ensureSegment并未使用锁来控制竞争,而是使用了Unsafe对象的getObjectVolatile()提供的原子读语义结合CAS来确保Segment创建的原子性。代码段如下:

1
2
3
4
5
6
7
8
9
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { // recheck
Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) {
if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
break;
}
}
}

CorrentHashMap类

ConcurrentHashMap类在默认并发级别下会创建16个Segment对象,每个Segment对象里面包含若干个桶,每个桶中存放的是由HashEntry连接起来的单链表。散列均匀的情况下,每个Segment大致守护整个表的1/16对象。ConcurrentHashMap数据结构如图所示:

ConcurrentHashMap数据结构

ConcurrentHashMap会使用大于等于该值的最小2幂指数作为实际并发度(假如用户设置并发度为17,实际并发度则为32)。运行时通过将key的高n位(n = 32 – segmentShift)和并发度减1(segmentMask)做位与运算定位到所在的Segment。segmentShift与segmentMask都是在构造过程中根据concurrency level被相应的计算出来。

如果并发度设置的过小,会带来严重的锁竞争问题;如果并发度设置的过大,原本位于同一个Segment内的访问会扩散到不同的Segment中,CPU cache命中率会下降,从而引起程序性能下降。

1
2
3
4
5
6
7
8
9
10
11
12
public class ConcurrentHashMap<K, V> extends AbstractMap<K, V> implements ConcurrentMap<K, V>, Serializable {
static final int DEFAULT_INITIAL_CAPACITY = 16;
static final float DEFAULT_LOAD_FACTOR = 0.75F;
static final int DEFAULT_CONCURRENCY_LEVEL = 16;
final int segmentMask;
final int segmentShift;
final ConcurrentHashMap.Segment<K, V>[] segments;
//对外提供三个key, value和<key, value>三个视图
transient Set<K> keySet;
transient Set<Entry<K, V>> entrySet;
transient Collection<V> values;
}

put()/get()/remove()

ConcurrentHashMap的put方法被代理到了对应的Segment中。1.7版本的ConcurrentHashMap在获得Segment锁的过程中,做了一定的优化,在真正申请锁之前,put方法会通过tryLock()方法尝试获得锁,在尝试获得锁的过程中会对对应hashcode的链表进行遍历,如果遍历完毕仍然找不到与key相同的HashEntry节点,则为后续的put操作提前创建一个HashEntry。之所以在获取锁的过程中对整个链表进行遍历,主要目的是希望遍历的链表被CPU cache所缓存,为后续实际put过程中的链表遍历操作提升性能。

在获得锁之后,Segment对链表进行遍历,如果某个HashEntry节点具有相同的key,则更新该HashEntry的value值,否则新建一个HashEntry节点,将它设置为链表的新head节点并将原头节点设为新head的下一个节点。新建过程中如果节点总数(含新建的HashEntry)超过threshold,则调用rehash()方法对Segment进行扩容,最后将新建HashEntry写入到数组中。

put方法中,链接新节点的下一个节点(HashEntry.setNext())以及将链表写入到数组中(setEntryAt())都是通过Unsafe的putOrderedObject()方法来实现,这里并未使用具有原子写语义的putObjectVolatile()的原因是:JMM会保证获得锁到释放锁之间所有对象的状态更新都会在锁被释放之后更新到主存,从而保证这些变更对其他线程是可见的。

rehash()

相对于HashMap的resize,ConcurrentHashMap的rehash原理类似,但是做了一定的优化,避免让所有的节点都进行复制操作:由于扩容是基于2的幂指来操作,假设扩容前某HashEntry对应到Segment中数组的index为i,数组的容量为capacity,那么扩容后该HashEntry对应到新数组中的index只可能为i或者i+capacity,因此大多数HashEntry节点在扩容前后index可以保持不变。基于此,rehash方法中会定位第一个后续所有节点在扩容后index都保持不变的节点,然后将这个节点之前的所有节点重排即可。这部分代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
for(nodeIndex = 0; nodeIndex < oldCapacity; ++nodeIndex) {
ConcurrentHashMap.HashEntry e = oldTable[nodeIndex];
if(e != null) {
ConcurrentHashMap.HashEntry next = e.next;
int idx = e.hash & sizeMask;
if(next == null) {
newTable[idx] = e;
} else {
ConcurrentHashMap.HashEntry lastRun = e;
int lastIdx = idx;

ConcurrentHashMap.HashEntry p;
for(p = next; p != null; p = p.next) {
int v = p.hash & sizeMask;
if(v != lastIdx) {
lastIdx = v;
lastRun = p;
}
}

newTable[lastIdx] = lastRun;

for(p = e; p != lastRun; p = p.next) {
Object var18 = p.value;
int h = p.hash;
int k = h & sizeMask;
ConcurrentHashMap.HashEntry n = newTable[k];
newTable[k] = new ConcurrentHashMap.HashEntry(h, p.key, var18, n);
}
}
}
}

get()/containsKey()

get与containsKey两个方法几乎完全一致:他们都没有使用锁,而是通过Unsafe对象的getObjectVolatile()方法提供的原子读语义,来获得Segment以及对应的链表,然后对链表遍历判断是否存在key相同的节点以及获得该节点的value。但由于遍历过程中其他线程可能对链表结构做了调整,因此get和containsKey返回的可能是过时的数据,这一点是ConcurrentHashMap在弱一致性上的体现。如果要求强一致性,那么必须使用Collections.synchronizedMap()方法。

size()/containsValue()

这些方法都是基于整个ConcurrentHashMap来进行操作的,他们的原理也基本类似:首先不加锁循环执行以下操作:循环所有的Segment(通过Unsafe的getObjectVolatile()以保证原子读语义),获得对应的值以及所有Segment的modcount之和。如果连续两次所有Segment的modcount和相等,则过程中没有发生其他线程修改ConcurrentHashMap的情况,返回获得的值。

当循环次数超过预定义的值时,这时需要对所有的Segment依次进行加锁,获取返回值后再依次解锁。值得注意的是,加锁过程中要强制创建所有的Segment,否则容易出现其他线程创建Segment并进行put,remove等操作。代码如下:

1
2
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock();// force creation

一般来说,应该避免在多线程环境下使用size和containsValue方法, 或者只把这些返回值作为参考值,不能直接用于严格的程序控制代码中。

总结

在使用锁协调多线程环境下同步访问时,减少对锁的竞争可以有效提高并发度,减少对锁的竞争有两种方法:

  • 减少请求锁的次数
  • 减少持有锁的时间

ConcurrentHashMap中通过以下三中措施提高并发性能:

  • 分段锁Segment的使用可以有效减少发生竞争的可能性
  • HashEntry中value和next域的volatile属性可以减少持有锁的时间
  • HashEntry倒序更新的方式可以减少访问次数和持有锁的时间

JDK1.8 中的实现

JDK 1.8 中,ConcurrentHashMap 抛弃了原有的 Segment 分段锁设计,采用 CAS + synchronized 来保证并发安全性。数组中 Node(1.7 中的 HashEntry)的 val 和 next 都是 volatile 的,保证可见性。

put流程

  1. 根据 key 计算出 hashcode
  2. 判断是否需要初始化
  3. 根据 key 定位 Node,如果为空采用 cas 尝试写入,失败则自旋保证成功
  4. 如果 Node 正在扩容,加入扩容流程
  5. 如果都不满足,采用 synchronized 锁写入数据
  6. 如果数量大于8,转化为红黑树

get 流程

  1. 根据 key 的 hashcode 寻址,如果就在桶上则直接返回
  2. 否则按照红黑树或链表方式查找

后记

与HashMap不同的是,ConcurrentHashMap并不允许key或者value为null,按照Doug Lea的说法,这么设计的原因是在ConcurrentHashMap中,一旦value出现null,则代表HashEntry的key/value没有映射完成就被其他线程所见,需要特殊处理。在JDK1.6中,get方法的实现中就有一段对HashEntry.value == null的防御性判断。但Doug Lea也承认实际运行过程中,这种情况似乎不可能发生。

参考资料

  1. 一文让你彻底理解 Java HashMap 和 ConcurrentHashMap

http://www.threaddeath.com/concurrency_of_concurrenthashmap.html
http://www.ibm.com/developerworks/cn/java/java-lo-concurrenthashmap/index.html