簡單看看LockSupport和AQS


  這次我們可以看看並發中鎖的原理,大概會說到AQS,ReentrantLock,ReentrantReadWriteLock以及JDK8中新增的StampedLock,這些都是在java並發中很重要的東西,慢慢看吧!

 

一.LockSupport工具類

  LockSupport工具類是jdk中rt.jar里面的,主要作用是掛起和喚醒線程,該類是創建鎖和創建其他同步類的基礎。還有我們要知道,LockSupport這個類是以Unsafe這個類為基礎,講過前面簡單的看了看Unsafe,是不是覺得還是比較熟悉的吧!

  我們先看看LockSupport的park(英文翻譯:停下,坐下)和unpark(英文翻譯:喚醒,啟動)方法,注意,這兩個方法和wait和notify功能很像,但是在這里我更喜歡叫做授權

  簡單的看一個例子:

package com.example.demo.study;
import java.util.concurrent.locks.LockSupport;

public class Study0130 {

    public static void main(String[] args) {
        System.out.println("main begin");                
        LockSupport.park();       
        System.out.println("main end");
    }
}

 

  我們可以看到我們直接調用park方法的話,當前的線程就阻塞了,不能到后面去了,這里我們可以說當前線程沒有被LockSupport類授權,沒有許可證,所以到這里碰到park()這個路口就只能掛了;那么怎么樣才能使得當前線程被授權呢?我們就需要unpark()方法進行授權

package com.example.demo.study;
import java.util.concurrent.locks.LockSupport;

public class Study0130 {

    public static void main(String[] args) {
        //這里就是給當前線程授權了,當前線程可以隨便跑,碰到park都不會掛
        LockSupport.unpark(Thread.currentThread());
        
        System.out.println("main begin");
        LockSupport.park();
        System.out.println("main end");

    }
}

 

  還記得以前的wait和notify的用法么?一個線程A中調用了wait方法,那么線程A就掛起了,如果在線程B中調用notify方法,那么A線程就會被喚醒;這里的park和unpark方法也可以實現這種,看以下代碼:

package com.example.demo.study;

import java.util.concurrent.locks.LockSupport;

public class Study0130 {

    public static void main(String[] args) {

        Thread thread1 = new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("thread1 start");
                //線程1會阻塞
                LockSupport.park();
                System.out.println("thread1 end");
            }
        });

        Thread thread2 = new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("thread2 start");
                //給線程1授權
                LockSupport.unpark(thread1);
                System.out.println("thread2 end");
            }
        });
        thread1.start();
        thread2.start();
    }
}

 

  我們打開LockSupport的park和unpark方法可以發現,就是調用的Unsafe實現的,可惜看不到源碼...

 

  假如我們調用park方法使得線程阻塞太久了也不是我們想看到的,我們還可以使用parkNanos設置阻塞時間,當時間到了,就會自動返回:

 

  最后說一下,還可以調用park方法的時候傳進去一個對象,比如LockSupport.park(this);這樣使用可以使用jstack pid命令查看堆棧信息的時候,可以看到是那個類被阻塞了!

   到此為止,應該就是LockSupport的常用方法了!

 

二.認識AQS

  AQS全稱是AbstractQueuedSynchronizer,叫做抽象同步隊列,用於實現各種同步組件,比如並發包中的鎖就是用這個實現的,把這個弄清楚了,那些鎖的機制就差不多懂了!

  那么所謂的AQS到底是什么呢?其實就是一個有順序的雙向鏈表(或者叫做FIFO雙向隊列,一樣的意思),在這個雙向鏈表中,每一個節點中都可以存放一個線程,節點的所有屬性如下圖所示,我們隨便說幾個;

  prev表示指向前一個節點,next指向后一個節點,thread表示當前節點存儲的一個線程,SHARED表示當前節點存儲的線程是由於獲取共享資源是被阻塞了才被丟到鏈表中的;EXCLUSIVE表示當前節點存儲的線程是由於獲取獨占資源阻塞才被丟到鏈表中來的;

  waitStatus表示當前節點存儲的線程的狀態,可能的狀態有以下幾種:(1)CANCELLED =  1;  表示線程被取消了  (2)SIGNAL    = -1; 表示線程需要喚醒 (3)CONDITION = -2;表示線程在鏈表中等待 (4)PROPAGATE = -3;表示線程釋放共享資源時需要通知其他節點;

  注意,這里其實還有一個狀態,就是waitStatus為0,表示當前節點是初始狀態,所以可以知道當waitStatus大於0的時候是無效狀態,小於零才是有效狀態

  

  這個Node類是AQS的一個內部類,那么怎么通過AQS來訪問這個鏈表呢?下面我們再來看看AQS有哪些屬性可以幫助我們訪問這個雙向鏈表;

//字段 //指向鏈表的頭節點
private transient volatile Node head;
//指向鏈表的尾節點
private transient volatile Node tail;
//狀態信息,這個字段在每個實現類中表達的意思都不一樣,比如在ReentrantLock中表示可重入的次數,
//在Semaphore中表示可用信號的個數等等用法
private volatile int state;

//獲取Unsafe對象,前面用過的,還記得說過為什么可以使用getUnsafe的方式獲取對象,而我們自己的類中卻不能用這種方式
private static final Unsafe unsafe = Unsafe.getUnsafe();

//下面的這幾個屬性就是獲取AQS類中的字段的偏移量,在前幾篇的博客已經說過了這偏移量有什么用
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;

//方法 //這幾個方法都是嘗試獲取鎖
public final void acquire(int arg) {}//獨占方式 protected boolean tryAcquire(int arg) {}
public final void acquireShared(int arg) {}//共享方式 public final void acquireInterruptibly(int arg){}//獨占方式
public final void acquireSharedInterruptibly(int arg){}//共享方式
//這幾個方法都是試圖釋放鎖 public final boolean release(int arg) {}//獨占方式 public final boolean releaseShared(int arg) {}//共享方式 protected boolean tryRelease(int arg) {} protected boolean tryReleaseShared(int arg) {}

 

  在AQS中對線程的同步主要的是操作state,對state的操作方式分為兩種,獨占方式和共享方式,至於兩種方式各自的獲取鎖和釋放鎖的方法在上面已經標識出來了!

  這里稍微提一下什么叫做鎖啊?在java多線程中可以把一個對象當做一個鎖,為什么呢?我們可以簡單看看一個普通的java對象(不是數組)在java堆中有哪些組成部分:

  (這里后來思考了一下,這種在對象頭中鎖的標志應該是synchronized使用的鎖,但是對於Lock這種鎖其實就是維護一個AtomicInteger變量來實現的,20200325修改)

 

   一個java對象是由對象頭(Header)、實例數據(Instance Data)和對齊填充(Padding)三部分組成,實例數據和對齊填充可以看做是一類,因為對齊填充就是起到填充空白的作用,因為java對象的字節數必須是8的倍數(對象頭肯定是8的倍數,這里其實就是填充實例數據成8的倍數即可),所以對齊填充可能有也可能沒有;

  對象頭一般有兩部分組成(數組的話還有一個部分,即數組長度),如下所示:

    第一部分:用於存儲對象自身的運行時數據,如哈希碼(HashCode)、GC分代年齡、鎖狀態標志、線程持有的鎖、偏向線程ID、偏向時間戳等,這部分數據的長度在32位和64位的虛擬機(未開啟壓縮指針)中分別為32bit和64bit,官方稱它為“MarkWord”。

    第二部分:對象頭的另外一部分是klass類型指針,即對象指向它的類元數據的指針,虛擬機通過這個指針來確定這個對象是哪個類的實例。

  我們可以看作一個對象就是一個鎖,如果一個線程獲取了某個鎖,那么在這個鎖對象的對象頭的markword中存了某個線程的編號,也就表示該線程持有了該鎖!

  上面說了這么多,我們大概就知道了所謂的AQS就是如下圖所示這樣,維護了一個鏈表,每次只有頭部的這個節點中的線程是運行的,當頭部的線程由於某些原因阻塞了或中斷了,下一個線程才會嘗試獲取資源,重復如此

  然后我們再來說說一個線程以獨占方式獲取資源或者是共享方式獲取資源;

 

三.獨占方式

  當一個線程要以獨占方式獲取該資源,說得直白一點就是實現一個獨占鎖,類似synchorized代碼塊一樣,對共享資源的操作都在這個代碼塊中,一個線程只有先獲取這個鎖才能進入到代碼塊中操作共享資源,其他線程嘗試獲取鎖的時候,和這個鎖中對象頭的線程編號比較如果不一樣,那就只能將這個線程放到鏈表中存起來,然后該線程掛起來,等條件滿足之后再喚醒,就是使用LockSupport的park和unpark方法實現的。

  就以ReentrantLock為例,一個線程獲取到了ReentrantLock的鎖之后,在AQS中就會首先使用CAS將state從0變為1,然后設置當前鎖為本線程所持有;如果當前線程繼續嘗試獲取鎖,那么只會將state從1變為2,其他的沒啥變化,這也叫做可重入次數;當如果其他線程去嘗試獲取鎖的時候,那么發現鎖對象的對象頭中不是自己線程編號,於是就丟進了阻塞隊列中掛起;

  1.當線程通過acquire(int arg)獲取獨占資源時: 

public final void acquire(int arg) {
     //1.tryAcquire方法沒有實現,這個方法主要是留給具體子類去實現,通過具體場景去用CAS修改state的值,修改成功返回true,否則false
     //2.如果修改state的值失敗,就會到第二個條件這里,這里會將當前線程封裝成一個Node.EXCLUSIVE類型的節點,然后存到鏈表尾端,最后在acquireQueued方法內部會調用
      LockSupport.park(this);方法阻塞線程
//3.調用selfInterrupt方法中斷當前的線程,為什么要這樣呢?因為一個線程在阻塞隊列中等待,這時通過某種方式把它中斷了,不會立即看到效果的,
   //只會在這個線程獲取資源后再調用selfInterrupt方法將中斷補上
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } //中斷當前線程 static void selfInterrupt() { Thread.currentThread().interrupt(); }

 

  2.當線程通過release(int arg)釋放獨占資源時:

public final boolean release(int arg) {
    //tryRelease方法沒有實現,子類根據具體場景是實現,其實就是修改state的值
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
        //這個方法在下面,里面會調用LockSupport.unpark(s.thread)方法激活阻塞隊列中的一個節點的線程,而這個激活的線程會通過tryAcquire嘗試當前的state是否滿足自己的需要
     //滿足條件的話就運行,不滿足的話還是會掛起 unparkSuccessor(h); return true; } return false; }

  通過簡單的看了這獲取資源和釋放資源我們可以看到底層還是使用的Unsafe的park和unpark方法,還有就是tryAcquire()方法和tryRelease()方法需要在具體的子類自己實現,在其中就是對AQS中state的修改,子類還需要定義state這個狀態值的增減是什么含義;

  例如ReentrantLock繼承自AQS的實現中,state為0表示鎖空閑,為1表示鎖被占用,在重寫tryAcquire()方法的時候,需要用CAS將state的值從0改為1,並且設置當前鎖的持有者就是當前線程;而重寫tryRelease()方法的時候,就需要用CAS將state的值從1改為0,然后設置當前鎖的持有者為null

 

四.共享方式

  知道了獨占方式之后,共享方式就簡單了,什么叫做共享?同一時間可以有多個線程獲取資源,這就叫做共享!!!

  一個線程嘗試去獲取資源成功后,此時另外一個線程也可以直接用CAS去嘗試獲取資源,成功的話就修改,失敗的話就丟進鏈表中存起來;例如Semaphore信號量,當一個線程通過acquire()方法獲取信號量的時候,信號量滿足條件就通過CAS去獲取,不滿足就將線程丟到鏈表里面;

  共享方式和前面的獨占方式其實很像,我們也來簡單的看一看:

  1.當線程通過acquireShared(int arg)獲取共享資源時:

 public final void acquireShared(int arg) {
    //tryAcquireShared方法也是沒有實現,留給具體子類會根據實際情況實現,會設置state的值,設置成功就直接返回
    //設置失敗的話就進入到doAcquireShared方法中,這個方法里會將當前線程封裝為Node.SHARED類型的節點,然后放到阻塞隊列的最后面
   //使用LockSupport.park(this)方法掛起自己
if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }

 

  2.當線程通過releaseShared(int arg)釋放共享資源時:

public final boolean releaseShared(int arg) {
    //tryReleaseShared方法由子類實現,修改state的值,嘗試釋放資源
    //釋放資源成功的話,然后使用LockSupport.unpark(thread)去喚醒阻塞隊列中的一個線程
    //激活的線程會使用tryReleaseShared查看當前state的值是否符合自己的需要,滿足則激活,向下運行,否則還是被放在AQS阻塞隊列中掛起
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

  例如讀寫鎖ReentrantReadWriteLock就是繼承自AQS的實現,由於state是int類型的,32位,高16位表示獲取讀鎖的次數,所以讀鎖的tryAcquireShared方法實現中,首先檢查寫鎖是否被其他線程持有,是則返回false,否則就用CAS將state的高16位+1;在讀鎖的tryReleaseShared的實現中,內部使用CAS將state的高16位減一,成功的話就返回true,失敗的話返回false

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM