Java的優先級任務隊列的實踐


隊列的基本理解

在說隊列之前說兩個名詞:Task是任務,TaskExecutor是任務執行器

而我們今天要說的隊列就完全符合某機構這個情況,隊列在有Task進來的時候TaskExecutor就立刻開始執行Task,當沒有Task的時候TaskExecutor就處於一個阻塞狀態,當有很多Task的時候Task也需要排隊,TaskExecutor也可以是多個,並且可以指定某幾個Task優先執行或者滯后執行。

綜上所說我們得出一個這樣的關系:隊列相當於某機構TaskExecutor相當於窗口辦事者就是Task

普通隊列

當然很多機構也沒有設置什么軍人優先的窗口,所以隊列也有不帶優先級的隊列,因此我們先來實現一個非優先級的隊列。

和上述某機構不一樣,某機構可以先有機構,再有窗口,再有辦事者。但是我們寫代碼的時候,要想寫一個隊列,那么務必要在隊列中寫TaskExecutor,那么就得先寫好TaskExecutor類,以此類推就得先有Task類。

因此我們先寫一個Task的接口,也就是辦事的人,我把它設計為接口,方便辦各種不同事的人進來:

// 辦事的人。
public interface ITask {
    // 辦事,我們把辦事的方法給辦事的人,也就是你要辦什么事,由你自己決定。
    void run();
}

接下來再寫一個TaskExecutor的類,也就是窗口,用來執行Task,認真看注釋,非常有助於理解:

// 窗口
public class TaskExecutor extends Thread {

    // 在窗口拍的隊,這個隊里面是辦事的人。
    private BlockingQueue<ITask> taskQueue;

    // 這個辦事窗口是否在等待着辦事。
    private boolean isRunning = true;

    public TaskExecutor(BlockingQueue<ITask> taskQueue) {
        this.taskQueue = taskQueue;
    }

    // 下班。
    public void quit() {
        isRunning = false;
        interrupt();
    }

    @Override
    public void run() {
        while (isRunning) { // 如果是上班狀態就待着。
            ITask iTask;
            try {
                iTask = taskQueue.take(); // 叫下一個辦事的人進來,沒有人就等着。
            } catch (InterruptedException e) {
                if (!isRunning) {
                    // 發生意外了,是下班狀態的話就把窗口關閉。
                    interrupt();
                    break; // 如果執行到break,后面的代碼就無效了。
                }
                // 發生意外了,不是下班狀態,那么窗口繼續等待。
                continue;
            }

            // 為這個辦事的人辦事。
            iTask.run();
        }
    }
}

這里要稍微解釋下BlockingQueue<T>#take()方法,這個方法當隊列里面的item為空的時候,它會一直處於阻塞狀態,當隊列中進入item的時候它會立刻有一個返回值,它就和ServerSocket.accept()方法一樣,所以我們把它放入一個Thread中,以免阻塞調用它的線程(Android中可能是主線程)。

辦事的人和窗口都有了,下面我們封裝一個隊列,也就是某機構,用來管理這些窗口:

// 某機構。
public class TaskQueue {

    // 某機構排的隊,隊里面是辦事的人。
    private BlockingQueue<ITask> mTaskQueue;
    // 好多窗口。
    private TaskExecutor[] mTaskExecutors;

    // 在開發者new隊列的時候,要指定窗口數量。
    public TaskQueue(int size) {
        mTaskQueue = new LinkedBlockingQueue<>();
        mTaskExecutors = new TaskExecutor[size];
    }

    // 開始上班。
    public void start() {
        stop();
        // 把各個窗口都打開,讓窗口開始上班。
        for (int i = 0; i < mTaskExecutors.length; i++) {
            mTaskExecutors[i] = new TaskExecutor(mTaskQueue);
            mTaskExecutors[i].start();
        }
    }

    // 統一各個窗口下班。
    public void stop() {
        if (mTaskExecutors != null)
            for (TaskExecutor taskExecutor : mTaskExecutors) {
                if (taskExecutor != null) taskExecutor.quit();
            }
    }

    // 開一個門,讓辦事的人能進來。
    public <T extends ITask> int add(T task) {
        if (!mTaskQueue.contains(task)) {
            mTaskQueue.add(task);
        }
        // 返回排的隊的人數,公開透明,讓外面的人看的有多少人在等着辦事。
        return mTaskQueue.size();
    }
}

某機構、窗口、辦事的人都有了,下面我們就派一個人去一件具體的事,但是上面我的Task是一個接口,所以我們需要用一個類來實現這個接口,來做某一件事:

// 做一件打印自己的id的事。
public class PrintTask implements ITask {

    private int id;

    public PrintTask(int id) {
        this.id = id;
    }

    @Override
    public void run() {
        // 為了盡量模擬窗口辦事的速度,我們這里停頓兩秒。
        try {
            Thread.sleep(2000);
        } catch (InterruptedException ignored) {
        }

        System.out.println("我的id是:" + id);
    }
}

下面就讓我們模擬的虛擬世界運行一次:

public class Main {

    public static void main(String... args) {
        // 這里暫時只開一個窗口。
        TaskQueue taskQueue = new TaskQueue(1);
        taskQueue.start();

        for (int i = 0; i < 10; i++) {
            PrintTask task = new PrintTask(i);
            taskQueue.add(task);
        }
    }

}

沒錯,隊列按照我們理想的狀況打印出來了:

我的id是:0
我的id是:1
我的id是:2
我的id是:3
我的id是:4
我的id是:5
我的id是:6
我的id是:7
我的id是:8
我的id是:9

上面我門只開了一個窗口,下面我多開幾個窗口:

public class Main {

    public static void main(String... args) {
        // 開三個窗口。
        TaskQueue taskQueue = new TaskQueue(3);
        taskQueue.start(); // 某機構開始工作。

        for (int i = 0; i < 10; i++) {
            // new 10 個需要辦事的人,並且進入某機構辦事。
            PrintTask task = new PrintTask(i);
            taskQueue.add(task);
        }
    }
}

這里要說明一下,在初始化的時候我們開了3個窗口,內部的順序應該是這樣的:

某機構的大門開了以后,第一個辦事的人進去到了第一個窗口,第二個辦事的人進去到了第二個窗口,第三個辦事的人進去到了第三個窗口,第四個辦事的人進去排隊在第一位,當第一、第二、第三個窗口中不論哪一個窗口的事辦完了,第四個人就去哪一個窗口繼續辦事,第五個人等待,一次類推。這樣子就達到了隊列同事並發三個任務的效果。

這就是一個普通的隊列,其它的一些特性也是基於此再次封裝的,那么下面我就基於此再把人物的優先級加上,也就是我們上面說的特殊窗口->軍人優先!

優先級隊列

我們排隊等待辦事的時候,來了一個辦事的人,那么如何判斷這個辦事人是否可以優先辦理呢?那就要判斷它是否具有優先的權限甚至他可以優先到什么程度。

所以我們需要讓這個Task有一標志,那就是優先級,所以我用一個枚舉類標記優先級:

public enum Priority {
    LOW, // 最低。
    DEFAULT, // 默認級別。
    HIGH, // 高於默認級別。
    Immediately // 立刻執行。
}

這里我把分了四個等級:最低默認立刻,這個等級肯定要給到我們的辦事的人,也就是Task

public interface ITask {

    void run();

    void setPriority(Priority priority);

    Priority getPriority();
}

可以設置優先級和可以拿到優先級。

下面我們要把上面的LinkedBlockingQueue替換成PriorityBlockingQueue<E>,因為它可以自動做到優先級的比較,它要求泛型<E>,也就是我們的Task必須實現Comparable<E>接口,而Comparable<E>有一個compareTo(E)方法可以對兩個<T>做比較,因此我們的隊列需要改一下實現的方法:

// 某機構。
public class TaskQueue {

    // 某機構排的隊,隊里面是辦事的人。
    private BlockingQueue<ITask> mTaskQueue;
    // 好多窗口。
    private TaskExecutor[] mTaskExecutors;

    // 在開發者new隊列的時候,要指定窗口數量。
    public TaskQueue(int size) {
        mTaskQueue = new PriorityBlockingQueue<>();
        mTaskExecutors = new TaskExecutor[size];
    }

    ...

然后ITask接口繼承Comparable<E>接口:

public interface ITask extends Comparable<ITask> {

    void run();

    void setPriority(Priority priority);

    Priority getPriority();
}

因為有setPriority(Priority)方法和getPriority()方法和Comparable<E>compareTo(E)方法,所以我們的每一個Task都需要實現這幾個方法,這樣就會很麻煩,所以我們封裝一個BasicTask:

public abstract class BasicTask implements ITask {

    // 默認優先級。
    private Priority priority = Priority.DEFAULT;

    @Override
    public void setPriority(Priority priority) {
        this.priority = priority;
    }

    @Override
    public Priority getPriority() {
        return priority;
    }

    // 做優先級比較。
    @Override
    public int compareTo(ITask another) {
        final Priority me = this.getPriority();
        final Priority it = another.getPriority();
        return me == it ? [...] : it.ordinal() - me.ordinal();
    }
}

其它都好說,我們看到compareTo(E)方法就不太理解了,這里說一下這個方法:

compareTo(E)中傳進來的E是另一個Task,如果當前Task比另一個Task更靠前就返回負數,如果比另一個Task靠后,那就返回正數,如果優先級相等,那就返回0。

這里要特別注意,我們看到上面當兩個Task優先級不一樣的時候調用了Priority.orinal()方法,並有后面的orinal減去了當前的orinal,怎么理解呢?首先要理解Priority.orinal()方法,在Java中每一個枚舉值都有這個方法,這個枚舉的值是它的下標+1,也就是[index + 1],所以我們寫的Priority類其實可以這樣理解:

public enum Priority {
    1,
    2,
    3,
    4
}

繼續,如果給當前Task比較低,給compareTo(E)中的Task設置的優先級別比較高,那么Priority不一樣,那么返回的值就是整數,因此當前Task就會被PriorityBlockingQueue<E>排到后面,如果調換那么返回結果也就調換了。

但是我們注意到me == it ? [...] : it.ordinal() - me.ordinal();中的[...]是什么鬼啊?因為這里缺一段代碼呀哈哈哈(這個作者怎么傻乎乎的),這一段代碼的意思是當優先級別一樣的時候怎么辦,那就是誰先加入隊列誰排到前面唄,那么怎樣返回值呢,我們怎么知道哪個Task先加入隊列呢?這個時候可愛的我就出現了,我給它給一個序列標記它什么時候加入隊列的不久完事了,於是我們可以修改下ITask接口,增加兩個方法:

public interface ITask extends Comparable<ITask> {

    void run();

    void setPriority(Priority priority);

    Priority getPriority();

    void setSequence(int sequence);

    int getSequence();
}

我們用setSequence(int)標記它加入隊列的順序,然后因為setSequence(int)getSequence()是所有Task都需要實現的,所以我們在BasicTask中實現這兩個方法:

public abstract class BasicTask implements ITask {

    // 默認優先級。
    private Priority priority = Priority.DEFAULT;
    private int sequence;

    @Override
    public void setPriority(Priority priority) {
        this.priority = priority;
    }

    @Override
    public Priority getPriority() {
        return priority;
    }

    @Override
    public void setSequence(int sequence) {
        this.sequence = sequence;
    }

    @Override
    public int getSequence() {
        return sequence;
    }

    // 做優先級比較。
    @Override
    public int compareTo(ITask another) {
        final Priority me = this.getPriority();
        final Priority it = another.getPriority();
        return me == it ?  this.getSequence() - another.getSequence() :
            it.ordinal() - me.ordinal();
    }
}

看到了吧,剛才的[...]已經變成了this.getSequence() - another.getSequence(),這里需要和上面的it.ordinal() - me.ordinal();的邏輯對應,上面說到如果給當前Task比較低,給compareTo(E)中的Task設置的優先級別比較高,那么Priority不一樣,那么返回的值就是整數,因此當前Task就會被PriorityBlockingQueue<E>排到后面,如果調換那么返回結果也就調換了。

這里的邏輯和上面對應就是和上面的邏輯相反,因為這里是當兩個優先級一樣時的返回,上面是兩個優先級不一樣時的返回,所以當優先級別一樣時,返回負數表示當前Task在前,返回正數表示當前Task在后,正好上面上的邏輯對應。

接下來就是給Task設置序列了,於是我們在TaskQueue中的T void add(T)方法做個手腳:

public class TaskQueue {

    private AtomicInteger mAtomicInteger = new AtomicInteger();

    ...

    public TaskQueue(int size) {
        ...
    }

    public void start() {
        ...
    }

    public void stop() {
        ...
    }

    public <T extends ITask> int add(T task) {
        if (!mTaskQueue.contains(task)) {
            task.setSequence(mAtomicInteger.incrementAndGet()); // 注意這行。
            mTaskQueue.add(task);
        }
        return mTaskQueue.size();
    }
}

這里我們使用了AtomicInteger類,它的incrementAndGet()方法會每次遞增1,其實它相當於:

mAtomicInteger.addAndGet(1);

其它具體用法請自行搜索,這里不再贅述。

到此為止,我們的優先級別的隊列就實現完畢了,我們來做下測試:

public static void main(String... args) {
    // 開一個窗口,這樣會讓優先級更加明顯。
    TaskQueue taskQueue = new TaskQueue(1);
    taskQueue.start(); //  // 某機構開始工作。

    // 為了顯示出優先級效果,我們預添加3個在前面堵着,讓后面的優先級效果更明顯。
    taskQueue.add(new PrintTask(110));
    taskQueue.add(new PrintTask(112));
    taskQueue.add(new PrintTask(122));

    for (int i = 0; i < 10; i++) { // 從第0個人開始。
    PrintTask task = new PrintTask(i);
    if (1 == i) { 
        task.setPriority(Priority.LOW); // 讓第2個進入的人最后辦事。
    } else if (8 == i) {
        task.setPriority(Priority.HIGH); // 讓第9個進入的人第二個辦事。
    } else if (9 == i) {
        task.setPriority(Priority.Immediately); // 讓第10個進入的人第一個辦事。
    } 
    // ... 其它進入的人,按照進入順序辦事。
    taskQueue.add(task);
}

沒錯這就是我們看到的效果:

我的id是:9
我的id是:8
我的id是:110
我的id是:112
我的id是:122
我的id是:0
我的id是:2
我的id是:3
我的id是:4
我的id是:5
我的id是:6
我的id是:7
我的id是:1

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM