《Dubbo 实现原理与源码解析 —— 精品合集》 《Netty 实现原理与源码解析 —— 精品合集》
《Spring 实现原理与源码解析 —— 精品合集》 《MyBatis 实现原理与源码解析 —— 精品合集》
《Spring MVC 实现原理与源码解析 —— 精品合集》 《数据库实体设计合集》
《Spring Boot 实现原理与源码解析 —— 精品合集》 《Java 面试题 + Java 学习指南》

摘要: 原创出处 https://blog.csdn.net/wangwenpeng0529/article/details/105769978/ 「Debug-ya」欢迎转载,保留摘要,谢谢!


🙂🙂🙂关注微信公众号:【芋道源码】有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

简介

在 Java 5.0 提供了 java.util.concurrent(简称JUC)包,在此包中增加了在并发编程中很常用的工具类,用于定义类似于线程的自定义子系统,包括线程池,异步 IO 和轻量级任务框架;还提供了设计用于多线程上下文中的 Collection 实现等

volatile 关键字

内存可见性

内存可见性(Memory Visibility)是指当某个线程正在使用对象状态而另一个线程在同时修改该状态,需要确保当一个线程修改了对象状态后,其他线程能够看到发生的状态变化。

可见性错误是指当读操作与写操作在不同的线程中执行时,我们无法确保执行读操作的线程能适时地看到其他线程写入的值,有时甚至是根本不可能的事情。

我们可以通过同步来保证对象被安全地发布。除此之外我们也可以使用一种更加轻量级的 volatile 变量。

Java 提供了一种稍弱的同步机制,即 volatile 变量,用来确保将变量的更新操作通知到其他线程。可以将 volatile 看做一个轻量级的锁,但是又与锁有些不同:

  • 对于多线程,不是一种互斥关系
  • 不能保证变量状态的“原子性操作

问题代码示例

/**
* @ClassName TestVolatile
* @Description: Thread 已经修改了flag,但是main线程还是拿到的false
* @Author: WangWenpeng
* @Version 1.0
*/
public class TestVolatile {
public static void main(String[] args) {
ThreadDemo td = new ThreadDemo();
new Thread(td).start();
while (true) {
if (td.isFlag()) {
System.out.println("______________");
break;
}
}
}
}

class ThreadDemo implements Runnable {
private boolean flag = false;

public boolean isFlag() {
return flag;
}

public void setFlag(boolean flag) {
this.flag = flag;
}

@Override
public void run() {
try {
//增加这种出现问题的几率
Thread.sleep(200);
} catch (Exception e) {
}
flag = true;
System.out.println("flag=" + isFlag());
}
}

图片

两个线程同时修改这一个flag,为什么main拿到的还是这种修改之前的值

内存分析

图片

解决方法,加锁

public class TestVolatile {
public static void main(String[] args) {
ThreadDemo td = new ThreadDemo();
new Thread(td).start();
while (true) {
synchronized (td) {
if (td.isFlag()) {
System.out.println("______________");
break;
}
}
}
}
}

图片

加了锁,就可以让while循环每次都从主存中去读取数据,这样就能读取到true了。但是一加锁,每次只能有一个线程访问,当一个线程持有锁时,其他的就会阻塞,效率就非常低了。不想加锁,又要解决内存可见性问题,那么就可以使用volatile关键字。

volatile

private volatile boolean flag = false;

volatile 关键字:当多个线程进行操作共享数据时,可以保证内存中的数据可见。相较于 synchronized 是一种较为轻量级的同步策略。

注意:

  • volatile 不具备“互斥性”
  • volatile 不能保证变量的“原子性”

原子性

所谓原子性就是操作不可再细分

问题代码

package com.atguigu.juc;

/**
* @ClassName TestAtomicDemo
* @Description:
* @Author: WangWenpeng
* @Version 1.0
*/
public class TestAtomicDemo {
public static void main(String[] args) {
AtomicDemo ad = new AtomicDemo();
for (int i = 0; i < 10; i++) {
new Thread(ad).start();
}
}
}

class AtomicDemo implements Runnable {

private int serialNumber = 0;

@Override
public void run() {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
}
System.out.println(Thread.currentThread().getName() + ":" + getSerialNumber());
}

public int getSerialNumber() {
return serialNumber++;
}
}

图片

图片

看到这里,好像和上面的内存可见性问题一样。是不是加个volatile关键字就可以了呢?其实不是的,因为加了volatile,只是相当于所有线程都是在主存中操作数据而已,但是不具备互斥性。比如两个线程同时读取主存中的0,然后又同时自增,同时写入主存,结果还是会出现重复数据。

import java.util.concurrent.atomic.AtomicInteger;

/**
* @ClassName TestAtomicDemo
*
* 原子变量:在 java.util.concurrent.atomic 包下提供了一些原子变量。
* 1. volatile 保证内存可见性
* 2. CAS(Compare-And-Swap) 算法保证数据变量的原子性
* CAS 算法是硬件对于并发操作的支持
* CAS 包含了三个操作数:
* ①内存值 V
* ②预估值 A
* ③更新值 B
* 当且仅当 V == A 时, V = B; 否则,不会执行任何操作。
*
* @Description:
* @Author: WangWenpeng
* @Version 1.0
*/
public class TestAtomicDemo {
public static void main(String[] args) {
AtomicDemo ad = new AtomicDemo();
for (int i = 0; i < 10; i++) {
new Thread(ad).start();
}
}
}

class AtomicDemo implements Runnable {

private AtomicInteger serialNumber = new AtomicInteger(0);

@Override
public void run() {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
}
System.out.println(Thread.currentThread().getName() + ":" + getSerialNumber());
}

public int getSerialNumber() {
return serialNumber.getAndIncrement();
}
}

AtomicInteger这个玩意是具有原子性的integer,用它替换后发现能保证线程安全

Connected to the target VM, address: '127.0.0.1:61323', transport: 'socket'
Thread-4:1
Thread-6:4
Thread-0:3
Thread-7:9
Thread-2:2
Thread-5:6
Thread-3:5
Thread-1:0
Thread-9:7
Thread-8:8
Disconnected from the target VM, address: '127.0.0.1:61323', transport: 'socket'

CAS 算法

解决了原子性问题,解决了内存可见性的问题

CAS (Compare-And-Swap) 是一种硬件对并发的支持,针对多处理器操作而设计的处理器中的一种特殊指令,用于管理对共享数据的并发访问。CAS 是一种无锁的非阻塞算法的实现。

CAS 包含了 3 个操作数:

  • 需要读写的内存值 V 进行比较的值 A 拟写入的新值 B
  • 当且仅当 V 的值等于 A 时, CAS 通过原子方式用新值 B 来更新 V 的值,否则不会执行任何操作。
  • CAS比较失败的时候不会放弃CPU,会反复执行,直到自己修改主内存的数据

模拟CAS算法

/**
* @ClassName TestCompareAndSwap
* @Description: cas模拟 模拟带锁,底层不是带synchronized
* cas 每次修改之前,都会执行获取比较操作
* @Author: WangWenpeng
* @Version 1.0
*/
public class TestCompareAndSwap {

public static void main(String[] args) {
final CompareAndSwap cas = new CompareAndSwap();
for (int i = 0; i < 10; i++) {
new Thread(new Runnable() {
@Override
public void run() {
int expectValue = cas.getValue();
System.out.println(cas.compareAndSet(expectValue, (int) Math.random() * 101));
}
}).start();
}
}
}

class CompareAndSwap {
public int value;

//获取内存值
public synchronized int getValue() {
return value;
}

//比较并交换
public synchronized int compareAndSwap(int expectValue, int newV) {
int oldV = value;
//内存值和预估值一致 就替换
if (oldV == expectValue) {
this.value = newV;
}
return oldV;
}

//设置 调用比较并交换 看期望值和原来的值是否一致
public synchronized boolean compareAndSet(int expectValue, int newV) {
return expectValue == compareAndSwap(expectValue, newV);
}
}

原子变量

小工具包,支持在单个变量上解除锁的线程安全编程。事实上,此包中的类可将 volatile 值、字段和数组元素的概念扩展到那些也提供原子条件更新操作的类。

类 AtomicBoolean、 AtomicInteger、 AtomicLong 和 AtomicReference 的实例各自提供对相应类型单个变量的访问和更新。每个类也为该类型提供适当的实用工具方法。

AtomicIntegerArray、 AtomicLongArray 和 AtomicReferenceArray 类进一步扩展了原子操作,对这些类型的数组提供了支持。这些类在为其数组元素提供 volatile 访问语义方面也引人注目,这对于普通数组来说是不受支持的。

核心方法:boolean compareAndSet(expectedValue, updateValue)

java.util.concurrent.atomic 包下提供了一些原子操作的常用类:

AtomicBoolean 、 
AtomicInteger 、
AtomicLong 、
AtomicReference
AtomicIntegerArray 、
AtomicLongArray
AtomicMarkableReference
AtomicReferenceArray
AtomicStampedReference3-ConcurrentHashMap

锁分段机制ConcurrentHashMap

线程安全的hash表 每一段都是一个独立的锁

Java 5.0 在 java.util.concurrent 包中提供了多种并发容器类来改进同步容器的性能。

ConcurrentHashMap 同步容器类是Java 5 增加的一个线程安全的哈希表。对与多线程的操作,介于 HashMap 与 Hashtable 之间。内部采用“锁分段”机制替代 Hashtable 的独占锁。进而提高性能。

此包还提供了设计用于多线程上下文中的 Collection 实现:ConcurrentHashMap、 ConcurrentSkipListMap、 ConcurrentSkipListSet、CopyOnWriteArrayList 和 CopyOnWriteArraySet。当期望许多线程访问一个给定 collection 时, ConcurrentHashMap 通常优于同步的 HashMap,ConcurrentSkipListMap 通常优于同步的 TreeMap。当期望的读数和遍历远远大于列表的更新数时, CopyOnWriteArrayList 优于同步的 ArrayList。

ConcurrentHashMap就是一个线程安全的hash表。我们知道HashMap是线程不安全的,Hash Table加了锁,是线程安全的,因此它效率低。HashTable加锁就是将整个hash表锁起来,当有多个线程访问时,同一时间只能有一个线程访问,并行变成串行,因此效率低。所以JDK1.5后提供了ConcurrentHashMap,它采用了锁分段机制。

图片

1.8以后底层又换成了CAS,把锁分段机制放弃了。CAS基本就达到了无锁的境界

CopyOnWrite写入并复制

package com.atguigu.juc;

import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;

/*
* CopyOnWriteArrayList/CopyOnWriteArraySet : “写入并复制”
* 注意:添加操作多时,效率低,因为每次添加时都会进行复制,开销非常的大。并发迭代操作多时可以选择。
*/
public class TestCopyOnWriteArrayList {

public static void main(String[] args) {
HelloThread ht = new HelloThread();
for (int i = 0; i < 10; i++) {
new Thread(ht).start();
}
}
}

class HelloThread implements Runnable {
//private static List<String> list = Collections.synchronizedList(new ArrayList<String>());

//每次修改都会复制 添加操作多时 不适合选这个
private static CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();

static {
list.add("AA");
list.add("BB");
list.add("CC");
}

@Override
public void run() {
Iterator<String> it = list.iterator();
while (it.hasNext()) {
System.out.println(it.next());
list.add("AA");//边迭代边添加 会出现并发修改异常
}
}
}

CountDownLatch 闭锁

闭锁,在完成某些运算时,只有其他所有线程的运算全部完成,当前运算才继续执行,Java 5.0 在 java.util.concurrent 包中提供了多种并发容器类来改进同步容器的性能。

CountDownLatch 一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。

闭锁可以延迟线程的进度直到其到达终止状态,闭锁可以用来确保某些活动直到其他活动都完成才继续执行:

  • 确保某个计算在其需要的所有资源都被初始化之后才继续执行;
  • 确保某个服务在其依赖的所有其他服务都已经启动之后才启动;
  • 等待直到某个操作所有参与者都准备就绪再继续执行。
import java.util.concurrent.CountDownLatch;

/**
* @ClassName TestCountDownLatch
* @Description: 闭锁操作 其他线程都执行完成后当前线程才能继续执行
* @Author: WangWenpeng
* @Version 1.0
*/
public class TestCountDownLatch {
public static void main(String[] args) throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(5);
LatchDemo ld = new LatchDemo(latch);
//计算执行时间
long start = System.currentTimeMillis();
for (int i = 0; i < 5; i++) {
new Thread(ld).start();
}
//闭锁 等待其他线程的执行
latch.await();
long end = System.currentTimeMillis();
System.out.println("执行时间===============================" + (end - start));

}
}

class LatchDemo implements Runnable {
private CountDownLatch latch;

public LatchDemo(CountDownLatch latch) {
this.latch = latch;
}

@Override
public void run() {
synchronized (this) {
try {
for (int i = 0; i < 1000; i++) {
if (i % 2 == 0) {
System.out.println(Thread.currentThread().getName() + "-------------" + i);
}
}
} finally {
//线程执行完毕后 countdown 减一
latch.countDown();
}
}
}
}

实现 Callable 接口

Java 5.0 在 java.util.concurrent 提供了一个新的创建执行线程的方式:Callable 接口

Callable 接口类似于 Runnable,两者都是为那些其实例可能被另一个线程执行的类设计的。但是 Runnable 不会返回结果,并且无法抛出经过检查的异常。

Callable 需要依赖FutureTask , FutureTask 也可以用作闭锁。

package com.atguigu.juc;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

/**
* @ClassName TestCallable
* @Description:
* @Author: WangWenpeng
* @Version 1.0
*/
public class TestCallable {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CallableThreadDemo td = new CallableThreadDemo();
//futureTask 实现类的支持 用于接收运算结果
FutureTask<Integer> result = new FutureTask<>(td);

new Thread(result).start();//线程开始运行
Integer sum = result.get();//等待线程执行完成后 才能获取到结果 也可以用于闭锁操作作为等待项
System.out.println("总和" + sum);
}
}

/**
* @Description 多了一个方法的返回值 并且可以抛出异常
* @Author WangWenpeng
* @Param
*/
class CallableThreadDemo implements Callable<Integer> {

@Override
public Integer call() throws Exception {
int sum = 0;
for (int i = 0; i < 100; i++) {
sum += i;
}
return sum;
}
}

//class ThreadDemo implements Runnable{
// @Override
// public void run() {
//
// }
//}

同步锁显示锁 Lock

在 Java 5.0 之前,协调共享对象的访问时可以使用的机制只有 synchronized 和 volatile 。Java 5.0 后增加了一些新的机制,但并不是一种替代内置锁的方法,而是当内置锁不适用时,作为一种可选择的高级功能。

ReentrantLock 实现了 Lock 接口,并提供了与synchronized 相同的互斥性和内存可见性。但相较于synchronized 提供了更高的处理锁的灵活性。

package com.atguigu.juc;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* @ClassName TestLock
* @Description: 同步锁 更灵活的方式
* lock上锁 unlock释放锁
* @Author: WangWenpeng
* @Version 1.0
*/
public class TestLock {

public static void main(String[] args) {
Ticket ticket = new Ticket();

new Thread(ticket, "1号窗口").start();
new Thread(ticket, "2号窗口").start();
new Thread(ticket, "3号窗口").start();
}
}

class Ticket implements Runnable {
private int ticket = 100;
private Lock lock = new ReentrantLock();

@Override
public void run() {
//这样买票没有问题
//while (ticket > 0) {
// System.out.println(Thread.currentThread().getName() + "完成售票,余票为" + --ticket);
//}

//放大问题出现的记录 出现了负票号
//while (true) {
// if (ticket > 0) {
// try {
// Thread.sleep(200);
// System.out.println(Thread.currentThread().getName() + "完成售票,余票为" + --ticket);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// }
//}

//显式加锁和释放锁
while (true) {
lock.lock();
try {
if (ticket > 0) {
try {
Thread.sleep(200);
System.out.println(Thread.currentThread().getName() + "完成售票,余票为" + --ticket);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} finally {
lock.unlock();
}
}
}
}

lock的等待唤醒机制

/**
* @ClassName TestProductorAndConsumer
* @Description: 生产者消费者模型
* @Author: WangWenpeng
* @Version 1.0
*/
public class TestProductorAndConsumer {
public static void main(String[] args) {
Clerk clerk = new Clerk();
Productor productor = new Productor(clerk);
Consumer consumer = new Consumer(clerk);

//没有等待唤醒机制的时候
//生产者一直生产 不考录消费者 可能造成数据丢失
//消费者一直消费 不考虑生产者 可能造成重复消费
new Thread(productor, "生产者a").start();
new Thread(consumer, "消费者a").start();
}
}

/**
* 店员
*/
class Clerk {
//库存共享数据 存在安全问题
private int product = 0;

//进货
public synchronized void get() {
if (product >= 10) {
System.out.println("产品已满,无法添加");
try {
this.wait();
} catch (InterruptedException e) {
}
} else {
this.notifyAll();
System.out.println(Thread.currentThread().getName() + "店员进货1个产品 库存为" + ++product);
}
}

//卖货
public synchronized void sale() {
if (product <= 0) {
System.out.println("产品缺货,无法售卖");
try {
this.wait();
} catch (InterruptedException e) {
}
} else {
System.out.println(Thread.currentThread().getName() + "店员销售1个产品 库存为" + --product);
this.notifyAll();
}
}
}

/**
* 生产者
*/
class Productor implements Runnable {
private Clerk clerk;
public Productor(Clerk clerk) {
this.clerk = clerk;
}
@Override
public void run() {
for (int i = 0; i < 20; i++) {
clerk.get();
}
}
}

/**
* @Description 消费者
* @Author WangWenpeng
* @Date 6:45 2020/4/27
* @Param
*/
class Consumer implements Runnable {
private Clerk clerk;
public Consumer(Clerk clerk) {
this.clerk = clerk;
}
@Override
public void run() {
for (int i = 0; i < 20; i++) {
clerk.sale();
}
}
}

lock出问题的情况

生产者等待,增加出问题的几率 库存空位改成1

 if (product >= 1) {
System.out.println("产品已满,无法添加");
--------------------------------------------------------------------------------------

@Override
public void run() {
for (int i = 0; i < 20; i++) {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
}
clerk.get();
}
}

图片

消费者等于0的时候, 两个消费者同时生产,之后停住了,没有其他线程去唤醒,导致停在生产者这里。

解决方法,去掉else,让他能走唤醒方法。

//进货
public synchronized void get() {
if (product >= 1) {
System.out.println("产品已满,无法添加");
try {
this.wait();
} catch (InterruptedException e) {
}
}
this.notifyAll();
System.out.println(Thread.currentThread().getName() + "店员进货1个产品 库存为" + ++product);
}

//卖货
public synchronized void sale() {
if (product <= 0) {
System.out.println("产品缺货,无法售卖");
try {
this.wait();
} catch (InterruptedException e) {
}
}
System.out.println(Thread.currentThread().getName() + "店员销售1个产品 库存为" + --product);
this.notifyAll();
}

让线程能走到notifyall,可以避免停止在生产者这里。

虚假唤醒

增加到两个消费者两个生产者之后,如果现在没有库存,两个消费者都停止在wait,然后出现生产者将库存加一,唤醒所有消费者,这时候就出现了两个消费者同时去消费一个库存,导致库存变成负数,这就是虚假唤醒。

图片

在object类的wait方法中,虚假唤醒是可能的,因此这个wait方法应该总被使用在循环中

解决方法

将代码中的if换为while循环执行

//进货
public synchronized void get() {
while (product >= 1) { //wait使用在循环中
System.out.println("产品已满,无法添加");
try {
this.wait();
} catch (InterruptedException e) {
}
}
this.notifyAll();
System.out.println(Thread.currentThread().getName() + "店员进货1个产品 库存为" + ++product);
}

//卖货
public synchronized void sale() {
while (product <= 0) {
System.out.println("产品缺货,无法售卖");
try {
this.wait();
} catch (InterruptedException e) {
}
}
System.out.println(Thread.currentThread().getName() + "店员销售1个产品 库存为" + --product);
this.notifyAll();
}

控制线程通信Condition

Condition 接口描述了可能会与锁有关联的条件变量。这些变量在用法上与使用 Object.wait 访问的隐式监视器类似,但提供了更强大的功能。需要特别指出的是,单个 Lock 可能与多个 Condition 对象关联。为了避免兼容性问题, Condition 方法的名称与对应的 Object 版本中的不同。

在 Condition 对象中,与 wait、 notify 和 notifyAll 方法对应的分别是await、 signal 和 signalAll。Condition 实例实质上被绑定到一个锁上。要为特定 Lock 实例获得Condition 实例,请使用其 newCondition()方法。

/**
* 店员
*/
class ClerkLock {

//库存共享数据 存在安全问题
private int product = 0;

//使用lock,去掉synchronized this.wait和lock就是两把锁,用lock统一
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();

//进货
public void get() {
lock.lock();
try {
while (product >= 1) {
System.out.println("产品已满,无法添加");
try {
condition.await();
} catch (InterruptedException e) {
}
}
condition.signalAll();
System.out.println(Thread.currentThread().getName() + "店员进货1个产品 库存为" + ++product);
} finally {
lock.unlock();
}
}

//卖货
public synchronized void sale() {
lock.lock();
try {
while (product <= 0) {
System.out.println("产品缺货,无法售卖");
try {
condition.await();
} catch (InterruptedException e) {
}
}
System.out.println(Thread.currentThread().getName() + "店员销售1个产品 库存为" + --product);
condition.signalAll();
} finally {
}
}
}

这里店员的代码全部处理为condition,用他的方法实现线程的通信。

线程按序交替线程按序交替

编写一个程序,开启 3 个线程,这三个线程的 ID 分别为A、 B、 C,每个线程将自己的 ID 在屏幕上打印 10 遍,要求输出的结果必须按顺序显示。

如:ABCABCABC…… 依次递归9-ReadWriteLock 读写锁读-写锁 ReadWriteLock

package com.atguigu.juc;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* @ClassName TestABCAlternate
* @Description: 线程交替打印
* @Author: WangWenpeng
* @Version 1.0
*/
public class TestABCAlternate {

public static void main(String[] args) {
Alternate alternate = new Alternate();

new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i <= 20; i++) {
alternate.loopA(i);
}
}
}, "A").start();

new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i <= 20; i++) {
alternate.loopB(i);
}
}
}, "B").start();

new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i <= 20; i++) {
alternate.loopC(i);
}
}
}, "C").start();
}
}

class Alternate {
private int number = 1;//当前正在执行的线程号
private Lock lock = new ReentrantLock();

private Condition condition1 = lock.newCondition();
private Condition condition2 = lock.newCondition();
private Condition condition3 = lock.newCondition();

public void loopA(int totalLoop) {
lock.lock();
try {
//1.判断1号线程
if (number != 1) {
condition1.await();
}
//2.开始打印
for (int i = 0; i < 5; i++) {
System.out.println(Thread.currentThread().getName() + "\t" + i + "\t" + totalLoop);
}
//3.唤醒线程2
number = 2;
condition2.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

public void loopB(int totalLoop) {
lock.lock();
try {
//1.判断1号线程
if (number != 2) {
condition2.await();
}
//2.开始打印
for (int i = 0; i < 5; i++) {
System.out.println(Thread.currentThread().getName() + "\t" + i + "\t" + totalLoop);
}
//3.唤醒线程2
number = 3;
condition3.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

public void loopC(int totalLoop) {
lock.lock();
try {
//1.判断1号线程
if (number != 3) {
condition3.await();
}
//2.开始打印
for (int i = 0; i < 5; i++) {
System.out.println(Thread.currentThread().getName() + "\t" + i + "\t" + totalLoop);
}
//3.唤醒线程2
number = 1;
condition1.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}

ReadWriteLock 读写锁

ReadWriteLock 维护了一对相关的锁,一个用于只读操作,另一个用于写入操作。只要没有 writer,读取锁可以由多个 reader 线程同时保持。写入锁是独占的。

ReadWriteLock 读取操作通常不会改变共享资源,但执行写入操作时,必须独占方式来获取锁。对于读取操作占多数的数据结构。ReadWriteLock 能提供比独占锁更高的并发性。而对于只读的数据结构,其中包含的不变性可以完全不需要考虑加锁操作。

图片

读锁是多个线程可以一起,写锁是独占的

import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
* @ClassName ReadWriteLock
* @Description: 读写锁 读和写之间不互斥 写和写之间互斥
* @Author: WangWenpeng
* @Version 1.0
*/
public class TestReadWriteLock {
public static void main(String[] args) {
ReadWriteLockDemo demo = new ReadWriteLockDemo();

new Thread(new Runnable() {
@Override
public void run() {
demo.set((int) (Math.random() * 101));
}
}, "writeLock").start();

for (int i = 0; i < 100; i++) {
new Thread(new Runnable() {
@Override
public void run() {
demo.get();
}
}, "readLock-" + i).start();
}
}
}

class ReadWriteLockDemo {
private int number = 0;
private ReadWriteLock lock = new ReentrantReadWriteLock();

//读
public void get() {
lock.readLock().lock();
try {
System.out.println(Thread.currentThread().getName() + "读:" + number);
} finally {
lock.readLock().unlock();
}
}

//写
public void set(int number) {
lock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName() + "写:" + number);
this.number = number;
} finally {
lock.writeLock().unlock();
}
}
}

线程八锁

  • 一个对象里面如果有多个synchronized方法,某一个时刻内,只要一个线程去调用其中的一个synchronized方法了,其它的线程都只能等待,换句话说,某一个时刻内,只能有唯一一个线程去访问这些synchronized方法
  • 锁的是当前对象this,被锁定后,其它的线程都不能进入到当前对象的其它的synchronized方法
  • 加个普通方法后发现和同步锁无关
  • 换成两个对象后,不是同一把锁了,情况立刻变化。
  • 都换成静态同步方法后,情况又变化
  • 所有的非静态同步方法用的都是同一把锁——实例对象本身,也就是说如果一个实例对象的非静态同步方法获取锁后,该实例对象的其他非静态同步方法必须等待获取锁的方法释放锁后才能获取锁,可是别的实例对象的非静态同步方法因为跟该实例对象的非静态同步方法用的是不同的锁,所以毋须等待该实例对象已获取锁的非静态同步方法释放锁就可以获取他们自己的锁。
  • 所有的静态同步方法用的也是同一把锁——类对象本身,这两把锁是两个不同的对象,所以静态同步方法与非静态同步方法之间是不会有竞态条件的。但是一旦一个静态同步方法获取锁后,其他的静态同步方法都必须等待该方法释放锁后才能获取锁,而不管是同一个实例对象的静态同步方法之间,还是不同的实例对象的静态同步方法之间,只要它们同一个类的实例对象!
/**
* 1. 两个普通同步方法,两个线程,标准打印, 打印? //one two
*/
public class TestThread8Monitor {
public static void main(String[] args) {
Number number = new Number();
new Thread(new Runnable() {
@Override
public void run() {
number.getOne();
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
number.getTwo();
}
}).start();
}
}
class Number {
public synchronized void getOne() {
System.out.println("one");
}
public synchronized void getTwo() {
System.out.println("two");
}
}
/**
2. 新增 Thread.sleep() 给 getOne() ,打印? //one two
*/
public class TestThread8Monitor {
public static void main(String[] args) {
Number number = new Number();
new Thread(new Runnable() {
@Override
public void run() {
number.getOne();
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
number.getTwo();
}
}).start();
}
}
class Number {
public synchronized void getOne() {
try {
Thread.sleep(3000);//让one 睡3秒
} catch (InterruptedException e) {
}
System.out.println("one");
}
public synchronized void getTwo() {
System.out.println("two");
}
}
/*
*3. 新增普通方法 getThree() , 打印? //three one two
*/
public class TestThread8Monitor {
public static void main(String[] args) {
Number number = new Number();
new Thread(new Runnable() {
@Override
public void run() {
number.getOne();
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
number.getTwo();
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
number.getThree();
}
}).start();
}
}
class Number {
public synchronized void getOne() {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
}
System.out.println("one");
}
public synchronized void getTwo() {
System.out.println("two");
}
//普通方法
public void getThree(){
System.out.println("three");
}
}
/*
* 4. 两个普通同步方法,两个 Number 对象,打印? //two one
*/
public class TestThread8Monitor {
public static void main(String[] args) {
Number number = new Number();
Number number2 = new Number();
new Thread(new Runnable() {
@Override
public void run() {
number.getOne();
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
number2.getTwo();
}
}).start();
}
}

class Number {
public synchronized void getOne() {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
}
System.out.println("one");
}
public synchronized void getTwo() {
System.out.println("two");
}
}
/*
* 5. 修改 getOne() 为静态同步方法,打印? //two one
*/
public class TestThread8Monitor {
public static void main(String[] args) {
Number number = new Number();
new Thread(new Runnable() {
@Override
public void run() {
number.getOne();
}//这样其实不能通过类的实例访问静态,为演示这个问题
}).start();
new Thread(new Runnable() {
@Override
public void run() {
number.getTwo();
}
}).start();
}
}
class Number {
//静态同步方法
public static synchronized void getOne() {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
}
System.out.println("one");
}
public synchronized void getTwo() {
System.out.println("two");
}
}
/*
* 6. 修改两个方法均为静态同步方法,一个 Number 对象? //one two
*/
public class TestThread8Monitor {

public static void main(String[] args) {
Number number = new Number();
new Thread(new Runnable() {
@Override
public void run() {
number.getOne();
}//这样其实不能通过类的实例访问静态,为演示这个问题
}).start();

new Thread(new Runnable() {
@Override
public void run() {
number.getTwo();
}
}).start();
}
}
class Number {
public static synchronized void getOne() {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
}
System.out.println("one");
}
public static synchronized void getTwo() {
System.out.println("two");
}
}
/*
* 7. 一个静态同步方法,一个非静态同步方法,两个 Number 对象? //two one
*/
public class TestThread8Monitor {

public static void main(String[] args) {
Number number = new Number();
Number number2 = new Number();
new Thread(new Runnable() {
@Override
public void run() {
number.getOne();
}//这样其实不能通过类的实例访问静态,为演示这个问题
}).start();
new Thread(new Runnable() {
@Override
public void run() {
number2.getTwo();
}
}).start();
}
}

class Number {
public static synchronized void getOne() {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
}
System.out.println("one");
}
public synchronized void getTwo() {
System.out.println("two");
}
}
/*
* 8. 两个静态同步方法,两个 Number 对象? //one two
*/
public class TestThread8Monitor {

public static void main(String[] args) {
Number number = new Number();
Number number2 = new Number();
new Thread(new Runnable() {
@Override
public void run() {
number.getOne();
}//这样其实不能通过类的实例访问静态,为演示这个问题
}).start();

new Thread(new Runnable() {
@Override
public void run() {
number2.getTwo();
}
}).start();
}
}
class Number {
public static synchronized void getOne() {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
}
System.out.println("one");
}
public static synchronized void getTwo() {
System.out.println("two");
}
}

线程八锁的关键:

  • 非静态方法的锁默认为 this, 静态方法的锁为 对应的 Class 实例
  • 某一个时刻内,只能有一个线程持有锁,无论几个方法。

线程池

第四种获取线程的方法:线程池,一个 ExecutorService,它使用可能的几个池线程之一执行每个提交的任务,通常使用 Executors 工厂方法配置。

线程池可以解决两个不同问题:由于减少了每个任务调用的开销,它们通常可以在执行大量异步任务时提供增强的性能,并且还可以提供绑定和管理资源(包括执行任务集时使用的线程)的方法。每个 ThreadPoolExecutor 还维护着一些基本的统计数据,如完成的任务数。

为了便于跨大量上下文使用,此类提供了很多可调整的参数和扩展钩子 (hook)。但是,强烈建议程序员使用较为方便的 Executors 工厂方法 :

  • Executors.newCachedThreadPool()(无界线程池,可以进行自动线程回收)
  • Executors.newFixedThreadPool(int)(固定大小线程池)
  • Executors.newSingleThreadExecutor()(单个后台线程)它们均为大多数使用场景预定义了设置。
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

/**
* @Description 一、线程池:提供了一个线程队列,队列中保存着所有等待状态的线程。避免了创建与销毁额外开销,提高了响应的速度。
* 二、线程池的体系结构:
* java.util.concurrent.Executor : 负责线程的使用与调度的根接口
* |--**ExecutorService 子接口: 线程池的主要接口
* |--ThreadPoolExecutor 线程池的实现类
* |--ScheduledExecutorService 子接口:负责线程的调度
* |--ScheduledThreadPoolExecutor :继承 ThreadPoolExecutor, 实现 ScheduledExecutorService
* 三、工具类 : Executors
* ExecutorService newFixedThreadPool() : 创建固定大小的线程池
* ExecutorService newCachedThreadPool() : 缓存线程池,线程池的数量不固定,可以根据需求自动的更改数量。
* ExecutorService newSingleThreadExecutor() : 创建单个线程池。线程池中只有一个线程
* ScheduledExecutorService newScheduledThreadPool() : 创建固定大小的线程,可以延迟或定时的执行任务。
* @Author WangWenpeng
* @Param
*/
public class TestThreadPool {

public static void main(String[] args) throws Exception {
//1. 创建线程池
ExecutorService pool = Executors.newFixedThreadPool(5);

//submit Callable方法
List<Future<Integer>> list = new ArrayList<>();
for (int i = 0; i < 10; i++) {
Future<Integer> future = pool.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
int sum = 0;
for (int i = 0; i <= 100; i++) {
sum += i;
}
return sum;
}
});
list.add(future);
}
pool.shutdown();
for (Future<Integer> future : list) {
System.out.println(future.get());
}

//submit Runnable方法
ThreadPoolDemo tpd = new ThreadPoolDemo();
//2. 为线程池中的线程分配任务
for (int i = 0; i < 10; i++) {
pool.submit(tpd);
}
//3. 关闭线程池
pool.shutdown();
}
}

class ThreadPoolDemo implements Runnable {
private int i = 0;

@Override
public void run() {
while (i <= 100) {
System.out.println(Thread.currentThread().getName() + " : " + i++);
}
}
}

线程调度ScheduledExecutorService

一个 ExecutorService,可安排在给定的延迟后运行或定期执行的命令。

public static void main(String[] args) throws Exception {
ScheduledExecutorService pool = Executors.newScheduledThreadPool(5);
for (int i = 0; i < 5; i++) {
Future<Integer> result = pool.schedule(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
int num = new Random().nextInt(100);//生成随机数
System.out.println(Thread.currentThread().getName() + " : " + num);
return num;
}
}, 1, TimeUnit.SECONDS);
System.out.println(result.get());
}
pool.shutdown();
}

ForkJoinPool 分支/合并框架

就是在必要的情况下,将一个大任务,进行拆分(fork)成若干个小任务(拆到不可再拆时),再将一个个的小任务运算的结果进行 join 汇总。

JoinFork/Join 框架与线程池的区别

采用 “工作窃取”模式(work-stealing):

  • 当执行新的任务时它可以将其拆分分成更小的任务执行,并将小任务加到线程队列中,然后再从一个随机线程的队列中偷一个并把它放在自己的队列中。
  • 相对于一般的线程池实现, fork/join框架的优势体现在对其中包含的任务的处理方式上。在一般的线程池中, 如果一个线程正在执行的任务由于某些原因无法继续运行, 那么该线程会处于等待状态。而在fork/join框架实现中,如果某个子问题由于等待另外一个子问题的完成而无法继续运行。那么处理该子问题的线程会主动寻找其他尚未运行的子问题来执行.这种方式减少了线程的等待时间, 提高了性能。
public class TestForkJoinPool {
public static void main(String[] args) {
Instant start = Instant.now();
ForkJoinPool pool = new ForkJoinPool();
ForkJoinTask<Long> task = new ForkJoinSumCalculate(0L, 50000000000L);
Long sum = pool.invoke(task);
System.out.println(sum);
Instant end = Instant.now();
System.out.println("耗费时间为:" + Duration.between(start, end).toMillis());//166-1996-10590
}
}

class ForkJoinSumCalculate extends RecursiveTask<Long> {
private static final long serialVersionUID = -259195479995561737L;

private long start;
private long end;

private static final long THURSHOLD = 10000L; //临界值

public ForkJoinSumCalculate(long start, long end) {
this.start = start;
this.end = end;
}

@Override
protected Long compute() {
long length = end - start;
if (length <= THURSHOLD) {
long sum = 0L;
for (long i = start; i <= end; i++) {
sum += i;
}
return sum;
} else {
long middle = (start + end) / 2;
ForkJoinSumCalculate left = new ForkJoinSumCalculate(start, middle);
left.fork(); //进行拆分,同时压入线程队列
ForkJoinSumCalculate right = new ForkJoinSumCalculate(middle + 1, end);
right.fork();
return left.join() + right.join();
}
}
}
文章目录
  1. 1. 简介
  2. 2. volatile 关键字
    1. 2.1. 内存可见性
    2. 2.2. 原子性
  3. 3. CAS 算法
    1. 3.1. 模拟CAS算法
    2. 3.2. 原子变量
    3. 3.3. 锁分段机制ConcurrentHashMap
    4. 3.4. CopyOnWrite写入并复制
    5. 3.5. CountDownLatch 闭锁
    6. 3.6. 实现 Callable 接口
    7. 3.7. 同步锁显示锁 Lock
    8. 3.8. lock的等待唤醒机制
    9. 3.9. lock出问题的情况
    10. 3.10. 虚假唤醒
    11. 3.11. 控制线程通信Condition
    12. 3.12. 线程按序交替线程按序交替
    13. 3.13. ReadWriteLock 读写锁
    14. 3.14. 线程池
    15. 3.15. 线程调度ScheduledExecutorService
    16. 3.16. ForkJoinPool 分支/合并框架