歡迎訪問我的GitHub
https://github.com/zq2599/blog_demos
內容:所有原創文章分類匯總及配套源碼,涉及Java、Docker、Kubernetes、DevOPS等;
關於disruptor
disruptor是LMAX公司開發的一個高性能隊列,其作用和阻塞隊列(BlockingQueue)類似,都是在相同進程內、不同線程間傳遞數據(例如消息、事件),另外disruptor也有自己的一些特色:
- 以廣播的形式發布事件,並且消費者之間存在依賴關系;
- 為事件提前分配內存;
- 無鎖算法;
關於Ring Buffer(環形隊列)
- 提到disruptor一般都會提到Ring Buffer(環形隊列)是它的特點,實際上從3.0版本之后,環形隊列只是用來存儲和更新事件數據,在其他更復雜的場景下,用戶可以通過自定義操作將其替換掉;
- 簡單的說,disruptor官方認為Ring Buffe是核心概念(Core Concepts),但不是特色( key features)
本篇概覽
作為《disruptor筆記》系列的開篇,本篇有兩個任務:
-
創建名為disruptor-tutorials的gradle工程,作為整個系列的父工程,該系列所有代碼都是這個父工程下的module;
-
在disruptor-tutorials下面新建名為basic-event的module,這是個springboot應用,作用是使用disruptor的基本功能:一個線程發布事件,另一個線程消費事件,也就是對環形隊列最基本的操作,如下圖:
用disruptor實現消息的發布和消費的套路
- 咱們提前小結用disruptor實現消息的發布和消費的套路,后面的開發按部就班即可,括號中是本篇對應的java類:
- 事件的定義:一個普通的bean(StringEvent.java)
- 事件工廠:定義如何生產事件的內存實例,這個實例剛從內存中創建,還沒有任何業務數據(StringEventFactory.java)
- 事件處理:封裝了消費單個事件的具體邏輯(StringEventHandler.java)
- 事件生產者:定義了如何將業務數據設置到還沒有業務數據的事件中,就是工廠創建出來的那種(StringEventProducer.java)
- 初始化邏輯:創建和啟動disruptor對象,將事件工廠傳給disruptor,創建事件生產者和事件處理對象,並分別與disruptor對象關聯;
- 業務邏輯:也就是調用事件生產者的onData方法發布事件,本文的做法是在單元測試類中發布事件,然后檢查消費的事件數和生產的事件數是否一致;
7
環境信息
《Disruptor筆記》系列涉及的環境信息如下:
- 操作系統:64位win10
- JDK:1.8.0_281
- IDE:IntelliJ IDEA 2021.1.1 (Ultimate Edition)
- gradle:6.7.1
- springboot:2.3.8.RELEASE
- disruptor:3.4.4
源碼下載
- 本篇實戰中的完整源碼可在GitHub下載到,地址和鏈接信息如下表所示(https://github.com/zq2599/blog_demos):
名稱 | 鏈接 | 備注 |
---|---|---|
項目主頁 | https://github.com/zq2599/blog_demos | 該項目在GitHub上的主頁 |
git倉庫地址(https) | https://github.com/zq2599/blog_demos.git | 該項目源碼的倉庫地址,https協議 |
git倉庫地址(ssh) | git@github.com:zq2599/blog_demos.git | 該項目源碼的倉庫地址,ssh協議 |
- 這個git項目中有多個文件夾,本次實戰的源碼在disruptor-tutorials文件夾下,如下圖紅框所示:
創建父工程
- 因為是系列文章,所以這里做個父工程來管理所有依賴庫和插件,新建名為disruptor-tutorials的gradle工程,build.gradle如下:
import java.time.OffsetDateTime
import java.time.format.DateTimeFormatter
buildscript {
repositories {
maven {
url 'https://plugins.gradle.org/m2/'
}
// 如果有私服就在此配置,如果沒有請注釋掉
maven {
url 'http://192.168.50.43:8081/repository/aliyun-proxy/'
}
// 阿里雲
maven {
url 'http://maven.aliyun.com/nexus/content/groups/public/'
}
mavenCentral()
}
ext {
// 項目版本
projectVersion = '1.0-SNAPSHOT'
// sprignboot版本 https://github.com/spring-projects/spring-boot/releases
springBootVersion = '2.3.8.RELEASE'
}
}
plugins {
id 'java'
id 'java-library'
id 'org.springframework.boot' version "${springBootVersion}" apply false
id 'io.spring.dependency-management' version '1.0.11.RELEASE'
id 'net.nemerosa.versioning' version '2.14.0'
id 'io.franzbecker.gradle-lombok' version '4.0.0' apply false
id 'com.github.ben-manes.versions' version '0.36.0' // gradle dependencyUpdates
}
// If you attempt to build without the `--scan` parameter in `gradle 6.0+` it will cause a build error that it can't find
// a buildScan property to change. This avoids that problem.
if (hasProperty('buildScan')) {
buildScan {
termsOfServiceUrl = 'https://gradle.com/terms-of-service'
termsOfServiceAgree = 'yes'
}
}
wrapper {
gradleVersion = '6.7.1'
}
def buildTimeAndDate = OffsetDateTime.now()
ext {
// 構建時取得當前日期和時間
buildDate = DateTimeFormatter.ISO_LOCAL_DATE.format(buildTimeAndDate)
buildTime = DateTimeFormatter.ofPattern('HH:mm:ss.SSSZ').format(buildTimeAndDate)
buildRevision = versioning.info.commit
}
allprojects {
apply plugin: 'java'
apply plugin: 'idea'
apply plugin: 'eclipse'
apply plugin: 'io.spring.dependency-management'
apply plugin: 'io.franzbecker.gradle-lombok'
compileJava {
sourceCompatibility = JavaVersion.VERSION_1_8
targetCompatibility = JavaVersion.VERSION_1_8
options.encoding = 'UTF-8'
}
compileJava.options*.compilerArgs = [
'-Xlint:all', '-Xlint:-processing'
]
// Copy LICENSE
tasks.withType(Jar) {
from(project.rootDir) {
include 'LICENSE'
into 'META-INF'
}
}
// 寫入到MANIFEST.MF中的內容
jar {
manifest {
attributes(
'Created-By': "${System.properties['java.version']} (${System.properties['java.vendor']} ${System.properties['java.vm.version']})".toString(),
'Built-By': 'travis',
'Build-Date': buildDate,
'Build-Time': buildTime,
'Built-OS': "${System.properties['os.name']}",
'Build-Revision': buildRevision,
'Specification-Title': project.name,
'Specification-Version': projectVersion,
'Specification-Vendor': 'Will Zhao',
'Implementation-Title': project.name,
'Implementation-Version': projectVersion,
'Implementation-Vendor': 'Will Zhao'
)
}
}
repositories {
mavenCentral()
// 如果有私服就在此配置,如果沒有請注釋掉
maven {
url 'http://192.168.50.43:8081/repository/aliyun-proxy/'
}
// 阿里雲
maven {
url 'http://maven.aliyun.com/nexus/content/groups/public/'
}
jcenter()
}
buildscript {
repositories {
maven { url 'https://plugins.gradle.org/m2/' }
}
}
}
allprojects { project ->
buildscript {
dependencyManagement {
imports {
mavenBom "org.springframework.boot:spring-boot-starter-parent:${springBootVersion}"
mavenBom "org.junit:junit-bom:5.7.0"
}
dependencies {
dependency 'org.projectlombok:lombok:1.16.16'
dependency 'org.apache.commons:commons-lang3:3.11'
dependency 'commons-collections:commons-collections:3.2.2'
dependency 'com.lmax:disruptor:3.4.4'
}
}
ext {
springFrameworkVersion = dependencyManagement.importedProperties['spring-framework.version']
}
}
}
group = 'bolingcavalry'
version = projectVersion
- 接下來編寫消息發布和消費的代碼;
新建module
- 前面新建了整個《Disruptor筆記》系列的父工程,現在新建名為basic-event的module,其build.gradle內容如下:
plugins {
id 'org.springframework.boot'
}
dependencies {
implementation 'org.projectlombok:lombok'
implementation 'org.springframework.boot:spring-boot-starter'
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'com.lmax:disruptor'
testImplementation('org.springframework.boot:spring-boot-starter-test')
}
- 這個module是個springboot應用,啟動類如下:
package com.bolingcavalry;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class BasicEventApplication {
public static void main(String[] args) {
SpringApplication.run(BasicEventApplication.class, args);
}
}
- 接下來按照前面總結的套路行事;
事件的定義
- 事件定義類StringEvent.java,可見就是個普普通通的java bean:
package com.bolingcavalry.service;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
@Data
@ToString
@NoArgsConstructor
public class StringEvent {
private String value;
}
事件工廠
- 事件工廠的作用,是讓disruptor知道如何在內存中創建一個事件實例,不過,該實例和業務還沒有任何關系,本篇的事件工廠如下,可見就是創建StringEvent實例,並沒有特別的操作:
package com.bolingcavalry.service;
import com.lmax.disruptor.EventFactory;
public class StringEventFactory implements EventFactory<StringEvent> {
@Override
public StringEvent newInstance() {
return new StringEvent();
}
}
事件處理
- 時間處理類的作用是定義一個事件如何被消費,里面是具體的業務代碼,每個事件都會執行此類的onEvent方法;
- 本篇的事件處理類做的事情是打印事件內容,再用sleep消耗100毫秒,然后再調用外部傳入的Consumer實現類的accept方法:
package com.bolingcavalry.service;
import com.lmax.disruptor.EventHandler;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import java.util.function.Consumer;
@Slf4j
public class StringEventHandler implements EventHandler<StringEvent> {
public StringEventHandler(Consumer<?> consumer) {
this.consumer = consumer;
}
// 外部可以傳入Consumer實現類,每處理一條消息的時候,consumer的accept方法就會被執行一次
private Consumer<?> consumer;
@Override
public void onEvent(StringEvent event, long sequence, boolean endOfBatch) throws Exception {
log.info("sequence [{}], endOfBatch [{}], event : {}", sequence, endOfBatch, event);
// 這里延時100ms,模擬消費事件的邏輯的耗時
Thread.sleep(100);
// 如果外部傳入了consumer,就要執行一次accept方法
if (null!=consumer) {
consumer.accept(null);
}
}
}
事件生產者
- 每當業務要生產一個事件時,就會調用事件生產者的onData方法,將業務數據作為入參傳進來,此時生產者會從環形隊列中取出一個事件實例(就是前面的事件工廠創建的),把業務數據傳給這個實例,再把實例正式發布出去:
package com.bolingcavalry.service;
import com.lmax.disruptor.RingBuffer;
public class StringEventProducer {
// 存儲數據的環形隊列
private final RingBuffer<StringEvent> ringBuffer;
public StringEventProducer(RingBuffer<StringEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void onData(String content) {
// ringBuffer是個隊列,其next方法返回的是下最后一條記錄之后的位置,這是個可用位置
long sequence = ringBuffer.next();
try {
// sequence位置取出的事件是空事件
StringEvent stringEvent = ringBuffer.get(sequence);
// 空事件添加業務信息
stringEvent.setValue(content);
} finally {
// 發布
ringBuffer.publish(sequence);
}
}
}
初始化邏輯
- 開發一個spring bean,這里面有disruptor的初始化邏輯,有幾處需要關注的地方稍后會說到:
package com.bolingcavalry.service.impl;
import com.bolingcavalry.service.*;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.util.DaemonThreadFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.time.LocalDateTime;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
@Service
@Slf4j
public class BasicEventServiceImpl implements BasicEventService {
private static final int BUFFER_SIZE = 16;
private Disruptor<StringEvent> disruptor;
private StringEventProducer producer;
/**
* 統計消息總數
*/
private final AtomicLong eventCount = new AtomicLong();
@PostConstruct
private void init() {
Executor executor = Executors.newCachedThreadPool();
// 實例化
disruptor = new Disruptor<>(new StringEventFactory(),
BUFFER_SIZE,
new CustomizableThreadFactory("event-handler-"));
// 准備一個匿名類,傳給disruptor的事件處理類,
// 這樣每次處理事件時,都會將已經處理事件的總數打印出來
Consumer<?> eventCountPrinter = new Consumer<Object>() {
@Override
public void accept(Object o) {
long count = eventCount.incrementAndGet();
log.info("receive [{}] event", count);
}
};
// 指定處理類
disruptor.handleEventsWith(new StringEventHandler(eventCountPrinter));
// 啟動
disruptor.start();
// 生產者
producer = new StringEventProducer(disruptor.getRingBuffer());
}
@Override
public void publish(String value) {
producer.onData(value);
}
@Override
public long eventCount() {
return eventCount.get();
}
}
- 上述代碼有以下幾點需要注意:
- publish方法給外部調用,用於發布一個事件;
- eventCountPrinter是Consumer的實現類,被傳給了StringEventHandler,這樣StringEventHandler消費消息的時候,eventCount就會增加,也就記下了已經處理的事件總數;
- Disruptor的構造方法中,BUFFER_SIZE表示環形隊列的大小,這里故意設置為16,這樣可以輕易的將環形隊列填滿,此時再發布事件會不會導致環形隊列上的數據被覆蓋呢?稍后咱們可以測一下;
- 記得調用start方法;
web接口
再寫一個web接口類,這樣就可以通過瀏覽器驗證前面的代碼了:
package com.bolingcavalry.controller;
import com.bolingcavalry.service.BasicEventService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.*;
import java.time.LocalDateTime;
@RestController
public class BasicEventController {
@Autowired
BasicEventService basicEventService;
@RequestMapping(value = "/{value}", method = RequestMethod.GET)
public String publish(@PathVariable("value") String value) {
basicEventService.publish(value);
return "success, " + LocalDateTime.now().toString();
}
}
業務邏輯
-
現在生產事件的接口已准備好,消費事件的代碼也完成了,接下來就是如何調用生產事件的接口來驗證生產和消費是否正常,這里我選擇使用單元測試來驗證;
-
在disruptor-tutorials\basic-event\src\test\java目錄下新增測試類BasicEventServiceImplTest.java,測試邏輯是發布了一百個事件,再驗證消費事件的數量是否也等於一百:
package com.bolingcavalry.service.impl;
import com.bolingcavalry.service.BasicEventService;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import static org.junit.Assert.assertEquals;
@RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
public class BasicEventServiceImplTest {
@Autowired
BasicEventService basicEventService;
@Test
public void publish() throws InterruptedException {
log.info("start publich test");
int count = 100;
for(int i=0;i<count;i++) {
log.info("publich {}", i);
basicEventService.publish(String.valueOf(i));
}
// 異步消費,因此需要延時等待
Thread.sleep(1000);
// 消費的事件總數應該等於發布的事件數
assertEquals(count, basicEventService.eventCount());
}
}
- 編碼完成后,點擊下圖紅框1中的按鈕運行單元測試,結果如紅框2所示,測試通過:
- 聰明的您可能會產生疑惑:環形數組大小只有16,消費一個事件耗時很長(100毫秒),那么環形數組中的事件還未消費完時如果還在發布事件會發生什么呢?新事件會覆蓋未消費的事件嗎?顯然不會,因為測試結果是通過的,那么disruptor是怎么做到的呢?其實從日志上可以看出一些端倪,下圖是測試過程中日志的末尾部分,紅框顯示,一直到測試快結束,發布事件的線程還在執行發布操作,這就意味着:如果消費速度過慢導致環形隊列里放不進新的事件時,發布事件的線程就會阻塞,知道環形隊列中可以放入事件為止:
- 至此,disrupor的入門操作就完成了,咱們已經體驗過基本的發布和消費功能,接下來的文章咱們會繼續深入學習其他更強大的功能;
你不孤單,欣宸原創一路相伴
歡迎關注公眾號:程序員欣宸
微信搜索「程序員欣宸」,我是欣宸,期待與您一同暢游Java世界...
https://github.com/zq2599/blog_demos