簡單用法
下面以一個簡單的例子來看看Disruptor的用法:生產者發送一個long型的消息,消費者接收消息並打印出來。
首先,我們定義一個Event:
public class LongEvent
{
private long value;
public void set(long value)
{
this.value = value;
}
}
為了使Disruptor對這些Event提前分配,我們需要創建一個EventFactory:
import com.lmax.disruptor.EventFactory;
public class LongEventFactory implements EventFactory<LongEvent>
{
public LongEvent newInstance()
{
return new LongEvent();
}
}
事件已經定義好了,我們需要創建一個消費者來處理這些消息。我們需要消費者在終端打印接收到的消息的值:
import com.lmax.disruptor.EventHandler;
public class LongEventHandler implements EventHandler<LongEvent>
{
public void onEvent(LongEvent event, long sequence, boolean endOfBatch)
{
System.out.println("Event: " + event);
}
}
我們需要創建一個事件源,我們假設數據來是來自一些I/O設備,比如網絡或文件。
import com.lmax.disruptor.RingBuffer;
public class LongEventProducer
{
private final RingBuffer<LongEvent> ringBuffer;
public LongEventProducer(RingBuffer<LongEvent> ringBuffer)
{
this.ringBuffer = ringBuffer;
}
public void onData(ByteBuffer bb)
{
long sequence = ringBuffer.next(); // Grab the next sequence
try
{
LongEvent event = ringBuffer.get(sequence); // Get the entry in the Disruptor
// for the sequence
event.set(bb.getLong(0)); // Fill with data
}
finally
{
ringBuffer.publish(sequence);
}
}
}
顯而易見的是,事件發布比使用簡單隊列更為復雜,這是事件預分配的緣故,如果2個生產者發布消息,即在RingBuffer中聲明插槽發布可用數據,而且需要在try/finally塊中發布。如果我們在RingBuffer中申請了一個插槽(RingBuffer.next()),那么我們必須發布這個Sequence,如果沒有發布或者發布失敗,那么Disruptor的將會failed,具體點來講,在多生產者的情況下,這將導致消費者失速,而且除了重啟沒有其他辦法可以解決了。
使用version3.0的Translator
Disruptor的version3.0給開發者提供了Lambda表達式風格的API,將RingBuffer的復雜性封裝起來。所以,對於3.0以后的首選方法是通過API中發布事件的Translator部分來發布事件,例如:
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.EventTranslatorOneArg;
public class LongEventProducerWithTranslator
{
private final RingBuffer<LongEvent> ringBuffer;
public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer)
{
this.ringBuffer = ringBuffer;
}
private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR =
new EventTranslatorOneArg<LongEvent, ByteBuffer>()
{
public void translateTo(LongEvent event, long sequence, ByteBuffer bb)
{
event.set(bb.getLong(0));
}
};
public void onData(ByteBuffer bb)
{
ringBuffer.publishEvent(TRANSLATOR, bb);
}
}
這種方法的另一個優點是可以將Translator代碼拖到單獨的類中,並方便對其進行單元測試。Disruptor提供了很多不同的接口(EventTranslator, EventTranslatorOneArg, EventTranslatorTwoArg等等)可以提供Translator。原因是可以表示為靜態類或者非捕獲的lambda作為參數通過Translator傳遞給RingBuffer。
最后一步是將所有的東西串聯起來,可以手動的連接所有的組件,但是可能會有點復雜,因此可以通過DSL來簡化構建,一些復雜的選項不是通過DSL提供的,但是可以適用於大多數情況。
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.RingBuffer;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
public class LongEventMain
{
public static void main(String[] args) throws Exception
{
// Executor that will be used to construct new threads for consumers
Executor executor = Executors.newCachedThreadPool();
// Specify the size of the ring buffer, must be power of 2.
int bufferSize = 1024;
// Construct the Disruptor
Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, executor);
// Connect the handler
disruptor.handleEventsWith((event, sequence, endOfBatch) -> System.out.println("Event: " + event));
// Start the Disruptor, starts all threads running
disruptor.start();
// Get the ring buffer from the Disruptor to be used for publishing.
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
ByteBuffer bb = ByteBuffer.allocate(8);
for (long l = 0; true; l++)
{
bb.putLong(0, l);
ringBuffer.publishEvent((event, sequence, buffer) -> event.set(buffer.getLong(0)), bb);
Thread.sleep(1000);
}
}
}
注意不再需要一些類(例如處理程序、翻譯程序),還需要注意lambda用於publishEvent()
指的是傳入的參數,如果我們把代碼寫成:
ByteBuffer bb = ByteBuffer.allocate(8);
for (long l = 0; true; l++)
{
bb.putLong(0, l);
ringBuffer.publishEvent((event, sequence) -> event.set(bb.getLong(0)));
Thread.sleep(1000);
}
這將創建一個可捕獲的lambda,這意味需要通過publishEvent()
實例化一個對象以保存ByteBuffer bb
,這將創建額外的垃圾,為了降低GC壓力,則將調用傳遞給lambda的調用應該是首選。
方法的引用可以用lambda來代替,fashion的寫法:
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.RingBuffer;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
public class LongEventMain
{
public static void handleEvent(LongEvent event, long sequence, boolean endOfBatch)
{
System.out.println(event);
}
public static void translate(LongEvent event, long sequence, ByteBuffer buffer)
{
event.set(buffer.getLong(0));
}
public static void main(String[] args) throws Exception
{
// Executor that will be used to construct new threads for consumers
Executor executor = Executors.newCachedThreadPool();
// Specify the size of the ring buffer, must be power of 2.
int bufferSize = 1024;
// Construct the Disruptor
Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, executor);
// Connect the handler
disruptor.handleEventsWith(LongEventMain::handleEvent);
// Start the Disruptor, starts all threads running
disruptor.start();
// Get the ring buffer from the Disruptor to be used for publishing.
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
ByteBuffer bb = ByteBuffer.allocate(8);
for (long l = 0; true; l++)
{
bb.putLong(0, l);
ringBuffer.publishEvent(LongEventMain::translate, bb);
Thread.sleep(1000);
}
}
}
基本變種
使用上述方法在最常用的場景中已經夠用了,但是如果你希望追求極致,還能夠針對需要運行的硬件和軟件,利用一些優化選項來提高性能。調優主要有兩種方法:單或多生產者和可選等待策略。
單或多生產者
一個在concurrent系統提高性能的最好方式是單個Write的原則,這同樣也適用於Disruptor,如果你在這種只有一個單線程的生產者發送Event的的Disruptor中,那么你能利用這個來獲得額外的性能。
public class LongEventMain
{
public static void main(String[] args) throws Exception
{
//.....
// Construct the Disruptor with a SingleProducerSequencer
Disruptor<LongEvent> disruptor = new Disruptor(
factory, bufferSize, ProducerType.SINGLE, new BlockingWaitStrategy(), executor);
//.....
}
}
能有多少性能優勢可以通過 OneToOne performance test測試,Tests運行在i7 Sandy Bridge MacBook Air。
多生產者:
Run 0, Disruptor=26,553,372 ops/sec
Run 1, Disruptor=28,727,377 ops/sec
Run 2, Disruptor=29,806,259 ops/sec
Run 3, Disruptor=29,717,682 ops/sec
Run 4, Disruptor=28,818,443 ops/sec
Run 5, Disruptor=29,103,608 ops/sec
Run 6, Disruptor=29,239,766 ops/sec
單生產者:
Run 0, Disruptor=89,365,504 ops/sec
Run 1, Disruptor=77,579,519 ops/sec
Run 2, Disruptor=78,678,206 ops/sec
Run 3, Disruptor=80,840,743 ops/sec
Run 4, Disruptor=81,037,277 ops/sec
Run 5, Disruptor=81,168,831 ops/sec
Run 6, Disruptor=81,699,346 ops/sec
選擇等待策略
默認的Disruptor使用的等待策略是BlockingWaitStrategy(阻塞等待策略),阻塞等待策略內部使用的是典型的鎖和Condition條件變量來處理線程喚醒,這是最慢的等待策略了,但是在CPU使用率上最保守而且能給予肯定的一致性行為。
休眠等待策略(SleepingWaitStrategy)
和BlockingWaitStrategy一樣,為了保證CPU的使用率,不是通過一個簡單的忙等待循環,而是使用一個叫LockSupport.parknanos(1)
在循環中,在典型的Linux系統中將暫停60µs,這樣顯然是有優勢的,生產者線程不需要增加計數器,也不需要信號條件。但是在生產者和消費者之間移動事件的平均延遲事件會更高。休眠等待策略在不需要低延遲的情況下效果最好,但是對生成線程的影響是很小的,一個常見的用例是異步日志記錄。
退出等待策略(YieldingWaitStrategy)
退出等待策略是兩個等待策略中可以被用到低延遲的策略,消耗CPU來提高實時性。YieldingWaitStrategy會忙等待Sequence增加為適當的值。在循環體中Thread.yield()
將會允許其他線程運行,當需要非常高的性能和事件處理線程的的數量小於邏輯核心的總數時,這是推薦的等待策略,啟用了超線程。
自旋等待策略(BusySpinWaitStrategy)
自旋等待策略是常見的等待策略,但是對部署環境也有很高的要求。自旋等待策略應該只能被用在處理線程數小於實際核數的時候。另外,超線程應該被關閉。
從RingBuffer清除對象
當通過Disruptor傳遞數據的時候,對象的存活壽命可能比預期的要長,為了避免這種情況發生,可能需要在處理完事件以后清除它。如果只有一個單事件處理程序,那么在一個處理程序中清除data就夠了。如果有一個事件處理鏈,那么需要在鏈結束的地方利用特殊的處理程序來清除對象。
class ObjectEvent<T>
{
T val;
void clear()
{
val = null;
}
}
public class ClearingEventHandler<T> implements EventHandler<ObjectEvent<T>>
{
public void onEvent(ObjectEvent<T> event, long sequence, boolean endOfBatch)
{
// Failing to call clear here will result in the
// object associated with the event to live until
// it is overwritten once the ring buffer has wrapped
// around to the beginning.
event.clear();
}
}
public static void main(String[] args)
{
Disruptor<ObjectEvent<String>> disruptor = new Disruptor<>(
() -> ObjectEvent<String>(), bufferSize, executor);
disruptor
.handleEventsWith(new ProcessingEventHandler())
.then(new ClearingObjectHandler());
}
參考資料: