深入理解Java並發框架AQS系列(二):AQS框架簡介及鎖概念


深入理解Java並發框架AQS系列(一):線程
深入理解Java並發框架AQS系列(二):AQS框架簡介及鎖概念
深入理解Java並發框架AQS系列(三):獨占鎖(Exclusive Lock)
深入理解Java並發框架AQS系列(四):共享鎖(Shared Lock)
深入理解Java並發框架AQS系列(五):條件隊列(Condition)

一、AQS框架簡介

AQS誕生於Jdk1.5,在當時低效且功能單一的synchronized的年代,某種意義上講,她拯救了Java

注:本系列文章所有測試用例均基於jdk1.8,操作系統為macOS

1.1、思考

我們去學習一個知識點或開啟一個新課題時,最好是帶着問題去學習,這樣針對性比較強,且印象比較深刻,主動思考帶給我們帶來了無窮的好處

拋開AQS,設想以下問題:

  • Q:如果我們遇到 thread 無法獲取所需資源時,該如何操作?
  • A:不斷重試唄,一旦資源釋放可快速嘗試獲取
     
  • Q:那如果資源持有時長較長,不斷循環獲取,是否比較浪費CPU ?
  • A:的確,那就讓線程休息1秒鍾,再嘗試獲取,這樣就不會導致CPU空轉了
     
  • Q:那如果資源在第0.1秒時被釋放,那線程豈不是要白白等待0.9秒了 ?
  • A:實在不行就讓當前線程掛起,等釋放資源的線程去通知當前線程,這樣就不存在等待時間長短的問題了
     
  • Q:但如果資源持有時間很短,每次都掛起、喚醒線程成為了一個很大的開銷
  • A:那就依情況而定,lock時間短的,就不斷循環重試,時間長的就掛起
     
  • Q:如何界定lock的時間長短?還有就是如果lock的時間不固定,也無法預期呢?
  • A:唔。。。這是個問題
     
  • Q:如果線程等待期間,我想放棄呢?
  • A:。。。。。。
     
  • Q:還有很多問題
    • 如果我想動態增加資源呢?
    • 如何我不想產生飢餓,而保證加鎖的有序性呢?
    • 或者我要支持/不支持可重入特性呢?
    • 我要查看所有等待資源的線程狀態呢?
    • 。。。。。。

我們發現,一個簡單的等待資源的問題,牽扯出后續諸多龐雜且無頭緒的問題;加鎖不僅依賴一套完善的框架體系,還要具體根據使用場景而定,才能接近最優解;那我們即將要引出的AQS能完美解決上述這些問題嗎?

答案是肯定的:不能

其實Doug Lea也意識到問題的復雜性,不可能出一個超級工具來解決所有問題,所以他把AQS設計為一個abstract類,並提供一系列子類去解決不同場景的問題,例如ReentrantLockSemaphore等;當我們發現這些子類也不能滿足我們加鎖需求時,我們可以定義自己的子類,通過重寫兩三個方法,寥寥幾行代碼,實現強大的功能,這一切都得益於AQS作者敏銳的前瞻性

指的一提的是,雖然我們可以用某個子類去實現另一個子類所提供的功能(例如使用Semaphore替代CountDownLatch),但其易用、簡潔、高效性等能否達到理想效果,都值得商榷;就好比在陸地上穿着雪橇走路,雖能前進,卻低效易摔跤

1.2、並發框架

本小節僅帶大家對AQS架構有個初步了解,在后文的獨占鎖、共享鎖等中會詳細闡述。下圖為AQS框架的主體結構

從上圖中我們看到了AQS中非常關鍵的一個概念:“阻塞隊列”。即AQS的理念是當線程無法獲取資源時,提供一個FIFO類型的有序隊列,用來維護所有處於“等待中”的線程。看似無解可擊的框架設計,同時也牽出另外的一個問題:阻塞隊列一定高效嗎?

當“同步塊邏輯”執行很快時,我們列出兩種場景

  • 場景1:直接使用AQS框架,例如試用其子類ReentrantLock,遇到資源爭搶,放阻塞隊列
  • 場景2:因為鎖占用時間短,無限重試

針對這2種場景,我們寫測試用例比較一下

package org.xijiu.share.aqs.compare;

import org.junit.Test;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @author likangning
 * @since 2021/3/9 上午8:58
 */
public class CompareTest {

  private class MyReentrantLock extends AbstractQueuedSynchronizer {
    protected final boolean tryAcquire(int acquires) {
      final Thread current = Thread.currentThread();
      while (true) {
        int c = getState();
        if (c == 0) {
          if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
          }
        }
      }
    }

    protected final boolean tryRelease(int releases) {
      int c = getState() - releases;
      if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
      boolean free = false;
      if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
      }
      setState(c);
      return free;
    }
  }

  /**
   * 使用AQS框架
   */
  @Test
  public void test1() throws InterruptedException {
    ReentrantLock reentrantLock = new ReentrantLock();
    long begin = System.currentTimeMillis();
    ExecutorService executorService = Executors.newCachedThreadPool();
    for (int i = 0; i < 2; i++) {
      executorService.submit(() -> {
        for (int j = 0; j < 50000000; j++) {
          reentrantLock.lock();
          doBusiness();
          reentrantLock.unlock();
        }
      });
    }
    executorService.shutdown();
    executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
    System.out.println("ReentrantLock cost : " + (System.currentTimeMillis() - begin));
  }

  /**
   * 無限重試
   */
  @Test
  public void test2() throws InterruptedException {
    MyReentrantLock myReentrantLock = new MyReentrantLock();
    long begin = System.currentTimeMillis();
    ExecutorService executorService = Executors.newCachedThreadPool();
    for (int i = 0; i < 2; i++) {
      executorService.submit(() -> {
        for (int j = 0; j < 50000000; j++) {
          myReentrantLock.tryAcquire(1);
          doBusiness();
          myReentrantLock.tryRelease(1);
        }
      });
    }
    executorService.shutdown();
    executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
    System.out.println("MyReentrantLock cost : " + (System.currentTimeMillis() - begin));
  }

  private void doBusiness() {
    // 空實現,模擬程序快速運行
  }
}

上例,雖然MyReentrantLock繼承了AbstractQueuedSynchronizer,但沒有使用其阻塞隊列。我們每種情況跑5次,看下兩者在耗時層面的表現

耗時1 耗時2 耗時3 耗時4 耗時5 平均耗時(ms)
ReentrantLock 11425 12301 12289 10262 11461 11548
MyReentrantLock 8717 8957 10283 8445 8928 9066

上例只是拿獨占鎖舉例,共享鎖也同理。可以簡單概括為:線程掛起、喚醒的時間占整個加鎖周期比重較大,導致每次掛起、喚醒已經成為一種負擔。當然此處並不是說AQS設計有什么缺陷,只是想表達並沒有一種萬能的框架能應對所有情況,一切都要靠使用者靈活理解、應用

1.3、類結構及如何使用

AQS類內部結構

因后文還會反復涉及,此處僅羅列2點

  • private volatile int state 重要屬性,一般不論是實現獨占鎖還是共享鎖,都要進行CAS操作的字段。獨占鎖時,如果通過cas將其從0改變為1的話,那么標記加鎖成功;而在共享鎖時,則表示支持並發數的最大值
  • isHeldExclusively() 標記是否持有線程;AQS雖然為抽象類,但其繼承了類AbstractOwnableSynchronizer,用來標記加鎖線程,但AQS本身不依賴這個屬性,也不會設置這個屬性,實現類如果需要可以直接重新此方法。一般實現可重入特性需要重寫該方法

而我們常用的鎖並發類,基本上都是AQS的子類或通過組合方式實現,可見AQS在Java並發體系的重要性

至於如何使用,是需要區分子類是想實現獨占鎖還是共享鎖

  • 獨占鎖

    • tryAcquire()
    • tryRelease()
    • isHeldExclusively() -- 可不實現
  • 共享鎖

    • tryAcquireShared()
    • tryReleaseShared()

AQS本身是一個abstract類,將主要並發邏輯進行了封裝,我們定義自己的並發控制類,僅需要實現其中的兩三個方法即可。而在對外(public方法)表現形式上,可依據自己的業務特性來定義;例如Semaphore定義為acquirerelease,而ReentrantLock定義為lockunlock

二、鎖

相信大家經常會被各種各樣鎖的定義搞亂,叫法兒也五花八門,為了后續行文的方便,此章我們把一些鎖概念闡述一下

2.1、獨占鎖

獨占鎖,顧名思義,即在同一時刻,僅允許一個線程執行同步塊代碼。好比一伙兒人想要過河,但只有一根獨木橋,且只能承受一人的重量

JDK支持的典型獨占鎖:ReentrantLockReentrantReadWriteLock

2.2、共享鎖

共享鎖其實是相對獨占鎖而言的,涉及到共享鎖就要聊到並發度,即同一時刻最多允許同時執行線程的數量。上圖所述的並發度為3,即在同一時刻,最多可有3個人在同時過河。

但共享鎖的並發度也可以設置為1,此時它可以看作是獨占鎖

JDK支持的典型獨占鎖:SemaphoreCountDownLatch

2.3、公平鎖

雖然叫做公平鎖,但我們知道任何事情都是相對的,此處也不例外,我們也只能做到相對公平,后文會涉及,此處不再贅述

線程在進入時,首先要檢查阻塞隊列中是否為空,如果發現已有線程在排隊,那么主動添加至隊尾並等待被逐一喚起;如果發現阻塞隊列為空,才會嘗試去獲取資源。公平鎖相對非公平鎖效率較低,通常來講,加鎖時間越短,表現越明顯

2.4、非公平鎖

任何一個剛進入的線程,都會嘗試去獲取資源,釋放資源后,還會通知頭節點去嘗試獲取資源,這樣可能導致飢餓發生,即某一個阻塞隊列中的線程一直得不到調度。

那為什么我們會說,非公平鎖的效率要高於公平鎖呢?假設一個獨占鎖,阻塞隊列中已經有10個線程在排隊,線程A搶到資源並執行完畢后,去喚醒頭結點head,head線程喚醒需要時間,head喚醒后才嘗試去獲取資源,而在整個過程中,沒有線程在執行加鎖代碼

因為線程喚起需要引發用戶態及內核態的切換,故是一個相對比較耗時的操作。

我們再舉一個不恰當的例子:行政部在操場上為同學們辦理業務,因為天氣炎熱,故讓排隊的同學在場邊一個涼亭等待,涼亭距離業務點約300米,且無法直接看到業務點,需要等待上一個辦理完畢的同學來通知。假定平均辦理一個業務耗時約30秒
  • 公平鎖:所有新來辦理業務的同學都被告知去排隊,上一個辦理完業務的同學需要去300米外通知下一個同學,來回600米的路程(線程喚醒)預估耗時2分鍾,在這2分鍾里,因為沒有同學過來辦理業務,業務點處於等待狀態
  • 非公平鎖:新來辦理業務的同學首先看一下業務點是否有人正在在辦理,如果有人正在辦理,那么主動進入排隊,如果辦理點空閑,那么直接開始辦理業務。明顯非公平鎖更高效,隊首的同學接到通知,過來辦理的時間片內,業務點可能已經處理了2個同學的業務

AQS框架是支持公平、非公平兩種模式的,使用者可以根據自身的情況做選擇,而Java中的內置鎖synchronized是非公平鎖

2.5、可重入鎖

即某個線程獲取到鎖后、在釋放鎖之前,再次嘗試獲取鎖,能成功獲取到,不會出現死鎖,便是可重入鎖;需要注意的是,加鎖次數需要跟釋放次數一樣

synchronizedReentrantLock均為可重入鎖

2.6、偏向鎖 / 輕量級鎖 / 重量級鎖

之所以將這三個鎖放在一起論述,是因為它們都是synchronized引入的概念,為了描述流暢,我們把它們放在一起

  • 偏向鎖:JVM設計者發現,在大多數場景中,在同一時刻爭搶synchronized鎖只有一個線程,而且總是被這一個線程反復加鎖、解鎖;故引入偏向鎖,且向對象頭的MarkWord部分中, 標記上線程id,值得一提的是,在線程加鎖結束后,並沒有解鎖的動作,這樣帶來的好處首先是少了一次CAS操作,其次當這個線程再次嘗試加鎖時,僅僅比較MarkWord部分中的線程id與當前線程的id是否一致,如果一致則加鎖成功。偏向鎖因此而得名,它偏向於占有它的線程,對其非常友好。當上一個線程釋放鎖后,如果有另一個線程嘗試加鎖,偏向鎖會重新偏向新的線程。而當一個線程正占有鎖,又有一個新的線程試圖加鎖時,便進入了輕量級鎖
  • 輕量級鎖:所謂輕量級鎖,是針對重量級鎖而言的,這個階段也有人叫自旋鎖。其本質就是不會馬上掛起線程,而是反復重試10(可使用參數-XX:PreBlockSpin來修改)次。因為線程掛起、喚醒也是相當耗時的,在鎖並發不高、加鎖時間短時,采用自旋可以得到更好的效果,具體可以參考1.2章的測試用例
  • 重量級鎖:線程掛起並進入阻塞隊列,等待被喚醒

這3層鎖是逐級膨脹的,且過程不可回逆,即某個鎖一旦進入重量級鎖,便不可回退至輕量級鎖或偏向鎖。雖然synchronized不是本文的重點,但既然提起來了,我們可以把其特性簡單羅列一下

  • synchronized 獨占鎖、非公平鎖、可重入;內部做了很多優化

synchronized鎖的性能究竟如何呢?我們跟AQS框架中的ReentrantLock做個簡單對比

public class SynchronizedAndReentrant {

  private static int THREAD_NUM = 5;

  private static int EXECUTE_COUNT = 30000000;

  /**
   * 模擬ReentrantLock處理業務
   */
  @Test
  public void test() throws InterruptedException {
    ReentrantLock reentrantLock = new ReentrantLock();
    long begin = System.currentTimeMillis();
    ExecutorService executorService = Executors.newCachedThreadPool();
    for (int i = 0; i < THREAD_NUM; i++) {
      executorService.submit(() -> {
        for (int j = 0; j < EXECUTE_COUNT; j++) {
          reentrantLock.lock();
          doBusiness();
          reentrantLock.unlock();
        }
      });
    }
    executorService.shutdown();
    executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
    System.out.println("ReentrantLock cost : " + (System.currentTimeMillis() - begin));
  }

  private void doBusiness() {
  }

  /**
   * 模擬synchronized處理業務
   */
  @Test
  public void test2() throws InterruptedException {
    long begin = System.currentTimeMillis();
    ExecutorService executorService = Executors.newCachedThreadPool();
    for (int i = 0; i < THREAD_NUM; i++) {
      executorService.submit(() -> {
        for (int j = 0; j < EXECUTE_COUNT; j++) {
          synchronized (SynchronizedAndReentrant.class) {
            doBusiness();
          }
        }
      });
    }
    executorService.shutdown();
    executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
    System.out.println("synchronized cost : " + (System.currentTimeMillis() - begin));
  }

}
耗時1 耗時2 耗時3 耗時4 耗時5 平均耗時(ms)
ReentrantLock 5876 5879 5601 5939 5925 5844
synchronized 5551 5611 5794 5397 5445 5559

在JDK1.8的ConcurrentHashMap中,作者已經將分段鎖摒棄,進而采用synchronized為分桶加鎖。synchronized已日趨成熟,我們應該摒棄對它低性能的偏見,放心大膽地去使用它


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM