Java 並發之Concurrent 包綜述


■ 並發原理

單核系統:線程交替執行,由於交替又快又多,給人一種同時執行的感覺
多核系統:不僅可以交替執行線程,而且可以重疊執行線程
補充: 本章指的並發主要指的是線程間的並發

 

■ 常見的並發機制

 

■ 不同系統的並發機制

  • UNIX:管道、消息、共享內存、信號量、信號
  • Linux內核:原子操作、自旋鎖、信號量、屏障(由於服務器一般都位於Linux服務器上,因此此是我們最重要要了解的)
  • Solaris線程同步原語:互斥鎖、信號量、多讀者/單寫者鎖、條件變量 
  • Windows:等待函數、分派器對象、臨界區、輕量級讀寫鎖和條件變量

 ■ 互斥的需求

  • 強制互斥: 當臨界區共享時,一次只允許一個線程進入臨界區,即必須強制實施互斥
  • 禁止干涉: 一個在非臨界區停止的線程不能干涉其他線程,包括臨界區和非臨界區的線程
  • 禁止無限延遲: 決不允許出現需要訪問臨界區的線程被無限延遲的情況,如產生死鎖或飢餓
  • 可用立入: 當沒有線程在臨界區中時,任何需要進入臨界區的線程必須能夠立即進入
  • 核數無關: 對相關線程的執行速度和處理器的數目沒有任何要求和限制
  • 有限時間: 一個線程駐留在臨界區的時間必須是有限的

 ■ 互斥的方案

  • 硬件支持:處理器原生支持的互斥指令,好處是可以減少開銷,但很難成為一個通用的解決方案
  • 系統或語言級別支持:即由操作系統或程序語言提供該級別的互斥支持,比如信號量、管程、消息傳遞等
  • 軟件方法支持:這些方法通常基於在訪問內存時基本互斥的假設,盡管允許訪問的順序事先沒有具體安排,但同時訪問內存中的同一地址的操作被內存仲裁器串行化執行了,即可以理解用算法的方式解決互斥問題,比如Dekker算法、Peterson算法

      * Dekker算法

/**
 * Dekker算法基本約束:
 *      某一時刻對某一內存地只能進行一次訪問
 * 1.設置flag做為兩個線程進入臨界區的密鑰,當一個線程失敗,其他仍可訪問
 *      - 每個線程只能改變自身的flag,只能檢查其他線程的flag而不能改變
 *      - 當一個線程要進入臨界區,需周期性檢查另一個線程flag,直到另一線程不在臨界區
 *      - 當線程進入臨界區時,應立即設置自身flag為true,表明占領臨界區
 *      - 當線程離開臨界區時,應立即設置自身flag為false,表明釋放臨界區
 * 2.設置turn用於安排臨界區的訪問順序,訪問線程必須重復讀取turn值直到被允許進入臨界區
 *      - 當turn值等於線程號,該線程可以進入其臨界區
 *      - 否則,該線程必須被強制等待(忙等待或自旋等待)
 */
public class Dekker {
    //觀察兩個線程的狀態
    boolean[] flag = {false,false};
    //表示臨界區訪問權限的輪轉,初始權利給P1 -- 安排執行順序避免謙讓造成的活鎖問題
    int turn = 1;
    public void P0(){
        while (true){
            //設置P0的flag為true,同時檢查P1的flag
            flag[0] = true;
            while (flag[1]){
                //當臨界區不可用時,判斷當前臨界區權限是否是P1
                if (turn == 1){ // 用於處理活鎖問題
                    //當臨界區權限是P1時,需要將P0設置為false,使得P1能進入臨界區,用於處理死鎖問題
                    flag[0] = false;
                    //循環校驗turn的權限(空自旋),直到P1執行完畢將權限交給P0
                    while (turn == 1){
                        /** do Nothing 空自旋 **/
                    }
                    flag[0] = true;//此時P1應已執行完畢,應當禁止P1進入臨界區
                }
            }
            //當P1的flag為false時,P0可以立即進入臨界區
            /** critical section 臨界區 **/
            //當臨界區執行完之后,turn設置為1,將臨界區訪問權限交換給P1
            //並將P0的flag設置為false,釋放臨界區,使得P1可進入臨界區
            turn = 1;
            flag[0] = false;
            /** do otherThings **/
        }
    }
    public void P1(){
        while (true){
            //設置P1的flag為true,同時檢查P0的flag
            flag[1] = true;
            while (flag[0]){
                //當臨界區不可用時,判斷當前臨界區權限是否是P0
                if (turn == 0){ //用於處理活鎖問題
                    //當臨界區權限是P0時,需要將P1設置為false,使得P0能進入臨界區,用於處理死鎖問題
                    flag[1] = false;
                    //循環校驗turn的權限(空自旋),直到P0執行完畢將權限交給P1
                    while (turn == 0){
                        /** do Nothing 空自旋 **/
                    }
                    flag[1] = true;//此時P0應已執行完畢,應當禁止P0進入臨界區
                }
            }
            //當P0的flag為false時,P1可以立即進入臨界區
            /** critical section 臨界區 **/
            //當臨界區執行完之后,turn設置為0,將臨界區訪問權限交換給P0
            //同時將P1的flag設置為false,釋放臨界區,使得P0可進入臨界區
            turn = 0;
            flag[1] = false;
            /** do otherThings **/
        }
    }
    public static void main(){
        /** 並發執行P0 P1 讀者有興趣可自己驗證一下**/
    }
}

      * Peterson算法

/**
 * Peterson算法比Dekker算法更加簡單出色而且很容易推廣到多個線程
 *     1.互斥保護驗證:P0角度
 *      - 當PO設置flag[0]=true,則P1不能進入臨界區
 *      - 當P1已進入臨界區,而flag[1]=true,P0不能進入臨界區
 *     2.避免相互阻塞驗證:P0角度
 *      - 當P0在while循環中被阻塞,此時flag[1]=true且turn=1
 *      - 當flag[1]=false或turn=0,此時P0可以進入臨界區
 *     3.復雜度:該算法用簡單的交替進入臨界區的方式降低了並發互斥的復雜度
 */
public class Peterson {
    boolean[] flag = {false,false};//表明每個互斥線程的位置
    int turn = 0;//解決同時發生的沖突
    public void P0(){
        while (true){
            flag[0] = true;
            //每次都要顯式設置turn=1並作為while空自旋條件,迫使其他線程也有進入臨界區的機會
            //這也是解決互斥的一個簡潔方案,大家依次來,不能重復獨占
            turn = 1;
            while (flag[1] && turn == 1){
                /** do Nothing 空自旋**/
            }
            /** critical section 臨界區 **/
            flag [0] = false;
            /** do otherThings **/
        }
    }
    public void P1(){
        while (true){
            flag[1] = true;
            turn = 0;
            while (flag[0] && turn == 0){
                /** do Nothing 空自旋**/
            }
            /** critical section 臨界區 **/
            flag [1] = false;
            /** do otherThings **/
        }
    }
    public static void main(){
        /** 並發執行P0 P1 讀者有興趣可自己驗證一下**/
    }
}

 

 ■ 信號量

  • 基本原理: N個線程可以通過簡單的信號進行合作,讓一個線程可以被迫在某個位置停止,直到它接收到一個特殊的信號。任何復雜的合作需求都可以通過適當的信號結構得到滿足
  • 組成部分
    •   為了發信號,需要使用一個稱作信號量的特殊變量sem,通常被初始化為非負數
    •   為通過信號量sem傳送信號,線程可執行原語semSignal(sem):此時信號量sem+1,當sem小於或等於0,則被semWait阻塞的線程被阻塞
    •   為了通過信號量sem接收信號,線程可執行原語semWait(sem):此時信號量sem-1,當sem變成負數,則執行semWait的線程被阻塞,否則線程繼續執行
  • 分類:無論是計數信號量還是二元信號量,都需要使用隊列保存在信號量上等待的進程/線程,這就需要決定進程按照什么順序從隊列中移除
    •   強信號量:使用FIFO先見先出公平策略(即被阻塞時間最長的進程/線程最先被隊列釋放)的信號量(常用)
    •   弱信號量:沒有規定進程/線程從隊列中移除順序的信號量
  • 補充: 二元信號量的區別只是sem的值只能是0和1而已

        * 信號量的實現 (CAS版)

/**
 * 設計原則:任何時候只能一個線程可以用wait和signal操作控制一個信號量
 * 要求:semWait和semSingal操作必須作為原子原語實現
 * semaphore信號量(以下都簡稱sem)的屬性 
 *      flag : 表示信號量是否可用,默認是0
 *      count: 
 *          當>=0時,表示可執行semWait而不被掛起的線程數
 *          當<0時,表示掛起在信號量的等待隊列的線程數
 *      queue: 表示信號量相關聯的等待隊列,被阻塞的線程需放入該隊列中
 *  PS:這里我們選用Boolean版本的CAS
 */
semWait(sem){
    //當發現sem.flag不為0時,就自旋等待直到為0
    //補充一點:忙等待可以保證隊列操作的同步,
    //但由於wait和signal執行時間短,其開銷還是很小的
    while(!compare_and_swap(sem.flag,0,1));
    sem.count--;
    if(sem.count < 0){
        /** 該線程進入sem.queue等待隊列中並被阻塞 **/
    }
    sem.flag = 0;
}
semSignal(sem){
    //當發現sem.flag不為0時,就自旋等待直到為0
    while(!compare_and_swap(sem.flag,0,1));
    sem.count++;
    if(sem.count <= 0){
        /** 從sem.queue等待隊列中移出,被移出的線程進入就緒隊列**/
    }
    sem.flag = 0;
}

       * 信號量實現互斥

final int n = /** 線程數 **/
int s = 1;//semaphore
public void P(int i){
    while(true){
        semWait(s);
        /** critical zone 臨界區 **/
        semSignal(s);
        /** do other 其他部分 **/
    }
}

 

■  消息傳遞 (大家比較熟悉的並發機制,當然有時間的話還會專門介紹一個MQ,比如 Rabbit MQ )

 1. 消息傳遞的概述

  • 消息定義:消息傳遞指的是線程間通過發送消息的方式實現相互通信
  • 消息實現:通常會提供一對原語實現 send(destination,message) 、receive(source,message)
  • 發送消息:一個線程以消息massage的形式給另一個指定的目標destination線程發送消息
  • 接收消息:線程通過執行recieve原語接收來自源線程source的消息massage

 2. 消息結構

  • 消息類型: 指定的消息類型,接收者往往會根據該類型進行消息監聽和捕獲
  • 目標ID/源ID: 發送方/源的標識符
  • 消息長度: 整個消息的總長度,注意要控制長度
  • 控制信息: 額外信息,比如創建消息鏈表的指針、記錄源和目標之間傳遞消息數目、順序和序號以及優先級
  • 消息內容: 消息正文,相當於Body
  • 補充: 讀者可以參間ISO中各種協議的包格式,比如HTTP和TCP的包

 

 3. 消息通信情況

  • send:要么發送線程被阻塞直到該消息被目標線程接收,要么不阻塞
  • receive :
    • 若消息在接收之前已經被發送,該消息被目標線程接受並繼續執行
    • 若沒有正在等待的消息,則該目標線程被阻塞直到所等待的消息到達,或者該線程繼續執行,放棄接收

 

 

■ Concurrent 包結構

■ Concurrent 包整體類圖

■ Concurrent包實現機制

  • 綜述: 在整個並發包設計上,Doug Lea大師采用了3.1 Concurrent包整體架構的三層結構
  • 補充: 並發包所涉及的內容筆者會陸續推出對應番進行闡述,敬請期待(進度視筆者的忙碌程度而定)

1. 底層-硬件指令支持

  • 綜述: 並發包最底層是依賴於硬件級別的Volatile和CAS的支持
  • Volatile:借用 Volatile 的內存讀寫語義和阻止重排序保證數據可見性
  • CAS: 借用CAS的高效機器級別原子指令保證內存執行的 讀-改-寫 操作的原子性
  • 組合: 借用 Volatile 變量的讀/寫和CAS實現線程之間的有效通信,保證了原子性、可見性、有序性

2. 中間層-基礎數據結構+算法支持

  • 綜述: 在數據結構和算法的設計使用上,Doug Lea大師專門設計了AQS框架作為所有並發類庫的並發基礎,同時引入非阻塞算法和原子變量類增強了並發特性
  • AQS框架: AQS中提供了最基本、有效的並發API, Doug Lea大師期望其作為所有並發操作的基礎解決方案,並發包中的絕大部分實現都是依賴於AQS(AbstractQueuedSynchronizer),同時 AQS的基礎是 CAS 和 Volatile的底層支持
  • 非阻塞數據結構: 非阻塞數據結構是非阻塞隊列的設計基礎,同時也是阻塞隊列的參考對比的重要依據
  • 原子變量類: Doug Lea大師專門為所有的原子變量設計了專門的類庫,甚至在后期還對齊做了增強,比如 LongAdder、LongAccumulator 等,從側面可以反映出數值操作對於編程的重要性

3. 高層-並發類庫支持

  • 綜述: Doug Lea大師在並發包中已經提供了豐富的並發類庫極大方便了快速、安全的使用並發操作
  • Lock: Lock接口定義了一系列並發操作標准,詳情參見 AQS框架之Lock
  • 同步器: 每個並發類的同步器的實現依賴於AQS(繼承),比如 ReentrantLock 中的Sync;同時筆者也將 並發類 同屬於同步器的范圍內
  • 阻塞隊列: 顧名思義,支持阻塞的隊列,主要是以Queue結尾的類
  • 執行器: 所謂執行器,指的是任務的執行者,比如線程池和Fork-Join
  • 並發容器: 即支持並發的容器,主要包含COW和以Concurrent開頭的類,通常並發容器是非阻塞的

 

PS: 感謝  黃志鵬kira 的友情贊助,繼續加油寫出更棒的文章,后續文章會不斷改進迭代~

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM