Disruptor
Disruptor是英國外匯交易公司LMAX開發的一個高性能隊列,研發的初衷是解決內存隊列的延遲問題(在性能測試中發現竟然與I/O操作處於同樣的數量級)。
Java內置隊列的問題
介紹Disruptor之前,我們先來看一看常用的線程安全的內置隊列有什么問題。Java的內置隊列如下表所示。
隊列的底層一般分成三種:數組、鏈表和堆。其中,堆一般情況下是為了實現帶有優先級特性的隊列,暫且不考慮。
從數組和鏈表兩種數據結構來看,基於數組線程安全的隊列,比較典型的是ArrayBlockingQueue,它主要通過加鎖的方式來保證線程安全;基於鏈表的線程安全隊列分成LinkedBlockingQueue和ConcurrentLinkedQueue兩大類,前者也通過鎖的方式來實現線程安全,而后者以及上面表格中的LinkedTransferQueue都是通過原子變量compare and swap(以下簡稱“CAS”)這種不加鎖的方式來實現的。
但是對 volatile類型的變量進行 CAS 操作,存在偽共享問題,下面介紹一下
偽共享
CPU的緩存系統是以緩存行(cache line)為單位存儲的,一般的大小為64bytes。在多線程程序的執行過程中,存在着一種情況,多個需要頻繁修改的變量存在同一個緩存行當中。
假設:有兩個線程分別訪問並修改X和Y這兩個變量,X和Y恰好在同一個緩存行上,這兩個線程分別在不同的CPU上執行。那么每個CPU分別更新好X和Y時將緩存行刷入內存時,發現有別的修改了各自緩存行內的數據,這時緩存行會失效,從L3中重新獲取。這樣的話,程序執行效率明顯下降。為了減少這種情況的發生,其實就是避免X和Y在同一個緩存行中,可以主動添加一些無關變量將緩存行填充滿,比如在X對象中添加一些變量,讓它有64 Byte那么大,正好占滿一個緩存行。
偽共享問題 的解決方案
簡單的說,就是 以空間換時間: 使用占位字節,將變量的所在的 緩沖行 塞滿。
disruptor 無鎖框架就是這么干的。
Disruptor框架是如何解決偽共享問題的?
在Disruptor中有一個重要的類Sequence,該類包裝了一個volatile修飾的long類型數據value,無論是Disruptor中的基於數組實現的緩沖區RingBuffer,還是生產者,消費者,都有各自獨立的Sequence,RingBuffer緩沖區中,Sequence標示着寫入進度,例如每次生產者要寫入數據進緩沖區時,都要調用RingBuffer.next()來獲得下一個可使用的相對位置。對於生產者和消費者來說,Sequence標示着它們的事件序號。
例子
/**
* 停車場問題.
* 1) 事件對象Event
* 2)三個消費者Handler
* 3)一個生產者Processer
* 4)執行Main方法
*/
public class DisruptorCar {
private static final Integer NUM = 1; // 1,10,100,1000
/**
* 測試入口 ,
* 一個生產者(汽車進入停車場);
* 三個消費者(一個記錄汽車信息,一個發送消息給系統,一個發送消息告知司機)
* 前兩個消費者同步執行,都有結果了再執行第三個消費者
*/
@Test
public void main() throws InterruptedException {
long beginTime = System.currentTimeMillis();
int bufferSize = 2048; // 2的N次方
try {
// 創建線程池,負責處理Disruptor的四個消費者
ExecutorService executor = Executors.newFixedThreadPool(4);
// 初始化一個 Disruptor
Disruptor<MyInParkingDataEvent> disruptor = new Disruptor<MyInParkingDataEvent>(new EventFactory<MyInParkingDataEvent>() {
@Override
public MyInParkingDataEvent newInstance() {
return new MyInParkingDataEvent(); // Event 初始化工廠
}
}, bufferSize, executor, ProducerType.SINGLE, new YieldingWaitStrategy());
// 使用disruptor創建消費者組 MyParkingDataInDbHandler 和 MyParkingDataToKafkaHandler
EventHandlerGroup<MyInParkingDataEvent> handlerGroup = disruptor.handleEventsWith(
new MyParkingDataInDbHandler(), new MyParkingDataToKafkaHandler());
// 當上面兩個消費者處理結束后在消耗 smsHandler
MyParkingDataSmsHandler myParkingDataSmsHandler = new MyParkingDataSmsHandler();
handlerGroup.then(myParkingDataSmsHandler);
// 啟動Disruptor
disruptor.start();
CountDownLatch countDownLatch = new CountDownLatch(1); // 一個生產者線程准備好了就可以通知主線程繼續工作了
// 生產者生成數據
executor.submit(new MyInParkingDataEventPublisher(countDownLatch, disruptor));
countDownLatch.await(); // 等待生產者結束
disruptor.shutdown();
executor.shutdown();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("總耗時:" + (System.currentTimeMillis() - beginTime));
}
public class MyInParkingDataEvent {
private String carLicense; // 車牌號
public String getCarLicense() {
return carLicense;
}
public void setCarLicense(String carLicense) {
this.carLicense = carLicense;
}
}
/**
* Handler 第一個消費者,負責保存進場汽車的信息
*/
public class MyParkingDataInDbHandler implements EventHandler<MyInParkingDataEvent>, WorkHandler<MyInParkingDataEvent> {
@Override
public void onEvent(MyInParkingDataEvent myInParkingDataEvent) throws Exception {
long threadId = Thread.currentThread().getId(); // 獲取當前線程id
String carLicense = myInParkingDataEvent.getCarLicense(); // 獲取車牌號
System.out.println(String.format("Thread Id %s 保存 %s 到數據庫中 ....", threadId, carLicense));
}
@Override
public void onEvent(MyInParkingDataEvent myInParkingDataEvent, long sequence, boolean endOfBatch)
throws Exception {
this.onEvent(myInParkingDataEvent);
}
}
/**
* 第二個消費者,負責發送通知告知工作人員(Kafka是一種高吞吐量的分布式發布訂閱消息系統)
*/
public class MyParkingDataToKafkaHandler implements EventHandler<MyInParkingDataEvent> {
@Override
public void onEvent(MyInParkingDataEvent myInParkingDataEvent, long sequence, boolean endOfBatch)
throws Exception {
long threadId = Thread.currentThread().getId(); // 獲取當前線程id
String carLicense = myInParkingDataEvent.getCarLicense(); // 獲取車牌號
System.out.println(String.format("Thread Id %s 發送 %s 進入停車場信息給 kafka系統...", threadId, carLicense));
}
}
/**
* 第三個消費者,sms短信服務,告知司機你已經進入停車場,計費開始。
*/
public class MyParkingDataSmsHandler implements EventHandler<MyInParkingDataEvent> {
@Override
public void onEvent(MyInParkingDataEvent myInParkingDataEvent, long sequence, boolean endOfBatch)
throws Exception {
long threadId = Thread.currentThread().getId(); // 獲取當前線程id
String carLicense = myInParkingDataEvent.getCarLicense(); // 獲取車牌號
System.out.println(String.format("Thread Id %s 給 %s 的車主發送一條短信,並告知他計費開始了 ....", threadId, carLicense));
}
}
/**
* 生產者,進入停車場的車輛
*/
public class MyInParkingDataEventPublisher implements Runnable {
private CountDownLatch countDownLatch; // 用於監聽初始化操作,等初始化執行完畢后,通知主線程繼續工作
private Disruptor<MyInParkingDataEvent> disruptor;
public MyInParkingDataEventPublisher(CountDownLatch countDownLatch,
Disruptor<MyInParkingDataEvent> disruptor) {
this.countDownLatch = countDownLatch;
this.disruptor = disruptor;
}
@Override
public void run() {
MyInParkingDataEventTranslator eventTranslator = new MyInParkingDataEventTranslator();
try {
for (int i = 0; i < NUM; i++) {
disruptor.publishEvent(eventTranslator);
Thread.sleep(1000); // 假設一秒鍾進一輛車
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
countDownLatch.countDown(); // 執行完畢后通知 await()方法
System.out.println(NUM + "輛車已經全部進入進入停車場!");
}
}
}
class MyInParkingDataEventTranslator implements EventTranslator<MyInParkingDataEvent> {
@Override
public void translateTo(MyInParkingDataEvent myInParkingDataEvent, long sequence) {
this.generateData(myInParkingDataEvent);
}
private MyInParkingDataEvent generateData(MyInParkingDataEvent myInParkingDataEvent) {
myInParkingDataEvent.setCarLicense("車牌號: 鄂A-" + (int) (Math.random() * 100000)); // 隨機生成一個車牌號
System.out.println("Thread Id " + Thread.currentThread().getId() + " 寫完一個event");
return myInParkingDataEvent;
}
}
}