今天用一個停車場問題來加深對Disruptor的理解。一個有關汽車進入停車場的問題。當汽車進入停車場時,系統首先會記錄汽車信息。同時也會發送消息到其他系統處理相關業務,最后發送短信通知車主收費開始。看了很多文章,里面的代碼都是大同小異的,可能代碼真的是很經典。以下代碼也是來源網絡,只是自己手動敲的,加了一些注釋。
代碼包含以下內容:
1) 事件對象Event
2)三個消費者Handler
3)一個生產者Processer
4)執行Main方法
Event類:汽車信息
-
public class MyInParkingDataEvent {
-
-
private String carLicense; // 車牌號
-
-
public String getCarLicense() {
-
return carLicense;
-
}
-
-
public void setCarLicense(String carLicense) {
-
this.carLicense = carLicense;
-
}
-
-
}
-
import com.lmax.disruptor.EventHandler;
-
import com.lmax.disruptor.WorkHandler;
-
-
/**
-
* Handler 第一個消費者,負責保存進場汽車的信息
-
*
-
*/
-
public class MyParkingDataInDbHandler implements EventHandler<MyInParkingDataEvent> , WorkHandler<MyInParkingDataEvent>{
-
-
-
public void onEvent(MyInParkingDataEvent myInParkingDataEvent) throws Exception {
-
long threadId = Thread.currentThread().getId(); // 獲取當前線程id
-
String carLicense = myInParkingDataEvent.getCarLicense(); // 獲取車牌號
-
System.out.println(String.format( "Thread Id %s 保存 %s 到數據庫中 ....", threadId, carLicense));
-
}
-
-
-
public void onEvent(MyInParkingDataEvent myInParkingDataEvent, long sequence, boolean endOfBatch)
-
throws Exception {
-
this.onEvent(myInParkingDataEvent);
-
}
-
-
}
-
import com.lmax.disruptor.EventHandler;
-
-
/**
-
* 第二個消費者,負責發送通知告知工作人員(Kafka是一種高吞吐量的分布式發布訂閱消息系統)
-
*/
-
public class MyParkingDataToKafkaHandler implements EventHandler<MyInParkingDataEvent>{
-
-
-
public void onEvent(MyInParkingDataEvent myInParkingDataEvent, long sequence, boolean endOfBatch)
-
throws Exception {
-
long threadId = Thread.currentThread().getId(); // 獲取當前線程id
-
String carLicense = myInParkingDataEvent.getCarLicense(); // 獲取車牌號
-
System.out.println(String.format( "Thread Id %s 發送 %s 進入停車場信息給 kafka系統...", threadId, carLicense));
-
}
-
-
}
-
import com.lmax.disruptor.EventHandler;
-
-
/**
-
* 第三個消費者,sms短信服務,告知司機你已經進入停車場,計費開始。
-
*/
-
public class MyParkingDataSmsHandler implements EventHandler<MyInParkingDataEvent>{
-
-
-
public void onEvent(MyInParkingDataEvent myInParkingDataEvent, long sequence, boolean endOfBatch)
-
throws Exception {
-
long threadId = Thread.currentThread().getId(); // 獲取當前線程id
-
String carLicense = myInParkingDataEvent.getCarLicense(); // 獲取車牌號
-
System.out.println(String.format( "Thread Id %s 給 %s 的車主發送一條短信,並告知他計費開始了 ....", threadId, carLicense));
-
}
-
-
}
-
import java.util.concurrent.CountDownLatch;
-
import com.lmax.disruptor.EventTranslator;
-
import com.lmax.disruptor.dsl.Disruptor;
-
-
/**
-
* 生產者,進入停車場的車輛
-
*/
-
public class MyInParkingDataEventPublisher implements Runnable{
-
-
private CountDownLatch countDownLatch; // 用於監聽初始化操作,等初始化執行完畢后,通知主線程繼續工作
-
private Disruptor<MyInParkingDataEvent> disruptor;
-
private static final Integer NUM = 1; // 1,10,100,1000
-
-
public MyInParkingDataEventPublisher(CountDownLatch countDownLatch,
-
Disruptor<MyInParkingDataEvent> disruptor) {
-
this.countDownLatch = countDownLatch;
-
this.disruptor = disruptor;
-
}
-
-
-
public void run() {
-
MyInParkingDataEventTranslator eventTranslator = new MyInParkingDataEventTranslator();
-
try {
-
for(int i = 0; i < NUM; i ++) {
-
disruptor.publishEvent(eventTranslator);
-
Thread.sleep( 1000); // 假設一秒鍾進一輛車
-
}
-
} catch (InterruptedException e) {
-
e.printStackTrace();
-
} finally {
-
countDownLatch.countDown(); // 執行完畢后通知 await()方法
-
System.out.println(NUM + "輛車已經全部進入進入停車場!");
-
}
-
}
-
-
}
-
-
class MyInParkingDataEventTranslator implements EventTranslator<MyInParkingDataEvent> {
-
-
-
public void translateTo(MyInParkingDataEvent myInParkingDataEvent, long sequence) {
-
this.generateData(myInParkingDataEvent);
-
}
-
-
private MyInParkingDataEvent generateData(MyInParkingDataEvent myInParkingDataEvent) {
-
myInParkingDataEvent.setCarLicense( "車牌號: 鄂A-" + (int)(Math.random() * 100000)); // 隨機生成一個車牌號
-
System.out.println( "Thread Id " + Thread.currentThread().getId() + " 寫完一個event");
-
return myInParkingDataEvent;
-
}
-
-
}
-
import com.lmax.disruptor.EventFactory;
-
import com.lmax.disruptor.YieldingWaitStrategy;
-
import com.lmax.disruptor.dsl.Disruptor;
-
import com.lmax.disruptor.dsl.EventHandlerGroup;
-
import com.lmax.disruptor.dsl.ProducerType;
-
-
/**
-
* 執行的Main方法 ,
-
* 一個生產者(汽車進入停車場);
-
* 三個消費者(一個記錄汽車信息,一個發送消息給系統,一個發送消息告知司機)
-
* 前兩個消費者同步執行,都有結果了再執行第三個消費者
-
*/
-
public class MyInParkingDataEventMain {
-
-
public static void main(String[] args) {
-
long beginTime=System.currentTimeMillis();
-
int bufferSize = 2048; // 2的N次方
-
try {
-
// 創建線程池,負責處理Disruptor的四個消費者
-
ExecutorService executor = Executors.newFixedThreadPool( 4);
-
-
// 初始化一個 Disruptor
-
Disruptor<MyInParkingDataEvent> disruptor = new Disruptor<MyInParkingDataEvent>(new EventFactory<MyInParkingDataEvent>() {
-
-
public MyInParkingDataEvent newInstance() {
-
return new MyInParkingDataEvent(); // Event 初始化工廠
-
}
-
}, bufferSize, executor, ProducerType.SINGLE, new YieldingWaitStrategy());
-
-
// 使用disruptor創建消費者組 MyParkingDataInDbHandler 和 MyParkingDataToKafkaHandler
-
EventHandlerGroup<MyInParkingDataEvent> handlerGroup = disruptor.handleEventsWith(
-
new MyParkingDataInDbHandler(), new MyParkingDataToKafkaHandler());
-
-
// 當上面兩個消費者處理結束后在消耗 smsHandler
-
MyParkingDataSmsHandler myParkingDataSmsHandler = new MyParkingDataSmsHandler();
-
handlerGroup.then(myParkingDataSmsHandler);
-
-
// 啟動Disruptor
-
disruptor.start();
-
-
CountDownLatch countDownLatch = new CountDownLatch(1); // 一個生產者線程准備好了就可以通知主線程繼續工作了
-
// 生產者生成數據
-
executor.submit( new MyInParkingDataEventPublisher(countDownLatch, disruptor));
-
countDownLatch.await(); // 等待生產者結束
-
-
disruptor.shutdown();
-
executor.shutdown();
-
} catch (Exception e) {
-
e.printStackTrace();
-
}
-
-
System.out.println( "總耗時:"+(System.currentTimeMillis()-beginTime));
-
}
-
-
}
--------------------- 本文來自 ITDragon龍 的CSDN 博客 ,全文地址請點擊:https://blog.csdn.net/qq_19558705/article/details/77247912?utm_source=copy