EventBus
1. 什么是EventBus
總線(Bus)一般指計算機各種功能部件之間傳送信息的公共通信干線,而EventBus則是事件源(publisher)向訂閱方(subscriber)發送訂閱事件的總線,它解耦了觀察者模式中訂閱方和事件源之間的強依賴關系。
2. guava EventBus的構成
下面以guava 19版本的EventBus的源碼進行分析。EventBus有三個操作:注冊Listener--register(Object Listener),注銷Listener--unregister(Object Listener),發布Event--post(Object event)。在19版本之前,listener的注冊和注銷,事件的發布的工作都是由EventBus完成,在18版本之后,EventBus把Listener的注冊和注銷的工作委托給SubscriberRegistry, 把事件發布的工作委托給Dispatcher來完成,這樣的修改職責分明,代碼結構更加清晰了。在並發方面也有大的修改,以前對維護某類事件和其感興趣的Subscriber的操作用了讀寫鎖來控制對容器的操作,19版本及之后用了並發容器ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>>避免了對鎖的使用。
3. SubscriberRegistry
在觀察者模式中,事件源中會維護一個Listener的列表,而且向這個事件源注冊的Listener一般只會收到一類事件的通知,如果Listener對多個不同類的事件感興趣,則需要向多個事件源注冊。EventBus是怎樣實現Listener一次注冊,能夠知道Listener對那些事件感興趣的,進而在有某類事件發生時通知到Listener的呢?答案在SubscriberRegistry這個類中。在SubscriberRegister有一個實例屬性ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers,它維護了某個事件類型和對其感興趣的Subscriber的列表。
register(Object listener)的工作就是找出這個Listener對哪些事件感興趣,然后把這種事件類型和該Listener構建成的Subscriber加到subscribers中。unregister的過程和register類似,只從subscribers刪掉Listener感興趣的事件。下面我們分別看看18版本和19版本register,主要的區別就是一個是加鎖的版本,一個用的是並發容器。
18版本:
/**
* Registers all subscriber methods on {@code object} to receive events.
* Subscriber methods are selected and classified using this EventBus's
* {@link SubscriberFindingStrategy}; the default strategy is the
* {@link AnnotatedSubscriberFinder}.
*
* @param object object whose subscriber methods should be registered.
*/
public void register(Object object) {
Multimap<Class<?>, EventSubscriber> methodsInListener =
finder.findAllSubscribers(object);
subscribersByTypeLock.writeLock().lock();
try {
subscribersByType.putAll(methodsInListener);
} finally {
subscribersByTypeLock.writeLock().unlock();
}
}
19版本:
/**
* Registers all subscriber methods on the given listener object.
*/
void register(Object listener) {
Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);
for (Map.Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
Class<?> eventType = entry.getKey();
Collection<Subscriber> eventMethodsInListener = entry.getValue();
CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);
if (eventSubscribers == null) {
CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<Subscriber>();
eventSubscribers = MoreObjects.firstNonNull(
subscribers.putIfAbsent(eventType, newSet), newSet);
}
eventSubscribers.addAll(eventMethodsInListener);
}
}
SubscriberRegister通過reflection找出該Listener對象(包括其父類)哪些Method用@Subscriber注解了,用@Subscriber注解的方法表示當某件事件發生時,希望收到事件通知。在@Subscriber注解的方法中只能包含一個參數那就是Event,否則會出錯。在reflection的時候用LoadingCache<Class<?>, ImmutableList<Method>> subscriberMethodsCache緩存了該Listener Class和對應的Method,加快了后面對同一類Listener進行register的效率。用MethodIdentifier作為Map的key來判別Method的是否相等。
private static ImmutableList<Method> getAnnotatedMethodsNotCached(Class<?> clazz) {
Set<? extends Class<?>> supertypes = TypeToken.of(clazz).getTypes().rawTypes();
Map<MethodIdentifier, Method> identifiers = Maps.newHashMap();
for (Class<?> supertype : supertypes) {
for (Method method : supertype.getDeclaredMethods()) {
if (method.isAnnotationPresent(Subscribe.class) && !method.isSynthetic()) {
// TODO(cgdecker): Should check for a generic parameter type and error out
Class<?>[] parameterTypes = method.getParameterTypes();
checkArgument(parameterTypes.length == 1,
"Method %s has @Subscribe annotation but has %s parameters."
+ "Subscriber methods must have exactly 1 parameter.",
method, parameterTypes.length);
MethodIdentifier ident = new MethodIdentifier(method);
if (!identifiers.containsKey(ident)) {
identifiers.put(ident, method);
}
}
}
}
return ImmutableList.copyOf(identifiers.values());
}
在構建Subscriber的時候根據方法是否有@AllowConcurrentEvents,分為同步和並發執行method。
4. Dispatcher
Dispatcher發布事件的時候有三種模式:
1. ImmediateDispatcher,來了一個事件則通知對這個事件感興趣的訂閱者。
2. LegacyAsyncDispatcher,會有一個全局的隊列ConcurrentLinkedQueue<EventWithSubscriber> queue保存EventWithSubscriber(事件和subscriber),如果被不同的線程poll 不能保證在queue隊列中的event是有序發布的。
3. PerThreadQueuedDispatcher,在同一個線程post的Event,執行的順序是有序的。用ThreadLocal<Queue<Event>> queue來實現每個線程post的Event是有序的,在把事件添加到queue后會有一個ThreadLocal<Boolean> dispatching來判斷當前線程是否正在分發,如果正在分發,則這次添加的event不會馬上進行分發而是等到dispatching的值為false才進行。這樣做的原因是為了防止同一個事件被重復分發。我的理解是這樣的:如果沒有dispatching這個狀態變量,在ThreadA中EventA發布了,ListenerA收到了,ListenerA進行處理,在處理的過程中如果又觸發了EventA的發布,如果該線程不結束則會陷入到一種循環中去。
5.EventBus的使用
一般的應用的場景就是在用觀察者模式的地方就可以用EventBus進行替代。結合Spring的使用過程如下: