前面幾節都是講解pcntl擴展實現的多進程程序。本節給大家介紹swoole擴展的swoole_process
模塊。
swoole多進程
swoole_process 是swoole提供的進程管理模塊,用來替代PHP的pcntl擴展。
首先,確保安裝的swoole版本大於1.7.2:
$ php --ri swoole
swoole
swoole support => enabled
Version => 1.10.1
注意:swoole_process在最新的1.8.0版本已經禁止在Web環境中使用了,所以也只能支持命令行。
swoole提供的多進程擴展基本功能和pcntl提供的一樣,但swoole更易簡單上手,並且提供了:
- 默認基於unixsock的進程間通信;
- 支持消息隊列作為進程間通信;
- 基於signalfd和eventloop處理信號,幾乎沒有任何額外消耗;
- 高精度微秒定時器;
- 配合swoole_event模塊,創建的PHP子進程可以異步的事件驅動模式
swoole_process 模塊提供的方法(Method)主要分為四部分:
- 基礎方法
swoole_process::__construct
swoole_process->start
swoole_process->name
swoole_process->exec
swoole_process->close
swoole_process->exit
swoole_process::kill
swoole_process::wait
swoole_process::daemon
swoole_process::setAffinity
- 管道通信
swoole_process->write
swoole_process->read
swoole_process->setTimeout
swoole_process->setBlocking
- 消息隊列通信
swoole_process->useQueue
swoole_process->statQueue
swoole_process->freeQueue
swoole_process->push
swoole_process->pop
- 信號與定時器
swoole_process::signal
swoole_process::alarm
基礎應用
本例實現的是tcp server,特性:
- 多進程處理客戶端連接
- 子進程退出,Master進程會重新創建一個
- 支持事件回調
- 主進程退出,子進程在干完手頭活后退出
<?php
class TcpServer{
const MAX_PROCESS = 3;//最大進程數
private $pids = []; //存儲子進程pid
private $socket;
private $mpid;
public function run(){
$process = new swoole_process(function(){
$this->mpid = $id = getmypid();
echo time()." Master process, pid {$id}\n";
//創建tcp server
$this->socket = stream_socket_server("tcp://0.0.0.0:9201", $errno, $errstr);
if(!$this->socket) exit("start server err: $errstr --- $errno");
for($i=0; $i<self::MAX_PROCESS;$i++){
$this->start_worker_process();
}
echo "waiting client...\n";
//Master進程等待子進程退出,必須是死循環
while(1){
foreach($this->pids as $k=>$pid){
if($pid){
$res = swoole_process::wait(false);
if ( $res ){
echo time()." Worker process $pid exit, will start new... \n";
$this->start_worker_process();
unset($this->pids[$k]);
}
}
}
sleep(1);//讓出1s時間給CPU
}
}, false, false); //不啟用管道通信
swoole_process::daemon(); //守護進程
$process->start();//注意:start之后的變量子進程里面是獲取不到的
}
/**
* 創建worker進程,接受客戶端連接
*/
private function start_worker_process(){
$process = new swoole_process(function(swoole_process $worker){
$this->acceptClient($worker);
}, false, false);
$pid = $process->start();
$this->pids[] = $pid;
}
private function acceptClient(&$worker)
{
//子進程一直等待客戶端連接,不能退出
while(1){
$conn = stream_socket_accept($this->socket, -1);
if($this->onConnect) call_user_func($this->onConnect, $conn); //回調連接事件
//開始循環讀取消息
$recv = ''; //實際收到消息
$buffer = ''; //緩沖消息
while(1){
$this->checkMpid($worker);
$buffer = fread($conn, 20);
//沒有收到正常消息
if($buffer === false || $buffer === ''){
if($this->onClose) call_user_func($this->onClose, $conn); //回調斷開連接事件
break;//結束讀取消息,等待下一個客戶端連接
}
$pos = strpos($buffer, "\n"); //消息結束符
if($pos === false){
$recv .= $buffer;
}else{
$recv .= trim(substr($buffer, 0, $pos+1));
if($this->onMessage) call_user_func($this->onMessage, $conn, $recv); //回調收到消息事件
//客戶端強制關閉連接
if($recv == "quit"){
echo "client close conn\n";
fclose($conn);
break;
}
$recv = ''; //清空消息,准備下一次接收
}
}
}
}
//檢查主進程是否存在,若不存在子進程在干完手頭活后退出
public function checkMpid(&$worker){
if(!swoole_process::kill($this->mpid,0)){
$worker->exit();
// 這句提示,實際是看不到的.需要寫到日志中
echo "Master process exited, I [{$worker['pid']}] also quit\n";
}
}
function __destruct() {
@fclose($this->socket);
}
}
$server = new TcpServer();
$server->onConnect = function($conn){
echo "onConnect -- accepted " . stream_socket_get_name($conn,true) . "\n";
fwrite($conn,"conn success\n");
};
$server->onMessage = function($conn,$msg){
echo "onMessage --" . $msg . "\n";
fwrite($conn,"received ".$msg."\n");
};
$server->onClose = function($conn){
echo "onClose --" . stream_socket_get_name($conn,true) . "\n";
fwrite($conn,"onClose "."\n");
};
$server->run();
運行后可以使用telnet連接:
telnet 127.0.0.1 9201
由於設置了最大三個子進程,最多只能接受3個客戶端連接。
進程間通信
前面講解的例子里,主進程和子進程直接是沒有直接的數據交互的。如果主進程需要得到的來自子進程的反饋,或者子進程接受來自主進程的數據,那么就需要進程間通信了。
swoole內置了管道通信和消息隊列通信。
管道通信
管道通信主要是數據傳輸:一個進程需要將數據發送給另外一個進程。
這個swoole封裝后,使用非常簡單:
<?php
$workers = [];
for ($i=0; $i<3; $i++) {
$process = new swoole_process(function(swoole_process $worker){
//子進程邏輯
$cmd = $worker->read();
ob_start();
passthru($cmd);//執行外部程序並且顯示未經處理的、原始輸出,會直接打印輸出。
$return = ob_get_clean() ? : ' ';
$return = trim($return).". worker pid:".$worker->pid."\n";
// $worker->write($return);//寫入數據到管道
echo $return;//寫入數據到管道。注意:子進程里echo也是寫入到管道
}, true); //第二個參數為true,啟用管道通信
$pid = $process->start();
$workers[$pid] = $process;
}
foreach($workers as $pid=>$worker){
$worker->write('whoami'); //通過管道發數據到子進程。管道是單向的:發出的數據必須由另一端讀取。不能讀取自己發出去的
$recv = $worker->read();//同步阻塞讀取管道數據
echo "recv result: $recv";
}
//回收子進程
while(count($workers)){
// echo time(). "\n";
foreach($workers as $pid=>$worker){
$ret = swoole_process::wait(false);
if($ret){
echo "worker exit: $pid\n";
unset($workers[$pid]);
}
}
}
運行:
$ php swoole_process_pipe.php
recv result: Linux
recv result: 2018年 06月 24日 星期日 16:18:01 CST
recv result: yjc
worker exit: 14519
worker exit: 14522
worker exit: 14525
注意點:
1、管道數據讀取是同步阻塞的;上面的例子里如果子進程里再加一句$worker->read()
,會一直阻塞。可以使用swoole_event_add
將管道加入到事件循環中,變為異步模式。
2、子進程里的輸出(例如echo)與write效果相同。
3、通過管道發數據到子進程。管道是單向的:發出的數據必須由另一端讀取。不能讀取自己發出去的。
這里額外講解一下swoole_process::wait()
:
1、swoole_process::wait()
默認是阻塞的, swoole_process::wait(false)
則是非阻塞的;
2、swoole_process::wait()
阻塞模式調用一次僅能回收一個子進程,非阻塞模式調用一次不一定能當前就能回收子進程;
3、如果不加swoole_process::wait()
,主進程又是死循環,主進程退出后會變成僵屍進程。
ps -A -o stat,ppid,pid,cmd | grep -e '^[Zz]'
可以查詢僵屍進程。
防盜版聲明:本文系原創文章,發布於公眾號飛鴻影的博客
(fhyblog)及博客園,轉載需作者同意。
消息隊列通信
消息隊列與管道有些不一樣:消息隊列是全局的,所有進程都可以發送、讀取。你可以把它看做redis list結構。
消息隊列更常見的用途是主進程分配任務,子進程消費執行。
<?php
$workers = [];
for ($i=0; $i<3; $i++) {
$process = new swoole_process(function(swoole_process $worker){
//子進程邏輯
sleep(1); //防止父進程還未往消息隊列中加入內容直接退出
while($cmd = $worker->pop()){
// echo "recv from master: $cmd\n";
ob_start();
passthru($cmd);//執行外部程序並且顯示未經處理的、原始輸出,會直接打印輸出。
$return = ob_get_clean() ? : ' ';
$return = "res: ".trim($return).". worker pid: ".$worker->pid."\n";
echo $return;
// sleep(1);
}
$worker->exit(0);
}, false, false); //不創建管道
$process->useQueue(1, 2 | swoole_process::IPC_NOWAIT); //使用消息隊列
$pid = $process->start();
$workers[$pid] = $process;
}
//由於所有進程是共享使用一個消息隊列,所以只需向一個子進程發送消息即可
$worker = current($workers);
for ($i=0; $i<3; $i++) {
$worker->push('whoami'); //發送消息
}
//回收子進程
while(count($workers)){
foreach($workers as $pid=>$worker){
$ret = swoole_process::wait();
if($ret){
echo "worker exit: $pid\n";
unset($workers[$pid]);
}
}
}
運行結果:
$ php swoole_process_quene.php
res: yjc. worker pid: 15885
res: yjc. worker pid: 15886
res: yjc. worker pid: 15887
worker exit: 15885
worker exit: 15886
worker exit: 15887
注意點:
1、所有進程共享使用一個消息隊列;
2、消息隊列的讀取操作是阻塞的,可以在useQueue
的時候第2個參數mode改為2 | swoole_process::IPC_NOWAIT
,則是異步的。mode僅僅設置為2
是阻塞的,示例里去掉swoole_process::IPC_NOWAIT
后讀取消息的while會死循環。
3、子進程前面加了個sleep(1);
,這是為了防止父進程還未往消息隊列中加入內容直接退出。
4、子進程末尾也加了sleep,這是為了防止一個進程把所有消息都消費完了,實際應用需要去掉。
信號與定時器
swoole_process::alarm
支持微秒定時器:
<?php
function ev_timer(){
static $i = 0;
echo "#{$i}\talarm\n";
$i++;
if ($i > 5) {
//清除定時器
swoole_process::alarm(-1);
//退出進程
swoole_process::kill(getmypid());
}
}
//安裝信號
swoole_process::signal(SIGALRM, 'ev_timer');
//觸發定時器信號:單位為微秒。如果為負數表示清除定時器
swoole_process::alarm(100 * 1000);//100ms
echo getmypid()."\n"; //該句會順序執行,后續無需使用while循環防止進程直接退出
運行:
$ php swoole_process_alarm.php
13660
#0 alarm
#1 alarm
#2 alarm
#3 alarm
#4 alarm
#5 alarm
注:alarm不能和
Swoole\Timer
同時使用。
參考
1、Process-Swoole-Swoole文檔中心
https://wiki.swoole.com/wiki/page/p-process.html