最外層start.php,設置全局啟動模式,加載Application里的個子服務目錄下應用的啟動文件(start開頭,這些文件都是workman\work類的子類,在載入文件的同時,這些子服務會生成對象,work類的構造方法會把生成的work對象都存入work類的靜態變量$_workers,方便主文件后續設置以及啟動子服務,全局啟動模式子服務是不會啟動的),設置時區,注冊自動加載,調用workman\Worker:runall啟動所有服務。
主文件:
// 標記是全局啟動 define('GLOBAL_START', 1); require_once __DIR__ . '/Workerman/Autoloader.php'; // 加載所有Applications/*/start.php,以便啟動所有服務 foreach(glob(__DIR__.'/Applications/*/start*.php') as $start_file) { require_once $start_file; } // 運行所有服務 Worker::runAll();
子服務文件start_gateway,提供接入請求的服務(class Gateway extends Worker):
// gateway 進程,這里使用Text協議,可以用telnet測試 $gateway = new Gateway("Text://0.0.0.0:8282"); // gateway名稱,status方便查看 $gateway->name = 'YourAppGateway'; // gateway進程數 $gateway->count = 4; // 本機ip,分布式部署時使用內網ip $gateway->lanIp = '127.0.0.1'; // 內部通訊起始端口,假如$gateway->count=4,起始端口為4000 // 則一般會使用4001 4002 4003 4004 4個端口作為內部通訊端口 $gateway->startPort = 2300; // 心跳間隔 //$gateway->pingInterval = 10; // 心跳數據 //$gateway->pingData = '{"type":"ping"}'; /* // 當客戶端連接上來時,設置連接的onWebSocketConnect,即在websocket握手時的回調 $gateway->onConnect = function($connection) { $connection->onWebSocketConnect = function($connection , $http_header) { // 可以在這里判斷連接來源是否合法,不合法就關掉連接 // $_SERVER['HTTP_ORIGIN']標識來自哪個站點的頁面發起的websocket鏈接 if($_SERVER['HTTP_ORIGIN'] != 'http://kedou.workerman.net') { $connection->close(); } // onWebSocketConnect 里面$_GET $_SERVER是可用的 // var_dump($_GET, $_SERVER); }; }; */ // 如果不是在根目錄啟動,則運行runAll方法 if(!defined('GLOBAL_START')) { Worker::runAll(); }
接入服務除了注冊到全局靜態$_workers變量,還設置了路由:
public function __construct($socket_name, $context_option = array()) { parent::__construct($socket_name, $context_option); //隨機返回一個bussness的連接 $this->router = array("\\GatewayWorker\\Gateway", 'routerRand'); $backrace = debug_backtrace(); $this->_appInitPath = dirname($backrace[0]['file']); }
子服務文件start_bussinessworker,提供實際的業務處理,和gateway服務內部通訊類似nginx與php(class BusinessWorker extends Worker):
// bussinessWorker 進程 $worker = new BusinessWorker(); // worker名稱 $worker->name = 'YourAppBusinessWorker'; // bussinessWorker進程數量 $worker->count = 4; // 如果不是在根目錄啟動,則運行runAll方法 if(!defined('GLOBAL_START')) { Worker::runAll(); }
附帶基類work的構造函數:
/** * worker構造函數 * * @param string $socket_name * @param array $context_option */ public function __construct($socket_name = '', $context_option = array()) { // 保存worker實例 $this->workerId = spl_object_hash($this); self::$_workers[$this->workerId] = $this; self::$_pidMap[$this->workerId] = array(); // 獲得實例化文件路徑,用於自動加載設置根目錄 $backrace = debug_backtrace(); $this->_appInitPath = dirname($backrace[0]['file']); // 設置socket上下文 if($socket_name) { $this->_socketName = $socket_name; if(!isset($context_option['socket']['backlog'])) { $context_option['socket']['backlog'] = self::DEFAUL_BACKLOG; } $this->_context = stream_context_create($context_option); } }
這里可以看到self::$_workers[$this->workerId] = $this;記錄全局worker實例,
self::$_pidMap這個用來記錄各個子服務開始fork后的所有子進程id
$context_option['socket']['backlog'] = self::DEFAUL_BACKLOG;設置嵌套字上下文里的未accept隊列長度
在后面運行實例的listen方法監聽的時候會傳遞到stream_socket_server方法里。
runAll的流程如下:
/** * 運行所有worker實例 * @return void */ public static function runAll() { // 初始化環境變量(pid文件,log文件,status文件,定時器信號回調) self::init(); // 解析命令(運行,重啟,停止,重載,狀態,從命令判斷主進程是否以守護進程啟動) // 啟動之后通過php start.php XXX命令會到這里!因為第一步設置了這個文件的pid(這里可以看到pid對應到文件位置的重要性),所以后面的命令會對應為發送信號 self::parseCommand(); // 嘗試以守護進程模式運行(fork兩次進程,重置進程sid) self::daemonize(); // 初始化所有worker實例,主要是監聽端口(記錄所有子服務worker實例的最長名稱name長度,最長嵌套字名socket_name長度,最長運行用戶名user長度; // 所有定義了協議的子服務(Gate)開始監聽也就是啟動服務,子服務實例監聽嵌套字對象mainSoctke,還沒有注冊accept回調) // 到這里所有Gate子服務啟動監聽,但是沒有accept self::initWorkers(); // 初始化所有信號處理函數(為主進程注冊stop,stats,reload的信號回調signalHandler) self::installSignal(); // 保存主進程pid(將獲取daemonize方法后的新的主進程sid,存入init方法后的pid文件) self::saveMasterPid(); // 創建子進程(worker進程)並運行(主進程通過self::$_pidMap用來記錄子服務進程創建的各自進程號,方便后面發送信號,
// 生成的子進程置空主進程的全局變量,self::$_pidMap,self::$_workers,如果在子服務文件里定義了self::$stdoutFile文件地址,
// 會重定向子服務子進程的stdout和stderr,直接運行work實例的run方法)
// 到這里子服務已經在子進程中運行,后面的代碼就只有主服務執行
// 子進程的run方法會通過libevent綁定子服務mainSocket的accept回調,在accept回調方法里才有定義后面怎么處理請求socket
// 子進程的run方法會通過libevent綁定重新綁定信號量,以及用libevent來注入定時器
// 子進程的run方法會回調用戶在子服務文件里的onWorkStar方法
// 子進程進入事件監聽輪詢
// 上面是基類中的run方法,基於gateway的子服務,會實現自己的onworkStar方法,然后在調用基類的run,這樣可以在onWorkStar里實現gate與worker的連接
// 這里不知道怎么處理子進程對accpet時候的驚群 self::forkWorkers(); // 展示啟動界面(打印所有啟動的子服務的信息,由於initWorkers獲取了各個子服務實例的名稱等信息長度可以很好的格式化展示) self::displayUI(); // 嘗試重定向標准輸入輸出(重定向主服務進程) self::resetStd(); // 監控所有子進程(worker進程)(處理主進程的信號量;通過pcntl_wait循環監聽子進程狀態,保持子進程的運行) /* 什么是平滑重啟? 平滑重啟不同於普通的重啟,平滑重啟可以做到在不影響用戶的情況下重啟服務,以便重新載入PHP程序,完成業務代碼更新。 平滑重啟一般應用於業務更新或者版本發布過程中,能夠避免因為代碼發布重啟服務導致的暫時性服務不可用的影響。 注意:只有在on{...}回調中載入的文件平滑重啟后才會自動更新,啟動腳本中直接載入的文件或者寫死的代碼運行reload不會自動更新。 平滑重啟原理 WorkerMan分為主進程和子進程,主進程負責監控子進程,子進程負責接收客戶端的連接和連接上發來的請求數據, 做相應的處理並返回數據給客戶端。當業務代碼更新時,其實我們只要更新子進程,便可以達到更新代碼的目的。 當WorkerMan主進程收到平滑重啟信號時,主進程會向其中一個子進程發送安全退出(讓對應進程處理完畢當前請求后才退出)信號, 當這個進程退出后,主進程會重新創建一個新的子進程(這個子進程載入了新的PHP代碼),然后主進程再次向另外一個舊的進程發送停止 命令,這樣一個進程一個進程的重啟,直到所有舊的進程全部被置換為止。 我們看到平滑重啟實際上是讓舊的業務進程逐個退出然后並逐個創建新的進程做到的。為了在平滑重啟時不影響客用戶,這就要求進程中不 要保存用戶相關的狀態信息,即業務進程最好是無狀態的,避免由於進程退出導致信息丟失。 */ //上面是官網對平滑啟動的說明,設計的代碼就是reload方法的 $one_worker_pid = current(self::$_pidsToRestart );這里是處理主進程的平滑啟動信號的 // 在主進程里獲取所有設置為可以平滑啟動的子進程的pid,然后取一個發送平滑啟動信號信號,這個信號到子進程,其實子進程會通過stopAll方法停止運行 // exit(0); // 主進程監聽到子進程退出,然后重新生成一個新的子進程,然后把這個子進程的id從self::$_pidsToRestart里刪除,然后再次調用reload方法去殺掉下一個子進程 // self::monitorWorkers(); }
主要的過程已經描述清楚了,主服務在主進程里,子服務開啟監聽之后,主服務開始fork,然后記錄子服務進程的對應的pid,然后通過信號量來處理用戶命令以及管理子服務進程。子服務在子進程里實現accpet監聽回調。
work基類的主要代碼片段:
子服務listen: // 獲得應用層通訊協議以及監聽的地址,udp會轉換為傳輸協議 list($scheme, $address) = explode(':', $this->_socketName, 2); // 如果有指定應用層協議,則檢查對應的協議類是否存在 if($scheme != 'tcp' && $scheme != 'udp') { $scheme = ucfirst($scheme); $this->_protocol = '\\Protocols\\'.$scheme; if(!class_exists($this->_protocol)) { $this->_protocol = "\\Workerman\\Protocols\\$scheme"; if(!class_exists($this->_protocol)) { throw new Exception("class \\Protocols\\$scheme not exist"); } } } elseif($scheme === 'udp') { $this->transport = 'udp'; } // flag $flags = $this->transport === 'udp' ? STREAM_SERVER_BIND : STREAM_SERVER_BIND | STREAM_SERVER_LISTEN; $errno = 0; $errmsg = ''; $this->_mainSocket = stream_socket_server($this->transport.":".$address, $errno, $errmsg, $flags, $this->_context); if(!$this->_mainSocket) { throw new Exception($errmsg); }
創建子進程: // 創建子進程 while(count(self::$_pidMap[$worker->workerId]) < $worker->count) { static::forkOneWorker($worker); } } } /** * 創建一個子進程 * @param Worker $worker * @throws Exception */ protected static function forkOneWorker($worker) { $pid = pcntl_fork(); // 主進程記錄子進程pid if($pid > 0) { self::$_pidMap[$worker->workerId][$pid] = $pid; } // 子進程運行 elseif(0 === $pid) { // 啟動過程中嘗試重定向標准輸出 if(self::$_status === self::STATUS_STARTING) { self::resetStd(); } self::$_pidMap = array(); self::$_workers = array($worker->workerId => $worker); Timer::delAll(); self::setProcessTitle('WorkerMan: worker process ' . $worker->name . ' ' . $worker->getSocketName()); self::setProcessUser($worker->user); $worker->run(); exit(250); } else { throw new Exception("forkOneWorker fail"); } }
子進程執行的基類run方法: /** * 運行worker實例 */ public function run() { // 注冊進程退出回調,用來檢查是否有錯誤 register_shutdown_function(array("\\Workerman\\Worker", 'checkErrors')); // 設置自動加載根目錄 Autoloader::setRootPath($this->_appInitPath); // 如果沒有全局事件輪詢,則創建一個 if(!self::$globalEvent) { if(extension_loaded('libevent')) { self::$globalEvent = new Libevent(); } else { self::$globalEvent = new Select(); } // 監聽_mainSocket上的可讀事件(客戶端連接事件)也只有Gate才有這個事件 if($this->_socketName) { if($this->transport !== 'udp') { self::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ, array($this, 'acceptConnection')); } else { self::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ, array($this, 'acceptUdpConnection')); } } } // 重新安裝事件處理函數,使用全局事件輪詢監聽信號事件 self::reinstallSignal(); // 用全局事件輪詢初始化定時器 Timer::init(self::$globalEvent); // 如果有設置進程啟動回調,則執行 if($this->onWorkerStart) { call_user_func($this->onWorkerStart, $this); } // 子進程主循環 self::$globalEvent->loop(); }
主服務監聽子服務進程: pcntl_signal_dispatch(); // 掛起進程,直到有子進程退出或者被信號打斷 $status = 0; $pid = pcntl_wait($status, WUNTRACED); // 如果有信號到來,嘗試觸發信號處理函數 pcntl_signal_dispatch(); // 有子進程退出 if($pid > 0) { // 查找是哪個進程組的,然后再啟動新的進程補上 foreach(self::$_pidMap as $worker_id => $worker_pid_array) { if(isset($worker_pid_array[$pid])) { $worker = self::$_workers[$worker_id]; // 檢查退出狀態 if($status !== 0) { self::log("worker[".$worker->name.":$pid] exit with status $status"); } // 統計,運行status命令時使用 if(!isset(self::$_globalStatistics['worker_exit_info'][$worker_id][$status])) { self::$_globalStatistics['worker_exit_info'][$worker_id][$status] = 0; } self::$_globalStatistics['worker_exit_info'][$worker_id][$status]++; // 清除子進程信息 unset(self::$_pidMap[$worker_id][$pid]); break; } } // 如果不是關閉狀態,則補充新的進程 if(self::$_status !== self::STATUS_SHUTDOWN) { self::forkWorkers(); // 如果該進程是因為運行reload命令退出,則繼續執行reload流程 if(isset(self::$_pidsToRestart[$pid])) { unset(self::$_pidsToRestart[$pid]); self::reload(); } }
平滑啟動過程: /** * 執行平滑重啟流程 * @return void */ protected static function reload() { // 主進程部分 if(self::$_masterPid === posix_getpid()) { // 設置為平滑重啟狀態 if(self::$_status !== self::STATUS_RELOADING && self::$_status !== self::STATUS_SHUTDOWN) { self::log("Workerman[".basename(self::$_startFile)."] reloading"); self::$_status = self::STATUS_RELOADING; } // 如果有worker設置了reloadable=false,則過濾掉 $reloadable_pid_array = array(); foreach(self::$_pidMap as $worker_id =>$worker_pid_array) { $worker = self::$_workers[$worker_id]; if($worker->reloadable) { foreach($worker_pid_array as $pid) { $reloadable_pid_array[$pid] = $pid; } } } // 得到所有可以重啟的進程 self::$_pidsToRestart = array_intersect(self::$_pidsToRestart , $reloadable_pid_array); // 平滑重啟完畢 if(empty(self::$_pidsToRestart)) { if(self::$_status !== self::STATUS_SHUTDOWN) { self::$_status = self::STATUS_RUNNING; } return; } // 繼續執行平滑重啟流程 $one_worker_pid = current(self::$_pidsToRestart ); // 給子進程發送平滑重啟信號 posix_kill($one_worker_pid, SIGUSR1); // 定時器,如果子進程在KILL_WORKER_TIMER_TIME秒后沒有退出,則強行殺死 Timer::add(self::KILL_WORKER_TIMER_TIME, 'posix_kill', array($one_worker_pid, SIGKILL), false); } // 子進程部分 else { // 如果當前worker的reloadable屬性為真,則執行退出 $worker = current(self::$_workers); if($worker->reloadable) { self::stopAll(); } } }
到這里主進程已經准備好了,子進程(Gate)已經開始監聽了(還未講gate與worker的連接通信,以及gate怎么接受請求,然后worker怎么處理請求)。上面說了gate與worker的連接是在OnWorkStart里實現的。后面就來看看
gate的run方法里保存了用戶自定義的方法,然后自己的onWorkStart,onConnect,onMessage,onClose,onWorkstop都已定義好
/** * 運行 * @see Workerman.Worker::run() */ public function run() { // 保存用戶的回調,當對應的事件發生時觸發 $this->_onWorkerStart = $this->onWorkerStart; $this->onWorkerStart = array($this, 'onWorkerStart'); // 保存用戶的回調,當對應的事件發生時觸發 $this->_onConnect = $this->onConnect; $this->onConnect = array($this, 'onClientConnect'); // onMessage禁止用戶設置回調 $this->onMessage = array($this, 'onClientMessage'); // 保存用戶的回調,當對應的事件發生時觸發 $this->_onClose = $this->onClose; $this->onClose = array($this, 'onClientClose'); // 保存用戶的回調,當對應的事件發生時觸發 $this->_onWorkerStop = $this->onWorkerStop; $this->onWorkerStop = array($this, 'onWorkerStop'); // 記錄進程啟動的時間 $this->_startTime = time(); // 運行父方法 parent::run(); }
在看看他的OnworkStart方法,也就是子進程運行后執行的方法
/** * 當Gateway啟動的時候觸發的回調函數 * @return void */ public function onWorkerStart() { // 分配一個內部通訊端口 // 主進程pid-子進程pid+startPort保證每個子進程的內部端口不同 $this->lanPort = function_exists('posix_getppid') ? $this->startPort - posix_getppid() + posix_getpid() : $this->startPort; if($this->lanPort<0 || $this->lanPort >=65535) { $this->lanPort = rand($this->startPort, 65535); } // 如果有設置心跳,則定時執行 if($this->pingInterval > 0) { $timer_interval = $this->pingNotResponseLimit > 0 ? $this->pingInterval/2 : $this->pingInterval; Timer::add($timer_interval, array($this, 'ping')); } //別名內部通訊協議 if(!class_exists('\Protocols\GatewayProtocol')) { class_alias('\GatewayWorker\Protocols\GatewayProtocol', 'Protocols\GatewayProtocol'); } // 初始化gateway內部的監聽,用於監聽worker的連接已經連接上發來的數據
// 這里內部鏈接在同一個ip+端口的情況下有兩個服務
// 這個時候listen由於全局的事件self::$globalEvent在子進程的run方法里在回調OnWorkStart之前已經定義,所以不像主進程一樣在listen的不監聽accept事件 $this->_innerTcpWorker = new Worker("GatewayProtocol://{$this->lanIp}:{$this->lanPort}"); $this->_innerTcpWorker->listen(); $this->_innerUdpWorker = new Worker("GatewayProtocol://{$this->lanIp}:{$this->lanPort}"); $this->_innerUdpWorker->transport = 'udp'; $this->_innerUdpWorker->listen(); // 重新設置自動加載根目錄 Autoloader::setRootPath($this->_appInitPath); // 設置內部監聽的相關回調 $this->_innerTcpWorker->onMessage = array($this, 'onWorkerMessage'); $this->_innerUdpWorker->onMessage = array($this, 'onWorkerMessage'); $this->_innerTcpWorker->onConnect = array($this, 'onWorkerConnect'); $this->_innerTcpWorker->onClose = array($this, 'onWorkerClose'); // 注冊gateway的內部通訊地址,worker去連這個地址,以便gateway與worker之間建立起TCP長連接 if(!$this->registerAddress()) { $this->log('registerAddress fail and exit'); Worker::stopAll(); } if($this->_onWorkerStart) { call_user_func($this->_onWorkerStart, $this); } }
可以看到他有新建了兩個個work實例,innerTcpWorker,innerUdpWorker,並且在同一個地址{$this->lanIp}:{$this->lanPort}下開啟了兩個服務,並且注冊了與worker通信的回調事件onWorkerConnect,onWorkerMessage,onWorkerClose。使用的協議是內部通訊協議GatewayProtocol。registerAddress方法通過文件鎖把這個子進程的內部通訊服務地址{$this->lanIp}:{$this->lanPort}記錄到一個公共地方,可能是文件,可能是memcache,可能是redis,后兩種支持分布式部署gate與work,不然就要走同一台機器上。用后面兩種可以部署多個gate,然后其他機器部署work。這時候定義的內部服務通過listen方法已經有accept監聽事件了,如果work跟與gate連接就會進入到設置的Worker的回調方法里,客戶端與Gate連接就會進入到Client方法里,因為他們是兩種不同的work實例,監聽的不同的端口。
到這里Gate子服務已經准備好了,除了自己是work實例提供給客戶端的連接服務,被主進程管理之外;每個子Gate進程都會在新建一個work實例來提供對worker子進程的訪問服務;對客戶端的服務有new Gate的時候指定協議,對子worker進程的服務是默認協議,並且tcp與udp都監聽了。后面的步奏應該是子worker進程在workstart方法里從子Gate服務建立內部服務是注冊全局的內部通訊服務地址,連接到Gate,這樣gate的內部服務監聽就把子worker服務的地址記錄下來。
子worker進程服務通過tcp與gate進程服務通信,通過在連接上的監聽,實現消息的傳遞,worker進程在通過與Event文件的回調來通知客戶端的請求,Event處理完畢之后,通過lib/gateway文件,直接udp到Gate來實現信息的傳遞。
分布式中的每台機器有主進程管理子進程,子Gate進程處理監聽客戶度與內部Work進程,Gate記錄全局的客戶端id與Gate的對應關系到全局儲存器,也記錄自己的內部服務地址到全局存儲器。每個子Gate進程記錄連接自己的內部worker地址。然后子worker啟動時候從全局內部服務地址取地址進行tcp連接,記錄與自己連接的Gate地址,client,gate,work直接的通信就打通了,如果一個客戶端要與另一個客戶端通信,在Event處理時從全局的client與gate的對以關系里得到要發送的client連接的gate,然后給這個gate發送udp信息,再由gate轉發到對的client。
下面看看基類的accept方法:
/** * 接收一個客戶端連接 * @param resource $socket * @return void */ public function acceptConnection($socket) { // 獲得客戶端連接 $new_socket = @stream_socket_accept($socket, 0); // 驚群現象,忽略 if(false === $new_socket) { return; } // 初始化連接對象 $connection = new TcpConnection($new_socket); $this->connections[$connection->id] = $connection; $connection->worker = $this; $connection->protocol = $this->_protocol; $connection->onMessage = $this->onMessage; $connection->onClose = $this->onClose; $connection->onError = $this->onError; $connection->onBufferDrain = $this->onBufferDrain; $connection->onBufferFull = $this->onBufferFull; // 如果有設置連接回調,則執行 if($this->onConnect) { try { call_user_func($this->onConnect, $connection); } catch(Exception $e) { ConnectionInterface::$statistics['throw_exception']++; self::log($e); } } } /** * 處理udp連接(udp其實是無連接的,這里為保證和tcp連接接口一致) * * @param resource $socket * @return bool */ public function acceptUdpConnection($socket) { $recv_buffer = stream_socket_recvfrom($socket , self::MAX_UDP_PACKEG_SIZE, 0, $remote_address); if(false === $recv_buffer || empty($remote_address)) { return false; } // 模擬一個連接對象 $connection = new UdpConnection($socket, $remote_address); if($this->onMessage) { if($this->_protocol) { /** @var \Workerman\Protocols\ProtocolInterface $parser */ $parser = $this->_protocol; $recv_buffer = $parser::decode($recv_buffer, $connection); } ConnectionInterface::$statistics['total_request']++; try { call_user_func($this->onMessage, $connection, $recv_buffer); } catch(Exception $e) { ConnectionInterface::$statistics['throw_exception']++; } } }
整體上來說就是tcp就是新建一個客戶端連接,然后使用tcpConnecttion類封裝,包括通信協議,然后回調onConnect事件;udp直接從連接獲取數據,然后通過協議解析數據,回調onMessage方法。
gate的內部服務建立好了,再看看business子服務的run方法:
/** * 運行 * @see Workerman.Worker::run() */ public function run() { $this->_onWorkerStart = $this->onWorkerStart; $this->onWorkerStart = array($this, 'onWorkerStart'); parent::run(); } /** * 當進程啟動時一些初始化工作 * @return void */ protected function onWorkerStart() { if(!class_exists('\Protocols\GatewayProtocol')) { class_alias('\GatewayWorker\Protocols\GatewayProtocol', 'Protocols\GatewayProtocol'); } Timer::add(1, array($this, 'checkGatewayConnections')); $this->checkGatewayConnections(); \GatewayWorker\Lib\Gateway::setBusinessWorker($this); if($this->_onWorkerStart) { call_user_func($this->_onWorkerStart, $this); } }
這里business子服務直接就是循環連接所有的Gate了。
連接打通之后,怎么通信呢,這就要看事件驅動管理以及協議了。