進程與線程
進程
- 程序由指令和數據組成,但這些指令要運行,數據要讀寫,就必須將指令加載至 CPU,數據加載至內存。在指令運行過程中還需要用到磁盤、網絡等設備。進程就是用來加載指令、管理內存、管理 IO 的
- 當一個程序被運行,從磁盤加載這個程序的代碼至內存,這時就開啟了一個進程。
- 進程就可以視為程序的一個實例。大部分程序可以同時運行多個實例進程(例如記事本、畫圖、瀏覽器等),也有的程序只能啟動一個實例進程(例如網易雲音樂、360 安全衛士等)
線程
- 一個進程之內可以分為一到多個線程。
- 一個線程就是一個指令流,將指令流中的一條條指令以一定的順序交給 CPU 執行
- Java 中,線程作為最小調度單位,進程作為資源分配的最小單位。 在 windows 中進程是不活動的,只是作為線程的容器
兩者對比
- 進程基本上相互獨立的,而線程存在於進程內,是進程的一個子集
- 進程擁有共享的資源,如內存空間等,供其內部的線程共享
- 進程間通信較為復雜
- 同一台計算機的進程通信稱為 IPC(Inter-process communication)
- 不同計算機之間的進程通信,需要通過網絡,並遵守共同的協議,例如 HTTP
- 線程通信相對簡單,因為它們共享進程內的內存,一個例子是多個線程可以訪問同一個共享變量
- 線程更輕量,線程上下文切換成本一般上要比進程上下文切換低
並發與並行
單核 cpu 下,線程實際還是 串行執行的。操作系統中有一個組件叫做任務調度器,將 cpu 的時間片(windows下時間片最小約為 15 毫秒)分給不同的程序使用,只是由於 cpu 在線程間(時間片很短)的切換非常快,人類感覺是 同時運行的 。總結為一句話就是: 微觀串行,宏觀並行 ,一般會將這種 線程輪流使用 CPU 的做法稱為並發
cpu | 時間片1 | 時間片2 | 時間片3 |
---|---|---|---|
core | 線程1 | 線程2 | 線程3 |
多核 cpu下,每個 核(core)都可以調度運行線程,這時候線程可以是並行的。
cpu | 時間片1 | 時間片2 | 時間片3 |
---|---|---|---|
core | 線程1 | 線程2 | 線程3 |
core | 線程2 | 線程3 | 線程1 |
- 並發(concurrent)是同一時間應對(dealing with)多件事情的能力
- 並行(parallel)是同一時間動手做(doing)多件事情的能力
同步和異步
以調用方角度來講,如果需要等待結果返回,才能繼續運行就是同步,不需要等待結果返回,就能繼續運行就是異步
多核cpu才能夠提高效率,單核仍然是輪流執行,還多了線程切換的時間
- 單核 cpu 下,多線程不能實際提高程序運行效率,只是為了能夠在不同的任務之間切換,不同線程輪流使用cpu ,不至於一個線程總占用 cpu,別的線程沒法干活
- 多核 cpu 可以並行跑多個線程,但能否提高程序運行效率還是要分情況的,有些任務,經過精心設計,將任務拆分,並行執行,當然可以提高程序的運行效率。但不是所有計算任務都能拆分(參考后文的【阿姆達爾定律】)也不是所有任務都需要拆分,任務的目的如果不同,談拆分和效率沒啥意義
- IO 操作不占用 cpu,只是我們一般拷貝文件使用的是【阻塞 IO】,這時相當於線程雖然不用 cpu,但需要一直等待 IO 結束,沒能充分利用線程。所以才有后面的【非阻塞 IO】和【異步 IO】優化
Java線程
本章內容
- 創建和運行線程
- 查看線程
- 線程 API
- 線程狀態
創建核運行線程
// 最常規的方式,不推薦
new Thread("thread1"){
@Override
public void run() {
System.out.println("我是另外一個線程");
}
}.start();
System.out.println("你好啊啊哈哈哈Z");
TimeUnit.SECONDS.sleep(1);
System.out.println("啊哈哈哈");
//線程和任務分開,更加靈活
Runnable runable = ()->log.info("我是b線程");
new Thread(runable,"t2").start();
log.info("我是主線程");
FutureTask<String> futureTask = new FutureTask<>(()->{
log.info("我是另外一個線程");
TimeUnit.SECONDS.sleep(3);
return "啊哈哈哈";
});
new Thread(futureTask,"t1").start();
log.info("我是主線程");
// 間接實現了future接口,所以可以
log.info(futureTask.get());
線程上下文切換
因為以下一些原因導致 cpu 不再執行當前的線程,轉而執行另一個線程的代碼
切換的原因
- 線程的 cpu 時間片用完
- 垃圾回收
- 有更高優先級的線程需要運行
- 線程自己調用了 sleep、yield、wait、join、park、synchronized、lock 等方法
當 Context Switch 發生時,需要由操作系統保存當前線程的狀態,並恢復另一個線程的狀態,Java 中對應的概念 - 就是程序計數器(Program Counter Register),它的作用是記住下一條 jvm 指令的執行地址,是線程私有的
- 狀態包括程序計數器、虛擬機棧中每個棧幀的信息,如局部變量、操作數棧、返回地址等
- Context Switch 頻繁發生會影響性能
相關方法解析
run和start
- 直接調用 run 是在主線程中執行了 run,沒有啟動新的線程
- 使用 start 是啟動新的線程,通過新的線程間接執行 run 中的代碼
sleep和yield
- 調用 sleep 會讓當前線程從 Running 進入 Timed Waiting 狀態(阻塞),此時任務調度器不會分配時間片給該線程
- 其它線程可以使用 interrupt 方法打斷正在睡眠的線程,這時 sleep 方法會拋出 InterruptedException
- 睡眠結束后的線程未必會立刻得到執行
- 建議用 TimeUnit 的 sleep 代替 Thread 的 sleep 來獲得更好的可讀性
yield意思為禮讓一下 - 調用 yield 會讓當前線程從 Running 進入 Runnable 就緒狀態,然后調度執行線程,此時任務調度器可以分配時間片給該線程
- 具體的實現依賴於操作系統的任務調度器
可以通過thread.setPriority(Thread.MAX_PRIORITY)
設置線程優先級,在cpu繁忙的時候,設置了優先級會有更大的機會優先執行,空閑的時候沒有什么作用
Runnable task1 = () -> {
int count = 0;
for (; ; ) {
System.out.println("---->1 " + count++);
}
};
Runnable task2 = () -> {
int count = 0;
for (; ; ) {
// Thread.yield();
System.out.println(" ---->2 " + count++);
}
};
Thread t1 = new Thread(task1, "t1");
Thread t2 = new Thread(task2, "t2");
// t1.setPriority(Thread.MIN_PRIORITY);
// t2.setPriority(Thread.MAX_PRIORITY);
t1.start();
t2.start();
sleep可以避免cpu空轉導致cpu被占滿
join方法詳解
static int r = 10;
@Test
public void test() throws InterruptedException {
Thread thread = new Thread(() -> {
log.info("線程開始更改r變量");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
r = 100;
});
thread.start();
thread.join();
log.info("r的值為{}", r);
}
如下代碼,如果不實用join,此時r的直還沒有更改,顯示的還是更改前的數值。
join
的作用為等待某個線程運行結束
多個join情況
@Test
public void test() throws InterruptedException {
Thread thread = new Thread(() -> {
try {
log.info("t1開始");
TimeUnit.SECONDS.sleep(1);
log.info("t1結束");
} catch (InterruptedException e) {
e.printStackTrace();
}
},"t1");
Thread thread1 = new Thread(() -> {
try {
log.info("t2開始");
TimeUnit.SECONDS.sleep(2);
log.info("t2結束");
} catch (InterruptedException e) {
e.printStackTrace();
}
},"t2");
log.info("主線程開始");
thread.start();
thread1.start();
thread.join();
thread1.join();
log.info("主線程結束");
}
相關結果
只需要等待兩秒
如果顛倒兩個join,示意圖如下
有時效的join
Thread thread = new Thread(() -> {
try {
log.info("t1開始");
TimeUnit.SECONDS.sleep(2);
log.info("t1結束");
} catch (InterruptedException e) {
e.printStackTrace();
}
},"t1");
log.info("主線程開始");
thread.start();
thread.join(1);
log.info("主線程結束");
結果如下
主線程結束那么就結束了。如果沒等夠結束繼續向下運行。
interrupt 方法詳解
sleep,join,wait(后面介紹)會使該線程處於阻塞狀態,處於改狀態的線程cpu將不會分配時間片給該線程,這也是sleep可以降低cpu的使用率的原因。處於阻塞狀態下的線程可以被打斷,此時會拋出異常。打斷處於阻塞狀態下的線程,此時不會處理打斷狀態,打斷正常運行下的線程,打斷標記會置為真,可以用來優雅的停止線程
@Test
public void test() {
Thread thread = new Thread(() -> {
sleep(2);
},"t1");
log.info("主線程開始");
thread.start();
sleep(1);
thread.interrupt();
log.info(thread.isInterrupted()+"");
log.info("主線程結束");
}
需要在線程中自己處理打斷的狀態,代碼如下
兩階段終止模式(在t1線程中優雅的停止t2線程)
Thread thread = new Thread(() -> {
while (true) {
try {
if (Thread.currentThread().isInterrupted()){
log.info("我被打斷了(▰˘︹˘▰)");
break;
}
log.info("執行監控");
sleep(2);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
},"t1");
thread.start();
sleep(5);
log.info("主線程打斷監控線程");
thread.interrupt();
sleep(3);
log.info("主線程結束");
@Test
public void test() throws InterruptedException {
// 處於這個狀態也是阻塞
// 當被打算后打斷標志為真
Thread thread = new Thread(()->{
log.info("開始打斷");
LockSupport.park();
log.info("打斷標志{}",Thread.currentThread().isInterrupted());
// 當打斷標志為真的時候再次執行park時效,打斷標志為加的時候才有效,
// Thread.interrupted()可以清除打斷標志
LockSupport.park();
log.info("啦啦啦");
});
thread.start();
sleep(2);
thread.interrupt();
sleep(1);
}
守護線程
我在本地實驗沒有達到這個效果。main線程結束后就沒有下文了。
線程的狀態
- NEW 線程剛被創建,但是還沒有調用 start() 方法
- RUNNABLE 當調用了 start() 方法之后,注意,Java API 層面的 RUNNABLE 狀態涵蓋了 操作系統 層面的
- 【可運行狀態】、【運行狀態】和【阻塞狀態】(由於 BIO 導致的線程阻塞,在 Java 里無法區分,仍然認為是可運行)
- BLOCKED , WAITING , TIMED_WAITING 都是 Java API 層面對【阻塞狀態】的細分,后面會在狀態轉換一節詳述
- TERMINATED 當線程代碼運行結束
Thread t1 = new Thread("t1") {
@Override
public void run() {
log.debug("running...");
}
};
Thread t2 = new Thread("t2") {
@Override
public void run() {
while(true) { // runnable
}
}
};
t2.start();
Thread t3 = new Thread("t3") {
@Override
public void run() {
log.debug("running...");
}
};
t3.start();
Thread t4 = new Thread("t4") {
@Override
public void run() {
synchronized (TestState.class) {
try {
Thread.sleep(1000000); // timed_waiting
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
};
t4.start();
Thread t5 = new Thread("t5") {
@Override
public void run() {
try {
t2.join(); // waiting
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
t5.start();
Thread t6 = new Thread("t6") {
@Override
public void run() {
synchronized (TestState.class) { // blocked
try {
Thread.sleep(1000000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
};
t6.start();
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("t1 state {}", t1.getState());
log.debug("t2 state {}", t2.getState());
log.debug("t3 state {}", t3.getState());
log.debug("t4 state {}", t4.getState());
log.debug("t5 state {}", t5.getState());
log.debug("t6 state {}", t6.getState());
System.in.read();
線程的6種狀態代碼示例如上
多線程統籌的事例
Thread t1 = new Thread(()->{
log.info("洗水壺");
sleep(1);
log.info("燒開水");
sleep(15);
},"t1");
Thread t2 = new Thread(()->{
log.info("洗茶壺");
sleep(1);
log.info("洗茶杯");
sleep(2);
log.info("拿茶葉");
sleep(1);
try {
t1.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("泡茶");
},"t2");
t1.start();
t2.start();
需要花費16秒
共享模型之管程
本章內容
- 共享問題
- synchronized
- 線程安全分析
- Monitor
- wait/notify
- 線程狀態轉換
- 活躍性
- Lock
多線程操作共享資源此時會有線程安全的問題
Thread t1 = new Thread(()->{
for (int i = 0; i < 5000; i++) {
n++;
}
},"t1");
Thread t2 = new Thread(()->{
for (int i = 0; i < 500; i++) {
n--;
}
},"t2");
t1.start();
t2.start();
t2.join();
t1.join();
log.info("最終n的結果為{}",n);
發現最終n的結果不是0
在多個線程對共享資源讀寫操作時發生指令交錯,就會出現問題
一段代碼塊內如果存在對共享資源的多線程讀寫操作,稱這段代碼塊為臨界區
既有讀又有寫
多個線程在臨界區內執行,由於代碼的執行序列不同而導致結果無法預測,稱之為發生了競態條件
為了避免臨界區的競態條件發生,有多種手段可以達到目的。
- 阻塞式的解決方案:synchronized,Lock
- 非阻塞式的解決方案:原子變量
當一個對象擁有鎖的時候,其他線程會進入到阻塞狀態,獲取鎖的執行完畢會喚醒拿不到該鎖阻塞的線程
synchronized (Demo01.class){n++;}
- synchronized(對象) 中的對象,可以想象為一個房間(room),有唯一入口(門)房間只能一次進入一人進行計算,線程 t1,t2 想象成兩個人
- 當線程 t1 執行到 synchronized(room) 時就好比 t1 進入了這個房間,並鎖住了門拿走了鑰匙,在門內執行count++ 代碼
- 這時候如果 t2 也運行到了 synchronized(room) 時,它發現門被鎖住了,只能在門外等待,發生了上下文切換,阻塞住了
- 這中間即使 t1 的 cpu 時間片不幸用完,被踢出了門外(不要錯誤理解為鎖住了對象就能一直執行下去哦),
- 這時門還是鎖住的,t1 仍拿着鑰匙,t2 線程還在阻塞狀態進不來,只有下次輪到 t1 自己再次獲得時間片時才能開門進入
- 當 t1 執行完 synchronized{} 塊內的代碼,這時候才會從 obj 房間出來並解開門上的鎖,喚醒t2 線程把鑰匙給他。t2 線程這時才可以進入 obj 房間,鎖住了門拿上鑰匙,執行它的 count-- 代碼
synchronized 實際是用對象鎖保證了臨界區內代碼的原子性,臨界區內的代碼對外是不可分割的,不會被線程切換所打斷。
改造
更符合面相對象線程操作資源類的思想
@Test
public void test() throws InterruptedException {
Room room = new Room();
Thread t1 = new Thread(()->{
for (int i = 0; i < 5000; i++) {
room.increment();
}
},"t1");
Thread t2 = new Thread(()->{
for (int i = 0; i < 500; i++) {
room.decrement();
}
},"t2");
t1.start();
t2.start();
t2.join();
t1.join();
log.info("最終n的結果為{}",room.get());
}
class Room {
int value = 0;
public void increment() {
synchronized (this) {
value++;
}
}
public void decrement() {
synchronized (this) {
value--;
}
}
public int get() {
synchronized (this) {
return value;
}
}
public Room() {
}
}
靜態方法中synchronized等價於鎖是該類,非靜態方法鎖是該對象的實例
線程安全問題
- 如果它們沒有共享,則線程安全
- 如果它們被共享了,根據它們的狀態是否能夠改變,又分兩種情況
- 如果只有讀操作,則線程安全
- 如果有讀寫操作,則這段代碼是臨界區,需要考慮線程安全
- 局部變量是線程安全的
- 但局部變量引用的對象則未必
- 如果該對象沒有逃離方法的作用訪問,它是線程安全的
- 如果該對象逃離方法的作用范圍,需要考慮線程安全
成員變量的線程安全問題
class ThreadUnsafe {
ArrayList<String> list = new ArrayList<>();
public void method1(int loopNumber) {
for (int i = 0; i < loopNumber; i++) {
method2();
method3();
}
}
private void method2() {
list.add("1");
}
private void method3() {
list.remove(0);
}
原因:多個線程添加的時候有可能只添加成功一次,多個線程執行刪除的時候執行了多次
list.add
不是原子操作
常見的線程安全類
- String
- Integer
- StringBuffer
- Random
- Vector
- Hashtable
- java.util.concurrent 包下的類
線程安全指的是多個線程調用他們類的方法是安全的。即可以理解為他們的方法都是原子性的。但是組合起來不是原子的
開閉原則中不想讓子類覆蓋的方法可以將該類設置成為final,如string
多人買票線程代碼分析
public class ExerciseSell {
public static void main(String[] args) throws InterruptedException {
// 模擬多人買票
TicketWindow window = new TicketWindow(1000);
// 所有線程的集合
List<Thread> threadList = new ArrayList<>();
// 賣出的票數統計
List<Integer> amountList = new Vector<>();
for (int i = 0; i < 2000; i++) {
Thread thread = new Thread(() -> {
// 買票
int amount = window.sell(random(5));
// 統計買票數
amountList.add(amount);
});
threadList.add(thread);
thread.start();
}
for (Thread thread : threadList) {
thread.join();
}
// 統計賣出的票數和剩余票數
log.debug("余票:{}",window.getCount());
log.debug("賣出的票數:{}", amountList.stream().mapToInt(i-> i).sum());
}
// Random 為線程安全
static Random random = new Random();
// 隨機 1~5
public static int random(int amount) {
return random.nextInt(amount) + 1;
}
}
// 售票窗口
class TicketWindow {
private int count;
public TicketWindow(int count) {
this.count = count;
}
// 獲取余票數量
public int getCount() {
return count;
}
// 售票
public synchronized int sell(int amount) {
if (this.count >= amount) {
this.count -= amount;
return amount;
} else {
return 0;
}
}
}
下面的這個由於涉及到兩個對象,因此需要鎖住該類
@Slf4j(topic = "c.ExerciseTransfer")
public class ExerciseTransfer {
public static void main(String[] args) throws InterruptedException {
Account a = new Account(1000);
Account b = new Account(1000);
Thread t1 = new Thread(() -> {
for (int i = 0; i < 1000; i++) {
a.transfer(b, randomAmount());
}
}, "t1");
Thread t2 = new Thread(() -> {
for (int i = 0; i < 1000; i++) {
b.transfer(a, randomAmount());
}
}, "t2");
t1.start();
t2.start();
t1.join();
t2.join();
// 查看轉賬2000次后的總金額
log.debug("total:{}", (a.getMoney() + b.getMoney()));
}
// Random 為線程安全
static Random random = new Random();
// 隨機 1~100
public static int randomAmount() {
return random.nextInt(100) + 1;
}
}
// 賬戶
class Account {
private int money;
public Account(int money) {
this.money = money;
}
public int getMoney() {
return money;
}
public void setMoney(int money) {
this.money = money;
}
// 轉賬
public void transfer(Account target, int amount) {
synchronized(Account.class) {
if (this.money >= amount) {
this.setMoney(this.getMoney() - amount);
target.setMoney(target.getMoney() + amount);
}
}
}
}
對象頭的相關信息
intege占用空間分析,一個int占用4個字節,然后一個包裝對象需要包含對象頭核對象值,對象頭需要占用8個字節,int中一個包裝類型是普通類型的4倍
obj中的對象頭里記錄着對象的鎖的地址
后面synchronized的原理感覺很復雜,后面再看
wait 和notify
@Test
public void test() throws InterruptedException {
Object lock = new Object();
new Thread(()->{
synchronized (lock) {
log.info("我睡眠了");
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("我被喚醒了");
}
},"t1").start();
new Thread(()->{
synchronized (lock) {
log.info("我睡眠了");
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("我被喚醒了");
}
},"t2").start();
TimeUnit.SECONDS.sleep(2);
synchronized (lock){
log.info("我隨機喚醒一個");
lock.notify();
}
}
wait和notify的正確姿勢
public static void main(String[] args) {
new Thread(() -> {
synchronized (room) {
log.debug("有煙沒?[{}]", hasCigarette);
// 用 notifyAll 僅解決某個線程的喚醒問題,但使用 if + wait 判斷僅有一次機會,一旦條件不成立,就沒有重新判斷的機會了
while (!hasCigarette) {
log.debug("沒煙,先歇會!");
try {
room.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("有煙沒?[{}]", hasCigarette);
if (hasCigarette) {
log.debug("可以開始干活了");
} else {
log.debug("沒干成活...");
}
}
}, "小南").start();
new Thread(() -> {
synchronized (room) {
Thread thread = Thread.currentThread();
log.debug("外賣送到沒?[{}]", hasTakeout);
while (!hasTakeout) {
log.debug("沒外賣,先歇會!");
try {
room.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("外賣送到沒?[{}]", hasTakeout);
if (hasTakeout) {
log.debug("可以開始干活了");
} else {
log.debug("沒干成活...");
}
}
}, "小女").start();
sleep(1);
new Thread(() -> {
synchronized (room) {
hasTakeout = true;
log.debug("外賣到了噢!");
room.notifyAll();
}
}, "送外賣的").start();
}
保護性暫停
- 有一個結果需要從一個線程傳遞到另一個線程,讓他們關聯同一個 GuardedObject
- 如果有結果不斷從一個線程到另一個線程那么可以使用消息隊列(見生產者/消費者)
- JDK 中,join 的實現、Future 的實現,采用的就是此模式
- 因為要等待另一方的結果,因此歸類到同步模式
主線程等待另外一個線程獲取結果
public class Demo01 {
public static void main(String[] args) throws InterruptedException {
GuardObject guardObject = new GuardObject();
new Thread(()->{
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("獲取到了結果");
guardObject.complete("hello");
}).start();
log.info("主線程等待結果");
log.info(""+guardObject.get());
}
}
class GuardObject{
private Object response;
private final Object lock = new Object();
public Object get() throws InterruptedException {
synchronized (lock) {
while (Objects.isNull(response)){
lock.wait();
}
return response;
}
}
public void complete(Object response) {
synchronized (lock) {
this.response = response;
lock.notifyAll();
}
}
}
join底層就是應用該線程進行操作
帶超時時間的等待
多任務的
相關事例
@Data
@Slf4j
public class Demo01 {
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 3; i++) {
new People().start();
}
TimeUnit.SECONDS.sleep(2);
for (Integer id : Mailboxes.getIds()) {
new Postman(id, "內容" + id).start();
}
}
}
@Slf4j
class People extends Thread{
@Override
public void run() {
GuardedObject guardedObject = Mailboxes.createGuardedObject();
log.info("開始收信 id:{}", guardedObject.getId());
Object mail = guardedObject.get(5000);
log.info("收到信 id:{}, 內容:{}", guardedObject.getId(), mail);
}
}
@Slf4j
class Postman extends Thread {
private int id;
private String mail;
public Postman(int id, String mail) {
this.id = id;
this.mail = mail;
}
@Override
public void run() {
GuardedObject guardedObject = Mailboxes.getGuardedObject(id);
log.info("送信 id:{}, 內容:{}", id, mail);
guardedObject.complete(mail);
}
}
class Mailboxes {
private static Map<Integer, GuardedObject> boxes = new Hashtable<>();
private static int id = 1;
// 產生唯一 id
private static synchronized int generateId() {
return id++;
}
public static GuardedObject getGuardedObject(int id) {
return boxes.remove(id);
}
public static GuardedObject createGuardedObject() {
GuardedObject go = new GuardedObject(generateId());
boxes.put(go.getId(), go);
return go;
}
public static Set<Integer> getIds() {
return boxes.keySet();
}
}
// 增加超時效果
class GuardedObject {
// 標識 Guarded Object
private int id;
public GuardedObject(int id) {
this.id = id;
}
public int getId() {
return id;
}
// 結果
private Object response;
// 獲取結果
// timeout 表示要等待多久 2000
public Object get(long timeout) {
synchronized (this) {
// 開始時間 15:00:00
long begin = System.currentTimeMillis();
// 經歷的時間
long passedTime = 0;
while (Objects.isNull(response)) {
// 這一輪循環應該等待的時間
long waitTime = timeout - passedTime;
// 經歷的時間超過了最大等待時間時,退出循環
if (waitTime <= 0) {
break;
}
try {
this.wait(waitTime); // 虛假喚醒 15:00:01
} catch (InterruptedException e) {
e.printStackTrace();
}
// 求得經歷時間
passedTime = System.currentTimeMillis() - begin; // 15:00:02 1s
}
return response;
}
}
// 產生結果
public void complete(Object response) {
synchronized (this) {
// 給結果成員變量賦值
this.response = response;
this.notifyAll();
}
}
}
以上是生產者和消費者一一對應,生產的消息回立即被消費
生產者和消費者
public class Demo02 {
public static void main(String[] args) throws InterruptedException {
MessageQueue messageQueue = new MessageQueue(2);
for (int i = 0; i < 3; i++) {
new Thread(()->{
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info( messageQueue.take().toString());
},"消費者"+i).start();
}
for (int i = 0; i < 5; i++) {
int id = i;
new Thread(()->{
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("防止的id為"+id);
messageQueue.put(new Message(id,"消息"+id));
},"生產者"+i).start();
}
}
}
@Data
class Message {
private int id;
private Object message;
public Message(int id, Object message) {
this.id = id;
this.message = message;
}
public int getId() {
return id;
}
public Object getMessage() {
return message;
}
}
@Slf4j
class MessageQueue {
private LinkedList<Message> queue;
private int capacity;
public MessageQueue(int capacity) {
this.capacity = capacity;
queue = new LinkedList<>();
}
public Message take() {
synchronized (queue) {
while (queue.isEmpty()){
log.info("隊列為空,等待...");
try {
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.notifyAll();
return queue.removeFirst();
}
}
public void put(Message message) {
synchronized (queue) {
while (queue.size()==capacity){
log.info("隊列滿了,等待...");
try {
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.addLast(message);
queue.notifyAll();
}
}
}
也可以消費者只有一個,一直循環
park和unpark
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(()->{
log.info("我被park了");
LockSupport.park();
log.info("我又被unpark了");
},"t1");
t1.start();
TimeUnit.SECONDS.sleep(2);
LockSupport.unpark(t1);
}
先unpark然后再park的效果
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(()->{
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("我被park了");
LockSupport.park();
log.info("我又被unpark了");
},"t1");
t1.start();
TimeUnit.SECONDS.sleep(1);
LockSupport.unpark(t1);
此時日志輸出
- park & unpark 是以線程為單位來【阻塞】和【喚醒】線程,而 notify 只能隨機喚醒一個等待線程,notifyAll
- 是喚醒所有等待線程,就不那么【精確】park & unpark 可以先 unpark,而 wait & notify 不能先 notify
線程狀態轉換
- new->runnable 調用start方法
- RUNNABLE <--> WAITING t 線程用 synchronized(obj) 獲取了對象鎖后 調用
obj.wait()
方法時,t 線程從 RUNNABLE --> WAITING 調用 obj.notify() , obj.notifyAll() , t.interrupt() 時 競爭鎖成功,t 線程從 WAITING --> RUNNABLE 競爭鎖失敗,t 線程從 WAITING --> BLOCKED當前線程調用t.join()
方法時,當前線程從 RUNNABLE --> WAITING注意是當前線程在t 線程對象的監視器上等待t 線程運行結束,或調用了當前線程的 interrupt() 時,當前線程從 WAITING --> RUNNABLE 當前線程調用 LockSupport.park() 方法會讓當前線程從 RUNNABLE --> WAITING 調LockSupport.unpark
(目標線程) 或調用了線程 的 interrupt() ,會讓目標線程從WAITING -->RUNNABLE - RUNNABLE <--> TIMED_WAITING
- RUNNABLE <--> BLOCKED t 線程用 synchronized(obj) 獲取了對象鎖時如果競爭失敗,從 RUNNABLE --> BLOCKED持 obj 鎖線程的同步代碼塊執行完畢,會喚醒該對象上所有 BLOCKED 的線程重新競爭,如果其中 t 線程競爭成功,從 BLOCKED --> RUNNABLE ,其它失敗的線程仍然 BLOCKE
降低鎖的粒度,准備多把鎖
class BigRoom {
private final Object studyRoom = new Object();
private final Object bedRoom = new Object();
public void sleep() {
synchronized (bedRoom) {
log.debug("sleeping 2 小時");
TimeUnit.SECONDS.sleep(2);
}
}
public void study() {
synchronized (studyRoom) {
log.debug("study 1 小時");
TimeUnit.SECONDS.sleep(1);
}
}
}
需要保證這兩個方法沒有關聯,可以提高並發度,但是這種情況下容易造成死鎖。
Object A = new Object();
Object B = new Object();
Thread t1 = new Thread(() -> {
synchronized (A) {
log.debug("lock A");
sleep(1);
synchronized (B) {
log.debug("lock B");
log.debug("操作...");
}
}
}, "t1");
Thread t2 = new Thread(() -> {
synchronized (B) {
log.debug("lock B");
sleep(0.5);
synchronized (A) {
log.debug("lock A");
log.debug("操作...");
}
}
}, "t2");
t1.start();
t2.start();
哲學家死鎖問題
相互持有對方的鎖不放開
public class TestDeadLock {
public static void main(String[] args) {
Chopstick c1 = new Chopstick("1");
Chopstick c2 = new Chopstick("2");
Chopstick c3 = new Chopstick("3");
Chopstick c4 = new Chopstick("4");
Chopstick c5 = new Chopstick("5");
new Philosopher("蘇格拉底", c1, c2).start();
new Philosopher("柏拉圖", c2, c3).start();
new Philosopher("亞里士多德", c3, c4).start();
new Philosopher("赫拉克利特", c4, c5).start();
new Philosopher("阿基米德", c1, c5).start();
}
}
@Slf4j(topic = "c.Philosopher")
class Philosopher extends Thread {
Chopstick left;
Chopstick right;
public Philosopher(String name, Chopstick left, Chopstick right) {
super(name);
this.left = left;
this.right = right;
}
@Override
public void run() {
while (true) {
// 嘗試獲得左手筷子
synchronized (left) {
// 嘗試獲得右手筷子
synchronized (right) {
eat();
}
}
}
}
Random random = new Random();
private void eat() {
log.debug("eating...");
Sleeper.sleep(0.5);
}
}
class Chopstick {
String name;
public Chopstick(String name) {
this.name = name;
}
@Override
public String toString() {
return "筷子{" + name + '}';
}
}
活鎖
兩個線程改變了對方的結束條件,誰也無法結束
解決死鎖可以一次性獲得所有的鎖,這種情況下會產生飢餓問題。
線程飢餓問題
ReentrantLock
相對於 synchronized 它具備如下特點
- 可中斷
- 可以設置超時時間
- 可以設置為公平鎖
- 支持多個條件變量
- 與 synchronized 一樣,都支持可重入
基本語法
// 獲取鎖
reentrantLock.lock();
try {
// 臨界區
} finally {
// 釋放鎖
reentrantLock.unlock();
}
可重入
可重入是指同一個線程如果首次獲得了這把鎖,那么因為它是這把鎖的擁有者,因此有權利再次獲取這把鎖如果是不可重入鎖,那么第二次獲得鎖時,自己也會被鎖擋住
static ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) throws InterruptedException {
lock.lock();
try {
method1();
} finally {
lock.unlock();
}
}
public static void method1(){
lock.lock();
try {
log.info("進入到method1");
method2();
}finally {
lock.unlock();
}
}
private static void method2() {
lock.lock();
try {
log.info("進入到method2");
}finally {
lock.unlock();
}
}
可被打斷
static ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(()->{
try {
lock.lockInterruptibly();
} catch (InterruptedException e) {
e.printStackTrace();
log.info("獲得所的過程中被打斷");
}
try {
log.info("獲取到了鎖");
}finally {
lock.unlock();
}
},"t1");
lock.lock();
t1.start();
try {
Sleeper.sleep(1);
log.info("打斷鎖");
t1.interrupt();
} finally {
lock.unlock();
}
}
雖然被打斷了,但是還是可以獲取的到
可超時
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
log.info("啟動...");
// 可以加時間
try {
if (!lock.tryLock(2, TimeUnit.SECONDS)) {
log.info("獲取立刻失敗,返回");
return;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
log.info("獲得了鎖");
} finally {
lock.unlock();
}
}, "t1");
lock.lock();
log.info("獲得了鎖");
t1.start();
try {
Sleeper.sleep(3);
} finally {
lock.unlock();
}
}
公平鎖,在創建鎖的時候指定是否公平
條件變量
- synchronized 中也有條件變量,wait notify 就是我們講原理時那個 waitSet 休息室,當條件不滿足時進入 waitSet 等待ReentrantLock 的條件變量比 synchronized 強大之處在於,它是支持多個條件變量的,這就好比synchronized 是那些不滿足條件的線程都在一間休息室等消息而 ReentrantLock 支持多間休息室,有專門等煙的休息室、專門等早餐的休息室、喚醒時也是按休息室來喚醒
使用要點:
- await 前需要獲得鎖
- await 執行后,會釋放鎖,進入 conditionObject 等待
- await 的線程被喚醒(或打斷、或超時)取重新競爭 lock 鎖
- 競爭 lock 鎖成功后,從 await 后繼續執行
static ReentrantLock lock = new ReentrantLock();
static Condition waitCigaretteQueue = lock.newCondition();
static Condition waitbreakfastQueue = lock.newCondition();
static volatile boolean hasCigrette = false;
static volatile boolean hasBreakfast = false;
public static void main(String[] args) throws InterruptedException {
new Thread(()->{
lock.lock();
try {
while (!hasCigrette) {
log.info("沒有煙,不干活");
waitCigaretteQueue.await();
}
// 此處和wait 相比不許再判斷
log.info("干活");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
},"t1").start();
new Thread(()->{
lock.lock();
try {
while (!hasBreakfast) {
log.info("沒有早餐,不干活");
waitbreakfastQueue.await();
}
// 此處和wait 相比不許再判斷
log.info("干活");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
},"t2").start();
lock.lock();
try {
log.info("喚醒早餐和香煙");
Sleeper.sleep(2);
hasBreakfast = true;
hasCigrette = true;
waitbreakfastQueue.signal();
waitCigaretteQueue.signal();
} finally {
lock.unlock();
}
}
本章小結
本章我們需要重點掌握的是
- 分析多線程訪問共享資源時,哪些代碼片段屬於臨界區
- 使用 synchronized 互斥解決臨界區的線程安全問題
- 掌握 synchronized 鎖對象語法
- 掌握 synchronzied 加載成員方法和靜態方法語法
- 掌握 wait/notify 同步方法
- 使用 lock 互斥解決臨界區的線程安全問題
- 掌握 lock 的使用細節:可打斷、鎖超時、公平鎖、條件變量
- 學會分析變量的線程安全性、掌握常見線程安全類的使用
- 了解線程活躍性問題:死鎖、活鎖、飢餓
- 應用方面
- 互斥:使用 synchronized 或 Lock 達到共享資源互斥效果
- 同步:使用 wait/notify 或 Lock 的條件變量來達到線程間通信效果
- 原理方面
- monitor、synchronized 、wait/notify 原理
- synchronized 進階原理
- park & unpark 原理
- 模式方面
- 同步模式之保護性暫停
- 異步模式之生產者消費者
- 同步模式之順序控制