系列文章目錄
前言
EventBus 是 Guava 的事件處理機制,是觀察者模式(生產/消費模型)的一種實現。
觀察者模式在我們日常開發中使用非常廣泛,例如在訂單系統中,訂單狀態或者物流信息的變更會向用戶發送APP推送、短信、通知賣家、買家等等;審批系統中,審批單的流程流轉會通知發起審批用戶、審批的領導等等。
Observer模式也是 JDK 中自帶就支持的,其在 1.0 版本就已經存在 Observer,不過隨着 Java 版本的飛速升級,其使用方式一直沒有變化,許多程序庫提供了更加簡單的實現,例如 Guava EventBus、RxJava、EventBus 等
一、為什么要用 Observer模式以及 EventBus 優點 ?
EventBus 優點
- 相比 Observer 編程簡單方便
- 通過自定義參數可實現同步、異步操作以及異常處理
- 單進程使用,無網絡影響
缺點
- 只能單進程使用
- 項目異常重啟或者退出不保證消息持久化
如果需要分布式使用還是需要使用 MQ
二、EventBus 使用步驟
1. 引入庫
Gradle
compile group: 'com.google.guava', name: 'guava', version: '29.0-jre'
Maven
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>29.0-jre</version>
</dependency>
引入依賴后,這里我們主要使用 com.google.common.eventbus.EventBus
類進行操作,其提供了 register
、unregister
、post
來進行注冊訂閱、取消訂閱和發布消息
public void register(Object object);
public void unregister(Object object);
public void post(Object event);
2. 同步使用
1. 首先創建一個 EventBus
EventBus eventBus = new EventBus();
2. 創建一個訂閱者
在 Guava EventBus 中,是根據參數類型進行訂閱,每個訂閱的方法只能由一個參數,同時需要使用 @Subscribe
標識
class EventListener {
/**
* 監聽 Integer 類型的消息
*/
@Subscribe
public void listenInteger(Integer param) {
System.out.println("EventListener#listenInteger ->" + param);
}
/**
* 監聽 String 類型的消息
*/
@Subscribe
public void listenString(String param) {
System.out.println("EventListener#listenString ->" + param);
}
}
3. 注冊到 EventBus 上並發布消息
EventBus eventBus = new EventBus();
eventBus.register(new EventListener());
eventBus.post(1);
eventBus.post(2);
eventBus.post("3");
運行結果為
EventListener#listenInteger ->1
EventListener#listenInteger ->2
EventListener#listenString ->3
根據需要我們可以創建多個訂閱者完成訂閱信息,同時如果一個類型存在多個訂閱者,則所有訂閱方法都會執行
為什么說這么做是同步的呢?
Guava Event 實際上是使用線程池來處理訂閱消息的,通過源碼可以看出,當我們使用默認的構造方法創建 EventBus
的時候,其中 executor
為 MoreExecutors.directExecutor()
,其具體實現中直接調用的 Runnable#run
方法,使其仍然在同一個線程中執行,所以默認操作仍然是同步的,這種處理方法也有適用的地方,這樣既可以解耦也可以讓方法在同一個線程中執行獲取同線程中的便利,比如事務的處理
EventBus 部分源碼
public class EventBus {
private static final Logger logger = Logger.getLogger(EventBus.class.getName());
private final String identifier;
private final Executor executor;
private final SubscriberExceptionHandler exceptionHandler;
private final SubscriberRegistry subscribers;
private final Dispatcher dispatcher;
public EventBus() {
this("default");
}
public EventBus(String identifier) {
this(identifier, MoreExecutors.directExecutor(), Dispatcher.perThreadDispatchQueue(), EventBus.LoggingHandler.INSTANCE);
}
public EventBus(SubscriberExceptionHandler exceptionHandler) {
this("default", MoreExecutors.directExecutor(), Dispatcher.perThreadDispatchQueue(), exceptionHandler);
}
EventBus(String identifier, Executor executor, Dispatcher dispatcher, SubscriberExceptionHandler exceptionHandler) {
this.subscribers = new SubscriberRegistry(this);
this.identifier = (String)Preconditions.checkNotNull(identifier);
this.executor = (Executor)Preconditions.checkNotNull(executor);
this.dispatcher = (Dispatcher)Preconditions.checkNotNull(dispatcher);
this.exceptionHandler = (SubscriberExceptionHandler)Preconditions.checkNotNull(exceptionHandler);
}
}
DirectExecutor 部分源碼
enum DirectExecutor implements Executor {
INSTANCE;
private DirectExecutor() {
}
public void execute(Runnable command) {
command.run();
}
public String toString() {
return "MoreExecutors.directExecutor()";
}
}
3. 異步使用
通過上面的源碼,可以看出只要將構造方法中的 executor 換成一個線程池實現即可, 同時 Guava EventBus 為了簡化操作,提供了一個簡化的方案即 AsyncEventBus
EventBus eventBus = new AsyncEventBus(Executors.newCachedThreadPool());
這樣即可實現異步使用
AsyncEventBus 源碼
public class AsyncEventBus extends EventBus {
public AsyncEventBus(String identifier, Executor executor) {
super(identifier, executor, Dispatcher.legacyAsync(), LoggingHandler.INSTANCE);
}
public AsyncEventBus(Executor executor, SubscriberExceptionHandler subscriberExceptionHandler) {
super("default", executor, Dispatcher.legacyAsync(), subscriberExceptionHandler);
}
public AsyncEventBus(Executor executor) {
super("default", executor, Dispatcher.legacyAsync(), LoggingHandler.INSTANCE);
}
}
4. 異常處理
如果處理時發生異常應該如何處理? 在看源碼中,無論是 EventBus
還是 AsyncEventBus
都可傳入自定義的 SubscriberExceptionHandler
該 handler 當出現異常時會被調用,我可可以從參數 exception
獲取異常信息,從 context
中獲取消息信息進行特定的處理
其接口聲明為
public interface SubscriberExceptionHandler {
/** Handles exceptions thrown by subscribers. */
void handleException(Throwable exception, SubscriberExceptionContext context);
}
總結
在上面的基礎上,我們可以定義一些消息類型來實現不同消息的監聽和處理,通過實現 SubscriberExceptionHandler
來處理異常的情況,無論時同步還是異步都能游刃有余
參考
- https://github.com/google/guava
- https://github.com/greenrobot/EventBus
- https://github.com/ReactiveX/RxJava