Redis設計主要是用來做緩存的,但是由於它自身的某種特性使得它可以用來做消息隊列。
它有幾個阻塞式的API可以使用,正是這些阻塞式的API讓其有能力做消息隊列;
另外,做消息隊列的其他特性例如FIFO(先入先出)也很容易實現,只需要一個list對象從頭取數據,從尾部塞數據即可;
Redis能做消息隊列還得益於其list對象blpop brpop接口以及Pub/Sub(發布/訂閱)的某些接口,它們都是阻塞版的,所以可以用來做消息隊列。(List : lpush / rpop)
方式一:生產者與消費者模式
使用list結構作為隊列,rpush生產消息,lpop消費消息,當lpop沒有消息的時候,要適當sleep一會再重試。
或者,不用sleep,直接用blpop指令,在沒有消息的時候,它會阻塞住直到消息到來。
具體文章可查看:PHP與redis隊列實現電商訂單自動確認收貨
方式二:發布訂閱者模式
使用pub/sub主題訂閱者模式,可以實現1:N的消息隊列。
基於事件的系統中,Pub/Sub是目前廣泛使用的通信模型,它采用事件作為基本的通信機制,提供大規模系統所要求的松散耦合的交互模式:訂閱者(如客戶端)以事件訂閱的方式表達出它有興趣接收的一個事件或一類事件;發布者(如服務器)可將訂閱者感興趣的事件隨時通知相關訂閱者。
消息發布者,即publish客戶端,無需獨占鏈接,你可以在publish消息的同時,使用同一個redis-client鏈接進行其他操作(例如:INCR等)
消息訂閱者,即subscribe客戶端,需要獨占鏈接,即進行subscribe期間,redis-client無法穿插其他操作,此時client以阻塞的方式等待“publish端”的消息;這一點很好理解,因此subscribe端需要使用單獨的鏈接,甚至需要在額外的線程中使用。
redis做異步隊列的缺點
在消費者下線的情況下,生產的消息會丟失。此場景,建議用MQ。
使用MQ可查看:在分布式系統上,你是如何處理數據的一致性的
下面用代碼實際實現,用發布訂閱者模式,給大家講解講解
消息訂閱者subscribe.php
<?php
ini_set('default_socket_timeout', -1); //php配置設置不超時
$redis = new Redis();
$redis->connect("127.0.0.1",6379);
//$redis->setOption(Redis::OPT_READ_TIMEOUT, -1); //redis方式設置不超時,推薦
$redis->subscribe(['chan'],'callback'); //callback為回調函數名稱
//$redis->subscribe(['chan'],array(new TestCall(),'callback') ); //如果回調函數是類中的方法名,這樣寫
// 回調函數,這里寫處理邏輯
function callback($instance, $channelName, $message)
{
echo $channelName, "==>", $message, PHP_EOL;
//$instance,即為上面創建的redis實例對象,在回調函數中,默認的這個參數就是,因此不需專門傳參。 這里除了SUBSCRIBE、PSUBSCRIBE、UNSUBSCRIBE、PUNSUBSCRIBE這4條命令之外其它命令都不能使用
//如果要使用redis中的其他命令,這樣實現
$newredis = new Redis();
$newredis->connect("127.0.0.1", 6379);
echo $newredis->get('test') . PHP_EOL;
$newredis->close();
//可以根據$channelName, $message,處理不同的業務邏輯
switch($chan) {
case 'chan-1':
...
break;
case 'chan-2':
...
break;
}
switch($message) {
case 'msg1':
...
break;
case 'msg2':
...
break;
}
}
subscribe.php中設置不超時
方法1:ini_set('default_socket_timeout', -1);方法2:$redis->setOption(Redis::OPT_READ_TIMEOUT, -1);
如果不設置不超時,60s后會報一個錯誤
PHP Fatal error: Uncaught RedisException: read error on connection to 127.0.0.1:6379 in subscribe.php:6
方式一的實現,是通過臨時修改ini的配置值,default_socket_timeout默認為60s,default_socket_timeout是socket流的超時參數,即socket流從建立到傳輸再到關閉整個過程必須要在這個參數設置的時間以內完成,如果不能完成,那么PHP將自動結束這個socket並返回一個警告。
方式二是通過修改redis的配置項,因此僅對redis連接生效,相對於方式1,不會產生意外的對其他方法的影響。
消息發布者publish.php
<?php
$redis = new Redis();
$redis->connect("127.0.0.1",6379);
$redis->publish('chan','this is a message');
批量訂閱
redis的psubscribe支持通過模式匹配的方式實現批量訂閱,訂閱方式
$redis->psubscribe(['my*'],'psubscribe'); //回調函數寫函數名
或者
$redis->psubscribe(['my*'],array(new TestCall(),'psubscribe')); //回調函數為類中的方法,類名寫你自己定義的類
subscribe.php
<?php
//ini_set('default_socket_timeout', -1); //不超時
$redis = new Redis();
$redis->connect("127.0.0.1",6379);
$redis->setOption(Redis::OPT_READ_TIMEOUT, -1);
//匹配方式1:發布可用$redis->publish('mymest','this is a message');
//$redis->psubscribe(['my*'],'psubscribe');
//匹配方式2:發布可用$redis->publish('mydest','this is a message');
//$redis->psubscribe(['my?est'],'psubscribe');
//匹配方式3:發布可用$redis->publish('myaest','this is a message');或$redis->publish('myeest','this is a message');
$redis->psubscribe(['my[ae]est'],'psubscribe');
function psubscribe($redis, $pattern, $chan, $msg) {
echo "Pattern: $pattern\n";
echo "Channel: $chan\n";
echo "Payload: $msg\n";
}
模式匹配規則
支持以下幾種,以hello舉例:
h?llo subscribes to hello, hallo and hxllo
h*llo subscribes to hllo and heeeello
h[ae]llo subscribes to hello and hallo, but not hillo
特殊字符用\轉義
pubsub方法介紹
public function pubsub( $keyword, $argument )
pubsub獲取pub/sub系統的信息,$keyword可用為"channels", "numsub", 或者"numpat",三種,傳入不同的keyword返回的數據不同
* $redis->pubsub('channels'); // All channels 獲取所有的頻道,返回數組
* $redis->pubsub('channels', '*pattern*'); // Just channels matching your pattern,返回符合匹配模式的頻道
* $redis->pubsub('numsub', array('chan1', 'chan2')); // Get subscriber counts for 'chan1' and 'chan2' //返回每個訂閱頻道的數量,返回數組
* $redis->pubsub('numpat'); // Get the number of pattern subscribers 獲取模式匹配方式的訂閱