Swoole http server + yaf, swoole socket server + protobuf 等小結


擁抱swoole, 擁抱更好的php

Swoole 是什么?

Yaf 是什么?

接觸swoole已經4年多了,一直沒有好好靜下心來學習。一直在做web端的應用,對網絡協議和常駐內存型服務器一竅不通。一不留神swoole已經從小眾擴展變成了流行框架,再不學習就完了

swoole + yaf

swoole server 的角色

還是先用swoole來做一個http server。
常見的php web應用,通常是apache+fast-cgi 或者 nginx + php-fpm。這里以php-fpm為例,我們配置nginx.conf的時候都要配置一個

location ~*\.php$ {
        root            /usr/share/nginx/html;
        fastcgi_index   index.php;
        fastcgi_pass    127.0.0.1:9000;
        include         fastcgi_params;
        ...
}

主要是這句 fastcgi_pass 127.0.0.1:9000;。就是說nginx 匹配到請求的uri是php后綴的時候,就把http request 轉交給127.0.0.1:9000處理了。如果你查看或者修改過php-fpm的配置文件,就知道9000是php-fpm的默認端口。那么到這里我們就清楚了,nginx把php文件交給php-fpm處理,php-fpm執行php腳本后返回http response給nginx。
接下來就好理解swoole http server 的作用以及應該扮演的角色。swoole http server 自己接受http請求,處理靜態文件和php腳本,然后返回給客戶端。swoole server 的配置項中有一個 document_root 用來告訴swoole 從哪里讀取靜態文件。當然,我們仍然可以用nginx來處理靜態文件,只把php腳本交給swoole處理,這里需要修改nginx.conf,用nginx的代理功能 proxy_pass

location ~ .(gif|jpg|jpeg|png|bmp|swf|css|js)$ {  
    root     /data/www/swoole-server/public;  
} 
        
location / {
    proxy_http_version 1.1;
    proxy_set_header Connection "keep-alive";
    proxy_set_header X-Real-IP $remote_addr;
    proxy_pass http://127.0.0.1:9501;
}

以上說了這么多,作為一個php web開發人員,應該可以大概理解平常寫的邏輯代碼,就是在swoole server 的 onRequest中。包括平常的PHP全局變量 _SERVER, _COOKIE _GET _POST 等等,都在swoole server 的回調函數的參數 Request 中。那么我們接下來在onRequest回調中,自然要解析 uri,然后做路由解析進入到具體的業務邏輯。最簡單的就是直接require uri的這個php腳本,也就是第一次接觸php的script模式。路由解析,加載控制器MVC渲染這些都是框架最擅長的事情,因此在onRequest中我們引入框架,返回結果給swoole response對象。

接入Yaf

Swoole 的worker子進程是實際的工作進程,在收到客戶端request的時候,swoole把request發送給worker,調用onRequest回調處理。如果我們在onRequest中引入Yaf 創建yaf app對象,由於onRequest是一個輪詢事件回調,worker會重復創建yaf app,yaf app實際上處於相同的上下文,因此會提示已經存在yaf application對象。而且,我們並不需要在這里重復讀取我們的配置文件。我們把yaf application 放在 onWorkerStart 中,一個worker 只產生一個yaf app對象,這個yaf對象輪詢處理request uri 。
Swoole Http Server onWorkerStart & onRequest

public function onWorkerStart($serv, $work_id) {
    // var_dump(get_included_files()); // 打印worker啟動前已經加載的php文件
    cli_set_process_title('swoole_worker_'.$work_id); // 設置worker子進程名稱
    Yaf\Registry::set('swoole_serv', $serv);
    $this->app = new Yaf\Application( APPLICATION_PATH . "conf/application.ini");
    $this->app->bootstrap();
}

public function onRequest($request, $response) {
    // print_r($request->server);
    $uri = $request->server['request_uri'];
    printf("[%s]get %s\n", date('Y-m-d H:i:s'), $uri);
    if ($uri == '/favicon.ico') {
        $response->status(404);
        $response->end();
    } else {
        Yaf\Registry::set('swoole_req', $request);
        Yaf\Registry::set('swoole_res', $response);
        // yaf 會自動輸出腳本內容,因此這里使用緩存區接受交給swoole response 對象返回
        ob_start();
        $this->app->getDispatcher()->dispatch(new Yaf\Request\Http($this->rewrite($uri))); // rewrite 中可以應用自己的規則
        $data = ob_get_clean();
        $response->end(data);
    }
}

如果你用過yaf,接下來只需要寫一個標准的yaf框架應用就可以了。yaf 框架的public文件夾不再需要入口文件 index.php,nginx 中也不再需要重寫uri規則,想想為啥

Swoole WebSocket

理解了http server 之后,我們再來創建一個websocket 服務器。websocket是web開發人員相對更熟悉的服務器,瀏覽器用javascript可以寫一個現成的客戶端。swoole websocket服務器與http 服務器大同小異,只不過onRequest()方法變成了onMessage()$response->end()變成了$server->push();
websocket是有狀態的長連接,http是無狀態的。無狀態意思是說http你只需要知道request是什么,然后給他response,不管是誰,請求幾次request,都是一樣的response。而有狀態的意思是,對於每一個請求,你需要分辨它是誰。因此對於相同的請求,可能會有不同的處理。websocket的每個客戶端鏈接有唯一標識fd,有點類似於會話session id 的意思。
與onRequest()方法類似,在onMessage()方法中,我們需要對客戶端發送的數據進行路由解析,然后想客戶端返回結果。不過這里不再是http協議的url請求格式了,是我們自己組裝的協議數據包,比如一個JSON結構,包括action,controller,module等等。我們仍然可以引入yaf框架,利用他的類庫自動加載Loader和路由Dispatcher機制,來處理客戶端請求,這里不再贅述。

public function onMessage(\Swoole\Websocket\Server $serv, \Swoole\Websocket\Frame $frame) {
    $route = json_decode($frame->data);
    if ($route->module) {
        try {
            ob_start();
            $this->app->getDispatcher()->dispatch(new Yaf\Request\Simple('cli', $route->module, $route->controller, $route->action, $route->params));
            $response = ob_get_clean();
        } catch (Exception $e) {
            // handle exception
        }
        $serv->push($frame->fd, $response);
    } else {
        printf("[%s] unknow message: %s\n", date('Y-m-d H:i:s'), $frame->data);
    }
}

PHP 使用 Protobuf 消息

上面我們使用了一個 JSON 協議傳輸websocket的例子,而 Protobuf 是 與JSON 類似的一種消息協議,除此之外,大家熟知的xml也是一種消息協議。ProtoBuf 是google開源的一種通信協議,既然是google的,那么別問,學就對了。

什么是ProtoBuf

相比JSON與XML,ProtoBuf的好處體現在

  • 解析快。為什么比XML,JSON的字符串解析快呢,google大神們說快那就是快,別問。
  • 節省包體大小。它把我們的消息結構體轉為二進制流進行傳輸,到了另一端再通過相同的結構體定義解析還原。
  • 天然的消息加密。傳輸過程中是二進制,xml或者json還需要進一步加解密才能保密。與之同時帶來的缺點,就是可讀性差。你看着一堆二進制串,在消息解析出來之前完全不知道發的是啥(個人認為並不是什么缺點)。

php 處理protobuf

用php處理protobuf我們需要用到兩個東西

我們在解析protobuf二進制流之前,是需要先指定對應的消息結構體的,因此我們不能只發送一個protobuf,至少應該再附帶一個消息ID。通過這個消息ID對應的結構體,我們才能解析具體的protobuf消息。
php處理二進制數據需要用到pack()unpack()。如果像我一樣沒接觸過的同學,可以臨時補補課,學習一下字節序什么的

PHP中pack、unpack的詳細用法

假設我們有一個int32位無符號消息ID,那么每個包體的結構就是 消息ID+protobuf。發送消息之前,我們進行數據打包

public function pack($msg_id, $msg_body) {
    $proto_class = Proto::GetResponseMessageProto($msg_id); // 由消息ID獲取對應的proto結構體類名
    if (!$proto_class ) {
        $this->err = 'No msg id matched.';
        return FALSE;
    }
    try {
        $msg_obj = new $proto_class ();
        // 定義消息
        $msg_obj->mergeFromArray($msg_body);
        // 打包protobuf
        $buf_str= $msg_obj->serializeToString();
        // 拼接消息體
        $this->bufString = pack('N', $msg_id). $buf_str;;
        return TRUE;
    } catch (\Exception $e){
        $this->err = $e->getMessage();
        return FALSE;
    }
}

數據打包相對簡單些,數據解包會有一點曲折。也就是在這里我感覺PHP在處理二進制數據上有點局限,也可能是我沒有掌握更高效的方法。如果有的話,還望各位讀者不吝賜教。

public function unpack($msg) {
    $data = unpack('Nmsg_id/a*msg_body', $msg);
    $msg_id = $data['msg_id'];
    // 暫時把protobuf解析成字符串
    $buf_str = $data['msg_body'];
    $proto_class = Proto::GetRequestMessageProto($msg_id);
    if (!$proto_class) {
        $this->err = 'No msg id matched.';
        return FALSE;
        // handle error.
    }
    try {
        $msg_obj = new $proto_class();
        // 上面已經把probuf解析成了字符串,因此這里需要再轉化為二進制
        $msg_obj->mergeFromString(pack('a*', $buf_str));
        print_r($msg_obj->serializeToJsonString()); // protobuf 類的讀取接口比較少,建議去看看源碼
    } catch (\Exception $e) {
        $this->err = $e->getMessage();
        return FALSE;
        // handle invalid msg
//            throw new MessageParseException('Invalid message');
    }
    $this->msg_obj = $msg_obj->serializeToJsonString(); // 消息體
    $this->msg_id = $msg_id; // 消息ID
    return TRUE;
}

接收消息的處理

// onMessage
public function onMessage(swoole_websocket_server $serv, swoole_websocket_frame $frame) {
    $msg = new \Message\Message();
    if ($msg->unpack($frame->data)) {
        printf("[%s] receive data: %d %s\n", date('Y-m-d H:i:s'), $msg->msg_id, $msg->msg_obj);
        // dispatcher
        list($module, $controller, $action) = $this->dispatch($msg->msg_id); // 自己的消息路由,就是某一個消息ID交給哪個控制器進行處理
        try {
            ob_start();
            $this->app->getDispatcher()->dispatch(new Yaf\Request\Simple('cli', $module, $controller, $action, json_decode($msg->msg_obj, TRUE)));
            $response = ob_get_clean();
            $code = 0;
        } catch (Exception $e) {
            $response = json_encode(['err' => $e->getMessage()]);
            $code = -1;
        }
        print_r($response);
        if (!$msg->pack($msg->msg_id, $response)) {
            print_r('msg pack err:'. $msg->err);
        } else {
            $serv->push($frame->fd, $msg->bufString, WEBSOCKET_OPCODE_BINARY); // websocket 發送二進制
        }
    } else {
        printf("[%s] unpack err: %s\n", date('Y-m-d H:i:s'), $frame->data);
        print_r('msg unpack err:'. $msg->err);
    }
}

附前端javascript的示例

javascript處理相對來說還更簡單,用到的是 ArrayBuffer

var protoRoot = null;
protobuf.load('/data/game.proto', function(err, root) {
    if (err)
        throw err;
    protoRoot = root;
});
function writeBuf(msgid, buf) {
    // buf 是protobuf消息的二進制結果
    var length = buf.length;
    var buffer = new ArrayBuffer(buf.length + 4); // 消息ID占4位
    var dv = new DataView(buffer);
    dv.setUint32(0, msgid, false); // 大端字節序
    for (let i=0;i<buf.length;i++) {
        dv.setInt8(4+i, buf[i]); // 逐字節寫入buffer
    }
    console.log(buffer);
    return buffer;
}
function readBuf(buf) {
    var dv = new DataView(buf);
    var msgid = dv.getUint32(0, false);
    var buf = new Uint8Array(buf, 4); // 截取消息ID后面的字節,交給protobuf解析
    return [msgid, buf];
}
function Request_Message(msg, req, callback) {
    // 將客戶端請求的消息msg轉成protobuf 
    var RequestMessage = protoRoot.lookupType("dapianzi."+req); // 這里需要加上命名空間
    var errMsg = RequestMessage.verify(msg);
    if (errMsg)
        throw Error(errMsg);
    var message = RequestMessage.fromObject(msg);
    var buffer = RequestMessage.encode(message).finish();
    callback(buffer); // 下一步調用writeBuf 產生消息包,發送給服務器
}
function Response_Message(buf, res, callback) {
    // buf 是readBuf()中返回的二進制串,這里交給protobuf解析成消息體
    var ResponseMessage = protoRoot.lookupType("dapianzi."+res);
    var message = ResponseMessage.decode(buf);
    var object = ResponseMessage.toObject(message, {
        longs: String,
        enums: String,
        bytes: String,
    });
    callback(object); // 進行客戶端邏輯
}

后記

在websocket服務器中使用yaf還是覺得比較牽強,畢竟yaf是一個web框架,使用它僅僅是可以比較方便的使用lib自動加載,以及路由映射。因此,還是得自己想辦法寫一個簡單的框架,實現消息路由,類庫加載,事件注冊,和全局對象的容器管理。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM