關於Redis stream的使用


由於使用的laravel框架,所以使用了框架自帶的函數

1. 這里先創建一個鏈接,給一個stream里面添加數據

$streamKey = 'test:stream:queue';

        $redis = new \Redis();
        $redis->connect('127.0.0.1');

        for ($i = 0; $i < 100; $i++) {
            /**
             * 隊列名
             * *: 表示由Redis自己生成消息ID:規則為[毫秒時間戳+自增數]
             * 存儲的數據
             */
            $xAddResult = $redis->xAdd($streamKey, '*', ['field-'.$i => 'value:'.$i*2]);
        }

 

 

 

 

2.刪除隊列中的某一條消息

/**
 * 刪除消息
 * 隊列名
 * 消息ID
 */
 $xDelResult = $redis->xDel($streamKey, ['1609131229884-0']);

 

3.查看隊列中的消息

/**
  * 取出所有的消息
  * 隊列名
  * 消息開始ID: - 不限制開始ID
  * 消息結束ID: + 表示不限制
  */
  $streamResult = $redis->xRange($streamKey, '-', '+');
  dd($streamResult);

  

4.此時如果要消費隊列中的消息,需要先創建一個group與隊列關聯起來,才可以消費隊列中的消息

/**
  * 創建一個消費組
  * 操作類型:['HELP', 'SETID', 'DELGROUP', 'CREATE', 'DELCONSUMER']
  * 隊列名
  * 消費者 : 這個時候自己隨便起名字就可以
  * 消息ID : 0 表示從頭開始 $ 表示不接收老的消息
  */
  $xGroupResult = $redis->xGroup('CREATE', $streamKey, $streamKey.':group_1', 0);
  dd($xGroupResult);

  

5.獲取隊列中的消息

/**
         * group
         * 消費者
         * [隊列名 => '>' : 特殊>ID,這意味着使用者只想接收從未傳遞給任何其他使用者的消息。這只是意味着,給我新消息。
         *  隊列名 => '0' : 任何其他ID(即0或任何其他有效ID或不完整的ID(僅毫秒時間部分))將具有以下效果:返回正在等待用戶發送的ID大於提供的ID的命令的條目。因此,基本上,如果ID不是>,那么該命令將只允許客戶端訪問其掛起的條目:傳遞給它的消息,但尚未確認。]
         * 一次性取多少條消息
         */
        $xReadGroupResult = $redis->xReadGroup($streamKey.':group_1', 'consumerA', [$streamKey => '>'], 1);

  

 

 

6.從消費者組內讀取消息並處理完成后,需確認該條消息已處理

/**
         * 確認消息已處理
         * 隊列名
         * 消費者組
         * 消息ID
         */
        $xAckResult = $redis->xAck($streamKey, $streamKey.':group_1', ['1609131229885-0']);
        dd($xAckResult);

  

7. 用來獲消費組或消費內消費者的未處理完畢的消息。

/**
         * 用來獲消費組或消費內消費者的未處理完畢的消息。
         * 隊列名
         * 消費者組
         */
        $pendingResult = $redis->xPending($streamKey, $streamKey.':group_1');
        dd($pendingResult);

  

 

 

其他更多的命令可以參考 Redis stream

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM