⭐⭐⭐ Spring Boot 项目实战 ⭐⭐⭐ Spring Cloud 项目实战
《Dubbo 实现原理与源码解析 —— 精品合集》 《Netty 实现原理与源码解析 —— 精品合集》
《Spring 实现原理与源码解析 —— 精品合集》 《MyBatis 实现原理与源码解析 —— 精品合集》
《Spring MVC 实现原理与源码解析 —— 精品合集》 《数据库实体设计合集》
《Spring Boot 实现原理与源码解析 —— 精品合集》 《Java 面试题 + Java 学习指南》

摘要: 原创出处 blog.csdn.net/qq_38306425/article/details/109332045 「工地杨大锤」欢迎转载,保留摘要,谢谢!


🙂🙂🙂关注**微信公众号:【芋道源码】**有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

面试官:你好,你先做个自我介绍吧

某人:面试官你好,我叫开局一张嘴面试全靠吹,某某年毕业,毕业自家里蹲大学,做过某某项目。。。。。。

面试官微微一笑,捋了捋稀疏的头发:看你简历,你精通多线程?那你手写过堵塞队列吗?

某人心里出现一万个问号,堵塞队列是啥玩意?平时基本都是crud,顶多用多线程跑数据

图片

某人:没有手写过。

面试官:哦,那你说下堵塞队列吧

某人支支吾吾:这个有点忘了

面试官:没事,那我们下一个。

此处省略一万字。

面试官扭了扭严重负荷的颈椎:先到这里吧,你先回去等通知。

某人:好的。

不出意外,某人等了一个月,等的望眼欲穿,也没等到那个期待的电话。

1.什么是队列

队列是一种特殊的线性表,特殊之处在于它只允许在表的前端(front)进行删除操作,而在表的后端(rear)进行插入操作,和栈一样,队列是一种操作受限制的线性表。进行插入操作的端称为队尾,进行删除操作的端称为队头。

图片

队列其实就是跟平时排队一样,按照顺序来,先排队的先买到东西,后排队的后买到东西,排队的第一个叫队头,最后一个叫队尾,这就是队列的先进先出,这是和栈最大的区别。

2.什么是堵塞队列?

图片

当队列为空时,消费者挂起,队列已满时,生产者挂起,这就是生产-消费者模型,堵塞其实就是将线程挂起。因为生产者的生产速度和消费者的消费速度之间的不匹配,就可以通过堵塞队列让速度快的暂时堵塞, 如生产者每秒生产两个数据,而消费者每秒消费一个数据,当队列已满时,生产者就会堵塞(挂起),等待消费者消费后,再进行唤醒。

堵塞队列会通过挂起的方式来实现生产者和消费者之间的平衡,这是和普通队列最大的区别。

3.如何实现堵塞队列?

jdk其实已经帮我们提供了实现方案,java5增加了concurrent包,concurrent包中的BlockingQueue就是堵塞队列,我们不需要关心BlockingQueue如何实现堵塞,一切都帮我们封装好了,只需要做一个没有感情的API调用者就行。

4.BlockingQueue如何使用?

BlockingQueue本身只是一个接口,规定了堵塞队列的方法,主要依靠几个实现类实现。

4.1 BlockingQueue主要方法

图片

1.插入数据 (1)offer(E e):如果队列没满,返回true,如果队列已满,返回false(不堵塞) (2)offer(E e, long timeout, TimeUnit unit):可以设置等待时间,如果队列已满,则进行等待。超过等待时间,则返回false (3)put(E e):无返回值,一直等待,直至队列空出位置

2.获取数据 (1)poll():如果有数据,出队,如果没有数据,返回null (2)poll(long timeout, TimeUnit unit):可以设置等待时间,如果没有数据,则等待,超过等待时间,则返回null (3)take():如果有数据,出队。如果没有数据,一直等待(堵塞)

4.2 BlockingQueue主要实现类

1.ArrayBlockingQueue:ArrayBlockingQueue是基于数组实现的,通过初始化时设置数组长度,是一个有界队列,而且ArrayBlockingQueue和LinkedBlockingQueue不同的是,ArrayBlockingQueue只有一个锁对象,而LinkedBlockingQueue是两个锁对象,一个锁对象会造成要么是生产者获得锁,要么是消费者获得锁,两者竞争锁,无法并行。

2.LinkedBlockingQueue:LinkedBlockingQueue是基于链表实现的,和ArrayBlockingQueue不同的是,大小可以初始化设置,如果不设置,默认设置大小为Integer.MAX_VALUE,LinkedBlockingQueue有两个锁对象,可以并行处理。

3.DelayQueue:DelayQueue是基于优先级的一个无界队列,队列元素必须实现Delayed接口,支持延迟获取,元素按照时间排序,只有元素到期后,消费者才能从队列中取出。

4.PriorityBlockingQueue:PriorityBlockingQueue是基于优先级的一个无界队列,底层是基于数组存储元素的,元素按照优选级顺序存储,优先级是通过Comparable的compareTo方法来实现的(自然排序),和其他堵塞队列不同的是,其只会堵塞消费者,不会堵塞生产者,数组会不断扩容,这就是一个彩蛋,使用时要谨慎。

5.SynchronousQueue:SynchronousQueue是一个特殊的队列,其内部是没有容器的,所以生产者生产一个数据,就堵塞了,必须等消费者消费后,生产者才能再次生产,称其为队列有点不合适,现实生活中,多个人才能称为队,一个人称为队有些说不过去。

5.手写堵塞队列

我是参照了ArrayBlockingQueue的源码写的,欢迎大家斧正。

/**
* @author yz
* @version 1.0
* @date 2020/10/31 11:24
*/
public class YzBlockingQuery {

private Object[] tab; //队列容器

private int takeIndex; //出队下标

private int putIndex; //入队下标

private int size;//元素数量

private ReentrantLock reentrantLock = new ReentrantLock();

private Condition notEmpty;//读条件

private Condition notFull;//写条件

public YzBlockingQuery(int tabCount) {
if (tabCount <= 0) {
new NullPointerException();
}

tab = new Object[tabCount];
notEmpty = reentrantLock.newCondition();
notFull = reentrantLock.newCondition();
}

public boolean offer(Object obj) {
if (obj == null) { throw new NullPointerException(); }
try {
//获取锁
reentrantLock.lock();
//队列已满
while (size==tab.length){
System.out.println("队列已满");
//堵塞
notFull.await();
}
tab[putIndex]=obj;
if(++putIndex==tab.length){
putIndex=0;
}
size++;
//唤醒读线程
notEmpty.signal();
return true;
} catch (Exception e) {
//唤醒读线程
notEmpty.signal();
} finally {
reentrantLock.unlock();
}
return false;
}


public Object take(){
try {
reentrantLock.lock();
while (size==0){
System.out.println("队列空了");
//堵塞
notEmpty.await();
}
Object obj= tab[takeIndex];
//如果到了最后一个,则从头开始
if(++takeIndex==tab.length){
takeIndex=0;
}
size--;
//唤醒写线程
notFull.signal();
return obj;
}catch (Exception e){
//唤醒写线程
notFull.signal();
}finally {
reentrantLock.unlock();
}
return null;
}


public static void main(String[] args) {
Random random = new Random(100);
YzBlockingQuery yzBlockingQuery=new YzBlockingQuery(5);
Thread thread1 = new Thread(() -> {
for (int i=0;i<100;i++) {
try {
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
yzBlockingQuery.offer(i);
System.out.println("生产者生产了:"+i);
}
});

Thread thread2 = new Thread(() -> {
for (int i=0;i<100;i++) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Object take = yzBlockingQuery.take();
System.out.println("消费者消费了:"+take);
}
});

thread1.start();
thread2.start();
}
}

文章目录
  1. 1. 1.什么是队列
  2. 2. 2.什么是堵塞队列?
  3. 3. 3.如何实现堵塞队列?
  4. 4. 4.BlockingQueue如何使用?
    1. 4.1. 4.1 BlockingQueue主要方法
    2. 4.2. 4.2 BlockingQueue主要实现类
  5. 5. 5.手写堵塞队列