這里的線程安全,是指一個讀線程和一個寫線程,讀寫兩個線程是安全的,而不是說多個讀線程和多個寫線程是安全的。。
在程序設計中,我們有時會遇到這樣的情況,一個線程將數據寫到一個buffer中,另外一個線程從中讀數據。所以這里就有多線程競爭的問題。通常的解決辦法是對競爭資源加鎖。但是,一般加鎖的損耗較高。其實,對於這樣的一個線程寫,一個線程讀的特殊情況,可以以一種簡單的無鎖RingBuffer來實現。這樣代碼的運行效率很高。
本文借鑒了Disruptor項目代碼。
代碼我在github上放了一份,需要的同學可以去下載(RingBuffer.java)。本文最后也會附上一份。
代碼的基本原理如下。

如圖所示,假定buffer的長度是bufferSize. 我們設置兩個指針。head指向的是下一次讀的位置,而tail指向的是下一次寫的位置。由於這里是環形buffer (ring buffer),這里就有一個問題,怎樣判斷buffer是滿或者空。這里采用的規則是,buffer的最后一個單元不存儲數據。所以,如果head == tail,那么說明buffer為空。如果 head == tail + 1 (mod bufferSize),那么說明buffer滿了。
接下來就是最重要的內容了:怎樣以無鎖的方式進行線程安全的buffer的讀寫操作。基本原理是這樣的。在進行讀操作的時候,我們只修改head的值,而在寫操作的時候我們只修改tail的值。在寫操作時,我們在寫入內容到buffer之后才修改tail的值;而在進行讀操作的時候,我們會讀取tail的值並將其賦值給copyTail。賦值操作是原子操作。所以在讀到copyTail之后,從head到copyTail之間一定是有數據可以讀的,不會出現數據沒有寫入就進行讀操作的情況。同樣的,讀操作完成之后,才會修改head的數值;而在寫操作之前會讀取head的值判斷是否有空間可以用來寫數據。所以,這時候tail到head - 1之間一定是有空間可以寫數據的,而不會出現一個位置的數據還沒有讀出就被寫操作覆蓋的情況。這樣就保證了RingBuffer的線程安全性。
最后附上代碼供參考。歡迎批評指正,也歡迎各種討論!
1 public class RingBuffer {
2
3 private final static int bufferSize = 1024;
4 private String[] buffer = new String[bufferSize];
5 private int head = 0;
6 private int tail = 0;
7
8 private Boolean empty() {
9 return head == tail;
10 }
11 private Boolean full() {
12 return (tail + 1) % bufferSize == head;
13 }
14 public Boolean put(String v) {
15 if (full()) {
16 return false;
17 }
18 buffer[tail] = v;
19 tail = (tail + 1) % bufferSize;
20 return true;
21 }
22 public String get() {
23 if (empty()) {
24 return null;
25 }
26 String result = buffer[head];
27 head = (head + 1) % bufferSize;
28 return result;
29 }
30 public String[] getAll() {
31 if (empty()) {
32 return new String[0];
33 }
34 int copyTail = tail;
35 int cnt = head < copyTail ? copyTail - head : bufferSize - head + copyTail;
36 String[] result = new String[cnt];
37 if (head < copyTail) {
38 for (int i = head; i < copyTail; i++) {
39 result[i - head] = buffer[i];
40 }
41 } else {
42 for (int i = head; i < bufferSize; i++) {
43 result[i - head] = buffer[i];
44 }
45 for (int i = 0; i < copyTail; i++) {
46 result[bufferSize - head + i] = buffer[i];
47 }
48 }
49 head = copyTail;
50 return result;
51 }
52 }

