AQS
引言
在多线程高并发场景下,我们为了保证操作的原子性,必要的需要对代码块进行线程同步。
我们知道ReentrantLock
可以在Java中实现公平锁和非公平锁,它是类层面实现的锁,可以起到像sychronized
关键字那样的同步功能,那么它是如何做到的呢?
为了深入了解ReentrantLock
的实现,那么就必须要了解ReentrantLock
的底层设计—AQS,这篇文章将会结合底层源码来理解AQS是什么。
AQS
AQS,其实就是AbstractQueuedSynchronizer
(抽象队列式同步器)这个抽象类
位于java.util.concurrent.locks
这个包下,如果看它的派生类,会发现各种同步场景的设计都使用到了AQS,如LeentrantLock
,Semaphore
,ThreadPoolExecutor
等。
实际上AQS内部使用的是一种遵循FIFO的双向链表来存储线程对象的,这一点从源码中AQS的内部类Node即可看出。
设想这样的场景:如果有多个线程需要获取锁,那么我们对每个线程都要做妥善的处置,那么必然需要一个空间来存放这些线程以便我们管理这些线程,AQS就提供了这样一种数据结构来保存并发线程对象的引用。当我们需要唤醒线程,或是阻塞线程时都将通过这个FIFO的链式队列操作我们的线程对象。
static final class Node {
//省略....
volatile Node prev;
volatile Node next;
volatile Thread thread;
//省略....
}
那么AQS又是如何实现同步的?
如果要实现同步,我们就需要一种状态机制来确定当前同步队列的状态,是处于锁释放状态,还是处于锁占用状态,锁占用状态下我们同步队列的各个线程状态又是如何的,锁释放状态我们的同步队列又该如何运作。
其实在AQS中,是通过一个原子的int值state
来表示当前同步队列状态,具体的state
值是多少,是交由具体的实现类去定义的。
由于AQS支持独占/共享模式,因此不同实现对state
值的不同解释也不同,AQS只是做了抽象层面的定义。
例如
在RenntrantLock
实现中state
值为1时表示有线程正在占用锁,state
值代表0时表示锁被释放,这点我们来看源码。
final void lock() {
acquire(1);//调用父类AQS的acquire方法
}
//父类AQS中的方法,如果尝试获取锁失败就阻塞当前线程,将调用子类的tryAcquire方法
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
我们关心的compareAndSetState
这个语句,在底层使用的其实是Unsafe
类。
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
没错可以看到AQS用了Unsafe
类的方法,正是CAS
操作(比较并替换),这是一个原子操作调用了本地方法,使用的。
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);//这个方法
}
Unsafe
这个类提供了一种底层的、不安全的操作内存的方式。在compareAndSetState
方法中,Unsafe
类的compareAndSwapInt
方法被用来执行CAS(比较并交换)操作。
Unsafe
这个类比较特殊,可以直接操作内存,一般来说高级语言是没法直接操作内存的,java也存在相关的安全检查和内存管理机制。但是通过Unsafe
这个不安全的类就能完成这一操作。
compareAndSwapInt
就是它提供的一种硬件级别的原子操作指令。
原子操即在多线程环境中单个线程要么一次性做完,要么全不做。
CAS操作分为下面三个步骤
- 读取当前变量值
- 比较这个值是否等于预期值
- 如果是将这个变量更新为预期值
这样的三个操作就是原子操作
为什么要有这样的操作呢?
就好比现在多个人竞争一个房间的使用权,只有房间上挂上未使用(值=0)的牌子,才允许他人进去,这个人在进去前是不是就能挂上一个在使用的牌子(值=1),这样子其它人来的时候看到了就不会直接冲进去了,只能在外面等着。
只不过CAS操作保证了这个看牌子→判断→挂牌子
的不被他人打断。
线程阻塞
回到刚才ReentrantLock
对state值的改变,我们知道了ReentrantLock
是通过AQS中提供的CAS
方法(底层Unsafe操作本地方法)通过操作内存来改变state的值。
那么如果CAS操作失败怎么办,也就是已经有线程通过CAS获取到锁了(state = 1),这时候我们再来看AQS接下来的操作。
其实在刚才CAS操作之后,在AQS中还有一个相与的条件方acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
,这个方法中有一个addWaiter
方法,看似套娃其实就是把当前的线程构造为AQS同步队列中的一个Node
并放入链式队列中了。
那么放进去就好了吗?当然不是,既然是同步阻塞队列,那么线程竞争锁失败肯定是会被阻塞的。我们来看看阻塞的具体方法。
其实就是这个方法 parkAndCheckInterrupt
,我们进入这个方法看看。
final boolean acquireQueued(final Node node, int arg) {
//...
try {
//...
for (;;) {
//...
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//...
}
} finally {
//...
}
}
可以看到这个方法使用到了LockSupport
这个类,我们直接打入内部。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
现在我们打入LockSupport
这个类的内部了,结果一看又是老朋友Unsafe
提供的方法,那我们就不用往下看了,线程阻塞的方法也是Unsafe类提供的方法实现的。UNSAFE.park(false, 0L);
这里其实就是设置了线程无限期的等待,通过本地方法实现了线程的阻塞逻辑。
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park(false, 0L);
setBlocker(t, null);
}
到此为止我们大概了解了ReentrantLock
是如何利用AQS来实现的线程获取锁以及获取锁失败后的阻塞原理。
让我们来梳理一下逻辑:
- 当线程进入AQS后,具体的锁实现会调用
acquire
方法来获取锁,本质上就是CAS方法 - 如果获取锁成功(CAS),其它线程进入后也会尝试CAS获取锁
- 如果获取锁失败(CAS),就放入FIFO链式队列中阻塞等待
锁释放
我们知道使用锁的时候当线程完成任务时也一定要记得释放锁,否则就会发生死锁问题,那么在释放锁后,AQS又发生了什么呢?
这里我们就快速定位到释放锁方法,主要是以下两个步骤
- 调用具体的释放锁方法,
ReentrantLock
中最后是将state
的值设置回了0 - 由AQS负责调用
unparkSuccessor
方法,将后续节点节点从阻塞状态被唤醒 (底层方法LockSupport.unpark(s.thread);
)
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
ReentrantLock
中释放锁具体实现
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
那么后续节点所对应的阻塞线程从阻塞状态中恢复后发生了什么?
其实我们就看刚才发生阻塞的方法在调用了parkAndCheckInterrupt()
后被阻塞,那么被唤醒后其实就是再次循环调用tryAcquire
方法获取锁。这样子我们就分析完了从锁获取→阻塞→再次获取锁
的一个分析流程。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) //在这行被阻塞
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
用AQS设计一个简单的锁
到这里我们就能自己实现一个自己的锁了,我们仿照ReentrantLock
的逻辑实现一个我们的锁,AQS使用了模板设计模式,将需要实现的方法开放给了我们,我们需要重写一些方法来实现具体的上锁和解锁方法。
下面的方法是博主自己写的,仅供学习参考,但是实现了基本的逻辑,大家可以参考
ReentrantLock
源码实现一个完善的锁
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
public class AQSTest {
public static void main(String[] args) {
Lock lock = new Lock();
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("t1尝试获取锁");
lock.lock();
lock.lock();//重入锁
System.out.println("t1获取到锁");
System.out.println("5s后释放锁");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
lock.unlock();//释放锁
lock.unlock();//释放锁
System.out.println("t1释放锁");
}
});
t1.start();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("主线程尝试获取锁");
lock.lock();
System.out.println("主线程获取到锁");
lock.unlock();
System.out.println("主线程释放锁");
}
}
class Lock {
private Sync sync = new Sync();
void lock(){
sync.lock();
}
void unlock(){
sync.unlock();
}
class Sync extends AbstractQueuedSynchronizer{
void lock(){
acquire(1);
}
/**
* 返回true代表获取锁成功,false代表获取锁失败
* @param arg
* @return
*/
@Override
protected boolean tryAcquire(int arg) {
int state = getState();
if (state == 0){
//CAS操作
compareAndSetState(0,arg);
setExclusiveOwnerThread(Thread.currentThread());
return true;
}else if (getExclusiveOwnerThread() == Thread.currentThread()){
//可重入锁逻辑
int nextS = getState() + arg;
setState(nextS);//更新状态值,直接更新
return true;
}
return false;
}
/**
* 返回true代表释放锁成功,false代表释放锁失败
* @param arg
* @return
*/
@Override
protected boolean tryRelease(int arg) {
//当前线程释放锁
int c = getState() - arg;
if (Thread.currentThread() != getExclusiveOwnerThread()){
throw new IllegalMonitorStateException();
}
boolean b = false;
if (c == 0){
b = true;
setExclusiveOwnerThread(null);//清除拥有锁的线程引用
}
setState(c);
return b;
}
void unlock(){
release(1);
}
}
}
结果
t1尝试获取锁
t1获取到锁
5s后释放锁
主线程尝试获取锁
t1释放锁
主线程获取到锁
主线程释放锁
下面是AQS的JDK注释,如果要设计锁的话可以仔细结合源码阅读
为实现阻塞锁和依赖于先进先出(FIFO)等待队列的相关同步器(semaphores、事件等)提供了一个框架。该类旨在为大多数依赖于单个原子 int 值来表示状态的同步器提供有用的基础。子类必须定义改变该状态的受保护方法,以及定义该状态在该对象被获取或释放时的含义。有了这些方法,该类中的其他方法就可以执行所有队列和阻塞机制。子类可以维护其他状态字段,但只有使用
getState
、setState
和compareAndSetState
方法原子更新的 int 值才会被同步跟踪。
子类应定义为非公开的内部辅助类,用于实现其外层类的同步属性。AbstractQueuedSynchronizer
类没有实现任何同步接口。相反,该类定义了acquireInterruptibly
等方法,具体锁和相关同步器可根据需要调用这些方法来实现其公共方法。
该类支持默认独占模式和共享模式。在独占模式下获取时,其他线程尝试获取不会成功。多个线程在共享模式下的获取可能(但不必)成功。该类并不 “理解 “这些差异,只是在机械意义上,当共享模式获取成功时,下一个等待线程(如果存在)也必须确定它是否也能获取。在不同模式下等待的线程共享同一个 FIFO 队列。通常,实现子类只支持其中一种模式,但这两种模式都会发挥作用,例如在读写锁中。只支持独占模式或共享模式的子类无需定义支持未使用模式的方法。
该类定义了一个嵌套的AbstractQueuedSynchronizer.ConditionObject
类,支持独占模式的子类可将该类用作 Condition 实现,其中的方法isHeldExclusively
会报告同步是否被当前线程独占,使用当前getState
值调用的方法 release 会完全释放该对象,而获取(acquire)则会根据保存的状态值,最终将该对象恢复到之前的获取状态。没有任何AbstractQueuedSynchronizer
方法会在其他情况下创建这样的条件,因此如果无法满足这一限制条件,请不要使用该方法。当然,AbstractQueuedSynchronizer.ConditionObject
的行为取决于其同步器实现的语义。
该类为内部队列提供了检查、检测和监控方法,并为条件对象提供了类似的方法。这些方法可根据需要导出到使用AbstractQueuedSynchronizer
作为同步机制的类中。
该类的序列化只存储底层的原子整数维护状态,因此反序列化对象的线程队列是空的。需要序列化的典型子类将定义一个readObject
方法,用于在反序列化时将其恢复到已知的初始状态。
公平锁与非公平锁
最后我们来对比下ReentrantLock
中公平锁和非公平锁的区别
非公平锁
final void lock() {
if (compareAndSetState(0, 1)) //直接尝试CAS,插队了
setExclusiveOwnerThread(Thread.currentThread());//CAS成功就获取到锁
else
acquire(1);//没抢到才执行AQS里获取锁的逻辑,也就是在进行一次CAS,再没抢到锁才去排队
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
公平锁
final void lock() {
acquire(1);//老老实实执行获取锁的操作
}
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() && //先看有没有其它线程排在该线程前面,如果该线程就是队首才尝试去CAS,不然就去排队
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
ReentrantLock
对于两种锁的执行逻辑
- 线程1进入
- 如果锁可用(同步状态允许),直接获取锁
- 锁不可用(被占用),封装为Node放入FIFO队列并阻塞
- 非公平锁:线程2进入,尝试获取锁(共2次CAS),如果竞争失败,放入FIFO队尾,阻塞等待
- 公平锁:线程2进入,检查等待队列,确保无线程等待,若存在线程等待,加入队尾等待
- 线程1释放锁,更新同步状态,唤醒队列中下一个等待线程
- 线程2被唤醒,尝试获取锁
相信经过本文的讲述你对AQS应该也不再陌生,如果感兴趣一定要自己去动手实践一下,前路漫漫,与君共勉。
转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。可以邮件至 1300452403@qq.com