主要思路是用一個set做前端去重緩沖, 若干個list做后端的多優先級消息隊列, 用一個進程來進行分發, 即從set中分發消息到隊列.
set緩沖的設計為當天有效, 所以有個零點問題,有可能在零點前set中剛放進去的消息沒有分發即失效, 這一點可以用另一個進程彌補處理前一天的遺留消息和刪除前一天的緩沖
<?php /** * @author * */ class MsgQuery { // TODO - Insert your code here const KEY_CACHE_PREFIX = 'mass.query.cache'; // 消息緩沖key前綴 const KEY_QUERY_PREFIX = 'mass.query.lv'; // 消息key const KEY_CACHE_DEAL_PREFIX = 'mass.query.deal'; // 已處理緩沖key前綴 const SCORE_NUM = 5; // 優先級划分數目 const MIN_SCORE = 1; // 最小優先級 static $MAX_SCORE; static $instance = null; private $redis; public static function getInstance($redis) { if (null == self::$instance) { self::$instance = new MsgQuery ( $redis ); } return self::$instance; } /** * 添加消息到消息緩沖區 * @param int $score 優先級(1-5) * @param string $msg 消息 */ public function add($score, $msg) { // 添加到消息緩沖 $socre = intval ( $score ); if ($socre < self::MIN_SCORE) { $score = self::MIN_SCORE; } if ($score > self::$MAX_SCORE) { $score = self::$MAX_SCORE; } $cacheKey = self::KEY_CACHE_PREFIX . date ( 'Ymd' ); $cacheData = array ( 'score' => $score, 'msg' => $msg ); $this->redis->sAdd ( $cacheKey, serialize ( $cacheData ) ); } /** * 將消息從緩沖區移動到相應的優先級隊列中 */ public function moveToQuery() { // 獲取當前緩沖區沒有入隊列的消息 $dealKey = self::KEY_CACHE_DEAL_PREFIX.date('Ymd'); $cacheKey = self::KEY_CACHE_PREFIX.date('Ymd'); $msgs = $this->redis->sDiff($cacheKey, $dealKey); foreach ($msgs as $cachedData){ // 放入已處理集合 $this->redis->sAdd ( $dealKey, $cachedData ); // 壓入相應的優先級隊列 $cachedData = unserialize($cachedData); $score = $cachedData['score']; $msg = $cachedData['msg']; $queryKey = self::KEY_QUERY_PREFIX.$score; $this->redis->rPush($queryKey, $msg); } unset($cachedData); } /** * 從隊列阻塞式出棧一個最高優先級消息 * @return string msg */ public function bPop(){ $queryKeys = array(); for($score=self::$MAX_SCORE;$score>=self::MIN_SCORE;$score--){ $queryKeys[] = self::KEY_QUERY_PREFIX.$score; } $msg = $this->redis->blPop($queryKeys, 0); return $msg[1]; } private function __construct($redis) { $this->redis = $redis; $this->redis->connect (); self::$MAX_SCORE = self::MIN_SCORE + self::SCORE_NUM - 1; } private function __destruct() { $this->redis->close (); } } ?>