四 分析easyswoole源碼(啟動服務&Cache組件原理)


前文提到的在系統設置Cache組件 Cache::getInstance()的時候,會去調用processManager去創建Cache的進程,然后以管道通信的方式進行設置緩存和獲取緩存。

Cache是以單例模式實現的。構造器會進行如下操作

//根據配置創建指定數目的Cache服務進程,然后啟動。
$num = intval(Config::getInstance()->getConf("EASY_CACHE.PROCESS_NUM"));//默認配置數目是1,在Config.php里'EASY_CACHE.PROCESS_NUM'=>1
if($num <= 0){
   return;
}
$this->cliTemp = new SplArray();//這個數組以后會給單元測試時候單獨使用,正常模式這個數組是不使用的
//若是在主服務創建,而非單元測試調用
if(ServerManager::getInstance()->getServer()){
    //創建了一個swoole_table ,表名為__Cache,里面存儲data(后面就講到其實這里存儲的是操作Cache的指令)作用是用來做GC(防止Cache被撐爆)
    TableManager::getInstance()->add(self::EXCHANGE_TABLE_NAME,[
        'data'=>[
            'type'=>Table::TYPE_STRING,
            'size'=>10*1024
        ],
        'microTime'=>[
            'type'=>Table::TYPE_STRING,
            'size'=>15
        ]
    ],2048);
    $this->processNum = $num;
    for ($i=0;$i < $num;$i++){
        ProcessManager::getInstance()->addProcess($this->generateProcessName($i),CacheProcess::class);
    }
}

ProcessManager::getInstance()->addProcess($this->generateProcessName($i),CacheProcess::class)這句話才是Cache的核心邏輯。

ProcessManager::getInstance()這句話主要做了下面的操作
ProcessManager 的__construct構造函數創建了一個swoole_table,表名是process_hash_map

TableManager::getInstance()->add(
    'process_hash_map',[
        'pid'=>[
            'type'=>Table::TYPE_INT,
            'size'=>10
        ]
    ],256
);

addProcess($this->generateProcessName($i),CacheProcess::class);
$this->generateProcessName($i)這個代碼很簡單就是根據$i來設置進程名稱
addProcess 是在processList存儲CacheProcess::class的實例,具體代碼如下

$key = md5($processName);
if(!isset($this->processList[$key])){
    try{

        $process = new $processClass($processName,$args,$async);
        $this->processList[$key] = $process;
        return true;
    }catch (\Throwable $throwable){
        Trigger::throwable($throwable);
        return false;
    }
}else{
    trigger_error("you can not add the same name process : {$processName}.{$processClass}");
    return false;
}

那么CacheProcess::class的實例話做了什么操作呢
$this->cacheData = new SplArray();//這里很關鍵,為什么這么說每個Cache進程實際保存的緩存值都是在這里的,每個Cache進程都有自己的一個cacheData數組
$this->persistentTime = Config::getInstance()->getConf('EASY_CACHE.PERSISTENT_TIME');
parent::__construct($processName, $args);
CacheProcess::class繼承於AbstractProcess
AbstractProcess的構造方法

$this->async = $async;
$this->args = $args;
$this->processName = $processName;
$this->swooleProcess = new \swoole_process([$this,'__start'],false,2);
ServerManager::getInstance()->getServer()->addProcess($this->swooleProcess);//然后swoole服務會addProcess一個Cache的任務進程。

__start方法主要是給swoole_table,表名為process_hash_map插入當前CacheProcess的進程名為key,進程IDpid為value。並且注冊進程退出的事件。

if(PHP_OS != 'Darwin'){
    $process->name($this->getProcessName());
}
TableManager::getInstance()->get('process_hash_map')->set(
    md5($this->processName),['pid'=>$this->swooleProcess->pid]
);
ProcessManager::getInstance()->setProcess($this->getProcessName(),$this);
if (extension_loaded('pcntl')) {
    pcntl_async_signals(true);
}
Process::signal(SIGTERM,function ()use($process){
    $this->onShutDown();
    TableManager::getInstance()->get('process_hash_map')->del(md5($this->processName));
    swoole_event_del($process->pipe);
    $this->swooleProcess->exit(0);
});
if($this->async){
    swoole_event_add($this->swooleProcess->pipe, function(){
        $msg = $this->swooleProcess->read(64 * 1024);
        $this->onReceive($msg);
    });
}
$this->run($this->swooleProcess);

$this->run($this->swooleProcess)這個函數是CacheProcess如果配置了persistentTime,就會開啟一個定時器定時去取$file = Config::getInstance()->getConf('TEMP_DIR')."/{$processName}.data";的數據備份,默認是0也就是不會去做定時數據落地的操作

看到這里才是Cache組件在第一次實例化的時候做的相關事情,總結就是創建了指定數量的Cache進程綁定到swoole服務器上。在全局的process_hash_map表中能找到對應的Cache進程ID。然后Cache進程是可以以管道方式來進行通信。

 

set緩存方法

public function set($key,$data)
{
    if(!ServerManager::getInstance()->isStart()){
        $this->cliTemp->set($key,$data);
    }
    if(ServerManager::getInstance()->getServer()){
        $num = $this->keyToProcessNum($key);
        $msg = new Msg();
        $msg->setCommand('set');
        $msg->setArg('key',$key);
        $msg->setData($data);
        ProcessManager::getInstance()->getProcessByName($this->generateProcessName($num))->getProcess()->write(\swoole_serialize::pack($msg));//直接把需要緩存的數據,封裝成msg然后write給hash映射到的Cache進程
    }
}

當進程獲取到的時候會回調onReceive方法

public function onReceive(string $str,...$agrs)
{
    // TODO: Implement onReceive() method.

    $msg = \swoole_serialize::unpack($str);
    $table = TableManager::getInstance()->get(Cache::EXCHANGE_TABLE_NAME);
    if(count($table) > 1900){
        //接近閾值的時候進行gc檢測
        //遍歷Table 依賴pcre 如果發現無法遍歷table,檢查機器是否安裝pcre-devel
        //超過0.1s 基本上99.99%為無用數據。
        $time = microtime(true);
        foreach ($table as $key => $item){
            if(round($time - $item['microTime']) > 0.1){
                $table->del($key);
            }
        }
    }
    if($msg instanceof Msg){
        switch ($msg->getCommand()){
            case 'set':{
                $this->cacheData->set($msg->getArg('key'),$msg->getData());
                break;
            }
            case 'get':{
                $ret = $this->cacheData->get($msg->getArg('key'));
                $msg->setData($ret);
                $table->set($msg->getToken(),[
                    'data'=>\swoole_serialize::pack($msg),
                    'microTime'=>microtime(true)
                ]);
                break;
            }
            case 'del':{
                $this->cacheData->delete($msg->getArg('key'));
                break;
            }
            case 'flush':{
                $this->cacheData->flush();
                break;
            }
            case 'enQueue':{
                $que = $this->cacheData->get($msg->getArg('key'));
                if(!$que instanceof \SplQueue){
                    $que = new \SplQueue();
                    $this->cacheData->set($msg->getArg('key'),$que);
                }
                $que->enqueue($msg->getData());
                break;
            }
            case 'deQueue':{

                $que = $this->cacheData->get($msg->getArg('key'));
                if(!$que instanceof \SplQueue){
                    $que = new \SplQueue();
                    $this->cacheData->set($msg->getArg('key'),$que);
                }
                $ret = null;
                if(!$que->isEmpty()){
                    $ret = $que->dequeue();
                }
                $msg->setData($ret);
                //deQueue 有cli 服務未啟動的請求,但無token
                if(!empty($msg->getToken())){
                    $table->set($msg->getToken(),[
                        'data'=>\swoole_serialize::pack($msg),
                        'microTime'=>microtime(true)
                    ]);
                }
                break;
            }
            case 'queueSize':{
                $que = $this->cacheData->get($msg->getArg('key'));
                if(!$que instanceof \SplQueue){
                    $que = new \SplQueue();
                }
                $msg->setData($que->count());
                $table->set($msg->getToken(),[
                    'data'=>\swoole_serialize::pack($msg),
                    'microTime'=>microtime(true)
                ]);
                break;
            }
        }
    }
}

這里一開始會進行緩存GC確保內存不會撐爆

set方法會直接給$this->cacheData,設置緩存值。

 

get方法比較特殊,它會去給Cache進程發送get的命令,然后Cache讀取到命令會將值寫到_Cache,Swoole_table表中。然后再去讀取(這個會有一個while循環,類似自旋)出緩存內容。這樣的好處,可以確保可以讀取到當時的數據緩存,不會因為高並發讀取到最新的緩存值內容。而且還能更有效的做gc,防止Cache內存撐爆。

public function get($key,$timeOut = 0.01)
{
    if(!ServerManager::getInstance()->isStart()){
        return $this->cliTemp->get($key);
    }
    $num = $this->keyToProcessNum($key);
    $token = Random::randStr(9);//這個是一個憑證,是確保獲取到自己此刻想獲取的cache數據,和事務類似為了保證可重復讀
    $process = ProcessManager::getInstance()->getProcessByName($this->generateProcessName($num));
    $msg = new  Msg();
    $msg->setArg('timeOut',$timeOut);
    $msg->setArg('key',$key);
    $msg->setCommand('get');
    $msg->setToken($token);
    $process->getProcess()->write(\swoole_serialize::pack($msg));
    return $this->read($token,$timeOut);
}

$process->getProcess()->write(\swoole_serialize::pack($msg))發這個包給Cache進程,Cache進程會進行下面這些操作

$ret = $this->cacheData->get($msg->getArg('key'));//獲取到當前的緩存值
$msg->setData($ret);
//將當前的內容設置到_Cache表中,token是請求的時候發過來的憑證原樣拼裝。這有什么好處呢,就是確保在高並發下,在A時刻獲取的緩存,不會拿到后面B時刻更新的值。
$table->set($msg->getToken(),[
    'data'=>\swoole_serialize::pack($msg),
    'microTime'=>microtime(true)
]);

$this->read($token,$timeOut);
//這里的操作是直接從_Cache表中獲取緩存數據,如果緩存存在並且進程調度沒有超時,然后在表中將取過數據的內容刪除掉返回
private function read($token,$timeOut)
{
    $table = TableManager::getInstance()->get(self::EXCHANGE_TABLE_NAME);
    $start = microtime(true);
    $data = null;
    while(true){
        usleep(1);
        if($table->exist($token)){
            $data = $table->get($token)['data'];
            $data = \swoole_serialize::unpack($data);
            if(!$data instanceof Msg){
                $data = null;
            }
            break;
        }
        if(round($start - microtime(true),3) > $timeOut){
            break;
        }
    }
    $table->del($token);
    if($data){
        return $data->getData();
    }else{
        return null;
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM