JUC并发编程与源码分析_01_概述


本文介绍JUC并发编程以及源码分析。

1. 概述

JUC是java.util.concurrent的缩写,主要涉及到以下三个包:

  1. java.util.concurrent:并发编程的基础包
  2. java.util.concurrent.atomic:并发编程的原子包
  3. java.util.concurrent.locks:并发编程的锁包

那么为什么会出现并发编程呢?

摩尔定律失效,CPU的主频已经到了极限,不再翻倍。而是在核数上做文章,即多核。

在主频不再提高且核数在不断增加的情况下,要想让程序更快就要用到并行或并发编程。

利用并发编程,可以充分利用多核处理器,提高程序性能,形成高并发系统。并且可以提高程序吞吐量, 处理异步+回调等生产请求。

演变:单机单线程——>单机多线程——>多机多线程。

但是除了上面的优点,其实也有一定的弊端和问题,如:

  1. 线程安全问题:比如i++这种非原子操作、集合类等数据结构是否安全等等(比如StringBuffer、Vector、HashTable等都是线程安全的)。
  2. 线程锁问题:Synchronized重量级锁
  3. 线程性能问题:线程并发处理不当,导致性能下降,死锁等等。

因此,在引入并发之后,还需要处理随之而来的一系列问题。

2. 进程/线程回顾

2.1 进程/线程是什么

简单地说,进程就是后台的一个运行程序,是操作系统层面的,进程是分配资源的基本单位。线程则是进程中的一部分,负责处理某个请求,是调度的基本单位,共享某个进程中的资源。比如Java程序运行,这就是启动了一个进程,其中有垃圾回收线程,main线程等等。又或者word中,会有备份容灾线程(突然断电,或者突然关闭,自动备份一份),拼写单词检查线程(自动红色波浪线)等等。

2.2 线程的状态

创建、就绪、运行、阻塞、消亡。

2.3 wait/sleep的区别?

sleep是放弃时间片进入阻塞状态,sleep时间过后,自动进入就绪状态。而wait则是主动释放掉资源,进入无限期等待状态,必须唤醒notify()才会重新进入就绪状态。

  1. wait是Object类中的方法,而sleep是Thread中的静态方法。
  2. 如果wait没有设置时间,则会进入无限期等待状态【阻塞】,必须通过notify()方法来主动唤醒,之后进入就绪状态;而sleep则必须设置时间【阻塞】,时间过后,则会进入就绪状态。【都是释放出CPU时间片】。sleep可以通过interupt()来提前终止sleep。
  3. sleep不会释放资源锁,而wait会释放。
  4. sleep属于线程级别,可以在任何地方使用。而wait只能在synchronized语句块中使用【即必须占有资源所之后,才会wait释放】

2.4 并发/并行是什么?

并行,无论宏观还是围观上,都是一起执行。并发,宏观上是一起执行,但是微观上,则是断断续续地交替串行执行的。

2.5 补充

在集合中,ArrayList是线程不安全的,多线程并发修改和读取,在高并发情况下,会出现java.util.ConcurrentModificationException异常。而Vector则是线程安全的。

因此,如果使用ArrayList出现了线程不安全:

  1. 可以使用Vector替代,

  2. 可以使用Collections工具类中的一些方法,如Collections.synchronizedList(new ArrayList()),即将不安全的数据结构转换为线程安全的结构。

  3. 也可使用JUC下的java.util.concurrent.CopyOnWriteArrayList类

    该类就是写时复制、读写分离的变种,类似MVCC。即读的时候,允许并发读;而写的时候,只能加锁。写的时候,复制一版,一版供读,另一版供写。add源码如下所示:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    public boolean add(E e) {
    synchronized (lock) {
    Object[] es = getArray();
    int len = es.length;
    es = Arrays.copyOf(es, len + 1);
    es[len] = e;
    setArray(es);
    return true;
    }
    }

    即,在写操作的时候,复制一版,然后并将写后的数组重新复制给原始数组。

同理,HashSet也是线程不安全的,同样,JUC也提供了java.util.concurrent.CopyOnWriteArraySet类。HashMap也是线程不安全的,JUC提供了java.util.concurrent.ConcurrentHashMap类。

3. java.util.concurrent.locks

locks包里面有三个接口:

  1. Lock:排他锁
  2. Condition:与Lock匹配的钥匙
  3. ReadWriteLock:读写分离锁

3.1 Lock

在前面进线程中提到过,解决并发安全问题,除了synchronized之外,还有Lock锁。java.util.concurrent.locks.Lock是一个接口,其主要的实现类有:

  • java.util.concurrent.locks.ReentrantLock:可重入锁
1
2
3
4
5
6
7
Lock lock = new ReentrantLock();
lock.lock();
try{
// ...
}finally {
lock.unlock();
}

3.2 Condition

而与Lock锁匹配的“钥匙”,则是java.util.concurrent.locks.Condition接口,可由lock.newCondition()方法获得实现类对象。

3.3 ReadWriteLock

该接口就是读写锁。实现类是:

  • java.util.concurrent.locks.ReentrantReadWriteLock

该类里面有两个内部类(ReadLock、WriteLock),并且通过方法,创建了实例,返回这两个对象。

4. JUC辅助类

4.1 CountDownLatch

这个工具类可以限制某个线程,等待多少个线程结束之后才执行。即阻塞某线程,代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class ThreadTest24 {

public static void main(String[] args) {

CountDownLatch countDownLatch = new CountDownLatch(6);

for (int i = 0; i < 6; i++) {

new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "执行完毕");
countDownLatch.countDown(); // 计数
}, String.valueOf(i)).start();
}

// 主线程等待计数结束,即为0时,才会继续执行
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}

System.out.println(Thread.currentThread().getName() + "最后关闭");
}
}

4.2 CyclicBarrier

这个类,是等待几个线程都开始运行了,才开始运行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 等待收集完成之后,执行里面的Runnable实现类线程。
CyclicBarrier cyclicBarrier = new CyclicBarrier(7, ()->{
System.out.println("开始执行");
});

for (int i = 0; i < 7; i++) {
final int tempInt = 1;
new Thread(()->{
System.out.println(Thread.currentThread().getName() + "收集到第:" + tempInt + "颗龙珠");
try {
cyclicBarrier.await(); // 收集龙珠,等待其他几个线程。
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}, String.valueOf(i)).start();
}

4.3 Semaphore

信号量就是一种资源,线程抢不到会一直等待。如果信号量的数量为1,那么就等同于锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 信号量
Semaphore semaphore = new Semaphore(3); // 设置信号量为3,即资源数量

for (int i = 0; i < 6; i++) {

new Thread(()->{

try {
// 获取信号量,资源自动减一
// 抢不到会一直等下去,,直到有线程释放信号量,或超时。
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "\t抢到了车位");

// 暂停一会线程
TimeUnit.SECONDS.sleep(3);

System.out.println(Thread.currentThread().getName() + "\t离开了线程");

} catch (InterruptedException e) {
e.printStackTrace();
} finally {

// 释放资源,资源自动加一
semaphore.release();
}

}, String.valueOf(i)).start();
}

5. java.util.concurrent

这个包里面的接口有很多个,常用的有:

  1. BlockingQueue
  2. Callable
  3. Executor
  4. Future

5.1 BlockingQueue阻塞队列

信号量只是一个标志,并不是程序真正处理的数据,但是获取不到信号量就会一直阻塞下去。而阻塞队列也类似,获取不到队列中的数据也会一直阻塞下去;但是阻塞队列存储的数据就是程序真正处理的数据。

除了BlockingQueue,还有BlockingDeque双向队列。这两个都是接口,实现类有ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue(只存储一个元素)等。

队列和list类似,一样有add、offer等方法,只不过有的方法会产生阻塞,有的则不会。

6. 线程池

在进程/线程的时候,提到过线程池,无需我们手动创建线程启动线程。线程池在创建的时候,可以指定创建几个线程。然后我们只需要将任务创建好,将其放到线程池中即可。线程池会自动分配线程取执行这个任务。

线程池一方面直接一次性创建多个线程,直接使用。另一方面,线程执行完任务之后,不会销毁,继续保留,等待复用线程。

线程池的优势:

线程池做的工作主要是控制运行的线程数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务。如果任务数量超过了最大线程数量,超出数量的线程排队等候,等其他线程执行完毕,再从队列中取出任务来执行。

它的主要特点为:线程复用;控制最大并发数;管理线程。

  1. 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  2. 提高响应速度。当任务到达时,任务可以不需要等待线程创建就能立即执行。
  3. 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统西园,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

线程池主要涉及到的接口和类有:

  1. java.util.concurrent.Executor接口,
  2. java.util.concurrent.ExecutorService接口【主要用这个接口】
  3. java.util.concurrent.ThreadPoolExecutor【实现类】
  4. java.util.concurrent.Executors工具类。

线程池有5个线程,依次有10个任务交给线程池工作。简单案例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class ThreadTest25 {

public static void main(String[] args) {
System.out.println();

ExecutorService threadPool = Executors.newFixedThreadPool(5);

try{

for (int i = 0; i < 10; i++) {

final int temp = i;

threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + "\t办理任务" + temp);
});
}

} catch(Exception e){
e.printStackTrace();
} finally {
threadPool.shutdown();
}
}
}

image-20220817144059421

6.1 ThreadPoolExecutor

线程池最重要的类就是java.util.concurrent.ThreadPoolExecutor,这个类是线程池类。虽然通过工具类可创建各种线程池,但本质上都是通过传入不同的参数,创建线程池对象。

方法名 描述
Executors.newCachedThreadPool() 创建一个可根据需要创建新线程的线程池。可扩容
Executors.newFixedThreadPool(n) 创建一个可重用固定线程数的线程池。
Executors.newSingleThreadExecutor() 创建一个只有一个线程的线程池。
Executors.newScheduledThreadPool(n) 创建一个线程池,它可安排在给定延迟后运行命令或者定期地执行。

本质上还是构造方法,ThreadPoolExecutor的构造方法如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

核心有七个参数,六个属性。

  1. corePoolSize

    线程池中的常驻核心线程数(常驻指的就是值班的人)

  2. maximumPoolSize

    线程池中能够容纳同时执行的最大线程数,此值必须大于等于1。(类似,银行的窗口数,有时候不一定窗口都开,即非核心线程,但是最多只能开窗口数。)

  3. keepAliveTime

    多余的空闲线程的存活时间,当前池中线程数量超过corePoolSize时,当空闲时间达到keepAliveTime时,多余线程会被销毁,直到只剩下corePoolSize个线程为止。

  4. unit

    keepAliveTime的单位

  5. workQueue

    任务队列,被提交但尚未被执行的任务。就是阻塞队列,任务太多,线程处理不过来,就存放到阻塞队列中。(类似银行中的候客区)

  6. threadFactory

    表示生成线程池中工作线程的线程工厂,用于创建线程一般默认的即可

  7. handler

    拒绝策略,表示当任务队列满了,并且工作线程大于整个线程池的最大线程数(maximumPoolSize)时如何来拒绝请求执行的Runnable的策略。(保持默认即可)

6.2 线程池工作原理

其实线程池完全可以类比银行办理业务,肯定需要窗口(并行最大容量),但是一般情况下,客流量没那么大,所以窗口只开少数几个(核心线程)。如果任务量大,显然需要存放任务,即候客区(阻塞队列)。在客流量大的情况下,候客区必定会存放满;此时只能开放剩余窗口。但是客流量不一定时刻都大,那么什么时候关闭多余的几个窗口呢?空闲窗口的空闲时间达到一定时间,就会关闭(即存活时间)。另外,开放窗口也是需要有人来开放的,即线程工厂,负责创建线程。最后,如果客流量太多了,候客区满了之后,开放全部窗口仍然无法处理客流量,此时只能拒绝进入,即拒绝策略。

核心线程 -> 阻塞队列 -> 扩容直至最大线程数量,最终拒绝策略。扩容线程需要线程工厂,非核心线程在空闲之后,经过存活时间就会消亡。

6.3 参数设置

上面的三个线程池:单一的,固定数量的,可扩容的,哪个用的多?一个都不用,工作中使用自定义的。因为:

  1. FixedThreadPool和SingleThreadPool,允许的请求队列长度为Integer.MAX_VALUE,即阻塞队列很长,这就会导致堆积大量的请求,从而导致OOM。
  2. CachedThreadPool和ScheduledThreadPool,允许的创建线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM。

CPU密集型:CPU核数+1就是最大线程数。

IO密集型:CPU核数*2就是最大线程数。

CPU 密集型 和 IO密集型 的区别,如何确定线程池大小?_醋酸菌HaC的博客-CSDN博客_cpu密集型和io密集型区别

6.4 四大拒绝策略

  1. AboryPolicy(默认):直接抛出RejectedExecutionException异常,阻止系统正常运行
  2. CallerRunsPolicy:“调用者运行”一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量。即回退到调用者,让调用者执行这个任务。
  3. DiscardOldestPolicy:抛弃队列中等待最久的任务,然后把当前任务加入队列中,尝试再次提交当前任务。
  4. DiscardPloicy:该策略默默丢弃无法处理的任务,不予处理也不抛出异常。如果允许任务丢失,这是最好的一种策略。

案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
public class ThreadTest25 {

public static void main(String[] args) {
System.out.println();

// ExecutorService threadPool = Executors.newFixedThreadPool(5);
//
// Executors.newCachedThreadPool();
//
// try{
//
// for (int i = 0; i < 10; i++) {
//
// final int temp = i;
//
// threadPool.execute(() -> {
// System.out.println(Thread.currentThread().getName() + "\t办理任务" + temp);
// });
// }
//
// } catch(Exception e){
// e.printStackTrace();
// } finally {
// threadPool.shutdown();
// }

ExecutorService executorService = new ThreadPoolExecutor(
2,
5,
2L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(3),
Executors.defaultThreadFactory(),
// new ThreadPoolExecutor.AbortPolicy() // 终止策略
// new ThreadPoolExecutor.CallerRunsPolicy() // 调用者运行策略
// new ThreadPoolExecutor.DiscardPolicy() // 抛弃策略
new ThreadPoolExecutor.DiscardOldestPolicy() // 抛弃策略(阻塞队列中最老的)
);

try{

for (int i = 0; i < 10; i++) {

final int temp = i;

executorService.execute(() -> {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "\t办理任务" + temp);
});
}

} catch(Exception e){
e.printStackTrace();
} finally {
executorService.shutdown();
}
}
}

7. JMM

JMM是Java Memory Model的简称,即Java内存模型。

7.1 概述

首先回顾操作系统的知识,我们知道,数据存储在硬盘中,然后加载到内存中。之后CPU中就直接读取内存中的数据,放到寄存器进行操作数据。内存的出现就是为了缓解硬盘的缺陷【IO速度慢】,而缓存的出现则是为了进一步优化内存【内存的速度比不过CPU的计算速度】。换句话说,CPU的计算速度远高于硬盘的读写速度,因此,出现了内存,而内存的速度虽然比硬盘高了不少,但是仍然低于CPU的计算速度,此时就出现了缓存。本质上说,就是为了降低CPU的空闲时间。使得其等待时间较短,即计算出结果就立马写入成功,直接进行下一次计算,无需等待写入IO。

查看计算机的CPU缓存,可任务管理器->性能->点击CPU就会出现。

image-20220818101114145

但是我们知道,Java的一个特性就是跨平台,显然一个程序在不同的平台下运行的状态应该是一样的。注意,这的状态指的不仅仅是运行结果,而且还有运行时间、内存访问等等。此时就需要考虑底层硬件的差异。但是有的系统有一级缓存,有的是二级缓存等等。这时候应该怎么办呢?如何屏蔽掉不同操作系统不同硬件的缓存之间的差异呢?

Java是通过JVM来屏蔽操作系统的差异的,因此JVM规范中试图定义一种Java内存模型(Java Memory Model,JMM)来屏蔽掉各种硬件和操作系统的内存访问差异。以实现Java程序在各种平台下都能达到一致的内存访问效果。

7.2 JMM

JMM本身是一种抽象的概念,并不是真实存在的实体,它仅仅描述的是一组约定或规范,通过这组规范定义了程序中(尤其是多线程)各个变量的读写访问方式并决定一个线程对共享变量的写入何时以及如何编程对另一个线程可见,关键技术点都是围绕多线程的原子性、可见性和有序性展开的

JMM的关键技术点都是围绕多线程的原子性、可见性和有序性展开的。

  1. 通过JMM来实现线程和主内存之间的抽象关系
  2. 屏蔽各个硬件平台和操作系统的内存访问差异以实现让Java程序在各种平台下都能达到一致的内存访问效果

7.3 可见性

可见性是指当一个线程修改了某一个共享变量的值,其他线程是否能够立即知道该变更

JMM规定了所有的变量都存储在主内存中。注意,主内存指的就是内存,不是缓存,也不是寄存器,也不是硬盘。下图左侧是具体的硬件情况,而右侧则是抽象出的模型。一个线程会有自己的一块内存空间,即本地内存,而且线程并不会直接操作主内存,而是会读取主内存的数据到本地内存,然后操作本地内存。最后为了可见性,本地修改完之后,立即将副本推送到主内存中修改数据。

换句话说,主内存是各个线程共享的,而本地内存则是线程私有的。

image-20220818104831550

系统主内存中共享变量数据修改被写入的时机是不确定的,多线程并发下很可能出现“脏读”。因此,因为是高并发,所以如果对主内存加锁(一个线程的工作期间),显然这是不合理的,串行后的效率非常低。此时可对数据进行副本,保存到线程的私有内存中。所以每个线程都有自己的工作内存,线程自己的工作内存保存了该线程使用到的变量的主内存副本拷贝,线程对变量的所有操作(读取、赋值等)都必须在线程自己的工作内存中进行,而不能够直接读写主内存中的变量。不同线程之间也无法直接访问对方工作内存中的变量,线程间变量值的传递均需要通过主内存来完成

image-20220818110339656

显然,在本地内存修改后,如果没来得及更新主内存,就会出现丢失修改的情况。比如AB线程修改主内存数据D,均是加一操作。A修改了本地内存,但是还没及时更新主内存,此时B就读取了主内存中的数据(仍然为原始数据,不是A修改之后的),此时最终主内存中的数据只能是被加了一次。

因此,可见性指的就是只要本地内存发生了修改,必须通知其他线程,数据发生了修改,读取主内存的数据是不对的,已经过时了,需要通知其他使用该数据的线程。可见性就是及时通知,修改及时可见。

7.4 原子性

指一个操作是不可打断的,即多线程环境下,操作不被其他线程干扰。

7.5 有序性

有序性是指:对于一个线程的执行代码而言,我们总是习惯性认为代码的执行总是从上到下,有序执行。但为了提升性能,编译器和处理器通常会对指令序列进行重新排序。Java规范规定JVM线程内存维持顺序化语义,即只要程序的最终结果与它顺序化执行的结果相等,那么指令的执行顺序可以与代码顺序不一致,此过程叫指令的重排序

简单地说,有序性指的就是指令重排序。我们平时书写的只是Java代码,其编译后加载到内存中的字节码指令顺序不一定就是语句的顺序。

优缺点:

JVM能根据处理器特性(CPU多级缓存系统、多核处理器等)适当的对机器指令进行重排序,使机器指令能更符合CPU的执行特性,最大限度的发挥机器性能。

但是指令重排可以保证串行语义一致,但是没有义务保证多线程间的语义也一致(即可能产生并发安全问题)。简单说,两行以上不相干的代码在执行的时候有可能先执行的不是第一条,不见得是从上到下顺序执行,执行顺序会被优化。

从源码到最终执行示例图:

image-20220818113039922

单线程环境里面确保程序最终执行结果和代码顺序执行的结果一致。处理器在进行重排序时必须要考虑指令之间的数据依赖性。

多线程环境中线程交替执行,由于编译器优化重排的存在,两个线程中使用的变量能够保证一致性是无法确定的,结果无法预测。所以在多线程环境下,应该根据业务慎重重排。

案例如下所示:

1
2
3
4
int x = 11;
int y = 12;
x = x + 5;
y = x + x;

显然第一行和第二行是可以重排的,但是第一行和第三行是不允许重排的,因为第三行显然是依赖第一行的数据的。

7.6 多线程对变量的读写过程

这里针对可见性进行整理一下。读取过程如下:

由于JVM运行程序的实体是线程,而每个线程创建时JVM都会为其创建一个工作内存(有些地方称为栈空间),工作内存是每个线程的私有数据区域,而Java内存模型中规定所有变量都存储在主内存,主内存就是共享数据区域,所有线程都可以访问,但线程对变量的操作(读取赋值等)必须在工作内存中进行,首先要将变量从主内存拷贝到线程自己的工作内存空间,然后对变量进行操作,操作完成后再将变量写回主内存,不能直接操作主内存中的变量,各个线程中的工作内存存储着主内存中的变量拷贝副本,因此不同的线程间无法访问对方的工作内存,线程间的通信(传值)必须通过主内存来完成。

如下图所示:

image-20220818114412883

多线程之间无法读取其他线程本地内存的数据,只能共享主内存中数据,即横向不打通,竖向共享主内存。那么在修改主内存之后,是如何通知其他线程主内存数据发生了改变呢?这里采用了总线嗅探机制,MESI缓存一致性协议。

JMM定义了线程和主内存之间的抽象关系

  1. 线程之间的共享变量存储在主内存中(从硬件角度来说就是内存条)
  2. 每个线程都有一个私有的本地工作内存,本地工作内存中存储了该线程用来读/写共享变量的副本(从硬件角度来说就是CPU的缓存,比如寄存器、L1、L2、L3缓存等等)

小结:

  1. 我们定义的所有共享变量都存储在物理主内存中
  2. 每个线程都有自己独立的工作内存,里面保存该线程使用到的变量的副本(主内存中该变量的一份拷贝)
  3. 线程对共享变量所有的操作都必须先在线程自己的工作内存中进行,然后写回主内存,不能直接从主内存中读写(不能越级)
  4. 不同线程之间也无法直接访问其他线程的工作内存中的变量,线程间变量值的传递需要通过主内存来进行(同级不能相互访问)

7.7 多线程先行发生原则之happens-before

在JMM中,如果一个操作执行的结果需要对另一个操作可见,或者代码重排序,那么这两个操作之间必须存在happens-before(先行发生)原则。也就是逻辑上存在先后关系。

案例如下所示:

1
2
x = 5;	// 线程A执行
y = x; // 线程B执行

问题:y是否等于5呢?

如果线程A的操作(x=5)happens-before(先行发生)线程B的操作(y=x),那么可以确定线程B执行后y=5一定成立;

如果他们不存在happens-before原则,那么y=5不一定成立。

这就是happens-before原则的威力,包含可见性和有序性的约束。

如果Java内存模型中所有的有序性都仅靠volatile和synchronized来完成,那么有很多操作都讲会变得非常啰嗦。但是我们没有时时、处处、次次添加volatile和synchronized来完成程序,这是因为Java语言中JMM原则下有一个“先行发生”(Happens-Before)的原则限制和规矩,已经立好了规矩。其实synchronized和volatile其实底层也是依据本原则的。

这个原则非常重要:它是判断数据是否存在竞争、线程是否安全的非常有用的手段。依赖这个原则,我们可以通过几条简单规则一揽子解决并发环境下两个操作之间是否可能存在冲突的所有问题,而不需要陷入Java内存模型苦涩难懂的底层编译原理之中。

7.7.1 总原则

  1. 如果一个操作happens-before另一个操作,那么第一个操作的执行结果将对第二个操作可见,而且第一个操作的执行顺序排在第二个操作之前。
  2. 两个操作之间存在happens-before关系,并不意味着一定要按照happens-before原则制定的顺序来执行。如果重排序之后的执行结果与按照happens-before关系来执行的结果一致,那么这种重排序并不非法。注意,只有结果一致,重排才合法。

7.7.2 八条原则

这里从学术角度陈述happens-before的八条原则。

  1. 次序规则

    就是先来先到的原则。一个线程内,按照代码顺序,写在前面的操作先行发生于写在后面的操作。简单说就是,前一个操作的结果可以被后续的操作获取。就是前面一个操作把变量X赋值为1,那后面一个操作肯定能知道x已经变成1。

  2. 锁定规则

    一个unLock操作先行发生于后面(这里的后面是指时间上的先后)对同一个锁的lock操作。换句话说,只有线程释放锁之后,其他线程才能获取到该锁。

  3. volatile变量规则

    对一个volatile变量的写操作先行发生于后面对这个变量的读操作,前面的写对后面的读是可见的。(这里的后面同样是指时间上的先后),就是说,只要写了,后面的读就会可见这个写后的结果。

  4. 传递规则

    如果才做A先行发生于操作B,而操作B又先行发生于操作C,则可以得出操作A先行发生于操作C。

  5. 线程启动规则(Thread Start Rule)

    Thread对象的start()方法先行发生于此线程的每一个操作,其实就是start方法就是线程的入口,一定是最先执行的。

  6. 线程中断规则(Thread Interruption Rule)

    对线程interrupt()方法的调用先行发生于被中断线程的代码检测到中断事件的发生。可以通过Thread.interrupted()检测到是否发生中断。也就是说,要先调用interrupt()方法设置过中断标志位,才能检测到中断发送。

  7. 线程终止规则(Thread Termination Rule)

    线程中的所有操作都先行发生于对此线程的终止检测,我们可以通过isAlive()等手段检测线程是否已经终止执行。

  8. 对象终结规则(Finalizer Rule)

    一个对象的初始化完成(构造函数执行结束)先行发生于它的finalize()方法的开始。

7.7.3 小结

在Java语言里面,Happens-Before的语义本质上是一种可见性。A Happens-Before B意味着A发生过的事情对B来说是可见的,无论A事件和B事件是否发生在同一个线程里。

JMM的设计分为两部分:

  1. 一部分是面向我们程序员提供的,也就是happens-before原则,它通俗易懂的向我们程序员阐述了一个强内存模型,我们只要理解happens-before规则,就可以编写并发安全的程序了。
  2. 另一部分是针对JVM实现的,为了尽可能少的对编译器和处理器做约束从而提高性能,JMM在不影响程序执行结果的前提下对其不做要求,即允许优化重排序。我们只需要关注前者就好了,也就是理解happens-before规则即可,其他繁杂的内容有JMM规范结合操作系统给我们搞定,我们只写好代码即可。

8. volatile

volatile是一个修饰符,在高并发下经常使用。被volatile修饰的变量有两大特点:

  1. 可见性
  2. 有序性(不再允许JVM自己重排,而是由程序员自己设置是否重排)

参考上面JMM规范的三大特性,可知volatile不支持原子性。

volatile的具体含义如下:

  1. 当写一个volatile变量时,JMM会把该线程对应的本地内存中的共享变量值立即刷新回主内存中。
  2. 当读一个volatile变量时,JMM会把该线程对应的本地内存设置为无效,重新回到主内存中读取最新共享变量。
  3. 所以,volatile的写内存语义是直接刷新到主内存中,读的内存语义是直接从内存中读取。(即可见性)

那么volatile是如何保证读写是直接写回和读取主内存呢(可见性),如何禁止重排呢(有序性)?通过内存屏障Memory Barrier。

8.1 内存屏障

回顾现实世界,屏障随处可见,比如高架桥上的两侧围栏、比如湖边的围栏等等。屏障就是为了保证安全,规范行为,使其有序(即禁止随意走动,重排)。

内存屏障(也称内存栅栏,屏障指令等,是一类同步屏障指令,是CPU或编译器在对内存随机访问的操作中的一个同步点,使得此点之前的所有读写操作都执行后,才可以开始执行此点之后的操作),避免代码重排序。

内存屏障其实就是一种JVM指令,Java内存模型的重排规则会要求Java编译器在生成JVM指令时插入特定的内存屏障指令,通过内存屏障指令,volatile实现了Java内存模型中的可见性和有序性(禁重排),但volatile无法保证原子性。

内存屏障是一个同步点,该点之前的所有写操作都要回写到主内存,该点之后的所有读操作就能获得内存屏障之前的所有写操作的最新结果。(实现了可见性)

  1. 写屏障(Store Memory Barrier):告诉处理器在写屏障之前将所有存储在缓存(store bufferes)中的数据同步到主内存。也就是说,当看到Store命令时,就必须把该指令之前所有写入指令执行完毕后才能继续向下执行。
  2. 读屏障(Load Memory Barrier):告诉处理器在读屏障之后的读操作,都在读屏障之后执行(不允许重排到前面)。也就是说,在Load屏障指令之后就能保证后面的读取数据指令一定能够读取到最新的数据。【换句话说,本指令规定了不允许重排后的语句出现在读屏障之前,也就是说规定了重排序后的位置上限,而此时读屏障上面有写屏障,这就保证了下面的读操作一定是读取的最新数据】

因此在重排序时,不允许把内存屏障之后的指令重排序到内存屏障之前。一句话:对一个volatile变量的写,先行发生于任意后续对这个volatile变量的读,也叫写后读。

8.2 内存屏障分类

7.7节中的happens-before先行发生原则,类似接口规范,但是是怎么落地实现的呢?答案是:内存屏障。

粗略地说,内存屏障分为以下三种:

  1. 读屏障(Load Barrier)

    在读指令之前,插入读屏障,让工作内存活CPU高速缓存当中的缓存数据失效,重新回到主内存中获取最新数据

  2. 写屏障(Store Barrier)

    在写指令之后插入写屏障,强制把写缓冲区的数据刷回到主内存中

  3. 全屏障(Full Barries)

    上面两个的集合,即组合使用,从而达到有序性和可见性。

详细地说,底层c++程序,内存屏障会分为四种:

  1. loadload
  2. storestore
  3. loadstore
  4. storeload

image-20220818204145836

上述四类屏障,保证了重排的范围,即屏障前后的重排不能跨过屏障。

8.3 禁重排概述

  1. 重排序有可能影响程序的执行和实现,因此,我们有时候希望告诉JVM你别“自作聪明”给我重排序,我这里不需要排序,听主人的。
  2. 对于编译器的重排序,JMM会根据重排序的规则,禁止特定类型的编译器重排序。
  3. 对于处理器的重排序,Java编译器在生成指令序列的适当位置,插入内存屏障指令,来禁止特定类型的处理器排序。

8.4 happens-before之volatile变量规则

image-20220818213550840

  1. 当第一个操作为volatile读时,不论第二个操作是什么,都不能重排序。这个操作保证了volatile读之后的操作不会被重排到volatile读之前。【对应图中篮框】
  2. 当第二个操作为volatile写时,不论第一个操作是什么,都不能重排序。这个操作保证了volatile写之前的操作不会被重排到volatile写之后。【对应图中红框】
  3. 当第一个操作为volatile写时,第二个操作为volatile读时,不能重排。【对应紫圈】

具体来说,Java中volatile是通过读写屏障实现的,如下所示:

  1. volatile读:

    1. 在每个volatile读操作的后面插入一个LoadLoad屏障,禁止处理器把上面的volatile读与下面的普通读重排序。

    2. 在每个volatile读操作的后面插入一个LoadStore屏障,禁止处理器把上面的volatile读与下面的普通写重排序。

      image-20220818215947875

  2. volatile写:

    1. 在每个volatile写操作的前面插入一个StoreStore屏障,可以保证在volatile写之前,其前面的所有普通写操作都已经刷新到主内存中。

    2. 在每个volatile写操作的后面插入一个StoreLoad屏障,作用是避免volatile写与后面可能有的volatile读/些操作重排序。

      image-20220818215433738

8.5 保证可见性

可见性指的是不同线程对某个变量完成操作后对结果及时课件,即该共享变量一旦改变所有线程立即可见。

案例如下所示,需求:两个线程,A线程根据某共享变量进行循环,而B线程会修改共享变量,然后退出程序。此时共享变量如果没有volatile修饰的话,是无法保证可见性的。代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class ThreadTest26 {

// 共享变量
static boolean flag = true;

public static void main(String[] args) {
System.out.println();

new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "\t--->come in");
while(flag){
// 注意,循环体不能写sout输出语句,因为输出语句用到了sync,就会重新从主内存加载,即这是主动加载数据,并不是可见性主动通知的。
}
System.out.println(Thread.currentThread().getName() + "\t--->flag被设置为false,程序停止");
}, "t1").start();

try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}

// 主线程修改flag
flag = false;
System.out.println(Thread.currentThread().getName() + "\t 修改完成,flag为:" + flag);

// 运行程序,可以发现,此时子线程并没有停止,说明其读取的就是它的本地内存,而没有及时更新主内存。
// 或者说,本线程修改数据后,没有刷新到主内存。
// 不管那个,都是可见性没有保证。
}
}

将共享变量设置为volatile修饰,可以看到程序能够正常结束,子线程可以及时获取到最新的数据,从而正常结束。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class ThreadTest27 {

// 共享变量,volatile修饰
static volatile boolean flag = true;

public static void main(String[] args) {
System.out.println();

new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "\t--->come in");
while(flag){}
System.out.println(Thread.currentThread().getName() + "\t--->flag被设置为false,程序停止");
}, "t1").start();

try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}

// 主线程修改flag
flag = false;
System.out.println(Thread.currentThread().getName() + "\t 修改完成,flag为:" + flag);

// 运行程序,可以发现,主线程修改了flag之后,能够及时通知子线程。程序正常运行结束。
}
}

那么子线程为什么看不到主线程main修改为false的flag的值呢?

问题可能:

  1. 主线程修改了flag之后没有将其刷新到主内存,所以t1线程看不到。
  2. 主线程将flag刷新到了主内存,但是t1一直读取的是自己工作内存中flag的值,没有去主内存中更新获取flag最新的值。

正常流程希望:

  1. 线程中修改了自己工作内存中的副本之后,立即将其刷新到主内存。
  2. 工作内存中每次读取共享变量时,都去主内存中重新服务,然后拷贝到工作内存。

解决方案:

  • 使用volatile修饰共享变量,就可以达到上面的效果,被volatile修饰的变量有以下特点:
    1. 线程中读取的时候,每次读取都会去主内存中读取共享变量最新的值,然后将其复制到工作内存。
    2. 线程中修改了工作内存中变量的副本,修改之后会立即刷新到主内存。

这里要注意,主内存中的数据,在修改的时候要加锁,保证并发安全。而且加锁后,会清空其他线程工作内存中该变量副本的值,在使用变量前必须重新load或assign。

8.6 JMM中的8个原子操作

工作内存和主内存之间的数据操作会涉及到几个原子操作。

  1. read:作用于主内存,将变量的值从主内存传输到工作内存,主内存到工作内存
  2. load:作用于工作内存,将read从主内存传输的变量值放入到工作内存变量副本中,即数据加载
  3. use:作用于工作内存,将工作内存变量副本的值传递给执行引擎,每当JVM遇到需要该变量的字节码指令时会执行该操作
  4. assign:作用于工作内存,将从执行引擎接收到的值赋值给工作内存变量,每当JVM遇到一个给变量赋值字节码指令时就会执行该操作
  5. store:作用于工作内存,将赋值完毕的工作变量的值写回给主内存
  6. write:作用于主内存,将store传输过来的变量值赋值给主内存中的变量

由于上述6条只能保证单条指令的原子性,针对多条指令的组合性原子保证,没有大面积加锁,所以,JVM提供了另外两个原子指令

  1. lock:作用于主内存,将一个变量标记为一个线程独占的状态,只是写时候加锁,就只是锁了写变量的过程
  2. unlock:作用于主内存,把一个处于锁定状态的变量释放,然后才能被其他线程占用。

image-20220818224726310

8.7 无原子性

volatile不支持原子性,只有可见性和有序性。volatile变量的复合操作不具有原子性,比如number++。

以多线程对某个变量进行++操作,可以看到,在没有synchronized等锁修饰的前提下,无论对变量是否添加volatile修饰,都不能保证线程安全。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class MyNumber {

volatile int number;

public void addPlusPlus(){
number++;
}

}

public class ThreadTest28 {

public static void main(String[] args) {

System.out.println();

MyNumber mn = new MyNumber();

for (int i = 1; i <= 10; i++) {

new Thread(() -> {
for (int j = 0; j < 1000; j++) {
mn.addPlusPlus();
}
}, String.valueOf(i)).start();
}

try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}

System.out.println(mn.number);
}
}

原因分析:因为JMM只定义了8个原子操作,这里面的操作要么是加载读取,要么是存储写入等等。都没有涉及到运算。

而上面的++操作,因为底层涉及到获取变量值,然后加1,然后存储,此时,只能保证获取值和存储是原子操作,但是中间,任意线程都可操作。这时候,就相当于丢失修改。无论是不是复合操作,就算是基本的语句,也是无法保证原子性的。而可见性指的是在读取时一定是最新的,但是读取后,就无法保证再读取了。

image-20220819095238645

而前面的用synchronized关键字修饰的情况,保证了从读取数据到写入数据整个过程都是加锁的,也就是上面图中,包括实线和虚线两部分,而不仅仅是实线。这就保证了整体原子性,从而保证不会被丢失修改。

另外,synchronized保证的是从主内存中读取数据。

那么volatile关键字的作用是什么呢?既然synchronized已经完全实现了全部功能。因为synchronized锁太重量级了,完全保证了主内存中的数据是不可见的。

虽然volatile没有保证原子性,但是能够实现可见性,因此对于共享变量,但是没有更新操作的情况还是比synchronized适合的。即volatile变量不合参与到依赖当前值的运算

8.8 指令禁重排

重排序是指编译器和处理器为了优化性能而对指令序列进行重新排序的一种手段,有时候会改变程序语句的先后顺序:

  1. 不存在数据依赖关系,可以重排序;
  2. 存在数据依赖关系,禁止重排序;

但重排后的指令绝对不能改变原有的串行语义!这点在并发设计中必须要重点考虑

数据依赖性是指:若两个操作访问同一变量,且这两个操作中有一个为写操作,此时两操作间就存在数据依赖性

从 源代码到最终执行的指令序列中,需要有三次重排序。

image-20220819105036112

编译器和处理器在重排序时,会遵守数据依赖性,不会改变存在依赖关系的两个操作的执行,但不同处理器和不同线程之间的数据性不会被编译器和处理器考虑,其只会作用于单处理器和单线程环境

此时可再回顾一下8.4中的规则,因为存在数据依赖性,所以加入屏障,所以也就不会重排序。

8.9 总结

volatile关键字保证了其修饰的变量在任何地方写的时候,都能够立即刷新回主内存中;在任何地方读的时候,都是读取的是主内存中的数据,保证了可见性。

另外,其修饰的变量在读写的时候都加入了内存屏障,禁止重排序,保证了部分线程安全。

但是没有保证针对该变量操作的原子性。

那么volatile应该使用在哪里呢?

  1. 对于单一赋值的情况,是可以的。比如a=10这种。而a=a+10是不可的,a=b+10也是不可的。
  2. 对于状态标志,判断业务是否结束,根据flag是否为true来结束某个任务。
  3. 对于开销较低的读,也是可以的。比如读操作远多于写,写方法用synchronized修饰,而读方法可以不用synchronized修饰,在变量上直接用volatile修饰。
  4. 单例模式,DCL(Double Check Lock)双端锁的发布。

在翻译底层字节码指令时,会在变量定义的时候,如果该变量是volatile修饰的,那么就会添加一个flags标志位,值为ACC_VOLATILE。

9. CAS算法

CAS算法的实现,主要体现在java.util.concurrent.atomic包中,里面的原子类几乎都是CAS思想的落地实现。

前面的案例提到过,在多线程下,对于共享变量,如果不加synchronized,肯定会出现线程安全问题。而synchronized是比较重量级的,应该少用。但是volatile是无法保证的。可以直接使用atomic包中的数据类型,比如AtomicInteger,作为整型变量。

除了AtomicInteger之外,还有AtomicBoolean等封装的基本数据类型。另外,还有AtomicReference<T>封装了引用数据类型。

9.1 概述

CAS是Compare And Swap的缩写,中文翻译成比较并交换,是实现并发算法时常用到的一种技术。它包含三个操作数——内存位置值(V)、预期原值(A)、更新值(B)。

  1. 执行CAS操作的时候,将内存位置的值与预期原值比较
  2. 如果相匹配,那么处理器会自动将该位置更新为新值
  3. 如果不匹配,处理器不做任何操作,多个线程同时执行CAS操作,只有一个会成功。

当且仅当旧的预期值A和内存值V相同时,将内存值V修改为B,否则什么都不做或重来,这种重来重试的这种行为称为——自旋。注意,自旋就是再次执行这一个操作,即重新读取值,执行线程操作,写入数据。这样既避免了丢失修改,也保证了线程能够执行完成。(自旋锁)

就是说,在线程刚开始,肯定要先读取数据,即A,然后经过操作,修改为B,之后写入的时候,读取数值V,判断V和A是否一致。即判断是否有人修改过这个值。从而保证没有被人操作过数据。

9.2 CAS算法底层

CAS是JDK提供的非阻塞原子性操作,它通过硬件保证了比较-更新的原子性。它是非阻塞的且自身具有原子性,也就是说这玩意效率更高且通过硬件保证,说明这玩意更可靠。

CAS是一条CPU的原子指令(cmpxchg指令),不会造成所谓的数据不一致问题,Unsafe提供的CAS方法(如compareAndSwapXXX)底层实现即为CPU指令cmpxchg

执行cmpxchg指令的时候,会判断当前系统是否为多核系统,如果是就给总线加锁,只有一个线程会对总线加锁成功,加锁成功之后会执行CAS操作,也就是说CAS的原子性实际上是CPU实现独占的,比如用synchronized重量级锁,这里的排他时间要短很多,所以在多线程情况下性能会比较好。

和内存屏障类似,AtomicInteger的相关操作底层仍然是Unsafe类实现的。换句话说,atomic包中都是CAS算法的落地实现,所以CAS的核心就是Unsafe类。

Unsafe是CAS的核心类,由于Java方法无法直接访问底层系统,需要通过本地(native)方法来访问,Unsafe相当于一个后门,基于该类可以直接操作特定内存的数据。Unsafe类存在于sun.misc包中,其内部方法操作可以向C的指针一个直接操作内存,因为Java中CAS操作的执行依赖于Unsafe类的方法。

注意,Unsafe类中的所有方法都是native修饰的,也就是说Unsafe类中的方法都是直接调用操作系统底层资源执行相应任务。

9.3 自旋锁SpinLock

CAS是实现自旋锁的基础。自旋锁其实比较简单,就是旋转,采用循环的方式去获取锁,不会立即阻塞。自旋锁的好处就是不会阻塞,减少了线程上下文切换的消耗,缺点是循环会消耗CPU。

自旋锁案例如下所示:本质上就是通过对属性赋值,判断是否等于原始值来进行是否赋值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public class SpinLockDemo {

// 注意,原子引用对象,如果不是利用compareAndSet赋值,其余情况都是null
AtomicReference<Thread> atomicReference = new AtomicReference<>();

public void lock(){
Thread thread = Thread.currentThread();

// 自旋锁,如果原子引用是null,那么就赋值为thread。
// 其实就是赋值,如果赋值失败,那么就一直循环请求判断
while(!atomicReference.compareAndSet(null, thread)){

}

System.out.println(Thread.currentThread().getName() + "\t" + "---come in");
}

public void unlock(){
Thread thread = Thread.currentThread();
// 解锁
atomicReference.compareAndSet(thread, null);
System.out.println(Thread.currentThread().getName() + "\t" + "---come out");
}

public static void main(String[] args) {

SpinLockDemo spinLockDemo = new SpinLockDemo();

new Thread(()->{
// 以自旋锁形式获取锁
spinLockDemo.lock();

// 模拟线程进行工作
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}

// 工作完之后,释放锁
spinLockDemo.unlock();
}, "A").start();


// 暂停500毫秒,保证A先启动,开始B线程启动
new Thread(()->{
// 以自旋锁形式获取锁
spinLockDemo.lock();

// 工作完之后,释放锁
spinLockDemo.unlock();
}, "B").start();
}
}

9.4 CAS算法缺点

  1. 循环时间长,开销很大

    如果CAS失败,会一直进行尝试。如果CAS长时间一直不成功,可能会给CPU带来很大的开销。

  2. ABA问题

    CAS仅仅是在开始和最后写入的时候读取并判断二者是否相等。如果在此期间,有线程修改该值并又修改回原值,显然CAS会察觉不到。即并不是没有修改过数据。

因此,要想解决ABA问题,可添加版本号,每次修改版本号都会强制自动加一。atomic包中的工具类如AtomicStampReference,也就是给引用添加了版本号。

10. ThreadLocal

ThreadLocal也被称为线程局部变量,注意,这是一个变量

1
Each thread holds an implicit reference to its copy of a thread-local variable as long as the thread is alive and the {@code ThreadLocal} instance is accessible; after a thread goes away, all of its copies of thread-local instances are subject to garbage collection (unless other references to these copies exist).

ThreadLocal提供线程局部变量。这些变量与正常的变量不同,因为每一个线程在访问ThreadLocal实例的时候(通过get或set方法)都有自己的、独立初始化的变量副本。ThreadLocal实例通常是类中的私有静态字段,使用它的目的是希望将状态(例如,用户ID或事务ID)与线程关联起来。而不是仅在一个方法内,或者是在一个类中的某个属性,脱离了对象的概念

简单地说,类似servlet中的应用域、会话域、请求域等等,这里的ThreadLocal可以看成是线程域(只不过这个线程域只能存放一个变量)。回顾在web项目中,经常在controller、service之间通过方法调用来传参,其实有些时候,如果方法传参不合适,可以采用ThreadLocal来存储,因为一般情况下,传参都是一次请求中用到,而一次请求就对应servlet中的一个线程。

在JMM以及JVM中提到过,JVM为每个线程都分配一块栈空间,对于共享变量,会将其拷贝一份到本地变量中。主要解决了让每个线程绑定自己的值,通过使用get()和set()方法,获取默认值或将其值更改为当前线程所需的副本的值,从而避免了线程安全问题。换句话说,给线程分配了本地变量副本,每个线程操作的都是自己空间的变量,此时在一定程度上,避免了线程安全问题。而如果没有拷贝空间的话,此时就会在主内存中加锁,串行访问,使得效果降低。

ThreadLocal可以看成是当前线程的一个局部变量,在线程中的任意时刻都可以使用,可以看成是打破了方法的作用域。并且ThreadLocal是存储在ThreadLocalMap中。

10.1 Thread、ThreadGroup、ThreadLocal<T>、ThreadLocalMap

前三者都是在java.lang包下。ThreadLocalMap是ThreadLocal的静态内部类。

10.2 ThreadLocal

java.lang.ThreadLocal类有一个构造方法,还有几个方法。

方法名 描述
ThreadLocal() 创建线程局部变量对象。
T get() 返回当前线程的此线程局部变量副本中的值
protected T initialValue() 返回此线程局部变量的当前线程的“初始值”
void remove() 删除此线程局部变量的当前线程值
void set(T value) 将此线程局部变量的当前线程副本设置为指定值。
static <S> ThreadLocal<S> withInitial(Supplier<? extends S> supplier) 创建一个线程局部变量。【和第三个方法功能一样,一般不再使用第三个方法】

注意,ThreadLocal对象不能用构造方法来创建,创建后的对象虽然不是null,但是get()方法是null,如果直接使用会报空指针异常。必须初始化,可以直接用第5个方法以初始化形式来创建对象。

ThreadLocal案例如下所示,注意,size只是为了验证结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
 /* 需求1:5个销售卖房子,集团高层值关心销售总量的准确统计数。
*
* 需求2:5个销售卖完房子,各自独立销售额度,自己业绩按照提成走,分灶吃放,各个销售自己动手,丰衣足食。
*/
public class ThreadTest29 {

public static void main(String[] args) {
System.out.println();


House house = new House();

for (int i = 1; i <= 5; i++) {

new Thread(() -> {

// 随机每个销售的销售额
int size = new Random().nextInt(5) + 1;

// 销售房子
for (int j = 0; j < size; j++) {
house.saleHouse(); // 从共享变量中售卖房子
house.saleVolumeByThreadLocal(); // 从本地变量中售卖房子
}

// 这里不能用size,因为真实情况下,是不知道的,只能根据线程自己的操作来动态获取。只能采用线程本地变量
// System.out.println(Thread.currentThread().getName() + "销售卖出房子:" + size);
System.out.println(Thread.currentThread().getName() + "销售卖出房子:" + house.saleVolume.get());

}, String.valueOf(i)).start();
}

try {
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}

System.out.println(Thread.currentThread().getName() + "\t" + "共计卖出多少套:" + house.saleCount);
}
}

class House {

int saleCount = 0;

public synchronized void saleHouse(){
saleCount++;
}

// 针对需求2,设置ThreadLocal线程本地变量,每个线程独一份。
// 设置初始化本地变量值为0
ThreadLocal<Integer> saleVolume = ThreadLocal.withInitial(() -> 0);

public void saleVolumeByThreadLocal(){
saleVolume.set(saleVolume.get() + 1);
}
}

10.3 案例分析

上面的ThreadLocal案例有什么隐患呢?

线程的本地空间是有限的,尤其是在线程池中,而ThreadLocal本地变量是线程自带的,空间有限,在多线程复用的情况下,有可能会使得系统逻辑混乱,内存溢出。所以在使用完这个变量后,要及时remove掉该数据,即上面的第四个方法。

阿里巴巴开发手册:必须回收自定义的ThreadLocal变量,尤其在线程池场景下,线程经常会被复用,如果不清理自定义的ThreadLocal变量,可能会影响后续业务逻辑和造成内存泄露等问题。尽量在代理中使用try-finally块进行回收。

尤其在线程池中,因为线程池中的线程是会复用的,如果该线程分配的每个任务都用到了ThreadLocal,而且该任务执行结束后,没有remove掉该变量【注意,执行任务的时候,ThreadLocal是线程本地变量,占用的是线程的空间】。显然随着线程的复用,也就是任务的分配,线程的可用空间就会越来越少。所以就会导致内存泄露,直至内存溢出。

注意:经过测试,如果不同任务是同一个对象,那么所有线程的ThreadLocal的内存地址是一样的。如果任务不是一个对象,那么ThreadLocal的内存地址就不同。【我认为,ThreadLocal在对象级别,感觉就是在主内存拷贝一份,复制到本地内存,但是此时该对象指的就是主内存的内存地址。】

另外,因为线程复用,所以不同的任务的初始值应该是一样的,但是由于线程复用了,导致当前任务的初始值是上一个任务的执行之后的结果。即逻辑发生混乱。

修改后的代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
for (int i = 1; i <= 5; i++) {

new Thread(() -> {

// 随机每个销售的销售额
int size = new Random().nextInt(5) + 1;

try{

// 销售房子
for (int j = 0; j < size; j++) {
house.saleHouse(); // 从共享变量中售卖房子
house.saleVolumeByThreadLocal(); // 从本地变量中售卖房子
}

// 这里不能用size,因为真实情况下,是不知道的,只能根据线程自己的操作来动态获取。只能采用线程本地变量
// System.out.println(Thread.currentThread().getName() + "销售卖出房子:" + size);
System.out.println(Thread.currentThread().getName() + "销售卖出房子:" + house.saleVolume.get());

}finally {
// 此刻,该任务已经执行结束,本地变量不需要了。后续没有该任务了。
// remove掉该本地变量。
house.saleVolume.remove();
}

}, String.valueOf(i)).start();
}

内存泄露的案例比较难实现,这里实现一个简单的业务逻辑混乱情况。10个任务,每个任务都是从0加到1,采用线程池,只有3个核心线程。显然,各个任务最开始的值,并不都是0,因为一个线程复用了,导致前面的任务结果影响了后面的任务结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
class MyData {

ThreadLocal<Integer> threadLocal = ThreadLocal.withInitial(() -> 0);

public void add(){
threadLocal.set(threadLocal.get() + 1);
}
}

public class ThreadTest30 {

public static void main(String[] args) {

MyData myData = new MyData();

ExecutorService threadPool = Executors.newFixedThreadPool(3);

try {

// 10个任务分配到3个线程中
for (int i = 0; i < 10; i++) {
threadPool.submit(() -> {

// 获取到操作之前的值
Integer beforeInt = myData.threadLocal.get();
myData.add();
Integer afterInt = myData.threadLocal.get();

System.out.println(Thread.currentThread().getName() + "\tbefore:" + beforeInt + "\tafter:" + afterInt);

});
}

} catch (Exception e) {

} finally {
threadPool.shutdown();
}
}
}

结果如下所示:

image-20220819233015252

为了保证在线程复用的情况下,各个任务之间互不干扰,需要及时remove掉本地变量。修改后的部分代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 10个任务分配到3个线程中
for (int i = 0; i < 10; i++) {
threadPool.submit(() -> {

try{

System.out.println(myData.threadLocal);

// 获取到操作之前的值
Integer beforeInt = myData.threadLocal.get();
myData.add();
Integer afterInt = myData.threadLocal.get();

System.out.println(Thread.currentThread().getName() + "\tbefore:" + beforeInt + "\tafter:" + afterInt);

}finally {

// 清空该值,使得是初始值
myData.threadLocal.remove();

}

});
}

10.4 ThreadLocal源码分析

Thread的部分源码如下:

1
2
3
public class Thread implements Runnable {
ThreadLocal.ThreadLocalMap threadLocals = null;
}

上面的threadLocal其实就是线程真正的本地变量,而该变量的数据类型就是ThreadLocal中的内部类ThreadLocalMap。

ThreadLocal的部分源码如下。ThreadLocal里面有ThreadLocalMap静态内部类,而ThreadLocalMap里面也有Entry静态内部类,该类继承了弱引用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class ThreadLocal<T> {
static class ThreadLocalMap {
static class Entry extends WeakReference<ThreadLocal<?>> {
/** The value associated with this ThreadLocal. */
Object value;

Entry(ThreadLocal<?> k, Object v) {
// 弱引用
super(k);
value = v;
}
}
}
}

ThreadLocal中的get方法源码如下。获取当前线程,然后调用getMap方法,获取到当前线程的thredLocals属性。如果thredLocals不是null,然后获取Entry,并判断是否为null,如果不为null,则返回具体存储的值。如果thredLocals为null或者Entry为null,那么就会创建一个ThreadLocal返回其初始值。【而初始值就是返回null】

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public T get() {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
return setInitialValue();
}

ThreadLocalMap getMap(Thread t) {
return t.threadLocals;
}

private T setInitialValue() {
// 初始化为null
T value = initialValue();

// 获取当前线程,并获得ThreadLocal变量,如果有则直接赋值
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
map.set(this, value);
} else {
// 如果没有,则创建并赋值
createMap(t, value);
}
if (this instanceof TerminatingThreadLocal) {
TerminatingThreadLocal.register((TerminatingThreadLocal<?>) this);
}
return value;
}

protected T initialValue() {
return null;
}

void createMap(Thread t, T firstValue) {
// 可以看到,其实ThreadLocal本地变量,也是key、value键值对,key就是thisThreadLocal对象。所以也就是说entry存储空间,一个ThreadLocal只有一份。
t.threadLocals = new ThreadLocalMap(this, firstValue);
}

ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
// Entry数组,长度为16
table = new Entry[INITIAL_CAPACITY];
int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
table[i] = new Entry(firstKey, firstValue);
size = 1;
setThreshold(INITIAL_CAPACITY);
}

private static final int INITIAL_CAPACITY = 16;

ThreadLocal中的set方法源码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
map.set(this, value);
} else {
createMap(t, value);
}
}

// ThreadLocalMap.set
private void set(ThreadLocal<?> key, Object value) {

// We don't use a fast path as with get() because it is at
// least as common to use set() to create new entries as
// it is to replace existing ones, in which case, a fast
// path would fail more often than not.

Entry[] tab = table;
int len = tab.length;
int i = key.threadLocalHashCode & (len-1);

for (Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)]) {
ThreadLocal<?> k = e.get();

if (k == key) {
e.value = value;
return;
}

if (k == null) {
replaceStaleEntry(key, value, i);
return;
}
}

tab[i] = new Entry(key, value);
int sz = ++size;
if (!cleanSomeSlots(i, sz) && sz >= threshold)
rehash();
}

实际上,ThreadLocalMap就是一个以ThreadLocal实例为key,任意值为value的Entry对象。

当我们为threadLocal变量赋值的时候,实际上就是以当前theadLocal实例为key,值为value的Entry往这个threadLocalMap中存放。

因此说,ThreadLocal本质上并不存储值,而是通过该对象获取到ThreadLocalMap对象,然后在Map对象中存储<threadLocal, value>。

image-20220820105551438

10.5 ThreadLocal内存泄露

内存泄漏:不再会被使用的对象或者变量占用的内存不能被回收,就是内存泄露。

上面提到过,ThreadLocal本质上是ThreadLocalMap,而ThreadLocalMap底层则是Entry数组,Entry是内部类,源码如下所示:

1
2
3
4
5
6
7
8
9
10
static class Entry extends WeakReference<ThreadLocal<?>> {
/** The value associated with this ThreadLocal. */
Object value;

Entry(ThreadLocal<?> k, Object v) {
// 调用弱引用的构造方法,创建一个弱引用指向ThreadLocal对象
super(k);
value = v;
}
}

ThreadLocalMap从字面上就可以看出这是一个保存ThreadLocal对象的map(以ThreadLocal为key),不过这是经过了两层包装的ThreadLocal对象:

  1. 第一层包装是使用WeakReference<ThreadLocal<?>>将ThreadLocal对象变成一个弱引用的对象;
  2. 第二层包装是定义了一个专门的类Entry来扩展WeakReferen ce<ThreadLocal<?>>

强软弱虚四个引用的关系如下图所示,软引用、弱引用和虚引用均继承强引用。强引用无论什么时候都不会被回收,软引用只有在内存不足引起垃圾回收的时候,才会被回收;弱引用是下一次垃圾回收的时候进行回收;虚引用则是一个附属信息,不会决定对象的生命周期,get()方法返回null,必须和ReferenceQueue联合使用,相当于一个监控的作用。

image-20220820120516235

那么在存储的时候,为什么要将ThreadLocal对象封装成弱引用呢?如果不用会有什么后果呢?

其实在JVM已经提到过,强引用只是在栈内存中的一个变量,而上面只是给ThreadLocal对象多了一个弱引用,要想ThreadLocal对象真正的在垃圾回收时消失,就必须使得原始的强引用丢失。只保留一个弱引用。

当方法执行完以后,栈帧销毁,即强引用就会消失,此时ThreadLocal对象就只有一个弱引用指向,这样的话,肯定在下次垃圾回收时就会回收这个ThreadLocal对象。因为,只要强引用消失,从业务逻辑上讲,其实就不会再使这个对象了,为了尽快地清除它,所以使用弱引用最合适。

如果Entry中的key引用是强引用,显然就会导致key指向的ThreadLocal对象以及v指向的对象不能被gc回收,造成内存泄露。【因为,key是强引用,而外部已经不使用ThreadLocal对象了,即没用但是却回收不了】

如下所示,ThreadLocal对象被回收以后,key指向了null。但是我们知道,ThreadLocal本质上是存储在ThreadLocalMap中的Entry的,也就是ThreadLocalMap并没有消失,ThreadLocalMap中出现了key为null的Entry。而如果线程复用,显然ThreadLocalMap就会存储很多个任务的ThreadLocal,这时候Entry数组(table)就会被占满,而且占用的对象value也不会再被访问,因为key已经为null了。造成内存泄露。

因此,为了避免出现过多的null情况,当key为null的时候,之后,我们再次调用get、set、remove方法时,就会尝试删除key为null的entry,可以释放value对象所占用的内存。即set方法中调用replaceStaleEntry方法来释放value。

也就是说,弱引用不能百分百的保证内存不泄露,因为value必须我们调用方法时才会被清除。

我们应该在不使用某个ThreadLocal对象后,手动调用remove方法来删除它,尤其是在线程池中,不仅仅是内存泄露的问题,因为线程池中的线程是重复使用的,意味着这个线程的ThreadLocalMap对象也是重复使用的,如果我们不手动调用remove方法,那么后面的线程就有可能获取到上个线程任务遗留下来的value值,造成bug。

而ThreadLocal的remove方法,则是清除掉key和value

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// ThraedLocal.remove()
public void remove() {
ThreadLocalMap m = getMap(Thread.currentThread());
if (m != null) {
m.remove(this);
}
}

// ThreadLocalMap.remove()
private void remove(ThreadLocal<?> key) {
Entry[] tab = table;
int len = tab.length;
int i = key.threadLocalHashCode & (len-1);
for (Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)]) {
if (e.get() == key) {
e.clear();
expungeStaleEntry(i);
return;
}
}
}

image-20220820131738702

10.6 总结

  1. ThreadLocal.withInitial();

  2. 建议把ThreadLocal修饰为static

    阿里开发手册:因为ThreadLocal无法解决共享对象的更新问题。这个变量是针对一个线程内所有操作动向的,所以设置为静态变量,所有此类实例共享此静态变量,也就是说在类第一次被使用时装载,只分配一块存储空间,所有此类的对象(只要是这个线程内定义的)都可以操纵这个变量。

    ThreadLocal能实现了线程的数据隔离,不在于它自己本身,而在于Thread的ThreadLocalMap。所以,ThreadLocal可以只初始化一次,只分配一块存储空间就足以了,没必要作为成员变量多次被初始化。

  3. 用完记得手动remove

另外:

  1. ThreadLocal并不解决线程间共享数据的问题
  2. ThreadLocal适用于变量在线程间隔离且在方法间共享的场景
  3. ThreadLocal通过隐式的在不同线程内创建独立实例副本,避免了实例线程安全的问题
  4. 每个线程持有一个只属于自己的专属Map,并维护了ThreadLocal对象与具体实例的映射,该Map由于只被持有它的线程访问,故不存在线程安全以及锁的问题
  5. ThreadLocalMap的Entry对ThreadLocal的引用为弱引用,避免了ThreadLocal对象无法被回收的问题
  6. 都会通过expungeStaleEntry,cleanSomeSlots,replaceStaleEntry这三个方法回收键为null的Entry对象的值(即具体实例)以及Entry对象本身,从而防止内存泄露,属于安全加固的方法

11. AQS

AQS指的是AbstractQueuedSynchronizer的缩写,即抽象的队列同步器。AQS是JUC底层架构中的重点之重点。为什么加锁就能保证并发安全呢?底层就是AQS。存储未抢到锁的线程,并按照一定规则后续为其分配锁。因此:

  1. 锁,面向锁的使用者:定义了程序员和锁交互的使用层API,隐藏了实现细节,直接调用即可。
  2. 同步器,面向锁的实现者:因为锁的种类有很多,但是锁的一些基本操作是相同的。因此Java并发大神DougLee,提出统一规范并简化了锁的实现,将其抽象出来屏蔽了同步状态管理、同步队列的管理和维护、阻塞线程排队和通知、唤醒机制等,是一切锁和同步组件实现的公共基础部分

因此,AQS可以存储阻塞的线程,并提供一种机制为阻塞的线程调度唤醒并分配锁资源。

11.1 概述

在java.util.concurrent.locs包中关于AQS有如下三个抽象类:

  1. AbstractOwnableSynchronizer
  2. AbstractQueuedLongSynchronizer
  3. AbstractQueuedSynchronizer

其中下面的两个类均继承上面的类。AbstractQueuedSynchronizer和AbstractQueuedLongSynchronizer相当于孪生兄弟,只不过AbstractQueuedSynchronizer出现的更早,因此着重关注AbstractQueuedSynchronizer即可。

我们知道,锁资源是需要线程竞争的,那么当多线程情况下,必然出现抢不到的情况,那么这些线程该怎么办呢?销毁吗?显然是不行的,应该等待分配锁资源,此时他们就应该被放到一个等待队列中。那么如何在队列中放置这些线程呢?其实就是AQS。

AQS是用来实现锁或者其他同步器组件的公共基础部分的抽象实现,是重量级基础框架及整个JUC体系的基石,主要用于解决锁分配给“谁”的问题。

下面是AQS源码中的注释:

1
2
3
4
/*
Provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc) that rely on
first-in-first-out (FIFO) wait queues. This class is designed to be a useful basis for most kinds of synchronizers that rely on a single atomic {@code int} value to represent state. Subclasses must define the protected methods that change this state, and which define what that state means in terms of this object being acquired or released. Given these, the other methods in this class carry out all queuing and blocking mechanics. Subclasses can maintain other state fields, but only the atomically updated {@code int} value manipulated using methods {@link #getState}, {@link #setState} and {@link #compareAndSetState} is tracked with respect to synchronization.
*/

AQS依赖于FIFO等待队列,通过一个标志state来指明该线程是否获取锁或者释放锁。AQS整体就是一个抽象的FIFO队列来完成资源获取线程的排队工作,并通过一个int类变量表示持有锁的状态。

1
2
3
The wait queue is a variant of a "CLH" (Craig, Landin, and
Hagersten) lock queue. CLH locks are normally used for
spinlocks. We instead use them for blocking synchronizers by including explicit ("prev" and "next") links plus a "status" field that allow nodes to signal successors when releasing locks, and handle cancellation due to interrupts and timeouts. The status field includes bits that track whether a thread needs a signal (using LockSupport.unpark). Despite these additions, we maintain most CLH locality properties.

AQS中的CLH变体(虚拟双向队列)如下所示:

image-20220821100453415

前面所提到的一些类,比如ReentrantLock、CountDownLatch、Semaphore等底层都依赖于AQS。他们源码都有Sync内部类,该类继承了AbstractQueuedSynchronizer。部分类的继承实现关系图如下所示:

image-20220821105031692

我们知道,加锁就会导致阻塞,而有阻塞就需要排队,实现排队那必然需要队列。

11.2 体系架构

AQS使用一个volatile的int类型的成员变量来表示同步状态,通过内置的FIFO队列来完成资源获取的排队工作,将每条要去抢占资源的线程封装成一个Node节点来实现锁的分配,通过CAS完成对State值的修改。

整体上的结构如下所示:

image-20220821104233671

AQS部分源码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public abstract class AbstractQueuedSynchronizer{

// 可用于表示节点中的status取值,初始默认值为0
static final int WAITING = 1;
static final int CANCELLED = 0x80000000;
static final int COND = 2;

// 内部类,抽象的,静态的节点
abstract static class Node {
// 因为是双向队列,所以节点需要有前后指针
volatile Node prev;
volatile Node next;

Thread waiter;

// node节点的状态,即节点在队列中的状态
volatile int status;
}

// 队列中的头尾节点指针
private transient volatile Node head;
private transient volatile Node tail;

// 上面提到的state状态位
// state为0,表示所代表的锁资源空闲,可以抢;>=1表示被有人占用
// 这里的>=1,后面会涉及到可重用锁。
private volatile int state;
}

// 独占节点,表示以独占的方式等待锁
static final class ExclusiveNode extends Node { }

// 共享节点,表示以共享的模式等待锁
static final class SharedNode extends Node { }

11.3 源码分析

Lock接口的实现类,基本都是通过聚合了一个队列同步器的子类完成线程访问控制的。以ReentrantLock类为例,深入底层AQS。ReentrantLock类实现了Lock接口,并且该类底层有一个Sync抽象类来实现锁功能的,而Sync又继承了AQS。ReentrantLock锁默认是非公平锁。

image-20220821112738597

ReentrantLock部分源码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class ReentrantLock {
private final Sync sync;

// Sync内部类
abstract static class Sync extends AbstractQueuedSynchronizer {}

// 非公平锁内部类
static final class NonfairSync extends Sync {}

// 公平锁内部类
static final class FairSync extends Sync {}

// 相当于传入false,即非公平锁。
public ReentrantLock() {
sync = new NonfairSync();
}

public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}

public void lock() {
sync.lock();
}

public void unlock() {
sync.release(1);
}

}

可以看到,lock和unlock方法底层都是调用的是Sync类中的方法。由于Sync类是抽象类,所以可分别查看NonfairSync类和FairSync类。以NonfairSync为例。

Sync类部分源码如下所示,其实Sync中并没有release方法,其实这里调用的是AQS中的release方法。initialTryLock和acquire方法也是,均是AQS中的方法,而acquire在AQS中实现了,底层调用的tryAcquire方法,仅仅抛出异常,并没有真正实现。而NonfairSync实现了这两个方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
abstract static class Sync extends AbstractQueuedSynchronizer {
abstract boolean initialTryLock();

@ReservedStackAccess
final void lock() {
// 如果没有初步抢到,则进入acruqire()
if (!initialTryLock())
// 参数1表明抢锁成功,占用资源。这里只是再次尝试抢,并不是真正的抢到了,可看源码
acquire(1);
}

// 实现了具体细节
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (getExclusiveOwnerThread() != Thread.currentThread())
throw new IllegalMonitorStateException();
boolean free = (c == 0);
if (free)
setExclusiveOwnerThread(null);
setState(c);
return free;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public abstract class AbstractQueuedSynchronizer{
public final void acquire(int arg) {
// acquire没有初步抢到,这里再次尝试抢
if (!tryAcquire(arg))
// 如果还没有抢到,只能进入队列。
// 这里是重载了,具体实现可查看源码
acquire(null, arg, false, false, false, 0L);
}

// 未实现具体细节
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}

public final boolean release(int arg) {
if (tryRelease(arg)) {
signalNext(head);
return true;
}
return false;
}

// 未实现具体细节
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;

// 初步抢
final boolean initialTryLock() {
Thread current = Thread.currentThread();
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(current);
return true;
} else if (getExclusiveOwnerThread() == current) {
int c = getState() + 1;
if (c < 0)
throw new Error("Maximum lock count exceeded");
setState(c);
return true;
} else
return false;
}

// 实现了具体细节
protected final boolean tryAcquire(int acquires) {
// 当前锁资源为空,设置状态,并抢占成功,设置当前线程为资源拥有者
if (getState() == 0 && compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
// 抢锁失败
return false;
}
}

其实这就是模板模式的体现,AQS中仅实现了最基本的方法以及框架,比如tryAcquire的实现就是仅抛出了异常,但是acquire就实现了具体的细节。模板模式就是实现一些必备的框架,但是具体的部分细节需要用户自定义实现。类似的有Servlet中的GenericServlet和HttpServlet中的doGet等方法。

换句话说,ReentrantLock的本质框架仍然是AQS,只不过封装了lock、unlock方法,实现了tryAcquire的具体细节。

可查看FairSync源码,和NonfairSync类似,只不过多了一层hasQueuedThreads()判断,这个方法是公平锁加锁时判断等待队列中是否存在有效节点的方法。也就是说,公平锁需要判断是否前面有线程在等待,如果有,那么就加入队列等待。

  1. 公平锁:公平锁讲究先来先到,线程在获取锁时,如果这个锁的等待队列中已经有线程在等待,那么当前线程就会进入到等待队列中;
  2. 非公平锁,不管是否有等待队列,如果可以获取锁,则立刻占有锁对象。也就是说队列的第一个排队线程锁苏醒后,不一定就是排头的这个线程获得锁,它还是需要参加竞争锁(存在线程竞争的情况下),后来的线程可能不讲武德插队夺锁了。

11.4 Acquire方法

前面可以看到,无论公平锁还是非公平锁,lock底层都是acquire。在JDK8中acquire分为三步【上面的源码是JDK11或15的,在JDK15中,虽然代码看起来公平锁也是直接抢了,但是它底层实现是有区别的,只是在是可重入锁或者等待队列是空的时候才抢的】:

  1. tryAcquire()

    能抢到,则抢,抢不到则排队。

  2. 调用addWaiter()

    抢锁失败,加入到等待者队列。

  3. 调用acquireQueued

    在队列中,尝试抢锁

11.5 tryAcquire方法

lock方法调用之后,先用initialTryLock尝试获取锁资源,如果失败,再通过tryAcquire尝试获取。如果还失败,那么只能入队了。

11.6 addWaiter方法

JDK8的addWaiter就是JDK15中重载的acquire方法,源码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
final int acquire(Node node, int arg, boolean shared,
boolean interruptible, boolean timed, long time) {
Thread current = Thread.currentThread();
byte spins = 0, postSpins = 0; // retries upon unpark of first thread
boolean interrupted = false, first = false;
Node pred = null; // predecessor of node when enqueued

/*
* Repeatedly:
* Check if node now first
* if so, ensure head stable, else ensure valid predecessor
* if node is first or not yet enqueued, try acquiring
* else if node not yet created, create it
* else if not yet enqueued, try once to enqueue
* else if woken from park, retry (up to postSpins times)
* else if WAITING status not set, set and retry
* else park and clear WAITING status, and check cancellation
*/

for (;;) {
if (!first && (pred = (node == null) ? null : node.prev) != null &&
!(first = (head == pred))) {
if (pred.status < 0) {
cleanQueue(); // predecessor cancelled
continue;
} else if (pred.prev == null) {
Thread.onSpinWait(); // ensure serialization
continue;
}
}
if (first || pred == null) {
boolean acquired;
try {
if (shared)
acquired = (tryAcquireShared(arg) >= 0);
else
acquired = tryAcquire(arg);
} catch (Throwable ex) {
cancelAcquire(node, interrupted, false);
throw ex;
}
if (acquired) {
if (first) {
node.prev = null;
head = node;
pred.next = null;
node.waiter = null;
if (shared)
signalNextIfShared(node);
if (interrupted)
current.interrupt();
}
return 1;
}
}
if (node == null) { // allocate; retry before enqueue
if (shared)
node = new SharedNode();
else
node = new ExclusiveNode();
} else if (pred == null) { // try to enqueue
node.waiter = current;
Node t = tail;
node.setPrevRelaxed(t); // avoid unnecessary fence
if (t == null)
tryInitializeHead();
else if (!casTail(t, node))
node.setPrevRelaxed(null); // back out
else
t.next = node;
} else if (first && spins != 0) {
--spins; // reduce unfairness on rewaits
Thread.onSpinWait();
} else if (node.status == 0) {
node.status = WAITING; // enable signal and recheck
} else {
long nanos;
spins = postSpins = (byte)((postSpins << 1) | 1);
if (!timed)
LockSupport.park(this);
else if ((nanos = time - System.nanoTime()) > 0L)
LockSupport.parkNanos(this, nanos);
else
break;
node.clearStatus();
if ((interrupted |= Thread.interrupted()) && interruptible)
break;
}
}
return cancelAcquire(node, interrupted, interruptible);
}

11.7 acquireQueued方法

队列中的线程对象(Node.waiter属性)如何被调度的呢?如何获取锁资源呢?其实还是在上面的那个方法中,只要入队一个线程,那么就把前一个节点(线程)的状态修改为waiting,表示可以唤醒,可以分配资源了。

LockSupport.park(this);表示把当前线程挂起,等待资源分配。

只要有多余的线程进入,那么就会入队,就会循环(自旋锁),循环判断首节点以及是否可抢到资源,如果抢到,该线程就唤醒,然后删除原始节点,当前节点作为头节点。并且,在循环的过程中,也会判断当前节点所表示的线程是否已经cancel取消了操作,如果取消了,则将其从双向链表中删除。

11.8 release方法

上面是线程排队进入AQS,挂起线程的流程。此时如果所需的锁资源释放掉了。也就是unlock,那么AQS队列中的线程如何分配资源呢?

前面已经知道了unlock底层是release方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public void unlock() {
sync.release(1);
}

public final boolean release(int arg) {
if (tryRelease(arg)) {
signalNext(head);
return true;
}
return false;
}

protected final boolean tryRelease(int releases) {
// 1-1=0
int c = getState() - releases;
if (getExclusiveOwnerThread() != Thread.currentThread())
throw new IllegalMonitorStateException();
boolean free = (c == 0);
if (free)
// 设置资源拥有者为null
setExclusiveOwnerThread(null);
// 设置状态为0,表示没有占用
setState(c);

// 表明资源空闲
return free;
}

private static void signalNext(Node h) {
Node s;
if (h != null && (s = h.next) != null && s.status != 0) {
s.getAndUnsetStatus(WAITING);

// 唤醒线程
LockSupport.unpark(s.waiter);
}
}

源码看的不是很懂,简单说一下。当锁被占用的时候,就会出现队列,头节点为null,后续跟着很多个线程节点,均挂起。当有线程释放锁之后,队列中的线程(第二个节点)就会被唤醒执行任务,此时头结点就会不被引用,被垃圾回收掉,而第二个节点因为线程已经在执行任务了,此时节点的waiter就被置为null,作为新的头节点。

11.9 cancelAcquire方法

某个线程不想等待了,这时候就需要从队列中移除这个节点。

11.10 总结

其实可以看到,多线程加锁,本质上就是通过Lock对象来限制多线程对某块代码段的操作。如果有多个线程,此时就将多余的线程挂起,等待正在执行的线程释放锁之后,再合理地安排其他线程唤醒。

个人理解:公平锁和非公平锁,其实就是队列的首节点(线程)和现在刚刚到达的线程二者之间的抢占。但是队列中的线程,显然是无法参与抢占的。


文章作者: 浮云
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 浮云 !
  目录