Disruptor系列(二)— disruptor使用


本文譯自Dirsruptor在github上的wiki中文章:Getting Started

獲取Disruptor

Disruptor jar包可以從maven倉庫mvnrepository獲取,可以將其集成進項目的依賴管理中。

<dependency>
  <groupId>com.lmax</groupId>
  <artifactId>disruptor</artifactId>
  <version>3.4.2</version>
</dependency>

### 編寫事件處理生產者和消費者

為了學習Disruptor的使用,這里以非常簡單的例子入手:生產者生產單個long型value傳遞給消費者。這里簡化消費者邏輯,只打印消費的value。首先定義攜帶數據的Event:

public class LongEvent
{
    private long value;

    public void set(long value)
    {
        this.value = value; 
    }
}

為了允許Disruptor能夠為我們預分配這些事件,我們需要一個EventFactory用於構造事件:

public class LongEventFactory implements EventFactory<LongEvent>
{
    public LongEvent newInstance()
    {
        return new LongEvent();
    }
}

一旦我們定義了事件,我便再需要創建事件消費者用於消費處理事件。在我們的例子中,我們只需要打印value值到控制台即可:

public class LongEventHandler implements EventHandler<LongEvent>
{
    public void onEvent(LongEvent event, long sequence, boolean endOfBatch)
    {
        System.out.println("Event: " + event);
    }
}

有了事件消費者,我們還需要事件生產者產生事件。為了簡單起見,我們假設數據來源於I/O,如:網絡或者文件。由於不同版本的Disruptor,提供了不同的方式編寫生產者。

隨着3.0版本,Disruptor通過將復雜邏輯囊括在RingBuffer中,從而提供了豐富的Lambda-style API幫助開發者構建Producer。因此從3.0之后,更偏愛使用Event Publisher/Event Translator的API發布消息:

public class LongEventProducerWithTranslator
{
    private final RingBuffer<LongEvent> ringBuffer;
    
    public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer)
    {
        this.ringBuffer = ringBuffer;
    }
    
    private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR =
        new EventTranslatorOneArg<LongEvent, ByteBuffer>()
        {
            public void translateTo(LongEvent event, long sequence, ByteBuffer bb)
            {
                event.set(bb.getLong(0));
            }
        };

    public void onData(ByteBuffer bb)
    {
        ringBuffer.publishEvent(TRANSLATOR, bb);
    }
}

這種方式的另一個優勢在於Translator代碼可以被分離在單獨的類中,同時也比較容易進行無依賴的單元測試。Disruptor提供了許多不同的接口(EventTranslator, EventTranslatorOneArg, EventTranslatorTwoArg, etc.),可以通過實現這些接口提供translators。原因是允許轉換器被表示為靜態類或非捕獲lambda作為轉換方法的參數通過Ring Buffer上的調用傳遞給轉換器。

另一方式使用3.0版本之前的遺留API構建生產者發布消息,這種方式比較原始:

public class LongEventProducer
{
    private final RingBuffer<LongEvent> ringBuffer;

    public LongEventProducer(RingBuffer<LongEvent> ringBuffer)
    {
        this.ringBuffer = ringBuffer;
    }

    public void onData(ByteBuffer bb)
    {
        long sequence = ringBuffer.next();  // Grab the next sequence
        try
        {
            LongEvent event = ringBuffer.get(sequence); // Get the entry in the Disruptor
                                                        // for the sequence
            event.set(bb.getLong(0));  // Fill with data
        }
        finally
        {
            ringBuffer.publish(sequence);
        }
    }
}

從以上的代碼流程編寫可以看出,事件的發布比使用一個簡單的隊列要復雜。這是由於需要對事件預分配導致。對於消息的發布有兩個階段,首先在RingBuffer中聲明需要的槽位,然后再發布可用的數據。必須使用try/finally語句塊包裹消息的發布。必須現在try塊中聲明使用RingBuffer的槽位,然后再finally塊中發布使用的sequece。如果不這樣做,將可能導致Disruptor狀態的錯誤,特別是在多生產者的情況下,如果不重啟Disruptor將不能恢復。因此推薦使用EventTranslator編寫producer。

最后一步需要將以上編寫的組件連接起來。雖然可以手動連接各個組件,然而那樣可能比較復雜,因此提供了一個DSL用於構造以便簡化過程。使用DSL帶來裝配的簡化,但是卻對於很多參數無法做到更細致的控制,然而對於大多數情況,DSL還是非常適合:

public class LongEventMain
{
    public static void main(String[] args) throws Exception
    {
        // The factory for the event
        LongEventFactory factory = new LongEventFactory();

        // Specify the size of the ring buffer, must be power of 2.
        int bufferSize = 1024;

        // Construct the Disruptor
        Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, DaemonThreadFactory.INSTANCE);

        // Connect the handler
        disruptor.handleEventsWith(new LongEventHandler());

        // Start the Disruptor, starts all threads running
        disruptor.start();

        // Get the ring buffer from the Disruptor to be used for publishing.
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();

        LongEventProducer producer = new LongEventProducer(ringBuffer);

        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long l = 0; true; l++)
        {
            bb.putLong(0, l);
            producer.onData(bb);
            Thread.sleep(1000);
        }
    }
}

關於對Disruptor的接口設計的影響之一是Java 8,因為它使用了Functional Interfaces去實現Java Lambdas。在Disruptor API的大多數接口都被定義成Functional Interfaces以便Lambdas可以被使用。以上的LongEventMain可以使用Lambdas進行簡化:

public class LongEventMain
{
    public static void main(String[] args) throws Exception
    {
        // Specify the size of the ring buffer, must be power of 2.
        int bufferSize = 1024;

        // Construct the Disruptor
        Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);

        // Connect the handler
        disruptor.handleEventsWith((event, sequence, endOfBatch) -> System.out.println("Event: " + event));

        // Start the Disruptor, starts all threads running
        disruptor.start();

        // Get the ring buffer from the Disruptor to be used for publishing.
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();

        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long l = 0; true; l++)
        {
            bb.putLong(0, l);
            ringBuffer.publishEvent((event, sequence, buffer) -> event.set(buffer.getLong(0)), bb);
            Thread.sleep(1000);
        }
    }
}

可以看出使用Lambdas有大量的類將不再需要,如:handler,translator等。也可以看出使用Lambdas簡化publishEvent()只僅僅涉及到參數傳遞。

然而如果將代碼改成這樣:

ByteBuffer bb = ByteBuffer.allocate(8);
for (long l = 0; true; l++)
{
    bb.putLong(0, l);
    ringBuffer.publishEvent((event, sequence) -> event.set(bb.getLong(0)));
    Thread.sleep(1000);
}

注意這里使用了捕獲式的Lambda,意味着通過調用publishEvent()時可能需要實例化一個對象來持有ByteBuffer bb將其傳遞給lambda。這個將可能創建額外的垃圾,如果對GC壓力有嚴格要求的情況下,通過傳遞參數的方式將更加受歡迎。

使用方法引用來代理上述的lambda將能進一步簡化上述的方式,也將更時髦:

public class LongEventMain
{
    public static void handleEvent(LongEvent event, long sequence, boolean endOfBatch)
    {
        System.out.println(event);
    }

    public static void translate(LongEvent event, long sequence, ByteBuffer buffer)
    {
        event.set(buffer.getLong(0));
    }

    public static void main(String[] args) throws Exception
    {
        // Specify the size of the ring buffer, must be power of 2.
        int bufferSize = 1024;

        // Construct the Disruptor
        Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);

        // Connect the handler
        disruptor.handleEventsWith(LongEventMain::handleEvent);

        // Start the Disruptor, starts all threads running
        disruptor.start();

        // Get the ring buffer from the Disruptor to be used for publishing.
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();

        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long l = 0; true; l++)
        {
            bb.putLong(0, l);
            ringBuffer.publishEvent(LongEventMain::translate, bb);
            Thread.sleep(1000);
        }
    }
}

這里對ringBuffer.publishEvent的參數使用了方法引用替換了lambda,使其更進一步簡化。


### 基本的參數設置

對於大多數場景使用方式即可。然而,如果你能確定硬件和軟件的環境便可以進一步對Disruptor的參數進行調整以提高性能。主要有兩種參數可以被調整:

  • single vs. multiple producers
  • alternative wait strategies

Single vs. Multiple Producers

提高並發系統的性能的最好方式是遵循Single Writer Principle,這個也在Disruptor也被應用。如果在你的場景中只僅僅是單生產者,然后你可以調優獲得額外的性能提升:

public class LongEventMain
{
    public static void main(String[] args) throws Exception
    {
        //.....
        // Construct the Disruptor with a SingleProducerSequencer
        Disruptor<LongEvent> disruptor = new Disruptor(
            factory, bufferSize, ProducerType.SINGLE, new BlockingWaitStrategy(), DaemonThreadFactory.INSTANCE);
        //.....
    }
}

為了說明通過這種技術方式能替身多少性能優勢,這里有一份測試類OneToOne performance test。在i7 Sandy Bridge MacBook Air的運行結果:

Multiple Producer:

Run 0, Disruptor=26,553,372 ops/sec
Run 1, Disruptor=28,727,377 ops/sec
Run 2, Disruptor=29,806,259 ops/sec
Run 3, Disruptor=29,717,682 ops/sec
Run 4, Disruptor=28,818,443 ops/sec
Run 5, Disruptor=29,103,608 ops/sec
Run 6, Disruptor=29,239,766 ops/sec

Single Producer:

Run 0, Disruptor=89,365,504 ops/sec
Run 1, Disruptor=77,579,519 ops/sec
Run 2, Disruptor=78,678,206 ops/sec
Run 3, Disruptor=80,840,743 ops/sec
Run 4, Disruptor=81,037,277 ops/sec
Run 5, Disruptor=81,168,831 ops/sec
Run 6, Disruptor=81,699,346 ops/sec

Alternative Wait Strategies

Disruptor默認使用的等待策略是BlockingWaitStrategy。內部的BlockingWaitStrategy使用典型的Lock和Condition處理線程的wake-up。BlockingWaitStrategy是等待策略中最慢的,但是在CPU使用率方面是最保守的,最廣泛的適用於大多數場景。可以通過調整等待策略參數獲取額外的性能。

1.SleepingWaitStrategy

類似BlockingWaitStrategy,SleepingWaitStrategy也試圖保持CPU使用率。通過使用簡單的忙等循環,但是在循環過程中調用了LockSupport.parkNanos(1)。在典型的Linux系統上停頓線程60us。然而,它具有以下好處:生產線程不需要采取任何其他增加適當計數器的動作,並且不需要發信號通知條件變量的成本。然而將增大生產者和消費者之前數據傳遞的延遲。在低延遲沒有被要求的場景中,這是一個非常好的策略。一個公共的使用場景是異步日志。

2.YieldingWaitStrategy

YieldingWaitStrategy是一個低延遲系統中等待策略。通過犧牲CPU資源來降低延遲。YieldingWaitStrategy通過busy spin等待sequence增長到合適的值。在內部實現中,通過在循環內部使用Thread.yield()允許其他的隊列線程運行。當需要很高的性能且事件處理線程少於CPU邏輯核數時這個策略被強烈推薦。如:啟用了超線程。

3.BusySpinWaitStrategy

BusySpinWaitStrategy是高新跟那個的等待策略,但是對環境有限制。如果事件處理器的數量小於物理核數時才使用這個策略。


### 清理RingBuffer中的對象

當通過Disruptor傳遞數據時,對象的存活時間可能超過預期。為了能夠避免這個發生,在事件處理結束后應當清理下事件對象。如果只有單個生產者,在該生產者中清理對象即是最高效的。然后有時間處理鏈時,就需要特定的事件處理器被放置在鏈的最末尾用於清理事件。

class ObjectEvent<T>
{
    T val;

    void clear()
    {
        val = null;
    }
}

public class ClearingEventHandler<T> implements EventHandler<ObjectEvent<T>>
{
    public void onEvent(ObjectEvent<T> event, long sequence, boolean endOfBatch)
    {
        // Failing to call clear here will result in the 
        // object associated with the event to live until
        // it is overwritten once the ring buffer has wrapped
        // around to the beginning.
        event.clear(); 
    }
}

public static void main(String[] args)
{
    Disruptor<ObjectEvent<String>> disruptor = new Disruptor<>(
        () -> ObjectEvent<String>(), bufferSize, DaemonThreadFactory.INSTANCE);

    disruptor
        .handleEventsWith(new ProcessingEventHandler())
        .then(new ClearingObjectHandler());
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM