1.簡介:
- DelayQueue是一個無界阻塞隊列,只有在延遲期滿時,才能從中提取元素。
- 隊列的頭部,是延遲期滿后保存時間最長的delay元素。
2.使用場景:
- 緩存系統設計:使用DelayQueue保存緩存元素的有效期,用一個線程循環查詢DelayQueue,一旦從DelayQueue中取出元素,就表示有元素到期。
- 定時任務調度:使用DelayQueue保存當天要執行的任務和執行的時間,一旦從DelayQueue中獲取到任務,就開始執行,比如Timer,就是基於DelayQueue實現的。
3.使用條件:
- 存放DelayQueue的元素,必須繼承Delay接口,Delay接口使對象成為延遲對象。
- 該接口強制實現兩個方法:
1.CompareTo(Delayed o):用於比較延時,隊列里元素的排序依據,這個是Comparable接口的方法,因為Delay實現了Comparable接口,所以需要實現。
2.getDelay(TimeUnit unit):這個接口返回到激活日期的--剩余時間,時間單位由單位參數指定。 - 此隊列不允許使用null元素。
BlockingQueue中take、offer、put、add
| | 拋出異常 | 特殊值 | 阻塞 | 超時 |
| 插入 | add(e)
| offer(e)
| put(e)
| offer(e, time, unit)
|
| 移除 | remove()
| poll()
| take()
| poll(time, unit)
|
| 檢查 | element()
| peek()
| 不可用 | 不可用 |
- offer:插入到隊列,成功返回true,如果當前沒有可用空間,返回false。
- add:入隊,如果沒有可用空間,拋出異常,IllegalStateException。
- put:插入隊列,等待可用空間,阻塞,直到能夠有空間插入元素。
- take:獲取並移除隊列頭部,在元素變的可用之前一直等待。
三、實際開發中的應用
簡單的延時隊列要有三部分:第一實現了Delayed接口的消息體、第二消費消息的消費者、第三存放消息的延時隊列
1.消息體。實現接口 Delayed ,重寫方法 compareTo 和 getDelay
package com.chitic.supplywater.app.aop; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; public class Worker<T> implements Delayed { //消息類型 private int type; // 消息內容 private T body; //創建時刻的時間 private long start = System.currentTimeMillis(); //延遲時長,這個是必須的屬性因為要按照這個判斷延時時長。 private long excuteTime; // 延遲任務是否到時就是按照這個方法判斷如果返回的是負數則說明到期 // 否則還沒到期 @Override public long getDelay(TimeUnit unit) { return unit.convert((start + this.excuteTime) - System.currentTimeMillis(), TimeUnit.MILLISECONDS); } /** *比較時間,以最接近執行時間的任務,排在最前面 */ @Override public int compareTo(Delayed delayed) { Worker msg = (Worker) delayed; return (int)(((start + this.excuteTime) - System.currentTimeMillis()) -((msg.start + msg.excuteTime) - System.currentTimeMillis())) ; } public int getType() { return this.type; } public T getBody() { return this.body; } public Worker(int type, T body, long excuteTime) { this.type = type; this.body = body; this.excuteTime = excuteTime; } }
管理器
package com.chitic.supplywater.app.aop; import java.util.concurrent.DelayQueue; public class DelayQueueManager { private static DelayQueue<Worker> delayQueue; static { // 創建延時隊列 delayQueue = new DelayQueue<>(); } public static DelayQueue<Worker> getDelayQueue(){ return delayQueue; } public static void putDelayQueue(Worker worker){ delayQueue.put(worker); } }
2,消費者
package com.chitic.supplywater.app.aop; import java.util.concurrent.DelayQueue; public class ConsumerDelayQueue implements Runnable { // 延時隊列 ,消費者從其中獲取消息進行消費 private DelayQueue<Worker> queue; public ConsumerDelayQueue(DelayQueue<Worker> queue) { this.queue = queue; } @Override public void run() { while (true) { try { Worker take = queue.take(); System.out.println("消費消息類型:" + take.getType() + " 消息體:" + take.getBody()+":"+System.currentTimeMillis()); //TODO 此處可以進行推送等一系列操作 Thread.sleep(50); } catch (InterruptedException e) { e.printStackTrace(); } } } }
3,存放消息的延時隊列
@Resource ThreadPoolTaskExecutor threadPoolTaskExecutor; @Test public void test() throws InterruptedException { Order order = new Order(); order.setOrderId("9000"); order.setOrderName("買個蘋果12"); Worker<Order> orderWorker = new Worker<>(1, order, 5000); DelayQueueManager.putDelayQueue(orderWorker); threadPoolTaskExecutor.execute(new ConsumerDelayQueue(DelayQueueManager.getDelayQueue())); //為了方便看效果 for (int i = 1; i <= 10; i++) { Thread.sleep(1000); System.out.println("========================="+ i); } }
線程池配置
package com.chitic.supplywater.app.aop; import lombok.extern.slf4j.Slf4j; import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.AsyncConfigurer; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.SchedulingConfigurer; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.scheduling.config.ScheduledTaskRegistrar; import java.util.concurrent.Executor; import java.util.concurrent.ThreadPoolExecutor; /** * @Description //TODO 線程池配置 定時任務,異步任務 * @Author GaoX * @Date 2020/12/15 16:07 */ @Configuration @EnableAsync @EnableScheduling @Slf4j public class ExecutorConfig implements SchedulingConfigurer, AsyncConfigurer { /** * @description : 定時任務使用的線程池 * @params: [] * @return: org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler * @author: GaoX * @date: 2020/12/15 16:07 */ @Bean(destroyMethod = "shutdown", name = "taskScheduler") public ThreadPoolTaskScheduler taskScheduler(){ ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); scheduler.setPoolSize(10); scheduler.setThreadNamePrefix("task-"); scheduler.setAwaitTerminationSeconds(600); scheduler.setWaitForTasksToCompleteOnShutdown(true); return scheduler; } /** * @description : 異步任務執行線程池 * @params: [] * @return: org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor * @author: GaoX * @date: 2020/12/15 16:10 */ @Bean(name = "asyncExecutor") public ThreadPoolTaskExecutor asyncExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //核心線程數量 executor.setCorePoolSize(8); //隊列中最大任務數 executor.setQueueCapacity(20); //線程空閑后最大存活時間 executor.setKeepAliveSeconds(100); //最大線程數量 executor.setMaxPoolSize(10); //線程名稱前綴 executor.setThreadNamePrefix("taskExecutor-"); //當達到最大線程數時如何處理新任務 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.initialize(); return executor; } @Override public void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) { ThreadPoolTaskScheduler taskScheduler = taskScheduler(); scheduledTaskRegistrar.setTaskScheduler(taskScheduler); } @Override public Executor getAsyncExecutor() { return asyncExecutor(); } @Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return (throwable, method, objects) -> { log.error("異步任務執行出現異常, message {}, emthod {}, params {}", throwable, method, objects); }; } }