Java 定時任務 Quartz (三)—— 並發


1 前言

根據 Quartz 的設計,一個 Job 可以綁定多個 Trigger,必然會遇到並發的問題。

2 並發

2.1 復現

讓我們編寫一個並發的例子:

 1 /**
 2  * @author pancc
 3  * @version 1.0
 4  */
 5 public class AcceptConcurrentDemo {
 6 
 7     public static void main(String[] args) throws SchedulerException, InterruptedException {
 8         JobDetail detail = JobBuilder.newJob(AcceptConcurrentJob.class)
 9                 .withIdentity("detail", "group0")
10                 .build();
11 
12 
13         Trigger trigger = TriggerBuilder.newTrigger()
14                 .withIdentity("ben_trigger")
15                 .usingJobData("name", "ben")
16                 .startNow()
17                 .build();
18 
19         Trigger triggers = TriggerBuilder.newTrigger()
20                 .withIdentity("mike_trigger")
21                 .usingJobData("name", "mike")
22                 .forJob("detail", "group0")
23                 .startNow()
24                 .build();
25 
26 
27         Scheduler scheduler = new StdSchedulerFactory().getScheduler();
28 
29         scheduler.start();
30         scheduler.scheduleJob(detail, trigger);
31         scheduler.scheduleJob(triggers);
32         /*
33          * 6 秒鍾后關閉
34          */
35         Thread.sleep(6_000);
36         scheduler.shutdown();
37     }
38 
39     @Data
40     public static class AcceptConcurrentJob implements Job {
41         private String name;
42 
43         @Override
44         public void execute(JobExecutionContext context) {
45             try {
46                 System.out.printf("i am %s \n", name);
47                 Thread.sleep(2_000);
48             } catch (InterruptedException e) {
49                 e.printStackTrace();
50             }
51         }
52     }
53 }

 

請注意上邊的 Details 的 Identity ,設置為 group0.detail,同時我們創建了兩個 Trigger,第二個 trigger 在創建的時候通過指定 Identity 綁定到了目標 Job,接着提交這個 Job,與兩個 Trigger ,可以看到兩個觸發器同時出發了 Job 的 execute 方法

 

上邊的代碼也可以簡化為以下形式:

 1 /**
 2  * @author pancc
 3  * @version 1.0
 4  */
 5 public class AcceptConcurrentDemo {
 6 
 7     public static void main(String[] args) throws SchedulerException, InterruptedException {
 8         JobDetail detail = JobBuilder.newJob(AcceptConcurrentJob.class)
 9                 .withIdentity("detail", "group0")
10                 .build();
11 
12 
13         Trigger trigger = TriggerBuilder.newTrigger()
14                 .withIdentity("ben_trigger")
15                 .usingJobData("name", "ben")
16                 .startNow()
17                 .build();
18 
19         Trigger triggers = TriggerBuilder.newTrigger()
20                 .withIdentity("mike_trigger")
21                 .usingJobData("name", "mike")
22                 .startNow()
23                 .build();
24 
25 
26         Scheduler scheduler = new StdSchedulerFactory().getScheduler();
27 
28         scheduler.start();
29         scheduler.scheduleJob(detail, Sets.newHashSet(trigger,triggers),true);
30         /*
31          * 6 秒鍾后關閉
32          */
33         Thread.sleep(6_000);
34         scheduler.shutdown();
35     }
36 
37     @Data
38     public static class AcceptConcurrentJob implements Job {
39         private String name;
40 
41         @Override
42         public void execute(JobExecutionContext context) {
43             try {
44                 System.out.printf("i am %s \n", name);
45                 Thread.sleep(2_000);
46             } catch (InterruptedException e) {
47                 e.printStackTrace();
48             }
49         }
50     }
51 }

 

2.2 避免並發

為了避免並發,我們可以使用官方提供的注解 @DisallowConcurrentExecution,通過在 類上增加這個注解,我們可以觀察到第二個 trigger 進行了排隊處理:

 1 /**
 2  * @author pancc
 3  * @version 1.0
 4  */
 5 public class RejectConcurrentDemo {
 6 
 7     public static void main(String[] args) throws SchedulerException, InterruptedException {
 8         JobDetail detail = JobBuilder.newJob(RejectConcurrentJob.class)
 9                 .withIdentity("detail", "group0")
10                 .build();
11 
12 
13         Trigger trigger = TriggerBuilder.newTrigger()
14                 .withIdentity("ben_trigger")
15                 .usingJobData("name", "ben")
16                 .startNow()
17                 .build();
18 
19         Trigger triggers = TriggerBuilder.newTrigger()
20                 .withIdentity("mike_trigger")
21                 .usingJobData("name", "mike")
22                 .forJob("detail", "group0")
23                 .startNow()
24                 .build();
25 
26 
27         Scheduler scheduler = new StdSchedulerFactory().getScheduler();
28 
29         scheduler.start();
30         scheduler.scheduleJob(detail, trigger);
31         scheduler.scheduleJob(triggers);
32         /*
33          * 6 秒鍾后關閉
34          */
35         Thread.sleep(6_000);
36         scheduler.shutdown();
37     }
38 
39 
40     @DisallowConcurrentExecution
41     @Data
42     public static class RejectConcurrentJob implements Job {
43         private String name;
44 
45         @Override
46         public void execute(JobExecutionContext context) {
47             try {
48                 System.out.printf("i am %s \n", name);
49                 Thread.sleep(2_000);
50             } catch (InterruptedException e) {
51                 e.printStackTrace();
52             }
53         }
54     }
55 }

 

 

3 避免並發的原理探索

讓我們找到 JobStore 的實現類,在這里是 RAMJobStore,點進去方法 org.quartz.simpl.RAMJobStore#acquireNextTriggers,可以看到這個方法的某個塊:

 

 通過對 Job 類上的是否存在  DisallowConcurrentExecution 注解,如果存在,表示拒絕並發執行 execute 方法。如果與即將執行的 Trigger 調用同一個 JobDetail 對象,則將當前 trigger 放入等待列表。

 

之后當前一個 Trigger 執行完畢,將等待中的 Trigger 重新加回去 RAMJobStore 持有的 trigger 列表中,等待下一次調用發生。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM