深入Java并发包(2) – Java说要有并发包, 于是有了AQS

This entry is part 2 of 4 in the series 深入Java并发包

AQS提供了框架,提供了许多可以选择性实现的方法。将他们组合起来,便成了各种我们常用的锁/队列/同步器。本文以ReentrantLock和CountDownLatch为切入口扒一扒源码, 讲一讲思路。

AQS为何物?

AQS提供的并发控制框架, 本质上就是将多个线程对竞争对象的操作变成有序的队列. 这从类名也可以看出来AbstractQueuedSynchronizer,

  • A (Abstract)指的是抽象性,是指导性的框架, 在本类中只会实现最基础的方法, 大多数的方法默认抛出异常UnsupportedOperationException, 子类可以按需重写自己需要的方法.
  • Q (Queue)说明是本类通过显式队列来保存竞争线程的, 队列的节点是内部内Node, 双向队列.
  • S (Synchronizer)即针对线程共享对象的修改都是通过同步技术如CAS, volatile来实现的, 这是线程安全的基础.

Synchronizer vs. synchronize

当然地, ASQ内部绝对不会使用synchronize关键字, 因为AQS的出现本来就是为了更加轻量级地在java语言层面实现同步语义. AQS依赖的主要是乐观锁技术, 而synchronize则是基于JVM底层对象头的悲观锁技术, 更重且悲观锁的特性导致其性能也不如AQS.

先看看AQS内部的全局变量只有三个, 他们是:

// 保存等待线程队列的头
private transient volatile Node head;
// 保存等待线程队列的尾
private transient volatile Node tail;
// 表示当前锁状态的属性, 但具体什么含义, 由子类确定
// 稍后聊到ReentrantLock和CountDownLatch的时候继续分析
private volatile int state

// 另外AQS还继承了AbstractOwnableSynchronizer,后者还有一个属性:
private transient Thread exclusiveOwnerThread; // 记录当前占有锁的线程

AQS#Node

值得注意的是,tryAcquire()方法是AQS提供的方法,但实际并没有做任何操作,仅仅抛出UnsupportedOperationException异常:

protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

类似tryAcquire的方法还有tryReleasetryAcquireSharedtryReleaseSharedisHeldExclusively共5个。这些方法是围绕state实现锁的基本逻辑,子类可以自己实现不同的功能例如公平、非公平、读写锁、同步等待… … 从这个角度来看,前面的的tryAcquire()做的事情是“获取锁”并不准确,实际做的是对state的竞争。

ReentrantLock

一次获取锁

ReentrantLock是常用的可重入锁. 它定义state为锁重入的次数, 0表示没有线程占有. 默认构造方法创建的是一个非公平的可重入锁. 也可以使用带参方法创建公平的重入锁, 区别在于创建的同步器sync(ReentrantLock的全局变量)是否公平.

public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

ReentranLock调用lock()加锁方法最后落到AQS#acquire(int)上。此方法先尝试调用子类的tryAcquire(int)来做一次获取锁的尝试,成功则直接返回,失败则进入等待队列。

# AQS.class
public final void acquire(int arg) {
    if (!tryAcquire(arg) && // 尝试一次获取锁
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //一次失败入队列
        selfInterrupt();
}

前面介绍过,tryAcquire方法是AQS留给子类实现的同步逻辑,在ReentranLock中,FairSyncNonfairSync各自实现了此方法,基本原理都是通过state来判断锁定状态:当没有线程占有锁时state=0,任意线程获取锁都通过CAS(0,1)来争强锁,重入锁则state+1;对应地,释放锁就是state-1。

// ReentrantLock$Sync.class
final boolean nonfairTryAcquire(int acquires) {
    // 注意此方法并未加锁,并发访问
    final Thread current = Thread.currentThread();
    // 尝试获取state值
    int c = getState();
    if (c == 0) {
        // state=0,进一步通过CAS安全修改
        if (compareAndSetState(0, acquires)) {
            // 获取成功,将占有锁的线程修改为当前
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        // state!=0,目前占有锁的线程即是自己,重入锁
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            // 重入次数int溢出
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    // 获取锁失败
    return false;
}

// ReentrantLock$NonfairSync.class
protected final boolean tryAcquire(int acquires) {
    // 即ReentrantLock$Sync#nonfairTryAcquire()
    return nonfairTryAcquire(acquires); 
}

// ReentrantLock$FairSync.class
protected final boolean tryAcquire(int acquires) {
    //... 
    int c = getState();
    if (c == 0) {
        // FairSync的关键在于CAS修改之前,先判断在等待锁队列中是否有前置节点
        if (!hasQueuedPredecessors() && 
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    // ... 重入锁 和 获取锁失败 的处理跟NonfairSync完全一致
}

幸运的话,一次获取锁就能成功返回,这也是乐观锁最希望见到的情况。但如果一次获取锁失败,就需要进入队列等待锁,这时就会进入AQS#acquireQueued(Node,int)方法的逻辑。

等待锁

正如AQS类名的Q所指Queue,AQS提供的等待锁功能是通过队列来实现的,在进入AQS#acquireQueued(Node,int)方法前首先如要通过addWaiter(Node.EXCLUSIVE)方法构造一个Node节点。Node保存的信息包括:

  • volatile Node prev节点保存前置节点
  • volatile Node next节点保存后继节点
  • volatile Thread thread节点保存当前线程
  • volatile int waitStatus保存了当前节点的状态. 要注意的是, 这里的状态是节点状态, 区别于AQS的全局变量state保存的是锁的状态.

// AQS.java
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true; // 成功获取锁的标识
    try {
        boolean interrupted = false; // 在等待锁的过程中是否发生过中断
        // 死循环直到获取锁, 或者异常退出。此循环是等待锁的核心逻辑
        for (;;) {
            final Node p = node.predecessor();
            // 前置节点就是等待队列头,则尝试获取锁
            if (p == head && tryAcquire(arg)) {
                // 获取锁成功,将队列头设为当前节点
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted; // 返回等待过程中是否发生过中断
            }
            // parkAndCheckInterrupt方法触发线程park, 如果过程中发生中断返回
            // true, 本方法不可中断, 故仅标记interrupted交给外层方法处理
            if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed) 
            // 异常时获取锁失败, 需要取消本次acquire请求
            // 并将此节点的状态修改为CANCELLED,结束锁争抢
            cancelAcquire(node);
    }
}

// AQS.class
// 在尝试获取锁失败之后,根据前置节点的状态来判断当前节点(所在线程)是否需要进入park等待
// 结合方法调用方可知,当此方法返回true时,会导致for循环暂停,直到unpark
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        // 前置节点的状态为SIGNAL,表示前置节点正常运行,后续节点需要等待前置节点
        // 的通知(SIGNAL)再做行动(是否进入park等待)
        return true;
    if (ws > 0) {
        // ws>0只有一个状态即CANCELLED,表示前置节点已经中止退出(退出前通过unpark唤醒
        // 了当前节点)那么就沿着队列找第一个未中止的节点为当前的新前置节点
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // 在park之前将前置节点的状态修改为SIGNAL,提醒它记得来唤醒(unpark)当前节点
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

// AQS.class
// 在判断当前节点需要进入park等待时,就掉用此方法
// 注意返回是否异常的标志,这作为可中断lock的依据
private final boolean parkAndCheckInterrupt() {
    // park等待,阻塞方法
    // 线程异常 or 其他线程调用unpark方才退出阻塞
    LockSupport.park(this);
    // 线程异常导致的park退出阻塞,此方法会返回true
    return Thread.interrupted();
}

CLH vs. Sync(CAS)

对比CLH和Sync的自旋判断可以发现, 前者判断的是变量pred.locked的值, 只有前置节点会修改它的状态, 不存在线程竞争; 后者判断的是变量AQS.state, 它可以通过CAS被所有竞争线程并发安全地修改. 同样的逻辑使用CAS实现比CLH的经典实现更为优雅, 而且可以减少bug(参考深入Java并发包(1) – 什么是锁?#死锁问题). 这就是底层技术进步带来的优势.

可以看到不同Sync底层均调用AQS的acquire方法实现类似CLH的自旋判断,

可中断 vs. 不可中断

发现acquire方法在排队得到锁之后, 还根据中断标志选择调用了selfInterrupt方法中断自己. 这是因为acquire()->acquireQueued()->parkAndCheckInterrupt()->LockSupport.part()挂起线程等待唤醒(相当于CLH中while自旋). 但唤醒分两种情况:
1. 前置节点unpark当前线程, 此时不会发生中断, 也不调用selfInterrupt方法
2. 其余线程中断当前线程, 导致LockSupport.part异常唤醒(不同于Thread.sleep()或者wait(), park方法被中断之后只会唤醒, 不主动抛出InterruptException, 只能通过Thread.interrupted()判断唤醒是否由中断导致), 而parkAndCheckInterrupt被中断唤醒之后还通过Thread.interrupted()清空了中断状态并返回true. 外部方法通过parkAndCheckInterrupt返回是否true判断中断是否发生. 此时acquire会记下此状态, 等到成功获取锁之后再中断自己. 而acquireInterruptibly方法则在发现中断发生的第一时间抛出异常.

来到这里, 正常等待锁的线程都在park状态了, 直到当前持有锁的线程调用unlock()方法释放锁:

// ReentrantLock
public void unlock() {
    sync.release(1);
}
// AQS
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

有了前面加锁的铺垫, 释放锁差不多就是一个逆运算. 直接调用AQS的release方法, 在tryRelease()中递减state, 在unparkSuccessor()中unparkheadsuccessor即头结点的下一个节点.
注意这里unparkSuccessor方法unpark的只是head的下一个节点, 不论公平与非公平均调用此方法释放锁, 导致的一个问题就是: 事实上非公平锁的”非公平”只体现在入队列前的第一次抢夺锁, 一旦抢夺失败进入到队列, 一样是FIFO, 非公平又变得公平了.

至此,ReentrantLock的简单加锁流程就完整了。另外ReentrantLock还提供了一些更丰富的功能。例如:

  • ReentrantLock#lockInterruptibly()
    实现可中断的锁。实现方法是,通过park循环等待中,parkAndCheckInterrupt捕捉到异常即马上抛出异常结束等待。
  • ReentrantLock#tryLock()
    只尝试一次获取锁,失败立马返回。实现方法是,取消park循环等待。
  • ReentrantLock#tryLock(long, TimeUnit)
    带超时时间的获取锁。实现方法是,底层将LockSupport.park(Object)改为LockSupport.parkNanos(Object, long),可以实现在等待指定时间无唤醒之后主动从park状态醒来。

CountDownLatch

Latch原意为门栓, 描述CountDownLatch的作用很贴切, 就是让在门外等待(wait()方法)的线程等在门外。初始化CountDownLatch的时候指定state的值(>=0),每次调用countDown()均会递减AQS的state, 直到state降为0打开门栓, 所有等待线程同时得到许可继续运行.

不同于ReentrantLock, Latch同时给多个线程授予许可, 所以ReentrantLock调用的AQS#acquire方法为阻塞排他的不再适用, CountDownLatch#wait调用AQS#acquireSharedInterruptibly方法, 它是可中断的, 调用的子类方法也变成的tryAcquireShared:

// CountDownLatch.class
public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

// AQS.class
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

// CountDownLatch.class
protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

// AQS
private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                // 调用CountDownLatch方法, 返回>0表示获取锁成功
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    // ReentrantLock仅仅将当前节点设为头, 而Latch还需要设置
                    // Propagate保证解锁状态传递下去
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

注意Latch在释放锁之后, 头结点还需要设置传递, 将解锁状态沿着队列传递下去使所有线程均被唤醒.

// AQS
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

// AQS
private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue; // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                    !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue; // loop on failed CAS
        }
        if (h == head) // loop if head changed
            break;
    }
}

总结一下, AQS作为框架提供了获取/释放锁, 公平锁/非公平锁, 可中断/不可中断的方法, 这些方法通过CAS技术保证线程安全. 所有方法最后通过回调实现类的tryXXX方法操作state, 实现不同的逻辑.

至此把Java并发的基础类AQS和依赖它实现的Lock和Latch都分析了一遍, 下一篇我们看看开发中常用的并发容器, 是如何依赖这些工具保证并发安全的.

Series Navigation<< 深入Java并发包(1) – 什么是锁?深入Java并发包(3) – 容器那些事 >>

发表评论

电子邮件地址不会被公开。

+ 24 = 32