spring中的事件監聽機制


一、前言

事件監聽機制也是設計模式中觀察者模式的一種實現。在spring中主要有實現ApplicationListener 接口和@EventListener 注解兩種方式實現。

實現事件監聽機制需要以下三個角色:

1、事件(event)可以封裝和傳遞監聽器中要處理的參數,如對象或字符串,並作為監聽器中監聽的目標。
2、監聽器(listener)具體根據事件發生的業務處理模塊,這里可以接收處理事件中封裝的對象或字符串。
3、事件發布者(publisher)事件發生的觸發者。

二、ApplicationListener 接口

ApplicationListener 接口的定義如下:

public interface ApplicationListener<E extends ApplicationEvent> extends EventListener {
 
/**
* Handle an application event.
* @param event the event to respond to
*/
void onApplicationEvent(E event);
}

  它是一個泛型接口,泛型的類型必須是 ApplicationEvent 及其子類,只要實現了這個接口,那么當容器有相應的事件觸發時,就能觸發 onApplicationEvent 方法。ApplicationEvent 類的子類有很多,Spring 框架自帶的如下幾個。

使用方法很簡單,就是實現一個 ApplicationListener 接口,並且將加入到容器中就行。

@Component
public class MyApplicationListener implements ApplicationListener<ApplicationStartedEvent> {

//    @Override
//    public void onApplicationEvent(ApplicationEvent event) {
//        System.out.println("事件觸發:"+event.getClass().getName());
//    }
    @Override
    public void onApplicationEvent(ApplicationStartedEvent event) {
        System.out.println("事件觸發:"+event.getClass().getName());
    }

}

可以看到控制台輸出:這樣就觸發了spring默認的一些事件。

事件觸發:org.springframework.boot.context.event.ApplicationStartedEvent
事件觸發:org.springframework.boot.context.event.ApplicationReadyEvent

自定義事件監聽

定義事件
首先,我們需要定義一個時間(MyTestEvent),需要繼承Spring的ApplicationEvent

public class MyTestEvent extends ApplicationEvent {

    private static final long serialVersionUID = 1L;

    private String msg ;

    public MyTestEvent(Object source,String msg) {
        super(source);
        this.msg = msg;
    }

    public String getMsg() {
        return msg;
    }

    public void setMsg(String msg) {
        this.msg = msg;
    }
}

定義監聽器
需要定義一下監聽器,自己定義的監聽器需要實現ApplicationListener,同時泛型參數要加上自己要監聽的事件Class名,在重寫的方法onApplicationEvent中,添加自己的業務處理:

@Component
public class MyNoAnnotationListener implements ApplicationListener<MyTestEvent> {

    @Override
    public void onApplicationEvent(MyTestEvent event) {
        System.out.println("非注解監聽器:" + event.getMsg());
    }

}

事件發布
有了事件,有了事件監聽者,那么什么時候觸發這個事件呢?每次想讓監聽器收到事件通知的時候,就可以調用一下事件發布的操作。首先在類里自動注入了ApplicationEventPublisher,這個也就是我們的ApplicationContext,它實現了這個接口。

@RestController
@RequestMapping("testEventController")
public class MyTestEventController {

    @Autowired
    private ApplicationEventPublisher applicationEventPublisher;

    @RequestMapping(value = "/testPublishEvent1" )
    public void testPublishEvent(){
        applicationEventPublisher.publishEvent(new MyTestEvent(this, "我來了"));
    }

}

輸出結果如下圖所示:

非注解監聽器:我來了

三、@EventListener 注解

從Spring 4.2以后,事件處理不用實現ApplicationListener 的 onApplicationEvent方法了,使用注解@EventListener可以自動關聯相關的ApplicationListener。

@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface EventListener {
    @AliasFor("classes")
    Class<?>[] value() default {};  //監聽的類
 
    @AliasFor("value")
    Class<?>[] classes() default {};
 
    String condition() default "";
}

簡單使用
除了通過實現接口,還可以使用@EventListener 注解,實現對任意的方法都能監聽事件。
在任意方法上標注@EventListener 注解,指定 classes,即需要處理的事件類型,一般就是 ApplicationEven 及其子類,可以設置多項。

@Compoment
public class CustomListener {

    @EventListener(classes = {SpringApplicationEvent.class})
    public void listen(SpringApplicationEvent event) {
        System.out.println("注解事件觸發:" + event.getClass().getName());
    }

}

啟動項目

可以看到控制台和之前的輸出是一樣的:

注解事件觸發:org.springframework.boot.context.event.ApplicationStartedEvent
注解事件觸發:org.springframework.boot.context.event.ApplicationReadyEvent

使用注解的好處是不用每次都去實現ApplicationListener,可以在一個class中定義多個方法,用@EventListener來做方法級別的注解。
和上面類似,事件以及事件發布不需要改變,只要這樣定義監聽器即可。

此時,就可以有一個發布,兩個監聽器監聽到發布的消息了,一個是注解方式,一個是非注解方式
結果:

注解監聽器----1:我來了
非注解監聽器:我來了

原理

@Component
public class MyAnnotationListener {

    @EventListener
    public void listener1(MyTestEvent event) {
        System.out.println("注解監聽器1:" + event.getMsg());
    }
}

其實上面添加@EventListener注解的方法被包裝成了ApplicationListener對象,上面的類似於下面這種寫法,這個應該比較好理解。

@Component
public class MyAnnotationListener implements ApplicationListener<MyTestEvent> {
    
    @Override
    public void onApplicationEvent(MyTestEvent event) {
         System.out.println("注解監聽器1:" + event.getMsg());
    }
}

那么Spring是什么時候做這件事的呢?

查看SpringBoot的源碼,找到下面的代碼,因為我是Tomcat環境,這里創建的ApplicationContextorg.springframework.bootweb.servlet.context.AnnotationConfigServletWebServerApplicationContext

protected ConfigurableApplicationContext createApplicationContext() {
        Class<?> contextClass = this.applicationContextClass;
        if (contextClass == null) {
            try {
                switch (this.webApplicationType) {
                case SERVLET:
                    contextClass = Class.forName(DEFAULT_SERVLET_WEB_CONTEXT_CLASS);
                    break;
                case REACTIVE:
                    contextClass = Class.forName(DEFAULT_REACTIVE_WEB_CONTEXT_CLASS);
                    break;
                default:
                    contextClass = Class.forName(DEFAULT_CONTEXT_CLASS);
                }
            }
            catch (ClassNotFoundException ex) {
                throw new IllegalStateException(
                        "Unable create a default ApplicationContext, " + "please specify an ApplicationContextClass",
                        ex);
            }
        }
        return (ConfigurableApplicationContext) BeanUtils.instantiateClass(contextClass);
    }

他的構造方法如下:

public AnnotationConfigServletWebServerApplicationContext() {
        this.reader = new AnnotatedBeanDefinitionReader(this);
        this.scanner = new ClassPathBeanDefinitionScanner(this);
    }

進到AnnotatedBeanDefinitionReader里面

public AnnotatedBeanDefinitionReader(BeanDefinitionRegistry registry, Environment environment) {
        Assert.notNull(registry, "BeanDefinitionRegistry must not be null");
        Assert.notNull(environment, "Environment must not be null");
        this.registry = registry;
        this.conditionEvaluator = new ConditionEvaluator(registry, environment, null);
        AnnotationConfigUtils.registerAnnotationConfigProcessors(this.registry);
    }

再進到AnnotationConfigUtils的方法里面,省略了一部分代碼,可以看到他注冊了一個EventListenerMethodProcessor類到工廠了。這是一個BeanFactory的后置處理器。

public static Set<BeanDefinitionHolder> registerAnnotationConfigProcessors(
            BeanDefinitionRegistry registry, @Nullable Object source) {

        DefaultListableBeanFactory beanFactory = unwrapDefaultListableBeanFactory(registry);
    ......
    .....
    ......    

    if (!registry.containsBeanDefinition(EVENT_LISTENER_PROCESSOR_BEAN_NAME)) {
            RootBeanDefinition def = new RootBeanDefinition(EventListenerMethodProcessor.class);
            def.setSource(source);
            beanDefs.add(registerPostProcessor(registry, def, EVENT_LISTENER_PROCESSOR_BEAN_NAME));
        }
    
    ......
    ......

        return beanDefs;
    }

查看這個BeanFactory的后置處理器EventListenerMethodProcessor,下面方法,他會遍歷所有bean,找到其中帶有@EventListener的方法,將它包裝成ApplicationListenerMethodAdapter,注冊到工廠里,這樣就成功注冊到Spring的監聽系統里了。

    @Override
    public void afterSingletonsInstantiated() {
        ConfigurableListableBeanFactory beanFactory = this.beanFactory;
        Assert.state(this.beanFactory != null, "No ConfigurableListableBeanFactory set");
        String[] beanNames = beanFactory.getBeanNamesForType(Object.class);
        for (String beanName : beanNames) {
            if (!ScopedProxyUtils.isScopedTarget(beanName)) {
                Class<?> type = null;
                try {
                    type = AutoProxyUtils.determineTargetClass(beanFactory, beanName);
                }
                catch (Throwable ex) {
                    // An unresolvable bean type, probably from a lazy bean - let's ignore it.
                    if (logger.isDebugEnabled()) {
                        logger.debug("Could not resolve target class for bean with name '" + beanName + "'", ex);
                    }
                }
                if (type != null) {
                    if (ScopedObject.class.isAssignableFrom(type)) {
                        try {
                            Class<?> targetClass = AutoProxyUtils.determineTargetClass(
                                    beanFactory, ScopedProxyUtils.getTargetBeanName(beanName));
                            if (targetClass != null) {
                                type = targetClass;
                            }
                        }
                        catch (Throwable ex) {
                            // An invalid scoped proxy arrangement - let's ignore it.
                            if (logger.isDebugEnabled()) {
                                logger.debug("Could not resolve target bean for scoped proxy '" + beanName + "'", ex);
                            }
                        }
                    }
                    try {
                        processBean(beanName, type);
                    }
                    catch (Throwable ex) {
                        throw new BeanInitializationException("Failed to process @EventListener " +
                                "annotation on bean with name '" + beanName + "'", ex);
                    }
                }
            }
        }
    }




private void processBean(final String beanName, final Class<?> targetType) {
        if (!this.nonAnnotatedClasses.contains(targetType) &&
                !targetType.getName().startsWith("java") &&
                !isSpringContainerClass(targetType)) {

            Map<Method, EventListener> annotatedMethods = null;
            try {
                annotatedMethods = MethodIntrospector.selectMethods(targetType,
                        (MethodIntrospector.MetadataLookup<EventListener>) method ->
                                AnnotatedElementUtils.findMergedAnnotation(method, EventListener.class));
            }
            catch (Throwable ex) {
                // An unresolvable type in a method signature, probably from a lazy bean - let's ignore it.
                if (logger.isDebugEnabled()) {
                    logger.debug("Could not resolve methods for bean with name '" + beanName + "'", ex);
                }
            }

            if (CollectionUtils.isEmpty(annotatedMethods)) {
                this.nonAnnotatedClasses.add(targetType);
                if (logger.isTraceEnabled()) {
                    logger.trace("No @EventListener annotations found on bean class: " + targetType.getName());
                }
            }
            else {
                // Non-empty set of methods
                ConfigurableApplicationContext context = this.applicationContext;
                Assert.state(context != null, "No ApplicationContext set");
                List<EventListenerFactory> factories = this.eventListenerFactories;
                Assert.state(factories != null, "EventListenerFactory List not initialized");
                for (Method method : annotatedMethods.keySet()) {
                    for (EventListenerFactory factory : factories) {
                        if (factory.supportsMethod(method)) {
                            Method methodToUse = AopUtils.selectInvocableMethod(method, context.getType(beanName));
                            ApplicationListener<?> applicationListener =
                                    factory.createApplicationListener(beanName, targetType, methodToUse);
                            if (applicationListener instanceof ApplicationListenerMethodAdapter) {
                                ((ApplicationListenerMethodAdapter) applicationListener).init(context, this.evaluator);
                            }
                            context.addApplicationListener(applicationListener);
                            break;
                        }
                    }
                }
                if (logger.isDebugEnabled()) {
                    logger.debug(annotatedMethods.size() + " @EventListener methods processed on bean '" +
                            beanName + "': " + annotatedMethods);
                }
            }
        }
    }

由方法生成Listener的邏輯由EventListenerFactory完成的,這又分為兩種,一種是普通的@EventLintener 另一種是@TransactionalEventListener ,是由兩個工廠處理的。


上面介紹了@EventListener的原理,其實上面方法里還有一個@TransactionalEventListener注解,其實原理是一模一樣的,只是這個監聽者可以選擇在事務完成后才會被執行,事務執行失敗就不會被執行。

這兩個注解的邏輯是一模一樣的,並且@TransactionalEventListener本身就被標記有@EventListener

只是最后生成監聽器時所用的工廠不一樣而已。

總結:

  1. 同樣的事件能有多個監聽器 -- 經過測試是可以的
  2. 事件監聽操作和發布事件的操作是同步的嗎? -- 是的,所以如果有事務,監聽操作也在事務內。如果需要改為異步也可以手動使用多線程或者借用異步注解實現。
  3. EventListener這個注解其實可以接受參數來表示事件源的,有兩個參數classes和condition,顧明思議前者是表示哪一個事件類,后者是當滿足什么條件是會調用該方法,但其實都可以不用用到,直接在方法上寫參數xxEvent就行

摘自:https://blog.csdn.net/m0_49558851/article/details/118903432

定義event事件模型

  首先定義一個event事件模型。ComplaintDTO為事件發布者傳送給事件監聽者的一些信息,入投訴的訂單信息、客戶信息等。可根據自身需要進行調整,調整之后需要變動相應的get/set方法和構造方法。

import org.springframework.context.ApplicationEvent;

public class ComplaintEvent extends ApplicationEvent {

    private ComplaintDTO complaintDTO;

    public ComplaintEvent(Object source,ComplaintDTO complaintDTO) {
        super(source);
        this.complaintDTO = complaintDTO;
    }

    public ComplaintDTO getComplaintDTO() {
        return complaintDTO;
    }

    public void setComplaintDTO(ComplaintDTO complaintDTO) {
        this.complaintDTO = complaintDTO;
    }
}

發布事件

@Autowired
private ApplicationEventPublisher applicationEventPublisher;

@Override
public ComplaintVO createComplaint(ComplaintDTO complaintDTO) {
    //TODO 你的業務代碼
    //發負一屏消息和短信
    try{
        ComplaintEvent complaintEvent = new ComplaintEvent(this,complaintDTO);
        applicationEventPublisher.publishEvent(complaintEvent);
    }catch (Exception e){
        log.info("投訴發短信或負一屏消息失敗:[{}]",e.getMessage());
    }
    vo = Transferor.entity(po, ComplaintVO.class);
    return vo;
}

事件監聽者

事件的監聽者實現ApplicationListener接口,重寫onApplicationEvent方法實現事件處理
這里需要注意的是ApplicationEvent默認並不是異步的,如果需要異步需要我們在方法上加上@Async注解,並在啟動類添加@EnableAsync開啟異步

@Log4j2
@Component
public class ComplaintEventListener implements ApplicationListener<ComplaintEvent> {
    @Async
    @Override
    public void onApplicationEvent(ComplaintEvent event) {
        log.info("訂單投訴發短信與負一屏消息:[{}]", JSON.toJSONString(event));
        //發短信
        try {
            sendSmsMsg(event);
        }catch (Exception e){
            log.info("訂單投訴發短信失敗:[{}]",e.getMessage());
        }
        //發負一屏消息
        try {
            sendHwPubMsg(event);
        }catch (Exception e){
            log.info("訂單投訴發負一屏消息失敗:[{}]",e.getMessage());
        }
    }
}

從Spring 4.2以后,事件處理不用實現ApplicationListener 的 onApplicationEvent方法了,使用注解@EventListener可以自動關聯相關的ApplicationListener。
所以上面的代碼其實可以改成成下面的樣子

@Log4j2
@Component
public class ComplaintEventListener{
    @Async
    @EventListener
    public void sendMsg(ComplaintEvent event) {
        log.info("訂單投訴發短信與負一屏消息:[{}]", JSON.toJSONString(event));
        //發短信
        try {
            sendSmsMsg(event);
        }catch (Exception e){
            log.info("訂單投訴發短信失敗:[{}]",e.getMessage());
        }
        //發負一屏消息
        try {
            sendHwPubMsg(event);
        }catch (Exception e){
            log.info("訂單投訴發負一屏消息失敗:[{}]",e.getMessage());
        }
    }
}

 

摘自:https://fangshixiang.blog.csdn.net/article/details/91897175

Spring事務監聽機制---使用@TransactionalEventListener處理數據庫事務提交成功后再執行操作

@TransactionalEventListener
  首先不得不說,從命名中就可以直接看出,它就是個EventListener
在Spring4.2+,有一種叫做@TransactionEventListener的方式,能夠 控制 在事務的時候Event事件的處理方式。

  我們知道,Spring的事件監聽機制(發布訂閱模型)實際上並不是異步的(默認情況下),而是同步的來將代碼進行解耦。而@TransactionEventListener仍是通過這種方式,只不過加入了回調的方式來解決,這樣就能夠在事務進行Commited,Rollback…等的時候才會去進行Event的處理,達到事務同步的目的

Demo演示

@Slf4j
@Service
public class HelloServiceImpl implements HelloService {

    @Autowired
    private JdbcTemplate jdbcTemplate;
    @Autowired
    private ApplicationEventPublisher applicationEventPublisher;

    @Transactional
    @Override
    public Object hello(Integer id) {
        // 向數據庫插入一條記錄
        String sql = "insert into user (id,name,age) values (" + id + ",'fsx',21)";
        jdbcTemplate.update(sql);

        // 發布一個自定義的事件~~~
        applicationEventPublisher.publishEvent(new MyAfterTransactionEvent("我是和事務相關的事件,請事務提交后執行我~~~", id));
        return "service hello";
    }

    @Slf4j
    @Component
    private static class MyTransactionListener {
        @Autowired
        private JdbcTemplate jdbcTemplate;

        @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
        private void onHelloEvent(HelloServiceImpl.MyAfterTransactionEvent event) {
            Object source = event.getSource();
            Integer id = event.getId();

            String query = "select count(1) from user where id = " + id;
            Integer count = jdbcTemplate.queryForObject(query, Integer.class);
            
            // 可以看到 這里的count是1  它肯定是在上面事務提交之后才會執行的
            log.info(source + ":" + count.toString()); //我是和事務相關的事件,請事務提交后執行我~~~:1
        }
    }

    // 定一個事件,繼承自ApplicationEvent 
    private static class MyAfterTransactionEvent extends ApplicationEvent {

        private Integer id;

        public MyAfterTransactionEvent(Object source, Integer id) {
            super(source);
            this.id = id;
        }

        public Integer getId() {
            return id;
        }
    }
}

首先確認,通過@TransactionalEventListener注解的方式,是完全可以處理這種事務問題的。
接下來先看看這個注解本身,有哪些屬性是我們可用、可控的:

// @since 4.2  顯然,注解的方式提供得還是挺晚的,而API的方式第一個版本就已經提供了
// 另外最重要的是,它頭上有一個注解:`@EventListener`  so  
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@EventListener //有類似於注解繼承的效果
public @interface TransactionalEventListener {
    // 這個注解取值有:BEFORE_COMMIT、AFTER_COMMIT、AFTER_ROLLBACK、AFTER_COMPLETION
    // 各個值都代表什么意思表達什么功能,非常清晰~
    // 需要注意的是:AFTER_COMMIT + AFTER_COMPLETION是可以同時生效的
    // AFTER_ROLLBACK + AFTER_COMPLETION是可以同時生效的
    TransactionPhase phase() default TransactionPhase.AFTER_COMMIT;

    // 若沒有事務的時候,對應的event是否已經執行  默認值為false表示  沒事務就不執行了
    boolean fallbackExecution() default false;

    // 這里巧妙的用到了@AliasFor的能力,放到了@EventListener身上
    // 注意:一般我建議都需要指定此值,否則默認可以處理所有類型的事件  范圍太廣了
    @AliasFor(annotation = EventListener.class, attribute = "classes")
    Class<?>[] value() default {};
    @AliasFor(annotation = EventListener.class, attribute = "classes")
    Class<?>[] classes() default {};
    
    String condition() default "";
}

可以看到它實際上相當於在@EventListener的基礎上擴展了兩個屬性,來對事務針對性的處理。
根據前面的Spring事件監聽機制的理論知識得知:它的注冊原理顯然也在EventListenerMethodProcessor中,只不過它使用的是TransactionalEventListenerFactory最終來生成一個Adapter適配器:

public class TransactionalEventListenerFactory implements EventListenerFactory, Ordered {
    private int order = 50; // 執行時機還是比較早的~~~(默認的工廠是最低優先級)
    
    // 顯然這個工廠只會生成標注有此注解的handler~~~
    @Override
    public boolean supportsMethod(Method method) {
        return AnnotatedElementUtils.hasAnnotation(method, TransactionalEventListener.class);
    }

    // 這里使用的是ApplicationListenerMethodTransactionalAdapter,而非ApplicationListenerMethodAdapter
    // 雖然ApplicationListenerMethodTransactionalAdapter是它的子類
    @Override
    public ApplicationListener<?> createApplicationListener(String beanName, Class<?> type, Method method) {
        return new ApplicationListenerMethodTransactionalAdapter(beanName, type, method);
    }
}

  通過這個工廠,會把每個標注有@TransactionalEventListener注解的方法最終都包裝成一個ApplicationListenerMethodTransactionalAdapter,它是一個ApplicationListener,最終注冊進事件發射器的容器里面。

ApplicationListenerMethodTransactionalAdapter

它是包裝@TransactionalEventListener的適配器,繼承自ApplicationListenerMethodAdapter~

// @since 4.2
class ApplicationListenerMethodTransactionalAdapter extends ApplicationListenerMethodAdapter {

    private final TransactionalEventListener annotation;

    // 構造函數
    public ApplicationListenerMethodTransactionalAdapter(String beanName, Class<?> targetClass, Method method) {
        // 這一步的初始化交給父類,做了很多事情   強烈建議看看上面推薦的事件/監聽的博文
        super(beanName, targetClass, method);

        // 自己個性化的:和事務相關
        TransactionalEventListener ann = AnnotatedElementUtils.findMergedAnnotation(method, TransactionalEventListener.class);
        if (ann == null) {
            throw new IllegalStateException("No TransactionalEventListener annotation found on method: " + method);
        }
        this.annotation = ann;
    }

    @Override
    public void onApplicationEvent(ApplicationEvent event) {
        // 若**存在事務**:毫無疑問 就注冊一個同步器進去~~
        if (TransactionSynchronizationManager.isSynchronizationActive()) {
            TransactionSynchronization transactionSynchronization = createTransactionSynchronization(event);
            TransactionSynchronizationManager.registerSynchronization(transactionSynchronization);
        }
        // 若fallbackExecution=true,那就是表示即使沒有事務  也會執行handler
        else if (this.annotation.fallbackExecution()) {
            if (this.annotation.phase() == TransactionPhase.AFTER_ROLLBACK && logger.isWarnEnabled()) {
                logger.warn("Processing " + event + " as a fallback execution on AFTER_ROLLBACK phase");
            }
            processEvent(event);
        }
        else {
            // No transactional event execution at all
            // 若沒有事務,輸出一個debug信息,表示這個監聽器沒有執行~~~~
            if (logger.isDebugEnabled()) {
                logger.debug("No transaction is active - skipping " + event);
            }
        }
    }

    // TransactionSynchronizationEventAdapter是一個內部類,它是一個TransactionSynchronization同步器
    // 此類實現也比較簡單,它的order由listener.getOrder();來決定
    private TransactionSynchronization createTransactionSynchronization(ApplicationEvent event) {
        return new TransactionSynchronizationEventAdapter(this, event, this.annotation.phase());
    }


    private static class TransactionSynchronizationEventAdapter extends TransactionSynchronizationAdapter {

        private final ApplicationListenerMethodAdapter listener;
        private final ApplicationEvent event;
        private final TransactionPhase phase;

        public TransactionSynchronizationEventAdapter(ApplicationListenerMethodAdapter listener,
                ApplicationEvent event, TransactionPhase phase) {
            this.listener = listener;
            this.event = event;
            this.phase = phase;
        }

        // 它的order又監聽器本身來決定  
        @Override
        public int getOrder() {
            return this.listener.getOrder();
        }

        // 最終都是委托給了listenner來真正的執行處理  來執行最終處理邏輯(也就是解析classes、condtion、執行方法體等等)
        @Override
        public void beforeCommit(boolean readOnly) {
            if (this.phase == TransactionPhase.BEFORE_COMMIT) {
                processEvent();
            }
        }

        // 此處結合status和phase   判斷是否應該執行~~~~
        // 此處小技巧:我們發現TransactionPhase.AFTER_COMMIT也是放在了此處執行的,只是它結合了status進行判斷而已~~~
        @Override
        public void afterCompletion(int status) {
            if (this.phase == TransactionPhase.AFTER_COMMIT && status == STATUS_COMMITTED) {
                processEvent();
            } else if (this.phase == TransactionPhase.AFTER_ROLLBACK && status == STATUS_ROLLED_BACK) {
                processEvent();
            } else if (this.phase == TransactionPhase.AFTER_COMPLETION) {
                processEvent();
            }
        }

        protected void processEvent() {
            this.listener.processEvent(this.event);
        }
    }
}

從源碼里可以看出,其實@TransactionalEventListener的底層實現原理還是事務同步器:TransactionSynchronization和TransactionSynchronizationManager。

以上,建立在小伙伴已經知曉了Spring事件/監聽機制的基礎上,回頭看Spring事務的監聽機制其實就非常非常的簡單了(沒有多少新東西)。

至於在平時業務編碼中處理Spring的事務同步的時候選擇哪種方式呢??我覺得兩種方式都是ok的,看各位的喜好了(我個人偏愛注解方式,耦合度低很多並且還可以使用事件鏈,有時候非常好使)

需要提一句:@TransactionalEventListener@EventListener一樣是存在一個加載時機問題的,若你對加載時機有嚴格要求和把控,建議使用API的方式而非注解方式,避免監聽器未被執行而導致邏輯出錯~


 摘自:https://www.jianshu.com/p/96181bba0326

spring event demo - 異步模式

加上配置,注入一個simpleApplicationEventMulticaster。將這個multicaster的taskexecutor設置為自定義的線程池。
當publish event時,當前線程會到從線程池里來取線程,進行invoke listener,以及執行subscriber邏輯。

@Configuration
public class EventConfig {

    @Bean(AbstractApplicationContext.APPLICATION_EVENT_MULTICASTER_BEAN_NAME)
    public SimpleApplicationEventMulticaster myEventMulticaster(){
        SimpleApplicationEventMulticaster simpleApplicationEventMulticaster = new SimpleApplicationEventMulticaster();
        simpleApplicationEventMulticaster.setTaskExecutor(taskExecutor());
        return simpleApplicationEventMulticaster;
    }
    
    @Bean
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(20);
        executor.setQueueCapacity(100);
        executor.setKeepAliveSeconds(300);
        executor.setThreadNamePrefix("thread-");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
        executor.setWaitForTasksToCompleteOnShutdown(true);
        return executor;
    }
}

或者這樣傳入配置的線程池

@Bean
    public SimpleApplicationEventMulticaster applicationEventMulticaster(@Qualifier("defaultThreadPoolExecutor") ThreadPoolExecutor defaultThreadPoolExecutor) {
        SimpleApplicationEventMulticaster simpleApplicationEventMulticaster = new SimpleApplicationEventMulticaster();
        simpleApplicationEventMulticaster.setTaskExecutor(defaultThreadPoolExecutor);
        return simpleApplicationEventMulticaster;
    }

output:
從output可以看出,主線程publish event,而subscriber內容由線程池中的線程執行。

Thread main: Publish Event
thread-1: Subscribed event - test

源碼解析

public class SimpleApplicationEventMulticaster extends AbstractApplicationEventMulticaster {

/*setTaskExecutor

Set the TaskExecutor to execute application listeners with.
Default is a SyncTaskExecutor, executing the listeners synchronously in the calling thread.
Consider specifying an asynchronous TaskExecutor here to not block the caller until all listeners have been executed. However, note that asynchronous execution will not participate in the caller's thread context (class loader, transaction association) unless the TaskExecutor explicitly supports this.
*/
    public void setTaskExecutor(@Nullable Executor taskExecutor) {
        this.taskExecutor = taskExecutor;
    }
}

Spring容器中的通知機制是如何實現的

初始化SimpleApplicationEventMulticaster並注入容器

Spring在初始化bean時,會進行ApplicationEventMulticaster的初始化。
會先檢查是否有local的applicationEventMulticaster,如果有,那么就會創建local也就是自定義的applicationEventMulticaster,放入容器;如果沒有,就會創建默認的SimpleApplicationEventMulticaster放入容器。

public abstract class AbstractApplicationContext extends DefaultResourceLoader implements ConfigurableApplicationContext {

    protected void initApplicationEventMulticaster() {
        ConfigurableListableBeanFactory beanFactory = this.getBeanFactory();
        if (beanFactory.containsLocalBean("applicationEventMulticaster")) {
            this.applicationEventMulticaster = (ApplicationEventMulticaster)beanFactory.getBean("applicationEventMulticaster", ApplicationEventMulticaster.class);
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("Using ApplicationEventMulticaster [" + this.applicationEventMulticaster + "]");
            }
        } else {
            this.applicationEventMulticaster = new SimpleApplicationEventMulticaster(beanFactory);
            beanFactory.registerSingleton("applicationEventMulticaster", this.applicationEventMulticaster);
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("No 'applicationEventMulticaster' bean, using [" + this.applicationEventMulticaster.getClass().getSimpleName() + "]");
            }
        }
    }

}

注冊所有的監聽器

接下來,spring容器會注冊所有的監聽器:

public abstract class AbstractApplicationContext extends DefaultResourceLoader implements ConfigurableApplicationContext {

    protected void registerListeners() {
        Iterator var1 = this.getApplicationListeners().iterator();

        while(var1.hasNext()) {
            ApplicationListener<?> listener = (ApplicationListener)var1.next();
            this.getApplicationEventMulticaster().addApplicationListener(listener);
        }

        String[] listenerBeanNames = this.getBeanNamesForType(ApplicationListener.class, true, false);
        String[] var7 = listenerBeanNames;
        int var3 = listenerBeanNames.length;

        for(int var4 = 0; var4 < var3; ++var4) {
            String listenerBeanName = var7[var4];
            this.getApplicationEventMulticaster().addApplicationListenerBean(listenerBeanName);
        }

        Set<ApplicationEvent> earlyEventsToProcess = this.earlyApplicationEvents;
        this.earlyApplicationEvents = null;
        if (!CollectionUtils.isEmpty(earlyEventsToProcess)) {
            Iterator var9 = earlyEventsToProcess.iterator();

            while(var9.hasNext()) {
                ApplicationEvent earlyEvent = (ApplicationEvent)var9.next();
                this.getApplicationEventMulticaster().multicastEvent(earlyEvent);
            }
        }
    }

}

 addApplicationListener會將listener和相應的事件類型記錄下來:

public abstract class AbstractApplicationEventMulticaster implements ApplicationEventMulticaster, BeanClassLoaderAware, BeanFactoryAware {

    public void addApplicationListener(ApplicationListener<?> listener) {
        synchronized(this.defaultRetriever) {
            Object singletonTarget = AopProxyUtils.getSingletonTarget(listener);
            if (singletonTarget instanceof ApplicationListener) {
                this.defaultRetriever.applicationListeners.remove(singletonTarget);
            }

            this.defaultRetriever.applicationListeners.add(listener);
            this.retrieverCache.clear();
        }
    }

}

事件發生時,multicast event

  • 調用multicaster的multicastEvent方法
    上面已經提到,事件發生時,會調用AbstractApplicationContext的publishEvent方法,它會調用注入到容器里的multicaster執行multicastEvent。

  • 根據事件類型,找到所有監聽這類事件的listener

Iterator var5 = this.getApplicationListeners(event, type).iterator();

詳細看下getApplicationListeners方法:

    protected Collection<ApplicationListener<?>> getApplicationListeners(ApplicationEvent event, ResolvableType eventType) {
        Object source = event.getSource();
        Class<?> sourceType = source != null ? source.getClass() : null;
        AbstractApplicationEventMulticaster.ListenerCacheKey cacheKey = new AbstractApplicationEventMulticaster.ListenerCacheKey(eventType, sourceType);
        AbstractApplicationEventMulticaster.CachedListenerRetriever newRetriever = null;
        AbstractApplicationEventMulticaster.CachedListenerRetriever existingRetriever = (AbstractApplicationEventMulticaster.CachedListenerRetriever)this.retrieverCache.get(cacheKey);
        if (existingRetriever == null && (this.beanClassLoader == null || ClassUtils.isCacheSafe(event.getClass(), this.beanClassLoader) && (sourceType == null || ClassUtils.isCacheSafe(sourceType, this.beanClassLoader)))) {
            newRetriever = new AbstractApplicationEventMulticaster.CachedListenerRetriever();
            existingRetriever = (AbstractApplicationEventMulticaster.CachedListenerRetriever)this.retrieverCache.putIfAbsent(cacheKey, newRetriever);
            if (existingRetriever != null) {
                newRetriever = null;
            }
        }

        if (existingRetriever != null) {
            Collection<ApplicationListener<?>> result = existingRetriever.getApplicationListeners();
            if (result != null) {
                return result;
            }
        }

        return this.retrieveApplicationListeners(eventType, sourceType, newRetriever);
    }
  •  對於每一個listener,invoke該listener
        while(var5.hasNext()) {
            ApplicationListener<?> listener = (ApplicationListener)var5.next();
            if (executor != null) {
                executor.execute(() -> {
                    this.invokeListener(listener, event);
                });
            } else {
                this.invokeListener(listener, event);
            }
        }

 



參考文章:

https://blog.csdn.net/erbao_2014/article/details/68924231

https://www.huangchaoyu.com/2020/08/08/SpringBoot-%E6%B6%88%E6%81%AF%E6%9C%BA%E5%88%B6,EventListener%E6%B3%A8%E8%A7%A3%E5%8E%9F%E7%90%86


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM