Java 多線程的一次整理


一天沒有出過家門,實屬無聊,沒事瞎寫寫

1. 基本概念

1.1 多進程和多線程的概念

程序是由指令和數據組成,指令要運行,數據要加載,指令被 CPU 加載運行,數據被加載到內存,指令運行時可由 CPU 調度硬盤、網絡等設備。一個線程就是一個指令,CPU 調度的最小單位,一個進程就是一系列的指令流,由 CPU 一條一條執行

  • 進程是程序在計算機上的一次執行活動。當你運行一個程序,你就啟動了一個進程。一系列指令

  • 線程是進程中的實際運行單位,是獨立運行與進程之中的子任務。一條單獨的指令

1.2 並行與並發

並發和並行都是同時處理多路請求,目的是最大化 CPU 的利用率。並行是指兩個或者多個事件在同一時刻發生,並發是指多個事件在同一事件間隔內發生

  • 並發是指單核 CPU 運行多線程時,時間片進行很快的切換,線程輪流執行 CPU

  • 並行是指多核 CPU 運行多線程時,真正的在同一時刻運行

1.3 計算機存儲體系

在很早之前,CPU 的頻率與內存的頻率在一個層面上,上世紀 90 年代,CPU 的頻率大大提升,但內存的頻率沒有得到提升,導致 CPU 的運行速度比內存讀寫速度快很多,使 CPU 花費很長的時間等待數據的到來或把數據寫入到內存中。為了解決 CPU 運算速度與內存讀寫速度不匹配的矛盾,就出現了 CPU 緩存,CPU 緩存分為三個級別,分別是 L1、L2、L3,級別越小越接近 CPU,速度也越來越快,容量也越來越小

 

 

 

多核 CPU 的情況下有多個一級緩存,如何保證緩存內部數據一致性,不讓系統數據混亂,解決方案就是緩存一致性協議(Modified Exclusive Shared Or Invalid,MESI)或者鎖住總線,其中鎖住總線,效率非常低下CPU 串行,所以實際使用 MESI。MESI 通過四種狀態來進行標記

狀態 描述 監聽任務 狀態轉換
M 修改(Modified) 該Cache line有效,數據被修改了,和內存中的數據不一致,數據只存在於本Cache中。 緩存行必須時刻監聽所有試圖讀該緩存行相對就主存的操作,這種操作必須在緩存將該緩存行寫回主存並將狀態變成S(共享)狀態之前被延遲執行。 當被寫回主存之后,該緩存行的狀態會變成獨享(exclusive)狀態。
E 獨享、互斥(Exclusive) 該Cache line有效,數據和內存中的數據一致,數據只存在於本Cache中。 緩存行也必須監聽其它緩存讀主存中該緩存行的操作,一旦有這種操作,該緩存行需要變成S(共享)狀態。 當CPU修改該緩存行中內容時,該狀態可以變成Modified狀態
S 共享(Shared) 該Cache line有效,數據和內存中的數據一致,數據存在於很多Cache中。 緩存行也必須監聽其它緩存使該緩存行無效或者獨享該緩存行的請求,並將該緩存行變成無效(Invalid)。 當有一個CPU修改該緩存行時,其它CPU中該緩存行可以被作廢(變成無效狀態 Invalid)。
I 無效(Invalid) 該Cache line無效。

對於 M 和 E 狀態而言總是精確的,他們在和該緩存行的真正狀態是一致的,而 S 狀態可能是非一致的

1.4 線程的狀態

sleep、yield 和 join 區別:

  • sleep 執行后線程進入阻塞狀態,當前線程休眠一段時間

  • yield 執行后線程進入就緒狀態,使當前線程和所有等待的線程一起進行競爭 CPU 資源

  • join執行線程進入阻塞狀態,t.join 表示阻塞調用此方法的線程,直到線程 t 完成,方可繼續執行。底層實際調用 wait 方法

 

 

 

  1. 新建狀態(New):線程對象被創建后,就進入了新建狀態。例如:Thread thread = new Thread()

  2. 就緒狀態(Runnable):也被稱為"可執行狀態"。線程對象唄創建后,其它線程調用了該對象的 start() 方法,從而就啟動該線程。例如T.stat(),處於就緒狀態的線程,隨時可能被CPU調度執行

  3. 運行狀態(Running):線程獲取 CPU 權限進行執行。需要注意的是,線程只能從就緒狀態進入到運行狀態

  4. 阻塞狀態(Blocked):阻塞狀態是線程放棄CPU使用權,暫時停止運行,直到線程進入就緒狀態,才有機會轉到運行狀態。阻塞的情況分三種:

    1. 等待阻塞:通過調用線程的wait() 方法,讓線程等待某工作的完成

    2. 同步阻塞:線程在獲取 synchronized 同步鎖失敗,它會進入同步阻塞狀態

    3. 其它阻塞:通過調用線程的 sleep() 或 join() 或發出 I/O 請求時,線程會進入到阻塞狀態。當 sleep() 狀態超時,join() 等待線程終止或者超時、或者 I/O 處理完畢時,線程重新轉入到就緒狀態

  5. 死亡狀態(Dead):線程執行完了或者因異常退出了 run() 方法,該線程結束生命周期

2.多線程的實現方式

2.1繼承 Thread 類創建線程

Thead 類本質上是實現了 Runnable 接口的一個實例,代表一個線程的實例

 1 /**
 2  * @description: 多線程實現方法1:集成Thread類
 3  * @author: DZ
 4  **/
 5 @Slf4j
 6 public class MyThread1 extends Thread {
 7     @Override
 8     public void run() {
 9         log.info("MyThread1");
10         log.info("MyThread2");
11     }
12 13     public static void main(String[] args) {
14         MyThread1 t1 = new MyThread1();
15         MyThread1 t2 = new MyThread1();
16         t1.start();
17         t2.start();
18     }
19 }

 

2.2實現 Runnable 接口創建線程

如果自己的類已經 extends 另一個類,就無法直接 extends Thread,此時可以通過實現 Runnable 接口

避免單繼承的局限性、適合多個相同的線程去處理同一個資源

 1 /**
 2  * @description: 多線程實現方法2:實現Runnable接口
 3  **/
 4 @Slf4j
 5 public class MyThread2 implements Runnable {
 6  7     @Override
 8     public void run() {
 9         log.info("MyThread1");
10         log.info("MyThread2");
11     }
12 13     public static void main(String[] args) {
14         MyThread2 m = new MyThread2();
15         //1.調用run方法
16         Thread t1 = new Thread(m);
17         Thread t2 = new Thread(m);
18         t1.start();
19         t2.start();
20     }
21 }

 

2.3實現 Callable 接口,通過 Future Task 包裝器來創建 Thread 線程

可以獲取線程的返回值

 1 /**
 2  * @description: 多線程實現方法2:實現Callable接口
 3  * @author: DZ
 4  **/
 5 @Slf4j
 6 public class MyThread3 implements Callable {
 7     @Override
 8     public String call() throws Exception {
 9         log.info("MyThread1");
10         log.info("MyThread2");
11         return "MyThread3";
12     }
13 14     public static void main(String[] args) throws ExecutionException, InterruptedException {
15         MyThread3 m = new MyThread3();
16         //存儲返回值,其中泛型為返回值的類型
17         FutureTask<String> futureTask = new FutureTask<>(m);
18         new Thread(futureTask).start();
19         System.out.println(futureTask.get());
20     }
21 22 }

 

2.4通過線程池

2.4.1 線程池的主要參數

1 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
2   this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
3      Executors.defaultThreadFactory(), defaultHandler);
4 }
  • corePoolSize

當向線程池提交一個任務時,若線程池已創建的線程數小於corePoolSize,即便此時存在空閑線程,也會通過創建一個新線程來執行該任務,直到已創建的線程數大於或等於corePoolSize時,(除了利用提交新任務來創建和啟動線程(按需構造),也可以通過 prestartCoreThread() 或 prestartAllCoreThreads() 方法來提前啟動線程池中的基本線程。)

  • maximumPoolSize

線程池所允許的最大線程個數。當隊列滿了,且已創建的線程數小於maximumPoolSize,則線程池會創建新的線程來執行任務。另外,對於無界隊列,可忽略該參數

  • keepAliveTime

當線程池中線程數大於核心線程數時,線程的空閑時間如果超過線程存活時間,那么這個線程就會被銷毀,直到線程池中的線程數小於等於核心線程數

  • workQueue

用於傳輸和保存等待執行任務的阻塞隊列

  • ArrayBlockingQueue:一個由數組結構組成的有界阻塞隊列

  • LinkedBlockingQueue 一個由鏈表結構組成的有界阻塞隊列

  • PriorityBlockingQueue 一個支持優先級排序的無界阻塞隊列

  • DelayQueue 一個使用優先級隊列實現的無界阻塞隊列

  • SynchronousQueue 一個不存儲元素的阻塞隊列

  • LinkedTransferQueue 一個由鏈表結構組成的無界阻塞隊列

  • LinkedBlockingDeque 一個由鏈表結構組成的雙向阻塞隊列

作用:阻塞隊列可以保證任務隊列中沒有任務時阻塞獲取任務的線程,使得線程進入wait狀態,釋放cpu資源。當隊列中有任務時才喚醒對應線程從隊列中取出消息進行執行。使得在線程不至於一直占用cpu資源。

  • threadFactory

用於創建新線程。threadFactory創建的線程也是采用new Thread()方式,threadFactory創建的線程名都具有統一的風格:pool-m-thread-n(m為線程池的編號,n為線程池內的線程編號)

  • handler

當線程池和隊列都滿了,再加入線程會執行此策略

AbortPolicy: 直接拋出異常,阻止線程正常運行

1 public static class AbortPolicy implements RejectedExecutionHandler {
2     public AbortPolicy() {}
3     public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
4       throw new RejectedExecutionException("Task " + r.toString() +" rejected from " + e.toString());
5     }
6   }

 

CallerRunsPolicy: 直接在方法的調用線程中執行,除非線程池已關閉

1 public static class CallerRunsPolicy implements RejectedExecutionHandler {
2     public CallerRunsPolicy() {}
3     public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
4       if (!e.isShutdown()) {
5         r.run();
6       }
7     }
8   }

 

DiscardPolica: 丟棄當前的線程任務而不做任何處理。如果系統允許在資源不足的情況下棄部分任務,則這將是保障系統安全、穩定的一種很好的方案

1 public static class DiscardPolicy implements RejectedExecutionHandler {
2     public DiscardPolicy() {}
3     public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
4     }
5   }

 

DiscardOlderPolicy: 移除線程隊列中最早(老)的一個線程任務,並嘗試提交當前任務

1 public static class DiscardOldestPolicy implements RejectedExecutionHandler {
2     public DiscardOldestPolicy() { }
3     public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
4       if (!e.isShutdown()) {
5         e.getQueue().poll();// 最早(老)的任務出隊列
6         e.execute(r);
7       }
8     }
9   }

 

2.4.2如何設置線程池

  • CPU密集型

盡量使用較小的線程池,一般為CPU核心數+1。 因為CPU密集型任務使得CPU使用率很高,若開過多的線程數,會造成CPU過度切換。

  • IO密集型任務

可以使用稍大的線程池,一般為2*CPU核心數。 IO密集型任務CPU使用率並不高,因此可以讓CPU在等待IO的時候有其他線程去處理別的任務,充分利用CPU時間

  • 混合型任務

線程數 = CPU核心數 * (1+平均等待時間 / 平均工作時間)

2.4.3 代碼示例

 1 import lombok.extern.slf4j.Slf4j;
 2 import org.junit.Test;
 3  4 import java.util.concurrent.*;
 5  6 /**
 7  * @description: 通過線程池實現多線程
 8  * @author: DZ
 9  **/
10 @Slf4j
11 public class MyThread4 {
12     //通常使用方式,定義前5個參數即可,其余默認
13     private ThreadPoolExecutor threadPoolExecutor0 = new ThreadPoolExecutor(5, 10, 60,
14             TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(10));
15 16     //所有參數均自定義(增加工廠ThreadFactory和拒絕方式Handle)
17     private ThreadPoolExecutor threadPoolExecutor1 = new ThreadPoolExecutor(5, 10, 60,
18             TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(10), new ThreadFactory() {
19         @Override
20         public Thread newThread(Runnable r) {
21             Thread thread = new Thread(r);
22             log.info("我是線程{}", thread.getName());
23             return thread;
24         }
25     }, new RejectedExecutionHandler() {
26         @Override
27         public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
28             log.info("線程被去掉{}", new Thread(r).getName());
29         }
30     });
31 32     //所有參數均自定義,拒絕方式使用默認new ThreadPoolExecutor.AbortPolicy(),new ThreadPoolExecutor.DiscardOldestPolicy(),new ThreadPoolExecutor.CallerRunsPolicy(),new ThreadPoolExecutor.DiscardPolicy()
33     private ThreadPoolExecutor threadPoolExecutor2 = new ThreadPoolExecutor(5, 10, 60,
34             TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(10), new ThreadFactory() {
35         @Override
36         public Thread newThread(Runnable r) {
37             Thread thread = new Thread(r);
38             log.info("我是線程{}", thread.getName());
39             return thread;
40         }
41     }, new ThreadPoolExecutor.DiscardPolicy());
42 43     @Test
44     public void testRunnable() {
45         threadPoolExecutor0.execute(new Runnable() {
46             @Override
47             public void run() {
48                 log.info("MyThread1");
49                 log.info("MyThread2");
50             }
51         });
52     }
53 54     @Test
55     public void testCallable() throws ExecutionException, InterruptedException {
56         Future<String> submit = threadPoolExecutor1.submit(new Callable<String>() {
57             @Override
58             public String call() throws Exception {
59                 log.info("MyThread1");
60                 log.info("MyThread2");
61                 return "MyThread4";
62             }
63         });
64         System.out.println(submit.get());
65     }
66 }

 

3 常見的線程池

3.1 FixedThreadPool

適用於任務數量已知,且相對耗時的任務

1 public static ExecutorService newFixedThreadPool(int nThreads) { 
2     return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); 
3 }

3.2 SingleThreadExecutor

這種線程池非常適合所有任務都需要按被提交的順序來執行的場景,是個單線程的串行。

1     public static ExecutorService newSingleThreadExecutor() {
2         return new FinalizableDelegatedExecutorService
3             (new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
4     }
5 6     public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
7         return new FinalizableDelegatedExecutorService
8             (new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(),threadFactory));
9     }

3.3 CachedThreadPool

核心線程池為0,存活時間為60s,適合小而快的任務

1   public static ExecutorService newCachedThreadPool() {
2         return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
3     }
4 5     public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
6         return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>(),threadFactory);
7     }

3.4 ScheduledThreadPool

支持定時或者周期執行的任務

 1  public ScheduledThreadPoolExecutor(int corePoolSize) {
 2         super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue());
 3     }
 4     public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory) {
 5         super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue(), threadFactory);
 6     }
 7     public ScheduledThreadPoolExecutor(int corePoolSize,RejectedExecutionHandler handler) {
 8         super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue(), handler);
 9     }
10     public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory,RejectedExecutionHandler handler) {
11         super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue(), threadFactory, handler);
12     }

eg

 1 public static void main(String[] args) {
 2         ScheduledExecutorService service = Executors.newScheduledThreadPool(5);
 3         // 1. 延遲一定時間執行一次
 4         service.schedule(() ->{
 5             System.out.println("schedule ==> 延遲一定時間執行一次");
 6         },2, TimeUnit.SECONDS);
 7         // 2. 按照固定頻率周期執行
 8         service.scheduleAtFixedRate(() ->{
 9             System.out.println("scheduleAtFixedRate ==> 按照固定頻率周期執行");
10         },2,3,TimeUnit.SECONDS);
11         //3. 按照固定頻率周期執行
12         service.scheduleWithFixedDelay(() -> {
13             System.out.println("scheduleWithFixedDelay ==> 按照固定頻率周期執行");
14         },2,5,TimeUnit.SECONDS);
15     }
  • 首先我們看第一個方法 schedule , 它有三個參數,第一個參數是線程任務,第二個delay 表示任務執行延遲時長,第三個unit 表示延遲時間的單位,如上面代碼所示就是延遲兩秒后執行任務

1 public ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit);
  • 第二個方法是 scheduleAtFixedRate 如下, 它有四個參數,command 參數表示執行的線程任務 ,initialDelay 參數表示第一次執行的延遲時間,period 參數表示第一次執行之后按照多久一次的頻率來執行,最后一個參數是時間單位。如上面案例代碼所示,表示兩秒后執行第一次,之后按每隔三秒執行一次

1 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit);
  • 第三個方法是 scheduleWithFixedDelay 如下,它與上面方法是非常類似的,也是周期性定時執行, 參數含義和上面方法一致。這個方法和 scheduleAtFixedRate 的區別主要在於時間的起點計時不同。scheduleAtFixedRate 是以任務開始的時間為時間起點來計時,時間到就執行第二次任務,與任務執行所花費的時間無關;而 scheduleWithFixedDelay 是以任務執行結束的時間點作為計時的開始。如下所示

1 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit);

3.5 SingleThreadEcheduledExecutor

它實際和 ScheduledThreadPool。線程池非常相似,它只是 ScheduledThreadPool的一個特例,內部只有一個線程,它只是將 ScheduledThreadPool 的核心線程數設置為了 1。如源碼所示:

1  public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
2         return new DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1));
3     }

3.6 ForkJoinPool

這是一個在 JDK7引入的新新線程池,它的主要特點是可以充分利用多核CPU,可以把一個任務拆分為多個子任務,這些子任務放在不同的處理器上並行執行,當這些子任務執行結束后再把這些結果合並起來,這是一種分治思想。

3.7 newWorkStealingPool

WorkStealingPool背后是使用ForkJoinPool實現的(JDK8)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM