DelayQueue源码解析


DelayQueue是一个支持延时获取元素的无界阻塞队列。里面的元素全部都是“可延期”的元素,列头的元素是最先“到期”的元素,如果队列里面没有元素到期,是不能从列头获取元素的,哪怕有元素也不行。也就是说只有在延迟期到时才能够从队列中取元素。

DelayQueue主要用于两个方面:
- 缓存:清掉缓存中超时的缓存数据
- 任务超时处理 

class DelayedEle implements Delayed {

    private final long delayTime; //延迟时间
    private final long expire;  //到期时间
    private String data;   //数据

    public DelayedEle(long delay, String data) {
        delayTime = delay;
        this.data = data;
        expire = System.currentTimeMillis() + delay; 
    }

    /**
     * 剩余时间=到期时间-当前时间
     */
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(this.expire - System.currentTimeMillis() , TimeUnit.MILLISECONDS);
    }

    /**
     * 优先队列里面优先级规则
     */
    @Override
    public int compareTo(Delayed o) {
        return (int) (this.getDelay(TimeUnit.MILLISECONDS) -o.getDelay(TimeUnit.MILLISECONDS));
    }

    @Override
    public String toString() {
        final StringBuilder sb = new StringBuilder("DelayedElement{");
        sb.append("delay=").append(delayTime);
        sb.append(", expire=").append(expire);
        sb.append(", data='").append(data).append('\'');
        sb.append('}');
        return sb.toString();
    }
}

看了DelayQueue的内部结构就对上面几个关键点一目了然了,但是这里有一点需要注意,DelayQueue的元素都必须继承Delayed接口。同时也可以从这里初步理清楚DelayQueue内部实现的机制了:以支持优先级无界队列的PriorityQueue作为一个容器,容器里面的元素都应该实现Delayed接口,在每次往优先级队列中添加元素时以元素的过期时间作为排序条件,最先过期的元素放在优先级最高


 基于PriorityQueue,一种延时阻塞队列,DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。

DelayQueue也是一个无界队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据为空的操作(消费者)才会被阻塞
放的时候不用因为满了而阻塞,取的时候空了会阻塞等待有时间没到也会阻塞等到,自己也可以指定超时时间。


 只有一个getDelay(TimeUnit)方法,该方法返回与此对象相关的的剩余时间。同时我们看到Delayed接口继承自Comperable接口,所以实现Delayed接口的类还必须要定义一个compareTo方法,该方法提供与此接口的getDelay方法一致的排序。

DelayQueue队列中每个元素都有个过期时间,并且队列是个优先级队列,当从队列获取元素时候,只有过期元素才会出队列。

q:优先队列,用于存储元素,并按优先级排序
leader:用于优化内部阻塞通知的线程,第一个阻塞等待的线程。
以支持优先级的PriorityQueue无界队列作为一个容器,因为元素都必须实现Delayed接口,可以根据元素的过期时间来对元素进行排列,因此,先过期的元素会在队首,每次从队列里取出来都是最先要过期的元素。

leader是等待获取队列元素的线程,应用主从式设计减少不必要的等待。如果leader不等于空,表示已经有线程在等待获取队列的元素。所以,通过await()方法让出当前线程等待信号。如果leader等于空,则把当前线程设置为leader,当一个线程为leader,它会使用awaitNanos()方法让当前线程等待接收信号或等待delay时间。 

private final PriorityQueue<E> q = new PriorityQueue<E>();
private final transient ReentrantLock lock = new ReentrantLock();
private Thread leader = null;
private final Condition available = lock.newCondition();

public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.offer(e);  //放一个元素
            if (q.peek() == e) {  //队列的第0个元素是这个元素,放进去的元素成为第0个元素。最大堆第0和元素是最大的,依次是第23大,第4567大,
                leader = null;    //  leader 线程为空
                available.signal();   //其他线程唤醒,如果队首元素是刚插入的元素,则设置leader为null,并唤醒阻塞在available上的线程
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        E first = q.peek();
        if (first == null || first.getDelay(NANOSECONDS) > 0)   //时间不到不走
            return null;
        else
            return q.poll();   //  取出优先级队列里面第0个元素,优先级队列是根据时间排序了的
    } finally {
        lock.unlock();
    }
}

线程阻塞地方,与代码对应关系。 

推论1:有leader线程等待的时候,新的线程要取,必然加入await队列排队。所有已经await的线程,唤醒了非leader线程就会继续await。每次唤醒一个处于await的线程。同时之能一个线程取就是leader去取。
await释放锁别的线程可以执行,singal不会释放锁别的线程不执行。
推论2:take(),put()方法全部锁住,只能一个线程调用take或put。里面有await就不行了,await后别的线程就可以进来take,put方法,只要不是执行take里面的await方法,那么就只能一个线程执行take,put方法。
推论3:如果都不设置等待时间,那么就没有优先级,只有等到放的时候  或者  成功取到的线程唤醒,都设置等待时间,时间到了一起抢。
推论4:取的时候为空等待,时间到了直接返回,有元素但是时间没到,就设置leader
推论5:等待取的线程,有一个是leader,就是由第一优先权的线程也是第一个尝试获取的线程。所有的等待取的线程中,下一次不一定是ledaer去取,如果唤醒了非leader线程,那也不行,要继续等到,一定要leader线程唤醒后去取,其他线程才能去取,然后成为新的leader。

 方法调用是根据线程随机来的,一旦走进去方法就根据代码来了。

 

public E take() throws InterruptedException {
    // 获取全局独占锁,只能同时一个线程取或者放,唤醒只是唤醒一个线程。 
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            // 获取队首元素
            E first = q.peek();
            // 队首为空,则阻塞当前线程
            if (first == null) 
                available.await();//等待offer方法唤醒,再次循环去peek,
            else {
                // 获取队首元素的超时时间
                long delay = first.getDelay(NANOSECONDS);//返回还需等待3秒
                // 已超时,直接出队
                if (delay <= 0)
                    return q.poll();
                // 释放first的引用,避免内存泄漏
                first = null; 
                //有元素,但是元素没到时间  
                // leader != null表明有其他线程在等待,阻塞当前线程,自己不是第一个等待线程
                if (leader != null)
                    available.await();  //等待获取了头元素的线程唤醒,或者offer方法唤醒,再次循环去peek,
                else {
                    //没有其他线程在等待, leader指向当前线程
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        // 超时阻塞,只需要等3秒,醒了之后也要重新获取。
                        available.awaitNanos(delay);
                    } finally {
                        // 释放leader
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        // leader为null并且队列不为空,说明没有其他线程在等待,那就通知条件队列
        if (leader == null && q.peek() != null)
            available.signal();
        // 释放全局独占锁
        lock.unlock();
    }
}

这里为什么如果不设置first = null,则会引起内存泄漏呢?线程A到达,列首元素没有到期,设置leader = 线程A,这是线程B来了因为leader != null,则会阻塞,线程C一样。假如线程阻塞完毕了,获取列首元素成功,出列。这个时候列首元素应该会被回收掉,但是问题是它还被线程B、线程C持有着,所以不会回收,这里只有两个线程,如果有线程D、线程E…呢?这样会无限期的不能回收,就会造成内存泄漏。 

 try里面有return,也要先执行finally在执行return。Condition.await(16,TimeUnit.SECONDS):睡醒之后,获得了锁就执行,没有获得锁阻塞。睡的中间被人singnal了,可以醒来,有锁执行,无锁阻塞

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM