目標
- 重點:
- 線程安全的概念
- 線程通信的方式與應用
- reactor線程模型
- 線程數量的優化
- jdk常用命令
- Netty框架的作用
- 難點
- java運行的原理
- 同步關鍵字的原理
- AQS的抽象
- JUC的源碼
- 網絡編程的概念
- GC機制
class文件內容
文件開頭有一個0xcafebabe特殊的標志。
包含版本、訪問標志、常量池、當前類、超級類、接口、字段、方法、屬性
把class文件的信息存在方法區里面,有了類 根據類創建對象,存儲在堆內存中,垃圾回收就是這里。這是線程共享的部分,隨虛擬機或者GC創建或銷毀。除了這個區域 還有線程獨占空間,隨線程生命周期而創建和銷毀。
-
方法區:用來存儲加載的類信息、常量、靜態變量、編譯后的代碼等數據。虛擬機規范中,這是一個邏輯區划,不同的虛擬機不同的實現。oracle的HotSpot在java7中,方法區放在永久代,java8放在元數據空間,並且通過GC機制對這個區域進行管理。
-
堆內存:分為老年代、新生代(Eden、From Survivor、To Survivor) JVM啟動時創建,存放對象的實例。垃圾回收主要管理堆內存。
-
虛擬機棧:每個線程都有一個私有的空間。線程棧由多個棧幀(Stack Frame)組成,一個線程會執行一個或多個方法,一個方法對應一個棧幀。
棧幀包括:局部變量表、操作數棧、動態鏈接、方法返回地址、附加信息。棧內存默認最大1M,超出拋出StackOverflowError
-
本地方法棧:使用Native本地方法准備的,超出也會報StackOverflowError,不同虛擬機廠商不同的實現。
-
程序計數器:記錄當前線程執行字節碼的位置,存儲的是字節碼指令地址,如果執行Native方法,計數器值會為空。
CPU同一時間只會執行一條線程中的指令。JVM多線程會輪流切換並分配CPU執行時間的方式。為了線程切換后,需要通過程序計數器來恢復正確的執行位置。
接下來是源文件編譯后字節碼相關的東西,暫不在本次筆記中記錄。【記得有本書是字節碼相關的解讀,立個flag,日后學習!】
線程狀態
6個狀態
- new:尚未啟動的線程的狀態
- runnable:可運行線程的線程狀態,等待CPU調度
- blocked:線程阻塞等待監視器鎖定的狀態,處於synchronized同步代碼塊或方法中被阻塞。
- waiting:等待線程的狀態,不帶超時的方式:object.wait Thread.join LockSupport.pard
- timed waiting : 具有指定等待時間的等待線程的線程狀態。帶超時的方式:Thread.sleep Object.wait Thread.join LockSupport.parkNanos LockSupport.parkUntil
- Terminated:終止線程的狀態,執行完畢或出現異常。
案例1
//新建 運行 終止 System.out.println("#####第一種狀態新建 運行 終止"); Thread thread1 = new Thread(new Runnable() { @Override public void run() { System.out.println("thread1當前狀態:"+Thread.currentThread().getState().toString()); System.err.println("thread1執行了"); } }); System.out.println("沒調用start方法,thread1當前狀態:"+thread1.getState().toString()); thread1.start(); Thread.sleep(2000); System.out.println("等待兩秒,thread1當前狀態:"+thread1.getState().toString()); 復制代碼
#####第一種狀態新建 運行 終止
沒調用start方法,thread1當前狀態:NEW
thread1當前狀態:RUNNABLE
thread1執行了
等待兩秒,thread1當前狀態:TERMINATED
復制代碼
案例2
System.out.println("######第二種 新建 運行 等待 運行 終止(sleep)"); Thread thread2 = new Thread(new Runnable() { @Override public void run() { try { Thread.sleep(1500L); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println("thread2當前狀態:"+Thread.currentThread().getState().toString()); System.err.println("thread2執行了"); } }); Thread.sleep(2000); System.out.println("沒調用start方法,thread2當前狀態:" + thread2.getState().toString()); thread2.start(); System.out.println("調用start方法,thread2當前狀態:" + thread2.getState().toString()); Thread.sleep(200); System.out.println("等待200毫秒,thread2當前狀態:" + thread2.getState().toString()); Thread.sleep(3000); System.out.println("等待3秒,thread2當前狀態:" + thread2.getState().toString()); 復制代碼
######第二種 新建 運行 等待 運行 終止(sleep)
沒調用start方法,thread2當前狀態:NEW
調用start方法,thread2當前狀態:RUNNABLE
等待200毫秒,thread2當前狀態:TIMED_WAITING thread2當前狀態:RUNNABLE thread2執行了 等待3秒,thread2當前狀態:TERMINATED 復制代碼
案例3
System.out.println("###第三種 新建 運行 阻塞 運行 終止"); Thread thread = new Thread(new Runnable() { @Override public void run() { synchronized (Test.class) { System.out.println("當前狀態:"+Thread.currentThread().getState().toString()); System.out.println("執行了"); } } }); synchronized (Test.class) { System.out.println("沒調用start方法,當前狀態:"+thread.getState().toString()); thread.start(); System.out.println("調用start方法,當前狀態:"+thread.getState().toString()); Thread.sleep(200); System.out.println("200毫秒后,當前狀態:"+thread.getState().toString()); } Thread.sleep(3000); System.out.println("3秒后,當前狀態:"+thread.getState().toString()); 復制代碼
###第三種 新建 運行 阻塞 運行 終止
沒調用start方法,當前狀態:NEW
調用start方法,當前狀態:RUNNABLE
200毫秒后,當前狀態:BLOCKED 當前狀態:RUNNABLE 執行了 3秒后,當前狀態:TERMINATED 復制代碼
線程終止
-
stop()
線程不安全,會強行終止線程的所有鎖定。
-
interrupt()
如果目標線程在調用Object class的wait join sleep方法時被阻塞,那么interrupt會生效,該線程的中斷狀態將被清除,拋出interruptedException異常。
如果目標線程是被IO或者NIO中的channel阻塞,IO操作會被中斷或者返回特殊異常值。達到終止的目的。
如果以上條件都不滿足,則會設置此線程的中斷狀態。
-
通過狀態位來判斷
public class StopThread extends Thread{ public volatile static boolean flag = true; public static void main(String[] args) throws InterruptedException { new Thread(()-> { while(flag) { try { System.out.println("運行中"); Thread.sleep(10000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }).start(); Thread.sleep(3000); flag = false; System.out.println("結束"); } } 復制代碼
CPU緩存及內存屏障
CPU有三級緩存,從123到內存再到硬盤。但是存在一個問題,如果多核cpu讀取同樣的數據進行緩存計算,最終寫入主內存的是以哪個為准?
這個時候就出來了一個緩存一致性協議,單個cpu對緩存中的數據做了改動,需要通知給其他cpu。
CPU還有一個性能優化手段,運行時指令重排,把讀緩存命令優先執行。
兩個問題:
- 緩存中的數據與主內存中的數據並不是實時同步的,各個cpu間緩存的數據也不是實時同步的,在同一個時間點,各個cpu看到的同一內存地址的數據的值可能是不一致的。
- 多核多線程中,指令邏輯無法分辨因果關系,可能出現亂序執行。
解決辦法:內存屏障
- 寫內存屏障:在指令后插入Store Barrier,能讓寫入緩存中的最新數據更新寫入主內存,讓其他線程可見。
- 讀內存屏障:在指令前插入Load Barrier,可以讓高速緩存中的數據失效,強制重新從主內存中加載數據。
線程通信
要想實現多個線程之間的協同,如 線程執行先后順序,獲取某個線程執行的結果等,設計線程之間相互通信。
-
文件共享
-
網絡共享
-
共享變量
-
jdk提供的線程協調API
suspend/resume、wait/notify、park/unpark
JDK中對於需要多線程協作的,提供了對應API支持,典型場景是:生產者-消費者模型(線程阻塞、線程喚醒)
suspend/resume
-
同步代碼中使用,suspend掛起之后並不會釋放鎖,容易出現死鎖。
-
suspend比resume后執行
被棄用。
wait/notify notifyAll
只能由同一對象鎖的持有者線程調用,也就是寫在同步塊里,否則會拋出illegalMonitorStateException異常。
wait:加入該對象的等待集合中,並且放棄當前持有的對象鎖。
雖然wait會自動解鎖,但是對順序有要求,如果在notify被調用之后才開始wait方法的調用,線程會永遠處於WAITING狀態。
//正常的wait public void waitNotify() throws Exception { new Thread(()->{ if(baozidian == null) { synchronized (this) { System.out.println("進入等待"); } } System.out.println("買到包子"); }).start(); Thread.sleep(3000); baozidian = new Object(); synchronized (this) { this.notify(); System.out.println("通知"); } } 結果: 進入等待 買到包子 通知 復制代碼
park/unpark
線程調用park則等待許可,unpark為指定線程提供許可。
不要求方法的調用順序。但不會釋放鎖,所以在同步代碼塊中使用可能會死鎖。
/** 死鎖的park/unpark */ public void parkUnparkDeadLockTest() throws Exception { // 啟動線程 Thread consumerThread = new Thread(() -> { if (baozidian == null) { // 如果沒包子,則進入等待 System.out.println("1、進入等待"); // 當前線程拿到鎖,然后掛起 synchronized (this) { LockSupport.park(); } } System.out.println("2、買到包子,回家"); }); consumerThread.start(); // 3秒之后,生產一個包子 Thread.sleep(3000L); baozidian = new Object(); // 爭取到鎖以后,再恢復consumerThread synchronized (this) { LockSupport.unpark(consumerThread); } System.out.println("3、通知消費者"); } 結果: 1、進入等待 復制代碼
注意:最好不要使用if語句來判斷是否進入等待狀態。
官方建議應該在循環體中檢查等待狀態,原因是處於等待狀態的線程可能會收到錯誤警報和偽喚醒。
偽喚醒是指線程因為更底層的原因導致的。
線程封閉
並不是所有時候 都要用到共享數據,shuju被封閉在各自的線程中,就不需要同步。
具體體現有:ThreadLocal、局部變量
ThreadLocal
是一個線程級別的變量,每個線程都有一個ThreadLocal,就是每個線程都擁有了自己獨立的一個變量,競爭條件被徹底消除了,在並發模式下,是絕對安全的變量。
線程池
-
線程在java中是一個對象,更是操作系統的資源,創建銷毀都需要時間。
-
java對象占用堆內存,操作系統線程占用系統內存,根據jvm規范,一個線程默認最大棧大小是1M,線程過多會消耗很多內存。
-
操作系統需要頻繁切換線程上下文。
----->線程池就是為了解決這些問題。
線程池概念
- 線程池管理器:創建並管理,創建、銷毀線程池、添加新任務
- 工作線程:在沒有任務時處於等待狀態,可以循環執行任務
- 任務接口:每個任務必須實現的接口,以供工作線程調度任務的執行,它主要規定了任務的入口,任務執行完后的收尾工作,任務的執行狀態等。
- 任務隊列:存放沒有處理的任務。提供一種緩沖機制。
線程池API-接口定義和實現類
類型 | 名稱 | 描述 |
---|---|---|
接口 | Executor | 最上層的接口,定義了**執行任務的方法execute ** |
接口 | ExecutorService | 繼承了Executor接口,拓展了Callable、Future、關閉方法 |
接口 | ScheduledExecutorService | 繼承了ExecutorService接口,增加了定時任務相關的方法 |
實現類 | ThreadPoolExecutor | 基礎、標准的線程池實現 |
實現類 | ScheduledThreadPoolExecutor | 繼承了ThreadPoolExecutor,實現了 ScheduledExecutorService中相關定時任務的方法 |
代碼示例
公共代碼塊:
/** * 測試:提交15個執行時間需要三秒,看線程池的情況 * @param threadPoolExecutor */ public void testCommon(ThreadPoolExecutor threadPoolExecutor) throws Exception{ for (int i=0;i<15;i++){ int n = i; threadPoolExecutor.submit(new Runnable() { @Override public void run() { try { System.out.println("開始執行:"+n); Thread.sleep(3000l); System.err.println("執行結束:"+n); } catch (InterruptedException e) { e.printStackTrace(); } } }); System.out.println("提交任務成功:"+i); } Thread.sleep(500l); System.out.println("當前線程池的數量:"+threadPoolExecutor.getPoolSize()); System.out.println("當前等待隊列的數量:"+threadPoolExecutor.getQueue().size()); Thread.sleep(15000l); System.out.println("當前線程池的數量:"+threadPoolExecutor.getPoolSize()); System.out.println("當前等待隊列的數量:"+threadPoolExecutor.getQueue().size()); } 復制代碼
測試方法1:
/** * 1、線程池信息: 核心線程數量5,最大數量10,無界隊列,超出核心線程數量的線程存活時間:5秒, 指定拒絕策略的 * * @throws Exception */ public void threadPoolExecutorTest1() throws Exception{ ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5,10,5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()); testCommon(threadPoolExecutor); } //預計的結果:線程池數量5,其他進入等待隊列 復制代碼
測試方法1輸出結果: 提交任務成功:0 開始執行:0 提交任務成功:1 開始執行:1 提交任務成功:2 開始執行:2 提交任務成功:3 提交任務成功:4 提交任務成功:5 提交任務成功:6 提交任務成功:7 提交任務成功:8 提交任務成功:9 提交任務成功:10 提交任務成功:11 提交任務成功:12 提交任務成功:13 提交任務成功:14 開始執行:3 開始執行:4 當前線程池的數量:5 當前等待隊列的數量:10 執行結束:2 執行結束:0 執行結束:4 執行結束:1 執行結束:3 開始執行:5 開始執行:6 開始執行:7 開始執行:8 開始執行:9 開始執行:10 開始執行:11 開始執行:12 開始執行:13 開始執行:14 執行結束:5 執行結束:6 執行結束:8 執行結束:7 執行結束:9 執行結束:13 執行結束:10 執行結束:14 執行結束:12 執行結束:11 當前線程池的數量:5 當前等待隊列的數量:0 復制代碼
這里有一個問題就是,最大線程數量設置的是10,當前線程池的數量為什么達不到最大線程數量?
這就需要對execute的過程有個了解。
測試方法2:
/** * 2、 線程池信息: 核心線程數量5,最大數量10,隊列大小3,超出核心線程數量的線程存活時間:5秒, 指定拒絕策略的 * * @throws Exception */ public void threadPoolExecutorTest2() throws Exception{ // 創建一個 核心線程數量為5,最大數量為10,等待隊列最大是3 的線程池,也就是最大容納13個任務。 // 如果不指定拒絕策略,默認的策略是拋出RejectedExecutionException異常,java.util.concurrent.ThreadPoolExecutor.AbortPolicy ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(3), new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println("任務決絕執行。"); } }); testCommon(threadPoolExecutor); } //執行預期結果: //線程池數量5,3個進入等待,這時候核心線程數量和隊列都滿了,會加開5個任務線程(注意,5秒后沒任務執行會銷毀),因為最大線程是10 //最大10+等待隊列3 總共13,剩下兩個拒絕執行 復制代碼
測試方法3:Executors.newFixedThreadPool(int nThreads)
對於無界隊列,最大線程數量實際上是不起作用的。
/** * 3、 線程池信息: 核心線程數量5,最大數量5,無界隊列,超出核心線程數量的線程存活時間:5秒 * * @throws Exception */ private void threadPoolExecutorTest3() throws Exception { // 和Executors.newFixedThreadPool(int nThreads)一樣的 ThreadPoolExecutor threadPoolExecutor1 = (ThreadPoolExecutor) Executors.newFixedThreadPool(5); // ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, // new LinkedBlockingQueue<Runnable>()); testCommon(threadPoolExecutor1); // 預計結:線程池線程數量為:5,超出數量的任務,其他的進入隊列中等待被執行 } 復制代碼
Executors.newFixedThreadPool()
的內部實現實際上就是 注釋掉的部分:
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } 復制代碼
測試方法4:Executors.newCachedThreadPool()
此種方法適用於不可預估數量的情況
/** * 4、 線程池信息: * 核心線程數量0,最大數量Integer.MAX_VALUE,SynchronousQueue隊列,超出核心線程數量的線程存活時間:60秒 * * @throws Exception */ private void threadPoolExecutorTest4() throws Exception { // SynchronousQueue,實際上它不是一個真正的隊列,因為它不會為隊列中元素維護存儲空間。與其他隊列不同的是,它維護一組線程,這些線程在等待着把元素加入或移出隊列。 // 在使用SynchronousQueue作為工作隊列的前提下,客戶端代碼向線程池提交任務時, // 而線程池中又沒有空閑的線程能夠從SynchronousQueue隊列實例中取一個任務, // 那么相應的offer方法調用就會失敗(即任務沒有被存入工作隊列)。 // 此時,ThreadPoolExecutor會新建一個新的工作者線程用於對這個入隊列失敗的任務進行處理(假設此時線程池的大小還未達到其最大線程池大小maximumPoolSize)。 // 和Executors.newCachedThreadPool()一樣的 ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); testCommon(threadPoolExecutor); // 預計結果: // 1、 線程池線程數量為:15,超出數量的任務,其他的進入隊列中等待被執行 // 2、 所有任務執行結束,60秒后,如果無任務可執行,所有線程全部被銷毀,池的大小恢復為0 Thread.sleep(60000L); System.out.println("60秒后,再看線程池中的數量:" + threadPoolExecutor.getPoolSize()); } 復制代碼
測試方法4輸出結果: 提交任務成功:0 提交任務成功:1 提交任務成功:2 提交任務成功:3 提交任務成功:4 提交任務成功:5 提交任務成功:6 提交任務成功:7 提交任務成功:8 提交任務成功:9 提交任務成功:10 提交任務成功:11 提交任務成功:12 提交任務成功:13 提交任務成功:14 開始執行:3 開始執行:2 開始執行:6 開始執行:7 開始執行:10 開始執行:11 開始執行:0 開始執行:1 開始執行:5 開始執行:4 開始執行:8 開始執行:9 開始執行:12 開始執行:13 開始執行:14 當前線程池的數量:15 當前等待隊列的數量:0 執行結束:3 執行結束:2 執行結束:6 執行結束:7 執行結束:10 執行結束:9 執行結束:0 執行結束:1 執行結束:5 執行結束:4 執行結束:14 執行結束:8 執行結束:11 執行結束:12 執行結束:13 當前線程池的數量:15 當前等待隊列的數量:0 60秒后,再看線程池中的數量:0 復制代碼
測試方法5:一次性定時任務
/** * 5、 定時執行線程池信息:3秒后執行,一次性任務,到點就執行 <br/> * 核心線程數量5,最大數量Integer.MAX_VALUE,DelayedWorkQueue延時隊列,超出核心線程數量的線程存活時間:0秒 * * @throws Exception */ public void threadPoolExecutorTest5() throws Exception{ //Executors.newScheduledThreadPool() 一樣的 ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(5); scheduledThreadPoolExecutor.schedule(new Runnable() { @Override public void run() { System.out.println("任務被執行,現在時間:"+ DateUtil.now()); } },3000,TimeUnit.MILLISECONDS); System.out.println("定時任務,提交成功,時間是:"+DateUtil.now()); } 復制代碼
測試方法5輸出結果:
定時任務,提交成功,時間是:2021-11-27 13:42:43 任務被執行,現在時間:2021-11-27 13:42:46 復制代碼
測試方法6:周期定時任務
/** * 6、 定時執行線程池信息:線程固定數量5 ,<br/> * 核心線程數量5,最大數量Integer.MAX_VALUE,DelayedWorkQueue延時隊列,超出核心線程數量的線程存活時間:0秒 * * @throws Exception */ public void threadPoolExecutorTest6() throws Exception{ ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(5); //第一種方式:scheduleAtFixedRate,如果執行時間超過了周期時間 //執行完畢后,立即執行,不考慮延遲時間 scheduledThreadPoolExecutor.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { Thread.sleep(3000l); }catch (InterruptedException e){ e.printStackTrace(); } System.out.println("任務1被執行,現在時間:"+DateUtil.now()); } },2000,1000,TimeUnit.MILLISECONDS); //第二種方式,scheduleWithFixedDelay //如果執行時間超過了周期時間,執行完畢后,加上延遲時間后再執行 scheduledThreadPoolExecutor.scheduleWithFixedDelay(new Runnable() { @Override public void run() { try { Thread.sleep(3000l); }catch (InterruptedException e){ e.printStackTrace(); } System.out.println("任務2被執行,現在時間:"+DateUtil.now()); } },2000,1000,TimeUnit.MILLISECONDS); } 復制代碼
測試方法6輸出結果:
//可以看出,任務1每隔3秒執行一次,任務2每隔4秒執行一次 任務1被執行,現在時間:2021-11-27 14:08:45 任務2被執行,現在時間:2021-11-27 14:08:45 任務1被執行,現在時間:2021-11-27 14:08:48 任務2被執行,現在時間:2021-11-27 14:08:49 任務1被執行,現在時間:2021-11-27 14:08:51 任務2被執行,現在時間:2021-11-27 14:08:53 任務1被執行,現在時間:2021-11-27 14:08:54 任務1被執行,現在時間:2021-11-27 14:08:57 任務2被執行,現在時間:2021-11-27 14:08:57 復制代碼
終止線程的兩種方式
scheduledThreadPoolExecutor.shutdown();
//第二種會返回尚未執行的任務 List<Runnable> runnableList = scheduledThreadPoolExecutor.shutdownNow(); 復制代碼
本文同步公眾號【劉墨澤】,歡迎大家關注聊天!