laravel整合workerman做消息推送系統


官方建議分離 workerman和mvc框架的結合,我去,這不是有點腦缺氧嗎?

大量的業務邏輯,去獨立增加方法和類庫在寫一次,實際業務中是不現實和不實際的

gateway增加一些這方面的工作,但是我看了源碼之后,就發現還是只能自己做

 先增加composer require workerman/workerman  或者walkor/workerman ,但是官方的github是 walkor/workerman,注意一下

可以去 https://packagist.org查看是否有包

 

首先結合Console做命令

建立一個Command

<?php

namespace App\Console\Commands;

use Illuminate\Console\Command;
use Workerman\Worker;
use App\Work\WorkermanWork;

class Workerman extends Command {

    protected $taskserver;
    /*
     * 操作參數
     * 注意只能在
     * start 啟動
     * stop 停止
     * relaod  只能重啟邏輯代碼,核心workerman_init無法重啟,注意看官方文檔
     * status 查看狀態
     * connections 查看連接狀態(需要Workerman版本>=3.5.0)
     * 
     */
    protected $action = array('start', 'stop', 'reload', 'status', 'connections');

    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'Workerman {action}';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'Workerman';

    /**
     * Create a new command instance.
     *
     * @return void
     */
    public function __construct() {
        parent::__construct();
    }

    /**
     * Execute the console command.
     *
     * @return mixed
     * 
     * 注意
     * 
     */
    public function handle() {
        $action = $this->argument('action');

        if (!in_array($action, $this->action)) {
            $this->error('Error Action');
            exit;
        }
        //初始化workerman
        WorkermanWork::workerman_init($action);
    }

}

注冊到Kernel

class Kernel extends ConsoleKernel {

    /**
     * The Artisan commands provided by your application.
     *
     * @var array
     */
    protected $commands = [

        \App\Console\Commands\Workerman::class,
    ];

 

WorkermanWork的內容

<?php

namespace App\Work;

use App\Work\BaseWork as Base;
use Illuminate\Support\Facades\DB;
use App\Work\CommonWork;
use Workerman\Worker;
use Workerman\Lib\Timer;
use App\Models\OperationLog;
use App\Models\Users;

class WorkermanWork extends Base {

    static $connection_count = 0;

    public static function workerman_init($action = null) {
        global $argv;

        $argv[0] = 'workerman:websocket';
        $argv[1] = $action;
        $argv[2] = '-d';
//        心跳
        define('HEARTBEAT_TIME', 30);
        //初始化
        $worker = new Worker("websocket://172.17.1.247:9099");
        $worker->name = 'MessagePushWorker';
        //linux 用戶線上是www
//        $worker->user = 'www';
        //守護模式信息輸出文件地址
//        $worker->stdoutFile = "./workerman.log";
        //工作進程總數 測試環境4個
        $worker->count = 4;
        //正式環境
//        $ws->count = 10;
        //建立鏈接 處理邏輯
        $worker->onConnect = function($connection) {
//             有新的客戶端連接時,連接數+1
            self::$connection_count++;
            self::onConnect($connection);
        };
        //接受消息 處理邏輯
        $worker->onMessage = function($connection, $data) {

            self::onMessage($connection, $data);
        };
        //關閉鏈接 處理邏輯
        $worker->onClose = function($connection) {
//            客戶端關閉時,連接數-1
            self::$connection_count--;
            self::onClose($connection);
        };

        // 進程啟動后設置一個30秒運行一次的定時器
//        $worker->onWorkerStart = function($worker) {
//            Timer::add(30, function()use($worker) {
//                
//            });
//        };
        // 開始
        Worker::runAll();
    }

    //建立鏈接 處理邏輯
    public static function onConnect($connection) {
        //測試5秒一次 線上30秒一次
        Timer::add(10, function() use($connection) {

            if (!empty($_SESSION['user_id'])) {
                $Users = Users::where('id', $_SESSION['user_id'])->first();
                if (!empty($Users)) {
                    $message_count = OperationLog::where('user_id', $_SESSION['user_id'])->where('is_read', 1)->count();
                    $message_list = OperationLog::where('user_id', $_SESSION['user_id'])->where('is_read', 1)->orderBy('id', 'desc')->get(['content', 'id', 'create_time'])->toArray();

                    if (!empty($message_list)) {
                        $connection->send(json_encode(['code' => 200, 'msg' => '請求成功', 'data' => $message_list, 'connections' => self::$connection_count, 'message_count' => $message_count]));
                    }
                }
//                else {
//                    $connection->send(json_encode(['code' => 201, 'msg' => '無效user_id', 'connections' => self::$connection_count]));
//                }
            }
//            else {
//                $retrun_data = json_encode(['code' => 201, 'msg' => 'session user_id不存在', 'connections' => self::$connection_count]);
//                $connection->send($retrun_data);
//            }
        });
    }

    //接受消息 處理邏輯
    public static function onMessage($connection, $data) {
        //解析數據,非合法的json數據不處理
        if (!empty($data)) {
            if (is_json($data)) {
                $data = json_decode($data, true);
                switch ($data['type']) {
                    // 客戶端回應服務端的心跳
                    case 'ping':
                        $connection->send(json_encode(['code' => 200, 'msg' => '服務存活', 'data' => [], 'connections' => self::$connection_count]));
                    case 'login':
                        $Users = Users::where('id', $data['user_id'])->first();
                        if (empty($Users)) {
                            $connection->send(json_encode(['code' => 201, 'msg' => '用戶ID無效或者錯誤', 'data' => [], 'connections' => self::$connection_count]));
                        } else {
                            $_SESSION['user_id'] = $data['user_id'];
                            $connection->send(json_encode(['code' => 200, 'msg' => '登錄成功', 'data' => [], 'connections' => self::$connection_count]));
                        }
                }
            }
        } else {
            $connection->send(json_encode(['code' => 201, 'msg' => '數據請求為空', 'data' => [], 'connections' => self::$connection_count]));
        }


//        $data = $data . '----總共有連接數:' . self::$connection_count;
//        $connection->send("return_data: $data ");
    }

    //關閉鏈接 處理邏輯
    public static function onClose($connection) {
        
    }

}

 

建立socket的時間設置一個10秒的定時器,這個基於session的控制,登錄之后立即請求發送

{"type":"login","user_id":"24"}

因為如果登錄,就檢索OperationLog表里屬於這個用戶ID的消息,每10秒推送一次數據

運行 命令 

php artisan Workerman start 啟動

$argv[2] = '-d';

注釋掉就是測試模式,加上就是守護模式,就是線上使用的

2019年7月12日09:43:56

 注意:上面是臨時測試代碼。業務代碼使用try catch處理異常和錯誤


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM