Java並發與線程同步


前言

  目前CPU的運算速度已經達到了百億次每秒,所以為了提高生產率和高效地完成任務,基本上都采用多線程和並發的運作方式。

  並發(Concurrency):是指在某個時間段內,多任務交替處理的能力。CPU把可執行時間均勻地分成若干份,每個進程執行一段時間后,記錄當前的工作狀態,

釋放相關的執行資源並進入等待狀態,讓其他線程搶占CPU資源。

  並行(Parallelism):是指同時處理多任務的能力

在並發環境下,由於程序的封閉性被打破,出現了一下特點:

  1、並發程序之間有相互制約的關系。直接制約體現在一個程序需要另一個程序的計算結果;間接體現為多個程序競爭共享資源,如處理器、緩沖區等。

  2、並發程序的執行過程是斷斷續續的。程序需要記憶現場指令及執行點

  3、當並發數設置合理並且CPU擁有足夠的處理能力時,並發會提高程序的運行效率。

 

線程安全

  線程是CPU調度和分派的基本單位,為了更充分地利用CPU資源,一般都會使用多線程進行處理。多線程的作用是提高任務的平均執行速度,但是會導致程序可理解性變差,

編程難度加大。

  同樣,線程數不是越多越好,合適的線程數才能讓CPU資源被充分利用。

 

  線程安全的定義:當多個線程訪問一個對象時,如果不用考慮這些線程在運行時環境下的調度和交替執行,也不需要進行額外的同步,或者在調用方進行任何其他的協調操作

,調用這個對象的行為都可以獲得正確的結果,那這個對象是線程安全的。

  線程可以擁有自己的操作棧、程序計數器、局部變量表等資源,它與同進程內的其他線程共享該進程的所有資源。

  

  線程在生命周期內存在多種狀態。

  有NEW(新建狀態)、RUNNABLE(就緒狀態)、RUNNING(運行狀態)、BLOCKED(阻塞)狀態、DEAD(終止狀態)五種狀態。

  1、NEW,即新建狀態,是線程被創建且未啟動的狀態。創建線程的方式有三種,第一種是繼承自Thread類,第二種是實現Runnable接口。第三種是實現Callable接口。

  推薦使用實現Runnable接口的方式,因為繼承Thread類往往不符合里氏替換原則(任何父類出現的地方都可以用子類替換,子類不要重寫重載父類的方法)。

  Callable與Runnable有兩點不同:

    1):Callable可以通過call()獲得返回值。

    2):call()方法可以拋出異常。而Runnable只有通過setDefaultUncaughtExceptionHandler()的方式才能在主線程中捕捉到子線程異常。

  2、RUNNABLE,即就緒狀態,是調用start()方法后運行之前的狀態。需要注意的是線程的start()不能被多次調用,否則會拋出IllegalStateException異常

  3、RUNNING,即運行狀態,是run()正在執行時線程的狀態。線程可能會由於某些因素而退出RUNNING,如時間、異常、鎖、調度等

  4、BLOCKED,即阻塞狀態,進入此狀態,有以下幾種情況

    同步阻塞:鎖被其他線程占用

    異步阻塞:調用Thread的某些方法,主動讓出CPU執行權,比如sleep()、join()等

    等待阻塞:執行了await()

  5、DEAD,即終止狀態,是run()方法執行結束,或因異常退出后的狀態,此狀態不可逆轉。

 

  jstack看到的線程狀態:

狀態名稱 說明
NEW 初始狀態,線程被創建,但還沒有調用其start()方法
RUNNABLE 運行狀態,Java線程將操作系統中的就緒與運行兩種狀態籠統地稱作“運行中”
BLOCKED 阻塞狀態,表示線程阻塞於鎖
WAITING

等待狀態,表示該線程需要等待其他線程做出一些特定動作(通知或中斷)

TIME_WAITING 超時等待狀態,該狀態不同於WAITING,它在達到超時時間后,將會返回到運行狀態
TERMINATED 終止狀態,表示該線程已經執行完畢

 

 

線程安全的核心理念就是“要么只讀,要么加鎖”

線程安全問題只有在多線程環境下才出現,單線程串行執行不存在此問題。保證高並發場景下的線程安全,可以從以下維度考量:

  1、數據單線程內可見:單線程總是安全的。通過限制數據只在單線程內可見,可以避免數據被其他線程篡改。最典型的就是線程局部變量,它存儲在獨立的

    虛擬機棧幀的局部變量表中,與其他線程毫無瓜葛。ThreadLocal就是采用這種方式來實現線程安全的。

  2、只讀對象:只讀對象總是線程安全的。它的特性是允許復制、拒絕寫入。最典型的只讀對象有String,Integer等。一個對象想要拒絕任何寫入,必須滿足以下條件:

    1):使用final關鍵字修飾類。避免被繼承,如String,調用其的方法不會影響其原來的值,只會返回一個新構造的字符串對象

    2):使用private final 關鍵字避免屬性被中途修改

    3):沒有任何更新方法

    4):返回值不能可變對象為引用

  3、線程安全類:某些線程安全類內部有非常明確的線程安全機制。比如StringBuffer就是一個線程安全類,其內部采用sychronized關鍵字來修飾相關方法

  4、同步與鎖機制:如果想要對某個對象進行並發更新操作,但又不屬於上述三類,需要開發工程師在代碼中實現安全的同步機制。

  合理利用好JDK提供的並發包(java.util.concurrent),並發包主要分為以下幾個類族:

    1):線程同步類,這些類使得線程間的協調更加容易,支持了更加豐富的線程協調場景,逐步淘汰了使用Object類的wait和notify進行同步的方式,主要代表為

      CountDownLatch、Semaphore、CycleBarrier等

    2):並發集合類,如ConcurrentHashMap,它不斷優化,從剛開始的鎖分段到后來的CAS,不斷地提升並發性能。其他還有BlockingQueue、CopyOnWriteArrayList等

    3):線程管理類,如使用Executors靜態工廠或者使用ThreadPoolExecutor來創建線程池等,另外,通過ScheduledExecutorService來執行定時任務

    4):鎖相關類。鎖以Lock為核心,最有名的是ReentrantLock。

 

線程安全的實現方法

  1、互斥同步

  同步是指在多個線程並發訪問共享數據時,保證共享數據在同一時刻只被一個(或者是一些,使用信號量的時候)線程使用。而互斥是實現同步的一種手段,臨界區、互斥量

和信號量都是主要的互斥方式。

  synchronized

  在Java語言中,最基本的互斥同步手段就是synchronized關鍵字,synchronized關鍵字經過編譯之后,會在同步塊的前后分別形成monitorenter和moniterexit這兩個字節碼指令,

這兩個字節碼都需要一個reference類型的參數來指明要鎖定和解鎖的對象。如果Java程序中的synchronized明確指定了對象參數,那就是這個對象的reference;如果沒有明確指定,

那就根據synchronized修飾的是實例方法還是類方法,去取對應的對象實例或者Class對象來作為鎖對象。

  在執行monitorenter指令時,首先要嘗試獲取對象的鎖。如果這個對象沒被鎖定(monitor為0),或者當前線程已經擁有了那個對象的鎖,把鎖的計數器加1,相應的,在執行

monitorexit指令時會將計數器減1,當計數器為0時,鎖就被釋放。如果獲取對象鎖失敗,那當前線程就要阻塞等待,直到對象鎖被另外一個線程釋放為止。

  synchronized同步塊對同一條線程來說是可重入的,不會出現自己把自己鎖死的問題。其次,同步塊在已進入的線程執行完之前,會阻塞后面其他線程的進入。Java中的線程

是映射到操作系統的原生線程之上的,如果要阻塞或喚醒一個線程,都需要操作系統來幫忙完成,這就需要從用戶態轉換到核心態之中,因此狀態轉換需要耗費很多的處理器時間。

對於簡單的同步代碼塊,狀態轉換的操作有可能比用戶代碼執行的時間還要長。所以synchronized是Java語言中的一個重量級的操作。同時虛擬機本身也做了一些優化,譬如在通

知操作系統阻塞線程之前加入一段自旋等待的過程,避免頻繁地切入到核心態中。

  Lock:

  相比synchronized,ReentrantLock增加了一些高級功能,只要有以下3項:等待可中斷、可實現公平鎖、以及鎖可以綁定多個條件。

  等待可中斷:是指當持有鎖的線程長期不釋放鎖的時候,正在等待的線程可以選擇放棄等待,改為處理其他的事情,可中斷特性對處理執行時間非常長的同步塊很有幫助。

  公平鎖:是指多個線程在等待同一個鎖時,必須按照申請鎖的時間順序來依次獲得鎖;而非公平鎖則不保證這一點,在鎖被釋放時,任何一個等待鎖的線程都有機會獲得鎖。

    synchronized中的鎖是非公平的,ReentrantLock默認情況下也是非公平的,但可以通過帶布爾值的構造函數要求使用公平鎖。

  鎖綁定多個條件:是指一個ReentrantLock對象可以同時綁定多個Condition對象,而在synchronized中,鎖對象的wait()和notify()或notifyAll()方法可以實現一個隱含的條件,

    如果要和多於一個的條件關聯的時候,就不得不額外地添加一個鎖,而ReentrantLock則無需這么做,只需要多次調用newCondition()方法即可。

 

  2、非阻塞同步

  互斥同步最主要的問題就是進行線程阻塞和喚醒所帶來的性能問題,因此這種同步也成為阻塞同步。從處理問題的方式上說,互斥同步屬於一種悲觀的並發策略,總是認

為只要不去做正確的同步措施(例如加鎖),那就肯定會出現問題,無論共享數據是否真的會出現競爭,它都要進行加鎖、用戶態核心態轉換、維護鎖計數器和檢查是否有被

阻塞的線程需要喚醒等操作。

  隨着硬件指令集的發展,我們可以選擇:基於沖突檢測的樂觀並發策略,通俗的說,就是先進行操作,如果沒有其他線程爭用共享數據,那操作就成功了;如果共享數據

有爭用,產生了沖突,那就再采用其他的補償措施(常見的補償措施就是不斷地重試,直到成功為止),這種樂觀的並發策略的許多實現都不需要把線程掛起,因此這種同步

操作稱為非阻塞同步。

  為什么使用樂觀並發策略需要”硬件指令集的發展“才能進行呢?因為我們需要操作和沖突檢測這兩個步驟具備原子性,靠什么來保證呢?如果這里使用互斥同步來保證就

失去意義了,所以我們只能靠硬件來完成這件事情,硬件保證一個從語義上看起來需要多次操作的行為只通過一條處理器指令就能完成,這類指令常用的有:

    1)、測試並設置(Test-and-Set)

    2)、獲取並增加(Fetch-and-Increment)

    3)、交換(Swap)

    4)、比較並交換(Compare-and-Swap,CAS)

    5)、加載鏈接/條件存儲(Load_Linked/Store-Conditional,LL/SC)

     其中后面的兩條是現代處理器新增的。

  CAS指令需要3個操作數,分別是內存位置(在Java中可以理解為變量的內存地址,用V表示)、舊的預期值(用A表示)和新值(用B表示)。CAS指令執行時,當且僅當V

符合舊預期值A時,處理器用新值B更新V的值,否則它就不執行更新,但是無論是否更新了V的值,都會返回V的舊值,且上面的處理過程是一個原子操作。

  不過CAS有個邏輯漏洞:如果一個變量V初次讀取的時候是A值,並且在准備賦值的時候檢查到它仍為A值,那我們就能說它的值沒有被其他線程改變過了嗎?如果在這段期

間它的值曾經被改成了B,后來又被改為A,那CAS操作就會誤認為它從來沒有改變過。這個漏洞稱為CAS操作的ABA問題。java.unit.concurrent包為了解決這個問題,提供了一個

帶有標記的原子引用類”AtomicStampReference“,它可以通過控制變量值的版本來保證CAS的正確性。不過這個類目前來說比較雞肋,大部分情況下ABA問題不會影響程序並發

的正確性,如果需要解決ABA問題,改用傳統的互斥同步可能會比原子類更高效。

 

  3、無同步方案

   要保證線程安全,並不是一定要進行同步,兩者沒有因果關系。同步只是保證共享數據爭用時的正確性手段,如果一個方法本來就不涉及共享數據,那它自然就無需任何同

步措施去保證正確性,因此會有一些代碼天生就是線程安全的,比如:

  可重入代碼(Reentrant Code):這種代碼也叫做純代碼(Pure Code),可以在代碼執行的任何時刻中斷它,轉而去執行另外一段代碼(包括遞歸調用它本身),而在控制

權返回后,原來的程序不會出現任何錯誤。...

  線程本地存儲(Thread Local Storage):如果一段代碼中所需要的數據必須與其他代碼共享,那就看看這些共享數據的代碼是否能保證在同一個線程中執行?如果能保證,

我們就可以把共享數據的可見范圍控制在同一個線程之內,這樣,無需同步也能保證線程之間不出現數據爭用的問題。如ThreadLocal類可以實現線程本地存儲的功能。每個線程

的Thread對象中都有一個ThreadLocalMap對象,這個對象存儲了一組以ThreadLocal.threadLocalHashCode為鍵,以本地線程變量為值的K-V鍵值對,ThreadLocal對象就是當前

線程的ThreadLocalMap的訪問入口,每一個ThreadLocal對象都包含了一個獨一無二的threadLocalHashCode值,使用這個值就可以在線程K-V值對中找回對應的本地線程變量。

 

什么是鎖?

    單機單線程時代,沒有鎖的概念。自動出現了資源競爭,人們才意識到需要對部分執行現場進行加鎖,表明自己短暫擁有。計算機中的鎖也從最開始的悲觀鎖,發展到

  后來的樂觀鎖、偏向鎖、分段鎖等。鎖主要提供了兩種特性:互斥性和不可見性。

  1、用並發包中的鎖類

    Lock是頂層接口,它的實現邏輯並未用到synchronized,而是利用了volatile的可見性。ReentrantLock對了Lock接口的實現主要依賴了Sync,而Sync繼承了

    AbstractQueuedSynchronizer(AQS),在AQS中,定義了一個volatile int state 變量作為共享資源。如果線程獲取此共享資源失敗,則進入同步FIFO隊列中等待;

    如果成功獲取資源就執行臨界區代碼。執行完釋放資源時,會通知同步隊列中的等待線程來獲取資源后出對並執行。

    ReentrantLock的lock()方法默認執行的是NonfairSync中的lock()實現,利用Unsafe類的CAS;期望state值為0時將其值設為1,返回是否成功

    因此ReentrantLock的lock()方法只有在state為0時才能獲得鎖,並將state設為1。這樣其他線程就無法獲取鎖,只能等待。

    由於ReentrantLock是可重入鎖,即在獲得鎖的情況下,可以再次獲得鎖。並且線程可以進入任何一個它已經擁有的鎖所同步着的代碼塊。若在沒有釋放鎖的情況下,

    再次獲得鎖,則state加1,在釋放資源時,state減1,因此Lock獲取多少次鎖就要釋放多少次鎖,直到state為0。

  2、利用同步代碼塊

    同步代碼塊一般使用Java的sychronized關鍵字來實現,有兩種方式對方法進行加鎖操作:

      1):第一,在方法簽名處加synchronized關鍵字

      2):第二,使用synchronized(對象或類)進行同步

    這里的原則是鎖的范圍盡可能小,鎖的時間盡可能短,即能鎖對象,就不要鎖類,能鎖代碼塊,就不要鎖方法。

  synchronized鎖特性由JVM負責實現。在JDK的不斷優化迭代中,synchronized鎖的性能得到極大提升,特別是偏向鎖的實現,使得synchronized已經不是昔日那個

  低性能且笨重的鎖了。

  JVM底層是通過監視鎖來實現synchronized同步的。監視鎖即monitor,是每個對象與生俱來的一個隱藏字段。使用synchronized時,JVM會根據synchronized的當前

  使用環境,找到對應的monitor,再根據monitor的狀態進行加、解鎖的判斷(使用monitorenter和monitorexit指令實現)。例如:線程在進入同步方法或者代碼塊時,會獲取

  該方法或代碼塊所屬對象的monitor(在Java對象頭中),進行加鎖判斷。如果成功加鎖就成為該moniter的唯一持有者。monitor在被釋放前,不能被其他線程獲取。

  從字節碼看synchronized鎖的具體實現:

  同步方法的方法元信息中會使用ACC_SYNCHRONIZED標識該方法是一個同步方法。同步代碼塊中會使用monitorenter及monitorexit兩個字節碼指令獲取和釋放monitor。

如果使用monitorenter進入時monitor為0,表示該線程可以持有monitor后續代碼,並將monitor加1;如果當前線程已經持有了monitor,那么monitor繼續加1(可重入);

如果monitor非0,其他線程就會進入阻塞狀態(和Lock的state類似)。

  JVM對synchronized的優化主要在於對monitor的加鎖、解鎖上。JDK6后不斷優化使得synchronized提供三種鎖的實現,包括偏向鎖、輕量級鎖、重量級鎖,還提供自動的

升級和降級機制。JVM就是利用CAS在對象頭上設置線程ID,表示這個對象偏向於當前線程,這就是偏向鎖。

  偏向鎖是為了在資源沒有被多線程競爭的情況下盡量減少鎖帶來的性能開銷。在鎖的對象頭中有一個ThreadId字段,當第一個線程訪問鎖時,如果該鎖沒有被其他線程訪問

過,即ThreadId字段為空,那么JVM讓其持有偏向鎖,並將ThreadId字段設置為該線程的ID。當下一次獲取鎖時,會判斷當前線程的ID是否與鎖對象的ThreadId一致,如果一致

,那么該線程不會再重復獲取鎖,從而提高了程序的運行效率。如果出現鎖的競爭情況,那么偏向鎖會被撤銷並升級為輕量級鎖。如果資源的競爭非常激烈,會升級為重量級鎖。

偏向鎖可以降低競爭開銷,它不是互斥鎖,不存在線程競爭情況,省去了再次判斷的步驟,提升了性能。

 

線程同步:

  計算機的線程同步,就是線程之間按某種機制協調先后次序執行,即當有一個線程在對內存進行操作時,其他線程都不可以對這個內存進行操作,一直等待直到該線程完成操作,其他線程才能對該內存進行操作。

  在多個線程對同一變量進行寫操作時,如果操作沒有原子性,就可能產生臟數據。所謂原子性,是指不可分割的一系列操作指令,在執行完畢前不能被任何其他操作中斷,那么全部執行,要么全部不執行。

如果每個線程對共享變量的修改都是原子操作,就不存在線程同步問題。

  i++操作就不具備原子性,它需要分成三部ILOAD-->IINC-->ISTORE。

  CAS(Compare And Swap)操作具備原子性

  實現線程同步的方式有很多,比如同步方法、鎖、阻塞隊列等。

 

Volatile

  happen-before:先從happen-before了解線程操作的內存可見性。把happen before定義為方法hb(a,b)表示a happen before b。如果hb(a,b)且hb(b,c),那么能夠推導出hb(a,c)。

即如果a在b之前發生,那么a對內存的操作b是可見的,b之后的操作c也是可見的。

  指令優化:計算機並不會根據代碼順序按部就班地執行相關指令。CPU處理信息時會進行指令優化,分析哪些取數據可以合並進行,哪些存數據動作可以合並進行。CPU拜訪

一次遙遠的內存,一定會到處看看,是否可以存取合並,以提高執行效率。

  happen-before是時鍾順序的先后,並不能保證線程交互的可見性。那什么是可見性呢?可見性是指某線程修改共享變量的指令對其他線程來說都是可見的,它反應的

是指令執行的實時透明度。先從Java內存模型說起:每個線程都有獨占的內存區域,如操作棧,本地變量表等。線程本地內存保存了引用變量在堆內存中的副本。線程對

變量的所有操作都在本地內存區域中進行,執行結束后再同步到堆內存(主內存)中去。在這個操作過程中,該線程對副本的操作,對於其他線程都是不可見的。

volatile的英文本義是揮發、不穩定的,延伸意義為敏感的。當使用volatile修飾變量時,意味着任何對此變量的操作都會在主內存中進行,不會產生副本,以保證共享

變量的可見性,局部阻止了指令重排的發生。它只是輕量級的線程操作可見方式,並非同步方式,如果是多寫場景,一定會產生線程安全問題。如果是一寫多讀的並發場景,

使用volatile修飾變量則非常合適。volatile一寫多讀最典型的應用是CopyOnWriteArrayList,它在修改數據時會把整個集合的數據全部復制出來,對寫操作加鎖,修改完成

后,再用setArray()把array指向新的集合。使用volatile可以使線程盡快地感知array的修改,不進行指令重排,操作后即對其他線程可見。

源碼如下:

public class CopyOnWriteArrayList<E>
    implements List<E>, RandomAccess, Cloneable, java.io.Serializable {
 /** The array, accessed only via getArray/setArray. */ 真正存儲元素的數組
    private transient volatile Object[] array;
final void setArray(Object[] a) { array = a; } }

在實際的業務中,如果不確定共享變量是否會被多個線程並發寫,保險的做法是使用同步代碼塊來實現線程同步。

另外,因為所有的操作都需要同步給內存變量,所以volatile一定會使線程的執行速度變量,故要慎重定義和使用volatile屬性

  

  信號量同步

  信號量同步是指在不同的線程之間,通過傳遞同步信號量來協調線程執行的先后次序。基於時間維度的CountDownLatch和基於信號維度的Semaphore。

  CountDownLatch:

  CountDownLatch允許一個或多個線程等待其他線程完成操作

public class CountDownLatch {
    /**
     * Synchronization control For CountDownLatch.
     * Uses AQS state to represent count.
     */
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

    private final Sync sync;
...
}

可以看到其和ReentrantLock類似,都是依賴AQS中的可見性變量state。

  CountDownLatch:倒數計數器,它的內部提供了一個計數器,再構造閉鎖時必須指定計數器的初始值(state),且計數器的初始值必須大於0。另外它還提供了一個countDown方法來操作計數器的值,(在子線程中)每調用一次countDown方法計數器會減1,直到計數器的值減為0(類似於獲取到了鎖),所有因調用await方法而阻塞的線程都會被喚醒。

 

  適用於比如日常開發中經常會遇到需要在主線程中開啟多線程去並行執行任務,並且主線程需要等待所有子線程執行完畢后再進行匯總的場景

  

  如一個百萬量級的任務量交給線程池去執行,避免一次將全部任務丟給線程池,導致線程池沒有空閑線程任務被拒絕;按批次執行任務,每批次執行幾千或一萬,執行完當前批次再執行下一批次:

        for (int i = 1; i <= pageCount; i++) {
                PageInfo pageInfo = new PageInfo((i-1)*pageSize, pageSize);
                // 查詢到一頁然后進行處理
                List<Info> infos = infoDao.queryInfoByPage(infoVo, pageInfo);
                // 同步執行這一頁
                // 當前頁的大小
                int size = infos.size();
                final CountDownLatch countDownLatch = new CountDownLatch(size);
                for (final Info info : infos) {
                    // 交給線程池去執行
                    EXECUTORS.execute(new MDCRunnable(new Runnable() {
                        @Override
                        public void run() {
                            try {
                                // 執行業務邏輯
                               ...
                            } catch (Exception e) {
                                LOGGER.error( "續費檢查異常" + e.getMessage(), e);
                            } finally {
                                // 無論執行結果如何都要countDown,避免影響后續的續費檢查
                                countDownLatch.countDown();
                            }
                        }
                    }));
                }
                // 等待線程池執行完一頁的自動續費檢查
                try {
                    countDownLatch.await();
                } catch (InterruptedException e) {
                    LOGGER.error("await一頁續費檢查異常" + e.getMessage(), e);
                }
            }                

 

  Semaphore:

  CountDownLatch是基於計數的同步類。在實際編碼中,可能需要處理基於空閑信號的同步情況。 

  控制並發線程數的Semaphore,是用來控制同時訪問特定資源的線程數量,它通過協調各個線程,以保證合理的使用公共資源。源碼:

public class Semaphore implements java.io.Serializable {
    private static final long serialVersionUID = -3222578661600680210L;
    /** All mechanics via AbstractQueuedSynchronizer subclass */
    private final Sync sync;

    /**
     * Synchronization implementation for semaphore.  Uses AQS state
     * to represent permits. Subclassed into fair and nonfair
     * versions.
     */
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;

        Sync(int permits) {
            setState(permits);
        }

        final int getPermits() {
            return getState();
        }
    ...
    }
  // 默認使用非公平鎖
static final class NonfairSync extends Sync { private static final long serialVersionUID = -2694183684443567898L; NonfairSync(int permits) { super(permits); } protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } }
  // 構造方法
public Semaphore(int permits) { sync = new NonfairSync(permits); } ... }

使用Semaphore的構造方法指定同時處理的線程的數量,只有在調用Semaphore的acquire()成功后,才可以往下執行,完成后執行release()釋放持有的信號量,下一個線程就可以馬上獲取這個

空閑信號量進入執行。

Semaphore的release()和CountDownLatch的countDown方法相同。

acquire()方法在直到有一個信號量空閑時,才會執行后續的代碼,否則,將一直阻塞。可以理解為Semaphore允許有創建對象時在構造中指定的鎖的數量,當鎖有空閑時,線程就可以拿到

鎖,否則將一直等待。拿到鎖的線程執行完畢后釋放鎖。

countDown和release都是使state減1。

 

  Semaphore可以用於做流量控制,特別是公用資源有限的應用場景,比如數據庫連接,只允許10個線程並發執行操作數據庫

 

用途設想:

  固定100個線程執行百萬級的任務,每個線程執行完就去任務池中去取一條任務執行。使用例子(實測任務非常多時性能不高):

        // 設定30個信號量,可以看成30個服務窗口
        final Semaphore semaphore=new Semaphore(30);
        for (final Info info : infos) {
            new MDCRunnable(new Runnable() {
                @Override
                public void run() {
                    try {
                        // acquire拿到信號量后才會往下執行
                        semaphore.acquire();
                        ...
                        LOGGER.info("done");
                    } catch (Exception e) {
                        LOGGER.error("執行異常" + e.getMessage(), e);
                    } finally {
                        // 無論執行結果如何都要釋放持有的信號量
                        semaphore.release();
                    }
                }
            }).run();
        } 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM