Java並發編程 基礎知識學習總結


Java並發編程一直是Java程序員必須懂但又是很難懂的技術內容,這部分的內容我也是反復學習了好幾遍才能理解。本篇博客梳理一下最近從《Java 並發編程的藝術》和他人的博客學習Java並發編程的思路,本篇博客只梳理了Java並發整體的框架,以及羅列了重點內容和參考學習資料,由於篇幅問題就不對每個知識點做過多的深入。

一、進程與線程、並發與並行的概念,為什么要使用多線程

程序一段靜態的代碼,一組指令的有序集合,它本身沒有任何運行的含義,它只是一個靜態的實體,是應用軟件執行的藍本。

  進程進程是CPU分配資源的最小單元,是程序的一次動態執行,它對應着從代碼加載,執行至執行完畢的一個完整的過程,是一個動態的實體,它有自己的生命周期。它因創建而產生,因調度而運行,因等待資源或事件而被處於等待狀態,因完成任務而被撤消。進程是應用程序的執行實例,每個進程是由私有的虛擬地址空間、代碼、數據和其它系統資源組成。進程在運行時創建的資源隨着進程的終止而死亡。

  線程線程是CPU調度的基本單元,可以理解為進程的多條執行線索,每條線索又對應着各自獨立的生命周期。線程是進程的一個實體,是CPU調度和分派的基本單位,它是比進程更小的能獨立運行的基本單位。一個線程可以創建和撤銷另一個線程,同一個進程中的多個線程之間可以並發執行。

  一個普通的Java SE程序啟動后它就是一個進程,進程相當於一個空盒,它只提供資源裝載的空間,具體的調度並不是由進程來完成的,而是由線程來完成的。一個java程序從main開始之后,進程啟動,為整個程序提供各種資源,而此時將啟動一個線程,這個線程就是主線程,它將調度資源,進行具體的操作。Thread、Runnable的開啟的線程是主線程下的子線程,是父子關系,此時該java程序即為多線程的,這些線程共同進行資源的調度和執行。

  並行同時進行幾個任務;並行是指兩個或者多個事件在同一時刻發生

 

  並發根據虛擬機分配的時間片分時間運行不同的任務,同一時間只有一個任務在進行。並發是指兩個或多個事件在同一時間間隔發生。

  從圖中可以清楚的看出,並發中,一個單核的CPU在同一時間只能執行一個線程中的任務,CPU通過給每個線程分配CPU時間片來實現這個機制的,時間片是CPU分配給各個線程的時間,因為時間片非常短,所以CPU通過不停地切換線程執行,讓我們感覺多個線程是同時執行的,時間片一般是幾十毫秒(ms)。

  多個處理器、或集群,才能做到並行。單核單處理器無法並行執行程序,只能並發執行。

  上下文切換CPU通過時間片分配算法來循環執行任務,當前任務執行一個時間片后會切換到下一個任務。但是,在切換前會保存上一個任務的狀態,以便下次切換回這個任務時,可以再加載這個任務的狀態。所以任務從保存再加載的過程就是一次上下文切換。

  多線程程序會增加線程創建、上下文切換的開銷以及資源調度的時間,在一些特定的環境下,多線程程序並不一定比單線程程序快。

  為什么要使用多線程

  使用多線程的理由之一是和進程相比,它是一種非常花銷小,切換快,更"節儉"的多任務操作方式。在Linux系統下,啟動一個新的進程必須分配給它獨立的地址空間,建立眾多的數據表來維護它的代碼段、堆棧段和數據段,這是一種"昂貴"的多任務工作方式。而在進程中的同時運行多個線程,它們彼此之間使用相同的地址空間,共享大部分數據,啟動一個線程所花費的空間遠遠小於啟動一個進程所花費的空間,而且,線程間彼此切換所需的時間也遠遠小於進程間切換所需要的時間。
  使用多線程的理由之二是線程間方便的通信機制。對不同進程來說,它們具有獨立的數據空間,要進行數據的傳遞只能通過通信的方式進行,這種方式不僅費時,而且很不方便。線程則不然,由於同一進程下的線程之間共享數據空間,所以一個線程的數據可以直接為其它線程所用,這不僅快捷,而且方便。當然,數據的共享也帶來其他一些問題,有的變量不能同時被兩個線程所修改,有的子程序中聲明為static的數據更有可能給多線程程序帶來災難性的打擊,這些正是編寫多線程程序時最需要注意的地方。
  除了以上所說的優點外,不和進程比較,多線程程序作為一種多任務、並發的工作方式,當然有以下的優點:
  1) 提高應用程序響應,充分利用多核CPU,提升CPU利用率。這對圖形界面的程序尤其有意義,當一個操作耗時很長時,整個系統都會等待這個操作,此時程序不會響應鍵盤、鼠標、菜單的操作,而使用多線程技術,將耗時長的操作(time consuming)置於一個新的線程,可以避免這種尷尬的情況。
  2) 使多CPU系統更加有效。操作系統會保證當線程數不大於CPU數目時,不同的線程運行於不同的CPU上。
  3) 改善程序結構。一個既長又復雜的進程可以考慮分為多個線程,成為幾個獨立或半獨立的運行部分,這樣的程序會利於理解和修改,以及程序功能的解耦。 

 

二、JMM/原子性、可見性、有序性

  任何語言最終都是運行在處理器上,JVM虛擬機為了給開發者一個一致的編程內存模型,需要制定一套規則,這套規則可以在不同架構的機器上有不同實現,並且向上為程序員提供統一的JMM內存模型。

  所以了解JMM內存模型也是了解Java並發原理的一個重點,其中了解指令重排,內存屏障,以及可見性原理尤為重要。

  JMM只保證happens-before和as-if-serial規則,所以在多線程並發時,可能出現原子性,可見性以及有序性這三大問題。

    • 原子性
      原子,即一個不可再被分割的顆粒。在Java中原子性指的是一個或多個操作要么全部執行成功要么全部執行失敗。
    • 有序性
      程序執行的順序按照代碼的先后順序執行。(處理器可能會對指令進行重排序)
    • 可見性
      當多個線程訪問同一個變量時,如果其中一個線程對其作了修改,其他線程能立即獲取到最新的值。

  Java提供了volatile、synchronized、lock等關鍵字方便程序員解決原子性、可見性、以及有序性等問題。

  JMM的詳細介紹見我之前的一篇博文:https://www.cnblogs.com/kukri/p/9109639.html

 

三、線程六大狀態。線程啟動、中斷。

Java中線程的狀態分為六種:

  • 初始(NEW):新創建了一個線程對象,但還沒有調用start()方法。
  • 運行(RUNNABLE):Java線程中將就緒(ready)和運行中(running)兩種狀態籠統的稱為“運行”。線程對象創建后,其他線程(比如main線程)調用了該對象的start()方法。該狀態的線程位於可運行線程池中,等待被線程調度選中,獲取CPU的使用權,此時處於就緒狀態(ready)。就緒狀態的線程在獲得CPU時間片后變為運行中狀態(running)。
  • 阻塞(BLOCKED):表示線程阻塞於鎖。
  • 等待(WAITING):進入該狀態的線程需要等待其他線程做出一些特定動作(通知或中斷)。
  • 超時等待(TIMED_WAITING):該狀態不同於WAITING,它可以在指定的時間后自行返回。
  • 終止(TERMINATED):表示該線程已經執行完畢。

線程狀態轉換圖:

1. 初始狀態

  實現Runnable接口和繼承Thread可以得到一個線程類,new一個實例出來,線程就進入了初始狀態。

2.1. 運行狀態——就緒

就緒狀態只是說你資格運行,調度程序沒有挑選到你,你就永遠是就緒狀態。

調用線程的start()方法,此線程進入就緒狀態。

當前線程sleep()方法結束,其他線程join()結束,等待用戶輸入完畢,某個線程拿到對象鎖,這些線程也將進入就緒狀態。

當前線程時間片用完了,調用當前線程的yield()方法,當前線程進入就緒狀態。

鎖池里的線程拿到對象鎖后,進入就緒狀態。

2.2. 運行狀態——運行中

線程調度程序從可運行池中選擇一個線程作為當前線程時線程所處的狀態。這也是線程進入運行狀態的唯一一種方式。

3. 阻塞狀態

阻塞狀態是線程阻塞在進入synchronized關鍵字修飾的方法或代碼塊(獲取鎖)時的狀態。

4. 等待

處於這種狀態的線程不會被分配CPU執行時間,它們要等待被顯式地喚醒,否則會處於無限期等待的狀態。

5. 超時等待

處於這種狀態的線程不會被分配CPU執行時間,不過無須無限期等待被其他線程顯示地喚醒,在達到一定時間后它們會自動喚醒。

6. 終止狀態

  當線程的run()方法完成時,或者主線程的main()方法完成時,我們就認為它終止了。這個線程對象也許是活的,但是,它已經不是一個單獨執行的線程。線程一旦終止了,就不能復生。

在一個終止的線程上調用start()方法,會拋出java.lang.IllegalThreadStateException異常。

 

  • 線程啟動

  在運行線程之前首先要構造一個線程對象,線程對象在構造的時候需要提供線程所需要的屬性,如線程所屬的線程組、線程優先級、是否是Daemon線程等信息。一個新構造的線程對象是由其parent線程來進行空間分配的,而child線程繼承了parent是否為Daemon、優先級和家在資源的contextClassLoader以及科技城的ThreadLocal,同時還會分配一個唯一的ID來表示這個child線程。至此,一個能夠運行的線程對象就初始化好了,在堆內存中等待着運行。

  • 通過Runnable接口創建線程類

(1)定義runnable接口的實現類,並重寫該接口的run()方法,該run()方法的方法體同樣是該線程的線程執行體。

(2)創建 Runnable實現類的實例,並以此實例作為Thread的target來創建Thread對象,該Thread對象才是真正的線程對象。

(3)調用線程對象的start()方法來啟動該線程。用start方法來啟動線程,真正實現了多線程運行,這時無需等待run方法體代碼執行完畢而直接繼續執行下面的代碼。通過調用Thread類的start()方法來啟動一個線程,這時此線程處於運行狀態中的就緒狀態,並沒有真正運行,一旦得到cpu時間片,就開始執行run()方法,這里方法 run()稱為線程體,它包含了要執行的這個線程的內容,Run方法運行結束,此線程隨即終止。

public class RunnableThreadTest implements Runnable  
{  
  
    private int i;  
    public void run()  
    {  
        for(i = 0;i <100;i++)  
        {  
            System.out.println(Thread.currentThread().getName()+" "+i);  
        }  
    }  
    public static void main(String[] args)  
    {  
        for(int i = 0;i < 100;i++)  
        {  
            System.out.println(Thread.currentThread().getName()+" "+i);  
            if(i==20)  
            {  
                RunnableThreadTest rtt = new RunnableThreadTest();  
                new Thread(rtt,"新線程1").start();  
                new Thread(rtt,"新線程2").start();  
            }  
        }  
    }   
}
  • 線程中斷

中斷可以理解為線程的一個標識位屬性,它表示一個運行中的線程是否被其他線程進行了中斷操作。中斷好比其他線程對該線程打了個招呼,其他線程通過調用該線程的interrupt()方法對其進行中斷操作。

  調用線程的interrupt() 方法不會中斷一個正在運行的線程,這個機制只是設置了一個線程中斷標志位,如果在程序中你不檢測線程中斷標志位,那么即使設置了中斷標志位為true,線程也一樣照常運行。所以說線程是通過檢查自身是否被終端來進行響應,通過方法isInterrupted()來進行判斷是否被中斷,也可以調用靜態方法Thread.interrupted()對當前線程的中斷標識位進行復位,如果該線程已經處於終結狀態,即使該線程被中斷過,在調用該線程對象的isInterrupted()時依舊會返回false。

  可以通過在線程中設置對線程終端標志位的檢查來控制線程的中斷

public class InterruptThreadTest2 extends Thread{
    public void run() {
        // 這里調用的是非清除中斷標志位的isInterrupted方法
        while(!Thread.currentThread().isInterrupted()) {
            long beginTime = System.currentTimeMillis();
            System.out.println(Thread.currentThread().getName() + "is running");
            // 當前線程每隔一秒鍾檢測線程中斷標志位是否被置位
            while (System.currentTimeMillis() - beginTime < 1000) {}
        }
        if (Thread.currentThread().isInterrupted()) {
            System.out.println(Thread.currentThread().getName() + "is interrupted");
        }
    }
    
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        InterruptThreadTest2 itt = new InterruptThreadTest2();
        itt.start();
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        // 設置線程的中斷標志位
        itt.interrupt();
    }
  • 線程的喚醒和阻塞帶來的資源消耗

  java的線程是映射到操作系統原生線程之上的,如果要阻塞或喚醒一個線程就需要操作系統介入,需要在用戶態和內核態之間切換,這種切換會消耗大量的系統資源,因為用戶態與內核態都有各自專用的內存空間,專用的寄存器等,用戶態切換至內核態需要傳遞給許多變量、參數給內核,內核也需要保護好用戶態在切換時的一些寄存器值、變量等,以便內核態調用結束后切換回用戶態繼續工作。

    1. 如果線程狀態切換是一個高頻操作時,這將會消耗很多CPU處理時間;
    2. 如果對於那些需要同步的簡單的代碼塊,獲取鎖掛起操作消耗的時間比用戶代碼執行的時間還要長,這種同步策略顯然非常糟糕的。

 

四、線程間通信

  線程開始運行,擁有自己的棧空間,就如同一個腳本一樣,按照既定的代碼一步一步地執行,知道終止,但是每個運行中的線程,如果僅僅是孤立地運行,那么沒有一點兒價值,或者說價值很少,如果多個線程能夠相互配合完成工作,這將會帶來巨大的價值。

1 volatile和synchronized關鍵字

  Java支持多個線程同時訪問一個對象或者對象的成員變量,由於每個線程可以擁有這個變量的拷貝(雖然對象以及成員變量分配的內存是共享內存中的,但是每個執行的線程還擁有一份拷貝,這樣做的目的是加速程序執行,這是現代多核處理器的一個顯著特性),所以程序在執行過程中,一個線程看到的變量並不一定是最新的。

  關鍵字volatile可以用來修飾字段(成員變量),就是告知程序任何對該變量的訪問均需從共享內存中獲取,而對它的改變必須同步刷新回共享內存。它能保證所有線程對變量訪問的可見性。但過多地使用volatile會降低程序執行的效率。

  關鍵字synchronized可以修飾方法或者以同步塊的形式來進行使用,它主要確保多個線程在同一時刻,只能有一個線程處於方法或者同步塊中,保證了線程對變量訪問的可見性和排他性。

  具體volatile和synchronized的原理分析以及使用場景對比會在第六部分具體講到。

2 等待/通知機制

  一個線程修改了一個對象的值,而另一個線程感知到了變化,然后進行相應的操作,整個過程開始於一個線程,而最終執行又是另一個線程。前者是生產者,后者就是消費者。這種模式隔離了“做什么”(what)和“怎么做”(How),在功能層面上實現了解耦,體系結構具備了良好的伸縮性。

  等待/通知的相關方法是任意Java對象都具備的,因為這些方法被定義在所有對象的超類java.lang.Object上。具體的方法有notify()notifyAll()wait()、wait(long)、wait(long, int)

 

方法名稱 描述
notify() 通知一個在對象上等待的相乘,使其從wait()方法返回,而返回的前提是該線程獲取到了對象的鎖。
notifyAll() 通知所有等待在該對象上的線程。
wait() 調用該方法的線程由RUNNING進入WAITING狀態,只有等待另外線程的通知或被終端才會返回,需要注意,調用wait()方法后,會釋放對象的鎖。
wait(long) 超時等待一段時間,這里的參數時間是毫秒,也就是等待長達n毫秒,如果沒有通知就超時返回。
wait(long, int) 對於超時時間更細粒度的控制,可以達到納秒。

   等待/通知機制,是指一個線程A調用了對象O的wait()方法進入等待狀態,而另一個線程B調用了對象O的notify()或者notifyAll()方法,線程A收到了通知后從對象O的wait()方法返回,進而執行后序操作。上述兩個線程通過對象O來完成交互,而對象上的wait()和notify/notifyAll()的關系就如同開關信號一樣,用來完成等待方和通知方之間的交互工作。

  使用wait(),notify()以及notifyAll()需要注意的細節:

1)使用wait()、notify()和notifyAll()時需要先對調用對象加鎖。

2)調用wait()方法后,線程狀態由RUNNING變為WAITING,並將當前線程放置到對象的等待隊列。

3)notify()或notifyAll()后,等待線程並不會直接從wait()放回,需要等到1.notify()或notifyAll()的線程釋放鎖之后;2.等待線程獲取到鎖, 才會從wait()返回。

4)notify()方法將等待隊列中的一個等待線程從等待隊列中移到都同步隊列中,而notifyAll()方法則是將等待隊列中所有的線程全部移到同步隊列,被移到的狀態由WEAITNG變為BLOCKED。

5)等待線程從wait()方法返回的前提時獲得了調用對象的鎖。

從上述細節可以看到,等待/通知機制依托於同步機制,其目的就是確保等待線程從wait()方法返回時能夠感受到通知線程對變量的修改。

等待/通知的經典范式

范式分為兩部分,分別為等待方(消費者)和通知方(生產者)

等待方的原則:

1)獲取對象的鎖

2)如果條件不滿足,那么調用鎖的wait()方法,使該線程進入waiting,被通知后依然要檢查條件

3)條件滿足則執行對應的邏輯

偽代碼:

synchronized(對象){
    while(條件不滿足){
     對象.wait();
  }
  對應的邏輯處理
}

通知方的原則:

1)獲取對象的鎖

2)改變條件

3)通知所有等待在該對象上的線程

偽代碼:

synchronized(對象){
     改變條件
     對象.notifyAll();
}

3 await()/signal()

  await()/signal()的用法和功能其實和wait()/notify()比較像,前者是屬於Condition接口,后者屬於java.lang.Object類。

  wait()/notify()一般配合synchronized使用,這些方法都是Object類提供的。synchronized在一個對象上鎖(同步),wait()/notify()在這個對象上執行操作,所以一般在一個同步代碼塊中只能對一個對象進行wait()/notify()的操作。

  await()/signal()一般配合Lock對象使用,與wait()/notify()不同的是,wait()/notify()是根據synchronized(object)的object來object.wait()或object.notify(),而await()/signal()是根據鎖對象的Condition對象關聯的,每一個lock中可以有多個Condition,即一個同步代碼塊中,可以對多個Condition對象await()/signal(),這樣的話大大提升了等待/通知機制的靈活性。在JUC並發包中的阻塞隊列,就使用了Condition的await()/signal(),通過notFull和notEmpty兩個Condition,合理的控制了程序的流程。具體在第八部分會詳細講到阻塞隊列。  

Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();

public void conditionWait() throws InterruptedException {
    lock.lock();
    try {
        condition.await();
    } finally {
        lock.unlock();
    }
}

public void conditionSignal() throws InterruptedException {
    lock.lock();
    try {
        condition.signal();
    } finally {
        lock.unlock();
    }
}

  一般情況下使用Object提供的3種方法就已經可以很好的實現線程間的協作。當Lock鎖使用公平模式的時候,可以使用Condition的signal(),線程會按照FIFO的順序沖await()中喚醒。當每個鎖上有多個等待條件時,可以優先使用Condition,這樣可以具體一個Condition控制一個條件等待。

4 Thread.join()

  如果一個線程A執行了threadA.join()語句,其含義是:當前線程A等待thread線程終止之后才從threadA.join()返回。線程Thread除了提供join()方法之外,還提供了join(long millis)和join(long milis, int nanos)兩個具備超時特性的方法。這兩個超時方法表示,如果線程thread在給定的超時時間里沒有終止,那么將會從超時方法中返回。  

  看JDK中的Thread.join()方法的源碼發現,join()方法是用wait(0)方法掛起線程的。可以看到這個join()方法還是用2中講到的等待/通知經典范式的思想來實現的,即:加鎖、循環和處理邏輯三個步驟。只不過看源碼發現,join()中的wait()似乎不太需要notify()喚醒,因為join()中的wait(timeout)的timeout參數一般是0或delay,所以會自動返回。  

public final synchronized void join(long millis)
    throws InterruptedException {
        long base = System.currentTimeMillis();
        long now = 0;

        if (millis < 0) {
            throw new IllegalArgumentException("timeout value is negative");
        }

        if (millis == 0) {
            while (isAlive()) {
                wait(0);
            }
        } else {
            while (isAlive()) {
                long delay = millis - now;
                if (delay <= 0) {
                    break;
                }
                wait(delay);
                now = System.currentTimeMillis() - base;
            }
        }
    }

5 ThreadLocal

  ThreadLocal是一個以ThreadLocal對象為鍵、任意對象為值的存儲結構。這個結構被附帶在線程上,也就是說一個線程可以根據一個ThreadLocal對象查詢到綁定在這個線程上的一個值。

  ThreadLocal是用來維護本線程的變量的,並不能解決共享變量的並發問題。ThreadLocal是各線程將值存入該線程的map中,以ThreadLocal自身作為key,需要用時獲得的是該線程之前存入的值。如果存入的是共享變量,那取出的也是共享變量,並發問題還是存在的。

  ThreadLocal的主要用途是為了保持線程自身對象和避免參數傳遞,主要適用場景是按線程多實例(每個線程對應一個實例)的對象的訪問,並且這個對象很多地方都要用到。

  ThreadLocal的詳細內容查看:

  https://www.jianshu.com/p/98b68c97df9b

  https://blog.csdn.net/qq_36632687/article/details/79551828

  https://mp.weixin.qq.com/s/aM03vvSpDpvwOdaJ8u3Zgw

  

五、常見鎖的概念

  1 樂觀鎖

  總是假設最好的情況,每次去拿數據的時候都認為別人不會修改,所以不會上鎖,但是在更新的時候會判斷一下在此期間別人有沒有去更新這個數據,可以使用版本號機制和CAS算法實現。樂觀鎖適用於多讀的應用場景,因為這種場景沖突比較小,不使用鎖可以減少上下文切換的開銷等,這樣可以提高吞吐量,像數據庫提供的類似於write_condition機制,其實都是提供的樂觀鎖。在Java中 java.util.concurrent.atomic包下面的原子變量類就是使用了樂觀鎖的一種實現方式CAS實現的。CAS算法就是典型的樂觀鎖,具體的原理會在第六部分介紹。

  簡而言之樂觀鎖就是每次操作不加鎖而是假設沒有沖突而去完成某項操作,如果因為沖突失敗就重試,直到成功為止,不會造成線程阻塞。​

2 悲觀鎖

  總是假設最壞的情況,每次去拿數據的時候都認為別人會修改,所以每次在拿數據的時候都會上鎖,這樣別人想拿這個數據就會阻塞block直到它拿到鎖(共享資源每次只給一個線程使用,其它線程阻塞,用完后再把資源轉讓給其它線程)。悲觀鎖更適用於多寫的應用場景,這種場景下沖突較多。傳統的關系型數據庫里邊就用到了很多這種鎖機制,比如行鎖,表鎖等,讀鎖,寫鎖等,都是在做操作之前先上鎖。Java中 synchronizedReentrantLock等獨占鎖就是悲觀鎖思想的實現。

  簡而言之悲觀鎖就是每次操作都會加鎖,會造成線程阻塞。

  java中的悲觀鎖就是Synchronized,AQS框架下的鎖則是先嘗試CAS樂觀鎖去獲取鎖,如ReentrantLock。

3 重入鎖

  也叫做遞歸鎖,指的是同一線程外層函數獲得鎖之后 ,內層遞歸函數仍然有獲取該鎖的代碼,但不受影響。在JAVA中 ReentrantLock 和synchronized 都是可重入鎖。重入鎖可以有效的避免死鎖。

4 自旋鎖

采用讓當前線程不停的在循環體內執行實現,當循環的條件被其它線程改變時才能進入臨界區

public void lock() {
  Thread current = Thread.currentThread();
  //compareAndSet(V expect, V update) 如果當前值 == 預期值,則以原子方式將該值設置為給定的更新值。
  while (!sign.compareAndSet(null, current)) {
  }
}
public void unlock() {
  Thread current = Thread.currentThread();
  sign.compareAndSet(current, null);
}

  由於自旋鎖只是將當前線程不停地執行循環體,不進行線程狀態的改變,所以響應速度更快。但當線程數不停增加時,性能下降明顯,因為每個線程都需要執行,占用CPU時間。如果線程競爭不激烈,並且保持鎖的時間段。適合使用自旋鎖。

  互斥鎖相對於自旋鎖的缺點是 其他未獲取到鎖的線程在等待獲取鎖時會進入block狀態,獲取到鎖時再切換回來,這樣會造成CPU很大的時間開銷,如果互斥量僅僅被鎖住很短的一段時間, 用來使線程休眠和喚醒線程的時間會比該線程睡眠的時間還長, 甚至有可能比不斷在自旋鎖上輪訓的時間還長。自旋鎖的問題是, 如果自旋鎖被持有的時間過長, 其它嘗試獲取自旋鎖的線程會一直輪訓自旋鎖的狀態, 這將非常浪費CPU的執行時間, 這時候該線程睡眠會是一個更好的選擇。

  自旋鎖和互斥鎖(排他鎖,獨占鎖)是相對的概念,具體區別可參考:http://ifeve.com/practice-of-using-spinlock-instead-of-mutex/

5 讀寫鎖

  之前提到的鎖(如synchronized和ReentrantLock)基本都是排他鎖,這些鎖在同一時刻只允許一個線程進行訪問,而讀寫鎖在同一時刻可以允許多個讀線程訪問,但是在寫線程訪問時,所有的讀線程和其他寫線程均被阻塞。讀寫鎖的機制: "讀-讀"不互斥 、"讀-寫"互斥 、"寫-寫"互斥 。JUC包中有專門的讀寫鎖的實現:ReentrantReadWriteLock。

  讀寫鎖代碼實例見:https://blog.csdn.net/liyantianmin/article/details/42829233

  其實在mysql中讀寫鎖有時候也分為兩部分:排他鎖(寫鎖)和共享鎖(讀鎖),在java中倒是沒有對這兩個概念進行細分。 

6 公平鎖

  公平鎖的公平性與否是在於:鎖的獲取順序是否符合請求的絕對時間順序,也就是FIFO。

  JUC包中的ReetrantLock就可以實現公平鎖,具體底層實現機制是通過AQS(同步隊列)判斷當前節點是否有前驅節點的判斷,如果該方法返回true,則表示有線程比當前線程更早地請求獲取鎖,因此需要等待前驅線程獲取鎖並釋放之后才能繼續獲取鎖,有關AQS的原理會在第七部分具體講到。

  ReetrantLock具體實現公平鎖的代碼可參考:https://blog.csdn.net/qyp199312/article/details/70598480 

7 偏向鎖

  Java偏向鎖(Biased Locking)是Java6引入的一項多線程優化。 偏向鎖,顧名思義,它會偏向於第一個訪問鎖的線程,如果在運行過程中,同步鎖只有一個線程訪問,不存在多線程爭用的情況,則線程是不需要觸發同步的,這種情況下,就會給線程加一個偏向鎖。 如果在運行過程中,遇到了其他線程搶占鎖,則持有偏向鎖的線程會被掛起,JVM會消除它身上的偏向鎖,將鎖恢復到標准的輕量級鎖。

  偏向鎖通過消除資源無競爭情況下的同步原語,進一步提高了程序的運行性能。synchronized的實現就用到了偏向鎖。

8 死鎖

  把死鎖放在這略微有點萌QAQ,因為它並不是一種鎖的類型,而是指錯誤使用鎖而出現的現象。死鎖出現的原因就是多個線程涉及到了多個鎖,這些鎖出現了交叉或閉環,如下面這個例子:

  創建了兩個字符串a和b,再創建兩個線程A和B,讓每個線程都用synchronized鎖住字符串(A先鎖a,再去鎖b;B先鎖b,再鎖a),如果A鎖住a,B鎖住b,A就沒辦法鎖住b,B也沒辦法鎖住a,這時就陷入了死鎖。

public class DeadLock {
    public static String obj1 = "obj1";
    public static String obj2 = "obj2";
    public static void main(String[] args){
        Thread a = new Thread(new Lock1());
        Thread b = new Thread(new Lock2());
        a.start();
        b.start();
    }    
}
class Lock1 implements Runnable{
    @Override
    public void run(){
        try{
            System.out.println("Lock1 running");
            while(true){
                synchronized(DeadLock.obj1){
                    System.out.println("Lock1 lock obj1");
                    Thread.sleep(3000);//獲取obj1后先等一會兒,讓Lock2有足夠的時間鎖住obj2
                    synchronized(DeadLock.obj2){
                        System.out.println("Lock1 lock obj2");
                    }
                }
            }
        }catch(Exception e){
            e.printStackTrace();
        }
    }
}
class Lock2 implements Runnable{
    @Override
    public void run(){
        try{
            System.out.println("Lock2 running");
            while(true){
                synchronized(DeadLock.obj2){
                    System.out.println("Lock2 lock obj2");
                    Thread.sleep(3000);
                    synchronized(DeadLock.obj1){
                        System.out.println("Lock2 lock obj1");
                    }
                }
            }
        }catch(Exception e){
            e.printStackTrace();
        }
    }
}

 

六、CAS、volatile、synchronized

  有人稱CAS、volatile是Java並發的基石,再加上第八章介紹的lock接口和AQS、重入鎖以及JUC中的好多實用的類都是用他們實現的。為了理解阻塞隊列,ConcurrentHashmap和線程池等成熟的並發包的實現原理(第八章)需先掌握第六章和第七章的基本同步組件的原理和應用。

1 volatile

  java編程語言允許線程訪問共享變量,為了確保共享變量能被准確和一致的更新,線程應該確保通過排他鎖單獨獲得這個變量。Java語言提供了volatile,在某些情況下比鎖更加方便。如果一個字段被聲明成volatile,java線程內存模型確保所有線程看到這個變量的值是一致的。注意這里的字段只能是Java中的8大基礎變量,如果是引用型變量,可能無法保證內存可見性。

  valitate是輕量級的synchronized,不會引起線程上下文的切換和調度,執行開銷更小。

  volatile寫的內存語義:當寫一個volatile變量時,JMM會把該線程對應的本地內存中的共享變量值刷新到主內存。

  volatile讀的內存語義:當讀一個volatile變量時,JMM會把該線程對應的本地內存置位無效。線程接下來將從主內存中讀取共享變量。(強制從主內存讀取共享變量,把本地內存與主內存的共享變量的值變成一致)。

  為了實現volatile的內存語義,JMM對編譯器和處理器的重排序做了限制。具體JMM是如何實現volatile的內存語義的可以參見我之前的博客:https://www.cnblogs.com/kukri/p/9109639.html

  volatile可以保證三大特性中的

  可見性:多線程操作的時候,一個線程修改了一個變量的值 ,其他線程能立即看到修改后的值

  有序性:即程序的執行順序按照代碼的順序執行(處理器為了提高代碼的執行效率可能會對代碼進行重排序)

  但不能保證原子性,例如說x++這種復合操作,例如說,x初始值為3,x=x+1會首先從主內存中取x的值到工作內存中,此時取的值是3,+1以后的值為4,但此時還沒來得及將4賦值給x,CPU時間片就輪到了其他線程,另一個線程將x的變量值改為了10,然后該線程重新獲取到了時間片后,繼續執行之前的操作,x被賦為4,這樣就出現了同步問題。

  synchronized關鍵字是防止多個線程同時執行一段代碼,那么就會很影響程序執行效率,而volatile關鍵字在某些情況下性能要優於synchronized,但是要注意volatile關鍵字是無法替代synchronized關鍵字的,因為volatile關鍵字無法保證操作的原子性。通常來說,使用volatile必須具備以下2個條件:

  1)對變量的寫操作不依賴於當前值

  2)該變量沒有包含在具有其他變量的不變式中

  實際上,這些條件表明,可以被寫入 volatile 變量的這些有效值獨立於任何程序的狀態,包括變量的當前狀態。

  事實上,我的理解就是因為volatile本身不具備原子性,因此上面的2個條件需要保證操作是原子性操作,才能保證使用volatile關鍵字的程序在並發時能夠正確執行。

2 synchronized

  跟volatile相比,synchronized更“重量級”,帶來的內存 CPU的開銷也更大一些,帶來的好處就是能確保線程互斥的訪問同步代碼,更安全。

  synchronized 是JVM實現的一種鎖,其中鎖的獲取和釋放分別是monitorenter 和 monitorexit 指令,該鎖在實現上分為了偏向鎖、輕量級鎖和重量級鎖,其中偏向鎖在 java1.6 是默認開啟的,輕量級鎖在多線程競爭的情況下會膨脹成重量級鎖,有關鎖的數據都保存在對象頭中。

  synchronized用法:

  • 修飾普通方法:同步對象是實例對象
  • 修飾靜態方法:同步對象是類本身
  • 修飾代碼塊:可以自己設置同步對象​

  synchronized的缺點:會讓沒有得到鎖的資源進入Block狀態,爭奪到資源之后又轉為Running狀態,這個過程涉及到上下文切換和調度延時以及操作系統用戶態和內核態的切換,代價比較高。Java1.6為 synchronized 做了優化,增加了從偏向鎖到輕量級鎖再到重量級鎖的過度,但是在最終轉變為重量級鎖之后,性能仍然較低。

  三種鎖的差別如下圖:

  關於synchronized的鎖優化問題可進一步參考:https://www.cnblogs.com/barrywxx/p/8678698.html

3 CAS

  Java並發應用中通常指CompareAndSwap或CompareAndSet,即比較並交換。JVM中的CAS操作是利用了處理器提供的CMPXCHG指令實現的;CAS是一種系統原語,原語屬於操作系統用語范疇,是由若干條指令組成的,用於完成某個功能的一個過程,並且原語的執行必須是連續的,在執行過程中不允許被中斷,因此CAS是一個原子操作,但僅能保證一個共享變量的原子性

  CAS(V,A,B)操作包含三個操作數 —— 內存位置(V)、預期原值(A)和新值(B)。 如果內存位置的值與預期原值相匹配,那么處理器會自動將該位置值更新為新值 。否則,處理器不做任何操作。

  

  像剛剛講到的volatile解決不了的x++問題,JUC包中的原子操作類java.util.concurrent.atomic可以解決,其底層其實是用自旋+CAS來實現的。

  例如AtomicInteger:

public final int getAndIncrement() {
    for(;;) {
        int current = get();
        int next = current + 1;
        if (compareAndSet(current, next))
            return current;
    }
}

public final boolean compareAndSet(int except, int update) {
    return unsafe.compareAndSwapInt(this, valueOffset, except, update);
} 

  JUC的原子操作類具體可參考:https://blog.csdn.net/javazejian/article/details/72772470#%E5%B9%B6%E5%8F%91%E5%8C%85%E4%B8%AD%E7%9A%84%E5%8E%9F%E5%AD%90%E6%93%8D%E4%BD%9C%E7%B1%BBatomic%E7%B3%BB%E5%88%97

  CAS的優點:能有效的減少上下文的切換的開銷,提升性能。並提供了單變量的原子性。

  CAS存在的問題(缺點):

  • ABA問題:

什么是ABA問題?比如有一個 int 類型的值 N 是 1
此時有三個線程想要去改變它:
線程A ​​:希望給 N 賦值為 2
線程B: 希望給 N 賦值為 2
線程C: 希望給 N 賦值為 1​​
此時線程A和線程B同時獲取到N的值1,線程A率先得到系統資源,將 N 賦值為 2,線程 B 由於某種原因被阻塞住,線程C在線程A執行完后得到 N 的當前值2
此時的線程狀態
線程A成功給 N 賦值為2
線程B獲取到 N 的當前值 1 希望給他賦值為 2,處於阻塞狀態
線程C獲取當好 N 的當前值 2 ​​​​​希望給他賦值為1
然后線程C成功給N賦值為1
​最后線程B得到了系統資源,又重新恢復了運行狀態,​在阻塞之前線程B獲取到的N的值是1,執行compare操作發現當前N的值與獲取到的值相同(均為1),成功將N賦值為了2。

在這個過程中線程B獲取到N的值是一個舊值​​,雖然和當前N的值相等,但是實際上N的值已經經歷了一次 1到2到1的改變
上面這個例子就是典型的ABA問題​
怎樣去解決ABA問題
給變量加一個版本號即可,在比較的時候不僅要比較當前變量的值 還需要比較當前變量的版本號。Java中AtomicStampedReference 就解決了這個問題

  • 循環時間長開銷大

  在並發量比較高的情況下,如果許多線程反復嘗試更新某一個變量,卻又一直更新不成功,循環往復,會給CPU帶來很大的壓力。

  • 只能保證一個共享變量的原子操作。

  當對一個共享變量執行操作時,我們可以使用循環CAS的方式來保證原子操作,但是對多個共享變量操作時,循環CAS就無法保證操作的原子性,這個時候就可以用鎖,或者有一個取巧的辦法,就是把多個共享變量合並成一個共享變量來操作。比如有兩個共享變量i=2,j=a,合並一下ij=2a,然后用CAS來操作ij。從Java1.5開始JDK提供了AtomicReference類來保證引用對象之間的原子性,你可以把多個變量放在一個對象里來進行CAS操作。

 

七、JUC包:Lock接口、AQS(隊列同步器)、重入鎖、讀寫鎖

  本章介紹JUC包(java.util.Concurrent)中與鎖相關的API和組件,以及這些API和組件的使用方式和實現原理。

1 Lock接口

  synchronized的缺點:

1)不能響應中斷;

2)同一時刻不管是讀還是寫都只能有一個線程對共享資源操作,其他線程只能等待

3)鎖的釋放由虛擬機來完成,不用人工干預,不過此即使缺點也是優點,優點是不用擔心會造成死鎖,缺點是由可能獲取到鎖的線程阻塞之后其他線程會一直等待,性能不高。

  鎖時用來控制多個線程訪問共享資源的方式,一般來說,一個鎖能防止多個線程同時對共享資源(讀寫鎖例外,可以允許多個線程並發訪問共享資源)。在Lock接口出現之前,Java是靠synchronized關鍵字實現鎖功能的。但是在Java SE 5之后,並發包中新增了Lock接口以及其相關實現類用來實現鎖功能,它提供了與synchronized關鍵字類似的同步功能,只是在使用時需要顯示地獲取和釋放鎖;雖然它缺少了通過synchronized塊提供的隱式獲取釋放鎖的便攜性,但是卻擁有了鎖獲取與釋放的可操作性、可中斷的獲取鎖以及超時獲取鎖等更靈活的特性。

Lock lock = new ReentrantLock();
lock.lock();
try {
} finally {
    lock.unlock();
}

  Lock接口有6個方法如下,本篇就不詳細介紹這幾個API的使用方法了,可參考https://www.cnblogs.com/dolphin0520/p/3923167.html

public interface Lock {
    void lock();
    void lockInterruptibly() throws InterruptedException;
    boolean tryLock();
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
    void unlock();
    Condition newCondition();
}

  

2 (AQS)隊列同步器

  隊列同步器AbstractQueuedSynchronizer,是用來構建鎖或者而其他同步組件的基礎框架,它使用了一個int成員變量表示同步狀態,通過內置的FIFO隊列來完成資源獲取線程的排隊工作。

  AQS的主要使用方式是集成,子類通過繼承同步器並實現它的抽象方法來管理同步狀態,在抽象方法的實現過程中免不了對同步狀態進行更改,這時就需要使用同步器提供的三個方法(getState()、setState(int new State)和compareAndSetState(int expect, int update))來進行操作,因為它們能夠保證狀態的改變是安全的。子類推薦被定義為自定義同步組件的靜態內部類,同步器自身沒有實現任何同步接口,它僅僅是定義了若干同步狀態獲取和釋放的方法來供自定義同步組件使用,同步器既可以支持獨占式地獲取同步狀態,也可以支持共享式地獲取同步狀態,這樣就可以方便實現不同的同步組件(ReentrantLock、ReentrantReadWriteLock和CountDownLatch等)。

  Lock接口的實現基本上都是通過聚合了一個同步器的子類來完成線程訪問控制的。同步器是實現鎖(也可以是任意同步組件)的關鍵,在鎖的實現中聚合同步器,利用同步器實現鎖的語義。

  鎖是面向使用者的,它定義了使用者與鎖交互的接口,隱藏了實現細節;

  同步器面向的是鎖的實現者,它簡化了鎖的實現方式,屏蔽了同步狀態管理、線程的排隊、等待與喚醒等底層操作。鎖和同步器很好地隔離了使用者和實現者所需要關注的領域。

  AQS的實現原理我是看方騰飛的那本《Java 並發編程的藝術》學習的,篇幅過多就不在此羅列了。或參考鏈接https://www.cnblogs.com/chengxiao/archive/2017/07/24/7141160.html#a1-2

  AQS是JUC中很多同步組件的構建基礎,簡單來講,它內部實現主要是狀態變量state和一個FIFO隊列來完成,同步隊列的頭結點是當前獲取到同步狀態的結點,獲取同步狀態state失敗的線程,會被構造成一個結點(或共享式或獨占式)加入到同步隊列尾部(采用自旋CAS來保證此操作的線程安全),隨后線程會阻塞;釋放時喚醒頭結點的后繼結點,使其加入對同步狀態的爭奪中。

  AQS為我們定義好了頂層的處理實現邏輯,我們在使用AQS構建符合我們需求的同步組件時,只需重寫tryAcquire,tryAcquireShared,tryRelease,tryReleaseShared幾個方法,來決定同步狀態的釋放和獲取即可,至於背后復雜的線程排隊,線程阻塞/喚醒,如何保證線程安全,都由AQS為我們完成了,這也是非常典型的模板方法的應用。AQS定義好頂級邏輯的骨架,並提取出公用的線程入隊列/出隊列,阻塞/喚醒等一系列復雜邏輯的實現,將部分簡單的可由使用者決定的操作邏輯延遲到子類中去實現。 

  基於AQS實現的同步組件,包括下面要介紹的重入鎖和讀寫鎖相比於傳統的synchronized比除性能外還有以下優勢:

1.可響應中斷、鎖申請等待限時等;

2.公平鎖與非公平鎖(ReentrantLock);

3.根據AQS的獨占式和共享式兩種獲取同步狀態的方法可以實現排他鎖與共享鎖(ReentrantReadWriteLock);

4.另外可以結合Condition來使用await()/signal() 提供更靈活的等待/通知機制;

5.鎖的釋放獲取可以由程序員自己控制,更加靈活。

 

3 重入鎖

  重入鎖ReentrantLock支持重進入,表示該鎖能夠支持一個線程對資源重復加鎖,除此之外,比較重要的是該鎖還支持獲取鎖時的公平性和非公平性選擇。ReentrantLock是通過組合自定義AQS來實現鎖的獲取和釋放。具體原理可查看《Java 並發編程的藝術》相關部分。

  本篇只分析一下ReentrantLock的重點部分,就是如何實現公平鎖的:

static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;

        final void lock() {
            acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }

  重點是在於公平獲取鎖的判斷條件相對於非公平鎖,加入了!hasQueuedPredecessors()的判斷:

   public final boolean hasQueuedPredecessors() {
        Node t = tail; 
        Node h = head;
        Node s;
        //head沒有next ----> false
        //head有next,next持有的線程不是當前線程 ----> true
        //head有next,next持有的線程是當前線程 ----> false
        return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());
    }

  即加入了同步隊列中當前節點是否有前驅節點,如果返回true,則表示有線程比當前線程更早地請求獲取鎖,因此要等待前驅線程獲取鎖並釋放鎖才能繼續獲取鎖,對應的就是把該線程加入同步隊列中等待。

  即 tryAcquire return false,取非以后 第一部分return true,然后進入第二部分,通過addWaiter把當前線程存為節點並加入到同步隊列的尾部。

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

  然后acquireQueued又是一個自旋,等待同步隊列中的線程(節點)出隊列的過程,且保證了隊列的FIFO的特性。

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;//標記是否成功拿到資源
    try {
        boolean interrupted = false;//標記等待過程中是否被中斷過
        
        //又是一個“自旋”!
        for (;;) {
            final Node p = node.predecessor();//拿到前驅
            //如果前驅是head,即該結點已成老二,那么便有資格去嘗試獲取資源(可能是老大釋放完資源喚醒自己的,當然也可能被interrupt了)。
            if (p == head && tryAcquire(arg)) {
                setHead(node);//拿到資源后,將head指向該結點。所以head所指的標桿結點,就是當前獲取到資源的那個結點或null。
                p.next = null; // setHead中node.prev已置為null,此處再將head.next置為null,就是為了方便GC回收以前的head結點。也就意味着之前拿完資源的結點出隊了!
                failed = false;
                return interrupted;//返回等待過程中是否被中斷過
            }
            
            //如果自己可以休息了,就進入waiting狀態,直到被unpark()
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;//如果等待過程中被中斷過,哪怕只有那么一次,就將interrupted標記為true
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

  第一次看這塊的時候有疑問,就感覺AQS自帶的acquire方法中的acquireQueued已經保證了FIFO,ReentrantLock集成AQS實現還怎么保證非公平和公平兩種特性了。

if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

  后來仔細分析程序結構發現,acquire中的if判斷,是先會去判斷tryAcquire是否成功,根據&&(與)運算的特性,當且僅當tryAcquire失敗(false),也就是if語句中的第一個判斷條件為真時,才會去判斷第二個條件,也就是加入同步隊列自旋,這個同步隊列是會保證FIFO也就保證了公平性的。但如果一個新來的線程,恰好在tryAcquire的CAS原子操作中成功獲取了同步狀態,那么他將“插隊”,也就是說越過了同步隊列中的所有節點,直接執行,這樣就失去了公平性。所以雖然AQS機制自帶了同步隊列保證了一部分“公平性”,但tryAcquire中卻沒有保證公平性,所以ReentrantLock是否保證公平性是體現在tryAcquire的方法中的。

  言歸正傳,公平鎖雖然能保證線程鎖的公平獲取,但經過試驗發現,會造成上下文的切換次數增加,造成更大的開銷。非公平鎖雖然可能造成線程“飢餓”,但極少的線程切換,保證了其更大的吞吐量。所以ReentrantLock默認設置為非公平鎖。

  在JDK5.0版本之前,重入鎖的性能遠遠好於synchronized關鍵字,JDK6.0版本之后synchronized 得到了大量的優化,二者性能也不分伯仲,但是重入鎖是可以完全替代synchronized關鍵字的。除此之外,重入鎖又自帶一系列高逼格操作:可中斷響應、鎖申請等待限時、公平鎖、另外可以結合Condition來使用await()/signal() 提供更靈活的等待/通知機制,另外鎖的釋放獲取也更加靈活。

4 讀寫鎖

  之前提到的鎖基本上都是排他鎖(獨占鎖),在同一時刻只允許一個線程進行訪問,而讀寫鎖在同一時刻可以允許多個讀線程訪問,但是在寫線程訪問時,所有的讀線程和其他寫線程均被阻塞。ReentrantReadWriteLock維護了一堆鎖,讀鎖和寫鎖,通過分離讀鎖和寫鎖,使得並發性相比一般的排他鎖有了很大提升。

 

八、阻塞隊列、ConcurrentHashMap、線程池

  掌握了第六部分第七部分所講的核心組件的原理后,就可以了解一下在實際生產環境中使用更多的API了。

1 阻塞隊列 

  阻塞隊列(BlockingQueue)是一個支持兩個附加操作的隊列。這兩個附加的操作支持阻塞的插入和移除方法。

  1)支持阻塞的插入方法:意思是當隊列滿時,隊列會阻塞插入元素的線程,直到隊列不滿。

  2)支持阻塞的移除方法:意思是當隊列為空時,獲取元素的線程會等待隊列變為非空。

  阻塞隊列常用於生產者和消費者的場景,生產者是向隊列里添加元素的線程,消費者是從隊列里取出元素的線程。阻塞隊列就是生產者用來存放元素、消費者用來獲取元素的容器。  

  JDK7提供了7個阻塞隊列。分別是

  • ArrayBlockingQueue :一個由數組結構組成的有界阻塞隊列。
  • LinkedBlockingQueue :一個由鏈表結構組成的有界阻塞隊列。
  • PriorityBlockingQueue :一個支持優先級排序的無界阻塞隊列。
  • DelayQueue:一個使用優先級隊列實現的無界阻塞隊列。
  • SynchronousQueue:一個不存儲元素的阻塞隊列。
  • LinkedTransferQueue:一個由鏈表結構組成的無界阻塞隊列。
  • LinkedBlockingDeque:一個由鏈表結構組成的雙向阻塞隊列。

阻塞隊列的方法對比非阻塞隊列的方法

1.非阻塞隊列中的幾個主要方法:

  add(E e):將元素e插入到隊列末尾,如果插入成功,則返回true;如果插入失敗(即隊列已滿),則會拋出異常;

  remove():移除隊首元素,若移除成功,則返回true;如果移除失敗(隊列為空),則會拋出異常;

  offer(E e):將元素e插入到隊列末尾,如果插入成功,則返回true;如果插入失敗(即隊列已滿),則返回false;

  poll():移除並獲取隊首元素,若成功,則返回隊首元素;否則返回null;

  peek():獲取隊首元素,若成功,則返回隊首元素;否則返回null

  對於非阻塞隊列,一般情況下建議使用offer、poll和peek三個方法,不建議使用add和remove方法。因為使用offer、poll和peek三個方法可以通過返回值判斷操作成功與否,而使用add和remove方法卻不能達到這樣的效果。注意,非阻塞隊列中的方法都沒有進行同步措施。

2.阻塞隊列中的幾個主要方法:

  阻塞隊列包括了非阻塞隊列中的大部分方法,上面列舉的5個方法在阻塞隊列中都存在,但是要注意這5個方法在阻塞隊列中都進行了同步措施。除此之外,阻塞隊列提供了另外4個非常有用的方法:

  put(E e):put方法用來向隊尾存入元素,如果隊列滿,則等待;

  take():take方法用來從隊首取元素,如果隊列為空,則等待;

  offer(E e,long timeout, TimeUnit unit):offer方法用來向隊尾存入元素,如果隊列滿,則等待一定的時間,當時間期限達到時,如果還沒有插入成功,則返回false;否則返回true;

  poll(long timeout, TimeUnit unit):poll方法用來從隊首取元素,如果隊列空,則等待一定的時間,當時間期限達到時,如果取到,則返回null;否則返回取得的元素;

阻塞隊列的實現原理

  JDK是使用通知模式實現的阻塞隊列。所謂通知模式,就是當生產者往隊列滿的隊列里添加元素時會阻塞住生產者,當消費者消費了一個隊列中的元素后,會通知生產者當前隊列可用。通過查看JDK源碼發現ArrayBlockingQueue使用了Condition接口以及(await()/signal())來實現。

    /** Condition for waiting takes */
    private final Condition notEmpty;

    /** Condition for waiting puts */
    private final Condition notFull;

    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }

    private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal();
        return x;
    }

 生產者/消費者示例代碼(基於阻塞隊列實現)

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingQueueTest {
    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(10);
        new Thread(new Producer(queue)).start();
        new Thread(new Consumer(queue)).start();

    }


}


class Consumer implements Runnable{
    BlockingQueue<Integer> queue;

    Consumer (BlockingQueue<Integer> queue){
        this.queue = queue;
    }

    @Override
    public void run() {
        while (true) {
            try {
                queue.take();
                System.out.println("從隊列取走一個元素,隊列剩余" + queue.size() + "個元素");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

class Producer implements Runnable {
    BlockingQueue<Integer> queue;

    Producer (BlockingQueue<Integer> queue){
        this.queue = queue;
    }

    @Override
    public void run() {
        while (true) {
            try {
                queue.put(1);
                System.out.println("向隊列插入一個元素,隊列剩余空間:" + (10 - queue.size()));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

 

2 ConcurrentHashMap

這部分內容之前在我的一篇博客中詳細分析過: https://www.cnblogs.com/kukri/p/9392906.html

 

3 線程池

  基本思想還是一種對象池的思想,開辟一塊內存空間,里面存放了眾多(未死亡)的線程,池中線程執行調度由池管理器來處理。當有線程任務時,從池中取一個,執行完成后線程對象歸池,這樣可以避免反復創建線程對象所帶來的性能開銷,節省了系統的資源。合理地使用線程池能夠帶來3個好處。

  第一:降低資源消耗。通過重復利用已創建的線程降低線程創建和銷毀造成的消耗。

  第二:提高響應速度。當任務到達時,任務可以不需要等到線程創建就能立即執行。

  第三:提高線程的可管理性。線程是稀缺資源,如果無限制地創建,不僅會消耗系統資源,還會降低系統的穩定性,使用線程池可以進行統一分配、調優和監控。

為什么要使用線程池:      

  多線程技術主要解決處理器單元內多個線程執行的問題,它可以顯著減少處理器單元的閑置時間,增加處理器單元的吞吐能力。

  假設一個服務器完成一項任務所需時間為:T1 創建線程時間,T2 在線程中執行任務的時間,T3 銷毀線程時間。

  如果:T1 + T3 遠大於 T2,則可以采用線程池,以提高服務器性能。

     一個線程池包括以下四個基本組成部分:
                1、線程池管理器(ThreadPool):用於創建並管理線程池,包括 創建線程池,銷毀線程池,添加新任務;
                2、工作線程(PoolWorker):線程池中線程,在沒有任務時處於等待狀態,可以循環的執行任務;
                3、任務接口(Task):每個任務必須實現的接口,以供工作線程調度任務的執行,它主要規定了任務的入口,任務執行完后的收尾工作,任務的執行狀態等;
                4、任務隊列(taskQueue):用於存放沒有處理的任務。提供一種緩沖機制。

  線程池技術正是關注如何縮短或調整T1,T3時間的技術,從而提高服務器程序性能的。它把T1,T3分別安排在服務器程序的啟動和結束的時間段或者一些空閑的時間段,這樣在服務器程序處理客戶請求時,不會有T1,T3的開銷了。

  線程池不僅調整T1,T3產生的時間段,而且它還顯著減少了創建線程的數目,看一個例子:

  假設一個服務器一天要處理50000個請求,並且每個請求需要一個單獨的線程完成。在線程池中,線程數一般是固定的,所以產生線程總數不會超過線程池中線程的數目,而如果服務器不利用線程池來處理這些請求則線程總數為50000。一般線程池大小是遠小於50000。所以利用線程池的服務器程序不會為了創建50000而在處理請求時浪費時間,從而提高效率。

  諸如 Web 服務器、數據庫服務器、文件服務器或郵件服務器之類的許多服務器應用程序都面向處理來自某些遠程來源的大量短小的任務。請求以某種方式到達服務器,這種方式可能是通過網絡協議(例如 HTTP、FTP 或 POP)、通過 JMS 隊列或者可能通過輪詢數據庫。不管請求如何到達,服務器應用程序中經常出現的情況是:單個任務處理的時間很短而請求的數目卻是巨大的。

  構建服務器應用程序的一個簡單模型是:每當一個請求到達就創建一個新線程,然后在新線程中為請求服務。實際上對於原型開發這種方法工作得很好,但如果試圖部署以這種方式運行的服務器應用程序,那么這種方法的嚴重不足就很明顯。每個請求對應一個線程(thread-per-request)方法的不足之一是:為每個請求創建一個新線程的開銷很大;為每個請求創建新線程的服務器在創建和銷毀線程上花費的時間和消耗的系統資源要比花在處理實際的用戶請求的時間和資源更多。

  除了創建和銷毀線程的開銷之外,活動的線程也消耗系統資源。在一個 JVM 里創建太多的線程可能會導致系統由於過度消耗內存而用完內存或“切換過度”。為了防止資源不足,服務器應用程序需要一些辦法來限制任何給定時刻處理的請求數目。

  線程池為線程生命周期開銷問題和資源不足問題提供了解決方案。通過對多個任務重用線程,線程創建的開銷被分攤到了多個任務上。其好處是,因為在請求到達時線程已經存在,所以無意中也消除了線程創建所帶來的延遲。這樣,就可以立即為請求服務,使應用程序響應更快。而且,通過適當地調整線程池中的線程數目,也就是當請求的數目超過某個閾值時,就強制其它任何新到的請求一直等待,直到獲得一個線程來處理為止,從而可以防止資源不足。

常見線程池:

①newSingleThreadExecutor
單個線程的線程池,即線程池中每次只有一個線程工作,單線程串行執行任務
②newFixedThreadExecutor(n)
固定數量的線程池,每提交一個任務就是一個線程,直到達到線程池的最大數量,然后后面進入等待隊列直到前面的任務完成才繼續執行
③newCacheThreadExecutor(推薦使用)
可緩存線程池, 當線程池大小超過了處理任務所需的線程,那么就會回收部分空閑(一般是60秒無執行)的線程,當有任務來時,又智能的添加新線程來執行。
④newScheduleThreadExecutor
大小無限制的線程池,支持定時和周期性的執行線程
 
  具體線程池執行機制,原理見《Java 並發編程的藝術》,在此不做過多的照搬。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM