大多數用到定時執行的功能都是用任務調度來做的,但是當碰到類似訂餐/購物這種業務就不好處理了。比如購物的訂單功能:在你的訂單管理中有N個訂單,當訂單超過十分鐘未支付的時候,自動釋放購物車中的商品,訂單失效。這種高頻率的延遲任務再用任務調度(定時)實現就得不償失了。推薦用Java延遲隊列來實現。DelayQueue是java.util.concurrent中提供的一個類,它是一個無界的BlockingQueue,用於放置實現了Delayed接口的對象,其中的對象只能在其到期時才能從隊列中取走。這種隊列是有序的,即隊頭對象是最早到期(已過期時間最長)的對象;如果沒有任何對象到期,則取不到元素。注意:不能將null元素放置到這種隊列中。
1、java延遲隊列實現方式
// ---------------------------------------------------------------------------
// DelayItem.java
// ---------------------------------------------------------------------------
import java.util.Objects;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/**
 * A task paired with the instant at which it becomes due, suitable for use as
 * an element of a {@link java.util.concurrent.DelayQueue}.
 *
 * <p>Ordering is by due time; items with the same due time are ordered by
 * creation sequence. Equality is based on the wrapped task only, so an item
 * can be removed from the queue by presenting another item wrapping the same
 * task. Ordering is therefore not consistent with {@code equals}.
 *
 * @param <T> the concrete task type
 */
public class DelayItem<T extends Runnable> implements Delayed {

    /** Source of creation sequence numbers used to break ties between items. */
    private static final AtomicLong atomic = new AtomicLong(0);

    /** Due time, on the {@link System#nanoTime()} time line. */
    private final long time;

    /** The task to run once the delay has elapsed. */
    private final T task;

    /** Creation sequence number of this item. */
    private final long n;

    /**
     * Creates an item that becomes due {@code timeout} nanoseconds from now.
     *
     * @param timeout delay in nanoseconds
     * @param t       the task to run when the delay has elapsed
     */
    public DelayItem(long timeout, T t) {
        this.time = System.nanoTime() + timeout;
        this.task = t;
        this.n = atomic.getAndIncrement();
    }

    /**
     * Returns the remaining delay of this item in the given unit; zero or
     * negative once the item is due.
     */
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(this.time - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    /**
     * Orders items by due time, then by creation sequence. Falls back to
     * comparing remaining delays when {@code other} is not a {@code DelayItem}.
     */
    @Override
    public int compareTo(Delayed other) {
        if (other == this) {
            return 0;
        }
        if (other instanceof DelayItem) {
            DelayItem<?> x = (DelayItem<?>) other;
            // nanoTime values must be compared by difference, not directly.
            long diff = time - x.time;
            if (diff < 0) {
                return -1;
            } else if (diff > 0) {
                return 1;
            } else if (n < x.n) {
                return -1;
            } else {
                return 1;
            }
        }
        long d = getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS);
        return Long.signum(d);
    }

    /** Returns the wrapped task. */
    public T getTask() {
        return this.task;
    }

    @Override
    public int hashCode() {
        return Objects.hashCode(task);
    }

    /**
     * Two items are equal when they wrap equal tasks. Comparing the tasks
     * themselves (rather than only their hash codes) avoids treating unrelated
     * tasks as equal on a hash collision.
     */
    @Override
    public boolean equals(Object object) {
        if (object == this) {
            return true;
        }
        if (object instanceof DelayItem) {
            return Objects.equals(task, ((DelayItem<?>) object).task);
        }
        return false;
    }
}

// ---------------------------------------------------------------------------
// ItemQueueThread.java -- manages the delayed tasks
// ---------------------------------------------------------------------------
import java.util.Map;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;

/**
 * Singleton that holds the delay queue and a consumer thread which hands each
 * due task to a cached thread pool.
 */
public class ItemQueueThread {

    private static final Logger logger = Logger.getLogger(ItemQueueThread.class);

    private ItemQueueThread() {
    }

    /** Holder class: the instance is created when the holder is first loaded. */
    private static class LazyHolder {
        private static final ItemQueueThread itemQueueThread = new ItemQueueThread();
    }

    /** Returns the single shared instance. */
    public static ItemQueueThread getInstance() {
        return LazyHolder.itemQueueThread;
    }

    /** Cached thread pool on which due tasks are run. */
    ExecutorService executor = Executors.newCachedThreadPool();

    /** Consumer thread that blocks on the delay queue; created by {@link #init()}. */
    private Thread daemonThread;

    /** The delay queue, initially empty. */
    private DelayQueue<DelayItem<?>> item = new DelayQueue<>();

    /**
     * Starts the consumer thread. Calling this more than once has no further
     * effect, so only one consumer ever drains the queue.
     */
    public synchronized void init() {
        if (daemonThread != null) {
            return;
        }
        daemonThread = new Thread(() -> {
            try {
                execute();
            } catch (InterruptedException e) {
                // Restore the interrupt status and let the thread end.
                Thread.currentThread().interrupt();
                logger.info(e.getMessage(), e);
            }
        });
        System.out.println("init......start");
        daemonThread.start();
    }

    /**
     * Consumer loop: prints some diagnostics, waits for the next due item and
     * submits its task to the executor.
     *
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    private void execute() throws InterruptedException {
        while (!Thread.currentThread().isInterrupted()) {
            Map<Thread, StackTraceElement[]> map = Thread.getAllStackTraces();
            System.out.println("線程數...." + map.size());
            System.out.println(System.currentTimeMillis());
            System.out.println(item.size());
            System.out.println("線程狀態----" + Thread.currentThread().getState());

            // Blocks until an item is due; an interrupt propagates to the caller.
            DelayItem<?> t1 = item.take();
            Runnable task = t1.getTask();
            if (task == null) {
                continue;
            }
            try {
                executor.execute(task);
            } catch (RuntimeException e) {
                // A failed submission must not stop the consumer loop.
                logger.error(e.getMessage(), e);
            }
        }
    }

    /**
     * Adds a task to the queue.
     *
     * @param time     the delay before the task becomes due
     * @param task     the task to run
     * @param timeUnit the unit of {@code time}
     */
    public void put(long time, Runnable task, TimeUnit timeUnit) {
        // Convert to nanoseconds, the unit DelayItem expects.
        long nanoTime = TimeUnit.NANOSECONDS.convert(time, timeUnit);
        DelayItem<?> k = new DelayItem<Runnable>(nanoTime, task);
        item.put(k);
    }

    /**
     * Removes a pending task from the queue.
     *
     * @return {@code true} if an equal item was present and has been removed
     */
    public boolean endTask(DelayItem<Runnable> task) {
        return item.remove(task);
    }
}

// ---------------------------------------------------------------------------
// DataDemo.java -- the delayed work, extracted into its own Runnable
// ---------------------------------------------------------------------------
public class DataDemo implements Runnable {

    int a = -1;

    public DataDemo(int i) {
        this.a = i;
    }

    @Override
    public void run() {
        System.out.println("超時,要撤銷訂單...." + a);
    }
}

// ---------------------------------------------------------------------------
// DelayTest.java -- test class
// ---------------------------------------------------------------------------
import java.util.Random;
import java.util.concurrent.TimeUnit;

public class DelayTest {

    public static void main(String[] args) {
        ItemQueueThread ith = ItemQueueThread.getInstance();
        ith.init();
        Random r = new Random();
        for (int i = 0; i < 5; i++) {
            int a = r.nextInt(20);
            System.out.println("預先知道等待時間:" + a);
            DataDemo dd = new DataDemo(a); // create a task object
            ith.put(a, dd, TimeUnit.SECONDS); // add the task to the queue
        }
    }
}

// Note: ItemQueueThread.init() must be run when the container is initialised,
// or at least have completed before the first delayed task is put; when the
// configured delay expires, the task object's run() is executed.
2、定時任務實現方式
引入quartz包
import org.quartz.Job; import org.quartz.JobBuilder; import org.quartz.JobDetail; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; import org.quartz.Scheduler; import org.quartz.SchedulerException; import org.quartz.SimpleScheduleBuilder; import org.quartz.Trigger; import org.quartz.TriggerBuilder; import org.quartz.impl.StdSchedulerFactory; public class DeplayQuartzImpl implements Job{ @Override public void execute(JobExecutionContext context) throws JobExecutionException { //doing somethings
System.out.println("going scan database..."); } public static void main(String[] args) throws SchedulerException { //create task JobDetail jobDetail = JobBuilder.newJob(DeplayQuartzImpl.class).withIdentity("job_1", "group_1").build(); //create trigger Trigger trigger = TriggerBuilder.newTrigger() .withIdentity("trigger_1", "group_trigger") .withSchedule(SimpleScheduleBuilder.simpleSchedule() .withIntervalInSeconds(3).repeatForever()) .build(); Scheduler scheduler = new StdSchedulerFactory().getScheduler(); //put task into trigger scheduler.scheduleJob(jobDetail, trigger); scheduler.start(); } }
實現Java的Delayed接口(簡潔版)
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/**
 * Minimal {@link Delayed} element: an order id plus the instant at which the
 * order should be deleted.
 */
public class JdkDelayImpl implements Delayed {

    private final String orderId;

    /** Due time, on the {@link System#nanoTime()} time line. */
    private final long timeout;

    /**
     * @param orderId the order this element stands for
     * @param timeout delay from now, in nanoseconds
     */
    public JdkDelayImpl(String orderId, long timeout) {
        this.orderId = orderId;
        this.timeout = timeout + System.nanoTime();
    }

    /**
     * Orders elements by due time. Two {@code JdkDelayImpl} instances are
     * compared on their stored due times, which keeps the result stable no
     * matter when it is evaluated; any other {@link Delayed} is compared on
     * remaining delay instead of being cast blindly.
     */
    @Override
    public int compareTo(Delayed delayed) {
        if (delayed == this) {
            return 0;
        }
        if (delayed instanceof JdkDelayImpl) {
            // nanoTime values must be compared by difference, not directly.
            return Long.signum(timeout - ((JdkDelayImpl) delayed).timeout);
        }
        long d = getDelay(TimeUnit.NANOSECONDS) - delayed.getDelay(TimeUnit.NANOSECONDS);
        return Long.signum(d);
    }

    /** Returns the remaining delay in the given unit; zero or negative once due. */
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(timeout - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    void print() {
        System.out.println(orderId + " order will being delete...");
    }

    /**
     * Demo: queues five orders one at a time, each with a three second delay,
     * and waits for each to become due before queuing the next.
     */
    public static void main(String[] args) {
        List<String> list = new ArrayList<String>();
        list.add("001");
        list.add("002");
        list.add("003");
        list.add("004");
        list.add("005");

        DelayQueue<JdkDelayImpl> queue = new DelayQueue<JdkDelayImpl>();
        long start = System.currentTimeMillis();
        for (String orderId : list) {
            queue.put(new JdkDelayImpl(orderId, TimeUnit.NANOSECONDS.convert(3, TimeUnit.SECONDS)));
            try {
                queue.take().print();
                System.out.println("after: " + (System.currentTimeMillis() - start) + " milliSeconds");
            } catch (InterruptedException e) {
                // Restore the interrupt status and stop instead of carrying on silently.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
3、基於netty方式,添加netty包
import java.util.concurrent.TimeUnit;

import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.TimerTask;

/**
 * Delayed task demo built on Netty's {@link HashedWheelTimer}: a task is
 * scheduled five seconds ahead while the main thread counts the seconds.
 */
public class NettyDeplayImpl {

    /** Timer task that clears {@link #flag} when it fires. */
    static class MyTimeTask implements TimerTask {

        // Written by the timer's thread and polled by the main thread, so it
        // must be volatile for the update to be guaranteed visible.
        volatile boolean flag;

        public MyTimeTask(boolean flag) {
            this.flag = flag;
        }

        @Override
        public void run(Timeout arg0) throws Exception {
            System.out.println("going to delete order...");
            this.flag = false;
        }
    }

    public static void main(String[] args) {
        MyTimeTask timeTask = new MyTimeTask(true);
        Timer timer = new HashedWheelTimer();
        try {
            timer.newTimeout(timeTask, 5, TimeUnit.SECONDS);
            int i = 1;
            while (timeTask.flag) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // Restore the interrupt status and stop counting.
                    Thread.currentThread().interrupt();
                    break;
                }
                System.err.println(i + " seconds gone");
                i++;
            }
        } finally {
            // Release the timer's worker thread so the JVM can exit.
            timer.stop();
        }
    }
}