Java锁(二)ReentrantLock独占锁分析

ReentrantLock 的功能是实现代码段的并发访问控制,是一种排它锁,也就是通常意义上所说的锁,内部有两种实现 NonfairSync 和 FairSync,公平锁和非公平锁,默认采用非公平锁策略。ReentrantLock 的实现不仅可以替代隐式的 synchronized 关键字,而且能够提供超过关键字本身的多种功能。

1、ReentrantLock 的使用

class X {
  private final ReentrantLock lock = new ReentrantLock();
  // ...

  public void m() {
    lock.lock();  // block until condition holds
    try {
      // ... method body
    } finally {
      lock.unlock()
    }
  }
}

ReentrantLock 会保证 method-body 在同一时间只有一个线程在执行这段代码,或者说,同一时刻只有一个线程的 lock 方法会返回。其余线程会被挂起,直到获取锁。从这里可以看出,其实 ReentrantLock 实现的就是一个独占锁的功能:有且只有一个线程获取到锁,其余线程全部挂起,直到该拥有锁的线程释放锁,被挂起的线程被唤醒重新开始竞争锁。那现在看下 Doug Lea 怎么去实现 ReentrantLock 重入锁的。首先看下 ReentrantLock 的创建和加锁、解锁过程。

2、ReentrantLock 原理分析

/**
 * 默认非公平锁
 */
public ReentrantLock() {
    sync = new NonfairSync();
} /**
 * 创建ReentrantLock,公平锁or非公平锁
*/
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

/**
 *加锁解锁,使用sync完成
*/
public void lock() {
    sync.lock();
}
public void unlock() {
    sync.release(1);
}

其中,公平锁中每个线程抢占锁的顺序为先后调用 lock 方法的顺序依次获取锁。非公平锁中每个线程抢占锁的顺序不定,先获取锁,获取不到,然后加入到 queue 中,和调用 lock 方法的先后顺序无关。
如果在绝对时间上,先对锁进行获取的请求一定被先满足,那么这个锁是公平的,反之,是不公平的,也就是说等待时间最长的线程最有机会获取锁,也可以说锁的获取是有序的。ReentrantLock 这个锁提供了一个构造函数,能够控制这个锁是否是公平的。而锁的名字也是说明了这个锁具备了重复进入的可能,也就是说能够让当前线程多次的进行对锁的获取操作,这样的最大次数限制是 Integer.MAX_VALUE,约 21 亿次左右。
事实上公平的锁机制往往没有非公平的效率高,因为公平的获取锁没有考虑到操作系统对线程的调度因素,这样造成 JVM 对于等待中的线程调度次序和操作系统对线程的调度之间的不匹配。对于锁的快速且重复的获取过程中,连续获取的概率是非常高的,而公平锁会压制这种情况,虽然公平性得以保障,但是响应比却下降了,但是并不是任何场景都是以 TPS 作为唯一指标的,因为公平锁能够减少“饥饿”发生的概率,等待越久的请求越是能够得到优先满足。

AQS 的队列操作

在分析加锁前,先看下 AQS 的入队列操作, head,tail 节点默认为 null,入队列时,当 tail 为 null 时,初始化创建 dummy head 节点,将入队列的 node 插入队尾。

/**
 * Creates and enqueues node for given thread and mode.
 * 节点入同步队列,通过CAS比较然后插入队列尾部,
 * @param current the thread
 * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
 * @return the new node
 */
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // 快速入队列,tail不为null,通过CAS比较然后插入队列尾部 Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    //快速入队列失败后执行enq方法
    enq(node);
    return node;
}

/**
 * 插入节点到队列中,必要的时候初始化头节点,返回该节点前驱
 * @param node the node to insert
 * @return node's predecessor
 */
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // 初始化,创建Dummy header,thread为null
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

AQS 获取锁定(独占锁)

调用 acquire 方法,其实这个方法是阻塞的, 获取锁的步骤为:
1、 tryAcquire(由子类 Sync 实现)尝试获取锁
2、 没有获取到锁,将节点加入到队列尾部中,加入成功 selfInterrupt,中断当前线程。
ReentrantLock,state 初始化为 0,表示未锁定状态。A 线程 lock()时,会调用 tryAcquire()独占该锁并将 state+1。
image.png
独占模式下的 AQS 是不响应中断的 ,指的是加入到同步队列中的线程,如果因为中断而被唤醒的话,不会立即返回,并且抛出 InterruptedException。而是再次去判断其前驱节点是否为 head 节点,决定是否争抢同步状态。如果其前驱节点不是 head 节点或者争抢同步状态失败,那么再次挂起。

public final void acquire(int arg)  {
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

没有获取到锁则调用 AQS 的 acquireQueued 方法:
1、当 node 的前驱节点是头节点,并且独占时才返回
2、前继节点不是 head, 而且当你的前继节点状态是 Node.SIGNAL 时, 你这个线程将被 park(),直到另外的线程 release 时,发现 head.next 是你这个 node 时才 unpark,才能继续循环并获取锁

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            /* 当node的前驱节点是头节点,并且独占时才返回 */
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC,队列中移除头节点p
                failed = false;
                return interrupted;
            }
            /* 阻塞并判断是否打断,其实这个判断才是自旋锁真正的猥琐点,
             * 意思是如果你的前继节点不是head,
             * 而且当你的前继节点状态是Node.SIGNAL时,
             * 你这个线程将被park(),
             * 直到另外的线程release时,发现head.next是你这个node时,才unpark,
             * 才能继续循环并获取锁
             */
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

shouldParkAfterFailedAcquire 这个方法删除所有 waitStatus>0 也就是 CANCELLED 状态的 Node,并设置前继节点为 signal

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * 节点已经设置状态Node.SIGNAL,
         * 请求释放使它获取信号,所以它才能安全的park
         */
        return true;
    if (ws > 0) {
        /*
         *前驱节点被取消,跳过所有的取消的前驱节点和表明重试
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus为0或者PROPAGATE,表明我们需要一个信号,但是还没有park,
         * 调用者需要重试,保证在parking过程中,它不能被获取到
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

/* 禁用当前线程,返回是否中断 */
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

独占锁 acquire 获取的过程,如下所示
image.png

AQS 释放独占锁

1、tryRelease 释放锁,没有释放成功,返回 false
2、锁释放成功,唤醒 head 的后继节点,返回 true

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);//unblock,唤醒head的后继节点
        return true;
    }
    return false;
}

private void unparkSuccessor(Node node) {
    /*
     * 状态为负数,清除信号,设置成0
     */
    int ws = node.waitStatus;
    if (ws <0)
        compareAndSetWaitStatus(node, ws, 0);
    /*
     * 唤醒后继节点(一般是下一个节点),如果节点被取消或者为null
     * 反向遍历从尾到头找到实际的非取消的后继节点(问题:为什么不正向遍历)
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

释放锁的过程如下图所示
image.png

3、公平锁和非公平锁

两种锁都继承 Sync,而 Sync 又继承 AbstractQueuedSynchronizer ,所以子类必须实现 AQS 的 5 个保护方法。
对于独占锁,需要实现 tryAcquire,tryRelease,isHeldExclusively 这 3 个方法,
其中 tryRelease,isHeldExclusively 是公平锁和非公平锁共有的,在 Sync 中实现。
1、tryRelease 尝试释放锁,每调用 tryRelease(1)一次,将 state 减去 1,当 state 为 0 的时候,释放锁成功,将独占锁的 owner 设为 null。

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

2、isHeldExclusively 判断独占锁的 owner 是不是当前线程

protected final boolean isHeldExclusively() {
    return getExclusiveOwnerThread() == Thread.currentThread();
}

非公平锁

现在我们来看 NonfairSync 加锁的策略
1、查看同步队列锁的 state,不通过同步队列,通过 CAS 抢占独占锁,抢占成功,将当前线程设置成独占线程,state 增加 1。
2、获取不成功,走普通的获取锁的流程

final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

其中 NonfairSync 在 acquire 获取锁的过程中,调用 tryAcquire 尝试获取锁。

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}

在看 Sync 的 nonfairTryAcquire 方法实现如下,直接通过 CAS 获取锁

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    /* 锁没有被占用,将当前线程设置成独占锁的owner
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        /* 增加独占锁的被线程的加锁次数 */
        int nextc = c + acquires;
        if (nextc <0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

公平锁

了解非公平锁的获取过程,我们再看下公平锁的加锁过程,了解其区别

final void lock() {
    acquire(1);
}

通过 acquire 获取锁,没有像非公平锁通过 CAS 操作,直接抢占独占锁。

protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        /* hasQueuedPredecessors判断是否有比当前线程等待更久的线程在等待
         * 没有的话则通过CAS获取锁
         */
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc <0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

从上面的代码可以看出
1、公平锁与非公平锁的释放锁步骤是一致的
2、获取锁的过程不一致,非公平锁是让当前线程优先独占,而公平锁则是让等待时间最长的线程优先,非公平的可能让其他线程没机会执行,而公平的则可以让等待时间最长的先执行,但是性能上会差点。

https://alicharles.oss-cn-hangzhou.aliyuncs.com/static/images/mp_qrcode.jpg
文章目录
  1. 1、ReentrantLock 的使用
  2. 2、ReentrantLock 原理分析
    1. AQS 的队列操作
    2. AQS 获取锁定(独占锁)
    3. AQS 释放独占锁
  3. 3、公平锁和非公平锁
    1. 非公平锁
    2. 公平锁