synchronized的使用
synchronized關鍵字是Java中解決並發問題的一種常用方法,也是最簡單的一種方法,其作用有三個:(1)互斥性:確保線程互斥的訪問同步代碼(2)可見性:保證共享變量的修改能夠及時可見(3)有序性:有效解決重排序問題,其用法也有三個:
- 修飾實例方法
- 修飾靜態方法
- 修飾代碼塊
修飾實例方法
public class Thread1 implements Runnable{
//共享資源(臨界資源)
static int i=0;
//如果沒有synchronized關鍵字,輸出小於20000
public synchronized void increase(){
i++;
}
public void run() {
for(int j=0;j<10000;j++){
increase();
}
}
public static void main(String[] args) throws InterruptedException {
Thread1 t=new Thread1();
Thread t1=new Thread(t);
Thread t2=new Thread(t);
t1.start();
t2.start();
t1.join();//主線程等待t1執行完畢
t2.join();//主線程等待t2執行完畢
System.out.println(i);
}
/**
* 輸出結果:
* 20000
*/
}
修飾靜態方法
public class Thread1 {
//共享資源(臨界資源)
static int i = 0;
//如果沒有synchronized關鍵字,輸出小於20000
public static synchronized void increase() {
i++;
}
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(new Runnable() {
public void run() {
for (int j = 0; j < 10000; j++) {
increase();
}
}
});
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
for (int j = 0; j < 10000; j++) {
increase();
}
}
});
t1.start();
t2.start();
t1.join();//主線程等待t1執行完畢
t2.join();//主線程等待t2執行完畢
System.out.println(i);
}
/**
* 輸出結果:
* 20000
*/
}
修飾代碼塊
public class Thread1 implements Runnable{
//共享資源(臨界資源)
static int i=0;
@Override
public void run() {
for(int j=0;j<10000;j++){
//獲得了String的類鎖
synchronized (String.class){
i++;}
}
}
public static void main(String[] args) throws InterruptedException {
Thread1 t=new Thread1();
Thread t1=new Thread(t);
Thread t2=new Thread(t);
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println(i);
}
/**
* 輸出結果:
* 20000
*/
}
總結
- synchronized修飾的實例方法,多線程並發訪問時,只能有一個線程進入,獲得對象內置鎖,其他線程阻塞等待,但在此期間線程仍然可以訪問其他方法。
- synchronized修飾的靜態方法,多線程並發訪問時,只能有一個線程進入,獲得類鎖,其他線程阻塞等待,但在此期間線程仍然可以訪問其他方法。
- synchronized修飾的代碼塊,多線程並發訪問時,只能有一個線程進入,根據括號中的對象或者是類,獲得相應的對象內置鎖或者是類鎖
- 每個類都有一個類鎖,類的每個對象也有一個內置鎖,它們是互不干擾的,也就是說一個線程可以同時獲得類鎖和該類實例化對象的內置鎖,當線程訪問非synchronzied修飾的方法時,並不需要獲得鎖,因此不會產生阻塞。
Synchronzied的底層原理
對象頭和內置鎖(ObjectMonitor)
根據jvm的分區,對象分配在堆內存中,可以用下圖表示:
對象頭
Hotspot虛擬機的對象頭包括兩部分信息,第一部分用於儲存對象自身的運行時數據,如哈希碼,GC分代年齡,鎖狀態標志,鎖指針等,這部分數據在32bit和64bit的虛擬機中大小分別為32bit和64bit,官方稱它為"Mark word",考慮到虛擬機的空間效率,Mark Word被設計成一個非固定的數據結構以便在極小的空間中存儲盡量多的信息,它會根據對象的狀態復用自己的存儲空間,詳細情況如下圖:
對象頭的另外一部分是類型指針,即對象指向它的類元數據的指針,如果對象訪問定位方式是句柄訪問,那么該部分沒有,如果是直接訪問,該部分保留。句柄訪問方式如下圖:
直接訪問如下圖:
內置鎖(ObjectMonitor)
通常所說的對象的內置鎖,是對象頭Mark Word中的重量級鎖指針指向的monitor對象,該對象是在HotSpot底層C++語言編寫的(openjdk里面看),簡單看一下代碼:
//結構體如下
ObjectMonitor::ObjectMonitor() {
_header = NULL;
_count = 0;
_waiters = 0,
_recursions = 0; //線程的重入次數
_object = NULL;
_owner = NULL; //標識擁有該monitor的線程
_WaitSet = NULL; //等待線程組成的雙向循環鏈表,_WaitSet是第一個節點
_WaitSetLock = 0 ;
_Responsible = NULL ;
_succ = NULL ;
_cxq = NULL ; //多線程競爭鎖進入時的單向鏈表
FreeNext = NULL ;
_EntryList = NULL ; //_owner從該雙向循環鏈表中喚醒線程結點,_EntryList是第一個節點
_SpinFreq = 0 ;
_SpinClock = 0 ;
OwnerIsThread = 0 ;
}
ObjectMonitor隊列之間的關系轉換可以用下圖表示:
既然提到了_waitSet和_EntryList(_cxq隊列后面會說),那就看一下底層的wait和notify方法
wait方法的實現過程:
//1.調用ObjectSynchronizer::wait方法
void ObjectSynchronizer::wait(Handle obj, jlong millis, TRAPS) {
/*省略 */
//2.獲得Object的monitor對象(即內置鎖)
ObjectMonitor* monitor = ObjectSynchronizer::inflate(THREAD, obj());
DTRACE_MONITOR_WAIT_PROBE(monitor, obj(), THREAD, millis);
//3.調用monitor的wait方法
monitor->wait(millis, true, THREAD);
/*省略*/
}
//4.在wait方法中調用addWaiter方法
inline void ObjectMonitor::AddWaiter(ObjectWaiter* node) {
/*省略*/
if (_WaitSet == NULL) {
//_WaitSet為null,就初始化_waitSet
_WaitSet = node;
node->_prev = node;
node->_next = node;
} else {
//否則就尾插
ObjectWaiter* head = _WaitSet ;
ObjectWaiter* tail = head->_prev;
assert(tail->_next == head, "invariant check");
tail->_next = node;
head->_prev = node;
node->_next = head;
node->_prev = tail;
}
}
//5.然后在ObjectMonitor::exit釋放鎖,接着 thread_ParkEvent->park 也就是wait
總結:通過object獲得內置鎖(objectMonitor),通過內置鎖將Thread封裝成OjectWaiter對象,然后addWaiter將它插入以_waitSet為首結點的等待線程鏈表中去,最后釋放鎖。
notify方法的底層實現
//1.調用ObjectSynchronizer::notify方法
void ObjectSynchronizer::notify(Handle obj, TRAPS) {
/*省略*/
//2.調用ObjectSynchronizer::inflate方法
ObjectSynchronizer::inflate(THREAD, obj())->notify(THREAD);
}
//3.通過inflate方法得到ObjectMonitor對象
ObjectMonitor * ATTR ObjectSynchronizer::inflate (Thread * Self, oop object) {
/*省略*/
if (mark->has_monitor()) {
ObjectMonitor * inf = mark->monitor() ;
assert (inf->header()->is_neutral(), "invariant");
assert (inf->object() == object, "invariant") ;
assert (ObjectSynchronizer::verify_objmon_isinpool(inf), "monitor is inva;lid");
return inf
}
/*省略*/
}
//4.調用ObjectMonitor的notify方法
void ObjectMonitor::notify(TRAPS) {
/*省略*/
//5.調用DequeueWaiter方法移出_waiterSet第一個結點
ObjectWaiter * iterator = DequeueWaiter() ;
//6.后面省略是將上面DequeueWaiter尾插入_EntrySet的操作
/**省略*/
}
總結:通過object獲得內置鎖(objectMonitor),調用內置鎖的notify方法,通過_waitset結點移出等待鏈表中的首結點,將它置於_EntrySet中去,等待獲取鎖。注意:notifyAll根據policy不同可能移入_EntryList或者_cxq隊列中,此處不詳談。
在了解對象頭和ObjectMonitor后,接下來我們結合分析synchronzied的底層實現。
synchronzied的底層原理
synchronized修飾代碼塊
通過下列簡介的代碼來分析:
public class test{
public void testSyn(){
synchronized(this){
}
}
}
javac編譯,javap -verbose反編譯,結果如下:
/**
* ...
**/
public void testSyn();
descriptor: ()V
flags: ACC_PUBLIC
Code:
stack=2, locals=3, args_size=1
0: aload_0
1: dup
2: astore_1
3: monitorenter //申請獲得對象的內置鎖
4: aload_1
5: monitorexit //釋放對象內置鎖
6: goto 14
9: astore_2
10: aload_1
11: monitorexit //釋放對象內置鎖
12: aload_2
13: athrow
14: return
此處我們只討論了重量級鎖(ObjectMonitor)的獲取情況,其他鎖的獲取放在后面synchronzied的優化中進行說明。源碼如下:
void ATTR ObjectMonitor::enter(TRAPS) {
Thread * const Self = THREAD ;
void * cur ;
//通過CAS操作嘗試把monitor的_owner字段設置為當前線程
cur = Atomic::cmpxchg_ptr (Self, &_owner, NULL) ;
//獲取鎖失敗
if (cur == NULL) {
assert (_recursions == 0 , "invariant") ;
assert (_owner == Self, "invariant") ;
return ;
}
//如果之前的_owner指向該THREAD,那么該線程是重入,_recursions++
if (cur == Self) {
_recursions ++ ;
return ;
}
//如果當前線程是第一次進入該monitor,設置_recursions為1,_owner為當前線程
if (Self->is_lock_owned ((address)cur)) {
assert (_recursions == 0, "internal state error");
_recursions = 1 ; //_recursions標記為1
_owner = Self ; //設置owner
OwnerIsThread = 1 ;
return ;
}
/**
*此處省略鎖的自旋優化等操作,統一放在后面synchronzied優化中說
**/
總結:
- 如果monitor的進入數為0,則該線程進入monitor,然后將進入數設置為1,該線程即為monitor的owner
- 如果線程已經占有該monitor,只是重新進入,則進入monitor的進入數加1.
- 如果其他線程已經占用了monitor,則該線程進入阻塞狀態,直到monitor的進入數為0,再重新嘗試獲取monitor的所有權
synchronized修飾方法
還是從簡潔的代碼來分析:
public class test{
public synchronized void testSyn(){
}
}
javac編譯,javap -verbose反編譯,結果如下:
/**
* ...
**/
public synchronized void testSyn();
descriptor: ()V
flags: ACC_PUBLIC, ACC_SYNCHRONIZED
Code:
stack=0, locals=1, args_size=1
0: return
LineNumberTable:
line 3: 0
結果和synchronized修飾代碼塊的情況不同,仔細比較會發現多了ACC_SYNCHRONIZED這個標識,test.java通過javac編譯形成的test.class文件,在該文件中包含了testSyn方法的方法表,其中ACC_SYNCHRONIZED標志位是1,當線程執行方法的時候會檢查該標志位,如果為1,就自動的在該方法前后添加monitorenter和monitorexit指令,可以稱為monitor指令的隱式調用。
上面所介紹的通過synchronzied實現同步用到了對象的內置鎖(ObjectMonitor),而在ObjectMonitor的函數調用中會涉及到Mutex lock等特權指令,那么這個時候就存在操作系統用戶態和核心態的轉換,這種切換會消耗大量的系統資源,因為用戶態與內核態都有各自專用的內存空間,專用的寄存器等,用戶態切換至內核態需要傳遞給許多變量、參數給內核,內核也需要保護好用戶態在切換時的一些寄存器值、變量等,這也是為什么早期的synchronized效率低的原因。在jdk1.6之后,從jvm層面做了很大的優化,下面主要介紹做了哪些優化。
synchronized的優化
在了解了synchronized重量級鎖效率特別低之后,jdk自然做了一些優化,出現了偏向鎖,輕量級鎖,重量級鎖,自旋等優化,我們應該改正monitorenter指令就是獲取對象重量級鎖的錯誤認識,很顯然,優化之后,鎖的獲取判斷次序是偏向鎖->輕量級鎖->重量級鎖。
偏向鎖
源碼如下:
//偏向鎖入口
void ObjectSynchronizer::fast_enter(Handle obj, BasicLock* lock, bool attempt_rebias, TRAPS) {
//UseBiasedLocking判斷是否開啟偏向鎖
if (UseBiasedLocking) {
if (!SafepointSynchronize::is_at_safepoint()) {
//獲取偏向鎖的函數調用
BiasedLocking::Condition cond = BiasedLocking::revoke_and_rebias(obj, attempt_rebias, THREAD);
if (cond == BiasedLocking::BIAS_REVOKED_AND_REBIASED) {
return;
}
} else {
assert(!attempt_rebias, "can not rebias toward VM thread");
BiasedLocking::revoke_at_safepoint(obj);
}
}
//不能偏向,就獲取輕量級鎖
slow_enter (obj, lock, THREAD) ;
}
BiasedLocking::revoke_and_rebias調用過程如下流程圖:
偏向鎖的撤銷過程如下:
輕量級鎖
輕量級鎖獲取源碼:
//輕量級鎖入口
void ObjectSynchronizer::slow_enter(Handle obj, BasicLock* lock, TRAPS) {
markOop mark = obj->mark(); //獲得Mark Word
assert(!mark->has_bias_pattern(), "should not see bias pattern here");
//是否無鎖不可偏向,標志001
if (mark->is_neutral()) {
//圖A步驟1
lock->set_displaced_header(mark);
//圖A步驟2
if (mark == (markOop) Atomic::cmpxchg_ptr(lock, obj()->mark_addr(), mark)) {
TEVENT (slow_enter: release stacklock) ;
return ;
}
// Fall through to inflate() ...
} else if (mark->has_locker() && THREAD->is_lock_owned((address)mark->locker())) { //如果Mark Word指向本地棧幀,線程重入
assert(lock != mark->locker(), "must not re-lock the same lock");
assert(lock != (BasicLock*)obj->mark(), "don't relock with same BasicLock");
lock->set_displaced_header(NULL);//header設置為null
return;
}
lock->set_displace
d_header(markOopDesc::unused_mark());
//輕量級鎖膨脹,膨脹完成之后嘗試獲取重量級鎖
ObjectSynchronizer::inflate(THREAD, obj())->enter(THREAD);
}
輕量級鎖獲取流程如下:
輕量級鎖撤銷源碼:
void ObjectSynchronizer::fast_exit(oop object, BasicLock* lock, TRAPS) {
assert(!object->mark()->has_bias_pattern(), "should not see bias pattern here");
markOop dhw = lock->displaced_header();
markOop mark ;
if (dhw == NULL) {//如果header為null,說明這是線程重入的棧幀,直接返回,不用回寫
mark = object->mark() ;
assert (!mark->is_neutral(), "invariant") ;
if (mark->has_locker() && mark != markOopDesc::INFLATING()) {
assert(THREAD->is_lock_owned((address)mark->locker()), "invariant") ;
}
if (mark->has_monitor()) {
ObjectMonitor * m = mark->monitor() ;
}
return ;
}
mark = object->mark() ;
if (mark == (markOop) lock) {
assert (dhw->is_neutral(), "invariant") ;
//CAS將Mark Word內容寫回
if ((markOop) Atomic::cmpxchg_ptr (dhw, object->mark_addr(), mark) == mark) {
TEVENT (fast_exit: release stacklock) ;
return;
}
}
//CAS操作失敗,輕量級鎖膨脹,為什么在撤銷鎖的時候會有失敗的可能?
ObjectSynchronizer::inflate(THREAD, object)->exit (THREAD) ;
}
輕量級鎖撤銷流程如下:
輕量級鎖膨脹
源代碼:
ObjectMonitor * ATTR ObjectSynchronizer::inflate (Thread * Self, oop object) {
assert (Universe::verify_in_progress() ||
!SafepointSynchronize::is_at_safepoint(), "invariant") ;
for (;;) { // 為后面的continue操作提供自旋
const markOop mark = object->mark() ; //獲得Mark Word結構
assert (!mark->has_bias_pattern(), "invariant") ;
//Mark Word可能有以下幾種狀態:
// * Inflated(膨脹完成) - just return
// * Stack-locked(輕量級鎖) - coerce it to inflated
// * INFLATING(膨脹中) - busy wait for conversion to complete
// * Neutral(無鎖) - aggressively inflate the object.
// * BIASED(偏向鎖) - Illegal. We should never see this
if (mark->has_monitor()) {//判斷是否是重量級鎖
ObjectMonitor * inf = mark->monitor() ;
assert (inf->header()->is_neutral(), "invariant");
assert (inf->object() == object, "invariant") ;
assert (ObjectSynchronizer::verify_objmon_isinpool(inf), "monitor is invalid");
//Mark->has_monitor()為true,說明已經是重量級鎖了,膨脹過程已經完成,返回
return inf ;
}
if (mark == markOopDesc::INFLATING()) { //判斷是否在膨脹
TEVENT (Inflate: spin while INFLATING) ;
ReadStableMark(object) ;
continue ; //如果正在膨脹,自旋等待膨脹完成
}
if (mark->has_locker()) { //如果當前是輕量級鎖
ObjectMonitor * m = omAlloc (Self) ;//返回一個對象的內置ObjectMonitor對象
m->Recycle();
m->_Responsible = NULL ;
m->OwnerIsThread = 0 ;
m->_recursions = 0 ;
m->_SpinDuration = ObjectMonitor::Knob_SpinLimit ;//設置自旋獲取重量級鎖的次數
//CAS操作標識Mark Word正在膨脹
markOop cmp = (markOop) Atomic::cmpxchg_ptr (markOopDesc::INFLATING(), object->mark_addr(), mark) ;
if (cmp != mark) {
omRelease (Self, m, true) ;
continue ; //如果上述CAS操作失敗,自旋等待膨脹完成
}
m->set_header(dmw) ;
m->set_owner(mark->locker());//設置ObjectMonitor的_owner為擁有對象輕量級鎖的線程,而不是當前正在inflate的線程
m->set_object(object);
/**
*省略了部分代碼
**/
return m ;
}
}
}
輕量級鎖膨脹流程圖:
現在來回答下之前提出的問題:為什么在撤銷輕量級鎖的時候會有失敗的可能?
假設thread1擁有了輕量級鎖,Mark Word指向thread1棧幀,thread2請求鎖的時候,就會膨脹初始化ObjectMonitor對象,將Mark Word更新為指向ObjectMonitor的指針,那么在thread1退出的時候,CAS操作會失敗,因為Mark Word不再指向thread1的棧幀,這個時候thread1自旋等待infalte完畢,執行重量級鎖的退出操作
重量級鎖
重量級鎖的獲取入口:
void ATTR ObjectMonitor::enter(TRAPS) {
Thread * const Self = THREAD ;
void * cur ;
cur = Atomic::cmpxchg_ptr (Self, &_owner, NULL) ;
if (cur == NULL) {
assert (_recursions == 0 , "invariant") ;
assert (_owner == Self, "invariant") ;
return ;
}
if (cur == Self) {
_recursions ++ ;
return ;
}
if (Self->is_lock_owned ((address)cur)) {
assert (_recursions == 0, "internal state error");
_recursions = 1 ;
// Commute owner from a thread-specific on-stack BasicLockObject address to
// a full-fledged "Thread *".
_owner = Self ;
OwnerIsThread = 1 ;
return ;
}
/**
*上述部分在前面已經分析過,不再累述
**/
Self->_Stalled = intptr_t(this) ;
//TrySpin是一個自旋獲取鎖的操作,此處就不列出源碼了
if (Knob_SpinEarly && TrySpin (Self) > 0) {
Self->_Stalled = 0 ;
return ;
}
/*
*省略部分代碼
*/
for (;;) {
EnterI (THREAD) ;
/**
*省略了部分代碼
**/
}
}
進入EnterI (TRAPS)方法(這段代碼個人覺得很有意思):
void ATTR ObjectMonitor::EnterI (TRAPS) {
Thread * Self = THREAD ;
if (TryLock (Self) > 0) {
//這下不自旋了,我就默默的TryLock一下
return ;
}
DeferredInitialize () ;
//此處又有自旋獲取鎖的操作
if (TrySpin (Self) > 0) {
return ;
}
/**
*到此,自旋終於全失敗了,要入隊掛起了
**/
ObjectWaiter node(Self) ; //將Thread封裝成ObjectWaiter結點
Self->_ParkEvent->reset() ;
node._prev = (ObjectWaiter *) 0xBAD ;
node.TState = ObjectWaiter::TS_CXQ ;
ObjectWaiter * nxt ;
for (;;) { //循環,保證將node插入隊列
node._next = nxt = _cxq ;//將node插入到_cxq隊列的首部
//CAS修改_cxq指向node
if (Atomic::cmpxchg_ptr (&node, &_cxq, nxt) == nxt) break ;
if (TryLock (Self) > 0) {//我再默默的TryLock一下,真的是不想掛起呀!
return ;
}
}
if ((SyncFlags & 16) == 0 && nxt == NULL && _EntryList == NULL) {
// Try to assume the role of responsible thread for the monitor.
// CONSIDER: ST vs CAS vs { if (Responsible==null) Responsible=Self }
Atomic::cmpxchg_ptr (Self, &_Responsible, NULL) ;
}
TEVENT (Inflated enter - Contention) ;
int nWakeups = 0 ;
int RecheckInterval = 1 ;
for (;;) {
if (TryLock (Self) > 0) break ;//臨死之前,我再TryLock下
if ((SyncFlags & 2) && _Responsible == NULL) {
Atomic::cmpxchg_ptr (Self, &_Responsible, NULL) ;
}
if (_Responsible == Self || (SyncFlags & 1)) {
TEVENT (Inflated enter - park TIMED) ;
Self->_ParkEvent->park ((jlong) RecheckInterval) ;
RecheckInterval *= 8 ;
if (RecheckInterval > 1000) RecheckInterval = 1000 ;
} else {
TEVENT (Inflated enter - park UNTIMED) ;
Self->_ParkEvent->park() ; //終於掛起了
}
if (TryLock(Self) > 0) break ;
/**
*后面代碼省略
**/
}
try了那么多次lock,接下來看下TryLock:
int ObjectMonitor::TryLock (Thread * Self) {
for (;;) {
void * own = _owner ;
if (own != NULL) return 0 ;//如果有線程還擁有着重量級鎖,退出
//CAS操作將_owner修改為當前線程,操作成功return>0
if (Atomic::cmpxchg_ptr (Self, &_owner, NULL) == NULL) {
return 1 ;
}
//CAS更新失敗return<0
if (true) return -1 ;
}
}
重量級鎖獲取入口流程圖:
重量級鎖的出口:
void ATTR ObjectMonitor::exit(TRAPS) {
Thread * Self = THREAD ;
if (THREAD != _owner) {
if (THREAD->is_lock_owned((address) _owner)) {
_owner = THREAD ;
_recursions = 0 ;
OwnerIsThread = 1 ;
} else {
TEVENT (Exit - Throw IMSX) ;
if (false) {
THROW(vmSymbols::java_lang_IllegalMonitorStateException());
}
return;
}
}
if (_recursions != 0) {
_recursions--; // 如果_recursions次數不為0.自減
TEVENT (Inflated exit - recursive) ;
return ;
}
if ((SyncFlags & 4) == 0) {
_Responsible = NULL ;
}
for (;;) {
if (Knob_ExitPolicy == 0) {
OrderAccess::release_store_ptr (&_owner, NULL) ; // drop the lock
OrderAccess::storeload() ;
if ((intptr_t(_EntryList)|intptr_t(_cxq)) == 0 || _succ != NULL) {
TEVENT (Inflated exit - simple egress) ;
return ;
}
TEVENT (Inflated exit - complex egress) ;
if (Atomic::cmpxchg_ptr (THREAD, &_owner, NULL) != NULL) {
return ;
}
TEVENT (Exit - Reacquired) ;
} else {
if ((intptr_t(_EntryList)|intptr_t(_cxq)) == 0 || _succ != NULL) {
OrderAccess::release_store_ptr (&_owner, NULL) ;
OrderAccess::storeload() ;
if (_cxq == NULL || _succ != NULL) {
TEVENT (Inflated exit - simple egress) ;
return ;
}
if (Atomic::cmpxchg_ptr (THREAD, &_owner, NULL) != NULL) {
TEVENT (Inflated exit - reacquired succeeded) ;
return ;
}
TEVENT (Inflated exit - reacquired failed) ;
} else {
TEVENT (Inflated exit - complex egress) ;
}
}
ObjectWaiter * w = NULL ;
int QMode = Knob_QMode ;
if (QMode == 2 && _cxq != NULL) {
/**
*模式2:cxq隊列的優先權大於EntryList,直接從cxq隊列中取出一個線程結點,准備喚醒
**/
w = _cxq ;
ExitEpilog (Self, w) ;
return ;
}
if (QMode == 3 && _cxq != NULL) {
/**
*模式3:將cxq隊列插入到_EntryList尾部
**/
w = _cxq ;
for (;;) {
//CAS操作取出cxq隊列首結點
ObjectWaiter * u = (ObjectWaiter *) Atomic::cmpxchg_ptr (NULL, &_cxq, w) ;
if (u == w) break ;
w = u ; //更新w,自旋
}
ObjectWaiter * q = NULL ;
ObjectWaiter * p ;
for (p = w ; p != NULL ; p = p->_next) {
guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
p->TState = ObjectWaiter::TS_ENTER ; //改變ObjectWaiter狀態
//下面兩句為cxq隊列反向構造一條鏈,即將cxq變成雙向鏈表
p->_prev = q ;
q = p ;
}
ObjectWaiter * Tail ;
//獲得_EntryList尾結點
for (Tail = _EntryList ; Tail != NULL && Tail->_next != NULL ; Tail = Tail->_next) ;
if (Tail == NULL) {
_EntryList = w ;//_EntryList為空,_EntryList=w
} else {
//將w插入_EntryList隊列尾部
Tail->_next = w ;
w->_prev = Tail ;
}
}
if (QMode == 4 && _cxq != NULL) {
/**
*模式四:將cxq隊列插入到_EntryList頭部
**/
w = _cxq ;
for (;;) {
ObjectWaiter * u = (ObjectWaiter *) Atomic::cmpxchg_ptr (NULL, &_cxq, w) ;
if (u == w) break ;
w = u ;
}
ObjectWaiter * q = NULL ;
ObjectWaiter * p ;
for (p = w ; p != NULL ; p = p->_next) {
guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
p->TState = ObjectWaiter::TS_ENTER ;
p->_prev = q ;
q = p ;
}
if (_EntryList != NULL) {
//q為cxq隊列最后一個結點
q->_next = _EntryList ;
_EntryList->_prev = q ;
}
_EntryList = w ;
}
w = _EntryList ;
if (w != NULL) {
ExitEpilog (Self, w) ;//從_EntryList中喚醒線程
return ;
}
w = _cxq ;
if (w == NULL) continue ; //如果_cxq和_EntryList隊列都為空,自旋
for (;;) {
//自旋再獲得cxq首結點
ObjectWaiter * u = (ObjectWaiter *) Atomic::cmpxchg_ptr (NULL, &_cxq, w) ;
if (u == w) break ;
w = u ;
}
/**
*下面執行的是:cxq不為空,_EntryList為空的情況
**/
if (QMode == 1) {//結合前面的代碼,如果QMode == 1,_EntryList不為空,直接從_EntryList中喚醒線程
// QMode == 1 : drain cxq to EntryList, reversing order
// We also reverse the order of the list.
ObjectWaiter * s = NULL ;
ObjectWaiter * t = w ;
ObjectWaiter * u = NULL ;
while (t != NULL) {
guarantee (t->TState == ObjectWaiter::TS_CXQ, "invariant") ;
t->TState = ObjectWaiter::TS_ENTER ;
//下面的操作是雙向鏈表的倒置
u = t->_next ;
t->_prev = u ;
t->_next = s ;
s = t;
t = u ;
}
_EntryList = s ;//_EntryList為倒置后的cxq隊列
} else {
// QMode == 0 or QMode == 2
_EntryList = w ;
ObjectWaiter * q = NULL ;
ObjectWaiter * p ;
for (p = w ; p != NULL ; p = p->_next) {
guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
p->TState = ObjectWaiter::TS_ENTER ;
//構造成雙向的
p->_prev = q ;
q = p ;
}
}
if (_succ != NULL) continue;
w = _EntryList ;
if (w != NULL) {
ExitEpilog (Self, w) ; //從_EntryList中喚醒線程
return ;
}
}
}
ExitEpilog用來喚醒線程,代碼如下:
void ObjectMonitor::ExitEpilog (Thread * Self, ObjectWaiter * Wakee) {
assert (_owner == Self, "invariant") ;
_succ = Knob_SuccEnabled ? Wakee->_thread : NULL ;
ParkEvent * Trigger = Wakee->_event ;
Wakee = NULL ;
OrderAccess::release_store_ptr (&_owner, NULL) ;
OrderAccess::fence() ;
if (SafepointSynchronize::do_call_back()) {
TEVENT (unpark before SAFEPOINT) ;
}
DTRACE_MONITOR_PROBE(contended__exit, this, object(), Self);
Trigger->unpark() ; //喚醒線程
// Maintain stats and report events to JVMTI
if (ObjectMonitor::_sync_Parks != NULL) {
ObjectMonitor::_sync_Parks->inc() ;
}
}
重量級鎖出口流程圖:
自旋
通過對源碼的分析,發現多處存在自旋和tryLock操作,那么這些操作好不好,如果tryLock過少,大部分線程都會掛起,因為在擁有對象鎖的線程釋放鎖后不能及時感知,導致用戶態和核心態狀態轉換較多,效率低下,極限思維就是:沒有自旋,所有線程掛起,如果tryLock過多,存在兩個問題:1. 即使自旋避免了掛起,但是自旋的代價超過了掛起,得不償失,那我還不如不要自旋了。 2. 如果自旋仍然不能避免大部分掛起的話,那就是又自旋又掛起,效率太低。極限思維就是:無限自旋,白白浪費了cpu資源,所以在代碼中每個自旋和tryLock的插入應該都是經過測試后決定的。
編譯期間鎖優化
鎖消除
還是先看一下簡潔的代碼
public class test {
public String test(String s1,String s2) {
return s1+s2;
}
}
javac javap后:
public class test {
public test();
Code:
0: aload_0
1: invokespecial #1 // Method java/lang/Object."<init>":()V
4: return
public java.lang.String test(java.lang.String, java.lang.String);
Code:
0: new #2 // class java/lang/StringBuilder
3: dup
4: invokespecial #3 // Method java/lang/StringBuilder."<init>":()V
7: aload_1
8: invokevirtual #4 // Method java/lang/StringBuilder.append:(Ljava/lang/String;)Ljava/lang/StringBuilder;
11: aload_2
12: invokevirtual #4 // Method java/lang/StringBuilder.append:(Ljava/lang/String;)Ljava/lang/StringBuilder;
15: invokevirtual #5 // Method java/lang/StringBuilder.toString:()Ljava/lang/String;
18: areturn
}
上述字節碼等價成java代碼為:
public class test {
public String test(String s1,String s2) {
StringBuilder sb = new StringBuilder();
sb.append(s1);
sb.append(s2);
return sb.toString();
}
}
sb的append方法是同步的,但是sb是在方法內部,每個運行的線程都會實例化一個StringBuilder對象,在私有棧持有該對象引用(其他線程無法得到),也就是說sb不存在多線程訪問,那么在jvm運行期間,即時編譯器就會將鎖消除
鎖粗化
將前面的代碼稍微變一下:
public class test {
StringBuilder sb = new StringBuilder();
public String test(String s1,String s2) {
sb.append(s1);
sb.append(s2);
return sb.toString();
}
}
首先可以確定的是這段代碼不能鎖消除優化,因為sb是類的實例變量,會被多線程訪問,存在線程安全問題,那么訪問test方法的時候就會對sb對象,加鎖,解鎖,加鎖,解鎖,很顯然這一過程將會大大降低效率,因此在即時編譯的時候會進行鎖粗化,在sb.appends(s1)之前加鎖,在sb.append(s2)執行完后釋放鎖。
總結
引入偏向鎖的目的:在只有單線程執行情況下,盡量減少不必要的輕量級鎖執行路徑,輕量級鎖的獲取及釋放依賴多次CAS原子指令,而偏向鎖只依賴一次CAS原子指令置換ThreadID,之后只要判斷線程ID為當前線程即可,偏向鎖使用了一種等到競爭出現才釋放鎖的機制,消除偏向鎖的開銷還是蠻大的。如果同步資源或代碼一直都是多線程訪問的,那么消除偏向鎖這一步驟對你來說就是多余的,可以通過-XX:-UseBiasedLocking=false來關閉
引入輕量級鎖的目的:在多線程交替執行同步塊的情況下,盡量避免重量級鎖引起的性能消耗(用戶態和核心態轉換),但是如果多個線程在同一時刻進入臨界區,會導致輕量級鎖膨脹升級重量級鎖,所以輕量級鎖的出現並非是要替代重量級鎖
重入:對於不同級別的鎖都有重入策略,偏向鎖:單線程獨占,重入只用檢查threadId等於該線程;輕量級鎖:重入將棧幀中lock record的header設置為null,重入退出,只用彈出棧幀,直到最后一個重入退出CAS寫回數據釋放鎖;重量級鎖:重入_recursions++,重入退出_recursions--,_recursions=0時釋放鎖
最后放一張摘自網上的一張大圖(保存本地,方便食用):
ps:源碼解讀與流程圖都是原創,可能會有貽誤,歡迎指正,過程不易,如果覺得幫到了你,就點個推薦吧!
參考資料
http://blog.csdn.net/javazejian/article/details/72828483
http://www.cnblogs.com/paddix/p/5405678.html
http://www.cnblogs.com/paddix/p/5367116.html
https://www.jianshu.com/p/c5058b6fe8e5
http://blog.csdn.net/zqz_zqz/article/details/70233767
http://blog.csdn.net/u012465296/article/details/53022317
https://www.zhihu.com/question/39009953?sort=created
http://blog.sina.com.cn/s/blog_c038e9930102v2hs.html
http://hg.openjdk.java.net/jdk7/jdk7/hotspot/file/9b0ca45cd756/src/share/vm/runtime/synchronizer.cpp
http://hg.openjdk.java.net/jdk7/jdk7/hotspot/file/9b0ca45cd756/src/share/vm/runtime/objectMonitor.cpp