Spring增加了event事件機制,方便了日常項目中需要業務解藕的開發場景,也支持異步和重試機制,很多場景都可以使用
目前開發的項目中需要記錄一些異常單據信息的記錄,主流程中正常單據接單的流程不動,在識別出某種異常后記錄,但是這種記錄不應該影響主流程,所以考慮用Spring的event異步事件處理
1.什么是事件機制
Java的設計模式中有兩種設計模式,觀察者模式和監聽者模式
監聽者模式:當有一個事件發生時,通知關注此事件的對象,告訴事件發生了就發布這個事件,那怎么知道通知誰呢,所以需要知道誰關心這個事件,那么就需要對這個事件關心的對象中定義一個事件,如果事件發生了,關心的這個對象就監聽到了,可以執行相應的操作。
例如日常生活中的訂閱報紙,報紙老板A,現在小明和老板打招呼說我要訂報紙(這個過程就相當於觀察者的注冊),老板A就會拿出自己的小本本記下小明,下次小王、小芳也是類似的操作,現在老板A就有了三個觀察者了,然后老板會自動的把報紙送到三位的家里,突然有一天小明說不想訂報紙了,老板就在自己的小本本上划掉小明的名字(觀察者的取消注冊),等到送報紙的時候就不再送到小明家里。

如果有其他要通過這個事件傳遞的對象,可以在定義事件的時候增加屬性

在我的項目中,我們使用了SmartApplicationListener,
SmartApplicationListener是高級監聽器,是ApplicationListener的子類,能夠實現有序監聽。
public interface SmartApplicationListener extends ApplicationListener<ApplicationEvent>, Ordered { /** * Determine whether this listener actually supports the given event type. */ boolean supportsEventType(Class<? extends ApplicationEvent> eventType); /** * Determine whether this listener actually supports the given source type. */ boolean supportsSourceType(Class<?> sourceType); }
對上述對示例中對 RejectWaybillRecordEvent創建一個監聽者,實現SmartApplicationListener接口:
@Component public class RejectWaybillRecordEventListenerDemo implements SmartApplicationListener { @Override @AsyncEnabled public void onApplicationEvent(ApplicationEvent event) { //do something } @Override public boolean supportsEventType(Class<? extends ApplicationEvent> eventType) { return RejectWaybillRecordEvent.class.equals(eventType); } @Override public boolean supportsSourceType(Class<?> sourceType) { return true; } @Override public int getOrder() { return 0; } }
最后發布事件,使用spring的話可以直接注入,所以我們直接注入ApplicationContext 對象調用發布方法,我們定義個類用於發布事件:
public class EventPublisherAssistant implements ApplicationContextAware { private static ApplicationContext applicationContext; public EventPublisherAssistant() { } public static void publishEvent(BaseApplicationEvent event) { applicationContext.publishEvent(event); } public void setApplicationContext(ApplicationContext context) throws BeansException { applicationContext = context; } }
那么在需要發布事件的地方調用:
EventPublisherAssistant.publishEvent(new RejectWaybillRecordEvent(param1, waybillCode(), vendorOrderCode()));
@Override public void multicastEvent(ApplicationEvent event) { multicastEvent(event, resolveDefaultEventType(event)); } @Override public void multicastEvent(final ApplicationEvent event, ResolvableType eventType) { ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event)); for (final ApplicationListener<?> listener : getApplicationListeners(event, type)) { Executor executor = getTaskExecutor(); if (executor != null) { executor.execute(new Runnable() { @Override public void run() { invokeListener(listener, event); } }); } else { invokeListener(listener, event); } } }
所以重點在是否配置了executor
那么問題來了,怎么注入一個線程池呢,往哪個類注入呢
最簡單的方式是手動實現applicationEventMulticaster的bean,然后利用Set注入的方法注入了一個線程池,線程池也需要實例化,最簡單的是直接使用spring自帶的簡單異步任務線程池
<!-- 任務執行器 --> <task:executor id="executor" pool-size="10" /> <!-- 名字必須是applicationEventMulticaster,因為AbstractApplicationContext默認找個 --> <bean id="applicationEventMulticaster" class="org.springframework.context.event.SimpleApplicationEventMulticaster"> <!-- 注入任務執行器 這樣就實現了異步調用 --> <property name="taskExecutor" ref="executor"/> </bean>
這種方式的配置缺點是全局的,要么全部異步,要么全部同步,如果想要同步,刪除掉 <property name="taskExecutor" ref="executor"/> 這個屬性
那如何更優雅靈活的支持異步呢?
Spring提供了@Aync注解來完成異步調用,我們在監聽事件處理的onApplicationEvent方法上加上@Aync注解即可
這個注解用於標注某個方法或某個類里面的所有方法都是需要異步處理的。被注解的方法被調用的時候,會在新線程中執行,而調用它的方法會在原來的線程中執行。這樣可以避免阻塞、以及保證任務的實時性
前提是:
<!-- 開啟@AspectJ AOP代理 --> <aop:aspectj-autoproxy proxy-target-class="true"/> <task:executor id="executor" pool-size="10" /> <task:annotation-driven executor="executor" />
在我們的項目中的使用是通過定義bean的方式加載的
首先定一個抽象基類實現SmartApplicationListener,定義方法onApplicationEvent
public abstract class BaseApplicationListener implements SmartApplicationListener, ApplicationContextAware, InitializingBean { protected Logger logger = LoggerFactory.getLogger(this.getClass()); private static final String APPLICATION_EVENT_EXECUTOR_BEAN_NAME = "applicationEventExecutor"; protected ApplicationContext applicationContext; private Executor defaultExecutor = null; private ConcurrentHashMap<String, Executor> asyncExecutorMap = new ConcurrentHashMap(); public void onApplicationEvent(final ApplicationEvent applicationEvent) { // 1.獲取異步執行線程池 Executor asyncExecutor = this.getAsyncExecutor(); // 2. 如果有異步線程池則使用異步模式,否則同步執行 if (asyncExecutor != null) { asyncExecutor.execute(new Runnable() { public void run() { doInternalEventResponse(applicationEvent); } }); } else { doInternalEventResponse(applicationEvent); } } /** * 獲取異步執行線程池 * * @return */ private Executor getAsyncExecutor() { try { Method onEventResponseMethod = this.getClass().getMethod("onEventResponse", ApplicationEvent.class); AsyncEnabled asyncAnnotation = onEventResponseMethod.getAnnotation(AsyncEnabled.class); if (asyncAnnotation == null) { return null; } String asyncExecutorName = asyncAnnotation.executor(); // 如果指定線程池為空,則使用默認池 if (asyncExecutorName == null || "".equals(asyncExecutorName.trim())) { return this.defaultExecutor; } else if (this.asyncExecutorMap.containsKey(asyncExecutorName)) { return asyncExecutorMap.get(asyncExecutorName); } else if (applicationContext.containsBean(asyncExecutorName)) { Executor asyncExecutor = this.applicationContext.getBean(asyncExecutorName, Executor.class); // 如果為找到指定的Executor,則使用默認線程池 if (asyncExecutor == null) { asyncExecutor = this.defaultExecutor; } this.asyncExecutorMap.put(asyncExecutorName, asyncExecutor); return asyncExecutor; } else { return this.defaultExecutor; } } catch (NoSuchMethodException e) { logger.info("基礎Event-listener:getAsyncExecutor處理發生失敗", e.getMessage()); } return null; } private void doInternalEventResponse(ApplicationEvent applicationEvent){ try { // 業務事件響應前-可擴展Event保存等操作 onEventResponse(applicationEvent); // 業務事件響應后-可擴展Event處理標記等操作 } catch (Exception e) { this.logger.error("Event Response 處理異常", e); throw new RuntimeException(e); } finally { Profiler.registerInfoEnd(callerInfo); } } /** * 事件監聽處理 */ public abstract void onEventResponse(ApplicationEvent applicationEvent); public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; } public void afterPropertiesSet() throws Exception { try { if (!applicationContext.containsBean(APPLICATION_EVENT_EXECUTOR_BEAN_NAME)) { this.logger.error("未初始化APPLICATION_EVENT_EXECUTOR,所有異步EventListener將自動調整為同步."); return; } this.defaultExecutor = this.applicationContext.getBean(APPLICATION_EVENT_EXECUTOR_BEAN_NAME, Executor.class); } catch (Exception e) { logger.error("從ApplicationContext中獲取APPLICATION_EVENT_EXECUTOR異常,所有異步EventListener將自動調整為同步."); } } }
定義的注解AsyncEnabled
/** * <p> * listener是否為異步執行注解描述 * executor:異步執行是選擇的線程池,如果為空,則采用默認線程池 * 如果不存在默認線程池,則忽略異步設置,同步執行 */ @Target(value = {ElementType.TYPE, ElementType.METHOD}) @Retention(RetentionPolicy.RUNTIME) @Inherited @Documented public @interface AsyncEnabled { /** * 異步執行線程池名稱 * * @return */ String executor() default "applicationEventExecutor"; }
<bean id="applicationEventExecutor" class="java.util.concurrent.ThreadPoolExecutor"> <constructor-arg index="0" value="2"/> <constructor-arg index="1" value="5"/> <constructor-arg index="2" value="30"/> <constructor-arg index="3" value="MINUTES"/> <constructor-arg index="4" ref="eventDefaultQueue"/> <constructor-arg index="5" ref="eventDiscardPolicy"/> </bean>
使用的是jdk工具類中的ThreadPoolExecutor,這個類有一個6個屬性的構造方法
/** * Creates a new {@code ThreadPoolExecutor} with the given initial * parameters and default thread factory. * * @param corePoolSize the number of threads to keep in the pool, even * if they are idle, unless {@code allowCoreThreadTimeOut} is set * @param maximumPoolSize the maximum number of threads to allow in the * pool * @param keepAliveTime when the number of threads is greater than * the core, this is the maximum time that excess idle threads * will wait for new tasks before terminating. * @param unit the time unit for the {@code keepAliveTime} argument * @param workQueue the queue to use for holding tasks before they are * executed. This queue will hold only the {@code Runnable} * tasks submitted by the {@code execute} method. * @param handler the handler to use when execution is blocked * because the thread bounds and queue capacities are reached * @throws IllegalArgumentException if one of the following holds:<br> * {@code corePoolSize < 0}<br> * {@code keepAliveTime < 0}<br> * {@code maximumPoolSize <= 0}<br> * {@code maximumPoolSize < corePoolSize} * @throws NullPointerException if {@code workQueue} * or {@code handler} is null */ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); }
-- corePoolSize:最小的線程數
--maximumPoolSize:最大的線程數
--keepAliveTime:超過corePoolSize的那些線程,任務完成后,再經過這個時長會被結束掉
--unit:keepAliveTime 對應的時間單位
--workQueue:線程池所使用的緩沖隊列,這個緩沖隊列的長度決定了能夠緩沖的最大數量,兩種常用的隊列方式ArrayBlockingQueue 和LinkedBlockingQueue 后面具體說明
--handler:事件的兜底處理方案,默認的策略是AbortPolicy,對拒絕任務拋棄處理,並且拋出異常
如果不想采用默認的策略,就配置其他的策略,也是需要實例化的:
<!--事件異步默認線程池 隊列的方式:ArrayBlockingQueue--> <bean id="eventDefaultQueue" class="java.util.concurrent.ArrayBlockingQueue"> <constructor-arg index="0" value="1000"/> </bean> <!--事件異步默認線程池 拒絕策略:拋棄 --> <bean id="eventDiscardPolicy" class="java.util.concurrent.ThreadPoolExecutor.DiscardPolicy"/>
到這里,通過自定義listener實現事件監聽的異步處理就完成了
下面是一些相關的知識點:
ArrayBlockingQueue 和LinkedBlockingQueu
ArrayBlockingQueue:是一個阻塞式的隊列,繼承自AbstractBlockingQueue,間接的實現了Queue接口和Collection接口。底層以數組的形式保存數據(實際上可看作一個循環數組)
LinkedBlockingQueu:也是一個阻塞式的隊列,LinkedBlockingQueue保存元素的是一個鏈表。其內部有一個Node的內部類,其中有一個成員變量 Node next。就這樣形成了一個鏈表的結構,要獲取下一個元素,只要調用next就可以了。而ArrayBlockingQueue則是一個數組
阻塞隊列的特征:
* This queue orders elements FIFO (first-in-first-out).
區別:
1)LinkedBlockingQueue內部讀寫(插入獲取)各有一個鎖,而ArrayBlockingQueue則讀寫共享一個鎖。
2)吞吐量,在源碼中有一段說明:
* Linked queues typically have higher throughput than array-based queues but
* less predictable performance in most concurrent applications.
翻譯一下:
LinkedBlockingQueue比ArrayBlockingQueue有更高的吞吐量,但是性能表現更難預測(也就是說相比ArrayBlockingQueue性能表現不穩定,實際也很穩定了)
3)LinkedBlockingQueue創建時,默認會直接創建一個Integer.MAX_VALUE的數組,當插入少,讀取多時,就會造成很大的空間浪費。但是Node節點的創建是根據需要動態創建的。
* <p>The optional capacity bound constructor argument serves as a
* way to prevent excessive queue expansion. The capacity, if unspecified,
* is equal to {@link Integer#MAX_VALUE}. Linked nodes are
* dynamically created upon each insertion unless this would bring the
* queue above capacity.
*
線程池對拒絕任務的處理策略:DiscardPolicy
拒絕任務是指當線程池里面的線程數量達到 maximumPoolSize 且 workQueue 隊列已滿的情況下被嘗試添加進來的任務
在 ThreadPoolExecutor 里面定義了 4 種 handler 策略,分別是
1. CallerRunsPolicy :這個策略重試添加當前的任務,他會自動重復調用 execute() 方法,直到成功。
這個策略的問題是當線程池不夠的時候,會中斷主線程,主線程沒有長連接,就會建立了一個新的長連接去處理這個異步任務,那么主線程就不會處理正常的業務了
假設隊列大小為 10,corePoolSize 為 3,maximumPoolSize 為 6,那么當加入 20 個任務時,執行的順序就是這樣的:首先執行任務 1、2、3,然后任務 4~13 被放入隊列。這時候隊列滿了,任務 14、15、16 會被馬上執行,最終順序是:1、2、3、14、15、16、4、5、6、7、8、9、10、11、12、13
2. AbortPolicy :對拒絕任務拋棄處理,並且拋出異常。
3. DiscardPolicy :對拒絕任務直接無聲拋棄,沒有異常信息。
4. DiscardOldestPolicy :對拒絕任務不拋棄,而是拋棄隊列里面等待最久的一個線程,然后把拒絕任務加到隊列。
一個任務通過 execute(Runnable) 方法被添加到線程池,任務就是一個 Runnable 類型的對象,任務的執行方法就是 Runnable 類型對象的 run() 方法。
當一個任務通過 execute(Runnable) 方法添加到線程池時,線程池采用的策略如下:
1. 如果此時線程池中的數量小於 corePoolSize ,即使線程池中的線程都處於空閑狀態,也要創建新的線程來處理被添加的任務。
2. 如果此時線程池中的數量等於 corePoolSize ,但是緩沖隊列 workQueue 未滿,那么任務被放入緩沖隊列。
3. 如果此時線程池中的數量大於 corePoolSize ,緩沖隊列 workQueue 滿,並且線程池中的數量小於maximumPoolSize ,建新的線程來處理被添加的任務。
4. 如果此時線程池中的數量大於 corePoolSize ,緩沖隊列 workQueue 滿,並且線程池中的數量等於maximumPoolSize ,那么通過 handler 所指定的策略來處理此任務。
處理任務的優先級為:
核心線程 corePoolSize 、任務隊列 workQueue 、最大線程 maximumPoolSize ,如果三者都滿了,使用 handler處理被拒絕的任務。當線程池中的線程數量大於 corePoolSize 時,如果某線程空閑時間超過 keepAliveTime ,線程將被終止。這樣,線程池可以動態的調整池中的線程數。