disruptor 高性能之道


disruptor是一個高性能的線程間異步通信的框架,即在同一個JVM進程中的多線程間消息傳遞。應用disruptor知名項目有如下的一些:Storm, Camel, Log4j2,還有目前的美團點評技術團隊也有很多不少的應用,或者說有一些借鑒了它的設計機制。 下面就跟着筆者一起去領略下disruptor高性能之道吧~

disruptor是一款開源的高性能隊列框架,github地址為 https://github.com/LMAX-Exchange/disruptor

分析disruptor,只要把event的生產和消費流程弄懂,基本上disruptor的七寸就已經抓住了。話不多說,趕緊上車,筆者以下面代碼為例講解disruptor:

public static void main(String[] args) {
    Disruptor<StringEvent> disruptor = new Disruptor<>(StringEvent::new, 1024,
            new PrefixThreadFactory("consumer-pool-", new AtomicInteger(0)), ProducerType.MULTI,
            new BlockingWaitStrategy());
 
    // 注冊consumer並啟動
    disruptor.handleEventsWith((EventHandler<StringEvent>) (event, sequence, endOfBatch) -> {
        System.out.println(Util.threadName() + "onEvent " + event);
    });
    disruptor.start();
 
    // publisher邏輯
    Executor executor = Executors.newFixedThreadPool(2,
            new PrefixThreadFactory("publisher-pool-", new AtomicInteger(0)));
    while (true) {
        for (int i = 0; i < 2; i++) {
            executor.execute(() -> {
                Util.sleep(1);
                disruptor.publishEvent((event, sequence, arg0) -> {
                    event.setValue(arg0 + " " + sequence);
                }, "hello world");
            });
        }
 
        Util.sleep(1000);
    }
}
class StringEvent {
    private String value;

    public String getValue() {
        return value;
    }

    public void setValue(String value) {
        this.value = value;
    }

    @Override
    public String toString() {
        return "StringEvent:{value=" + value + "}";
    }
}

class PrefixThreadFactory implements ThreadFactory {
    private String prefix;
    private AtomicInteger num;

    public PrefixThreadFactory(String prefix, AtomicInteger num) {
        this.prefix = prefix;
        this.num = num;
    }

    @Override
    public Thread newThread(Runnable r) {
        return new Thread(r, prefix + num.getAndIncrement());
    }

}

class Util {

    static String threadName() {
        return String.format("%-16s", Thread.currentThread().getName()) + ": ";
    }

    static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
測試相關類

event生產流程

event的生產是從 RingBuffer.publishEvent 開始的,event生產流程步驟如下:
  • 獲取待插入(到ringBuffer的)位置,相當於先占個位
  • 往該位置上設置event
  • 設置sequence對應event的標志,通知consumer
public <A> void publishEvent(EventTranslatorOneArg<E, A> translator, A arg0)
{
    // 獲取當前要設置的sequence序號,然后進行設置並通知消費者
    final long sequence = sequencer.next();
    translateAndPublish(translator, sequence, arg0);
}
 
// 獲取下一個sequence,直到獲取到位置才返回
public long next(int n) {
    long current;
    long next;
     
    do {
        // 獲取當前ringBuffer的可寫入sequence
        current = cursor.get();
        next = current + n;
 
        long wrapPoint = next - bufferSize;
        long cachedGatingSequence = gatingSequenceCache.get();
 
        if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current) {
            // 如果當前沒有空位置寫入,獲取多個consumer中消費進度最小的那個的消費進度
            long gatingSequence = Util.getMinimumSequence(gatingSequences, current);
 
            if (wrapPoint > gatingSequence) {
                // 阻塞1ns,然后continue
                LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
                continue;
            }
 
            gatingSequenceCache.set(gatingSequence);
        }
        // cas設置ringBuffer的sequence
        else if (cursor.compareAndSet(current, next)) {
            break;
        }
    } while (true);
 
    return next;
}
 
private <A> void translateAndPublish(EventTranslatorOneArg<E, A> translator, long sequence, A arg0) {
    try {
        // 設置event
        translator.translateTo(get(sequence), sequence, arg0);
    } finally {
        sequencer.publish(sequence);
    }
}
public void publish(final long sequence) {
    // 1. 設置availableBuffer,表示對應的event是否設置完成,consumer線程中會用到
    //   - 注意,到這里時,event已經設置完成,但是consumer還不知道該sequence對應的event是否設置完成,
    //   - 所以需要設置availableBuffer中sequence對應event的sequence number
    // 2. 通知consumer
    setAvailable(sequence);
    waitStrategy.signalAllWhenBlocking();
}

從translateAndPublish中看,如果用戶的設置event方法拋出異常,這時event對象是不完整的,那么publish到consumer端,consumer消費的不是完整的數據怎么辦呢?在translateAndPublish中需不需要在異常情況下reset event對象呢?關於這個問題筆者之前是有疑問的,關於這個問題筆者提了一個issue,可點擊 https://github.com/LMAX-Exchange/disruptor/issues/244 進行查看。

筆者建議在consumer消費完event之后,進行reset event操作,這樣避免下次設置event異常consumer時取到不完整的數據,比如log4j2中的AsyncLogger中處理完log4jEvent之后就會調用clear方法進行重置event。

event消費流程

event消費流程入口是BatchEventProcessor.processEvents,event消費流程步驟:
  • 獲取當前consumer線程消費的offset,即nextSequence
  • 從ringBuffer獲取可用的sequence,沒有新的event時,會根據consmer阻塞策略進行執行某些動作
  • 獲取event,然后執行event回調
  • 設置當前consumer線程的消費進度
private void processEvents() {
    T event = null;
    long nextSequence = sequence.get() + 1L;
 
    while (true) {
        try {
            // 獲取可用的sequence,默認直到有可用sequence時才返回
            final long availableSequence = sequenceBarrier.waitFor(nextSequence);
            if (batchStartAware != null) {
                batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
            }
 
            // 執行消費回調動作,注意,這里獲取到一個批次event,可能有多個,個數為availableSequence-nextSequence + 1
            // nextSequence == availableSequence表示該批次只有一個event
            while (nextSequence <= availableSequence) {
                // 獲取nextSequence位置上的event
                event = dataProvider.get(nextSequence);
                // 用戶自定義的event 回調
                eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
                nextSequence++;
            }
 
            // 設置當前consumer線程的消費進度sequence
            sequence.set(availableSequence);
        } catch (final Throwable ex) {
            exceptionHandler.handleEventException(ex, nextSequence, event);
            sequence.set(nextSequence);
            nextSequence++;
        }
    }
}
 
public long waitFor(final long sequence)
        throws AlertException, InterruptedException, TimeoutException{
    long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);
 
    if (availableSequence < sequence) {
        return availableSequence;
    }
 
    // 獲取ringBuffer中可安全讀的最大的sequence number,該信息存在availableBuffer中的sequence
    // 在MultiProducerSequencer.publish方法中會設置
    return sequencer.getHighestPublishedSequence(sequence, availableSequence);
}
 
// 默認consumer阻塞策略 BlockingWaitStrategy
public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier)
    throws AlertException, InterruptedException
{
    long availableSequence;
    if (cursorSequence.get() < sequence) {
        // 當前ringBuffer的sequence小於sequence,阻塞等待
        // event生產之后會喚醒
        synchronized (mutex) {
            while (cursorSequence.get() < sequence) {
                barrier.checkAlert();
                mutex.wait();
            }
        }
    }
 
    while ((availableSequence = dependentSequence.get()) < sequence) {
        barrier.checkAlert();
        ThreadHints.onSpinWait();
    }
 
    return availableSequence;
}

從上面的event消費流程來看,消費線程會讀取ringBuffer的sequence,然后更新本消費線程內的offset(消費進度sequence),如果有多個event的話,那么就是廣播消費模式了(單consumer線程內還是順序消費),如果不想讓event被廣播消費(重復消費),可使用如下方法添加consumer線程(WorkHandler是集群消費,EventHandler是廣播消費):

disruptor.handleEventsWithWorkerPool((WorkHandler<StringEvent>) event -> {
    System.out.println(Util.threadName() + "onEvent " + event);
});

disruptor高性能之道

棄用鎖機制改用CAS

event生產流程中獲取並自增sequence時用的就是CAS,獲取之后該sequence對應位置的操作只會在單線程,沒有了並發問題。

集群消費模式下獲取sequence之后也會使用CAS設置為sequence新值,設置本地消費進度,然后再執行獲取event並執行回調邏輯。

注意,disruptor中較多地方使用了CAS,但並不代表完全沒有了鎖機制,比如默認consumer阻塞策略 BlockingWaitStrategy發揮作用時,consumer消費線程就會阻塞,只不過這只會出現在event生產能力不足是才會存在。如果consumer消費不足,大量event生產導致ringBuffer爆滿,這時event生產線程就會輪詢調用LockSupport.parkNanos(1),這里的成本也不容小覷(涉及到線程切換損耗)。

 
避免偽共享引入緩沖行填充

偽共享講的是多個CPU時的123級緩存的問題,通常,緩存是以緩存行的方式讀取數據,如果A、B兩個變量被緩沖在同一行之內,那么對於其中一個的更新會導致另一個緩沖無效,需要從內存中讀取,這種無法充分利用緩存行的問題就是偽共享。disruptor相關代碼如下:

class LhsPadding {
    protected long p1, p2, p3, p4, p5, p6, p7;
}
class Value extends LhsPadding {
    protected volatile long value;
}
 
使用RingBuffer作為數據存儲容器

ringBuffer是一個環形隊列,本質是一個數組,size為2的冪次方(方便做&操作),數據位置sequence值會和size做&操作得出數組下標,然后進行數據的讀寫操作(只在同一個線程內,無並發問題)。

 
小結

disruptor初衷是為了解決內存隊列的延遲問題,作為一個高性能隊列,包括Apache Storm、Camel、Log4j 2在內的很多知名項目都在使用。disruptor的重要機制就是CAS和RingBuffer,借助於它們兩個實現數據高效的生產和消費

disruptor多生產者多消費者模式下,因為RingBuffer數據的寫入是分為2步的(先獲取到個sequence,然后寫入數據),如果獲取到sequence之后,生產者寫入RingBuffer較慢,consumer消費較快,那么生產者最終會拖慢consumer消費進度,這一點需注意(如果已經消費到生產者占位的前一個數據了,那么consumer會執行對應的阻塞策略)。在實際使用過程中,如果consumer消費邏輯耗時較長,可以封裝成任務交給線程池來處理,避免consumer端拖慢生成者的寫入速度。

disruptor的設計對於開發者來說有哪些借鑒的呢?盡量減少競爭,避免多線程對同一數據做操作,比如disruptor使用CAS獲取只會在一個線程內進行讀寫的event對象,這種思想其實已經在JDK的thread本地內存中有所體現;盡量復用對象,避免大量的內存申請釋放,增加GC損耗,disruptor通過復用event對象來保證讀寫時不會產生對象GC問題;選擇合適數據結構,disruptor使用ringBuffer,環形數組來實現數據高效讀寫。

 

參考資料:

1、https://tech.meituan.com/disruptor.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM