最近看Elastic-Job源碼,看到它里面實現的任務運行軌跡的持久化,使用的是Guava的AsyncEventBus,一個內存級別的異步事件總線服務,實現了簡單的生產-消費者模式,從而在不影響任務執行效率的基礎上,將任務執行和任務軌跡記錄解耦,大大提高了EJ的性能。
EventBus在Elastic-Job中的使用
EventBus的使用方法不難,具體可以參考EJ里面幾個相關的類:JobEventListener、JobEventBus和LiteJobFacade。主要的流程如下:
- JobEventListener主要是消費者。定義需要監聽的方法,目前主要定義了兩個listen方法,注意想監聽到的話,需要在方法前加上注解:@Subscribe和@AllowConcurrentEvents。看字面意思就是訂閱和允許並發事件。如果不加上后面那個注解,則會導致效率問題,這個咱們后續分析。目前這個接口只有一個實現類JobEventRdbListener,實現了日志寫入DB的操作。
- JobEventBus參考的EventBus源碼,提供了register和post方法,去掉了unregister方法。主要的功能就是注冊監聽器和生產消息。他的構造方法中,默認使用的是Guava的AsyncEventBus,初始化中同時包含了注冊動作。
- LiteJobFacade主要是JobEventBus的使用者。主要調用的是JobEventBus的post方法。
@Override
public void postJobExecutionEvent(final JobExecutionEvent jobExecutionEvent) {
jobEventBus.post(jobExecutionEvent);
}
@Override
public void postJobStatusTraceEvent(final String taskId, final State state, final String message) {
TaskContext taskContext = TaskContext.from(taskId);
jobEventBus.post(new JobStatusTraceEvent(taskContext.getMetaInfo().getJobName(), taskContext.getId(),
taskContext.getSlaveId(), Source.LITE_EXECUTOR, taskContext.getType(), taskContext.getMetaInfo().getShardingItems().toString(), state, message));
if (!Strings.isNullOrEmpty(message)) {
log.trace(message);
}
}
EventBus源碼分析
言歸正傳,我們來看看EventBus到底是如何實現觀察者模式的。他的主要實現類都在com.google.common.eventbus這個包下面。
主要類概念分析
我們首先來看一下里面比較重要的幾個類,同時理解一些概念。
- EventBus:這個類的作用有兩個,一個是作為一個總線通道,另一個作用是消息的廣播。
- AsyncEventBus:異步的EventBus,功能與EventBus類似,只不過實現方式有所差異。
- Subscriber:可以按照字面理解是訂閱者,也可以說是監聽器。
- SubscriberRegistry:訂閱注冊表。主要存儲的是Subcriber和Event之間的關系,用於消息分發時可以迅速根據Event的類型找到Subscriber。
- Dispatcher:事件分發器,定義了一些分發的策略,里面包含三種分發器。
- 兩個重要的注解@Subscribe和@AllowConcurrentEvents。第一個是標識監聽器的方法,第二個與第一個配合使用,標識允許多線程執行。
- DeadEvent:死信對象,標識沒有訂閱者關注的事件。
- SubscribeExceptionHandler:訂閱者拋出異常的處理器。SubscribeExceptionContext:訂閱者拋出異常的上下文對象。
EventBus
這個類有幾個屬性:
private final String identifier;//唯一標識,默認為default
private final Executor executor;//多線程處理器,默認MoreExecutors.directExecutor()
private final SubscriberExceptionHandler exceptionHandler;//異常處理器
private final SubscriberRegistry subscribers = new SubscriberRegistry(this);//訂閱注冊表
private final Dispatcher dispatcher;//消息分發器,默認為Dispatcher.perThreadDispatchQueue(),單線程消息分發隊列
其中,identifier表示,同一個應用中,可以根據identifier來區分不同的事件總線,只不過默認為default而已。
EventBus主要定義了幾個方法:
注冊
public void register(Object object) {
subscribers.register(object);
}
注冊的是自己定義的監聽器,也就是listener。
取消注冊
public void unregister(Object object) {
subscribers.unregister(object);
}
類似於注冊。
消息廣播
public void post(Object event) {
Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
if (eventSubscribers.hasNext()) {
dispatcher.dispatch(event, eventSubscribers);
} else if (!(event instanceof DeadEvent)) {
// the event had no subscribers and was not itself a DeadEvent
post(new DeadEvent(this, event));
}
}
這塊主要是根據event事件類型,來獲取事件的訂閱者,然后進行事件消息的分發。當然,如果沒有訂閱者,也就是event的類型是DeadEvent,也會進行對應的處理。
AsyncEventBus
繼承自EventBus,主要區別在於分發器,使用的是Dispatcher.legacyAsync()。這個后續咱們再分析。
Subscriber
乍看這個類,就是訂閱者,其實我們看源碼就能理解,當一個訂閱類的多個方法用@Subscribe注解時,每個被注解的方法對應的是一個訂閱者。
構造
這個類只是package內可見,沒有定義為public,可以通過靜態方法create來創建它。
static Subscriber create(EventBus bus, Object listener, Method method) {
return isDeclaredThreadSafe(method)
? new Subscriber(bus, listener, method)
: new SynchronizedSubscriber(bus, listener, method);
}
這里傳入的method就是使用了@Subscribe注解的方法,這塊會先判斷這個方法是否線程安全,即是否使用@AllowConcurrentEvent來進行注解,來創建不同的Subscriber。唯一的差別是SynchronizedSubscriber中一個方法使用了synchronized來修飾。
dispatchEvent
final void dispatchEvent(final Object event) {
executor.execute(
new Runnable() {
@Override
public void run() {
try {
invokeSubscriberMethod(event);
} catch (InvocationTargetException e) {
bus.handleSubscriberException(e.getCause(), context(event));
}
}
});
}
調用多線程來處理event。
invokeSubscriberMethod
@VisibleForTesting
void invokeSubscriberMethod(Object event) throws InvocationTargetException {
try {
method.invoke(target, checkNotNull(event));
} catch (IllegalArgumentException e) {
throw new Error("Method rejected target/argument: " + event, e);
} catch (IllegalAccessException e) {
throw new Error("Method became inaccessible: " + event, e);
} catch (InvocationTargetException e) {
if (e.getCause() instanceof Error) {
throw (Error) e.getCause();
}
throw e;
}
}
調用訂閱者的方法。
SubscriberRegistry
我們之前在講到EventBus時,里面有兩個方法register和unregister,調用的就是這個類的方法。這個類的作用也講到,是存儲event和對應的訂閱者的關系的。我們來看一下這個類的設計。
屬性
private final ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers =
Maps.newConcurrentMap();
@Weak private final EventBus bus;
這個類有兩個屬性。
- 第一個是ConcurrentMap,他的鍵是Class類,也就是Event的類型,值是CopyOnWriteArraySet
,也就是訂閱者。這個ConcurrentMap是Guava定義的並發Map,這個后續咱們有機會再分析。 - 第二個屬性就是EventBus。
register
注冊監聽器。
void register(Object listener) {
Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);
for (Map.Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
Class<?> eventType = entry.getKey();
Collection<Subscriber> eventMethodsInListener = entry.getValue();
CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);
if (eventSubscribers == null) {
CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<Subscriber>();
eventSubscribers =
MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet);
}
eventSubscribers.addAll(eventMethodsInListener);
}
}
主要的邏輯是:
- 獲取這個類中所有用@Subscribe注解的方法,存儲到Multimap中。
- 遍歷Multimap,鍵為eventType,然后根據這個鍵,從緩存中獲取這個事件對應的訂閱者集合。
- 獲取到之后,判斷集合是否為空,如果為空,新建一個集合來存儲。
unregister
實現與register類似,先根據listener找到subscriber,找到需要監聽的方法,然后根據事件類型去移除subscriber。
findAllSubscribers
獲取監聽器中所有的監聽方法。
private Multimap<Class<?>, Subscriber> findAllSubscribers(Object listener) {
Multimap<Class<?>, Subscriber> methodsInListener = HashMultimap.create();
Class<?> clazz = listener.getClass();
for (Method method : getAnnotatedMethods(clazz)) {
Class<?>[] parameterTypes = method.getParameterTypes();
Class<?> eventType = parameterTypes[0];
methodsInListener.put(eventType, Subscriber.create(bus, listener, method));
}
return methodsInListener;
}
findAllSubscribers用於查找事件類型以及事件處理器的對應關系。查找注解需要涉及到反射,通過反射來獲取標注在方法上的注解。因為Guava針對EventBus的注冊采取的是“隱式契約”而非接口這種“顯式契約”。而類與接口是存在繼承關系的,所有很有可能某個訂閱者其父類(或者父類實現的某個接口)也訂閱了某個事件。因此這里的查找需要順着繼承鏈向上查找父類的方法是否也被注解標注。
getSubscribes
獲取event的訂閱者。
Iterator<Subscriber> getSubscribers(Object event) {
ImmutableSet<Class<?>> eventTypes = flattenHierarchy(event.getClass());
List<Iterator<Subscriber>> subscriberIterators =
Lists.newArrayListWithCapacity(eventTypes.size());
for (Class<?> eventType : eventTypes) {
CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);
if (eventSubscribers != null) {
// eager no-copy snapshot
subscriberIterators.add(eventSubscribers.iterator());
}
}
return Iterators.concat(subscriberIterators.iterator());
}
Dispatcher
分發器,用於將event分發給subscriber。它內部實現了三種不同類型的分發器,用於不同的情況下事件的順序性。它的核心方法是:
abstract void dispatch(Object event, Iterator<Subscriber> subscribers);
它的三種實現:
PerThreadQueuedDispatcher
EventBus默認使用的分發器。它的實現是通過ThreadLocal來實現一個事件隊列,每個線程包含一個這樣的內部隊列。
它的分發代碼如下:
void dispatch(Object event, Iterator<Subscriber> subscribers) {
checkNotNull(event);
checkNotNull(subscribers);
Queue<Event> queueForThread = queue.get();
queueForThread.offer(new Event(event, subscribers));
if (!dispatching.get()) {
dispatching.set(true);
try {
Event nextEvent;
while ((nextEvent = queueForThread.poll()) != null) {
while (nextEvent.subscribers.hasNext()) {
nextEvent.subscribers.next().dispatchEvent(nextEvent.event);
}
}
} finally {
dispatching.remove();
queue.remove();
}
}
}
嵌套兩層循環,第一層事件不為空,第二層該事件下的訂閱者不為空,則分發事件下去。
LegacyAsyncDispatcher
AsyncEventBus使用的分發器。它在內部通過一個ConcurrentLinkedQueue
void dispatch(Object event, Iterator<Subscriber> subscribers) {
checkNotNull(event);
while (subscribers.hasNext()) {
queue.add(new EventWithSubscriber(event, subscribers.next()));
}
EventWithSubscriber e;
while ((e = queue.poll()) != null) {
e.subscriber.dispatchEvent(e.event);
}
}
是一前一后兩個循環。前面一個是遍歷事件訂閱處理器,並構建一個事件實體對象存入隊列。后一個循環是遍歷該事件實體對象隊列,取出事件實體對象中的事件進行分發。
ImmediateDispatcher
同步分發器。
void dispatch(Object event, Iterator<Subscriber> subscribers) {
checkNotNull(event);
while (subscribers.hasNext()) {
subscribers.next().dispatchEvent(event);
}
}
總結
Elastic-Job使用的EventBus,可以說很好的對任務的運行和軌跡記錄進行了解耦,借鑒了Guava的思想,將代碼優雅發揮到了新的境界。當然,Guava對EventBus的設計思想是我們需要進行學習和使用的。