disruptor筆記之三:環形隊列的基礎操作(不用Disruptor類)


歡迎訪問我的GitHub

https://github.com/zq2599/blog_demos

內容:所有原創文章分類匯總及配套源碼,涉及Java、Docker、Kubernetes、DevOPS等;

《disruptor筆記》系列鏈接

  1. 快速入門
  2. Disruptor類分析
  3. 環形隊列的基礎操作(不用Disruptor類)
  4. 事件消費知識點小結
  5. 事件消費實戰
  6. 常見場景
  7. 等待策略
  8. 知識點補充(終篇)

本篇概覽

  • 本文是《disruptor筆記》系列的第三篇,主要任務是編碼實現消息生產和消費,與《disruptor筆記之一:快速入門》不同的是,本次開發不使用Disruptor類,和Ring Buffer(環形隊列)相關的操作都是自己寫代碼實現;
  • 這種脫離Disruptor類操作Ring Buffer的做法,不適合用在生產環境,但在學習Disruptor的過程中,這是種高效的學習手段,經過本篇實戰后,在今后使用Disruptor時,您在開發、調試、優化等各種場景下都能更加得心應手;
  • 簡單的消息生產消費已不能滿足咱們的學習熱情,今天的實戰要挑戰以下三個場景:
  1. 100個事件,單個消費者消費;
  2. 100個事件,三個消費者,每個都獨自消費這個100個事件;
  3. 100個事件,三個消費者共同消費這個100個事件;

前文回顧

為了完成本篇的實戰,前文《disruptor筆記之二:Disruptor類分析》已做了充分的研究分析,建議觀看,這里簡單回顧以下Disruptor類的幾個核心功能,這也是咱們編碼時要實現的:

  1. 創建環形隊列(RingBuffer對象)
  2. 創建SequenceBarrier對象,用於接收ringBuffer中的可消費事件
  3. 創建BatchEventProcessor,負責消費事件
  4. 綁定BatchEventProcessor對象的異常處理類
  5. 調用ringBuffer.addGatingSequences,將消費者的Sequence傳給ringBuffer
  6. 啟動獨立線程,用來執行消費事件的業務邏輯
  • 理論分析已經完成,接下來開始編碼;

源碼下載

名稱 鏈接 備注
項目主頁 https://github.com/zq2599/blog_demos 該項目在GitHub上的主頁
git倉庫地址(https) https://github.com/zq2599/blog_demos.git 該項目源碼的倉庫地址,https協議
git倉庫地址(ssh) git@github.com:zq2599/blog_demos.git 該項目源碼的倉庫地址,ssh協議
  • 這個git項目中有多個文件夾,本次實戰的源碼在disruptor-tutorials文件夾下,如下圖紅框所示:

在這里插入圖片描述

  • disruptor-tutorials是個父工程,里面有多個module,本篇實戰的module是low-level-operate,如下圖紅框所示:

在這里插入圖片描述

開發

  • 進入編碼階段,今天的任務是挑戰以下三個場景:
  1. 100個事件,單個消費者消費;
  2. 100個事件,三個消費者,每個都獨自消費這個100個事件;
  3. 100個事件,三個消費者共同消費這個100個事件;
  • 咱們先把工程建好,然后編寫公共代碼,例如事件定義、事件工廠等,最后才是每個場景的開發;
  • 在父工程disruptor-tutorials新增名為low-level-operate的module,其build.gradle如下:
plugins {
    id 'org.springframework.boot'
}

dependencies {
    implementation 'org.projectlombok:lombok'
    implementation 'org.springframework.boot:spring-boot-starter'
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'com.lmax:disruptor'

    testImplementation('org.springframework.boot:spring-boot-starter-test')
}
  • 然后是springboot啟動類:
package com.bolingcavalry;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class LowLevelOperateApplication {
	public static void main(String[] args) {
		SpringApplication.run(LowLevelOperateApplication.class, args);
	}
}
  • 事件類,這是事件的定義:
package com.bolingcavalry.service;

import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;

@Data
@ToString
@NoArgsConstructor
public class StringEvent {
    private String value;
}
  • 事件工廠,定義如何在內存中創建事件對象:
package com.bolingcavalry.service;

import com.lmax.disruptor.EventFactory;

public class StringEventFactory implements EventFactory<StringEvent> {
    @Override
    public StringEvent newInstance() {
        return new StringEvent();
    }
}
  • 事件生產類,定義如何將業務邏輯的事件轉為disruptor事件發布到環形隊列,用於消費:
package com.bolingcavalry.service;

import com.lmax.disruptor.RingBuffer;

public class StringEventProducer {

    // 存儲數據的環形隊列
    private final RingBuffer<StringEvent> ringBuffer;

    public StringEventProducer(RingBuffer<StringEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    public void onData(String content) {

        // ringBuffer是個隊列,其next方法返回的是下最后一條記錄之后的位置,這是個可用位置
        long sequence = ringBuffer.next();

        try {
            // sequence位置取出的事件是空事件
            StringEvent stringEvent = ringBuffer.get(sequence);
            // 空事件添加業務信息
            stringEvent.setValue(content);
        } finally {
            // 發布
            ringBuffer.publish(sequence);
        }
    }
}
  • 事件處理類,收到事件后具體的業務處理邏輯:
package com.bolingcavalry.service;

import com.lmax.disruptor.EventHandler;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import java.util.function.Consumer;

@Slf4j
public class StringEventHandler implements EventHandler<StringEvent> {

    public StringEventHandler(Consumer<?> consumer) {
        this.consumer = consumer;
    }

    // 外部可以傳入Consumer實現類,每處理一條消息的時候,consumer的accept方法就會被執行一次
    private Consumer<?> consumer;

    @Override
    public void onEvent(StringEvent event, long sequence, boolean endOfBatch) throws Exception {
        log.info("sequence [{}], endOfBatch [{}], event : {}", sequence, endOfBatch, event);

        // 這里延時100ms,模擬消費事件的邏輯的耗時
        Thread.sleep(100);

        // 如果外部傳入了consumer,就要執行一次accept方法
        if (null!=consumer) {
            consumer.accept(null);
        }
    }
}
  • 定義一個接口,外部通過調用接口的方法來生產消息,再放幾個常量在里面后面會用到:
package com.bolingcavalry.service;

public interface LowLevelOperateService {
    /**
     * 消費者數量
     */
    int CONSUMER_NUM = 3;

    /**
     * 環形緩沖區大小
     */
    int BUFFER_SIZE = 16;

    /**
     * 發布一個事件
     * @param value
     * @return
     */
    void publish(String value);

    /**
     * 返回已經處理的任務總數
     * @return
     */
    long eventCount();
}
  • 以上就是公共代碼了,接下來逐個實現之前規划的三個場景;

100個事件,單個消費者消費

  • 這是最簡單的功能了,實現發布消息和單個消費者消費的功能,代碼如下,有幾處要注意的地方稍后提到:
package com.bolingcavalry.service.impl;

import com.bolingcavalry.service.*;
import com.lmax.disruptor.BatchEventProcessor;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SequenceBarrier;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

@Service("oneConsumer")
@Slf4j
public class OneConsumerServiceImpl implements LowLevelOperateService {

    private RingBuffer<StringEvent> ringBuffer;

    private StringEventProducer producer;

    /**
     * 統計消息總數
     */
    private final AtomicLong eventCount = new AtomicLong();

    private ExecutorService executors;

    @PostConstruct
    private void init() {
        // 准備一個匿名類,傳給disruptor的事件處理類,
        // 這樣每次處理事件時,都會將已經處理事件的總數打印出來
        Consumer<?> eventCountPrinter = new Consumer<Object>() {
            @Override
            public void accept(Object o) {
                long count = eventCount.incrementAndGet();
                log.info("receive [{}] event", count);
            }
        };

        // 創建環形隊列實例
        ringBuffer = RingBuffer.createSingleProducer(new StringEventFactory(), BUFFER_SIZE);

        // 准備線程池
        executors = Executors.newFixedThreadPool(1);

        //創建SequenceBarrier
        SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();

        // 創建事件處理的工作類,里面執行StringEventHandler處理事件
        BatchEventProcessor<StringEvent> batchEventProcessor = new BatchEventProcessor<>(
                ringBuffer,
                sequenceBarrier,
                new StringEventHandler(eventCountPrinter));

        // 將消費者的sequence傳給環形隊列
        ringBuffer.addGatingSequences(batchEventProcessor.getSequence());

        // 在一個獨立線程中取事件並消費
        executors.submit(batchEventProcessor);

        // 生產者
        producer = new StringEventProducer(ringBuffer);
    }

    @Override
    public void publish(String value) {
        producer.onData(value);
    }

    @Override
    public long eventCount() {
        return eventCount.get();
    }
}
  • 上述代碼有以下幾處需要注意:
  1. 自己創建環形隊列RingBuffer實例
  2. 自己准備線程池,里面的線程用來獲取和消費消息
  3. 自己動手創建BatchEventProcessor實例,並把事件處理類傳入
  4. 通過ringBuffer創建sequenceBarrier,傳給BatchEventProcessor實例使用
  5. 將BatchEventProcessor的sequence傳給ringBuffer,確保ringBuffer的生產和消費不會出現混亂
  6. 啟動線程池,意味着BatchEventProcessor實例在一個獨立線程中不斷的從ringBuffer中獲取事件並消費;
  • 為了驗證上述代碼能否正常工作,我這里寫了個單元測試類,如下所示,邏輯很簡單,調用OneConsumerServiceImpl.publish方法一百次,產生一百個事件,再檢查OneConsumerServiceImpl記錄的消費事件總數是不是等於一百:
package com.bolingcavalry.service.impl;

import com.bolingcavalry.service.LowLevelOperateService;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import static org.junit.Assert.assertEquals;

@RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
public class LowLeverOperateServiceImplTest {

    @Autowired
    @Qualifier("oneConsumer")
    LowLevelOperateService oneConsumer;

    private static final int EVENT_COUNT = 100;

    private void testLowLevelOperateService(LowLevelOperateService service, int eventCount, int expectEventCount) throws InterruptedException {
        for(int i=0;i<eventCount;i++) {
            log.info("publich {}", i);
            service.publish(String.valueOf(i));
        }

        // 異步消費,因此需要延時等待
        Thread.sleep(10000);

        // 消費的事件總數應該等於發布的事件數
        assertEquals(expectEventCount, service.eventCount());
    }

    @Test
    public void testOneConsumer() throws InterruptedException {
        log.info("start testOneConsumerService");
        testLowLevelOperateService(oneConsumer, EVENT_COUNT, EVENT_COUNT);
    }
  • 注意,如果您是直接在IDEA上點擊圖標來執行單元測試,記得勾選下圖紅框中選項,否則可能出現編譯失敗:

在這里插入圖片描述

  • 執行上述單元測試類,結果如下圖所示,消息的生產和消費都符合預期,並且消費邏輯是在獨立線程中執行的:

在這里插入圖片描述

  • 繼續挑戰下一個場景;

100個事件,三個消費者,每個都獨自消費這個100個事件

  • 這個場景在kafka中也有,就是三個消費者的group不同,這樣每一條消息,這兩個消費者各自消費一次;
  • 因此,100個事件,3個消費者每人都會獨立消費這100個事件,一共消費300次;
  • 代碼如下,有幾處要注意的地方稍后提到:
package com.bolingcavalry.service.impl;

import com.bolingcavalry.service.*;
import com.lmax.disruptor.BatchEventProcessor;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SequenceBarrier;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

@Service("multiConsumer")
@Slf4j
public class MultiConsumerServiceImpl implements LowLevelOperateService {

    private RingBuffer<StringEvent> ringBuffer;

    private StringEventProducer producer;

    /**
     * 統計消息總數
     */
    private final AtomicLong eventCount = new AtomicLong();

    /**
     * 生產一個BatchEventProcessor實例,並且啟動獨立線程開始獲取和消費消息
     * @param executorService
     */
    private void addProcessor(ExecutorService executorService) {
        // 准備一個匿名類,傳給disruptor的事件處理類,
        // 這樣每次處理事件時,都會將已經處理事件的總數打印出來
        Consumer<?> eventCountPrinter = new Consumer<Object>() {
            @Override
            public void accept(Object o) {
                long count = eventCount.incrementAndGet();
                log.info("receive [{}] event", count);
            }
        };

        BatchEventProcessor<StringEvent> batchEventProcessor = new BatchEventProcessor<>(
                ringBuffer,
                ringBuffer.newBarrier(),
                new StringEventHandler(eventCountPrinter));

        // 將當前消費者的sequence實例傳給ringBuffer
        ringBuffer.addGatingSequences(batchEventProcessor.getSequence());

        // 啟動獨立線程獲取和消費事件
        executorService.submit(batchEventProcessor);
    }

    @PostConstruct
    private void init() {
        ringBuffer = RingBuffer.createSingleProducer(new StringEventFactory(), BUFFER_SIZE);

        ExecutorService executorService = Executors.newFixedThreadPool(CONSUMER_NUM);

        // 創建多個消費者,並在獨立線程中獲取和消費事件
        for (int i=0;i<CONSUMER_NUM;i++) {
            addProcessor(executorService);
        }

        // 生產者
        producer = new StringEventProducer(ringBuffer);
    }

    @Override
    public void publish(String value) {
        producer.onData(value);
    }

    @Override
    public long eventCount() {
        return eventCount.get();
    }
}
  • 上述代碼和前面的OneConsumerServiceImpl相比差別不大,主要是創建了多個BatchEventProcessor實例,然后分別在線程池中提交;

  • 驗證方法依舊是單元測試,在剛才的LowLeverOperateServiceImplTest.java中增加代碼即可,注意testLowLevelOperateService的第三個參數是EVENT_COUNT * LowLevelOperateService.CONSUMER_NUM,表示預期的被消費消息數為300

 	@Autowired
    @Qualifier("multiConsumer")
    LowLevelOperateService multiConsumer;

    @Test
    public void testMultiConsumer() throws InterruptedException {
        log.info("start testMultiConsumer");
        testLowLevelOperateService(multiConsumer, EVENT_COUNT, EVENT_COUNT * LowLevelOperateService.CONSUMER_NUM);
    }
  • 執行單元測試,如下圖所示,一共消費了300個事件,並且三個消費者在不動線程:

在這里插入圖片描述

100個事件,三個消費者共同消費這個100個事件

  • 本篇的最后一個實戰是發布100個事件,然后讓三個消費者共同消費100個(例如A消費33個,B消費33個,C消費34個);

  • 前面用到的BatchEventProcessor是用來獨立消費的,不適合多個消費者共同消費,這種多個消費共同消費的場景需要借助WorkerPool來完成,這個名字還是很形象的:一個池子里面有很多個工作者,把任務放入這個池子,工作者們每人處理一部分,大家合力將任務完成;

  • 傳入WorkerPool的消費者需要實現WorkHandler接口,於是新增一個實現類:

package com.bolingcavalry.service;

import com.lmax.disruptor.WorkHandler;
import lombok.extern.slf4j.Slf4j;
import java.util.function.Consumer;

@Slf4j
public class StringWorkHandler implements WorkHandler<StringEvent> {

    public StringWorkHandler(Consumer<?> consumer) {
        this.consumer = consumer;
    }

    // 外部可以傳入Consumer實現類,每處理一條消息的時候,consumer的accept方法就會被執行一次
    private Consumer<?> consumer;

    @Override
    public void onEvent(StringEvent event) throws Exception {
        log.info("work handler event : {}", event);

        // 這里延時100ms,模擬消費事件的邏輯的耗時
        Thread.sleep(100);

        // 如果外部傳入了consumer,就要執行一次accept方法
        if (null!=consumer) {
            consumer.accept(null);
        }
    }
}
  • 新增服務類,實現共同消費的邏輯,有幾處要注意的地方稍后會提到:
package com.bolingcavalry.service.impl;

import com.bolingcavalry.service.*;
import com.lmax.disruptor.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

@Service("workerPoolConsumer")
@Slf4j
public class WorkerPoolConsumerServiceImpl implements LowLevelOperateService {

    private RingBuffer<StringEvent> ringBuffer;

    private StringEventProducer producer;

    /**
     * 統計消息總數
     */
    private final AtomicLong eventCount = new AtomicLong();

    @PostConstruct
    private void init() {
        ringBuffer = RingBuffer.createSingleProducer(new StringEventFactory(), BUFFER_SIZE);

        ExecutorService executorService = Executors.newFixedThreadPool(CONSUMER_NUM);

        StringWorkHandler[] handlers = new StringWorkHandler[CONSUMER_NUM];

        // 創建多個StringWorkHandler實例,放入一個數組中
        for (int i=0;i < CONSUMER_NUM;i++) {
            handlers[i] = new StringWorkHandler(o -> {
                long count = eventCount.incrementAndGet();
                log.info("receive [{}] event", count);
            });
        }

        // 創建WorkerPool實例,將StringWorkHandler實例的數組傳進去,代表共同消費者的數量
        WorkerPool<StringEvent> workerPool = new WorkerPool<>(ringBuffer, ringBuffer.newBarrier(), new IgnoreExceptionHandler(), handlers);

        // 這一句很重要,去掉就會出現重復消費同一個事件的問題
        ringBuffer.addGatingSequences(workerPool.getWorkerSequences());

        workerPool.start(executorService);

        // 生產者
        producer = new StringEventProducer(ringBuffer);
    }

    @Override
    public void publish(String value) {
        producer.onData(value);
    }

    @Override
    public long eventCount() {
        return eventCount.get();
    }
}
  • 上述代碼中,要注意的有以下兩處:
  1. StringWorkHandler數組傳入給WorkerPool后,每個StringWorkHandler實例都放入一個新的WorkProcessor實例,WorkProcessor實現了Runnable接口,在執行workerPool.start時,會將WorkProcessor提交到線程池中;

  2. 和前面的獨立消費相比,共同消費最大的特點在於只調用了一次ringBuffer.addGatingSequences方法,也就是說三個消費者共用一個sequence實例;

  • 驗證方法依舊是單元測試,在剛才的LowLeverOperateServiceImplTest.java中增加代碼即可,注意testWorkerPoolConsumer的第三個參數是EVENT_COUNT,表示預期的被消費消息數為100
 	@Autowired
    @Qualifier("workerPoolConsumer")
    LowLevelOperateService workerPoolConsumer;
    
    @Test
    public void testWorkerPoolConsumer() throws InterruptedException {
        log.info("start testWorkerPoolConsumer");
        testLowLevelOperateService(workerPoolConsumer, EVENT_COUNT, EVENT_COUNT);
    }
  • 執行單元測試如下圖所示,三個消費者一共消費100個事件,且三個消費者在不同線程:

在這里插入圖片描述

  • 至此,咱們在不用Disruptor類的前提下完成了三種常見場景的消息生產消費,相信您對Disruptor的底層實現也有了深刻認識,今后不論是使用還是優化Disruptor,一定可以更加得心應手;

你不孤單,欣宸原創一路相伴

  1. Java系列
  2. Spring系列
  3. Docker系列
  4. kubernetes系列
  5. 數據庫+中間件系列
  6. DevOps系列

歡迎關注公眾號:程序員欣宸

微信搜索「程序員欣宸」,我是欣宸,期待與您一同暢游Java世界...
https://github.com/zq2599/blog_demos


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM