什么是協程
協程(Coroutine)也叫用戶態線程,其通過協作而不是搶占來進行切換。相對於進程或者線程,協程所有的操作都可以在用戶態完成,創建和切換的消耗更低。協程是進程的補充,或者是互補關系。
要理解是什么是“用戶態的線程”,必然就要先理解什么是“內核態的線程”。內核態的線程是由操作系統來進行調度的,在切換線程上下文時,要先保存上一個線程的上下文,然后執行下一個線程,當條件滿足時,切換回上一個線程,並恢復上下文。協程也是如此,只不過,用戶態的線程不是由操作系統來調度的,而是由程序員來調度的,就是所謂的用戶態的線程。
協程的執行流程
協程的適用場景
高並發服務,如秒殺系統、高性能API接口、RPC服務器,使用協程模式,服務的容錯率會大大增加,某些接口出現故障時,不會導致整個服務崩潰。
爬蟲,可實現非常巨大的並發能力,即使是非常慢速的網絡環境,也可以高效地利用帶寬。
即時通信服務,如IM聊天、游戲服務器、物聯網、消息服務器等等,可以確保消息通信完全無阻塞,每個消息包均可即時地被處理。
協程與線程區別
Swoole的協程在底層實現上是單線程的,因此同一時間只有一個協程在工作,協程的執行是串行的。這與線程不同,多個線程會被操作系統調度到多個CPU並行執行。
一個協程正在運行時,其他協程會停止工作。當前協程執行阻塞IO操作時會掛起,底層調度器會進入事件循環。當有IO完成事件時,底層調度器恢復事件對應的協程的執行。
對CPU多核的利用,仍然依賴於Swoole引擎的多進程機制。
協程實現
1、swoole的兩種命名空間形式
Swoole支持兩種形式的命名空間一種是Swoole\Coroutine,2.2.0以上可使用Co\命名空間短命名簡化類名。
2、協程默認支持的位置
目前Swoole4僅有部分事件回調函數底層自動創建了協程,以下回調函數可以調用協程客戶端,可以查看這里https://wiki.swoole.com/wiki/page/696.html
在不支持協程的位置可以使用go或Co::create創建協程
3、協程的性能測試
通過多個協程連接redis操作對比沒有使用協程的方式
4、協程並發
協程其實也是阻塞運行的,如果,在一個執行中,比如同時查redis,再去查mysql,即使用了上面的協程,也是順序執行的。那么可不可以幾個協程並發執行呢?
通過延遲收包的形式獲取,遇到到IO 阻塞的時候,協程就掛起了,不會阻塞在那里等着網絡回報,而是繼續往下走,swoole當中可以用setDefer()方法聲明延遲收包然后通過recv()方法收包。
5、協程通訊
使用本地內存,不同的進程之間內存是隔離的。只能在同一進程的不同協程內進行push和pop操作
向通道中寫入數據。
function Coroutine\Channel->push(mixed $data) : bool;
從通道中讀取數據。
function Coroutine\Channel->pop() : mixed;
對協程調用場景,最常見的“生產者-消費者”事件驅動模型,一個協程負責生產產品並將它們加入隊列,另一個負責從隊列中取出產品並使用它。
6、協程的注意問題
如果在多個協程間共用同一個協程客戶端,同步阻塞程序不同,協程是並發處理請求的,因此同一時間可能會有很多個請求在並行處理,一旦共用客戶端連接,就會導致不同協程之間發生數據錯亂。
swoole通用協程池的實現
swoole官方的協程池是用只能用在Redis。因為協程池代碼層耦合了Redis實例化邏輯。通過工廠函數實現了通用性。
class RedisPool
{
/**
* @var \Swoole\Coroutine\Channel
*/
protected $pool;
/**
* RedisPool constructor.
* @param int $size 連接池的尺寸
*/
function __construct($size = 100)
{
$this->pool = new Swoole\Coroutine\Channel($size);
for ($i = 0; $i < $size; $i++)
{
$redis = new Swoole\Coroutine\Redis();
$res = $redis->connect('127.0.0.1', 6379);
if ($res == false)
{
throw new RuntimeException("failed to connect redis server.");
}
else
{
$this->put($redis);
}
}
}
function put($redis)
{
$this->pool->push($redis);
}
function get()
{
return $this->pool->pop();
}
}
利用工廠方法的改造如下:
<?php
/**
* @author xialeistudio
* @date 2019-05-20
*/
namespace swoole\foundation\pool;
use Swoole\Coroutine\Channel;
/**
* Swoole generic connection pool
* Class Pool
* @package swoole\foundation\pool
*/
class GenericPool
{
/**
* @var int pool size
*/
private $size = 0;
/**
* @var callable construct a connection
*/
private $factory = null;
/**
* @var Channel
*/
private $channel = null;
/**
* GenericPool constructor.
* @param int $size
* @param callable $factory
* @throws InvalidParamException
*/
public function __construct($size, callable $factory)
{
$this->size = $size;
$this->factory = $factory;
$this->init();
}
/**
* check pool config
* @throws InvalidParamException
*/
private function init()
{
if ($this->size <= 0) {
throw new InvalidParamException('The "size" property must be greater than zero.');
}
if (empty($this->factory)) {
throw new InvalidParamException('The "factory" property must be set.');
}
if (!is_callable($this->factory)) {
throw new InvalidParamException('The "factory" property must be callable.');
}
$this->bootstrap();
}
/**
* bootstrap pool
*/
private function bootstrap()
{
$this->channel = new Channel($this->size);
for ($i = 0; $i < $this->size; $i++) {
$this->channel->push(call_user_func($this->factory));
}
}
/**
* Acquire a connection
* @param int $timeout
* @return mixed
*/
public function acquire($timeout = 0)
{
return $this->channel->pop($timeout);
}
/**
* Release a resource
* @param mixed $resource
*/
public function release($resource)
{
$this->channel->push($resource);
}
}