Guava 12:Guava EventBus源碼剖析


一、架構速讀

傳統上,Java的進程內事件分發都是通過發布者和訂閱者之間的顯式注冊實現的。設計EventBus就是為了取代這種顯示注冊方式,使組件間有了更好的解耦。EventBus不是通用型的發布-訂閱實現,不適用於進程間通信。

架構圖如下:

二、簡單使用

步驟如下:

1.構造一個事件總線
2.構造一個事件監聽器
3.把事件監聽器注冊到事件總線上
4.事件總線發布事件,觸發監聽器方法

主測試類如下:

 1 package guava.eventbus;
 2 
 3 import com.google.common.eventbus.EventBus;
 4 
 5 /**
 6  * @Description 主測試類
 7  * @author denny
 8  * @date 2018/7/18 上午9:54
 9  */
10 public class MainTest {
11 
12 
13     public static void main(String[] args) {
14         // 1.構造一個事件總線
15         EventBus eventBus = new EventBus("test");
16 
17         // 2.構造一個事件監聽器
18         EventListener listener = new EventListener();
19 
20         // 3.把事件監聽器注冊到事件總線上
21         eventBus.register(listener);
22 
23         // 4.事件總線發布事件,觸發監聽器方法
24         eventBus.post(new TestEvent1(1));
25         eventBus.post(new TestEvent2(2));
26         // 事件3是事件2子類,雖然監聽器只訂閱了父類事件2,一樣可以監聽到子類
27         eventBus.post(new TestEvent3(3));
28         // 未被訂閱的事件,用DeadEvent可訂閱
29         eventBus.post(new TestEvent4(4));
30     }
31 }

事件監聽器如下:

 1 package guava.eventbus;
 2 
 3 import com.alibaba.fastjson.JSON;
 4 import com.google.common.eventbus.DeadEvent;
 5 import com.google.common.eventbus.Subscribe;
 6 
 7 /**
 8  * @Description 事件監聽器
 9  * @author denny
10  * @date 2018/7/18 上午9:53
11  */
12 public class EventListener {
13 
14     private int message = 0;
15 
16     /**
17      * @Description 訂閱事件1
18      * @param event 事件1
19      * @return void
20      * @author denny
21      * @date 2018/7/18 上午9:46
22      */
23     @Subscribe
24     public void onEvent1(TestEvent1 event) {
25         message = event.getMessage();
26         System.out.println("EventListener onEvent1 監聽器接收到消息:"+message);
27     }
28 
29     /**
30      * @Description 訂閱事件2
31      * @param event 事件2
32      * @return void
33      * @author denny
34      * @date 2018/7/18 上午9:59
35      */
36     @Subscribe
37     public void onEvent2(TestEvent2 event) {
38         message = event.getMessage();
39         System.out.println("EventListener onEvent2 監聽器接收到消息:"+message);
40     }
41 
42     /**
43      * @Description 死亡事件(該事件沒有被訂閱會觸發)
44      * @param event 未訂閱事件
45      * @return void
46      * @author denny
47      * @date 2018/7/18 上午9:59
48      */
49     @Subscribe
50     public void onDeadEvent(DeadEvent event) {
51         System.out.println("EventListener DeadEvent 有消費沒有被訂閱!!!!event="+ event.toString());
52     }
53 }

事件類:

 1 package guava.eventbus;
 2 
 3 /**
 4  * @Description 事件1
 5  * @author denny
 6  * @date 2018/7/18 上午9:54
 7  */
 8 public class TestEvent1 {
 9 
10     private final int message;
11 
12     /**
13      * 構造方法
14      * @param message
15      */
16     public TestEvent1(int message) {
17         this.message = message;
18         System.out.println("TestEvent1 事件message:"+message);
19     }
20 
21     public int getMessage() {
22         return message;
23     }
24 }
25 
26 
27 /**
28  * @Description 事件2
29  * @author denny
30  * @date 2018/7/18 上午9:54
31  */
32 public class TestEvent2 {
33 
34     private final int message;
35 
36     /**
37      * 構造方法
38      * @param message
39      */
40     public TestEvent2(int message) {
41         this.message = message;
42         System.out.println("TestEvent2 事件message:"+message);
43     }
44 
45     public int getMessage() {
46         return message;
47     }
48 }
49 
50 /**
51  * @Description 事件3
52  * @author denny
53  * @date 2018/7/18 上午9:54
54  */
55 public class TestEvent3 extends TestEvent2{
56 
57     private final int message;
58 
59     /**
60      * 構造方法
61      * @param message
62      */
63     public TestEvent3(int message) {
64         super(message);
65         this.message = message;
66         System.out.println("TestEvent2 事件message:"+message);
67     }
68 
69     @Override
70     public int getMessage() {
71         return message;
72     }
73 }
74 
75 /**
76  * @Description 事件4
77  * @author denny
78  * @date 2018/7/18 上午9:54
79  */
80 public class TestEvent4 {
81 
82     private final int message;
83 
84     /**
85      * 構造方法
86      * @param message
87      */
88     public TestEvent4(int message) {
89         this.message = message;
90         System.out.println("TestEvent4 事件message:"+message);
91     }
92 
93     public int getMessage() {
94         return message;
95     }
96 }

運行結果如下:

TestEvent1 事件message:1
EventListener onEvent1 監聽器接收到消息:1 ---》觸發訂閱的事件1
TestEvent2 事件message:2
EventListener onEvent2 監聽器接收到消息:2---》觸發訂閱的事件2(一個監聽器可以訂閱多個事件)
TestEvent2 事件message:3
TestEvent2 事件message:3
EventListener onEvent2 監聽器接收到消息:3---》訂閱事件2,可觸發訂閱子類事件3
TestEvent4 事件message:4
EventListener DeadEvent 有消費沒有被訂閱!!!!event="DeadEvent{source=EventBus{test}, event=guava.eventbus.TestEvent4@19e1023e}"---》事件4沒有被訂閱,觸發DeadEvent死亡事件。

注意:

1.事件總線EventBus

不提供單列,用戶自己看着用~

2.監聽器Listener

1)監聽器使用@Subscribe標記的方法(參數為自定義事件),即可實現事件的監聽。要監聽多個事件,就寫多個方法(每個方法都用@Subscribe標記)即可。

2)注意一定要把Listener注冊到eventbus上。

三、源碼剖析

源碼版本為:guava-22.0.jar,先來回顧下第一節的樣例代碼:

 1 public static void main(String[] args) {
 2         // 1.構造一個事件總線
 3         EventBus eventBus = new EventBus("test");
 4 
 5         // 2.構造一個事件監聽器
 6         EventListener listener = new EventListener();
 7 
 8         // 3.把事件監聽器注冊到事件總線上
 9         eventBus.register(listener);
10 
11         // 4.事件總線發布事件,觸發監聽器方法
12         eventBus.post(new TestEvent1(1));
13         eventBus.post(new TestEvent2(2));
14         // 事件3是事件2子類,雖然監聽器只訂閱了父類事件2,一樣可以監聽到子類
15         eventBus.post(new TestEvent3(3));
16         // deadEvent未被訂閱的事件,供用戶自行處理
17         eventBus.post(new TestEvent4(4));
18     }

如上圖,雖然是google封裝的事件總線,但是依然是觀察者模式,那么核心就是發布、訂閱。下面就從這兩個方面來看一下源碼,看看有沒有值得借鑒的地方。

 3.1 核心類速讀

1.EventBus事件總線

核心方法

register:把監聽器中申明的所有訂閱事件方法注冊到SubscriberRegistry(訂閱者注冊器)中。

post發布事件給所有已注冊過的訂閱者,最終開啟線程完成訂閱方法。

