事件機制-Spring 源碼系列(4)


事件機制-Spring 源碼系列(4)

 

目錄:

Ioc容器beanDefinition-Spring 源碼(1)

Ioc容器依賴注入-Spring 源碼(2)

Ioc容器BeanPostProcessor-Spring 源碼(3)

事件機制-Spring 源碼(4)

AOP執行增強-Spring 源碼系列(5)

 

 

ApplicationEvent 事件抽象類
ApplicationListener 監聽器接口
ApplicationContext 事件源
事件源觸發事件后,將事件通知給監聽器,監聽器執行相應邏輯的過程
 
使用簡單的實現:
事件:
public class EatEvent extends ApplicationEvent {

    private String status;

    public String getStatus() {
        return status;
    }

    public void setStatus(String status) {
        this.status = status;
    }

    public EatEvent(Object source) {
        super(source);
    }
}

監聽器:

public class MeListener implements ApplicationListener<EatEvent> {

    public void onApplicationEvent(EatEvent event) {
        System.out.println("收到通知,可以去吃飯了");
    }
}

觸發事件:

public class TestDo implements ApplicationContextAware {

    private ApplicationContext applicationContext;

    public void doTest(){
        applicationContext.publishEvent(new EatEvent(this));
    }

    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }
}

以上代碼是實際spring項目中經常會用到的,利用spring的事件機制,可以解耦各個具體監聽器,在變化的需求中通過增減監聽器來減少具體實現的改動。

spring核心是管理bean,而對於這種事件機制,天然有了比較好的實現基礎,可以想象這些事件bean在初始化時已經被管理器加入到某個注冊表里了,然后事件觸發時,就要找容器觸發。

網上找的完整的相關類圖:

 

 

源碼實現部分:

首先我們在創建一個Listener的時候,需要把這個bean交給容器管理,由EventMulticaster來管理,從applicationContext.publishEvent(new EatEvent("”))為入口來看源碼。

public void publishEvent(ApplicationEvent event) {
   publishEvent(event, null);
}
protected void publishEvent(Object event, ResolvableType eventType) {
   Assert.notNull(event, "Event must not be null");
   if (logger.isTraceEnabled()) {
      logger.trace("Publishing event in " + getDisplayName() + ": " + event);
   }

   // Decorate event as an ApplicationEvent if necessary
   ApplicationEvent applicationEvent;
   if (event instanceof ApplicationEvent) {
      applicationEvent = (ApplicationEvent) event;
   }
   else {
      applicationEvent = new PayloadApplicationEvent<Object>(this, event);
      if (eventType == null) {
         eventType = ResolvableType.forClassWithGenerics(PayloadApplicationEvent.class, event.getClass());
      }
   }

   // Multicast right now if possible - or lazily once the multicaster is initialized
   if (this.earlyApplicationEvents != null) {
      this.earlyApplicationEvents.add(applicationEvent);
   }
   else {
      // 獲取ApplicationEventMulticaster
      getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType);
   }

   // Publish event via parent context as well...
   if (this.parent != null) {
      if (this.parent instanceof AbstractApplicationContext) {
         ((AbstractApplicationContext) this.parent).publishEvent(event, eventType);
      }
      else {
         this.parent.publishEvent(event);
      }
   }
}

getApplicationEventMulticaster拿預備好的事件廣播器,可以使用自己實現的事件廣播器,初始化是在AbstractApplicationContext.refresh方法觸發initApplicationEventMulticaster():

protected void initApplicationEventMulticaster() {
   ConfigurableListableBeanFactory beanFactory = getBeanFactory();
   // 取applicationEventMulticaster名的bean,如果沒有,就用框架的SimpleApplicationEventMulticaster,也就是說我們可以自定義一個bean來擴展
   if (beanFactory.containsLocalBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME)) {
      this.applicationEventMulticaster =
            beanFactory.getBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, ApplicationEventMulticaster.class);
      if (logger.isDebugEnabled()) {
         logger.debug("Using ApplicationEventMulticaster [" + this.applicationEventMulticaster + "]");
      }
   }
   else {
      this.applicationEventMulticaster = new SimpleApplicationEventMulticaster(beanFactory);
      beanFactory.registerSingleton(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, this.applicationEventMulticaster);
      if (logger.isDebugEnabled()) {
         logger.debug("Unable to locate ApplicationEventMulticaster with name '" +
               APPLICATION_EVENT_MULTICASTER_BEAN_NAME +
               "': using default [" + this.applicationEventMulticaster + "]");
      }
   }
}

SimpleApplicationEventMulticaster的multicastEvent(applicationEvent, eventType);方法:

public void multicastEvent(final ApplicationEvent event, ResolvableType eventType) {
   ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
   for (final ApplicationListener<?> listener : getApplicationListeners(event, type)) {
      Executor executor = getTaskExecutor();
      if (executor != null) {
         executor.execute(new Runnable() {
            @Override
            public void run() {
               invokeListener(listener, event);
            }
         });
      }
      else {
         invokeListener(listener, event);
      }
   }
}

getApplicationListeners方法來獲取對應的監聽者:

protected Collection<ApplicationListener<?>> getApplicationListeners(
      ApplicationEvent event, ResolvableType eventType) {

   Object source = event.getSource();
   Class<?> sourceType = (source != null ? source.getClass() : null);
   ListenerCacheKey cacheKey = new ListenerCacheKey(eventType, sourceType);

   // Quick check for existing entry on ConcurrentHashMap...
   ListenerRetriever retriever = this.retrieverCache.get(cacheKey);
   if (retriever != null) {
      return retriever.getApplicationListeners();
   }

   if (this.beanClassLoader == null ||
         (ClassUtils.isCacheSafe(event.getClass(), this.beanClassLoader) &&
               (sourceType == null || ClassUtils.isCacheSafe(sourceType, this.beanClassLoader)))) {
      // Fully synchronized building and caching of a ListenerRetriever
      synchronized (this.retrievalMutex) {
         retriever = this.retrieverCache.get(cacheKey);
         if (retriever != null) {
            return retriever.getApplicationListeners();
         }
         retriever = new ListenerRetriever(true);
        // 獲取監聽者
         Collection<ApplicationListener<?>> listeners =
               retrieveApplicationListeners(eventType, sourceType, retriever);
        // 進緩存
         this.retrieverCache.put(cacheKey, retriever);
         return listeners;
      }
   }
   else {
      // No ListenerRetriever caching -> no synchronization necessary
      return retrieveApplicationListeners(eventType, sourceType, null);
   }
}

retrieveApplicationListeners需要從容器中過濾出對應的監聽者的bean:

private Collection<ApplicationListener<?>> retrieveApplicationListeners(
      ResolvableType eventType, Class<?> sourceType, ListenerRetriever retriever) {

   LinkedList<ApplicationListener<?>> allListeners = new LinkedList<ApplicationListener<?>>();
   Set<ApplicationListener<?>> listeners;
   Set<String> listenerBeans;
   synchronized (this.retrievalMutex) {
      listeners = new LinkedHashSet<ApplicationListener<?>>(this.defaultRetriever.applicationListeners);
      listenerBeans = new LinkedHashSet<String>(this.defaultRetriever.applicationListenerBeans);
   }
    // 遍歷全部監聽者,過濾出匹配的
   for (ApplicationListener<?> listener : listeners) {
      if (supportsEvent(listener, eventType, sourceType)) {
         if (retriever != null) {
            retriever.applicationListeners.add(listener);
         }
         allListeners.add(listener);
      }
   }
   if (!listenerBeans.isEmpty()) {
      BeanFactory beanFactory = getBeanFactory();
      for (String listenerBeanName : listenerBeans) {
         try {
            Class<?> listenerType = beanFactory.getType(listenerBeanName);
            if (listenerType == null || supportsEvent(listenerType, eventType)) {
                // 就是這行代碼從容器中獲取
               ApplicationListener<?> listener =
                     beanFactory.getBean(listenerBeanName, ApplicationListener.class);
               if (!allListeners.contains(listener) && supportsEvent(listener, eventType, sourceType)) {
                  if (retriever != null) {
                     retriever.applicationListenerBeans.add(listenerBeanName);
                  }
                  allListeners.add(listener);
               }
            }
         }
         catch (NoSuchBeanDefinitionException ex) {
            // Singleton listener instance (without backing bean definition) disappeared -
            // probably in the middle of the destruction phase
         }
      }
   }
   AnnotationAwareOrderComparator.sort(allListeners);
   return allListeners;
}
事實上,容器預先將監聽者的bean全部注冊到了defaultRetriever.applicationListeners,每次出發publish時,來遍歷過濾出后緩存起來。這個注冊的操作也是在AbstractApplicationContext.refresh方法中的registerListeners();
最下面的AnnotationAwareOrderComparator.sort用來排序監聽者的執行順序。繼承Ordered即可。
這里我們可以回顧一下這個refresh方法的具體代碼。
以上已經基本看完了怎么講監聽器的獲取,再來看一下執行方法的觸發,回到SimpleApplicationEventMulticaster的multicastEvent(applicationEvent, eventType);
這里牽涉到同步執行或異步執行這些監聽器的問題,默認spring是同步執行的,那么在實際場景中我們會因為監聽者執行影響住流程,采用異步的方式,如果沒有閱讀過源碼,采取的方式可能會使用在publish的時候進行異步化。
但是這里注意到,如果是publish的時候進行異步操作,異步線程在執行多個監聽者時依然需要一個個去執行。
我們想到多個監聽器如果可以並行執行,是可以提高執行效率的,那么這里就有個擴展的入口了那就是通過繼承AbstractApplicationEventMulticaster自定義一個廣播器。
public class AsyncApplicationEventMulticaster extends AbstractApplicationEventMulticaster {
    private TaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();

    public void setTaskExecutor(TaskExecutor taskExecutor) {
        this.taskExecutor = (taskExecutor != null ? taskExecutor : new SimpleAsyncTaskExecutor());
    }

    protected TaskExecutor getTaskExecutor() {
        return this.taskExecutor;
    }

    @SuppressWarnings("unchecked")
    public void multicastEvent(final ApplicationEvent event) {
        for (Iterator<ApplicationListener> it = getApplicationListeners().iterator(); it.hasNext();) {
            final ApplicationListener listener =  it.next();
            getTaskExecutor().execute(new Runnable() {
                public void run() {
                    listener.onApplicationEvent(event);
                }
            });
        }
    }
}

實現的時候也可以通過繼承SimpleApplicationEventMulticaster的方式來完成,例子如下:

public class AsyncApplicationEventMulticaster extends SimpleApplicationEventMulticaster {  
    private TaskExecutor taskExecutor = new TaskExecutor() {  
        ExecutorService exeserv = Executors.newCachedThreadPool();  
        public void execute(Runnable task) {  
            exeserv.execute(task);  
        }  
    };  
  
    protected TaskExecutor getTaskExecutor() {  
        return this.taskExecutor;  
    }  
} 

上面提到的擴展點就是自己定義一個id=applicationEventMulticaster的bean,就可以自定義廣播器了。

<bean id="applicationEventMulticaster" class="com.x.y.z.AsyncApplicationEventMulticaster" />

再補充一個,我們看到這個applicationEventMulticaster bean的意味着spring容器中定義的所有監聽器都會被自定義的廣播器來廣播,單純實現異步並不是好的實現,如果有不能異步執行的呢,所以在自定義的廣播器里的實現代碼有必要配合監聽器的信息進行一些篩選的工作。

invokeListener來執行onApplicationEvent方法:

protected void invokeListener(ApplicationListener listener, ApplicationEvent event) {
        ErrorHandler errorHandler = getErrorHandler();
        if (errorHandler != null) {
            try {
                listener.onApplicationEvent(event);
            }
            catch (Throwable err) {
                errorHandler.handleError(err);
            }
        }
        else {
            listener.onApplicationEvent(event);
        }
    }

到這里,就執行到了onApplicationEvent方法。

另外,回到最前面的例子中,注意EatEvent中那個source屬性,代表來源的意思,再調用publish方法時將this傳入,那么在篩選監聽者的時候,就可以判斷是哪個來源的bean發起的通知,再進行一次篩選是否執行的邏輯,如此就是監聽者可以過濾事件源了。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM