協程與Swoole的原理,相關應用以及適用場景等


什么是協程

協程(Coroutine)也叫用戶態線程,其通過協作而不是搶占來進行切換。相對於進程或者線程,協程所有的操作都可以在用戶態完成,創建和切換的消耗更低。協程是進程的補充,或者是互補關系。

         要理解是什么是“用戶態的線程”,必然就要先理解什么是“內核態的線程”。內核態的線程是由操作系統來進行調度的,在切換線程上下文時,要先保存上一個線程的上下文,然后執行下一個線程,當條件滿足時,切換回上一個線程,並恢復上下文。協程也是如此,只不過,用戶態的線程不是由操作系統來調度的,而是由程序員來調度的,就是所謂的用戶態的線程。

 

協程的執行流程  

 

協程的適用場景

高並發服務,如秒殺系統、高性能API接口、RPC服務器,使用協程模式,服務的容錯率會大大增加,某些接口出現故障時,不會導致整個服務崩潰。

      爬蟲,可實現非常巨大的並發能力,即使是非常慢速的網絡環境,也可以高效地利用帶寬。

      即時通信服務,如IM聊天、游戲服務器、物聯網、消息服務器等等,可以確保消息通信完全無阻塞,每個消息包均可即時地被處理。

 

協程與線程區別

Swoole的協程在底層實現上是單線程的,因此同一時間只有一個協程在工作,協程的執行是串行的。這與線程不同,多個線程會被操作系統調度到多個CPU並行執行。

一個協程正在運行時,其他協程會停止工作。當前協程執行阻塞IO操作時會掛起,底層調度器會進入事件循環。當有IO完成事件時,底層調度器恢復事件對應的協程的執行。

對CPU多核的利用,仍然依賴於Swoole引擎的多進程機制。

 

協程實現

1、swoole的兩種命名空間形式

Swoole支持兩種形式的命名空間一種是Swoole\Coroutine,2.2.0以上可使用Co\命名空間短命名簡化類名。

2、協程默認支持的位置

目前Swoole4僅有部分事件回調函數底層自動創建了協程,以下回調函數可以調用協程客戶端,可以查看這里https://wiki.swoole.com/wiki/page/696.html

 

在不支持協程的位置可以使用go或Co::create創建協程

3、協程的性能測試

通過多個協程連接redis操作對比沒有使用協程的方式

4、協程並發

協程其實也是阻塞運行的,如果,在一個執行中,比如同時查redis,再去查mysql,即使用了上面的協程,也是順序執行的。那么可不可以幾個協程並發執行呢?

通過延遲收包的形式獲取,遇到到IO 阻塞的時候,協程就掛起了,不會阻塞在那里等着網絡回報,而是繼續往下走,swoole當中可以用setDefer()方法聲明延遲收包然后通過recv()方法收包。

5、協程通訊

  使用本地內存,不同的進程之間內存是隔離的。只能在同一進程的不同協程內進行push和pop操作

向通道中寫入數據。

function Coroutine\Channel->push(mixed $data) : bool;

從通道中讀取數據。

function Coroutine\Channel->pop() : mixed;

對協程調用場景,最常見的“生產者-消費者”事件驅動模型,一個協程負責生產產品並將它們加入隊列,另一個負責從隊列中取出產品並使用它。

6、協程的注意問題

如果在多個協程間共用同一個協程客戶端,同步阻塞程序不同,協程是並發處理請求的,因此同一時間可能會有很多個請求在並行處理,一旦共用客戶端連接,就會導致不同協程之間發生數據錯亂。

 

swoole通用協程池的實現

swoole官方的協程池是用只能用在Redis。因為協程池代碼層耦合了Redis實例化邏輯。通過工廠函數實現了通用性。

class RedisPool
{
    /**
     * @var \Swoole\Coroutine\Channel
     */
    protected $pool;

    /**
     * RedisPool constructor.
     * @param int $size 連接池的尺寸
     */
    function __construct($size = 100)
    {
        $this->pool = new Swoole\Coroutine\Channel($size);
        for ($i = 0; $i < $size; $i++)
        {
            $redis = new Swoole\Coroutine\Redis();
            $res = $redis->connect('127.0.0.1', 6379);
            if ($res == false)
            {
                throw new RuntimeException("failed to connect redis server.");
            }
            else
            {
                $this->put($redis);
            }
        }
    }

    function put($redis)
    {
        $this->pool->push($redis);
    }

    function get()
    {
        return $this->pool->pop();
    }
}

 

利用工廠方法的改造如下:

<?php
/**
 * @author xialeistudio
 * @date 2019-05-20
 */

namespace swoole\foundation\pool;

use Swoole\Coroutine\Channel;

/**
 * Swoole generic connection pool
 * Class Pool
 * @package swoole\foundation\pool
 */
class GenericPool
{
    /**
     * @var int pool size
     */
    private $size = 0;
    /**
     * @var callable construct a connection
     */
    private $factory = null;
    /**
     * @var Channel
     */
    private $channel = null;

    /**
     * GenericPool constructor.
     * @param int $size
     * @param callable $factory
     * @throws InvalidParamException
     */
    public function __construct($size, callable $factory)
    {
        $this->size = $size;
        $this->factory = $factory;
        $this->init();
    }


    /**
     * check pool config
     * @throws InvalidParamException
     */
    private function init()
    {
        if ($this->size <= 0) {
            throw new InvalidParamException('The "size" property must be greater than zero.');
        }
        if (empty($this->factory)) {
            throw new InvalidParamException('The "factory" property must be set.');
        }
        if (!is_callable($this->factory)) {
            throw new InvalidParamException('The "factory" property must be callable.');
        }
        $this->bootstrap();
    }

    /**
     * bootstrap pool
     */
    private function bootstrap()
    {
        $this->channel = new Channel($this->size);

        for ($i = 0; $i < $this->size; $i++) {
            $this->channel->push(call_user_func($this->factory));
        }
    }

    /**
     * Acquire a connection
     * @param int $timeout
     * @return mixed
     */
    public function acquire($timeout = 0)
    {
        return $this->channel->pop($timeout);
    }

    /**
     * Release a resource
     * @param mixed $resource
     */
    public function release($resource)
    {
        $this->channel->push($resource);
    }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM