Pool對象是多個Worker對象的容器,同時也是它們的控制器,對Worker功能更高抽象。
從結果可以看出,第一條記錄跟最后一條是相同的,再沒有pool->submit之前$sqls數組中的對象都是正確的,submit之后第一個對象的數據就改變了,不知道是不是pthreads的BUG。
第一條記錄為什么會有問題,前面我已經說過了。這里我們看20個MyWork對象,它們順序的加入到5個MyWorker對象中,如果第一條記錄沒有問題的話,它們分別加入到1,2,3,4,5的MyWorker中,然后遠行run方法。
比如Worker是河,而線程是運行在河里的船。Pool則是管理着多條河。
<?php //繼承Collectable垃圾收集類,好讓Pool::collect進行收集 class Sql extends Collectable { private $sql = ''; private $data = array(); public function __construct($sql) { $this->sql = $sql; } public function run() { $db = $this->worker->getDb(); $res = mysql_query($this->sql, $db); $tmp = array(); while($row = mysql_fetch_assoc($res)) { //這里不能使用$this->data[] = $row;這種方式。 $tmp[] = $row; } $this->data = $tmp; //這里工作完后,設置為垃圾 //在Pool::collect中isGarbage()判斷時則為真 $this->setGarbage(); } public function getData() { return $this->data; } } class SqlWorker extends Worker { protected static $db = null; public function getDb() { if(!self::$db) { self::$db = mysql_connect('127.0.0.1', 'root', ''); mysql_select_db('test', self::$db); } return self::$db; } } //這里創建5個Worker對象的Pool線程池 $pool = new Pool(5, 'SqlWorker'); //我們創建20個Sql線程對象,並提交到Pool的Worker中 $sqls = array(); for($ix = 0; $ix < 20; ++$ix) { $sql = new Sql("select * from test order by id limit {$ix},1"); $sqls[] = $sql; //$pool->submit($sql);不要在這里submit } //注意,這里循環提交$sql //如果把$pool->submit放到前面的for循環內,會出現一個錯誤 //第一個Sql對象的sql語句會跟最后一個相同,導致結果出現問題 foreach($sqls as $sql) { //這里的submit方法有問題,它會修改$sqls //導致第一個Sql對象與最后一個相同 //不知是不是BUG $pool->submit($sql); } //等待隊列中執行完成 $pool->shutdown(); $ret = array(); foreach($sqls as $sql) { $ret[] = $sql->getData(); } file_put_contents('ret.txt', var_export($ret, true)); //回收已完成的對象 $pool->collect(function($sql){ return $sql->isGarbage(); });
array ( 0 => array ( 0 => array ( 'id' => '20', 'name' => 'mmm', ), ), 1 => array ( 0 => array ( 'id' => '2', 'name' => '222', ), ), 2 => array ( 0 => array ( 'id' => '3', 'name' => '333', ), ), 3 => array ( 0 => array ( 'id' => '4', 'name' => '444', ), ), 4 => array ( 0 => array ( 'id' => '5', 'name' => '555', ), ), 5 => array ( 0 => array ( 'id' => '6', 'name' => '666', ), ), 6 => array ( 0 => array ( 'id' => '7', 'name' => '777', ), ), 7 => array ( 0 => array ( 'id' => '8', 'name' => '888', ), ), 8 => array ( 0 => array ( 'id' => '9', 'name' => '999', ), ), 9 => array ( 0 => array ( 'id' => '10', 'name' => 'aaa', ), ), 10 => array ( 0 => array ( 'id' => '11', 'name' => 'bbb', ), ), 11 => array ( 0 => array ( 'id' => '12', 'name' => 'ccc', ), ), 12 => array ( 0 => array ( 'id' => '13', 'name' => 'ddd', ), ), 13 => array ( 0 => array ( 'id' => '14', 'name' => 'eee', ), ), 14 => array ( 0 => array ( 'id' => '15', 'name' => 'fff', ), ), 15 => array ( 0 => array ( 'id' => '16', 'name' => 'ggg', ), ), 16 => array ( 0 => array ( 'id' => '17', 'name' => 'vvv', ), ), 17 => array ( 0 => array ( 'id' => '18', 'name' => 'hhh', ), ), 18 => array ( 0 => array ( 'id' => '19', 'name' => 'nnn', ), ), 19 => array ( 0 => array ( 'id' => '20', 'name' => 'mmm', ), ), )
上述代碼我們通過創建一個包含5個SqlWorker對象的pool,然后創建20個Sql對象加入到pool中。
當然我們的Sql類並不一定非要繼承自Collectable類,我們也可自定義判斷什么時候可回收。
<?php //繼承Threaded類,Threaded提供了隱式的線程安全機制 //這個對象中的所有操作都是線程安全的 class MyWork extends Threaded { private $name = ''; private $do = false; private $data = ''; public function __construct($name) { $this->name = $name; } public function run() { $this->data = "{$this->name} run... in thread [" . $this->worker->getName() . "] \r\n"; //通過do來判斷是否完成 //如果為true,則讓Pool::collect回收 $this->do = true; } public function isDo() { return $this->do; } public function getData() { return $this->data; } } class MyWorker extends Worker { public static $name = 0; public function __construct() { self::$name++; } public function run() { } public function getName() { return self::$name; } } $pool = new Pool(5, 'MyWorker'); $works = array(); for($ix = 0; $ix < 20; ++$ix) { $work = new MyWork($ix); $works[] = $work; } foreach($works as $work) { $pool->submit($work); } $pool->shutdown(); foreach($works as $work) { echo $work->getData(); } //回收已完成的對象 $pool->collect(function($work){ //我們通過自定義函數isDo來判斷對象是否執行完畢 return $work->isDo(); });
執行結果如下:
19 run... in thread [5] 1 run... in thread [2] 2 run... in thread [3] 3 run... in thread [4] 4 run... in thread [5] 5 run... in thread [1] 6 run... in thread [2] 7 run... in thread [3] 8 run... in thread [4] 9 run... in thread [5] 10 run... in thread [1] 11 run... in thread [2] 12 run... in thread [3] 13 run... in thread [4] 14 run... in thread [5] 15 run... in thread [1] 16 run... in thread [2] 17 run... in thread [3] 18 run... in thread [4] 19 run... in thread [5]