java延遲隊列


大多數用到定時執行的功能都是用任務調度來做的,但是當碰到類似訂餐、購物這種業務時就不好處理了。比如購物的訂單功能:在你的訂單管理中有N個訂單,當訂單超過十分鐘未支付的時候,自動釋放購物車中的商品,訂單失效。這種高頻率的延遲任務再用任務調度(定時)實現就得不償失了,推薦用Java延遲隊列來實現。DelayQueue是java.util.concurrent中提供的一個類,它是一個無界的BlockingQueue,用於放置實現了Delayed接口的對象,其中的對象只能在其到期時才能從隊列中取走。這種隊列是有序的,即隊頭對象的延遲到期時間最長(也就是最早到期的對象)。注意:不能將null元素放置到這種隊列中。

1、java延遲隊列實現方式

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

//任務線程 實現delayed接口
/**
 * Wraps a {@link Runnable} together with an expiry instant so it can be stored in a
 * {@code java.util.concurrent.DelayQueue}.
 *
 * <p>Ordering is by expiry instant; items that expire at the same instant are ordered by
 * creation sequence, so earlier-created items come first. Equality is defined by the wrapped
 * task, which lets a caller remove a queued item by building an item around the same task.
 *
 * @param <T> concrete type of the wrapped task
 */
public class DelayItem<T extends Runnable> implements Delayed {
    /** Source of creation sequence numbers, used only to break ties in {@link #compareTo}. */
    private static final AtomicLong atomic = new AtomicLong(0);

    /** Expiry instant on the {@link System#nanoTime()} timeline. */
    private final long time;
    /** The wrapped task. */
    private final T task;
    /** Creation sequence number of this item. */
    private final long n;

    /**
     * Creates an item that expires {@code timeout} nanoseconds from now.
     *
     * @param timeout delay in nanoseconds, measured from the moment of construction
     * @param t       the task to wrap
     */
    public DelayItem(long timeout, T t) {
        this.time = System.nanoTime() + timeout;
        this.task = t;
        this.n = atomic.getAndIncrement();
    }

    /**
     * Returns the remaining delay of this item, expressed in the given unit.
     *
     * @param unit the unit of the result
     * @return remaining delay; zero or negative once the item has expired
     */
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(this.time - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    /**
     * Orders items by expiry instant, then by creation sequence.
     *
     * @param other the delayed object to compare against
     * @return negative, zero or positive as this item expires before, together with, or after
     *         {@code other}; zero is returned for another {@code DelayItem} only when it is
     *         this very object
     */
    @Override
    public int compareTo(Delayed other) {
        if (other == this) {
            return 0;
        }
        if (other instanceof DelayItem) {
            DelayItem<?> x = (DelayItem<?>) other;
            // Compare via the difference: nanoTime values may wrap, so only the
            // sign of the difference is meaningful.
            long diff = time - x.time;
            if (diff < 0) {
                return -1;
            }
            if (diff > 0) {
                return 1;
            }
            // Same expiry instant: fall back to creation order.
            return n < x.n ? -1 : 1;
        }
        // Foreign Delayed implementation: compare the remaining delays.
        long d = getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS);
        return Long.compare(d, 0L);
    }

    /**
     * Returns the wrapped task.
     *
     * @return the task passed to the constructor
     */
    public T getTask() {
        return this.task;
    }

    /**
     * Returns the hash code of the wrapped task, or 0 when the task is {@code null}.
     */
    @Override
    public int hashCode() {
        return task == null ? 0 : task.hashCode();
    }

    /**
     * Two items are equal when their wrapped tasks are equal.
     *
     * @param object the object to compare with
     * @return {@code true} if {@code object} is a {@code DelayItem} wrapping an equal task
     */
    @Override
    public boolean equals(Object object) {
        if (this == object) {
            return true;
        }
        if (!(object instanceof DelayItem)) {
            return false;
        }
        Object otherTask = ((DelayItem<?>) object).task;
        // Compare the tasks themselves; matching hash codes alone do not imply equality.
        return task == null ? otherTask == null : task.equals(otherTask);
    }
}

//管理延遲任務的類
import java.util.Map;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
//延遲隊列存放有效期對象
public class ItemQueueThread {
private static final Logger logger = Logger.getLogger(this.class);
private ItemQueueThread(){}
//延遲加載(線程安全)
private static class LazyHolder{
    private static ItemQueueThread itemQueueThread = new ItemQueueThread();
}
public static ItemQueueThread getInstance(){
    return LazyHolder.itemQueueThread;
}
//緩存線程池
ExecutorService executor = Executors.newCacheThreadPool();
//線程
private Thread daemonThead;
//初始化線程
public void init() {
    daemonThread = new Thread(() -> {
        try{
            execute();
        }cathc(InterruptedException e){
            e.printStackTrace();
            logger.info(e.getMessage());
        }
    });
    System.out.println("init......start");
    daemonThread.start();
}

private void execute() throws InterrupedException {
    while(true) {
        Map<Thread, StackTraceElement[]> map = Thread.getAllStackTraces();
        System.out.println("線程數...." + map.size());
        System.out.println(System.currentTimeMills());
        System.out.println(item.size());
        System.out.println("線程狀態----" + Thread.currentThread().getState());
        try{
            //從延遲隊列中取值,如果沒有對象過期責隊列一直等待
            DelayItem<?> t1 = item.take();
            if(t1 != null){
                Runnable task = t1.getTask();
                    if(task == null){
                        continue;
                }
                executor.execute(task);
            }
        }catch(Exception e) {
            e.printStackTrace();
            logger.info(e.getMessage());
        }
    }
}
//創建空的延遲隊列
private DelayQueue<DelayItem<?>> item = new DelayQueue<>();
//往隊列中添加任務
public void put(long time, Runnable task, TimeUnit timeUnit){
    //轉換成ns
    long nanoTime = TimeUnit.NANOSECONDS.convert(time,timeUnit);
    DelayItem<?> k = new DelayItem(nanoTime,task);
    item.put(k);_:
}
//結束任務
public boolean endTask(DelayItem<Runnable> task){
    return item.remove(task);
}
}

//把需要延遲的功能代碼單獨抽取出來作為一個類,繼承Runnable實現run方法
/**
 * Example delayed task: holds an integer and prints an order-cancellation message
 * containing it when run.
 */
public class DataDemo implements Runnable {
    /** Value echoed in the message; -1 until the constructor assigns it. */
    int a = -1;

    /**
     * @param i the value to print when the task runs
     */
    public DataDemo(int i) {
        a = i;
    }

    /** Prints the timeout message followed by the stored value. */
    @Override
    public void run() {
        String message = "超時,要撤銷訂單...." + a;
        System.out.println(message);
    }
}

//test class
import java.util.Random;
import java.util.concurrent.TimeUnit;
/**
 * Demo driver: starts the dispatcher and queues five tasks, each with a random delay
 * of 0 to 19 seconds.
 */
public class DelayTest {
    public static void main(String[] args) {
        ItemQueueThread dispatcher = ItemQueueThread.getInstance();
        dispatcher.init();

        Random random = new Random();
        int remaining = 5;
        while (remaining-- > 0) {
            int a = random.nextInt(20);
            System.out.println("預先知道等待時間:" + a);
            // Wrap the delay in a task and queue it to fire after that many seconds.
            dispatcher.put(a, new DataDemo(a), TimeUnit.SECONDS);
        }
    }
}
//注意:ItemQueueThread的init方法要在容器初始化的時候執行,或者在第一次put延遲任務對象之前完成初始化;當設定的延遲時間到期時,會執行任務對象中的run方法。

  

2、定時任務實現方式

引入quartz包

import org.quartz.Job;
import org.quartz.JobBuilder;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.SimpleScheduleBuilder;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.impl.StdSchedulerFactory;

public class DeplayQuartzImpl implements Job{

    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        //doing somethings 
System.out.println(
"going scan database..."); } public static void main(String[] args) throws SchedulerException { //create task JobDetail jobDetail = JobBuilder.newJob(DeplayQuartzImpl.class).withIdentity("job_1", "group_1").build(); //create trigger Trigger trigger = TriggerBuilder.newTrigger() .withIdentity("trigger_1", "group_trigger") .withSchedule(SimpleScheduleBuilder.simpleSchedule() .withIntervalInSeconds(3).repeatForever()) .build(); Scheduler scheduler = new StdSchedulerFactory().getScheduler(); //put task into trigger scheduler.scheduleJob(jobDetail, trigger); scheduler.start(); } }

 

實現java的Delayed接口簡潔版

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/**
 * Minimal {@link Delayed} implementation: an order id plus the instant at which the order
 * expires, for use in a {@link DelayQueue}.
 */
public class JdkDelayImpl implements Delayed {

    /** Identifier of the order this entry stands for. */
    private final String orderId;
    /** Expiry instant on the {@link System#nanoTime()} timeline. */
    private final long timeout;

    /**
     * @param orderId identifier of the order
     * @param timeout delay in nanoseconds, measured from the moment of construction
     */
    public JdkDelayImpl(String orderId, long timeout) {
        this.orderId = orderId;
        this.timeout = timeout + System.nanoTime();
    }

    /**
     * Orders entries by expiry. Works against any {@link Delayed}, not only this class.
     *
     * @param delayed the object to compare against
     * @return negative, zero or positive as this entry expires before, together with, or
     *         after {@code delayed}
     */
    @Override
    public int compareTo(Delayed delayed) {
        if (delayed == this) {
            return 0;
        }
        if (delayed instanceof JdkDelayImpl) {
            // Compare the stored expiry instants directly so both sides are measured against
            // the same clock reading; use the difference because nanoTime values may wrap.
            long diff = timeout - ((JdkDelayImpl) delayed).timeout;
            return Long.compare(diff, 0L);
        }
        // Foreign Delayed implementation: compare remaining delays instead of casting.
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), delayed.getDelay(TimeUnit.NANOSECONDS));
    }

    /**
     * Returns the remaining delay in the given unit.
     *
     * @param unit the unit of the result
     * @return remaining delay; zero or negative once expired
     */
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(timeout - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    /** Prints the order id followed by a deletion notice. */
    void print() {
        System.out.println(orderId + " order will being delete...");
    }

    /**
     * Demo: for each order id, queues an entry with a 3 second delay, waits for it to expire
     * and prints the elapsed time.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        List<String> list = new ArrayList<>();
        list.add("001");
        list.add("002");
        list.add("003");
        list.add("004");
        list.add("005");

        DelayQueue<JdkDelayImpl> queue = new DelayQueue<>();
        long start = System.currentTimeMillis();
        for (String id : list) {
            queue.put(new JdkDelayImpl(id, TimeUnit.NANOSECONDS.convert(3, TimeUnit.SECONDS)));
            try {
                // Blocks until the entry just queued has expired.
                queue.take().print();
                System.out.println("after: " + (System.currentTimeMillis() - start)
                        + " milliSeconds");
            } catch (InterruptedException e) {
                // Keep the interruption visible and stop waiting on further entries.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

}

 

3、基於netty方式,添加netty包

import java.util.concurrent.TimeUnit;

import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.TimerTask;

/**
 * Delayed execution using Netty's {@link HashedWheelTimer}: a task is scheduled to fire
 * after 5 seconds while the main thread counts the seconds until it has run.
 */
public class NettyDeplayImpl {

    /** Timer task that prints a message and clears its flag when it fires. */
    static class MyTimeTask implements TimerTask {

        /**
         * {@code true} until the task has run. Written by the thread that runs the task and
         * polled by {@code main}, hence volatile.
         */
        volatile boolean flag;

        /**
         * @param flag initial value of the flag
         */
        public MyTimeTask(boolean flag) {
            this.flag = flag;
        }

        /**
         * Called by the timer when the timeout expires.
         *
         * @param arg0 handle of the timeout that fired
         */
        @Override
        public void run(Timeout arg0) throws Exception {
            System.out.println("going to delete order...");
            this.flag = false;
        }
    }

    /**
     * Schedules the task with a 5 second delay and prints a line every second until the
     * task has run, then stops the timer.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        MyTimeTask timeTask = new MyTimeTask(true);
        Timer timer = new HashedWheelTimer();
        timer.newTimeout(timeTask, 5, TimeUnit.SECONDS);
        int i = 1;
        try {
            while (timeTask.flag) {
                Thread.sleep(1000);
                System.err.println(i + " seconds gone");
                i++;
            }
        } catch (InterruptedException e) {
            // Keep the interruption visible and stop waiting.
            Thread.currentThread().interrupt();
        } finally {
            // Release the timer's resources; otherwise its worker thread can keep the JVM alive.
            timer.stop();
        }
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM