由於使用的laravel框架,所以使用了框架自帶的函數
1. 這里先創建一個鏈接,給一個stream里面添加數據
$streamKey = 'test:stream:queue'; $redis = new \Redis(); $redis->connect('127.0.0.1'); for ($i = 0; $i < 100; $i++) { /** * 隊列名 * *: 表示由Redis自己生成消息ID:規則為[毫秒時間戳+自增數] * 存儲的數據 */ $xAddResult = $redis->xAdd($streamKey, '*', ['field-'.$i => 'value:'.$i*2]); }
2.刪除隊列中的某一條消息
/** * 刪除消息 * 隊列名 * 消息ID */ $xDelResult = $redis->xDel($streamKey, ['1609131229884-0']);
3.查看隊列中的消息
/** * 取出所有的消息 * 隊列名 * 消息開始ID: - 不限制開始ID * 消息結束ID: + 表示不限制 */ $streamResult = $redis->xRange($streamKey, '-', '+'); dd($streamResult);
4.此時如果要消費隊列中的消息,需要先創建一個group與隊列關聯起來,才可以消費隊列中的消息
/** * 創建一個消費組 * 操作類型:['HELP', 'SETID', 'DELGROUP', 'CREATE', 'DELCONSUMER'] * 隊列名 * 消費者 : 這個時候自己隨便起名字就可以 * 消息ID : 0 表示從頭開始 $ 表示不接收老的消息 */ $xGroupResult = $redis->xGroup('CREATE', $streamKey, $streamKey.':group_1', 0); dd($xGroupResult);
5.獲取隊列中的消息
/** * group * 消費者 * [隊列名 => '>' : 特殊>ID,這意味着使用者只想接收從未傳遞給任何其他使用者的消息。這只是意味着,給我新消息。 * 隊列名 => '0' : 任何其他ID(即0或任何其他有效ID或不完整的ID(僅毫秒時間部分))將具有以下效果:返回正在等待用戶發送的ID大於提供的ID的命令的條目。因此,基本上,如果ID不是>,那么該命令將只允許客戶端訪問其掛起的條目:傳遞給它的消息,但尚未確認。] * 一次性取多少條消息 */ $xReadGroupResult = $redis->xReadGroup($streamKey.':group_1', 'consumerA', [$streamKey => '>'], 1);
6.從消費者組內讀取消息並處理完成后,需確認該條消息已處理
/** * 確認消息已處理 * 隊列名 * 消費者組 * 消息ID */ $xAckResult = $redis->xAck($streamKey, $streamKey.':group_1', ['1609131229885-0']); dd($xAckResult);
7. 用來獲消費組或消費內消費者的未處理完畢的消息。
/** * 用來獲消費組或消費內消費者的未處理完畢的消息。 * 隊列名 * 消費者組 */ $pendingResult = $redis->xPending($streamKey, $streamKey.':group_1'); dd($pendingResult);
其他更多的命令可以參考 Redis stream