swoole之任務和定時器


一、代碼

 

<?php

use Swoole\Server;

/**
 * 面向對象的形式 + task + timer
 */
class WebSocket
{
    public $server;

    public function __construct()
    {
        $this->server = new Swoole\WebSocket\Server("0.0.0.0", 9501);
        $this->server->set([
            'worker_num' => 2,
            'task_worker_num' => 2
        ]);
        // 注冊事件回調
        $this->server->on('open', [$this, 'onOpen']);
        $this->server->on('message', [$this, 'onMessage']);
        $this->server->on('task', [$this, 'onTask']);
        $this->server->on('finish', [$this, 'onFinish']);
        $this->server->on('close', [$this, 'onClose']);
    }

    public function run()
    {
        $this->server->start();
    }

    public function onOpen(Server $server, $request)
    {
        echo "server: handshake success with fd{$request->fd}" . PHP_EOL;
    }

    /**
     * 監聽我是消息事件
     * @param \Swoole\WebSocket\Server $server
     * @param \Swoole\Websocket\Frame $frame 包含了客戶端發來的數據幀信息
     */
    public function onMessage(Swoole\WebSocket\Server $server, Swoole\Websocket\Frame $frame)
    {
        echo "receive from {$frame->fd}:{$frame->data}, opcode:{$frame->opcode}, fin:{$frame->finish}" . PHP_EOL;

        $data = [
            'task' => 1,
            'fd' => $frame->fd,
        ];
        // 投放一個 異步 onTask任務
        $server->task($data);

        if ($frame->fd == 1) {
            $times = 3;
            // 開始一個定時任務 每隔2s執行
            swoole_timer_tick(2000, function ($timerId) use ($server, $frame, &$times) {
                if ($times > 0) {
                    echo '1s: timerId:' . $timerId . PHP_EOL;
                    $times--;
                } else {
                    swoole_timer_clear($timerId);
                    $server->push($frame->fd, 'timer tick over');
                }
            });
        }

        // 指定的時間后執行 比下面的push后執行(異步)10s
        swoole_timer_after(10000, function () use ($server, $frame) {
            $server->push($frame->fd, "task finished " . date('Y-m-d H:i:s'));
        });

        // 不會堵塞 返回給客戶端信息
        $server->push($frame->fd, "服務器返回數據");
    }

    /**
     * 處理一些耗時的任務
     * @param Server $serv
     * @param int $taskId 任務ID
     * @param int $srcWorkerId 來自於哪個worker進程
     * @param mixed $data 異步投遞的數據 任務的內容
     * @return string
     */
    public function onTask(Server $serv, $taskId, $srcWorkerId, $data)
    {
        // 處理一些耗時的任務
        // print_r($data);
        sleep(10);
        if (isset($data['fd'])) {
            $serv->push($data['fd'], 'task start:' . date("Y-m-d H:i:s"));
        }

        // onFinsh 告訴worker進程
        return "我是任務處理完后回調" . PHP_EOL;
    }

    /**
     * 任務處理完后執行
     * @param Server $serv
     * @param int $taskId
     * @param string $data onTask返回的內容
     */
    public function onFinish(Server $serv, $taskId, $data)
    {
        echo 'taskId:' . $taskId . ' && task finished && data is ' . $data;
    }

    /**
     * TCP客戶端連接關閉
     * @param Server $ser
     * @param int $fd
     * @param int $reactorId 來自那個reactor線程
     */
    public function onClose(Server $ser, $fd, $reactorId)
    {
        echo "client {$fd} closed\n";
    }
}

$server = new WebSocket();
$server->run();

客戶端用的還是原來的 ws_client.html

 

 服務器輸出:

 

 

 

文檔: https://wiki.swoole.com/wiki/page/397.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM