在之前的一篇博客中,寫了一個在特殊情況下,也就是只有一個讀線程和一個寫線程的情況下,的無鎖隊列的實現。其中甚至都沒有利用特殊的原子加減操作,只是普通的運算。這樣做的原因是,即使是特殊的原子加減操作,也比普通的加減運算復雜度高很多。因此文中的實現方法可以達到很高的運行效率。
但是,有的情況下並不是只有一個讀線程和一個寫線程。越是一般化的實現,支持的情況越多,但是往往損失的性能也越多。作者看到過一個實現(http://www.oschina.net/code/snippet_732357_13465),可以實現一個讀線程,多個寫線程,或者相反,一個寫線程,多個讀線程。這篇文章中作者采用了原子加減的操作。所以這樣的實現的運行效率會稍有點低。那么,如果情況稍特殊一點,比如,有一個線程讀,兩個線程寫,這時可以有一個特殊的實現能夠達到很高的效率嗎?作者折騰了一番,找到了一個方法。
原理如下圖所示。
基本原理是,將整個buffer分成兩份,兩個寫線程分別寫入其中的一部分。這樣就避免了兩個寫線程之間的沖突。而避免讀線程和寫線程之間沖突的原理,則和之前的博客中的原理相同,也就是,寫線程只修改tail的值,而讀線程只修改head的值。這樣,就不會出現數據還沒讀就被覆蓋,或者數據還沒寫就被讀出的情況了。
這樣的實現有一些缺點。一個是空間利用率不夠高,會有浪費,因為有可能一部分寫滿了而另外一部分還空着;其次,是不能保證讀出的順序和寫入的順序是一致的。不過,實際上有兩個線程寫的時候,這點本來就不重要。沒辦法保證那個線程先寫,哪個后寫。最后,在這個實現中,是buffer的兩個部分輪流讀數據。這個策略可以根據兩個寫線程的數據速率進行調整。
但是,這個實現有一個最大的好處,就是速度快。同樣沒有采用原子加減操作,而只是普通的加減操作。因此實現了很高的運行速度。在符合兩個寫線程,一個讀線程,並且對運行速度有很高要求的場合中,這個實現是一個很好的選擇。
最后,附上代碼。代碼同樣可以在github上找到 https://github.com/drneverend/buffers/blob/master/ringbuffer/RingBuffer1r2w.java
1 public class RingBuffer { 2 private final static int bufferSize = 1024; 3 private final static int halfBufferSize = bufferSize / 2; 4 private String[] buffer = new String[bufferSize]; 5 private int head1 = 0; 6 private int tail1 = 0; 7 private int head2 = 0; 8 private int tail2 = 0; 9 private int nextReadBuffer = 0; 10 11 private Boolean empty1() { 12 return head1 == tail1; 13 } 14 private Boolean empty2() { 15 return head2 == tail2; 16 } 17 private Boolean empty() { 18 return empty1() && empty2(); 19 } 20 private Boolean full1() { 21 return (tail1 + 1) % halfBufferSize == head1; 22 } 23 private Boolean full2() { 24 return (tail2 + 1) % halfBufferSize == head2; 25 } 26 public Boolean put1(String v) { 27 if (full1()) { 28 return false; 29 } 30 buffer[tail1] = v; 31 tail1 = (tail1 + 1) % halfBufferSize; 32 return true; 33 } 34 public Boolean put2(String v) { 35 if (full2()) { 36 return false; 37 } 38 buffer[tail2 + halfBufferSize] = v; 39 tail2 = (tail2 + 1) % halfBufferSize; 40 return true; 41 } 42 public String get() { 43 if (empty()) { 44 return null; 45 } 46 String result = null; 47 if (nextReadBuffer == 0 && !empty1() || nextReadBuffer == 1 && empty2()) { 48 result = buffer[head1]; 49 head1 = (head1 + 1) % halfBufferSize; 50 } else { 51 result = buffer[head2 + halfBufferSize]; 52 head2 = (head2 + 1) % halfBufferSize; 53 } 54 55 nextReadBuffer = (nextReadBuffer + 1) % 2; 56 57 return result; 58 } 59 }