AQS

  1. AQS
    1. 那么AQS又是如何实现同步的?
    2. Unsafe
    3. 线程阻塞
    4. 锁释放
    5. 用AQS设计一个简单的锁
    6. 公平锁与非公平锁

AQS

引言

在多线程高并发场景下,我们为了保证操作的原子性,必要的需要对代码块进行线程同步。

我们知道ReentrantLock可以在Java中实现公平锁和非公平锁,它是类层面实现的锁,可以起到像sychronized关键字那样的同步功能,那么它是如何做到的呢?

为了深入了解ReentrantLock的实现,那么就必须要了解ReentrantLock的底层设计—AQS,这篇文章将会结合底层源码来理解AQS是什么。

img

AQS

AQS,其实就是AbstractQueuedSynchronizer(抽象队列式同步器)这个抽象类

位于java.util.concurrent.locks这个包下,如果看它的派生类,会发现各种同步场景的设计都使用到了AQS,如LeentrantLockSemaphoreThreadPoolExecutor等。

实际上AQS内部使用的是一种遵循FIFO的双向链表来存储线程对象的,这一点从源码中AQS的内部类Node即可看出。

设想这样的场景:如果有多个线程需要获取锁,那么我们对每个线程都要做妥善的处置,那么必然需要一个空间来存放这些线程以便我们管理这些线程,AQS就提供了这样一种数据结构来保存并发线程对象的引用。当我们需要唤醒线程,或是阻塞线程时都将通过这个FIFO的链式队列操作我们的线程对象。

static final class Node {

    //省略....
    
    volatile Node prev;

    volatile Node next;

    volatile Thread thread;
   
     //省略....
}

那么AQS又是如何实现同步的?

如果要实现同步,我们就需要一种状态机制来确定当前同步队列的状态,是处于锁释放状态,还是处于锁占用状态,锁占用状态下我们同步队列的各个线程状态又是如何的,锁释放状态我们的同步队列又该如何运作。

其实在AQS中,是通过一个原子的int值state来表示当前同步队列状态,具体的state值是多少,是交由具体的实现类去定义的。

由于AQS支持独占/共享模式,因此不同实现对state值的不同解释也不同,AQS只是做了抽象层面的定义。

例如

RenntrantLock实现中state值为1时表示有线程正在占用锁,state值代表0时表示锁被释放,这点我们来看源码。

final void lock() {
    acquire(1);//调用父类AQS的acquire方法
}
//父类AQS中的方法,如果尝试获取锁失败就阻塞当前线程,将调用子类的tryAcquire方法
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

我们关心的compareAndSetState这个语句,在底层使用的其实是Unsafe类。

protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

没错可以看到AQS用了Unsafe类的方法,正是CAS操作(比较并替换),这是一个原子操作调用了本地方法,使用的。

protected final boolean compareAndSetState(int expect, int update) {
    // See below for intrinsics setup to support this
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);//这个方法
}

Unsafe

这个类提供了一种底层的、不安全的操作内存的方式。在compareAndSetState方法中,Unsafe类的compareAndSwapInt方法被用来执行CAS(比较并交换)操作。

Unsafe这个类比较特殊,可以直接操作内存,一般来说高级语言是没法直接操作内存的,java也存在相关的安全检查和内存管理机制。但是通过Unsafe这个不安全的类就能完成这一操作。

compareAndSwapInt就是它提供的一种硬件级别的原子操作指令。

原子操即在多线程环境中单个线程要么一次性做完,要么全不做。

CAS操作分为下面三个步骤

  • 读取当前变量值
  • 比较这个值是否等于预期值
  • 如果是将这个变量更新为预期值

这样的三个操作就是原子操作

为什么要有这样的操作呢?

就好比现在多个人竞争一个房间的使用权,只有房间上挂上未使用(值=0)的牌子,才允许他人进去,这个人在进去前是不是就能挂上一个在使用的牌子(值=1),这样子其它人来的时候看到了就不会直接冲进去了,只能在外面等着。

只不过CAS操作保证了这个看牌子→判断→挂牌子的不被他人打断。

线程阻塞

回到刚才ReentrantLock对state值的改变,我们知道了ReentrantLock是通过AQS中提供的CAS方法(底层Unsafe操作本地方法)通过操作内存来改变state的值。

那么如果CAS操作失败怎么办,也就是已经有线程通过CAS获取到锁了(state = 1),这时候我们再来看AQS接下来的操作。

其实在刚才CAS操作之后,在AQS中还有一个相与的条件方acquireQueued(addWaiter(Node.EXCLUSIVE), arg)),这个方法中有一个addWaiter方法,看似套娃其实就是把当前的线程构造为AQS同步队列中的一个Node并放入链式队列中了。

那么放进去就好了吗?当然不是,既然是同步阻塞队列,那么线程竞争锁失败肯定是会被阻塞的。我们来看看阻塞的具体方法。

其实就是这个方法 parkAndCheckInterrupt,我们进入这个方法看看。

final boolean acquireQueued(final Node node, int arg) {
    //...
    try {
        //...
        for (;;) {
            //...
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                    //...
        }
    } finally {
            //...
    }
}

可以看到这个方法使用到了LockSupport这个类,我们直接打入内部。

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

现在我们打入LockSupport这个类的内部了,结果一看又是老朋友Unsafe提供的方法,那我们就不用往下看了,线程阻塞的方法也是Unsafe类提供的方法实现的。UNSAFE.park(false, 0L);

这里其实就是设置了线程无限期的等待,通过本地方法实现了线程的阻塞逻辑。

public static void park(Object blocker) {
    Thread t = Thread.currentThread();
    setBlocker(t, blocker);
    UNSAFE.park(false, 0L);
    setBlocker(t, null);
}

到此为止我们大概了解了ReentrantLock是如何利用AQS来实现的线程获取锁以及获取锁失败后的阻塞原理。

让我们来梳理一下逻辑:

  • 当线程进入AQS后,具体的锁实现会调用acquire方法来获取锁,本质上就是CAS方法
  • 如果获取锁成功(CAS),其它线程进入后也会尝试CAS获取锁
  • 如果获取锁失败(CAS),就放入FIFO链式队列中阻塞等待

锁释放

我们知道使用锁的时候当线程完成任务时也一定要记得释放锁,否则就会发生死锁问题,那么在释放锁后,AQS又发生了什么呢?

这里我们就快速定位到释放锁方法,主要是以下两个步骤

  • 调用具体的释放锁方法,ReentrantLock中最后是将state的值设置回了0
  • 由AQS负责调用unparkSuccessor方法,将后续节点节点从阻塞状态被唤醒 (底层方法LockSupport.unpark(s.thread);)
public void unlock() {
    sync.release(1);
}

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

ReentrantLock中释放锁具体实现

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

那么后续节点所对应的阻塞线程从阻塞状态中恢复后发生了什么?

其实我们就看刚才发生阻塞的方法在调用了parkAndCheckInterrupt()后被阻塞,那么被唤醒后其实就是再次循环调用tryAcquire方法获取锁。这样子我们就分析完了从锁获取→阻塞→再次获取锁的一个分析流程。

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt()) //在这行被阻塞
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

用AQS设计一个简单的锁

到这里我们就能自己实现一个自己的锁了,我们仿照ReentrantLock的逻辑实现一个我们的锁,AQS使用了模板设计模式,将需要实现的方法开放给了我们,我们需要重写一些方法来实现具体的上锁和解锁方法。

下面的方法是博主自己写的,仅供学习参考,但是实现了基本的逻辑,大家可以参考ReentrantLock源码实现一个完善的锁

import java.util.concurrent.locks.AbstractQueuedSynchronizer;


public class AQSTest {

    public static void main(String[] args) {
        Lock lock = new Lock();

        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("t1尝试获取锁");
                lock.lock();
                lock.lock();//重入锁
                System.out.println("t1获取到锁");
                System.out.println("5s后释放锁");
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                lock.unlock();//释放锁
                lock.unlock();//释放锁
                System.out.println("t1释放锁");
            }
        });
        t1.start();

        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("主线程尝试获取锁");
        lock.lock();
        System.out.println("主线程获取到锁");
        lock.unlock();
        System.out.println("主线程释放锁");

    }

}

class Lock {

    private Sync sync = new Sync();

    void lock(){
        sync.lock();
    }

    void unlock(){
        sync.unlock();
    }

    class Sync extends AbstractQueuedSynchronizer{

        void lock(){
            acquire(1);
        }

        /**
         * 返回true代表获取锁成功,false代表获取锁失败
         * @param arg
         * @return
         */
        @Override
        protected boolean tryAcquire(int arg) {
            int state = getState();
            if (state == 0){
                //CAS操作
                compareAndSetState(0,arg);
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }else if (getExclusiveOwnerThread() == Thread.currentThread()){
                //可重入锁逻辑
                int nextS = getState() + arg;
                setState(nextS);//更新状态值,直接更新
                return true;
            }
            return false;

        }

        /**
         * 返回true代表释放锁成功,false代表释放锁失败
         * @param arg
         * @return
         */
        @Override
        protected boolean tryRelease(int arg) {
            //当前线程释放锁
            int c = getState() - arg;
            if (Thread.currentThread() != getExclusiveOwnerThread()){
                throw new IllegalMonitorStateException();
            }
            boolean b = false;
            if (c == 0){
                b = true;
                setExclusiveOwnerThread(null);//清除拥有锁的线程引用
            }
            setState(c);
            return b;
        }

        void unlock(){
            release(1);
        }

    }

}

结果

t1尝试获取锁
t1获取到锁
5s后释放锁
主线程尝试获取锁
t1释放锁
主线程获取到锁
主线程释放锁

下面是AQS的JDK注释,如果要设计锁的话可以仔细结合源码阅读

为实现阻塞锁和依赖于先进先出(FIFO)等待队列的相关同步器(semaphores、事件等)提供了一个框架。该类旨在为大多数依赖于单个原子 int 值来表示状态的同步器提供有用的基础。子类必须定义改变该状态的受保护方法,以及定义该状态在该对象被获取或释放时的含义。有了这些方法,该类中的其他方法就可以执行所有队列和阻塞机制。子类可以维护其他状态字段,但只有使用 getStatesetStatecompareAndSetState 方法原子更新的 int 值才会被同步跟踪。
子类应定义为非公开的内部辅助类,用于实现其外层类的同步属性。AbstractQueuedSynchronizer 类没有实现任何同步接口。相反,该类定义了 acquireInterruptibly 等方法,具体锁和相关同步器可根据需要调用这些方法来实现其公共方法。
该类支持默认独占模式和共享模式。在独占模式下获取时,其他线程尝试获取不会成功。多个线程在共享模式下的获取可能(但不必)成功。该类并不 “理解 “这些差异,只是在机械意义上,当共享模式获取成功时,下一个等待线程(如果存在)也必须确定它是否也能获取。在不同模式下等待的线程共享同一个 FIFO 队列。通常,实现子类只支持其中一种模式,但这两种模式都会发挥作用,例如在读写锁中。只支持独占模式或共享模式的子类无需定义支持未使用模式的方法。
该类定义了一个嵌套的 AbstractQueuedSynchronizer.ConditionObject 类,支持独占模式的子类可将该类用作 Condition 实现,其中的方法 isHeldExclusively 会报告同步是否被当前线程独占,使用当前 getState 值调用的方法 release 会完全释放该对象,而获取(acquire)则会根据保存的状态值,最终将该对象恢复到之前的获取状态。没有任何 AbstractQueuedSynchronizer 方法会在其他情况下创建这样的条件,因此如果无法满足这一限制条件,请不要使用该方法。当然,AbstractQueuedSynchronizer.ConditionObject 的行为取决于其同步器实现的语义。
该类为内部队列提供了检查、检测和监控方法,并为条件对象提供了类似的方法。这些方法可根据需要导出到使用 AbstractQueuedSynchronizer 作为同步机制的类中。
该类的序列化只存储底层的原子整数维护状态,因此反序列化对象的线程队列是空的。需要序列化的典型子类将定义一个 readObject 方法,用于在反序列化时将其恢复到已知的初始状态。

公平锁与非公平锁

最后我们来对比下ReentrantLock中公平锁和非公平锁的区别

非公平锁

final void lock() {
    if (compareAndSetState(0, 1)) //直接尝试CAS,插队了
        setExclusiveOwnerThread(Thread.currentThread());//CAS成功就获取到锁
    else
        acquire(1);//没抢到才执行AQS里获取锁的逻辑,也就是在进行一次CAS,再没抢到锁才去排队
}

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

公平锁

final void lock() {
    acquire(1);//老老实实执行获取锁的操作
}

protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (!hasQueuedPredecessors() &&  //先看有没有其它线程排在该线程前面,如果该线程就是队首才尝试去CAS,不然就去排队
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
}

ReentrantLock对于两种锁的执行逻辑

  1. 线程1进入
  2. 如果锁可用(同步状态允许),直接获取锁
  3. 锁不可用(被占用),封装为Node放入FIFO队列并阻塞
    1. 非公平锁:线程2进入,尝试获取锁(共2次CAS),如果竞争失败,放入FIFO队尾,阻塞等待
    2. 公平锁:线程2进入,检查等待队列,确保无线程等待,若存在线程等待,加入队尾等待
  4. 线程1释放锁,更新同步状态,唤醒队列中下一个等待线程
  5. 线程2被唤醒,尝试获取锁

相信经过本文的讲述你对AQS应该也不再陌生,如果感兴趣一定要自己去动手实践一下,前路漫漫,与君共勉。


转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。可以邮件至 1300452403@qq.com

文章标题:AQS

字数:3.9k

本文作者:Os467

发布时间:2024-05-26, 00:14:11

最后更新:2024-05-26, 00:15:37

原始链接:https://os467.github.io/2024/05/26/AQS/

版权声明: "署名-非商用-相同方式共享 4.0" 转载请保留原文链接及作者。

×

喜欢就点赞,疼爱就打赏