Java锁(二)ReentrantLock独占锁分析

    ReentrantLock的功能是实现代码段的并发访问控制,是一种排它锁,也就是通常意义上所说的锁,内部有两种实现NonfairSyncFairSync,公平锁和非公平锁,默认采用非公平锁策略。ReentrantLock的实现不仅可以替代隐式的synchronized关键字,而且能够提供超过关键字本身的多种功能。

1、ReentrantLock的使用

class X {
  private final ReentrantLock lock = new ReentrantLock();
  // ...

  public void m() {
    lock.lock();  // block until condition holds
    try {
      // ... method body
    } finally {
      lock.unlock()
    }
  }
}

    ReentrantLock会保证method-body在同一时间只有一个线程在执行这段代码,或者说,同一时刻只有一个线程的lock方法会返回。其余线程会被挂起,直到获取锁。从这里可以看出,其实ReentrantLock实现的就是一个独占锁的功能:有且只有一个线程获取到锁,其余线程全部挂起,直到该拥有锁的线程释放锁,被挂起的线程被唤醒重新开始竞争锁。那现在看下Doug Lea 怎么去实现ReentrantLock重入锁的。首先看下ReentrantLock的创建和加锁、解锁过程。

2、ReentrantLock原理分析

/**
 * 默认非公平锁
 */
public ReentrantLock() {
    sync = new NonfairSync();
} /**
 * 创建ReentrantLock,公平锁or非公平锁
*/
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

/**
 *加锁解锁,使用sync完成
*/
public void lock() {
    sync.lock();
}
public void unlock() {
    sync.release(1);
}

    其中,公平锁中每个线程抢占锁的顺序为先后调用lock方法的顺序依次获取锁。非公平锁中每个线程抢占锁的顺序不定,先获取锁,获取不到,然后加入到queue中,和调用lock方法的先后顺序无关。

    如果在绝对时间上,先对锁进行获取的请求一定被先满足,那么这个锁是公平的,反之,是不公平的,也就是说等待时间最长的线程最有机会获取锁,也可以说锁的获取是有序的。ReentrantLock这个锁提供了一个构造函数,能够控制这个锁是否是公平的。而锁的名字也是说明了这个锁具备了重复进入的可能,也就是说能够让当前线程多次的进行对锁的获取操作,这样的最大次数限制是Integer.MAX_VALUE,约21亿次左右。

    事实上公平的锁机制往往没有非公平的效率高,因为公平的获取锁没有考虑到操作系统对线程的调度因素,这样造成JVM对于等待中的线程调度次序和操作系统对线程的调度之间的不匹配。对于锁的快速且重复的获取过程中,连续获取的概率是非常高的,而公平锁会压制这种情况,虽然公平性得以保障,但是响应比却下降了,但是并不是任何场景都是以TPS作为唯一指标的,因为公平锁能够减少“饥饿”发生的概率,等待越久的请求越是能够得到优先满足。

AQS的队列操作

在分析加锁前,先看下AQS的入队列操作, head,tail节点默认为null,入队列时,当tail为null时,初始化创建dummy head节点,将入队列的node插入队尾。

/**
 * Creates and enqueues node for given thread and mode.
 * 节点入同步队列,通过CAS比较然后插入队列尾部, 
 * @param current the thread
 * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
 * @return the new node
 */
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
	// 快速入队列,tail不为null,通过CAS比较然后插入队列尾部 Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    //快速入队列失败后执行enq方法
    enq(node);
    return node;
}


/**
 * 插入节点到队列中,必要的时候初始化头节点,返回该节点前驱
 * @param node the node to insert
 * @return node's predecessor
 */
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // 初始化,创建Dummy header,thread为null
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

AQS获取锁定(独占锁)

调用acquire方法,其实这个方法是阻塞的, 获取锁的步骤为:

1、  tryAcquire(由子类Sync实现)尝试获取锁

2、  没有获取到锁,将节点加入到队列尾部中,加入成功selfInterrupt,中断当前线程。

public final void acquire(int arg)  {
 if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 
selfInterrupt();
}

没有获取到锁则调用AQS的acquireQueued方法:

1、当node的前驱节点是头节点,并且独占时才返回

2、前继节点不是head, 而且当你的前继节点状态是Node.SIGNAL时, 你这个线程将被park(),直到另外的线程release时,发现head.next是你这个node时才unpark,才能继续循环并获取锁

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            /* 当node的前驱节点是头节点,并且独占时才返回 */
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC,队列中移除头节点p
                failed = false;
                return interrupted;
            }
            /* 阻塞并判断是否打断,其实这个判断才是自旋锁真正的猥琐点,
             * 意思是如果你的前继节点不是head,
             * 而且当你的前继节点状态是Node.SIGNAL时,
             * 你这个线程将被park(),
             * 直到另外的线程release时,发现head.next是你这个node时,才unpark,
             * 才能继续循环并获取锁
             */
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

shouldParkAfterFailedAcquire这个方法删除所有waitStatus>0也就是CANCELLED状态的Node,并设置前继节点为signal

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * 节点已经设置状态Node.SIGNAL,
         * 请求释放使它获取信号,所以它才能安全的park
         */
        return true;
    if (ws > 0) {
        /*
         *前驱节点被取消,跳过所有的取消的前驱节点和表明重试
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus为0或者PROPAGATE,表明我们需要一个信号,但是还没有park,
         * 调用者需要重试,保证在parking过程中,它不能被获取到
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    } 
    return false;
}
/* 禁用当前线程,返回是否中断 */
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

独占锁acquire获取的过程,如下所示

AQS释放独占锁

1、tryRelease释放锁,没有释放成功,返回false

2、锁释放成功,唤醒head的后继节点,返回true

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);//unblock,唤醒head的后继节点
        return true;
    }
    return false;
}

private void unparkSuccessor(Node node) {
    /*
     * 状态为负数,清除信号,设置成0
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    /*
     * 唤醒后继节点(一般是下一个节点),如果节点被取消或者为null
     * 反向遍历从尾到头找到实际的非取消的后继节点(问题:为什么不正向遍历)
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}  

释放锁的过程如下图所示

ReentrantLock中公平锁和非公平锁

两种锁都继承Sync,而 Sync又继承AbstractQueuedSynchronizer ,所以子类必须实现AQS的5个保护方法。

对于独占锁,需要实现tryAcquire,tryRelease,isHeldExclusively这3个方法,

其中tryRelease,isHeldExclusively是公平锁和非公平锁共有的,在Sync中实现。

1、tryRelease尝试释放锁,每调用tryRelease(1)一次,将state减去1,当state为0的时候,释放锁成功,将独占锁的owner设为null。

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

2、isHeldExclusively判断独占锁的owner是不是当前线程

protected final boolean isHeldExclusively() {
    return getExclusiveOwnerThread() == Thread.currentThread();
}

非公平锁

现在我们来看NonfairSync加锁的策略

1、查看同步队列锁的state,不通过同步队列,通过CAS抢占独占锁,抢占成功,将当前线程设置成独占线程,state增加1。

2、获取不成功,走普通的获取锁的流程

final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

其中NonfairSync在acquire 获取锁的过程中,调用tryAcquire尝试获取锁。

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}

在看Sync的 nonfairTryAcquire方法实现如下,直接通过CAS获取锁

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    /* 锁没有被占用,将当前线程设置成独占锁的owner
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        /* 增加独占锁的被线程的加锁次数 */
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

公平锁

了解非公平锁的获取过程,我们再看下公平锁的加锁过程,了解其区别

final void lock() {
    acquire(1);
}

通过acquire获取锁,没有像非公平锁通过CAS操作,直接抢占独占锁。

protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        /* hasQueuedPredecessors判断是否有比当前线程等待更久的线程在等待
         * 没有的话则通过CAS获取锁
         */
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

从上面的代码可以看出

1、  公平锁与非公平锁的释放锁步骤是一致的

2、获取锁的过程不一致,非公平锁是让当前线程优先独占,而公平锁则是让等待时间最长的线程优先,非公平的可能让其他线程没机会执行,而公平的则可以让等待时间最长的先执行,但是性能上会差点。

Java锁系列其他文章

1、  AbstractQueuedSynchronizer基础类分析

2、  ReentrantLock独占锁分析

3、  CountDownLatch共享锁分析

4、  ConditionObject分析

5、  CyclicBarrier分析