ConcurrentHashMap和HashMap思路差不多的,但是因为它是支持并发操作,所以要复杂些。

整个ConcurrentHashMap由一个Segment组成,Segment代表“部分”或“一段”的意思,所以很多地方都会将其描述为分段锁。注意,行文中,我很多地方使用“槽”来代表一个Segment。

简单的理解就是,ConcurrentHashMap是一个Segment数组,Segment它能够给继承ReegtrantLock来进行加锁,所以每次需要加锁的操作锁住的是一个Segment,这样只要保证每个Segment是线程安全的,也就实现了全局的线程安全。

ConcurrentHashMap

concrrencyLevel:并行级别、并发数、Segment数,怎么翻译不重要,理解它。默认是16,也就是说ConcurrentHashMap有16个Segment,所以理论上,这个时候,最多可以同时持有16个线程并发,只要它们分别分布在不同的Segment上。这个值可以在初始化的时候设置为其他值,但是一旦初始化后,它是不可以扩容的。

再具体到每个Segment内部,其实每个Segment很像之前介绍的HashMap,不过它要保证线程安全吗,所以处理起来要麻烦些。

初始化

inittalCapacity:初始化容量,这个值是整个ConcurrentHashMap的初始容量,实际操作的时候需要平均分配给每个Segment。 loadFactor:负载因子,之前我们说了,Segment数组不可以扩容,所以这个负载因子是给每个Segment内部使用的。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public ConcurrentHashMap(int initialCapacity,
                         float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    if (concurrencyLevel > MAX_SEGMENTS)
        concurrencyLevel = MAX_SEGMENTS;
    // Find power-of-two sizes best matching arguments
    int sshift = 0;
    int ssize = 1;
    // 计算并行级别 ssize,因为要保持并行级别是 2 的 n 次方
    while (ssize < concurrencyLevel) {
        ++sshift;
        ssize <<= 1;
    }
    // 我们这里先不要那么烧脑,用默认值,concurrencyLevel 为 16,sshift 为 4
    // 那么计算出 segmentShift 为 28,segmentMask 为 15,后面会用到这两个值
    this.segmentShift = 32 - sshift;
    this.segmentMask = ssize - 1;

    if (initialCapacity > MAXIMUM_CAPACITY)
        initialCapacity = MAXIMUM_CAPACITY;

    // initialCapacity 是设置整个 map 初始的大小,
    // 这里根据 initialCapacity 计算 Segment 数组中每个位置可以分到的大小
    // 如 initialCapacity 为 64,那么每个 Segment 或称之为"槽"可以分到 4 个
    int c = initialCapacity / ssize;
    if (c * ssize < initialCapacity)
        ++c;
    // 默认 MIN_SEGMENT_TABLE_CAPACITY 是 2,这个值也是有讲究的,因为这样的话,对于具体的槽上,
    // 插入一个元素不至于扩容,插入第二个的时候才会扩容
    int cap = MIN_SEGMENT_TABLE_CAPACITY; 
    while (cap < c)
        cap <<= 1;

    // 创建 Segment 数组,
    // 并创建数组的第一个元素 segment[0]
    Segment<K,V> s0 =
        new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                         (HashEntry<K,V>[])new HashEntry[cap]);
    Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
    // 往数组写入 segment[0]
    UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
    this.segments = ss;
}

初始化完成,我们得到一个Segment数组。 我们就当是用new ConcurrentHashMap()无参构造函数进行初始化的,那么初始化完成后:

  • Segment数组长度为16,不可以扩容
  • Segment[i]的默认大小为2,负载因子是0.75,得出初始阈值为1.5,也就是以后插入第一个元素默认不会触发扩容,插入第二个会进行第一次扩容
  • 这里初始化了segment[0],其它位置还是null
  • 当前segmentShift的值为32-4=28,segmentMask为16-1=15,姑且把他们简单的翻译为移位和掩码,这两个值马上就会用到。

put过程分析

我们先看put的主流程,对于其中的一些关键细节操作,后面会进行详细介绍。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
public V put(K key, V value) {
    Segment<K,V> s;
    if (value == null)
        throw new NullPointerException();
    // 1. 计算 key 的 hash 值
    int hash = hash(key);
    // 2. 根据 hash 值找到 Segment 数组中的位置 j
    //    hash 是 32 位,无符号右移 segmentShift(28) 位,剩下低 4 位,
    //    然后和 segmentMask(15) 做一次与操作,也就是说 j 是 hash 值的最后 4 位,也就是槽的数组下标
    int j = (hash >>> segmentShift) & segmentMask;
    // 刚刚说了,初始化的时候初始化了 segment[0],但是其他位置还是 null,
    // ensureSegment(j) 对 segment[j] 进行初始化
    if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
         (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
        s = ensureSegment(j);
    // 3. 插入新值到 槽 s 中
    return s.put(key, hash, value, false);
}

第一层皮很简单,根据hash值很快找到对应的Segment,之后就是Segment内部的put操作了。

Segment内部是有 数组+链表 组成的。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    // 在往该 segment 写入前,需要先获取该 segment 的独占锁
    //    先看主流程,后面还会具体介绍这部分内容
    HashEntry<K,V> node = tryLock() ? null :
        scanAndLockForPut(key, hash, value);
    V oldValue;
    try {
        // 这个是 segment 内部的数组
        HashEntry<K,V>[] tab = table;
        // 再利用 hash 值,求应该放置的数组下标
        int index = (tab.length - 1) & hash;
        // first 是数组该位置处的链表的表头
        HashEntry<K,V> first = entryAt(tab, index);

        // 下面这串 for 循环虽然很长,不过也很好理解,想想该位置没有任何元素和已经存在一个链表这两种情况
        for (HashEntry<K,V> e = first;;) {
            if (e != null) {
                K k;
                if ((k = e.key) == key ||
                    (e.hash == hash && key.equals(k))) {
                    oldValue = e.value;
                    if (!onlyIfAbsent) {
                        // 覆盖旧值
                        e.value = value;
                        ++modCount;
                    }
                    break;
                }
                // 继续顺着链表走
                e = e.next;
            }
            else {
                // node 到底是不是 null,这个要看获取锁的过程,不过和这里都没有关系。
                // 如果不为 null,那就直接将它设置为链表表头;如果是null,初始化并设置为链表表头。
                if (node != null)
                    node.setNext(first);
                else
                    node = new HashEntry<K,V>(hash, key, value, first);

                int c = count + 1;
                // 如果超过了该 segment 的阈值,这个 segment 需要扩容
                if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                    rehash(node); // 扩容后面也会具体分析
                else
                    // 没有达到阈值,将 node 放到数组 tab 的 index 位置,
                    // 其实就是将新的节点设置成原链表的表头
                    setEntryAt(tab, index, node);
                ++modCount;
                count = c;
                oldValue = null;
                break;
            }
        }
    } finally {
        // 解锁
        unlock();
    }
    return oldValue;
}

由于有独占锁保护,所以segment内部的操作并不复杂,至于里面并发的问题,我们稍后介绍。

引用