easyswoole中隊列的使用
隊列的使用其實在easyswoole官方文檔中已經有相關介紹,但是它只給了一個Redis驅動示例,以此來說明隊列的使用流程。實際開發中,這個還不能拿來直接用。下面記錄一下在實際項目中是如何使用隊列。
1、Queue介紹
Easyswoole封裝實現了一個輕量級的隊列,默認以Redis作為隊列驅動器。可以自己實現一個隊列驅動來實現用kafka或者啟動方式的隊列存儲。
從上可知,Queue並不是一個單獨使用的組件,它更像一個對不同驅動的隊列進行統一封裝的門面組件。
2、安裝
composer require easyswoole/queue
3、使用
隊列的使用流程分三個步驟:注冊隊列驅動器、設置消費進程、生產者投遞任務
1)注冊隊列驅動器
在EasySwooleEvent.php中的mainServeCreate()中注冊:
1 public static function mainServerCreate(EventRegister $register) 2 { 3 $confInstance = \EasySwoole\EasySwoole\Config::getInstance(); 4 5 // 注冊熱重啟 6 $swooleServer = ServerManager::getInstance()->getSwooleServer(); 7 $swooleServer->addProcess((new HotReload('HotReload', ['disableInotify' => false]))->getProcess()); 8 9 static::registerOrmPool($confInstance, 'MYSQL', 80, 160, 'default'); // 注冊v1.0的數據庫pool 10 static::registerOrmPool($confInstance, 'MYSQL_READ', 80, 160, 'read'); // 注冊v1.0的read數據庫pool 11 12 static::registerRedisPool($confInstance, 'REDIS', 10, 80, 'redis'); // 注冊redis鏈接池 13 14 //注冊隊列驅動器 15 $driver = static::registerQueue($confInstance, 'REDIS', 'queue-depart'); 16 DepartmentQueue::getInstance($driver); 17 18 //設置消費進程 19 ServerManager::getInstance()->addProcess(new SyncDepartment()); 20 }
2)設置消費進程
比如這里的:
ServerManager::getInstance()->addProcess(new SyncDepartment());
SyncDepartment類必須繼承AbstractProcess類,在重新實現的run()方法中自定義消費的相關業務處理
SyncDepartment.php代碼如下:
1 class SyncDepartment extends AbstractProcess 2 { 3 public function run($arg) 4 { 5 // TODO: Implement run() method. 6 go(function () { 7 \App\Queue\DepartmentQueue::getInstance()->consumer()->listen(function (\EasySwoole\Queue\Job $job) { 8 set_time_limit(0); 9 $data = $job->getJobData(); 10 $access_token = $data['access_token']; 11 $type = $data['type']; 12 DepartmentLogic::getAllDepartmentsFromZw($access_token,$type); 13 }); 14 }); 15 } 16 17 }
3)生產者投遞任務
場景舉例一:
場景舉例二:
關鍵代碼貼上:
1 $job = new Job(); 2 $data = ['access_token' => $access_token,'type'=>1]; 3 $job->setJobData($data); 4 DepartmentQueue::getInstance()->producer()->push($job);
參考鏈接:https://www.easyswoole.com/Cn/Components/Queue/install.html