php 消息隊列_php隊列的實現思路和詳細過程


一:隊列場景

當我們使用某訊或者某浪的郵箱時,點擊群發郵件之后,只需等待很短的時間,瀏覽器 提示提交成功,正在發送之類的信息時,用戶就可以關掉瀏覽器,稍后,收件地址欄里的郵箱將陸續收到該群發郵件,再比如群發定時郵件,以及當商城系統中有客 戶下單,客戶,客服,倉庫等相關人員收到訂單郵件信息。諸如此類,隊列的應用范圍是如此之廣。

二:普通工程師的解決方案和架構師的解決方案
方案1:建表存郵件,消息等,用定時程序取出發送。

方案2:抽象到更高一層,開發一套通用異步處理隊列適用於任何復雜的業務邏輯
那么,作為架構師,使用隊列的做法,將抽象層和業務層分離,可具有良好的擴展性和可維護性。相比較而言就高明了許多,

下面就我們介紹一下自定義隊列的實現思路和方法。

三 :隊列總體設計

1:需要隊列程序,提供加入隊列接口和取隊列接口等
2:需要存儲隊列,文件或者數據庫
3:需要定時程序取出隊列並執行
4:其它擴展功能:優先級,日志,定時等

代碼的目錄結構如下,每個文件的作用用//注釋來標明
|–addTask.php //添加任務到隊列的例子
|–cronMission.php //定時任務調度程序,例如linux中受crontab

直接調用的文件,業務邏輯工程師可以在這個文件中靈活定義自己的隊列任務,從而不用每個隊列任務 都需要上服務器修改crontab,從而在安全性,便捷性方面有很大提高
|–db.php //數據庫操作
|–db.sql //建立隊列需要用到的基本表結構
|–doQueue.php //執行隊列任務
|–Queue.class.php //隊列核心業務在這里定義,包括將任務加入隊列,讀隊列,更改隊列任務狀態
|–sendMsg.php //隊列要實現具體任務的業務接口,比如現有系統的發送消息的接口,這里例子中因為將此隊列程序和現有系統系統集成,用寫入日志來演示

四 :隊列具體實現一:建任務存儲表
1:先來個最基本的:

  1.  
    CREATE TABLE`queue` (
  2.  
    id int(11) NOT NULL auto_increment primarykey,
  3.  
    taskphp varchar(128) NOT NULL default '',
  4.  
    param text not null default '',
  5.  
    status tinyint not null default 0,
  6.  
    ctime timestamp NOT NULL default CURRENT_TIMESTAMP,
  7.  
    KEY (ctime)
  8.  
    ) ENGINE =InnoDBDEFAULT CHARSET=utf8;

字段解釋:
taskphp:處理業務的接口文件
param:處理業務的接口文件需要接收的參數
status:任務處理狀態,0為未處理,處理完畢更改為1

五 、隊列具體實現二:定義調用接口

  1.  
    <?php
  2.  
     
  3.  
    /**
  4.  
    * @author:cyw0413
  5.  
    * 任務隊列實現
  6.  
    * date:2018-11-01
  7.  
    */
  8.  
     
  9.  
    include_once('db.php');
  10.  
    class Queue
  11.  
    {
  12.  
    /**
  13.  
    * 把任務扔到隊列
  14.  
    *
  15.  
    * @param string $taskphp 執行任務的程序
  16.  
     
  17.  
    * @param string $param 執行任務程序所用的參數
  18.  
     
  19.  
    * 例如,群發消息加入隊列:
  20.  
     
  21.  
    * $arr = array(
  22.  
     
  23.  
    * "uid" => 4,//發信息的人的UID
  24.  
     
  25.  
    * "uids" => array(6,234,34,67,7888,2355), //接收信息的人的UID
  26.  
     
  27.  
    * "content" => 'xxxxx',//信息內容
  28.  
     
  29.  
    * );
  30.  
     
  31.  
    * $cqueue = new Queue();
  32.  
     
  33.  
    * $cqueue->add("/app/send_msg.php", serialize($arr));
  34.  
    *
  35.  
    */
  36.  
     
  37.  
    public function add($taskphp,$param)
  38.  
    {
  39.  
     
  40.  
    $taskphp = mysql_real_escape_string($taskphp);
  41.  
    //$param = mysql_real_escape_string($param);
  42.  
     
  43.  
    $param = $param;
  44.  
    $sql = "insert into queue (taskphp, param) values('".$taskphp."', '".$param."')";
  45.  
    $re = execute($sql);
  46.  
     
  47.  
    if ($re){
  48.  
    $pid = mysql_insert_id();
  49.  
    return $pid;
  50.  
    } else{
  51.  
    return false;
  52.  
     
  53.  
    }
  54.  
     
  55.  
    }
  56.  
     
  57.  
     
  58.  
     
  59.  
    /**
  60.  
    * 讀取任務隊列
  61.  
    *
  62.  
    * @param string $limit 一次取多少條
  63.  
    */
  64.  
    public function getQueueTask($limit = 1000)
  65.  
    {
  66.  
     
  67.  
    $limit = (int)$limit;
  68.  
    $sql = "select id, taskphp, param from queue where status = 0 order by id asc";
  69.  
    $re = query($sql);
  70.  
    return $re;
  71.  
    }
  72.  
     
  73.  
    /**
  74.  
    * 更新任務狀態
  75.  
    *
  76.  
    * @param string $limit 一次取多少條
  77.  
    */
  78.  
    public function updateTaskByID($id)
  79.  
    {
  80.  
    $id = (int)$id;
  81.  
    $mtime = time();
  82.  
    $sql = "update queue set status =1, mtime = ".$mtime." where id = ".$id;
  83.  
    $re = execute($sql);
  84.  
    return $re;
  85.  
    }
  86.  
     
  87.  
     
  88.  
    public static function a2s($arr)
  89.  
    {
  90.  
    $str = "";
  91.  
    foreach ($arr as $key => $value){
  92.  
    if (is_array($value)){
  93.  
    foreach ($value as $value2){
  94.  
    $str .= urlencode($key) . "[]=" . urlencode($value2) . "&";
  95.  
    }
  96.  
    } else{
  97.  
    $str .= urlencode($key) . "=" . urlencode($value) . "&";
  98.  
    }
  99.  
    }
  100.  
    return $str;
  101.  
    }
  102.  
     
  103.  
     
  104.  
    public static function s2a($str){
  105.  
    $arr = array();
  106.  
    parse_str( $str, $arr);
  107.  
    return $arr;
  108.  
    }
  109.  
     
  110.  
    }
  111.  
     
  112.  
    ?>

1:加入隊列接口
l //$param1 為執行任務的程序,$param2 為程序參數,可以為序列化的數據
l $cqueue->add($param1,$param2);
2: 讀取隊列接口
l $tasks = $cqueue->getQueueTask($limit = 1000);
3:更新任務狀態
l $cqueue->updateTaskStatus($id);
4:a2s是自定義的一個數組轉換字符串方法,這里不要使用json_encode,容易出現問題,同樣,從數據庫中取出轉換為數組的時候,使用s2a方法
l $re = $cqueue->add("sendMsg.php", Queue::a2s($arr));

六、隊列具體實現三:寫執行隊列的程序
根據設計,執行隊列的程序文件是 do_queue.php , 它的主要功能是把任務從隊列表里取出來,並且在后台執行。

do_queue.php部分代碼:

  1.  
    <?php
  2.  
     
  3.  
    $phpcmd = exec("which php"); //查找到php安裝位置
  4.  
    $cqueue = new Queue();
  5.  
     
  6.  
    $tasks = $cqueue->getQueueTask(200);
  7.  
    foreach ($tasks as $t){
  8.  
    $taskphp = $t['taskphp'];
  9.  
    $param = $t['param'];
  10.  
    $job = $phpcmd . " " . escapeshellarg($taskphp) . " " . escapeshellarg($param);
  11.  
    system( $job);

七、具體任務的業務實現

還是拿群發消息來做例子,我們需要寫好一個群發消息的程序,這個程序接收事先定義好的參數,然后根據參數調用發消息的接口把消息發送出去。
這個一般由做業務功能的工程師實現。但是架構師事先得寫文檔例子,教會別人使用。
send_msg.php:

  1.  
    <?php
  2.  
     
  3.  
    $para = $argv[1];
  4.  
    $arr = unserialize($para);
  5.  
     
  6.  
    $cmessage = new Message();
  7.  
     
  8.  
    foreach($arr['uids'] as $touid){
  9.  
    $cmessage->send($arr['uid'], $touid, $arr['content']);
  10.  
    }

八、服務器部署一:配置crontab

咱們執行隊列的程序都寫好了, 這個程序怎么觸發呢,當然就要用到linux的定時任務,每隔一定的時間,執行do_queue.php一次。但是呢,這里不是直接調用 do_queue.php,咱們再提高一層,加個調度程序cron_mission.php, 在cron_mission.php里面調用do_queue.php
配置定時任務 crontab:
l crontab –e
l * * * * * cd /ucai/schedule;php cron_mission.php >> cron_mission.log
#可以先使用crontab -l查看本機已經使用的定時任務

九、服務器部署二:寫定時任務調度程序
思路:將定時任務寫入到任務調度程序cron_mission.php中,這樣可以在cron_mission.php中靈活控制隊列任務。相比較直接通 過crontab控制doQueue.php而言,避免了頻繁修改服務器上的crontab,從安全,便於維護等角度來說,都是上策。

cron_mission.php 示例:

  1.  
    <?php
  2.  
     
  3.  
    if ($minute % 5 == 0){
  4.  
     
  5.  
    if(chdir($site_dir."app/")) {
  6.  
     
  7.  
    $cmd = "$phpcmd do_queue.php > do_queue.log &";
  8.  
    echo '[' , $ymd , ' ' , $hour , ':' , $minute , '] ' , $cmd , "n";
  9.  
    system( $cmd);
  10.  
    }
  11.  
    }

十、開啟多進程並發執行隊列
思路:對任務序列進行編號,數據庫中執行的時候
where條件加上id%每個隊列要執行任務總數=隊列編號
這樣可以避免重復處理
例如:每個進程執行10條任務,修改如下1:定時任務的修改

修改前:

  1.  
    <?php
  2.  
    if ($minute % 5 == 0){
  3.  
    if(chdir($site_dir."app/")) {
  4.  
    $cmd = "$phpcmd do_queue.php > do_queue.log &";
  5.  
    echo '[' , $ymd , ' ' , $hour , ':' , $minute , '] ' , $cmd , "n";
  6.  
    system( $cmd);
  7.  
    }
  8.  
    }

修改后:

  1.  
    <?php
  2.  
     
  3.  
    if ($minute % 5 == 0){
  4.  
    for ($i=0; $i < 10; $i++) {
  5.  
    $cmd = "$phpcmd doQueue.php 10 $i>> doQueueMission".date('Y-m-d').".log ";
  6.  
    echo date("Y-m-d H:i:s") . "t : " .$cmd."n";
  7.  
    system( $cmd);
  8.  
    }
  9.  
    }

//每次進行10個進程,$i來區分是當前的進程標示2:隊列執行程序的修改

修改前:

  1.  
    <?php
  2.  
    $phpcmd = 'D:workwampbinphpphp5.3.10php ';
  3.  
    $cqueue = new Queue();
  4.  
    $tasks = $cqueue->getQueueTask(200);

修改后:

  1.  
    <?php
  2.  
    $phpcmd = 'D:workwampbinphpphp5.3.10php ';
  3.  
    $total=$argv[1];
  4.  
    $i=$argb[2];
  5.  
    $cqueue = new Queue();
  6.  
    $tasks = $cqueue->getQueueTask($total,$i,200);

3:取隊列接口的修改

修改前:

  1.  
    <?php
  2.  
    public function getQueueTask($limit = 1000)
  3.  
    {
  4.  
    $limit = (int)$limit;
  5.  
    $sql = "select id, taskphp, param from queue where status = 0 order by id asc";
  6.  
    $re = query($sql);
  7.  
    return $re;
  8.  
    }

修改后:

  1.  
    <?php
  2.  
    public function getQueueTask($total,$i,$limit = 1000)
  3.  
    {
  4.  
    $limit = (int)$limit;
  5.  
    $sql = "select id, taskphp, param from queue where status = 0 and id%$total=$i order by id asc";
  6.  
    $re = query($sql);
  7.  
    return $re;
  8.  
    }

4:需要關注服務器壓力
進程數定為多少,取決於服務器壓力

十一、實現任務優先級
1:任務存儲表加優先級字段
在數據表里,加一個優先級字段,按字段值的數值大小來區分優先級
2:修改取隊列任務接口,按優先級取
同樣是在sql語句中增加order by

十二、記錄隊列日志
1:關鍵地方加echo
2:shell腳本的>>和>的各自作用

總結:
我們這里的隊列實現借助了服務器的計划任務來實現,例如linux中的crontab,這本身是linux系統中的一個程序,平時我們還可以使用他來進行 定時執行.sh腳本,例如將數據庫備份打包並ftp傳送到指定服務器上,這個功能不需要借助php腳本,直接用.sh腳本就可以實現。在這里我們巧妙的將 crontab和php腳本結合,並且使用crontab來不斷調用一個隊列調度接口cronMission.php,再通過 cronMission.php直接來控制具體什么時候或者是滿足什么條件來執行什么隊列任務。

這里面幾個需要注意的地方
1:往數據庫中存取數據時,不要直接使用json_encode或者json_decode,容易造成一些意外問題,在代碼中,我們定義了a2s和s2a兩個方法,分別是處理數組轉為字符串,和從數據庫中讀取字符串后轉為數組。


2:當任務量比較大,同時服務器負載又沒有充分利用的時候,可以使用多進程並發處理,在並發處理的時候需要考慮一個問題,就是如何避免重復,在這里我們使 用了,對隊列任務進行標記,每次從數據庫中讀取一個進程需要處理的一批任務,使用數據庫中id與批次標示取余等於0的方法來區分,避免不同批次的隊列,重 復處理相同任務。(上面步驟10中有具體實現)

以上內容希望幫助到大家,很多PHPer在進階的時候總會遇到一些問題和瓶頸,業務代碼寫多了沒有方向感,不知道該從那里入手去提升,對此我整理了一些資料,包括但不限於:分布式架構、高可擴展、高性能、高並發、服務器性能調優、TP6,laravel,YII2,Redis,Swoole、Swoft、Kafka、Mysql優化、shell腳本、Docker、微服務、Nginx等多個知識點高級進階干貨需要的可以免費分享給大家  ,需要請戳這里鏈接  或者請看個人主頁以及知乎專欄


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM