1 前言
根據 Quartz 的設計,一個 Job 可以綁定多個 Trigger,必然會遇到並發的問題。
2 並發
2.1 復現
讓我們編寫一個並發的例子:
1 /** 2 * @author pancc 3 * @version 1.0 4 */ 5 public class AcceptConcurrentDemo { 6 7 public static void main(String[] args) throws SchedulerException, InterruptedException { 8 JobDetail detail = JobBuilder.newJob(AcceptConcurrentJob.class) 9 .withIdentity("detail", "group0") 10 .build(); 11 12 13 Trigger trigger = TriggerBuilder.newTrigger() 14 .withIdentity("ben_trigger") 15 .usingJobData("name", "ben") 16 .startNow() 17 .build(); 18 19 Trigger triggers = TriggerBuilder.newTrigger() 20 .withIdentity("mike_trigger") 21 .usingJobData("name", "mike") 22 .forJob("detail", "group0") 23 .startNow() 24 .build(); 25 26 27 Scheduler scheduler = new StdSchedulerFactory().getScheduler(); 28 29 scheduler.start(); 30 scheduler.scheduleJob(detail, trigger); 31 scheduler.scheduleJob(triggers); 32 /* 33 * 6 秒鍾后關閉 34 */ 35 Thread.sleep(6_000); 36 scheduler.shutdown(); 37 } 38 39 @Data 40 public static class AcceptConcurrentJob implements Job { 41 private String name; 42 43 @Override 44 public void execute(JobExecutionContext context) { 45 try { 46 System.out.printf("i am %s \n", name); 47 Thread.sleep(2_000); 48 } catch (InterruptedException e) { 49 e.printStackTrace(); 50 } 51 } 52 } 53 }
請注意上邊的 Details 的 Identity ,設置為 group0.detail,同時我們創建了兩個 Trigger,第二個 trigger 在創建的時候通過指定 Identity 綁定到了目標 Job,接着提交這個 Job,與兩個 Trigger ,可以看到兩個觸發器同時出發了 Job 的 execute 方法
上邊的代碼也可以簡化為以下形式:
1 /** 2 * @author pancc 3 * @version 1.0 4 */ 5 public class AcceptConcurrentDemo { 6 7 public static void main(String[] args) throws SchedulerException, InterruptedException { 8 JobDetail detail = JobBuilder.newJob(AcceptConcurrentJob.class) 9 .withIdentity("detail", "group0") 10 .build(); 11 12 13 Trigger trigger = TriggerBuilder.newTrigger() 14 .withIdentity("ben_trigger") 15 .usingJobData("name", "ben") 16 .startNow() 17 .build(); 18 19 Trigger triggers = TriggerBuilder.newTrigger() 20 .withIdentity("mike_trigger") 21 .usingJobData("name", "mike") 22 .startNow() 23 .build(); 24 25 26 Scheduler scheduler = new StdSchedulerFactory().getScheduler(); 27 28 scheduler.start(); 29 scheduler.scheduleJob(detail, Sets.newHashSet(trigger,triggers),true); 30 /* 31 * 6 秒鍾后關閉 32 */ 33 Thread.sleep(6_000); 34 scheduler.shutdown(); 35 } 36 37 @Data 38 public static class AcceptConcurrentJob implements Job { 39 private String name; 40 41 @Override 42 public void execute(JobExecutionContext context) { 43 try { 44 System.out.printf("i am %s \n", name); 45 Thread.sleep(2_000); 46 } catch (InterruptedException e) { 47 e.printStackTrace(); 48 } 49 } 50 } 51 }
2.2 避免並發
為了避免並發,我們可以使用官方提供的注解 @DisallowConcurrentExecution,通過在 類上增加這個注解,我們可以觀察到第二個 trigger 進行了排隊處理:
1 /** 2 * @author pancc 3 * @version 1.0 4 */ 5 public class RejectConcurrentDemo { 6 7 public static void main(String[] args) throws SchedulerException, InterruptedException { 8 JobDetail detail = JobBuilder.newJob(RejectConcurrentJob.class) 9 .withIdentity("detail", "group0") 10 .build(); 11 12 13 Trigger trigger = TriggerBuilder.newTrigger() 14 .withIdentity("ben_trigger") 15 .usingJobData("name", "ben") 16 .startNow() 17 .build(); 18 19 Trigger triggers = TriggerBuilder.newTrigger() 20 .withIdentity("mike_trigger") 21 .usingJobData("name", "mike") 22 .forJob("detail", "group0") 23 .startNow() 24 .build(); 25 26 27 Scheduler scheduler = new StdSchedulerFactory().getScheduler(); 28 29 scheduler.start(); 30 scheduler.scheduleJob(detail, trigger); 31 scheduler.scheduleJob(triggers); 32 /* 33 * 6 秒鍾后關閉 34 */ 35 Thread.sleep(6_000); 36 scheduler.shutdown(); 37 } 38 39 40 @DisallowConcurrentExecution 41 @Data 42 public static class RejectConcurrentJob implements Job { 43 private String name; 44 45 @Override 46 public void execute(JobExecutionContext context) { 47 try { 48 System.out.printf("i am %s \n", name); 49 Thread.sleep(2_000); 50 } catch (InterruptedException e) { 51 e.printStackTrace(); 52 } 53 } 54 } 55 }
3 避免並發的原理探索
讓我們找到 JobStore 的實現類,在這里是 RAMJobStore,點進去方法 org.quartz.simpl.RAMJobStore#acquireNextTriggers,可以看到這個方法的某個塊:
通過對 Job 類上的是否存在 DisallowConcurrentExecution 注解,如果存在,表示拒絕並發執行 execute 方法。如果與即將執行的 Trigger 調用同一個 JobDetail 對象,則將當前 trigger 放入等待列表。
之后當前一個 Trigger 執行完畢,將等待中的 Trigger 重新加回去 RAMJobStore 持有的 trigger 列表中,等待下一次調用發生。