PriorityBlockingQueue用法


摘自《Java編程思想》

package com.test.concurrency;

import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * A runnable unit of work tagged with an integer priority.
 *
 * <p>Tasks order themselves so that a larger {@code priority} value compares as
 * "smaller"; a queue that hands out its least element first therefore serves the
 * highest-priority task first.
 *
 * <p>Every instance registers itself in the shared {@link #sequence} list on
 * construction, so the list records tasks in creation order.
 *
 * <p>NOTE(review): {@code counter} and {@code sequence} are plain, unsynchronized
 * static state; this presumably relies on all tasks being created by one thread.
 */
class PrioritizedTask implements Runnable, Comparable<PrioritizedTask>
{
    // Each task owns a generator with the same fixed seed.
    private final Random rand = new Random(47);
    private static int counter = 0;
    // Creation-order identifier, taken from the shared counter.
    private final int id = counter++;
    // Execution priority of this task; larger values are served first.
    private final int priority;
    
    // Every task ever constructed, in creation order.
    protected static List<PrioritizedTask> sequence = new ArrayList<PrioritizedTask>();
    
    /**
     * Creates a task and appends it to {@link #sequence}.
     *
     * @param priority execution priority; larger values sort ahead of smaller ones
     */
    public PrioritizedTask(int priority)
    {
        this.priority = priority;
        sequence.add(this);
    }

    /**
     * Compares tasks by priority in descending order.
     *
     * @param o the task to compare against
     * @return a negative value if this task has the higher priority, a positive
     *         value if it has the lower priority, and zero if they are equal
     */
    @Override
    public int compareTo(PrioritizedTask o)
    {
        // Arguments are swapped on purpose: higher priority must sort first.
        return Integer.compare(o.priority, priority);
    }
    
    /**
     * Sleeps for a pseudo-random time below 250 ms and then prints this task.
     *
     * <p>If the sleep is interrupted, the thread's interrupt status is set again
     * so that the code driving this task can observe the interruption and stop.
     */
    @Override
    public void run()
    {
        try
        {
            TimeUnit.MILLISECONDS.sleep(rand.nextInt(250));
        }
        catch (InterruptedException e)
        {
            // Catching the exception cleared the flag; put it back for the caller.
            Thread.currentThread().interrupt();
        }
        System.out.println(this);
    }
    
    /**
     * @return the priority left-aligned in a three-character field, followed by the task id
     */
    @Override
    public String toString()
    {
        return String.format("[%1$-3d]", priority) + " Task " + id;
    }
    
    /**
     * @return a compact {@code (id:priority)} description of this task
     */
    public String summary()
    {
        return "(" + id + ":" + priority + ")";
    }
    
}

/**
 * 該任務負責展示當前sequence中的任務與正常執行的任務進行對比以及停止線程
 */
class EndSentinel extends PrioritizedTask
{
    private ExecutorService exec;
    
    public EndSentinel(ExecutorService e)
    {
        super(-1);
        exec = e;
    }
    
    public void run()
    {
        int count = 0;
        for (PrioritizedTask pt : sequence)
        {
            System.out.println(pt.summary());
            if (++count % 5 == 0)
            {
                System.out.println();
            }
        }
        System.out.println();
        System.out.println(this + " Calling shutdownNow");
        exec.shutdownNow();
    }
}

/**
 * Produces tasks of various priorities and adds them to the shared queue,
 * finishing with an {@code EndSentinel}.
 */
class PrioritizedTaskProduncer implements Runnable
{
    private Random rand = new Random(47);
    private Queue<Runnable> queue;
    private ExecutorService exec;
    
    /**
     * @param q the queue that receives the generated tasks
     * @param e the executor handed to the final {@code EndSentinel}
     */
    public PrioritizedTaskProduncer(Queue<Runnable> q, ExecutorService e)
    {
        queue = q;
        exec = e;
    }
    
    /**
     * Fills the queue in three phases and then adds the end sentinel.
     *
     * <p>If the thread is interrupted during the timed phase, the remaining
     * tasks and the sentinel are not added, and the thread's interrupt status
     * is set again before this method returns.
     */
    @Override
    public void run()
    {
        // Phase 1: 20 tasks with pseudo-random priorities in the range 0-9.
        for (int i = 0; i < 20; i++)
        {
            queue.add(new PrioritizedTask(rand.nextInt(10)));
            Thread.yield();
        }
        try
        {
            // Phase 2: 10 tasks of priority 10, one added every 250 ms.
            for (int i = 0; i < 10; i++)
            {
                TimeUnit.MILLISECONDS.sleep(250);
                queue.add(new PrioritizedTask(10));
            }
            // Phase 3: one task for each priority 0-9, added in ascending order.
            for (int i = 0; i < 10; i++)
            {
                queue.add(new PrioritizedTask(i));
            }
            
            queue.add(new EndSentinel(exec));
        }
        catch (InterruptedException e)
        {
            // Catching the exception cleared the flag; restore it rather than swallow it.
            Thread.currentThread().interrupt();
        }
        System.out.println("Finished PrioritizedTaskProducer");
    }
}
/**
 * Consumes tasks: repeatedly takes the head of the queue and runs it on the
 * calling thread until that thread is interrupted.
 */
class PrioritizedTaskConsumer implements Runnable
{
    private PriorityBlockingQueue<Runnable> q;
    
    /**
     * @param q the queue to take tasks from
     */
    public PrioritizedTaskConsumer(PriorityBlockingQueue<Runnable> q)
    {
        this.q = q;
    }
    
    /**
     * Runs queued tasks one at a time until the current thread is interrupted.
     *
     * <p>Interruption is the expected way to stop this loop. On either exit
     * path (flag seen between tasks, or {@code take()} interrupted while
     * waiting) the thread's interrupt status is left set for the caller.
     */
    @Override
    public void run()
    {
        try
        {
            // isInterrupted() only reads the flag, so it stays set after the loop ends.
            while (!Thread.currentThread().isInterrupted())
            {
                q.take().run();
            }
        }
        catch (InterruptedException e)
        {
            // take() cleared the flag when it threw; restore it instead of swallowing it.
            Thread.currentThread().interrupt();
        }
        System.out.println("Finished PrioritizedTaskConsumer");
    }
}

/**
 * Entry point of the demo: starts the producer, waits, then starts the consumer
 * on the same cached thread pool and shared priority queue.
 */
public class PriorityBlockingQueueDemo
{
    public static void main(String[] args) throws InterruptedException
    {
        PriorityBlockingQueue<Runnable> tasks = new PriorityBlockingQueue<Runnable>();
        ExecutorService pool = Executors.newCachedThreadPool();

        PrioritizedTaskProduncer producer = new PrioritizedTaskProduncer(tasks, pool);
        pool.execute(producer);

        // Important: give the producer time to enqueue every task before the
        // consumer starts; otherwise tasks may appear to run out of priority
        // order, because only what is already queued can be ordered.
        TimeUnit.SECONDS.sleep(5);

        PrioritizedTaskConsumer consumer = new PrioritizedTaskConsumer(tasks);
        pool.execute(consumer);
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯繫本站郵箱 yoyou2525@163.com 刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM