1.EventBus是什么?
EventBus是guava中的一個工具,官方解釋如下:
EventBus允許組件之間通過發布-訂閱進行通信,而不需要組件之間顯示的注冊。它專門設計為了代替使用顯示注冊的傳統的Java進程內事件分發。它不是通用的發布-訂閱系統,也不是用於進程間通信的。
Event | 可能發布到總線的任何對象。 |
---|---|
Subscribing | 向EventBus注冊偵聽器的行為,以便其處理程序方法將接收事件。 |
Listener | 通過公開處理程序方法希望接收事件的對象。 |
Handler method | EventBus用於傳遞已發布事件的公共方法。處理程序方法由@Subscribe批注標記。 |
Posting an event | 通過EventBus將事件提供給所有偵聽器。 |
優點:簡化組件之間的通信。是發布者和訂閱之間解耦,同時避免了復雜且容易出錯的依賴性和生命周期問題。使代碼更加簡潔
2.使用
//Example
// Class is typically registered by the container. class EventBusChangeRecorder {
//訂閱事件 @Subscribe public void recordCustomerChange(ChangeEvent e) { recordChange(e.getChange()); } } //注冊監聽類 eventBus.register(new EventBusChangeRecorder()); // much later public void changeCustomer() ChangeEvent event = getChangeEvent();
//發布事件 eventBus.post(event); }
3.源碼解析
以下源碼來源Guava版本-20.0
3.1事件總線
@Beta public class EventBus { private static final Logger logger = Logger.getLogger(EventBus.class.getName()); private final String identifier; private final Executor executor; private final SubscriberExceptionHandler exceptionHandler; private final SubscriberRegistry subscribers = new SubscriberRegistry(this); private final Dispatcher dispatcher; /** * Creates a new EventBus named "default". */ public EventBus() { this("default"); } /** * Creates a new EventBus with the given {@code identifier}. */ public EventBus(String identifier) { this( identifier, MoreExecutors.directExecutor(), Dispatcher.perThreadDispatchQueue(), LoggingHandler.INSTANCE); } /** * Creates a new EventBus with the given {@link SubscriberExceptionHandler}. */ public EventBus(SubscriberExceptionHandler exceptionHandler) { this( "default", MoreExecutors.directExecutor(), Dispatcher.perThreadDispatchQueue(), exceptionHandler); } EventBus( String identifier, Executor executor, Dispatcher dispatcher, SubscriberExceptionHandler exceptionHandler) { this.identifier = checkNotNull(identifier); this.executor = checkNotNull(executor); this.dispatcher = checkNotNull(dispatcher); this.exceptionHandler = checkNotNull(exceptionHandler); } /** * Returns the identifier for this event bus. */ public final String identifier() { return identifier; } /** * Returns the default executor this event bus uses for dispatching events to subscribers. */ final Executor executor() { return executor; } /** * 處理上下文訂閱者拋出的異常 */ void handleSubscriberException(Throwable e, SubscriberExceptionContext context) { checkNotNull(e); checkNotNull(context); try { exceptionHandler.handleException(e, context); } catch (Throwable e2) { // if the handler threw an exception... well, just log it logger.log( Level.SEVERE, String.format(Locale.ROOT, "Exception %s thrown while handling exception: %s", e2, e), e2); } } /** * 注冊所有的訂閱者為了接收事件 */ public void register(Object object) { subscribers.register(object); } /** * 注銷所有已注冊的訂閱方法 * 要注冊的對象,如果之前沒有該對象沒有注冊過拋出IllegalArgumentException */ public void unregister(Object object) { subscribers.unregister(object); } /** * 發布事件給所有訂閱者,發送完成就返回成功,不管訂閱者有沒有處理成功。如果沒有訂閱者,該事件將成為一個死亡事件,它將包裝在死亡事件中並重新發布 */ public void post(Object event) { //找到事件的所有訂閱者 Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event); if (eventSubscribers.hasNext()) { //事件轉發器,把事件轉發給訂閱者 dispatcher.dispatch(event, eventSubscribers); } else if (!(event instanceof DeadEvent)) { // 如果該事件即沒有訂閱者,也沒事。那么封裝成DeadEvent並重新發布 post(new DeadEvent(this, event)); } } @Override public String toString() { return MoreObjects.toStringHelper(this).addValue(identifier).toString(); } ...省略異常日志處理方法 } }
- subscribers是SubscriberRegistry類型的,實際上EventBus在添加、移除和遍歷觀察者的時候都會使用該實例的方法,所有的觀察者信息也都維護在該實例中.
- executor是事件分發過程中使用到的線程池,可以自己實現; dispatcher是Dispatcher類型的子類,用來在發布事件的時候分發消息給監聽者,它有幾個默認的實現,分別針對不同的分發方式;
- exceptionHandler是SubscriberExceptionHandler類型的,它用來處理異常信息,在默認的EventBus實現中,會在出現異常的時候打印出log,當然我們也可以定義自己的異常處理策咯。
通過SubscriberRegistry了解如何注冊和取消注冊以及遍歷。我們需要在EventBus中維護幾個映射,以便在發布事件的時候找到並通知所有的監聽者,首先是事件類型->觀察者列表的映射。
EventBus中發布事件是針對各個方法的,我們將一個事件對應的類型信息和方法信息等都維護在一個對象中,在EventBus中就是觀察者Subscriber. 然后,通過事件類型映射到觀察者列表,當發布事件的時候,只要根據事件類型到列表中尋找所有的觀察者並觸發監聽方法即可。 在SubscriberRegistry中通過如下數據結構來完成這一映射:
/** * 通過事件類型索引所有的注冊訂閱者 * CopyOnWriteArraySet值使獲取事件的所有當前訂閱者的不可變快照變得容易且相對輕便,而沒有任何鎖定。 */ private final ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers = Maps.newConcurrentMap();
它底層使用了CopyOnWriteArrayList,並對其進行了封裝,也就是在基本的集合上面增加了去重的操作。這是一種適用於讀多寫少場景的集合,在讀取數據的時候不會加鎖,
寫入數據的時候進行加鎖,並且會進行一次數組拷貝。
/** * 返回給定監聽器的所有訂閱者(按其訂閱的事件類型分組)。 */ private Multimap<Class<?>, Subscriber> findAllSubscribers(Object listener) { // 創建一個哈希表 Multimap<Class<?>, Subscriber> methodsInListener = HashMultimap.create(); //獲取監聽者的類型 Class<?> clazz = listener.getClass(); //遍歷上述方法,並且根據方法和類型參數創建觀察者並將其插入到映射表中 for (Method method : getAnnotatedMethods(clazz)) { Class<?>[] parameterTypes = method.getParameterTypes(); //事件的類型 Class<?> eventType = parameterTypes[0]; methodsInListener.put(eventType, Subscriber.create(bus, listener, method)); } return methodsInListener; }
這里注意一下Multimap數據結構,它是Guava中提供的集合結構,與普通的哈希表不同的地方在於,它可以完成一對多操作。這里用來存儲事件類型到觀察者的一對多映射。
- 調用SubscriberRegistry的register(listener)來執行注冊監聽器。
- register步驟如下:
EventBus->SubscriberRegistry->ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers 用以維護事件和訂閱者的映射。
當新注冊監聽者的時候,getAnnotatedMethods用反射獲取全部的訂閱者,為了避免浪費性能,會通過subscriberMethodsCache從緩存中加載。
private static ImmutableList<Method> getAnnotatedMethods(Class<?> clazz) { return subscriberMethodsCache.getUnchecked(clazz); }
//subscriberMethodsCache的定義是:
private static final LoadingCache<Class<?>, ImmutableList<Method>> subscriberMethodsCache = CacheBuilder.newBuilder() .weakKeys() .build( new CacheLoader<Class<?>, ImmutableList<Method>>() { @Override public ImmutableList<Method> load(Class<?> concreteClass) throws Exception { return getAnnotatedMethodsNotCached(concreteClass); //2 } });
調用SubscriberRegistry中的getAnnotatedMethodsNotCached()方法獲取這些監聽方法。其實就是使用反射並完成一些校驗,並不復雜。
//獲取超類class集合 private static ImmutableList<Method> getAnnotatedMethodsNotCached(Class<?> clazz) { Set<? extends Class<?>> supertypes = TypeToken.of(clazz).getTypes().rawTypes(); Map<MethodIdentifier, Method> identifiers = Maps.newHashMap(); ////遍歷超類 for (Class<?> supertype : supertypes) { ////遍歷超類中的所有定義的方法 for (Method method : supertype.getDeclaredMethods()) { ///如果方法上有@Subscribe注解 if (method.isAnnotationPresent(Subscribe.class) && !method.isSynthetic()) { //方法的參數類型數組 Class<?>[] parameterTypes = method.getParameterTypes(); //校驗:事件訂閱方法必須只能有一個參數,即事件類 checkArgument( parameterTypes.length == 1, "Method %s has @Subscribe annotation but has %s parameters." + "Subscriber methods must have exactly 1 parameter.", method, parameterTypes.length); //封裝方法定義對象 MethodIdentifier ident = new MethodIdentifier(method); //去重並添加進map if (!identifiers.containsKey(ident)) { identifiers.put(ident, method); } } } } return ImmutableList.copyOf(identifiers.values()); }
然后,會遍歷上述拿到的方法集合,根據事件的類型(從方法參數得知)和監聽者等信息創建一個觀察者,並將事件類型-觀察者鍵值對插入到一個一對多映射表中並返回。
/** * 在給定的監聽器對象上注冊所有訂閱者方法。 */ void register(Object listener) { //獲取事件類型-觀察者映射表 Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener); //遍歷上述映射表並將新注冊的觀察者映射表添加到全局的subscribers中 for (Map.Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) { Class<?> eventType = entry.getKey(); Collection<Subscriber> eventMethodsInListener = entry.getValue(); CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType); //如果指定事件對應的觀察者列表不存在就創建一個新的 if (eventSubscribers == null) { CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<Subscriber>(); eventSubscribers = MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet); } eventSubscribers.addAll(eventMethodsInListener); } }
post()方法如下:
public void post(Object event) { // 調用SubscriberRegistry的getSubscribers方法獲取該事件對應的全部觀察者 Iterator<Subscriber> eventSubscribers = this.subscribers.getSubscribers(event); if (eventSubscribers.hasNext()) { // 使用Dispatcher對事件進行分發 this.dispatcher.dispatch(event, eventSubscribers); } else if (!(event instanceof DeadEvent)) { this.post(new DeadEvent(this, event)); } }
post()方法中還是調用SubscriberRegistry中的方法
/** * 獲取一個迭代器,該迭代器表示在調用此方法時給定事件的所有訂閱者的不變快照。 */ Iterator<Subscriber> getSubscribers(Object event) { // 獲取事件類型的所有父類型和自身構成的集合 ImmutableSet<Class<?>> eventTypes = flattenHierarchy(event.getClass()); //3 List<Iterator<Subscriber>> subscriberIterators = Lists.newArrayListWithCapacity(eventTypes.size()); // 遍歷上述事件類型,並從subscribers中獲取所有的觀察者列表 for (Class<?> eventType : eventTypes) { CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType); if (eventSubscribers != null) { // eager no-copy snapshot subscriberIterators.add(eventSubscribers.iterator()); } } return Iterators.concat(subscriberIterators.iterator()); }
從EventBus.post()方法可以看出,當我們使用Dispatcher進行事件分發的時候,需要將當前的事件和所有的觀察者作為參數傳入到方法中。然后,在方法的內部進行分發操作。最終某個監聽者的監聽方法是使用反射進行觸發的,這部分邏輯在Subscriber內部,而Dispatcher是事件分發的方式的策略接口。EventBus中提供了3個默認的Dispatcher實現,分別用於不同場景的事件分發:
- ImmediateDispatcher:直接在當前線程中遍歷所有的觀察者並進行事件分發;
- LegacyAsyncDispatcher:異步方法,存在兩個循環,一先一后,前者用於不斷往全局的隊列中塞入封裝的觀察者對象,后者用於不斷從隊列中取出觀察者對象進行事件分發;實際上,EventBus有個字類AsyncEventBus就是用該分發器進行事件分發的。
- PerThreadQueuedDispatcher:這種分發器使用了兩個線程局部變量進行控制,當dispatch()方法被調用的時候,會先獲取當前線程的觀察者隊列,並將傳入的觀察者列表傳入到該隊列中;然后通過一個布爾類型的線程局部變量,判斷當前線程是否正在進行分發操作,如果沒有在進行分發操作,就通過遍歷上述隊列進行事件分發。
final void dispatchEvent(final Object event) { //使用指定的執行器執行任務 executor.execute(()->{ try { //使用反射觸發監聽方法 invokeSubscriberMethod(event); } catch (InvocationTargetException e) { //使用EventBus內部的SubscriberExceptionHandler處理異常 bus.handleSubscriberException(e.getCause(), context(event)); } } }); }