Disruptor 系列(二)使用場景
今天用一個訂單問題來加深對 Disruptor 的理解。當系統中有訂單產生時,系統首先會記錄訂單信息。同時也會發送消息到其他系統處理相關業務,最后才是訂單的處理。
代碼包含以下內容:
1) 事件對象 Event
2)三個消費者 Handler
3)一個生產者 Producer
4)執行 Main 方法
一、訂單處理系統代碼
(1) Event
public class Trade {
private String id;//ID
private String name;
private double price;//金額
private AtomicInteger count = new AtomicInteger(0);
// 省略getter/setter
}
(2) Handler 類
一個負責存儲訂單信息,一個負責發送 kafka 信息到其他系統中,最后一個負責處理訂單信息。
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WorkHandler;
/**
* 第一個 Handler1,存儲到數據庫中
*/
public class Handler1 implements EventHandler<Trade>, WorkHandler<Trade> {
@Override
public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
this.onEvent(event);
}
@Override
public void onEvent(Trade event) throws Exception {
long threadId = Thread.currentThread().getId(); // 獲取當前線程id
String id = event.getId(); // 獲取訂單號
System.out.println(String.format("%s:Thread Id %s 訂單信息保存 %s 到數據庫中 ....",
this.getClass().getSimpleName(), threadId, id));
}
}
import com.lmax.disruptor.EventHandler;
/**
* 第二個 Handler2,訂單信息發送到其它系統中
*/
public class Handler2 implements EventHandler<Trade> {
@Override
public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
long threadId = Thread.currentThread().getId(); // 獲取當前線程id
String id = event.getId(); // 獲取訂單號
System.out.println(String.format("%s:Thread Id %s 訂單信息 %s 發送到 karaf 系統中 ....",
this.getClass().getSimpleName(), threadId, id));
}
}
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WorkHandler;
/**
* 第三個 Handler2,處理訂單信息
*/
public class Handler3 implements EventHandler<Trade>, WorkHandler<Trade> {
@Override
public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
onEvent(event);
}
@Override
public void onEvent(Trade event) throws Exception {
long threadId = Thread.currentThread().getId(); // 獲取當前線程id
String id = event.getId(); // 獲取訂單號
System.out.println(String.format("%s:Thread Id %s 訂單信息 %s 處理中 ....",
this.getClass().getSimpleName(), threadId, id));
}
}
(3) Producer 類
import com.lmax.disruptor.EventTranslator;
import com.lmax.disruptor.dsl.Disruptor;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
public class TradePublisher implements Runnable {
Disruptor<Trade> disruptor;
private CountDownLatch latch;
private static int LOOP = 1; // 模擬百萬次交易的發生
public TradePublisher(CountDownLatch latch, Disruptor<Trade> disruptor) {
this.disruptor=disruptor;
this.latch=latch;
}
@Override
public void run() {
TradeEventTranslator tradeTransloator = new TradeEventTranslator();
for(int i = 0; i < LOOP; i++) {
disruptor.publishEvent(tradeTransloator);
}
latch.countDown();
}
}
class TradeEventTranslator implements EventTranslator<Trade>{
@Override
public void translateTo(Trade event, long sequence) {
event.setId(UUID.randomUUID().toString());
}
}
(4) 執行的 Main 方法
package com.github.binarylei.disruptor.demo3;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.lmax.disruptor.BusySpinWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.EventHandlerGroup;
import com.lmax.disruptor.dsl.ProducerType;
public class Main {
public static void main(String[] args) throws InterruptedException {
long beginTime=System.currentTimeMillis();
int bufferSize=1024;
ExecutorService executor=Executors.newFixedThreadPool(8);
Disruptor<Trade> disruptor = new Disruptor<>(new EventFactory<Trade>() {
@Override
public Trade newInstance() {
return new Trade();
}
}, bufferSize, executor, ProducerType.SINGLE, new BusySpinWaitStrategy());
//菱形操作
//使用disruptor創建消費者組C1,C2
EventHandlerGroup<Trade> handlerGroup =
disruptor.handleEventsWith(new Handler1(), new Handler2());
//聲明在C1,C2完事之后執行JMS消息發送操作 也就是流程走到C3
handlerGroup.then(new Handler3());
disruptor.start();//啟動
CountDownLatch latch=new CountDownLatch(1);
//生產者准備
executor.submit(new TradePublisher(latch, disruptor));
latch.await();//等待生產者完事.
disruptor.shutdown();
executor.shutdown();
System.out.println("總耗時:"+(System.currentTimeMillis()-beginTime));
}
}
測試結果如下:
Handler1:Thread Id 10 訂單信息保存 a097c77d-08f1-430a-8342-2143963f268f 到數據庫中 ....
Handler2:Thread Id 11 訂單信息 a097c77d-08f1-430a-8342-2143963f268f 發送到 karaf 系統中 ....
Handler3:Thread Id 13 訂單信息 a097c77d-08f1-430a-8342-2143963f268f 處理中 ....
總耗時:1631
可以看到 Handler3 在 Handler1 和 Handler2 執行完成后才執行。
二、Disruptor DSL
雖然 disruptor 模式使用起來很簡單,但是建立多個消費者以及它們之間的依賴關系需要的樣板代碼太多了。為了能快速又簡單適用於99%的場景,我為 Disruptor 模式准備了一個簡單的領域特定語言(DSL),定義了消費順序。更多Disruptor場景使用
在講解 Disruptor DSL 之前先看一下多個消費者不重復消費的問題。
2.1 多個消費者不重復消費
默認一個消費者一個線程,如果想要實現 C3 多個消費者共同不重復消費數據,可以使用 handlerGroup.thenHandleEventsWithWorkerPool(customers)
//使用disruptor創建消費者組C1, C2
EventHandlerGroup<Trade> handlerGroup = disruptor.handleEventsWith(new Handler1(), new Handler2());
// 多個消費者不重復消費
Handler3[] customers = new Handler3[]{new Handler3(), new Handler3(), new Handler3()};
handlerGroup.thenHandleEventsWithWorkerPool(customers);
2.2 消費者的“四邊形模式”
在這種情況下,只要生產者(P1)將元素放到ring buffer上,消費者C1和C2就可以並行處理這些元素。但是消費者C3必須一直等到C1和C2處理完之后,才可以處理。在現實世界中的對應的案例就像:在處理實際的業務邏輯(C3)之前,需要校驗數據(C1),以及將數據寫入磁盤(C2)。
//1. 使用disruptor創建消費者組C1,C2
EventHandlerGroup<Trade> handlerGroup = disruptor.handleEventsWith(new Handler1(), new Handler2());
//2. 聲明在C1,C2完事之后執行JMS消息發送操作 也就是流程走到C3
handlerGroup.then(new Handler3());
2.3 消費者的“順序執行模式”
disruptor.handleEventsWith(new Handler1()).
handleEventsWith(new Handler2()).
handleEventsWith(new Handler3());
2.4 消費者的“六邊形模式”
我們甚至可以在一個更復雜的六邊形模式中構建一個並行消費者鏈:
Handler1 h1 = new Handler1();
Handler2 h2 = new Handler2();
Handler3 h3 = new Handler3();
Handler4 h4 = new Handler4();
Handler5 h5 = new Handler5();
disruptor.handleEventsWith(h1, h2);
disruptor.after(h1).handleEventsWith(h4);
disruptor.after(h2).handleEventsWith(h5);
disruptor.after(h4, h5).handleEventsWith(h3);
每天用心記錄一點點。內容也許不重要,但習慣很重要!