具體如下圖:

  1 @Beta
  2 public class EventBus {
  6   private final String identifier;//事件總線標識:用於自定義標識這個事件總線
  7   private final Executor executor;//默認的線程執行器,用於把事件轉發給訂閱者
 10   private final SubscriberRegistry subscribers = new SubscriberRegistry(this);//訂閱注冊器
 11   private final Dispatcher dispatcher;//事件轉發器
 12  15   //構造器:使用默認字符串
 16   public EventBus() {
 17     this("default");
 18   }
 19   //構造器:使用自定義字符串
 26   public EventBus(String identifier) {
 27     this(
 28         identifier,
 29         MoreExecutors.directExecutor(),
 30         Dispatcher.perThreadDispatchQueue(),
 31         LoggingHandler.INSTANCE);
 32   } 58 
 93   //注冊監聽者中申明的所有訂閱方法(@Subscribe標記的),用以接收事件
 97   public void register(Object object) {
 98     subscribers.register(object);
 99   }
100   // 解除訂閱
107   public void unregister(Object object) {
108     subscribers.unregister(object);
109   }
110 
111   //發布事件給所有已注冊過的訂閱者
121   public void post(Object event) {
// 找到事件的所有訂閱者
122 Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event); 123 if (eventSubscribers.hasNext()) {
// 事件轉發器,把事件轉發給訂閱者
124 dispatcher.dispatch(event, eventSubscribers); 125 } else if (!(event instanceof DeadEvent)) { 126 // 如果該事件即沒有訂閱者,也沒事DeadEvent,那么封裝成DeadEvent並重新發布 127 post(new DeadEvent(this, event)); 128 }
...省略非重要方法
167 }

2.Subscriber訂閱者

核心方法:dispatchEvent使用executor線程執行器,單獨開啟線程執行訂閱方法。

  1 class Subscriber {
  2 
  3   /**
  4    * 構造  5    */
  6   static Subscriber create(EventBus bus, Object listener, Method method) {
  7     return isDeclaredThreadSafe(method)
  8         ? new Subscriber(bus, listener, method)
  9         : new SynchronizedSubscriber(bus, listener, method);
 10   }
 11 
 12   /** 訂閱者所屬的事件總線*/
 13   @Weak private EventBus bus;
 14 
 15   /** 監聽器 listener*/
 16   @VisibleForTesting final Object target;
 17 
 18   /** 訂閱者方法 */
 19   private final Method method;
 20 
 21   /** 線程執行器,用來分發事件給訂閱者 */
 22   private final Executor executor;
23 /** 構造器:使用事件總線、監聽器、訂閱方法 */ 24 private Subscriber(EventBus bus, Object target, Method method) { 25 this.bus = bus; 26 this.target = checkNotNull(target); 27 this.method = method; 28 method.setAccessible(true); 29 30 this.executor = bus.executor(); 31 } 32 33 /** 34 * 使用executor線程執行器,執行訂閱方法*/ 36 final void dispatchEvent(final Object event) { 37 executor.execute( 38 new Runnable() { 39 @Override 40 public void run() { 41 try { 42 invokeSubscriberMethod(event); 43 } catch (InvocationTargetException e) { 44 bus.handleSubscriberException(e.getCause(), context(event)); 45 } 46 } 47 }); 48 } 49 50 /** 51 * 調用訂閱者方法*/ 54 @VisibleForTesting 55 void invokeSubscriberMethod(Object event) throws InvocationTargetException { 56 try { 57 method.invoke(target, checkNotNull(event)); 58 } catch (IllegalArgumentException e) { 59 throw new Error("Method rejected target/argument: " + event, e); 60 } catch (IllegalAccessException e) { 61 throw new Error("Method became inaccessible: " + event, e); 62 } catch (InvocationTargetException e) { 63 if (e.getCause() instanceof Error) { 64 throw (Error) e.getCause(); 65 } 66 throw e; 67 } 68 }
...
120 }

本節我們了解了2個核心類EventBus(注冊監聽器、發布事件)、Subscriber訂閱者(執行訂閱方法),下面我們從源碼流程上來串連一遍。

 3.2.注冊監聽器

我們從注冊監聽器開始看,eventBus.register(listener); 如下圖所示:

 1 public void register(Object object) {
 2     subscribers.register(object);
 3 }
 4 
 5 /**
 6    * 把listener中申明的所有訂閱方法都注冊
 7    */
 8   void register(Object listener) {
// 獲取該監聽器類型對應的所有訂閱方法,key是事件類型,value是訂閱者集合
9 Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener); 10 // 遍歷map Map<K, Collection<V>> 11 for (Map.Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) { 12 //key:事件類型
      Class<?> eventType = entry.getKey();
//value:訂閱者集合
13 Collection<Subscriber> eventMethodsInListener = entry.getValue(); 14 //從subscribers並發map 中獲取事件對應的事件訂閱者set,subscribersprivate final ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers = Maps.newConcurrentMap(); 15 CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType); 16 //不存在,就構造 17 if (eventSubscribers == null) { 18 CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<Subscriber>(); 19 eventSubscribers = 20 MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet); 21 } 22 //存在,把訂閱者集合添加進subscribers 23 eventSubscribers.addAll(eventMethodsInListener); 24 } 25 }

1.調用SubscriberRegistry的register(listener)來執行注冊監聽器。

2.register步驟如下:

EventBus-包含-》SubscriberRegistry-包含-》ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers 用以維護事件和訂閱者的映射

1). findAllSubscribers從緩存中獲取該監聽器類型對應的所有訂閱方法,key是event class,value是Subscriber集合

2).遍歷map,把訂閱者集合添加進SubscriberRegistry-》subscribers。

其中findAllSubscribers詳細代碼如下:

12   /**
13    * 返回所有該監聽器訂閱者,以事件分組
14    */
15   private Multimap<Class<?>, Subscriber> findAllSubscribers(Object listener) {
16     Multimap<Class<?>, Subscriber> methodsInListener = HashMultimap.create();
17     Class<?> clazz = listener.getClass();
//從緩存中獲取該監聽器類型對應的所有訂閱方法,遍歷塞進Multimap
18 for (Method method : getAnnotatedMethods(clazz)) { 19 Class<?>[] parameterTypes = method.getParameterTypes(); 20 Class<?> eventType = parameterTypes[0]; 21 methodsInListener.put(eventType, Subscriber.create(bus, listener, method)); 22 } 23 return methodsInListener; 24 }

如上圖,

1.方法getAnnotatedMethods:從緩存中取 listener中訂閱方法的不可變列表,

2.遍歷塞進Multimap:一個key,多個value,每次put進去,往Collection<V>中add(value)。

getAnnotatedMethods源碼如下:

 1 private static ImmutableList<Method> getAnnotatedMethods(Class<?> clazz) {
 2     return subscriberMethodsCache.getUnchecked(clazz);
 3   }
 4 
 5 private static final LoadingCache<Class<?>, ImmutableList<Method>> subscriberMethodsCache =
 6       CacheBuilder.newBuilder()
 7           .weakKeys()
 8           .build(
 9               new CacheLoader<Class<?>, ImmutableList<Method>>() {
10                 @Override
11                 public ImmutableList<Method> load(Class<?> concreteClass) throws Exception {
12                   return getAnnotatedMethodsNotCached(concreteClass);
13                 }
14               });

如上圖,我們發現這里用到了google cache來做緩存,關於google cache飛機票

這個cache的源碼注釋翻譯如下:一個線程安全的緩存,包含從每個類到該類中的所有方法和所有超類的映射,這些超類都用{@code @Subscribe}注釋。緩存是跨該類的所有實例共享的;如果創建了多個EventBus實例,並且在所有這些實例上注冊了同一個類的對象,這將大大提高性能。

值得借鑒點:EventBus-包含->SubscriberRegistry-->static final subscriberMethodsCache,即所有EventBus類的實例共享一個靜態cache,性能高且線程安全。

下面來看看具體怎么獲取的訂閱事件方法(監聽器@Subscribe注解的訂閱事件方法),核心方法getAnnotatedMethodsNotCached如下:

 1 private static ImmutableList<Method> getAnnotatedMethodsNotCached(Class<?> clazz) {
//獲取超類class集合
2 Set<? extends Class<?>> supertypes = TypeToken.of(clazz).getTypes().rawTypes(); 3 Map<MethodIdentifier, Method> identifiers = Maps.newHashMap();
    //遍歷超類
4 for (Class<?> supertype : supertypes) {
5   //遍歷超類中的所有定義的方法
  
  
for (Method method : supertype.getDeclaredMethods()) {
//如果方法上有@Subscribe注解
6 if (method.isAnnotationPresent(Subscribe.class) && !method.isSynthetic()) { 7 // 方法的參數類型數組 8 Class<?>[] parameterTypes = method.getParameterTypes(); 9 // 校驗:事件訂閱方法必須只能有一個參數,即事件類
        checkArgument(
10 parameterTypes.length == 1, 11 "Method %s has @Subscribe annotation but has %s parameters." 12 + "Subscriber methods must have exactly 1 parameter.", 13 method, 14 parameterTypes.length); 15       // 封裝方法定義對象 16 MethodIdentifier ident = new MethodIdentifier(method);
        // 去重並添加進map
17 if (!identifiers.containsKey(ident)) { 18 identifiers.put(ident, method); 19 } 20 } 21 } 22 }
    // map轉ImmutableList
23 return ImmutableList.copyOf(identifiers.values()); 24 }

3.3 發布事件

eventBus.post(new TestEvent1(1));

調用事件轉發器Dispatcher,分發事件給訂閱者,

1 public void post(Object event) {
//從注冊器中獲取當前事件對應的訂閱者集合:eventBus-包含-》SubscriberRegistry-包含-》ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers
2 Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event); 3 if (eventSubscribers.hasNext()) { 4 dispatcher.dispatch(event, eventSubscribers); 5 } else if (!(event instanceof DeadEvent)) { 6 //如果該事件即沒有訂閱者,也沒事DeadEvent,那么封裝成DeadEvent並重新發布
7 post(new DeadEvent(this, event));
8 }
9 }

Dispatcher是個抽象類,有多個內部類復寫不同dispatch方法。EventBus默認構造時使用PerThreadQueuedDispatcher,即每個線程一個待轉發事件隊列。如下圖所示:

 1   private static final class PerThreadQueuedDispatcher extends Dispatcher {
 2 
 3     // This dispatcher matches the original dispatch behavior of EventBus.
 4 
 5     /**
 6      * 每個線程待轉發事件隊列
 7      */
 8     private final ThreadLocal<Queue<Event>> queue =
 9         new ThreadLocal<Queue<Event>>() {
10           @Override
11           protected Queue<Event> initialValue() {
12             return Queues.newArrayDeque();
13           }
14         };
15 
16     /**
17      * 每個線程的轉發狀態,用於避免重入事件轉發,初始化狀態為fasle,即不在轉發。
18      */
19     private final ThreadLocal<Boolean> dispatching =
20         new ThreadLocal<Boolean>() {
21           @Override
22           protected Boolean initialValue() {
23             return false;
24           }
25         };
26 
27     @Override
28     void dispatch(Object event, Iterator<Subscriber> subscribers) {
29       checkNotNull(event);
30       checkNotNull(subscribers);
31       Queue<Event> queueForThread = queue.get();
// 事件添加進隊列
32 queueForThread.offer(new Event(event, subscribers)); 33 // 當前不在轉發中,開始轉發 34 if (!dispatching.get()) { 35 dispatching.set(true); 36 try { 37 Event nextEvent;
// 迭代從線程隊列中取事件
38 while ((nextEvent = queueForThread.poll()) != null) {
// 迭代事件的Iterator<Subscriber> subscribers,調用訂閱者轉發事件
39 while (nextEvent.subscribers.hasNext()) { 40 nextEvent.subscribers.next().dispatchEvent(nextEvent.event); 41 } 42 } 43 } finally { 44 dispatching.remove(); 45 queue.remove(); 46 } 47 } 48 } 49 50 private static final class Event { 51 private final Object event; 52 private final Iterator<Subscriber> subscribers; 53 54 private Event(Object event, Iterator<Subscriber> subscribers) { 55 this.event = event; 56 this.subscribers = subscribers; 57 } 58 } 59 }

可見核心方法在dispatchEvent,調用訂閱者轉發事件

 1   final void dispatchEvent(final Object event) {
 2     executor.execute(
 3         new Runnable() {
 4           @Override
 5           public void run() {
 6             try {
 7  invokeSubscriberMethod(event);
 8             } catch (InvocationTargetException e) {
 9               bus.handleSubscriberException(e.getCause(), context(event));
10             }
11           }
12         });
13   }
14 
15   /**
16    * 執行訂閱者方法18    */
19   @VisibleForTesting
20   void invokeSubscriberMethod(Object event) throws InvocationTargetException {
21     try {
22       method.invoke(target, checkNotNull(event));
23     } catch (IllegalArgumentException e) {
24       throw new Error("Method rejected target/argument: " + event, e);
25     } catch (IllegalAccessException e) {
26       throw new Error("Method became inaccessible: " + event, e);
27     } catch (InvocationTargetException e) {
28       if (e.getCause() instanceof Error) {
29         throw (Error) e.getCause();
30       }
31       throw e;
32     }
33   }

四、總結

本文我們先快速了解Google EventBus總體架構,然后從一個簡單的應用入手知道如何使用,再深入剖析源碼徹底了解原理,並分析了有哪些值得借鑒的地方,最后我們來看一下傳統觀察者模式和EventBus的區別:

傳統觀察者模式和EventBus區別
 

監聽者管理

監聽特定事件

把監聽者注冊到生產者

按事件超類監聽 檢測沒有監聽者的事件 分發事件

傳統觀察者模式

用列表管理監聽者,還要考慮線程同步;或者使用工具類 定義相應的事件監聽者類

調用事件生產者的register方法,開發者必須知道所有事件生產者的類型,才能正確地注冊監聽者

很困難,需要開發者自己去實現匹配邏輯 在每個事件分發方法中添加邏輯代碼 開發者自己寫代碼,包括事件類型匹配、異常處理、異步分發
EventBus 內部已經實現了監聽者管理

以自定義Event為唯一參數創建方法,

並用Subscribe注解標記。

EventBus.register(Object) EventBus自動把事件分發給事件超類的監聽者 EventBus會把所有發布后沒有監聽者處理的事件包裝為DeadEvent

EventBus.post(Object)

異步分發可以直接用EventBus的子類AsyncEventBus

 

==參考==

官方介紹https://github.com/google/guava/wiki/EventBusExplained

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM