走进Java并发编程02

第二节:多线程编程的基本需求。

JUC满足了多线程编程的各种需求,但是丰富的需求也是从简单需求开始的。


1.复习:创建线程

我还记得,当年培训班的SE部分结课作业是实现一个Socket客户端/服务端。
其实从这个角度上来说,某鸟的排课水平并不差;这个作业同时要求掌握Socket库的基本用法,还要求理解和实现BIO模型,也就是“服务端监听客户端连接,每个连接创建一个新线程”。

大致的代码如下(节省篇幅,我省略了继承/实现接口的部分,直接使用lambda表达式):
1
2
3
4
5
6
7
8
9
10
11
//略去主类、主方法
//监听10086端口
ServerSocket sc = new ServerSocket(10086);
while(true){
//循环监听
Socket socket = serverSocket.accept();
//为每个连接创建新线程
new Thread(() -> {
//具体的操作,相当于重写run()方法
}).start();
}

这样的代码可能大家都很熟悉(如果觉得陌生的话,也可以改写成一个MyThread类,实现Runnable接口并重写Run方法),而且肯定会有人让我用线程池;还请暂且忍耐一下,看完这些“原始”的代码。

基本需求1:同步与线程安全(synchronized)

问题来了,假设我们要在业务逻辑里对某个东西进行操作,例如……购买商品?

我们设计一个商店类,剩余库存为2,当库存为0时显示已售空,同时让线程睡眠1秒以模拟数据库读写等操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class Shop{
private int count = 2;
public void sell(String name){
System.out.println(name+"开始购买商品");
if(count<=0){
System.out.println(name+"发现商品已售空");
}else{
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
count--;
System.out.println(name+"购买商品完成,剩余库存:"+count);
}
}
}

看起来是不是很合理?

对主方法进行修改,专注于线程而不是Socket:

1
2
3
4
5
6
Shop s = new Shop();
for (int i = 0; i < 5; i++) {
//lambda表达式要求表达式中只有静态变量
int finalI = i;
new Thread(() -> s.sell("顾客" + finalI)).start();
}

我们同时建立5个连接,观察一下控制台,结果是什么?

1
2
3
4
5
6
7
8
9
10
线程0开始购买商品
线程3开始购买商品
线程1开始购买商品
线程2开始购买商品
线程4开始购买商品
线程0购买商品完成,剩余库存:0
线程3购买商品完成,剩余库存:0
线程2购买商品完成,剩余库存:-3
线程1购买商品完成,剩余库存:-3
线程4购买商品完成,剩余库存:-3

这……这是一场灾难!5个买家全都购买成功,而库存变成了-3!

(题外话:在高并发秒杀环境中, count 不再是简单的成员变量,而是缓存/数据库的某个值。万一并发处理错误,导致 count 瞬间变成了负数,这时候如果以 count==0 作为判断条件,会导致秒杀无法停止,所以一定要将判断条件改为小于区间。)

这就是一个典型的线程不安全的类。

定义:线程安全:在单个/多个线程环境下都能得到预期运行结果。

究其原因,是由于线程受到操作系统的调度,我们无法直接控制线程何时运行,即使是调节优先级,得到的也只是影响,而不是保证(可以试试把五个线程的优先级排一下看看结果)!

幸运的是,Java语言提供了同步关键字 synchronized ,它是Java对多种锁的封装,根据使用情况不同有不同的表现。

我们把它加到 sell 方法上……

1
2
3
4
5
6
7
8
9
10
线程0开始购买商品
线程0购买商品完成,剩余库存:1
线程3开始购买商品
线程3购买商品完成,剩余库存:0
线程4开始购买商品
线程4发现商品已售空
线程2开始购买商品
线程2发现商品已售空
线程1开始购买商品
线程1发现商品已售空

结论:在将 synchronize 关键字加到某个方法上后,我们可以确保在一个线程进入 这个对象 的这个方法之后,就不会有另一个线程也进入,避免了超售的情况。

这背后实际是线程获取了 这个对象 的锁,在执行完方法后,自动释放了 这个对象 的锁,是不是很智能?

与此相同的用法还有:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
 public void sell(String name){
synchronized (this){
System.out.println(name+"开始购买商品");
if(count<=0){
System.out.println(name+"发现商品已售空");
}else{
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
count--;
System.out.println(name+"购买商品完成,剩余库存:"+count);
}
}
}

在这段代码里, this 关键字指向了当前的对象,与在方法上加 synchronize 关键字作用是一样的。

所以明确一个概念: synchronize 代码块锁住的不是代码块,而是 synchronize 后面圆括号中的对象!

定义:对象锁: 让同一个对象的某个方法无法被多个线程并发执行的机制。


题外话:

为什么我要强调同一对象呢?

让我修改一下代码,每次创建线程都创建一个 Shop 对象……

1
2
3
4
5
6
//主方法、主类
for (int i = 0; i < 5; i++) {
//lambda表达式要求表达式中只有静态变量
int finalI = i;
new Thread(() -> new Shop().sell("顾客" + finalI)).start();
}

然后把Shop对象的库存属性改为静态变量,以使所有Shop对象可以共享它。

1
2
3
4
class Shop{
private static int count = 2;
...
}

点击运行,灾难又出现了!

现在的编码需求,变成了让 所有Shop类的对象sell() 方法都无法被同时执行。

继续修改 sell() 方法,既然 count 已经是静态变量,那么我们为什么不把 sell() 方法也改成静态方法呢?

这时候IDE报了一个错,原来在静态方法中不能用 this 关键字(对象都不一定有, this 指定谁去?),那么把它改为 Shop.Class ,类对象就不是对象了?

现在的 sell() 方法:

1
2
3
4
5
6
public static void sell(String name){
synchronized (Shop.class){
....
}
}
}

当然,我们也可以直接把 synchronize 关键字丢回到静态方法上:

1
2
3
public synchronized static void sell(String name){
...
}

定义:类锁: 让同一个类 多个实例对象 的某个方法,都 无法被多个线程并发执行的机制。


基本需求2:线程间通信(Object.wait()/Object.notify())

刚刚我们实现了一家有序出售物品的商店,但是一家商店不能只能出售物品,卖光了怎么办呢?进货。

大致的过程是,当某个顾客线程发现 count<=0 时,挂起所有“顾客”线程,并且通知一个线程去进货,等待进货完成后给所有挂起的“顾客”线程发送通知。

首先我们要在主方法中,单独开启一个进货线程(这个写法是lambda表达式中的方法引用,由于该线程的run方法只执行这一个无参方法,被IDE检测到了提示替换):

1
new Thread(s::purchase).start();

仅仅 synchronized 关键字已经不够用,我们给 Shop 类增加一个字段:

1
private final Object myLock = new Object();

这个字段没有其他意义,仅仅作为一把被别人持有的对象锁而存在。

和上一章不同, 这把对象锁的目的,不再是让这个对象的方法无法被并发执行,而是让其他线程持有它,以便唤醒或挂起这些线程。

注意:

虽然我这里用了 “唤醒”和“挂起”,但我指的并不是 Thread.suspend()Thread.resume()

这一对被废弃了十几年的方法,是属于 Thread 类的,调用 suspend() 在挂起时并不释放这个线程持有的锁,因此极其容易引发死锁;

Object.wait() 会让所有持有这个对象的对象锁的线程阻塞,同时也停止持有这个对象的对象锁。

当调用 Object.notify() 时,会随机取出一个因为 wait() 方法阻塞的线程,让它继续运行的同时重新持有对象锁;

而调用 Object.notifyAll() 时,会让所有之前因为 wait() 方法阻塞的线程解除阻塞,但是注意:只有那个重新持有对象锁的线程才能继续运行。

明白了 wait()notify() 这一对方法后,我们来着手改写 sell() 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
...
while (count <= 0) {
try {
System.out.println(LocalTime.now() + name + "要求了进货");
synchronized (myLock) {
myLock.notify();
}
System.out.println(LocalTime.now() + name + "进货等待中");
this.wait();

} catch (InterruptedException e) {
e.printStackTrace();
}
}
...

在现在的 sell() 方法中,首先把库存不足的判断由 if 改为 while ,这样每个顾客在收到进货完成的通知后,都会重复检查一次库存。

接下来改写条件块的内容:

  • 尝试获取myLock的对象锁;

  • 在其他获取了myLock对象锁,并且被阻塞的线程中,选一个恢复运行(在主方法中实际我们只创建了一个这样的线程,因此这里 notify()notifyAll() 没有什么区别 );

  • 将所有持有当前对象的对象锁的线程阻塞。

接下来是进货方法:

  • 当然进货方法要写死循环,一旦被 sell() 方法恢复运行后,能再次阻塞,等待下一次需要进货的时候;

  • 获取myLock的对象锁,并且开始阻塞;

  • 在被 sell() 方法恢复运行后,将库存+5,然后将所有获取了当前对象的对象锁,并且被阻塞的线程恢复运行(招呼其他顾客继续购物)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
 public void purchase() {
while (true) {
synchronized (myLock) {
try {
System.out.println(LocalTime.now()+"店家等待进货通知");
myLock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(LocalTime.now()+"店家开始进货");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
count += 5;
synchronized (this) {
this.notifyAll();
}
System.out.println(LocalTime.now()+"店家进货完成");
}
}

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
17:12:32.563顾客0开始购买商品
17:12:32.563店家等待进货通知
17:12:33.564顾客0购买商品完成,剩余库存:1
17:12:33.564顾客4开始购买商品
17:12:34.564顾客4购买商品完成,剩余库存:0
17:12:34.564顾客3开始购买商品
17:12:34.564顾客3要求了进货
17:12:34.564顾客3进货等待中
17:12:34.564店家开始进货
17:12:34.565顾客2开始购买商品
17:12:34.565顾客2要求了进货
17:12:34.565顾客2进货等待中
17:12:34.565顾客1开始购买商品
17:12:34.565顾客1要求了进货
17:12:34.565顾客1进货等待中
17:12:36.565店家进货完成
17:12:36.565店家等待进货通知
17:12:37.566顾客1购买商品完成,剩余库存:4
17:12:38.566顾客2购买商品完成,剩余库存:3
17:12:39.567顾客3购买商品完成,剩余库存:2

我们圆满的完成了需求,尽管这个程序会有一个一直等待是否去进货的店家,所以不会直接结束。

基本需求3:线程间通信2(Thread.join())

还是刚才的问题,我们让店家去进货,但是我们不希望多加一个对象,然后折腾当前对象/myLock这两个对象的锁。

我们把在主方法中创建进货线程、并且循环阻塞等待通知,改成在 sell() 方法中创建进货线程、并调用 join() 方法。

顾名思义, join() 方法表示立即阻塞当前线程,并且让被调用 join() 方法的线程“参与”到程序执行中,在被调用 join() 方法的线程执行完后,才恢复之前阻塞的当前进程的运行。

注意, join() 方法必须在 start() 方法调用后调用;如果 join() 方法和 start() 方法中有其他代码, join() 方法会优先执行。

同时我们去掉 purchase() 方法中关于对象锁的语句:

sell() 方法:

1
2
3
4
5
6
7
8
9
10
11
while (count <= 0) {
try {
System.out.println(LocalTime.now() + name + "要求了进货");
Thread a = new Thread(this::purchase);
a.start();
a.join();
System.out.println(LocalTime.now() + name + "进货等待中");
} catch (InterruptedException e) {
e.printStackTrace();
}
}

purchase() 方法现在只剩下了操作 count 以及一些提示:

1
2
3
4
5
6
7
8
9
10
public void purchase() {
System.out.println(LocalTime.now() + "店家开始进货");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
count += 5;
System.out.println(LocalTime.now() + "店家进货完成");
}

运行,这次程序执行完自动结束了,因为不再有一个后台持续阻塞的线程了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
17:54:09.025顾客1开始购买商品
17:54:10.027顾客1购买商品完成,剩余库存:1
17:54:10.027顾客4开始购买商品
17:54:11.027顾客4购买商品完成,剩余库存:0
17:54:11.027顾客3开始购买商品
17:54:11.028顾客3要求了进货
17:54:11.031店家开始进货
17:54:13.032店家进货完成
17:54:13.032顾客3进货等待中
17:54:14.033顾客3购买商品完成,剩余库存:4
17:54:14.033顾客2开始购买商品
17:54:15.033顾客2购买商品完成,剩余库存:3
17:54:15.034顾客0开始购买商品
17:54:16.034顾客0购买商品完成,剩余库存:2

对Java库源码有过分析的可能会知道, join() 方法内部其实是由 Object.wait()/Object.notifyAll() 实现的!

附:本章完整代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package top.mothership;

import java.time.LocalTime;

public class Main {
public static void main(String[] args) {
Shop s = new Shop();
for (int i = 0; i < 5; i++) {
int finalI = i;
new Thread(() -> s.sell("顾客" + finalI)).start();
}
}

}

class Shop {
private int count = 2;


public synchronized void sell(String name) {
System.out.println(LocalTime.now() + name + "开始购买商品");
if (name.contains("2") || name.contains("0")) {
Thread.yield();
}
while (count <= 0) {
try {
System.out.println(LocalTime.now() + name + "要求了进货");
Thread a = new Thread(this::purchase);
a.start();
a.join();
System.out.println(LocalTime.now() + name + "进货等待中");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
count--;
System.out.println(LocalTime.now() + name + "购买商品完成,剩余库存:" + count);
}

public void purchase() {
System.out.println(LocalTime.now() + "店家开始进货");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
count += 5;

System.out.println(LocalTime.now() + "店家进货完成");
}
}

走进Java并发编程01

前言

本系列文章作为学习笔记,记录了博主向并发包进攻的过程。

在此,我假设本文读者已经掌握了Java的基本编码,如果是大学生,至少Java的课程要通过;如果是培训学员,至少需要学完SE部分。虽然影响不大,但是如果对Spring,或者至少Java Web有一定了解和编码经验,应该会对阅读有所帮助(我可能会举一些网站相关的例子)。

本文夹杂大量个人理解,错误不可避免,非常欢迎评论指正。

正文

第一章 :假如没有juc……

2004年9月30日18:00PM,J2SE1.5发布,成为Java语言发展史上的又一里程碑。为了表示该版本的重要性,J2SE1.5更名为Java SE 5.0。——via 维基百科

前置知识:JSR——Java Specification Requests的缩写,意思是Java 规范提案。是指向JCP(Java Community Process)提出新增一个标准化技术规范的正式请求。

JSR166完成(同样也是J2SE1.5发布)之后,Java程序员发现,除了Thread类、Runnable接口外,Java引入了大量的API,极大的丰富了Java在多线程编程的能力。

让我们从已掌握的知识开始,先忘掉java.util.concurrent,复习一下基础的概念:线程/进程/并发/并行,还有我们的老朋友Thread/Runnable/synchronized/volatile。


第一节:进程与线程。

在提到线程和进程之前,推荐一篇有趣的文章:
《我是一个CPU:这个世界慢!死!了! 》
(阅读前需要对计算机的基本组成有一定了解,至少要明白SSD、CPU的L1L2缓存是什么。)

如果将一颗2.6GHz频率,一个逻辑核心的CPU(这个性能在现在可以说是很低了,主流的i7-8700K有6个物理核心,通过Intel的超线程技术让系统认为有12个逻辑核心,最高睿频4.7GHz)执行每个指令的时间放大到一秒,那么从内存中读取1MB 需要7.5天,从SSD上的随机读取需要用4.5天,从SSD读取1MB需要一个月,而HDD的磁盘寻址需要10个月,连续读取1MB需要20个月!

我们设想一个场景。你打开一台一个逻辑核心的电脑,登录QQ,同时打开IDEA开始编写代码,在你眼里QQ和IDEA是同时运行着的程序。但是在CPU的眼里,QQ和IDEA需要CPU来执行的操作其实是轮流着来的,只是它切换的速度太快,你感受不到!

CPU其实一直不断的在加载QQ的上下文(内存、显卡、硬盘等资源)→执行QQ→保存QQ的上下文→加载IDEA的上下文…

好,我们引申出概念:

1.什么是进程?

“计算机中已运行程序的实体,分配资源的基本单位。”

我们编写的程序,只是指令(比如做加法)、数据(比如一个整数1)和组织形式(比如某个数组,是按顺序排的一系列元素)的描述,当我们下达运行它的命令时,才会产生进程。操作系统按进程为单位分配资源,这些资源包括一片内存、操作系统描述符(所谓文件句柄/文件描述符),安全特性以及CPU状态(视是否在运行存储在寄存器/内存中)等。

总结:进程是一个单位,我们下达运行程序命令时,就可以向操作系统申请领取“一个进程的”大礼包,包含了一段时间内的一部分计算机资源。

2.什么是线程?

“操作系统调度的最小单位”。
在此不对OS这门课做展开,不讨论内核态用户态线程。

我们再设想一个场景。你从操作系统领取了大礼包,它包含了一部分的时间(我感觉生命在流逝)和一部分的计算机资源。但是现在你的代码里需要打开硬盘上的某个文件,或者更过分的是等待某个网络请求(还记得150ms在CPU的眼里是12.5年吗?)……

这是一种严重的浪费!在这个礼包、甚至之后的好多礼包给的时间内,你创建的进程什么都没有做,白白的浪费了系统资源,时间一到CPU就切换到了别的程序,这极大的影响了程序的执行效率。

这时候如果我能把进程拆分为一个个部分,由它们共享这些资源;当第一个部分需要等待某个耗时较长的操作时,其他部分也可以在这份时间里利用这份资源,岂不美哉?

总结:线程是一个更小的单位,“一个进程”的资源,可以由多个“线程”共享,免得出现资源浪费。

3.什么是并发,什么是并行?

Erlang之父简单的介绍了它们的区别:

串行是一个队列一台咖啡机,如果有人匹到了一把Dota,过了一个小时打完回来接咖啡,后面的人都得必须等着;
并发是两个队列用一台咖啡机,咖啡机在处理两个队列的人的状态中不断切换,在逻辑上这台咖啡机可以同时处理两个队列;
并行则是两个队列用两台咖啡机,有两台咖啡机在同一个时间点处理两个队列,注意,是真正意义上、物理上的同时,这也是多逻辑核心CPU的模型!