前陣子工作中遇到了一個很麻煩的問題。
本人所在的項目組做了一個機遇quartz集群的任務系統。通俗點講就是用quartz框架(quartz是一款能跑定時任務的框架支持復雜的時間表達式)
來執行定時任務。但是這里定時任務的並發數很多,就出現了一個問題,同一個trigger被多個機器重復的觸發了,這就造成了執行的任務數目
比預期的多很多。領導就讓我處理這個問題。
開始我以為是這個框架本身的配置有問題,結果翻了很多資料還是沒解決(這里不過多講這個,有興趣可以留言)。那么問題出在哪里呢?
quartz的任務工作的方式是這樣的。當任務達到觸發條件的時候(當這條任務滿足qrtz_cron_triggers表中定義的相關的時間表達式的時候)
qrtz_triggers表對應的這條記錄的狀態發生改變,同時下次觸發時間根據時間表達式做出改變,同時根據sched_name找到qrtz_job_details
表中的具體job去執行,下面就是具體的業務了。我這里的問題就出在同一時間內(前后相差幾ms)多台機器觸發了同一條trigger。然而這個
我是沒有辦法解決的或者說不想動quartz的源碼(有朋友能從這一步就把問題搞定的可以交流一下),所以我就順着quartz的工作流程繼續往下
到了job這里,由於多個trigger被觸發所以執行了多次job,那么我是否能通過讓他只執行一次job來防止重復執行呢。如果有一種方法可以
讓這個job執行一次就可以達到我的要求了(選擇在job這一步處理其實還是因為不想動源碼,到java這里我就好辦了)。正好我對zookeeper
有一些了解,zookeeper恰好有一種注冊機制可以解決這個問題。
回顧一下zookeeper關於節點注冊的用法:
zookeeper只可以注冊一個同名節點如果節點已經存在則返回nodeexits.
那么運用到我這個場景就是當任務進入job之后用job id(同時觸發的這幾個job的id是一樣的)去向zookeeper完成注冊,由於
id是一樣的那么只能有一個注冊成功,只要在注冊成功的條件下我才允許task。這樣就保證了不做重復的運算。
具體如下:
public class PlatformQuartzJobBean extends QuartzJobBean {
private String path = "/zk_triggerID";
private String lock = "/zk_lock";
private static ZooKeeper zk = null;
static{
try {
zk = new ZooKeeper(PropsUtil.get("zooKeeperUrl")+":"+PropsUtil.get("zooKeeperPort"), 50000,new ZKWatcher());
} catch (IOException e) {
logger.error(e.getMessage(),e);
}
}
//任務執行的具體邏輯
protected void executeInternal(JobExecutionContext jobexecutioncontext)
throws JobExecutionException {
Trigger trigger = jobexecutioncontext.getTrigger();
String triggerName = trigger.getKey().getName();//triggername是唯一的
boolean createSuccess = false;
boolean doTask = false; //不對zookeeper注冊執行任務
int childrens = 0;
List<String> children = null;
boolean onDelete = false;//是否獲取刪除節點的權限
try {
zk.create(path+"/"+triggerName, triggerName.getBytes(),
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);//如果注冊出現問題說明節點存在是重復的任務
createSuccess = true;
children = zk.getChildren(path, false);
if(children != null){
childrens = children.size();
}
if(childrens>99){//節點個數達到100個就執行刪除操作
try {
zk.create(lock+"/dodelete", "dodelete".getBytes(),
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
onDelete = true;
} catch (KeeperException e1) {
if(e1 instanceof NodeExistsException){
logger.info("already on delete!");
}else{
logger.error(e1.getMessage(),e1);
}
} catch (InterruptedException e1) {
logger.error(e1.getMessage(),e1);
}
}
//執行具體的任務
execuTask(trigger,triggerName,jobexecutioncontext,af);
} catch (KeeperException e) {
if(e instanceof NodeExistsException){
logger.info("already on do");
}else if(e instanceof ConnectionLossException){
logger.info("ConnectionLoss ,do task without registered!!");
doTask = true;
}else if(e instanceof SessionExpiredException){
logger.info("session expired ,do task without registered!!");
doTask = true;
try {
zk = new ZooKeeper(PropsUtil.get("zooKeeperUrl")+":"+PropsUtil.get("zooKeeperPort"), 50000,new ZKWatcher());
} catch (IOException e1) {
logger.error(e1.getMessage(),e1);
}
}else{
logger.error(e.getMessage(),e);
}
} catch (InterruptedException e) {
logger.error(e.getMessage(),e);
}
if(createSuccess && onDelete){//如果創建成功並且root下有執行刪除的權利
try {
for(String str:children){
zk.delete(path+"/"+str, -1);
}
} catch (KeeperException e1) {
logger.error(e1.getMessage(),e1);
} catch (InterruptedException e1) {
logger.error(e1.getMessage(),e1);
}finally{
if(onDelete){
try {
zk.delete(lock+"/dodelete", -1);
} catch (InterruptedException e) {
logger.error(e.getMessage(),e);
} catch (KeeperException e) {
if(e instanceof ConnectionLossException){
logger.info("ConnectionLoss ,reconnect zk!!");
try {
zk.close();//人為失效,刪除dodelete節點
zk = new ZooKeeper(PropsUtil.get("zooKeeperUrl")+":"+PropsUtil.get("zooKeeperPort"), 50000,new ZKWatcher());
} catch (InterruptedException e1) {
e1.printStackTrace();
} catch (IOException e1) {
e1.printStackTrace();
}
}else{
logger.error(e.getMessage(),e);
}
}
}
}
}
//如果出現connectloss和sessionexpired 可能是網絡有點問題找不到zookeeper就不管重復問題了完成任務為最重要的
if(doTask){//如果出現connectloss和sessionexpired 就直接執行任務
execuTask(trigger,triggerName,jobexecutioncontext,af);
}
}
}
整個過程就是:當job開始的時候去向zookeeper申請注冊,只有當注冊成功的時候才執行業務,失敗則退出job。同時由於我這里是每天循環的
定時任務所以當zookeeper下的節點數目達到一定的個數的時候加一個刪除鎖(就是向zookeeper create一個ondetele節點),同時刪除之前
的triggername節點,這樣保證了明天這些任務可以繼續完成。至此,任務重復執行的問題就解決了。下一篇博客將簡單的介紹一下zookeeper和
zookeeper的布置,雖然網上這方面東西很多,不過自己寫出來(自己實踐過可以用的),以后可以直接拿來用。。
