自我表扬:《Dubbo 实现原理与源码解析 —— 精品合集》
表扬自己:《D数据库实体设计合集》

摘要: 原创出处 http://cmsblogs.com/?p=2222 「小明哥」欢迎转载,保留摘要,谢谢!

作为「小明哥」的忠实读者,「老艿艿」略作修改,记录在理解过程中,参考的资料。


🙂🙂🙂关注微信公众号:【芋道源码】有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

此篇博客所有源码均来自 JDK 1.8

在没有 Lock 之前,我们使用 synchronized 来控制同步,配合 Object 的 #wait()#notify() 等一系列方法可以实现等待 / 通知模式。在 Java SE 5 后,Java 提供了 Lock 接口,相对于 synchronized 而言,Lock 提供了条件 Condition ,对线程的等待、唤醒操作更加详细和灵活。下图是 Condition 与 Object 的监视器方法的对比(摘自《Java并发编程的艺术》):

Condition 与 Object 的监视器方法的对比

1. Condition

java.util.concurrent.locks.Condition ,条件 Condition 接口,定义了一系列的方法,来对阻塞和唤醒线程:

// ========== 阻塞 ==========

void await() throws InterruptedException; // 造成当前线程在接到信号或被中断之前一直处于等待状态。
void awaitUninterruptibly(); // 造成当前线程在接到信号之前一直处于等待状态。【注意:该方法对中断不敏感】。
long awaitNanos(long nanosTimeout) throws InterruptedException; // 造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。返回值表示剩余时间,如果在`nanosTimeout` 之前唤醒,那么返回值 `= nanosTimeout - 消耗时间` ,如果返回值 `<= 0` ,则可以认定它已经超时了。
boolean await(long time, TimeUnit unit) throws InterruptedException; // 造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。
boolean awaitUntil(Date deadline) throws InterruptedException; // 造成当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态。如果没有到指定时间就被通知,则返回 true ,否则表示到了指定时间,返回返回 false 。

// ========== 唤醒 ==========

void signal(); // 唤醒一个等待线程。该线程从等待方法返回前必须获得与Condition相关的锁。
void signalAll(); // 唤醒所有等待线程。能够从等待方法返回的线程必须获得与Condition相关的锁。

Condition 是一种广义上的条件队列。他为线程提供了一种更为灵活的等待 / 通知模式,线程在调用 await 方法后执行挂起操作,直到线程等待的某个条件为真时才会被唤醒。Condition 必须要配合 Lock 一起使用,因为对共享状态变量的访问发生在多线程环境下。一个 Condition 的实例必须与一个 Lock 绑定,因此 Condition 一般都是作为 Lock 的内部实现。

2. ConditionObject

获取一个 Condition 必须要通过 Lock 的 #newCondition() 方法。该方法定义在接口 Lock 下面,返回的结果是绑定到此 Lock 实例的新 Condition 实例。Condition 为一个接口,其下仅有一个实现类 ConditionObject ,由于 Condition 的操作需要获取相关的锁,而 AQS则是同步锁的实现基础,所以 ConditionObject 则定义为 AQS 的内部类。代码如下:

public class ConditionObject implements Condition, java.io.Serializable {
/** First node of condition queue. */
private transient Node firstWaiter; // 头节点
/** Last node of condition queue. */
private transient Node lastWaiter; // 尾节点

public ConditionObject() {
}

// ... 省略内部代码
}
  • 从上面代码可以看出,ConditionObject 拥有首节点(firstWaiter),尾节点(lastWaiter)。当前线程调用 #await()方法时,将会以当前线程构造成一个节点(Node),并将节点加入到该队列的尾部。结构如下:

    Condition 等待队列

  • Node 里面包含了当前线程的引用。Node 定义与 AQS 的 CLH 同步队列的节点使用的都是同一个类(AbstractQueuedSynchronized 的 Node 静态内部类)。

  • ConditionObject 的队列结构比 CLH 同步队列的结构简单些,新增过程较为简单,只需要将原尾节点的 Node.next 指向新增节点,然后更新 ConditionObject.lastWaiter 即可。

2.1 大体实现流程

老艿艿:在理解 Condition 的时候,看了下 《Java Condition 源码分析》大体实现流程,写的挺不错的,所以直接引用。

AQS 等待队列与 Condition 队列是两个相互独立的队列

  • #await() 就是在当前线程持有锁的基础上释放锁资源,并新建 Condition 节点加入到 Condition 的队列尾部,阻塞当前线程 。
  • #signal() 就是将 Condition 的头节点移动到 AQS 等待节点尾部,让其等待再次获取锁。

以下是 AQS 队列和 Condition 队列的出入结点的示意图,可以通过这几张图看出线程结点在两个队列中的出入关系和条件。

I.初始化状态:AQS等待队列有 3 个Node,Condition 队列有 1 个Node(也有可能 1 个都没有)

img

II.节点1执行 Condition.await()

  1. 将 head 后移
  2. 释放节点 1 的锁并从 AQS 等待队列中移除
  3. 将节点 1 加入到 Condition 的等待队列中
  4. 更新 lastWaiter 为节点 1

img

III.节点 2 执行 Condition.signal() 操作

  1. 将 firstWaiter后移
  2. 将节点 4 移出 Condition 队列
  3. 将节点 4 加入到 AQS 的等待队列中去
  4. 更新 AQS 的等待队列的 tail

img

2.2 等待

2.2.1 await

调用 Condition 的 #await() 方法,会使当前线程进入等待状态,同时会加入到 Condition 等待队列,并且同时释放锁。当从 #await() 方法结束时,当前线程一定是获取了Condition 相关联的锁。

public final void await() throws InterruptedException {
// 当前线程中断
if (Thread.interrupted())
throw new InterruptedException();
//当前线程加入等待队列
Node node = addConditionWaiter();
//释放锁
long savedState = fullyRelease(node);
int interruptMode = 0;
/**
* 检测此节点的线程是否在同步队上,如果不在,则说明该线程还不具备竞争锁的资格,则继续等待
* 直到检测到此节点在同步队列上
*/
while (!isOnSyncQueue(node)) {
//线程挂起
LockSupport.park(this);
//如果已经中断了,则退出
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//竞争同步状态
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 清理下条件队列中的不是在等待条件的节点
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
  • 首先,将当前线程新建一个节点同时加入到条件队列中。
  • 然后,释放当前线程持有的同步状态。
  • 之后,则是不断检测该节点代表的线程,出现在 CLH 同步队列中(收到 signal 信号之后,就会在 AQS 队列中检测到),如果不存在则一直挂起
  • 最后,重新参与竞争,获取到同步状态。

2.1.1.1 addConditionWaiter

#addConditionWaiter() 方法,加入条件队列,代码如下:

private Node addConditionWaiter() {
Node t = lastWaiter; //尾节点
//Node的节点状态如果不为CONDITION,则表示该节点不处于等待状态,需要清除节点
if (t != null && t.waitStatus != Node.CONDITION) {
//清除条件队列中所有状态不为Condition的节点
unlinkCancelledWaiters();
t = lastWaiter;
}
//当前线程新建节点,状态 CONDITION
Node node = new Node(Thread.currentThread(), Node.CONDITION);
/**
* 将该节点加入到条件队列中最后一个位置
*/
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
  • 该方法主要是将当前线程加入到 Condition 条件队列中。当然,在加入到尾节点之前,会调用 #unlinkCancelledWaiters() 方法,清除所有状态不为 Condition 的节点。

2.1.1.2 fullyRelease

#fullyRelease(Node node) 方法,负责完全释放该线程持有的锁,因为例如 ReentrantLock 是可以重入的。代码如下:

final long fullyRelease(Node node) {
boolean failed = true;
try {
// 节点状态--其实就是持有锁的数量
long savedState = getState();
// 释放锁
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
  • 正常情况下,释放锁都能成功,因为是调用 Lock#lock() 方法,调用 Condition#await() 方法。
  • 那么什么情况下会失败,抛出 IllegalMonitorStateException 异常呢?例如,当前线程未持有锁,调用 Lock#lock() 方法,而直接调用 Condition#await() 方法,此时就会抛出该异常。
  • 另外,释放失败的情况下,会设置 Node 的等待状态为 Node.CANCELED

2.1.1.3 isOnSyncQueue

#isOnSyncQueue(Node node) 方法,如果一个节点刚开始在条件队列上,现在在同步队列上获取锁则返回 true 。代码如下:

final boolean isOnSyncQueue(Node node) {
// 状态为 Condition,获取前驱节点为 null ,返回 false
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
// 后继节点不为 null,肯定在 CLH 同步队列中
if (node.next != null)
return true;

return findNodeFromTail(node);
}

2.1.1.4 unlinkCancelledWaiters

#unlinkCancelledWaiters() 方法,负责将条件队列中状态不为 Condition 的节点删除。代码如下:

// 等待队列是一个单向链表,遍历链表将已经取消等待的节点清除出去
// 纯属链表操作,很好理解,看不懂多看几遍就可以了
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null; // 用于中间不需要跳过时,记录上一个 Node 节点
while (t != null) {
Node next = t.nextWaiter;
// 如果节点的状态不是 Node.CONDITION 的话,这个节点就是被取消的
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}

2.2.2 其他 await 实现方法

老艿艿:本文省略,感兴趣的胖友,可以看看 《一行一行源码分析清楚 AbstractQueuedSynchronizer (二)》「带超时机制的 await」「不抛出 InterruptedException 的 await」 小节。

2.3 通知

2.3.1 signal

调用 ConditionObject的 #signal() 方法,将会唤醒在等待队列中等待最长时间的节点(条件队列里的首节点),在唤醒节点前,会将节点移到CLH同步队列中。

public final void signal() {
//检测当前线程是否为拥有锁的独
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//头节点,唤醒条件队列中的第一个节点
Node first = firstWaiter;
if (first != null)
doSignal(first); //唤醒
}
  • 该方法首先会判断当前线程是否已经获得了锁,这是前置条件。然后调用 #doSignal(Node first) 方法,唤醒条件队列中的头节点。代码如下:

    private void doSignal(Node first) {
    do {
    //修改头结点,完成旧头结点的移出工作
    if ( (firstWaiter = first.nextWaiter) == null)
    lastWaiter = null;
    first.nextWaiter = null;
    } while (!transferForSignal(first) &&
    (first = firstWaiter) != null);
    }
    • 主要是做两件事:1)修改头节点;2)调用 #transferForSignal(Node first) 方法将节点移动到 CLH 同步队列中。代码如下:

       final boolean transferForSignal(Node node) {
      //将该节点从状态CONDITION改变为初始状态0,
      if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
      return false;

      //将节点加入到syn队列中去,返回的是syn队列中node节点前面的一个节点
      Node p = enq(node);
      int ws = p.waitStatus;
      //如果结点p的状态为cancel 或者修改waitStatus失败,则直接唤醒
      if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
      LockSupport.unpark(node.thread);
      return true;
      }
      • x

整个通知的流程如下:

  1. 判断当前线程是否已经获取了锁,如果没有获取则直接抛出异常,因为获取锁为通知的前置条件。
  2. 如果线程已经获取了锁,则将唤醒条件队列的首节点
  3. 唤醒首节点是先将条件队列中的头节点移出,然后调用 AQS 的 #enq(Node node) 方法将其安全地移到 CLH 同步队列中
  4. 最后判断如果该节点的同步状态是否为 Node.CANCEL ,或者修改状态为 Node.SIGNAL 失败时,则直接调用 LockSupport 唤醒该节点的线程。

2.3.2 signalAll

老艿艿:感兴趣的胖友,自己去查看。

2.4 总结

一个线程获取锁后,通过调用 Condition 的 #await() 方法,会将当前线程先加入到条件队列中,然后释放锁,最后通过 #isOnSyncQueue(Node node) 方法,不断自检看节点是否已经在 CLH 同步队列了,如果是则尝试获取锁,否则一直挂起。

当线程调用 #signal() 方法后,程序首先检查当前线程是否获取了锁,然后通过#doSignal(Node first) 方法唤醒CLH同步队列的首节点。被唤醒的线程,将从 #await() 方法中的 while 循环中退出来,然后调用 #acquireQueued(Node node, int arg) 方法竞争同步状态。

3. Condition 的应用

只知道原理,如果不知道使用那就坑爹了,下面是用Condition实现的生产者消费者问题:

public class ConditionTest {
private LinkedList<String> buffer; //容器
private int maxSize ; //容器最大
private Lock lock;
private Condition fullCondition;
private Condition notFullCondition;

ConditionTest(int maxSize){
this.maxSize = maxSize;
buffer = new LinkedList<String>();
lock = new ReentrantLock();
fullCondition = lock.newCondition();
notFullCondition = lock.newCondition();
}

public void set(String string) throws InterruptedException {
lock.lock(); //获取锁
try {
while (maxSize == buffer.size()){
notFullCondition.await(); //满了,添加的线程进入等待状态
}

buffer.add(string);
fullCondition.signal();
} finally {
lock.unlock(); //记得释放锁
}
}

public String get() throws InterruptedException {
String string;
lock.lock();
try {
while (buffer.size() == 0){
fullCondition.await();
}
string = buffer.poll();
notFullCondition.signal();
} finally {
lock.unlock();
}
return string;
}
}

参考资料

  1. 方腾飞:《Java并发编程的艺术》的 「5.6 Condition 接口」 章节

老艿艿:Condition 的实现细节很多,推荐再阅读下 《一行一行源码分析清楚 AbstractQueuedSynchronizer (二)》

666. 彩蛋

如果你对 Java 并发感兴趣,欢迎加入我的知识星球一起交流。

知识星球

文章目录
  1. 1. 1. Condition
  2. 2. 2. ConditionObject
    1. 2.1. 2.1 大体实现流程
    2. 2.2. 2.2 等待
      1. 2.2.1. 2.2.1 await
        1. 2.2.1.1. 2.1.1.1 addConditionWaiter
        2. 2.2.1.2. 2.1.1.2 fullyRelease
        3. 2.2.1.3. 2.1.1.3 isOnSyncQueue
        4. 2.2.1.4. 2.1.1.4 unlinkCancelledWaiters
      2. 2.2.2. 2.2.2 其他 await 实现方法
    3. 2.3. 2.3 通知
      1. 2.3.1. 2.3.1 signal
      2. 2.3.2. 2.3.2 signalAll
    4. 2.4. 2.4 总结
  3. 3. 3. Condition 的应用
  4. 4. 参考资料
  5. 5. 666. 彩蛋