Swoole從入門到入土(23)——多進程[進程池Process\Pool]


Swoole提供的進程池為Process\Pool,基於 Swoole\Server 的 Manager 管理進程模塊實現。可管理多個工作進程。該模塊的核心功能為進程管理,相比 Process 實現多進程,Process\Pool 更加簡單,封裝層次更高,開發者無需編寫過多代碼即可實現進程管理功能,配合 Co\Server 可以創建純協程風格的,能利用多核 CPU 的服務端程序。同樣,我們同過一段代碼引出本節的詳細討論:

$workerNum = 10;
$pool = new Swoole\Process\Pool($workerNum);

$pool->on("WorkerStart", function ($pool, $workerId) {
    echo "Worker#{$workerId} is started\n";
    $redis = new Redis();
    $redis->pconnect('127.0.0.1', 6379);
    $key = "key1";
    while (true) {
         $msg = $redis->brpop($key, 2);
         if ( $msg == null) continue;
         var_dump($msg);
     }
});

$pool->on("WorkerStop", function ($pool, $workerId) {
    echo "Worker#{$workerId} is stopped\n";
});

$pool->start();

現在讓我們一起了解,這個類為我們提供了哪些成員。

 

常量

 

成員函數

1) __construct():構造方法

Swoole\Process\Pool::__construct(int $worker_num, int $ipc_type = 0, int $msgqueue_key = 0, bool $enable_coroutine = false);

$worker_num:工作進程的數量(當一個進程退出后,Pool會及時拉取另一個進程進行補充)

$ipc_type:進程間通信的模式【默認為 0 表示不使用任何進程間通信特性】,這個參數需要注意如下事項:

                   - 設置為 0 時必須設置 onWorkerStart 回調,並且必須在 onWorkerStart 中實現循環邏輯,當 onWorkerStart 函數退出時工作進程會立即退出,之后會由 Manager 進程重新拉起進程;

                   - 設置為 SWOOLE_IPC_MSGQUEUE 表示使用系統消息隊列通信,可設置 $msgqueue_key 指定消息隊列的 KEY,未設置消息隊列 KEY,將申請私有隊列;

                   - 設置為 SWOOLE_IPC_SOCKET 表示使用 Socket 進行通信,需要使用 listen 方法指定監聽的地址和端口;

                   - 設置為 SWOOLE_IPC_UNIXSOCK 表示使用 unixSocket 進行通信,協程模式下使用,強烈推薦用此種方式進程間通訊

                   - 使用非 0 設置時,必須設置 onMessage 回調,onWorkerStart 變更為可選。

$msgqueue_key:消息列隊的key

$enable_coroutine:是否開啟協程支持【使用協程后將無法設置 onMessage 回調】

示例1:

$pool = new Swoole\Process\Pool(1, SWOOLE_IPC_NONE, 0, true);

$pool->on('workerStart', function (Swoole\Process\Pool $pool, int $workerId) {
    while (true) {
        Co::sleep(0.5);
        echo "hello world\n";
    }
});

$pool->start();

示例2:開啟協程后 Swoole 會禁止設置 onMessage 事件回調,需要進程間通訊的話需要將第二個設置為 SWOOLE_IPC_UNIXSOCK 表示使用 unixSocket 進行通信,然后使用 $pool->getProcess()->exportSocket() 導出 Coroutine\Socket 對象,實現 Worker 進程間通信。

$pool = new Swoole\Process\Pool(2, SWOOLE_IPC_UNIXSOCK, 0, true);

$pool->on('workerStart', function (Swoole\Process\Pool $pool, int $workerId) {
   $process = $pool->getProcess(0);
   $socket = $process->exportSocket();
   if ($workerId == 0) {
       echo $socket->recv();
       $socket->send("hello proc1\n");
       echo "proc0 stop\n";
   } else {
       $socket->send("hello proc0\n");
       echo $socket->recv();
       echo "proc1 stop\n";
       $pool->shutdown();
   }
});

$pool->start();

 

2) set():設置參數

Swoole\Process\Pool->set(array $settings)

可以使用 enable_coroutine 來控制是否啟用協程,和構造函數的第四個參數作用一致。

Swoole\Process\Pool->set(['enable_coroutine' => true]);

 

3) on():設置進程池回調函數

Swoole\Process\Pool->on(string $event, callable $function);

$event:指定事件

$function:回調函數

事件說明:

- onWorkerStart:子進程啟動,回調函數格式:

function onWorkerStart(Swoole\Process\Pool $pool, int $workerId) {
    echo "Worker#{$workerId} is started\n";
}

- onWorkerStop:子進程結束,回調函數同onWorkerStart。

- onMessage:消息接收,收到外部投遞的消息。 一次連接只能投遞一次消息,類似於 PHP-FPM 的短連接機制。回調函數格式如下:

function onMessage(Swoole\Process\Pool $pool, string $data) {
    var_dump($data);
}

 

4) listen():監聽 SOCKET,必須在 $ipc_mode = SWOOLE_IPC_SOCKET 時才能使用。

Swoole\Process\Pool->listen(string $host, int $port = 0, int $backlog = 2048): bool

$host:監聽的地址【支持 TCP 和 unixSocket 兩種類型。127.0.0.1 表示監聽 TCP 地址,需要指定 $port。unix:/tmp/php.sock 監聽 unixSocket 地址。】

$port:監聽的端口【在 TCP 模式下需要指定。】

$backlog:監聽的隊列長度

返回值:成功監聽返回 true、監聽失敗返回 false,可調用 swoole_errno 獲取錯誤碼。監聽失敗后,調用 start 時會立即返回 false。

注意:向監聽端口發送數據時,客戶端必須在請求前增加 4 字節、網絡字節序的長度值。協議格式為:packet = htonl(strlen(data)) + data;

示例:

$pool->listen('127.0.0.1', 8089);
$pool->listen('unix:/tmp/php.sock');

 

5) write():向對端寫入數據,必須在 $ipc_mode 為 SWOOLE_IPC_SOCKET 時才能使用。

Swoole\Process\Pool->write(string $data): bool

$data:寫入的數據內容【可多次調用 write,底層會在 onMessage 函數退出后將數據全部寫入 socket 中,並 close 連接】

說明:此方法為內存操作,沒有 IO 消耗,發送數據操作是同步阻塞 IO

示例:

/**** 服務端 ****/
$pool = new Swoole\Process\Pool(2, SWOOLE_IPC_SOCKET);

$pool->on("Message", function ($pool, $message) {
    echo "Message: {$message}\n";
    $pool->write("hello ");
    $pool->write("world ");
    $pool->write("\n");
});

$pool->listen('127.0.0.1', 8089);
$pool->start();


/**** 客戶端 ****/
$fp = stream_socket_client("tcp://127.0.0.1:8089", $errno, $errstr) or die("error: $errstr\n");
$msg = json_encode(['data' => 'hello', 'uid' => 1991]);
fwrite($fp, pack('N', strlen($msg)) . $msg);
sleep(1);
//將顯示 hello world\n
$data = fread($fp, 8192);
var_dump(substr($data, 4, unpack('N', substr($data, 0, 4))[1]));
fclose($fp);

 

6) start():啟動工作進程

Swoole\Process\Pool->start(): bool

說明1:

啟動成功,當前進程進入 wait 狀態,管理工作進程;

啟動失敗,返回 false,可使用 swoole_errno 獲取錯誤碼。

說明2:關於進程管理:

- 某個工作進程遇到致命錯誤、主動退出時管理器會進行回收,避免出現僵屍進程
- 工作進程退出后,管理器會自動拉起、創建一個新的工作進程
- 主進程收到 SIGTERM 信號時將停止 fork 新進程,並 kill 所有正在運行的工作進程
- 主進程收到 SIGUSR1 信號時將將逐個 kill 正在運行的工作進程,並重新啟動新的工作進程

說明3:信號處理:

底層僅設置了主進程(管理進程)的信號處理,並未對 Worker 工作進程設置信號,需要開發者自行實現信號的監聽。

- 工作進程為異步模式,請使用 Swoole\Process::signal 監聽信號
- 工作進程為同步模式,請使用 pcntl_signal 和 pcntl_signal_dispatch 監聽信號

在工作進程中應當監聽 SIGTERM 信號,當主進程需要終止該進程時,會向此進程發送 SIGTERM 信號。如果工作進程未監聽 SIGTERM 信號,底層會強行終止當前進程,造成部分邏輯丟失。

示例:

$pool->on("WorkerStart", function ($pool, $workerId) {
    $running = true;
    pcntl_signal(SIGTERM, function () use (&$running) {
        $running = false;
    });
    echo "Worker#{$workerId} is started\n";
    $redis = new Redis();
    $redis->pconnect('127.0.0.1', 6379);
    $key = "key1";
    while ($running) {
         $msg = $redis->brpop($key);
         pcntl_signal_dispatch();
         if ( $msg == null) continue;
         var_dump($msg);
     }
});

 

7)  shutdown():終止工作進程

Swoole\Process\Pool->shutdown(): bool

 

8)  getProcess():獲取當前工作進程對象。返回 Swoole\Process 對象。

Swoole\Process\Pool->getProcess(int $worker_id): Swoole\Process;

$worker_id:指定獲取 worker 【可選參數,默認當前 worker】

注意:

必須在 start 之后,在工作進程的 onWorkerStart 或其他回調函數中調用;

返回的 Process 對象是單例模式,在工作進程中重復調用 getProcess() 將返回同一個對象。

示例:

$workerNum = 10;
$pool = new Swoole\Process\Pool($workerNum);

$pool->on("WorkerStart", function ($pool, $workerId) {
    $process = $pool->getProcess();
    $process->exec("/bin/sh", ["ls", '-l']);
});

$pool->on("WorkerStop", function ($pool, $workerId) {
    echo "Worker#{$workerId} is stopped\n";
});

$pool->start();

 

Swoole提供的進程池成員簡練,使用方便。以上就是進程池的所有內容,下一節我們將討論進程管理器Manager:)

 

 

 

---------------------------  我是可愛的分割線  ----------------------------

最后博主借地宣傳一下,漳州編程小組招新了,這是一個面向漳州青少年信息學/軟件設計的學習小組,有意向的同學點擊鏈接,聯系我吧。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM