高性能無鎖隊列 Disruptor 初體驗


原文地址: haifeiWu和他朋友們的博客
博客地址:www.hchstudio.cn
歡迎轉載,轉載請注明作者及出處,謝謝!

最近一直在研究隊列的一些問題,今天樓主要分享一個高性能的隊列 Disruptor 。

what Disruptor ?

它是英國外匯交易公司 LMAX 開發的一個高性能隊列,研發的初衷是解決內存隊列的延遲問題。基於 Disruptor 開發的系統單線程能支撐每秒600萬訂單。

目前,包括 Apache Storm、Log4j2 在內的很多知名項目都應用了Disruptor以獲取高性能。在樓主公司內部使用 Disruptor 與 Netty 結合用來做 GPS 實時數據的處理,性能相當強悍。本文從實戰角度來大概了解一下 Disruptor 的實現原理。

why Disruptor ?

Disruptor通過以下設計來解決隊列速度慢的問題:

  • 環形數組結構
    為了避免垃圾回收,采用數組而非鏈表。因為,數組對處理器的緩存機制更加友好。

  • 元素位置定位
    數組長度2^n,通過位運算,加快定位的速度。下標采取遞增的形式。不用擔心index溢出的問題。index是long類型,即使100萬QPS的處理速度,也需要30萬年才能用完。

  • 無鎖設計
    每個生產者或者消費者線程,會先申請可以操作的元素在數組中的位置,申請到之后,直接在該位置寫入或者讀取數據。

  • 針對偽共享問題的優化
    Disruptor 消除這個問題,至少對於緩存行大小是64字節或更少的處理器架構來說是這樣的(有可能處理器的緩存行是128字節,那么使用64字節填充還是會存在偽共享問題),通過增加補全來確保ring buffer的序列號不會和其他東西同時存在於一個緩存行中。

how Disruptor ?

通過上面的介紹,我們大概可以了解到 Disruptor 是一個高性能的無鎖隊列,那么該如何使用呢,下面樓主通過 Disruptor 實現一個簡單的生產者消費者模型,介紹 Disruptor 的使用

首先,根據 Disruptor 的事件驅動的編程模型,我們需要定義一個事件來攜帶數據。


public class DataEvent {
    private long value;

    public void set(long value) {
        this.value = value;
    }

    public long getValue() {
        return value;
    }
}

為了讓 Disruptor 為我們預先分配這些事件,我們需要構造一個 EventFactory 來執行構造


public class DataEventFactory implements EventFactory<DataEvent> {

    @Override
    public DataEvent newInstance() {
        return new DataEvent();
    }
}

一旦我們定義了事件,我們需要創建一個處理這些事件的消費者。 在我們的例子中,我們要做的就是從控制台中打印出值。

public class DataEventHandler implements EventHandler<DataEvent> {
    @Override
    public void onEvent(DataEvent dataEvent, long l, boolean b) throws Exception {
        new DataEventConsumer(dataEvent);
    }
}

接下來我們需要初始化 Disruptor ,並定義一個生產者來生成消息


public class DisruptorManager {

    private final static Logger LOG = LoggerFactory.getLogger(DisruptorManager.class);

    /*消費者線程池*/
    private static ExecutorService threadPool;
    private static Disruptor<DataEvent> disruptor;
    private static RingBuffer<DataEvent> ringBuffer;

    private static AtomicLong dataNum = new AtomicLong();

    public static void init(EventHandler<DataEvent> eventHandler) {

        //初始化disruptor
        threadPool = Executors.newCachedThreadPool();
        disruptor = new Disruptor<>(new DataEventFactory(), 8 * 1024, threadPool, ProducerType.MULTI, new BlockingWaitStrategy());

        ringBuffer = disruptor.getRingBuffer();
        disruptor.handleEventsWith(eventHandler);
        disruptor.start();

        new Timer().schedule(new TimerTask() {
            @Override
            public void run() {
                LOG.info("放入隊列中數據編號{},隊列剩余空間{}", dataNum.get(), ringBuffer.remainingCapacity());
            }
        }, new Date(), 60 * 1000);
    }

    /**
     *
     * @param message
     */
    public static void putDataToQueue(long message) {
        if (dataNum.get() == Long.MAX_VALUE) {
            dataNum.set(0L);
        }

        // 往隊列中加事件
        long next = ringBuffer.next();
        try {
            ringBuffer.get(next).set(message);
            dataNum.incrementAndGet();
        } catch (Exception e) {
            LOG.error("向RingBuffer存入數據[{}]出現異常=>{}", message, e.getStackTrace());
        } finally {
            ringBuffer.publish(next);
        }
    }

    public static void close() {
        threadPool.shutdown();
        disruptor.shutdown();
    }
}

最后我們來定義一個 Main 方法來執行代碼


public class EventMain {

    public static void main(String[] args) throws Exception {
        DisruptorManager.init(new DataEventHandler());
        for (long l = 0; true; l++) {
            DisruptorManager.putDataToQueue(l);
            Thread.sleep(1000);
        }
    }
}

上面代碼具體感興趣的小伙伴請移步 https://github.com/haifeiWu/disruptor-learn

然后我們可以看到控制台打印出來的數據

console

小結

Disruptor 通過精巧的無鎖設計實現了在高並發情形下的高性能。

另外在Log4j 2中的異步模式采用了Disruptor來處理。在這里樓主遇到一個小問題,就是在使用Log4j 2通過 TCP 模式往 logstash 發日志數據的時候,由於網絡問題導致鏈接中斷,從而導致 Log4j 2 不停的往 ringbuffer 中寫數據,ringbuffer數據沒有消費者,導致服務器內存跑滿。解決方案是設置 Log4j 2 中 Disruptor 隊列有界,或者換成 UDP 模式來寫日志數據(如果數據不重要的話)。

參考鏈接


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM