概述
這是關於 Swoole 學習的第二篇文章:Swoole Task 的應用。
Swoole 異步Task,主要實現調用異步任務的執行。
常用的場景:異步支付處理、異步訂單處理、異步日志處理、異步發送郵件/短信等。
Swoole 的實現方式是 worker 進程處理數據請求,分配給 task 進程執行。
官方介紹:
task 底層使用Unix Socket管道通信,是全內存的,沒有IO消耗。單進程讀寫性能可達100萬/s,不同的進程使用不同的管道通信,可以最大化利用多核。
本地版本:PHP 7.2.6、Swoole 4.3.1。
不多說,先看效果圖:
代碼
server.php
<?php
class Server
{
private $serv;
public function __construct() {
$this->serv = new swoole_server('0.0.0.0', 9501);
$this->serv->set([
'worker_num' => 2, //開啟2個worker進程
'max_request' => 4, //每個worker進程 max_request設置為4次
'task_worker_num' => 4, //開啟4個task進程
'dispatch_mode' => 2, //數據包分發策略 - 固定模式
]);
$this->serv->on('Start', [$this, 'onStart']);
$this->serv->on('Connect', [$this, 'onConnect']);
$this->serv->on("Receive", [$this, 'onReceive']);
$this->serv->on("Close", [$this, 'onClose']);
$this->serv->on("Task", [$this, 'onTask']);
$this->serv->on("Finish", [$this, 'onFinish']);
$this->serv->start();
}
public function onStart($serv) {
echo "#### onStart ####".PHP_EOL;
echo "SWOOLE ".SWOOLE_VERSION . " 服務已啟動".PHP_EOL;
echo "master_pid: {$serv->master_pid}".PHP_EOL;
echo "manager_pid: {$serv->manager_pid}".PHP_EOL;
echo "########".PHP_EOL.PHP_EOL;
}
public function onConnect($serv, $fd) {
echo "#### onConnect ####".PHP_EOL;
echo "客戶端:".$fd." 已連接".PHP_EOL;
echo "########".PHP_EOL.PHP_EOL;
}
public function onReceive($serv, $fd, $from_id, $data) {
echo "#### onReceive ####".PHP_EOL;
echo "worker_pid: {$serv->worker_pid}".PHP_EOL;
echo "客戶端:{$fd} 發來的Email:{$data}".PHP_EOL;
$param = [
'fd' => $fd,
'email' => $data
];
$rs = $serv->task(json_encode($param));
if ($rs === false) {
echo "任務分配失敗 Task ".$rs.PHP_EOL;
} else {
echo "任務分配成功 Task ".$rs.PHP_EOL;
}
echo "########".PHP_EOL.PHP_EOL;
}
public function onTask($serv, $task_id, $from_id, $data) {
echo "#### onTask ####".PHP_EOL;
echo "#{$serv->worker_id} onTask: [PID={$serv->worker_pid}]: task_id={$task_id}".PHP_EOL;
//業務代碼
for($i = 1 ; $i <= 5 ; $i ++ ) {
sleep(2);
echo "Task {$task_id} 已完成了 {$i}/5 的任務".PHP_EOL;
}
$data_arr = json_decode($data, true);
$serv->send($data_arr['fd'] , 'Email:'.$data_arr['email'].',發送成功');
$serv->finish($data);
echo "########".PHP_EOL.PHP_EOL;
}
public function onFinish($serv,$task_id, $data) {
echo "#### onFinish ####".PHP_EOL;
echo "Task {$task_id} 已完成".PHP_EOL;
echo "########".PHP_EOL.PHP_EOL;
}
public function onClose($serv, $fd) {
echo "Client Close.".PHP_EOL;
}
}
$server = new Server();
client.php
<?php
class Client
{
private $client;
public function __construct() {
$this->client = new swoole_client(SWOOLE_SOCK_TCP, SWOOLE_SOCK_ASYNC);
$this->client->on('Connect', [$this, 'onConnect']);
$this->client->on('Receive', [$this, 'onReceive']);
$this->client->on('Close', [$this, 'onClose']);
$this->client->on('Error', [$this, 'onError']);
}
public function connect() {
if(!$fp = $this->client->connect("127.0.0.1", 9501 , 1)) {
echo "Error: {$fp->errMsg}[{$fp->errCode}]".PHP_EOL;
return;
}
}
public function onConnect($cli) {
fwrite(STDOUT, "輸入Email:");
swoole_event_add(STDIN, function() {
fwrite(STDOUT, "輸入Email:");
$msg = trim(fgets(STDIN));
$this->send($msg);
});
}
public function onReceive($cli, $data) {
echo PHP_EOL."Received: ".$data.PHP_EOL;
}
public function send($data) {
$this->client->send($data);
}
public function onClose($cli) {
echo "Client close connection".PHP_EOL;
}
public function onError() {
}
}
$client = new Client();
$client->connect();
小結
一、上面的配置總共開啟了幾個進程?
總共8個進程(1個master進程、1個manager進程、4個task進程、2個worker進程)
重新運行的可能與上圖進程號不一致:
master進程:22481
manager進程:22485
task進程:22488、22489、22490、22491
worker進程:22492、22493
參考官方提供的進程圖:
二、為什么執行了5次后,worker進程號發生了改變?
因為我們設了置worker進程的max_request=4,一個worker進程在完成最大請求次數任務后將自動退出,進程退出會釋放所有的內存和資源,這樣的機制主要是解決PHP進程內存溢出的問題。
三、當task執行任務異常,我們kill一個task進程,會再新增一個嗎?
會。
四、如何設置 task_worker_num ?
最大值不得超過 SWOOLE_CPU_NUM * 1000。
查看本機 CPU 核數:
echo "swoole_cpu_num:".swoole_cpu_num().PHP_EOL;
根據項目的任務量決定的,比如:1秒會產生200個任務,執行每個任務需要500ms。
想在1s中執行完成200個任務,需要100個task進程。
100 = 200/(1/0.5)
五、如何設置 worker_num ?
默認設置為本機的CPU核數,最大不得超過 SWOOLE_CPU_NUM * 1000。
比如:1個請求耗時10ms,要提供1000QPS的處理能力,那就必須配置10個進程。
10 = 0.01*1000
假設每個進程占用40M內存,10個進程就需要占用400M的內存。
擴展
- Server->taskwait
- Server->taskWaitMulti
- Server->taskCo
參考文檔
本文歡迎轉發,轉發請注明作者和出處,謝謝!