解決多線程安全問題


一定要看后面的文章,先說結論: 

非公平鎖tryAcquire的流程是:檢查state字段,若為0,表示鎖未被占用,那么嘗試占用,若不為0,檢查當前鎖是否被自己占用,若被自己占用,則更新state字段,表示重入鎖的次數。如果以上兩點都沒有成功,則獲取鎖失敗,返回false。

 

還有其他的鎖,如果想要了解,參考:JAVA鎖機制-可重入鎖,可中斷鎖,公平鎖,讀寫鎖,自旋鎖,

用synchronized實現ReentrantLock 美團面試題參考:使用synchronized 實現ReentrantLock(美團面試題目)

前幾天去百度面試,面試官問多線程如何解決並發問題,感覺自己對lock的原理了解不夠,這里對兩種方式synchronized和lock做個系統的總結:

解決多線程的並發安全問題,java無非就是加鎖,具體就是兩個方法

(1) Synchronized(java自帶的關鍵字)

(2) lock 可重入鎖 (可重入鎖這個包java.util.concurrent.locks 底下有兩個接口,分別對應兩個類實現了這個兩個接口: 

       (a)lock接口, 實現的類為:ReentrantLock類 可重入鎖;

       (b)readwritelock接口,實現類為:ReentrantReadWriteLock 讀寫鎖)

也就是說有三種:

(1)synchronized 是互斥鎖;

(2)ReentrantLock 顧名思義 :可重入鎖

(3)ReentrantReadWriteLock :讀寫鎖

讀寫鎖特點:

a)多個讀者可以同時進行讀
b)寫者必須互斥(只允許一個寫者寫,也不能讀者寫者同時進行)
c)寫者優先於讀者(一旦有寫者,則后續讀者必須等待,喚醒時優先考慮寫者)

總結來說,Lock和synchronized有以下幾點不同:

1)Lock是一個接口,而synchronized是Java中的關鍵字,synchronized是內置的語言實現;
2)當synchronized塊結束時,會自動釋放鎖,lock一般需要在finally中自己釋放。synchronized在發生異常時,會自動釋放線程占有的鎖,因此不會導致死鎖現象發生;而Lock在發生異常時,如果沒有主動通過unLock()去釋放鎖,則很可能造成死鎖現象,因此使用Lock時需要在finally塊中釋放鎖;
3)lock等待鎖過程中可以用interrupt來終端等待,而synchronized只能等待鎖的釋放,不能響應中斷。
4)lock可以通過trylock來知道有沒有獲取鎖,而synchronized不能; 

5. 當synchronized塊執行時,只能使用非公平鎖,無法實現公平鎖,而lock可以通過new ReentrantLock(true)設置為公平鎖,從而在某些場景下提高效率。

6、LLock可以提高多個線程進行讀操作的效率。(可以通過readwritelock實現讀寫分離)
7、synchronized 鎖類型 可重入 不可中斷 非公平 而 lock 是: 可重入 可判斷 可公平(兩者皆可) 
在性能上來說,如果競爭資源不激烈,兩者的性能是差不多的,而當競爭資源非常激烈時(即有大量線程同時競爭),此時Lock的性能要遠遠優於synchronized。所以說,在具體使用時要根據適當情況選擇。 

首先看一下Synchronized的原理:

1、synchronized

把代碼塊聲明為 synchronized,有兩個重要后果,通常是指該代碼具有 原子性(atomicity)和 可見性(visibility)

實現原子性的算范為CAS(Compare and Swap) 參考:Java多線程系列——原子類的實現(CAS算法)

(1) 原子性

原子性意味着個時刻,只有一個線程能夠執行一段代碼,這段代碼通過一個monitor object保護。從而防止多個線程在更新共享狀態時相互沖突。

 (2)  可見性

可見性則更為微妙,它要對付內存緩存和編譯器優化的各種反常行為。啥是可見性呢?

答:它必須確保釋放鎖之前對共享數據做出的更改對於隨后獲得該鎖的另一個線程是可見的 。

作用:如果沒有同步機制提供的這種可見性保證,線程看到的共享變量可能是修改前的值或不一致的值,這將引發許多嚴重問題。 

一般來說,線程以某種不必讓其他線程立即可以看到的方式(不管這些線程在寄存器中、在處理器特定的緩存中,還是通過指令重排或者其他編譯器優化),不受緩存變量值的約束,但是如果開發人員使用了同步,那么運行庫將確保某一線程對變量所做的更新先於對現有synchronized 塊所進行的更新,當進入由同一監控器(lock)保護的另一個synchronized 塊時,將立刻可以看到這些對變量所做的更新。類似的規則也存在於volatile變量上。

——volatile只保證可見性,不保證原子性! 

(3)synchronize的限制:

  1. 當線程嘗試獲取鎖的時候,如果獲取不到鎖會一直阻塞, 它無法中斷一個正在等候獲得鎖的線程;
  2. 如果獲取鎖的線程進入休眠或者阻塞,除非當前線程異常,否則其他線程嘗試獲取鎖必須一直等待,也無法通過投票得到鎖,如果不想等下去,也就沒法得到鎖。

2、ReentrantLock (可重入鎖) 

何為可重入(美團面試提問過此處):參考:如何理解ReentrantLock的可重入和互斥?

可重入的意思是某一個線程是否可多次獲得一個鎖,在繼承的情況下,如果不是可重入的,那就形成死鎖了,比如遞歸調用自己的時候;,如果不能可重入,每次都獲取鎖不合適,比如synchronized就是可重入的,ReentrantLock也是可重入的

鎖的概念就不用多解釋了,當某個線程A已經持有了一個鎖,當線程B嘗試進入被這個鎖保護的代碼段的時候.就會被阻塞.而鎖的操作粒度是”線程”,而不是調用(至於為什么要這樣,下面解釋).同一個線程再次進入同步代碼的時候.可以使用自己已經獲取到的鎖,這就是可重入鎖java里面內置鎖(synchronize)和Lock(ReentrantLock)都是可重入的 

我自己寫了個例子:   

View Code

 

 2.1 . 為什么要可重入 

如果線程A繼續再次獲得這個鎖呢?比如一個方法是synchronized,遞歸調用自己,那么第一次已經獲得了鎖,第二次調用的時候還能進入嗎? 直觀上當然需要能進入.這就要求必須是可重入的.可重入鎖又叫做遞歸鎖,不然就死鎖了。 

 它實現方式是:

為每個鎖關聯一個獲取計數器和一個所有者線程,當計數值為0的時候,這個所就沒有被任何線程只有.當線程請求一個未被持有的鎖時,JVM將記下鎖的持有者,並且將獲取計數值置為1,如果同一個線程再次獲取這個鎖,技術值將遞增,退出一次同步代碼塊,計算值遞減,當計數值為0時,這個鎖就被釋放.ReentrantLock里面有實現

其實也有不可重入鎖:這個還真有.Linux下的pthread_mutex_t鎖是默認是非遞歸的。可以通過設置PTHREAD_MUTEX_RECURSIVE屬性,將pthread_mutex_t鎖設置為遞歸鎖。如果要自己實現不可重入鎖,同可重入鎖,這個計數器只能為1.或者0,再次進入的時候,發現已經是1了,就進行阻塞.jdk里面沒有默認的實現類.

Java.util.concurrent.lock 中的Lock 框架是鎖定的一個抽象,Lock彌補了synchronized的局限,提供了更加細粒度的加鎖功能。  

ReentrantLock 類是唯一實現了Lock的類 ,它擁有與synchronized 相同的並發性和內存語義,但是添加了類似鎖投票定時鎖等候可中斷鎖等候的一些特性。此外,它還提供了在激烈爭用情況下更佳的性能。(換句話說,當許多線程都想訪問共享資源時,JVM 可以花更少的時候來調度線程,把更多時間用在執行線程上。)  

用sychronized修飾的方法或者語句塊在代碼執行完之后鎖自動釋放,而是用Lock需要我們手動釋放鎖,所以為了保證鎖最終被釋放(發生異常情況),要把互斥區放在try內,釋放鎖放在finally內!!  

Lock 接口api如下  

復制代碼
public interface Lock {
void lock();
void lockInterruptibly() throws InterruptedException;
boolean tryLock();
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
void unlock();
Condition newCondition();
}
復制代碼

 

 lock()、tryLock()、tryLock(long time, TimeUnit unit)和lockInterruptibly()是用來獲取鎖的。

unLock()方法是用來釋放鎖的。
在Lock中聲明了四個方法來獲取鎖,那么這四個方法有何區別呢?

  首先lock()方法是平常使用得最多的一個方法,就是用來獲取鎖。如果鎖已被其他線程獲取,則進行等待。

  由於在前面講到如果采用Lock,必須主動去釋放鎖,並且在發生異常時,不會自動釋放鎖。因此一般來說,使用Lock必須在try{}catch{}塊中進行,並且將釋放鎖的操作放在finally塊中進行,以保證鎖一定被被釋放,防止死鎖的發生。通常使用Lock來進行同步的話,是以下面這種形式去使用的: 

復制代碼
Lock lock = ...;
lock.lock();
try{
//處理任務
}catch(Exception ex){

}finally{
lock.unlock(); //釋放鎖
}
復制代碼

     tryLock()方法是有返回值的,它表示用來嘗試獲取鎖,如果獲取成功,則返回true,如果獲取失敗(即鎖已被其他線程獲取),則返回false,也就說這個方法無論如何都會立即返回。在拿不到鎖時不會一直在那等待。

  tryLock(long time, TimeUnit unit)方法和tryLock()方法是類似的,只不過區別在於這個方法在拿不到鎖時會等待一定的時間,在時間期限之內如果還拿不到鎖,就返回false。如果如果一開始拿到鎖或者在等待期間內拿到了鎖,則返回true。

  所以,一般情況下通過tryLock來獲取鎖時是這樣使用的: 

復制代碼
Lock lock = ...;
if(lock.tryLock()) {
try{
//處理任務
}catch(Exception ex){

}finally{
lock.unlock(); //釋放鎖
} 
}else {
//如果不能獲取鎖,則直接做其他事情
}
復制代碼

  

   lockInterruptibly()方法比較特殊,當通過這個方法去獲取鎖時,如果線程正在等待獲取鎖,則這個線程能夠響應中斷,即中斷線程的等待狀態。也就使說,當兩個線程同時通過lock.lockInterruptibly()想獲取某個鎖時,假若此時線程A獲取到了鎖,而線程B只有在等待,那么對線程B調用threadB.interrupt()方法能夠中斷線程B的等待過程。

  由於lockInterruptibly()的聲明中拋出了異常,所以lock.lockInterruptibly()必須放在try塊中或者在調用lockInterruptibly()的方法外聲明拋出InterruptedException。

  因此lockInterruptibly()一般的使用形式如下: 

復制代碼
public void method() throws InterruptedException { lock.lockInterruptibly(); try { //..... } finally { lock.unlock(); } }
復制代碼

 注意,當一個線程獲取了鎖之后,是不會被interrupt()方法中斷的。單獨調用interrupt()方法不能中斷正在運行過程中的線程,只能中斷阻塞過程中的線程。

  因此當通過lockInterruptibly()方法獲取某個鎖時,如果不能獲取到,只有進行等待的情況下,是可以響應中斷的。

  而用synchronized修飾的話,當一個線程處於等待某個鎖的狀態,是無法被中斷的,只有一直等待下去。   

2 AQS

    AbstractQueuedSynchronizer簡稱AQS,是一個用於構建鎖和同步容器的框架。事實上concurrent包內許多類都是基於AQS構建,例如ReentrantLock,Semaphore,CountDownLatch,ReentrantReadWriteLock,FutureTask等。AQS解決了在實現同步容器時設計的大量細節問題。

    AQS使用一個FIFO的隊列表示排隊等待鎖的線程,它維護一個status的變量,每個節點維護一個waitstatus的變量,當線程獲取到鎖的時候,隊列的status置為1,此線程執行完了,那么它的waitstatus為-1;隊列頭部的線程執行完畢之后,它會調用它的后繼的線程(百度面試)。

隊列頭節點稱作“哨兵節點”或者“啞節點”,它不與任何線程關聯。其他的節點與等待線程關聯,每個節點維護一個等待狀態waitStatus。如圖

     AQS中還有一個表示狀態的字段state,例如ReentrantLocky用它表示線程重入鎖的次數,Semaphore用它表示剩余的許可數量,FutureTask用它表示任務的狀態。對state變量值的更新都采用CAS操作保證更新操作的原子性。

    AbstractQueuedSynchronizer繼承了AbstractOwnableSynchronizer,這個類只有一個變量:exclusiveOwnerThread,表示當前占用該鎖的線程,並且提供了相應的get,set方法。

    理解AQS可以幫助我們更好的理解JCU包中的同步容器。

3 lock()與unlock()實現原理

        ReentrantLock是Lock的默認實現之一。那么lock()和unlock()是怎么實現的呢?首先我們要弄清楚幾個概念

  • 可重入鎖。可重入鎖是指同一個線程可以多次獲取同一把鎖。ReentrantLock和synchronized都是可重入鎖。
  • 可中斷鎖。可中斷鎖是指線程嘗試獲取鎖的過程中,是否可以響應中斷。synchronized是不可中斷鎖,而ReentrantLock則提供了中斷功能。
  • 公平鎖與非公平鎖。公平鎖是指多個線程同時嘗試獲取同一把鎖時,獲取鎖的順序按照線程達到的順序,而非公平鎖則允許線程“插隊”。synchronized是非公平鎖,而ReentrantLock的默認實現是非公平鎖,但是也可以設置為公平鎖。
  • CAS操作(CompareAndSwap)。CAS操作簡單的說就是比較並交換。CAS 操作包含三個操作數 —— 內存位置(V)、預期原值(A)和新值(B)。如果內存位置的值與預期原值相匹配,那么處理器會自動將該位置值更新為新值。否則,處理器不做任何操作。無論哪種情況,它都會在 CAS 指令之前返回該位置的值。CAS 有效地說明了“我認為位置 V 應該包含值 A;如果包含該值,則將 B 放到這個位置;否則,不要更改該位置,只告訴我這個位置現在的值即可。” Java並發包(java.util.concurrent)中大量使用了CAS操作,涉及到並發的地方都調用了sun.misc.Unsafe類方法進行CAS操作。

    ReentrantLock提供了兩個構造器,分別是 

復制代碼
public ReentrantLock() {
    sync = new NonfairSync();
}
 
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}
復制代碼

 

    默認構造器初始化為NonfairSync對象,即非公平鎖,而帶參數的構造器可以指定使用公平鎖和非公平鎖。由lock()和unlock的源碼可以看到,它們只是分別調用了sync對象的lock()和release(1)方法。

    Sync是ReentrantLock的內部類,它的結構如下

 可以看到Sync擴展了AbstractQueuedSynchronizer。

3.3 NonfairSync

    我們從源代碼出發,分析非公平鎖獲取鎖和釋放鎖的過程。 

3.3.1 lock() 

    lock()源碼如下 

復制代碼
final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}
復制代碼

      首先用一個CAS操作,判斷state是否是0(表示當前鎖未被占用),如果是0則把它置為1,並且設置當前線程為該鎖的獨占線程,表示獲取鎖成功。當多個線程同時嘗試占用同一個鎖時,CAS操作只能保證一個線程操作成功,剩下的只能乖乖的去排隊啦。

    “非公平”即體現在這里,如果占用鎖的線程剛釋放鎖,state置為0,而排隊等待鎖的線程還未喚醒時,新來的線程就直接搶占了該鎖,那么就“插隊”了(請注意此處的非公平鎖是指新來的線程跟隊列頭部的線程競爭鎖,隊列其他的線程還是正常排隊,百度面試題)。

    若當前有三個線程去競爭鎖,假設線程A的CAS操作成功了,拿到了鎖開開心心的返回了,那么線程B和C則設置state失敗,走到了else里面。我們往下看acquire。

acquire(arg)

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

代碼非常簡潔,但是背后的邏輯卻非常復雜,可見Doug Lea大神的編程功力。

 1. 第一步。嘗試去獲取鎖。如果嘗試獲取鎖成功,方法直接返回。

tryAcquire(arg) 

復制代碼
final boolean nonfairTryAcquire(int acquires) {
    //獲取當前線程
    final Thread current = Thread.currentThread();
    //獲取state變量值
    int c = getState();
    if (c == 0) { //沒有線程占用鎖
        if (compareAndSetState(0, acquires)) {
            //占用鎖成功,設置獨占線程為當前線程
            setExclusiveOwnerThread(current);
            return true;
        }
    } else if (current == getExclusiveOwnerThread()) { //當前線程已經占用該鎖
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        // 更新state值為新的重入次數
        setState(nextc);
        return true;
    }
    //獲取鎖失敗
    return false;
}
復制代碼

 

    非公平鎖tryAcquire的流程是:檢查state字段,若為0,表示鎖未被占用,那么嘗試占用,若不為0,檢查當前鎖是否被自己占用,若被自己占用,則更新state字段,表示重入鎖的次數。如果以上兩點都沒有成功,則獲取鎖失敗,返回false。

2. 第二步,入隊。由於上文中提到線程A已經占用了鎖,所以B和C執行tryAcquire失敗,並且入等待隊列。如果線程A拿着鎖死死不放,那么B和C就會被掛起。

先看下入隊的過程。

先看addWaiter(Node.EXCLUSIVE) 

復制代碼
/**
 * 將新節點和當前線程關聯並且入隊列
 * @param mode 獨占/共享
 * @return 新節點
 */
private Node addWaiter(Node mode) {
    //初始化節點,設置關聯線程和模式(獨占 or 共享)
    Node node = new Node(Thread.currentThread(), mode);
    // 獲取尾節點引用
    Node pred = tail;
    // 尾節點不為空,說明隊列已經初始化過
    if (pred != null) {
        node.prev = pred;
        // 設置新節點為尾節點
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 尾節點為空,說明隊列還未初始化,需要初始化head節點並入隊新節點
    enq(node);
    return node;
}
復制代碼

 B、C線程同時嘗試入隊列,由於隊列尚未初始化,tail==null,故至少會有一個線程會走到enq(node)。我們假設同時走到了enq(node)里。 

復制代碼
/**
 * 初始化隊列並且入隊新節點
 */
private Node enq(final Node node) {
    //開始自旋
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            // 如果tail為空,則新建一個head節點,並且tail指向head
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            // tail不為空,將新節點入隊
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
復制代碼

 

這里體現了經典的自旋+CAS組合來實現非阻塞的原子操作。由於compareAndSetHead的實現使用了unsafe類提供的CAS操作,所以只有一個線程會創建head節點成功。假設線程B成功,之后B、C開始第二輪循環,此時tail已經不為空,兩個線程都走到else里面。假設B線程compareAndSetTail成功,那么B就可以返回了,C由於入隊失敗還需要第三輪循環。最終所有線程都可以成功入隊。

     當B、C入等待隊列后,此時AQS隊列如下:

3. 第三步,掛起。B和C相繼執行acquireQueued(final Node node, int arg)。這個方法讓已經入隊的線程嘗試獲取鎖,若失敗則會被掛起。 

復制代碼
/**
 * 已經入隊的線程嘗試獲取鎖
 */
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true; //標記是否成功獲取鎖
    try {
        boolean interrupted = false; //標記線程是否被中斷過
        for (;;) {
            final Node p = node.predecessor(); //獲取前驅節點
            //如果前驅是head,即該結點已成老二,那么便有資格去嘗試獲取鎖
            if (p == head && tryAcquire(arg)) {
                setHead(node); // 獲取成功,將當前節點設置為head節點
                p.next = null; // 原head節點出隊,在某個時間點被GC回收
                failed = false; //獲取成功
                return interrupted; //返回是否被中斷過
            }
            // 判斷獲取失敗后是否可以掛起,若可以則掛起
            if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                // 線程若被中斷,設置interrupted為true
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
復制代碼

 

code里的注釋已經很清晰的說明了acquireQueued的執行流程。假設B和C在競爭鎖的過程中A一直持有鎖,那么它們的tryAcquire操作都會失敗,因此會走到第2個if語句中。我們再看下shouldParkAfterFailedAcquire和parkAndCheckInterrupt都做了哪些事吧。 

復制代碼
/**
 * 判斷當前線程獲取鎖失敗之后是否需要掛起.
 */
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    //前驅節點的狀態
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        // 前驅節點狀態為signal,返回true
        return true;
    // 前驅節點狀態為CANCELLED
    if (ws > 0) {
        // 從隊尾向前尋找第一個狀態不為CANCELLED的節點
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // 將前驅節點的狀態設置為SIGNAL
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
  
/**
 * 掛起當前線程,返回線程中斷狀態並重置
 */
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}
復制代碼

 

    線程入隊后能夠掛起的前提是,它的前驅節點的狀態為SIGNAL,它的含義是“Hi,前面的兄弟,如果你獲取鎖並且出隊后,記得把我喚醒!”。所以shouldParkAfterFailedAcquire會先判斷當前節點的前驅是否狀態符合要求,若符合則返回true,然后調用parkAndCheckInterrupt,將自己掛起。如果不符合,再看前驅節點是否>0(CANCELLED),若是那么向前遍歷直到找到第一個符合要求的前驅,若不是則將前驅節點的狀態設置為SIGNAL。

也就是說當隊列頭部的線程執行完了之后,這個線程會調用后面的隊列的第一個線程(百度面試)。

     整個流程中,如果前驅結點的狀態不是SIGNAL,那么自己就不能安心掛起,需要去找個安心的掛起點,同時可以再嘗試下看有沒有機會去嘗試競爭鎖。

    最終隊列可能會如下圖所示

  線程B和C都已經入隊,並且都被掛起。當線程A釋放鎖的時候,就會去喚醒線程B去獲取鎖啦。

3.3.2 unlock()

unlock相對於lock就簡單很多。源碼如下 

復制代碼
public void unlock() {
    sync.release(1);
}
  
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
復制代碼

 

如果理解了加鎖的過程,那么解鎖看起來就容易多了。流程大致為先嘗試釋放鎖,若釋放成功,那么查看頭結點的狀態是否為SIGNAL,如果是則喚醒頭結點的下個節點關聯的線程,如果釋放失敗那么返回false表示解鎖失敗。這里我們也發現了,每次都只喚起頭結點的下一個節點關聯的線程

   最后我們再看下tryRelease的執行過程 

復制代碼
/**
 * 釋放當前線程占用的鎖
 * @param releases
 * @return 是否釋放成功
 */
protected final boolean tryRelease(int releases) {
    // 計算釋放后state值
    int c = getState() - releases;
    // 如果不是當前線程占用鎖,那么拋出異常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        // 鎖被重入次數為0,表示釋放成功
        free = true;
        // 清空獨占線程
        setExclusiveOwnerThread(null);
    }
    // 更新state值
    setState(c);
    return free;
}
復制代碼

 

這里入參為1。tryRelease的過程為:當前釋放鎖的線程若不持有鎖,則拋出異常。若持有鎖,計算釋放后的state值是否為0,若為0表示鎖已經被成功釋放,並且則清空獨占線程,最后更新state值,返回free。 

3.3.3 小結

    用一張流程圖總結一下非公平鎖的獲取鎖的過程。    

 
 

3.4 FairSync

    公平鎖和非公平鎖不同之處在於,公平鎖在獲取鎖的時候,不會先去檢查state狀態,而是直接執行aqcuire(1),這里不再贅述。    

4 超時機制

    在ReetrantLock的tryLock(long timeout, TimeUnit unit) 提供了超時獲取鎖的功能。它的語義是在指定的時間內如果獲取到鎖就返回true,獲取不到則返回false。這種機制避免了線程無限期的等待鎖釋放。那么超時的功能是怎么實現的呢?我們還是用非公平鎖為例來一探究竟。

public boolean tryLock(long timeout, TimeUnit unit)
        throws InterruptedException {
    return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}

 還是調用了內部類里面的方法。我們繼續向前探究  

復制代碼
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquire(arg) ||
        doAcquireNanos(arg, nanosTimeout);
}
復制代碼

 

這里的語義是:如果線程被中斷了,那么直接拋出InterruptedException。如果未中斷,先嘗試獲取鎖,獲取成功就直接返回,獲取失敗則進入doAcquireNanos。tryAcquire我們已經看過,這里重點看一下doAcquireNanos做了什么。 

復制代碼
/**
 * 在有限的時間內去競爭鎖
 * @return 是否獲取成功
 */
private boolean doAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    // 起始時間
    long lastTime = System.nanoTime();
    // 線程入隊
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        // 又是自旋!
        for (;;) {
            // 獲取前驅節點
            final Node p = node.predecessor();
            // 如果前驅是頭節點並且占用鎖成功,則將當前節點變成頭結點
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            // 如果已經超時,返回false
            if (nanosTimeout <= 0)
                return false;
            // 超時時間未到,且需要掛起
            if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                // 阻塞當前線程直到超時時間到期
                LockSupport.parkNanos(this, nanosTimeout);
            long now = System.nanoTime();
            // 更新nanosTimeout
            nanosTimeout -= now - lastTime;
            lastTime = now;
            if (Thread.interrupted())
                //相應中斷
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
復制代碼

 

doAcquireNanos的流程簡述為:線程先入等待隊列,然后開始自旋,嘗試獲取鎖,獲取成功就返回,失敗則在隊列里找一個安全點把自己掛起直到超時時間過期。這里為什么還需要循環呢?因為當前線程節點的前驅狀態可能不是SIGNAL,那么在當前這一輪循環中線程不會被掛起,然后更新超時時間,開始新一輪的嘗試 

3、讀寫鎖ReentrantReadWriteLock

接口 ReadWriteLock,有個實現類是ReentrantReadWriteLock

讀讀互不干擾,寫寫互斥,如果有讀也有寫,那么寫線程要優先讀線程

對!讀取線程不應該互斥!

我們可以用讀寫鎖ReadWriteLock實現:

復制代碼
import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; class Data { private int data;// 共享數據 private ReadWriteLock rwl = new ReentrantReadWriteLock(); public void set(int data) { rwl.writeLock().lock();// 取到寫鎖 try { System.out.println(Thread.currentThread().getName() + "准備寫入數據"); try { Thread.sleep(20); } catch (InterruptedException e) { e.printStackTrace(); } this.data = data; System.out.println(Thread.currentThread().getName() + "寫入" + this.data); } finally { rwl.writeLock().unlock();// 釋放寫鎖  } } public void get() { rwl.readLock().lock();// 取到讀鎖 try { System.out.println(Thread.currentThread().getName() + "准備讀取數據"); try { Thread.sleep(20); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "讀取" + this.data); } finally { rwl.readLock().unlock();// 釋放讀鎖  } } } 
復制代碼

 

與互斥鎖定相比,讀-寫鎖定允許對共享數據進行更高級別的並發訪問。雖然一次只有一個線程(writer 線程)可以修改共享數據,但在許多情況下,任何數量的線程可以同時讀取共享數據(reader 線程) 

從理論上講,與互斥鎖定相比,使用讀-寫鎖定所允許的並發性增強將帶來更大的性能提高。

在實踐中,只有在多處理器上並且只在訪問模式適用於共享數據時,才能完全實現並發性增強。——例如,某個最初用數據填充並且之后不經常對其進行修改的 collection,因為經常對其進行搜索(比如搜索某種目錄),所以這樣的 collection 是使用讀-寫鎖定的理想候選者。 

4、線程間通信Condition

Condition可以替代傳統的線程間通信,用await()替換wait(),用signal()替換notify(),用signalAll()替換notifyAll()。

——為什么方法名不直接叫wait()/notify()/nofityAll()?因為Object的這幾個方法是final的,不可重寫!

 

傳統線程的通信方式,Condition都可以實現。

注意,Condition是被綁定到Lock上的,要創建一個Lock的Condition必須用newCondition()方法。 

Condition的強大之處在於它可以為多個線程間建立不同的Condition

看JDK文檔中的一個例子:假定有一個綁定的緩沖區,它支持 put 和 take 方法。如果試圖在空的緩沖區上執行take 操作,則在某一個項變得可用之前,線程將一直阻塞;如果試圖在滿的緩沖區上執行 put 操作,則在有空間變得可用之前,線程將一直阻塞。我們喜歡在單獨的等待 set 中保存put 線程和take 線程,這樣就可以在緩沖區中的項或空間變得可用時利用最佳規划,一次只通知一個線程。可以使用兩個Condition 實例來做到這一點。

——其實就是java.util.concurrent.ArrayBlockingQueue的功能

優點:
假設緩存隊列中已經存滿,那么阻塞的肯定是寫線程,喚醒的肯定是讀線程,相反,阻塞的肯定是讀線程,喚醒的肯定是寫線程。 

 

如果想查看 線程5個狀態 請參考:Java線程的5種狀態及切換(透徹講解)-京東面試

以下是補充的知識點:

1、線程與進程:

在開始之前先把進程與線程進行區分一下,一個程序最少需要一個進程,而一個進程最少需要一個線程。

線程是程序執行流的最小單位,而進程是系統進行資源分配和調度的一個獨立單位。  

 

2.java.util.concurrent.locks包常用類  

2.2 ReentrantLock
  ReentrantLock,意思是“可重入鎖”,ReentrantLock是唯一實現了Lock接口的類,並且ReentrantLock提供了更多的方法。
詳見:java.util.concurrent.locks.ReentrantLock ,不再列舉了。

2.3 ReadWriteLock
接口,只定義了兩個方法:

Lock readLock();
Lock writeLock();
一個用來獲取讀鎖,一個用來獲取寫鎖。也就是說將文件的讀寫操作分開,分成2個鎖來分配給線程,從而使得多個線程可以同時進行讀操作。

2.4 ReentrantReadWriteLock
實現了ReadWriteLock接口。

下面嘗試寫個例子,表示ReadWriteLock和使用synchronized的區別。 

 
1.如果有一個線程已經占用了讀鎖,則此時其他線程如果要申請寫鎖,則申請寫鎖的線程會一直等待釋放讀鎖。

2.如果有一個線程已經占用了寫鎖,則此時其他線程如果申請寫鎖或者讀鎖,則申請的線程會一直等待釋放寫鎖。 

 


參考:  lock與synchronized區別詳解

參考:  Synchronized與Lock的區別與應用場景

參考:lock和synchronized的同步區別與選擇 

參考:ReentrantLock實現原理


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM