Pool對象是多個Worker對象的容器,同時也是它們的控制器,對Worker功能更高抽象。
從結果可以看出,第一條記錄跟最后一條是相同的,再沒有pool->submit之前$sqls數組中的對象都是正確的,submit之后第一個對象的數據就改變了,不知道是不是pthreads的BUG。
第一條記錄為什么會有問題,前面我已經說過了。這里我們看20個MyWork對象,它們順序的加入到5個MyWorker對象中,如果第一條記錄沒有問題的話,它們分別加入到1,2,3,4,5的MyWorker中,然后遠行run方法。
比如Worker是河,而線程是運行在河里的船。Pool則是管理着多條河。
<?php
//繼承Collectable垃圾收集類,好讓Pool::collect進行收集
class Sql extends Collectable {
private $sql = '';
private $data = array();
public function __construct($sql) {
$this->sql = $sql;
}
public function run() {
$db = $this->worker->getDb();
$res = mysql_query($this->sql, $db);
$tmp = array();
while($row = mysql_fetch_assoc($res)) {
//這里不能使用$this->data[] = $row;這種方式。
$tmp[] = $row;
}
$this->data = $tmp;
//這里工作完后,設置為垃圾
//在Pool::collect中isGarbage()判斷時則為真
$this->setGarbage();
}
public function getData() {
return $this->data;
}
}
class SqlWorker extends Worker {
protected static $db = null;
public function getDb() {
if(!self::$db) {
self::$db = mysql_connect('127.0.0.1', 'root', '');
mysql_select_db('test', self::$db);
}
return self::$db;
}
}
//這里創建5個Worker對象的Pool線程池
$pool = new Pool(5, 'SqlWorker');
//我們創建20個Sql線程對象,並提交到Pool的Worker中
$sqls = array();
for($ix = 0; $ix < 20; ++$ix) {
$sql = new Sql("select * from test order by id limit {$ix},1");
$sqls[] = $sql;
//$pool->submit($sql);不要在這里submit
}
//注意,這里循環提交$sql
//如果把$pool->submit放到前面的for循環內,會出現一個錯誤
//第一個Sql對象的sql語句會跟最后一個相同,導致結果出現問題
foreach($sqls as $sql) {
//這里的submit方法有問題,它會修改$sqls
//導致第一個Sql對象與最后一個相同
//不知是不是BUG
$pool->submit($sql);
}
//等待隊列中執行完成
$pool->shutdown();
$ret = array();
foreach($sqls as $sql) {
$ret[] = $sql->getData();
}
file_put_contents('ret.txt', var_export($ret, true));
//回收已完成的對象
$pool->collect(function($sql){
return $sql->isGarbage();
});
array (
0 =>
array (
0 =>
array (
'id' => '20',
'name' => 'mmm',
),
),
1 =>
array (
0 =>
array (
'id' => '2',
'name' => '222',
),
),
2 =>
array (
0 =>
array (
'id' => '3',
'name' => '333',
),
),
3 =>
array (
0 =>
array (
'id' => '4',
'name' => '444',
),
),
4 =>
array (
0 =>
array (
'id' => '5',
'name' => '555',
),
),
5 =>
array (
0 =>
array (
'id' => '6',
'name' => '666',
),
),
6 =>
array (
0 =>
array (
'id' => '7',
'name' => '777',
),
),
7 =>
array (
0 =>
array (
'id' => '8',
'name' => '888',
),
),
8 =>
array (
0 =>
array (
'id' => '9',
'name' => '999',
),
),
9 =>
array (
0 =>
array (
'id' => '10',
'name' => 'aaa',
),
),
10 =>
array (
0 =>
array (
'id' => '11',
'name' => 'bbb',
),
),
11 =>
array (
0 =>
array (
'id' => '12',
'name' => 'ccc',
),
),
12 =>
array (
0 =>
array (
'id' => '13',
'name' => 'ddd',
),
),
13 =>
array (
0 =>
array (
'id' => '14',
'name' => 'eee',
),
),
14 =>
array (
0 =>
array (
'id' => '15',
'name' => 'fff',
),
),
15 =>
array (
0 =>
array (
'id' => '16',
'name' => 'ggg',
),
),
16 =>
array (
0 =>
array (
'id' => '17',
'name' => 'vvv',
),
),
17 =>
array (
0 =>
array (
'id' => '18',
'name' => 'hhh',
),
),
18 =>
array (
0 =>
array (
'id' => '19',
'name' => 'nnn',
),
),
19 =>
array (
0 =>
array (
'id' => '20',
'name' => 'mmm',
),
),
)
上述代碼我們通過創建一個包含5個SqlWorker對象的pool,然后創建20個Sql對象加入到pool中。
當然我們的Sql類並不一定非要繼承自Collectable類,我們也可自定義判斷什么時候可回收。
<?php
//繼承Threaded類,Threaded提供了隱式的線程安全機制
//這個對象中的所有操作都是線程安全的
class MyWork extends Threaded {
private $name = '';
private $do = false;
private $data = '';
public function __construct($name) {
$this->name = $name;
}
public function run() {
$this->data = "{$this->name} run... in thread [" . $this->worker->getName() . "] \r\n";
//通過do來判斷是否完成
//如果為true,則讓Pool::collect回收
$this->do = true;
}
public function isDo() {
return $this->do;
}
public function getData() {
return $this->data;
}
}
class MyWorker extends Worker {
public static $name = 0;
public function __construct() {
self::$name++;
}
public function run() {
}
public function getName() {
return self::$name;
}
}
$pool = new Pool(5, 'MyWorker');
$works = array();
for($ix = 0; $ix < 20; ++$ix) {
$work = new MyWork($ix);
$works[] = $work;
}
foreach($works as $work) {
$pool->submit($work);
}
$pool->shutdown();
foreach($works as $work) {
echo $work->getData();
}
//回收已完成的對象
$pool->collect(function($work){
//我們通過自定義函數isDo來判斷對象是否執行完畢
return $work->isDo();
});
執行結果如下:
19 run... in thread [5] 1 run... in thread [2] 2 run... in thread [3] 3 run... in thread [4] 4 run... in thread [5] 5 run... in thread [1] 6 run... in thread [2] 7 run... in thread [3] 8 run... in thread [4] 9 run... in thread [5] 10 run... in thread [1] 11 run... in thread [2] 12 run... in thread [3] 13 run... in thread [4] 14 run... in thread [5] 15 run... in thread [1] 16 run... in thread [2] 17 run... in thread [3] 18 run... in thread [4] 19 run... in thread [5]
