Swoole和Redis實現的並發隊列處理系統


由於PHP不支持多線程,但是作為一個完善的系統,有很多操作都是需要異步完成的。為了完成這些異步操作,我們做了一個基於Redis隊列任務系統。

大家知道,一個消息隊列處理系統主要分為兩大部分:消費者和生產者。

在我們的系統中,主系統作為生產者,任務系統作為消費者。

 

具體的工作流程如下:

1、主系統將需要需要處理的任務名稱+任務參數push到隊列中。

2、任務系統實時的對任務隊列進行pop,pop出來一個任務就fork一個子進程,由子進程完成具體的任務邏輯。

 

具體代碼如下:

 1 /**
 2  * 啟動守護進程
 3  */
 4 public function runAction() {
 5     Tools::log_message('ERROR', 'daemon/run' . ' | action: restart', 'daemon-');
 6     while (true) {
 7         $this->fork_process();
 8     }
 9     exit;
10 }
11 
12 /**
13  * 創建子進程
14  */
15 private function fork_process() {
16     $ppid = getmypid();
17     $pid = pcntl_fork();
18     if ($pid == 0) {//子進程
19         $pid = posix_getpid();
20         //echo "* Process {$pid} was created \n\n";
21         $this->mq_process();
22         exit;
23     } else {//主進程
24         $pid = pcntl_wait($status, WUNTRACED); //取得子進程結束狀態
25         if (pcntl_wifexited($status)) {
26             //echo "\n\n* Sub process: {$pid} exited with {$status}";
27             //Tools::log_message('INFO', 'daemon/run succ' . '|status:' . $status . '|pid:' . $ppid . '|childpid:' . $pid );
28         } else {
29             Tools::log_message('ERROR', 'daemon/run fail' . '|status:' . $status . '|pid:' . $ppid . '|childpid:' . $pid, 'daemon-');
30         }
31     }
32 }
33 
34 /**
35  * 業務任務隊列處理
36  */
37 private function mq_process() {
38     $data_pop = $this->masterRedis->rPop($this->redis_list_key);
39     $data = json_decode($data_pop, 1);
40     if (!$data) {
41         return FALSE;
42     }
43     $worker = '_task_' . $data['worker'];
44     $class_name = isset($data['class']) ? $data['class'] : 'TaskproModel';
45     $params = $data['params'];
46     $class = new $class_name();
47     $class->$worker($params);
48     return TRUE;
49 }

 

 

這是一個簡單的任務處理系統。

通過這個任務系統幫助我們實現了異步,到目前為止已經穩定運行了將近一年。

但很可惜,它是一個單進程的系統。它是一直在不斷的fork,如果有任務就處理,沒有任務就跳過。

這樣很穩定。

但問題有兩個:一是不斷地fork、pop會浪費服務器資源,二是不支持並發!

第一個問題還好,但第二個問題就很嚴重。

當主系統 同時 拋過來大量的任務時,任務的處理時間就會無限的拉長。

 

新的設計

為了解決並發的問題,我們計划做一個更加高效強壯的隊里處理系統。

因為在PHP7之前不支持多線程,所以我們采用多進程。

從網上找了不少資料,大多所謂的多進程都是N個進程同時在后台運行。

顯然這是不合適的。

我的預想是:每pop出一個任務就fork一個任務,任務執行完成后子進程結束。

 

遇到的問題

1、如何控制最大進程數

這個問題很簡單,那就是每fork一個子進程就自增一次。而當子進程執行完成就自減一次。

自增沒有問題,我們就在主進程中操作就完了。那么該如何自減呢?

可能你會說,當然是在子進程中啊。但這里你需要注意:當fork的時候是從主進程復制了一份資源給子進程,這就意味着你無法在子進程中操作主進程中的計數器!

所以,這里就需要了解一個知識點:信號。

具體的可以自行Google,這里直接看代碼。

1 // install signal handler for dead kids
2 pcntl_signal(SIGCHLD, array($this, "sig_handler"));

 

 

這就安裝了一個信號處理器。當然還缺少一點。

declare(ticks = 1);

 

 

declare是一個控制結構語句,具體的用法也請去Google。

這句代碼的意思就是每執行一條低級語句就調用一次信號處理器。

這樣,每當子進程結束的時候就會調用信號處理器,我們就可以在信號處理器中進行自減。

 

2、如何解決進程殘留

在多進程開發中,如果處理不當就會導致進程殘留。

為了解決進程殘留,必須得將子進程回收。

那么如何對子進程進行回收就是一個技術點了。

在pcntl的demo中,包括很多博文中都是說在主進程中回收子進程。

但我們是基於Redis的brpop的,而brpop是阻塞的。

這就導致一個問題:當執行N個任務之后,任務系統空閑的時候主進程是阻塞的,而在發生阻塞的時候子進程還在執行,所以就無法完成最后幾個子進程的進程回收。。。

這里本來一直很糾結,但當我將信號處理器搞定之后就也很簡單了。

進程回收也放到信號處理器中去。

 

新系統的評估

pcntl是一個進程處理的擴展,但很可惜它對多進程的支持非常乏力。

所以這里采用Swoole擴展中的Process。

具體代碼如下:

 1 declare(ticks = 1);
 2 class JobDaemonController extends Yaf_Controller_Abstract{
 3 
 4     use Trait_Redis;
 5 
 6     private $maxProcesses = 800;
 7     private $child;
 8     private $masterRedis;
 9     private $redis_task_wing = 'task:wing'; //待處理隊列
10 
11     public function init(){
12         // install signal handler for dead kids
13         pcntl_signal(SIGCHLD, array($this, "sig_handler"));
14         set_time_limit(0);
15         ini_set('default_socket_timeout', -1); //隊列處理不超時,解決redis報錯:read error on connection
16     }
17 
18     private function redis_client(){
19         $rds = new Redis();
20         $rds->connect('redis.master.host',6379);
21         return $rds;
22     }
23 
24     public function process(swoole_process $worker){// 第一個處理
25         $GLOBALS['worker'] = $worker;
26         swoole_event_add($worker->pipe, function($pipe) {
27             $worker = $GLOBALS['worker'];
28             $recv = $worker->read();            //send data to master
29 
30             sleep(rand(1, 3));
31             echo "From Master: $recv\n";
32             $worker->exit(0);
33         });
34         exit;
35     }
36 
37     public function testAction(){
38         for ($i = 0; $i < 10000; $i++){
39             $data = [
40                 'abc' => $i,
41                 'timestamp' => time().rand(100,999)
42             ];
43             $this->masterRedis->lpush($this->redis_task_wing, json_encode($data));
44         }
45         exit;
46     }
47 
48     public function runAction(){
49         while (1){
50 //            echo "\t now we de have $this->child child processes\n";
51             if ($this->child < $this->maxProcesses){
52                 $rds = $this->redis_client();
53                 $data_pop = $rds->brpop($this->redis_task_wing, 3);//無任務時,阻塞等待
54                 if (!$data_pop){
55                     continue;
56                 }
57                 echo "\t Starting new child | now we de have $this->child child processes\n";
58                 $this->child++;
59                 $process = new swoole_process([$this, 'process']);
60                 $process->write(json_encode($data_pop));
61                 $pid = $process->start();
62             }
63         }
64     }
65 
66     private function sig_handler($signo) {
67 //        echo "Recive: $signo \r\n";
68         switch ($signo) {
69             case SIGCHLD:
70                 while($ret = swoole_process::wait(false)) {
71 //                    echo "PID={$ret['pid']}\n";
72                     $this->child--;
73                 }
74         }
75     }
76 }

 

最終,經過測試,單核1G的服務器執行1到3秒的任務可以做到800的並發。

多PHPer在進階的時候總會遇到一些問題和瓶頸,業務代碼寫多了沒有方向感,不知道該從那里入手去提升,對此我整理了一些資料,包括但不限於:分布式架構、高可擴展、高性能、高並發、服務器性能調優、TP6,laravel,YII2,Redis,Swoole、Swoft、Kafka、Mysql優化、shell腳本、Docker、微服務、Nginx等多個知識點高級進階干貨需要的可以免費分享給大家,需要的加群(點擊→)677079770


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM