內容概況:
異步執行配置相關:
asyncExecutorActivate:這個屬性是激活作業執行器,它的默認參數是false,只有設為true,activiti啟動的時候才會開啟線程池去掃描定時操作的任務
asyncExecutorXXX:這些屬性的操作都是基於asyncExecutor這樣一個前綴,后面有各種類型的屬性配置,(其實里面的屬性配置大多都是與線程池、隊列相關的配置)
(很重要的配置)asyncExecutor:asyncExecutor的bean的配置,它本身是一個接口,用它也可以配置自定義的線程池。
如何自定義一個線程池?
在設置了核心線程數的情況下,比如設為了5,那么在開啟了五個線程之后,有新的任務來了之后,回去檢測核心線程數是否都在執行任務中,如果是,那么新的請求就會放在隊列里去等待,
等到核心線程中有一個線程執行完了自己的任務,那么排在隊列最前面的這個請求,回去獲取這個線程,然后去執行。那么當隊列一直有新的請求加入時,負載過大,超過了隊列大小的時候,最大線程數就會起作用了,新建線程至最大線程數,來一起執行超過容量的新請求。 當三個都滿了以后,就會拒絕服務、請求。
作業執行器的配置:
這里要說明的是R5/P1DT1H, / 后面的指時間間隔,1D表示一天,是24個小時,而1H表示一小時加起來就是25個小時。R5是指執行次數為5,這個屬性的含義是:在流程啟動時開始計時,25小時后第一次執行這個指定事件, 並且每隔25小時執行一次,總共執行次數為5次(ps:包含第一次執行在內)。
流程定義文件my-process_job.bpmn20.xml的改變:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> <bean id="processEngineConfiguration" class="org.activiti.engine.impl.cfg.StandaloneInMemProcessEngineConfiguration"> <!-- 給引擎設置自定義的commandInvoker --> <!--<property name="commandInvoker" ref="commandInvoker" />--> <!-- 若為true則開啟記錄事件、節點的狀態,完成后將完成狀態插入數據庫,若為false則關閉,不記錄 --> <property name="enableDatabaseEventLogging" value="true"/> <!--打開異步激活器激活異步,如果不配置線程池就會使用它的默認線程池--> <property name="asyncExecutorActivate" value="true"/> <!--如果使用我們自己定義的線程池,需要先定義一個執行器--> <property name="asyncExecutor" ref="asyncExecutor" /> <!-- 配置事件監聽器 --> <property name="eventListeners"> <list> <bean class="com.yy.avtiviti.helloworld.event.JobEventListener"/> </list> </property> </bean> <!-- 執行器默認使用DefaultAsyncJobExecutor --> <bean id="asyncExecutor" class="org.activiti.engine.impl.asyncexecutor.DefaultAsyncJobExecutor"> <!-- 需要配置一個服務 基於spring去配置它 --> <property name="executorService" ref="executorService"/> </bean> <bean id="executorService" class="org.springframework.scheduling.concurrent.ThreadPoolExecutorFactoryBean"> <property name="threadNamePrefix" value="activiti-job-"/> <property name="corePoolSize" value="10"/> <property name="maxPoolSize" value="20"/> <property name="queueCapacity" value="100"/> <!-- 設置當線程池滿了時候的拒絕策略,這里是使用的默認策略,拋出異常 --> <property name="rejectedExecutionHandler"> <bean class="java.util.concurrent.ThreadPoolExecutor$AbortPolicy"/> </property> </bean> <bean id="commandInvoker" class="com.yy.avtiviti.helloworld.intercept.MDCCommandInvoker"/> </beans>
設置一個監聽器JobEventListener,以便測試觀察:
public class JobEventListener implements ActivitiEventListener { private static final Logger LOGGER = LoggerFactory.getLogger(JobEventListener.class); //簡單的完成一下監聽器的效果 @Override public void onEvent(ActivitiEvent event) { ActivitiEventType eventType = event.getType(); String name = eventType.name(); if (name.startsWith("TIMER") || name.startsWith("JOB")){ LOGGER.info("監聽到job事件 {} \t {}",eventType,event.getProcessInstanceId()); } } @Override public boolean isFailOnException() { return false; } }
編寫activiti配置文件activiti_job.cfg.xml:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> <bean id="processEngineConfiguration" class="org.activiti.engine.impl.cfg.StandaloneInMemProcessEngineConfiguration"> <!-- 給引擎設置自定義的commandInvoker --> <!--<property name="commandInvoker" ref="commandInvoker" />--> <!-- 若為true則開啟記錄事件、節點的狀態,完成后將完成狀態插入數據庫,若為false則關閉,不記錄 --> <property name="enableDatabaseEventLogging" value="true"/> <!--打開異步激活器激活異步,如果不配置線程池就會使用它的默認線程池--> <property name="asyncExecutorActivate" value="true"/> <!--如果使用我們自己定義的線程池,需要先定義一個執行器--> <property name="asyncExecutor" ref="asyncExecutor" /> <!-- 配置事件監聽器 --> <property name="eventListeners"> <list> <bean class="com.yy.avtiviti.helloworld.event.JobEventListener"/> </list> </property> </bean> <!-- 執行器默認使用DefaultAsyncJobExecutor --> <bean id="asyncExecutor" class="org.activiti.engine.impl.asyncexecutor.DefaultAsyncJobExecutor"> <!-- 需要配置一個服務 基於spring去配置它 --> <property name="executorService" ref="executorService"/> </bean> <bean id="executorService" class="org.springframework.scheduling.concurrent.ThreadPoolExecutorFactoryBean"> <property name="threadNamePrefix" value="activiti-job-"/> <property name="corePoolSize" value="10"/> <property name="maxPoolSize" value="20"/> <property name="queueCapacity" value="100"/> <!-- 設置當線程池滿了時候的拒絕策略,這里是使用的默認策略,拋出異常 --> <property name="rejectedExecutionHandler"> <bean class="java.util.concurrent.ThreadPoolExecutor$AbortPolicy"/> </property> </bean> <bean id="commandInvoker" class="com.yy.avtiviti.helloworld.intercept.MDCCommandInvoker"/> </beans>
編寫測試類configJobTest :
public class configJobTest { private static final Logger LOGGER = LoggerFactory.getLogger(configTest.class); @Rule public ActivitiRule activitiRule = new ActivitiRule("activiti_job.cfg.xml");//傳入自定義的mdc配置文件 @Test @Deployment(resources = {"my-process_job.bpmn20.xml"})//流程定義文件 public void test() throws InterruptedException { //這里流程定義文件里設置了定時任務在一定事件內啟動五次,所以不需要自行啟動了。這里啟動的代碼就可以不要了。 //這里記錄一下時間,看下流程每次啟動時間與結束時間 LOGGER.info("start"); //在流程定義文件初始化以后,就開始定時啟動了。 //那么我應該要查詢一下在這段時間內有多少定時任務去執行 List<Job> jobList = activitiRule .getManagementService() .createTimerJobQuery() .listPage(0, 100); for (Job job : jobList) { LOGGER.info("定時任務 {} ,默認重復次數 {}",job,job.getRetries()); } LOGGER.info("jobList.size = {}",jobList.size()); //因為主線程很快就能執行完,而定時任務還沒有執行,所以讓線程等待一下 Thread.sleep(1000*10); LOGGER.info("end"); } }
測試結果如下: