一、线程的通信

1.通信方式

  1. 共享内存:多个线程共享同一块内存空间,可以直接在内存中读写数据进行通信。
  2. 互斥锁:使用互斥锁来实现对共享资源的访问控制,确保同一时间只有一个线程能够访问共享资源,从而避免多个线程同时对资源进行读写而导致数据不一致的问题。
  3. 条件变量:条件变量通常和互斥锁一起使用,用于线程之间的信号传递。一个线程可以等待某个条件变量被触发,另一个线程可以通过发送信号来触发条件变量,从而唤醒等待该条件变量的线程。
  4. 信号量:信号量是一种用于线程间同步和互斥的机制。它可以用于多个线程之间的同步,以确保它们在互斥资源上的访问顺序正确。
  5. 管道:管道是一种用于进程间通信的机制,但也可以用于线程之间的通信。一个线程可以将数据写入管道,另一个线程可以从管道中读取数据,从而实现线程之间的数据传输。
  6. 消息队列:消息队列是一种用于进程间通信的机制,但也可以用于线程之间的通信。一个线程可以向消息队列发送消息,另一个线程可以从消息队列中读取消息,从而实现线程之间的数据传输。

线程间经常使用wait/notify/notifyAll进行通信,这种方式属于条件变量通信

wait/notify关键字来源于Object类,属于native标注的Java内置方法,底层由C++来实现

2.同步队列与阻塞队列

1)同步队列

同步队列是Java中用于实现线程同步的一种数据结构,通常是一个先进先出(FIFO)的队列。它通常是指一个队列,其中的线程已经持有了锁/或者尝试获取锁,但是由于某些原因(例如等待某些资源)无法执行,于是它们在队列中等待锁释放。

同步队列是通过Java内置的锁机制实现的,当一个线程在获取锁时,如果锁被其他线程持有,则该线程会进入同步队列中排队等待获取锁。当锁被释放时,队列中的线程会按照先进先出的顺序依次尝试获取锁并执行任务。

同步队列中的线程状态为BLOCKED或者RUNNABLE

说人话就是,同步队列当中放着持有锁或者正在等待拿锁的线程

2)等待队列/阻塞队列

一般来说,线程的等待队列是指线程在调用wait()方法时进入的一个队列,用于等待条件满足。当一个线程调用了对象的wait()方法后,该线程会释放占有的锁,并进入该对象的等待队列中等待被唤醒。线程在等待队列中会处于WAITINGTIMED_WAITING状态,等待其他线程通过notify()或notifyAll()方法将其唤醒。

3.wait()/notify()

通过一个生产者-消费者模型来讲解下这两者的作用与关系

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class Producer implements Runnable {
private Queue<String> bags;
private int maxSize;
public Producer(Queue<String> bags, int maxSize) {
this.bags = bags;
this.maxSize = maxSize;
}

@Override
public void run() {
int i=0;
while(true){
i++;
synchronized (bags){ //抢占锁
if(bags.size()==maxSize){
System.out.println("bags 满了");
try {
//park(); ->JVM ->Native
bags.wait(); //满了,阻塞当前线程并且释放Producer抢到的锁
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("生产者生产:bag"+i);
bags.add("bag"+i); //生产bag
bags.notify(); //表示当前已经生产了数据,提示消费者可以消费了
} //同步代码快执行结束
}
}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class Consumer implements Runnable{
private Queue<String> bags;
private int maxSize;
public Consumer(Queue<String> bags, int maxSize) {
this.bags = bags;
this.maxSize = maxSize;
}

@Override
public void run() {
while(true){
synchronized (bags){
if(bags.isEmpty()){
System.out.println("bags为空");
try {
bags.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
String bag=bags.remove();
System.out.println("消费者消费:"+bag);
bags.notify(); //这里只是唤醒Producer线程,但是Producer线程并不能马上执行。
}
}
}
}

如上,当生产者发现队列已满时,会调用wait()进行阻塞;当生产了资源之后,可以调用notify()通知消费者可以开始消费了;同理,消费者发现队列为空会陷入阻塞等待唤醒,消费后会尝试唤醒生产者

这里要注意,只有持有锁的对象才可以调用wait()方法,不然会抛出异常

4.join

join也是基于wait/notify来实现,notify是在线程销毁之后调用的

join() 是 Java 中 Thread 类提供的一个方法,可以让一个线程等待另一个线程的结束,直到被等待的线程执行完毕,或者等待一定的时间,或者被中断。

具体来说,调用 t.join() 方法会让当前线程(通常是主线程)等待线程 t 执行完毕,也就是线程 t 终止之前,当前线程都会处于阻塞状态,等待线程 t 终止后才能继续执行。如果 t 在当前线程调用 join() 方法之前已经终止,那么 join() 方法会立即返回。

5.Condition

Condition实际上就是J.U.C版本的wait/notify。可以让线程基于某个条件去等待和唤醒

那么为什么有了wait/notify的情况下,还要再去实现一个Condition呢?

Condition 的出现主要是为了解决 Object 中的 wait() 和 notify() 在多个线程等待同一个锁时的局限性。

Condition 提供了三个基本操作:

  1. await(): 使当前线程等待直到其他线程唤醒它。
  2. signal(): 唤醒一个等待在 Condition 上的线程。
  3. signalAll(): 唤醒所有等待在 Condition 上的线程。

与 Object 的 wait() 和 notify() 方法相比,Condition 提供的操作更加灵活。它可以允许多个等待线程同时等待同一个锁,并且可以按照需要唤醒指定的线程或者所有线程。

另外,Condition 对象必须和一个 Lock 对象绑定使用,因为 Condition 需要依赖于锁来实现等待和唤醒操作

Condition设计猜想

  • 作用: 实现线程的阻塞和唤醒
  • 前提条件: 必须先要获得锁 await/signal/signalAll
    • await -> 让线程阻塞, 并且释放锁
    • signal -> 唤醒阻塞的线程
  • 加锁的操作,必然会涉及到AQS的阻塞队列
  • await 释放锁的时候,-> AQS队列中不存在已经释放锁的线程 -> 这个被释放的线程去了哪里?
  • signal 唤醒被阻塞的线程 -> 从哪里唤醒?

通过await方法释放的线程,必须要有一个地方来存储,并且还需要被阻塞—–因此需要有一个等待队列

signal——从上面猜想到的等待队列中,唤醒一个线程——唤醒后放哪里去? 是不是应该再放到AQS队列?

二、Condition

1.接口介绍

先来看看Condition接口定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public interface Condition {

void await() throws InterruptedException;

void awaitUninterruptibly();

long awaitNanos(long nanosTimeout) throws InterruptedException;

boolean await(long time, TimeUnit unit) throws InterruptedException;

boolean awaitUntil(Date deadline) throws InterruptedException;

void signal();

void signalAll();
}

Condition 对象必须和一个 Lock 对象绑定使用,因为 Condition 需要依赖于锁来实现等待和唤醒操作。例如,可以通过调用 Lock 的 newCondition() 方法来创建一个 Condition 对象:

1
2
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();

在使用 Condition 时,通常的流程是:

  1. 获得锁。
  2. 判断条件是否满足,如果满足则执行相应的操作,否则执行 await() 方法将线程加入到等待队列中。
  3. 等待其他线程唤醒自己。
  4. 被唤醒后再次获得锁。
  5. 执行相应的操作并释放锁。

2.await方法

Condition是一个接口,本身是不包含方法的实现的,它在并发编程工具类之中,主要在两个类之中被使用,分别是AbstractQueuedLongSynchronizerAbstractQueuedSynchronizer,后者正是前文提到过的AQS框架

下面分析主要依据AQS框架之中实现Condition接口的内部类ConditionObject来分析

在AQS框架之中,ConditionObject属于内部类,实现了Condition接口定义的诸多方法,其中await方法的实现如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 添加到等待队列
Node node = addConditionWaiter(); // 1
// 完整的释放锁(考虑重入问题)
int savedState = fullyRelease(node); // 2
int interruptMode = 0;
while (!isOnSyncQueue(node)) { // 3
// 上下文切换(程序计数器、寄存器) 用户态-内核态的转化(上下文切换)
// 阻塞当前线程(当其他线程调用signal()方法时,该线程会从这个位置去执行)
LockSupport.park(this);
// 要判断当前被阻塞的线程是否是因为interrupt()唤醒
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) // 4
break;
}
// 重新竞争锁,savedState表示的是被释放的锁的重入次数.
if (acquireQueued(node, savedState) && interruptMode != THROW_IE) // 5
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
// 用于判断线程在等待过程中是否被中断,如果被中断,会设置线程的中断状态。
reportInterruptAfterWait(interruptMode);
}

注意LockSupport.park(this);将线程阻塞在了这里

概述上述方法执行步骤如下:

  1. 获取当前线程的独占锁,并将状态设置为等待状态;
  2. 将当前线程加入等待队列中,等待唤醒;
  3. 释放独占锁,允许其他线程访问;
  4. 等待被唤醒,被唤醒后重新尝试获取独占锁;
  5. 获取到锁后,将等待状态清除。

具体的,代码分析如下:

1.首先,调用了addConditionWaiter获得封装的节点,方法具体实现如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private Node addConditionWaiter() {
Node t = lastWaiter; // 最后一个等待节点
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters(); // 从头开始,清除所有CANCELLED状态的节点
t = lastWaiter; // t始终指向最后一个等待队列的节点
}
// 封装node为当前线程 设置为等待状态
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}

2.用于完全释放独占锁或者共享锁,即如果node当前持有锁,则释放之;就是将持有锁的state值恢复为0

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) { // 释放锁
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}

3.用于判断指定的线程是否在 AQS 同步队列中,如果是,则需要将其从同步队列中移除

1
2
3
4
5
6
7
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;
return findNodeFromTail(node);
}

4.其作用是在当前线程等待过程中检查中断状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// REINTERRUPT表示重置线程的中断状态,即线程在等待时被中断,
// 但是不能抛出异常,需要重置中断状态,让线程继续执行。
private static final int REINTERRUPT = 1;
// THROW_IE表示抛出InterruptedException异常
private static final int THROW_IE = -1;

private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
/*
* transferAfterCancelledWait 方法的作用就是当一个线程被意外唤醒后,将该线程重新加入到等待队列中。
* 具体来说,该方法会将该线程从同步队列中删除,并重新将该线程加入到等待队列中,然后会对该线程进行自旋,等待被唤醒。
*/

5.acquireQueued是AQS中的一个核心方法,主要用于实现独占模式下的获取锁操作。该方法会在一个循环中不断尝试获取锁,如果获取不到,则会将当前线程包装成一个Node节点,并将其加入等待队列中,最后通过调用parkAndCheckInterrupt方法使线程阻塞等待唤醒。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

3.signal方法

  • 要把被阻塞的线程,先唤醒(signal、signalAll)

  • 把等待队列中被唤醒的线程转移到AQS队列中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;// 得到当前等待队列
if (first != null)
doSignal(first);
}
// 唤醒等待队列中的一个线程
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}

唤醒操作

1
2
3
4
5
6
7
8
9
10
11
final boolean transferForSignal(Node node) {
// 被取消,不能唤醒
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// 这里是把当前等待队列中头部节点--->保存到AQS队列
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}

唤醒之后,还记得上await方法中的LockSupport.park(this);吗?当时阻塞在这里,现在唤醒之后会继续往下执行

4.Condition的实际应用

->实现阻塞队列(业务组件)

-> 在线程池中会用到阻塞队列

-> 生产者消费者

-> 流量缓冲

三、阻塞队列

1.什么叫阻塞队列?

队列是一种只允许在一端进行删除操作,在另一端进行插入操作的线性表,允许插入的一端称为队尾、
允许删除的一端称为队头。
那么阻塞队列,实际上是在队列的基础上增加了两个操作。

  • 支持阻塞插入:队列满了的情况下,会阻塞继续往队列中添加数据的线程,直到队列元素被释放。
  • 只是阻塞移除:队列为空的情况下,会阻塞从队列中获取元素的线程,直到队列添加了新的元素。

2.阻塞队列中的方法

添加元素

针对队列满了之后的不同的处理策略

  • add -> 如果队列满了,抛出异常
  • offer -> true/false , 添加成功返回true,否则返回false
  • put -> 如果队列满了,则一直阻塞
  • offer(timeout) , 带了一个超时时间。如果添加一个元素,队列满了,此时会阻塞timeout 时长,超过阻塞时长,返回false。

移除元素

  • element-> 队列为空,抛异常
  • peek -> true/false , 移除成功返回true,否则返回false take -> 一直阻塞
  • poll(timeout) -> 如果超时了,还没有元素,则返回null

dequeue -> LIFO , FIFO的队列.

3.JUC中的阻塞队列

  • ArrayBlockingQueue 基于数组结构
  • LinkedBlockingQueue 基于链表结构
  • PriorityBlcokingQueue 基于优先级队列
  • DelayQueue 允许延时执行的队列
  • SynchronousQueue 没有任何存储结构的的队列

4.ArrayBlockingQueue

变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/** The queued items */
final Object[] items;

/** items index for next take, poll, peek or remove */
int takeIndex;

/** items index for next put, offer, or add */
int putIndex;

/** Number of elements in the queue */
int count;

/*
* Concurrency control uses the classic two-condition algorithm
* found in any textbook.
*/

/** Main lock guarding all access */
final ReentrantLock lock;

/** Condition for waiting takes */
private final Condition notEmpty;

/** Condition for waiting puts */
private final Condition notFull;

最终的入队方法,本质就是一个数组

1
2
3
4
5
6
7
8
9
10
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putInd、ex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}

出队

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
}

5.LinkedBlockingQueue

本质就是一个单向链表,因此是无界阻塞队列

6.PriorityBlcokingQueue

双向链表组成的队列.

支持双向插入和移除.

在一定程度上能够解决多线程的竞争问题

7.SynchronousQueue

SynchronousQueue 是 Java 并发包中的一个特殊的 BlockingQueue 实现。它在一定程度上类似于管道,允许线程在管道两端之间进行信息传递,但它具有阻塞和同步的特性。

SynchronousQueue 的特点是容量为 0,也就是说在一个线程放入元素之前必须有另一个线程等待从队列中取走该元素。如果没有线程等待取元素,那么就无法插入元素,因此 put 操作将会一直阻塞,直到有另一个线程取走该元素。

SynchronousQueue 的主要用途是让一个线程等待另一个线程的结果,例如一个生产者线程等待一个消费者线程处理完数据。它可以用于生产者消费者模式、任务提交和任务执行的协调等场景,也可以用于一些特殊的任务调度场景。

1
2
3
public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}

newCachedThreadPool线程池的实现队列,由于其容量为0,最大线程数基本处于最大,这种情况下可以创建大量线程,快速响应操作

1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

四、阻塞队列的使用

1.CountDownLatch

实现原理

  • 它可以让一个线程阻塞
  • 也可以让多个线程阻塞

共享锁的实现

可以允许多个线程同时抢占到锁,然后等到计数器归零的时候,同时唤醒. state记录计数器.

countDown的时候,实际上就是 state–

CountDownLatch可以让许多线程阻塞,调用countDown直到计数器state为0后同时释放

因此它有两种使用方法

1.初始时设置state的值,在某一公共处调用await方法,所有操作都会阻塞在那里,在某些线程内调用countDown不断减小计数器,直到某一状态时统一放行;比如多线程处理某任务,阻塞主线程,等多线程任务处理完毕再统一放行;

2.在多线程内部调用await阻塞,将每个线程都拦在指定位置,然后调用一次countDown即可为所有线程放行

CountDownLatch用到的是AQS的共享锁

1
2
3
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
1
2
3
4
5
6
7
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}

AQS内方法,尝试获取共享锁,获取不到就封装入队,先自旋再次尝试获取锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

2.Semaphore

信号灯.

限流器,限制资源的访问.

本质上: 抢占一个令牌. -> 如果抢占到令牌,就通行, 否则,就阻塞!

  • acquire() 抢占一个令牌
  • release() 释放一个令牌.

3.CyclicBarrier

可重复的栅栏

相当于,多个线程通过CountDownLatch的await

然后另外一个线程使用countDown方法来唤醒