Java多線程學習筆記


作者:Grey

原文地址:Java多線程學習筆記

說明

本文涉及到的所有代碼和圖例

圖例

代碼

順序、並行與並發

順序(sequential)用於表示多個操作“依次處理”。比如把十個操作交給一個人處理時,這個人要一個一個地按順序來處理。

並行(parallel)用於表示多個操作“同時處理”。比如十個操作分給兩個人處理時,這兩個人會並行來處理。

並發(concurrent)相對於順序和並行來說比較抽象,用於表示“將一個操作分割成多個部分並且允許無序處理”。比如將十個操作分成相對獨立的兩類,這樣便可以開始並發處理了。如果一個人來處理,這個人就是順序處理分開的並發操作,而如果是兩個人。這兩個人就可以並行處理同一操作。

如果CPU只有一個,那么並發處理就是順序執行的,而如果有多個CPU,那么並發處理就可能會並行運行。

image

什么是程序,進程和線程?

  • 程序是計算機的可執行文件

  • 進程是計算機資源分配的基本單位

  • 線程是資源調度執行的基本單位

    • 一個程序里面不同的執行路徑

    • 多個線程共享進程中的資源

什么是協程?

協程是一種用戶態的輕量級線程,協程的調度完全由用戶控制。協程擁有自己的寄存器上下文和棧。協程調度切換時,將寄存器上下文和棧保存到其他地方,在切回來的時候,恢復先前保存的寄存器上下文和棧,直接操作棧則基本沒有內核切換的開銷,可以不加鎖的訪問全局變量,所以上下文的切換非常快。協程在子程序內部可中斷的,然后轉而執行別的子程序,在適當的時候再返回來接着執行。

協程的特點在於是一個線程執行,那和多線程比,協程有如下優勢:

  1. 極高的執行效率:因為子程序切換不是線程切換,而是由程序自身控制,因此,沒有線程切換的開銷,和多線程比,線程數量越多,協程的性能優勢就越明顯;

  2. 不需要多線程的鎖機制:因為只有一個線程,也不存在同時寫變量沖突,在協程中控制共享資源不加鎖,只需要判斷狀態就好了,所以執行效率比多線程高很多。

注意,協程避免了無意義的調度,由此可以提高性能,但也因此,程序員必須自己承擔調度的責任,同時,協程也失去了標准線程使用多CPU的能力。

一個簡單的協程示例, 代碼見:FiberSample.java,需要引入quasar-core依賴包。

線程和進程的關系

線程就是輕量級進程,是程序執行的最小單位。

多進程的方式也可以實現並發,為什么我們要使用多線程?

  1. 共享資源在線程間的通信比較容易。

  2. 線程開銷更小。

進程和線程的區別?

  • 進程是一個獨立的運行環境,而線程是在進程中執行的一個任務。他們兩個本質的區別是是否單獨占有內存地址空間及其它系統資源(比如I/O)。

  • 進程單獨占有一定的內存地址空間,所以進程間存在內存隔離,數據是分開的,數據共享復雜但是同步簡單,各個進程之間互不干擾;而線程共享所屬進程占有的內存地址空間和資源,數據共享簡單,但是同步復雜。

  • 進程單獨占有一定的內存地址空間,一個進程出現問題不會影響其他進程,不影響主程序的穩定性,可靠性高;一個線程崩潰可能影響整個程序的穩定性,可靠性較低。

  • 進程單獨占有一定的內存地址空間,進程的創建和銷毀不僅需要保存寄存器和棧信息,還需要資源的分配回收以及頁調度,開銷較大;線程只需要保存寄存器和棧信息,開銷較小。

  • 進程是操作系統進行資源分配的基本單位,而線程是操作系統進行調度的基本單位,即CPU分配時間的單位。

多線程訪問成員變量與局部變量

類變量(類里面static修飾的變量)保存在“方法區”

實例變量(類里面的普通變量)保存在“堆”

局部變量(方法里聲明的變量)“虛擬機棧”

“方法區”和“堆”都屬於線程共享數據區,“虛擬機棧”屬於線程私有數據區。

因此,局部變量是不能多個線程共享的,而類變量和實例變量是可以多個線程共享的。事實上,在java中,多線程間進行通信的唯一途徑就是通過類變量和實例變量。也就是說,如果一段多線程程序中如果沒有類變量和實例變量,那么這段多線程程序就一定是線程安全的。

開發過程中,為了解決線程安全問題,有如下角度可以考慮:

第一種方案:盡量使用局部變量,代替實例變量和靜態變量。

第二種方案:如果必須是實例變量,那么可以考慮創建多個對象,這樣實例變量的內存就不共享了(一個線程對應一個對象,100個對象對應100個對象,對象不共享,就沒有數據安全問題了)

第三種方案:如果不使用局部變量。對象也不能創建多個。這個時候,就只能選擇syncharonized了。

線程的共享資源和獨有資源

共享資源

  • 進程代碼段
  • 進程的公有數據
  • 進程打開的文件描述符、信號的處理器、進程的當前目錄和進程用戶ID與進程組ID。

獨有資源

線程ID

每個線程都有自己的線程ID,這個ID在本進程中是唯一的。進程用此來標識線程。

寄存器組的值

由於線程間是並發運行的,每個線程有自己不同的運行線索,當從一個線程切換到另一個線程上時,必須將原有的線程的寄存器集合的狀態保存,以便將來該線程在被重新切換到時能得以恢復。

線程的堆棧

堆棧是保證線程獨立運行所必須的。線程函數可以調用函數,而被調用函數中又是可以層層嵌套的,所以線程必須擁有自己的函數堆棧, 使得函數調用可以正常執行,不受其他線程的影響。

錯誤返回碼

由於同一個進程中有很多個線程在同時運行,可能某個線程進行系統調用后設置了err no值,而在該線程還沒有處理這個錯誤,另外一個線程就在此時被調度器投入運行,這樣錯誤值就有可能被修改。所以,不同的線程應該擁有自己的錯誤返回碼變量。

線程的信號屏蔽碼

由於每個線程所感興趣的信號不同,所以線程的信號屏蔽碼應該由線程自己管理。但所有的線程都 共享同樣的信號處理器。

線程的優先級

由於線程需要像進程那樣能夠被調度,那么就必須要有可供調度使用的參數,這個參數就是線程的優先級。

什么是線程切換?

從底層角度上看,CPU主要由如下三部分組成,分別是:

  • ALU: 計算單元
  • Registers: 寄存器組
  • PC:存儲到底執行到哪條指令

T1線程在執行的時候,將T1線程的指令放在PC,數據放在Registers,假設此時要切換成T2線程,T1線程的指令和數據放cache,然后把T2線程的指令放PC,數據放Registers,執行T2線程即可。

以上的整個過程是通過操作系統來調度的,且線程的調度是要消耗資源的,所以,線程不是設置越多越好。

示例:

單線程和多線程來累加1億個數。 示例代碼:CountSum.java

運行結果

計算1億個隨機Double類型數據之和[單線程], 結果是:result = 49998957.92 耗時 : 105ms
計算1億個隨機Double類型數據之和[2個線程], 結果是:result = 49998957.92 耗時 : 59ms
計算1億個隨機Double類型數據之和[10個線程], 結果是:result = 49998957.92 耗時 : 61ms

計算1億個隨機Double類型數據之和[單線程], 結果是:result = 50000711.89 耗時 : 101ms
計算1億個隨機Double類型數據之和[2個線程], 結果是:result = 50000711.89 耗時 : 54ms
計算1億個隨機Double類型數據之和[10個線程], 結果是:result = 50000711.89 耗時 : 64ms

計算1億個隨機Double類型數據之和[單線程], 結果是:result = 49998124.71 耗時 : 114ms
計算1億個隨機Double類型數據之和[2個線程], 結果是:result = 49998124.71 耗時 : 53ms
計算1億個隨機Double類型數據之和[10個線程], 結果是:result = 49998124.71 耗時 : 54ms

計算1億個隨機Double類型數據之和[單線程], 結果是:result = 50000309.80 耗時 : 102ms
計算1億個隨機Double類型數據之和[2個線程], 結果是:result = 50000309.80 耗時 : 53ms
計算1億個隨機Double類型數據之和[10個線程], 結果是:result = 50000309.80 耗時 : 35ms

計算1億個隨機Double類型數據之和[單線程], 結果是:result = 50001943.57 耗時 : 108ms
計算1億個隨機Double類型數據之和[2個線程], 結果是:result = 50001943.57 耗時 : 58ms
計算1億個隨機Double類型數據之和[10個線程], 結果是:result = 50001943.57 耗時 : 41ms

計算1億個隨機Double類型數據之和[單線程], 結果是:result = 49997176.44 耗時 : 102ms
計算1億個隨機Double類型數據之和[2個線程], 結果是:result = 49997176.44 耗時 : 53ms
計算1億個隨機Double類型數據之和[10個線程], 結果是:result = 49997176.44 耗時 : 29ms

計算1億個隨機Double類型數據之和[單線程], 結果是:result = 49999627.84 耗時 : 101ms
計算1億個隨機Double類型數據之和[2個線程], 結果是:result = 49999627.84 耗時 : 53ms
計算1億個隨機Double類型數據之和[10個線程], 結果是:result = 49999627.84 耗時 : 29ms

計算1億個隨機Double類型數據之和[單線程], 結果是:result = 50001260.16 耗時 : 102ms
計算1億個隨機Double類型數據之和[2個線程], 結果是:result = 50001260.16 耗時 : 55ms
計算1億個隨機Double類型數據之和[10個線程], 結果是:result = 50001260.16 耗時 : 28ms

計算1億個隨機Double類型數據之和[單線程], 結果是:result = 49997786.83 耗時 : 101ms
計算1億個隨機Double類型數據之和[2個線程], 結果是:result = 49997786.83 耗時 : 54ms
計算1億個隨機Double類型數據之和[10個線程], 結果是:result = 49997786.83 耗時 : 28ms

計算1億個隨機Double類型數據之和[單線程], 結果是:result = 49998955.27 耗時 : 102ms
計算1億個隨機Double類型數據之和[2個線程], 結果是:result = 49998955.27 耗時 : 54ms
計算1億個隨機Double類型數據之和[10個線程], 結果是:result = 49998955.27 耗時 : 28ms

可以看到結果中,創建10個線程 不一定會比創建2個線程要執行更快。

單核CPU設定多線程是否有意義?

有意義,因為線程的操作中可能有不消耗CPU的操作,比如:等待網絡的傳輸,或者線程sleep,此時就可以讓出CPU去執行其他線程。可以充分利用CPU資源。

工作線程數(線程池中線程數量)設多少合適?

  • 和CPU的核數有關

  • 最好是通過壓測來評估。通過profiler性能分析工具jProfiler,或者Arthas

  • 公式

N = Ncpu * Ucpu * (1 + W/C)

其中:

  • Ncpu是處理器的核的數目,可以通過Runtime.getRuntime().availableProcessors() 得到

  • Ucpu是期望的CPU利用率(該值應該介於0和1之間)

  • W/C是等待時間和計算時間的比率。

更深入的分析,可以參考這篇文章

Java中創建線程的方式

  1. 繼承Thread類,重寫run方法
  2. 實現Runnable接口,實現run方法,這比方式1更好,因為一個類實現了Runnable以后,還可以繼承其他類
  3. 通過線程池創建
  4. 通過Callable、Future與FutureTask來創建(需要返回值的時候)

具體示例可見:HelloThread.java

import java.util.concurrent.*;

/**
 * 創建線程的方式
 *
 * @author <a href="mailto:410486047@qq.com">Grey</a>
 * @date 2021/7/7
 * @since
 */
public class HelloThread {
    public static void main(String[] args) throws Exception {
        MyFirstThread t1 = new MyFirstThread();
        Thread t2 = new Thread(new MySecondThread());
        Thread t3 = new Thread(new FutureTask<>(new CallableThreadTest()));
        // Thread t4 = new Thread(()-> System.out.println("方式4:使用lambada表達式來創建線程。"));
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(() -> System.out.println("方式3:使用線程池來創建線程。"));
        t1.start();
        t2.start();
        t3.start();
        //t4.start();
        executor.shutdown();
        executor.awaitTermination(10, TimeUnit.SECONDS);
        t1.join();
        t2.join();
        t3.join();
        //t4.join();

    }

    static class MyFirstThread extends Thread {
        @Override
        public void run() {
            System.out.println("方式1:繼承Thread類並重寫run方法來創建線程");
        }
    }

    /**
     * 方式二, 實現Runnable接口來創建線程
     */
    static class MySecondThread implements Runnable {

        @Override
        public void run() {
            System.out.println("方式2:實現Runnable方式來創建線程");
        }
    }

    static class CallableThreadTest implements Callable<Integer> {
        @Override
        public Integer call() {
            int i;
            for (i = 0; i < 10; i++) {
                i++;
            }
            System.out.println("方式3,實現Callable接口方式來創建有返回值的線程,返回值是:" + i);
            return i;
        }
    }
}

線程狀態和切換

NEW

線程剛剛創建,還沒有啟動,New Thread的時候,還沒有調用start方法時候,就是這個狀態

RUNNABLE

可運行狀態,由線程調度器可以安排執行,包括以下兩種情況:

  • READY
  • RUNNING

READY和RUNNING通過yield來切換

WAITING

等待被喚醒

TIMED_WAITING

隔一段時間后自動喚醒

BLOCKED

被阻塞,正在等待鎖,只有在synchronized的時候在會進入BLOCKED狀態

TERMINATED

線程執行完畢后,是這個狀態

各個線程狀態切換如下

線程狀態

線程基本操作

sleep

當前線程睡一段時間

yield

這是一個靜態方法,一旦執行,它會使當前線程讓出一下CPU。但要注意,讓出CPU並不表示當前線程不執行了。當前線程在讓出CPU后,還會進行CPU資源的爭奪,但是是否能夠再次被分配到就不一定了。

join

等待另外一個線程的結束,當前線程才會運行,示例代碼如下:

public class ThreadBasicOperation {
    static volatile int sum = 0;

    public static void main(String[] args) throws Exception {
        Thread t = new Thread(() -> {
            for (int i = 1; i <= 100; i++) {
                sum += i;
            }
        });
        t.start();
        // join 方法表示主線程願意等待子線程執行完畢后才繼續執行
        // 如果不使用join方法,那么sum輸出的可能是一個很小的值,因為還沒等子線程
        // 執行完畢后,主線程就已經執行了打印sum的操作
        t.join();
        System.out.println(sum);
    }
}

interrupt

// 打斷某個線程(設置標志位)
interrupt()

// 查詢某線程是否被打斷過(查詢標志位)
isInterrupted()

// 查詢當前線程是否被打斷過,並重置打斷標志位
Thread.interrupted()

示例代碼:ThreadInterrupt.java

關於線程的start方法

問題1:反復調用同一個線程的start()方法是否可行?

問題2:假如一個線程執行完畢(此時處於TERMINATED狀態),再次調用這個線程的start()方法是否可行?

兩個問題的答案都是不可行,在調用一次start()之后,threadStatus的值會改變(threadStatus !=0),此時再次調用start()方法會拋出IllegalThreadStateException異常。比如,threadStatus為2代表當前線程狀態為TERMINATED。

如何結束一個線程

不推薦的方式

  • stop方法
  • suspend/resume方法

以上兩種方式都不建議使用, 因為會釋放所有的鎖, 所以容易產生數據不一致的問題。

優雅的方式

如果不依賴循環的具體次數或者中間狀態, 可以通過設置標志位的方式來控制

public class ThreadFinished {
    private static volatile boolean flag = true;

    public static void main(String[] args) throws InterruptedException {
        // 推薦方式:設置標志位
        Thread t3 = new Thread(() -> {
            long i = 0L;
            while (flag) {
                i++;
            }
            System.out.println("count sum i = " + i);
        });
        t3.start();
        TimeUnit.SECONDS.sleep(1);
        flag = false;
    }
}

如果要依賴循環的具體次數或者中間狀態, 則可以用interrupt方式

public class ThreadFinished {

    public static void main(String[] args) throws InterruptedException {
        // 推薦方式:使用interrupt
        Thread t4 = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {

            }
            System.out.println("t4 end");
        });
        t4.start();
        TimeUnit.SECONDS.sleep(1);
        t4.interrupt();
    }
}

示例代碼: ThreadFinished.java

並發編程的三大特性

可見性

所謂線程數據的可見性,指的就是內存中的某個數據,假如第一個CPU的一個核讀取到了,和其他的核讀取到這個數據之間的可見性。

每個線程會保存一份拷貝到線程本地緩存,使用volatile,可以保持線程之間數據可見性。

如下示例: ThreadVisible.java

public class ThreadVisible {

    static volatile boolean flag = true;

    public static void main(String[] args) throws InterruptedException {
        Thread t = new Thread(() -> {
            while (flag) {
                // 如果這里調用了System.out.println()
                // 會無論flag有沒有加volatile,數據都會同步
                // 因為System.out.println()背后調用的synchronized
                // System.out.println();
            }
            System.out.println("t end");
        });
        t.start();
        TimeUnit.SECONDS.sleep(3);
        flag = false;
        // volatile修飾引用變量
        new Thread(a::m, "t2").start();
        TimeUnit.SECONDS.sleep(2);
        a.flag = false;
        // 阻塞主線程,防止主線程直接執行完畢,看不到效果
        new Scanner(System.in).next();
    }

    private static volatile A a = new A();

    static class A {
        boolean flag = true;

        void m() {
            System.out.println("m start");
            while (flag) {
            }
            System.out.println("m end");
        }
    }
}

代碼說明:

  • 如在上述代碼的死循環中增加了System.out.println(), 則會強制同步flag的值,無論flag本身有沒有加volatile
  • 如果volatile修飾一個引用對象,如果對象的屬性(成員變量)發生了改變,volatile不能保證其他線程可以觀察到該變化。

關於三級緩存

3_cache

如上圖,內存讀出的數據會在L3,L2,L1上都存一份。

在從內存中讀取數據的時候,根據的是程序局部性的原理,按塊來讀取,這樣可以提高效率,充分發揮總線CPU針腳等一次性讀取更多數據的能力。

所以這里引入了一個緩存行的概念,目前一個緩存行多用64個字節來表示。

如何來驗證CPU讀取緩存行這件事,我們可以通過一個示例來說明:

/**
 * 緩存行對齊
 *
 * @author <a href="mailto:410486047@qq.com">Grey</a>
 * @since 1.8
 */
public class CacheLinePadding {
  public static T[] arr = new T[2];

  static {
    arr[0] = new T();
    arr[1] = new T();
  }

  public static void main(String[] args) throws Exception {
    Thread t1 = new Thread(() -> {
      for (long i = 0; i < 1000_0000L; i++) {
        arr[0].x = i;
      }
    });

    Thread t2 = new Thread(() -> {
      for (long i = 0; i < 1000_0000L; i++) {
        arr[1].x = i;
      }
    });

    final long start = System.nanoTime();
    t1.start();
    t2.start();
    t1.join();
    t2.join();
    System.out.println((System.nanoTime() - start) / 100_0000);
  }
  private static class Padding {
    public volatile long p1, p2, p3, p4, p5, p6, p7;
  }

  // T這個類extends Padding與否,會影響整個流程的執行時間,如果繼承了,會減少執行時間,
  // 因為繼承Padding后,arr[0]和arr[1]一定不在同一個緩存行里面,所以不需要同步數據,速度就更快一些了。
  private static class T extends Padding   {
    public volatile long x = 0L;
  }
}

說明:以上代碼,T這個類繼承Padding類與否,會影響整個流程的執行時間,如果繼承了,會減少執行時間,因為繼承Padding后,arr[0]arr[1]一定不在同一個緩存行里面,所以不需要同步數據,速度就更快一些了。

jdk1.8增加了一個注解:@Contended,標注了以后,不會在同一緩存行, 僅適用於jdk1.8 還需要增加jvm參數

-XX:-RestrictContended

CPU為每個緩存行標記四種狀態(使用兩位)

M: 被修改(Modified)

該緩存行只被緩存在該CPU的緩存中,並且是被修改過的(dirty),即與主存中的數據不一致,該緩存行中的內存需要在未來的某個時間點(允許其它CPU讀取請主存中相應內存之前)寫回(write back)主存。

當被寫回主存之后,該緩存行的狀態會變成獨享(exclusive)狀態。

E: 獨享的(Exclusive)

該緩存行只被緩存在該CPU的緩存中,它是未被修改過的(clean),與主存中數據一致。該狀態可以在任何時刻當有其它CPU讀取該內存時變成共享狀態(shared)。

同樣地,當CPU修改該緩存行中內容時,該狀態可以變成Modified狀態。

S: 共享的(Shared)

該狀態意味着該緩存行可能被多個CPU緩存,並且各個緩存中的數據與主存數據一致(clean),當有一個CPU修改該緩存行中,其它CPU中該緩存行可以被作廢(變成無效狀態(Invalid))。

I: 無效的(Invalid)

該緩存是無效的(可能有其它CPU修改了該緩存行)。

參考:【並發編程】MESI--CPU緩存一致性協議

有序性

計算機在執行程序時,為了提高性能,編譯器和處理器常常會對指令做重排。

為什么指令重排序可以提高性能?

簡單地說,每一個指令都會包含多個步驟,每個步驟可能使用不同的硬件。因此,流水線技術產生了,它的原理是指令1還沒有執行完,就可以開始執行指令2,而不用等到指令1執行結束之后再執行指令2,這樣就大大提高了效率。

但是,流水線技術最害怕中斷,恢復中斷的代價是比較大的,所以我們要想盡辦法不讓流水線中斷。指令重排就是減少中斷的一種技術。

我們分析一下下面這個代碼的執行情況:

a = b + c;
d = e - f ;

先加載b、c(注意,即有可能先加載b,也有可能先加載c),但是在執行add(b,c)的時候,需要等待b、c裝載結束才能繼續執行,也就是增加了停頓,那么后面的指令也會依次有停頓,這降低了計算機的執行效率。

為了減少這個停頓,我們可以先加載e和f,然后再去加載add(b,c),這樣做對程序(串行)是沒有影響的,但卻減少了停頓。既然add(b,c)需要停頓,那還不如去做一些有意義的事情。

綜上所述,指令重排對於提高CPU處理性能十分必要。雖然由此帶來了亂序的問題,但是這點犧牲是值得的。

指令重排一般分為以下三種:

  • 編譯器優化重排

    編譯器在不改變單線程程序語義的前提下,可以重新安排語句的執行順序。

  • 指令並行重排

    現代處理器采用了指令級並行技術來將多條指令重疊執行。如果不存在數據依賴性(即后一個執行的語句無需依賴前面執行的語句的結果),處理器可以改變語句對應的機器指令的執行順序。

  • 內存系統重排

    由於處理器使用緩存和讀寫緩存沖區,這使得加載(load)和存儲(store)操作看上去可能是在亂序執行,因為三級緩存的存在,導致內存與緩存的數據同步存在時間差。

指令重排可以保證串行語義一致,但是沒有義務保證多線程間的語義也一致。所以在多線程下,指令重排序可能會導致一些問題。

亂序存在的條件是:不影響單線程的最終一致性(as - if - serial)

驗證亂序執行的程序示例 DisOrder.java:

public class DisOrder {
  private static int x = 0, y = 0;
  private static int a = 0, b = 0;

  // 以下程序可能會執行比較長的時間
  public static void main(String[] args) throws InterruptedException {
    int i = 0;
    for (;;) {
      i++;
      x = 0;
      y = 0;
      a = 0;
      b = 0;
      Thread one = new Thread(() -> {
        // 由於線程one先啟動,下面這句話讓它等一等線程two. 讀着可根據自己電腦的實際性能適當調整等待時間.
        shortWait(100000);
        a = 1;
        x = b;
      });

      Thread other = new Thread(() -> {
        b = 1;
        y = a;
      });
      one.start();
      other.start();
      one.join();
      other.join();
      String result = "第" + i + "次 (" + x + "," + y + ")";
      if (x == 0 && y == 0) {
        // 出現這個分支,說明指令出現了重排
        // 否則不可能 x和y同時都為0
        System.err.println(result);
        break;
      } else {
        // System.out.println(result);
      }
    }
  }

  public static void shortWait(long interval) {
    long start = System.nanoTime();
    long end;
    do {
      end = System.nanoTime();
    } while (start + interval >= end);
  }
}

如上示例,如果指令不出現亂序,那么x和y不可能同時為0,通過執行這個程序可以驗證出來,在我本機測試的結果是:

執行到第1425295次 出現了x和y同時為0的情況。

原子性

程序的原子性是指整個程序中的所有操作,要么全部完成,要么全部失敗,不可能滯留在中間某個環節;在多個線程一起執行的時候,一個操作一旦開始,就不會被其他線程所打斷。

一個示例:

class T {   
    m =9;
}

對象T在創建過程中,背后其實是包含了多條執行語句的,由於有CPU亂序執行的情況,所以極有可能會在初始化過程中生成以一個半初始化對象t,這個t的m等於0(還沒有來得及做賦值操作)

所以,不要在某個類的構造方法中啟動一個線程,這樣會導致this對象逸出,因為這個類的對象可能還來不及執行初始化操作,就啟動了一個線程,導致了異常情況。

volatile一方面可以保證線程數據之間的可見性,另外一方面,也可以防止類似這樣的指令重排,所以 所以,單例模式中,DCL方式的單例一定要加volatile修飾:

public class Singleton6 {
    private volatile static Singleton6 INSTANCE;
 
    private Singleton6() {
    }
 
    public static Singleton6 getInstance() {
        if (INSTANCE == null) {
            synchronized (Singleton6.class) {
                if (INSTANCE == null) {
                    try {
                        Thread.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    INSTANCE = new Singleton6();
                }
            }
        }
        return INSTANCE;
    }
}

具體可以參考設計模式學習筆記 中單例模式的說明。

實際上,只有很低版本的 Java 才會有這個問題。我們現在用的高版本的 Java 已經在 JDK 內部實現中解決了這個問題(解決的方法很簡單,只要把對象 new 操作和初始化操作設計為原子操作,就自然能禁止重排序)。

CAS

比較與交換的意思

舉個例子:

內存有個值是3,如果用Java通過多線程去訪問這個數,每個線程都要把這個值+1。

之前是需要加鎖,即synchronized關鍵字來控制。但是JUC的包出現后,有了CAS操作,可以不需要加鎖來處理,流程是:

第一個線程:把3拿過來,線程本地區域做計算+1,然后把4寫回去。

第二個線程:也把3這個數拿過來,線程本地區域做計算+1后,在回寫回去的時候,會做一次比較,如果原來的值還是3,那么說明這個值之前沒有被打擾過,就可以把4寫回去,如果這個值變了,假設變為了4,那么說明這個值已經被其他線程修改過了,那么第二個線程需要重新執行一次,即把最新的4拿過來繼續計算,回寫回去的時候,繼續做比較,如果內存中的值依然是4,說明沒有其他線程處理過,第二個線程就可以把5回寫回去了。

流程圖如下

cas_case

ABA問題

CAS會出現一個ABA的問題,即在一個線程回寫值的時候,其他線程其實動過那個原始值,只不過其他線程操作后這個值依然是原始值。

如何來解決ABA問題呢?

我們可以通過版本號或者時間戳來控制,比如數據原始的版本是1.0,處理后,我們把這個數據的版本改成變成2.0版本, 時間戳來控制也一樣,

以Java為例,AtomicStampedReference這個類,它內部不僅維護了對象值,還維護了一個時間戳。當AtomicStampedReference對應的數值被修改時,除了更新數據本身外,還必須要更新時間戳。當AtomicStampedReference設置對象值時,對象值以及時間戳都必須滿足期望值,寫入才會成功。因此,即使對象值被反復讀寫,寫回原值,只要時間戳發生變化,就能防止不恰當的寫入。

CAS的底層實現

Unsafe.cpp-->Atom::cmpxchg-->Atomic_linux_x86_inline.hpp-->調用了匯編的LOCK_IF_MP方法

Multiple_processor

lock cmpxchg

雖然cmpxchg指令不是原子的,但是加了lock指令后,則cmpxhg被上鎖,不允許被打斷。 在單核CPU中,無須加lock,在多核CPU中,必須加lock,可以參考stackoverflow上的這個回答:

is-x86-cmpxchg-atomic-if-so-why-does-it-need-lock

使用CAS好處

jdk早期是重量級別鎖 ,通過0x80中斷 進行用戶態和內核態轉換,所以效率比較低,有了CAS操作,大大提升了效率。

對象的內存布局(Hotspot實現)

使用jol查看一個對象的內存布局

我們可以通過jol包來查看一下某個對象的內存布局

引入jol依賴

<dependency>
   <groupId>org.openjdk.jol</groupId>
   <artifactId>jol-core</artifactId>
   <version>0.15</version>
</dependency>

示例代碼(ObjectModel.java)

import org.openjdk.jol.info.ClassLayout;

/**
 * 對象的內存布局
 * @author <a href="mailto:410486047@qq.com">Grey</a>
 * @since
 */
// 配置VM參數 -XX:+UseCompressedClassPointers
public class ObjectModel {
    public static void main(String[] args) {
        System.out.println("======T1=======");
        T1 o = new T1();
        String s = ClassLayout.parseInstance(o).toPrintable();
        System.out.println(s);
        System.out.println("======T1=======");
        System.out.println("======T2=======");
        T2 o1 = new T2();
        String s1 = ClassLayout.parseInstance(o1).toPrintable();
        System.out.println(s1);
        System.out.println("======T2=======");
    }
    static class  T1{
        public int a = 3;
    }
    static class T2{
        public int a = 3;
        public long b = 3L;
    }
}

配置VM參數,開啟指針壓縮

-XX:+UseCompressedClassPointers

可以看到結果是

======T1=======
git.snippets.juc.ObjectModel$T1 object internals:
OFF  SZ   TYPE DESCRIPTION               VALUE
  0   8        (object header: mark)     0x0000000000000001 (non-biasable; age: 0)
  8   4        (object header: class)    0xf800c143
 12   4    int T1.a                      3
Instance size: 16 bytes
Space losses: 0 bytes internal + 0 bytes external = 0 bytes total

======T1=======
======T2=======
git.snippets.juc.ObjectModel$T2 object internals:
OFF  SZ   TYPE DESCRIPTION               VALUE
  0   8        (object header: mark)     0x0000000000000001 (non-biasable; age: 0)
  8   4        (object header: class)    0xf800c489
 12   4    int T2.a                      3
 16   8   long T2.b                      3
Instance size: 24 bytes
Space losses: 0 bytes internal + 0 bytes external = 0 bytes total

======T2=======

對於T1來說,其中8個字節的markword4個字節的類型指針,可以找到T.class,這里一共是12個字節, 由於字節數務必是8的整數倍,所以補上4個字節,共16個字節

對於T2來說,其中多了8位表示long這個成員變量, 相加等於24,正好是8的整數倍,不需要補齊。

內存布局詳細說明

object_model_of_hotspot

使用synchronized就是修改了對象的markword信息,markword中還記錄了GC信息,Hashcode信息

鎖升級過程

image

偏向鎖

synchronized代碼段多數時間是一個線程在運行,誰先來,這個就偏向誰,用當前線程標記一下。

輕量級鎖(自旋鎖,無鎖)

偏向鎖撤銷,然后競爭,每個線程在自己線程棧中存一個LR(lock record)鎖記錄
偏向鎖和輕量級鎖都是用戶空間完成的,重量級鎖需要向操作系統申請。
兩個線程爭搶的方式將lock record的指針,指針指向哪個線程的LR,哪個線程就拿到鎖,另外的線程用CAS的方式繼續競爭

重量級鎖

JVM的ObjectMonitor去操作系統申請。

interpreteRuntime.cpp --> monitorenter

image

如果發生異常,synchronized會自動釋放鎖

package git.snippets.juc;

import java.util.concurrent.TimeUnit;

public class ExceptionCauseUnLock {
    /*volatile */ boolean stop = false;

    public static void main(String[] args) {
        ExceptionCauseUnLock t = new ExceptionCauseUnLock();
        new Thread(t::m, "t1").start();
        try {
            TimeUnit.SECONDS.sleep(4);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        if (t.stop) {
            int m = 1 / 0;
        }
    }

    synchronized void m() {
        while (!stop) {
            stop = true;
        }
    }
}

鎖重入

synchronized是可重入鎖, 可重入次數必須記錄,因為解鎖需要對應可重入次數的記錄

偏向鎖:記錄在線程棧中,每重入一次,LR+1,備份原來的markword

輕量級鎖:類似偏向鎖

重量級鎖:記錄在ObjectMonitor的一個字段中

自旋鎖什么時候升級為重量級鎖?

  • 有線程超過十次自旋
  • -XX:PreBlockSpin(jdk1.6之前)
  • 自旋的線程超過CPU核數一半
  • jdk1.6 以后,JVM自己控制

為什么有偏向鎖啟動和偏向鎖未啟動?

未啟動:普通對象001 已啟動:匿名偏向101

為什么有自旋鎖還需要重量級鎖?

因為自旋會占用CPU時間,消耗CPU資源,如果自旋的線程多,CPU資源會被消耗,所以會升級成重量級鎖(隊列)例如:ObjectMonitor里面的WaitSet,重量級鎖會把線程都丟到WaitSet中凍結, 不需要消耗CPU資源

偏向鎖是否一定比自旋鎖效率高?

明確知道多線程的情況下,不一定。 因為偏向鎖在多線程情況下,會涉及到鎖撤銷,這個時候直接使用自旋鎖,JVM啟動過程,會有很多線程競爭,比如啟動的時候,肯定是多線程的,所以默認情況,啟動時候不打開偏向鎖,過一段時間再打開。
有一個參數可以配置:BiasedLockingStartupDelay默認是4s

偏向鎖狀態下,調用了wait方法,直接升級成重量級鎖

一個線程拿20個對象進行加鎖,批量鎖的重偏向(20個對象),批量鎖撤銷(變成輕量級鎖)(40個對象), 通過Epoch中的值和對應的類對象里面記錄的值比較。

synchronized

鎖定對象

package git.snippets.juc;

/**
 * synchronized鎖定對象
 *
 * @author <a href="mailto:410486047@qq.com">Grey</a>
 * @date 2021/4/15
 * @since
 */
public class SynchronizedObject implements Runnable {
    static SynchronizedObject instance = new SynchronizedObject();
    final Object object = new Object();
    static volatile int i = 0;

    @Override
    public void run() {
        for (int j = 0; j < 1000000; j++) {
            // 任何線程要執行下面的代碼,必須先拿到object的鎖
            synchronized (object) {
                i++;
            }
        }
    }
    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(instance);
        Thread t2 = new Thread(instance);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println(i);
    }
}

鎖定方法

  • 鎖定靜態方法相當於鎖定當前類
package git.snippets.juc;

/**
 * synchronized鎖定靜態方法,相當於鎖定當前類
 *
 * @author <a href="mailto:410486047@qq.com">Grey</a>
 * @date 2021/4/15
 * @since
 */
public class SynchronizedStatic implements Runnable {
    static SynchronizedStatic instance = new SynchronizedStatic();
    static volatile int i = 0;

    @Override
    public void run() {
        increase();
    }

    // 相當於synchronized(SynchronizedStatic.class)
    synchronized static void increase() {
        for (int j = 0; j < 1000000; j++) {
            i++;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(instance);
        Thread t2 = new Thread(instance);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println(i);
    }
}

  • 鎖定非靜態方法相當於鎖定該對象的實例或synchronized(this)
package git.snippets.juc;

/**
 * synchronized鎖定方法
 *
 * @author <a href="mailto:410486047@qq.com">Grey</a>
 * @date 2021/4/15
 * @since
 */
public class SynchronizedMethod implements Runnable {
    static SynchronizedMethod instance = new SynchronizedMethod();
    static volatile int i = 0;

    @Override
    public void run() {
        increase();
    }
    void increase() {
        for (int j = 0; j < 1000000; j++) {
            synchronized (this) {
                i++;
            }
        }
    }
    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(instance);
        Thread t2 = new Thread(instance);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println(i);
    }
}

臟讀

package git.snippets.juc;

import java.util.concurrent.TimeUnit;

/**
 * 模擬臟讀
 *
 * @author <a href="mailto:410486047@qq.com">Grey</a>
 * @date 2021/4/15
 * @since
 */
public class DirtyRead {
    String name;
    double balance;

    public static void main(String[] args) {
        DirtyRead a = new DirtyRead();
        Thread thread = new Thread(() -> a.set("zhangsan", 100.0));

        thread.start();
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(a.getBalance("zhangsan"));
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(a.getBalance("zhangsan"));
    }

    public synchronized void set(String name, double balance) {
        this.name = name;

        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }


        this.balance = balance;
    }

    // 如果get方法不加synchronized關鍵字,就會出現臟讀情況
    public /*synchronized*/ double getBalance(String name) {
        return this.balance;
    }
}

其中的getBalance方法,如果不加synchronized,就會產生臟讀的問題。

可重入鎖

一個同步方法可以調用另外一個同步方法,
一個線程已經擁有某個對象的鎖,再次申請的時候仍然會得到該對象的鎖(可重入鎖)
子類synchronized,如果調用父類的synchronize方法:super.method(),如果不可重入,直接就會死鎖。

package git.snippets.juc;

import java.io.IOException;

/**
 * 一個同步方法可以調用另外一個同步方法,一個線程已經擁有某個對象的鎖,再次申請的時候仍然會得到該對象的鎖.
 *
 * @author <a href="mailto:410486047@qq.com">Grey</a>
 * @since
 */
public class SynchronizedReentry implements Runnable {


    public static void main(String[] args) throws IOException {
        SynchronizedReentry myRun = new SynchronizedReentry();
        Thread thread = new Thread(myRun, "t1");
        Thread thread2 = new Thread(myRun, "t2");
        thread.start();
        thread2.start();
        System.in.read();

    }

    synchronized void m1(String content) {
        System.out.println(this);
        System.out.println("m1 get content is " + content);
        m2(content);
    }

    synchronized void m2(String content) {
        System.out.println(this);
        System.out.println("m2 get content is " + content);

    }

    @Override
    public void run() {
        m1(Thread.currentThread().getName());
    }
}

程序在執行過程中,如果出現異常,默認情況鎖會被釋放 ,所以,在並發處理的過程中,有異常要多加小心,不然可能會發生不一致的情況。比如,在一個webapp處理過程中,多個Servlet線程共同訪問同一個資源,這時如果異常處理不合適,在第一個線程中拋出異常,其他線程就會進入同步代碼區,有可能會訪問到異常產生時的數據。因此要非常小心的處理同步業務邏輯中的異常。

示例見:SynchronizedException.java

synchronized的底層實現

在早期的JDK使用的是OS的重量級鎖

后來的改進鎖升級的概念:

synchronized (Object)

  • markword 記錄這個線程ID (使用偏向鎖)
  • 如果線程爭用:升級為 自旋鎖
  • 10次自旋以后,升級為重量級鎖 - OS

所以:

  • 執行時間短(加鎖代碼),線程數少,用自旋
  • 執行時間長,線程數多,用系統鎖

synchronized不能鎖定String常量,Integer,Long等基礎類型

見示例:

SynchronizedBasicType.java

鎖定某對象o,如果o的屬性發生改變,不影響鎖的使用; 但是如果o變成另外一個對象,則鎖定的對象發生改變, 應該避免將鎖定對象的引用變成另外的對象

package git.snippets.juc;

import java.util.concurrent.TimeUnit;

/**
 * 鎖定某對象o,如果o的屬性發生改變,不影響鎖的使用
 * 但是如果o變成另外一個對象,則鎖定的對象發生改變
 * 應該避免將鎖定對象的引用變成另外的對象
 */
public class SyncSameObject {
    Object object = new Object();

    public static void main(String[] args) {
        SyncSameObject t = new SyncSameObject();
        new Thread(t::m).start();
        Thread t2 = new Thread(t::m, "t2");
        //鎖對象發生改變,所以t2線程得以執行,如果注釋掉這句話,線程2將永遠得不到執行機會
        t.object = new Object();

        t2.start();
    }

    void m() {
        synchronized (object) {
            while (true) {
                try {
                    TimeUnit.SECONDS.sleep(2);
                    System.out.println("current thread is " + Thread.currentThread().getName());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

如果不執行

t.object=new Object() 

這句話,m2線程將永遠得不到執行。

死鎖

兩個或兩個以上的線程在執行過程中,因爭奪資源而造成的一種互相等待的現象,若無外力作用,它們都將無法推進下去,這就是死鎖現象

死鎖產生的原因主要有如下幾點

  1. 系統的資源競爭

  2. 程序在執行過程中申請和釋放資源的順序不當

死鎖產生的必要條件

  1. 互斥條件:進程要求對所分配的資源(如打印機)進行排他性控制,即在一段時間內某資源僅為一個進程所占有。此時若有其他進程請求該資源,則請求進程只能等待。

  2. 不剝奪條件:進程所獲得的資源在未使用完畢之前,不能被其他進程強行奪走,即只能由獲得該資源的進程自己來釋放(只能是主動釋放)。

  3. 請求和保持條件:進程已經保持了至少一個資源,但又提出了新的資源請求,而該資源已被其他進程占有,此時請求進程被阻塞,但對自己已獲得的資源保持不放。

  4. 循環等待條件:存在一種進程資源的循環等待鏈,鏈中每一個進程已獲得的資源同時被鏈中下一個進程所請求。

模擬死鎖代碼

/**
 * 模擬死鎖
 */
public class DeadLock implements Runnable {
    int flag = 1;
    static Object o1 = new Object();
    static Object o2 = new Object();

    public static void main(String[] args) {
        DeadLock lock = new DeadLock();
        DeadLock lock2 = new DeadLock();
        lock.flag = 1;
        lock2.flag = 0;
        Thread t1 = new Thread(lock);
        Thread t2 = new Thread(lock2);
        t1.start();
        t2.start();

    }

    @Override
    public void run() {
        System.out.println("flag = " + flag);
        if (flag == 1) {
            synchronized (o2) {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                synchronized (o1) {
                    System.out.println("1");
                }
            }
        }
        if (flag == 0) {
            synchronized (o1) {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                synchronized (o2) {
                    System.out.println("0");
                }
            }
        }
    }
}

如何避免死鎖?

1、讓程序每次至多只能獲得一個鎖。當然,在多線程環境下,這種情況通常並不現實。

2、設計時考慮清楚鎖的順序,盡量減少嵌在的加鎖交互數量。

3、增加時限,比如使用Lock類中的tryLock方法去嘗試獲取鎖,這個方法可以指定一個超時時限,在等待超過該時限之后便會返回一個失敗信息。

volatile

  • 保持線程之間的可見性(不保證操作的原子性),依賴這個MESI協議
  • 防止指令重排序,CPU的load fence和store fence原語支持

CPU原來執行指令一步一步執行,現在是流水線執行,編譯以后可能會產生指令的重排序,這樣可以提高性能

關於volatile不保證原子性的代碼示例:

package git.snippets.juc;

/**
 * Volatile保持線程之間的可見性(不保證操作的原子性)
 *
 * @author <a href="mailto:410486047@qq.com">Grey</a>
 * @date 2021/4/19
 * @since
 */
public class VolatileNOTAtomic {
    volatile static Data data;

    public static void main(String[] args) {
        Thread writer = new Thread(() -> {
            for (int i = 0; i < 10000; i++) {
                data = new Data(i, i);
            }
        });

        Thread reader = new Thread(() -> {
            while (data == null) {
            }
            int a = data.a;
            int b = data.b;
            if (a != b) {
                // 會出現這種情況是因為new Data(i,i)非原子操作,會產生中間狀態的對象,導致a和b的值會不一致
                System.out.printf("a = %s, b=%s%n", a, b);
            }
        });
        writer.start();
        reader.start();
        try {
            writer.join();
            reader.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("end");
    }

    public static class Data {
        int a;
        int b;

        Data(int a, int b) {
            this.a = a;
            this.b = b;
        }
    }
}

volatile並不能保證多個線程共同修改running變量時所帶來的不一致問題,也就是說volatile不能替代synchronized, 示例程序:

package git.snippets.juc;

import java.util.ArrayList;
import java.util.List;

/**
 * volatile並不能保證多個線程共同修改變量時所帶來的不一致問題,也就是說volatile不能替代synchronized
 *
 * @author <a href="mailto:410486047@qq.com">Grey</a>
 * @date 2021/4/19
 * @since
 */
public class VolatileCanNotReplaceSynchronized {
    volatile int count = 0;
    int count2 = 0;

    public static void main(String[] args) {
        VolatileCanNotReplaceSynchronized t = new VolatileCanNotReplaceSynchronized();
        List<Thread> threads = new ArrayList<>();
        List<Thread> threads2 = new ArrayList<>();
        for (int i = 0; i < 20; i++) {
            threads.add(new Thread(t::m));
            threads2.add(new Thread(t::m2));
        }
        threads.forEach(item -> item.start());
        threads2.forEach(item -> item.start());
        threads.forEach(item -> {
            try {
                item.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        threads2.forEach(item -> {
            try {
                item.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        System.out.println(t.count);
        System.out.println(t.count2);
    }

    void m() {
        for (int i = 0; i < 1000; i++) {
            count++;
        }
    }

    synchronized void m2() {
        for (int i = 0; i < 1000; i++) {
            count2++;
        }
    }
}

DCL為什么一定要加volatile?

什么是DCL,請參考設計模式學習筆記中的單例模式說明。

在New對象的時候,編譯完實際上是分了三步

  1. 對象申請內存,成員變量會被賦初始值

  2. 成員變量設為真實值

  3. 成員變量賦給對象

指令重排序可能會導致2和3進行指令重排,導致下一個線程拿到一個半初始化的對象,導致單例被破壞。所以DCL必須加volitile

volatile修飾引用對象

被volatile關鍵字修飾的對象作為類變量或實例變量時,其對象中攜帶的類變量和實例變量也相當於被volatile關鍵字修飾了

示例見:VolatileRef.java

AtomicLong VS LongAddr VS Synchronized

需要實際測試一下。

示例見:

  • AddByAtomicLong.java(無鎖操作)
  • AddByLongAdder.java (LongAdder采用了分段鎖,分段鎖又是CAS實現的。多段並行運行,在線程數比較多的情況下,效率比較高。線程數少的情況下沒什么優勢。)
  • AddBySynchronized.java

分別運行上述三個類,得到的執行結果是:

by AtomicLong , result is 1000000000 time is 19149
by LongAdder , result is 1000000000 time is 1498
by synchronized , result is 1000000000 time is 22913

在大數據量的情況下,LongAdder的效率最高。關於LongAdder的一些說明,參考如下兩篇博客:

ReentrantLock

其中“ReentrantReadWriteLock”,“讀鎖的插隊策略”,"鎖的升降級" 部分參考了如下文檔中的內容

Java中的共享鎖和排他鎖(以讀寫鎖ReentrantReadWriteLock為例)

ReentrantLock vs sychronized

可重入鎖,可以替代sychronizedReentrantLocksychronized的區別在於

  1. 可以tryLock,嘗試若干時間片內獲取鎖。 見ReentrantLockTryLock.java

  2. 可以用lockInterruptibly,在lock的時候可以被打斷,一旦被打斷,可以作出響應,而sychronized一旦wait后,必須得讓別人notify,才能醒來。見ReentrantLockInterrupt.java

  3. 可以設置公平與否,公平的概念是,每個線程來了以后會檢查等待隊列里面會不會有等待的線程,如果有,則進入隊列等待。見:ReentrantLockFair.java

  4. synchronized鎖的是對象,鎖信息保存在對象頭中,ReentrantLock通過代碼中int類型的state標識來標識鎖的狀態

注:在使用ReentrantLock的時候一定要記得unlock,因為如果使用synchronized遇到異常,jvm會自動釋放鎖,但是用ReentrantLock必須手動釋放鎖,因此經常在finally中進行鎖的釋放

詳見:

  • ReentrantLockAndSynchronized.java

  • SynchronizedException.java

不管是公平鎖還是非公平鎖,一旦沒有競爭到鎖,都會進行排隊,當鎖釋放時,都是喚醒排在最前面的線程,所以非公平鎖只是體現在了線程加鎖階段,而沒有體現在線程被喚醒階段。

ReentrantReadWriteLock

在ReentrantReadWriteLock中包含讀鎖和寫鎖,
其中讀鎖是可以多線程共享的,即共享鎖, 而寫鎖是排他鎖,在更改時候不允許其他線程操作。
讀寫鎖其實是一把鎖,所以會有同一時刻不允許讀寫鎖共存的規定。
之所以要細分讀鎖和寫鎖也是為了提高效率,將讀和寫分離,

示例:

package git.snippets.juc;


import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * ReentrantReadWriteLock讀寫鎖示例
 **/
public class ReentrantLockReadAndWrite {

    private static ReentrantReadWriteLock reentrantLock = new ReentrantReadWriteLock();
    private static ReentrantReadWriteLock.ReadLock readLock = reentrantLock.readLock();
    private static ReentrantReadWriteLock.WriteLock writeLock = reentrantLock.writeLock();

    public static void read() {
        readLock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + "獲取讀鎖,開始執行");
            Thread.sleep(1000);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            readLock.unlock();
            System.out.println(Thread.currentThread().getName() + "釋放讀鎖");
        }
    }

    public static void write() {
        writeLock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + "獲取寫鎖,開始執行");
            Thread.sleep(1000);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            writeLock.unlock();
            System.out.println(Thread.currentThread().getName() + "釋放寫鎖");
        }
    }

    public static void main(String[] args) {
        new Thread(() -> read(), "Thread1").start();
        new Thread(() -> read(), "Thread2").start();
        new Thread(() -> write(), "Thread3").start();
        new Thread(() -> write(), "Thread4").start();
    }
}

讀鎖的插隊策略

設想如下場景:

在非公平的ReentrantReadWriteLock鎖中,線程2和線程4正在同時讀取,線程3想要寫入,拿不到鎖(同一時刻是不允許讀寫鎖共存的),於是進入等待隊列, 線程5不在隊列里,現在過來想要讀取,

策略1

如果允許讀插隊,就是說線程5讀先於線程3寫操作執行,因為讀鎖是共享鎖,不影響后面的線程3的寫操作,
這種策略可以提高一定的效率,卻可能導致像線程3這樣的線程一直在等待中,因為可能線程5讀操作之后又來了n個線程也進行讀操作,造成線程飢餓;

策略2

不允許插隊,即線程5的讀操作必須排在線程3的寫操作之后,放入隊列中,排在線程3之后,這樣能避免線程飢餓。
事實上ReentrantReadWriteLock在非公平情況下,讀鎖采用的就是策略2:不允許讀鎖插隊,避免線程飢餓。更加確切的說是:在非公平鎖情況下,允許寫鎖插隊,也允許讀鎖插隊,

但是讀鎖插隊的前提是隊列中的頭節點不能是想獲取寫鎖的線程。

以上是在非公平ReentrantReadWriteLock鎖中,

在公平鎖中,讀寫鎖都是是不允許插隊的,嚴格按照線程請求獲取鎖順序執行。

示例見:ReentrantLockCut.java

鎖的升降級

ReentrantReadWriteLock讀寫鎖中,只支持寫鎖降級為讀鎖,而不支持讀鎖升級為寫鎖,

之所以ReentrantReadWriteLock不支持鎖的升級(其它鎖可以支持),主要是避免死鎖,

例如兩個線程A和B都在讀, A升級要求B釋放讀鎖,B升級要求A釋放讀鎖,互相等待形成死循環。

如果能嚴格保證每次都只有一個線程升級那也是可以的。

示例見:ReentrantReadWriteLockUpAndDown.java

CAS,Synchronized,Lock的使用情景

1、對於資源競爭較少(線程沖突較輕)的情況,使用synchronized同步鎖進行線程阻塞和喚醒切換以及用戶態內核態間的切換操作額外浪費消耗cpu資源;而CAS基於硬件實現,不需要進入內核,不需要切換線程,操作自旋幾率較少,因此可以獲得更高的性能。

2、對於資源競爭嚴重(線程沖突嚴重)的情況,CAS自旋的概率會比較大,從而浪費更多的CPU資源,效率低於synchronized。

注: synchronized在jdk1.6之后,已經改進優化。synchronized的底層實現主要依靠Lock-Free的隊列,基本思路是自旋后阻塞,競爭切換后繼續競爭鎖,稍微犧牲了公平性,但獲得了高吞吐量。在線程沖突較少的情況下,可以獲得和CAS類似的性能;而線程沖突嚴重的情況下,性能遠高於CAS。

synchronized作為悲觀鎖,比較適合寫入操作比較頻繁的場景,如果出現大量的讀取操作,每次讀取的時候都會進行加鎖,這樣會增加大量的鎖的開銷,降低了系統的吞吐量。

在資源競爭不是很激烈的情況下,偶爾會有同步的情形下,synchronized是很合適的。原因在於,編譯程序通常會盡可能的進行優化synchronized,另外可讀性非常好,不管用沒用過5.0多線程包的程序員都能理解。默認是非公平鎖:后等待的線程可以先獲得鎖。

ReentrantLock比較適合讀取操作比較頻繁的場景,如果出現大量的寫入操作,數據發生沖突的可能性就會增大,為了保證數據的一致性,應用層需要不斷的重新獲取數據,這樣會增加大量的查詢操作,降低了系統的吞吐量。

Atomic和上面的類似,不激烈情況下,性能比synchronized略遜,而激烈的時候,也能維持常態。激烈的時候,Atomic的性能會優於ReentrantLock一倍左右。但是其有一個缺點,就是只能同步一個值,一段代碼中只能出現一個Atomic的變量,多於一個同步無效。因為他不能在多個Atomic之間同步。

CountDownLatch

類似門閂的概念,可以替代join,但是比join靈活,因為一個線程里面可以多次countDown,但是join一定要等線程完成才能執行。

其底層原理是:調用await()方法的線程會利用AQS排隊,一旦數字減為0,則會將AQS中排隊的線程依次喚醒。

package git.snippets.juc;

import java.util.concurrent.CountDownLatch;

/**
 * CountDownLatch可以用Join替代
 */
public class CountDownLatchAndJoin {
 public static void main(String[] args) {
  useCountDownLatch();
  useJoin();
 }

 public static void useCountDownLatch() {
  // use countdownlatch
  long start = System.currentTimeMillis();
  Thread[] threads = new Thread[100000];
  CountDownLatch latch = new CountDownLatch(threads.length);

  for (int i = 0; i < threads.length; i++) {
   threads[i] = new Thread(() -> {
    int result = 0;
    for (int i1 = 0; i1 < 1000; i1++) {
     result += i1;
    }
    // System.out.println("Current thread " + Thread.currentThread().getName() + " finish cal result " + result);
    latch.countDown();
   });
  }
  for (Thread thread : threads) {
   thread.start();
  }
  try {
   latch.await();
  } catch (InterruptedException e) {
   e.printStackTrace();
  }
  long end = System.currentTimeMillis();

  System.out.println("end latch down, time is " + (end - start));

 }

 public static void useJoin() {
  long start = System.currentTimeMillis();

  // use join
  Thread[] threads = new Thread[100000];

  for (int i = 0; i < threads.length; i++) {
   threads[i] = new Thread(() -> {
    int result = 0;
    for (int i1 = 0; i1 < 1000; i1++) {
     result += i1;
    }
    // System.out.println("Current thread " + Thread.currentThread().getName() + " finish cal result " + result);
   });
  }
  for (Thread thread : threads) {
   thread.start();
  }
  for (Thread thread : threads) {
   try {
    thread.join();
   } catch (InterruptedException e) {
    e.printStackTrace();
   }
  }

  long end = System.currentTimeMillis();

  System.out.println("end join, time is " + (end - start));
 }
}

CyclicBarrier

類似柵欄,類比:滿了20個乘客就發車 這樣的場景。

比如:一個程序可能收集如下來源的數據:

  1. 數據庫
  2. 網絡
  3. 文件

程序可以並發執行,用線程操作1,2,3,然后操作完畢后再合並, 然后執行后續的邏輯操作,就可以使用CyclicBarrier

代碼示例見:CyclicBarrierTest.java

Guava RateLimiter

采用令牌桶算法,用於限流

代碼示例見:RateLimiterUsage.java

Phaser(Since jdk1.7)

遺傳算法,可以用這個結婚的場景模擬: 假設婚禮的賓客有5個人,加上新郎和新娘,一共7個人。 我們可以把這7個人看成7個線程,有如下步驟要執行。

  1. 到達婚禮現場
  2. 吃飯
  3. 離開
  4. 擁抱(只有新郎和新娘線程可以執行)

每個階段執行完畢后才能執行下一個階段,其中hug階段只有新郎新娘這兩個線程才能執行。

以上需求,我們可以通過Phaser來實現,具體代碼和注釋如下:

package git.snippets.juc;

import java.util.Random;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;

public class PhaserUsage {
    static final Random R = new Random();
    static WeddingPhaser phaser = new WeddingPhaser();

    static void millSleep() {
        try {
            TimeUnit.MILLISECONDS.sleep(R.nextInt(1000));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        // 賓客的人數
        final int guestNum = 5;
        // 新郎和新娘
        final int mainNum = 2;
        phaser.bulkRegister(mainNum + guestNum);
        for (int i = 0; i < guestNum; i++) {
            new Thread(new Person("賓客" + i)).start();
        }
        new Thread(new Person("新娘")).start();
        new Thread(new Person("新郎")).start();
    }

    static class WeddingPhaser extends Phaser {
        @Override
        protected boolean onAdvance(int phase, int registeredParties) {
            switch (phase) {
                case 0:
                    System.out.println("所有人到齊");
                    return false;
                case 1:
                    System.out.println("所有人吃飯");
                    return false;
                case 2:
                    System.out.println("所有人離開");
                    return false;
                case 3:
                    System.out.println("新郎新娘擁抱");
                    return true;
                default:
                    return true;
            }
        }
    }
    static class Person implements Runnable {
        String name;
        Person(String name) {
            this.name = name;
        }
        @Override
        public void run() {
            // 先到達婚禮現場
            arrive();
            // 吃飯
            eat();
            // 離開
            leave();
            // 擁抱,只保留新郎和新娘兩個線程可以執行
            hug();
        }
        private void arrive() {
            millSleep();
            System.out.println("name:" + name + " 到來");
            phaser.arriveAndAwaitAdvance();
        }
        private void eat() {
            millSleep();
            System.out.println("name:" + name + " 吃飯");
            phaser.arriveAndAwaitAdvance();
        }
        private void leave() {
            millSleep();
            System.out.println("name:" + name + " 離開");
            phaser.arriveAndAwaitAdvance();
        }
        private void hug() {
            if ("新娘".equals(name) || "新郎".equals(name)) {
                millSleep();
                System.out.println("新娘新郎擁抱");
                phaser.arriveAndAwaitAdvance();
            } else {
                phaser.arriveAndDeregister();
            }
        }
    }
}

StampedLock

StampedLock其實是對讀寫鎖的一種改進,它支持在讀同時進行一個寫操作,也就是說,它的性能將會比讀寫鎖更快。

更通俗的講就是在讀鎖沒有釋放的時候是可以獲取到一個寫鎖,獲取到寫鎖之后,讀鎖阻塞,這一點和讀寫鎖一致,唯一的區別在於讀寫鎖不支持在沒有釋放讀鎖的時候獲取寫鎖。

StampedLock三種模式

  • 悲觀讀:與讀寫鎖的讀寫類似,允許多個線程獲取悲觀讀鎖
  • 寫鎖:與讀寫鎖的寫鎖類似,寫鎖和悲觀讀是互斥的。
  • 樂觀讀:無鎖機制,類似於數據庫中的樂觀鎖,它支持在不釋放樂觀讀的時候是可以獲取到一個寫鎖的,這點和讀寫鎖不同

參考: 【並發編程】面試官:有沒有比讀寫鎖更快的鎖?

示例代碼:

悲觀讀 + 寫鎖: StampedLockPessimistic.java

樂觀讀:StampedLockOptimistic.java

使用StampedLock的注意事項

1.看名字就能看出來StampedLock不支持重入鎖。

2.它適用於讀多寫少的情況,如果不是這種情況,請慎用,性能可能還不如synchronized。

3.StampedLock的悲觀讀鎖、寫鎖不支持條件變量。

4.千萬不能中斷阻塞的悲觀讀鎖或寫鎖,如果調用阻塞線程的interrupt()
,會導致cpu飆升,如果希望StampedLock支持中斷操作,請使用readLockInterruptibly(悲觀讀鎖)與writeLockInterruptibly(寫鎖)。

Semaphore

表示信號量,有如下兩個操作:

s.acquire() 信號量-1

s.release()信號量+1

到0以后,就不能執行了

這個可以用於限流。

底層原理是:如果沒有線程許可可用,則線程阻塞,並通過AQS來排隊,可以通過release()方法來釋放許可,當某個線程釋放了某個許可后,會從AQS中正在排隊的第一個線程依次開始喚醒,直到沒有空閑許可。

Semaphore使用示例:

有N個線程來訪問,我需要限制同時運行的只有信號量大小的線程數,示例代碼:

package git.snippets.juc;

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/**
 * Semaphore用於限流
 */
public class SemaphoreUsage {
    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(1);
        new Thread(() -> {
            try {
                semaphore.acquire();
                TimeUnit.SECONDS.sleep(2);
                System.out.println("Thread 1 executed");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                semaphore.release();
            }
        }).start();

        new Thread(() -> {
            try {
                semaphore.acquire();
                TimeUnit.SECONDS.sleep(2);
                System.out.println("Thread 2 executed");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                semaphore.release();
            }
        }).start();
    }
}

Semaphore可以有公平和非公平的方式進行配置。

Exchanger

用於線程之間交換數據,exchange()方法是阻塞的,所以要兩個exchange同時執行到才會觸發交換。

package git.snippets.juc;

import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;

/**
 * Exchanger用於兩個線程之間交換變量
 */
public class ExchangerUsage {
    static Exchanger<String> semaphore = new Exchanger<>();

    public static void main(String[] args) {

        new Thread(() -> {
            String s = "T1";
            try {
                s = semaphore.exchange(s);
                TimeUnit.SECONDS.sleep(2);
                System.out.println("Thread 1(T1) executed, Result is " + s);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();

        new Thread(() -> {
            String s = "T2";
            try {
                s = semaphore.exchange(s);
                TimeUnit.SECONDS.sleep(2);
                System.out.println("Thread 2(T2) executed, Result is " + s);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();
    }
}

LockSupport

其他鎖的底層用的是AQS

原先讓線程等待需要wait/await,現在僅需要LockSupport.park

原先叫醒線程需要notify/notifyAll,現在僅需要LockSupport.unpark, 還可以叫醒指定線程,

示例代碼:

package git.snippets.juc;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

/**
 * 阻塞指定線程,喚醒指定線程
 */
public class LockSupportUsage {
    public static void main(String[] args) {
        Thread t = new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    if (i == 5) {
                        LockSupport.park();
                    }
                    if (i == 8) {
                        LockSupport.park();
                    }
                    TimeUnit.SECONDS.sleep(1);
                    System.out.println(i);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        t.start();
        // unpark可以先於park調用
        //LockSupport.unpark(t);
        try {
            TimeUnit.SECONDS.sleep(8);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        LockSupport.unpark(t);
        System.out.println("after 8 seconds");
    }
}

練習題: 實現一個監控元素的容器

實現一個容器,提供兩個方法,add,size寫兩個線程,線程1添加10個元素到容器中,線程2實現監控元素的個數,當個數到5個時,線程2給出提示並結束

實現方式:

方法1. 使用wait + notify實現,見:MonitorContainer類的useNotifyAndWait方法

方法2. 使用CountDownLatch實現, 見:MonitorContainer類的useCountDownLatch方法

方法3. 使用LockSupport實現,見:MonitorContainer類的useLockSupport方法

練習題:生產者消費者問題

寫一個固定容量的同步容器,擁有put和get方法,以及getCount方法,能夠支持2個生產者線程以及10個消費者線程的阻塞調用。

實現方式:

方法1. 使用wait/notifyAll

方法2. ReentrantLock的Condition,本質就是等待隊列

代碼見:ProducerAndConsumer.java

Java中引用類型和使用場景

見:Java中的引用類型和使用場景

容器

容器

Vector/HashTable

都加了鎖,一般不用

ConcurrentHashMap

ConcurrentHashMap寫效率未必比HashMap,HashTable高,但是讀效率比這兩者要高

示例代碼:HashTableVSCHM.java

輸出:

...use hashtable....
size : 1000000
write cost 349ms
read cost 28322ms...
use HashMap....
size : 1000000
write cost 203ms
read cost 27590ms...
use ConcurrentHashMap....
size : 1000000
write cost 739ms
read cost 785ms

關於ConcurrentHash和HashMap的一些分析,可以參考這篇文章

ConcurrentLinkedQueue

ConcurrentLinkedQueue底層用的是CAS操作。比Vector效率高,示例見:ConcurrentLinkedQueueVSVector.java

CopyOnWriteList

1.CopyOnWriteList內部也是席過數組來實現的,在向CopyOnWriteListt添加元素時會復制一個新的數組,寫數據時在新數組上進行,讀操作在原數組上進行
2.寫操作會加鎖,防止出現並發寫入丟失數據的問題
3.寫操作結束之后會把原數組指向新數組
4.CopyOnWriteList允許在寫操作時來讀取數據,大大提高了讀的性能,因此適合讀多寫少的應用場景,CopyOnWriteList會比較占內存,同時可能讀到的數據不是實時最新的數據,所以不適合實時性要求很高的場景

示例見:CopyOnWriteListVSVector.java

ConcurrentSkipListMap/TreeMap

  • ConcurrentSkipListMap:高並發且排序,底層是跳表實現

  • TreeMap:底層是紅黑樹,排序

Queue VS List

  • Queue中offer和add方法區別在於:offer方法成功與否用返回值判斷,add方法如果加不進會拋異常

  • Queue中,poll是取並remove這個元素 put方法:如果滿,阻塞。take:如果空,阻塞。底層用的是park/unpark

  • Queue提供了對線程友好的API: offer peek poll

  • BlockingQueue中的put和take方法是阻塞的。 示例:BlockingQueueUsage.java

DelayQueue

  • 用於:按時間進行任務調度。示例:DelayQueueUsage.java

SynchronousQueue

  • 容量為0,示例:SynchronousQueueUsage.java

Exchanger用法

  • 線程之間交換數據。 示例見:ExchangerUsage.java

PriorityQueue

  • 默認是小根堆
  • 需要實現比較器

TransferQueue

  • transfer方法是執行然后等待取走

示例見:TransferQueueUsage.java

練習題,多線程打印A1B2C3

  • 使用wait,notify
  • 使用LockSupport
  • 使用volatile
  • 使用BlockingQueue
  • 使用ReentrantLock的Condition
  • 使用TransferQueue

代碼見:A1B2C3.java

線程池

工作原理

線程池內部是通過隊列+線程程實現的,當我們利用程池執行任務時:

  1. 如果此時線程池中的線程數量小於corePoolSize,即使線程池中的線程都處於空閑狀態,也要創建新的線程來處理被添加的任務。

  2. 如果此時線程池中的線程數量等於corePoolSize,但是緩沖隊列workQueue未滿,那么任務被放入緩沖隊列。

  3. 如果此時線程池中的線程數量大於等於corePoolSize,緩沖隊列workQueue已滿,並且線程池中的線程數量小於maximumPoolSize,建新的線程來處理被添加的任務。

  4. 如果此時線裎池中的線數量大於corePoolSize,緩存沖隊列workQueue已滿, 並且線程池中的數量等於maximumPoolSiz,那么過handler所指定的策略來處理此任務。

  5. 當線程池中的線程數量大於corePoolSize時,如果某線程空閑時間超過keepAliveTime, 線將被終止。這樣,線程池可以動態的調整池中的線程數。

相關配置

corePoolSize:核心線程數

maximumPoolSize:最大線程數 【包括核心線程數】

keepAliveTime:生存時間【線程長時間不干活了,歸還給操作系統,核心線程不用歸還,可以指定是否參與歸還過程】

生存時間單位

任務隊列:等待隊列,如果不指定,最大值是Integer.MAX_VALUE【各種各樣的BlockingQueue】

線程工廠【默認設置優先級是普通優先級,非守護線程】,最好自定義線程名稱,方便回溯

拒絕策略,包括以下四種:

ThreadPoolExecutor.AbortPolicy:丟棄任務並拋出RejectedExecutionException異常。

ThreadPoolExecutor.DiscardPolicy:丟棄任務,但是不拋出異常。

ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,然后重新提交被拒絕的任務

ThreadPoolExecutor.CallerRunsPolicy:由調用線程(提交任務的線程)處理該任務

執行流程:先占滿核心線程-> 再占滿任務隊列-> 再占滿(最大線程數-核心線程數)-> 最后執行拒絕策略
一般自定義拒絕策略:將相關信息保存到redis,kafka,日志,MySQL記錄 實現RejectedExecutionHandler並重寫rejectedExecution方法

自定義拒絕策略代碼示例:

package git.snippets.juc;

import java.util.concurrent.*;

/**
 * 自定義拒絕策略
 */
public class MyRejectedHandler {
    public static void main(String[] args) {
        ExecutorService service = new ThreadPoolExecutor(4, 4,
                0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(6),
                Executors.defaultThreadFactory(),
                new MyHandler());
    }

    static class MyHandler implements RejectedExecutionHandler {

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            //log("r rejected")
            //save r kafka mysql redis
            //try 3 times
            if (executor.getQueue().size() < 10000) {
                //try put again();
            }
        }
    }
}

SingleThreadPool

  • 保證線程按順序執行
  • 為什么要有單線程的線程池?這個主要是用來做任務隊列和線程生命周期管理
  • 使用LinkedBlockingQueue作為任務隊列,上界為:Integer.MAX_VALUE(2147483647) 約等於無界。

示例代碼見:SingleThreadPoolUsage.java

CachedThreadPool

  • corePoolSize:0
  • maxiumPoolSize:Integer.MAX_VALUE(2147483647)
  • keepAliveTime 60秒
  • 使用SynchronousQueue作為任務隊列 必須馬上執行

使用示例:CachedThreadPoolUsage.java

FixedThreadPool

  • 最大線程數=核心線程數
  • 使用LinkedBlockingQueue作為任務隊列,上界為:Integer.MAX_VALUE(2147483647)

使用示例見:FixedThreadPoolUsage.java

ScheduledThreadPool

  • 使用DelayWorkQueue

  • scheduleAtFixedRate

當前任務執行時間小於間隔時間,每次到點即執行;
當前任務執行時間大於等於間隔時間,任務執行后立即執行下一次任務。相當於連續執行了。

  • scheduleWithFixedDelay

每當上次任務執行完畢后,間隔一段時間執行。不管當前任務執行時間大於、等於還是小於間隔時間,執行效果都是一樣的。

使用示例:ScheduleThreadPoolUsage.java

ForkJoinPool

  • since jdk1.7

  • RecursiveAction

它是一種沒有任何返回值的任務。只是做一些工作,比如寫數據到磁盤,然后就退出了。 一個RecursiveAction可以把自己的工作分割成更小的幾塊, 這樣它們可以由獨立的線程或者CPU執行。
我們可以通過繼承來實現一個RecursiveAction。

  • RecursiveTask

它是一種會返回結果的任務。可以將自己的工作分割為若干更小任務,並將這些子任務的執行合並到一個集體結果。 可以有幾個水平的分割和合並。

ForkJoinPool 使用示例:ForkJoinPoolUsage.java

流式API底層也是ForkJoinPool實現的。

MapReduce模型

  • 定義任務的時候,實現ForkJoinTask(原始)
  • 或者使用 RecursiveAction(不帶返回值) ,RecursiveTask(帶返回值)

參考:

WorkStealingPool

每個線程都有單獨的隊列,每個線程隊列執行完畢后,就會去其他的線程隊列里面拿過來執行, 底層是:ForkJoinPool

  • since jdk 1.8
  • 會自動啟動cpu核數個線程去執行任務

使用示例:WorkStealingPoolUsage.java

CompletableFuture

  • since jdk 1.8
  • anyOf()可以實現“任意個CompletableFuture只要一個成功”,allOf()可以實現“所有CompletableFuture都必須成功”,這些組合操作可以實現非常復雜的異步流程控制。

使用示例:CompletableFutureUsage.java

思考題

  • 證明原子操作類比synchronized更高效

  • AtomXXX類可以保證可見性嗎?請寫一個程序來證明

  • 寫一個程序證明AtomXXX類的多個方法並不構成原子性

示例代碼:AtomVSSync.java

參考資料

工作線程數究竟要設置為多少 | 架構師之路

實戰Java高並發程序設計(第2版)

深入淺出Java多線程

多線程與高並發-馬士兵

Java並發編程實戰

【並發編程】MESI--CPU緩存一致性協議

【並發編程】細說並發編程的三大特性

設計模式學習筆記

從LONGADDER看更高效的無鎖實現

Java 8 Performance Improvements: LongAdder vs AtomicLong

Java中的共享鎖和排他鎖(以讀寫鎖ReentrantReadWriteLock為例)

【並發編程】面試官:有沒有比讀寫鎖更快的鎖?

ThreadLocal詳解、ThreadLocal與弱引用間的關系

HashMap?ConcurrentHashMap?相信看完這篇沒人能難住你!

TransferQueue實例

理解ScheduledExecutorService中scheduleAtFixedRate和scheduleWithFixedDelay的區別

ForkJoinPool 的使用以及原理

聊聊並發(八)——Fork/Join 框架介紹

使用CompletableFuture

進程、線程、協程三者之間的聯系與區別

Java如何實現協程

圖解Java多線程設計模式

Java多線程:死鎖


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM