Java锁(四)ConditionObject分析

在讲ConditionObject之前,先讲解下条件队列。条件队列能够使得一组线程能够通过某种方式来等待特定的条件变成真,条件队列中的成员是一个个正在等待状态的线程。条件队列提供了一种挂起方式,当现场等待的条件非真时,挂起自己并释放锁,一旦等待条件为真,则立即醒来。这也是条件队列提供的主要功能。

1、隐式锁对应的条件队列

对象的内置锁(synchronized语义对应的同步机制),关联着一个内置的条件队列。Object的wait/notify/notifyAll等方法构成了内部条件队列的API(即将内部锁与内部条件队列关联的机制)。 内部条件队列是需要内置锁保护的,需要调用对象X中的条件队列,必须持有对象X上的锁。这是因为状态处于并发环境下,“等待依赖状态的某个条件”与“维护状态的一致性”是绑定在一起的。

2、显式锁对应的条件队列

与内置锁对应的是显式锁,显式锁关联的条件队列是显式条件队列。内置锁的局限是每个内置锁只能关联一个条件队列,当线程需要等待多个条件时,则需要同时获取多个内置锁。 显式锁可以与多个条件队列关联,Condition是显式锁的条件队列,它是Object的wait/notify/notifyAll等方法的扩展。提供了在一个对象上设置多个等待集合的功能,即一个对象上设置多个等待条件。

3condition使用

Condition也称为条件队列,与内置锁关联的条件队列类似,它是一种广义的内置条件队列。它提供给线程一种方式使得该线程在调用wait方法后执行挂起操作,直到线程的等待的某个条件为真时被唤醒。 条件队列必须跟锁一起使用的,因为对共享状态变量的访问发生在多线程环境下,原理与内部条件队列一样。一个Condition的实例必须跟一个Lock绑定, Condition一般作为Lock的内部类现。

class BoundedBuffer {
  final Lock lock = new ReentrantLock();
  final Condition notFull  = lock.newCondition();
  final Condition notEmpty = lock.newCondition();
 
  final Object[] items = new Object[100];
  int putptr, takeptr, count;
 
  public void put(Object x) throws InterruptedException {
    lock.lock();
    try {
      while (count == items.length)
        notFull.await();
      items[putptr] = x;
      if (++putptr == items.length) putptr = 0;
      ++count;
      notEmpty.signal();
    } finally {
      lock.unlock();
    }
  }
 
  public Object take() throws InterruptedException {
    lock.lock();
    try {
      while (count == 0)
        notEmpty.await();
      Object x = items[takeptr];
      if (++takeptr == items.length) takeptr = 0;
      --count;
      notFull.signal();
      return x;
    } finally {
      lock.unlock();
    }
  }
 
}

4AQS对条件队列的实现类图如下:

5、条件队列的节点状态

调用条件队列的等待操作,会设置节点的waitingStatus为Condition,标识当前节点正处于条件队列中。状态转换图如下:

Node的各个状态的主要作用,Cancelled主要是解决线程在持有锁时被外部中断的逻辑,AQS的可中断锁获取方法lockInterrutible()是基于该状态实现的,显示锁必须手动释放锁,尤其是有中断的环境中,一个线程被中断可能仍然持有锁,所以必须注意在finally中unlock。Condition则是条件队列的等待操作,是Lock与条件队列关联的基础。Signal是阻塞后继线程的标识。

6、条件队列的等待和唤醒操作

条件队列上的等待和唤醒操作,本质上是节点在AQS线程等待队列和条件队列之间相互转移的过程,当需要等待某个条件时,线程会将当前节点添加到条件队列中,并释放锁;当某个线程执行条件队列的唤醒操作,则会将条件队列的节点转移到AQS等待队列。每个Condition就是一个条件队列,可以通过Lock的newCondition创建多个等待条件。操作流程如下:

条件队列等待await如下所示

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // 将当前线程所在的节点添加到条件等待队列里面
    Node node = addConditionWaiter();
    // 释放并唤醒后续节点
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // node不在同步队列中,挂起当前线程,
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        // 检查在等待时是否中断,如果中断,返回中断模式
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // 当前节点进入到队列中,并自旋
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    // 当前节点的下一个等待者,不为空,移除所有取消的
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}
 
/**
 * 添加一个新的等待者到条件等待队列里面
 */
private Node addConditionWaiter() {
    Node t = lastWaiter;
    // 如果lastWaiter不为状态节点,移除
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    // 将当前线程关联的node添加到条件队列中
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}
 
/**
 * 使用当前的state值调用release方法,返回保存的state值
 * 失败时抛出异常,并将节点设置成在Node.CANCELLED,
 * @param node 等待的状态节点
 * @return 返回上一次同步队列的state状态
 */
final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        // 获取当前AQS的状态值state
        int savedState = getState();
        // 释放,并唤醒后继节点
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            // 释放失败,抛出异常,并且在finally中将节点设置成在Node.CANCELLED
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}
 
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);//unblock,唤醒head的后继节点
        return true;
    }
    return false;
}

await等待的流程如下图所示

/**
 * Moves the longest-waiting thread, if one exists, from the
 * wait queue for this condition to the wait queue for the
 * owning lock.
 *
 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
 *         returns {@code false}
 */
public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}
 
/**
 * Moves all threads from the wait queue for this condition to
 * the wait queue for the owning lock.
 *
 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
 *         returns {@code false}
 */
public final void signalAll() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignalAll(first);
}
 
 
/**
 * Removes and transfers nodes until hit non-cancelled one or
 * null. Split out from signal in part to encourage compilers
 * to inline the case of no waiters.
 * @param first (non-null) the first node on condition queue
 */
private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}
 
/**
 * Removes and transfers all nodes.
 * @param first (non-null) the first node on condition queue
 */
private void doSignalAll(Node first) {
    lastWaiter = firstWaiter = null;
    do {
        Node next = first.nextWaiter;
        first.nextWaiter = null;
        transferForSignal(first);
        first = next;
    } while (first != null);
}
 
/**
 * 将节点从条件队列转到同步队列中
 * 成功,返回true
 * 否则节点被取消,在收到信号前
 */
final boolean transferForSignal(Node node) {
    /*
     * 节点被取消,不能改变waitStatus
     */
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
 
    /*
     * 将节点粘到队列中,尝试设置前驱节点的waitStatus,表明线程可能正在等待
     * 如果取消或者尝试设置waitStatus失败,唤醒重新同步
     * 在这种情况下waitStatus会出错,但是是瞬时无害的
     */
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

signal通知条件等待

结论:显式条件队列弥补内置条件队列只能关联一个条件的缺陷,同时继承了Lock对象的公平性。在Condition对象中,与Object的wait/notify/notifyAll对应的扩展方法是await/signal/signallAll,同时也具有Object的这三个方法,所以使用的时候需要注意使用版本的正确。另外,显式锁必须遵从特定的使用规范,先lock在finally中unlock,以确保锁必然会被正确释放。

此外,AQS的两个队列都是链表队列,关联类的方法的都相当简洁,尤其是节点移除队列操作过程中,都及时释放了所占内存。读源码,可以学习到一种编码的严谨性,锻炼自己关注GC的意识。这是我见到过的第三处及时释放GC的的代码了。从最初的ArrayList的元素remove中,然后是HashMap的动态扩容数组转移操作,最近看AQS的元素唤醒和锁释放操作。关注GC的确是最近开始形成的一种编程意识。