php模擬並發


 

原文: http://blog.csdn.net/zhang_xinglong/article/details/16339867

-------------------------------------------------------------------------------------------------------------------------------

並發請求理論描述:假設有一個client,程序邏輯是要請求三個不同的server,處理各自的響應。傳統模型當然是順序執行,先發送第一個請求,等待收到響應數據后再發送第二個請求,以此類推。就像是單核CPU,一次只能處理一件事,其他事情被暫時阻塞。而並發模式可以讓三個server同時處理各自請求,這就可以使大量時間復用。

畫個圖更好說明問題:

前者為阻塞模式,忽略請求響應等時間,總耗時為700ms;而后者非阻塞模式,由於三個請求可以同時得到處理,總耗時只有300ms。所謂阻塞方式block,顧名思義,就是進程或是線程執行到這些函數時必須等待某個事件的發生,如果事件沒有發生,進程或線程就被阻塞,函數不能立即返回。所謂非阻塞方式non-block,就是進程或線程執行此函數時不必非要等待事件的發生,一旦執行肯定返回,以返回值的不同來反映函數的執行情況,如果事件發生則與阻塞方式相同,若事件沒有發生則返回一個代碼來告知事件未發生,而進程或線程繼續執行,所以效率較高。

PHP本身是不支持多線程的,但是它可以利用Linux和apache的多線程能力。php模擬的多線程其實只是多進程,並不是真正的多線程。以下是幾種php模擬多線程的方法:
1.php+shell (利用linux os)

php代碼(test.php):

 

[php]  view plain  copy
 
  1. <?php  
  2. for($i = 0; $i < 10; $i++)  
  3. {  
  4.     echo $i;  
  5.     sleep(5); //這里為了方便看效果sleep一下讓腳本執行時間更長  
  6. }  
  7. ?>  

shell代碼(test.sh):

 

 

[plain]  view plain  copy
 
  1. #!/bin/bash  
  2. for i in 1 2 3 4 5  
  3. do  
  4.     /usr/bin/php -r -q test.php &  
  5. done  

注意:
在請求php代碼的那行末尾有一個&符號,這個是關鍵,不加的話是不能進行多線程的,&表示將服務推送到后台執行,因此在shell的每次的循環中不必等php的代碼全部執行完在請求下一個文件,而是同時進行的,這樣就實現了多線程,下面運行下shell看下效果,這里你將看到5個test.php進程,再利用linux的定時器,定時請求這個shell,在處理一些需要多線程的任務,例如,批量下載時,非常好用!
參考:http://blog.csdn.net/tianmohust/article/details/8208627

 

2.php+pcntl(利用linux os)
只能用在Unix Like OS,Windows不可用。且推薦僅僅在CLI模式運行,不要在WEB服務器環境運行。

 

[php]  view plain  copy
 
  1. <?php  
  2. declare(ticks=1);  
  3. //是否等待進程結束  
  4. $bWaitFlag = FALSE;  
  5. //進程總數  
  6. $intNum = 10;  
  7. //進程PID數組  
  8. $pids = array();  
  9. echo ("Start\n");  
  10. for($i = 0; $i < $intNum; $i++)  
  11. {  
  12.     //產生子進程,而且從當前行之下開試運行代碼,而且不繼承父進程的數據信息  
  13.     $pids[$i] = pcntl_fork();  
  14.     if( ! $pids[$i])  
  15.     {  
  16.         //子進程進程代碼段_Start  
  17.         $str = "";  
  18.         sleep(5+$i);  
  19.         for ($j = 0; $j < $i; $j++)  
  20.         {  
  21.             $str .= "*";  
  22.         }  
  23.         echo "$i -> " . time() . " $str \n";  
  24.         exit();  
  25.         //子進程進程代碼段_End  
  26.     }  
  27. }  
  28.   
  29. if ($bWaitFlag)  
  30. {  
  31.     for($i = 0; $i < $intNum; $i++)  
  32.     {  
  33.         pcntl_waitpid($pids[$i], $status, WUNTRACED);  
  34.         echo "wait $i -> " . time() . "\n";  
  35.     }  
  36. }  
  37. echo ("End\n");  
  38. ?>  

運行結果如下:

 

 

[plain]  view plain  copy
 
  1. [qiao@oicq qiao]$ php test.php          
  2. Start  
  3. End  
  4. [qiao@oicq qiao]$ ps -aux | grep "php"  
  5. qiao      32275   0.0   0.5 49668 6148 pts/1     S     14:03    0:00 /usr/local/php4/b  
  6. qiao      32276   0.0   0.5 49668 6152 pts/1     S     14:03    0:00 /usr/local/php4/b  
  7. qiao      32277   0.0   0.5 49668 6152 pts/1     S     14:03    0:00 /usr/local/php4/b  
  8. qiao      32278   0.0   0.5 49668 6152 pts/1     S     14:03    0:00 /usr/local/php4/b  
  9. qiao      32279   0.0   0.5 49668 6152 pts/1     S     14:03    0:00 /usr/local/php4/b  
  10. qiao      32280   0.0   0.5 49668 6152 pts/1     S     14:03    0:00 /usr/local/php4/b  
  11. qiao      32281   0.0   0.5 49668 6152 pts/1     S     14:03    0:00 /usr/local/php4/b  
  12. qiao      32282   0.0   0.5 49668 6152 pts/1     S     14:03    0:00 /usr/local/php4/b  
  13. qiao      32283   0.0   0.5 49668 6152 pts/1     S     14:03    0:00 /usr/local/php4/b  
  14. qiao      32284   0.0   0.5 49668 6152 pts/1     S     14:03    0:00 /usr/local/php4/b  
  15. qiao      32286   0.0   0.0   1620   600 pts/1     S     14:03    0:00 grep php  
  16. [qiao@oicq qiao]$ 0 -> 1133503401    
  17. 1 -> 1133503402 *  
  18. 2 -> 1133503403 **  
  19. 3 -> 1133503404 ***  
  20. 4 -> 1133503405 ****  
  21. 5 -> 1133503406 *****  
  22. 6 -> 1133503407 ******  
  23. 7 -> 1133503408 *******  
  24. 8 -> 1133503409 ********  
  25. 9 -> 1133503410 *********  
  26.   
  27. [qiao@oicq qiao]$  

如果$bWaitFlag=TURE,則結果如下: 

 

 

[plain]  view plain  copy
 
  1. [qiao@oicq qiao]$ php test.php          
  2. Start  
  3. 0 -> 1133503602    
  4. wait 0 -> 1133503602  
  5. 1 -> 1133503603 *  
  6. wait 1 -> 1133503603  
  7. 2 -> 1133503604 **  
  8. wait 2 -> 1133503604  
  9. 3 -> 1133503605 ***  
  10. wait 3 -> 1133503605  
  11. 4 -> 1133503606 ****  
  12. wait 4 -> 1133503606  
  13. 5 -> 1133503607 *****  
  14. wait 5 -> 1133503607  
  15. 6 -> 1133503608 ******  
  16. wait 6 -> 1133503608  
  17. 7 -> 1133503609 *******  
  18. wait 7 -> 1133503609  
  19. 8 -> 1133503610 ********  
  20. wait 8 -> 1133503610  
  21. 9 -> 1133503611 *********  
  22. wait 9 -> 1133503611  
  23. End  
  24. [qiao@oicq qiao]$   

從多進程的例子可以看出,使用pcntl_fork()之后,將生成一個子進程,而且子進程運行的代碼,從pcntl_fork()之后的代碼開始,而子進程不繼承父進程的數據信息(實際上是把父進程的數據做了一個全新的拷貝),因而使用if(!$pids[$i]) 來控制子進程實際運行的代碼段。
參考:http://hi.baidu.com/tangyubinsir/item/43c04f85ea7709d4d1f8cd84和http://www.itlearner.com/article/4908
3.php+pthreads
參考:http://blog.csdn.net/leinchu/article/details/11795985
4.php+socket(利用web server)
假設你要建立一個服務來檢查正在運行的n台服務器,以確定他們還在正常運轉。你可能會寫下面這樣的代碼:

[php]  view plain  copy
 
  1. <?php  
  2. $hosts = array("www.baidu.com", "www.sohu.com", "www.163.com");  
  3.   
  4. $timeout = 15;  
  5. $status = array();  
  6.   
  7. foreach ($hosts as $host)  
  8. {  
  9.     $errno = 0;  
  10.     $errstr = "";  
  11.     $s = fsockopen($host, 80, $errno, $errstr, $timeout);  
  12.     if ($s)  
  13.     {  
  14.         $status[$host] = "Connected\n";  
  15.         fwrite($s, "HEAD / HTTP/1.0\r\nHost: $host\r\n\r\n"); //第二個參數是HTTP協議中規定的請求頭,不明白的請看RFC中的定義  
  16.         do  
  17.         {  
  18.             $data = fread($s, 8192);  
  19.             if (strlen($data) == 0)  
  20.             {  
  21.                 break;  
  22.             }  
  23.             $status[$host] .= $data; //返回連接狀態  
  24.         }  
  25.         while (true);  
  26.         fclose($s);  
  27.     }  
  28.     else  
  29.     {  
  30.         $status[$host] = "Connection failed: $errno $errstrn";  
  31.     }  
  32. }  
  33. echo '<pre>';  
  34. print_r($status);  
  35. ?>  

它運行的很好,但是在fsockopen()分析完hostname並且建立一個成功的連接(或者延時$timeout秒)之前,擴充這段代碼來管理大量服務器將耗費很長時間。
因此我們必須放棄這段代碼;我們可以建立異步連接-不需要等待fsockopen返回連接狀態。PHP仍然需要解析hostname(所以直接使用ip更加明智),不過將在打開一個連接之后立刻返回,繼而我們就可以連接下一台服務器。
有兩種方法可以實現;PHP5中可以使用新增的stream_socket_client()函數直接替換掉fsocketopen()。PHP5之前的版本,你需要自己動手,用sockets擴展解決問題。
下面是PHP5中的解決方法:

 

 

[php]  view plain  copy
 
  1. <?php  
  2. $hosts = array("www.baidu.com", "www.sohu.com", "www.163.com");  
  3.   
  4. $timeout = 15;  
  5. $status = array();  
  6.   
  7.   
  8. $sockets = array();  
  9. /* Initiate connections to all the hosts simultaneously */  
  10. foreach ($hosts as $id => $host)   
  11. {  
  12.     $s = stream_socket_client(  
  13.             "$host:80", $errno, $errstr, $timeout,   
  14.             TREAM_CLIENT_ASYNC_CONNECT|STREAM_CLIENT_CONNECT);  
  15.     /* 這里需要稍微延遲一下,否則下面fwrite中的socket句柄不一定能真正使用 
  16.      * 這里應該是PHP的一處bug,查了一下,官方bug早在08年就有人提交了 
  17.      * 我的5.2.8中尚未解決,不知最新的5.3中是否修正 
  18.      */  
  19.     usleep(10);  
  20.     if ($s)   
  21.     {  
  22.         $sockets[$id] = $s;  
  23.         $status[$hosts[$id]] = "in progress";  
  24.     }  
  25.     else   
  26.     {    
  27.         $status[$hosts[$id]] = "failed, $errno $errstr";  
  28.     }  
  29. }  
  30.       
  31. /* Now, wait for the results to come back in */  
  32. while (count($sockets))   
  33. {  
  34.     $read = $write = $sockets;  
  35. //     $e = null;  
  36.     /* This is the magic function - explained below */  
  37.     $n = stream_select($read, $write, $e, $timeout);  
  38.     if ($n > 0) //據說stream_select返回值不總是可信任的    
  39. //     if (count($read))    
  40.     {  
  41.     /* readable sockets either have data for us, or are failed connection attempts */  
  42.         foreach ($read as $r)   
  43.         {  
  44.             /* stream_select generally shuffles $read, so we need to 
  45.              compute from which socket(s) we're reading. */  
  46.             $id = array_search($r, $sockets);  
  47.             $data = fread($r, 8192);  
  48.             /* A socket is readable either because it has 
  49.              data to read, OR because it's at EOF. */  
  50.             if (strlen($data) == 0)   
  51.             {  
  52.                 if ($status[$hosts[$id]] == "in progress")   
  53.                 {  
  54.                     $status[$hosts[$id]] = "failed to connect";  
  55.                 }  
  56.                 fclose($r);  
  57.                 unset($sockets[$id]);  
  58.             }  
  59.             else   
  60.             {  
  61.                     $status[$hosts[$id]] = $data;  
  62.             }  
  63.         }  
  64.           
  65.         /* writeable sockets can accept an HTTP request */  
  66.         foreach ($write as $w)   
  67.         {  
  68.                 $id = array_search($w, $sockets);  
  69.                 if(is_resource($w) && feof($w) === FALSE)  
  70.                 {  
  71.                     @fwrite($w, "HEAD / HTTP/1.0\r\nHost: " . $hosts[$id] .  "\r\n\r\n");  
  72. //                     $flag && $status[$hosts[$id]] = "waiting for response";  
  73.                 }  
  74.         }  
  75.     }  
  76.     else   
  77.     {  
  78.             /* timed out waiting; assume that all hosts associated with $sockets are faulty */  
  79.             foreach ($sockets as $id => $s)   
  80.             {  
  81.                 $status[$hosts[$id]] = "timed out " . $status[$hosts[$id]];  
  82.             }  
  83.             break;  
  84.     }  
  85. }  
  86.       
  87. echo '<pre>';var_dump($status);  
  88. ?>  

我們用stream_select()等待sockets打開的連接事件。stream_select()調用系統的select()函數來工作:前面三個參數是你要使用的streams的數組;你可以對其讀取,寫入和獲取異常(分別針對三個參數)。stream_select()可以通過設置$timeout(秒)參數來等待事件發生-事件發生時,相應的sockets數據將寫入你傳入的參數。
下面是PHP4.1.0之后版本的實現,如果你已經在編譯PHP時包含了sockets(ext/sockets)支持,你可以使用根上面類似的代 碼,只是需要將上面的streams/filesystem函數的功能用ext/sockets函數實現。主要的不同在於我們用下面的函數代替 stream_socket_client()來建立連接:

 

 

[php]  view plain  copy
 
  1. <?php  
  2. // This value is correct for Linux, other systems have other values   
  3. define('EINPROGRESS', 115);    
  4. function non_blocking_connect($host, $port, &$errno, &$errstr, $timeout) {     
  5.         $ip = gethostbyname($host);     
  6.         $s = socket_create(AF_INET, SOCK_STREAM, 0);     
  7.         if (socket_set_nonblock($s)) {      
  8.            $r = @socket_connect($s, $ip, $port);      
  9.            if ($r || socket_last_error() == EINPROGRESS) {       
  10.                   $errno = EINPROGRESS;       
  11.                   return $s;      
  12.                }     
  13.          }     
  14.         $errno = socket_last_error($s);     
  15.         $errstr = socket_strerror($errno);     
  16.         socket_close($s);     
  17.         return false;    
  18. }    
  19. ?>  


現在用socket_select()替換掉stream_select(),用socket_read()替換掉fread(),用socket_write()替換掉fwrite(),用socket_close()替換掉fclose()就可以執行腳本了! PHP5的先進之處在於,你可以用stream_select()處理幾乎所有的stream。例如你可以通過include STDIN用它接收鍵盤輸入並保存進數組,你還可以接收通過proc_open()打開的管道中的數據。
注:select在socket編程中還是比較重要的,可是對於初學socket的人來說都不太愛用select寫程序,他們只是習慣寫諸如connect、 accept、recv或recvfrom這樣的阻塞程序。可是使用select就可以完成非阻塞方式工作的程序,它能夠監視我們需要監視的文件描述符的變化情況——讀寫或是異常。
參考:http://blog.csdn.net/21aspnet/article/details/7420024
5.php+curl
(1)經典curl並發機制和存在問題
經典的cURL實現機制在網上很容易找到, 比如參考PHP在線手冊的如下實現方式:

[php]  view plain  copy
 
  1. <?php  
  2. function classic_curl($urls, $delay) {  
  3.     $queue = curl_multi_init();  
  4.     $map = array();  
  5.   
  6.     foreach ($urls as $url) {  
  7.         // create cURL resources  
  8.         $ch = curl_init();  
  9.   
  10.         // set URL and other appropriate options  
  11.         curl_setopt($ch, CURLOPT_URL, $url);  
  12.   
  13.         curl_setopt($ch, CURLOPT_TIMEOUT, 1);  
  14.         curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1);  
  15.         curl_setopt($ch, CURLOPT_HEADER, 0);  
  16.         curl_setopt($ch, CURLOPT_NOSIGNAL, true);  
  17.   
  18.         // add handle  
  19.         curl_multi_add_handle($queue, $ch);  
  20.         $map[$url] = $ch;  
  21.     }  
  22.   
  23.     $active = null;  
  24.   
  25.     // execute the handles  
  26.     do {  
  27.         $mrc = curl_multi_exec($queue, $active);  
  28.     } while ($mrc == CURLM_CALL_MULTI_PERFORM);  
  29.   
  30.     while ($active > 0 && $mrc == CURLM_OK) {  
  31.         if (curl_multi_select($queue, 0.5) != -1) {  
  32.             do {  
  33.                 $mrc = curl_multi_exec($queue, $active);  
  34.             } while ($mrc == CURLM_CALL_MULTI_PERFORM);  
  35.         }  
  36.     }  
  37.   
  38.     $responses = array();  
  39.     foreach ($map as $url=>$ch) {  
  40.         $responses[$url] = callback(curl_multi_getcontent($ch), $delay);  
  41.         curl_multi_remove_handle($queue, $ch);  
  42.         curl_close($ch);  
  43.     }  
  44.   
  45.     curl_multi_close($queue);  
  46.     return $responses;  
  47. }  
  48. ?>  

首先將所有的URL壓入並發隊列, 然后執行並發過程, 等待所有請求接收完之后進行數據的解析等后續處理. 在實際的處理過程中, 受網絡傳輸的影響, 部分URL的內容會優先於其他URL返回, 但是經典cURL並發必須等待最慢的那個URL返回之后才開始處理, 等待也就意味着CPU的空閑和浪費. 如果URL隊列很短, 這種空閑和浪費還處在可接受的范圍, 但如果隊列很長, 這種等待和浪費將變得不可接受.
(2)改進的rolling curl並發方式
仔細分析不難發現經典cURL並發還存在優化的空間, 優化的方式時當某個URL請求完畢之后盡可能快的去處理它, 邊處理邊等待其他的URL返回, 而不是等待那個最慢的接口返回之后才開始處理等工作, 從而避免CPU的空閑和浪費. 閑話不多說, 下面貼上具體的實現:

 

 

[php]  view plain  copy
 
  1. <?php  
  2. function rolling_curl($urls, $delay) {  
  3.     $queue = curl_multi_init();  
  4.     $map = array();  
  5.   
  6.     foreach ($urls as $url) {  
  7.         $ch = curl_init();  
  8.   
  9.         curl_setopt($ch, CURLOPT_URL, $url);  
  10.         curl_setopt($ch, CURLOPT_TIMEOUT, 1);  
  11.         curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1);  
  12.         curl_setopt($ch, CURLOPT_HEADER, 0);  
  13.         curl_setopt($ch, CURLOPT_NOSIGNAL, true);  
  14.   
  15.         curl_multi_add_handle($queue, $ch);  
  16.         $map[(string) $ch] = $url;  
  17.     }  
  18.   
  19.     $responses = array();  
  20.     do {  
  21.         while (($code = curl_multi_exec($queue, $active)) == CURLM_CALL_MULTI_PERFORM) ;  
  22.   
  23.         if ($code != CURLM_OK) { break; }  
  24.   
  25.         // a request was just completed -- find out which one  
  26.         while ($done = curl_multi_info_read($queue)) {  
  27.   
  28.             // get the info and content returned on the request  
  29.             $info = curl_getinfo($done['handle']);  
  30.             $error = curl_error($done['handle']);  
  31.             $results = callback(curl_multi_getcontent($done['handle']), $delay);  
  32.             $responses[$map[(string) $done['handle']]] = compact('info', 'error', 'results');  
  33.   
  34.             // remove the curl handle that just completed  
  35.             curl_multi_remove_handle($queue, $done['handle']);  
  36.             curl_close($done['handle']);  
  37.         }  
  38.   
  39.         // Block for data in / output; error handling is done by curl_multi_exec  
  40.         if ($active > 0) {  
  41.             curl_multi_select($queue, 0.5);  
  42.         }  
  43.   
  44.     } while ($active);  
  45.   
  46.     curl_multi_close($queue);  
  47.     return $responses;  
  48. }  
  49. ?>  

(3)兩種並發實現的性能對比
性能測試中用到的回調函數為:

 

 

[php]  view plain  copy
 
  1. function callback($data, $delay) {  
  2.     preg_match_all('/<h3>(.+)<\/h3>/iU', $data, $matches);  
  3.     usleep($delay);  
  4.     return compact('data', 'matches');  
  5. }  

數據處理回調無延遲時: Rolling Curl略優, 但性能提升效果不明顯.數據處理回調延遲5毫秒: Rolling Curl完勝, 性能提升40%左右.通過上面的性能對比, 在處理URL隊列並發的應用場景中Rolling cURL應該是更加的選擇, 並發量非常大(1000+)時, 可以控制並發隊列的最大長度, 比如20, 每當1個URL返回並處理完畢之后立即加入1個尚未請求的URL到隊列中, 這樣寫出來的代碼會更加健壯, 不至於並發數太大而卡死或崩潰.


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM