我們在前面分析客戶端引用的時候會看到如下這段代碼:
// 產生開始調用事件
if (EventBus.isEnable(ClientStartInvokeEvent.class)) {
EventBus.post(new ClientStartInvokeEvent(request));
}
這里用EventBus調用了一下post方法之后就什么也沒做了,就方法名來看是發送了一個post請求,也不知道發給誰,到底有什么用。
所以這一節我們來分析一下EventBus這個類的作用。
首先我們來看一下這個類的方法
從EventBus的方法中我們是不是應該想到了這是使用了什么設計模式?
沒錯,這里用到的是訂閱發布模式(Subscribe/Publish)。訂閱發布模式定義了一種一對多的依賴關系,讓多個訂閱者對象同時監聽某一個主題對象。這個主題對象在自身狀態變化時,會通知所有訂閱者對象,使它們能夠自動更新自己的狀態。
我們先分析源碼,分析完源碼之后再來總結一下。
EventBus發送事件
根據上面的示例,我們先看EventBus#post是里面是怎么做的。
EventBus#post
private final static ConcurrentMap<Class<? extends Event>, CopyOnWriteArraySet<Subscriber>> SUBSCRIBER_MAP = new ConcurrentHashMap<Class<? extends Event>, CopyOnWriteArraySet<Subscriber>>();
public static void post(final Event event) {
//是否開啟總線
if (!isEnable()) {
return;
}
//根據傳入得event獲取到相應的Subscriber
CopyOnWriteArraySet<Subscriber> subscribers = SUBSCRIBER_MAP.get(event.getClass());
if (CommonUtils.isNotEmpty(subscribers)) {
for (final Subscriber subscriber : subscribers) {
//如果事件訂閱者是同步的,那么直接調用
if (subscriber.isSync()) {
handleEvent(subscriber, event);
} else { // 異步
final RpcInternalContext context = RpcInternalContext.peekContext();
//使用線程池啟動一個線程一部執行任務
final ThreadPoolExecutor asyncThreadPool = AsyncRuntime.getAsyncThreadPool();
try {
asyncThreadPool.execute(
new Runnable() {
@Override
public void run() {
try {
RpcInternalContext.setContext(context);
//調用訂閱者的event事件
handleEvent(subscriber, event);
} finally {
RpcInternalContext.removeContext();
}
}
});
} catch (RejectedExecutionException e) {
LOGGER
.warn("This queue is full when post event to async execute, queue size is " +
asyncThreadPool.getQueue().size() +
", please optimize this async thread pool of eventbus.");
}
}
}
}
}
private static void handleEvent(final Subscriber subscriber, final Event event) {
try {
subscriber.onEvent(event);
} catch (Throwable e) {
if (LOGGER.isWarnEnabled()) {
LOGGER.warn("Handle " + event.getClass() + " error", e);
}
}
}
這個post方法主要做了這么幾件事:
- 根據傳入的Event獲取對應的訂閱列表subscribers
- 遍歷subscribers
- 如果訂閱者是異步的,那么就使用線程池啟動執行任務
4, 如果是同步的那么就調用handleEvent方法向訂閱者發布消息
我們再來看看訂閱者是怎樣的:
Subscriber
public abstract class Subscriber {
/**
* 接到事件是否同步執行
*/
protected boolean sync = true;
/**
* 事件訂閱者
*/
protected Subscriber() {
}
/**
* 事件訂閱者
*
* @param sync 是否同步
*/
protected Subscriber(boolean sync) {
this.sync = sync;
}
/**
* 是否同步
*
* @return 是否同步
*/
public boolean isSync() {
return sync;
}
/**
* 事件處理,請處理異常
*
* @param event 事件
*/
public abstract void onEvent(Event event);
}
Subscriber是一個抽象類,默認是同步的方式進行訂閱。總共有下面四個實現類:
LookoutSubscriber
FaultToleranceSubscriber
RestTracerSubscriber
SofaTracerSubscriber
這里我不打算每個都進行分析,到時候打算用到了再詳細說明,這樣不會那么抽象。
由於我們前面講到了,在客戶端引用的時候會發送一個產生開始調用事件給總線,那一定要有訂閱者這個發送事件才有意義。所以我們接下來看看是在哪里進行事件的注冊的。
訂閱者注冊到EventBus
通過上面的繼承關系圖可以看到,在ConsumerConfig是AbstractIdConfig的子類,所以在初始化ConsumerConfig的時候AbstractIdConfig靜態代碼塊也會被初始化。
public abstract class AbstractIdConfig<S extends AbstractIdConfig> implements Serializable {
static {
RpcRuntimeContext.now();
}
}
在調用RpcRuntimeContext#now方法的時候,會調用到RpcRuntimeContext的靜態代碼塊
RpcRuntimeContext
public class RpcRuntimeContext {
static {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Welcome! Loading SOFA RPC Framework : {}, PID is:{}", Version.BUILD_VERSION, PID);
}
put(RpcConstants.CONFIG_KEY_RPC_VERSION, Version.RPC_VERSION);
// 初始化一些上下文
initContext();
// 初始化其它模塊
ModuleFactory.installModules();
// 增加jvm關閉事件
if (RpcConfigs.getOrDefaultValue(RpcOptions.JVM_SHUTDOWN_HOOK, true)) {
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
if (LOGGER.isWarnEnabled()) {
LOGGER.warn("SOFA RPC Framework catch JVM shutdown event, Run shutdown hook now.");
}
destroy(false);
}
}, "SOFA-RPC-ShutdownHook"));
}
}
public static long now() {
return System.currentTimeMillis();
}
}
在RpcRuntimeContext靜態代碼塊里主要做了以下幾件事:
- 初始化一些上下文的東西,例如:應用Id,應用名稱,當前所在文件夾地址等
- 初始化一些模塊,等下分析
- 增加jvm關閉時的鈎子
我們直接看installModules方法就好了,其他的方法和主流程無關。
ModuleFactory#installModules
public static void installModules() {
//通過SPI加載Module模塊
ExtensionLoader<Module> loader = ExtensionLoaderFactory.getExtensionLoader(Module.class);
//moduleLoadList 默認是 *
String moduleLoadList = RpcConfigs.getStringValue(RpcOptions.MODULE_LOAD_LIST);
for (Map.Entry<String, ExtensionClass<Module>> o : loader.getAllExtensions().entrySet()) {
String moduleName = o.getKey();
Module module = o.getValue().getExtInstance();
// judge need load from rpc option
if (needLoad(moduleLoadList, moduleName)) {
// judge need load from implement
if (module.needLoad()) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Install Module: {}", moduleName);
}
//安裝模板
module.install();
INSTALLED_MODULES.put(moduleName, module);
} else {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("The module " + moduleName + " does not need to be loaded.");
}
}
} else {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("The module " + moduleName + " is not in the module load list.");
}
}
}
}
-
這個方法里面一開始獲取Module的擴展類,Module的擴展類有如下幾個:
FaultToleranceModule
LookoutModule
RestTracerModule
SofaTracerModule -
然后會去獲取MODULE_LOAD_LIST配置類,多個配置用“;”分割。
-
調用loader.getAllExtensions()獲取所有的擴展類。遍歷擴展類。
-
接着調用needLoad方法:
static boolean needLoad(String moduleLoadList, String moduleName) {
//用;拆分
String[] activatedModules = StringUtils.splitWithCommaOrSemicolon(moduleLoadList);
boolean match = false;
for (String activatedModule : activatedModules) {
//ALL 就是 *
if (StringUtils.ALL.equals(activatedModule)) {
match = true;
} else if (activatedModule.equals(moduleName)) {
match = true;
} else if (match && (activatedModule.equals("!" + moduleName)
|| activatedModule.equals("-" + moduleName))) {
match = false;
break;
}
}
return match;
}
這個方法會傳入配置的moduleLoadList和當前遍歷到的moduleName,moduleLoadList默認是*
所以會返回true,如果配置了moduleLoadList不為*
的話,如果moduleName是配置中的之一便會返回true。
- 調用module的install進行模板的裝配
這里我們進入到SofaTracerModule#install中
SofaTracerModule#install
public void install() {
Tracer tracer = TracerFactory.getTracer("sofaTracer");
if (tracer != null) {
subscriber = new SofaTracerSubscriber();
EventBus.register(ClientStartInvokeEvent.class, subscriber);
EventBus.register(ClientBeforeSendEvent.class, subscriber);
EventBus.register(ClientAfterSendEvent.class, subscriber);
EventBus.register(ServerReceiveEvent.class, subscriber);
EventBus.register(ServerSendEvent.class, subscriber);
EventBus.register(ServerEndHandleEvent.class, subscriber);
EventBus.register(ClientSyncReceiveEvent.class, subscriber);
EventBus.register(ClientAsyncReceiveEvent.class, subscriber);
EventBus.register(ClientEndInvokeEvent.class, subscriber);
}
}
這里我們可以看到文章一開始被發送的ClientStartInvokeEvent在這里被注冊了。訂閱者的實現類是SofaTracerSubscriber。
訂閱者被調用
在上面我們分析到在注冊到EventBus之后,會發送一個post請求,然后EventBus會遍歷所有的Subscriber,調用符合條件的Subscriber的onEvent方法。
SofaTracerSubscriber#onEvent
public void onEvent(Event originEvent) {
if (!Tracers.isEnable()) {
return;
}
Class eventClass = originEvent.getClass();
if (eventClass == ClientStartInvokeEvent.class) {
ClientStartInvokeEvent event = (ClientStartInvokeEvent) originEvent;
Tracers.startRpc(event.getRequest());
}
else if (eventClass == ClientBeforeSendEvent.class) {
ClientBeforeSendEvent event = (ClientBeforeSendEvent) originEvent;
Tracers.clientBeforeSend(event.getRequest());
}
.....
}
這個方法里面主要就是對不同的event做出不同的反應。ClientStartInvokeEvent所做的請求就是調用一下Tracers#startRpc,Tracers是用來做鏈路追蹤的,這篇文章不涉及。
總結
我們首先上一張圖,來說明一下訂閱發布模式整體的結構。
在我們這個例子里EventBus的職責就是調度中心,subscriber的具體實現注冊到EventBus中后,會保存到EventBus的SUBSCRIBER_MAP集合中。
發布者在發布消息的時候會調用EventBus的post方法傳入一個具體的event來調用訂閱者的事件。一個事件有多個訂閱者,消息的發布者不會直接的去調用訂閱者來發布消息,而是通過EventBus來進行觸發。
通過EventBus來觸發不同的訂閱者的事件可以在觸發事件之前同一的為其做一些操作,比如是同步還是異步,要不要過濾部分訂閱者等。
SOFARPC源碼解析系列: