分布式調度框架TBSchedule使用方法


一、TBSchedule簡介

TBSchedule是來自淘寶的分布式調度開源框架,基於Zookeeper純Java實現,其目的是讓一種批量任務或者不斷變化的任務,能夠被動態的分配到多個主機的JVM中的不同線程組中並行執行。所有的任務能夠被不重復,不遺漏的快速處理。這種框架任務的分配通過分片實現了不重復調度,又通過架構中Leader的選擇,存活的自我保證,完成了可用性和伸縮性的保障。
TBSchedule源碼地址:http://code.taobao.org/p/tbschedule/src/

二、開發環境

  1. WIN10,也可換為Linux
  2. JDK 1.7
  3. Tomcat 8.5
  4. 安裝zookeeper

三、配置步驟

1.安裝zookeeper

(1)下載zookeeper

http://zookeeper.apache.org/releases.html

下載3.4.11版本:

http://mirrors.hust.edu.cn/apache/zookeeper/zookeeper-3.4.11/zookeeper-3.4.11.tar.gz

 

(2)解壓至c:/prog/zookeeper/zookeeper-3.4.11

復制conf下的zoo_sample.cfg為zoo.cfg

修改dataDir為:

dataDir=/prog/zookeeper/data

 

tickTime單位為毫秒,為心跳間隔和最小的心跳超時間隔

clientPort是監聽客戶端連接的端口,默認為2181

 

(3)創建目錄:c:/prog/zookeeper/data

 

2.啟動zookeeper

運行bin/zkServer.cmd

如果在Linux下,則執行:

[root@192.168.1.5]$ ./zkServer start

 

3.下載TBSchedule

采用svn來Checkout TBSchedule

svn地址:http://code.taobao.org/svn/tbschedule/

 

4.在Eclipse中導入項目:

右鍵工程區域(Package Explorer)->Import...->Maven-Existing Maven Projects

 

注意:TBSchedule編碼為GBK,但引用TBSchedule的工程編碼為UTF-8時,此處也要將TBSchedule工程的編碼設置為UTF-8。

 

5.安裝Tomcat

(1)下載Tomcat

地址:https://tomcat.apache.org/download-80.cgi#8.5.27

 

(2)解壓Tomcat 8.5至c:\prog\tomcat\apache-tomcat-8.5.11

 

6.配置TBSchedule控制台

(1)將TBSchedule工程中的console\ScheduleConsole.war拷貝至tomcat/webapps中

 

(2)啟動tomcat

 

(3)瀏覽器中打開:

http://localhost:8080/ScheduleConsole/schedule/index.jsp?manager=true

點擊保存會提示:

錯誤信息:Zookeeper connecting ......localhost:2181

 

如配置正確則可以忽略上述提示,直接進入“管理主頁...”。

 

7.查看zookeeper中節點

運行zookeeper下的bin/zkClient.cmd

輸入ls /app-schedule/demo,顯示:

[strategy, baseTaskType, factory]

 

說明已經創建znode成功。

 

查看TBSchedule控制台中的“Zookeeper數據”,也能看到相同數據。

 

8.在項目中使用TBSchedule

Eclipse中新建一個maven工程tbsdemo

GroupId:com.jf

Artifact Id:tbsdemo

 

9.在pom.xml中引入Spring、TBSchedule、Zookeeper

pom.xml內容為:

 

   <modelVersion> 4.0 . 0 </modelVersion>
  
   <groupId>com.jf</groupId>
   <artifactId>tbsdemo</artifactId>
   <version> 0.0 . 1 -SNAPSHOT</version>
   <packaging>jar</packaging>
  
   <name>tbsdemo</name>
   <url>http: //maven.apache.org</url>
  
   <properties>
     <project.build.sourceEncoding>UTF- 8 </project.build.sourceEncoding>
<!-- spring版本號 -->
     <spring.version> 4.0 . 5 .RELEASE</spring.version>
<!-- mybatis版本號 -->
     <mybatis.version> 3.3 . 0 </mybatis.version>
<!-- log4j日志文件管理包版本 -->
     <slf4j.version> 1.7 . 7 </slf4j.version>
     <log4j.version> 1.2 . 17 </log4j.version>
   </properties>
  
   <dependencies>
     <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <version> 4.11 </version>
       <scope>test</scope>
     </dependency>
     <!-- spring核心包 -->
     <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-core</artifactId>
        <version>${spring.version}</version>
     </dependency>
     <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context-support</artifactId>
        <version>${spring.version}</version>
     </dependency>
     <dependency>
        <groupId>org.springframework</groupId>
         <artifactId>spring-test</artifactId>
        <version>${spring.version}</version>
     </dependency>
     <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>${log4j.version}</version>
     </dependency>
     <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version> 3.4 . 11 </version>
     </dependency>
     <dependency>
        <groupId>com.taobao.pamirs.schedule</groupId>
         <artifactId>tbschedule</artifactId>
         <version> 3.3 . 3.2 </version>
     </dependency>
   </dependencies>
</project>

 

 

10.在src/main/resources下創建applicationContext.xml,輸入:

 

<?xml version= "1.0"  encoding= "UTF-8" ?>
     xsi:schemaLocation="http: //www.springframework.org/schema/beans 
                         http: //www.springframework.org/schema/beans/spring-beans-4.0.xsd 
                         http: //www.springframework.org/schema/context 
                         http: //www.springframework.org/schema/context/spring-context-4.0.xsd">
  
     <context:component-scan base- package = "com.jf"  />
     <!-- 引入配置文件 -->
     <bean id= "propertyConfigurer"
        class = "org.springframework.beans.factory.config.PropertyPlaceholderConfigurer" >
     <property name= "locations" >
            <list>
               <value>classpath:tbschedule.properties</value>
            </list>
        </property>
     </bean>
     <bean id= "scheduleManagerFactory"     class = "com.taobao.pamirs.schedule.strategy.TBScheduleManagerFactory"
        init-method= "init" >
        <property name= "zkConfig" >
            <map>
               <entry key= "zkConnectString"  value= "${schedule.zookeeper.address}"  />
               <entry key= "rootPath"  value= "${schedule.root.catalog}"  />
               <entry key= "zkSessionTimeout"  value= "${schedule.timeout}"  />
               <entry key= "userName"  value= "${schedule.username}"  />
               <entry key= "password"  value= "${schedule.password}"  />
               <entry key= "isCheckParentPath"  value= "true"  />
            </map>
        </property>
     </bean>
</beans>

 

 

11.創建TBSchedule配置文件

在src/main/resources/中創建tbschedule.propertie

輸入:

 

#注冊中心地址
schedule.zookeeper.address=localhost: 2181
#定時任務根目錄,任意指定,調度控制台配置時對應
schedule.root.catalog=/app-schedule/demo
#賬戶,任意指定,調度控制台配置時對應
schedule.username=admin
#密碼,任意指定,調度控制台配置時對應
schedule.password=password
#超時配置
schedule.timeout= 60000

 

 

注意schedule.username、schedule.password要與TBSchedule控制台中設置的一致。

 

12.創建任務數據類TaskModel:

 

package  com.jf.tbsdemo.pojo;
  
public  class  TaskModel {
     private  long  id;
     private  String taskInfo;
     public  TaskModel( long  id, String taskInfo) {
        this .id = id;
        this .taskInfo = taskInfo;
     }
     public  long  getId() {
        return  id;
     }
     public  void  setId( long  id) {
        this .id = id;
     }
     public  String getTaskInfo() {
         return  taskInfo;
     }
     public  void  setTaskInfo(String taskInfo) {
        this .taskInfo = taskInfo;
     }
}

 

 

13.創建任務處理類IScheduleTaskDealSingleTest:

注意:任務處理分單任務和多任務(批處理),分別實現IScheduleTaskDealSingle<T>、IScheduleTaskDealMulti<T>接口,前者的execute()方法參數只有一個任務T,而后者的execute()方法參數為List<T>,本文使用單任務模式。

 

 

package  com.jf.tbsdemo;
  
import  java.util.ArrayList;
import  java.util.Comparator;
import  java.util.Date;
import  java.util.List;
  
import  org.apache.log4j.Logger;
import  org.springframework.stereotype.Component;
  
import  com.jf.tbsdemo.pojo.TaskModel;
import  com.taobao.pamirs.schedule.IScheduleTaskDealSingle;
import  com.taobao.pamirs.schedule.TaskItemDefine;
  
public  class  IScheduleTaskDealSingleTest  implements  IScheduleTaskDealSingle<TaskModel> {
     private  static  final  Logger logger = Logger.getLogger(IScheduleTaskDealSingleTest. class );
  
     public  Comparator<TaskModel> getComparator() {
         return  null ;
     }
  
     public  List<TaskModel> selectTasks(String taskParameter, String ownSign,  int  taskQueueNum,
             List<TaskItemDefine> taskItemList,  int  eachFetchDataNum)  throws  Exception {
  
         logger.info( "IScheduleTaskDealSingleTest選擇任務列表開始.........." );
         List<TaskModel> models =  new  ArrayList<TaskModel>();
         models.add( new  TaskModel( 1 "task1" ));
         models.add( new  TaskModel( 2 "task2" ));
  
         return  models;
     }
  
     public  boolean  execute(TaskModel model, String ownSign)  throws  Exception {
         logger.info( "IScheduleTaskDealSingleTest執行開始.........."  new  Date());
         logger.info( "任務"  + model.getId() +  ",內容:" + model.getTaskInfo());
         return  true ;
     }
}

 

 

其中,selectTasks()方法負責取得要處理的任務信息,execute()方法為處理任務的方法。selectTasks()方法可以理解為生產者,execute()方法可以理解為消費者。

 

14.創建主程序類TaskCenter:

 

package  com.jf.tbsdemo;
  
import  org.apache.log4j.Logger;
import  org.springframework.context.ApplicationContext;
import  org.springframework.context.support.FileSystemXmlApplicationContext;
  
public  class  TaskCenter {
     private  static  final  Logger logger = Logger.getLogger(TaskCenter. class );
  
     public  static  void  main(String[] args)  throws  Exception {
        // 初始化Spring
        ApplicationContext ctx =  new  FileSystemXmlApplicationContext( "classpath:applicationContext.xml" );
        logger.info( "---------------task start------------------" );
     }
}

 

 

15.在Eclipse中運行主程序類TaskCenter

 

16.在TBSchedule中創建任務:

(1)進入TBSchedule的控制台->任務管理

點擊“創建新任務…”

 

(2)配置任務屬性:

  • 在任務處理的SpringBean中輸入:iScheduleTaskDealSingleTest
  • 處理模式分為:SLEEP、NOTSLEEP,其中SLEEP模式是指當一個線程處理完任務后在任務池中取不到其他任務時,會檢查其他線程是否活動,如果是則自己休眠,否則說明自己是最后一位,則調用業務接口取得待處理的任務放入任務池,並喚醒其他線程處理。

NOTSLEEP模式下線程在任務池中取不到任務時,將立即調用業務接口獲取待處理的任務。

SLEEP模式較為簡單,因為取任務的線程同一時間只有一個,不易發生沖突,效率也會較低。NOTSLEEP模式開銷較大,也要防止發生重復獲取相同任務。

  • 設置執行開始時間結束時間:與Crontab格式一致,在本時間段內任務才會執行。
  • 添加任務項:

0,1,2,3,4,5,6,7,8,9

 

17.在TBSchedule中創建調度策略:

(1)進入TBSchedule的控制台->調度策略

點擊“創建新策略…”

 

(2)填寫策略屬性:

注意任務名稱要與新建的任務名稱一致。

 

 

(3)點擊創建,將立即啟動調度任務

 

另外,除了在控制台中配置調度策略、任務,還可以通過通過代碼、Spring配置來設置任務調度參數,推薦采用Spring配置方式。

 

18.代碼方式

創建類TaskCenter:

 

 

package  com.jf.tbsdemo;
  
import  java.util.Properties;
  
import  javax.annotation.Resource;
  
import  org.apache.log4j.Logger;
import  org.springframework.context.ApplicationContext;
import  org.springframework.context.support.FileSystemXmlApplicationContext;
  
import  com.taobao.pamirs.schedule.strategy.ScheduleStrategy;
import  com.taobao.pamirs.schedule.strategy.TBScheduleManagerFactory;
import  com.taobao.pamirs.schedule.taskmanager.ScheduleTaskType;
  
public  class  TaskCenter {
     private  static  final  Logger logger = Logger.getLogger(TaskCenter. class );
  
     // 初始化調度工廠
     @Resource
     TBScheduleManagerFactory scheduleManagerFactory =  new  TBScheduleManagerFactory();
     private  void  startTask() {
        // 初始化Spring
        ApplicationContext ctx =  new  FileSystemXmlApplicationContext( "classpath:applicationContext.xml" );
  
  
        Properties p =  new  Properties();
        p.put( "zkConnectString" "localhost:2181" );
        p.put( "rootPath" "/app-schedule/demo" );
        p.put( "zkSessionTimeout" "60000" );
        p.put( "userName" "admin" );
        p.put( "password" "password" );
        p.put( "isCheckParentPath" "true" );
  
        scheduleManagerFactory.setApplicationContext(ctx);
  
        try  {
            scheduleManagerFactory.init(p);
    
            // 創建任務調度任務的基本信息
            String baseTaskTypeName =  "DemoTask" ;
            ScheduleTaskType baseTaskType =  new  ScheduleTaskType();
            baseTaskType.setBaseTaskType(baseTaskTypeName);
            baseTaskType.setDealBeanName( "demoTaskBean" );
            baseTaskType.setHeartBeatRate( 10000 );
            baseTaskType.setJudgeDeadInterval( 100000 );
            baseTaskType.setTaskParameter( "AREA=BJ,YEAR>30" );
            baseTaskType.setTaskItems(ScheduleTaskType
                   .splitTaskItem( "0:{TYPE=A,KIND=1},1:{TYPE=A,KIND=2},2:{TYPE=A,KIND=3},3:{TYPE=A,KIND=4},"
                          "4:{TYPE=A,KIND=5},5:{TYPE=A,KIND=6},6:{TYPE=A,KIND=7},7:{TYPE=A,KIND=8},"
                          "8:{TYPE=A,KIND=9},9:{TYPE=A,KIND=10}" ));
            baseTaskType.setFetchDataNumber( 500 );
            baseTaskType.setThreadNumber( 5 );
            scheduleManagerFactory.getScheduleDataManager().createBaseTaskType(baseTaskType);
            logger.info( "創建調度任務成功:"  + baseTaskType.toString());
    
            // 創建任務的調度策略
            String taskName = baseTaskTypeName;
            String strategyName = taskName +  "-Strategy" ;
            try  {
                scheduleManagerFactory.getScheduleStrategyManager().deleteMachineStrategy(strategyName,  true );
            catch  (Exception e) {
               e.printStackTrace();
            }
            ScheduleStrategy strategy =  new  ScheduleStrategy();
            strategy.setStrategyName(strategyName);
            strategy.setKind(ScheduleStrategy.Kind.Schedule);
            strategy.setTaskName(taskName);
            strategy.setTaskParameter( "china" );
    
            strategy.setNumOfSingleServer( 1 );
            strategy.setAssignNum( 10 );
            strategy.setIPList( "127.0.0.1" .split( "," ));
            scheduleManagerFactory.getScheduleStrategyManager().createScheduleStrategy(strategy);
  
            logger.info( "創建調度策略成功:"  + strategy.toString());
  
            logger.info( "---------------task start------------------" );
        catch (Exception e) {
            logger.error( "出現異常" , e);
        }
     }
  
     public  static  void  main(String[] args)  throws  Exception {
        TaskCenter taskCenter =  new  TaskCenter();
        taskCenter.startTask();
     }
}

 

 

19.Spring配置文件方式

(1)增加類AbstractBaseScheduleTask:

 

package  com.jf.tbsdemo;
  
import  com.taobao.pamirs.schedule.IScheduleTaskDealSingle;
import  com.taobao.pamirs.schedule.strategy.ScheduleStrategy;
import  com.taobao.pamirs.schedule.taskmanager.ScheduleTaskType;
  
public  abstract  class  AbstractBaseScheduleTask<T>  implements  IScheduleTaskDealSingle<T> {
     /**
      * 調度任務的配置
      */
     private  ScheduleTaskType scheduleTaskType;
     /**
      * 調度策略的配置
      */
     private  ScheduleStrategy scheduleStrategy;
  
     public  ScheduleTaskType getScheduleTaskType() {
         return  scheduleTaskType;
     }
  
     public  void  setScheduleTaskType(ScheduleTaskType scheduleTaskType) {
         this .scheduleTaskType = scheduleTaskType;
     }
  
     public  ScheduleStrategy getScheduleStrategy() {
         return  scheduleStrategy;
     }
  
     public  void  setScheduleStrategy(ScheduleStrategy scheduleStrategy) {
         this .scheduleStrategy = scheduleStrategy;
     }
}

 

 

(2)修改IScheduleTaskDealSingleTest:

類聲明改為:

public class IScheduleTaskDealSingleTest extends AbstractBaseScheduleTask<TaskModel> {

 

(3)在applicationContext.xml中對聲明IScheduleTaskDealSingleTest的Bean並注入參數,內容為:

 

 

<?xml version= "1.0"  encoding= "UTF-8" ?>
     xsi:schemaLocation="http: //www.springframework.org/schema/beans 
                         http: //www.springframework.org/schema/beans/spring-beans-4.0.xsd 
                         http: //www.springframework.org/schema/context 
                         http: //www.springframework.org/schema/context/spring-context-4.0.xsd">
  
     <context:component-scan base- package = "com.jf"  />
     <!-- 引入配置文件 -->
     <bean id= "propertyConfigurer"
        class = "org.springframework.beans.factory.config.PropertyPlaceholderConfigurer" >
        <property name= "locations" >
            <list>
               <value>classpath:tbschedule.properties</value>
            </list>
        </property>
     </bean>
  
     <!--tbschedule管理器初始化(配置zookeeper,注冊調度任務和調度策略)-->
<bean id= "systemTBScheduleManagerFactory"  class = "com.jf.tbsdemo.SystemTBScheduleManagerFactory" >
    <property name= "zkConfig" >
             <map>
                 <entry key= "zkConnectString"  value= "${schedule.zookeeper.address}"  />
               <entry key= "rootPath"  value= "${schedule.root.catalog}"  />
               <entry key= "zkSessionTimeout"  value= "${schedule.timeout}"  />
               <entry key= "userName"  value= "${schedule.username}"  />
               <entry key= "password"  value= "${schedule.password}"  />
               <entry key= "isCheckParentPath"  value= "true"  />
             </map>
         </property>
     </bean>
    
<bean name= "scheduleTaskType"  class = "com.taobao.pamirs.schedule.taskmanager.ScheduleTaskType" >
<!-- 心跳頻率(毫秒) -->
         <property name= "heartBeatRate"  value= "5000"  />
         <!-- 假定服務死亡間隔(毫秒) -->
    <property name= "judgeDeadInterval"  value= "60000"  />
     <!-- 處理模式 -->
         <property name= "processorType"  value= "SLEEP"  />
         <!-- 線程數 -->
         <property name= "threadNumber"  value= "5"  />
<!--允許執行的開始時間-->
         <property name= "permitRunStartTime"  value= ""  />
         <!--允許執行的結束時間-->
         <property name= "permitRunEndTime"  value= ""   />
<!--當沒有數據的時候,休眠的時間-->
<property name= "sleepTimeNoData"  value= "3000"   />
<!--在每次數據處理完后休眠的時間-->
<property name= "sleepTimeInterval"  value= "1000"   />
<!--每次獲取數據的數量-->
<property name= "fetchDataNumber"  value= "10"   />
<!--任務項數組-->
         <property name= "taskItems" >
             <list>
                 <value> 0 :{TYPE=A,KIND= 1 }</value>
                 <value> 1 :{TYPE=B,KIND= 2 }</value>
                 <value> 2 :{TYPE=C,KIND= 3 }</value>
             </list>
</property>
     </bean>
     <bean name= "scheduleStrategy"  class = "com.taobao.pamirs.schedule.strategy.ScheduleStrategy" >
         <!--最大線程組數量-->
         <property name= "assignNum"  value= "9"  />
         <!--單個機器(JVM)的線程組數量-->
         <property name= "numOfSingleServer"  value= "3"  />
         <!--策略運行的機器(JVM)IP-->
         <property name= "IPList" >
             <list>
                 <value> 127.0 . 0.1 </value>
             </list>
         </property>
</bean>
<!--任務simpleTask-->
     <bean id= "simpleTask"  class = "com.jf.tbsdemo.IScheduleTaskDealSingleTest"  >
         <property name= "scheduleTaskType"  ref= "scheduleTaskType"  />
         <property name= "scheduleStrategy"  ref= "scheduleStrategy"  />
     </bean>
</beans>

 

 

(4)打開控制台,刪除所有已有的任務、調度策略,再啟動TaskCenter,刷新頁面則可看到當前出現了正在運行的任務和調度策略。

 

(5)也可以在控制台中修改策略,但重啟TaskCenter之后會恢復Spring中的配置信息。

 

20.數據分片方法

為了避免TBSchedule管理的多線程重復處理數據,需要采用分片,實現方法如下:

(1)在selectTasks()方法中實現分片獲取待處理數據

(2) selectTasks()方法的taskItemList參數為當前線程分配到的可處理任務分片信息,全部任務分片信息由配置文件中的taskItem定義,每項任務信息為TaskItemDefine類型,其中taskItemId標明了分片ID(即0,1,2),parameter為自定義參數(即{TYPE=A,KIND=1},{TYPE=B,KIND=2},{TYPE=C,KIND=3})。

(3)根據上面算出的分片ID來取得相應的待處理任務,例如selectTasks()方法從數據庫中獲取待處理的交易請求記錄,可以將記錄的主鍵或者其他字段HashCode值的余數作為分片ID,在selectTasks()方法中只獲取與taskItemList中指定分片ID相同的任務,避免不同線程重復獲取同一任務。

(4)在系統運行過程中,線程數量會有所變化,因此要在每個selectTasks()方法執行開始先獲取taskItemList。

(5) 每次執行selectTasks()方法取得記錄條數不要超過eachFetchDataNum

(6)典型的分片代碼實現:

 

/**
  * 根據條件,查詢當前調度服務器可處理的任務
  * @param taskParameter 任務的自定義參數
  * @param ownSign 當前環境名稱
  * @param taskItemNum 當前任務類型的任務隊列數量
  * @param taskItemList 當前調度服務器,分配到的可處理隊列
  * @param eachFetchDataNum 每次獲取數據的數量
  * @return
  * @throws Exception
  */
public  List<Date> selectTasks(String taskParameter, String ownSign,  int  taskItemNum, List<TaskItemDefine> taskItemList,  int  eachFetchDataNum)  throws  Exception {
     List<Date> dateList =  new  ArrayList<>();
  
     List<Long> taskIdList =  new  ArrayList<>();
     for (TaskItemDefine t : taskItemList){  //確定當前任務處理器需處理的任務項id
         taskIdList.add(Long.valueOf(t.getTaskItemId()));
     }
  
for ( int  i= 0 ;i<eachFetchDataNum;i++){  // 添加最多指定數量的待處理數據
     Date date =  new  Date();  //生成待處理數據
         Long remainder = date.getTime() % taskItemNum ;
         if (taskIdList.contains(remainder)){   //根據數據取模,判斷當前待處理數據,是否應由當前任務處理器處理
             dateList.add(date);
         }
         TimeUnit.SECONDS.sleep( 1 );
     }
     return  dateList;   //返回當前任務處理器需要處理的數據
}

 

 

21.參數說明

(1)zookeeper參數

zkConnectString:zookeeper注冊中心地址

rootPath:定時任務根目錄,任意指定,調度控制台配置時對應

zkSessionTimeout:超時時間

userName:賬戶,任意指定,調度控制台配置時對應

password:密碼,任意指定,調度控制台配置時對應

isCheckParentPath:設置為true會檢查上級目錄是否已經被用作TBSchedule調度,如果是則啟動任務失敗。

(2)任務參數:

heartBeatRate:心跳頻率(毫秒)

judgeDeadInterval:假定服務死亡間隔(毫秒)

sleepTimeNoData: 當沒有數據的時候,休眠的時間

sleepTimeInterval:在每次數據處理完后休眠的時間

processorType:處理模式,可為SLEEP或NOTSLEEP。

permitRunStartTime:執行開始時間如果為空則不定時,直接執行。

permitRunEndTime:執行結束時間,與執行開始時間之間的時間才可以執行任務。

taskItems: 任務項數組,例如:0:{TYPE=A,KIND=1},1:{TYPE=B,KIND=2},2:{TYPE=C,KIND=3}

在調度過程中,某線程分得了獲取數據的任務,假設獲取第1項任務,則在selectTasks()方法的taskItemList參數中包含第1項任務的信息,TaskItemDefine類型,包含:taskItemId、parameter成員變量,分別為:1、{TYPE=B,KIND=2}。可根據該信息取得相應的數據。

fetchDataNumber:selectTasks()方法每次獲取數據的數量

executeNumber:每次執行數量,即execute()方法每次取得的任務數量,只在bean實現IScheduleTaskDealMulti才生效。

threadNumber:每個線程組中的線程數

maxTaskItemsOfOneThreadGroup:每一組線程能分配的最大任務數量,避免在隨着機器的減少把正常的服務器壓死,0或者空表示不限制

taskParameter:任務的自定義參數,可作為selectTasks中的參數傳入。

(3)調度策略參數:

strategyName:策略名稱,必須填寫,不能有中文和特殊字符。

kind:任務類型,Schedule,Java,Bean 大小寫敏感。

taskName:要配置調度策略的任務名稱,與這一任務配置的名稱要一致。

taskParameter:任務參數,逗號分隔的Key-Value。對任務類型為Java、Bean的有效,對任務類型為Schedule的無效,需要通過任務管理來配置。

assignNum:最大線程組數量,是所有機器(JVM)總共運行的線程組的最大數量。

numOfSingleServer單個機器(JVM)的線程組數量,如果是0,則表示無限制。

IPList:策略運行的機器(JVM)IP列表,127.0.0.1或者localhost會在所有機器上運行。

 

四、注意事項

  1. 如果分配給某線程的任務還未執行完,重啟該線程所屬進程后,這些任務將會丟失,因此要自行實現冪等,且不要直接kill進程,而是發消息通知各線程執行完畢后安全退出。

當在控制台點擊停止任務的按鈕時。會將任務池中未處理的任務清除,而停止前的在處理的任務將繼續執行。

  1. 如果要設置任務間隔一定時間運行一次,假設為10秒,可以將permitRunEndTime、permitRunStartTime設置為空,將sleepTimeNoData、sleepTimeInterval均設置為10000,這樣每個線程運行完畢后不管有沒有任務均休眠10秒。

也可以只設置permitRunStartTime,將permitRunEndTime設置為空或者-1。

  1. 一般來說沒有任務時線程休眠時間間隔較大,而有任務時休眠時間間隔要較小,因此sleepTimeNoData一般都大於sleepTimeInterval。
  2. 使用同一個zookeeper的不同項目如果使用同一個zookeeper實例時,所使用的zookeeper根目錄不能有父子關系,即使是同一項目的不同實例(例如測試環境、開發環境、准生產環境各部署一套實例)也要使用不具有父子關系的不同根目錄。
  3. 任務中配置的每次獲取數據量(fetchDataNumber)要大於10倍的線程數(threadNumber),即:

fetchDataNumber >= threadNumber * 最少循環次數10,否則TBSchedule日志會提示:參數設置不合理,系統性能不佳。

  1. 假定服務死亡間隔judgeDeadInterval至少要大於心跳頻率heartBeatRate的5倍。
  2. 任務配置出錯時,在控制台會對該任務加紅色高亮底色標識。
  3. 當線程組運行出現故障未及時取數時,在控制台會對該線程組加紅色高亮底色標識。
  4. 當運行過程中增加節點或修改配置,日志中可能會出現提示Zookeeper節點不存在的NullPointerException異常,不用理會。

10.理論上單台機器最大線程數為:

線程數threadNumber*單個機器的線程組數量numOfSingleServer,而numOfSingleServer並不是上限,僅有1台機器時,該機器的線程組數量能達到assignNum。

11.TBSchedule給各機器以線程組為單位進行分配,所有機器的線程組總數不會超過最大線程組數量assignNum。

12.一般來說在selectTasks()中獲取任務,然后在execute()方法中處理,在SLEEP處理模式下,最后一個活動線程才會去獲取任務,因此不會出現重復執行任務的情況。但如果在selectTasks()或execute()中再新建線程或線程池來處理任務,會出現新建線程未處理完成,但TBSchedule框架認為已處理結束從而進行下一次獲取任務的操作,可能會重復取出正在處理的任務,因此應盡量避免新建線程和線程池。

13.在selectTasks()中獲取到任務后或者在execute()中處理完任務后應更改狀態,防止下次再次取到,造成重復處理。

14.在SLEEP處理模式下,配置的分片數量應合理,分片較多則同一線程組分配過多分片,對不同分片分別查詢獲取任務則效率會降低,而分片較少則不利於擴展機器。

15.在SLEEP處理模式下,同一時間只會有一個線程執行selectTasks(),其他線程均處於休眠狀態,因此不宜在selectTasks()中進行過多操作而讓其他線程等待時間過長,處理工作應盡量在execute()中進行。或者采用NOTSLEEP模式,讓多個線程可以同時運行selectTasks()獲取不同分片的任務。

NOTSLEEP模式需要實現getComparator(),防止從任務池中取出的某項任務正在被本進程中的其他線程處理。原理是在取任務前先取得正在運行的任務放入maybeRepeatTaskList中,取得任務放入任務池后,再與maybeRepeatTaskList中的每項任務對比。同時取任務時加鎖保證只有一個線程在取任務。

只有在NotSleep模式下getComparator()才有效,在Sleep模式下無效。

執行getComparator()時會遍歷正在處理的任務池。

16.復雜任務可以拆分成多項子任務,並配置不同的策略,為操作最復雜的子任務分配較多線程,從而提高總體的處理效率。

如果不進行拆分,則會有單個線程處理時間較長,並發的線程數較少,處理時間長短不一, 且任務分配不均勻等問題。例如任務為:從FTP中取得不同大小的文件進行解析,將每行數據寫入分庫中。

如果在selectTasks中取得的每個任務對應一個文件,在execute()中處理任務時(解析文件並入庫),效率會非常低。可對解析文件的任務做改造:

改造方案1:在execute()中解析文件后入庫時采用線程池處理。但這樣仍不能解決任務分配不勻的問題,且引入線程池會增加線程數量。尤其是會造成框架錯誤判斷任務已結束,導致重復處理,因此本方案不合理。

改造方案2:將任務拆分為兩個子任務,文件解析和入分庫。

子任務1:在原execute()中對文件解析后不直接入分庫,而是取1000條合成1條記錄存入本地單庫的中間表,解析文件耗時較短且記錄數較少可以較快完成,且時間不均可以忽略。

子任務2:對中間表記錄按照自增主鍵ID分片,selectTasks()中取得記錄,然后拆分成原始單條記錄返回,在execute()中對單條記錄進行入庫處理。

 

改造方案2的線程數較少,且任務分配會比較均勻,同時避免了單線程處理一個大任務等待時間過長的問題。

17.Zookeeper存儲的數據:機器、策略定義、任務定義、任務分片(包含當前屬於哪個機器處理)

18.在zookeeper中每台機器均可保存不同的線程數等配置,說明不同機器可以使用不同的線程數等配置,但不建議使用不同配置。

19.在多任務模式下,executeNumber不要設置的太大,較小一些可以減少等待最后一個活躍線程的時間,並且如果fetchDataNumber<線程數*executeNumber,則會有線程無法分得任務。任務分配在本進程中進行,並不會請求zookeeper,因此設置的較小一些效率更高。

20.當需要重啟應用時,要在控制台上先把機器全部停止,等待線程組消失,否則直接重啟應用時會出現新的機器實例,舊的機器實例未能釋放分片,導致新的機器獲取不到任務分片無法執行,控制台上會顯示新、舊線程組均為紅色。

21.使用同一zookeeper目錄的多台機器中,先啟動的機器一般為leader,負責分片的分配。

22.控制台顯示某線程組紅色異常,長時間未取數時,可能是取任務的selectTasks()運行異常,或者每次取的任務數量過大,導致長時間未會處理完,可以適當調小eachFetchDataNum。

也有可能是因為在SLEEP模式下任務處理時間過長。

23.分片按線程組進行分配,同一機器中有多個線程組時,該機器分得多個分片,也會均勻分配給線程組,每個線程組各自獨立取任務調度,不會同時取任務。

24.當加入新機器時,會請求獲得分片。框架10秒掃描一次,如果發現機器數量有變化,且占用分片較多的機器完成任務則會自動重新分配分片。

25.如果每次從數據庫里取待處理記錄生成任務時,如果總記錄數較多,即使取到的有效記錄數較少,則掃描整張表花費時間較長,除了建立必要的索引,也應該減少無數據時掃描頻次,即降低sleepTimeInterval,也可在selectTasks()中在取到記錄后檢查數量,如果較少則sleep一段時間再返回任務,也應加大sleepTimeNoData。

26.如果任務處理結束后還要合並結果再進入下一輪處理,則最慢的機器會減慢整體速度,因此要盡量保證任務分片均勻分給不同機器,分片數量要能被機器數量整除,也能被最大線程組數量assignNum整除,這樣每台機器處理的任務數量大致相同。

27.在Zookeeper連接配置中保存時提示:

錯誤信息:Zookeeper connecting ......localhost:2181

同時無法進入其他頁面,可能由於采用不同的用戶名密碼配置過同一目錄造zookeeper數據異常,可以在zookeeper中手動刪除目錄數據,或者更換新目錄后重啟應用。

在zookeeper中刪除目錄方法:

[root@192.168.1.5]$ ./zkClient.sh

[zk: localhost:2181(CONNECTED) 0] addauth digest admin:password

[zk: localhost:2181(CONNECTED) 1] rmr /app-schedule/demo

 

在控制台無法修改目錄的賬戶密碼,可在zookeeper客戶端中刪除目錄后重建目錄及賬戶密碼。

 

28.每位用戶登錄控制台后打開的配置信息均保存在bin目錄下的pamirsScheduleConfig.properties,因此在同一Tomcat下操作不同的TBSchedule目錄時會沖突,已修改TBSchedule的代碼解決了這一問題。

29.selectTasks()方法從數據庫中取得記錄時,可以在select語句中對某字段進行mod取余,這樣只獲取本線程組所分配的分片。一般有多個分庫時,同時也會采用mycat,主鍵ID無法采用自增,常用雪花算法來生成不重復的ID,但對這種ID取模一般不容易均勻,因此可增加創建時間戳字段來用於取模,一般各機器取得的任務數較為均勻。

30.如果使用zookeeper集群,則在tbschedule.properties中配置schedule.zookeeper.address時,格式如下:

IP1:Port1,IP2:Port2,IP3:Port3

31.TBSchedule無法實現任務均衡的轉移,即當一台機器處理任務較多,其他機器較閑時,不會轉到其他機器。

32.如果使用數據庫連接池,則單個機器中的線程數量不要比連接池數量大太多,或者不高於,以防出現線程獲取不到數據庫連接的情況出現。

33.Sleep模式在實現邏輯上相對簡單清晰,但存在一個大任務處理時間長,導致其它線程不工作的情況。
在NotSleep模式下,減少了線程休眠的時間,避免大任務阻塞的情況,但為了避免數據被重復處理,增加了CPU在數據比較上的開銷。
同時要求業務接口實現對象的比較接口。
如果對任務處理不允許停頓的情況下建議用NotSleep模式,其它情況建議用Sleep模式。
34.主機編號最小的為Leader,如果是Leader第一次啟動則會清除所有垃圾數據。
35.如果任務是輪詢類型,可將permitRunStartTime、permitRunEndTime均設置為空,將一直運行,可設置sleepTimeNoData、sleepTimeInterval來sleep。

如果要設置在一定時間做內輪詢,則可以同時設置permitRunStartTime、permitRunEndTime,在這一時間段內會執行selectTasks()及execute()。

在到達結束時間時,會將任務池清空,並設置停止運行標志,此時將無法再啟動新的線程運行execute(),因此如果selectTasks()運行時間略長於permitRunEndTime-permitRunStartTime,則execute()可能會永遠都無法被執行到。

例如:permitRunStartTime設置為:0/10 * * * * ?

    permitRunEndTime設置為:5/10 * * * * ?

而selectTasks()執行時間為6秒,則在第6秒時execute()沒有機會被執行。

因此對於輪詢任務,最好將permitRunStartTime、permitRunEndTime均設置為空.

將permitRunEndTime設置為-1與為空作用一致。
36.如果任務是定時任務,則可以只設置permitRunStartTime,而將permitRunEndTime設置為空或-1,這樣在selectTasks()取得任務為空時會sleep(),直到下一個開始時間時才會執行。

例如:permitRunStartTime設置為:0/10 * * * * ?

    permitRunEndTime設置為:-1

則在每10秒的第0秒開始執行selectTasks()取任務,如果取到任務則會交給其他線程執行execute(),如果未取到則會sleep(),直到下一個開始時間時才會執行。

如果只希望同一時間僅有一個線程處理任務,則可以只設置一個分片,並采用SLEEP模式,numOfSingleServer、assignNum均設置為1。

37.每個心跳周期都會向zookeeper更新心跳信息,如果超過judgeDeadInterval(假定服務死亡間隔)未更新過,則清除zookeeper上的任務信息及Server信息。每個心跳周期也會重新分配分片。
也會清除zookeeper中分配給已消失機器上的任務信息。
38.如果有定時任務執行出現故障,或者因重啟錯過了執行時間,如果要在下一次時間點前再次執行,則可以在控制台上臨時增加任務類型、策略,來臨時定時執行一次,月日也加上防止忘記刪除任務導致多次重復執行。執行完成后再刪除該任務類型、策略。
39.有時應用啟動后日志顯示正常,但不執行任務,有可能是zookeeper中數據出現錯誤,可刪除該目錄,重啟應用即可。
40.在控制台上點擊機器的停止按鈕時,會將zookeeper中該機器的運行狀態設置為false,並清除本機器的任務池中未被處理的任務。在每台機器進程中每2秒刷新一次運行狀態,當檢測到false,則在任務執行完畢后不再取任務處理。
41.SystemTBScheduleManagerFactory也可取消,改用@Bean注解,例如:
ScheduleJobConfiguration.java:

 

package  com.jfbank.schedule.monitor.alarm.tbs;
  
import  java.util.HashMap;
import  java.util.Map;
  
import  org.springframework.beans.factory.annotation.Value;
import  org.springframework.context.annotation.Bean;
import  org.springframework.context.annotation.Configuration;
  
import  com.taobao.pamirs.schedule.strategy.TBScheduleManagerFactory;
  
@Configuration 
public  class  ScheduleJobConfiguration{ 
  
     @Bean (initMethod =  "init"
     public  TBScheduleManagerFactory tbScheduleManagerFactory( 
             @Value ( "${schedule.zookeeper.address}" )String zkConnectString,  
             @Value ( "${schedule.root.catalog}" )String rootPath, 
             @Value ( "${schedule.timeout}" )String zkSessionTimeout, 
             @Value ( "${schedule.username}" )String userName, 
             @Value ( "${schedule.password}" )String password, 
             @Value ( "${schedule.isCheckParentPath}" )String isCheckParentPath) { 
         TBScheduleManagerFactory tbScheduleManagerFactory =  new  TBScheduleManagerFactory(); 
         Map<String, String> zkConfig =  new  HashMap<String, String>(); 
         zkConfig.put( "zkConnectString" , zkConnectString); 
         zkConfig.put( "rootPath" , rootPath); 
         zkConfig.put( "zkSessionTimeout" , zkSessionTimeout); 
         zkConfig.put( "userName" , userName); 
         zkConfig.put( "password" , password); 
         zkConfig.put( "isCheckParentPath" , isCheckParentPath); 
         tbScheduleManagerFactory.setZkConfig(zkConfig); 
         return  tbScheduleManagerFactory; 
     }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM