「aqs使用java」aqs是什么意思

博主:adminadmin 2022-11-27 01:33:08 63

今天给各位分享aqs使用java的知识,其中也会对aqs是什么意思进行解释,如果能碰巧解决你现在面临的问题,别忘了关注本站,现在开始吧!

本文目录一览:

AQS研究系列(一)--Unsafe使用

为了研究AQS,我们先来学习下java中cas(Compare And Swap)的基础Unsafe类的使用

Unsafe产生于java无法向c那样操作底层操作系统,但一些场景又需要相关操作.所以此类提供了一些java语言对于操作系统内存层面操作的API.这显然被认为是不安全的,所以此类是不公开的,不建议被java应用直接使用.

但现实中已经有大量的java并发相关操作的框架在使用它了....据说此类在计划废弃中.

Unsafe能操作内存?这个是什么概念?都有哪些操作呢?

其实最明显的是它大量方法都是直接操作内存地址进行操作的.方法可以分为下面几类:

我们可以使用LockSupport类进行操作

a. LockSupport.park()对应Unsafe的Unsafe.park(false, 0L)------给当前所在线程加锁,第一个参数表示true为精度型单位为纳秒,false单位毫秒,第二次参数表示等待时间;

b. LockSupport.park.unpark ---------Thread thread对应Unsafe的UNSAFE.unpark(thread)方法(解锁指定线程)

如果,我们直接使用Unsafe,是这样子的:

我们还可以通过Unsafe类获取对象的属性值.因为Unsafe类是直接操作内存的,所以需要我们获得对应的属性内存地址,如下操作:

如下操作,通过unsafe类实现cas原子操作.

好了,上面就是unsafe的基本几种使用,其也是aqs框架中cas操作的基础.下面我们进行aqs相关学习.

AQS研究系列(二)--线程状态和interrupt()、interrupted()、isInterrupted等方法学习

AQS研究系列(三)--AbstractQueuedSynchronizer源码分析

什么是重入锁和AQS

什么是重入锁

java.util.concurrent.locks.ReentrantLock

ReenTrantLock独有的能力:

1.      ReenTrantLock可以指定是公平锁还是非公平锁。而synchronized只能是非公平锁。所谓的公平锁就是先等待的线程先获得锁。

2.      ReenTrantLock提供了一个Condition(条件)类,用来实现分组唤醒需要唤醒的线程们,而不是像synchronized要么随机唤醒一个线程要么唤醒全部线程。

3.      ReenTrantLock提供了一种能够中断等待锁的线程的机制,通过lock.lockInterruptibly()来实现这个机制。

非重入锁

当A方法获取锁去锁住一段需要做原子性操作的B方法时,如果这段B方法又需要锁去做原子性操作,那么A方法就必定要与B方法出现死锁。这种会出现问题的重入一把锁的情况,叫不可重入锁。

lock的过程:

   首先尝试获取资源,如果当前状态为0,表示没有线程占有锁,设置该线程为独占模式,使用CAS设置状态,否则如果当前线程和独占线程是一个线程,修改状态值,否则返回false。

  若获取资源失败,则通过addWaiter -(aqs)方法创建一个节点并放在CLH队列的尾部。head tail未初始化会创建虚拟节点同时指向

为什么 AQS 需要一个虚拟 head 节点

每个节点都需要设置前置Node 的 waitStatus  状态(这个状态为是为了保证数据一致性),防止重复释放操作。而第一个节点是没有前置节点的,所以需要创建一个虚拟节点。

  逐步去执行CLH队列中的线程,当前线程会公平性的阻塞一直到获取锁为止,返回线程在等待的过程中还是否中断过。

unlock的过程

一次unlock操作需要修改状态位,然后唤醒节点。整个释放操作也是使用unpark()来唤醒队列最前面的节点。其实lock中比较重要的也就是lock和release,它们又和AQS联系紧密,下面会单独谈谈AQS的重要方法。

Condition的await和signal

wait和notify/notify VS await和signal

Condition能够支持不响应中断,而通过使用Object方式不支持;

Condition能够支持多个等待队列(new 多个Condition对象),而Object方式只能支持一个;

Condition能够支持超时时间的设置,而Object不支持

对标Object的wait方法

void await() throws InterruptedException:当前线程进入等待状态,如果其他线程调用condition的signal或者signalAll方法并且当前线程获取Lock从await方法返回,如果在等待状态中被中断会抛出被中断异常;

long awaitNanos(long nanosTimeout):当前线程进入等待状态直到被通知,中断或者超时;

boolean await(long time, TimeUnit unit)throws InterruptedException:同第二种,支持自定义时间单位

boolean awaitUntil(Date deadline) throws InterruptedException:当前线程进入等待状态直到被通知,中断或者到了某个时间

对标Object的notify/notifyAll方法

void signal():唤醒一个等待在condition上的线程,将该线程从等待队列中转移到同步队列中,如果在同步队列中能够竞争到Lock则可以从等待方法中返回。

void signalAll():与1的区别在于能够唤醒所有等待在condition上的线程

如图所示,ConditionObject是AQS的内部类,因此每个ConditionObject能够访问到AQS提供的方法,相当于每个Condition都拥有所属同步器的引用。

调用condition.await方法的线程必须是已经获得了lock,也就是当前线程是同步队列中的头结点。调用该方法后会使得当前线程所封装的Node尾插入到等待队列中。

如图,线程awaitThread先通过lock.lock()方法获取锁成功后调用了condition.await方法进入等待队列,而另一个线程signalThread通过lock.lock()方法获取锁成功后调用了condition.signal或者signalAll方法,使得线程awaitThread能够有机会移入到同步队列中,当其他线程释放lock后使得线程awaitThread能够有机会获取lock,从而使得线程awaitThread能够从await方法中退出执行后续操作。如果awaitThread获取lock失败会直接进入到同步队列。

// 线程已被取消

    static final int CANCELLED =  1;

    // 当前线程的后继线程需要被unpark(唤醒)

    // 一般发生情况是:当前线程的后继线程处于阻塞状态,而当前线程被release或cancel掉,因此需要唤醒当前线程的后继线程。

    static final int SIGNAL    = -1;

    // 在Condition休眠状态,在等待Condition唤醒

    static final int CONDITION = -2;

    // (共享锁)其它线程获取到“共享锁”,对应的waitStatus的值

    static final int PROPAGATE = -3;

volatile int waitStatus;

---------------------

/**

    * 这个方法也就是lock()方法的关键方法。tryAcquire获得资源,返回true,直接结束。若未获取资源,新建一个节点插入队尾,

*addWaiter用于添加节点,也就是把当前线程对应的节点插入CLH队列的尾部。

    * @param arg the acquire argument.  This value is conveyed to

    *        {@link #tryAcquire} but is otherwise uninterpreted and

    *        can represent anything you like.

    */

    public final void acquire(int arg) {

        if (!tryAcquire(arg) //获取资源立刻结束

            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//没有被中断过,也结束

            selfInterrupt();

    }

---------------------

protected final boolean tryAcquire(int acquires) {

            final Thread current = Thread.currentThread();

            int c = getState();

            if (c == 0) {

                if (!hasQueuedPredecessors()

                    compareAndSetState(0, acquires)) {

                    setExclusiveOwnerThread(current);

                    return true;

                }

            }

            else if (current == getExclusiveOwnerThread()) { //判断是否持有锁的是自己,重入

                int nextc = c + acquires;

                if (nextc 0)

                    throw new Error("Maximum lock count exceeded");

                setState(nextc);

                return true;

            }

            return false;

        }

---------------------

  * 非公平锁

    */

    static final class NonfairSync extends Sync {

        private static final long serialVersionUID = 7316153563782823691L;

        /**

        * Performs lock.  Try immediate barge, backing up to normal

        * acquire on failure.

        */

        final void lock() {

            if (compareAndSetState(0, 1))//CAS设置当前为0 的时候上锁

                setExclusiveOwnerThread(Thread.currentThread());

            else

                acquire(1);//否则尝试获得锁。

        }

        protected final boolean tryAcquire(int acquires) {

            return nonfairTryAcquire(acquires);

        }

    }

    /**

    * 公平锁

    */

    static final class FairSync extends Sync {

        private static final long serialVersionUID = -3000897897090466540L;

        final void lock() {

            acquire(1);

        }

        /**

        *

        */

        protected final boolean tryAcquire(int acquires) {

            final Thread current = Thread.currentThread();

            int c = getState();

            if (c == 0) {

                if (!hasQueuedPredecessors()

                    compareAndSetState(0, acquires)) {//没有前驱节点并且CAS设置成功

                    setExclusiveOwnerThread(current);//设置当前线程为独占线程

                    return true;

                }

            }

            else if (current == getExclusiveOwnerThread()) {//这里和非公平锁类似

                int nextc = c + acquires;

                if (nextc 0)

                    throw new Error("Maximum lock count exceeded");

                setState(nextc);

                return true;

            }

            return false;

        }

    }

LockSupport与AQS

LockSupport类是Java6(JSR166-JUC)引入的一个类,提供了基本的线程同步原语。

每个线程都会有一个独有的permit(许可)。

相比较于wait/notify/notifyAll有何优点?

注意:LockSupport是不可重入的:unpark三次之后,park一次可以继续运行,再次park还是会被阻塞。可以理解为unpark是把某个标志位标为1,并不是加1。park是将这个标志位标为0,而非减1。

AbstractQueuedSynchronizer,即队列同步器。它是构建锁或者其他同步组件的基础框架(如ReentrantLock、ReentrantReadWriteLock、Semaphore等),它是JUC并发包中的核心基础组件。

AQS简单地说就是使用一个FIFO的等待队列和一个volatile int state来实现同步的。即通过CAS state判断是否被锁(CAS来保证原子性、volatile保证可见性),将阻塞的线程打包放入等待队列中。

1. AQS的使用者一般定义一个内部类来继承AQS,使用组合的方式使用。

2. AQS有两种模式:排他和共享。

排他模式:只有一个线程可以拥有锁。(排他锁)

共享模式:可以同时多个线程拥有锁。(读锁)

AQS中两种模式下的waiting thread共用一个queue,所以一般使用者都只是使用一种模式。ReentrantReadWriteLock是同时使用了两种模式。

使用者继承AQS,实现AQS中的几个未实现的方法。然后就可以调用AQS的方法来实现自己的接口功能了。

我们可以看到ReentrantLock使用一个内部类Sync来继承AQS,然后实现排他锁的三个方法。

我们知道ReentrantLock有lock和unlock接口,可以看到这两个接口的实现就是调用AQS原有的方法。

前三个是排他锁所要实现的,后两个是共享锁所要实现了。注意:这五个函数并不是abstract,原因是因为一般都是使用某一种模式(排他或共享模式),所以子类只需使用其中一组就可以了。

在使用AQS的类中用来加锁和解锁的方法。

这里我们可以看到“获取锁,如果失败则加入队列”这个行为是由AQS来实现的。而如何判断失败?这个是由子类来决定的。这个决定支持了可重入性、是否公平性等功能。

共享模式下的对应的四个方法。

我们知道公平锁:先来的一定先获取锁。

非公平锁:当多个线程在争取锁,谁先获取锁的顺序是不固定的。

AQS的公平性是由使用者来决定的。

我们知道AQS中的acquire函数是大致这样实现的。

因为每次acquire的步骤是:先try再入队列。所以就可以出现这种情况:队列中有两个线程在等待,当锁被释放时,刚好又来了一个线程,则try的时候成功了,这样这个线程就获得锁了。

如果想要实现公平锁:tryAcquire的时候判断一下,如果有线程在等待,这个函数直接返回false。

显然非公平锁要比公平锁的效果要高。

Java AQS如何清除垃圾节点

根据清除算法,整理算法,复制算法,分代算法进行清除。

清除算法为标记无用对象,然后进行清除回收,缺点为效率不高,无法清除垃圾碎片。

整理算法是标记无用对象,让所有存活的对象都向一端移动,然后直接清除掉端边界以外的内存。

复制算法是按照容量划分二个大小相等的内存区域,当一块用完的时候将活着的对象复制到另一块上,然后再把已使用的内存空间一次清理掉。缺点是内存使用率不高,只有原来的一半。

分代算法是根据对象存活周期的不同将内存划分为几块,一般是新生代和老年代,新生代基本采用复制算法,老年代采用标记整理算法。

J.U.C锁之 Semaphore

Semaphore 名为"信号量"。

Semaphore用来管理内部许可证,当多个线程要访问竞争资源时可以通过Semaphore来控制并发访问竞争资源的线程数。当线程需要访问竞争资源时需要首先获取一个许可证,执行完毕在返还,如果许可证用完则,线程进入同步队列并阻塞。等待许可证返回唤醒。

主要特性

公平性 :支持公平性和非公平性。所谓公平表示在获取锁时逻辑是否要考虑当前正在排队等待线程。按照大白话来说就时公平表示不能插入强占资源。

应用场景

获取许可

释放许可

其他方法

Semaphore 使用AQS实现锁机制,AQS是AbstractQueuedSynchronizer的缩写,翻译过来就是"同步器",,它实现了Java函数中锁同步(synchronized),锁等待(wait,notify)功能。

AbstractQueuedSynchronizer是一个抽象类,我们可以编写自己类继承AQS重写获取独占式或共享式同步状态模板方法,实现锁锁同步(synchronized),锁等待(wait,notify)功能

AQS核心是一个同步状态,两个队列。它们实现了Java函数中锁同步(synchronized),锁等待(wait,notify),并在其基础上实现了独占式同步,共享式同步2中方式锁的实现。

无论独占式还时共享式获取同步状态成功则直接返回,失败则进入CLH同步队列并阻塞当前线程。当获取同步状态线程释放同步状态,AQS会选择从CLH队列head头部节点的第一个节点释放阻塞,尝试重写竞争获取同步状态,如果成功则将当前节点出队。如果失败则继续阻塞。

获取同步状态的线程也可以使用condition对象释放同步状态进入等待队列。只有等待其他线程使用condition.signal或condition.signAll()唤醒被从阻塞状态中释放重新竞争获取同步状态成功后从原来指令位置继续运行。

AQS实现了锁,必然存在一个竞争资源。AQS存在从一个int类型的成员变量state,我们把它称为同步状态,同步状态通常用做判断线程能否获取锁的依据

AQS 实现了锁那么总需要一个队列将无法获取锁的线程保存起来,方便在锁释放时通知队列中线程去重新竞争锁。

实现原理

同步队列又被称为CLH同步队列,CLH队列是通过 链式方式实现FIFO双向队列 。当线程获取同步状态失败时,AQS则会将当前线程构造成一个节点(Node)并将其加入到CLH同步队列,同时会阻塞当前线程,当同步状态被释放时,会把首节点后第一个节点的线程从阻塞状态下唤醒,唤醒的线程会尝试竞争同步状态,如果能获取同步状态成功,则从同步队列中出队。

实现原理

这里取消节点表示当前节点的线程不在参与排队获取锁。

从概念上来说独占式对应只存在一个资源,且只能被一个线程或者说竞争者占用.

从概念上来说共享式对应存在多个资源的是有多个线程或者竞争者能够获取占用.

我们可以编写自己类继承AQS选择重写独占式或共享式模板方法,从而定义如何获取同步状态和释放同步状态的逻辑。

tryAcquire :尝试独占式获取同步状态,返回值为true则表示获取成功,否则获取失败。

tryRelease :

尝试独占式释放同步状态,返回值为true则表示获取成功,否则获取失败。

tryAcquireShared :尝试共享式获取同步状态,当返回值为大于等于0的时获得同步状态成功,否则获取失败。

tryReleaseShared :尝试共享式释放同步状态,返回值为true则表示获取成功,否则获取失败。

由于多个线程可以同时许可同时执行,当然我们选择使用共享同步,Sync需要重写 tryAcquire 获取同步状态条件逻辑, tryRelease 释放同步条件逻辑。其核心点在于使用同步状态做判断。当同状态为0时,许可被使用完了,同步状态大于0,许可被还可用,每次调用 tryAcquire 同步状态-1,每次调用 tryRelease 同步状态+1

内部存在有三个内部类 Sync、NonfairSync 和 FairSync 类。

Semaphore 很多方法都通过代理内部类的方法实现。

公平信号量获取许可

非公平信号量获取许可

释放许可

完整源码

关于aqs使用java和aqs是什么意思的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站。

The End

发布于:2022-11-27,除非注明,否则均为首码项目网原创文章,转载请注明出处。