摘要: 原创出处 http://cmsblogs.com/?p=2433 「小明哥」欢迎转载,保留摘要,谢谢!


🙂🙂🙂关注微信公众号:【芋道源码】有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

前面提到的各种BlockingQueue对读或者写都是锁上整个队列,在并发量大的时候,各种锁是比较耗资源和耗时间的,而前面的SynchronousQueue虽然不会锁住整个队列,但它是一个没有容量的“队列”,那么有没有这样一种队列,它即可以像其他的BlockingQueue一样有容量又可以像SynchronousQueue一样不会锁住整个队列呢?有!答案就是LinkedTransferQueue。

LinkedTransferQueue是基于链表的FIFO无界阻塞队列,它出现在JDK7中。Doug Lea 大神说LinkedTransferQueue是一个聪明的队列。它是ConcurrentLinkedQueueSynchronousQueue (公平模式下)、无界的LinkedBlockingQueues等的超集。既然这么牛逼,那势必要弄清楚其中的原理了。

LinkedTransferQueue

看源码之前我们先稍微了解下它的原理,这样看源码就会有迹可循了。

LinkedTransferQueue采用一种预占模式。什么意思呢?有就直接拿走,没有就占着这个位置直到拿到或者超时或者中断。即消费者线程到队列中取元素时,如果发现队列为空,则会生成一个null节点,然后park住等待生产者。后面如果生产者线程入队时发现有一个null元素节点,这时生产者就不会入列了,直接将元素填充到该节点上,唤醒该节点的线程,被唤醒的消费者线程拿东西走人。是不是有点儿SynchronousQueue的味道?

结构

LinkedTransferQueue与其他的BlockingQueue一样,同样继承AbstractQueue类,但是它实现了TransferQueue,TransferQueue接口继承BlockingQueue,所以TransferQueue算是对BlockingQueue一种扩充,该接口提供了一整套的transfer接口:

public interface TransferQueue<E> extends BlockingQueue<E> {
boolean tryTransfer(E e);
void transfer(E e) throws InterruptedException;
boolean tryTransfer(E e, long timeout, TimeUnit unit)
throws InterruptedException;
boolean hasWaitingConsumer();
int getWaitingConsumerCount();
}

相对于其他的BlockingQueue,LinkedTransferQueue就多了上面几个方法。这几个方法在LinkedTransferQueue中起到了核心作用。

LinkedTransferQueue定义的变量如下:

// 判断是否为多核
private static final boolean MP =
Runtime.getRuntime().availableProcessors() > 1;
// 自旋次数
private static final int FRONT_SPINS = 1 << 7;
// 前驱节点正在处理,当前节点需要自旋的次数
private static final int CHAINED_SPINS = FRONT_SPINS >>> 1;
static final int SWEEP_THRESHOLD = 32;
// 头节点
transient volatile Node head;
// 尾节点
private transient volatile Node tail;
// 删除节点失败的次数
private transient volatile int sweepVotes;
private static final int NOW = 0; // for untimed poll, tryTransfer
private static final int ASYNC = 1; // for offer, put, add
private static final int SYNC = 2; // for transfer, take
private static final int TIMED = 3; // for timed poll, tryTransfer

Node节点

Node节点由四个部分构成:

  • isData:表示该节点是存放数据还是获取数据
  • item:存放数据,isData为false时,该节点为null,为true时,匹配后,该节点会置为null
  • next:指向下一个节点
  • waiter:park住消费者线程,线程就放在这里

结构如下:

这里写图片描述
源码如下:

static final class Node {
// 表示该节点是存放数据还是获取数据
final boolean isData;
// 存放数据,isData为false时,该节点为null,为true时,匹配后,该节点会置为null
volatile Object item;
//指向下一个节点
volatile Node next;
// park住消费者线程,线程就放在这里
volatile Thread waiter; // null until waiting
final boolean casNext(Node cmp, Node val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
final boolean casItem(Object cmp, Object val) {
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
Node(Object item, boolean isData) {
UNSAFE.putObject(this, itemOffset, item); // relaxed write
this.isData = isData;
}
final void forgetNext() {
UNSAFE.putObject(this, nextOffset, this);
}
final void forgetContents() {
UNSAFE.putObject(this, itemOffset, this);
UNSAFE.putObject(this, waiterOffset, null);
}
final boolean isMatched() {
Object x = item;
return (x == this) || ((x == null) == isData);
}
final boolean isUnmatchedRequest() {
return !isData && item == null;
}
final boolean cannotPrecede(boolean haveData) {
boolean d = isData;
Object x;
return d != haveData && (x = item) != this && (x != null) == d;
}
final boolean tryMatchData() {
// assert isData;
Object x = item;
if (x != null && x != this && casItem(x, null)) {
LockSupport.unpark(waiter);
return true;
}
return false;
}
private static final long serialVersionUID = -3375979862319811754L;
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long itemOffset;
private static final long nextOffset;
private static final long waiterOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = Node.class;
itemOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("item"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
waiterOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("waiter"));
} catch (Exception e) {
throw new Error(e);
}
}
}

节点Node为LinkedTransferQueue的内部类,其内部结构和公平方式的SynchronousQueue差不多,里面也同样提供了一些很重要的方法。

put操作

LinkedTransferQueue提供了add、put、offer三类方法,用于将元素插入队列中,如下:

public void put(E e) {
xfer(e, true, ASYNC, 0);
}
public boolean offer(E e, long timeout, TimeUnit unit) {
xfer(e, true, ASYNC, 0);
return true;
}
public boolean offer(E e) {
xfer(e, true, ASYNC, 0);
return true;
}
public boolean add(E e) {
xfer(e, true, ASYNC, 0);
return true;
}

由于LinkedTransferQueue是无界的,不会阻塞,所以在调用xfer方法是传入的是ASYNC,同时直接返回true.

take操作

LinkedTransferQueue提供了poll、take方法用于出列元素:

public E take() throws InterruptedException {
E e = xfer(null, false, SYNC, 0);
if (e != null)
return e;
Thread.interrupted();
throw new InterruptedException();
}
public E poll() {
return xfer(null, false, NOW, 0);
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E e = xfer(null, false, TIMED, unit.toNanos(timeout));
if (e != null || !Thread.interrupted())
return e;
throw new InterruptedException();
}

这里和put操作有点不一样,take()方法传入的是SYNC,阻塞。poll()传入的是NOW,poll(long timeout, TimeUnit unit)则是传入TIMED。

tranfer操作

实现TransferQueue接口,就要实现它的方法:

public boolean tryTransfer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null)
return true;
if (!Thread.interrupted())
return false;
throw new InterruptedException();
}
public void transfer(E e) throws InterruptedException {
if (xfer(e, true, SYNC, 0) != null) {
Thread.interrupted(); // failure possible only due to interrupt
throw new InterruptedException();
}
}
public boolean tryTransfer(E e) {
return xfer(e, true, NOW, 0) == null;
}

xfer()

通过上面几个核心方法的源码我们清楚可以看到,最终都是调用xfer()方法,该方法接受四个参数,item或者null的E,put操作为true、take操作为false的havaData,how(有四个值NOW, ASYNC, SYNC, or TIMED,分别表示不同的操作),超时nanos。

private E xfer(E e, boolean haveData, int how, long nanos) {
// havaData为true,但是e == null 抛出空指针
if (haveData && (e == null))
throw new NullPointerException();
Node s = null; // the node to append, if needed
retry:
for (;;) {
// 从首节点开始匹配
// p == null 队列为空
for (Node h = head, p = h; p != null;) {
// 模型,request or data
boolean isData = p.isData;
// item域
Object item = p.item;
// 找到一个没有匹配的节点
// item != p 也就是自身,则表示没有匹配过
// (item != null) == isData,表示模型符合
if (item != p && (item != null) == isData) {
// 节点类型和待处理类型一致,这样肯定是不能匹配的
if (isData == haveData) // can't match
break;
// 匹配,将E加入到item域中
// 如果p 的item为data,那么e为null,如果p的item为null,那么e为data
if (p.casItem(item, e)) { // match
//
for (Node q = p; q != h;) {
Node n = q.next; // update by 2 unless singleton
if (head == h && casHead(h, n == null ? q : n)) {
h.forgetNext();
break;
} // advance and retry
if ((h = head) == null ||
(q = h.next) == null || !q.isMatched())
break; // unless slack < 2
}
// 匹配后唤醒p的waiter线程;reservation则叫人收货,data则叫null收货
LockSupport.unpark(p.waiter);
return LinkedTransferQueue.<E>cast(item);
}
}
// 如果已经匹配了则向前推进
Node n = p.next;
// 如果p的next指向p本身,说明p节点已经有其他线程处理过了,只能从head重新开始
p = (p != n) ? n : (h = head); // Use head if p offlist
}
// 如果没有找到匹配的节点,则进行处理
// NOW为untimed poll, tryTransfer,不需要入队
if (how != NOW) { // No matches available
// s == null,新建一个节点
if (s == null)
s = new Node(e, haveData);
// 入队,返回前驱节点
Node pred = tryAppend(s, haveData);
// 返回的前驱节点为null,那就是有race,被其他的抢了,那就continue 整个for
if (pred == null)
continue retry;
// ASYNC不需要阻塞等待
if (how != ASYNC)
return awaitMatch(s, pred, e, (how == TIMED), nanos);
}
return e;
}
}

整个算法的核心就是寻找匹配节点找到了就返回,否则就入队(NOW直接返回):

  • matched。判断匹配条件(isData不一样,本身没有匹配),匹配后就casItem,然后unpark匹配节点的waiter线程,如果是reservation则叫人收货,data则叫null收货。
  • unmatched。如果没有找到匹配节点,则根据传入的how来处理,NOW直接返回,其余三种先入对,入队后如果是ASYNC则返回,SYNC和TIMED则会阻塞等待匹配。

其实相当于SynchronousQueue来说,这个处理逻辑还是比较简单的。

如果没有找到匹配节点,且how != NOW会入队,入队则是调用tryAppend方法:

private Node tryAppend(Node s, boolean haveData) {
// 从尾节点tail开始
for (Node t = tail, p = t;;) {
Node n, u;
// 队列为空则将节点S设置为head
if (p == null && (p = head) == null) {
if (casHead(null, s))
return s;
}
// 如果为data
else if (p.cannotPrecede(haveData))
return null;
// 不是最后一个节点
else if ((n = p.next) != null)
p = p != t && t != (u = tail) ? (t = u) : (p != n) ? n : null;
// CAS失败,一般来说失败的原因在于p.next != null,可能有其他增加了tail,向前推荐
else if (!p.casNext(null, s))
p = p.next; // re-read on CAS failure
else {
if (p != t) { // update if slack now >= 2
while ((tail != t || !casTail(t, s)) &&
(t = tail) != null &&
(s = t.next) != null && // advance and retry
(s = s.next) != null && s != t);
}
return p;
}
}
}

tryAppend方法是将S节点添加到tail上,然后返回其前驱节点。好吧,我承认这段代码我看的有点儿晕!!!

加入队列后,如果how还不是ASYNC则调用awaitMatch()方法阻塞等待:

private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
// 超时控制
final long deadline = timed ? System.nanoTime() + nanos : 0L;
// 当前线程
Thread w = Thread.currentThread();
// 自旋次数
int spins = -1; // initialized after first item and cancel checks
// 随机数
ThreadLocalRandom randomYields = null; // bound if needed
for (;;) {
Object item = s.item;
//匹配了,可能有其他线程匹配了线程
if (item != e) {
// 撤销该节点
s.forgetContents();
return LinkedTransferQueue.<E>cast(item);
}
// 线程中断或者超时了。则调用将s节点item设置为e,等待取消
if ((w.isInterrupted() || (timed && nanos <= 0)) && s.casItem(e, s)) { // cancel
// 断开节点
unsplice(pred, s);
return e;
}
// 自旋
if (spins < 0) {
// 计算自旋次数
if ((spins = spinsFor(pred, s.isData)) > 0)
randomYields = ThreadLocalRandom.current();
}
// 自旋
else if (spins > 0) {
--spins;
// 生成的随机数 == 0 ,停止线程?不是很明白....
if (randomYields.nextInt(CHAINED_SPINS) == 0)
Thread.yield();
}
// 将当前线程设置到节点的waiter域
// 一开始s.waiter == null 肯定是会成立的,
else if (s.waiter == null) {
s.waiter = w; // request unpark then recheck
}
// 超时阻塞
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos > 0L)
LockSupport.parkNanos(this, nanos);
}
else {
// 不是超时阻塞
LockSupport.park(this);
}
}
}

整个awaitMatch过程和SynchronousQueue的awaitFulfill没有很大区别,不过在自旋过程会调用Thread.yield();这是干嘛?

在awaitMatch过程中,如果线程中断了,或者超时了则会调用unsplice()方法去除该节点:

final void unsplice(Node pred, Node s) {
s.forgetContents(); // forget unneeded fields
if (pred != null && pred != s && pred.next == s) {
Node n = s.next;
if (n == null ||
(n != s && pred.casNext(s, n) && pred.isMatched())) {
for (;;) { // check if at, or could be, head
Node h = head;
if (h == pred || h == s || h == null)
return; // at head or list empty
if (!h.isMatched())
break;
Node hn = h.next;
if (hn == null)
return; // now empty
if (hn != h && casHead(h, hn))
h.forgetNext(); // advance head
}
if (pred.next != pred && s.next != s) { // recheck if offlist
for (;;) { // sweep now if enough votes
int v = sweepVotes;
if (v < SWEEP_THRESHOLD) {
if (casSweepVotes(v, v + 1))
break;
}
else if (casSweepVotes(v, 0)) {
sweep();
break;
}
}
}
}
}
}

主体流程已经完成,这里总结下:

  1. 无论是入对、出对,还是交换,最终都会跑到xfer(E e, boolean haveData, int how, long nanos)方法中,只不过传入的how不同而已
  2. 如果队列不为空,则尝试在队列中寻找是否存在与该节点相匹配的节点,如果找到则将匹配节点的item设置e,然后唤醒匹配节点的waiter线程。如果是reservation则叫人收货,data则叫null收货
  3. 如果队列为空,或者没有找到匹配的节点且how != NOW,则调用tryAppend()方法将节点添加到队列的tail,然后返回其前驱节点
  4. 如果节点的how != NOW && how != ASYNC,则调用awaitMatch()方法阻塞等待,在阻塞等待过程中和SynchronousQuque的awaitFulfill()逻辑差不多,都是先自旋,然后判断是否需要自旋,如果中断或者超时了则将该节点从队列中移出

实例

这段摘自JAVA 1.7并发之LinkedTransferQueue原理理解。感觉看完上面的源码后,在结合这个例子会有更好的了解,掌握。

1:Head->Data Input->Data
Match: 根据他们的属性 发现 cannot match ,因为是同类的
处理节点: 所以把新的data放在原来的data后面,然后head往后移一位,Reservation同理
HEAD=DATA->DATA

2:Head->Data Input->Reservation (取数据)
Match: 成功match,就把Data的item变为reservation的值(null,有主了),并且返回数据。
处理节点: 没动,head还在原地
HEAD=DATA(用过)

3:Head->Reservation Input->Data(放数据)
Match: 成功match,就把Reservation的item变为Data的值(有主了),并且叫waiter来取
处理节点: 没动
HEAD=RESERVATION(用过)

666. 彩蛋

如果你对 Java 并发感兴趣,欢迎加入我的知识星球一起交流。

知识星球

文章目录
  1. 1. LinkedTransferQueue
    1. 1.1. 结构
    2. 1.2. Node节点
    3. 1.3. put操作
    4. 1.4. take操作
    5. 1.5. tranfer操作
    6. 1.6. xfer()
    7. 1.7. 实例
  • 666. 彩蛋