ReentrantLock
1. ReentrantLock简介
ReentrantLock意思是"可重入锁",可理解为可以重复使用的锁,ReentrantLock是实现了Lock接口的类,并且ReentrantLock提供了更多的方法。ReentrantLock还提供了公平锁也非公平锁的选择,构造方法接受一个可选的公平参数(默认非公平锁),当设置为true时,表示公平锁,否则为非公平锁。
公平锁与非公平锁的区别在于公平锁的锁获取是有顺序的,因而非公平锁的效率更高,在许多线程访问的情况下,公平锁表现出较低的吞吐量。
2. ReentrantLock特性
- 可重入性: 如果一个线程已经持有了某个锁,那么它可以再次调用lock()方法而不会被阻塞。锁的持有计数会在每次成功调用lock()方法时递增,并在每次unlock()方法被调用时递减。
- 公平性:ReentrantLock提供了一个公平锁的选项。公平锁会按照线程请求锁的顺序来分配锁,而不是像非公平锁那样允许线程抢占已经等待的线程的锁。公平锁可以减少“饥饿”的情况,但也可能降低一些性能。
- 可中断性:ReentrantLock的获取锁操作(
lockInterruptibly()
方法)可以被中断。这提供了另一个相对于synchronized关键字的优势,因为synchronized不支持响应中断。 - 条件变量: ReentrantLock类中还包含一个Condition接口的实现,该接口允许线程在某些条件下等待
await()
或唤醒signal()
。
3. ReentrantLock的使用
public class Demo_ReentrantLock {
static class Ticketer {
private int num = 30;
// 创建可重入锁
private ReentrantLock lock = new ReentrantLock();
public void sale() {
// 上锁
lock.lock();
try {
if (num > 0) {
System.out.println(Thread.currentThread().getName() + "卖了1张, 还剩" + --num);
}
}finally {
// 解锁
lock.unlock();
}
}
}
public static void main(String[] args) {
Ticketer ticketer = new Ticketer();
new Thread(() -> {
for (int i = 0; i < 40; i++) {
ticketer.sale();
}
}, "AA").start();
new Thread(() -> {
for (int i = 0; i < 40; i++) {
ticketer.sale();
}
}, "BB").start();
new Thread(() -> {
for (int i = 0; i < 40; i++) {
ticketer.sale();
}
}, "CC").start();
}
}
运行结果: 线程调用
start()
不一定马上执行,可能立即执行可能隔一会儿执行。
查看start()
方法源码:
public synchronized void start() {
if (threadStatus != 0)
throw new IllegalThreadStateException();
group.add(this);
boolean started = false;
try {
start0();
started = true;
} finally {
try {
if (!started) {
group.threadStartFailed(this);
}
} catch (Throwable ignore) {
}
}
}
private native void start0();
start0()
是一个本地方法,操作系统完成线程的创建,已经超出JVM的管理范围, 所以线程的启动时间不确定。
4. ReentrantLock的实现原理
4.1 ReentrantLock类图
ReentrantLock的实现依赖于内部的Sync类,这个类是AbstractQueuedSynchronizer(AQS)的一个实现。AQS是Java并发库中许多同步工具(包括Semaphore、CountDownLatch和CyclicBarrier等)的核心。
4.2 AQS工作原理
AQS使用一个int类型的变量来表示同步状态,ReentrantLock用它来表示锁的持有计数和持有线程的信息。当计数为0时,表示锁未被任何线程持有。当一个线程首次成功获取锁时,JVM会记录这个锁的持有线程,并将计数器设置为1。如果同一个线程再次请求这个锁,它将能够再次获得这个锁,并且计数器会递增。当线程释放锁时(通过调用unlock()方法),计数器会递减。如果计数器递减为0,则表示锁已经完全释放,其他等待的线程有机会获取它。
AQS内部维护着一个FIFO双向队列,该队列就是CLH同步队列。AQS依赖它来完成同步状态的管理,当前线程如果获取同步状态失败(就是图中加锁没成功)时,AQS则会将当前线程已经等待状态等信息构造成一个节点(Node)并将其加入到CLH同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点唤醒(公平锁),使其再次尝试获取同步状态。公平锁与非公平锁的区别在于获取锁的时候是否按照AQS的队列FIFO的顺序来。
5. 非公平和公平锁
ReentrantLock实现公平锁和非公平锁分别依赖FairSync、NonFairSync类,默认的构造函数创建的是非公平锁: FairSync、NonFairSync类的父类是Sync类:
公平锁和非公平锁的差异如下:
公平锁: 公平锁讲究先来先到,线程在获取锁时,如果这个锁的等待队列中已经有线程在等待,那么当前线程就会进入等待队列中;
非公平锁:不管是否有等待队列,如果可以获取锁,则立刻占有锁对象。也就是说队列的第一个排队线程苏醒后,不一定就是排头的这个线程获得锁,它还是需要参加竞争锁(存在线程竞争的情况下),后来的线程可能不讲武德插队夺锁了。
最终调用AbstractQueuedSynchronizer类的acquire()
方法通过一个无限循环不断尝试获取锁,处理节点入队、线程阻塞、中断和超时等情况,确保线程能正确获取锁。
6. 加锁源码
非公平和公平锁在获取锁的时候,调用都是lock()
方法:
// 实际调用的是Sync类的lock方法
final void lock() {
if (!initialTryLock())
acquire(1);
}
6.1 深入initialTryLock源码
initialTryLock()
是在FairSync、NonFairSync类进行了实现: 在公平锁中
initialTryLock()
首先进行初始判断,是否是0状态:
// FairSync类的方法实现
final boolean initialTryLock() {
// 获取当前线程
Thread current = Thread.currentThread();
int c = getState();
// 如果AQS的状态为0,表明当前锁没有被其他线程得到
if (c == 0) {
// hasQueuedThreads()判断队列是否还有线程
// 满足队列没有线程,那再通过CAS设置AQS的状态
if (!hasQueuedThreads() && compareAndSetState(0, 1)) {
// 设置排他锁线程引用给AQS
setExclusiveOwnerThread(current);
return true;
}
// 如果当前线程已经获取了锁
} else if (getExclusiveOwnerThread() == current) {
// 可重入次数+1
if (++c < 0) // overflow
throw new Error("Maximum lock count exceeded");
// 更新可重入次数
setState(c);
return true;
}
return false;
}
为何hasQueuedThreads()
判断为false的时候,还需要compareAndSetState呢,如果队列中没有线程,可以直接设置线程状态为1? 主要是多个线程可能同时尝试获取锁。hasQueuedThreads()
检查和设置锁状态这两个操作不是原子的,在这两个操作之间可能会有其他线程介入。使用compareAndSetState()
能够保证其中一个线程能够正确的更新state状态, 从而拿到锁。
而在非公平锁中,没有队列是否为空的判断:
final boolean initialTryLock() {
// 获取当前线程
Thread current = Thread.currentThread();
// 直接通过CAS方法设置值
if (compareAndSetState(0, 1)) { // first attempt is unguarded
// 设置排他锁的线程信息
setExclusiveOwnerThread(current);
return true;
// 如果当前线程已经获取了锁,不需要CAS操作
} else if (getExclusiveOwnerThread() == current) {
int c = getState() + 1;
if (c < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(c);
return true;
} else
return false;
}
公平和非公平锁在initialTryLock()
方法里面都是尝试获取锁,如果不成功,就进入acquire()
方法中。
6.2 深入tryAcquire源码
acquire()
方法会调用tryAcquire()
方法,再次试图加锁:
// 实际调用的是AbstractQueuedSynchronizer类的acquire方法,arg=1
public final void acquire(int arg) {
if (!tryAcquire(arg))
acquire(null, arg, false, false, false, 0L);
}
但是AbstractQueuedSynchronizer类的tryAcquire()
只是定义, 利用多态特性,实际是在各个子类:FairSync、NonFairSync中调用: 在公平锁FairSync类中的
hasQueuedPredecessors()
方法,第一次执行时,队列此时还未完成初始化,head为null:
public final boolean hasQueuedPredecessors() {
Thread first = null; Node h, s;
// s为头节点后面的第二节点
if ((h = head) != null && ((s = h.next) == null ||
(first = s.waiter) == null ||
s.prev == null))
first = getFirstQueuedThread(); // retry via getFirstQueuedThread
return first != null && first != Thread.currentThread();
}
可以看出非公平和公平锁在获取锁的时候,区别在于公平锁需要额外执行hasQueuedPredecessors()
方法,hasQueuedPredecessors()
用来判断等待队列是否为空以及是否头节点就是自己线程。返回true表示自己不是头节点,当前线程前面还有等待的线程。也就是说hasQueuedPredecessors()
中判断了是否需要排队。
6.3 深入acquire源码
final int acquire(Node node, int arg, boolean shared,
boolean interruptible, boolean timed, long time) {
// 初始化局部变量
Thread current = Thread.currentThread();
byte spins = 0, postSpins = 0; // retries upon unpark of first thread
boolean interrupted = false, first = false;
Node pred = null; // predecessor of node when enqueued
// 死循环不断尝试获取锁
for (;;) {
if (!first && (pred = (node == null) ? null : node.prev) != null &&
!(first = (head == pred))) {
if (pred.status < 0) {
cleanQueue(); // predecessor cancelled
continue;
} else if (pred.prev == null) {
Thread.onSpinWait(); // ensure serialization
continue;
}
}
if (first || pred == null) {
boolean acquired;
try {
if (shared)
acquired = (tryAcquireShared(arg) >= 0);
else
acquired = tryAcquire(arg);
} catch (Throwable ex) {
cancelAcquire(node, interrupted, false);
throw ex;
}
if (acquired) {
if (first) {
node.prev = null;
head = node;
pred.next = null;
node.waiter = null;
if (shared)
signalNextIfShared(node);
if (interrupted)
current.interrupt();
}
return 1;
}
}
if (node == null) { // allocate; retry before enqueue
if (shared)
node = new SharedNode();
else
node = new ExclusiveNode();
} else if (pred == null) { // 获取锁失败,进入等待队列
node.waiter = current;
Node t = tail;
node.setPrevRelaxed(t); // avoid unnecessary fence
if (t == null)
tryInitializeHead();
else if (!casTail(t, node))
node.setPrevRelaxed(null); // back out
else
t.next = node;
} else if (first && spins != 0) {
--spins; // reduce unfairness on rewaits
Thread.onSpinWait();
} else if (node.status == 0) {
node.status = WAITING; // enable signal and recheck
} else {
long nanos;
spins = postSpins = (byte)((postSpins << 1) | 1);
if (!timed)
LockSupport.park(this);
else if ((nanos = time - System.nanoTime()) > 0L)
LockSupport.parkNanos(this, nanos);
else
break;
node.clearStatus();
if ((interrupted |= Thread.interrupted()) && interruptible)
break;
}
}
return cancelAcquire(node, interrupted, interruptible);
}
7. 解锁源码
public void unlock() {
sync.release(1);
}
底层实际调用的是AbstractQueuedSynchronizer的release()
方法:
public final boolean release(int arg) {
if (tryRelease(arg)) {
signalNext(head);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
//
int c = getState() - releases;
if (getExclusiveOwnerThread() != Thread.currentThread())
throw new IllegalMonitorStateException();
boolean free = (c == 0);
if (free)
setExclusiveOwnerThread(null);
setState(c);
return free;
}