從spring源碼汲取營養:模仿spring事件發布機制,解耦業務代碼


前言

最近在項目中做了一項優化,對業務代碼進行解耦。我們部門做的是警用系統,通俗的說,可理解為110報警。一條警情,會先后經過接警員、處警調度員、一線警員,警情是需要記錄每一步的日志,是要可追溯的,比如報警人張小三在2019-12-02 00:02:01時間報警,接警員A在1分鍾后,將該警情記錄完成,並分派給處警調度員B,調度員B在5分鍾后,分派給一線警員C,C趕到現場后,花了1個小時處理完成。

這中間,每一個接口,需要做的事情,可能就包括了:警情日志記錄;警員當前任務數統計,包括待處置的任務和已經處置完成的任務;我們其實還有一個操作,就是發mq,去通知其他相關人,比如接警員A接警完成后,要發mq通知其主管。

以前的代碼可能是這樣的:

## 接口1里, 接收警情service里完成以下操作
void 接收警情(xxxReqVo reqVo){
  1:寫庫
  2:記錄警情跟蹤日志
  3:增加當前接警員的接警數
  4:發mq通知其他相關人
}

##接口2里,分派警情的service里完成以下操作
void 分派警情(xxxReqVo reqVo){
  1:寫庫
  2:記錄警情跟蹤日志
  3:增加當前處警調度警員的處警數
  4:發mq通知其他相關人
}

這樣的問題是什么呢?

  1. 在每一個相關接口里,都要“顯式”調用:記錄跟蹤日志的相關方法、統計相關的方法、發mq相關的方法;但凡有一個地方忘記了,都會導致問題,比如統計數量不准,mq忘發,跟蹤日志遺漏等。
  2. 業務邏輯和這類通用業務揉在一起,假設下次又需要給報警人發個短信,豈不是又得去改核心代碼?這不符合我們“對修改關閉,對擴展開放”的開閉原則啊;假設腦殘的產品經理,這次說要給報警人發短信,過兩天又不要了,難道每個接口,挨個挨個改嗎,想想都想打死產品經理,但是這個又犯法,還是想想其他辦法?

這個問題,我們可以用類似mq的方法來解決,即,發送消息,各個消費者去消費。一般,mq的方式適用於微服務之間,而我們這里,將使用事件-發布機制來解決這個問題。
源碼地址(直接dubug跟一下,很簡單,比看文章來得快):
https://gitee.com/ckl111/spring-event-publish-demo

先說說ApplicationListener

spring boot之前的spring 時代,想必一些同學用過org.springframework.context.ApplicationListener,正好我手里有一個老項目,也用到了這個東西,我就拿這個舉個例子:

在我們的項目中,需要在啟動后,初始化一些東西,比如預熱緩存,最早的代碼呢,可能是大家各自實現org.springframework.beans.factory.InitializingBean,但是這樣呢,初始化代碼散落在各個service中;還有一些直接使用@PostContruct注解,然后在對應方法里去完成一些初始化操作。但是總體來說,這些方式,在spring的啟動過程中,被調用的時機比較靠前,有些候某些bean可能還沒初始化完成,而導致一些奇怪的問題。

所以,我們后來統一去掉了這些初始化代碼,全部采用以下機制來實現:

import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;

@Service
public class InitRunner implements ApplicationListener<ContextRefreshedEvent> {

    @Autowired
    private InitService initService;

    @Override
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        //root application context,因為是web項目,
        if (contextRefreshedEvent.getApplicationContext().getParent() == null) {
              initService.init();
        }
 }

在這個類中,我們實現了org.springframework.context.ApplicationListener<ContextRefreshedEvent> ,這個 listener的定義如下:

public interface ApplicationListener<E extends ApplicationEvent> extends EventListener {

	/**
	 * Handle an application event.
	 * @param event the event to respond to
	 */
	void onApplicationEvent(E event);

}

接口 EventListener 是 jdk 的一個 marker interface:

package java.util;

/**

- A tagging interface that all event listener interfaces must extend.
- @since JDK1.1
  */
  public interface EventListener {
  }
	

我們在實現listener時,指定了本listener感興趣的事件:ContextRefreshedEvent,這個事件的類繼承關系如下:

那么,這個事件是什么意思呢?

/**
 * Event raised when an {@code ApplicationContext} gets initialized or refreshed.
 *
 * @author Juergen Hoeller
 * @since 04.03.2003
 * @see ContextClosedEvent
 */
@SuppressWarnings("serial")
public class ContextRefreshedEvent extends ApplicationContextEvent {

	/**
	 * Create a new ContextRefreshedEvent.
	 * @param source the {@code ApplicationContext} that has been initialized
	 * or refreshed (must not be {@code null})
	 */
	public ContextRefreshedEvent(ApplicationContext source) {
		super(source);
	}

}

注釋說:Event raised when an {@code ApplicationContext} gets initialized or refreshed.,那么意思就是,該事件,是在上下文初始化完成后被發布。

這樣的話,就能保證,在我們listener監聽到這個事件的時候,整個應用上下文已經可以使用了。

一覽Spring事件監聽機制

我們再通過debug,來看看其真實的調用時機:

上圖紅框處,對spring上下文進行refresh,refresh就是spring 最核心的部分了,基本上,看懂了這個函數,就懂了一半:

// Prepare this context for refreshing.
prepareRefresh();

// Tell the subclass to refresh the internal bean factory.
ConfigurableListableBeanFactory beanFactory = obtainFreshBeanFactory();

// Prepare the bean factory for use in this context. 
// 實例化beanFactory
prepareBeanFactory(beanFactory);

try {
  // Allows post-processing of the bean factory in context subclasses.
  // 對beanFactory進行處理
  postProcessBeanFactory(beanFactory);

  // Invoke factory processors registered as beans in the context.
  // BeanFactoryPostProcessor開始作用的地方,這里會調用所有的beanFactory后置處理器
  invokeBeanFactoryPostProcessors(beanFactory);

  // Register bean processors that intercept bean creation.
  // 注冊 bean的后置處理器到beanFactory,注意,截止目前,還沒開始實例化bean(除了少數幾個內部bean)
  registerBeanPostProcessors(beanFactory);

  // Initialize message source for this context. 
  // 注冊國際化相關bean
  initMessageSource();

  // Initialize event multicaster for this context.
  // 注冊事件發布器,這個和本文主題大有關系
  initApplicationEventMulticaster();

  // Initialize other special beans in specific context subclasses.
  //注意上面這行注釋,這個類是交給子類覆蓋的,比如,在 org.springframework.web.context.support.AbstractRefreshableWebApplicationContext中,實例化了	org.springframework.ui.context.ThemeSource
  onRefresh();

  // Check for listener beans and register them.
  // 從spring容器上下文中,查找ApplicationListener類型的監聽器,添加到前兩步,初始化的事件發布器中
  registerListeners();

  // Instantiate all remaining (non-lazy-init) singletons.
  //注意:截止到目前為止,beanFactory里面基本還是空空如也,沒有bean,只有BeanDefinition,在這一步才會   //根據那些BeanDefinition來實例化那些:非lazy-init的bean
  finishBeanFactoryInitialization(beanFactory);

  // Last step: publish corresponding event.
  // 發布:容器完成初始化的事件
  finishRefresh();
}

上面基本都加了注釋,比較容易懂,需要重點關注的是:

  1. 事件發布器初始化
initApplicationEventMulticaster();

這一步會生成一個org.springframework.context.event.ApplicationEventMulticaster,存儲在org.springframework.context.support.AbstractApplicationContext#applicationEventMulticaster

該事件發布器的接口主要有(去除了無關方法):

    /**
	 * Add a listener to be notified of all events.
	 * @param listener the listener to add
	 */
	void addApplicationListener(ApplicationListener<?> listener);

	/**
	 * Multicast the given application event to appropriate listeners.
	 * <p>Consider using {@link #multicastEvent(ApplicationEvent, ResolvableType)}
	 * if possible as it provides a better support for generics-based events.
	 * @param event the event to multicast
	 */
	void multicastEvent(ApplicationEvent event);

從上面可以看出,該接口主要是維護監聽器ApplicationListener,以及進行事件發布。

  1. 注冊監聽器

    // 注冊listeners
    protected void registerListeners() {
      // Register statically specified listeners first.
      for (ApplicationListener<?> listener : getApplicationListeners()) {
        getApplicationEventMulticaster().addApplicationListener(listener);
      }
    
      // Do not initialize FactoryBeans here: We need to leave all regular beans
      // uninitialized to let post-processors apply to them!
      //這里的這句注釋也很魔性,哈哈,側面說明了,截至目前,beanFactory都是沒有bean實例存在的,bean還沒   //有實例化
      String[] listenerBeanNames = getBeanNamesForType(ApplicationListener.class, true, false);
      for (String listenerBeanName : listenerBeanNames) {
        getApplicationEventMulticaster().addApplicationListenerBean(listenerBeanName);
      }
     
    
  2. beanFactory初始化完成后,發布事件

    protected void finishRefresh() {
    
      // Publish the final event.
      // 發布上下文refresh完畢的事件,通知listener
      publishEvent(new ContextRefreshedEvent(this));
    }
    

    這里,publishEvent實現如下:

    	protected void publishEvent(Object event, ResolvableType eventType) {
    		// Decorate event as an ApplicationEvent if necessary
    		ApplicationEvent applicationEvent;
    		if (event instanceof ApplicationEvent) {
    			applicationEvent = (ApplicationEvent) event;
    		}
    
    		getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType);
    	}
    

方案1:參考spring,實現自己的事件監聽機制,解耦業務代碼

項目源碼地址:https://gitee.com/ckl111/spring-event-publish-demo.git

項目結構如下:

  1. 定義與實現事件發布器

    package com.ceiec.base.event;
    
    import org.springframework.context.event.ApplicationEventMulticaster;
    
    /**
     * desc:
     * 參考spring的設計
     * {@link ApplicationEventMulticaster}
     * @author : ckl
     * creat_date: 2019/11/16 0016
     * creat_time: 10:40
     **/
    public interface ICommonApplicationEventMulticaster {
        /**
         * Add a listener to be notified of all events.
         * @param listener the listener to add
         */
        void addApplicationListener(ICommonApplicationEventListener<?> listener);
    
    
        /**
         * Multicast the given application event to appropriate listeners.
         * @param event the event to multicast
         */
        void multicastEvent(CommonApplicationEvent event);
    
    
    }
    
    
    package com.ceiec.base.event;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.context.event.SimpleApplicationEventMulticaster;
    import org.springframework.stereotype.Component;
    
    import java.util.LinkedHashSet;
    import java.util.Set;
    
    /**
     * desc:
     * 參考spring
     * {@link SimpleApplicationEventMulticaster}
     *
     * @author : ckl
     * creat_date: 2019/11/16 0016
     * creat_time: 10:40
     **/
    @Slf4j
    @Component
    public class CommonApplicationEventMulticaster implements ICommonApplicationEventMulticaster {
        public final Set<ICommonApplicationEventListener<?>> applicationListeners = new LinkedHashSet<>();
    
        @Override
        public void addApplicationListener(ICommonApplicationEventListener<?> listener) {
            applicationListeners.add(listener);
        }
    
        @Override
        public void removeApplicationListener(ICommonApplicationEventListener<?> listener) {
            applicationListeners.remove(listener);
        }
    
        @Override
        public void removeAllListeners() {
            applicationListeners.clear();
        }
    
        @Override
        @SuppressWarnings({"rawtypes", "unchecked"})
        public void multicastEvent(CommonApplicationEvent event) {
            try {
                for (ICommonApplicationEventListener applicationListener : applicationListeners) {
                    //判斷listener是否支持處理該事件,如果支持,則丟給listener處理
                    if (applicationListener.supportsEventType(event)) {
                        applicationListener.onApplicationEvent(event);
                    }
                }
            } catch (Exception e) {
                log.error("{}",e);
            }
        }
    }
    
    
  2. 定義listener

    package com.ceiec.base.event;
    
    import org.springframework.context.ApplicationListener;
    
    import java.util.EventListener;
    
    /**
     * desc:
     * 參考spring
     * {@link ApplicationListener}
     * @author : ckl
     * creat_date: 2019/11/16 0016
     * creat_time: 10:45
     **/
    public interface ICommonApplicationEventListener<E extends CommonApplicationEvent> extends EventListener {
    
    
        boolean supportsEventType(E event );
    
    
        /**
         * Handle an application event.
         * @param event the event to respond to
         */
        void onApplicationEvent(E event);
    }
    
    
  3. 定義事件類

    package com.ceiec.base.event;
    
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.experimental.Accessors;
    
    import java.util.EventObject;
    
    /**
     * desc:
     * 參考spring的設計
     * {@link org.springframework.context.ApplicationEvent}
     **/
    @Data
    @AllArgsConstructor
    @Accessors(chain = true)
    public  class  CommonApplicationEvent<T>{
        /**
         * 事件類型
         */
        private  IEventType iEventType;
    
        /**
         * 事件攜帶的數據
         */
        private T data;
    
    }
    
    
    
  4. listener的樣例實現,下面的實現,用於警情的跟蹤日志記錄

    package com.ceiec.base.listener;
    
    import com.ceiec.base.applicationevent.SystemEventType;
    import com.ceiec.base.event.CommonApplicationEvent;
    import com.ceiec.base.event.ICommonApplicationEventListener;
    import com.ceiec.base.eventmsg.*;
    import com.ceiec.base.service.IIncidentTraceService;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    /**
     * desc:
     * 接警統計listener
     * @author : ckl
     * creat_date: 2019/11/16 0016
     * creat_time: 9:56
     **/
    @Component
    @Slf4j
    public class IncidentTraceListener implements ICommonApplicationEventListener{
        @Autowired
        private IIncidentTraceService iIncidentTraceService;
    
        @Override
        public boolean supportsEventType(CommonApplicationEvent event) {
            return true;
        }
    
        @Override
        public void onApplicationEvent(CommonApplicationEvent event) {
            log.info("{}",event);
            Object data = event.getData();
            if (event.getIEventType() == SystemEventType.FINISH_INCIDENT_APPEAL) {
                FinishIncidentDisposalEventMsg msg = (FinishIncidentDisposalEventMsg) data;
                iIncidentTraceService.finishIncidentDisposal(msg);
            }
        }
    }
    
    
  5. 啟動程序時,注冊listener到事件發布器

    package com.ceiec.base.init;
    
    import com.ceiec.base.event.CommonApplicationEventMulticaster;
    import com.ceiec.base.event.ICommonApplicationEventListener;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.BeansException;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.ApplicationContextAware;
    import org.springframework.stereotype.Component;
    
    import java.util.Collection;
    import java.util.Map;
    
    /**
     * desc:
     *
     * @author : ckl
     * creat_date: 2019/11/11 0011
     * creat_time: 15:46
     **/
    @Component
    @Slf4j
    public class InitRunner implements CommandLineRunner,ApplicationContextAware {
    
    
        private ApplicationContext applicationContext;
    
        @Autowired
        private CommonApplicationEventMulticaster commonApplicationEventMulticaster;
    
        @Override
        public void run(String... args) throws Exception {
            Map<String, ICommonApplicationEventListener> map = applicationContext.getBeansOfType(ICommonApplicationEventListener.class);
            Collection<ICommonApplicationEventListener> listeners = map.values();
            for (ICommonApplicationEventListener listener : listeners) {
                /**
                 * 注冊事件listener到事件發布器
                 */
                log.info("register listener:{}",listener);
                commonApplicationEventMulticaster.addApplicationListener(listener);
            }
    
        }
    
        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            this.applicationContext = applicationContext;
        }
    }
    
    
  6. 定義endpoint,在service中進行事件發布

    controller:

    @Autowired
    private IIncidentService iIncidentService;
    
    @RequestMapping("/test.do")
    public String finishIncident() {
      iIncidentService.finishIncident();
      return "success";
    }
    

    service:

    @Slf4j
    @Service
    public class IIncidentServiceImpl implements IIncidentService {
    
        @Autowired
        private CommonApplicationEventMulticaster commonApplicationEventMulticaster;
    
    
        @Override
        public void finishIncident() {
            FinishIncidentDisposalEventMsg msg = new FinishIncidentDisposalEventMsg();
            msg.setIncidentInformationId(1111L);
            msg.setDesc("處置完成");
    
            CommonApplicationEvent event = new CommonApplicationEvent(SystemEventType.FINISH_INCIDENT_APPEAL,msg);
            commonApplicationEventMulticaster.multicastEvent(event);
        }
    }
    
  7. 效果展示

    啟動時,注冊listener:

    2019-12-03 16:49:47.477  INFO 493432 --- [           main] com.ceiec.base.BootStrap                 : Started BootStrap in 1.436 seconds (JVM running for 2.22)
    2019-12-03 16:49:47.478  INFO 493432 --- [           main] com.ceiec.base.init.InitRunner           : register listener:com.ceiec.base.listener.IncidentStatisticsListener@c6b2dd9
    2019-12-03 16:49:47.479  INFO 493432 --- [           main] com.ceiec.base.init.InitRunner           : register listener:com.ceiec.base.listener.IncidentTraceListener@3f985a86
    2019-12-03 16:49:47.479  INFO 493432 --- [           main] com.ceiec.base.init.InitRunner           : register listener:com.ceiec.base.listener.MqListener@57a2ed35
    

    瀏覽器中,請求http://localhost:8081/test.do,日志如下:

方案2:直接使用spring內置的事件發布器,解耦業務代碼

源碼:https://gitee.com/ckl111/spring-event-publish-demo/tree/master/spring-event-use-builtin-multicaster

這部分,比上面的方案相比,少了很多東西,只包含如下部分:

總的來說,listener直接繼承spring的ApplicationListener,事件發布器直接使用spring的org.springframework.context.ApplicationEventPublisher

核心代碼:

package com.ceiec.base.service.impl;

import com.ceiec.base.applicationevent.SystemEventType;
import com.ceiec.base.event.CommonApplicationEvent;
import com.ceiec.base.eventmsg.FinishIncidentDisposalEventMsg;
import com.ceiec.base.service.IIncidentService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Service;

/**
 * desc:
 * 發布事件的業務代碼示例
 * @author : ckl
 * creat_date: 2019/12/2 0002
 * creat_time: 14:27
 **/
@Slf4j
@Service
public class IIncidentServiceImpl implements IIncidentService {

    @Autowired
    private ApplicationEventPublisher applicationEventPublisher;


    @Override
    public void finishIncident() {
        FinishIncidentDisposalEventMsg msg = new FinishIncidentDisposalEventMsg();
        msg.setIncidentInformationId(1111L);
        msg.setDesc("處置完成");

        CommonApplicationEvent event = new CommonApplicationEvent(SystemEventType.FINISH_INCIDENT_APPEAL,msg);
        applicationEventPublisher.publishEvent(event);
    }
}

package com.ceiec.base.listener;

import com.ceiec.base.applicationevent.SystemEventType;
import com.ceiec.base.event.CommonApplicationEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;

/**
 * desc:
 * 這里,直接繼承 spring 的listener
 * @author : ckl
 * creat_date: 2019/11/16 0016
 * creat_time: 9:56
 **/
@Component
@Slf4j
public class IncidentStatisticsListener implements ApplicationListener<CommonApplicationEvent> {



    @Override
    public void onApplicationEvent(CommonApplicationEvent event) {
        log.info("receive event:{}",event);
    }
}

總結

以上兩種都可以用,一個是自己仿的,定制性強一點;一個直接用spring的。大家自由選擇即可。

通過這樣的方式,我們的業務代碼,可以做到解耦,大體和mq其實是類似的。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM