用Redis實現優先級隊列


在最近在面試過程中,張先森遇到一個面試官這么問,如果一個並發很大的消息應用,想要根據請求的優先級來處理,該怎么做。我當時只是籠統地回答用redis,面試官點了點頭,這個問題就此通過。

那么用redis究竟如何解決這個問題呢,下面就簡單說一下吧。

首先抓出問題里面幾個關鍵字,一是並發量大,二是請求的優先級。

先談談並發量大,對於一個消息系統,服務端必然會接受很多客戶端的請求,這些請求一般來說都是異步的,用戶不必等待請求被處理。對於這類需求,我們需要有一個能緩存住大量消息請求的東西,用redis來做這個是非常合適的。基本上來說,redis能緩存住的消息數量只取決於內存大小,而且我們需要的只是隊列最基本的操作:進隊和出隊,它們的時間復雜度都是O(1),因此性能上很高。

具體來說,redis里面有一個list結構,我們可以利用list構造一個FIFO(先進先出)的隊列,所有請求就在這個隊列里面排隊等待處理。redis的list有lpush,rpush,lpop和rpop這么幾個常用的操作,如果我們要構造FIFO隊列,可以用lpush和rpop(或者用rpush和lpop),注意進隊和出隊方向相反即可。

第二個關鍵字,請求的優先級。我們先假設一個最簡單的場景,有三個優先級:高中低三級。可以設置3個list結構,比如叫queue_h,queue_m,queue_l,分別對應三個優先級。我們的代碼流程可以這樣來寫:

首先設置3個優先級的list。

寫入端:

1. 根據請求的優先級往相應list里lpush數據。 

讀出端:

1. 可以采用定時輪詢的方式,按序依次檢查高、中、低三個list的長度(可以使用llen命令),如果該list長度大於0,說明當前隊列需要立即被處理。

2. 從這個list中rpop數據,然后處理數據。

需要注意的是,因為有分優先級,所以只有在高優先級的請求都被處理完以后才能去處理中低優先級的請求,這是一個大前提。

有人可能會問,如果我的優先級分類遠大於3個呢,比如有1000個優先級怎么辦,總不能設置1000個list吧,這樣太蛋疼了。這種情況也不是完全沒可能,也許有的系統就是這么多優先級呢。

這種需求我們可以結合分段來處理,比如0-99,100-199...900-999,先把優先級分成幾個等分,然后在各個分段中使用有序集合,有序集合可以對集合內的元素排序,有序集合在插入一個元素的時候使用二分查找法,所以在比較大的數據量面前效率還是可以的,如果請求數實在太多,可以考慮進一步細分優先級的分段,以減少有序列表元素的數量。在一個請求進來時,首先確定它的優先級分段,把這個請求放到相應的有序集合中。在處理部分,需要有一個服務書按優先級高到低順序遍歷優先級的分段,然后直接取優先級最高的請求來處理(在有序集合中取最高或最低的元素時間復雜度都是O(1))。

下面是一些代碼示例,用node.js編寫,只分了三個優先級。

 1 // 生產者
 2 
 3 var redisClient = require("./lib/redis");
 4 var redisConf = require("./config/config.json").redis;
 5 
 6 redisClient.config(redisConf);
 7 
 8 var client = redisClient.client;
 9 
10 // 優先級隊列,低中高三個等級
11 var priorityQueues = ["queue_h", "queue_m", "queue_l"];
12 
13 function getRandomNum(min, max) {
14     var range = max - min;
15     var rand = Math.random();
16     return(min + Math.round(rand * range));
17 }
18 
19 // 每隔兩秒產生10條數據
20 setInterval(function(){
21     var count = 10;
22     for (var i = 0; i < count; i++) {
23         var idx = getRandomNum(0, 2);
24         console.log("push: " + priorityQueues[idx]);
25         client.lpush(priorityQueues[idx], "abc");
26     }
27 }, 2000);
// 消費者

var async = require("async");
var redisClient = require("./lib/redis");
var redisConf = require("./config/config.json").redis;

redisClient.config(redisConf);

var client = redisClient.client;

// 優先級隊列,pushMessage低中高三個等級
var priorityQueues = ["queue_h", "queue_m", "queue_l"];

// 依次檢查高中低三個優先級的list,遵循FIFO
function getMessage(){
    // 分別檢查所有優先級隊列中有沒有數據
    async.parallel([
            function(callback){
                client.llen(priorityQueues[0], function(err, len){
                    callback(null, len);
                });
            },
            function(callback){
                client.llen(priorityQueues[1], function(err, len){
                    callback(null, len);
                });
            },
            function(callback){
                client.llen(priorityQueues[2], function(err, len){
                    callback(null, len);
                });
            }
        ],
        function(err, results){
            if (err) {
                console.log(err);
                return;
            }
            for (var i = 0; i < results.length; i++){
                if (results[i] > 0){
                    client.rpop(priorityQueues[i], function(err, res){
                        console.log("pop: " + priorityQueues[i] + " " + res);
                    });
                    return;
                }
                if (i == 2){
                    console.log('No message can be handled.');
                    return;
                }
            }
        });
}

// 每20ms獲取一次數據
setInterval(function(){
    getMessage();
}, 20);

代碼實現比較簡單,主要實現了高中低三個優先級的情況。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM