基礎介紹
Pub/Sub功能(means Publish, Subscribe)即發布及訂閱功能
- 基於事件的系統中,Pub/Sub是目前廣泛使用的通信模型,它采用事件作為基本的通信機制,提供大規模系統所要求的松散耦合的交互模式:訂閱者(如客戶端)以事件訂閱的方式表達出它有興趣接收的一個事件或一類事件;發布者(如服務器)可將訂閱者感興趣的事件隨時通知相關訂閱者。
- 消息發布者,即publish客戶端,無需獨占鏈接,你可以在publish消息的同時,使用同一個redis-client鏈接進行其他操作(例如:INCR等)
- 消息訂閱者,即subscribe客戶端,需要獨占鏈接,即進行subscribe期間,redis-client無法穿插其他操作,此時client以阻塞的方式等待“publish端”的消息;這一點很好理解,因此subscribe端需要使用單獨的鏈接,甚至需要在額外的線程中使用。
當使用銀行卡消費的時候,銀行往往會通過微信、短信或郵件通知用戶這筆交易的信息,這便是一種發布訂閱模式,這里的發布是交易信息的發布,訂閱則是各個渠道。這在實際工作中十分常用,Redis 支持這樣的一個模式。
發布訂閱模式首先需要消息源,也就是要有消息發布出來,比如例子中的銀行通知。首先是銀行的記賬系統,收到了交易的命令,成功記賬后,它就會把消息發送出來,這個時候,訂閱者就可以收到這個消息進行處理了,觀察者模式就是這個模式的典型應用了。
終端實現
訂閱,頻道為'chat'
發布消息
代碼實現
subscribe.php
<?php ini_set('default_socket_timeout', -1); //php配置設置不超時 $redis = new Redis(); $redis->connect("127.0.0.1",6379); //$redis->setOption(Redis::OPT_READ_TIMEOUT, -1); //redis方式設置不超時,推薦 $redis->subscribe(['chan'],'callback'); //callback為回調函數名稱 //$redis->subscribe(['chan'],array(new TestCall(),'callback') ); //如果回調函數是類中的方法名,這樣寫 // 回調函數,這里寫處理邏輯 function callback($instance, $channelName, $message) { echo $channelName, "==>", $message, PHP_EOL; //$instance,即為上面創建的redis實例對象,在回調函數中,默認的這個參數就是,因此不需專門傳參。 這里除了SUBSCRIBE、PSUBSCRIBE、UNSUBSCRIBE、PUNSUBSCRIBE這4條命令之外其它命令都不能使用 //如果要使用redis中的其他命令,這樣實現 $newredis = new Redis(); $newredis->connect("127.0.0.1", 6379); echo $newredis->get('test') . PHP_EOL; $newredis->close(); //可以根據$channelName, $message,處理不同的業務邏輯 switch($chan) { case 'chan-1': ... break; case 'chan-2': ... break; } switch($message) { case 'msg1': ... break; case 'msg2': ... break; } }
publish.php
<?php $redis = new Redis(); $redis->connect("127.0.0.1",6379); $redis->publish('chan','this is a message');
代碼介紹
- subscribe.php中設置不超時
方法1:ini_set('default_socket_timeout', -1);方法2: $redis->setOption(Redis::OPT_READ_TIMEOUT, -1);
如果不設置不超時,60s后會報一個錯誤
PHP Fatal error: Uncaught RedisException: read error on connection to 127.0.0.1:6379 in subscribe.php:6
方式一的實現,是通過臨時修改ini的配置值,default_socket_timeout默認為60s,default_socket_timeout是socket流的超時參數,即socket流從建立到傳輸再到關閉整個過程必須要在這個參數設置的時間以內完成,如果不能完成,那么PHP將自動結束這個socket並返回一個警告。
方式二是通過修改redis的配置項,因此僅對redis連接生效,相對於方式1,不會產生意外的對其他方法的影響。
批量訂閱
redis的psubscribe支持通過模式匹配的方式實現批量訂閱,訂閱方式
回調函數寫函數名或者redis->psubscribe(['my*'],array(new TestCall(),'psubscribe')); //回調函數為類中的方法,類名寫你自己定義的類
subscribe.php
<?php //ini_set('default_socket_timeout', -1); //不超時 $redis = new Redis(); $redis->connect("127.0.0.1",6379); $redis->setOption(Redis::OPT_READ_TIMEOUT, -1); //匹配方式1:發布可用$redis->publish('mymest','this is a message'); //$redis->psubscribe(['my*'],'psubscribe'); //匹配方式2:發布可用$redis->publish('mydest','this is a message'); //$redis->psubscribe(['my?est'],'psubscribe'); //匹配方式3:發布可用$redis->publish('myaest','this is a message');或$redis->publish('myeest','this is a message'); $redis->psubscribe(['my[ae]est'],'psubscribe'); function psubscribe($redis, $pattern, $chan, $msg) { echo "Pattern: $pattern\n"; echo "Channel: $chan\n"; echo "Payload: $msg\n"; }
模式匹配規則
支持以下幾種,以hello舉例:
h?llo subscribes to hello, hallo and hxllo
h*llo subscribes to hllo and heeeello
h[ae]llo subscribes to hello and hallo, but not hillo
特殊字符用\轉義
pubsub方法介紹
public function pubsub( argument )
pubsub獲取pub/sub系統的信息,$keyword可用為"channels", "numsub", 或者"numpat",三種,傳入不同的keyword返回的數據不同
* $redis->pubsub('channels'); // All channels 獲取所有的頻道,返回數組 * $redis->pubsub('channels', '*pattern*'); // Just channels matching your pattern,返回符合匹配模式的頻道 * $redis->pubsub('numsub', array('chan1', 'chan2')); // Get subscriber counts for 'chan1' and 'chan2' //返回每個訂閱頻道的數量,返回數組 * $redis->pubsub('numpat'); // Get the number of pattern subscribers 獲取模式匹配方式的訂閱
轉載至:https://zhuanlan.zhihu.com/p/155667437