Java多線程編程


一. 線程的概念

       在單線程情況下,計算機中存在一個控制權,並按照順序依次執行指令。單線程好像是一個只有一個隊長指揮的小隊,整個小隊同一個時間只能執行一個任務。在多線程情境下,計算機中有多個控制權。多個控制權可以同時進行,每個控制權依次執行一系列的指令。多線程好像是一個小隊中的成員同時執行不同的任務。

       多個線程可以並存於同一個進程空間。在JVM的一個進程空間中,一個棧(stack)代表了方法調用的次序。對於多線程來說,進程空間中需要有多個棧,以記錄不同線程的調用次序。多個棧互不影響,但所有的線程將共享堆(heap)中的對象。

二. 線程的創建方式

1.繼承Thread類

通過繼承Thread類創建線程類的具體步驟和具體代碼如下:

   • 定義一個繼承Thread類的子類,並重寫該類的run()方法;

   • 創建Thread子類的實例,即創建了線程對象;

   • 調用該線程對象的start()方法啟動線程。

 class SomeThead extends Thraad   { 
    public void run()   { 
     //do something here  
    }  
 } 
 
public static void main(String[] args){
 SomeThread thread1 = new SomeThread();   
  步驟3:啟動線程:   
 thread1.start(); 
}

2.實現Runnable接口

通過實現Runnable接口創建線程類的具體步驟和具體代碼如下:

   • 定義Runnable接口的實現類,並重寫該接口的run()方法;

   • 創建Runnable實現類的實例,並以此實例作為Thread的target對象,即該Thread對象才是真正的線程對象。

class SomeRunnable implements Runnable   { 
  public void run()   { 
  //do something here  
  }  
} 
Runnable run1 = new SomeRunnable();   
Thread thread1 = new Thread(run1);   
thread1.start(); 

三. 線程池

      線程池是多線程編程中的核心概念,簡單來說就是一組可以執行任務的空閑線程。線程池維護着多個線程,等待着監督管理者分配可並發執行的任務。這避免了在處理短時間任務時創建與銷毀線程的代價。通常,線程池的線程數量取CPU+2。

  • 重用存在的線程,省去線程的創建銷毀過程,性能佳。
  • 有效控制最大並發線程數。提高了使用率並避免了競爭。
  • 定時執行,定期執行,單線程,並發控制等功能。

      盡管這帶來了諸多優勢,首當其沖的就是程序性能提高,但多線程編程也有缺點 —— 增加了代碼復雜度、同步問題、非預期結果和增加創建線程的開銷。接下來了解一下如何使用 Java 線程池來緩解這些問題。

Java 通過 executor 對象來實現自己的線程池模型。可以使用 executor 接口或其他線程池的實現,它們都允許細粒度的控制。

java.util.concurrent 包中有以下接口:

►Executor —— 執行任務的簡單接口

►ExecutorService —— 一個較復雜的接口,包含額外方法來管理任務和 executor 本身

►ScheduledExecutorService —— 擴展自 ExecutorService,增加了執行任務的調度方法

除了這些接口,這個包中也提供了 Executors 類直接獲取實現了這些接口的 executor 實例。一般來說,一個 Java 線程池包含以下部分:

►工作線程的池子,負責管理線程

►線程工廠,負責創建新線程

►等待執行的任務隊列

在下面的章節,讓我們仔細看一看 Java 類和接口如何為線程池提供支持。

Executors 類和 Executor 接口

Executors 類包含工廠方法創建不同類型的線程池,Executor 是個簡單的線程池接口,只有一個execute() 方法。

我們通過一個例子來結合使用這兩個類(接口),首先創建一個單線程的線程池,然后用它執行一個簡單的語句:

Executor executor = Executors.newSingleThreadExecutor();
executor.execute(() -> System.out.println("Single thread pool test"));

注意語句寫成了 lambda 表達式,會被自動推斷成 Runnable 類型。

如果有工作線程可用,execute() 方法將執行語句,否則就把 Runnable 任務放進隊列,等待線程可用。

基本上,executor 代替了顯式創建和管理線程。

Executors 類里的工廠方法可以創建很多類型的線程池:

►newSingleThreadExecutor():包含單個線程和無界隊列的線程池,同一時間只能執行一個任務

►newFixedThreadPool():包含固定數量線程並共享無界隊列的線程池;當所有線程處於工作狀態,有新任務提交時,任務在隊列中等待,直到一個線程變為可用狀態

►newCachedThreadPool():只有需要時創建新線程的線程池

►newWorkStealingThreadPool():基於工作竊取(work-stealing)算法的線程池,后面章節詳細說明

接下來,讓我們看一下 ExecutorService 接口提供了哪些新功能?

ExecutorService

創建 ExecutorService 方式之一便是通過 Excutors 類的工廠方法。

ExecutorService executor = Executors.newFixedThreadPool(10);

Besides the execute() method, this interface also defines a similar submit() method that can return a Future object:

除了 execute() 方法,接口也定義了相似的 submit() 方法,這個方法可以返回一個 Future 對象。

Callable<Double> callableTask = () -> {
    return employeeService.calculateBonus(employee);
};
Future<Double> future = executor.submit(callableTask);
// execute other operations
try {
    if (future.isDone()) {
        double result = future.get();
    }
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}

從上面的例子可以看到,Future 接口可以返回 Callable 類型任務的結果,而且能顯示任務的執行狀態。

當沒有任務等待執行時,ExecutorService 並不會自動銷毀,所以你可以使用 shutdown() 或shutdownNow() 來顯式關閉它。

executor.shutdown();

ScheduledExecutorService

這是 ExecutorService 的一個子接口,增加了調度任務的方法

ScheduledExecutorService executor = Executors.newScheduledThreadPool(10);

schedule() 方法的參數指定執行的方法、延時和 TimeUnit

Future<Double> future = executor.schedule(callableTask, 2, TimeUnit.MILLISECONDS);

另外,這個接口定義了其他兩個方法:

xecutor.scheduleAtFixedRate(
  () -> System.out.println("Fixed Rate Scheduled"), 2, 2000, TimeUnit.MILLISECONDS);
 executor.scheduleWithFixedDelay(
  () -> System.out.println("Fixed Delay Scheduled"), 2, 2000, TimeUnit.MILLISECONDS);

scheduleAtFixedRate() 方法延時 2 毫秒執行任務,然后每 2 秒重復一次。相似的,scheduleWithFixedDelay() 方法延時 2 毫秒后執行第一次,然后在上一次執行完成 2 秒后再次重復執行。

在下面的章節,我們來看一下 ExecutorService 接口的兩個實現:ThreadPoolExecutor 和ForkJoinPool。

ThreadPoolExecutor

這個線程池的實現增加了配置參數的能力。創建 ThreadPoolExecutor 對象

最方便的方式就是通過Executors 工廠方法:

ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);

這種情況下,線程池按照默認值預配置了參數。線程數量由以下參數控制:

►corePoolSize 和 maximumPoolSize:表示線程數量的范圍

►keepAliveTime:決定了額外線程存活時間

我們深入了解一下這些參數如何使用。

當一個任務被提交時,如果執行中的線程數量小於 corePoolSize,一個新的線程被創建。如果運行的線程數量大於 corePoolSize,但小於 maximumPoolSize,並且任務隊列已滿時,依然會創建新的線程。如果多於 corePoolSize 的線程空閑時間超過 keepAliveTime,它們會被終止。

上面那個例子中,newFixedThreadPool() 方法創建的線程池,corePoolSize=maximumPoolSize=10 並且 keepAliveTime 為 0 秒。

如果你使用 newCachedThreadPool() 方法,創建的線程池 maximumPoolSize 為Integer.MAX_VALUE,並且 keepAliveTime 為 60 秒。

ThreadPoolExecutor cachedPoolExecutor = (ThreadPoolExecutor) Executors.newCachedThreadPool();

The parameters can also be set through a constructor or through setter methods:

這些參數也可以通過構造函數或setter方法設置:

ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 6, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
executor.setMaximumPoolSize(8);

ThreadPoolExecutor 的一個子類便是 ScheduledThreadPoolExecutor,它實現了ScheduledExecutorService 接口。你可以通過 newScheduledThreadPool() 工廠方法來創建這種類型的線程池。

ScheduledThreadPoolExecutor executor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(5);

上面語句創建了一個線程池,corePoolSize 為 5,maximumPoolSize 無限制,keepAliveTime 為 0 秒。

ForkJoinPool

另一個線程池的實現是 ForkJoinPool 類。它實現了 ExecutorService 接口,並且是 Java 7 中 fork/join 框架的重要組件。

fork/join 框架基於“工作竊取算法”。簡而言之,意思就是執行完任務的線程可以從其他運行中的線程“竊取”工作。

ForkJoinPool 適用於任務創建子任務的情況,或者外部客戶端創建大量小任務到線程池。

這種線程池的工作流程如下:

►創建 ForkJoinTask 子類

►根據某種條件將任務切分成子任務

►調用執行任務

►將任務結果合並

►實例化對象並添加到池中

創建一個 ForkJoinTask,你可以選擇 RecursiveAction 或 RecursiveTask 這兩個子類,后者有返回值。

我們來實現一個繼承 RecursiveTask 的類,計算階乘,並把任務根據閾值划分成子任務。

public class FactorialTask extends RecursiveTask<BigInteger> {
    private int start = 1;
    private int n;
    private static final int THRESHOLD = 20;
 
    // standard constructors
 
    @Override
    protected BigInteger compute() {
        if ((n - start) >= THRESHOLD) {
            return ForkJoinTask.invokeAll(createSubtasks())
              .stream()
              .map(ForkJoinTask::join)
              .reduce(BigInteger.ONE, BigInteger::multiply);
        } else {
            return calculate(start, n);
        }
    }
}

這個類需要實現的主要方法就是重寫 compute() 方法,用於合並每個子任務的結果。

具體划分任務邏輯在 createSubtasks() 方法中:

private Collection<FactorialTask> createSubtasks() {
    List<FactorialTask> dividedTasks = new ArrayList<>();
    int mid = (start + n) / 2;
    dividedTasks.add(new FactorialTask(start, mid));
    dividedTasks.add(new FactorialTask(mid + 1, n));
    return dividedTasks;}

最后,calculate() 方法包含一定范圍內的乘數。

private BigInteger calculate(int start, int n) {
    return IntStream.rangeClosed(start, n)
      .mapToObj(BigInteger::valueOf)
      .reduce(BigInteger.ONE, BigInteger::multiply);}

接下來,任務可以添加到線程池:

ForkJoinPool pool = ForkJoinPool.commonPool();
BigInteger result = pool.invoke(new FactorialTask(100));

ThreadPoolExecutor 與 ForkJoinPool 對比

初看上去,似乎 fork/join 框架帶來性能提升。但是這取決於你所解決問題的類型。

當選擇線程池時,非常重要的一點是牢記創建、管理線程以及線程間切換執行會帶來的開銷。

ThreadPoolExecutor 可以控制線程數量和每個線程執行的任務。這很適合你需要在不同的線程上執行少量巨大的任務。

相比較而言,ForkJoinPool 基於線程從其他線程“竊取”任務。正因如此,當任務可以分割成小任務時可以提高效率。

為了實現工作竊取算法,fork/join 框架使用兩種隊列:

►包含所有任務的主要隊列

►每個線程的任務隊列

當線程執行完自己任務隊列中的任務,它們試圖從其他隊列獲取任務。為了使這一過程更加高效,線程任務隊列使用雙端隊列(double ended queue)數據結構,一端與線程交互,另一端用於“竊取”任務。

來自The H Developer的圖很好的表現出了這一過程:

和這種模型相比,ThreadPoolExecutor 只使用一個主要隊列。

最后要注意的一點 ForkJoinPool 只適用於任務可以創建子任務。否則它和 ThreadPoolExecutor沒區別,甚至開銷更大。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM