這里的線程安全,是指一個讀線程和一個寫線程,讀寫兩個線程是安全的,而不是說多個讀線程和多個寫線程是安全的。。
在程序設計中,我們有時會遇到這樣的情況,一個線程將數據寫到一個buffer中,另外一個線程從中讀數據。所以這里就有多線程競爭的問題。通常的解決辦法是對競爭資源加鎖。但是,一般加鎖的損耗較高。其實,對於這樣的一個線程寫,一個線程讀的特殊情況,可以以一種簡單的無鎖RingBuffer來實現。這樣代碼的運行效率很高。
本文借鑒了Disruptor項目代碼。
代碼我在github上放了一份,需要的同學可以去下載(RingBuffer.java)。本文最后也會附上一份。
代碼的基本原理如下。
如圖所示,假定buffer的長度是bufferSize. 我們設置兩個指針。head指向的是下一次讀的位置,而tail指向的是下一次寫的位置。由於這里是環形buffer (ring buffer),這里就有一個問題,怎樣判斷buffer是滿或者空。這里采用的規則是,buffer的最后一個單元不存儲數據。所以,如果head == tail,那么說明buffer為空。如果 head == tail + 1 (mod bufferSize),那么說明buffer滿了。
接下來就是最重要的內容了:怎樣以無鎖的方式進行線程安全的buffer的讀寫操作。基本原理是這樣的。在進行讀操作的時候,我們只修改head的值,而在寫操作的時候我們只修改tail的值。在寫操作時,我們在寫入內容到buffer之后才修改tail的值;而在進行讀操作的時候,我們會讀取tail的值並將其賦值給copyTail。賦值操作是原子操作。所以在讀到copyTail之后,從head到copyTail之間一定是有數據可以讀的,不會出現數據沒有寫入就進行讀操作的情況。同樣的,讀操作完成之后,才會修改head的數值;而在寫操作之前會讀取head的值判斷是否有空間可以用來寫數據。所以,這時候tail到head - 1之間一定是有空間可以寫數據的,而不會出現一個位置的數據還沒有讀出就被寫操作覆蓋的情況。這樣就保證了RingBuffer的線程安全性。
最后附上代碼供參考。歡迎批評指正,也歡迎各種討論!
1 public class RingBuffer { 2 3 private final static int bufferSize = 1024; 4 private String[] buffer = new String[bufferSize]; 5 private int head = 0; 6 private int tail = 0; 7 8 private Boolean empty() { 9 return head == tail; 10 } 11 private Boolean full() { 12 return (tail + 1) % bufferSize == head; 13 } 14 public Boolean put(String v) { 15 if (full()) { 16 return false; 17 } 18 buffer[tail] = v; 19 tail = (tail + 1) % bufferSize; 20 return true; 21 } 22 public String get() { 23 if (empty()) { 24 return null; 25 } 26 String result = buffer[head]; 27 head = (head + 1) % bufferSize; 28 return result; 29 } 30 public String[] getAll() { 31 if (empty()) { 32 return new String[0]; 33 } 34 int copyTail = tail; 35 int cnt = head < copyTail ? copyTail - head : bufferSize - head + copyTail; 36 String[] result = new String[cnt]; 37 if (head < copyTail) { 38 for (int i = head; i < copyTail; i++) { 39 result[i - head] = buffer[i]; 40 } 41 } else { 42 for (int i = head; i < bufferSize; i++) { 43 result[i - head] = buffer[i]; 44 } 45 for (int i = 0; i < copyTail; i++) { 46 result[bufferSize - head + i] = buffer[i]; 47 } 48 } 49 head = copyTail; 50 return result; 51 } 52 }