1. 概述
多線程編程允許我們並發運行線程,每個線程可以處理不同的任務。因此,它可以最佳地利用資源,特別是當我們的計算機具有多個多核 CPU 或多個 CPU 時。有時,我們想控制多個線程同時啟動。
在本教程中,我們將首先了解要求,尤其是“完全相同的時間”的含義。此外,我們將討論如何在 Java 中同時啟動兩個線程。
2. 了解需求
我們的要求是:“同時啟動兩個線程。”
這個要求看起來很容易理解。但是,如果我們仔細考慮一下,甚至可以完全 同時啟動兩個線程嗎?
首先,每個線程都會消耗CPU時間來工作。因此,如果我們的應用程序運行在具有單核 CPU 的計算機上,則不可能完全 同時啟動兩個線程。
如果我們的計算機有一個多核CPU或多個CPU,兩個線程可以在可能開始確切同一時間。但是,我們無法在 Java 端對其進行控制。
這是因為當我們在 Java 中使用線程時,Java 線程調度依賴於操作系統的線程調度。因此,不同的操作系統可能會以不同的方式處理它。
此外,如果我們以更嚴格的方式討論“完全相同的時間”,根據愛因斯坦的狹義相對論:
如果兩個不同的事件在空間上是分開的,就不可能在絕對意義上說這兩個不同的事件同時發生。
無論我們的 CPU 位於主板上或位於 CPU 中的內核多近,總有空間。因此,我們不能確保兩個線程完全同時啟動。
那么,這是否意味着該要求無效?
不,這是一個有效的要求。即使我們不能讓兩個線程開始EXACT同一時間,我們可以得到非常接近通過一些同步技術。
當我們需要兩個線程“同時”啟動時,這些技術可以在大多數實際情況下幫助我們。
在本教程中,我們將探索兩種解決此問題的方法:
- 使用CountDownLatch類
- 使用CyclicBarrier類
- 使用Phaser類
所有方法都遵循相同的想法:我們不會真正同時啟動兩個線程。相反,我們在線程啟動后立即阻塞線程並嘗試同時恢復它們的執行。
由於我們的測試將與線程調度相關,因此值得一提的是本教程中運行測試的環境:
- CPU:Intel(R) Core(TM) i7-8850H CPU。處理器時鍾在 2.6 和 4.3 GHz 之間(4.1 4 核,4 GHz 6 核)
- 操作系統:64 位 Linux 內核版本 5.12.12
- Java:Java 11
現在,讓我們看看CountDonwLatch和CyclicBarrier的作用。
3. 使用CountDownLatch類
CountDownLatch是 Java 5 中作為java.util.concurrent包的一部分引入的同步器。通常,我們使用CountDownLatch來阻塞線程,直到其他線程完成它們的任務。
簡單地說,我們在閂鎖對象中設置一個計數,並將閂鎖對象與一些線程相關聯。當我們啟動這些線程時,它們將被阻塞,直到閂鎖的計數變為零。
另一方面,在其他線程中,我們可以控制在什么情況下我們減少計數,讓被阻塞的線程恢復,例如,當主線程中的某些任務完成時。
3.1. 工作線程
現在,讓我們看看如何使用CountDownLatch類解決我們的問題。
首先,我們將創建我們的Thread類。我們稱之為WorkerWithCountDownLatch:
public class WorkerWithCountDownLatch extends Thread { private CountDownLatch latch; public WorkerWithCountDownLatch(String name, CountDownLatch latch) { this.latch = latch; setName(name); } @Override public void run() { try { System.out.printf("[ %s ] created, blocked by the latch...\n", getName()); latch.await(); System.out.printf("[ %s ] starts at: %s\n", getName(), Instant.now()); // do actual work here... } catch (InterruptedException e) { // handle exception } }
我們在WorkerWithCountDownLatch 類中添加了一個閂鎖對象。首先我們來了解一下latch對象的作用。
在run()方法中,我們調用了latch.await()方法。 這意味着,如果我們啟動工作線程,它將檢查閂鎖的計數。 線程將被阻塞,直到計數為零。
這樣,我們就可以在主線程中創建一個count=1的CountDownLatch(1)閂鎖,並將閂鎖對象與我們想要同時啟動的兩個工作線程相關聯。
當我們希望兩個線程繼續執行它們的實際工作時,我們通過 在主線程中調用latch.countDown()來釋放閂鎖。
接下來我們來看看主線程是如何控制這兩個工作線程的。
3.2. 主線程
我們將在usingCountDownLatch()方法中實現主線程:
private static void usingCountDownLatch() throws InterruptedException { System.out.println("==============================================="); System.out.println(" >>> Using CountDownLatch <<<<"); System.out.println("==============================================="); CountDownLatch latch = new CountDownLatch(1); WorkerWithCountDownLatch worker1 = new WorkerWithCountDownLatch("Worker with latch 1", latch); WorkerWithCountDownLatch worker2 = new WorkerWithCountDownLatch("Worker with latch 2", latch); worker1.start(); worker2.start(); Thread.sleep(10);//simulation of some actual work System.out.println("-----------------------------------------------"); System.out.println(" Now release the latch:"); System.out.println("-----------------------------------------------"); latch.countDown(); }
現在,讓我們從main()方法調用上面的usingCountDownLatch ()方法。當我們運行 main()方法時,我們會看到輸出:
===============================================
>>> Using CountDownLatch <<<<
===============================================
[ Worker with latch 1 ] created, blocked by the latch
[ Worker with latch 2 ] created, blocked by the latch
-----------------------------------------------
Now release the latch:
-----------------------------------------------
[ Worker with latch 2 ] starts at: 2021-06-27T16:00:52.268532035Z
[ Worker with latch 1 ] starts at: 2021-06-27T16:00:52.268533787Z
如上面的輸出所示,兩個工作線程幾乎同時啟動。兩個開始時間之間的差異小於兩微秒。
4. 使用CyclicBarrier 類
所述的CyclicBarrier類是Java 5.基本上引入另一個同步器,CyclicBarrier允許等待線程的固定數量為互相繼續執行之前到達一個公共點。
接下來,讓我們看看如何使用CyclicBarrier類解決我們的問題。
4.1. 工作線程
我們先來看看我們的工作線程的實現:
public class WorkerWithCyclicBarrier extends Thread { private CyclicBarrier barrier; public WorkerWithCyclicBarrier(String name, CyclicBarrier barrier) { this.barrier = barrier; this.setName(name); } @Override public void run() { try { System.out.printf("[ %s ] created, blocked by the barrier\n", getName()); barrier.await(); System.out.printf("[ %s ] starts at: %s\n", getName(), Instant.now()); // do actual work here... } catch (InterruptedException | BrokenBarrierException e) { // handle exception } } }
實現非常簡單。我們將屏障對象與工作線程相關聯。當線程啟動時,我們立即調用barrier.await() 方法。
這樣,工作線程就會被阻塞,等待各方調用barrier.await()恢復。
4.2. 主線程
接下來我們看看主線程中如何控制兩個工作線程的恢復:
private static void usingCyclicBarrier() throws BrokenBarrierException, InterruptedException { System.out.println("\n==============================================="); System.out.println(" >>> Using CyclicBarrier <<<<"); System.out.println("==============================================="); CyclicBarrier barrier = new CyclicBarrier(3); WorkerWithCyclicBarrier worker1 = new WorkerWithCyclicBarrier("Worker with barrier 1", barrier); WorkerWithCyclicBarrier worker2 = new WorkerWithCyclicBarrier("Worker with barrier 2", barrier); worker1.start(); worker2.start(); Thread.sleep(10);//simulation of some actual work System.out.println("-----------------------------------------------"); System.out.println(" Now open the barrier:"); System.out.println("-----------------------------------------------"); barrier.await(); }
我們的目標是讓兩個工作線程同時恢復。所以,加上主線程,我們一共有三個線程。
如上方法所示,我們在主線程中創建了一個包含三方的屏障對象。接下來,我們創建並啟動兩個工作線程。
正如我們之前所討論的,兩個工作線程被阻塞並等待屏障打開以恢復。
在主線程中,我們可以做一些實際的工作。當我們決定打開barrier時,我們調用barrier.await() 方法讓兩個worker繼續執行。
如果我們在 main()方法中調用usingCyclicBarrier(),我們將得到輸出:
===============================================
>>> Using CyclicBarrier <<<<
===============================================
[ Worker with barrier 1 ] created, blocked by the barrier
[ Worker with barrier 2 ] created, blocked by the barrier
-----------------------------------------------
Now open the barrier:
-----------------------------------------------
[ Worker with barrier 1 ] starts at: 2021-06-27T16:00:52.311346392Z
[ Worker with barrier 2 ] starts at: 2021-06-27T16:00:52.311348874Z
我們可以比較兩個工人的開始時間。即使兩個工作人員沒有在完全相同的時間開始,我們也非常接近我們的目標:兩個開始時間之間的差異小於三微秒。
5. 使用Phaser類
該移相器類是引入了同步Java 7已經很類似的CyclicBarrier和CountDownLatch。但是,Phaser類更靈活。
例如,與CyclicBarrier和CountDownLatch不同,Phaser允許我們動態注冊線程方。
接下來,讓我們使用Phaser解決問題。
5.1. 工作線程
像往常一樣,我們先看一下實現,然后了解它是如何工作的:
public class WorkerWithPhaser extends Thread { private Phaser phaser; public WorkerWithPhaser(String name, Phaser phaser) { this.phaser = phaser; phaser.register(); setName(name); } @Override public void run() { try { System.out.printf("[ %s ] created, blocked by the phaser\n", getName()); phaser.arriveAndAwaitAdvance(); System.out.printf("[ %s ] starts at: %s\n", getName(), Instant.now()); // do actual work here... } catch (IllegalStateException e) { // handle exception } } }
當工作線程被實例化時,我們通過調用phaser.register()將當前線程注冊到給定的 Phaser對象 。這樣,當前的工作就變成了移相器屏障的一個線程方 。
接下來,當工作線程啟動時,我們立即調用phaser.arriveAndAwaitAdvance()。因此,我們告訴 phaser當前線程已經到達,並將等待其他線程方的到達繼續進行。當然,在其他線程方到來之前,當前線程是被阻塞的。
5.2. 主線程
接下來,我們繼續看主線程的實現:
private static void usingPhaser() throws InterruptedException { System.out.println("\n==============================================="); System.out.println(" >>> Using Phaser <<<"); System.out.println("==============================================="); Phaser phaser = new Phaser(); phaser.register(); WorkerWithPhaser worker1 = new WorkerWithPhaser("Worker with phaser 1", phaser); WorkerWithPhaser worker2 = new WorkerWithPhaser("Worker with phaser 2", phaser); worker1.start(); worker2.start(); Thread.sleep(10);//simulation of some actual work System.out.println("-----------------------------------------------"); System.out.println(" Now open the phaser barrier:"); System.out.println("-----------------------------------------------"); phaser.arriveAndAwaitAdvance(); }
在上面的代碼中,我們可以看到,主線程將自己注冊為Phaser對象的線程方。
在我們創建並阻塞了兩個工作線程之后,主線程也調用了phaser.arriveAndAwaitAdvance()。這樣我們可以讓兩個工作線程可以同時恢復。
最后,讓我們調用main()方法中的usingPhaser ()方法:
===============================================
>>> Using Phaser <<<
===============================================
[ Worker with phaser 1 ] created, blocked by the phaser [ Worker with phaser 2 ] created, blocked by the phaser ----------------------------------------------- Now open the phaser barrier: ----------------------------------------------- [ Worker with phaser 2 ] starts at: 2021-07-18T17:39:27.063523636Z [ Worker with phaser 1 ] starts at: 2021-07-18T17:39:27.063523827Z
同樣,兩個工作線程幾乎同時啟動。兩個開始時間之間的差異小於兩微秒。
六,結論
在本文中,我們首先討論了要求:“同時啟動兩個線程”。
接下來,我們討論了同時啟動三個線程的兩種方法:使用CountDownLatch、 CyclicBarrier和Phaser。
他們的想法很相似,阻塞兩個線程並試圖讓它們同時恢復執行。
盡管這些方法不能保證兩個線程完全同時啟動,但對於現實世界中的大多數情況,結果非常接近且足夠。