在分布式開發中,鎖是線程控制的重要途徑。Java為此也提供了2種鎖機制,synchronized和lock。做為Java愛好者,自然少不了對比一下這2種機制,也能從中學到些分布式開發需要注意的地方。
我們先從最簡單的入手,逐步分析這2種的區別。
一、synchronized和lock的用法區別
synchronized:在需要同步的對象中加入此控制,synchronized可以加在方法上,也可以加在特定代碼塊中,括號中表示需要鎖的對象。
lock:需要顯示指定起始位置和終止位置。一般使用ReentrantLock類做為鎖,多個線程中必須要使用一個ReentrantLock類做為對象才能保證鎖的生效。且在加鎖和解鎖處需要通過lock()和unlock()顯示指出。所以一般會在finally塊中寫unlock()以防死鎖。
用法區別比較簡單,這里不贅述了,如果不懂的可以看看Java基本語法。
二、synchronized和lock性能區別
synchronized是托管給JVM執行的,而lock是java寫的控制鎖的代碼。在Java1.5中,synchronize是性能低效的。因為這是一個重量級操作,需要調用操作接口,導致有可能加鎖消耗的系統時間比加鎖以外的操作還多。相比之下使用Java提供的Lock對象,性能更高一些。但是到了Java1.6,發生了變化。synchronize在語義上很清晰,可以進行很多優化,有適應自旋,鎖消除,鎖粗化,輕量級鎖,偏向鎖等等。導致在Java1.6上synchronize的性能並不比Lock差。官方也表示,他們也更支持synchronize,在未來的版本中還有優化余地。
說到這里,還是想提一下這2中機制的具體區別。據我所知,synchronized原始采用的是CPU悲觀鎖機制,即線程獲得的是獨占鎖。獨占鎖意味着其他線程只能依靠阻塞來等待線程釋放鎖。而在CPU轉換線程阻塞時會引起線程上下文切換,當有很多線程競爭鎖的時候,會引起CPU頻繁的上下文切換導致效率很低。
而Lock用的是樂觀鎖方式。所謂樂觀鎖就是,每次不加鎖而是假設沒有沖突而去完成某項操作,如果因為沖突失敗就重試,直到成功為止。樂觀鎖實現的機制就是CAS操作(Compare and Swap)。我們可以進一步研究ReentrantLock的源代碼,會發現其中比較重要的獲得鎖的一個方法是compareAndSetState。這里其實就是調用的CPU提供的特殊指令。
現代的CPU提供了指令,可以自動更新共享數據,而且能夠檢測到其他線程的干擾,而 compareAndSet() 就用這些代替了鎖定。這個算法稱作非阻塞算法,意思是一個線程的失敗或者掛起不應該影響其他線程的失敗或掛起的算法。
我也只是了解到這一步,具體到CPU的算法如果感興趣的讀者還可以在查閱下,如果有更好的解釋也可以給我留言,我也學習下。
三、synchronized和lock用途區別
synchronized原語和ReentrantLock在一般情況下沒有什么區別,但是在非常復雜的同步應用中,請考慮使用ReentrantLock,特別是遇到下面2種需求的時候。
1.某個線程在等待一個鎖的控制權的這段時間需要中斷
2.需要分開處理一些wait-notify,ReentrantLock里面的Condition應用,能夠控制notify哪個線程
3.具有公平鎖功能,每個到來的線程都將排隊等候
下面細細道來……
先說第一種情況,ReentrantLock的lock機制有2種,忽略中斷鎖和響應中斷鎖,這給我們帶來了很大的靈活性。比如:如果A、B2個線程去競爭鎖,A線程得到了鎖,B線程等待,但是A線程這個時候實在有太多事情要處理,就是一直不返回,B線程可能就會等不及了,想中斷自己,不再等待這個鎖了,轉而處理其他事情。這個時候ReentrantLock就提供了2種機制,第一,B線程中斷自己(或者別的線程中斷它),但是ReentrantLock不去響應,繼續讓B線程等待,你再怎么中斷,我全當耳邊風(synchronized原語就是如此);第二,B線程中斷自己(或者別的線程中斷它),ReentrantLock處理了這個中斷,並且不再等待這個鎖的到來,完全放棄。(如果你沒有了解java的中斷機制,請參考下相關資料,再回頭看這篇文章,80%的人根本沒有真正理解什么是java的中斷,呵呵)
這里來做個試驗,首先搞一個Buffer類,它有讀操作和寫操作,為了不讀到臟數據,寫和讀都需要加鎖,我們先用synchronized原語來加鎖,如下:
1 |
public class Buffer { |
2 |
3 |
private Object lock; |
4 |
5 |
public Buffer() { |
6 |
lock = this ; |
7 |
} |
8 |
9 |
public void write() { |
10 |
synchronized (lock) { |
11 |
long startTime = System.currentTimeMillis(); |
12 |
System.out.println( "開始往這個buff寫入數據…" ); |
13 |
for (;;) // 模擬要處理很長時間 |
14 |
{ |
15 |
if (System.currentTimeMillis() |
16 |
- startTime > Integer.MAX_VALUE) |
17 |
break ; |
18 |
} |
19 |
System.out.println( "終於寫完了" ); |
20 |
} |
21 |
} |
22 |
23 |
public void read() { |
24 |
synchronized (lock) { |
25 |
System.out.println( "從這個buff讀數據" ); |
26 |
} |
27 |
} |
28 |
} |
接着,我們來定義2個線程,一個線程去寫,一個線程去讀。
1 |
public class Writer extends Thread { |
2 |
3 |
private Buffer buff; |
4 |
5 |
public Writer(Buffer buff) { |
6 |
this .buff = buff; |
7 |
} |
8 |
9 |
@Override |
10 |
public void run() { |
11 |
buff.write(); |
12 |
} |
13 |
14 |
} |
15 |
16 |
public class Reader extends Thread { |
17 |
18 |
private Buffer buff; |
19 |
20 |
public Reader(Buffer buff) { |
21 |
this .buff = buff; |
22 |
} |
23 |
24 |
@Override |
25 |
public void run() { |
26 |
27 |
buff.read(); //這里估計會一直阻塞 |
28 |
29 |
System.out.println( "讀結束" ); |
30 |
31 |
} |
32 |
33 |
} |
好了,寫一個Main來試驗下,我們有意先去“寫”,然后讓“讀”等待,“寫”的時間是無窮的,就看“讀”能不能放棄了。
1 |
public class Test { |
2 |
public static void main(String[] args) { |
3 |
Buffer buff = new Buffer(); |
4 |
5 |
final Writer writer = new Writer(buff); |
6 |
final Reader reader = new Reader(buff); |
7 |
8 |
writer.start(); |
9 |
reader.start(); |
10 |
11 |
new Thread( new Runnable() { |
12 |
13 |
@Override |
14 |
public void run() { |
15 |
long start = System.currentTimeMillis(); |
16 |
for (;;) { |
17 |
//等5秒鍾去中斷讀 |
18 |
if (System.currentTimeMillis() |
19 |
- start > 5000 ) { |
20 |
System.out.println( "不等了,嘗試中斷" ); |
21 |
reader.interrupt(); |
22 |
break ; |
23 |
} |
24 |
25 |
} |
26 |
27 |
} |
28 |
}).start(); |
29 |
30 |
} |
31 |
} |
我們期待“讀”這個線程能退出等待鎖,可是事與願違,一旦讀這個線程發現自己得不到鎖,就一直開始等待了,就算它等死,也得不到鎖,因為寫線程要21億秒才能完成 T_T ,即使我們中斷它,它都不來響應下,看來真的要等死了。這個時候,ReentrantLock給了一種機制讓我們來響應中斷,讓“讀”能伸能屈,勇敢放棄對這個鎖的等待。我們來改寫Buffer這個類,就叫BufferInterruptibly吧,可中斷緩存。
1 |
import java.util.concurrent.locks.ReentrantLock; |
2 |
3 |
public class BufferInterruptibly { |
4 |
5 |
private ReentrantLock lock = new ReentrantLock(); |
6 |
7 |
public void write() { |
8 |
lock.lock(); |
9 |
try { |
10 |
long startTime = System.currentTimeMillis(); |
11 |
System.out.println( "開始往這個buff寫入數據…" ); |
12 |
for (;;) // 模擬要處理很長時間 |
13 |
{ |
14 |
if (System.currentTimeMillis() |
15 |
- startTime > Integer.MAX_VALUE) |
16 |
break ; |
17 |
} |
18 |
System.out.println( "終於寫完了" ); |
19 |
} finally { |
20 |
lock.unlock(); |
21 |
} |
22 |
} |
23 |
24 |
public void read() throws InterruptedException { |
25 |
lock.lockInterruptibly(); // 注意這里,可以響應中斷 |
26 |
try { |
27 |
System.out.println( "從這個buff讀數據" ); |
28 |
} finally { |
29 |
lock.unlock(); |
30 |
} |
31 |
} |
32 |
33 |
} |
當然,要對reader和writer做響應的修改
1 |
public class Reader extends Thread { |
2 |
3 |
private BufferInterruptibly buff; |
4 |
5 |
public Reader(BufferInterruptibly buff) { |
6 |
this .buff = buff; |
7 |
} |
8 |
9 |
@Override |
10 |
public void run() { |
11 |
12 |
try { |
13 |
buff.read(); //可以收到中斷的異常,從而有效退出 |
14 |
} catch (InterruptedException e) { |
15 |
System.out.println( "我不讀了" ); |
16 |
} |
17 |
18 |
System.out.println( "讀結束" ); |
19 |
20 |
} |
21 |
22 |
} |
23 |
24 |
/** |
25 |
* Writer倒不用怎么改動 |
26 |
*/ |
27 |
public class Writer extends Thread { |
28 |
29 |
private BufferInterruptibly buff; |
30 |
31 |
public Writer(BufferInterruptibly buff) { |
32 |
this .buff = buff; |
33 |
} |
34 |
35 |
@Override |
36 |
public void run() { |
37 |
buff.write(); |
38 |
} |
39 |
40 |
} |
41 |
42 |
public class Test { |
43 |
public static void main(String[] args) { |
44 |
BufferInterruptibly buff = new BufferInterruptibly(); |
45 |
46 |
final Writer writer = new Writer(buff); |
47 |
final Reader reader = new Reader(buff); |
48 |
49 |
writer.start(); |
50 |
reader.start(); |
51 |
52 |
new Thread( new Runnable() { |
53 |
54 |
@Override |
55 |
public void run() { |
56 |
long start = System.currentTimeMillis(); |
57 |
for (;;) { |
58 |
if (System.currentTimeMillis() |
59 |
- start > 5000 ) { |
60 |
System.out.println( "不等了,嘗試中斷" ); |
61 |
reader.interrupt(); |
62 |
break ; |
63 |
} |
64 |
65 |
} |
66 |
67 |
} |
68 |
}).start(); |
69 |
70 |
} |
71 |
} |
這次“讀”線程接收到了lock.lockInterruptibly()中斷,並且有效處理了這個“異常”。
至於第二種情況,ReentrantLock可以與Condition的配合使用,Condition為ReentrantLock鎖的等待和釋放提供控制邏輯。
例如,使用ReentrantLock加鎖之后,可以通過它自身的Condition.await()方法釋放該鎖,線程在此等待Condition.signal()方法,然后繼續執行下去。await方法需要放在while循環中,因此,在不同線程之間實現並發控制,還需要一個volatile的變量,boolean是原子性的變量。因此,一般的並發控制的操作邏輯如下所示:
1 |
volatile boolean isProcess = false ; |
2 |
ReentrantLock lock = new ReentrantLock(); |
3 |
Condtion processReady = lock.newCondtion(); |
4 |
thread: run() { |
5 |
lock.lock(); |
6 |
isProcess = true ; |
7 |
try { |
8 |
while (!isProcessReady) { //isProcessReady 是另外一個線程的控制變量 |
9 |
processReady.await(); //釋放了lock,在此等待signal |
10 |
} catch (InterruptedException e) { |
11 |
Thread.currentThread().interrupt(); |
12 |
} finally { |
13 |
lock.unlock(); |
14 |
isProcess = false ; |
15 |
} |
16 |
} |
17 |
} |
18 |
} |
這里只是代碼使用的一段簡化,下面我們看Hadoop的一段摘取的源碼:
1 |
private class MapOutputBuffer<K extends Object, V extends Object> |
2 |
implements MapOutputCollector<K, V>, IndexedSortable { |
3 |
... |
4 |
boolean spillInProgress; |
5 |
final ReentrantLock spillLock = new ReentrantLock(); |
6 |
final Condition spillDone = spillLock.newCondition(); |
7 |
final Condition spillReady = spillLock.newCondition(); |
8 |
volatile boolean spillThreadRunning = false ; |
9 |
final SpillThread spillThread = new SpillThread(); |
10 |
... |
11 |
public MapOutputBuffer(TaskUmbilicalProtocol umbilical, JobConf job, |
12 |
TaskReporter reporter |
13 |
) throws IOException, ClassNotFoundException { |
14 |
... |
15 |
spillInProgress = false ; |
16 |
spillThread.setDaemon( true ); |
17 |
spillThread.setName( "SpillThread" ); |
18 |
spillLock.lock(); |
19 |
try { |
20 |
spillThread.start(); |
21 |
while (!spillThreadRunning) { |
22 |
spillDone.await(); |
23 |
} |
24 |
} catch (InterruptedException e) { |
25 |
throw new IOException( "Spill thread failed to initialize" , e); |
26 |
} finally { |
27 |
spillLock.unlock(); |
28 |
} |
29 |
} |
30 |
31 |
protected class SpillThread extends Thread { |
32 |
33 |
@Override |
34 |
public void run() { |
35 |
spillLock.lock(); |
36 |
spillThreadRunning = true ; |
37 |
try { |
38 |
while ( true ) { |
39 |
spillDone.signal(); |
40 |
while (!spillInProgress) { |
41 |
spillReady.await(); |
42 |
} |
43 |
try { |
44 |
spillLock.unlock(); |
45 |
sortAndSpill(); |
46 |
} catch (Throwable t) { |
47 |
sortSpillException = t; |
48 |
} finally { |
49 |
spillLock.lock(); |
50 |
if (bufend < bufstart) { |
51 |
bufvoid = kvbuffer.length; |
52 |
} |
53 |
kvstart = kvend; |
54 |
bufstart = bufend; |
55 |
spillInProgress = false ; |
56 |
} |
57 |
} |
58 |
} catch (InterruptedException e) { |
59 |
Thread.currentThread().interrupt(); |
60 |
} finally { |
61 |
spillLock.unlock(); |
62 |
spillThreadRunning = false ; |
63 |
} |
64 |
} |
65 |
} |
代碼中spillDone 就是 spillLock的一個newCondition()。調用spillDone.await()時可以釋放spillLock鎖,線程進入阻塞狀態,而等待其他線程的 spillDone.signal()操作時,就會喚醒線程,重新持有spillLock鎖。
這里可以看出,利用lock可以使我們多線程交互變得方便,而使用synchronized則無法做到這點。
最后呢,ReentrantLock這個類還提供了2種競爭鎖的機制:公平鎖和非公平鎖。這2種機制的意思從字面上也能了解個大概:即對於多線程來說,公平鎖會依賴線程進來的順序,后進來的線程后獲得鎖。而非公平鎖的意思就是后進來的鎖也可以和前邊等待鎖的線程同時競爭鎖資源。對於效率來講,當然是非公平鎖效率更高,因為公平鎖還要判斷是不是線程隊列的第一個才會讓線程獲得鎖。