歡迎大家關注我的個人博客—精靈王,獲取更好的閱讀體驗以及最新的文章分享~
簡介
在JAVA體系中,有支持實現事件監聽機制,在Spring 中也專門提供了一套事件機制的接口,方便我們實現。比如我們可以實現當用戶注冊后,給他發送一封郵件告訴他注冊成功的一些信息,比如用戶訂閱的主題更新了,通知用戶注意及時查看等。
觀察者模式
觀察者模式還有很多其他的稱謂,如發布-訂閱(Publish/Subscribe)模式、模型-視圖(Model/View)模式、源-監聽器(Source/Listener)模式或從屬者(Dependents)模式。觀察者模式定義了一種一對多的依賴關系,讓多個觀察者對象同時監聽某一個主題對象。這個主題對象在狀態上發生變化時,會通知所有觀察者對象,使它們能夠自動更新自己。
觀察者模式一般包含以下幾個對象:
Subject:
被觀察的對象。它提供一系列方法來增加和刪除觀察者對象,同時它定義了通知方法notify()。目標類可以是接口,也可以是抽象類或具體類。
ConcreteSubject:
具體的觀察對象。Subject的具體實現類,在這里實現通知事件。
Observer:
觀察者。這里是抽象的觀察者,觀察者有一個或者多個。
ConcreteObserver:
具體的觀察者。在這里維護觀察對象的具體操作。
Java 中的事件機制
Java中提供了基本的事件處理基類:
- EventObject:所有事件狀態對象都將從其派生的根類;
- EventListener:所有事件偵聽器接口必須擴展的標記接口;
非常經典的開門案例:
一、創建事件對象
@Getter
@Setter
public class DoorEvent extends EventObject{
int state;
public DoorEvent(Object source){
super(source);
}
public DoorEvent(Object source,int state){
super(source);
this.state = state;
}
}
二、事件監聽器
public interface DoorListener extends EventListener{
void doorEvent(DoorEvent doorEvent);
}
public class CloseDoorEvent implements DoorListener{
@Override
public void doorEvent(DoorEvent doorEvent){
if(doorEvent.getState() == -1){
System.out.println("門關上了");
}
}
}
public class OpenDoorListener implements DoorListener{
@Override
public void doorEvent(DoorEvent doorEvent){
if(doorEvent.getState() == 1){
System.out.println("門打開了");
}
}
}
三、測試
public static void main(String[] args){
List<DoorListener> list = new ArrayList<>();
list.add(new OpenDoorListener());
list.add(new CloseDoorEvent());
for(DoorListener listener : list){
listener.doorEvent(new DoorEvent(-1,-1));
listener.doorEvent(new DoorEvent(1,1));
}
}
四、輸出結果
門打開了
門關上了
Spring 中的事件機制
在 Spring 容器中通過ApplicationEven
類和 ApplicationListener
接口來實現事件監聽機制,每次Event 被發布到Spring容器中時都會通知該Listener。需要注意的是,Spring 的事件默認是同步的,調用 publishEvent
方法發布事件后,它會處於阻塞狀態,直到Listener接收到事件並處理返回之后才繼續執行下去。
代碼示例:
一、定義事件對象
@Getter
@Setter
@ToString
public class UserDTO extends ApplicationEvent{
private Integer userId;
private String name;
private Integer age;
public UserDTO(Object source){
super(source);
}
}
二、定義事件監聽器,可以通過注解或者實現接口來實現。
@Component
public class UserRegisterSmsListener{
// 通過注解實現監聽器
@EventListener
public void handleUserEvent(UserDTO userDTO){
System.out.println("監聽到用戶注冊,准備發送短信,user:"+userDTO.toString());
}
}
// 通過實現接口實現監聽器
@Component
public class UserRegisterEmailListener implements ApplicationListener<UserDTO>{
@Override
public void onApplicationEvent(UserDTO userDTO){
System.out.println("監聽到用戶注冊,准備發送郵件,user:" + userDTO.toString());
}
}
@Component
public class UserRegisterMessageListener implements ApplicationListener<UserDTO>{
@Override
public void onApplicationEvent(UserDTO userDTO){
System.out.println("監聽到用戶注冊,給新用戶發送首條站內短消息,user:" + userDTO.toString());
}
}
三、注冊服務
public interface UserService{
void register();
}
@Service
public class UserServiceImpl implements UserService{
@Autowired
private ApplicationEventPublisher eventPublisher;
@Override
public void register(){
UserDTO userDTO = new UserDTO(this);
userDTO.setAge(18);
userDTO.setName("精靈王jinglingwang.cn");
userDTO.setUserId(1001);
System.out.println("register user");
eventPublisher.publishEvent(userDTO);
}
}
四、測試
@Autowired
private UserService userService;
@Test
public void testUserEvent(){
userService.register();
}
五、輸出結果
register user
監聽到用戶注冊,准備發送短信,user:UserDTO(userId=1001, name=精靈王jinglingwang.cn, age=18)
監聽到用戶注冊,准備發送郵件,user:UserDTO(userId=1001, name=精靈王jinglingwang.cn, age=18)
監聽到用戶注冊,給新用戶發送首條站內短消息,user:UserDTO(userId=1001, name=精靈王jinglingwang.cn, age=18)
指定監聽器的順序
監聽器的發布順序是按照 bean 自然裝載的順序執行的,Spring 支持兩種方式來實現有序
一、實現SmartApplicationListener接口指定順序。
把上面三個Listener都改成實現SmartApplicationListener接口,並指定getOrder的返回值,返回值越小,優先級越高。
@Component
public class UserRegisterMessageListener implements SmartApplicationListener{
@Override
public boolean supportsEventType(Class<? extends ApplicationEvent> eventType){
return eventType == UserDTO.class;
}
@Override
public boolean supportsSourceType(Class<?> sourceType){
return true;
}
@Override
public void onApplicationEvent(ApplicationEvent event){
System.out.println("監聽到用戶注冊,給新用戶發送首條站內短消息,user:" + event.toString());
}
@Override
public int getOrder(){
return -1;
}
}
另外兩個監聽器的改造省略,指定改造后的UserRegisterSmsListener返回order為0,UserRegisterEmailListener的getOrder返回1,測試輸出結果如下:
register user
監聽到用戶注冊,給新用戶發送首條站內短消息,user:UserDTO(userId=1001, name=精靈王jinglingwang.cn, age=18)
監聽到用戶注冊,准備發送短信,user:UserDTO(userId=1001, name=精靈王jinglingwang.cn, age=18)
監聽到用戶注冊,准備發送郵件,user:UserDTO(userId=1001, name=精靈王jinglingwang.cn, age=18)
二、使用注解@Order()
@Component
public class UserRegisterSmsListener{
@Order(-2)
@EventListener
public void handleUserEvent(UserDTO userDTO){
System.out.println("監聽到用戶注冊,准備發送短信,user:"+userDTO.toString());
}
}
測試輸出結果如下:
register user
監聽到用戶注冊,准備發送短信,user:UserDTO(userId=1001, name=精靈王jinglingwang.cn, age=18)
監聽到用戶注冊,給新用戶發送首條站內短消息,user:UserDTO(userId=1001, name=精靈王jinglingwang.cn, age=18)
監聽到用戶注冊,准備發送郵件,user:UserDTO(userId=1001, name=精靈王jinglingwang.cn, age=18)
可以發現,短信監聽器最先執行。
異步支持
Spring 事件機制默認是同步阻塞的,如果 ApplicationEventPublisher 發布事件之后他會一直阻塞等待listener 響應,多個 listener 的情況下前面的沒有執行完后面的會一直被阻塞。這時候我們可以利用 Spring 提供的線程池注解 @Async
來實現異步線程
一、使用 @Async 之前需要先開啟線程池,在 啟動類上添加 @EnableAsync 注解即可。
@EnableAsync
@SpringBootApplication
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
}
二、監聽器使用異步線程
自定義異步線程池
@Configuration
public class AsyncConfig{
@Bean("asyncThreadPool")
public Executor getAsyncExecutor(){
System.out.println("asyncThreadPool init");
Executor executor = new ThreadPoolExecutor(
10,20,60L,TimeUnit.SECONDS
,new ArrayBlockingQueue<>(100),new MyThreadFactory());
return executor;
}
class MyThreadFactory implements ThreadFactory{
final AtomicInteger threadNumber = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r){
Thread t = new Thread(r);
t.setName("async-thread-"+threadNumber.getAndIncrement());
t.setDaemon(true);
return t;
}
}
}
指定監聽器的線程池
@Component
public class UserRegisterSmsListener{
@Order(-2)
@Async("asyncThreadPool")
@EventListener
public void handleUserEvent(UserDTO userDTO){
System.out.println(Thread.currentThread().getName() + " 監聽到用戶注冊,准備發送短信,user:"+userDTO.toString());
}
}
三、測試輸出結果
register user
監聽到用戶注冊,給新用戶發送首條站內短消息,user:UserDTO(userId=1001, name=admol, age=18)
監聽到用戶注冊,准備發送郵件,user:UserDTO(userId=1001, name=admol, age=18)
async-thread-0 監聽到用戶注冊,准備發送短信,user:UserDTO(userId=1001, name=admol, age=18)
Spring事件機制原理分析
Spring事件機制涉及的重要類主要有以下四個:
ApplicationEvent:
事件對象,繼承至JDK的類EventObject
,可以攜帶事件的時間戳
ApplicationListener:
事件監聽器,繼承至JDK的接口EventListener
,該接口被所有的事件監聽器實現,比如支持指定順序的SmartApplicationListener
ApplicationEventMulticaster:
事件管理者,管理監聽器和發布事件,ApplicationContext通過委托ApplicationEventMulticaster來 發布事件
ApplicationEventPublisher:
事件發布者,該接口封裝了事件有關的公共方法,作為ApplicationContext的超級街廓,也是委托 ApplicationEventMulticaster完成事件發布。
源碼展示
ApplicationEvent
事件對象ApplicationEvent的主要源代碼如下,繼承了JAVA的 EventObject 對象:
public abstract class ApplicationEvent extends EventObject {
private static final long serialVersionUID = 7099057708183571937L;
private final long timestamp; // 多了一個時間戳屬性
public ApplicationEvent(Object source) {
super(source);
this.timestamp = System.currentTimeMillis(); // 初始當前化時間戳
}
public final long getTimestamp() {
return this.timestamp;
}
}
從上面ApplicationEvent的子類關系圖種可以發現,ApplicationEvent有一個重要的子類ApplicationContextEvent
,而ApplicationContextEvent又有4個重要的子類ContextStartedEvent
、ContextRefreshedEvent
、ContextClosedEvent
、ContextStoppedEvent
。
從名字就可以看出,這4個事件都和Spring容器有關系的:
- ContextRefreshedEvent:當spring容器context刷新時觸發
- ContextStartedEvent:當spring容器context啟動后觸發
- ContextStoppedEvent:當spring容器context停止時觸發
- ContextClosedEvent:當spring容器context關閉時觸發,容器被關閉時,其管理的所有單例Bean都被銷毀。
當每個事件觸發時,相關的監聽器就會監聽到相應事件,然后觸發onApplicationEvent
方法。
ApplicationListener
事件監聽器,繼承DK的接口EventListener
/* ...
* @author Rod Johnson
* @author Juergen Hoeller
* @param <E> the specific ApplicationEvent subclass to listen to
* @see org.springframework.context.event.ApplicationEventMulticaster
*/
public interface ApplicationListener<E extends ApplicationEvent> extends EventListener {
/**
* Handle an application event. by jinglingwang.cn
* @param event the event to respond to
*/
void onApplicationEvent(E event);
}
注釋@param <E> the specific ApplicationEvent subclass to listen to@see ApplicationEventMulticaster
里面說明了事件的廣播在ApplicationEventMulticaster
類。
ApplicationEventMulticaster
ApplicationEventMulticaster
是一個接口,負責管理監聽器和發布事件,定義了如下方法:
addApplicationListener(ApplicationListener<?> listener)
:新增一個listener;addApplicationListenerBean(String listenerBeanName)
:新增一個listener,參數為bean name;removeApplicationListener(ApplicationListener<?> listener)
:刪除listener;void removeAllListeners()
:刪除所有的ListenerremoveApplicationListenerBean(String listenerBeanName)
:根據bean name 刪除listener;multicastEvent(ApplicationEvent event)
:廣播事件;multicastEvent(ApplicationEvent event, @Nullable ResolvableType eventType)
:廣播事件,指定事件的source類型。
AbstractApplicationEventMulticaster 實現了 ApplicationEventMulticaster接口,SimpleApplicationEventMulticaster 繼承了AbstractApplicationEventMulticaster ;
-
AbstractApplicationEventMulticaster 主要實現了管理監聽器的方法(上面接口的前5個方法)
-
SimpleApplicationEventMulticaster 主要實現了事件廣播相關的方法(上面接口的最后2個方法)
兩個類分別繼承了部分上面的方法。
一、先看新增Listener方法實現邏輯:
public abstract class AbstractApplicationEventMulticaster
implements ApplicationEventMulticaster, BeanClassLoaderAware, BeanFactoryAware {
private final ListenerRetriever defaultRetriever = new ListenerRetriever(false);
...
@Override
public void addApplicationListener(ApplicationListener<?> listener) {
synchronized (this.retrievalMutex) { // 加排他鎖
// Explicitly remove target for a proxy, if registered already,
// in order to avoid double invocations of the same listener.
Object singletonTarget = AopProxyUtils.getSingletonTarget(listener);
if (singletonTarget instanceof ApplicationListener) {
// 刪除,避免重復調用
this.defaultRetriever.applicationListeners.remove(singletonTarget);
}
// 加入到Set LinkedHashSet 集合中
this.defaultRetriever.applicationListeners.add(listener);
this.retrieverCache.clear(); // 緩存
}
}
...
}
最核心的一句代碼:this.defaultRetriever.applicationListeners.add(listener);
ListenerRetriever類是AbstractApplicationEventMulticaster類的內部類,里面有兩個集合,用來記錄維護事件監聽器。
private class ListenerRetriever {
public final Set<ApplicationListener<?>> applicationListeners = new LinkedHashSet<>();
public final Set<String> applicationListenerBeans = new LinkedHashSet<>();
...
}
這就和設計模式中的發布訂閱模式一樣了,維護一個List,用來管理所有的訂閱者,當發布者發布消息時,遍歷對應的訂閱者列表,執行各自的回調handler。
二、看SimpleApplicationEventMulticaster類實現的廣播事件邏輯:
@Override
public void multicastEvent(ApplicationEvent event) {
multicastEvent(event, resolveDefaultEventType(event)); // 繼續調用下面的廣播方法
}
@Override
public void multicastEvent(final ApplicationEvent event, ResolvableType eventType) {
ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
// 遍歷監聽器列表
for (final ApplicationListener<?> listener : getApplicationListeners(event, type)) {
Executor executor = getTaskExecutor();
if (executor != null) { // 是否指定了線程池
executor.execute(new Runnable() {
@Override
public void run() { // 線程池執行
invokeListener(listener, event);
}
});
}
else { // 普通執行
invokeListener(listener, event);
}
}
}
代碼分析:
- 首先根據事件類型,獲取事件監聽器列表:
getApplicationListeners(event, type)
- 遍歷監聽器列表,for循環
- 判斷是否有線程池,如果有,在線程池執行
- 否則直接執行
我們再看看 invokeListener
方法的邏輯:
protected void invokeListener(ApplicationListener<?> listener, ApplicationEvent event) {
ErrorHandler errorHandler = getErrorHandler();
if (errorHandler != null) { // 是否有錯誤處理
try {
doInvokeListener(listener, event);
} catch (Throwable err) {
errorHandler.handleError(err);
}
} else {
doInvokeListener(listener, event); // 直接執行
}
}
核心邏輯就是繼續調用doInvokeListener
方法:
private void doInvokeListener(ApplicationListener listener, ApplicationEvent event) {
try {
listener.onApplicationEvent(event);// 執行監聽器事件
}
catch (ClassCastException ex) {
String msg = ex.getMessage();
if (msg == null || msg.startsWith(event.getClass().getName())) {
// Possibly a lambda-defined listener which we could not resolve the generic event type for
Log logger = LogFactory.getLog(getClass());
if (logger.isDebugEnabled()) {
logger.debug("Non-matching event type for listener: " + listener, ex);
}
}
else {
throw ex;
}
}
}
發現最后實際就是調用的 listener.onApplicationEvent(event);
也就是我們通過實現接口ApplicationListener的方式來實現監聽器的onApplicationEvent實現邏輯。
ApplicationEventPublisher類
在我們的發布事件邏輯代碼的地方,通過查看 eventPublisher.publishEvent(userDTO);
方法可以發現ApplicationEventPublisher是一個接口,publishEvent方法的邏輯實現主要在類AbstractApplicationContext中:
public abstract class AbstractApplicationContext extends DefaultResourceLoader
implements ConfigurableApplicationContext, DisposableBean {
...
private Set<ApplicationEvent> earlyApplicationEvents;
...
@Override
public void publishEvent(ApplicationEvent event) {
publishEvent(event, null); // 調用下面的方法
}
// 發布事件主要邏輯
protected void publishEvent(Object event, ResolvableType eventType) {
Assert.notNull(event, "Event must not be null");
if (logger.isTraceEnabled()) {
logger.trace("Publishing event in " + getDisplayName() + ": " + event);
}
// 事件裝飾為 ApplicationEvent
ApplicationEvent applicationEvent;
if (event instanceof ApplicationEvent) {
applicationEvent = (ApplicationEvent) event;
} else {
applicationEvent = new PayloadApplicationEvent<Object>(this, event);
if (eventType == null) {
eventType = ((PayloadApplicationEvent) applicationEvent).getResolvableType();
}
}
// 容器啟動的時候 earlyApplicationEvents 可能還沒有初始化
if (this.earlyApplicationEvents != null) {
this.earlyApplicationEvents.add(applicationEvent); // 加入到集合,同一廣播
} else {
// 還沒初始化,直接廣播事件
getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType);
}
// 通過父上下文發布事件.
if (this.parent != null) {
if (this.parent instanceof AbstractApplicationContext) {
((AbstractApplicationContext) this.parent).publishEvent(event, eventType);
}
else {
this.parent.publishEvent(event);
}
}
}
...
}
這段代碼的主要邏輯在這:
if (this.earlyApplicationEvents != null) {
this.earlyApplicationEvents.add(applicationEvent);
}
else {
getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType);
}
可以發現earlyApplicationEvents也是一個Set集合,如果這個集合已經初始化了,就把事件加入到集合中,否則直接調用multicastEvent執行事件監聽邏輯。
我們跟蹤找到初始化這個集合的地方,發現在方法protected void prepareRefresh()
中:
protected void prepareRefresh() {
this.startupDate = System.currentTimeMillis();
this.closed.set(false);
this.active.set(true);
if (logger.isInfoEnabled()) {
logger.info("Refreshing " + this);
}
initPropertySources();
getEnvironment().validateRequiredProperties();
**this.earlyApplicationEvents = new LinkedHashSet<ApplicationEvent>();**
}
繼續跟蹤調用這個方法的地方,發現在AbstractApplicationContext.refresh()
方法中,而這個方法是Spring容器初始化必須要調用的過程,非常的重要。
那在什么地方使用到了這個集合呢?我們繼續跟蹤發現在 protected void registerListeners()
方法中,代碼如下:
protected void registerListeners() {
// Register statically specified listeners first.
for (ApplicationListener<?> listener : getApplicationListeners()) {
getApplicationEventMulticaster().addApplicationListener(listener);
}
// Do not initialize FactoryBeans here: We need to leave all regular beans
// uninitialized to let post-processors apply to them! jinglingwang.cn
String[] listenerBeanNames = getBeanNamesForType(ApplicationListener.class, true, false);
for (String listenerBeanName : listenerBeanNames) {
getApplicationEventMulticaster().addApplicationListenerBean(listenerBeanName);
}
// 拿到集合引用
Set<ApplicationEvent> ****earlyEventsToProcess = this.earlyApplicationEvents;
this.earlyApplicationEvents = null; // 把之前的集合置為null
if (earlyEventsToProcess != null) { // 如果集合不為空,則廣播里面的事件
for (ApplicationEvent earlyEvent : earlyEventsToProcess) {
getApplicationEventMulticaster().multicastEvent(earlyEvent);
}
}
}
邏輯是先獲得該集合的引用,然后置空之前的集合,然后遍歷集合,進行廣播事件multicastEvent
,這個方法的邏輯上面已經說過了。
而registerListeners這個方法是在什么時候調用的呢?通過跟蹤發現也是在AbstractApplicationContext.refresh()
方法中。
只不過基本是在方法邏輯的最后,也就是Spring已經容器初始化完成了。
@Override
public void refresh() throws BeansException, IllegalStateException {
synchronized (this.startupShutdownMonitor) {
// Prepare this context for refreshing.
**prepareRefresh**();
....
try {
onRefresh();
// Check for listener beans and register them.
**registerListeners**();
// Instantiate all remaining (non-lazy-init) singletons.
finishBeanFactoryInitialization(beanFactory);
// Last step: publish corresponding event.
**finishRefresh**();
}
catch (BeansException ex) {
...
}
finally {
...
}
}
}
容器初始化之前和之后都有可能進行廣播事件。
總結
- 事件監聽機制和觀察者模式非常相似
- JDK 也有實現提供事件監聽機制
- Spring 的事件機制也是基於JDK 來擴展的
- Spring 的事件機制默認是同步阻塞的
- Spring 容器初始化前后都可能進行廣播事件
個人博客:精靈王