Activiti6作業執行器Job Executor配置(學習筆記)


內容概況:

 

異步執行配置相關:

 

 asyncExecutorActivate:這個屬性是激活作業執行器,它的默認參數是false,只有設為true,activiti啟動的時候才會開啟線程池去掃描定時操作的任務

asyncExecutorXXX:這些屬性的操作都是基於asyncExecutor這樣一個前綴,后面有各種類型的屬性配置,(其實里面的屬性配置大多都是與線程池、隊列相關的配置)

(很重要的配置)asyncExecutor:asyncExecutor的bean的配置,它本身是一個接口,用它也可以配置自定義的線程池。

 

如何自定義一個線程池?

 

 在設置了核心線程數的情況下,比如設為了5,那么在開啟了五個線程之后,有新的任務來了之后,回去檢測核心線程數是否都在執行任務中,如果是,那么新的請求就會放在隊列里去等待,

等到核心線程中有一個線程執行完了自己的任務,那么排在隊列最前面的這個請求,回去獲取這個線程,然后去執行。那么當隊列一直有新的請求加入時,負載過大,超過了隊列大小的時候,最大線程數就會起作用了,新建線程至最大線程數,來一起執行超過容量的新請求。  當三個都滿了以后,就會拒絕服務、請求。 


 

作業執行器的配置:

這里要說明的是R5/P1DT1H, / 后面的指時間間隔,1D表示一天,是24個小時,而1H表示一小時加起來就是25個小時。R5是指執行次數為5,這個屬性的含義是:在流程啟動時開始計時,25小時后第一次執行這個指定事件, 並且每隔25小時執行一次,總共執行次數為5次(ps:包含第一次執行在內)。


流程定義文件my-process_job.bpmn20.xml的改變:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">


    <bean id="processEngineConfiguration"
          class="org.activiti.engine.impl.cfg.StandaloneInMemProcessEngineConfiguration">
        <!-- 給引擎設置自定義的commandInvoker -->
        <!--<property name="commandInvoker" ref="commandInvoker" />-->
        <!-- 若為true則開啟記錄事件、節點的狀態,完成后將完成狀態插入數據庫,若為false則關閉,不記錄 -->
        <property name="enableDatabaseEventLogging" value="true"/>

        <!--打開異步激活器激活異步,如果不配置線程池就會使用它的默認線程池-->
        <property name="asyncExecutorActivate" value="true"/>
        <!--如果使用我們自己定義的線程池,需要先定義一個執行器-->
        <property name="asyncExecutor" ref="asyncExecutor" />
        <!-- 配置事件監聽器 -->
        <property name="eventListeners">
            <list>
                <bean class="com.yy.avtiviti.helloworld.event.JobEventListener"/>
            </list>
        </property>
    </bean>
        <!-- 執行器默認使用DefaultAsyncJobExecutor -->
        <bean id="asyncExecutor"
              class="org.activiti.engine.impl.asyncexecutor.DefaultAsyncJobExecutor">
            <!-- 需要配置一個服務 基於spring去配置它 -->
            <property name="executorService" ref="executorService"/>
        </bean>
        <bean id="executorService" class="org.springframework.scheduling.concurrent.ThreadPoolExecutorFactoryBean">
            <property name="threadNamePrefix" value="activiti-job-"/>
            <property name="corePoolSize" value="10"/>
            <property name="maxPoolSize" value="20"/>
            <property name="queueCapacity" value="100"/>
            <!-- 設置當線程池滿了時候的拒絕策略,這里是使用的默認策略,拋出異常 -->
            <property name="rejectedExecutionHandler">
                <bean class="java.util.concurrent.ThreadPoolExecutor$AbortPolicy"/>
            </property>
        </bean>
    <bean id="commandInvoker" class="com.yy.avtiviti.helloworld.intercept.MDCCommandInvoker"/>

</beans>

設置一個監聽器JobEventListener,以便測試觀察:

public class JobEventListener implements ActivitiEventListener {
    private static final Logger LOGGER =  LoggerFactory.getLogger(JobEventListener.class);

    //簡單的完成一下監聽器的效果
    @Override
    public void onEvent(ActivitiEvent event) {
        ActivitiEventType eventType = event.getType();
        String name = eventType.name();

        if (name.startsWith("TIMER") || name.startsWith("JOB")){
            LOGGER.info("監聽到job事件 {} \t {}",eventType,event.getProcessInstanceId());
        }
    }

    @Override
    public boolean isFailOnException() {
        return false;
    }
}

編寫activiti配置文件activiti_job.cfg.xml:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">


    <bean id="processEngineConfiguration"
          class="org.activiti.engine.impl.cfg.StandaloneInMemProcessEngineConfiguration">
        <!-- 給引擎設置自定義的commandInvoker -->
        <!--<property name="commandInvoker" ref="commandInvoker" />-->
        <!-- 若為true則開啟記錄事件、節點的狀態,完成后將完成狀態插入數據庫,若為false則關閉,不記錄 -->
        <property name="enableDatabaseEventLogging" value="true"/>

        <!--打開異步激活器激活異步,如果不配置線程池就會使用它的默認線程池-->
        <property name="asyncExecutorActivate" value="true"/>
        <!--如果使用我們自己定義的線程池,需要先定義一個執行器-->
        <property name="asyncExecutor" ref="asyncExecutor" />
        <!-- 配置事件監聽器 -->
        <property name="eventListeners">
            <list>
                <bean class="com.yy.avtiviti.helloworld.event.JobEventListener"/>
            </list>
        </property>
    </bean>
        <!-- 執行器默認使用DefaultAsyncJobExecutor -->
        <bean id="asyncExecutor"
              class="org.activiti.engine.impl.asyncexecutor.DefaultAsyncJobExecutor">
            <!-- 需要配置一個服務 基於spring去配置它 -->
            <property name="executorService" ref="executorService"/>
        </bean>
        <bean id="executorService" class="org.springframework.scheduling.concurrent.ThreadPoolExecutorFactoryBean">
            <property name="threadNamePrefix" value="activiti-job-"/>
            <property name="corePoolSize" value="10"/>
            <property name="maxPoolSize" value="20"/>
            <property name="queueCapacity" value="100"/>
            <!-- 設置當線程池滿了時候的拒絕策略,這里是使用的默認策略,拋出異常 -->
            <property name="rejectedExecutionHandler">
                <bean class="java.util.concurrent.ThreadPoolExecutor$AbortPolicy"/>
            </property>
        </bean>
    <bean id="commandInvoker" class="com.yy.avtiviti.helloworld.intercept.MDCCommandInvoker"/>

</beans>

編寫測試類configJobTest :

public class configJobTest {
    private static final Logger LOGGER =  LoggerFactory.getLogger(configTest.class);

    @Rule
    public ActivitiRule activitiRule = new ActivitiRule("activiti_job.cfg.xml");//傳入自定義的mdc配置文件

    @Test
    @Deployment(resources = {"my-process_job.bpmn20.xml"})//流程定義文件
    public void test() throws InterruptedException {
        //這里流程定義文件里設置了定時任務在一定事件內啟動五次,所以不需要自行啟動了。這里啟動的代碼就可以不要了。
        //這里記錄一下時間,看下流程每次啟動時間與結束時間
        LOGGER.info("start");

        //在流程定義文件初始化以后,就開始定時啟動了。
        //那么我應該要查詢一下在這段時間內有多少定時任務去執行
        List<Job> jobList = activitiRule
                .getManagementService()
                .createTimerJobQuery()
                .listPage(0, 100);
        for (Job job : jobList) {
            LOGGER.info("定時任務 {} ,默認重復次數 {}",job,job.getRetries());
        }
        LOGGER.info("jobList.size = {}",jobList.size());
        //因為主線程很快就能執行完,而定時任務還沒有執行,所以讓線程等待一下
        Thread.sleep(1000*10);
        LOGGER.info("end");

    }
}

測試結果如下:

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM