歡迎訪問我的GitHub
https://github.com/zq2599/blog_demos
內容:所有原創文章分類匯總及配套源碼,涉及Java、Docker、Kubernetes、DevOPS等;
《disruptor筆記》系列鏈接
本篇概覽
- 本文是《disruptor筆記》系列的第三篇,主要任務是編碼實現消息生產和消費,與《disruptor筆記之一:快速入門》不同的是,本次開發不使用Disruptor類,和Ring Buffer(環形隊列)相關的操作都是自己寫代碼實現;
- 這種脫離Disruptor類操作Ring Buffer的做法,不適合用在生產環境,但在學習Disruptor的過程中,這是種高效的學習手段,經過本篇實戰后,在今后使用Disruptor時,您在開發、調試、優化等各種場景下都能更加得心應手;
- 簡單的消息生產消費已不能滿足咱們的學習熱情,今天的實戰要挑戰以下三個場景:
- 100個事件,單個消費者消費;
- 100個事件,三個消費者,每個都獨自消費這個100個事件;
- 100個事件,三個消費者共同消費這個100個事件;
前文回顧
為了完成本篇的實戰,前文《disruptor筆記之二:Disruptor類分析》已做了充分的研究分析,建議觀看,這里簡單回顧以下Disruptor類的幾個核心功能,這也是咱們編碼時要實現的:
- 創建環形隊列(RingBuffer對象)
- 創建SequenceBarrier對象,用於接收ringBuffer中的可消費事件
- 創建BatchEventProcessor,負責消費事件
- 綁定BatchEventProcessor對象的異常處理類
- 調用ringBuffer.addGatingSequences,將消費者的Sequence傳給ringBuffer
- 啟動獨立線程,用來執行消費事件的業務邏輯
- 理論分析已經完成,接下來開始編碼;
源碼下載
- 本篇實戰中的完整源碼可在GitHub下載到,地址和鏈接信息如下表所示(https://github.com/zq2599/blog_demos):
名稱 | 鏈接 | 備注 |
---|---|---|
項目主頁 | https://github.com/zq2599/blog_demos | 該項目在GitHub上的主頁 |
git倉庫地址(https) | https://github.com/zq2599/blog_demos.git | 該項目源碼的倉庫地址,https協議 |
git倉庫地址(ssh) | git@github.com:zq2599/blog_demos.git | 該項目源碼的倉庫地址,ssh協議 |
- 這個git項目中有多個文件夾,本次實戰的源碼在disruptor-tutorials文件夾下,如下圖紅框所示:
- disruptor-tutorials是個父工程,里面有多個module,本篇實戰的module是low-level-operate,如下圖紅框所示:
開發
- 進入編碼階段,今天的任務是挑戰以下三個場景:
- 100個事件,單個消費者消費;
- 100個事件,三個消費者,每個都獨自消費這個100個事件;
- 100個事件,三個消費者共同消費這個100個事件;
- 咱們先把工程建好,然后編寫公共代碼,例如事件定義、事件工廠等,最后才是每個場景的開發;
- 在父工程disruptor-tutorials新增名為low-level-operate的module,其build.gradle如下:
plugins {
id 'org.springframework.boot'
}
dependencies {
implementation 'org.projectlombok:lombok'
implementation 'org.springframework.boot:spring-boot-starter'
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'com.lmax:disruptor'
testImplementation('org.springframework.boot:spring-boot-starter-test')
}
- 然后是springboot啟動類:
package com.bolingcavalry;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class LowLevelOperateApplication {
public static void main(String[] args) {
SpringApplication.run(LowLevelOperateApplication.class, args);
}
}
- 事件類,這是事件的定義:
package com.bolingcavalry.service;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
@Data
@ToString
@NoArgsConstructor
public class StringEvent {
private String value;
}
- 事件工廠,定義如何在內存中創建事件對象:
package com.bolingcavalry.service;
import com.lmax.disruptor.EventFactory;
public class StringEventFactory implements EventFactory<StringEvent> {
@Override
public StringEvent newInstance() {
return new StringEvent();
}
}
- 事件生產類,定義如何將業務邏輯的事件轉為disruptor事件發布到環形隊列,用於消費:
package com.bolingcavalry.service;
import com.lmax.disruptor.RingBuffer;
public class StringEventProducer {
// 存儲數據的環形隊列
private final RingBuffer<StringEvent> ringBuffer;
public StringEventProducer(RingBuffer<StringEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void onData(String content) {
// ringBuffer是個隊列,其next方法返回的是下最后一條記錄之后的位置,這是個可用位置
long sequence = ringBuffer.next();
try {
// sequence位置取出的事件是空事件
StringEvent stringEvent = ringBuffer.get(sequence);
// 空事件添加業務信息
stringEvent.setValue(content);
} finally {
// 發布
ringBuffer.publish(sequence);
}
}
}
- 事件處理類,收到事件后具體的業務處理邏輯:
package com.bolingcavalry.service;
import com.lmax.disruptor.EventHandler;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import java.util.function.Consumer;
@Slf4j
public class StringEventHandler implements EventHandler<StringEvent> {
public StringEventHandler(Consumer<?> consumer) {
this.consumer = consumer;
}
// 外部可以傳入Consumer實現類,每處理一條消息的時候,consumer的accept方法就會被執行一次
private Consumer<?> consumer;
@Override
public void onEvent(StringEvent event, long sequence, boolean endOfBatch) throws Exception {
log.info("sequence [{}], endOfBatch [{}], event : {}", sequence, endOfBatch, event);
// 這里延時100ms,模擬消費事件的邏輯的耗時
Thread.sleep(100);
// 如果外部傳入了consumer,就要執行一次accept方法
if (null!=consumer) {
consumer.accept(null);
}
}
}
- 定義一個接口,外部通過調用接口的方法來生產消息,再放幾個常量在里面后面會用到:
package com.bolingcavalry.service;
public interface LowLevelOperateService {
/**
* 消費者數量
*/
int CONSUMER_NUM = 3;
/**
* 環形緩沖區大小
*/
int BUFFER_SIZE = 16;
/**
* 發布一個事件
* @param value
* @return
*/
void publish(String value);
/**
* 返回已經處理的任務總數
* @return
*/
long eventCount();
}
- 以上就是公共代碼了,接下來逐個實現之前規划的三個場景;
100個事件,單個消費者消費
- 這是最簡單的功能了,實現發布消息和單個消費者消費的功能,代碼如下,有幾處要注意的地方稍后提到:
package com.bolingcavalry.service.impl;
import com.bolingcavalry.service.*;
import com.lmax.disruptor.BatchEventProcessor;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SequenceBarrier;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
@Service("oneConsumer")
@Slf4j
public class OneConsumerServiceImpl implements LowLevelOperateService {
private RingBuffer<StringEvent> ringBuffer;
private StringEventProducer producer;
/**
* 統計消息總數
*/
private final AtomicLong eventCount = new AtomicLong();
private ExecutorService executors;
@PostConstruct
private void init() {
// 准備一個匿名類,傳給disruptor的事件處理類,
// 這樣每次處理事件時,都會將已經處理事件的總數打印出來
Consumer<?> eventCountPrinter = new Consumer<Object>() {
@Override
public void accept(Object o) {
long count = eventCount.incrementAndGet();
log.info("receive [{}] event", count);
}
};
// 創建環形隊列實例
ringBuffer = RingBuffer.createSingleProducer(new StringEventFactory(), BUFFER_SIZE);
// 准備線程池
executors = Executors.newFixedThreadPool(1);
//創建SequenceBarrier
SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
// 創建事件處理的工作類,里面執行StringEventHandler處理事件
BatchEventProcessor<StringEvent> batchEventProcessor = new BatchEventProcessor<>(
ringBuffer,
sequenceBarrier,
new StringEventHandler(eventCountPrinter));
// 將消費者的sequence傳給環形隊列
ringBuffer.addGatingSequences(batchEventProcessor.getSequence());
// 在一個獨立線程中取事件並消費
executors.submit(batchEventProcessor);
// 生產者
producer = new StringEventProducer(ringBuffer);
}
@Override
public void publish(String value) {
producer.onData(value);
}
@Override
public long eventCount() {
return eventCount.get();
}
}
- 上述代碼有以下幾處需要注意:
- 自己創建環形隊列RingBuffer實例
- 自己准備線程池,里面的線程用來獲取和消費消息
- 自己動手創建BatchEventProcessor實例,並把事件處理類傳入
- 通過ringBuffer創建sequenceBarrier,傳給BatchEventProcessor實例使用
- 將BatchEventProcessor的sequence傳給ringBuffer,確保ringBuffer的生產和消費不會出現混亂
- 啟動線程池,意味着BatchEventProcessor實例在一個獨立線程中不斷的從ringBuffer中獲取事件並消費;
- 為了驗證上述代碼能否正常工作,我這里寫了個單元測試類,如下所示,邏輯很簡單,調用OneConsumerServiceImpl.publish方法一百次,產生一百個事件,再檢查OneConsumerServiceImpl記錄的消費事件總數是不是等於一百:
package com.bolingcavalry.service.impl;
import com.bolingcavalry.service.LowLevelOperateService;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import static org.junit.Assert.assertEquals;
@RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
public class LowLeverOperateServiceImplTest {
@Autowired
@Qualifier("oneConsumer")
LowLevelOperateService oneConsumer;
private static final int EVENT_COUNT = 100;
private void testLowLevelOperateService(LowLevelOperateService service, int eventCount, int expectEventCount) throws InterruptedException {
for(int i=0;i<eventCount;i++) {
log.info("publich {}", i);
service.publish(String.valueOf(i));
}
// 異步消費,因此需要延時等待
Thread.sleep(10000);
// 消費的事件總數應該等於發布的事件數
assertEquals(expectEventCount, service.eventCount());
}
@Test
public void testOneConsumer() throws InterruptedException {
log.info("start testOneConsumerService");
testLowLevelOperateService(oneConsumer, EVENT_COUNT, EVENT_COUNT);
}
- 注意,如果您是直接在IDEA上點擊圖標來執行單元測試,記得勾選下圖紅框中選項,否則可能出現編譯失敗:
- 執行上述單元測試類,結果如下圖所示,消息的生產和消費都符合預期,並且消費邏輯是在獨立線程中執行的:
- 繼續挑戰下一個場景;
100個事件,三個消費者,每個都獨自消費這個100個事件
- 這個場景在kafka中也有,就是三個消費者的group不同,這樣每一條消息,這兩個消費者各自消費一次;
- 因此,100個事件,3個消費者每人都會獨立消費這100個事件,一共消費300次;
- 代碼如下,有幾處要注意的地方稍后提到:
package com.bolingcavalry.service.impl;
import com.bolingcavalry.service.*;
import com.lmax.disruptor.BatchEventProcessor;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SequenceBarrier;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
@Service("multiConsumer")
@Slf4j
public class MultiConsumerServiceImpl implements LowLevelOperateService {
private RingBuffer<StringEvent> ringBuffer;
private StringEventProducer producer;
/**
* 統計消息總數
*/
private final AtomicLong eventCount = new AtomicLong();
/**
* 生產一個BatchEventProcessor實例,並且啟動獨立線程開始獲取和消費消息
* @param executorService
*/
private void addProcessor(ExecutorService executorService) {
// 准備一個匿名類,傳給disruptor的事件處理類,
// 這樣每次處理事件時,都會將已經處理事件的總數打印出來
Consumer<?> eventCountPrinter = new Consumer<Object>() {
@Override
public void accept(Object o) {
long count = eventCount.incrementAndGet();
log.info("receive [{}] event", count);
}
};
BatchEventProcessor<StringEvent> batchEventProcessor = new BatchEventProcessor<>(
ringBuffer,
ringBuffer.newBarrier(),
new StringEventHandler(eventCountPrinter));
// 將當前消費者的sequence實例傳給ringBuffer
ringBuffer.addGatingSequences(batchEventProcessor.getSequence());
// 啟動獨立線程獲取和消費事件
executorService.submit(batchEventProcessor);
}
@PostConstruct
private void init() {
ringBuffer = RingBuffer.createSingleProducer(new StringEventFactory(), BUFFER_SIZE);
ExecutorService executorService = Executors.newFixedThreadPool(CONSUMER_NUM);
// 創建多個消費者,並在獨立線程中獲取和消費事件
for (int i=0;i<CONSUMER_NUM;i++) {
addProcessor(executorService);
}
// 生產者
producer = new StringEventProducer(ringBuffer);
}
@Override
public void publish(String value) {
producer.onData(value);
}
@Override
public long eventCount() {
return eventCount.get();
}
}
-
上述代碼和前面的OneConsumerServiceImpl相比差別不大,主要是創建了多個BatchEventProcessor實例,然后分別在線程池中提交;
-
驗證方法依舊是單元測試,在剛才的LowLeverOperateServiceImplTest.java中增加代碼即可,注意testLowLevelOperateService的第三個參數是EVENT_COUNT * LowLevelOperateService.CONSUMER_NUM,表示預期的被消費消息數為300:
@Autowired
@Qualifier("multiConsumer")
LowLevelOperateService multiConsumer;
@Test
public void testMultiConsumer() throws InterruptedException {
log.info("start testMultiConsumer");
testLowLevelOperateService(multiConsumer, EVENT_COUNT, EVENT_COUNT * LowLevelOperateService.CONSUMER_NUM);
}
- 執行單元測試,如下圖所示,一共消費了300個事件,並且三個消費者在不動線程:
100個事件,三個消費者共同消費這個100個事件
-
本篇的最后一個實戰是發布100個事件,然后讓三個消費者共同消費100個(例如A消費33個,B消費33個,C消費34個);
-
前面用到的BatchEventProcessor是用來獨立消費的,不適合多個消費者共同消費,這種多個消費共同消費的場景需要借助WorkerPool來完成,這個名字還是很形象的:一個池子里面有很多個工作者,把任務放入這個池子,工作者們每人處理一部分,大家合力將任務完成;
-
傳入WorkerPool的消費者需要實現WorkHandler接口,於是新增一個實現類:
package com.bolingcavalry.service;
import com.lmax.disruptor.WorkHandler;
import lombok.extern.slf4j.Slf4j;
import java.util.function.Consumer;
@Slf4j
public class StringWorkHandler implements WorkHandler<StringEvent> {
public StringWorkHandler(Consumer<?> consumer) {
this.consumer = consumer;
}
// 外部可以傳入Consumer實現類,每處理一條消息的時候,consumer的accept方法就會被執行一次
private Consumer<?> consumer;
@Override
public void onEvent(StringEvent event) throws Exception {
log.info("work handler event : {}", event);
// 這里延時100ms,模擬消費事件的邏輯的耗時
Thread.sleep(100);
// 如果外部傳入了consumer,就要執行一次accept方法
if (null!=consumer) {
consumer.accept(null);
}
}
}
- 新增服務類,實現共同消費的邏輯,有幾處要注意的地方稍后會提到:
package com.bolingcavalry.service.impl;
import com.bolingcavalry.service.*;
import com.lmax.disruptor.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
@Service("workerPoolConsumer")
@Slf4j
public class WorkerPoolConsumerServiceImpl implements LowLevelOperateService {
private RingBuffer<StringEvent> ringBuffer;
private StringEventProducer producer;
/**
* 統計消息總數
*/
private final AtomicLong eventCount = new AtomicLong();
@PostConstruct
private void init() {
ringBuffer = RingBuffer.createSingleProducer(new StringEventFactory(), BUFFER_SIZE);
ExecutorService executorService = Executors.newFixedThreadPool(CONSUMER_NUM);
StringWorkHandler[] handlers = new StringWorkHandler[CONSUMER_NUM];
// 創建多個StringWorkHandler實例,放入一個數組中
for (int i=0;i < CONSUMER_NUM;i++) {
handlers[i] = new StringWorkHandler(o -> {
long count = eventCount.incrementAndGet();
log.info("receive [{}] event", count);
});
}
// 創建WorkerPool實例,將StringWorkHandler實例的數組傳進去,代表共同消費者的數量
WorkerPool<StringEvent> workerPool = new WorkerPool<>(ringBuffer, ringBuffer.newBarrier(), new IgnoreExceptionHandler(), handlers);
// 這一句很重要,去掉就會出現重復消費同一個事件的問題
ringBuffer.addGatingSequences(workerPool.getWorkerSequences());
workerPool.start(executorService);
// 生產者
producer = new StringEventProducer(ringBuffer);
}
@Override
public void publish(String value) {
producer.onData(value);
}
@Override
public long eventCount() {
return eventCount.get();
}
}
- 上述代碼中,要注意的有以下兩處:
-
StringWorkHandler數組傳入給WorkerPool后,每個StringWorkHandler實例都放入一個新的WorkProcessor實例,WorkProcessor實現了Runnable接口,在執行workerPool.start時,會將WorkProcessor提交到線程池中;
-
和前面的獨立消費相比,共同消費最大的特點在於只調用了一次ringBuffer.addGatingSequences方法,也就是說三個消費者共用一個sequence實例;
- 驗證方法依舊是單元測試,在剛才的LowLeverOperateServiceImplTest.java中增加代碼即可,注意testWorkerPoolConsumer的第三個參數是EVENT_COUNT,表示預期的被消費消息數為100:
@Autowired
@Qualifier("workerPoolConsumer")
LowLevelOperateService workerPoolConsumer;
@Test
public void testWorkerPoolConsumer() throws InterruptedException {
log.info("start testWorkerPoolConsumer");
testLowLevelOperateService(workerPoolConsumer, EVENT_COUNT, EVENT_COUNT);
}
- 執行單元測試如下圖所示,三個消費者一共消費100個事件,且三個消費者在不同線程:
- 至此,咱們在不用Disruptor類的前提下完成了三種常見場景的消息生產消費,相信您對Disruptor的底層實現也有了深刻認識,今后不論是使用還是優化Disruptor,一定可以更加得心應手;
你不孤單,欣宸原創一路相伴
歡迎關注公眾號:程序員欣宸
微信搜索「程序員欣宸」,我是欣宸,期待與您一同暢游Java世界...
https://github.com/zq2599/blog_demos