Java中的阻塞隊列DelayQueue


目錄

1.DelayQueue介紹

DelayQueue

DelayQueue用於存放擁有過期時間的元素阻塞隊列,只有當元素過期,才能從隊列中取出。什么時候元素過期?當元素的getDelay()方法返回值小於等於0。

DelayQueue內部用PriorityQueue實現,可認為DelayQueue=BlockingQueue+PriorityQueue+Delayed

1.1Delayed接口

public interface Comparable<T> {
    public int compareTo(T o);
}
​
//實現該接口的對象帶有時間延遲屬性,只有當對象的時間延遲過期,才能對對象進行操作
public interface Delayed extends Comparable<Delayed> {
  //返回當前對象的剩余時間延遲。0或者負數表示對象已過期。
    long getDelay(TimeUnit unit);
}

因此,DelayQueue中存放的對象需要實現compareTo()方法和getDelay()方法。

 

2.DelayQueue源碼分析

2.1領導者/追隨者(Leader/Follower)模式

2.1.1半同步/半異步(Half-Sync/Half-Async)模式

在我們編寫網絡服務程序時,比較簡單的方式是per client per thread模型,這種模型當客戶端連接數快速增長是就會出現性能瓶頸,我們不能不斷的開啟新的線程,當然我們肯定是會使用線程池,但是線程的管理和頻繁的線程調度也會影響性能。

 java 1.4給我們帶來了NIO編程模型,由於它的讀寫操作都是無阻塞的,這樣使我們能夠只用一個線程處理所有的IO事件,當然我們不會真的只用一個線程來處理,比較常見的編寫NIO網絡服務程序的模型是半同步-半異步模式,其實現原理大體上是單線程同步處理網絡IO請求,當有請求到達時,將該請求放入一個工作隊列中,由另外的線程處理,主線程繼續等待新的網絡IO請求。

這種編程模型的缺點主要是:

1).使用工作隊列帶來的內存的動態分配問題。
2).每次網絡IO請求,總是分配給另外一個線程處理,這樣頻繁的線程的context switching也會影響性能

2.1.2領導者/追隨者(Leader/Follower)模式

針對以上缺點,解決的方案是使用Leader-Follower線程模型,它的基本思想是所有的線程被分配成兩種角色: 

Leader和Follower,一般同時只有一個Leader線程,所有的Follower線程排隊等待成為Leader線程,線程池啟動時自動產生一個Leader負責等待網絡IO事件,當有一個事件產生時,Leader線程首先通知一個Follower線程,並將其提拔為新的Leader,然后自己去處理這個網絡事件,處理完畢后加入Follower線程等待隊列,等待重新成為Leader. 

關鍵點:
(1)只有1個leader線程,可以有若干的follower線程;
(2)線程有3種狀態:leading/processing/following;
(3)有一把鎖,搶到的就是leading;
(4)事件來到時,leading線程會對其進行處理,從而轉化為processing狀態;
(5)處理完成后,嘗試搶鎖,搶到則又變為leading,否則變為followering;
(6)followering不干事,就是搶鎖,力圖成為leading;

 

 

這種線程模型的優點:

1)這個線程模型主要解決了內存的動態分配問題,我們不需要不斷的將到來的網絡IO事件放入隊列,

2)並且等待網絡IO事件的線程在等到網絡事件的發生后,是自己處理的這個事件,也就是說沒有context switching的過程.

Leader-Follower模式

打個比方:

  1. 話說一個地方有一群有組織無紀律的人從事山賊這個很有前途的職業。

  2. 一般就是有一個山賊在山路口察看,其他人在林子里面睡覺。

  3. 假如發現有落單的過往客商,望風的山賊就會弄醒一個睡覺的山賊,然后自己去打劫。

  4. 醒來的山賊接替作望風的事情。

  5. 打劫的山賊搞定以后,就會去睡覺,直到被其他望風的山賊叫醒來望風為止。

  6. 有時候過往客商太多,而山賊數量不夠,有些客商就能僥幸平安通過山嶺(所有山賊都去打劫其他客商了)。

計算機版本:

  1. 有若干個線程(一般組成線程池)用來處理大量的事件

  2. 有一個線程作為領導者,等待事件的發生;其他的線程作為追隨者,僅僅是睡眠。

  3. 假如有事件需要處理,領導者會從追隨者中指定一個新的領導者,自己去處理事件。

  4. 喚醒的追隨者作為新的領導者等待事件的發生。

  5. 處理事件的線程處理完畢以后,就會成為追隨者的一員,直到被喚醒成為領導者。

  6. 假如需要處理的事件太多,而線程數量不夠(能夠動態創建線程處理另當別論),則有的事件可能會得不到處理。

簡單理解,就是最多只有一個線程在處理,其他線程在睡眠。

2.2創建

public DelayQueue() {}
 
//內部使用優先級隊列PriorityQueue來實現對元素的排序,PriorityQueue默認為小根堆
private final PriorityQueue<E> q = new PriorityQueue<E>();
//
private final transient ReentrantLock lock = new ReentrantLock();
//與鎖關聯的available條件。
a.當隊頭元素延遲時間過期,變得可用,b.或者新的線程需要成為leader,通過調用available.signal()方法喚醒阻塞的線程。
private final Condition available = lock.newCondition();
​
//使用Leader-Follower模式來等待隊頭的元素變為available(即時間延遲過期),最小化等待時間。
//1.當一個線程成為leader,它只等待下一個時間延遲過期的元素,其他線程則無限等待。
//2.lead線程必須在從take()、poll()方法返回之前,喚醒某個線程,除非在此期間某個線程成為leader。
//3.無論何時隊頭元素被更早過期的元素替代,leader屬性將通過重置為null使其失效,並且某個等待的線程(不一定是當前leader)被喚醒。
//4.所以,線程們在等待的時候必須等待着獲得或失去領導權。
/**
Thread designated to wait for the element at the head of
 * the queue.  This variant of the Leader-Follower pattern
 * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
 * minimize unnecessary timed waiting.  When a thread becomes
 * the leader, it waits only for the next delay to elapse, but
 * other threads await indefinitely.  The leader thread must
 * signal some other thread before returning from take() or
 * poll(...), unless some other thread becomes leader in the
 * interim.  Whenever the head of the queue is replaced with
 * an element with an earlier expiration time, the leader
 * field is invalidated by being reset to null, and some
 * waiting thread, but not necessarily the current leader, is
 * signalled.  So waiting threads must be prepared to acquire
 * and lose leadership while waiting.
 */
private Thread leader = null;


2.3 put方法

//插入元素到隊列中
//由於隊列是無界的,此方法永遠不會阻塞
public void put(E e) {
    offer(e);
}
​
public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
  //獲得鎖
    lock.lock();
    try {
    //元素入隊
        q.offer(e);
    //如果入隊后,隊頭元素(即小根堆的堆頂元素)為當前元素
        if (q.peek() == e) {//不是很理解,為什么是這個判斷條件
      //將leader置為null,並喚醒某個阻塞的線程,讓其成為leader
            leader = null;
      //喚醒阻塞在available上的線程
            available.signal();
        }
        return true;
    } finally {
    //釋放鎖
        lock.unlock();
    }
} 

 

2.4take方法

//移除隊頭元素。
//如果隊頭元素(即時間延遲最小的元素)時間延遲未過期,阻塞等待,直到其過期
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
  //獲得鎖
    lock.lockInterruptibly();
    try {
        for (;;) {
            E first = q.peek();
      //如果隊列為空,當前線程在available上阻塞等待
            if (first == null)
                available.await();
            else {
        //如果隊頭元素的時間延遲已過期,元素出隊
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    return q.poll();
        //如果隊頭元素的時間延遲未過期
                first = null; // don't retain ref while waiting
        //【使用leader/follower模式】
                if (leader != null) //如果leader為null,當前線程阻塞等待
                    available.await();
                else {//如果leader不為空,當前線程成為leader,並等待delay時間
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        available.awaitNanos(delay);
                    } finally {
            //等待delay時間后,隊頭元素的時間延遲已過期,當前leader任務完成,將leader置為null
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
    //如果隊列中有元素,且leader為null,將喚醒某個阻塞的線程,讓其成為leader
        if (leader == null && q.peek() != null)
            available.signal();
    //釋放鎖
        lock.unlock();
    }
}

 

3.DelayQueue的應用場景

3.1使用DelayQueue模擬實現session

3.1.1場景分析

Session有以下特點: 

1、以唯一鍵key來插入和獲取對象 

2、Session有自動過期時間,到期后系統會自動清理。 

3、每次獲取session對象,該key值所在的對象生命周期重置,過期時間從當前時間開始重新計算。

3.1.2實現思路

1、對於特點1,采用hashmap來保存session存儲對象 

2、對於特點2,3,利用DelayQueue延遲隊列來實現:

創建一個延遲隊列ptrqueue,每當有session插入hashmap時,就同步往ptrqueue隊列插入一個與session的key同名的指針對象(該指針實現了Delayed接口,通過key值指向hashmap中對應元素);

每當讀取session操作時,就更新ptrqueue隊列中對應指針的到期時間;

專門開啟一個守護線程(阻塞式)從ptrqueue隊列中獲取過期的指針,再根據指針刪除hashmap中對應元素

3.1.3實現代碼

public class DelayedDemo {
​
    public static void main(String[] args) throws InterruptedException {
        TSession sessionService=new TSession();
        sessionService.ConnectionAndStart();
        /*模擬客戶端調用*/
        sessionService.put("userIdentity", "tangwenming");
        Thread.sleep(4000);
        sessionService.put("userGroup", "super");
​
        sessionService.get("userIdentity");
​
        sessionService.get("userGroup");
        Thread.sleep(2000);
        sessionService.get("userGroup");
        Thread.sleep(2000);
        sessionService.get("userGroup");
        Thread.sleep(2000);
        sessionService.get("userGroup");
        Thread.sleep(5500);
        sessionService.get("userGroup");
        sessionService.get("userIdentity");
    }
​
}
class TSession{
    /*從conf中獲取session自動過期時間,單位:秒*/
    private static int liveTime=Integer.valueOf(getConfig("livetime"));
    /*指針保存隊列*/
    DelayQueue<Itemptr> ptrqueue=new DelayQueue<Itemptr>();
    /*Session數據存儲map*/
    public ConcurrentHashMap<String, Object> datapool = new ConcurrentHashMap<String, Object>();
​
    public void put(String key,Object value){
        /*插入session數據池*/
        datapool.put(key, value);
        /*插入對應key值的指針*/
        Itemptr ptr=new Itemptr(key,liveTime);
        ptrqueue.remove(ptr);/*更新過期時間step1*/
        ptrqueue.put(ptr);/*更新過期時間step2*/
        System.out.println("插入"+key+":"+value+",生命周期初始化:"+liveTime+"秒");
    }
    public Object get(String key){
        Object resultObject= datapool.get(key);
        if(resultObject!=null){
            /*刷新對應key值的指針*/
            Itemptr ptr=new Itemptr(key,liveTime);
            ptrqueue.remove(ptr);
            ptrqueue.put(ptr);
            System.out.println("獲取"+key+"成功:"+resultObject+",生命周期重新計算");
        }else{
            /*從session池中返回對象*/
            System.out.println("獲取"+key+"失敗:"+resultObject+"。對象已過期");
        }
        return resultObject;
    }
    private void sesseion_gc(){
        Itemptr ptr;
        while (true){
            try {
                /*阻塞線程等待直到獲取超時的元素指針
                 *獲取成功后從隊列中刪除節點
                  在while true循環塊中確實比非阻塞式的poll節省資源*/
                ptr = ptrqueue.take();
                /*根據指針刪除session對象*/
                datapool.remove(ptr.getKey());
                System.out.println("刪除過期key="+ptr.getKey()+"的元素");
                /*降低cpu負擔,根據業務需要和硬件調整*/
                Thread.sleep(300);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    private static String getConfig(String key){
        return "5";/*單位:秒*/
    }
    /*以守護進程運行gc回收方法*/
    public void ConnectionAndStart(){
        Thread sessionThread=new Thread(){
            @Override
            public void run(){
                sesseion_gc();
                }
        };
        sessionThread.setDaemon(true);
        sessionThread.start();
    }
}
class Itemptr implements Delayed{
​
    private String key;
    public String getKey() {
        return key;
    }
​
    private long liveTime ;
    private long removeTime;
​
    public long getRemoveTime() {
        return removeTime;
    }
    public Itemptr(String key,long liveTime){
        this.key=key;
        this.liveTime = liveTime;
        this.removeTime = TimeUnit.NANOSECONDS.convert(liveTime, TimeUnit.SECONDS) + System.nanoTime();
    }
    @Override
    public int compareTo(Delayed o) {
        if (o == null) return 1;
        if (o == this) return  0;
        if (o instanceof Itemptr){
            Itemptr ptr = (Itemptr)o;
            /*用過期時間排序,確定優先級。
             * DelayQueue按照升序(由小到大)排序的,也就是臨近當前時間的優先出隊*/
            if (removeTime > ptr.getRemoveTime() ) {
                return 1;
            }else if (removeTime == ptr.getRemoveTime()) {
                return 0;
            }else {
                return -1;
            }
        }
        return 1;
    }
​
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(removeTime - System.nanoTime(), TimeUnit.NANOSECONDS);
    }
​
    /*
     * 隊列remove()判斷時使用equals比較:指針隊列只需要判斷key字符相同即可
     * remove(Object o)
       * Removes a single instance of the specified element from this queue, if it is present, whether or not it has expired.
     */
    @Override
    public boolean equals(Object obj) {
        if (obj instanceof Itemptr) {
            if (obj==this)
                return true;
            return ((Itemptr)obj).getKey() == this.getKey() ?true:false;
        }
        return false;
    }
​
​
}
 
 

 

4.參考資料

http://www.cs.wustl.edu/~schmidt/POSA/POSA2/ 《面向模式的軟件體系結構2:用於並發和網絡化對象的模式》官網

http://blog.csdn.net/jmxyandy/article/details/7338896 Leader Follower線程模型簡單實現

http://www.cnblogs.com/duzouzhe/archive/2009/09/28/1575813.html 領導者/追隨者模式

http://blog.csdn.net/goldlevi/article/details/7705180 Leader/Follower多線程網絡模型介紹

http://blog.csdn.net/soonfly/article/details/58599087 Java多線程/並發27、DelayQueue延遲隊列模擬實現Session

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM