官方建議分離 workerman和mvc框架的結合,我去,這不是有點腦缺氧嗎?
大量的業務邏輯,去獨立增加方法和類庫在寫一次,實際業務中是不現實和不實際的
gateway增加一些這方面的工作,但是我看了源碼之后,就發現還是只能自己做
先增加composer require workerman/workerman 或者walkor/workerman ,但是官方的github是 walkor/workerman,注意一下
可以去 https://packagist.org查看是否有包
首先結合Console做命令
建立一個Command
<?php namespace App\Console\Commands; use Illuminate\Console\Command; use Workerman\Worker; use App\Work\WorkermanWork; class Workerman extends Command { protected $taskserver; /* * 操作參數 * 注意只能在 * start 啟動 * stop 停止 * relaod 只能重啟邏輯代碼,核心workerman_init無法重啟,注意看官方文檔 * status 查看狀態 * connections 查看連接狀態(需要Workerman版本>=3.5.0) * */ protected $action = array('start', 'stop', 'reload', 'status', 'connections'); /** * The name and signature of the console command. * * @var string */ protected $signature = 'Workerman {action}'; /** * The console command description. * * @var string */ protected $description = 'Workerman'; /** * Create a new command instance. * * @return void */ public function __construct() { parent::__construct(); } /** * Execute the console command. * * @return mixed * * 注意 * */ public function handle() { $action = $this->argument('action'); if (!in_array($action, $this->action)) { $this->error('Error Action'); exit; } //初始化workerman WorkermanWork::workerman_init($action); } }
注冊到Kernel
class Kernel extends ConsoleKernel { /** * The Artisan commands provided by your application. * * @var array */ protected $commands = [ \App\Console\Commands\Workerman::class, ];
WorkermanWork的內容
<?php namespace App\Work; use App\Work\BaseWork as Base; use Illuminate\Support\Facades\DB; use App\Work\CommonWork; use Workerman\Worker; use Workerman\Lib\Timer; use App\Models\OperationLog; use App\Models\Users; class WorkermanWork extends Base { static $connection_count = 0; public static function workerman_init($action = null) { global $argv; $argv[0] = 'workerman:websocket'; $argv[1] = $action; $argv[2] = '-d'; // 心跳 define('HEARTBEAT_TIME', 30); //初始化 $worker = new Worker("websocket://172.17.1.247:9099"); $worker->name = 'MessagePushWorker'; //linux 用戶線上是www // $worker->user = 'www'; //守護模式信息輸出文件地址 // $worker->stdoutFile = "./workerman.log"; //工作進程總數 測試環境4個 $worker->count = 4; //正式環境 // $ws->count = 10; //建立鏈接 處理邏輯 $worker->onConnect = function($connection) { // 有新的客戶端連接時,連接數+1 self::$connection_count++; self::onConnect($connection); }; //接受消息 處理邏輯 $worker->onMessage = function($connection, $data) { self::onMessage($connection, $data); }; //關閉鏈接 處理邏輯 $worker->onClose = function($connection) { // 客戶端關閉時,連接數-1 self::$connection_count--; self::onClose($connection); }; // 進程啟動后設置一個30秒運行一次的定時器 // $worker->onWorkerStart = function($worker) { // Timer::add(30, function()use($worker) { // // }); // }; // 開始 Worker::runAll(); } //建立鏈接 處理邏輯 public static function onConnect($connection) { //測試5秒一次 線上30秒一次 Timer::add(10, function() use($connection) { if (!empty($_SESSION['user_id'])) { $Users = Users::where('id', $_SESSION['user_id'])->first(); if (!empty($Users)) { $message_count = OperationLog::where('user_id', $_SESSION['user_id'])->where('is_read', 1)->count(); $message_list = OperationLog::where('user_id', $_SESSION['user_id'])->where('is_read', 1)->orderBy('id', 'desc')->get(['content', 'id', 'create_time'])->toArray(); if (!empty($message_list)) { $connection->send(json_encode(['code' => 200, 'msg' => '請求成功', 'data' => $message_list, 'connections' => self::$connection_count, 'message_count' => $message_count])); } } // else { // $connection->send(json_encode(['code' => 201, 'msg' => '無效user_id', 'connections' => self::$connection_count])); // } } // else { // $retrun_data = json_encode(['code' => 201, 'msg' => 'session user_id不存在', 'connections' => self::$connection_count]); // $connection->send($retrun_data); // } }); } //接受消息 處理邏輯 public static function onMessage($connection, $data) { //解析數據,非合法的json數據不處理 if (!empty($data)) { if (is_json($data)) { $data = json_decode($data, true); switch ($data['type']) { // 客戶端回應服務端的心跳 case 'ping': $connection->send(json_encode(['code' => 200, 'msg' => '服務存活', 'data' => [], 'connections' => self::$connection_count])); case 'login': $Users = Users::where('id', $data['user_id'])->first(); if (empty($Users)) { $connection->send(json_encode(['code' => 201, 'msg' => '用戶ID無效或者錯誤', 'data' => [], 'connections' => self::$connection_count])); } else { $_SESSION['user_id'] = $data['user_id']; $connection->send(json_encode(['code' => 200, 'msg' => '登錄成功', 'data' => [], 'connections' => self::$connection_count])); } } } } else { $connection->send(json_encode(['code' => 201, 'msg' => '數據請求為空', 'data' => [], 'connections' => self::$connection_count])); } // $data = $data . '----總共有連接數:' . self::$connection_count; // $connection->send("return_data: $data "); } //關閉鏈接 處理邏輯 public static function onClose($connection) { } }
建立socket的時間設置一個10秒的定時器,這個基於session的控制,登錄之后立即請求發送
{"type":"login","user_id":"24"}
因為如果登錄,就檢索OperationLog表里屬於這個用戶ID的消息,每10秒推送一次數據
運行 命令
php artisan Workerman start 啟動
$argv[2] = '-d';
注釋掉就是測試模式,加上就是守護模式,就是線上使用的
2019年7月12日09:43:56
注意:上面是臨時測試代碼。業務代碼使用try catch處理異常和錯誤