php中如何實現多進程
一、總結
一句話總結:
php多進程需要pcntl,posix擴展支持
可以通過 php - m 查看,沒安裝的話需要重新編譯php,加上參數--enable-pcntl,posix一般默認會有
1、php多進程使用場景?
日常任務中,有時需要通過php腳本執行一些日志分析,隊列處理等任務,當數據量比較大時,可以使用多進程來處理
2、php的cli模式是什么?
命令行模式:cli : Command Line Interface(命令行接口)
3、php多進程使用限制?
多進程實現只能在cli模式下,在web服務器環境下,會出現無法預期的結果,我測試報錯:Call to undefined function: pcntl_fork()
4、php多進程核心函數?
pcntl_fork(創建子進程)、pcntl_wait(阻塞當前進程)
pcntl_fork: 一次調用兩次返回,在父進程中返回子進程pid,在子進程中返回0,出錯返回-1。 pcntl_wait ( int &$status [, int $options ] ): 阻塞當前進程,直到任意一個子進程退出或收到一個結束當前進程的信號,注意是結束當前進程的信號,子進程結束發送的SIGCHLD不算。使用$status返回子進程的狀態碼,並可以指定第二個參數來說明是否以阻塞狀態調用 阻塞方式調用的,函數返回值為子進程的pid,如果沒有子進程返回值為-1; 非阻塞方式調用,函數還可以在有子進程在運行但沒有結束的子進程時返回0。 pcntl_waitpid ( int $pid , int &$status [, int $options ] ) 功能同pcntl_wait,區別為waitpid為等待指定pid的子進程。當pid為-1時pcntl_waitpid與pcntl_wait 一樣。在pcntl_wait和pcntl_waitpid兩個函數中的$status中存了子進程的狀態信息。
5、php中一個始終保持固定個數的子進程在跑的例子?
根據需求使用pcntl_fork(創建子進程)、pcntl_wait(阻塞當前進程)等核心函數
<?php //最大的子進程數量 $maxChildPro = 8; //當前的子進程數量 $curChildPro = 0; //當子進程退出時,會觸發該函數,當前子進程數-1 function sig_handler($sig) { global $curChildPro; switch ($sig) { case SIGCHLD: echo 'SIGCHLD', PHP_EOL; $curChildPro--; break; } } //配合pcntl_signal使用,簡單的說,是為了讓系統產生時間雲,讓信號捕捉函數能夠捕捉到信號量 declare(ticks = 1); //注冊子進程退出時調用的函數。SIGCHLD:在一個進程終止或者停止時,將SIGCHLD信號發送給其父進程。 pcntl_signal(SIGCHLD, "sig_handler"); while (true) { $curChildPro++; $pid = pcntl_fork(); if ($pid) { //父進程運行代碼,達到上限時父進程阻塞等待任一子進程退出后while循環繼續 if ($curChildPro >= $maxChildPro) { pcntl_wait($status); } } else { //子進程運行代碼 $s = rand(2, 6); sleep($s); echo "child sleep $s second quit", PHP_EOL; exit; } }
二、php多進程總結
參考:php多進程總結
https://www.cnblogs.com/leezhxing/p/5223289.html">php多進程總結
場景:日常任務中,有時需要通過php腳本執行一些日志分析,隊列處理等任務,當數據量比較大時,可以使用多進程來處理。
准備:php多進程需要pcntl,posix擴展支持,可以通過 php - m 查看,沒安裝的話需要重新編譯php,加上參數--enable-pcntl,posix一般默認會有。
注意:
多進程實現只能在cli模式下,在web服務器環境下,會出現無法預期的結果,我測試報錯:
Call to undefined function: pcntl_fork()
一個錯誤 pcntl_fork causing “errno=32 Broken pipe” #474 ,看https://github.com/phpredis/phpredis/issues/474
注意兩點:如果是在循環中創建子進程,那么子進程中最后要exit,防止子進程進入循環。
子進程中的打開連接不能拷貝,使用的還是主進程的,需要用多例模式。
pcntl_fork:
一次調用兩次返回,在父進程中返回子進程pid,在子進程中返回0,出錯返回-1。
pcntl_wait ( int &$status [, int $options ] ):
阻塞當前進程,直到任意一個子進程退出或收到一個結束當前進程的信號,注意是結束當前進程的信號,子進程結束發送的SIGCHLD不算。使用$status返回子進程的狀態碼,並可以指定第二個參數來說明是否以阻塞狀態調用
阻塞方式調用的,函數返回值為子進程的pid,如果沒有子進程返回值為-1;
非阻塞方式調用,函數還可以在有子進程在運行但沒有結束的子進程時返回0。
/** 確保這個函數只能運行在SHELL中 */ if (substr(php_sapi_name(), 0, 3) !== 'cli') { die("cli mode only"); }
#!/bin/bash for((i=1;i<=8;i++)) do /usr/local/bin/php multiprocessTest.php & done wait
上面的shell程序,列了一個很簡單的多進程程序,用一個for循環,實現了8進程並發來跑multiprocessTest.php這個程序。最后的wait語句,也可以使主進程,再等待所有進程都執行完后再往下執行的需求。
這個程序是沒有問題的,很多現有的代碼也都這樣實現,但是這個程序的並發數是不可控的,即我們無法根據機器的核數去調度每一個進程的開關。
若我們的機器有8核或者更多,上面的程序是沒有問題的,所有核都能充分利用,並且互相之間,沒有爭搶資源的情況出現。
但我們的機器要沒有8核的話會是什么情況,同一時間運行的進程數多於核數,那么系統就會出現進程分配調度的問題,爭搶資源也跟着相應而來,一個進程不能保證獨立連續的執行,所有的進程運行會聽從系統的調度,這樣就會有更多的不確定因素出現。
<?php //最大的子進程數量 $maxChildPro = 8; //當前的子進程數量 $curChildPro = 0; //當子進程退出時,會觸發該函數,當前子進程數-1 function sig_handler($sig) { global $curChildPro; switch ($sig) { case SIGCHLD: echo 'SIGCHLD', PHP_EOL; $curChildPro--; break; } } //配合pcntl_signal使用,簡單的說,是為了讓系統產生時間雲,讓信號捕捉函數能夠捕捉到信號量 declare(ticks = 1); //注冊子進程退出時調用的函數。SIGCHLD:在一個進程終止或者停止時,將SIGCHLD信號發送給其父進程。 pcntl_signal(SIGCHLD, "sig_handler"); while (true) { $curChildPro++; $pid = pcntl_fork(); if ($pid) { //父進程運行代碼,達到上限時父進程阻塞等待任一子進程退出后while循環繼續 if ($curChildPro >= $maxChildPro) { pcntl_wait($status); } } else { //子進程運行代碼 $s = rand(2, 6); sleep($s); echo "child sleep $s second quit", PHP_EOL; exit; } }
<?php $childs = array(); // Fork10個子進程 for ($i = 0; $i < 10; $i++) { $pid = pcntl_fork(); if ($pid == -1) die('Could not fork'); if ($pid) { echo "parent \n"; $childs[] = $pid; } else { // Sleep $i+1 (s). 子進程可以得到$i參數 sleep($i + 1); // 子進程需要exit,防止子進程也進入for循環 exit(); } } while (count($childs) > 0) { foreach ($childs as $key => $pid) { $res = pcntl_waitpid($pid, $status, WNOHANG); //-1代表error, 大於0代表子進程已退出,返回的是子進程的pid,非阻塞時0代表沒取到退出子進程 if ($res == -1 || $res > 0) unset($childs[$key]); } sleep(1); }
<?php function _fetchLog() { $password = $this->_getPassword(); $online_log_path = NginxConf::getArchiveDir($this->_stat_day); $task_log_path = QFrameConfig::getConfig('LOG_PATH'); $children = array(); $success = true; foreach($this->_server_list as $host => $value) { $local_dir = $this->_prepareLocalDir($host); $task_log = "$task_log_path/fetch_log.$host"; $cmd = "sshpass -p $password rsync -av -e 'ssh -o StrictHostKeyChecking=no' $host:$online_log_path/* $local_dir >> $task_log 2>&1"; $pid = pcntl_fork(); if(-1 === $pid) { LogSvc::log('stat_pv_by_citycode_error', 'could not fork'); exit('could not fork'); } else if(0 === $pid) { system($cmd, $return_value); if(0 !== $return_value) { LogSvc::log('stat_pv_by_citycode_error', "rsync $host error"); } exit($return_value); } else { $children[$pid] = 1; } } while(!empty($children)) { $pid = pcntl_waitpid(-1, $status, WNOHANG); if(0 === $pid) { sleep(1); } else { if(0 !== pcntl_wexitstatus($status)) { $success = false; } unset($children[$pid]); } } return $success; }
posix_kill(posix_getpid(), SIGHUP); 為自己生成SIGHUP信號
declare(ticks = 1); //php < 5.3
pcntl_signal()
函數僅僅是注冊信號和它的處理方法,真正接收到信號並調用其處理方法的是pcntl_signal_dispatch()
函數
必須在循環里調用,為了檢測是否有新的信號等待dispatching。
pcntl_signal_dispatch()
declare(ticks = 1);
表示每執行一條低級指令,就檢查一次信號,如果檢測到注冊的信號,就調用其信號處理器。
kill [PID]
命令,未加任何其他參數的話,程序會接收到一個SIGTERM信號。
<?php // 定義一個處理器,接收到SIGINT信號后只輸出一行信息 function signalHandler($signal) { if ($signal == SIGINT) { echo 'SIGINT', PHP_EOL; } } // 信號注冊:當接收到SIGINT信號時,調用signalHandler()函數 pcntl_signal(SIGINT, 'signalHandler'); /** * PHP < 5.3 使用 * 配合pcntl_signal使用,表示每執行一條低級指令,就檢查一次信號,如果檢測到注冊的信號,就調用其信號處理器。 */ if (!function_exists("pcntl_signal_dispatch")) { declare(ticks=1); } while (true) { $s = sleep(10); echo $s, PHP_EOL; //信號會喚醒sleep,返回剩余的秒數。 // do something for ($i = 0; $i < 5; $i++) { echo $i . PHP_EOL; usleep(100000); } /** * PHP >= 5.3 * 調用已安裝的信號處理器 * 必須在循環里調用,為了檢測是否有新的信號等待dispatching。 */ if (!function_exists("pcntl_signal_dispatch")) { pcntl_signal_dispatch(); } }
<?php declare(ticks = 1); function signal_handler($signal) { print "Caught SIGALRM\n"; pcntl_alarm(5); } pcntl_signal(SIGALRM, "signal_handler", true); pcntl_alarm(5); for(;;) { }
<?php /** * 父進程通過pcntl_wait等待子進程退出 * 子進程通過信號kill自己,也可以在父進程中發送kil信號結束子進程 */ //生成子進程 $pid = pcntl_fork(); if($pid == -1){ die('could not fork'); }else{ if($pid){ $status = 0; //阻塞父進程,直到子進程結束,不適合需要長時間運行的腳本. //可使用pcntl_wait($status, WNOHANG)實現非阻塞式 pcntl_wait($status); exit; }else{ //結束當前子進程,以防止生成僵屍進程 if(function_exists("posix_kill")){ posix_kill(getmypid(), SIGTERM); }else{ system('kill -9'. getmypid()); } exit; } }
(1) 父進程通過wait和waitpid等函數等待子進程結束,這會導致父進程掛起。它不適合子進程需要長時間運行的情況(會導致超時)。
執行wait()或waitpid()系統調用,則子進程在終止后會立即把它在進程表中的數據返回給父進程,此時系統會立即刪除該進入點。在這種情形下就不會產生defunct進程。
(2) 如果父進程很忙,那么可以用signal函數為SIGCHLD安裝handler。在子進程結束后,父進程會收到該信號,可以在handler中調用wait回收。
(3) 如果父進程不關心子進程什么時候結束,那么可以用signal(SIGCLD, SIG_IGN)或signal(SIGCHLD, SIG_IGN)通知內核,自己對子進程的結束不感興趣,那么子進程結束后,內核會回收,並不再給父進程發送信號
(4)fork兩次,父進程fork一個子進程,然后繼續工作,子進程fork一個孫進程后退出,那么孫進程被init接管,孫進程結束后,init會回收。不過子進程的回收還要自己做。

ps -A -ostat,ppid,pid,cmd | grep -e '^[Zz]'
命令注解:
-A 參數列出所有進程
-o 自定義輸出字段 我們設定顯示字段為 stat(狀態), ppid(進程父id), pid(進程id),cmd(命令)這四個參數
狀態為 z或者Z 的進程為僵屍進程,所以我們使用grep抓取stat狀態為zZ進程
運行結果如下:

這時,可以使用 kill -HUP 5255 殺掉這個進程。如果再次查看僵屍進程還存在,可以kill -HUP 5253(父進程)。
如果有多個僵屍進程,可以通過
ps -A -ostat,ppid,pid,cmd | grep -e '^[Zz]'|awk 'print{$2}'|xargs kill -9
處理。
多進程--進程間通信(IPC)
場景:日常任務中,有時需要通過php腳本執行一些日志分析,隊列處理等任務,當數據量比較大時,可以使用多進程來處理。
准備:php多進程需要pcntl,posix擴展支持,可以通過 php - m 查看,沒安裝的話需要重新編譯php,加上參數--enable-pcntl,posix一般默認會有。
注意:
多進程實現只能在cli模式下,在web服務器環境下,會出現無法預期的結果,我測試報錯:
Call to undefined function: pcntl_fork()
一個錯誤 pcntl_fork causing “errno=32 Broken pipe” #474 ,看https://github.com/phpredis/phpredis/issues/474
注意兩點:如果是在循環中創建子進程,那么子進程中最后要exit,防止子進程進入循環。
子進程中的打開連接不能拷貝,使用的還是主進程的,需要用多例模式。
pcntl_fork:
一次調用兩次返回,在父進程中返回子進程pid,在子進程中返回0,出錯返回-1。
pcntl_wait ( int &$status [, int $options ] ):
阻塞當前進程,直到任意一個子進程退出或收到一個結束當前進程的信號,注意是結束當前進程的信號,子進程結束發送的SIGCHLD不算。使用$status返回子進程的狀態碼,並可以指定第二個參數來說明是否以阻塞狀態調用
阻塞方式調用的,函數返回值為子進程的pid,如果沒有子進程返回值為-1;
非阻塞方式調用,函數還可以在有子進程在運行但沒有結束的子進程時返回0。
/** 確保這個函數只能運行在SHELL中 */ if (substr(php_sapi_name(), 0, 3) !== 'cli') { die("cli mode only"); }
#!/bin/bash for((i=1;i<=8;i++)) do /usr/local/bin/php multiprocessTest.php & done wait
上面的shell程序,列了一個很簡單的多進程程序,用一個for循環,實現了8進程並發來跑multiprocessTest.php這個程序。最后的wait語句,也可以使主進程,再等待所有進程都執行完后再往下執行的需求。
這個程序是沒有問題的,很多現有的代碼也都這樣實現,但是這個程序的並發數是不可控的,即我們無法根據機器的核數去調度每一個進程的開關。
若我們的機器有8核或者更多,上面的程序是沒有問題的,所有核都能充分利用,並且互相之間,沒有爭搶資源的情況出現。
但我們的機器要沒有8核的話會是什么情況,同一時間運行的進程數多於核數,那么系統就會出現進程分配調度的問題,爭搶資源也跟着相應而來,一個進程不能保證獨立連續的執行,所有的進程運行會聽從系統的調度,這樣就會有更多的不確定因素出現。
<?php //最大的子進程數量 $maxChildPro = 8; //當前的子進程數量 $curChildPro = 0; //當子進程退出時,會觸發該函數,當前子進程數-1 function sig_handler($sig) { global $curChildPro; switch ($sig) { case SIGCHLD: echo 'SIGCHLD', PHP_EOL; $curChildPro--; break; } } //配合pcntl_signal使用,簡單的說,是為了讓系統產生時間雲,讓信號捕捉函數能夠捕捉到信號量 declare(ticks = 1); //注冊子進程退出時調用的函數。SIGCHLD:在一個進程終止或者停止時,將SIGCHLD信號發送給其父進程。 pcntl_signal(SIGCHLD, "sig_handler"); while (true) { $curChildPro++; $pid = pcntl_fork(); if ($pid) { //父進程運行代碼,達到上限時父進程阻塞等待任一子進程退出后while循環繼續 if ($curChildPro >= $maxChildPro) { pcntl_wait($status); } } else { //子進程運行代碼 $s = rand(2, 6); sleep($s); echo "child sleep $s second quit", PHP_EOL; exit; } }
<?php $childs = array(); // Fork10個子進程 for ($i = 0; $i < 10; $i++) { $pid = pcntl_fork(); if ($pid == -1) die('Could not fork'); if ($pid) { echo "parent \n"; $childs[] = $pid; } else { // Sleep $i+1 (s). 子進程可以得到$i參數 sleep($i + 1); // 子進程需要exit,防止子進程也進入for循環 exit(); } } while (count($childs) > 0) { foreach ($childs as $key => $pid) { $res = pcntl_waitpid($pid, $status, WNOHANG); //-1代表error, 大於0代表子進程已退出,返回的是子進程的pid,非阻塞時0代表沒取到退出子進程 if ($res == -1 || $res > 0) unset($childs[$key]); } sleep(1); }
<?php function _fetchLog() { $password = $this->_getPassword(); $online_log_path = NginxConf::getArchiveDir($this->_stat_day); $task_log_path = QFrameConfig::getConfig('LOG_PATH'); $children = array(); $success = true; foreach($this->_server_list as $host => $value) { $local_dir = $this->_prepareLocalDir($host); $task_log = "$task_log_path/fetch_log.$host"; $cmd = "sshpass -p $password rsync -av -e 'ssh -o StrictHostKeyChecking=no' $host:$online_log_path/* $local_dir >> $task_log 2>&1"; $pid = pcntl_fork(); if(-1 === $pid) { LogSvc::log('stat_pv_by_citycode_error', 'could not fork'); exit('could not fork'); } else if(0 === $pid) { system($cmd, $return_value); if(0 !== $return_value) { LogSvc::log('stat_pv_by_citycode_error', "rsync $host error"); } exit($return_value); } else { $children[$pid] = 1; } } while(!empty($children)) { $pid = pcntl_waitpid(-1, $status, WNOHANG); if(0 === $pid) { sleep(1); } else { if(0 !== pcntl_wexitstatus($status)) { $success = false; } unset($children[$pid]); } } return $success; }
posix_kill(posix_getpid(), SIGHUP); 為自己生成SIGHUP信號
declare(ticks = 1); //php < 5.3
pcntl_signal()
函數僅僅是注冊信號和它的處理方法,真正接收到信號並調用其處理方法的是pcntl_signal_dispatch()
函數
必須在循環里調用,為了檢測是否有新的信號等待dispatching。
pcntl_signal_dispatch()
declare(ticks = 1);
表示每執行一條低級指令,就檢查一次信號,如果檢測到注冊的信號,就調用其信號處理器。
kill [PID]
命令,未加任何其他參數的話,程序會接收到一個SIGTERM信號。
<?php // 定義一個處理器,接收到SIGINT信號后只輸出一行信息 function signalHandler($signal) { if ($signal == SIGINT) { echo 'SIGINT', PHP_EOL; } } // 信號注冊:當接收到SIGINT信號時,調用signalHandler()函數 pcntl_signal(SIGINT, 'signalHandler'); /** * PHP < 5.3 使用 * 配合pcntl_signal使用,表示每執行一條低級指令,就檢查一次信號,如果檢測到注冊的信號,就調用其信號處理器。 */ if (!function_exists("pcntl_signal_dispatch")) { declare(ticks=1); } while (true) { $s = sleep(10); echo $s, PHP_EOL; //信號會喚醒sleep,返回剩余的秒數。 // do something for ($i = 0; $i < 5; $i++) { echo $i . PHP_EOL; usleep(100000); } /** * PHP >= 5.3 * 調用已安裝的信號處理器 * 必須在循環里調用,為了檢測是否有新的信號等待dispatching。 */ if (!function_exists("pcntl_signal_dispatch")) { pcntl_signal_dispatch(); } }
<?php declare(ticks = 1); function signal_handler($signal) { print "Caught SIGALRM\n"; pcntl_alarm(5); } pcntl_signal(SIGALRM, "signal_handler", true); pcntl_alarm(5); for(;;) { }
<?php /** * 父進程通過pcntl_wait等待子進程退出 * 子進程通過信號kill自己,也可以在父進程中發送kil信號結束子進程 */ //生成子進程 $pid = pcntl_fork(); if($pid == -1){ die('could not fork'); }else{ if($pid){ $status = 0; //阻塞父進程,直到子進程結束,不適合需要長時間運行的腳本. //可使用pcntl_wait($status, WNOHANG)實現非阻塞式 pcntl_wait($status); exit; }else{ //結束當前子進程,以防止生成僵屍進程 if(function_exists("posix_kill")){ posix_kill(getmypid(), SIGTERM); }else{ system('kill -9'. getmypid()); } exit; } }
(1) 父進程通過wait和waitpid等函數等待子進程結束,這會導致父進程掛起。它不適合子進程需要長時間運行的情況(會導致超時)。
執行wait()或waitpid()系統調用,則子進程在終止后會立即把它在進程表中的數據返回給父進程,此時系統會立即刪除該進入點。在這種情形下就不會產生defunct進程。
(2) 如果父進程很忙,那么可以用signal函數為SIGCHLD安裝handler。在子進程結束后,父進程會收到該信號,可以在handler中調用wait回收。
(3) 如果父進程不關心子進程什么時候結束,那么可以用signal(SIGCLD, SIG_IGN)或signal(SIGCHLD, SIG_IGN)通知內核,自己對子進程的結束不感興趣,那么子進程結束后,內核會回收,並不再給父進程發送信號
(4)fork兩次,父進程fork一個子進程,然后繼續工作,子進程fork一個孫進程后退出,那么孫進程被init接管,孫進程結束后,init會回收。不過子進程的回收還要自己做。

ps -A -ostat,ppid,pid,cmd | grep -e '^[Zz]'
命令注解:
-A 參數列出所有進程
-o 自定義輸出字段 我們設定顯示字段為 stat(狀態), ppid(進程父id), pid(進程id),cmd(命令)這四個參數
狀態為 z或者Z 的進程為僵屍進程,所以我們使用grep抓取stat狀態為zZ進程
運行結果如下:

這時,可以使用 kill -HUP 5255 殺掉這個進程。如果再次查看僵屍進程還存在,可以kill -HUP 5253(父進程)。
如果有多個僵屍進程,可以通過
ps -A -ostat,ppid,pid,cmd | grep -e '^[Zz]'|awk 'print{$2}'|xargs kill -9
處理。
多進程--進程間通信(IPC)
多進程--守護進程
多進程--socket實現簡單TCP server
<?php static public function sendSDKMsg($version) {/*{{{*/ if(!self::sendRandChance(self::EA_LAST_TIME_KEY.":".$version)) return false; $fp = @fsockopen( "udp://".self::UDP_HOST , self::UDP_PORT , $errno ); if( !$fp ) return false; stream_set_timeout( $fp , 0 , 100 ); stream_set_blocking( $fp , 0 ); $sysinfo = posix_uname(); $msg = $version." - ".$sysinfo['nodename']." - ".date('Y-m-d H:i:s',time()); $res = fwrite( $fp , $msg ); fclose($fp); }/*}}}*/ static public function sendRandChance($key) {/*{{{*/ $now = microtime(true); if(function_exists("eaccelerator_get")) { $lastInserTime = eaccelerator_get($key); if(!$lastInserTime) $lastInserTime = 0; if( ($now - $lastInserTime) < self::SEND_INTERVAL ) return false; eaccelerator_put($key, $now); return true; }else if(function_exists("apc_fetch")) { $lastInserTime = apc_fetch($key); if(!$lastInserTime) $lastInserTime = 0; if( ($now - $lastInserTime) < self::SEND_INTERVAL ) return false; apc_store($key, $now); return true; } $rand = rand(1,60); if((time()%60 == $rand) && rand(0,20) == 3) { return true; } return false; }/*}}}*/