zookeeper初體驗之關於解決quartz重復執行任務的一種思路


前陣子工作中遇到了一個很麻煩的問題。
本人所在的項目組做了一個機遇quartz集群的任務系統。通俗點講就是用quartz框架(quartz是一款能跑定時任務的框架支持復雜的時間表達式)
來執行定時任務。但是這里定時任務的並發數很多,就出現了一個問題,同一個trigger被多個機器重復的觸發了,這就造成了執行的任務數目
比預期的多很多。領導就讓我處理這個問題。
開始我以為是這個框架本身的配置有問題,結果翻了很多資料還是沒解決(這里不過多講這個,有興趣可以留言)。那么問題出在哪里呢?
quartz的任務工作的方式是這樣的。當任務達到觸發條件的時候(當這條任務滿足qrtz_cron_triggers表中定義的相關的時間表達式的時候)
qrtz_triggers表對應的這條記錄的狀態發生改變,同時下次觸發時間根據時間表達式做出改變,同時根據sched_name找到qrtz_job_details
表中的具體job去執行,下面就是具體的業務了。我這里的問題就出在同一時間內(前后相差幾ms)多台機器觸發了同一條trigger。然而這個
我是沒有辦法解決的或者說不想動quartz的源碼(有朋友能從這一步就把問題搞定的可以交流一下),所以我就順着quartz的工作流程繼續往下
到了job這里,由於多個trigger被觸發所以執行了多次job,那么我是否能通過讓他只執行一次job來防止重復執行呢。如果有一種方法可以
讓這個job執行一次就可以達到我的要求了(選擇在job這一步處理其實還是因為不想動源碼,到java這里我就好辦了)。正好我對zookeeper
有一些了解,zookeeper恰好有一種注冊機制可以解決這個問題。
回顧一下zookeeper關於節點注冊的用法:
zookeeper只可以注冊一個同名節點如果節點已經存在則返回nodeexits.
那么運用到我這個場景就是當任務進入job之后用job id(同時觸發的這幾個job的id是一樣的)去向zookeeper完成注冊,由於
id是一樣的那么只能有一個注冊成功,只要在注冊成功的條件下我才允許task。這樣就保證了不做重復的運算。
具體如下:

public class PlatformQuartzJobBean extends QuartzJobBean {
	private String path = "/zk_triggerID";
	private String lock = "/zk_lock";
	private static ZooKeeper zk = null;
	static{
		try {
			zk = new ZooKeeper(PropsUtil.get("zooKeeperUrl")+":"+PropsUtil.get("zooKeeperPort"), 50000,new ZKWatcher());
		} catch (IOException e) {
			logger.error(e.getMessage(),e);
		}
	}
	//任務執行的具體邏輯
	protected void executeInternal(JobExecutionContext jobexecutioncontext)
		throws JobExecutionException {
		Trigger trigger = jobexecutioncontext.getTrigger();
		String triggerName = trigger.getKey().getName();//triggername是唯一的
		boolean createSuccess = false;
		boolean doTask = false; //不對zookeeper注冊執行任務
		int childrens = 0;
		List<String> children = null;
		boolean onDelete = false;//是否獲取刪除節點的權限
		try {
			zk.create(path+"/"+triggerName, triggerName.getBytes(), 
					Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);//如果注冊出現問題說明節點存在是重復的任務
			createSuccess = true;
			children = zk.getChildren(path, false);
			if(children != null){
				childrens = children.size();
			}
			if(childrens>99){//節點個數達到100個就執行刪除操作
				try {
					zk.create(lock+"/dodelete", "dodelete".getBytes(), 
							Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
					onDelete = true;
				} catch (KeeperException e1) {
					if(e1 instanceof NodeExistsException){
						logger.info("already on delete!");
					}else{
						logger.error(e1.getMessage(),e1);
					}
				} catch (InterruptedException e1) {
					logger.error(e1.getMessage(),e1);
				}
				
			}
			//執行具體的任務
			execuTask(trigger,triggerName,jobexecutioncontext,af);
		} catch (KeeperException e) {
			if(e instanceof NodeExistsException){
				logger.info("already on do");
			}else if(e instanceof ConnectionLossException){
				logger.info("ConnectionLoss ,do task without registered!!");
				doTask = true;
			}else if(e instanceof SessionExpiredException){
				logger.info("session expired ,do task without registered!!");
				doTask = true;
				try {
					zk = new ZooKeeper(PropsUtil.get("zooKeeperUrl")+":"+PropsUtil.get("zooKeeperPort"), 50000,new ZKWatcher());
				} catch (IOException e1) {
					logger.error(e1.getMessage(),e1);
				}
			}else{
				logger.error(e.getMessage(),e);
			}
		} catch (InterruptedException e) {
			logger.error(e.getMessage(),e);
		}
		if(createSuccess && onDelete){//如果創建成功並且root下有執行刪除的權利
			try {
				for(String str:children){
					zk.delete(path+"/"+str, -1);
				}
			} catch (KeeperException e1) {
				logger.error(e1.getMessage(),e1);
			} catch (InterruptedException e1) {
				logger.error(e1.getMessage(),e1);
			}finally{
				if(onDelete){
					try {
						zk.delete(lock+"/dodelete", -1);
					} catch (InterruptedException e) {
						logger.error(e.getMessage(),e);
					} catch (KeeperException e) {
						if(e instanceof ConnectionLossException){
							logger.info("ConnectionLoss ,reconnect zk!!");
							try {
								zk.close();//人為失效,刪除dodelete節點
								zk = new ZooKeeper(PropsUtil.get("zooKeeperUrl")+":"+PropsUtil.get("zooKeeperPort"), 50000,new ZKWatcher());
							} catch (InterruptedException e1) {
								e1.printStackTrace();
							} catch (IOException e1) {
								e1.printStackTrace();
							}
							
						}else{
							logger.error(e.getMessage(),e);
						}
						
					}
				}
			}
			
		}
		//如果出現connectloss和sessionexpired 可能是網絡有點問題找不到zookeeper就不管重復問題了完成任務為最重要的
		if(doTask){//如果出現connectloss和sessionexpired 就直接執行任務
			execuTask(trigger,triggerName,jobexecutioncontext,af);
		}
	
	}
	
}

整個過程就是:當job開始的時候去向zookeeper申請注冊,只有當注冊成功的時候才執行業務,失敗則退出job。同時由於我這里是每天循環的
定時任務所以當zookeeper下的節點數目達到一定的個數的時候加一個刪除鎖(就是向zookeeper create一個ondetele節點),同時刪除之前
的triggername節點,這樣保證了明天這些任務可以繼續完成。至此,任務重復執行的問題就解決了。下一篇博客將簡單的介紹一下zookeeper和
zookeeper的布置,雖然網上這方面東西很多,不過自己寫出來(自己實踐過可以用的),以后可以直接拿來用。。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM