前陣子工作中遇到了一個很麻煩的問題。
本人所在的項目組做了一個機遇quartz集群的任務系統。通俗點講就是用quartz框架(quartz是一款能跑定時任務的框架支持復雜的時間表達式)
來執行定時任務。但是這里定時任務的並發數很多,就出現了一個問題,同一個trigger被多個機器重復的觸發了,這就造成了執行的任務數目
比預期的多很多。領導就讓我處理這個問題。
開始我以為是這個框架本身的配置有問題,結果翻了很多資料還是沒解決(這里不過多講這個,有興趣可以留言)。那么問題出在哪里呢?
quartz的任務工作的方式是這樣的。當任務達到觸發條件的時候(當這條任務滿足qrtz_cron_triggers表中定義的相關的時間表達式的時候)
qrtz_triggers表對應的這條記錄的狀態發生改變,同時下次觸發時間根據時間表達式做出改變,同時根據sched_name找到qrtz_job_details
表中的具體job去執行,下面就是具體的業務了。我這里的問題就出在同一時間內(前后相差幾ms)多台機器觸發了同一條trigger。然而這個
我是沒有辦法解決的或者說不想動quartz的源碼(有朋友能從這一步就把問題搞定的可以交流一下),所以我就順着quartz的工作流程繼續往下
到了job這里,由於多個trigger被觸發所以執行了多次job,那么我是否能通過讓他只執行一次job來防止重復執行呢。如果有一種方法可以
讓這個job執行一次就可以達到我的要求了(選擇在job這一步處理其實還是因為不想動源碼,到java這里我就好辦了)。正好我對zookeeper
有一些了解,zookeeper恰好有一種注冊機制可以解決這個問題。
回顧一下zookeeper關於節點注冊的用法:
zookeeper只可以注冊一個同名節點如果節點已經存在則返回nodeexits.
那么運用到我這個場景就是當任務進入job之后用job id(同時觸發的這幾個job的id是一樣的)去向zookeeper完成注冊,由於
id是一樣的那么只能有一個注冊成功,只要在注冊成功的條件下我才允許task。這樣就保證了不做重復的運算。
具體如下:
public class PlatformQuartzJobBean extends QuartzJobBean { private String path = "/zk_triggerID"; private String lock = "/zk_lock"; private static ZooKeeper zk = null; static{ try { zk = new ZooKeeper(PropsUtil.get("zooKeeperUrl")+":"+PropsUtil.get("zooKeeperPort"), 50000,new ZKWatcher()); } catch (IOException e) { logger.error(e.getMessage(),e); } } //任務執行的具體邏輯 protected void executeInternal(JobExecutionContext jobexecutioncontext) throws JobExecutionException { Trigger trigger = jobexecutioncontext.getTrigger(); String triggerName = trigger.getKey().getName();//triggername是唯一的 boolean createSuccess = false; boolean doTask = false; //不對zookeeper注冊執行任務 int childrens = 0; List<String> children = null; boolean onDelete = false;//是否獲取刪除節點的權限 try { zk.create(path+"/"+triggerName, triggerName.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);//如果注冊出現問題說明節點存在是重復的任務 createSuccess = true; children = zk.getChildren(path, false); if(children != null){ childrens = children.size(); } if(childrens>99){//節點個數達到100個就執行刪除操作 try { zk.create(lock+"/dodelete", "dodelete".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); onDelete = true; } catch (KeeperException e1) { if(e1 instanceof NodeExistsException){ logger.info("already on delete!"); }else{ logger.error(e1.getMessage(),e1); } } catch (InterruptedException e1) { logger.error(e1.getMessage(),e1); } } //執行具體的任務 execuTask(trigger,triggerName,jobexecutioncontext,af); } catch (KeeperException e) { if(e instanceof NodeExistsException){ logger.info("already on do"); }else if(e instanceof ConnectionLossException){ logger.info("ConnectionLoss ,do task without registered!!"); doTask = true; }else if(e instanceof SessionExpiredException){ logger.info("session expired ,do task without registered!!"); doTask = true; try { zk = new ZooKeeper(PropsUtil.get("zooKeeperUrl")+":"+PropsUtil.get("zooKeeperPort"), 50000,new ZKWatcher()); } catch (IOException e1) { logger.error(e1.getMessage(),e1); } }else{ logger.error(e.getMessage(),e); } } catch (InterruptedException e) { logger.error(e.getMessage(),e); } if(createSuccess && onDelete){//如果創建成功並且root下有執行刪除的權利 try { for(String str:children){ zk.delete(path+"/"+str, -1); } } catch (KeeperException e1) { logger.error(e1.getMessage(),e1); } catch (InterruptedException e1) { logger.error(e1.getMessage(),e1); }finally{ if(onDelete){ try { zk.delete(lock+"/dodelete", -1); } catch (InterruptedException e) { logger.error(e.getMessage(),e); } catch (KeeperException e) { if(e instanceof ConnectionLossException){ logger.info("ConnectionLoss ,reconnect zk!!"); try { zk.close();//人為失效,刪除dodelete節點 zk = new ZooKeeper(PropsUtil.get("zooKeeperUrl")+":"+PropsUtil.get("zooKeeperPort"), 50000,new ZKWatcher()); } catch (InterruptedException e1) { e1.printStackTrace(); } catch (IOException e1) { e1.printStackTrace(); } }else{ logger.error(e.getMessage(),e); } } } } } //如果出現connectloss和sessionexpired 可能是網絡有點問題找不到zookeeper就不管重復問題了完成任務為最重要的 if(doTask){//如果出現connectloss和sessionexpired 就直接執行任務 execuTask(trigger,triggerName,jobexecutioncontext,af); } } }
整個過程就是:當job開始的時候去向zookeeper申請注冊,只有當注冊成功的時候才執行業務,失敗則退出job。同時由於我這里是每天循環的
定時任務所以當zookeeper下的節點數目達到一定的個數的時候加一個刪除鎖(就是向zookeeper create一個ondetele節點),同時刪除之前
的triggername節點,這樣保證了明天這些任務可以繼續完成。至此,任務重復執行的問題就解決了。下一篇博客將簡單的介紹一下zookeeper和
zookeeper的布置,雖然網上這方面東西很多,不過自己寫出來(自己實踐過可以用的),以后可以直接拿來用。。