前言:
guava提供的eventbus可以很方便的處理一對多的事件問題, 最近正好使用到了,做個小結,使用的demo網上已經很多了,不再贅述,本文主要是源碼分析+使用注意點+新老版本eventbus實現方式對比
一.原理
將定義的hander注冊到eventbus中,eventbus遍歷該handler及其父類中含有@subscribe注解的方法,封裝成subscriber對象,一個event會對應多個方法,Map<EventType.class,List<Subscriber>>,但既然是guava出品,這種情況下一定會用自己家的MultiMap了,接收到event后根據類型匹配對應的subscriber去執行,接下來從源碼角度探究下
二.源碼分析
主要分析注冊與分發處理,會貼相關的源碼的注釋(guava版本github 2021 1月版本),方便你閱讀
1.注冊流程
分析之前我們先簡要拓展下關於guava cache的用法,compute if absent,不存在則計算,對應getOrLoad方法(暴露給用戶的是get()),有則直接返回,
注冊流程抓住一個關鍵點即可,即一個subscriber對應一個被@subscriber標記的method,為了方便閱讀,我把代碼貼到一起
1 /** Registers all subscriber methods on the given listener object. */
2 void register(Object listener) { 3 // key-eventType.class value-List<Subscriber>,一個subscriber對應一個方法
4 Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener); 5
6 for (Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) { 7 Class<?> eventType = entry.getKey(); 8 Collection<Subscriber> eventMethodsInListener = entry.getValue(); 9 // 並發讀寫
10 CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType); 11
12 if (eventSubscribers == null) { 13 CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<>(); 14 // eventType.class不存在時才put,concurrenthashmap的putIfAbsent() 15 // 有可能為null,用newSet替換
16 eventSubscribers =
17 MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet); 18 } 19 // 添加
20 eventSubscribers.addAll(eventMethodsInListener); 21 } 22 } 23
24
25 /**
26 * Returns all subscribers for the given listener grouped by the type of event they subscribe to. 27 */
28 private Multimap<Class<?>, Subscriber> findAllSubscribers(Object listener) { 29 Multimap<Class<?>, Subscriber> methodsInListener = HashMultimap.create(); 30 Class<?> clazz = listener.getClass(); 31 for (Method method : getAnnotatedMethods(clazz)) { 32 Class<?>[] parameterTypes = method.getParameterTypes(); 33 Class<?> eventType = parameterTypes[0]; 34 // 創建subscriber時,如果未添加@AllowConcurrentEvents注解則生成同步的subscriber
35 methodsInListener.put(eventType, Subscriber.create(bus, listener, method)); 36 } 37 return methodsInListener; 38 } 39
40 private static ImmutableList<Method> getAnnotatedMethods(Class<?> clazz) { 41 try { 42 return subscriberMethodsCache.getUnchecked(clazz); 43 } catch (UncheckedExecutionException e) { 44 throwIfUnchecked(e.getCause()); 45 throw e; 46 } 47 } 48
49 // 映射關系緩存,getOrload
50 private static final LoadingCache<Class<?>, ImmutableList<Method>> subscriberMethodsCache =
51 CacheBuilder.newBuilder() 52 .weakKeys() 53 .build( 54 new CacheLoader<Class<?>, ImmutableList<Method>>() { 55 @Override 56 public ImmutableList<Method> load(Class<?> concreteClass) throws Exception { 57 return getAnnotatedMethodsNotCached(concreteClass); 58 } 59 }); 60
61 private static ImmutableList<Method> getAnnotatedMethodsNotCached(Class<?> clazz) { 62 // 獲得listener的所有父類及自身的class(包括接口)
63 Set<? extends Class<?>> supertypes = TypeToken.of(clazz).getTypes().rawTypes(); 64 Map<MethodIdentifier, Method> identifiers = Maps.newHashMap(); 65 for (Class<?> supertype : supertypes) { 66 for (Method method : supertype.getDeclaredMethods()) { 67 if (method.isAnnotationPresent(Subscribe.class) && !method.isSynthetic()) { 68 // TODO(cgdecker): Should check for a generic parameter type and error out
69 Class<?>[] parameterTypes = method.getParameterTypes(); 70 // 參數校驗,@subscribe注解的方法有且有能有一個非原始類型參數
71 checkArgument( 72 parameterTypes.length == 1, 73 "Method %s has @Subscribe annotation but has %s parameters. "
74 + "Subscriber methods must have exactly 1 parameter.", 75 method, 76 parameterTypes.length); 77
78 checkArgument( 79 !parameterTypes[0].isPrimitive(), 80 "@Subscribe method %s's parameter is %s. "
81 + "Subscriber methods cannot accept primitives. "
82 + "Consider changing the parameter to %s.", 83 method, 84 parameterTypes[0].getName(), 85 Primitives.wrap(parameterTypes[0]).getSimpleName()); 86
87 MethodIdentifier ident = new MethodIdentifier(method); 88 // 重寫的方法只放入一次
89 if (!identifiers.containsKey(ident)) { 90 identifiers.put(ident, method); 91 } 92 } 93 } 94 } 95 return ImmutableList.copyOf(identifiers.values()); 96 } 97
98
99 // 創建subscriber
100 static Subscriber create(EventBus bus, Object listener, Method method) { 101 return isDeclaredThreadSafe(method) 102 ? new Subscriber(bus, listener, method) 103 : new SynchronizedSubscriber(bus, listener, method); 104 } 105
106 @VisibleForTesting 107 static final class SynchronizedSubscriber extends Subscriber { 108
109 private SynchronizedSubscriber(EventBus bus, Object target, Method method) { 110 super(bus, target, method); 111 } 112
113 @Override 114 void invokeSubscriberMethod(Object event) throws InvocationTargetException { 115 synchronized (this) { 116 super.invokeSubscriberMethod(event); 117 } 118 } 119 }
值得注意的是subscriber的生成,即便你使用了AsyncEventbus,卻沒有在處理方法上聲明@AllowConcurrentEvents,那么在處理event時仍然是同步執行的,注冊流程並發安全問題請看第三部分
2.分發流程
先看下如何獲得event對應的subscriber
1 public void post(Object event) { 2 Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event); 3 if (eventSubscribers.hasNext()) { 4 // 分發,dispatcher有三種實現,ImmediateDispatcher(同步處理event,深度優先) 5 // LegacyAsyncDispatcher(異步處理event) 6 // PerThreadQueuedDispatcher(默認,同步調用,廣度優先) 內置隊列,可以保證同一線程內的event的順序
7 dispatcher.dispatch(event, eventSubscribers); 8 } else if (!(event instanceof DeadEvent)) { 9 // the event had no subscribers and was not itself a DeadEvent 10 // 把所有沒有被訂閱的event包裝成deadevent,用戶可以自己定義處理deadevent的方法,作為兜底
11 post(new DeadEvent(this, event)); 12 } 13 } 14
15 Iterator<Subscriber> getSubscribers(Object event) { 16 //獲得event的所有父類及自身的class(包括接口),從獲取subscriber的流程來看,post一個event 17 // 時,除了調用該event的處理方法也會調用該event父類的處理方法
18 ImmutableSet<Class<?>> eventTypes = flattenHierarchy(event.getClass()); 19
20 List<Iterator<Subscriber>> subscriberIterators =
21 Lists.newArrayListWithCapacity(eventTypes.size()); 22
23 for (Class<?> eventType : eventTypes) { 24 CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType); 25 if (eventSubscribers != null) { 26 // eager no-copy snapshot
27 subscriberIterators.add(eventSubscribers.iterator()); 28 } 29 } 30 // 類似flatmap,扁平化
31 return Iterators.concat(subscriberIterators.iterator()); 32 } 33
34 @VisibleForTesting 35 static ImmutableSet<Class<?>> flattenHierarchy(Class<?> concreteClass) { 36 try { 37 return flattenHierarchyCache.getUnchecked(concreteClass); 38 } catch (UncheckedExecutionException e) { 39 throw Throwables.propagate(e.getCause()); 40 } 41 } 42
43 private static final LoadingCache<Class<?>, ImmutableSet<Class<?>>> flattenHierarchyCache =
44 CacheBuilder.newBuilder() 45 .weakKeys() 46 .build( 47 new CacheLoader<Class<?>, ImmutableSet<Class<?>>>() { 48 // <Class<?>> is actually needed to compile
49 @SuppressWarnings("RedundantTypeArguments") 50 @Override 51 public ImmutableSet<Class<?>> load(Class<?> concreteClass) { 52 return ImmutableSet.<Class<?>>copyOf( 53 TypeToken.of(concreteClass).getTypes().rawTypes()); 54 } 55 });
從代碼可以看出,先對該event查詢上級,最后把所有event對應的subscriber返回,因此觸發一個event時,其父event的subscriber也會被調用
接下來看下post,流程eventbus有三種dispatcher(ImmediaDispatcher,PerThreadDispatcher,LegacyAsyncDispatcher)eventbus使用的是PerThreadDispatcher,AsyncEventBus使用LegacyAsyncDispatcher
①ImmediaDispatcher
從名字中的Immedia"即時"就能看出這個dispatcher收到event后會立即處理,不會進行異步處理
代碼如下:
從圖中可以看出ImmediaDispatcher是針對每個event,調用其全部的subscriber進行處理,即盡可能多的調用subscriber,所以是廣度優先,這個dispatcher目前未被使用,了解即可
②PerThreadQueueDispatcher(默認的dispatcher)
同樣從名稱可以看出這種dispatcher是一個thread一個queue,那我們可以猜測內部有可能用了ThreadLocal,既然用了隊列,說明想要起到一個緩沖event處理的過程
隊列的緩沖功能使得dispatcher有能力吞吐更高的event,因此是一種深度優先策略,此外每線程每隊列的方式保證了event處理過程是對於每個線程而言是有序的,同樣是廣度優先,對
每一個event都分發到相關的subscriber進行處理,除此之外還有一個值得稱道的點,即Dispatching變量的使用,規避了遞歸產生的死循環問題
1 private static final class PerThreadQueuedDispatcher extends Dispatcher { 2
3 // This dispatcher matches the original dispatch behavior of EventBus.
4
5 /** Per-thread queue of events to dispatch. */
6 private final ThreadLocal<Queue<Event>> queue =
7 new ThreadLocal<Queue<Event>>() { 8 @Override 9 protected Queue<Event> initialValue() { 10 return Queues.newArrayDeque(); 11 } 12 }; 13
14 /** Per-thread dispatch state, used to avoid reentrant event dispatching. */
15 private final ThreadLocal<Boolean> dispatching =
16 new ThreadLocal<Boolean>() { 17 @Override 18 protected Boolean initialValue() { 19 return false; 20 } 21 }; 22
23 @Override 24 void dispatch(Object event, Iterator<Subscriber> subscribers) { 25 checkNotNull(event); 26 checkNotNull(subscribers); 27 // 如果只從代碼來看,PerThreadQueuedDispatcher的dispatch方法始終 28 // 是單線程調用,並不需要ThreadLocal,但從拓展的角度看,當用戶自定義xxeventbus自己實現分發邏輯時,PerThreadQueuedDispatcher實現了線程安全的dispatch 29 //因為eventbus有可能會被多個線程調用,從框架的角度看,無論用戶是否多線程調用,都應該要保證線程安全 30 // 引用issue 3530中 https://github.com/google/guava/issues/3530 的一個回答 if multiple threads are dispatching to this dispatcher, they will read different values for queueForThread and dispatching.
31 Queue<Event> queueForThread = queue.get(); 32 queueForThread.offer(new Event(event, subscribers)); 33
34 // 如果未開始分發事件則進行處理,解決subscriber遞歸調用post產生的死循環
35 if (!dispatching.get()) { 36 dispatching.set(true); 37 try { 38 Event nextEvent; 39 // 對每一個event,分發到相關的subscribers中
40 while ((nextEvent = queueForThread.poll()) != null) { 41 while (nextEvent.subscribers.hasNext()) { 42 nextEvent.subscribers.next().dispatchEvent(nextEvent.event); 43 } 44 } 45 } finally { 46 dispatching.remove(); 47 queue.remove(); 48 } 49 } 50 }
接下來看下剛剛說的dispatching的妙用demo
在guava-test下建立一個新的目錄方便我們修改源碼后進行測試,測試代碼如下
Listener
1 /** 2 * @author tele 3 * @Description 4 * @create 2020-11-23 5 */ 6 public class Listener { 7 8 private final EventBus eventBus; 9 10 public Listener(EventBus eventBus) { 11 this.eventBus = eventBus; 12 } 13 14 @Subscribe 15 public void record(String s) { 16 eventBus.post(s); 17 System.out.println("receive:"+ s); 18 } 19 }
Producer
1 /** 2 * @author tele 3 * @Description 4 * @create 2020-11-23 5 */ 6 public class Producer { 7 8 public String produce() { 9 return "hello"; 10 } 11 }
Main
1 /**
2 * @author tele 3 * @Description 4 * @create 2020-11-23 5 */
6 public class Main { 7
8 public static void main(String[] args) { 9 EventBus eventBus = new EventBus(); 10 Listener listener = new Listener(eventBus); 11 Producer producer = new Producer(); 12 eventBus.register(listener); 13 String produce = producer.produce(); 14 eventBus.post(produce); 15 } 16
17 }
代碼很簡單,問題在於Listener遞歸調用了post方法,按照代碼示意運行后會棧溢出(隊列中event堆積),receive:hello永遠不會打印,可事實真的如此嗎?
很奇怪是嗎,並沒有產生堆棧溢出的問題,反而是不停的輸出receive:hello,接下來我們修改下PerThreadDispatcher的代碼,將dispatching變量注釋掉
再執行下demo
果然溢出了,關鍵點就在於dispatching變量對於同一線程的遞歸分發進行了處理,已經處理過就不再次進行分發,這樣我們的遞歸調用不停的產生的event得以被處理
③LegacyAsyncDispatcher
看名字挺奇怪的,但有async字樣,所以是異步的dispatcher,LegacyAsyncDispacther是AsyncEventBus的專用dispatcher,由於將event對應的subscriber拆分后入隊,多線程情況下無法保證event入隊順序,也就無法保證subscriber的調用順序,但這樣處理實現了深度優先,即盡可能多的調用不同的event的subscriber,與PerThreadDispatcher相比代碼難度小了不少,由於AsyncEventBus的初始化需要傳入線程池參數,所以AsyncEventBus實現了真正的異步處理
1 /** Implementation of a {@link #legacyAsync()} dispatcher. */
2 private static final class LegacyAsyncDispatcher extends Dispatcher { 3
4 // This dispatcher matches the original dispatch behavior of AsyncEventBus. 5 //
6 // We can't really make any guarantees about the overall dispatch order for this dispatcher in 7 // a multithreaded environment for a couple reasons: 8 //
9 // 1. Subscribers to events posted on different threads can be interleaved with each other 10 // freely. (A event on one thread, B event on another could yield any of 11 // [a1, a2, a3, b1, b2], [a1, b2, a2, a3, b2], [a1, b2, b3, a2, a3], etc.) 12 // 2. It's possible for subscribers to actually be dispatched to in a different order than they 13 // were added to the queue. It's easily possible for one thread to take the head of the 14 // queue, immediately followed by another thread taking the next element in the queue. That 15 // second thread can then dispatch to the subscriber it took before the first thread does. 16 //
17 // All this makes me really wonder if there's any value in queueing here at all. A dispatcher 18 // that simply loops through the subscribers and dispatches the event to each would actually 19 // probably provide a stronger order guarantee, though that order would obviously be different 20 // in some cases.
21
22 /** Global event queue. */
23 private final ConcurrentLinkedQueue<EventWithSubscriber> queue =
24 Queues.newConcurrentLinkedQueue(); 25
26 @Override 27 void dispatch(Object event, Iterator<Subscriber> subscribers) { 28 checkNotNull(event); 29 // 拆分后入隊
30 while (subscribers.hasNext()) { 31 queue.add(new EventWithSubscriber(event, subscribers.next())); 32 } 33
34 EventWithSubscriber e; 35 while ((e = queue.poll()) != null) { 36 e.subscriber.dispatchEvent(e.event); 37 } 38 } 39
40 private static final class EventWithSubscriber { 41 private final Object event; 42 private final Subscriber subscriber; 43
44 private EventWithSubscriber(Object event, Subscriber subscriber) { 45 this.event = event; 46 this.subscriber = subscriber; 47 } 48 } 49 }
注意點:
1.eventbus默認使用的線程池MoreExecutors.directExecutor(),其execute方法是直接調用傳入的runnable的run方法,是非異步的
2.使用AsyncEventBus時,請在對應的方法上添加@AllowConcurrenEvents
三.從並發安全的角度出發,對比下新老版本的注冊流程
本部分為補充內容,重點探討新老版本的注冊並發安全問題,可略過
從20.0開始,event bus的注冊程變成了上面分析的,那么之前的版本是如何實現的呢,一起來分析下.先切到16.0 的tag,注冊代碼如下
顯然是使用了讀寫鎖,不加鎖,eventType會相互覆蓋(HashMultiMap是非線程安全的),先給eventbus加個getSubscriberByType(),記得修改下EventSubscriber的修飾符為public,然后做個多線程的測試
1 /** 2 * @author tele 3 * @Description 4 * @create 2021-01-24 5 */ 6 public class ListenerA { 7 8 @Subscribe 9 public void handle(String msg) { 10 System.out.println("ListenerA:" + msg); 11 } 12 13 } 14 15 /** 16 * @author tele 17 * @Description 18 * @create 2021-01-24 19 */ 20 public class ListenerB { 21 22 @Subscribe 23 public void handle(String msg) { 24 System.out.println("ListenerB:" + msg); 25 } 26 27 } 28 29 /** 30 * @author tele 31 * @Description 32 * @create 2021-01-24 33 */ 34 public class Main { 35 36 37 public static void main(String[] args) throws InterruptedException { 38 39 final EventBus eventBus = new EventBus(); 40 final ListenerA a = new ListenerA(); 41 ListenerB b = new ListenerB(); 42 CountDownLatch countDownLatch = new CountDownLatch(6); 43 44 Runnable r1 = ()-> { 45 eventBus.register(a); 46 countDownLatch.countDown(); 47 }; 48 Thread t1 = new Thread(r1); 49 Thread t2 = new Thread(r1); 50 Thread t3 = new Thread(r1); 51 52 Runnable r2 = ()-> { 53 eventBus.register(b); 54 countDownLatch.countDown(); 55 }; 56 Thread t4 = new Thread(r2); 57 Thread t5 = new Thread(r2); 58 Thread t6 = new Thread(r2); 59 60 t1.start(); 61 t2.start(); 62 t3.start(); 63 t4.start(); 64 t5.start(); 65 t6.start(); 66 countDownLatch.await(); 67 SetMultimap<Class<?>, EventSubscriber> subscribersByType = eventBus.getSubscribersByType(); 68 subscribersByType.asMap().forEach((k,v)-> { 69 System.out.println("key:" + k); 70 v.forEach(System.out::println); 71 }); 72 } 73 }
輸出結果如下:
ok,沒啥問題,接下來再修改下源碼把使用讀寫鎖的兩行代碼注釋掉,再執行下代碼
輸出結果如下:
顯然,ListenerA的注冊結果被覆蓋了,這里簡要說下原因,subscribersByType,k-v結構簡略表示為 K-event.class ,value-Set<Listener.class>,我們知道java中的hashset不重復的特性是基於hashmap實現的.同樣的,這里的SetMultiMap實際是用的HashMultiMap,翻翻源碼就知道了,內部存儲數據的容器是hashmap,那么這個問題就轉換成了hashmap的線程安全問題了,hashmap多線程put hash相同的元素會產生丟失問題,多線程下同時put get有可能導致get 出null.了解到這我們就知道為什么要加鎖了,使用讀寫鎖的版本一直持續到19.0,從20.0開始從開始使用並發容器代替讀寫鎖,因為對於eventbus而言始終是讀遠大於寫,基於cow機制實現的CopyOnWriteArrayList在讀寫同時進行時通過延遲更新的策略不阻塞線程,對於event的處理 而言是可以接受的,因為本次event在post時沒有分發到對應的subsriber,下次同類型的event觸發就ok了,事實上,這種場景極少,因為從使用經歷來看,一般是項目啟動時就注冊,分發都是需要處理邏輯時才會觸發,不阻塞與每次都需要加解讀鎖相比,顯然不阻塞的性能更好了.老版本的分發流程不再贅述,因為確實沒啥好分析的了,如果你能看懂上面分析的新版本的dispatcher,當你看老版本的時候就會感覺很簡單了
四.優勢與缺陷
1.進程內使用,無法實現跨進程處理,需要跨進程傳遞消息,還是老老實實的用消息隊列吧
2.和redis一樣基於內存,天然的不可靠,redis好歹還有aof和rdb,可event bus沒有任何持久化機制
3.個人對新版的Subscriber實現方式有點看法,沒必須要把線程池參數傳遞給Subscriber,因為Subscriber只是被執行者,16.0的版本線程池參數是AsyncEventBus持有
4.優勢:簡單,開箱即用
五.小結
1.只分析了注冊與分發流程,異常處理之類的沒有涉及,用法的話,網上已經很多了,不再贅述
2.event bus的代碼很巧妙,細細品味還有很多巧妙之處,比如上面那個dispatching變量
六.參考文檔
1.github https://github.com/google/guava/wiki/EventBusExplained#for-producers