前幾天公司有個業務需求,要求接收到網易考拉的推送數據並批量讀取刪除XML文件給到指定目錄下,與海關清關接口對接。(海關接口是以讀取XML文件獲取數據,好過時的技術...)。
不廢話先上我的思路
1,獲取海關指定文件夾內所有xml文件
2,根據服務器配置計算出每個php處理n個xml文件所需cpu以及內存開銷
3,根據進程數量用算法計算每個進程需要處理多少個xml文件以及開啟多少個進程
4,主進程等待子進程執行結束后才能退出
我遇到的難點
1,再對最大進程開啟數預測后得出了一個值 例如70個進程,那么怎樣才能將這些xml文件分配到指定進程上不會引起不同進程處理同一個文件呢,我的代碼實現為
1 //計算進程啟動數量以及每個進程執行的文件操作 2 $fl_array = array(); 3 4 //將數據從新轉化 5 $new_array_list = array(); 6 foreach ($new_array as $k => $v){ 7 array_push($new_array_list,$v); 8 } 9 10 //每個進程的文件操作數量 11 $fl = floor($array_count/$this->pro)+1; 12 //第一步循環最大進程數 13 $c = 0; 14 15 for ($i=0;$i<$this->pro;$i++){ 16 //第二步每個進程的文件操作數量 17 for ($j=0;$j<$fl;$j++){ 18 if(isset($new_array_list[$c])){ 19 $fl_array[$i][] = $new_array_list[$c]; 20 } 21 $c++; 22 } 23 }
其中數組$new_array_list為所有xml文件的文件名的數組最終得到的結果是一個二維數組,每個數組內有若干個xml文件名,這樣就為了之后的循環開啟子進程做了准備
2,fork子進程
foreach ($fl_array as $k =>$v){ $nPID = pcntl_fork(); // 創建子進程 if($nPID == 0){ try{ Yii::app()->db->createCommand( " SET AUTOCOMMIT=0; BEGIN WORK; " )->execute(); foreach ($fl_array[$k] as $k2=>$v2){ //處理業務邏輯 $filed = array(); $sql_item_select = " /*master*/SELECT `custemstates` FROM t_rocord_head WHERE `copno`='".$v2['copNo']."';"; $item = yii::app()->db->createCommand($sql_item_select)->queryRow(); if($v2['returnStatus'] > $item['custemstates']){ if(isset($v2['invtNo'])) array_push($filed," `invtno` = '".$v2['invtNo']."' "); //清單編號 if(isset($v2['returnStatus'])) array_push($filed," `custemstates` = '".$v2['returnStatus']."' "); //海關回執狀態編碼 if(isset($v2['selfcusflag'])) array_push($filed," `selfcusflag` = '".$v2['selfcusflag']."' "); //海關回執狀態編碼 if(isset($v2['returnTime'])) array_push($filed," `custemreturntime` = '".$v2['returnTime']."' "); //海關回執時間 if(isset($v2['returnInfo'])) array_push($filed," `custemmessage` = '".$v2['returnInfo']."' "); //海關回執消息 array_push($filed," `sendflag` = '0' "); //是否發送給客戶 array_push($filed," `updatedate` = '".date('Y-m-d h:i:s')."' "); //更新時間 array_push($filed," `updateuser` = 'system' "); //更新時間 $sql_update = " /*master*/UPDATE t_rocord_head SET ".implode(",",$filed)." WHERE `copno` = '".$v2['copNo']."' and (`delflag` <> '1' OR `delflag` is null)"; yii::app()->db->createCommand($sql_update)->execute(); }else if($v2['returnStatus'] < '0' && $item['custemstates'] == '2'){ if(isset($v2['invtNo'])) array_push($filed," `invtno` = '".$v2['invtNo']."' "); //清單編號 if(isset($v2['returnStatus'])) array_push($filed," `custemstates` = '".$v2['returnStatus']."' "); //海關回執狀態編碼 if(isset($v2['selfcusflag'])) array_push($filed," `selfcusflag` = '".$v2['selfcusflag']."' "); //海關回執狀態編碼 if(isset($v2['returnTime'])) array_push($filed," `custemreturntime` = '".$v2['returnTime']."' "); //海關回執時間 if(isset($v2['returnInfo'])) array_push($filed," `custemmessage` = '".$v2['returnInfo']."' "); //海關回執消息 array_push($filed," `sendflag` = '0' "); //是否發送給客戶 array_push($filed," `updatedate` = '".date('Y-m-d h:i:s')."' "); //更新時間 array_push($filed," `updateuser` = 'system' "); //更新時間 $sql_update = " /*master*/UPDATE t_rocord_head SET ".implode(",",$filed)." WHERE `copno` = '".$v2['copNo']."' and (`delflag` <> '1' OR `delflag` is null);"; yii::app()->db->createCommand($sql_update)->execute(); } } yii::app()->db->createCommand( "COMMIT WORK;" )->execute(); }catch (Exception $e){ echo "子進程錯誤"; exec("kill -9 ".$masterpid.""); exit(); } $oW = fopen($this->sPipePath, 'w'); fwrite($oW, $k."\n"); // 當前任務處理完比,在管道中寫入數據 fclose($oW); exit(0); // 執行完后退出 } }
其中 exec("kill -9 ".$masterpid.""); 是為了避免子進程異常后無法被退出造成的僵屍進程產生
3,子進程執行完畢,主進程退出
1 // 父進程 2 $oR = fopen($this->sPipePath, 'r'); 3 stream_set_blocking($oR, false); // 將管道設置為非堵塞,用於適應超時機制 4 $sData = ''; // 存放管道中的數據 5 $nLine = 0; 6 $nStart = time(); 7 while ($nLine < count($fl_array) && (time() - $nStart) < $this->timeout) { 8 $sLine = fread($oR, 1024); 9 if (empty($sLine)) { 10 continue; 11 } 12 13 //echo "current line: {$sLine}\n"; 14 // 用於分析多少任務處理完畢,通過‘\n’標識 15 foreach(str_split($sLine) as $c) { 16 if ("\n" == $c) { 17 ++$nLine; 18 } 19 } 20 $sData .= $sLine; 21 } 22 //echo "Final line count:$nLine\n"; 23 fclose($oR); 24 unlink($this->sPipePath); // 刪除管道,已經沒有作用了 25 26 // 等待子進程執行完畢,避免僵屍進程 27 $n = 0; 28 while ($n < count($fl_array)) { 29 $nStatus = -1; 30 $nPID = pcntl_wait($nStatus, WNOHANG); 31 if ($nPID > 0) { 32 //echo "{$nPID} exit\n"; 33 ++$n; 34 } 35 } 36 37 // 驗證結果,主要查看結果中是否每個任務都完成了 38 $arr2 = array(); 39 foreach(explode("\n", $sData) as $i) {// trim all 40 if (is_numeric(trim($i))) { 41 array_push($arr2, $i); 42 } 43 } 44 $arr2 = array_unique($arr2); 45 46 if ( count($arr2) == count($fl_array)) { 47 48 echo "ok".date('Y-m-d h:i:s'); 49 exit(); 50 } else { 51 echo "error count " . count($arr2) . date('Y-m-d h:i:s')."\n"; 52 exit(); 53 }
就這樣將需求搞定!
下面為代碼全貌,如有不正確的地方,請指出
1 <?php 2 /** 3 * Created by PhpStorm. 4 * User: Administrator 5 * Date: 2018/6/15 6 * Time: 9:06 7 */ 8 class RemoveXmlCommand extends CConsoleCommand{ 9 10 // /home/wwwroot/192.168.1.126/qgsb_cms/qgsb_cms/protected/yiic removexml run 11 private $config; 12 //文件存儲模式 1,liunx 2,windows 13 private $storage_model; 14 15 //文件存儲路徑 16 private $storage_in_path_liunx; 17 18 //進程管道文件路徑 19 private $piRealPath; 20 21 private $storage_in_path_windows; 22 23 private $storage_real_path; 24 25 //開啟處理進程個數 26 private $pro; 27 //超時設置 28 private $timeout; //毫秒級 29 //管道 30 private $sPipePath; 31 32 //海關狀態編碼優先級配置 33 private $returnStatus = [ 34 '2'=>[ 35 'field'=>'00', 36 'level'=>'1' 37 ], 38 // '0'=>[ 39 // 'field'=>'01', 40 // 'level'=>'2' 41 // ], 42 '120'=>[ 43 'field'=>'02', 44 'level'=>'3' 45 ], 46 '300'=>[ 47 'field'=>'03', 48 'level'=>'4' 49 ], 50 '500'=>[ 51 'field'=>'04', 52 'level'=>'5' 53 ], 54 '700'=>[ 55 'field'=>'05', 56 'level'=>'6' 57 ], 58 '800'=>[ 59 'field'=>'06', 60 'level'=>'7' 61 ], 62 ]; 63 64 65 function __construct() { 66 $this->config = require_once(Yii::app()->basePath.'/config/customs.php'); 67 68 $this->storage_model = $this->config['remove_xml']['storage_model']; 69 $this->storage_in_path_liunx = $this->config['remove_xml']['storage_in_path_liunx']; 70 $this->piRealPath = $this->config['remove_xml']['piRealPath']; 71 $this->storage_in_path_windows = $this->config['remove_xml']['storage_in_path_windows']; 72 $this->pro = $this->config['remove_xml']['pro']; 73 $this->timeout = $this->config['remove_xml']['timeout']; 74 75 $this->storage_real_path = $this->_systemType($this->storage_model); 76 } 77 78 private function _systemType($model){ 79 switch ($model){ 80 case 1: 81 $this->storage_real_path = $this->storage_in_path_liunx; 82 break; 83 case 2: 84 $this->storage_real_path = $this->storage_in_path_windows; 85 break; 86 default: 87 return; 88 } 89 return $this->storage_real_path; 90 } 91 92 93 public function actionRunpro(){ 94 //拿到文件列表並進行數據處理拼接 95 $xmlFlieArray = $this->_getXmlList($this->storage_real_path); 96 97 //取得前2000位數組為了避免cpu爆炸 98 $xmlFlieArray = array_slice($xmlFlieArray,0,2000); 99 100 101 //初始化處理后的數據 102 $makeFlieArry = array(); 103 if(!empty($xmlFlieArray)){ 104 foreach ($xmlFlieArray as $k => $v){ 105 $xml = simplexml_load_file($this->storage_real_path.$v,'SimpleXMLElement', LIBXML_NOCDATA); 106 $jsonStr = json_encode($xml); 107 $jsonArray = json_decode($jsonStr,true); 108 $jsonArray['InventoryReturn']['filename'] = $v; 109 array_push($makeFlieArry,$jsonArray['InventoryReturn']); 110 } 111 }else{ 112 echo "沒有可刪除的xml文件".date('y-m-d h:i:s'); 113 exit(); 114 } 115 116 117 foreach ($makeFlieArry as $k => $v){ 118 if($v['returnStatus'] == "CIQ101"){ 119 unset($makeFlieArry[$k]); 120 } 121 } 122 123 foreach ($makeFlieArry as $k => $v){ 124 //如果不是一個文件的情況 125 if(isset($makeFlieArry[$k]) && isset($makeFlieArry[$k+1])){ 126 if($makeFlieArry[$k]['copNo'] == $makeFlieArry[$k+1]['copNo']){ 127 //開啟邏輯判斷優先級 128 if(($makeFlieArry[$k+1]['returnStatus'] > $makeFlieArry[$k]['returnStatus']) || ($makeFlieArry[$k+1]['returnStatus'] < '0' && $makeFlieArry[$k]['returnStatus'] == '2')){ 129 unset($makeFlieArry[$k]); 130 }else{ 131 unset($makeFlieArry[$k+1]); 132 } 133 134 } 135 } 136 137 } 138 139 $new_array = $makeFlieArry; 140 141 142 foreach ($new_array as $k => $v){ 143 if($v['returnStatus'] > '0'){ 144 $new_array[$k]['selfcusflag'] = $this->returnStatus[$v['returnStatus']]['field']; 145 }else{ 146 $new_array[$k]['selfcusflag'] = '01'; 147 } 148 } 149 150 //更新數據庫並刪除文件 151 if(!empty($new_array)){ 152 153 $array_count = count($new_array); 154 155 if($this->pro > $array_count && $array_count>0){ 156 //當文件小於進程數 可以直接用單進程跑任務 157 158 try{ 159 Yii::app()->db->createCommand( " SET AUTOCOMMIT=0; BEGIN WORK; " )->execute(); 160 161 foreach ($new_array as $k =>$v){ 162 $filed = array(); 163 $sql_item_select = " /*master*/SELECT `custemstates` FROM t_rocord_head WHERE `copno`='".$v['copNo']."';"; 164 $item = yii::app()->db->createCommand($sql_item_select)->queryRow(); 165 166 if($v['returnStatus'] > $item['custemstates']){ 167 if(isset($v['invtNo'])) array_push($filed," `invtno` = '".$v['invtNo']."' "); //清單編號 168 if(isset($v['returnStatus'])) array_push($filed," `custemstates` = '".$v['returnStatus']."' "); //海關回執狀態編碼 169 if(isset($v['selfcusflag'])) array_push($filed," `selfcusflag` = '".$v['selfcusflag']."' "); //海關回執狀態編碼 170 if(isset($v['returnTime'])) array_push($filed," `custemreturntime` = '".$v['returnTime']."' "); //海關回執時間 171 if(isset($v['returnInfo'])) array_push($filed," `custemmessage` = '".$v['returnInfo']."' "); //海關回執消息 172 array_push($filed," `sendflag` = '0' "); //是否發送給客戶 173 array_push($filed," `updatedate` = '".date('Y-m-d h:i:s')."' "); //更新時間 174 array_push($filed," `updateuser` = 'system' "); //更新時間 175 176 $sql_update = " /*master*/UPDATE t_rocord_head SET ".implode(",",$filed)." WHERE `copno` = '".$v['copNo']."' and (`delflag` <> '1' OR `delflag` is null);"; 177 178 yii::app()->db->createCommand($sql_update)->execute(); 179 180 181 }else if($v['returnStatus'] < '0' && $item['custemstates'] == '2'){ 182 183 if(isset($v['invtNo'])) array_push($filed," `invtno` = '".$v['invtNo']."' "); //清單編號 184 if(isset($v['returnStatus'])) array_push($filed," `custemstates` = '".$v['returnStatus']."' "); //海關回執狀態編碼 185 if(isset($v['selfcusflag'])) array_push($filed," `selfcusflag` = '".$v['selfcusflag']."' "); //海關回執狀態編碼 186 if(isset($v['returnTime'])) array_push($filed," `custemreturntime` = '".$v['returnTime']."' "); //海關回執時間 187 if(isset($v['returnInfo'])) array_push($filed," `custemmessage` = '".$v['returnInfo']."' "); //海關回執消息 188 array_push($filed," `sendflag` = '0' "); //是否發送給客戶 189 array_push($filed," `updatedate` = '".date('Y-m-d h:i:s')."' "); //更新時間 190 array_push($filed," `updateuser` = 'system' "); //更新時間 191 192 $sql_update = " /*master*/UPDATE t_rocord_head SET ".implode(",",$filed)." WHERE `copno` = '".$v['copNo']."' and (`delflag` <> '1' OR `delflag` is null);"; 193 yii::app()->db->createCommand($sql_update)->execute(); 194 195 } 196 } 197 198 yii::app()->db->createCommand( "COMMIT WORK;" )->execute(); 199 200 }catch (Exception $e){ 201 echo "更新數據庫並刪除文件步驟錯誤".date('y-m-d h:i:s'); 202 exit(); 203 } 204 205 206 207 foreach ($xmlFlieArray as $k => $v){ 208 unlink($this->storage_real_path.$v); 209 } 210 211 echo "成功刪除文件並且更新數據庫!".date('y-m-d h:i:s'); 212 exit(); 213 }else{ 214 //開啟多進程模式 215 //檢測pcntl_fork擴展是否開啟了 216 if (!function_exists('pcntl_fork')) { 217 die("pcntl_fork not existing"); 218 } 219 220 if (!function_exists('exec')) { 221 die("exec not existing"); 222 } 223 224 //當前主進程號 225 $masterpid = posix_getpid(); 226 //創建管道 227 $this->sPipePath = $this->piRealPath."my_pipe.".$masterpid; 228 229 if (!posix_mkfifo($this->sPipePath, 0666)) { 230 die("create pipe {$this->sPipePath} error"); 231 } 232 //計算進程啟動數量以及每個進程執行的文件操作 233 $fl_array = array(); 234 235 //將數據從新轉化 236 $new_array_list = array(); 237 foreach ($new_array as $k => $v){ 238 array_push($new_array_list,$v); 239 } 240 241 //每個進程的文件操作數量 242 $fl = floor($array_count/$this->pro)+1; 243 //第一步循環最大進程數 244 $c = 0; 245 246 for ($i=0;$i<$this->pro;$i++){ 247 //第二步每個進程的文件操作數量 248 for ($j=0;$j<$fl;$j++){ 249 if(isset($new_array_list[$c])){ 250 $fl_array[$i][] = $new_array_list[$c]; 251 } 252 $c++; 253 } 254 } 255 256 foreach ($fl_array as $k =>$v){ 257 $nPID = pcntl_fork(); // 創建子進程 258 259 if($nPID == 0){ 260 try{ 261 Yii::app()->db->createCommand( " SET AUTOCOMMIT=0; BEGIN WORK; " )->execute(); 262 foreach ($fl_array[$k] as $k2=>$v2){ 263 //處理業務邏輯 264 $filed = array(); 265 266 $sql_item_select = " /*master*/SELECT `custemstates` FROM t_rocord_head WHERE `copno`='".$v2['copNo']."';"; 267 $item = yii::app()->db->createCommand($sql_item_select)->queryRow(); 268 269 270 271 if($v2['returnStatus'] > $item['custemstates']){ 272 273 if(isset($v2['invtNo'])) array_push($filed," `invtno` = '".$v2['invtNo']."' "); //清單編號 274 if(isset($v2['returnStatus'])) array_push($filed," `custemstates` = '".$v2['returnStatus']."' "); //海關回執狀態編碼 275 if(isset($v2['selfcusflag'])) array_push($filed," `selfcusflag` = '".$v2['selfcusflag']."' "); //海關回執狀態編碼 276 if(isset($v2['returnTime'])) array_push($filed," `custemreturntime` = '".$v2['returnTime']."' "); //海關回執時間 277 if(isset($v2['returnInfo'])) array_push($filed," `custemmessage` = '".$v2['returnInfo']."' "); //海關回執消息 278 array_push($filed," `sendflag` = '0' "); //是否發送給客戶 279 array_push($filed," `updatedate` = '".date('Y-m-d h:i:s')."' "); //更新時間 280 array_push($filed," `updateuser` = 'system' "); //更新時間 281 282 $sql_update = " /*master*/UPDATE t_rocord_head SET ".implode(",",$filed)." WHERE `copno` = '".$v2['copNo']."' and (`delflag` <> '1' OR `delflag` is null)"; 283 284 285 286 yii::app()->db->createCommand($sql_update)->execute(); 287 288 }else if($v2['returnStatus'] < '0' && $item['custemstates'] == '2'){ 289 290 if(isset($v2['invtNo'])) array_push($filed," `invtno` = '".$v2['invtNo']."' "); //清單編號 291 if(isset($v2['returnStatus'])) array_push($filed," `custemstates` = '".$v2['returnStatus']."' "); //海關回執狀態編碼 292 if(isset($v2['selfcusflag'])) array_push($filed," `selfcusflag` = '".$v2['selfcusflag']."' "); //海關回執狀態編碼 293 if(isset($v2['returnTime'])) array_push($filed," `custemreturntime` = '".$v2['returnTime']."' "); //海關回執時間 294 if(isset($v2['returnInfo'])) array_push($filed," `custemmessage` = '".$v2['returnInfo']."' "); //海關回執消息 295 array_push($filed," `sendflag` = '0' "); //是否發送給客戶 296 array_push($filed," `updatedate` = '".date('Y-m-d h:i:s')."' "); //更新時間 297 array_push($filed," `updateuser` = 'system' "); //更新時間 298 299 $sql_update = " /*master*/UPDATE t_rocord_head SET ".implode(",",$filed)." WHERE `copno` = '".$v2['copNo']."' and (`delflag` <> '1' OR `delflag` is null);"; 300 yii::app()->db->createCommand($sql_update)->execute(); 301 302 303 } 304 } 305 306 yii::app()->db->createCommand( "COMMIT WORK;" )->execute(); 307 }catch (Exception $e){ 308 echo "子進程錯誤"; 309 exec("kill -9 ".$masterpid.""); 310 exit(); 311 } 312 313 $oW = fopen($this->sPipePath, 'w'); 314 fwrite($oW, $k."\n"); // 當前任務處理完比,在管道中寫入數據 315 fclose($oW); 316 exit(0); // 執行完后退出 317 } 318 } 319 320 foreach ($xmlFlieArray as $k => $v){ 321 unlink($this->storage_real_path.$v); 322 } 323 // 父進程 324 $oR = fopen($this->sPipePath, 'r'); 325 stream_set_blocking($oR, false); // 將管道設置為非堵塞,用於適應超時機制 326 $sData = ''; // 存放管道中的數據 327 $nLine = 0; 328 $nStart = time(); 329 while ($nLine < count($fl_array) && (time() - $nStart) < $this->timeout) { 330 $sLine = fread($oR, 1024); 331 if (empty($sLine)) { 332 continue; 333 } 334 335 //echo "current line: {$sLine}\n"; 336 // 用於分析多少任務處理完畢,通過‘\n’標識 337 foreach(str_split($sLine) as $c) { 338 if ("\n" == $c) { 339 ++$nLine; 340 } 341 } 342 $sData .= $sLine; 343 } 344 //echo "Final line count:$nLine\n"; 345 fclose($oR); 346 unlink($this->sPipePath); // 刪除管道,已經沒有作用了 347 348 // 等待子進程執行完畢,避免僵屍進程 349 $n = 0; 350 while ($n < count($fl_array)) { 351 $nStatus = -1; 352 $nPID = pcntl_wait($nStatus, WNOHANG); 353 if ($nPID > 0) { 354 //echo "{$nPID} exit\n"; 355 ++$n; 356 } 357 } 358 359 // 驗證結果,主要查看結果中是否每個任務都完成了 360 $arr2 = array(); 361 foreach(explode("\n", $sData) as $i) {// trim all 362 if (is_numeric(trim($i))) { 363 array_push($arr2, $i); 364 } 365 } 366 $arr2 = array_unique($arr2); 367 368 if ( count($arr2) == count($fl_array)) { 369 370 echo "ok".date('Y-m-d h:i:s'); 371 exit(); 372 } else { 373 echo "error count " . count($arr2) . date('Y-m-d h:i:s')."\n"; 374 exit(); 375 } 376 377 } 378 379 }else{ 380 echo "更新數據庫並刪除文件步驟錯誤".date('y-m-d h:i:s'); 381 exit(); 382 } 383 } 384 385 //獲取目標文件夾內文件 386 private function _getXmlList($path){ 387 $files = scandir($path); 388 $result = []; 389 foreach ($files as $file) { 390 if ($file != '.' && $file != '..') { 391 //判斷是否為xml文件 392 if(pathinfo($file)['extension'] == "xml"){ 393 if (is_dir($path . '/' . $file)) { 394 scanFile($path . '/' . $file); 395 } else { 396 $result[] = basename($file); 397 } 398 } 399 400 } 401 } 402 403 return $result; 404 } 405 }