Java锁(三)CountDownLatch共享锁分析

在开始解读AQS的共享功能前,我们再重温一下CountDownLatchCountDownLatchjava.util.concurrent包下的计数器工具类,常被用在多线程环境下,它在初始时需要指定一个计数器的大小,然后可被多个线程并发的实现减1操作,并在计数器为0后调用await方法的线程被唤醒,从而实现多线程间的协作。

class Driver2 { 
    void main() throws InterruptedException {
      CountDownLatch doneSignal = new CountDownLatch(N);
      Executor e = ...

      for (int i = 0; i < N; ++i) // create and start threads 
      e.execute(new WorkerRunnable(doneSignal, i)); 
      doneSignal.await(); // wait for all to finish 
      } 
} 

class WorkerRunnable implements Runnable {
  private final CountDownLatch doneSignal;
  private final int i;
  WorkerRunnable(CountDownLatch doneSignal, int i) {
    this.doneSignal = doneSignal;
    this.i = i;
  }
  public void run() {
    try {
      doWork(i);
      doneSignal.countDown();
    } catch (InterruptedException ex) {} // return;
  }

  void doWork() { ... }
}

可以看到CountDownLatch的作用类似于一个“栏栅”,在CountDownLatch的计数为0前,调用await方法的线程将一直阻塞,直到CountDownLatch计数为0await方法才会返回,而CountDownLatchcountDown()方法则一般由各个线程调用,实现CountDownLatch计数的减1

首先,看下CountDownLatch的构造方法:

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

ReentrantLock类似,CountDownLatch内部也有一个叫做Sync的内部类,同样也是用它继承了AQS,子类需要实现AQS5个保护方法。

对于共享锁,需要实现tryAcquireSharedtryReleaseShared2个方法。

 

setState方法设定的stateAQS的一个“状态位”,在不同的场景下,代表不同的含义,比如在ReentrantLock中,表示加锁的次数,在CountDownLatch中,则表示CountDownLatch的计数器的初始大小。

private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;

    Sync(int count) {
        setState(count);
    }

    int getCount() {
        return getState();
    }

    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }

    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) {
            int c = getState();
            if (c == 0)
                return false;
            int nextc = c-1;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }
}

设置完计数器大小后CountDownLatch的构造方法返回,下面我们再看下CountDownLatchawait()方法。

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

调用了SyncacquireSharedInterruptibly方法,因为SyncAQS子类的原因,这里其实是直接调用了AQSacquireSharedInterruptibly方法。

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

这个方法的调用是响应线程的打断的,所以在前两行会检查下线程是否被打断。接着,尝试着获取共享锁,小于0,表示获取失败,AQS在获取锁的思路是,先尝试直接获取锁,如果失败会将当前线程放在队列中,按照FIFO的原则等待锁。而对于共享锁也是这个思路,如果和独占锁一致,这里的tryAcquireShared应该是个空方法,留给子类去判断。

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

如果state变成0了,则返回1,表示获取成功,否则返回-1则表示获取失败。

看到这里,读者可能会发现,await方法的获取方式更像是在获取一个独占锁,那为什么这里还会用tryAcquireShared呢?

回想下CountDownLatchawait方法是不是只能在主线程中调用?答案是否定的,CountDownLatchawait方法可以在多个线程中调用,当CountDownLatch的计数器为0后,调用await的方法都会依次返回。
也就是说可以多个线程同时在等待await方法返回,所以它被设计成了实现tryAcquireShared方法,获取的是一个共享锁,锁在所有调用await方法的线程间共享,所以叫共享锁。

如果获取共享锁失败(返回了-1,说明state不为0,也就是CountDownLatch的计数器还不为0),进入调用doAcquireSharedInterruptibly方法中,按照我们上述的猜想,应该是要将当前线程放入到队列中去。

在这之前,我们再回顾一下AQS队列的数据结构:AQS是一个双向链表,通过节点中的nextpre变量分别指向当前节点后一个节点和前一个节点。其中,每个节点中都包含了一个线程和一个类型变量:表示当前节点是独占节点还是共享节点,头节点中的线程为正在占有锁的线程,而后的所有节点的线程表示为正在等待获取锁的线程。

黄色节点为头节点,表示正在获取锁的节点,剩下的蓝色节点(Node1Node2Node3)为正在等待锁的节点,他们通过各自的nextpre变量分别指向前后节点,形成了AQS中的双向链表。每个线程被加上类型(共享还是独占)后便是一个Node
也就是本文中说的节点。

回到acquireSharedInterruptibly方法:

/**
 * 在中断模式下获取共享锁
 * @param arg the acquire argument
 */
private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    /* 类型为Node.SHARED,标示为共享节点。*/
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                /* 头节点获取共享锁 */
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            /* 阻塞并判断是否打断,其实这个判断才是自旋锁真正的猥琐点,
             * 意思是如果你的前继节点不是head,
             * 而且当你的前继节点状态是Node.SIGNAL时,
             * 你这个线程将被park(),
             * 直到另外的线程release时,发现head.next是你这个node时,才unpark,
             * 才能继续循环并获取锁
             */
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

使用了CAS更换了头节点,然后,将当前节点的下一个节点取出来,如果同样是“shared”类型的,再做一个”releaseShared”操作

/**
 * 设置队列head节点,检查后继节点是否在共享模式下等待,
 * 如果propagate > 0 或者 节点PROPAGATE状态被设置,状态传播,
 */
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // 记录老的头节点
    setHead(node);
    /*
     * 如果传播propagate被调用者caller标示,或者被前一次操作记录
     * 并且下一个节点在共享模式等待,或者为null,
     * 尝试信号通知队列下一个节点
     */
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            /* 共享模式下的释放动作,信号通知后继节点,保证状态传递 */
            doReleaseShared();
    }
}

看完await方法,我们再来看下countDown()方法:

public void countDown() {
    sync.releaseShared(1);
}

/**
 * Releases in shared mode.  Implemented by unblocking one or more
 * threads if {@link #tryReleaseShared} returns true.
*/
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}


/**
 * 共享模式下的释放动作,信号通知后继节点,保证状态传递
 */
private void doReleaseShared() {
    /*
     * 确保释放状态的传播,即使有其他在进行中的acquires/releases操作的情况下。
     * 如果节点需要等待信号,用常用的方式,
     * 尝试unparkSuccessor将head节点的后继unpark
     * 否则状态被设置成PROPAGATE,来保证在释放的时候,传播能够继续。
     * 另外,当执行这个操作的时候,必须循环,防止新的节点被增加,
     * 此外,不像其他使用unparkSuccessor,我们需要知道CAS是否重置状态失败,
     * 如果失败重新检查。
     */
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                /* 如果当前节点是SIGNAL意味着,它正在等待一个信号,  
                 * 或者说,它在等待被唤醒,因此做两件事,
                 * 1是重置waitStatus标志位,2是重置成功后,唤醒下一个节点。
                 */
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                /* 如果本身头节点的waitStatus是出于重置状态(waitStatus==0)的,
                 * 将其设置为“传播”状态。
                 * 意味着需要将状态向后一个节点传播。
                 */
                continue;                // loop on failed CAS }
        if (h == head)                   // loop if head changed
            break;
    }
}

结束语:

1.  与AQS的独占功能一样,共享锁是否可以被获取的判断为空方法,交由子类去实现。

2.  与AQS的独占功能不同,当共享锁被头节点获取后,独占功能是只有头节点获取锁,其余节点的线程继续沉睡,等待锁被释放后,才会唤醒下一个节点的线程,而共享功能是只要头节点获取锁成功,就在唤醒自身节点对应的线程的同时,继续唤醒AQS队列中的下一个节点的线程,每个节点在唤醒自身的同时还会唤醒下一个节点对应的线程,以实现共享状态的“向后传播”,从而实现共享功能。