前兩天逛博客的時候看到有個人寫了一篇博客說ReentrantLock比synchronized慢,這就很違反我的認知了,詳細看了他的博客和測試代碼,發現了他測試的不嚴謹,並在評論中友好地指出了他的問題,結果他直接把博客給刪了 刪了 了……
很多老一輩的程序猿對有synchronized有個 性能差 的刻板印象,然后極力推崇使用java.util.concurrent包中的lock類,如果你追問他們synchronized和lock實現性能差多少,估計沒幾個人能答出來。 說到這你是不是也很想知道我的測試結果? synchronized與ReentrantLock所實現的功能差不多,用途也大幅度重合,索性我們就來測測這二者的性能差異。
實測結果
測試平台:jdk11, MacBook Pro (13-inch, 2017) , jmh測試
測試代碼如下:
public class LockTest {
private static Object lock = new Object();
private static ReentrantLock reentrantLock = new ReentrantLock();
private static long cnt = 0;
@Benchmark
@Measurement(iterations = 2)
@Threads(10)
@Fork(0)
@Warmup(iterations = 5, time = 10)
public void testWithoutLock(){
doSomething();
}
@Benchmark
@Measurement(iterations = 2)
@Threads(10)
@Fork(0)
@Warmup(iterations = 5, time = 10)
public void testReentrantLock(){
reentrantLock.lock();
doSomething();
reentrantLock.unlock();
}
@Benchmark
@Measurement(iterations = 2)
@Threads(10)
@Fork(0)
@Warmup(iterations = 5, time = 10)
public void testSynchronized(){
synchronized (lock) {
doSomething();
}
}
private void doSomething() {
cnt += 1;
if (cnt >= (Long.MAX_VALUE >> 1)) {
cnt = 0;
}
}
public static void main(String[] args) {
Options options = new OptionsBuilder().include(LockTest.class.getSimpleName()).build();
try {
new Runner(options).run();
} catch (Exception e) {
} finally {
}
}
}
Benchmark Mode Cnt Score Error Units
LockTest.testReentrantLock thrpt 2 32283819.289 ops/s
LockTest.testSynchronized thrpt 2 25325244.320 ops/s
LockTest.testWithoutLock thrpt 2 641215542.492 ops/s
沒錯synchronized性能確實更差,但就只差20%左右,第一次測試的時候我也挺詫異的,知道synchronized會差,但那種預期中幾個數量級的差異卻沒有出現。 於是我又把@Threads線程數調大了,增加了多線程之間競爭的可能性,得到了如下的結果。
Benchmark Mode Cnt Score Error Units
LockTest.testReentrantLock thrpt 2 29464798.051 ops/s
LockTest.testSynchronized thrpt 2 22346035.066 ops/s
LockTest.testWithoutLock thrpt 2 383047064.795 ops/s
性能差異稍有拉開,但還是在同一量級上。
結論
無可置疑,synchronized的性能確實要比synchronized差個20%-30%,那是不是代碼中所有用到synchronized的地方都應該換成lock? 非也,仔細想想看,ReentrantLock幾乎和可以替代任何使用synchronized的場景,而且性能更好,那為什么jdk一直要留着這個關鍵詞呢?而且完全沒有任何想要廢棄它的想法。
黑格爾說過存在即合理, synchronized因多線程應運而生,它的存在也大幅度簡化了Java多線程的開發。沒錯,它的優勢就是使用簡單,你不需要顯示去加減鎖,相比之下ReentrantLock的使用就繁瑣的多了,你加完鎖之后還得考慮到各種情況下的鎖釋放,稍不留神就一個bug埋下了。
但ReentrantLock的繁瑣之下,它也提供了更復雜的api,足以應對更多更復雜的需求,詳細可以參考我之前的博客ReentrantLock源碼解析。
如今synchronized與ReentrantLock二者的性能差異不再是選誰的主要因素,你在做選擇的時候更應該考慮的是其易用性、功能性和代碼的可維護性…… 二者30%的性能差異決定不了什么,如果你真想優化代碼的性能,你應該選擇的是其他的切入點,而不是斤斤計較這個,切記不要揀了芝麻丟了西瓜。
文章本該到這里就結束了,但我仍然好奇為什么synchronized給老一輩java程序猿留下了性能差的印象,無奈jdk1.5及之前的資料已經比較久遠 不太好找,但是jdk1.6對synchronized的性能提升做了啥還是很好找的。
jdk對synchronized優化了啥?
如果你對代碼段加了synchronized的,jvm編譯后就會在其前后分別插入monitorenter和monitorexit指令,如下:
void onlyMe(Foo f) {
synchronized(f) {
doSomething();
}
}
編譯后:
Method void onlyMe(Foo)
0 aload_1 // Push f
1 dup // Duplicate it on the stack
2 astore_2 // Store duplicate in local variable 2
3 monitorenter // Enter the monitor associated with f
4 aload_0 // Holding the monitor, pass this and...
5 invokevirtual #5 // ...call Example.doSomething()V
8 aload_2 // Push local variable 2 (f)
9 monitorexit // Exit the monitor associated with f
10 goto 18 // Complete the method normally
13 astore_3 // In case of any throw, end up here
14 aload_2 // Push local variable 2 (f)
15 monitorexit // Be sure to exit the monitor!
16 aload_3 // Push thrown value...
17 athrow // ...and rethrow value to the invoker
18 return // Return in the normal case
Exception table:
From To Target Type
4 10 13 any
13 16 13 any
加鎖和釋放鎖的性能消耗其實就體現在了 monitorenter和monitorexit兩個指令上了,如果是優化性能,肯定也是在這兩個指令上優化了。 查閱《Java並發編程的藝術》發現,Java6為了減少鎖獲取和釋放帶來的性能消耗,引入了鎖分級的策略。 將鎖狀態分別分成 無鎖、偏向鎖、輕量級鎖、重量級鎖 四個狀態,其性能依次遞減。但所幸因為局部性的存在,大多數並發情況下偏向鎖或者輕量級鎖就能滿足我們的需求,而且鎖只有在競爭嚴重的情況下才會升級,所以大多數情況下synchronized性能也不會太差。
最后我在jdk11u的源碼里找到了monitorenter和monitorexit的x86版本的實現(匯編指令和具體平台相關)獻給大家,歡迎有志之士研讀下。
//-----------------------------------------------------------------------------
// Synchronization
//
// Note: monitorenter & exit are symmetric routines; which is reflected
// in the assembly code structure as well
//
// Stack layout:
//
// [expressions ] <--- rsp = expression stack top
// ..
// [expressions ]
// [monitor entry] <--- monitor block top = expression stack bot
// ..
// [monitor entry]
// [frame data ] <--- monitor block bot
// ...
// [saved rbp ] <--- rbp
void TemplateTable::monitorenter() {
transition(atos, vtos);
// check for NULL object
__ null_check(rax);
const Address monitor_block_top(
rbp, frame::interpreter_frame_monitor_block_top_offset * wordSize);
const Address monitor_block_bot(
rbp, frame::interpreter_frame_initial_sp_offset * wordSize);
const int entry_size = frame::interpreter_frame_monitor_size() * wordSize;
Label allocated;
Register rtop = LP64_ONLY(c_rarg3) NOT_LP64(rcx);
Register rbot = LP64_ONLY(c_rarg2) NOT_LP64(rbx);
Register rmon = LP64_ONLY(c_rarg1) NOT_LP64(rdx);
// initialize entry pointer
__ xorl(rmon, rmon); // points to free slot or NULL
// find a free slot in the monitor block (result in rmon)
{
Label entry, loop, exit;
__ movptr(rtop, monitor_block_top); // points to current entry,
// starting with top-most entry
__ lea(rbot, monitor_block_bot); // points to word before bottom
// of monitor block
__ jmpb(entry);
__ bind(loop);
// check if current entry is used
__ cmpptr(Address(rtop, BasicObjectLock::obj_offset_in_bytes()), (int32_t) NULL_WORD);
// if not used then remember entry in rmon
__ cmovptr(Assembler::equal, rmon, rtop); // cmov => cmovptr
// check if current entry is for same object
__ cmpptr(rax, Address(rtop, BasicObjectLock::obj_offset_in_bytes()));
// if same object then stop searching
__ jccb(Assembler::equal, exit);
// otherwise advance to next entry
__ addptr(rtop, entry_size);
__ bind(entry);
// check if bottom reached
__ cmpptr(rtop, rbot);
// if not at bottom then check this entry
__ jcc(Assembler::notEqual, loop);
__ bind(exit);
}
__ testptr(rmon, rmon); // check if a slot has been found
__ jcc(Assembler::notZero, allocated); // if found, continue with that one
// allocate one if there's no free slot
{
Label entry, loop;
// 1. compute new pointers // rsp: old expression stack top
__ movptr(rmon, monitor_block_bot); // rmon: old expression stack bottom
__ subptr(rsp, entry_size); // move expression stack top
__ subptr(rmon, entry_size); // move expression stack bottom
__ mov(rtop, rsp); // set start value for copy loop
__ movptr(monitor_block_bot, rmon); // set new monitor block bottom
__ jmp(entry);
// 2. move expression stack contents
__ bind(loop);
__ movptr(rbot, Address(rtop, entry_size)); // load expression stack
// word from old location
__ movptr(Address(rtop, 0), rbot); // and store it at new location
__ addptr(rtop, wordSize); // advance to next word
__ bind(entry);
__ cmpptr(rtop, rmon); // check if bottom reached
__ jcc(Assembler::notEqual, loop); // if not at bottom then
// copy next word
}
// call run-time routine
// rmon: points to monitor entry
__ bind(allocated);
// Increment bcp to point to the next bytecode, so exception
// handling for async. exceptions work correctly.
// The object has already been poped from the stack, so the
// expression stack looks correct.
__ increment(rbcp);
// store object
__ movptr(Address(rmon, BasicObjectLock::obj_offset_in_bytes()), rax);
__ lock_object(rmon);
// check to make sure this monitor doesn't cause stack overflow after locking
__ save_bcp(); // in case of exception
__ generate_stack_overflow_check(0);
// The bcp has already been incremented. Just need to dispatch to
// next instruction.
__ dispatch_next(vtos);
}
void TemplateTable::monitorexit() {
transition(atos, vtos);
// check for NULL object
__ null_check(rax);
const Address monitor_block_top(
rbp, frame::interpreter_frame_monitor_block_top_offset * wordSize);
const Address monitor_block_bot(
rbp, frame::interpreter_frame_initial_sp_offset * wordSize);
const int entry_size = frame::interpreter_frame_monitor_size() * wordSize;
Register rtop = LP64_ONLY(c_rarg1) NOT_LP64(rdx);
Register rbot = LP64_ONLY(c_rarg2) NOT_LP64(rbx);
Label found;
// find matching slot
{
Label entry, loop;
__ movptr(rtop, monitor_block_top); // points to current entry,
// starting with top-most entry
__ lea(rbot, monitor_block_bot); // points to word before bottom
// of monitor block
__ jmpb(entry);
__ bind(loop);
// check if current entry is for same object
__ cmpptr(rax, Address(rtop, BasicObjectLock::obj_offset_in_bytes()));
// if same object then stop searching
__ jcc(Assembler::equal, found);
// otherwise advance to next entry
__ addptr(rtop, entry_size);
__ bind(entry);
// check if bottom reached
__ cmpptr(rtop, rbot);
// if not at bottom then check this entry
__ jcc(Assembler::notEqual, loop);
}
參考資料
- Java Virtual Machine Specification 3.14. Synchronization
- 《Java並發編程的藝術》 2.2 synchronized的實現原理和應用