一、什么是JUC

JUC指的是Java并发编程工具包(Java Concurrency Utilities),它提供了一系列的工具类和接口,用于简化并发编程的开发。虽然Java语言中已经提供了synchronized关键字来支持多线程并发编程,但是JUC提供了更加灵活和高效的实现方式,以满足更加复杂的并发编程需求。

具体来说,JUC提供了以下几个重要的类和接口:

  1. Lock和Condition:Lock和Condition是替代synchronized关键字的工具,它们提供了更加灵活和可控的线程同步机制,使得在高并发情况下能够更加高效地协调线程之间的交互。
  2. Semaphore:Semaphore是一种计数信号量,它可以用来控制同时访问某个资源的线程数量。
  3. CountDownLatch:CountDownLatch是一种倒计数器,它可以让一个线程等待多个其他线程执行完毕后再继续执行。
  4. CyclicBarrier:CyclicBarrier也是一种倒计数器,它可以让多个线程相互等待,直到所有线程都到达某个屏障点后再一起继续执行。
  5. Executor和ExecutorService:Executor和ExecutorService是线程池的实现,它们可以有效地管理和控制线程的数量,从而避免创建过多的线程导致系统资源浪费和性能下降。

综上所述,虽然synchronized关键字可以实现基本的线程同步和互斥,但在高并发情况下,JUC提供的工具类和接口能够更加灵活和高效地协调线程之间的交互,从而提高程序的性能和可靠性。

在正式介绍JUC之前,先思考一个问题,那就是如果让我们自己实现一个类似sychronized的锁机制,我们要如何去设计呢?

锁设计猜想

  • 一定会设计到锁的抢占 , 需要有一个标记来实现互斥。 全局变量(0,1)

  • 抢占到了锁,怎么处理(不需要处理.)

  • 没抢占到锁,怎么处理

    • 需要等待(让处于排队中的线程,如果没有抢占到锁,则直接先阻塞->释放CPU资源)。
      • 如何让线程等待?
        • wait/notify(线程通信的机制,无法指定唤醒某个线程)
        • LockSupport.park/unpark(阻塞一个指定的线程,唤醒一个指定的线程) Condition
    • 需要排队(允许有N个线程被阻塞,此时线程处于活跃状态)。
      • 通过一个数据结构,把这N个排队的线程存储起来。
  • 抢占到锁的释放过程,如何处理

    • LockSupport.unpark() -> 唤醒处于队列中的指定线程.
  • 锁抢占的公平性(是否允许插队)

  • 公平

  • 非公平

在JUC中,AQS(AbstractQueuedSynchronizer)即是实现上述过程的一个框架类

二、AQS

AQS(AbstractQueuedSynchronizer)是一个用于构建锁、同步器等并发组件的框架
它是Java并发包(java.util.concurrent)的核心组件之一。

首先介绍一下AQS中用到的一些类与变量

1.内部类—Node

Node作为等待队列节点类,在线程竞争锁失败后,AQS会将线程封装成一个Node节点,放入等待队列当中,自旋等待,自旋竞争再次失败后就会进入等待状态,等待被唤醒再次竞争锁

Node类包含以下主要字段:

  1. prev:指向前一个节点的指针。
  2. next:指向后一个节点的指针。
  3. thread:持有该节点的线程对象。
  4. waitStatus:用于表示线程的状态,包括取消、阻塞、等待等。
  5. nextWaiter:用于在等待队列中链接不同条件的线程。
1
2
3
     +------+  prev +-----+       +-----+
head | | <---- | | <---- | | tail
+------+ +-----+ +-----+

Node类源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
static final class Node {
/** 标识共享模式的静态常量,它被用于表示线程等待的条件是共享的,而不是独占的 */
static final Node SHARED = new Node();
/** 标识独占模式的静态常量,它被用于表示线程等待的条件是独占的,而不是共享的 */
static final Node EXCLUSIVE = null;

/** 用于标记一个节点已经被取消,不再参与同步操作 */
static final int CANCELLED = 1;
/** 用于表示一个节点已经被唤醒,并且可以尝试竞争同步资源 */
static final int SIGNAL = -1;
/**
* 是一个表示节点所属的条件队列的静态常量。它被用于区分节点所属的等待队列和条件队列
* 每个节点在创建时,都会与一个特定的条件队列相关联。
* 当节点被添加到等待队列中时,它的CONDITION字段会被设置为与它相关联的条件队列。
* 在等待队列中,节点的状态为WAITING或SIGNAL;而在条件队列中,节点的状态为CONDITION。
*/
static final int CONDITION = -2;
/** 标志表示当前节点需要向后传播唤醒信号,通知后继节点可以尝试获取锁 */
static final int PROPAGATE = -3;

volatile int waitStatus;

volatile Node prev;

volatile Node next;
/** 表示持有锁或正在等待锁的线程 */
volatile Thread thread;

Node nextWaiter;

final boolean isShared() {
return nextWaiter == SHARED;
}

final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}

Node() { // Used to establish initial head or SHARED marker
}

Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}

Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}

2.变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* 指向同步队列头
*/
private transient volatile Node head;

/**
* 指向同步队列尾
*/
private transient volatile Node tail;

/**
* 同步状态
*/
private volatile int state;

同步队列是AQS中一个重要的概念,用于实现线程的阻塞和唤醒,以及线程的竞争获取同步状态。

AQS中包含了三种队列概念:等待队列、条件队列和同步队列。

  1. 等待队列(Wait Queue):等待队列是AQS中用于存放被阻塞的线程的数据结构。当一个线程需要获取同步状态,但是当前同步状态已经被其他线程占用时,该线程会被封装成一个Node节点并加入到等待队列中。等待队列是一个FIFO队列,可以保证等待时间最长的线程先被唤醒。
  2. 条件队列(Condition Queue):条件队列是基于等待队列实现的,用于支持条件变量的功能。当一个线程需要等待一个条件变量时,它会被封装成一个Node节点并加入到条件队列中,而不是等待队列中。当满足条件时,条件队列中的线程会被转移至等待队列中等待获取同步状态。
  3. 同步队列(Sync Queue):同步队列是AQS中存放已经获取到同步状态的线程(即通过acquire获取成功)的数据结构。当一个线程获取到同步状态后,它会从等待队列中转移到同步队列中,并且会释放之前占有的同步状态。同步队列的管理是通过head和tail指针实现的,head指向同步队列的第一个节点,tail指向同步队列的最后一个节点。

此外,还需要注意exclusiveOwnerThread这个变量,它来自于AQS的抽象父类AbstractOwnableSynchronizer,该类就两个方法,分别是设置和获取该变量exclusiveOwnerThread

3.方法

AQS中的几个重要方法如下:

  1. acquire(int arg):尝试获取独占锁,如果获取失败,则将当前线程加入同步队列并进行自旋或者阻塞,直到获取成功或者被中断。
  2. acquireShared(int arg):尝试获取共享锁,如果获取失败,则将当前线程加入同步队列并进行自旋或者阻塞,直到获取成功或者被中断。
  3. release(int arg):释放独占锁。
  4. releaseShared(int arg):释放共享锁。
  5. tryAcquire(int arg):尝试获取独占锁,如果获取成功,则返回true,否则返回false。
  6. tryAcquireShared(int arg):尝试获取共享锁,如果获取成功,则返回一个大于等于0的值,表示获取共享锁的线程数,否则返回负数。
  7. tryRelease(int arg):尝试释放独占锁,如果成功则返回true,否则返回false。
  8. tryReleaseShared(int arg):尝试释放共享锁,如果成功则返回true,否则返回false。
  9. acquireInterruptibly(int arg):尝试获取独占锁,如果获取失败,则将当前线程加入同步队列并进行自旋或者阻塞,直到获取成功或者被中断。

这些方法是AQS中的核心方法,用于实现同步和互斥。

  • acquire()、acquireShared()、release()、releaseShared() 是AQS中最常用的同步方法

  • tryAcquire()、tryAcquireShared()、tryRelease()、tryReleaseShared() 则是尝试获取/释放同步状态,这些方法一般是被重载后使用。

  • acquireInterruptibly() 方法也是尝试获取独占锁,不过它会响应中断。

1)compareAndSetState

AQS中通过state变量来标识同步状态,为了保证state参数修改的可见性、原子性,在AQS当中使用CAS机制来进行state的修改

1
2
3
4
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
2)acquire

尝试获取独占锁,如果获取失败,则将当前线程加入同步队列并进行自旋或者阻塞,直到获取成功或者被中断。

1
2
3
4
5
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

其中,tryAcquire 是 AQS 中一个抽象方法,需要用户自定义实现。在 AQS 中,同步状态的获取和释放都是通过 tryAcquiretryRelease 方法实现的,因此用户可以根据自己的需求来定义同步状态的获取和释放逻辑。

addWaiter 方法是 AQS 中的一个辅助方法,用于将一个新的 Node 节点加入到等待队列中,并返回这个新节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}

在这里,AQS中的enq方法用于将线程加入到等待队列中,实现方式是通过CAS(compare-and-swap)操作将节点插入到队尾,保证线程的插入是原子性的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

acquireQueued传入addWaiter 方法返回的节点,一丝是在线程被包装成节点入队之后,还会尝试调用自旋来获取锁,步骤如下:

  1. 尝试获取锁,如果成功则直接返回。
  2. 如果获取锁失败,则线程会进入自旋状态,不断地检查前驱节点的状态是否为 SIGNAL。
  3. 如果前驱节点状态为 SIGNAL,说明当前线程可以尝试获取同步状态了,于是调用tryAcquire方法再次尝试获取锁。
  4. 如果获取锁成功,则当前线程会从等待队列中移除,并返回。
  5. 如果tryAcquire方法返回false,则当前线程会继续自旋等待前驱节点唤醒自己。

acquireQueued方法中的自旋是在等待前驱节点释放锁的过程中进行的,如果等待时间过长,一般是由于前驱节点无法释放锁,这时会进入阻塞状态。在进入阻塞状态前,会将自己的节点状态设置为WAITING,并通过LockSupport.park()方法挂起线程,等待前驱节点的唤醒。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// node表示当前来抢占锁的线程
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) { // 自旋
// 尝试去获得锁
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) { // 返回true,不需要等待直接返回
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 否则让线程去阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// 用于在当前线程上阻塞,直到被其他线程唤醒或中断
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this); // 使当前线程阻塞
return Thread.interrupted(); // 返回当前线程中断状态,即是否被中断
}
3)acquireShared

尝试获取共享锁,如果获取失败,则将当前线程加入同步队列并进行自旋或者阻塞,直到获取成功或者被中断。

1
2
3
4
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
4)release

释放独占锁

1
2
3
4
5
6
7
8
9
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

其中,tryRelease交给子类实现

4.小结

AQS是一个用于构建同步器的框架,它为子类提供了许多方法,使得子类仅需要重写部分方法就可以方便的实现同步功能,下面是AQS为子类提供的一些公共功能

  1. 获取/释放锁:AQS提供了acquire()和release()两个方法,这两个方法是获取锁和释放锁的基础。
  2. 等待队列的操作:AQS提供了许多操作等待队列的方法,例如enq()、deq()、transferForSignal()等等。这些方法让子类能够方便地实现等待队列的管理。
  3. 条件队列的操作:AQS还提供了一些操作条件队列的方法,例如addConditionWaiter()、transferAfterCancelledWait()等等。这些方法可以让子类方便地实现条件队列的管理。
  4. 重入锁的实现:AQS还提供了一些方法,可以方便地实现重入锁。例如tryAcquire()、tryRelease()等等。
  5. 共享锁的实现:AQS提供了一些方法,可以方便地实现共享锁。例如tryAcquireShared()、tryReleaseShared()等等。
  6. 线程的中断处理:AQS提供了interruptMode()和clearInterruptsForReentry()两个方法,可以方便地处理线程中断的情况。

总的来说,AQS并没有定义具体的加锁和释放锁的逻辑,而是通过子类来实现这些逻辑,同时提供了一些钩子方法,使得子类可以在特定的时间点进行扩展和定制。这种设计能够提高复用性和灵活性,使得开发者可以快速地构建出各种同步器,以满足不同的需求。

其中,子类至少需要重写以下方法:

  1. tryAcquire(int):尝试以独占模式获取同步状态,如果获取成功,返回true,否则返回false。
  2. tryRelease(int):尝试以独占模式释放同步状态,如果释放成功,返回true,否则返回false。
  3. tryAcquireShared(int):尝试以共享模式获取同步状态,如果获取成功,返回大于等于0的值,否则返回小于0的值。
  4. tryReleaseShared(int):尝试以共享模式释放同步状态,如果释放成功,返回true,否则返回false。

三、ReentrantLock类

1.基础分析

ReentrantLock并不是直接继承了AQS抽象类,而是定义了一个内部类来继承AQS,并在此基础上分别实现了公平锁与非公平锁两种同步类型,宏观上看代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public class ReentrantLock implements Lock, java.io.Serializable {

private final Sync sync;

abstract static class Sync extends AbstractQueuedSynchronizer {

abstract void lock();
// 默认实现非公平锁
final boolean nonfairTryAcquire(int acquires) {
...
}

protected final boolean tryRelease(int releases) {
...
}

...
}

/**
* 非公平锁
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;

final void lock() {
...
}

protected final boolean tryAcquire(int acquires) {
...
}
}

/**
* 公平锁
*/
static final class FairSync extends Sync {
final void lock() {
...
}

protected final boolean tryAcquire(int acquires) {
...
}
}

...
public void lock() {
sync.lock();
}
}

在不指定锁类型时,声明的就是非公平锁,如果传入了boolean值,则判断后生成公平或者非公平锁

1
2
3
4
5
6
7
public ReentrantLock() {
sync = new NonfairSync();
}

public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}

2.非公平锁-加锁

首先看加锁过程:

1
2
3
4
5
6
7
8
final void lock() {
// 无论AQS中是否有排队情况,都会尝试插队抢占锁
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
// 插队失败后执行AQS中的acquire方法
acquire(1);
}

在上面已经分析过acquire方法,这里不再赘述,简单来说就是一个封装线程为节点入队,并在入队后自旋尝试继续获取锁的过程

1
2
3
4
5
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

由ReentrantLock实现的非公平锁加锁过程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}

final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) { // 无锁状态
// 不管队列情况,直接抢锁
if (compareAndSetState(0, acquires)) { //CAS(#Lock) -> 原子操作| 实现互斥的判断
setExclusiveOwnerThread(current); //把获得锁的线程保存到exclusiveOwnerThread中
return true;
}
}
//如果当前获得锁的线程和当前抢占锁的线程是同一个,表示重入
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires; // 增加重入次数
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc); // 保存state
return true;
}
return false;
}

3.公平锁-加锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// 尝试抢占一把锁
final void lock() {
acquire(1);
}

// AQS中定义
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
// 尝试获取锁
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 注意与非公平锁的tryAcquire方法区别在这里
// hasQueuedPredecessors用来判断当前线程是否有前驱节点,如果有就不能抢占
// 即需要按照入队顺序依次获取锁,如果当前线程前面有线程未获得锁,当前线程也不能抢占
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

4.解锁

ReentrantLock类内

1
2
3
4
public void unlock() {
sync.release(1);
}

AQS中

1
2
3
4
5
6
7
8
9
public final boolean release(int arg) {
if (tryRelease(arg)) { // 判断是否完全释放了锁,即state为0
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h); // 队列不为空,尝试唤醒
return true;
}
return false;
}

ReentrantLock中

1
2
3
4
5
6
7
8
9
10
11
12
protected final boolean tryRelease(int releases) {
int c = getState() - releases; // state新值
if (Thread.currentThread() != getExclusiveOwnerThread()) //判断释放锁的线程是否持有锁
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) { //完全释放了锁
free = true;
setExclusiveOwnerThread(null); // 释放独占标记
}
setState(c);
return free;
}

AQS中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0) // 表示可以唤醒状态
compareAndSetWaitStatus(node, ws, 0); // 恢复状态值为0

Node s = node.next;
if (s == null || s.waitStatus > 0) { // 说明这个线程已经被销毁,或者存在异常
s = null;
for (Node t = tail; t != null && t != node; t = t.prev) // 从tail往head进行遍历
if (t.waitStatus <= 0) // 查找状态小于等于0,表示需要被唤醒
s = t;
}
// 最后找到的是head后面第一个能唤醒的节点
if (s != null)
LockSupport.unpark(s.thread);// 唤醒
}

下面是waitStatus的取值与代表意义

  • SIGNAL(-1):表示当前节点的后继节点需要被唤醒。
  • CANCELLED(1):表示当前节点已经取消等待。
  • CONDITION(2):表示当前节点在条件队列中等待。
  • PROPAGATE(-3):用于共享模式下,表示后继节点需要向前传播唤醒信号。

四、补充

1.AQS为什么要采用双向链表的结构呢?

1)新入队列的线程,需要保证它的前置节点状态是正常的,不然可能存在异常节点导致后续无法正常唤醒,因此需要能够从后往前查询,不然就要从头往后遍历

2)在队列中的线程,是允许被中断的,被中断之后标记为cancel状态,继续存在与队列当中,如果单向链表,就必须要从头往后遍历

3)新入队的节点,由于公平锁的存在,要判断前驱节点是否是头节点,如果是头节点才会继续自旋尝试获取锁,不然入队后的自旋就毫无意义,因此要能查找前置节点

总结:主要是要考虑入队和唤醒