Skip to content

ReentrantLock

1. ReentrantLock简介

ReentrantLock意思是"可重入锁",可理解为可以重复使用的锁,ReentrantLock是实现了Lock接口的类,并且ReentrantLock提供了更多的方法。ReentrantLock还提供了公平锁也非公平锁的选择,构造方法接受一个可选的公平参数(默认非公平锁),当设置为true时,表示公平锁,否则为非公平锁。
公平锁与非公平锁的区别在于公平锁的锁获取是有顺序的,因而非公平锁的效率更高,在许多线程访问的情况下,公平锁表现出较低的吞吐量。

2. ReentrantLock特性

  1. 可重入性: 如果一个线程已经持有了某个锁,那么它可以再次调用lock()方法而不会被阻塞。锁的持有计数会在每次成功调用lock()方法时递增,并在每次unlock()方法被调用时递减。
  2. 公平性:ReentrantLock提供了一个公平锁的选项。公平锁会按照线程请求锁的顺序来分配锁,而不是像非公平锁那样允许线程抢占已经等待的线程的锁。公平锁可以减少“饥饿”的情况,但也可能降低一些性能。
  3. 可中断性:ReentrantLock的获取锁操作(lockInterruptibly()方法)可以被中断。这提供了另一个相对于synchronized关键字的优势,因为synchronized不支持响应中断。
  4. 条件变量: ReentrantLock类中还包含一个Condition接口的实现,该接口允许线程在某些条件下等待await()或唤醒signal()

3. ReentrantLock的使用

java
public class Demo_ReentrantLock {

    static class Ticketer {
        private int num = 30;
        // 创建可重入锁
        private ReentrantLock lock = new ReentrantLock();
        public void sale() {
            // 上锁
            lock.lock();
            try {
                if (num > 0) {
                    System.out.println(Thread.currentThread().getName() + "卖了1张, 还剩" + --num);
                }
            }finally {
                // 解锁
                lock.unlock();
            }
        }
    }

    public static void main(String[] args) {
        Ticketer ticketer = new Ticketer();
        new Thread(() -> {
            for (int i = 0; i < 40; i++) {
                ticketer.sale();
            }
        }, "AA").start();
        new Thread(() -> {
            for (int i = 0; i < 40; i++) {
                ticketer.sale();
            }
        }, "BB").start();
        new Thread(() -> {
            for (int i = 0; i < 40; i++) {
                ticketer.sale();
            }
        }, "CC").start();
    }
}

运行结果: Alt text 线程调用start()不一定马上执行,可能立即执行可能隔一会儿执行。
查看start()方法源码:

java
public synchronized void start() {
    
    if (threadStatus != 0)
        throw new IllegalThreadStateException();

    group.add(this);

    boolean started = false;
    try {
        start0();
        started = true;
    } finally {
        try {
            if (!started) {
                group.threadStartFailed(this);
            }
        } catch (Throwable ignore) {
        }
    }
}

private native void start0();

start0()是一个本地方法,操作系统完成线程的创建,已经超出JVM的管理范围, 所以线程的启动时间不确定。

4. ReentrantLock的实现原理

4.1 ReentrantLock类图

Alt text ReentrantLock的实现依赖于内部的Sync类,这个类是AbstractQueuedSynchronizer(AQS)的一个实现。AQS是Java并发库中许多同步工具(包括Semaphore、CountDownLatch和CyclicBarrier等)的核心。

4.2 AQS工作原理

AQS使用一个int类型的变量来表示同步状态,ReentrantLock用它来表示锁的持有计数和持有线程的信息。当计数为0时,表示锁未被任何线程持有。当一个线程首次成功获取锁时,JVM会记录这个锁的持有线程,并将计数器设置为1。如果同一个线程再次请求这个锁,它将能够再次获得这个锁,并且计数器会递增。当线程释放锁时(通过调用unlock()方法),计数器会递减。如果计数器递减为0,则表示锁已经完全释放,其他等待的线程有机会获取它。
Alt text
AQS内部维护着一个FIFO双向队列,该队列就是CLH同步队列。AQS依赖它来完成同步状态的管理,当前线程如果获取同步状态失败(就是图中加锁没成功)时,AQS则会将当前线程已经等待状态等信息构造成一个节点(Node)并将其加入到CLH同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点唤醒(公平锁),使其再次尝试获取同步状态。公平锁与非公平锁的区别在于获取锁的时候是否按照AQS的队列FIFO的顺序来。

5. 非公平和公平锁

ReentrantLock实现公平锁和非公平锁分别依赖FairSync、NonFairSync类,默认的构造函数创建的是非公平锁:
Alt text FairSync、NonFairSync类的父类是Sync类:
Alt text 公平锁和非公平锁的差异如下:
公平锁: 公平锁讲究先来先到,线程在获取锁时,如果这个锁的等待队列中已经有线程在等待,那么当前线程就会进入等待队列中;
非公平锁:不管是否有等待队列,如果可以获取锁,则立刻占有锁对象。也就是说队列的第一个排队线程苏醒后,不一定就是排头的这个线程获得锁,它还是需要参加竞争锁(存在线程竞争的情况下),后来的线程可能不讲武德插队夺锁了。
最终调用AbstractQueuedSynchronizer类的acquire()方法通过一个无限循环不断尝试获取锁,处理节点入队、线程阻塞、中断和超时等情况,确保线程能正确获取锁。

6. 加锁源码

非公平和公平锁在获取锁的时候,调用都是lock()方法:

java
// 实际调用的是Sync类的lock方法
final void lock() {
    if (!initialTryLock())
        acquire(1);
}

6.1 深入initialTryLock源码

initialTryLock()是在FairSync、NonFairSync类进行了实现:
Alt text 在公平锁中initialTryLock()首先进行初始判断,是否是0状态:

java
// FairSync类的方法实现
final boolean initialTryLock() {
    // 获取当前线程
    Thread current = Thread.currentThread();
    int c = getState();
    // 如果AQS的状态为0,表明当前锁没有被其他线程得到
    if (c == 0) {
        // hasQueuedThreads()判断队列是否还有线程
        // 满足队列没有线程,那再通过CAS设置AQS的状态
        if (!hasQueuedThreads() && compareAndSetState(0, 1)) {
            // 设置排他锁线程引用给AQS
            setExclusiveOwnerThread(current);
            return true;
        }
        // 如果当前线程已经获取了锁
    } else if (getExclusiveOwnerThread() == current) {
        // 可重入次数+1
        if (++c < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        // 更新可重入次数
        setState(c);
        return true;
    }
    return false;
}

为何hasQueuedThreads()判断为false的时候,还需要compareAndSetState呢,如果队列中没有线程,可以直接设置线程状态为1? 主要是多个线程可能同时尝试获取锁。hasQueuedThreads()检查和设置锁状态这两个操作不是原子的,在这两个操作之间可能会有其他线程介入。使用compareAndSetState()能够保证其中一个线程能够正确的更新state状态, 从而拿到锁。
而在非公平锁中,没有队列是否为空的判断:

java
final boolean initialTryLock() {
    // 获取当前线程
    Thread current = Thread.currentThread();
    // 直接通过CAS方法设置值
    if (compareAndSetState(0, 1)) { // first attempt is unguarded
        // 设置排他锁的线程信息
        setExclusiveOwnerThread(current);
        return true;
    // 如果当前线程已经获取了锁,不需要CAS操作
    } else if (getExclusiveOwnerThread() == current) {
        int c = getState() + 1;
        if (c < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(c);
        return true;
    } else
        return false;
}

公平和非公平锁在initialTryLock()方法里面都是尝试获取锁,如果不成功,就进入acquire()方法中。

6.2 深入tryAcquire源码

acquire()方法会调用tryAcquire()方法,再次试图加锁:

java
// 实际调用的是AbstractQueuedSynchronizer类的acquire方法,arg=1
public final void acquire(int arg) {
    if (!tryAcquire(arg))
        acquire(null, arg, false, false, false, 0L);
}

但是AbstractQueuedSynchronizer类的tryAcquire()只是定义, 利用多态特性,实际是在各个子类:FairSync、NonFairSync中调用:
Alt text 在公平锁FairSync类中的hasQueuedPredecessors()方法,第一次执行时,队列此时还未完成初始化,head为null:

java
public final boolean hasQueuedPredecessors() {
    Thread first = null; Node h, s;
    // s为头节点后面的第二节点
    if ((h = head) != null && ((s = h.next) == null ||
                                (first = s.waiter) == null ||
                                s.prev == null))
        first = getFirstQueuedThread(); // retry via getFirstQueuedThread
    return first != null && first != Thread.currentThread();
}

可以看出非公平和公平锁在获取锁的时候,区别在于公平锁需要额外执行hasQueuedPredecessors()方法,hasQueuedPredecessors()用来判断等待队列是否为空以及是否头节点就是自己线程。返回true表示自己不是头节点,当前线程前面还有等待的线程。也就是说hasQueuedPredecessors()中判断了是否需要排队。

6.3 深入acquire源码

java
final int acquire(Node node, int arg, boolean shared,
                      boolean interruptible, boolean timed, long time) {
    // 初始化局部变量                    
    Thread current = Thread.currentThread();
    byte spins = 0, postSpins = 0;   // retries upon unpark of first thread
    boolean interrupted = false, first = false;
    Node pred = null;                // predecessor of node when enqueued

    // 死循环不断尝试获取锁
    for (;;) {
        if (!first && (pred = (node == null) ? null : node.prev) != null &&
            !(first = (head == pred))) {
            if (pred.status < 0) {
                cleanQueue();           // predecessor cancelled
                continue;
            } else if (pred.prev == null) {
                Thread.onSpinWait();    // ensure serialization
                continue;
            }
        }
        if (first || pred == null) {
            boolean acquired;
            try {
                if (shared)
                    acquired = (tryAcquireShared(arg) >= 0);
                else
                    acquired = tryAcquire(arg);
            } catch (Throwable ex) {
                cancelAcquire(node, interrupted, false);
                throw ex;
            }
            if (acquired) {
                if (first) {
                    node.prev = null;
                    head = node;
                    pred.next = null;
                    node.waiter = null;
                    if (shared)
                        signalNextIfShared(node);
                    if (interrupted)
                        current.interrupt();
                }
                return 1;
            }
        }
        if (node == null) {                 // allocate; retry before enqueue
            if (shared)
                node = new SharedNode();
            else
                node = new ExclusiveNode();
        } else if (pred == null) {          // 获取锁失败,进入等待队列
            node.waiter = current;
            Node t = tail;
            node.setPrevRelaxed(t);         // avoid unnecessary fence
            if (t == null)
                tryInitializeHead();
            else if (!casTail(t, node))
                node.setPrevRelaxed(null);  // back out
            else
                t.next = node;
        } else if (first && spins != 0) {
            --spins;                        // reduce unfairness on rewaits
            Thread.onSpinWait();
        } else if (node.status == 0) {
            node.status = WAITING;          // enable signal and recheck
        } else {
            long nanos;
            spins = postSpins = (byte)((postSpins << 1) | 1);
            if (!timed)
                LockSupport.park(this);
            else if ((nanos = time - System.nanoTime()) > 0L)
                LockSupport.parkNanos(this, nanos);
            else
                break;
            node.clearStatus();
            if ((interrupted |= Thread.interrupted()) && interruptible)
                break;
        }
    }
    return cancelAcquire(node, interrupted, interruptible);
}

7. 解锁源码

java
public void unlock() {
    sync.release(1);
}

底层实际调用的是AbstractQueuedSynchronizer的release()方法:

java
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        signalNext(head);
        return true;
    }
    return false;
}
java
protected final boolean tryRelease(int releases) {
    // 
    int c = getState() - releases;
    if (getExclusiveOwnerThread() != Thread.currentThread())
        throw new IllegalMonitorStateException();
    boolean free = (c == 0);
    if (free)
        setExclusiveOwnerThread(null);
    setState(c);
    return free;
}