第二节:多线程编程的基本需求。 JUC满足了多线程编程的各种需求,但是丰富的需求也是从简单需求开始的。
1.复习:创建线程 我还记得,当年培训班的SE部分结课作业是实现一个Socket客户端/服务端。 其实从这个角度上来说,某鸟的排课水平并不差;这个作业同时要求掌握Socket库的基本用法,还要求理解和实现BIO模型,也就是“服务端监听客户端连接,每个连接创建一个新线程”。
大致的代码如下(节省篇幅,我省略了继承/实现接口的部分,直接使用lambda表达式):1 2 3 4 5 6 7 8 9 10 11 ServerSocket sc = new ServerSocket (10086 ); while (true ){ Socket socket = serverSocket.accept(); new Thread (() -> { }).start(); }
这样的代码可能大家都很熟悉(如果觉得陌生的话,也可以改写成一个MyThread类,实现Runnable接口并重写Run方法),而且肯定会有人让我用线程池;还请暂且忍耐一下,看完这些“原始”的代码。
基本需求1:同步与线程安全(synchronized) 问题来了,假设我们要在业务逻辑里对某个东西进行操作,例如……购买商品?
我们设计一个商店类,剩余库存为2,当库存为0时显示已售空,同时让线程睡眠1秒以模拟数据库读写等操作:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 class Shop { private int count = 2 ; public void sell (String name) { System.out.println(name+"开始购买商品" ); if (count<=0 ){ System.out.println(name+"发现商品已售空" ); }else { try { Thread.sleep(1000 ); } catch (InterruptedException e) { e.printStackTrace(); } count--; System.out.println(name+"购买商品完成,剩余库存:" +count); } } }
看起来是不是很合理?
对主方法进行修改,专注于线程而不是Socket:
1 2 3 4 5 6 Shop s = new Shop ();for (int i = 0 ; i < 5 ; i++) { int finalI = i; new Thread (() -> s.sell("顾客" + finalI)).start(); }
我们同时建立5个连接,观察一下控制台,结果是什么?
1 2 3 4 5 6 7 8 9 10 线程0开始购买商品 线程3开始购买商品 线程1开始购买商品 线程2开始购买商品 线程4开始购买商品 线程0购买商品完成,剩余库存:0 线程3购买商品完成,剩余库存:0 线程2购买商品完成,剩余库存:-3 线程1购买商品完成,剩余库存:-3 线程4购买商品完成,剩余库存:-3
这……这是一场灾难!5个买家全都购买成功,而库存变成了-3!
(题外话:在高并发秒杀环境中, count 不再是简单的成员变量,而是缓存/数据库的某个值。万一并发处理错误,导致 count 瞬间变成了负数,这时候如果以 count==0 作为判断条件,会导致秒杀无法停止,所以一定要将判断条件改为小于区间。)
这就是一个典型的线程不安全的类。
定义:线程安全:在单个/多个线程环境下都能得到预期运行结果。
究其原因,是由于线程受到操作系统的调度,我们无法直接控制线程何时运行,即使是调节优先级,得到的也只是影响,而不是保证(可以试试把五个线程的优先级排一下看看结果)!
幸运的是,Java语言提供了同步关键字 synchronized ,它是Java对多种锁的封装,根据使用情况不同有不同的表现。
我们把它加到 sell 方法上……
1 2 3 4 5 6 7 8 9 10 线程0开始购买商品 线程0购买商品完成,剩余库存:1 线程3开始购买商品 线程3购买商品完成,剩余库存:0 线程4开始购买商品 线程4发现商品已售空 线程2开始购买商品 线程2发现商品已售空 线程1开始购买商品 线程1发现商品已售空
结论:在将 synchronize 关键字加到某个方法上后,我们可以确保在一个线程进入 这个对象 的这个方法之后,就不会有另一个线程也进入,避免了超售的情况。
这背后实际是线程获取了 这个对象 的锁,在执行完方法后,自动释放了 这个对象 的锁,是不是很智能?
与此相同的用法还有: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public void sell (String name) { synchronized (this ){ System.out.println(name+"开始购买商品" ); if (count<=0 ){ System.out.println(name+"发现商品已售空" ); }else { try { Thread.sleep(1000 ); } catch (InterruptedException e) { e.printStackTrace(); } count--; System.out.println(name+"购买商品完成,剩余库存:" +count); } } }
在这段代码里, this 关键字指向了当前的对象,与在方法上加 synchronize 关键字作用是一样的。
所以明确一个概念: synchronize 代码块锁住的不是代码块,而是 synchronize 后面圆括号中的对象!
定义:对象锁: 让同一个对象的某个方法无法被多个线程并发执行的机制。
题外话:
为什么我要强调同一对象呢?
让我修改一下代码,每次创建线程都创建一个 Shop 对象……
1 2 3 4 5 6 for (int i = 0 ; i < 5 ; i++) { int finalI = i; new Thread (() -> new Shop ().sell("顾客" + finalI)).start(); }
然后把Shop对象的库存属性改为静态变量,以使所有Shop对象可以共享它。
1 2 3 4 class Shop { private static int count = 2 ; ... }
点击运行,灾难又出现了!
现在的编码需求,变成了让 所有Shop类的对象 的 sell() 方法都无法被同时执行。
继续修改 sell() 方法,既然 count 已经是静态变量,那么我们为什么不把 sell() 方法也改成静态方法呢?
这时候IDE报了一个错,原来在静态方法中不能用 this 关键字(对象都不一定有, this 指定谁去?),那么把它改为 Shop.Class ,类对象就不是对象了?
现在的 sell() 方法:
1 2 3 4 5 6 public static void sell (String name) { synchronized (Shop.class){ .... } } }
当然,我们也可以直接把 synchronize 关键字丢回到静态方法上:
1 2 3 public synchronized static void sell (String name) {... }
定义:类锁: 让同一个类 多个实例对象 的某个方法,都 无法被多个线程并发执行的机制。
基本需求2:线程间通信(Object.wait()/Object.notify()) 刚刚我们实现了一家有序出售物品的商店,但是一家商店不能只能出售物品,卖光了怎么办呢?进货。
大致的过程是,当某个顾客线程发现 count<=0 时,挂起所有“顾客”线程,并且通知一个线程去进货,等待进货完成后给所有挂起的“顾客”线程发送通知。
首先我们要在主方法中,单独开启一个进货线程(这个写法是lambda表达式中的方法引用,由于该线程的run方法只执行这一个无参方法,被IDE检测到了提示替换):
1 new Thread (s::purchase).start();
仅仅 synchronized 关键字已经不够用,我们给 Shop 类增加一个字段:
1 private final Object myLock = new Object ();
这个字段没有其他意义,仅仅作为一把被别人持有的对象锁而存在。
和上一章不同, 这把对象锁的目的,不再是让这个对象的方法无法被并发执行,而是让其他线程持有它,以便唤醒或挂起这些线程。
注意:
虽然我这里用了 “唤醒”和“挂起”,但我指的并不是 Thread.suspend() 和 Thread.resume() 。
这一对被废弃了十几年的方法,是属于 Thread 类的,调用 suspend() 在挂起时并不释放这个线程持有的锁,因此极其容易引发死锁;
而 Object.wait() 会让所有持有这个对象的对象锁的线程阻塞,同时也停止持有这个对象的对象锁。
当调用 Object.notify() 时,会随机取出一个因为 wait() 方法阻塞的线程,让它继续运行的同时重新持有对象锁;
而调用 Object.notifyAll() 时,会让所有之前因为 wait() 方法阻塞的线程解除阻塞,但是注意:只有那个重新持有对象锁的线程才能继续运行。
明白了 wait() 和 notify() 这一对方法后,我们来着手改写 sell() 方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 ... while (count <= 0 ) { try { System.out.println(LocalTime.now() + name + "要求了进货" ); synchronized (myLock) { myLock.notify(); } System.out.println(LocalTime.now() + name + "进货等待中" ); this .wait(); } catch (InterruptedException e) { e.printStackTrace(); } } ...
在现在的 sell() 方法中,首先把库存不足的判断由 if 改为 while ,这样每个顾客在收到进货完成的通知后,都会重复检查一次库存。
接下来改写条件块的内容:
接下来是进货方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public void purchase () { while (true ) { synchronized (myLock) { try { System.out.println(LocalTime.now()+"店家等待进货通知" ); myLock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(LocalTime.now()+"店家开始进货" ); try { Thread.sleep(2000 ); } catch (InterruptedException e) { e.printStackTrace(); } count += 5 ; synchronized (this ) { this .notifyAll(); } System.out.println(LocalTime.now()+"店家进货完成" ); } }
运行结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 17:12:32.563顾客0开始购买商品 17:12:32.563店家等待进货通知 17:12:33.564顾客0购买商品完成,剩余库存:1 17:12:33.564顾客4开始购买商品 17:12:34.564顾客4购买商品完成,剩余库存:0 17:12:34.564顾客3开始购买商品 17:12:34.564顾客3要求了进货 17:12:34.564顾客3进货等待中 17:12:34.564店家开始进货 17:12:34.565顾客2开始购买商品 17:12:34.565顾客2要求了进货 17:12:34.565顾客2进货等待中 17:12:34.565顾客1开始购买商品 17:12:34.565顾客1要求了进货 17:12:34.565顾客1进货等待中 17:12:36.565店家进货完成 17:12:36.565店家等待进货通知 17:12:37.566顾客1购买商品完成,剩余库存:4 17:12:38.566顾客2购买商品完成,剩余库存:3 17:12:39.567顾客3购买商品完成,剩余库存:2
我们圆满的完成了需求,尽管这个程序会有一个一直等待是否去进货的店家,所以不会直接结束。
基本需求3:线程间通信2(Thread.join()) 还是刚才的问题,我们让店家去进货,但是我们不希望多加一个对象,然后折腾当前对象/myLock这两个对象的锁。
我们把在主方法中创建进货线程、并且循环阻塞等待通知,改成在 sell() 方法中创建进货线程、并调用 join() 方法。
顾名思义, join() 方法表示立即阻塞当前线程,并且让被调用 join() 方法的线程“参与”到程序执行中,在被调用 join() 方法的线程执行完后,才恢复之前阻塞的当前进程的运行。
注意, join() 方法必须在 start() 方法调用后调用;如果 join() 方法和 start() 方法中有其他代码, join() 方法会优先执行。
同时我们去掉 purchase() 方法中关于对象锁的语句:
sell() 方法:
1 2 3 4 5 6 7 8 9 10 11 while (count <= 0 ) { try { System.out.println(LocalTime.now() + name + "要求了进货" ); Thread a = new Thread (this ::purchase); a.start(); a.join(); System.out.println(LocalTime.now() + name + "进货等待中" ); } catch (InterruptedException e) { e.printStackTrace(); } }
purchase() 方法现在只剩下了操作 count 以及一些提示:
1 2 3 4 5 6 7 8 9 10 public void purchase () { System.out.println(LocalTime.now() + "店家开始进货" ); try { Thread.sleep(2000 ); } catch (InterruptedException e) { e.printStackTrace(); } count += 5 ; System.out.println(LocalTime.now() + "店家进货完成" ); }
运行,这次程序执行完自动结束了,因为不再有一个后台持续阻塞的线程了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 17:54:09.025顾客1开始购买商品 17:54:10.027顾客1购买商品完成,剩余库存:1 17:54:10.027顾客4开始购买商品 17:54:11.027顾客4购买商品完成,剩余库存:0 17:54:11.027顾客3开始购买商品 17:54:11.028顾客3要求了进货 17:54:11.031店家开始进货 17:54:13.032店家进货完成 17:54:13.032顾客3进货等待中 17:54:14.033顾客3购买商品完成,剩余库存:4 17:54:14.033顾客2开始购买商品 17:54:15.033顾客2购买商品完成,剩余库存:3 17:54:15.034顾客0开始购买商品 17:54:16.034顾客0购买商品完成,剩余库存:2
对Java库源码有过分析的可能会知道, join() 方法内部其实是由 Object.wait()/Object.notifyAll() 实现的!
附:本章完整代码: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 package top.mothership;import java.time.LocalTime;public class Main { public static void main (String[] args) { Shop s = new Shop (); for (int i = 0 ; i < 5 ; i++) { int finalI = i; new Thread (() -> s.sell("顾客" + finalI)).start(); } } } class Shop { private int count = 2 ; public synchronized void sell (String name) { System.out.println(LocalTime.now() + name + "开始购买商品" ); if (name.contains("2" ) || name.contains("0" )) { Thread.yield (); } while (count <= 0 ) { try { System.out.println(LocalTime.now() + name + "要求了进货" ); Thread a = new Thread (this ::purchase); a.start(); a.join(); System.out.println(LocalTime.now() + name + "进货等待中" ); } catch (InterruptedException e) { e.printStackTrace(); } } try { Thread.sleep(1000 ); } catch (InterruptedException e) { e.printStackTrace(); } count--; System.out.println(LocalTime.now() + name + "购买商品完成,剩余库存:" + count); } public void purchase () { System.out.println(LocalTime.now() + "店家开始进货" ); try { Thread.sleep(2000 ); } catch (InterruptedException e) { e.printStackTrace(); } count += 5 ; System.out.println(LocalTime.now() + "店家进货完成" ); } }