前文提到的在系統設置Cache組件 Cache::getInstance()的時候,會去調用processManager去創建Cache的進程,然后以管道通信的方式進行設置緩存和獲取緩存。
Cache是以單例模式實現的。構造器會進行如下操作
//根據配置創建指定數目的Cache服務進程,然后啟動。 $num = intval(Config::getInstance()->getConf("EASY_CACHE.PROCESS_NUM"));//默認配置數目是1,在Config.php里'EASY_CACHE.PROCESS_NUM'=>1 if($num <= 0){ return; } $this->cliTemp = new SplArray();//這個數組以后會給單元測試時候單獨使用,正常模式這個數組是不使用的 //若是在主服務創建,而非單元測試調用 if(ServerManager::getInstance()->getServer()){ //創建了一個swoole_table ,表名為__Cache,里面存儲data(后面就講到其實這里存儲的是操作Cache的指令)作用是用來做GC(防止Cache被撐爆) TableManager::getInstance()->add(self::EXCHANGE_TABLE_NAME,[ 'data'=>[ 'type'=>Table::TYPE_STRING, 'size'=>10*1024 ], 'microTime'=>[ 'type'=>Table::TYPE_STRING, 'size'=>15 ] ],2048); $this->processNum = $num; for ($i=0;$i < $num;$i++){ ProcessManager::getInstance()->addProcess($this->generateProcessName($i),CacheProcess::class); } }
ProcessManager::getInstance()->addProcess($this->generateProcessName($i),CacheProcess::class)這句話才是Cache的核心邏輯。
ProcessManager::getInstance()這句話主要做了下面的操作
ProcessManager 的__construct構造函數創建了一個swoole_table,表名是process_hash_map
TableManager::getInstance()->add( 'process_hash_map',[ 'pid'=>[ 'type'=>Table::TYPE_INT, 'size'=>10 ] ],256 );
addProcess($this->generateProcessName($i),CacheProcess::class);
$this->generateProcessName($i)這個代碼很簡單就是根據$i來設置進程名稱
addProcess 是在processList存儲CacheProcess::class的實例,具體代碼如下
$key = md5($processName); if(!isset($this->processList[$key])){ try{ $process = new $processClass($processName,$args,$async); $this->processList[$key] = $process; return true; }catch (\Throwable $throwable){ Trigger::throwable($throwable); return false; } }else{ trigger_error("you can not add the same name process : {$processName}.{$processClass}"); return false; }
那么CacheProcess::class的實例話做了什么操作呢
$this->cacheData = new SplArray();//這里很關鍵,為什么這么說每個Cache進程實際保存的緩存值都是在這里的,每個Cache進程都有自己的一個cacheData數組
$this->persistentTime = Config::getInstance()->getConf('EASY_CACHE.PERSISTENT_TIME');
parent::__construct($processName, $args);
CacheProcess::class繼承於AbstractProcess
AbstractProcess的構造方法
$this->async = $async; $this->args = $args; $this->processName = $processName; $this->swooleProcess = new \swoole_process([$this,'__start'],false,2); ServerManager::getInstance()->getServer()->addProcess($this->swooleProcess);//然后swoole服務會addProcess一個Cache的任務進程。
__start方法主要是給swoole_table,表名為process_hash_map插入當前CacheProcess的進程名為key,進程IDpid為value。並且注冊進程退出的事件。
if(PHP_OS != 'Darwin'){ $process->name($this->getProcessName()); } TableManager::getInstance()->get('process_hash_map')->set( md5($this->processName),['pid'=>$this->swooleProcess->pid] ); ProcessManager::getInstance()->setProcess($this->getProcessName(),$this); if (extension_loaded('pcntl')) { pcntl_async_signals(true); } Process::signal(SIGTERM,function ()use($process){ $this->onShutDown(); TableManager::getInstance()->get('process_hash_map')->del(md5($this->processName)); swoole_event_del($process->pipe); $this->swooleProcess->exit(0); }); if($this->async){ swoole_event_add($this->swooleProcess->pipe, function(){ $msg = $this->swooleProcess->read(64 * 1024); $this->onReceive($msg); }); } $this->run($this->swooleProcess);
$this->run($this->swooleProcess)這個函數是CacheProcess如果配置了persistentTime,就會開啟一個定時器定時去取$file = Config::getInstance()->getConf('TEMP_DIR')."/{$processName}.data";的數據備份,默認是0也就是不會去做定時數據落地的操作
看到這里才是Cache組件在第一次實例化的時候做的相關事情,總結就是創建了指定數量的Cache進程綁定到swoole服務器上。在全局的process_hash_map表中能找到對應的Cache進程ID。然后Cache進程是可以以管道方式來進行通信。
set緩存方法
public function set($key,$data) { if(!ServerManager::getInstance()->isStart()){ $this->cliTemp->set($key,$data); } if(ServerManager::getInstance()->getServer()){ $num = $this->keyToProcessNum($key); $msg = new Msg(); $msg->setCommand('set'); $msg->setArg('key',$key); $msg->setData($data); ProcessManager::getInstance()->getProcessByName($this->generateProcessName($num))->getProcess()->write(\swoole_serialize::pack($msg));//直接把需要緩存的數據,封裝成msg然后write給hash映射到的Cache進程 } }
當進程獲取到的時候會回調onReceive方法
public function onReceive(string $str,...$agrs) { // TODO: Implement onReceive() method. $msg = \swoole_serialize::unpack($str); $table = TableManager::getInstance()->get(Cache::EXCHANGE_TABLE_NAME); if(count($table) > 1900){ //接近閾值的時候進行gc檢測 //遍歷Table 依賴pcre 如果發現無法遍歷table,檢查機器是否安裝pcre-devel //超過0.1s 基本上99.99%為無用數據。 $time = microtime(true); foreach ($table as $key => $item){ if(round($time - $item['microTime']) > 0.1){ $table->del($key); } } } if($msg instanceof Msg){ switch ($msg->getCommand()){ case 'set':{ $this->cacheData->set($msg->getArg('key'),$msg->getData()); break; } case 'get':{ $ret = $this->cacheData->get($msg->getArg('key')); $msg->setData($ret); $table->set($msg->getToken(),[ 'data'=>\swoole_serialize::pack($msg), 'microTime'=>microtime(true) ]); break; } case 'del':{ $this->cacheData->delete($msg->getArg('key')); break; } case 'flush':{ $this->cacheData->flush(); break; } case 'enQueue':{ $que = $this->cacheData->get($msg->getArg('key')); if(!$que instanceof \SplQueue){ $que = new \SplQueue(); $this->cacheData->set($msg->getArg('key'),$que); } $que->enqueue($msg->getData()); break; } case 'deQueue':{ $que = $this->cacheData->get($msg->getArg('key')); if(!$que instanceof \SplQueue){ $que = new \SplQueue(); $this->cacheData->set($msg->getArg('key'),$que); } $ret = null; if(!$que->isEmpty()){ $ret = $que->dequeue(); } $msg->setData($ret); //deQueue 有cli 服務未啟動的請求,但無token if(!empty($msg->getToken())){ $table->set($msg->getToken(),[ 'data'=>\swoole_serialize::pack($msg), 'microTime'=>microtime(true) ]); } break; } case 'queueSize':{ $que = $this->cacheData->get($msg->getArg('key')); if(!$que instanceof \SplQueue){ $que = new \SplQueue(); } $msg->setData($que->count()); $table->set($msg->getToken(),[ 'data'=>\swoole_serialize::pack($msg), 'microTime'=>microtime(true) ]); break; } } } }
這里一開始會進行緩存GC確保內存不會撐爆
set方法會直接給$this->cacheData,設置緩存值。
get方法比較特殊,它會去給Cache進程發送get的命令,然后Cache讀取到命令會將值寫到_Cache,Swoole_table表中。然后再去讀取(這個會有一個while循環,類似自旋)出緩存內容。這樣的好處,可以確保可以讀取到當時的數據緩存,不會因為高並發讀取到最新的緩存值內容。而且還能更有效的做gc,防止Cache內存撐爆。
public function get($key,$timeOut = 0.01) { if(!ServerManager::getInstance()->isStart()){ return $this->cliTemp->get($key); } $num = $this->keyToProcessNum($key); $token = Random::randStr(9);//這個是一個憑證,是確保獲取到自己此刻想獲取的cache數據,和事務類似為了保證可重復讀 $process = ProcessManager::getInstance()->getProcessByName($this->generateProcessName($num)); $msg = new Msg(); $msg->setArg('timeOut',$timeOut); $msg->setArg('key',$key); $msg->setCommand('get'); $msg->setToken($token); $process->getProcess()->write(\swoole_serialize::pack($msg)); return $this->read($token,$timeOut); }
$process->getProcess()->write(\swoole_serialize::pack($msg))發這個包給Cache進程,Cache進程會進行下面這些操作
$ret = $this->cacheData->get($msg->getArg('key'));//獲取到當前的緩存值 $msg->setData($ret); //將當前的內容設置到_Cache表中,token是請求的時候發過來的憑證原樣拼裝。這有什么好處呢,就是確保在高並發下,在A時刻獲取的緩存,不會拿到后面B時刻更新的值。 $table->set($msg->getToken(),[ 'data'=>\swoole_serialize::pack($msg), 'microTime'=>microtime(true) ]); $this->read($token,$timeOut);
//這里的操作是直接從_Cache表中獲取緩存數據,如果緩存存在並且進程調度沒有超時,然后在表中將取過數據的內容刪除掉返回 private function read($token,$timeOut) { $table = TableManager::getInstance()->get(self::EXCHANGE_TABLE_NAME); $start = microtime(true); $data = null; while(true){ usleep(1); if($table->exist($token)){ $data = $table->get($token)['data']; $data = \swoole_serialize::unpack($data); if(!$data instanceof Msg){ $data = null; } break; } if(round($start - microtime(true),3) > $timeOut){ break; } } $table->del($token); if($data){ return $data->getData(); }else{ return null; } }