一、TBSchedule簡介
TBSchedule是來自淘寶的分布式調度開源框架,基於Zookeeper純Java實現,其目的是讓一種批量任務或者不斷變化的任務,能夠被動態的分配到多個主機的JVM中的不同線程組中並行執行。所有的任務能夠被不重復,不遺漏的快速處理。這種框架任務的分配通過分片實現了不重復調度,又通過架構中Leader的選擇,存活的自我保證,完成了可用性和伸縮性的保障。
TBSchedule源碼地址:http://code.taobao.org/p/tbschedule/src/
二、開發環境
- WIN10,也可換為Linux
- JDK 1.7
- Tomcat 8.5
- 安裝zookeeper
三、配置步驟
1.安裝zookeeper
(1)下載zookeeper
http://zookeeper.apache.org/releases.html
下載3.4.11版本:
http://mirrors.hust.edu.cn/apache/zookeeper/zookeeper-3.4.11/zookeeper-3.4.11.tar.gz
(2)解壓至c:/prog/zookeeper/zookeeper-3.4.11
復制conf下的zoo_sample.cfg為zoo.cfg
修改dataDir為:
dataDir=/prog/zookeeper/data
tickTime單位為毫秒,為心跳間隔和最小的心跳超時間隔
clientPort是監聽客戶端連接的端口,默認為2181
(3)創建目錄:c:/prog/zookeeper/data
2.啟動zookeeper
運行bin/zkServer.cmd
如果在Linux下,則執行:
[root@192.168.1.5]$ ./zkServer start
3.下載TBSchedule
采用svn來Checkout TBSchedule
svn地址:http://code.taobao.org/svn/tbschedule/
4.在Eclipse中導入項目:
右鍵工程區域(Package Explorer)->Import...->Maven-Existing Maven Projects
注意:TBSchedule編碼為GBK,但引用TBSchedule的工程編碼為UTF-8時,此處也要將TBSchedule工程的編碼設置為UTF-8。
5.安裝Tomcat
(1)下載Tomcat
地址:https://tomcat.apache.org/download-80.cgi#8.5.27
(2)解壓Tomcat 8.5至c:\prog\tomcat\apache-tomcat-8.5.11
6.配置TBSchedule控制台
(1)將TBSchedule工程中的console\ScheduleConsole.war拷貝至tomcat/webapps中
(2)啟動tomcat
(3)瀏覽器中打開:
http://localhost:8080/ScheduleConsole/schedule/index.jsp?manager=true
點擊保存會提示:
錯誤信息:Zookeeper connecting ......localhost:2181
如配置正確則可以忽略上述提示,直接進入“管理主頁...”。
7.查看zookeeper中節點
運行zookeeper下的bin/zkClient.cmd
輸入ls /app-schedule/demo,顯示:
[strategy, baseTaskType, factory]
說明已經創建znode成功。
查看TBSchedule控制台中的“Zookeeper數據”,也能看到相同數據。
8.在項目中使用TBSchedule
Eclipse中新建一個maven工程tbsdemo
GroupId:com.jf
Artifact Id:tbsdemo
9.在pom.xml中引入Spring、TBSchedule、Zookeeper
pom.xml內容為:
<project xmlns=
"http://maven.apache.org/POM/4.0.0"
xmlns:xsi=
"http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation=
"http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
>
<modelVersion>
4.0
.
0
</modelVersion>
<groupId>com.jf</groupId>
<artifactId>tbsdemo</artifactId>
<version>
0.0
.
1
-SNAPSHOT</version>
<packaging>jar</packaging>
<name>tbsdemo</name>
<url>http:
//maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-
8
</project.build.sourceEncoding>
<!-- spring版本號 -->
<spring.version>
4.0
.
5
.RELEASE</spring.version>
<!-- mybatis版本號 -->
<mybatis.version>
3.3
.
0
</mybatis.version>
<!-- log4j日志文件管理包版本 -->
<slf4j.version>
1.7
.
7
</slf4j.version>
<log4j.version>
1.2
.
17
</log4j.version>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>
4.11
</version>
<scope>test</scope>
</dependency>
<!-- spring核心包 -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>
3.4
.
11
</version>
</dependency>
<dependency>
<groupId>com.taobao.pamirs.schedule</groupId>
<artifactId>tbschedule</artifactId>
<version>
3.3
.
3.2
</version>
</dependency>
</dependencies>
</project>
|
10.在src/main/resources下創建applicationContext.xml,輸入:
<?xml version=
"1.0"
encoding=
"UTF-8"
?>
<beans xmlns=
"http://www.springframework.org/schema/beans"
xmlns:xsi=
"http://www.w3.org/2001/XMLSchema-instance"
xmlns:p=
"http://www.springframework.org/schema/p"
xsi:schemaLocation="http:
//www.springframework.org/schema/beans
http:
//www.springframework.org/schema/beans/spring-beans-4.0.xsd
http:
//www.springframework.org/schema/context
http:
//www.springframework.org/schema/context/spring-context-4.0.xsd">
<context:component-scan base-
package
=
"com.jf"
/>
<!-- 引入配置文件 -->
<bean id=
"propertyConfigurer"
class
=
"org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"
>
<property name=
"locations"
>
<list>
<value>classpath:tbschedule.properties</value>
</list>
</property>
</bean>
<bean id=
"scheduleManagerFactory"
class
=
"com.taobao.pamirs.schedule.strategy.TBScheduleManagerFactory"
init-method=
"init"
>
<property name=
"zkConfig"
>
<map>
<entry key=
"zkConnectString"
value=
"${schedule.zookeeper.address}"
/>
<entry key=
"rootPath"
value=
"${schedule.root.catalog}"
/>
<entry key=
"zkSessionTimeout"
value=
"${schedule.timeout}"
/>
<entry key=
"userName"
value=
"${schedule.username}"
/>
<entry key=
"password"
value=
"${schedule.password}"
/>
<entry key=
"isCheckParentPath"
value=
"true"
/>
</map>
</property>
</bean>
</beans>
|
11.創建TBSchedule配置文件
在src/main/resources/中創建tbschedule.propertie
輸入:
#注冊中心地址
schedule.zookeeper.address=localhost:
2181
#定時任務根目錄,任意指定,調度控制台配置時對應
schedule.root.catalog=/app-schedule/demo
#賬戶,任意指定,調度控制台配置時對應
schedule.username=admin
#密碼,任意指定,調度控制台配置時對應
schedule.password=password
#超時配置
schedule.timeout=
60000
|
注意schedule.username、schedule.password要與TBSchedule控制台中設置的一致。
12.創建任務數據類TaskModel:
package
com.jf.tbsdemo.pojo;
public
class
TaskModel {
private
long
id;
private
String taskInfo;
public
TaskModel(
long
id, String taskInfo) {
this
.id = id;
this
.taskInfo = taskInfo;
}
public
long
getId() {
return
id;
}
public
void
setId(
long
id) {
this
.id = id;
}
public
String getTaskInfo() {
return
taskInfo;
}
public
void
setTaskInfo(String taskInfo) {
this
.taskInfo = taskInfo;
}
}
|
13.創建任務處理類IScheduleTaskDealSingleTest:
注意:任務處理分單任務和多任務(批處理),分別實現IScheduleTaskDealSingle<T>、IScheduleTaskDealMulti<T>接口,前者的execute()方法參數只有一個任務T,而后者的execute()方法參數為List<T>,本文使用單任務模式。
package
com.jf.tbsdemo;
import
java.util.ArrayList;
import
java.util.Comparator;
import
java.util.Date;
import
java.util.List;
import
org.apache.log4j.Logger;
import
org.springframework.stereotype.Component;
import
com.jf.tbsdemo.pojo.TaskModel;
import
com.taobao.pamirs.schedule.IScheduleTaskDealSingle;
import
com.taobao.pamirs.schedule.TaskItemDefine;
public
class
IScheduleTaskDealSingleTest
implements
IScheduleTaskDealSingle<TaskModel> {
private
static
final
Logger logger = Logger.getLogger(IScheduleTaskDealSingleTest.
class
);
public
Comparator<TaskModel> getComparator() {
return
null
;
}
public
List<TaskModel> selectTasks(String taskParameter, String ownSign,
int
taskQueueNum,
List<TaskItemDefine> taskItemList,
int
eachFetchDataNum)
throws
Exception {
logger.info(
"IScheduleTaskDealSingleTest選擇任務列表開始.........."
);
List<TaskModel> models =
new
ArrayList<TaskModel>();
models.add(
new
TaskModel(
1
,
"task1"
));
models.add(
new
TaskModel(
2
,
"task2"
));
return
models;
}
public
boolean
execute(TaskModel model, String ownSign)
throws
Exception {
logger.info(
"IScheduleTaskDealSingleTest執行開始.........."
+
new
Date());
logger.info(
"任務"
+ model.getId() +
",內容:"
+ model.getTaskInfo());
return
true
;
}
}
|
其中,selectTasks()方法負責取得要處理的任務信息,execute()方法為處理任務的方法。selectTasks()方法可以理解為生產者,execute()方法可以理解為消費者。
14.創建主程序類TaskCenter:
package
com.jf.tbsdemo;
import
org.apache.log4j.Logger;
import
org.springframework.context.ApplicationContext;
import
org.springframework.context.support.FileSystemXmlApplicationContext;
public
class
TaskCenter {
private
static
final
Logger logger = Logger.getLogger(TaskCenter.
class
);
public
static
void
main(String[] args)
throws
Exception {
// 初始化Spring
ApplicationContext ctx =
new
FileSystemXmlApplicationContext(
"classpath:applicationContext.xml"
);
logger.info(
"---------------task start------------------"
);
}
}
|
15.在Eclipse中運行主程序類TaskCenter
16.在TBSchedule中創建任務:
(1)進入TBSchedule的控制台->任務管理
點擊“創建新任務…”
(2)配置任務屬性:
- 在任務處理的SpringBean中輸入:iScheduleTaskDealSingleTest
- 處理模式分為:SLEEP、NOTSLEEP,其中SLEEP模式是指當一個線程處理完任務后在任務池中取不到其他任務時,會檢查其他線程是否活動,如果是則自己休眠,否則說明自己是最后一位,則調用業務接口取得待處理的任務放入任務池,並喚醒其他線程處理。
NOTSLEEP模式下線程在任務池中取不到任務時,將立即調用業務接口獲取待處理的任務。
SLEEP模式較為簡單,因為取任務的線程同一時間只有一個,不易發生沖突,效率也會較低。NOTSLEEP模式開銷較大,也要防止發生重復獲取相同任務。
- 設置執行開始時間結束時間:與Crontab格式一致,在本時間段內任務才會執行。
- 添加任務項:
0,1,2,3,4,5,6,7,8,9
17.在TBSchedule中創建調度策略:
(1)進入TBSchedule的控制台->調度策略
點擊“創建新策略…”
(2)填寫策略屬性:
注意任務名稱要與新建的任務名稱一致。
(3)點擊創建,將立即啟動調度任務
另外,除了在控制台中配置調度策略、任務,還可以通過通過代碼、Spring配置來設置任務調度參數,推薦采用Spring配置方式。
18.代碼方式
創建類TaskCenter:
package
com.jf.tbsdemo;
import
java.util.Properties;
import
javax.annotation.Resource;
import
org.apache.log4j.Logger;
import
org.springframework.context.ApplicationContext;
import
org.springframework.context.support.FileSystemXmlApplicationContext;
import
com.taobao.pamirs.schedule.strategy.ScheduleStrategy;
import
com.taobao.pamirs.schedule.strategy.TBScheduleManagerFactory;
import
com.taobao.pamirs.schedule.taskmanager.ScheduleTaskType;
public
class
TaskCenter {
private
static
final
Logger logger = Logger.getLogger(TaskCenter.
class
);
// 初始化調度工廠
@Resource
TBScheduleManagerFactory scheduleManagerFactory =
new
TBScheduleManagerFactory();
private
void
startTask() {
// 初始化Spring
ApplicationContext ctx =
new
FileSystemXmlApplicationContext(
"classpath:applicationContext.xml"
);
Properties p =
new
Properties();
p.put(
"zkConnectString"
,
"localhost:2181"
);
p.put(
"rootPath"
,
"/app-schedule/demo"
);
p.put(
"zkSessionTimeout"
,
"60000"
);
p.put(
"userName"
,
"admin"
);
p.put(
"password"
,
"password"
);
p.put(
"isCheckParentPath"
,
"true"
);
scheduleManagerFactory.setApplicationContext(ctx);
try
{
scheduleManagerFactory.init(p);
// 創建任務調度任務的基本信息
String baseTaskTypeName =
"DemoTask"
;
ScheduleTaskType baseTaskType =
new
ScheduleTaskType();
baseTaskType.setBaseTaskType(baseTaskTypeName);
baseTaskType.setDealBeanName(
"demoTaskBean"
);
baseTaskType.setHeartBeatRate(
10000
);
baseTaskType.setJudgeDeadInterval(
100000
);
baseTaskType.setTaskParameter(
"AREA=BJ,YEAR>30"
);
baseTaskType.setTaskItems(ScheduleTaskType
.splitTaskItem(
"0:{TYPE=A,KIND=1},1:{TYPE=A,KIND=2},2:{TYPE=A,KIND=3},3:{TYPE=A,KIND=4},"
+
"4:{TYPE=A,KIND=5},5:{TYPE=A,KIND=6},6:{TYPE=A,KIND=7},7:{TYPE=A,KIND=8},"
+
"8:{TYPE=A,KIND=9},9:{TYPE=A,KIND=10}"
));
baseTaskType.setFetchDataNumber(
500
);
baseTaskType.setThreadNumber(
5
);
scheduleManagerFactory.getScheduleDataManager().createBaseTaskType(baseTaskType);
logger.info(
"創建調度任務成功:"
+ baseTaskType.toString());
// 創建任務的調度策略
String taskName = baseTaskTypeName;
String strategyName = taskName +
"-Strategy"
;
try
{
scheduleManagerFactory.getScheduleStrategyManager().deleteMachineStrategy(strategyName,
true
);
}
catch
(Exception e) {
e.printStackTrace();
}
ScheduleStrategy strategy =
new
ScheduleStrategy();
strategy.setStrategyName(strategyName);
strategy.setKind(ScheduleStrategy.Kind.Schedule);
strategy.setTaskName(taskName);
strategy.setTaskParameter(
"china"
);
strategy.setNumOfSingleServer(
1
);
strategy.setAssignNum(
10
);
strategy.setIPList(
"127.0.0.1"
.split(
","
));
scheduleManagerFactory.getScheduleStrategyManager().createScheduleStrategy(strategy);
logger.info(
"創建調度策略成功:"
+ strategy.toString());
logger.info(
"---------------task start------------------"
);
}
catch
(Exception e) {
logger.error(
"出現異常"
, e);
}
}
public
static
void
main(String[] args)
throws
Exception {
TaskCenter taskCenter =
new
TaskCenter();
taskCenter.startTask();
}
}
|
19.Spring配置文件方式
(1)增加類AbstractBaseScheduleTask:
package
com.jf.tbsdemo;
import
com.taobao.pamirs.schedule.IScheduleTaskDealSingle;
import
com.taobao.pamirs.schedule.strategy.ScheduleStrategy;
import
com.taobao.pamirs.schedule.taskmanager.ScheduleTaskType;
public
abstract
class
AbstractBaseScheduleTask<T>
implements
IScheduleTaskDealSingle<T> {
/**
* 調度任務的配置
*/
private
ScheduleTaskType scheduleTaskType;
/**
* 調度策略的配置
*/
private
ScheduleStrategy scheduleStrategy;
public
ScheduleTaskType getScheduleTaskType() {
return
scheduleTaskType;
}
public
void
setScheduleTaskType(ScheduleTaskType scheduleTaskType) {
this
.scheduleTaskType = scheduleTaskType;
}
public
ScheduleStrategy getScheduleStrategy() {
return
scheduleStrategy;
}
public
void
setScheduleStrategy(ScheduleStrategy scheduleStrategy) {
this
.scheduleStrategy = scheduleStrategy;
}
}
|
(2)修改IScheduleTaskDealSingleTest:
類聲明改為:
public class IScheduleTaskDealSingleTest extends AbstractBaseScheduleTask<TaskModel> {
(3)在applicationContext.xml中對聲明IScheduleTaskDealSingleTest的Bean並注入參數,內容為:
<?xml version=
"1.0"
encoding=
"UTF-8"
?>
<beans xmlns=
"http://www.springframework.org/schema/beans"
xsi:schemaLocation="http:
//www.springframework.org/schema/beans
http:
//www.springframework.org/schema/beans/spring-beans-4.0.xsd
http:
//www.springframework.org/schema/context
http:
//www.springframework.org/schema/context/spring-context-4.0.xsd">
<context:component-scan base-
package
=
"com.jf"
/>
<!-- 引入配置文件 -->
<bean id=
"propertyConfigurer"
class
=
"org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"
>
<property name=
"locations"
>
<list>
<value>classpath:tbschedule.properties</value>
</list>
</property>
</bean>
<!--tbschedule管理器初始化(配置zookeeper,注冊調度任務和調度策略)-->
<bean id=
"systemTBScheduleManagerFactory"
class
=
"com.jf.tbsdemo.SystemTBScheduleManagerFactory"
>
<property name=
"zkConfig"
>
<map>
<entry key=
"zkConnectString"
value=
"${schedule.zookeeper.address}"
/>
<entry key=
"rootPath"
value=
"${schedule.root.catalog}"
/>
<entry key=
"zkSessionTimeout"
value=
"${schedule.timeout}"
/>
<entry key=
"userName"
value=
"${schedule.username}"
/>
<entry key=
"password"
value=
"${schedule.password}"
/>
<entry key=
"isCheckParentPath"
value=
"true"
/>
</map>
</property>
</bean>
<bean name=
"scheduleTaskType"
class
=
"com.taobao.pamirs.schedule.taskmanager.ScheduleTaskType"
>
<!-- 心跳頻率(毫秒) -->
<property name=
"heartBeatRate"
value=
"5000"
/>
<!-- 假定服務死亡間隔(毫秒) -->
<property name=
"judgeDeadInterval"
value=
"60000"
/>
<!-- 處理模式 -->
<property name=
"processorType"
value=
"SLEEP"
/>
<!-- 線程數 -->
<property name=
"threadNumber"
value=
"5"
/>
<!--允許執行的開始時間-->
<property name=
"permitRunStartTime"
value=
""
/>
<!--允許執行的結束時間-->
<property name=
"permitRunEndTime"
value=
""
/>
<!--當沒有數據的時候,休眠的時間-->
<property name=
"sleepTimeNoData"
value=
"3000"
/>
<!--在每次數據處理完后休眠的時間-->
<property name=
"sleepTimeInterval"
value=
"1000"
/>
<!--每次獲取數據的數量-->
<property name=
"fetchDataNumber"
value=
"10"
/>
<!--任務項數組-->
<property name=
"taskItems"
>
<list>
<value>
0
:{TYPE=A,KIND=
1
}</value>
<value>
1
:{TYPE=B,KIND=
2
}</value>
<value>
2
:{TYPE=C,KIND=
3
}</value>
</list>
</property>
</bean>
<bean name=
"scheduleStrategy"
class
=
"com.taobao.pamirs.schedule.strategy.ScheduleStrategy"
>
<!--最大線程組數量-->
<property name=
"assignNum"
value=
"9"
/>
<!--單個機器(JVM)的線程組數量-->
<property name=
"numOfSingleServer"
value=
"3"
/>
<!--策略運行的機器(JVM)IP-->
<property name=
"IPList"
>
<list>
<value>
127.0
.
0.1
</value>
</list>
</property>
</bean>
<!--任務simpleTask-->
<bean id=
"simpleTask"
class
=
"com.jf.tbsdemo.IScheduleTaskDealSingleTest"
>
<property name=
"scheduleTaskType"
ref=
"scheduleTaskType"
/>
<property name=
"scheduleStrategy"
ref=
"scheduleStrategy"
/>
</bean>
</beans>
|
(4)打開控制台,刪除所有已有的任務、調度策略,再啟動TaskCenter,刷新頁面則可看到當前出現了正在運行的任務和調度策略。
(5)也可以在控制台中修改策略,但重啟TaskCenter之后會恢復Spring中的配置信息。
20.數據分片方法
為了避免TBSchedule管理的多線程重復處理數據,需要采用分片,實現方法如下:
(1)在selectTasks()方法中實現分片獲取待處理數據
(2) selectTasks()方法的taskItemList參數為當前線程分配到的可處理任務分片信息,全部任務分片信息由配置文件中的taskItem定義,每項任務信息為TaskItemDefine類型,其中taskItemId標明了分片ID(即0,1,2),parameter為自定義參數(即{TYPE=A,KIND=1},{TYPE=B,KIND=2},{TYPE=C,KIND=3})。
(3)根據上面算出的分片ID來取得相應的待處理任務,例如selectTasks()方法從數據庫中獲取待處理的交易請求記錄,可以將記錄的主鍵或者其他字段HashCode值的余數作為分片ID,在selectTasks()方法中只獲取與taskItemList中指定分片ID相同的任務,避免不同線程重復獲取同一任務。
(4)在系統運行過程中,線程數量會有所變化,因此要在每個selectTasks()方法執行開始先獲取taskItemList。
(5) 每次執行selectTasks()方法取得記錄條數不要超過eachFetchDataNum
(6)典型的分片代碼實現:
/**
* 根據條件,查詢當前調度服務器可處理的任務
* @param taskParameter 任務的自定義參數
* @param ownSign 當前環境名稱
* @param taskItemNum 當前任務類型的任務隊列數量
* @param taskItemList 當前調度服務器,分配到的可處理隊列
* @param eachFetchDataNum 每次獲取數據的數量
* @return
* @throws Exception
*/
public
List<Date> selectTasks(String taskParameter, String ownSign,
int
taskItemNum, List<TaskItemDefine> taskItemList,
int
eachFetchDataNum)
throws
Exception {
List<Date> dateList =
new
ArrayList<>();
List<Long> taskIdList =
new
ArrayList<>();
for
(TaskItemDefine t : taskItemList){
//確定當前任務處理器需處理的任務項id
taskIdList.add(Long.valueOf(t.getTaskItemId()));
}
for
(
int
i=
0
;i<eachFetchDataNum;i++){
// 添加最多指定數量的待處理數據
Date date =
new
Date();
//生成待處理數據
Long remainder = date.getTime() % taskItemNum ;
if
(taskIdList.contains(remainder)){
//根據數據取模,判斷當前待處理數據,是否應由當前任務處理器處理
dateList.add(date);
}
TimeUnit.SECONDS.sleep(
1
);
}
return
dateList;
//返回當前任務處理器需要處理的數據
}
|
21.參數說明
(1)zookeeper參數
zkConnectString:zookeeper注冊中心地址
rootPath:定時任務根目錄,任意指定,調度控制台配置時對應
zkSessionTimeout:超時時間
userName:賬戶,任意指定,調度控制台配置時對應
password:密碼,任意指定,調度控制台配置時對應
isCheckParentPath:設置為true會檢查上級目錄是否已經被用作TBSchedule調度,如果是則啟動任務失敗。
(2)任務參數:
heartBeatRate:心跳頻率(毫秒)
judgeDeadInterval:假定服務死亡間隔(毫秒)
sleepTimeNoData: 當沒有數據的時候,休眠的時間
sleepTimeInterval:在每次數據處理完后休眠的時間
processorType:處理模式,可為SLEEP或NOTSLEEP。
permitRunStartTime:執行開始時間如果為空則不定時,直接執行。
permitRunEndTime:執行結束時間,與執行開始時間之間的時間才可以執行任務。
taskItems: 任務項數組,例如:0:{TYPE=A,KIND=1},1:{TYPE=B,KIND=2},2:{TYPE=C,KIND=3}
在調度過程中,某線程分得了獲取數據的任務,假設獲取第1項任務,則在selectTasks()方法的taskItemList參數中包含第1項任務的信息,TaskItemDefine類型,包含:taskItemId、parameter成員變量,分別為:1、{TYPE=B,KIND=2}。可根據該信息取得相應的數據。
fetchDataNumber:selectTasks()方法每次獲取數據的數量
executeNumber:每次執行數量,即execute()方法每次取得的任務數量,只在bean實現IScheduleTaskDealMulti才生效。
threadNumber:每個線程組中的線程數
maxTaskItemsOfOneThreadGroup:每一組線程能分配的最大任務數量,避免在隨着機器的減少把正常的服務器壓死,0或者空表示不限制
taskParameter:任務的自定義參數,可作為selectTasks中的參數傳入。
(3)調度策略參數:
strategyName:策略名稱,必須填寫,不能有中文和特殊字符。
kind:任務類型,Schedule,Java,Bean 大小寫敏感。
taskName:要配置調度策略的任務名稱,與這一任務配置的名稱要一致。
taskParameter:任務參數,逗號分隔的Key-Value。對任務類型為Java、Bean的有效,對任務類型為Schedule的無效,需要通過任務管理來配置。
assignNum:最大線程組數量,是所有機器(JVM)總共運行的線程組的最大數量。
numOfSingleServer單個機器(JVM)的線程組數量,如果是0,則表示無限制。
IPList:策略運行的機器(JVM)IP列表,127.0.0.1或者localhost會在所有機器上運行。
四、注意事項
- 如果分配給某線程的任務還未執行完,重啟該線程所屬進程后,這些任務將會丟失,因此要自行實現冪等,且不要直接kill進程,而是發消息通知各線程執行完畢后安全退出。
當在控制台點擊停止任務的按鈕時。會將任務池中未處理的任務清除,而停止前的在處理的任務將繼續執行。
- 如果要設置任務間隔一定時間運行一次,假設為10秒,可以將permitRunEndTime、permitRunStartTime設置為空,將sleepTimeNoData、sleepTimeInterval均設置為10000,這樣每個線程運行完畢后不管有沒有任務均休眠10秒。
也可以只設置permitRunStartTime,將permitRunEndTime設置為空或者-1。
- 一般來說沒有任務時線程休眠時間間隔較大,而有任務時休眠時間間隔要較小,因此sleepTimeNoData一般都大於sleepTimeInterval。
- 使用同一個zookeeper的不同項目如果使用同一個zookeeper實例時,所使用的zookeeper根目錄不能有父子關系,即使是同一項目的不同實例(例如測試環境、開發環境、准生產環境各部署一套實例)也要使用不具有父子關系的不同根目錄。
- 任務中配置的每次獲取數據量(fetchDataNumber)要大於10倍的線程數(threadNumber),即:
fetchDataNumber >= threadNumber * 最少循環次數10,否則TBSchedule日志會提示:參數設置不合理,系統性能不佳。
- 假定服務死亡間隔judgeDeadInterval至少要大於心跳頻率heartBeatRate的5倍。
- 任務配置出錯時,在控制台會對該任務加紅色高亮底色標識。
- 當線程組運行出現故障未及時取數時,在控制台會對該線程組加紅色高亮底色標識。
- 當運行過程中增加節點或修改配置,日志中可能會出現提示Zookeeper節點不存在的NullPointerException異常,不用理會。
10.理論上單台機器最大線程數為:
線程數threadNumber*單個機器的線程組數量numOfSingleServer,而numOfSingleServer並不是上限,僅有1台機器時,該機器的線程組數量能達到assignNum。
11.TBSchedule給各機器以線程組為單位進行分配,所有機器的線程組總數不會超過最大線程組數量assignNum。
12.一般來說在selectTasks()中獲取任務,然后在execute()方法中處理,在SLEEP處理模式下,最后一個活動線程才會去獲取任務,因此不會出現重復執行任務的情況。但如果在selectTasks()或execute()中再新建線程或線程池來處理任務,會出現新建線程未處理完成,但TBSchedule框架認為已處理結束從而進行下一次獲取任務的操作,可能會重復取出正在處理的任務,因此應盡量避免新建線程和線程池。
13.在selectTasks()中獲取到任務后或者在execute()中處理完任務后應更改狀態,防止下次再次取到,造成重復處理。
14.在SLEEP處理模式下,配置的分片數量應合理,分片較多則同一線程組分配過多分片,對不同分片分別查詢獲取任務則效率會降低,而分片較少則不利於擴展機器。
15.在SLEEP處理模式下,同一時間只會有一個線程執行selectTasks(),其他線程均處於休眠狀態,因此不宜在selectTasks()中進行過多操作而讓其他線程等待時間過長,處理工作應盡量在execute()中進行。或者采用NOTSLEEP模式,讓多個線程可以同時運行selectTasks()獲取不同分片的任務。
NOTSLEEP模式需要實現getComparator(),防止從任務池中取出的某項任務正在被本進程中的其他線程處理。原理是在取任務前先取得正在運行的任務放入maybeRepeatTaskList中,取得任務放入任務池后,再與maybeRepeatTaskList中的每項任務對比。同時取任務時加鎖保證只有一個線程在取任務。
只有在NotSleep模式下getComparator()才有效,在Sleep模式下無效。
執行getComparator()時會遍歷正在處理的任務池。
16.復雜任務可以拆分成多項子任務,並配置不同的策略,為操作最復雜的子任務分配較多線程,從而提高總體的處理效率。
如果不進行拆分,則會有單個線程處理時間較長,並發的線程數較少,處理時間長短不一, 且任務分配不均勻等問題。例如任務為:從FTP中取得不同大小的文件進行解析,將每行數據寫入分庫中。
如果在selectTasks中取得的每個任務對應一個文件,在execute()中處理任務時(解析文件並入庫),效率會非常低。可對解析文件的任務做改造:
改造方案1:在execute()中解析文件后入庫時采用線程池處理。但這樣仍不能解決任務分配不勻的問題,且引入線程池會增加線程數量。尤其是會造成框架錯誤判斷任務已結束,導致重復處理,因此本方案不合理。
改造方案2:將任務拆分為兩個子任務,文件解析和入分庫。
子任務1:在原execute()中對文件解析后不直接入分庫,而是取1000條合成1條記錄存入本地單庫的中間表,解析文件耗時較短且記錄數較少可以較快完成,且時間不均可以忽略。
子任務2:對中間表記錄按照自增主鍵ID分片,selectTasks()中取得記錄,然后拆分成原始單條記錄返回,在execute()中對單條記錄進行入庫處理。
改造方案2的線程數較少,且任務分配會比較均勻,同時避免了單線程處理一個大任務等待時間過長的問題。
17.Zookeeper存儲的數據:機器、策略定義、任務定義、任務分片(包含當前屬於哪個機器處理)
18.在zookeeper中每台機器均可保存不同的線程數等配置,說明不同機器可以使用不同的線程數等配置,但不建議使用不同配置。
19.在多任務模式下,executeNumber不要設置的太大,較小一些可以減少等待最后一個活躍線程的時間,並且如果fetchDataNumber<線程數*executeNumber,則會有線程無法分得任務。任務分配在本進程中進行,並不會請求zookeeper,因此設置的較小一些效率更高。
20.當需要重啟應用時,要在控制台上先把機器全部停止,等待線程組消失,否則直接重啟應用時會出現新的機器實例,舊的機器實例未能釋放分片,導致新的機器獲取不到任務分片無法執行,控制台上會顯示新、舊線程組均為紅色。
21.使用同一zookeeper目錄的多台機器中,先啟動的機器一般為leader,負責分片的分配。
22.控制台顯示某線程組紅色異常,長時間未取數時,可能是取任務的selectTasks()運行異常,或者每次取的任務數量過大,導致長時間未會處理完,可以適當調小eachFetchDataNum。
也有可能是因為在SLEEP模式下任務處理時間過長。
23.分片按線程組進行分配,同一機器中有多個線程組時,該機器分得多個分片,也會均勻分配給線程組,每個線程組各自獨立取任務調度,不會同時取任務。
24.當加入新機器時,會請求獲得分片。框架10秒掃描一次,如果發現機器數量有變化,且占用分片較多的機器完成任務則會自動重新分配分片。
25.如果每次從數據庫里取待處理記錄生成任務時,如果總記錄數較多,即使取到的有效記錄數較少,則掃描整張表花費時間較長,除了建立必要的索引,也應該減少無數據時掃描頻次,即降低sleepTimeInterval,也可在selectTasks()中在取到記錄后檢查數量,如果較少則sleep一段時間再返回任務,也應加大sleepTimeNoData。
26.如果任務處理結束后還要合並結果再進入下一輪處理,則最慢的機器會減慢整體速度,因此要盡量保證任務分片均勻分給不同機器,分片數量要能被機器數量整除,也能被最大線程組數量assignNum整除,這樣每台機器處理的任務數量大致相同。
27.在Zookeeper連接配置中保存時提示:
錯誤信息:Zookeeper connecting ......localhost:2181
同時無法進入其他頁面,可能由於采用不同的用戶名密碼配置過同一目錄造zookeeper數據異常,可以在zookeeper中手動刪除目錄數據,或者更換新目錄后重啟應用。
在zookeeper中刪除目錄方法:
[root@192.168.1.5]$ ./zkClient.sh
[zk: localhost:2181(CONNECTED) 0] addauth digest admin:password
[zk: localhost:2181(CONNECTED) 1] rmr /app-schedule/demo
在控制台無法修改目錄的賬戶密碼,可在zookeeper客戶端中刪除目錄后重建目錄及賬戶密碼。
28.每位用戶登錄控制台后打開的配置信息均保存在bin目錄下的pamirsScheduleConfig.properties,因此在同一Tomcat下操作不同的TBSchedule目錄時會沖突,已修改TBSchedule的代碼解決了這一問題。
29.selectTasks()方法從數據庫中取得記錄時,可以在select語句中對某字段進行mod取余,這樣只獲取本線程組所分配的分片。一般有多個分庫時,同時也會采用mycat,主鍵ID無法采用自增,常用雪花算法來生成不重復的ID,但對這種ID取模一般不容易均勻,因此可增加創建時間戳字段來用於取模,一般各機器取得的任務數較為均勻。
30.如果使用zookeeper集群,則在tbschedule.properties中配置schedule.zookeeper.address時,格式如下:
IP1:Port1,IP2:Port2,IP3:Port3
31.TBSchedule無法實現任務均衡的轉移,即當一台機器處理任務較多,其他機器較閑時,不會轉到其他機器。
32.如果使用數據庫連接池,則單個機器中的線程數量不要比連接池數量大太多,或者不高於,以防出現線程獲取不到數據庫連接的情況出現。
33.Sleep模式在實現邏輯上相對簡單清晰,但存在一個大任務處理時間長,導致其它線程不工作的情況。
在NotSleep模式下,減少了線程休眠的時間,避免大任務阻塞的情況,但為了避免數據被重復處理,增加了CPU在數據比較上的開銷。
同時要求業務接口實現對象的比較接口。
如果對任務處理不允許停頓的情況下建議用NotSleep模式,其它情況建議用Sleep模式。
34.主機編號最小的為Leader,如果是Leader第一次啟動則會清除所有垃圾數據。
35.如果任務是輪詢類型,可將permitRunStartTime、permitRunEndTime均設置為空,將一直運行,可設置sleepTimeNoData、sleepTimeInterval來sleep。
如果要設置在一定時間做內輪詢,則可以同時設置permitRunStartTime、permitRunEndTime,在這一時間段內會執行selectTasks()及execute()。
在到達結束時間時,會將任務池清空,並設置停止運行標志,此時將無法再啟動新的線程運行execute(),因此如果selectTasks()運行時間略長於permitRunEndTime-permitRunStartTime,則execute()可能會永遠都無法被執行到。
例如:permitRunStartTime設置為:0/10 * * * * ?
permitRunEndTime設置為:5/10 * * * * ?
而selectTasks()執行時間為6秒,則在第6秒時execute()沒有機會被執行。
因此對於輪詢任務,最好將permitRunStartTime、permitRunEndTime均設置為空.
將permitRunEndTime設置為-1與為空作用一致。
36.如果任務是定時任務,則可以只設置permitRunStartTime,而將permitRunEndTime設置為空或-1,這樣在selectTasks()取得任務為空時會sleep(),直到下一個開始時間時才會執行。
例如:permitRunStartTime設置為:0/10 * * * * ?
permitRunEndTime設置為:-1
則在每10秒的第0秒開始執行selectTasks()取任務,如果取到任務則會交給其他線程執行execute(),如果未取到則會sleep(),直到下一個開始時間時才會執行。
如果只希望同一時間僅有一個線程處理任務,則可以只設置一個分片,並采用SLEEP模式,numOfSingleServer、assignNum均設置為1。
37.每個心跳周期都會向zookeeper更新心跳信息,如果超過judgeDeadInterval(假定服務死亡間隔)未更新過,則清除zookeeper上的任務信息及Server信息。每個心跳周期也會重新分配分片。
也會清除zookeeper中分配給已消失機器上的任務信息。
38.如果有定時任務執行出現故障,或者因重啟錯過了執行時間,如果要在下一次時間點前再次執行,則可以在控制台上臨時增加任務類型、策略,來臨時定時執行一次,月日也加上防止忘記刪除任務導致多次重復執行。執行完成后再刪除該任務類型、策略。
39.有時應用啟動后日志顯示正常,但不執行任務,有可能是zookeeper中數據出現錯誤,可刪除該目錄,重啟應用即可。
40.在控制台上點擊機器的停止按鈕時,會將zookeeper中該機器的運行狀態設置為false,並清除本機器的任務池中未被處理的任務。在每台機器進程中每2秒刷新一次運行狀態,當檢測到false,則在任務執行完畢后不再取任務處理。
41.SystemTBScheduleManagerFactory也可取消,改用@Bean注解,例如:
ScheduleJobConfiguration.java:
package
com.jfbank.schedule.monitor.alarm.tbs;
import
java.util.HashMap;
import
java.util.Map;
import
org.springframework.beans.factory.annotation.Value;
import
org.springframework.context.annotation.Bean;
import
org.springframework.context.annotation.Configuration;
import
com.taobao.pamirs.schedule.strategy.TBScheduleManagerFactory;
@Configuration
public
class
ScheduleJobConfiguration{
@Bean
(initMethod =
"init"
)
public
TBScheduleManagerFactory tbScheduleManagerFactory(
@Value
(
"${schedule.zookeeper.address}"
)String zkConnectString,
@Value
(
"${schedule.root.catalog}"
)String rootPath,
@Value
(
"${schedule.timeout}"
)String zkSessionTimeout,
@Value
(
"${schedule.username}"
)String userName,
@Value
(
"${schedule.password}"
)String password,
@Value
(
"${schedule.isCheckParentPath}"
)String isCheckParentPath) {
TBScheduleManagerFactory tbScheduleManagerFactory =
new
TBScheduleManagerFactory();
Map<String, String> zkConfig =
new
HashMap<String, String>();
zkConfig.put(
"zkConnectString"
, zkConnectString);
zkConfig.put(
"rootPath"
, rootPath);
zkConfig.put(
"zkSessionTimeout"
, zkSessionTimeout);
zkConfig.put(
"userName"
, userName);
zkConfig.put(
"password"
, password);
zkConfig.put(
"isCheckParentPath"
, isCheckParentPath);
tbScheduleManagerFactory.setZkConfig(zkConfig);
return
tbScheduleManagerFactory;
}
}
|