java基礎-多線程(+整理的思維導圖)


本文內容主要整理轉載自廖雪峰的官方網站

12 多線程

整理的思維導圖,github下載地址)

image-20200426215828269

多線程是Java最基本的一種並發模型,本章我們將詳細介紹Java多線程編程。

12.1 多線程基礎

操作系統執行多任務實際上就是讓CPU對多個任務輪流交替執行。

例如,讓瀏覽器執行0.001秒,讓QQ執行0.001秒,再讓音樂播放器執行0.001秒,在人看來,CPU就是在同時執行多個任務。

進程

  • 一個任務稱為一個進程,瀏覽器就是一個進程,視頻播放器是另一個進程,類似的,音樂播放器和Word都是進程。

  • 某些進程內部還需要同時執行多個子任務。
    例如,我們在使用Word時,Word可以讓我們一邊打字,一邊進行拼寫檢查,同時還可以在后台進行打印,
    我們把子任務稱為線程。

進程和線程的關系就是:一個進程可以包含一個或多個線程,但至少會有一個線程。

image-20200415232635257

實現多任務的方法

因為同一個應用程序,既可以有多個進程,也可以有多個線程,因此,實現多任務的方法,有以下幾種:

  • 多進程模式(每個進程只有一個線程):

image-20200415232902433

  • 多線程模式(一個進程有多個線程):

image-20200415232923951

  • 多進程+多線程模式(復雜度最高):

image-20200415232944176

多任務:進程VS線程

和多線程相比,多進程的缺點在於:

  • 創建進程比創建線程開銷大,尤其是在Windows系統上;
  • 進程間通信比線程間通信要慢,因為線程間通信就是讀寫同一個變量,速度很快。

而多進程的優點在於:

  • 多進程穩定性比多線程高,因為在多進程的情況下,一個進程崩潰不會影響其他進程,

  • 在多線程的情況下,任何一個線程崩潰會直接導致整個進程崩潰。

多線程特點

Java語言內置了多線程支持:
一個Java程序實際上是一個JVM進程,
JVM進程用一個主線程來執行main()方法,
main()方法內部,我們又可以啟動多個線程。
此外,JVM還有負責垃圾回收的其他工作線程等。

因此,對於大多數Java程序來說,我們說多任務,實際上是說如何使用多線程實現多任務。

和單線程相比,多線程編程的特點在於:多線程經常需要讀寫共享數據,並且需要同步
例如,播放電影時,就必須由一個線程播放視頻,另一個線程播放音頻,兩個線程需要協調運行,否則畫面和聲音就不同步。因此,多線程編程的復雜度高,調試更困難。

Java多線程編程的特點又在於:

  • 多線程模型是Java程序最基本的並發模型;
  • 后續讀寫網絡、數據庫、Web開發等都依賴Java多線程模型。

因此,必須掌握Java多線程編程才能繼續深入學習其他內容。

12.2 創建新線程

Java語言內置了多線程支持。
當Java程序啟動的時候,實際上是啟動了一個JVM進程,
然后,JVM啟動主線程來執行main()方法。
main()方法中,我們又可以啟動其他線程。

方法一:

Thread派生一個自定義類,然后覆寫run()方法:

start()方法會在內部自動調用實例的run()方法。

// 多線程 
public class Main {
    public static void main(String[] args) {
        Thread t = new MyThread();
        t.start(); // 啟動新線程
    }
}

class MyThread extends Thread {
    @Override
    public void run() {
        System.out.println("start new thread!");
    }
}

方法二:

創建Thread實例時,傳入一個Runnable實例:

public class Main {
    public static void main(String[] args) {
        Thread t = new Thread(new MyRunnable());
        t.start(); // 啟動新線程
    }
}

class MyRunnable implements Runnable {
    @Override
    public void run() {
        System.out.println("start new thread!");
    }
}

或者用Java8引入的lambda語法進一步簡寫為:

// 多線程 
public class Main {
    public static void main(String[] args) {
        Thread t = new Thread(() -> {
            System.out.println("start new thread!");
        });
        t.start(); // 啟動新線程
    }
}

線程打印 VS main()打印

image-20200415235102645

藍色表示主線程,也就是main線程,

main線程執行的代碼有4行,
首先打印main start
然后創建Thread對象,
緊接着調用start()啟動新線程。

start()方法被調用時,JVM就創建了一個新線程,
我們通過實例變量t來表示這個新線程對象,並開始執行。

接着,main線程繼續執行打印main end語句,
t線程在main線程執行的同時會並發執行,打印thread runthread end語句。

run()方法結束時,新線程就結束了。而main()方法結束時,主線程也結束了。

我們再來看線程的執行順序:

  1. main線程肯定是先打印main start,再打印main end
  2. t線程肯定是先打印thread run,再打印thread end

但是,除了可以肯定,main start會先打印外,main end打印在thread run之前、thread end之后或者之間,都無法確定。
t線程開始運行以后,兩個線程就開始同時運行了,並且由操作系統調度,程序本身無法確定線程的調度順序。

模擬並發執行的效果

sleep()傳入的參數是毫秒。調整暫停時間的大小,我們可以看到main線程和t線程執行的先后順序。

public class Main {
    public static void main(String[] args) {
        
        System.out.println("main start...");
        
        Thread t = new Thread() {
            public void run() {
                System.out.println("thread run...");
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {}
                System.out.println("thread end.");
            }
        };
        
        t.start();
        
        try {
            Thread.sleep(20);
        } catch (InterruptedException e) {}
        
        System.out.println("main end...");
    }
}

要特別注意:直接調用Thread實例的run()方法是無效的:

必須調用Thread實例的start()方法才能啟動新線程

線程的優先級

可以對線程設定優先級,設定優先級的方法是:

Thread.setPriority(int n) // 1~10, 默認值5

優先級高的線程被操作系統調度的優先級較高,操作系統對高優先級線程可能調度更頻繁,
但我們決不能通過設置優先級來確保高優先級的線程一定會先執行。

小結

  • Java用Thread對象表示一個線程,通過調用start()啟動一個新線程;
  • 一個線程對象只能調用一次start()方法;
  • 線程的執行代碼寫在run()方法中;
  • 線程調度由操作系統決定,程序本身無法決定調度順序;
  • Thread.sleep()可以把當前線程暫停一段時間。

12.3 線程的狀態

在Java程序中,一個線程對象只能調用一次start()方法啟動新線程,並在新線程中執行run()方法。
一旦run()方法執行完畢,線程就結束了。因此,Java線程的狀態有以下幾種:

  • New:新創建的線程,尚未執行;
  • Runnable:運行中的線程,正在執行run()方法的Java代碼;
  • Blocked:運行中的線程,因為某些操作被阻塞而掛起;
  • Waiting:運行中的線程,因為某些操作在等待中;
  • Timed Waiting:運行中的線程,因為執行sleep()方法正在計時等待;
  • Terminated:線程已終止,因為run()方法執行完畢。

狀態轉移圖

用一個狀態轉移圖表示如下:

image-20200416000603153

當線程啟動后,它可以在RunnableBlockedWaitingTimed Waiting這幾個狀態之間切換,直到最后變成Terminated狀態,線程終止。

線程終止的原因有:

  • 線程正常終止:run()方法執行到return語句返回;
  • 線程意外終止:run()方法因為未捕獲的異常導致線程終止;
  • 線程強制終止:對某個線程的Thread實例調用stop()方法強制終止(強烈不推薦使用)。

線程等待

一個線程還可以等待另一個線程直到其運行結束。例如,main線程在啟動t線程后,可以通過t.join()等待t線程結束后再繼續運行:
join就是指等待該線程結束,然后才繼續往下執行自身線程。)

// 多線程 
public class Main {
    public static void main(String[] args) throws InterruptedException {
        Thread t = new Thread(() -> {
            System.out.println("hello");
        });
        System.out.println("start");
        t.start();
        t.join();
        System.out.println("end");
    }
}
start
hello
end

main線程對線程對象t調用join()方法時,主線程將等待變量t表示的線程運行結束。

如果t線程已經結束,對實例t調用join()會立刻返回。
此外,join(long)的重載方法也可以指定一個等待時間,超過等待時間后就不再繼續等待。

小結

  • Java線程對象Thread的狀態包括:NewRunnableBlockedWaitingTimed WaitingTerminated
  • 通過對另一個線程對象調用join()方法可以等待其執行結束;
  • 可以指定等待時間,超過等待時間線程仍然沒有結束就不再等待;
  • 對已經運行結束的線程調用join()方法會立刻返回。

12.4 中斷線程

常規中斷

如果線程需要執行一個長時間任務,就可能需要能中斷線程。中斷線程就是其他線程給該線程發一個信號,該線程收到信號后結束執行run()方法,使得自身線程能立刻結束運行。

我們舉個栗子:假設從網絡下載一個100M的文件,如果網速很慢,用戶等得不耐煩,就可能在下載過程中點“取消”,這時,程序就需要中斷下載線程的執行。

中斷一個線程非常簡單,只需要在其他線程中對目標線程調用interrupt()方法,目標線程需要反復檢測自身狀態是否是interrupted狀態,如果是,就立刻結束運行。

我們還是看示例代碼:

// 中斷線程
public class Main {
    public static void main(String[] args) throws InterruptedException {

        Thread t = new MyThread();

        t.start();

        Thread.sleep(1); // 暫停1毫秒

        t.interrupt(); // 中斷t線程

        t.join(); // 等待t線程結束

        System.out.println("end");
    }
}

class MyThread extends Thread {
    public void run() {
        int n = 0;
        while (! isInterrupted()) {
            n ++;
            System.out.println(n + " hello!");
        }
    }
}
//每次執行的結果是不一樣的,不能確定1ms以內會執行上述代碼多少次
1 hello!
end

main線程通過調用t.interrupt()方法中斷t線程,
但要注意,interrupt()方法僅僅向t線程發出了“中斷請求”,至於t線程是否能立刻響應,要看具體代碼。
t線程的while循環會檢測isInterrupted(),所以上述代碼能正確響應interrupt()請求,使得自身立刻結束運行run()方法。

等待狀態線程

如果線程處於等待狀態,例如,t.join()main線程進入等待狀態
此時,如果對main線程中的t線程調用interrupt(),那么t線程的join()方法會立刻拋出InterruptedException

因此,目標線程(也就是這里的t線程)只要捕獲到join()方法拋出的InterruptedException,就說明有其他線程對其調用了interrupt()方法(在這里是指,在main線程中對t線程調用了interrupt方法),通常情況下該線程應該立刻結束運行。

我們來看下面的示例代碼:

// 中斷線程 
public class Main {
    public static void main(String[] args) throws InterruptedException {
        Thread t = new MyThread();
        t.start();
        Thread.sleep(1000);//t線程開始后,將主線程暫停1000ms,讓t線程執行
        t.interrupt(); // 中斷t線程(提出中斷請求)
        t.join(); // 等待t線程結束
        System.out.println("end");
    }
}

class MyThread extends Thread {
    public void run() {
        Thread hello = new HelloThread();
        hello.start(); // 啟動hello線程
        try {
            hello.join(); // 等待hello線程結束
            //主線程結束1000ms暫停后,執行t中斷的時候,t線程在這里等待中
            //在上一級對t線程執行interrupt,
            //那么這里的 hello.join()方法會立刻拋出InterruptedException異常
        } catch (InterruptedException e) {
            System.out.println("interrupted!");
        }
        hello.interrupt();
    }
}

class HelloThread extends Thread {
    public void run() {
        int n = 0;
        while (!isInterrupted()) {
            n++;
            System.out.println(n + " hello!");
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                break;
            }
        }
    }
}
1 hello!
2 hello!
3 hello!
4 hello!
5 hello!
6 hello!
7 hello!
8 hello!
9 hello!
10 hello!
interrupted!
end

main線程通過調用t.interrupt()從而通知t線程中斷,而此時t線程正位於hello.join()的等待中,此方法會立刻結束等待並拋出InterruptedException

由於我們在t線程中捕獲了InterruptedException,因此,就可以准備結束該線程。

t線程結束前,對hello線程也進行了interrupt()調用通知其中斷。

如果去掉這一行代碼,可以發現hello線程仍然會繼續運行,且JVM不會退出。

設置中斷標志位

另一個常用的中斷線程的方法是設置標志位。我們通常會用一個running標志位來標識線程是否應該繼續運行,在外部線程中,通過把HelloThread.running置為false,就可以讓線程結束:

// 中斷線程 
public class Main {
    public static void main(String[] args)  throws InterruptedException {
        HelloThread t = new HelloThread();
        t.start();
        Thread.sleep(1);
        t.running = false; // 標志位置為false
    }
}

class HelloThread extends Thread {
    public volatile boolean running = true;
    public void run() {
        int n = 0;
        while (running) {
            n ++;
            System.out.println(n + " hello!");
        }
        System.out.println("end!");
    }
}
1 hello!
end!

注意到HelloThread的標志位boolean running是一個線程間共享的變量。線程間共享變量需要使用volatile關鍵字標記,確保每個線程都能讀取到更新后的變量值。

volatile關鍵字

為什么要對線程間共享的變量用關鍵字volatile聲明?這涉及到Java的內存模型。在Java虛擬機中,變量的值保存在主內存中,但是,當線程訪問變量時,它會先獲取一個副本,並保存在自己的工作內存中。如果線程修改了變量的值,虛擬機會在某個時刻把修改后的值回寫到主內存,但是,這個時間是不確定的!

image-20200416165809735

這會導致如果一個線程更新了某個變量,另一個線程讀取的值可能還是更新前的。
例如,主內存的變量a = true,線程1執行a = false時,
它在此刻僅僅是把變量a的副本變成了false,主內存的變量a還是true
在JVM把修改后的a回寫到主內存之前,其他線程讀取到的a的值仍然是true
這就造成了多線程之間共享的變量不一致。

因此,volatile關鍵字的目的是告訴虛擬機:

  • 每次訪問變量時,總是獲取主內存的最新值;
  • 每次修改變量后,立刻回寫到主內存。

volatile關鍵字解決的是可見性問題:當一個線程修改了某個共享變量的值,其他線程能夠立刻看到修改后的值。

如果我們去掉volatile關鍵字,運行上述程序,發現效果和帶volatile差不多,這是因為在x86的架構下,JVM回寫主內存的速度非常快,但是,換成ARM的架構,就會有顯著的延遲。

小結

  • 對目標線程調用interrupt()方法可以請求中斷一個線程,目標線程通過檢測isInterrupted()標志獲取自身是否已中斷。如果目標線程處於等待狀態,該線程會捕獲到InterruptedException
  • 目標線程檢測到isInterrupted()true或者捕獲了InterruptedException都應該立刻結束自身線程;
  • 通過標志位判斷需要正確使用volatile關鍵字;
  • volatile關鍵字解決了共享變量在線程間的可見性問題。

12.5 守護線程

有一種線程的目的就是無限循環,例如,一個定時觸發任務的線程:

class TimerThread extends Thread {
    @Override
    public void run() {
        while (true) {
            System.out.println(LocalTime.now());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                break;
            }
        }
    }
}

如果這個線程不結束,JVM進程就無法結束。問題是,由誰負責結束這個線程?

然而這類線程經常沒有負責人來負責結束它們。但是,當其他線程結束時,JVM進程又必須要結束,怎么辦?

答案是使用守護線程(Daemon Thread)。

守護線程是指為其他線程服務的線程。在JVM中,所有非守護線程都執行完畢后,無論有沒有守護線程,虛擬機都會自動退出。

因此,JVM退出時,不必關心守護線程是否已結束。

如何創建守護線程呢?方法和普通線程一樣,只是在調用start()方法前,調用setDaemon(true)把該線程標記為守護線程:

Thread t = new MyThread();
t.setDaemon(true);
t.start();

在守護線程中,編寫代碼要注意:守護線程不能持有任何需要關閉的資源,例如打開文件等,因為虛擬機退出時,守護線程沒有任何機會來關閉文件,這會導致數據丟失。

小結

  • 守護線程是為其他線程服務的線程;
  • 所有非守護線程都執行完畢后,虛擬機退出;
  • 守護線程不能持有需要關閉的資源(如打開文件等)。

12.6 線程同步-synchronized

同步的本質就是給指定對象加鎖,加鎖后才能繼續執行后續代碼

當多個線程同時運行時,線程的調度由操作系統決定,程序本身無法決定。因此,任何一個線程都有可能在任何指令處被操作系統暫停,然后在某個時間段后繼續執行。

這個時候,有個單線程模型下不存在的問題就來了:如果多個線程同時讀寫共享變量,會出現數據不一致的問題。

多線程模型下,要保證邏輯正確,對共享變量進行讀寫時,必須保證一組指令以原子方式執行:即某一個線程執行時,其他線程必須等待:
image-20200423110745778

通過加鎖和解鎖的操作,就能保證3條指令總是在一個線程執行期間,不會有其他線程會進入此指令區間。
即使在執行期線程被操作系統中斷執行,其他線程也會因為無法獲得鎖導致無法進入此指令區間。
只有執行線程將鎖釋放后,其他線程才有機會獲得鎖並執行。
這種加鎖和解鎖之間的代碼塊我們稱之為臨界區(Critical Section),任何時候臨界區最多只有一個線程能執行。

實現加鎖

概括一下如何使用synchronized

  1. 找出修改共享變量的線程代碼塊;
  2. 選擇一個共享實例作為鎖;
  3. 使用synchronized(lockObject) { ... }
public class Main {
    public static void main(String[] args) throws Exception {
        var add = new AddThread();
        var dec = new DecThread();
        add.start();
        dec.start();
        add.join();
        dec.join();
        System.out.println(Counter.count);
    }
}

class Counter {
    public static final Object lock = new Object();
    public static int count = 0;
}

class AddThread extends Thread {
    public void run() {
        for (int i=0; i<10000; i++) {
            synchronized(Counter.lock) {
                Counter.count += 1;
            }
        }
    }
}

class DecThread extends Thread {
    public void run() {
        for (int i=0; i<10000; i++) {
            synchronized(Counter.lock) {
                Counter.count -= 1;
            }
        }
    }
}

小結

  • 多線程同時讀寫共享變量時,會造成邏輯錯誤,因此需要通過synchronized同步;
  • 同步的本質就是給指定對象加鎖,加鎖后才能繼續執行后續代碼;
  • 注意加鎖對象必須是同一個實例;
  • 對JVM定義的單個原子操作不需要同步。

12.7 同步方法-封裝synchronized

使得類里面的"方法"進行同步

使用synchronized的時候,鎖住的是哪個對象非常重要。
好的方法是把synchronized邏輯封裝起來。例如,我們編寫一個計數器如下:

例:封裝Counter

public class Counter {
    private int count = 0;

    public void add(int n) {
        synchronized(this) {
            count += n;
        }
    }

    public void dec(int n) {
        synchronized(this) {
            count -= n;
        }
    }

    public int get() {
        return count;
    }
}

synchronized鎖住的對象是this,即當前實例,這又使得創建多個Counter實例的時候,它們之間互不影響,可以並發執行:

var c1 = Counter();
var c2 = Counter();

// 對c1進行操作的線程:
new Thread(() -> {
    c1.add();
}).start();
new Thread(() -> {
    c1.dec();
}).start();

// 對c2進行操作的線程:
new Thread(() -> {
    c2.add();
}).start();
new Thread(() -> {
    c2.dec();
}).start();

現在,對於Counter類,多線程可以正確調用。

線程安全

沒有特殊說明時,一個類默認是非線程安全的

如果一個類被設計為允許多線程正確訪問,我們就說這個類就是“線程安全”的(thread-safe),上面的Counter類就是線程安全的。

線程安全:

  • Java標准庫的java.lang.StringBuffer

  • 一些不變類,例如StringIntegerLocalDate
    它們的所有成員變量都是final,多線程同時訪問時只能讀不能寫,這些不變類也是線程安全的。

  • 最后,類似Math這些只提供靜態方法,沒有成員變量的類,也是線程安全的。

非線程安全:

  • 除了上述幾種少數情況,大部分類,例如ArrayList,都是非線程安全的類,
    我們不能在多線程中修改它們。
  • 但是,如果所有線程都只讀取,不寫入,那么ArrayList是可以安全地在線程間共享的。

鎖住this實例的寫法

當我們鎖住的是this實例時,實際上可以用synchronized修飾這個方法。下面兩種寫法是等價的:

public void add(int n) {
    synchronized(this) { // 鎖住this
        count += n;
    } // 解鎖
}


public synchronized void add(int n) { // 鎖住this
    count += n;
} // 解鎖

因此,用synchronized修飾的方法就是同步方法,它表示整個方法都必須用this實例加鎖。

對一個靜態方法添加synchronized

如果對一個靜態方法添加synchronized修飾符,它鎖住的是哪個對象?

public synchronized static void test(int n) {
    ...
}

對於static方法,是沒有this實例的,因為static方法是針對類而不是實例。
但是我們注意到任何一個類都有一個由JVM自動創建的Class實例,因此,對static方法添加synchronized,鎖住的是該類的Class實例。

上述synchronized static方法實際上相當於:

public class Counter {
    public static void test(int n) {
        synchronized(Counter.class) {
            ...
        }
    }
}

小結

  • synchronized修飾方法可以把整個方法變為同步代碼塊,synchronized方法加鎖對象是this
  • 通過合理的設計和數據封裝可以讓一個類變為“線程安全”;
  • 一個類沒有特殊說明,默認不是thread-safe;
  • 多線程能否安全訪問某個非線程安全的實例,需要具體問題具體分析。

12.8 死鎖

兩個線程各自持有不同的鎖,然后各自試圖獲取對方手里的鎖,造成了雙方無限等待下去,這就是死鎖。

Java的synchronized鎖是可重入鎖

JVM允許同一個線程重復獲取同一個鎖,這種能被同一個線程反復獲取的鎖,就叫做可重入鎖。

public class Counter {
    private int count = 0;

    public synchronized void add(int n) {
        if (n < 0) {
            dec(-n);
        } else {
            count += n;
        }
    }

    public synchronized void dec(int n) {
        count += n;
    }
}

觀察synchronized修飾的add()方法,一旦線程執行到add()方法內部,說明它已經獲取了當前實例的this鎖。如果傳入的n < 0,將在add()方法內部調用dec()方法。由於dec()方法也需要獲取this

死鎖例子

兩個線程各自持有不同的鎖,然后各自試圖獲取對方手里的鎖,造成了雙方無限等待下去,這就是死鎖

public void add(int m) {
    synchronized(lockA) { // 獲得lockA的鎖
        this.value += m;
        synchronized(lockB) { // 獲得lockB的鎖
            this.another += m;
        } // 釋放lockB的鎖
    } // 釋放lockA的鎖
}

public void dec(int m) {
    synchronized(lockB) { // 獲得lockB的鎖
        this.another -= m;
        synchronized(lockA) { // 獲得lockA的鎖
            this.value -= m;
        } // 釋放lockA的鎖
    } // 釋放lockB的鎖
}

避免死鎖

線程獲取鎖的順序要一致。兩個線程要嚴格按照先獲取lockA,再獲取lockB的順序,改寫dec()方法如下:

public void dec(int m) {
    synchronized(lockA) { // 獲得lockA的鎖
        this.value -= m;
        synchronized(lockB) { // 獲得lockB的鎖
            this.another -= m;
        } // 釋放lockB的鎖
    } // 釋放lockA的鎖
}

小結

  • Java的synchronized鎖是可重入鎖;
  • 死鎖產生的條件是多線程各自持有不同的鎖,並互相試圖獲取對方已持有的鎖,導致無限等待;
  • 避免死鎖的方法是多線程獲取鎖的順序要一致。

12.9 多線程協調: wait 和 notify

在Java程序中,synchronized解決了多線程競爭的問題。例如,對於一個任務管理器,多個線程同時往隊列中添加任務,可以用synchronized加鎖:

import java.util.*;

public class Main {
    public static void main(String[] args) throws InterruptedException {
        //新建TaskQueue實例對象
        var q = new TaskQueue();
        //存放線程的數組
        var ts = new ArrayList<Thread>();

        //開啟並運行5個線程,每個線程都嘗試打印q中的task
        for (int i=0; i<5; i++) {
            var t = new Thread() {
                public void run() {
                    // 執行task:
                    while (true) {
                        try {
                            String s = q.getTask();
//                            System.out.println("execute task: " + s);
                            System.out.println(this.getName() + " execute task: " + s + "\n");
                        } catch (InterruptedException e) {
                            return;
                        }
                    }
                }
            };
            t.start();
            ts.add(t);
        }

        //新建add線程,在線程中,每隔100ms,往q中添加一個String
        var add = new Thread(() -> {
            for (int i=0; i<10; i++) {
                // 放入task:
                String s = "t-" + Math.random();
                System.out.println("add task: " + s);
                q.addTask(s);
                //這里每次往q中添加一個String后暫停100ms的原因是為了讓上述5個線程中的某一個能夠及時捕捉到,並打印
                try { Thread.sleep(100); } catch(InterruptedException e) {}
            }
        });

        //開始執行add線程,並等待add執行完畢
        //這里是main線程等待add線程執行完畢,但是for循環創建的5個線程是在一直運行中的
        add.start();
        add.join();

        //主線程暫停100ms
        //目的是留出一點時間,讓上述5個線程能夠將q中的String全部get出來
        //然后再對所有的 提出中斷請求
        Thread.sleep(100);

        //
        for (var t : ts) {
            t.interrupt();
        }

        System.out.println("main Thread ended");
    }
}

class TaskQueue {
    Queue<String> queue = new LinkedList<>();

    public synchronized void addTask(String s) {
        this.queue.add(s);
        this.notifyAll();
    }

    public synchronized String getTask() throws InterruptedException {
        while (queue.isEmpty()) {
            this.wait();
        }
        return queue.remove();
    }
}
add task: t-0.3008528971311387
Thread-0 execute task: t-0.3008528971311387

add task: t-0.8437246350254718
Thread-4 execute task: t-0.8437246350254718

add task: t-0.40109144938751884
Thread-0 execute task: t-0.40109144938751884

add task: t-0.6488781210842749
Thread-4 execute task: t-0.6488781210842749

add task: t-0.7625266753242343
Thread-1 execute task: t-0.7625266753242343

add task: t-0.9845878231829128
Thread-4 execute task: t-0.9845878231829128

add task: t-0.4077696485977367
Thread-4 execute task: t-0.4077696485977367

add task: t-0.33171418799079655
Thread-4 execute task: t-0.33171418799079655

add task: t-0.2938665331723114
Thread-1 execute task: t-0.2938665331723114

add task: t-0.6479359799196959
Thread-4 execute task: t-0.6479359799196959

main Thread ended

Process finished with exit code 0

12.10 使用ReentrantLock

Java語言直接提供了synchronized關鍵字用於加鎖,但這種鎖
一是很
二是獲取時必須一直等待,沒有額外的嘗試機制。

java.util.concurrent.locks包提供的ReentrantLock用於替代synchronized加鎖,

傳統的synchronized代碼:

public class Counter {
    private int count;

    public void add(int n) {
        synchronized(this) {
            count += n;
        }
    }
}

如果用ReentrantLock替代,可以把代碼改造為:

public class Counter {
    private final Lock lock = new ReentrantLock();
    private int count;

    public void add(int n) {
        lock.lock();
        try {
            count += n;
        } finally {
            lock.unlock();
        }
    }
}

因為synchronized是Java語言層面提供的語法,所以我們不需要考慮異常,
ReentrantLock是Java代碼實現的鎖,我們就必須先獲取鎖,然后在finally中正確釋放鎖。

顧名思義,ReentrantLock是可重入鎖,它和synchronized一樣,一個線程可以多次獲取同一個鎖。

嘗試獲取鎖

synchronized不同的是,ReentrantLock可以嘗試獲取鎖:

if (lock.tryLock(1, TimeUnit.SECONDS)) {
    try {
        ...
    } finally {
        lock.unlock();
    }
}

上述代碼在嘗試獲取鎖的時候,最多等待1秒。如果1秒后仍未獲取到鎖,tryLock()返回false,程序就可以做一些額外處理,而不是無限等待下去。

所以,使用ReentrantLock比直接使用synchronized更安全,線程在tryLock()失敗的時候不會導致死鎖。

小結

  • ReentrantLock可以替代synchronized進行同步;
  • ReentrantLock獲取鎖更安全;
  • 必須先獲取到鎖,再進入try {...}代碼塊,最后使用finally保證釋放鎖;
  • 可以使用tryLock()嘗試獲取鎖。

12.11 使用Condition

使用來實現和 synchronized 類似的 wait 和 notify 功能的

class TaskQueue {
    //引用的`Condition`對象必須從`Lock`實例的`newCondition()`返回,
    //這樣才能獲得一個綁定了`Lock`實例的`Condition`實例。
    private final Lock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();
    private Queue<String> queue = new LinkedList<>();

    public void addTask(String s) {
        lock.lock();
        try {
            queue.add(s);
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public String getTask() {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                condition.await();
            }
            return queue.remove();
        } finally {
            lock.unlock();
        }
    }
}

可見,使用Condition時,引用的Condition對象必須從Lock實例的newCondition()返回,這樣才能獲得一個綁定了Lock實例的Condition實例。

Condition提供的await()signal()signalAll()原理和synchronized鎖對象的wait()notify()notifyAll()是一致的,並且其行為也是一樣的:

  • await()會釋放當前鎖,進入等待狀態;
  • signal()會喚醒某個等待線程;
  • signalAll()會喚醒所有等待線程;
  • 喚醒線程從await()返回后需要重新獲得鎖。

此外,和tryLock()類似,await()可以在等待指定時間后,如果還沒有被其他線程通過signal()signalAll()喚醒,可以自己醒來:

if (condition.await(1, TimeUnit.SECOND)) {
    // 被其他線程喚醒
} else {
    // 指定時間內沒有被其他線程喚醒
}

可見,使用Condition配合Lock,我們可以實現更靈活的線程同步。

小結

  • Condition可以替代waitnotify
  • Condition對象必須從Lock對象獲取。

12.12 使用ReadWriteLock

前面講到的ReentrantLock保證了只有一個線程可以執行臨界區代碼:

但是有些時候,這種保護有點過頭。因為我們發現,任何時刻,只允許一個線程修改,也就是調用inc()方法是必須獲取鎖,但是,get()方法只讀取數據,不修改數據,它實際上允許多個線程同時調用。

實際上我們想要的是:允許多個線程同時讀,但只要有一個線程在寫,其他線程就必須等待

使用ReadWriteLock可以解決這個問題,它保證:

  • 只允許一個線程寫入(其他線程既不能寫入也不能讀取);
  • 沒有寫入時,多個線程允許同時讀(提高性能)。

ReadWriteLock實現這個功能十分容易。我們需要創建一個ReadWriteLock實例,然后分別獲取讀鎖和寫鎖:

public class Counter {
    private final ReadWriteLock rwlock = new ReentrantReadWriteLock();
    private final Lock rlock = rwlock.readLock();
    private final Lock wlock = rwlock.writeLock();
    private int[] counts = new int[10];

    public void inc(int index) {
        wlock.lock(); // 加寫鎖
        try {
            counts[index] += 1;
        } finally {
            wlock.unlock(); // 釋放寫鎖
        }
    }

    public int[] get() {
        rlock.lock(); // 加讀鎖
        try {
            return Arrays.copyOf(counts, counts.length);
        } finally {
            rlock.unlock(); // 釋放讀鎖
        }
    }
}

把讀寫操作分別用讀鎖和寫鎖來加鎖,在讀取時,多個線程可以同時獲得讀鎖,這樣就大大提高了並發讀的執行效率。

使用ReadWriteLock時,適用條件是同一個數據,有大量線程讀取,但僅有少數線程修改。

例如,一個論壇的帖子,回復可以看做寫入操作,它是不頻繁的,但是,瀏覽可以看做讀取操作,是非常頻繁的,這種情況就可以使用ReadWriteLock

小結

使用ReadWriteLock可以提高讀取效率:

  • ReadWriteLock只允許一個線程寫入;
  • ReadWriteLock允許多個線程在沒有寫入時同時讀取;
  • ReadWriteLock適合讀多寫少的場景。

12.13 使用StampedLock

ReadWriteLock可以解決多線程同時讀,但只有一個線程能寫的問題。

他的潛在問題是:
如果有線程正在讀,寫線程需要等待讀線程釋放鎖后才能獲取寫鎖,即讀的過程中不允許寫,這是一種悲觀的讀鎖。

樂觀鎖

Java 8引入了新的樂觀讀寫鎖:StampedLock

改進之處在於:讀的過程中也允許獲取寫鎖后寫入!這樣一來,我們讀的數據就可能不一致,所以,需要一點額外的代碼來判斷讀的過程中是否有寫入,這種讀鎖是一種樂觀鎖。

public class Point {
    private final StampedLock stampedLock = new StampedLock();

    private double x;
    private double y;

    public void move(double deltaX, double deltaY) {
        long stamp = stampedLock.writeLock(); // 獲取寫鎖
        try {
            x += deltaX;
            y += deltaY;
        } finally {
            stampedLock.unlockWrite(stamp); // 釋放寫鎖
        }
    }

    public double distanceFromOrigin() {
        long stamp = stampedLock.tryOptimisticRead(); // 獲得一個樂觀讀鎖
        // 注意下面兩行代碼不是原子操作
        // 假設x,y = (100,200)
        double currentX = x;
        // 此處已讀取到x=100,但x,y可能被寫線程修改為(300,400)
        double currentY = y;
        // 此處已讀取到y,如果沒有寫入,讀取是正確的(100,200)
        // 如果有寫入,讀取是錯誤的(100,400)
        if (!stampedLock.validate(stamp)) { // 檢查樂觀讀鎖后是否有其他寫鎖發生
            stamp = stampedLock.readLock(); // 獲取一個悲觀讀鎖
            try {
                currentX = x;
                currentY = y;
            } finally {
                stampedLock.unlockRead(stamp); // 釋放悲觀讀鎖
            }
        }
        return Math.sqrt(currentX * currentX + currentY * currentY);
    }
}

ReadWriteLock相比,寫入的加鎖是完全一樣的,不同的是讀取。

  • 注意到首先我們通過tryOptimisticRead()獲取一個樂觀讀鎖,並返回版本號。

  • 接着進行讀取,讀取完成后,我們通過validate()去驗證版本號,
    如果在讀取過程中沒有寫入,版本號不變,驗證成功,我們就可以放心地繼續后續操作。
    如果在讀取過程中有寫入,版本號會發生變化,驗證將失敗。

  • 在失敗的時候,我們再通過獲取悲觀讀鎖再次讀取。
    由於寫入的概率不高,程序在絕大部分情況下可以通過樂觀讀鎖獲取數據,極少數情況下使用悲觀讀鎖獲取數據。

可見,StampedLock把讀鎖細分為樂觀讀和悲觀讀,能進一步提升並發效率。但這也是有代價的:一是代碼更加復雜,二是StampedLock是不可重入鎖,不能在一個線程中反復獲取同一個鎖。

StampedLock還提供了更復雜的將悲觀讀鎖升級為寫鎖的功能,它主要使用在if-then-update的場景:即先讀,如果讀的數據滿足條件,就返回,如果讀的數據不滿足條件,再嘗試寫。

tryOptimisticRead()返回的是版本號,不是鎖,根本沒有鎖

后面validate()就是為了驗證在這段時間內版本號變了沒,如果沒變,那就沒有寫入

版本號就是個long

readLock()才返回真正的讀鎖,必須finally中unlock

小結

  • StampedLock提供了樂觀讀鎖,可取代ReadWriteLock以進一步提升並發性能;
  • StampedLock是不可重入鎖。

12.14 使用Concurrent集合

針對ListMapSetDeque等,java.util.concurrent包也提供了對應的並發集合類。我們歸納一下:

interface non-thread-safe thread-safe
List ArrayList CopyOnWriteArrayList
Map HashMap ConcurrentHashMap
Set HashSet / TreeSet CopyOnWriteArraySet
Queue ArrayDeque / LinkedList ArrayBlockingQueue / LinkedBlockingQueue
Deque ArrayDeque / LinkedList LinkedBlockingDeque

使用這些並發集合與使用非線程安全的集合類完全相同。我們以ConcurrentHashMap為例:

Map<String, String> map = new ConcurrentHashMap<>();
// 在不同的線程讀寫:
map.put("A", "1");
map.put("B", "2");
map.get("A", "1");

因為所有的同步和加鎖的邏輯都在集合內部實現,對外部調用者來說,只需要正常按接口引用,其他代碼和原來的非線程安全代碼完全一樣。即當我們需要多線程訪問時,把:

Map<String, String> map = new HashMap<>();

改為:

Map<String, String> map = new ConcurrentHashMap<>();

小結

  • 使用java.util.concurrent包提供的線程安全的並發集合可以大大簡化多線程編程:
  • 多線程同時讀寫並發集合是安全的;
  • 盡量使用Java標准庫提供的並發集合,避免自己編寫同步代碼。

12.15 使用Atomic

Java的java.util.concurrent包除了提供底層鎖、並發集合外,還提供了一組原子操作的封裝類,它們位於java.util.concurrent.atomic包。

我們以AtomicInteger為例,它提供的主要操作有:

  • 增加值並返回新值:int addAndGet(int delta)
  • 加1后返回新值:int incrementAndGet()
  • 獲取當前值:int get()
  • 用CAS方式設置:int compareAndSet(int expect, int update)

Atomic類是通過無鎖(lock-free)的方式實現的線程安全(thread-safe)訪問。它的主要原理是利用了CAS:Compare and Set。

利用AtomicLong可以編寫一個多線程安全的全局唯一ID生成器:

class IdGenerator {
    AtomicLong var = new AtomicLong(0);

    public long getNextId() {
        return var.incrementAndGet();
    }
}

通常情況下,我們並不需要直接用do ... while循環調用compareAndSet實現復雜的並發操作,而是用incrementAndGet()這樣的封裝好的方法,因此,使用起來非常簡單。

在高度競爭的情況下,還可以使用Java 8提供的LongAdderLongAccumulator

小結

使用java.util.concurrent.atomic提供的原子操作可以簡化多線程編程:

  • 原子操作實現了無鎖的線程安全;
  • 適用於計數器,累加器等。

12.16 使用線程池

線程池內部維護了若干個線程,沒有任務的時候,這些線程都處於等待狀態。如果有新任務,就分配一個空閑線程執行。如果所有線程都處於忙碌狀態,新任務要么放入隊列等待,要么增加一個新線程進行處理。

Java標准庫提供了ExecutorService接口表示線程池,它的典型用法如下:

// 創建固定大小的線程池:
ExecutorService executor = Executors.newFixedThreadPool(3);
// 提交任務:
executor.submit(task1);
executor.submit(task2);
executor.submit(task3);
executor.submit(task4);
executor.submit(task5);

因為ExecutorService只是接口,Java標准庫提供的幾個常用實現類有:

  • FixedThreadPool:線程數固定的線程池;
  • CachedThreadPool:線程數根據任務動態調整的線程池;
  • SingleThreadExecutor:僅單線程執行的線程池。

創建這些線程池的方法都被封裝到Executors這個類中。我們以FixedThreadPool為例,看看線程池的執行邏輯:

提交任務的時候只需要實現runnable接口

// thread-pool 
import java.util.concurrent.*;

public class Main {
    public static void main(String[] args) {
        // 創建一個固定大小的線程池:
        ExecutorService es = Executors.newFixedThreadPool(4);
        for (int i = 0; i < 6; i++) {
            es.submit(new Task("" + i));
        }
        // 關閉線程池:
        es.shutdown();
    }
}

class Task implements Runnable {
    private final String name;

    public Task(String name) {
        this.name = name;
    }

    @Override
    public void run() {
        System.out.println("start task " + name);
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
        }
        System.out.println("end task " + name);
    }
}
start task 0
start task 2
start task 3
start task 1
end task 0
end task 2
end task 1
end task 3
start task 4
start task 5
end task 4
end task 5

觀察執行結果,一次性放入6個任務,由於線程池只有固定的4個線程,因此,前4個任務會同時執行,等到有線程空閑后,才會執行后面的兩個任務。

線程池在程序結束的時候要關閉。使用shutdown()方法關閉線程池的時候,它會等待正在執行的任務先完成,然后再關閉。shutdownNow()會立刻停止正在執行的任務,awaitTermination()則會等待指定的時間讓線程池關閉。

如果我們把線程池改為CachedThreadPool,由於這個線程池的實現會根據任務數量動態調整線程池的大小,所以6個任務可一次性全部同時執行。

ScheduledThreadPool

還有一種任務,需要定期反復執行,例如,每秒刷新證券價格。這種任務本身固定,需要反復執行的,可以使用ScheduledThreadPool。放入ScheduledThreadPool的任務可以定期反復執行。

創建一個ScheduledThreadPool仍然是通過Executors類:

ScheduledExecutorService ses = Executors.newScheduledThreadPool(4);

我們可以提交一次性任務,它會在指定延遲后只執行一次:

// 1秒后執行一次性任務:
ses.schedule(new Task("one-time"), 1, TimeUnit.SECONDS);

如果任務以固定的每3秒執行,我們可以這樣寫:

// 2秒后開始執行定時任務,每3秒執行:
ses.scheduleAtFixedRate(new Task("fixed-rate"), 2, 3, TimeUnit.SECONDS);

如果任務以固定的3秒為間隔執行,我們可以這樣寫:

// 2秒后開始執行定時任務,以3秒為間隔執行:
ses.scheduleWithFixedDelay(new Task("fixed-delay"), 2, 3, TimeUnit.SECONDS);

注意FixedRate和FixedDelay的區別。FixedRate是指任務總是以固定時間間隔觸發,不管任務執行多長時間:

│░░░░   │░░░░░░ │░░░    │░░░░░  │░░░  
├───────┼───────┼───────┼───────┼────>
│<─────>│<─────>│<─────>│<─────>│

而FixedDelay是指,上一次任務執行完畢后,等待固定的時間間隔,再執行下一次任務:

│░░░│       │░░░░░│       │░░│       │░
└───┼───────┼─────┼───────┼──┼───────┼──>
    │<─────>│     │<─────>│  │<─────>│

因此,使用ScheduledThreadPool時,我們要根據需要選擇執行一次、FixedRate執行還是FixedDelay執行。

小結

JDK提供了ExecutorService實現了線程池功能:

  • 線程池內部維護一組線程,可以高效執行大量小任務;
  • Executors提供了靜態方法創建不同類型的ExecutorService
  • 必須調用shutdown()關閉ExecutorService
  • ScheduledThreadPool可以定期調度多個任務。

12.17 使用Future

12.18 使用ForkJoin


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM