從JVM源碼看synchronized,公平鎖和非公平鎖介紹,為什么要“非公平”?


索引

  • synchronized的使用
    • 修飾實例方法
    • 修飾靜態方法
    • 修飾代碼塊
    • 總結
  • Synchronzied的底層原理
    • 對象頭和內置鎖(ObjectMonitor)
    • synchronzied的底層原理
  • synchronized的優化
    • 偏向鎖
    • 輕量級鎖
    • 輕量級鎖膨脹
    • 重量級鎖
    • 自旋
    • 編譯期間鎖優化
  • 總結
  • 參考資料

 

synchronized的使用

synchronized關鍵字是Java中解決並發問題的一種常用方法,也是最簡單的一種方法,其作用有三個:(1)互斥性:確保線程互斥的訪問同步代碼(2)可見性:保證共享變量的修改能夠及時可見(3)有序性:有效解決重排序問題,其用法也有三個:

  1. 修飾實例方法
  2. 修飾靜態方法
  3. 修飾代碼塊

修飾實例方法

  1. public class Thread1 implements Runnable{
  2. //共享資源(臨界資源)
  3. static int i=0;
  4.  
  5. //如果沒有synchronized關鍵字,輸出小於20000
  6. public synchronized void increase(){
  7. i++;
  8. }
  9. public void run() {
  10. for(int j=0;j<10000;j++){
  11. increase();
  12. }
  13. }
  14. public static void main(String[] args) throws InterruptedException {
  15. Thread1 t= new Thread1();
  16. Thread t1= new Thread(t);
  17. Thread t2= new Thread(t);
  18. t1. start();
  19. t2. start();
  20. t1. join();//主線程等待t1執行完畢
  21. t2. join();//主線程等待t2執行完畢
  22. System.out. println(i);
  23. }
  24. /**
  25. * 輸出結果:
  26. * 20000
  27. */
  28. }

修飾靜態方法

  1. public class Thread1 {
  2. //共享資源(臨界資源)
  3. static int i = 0;
  4.  
  5. //如果沒有synchronized關鍵字,輸出小於20000
  6. public static synchronized void increase() {
  7. i++;
  8. }
  9.  
  10. public static void main(String[] args) throws InterruptedException {
  11. Thread t1 = new Thread(new Runnable() {
  12. public void run() {
  13. for (int j = 0; j < 10000; j++) {
  14. increase();
  15. }
  16. }
  17. });
  18. Thread t2 = new Thread(new Runnable() {
  19. @Override
  20. public void run() {
  21. for (int j = 0; j < 10000; j++) {
  22. increase();
  23. }
  24. }
  25. });
  26. t1.start();
  27. t2.start();
  28. t1. join();//主線程等待t1執行完畢
  29. t2. join();//主線程等待t2執行完畢
  30. System. out.println(i);
  31. }
  32. /**
  33. * 輸出結果:
  34. * 20000
  35. */
  36. }

修飾代碼塊

  1. public class Thread1 implements Runnable{
  2. //共享資源(臨界資源)
  3. static int i=0;
  4.  
  5.  
  6. @Override
  7. public void run() {
  8. for(int j=0;j<10000;j++){
  9. //獲得了String的類鎖
  10. synchronized (String. class){
  11. i++;}
  12. }
  13. }
  14. public static void main(String[] args) throws InterruptedException {
  15. Thread1 t= new Thread1();
  16. Thread t1= new Thread(t);
  17. Thread t2= new Thread(t);
  18. t1.start();
  19. t2.start();
  20. t1. join();
  21. t2. join();
  22. System. out.println(i);
  23. }
  24. /**
  25. * 輸出結果:
  26. * 20000
  27. */
  28. }

總結

  1. synchronized修飾的實例方法,多線程並發訪問時,只能有一個線程進入,獲得對象內置鎖,其他線程阻塞等待,但在此期間線程仍然可以訪問其他方法。
  2. synchronized修飾的靜態方法,多線程並發訪問時,只能有一個線程進入,獲得類鎖,其他線程阻塞等待,但在此期間線程仍然可以訪問其他方法。
  3. synchronized修飾的代碼塊,多線程並發訪問時,只能有一個線程進入,根據括號中的對象或者是類,獲得相應的對象內置鎖或者是類鎖
  4. 每個類都有一個類鎖,類的每個對象也有一個內置鎖,它們是互不干擾的,也就是說一個線程可以同時獲得類鎖和該類實例化對象的內置鎖,當線程訪問非synchronzied修飾的方法時,並不需要獲得鎖,因此不會產生阻塞。

Synchronzied的底層原理

對象頭和內置鎖(ObjectMonitor)

根據jvm的分區,對象分配在堆內存中,可以用下圖表示:

對象頭

Hotspot虛擬機的對象頭包括兩部分信息,第一部分用於儲存對象自身的運行時數據,如哈希碼,GC分代年齡,鎖狀態標志,鎖指針等,這部分數據在32bit和64bit的虛擬機中大小分別為32bit和64bit,官方稱它為"Mark word",考慮到虛擬機的空間效率,Mark Word被設計成一個非固定的數據結構以便在極小的空間中存儲盡量多的信息,它會根據對象的狀態復用自己的存儲空間,詳細情況如下圖:

對象頭的另外一部分是類型指針,即對象指向它的類元數據的指針,如果對象訪問定位方式是句柄訪問,那么該部分沒有,如果是直接訪問,該部分保留。句柄訪問方式如下圖:

直接訪問如下圖:

內置鎖(ObjectMonitor)

通常所說的對象的內置鎖,是對象頭Mark Word中的重量級鎖指針指向的monitor對象,該對象是在HotSpot底層C++語言編寫的(openjdk里面看),簡單看一下代碼:

  1. //結構體如下
  2. ObjectMonitor::ObjectMonitor() {
  3. _header = NULL;
  4. _count = 0;
  5. _waiters = 0,
  6. _recursions = 0; //線程的重入次數
  7. _object = NULL;
  8. _owner = NULL; //標識擁有該monitor的線程
  9. _WaitSet = NULL; //等待線程組成的雙向循環鏈表,_WaitSet是第一個節點
  10. _WaitSetLock = 0 ;
  11. _Responsible = NULL ;
  12. _succ = NULL ;
  13. _cxq = NULL ; //多線程競爭鎖進入時的單向鏈表
  14. FreeNext = NULL ;
  15. _EntryList = NULL ; //處於等待鎖block狀態的線程,會被加入到該列表,_EntryList是第一個節點
  16. _SpinFreq = 0 ;
  17. _SpinClock = 0 ;
  18. OwnerIsThread = 0 ;
  19. }

① _owner:初始時為NULL。當有線程占有該monitor時,_owner標記為該線程的唯一標識。當線程釋放monitor時,_owner又恢復為NULL。_owner是一個臨界資源,JVM是通過CAS操作來保證其線程安全的。
② _cxq:競爭隊列,所有請求鎖的線程首先會被放在這個隊列中(單向鏈接)。_cxq是一個臨界資源,JVM通過CAS原子指令來修改_cxq隊列。修改前_cxq的舊值填入了node的next字段,_cxq指向新值(新線程)。因此_cxq是一個后進先出的stack(棧)。
③ _EntryList:候選隊列,_cxq隊列中有資格成為候選資源的線程會被移動到該隊列中
④ _WaitSet:因為調用wait方法而被阻塞的線程會被放在該隊列中

ObjectMonitor隊列之間的關系轉換可以用下圖表示:

既然提到了_WaitSet和_EntryList(_cxq隊列后面會說),那就看一下底層的wait和notify方法
wait方法的實現過程:

  1. //1.調用ObjectSynchronizer::wait方法
  2. void ObjectSynchronizer::wait(Handle obj, jlong millis, TRAPS) {
  3. /*省略 */
  4. //2.獲得Object的monitor對象(即內置鎖)
  5. ObjectMonitor* monitor = ObjectSynchronizer:: inflate(THREAD, obj());
  6. DTRACE_MONITOR_WAIT_PROBE(monitor, obj(), THREAD, millis);
  7. //3.調用monitor的wait方法
  8. monitor-> wait(millis, true, THREAD);
  9. /*省略*/
  10. }
  11. //4.在wait方法中調用addWaiter方法
  12. inline void ObjectMonitor::AddWaiter(ObjectWaiter* node) {
  13. /*省略*/
  14. if (_WaitSet == NULL) {
  15. //_WaitSet為null,就初始化_waitSet
  16. _WaitSet = node;
  17. node->_prev = node;
  18. node->_next = node;
  19. } else {
  20. //否則就尾插
  21. ObjectWaiter* head = _WaitSet ;
  22. ObjectWaiter* tail = head->_prev;
  23. assert(tail->_next == head, "invariant check");
  24. tail->_next = node;
  25. head->_prev = node;
  26. node->_next = head;
  27. node->_prev = tail;
  28. }
  29. }
  30. //5.然后在ObjectMonitor::exit釋放鎖,接着 thread_ParkEvent->park 也就是wait

總結:通過object獲得內置鎖(objectMonitor),通過內置鎖將Thread封裝成ObjectWaiter對象,然后addWaiter將它插入以_WaitSet為首結點的等待線程鏈表中去,最后釋放鎖。

notify方法的底層實現

  1. //1.調用ObjectSynchronizer::notify方法
  2. void ObjectSynchronizer::notify(Handle obj, TRAPS) {
  3. /*省略*/
  4. //2.調用ObjectSynchronizer::inflate方法
  5. ObjectSynchronizer:: inflate(THREAD, obj())->notify(THREAD);
  6. }
  7. //3.通過inflate方法得到ObjectMonitor對象
  8. ObjectMonitor * ATTR ObjectSynchronizer::inflate (Thread * Self, oop object) {
  9. /*省略*/
  10. if (mark->has_monitor()) {
  11. ObjectMonitor * inf = mark-> monitor() ;
  12. assert (inf->header()->is_neutral(), "invariant");
  13. assert (inf->object() == object, "invariant") ;
  14. assert (ObjectSynchronizer::verify_objmon_isinpool(inf), "monitor is inva;lid");
  15. return inf
  16. }
  17. /*省略*/
  18. }
  19. //4.調用ObjectMonitor的notify方法
  20. void ObjectMonitor::notify(TRAPS) {
  21. /*省略*/
  22. //5.調用DequeueWaiter方法移出_waiterSet第一個結點
  23. ObjectWaiter * iterator = DequeueWaiter() ;
  24. //6.后面省略是將上面DequeueWaiter尾插入_EntrySet的操作
  25. /**省略*/
  26. }

總結:通過object獲得內置鎖(objectMonitor),調用內置鎖的notify方法,通過_WaitSet結點移出等待鏈表中的首結點,將它置於_EntrySet中去,等待獲取鎖。注意:notifyAll根據policy不同可能移入_EntryList或者_cxq隊列中,此處不詳談。

在了解對象頭和ObjectMonitor后,接下來我們結合分析synchronzied的底層實現。

synchronzied的底層原理

synchronized修飾代碼塊

通過下列簡介的代碼來分析:

  1. public class test{
  2. public void testSyn(){
  3. synchronized(this){
  4. }
  5. }
  6. }

javac編譯,javap -verbose反編譯,結果如下:

  1. /**
  2. * ...
  3. **/
  4. public void testSyn();
  5. descriptor: ()V
  6. flags: ACC_PUBLIC
  7. Code:
  8. stack= 2, locals=3, args_size=1
  9. 0: aload_0
  10. 1: dup
  11. 2: astore_1
  12. 3: monitorenter //申請獲得對象的內置鎖
  13. 4: aload_1
  14. 5: monitorexit //釋放對象內置鎖
  15. 6: goto 14
  16. 9: astore_2
  17. 10: aload_1
  18. 11: monitorexit //釋放對象內置鎖
  19. 12: aload_2
  20. 13: athrow
  21. 14: return

此處我們只討論了重量級鎖(ObjectMonitor)的獲取情況,其他鎖的獲取放在后面synchronzied的優化中進行說明。源碼如下:

  1. void ATTR ObjectMonitor::enter(TRAPS) {
  2. Thread * const Self = THREAD ;
  3. void * cur ;
  4. //通過CAS操作嘗試把monitor的_owner字段設置為當前線程
  5. cur = Atomic::cmpxchg_ptr ( Self, &_owner, NULL) ;
  6. //獲取鎖失敗
  7. if (cur == NULL) {
  8. assert (_recursions == 0 , "invariant") ;
  9. assert (_owner == Self, "invariant") ;
  10. return ;
  11. }
  12.  
  13. //如果之前的_owner指向該THREAD,那么該線程是重入,_recursions++
  14. if (cur == Self) {
  15. _recursions ++ ;
  16. return ;
  17. }
  18. //如果當前線程是第一次進入該monitor,設置_recursions為1,_owner為當前線程
  19. if (Self->is_lock_owned ((address)cur)) {
  20. assert (_recursions == 0, "internal state error");
  21. _recursions = 1 ; //_recursions標記為1
  22. _owner = Self ; //設置owner
  23. OwnerIsThread = 1 ;
  24. return ;
  25. }
  26. /**
  27. *此處省略鎖的自旋優化等操作,統一放在后面synchronzied優化中說
  28. **/

總結:

  1. 如果monitor的進入數為0,則該線程進入monitor,然后將進入數設置為1,該線程即為monitor的owner
  2. 如果線程已經占有該monitor,只是重新進入,則進入monitor的進入數加1.
  3. 如果其他線程已經占用了monitor,則該線程進入阻塞狀態,直到monitor的進入數為0,再重新嘗試獲取monitor的所有權

synchronized修飾方法

還是從簡潔的代碼來分析:

  1. public class test{
  2. public synchronized void testSyn(){
  3. }
  4. }

javac編譯,javap -verbose反編譯,結果如下:

  1. /**
  2. * ...
  3. **/
  4. public synchronized void testSyn();
  5. descriptor: ()V
  6. flags: ACC_PUBLIC, ACC_SYNCHRONIZED
  7. Code:
  8. stack= 0, locals=1, args_size=1
  9. 0: return
  10. LineNumberTable:
  11. line 3: 0

結果和synchronized修飾代碼塊的情況不同,仔細比較會發現多了ACC_SYNCHRONIZED這個標識,test.java通過javac編譯形成的test.class文件,在該文件中包含了testSyn方法的方法表,其中ACC_SYNCHRONIZED標志位是1,當線程執行方法的時候會檢查該標志位,如果為1,就自動的在該方法前后添加monitorenter和monitorexit指令,可以稱為monitor指令的隱式調用。

上面所介紹的通過synchronzied實現同步用到了對象的內置鎖(ObjectMonitor),而在ObjectMonitor的函數調用中會涉及到Mutex lock等特權指令,那么這個時候就存在操作系統用戶態和核心態的轉換,這種切換會消耗大量的系統資源,因為用戶態與內核態都有各自專用的內存空間,專用的寄存器等,用戶態切換至內核態需要傳遞給許多變量、參數給內核,內核也需要保護好用戶態在切換時的一些寄存器值、變量等,這也是為什么早期的synchronized效率低的原因。在jdk1.6之后,從jvm層面做了很大的優化,下面主要介紹做了哪些優化。

synchronized的優化

在了解了synchronized重量級鎖效率特別低之后,jdk自然做了一些優化,出現了偏向鎖,輕量級鎖,重量級鎖,自旋等優化,我們應該改正monitorenter指令就是獲取對象重量級鎖的錯誤認識,很顯然,優化之后,鎖的獲取判斷次序是偏向鎖->輕量級鎖->重量級鎖。

偏向鎖

源碼如下:

  1. //偏向鎖入口
  2. void ObjectSynchronizer::fast_enter(Handle obj, BasicLock* lock, bool attempt_rebias, TRAPS) {
  3. //UseBiasedLocking判斷是否開啟偏向鎖
  4. if (UseBiasedLocking) {
  5. if (!SafepointSynchronize::is_at_safepoint()) {
  6. //獲取偏向鎖的函數調用
  7. BiasedLocking::Condition cond = BiasedLocking::revoke_and_rebias(obj, attempt_rebias, THREAD);
  8. if (cond == BiasedLocking::BIAS_REVOKED_AND_REBIASED) {
  9. return;
  10. }
  11. } else {
  12. assert(!attempt_rebias, "can not rebias toward VM thread");
  13. BiasedLocking::revoke_at_safepoint(obj);
  14. }
  15. }
  16. //不能偏向,就獲取輕量級鎖
  17. slow_enter (obj, lock, THREAD) ;
  18. }

BiasedLocking::revoke_and_rebias調用過程如下流程圖:

偏向鎖的撤銷過程如下:

 

輕量級鎖

輕量級鎖獲取源碼:

  1. //輕量級鎖入口
  2. void ObjectSynchronizer::slow_enter(Handle obj, BasicLock* lock, TRAPS) {
  3. markOop mark = obj->mark(); //獲得Mark Word
  4. assert(!mark->has_bias_pattern(), "should not see bias pattern here");
  5. //是否無鎖不可偏向,標志001
  6. if (mark->is_neutral()) {
  7. //圖A步驟1
  8. lock->set_displaced_header(mark);
  9. //圖A步驟2
  10. if (mark == (markOop) Atomic::cmpxchg_ptr(lock, obj()->mark_addr(), mark)) {
  11. TEVENT (slow_enter: release stacklock) ;
  12. return ;
  13. }
  14. // Fall through to inflate() ...
  15. } else if (mark->has_locker() && THREAD->is_lock_owned((address)mark->locker())) { //如果Mark Word指向本地棧幀,線程重入
  16. assert(lock != mark->locker(), "must not re-lock the same lock");
  17. assert(lock != (BasicLock*)obj->mark(), "don't relock with same BasicLock");
  18. lock->set_displaced_header( NULL);//header設置為null
  19. return;
  20. }
  21. lock->set_displace
  22.  
  23. d_header(markOopDesc::unused_mark());
  24. //輕量級鎖膨脹,膨脹完成之后嘗試獲取重量級鎖
  25. ObjectSynchronizer::inflate(THREAD, obj())->enter(THREAD);
  26. }

輕量級鎖獲取流程如下:

輕量級鎖撤銷源碼:

  1. void ObjectSynchronizer::fast_exit(oop object, BasicLock* lock, TRAPS) {
  2. assert(! object->mark()->has_bias_pattern(), "should not see bias pattern here");
  3. markOop dhw = lock->displaced_header();
  4. markOop mark ;
  5. if (dhw == NULL) {//如果header為null,說明這是線程重入的棧幀,直接返回,不用回寫
  6. mark = object->mark() ;
  7. assert (!mark->is_neutral(), "invariant") ;
  8. if (mark->has_locker() && mark != markOopDesc::INFLATING()) {
  9. assert(THREAD->is_lock_owned((address)mark->locker()), "invariant") ;
  10. }
  11. if (mark->has_monitor()) {
  12. ObjectMonitor * m = mark->monitor() ;
  13. }
  14. return ;
  15. }
  16.  
  17. mark = object->mark() ;
  18. if (mark == (markOop) lock) {
  19. assert (dhw->is_neutral(), "invariant") ;
  20. //CAS將Mark Word內容寫回
  21. if ((markOop) Atomic::cmpxchg_ptr (dhw, object->mark_addr(), mark) == mark) {
  22. TEVENT (fast_exit: release stacklock) ;
  23. return;
  24. }
  25. }
  26. //CAS操作失敗,輕量級鎖膨脹,為什么在撤銷鎖的時候會有失敗的可能?
  27. ObjectSynchronizer::inflate(THREAD, object)->exit (THREAD) ;
  28. }

輕量級鎖撤銷流程如下:

 

輕量級鎖膨脹

源代碼:

  1. ObjectMonitor * ATTR ObjectSynchronizer::inflate (Thread * Self, oop object) {
  2. assert (Universe::verify_in_progress() ||
  3. !SafepointSynchronize::is_at_safepoint(), "invariant") ;
  4. for (;;) { // 為后面的continue操作提供自旋
  5. const markOop mark = object->mark() ; //獲得Mark Word結構
  6. assert (!mark->has_bias_pattern(), "invariant") ;
  7.  
  8. //Mark Word可能有以下幾種狀態:
  9. // * Inflated(膨脹完成) - just return
  10. // * Stack-locked(輕量級鎖) - coerce it to inflated
  11. // * INFLATING(膨脹中) - busy wait for conversion to complete
  12. // * Neutral(無鎖) - aggressively inflate the object.
  13. // * BIASED(偏向鎖) - Illegal. We should never see this
  14.  
  15. if (mark->has_monitor()) {//判斷是否是重量級鎖
  16. ObjectMonitor * inf = mark->monitor() ;
  17. assert (inf->header()->is_neutral(), "invariant");
  18. assert (inf->object() == object, "invariant") ;
  19. assert (ObjectSynchronizer::verify_objmon_isinpool(inf), "monitor is invalid");
  20. //Mark->has_monitor()為true,說明已經是重量級鎖了,膨脹過程已經完成,返回
  21. return inf ;
  22. }
  23. if (mark == markOopDesc::INFLATING()) { //判斷是否在膨脹
  24. TEVENT (Inflate: spin while INFLATING) ;
  25. ReadStableMark( object) ;
  26. continue ; //如果正在膨脹,自旋等待膨脹完成
  27. }
  28.  
  29. if (mark->has_locker()) { //如果當前是輕量級鎖
  30. ObjectMonitor * m = omAlloc ( Self) ;//返回一個對象的內置ObjectMonitor對象
  31. m->Recycle();
  32. m->_Responsible = NULL ;
  33. m->OwnerIsThread = 0 ;
  34. m->_recursions = 0 ;
  35. m->_SpinDuration = ObjectMonitor::Knob_SpinLimit ; //設置自旋獲取重量級鎖的次數
  36. //CAS操作標識Mark Word正在膨脹
  37. markOop cmp = (markOop) Atomic::cmpxchg_ptr (markOopDesc::INFLATING(), object->mark_addr(), mark) ;
  38. if (cmp != mark) {
  39. omRelease ( Self, m, true) ;
  40. continue ; //如果上述CAS操作失敗,自旋等待膨脹完成
  41. }
  42. m->set_header(dmw) ;
  43. m->set_owner(mark->locker()); //設置ObjectMonitor的_owner為擁有對象輕量級鎖的線程,而不是當前正在inflate的線程
  44. m->set_object( object);
  45. /**
  46. *省略了部分代碼
  47. **/
  48. return m ;
  49. }
  50. }
  51. }

輕量級鎖膨脹流程圖:

現在來回答下之前提出的問題:為什么在撤銷輕量級鎖的時候會有失敗的可能?
假設thread1擁有了輕量級鎖,Mark Word指向thread1棧幀,thread2請求鎖的時候,就會膨脹初始化ObjectMonitor對象,將Mark Word更新為指向ObjectMonitor的指針,那么在thread1退出的時候,CAS操作會失敗,因為Mark Word不再指向thread1的棧幀,這個時候thread1自旋等待infalte完畢,執行重量級鎖的退出操作

重量級鎖

重量級鎖的獲取入口:

  1. void ATTR ObjectMonitor::enter(TRAPS) {
  2. Thread * const Self = THREAD ;
  3. void * cur ;
  4. cur = Atomic::cmpxchg_ptr ( Self, &_owner, NULL) ;
  5. if (cur == NULL) {
  6. assert (_recursions == 0 , "invariant") ;
  7. assert (_owner == Self, "invariant") ;
  8. return ;
  9. }
  10.  
  11. if (cur == Self) {
  12. _recursions ++ ;
  13. return ;
  14. }
  15.  
  16. if (Self->is_lock_owned ((address)cur)) {
  17. assert (_recursions == 0, "internal state error");
  18. _recursions = 1 ;
  19. // Commute owner from a thread-specific on-stack BasicLockObject address to
  20. // a full-fledged "Thread *".
  21. _owner = Self ;
  22. OwnerIsThread = 1 ;
  23. return ;
  24. }
  25. /**
  26. *上述部分在前面已經分析過,不再累述
  27. **/
  28.  
  29. Self->_Stalled = intptr_t(this) ;
  30. //TrySpin是一個自旋獲取鎖的操作,此處就不列出源碼了
  31. if (Knob_SpinEarly && TrySpin (Self) > 0) {
  32. Self->_Stalled = 0 ;
  33. return ;
  34. }
  35. /*
  36. *省略部分代碼
  37. */
  38. for (;;) {
  39. EnterI (THREAD) ;
  40. /**
  41. *省略了部分代碼
  42. **/
  43. }
  44. }

進入EnterI (TRAPS)方法(這段代碼個人覺得很有意思):

  1. void ATTR ObjectMonitor::EnterI (TRAPS) {
  2. Thread * Self = THREAD ;
  3. if (TryLock (Self) > 0) {
  4. //這下不自旋了,我就默默的TryLock一下
  5. return ;
  6. }
  7.  
  8. DeferredInitialize () ;
  9. //此處又有自旋獲取鎖的操作
  10. if (TrySpin (Self) > 0) {
  11. return ;
  12. }
  13. /**
  14. *到此,自旋終於全失敗了,要入隊掛起了
  15. **/
  16. ObjectWaiter node( Self) ; //將Thread封裝成ObjectWaiter結點
  17. Self->_ParkEvent->reset() ;
  18. node._prev = (ObjectWaiter *) 0xBAD ;
  19. node.TState = ObjectWaiter::TS_CXQ ;
  20. ObjectWaiter * nxt ;
  21. for (;;) { //循環,保證將node插入隊列
  22. node._next = nxt = _cxq ; //將node插入到_cxq隊列的首部
  23. //CAS修改_cxq指向node
  24. if (Atomic::cmpxchg_ptr (&node, &_cxq, nxt) == nxt) break ;
  25. if (TryLock (Self) > 0) {//我再默默的TryLock一下,真的是不想掛起呀!
  26. return ;
  27. }
  28. }
  29. if ((SyncFlags & 16) == 0 && nxt == NULL && _EntryList == NULL) {
  30. // Try to assume the role of responsible thread for the monitor.
  31. // CONSIDER: ST vs CAS vs { if (Responsible==null) Responsible=Self }
  32. Atomic::cmpxchg_ptr ( Self, &_Responsible, NULL) ;
  33. }
  34. TEVENT (Inflated enter - Contention) ;
  35. int nWakeups = 0 ;
  36. int RecheckInterval = 1 ;
  37.  
  38. for (;;) {
  39. if (TryLock (Self) > 0) break ;//臨死之前,我再TryLock下
  40.  
  41. if ((SyncFlags & 2) && _Responsible == NULL) {
  42. Atomic::cmpxchg_ptr ( Self, &_Responsible, NULL) ;
  43. }
  44. if (_Responsible == Self || (SyncFlags & 1)) {
  45. TEVENT (Inflated enter - park TIMED) ;
  46. Self->_ParkEvent->park ((jlong) RecheckInterval) ;
  47. RecheckInterval *= 8 ;
  48. if (RecheckInterval > 1000) RecheckInterval = 1000 ;
  49. } else {
  50. TEVENT (Inflated enter - park UNTIMED) ;
  51. Self->_ParkEvent->park() ; //終於掛起了
  52. }
  53.  
  54. if (TryLock(Self) > 0) break ;
  55. /**
  56. *后面代碼省略
  57. **/
  58. }

try了那么多次lock,接下來看下TryLock:

  1. int ObjectMonitor::TryLock (Thread * Self) {
  2. for (;;) {
  3. void * own = _owner ;
  4. if (own != NULL) return 0 ;//如果有線程還擁有着重量級鎖,退出
  5. //CAS操作將_owner修改為當前線程,操作成功return>0
  6. if (Atomic::cmpxchg_ptr (Self, &_owner, NULL) == NULL) {
  7. return 1 ;
  8. }
  9. //CAS更新失敗return<0
  10. if (true) return -1 ;
  11. }
  12. }

重量級鎖獲取入口流程圖:

重量級鎖的出口:

  1. void ATTR ObjectMonitor::exit(TRAPS) {
  2. Thread * Self = THREAD ;
  3. if (THREAD != _owner) {
  4. if (THREAD->is_lock_owned((address) _owner)) {
  5. _owner = THREAD ;
  6. _recursions = 0 ;
  7. OwnerIsThread = 1 ;
  8. } else {
  9. TEVENT ( Exit - Throw IMSX) ;
  10. if (false) {
  11. THROW(vmSymbols::java_lang_IllegalMonitorStateException());
  12. }
  13. return;
  14. }
  15. }
  16. if (_recursions != 0) {
  17. _recursions--; // 如果_recursions次數不為0.自減
  18. TEVENT (Inflated exit - recursive) ;
  19. return ;
  20. }
  21. if ((SyncFlags & 4) == 0) {
  22. _Responsible = NULL ;
  23. }
  24.  
  25. for (;;) {
  26. if (Knob_ExitPolicy == 0) {
  27. OrderAccess::release_store_ptr (&_owner, NULL) ; // drop the lock
  28. OrderAccess::storeload() ;
  29. if ((intptr_t(_EntryList)|intptr_t(_cxq)) == 0 || _succ != NULL) {
  30. TEVENT (Inflated exit - simple egress) ;
  31. return ;
  32. }
  33. TEVENT (Inflated exit - complex egress) ;
  34. if (Atomic::cmpxchg_ptr (THREAD, &_owner, NULL) != NULL) {
  35. return ;
  36. }
  37. TEVENT ( Exit - Reacquired) ;
  38. } else {
  39. if ((intptr_t(_EntryList)|intptr_t(_cxq)) == 0 || _succ != NULL) {
  40. OrderAccess::release_store_ptr (&_owner, NULL) ;
  41. OrderAccess::storeload() ;
  42. if (_cxq == NULL || _succ != NULL) {
  43. TEVENT (Inflated exit - simple egress) ;
  44. return ;
  45. }
  46. if (Atomic::cmpxchg_ptr (THREAD, &_owner, NULL) != NULL) {
  47. TEVENT (Inflated exit - reacquired succeeded) ;
  48. return ;
  49. }
  50. TEVENT (Inflated exit - reacquired failed) ;
  51. } else {
  52. TEVENT (Inflated exit - complex egress) ;
  53. }
  54. }
  55. ObjectWaiter * w = NULL ;
  56. int QMode = Knob_QMode ;
  57. if (QMode == 2 && _cxq != NULL) {
  58. /**
  59. *模式2:cxq隊列的優先權大於EntryList,直接從cxq隊列中取出一個線程結點,准備喚醒
  60. **/
  61. w = _cxq ;
  62. ExitEpilog ( Self, w) ;
  63. return ;
  64. }
  65.  
  66. if (QMode == 3 && _cxq != NULL) {
  67. /**
  68. *模式3:將cxq隊列插入到_EntryList尾部
  69. **/
  70. w = _cxq ;
  71. for (;;) {
  72. //CAS操作取出cxq隊列首結點
  73. ObjectWaiter * u = (ObjectWaiter *) Atomic::cmpxchg_ptr ( NULL, &_cxq, w) ;
  74. if (u == w) break ;
  75. w = u ; //更新w,自旋
  76. }
  77. ObjectWaiter * q = NULL ;
  78. ObjectWaiter * p ;
  79. for (p = w ; p != NULL ; p = p->_next) {
  80. guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
  81. p->TState = ObjectWaiter::TS_ENTER ; //改變ObjectWaiter狀態
  82. //下面兩句為cxq隊列反向構造一條鏈,即將cxq變成雙向鏈表
  83. p->_prev = q ;
  84. q = p ;
  85. }
  86. ObjectWaiter * Tail ;
  87. //獲得_EntryList尾結點
  88. for (Tail = _EntryList ; Tail != NULL && Tail->_next != NULL ; Tail = Tail->_next) ;
  89. if (Tail == NULL) {
  90. _EntryList = w ; //_EntryList為空,_EntryList=w
  91. } else {
  92. //將w插入_EntryList隊列尾部
  93. Tail->_next = w ;
  94. w->_prev = Tail ;
  95. }
  96. }
  97.  
  98. if (QMode == 4 && _cxq != NULL) {
  99. /**
  100. *模式四:將cxq隊列插入到_EntryList頭部
  101. **/
  102. w = _cxq ;
  103. for (;;) {
  104. ObjectWaiter * u = (ObjectWaiter *) Atomic::cmpxchg_ptr ( NULL, &_cxq, w) ;
  105. if (u == w) break ;
  106. w = u ;
  107. }
  108. ObjectWaiter * q = NULL ;
  109. ObjectWaiter * p ;
  110. for (p = w ; p != NULL ; p = p->_next) {
  111. guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
  112. p->TState = ObjectWaiter::TS_ENTER ;
  113. p->_prev = q ;
  114. q = p ;
  115. }
  116. if (_EntryList != NULL) {
  117. //q為cxq隊列最后一個結點
  118. q->_next = _EntryList ;
  119. _EntryList->_prev = q ;
  120. }
  121. _EntryList = w ;
  122. }
  123.  
  124. w = _EntryList ;
  125. if (w != NULL) {
  126. ExitEpilog ( Self, w) ;//從_EntryList中喚醒線程
  127. return ;
  128. }
  129. w = _cxq ;
  130. if (w == NULL) continue ; //如果_cxq和_EntryList隊列都為空,自旋
  131.  
  132. for (;;) {
  133. //自旋再獲得cxq首結點
  134. ObjectWaiter * u = (ObjectWaiter *) Atomic::cmpxchg_ptr ( NULL, &_cxq, w) ;
  135. if (u == w) break ;
  136. w = u ;
  137. }
  138. /**
  139. *下面執行的是:cxq不為空,_EntryList為空的情況
  140. **/
  141. if (QMode == 1) {//結合前面的代碼,如果QMode == 1,_EntryList不為空,直接從_EntryList中喚醒線程
  142. // QMode == 1 : drain cxq to EntryList, reversing order
  143. // We also reverse the order of the list.
  144. ObjectWaiter * s = NULL ;
  145. ObjectWaiter * t = w ;
  146. ObjectWaiter * u = NULL ;
  147. while (t != NULL) {
  148. guarantee (t->TState == ObjectWaiter::TS_CXQ, "invariant") ;
  149. t->TState = ObjectWaiter::TS_ENTER ;
  150. //下面的操作是雙向鏈表的倒置
  151. u = t->_next ;
  152. t->_prev = u ;
  153. t->_next = s ;
  154. s = t;
  155. t = u ;
  156. }
  157. _EntryList = s ; //_EntryList為倒置后的cxq隊列
  158. } else {
  159. // QMode == 0 or QMode == 2
  160. _EntryList = w ;
  161. ObjectWaiter * q = NULL ;
  162. ObjectWaiter * p ;
  163. for (p = w ; p != NULL ; p = p->_next) {
  164. guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
  165. p->TState = ObjectWaiter::TS_ENTER ;
  166. //構造成雙向的
  167. p->_prev = q ;
  168. q = p ;
  169. }
  170. }
  171. if (_succ != NULL) continue;
  172. w = _EntryList ;
  173. if (w != NULL) {
  174. ExitEpilog ( Self, w) ; //從_EntryList中喚醒線程
  175. return ;
  176. }
  177. }
  178. }

ExitEpilog用來喚醒線程,代碼如下:

  1. void ObjectMonitor::ExitEpilog (Thread * Self, ObjectWaiter * Wakee) {
  2. assert (_owner == Self, "invariant") ;
  3. _succ = Knob_SuccEnabled ? Wakee->_thread : NULL ;
  4. ParkEvent * Trigger = Wakee->_event ;
  5. Wakee = NULL ;
  6. OrderAccess::release_store_ptr (&_owner, NULL) ;
  7. OrderAccess::fence() ;
  8. if (SafepointSynchronize::do_call_back()) {
  9. TEVENT (unpark before SAFEPOINT) ;
  10. }
  11. DTRACE_MONITOR_PROBE(contended__exit, this, object(), Self);
  12. Trigger->unpark() ; //喚醒線程
  13. // Maintain stats and report events to JVMTI
  14. if (ObjectMonitor::_sync_Parks != NULL) {
  15. ObjectMonitor::_sync_Parks->inc() ;
  16. }
  17. }

重量級鎖出口流程圖:

 

自旋

通過對源碼的分析,發現多處存在自旋和tryLock操作,那么這些操作好不好,如果tryLock過少,大部分線程都會掛起,因為在擁有對象鎖的線程釋放鎖后不能及時感知,導致用戶態和核心態狀態轉換較多,效率低下,極限思維就是:沒有自旋,所有線程掛起,如果tryLock過多,存在兩個問題:1. 即使自旋避免了掛起,但是自旋的代價超過了掛起,得不償失,那我還不如不要自旋了。 2. 如果自旋仍然不能避免大部分掛起的話,那就是又自旋又掛起,效率太低。極限思維就是:無限自旋,白白浪費了cpu資源,所以在代碼中每個自旋和tryLock的插入應該都是經過測試后決定的。

 

編譯期間鎖優化

鎖消除

還是先看一下簡潔的代碼

  1. public class test {
  2. public String test(String s1,String s2) {
  3. return s1+s2;
  4. }
  5. }

javac javap后:

  1. public class test {
  2. public test();
  3. Code:
  4. 0: aload_0
  5. 1: invokespecial #1 // Method java/lang/Object."<init>":()V
  6. 4: return
  7.  
  8. public java.lang.String test(java.lang.String, java.lang.String);
  9. Code:
  10. 0: new #2 // class java/lang/StringBuilder
  11. 3: dup
  12. 4: invokespecial #3 // Method java/lang/StringBuilder."<init>":()V
  13. 7: aload_1
  14. 8: invokevirtual #4 // Method java/lang/StringBuilder.append:(Ljava/lang/String;)Ljava/lang/StringBuilder;
  15. 11: aload_2
  16. 12: invokevirtual #4 // Method java/lang/StringBuilder.append:(Ljava/lang/String;)Ljava/lang/StringBuilder;
  17. 15: invokevirtual #5 // Method java/lang/StringBuilder.toString:()Ljava/lang/String;
  18. 18: areturn
  19. }

上述字節碼等價成java代碼為:

  1. public class test {
  2. public String test(String s1,String s2) {
  3. StringBuilder sb = new StringBuilder();
  4. sb. append(s1);
  5. sb. append(s2);
  6. return sb.toString();
  7. }
  8. }

sb的append方法是同步的,但是sb是在方法內部,每個運行的線程都會實例化一個StringBuilder對象,在私有棧持有該對象引用(其他線程無法得到),也就是說sb不存在多線程訪問,那么在jvm運行期間,即時編譯器就會將鎖消除

鎖粗化

將前面的代碼稍微變一下:

  1. public class test {
  2. StringBuilder sb = new StringBuilder();
  3. public String test(String s1,String s2) {
  4. sb. append(s1);
  5. sb. append(s2);
  6. return sb.toString();
  7. }
  8. }

首先可以確定的是這段代碼不能鎖消除優化,因為sb是類的實例變量,會被多線程訪問,存在線程安全問題,那么訪問test方法的時候就會對sb對象,加鎖,解鎖,加鎖,解鎖,很顯然這一過程將會大大降低效率,因此在即時編譯的時候會進行鎖粗化,在sb.appends(s1)之前加鎖,在sb.append(s2)執行完后釋放鎖。

總結

引入偏向鎖的目的:在只有單線程執行情況下,盡量減少不必要的輕量級鎖執行路徑,輕量級鎖的獲取及釋放依賴多次CAS原子指令,而偏向鎖只依賴一次CAS原子指令置換ThreadID,之后只要判斷線程ID為當前線程即可,偏向鎖使用了一種等到競爭出現才釋放鎖的機制,消除偏向鎖的開銷還是蠻大的。如果同步資源或代碼一直都是多線程訪問的,那么消除偏向鎖這一步驟對你來說就是多余的,可以通過-XX:-UseBiasedLocking=false來關閉
引入輕量級鎖的目的:在多線程交替執行同步塊的情況下,盡量避免重量級鎖引起的性能消耗(用戶態和核心態轉換),但是如果多個線程在同一時刻進入臨界區,會導致輕量級鎖膨脹升級重量級鎖,所以輕量級鎖的出現並非是要替代重量級鎖
重入:對於不同級別的鎖都有重入策略,偏向鎖:單線程獨占,重入只用檢查threadId等於該線程;輕量級鎖:重入將棧幀中lock record的header設置為null,重入退出,只用彈出棧幀,直到最后一個重入退出CAS寫回數據釋放鎖;重量級鎖:重入_recursions++,重入退出_recursions--,_recursions=0時釋放鎖
最后放一張摘自網上的一張大圖(保存本地,方便食用):

 

 

轉載自:https://www.linuxidc.com/Linux/2018-02/150798.htm

(9條消息) 公平鎖和非公平鎖介紹,為什么要“非公平”?_vincent_wen0766的博客-CSDN博客_非公平鎖的優勢

什么是公平和非公平

公平鎖指的是按照線程請求的順序,來分配鎖;而非公平鎖指的是不完全按照請求的順序,在一定情況下,可以允許插隊。但需要注意這里的非公平並不是指完全的隨機,不是說線程可以任意插隊,而是僅僅“在合適的時機”插隊

什么時候是合適的時機呢?

假設當前線程在請求獲取鎖的時候,恰巧前一個持有鎖的線程釋放了這把鎖,那么當前申請鎖的線程就可以不顧已經等待的線程而選擇立刻插隊。但是如果當前線程請求的時候,前一個線程並沒有在那一時刻釋放鎖,那么當前線程還是一樣會進入等待隊列

為什么要設置非公平策略呢?

我們都知道非公平是 ReentrantLock的默認策略,如果我們不加以設置的話默認就是非公平的,難道我的這些排隊的時間都白白浪費了嗎,為什么別人比我有優先權呢?畢竟公平是一種很好的行為,而非公平是一種不好的行為

讓我們考慮一種情況,假設線程 A 持有一把鎖,線程 B 請求這把鎖,由於線程 A 已經持有這把鎖了,所以線程 B 會陷入等待,在等待的時候線程 B 會被掛起,也就是進入阻塞狀態,那么當線程 A 釋放鎖的時候,本該輪到線程 B 蘇醒獲取鎖,但如果此時突然有一個線程 C 插隊請求這把鎖,那么根據非公平的策略,會把這把鎖給線程 C,這是因為喚醒線程 B 是需要很大開銷的,很有可能在喚醒之前,線程 C 已經拿到了這把鎖並且執行完任務釋放了這把鎖。相比於等待喚醒線程 B 的漫長過程,插隊的行為會讓線程 C 本身跳過陷入阻塞的過程,如果在鎖代碼中執行的內容不多的話,線程 C 就可以很快完成任務,並且在線程 B 被完全喚醒之前,就把這個鎖交出去,這樣是一個雙贏的局面,對於線程 C 而言,不需要等待提高了它的效率,而對於線程 B 而言,它獲得鎖的時間並沒有推遲,因為等它被喚醒的時候,線程 C 早就釋放鎖了,因為線程 C 的執行速度相比於線程 B 的喚醒速度,是很快的,所以 Java 設計非公平鎖,是為了提高整體的運行效率

公平的場景

用圖示來說明公平和非公平的場景,先來看公平的情況。假設我們創建了一個公平鎖,此時有 4 個線程按順序來請求公平鎖,線程 1 在拿到這把鎖之后,線程 2、3、4 會在等待隊列中開始等待,然后等線程 1 釋放鎖之后,線程 2、3、4 會依次去獲取這把鎖,線程 2 先獲取到的原因是它等待的時間最長

不公平的場景

假設線程 1 在解鎖的時候,突然有線程 5 嘗試獲取這把鎖,那么根據我們的非公平策略,線程 5 是可以拿到這把鎖的,盡管它沒有進入等待隊列,而且線程 2、3、4 等待的時間都比線程 5 要長,但是從整體效率考慮,這把鎖此時還是會交給線程 5 持有

代碼演示公平和非公平

  1. /**
  2.  * 描述:演示公平鎖,分別展示公平和不公平的情況,非公平鎖會讓現在持有鎖的線程優先再次獲取到鎖
  3.  */
  4. public class FairAndUnfair {
  5.  
  6.  
  7.      public static void main(String args[]) {
  8.          PrintQueue printQueue new PrintQueue();
  9.  
  10.  
  11.         Thread thread[] =  new Thread[10];
  12.          for (int 0; i < 10; i++) {
  13.             thread[i] =  new Thread(new Job(printQueue), "Thread " + i);
  14.         }
  15.  
  16.  
  17.          for (int 0; i < 10; i++) {
  18.             thread[i].start();
  19.              try {
  20.                 Thread.sleep( 100);
  21.             }  catch (InterruptedException e) {
  22.                 e.printStackTrace();
  23.             }
  24.         }
  25.     }
  26.  
  27.  
  28. }
  29.  
  30.  
  31. class Job implements Runnable {
  32.  
  33.  
  34.      private PrintQueue printQueue;
  35.  
  36.  
  37.      public Job(PrintQueue printQueue) {
  38.          this.printQueue = printQueue;
  39.     }
  40.  
  41.  
  42.      @Override
  43.      public void run() {
  44.         System.out.printf( "%s: Going to print a job\n", Thread.currentThread().getName());
  45.         printQueue.printJob( new Object());
  46.         System.out.printf( "%s: The document has been printed\n", Thread.currentThread().getName());
  47.     }
  48.  
  49.  
  50. }
  51.  
  52.  
  53. class PrintQueue {
  54.  
  55.  
  56.      private final Lock queueLock new ReentrantLock(false);
  57.  
  58.  
  59.      public void printJob(Object document) {
  60.         queueLock.lock();
  61.  
  62.  
  63.          try {
  64.              Long duration = (long) (Math.random() * 10000);
  65.             System.out.printf( "%s: PrintQueue: Printing a Job during %d seconds\n",
  66.                     Thread.currentThread().getName(), (duration /  1000));
  67.             Thread.sleep(duration);
  68.         }  catch (InterruptedException e) {
  69.             e.printStackTrace();
  70.         }  finally {
  71.             queueLock.unlock();
  72.         }
  73.  
  74.  
  75.         queueLock.lock();
  76.          try {
  77.              Long duration = (long) (Math.random() * 10000);
  78.             System.out.printf( "%s: PrintQueue: Printing a Job during %d seconds\n",
  79.                     Thread.currentThread().getName(), (duration /  1000));
  80.             Thread.sleep(duration);
  81.         }  catch (InterruptedException e) {
  82.             e.printStackTrace();
  83.         }  finally {
  84.             queueLock.unlock();
  85.             }
  86.     }
  87. }

可以通過改變 new ReentrantLock(false) 中的參數來設置公平/非公平鎖,以上代碼在公平的情況下的輸出

  1. Thread 0: Going to print a job
  2. Thread 0: PrintQueue: Printing a Job during 5 seconds
  3. Thread 1: Going to print a job
  4. Thread 2: Going to print a job
  5. Thread 3: Going to print a job
  6. Thread 4: Going to print a job
  7. Thread 5: Going to print a job
  8. Thread 6: Going to print a job
  9. Thread 7: Going to print a job
  10. Thread 8: Going to print a job
  11. Thread 9: Going to print a job
  12. Thread 1: PrintQueue: Printing a Job during 3 seconds
  13. Thread 2: PrintQueue: Printing a Job during 4 seconds
  14. Thread 3: PrintQueue: Printing a Job during 3 seconds
  15. Thread 4: PrintQueue: Printing a Job during 9 seconds
  16. Thread 5: PrintQueue: Printing a Job during 5 seconds
  17. Thread 6: PrintQueue: Printing a Job during 7 seconds
  18. Thread 7: PrintQueue: Printing a Job during 3 seconds
  19. Thread 8: PrintQueue: Printing a Job during 9 seconds
  20. Thread 9: PrintQueue: Printing a Job during 5 seconds
  21. Thread 0: PrintQueue: Printing a Job during 8 seconds
  22. Thread 0: The document has been printed
  23. Thread 1: PrintQueue: Printing a Job during 1 seconds
  24. Thread 1: The document has been printed
  25. Thread 2: PrintQueue: Printing a Job during 8 seconds
  26. Thread 2: The document has been printed
  27. Thread 3: PrintQueue: Printing a Job during 2 seconds
  28. Thread 3: The document has been printed
  29. Thread 4: PrintQueue: Printing a Job during 0 seconds
  30. Thread 4: The document has been printed
  31. Thread 5: PrintQueue: Printing a Job during 7 seconds
  32. Thread 5: The document has been printed
  33. Thread 6: PrintQueue: Printing a Job during 3 seconds
  34. Thread 6: The document has been printed
  35. Thread 7: PrintQueue: Printing a Job during 9 seconds
  36. Thread 7: The document has been printed
  37. Thread 8: PrintQueue: Printing a Job during 5 seconds
  38. Thread 8: The document has been printed
  39. Thread 9: PrintQueue: Printing a Job during 9 seconds
  40. Thread 9: The document has been printed

而以上代碼在非公平的情況下的輸出是這樣的

  1. Thread 0: Going to print a job
  2. Thread 0: PrintQueue: Printing a Job during 6 seconds
  3. Thread 1: Going to print a job
  4. Thread 2: Going to print a job
  5. Thread 3: Going to print a job
  6. Thread 4: Going to print a job
  7. Thread 5: Going to print a job
  8. Thread 6: Going to print a job
  9. Thread 7: Going to print a job
  10. Thread 8: Going to print a job
  11. Thread 9: Going to print a job
  12. Thread 0: PrintQueue: Printing a Job during 8 seconds
  13. Thread 0: The document has been printed
  14. Thread 1: PrintQueue: Printing a Job during 9 seconds
  15. Thread 1: PrintQueue: Printing a Job during 8 seconds
  16. Thread 1: The document has been printed
  17. Thread 2: PrintQueue: Printing a Job during 6 seconds
  18. Thread 2: PrintQueue: Printing a Job during 4 seconds
  19. Thread 2: The document has been printed
  20. Thread 3: PrintQueue: Printing a Job during 9 seconds
  21. Thread 3: PrintQueue: Printing a Job during 8 seconds
  22. Thread 3: The document has been printed
  23. Thread 4: PrintQueue: Printing a Job during 4 seconds
  24. Thread 4: PrintQueue: Printing a Job during 2 seconds
  25. Thread 4: The document has been printed
  26. Thread 5: PrintQueue: Printing a Job during 2 seconds
  27. Thread 5: PrintQueue: Printing a Job during 5 seconds
  28. Thread 5: The document has been printed
  29. Thread 6: PrintQueue: Printing a Job during 2 seconds
  30. Thread 6: PrintQueue: Printing a Job during 6 seconds
  31. Thread 6: The document has been printed
  32. Thread 7: PrintQueue: Printing a Job during 6 seconds
  33. Thread 7: PrintQueue: Printing a Job during 4 seconds
  34. Thread 7: The document has been printed
  35. Thread 8: PrintQueue: Printing a Job during 3 seconds
  36. Thread 8: PrintQueue: Printing a Job during 6 seconds
  37. Thread 8: The document has been printed
  38. Thread 9: PrintQueue: Printing a Job during 3 seconds
  39. Thread 9: PrintQueue: Printing a Job during 5 seconds
  40. Thread 9: The document has been printed

可以看出,非公平情況下,存在搶鎖“插隊”的現象,比如Thread 0 在釋放鎖后又能優先獲取到鎖,雖然此時在等待隊列中已經有 Thread 1 ~ Thread 9 在排隊了

公平和非公平的優缺點

  優勢 劣勢
公平鎖 各線程公平平等,每一個線程在等待一段時間后,總有執行的機會 更慢,吞吐量更少
非公平鎖 更快,吞吐量更大 有可能產生線程飢餓,也就是某些線程在長時間內,始終得不到執行

源碼分析

下面我們來分析公平和非公平鎖的源碼,具體看下它們是怎樣實現的,可以看到在 ReentrantLock 類包含一個 Sync 類,這個類繼承自AQS(AbstractQueuedSynchronizer),代碼如下

  1. public class ReentrantLock implements Lock, java.io.Serializable {
  2.  
  3. private static final long serialVersionUID 7373984872572414699L;
  4.  
  5. /** Synchronizer providing all implementation mechanics */
  6.  
  7. private final Sync sync;

Sync 類的代碼

abstract static class Sync extends AbstractQueuedSynchronizer {...} 

Sync 有公平鎖 FairSync 和非公平鎖 NonfairSync兩個子類

  1. static final class NonfairSync extends Sync {...}
  2. static final class FairSync extends Sync {...}

公平鎖的鎖獲取源碼如下

  1. protected final boolean tryAcquire(int acquires) {
  2.      final Thread current = Thread.currentThread();
  3.      int = getState();
  4.      if (c == 0) {
  5.          if (!hasQueuedPredecessors() && //這里判斷了 hasQueuedPredecessors()
  6.                 compareAndSetState( 0, acquires)) {
  7.             setExclusiveOwnerThread(current);
  8.              return true;
  9.         }
  10.     }  else if (current == getExclusiveOwnerThread()) {
  11.          int nextc = c + acquires;
  12.          if (nextc < 0) {
  13.              throw new Error("Maximum lock count exceeded");
  14.         }
  15.         setState(nextc);
  16.          return true;
  17.     }
  18.      return false;
  19. }

非公平鎖的鎖獲取源碼如下

  1. final boolean nonfairTryAcquire(int acquires) {
  2.      final Thread current = Thread.currentThread();
  3.      int = getState();
  4.      if (c == 0) {
  5.          if (compareAndSetState(0, acquires)) { //這里沒有判斷 hasQueuedPredecessors()
  6.             setExclusiveOwnerThread(current);
  7.              return true;
  8.         }
  9.     }
  10.      else if (current == getExclusiveOwnerThread()) {
  11.          int nextc = c + acquires;
  12.          if (nextc < 0) // overflow
  13.          throw new Error("Maximum lock count exceeded");
  14.         setState(nextc);
  15.          return true;
  16.     }
  17.      return false;
  18. }

通過對比,我們可以明顯的看出公平鎖與非公平鎖的 lock() 方法唯一的區別就在於公平鎖在獲取鎖時多了一個限制條件:hasQueuedPredecessors() 為 false,這個方法就是判斷在等待隊列中是否已經有線程在排隊了。這也就是公平鎖和非公平鎖的核心區別,如果是公平鎖,那么一旦已經有線程在排隊了,當前線程就不再嘗試獲取鎖;對於非公平鎖而言,無論是否已經有線程在排隊,都會嘗試獲取一下鎖,獲取不到的話,再去排隊

注意:針對 tryLock() 方法,它不遵守設定的公平原則

例如,當有線程執行 tryLock() 方法的時候,一旦有線程釋放了鎖,那么這個正在 tryLock 的線程就能獲取到鎖,即使設置的是公平鎖模式,即使在它之前已經有其他正在等待隊列中等待的線程,簡單地說就是 tryLock 可以插隊

看源碼就會發現

  1. public boolean tryLock() {
  2.      return sync.nonfairTryAcquire(1);
  3. }

這里調用的就是 nonfairTryAcquire(),表明了是不公平的,和鎖本身是否是公平鎖無關

公平鎖就是會按照多個線程申請鎖的順序來獲取鎖,從而實現公平的特性。非公平鎖加鎖時不考慮排隊等待情況,直接嘗試獲取鎖,所以存在后申請卻先獲得鎖的情況,但由此也提高了整體的效率


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM