背景
因為想知道java中的關鍵字,對應的操作系統級別的api是啥,本來打算整理幾個我知道的出來,但是,尷尬的是,我發現java里最重要的synchronized關鍵字,我就不知道它對應的api是什么。
redis中如何獲取鎖
在redis源碼里,線程如果要進入一個同步區(只能單線程進入的代碼塊),會先獲取一個互斥量,如果獲取到了,則可以執行;否則,會阻塞在在這個互斥量上。
互斥量類型定義:
// 定義互斥量
static pthread_mutex_t bio_mutex[REDIS_BIO_NUM_OPS];
類型為 pthread_mutex_t。
互斥量初始化:
使用互斥量前,要先初始化后,才能使用:
int pthread_mutex_init(pthread_mutex_t *restrict mutex,
const pthread_mutexattr_t *restrict attr);
pthread_mutex_init 這個函數是操作系統提供出來的api,不過應該是類unix系統才有這個。
shell中執行man pthread_mutex_init,可以看到:
The pthread_mutex_init() function shall initialize the mutex referenced by mutex with attributes specified by attr. If attr is NULL, the default mutex
attributes are used; the effect shall be the same as passing the address of a default mutex attributes object. Upon successful initialization, the state of
the mutex becomes initialized and unlocked.
pthread_mutex_init 初始化參數mutex指定的互斥量,,使用attr中指定的屬性。如果attr為空,使用默認參數。
成功初始化后,互斥量的狀態變為已初始化、未鎖定。
如何鎖定、解鎖互斥量
// 1
pthread_mutex_lock(&bio_mutex[type]);
// 2
pthread_mutex_unlock(&bio_mutex[type]);
- 1處,加鎖
- 2處,解鎖
我們可以看下linux下執行man pthread_mutex_lock后,看到的幫助:
#include <pthread.h>
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_trylock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);
The mutex object referenced by mutex shall be locked by calling pthread_mutex_lock(). If the mutex is already locked, the calling thread shall block until the mutex becomes available. This operation shall return with the mutex object referenced by mutex in the locked state with the calling thread as its owner.
可以重點看下上面那句注釋:調用pthread_mutex_lock,會導致參數mutext引用的互斥量被鎖定;如果該互斥量早已被鎖定,則調用線程將被阻塞。
redis中線程使用互斥量的例子
void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) {
struct bio_job *job = zmalloc(sizeof(*job));
job->time = time(NULL);
job->arg1 = arg1;
job->arg2 = arg2;
job->arg3 = arg3;
// 1 鎖定
pthread_mutex_lock(&bio_mutex[type]);
// 將新工作推入隊列
listAddNodeTail(bio_jobs[type],job);
bio_pending[type]++;
pthread_cond_signal(&bio_condvar[type]);
// 2 解鎖
pthread_mutex_unlock(&bio_mutex[type]);
}
如要了解更多互斥量,可以看看這篇文章,寫的不錯:
Linux C 編程——多線程和互斥鎖mutex
jdk中synchronized,不考慮輕鎖、偏向鎖,最終有用到前面的互斥量嗎
參考文章
現在不用考慮各種優化,只考慮最終synchronized已經升級為重量級鎖之后的表現,會使用前面的互斥量嗎?
由於作者本身也是半桶水,搞了半天也沒把jdk的源碼調試環境搞起來,只能看看代碼了,順便結合網絡上的一些文章,不過結論應該可靠。
大家先可以參考這兩篇文章:
JVM:鎖實現(synchronized&JSR166)行為分析和相關源碼
簡易流程梳理
我這里也簡單列舉一下整個過程,就從下面這里開始:
// Fast Monitor Enter/Exit
// This the fast monitor enter. The interpreter and compiler use
// some assembly copies of this code. Make sure update those code
// if the following function is changed.
// The implementation is extremely sensitive to race condition. Be careful.
void ObjectSynchronizer::fast_enter(Handle obj, BasicLock* lock, bool attempt_rebias, TRAPS) {
if (UseBiasedLocking) {
if (!SafepointSynchronize::is_at_safepoint()) {
BiasedLocking::Condition cond = BiasedLocking::revoke_and_rebias(obj, attempt_rebias, THREAD);
if (cond == BiasedLocking::BIAS_REVOKED_AND_REBIASED) {
return;
}
} else {
assert(!attempt_rebias, "can not rebias toward VM thread");
BiasedLocking::revoke_at_safepoint(obj);
}
assert(!obj->mark()->has_bias_pattern(), "biases should be revoked by now");
}
// 1
slow_enter (obj, lock, THREAD) ;
}
1處,前面都是偏向鎖相關的東西,先跳過,進入slow_enter。
void ObjectSynchronizer::slow_enter(Handle obj, BasicLock* lock, TRAPS) {
markOop mark = obj->mark();
assert(!mark->has_bias_pattern(), "should not see bias pattern here");
// 1
if (mark->is_neutral()) {
// Anticipate successful CAS -- the ST of the displaced mark must
// be visible <= the ST performed by the CAS.
// 2
lock->set_displaced_header(mark);
// 3
if (mark == (markOop) Atomic::cmpxchg_ptr(lock, obj()->mark_addr(), mark)) {
TEVENT (slow_enter: release stacklock) ;
return ;
}
// Fall through to inflate() ...
} else
if (mark->has_locker() && THREAD->is_lock_owned((address)mark->locker())) {
assert(lock != mark->locker(), "must not re-lock the same lock");
assert(lock != (BasicLock*)obj->mark(), "don't relock with same BasicLock");
lock->set_displaced_header(NULL);
return;
}
// The object header will never be displaced to this lock,
// so it does not matter what the value is, except that it
// must be non-zero to avoid looking like a re-entrant lock,
// and must not look locked either.
lock->set_displaced_header(markOopDesc::unused_mark());
// 2
ObjectSynchronizer::inflate(THREAD, obj())->enter(THREAD);
}
-
1處,mark->is_neutral(),判斷對象頭,是否是無鎖狀態,neutral本來是中立的意思,這里表示無鎖。
-
2處,如果無鎖,則調用lock的方法,lock本身是當前線程在棧內持有的對象,調用lock的set_displaced_header方法,參數為待加鎖對象(堆里)的對象頭,意思是,把待加鎖對象的對象頭,設置到線程的棧內變量里。
lock變量的class 類型如下:
class BasicLock VALUE_OBJ_CLASS_SPEC { friend class VMStructs; private: // 1 volatile markOop _displaced_header; public: markOop displaced_header() const { return _displaced_header; } // 2 void set_displaced_header(markOop header) { _displaced_header = header; } void print_on(outputStream* st) const; // move a basic lock (used during deoptimization void move_to(oop obj, BasicLock* dest); static int displaced_header_offset_in_bytes() { return offset_of(BasicLock, _displaced_header); } };結合這里的1、2處代碼,上面那句,意思就是,把待加鎖對象的對象頭,存儲到lock變量 _displaced_header屬性。
-
3處,這里比較復雜。
Atomic::cmpxchg_ptr(lock, obj()->mark_addr(), mark)這一句里面,cmpxchg_ptr,定義為:
inline static intptr_t cmpxchg_ptr(intptr_t exchange_value, volatile intptr_t* dest, intptr_t compare_value);這一句就是平時我們說的那種cas操作,表示,如果第二個參數,dest指向的值,和第三個參數,compare_value的值相等,則把第二個參數中的值,設為參數1的值。
重點來看,第二個參數,是什么鬼意思?
Handle obj,說明obj是Handle類型,
class Handle VALUE_OBJ_CLASS_SPEC { private: oop* _handle; protected: // 2 oop obj() const { return *_handle; } public: //1 oop operator () () const { return obj(); }那么,obj()的意思,應該就是,代碼1處,應該是進行了操作符重載,所以會調用obj()方法,obj方法,請看2處,會返回 屬性_handle,當然,這里對屬性進行了解引用。
所以,基本的意思就是,返回_handle這個屬性,執行的oop對象。
然后,再說說參數3,參數3就是mark。
Atomic::cmpxchg_ptr(lock, obj()->mark_addr(), mark)這個mark,是在代碼開頭這樣被賦值的。
markOop mark = obj->mark();那,我們看看obj的mark方法就行。(不知道為啥,在Handle類里,沒找到這個方法,不知道為啥,難道是有什么特殊語法嗎。。。),不過這個mark的意思,肯定就是對象里的對象頭無誤。
然后,第1個參數呢,就是lock,就是那個,如果上面的第二、三個參數相等,就將本參數,即,本線程,棧內對象lock的地址,設置到對象頭中,表示,該對象已經被本線程加鎖了。
-
4處,這里表示如果是當前線程重復進入:
if (mark->has_locker() && THREAD->is_lock_owned((address)mark->locker())) { assert(lock != mark->locker(), "must not re-lock the same lock"); assert(lock != (BasicLock*)obj->mark(), "don't relock with same BasicLock"); lock->set_displaced_header(NULL); return; } -
5處,開始膨脹為重量級鎖,並進入重量級鎖的爭奪
// --接前面的代碼 lock->set_displaced_header(markOopDesc::unused_mark()); ObjectSynchronizer::inflate(THREAD, obj())->enter(THREAD);這里,會先通過調用ObjectSynchronizer::inflate(THREAD, obj()),來完成輕量級鎖到重量級鎖的升級。
// Inflate light weight monitor to heavy weight monitor static ObjectMonitor* inflate(Thread * Self, oop obj);這個注釋就很清晰,升級輕鎖為重鎖,並且,會返回對象的monitor,即對應的重鎖對象。
膨脹的過程,太復雜,看不懂(心累。。),有興趣的可以看看這篇。
https://github.com/farmerjohngit/myblog/issues/15
膨脹后,返回了對應的monitor,然后進入其enter方法。
然后enter也是茫茫多的代碼,根據網上博客,即:
JVM:鎖實現(synchronized&JSR166)行為分析和相關源碼
會進入以下方法:
ObjectMonitor::EnterI這個里面,也是茫茫多的代碼,而且更可怕的是,注釋也多得很,快比代碼多了。。
void ATTR ObjectMonitor::EnterI (TRAPS) { Thread * Self = THREAD ; assert (Self->is_Java_thread(), "invariant") ; assert (((JavaThread *) Self)->thread_state() == _thread_blocked , "invariant") ; // 1 Try the lock - TATAS if (TryLock (Self) > 0) { assert (_succ != Self , "invariant") ; assert (_owner == Self , "invariant") ; assert (_Responsible != Self , "invariant") ; return ; } DeferredInitialize () ; //2 We try one round of spinning *before* enqueueing Self. if (TrySpin (Self) > 0) { assert (_owner == Self , "invariant") ; assert (_succ != Self , "invariant") ; assert (_Responsible != Self , "invariant") ; return ; } //3 The Spin failed -- Enqueue and park the thread ... //4 Enqueue "Self" on ObjectMonitor's _cxq ObjectWaiter node(Self) ; Self->_ParkEvent->reset() ; node._prev = (ObjectWaiter *) 0xBAD ; node.TState = ObjectWaiter::TS_CXQ ; // 5 Push "Self" onto the front of the _cxq. // Once on cxq/EntryList, Self stays on-queue until it acquires the lock. // Note that spinning tends to reduce the rate at which threads // enqueue and dequeue on EntryList|cxq. ObjectWaiter * nxt ; for (;;) { node._next = nxt = _cxq ; if (Atomic::cmpxchg_ptr (&node, &_cxq, nxt) == nxt) break ; // Interference - the CAS failed because _cxq changed. Just retry. // As an optional optimization we retry the lock. if (TryLock (Self) > 0) { assert (_succ != Self , "invariant") ; assert (_owner == Self , "invariant") ; assert (_Responsible != Self , "invariant") ; return ; } } TEVENT (Inflated enter - Contention) ; int nWakeups = 0 ; int RecheckInterval = 1 ; for (;;) { if (TryLock (Self) > 0) break ; assert (_owner != Self, "invariant") ; if ((SyncFlags & 2) && _Responsible == NULL) { Atomic::cmpxchg_ptr (Self, &_Responsible, NULL) ; } //6 park self if (_Responsible == Self || (SyncFlags & 1)) { TEVENT (Inflated enter - park TIMED) ; Self->_ParkEvent->park ((jlong) RecheckInterval) ; // Increase the RecheckInterval, but clamp the value. RecheckInterval *= 8 ; if (RecheckInterval > 1000) RecheckInterval = 1000 ; } else { TEVENT (Inflated enter - park UNTIMED) ; // 7 Self->_ParkEvent->park() ; } if (TryLock(Self) > 0) break ; ... } ... return ; }- 上面的方法不是完整的,因為太長,刪減了,只留了我們要關注的那部分,1處,嘗試獲取鎖
- 2處,嘗試自旋一下
- 3處,自旋也失敗了,准備進入隊列,並阻塞自己,類似於aqs的實現
- 4、5處都是入隊的相關操作
- 6處,阻塞自己,判斷是阻塞一陣時間,還是一直阻塞。
- 7處,阻塞自己。
我們這里,重點看6處,阻塞自己,采用的方法為:
// self的定義,類型為線程 Thread * Self = THREAD ; ... Self->_ParkEvent->park() ;我們看看這個類:
thread.hpp public: volatile intptr_t _Stalled ; volatile int _TypeTag ; // 1 ParkEvent * _ParkEvent ; // for synchronized() // 2 ParkEvent * _SleepEvent ; // for Thread.sleep // 3 ParkEvent * _MutexEvent ; // for native internal Mutex/Monitor // 4 ParkEvent * _MuxEvent ; // for low-level muxAcquire-muxRelease有點意思,竟然有好幾個ParkEvent類型的屬性,第一個,看注釋,就是用來,synchronized使用的;
第二個是Thread.sleep使用的,第三個是jdk自身的native方法用的
ParkEvent是什么
JVM:鎖實現(synchronized&JSR166)行為分析和相關源碼
大家可以再看下這篇,因為感覺寫得不錯。
這個類本身的屬性,看得一知半解,但是它的父類,是這個。
class ParkEvent : public os::PlatformEvent
這個PlatformEvent有意思的很,它是平台相關的。

可以看到,它有5個同名的類,分別在5個文件,分別是什么os_windows.hpp、os_linux.hpp、os_solaris.hpp,盲猜也知道,是不同操作系統下的實現。
我們看看linux下,
class PlatformEvent : public CHeapObj<mtInternal> {
private:
double CachePad [4] ; // increase odds that _mutex is sole occupant of cache line
volatile int _Event ;
volatile int _nParked ;
// 1
pthread_mutex_t _mutex [1] ;
pthread_cond_t _cond [1] ;
double PostPad [2] ;
Thread * _Assoc ;
public: // TODO-FIXME: make dtor private
~PlatformEvent() { guarantee (0, "invariant") ; }
public:
PlatformEvent() {
int status;
status = pthread_cond_init (_cond, os::Linux::condAttr());
assert_status(status == 0, status, "cond_init");
status = pthread_mutex_init (_mutex, NULL);
assert_status(status == 0, status, "mutex_init");
_Event = 0 ;
_nParked = 0 ;
_Assoc = NULL ;
}
// Use caution with reset() and fired() -- they may require MEMBARs
void reset() { _Event = 0 ; }
int fired() { return _Event; }
void park () ;
void unpark () ;
int TryPark () ;
int park (jlong millis) ; // relative timed-wait only
void SetAssociation (Thread * a) { _Assoc = a ; }
} ;
看到1處了嗎,原來,阻塞自己還是用了pthread_mutex_t啊。
看看park怎么實現的:
void os::PlatformEvent::park() { // AKA "down()"
// Invariant: Only the thread associated with the Event/PlatformEvent
// may call park().
// TODO: assert that _Assoc != NULL or _Assoc == Self
int v ;
for (;;) {
v = _Event ;
if (Atomic::cmpxchg (v-1, &_Event, v) == v) break ;
}
guarantee (v >= 0, "invariant") ;
if (v == 0) {
//1 Do this the hard way by blocking ...
int status = pthread_mutex_lock(_mutex);
assert_status(status == 0, status, "mutex_lock");
guarantee (_nParked == 0, "invariant") ;
++ _nParked ;
while (_Event < 0) {
status = pthread_cond_wait(_cond, _mutex);
// for some reason, under 2.7 lwp_cond_wait() may return ETIME ...
// Treat this the same as if the wait was interrupted
if (status == ETIME) { status = EINTR; }
assert_status(status == 0 || status == EINTR, status, "cond_wait");
}
-- _nParked ;
_Event = 0 ;
// 2
status = pthread_mutex_unlock(_mutex);
...
}
guarantee (_Event >= 0, "invariant") ;
}
1處,加鎖;
2處,解鎖。
所以,我們本文的答案找到了。
看看其他平台下呢?

其他平台就不一一截圖了,除了windows,都是用的pthread_mutex_lock。
總結
為了這個答案,花了一天時間,值得嗎,有點不值得,時間花太長了,不過也值得,至少問題解決了。
不過,沒把調試環境搭起來太慘了,各種頭文件找不到,跳轉都點不動,基本上都是全文搜索。。。
謝謝大家。
