一.背景
事件機制作為一種編程機制,在很多開發語言中都提供了支持,同時許多開源框架的設計中都使用了事件機制,比如SpringFramework。
在 Java 語言中,Java 的事件機制參與者有3種角色:
1.Event Source:具體的事件源,比如說,你在界面點擊一個 button 按鈕,那么這個按鈕就是事件源,要想使按鈕對某些事件進行響應,你就需要注冊特定的監聽器 listener,事件源將事件對象傳遞給所有注冊的監聽器;
2.Event Object:事件狀態對象,用於監聽器的相應的方法之中,作為參數;
3.Event Listener:事件監聽器,當它監聽到 event object 產生的時候,它就調用相應的方法進行處理;
上面3個角色就定義了事件機制的基本模型。
流程是:
1)首先我們將監聽器對象注冊給事件源對象,這樣當事件觸發時系統便可以通過事件源訪問相應的監聽器;
2)當事件源觸發事件后,系統將事件的相關信息封裝成相應類型的事件對象,並將其發送給注冊到事件源的相應監聽器;
3)當事件對象發送給監聽器后,系統調用監聽器相應的事件處理方法對事件進行處理,也就是做出響應;
PS:監聽器與事件源之間是“多對多”的關系。
下面對幾個概念進行詳細介紹:
1)事件源:事件最初由事件源產生,事件源可以是 Java Bean 或由生成事件能力的對象。在 Java 中,每一個組件會產生什么樣的事件,已經被定義好了。或者說,對於任何一個事件來說,哪些組件可以產生它,已經是確定的了。
2)事件對象:在事件對象中最高層是 java.util.EventObject,所有事件狀態對象都是繼承該類的派生類,比如 Spring-context 的事件機制的根類為 ApplicationEvent,而 Apollo 配置中心的事件處理類 AppCreationEvent 是繼承自 Spring 的;
除了 EventObject是在 util 包中,其它都在 java.awt、java.awt.event 包或 java.swing、java.swing.event 包中,比如有 AWTEvent、ActionEvent、AdjustmentEvent等等;
-
- EventObject:該類除了從 Object 類中繼承下來的方法外還有一個 getSource 方法,其功能就是返回最初發生 Event 的對象;
public class EventObject implements java.io.Serializable { protected transient Object source; public EventObject(Object source) { if (source == null) throw new IllegalArgumentException("null source"); this.source = source; } public Object getSource() { return source; } public String toString() { return getClass().getName() + "[source=" + source + "]"; } }
- ApplicationEvent:
public abstract class ApplicationEvent extends EventObject { private static final long serialVersionUID = 7099057708183571937L; private final long timestamp = System.currentTimeMillis(); public ApplicationEvent(Object source) { super(source); } public final long getTimestamp() { return this.timestamp; } }
- AppCreationEvent:
public class AppCreationEvent extends ApplicationEvent { public AppCreationEvent(Object source) { super(source); } public App getApp() { Preconditions.checkState(source != null); return (App) this.source; } }
- EventObject:該類除了從 Object 類中繼承下來的方法外還有一個 getSource 方法,其功能就是返回最初發生 Event 的對象;
3) 監聽器對象:監聽器對象就是一個實現了特定監聽器接口的類的實例,在監聽器接口的最頂層接口是 java.util.EventListener,這個接口是所有事件偵聽器接口必須擴展的標記接口。感到詫異的是這個接口完全是空的,里面沒有任何的抽象方法的定義;
public interface EventListener { }
事件監聽器的接口命名方式為:XXListener,而且,在java中,這些接口已經被定義好了。用來被實現,它定義了事件處理器(即事件處理的方法原型,這個方法需要被重新實現)。例如:ActionListener接口、MouseListener接口、WindowListener接口等;
說了這么多,現在要回到正題來,說到 Spring 的事件機制,就得從 Spring 的容器開始說起,在 IOC 容器的啟動過程,當所有的 bean 都已經初始化處理完成之后,Spring IOC 容器會有一個發布事件的動作。
public void refresh() throws BeansException, IllegalStateException { //來個鎖,不然 refresh() 還沒結束,你又來個啟動或銷毀容器的操作,那不就亂套了嘛 synchronized (this.startupShutdownMonitor) { .............. //初始化容器的信息源 initMessageSource(); //初始化事件監聽多路廣播器 initApplicationEventMulticaster(); //是個空殼方法,在AnnotationApplicationContex上下文中沒有實現,可能在spring后面的版本會去擴展。 onRefresh(); //注冊監聽器 registerListeners(); //對象的創建:初始化剩下所有的(非懶加載的)單實例對象 finishBeanFactoryInitialization(beanFactory); //刷新完成工作,包括初始化LifecycleProcessor,發布刷新完成事件等 finishRefresh(); } ................. }
在 AbstractApplicationContext 類的 finishRefresh 方法,里面就會發布(廣播)一條代表初始化結束的消息:
protected void finishRefresh() { // Clear context-level resource caches (such as ASM metadata from scanning). clearResourceCaches(); // Initialize lifecycle processor for this context. initLifecycleProcessor(); // Propagate refresh to lifecycle processor first. getLifecycleProcessor().onRefresh(); //發布(廣播)一條消息,類型ContextRefreshedEvent代表Spring容器初始化結束 publishEvent(new ContextRefreshedEvent(this)); // Participate in LiveBeansView MBean, if active. LiveBeansView.registerApplicationContext(this); }
這樣,當 Spring IOC 容器加載處理完相應的 bean 之后,也給我們提供了一個機會,可以去做一些自己想做的事情。這也是 Spring IOC 容器提供給外部擴展的地方,我們可以使用這個擴展機制,來實現一些特殊的業務需求。
比如讓我們的 bean 實現 ApplicationListener 接口,這樣當發布事件時, Spring 的 IOC 容器就會以容器中的實例對象作為事件源類,並從中找到事件的監聽者,此時會執行相應的方法比如 onApplicationEvent(E event),我們的業務邏輯代碼就會寫在這個方法里面。這樣我們的目的就可以達到了,但你這是可能會有一個疑問,這樣的代碼我們可以通過實現 InitializingBean 接口來實現啊,也會被 Spring 容器自動調用。但如果我們這時的需求是我們要做的事情,是必要要等到所有的 bean 都被處理完成之后再進行,此時 InitializingBean 接口就步合適了。(可查看下文案例3)
Spring 的事件機制是觀察者設計模式的實現,通過 ApplicationEvent 和 ApplicationListener 接口,可以實現容器事件處理。
ApplicationListener 監聽 ApplicationEvent 及其子類的事件:
public interface ApplicationListener<E extends ApplicationEvent> extends EventListener { void onApplicationEvent(E event); }
如果容器中有一個監聽器,每當容器發布事件時,監聽器將自動被觸發,當然這種事件機制必須需要程序顯示的觸發。
其中 Spring 有一些內置的事件,當完成某種操作時會發出某些事件動作。比如上文中的監聽 ContextRefreshedEvent 事件,當所有的bean都初始化完成並成功裝載后會觸發該事件,實現 ApplicationListener<ContextRefreshedEvent> 接口可以收到監聽動作,如何實現自己的業務邏輯。
下面是一些 Spring 的內置事件:
-
- ContextRefreshedEvent:ApplicationContext 被初始化或刷新時,該事件被發布。這也可以在 ConfigurableApplicationContext 接口中使用 refresh() 方法來發生。此處的初始化是指:所有的Bean 被成功裝載,后置處理器 Bean 被檢測並激活,所有 Singleton Bean 被預實例化,ApplicationContext 容器已就緒可用;【容器刷新完成所有 bean 都完全創建會發布這個事件】
- ContextStartedEvent:當使用 ConfigurableApplicationContext (ApplicationContext子接口)接口中的 start() 方法啟動 ApplicationContext 時,該事件被發布。你可以調查你的數據庫,或者你可以在接受到這個事件后重啟任何停止的應用程序;
- ContextStoppedEvent:當使用 ConfigurableApplicationContext 接口中的 stop() 停止 ApplicationContext 時,發布這個事件。你可以在接受到這個事件后做必要的清理的工作;
-
ContextClosedEvent:當使用 ConfigurableApplicationContext 接口中的 close() 方法關閉 ApplicationContext 時,該事件被發布。一個已關閉的上下文到達生命周期末端,它不能被刷新或重啟;【關閉容器會發布這個事件】
-
RequestHandledEvent:這是一個 web-specific 事件,告訴所有 bean HTTP 請求已經被服務。只能應用於使用 DispatcherServlet 的 Web 應用。在使用 Spring 作為前端的MVC控制器時,當Spring處理用戶請求結束后,系統會自動觸發該事件;
二.案例
1.簡單實現自定義監聽器,初步認識監聽器:結合上篇文章案例,實現監聽器監聽 Spring 容器變化
@Configuration public class MyApplicationListener implements ApplicationListener<ApplicationEvent> { //當容器發布此事件之后,方法觸發 @Override public void onApplicationEvent(ApplicationEvent event) { System.out.println("當前收到的事件:"+event); } } =========測試運行結果========= [MyBeanDefinitionRegistryPostProcessor]postProcessBeanDefinitionRegistry--->bean的數量:11 十一月 03, 2020 10:17:59 下午 org.springframework.context.annotation.ConfigurationClassPostProcessor enhanceConfigurationClasses 信息: Cannot enhance @Configuration bean definition 'myBeanDefinitionRegistryPostProcessor' since its singleton instance has been created too early. The typical cause is a non-static @Bean method with a BeanDefinitionRegistryPostProcessor return type: Consider declaring such methods as 'static'. [MyBeanDefinitionRegistryPostProcessor]postProcessBeanFactory--->bean的數量:12 [MyBeanFactoryPostProcessor]調用了postProcessBeanFactory [MyBeanFactoryPostProcessor]當前beanFactory共有12個bean [MyBeanFactoryPostProcessor]當前beanFactory有下面組件[org.springframework.context.annotation.internalConfigurationAnnotationProcessor, org.springframework.context.annotation.internalAutowiredAnnotationProcessor, org.springframework.context.annotation.internalCommonAnnotationProcessor, org.springframework.context.event.internalEventListenerProcessor, org.springframework.context.event.internalEventListenerFactory, extConfig, myApplicationListener, myBeanDefinitionRegistryPostProcessor, myBeanFactoryPostProcessor, myBeanPostProcessor, person, color] PropertyValues: length=0 [MyBeanFactoryPostProcessor]postProcessBeanFactory方法中修改了name屬性初始值了 PropertyValues: length=1; bean property 'name' [MyBeanPostProcessor]后置處理器處理bean=【extConfig】開始 [MyBeanPostProcessor]后置處理器處理bean=【extConfig】完畢! [MyBeanPostProcessor]后置處理器處理bean=【myApplicationListener】開始 [MyBeanPostProcessor]后置處理器處理bean=【myApplicationListener】完畢! Person有參構造器:[name=張三,sex=男] [Person]調用了BeanNameAware的setBeanName方法了:person [Person]調用了BeanFactoryAware的setBeanFactory方法了:org.springframework.beans.factory.support.DefaultListableBeanFactory@e45f292: defining beans [org.springframework.context.annotation.internalConfigurationAnnotationProcessor,org.springframework.context.annotation.internalAutowiredAnnotationProcessor,org.springframework.context.annotation.internalCommonAnnotationProcessor,org.springframework.context.event.internalEventListenerProcessor,org.springframework.context.event.internalEventListenerFactory,extConfig,myApplicationListener,myBeanDefinitionRegistryPostProcessor,myBeanFactoryPostProcessor,myBeanPostProcessor,person,color]; root of factory hierarchy [MyBeanPostProcessor]后置處理器處理bean=【person】開始 [Person]調用了Initailization的afterPropertiesSet方法了 [MyBeanPostProcessor]后置處理器處理bean=【person】完畢! [MyBeanPostProcessor]后置處理器處理bean=【color】開始 [MyBeanPostProcessor]后置處理器處理bean=【color】完畢! 當前收到的事件:org.springframework.context.event.ContextRefreshedEvent[source=org.springframework.context.annotation.AnnotationConfigApplicationContext@2e817b38, started on Tue Nov 03 22:17:59 CST 2020] Person [name=趙四, sex=null] 當前收到的事件:org.springframework.context.event.ContextClosedEvent[source=org.springframework.context.annotation.AnnotationConfigApplicationContext@2e817b38, started on Tue Nov 03 22:17:59 CST 2020] [Person]調用了DisposableBean的destroy方法了
2.自定義發布一個事件,步驟如下:
1)實現一個監聽器來監聽某個事件(事件是實現了ApplicationEvent 及其子類的事件);
2)把監聽器加入到容器中;
2)發布事件,只要容器有相關事件的發布,我們就能監聽到這個事件;
public static void main(String[] args) { AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(ExtConfig.class); Person bean = context.getBean(Person.class); System.out.println(bean.toString()); //發布事件 context.publishEvent(new ApplicationEvent(new String("自定義事件")) { }); context.close(); } =========測試運行結果========= [MyBeanDefinitionRegistryPostProcessor]postProcessBeanDefinitionRegistry--->bean的數量:11 ............... [MyBeanPostProcessor]后置處理器處理bean=【color】完畢! 當前收到的事件:org.springframework.context.event.ContextRefreshedEvent[source=org.springframework.context.annotation.AnnotationConfigApplicationContext@2e817b38, started on Tue Nov 03 22:40:39 CST 2020] Person [name=趙四, sex=null] 當前收到的事件:com.hrh.ext.ExtTest$1[source=自定義事件] 當前收到的事件:org.springframework.context.event.ContextClosedEvent[source=org.springframework.context.annotation.AnnotationConfigApplicationContext@2e817b38, started on Tue Nov 03 22:40:39 CST 2020] [Person]調用了DisposableBean的destroy方法了
從上面的運行結果可以看出,我們在容器中添加了一個事件,當我們發布事件時,監聽器會監聽到事件並把事件的內容發布出來。
3.自定義監聽器,實現容器的 bean都初始化后執行相應的操作,比如執行特定的方法:
1)自定義注解:
//該注解作用在類上 @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) public @interface BaseService { }
2)兩個測試 Mapper:
@Configuration @BaseService public class TaskScheduleJobMapper { public void initMapper() { System.out.println(">>>>> 【initMapper】Start job <<<<<"); } } @Configuration @BaseService public class TaskScheduleJobTxlogMapper { public void initMapper() { System.out.println(">>>>> 【initMapper】Recording log <<<<<"); } }
3)測試系統入口:
public interface BaseInterface { public void init(); } @Configuration public class BaseInterfaceImpl implements BaseInterface { @Override public void init() { System.out.println("System start........"); } }
4)自定義監聽器:
@Configuration public class ApplicationContextListener implements ApplicationListener<ContextRefreshedEvent> { @Override public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) { // root application context if (null == contextRefreshedEvent.getApplicationContext().getParent()) { System.out.println(">>>>> Spring初始化完畢 <<<<<"); // spring初始化完畢后,通過反射調用所有使用BaseService注解的initMapper方法 Map<String, Object> baseServices = contextRefreshedEvent.getApplicationContext().getBeansWithAnnotation(BaseService.class); for (Object service : baseServices.values()) { System.out.println(">>>>> {" + service.getClass().getName() + "}.initMapper():"); try { Method initMapper = service.getClass().getMethod("initMapper"); initMapper.invoke(service); } catch (Exception e) { System.out.println("初始化BaseService的initMapper方法異常:" + e); e.printStackTrace(); } } // 系統入口初始化 Map<String, BaseInterface> baseInterfaceBeans = contextRefreshedEvent.getApplicationContext().getBeansOfType(BaseInterface.class); for (Object service : baseInterfaceBeans.values()) { System.out.println(">>>>> {" + service.getClass().getName() + "}.init()"); try { Method init = service.getClass().getMethod("init"); init.invoke(service); } catch (Exception e) { System.out.println("初始化BaseInterface的init方法異常:" + e); e.printStackTrace(); } } } } } =======測試運行結果======= [MyBeanDefinitionRegistryPostProcessor]postProcessBeanDefinitionRegistry--->bean的數量:14 ................ [MyBeanPostProcessor]后置處理器處理bean=【color】開始 [MyBeanPostProcessor]后置處理器處理bean=【color】完畢! >>>>> Spring初始化完畢 <<<<< >>>>> {com.hrh.ext.TaskScheduleJobMapper$$EnhancerBySpringCGLIB$$6bfe7114}.initMapper(): >>>>> 【initMapper】Start job <<<<< >>>>> {com.hrh.ext.TaskScheduleJobTxlogMapper$$EnhancerBySpringCGLIB$$7132ffe6}.initMapper(): >>>>> 【initMapper】Recording log <<<<< >>>>> {com.hrh.ext.BaseInterfaceImpl$$EnhancerBySpringCGLIB$$f49a26ba}.init() System start........ Person [name=趙四, sex=null] [Person]調用了DisposableBean的destroy方法了
4.自定義事件及監聽並進行發布
1)自定義事件:
public class EmailEvent extends ApplicationEvent { private String address; private String text; /** * Create a new {@code ApplicationEvent}. * * @param source the object on which the event initially occurred or with * which the event is associated (never {@code null}) */ public EmailEvent(Object source) { super(source); } public EmailEvent(Object source, String address, String text) { super(source); this.address = address; this.text = text; } public String getAddress() { return address; } public void setAddress(String address) { this.address = address; } public String getText() { return text; } public void setText(String text) { this.text = text; } }
2)自定義監聽器監聽自定義事件:
@Component public class EmailListener implements ApplicationListener { @Override public void onApplicationEvent(ApplicationEvent event) { if (event instanceof EmailEvent) { EmailEvent emailEvent = (EmailEvent) event; System.out.println("郵件地址:" + emailEvent.getAddress()); System.out.println("郵件內容:" + emailEvent.getText()); } else { System.out.println("容器本身事件:" + event); } } }
3)測試發布事件:
public static void main(String[] args) { AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(ExtConfig.class); Person bean = context.getBean(Person.class); System.out.println(bean.toString()); //創建一個ApplicationEvent對象 EmailEvent event = new EmailEvent("hello","249968839@qq.com","This is a event test"); //主動觸發該事件 context.publishEvent(event); context.close(); } =======測試運行結果======= [MyBeanDefinitionRegistryPostProcessor]postProcessBeanDefinitionRegistry--->bean的數量:15 ............. >>>>> Spring初始化完畢 <<<<< >>>>> {com.hrh.ext.TaskScheduleJobMapper$$EnhancerBySpringCGLIB$$45955a2d}.initMapper(): >>>>> 【initMapper】Start job <<<<< >>>>> {com.hrh.ext.TaskScheduleJobTxlogMapper$$EnhancerBySpringCGLIB$$4ac9e8ff}.initMapper(): >>>>> 【initMapper】Recording log <<<<< >>>>> {com.hrh.ext.BaseInterfaceImpl$$EnhancerBySpringCGLIB$$e5b13f13}.init() System start........ 容器本身事件:org.springframework.context.event.ContextRefreshedEvent[source=org.springframework.context.annotation.AnnotationConfigApplicationContext@2e5d6d97, started on Fri Nov 06 22:55:23 CST 2020] Person [name=趙四, sex=null] 容器本身事件:com.hrh.ext.ExtTest$1[source=自定義事件] 郵件地址:249968839@qq.com 郵件內容:This is a event test 容器本身事件:org.springframework.context.event.ContextClosedEvent[source=org.springframework.context.annotation.AnnotationConfigApplicationContext@2e5d6d97, started on Fri Nov 06 22:55:23 CST 2020] [Person]調用了DisposableBean的destroy方法了
三.原理
上面說了幾個案例,現在你應該知道怎么使用了吧,接下來通過 debug 代碼來分析它的執行流程。
1.先來看看 ContextRefreshedEvent事件是怎么發布的,還是從熟悉的配方 refresh() 講起,當容器刷新后,可以看到它調用了 finishRefresh() 來刷新執行事件:
public void refresh() throws BeansException, IllegalStateException { //來個鎖,不然 refresh() 還沒結束,你又來個啟動或銷毀容器的操作,那不就亂套了嘛 synchronized (this.startupShutdownMonitor) { .............. //初始化容器的信息源 initMessageSource(); //初始化事件監聽多路廣播器 initApplicationEventMulticaster(); //是個空殼方法,在AnnotationApplicationContex上下文中沒有實現,可能在spring后面的版本會去擴展。 onRefresh(); //注冊監聽器 registerListeners(); //對象的創建:初始化剩下所有的(非懶加載的)單實例對象 finishBeanFactoryInitialization(beanFactory); //刷新完成工作,包括初始化LifecycleProcessor,發布刷新完成事件等 finishRefresh(); } ................. }
2.在 AbstractApplicationContext.finishRefresh 方法中就會發布(廣播)一條代表初始化結束的消息:publishEvent(new ContextRefreshedEvent(this))
protected void finishRefresh() { // Clear context-level resource caches (such as ASM metadata from scanning). clearResourceCaches(); // Initialize lifecycle processor for this context. initLifecycleProcessor(); // Propagate refresh to lifecycle processor first. getLifecycleProcessor().onRefresh(); //發布(廣播)一條消息,類型ContextRefreshedEvent代表Spring容器初始化結束 publishEvent(new ContextRefreshedEvent(this)); // Participate in LiveBeansView MBean, if active. LiveBeansView.registerApplicationContext(this); }
3.繼續 publishEvent 方法會發現執行了getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType) 來廣播消息:
public void publishEvent(ApplicationEvent event) { publishEvent(event, null); } protected void publishEvent(Object event, @Nullable ResolvableType eventType) { Assert.notNull(event, "Event must not be null"); // Decorate event as an ApplicationEvent if necessary ApplicationEvent applicationEvent; if (event instanceof ApplicationEvent) { applicationEvent = (ApplicationEvent) event;//類型轉換 } else { applicationEvent = new PayloadApplicationEvent<>(this, event); if (eventType == null) { eventType = ((PayloadApplicationEvent<?>) applicationEvent).getResolvableType(); } } // Multicast right now if possible - or lazily once the multicaster is initialized //初始化過程中的registerListeners方法會把earlyApplicationEvents設置為空,(早期事件,容器初始化時使用,可以忽略) if (this.earlyApplicationEvents != null) { this.earlyApplicationEvents.add(applicationEvent); } else { //執行廣播消息 getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType); } // Publish event via parent context as well... 方便使用父類進行發布事件 if (this.parent != null) { if (this.parent instanceof AbstractApplicationContext) { ((AbstractApplicationContext) this.parent).publishEvent(event, eventType); } else { this.parent.publishEvent(event); } } }
PS:publishEvent 方法是發布(廣播)服務的核心能力,而它定義在 ApplicationEventPublisher 接口中,ApplicationContext接口繼承了 ApplicationEventPublisher,所以 AbstractApplicationContext抽象類(ApplicationContext接口的實現類)就實現了該方法,也具有了發送廣播的能力。
上面的 getApplicationEventMulticaster() 是什么東西呢?需要深入了解下,它的作用是獲取事件的多播器(派發器),即將事件發送給多個監聽器,讓監聽器執行相應的邏輯。
private ApplicationEventMulticaster applicationEventMulticaster; ApplicationEventMulticaster getApplicationEventMulticaster() throws IllegalStateException { if (this.applicationEventMulticaster == null) { throw new IllegalStateException("ApplicationEventMulticaster not initialized - " + "call 'refresh' before multicasting events via the context: " + this); } return this.applicationEventMulticaster; }
從上面的代碼可以看出,applicationEventMulticaster是 AbstractApplicationContext 的私有成員變量,那么這個多播器(派發器)是怎么獲取到的呢?在前面的 refresh 方法中有一個initApplicationEventMulticaster()方法,就是調用 AbstractApplicationContext.initApplicationEventMulticaster() 來初始化這個多播器(派發器)的:
protected void initApplicationEventMulticaster() { ConfigurableListableBeanFactory beanFactory = getBeanFactory(); //從bean工廠查找有沒有一個bean為applicationEventMulticaster,如果有,從容器中拿出來 if (beanFactory.containsLocalBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME)) { this.applicationEventMulticaster = beanFactory.getBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, ApplicationEventMulticaster.class); if (logger.isTraceEnabled()) { logger.trace("Using ApplicationEventMulticaster [" + this.applicationEventMulticaster + "]"); } } else { //如果沒有,則往容器中注冊一個SimpleApplicationEventMulticaster,名字為applicationEventMulticaster,如果派發事件需要就可以使用了 this.applicationEventMulticaster = new SimpleApplicationEventMulticaster(beanFactory); beanFactory.registerSingleton(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, this.applicationEventMulticaster); if (logger.isTraceEnabled()) { logger.trace("No '" + APPLICATION_EVENT_MULTICASTER_BEAN_NAME + "' bean, using " + "[" + this.applicationEventMulticaster.getClass().getSimpleName() + "]"); } } }
4.說完了 getApplicationEventMulticaster(),再來說說 multicastEvent(),它的作用是派發事件,它是 ApplicationEventMulticaster接口的一個方法,所以它會調用實現類 SimpleApplicationEventMul
ticaster(上面注冊的多播器)的multicastEvent 方法:
public interface ApplicationEventMulticaster { void multicastEvent(ApplicationEvent event, @Nullable ResolvableType eventType); } //public abstract class AbstractApplicationEventMulticaster implements ApplicationEventMulticaster public class SimpleApplicationEventMulticaster extends AbstractApplicationEventMulticaster { @Override public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) { ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event)); Executor executor = getTaskExecutor(); //根據消息類型取出對應的所有監聽器 for (ApplicationListener<?> listener : getApplicationListeners(event, type)) { //如果可以使用多線程執行,就使用多線程來異步派發事件,執行監聽器的方法 if (executor != null) { executor.execute(() -> invokeListener(listener, event)); } else { invokeListener(listener, event); } } } }
5.最后來看看 invokeListener 方法,它會拿到監聽器來回調 MyApplicationListener. onApplicationEvent方法,然后控制台就輸出了信息 :
protected void invokeListener(ApplicationListener<?> listener, ApplicationEvent event) { ErrorHandler errorHandler = getErrorHandler(); if (errorHandler != null) { try { doInvokeListener(listener, event); } catch (Throwable err) { errorHandler.handleError(err); } } else { doInvokeListener(listener, event);//執行 } } private void doInvokeListener(ApplicationListener listener, ApplicationEvent event) { try { //回調onApplicationEvent listener.onApplicationEvent(event); } catch (ClassCastException ex) { String msg = ex.getMessage(); if (msg == null || matchesClassCastMessage(msg, event.getClass())) { // Possibly a lambda-defined listener which we could not resolve the generic event type for // -> let's suppress the exception and just log a debug message. Log logger = LogFactory.getLog(getClass()); if (logger.isTraceEnabled()) { logger.trace("Non-matching event type for listener: " + listener, ex); } } else { throw ex; } } }
6.當再執行案例2的事件時,從AbstractApplicationContext. publishEvent()(上面第3步)開始執行,步驟還是multicastEvent派發事件 --> invokeListener回調執行 onApplicationEvent;
7.當容器關閉時,會調用doClose(),里面有個ContextClosedEvent事件,監聽器監聽到事件控制台就輸出了信息:
context.close(); @Override public void close() { synchronized (this.startupShutdownMonitor) { doClose(); // If we registered a JVM shutdown hook, we don't need it anymore now: // We've already explicitly closed the context. if (this.shutdownHook != null) { try { Runtime.getRuntime().removeShutdownHook(this.shutdownHook); } catch (IllegalStateException ex) { // ignore - VM is already shutting down } } } } protected void doClose() { // Check whether an actual close attempt is necessary... if (this.active.get() && this.closed.compareAndSet(false, true)) { if (logger.isDebugEnabled()) { logger.debug("Closing " + this); } LiveBeansView.unregisterApplicationContext(this); try { // Publish shutdown event. publishEvent(new ContextClosedEvent(this)); } catch (Throwable ex) { logger.warn("Exception thrown from ApplicationListener handling ContextClosedEvent", ex); } // Stop all Lifecycle beans, to avoid delays during individual destruction. if (this.lifecycleProcessor != null) { try { this.lifecycleProcessor.onClose(); } catch (Throwable ex) { logger.warn("Exception thrown from LifecycleProcessor on context close", ex); } } // Destroy all cached singletons in the context's BeanFactory. destroyBeans(); // Close the state of this context itself. closeBeanFactory(); // Let subclasses do some final clean-up if they wish... onClose(); // Reset local application listeners to pre-refresh state. if (this.earlyApplicationListeners != null) { this.applicationListeners.clear(); this.applicationListeners.addAll(this.earlyApplicationListeners); } // Switch to inactive. this.active.set(false); } }
8.上面在第4步中派發事件操作 getApplicationListeners 拿到了所有的監聽器,那么容器中有哪些監聽器呢?從第1步的 registerListeners() 可以看到,容器是先注冊了多播器和監聽器后才進行事件的發布,下面是監聽器的注冊:
protected void registerListeners() { // Register statically specified listeners first. //從容器中拿到所有的監聽器添加到多路派發器中,(最開始是沒有的,跳過循環執行下面的步驟) for (ApplicationListener<?> listener : getApplicationListeners()) { getApplicationEventMulticaster().addApplicationListener(listener); } // Do not initialize FactoryBeans here: We need to leave all regular beans // uninitialized to let post-processors apply to them! //若容器中沒有,即上面的遍歷沒有執行,則根據類型獲取組件名稱,然后根據組件名稱獲取對應的組件加入到多路派發器中 String[] listenerBeanNames = getBeanNamesForType(ApplicationListener.class, true, false); for (String listenerBeanName : listenerBeanNames) { getApplicationEventMulticaster().addApplicationListenerBean(listenerBeanName); } // Publish early application events now that we finally have a multicaster... Set<ApplicationEvent> earlyEventsToProcess = this.earlyApplicationEvents; this.earlyApplicationEvents = null; if (earlyEventsToProcess != null) { for (ApplicationEvent earlyEvent : earlyEventsToProcess) { getApplicationEventMulticaster().multicastEvent(earlyEvent); } } }
下面再來看看 getApplicationListeners 方法,發現它是 AbstractApplicationEventMulticaster 類的一個方法,所以在開始詳解 getApplicationListeners 方法前,先來看看 AbstractApplicationEventMulticaster 類是一個起什么作用的類。
從上面類圖可以看到 AbstractApplicationEventMulticaster 它實現了幾個接口,它是一個事件派發器,上文第3點里面講到的初始化派發器獲取的 SimpleApplicationEventMulticaster 其實是繼承自AbstractApplicationEventMulticaster 的。
在看下面的獲取監聽器源碼前,我們先思考一個問題:監聽器如何做到只監聽指定類型的消息?如何要實現,有什么方式?
我們可以先猜測下:
1) 注冊監聽器的時候,將監聽器和消息類型綁定;(該論證可查看下文 addApplicationListener 實現方法的解析)
2) 廣播的時候,按照這條消息的類型去找指定了該類型的監聽器,但不可能每條廣播都去所有監聽器里面找一遍,應該是說廣播的時候會觸發一次監聽器和消息的類型綁定;
好了,接下來我們帶着這些問題和猜測詳細看看 getApplicationListeners 的源碼:
public abstract class AbstractApplicationEventMulticaster implements ApplicationEventMulticaster, BeanClassLoaderAware, BeanFactoryAware { //創建監聽器助手類,用於存放監聽器集合,參數是否是預過濾監聽器 private final ListenerRetriever defaultRetriever = new ListenerRetriever(false); //ListenerCacheKey是基於事件類型和源類型的類作為key用來存儲監聽器助手 final Map<ListenerCacheKey, ListenerRetriever> retrieverCache = new ConcurrentHashMap<>(64); @Nullable private ClassLoader beanClassLoader;//類加載器 //互斥的監聽器助手類 private Object retrievalMutex = this.defaultRetriever; //監聽器助手類(封裝一組特定目標監聽器的幫助類,允許有效地檢索預過濾的監聽器,此助手的實例按照時間類型和源類型緩存) private class ListenerRetriever { //存放事件監聽器,有序、不可重復 public final Set<ApplicationListener<?>> applicationListeners = new LinkedHashSet<>(); //存放事件監聽器bean名稱,有序,不可重復 public final Set<String> applicationListenerBeans = new LinkedHashSet<>(); //是否預過濾監聽器 private final boolean preFiltered; public ListenerRetriever(boolean preFiltered) { this.preFiltered = preFiltered; } //獲取事件監聽器 public Collection<ApplicationListener<?>> getApplicationListeners() { //創建一個指定大小的ApplicationListener監聽器List集合 List<ApplicationListener<?>> allListeners = new ArrayList<>( this.applicationListeners.size() + this.applicationListenerBeans.size()); allListeners.addAll(this.applicationListeners); //如果存放監聽器bean name的集合不為空 if (!this.applicationListenerBeans.isEmpty()) { //獲取IOC容器工廠類 BeanFactory beanFactory = getBeanFactory(); for (String listenerBeanName : this.applicationListenerBeans) { try { //獲取指定bean name的監聽器實例 ApplicationListener<?> listener = beanFactory.getBean(listenerBeanName, ApplicationListener.class); //判定如果是預過濾的監聽器或者集合中不包含監聽器實例則添加到集合中 if (this.preFiltered || !allListeners.contains(listener)) { allListeners.add(listener); } } catch (NoSuchBeanDefinitionException ex) { // Singleton listener instance (without backing bean definition) disappeared - // probably in the middle of the destruction phase } } } if (!this.preFiltered || !this.applicationListenerBeans.isEmpty()) { AnnotationAwareOrderComparator.sort(allListeners); } return allListeners; } } //流程1:當所發布的事件類型和事件源類型與Map(retrieverCache)中的key匹配時, //將直接返回value中的監聽器列表作為匹配結果,通常這發生在事件不是第一次發布時,能避免遍歷所有監聽器並進行過濾, //如果事件時第一次發布,則會執行流程2。 protected Collection<ApplicationListener<?>> getApplicationListeners( ApplicationEvent event, ResolvableType eventType) { //事件源,事件最初發生在其上的對象 Object source = event.getSource(); //事件源class對象 Class<?> sourceType = (source != null ? source.getClass() : null); //緩存的key有兩個維度:消息來源+消息類型(關於消息來源可見ApplicationEvent構造方法的入參) //創建基於事件源和源類型的監聽器助手cacheKey ListenerCacheKey cacheKey = new ListenerCacheKey(eventType, sourceType); // Quick check for existing entry on ConcurrentHashMap... // retrieverCache是ConcurrentHashMap對象(並發容器,使用鎖分段來確保多線程下數據安全),所以是線程安全的, // ListenerRetriever中有個監聽器的集合,並有些簡單的邏輯封裝,調用它的getApplicationListeners方法返回的監聽類集合是排好序的(order注解排序) ListenerRetriever retriever = this.retrieverCache.get(cacheKey); //如果retrieverCache中找到對應的監聽器集合,就立即返回了 if (retriever != null) { return retriever.getApplicationListeners(); } //如果類加載器為null,或者事件源在給定的類加載器上下文是安全的並且源類型為null或者源類型在指定上下文是安全的 if (this.beanClassLoader == null || (ClassUtils.isCacheSafe(event.getClass(), this.beanClassLoader) && (sourceType == null || ClassUtils.isCacheSafe(sourceType, this.beanClassLoader)))) { // Fully synchronized building and caching of a ListenerRetriever //鎖定監聽器助手對象,同步從ListenerRetriever監聽器助手中獲取指定的監聽器 synchronized (this.retrievalMutex) { //搶到鎖之后再做一次判斷,因為有可能在前面BLOCK的時候,另一個搶到鎖的線程已經設置好了緩存, //即避免自己在BLOCK的時候其他線程已經將數據放入緩存了 retriever = this.retrieverCache.get(cacheKey); if (retriever != null) { //返回監聽器助手中存儲的監聽器對象 return retriever.getApplicationListeners(); } retriever = new ListenerRetriever(true); //retrieveApplicationListeners方法實際檢索給定事件和源類型的監聽器 Collection<ApplicationListener<?>> listeners = retrieveApplicationListeners(eventType, sourceType, retriever);//流程2 //retriever放到緩存器中(更新緩存) this.retrieverCache.put(cacheKey, retriever); return listeners; } } else { // No ListenerRetriever caching -> no synchronization necessary // 無ListenerRetriever監聽器助手 -> 無需同步緩存 return retrieveApplicationListeners(eventType, sourceType, null); } } }
從上面的源碼可以看出:
1)在獲取 ApplicationListener 的時候用到了緩存,同時有緩存更新和用鎖來確保線程同步(雙重判斷也做了),這樣如果在自定義廣播時,如果多線程同時發廣播,就不會有線程同步的問題了。
2)發送消息的時候根據類型去找所有對應的監聽器,這樣就可以實現自定義監聽器只接收指定類型的消息。
3)在廣播消息的時刻,如果某個類型的消息在緩存中找不到對應的監聽器集合,就調用 retrieveApplicationListeners 方法去找出符合條件的所有監聽器,然后放入這個集合中。
下面再詳細看看 retrieveApplicationListeners:
private Collection<ApplicationListener<?>> retrieveApplicationListeners( ResolvableType eventType, @Nullable Class<?> sourceType, @Nullable ListenerRetriever retriever) { //存放匹配的監聽器的列表 List<ApplicationListener<?>> allListeners = new ArrayList<>(); Set<ApplicationListener<?>> listeners; Set<String> listenerBeans; //鎖定監聽器助手對象 synchronized (this.retrievalMutex) { //獲取監聽器助手中存儲的監聽器對象,去重 listeners = new LinkedHashSet<>(this.defaultRetriever.applicationListeners); //獲取監聽器助手中存儲的監聽器bean名稱集合,去重 listenerBeans = new LinkedHashSet<>(this.defaultRetriever.applicationListenerBeans); } // Add programmatically registered listeners, including ones coming // from ApplicationListenerDetector (singleton beans and inner beans). ////遍歷所有的監聽器 for (ApplicationListener<?> listener : listeners) { //判斷監聽器是否匹配的邏輯在supportsEvent(listener, eventType, sourceType)中 if (supportsEvent(listener, eventType, sourceType)) { //監聽器助手類不為空,加入監聽器助手類中的監聽器集合中 if (retriever != null) { retriever.applicationListeners.add(listener); } //放入匹配的監聽器列表中 allListeners.add(listener); } } // Add listeners by bean name, potentially overlapping with programmatically // registered listeners above - but here potentially with additional metadata. //監聽器助手中存儲的監聽器bean名稱集合有值 if (!listenerBeans.isEmpty()) { //獲取IOC容器 ConfigurableBeanFactory beanFactory = getBeanFactory(); //遍歷 for (String listenerBeanName : listenerBeans) { try { //判斷監聽器是否匹配的邏輯在supportsEvent(listener, eventType, sourceType)中 if (supportsEvent(beanFactory, listenerBeanName, eventType)) { //獲取指定bean name的監聽器實例 ApplicationListener<?> listener = beanFactory.getBean(listenerBeanName, ApplicationListener.class); //如果匹配的監聽器列表不包含上面獲取的實例和符合supportsEvent的邏輯 if (!allListeners.contains(listener) && supportsEvent(listener, eventType, sourceType)) { //監聽器助手類不為空 if (retriever != null) { //bean名稱是單例,在容器中沒有重復監聽器 if (beanFactory.isSingleton(listenerBeanName)) { //加入監聽器助手類中的監聽器集合中 retriever.applicationListeners.add(listener); } else { //去重,applicationListenerBeans是LinkedHashSet retriever.applicationListenerBeans.add(listenerBeanName); } } //放入匹配的監聽器列表中 allListeners.add(listener); } } //監聽器是否匹配的邏輯不在supportsEvent(listener, eventType, sourceType)中,從監聽器助手類和匹配的監聽器列表中移除 else { // Remove non-matching listeners that originally came from // ApplicationListenerDetector, possibly ruled out by additional // BeanDefinition metadata (e.g. factory method generics) above. Object listener = beanFactory.getSingleton(listenerBeanName); if (retriever != null) { retriever.applicationListeners.remove(listener); } allListeners.remove(listener); } } catch (NoSuchBeanDefinitionException ex) { // Singleton listener instance (without backing bean definition) disappeared - // probably in the middle of the destruction phase } } } //監聽器進行排序 AnnotationAwareOrderComparator.sort(allListeners); if (retriever != null && retriever.applicationListenerBeans.isEmpty()) { retriever.applicationListeners.clear(); retriever.applicationListeners.addAll(allListeners); } return allListeners; }
下面是 supportsEvent 的源碼探究:
//首先對原始的ApplicationListener進行一層適配器包裝成GenericApplicationListener, //便於后面使用該接口中定義的方法判斷監聽器是否支持傳入的事件類型或事件源類型 protected boolean supportsEvent( ApplicationListener<?> listener, ResolvableType eventType, @Nullable Class<?> sourceType) { GenericApplicationListener smartListener = (listener instanceof GenericApplicationListener ? (GenericApplicationListener) listener : new GenericApplicationListenerAdapter(listener)); return (smartListener.supportsEventType(eventType) && smartListener.supportsSourceType(sourceType)); } public interface GenericApplicationListener extends ApplicationListener<ApplicationEvent>, Ordered { //判斷是否支持該事件類型 boolean supportsEventType(ResolvableType eventType); //判斷是否支持該事件源類型,默認是true,也就是說事件源的類型通常對於判斷匹配的監聽器沒有意義 default boolean supportsSourceType(@Nullable Class<?> sourceType) { return true; } } public class GenericApplicationListenerAdapter implements GenericApplicationListener, SmartApplicationListener { ...... //監聽器泛型的實際類型 @Nullable private final ResolvableType declaredEventType; //判斷監聽器是否支持該事件類型,因為我們的監聽器實例通常都不是SmartApplicationListener類型 //eventType是發布的事件的類型 @Override @SuppressWarnings("unchecked") public boolean supportsEventType(ResolvableType eventType) { if (this.delegate instanceof SmartApplicationListener) { Class<? extends ApplicationEvent> eventClass = (Class<? extends ApplicationEvent>) eventType.resolve(); return (eventClass != null && ((SmartApplicationListener) this.delegate).supportsEventType(eventClass)); } else { //this.declaredEventType.isAssignableFrom(eventType)當以下兩種情況返回true // 1.declaredEventType和eventType類型相同 // 2.declaredEventType是eventType的父類型 //只要監聽器泛型的實際類型和發布的事件類型一樣或是它的父類型,則該監聽器將被成功匹配。 return (this.declaredEventType == null || this.declaredEventType.isAssignableFrom(eventType)); } } }
9.下面是流程總結圖:
四.擴展
1.從上面的案例中可以看到,只要 bean 繼承 ApplicationEvent,然后使用容器的 publishEvent 就可以發布事件了(類似上文案例4),那么還有其他的方式使 bean 具有跟容器一樣具有發布事件的能力嗎?
答案當然是有的,比如 ApplicationEventPublisherAware 這個接口就可以使 bean 具有發布事件的能力。下面是該接口的源碼:
public interface ApplicationEventPublisherAware extends Aware { void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher); }
我們可以創建一個bean,實現了 ApplicationEventPublisherAware 接口,那么該 bean 的 setApplicationEventPublisher 方法就會被調用,通過該方法可以接收到 ApplicationEventPublisher 類型的入參,借助這個 ApplicationEventPublisher 就可以發消息了;
比如下面的案例:
1)接口:
public interface UserEventRegisterService { /** * 發布事件,注冊用戶 */ void register(); }
2)接口實現:
@Service public class UserEventRegisterServiceImpl implements UserEventRegisterService { @Resource private ApplicationEventPublisher applicationEventPublisher; @Override public void register() { User user = new User(); user.setId(1); user.setName("張三"); user.setSex("男"); applicationEventPublisher.publishEvent(user); System.out.println("結束。"); } }
3)自定義監聽器:可以使用 @EventListener 注解來實現自定義監聽器(該注解下文詳細介紹)
@Component public class UserEventListener { @EventListener(condition = "#user.id != null")//監聽當用戶id不為空的事件 public void handleEvent(User user){ try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("監聽器監聽到的信息:"+user); } }
4)配置類:
@ComponentScan("com.hrh.ext") @Configuration public class ExtConfig { }
5)測試:
public static void main(String[] args) { AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(ExtConfig.class); UserEventRegisterService service = context.getBean(UserEventRegisterService.class); service.register(); context.close(); } ======運行結果====== 監聽器監聽到的信息:User [id=1, name=張三, sex=男] 結束。
2.監聽器是如何加入到容器中的呢?我們從容器 refresh 刷新開始看起,發現在 refresh 里面有 prepareBeanFactory(beanFactory) 方法,為所有bean准備了一個后置處理器 ApplicationListenerDetector:
public void refresh() throws BeansException, IllegalStateException { synchronized (this.startupShutdownMonitor) { // Prepare this context for refreshing. prepareRefresh(); // Tell the subclass to refresh the internal bean factory. ConfigurableListableBeanFactory beanFactory = obtainFreshBeanFactory(); // Prepare the bean factory for use in this context. prepareBeanFactory(beanFactory); ......... } } protected void prepareBeanFactory(ConfigurableListableBeanFactory beanFactory) { .......... // Register early post-processor for detecting inner beans as ApplicationListeners. beanFactory.addBeanPostProcessor(new ApplicationListenerDetector(this)); .......... }
接下來再看看 ApplicationListenerDetector類的源碼,由於它是一個后置處理器(不清楚后置處理器的可查看上篇文章,開頭有簡單介紹),所以它有 postProcessBeforeInitialization 和 postProcessAfterInitialization 兩個方法,這兩個方法在對所有的 bean 進行實例化后進行攔截操作:
private final transient AbstractApplicationContext applicationContext; private final transient Map<String, Boolean> singletonNames = new ConcurrentHashMap<>(256); @Override public Object postProcessBeforeInitialization(Object bean, String beanName) { return bean; } @Override public Object postProcessAfterInitialization(Object bean, String beanName) { //如果bean是ApplicationListener類型 if (bean instanceof ApplicationListener) { // potentially not detected as a listener by getBeanNamesForType retrieval //判斷bean在不在並發容器中 Boolean flag = this.singletonNames.get(beanName); if (Boolean.TRUE.equals(flag)) { // singleton bean (top-level or inner): register on the fly //注冊監聽器,其實就是保存在成員變量applicationEventMulticaster的成員變量defaultRetriever的集合applicationListeners中 this.applicationContext.addApplicationListener((ApplicationListener<?>) bean); } else if (Boolean.FALSE.equals(flag)) { if (logger.isWarnEnabled() && !this.applicationContext.containsBean(beanName)) { // inner bean with other scope - can't reliably process events logger.warn("Inner bean '" + beanName + "' implements ApplicationListener interface " + "but is not reachable for event multicasting by its containing ApplicationContext " + "because it does not have singleton scope. Only top-level listener beans are allowed " + "to be of non-singleton scope."); } this.singletonNames.remove(beanName); } } return bean; } public void addApplicationListener(ApplicationListener<?> listener) { Assert.notNull(listener, "ApplicationListener must not be null"); if (this.applicationEventMulticaster != null) { this.applicationEventMulticaster.addApplicationListener(listener); } this.applicationListeners.add(listener); }
如上所示,如果當前 bean 實現了 ApplicationListener 接口,就會調用 this.applicationContext.addApplicationListener 方法將當前 bean 注冊到 applicationContext 的監聽器集合中,后面有廣播就直接找到這些監聽器,調用每個監聽器的 onApplicationEvent 方法;
接下來詳細看看 addApplicationListener 方法,它的實現方法在 AbstractApplicationEventMulticaster 類中:
@Override public void addApplicationListener(ApplicationListener<?> listener) { //鎖定監聽器助手對象 synchronized (this.retrievalMutex) { // Explicitly remove target for a proxy, if registered already, // in order to avoid double invocations of the same listener. // 如果已經注冊,則顯式刪除已經注冊的監聽器對象 // 為了避免調用重復的監聽器對象 Object singletonTarget = AopProxyUtils.getSingletonTarget(listener); if (singletonTarget instanceof ApplicationListener) { //如果因為AOP導致創建了監聽類的代理,那么就要在注冊列表中清除代理類 this.defaultRetriever.applicationListeners.remove(singletonTarget); } //把監聽器加入集合defaultRetriever.applicationListeners中,這是個LinkedHashSet實例 this.defaultRetriever.applicationListeners.add(listener); //清空監聽器助手緩存Map this.retrieverCache.clear(); } }
在上面可以看到,如果對象是由 AOP生成的代理類,需要清除,是因為 AOP 是通過代理技術實現的,此時可能通過 CGLIB 生成了監聽類的代理類,此類的實例如果被注冊到監聽器集合中,那么廣播時按照消息類型就會取出兩個監聽器實例來,到時就是一個消息被兩個實例消費了,因此需要先清理掉代理類。
同時也論證了一個觀點:所謂的注冊監聽器,其實就是把 ApplicationListener 的實現類放入一個LinkedHashSet的集合,此處沒有任何與消息類型相關的操作,因此,監聽器注冊的時候並沒有將消息類型和監聽器綁定;
3.異步監聽事件案例:正常的事件通知是 ContextRefreshedEvent --> EmailEvent --> ContextClosedEvent
//開啟異步支持 @EnableAsync @Component public class EmailListener implements ApplicationListener { @Async//異步執行 @Override public void onApplicationEvent(ApplicationEvent event) { if (event instanceof EmailEvent) { EmailEvent emailEvent = (EmailEvent) event; System.out.println("郵件地址:" + emailEvent.getAddress()); System.out.println("郵件內容:" + emailEvent.getText()); } else { System.out.println("容器本身事件:" + event); } //通過線程名稱區別 System.out.println(event+ ":" + Thread.currentThread().getName()); } } =======測試運行結果:從下面的運行結果可以看出是異步執行了======= 郵件地址:249968839@qq.com 郵件內容:This is a event test com.hrh.ext.EmailEvent[source=hello]:SimpleAsyncTaskExecutor-2 容器本身事件:org.springframework.context.event.ContextClosedEvent[source=org.springframework.context.annotation.AnnotationConfigApplicationContext@2e5d6d97, started on Mon Nov 09 22:29:09 CST 2020] 容器本身事件:org.springframework.context.event.ContextRefreshedEvent[source=org.springframework.context.annotation.AnnotationConfigApplicationContext@2e5d6d97, started on Mon Nov 09 22:29:09 CST 2020] org.springframework.context.event.ContextRefreshedEvent[source=org.springframework.context.annotation.AnnotationConfigApplicationContext@2e5d6d97, started on Mon Nov 09 22:29:09 CST 2020]:SimpleAsyncTaskExecutor-1 org.springframework.context.event.ContextClosedEvent[source=org.springframework.context.annotation.AnnotationConfigApplicationContext@2e5d6d97, started on Mon Nov 09 22:29:09 CST 2020]:SimpleAsyncTaskExecutor-3
5.Spring事務監聽機制---使用 @TransactionalEventListener 處理數據庫事務提交成功后再執行操作
在項目中,往往需要執行數據庫操作后,發送消息或事件來異步調用其他組件執行相應的操作,例如:用戶注冊后發送激活碼、配置修改后發送更新事件等。
但是,數據庫的操作如果還未完成,此時異步調用的方法查詢數據庫發現沒有數據,這就會出現問題。
如下面偽代碼案例:
void saveUser(User u) { //保存用戶信息 userDao.save(u); //觸發保存用戶事件 applicationContext.publishEvent(new SaveUserEvent(u.getId())); } @EventListener void onSaveUserEvent(SaveUserEvent event) { //獲取事件中的信息(用戶id) Integer id = event.getEventData(); //查詢數據庫,獲取用戶(此時如果用戶還未插入數據庫,則返回空) User u = userDao.getUserById(id); //這里可能報空指針異常! String phone = u.getPhoneNumber(); MessageUtils.sendMessage(phone); }
為了解決上述問題,Spring為我們提供了兩種方式: @TransactionalEventListener 注解 和 事務同步管理器 TransactionSynchronizationManager,以便我們可以在事務提交后再觸發某一事件。
@TransactionalEventListener 注解的使用案例:只有當前事務提交之后,才會執行事件監聽器的方法
//phase默認為AFTER_COMMIT,共有四個枚舉:BEFORE_COMMIT,AFTER_COMMIT,AFTER_ROLLBACK,AFTER_COMPLETION
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT) void onSaveUserEvent(SaveUserEvent event) { Integer id = event.getEventData(); User u = userDao.getUserById(id); String phone = u.getPhoneNumber(); MessageUtils.sendMessage(phone); }
TransactionSynchronizationManager 的使用案例:@TransactionalEventListener底層下面這樣來實現的
@EventListener void onSaveUserEvent(SaveUserEvent event) { TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() { @Override public void afterCommit() { Integer id = event.getEventData(); User u = userDao.getUserById(id); String phone = u.getPhoneNumber(); MessageUtils.sendMessage(phone); } }); }