深入Java并发包(3)-容器那些事

Tuesday, November 12, 2019

TOC

CopyOnWriteArrayList vs. ArrayList

CopyOnWriteArrayList 是 ArrayList的线程安全版本, 底层保存均用数组实现, 并含有一个全局变量ReentrantLock实现线程安全。

// CopyOnWriteArrayList.java
final transient ReentrantLock lock = new ReentrantLock();
private transient volatile Object[] array; // 注意使用了volatile保证可见性

addremoveclearset这些会改变数据的方法上通过ReentrantLock.lock()方法加锁, 并且使用复制数组的方式改变数组长度(区别于ArrayList通过直接操作数组+按1.5比例扩容的方式操作数组)。 而get方法不加锁。

// 尾部追加元素
public boolean add(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock(); // 典型的lock + finally{unlock}操作方式
    try {
        Object[] elements = getArray();
        int len = elements.length;
        // copy数组, modify数组, set数组
        Object[] newElements = Arrays.copyOf(elements, len + 1);
        newElements[len] = e;
        setArray(newElements);
        return true;
    } finally {
        lock.unlock();
    }
}

// 由于使用copy+modify+set方法修改数组, 所以get的时候不需要加锁
public E get(int index) {
    return get(getArray(), index);
}

ConcurrentHashMap vs. HashMap vs. HashTable

HashMap的基本原理是通过计算每一个元素的key的hash值, 将其映射到数组中保存。 当出现hash冲突时, 退化为链表(JDK1.8优化为红黑树)保存。

对标非线程安全的HashMap, JDK1.0开始提供了一个HashTable, 它提供的线程安全方法比较粗暴, 把所有getputclearsize等方法加上synchronized关键字。 等于所有操作均会锁住整个map, 性能较低。

JDK1.5开始, 跟随juc包一起发布了高性能并发安全的ConcurrentHashMap。 在JDK1.7和JDK1.8中实现有些微的区别。 下面分别介绍:

ConcurrentHashMap@JDK1.7

ConcurrentHashMap基于桶保存数据, 它的高性能主要来自于三方面:

  1. 分桶锁, 不再锁全表
  2. 利用ReentrantLock乐观锁, 增加加锁性能
  3. 读操作不加锁, 通过Unsafe.getObjectVolatile实现volatile语义, 确保读取最新数据

ConcurrentHashMap的分桶数量在创建map对象的时候一经确定, 就不再修改, 所以第一步要确定的中间就是如何计算分桶的数量。 所有的构造方法最终调用的都是ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel)initialCapacityloadFactor分别指定map初始化的容量和分桶扩容阈值。 concurrencyLevel指定预期的并发数, 最终的分桶数为满足大于concurrencyLevel的最小的2的n次幂, 默认为16。

注意到构造方法中仅创建了分桶数组segments和第一个分桶, 其余的分桶只有在需要写入数据时(具体说就是put方法)才延后创建。

// ConcurrentHashMap
public V put(K key, V value) {
    Segment<K,V> s;
    if (value == null)
        throw new NullPointerException();
    // 取hash值的高位按位与上segmentMask得到分桶的在数组内的相对位置j
    int hash = hash(key);
    int j = (hash >>> segmentShift) & segmentMask;
    // 因为分桶数量固定, 桶一旦实例化就不会改变, 所以这里使用非volatile的方法
    // 获取桶, 只有发现为null时, 才在ensureSegment内部通过CAS创建新桶
    if ((s = (Segment<K,V>)UNSAFE.getObject(segments, (j << SSHIFT) + SBASE)) == null)
        s = ensureSegment(j);
    // 真正的写入逻辑落到Segment的put方法
    return s.put(key, hash, value, false);
}

分桶类Segment继承自ReentrantLock, 在put写入数据时首先尝试调用tryLock获取锁, 成功的时候执行写入, 跟HashMap(JDK1.7)的写入一模一样。 如果获取失败, 则先尝试若干次tryLock, 还不成功, 则调用阻塞lock()方法。

// Segment extends ReentrantLock
// onlyIfAbsent 为true则只替换已有key,已存在不会覆盖仅返回旧值
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    // 尝试获取锁, 失败继续在scanAndLockForPut等待(同时做了很有意思的warm up)
    // 工作, 具体看下文
    HashEntry<K,V> node = tryLock() ? null :
            scanAndLockForPut(key, hash, value);
    V oldValue;
    try {
        HashEntry<K,V>[] tab = table;
        // Segment内部还是用了同样的hash值寻址Entry
        int index = (tab.length - 1) & hash;
        HashEntry<K,V> first = entryAt(tab,index);
        for (HashEntry<K,V> e = first;;) {
            // 目标已成链, 则通过e。next寻找key相等的Entry
            if (e != null) {
                K k;
                if ((k = e.key) == key ||
                        (e.hash == hash && key.equals(k))) {
                    oldValue = e.value;
                    if (!onlyIfAbsent) {
                        e.value = value;
                        ++modCount;
                    }
                    break;
                }
                e = e.next;
            }
            else {
                if (node != null)
                    node.setNext(first);
                else
                    node = new HashEntry<K,V>(hash, key, value, first);
                int c = count + 1;
                // 数量大于阈值, 且还没到达整个链表数量的最大值, 将其rehash, 
                // 创建更多的链表, 摊平数据
                if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                    // rehash方法每次扩容一倍, 链表长度永远为2的n次幂
                    rehash(node);
                else
                    setEntryAt(tab, index, node);
                ++modCount;
                count = c;
                oldValue = null;
                break;
            }
        }
    } finally {
        unlock();
    }
    return oldValue;
}

// tryLock尝试获取锁n次, 再lock阻塞锁
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
    HashEntry<K,V> first = entryForHash(this, hash);
    HashEntry<K,V> e = first;
    HashEntry<K,V> node = null;
    int retries = -1;
    while (!tryLock()) {
        HashEntry<K,V> f; // to recheck first below
        if (retries < 0) {
            if (e == null) {
                if (node == null) // speculatively create node
                    node = new HashEntry<K,V>(hash, key, value, null);
                retries = 0; 
            }
            else if (key.equals(e.key))
                retries = 0;
            else
                e = e.next;
        }
        else if (++retries > MAX_SCAN_RETRIES) {
            // 超过最大重试次数, 阻塞锁
            lock();
            break;
        }
        else if ((retries & 1) == 0 &&
                (f = entryForHash(this, hash)) != first) {
            e = first = f; // re-traverse if entry changed
            retries = -1;
        }
    }
    return node;
}

scanAndLockForPut warm up?

有趣的地方在于scanAndLockForPut不仅做了尝试获取锁的工作, 还在tryLock的过程中遍历了hash值对应的链表(如果已经key冲突), 判断是否有相同的key的数据。 如果已有, 返回null; 如果没有, 返回node。 上层方法获取锁之后可以直接利用此结果。 这样当前线程获取锁的时候就不需要一边占用锁, 一边耗时去做这种遍历判断。 当然, 在最后一个else if中还判断了链表是否在这个过程中修改了值, 如果修改了, 那就将整个过程重新来一次。 性能消耗? 问题不大, 因为当前线程tryLock等着也是等着。 在doc中, 称这种方法叫热身? warm up。

感兴趣的朋友可以看看这里, 还有作者Doug亲自回复的mail

相对于put操作, get(Object key)方法就简单多了, 通过两个Unsafe.getObjectVolatile方法先后获取Segment和Entry即可。 这里不再赘述。

ConcurrentHashMap@JDK1.8

JDK7对Segment的实现跟HashMap几乎一致,所以HashMap存在的hashCode大量冲突导致的链表读性能差的问题也被一并带入。JDK8对此进行了改进(连ConcurrentHashMap和HashMap一并改进),当链表的长度大于阈值(默认为8)时,会将链表变成红黑树,从而实现更快的读取。