本文譯自Dirsruptor在github上的wiki中文章:Getting Started
獲取Disruptor
Disruptor jar包可以從maven倉庫mvnrepository獲取,可以將其集成進項目的依賴管理中。
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.4.2</version>
</dependency>
### 編寫事件處理生產者和消費者
為了學習Disruptor的使用,這里以非常簡單的例子入手:生產者生產單個long型value傳遞給消費者。這里簡化消費者邏輯,只打印消費的value。首先定義攜帶數據的Event:
public class LongEvent
{
private long value;
public void set(long value)
{
this.value = value;
}
}
為了允許Disruptor能夠為我們預分配這些事件,我們需要一個EventFactory用於構造事件:
public class LongEventFactory implements EventFactory<LongEvent>
{
public LongEvent newInstance()
{
return new LongEvent();
}
}
一旦我們定義了事件,我便再需要創建事件消費者用於消費處理事件。在我們的例子中,我們只需要打印value值到控制台即可:
public class LongEventHandler implements EventHandler<LongEvent>
{
public void onEvent(LongEvent event, long sequence, boolean endOfBatch)
{
System.out.println("Event: " + event);
}
}
有了事件消費者,我們還需要事件生產者產生事件。為了簡單起見,我們假設數據來源於I/O,如:網絡或者文件。由於不同版本的Disruptor,提供了不同的方式編寫生產者。
隨着3.0版本,Disruptor通過將復雜邏輯囊括在RingBuffer中,從而提供了豐富的Lambda-style API幫助開發者構建Producer。因此從3.0之后,更偏愛使用Event Publisher/Event Translator的API發布消息:
public class LongEventProducerWithTranslator
{
private final RingBuffer<LongEvent> ringBuffer;
public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer)
{
this.ringBuffer = ringBuffer;
}
private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR =
new EventTranslatorOneArg<LongEvent, ByteBuffer>()
{
public void translateTo(LongEvent event, long sequence, ByteBuffer bb)
{
event.set(bb.getLong(0));
}
};
public void onData(ByteBuffer bb)
{
ringBuffer.publishEvent(TRANSLATOR, bb);
}
}
這種方式的另一個優勢在於Translator代碼可以被分離在單獨的類中,同時也比較容易進行無依賴的單元測試。Disruptor提供了許多不同的接口(EventTranslator, EventTranslatorOneArg, EventTranslatorTwoArg, etc.),可以通過實現這些接口提供translators。原因是允許轉換器被表示為靜態類或非捕獲lambda作為轉換方法的參數通過Ring Buffer上的調用傳遞給轉換器。
另一方式使用3.0版本之前的遺留API構建生產者發布消息,這種方式比較原始:
public class LongEventProducer
{
private final RingBuffer<LongEvent> ringBuffer;
public LongEventProducer(RingBuffer<LongEvent> ringBuffer)
{
this.ringBuffer = ringBuffer;
}
public void onData(ByteBuffer bb)
{
long sequence = ringBuffer.next(); // Grab the next sequence
try
{
LongEvent event = ringBuffer.get(sequence); // Get the entry in the Disruptor
// for the sequence
event.set(bb.getLong(0)); // Fill with data
}
finally
{
ringBuffer.publish(sequence);
}
}
}
從以上的代碼流程編寫可以看出,事件的發布比使用一個簡單的隊列要復雜。這是由於需要對事件預分配導致。對於消息的發布有兩個階段,首先在RingBuffer中聲明需要的槽位,然后再發布可用的數據。必須使用try/finally語句塊包裹消息的發布。必須現在try塊中聲明使用RingBuffer的槽位,然后再finally塊中發布使用的sequece。如果不這樣做,將可能導致Disruptor狀態的錯誤,特別是在多生產者的情況下,如果不重啟Disruptor將不能恢復。因此推薦使用EventTranslator編寫producer。
最后一步需要將以上編寫的組件連接起來。雖然可以手動連接各個組件,然而那樣可能比較復雜,因此提供了一個DSL用於構造以便簡化過程。使用DSL帶來裝配的簡化,但是卻對於很多參數無法做到更細致的控制,然而對於大多數情況,DSL還是非常適合:
public class LongEventMain
{
public static void main(String[] args) throws Exception
{
// The factory for the event
LongEventFactory factory = new LongEventFactory();
// Specify the size of the ring buffer, must be power of 2.
int bufferSize = 1024;
// Construct the Disruptor
Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, DaemonThreadFactory.INSTANCE);
// Connect the handler
disruptor.handleEventsWith(new LongEventHandler());
// Start the Disruptor, starts all threads running
disruptor.start();
// Get the ring buffer from the Disruptor to be used for publishing.
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
LongEventProducer producer = new LongEventProducer(ringBuffer);
ByteBuffer bb = ByteBuffer.allocate(8);
for (long l = 0; true; l++)
{
bb.putLong(0, l);
producer.onData(bb);
Thread.sleep(1000);
}
}
}
關於對Disruptor的接口設計的影響之一是Java 8,因為它使用了Functional Interfaces去實現Java Lambdas。在Disruptor API的大多數接口都被定義成Functional Interfaces以便Lambdas可以被使用。以上的LongEventMain可以使用Lambdas進行簡化:
public class LongEventMain
{
public static void main(String[] args) throws Exception
{
// Specify the size of the ring buffer, must be power of 2.
int bufferSize = 1024;
// Construct the Disruptor
Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);
// Connect the handler
disruptor.handleEventsWith((event, sequence, endOfBatch) -> System.out.println("Event: " + event));
// Start the Disruptor, starts all threads running
disruptor.start();
// Get the ring buffer from the Disruptor to be used for publishing.
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
ByteBuffer bb = ByteBuffer.allocate(8);
for (long l = 0; true; l++)
{
bb.putLong(0, l);
ringBuffer.publishEvent((event, sequence, buffer) -> event.set(buffer.getLong(0)), bb);
Thread.sleep(1000);
}
}
}
可以看出使用Lambdas有大量的類將不再需要,如:handler,translator等。也可以看出使用Lambdas簡化publishEvent()只僅僅涉及到參數傳遞。
然而如果將代碼改成這樣:
ByteBuffer bb = ByteBuffer.allocate(8);
for (long l = 0; true; l++)
{
bb.putLong(0, l);
ringBuffer.publishEvent((event, sequence) -> event.set(bb.getLong(0)));
Thread.sleep(1000);
}
注意這里使用了捕獲式的Lambda,意味着通過調用publishEvent()時可能需要實例化一個對象來持有ByteBuffer bb將其傳遞給lambda。這個將可能創建額外的垃圾,如果對GC壓力有嚴格要求的情況下,通過傳遞參數的方式將更加受歡迎。
使用方法引用來代理上述的lambda將能進一步簡化上述的方式,也將更時髦:
public class LongEventMain
{
public static void handleEvent(LongEvent event, long sequence, boolean endOfBatch)
{
System.out.println(event);
}
public static void translate(LongEvent event, long sequence, ByteBuffer buffer)
{
event.set(buffer.getLong(0));
}
public static void main(String[] args) throws Exception
{
// Specify the size of the ring buffer, must be power of 2.
int bufferSize = 1024;
// Construct the Disruptor
Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);
// Connect the handler
disruptor.handleEventsWith(LongEventMain::handleEvent);
// Start the Disruptor, starts all threads running
disruptor.start();
// Get the ring buffer from the Disruptor to be used for publishing.
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
ByteBuffer bb = ByteBuffer.allocate(8);
for (long l = 0; true; l++)
{
bb.putLong(0, l);
ringBuffer.publishEvent(LongEventMain::translate, bb);
Thread.sleep(1000);
}
}
}
這里對ringBuffer.publishEvent的參數使用了方法引用替換了lambda,使其更進一步簡化。
### 基本的參數設置
對於大多數場景使用方式即可。然而,如果你能確定硬件和軟件的環境便可以進一步對Disruptor的參數進行調整以提高性能。主要有兩種參數可以被調整:
- single vs. multiple producers
- alternative wait strategies
Single vs. Multiple Producers
提高並發系統的性能的最好方式是遵循Single Writer Principle,這個也在Disruptor也被應用。如果在你的場景中只僅僅是單生產者,然后你可以調優獲得額外的性能提升:
public class LongEventMain
{
public static void main(String[] args) throws Exception
{
//.....
// Construct the Disruptor with a SingleProducerSequencer
Disruptor<LongEvent> disruptor = new Disruptor(
factory, bufferSize, ProducerType.SINGLE, new BlockingWaitStrategy(), DaemonThreadFactory.INSTANCE);
//.....
}
}
為了說明通過這種技術方式能替身多少性能優勢,這里有一份測試類OneToOne performance test。在i7 Sandy Bridge MacBook Air的運行結果:
Multiple Producer:
Run 0, Disruptor=26,553,372 ops/sec
Run 1, Disruptor=28,727,377 ops/sec
Run 2, Disruptor=29,806,259 ops/sec
Run 3, Disruptor=29,717,682 ops/sec
Run 4, Disruptor=28,818,443 ops/sec
Run 5, Disruptor=29,103,608 ops/sec
Run 6, Disruptor=29,239,766 ops/sec
Single Producer:
Run 0, Disruptor=89,365,504 ops/sec
Run 1, Disruptor=77,579,519 ops/sec
Run 2, Disruptor=78,678,206 ops/sec
Run 3, Disruptor=80,840,743 ops/sec
Run 4, Disruptor=81,037,277 ops/sec
Run 5, Disruptor=81,168,831 ops/sec
Run 6, Disruptor=81,699,346 ops/sec
Alternative Wait Strategies
Disruptor默認使用的等待策略是BlockingWaitStrategy。內部的BlockingWaitStrategy使用典型的Lock和Condition處理線程的wake-up。BlockingWaitStrategy是等待策略中最慢的,但是在CPU使用率方面是最保守的,最廣泛的適用於大多數場景。可以通過調整等待策略參數獲取額外的性能。
1.SleepingWaitStrategy
類似BlockingWaitStrategy,SleepingWaitStrategy也試圖保持CPU使用率。通過使用簡單的忙等循環,但是在循環過程中調用了LockSupport.parkNanos(1)。在典型的Linux系統上停頓線程60us。然而,它具有以下好處:生產線程不需要采取任何其他增加適當計數器的動作,並且不需要發信號通知條件變量的成本。然而將增大生產者和消費者之前數據傳遞的延遲。在低延遲沒有被要求的場景中,這是一個非常好的策略。一個公共的使用場景是異步日志。
2.YieldingWaitStrategy
YieldingWaitStrategy是一個低延遲系統中等待策略。通過犧牲CPU資源來降低延遲。YieldingWaitStrategy通過busy spin等待sequence增長到合適的值。在內部實現中,通過在循環內部使用Thread.yield()允許其他的隊列線程運行。當需要很高的性能且事件處理線程少於CPU邏輯核數時這個策略被強烈推薦。如:啟用了超線程。
3.BusySpinWaitStrategy
BusySpinWaitStrategy是高新跟那個的等待策略,但是對環境有限制。如果事件處理器的數量小於物理核數時才使用這個策略。
### 清理RingBuffer中的對象
當通過Disruptor傳遞數據時,對象的存活時間可能超過預期。為了能夠避免這個發生,在事件處理結束后應當清理下事件對象。如果只有單個生產者,在該生產者中清理對象即是最高效的。然后有時間處理鏈時,就需要特定的事件處理器被放置在鏈的最末尾用於清理事件。
class ObjectEvent<T>
{
T val;
void clear()
{
val = null;
}
}
public class ClearingEventHandler<T> implements EventHandler<ObjectEvent<T>>
{
public void onEvent(ObjectEvent<T> event, long sequence, boolean endOfBatch)
{
// Failing to call clear here will result in the
// object associated with the event to live until
// it is overwritten once the ring buffer has wrapped
// around to the beginning.
event.clear();
}
}
public static void main(String[] args)
{
Disruptor<ObjectEvent<String>> disruptor = new Disruptor<>(
() -> ObjectEvent<String>(), bufferSize, DaemonThreadFactory.INSTANCE);
disruptor
.handleEventsWith(new ProcessingEventHandler())
.then(new ClearingObjectHandler());
}