多線程筆記 - disruptor


disruptor 可以理解為一個生產消費的框架. 具體翻譯教程: http://ifeve.com/disruptor-getting-started/

這個框架從數據上看, 是很強大的. 號稱1s處理600萬數據(不是消費掉600萬). 這里學習一下.

一. Hello World

數據容器:

//數據的載體, 封裝要傳遞的數據
public class LongEvent {
    private long value;

    public long getValue() {
        return value;
    }

    public void setValue(long value) {
        this.value = value;
    }
}

這里的數據封裝類, 叫 Event, 我們知道 Event 翻譯過來是 事件. 但是這里表示數據. 稍微有點別扭

 

數據生產工廠:

//數據包裝類工廠, 用來生產空容器的, 用來裝數據
public class LongEventFactory implements EventFactory {
    @Override
    public Object newInstance() {
        return new LongEvent();
    }
}

這里主要是用來生產數據的空容器的. 給后面用的時候, 進行賦值用的.

 

消費者:

//數據的處理器, 對數據進行處理, 此處只是簡單的打印
public class LongEventHandlerA implements EventHandler<LongEvent>, WorkHandler<LongEvent> {
    @Override
    public void onEvent(LongEvent event, long l, boolean b) throws Exception {
        System.out.println(Thread.currentThread().getName()  + "消費數據(A) : " + event.getValue());
    }

    @Override
    public void onEvent(LongEvent event) throws Exception {
        System.out.println(Thread.currentThread().getName()  + "消費數據(A) : " + event.getValue());
    }
}

在Hello World例子中, 只要實現 EventHandler 接口就行了, 后面那個 WorkHandler 是后面例子用的.

 

在來一個消費者B, 代碼和上面一樣:

//數據的處理器, 對數據進行處理, 此處只是簡單的打印
public class LongEventHandlerB implements EventHandler<LongEvent>, WorkHandler<LongEvent> {
    @Override
    public void onEvent(LongEvent event, long l, boolean b) throws Exception {
        System.out.println(Thread.currentThread().getName()  + "消費數據(B) : " + event.getValue());
    }

    @Override
    public void onEvent(LongEvent event) throws Exception {
        System.out.println(Thread.currentThread().getName()  + "消費數據(B) : " + event.getValue());
    }
}

 

生產者:

//數據生產者
public class LongEventProducer {
    //生產的數據可以往 ringBuffer 里面丟
    private final RingBuffer<LongEvent> ringBuffer;

    public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    /**
     * onData用來發布事件,每調用一次就發布一次事件事件
     * 它的參數會通過事件傳遞給消費者
     *
     * @param bb
     */
    public void onData(ByteBuffer bb) {
        //可以把ringBuffer看做一個事件隊列,那么next就是得到下面一個事件槽
        long sequence = ringBuffer.next();
        try {
            //用上面的索引取出一個空的事件用於填充
            LongEvent event = ringBuffer.get(sequence);
            //設置值
            event.setValue(bb.getLong(0));
        }
        finally {
            //發布事件, 或者說發布數據, 通知消費者可以消費了
            ringBuffer.publish(sequence);
        }
    }
}

 

生產者的另一種寫法, 要稍微簡單點:

public class LongEventProducerWithTranslator {
    private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR =
            new EventTranslatorOneArg<LongEvent, ByteBuffer>() {
                @Override
                public void translateTo(LongEvent longEvent, long sequence, ByteBuffer bb) {
                    longEvent.setValue(bb.getLong(0));
                }
            };

    private final RingBuffer<LongEvent> ringBuffer;

    public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    public void onData(ByteBuffer bb){
        ringBuffer.publishEvent(TRANSLATOR, bb);
    }
}

 

測試方法:

public static void main(String[] args) throws InterruptedException {
    //線程池
    //Executor executor = Executors.newCachedThreadPool();
    //裝數據的容器工廠
    LongEventFactory factory = new LongEventFactory();
    //容器size
    int bufferSize = 1024;
    //創建 disruptor 實例
    //這種方式已經不推薦使用
    //Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(factory, bufferSize, executor);
    //推薦使用這種, 自定義線程工廠的方式
    ThreadFactory threadFactory = new ThreadFactory() {
        private final AtomicInteger index = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(null, r, "disruptor-thread-" + index.incrementAndGet());
        }
    };
    Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(factory, bufferSize, threadFactory
            , ProducerType.SINGLE
            , new YieldingWaitStrategy());
    //引入數據處理器
    //這種方式是消費相同的數據
    disruptor.handleEventsWith(new LongEventHandlerA())
            .then(new LongEventHandlerB());
    //這種方式是消費不同的數據
    //disruptor.handleEventsWithWorkerPool(new LongEventHandlerA(), new LongEventHandlerB());
    //啟動 disruptor 容器
    disruptor.start();
    //從 disruptor 中拿取裝數據的容器
    RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
    //將這個容器給生產者, 生產者產生的數據, 可以直接丟進去
    //LongEventProducer producer = new LongEventProducer(ringBuffer);

    LongEventProducerWithTranslator producer = new LongEventProducerWithTranslator(ringBuffer);

    ByteBuffer bb = ByteBuffer.allocate(8);
    for (long num = 0L; num <= 100L; num++) {
        bb.putLong(0, num);
        //發布數據
        producer.onData(bb);

        Thread.sleep(100);
    }
}

消費者消費的時候, 大的方向上, 有兩種模式:

1. 多個消費者, 消費的數據都是一樣的:  handleEventsWith

2. 多個消費者, 消費的數據是不一樣的:  handleEventsWithWorkerPool

disruptor 有個比較有意思的功能, 就是拼接消費模型.

如此例中, 我修改一句代碼:

disruptor.handleEventsWith(new LongEventHandlerA())
                .then(new LongEventHandlerB()).handleEventsWithWorkerPool(new LongEventHandlerC(), new LongEventHandlerD());

看結果:

 

 仔細觀察, 就能發現, A永遠在B前面, 因為 B 是 then() 在A后面的.

C和D永遠不會消費同一條消息. 比如 C 消費了96, 那么D就不能再消費96了, 繼而只能在下一輪中消費97.

 

二. 等待模式

既然是生產消費, 就肯定有個速度問題. 可能是生產快了, 也可能是消費快了. 那么這種情況, 在 disruptor 也是有策略處理的. 這里直接引用譯文.

Disruptor默認的等待策略是BlockingWaitStrategy。這個策略的內部適用一個鎖和條件變量來控制線程的執行和等待(Java基本的同步方法)。BlockingWaitStrategy是最慢的等待策略,但也是CPU使用率最低和最穩定的選項。然而,可以根據不同的部署環境調整選項以提高性能。

SleepingWaitStrategy

和BlockingWaitStrategy一樣,SpleepingWaitStrategy的CPU使用率也比較低。它的方式是循環等待並且在循環中間調用LockSupport.parkNanos(1)來睡眠,(在Linux系統上面睡眠時間60µs).然而,它的優點在於生產線程只需要計數,而不執行任何指令。並且沒有條件變量的消耗。但是,事件對象從生產者到消費者傳遞的延遲變大了。SleepingWaitStrategy最好用在不需要低延遲,而且事件發布對於生產者的影響比較小的情況下。比如異步日志功能。

YieldingWaitStrategy

YieldingWaitStrategy是可以被用在低延遲系統中的兩個策略之一,這種策略在減低系統延遲的同時也會增加CPU運算量。YieldingWaitStrategy策略會循環等待sequence增加到合適的值。循環中調用Thread.yield()允許其他准備好的線程執行。如果需要高性能而且事件消費者線程比邏輯內核少的時候,推薦使用YieldingWaitStrategy策略。例如:在開啟超線程的時候。

BusySpinWaitStrategy

BusySpinWaitStrategy是性能最高的等待策略,同時也是對部署環境要求最高的策略。這個性能最好用在事件處理線程比物理內核數目還要小的時候。例如:在禁用超線程技術的時候。

 

在  new Disruptor()  的時候, 可以指定引用哪一種等待策略.

 

三. RingBuffer

RingBuffer 具體是啥, 這里我也不解析了, 可以把它理解為一個 環形結構的 數據存儲器.

 

 這里需要注意, 在給 RingBuffer 分配數據槽 的時候, 數量最好是 2的冪次倍. 這種的性能比隨便寫的要好很多.

這個 RingBuffer 也可以拿出來單獨用, 不和 disruptor 合着用

測試方法:

public static void main2(String[] args) throws InterruptedException {
    ExecutorService pool = Executors.newCachedThreadPool();
    LongEventHandlerA handlerA = new LongEventHandlerA();
    LongEventHandlerB handlerB = new LongEventHandlerB();

    RingBuffer ringBuffer = RingBuffer.createSingleProducer(new LongEventFactory(), 1024);
    SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
    //創建消息處理器, 相當於消費者A
    BatchEventProcessor<LongEvent> eventProcessorA = new BatchEventProcessor<LongEvent>(ringBuffer, sequenceBarrier, handlerA);
    //這一步的目的就是把消費者的位置信息引用注入到生產者 如果只有一個消費者的情況可以省略
    ringBuffer.addGatingSequences(eventProcessorA.getSequence());
    //把消息處理器提交到線程池
    pool.execute(eventProcessorA);

    //創建消息處理器, 相當於消費者B
    BatchEventProcessor<LongEvent> eventProcessorB = new BatchEventProcessor<LongEvent>(ringBuffer, sequenceBarrier, handlerB);
    ringBuffer.addGatingSequences(eventProcessorB.getSequence());
    pool.execute(eventProcessorB);

    for (int i = 0; i < 100; i++) {
     //拿取空槽位置
long seq = ringBuffer.next();
     //對空槽進行數據填充 LongEvent event
= (LongEvent) ringBuffer.get(seq); event.setValue(i);
     //發布數據, 通知消費者進行數據消費 ringBuffer.publish(seq); } Thread.sleep(
1000); //通知事件(或者說消息)處理器 可以結束了(並不是馬上結束!!!) eventProcessorA.halt(); eventProcessorB.halt();   //關閉線程 pool.shutdown(); }

結果:

 

 我這里只貼了一部分, 其實是都消費完了.

從圖中可以看出, 消費沒有順序, 並不是A消費了B才消費, 也不是交替消費. 他們消費的數據是相同的.

 

除了以上這種寫法, 他還有一種寫法, 使用 WorkPool:

public static void main3(String[] args) throws InterruptedException {
    ExecutorService pool = Executors.newCachedThreadPool();
    LongEventHandlerA handlerA = new LongEventHandlerA();
    LongEventHandlerB handlerB = new LongEventHandlerB();
    LongEventFactory eventFactory = new LongEventFactory();
    RingBuffer ringBuffer = RingBuffer.createSingleProducer(eventFactory, 1024);
    SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
    WorkerPool<LongEvent> workerPoolA = new WorkerPool<LongEvent>(ringBuffer, sequenceBarrier, new IgnoreExceptionHandler(), handlerA);
    workerPoolA.start(pool);

    WorkerPool<LongEvent> workerPoolB = new WorkerPool<LongEvent>(ringBuffer, sequenceBarrier, new IgnoreExceptionHandler(), handlerB);
    workerPoolB.start(pool);

    for (int i = 0; i < 100; i++) {
        long seq = ringBuffer.next();//占個坑 --ringBuffer一個可用區塊
        LongEvent longEvent = (LongEvent) ringBuffer.get(seq);//給這個區塊放入 數據
        longEvent.setValue(i);
        ringBuffer.publish(seq);//發布這個區塊的數據使handler(consumer)可見
    }

    Thread.sleep(1000);//等上1秒,等消費都處理完成
    //通知事件(或者說消息)處理器 可以結束了(並不是馬上結束!!!)
    workerPoolA.halt();
    //通知事件(或者說消息)處理器 可以結束了(並不是馬上結束!!!)
    workerPoolB.halt();
    //終止線程
    pool.shutdown();
}

 

結果:

 

 數據少的時候, 你可能會看到 A 和 B 交替出現, 但事實上, 這里並沒有順序.

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM