进程与线程
进程
- 程序由指令和数据组成,但这些指令要运行,数据要读写,就必须将指令加载至 CPU,数据加载至内存。在指令运行过程中还需要用到磁盘、网络等设备。进程就是用来加载指令、管理内存、管理 IO 的
- 当一个程序被运行,从磁盘加载这个程序的代码至内存,这时就开启了一个进程。
- 进程就可以视为程序的一个实例。大部分程序可以同时运行多个实例进程(例如记事本、画图、浏览器等),也有的程序只能启动一个实例进程(例如网易云音乐、360 安全卫士等)
线程
- 一个进程之内可以分为一到多个线程。
- 一个线程就是一个指令流,将指令流中的一条条指令以一定的顺序交给 CPU 执行
- Java 中,线程作为最小调度单位,进程作为资源分配的最小单位。 在 windows 中进程是不活动的,只是作为线程的容器
两者对比
- 进程基本上相互独立的,而线程存在于进程内,是进程的一个子集
- 进程拥有共享的资源,如内存空间等,供其内部的线程共享
- 进程间通信较为复杂
- 同一台计算机的进程通信称为 IPC(Inter-process communication)
- 不同计算机之间的进程通信,需要通过网络,并遵守共同的协议,例如 HTTP
- 线程通信相对简单,因为它们共享进程内的内存,一个例子是多个线程可以访问同一个共享变量
- 线程更轻量,线程上下文切换成本一般上要比进程上下文切换低
并发与并行
单核 cpu 下,线程实际还是 串行执行的。操作系统中有一个组件叫做任务调度器,将 cpu 的时间片(windows下时间片最小约为 15 毫秒)分给不同的程序使用,只是由于 cpu 在线程间(时间片很短)的切换非常快,人类感觉是 同时运行的 。总结为一句话就是: 微观串行,宏观并行 ,一般会将这种 线程轮流使用 CPU 的做法称为并发
cpu | 时间片1 | 时间片2 | 时间片3 |
---|---|---|---|
core | 线程1 | 线程2 | 线程3 |
多核 cpu下,每个 核(core)都可以调度运行线程,这时候线程可以是并行的。
cpu | 时间片1 | 时间片2 | 时间片3 |
---|---|---|---|
core | 线程1 | 线程2 | 线程3 |
core | 线程2 | 线程3 | 线程1 |
- 并发(concurrent)是同一时间应对(dealing with)多件事情的能力
- 并行(parallel)是同一时间动手做(doing)多件事情的能力
同步和异步
以调用方角度来讲,如果需要等待结果返回,才能继续运行就是同步,不需要等待结果返回,就能继续运行就是异步
多核cpu才能够提高效率,单核仍然是轮流执行,还多了线程切换的时间
- 单核 cpu 下,多线程不能实际提高程序运行效率,只是为了能够在不同的任务之间切换,不同线程轮流使用cpu ,不至于一个线程总占用 cpu,别的线程没法干活
- 多核 cpu 可以并行跑多个线程,但能否提高程序运行效率还是要分情况的,有些任务,经过精心设计,将任务拆分,并行执行,当然可以提高程序的运行效率。但不是所有计算任务都能拆分(参考后文的【阿姆达尔定律】)也不是所有任务都需要拆分,任务的目的如果不同,谈拆分和效率没啥意义
- IO 操作不占用 cpu,只是我们一般拷贝文件使用的是【阻塞 IO】,这时相当于线程虽然不用 cpu,但需要一直等待 IO 结束,没能充分利用线程。所以才有后面的【非阻塞 IO】和【异步 IO】优化
Java线程
本章内容
- 创建和运行线程
- 查看线程
- 线程 API
- 线程状态
创建核运行线程
// 最常规的方式,不推荐
new Thread("thread1"){
@Override
public void run() {
System.out.println("我是另外一个线程");
}
}.start();
System.out.println("你好啊啊哈哈哈Z");
TimeUnit.SECONDS.sleep(1);
System.out.println("啊哈哈哈");
//线程和任务分开,更加灵活
Runnable runable = ()->log.info("我是b线程");
new Thread(runable,"t2").start();
log.info("我是主线程");
FutureTask<String> futureTask = new FutureTask<>(()->{
log.info("我是另外一个线程");
TimeUnit.SECONDS.sleep(3);
return "啊哈哈哈";
});
new Thread(futureTask,"t1").start();
log.info("我是主线程");
// 间接实现了future接口,所以可以
log.info(futureTask.get());
线程上下文切换
因为以下一些原因导致 cpu 不再执行当前的线程,转而执行另一个线程的代码
切换的原因
- 线程的 cpu 时间片用完
- 垃圾回收
- 有更高优先级的线程需要运行
- 线程自己调用了 sleep、yield、wait、join、park、synchronized、lock 等方法
当 Context Switch 发生时,需要由操作系统保存当前线程的状态,并恢复另一个线程的状态,Java 中对应的概念 - 就是程序计数器(Program Counter Register),它的作用是记住下一条 jvm 指令的执行地址,是线程私有的
- 状态包括程序计数器、虚拟机栈中每个栈帧的信息,如局部变量、操作数栈、返回地址等
- Context Switch 频繁发生会影响性能
相关方法解析
run和start
- 直接调用 run 是在主线程中执行了 run,没有启动新的线程
- 使用 start 是启动新的线程,通过新的线程间接执行 run 中的代码
sleep和yield
- 调用 sleep 会让当前线程从 Running 进入 Timed Waiting 状态(阻塞),此时任务调度器不会分配时间片给该线程
- 其它线程可以使用 interrupt 方法打断正在睡眠的线程,这时 sleep 方法会抛出 InterruptedException
- 睡眠结束后的线程未必会立刻得到执行
- 建议用 TimeUnit 的 sleep 代替 Thread 的 sleep 来获得更好的可读性
yield意思为礼让一下 - 调用 yield 会让当前线程从 Running 进入 Runnable 就绪状态,然后调度执行线程,此时任务调度器可以分配时间片给该线程
- 具体的实现依赖于操作系统的任务调度器
可以通过thread.setPriority(Thread.MAX_PRIORITY)
设置线程优先级,在cpu繁忙的时候,设置了优先级会有更大的机会优先执行,空闲的时候没有什么作用
Runnable task1 = () -> {
int count = 0;
for (; ; ) {
System.out.println("---->1 " + count++);
}
};
Runnable task2 = () -> {
int count = 0;
for (; ; ) {
// Thread.yield();
System.out.println(" ---->2 " + count++);
}
};
Thread t1 = new Thread(task1, "t1");
Thread t2 = new Thread(task2, "t2");
// t1.setPriority(Thread.MIN_PRIORITY);
// t2.setPriority(Thread.MAX_PRIORITY);
t1.start();
t2.start();
sleep可以避免cpu空转导致cpu被占满
join方法详解
static int r = 10;
@Test
public void test() throws InterruptedException {
Thread thread = new Thread(() -> {
log.info("线程开始更改r变量");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
r = 100;
});
thread.start();
thread.join();
log.info("r的值为{}", r);
}
如下代码,如果不实用join,此时r的直还没有更改,显示的还是更改前的数值。
join
的作用为等待某个线程运行结束
多个join情况
@Test
public void test() throws InterruptedException {
Thread thread = new Thread(() -> {
try {
log.info("t1开始");
TimeUnit.SECONDS.sleep(1);
log.info("t1结束");
} catch (InterruptedException e) {
e.printStackTrace();
}
},"t1");
Thread thread1 = new Thread(() -> {
try {
log.info("t2开始");
TimeUnit.SECONDS.sleep(2);
log.info("t2结束");
} catch (InterruptedException e) {
e.printStackTrace();
}
},"t2");
log.info("主线程开始");
thread.start();
thread1.start();
thread.join();
thread1.join();
log.info("主线程结束");
}
相关结果
只需要等待两秒
如果颠倒两个join,示意图如下
有时效的join
Thread thread = new Thread(() -> {
try {
log.info("t1开始");
TimeUnit.SECONDS.sleep(2);
log.info("t1结束");
} catch (InterruptedException e) {
e.printStackTrace();
}
},"t1");
log.info("主线程开始");
thread.start();
thread.join(1);
log.info("主线程结束");
结果如下
主线程结束那么就结束了。如果没等够结束继续向下运行。
interrupt 方法详解
sleep,join,wait(后面介绍)会使该线程处于阻塞状态,处于改状态的线程cpu将不会分配时间片给该线程,这也是sleep可以降低cpu的使用率的原因。处于阻塞状态下的线程可以被打断,此时会抛出异常。打断处于阻塞状态下的线程,此时不会处理打断状态,打断正常运行下的线程,打断标记会置为真,可以用来优雅的停止线程
@Test
public void test() {
Thread thread = new Thread(() -> {
sleep(2);
},"t1");
log.info("主线程开始");
thread.start();
sleep(1);
thread.interrupt();
log.info(thread.isInterrupted()+"");
log.info("主线程结束");
}
需要在线程中自己处理打断的状态,代码如下
两阶段终止模式(在t1线程中优雅的停止t2线程)
Thread thread = new Thread(() -> {
while (true) {
try {
if (Thread.currentThread().isInterrupted()){
log.info("我被打断了(▰˘︹˘▰)");
break;
}
log.info("执行监控");
sleep(2);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
},"t1");
thread.start();
sleep(5);
log.info("主线程打断监控线程");
thread.interrupt();
sleep(3);
log.info("主线程结束");
@Test
public void test() throws InterruptedException {
// 处于这个状态也是阻塞
// 当被打算后打断标志为真
Thread thread = new Thread(()->{
log.info("开始打断");
LockSupport.park();
log.info("打断标志{}",Thread.currentThread().isInterrupted());
// 当打断标志为真的时候再次执行park时效,打断标志为加的时候才有效,
// Thread.interrupted()可以清除打断标志
LockSupport.park();
log.info("啦啦啦");
});
thread.start();
sleep(2);
thread.interrupt();
sleep(1);
}
守护线程
我在本地实验没有达到这个效果。main线程结束后就没有下文了。
线程的状态
- NEW 线程刚被创建,但是还没有调用 start() 方法
- RUNNABLE 当调用了 start() 方法之后,注意,Java API 层面的 RUNNABLE 状态涵盖了 操作系统 层面的
- 【可运行状态】、【运行状态】和【阻塞状态】(由于 BIO 导致的线程阻塞,在 Java 里无法区分,仍然认为是可运行)
- BLOCKED , WAITING , TIMED_WAITING 都是 Java API 层面对【阻塞状态】的细分,后面会在状态转换一节详述
- TERMINATED 当线程代码运行结束
Thread t1 = new Thread("t1") {
@Override
public void run() {
log.debug("running...");
}
};
Thread t2 = new Thread("t2") {
@Override
public void run() {
while(true) { // runnable
}
}
};
t2.start();
Thread t3 = new Thread("t3") {
@Override
public void run() {
log.debug("running...");
}
};
t3.start();
Thread t4 = new Thread("t4") {
@Override
public void run() {
synchronized (TestState.class) {
try {
Thread.sleep(1000000); // timed_waiting
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
};
t4.start();
Thread t5 = new Thread("t5") {
@Override
public void run() {
try {
t2.join(); // waiting
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
t5.start();
Thread t6 = new Thread("t6") {
@Override
public void run() {
synchronized (TestState.class) { // blocked
try {
Thread.sleep(1000000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
};
t6.start();
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("t1 state {}", t1.getState());
log.debug("t2 state {}", t2.getState());
log.debug("t3 state {}", t3.getState());
log.debug("t4 state {}", t4.getState());
log.debug("t5 state {}", t5.getState());
log.debug("t6 state {}", t6.getState());
System.in.read();
线程的6种状态代码示例如上
多线程统筹的事例
Thread t1 = new Thread(()->{
log.info("洗水壶");
sleep(1);
log.info("烧开水");
sleep(15);
},"t1");
Thread t2 = new Thread(()->{
log.info("洗茶壶");
sleep(1);
log.info("洗茶杯");
sleep(2);
log.info("拿茶叶");
sleep(1);
try {
t1.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("泡茶");
},"t2");
t1.start();
t2.start();
需要花费16秒
共享模型之管程
本章内容
- 共享问题
- synchronized
- 线程安全分析
- Monitor
- wait/notify
- 线程状态转换
- 活跃性
- Lock
多线程操作共享资源此时会有线程安全的问题
Thread t1 = new Thread(()->{
for (int i = 0; i < 5000; i++) {
n++;
}
},"t1");
Thread t2 = new Thread(()->{
for (int i = 0; i < 500; i++) {
n--;
}
},"t2");
t1.start();
t2.start();
t2.join();
t1.join();
log.info("最终n的结果为{}",n);
发现最终n的结果不是0
在多个线程对共享资源读写操作时发生指令交错,就会出现问题
一段代码块内如果存在对共享资源的多线程读写操作,称这段代码块为临界区
既有读又有写
多个线程在临界区内执行,由于代码的执行序列不同而导致结果无法预测,称之为发生了竞态条件
为了避免临界区的竞态条件发生,有多种手段可以达到目的。
- 阻塞式的解决方案:synchronized,Lock
- 非阻塞式的解决方案:原子变量
当一个对象拥有锁的时候,其他线程会进入到阻塞状态,获取锁的执行完毕会唤醒拿不到该锁阻塞的线程
synchronized (Demo01.class){n++;}
- synchronized(对象) 中的对象,可以想象为一个房间(room),有唯一入口(门)房间只能一次进入一人进行计算,线程 t1,t2 想象成两个人
- 当线程 t1 执行到 synchronized(room) 时就好比 t1 进入了这个房间,并锁住了门拿走了钥匙,在门内执行count++ 代码
- 这时候如果 t2 也运行到了 synchronized(room) 时,它发现门被锁住了,只能在门外等待,发生了上下文切换,阻塞住了
- 这中间即使 t1 的 cpu 时间片不幸用完,被踢出了门外(不要错误理解为锁住了对象就能一直执行下去哦),
- 这时门还是锁住的,t1 仍拿着钥匙,t2 线程还在阻塞状态进不来,只有下次轮到 t1 自己再次获得时间片时才能开门进入
- 当 t1 执行完 synchronized{} 块内的代码,这时候才会从 obj 房间出来并解开门上的锁,唤醒t2 线程把钥匙给他。t2 线程这时才可以进入 obj 房间,锁住了门拿上钥匙,执行它的 count-- 代码
synchronized 实际是用对象锁保证了临界区内代码的原子性,临界区内的代码对外是不可分割的,不会被线程切换所打断。
改造
更符合面相对象线程操作资源类的思想
@Test
public void test() throws InterruptedException {
Room room = new Room();
Thread t1 = new Thread(()->{
for (int i = 0; i < 5000; i++) {
room.increment();
}
},"t1");
Thread t2 = new Thread(()->{
for (int i = 0; i < 500; i++) {
room.decrement();
}
},"t2");
t1.start();
t2.start();
t2.join();
t1.join();
log.info("最终n的结果为{}",room.get());
}
class Room {
int value = 0;
public void increment() {
synchronized (this) {
value++;
}
}
public void decrement() {
synchronized (this) {
value--;
}
}
public int get() {
synchronized (this) {
return value;
}
}
public Room() {
}
}
静态方法中synchronized等价于锁是该类,非静态方法锁是该对象的实例
线程安全问题
- 如果它们没有共享,则线程安全
- 如果它们被共享了,根据它们的状态是否能够改变,又分两种情况
- 如果只有读操作,则线程安全
- 如果有读写操作,则这段代码是临界区,需要考虑线程安全
- 局部变量是线程安全的
- 但局部变量引用的对象则未必
- 如果该对象没有逃离方法的作用访问,它是线程安全的
- 如果该对象逃离方法的作用范围,需要考虑线程安全
成员变量的线程安全问题
class ThreadUnsafe {
ArrayList<String> list = new ArrayList<>();
public void method1(int loopNumber) {
for (int i = 0; i < loopNumber; i++) {
method2();
method3();
}
}
private void method2() {
list.add("1");
}
private void method3() {
list.remove(0);
}
原因:多个线程添加的时候有可能只添加成功一次,多个线程执行删除的时候执行了多次
list.add
不是原子操作
常见的线程安全类
- String
- Integer
- StringBuffer
- Random
- Vector
- Hashtable
- java.util.concurrent 包下的类
线程安全指的是多个线程调用他们类的方法是安全的。即可以理解为他们的方法都是原子性的。但是组合起来不是原子的
开闭原则中不想让子类覆盖的方法可以将该类设置成为final,如string
多人买票线程代码分析
public class ExerciseSell {
public static void main(String[] args) throws InterruptedException {
// 模拟多人买票
TicketWindow window = new TicketWindow(1000);
// 所有线程的集合
List<Thread> threadList = new ArrayList<>();
// 卖出的票数统计
List<Integer> amountList = new Vector<>();
for (int i = 0; i < 2000; i++) {
Thread thread = new Thread(() -> {
// 买票
int amount = window.sell(random(5));
// 统计买票数
amountList.add(amount);
});
threadList.add(thread);
thread.start();
}
for (Thread thread : threadList) {
thread.join();
}
// 统计卖出的票数和剩余票数
log.debug("余票:{}",window.getCount());
log.debug("卖出的票数:{}", amountList.stream().mapToInt(i-> i).sum());
}
// Random 为线程安全
static Random random = new Random();
// 随机 1~5
public static int random(int amount) {
return random.nextInt(amount) + 1;
}
}
// 售票窗口
class TicketWindow {
private int count;
public TicketWindow(int count) {
this.count = count;
}
// 获取余票数量
public int getCount() {
return count;
}
// 售票
public synchronized int sell(int amount) {
if (this.count >= amount) {
this.count -= amount;
return amount;
} else {
return 0;
}
}
}
下面的这个由于涉及到两个对象,因此需要锁住该类
@Slf4j(topic = "c.ExerciseTransfer")
public class ExerciseTransfer {
public static void main(String[] args) throws InterruptedException {
Account a = new Account(1000);
Account b = new Account(1000);
Thread t1 = new Thread(() -> {
for (int i = 0; i < 1000; i++) {
a.transfer(b, randomAmount());
}
}, "t1");
Thread t2 = new Thread(() -> {
for (int i = 0; i < 1000; i++) {
b.transfer(a, randomAmount());
}
}, "t2");
t1.start();
t2.start();
t1.join();
t2.join();
// 查看转账2000次后的总金额
log.debug("total:{}", (a.getMoney() + b.getMoney()));
}
// Random 为线程安全
static Random random = new Random();
// 随机 1~100
public static int randomAmount() {
return random.nextInt(100) + 1;
}
}
// 账户
class Account {
private int money;
public Account(int money) {
this.money = money;
}
public int getMoney() {
return money;
}
public void setMoney(int money) {
this.money = money;
}
// 转账
public void transfer(Account target, int amount) {
synchronized(Account.class) {
if (this.money >= amount) {
this.setMoney(this.getMoney() - amount);
target.setMoney(target.getMoney() + amount);
}
}
}
}
对象头的相关信息
intege占用空间分析,一个int占用4个字节,然后一个包装对象需要包含对象头核对象值,对象头需要占用8个字节,int中一个包装类型是普通类型的4倍
obj中的对象头里记录着对象的锁的地址
后面synchronized的原理感觉很复杂,后面再看
wait 和notify
@Test
public void test() throws InterruptedException {
Object lock = new Object();
new Thread(()->{
synchronized (lock) {
log.info("我睡眠了");
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("我被唤醒了");
}
},"t1").start();
new Thread(()->{
synchronized (lock) {
log.info("我睡眠了");
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("我被唤醒了");
}
},"t2").start();
TimeUnit.SECONDS.sleep(2);
synchronized (lock){
log.info("我随机唤醒一个");
lock.notify();
}
}
wait和notify的正确姿势
public static void main(String[] args) {
new Thread(() -> {
synchronized (room) {
log.debug("有烟没?[{}]", hasCigarette);
// 用 notifyAll 仅解决某个线程的唤醒问题,但使用 if + wait 判断仅有一次机会,一旦条件不成立,就没有重新判断的机会了
while (!hasCigarette) {
log.debug("没烟,先歇会!");
try {
room.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("有烟没?[{}]", hasCigarette);
if (hasCigarette) {
log.debug("可以开始干活了");
} else {
log.debug("没干成活...");
}
}
}, "小南").start();
new Thread(() -> {
synchronized (room) {
Thread thread = Thread.currentThread();
log.debug("外卖送到没?[{}]", hasTakeout);
while (!hasTakeout) {
log.debug("没外卖,先歇会!");
try {
room.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("外卖送到没?[{}]", hasTakeout);
if (hasTakeout) {
log.debug("可以开始干活了");
} else {
log.debug("没干成活...");
}
}
}, "小女").start();
sleep(1);
new Thread(() -> {
synchronized (room) {
hasTakeout = true;
log.debug("外卖到了噢!");
room.notifyAll();
}
}, "送外卖的").start();
}
保护性暂停
- 有一个结果需要从一个线程传递到另一个线程,让他们关联同一个 GuardedObject
- 如果有结果不断从一个线程到另一个线程那么可以使用消息队列(见生产者/消费者)
- JDK 中,join 的实现、Future 的实现,采用的就是此模式
- 因为要等待另一方的结果,因此归类到同步模式
主线程等待另外一个线程获取结果
public class Demo01 {
public static void main(String[] args) throws InterruptedException {
GuardObject guardObject = new GuardObject();
new Thread(()->{
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("获取到了结果");
guardObject.complete("hello");
}).start();
log.info("主线程等待结果");
log.info(""+guardObject.get());
}
}
class GuardObject{
private Object response;
private final Object lock = new Object();
public Object get() throws InterruptedException {
synchronized (lock) {
while (Objects.isNull(response)){
lock.wait();
}
return response;
}
}
public void complete(Object response) {
synchronized (lock) {
this.response = response;
lock.notifyAll();
}
}
}
join底层就是应用该线程进行操作
带超时时间的等待
多任务的
相关事例
@Data
@Slf4j
public class Demo01 {
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 3; i++) {
new People().start();
}
TimeUnit.SECONDS.sleep(2);
for (Integer id : Mailboxes.getIds()) {
new Postman(id, "内容" + id).start();
}
}
}
@Slf4j
class People extends Thread{
@Override
public void run() {
GuardedObject guardedObject = Mailboxes.createGuardedObject();
log.info("开始收信 id:{}", guardedObject.getId());
Object mail = guardedObject.get(5000);
log.info("收到信 id:{}, 内容:{}", guardedObject.getId(), mail);
}
}
@Slf4j
class Postman extends Thread {
private int id;
private String mail;
public Postman(int id, String mail) {
this.id = id;
this.mail = mail;
}
@Override
public void run() {
GuardedObject guardedObject = Mailboxes.getGuardedObject(id);
log.info("送信 id:{}, 内容:{}", id, mail);
guardedObject.complete(mail);
}
}
class Mailboxes {
private static Map<Integer, GuardedObject> boxes = new Hashtable<>();
private static int id = 1;
// 产生唯一 id
private static synchronized int generateId() {
return id++;
}
public static GuardedObject getGuardedObject(int id) {
return boxes.remove(id);
}
public static GuardedObject createGuardedObject() {
GuardedObject go = new GuardedObject(generateId());
boxes.put(go.getId(), go);
return go;
}
public static Set<Integer> getIds() {
return boxes.keySet();
}
}
// 增加超时效果
class GuardedObject {
// 标识 Guarded Object
private int id;
public GuardedObject(int id) {
this.id = id;
}
public int getId() {
return id;
}
// 结果
private Object response;
// 获取结果
// timeout 表示要等待多久 2000
public Object get(long timeout) {
synchronized (this) {
// 开始时间 15:00:00
long begin = System.currentTimeMillis();
// 经历的时间
long passedTime = 0;
while (Objects.isNull(response)) {
// 这一轮循环应该等待的时间
long waitTime = timeout - passedTime;
// 经历的时间超过了最大等待时间时,退出循环
if (waitTime <= 0) {
break;
}
try {
this.wait(waitTime); // 虚假唤醒 15:00:01
} catch (InterruptedException e) {
e.printStackTrace();
}
// 求得经历时间
passedTime = System.currentTimeMillis() - begin; // 15:00:02 1s
}
return response;
}
}
// 产生结果
public void complete(Object response) {
synchronized (this) {
// 给结果成员变量赋值
this.response = response;
this.notifyAll();
}
}
}
以上是生产者和消费者一一对应,生产的消息回立即被消费
生产者和消费者
public class Demo02 {
public static void main(String[] args) throws InterruptedException {
MessageQueue messageQueue = new MessageQueue(2);
for (int i = 0; i < 3; i++) {
new Thread(()->{
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info( messageQueue.take().toString());
},"消费者"+i).start();
}
for (int i = 0; i < 5; i++) {
int id = i;
new Thread(()->{
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("防止的id为"+id);
messageQueue.put(new Message(id,"消息"+id));
},"生产者"+i).start();
}
}
}
@Data
class Message {
private int id;
private Object message;
public Message(int id, Object message) {
this.id = id;
this.message = message;
}
public int getId() {
return id;
}
public Object getMessage() {
return message;
}
}
@Slf4j
class MessageQueue {
private LinkedList<Message> queue;
private int capacity;
public MessageQueue(int capacity) {
this.capacity = capacity;
queue = new LinkedList<>();
}
public Message take() {
synchronized (queue) {
while (queue.isEmpty()){
log.info("队列为空,等待...");
try {
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.notifyAll();
return queue.removeFirst();
}
}
public void put(Message message) {
synchronized (queue) {
while (queue.size()==capacity){
log.info("队列满了,等待...");
try {
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.addLast(message);
queue.notifyAll();
}
}
}
也可以消费者只有一个,一直循环
park和unpark
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(()->{
log.info("我被park了");
LockSupport.park();
log.info("我又被unpark了");
},"t1");
t1.start();
TimeUnit.SECONDS.sleep(2);
LockSupport.unpark(t1);
}
先unpark然后再park的效果
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(()->{
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("我被park了");
LockSupport.park();
log.info("我又被unpark了");
},"t1");
t1.start();
TimeUnit.SECONDS.sleep(1);
LockSupport.unpark(t1);
此时日志输出
- park & unpark 是以线程为单位来【阻塞】和【唤醒】线程,而 notify 只能随机唤醒一个等待线程,notifyAll
- 是唤醒所有等待线程,就不那么【精确】park & unpark 可以先 unpark,而 wait & notify 不能先 notify
线程状态转换
- new->runnable 调用start方法
- RUNNABLE <--> WAITING t 线程用 synchronized(obj) 获取了对象锁后 调用
obj.wait()
方法时,t 线程从 RUNNABLE --> WAITING 调用 obj.notify() , obj.notifyAll() , t.interrupt() 时 竞争锁成功,t 线程从 WAITING --> RUNNABLE 竞争锁失败,t 线程从 WAITING --> BLOCKED当前线程调用t.join()
方法时,当前线程从 RUNNABLE --> WAITING注意是当前线程在t 线程对象的监视器上等待t 线程运行结束,或调用了当前线程的 interrupt() 时,当前线程从 WAITING --> RUNNABLE 当前线程调用 LockSupport.park() 方法会让当前线程从 RUNNABLE --> WAITING 调LockSupport.unpark
(目标线程) 或调用了线程 的 interrupt() ,会让目标线程从WAITING -->RUNNABLE - RUNNABLE <--> TIMED_WAITING
- RUNNABLE <--> BLOCKED t 线程用 synchronized(obj) 获取了对象锁时如果竞争失败,从 RUNNABLE --> BLOCKED持 obj 锁线程的同步代码块执行完毕,会唤醒该对象上所有 BLOCKED 的线程重新竞争,如果其中 t 线程竞争成功,从 BLOCKED --> RUNNABLE ,其它失败的线程仍然 BLOCKE
降低锁的粒度,准备多把锁
class BigRoom {
private final Object studyRoom = new Object();
private final Object bedRoom = new Object();
public void sleep() {
synchronized (bedRoom) {
log.debug("sleeping 2 小时");
TimeUnit.SECONDS.sleep(2);
}
}
public void study() {
synchronized (studyRoom) {
log.debug("study 1 小时");
TimeUnit.SECONDS.sleep(1);
}
}
}
需要保证这两个方法没有关联,可以提高并发度,但是这种情况下容易造成死锁。
Object A = new Object();
Object B = new Object();
Thread t1 = new Thread(() -> {
synchronized (A) {
log.debug("lock A");
sleep(1);
synchronized (B) {
log.debug("lock B");
log.debug("操作...");
}
}
}, "t1");
Thread t2 = new Thread(() -> {
synchronized (B) {
log.debug("lock B");
sleep(0.5);
synchronized (A) {
log.debug("lock A");
log.debug("操作...");
}
}
}, "t2");
t1.start();
t2.start();
哲学家死锁问题
相互持有对方的锁不放开
public class TestDeadLock {
public static void main(String[] args) {
Chopstick c1 = new Chopstick("1");
Chopstick c2 = new Chopstick("2");
Chopstick c3 = new Chopstick("3");
Chopstick c4 = new Chopstick("4");
Chopstick c5 = new Chopstick("5");
new Philosopher("苏格拉底", c1, c2).start();
new Philosopher("柏拉图", c2, c3).start();
new Philosopher("亚里士多德", c3, c4).start();
new Philosopher("赫拉克利特", c4, c5).start();
new Philosopher("阿基米德", c1, c5).start();
}
}
@Slf4j(topic = "c.Philosopher")
class Philosopher extends Thread {
Chopstick left;
Chopstick right;
public Philosopher(String name, Chopstick left, Chopstick right) {
super(name);
this.left = left;
this.right = right;
}
@Override
public void run() {
while (true) {
// 尝试获得左手筷子
synchronized (left) {
// 尝试获得右手筷子
synchronized (right) {
eat();
}
}
}
}
Random random = new Random();
private void eat() {
log.debug("eating...");
Sleeper.sleep(0.5);
}
}
class Chopstick {
String name;
public Chopstick(String name) {
this.name = name;
}
@Override
public String toString() {
return "筷子{" + name + '}';
}
}
活锁
两个线程改变了对方的结束条件,谁也无法结束
解决死锁可以一次性获得所有的锁,这种情况下会产生饥饿问题。
线程饥饿问题
ReentrantLock
相对于 synchronized 它具备如下特点
- 可中断
- 可以设置超时时间
- 可以设置为公平锁
- 支持多个条件变量
- 与 synchronized 一样,都支持可重入
基本语法
// 获取锁
reentrantLock.lock();
try {
// 临界区
} finally {
// 释放锁
reentrantLock.unlock();
}
可重入
可重入是指同一个线程如果首次获得了这把锁,那么因为它是这把锁的拥有者,因此有权利再次获取这把锁如果是不可重入锁,那么第二次获得锁时,自己也会被锁挡住
static ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) throws InterruptedException {
lock.lock();
try {
method1();
} finally {
lock.unlock();
}
}
public static void method1(){
lock.lock();
try {
log.info("进入到method1");
method2();
}finally {
lock.unlock();
}
}
private static void method2() {
lock.lock();
try {
log.info("进入到method2");
}finally {
lock.unlock();
}
}
可被打断
static ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(()->{
try {
lock.lockInterruptibly();
} catch (InterruptedException e) {
e.printStackTrace();
log.info("获得所的过程中被打断");
}
try {
log.info("获取到了锁");
}finally {
lock.unlock();
}
},"t1");
lock.lock();
t1.start();
try {
Sleeper.sleep(1);
log.info("打断锁");
t1.interrupt();
} finally {
lock.unlock();
}
}
虽然被打断了,但是还是可以获取的到
可超时
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
log.info("启动...");
// 可以加时间
try {
if (!lock.tryLock(2, TimeUnit.SECONDS)) {
log.info("获取立刻失败,返回");
return;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
log.info("获得了锁");
} finally {
lock.unlock();
}
}, "t1");
lock.lock();
log.info("获得了锁");
t1.start();
try {
Sleeper.sleep(3);
} finally {
lock.unlock();
}
}
公平锁,在创建锁的时候指定是否公平
条件变量
- synchronized 中也有条件变量,wait notify 就是我们讲原理时那个 waitSet 休息室,当条件不满足时进入 waitSet 等待ReentrantLock 的条件变量比 synchronized 强大之处在于,它是支持多个条件变量的,这就好比synchronized 是那些不满足条件的线程都在一间休息室等消息而 ReentrantLock 支持多间休息室,有专门等烟的休息室、专门等早餐的休息室、唤醒时也是按休息室来唤醒
使用要点:
- await 前需要获得锁
- await 执行后,会释放锁,进入 conditionObject 等待
- await 的线程被唤醒(或打断、或超时)取重新竞争 lock 锁
- 竞争 lock 锁成功后,从 await 后继续执行
static ReentrantLock lock = new ReentrantLock();
static Condition waitCigaretteQueue = lock.newCondition();
static Condition waitbreakfastQueue = lock.newCondition();
static volatile boolean hasCigrette = false;
static volatile boolean hasBreakfast = false;
public static void main(String[] args) throws InterruptedException {
new Thread(()->{
lock.lock();
try {
while (!hasCigrette) {
log.info("没有烟,不干活");
waitCigaretteQueue.await();
}
// 此处和wait 相比不许再判断
log.info("干活");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
},"t1").start();
new Thread(()->{
lock.lock();
try {
while (!hasBreakfast) {
log.info("没有早餐,不干活");
waitbreakfastQueue.await();
}
// 此处和wait 相比不许再判断
log.info("干活");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
},"t2").start();
lock.lock();
try {
log.info("唤醒早餐和香烟");
Sleeper.sleep(2);
hasBreakfast = true;
hasCigrette = true;
waitbreakfastQueue.signal();
waitCigaretteQueue.signal();
} finally {
lock.unlock();
}
}
本章小结
本章我们需要重点掌握的是
- 分析多线程访问共享资源时,哪些代码片段属于临界区
- 使用 synchronized 互斥解决临界区的线程安全问题
- 掌握 synchronized 锁对象语法
- 掌握 synchronzied 加载成员方法和静态方法语法
- 掌握 wait/notify 同步方法
- 使用 lock 互斥解决临界区的线程安全问题
- 掌握 lock 的使用细节:可打断、锁超时、公平锁、条件变量
- 学会分析变量的线程安全性、掌握常见线程安全类的使用
- 了解线程活跃性问题:死锁、活锁、饥饿
- 应用方面
- 互斥:使用 synchronized 或 Lock 达到共享资源互斥效果
- 同步:使用 wait/notify 或 Lock 的条件变量来达到线程间通信效果
- 原理方面
- monitor、synchronized 、wait/notify 原理
- synchronized 进阶原理
- park & unpark 原理
- 模式方面
- 同步模式之保护性暂停
- 异步模式之生产者消费者
- 同步模式之顺序控制