Java EventBus


1.EventBus是什么?

  EventBus是guava中的一個工具,官方解釋如下:

 

 

 EventBus允許組件之間通過發布-訂閱進行通信,而不需要組件之間顯示的注冊。它專門設計為了代替使用顯示注冊的傳統的Java進程內事件分發。它不是通用的發布-訂閱系統,也不是用於進程間通信的。

 

Event 可能發布到總線的任何對象。
Subscribing 向EventBus注冊偵聽器的行為,以便其處理程序方法將接收事件。
Listener 通過公開處理程序方法希望接收事件的對象。
Handler method EventBus用於傳遞已發布事件的公共方法。處理程序方法由@Subscribe批注標記。
Posting an event 通過EventBus將事件提供給所有偵聽器。

 

 

 

 

 

 

優點:簡化組件之間的通信。是發布者和訂閱之間解耦,同時避免了復雜且容易出錯的依賴性和生命周期問題。使代碼更加簡潔

2.使用

//Example
//
Class is typically registered by the container. class EventBusChangeRecorder {
 //訂閱事件 @Subscribe
public void recordCustomerChange(ChangeEvent e) { recordChange(e.getChange()); } } //注冊監聽類 eventBus.register(new EventBusChangeRecorder()); // much later public void changeCustomer() ChangeEvent event = getChangeEvent();
//發布事件 eventBus.post(event); }

3.源碼解析

以下源碼來源Guava版本-20.0

3.1事件總線

@Beta
public class EventBus {

  private static final Logger logger = Logger.getLogger(EventBus.class.getName());

  private final String identifier;
  private final Executor executor;
  private final SubscriberExceptionHandler exceptionHandler;

  private final SubscriberRegistry subscribers = new SubscriberRegistry(this);
  private final Dispatcher dispatcher;

  /**
   * Creates a new EventBus named "default".
   */
  public EventBus() {
    this("default");
  }

  /**
   * Creates a new EventBus with the given {@code identifier}.
   */
  public EventBus(String identifier) {
    this(
        identifier,
        MoreExecutors.directExecutor(),
        Dispatcher.perThreadDispatchQueue(),
        LoggingHandler.INSTANCE);
  }

  /**
   * Creates a new EventBus with the given {@link SubscriberExceptionHandler}.
   */
  public EventBus(SubscriberExceptionHandler exceptionHandler) {
    this(
        "default",
        MoreExecutors.directExecutor(),
        Dispatcher.perThreadDispatchQueue(),
        exceptionHandler);
  }

  EventBus(
      String identifier,
      Executor executor,
      Dispatcher dispatcher,
      SubscriberExceptionHandler exceptionHandler) {
    this.identifier = checkNotNull(identifier);
    this.executor = checkNotNull(executor);
    this.dispatcher = checkNotNull(dispatcher);
    this.exceptionHandler = checkNotNull(exceptionHandler);
  }

  /**
   * Returns the identifier for this event bus.
   */
  public final String identifier() {
    return identifier;
  }

  /**
   * Returns the default executor this event bus uses for dispatching events to subscribers.
   */
  final Executor executor() {
    return executor;
  }

  /**
   * 處理上下文訂閱者拋出的異常
   */
  void handleSubscriberException(Throwable e, SubscriberExceptionContext context) {
    checkNotNull(e);
    checkNotNull(context);
    try {
      exceptionHandler.handleException(e, context);
    } catch (Throwable e2) {
      // if the handler threw an exception... well, just log it
      logger.log(
          Level.SEVERE,
          String.format(Locale.ROOT, "Exception %s thrown while handling exception: %s", e2, e),
          e2);
    }
  }

  /**
   * 注冊所有的訂閱者為了接收事件
   */
  public void register(Object object) {
    subscribers.register(object);
  }

  /**
   * 注銷所有已注冊的訂閱方法
   * 要注冊的對象,如果之前沒有該對象沒有注冊過拋出IllegalArgumentException
   */
  public void unregister(Object object) {
    subscribers.unregister(object);
  }

  /**
   * 發布事件給所有訂閱者,發送完成就返回成功,不管訂閱者有沒有處理成功。如果沒有訂閱者,該事件將成為一個死亡事件,它將包裝在死亡事件中並重新發布 
   */
  public void post(Object event) {
    //找到事件的所有訂閱者
    Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
    if (eventSubscribers.hasNext()) {
      //事件轉發器,把事件轉發給訂閱者
      dispatcher.dispatch(event, eventSubscribers);
    } else if (!(event instanceof DeadEvent)) {
      // 如果該事件即沒有訂閱者,也沒事。那么封裝成DeadEvent並重新發布
      post(new DeadEvent(this, event));
    }
  }

  @Override
  public String toString() {
    return MoreObjects.toStringHelper(this).addValue(identifier).toString();
  }
  ...省略異常日志處理方法
  }
}
  • subscribers是SubscriberRegistry類型的,實際上EventBus在添加、移除和遍歷觀察者的時候都會使用該實例的方法,所有的觀察者信息也都維護在該實例中.
  • executor是事件分發過程中使用到的線程池,可以自己實現; dispatcher是Dispatcher類型的子類,用來在發布事件的時候分發消息給監聽者,它有幾個默認的實現,分別針對不同的分發方式;
  • exceptionHandler是SubscriberExceptionHandler類型的,它用來處理異常信息,在默認的EventBus實現中,會在出現異常的時候打印出log,當然我們也可以定義自己的異常處理策咯。

  通過SubscriberRegistry了解如何注冊和取消注冊以及遍歷。我們需要在EventBus中維護幾個映射,以便在發布事件的時候找到並通知所有的監聽者,首先是事件類型->觀察者列表的映射。

EventBus中發布事件是針對各個方法的,我們將一個事件對應的類型信息和方法信息等都維護在一個對象中,在EventBus中就是觀察者Subscriber. 然后,通過事件類型映射到觀察者列表,當發布事件的時候,只要根據事件類型到列表中尋找所有的觀察者並觸發監聽方法即可。 在SubscriberRegistry中通過如下數據結構來完成這一映射:

/**
   * 通過事件類型索引所有的注冊訂閱者
   * CopyOnWriteArraySet值使獲取事件的所有當前訂閱者的不可變快照變得容易且相對輕便,而沒有任何鎖定。
   */
  private final ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers = Maps.newConcurrentMap();
  從上面的定義形式中我們可以看出,這里使用的是事件的Class類型映射到Subscriber列表的。這里的Subscriber列表使用的是Java中的CopyOnWriteArraySet集合,
它底層使用了CopyOnWriteArrayList,並對其進行了封裝,也就是在基本的集合上面增加了去重的操作。這是一種適用於讀多寫少場景的集合,在讀取數據的時候不會加鎖,
寫入數據的時候進行加鎖,並且會進行一次數組拷貝。
  在分析register()方法之前,我們先看下SubscriberRegistry內部經常使用的幾個方法,它們的原理與我們上面提出的問題息息相關。首先是findAllSubscribers()方法,它用來獲取指定監聽者對應的全部觀察者集合。下面是它的代碼:
  /**
   * 返回給定監聽器的所有訂閱者(按其訂閱的事件類型分組)。
   */
  private Multimap<Class<?>, Subscriber> findAllSubscribers(Object listener) {
    // 創建一個哈希表
    Multimap<Class<?>, Subscriber> methodsInListener = HashMultimap.create();
    //獲取監聽者的類型
    Class<?> clazz = listener.getClass();
    //遍歷上述方法,並且根據方法和類型參數創建觀察者並將其插入到映射表中
    for (Method method : getAnnotatedMethods(clazz)) {
      Class<?>[] parameterTypes = method.getParameterTypes();
      //事件的類型
      Class<?> eventType = parameterTypes[0];
      methodsInListener.put(eventType, Subscriber.create(bus, listener, method));
    }
    return methodsInListener;
  }

  這里注意一下Multimap數據結構,它是Guava中提供的集合結構,與普通的哈希表不同的地方在於,它可以完成一對多操作。這里用來存儲事件類型到觀察者的一對多映射。

  • 調用SubscriberRegistry的register(listener)來執行注冊監聽器。
  • register步驟如下:
    EventBus->SubscriberRegistry->ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers 用以維護事件和訂閱者的映射。

  當新注冊監聽者的時候,getAnnotatedMethods用反射獲取全部的訂閱者,為了避免浪費性能,會通過subscriberMethodsCache從緩存中加載。

 private static ImmutableList<Method> getAnnotatedMethods(Class<?> clazz) {
    return subscriberMethodsCache.getUnchecked(clazz);
  }
//subscriberMethodsCache的定義是:
private
static final LoadingCache<Class<?>, ImmutableList<Method>> subscriberMethodsCache = CacheBuilder.newBuilder() .weakKeys() .build( new CacheLoader<Class<?>, ImmutableList<Method>>() { @Override public ImmutableList<Method> load(Class<?> concreteClass) throws Exception { return getAnnotatedMethodsNotCached(concreteClass); //2 } });
  這里的作用機制是:當使用subscriberMethodsCache.getUnchecked(clazz)獲取指定監聽者中的方法的時候會先嘗試從緩存中進行獲取,如果緩存中不存在就會執行2處的代碼,
調用SubscriberRegistry中的getAnnotatedMethodsNotCached()方法獲取這些監聽方法。其實就是使用反射並完成一些校驗,並不復雜。
//獲取超類class集合
private static ImmutableList<Method> getAnnotatedMethodsNotCached(Class<?> clazz) {
    Set<? extends Class<?>> supertypes = TypeToken.of(clazz).getTypes().rawTypes();
    Map<MethodIdentifier, Method> identifiers = Maps.newHashMap();
    ////遍歷超類
    for (Class<?> supertype : supertypes) {
      ////遍歷超類中的所有定義的方法
      for (Method method : supertype.getDeclaredMethods()) {
        ///如果方法上有@Subscribe注解
        if (method.isAnnotationPresent(Subscribe.class) && !method.isSynthetic()) {
          //方法的參數類型數組
          Class<?>[] parameterTypes = method.getParameterTypes();
          //校驗:事件訂閱方法必須只能有一個參數,即事件類
          checkArgument(
              parameterTypes.length == 1,
              "Method %s has @Subscribe annotation but has %s parameters."
                  + "Subscriber methods must have exactly 1 parameter.",
              method,
              parameterTypes.length);
          //封裝方法定義對象
          MethodIdentifier ident = new MethodIdentifier(method);
          //去重並添加進map
          if (!identifiers.containsKey(ident)) {
            identifiers.put(ident, method);
          }
        }
      }
    }
    return ImmutableList.copyOf(identifiers.values());
  }
這樣,我們就分析完了findAllSubscribers()方法,整理一下:當注冊監聽者的時候,首先會拿到該監聽者的類型,然后從緩存中嘗試獲取該監聽者對應的所有監聽方法,如果沒有的話就遍歷該類的方法進行獲取,並添加到緩存中;
然后,會遍歷上述拿到的方法集合,根據事件的類型(從方法參數得知)和監聽者等信息創建一個觀察者,並將事件類型-觀察者鍵值對插入到一個一對多映射表中並返回。
20.0版本register()在EventBus中沒有具體的實現,所以我們看下SubscriberRegistry中的register():
 /**
   * 在給定的監聽器對象上注冊所有訂閱者方法。
   */
  void register(Object listener) {
    //獲取事件類型-觀察者映射表
    Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);
    //遍歷上述映射表並將新注冊的觀察者映射表添加到全局的subscribers中
    for (Map.Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
      Class<?> eventType = entry.getKey();
      Collection<Subscriber> eventMethodsInListener = entry.getValue();

      CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);
      //如果指定事件對應的觀察者列表不存在就創建一個新的
      if (eventSubscribers == null) {
        CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<Subscriber>();
        eventSubscribers =
            MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet);
      }

      eventSubscribers.addAll(eventMethodsInListener);
    }
  }

post()方法如下:

public void post(Object event) {
    // 調用SubscriberRegistry的getSubscribers方法獲取該事件對應的全部觀察者
    Iterator<Subscriber> eventSubscribers = this.subscribers.getSubscribers(event);
    if (eventSubscribers.hasNext()) {
        // 使用Dispatcher對事件進行分發
        this.dispatcher.dispatch(event, eventSubscribers);
    } else if (!(event instanceof DeadEvent)) {
        this.post(new DeadEvent(this, event));
    }
}

post()方法中還是調用SubscriberRegistry中的方法

 /**
   * 獲取一個迭代器,該迭代器表示在調用此方法時給定事件的所有訂閱者的不變快照。
   */
  Iterator<Subscriber> getSubscribers(Object event) {
    // 獲取事件類型的所有父類型和自身構成的集合
    ImmutableSet<Class<?>> eventTypes = flattenHierarchy(event.getClass()); //3

    List<Iterator<Subscriber>> subscriberIterators =
        Lists.newArrayListWithCapacity(eventTypes.size());

    // 遍歷上述事件類型,並從subscribers中獲取所有的觀察者列表
    for (Class<?> eventType : eventTypes) {
      CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);
      if (eventSubscribers != null) {
        // eager no-copy snapshot
        subscriberIterators.add(eventSubscribers.iterator());
      }
    }
    return Iterators.concat(subscriberIterators.iterator());
  }

  從EventBus.post()方法可以看出,當我們使用Dispatcher進行事件分發的時候,需要將當前的事件和所有的觀察者作為參數傳入到方法中。然后,在方法的內部進行分發操作。最終某個監聽者的監聽方法是使用反射進行觸發的,這部分邏輯在Subscriber內部,而Dispatcher是事件分發的方式的策略接口。EventBus中提供了3個默認的Dispatcher實現,分別用於不同場景的事件分發:

  • ImmediateDispatcher:直接在當前線程中遍歷所有的觀察者並進行事件分發;
  • LegacyAsyncDispatcher:異步方法,存在兩個循環,一先一后,前者用於不斷往全局的隊列中塞入封裝的觀察者對象,后者用於不斷從隊列中取出觀察者對象進行事件分發;實際上,EventBus有個字類AsyncEventBus就是用該分發器進行事件分發的。
  • PerThreadQueuedDispatcher:這種分發器使用了兩個線程局部變量進行控制,當dispatch()方法被調用的時候,會先獲取當前線程的觀察者隊列,並將傳入的觀察者列表傳入到該隊列中;然后通過一個布爾類型的線程局部變量,判斷當前線程是否正在進行分發操作,如果沒有在進行分發操作,就通過遍歷上述隊列進行事件分發。
上述三個分發器內部最終都會調用Subscriber的dispatchEvent()方法進行事件分發:
  final void dispatchEvent(final Object event) {
    //使用指定的執行器執行任務
    executor.execute(()->{
            try {
              //使用反射觸發監聽方法
              invokeSubscriberMethod(event);
            } catch (InvocationTargetException e) {
              //使用EventBus內部的SubscriberExceptionHandler處理異常
              bus.handleSubscriberException(e.getCause(), context(event));
            }
          }
        });
  }
上述方法中的executor是執行器,它是通過EventBus獲取到的;處理異常的SubscriberExceptionHandler類型也是通過EventBus獲取到的。(原來EventBus中的構造方法中的字段是在這里用到的!
參考: https://www.jianshu.com/p/4bddd45a8e7a


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM