Cocos2d-JS/Ajax用Protobuf與NodeJS/Java通信


原文地址:http://www.iclojure.com/blog/articles/2016/04/29/cocos2d-js-ajax-protobuf-nodejs-java

 

Google的Protobuf

Protobuf全稱為“Protocol Buffers”,是Google開源出來的一個序列化協議並配備多種編程語言的實現(Java、C、C++、Python等,甚至JavaScript、ActionScript都有對應的實現),其本質是按照協議規范編寫proto文件,該proto文件內容由若干個message消息體組成,而message消息體是由編程語言中常用的數據類型(int、long、String等)對應的Protobuf字段類型組合而成的,Protobuf的作用是可以幫你把定義好的message消息體按協議編碼(Encode)轉為二進制字節流(byte[]),反之亦可幫你把已編碼的byte[]字節流再解碼(Decode)還原回來。

操作步驟

舉個Java的例子,你想使用Protobuf把Java中的對象轉成byte[]的話,需要如下這幾個步驟:

  1. 根據Protobuf規范編寫proto文件:該proto文件內需要定義一個與Java對象匹配的message消息體,該message消息體中的字段類型與Java對象的成員變量字段類型一一對應上
  2. 利用Protobuf對應Java語言的protoc.exe生成工具去根據第1步定義的proto文件生成對應的Protobu編解碼Java類
  3. 使用第2步生成的Protobuf編解碼Java類對Java對象做編解碼的工作,例如編碼Java對象為byte[]或者解碼byte[]為Java對象

這里用Java代碼舉例(GitHub上的代碼在這里)來解釋說明前面介紹使用Protobuf的步驟:

  1. 編寫proto文件在其中定義message消息體,這里我們定義了一個名為StudentProto的消息體
package tutorial; option java_package = "com.whg.protobuf"; option java_outer_classname = "StudentProtoBuf"; message StudentProto { optional int64 id = 1; optional int32 age = 2; optional bool sex = 3; optional string name = 4; } 
  1. 執行protoc.exe來生成對應的Protobu編解碼Java類,這里寫個名為exec-protoc.bat的腳本來執行protoc.exe

protoc -I=../proto --java_out=../proto ../proto/*.proto

pause

其中-I代表Input輸入proto文件的目錄,而--java_out代表輸出Java類的目錄,最后的參數是一個通配符匹配輸入proto文件的目錄下的所有以.proto的文件,即達到批量生成Protobuf Java類的效果

  1. 然后我們編寫與message消息體對應的Java對象,例如這里的Student類對應的就是StudentProto消息體,注意字段名和類型一一對應上了,其實Java字段名可以不必與消息體名稱一樣,但這么寫也算是一種約定吧,一目了然嘛
public class Student { private long id; private int age; private boolean sex; private String name; public Student() { } public Student(StudentProto proto){ id = proto.getId(); age = proto.getAge(); sex = proto.getSex(); if(proto.hasName()){ name = proto.getName(); } } public byte[] toByteArray(){ StudentProto.Builder builder = StudentProto.newBuilder(); builder.setId(id); builder.setAge(age); builder.setSex(sex); if(name != null){ builder.setName(name); } return builder.build().toByteArray(); } public static Student parse(byte[] bytes){ StudentProto proto = null; try { proto = StudentProto.parseFrom(bytes); } catch (InvalidProtocolBufferException ex) { throw new IllegalArgumentException(ex); } return new Student(proto); } //省略setter/getter方法 } 

然后具體使用就像下面這樣在byte[]和Java對象之間互相編解碼轉換了

public static void main(String[] args) { Student student = new Student(); student.setId(300); student.setAge(30); byte[] bytes = student.toByteArray(); Parser.printHex(bytes); Parser.printInt(bytes); Parser.printBinary(bytes); Student student2 = Student.parse(bytes); System.out.println(student2.getId()); System.out.println(student2.getAge()); } 

有了Protobuf這個序列化byte[]編解碼的利器,相較於文本協議的Xml和Json來說的話,相當於做了很大的壓縮!所以無論是在需要序列化存儲的場景,還是在網絡序列化傳輸場景,Protobuf都不失為一個好抉擇!

序列化傳輸

例如在網絡傳輸的場景下,我們可以用Protobuf在發送端把Java對象編碼為byte[]后發送出去,然后在接收端再用Protobuf解碼(Decode)byte[]來還原成對應的Java對象供程序使用,這樣就可以在網絡傳輸方面極大的縮減網絡流量了。

序列化存儲

而在存儲的場景下,無論是基於內存/磁盤/RMDB的存儲,都能節約成本:

  1. 內存——可以在使用Protobuf編碼為byte[]后存儲進memcached/redis中
  2. 磁盤——同樣可以用Protobuf編碼為byte[]后存儲成文件,需要存儲多個byte[]則需要加入分隔符用於區分單個,類似網絡傳輸中的成幀/解析
  3. RMDB——形如MySql的RMDB,也都有支持byte[]二進制存儲的Blob字段

總結與思考

總結一下,因為Protobuf已經message轉換為二進制字節流byte[]了,而計算機對二進制字節流的操作最在行了,所以除了壓縮節約成本外,其可用性也接近計算機底層處理的本質了:因為無論是什么東西在計算機內的表示都是字節(即8個二進制位)!字符串String可以轉為byte[]、圖片可以轉為byte[]、任何東西想在計算機內表示都必須是byte[]!

前端使用Protobuf發送/接收二進制數據

前面提到過Protobuf網絡傳輸的場景,這里我們就來看看:前端(Cocos2d-JS/Ajax)如何使用Protobuf與后端(NodeJS/Java)通信?由於Protobuf能把proto文件定義的消息體轉換為二進制字節流(byte[]),所以問題就變成:前端(Cocos2d-JS/Ajax)如何使用二進制與后端(NodeJS/Java)通信?

網絡通信一般分為2類:短連接和長連接。短連接一般說的是基於HTTP協議的請求/響應的連接;而長連接則是基於TCP/IP協議的3次握手不隨意中斷的連接;當然其實HTTP協議是基於TCP/IP協議的,只是請求/響應這種模式令其相較TCP/IP來說更“隨意”中斷了一點,但中斷的后果是太浪費底層TCP/IP連接了,所以之后的HTTP1.1以及2.0為了減少浪費提出了Keep-Alive及多路復用等改進,甚至演化出了Html5的WebSocket協議這種基於HTTP協議升級版的全雙通長連接,發展趨勢軌跡:TCP/IP長連接 --> HTTP短連接 --> WebSocket長連接;這也令我想起后端服務器處理請求IO模型的進化軌跡:單線程 --> 多線程 --> 事件驅動單線程

下面根據這2個分類連接說說基於JavaScript的前端(Cocos2d-JS/Ajax)如何用Protobuf與后端(NodeJS/Java)通信

短連接——HTTP

無論是Cocos2d-JS還是Ajax,其進行HTTP通信都是基於JavaScript的XMLHttpRequest對象!所以只要搞清楚XMLHttpRequest對象如何與后端通信發送/接收二進制即可。使用如下幾步來操作XMLHttpRequest發送Protobuf二進制數據:

1. 獲取XMLHttpRequest

Cocos2d-JS里就有XMLHttpRequest對象的支持,直接使用cc.loader.getXMLHttpRequest()即可獲取到;而Ajax里面的XMLHttpRequest對象由於瀏覽器支持不同,可以使用如下代碼獲取

function createXMLHttpRequest(){ if(window.ActiveXObject){ //IE only return new ActiveXObject("Microsoft.XMLHTTP"); }else if(window.XMLHttpRequest){ //others return new XMLHttpRequest(); } } 

2. 設置XMLHttpRequest頭部支持Protobuf協議

無論是Cocos2d-JS還是Ajax,其XMLHttpRequest對象本質都一樣,所以如下open(打開)url以及setRequestHeader(設置請求頭部)等代碼都是通用的

var xhr = cc.loader.getXMLHttpRequest(); // or use createXMLHttpRequest() in Ajax xhr.open("POST", "http://localhost:3000/protobuf"); xhr.setRequestHeader("Content-Type","application/x-protobuf"); xhr.setRequestHeader("Accept","application/x-protobuf"); if (xhr.overrideMimeType){ //這個是必須的,否則返回的是字符串,導致protobuf解碼錯誤 //具體見http://www.ruanyifeng.com/blog/2012/09/xmlhttprequest_level_2.html xhr.overrideMimeType("text/plain; charset=x-user-defined"); } 

3. 前端使用protobuf.js來編解碼Protobuf

protobuf.js是GitHub上使用JavaScript實現Protobuf Buffer協議編解碼的項目,這里我們使用它來作為前端JavaScript編解碼Protobuf的利器

3.1 引入protobuf.js

這里引入的protobuf.js版本為5.0.1,其中主要使用到了long.js、bytebuffer.js和protobuf.js這3個JS文件,如果使用NodeJS的話,直接在package.json添加dependencies依賴配置

"protobufjs": "~5.0.1"

然后使用

npm install

即可完成對該依賴的下載,在node_modules文件夾下找到那3個JS文件拷貝到前端JS文件夾,然后在前端的index.html中引入protobuf.js

<script src="../protobuf/long.js"></script> <script src="../protobuf/bytebuffer.js"></script> <script src="../protobuf/protobuf.js"></script> <script> if (typeof dcodeIO === "undefined" || !dcodeIO.ProtoBuf) { throw(new Error("ProtoBuf.js is not present. Please see www/index.html for manual setup instructions.")); } </script> 

3.2 使用protobuf.js

引入protobuf.js后就可以在JS代碼中使用protobuf.js了,我們這里用於測試的TestProtobuf.proto文件如下

package TestProtobuf; option java_package = "com.why.game.protobuf"; option java_outer_classname = "TestProtobuf"; message TestProto{ optional int32 id = 1; optional string name = 2; } 

然后在JS中加載該TestProtobuf.proto文件,並把該proto文件中定義的TestProto消息體賦值為JS局部變量TestProto

var ProtoBuf = dcodeIO.ProtoBuf, TestProtobuf = ProtoBuf.loadProtoFile("../protobuf/TestProtobuf.proto").build("TestProtobuf"), TestProto = TestProtobuf.TestProto; 

如此一來我們就可以用TestProtobuf.proto文件中定義的消息體來發送/接收二進制了:發送的時候使用XMLHttpRequest對象的send方法發送經由TestProto編碼(encode)后的buffer數組(本質也是二進制字節流),接收的時候同樣使用TestProto解碼(decode)接收到的二進制數據

xhr.onreadystatechange = function(){ if (xhr.readyState == 4 && xhr.status == 200) { var data = xhr.responseText; var protobufResp = TestProto.decode(str2bytes(data)); var jsonResp = JSON.stringify(protobufResp); console.log(jsonResp); } }; var testProto = new TestProto({ id:10014, name:"testProtoName測試987" }); xhr.send(testProto.toBuffer()); 

這里因為瀏覽器會把Ajax返回的二進制數據當做文本數據,所以寫個str2bytes方法把接收到的文本數據按字節一個個做與運算來還原成二進制byte

function str2bytes(str){ var bytes = []; for (var i = 0, len = str.length; i < len; ++i) { var c = str.charCodeAt(i); var byte = c & 0xff; bytes.push(byte); } return bytes; } 

長連接——SocketIO/WebSocket

可以說整個互聯網的普及依靠的是瀏覽器和HTTP協議這一最佳拍檔的完美組合,老早前所說的上網沖浪就是打開瀏覽器,輸入網頁地址,然后等待瀏覽器渲染顯示網頁后閱覽;但HTTP協議的一個短板就是不能即時刷新,即需要自己手動刷新頁面,這也就是為什么貼吧/論壇有“F5已爛”這一說法,因為最新的信息不會自動呈現出來。

雖然到了Web2.0時代由於Ajax的應用這一短板的用戶體驗有了大幅度的改善,但Ajax的本質依舊還是基於HTTP協議的短連接只不過是瀏覽器異步加載完成的響應信息而已;甚至還有使用“輪詢”機制模仿長連接即時性的做法(即定時的用Ajax“拉取”服務器的信息來更新頁面),但由於HTTP短連接本質就不是一個真實的雙通道全開的“穩定”的連接,所以其即時性方面無論如何蹩腳的去模擬總會有或多或少的不爽(例如實現起來費勁麻煩等)。

於是乎Html5的到來順便攜帶了WebSocket:這一在HTTP協議基礎上做出“升級”的“穩定”的長連接協議,其本質上是完全雙通道全開,即服務器和客戶端之間的通道隨時可以進行互相推送消息。而SocketIO協議則是考慮到不是所有的瀏覽器都支持WebSocket,於是做了層WebSocket的封裝,對於不支持WebSocket的瀏覽器其內部可能使用的是Ajax模擬的長連接。

因為SocketIO封裝了WebSocket,所以其API接口和WebSocket大同小異。下面分別介紹使用SocketIO/WebSocket來整合Protobuf發送/接收二進制數據的步驟

引入SocketIO客戶端socket.io-client

socket.io-client是GitHub上使用JavaScript實現SocketIO協議的客戶端,這里引入的socket.io-client版本為1.4.5,其中主要使用到了socket.io.js這個JS文件,如果使用NodeJS的話,直接在package.json添加dependencies依賴配置

"socket.io" : "~1.4.5"

然后使用

npm install

即可完成對該依賴的下載,在node_modules文件夾下找到那個JS文件拷貝到前端JS文件夾,然后在前端的index.html中引入socket.io.js

<script type="text/javascript" src="static/js/lib/socket.io/socket.io.js"></script> 

使用SocketIO客戶端

然后我們在JS代碼中結合protobuf.js來使用socket.io.js來發送/接收二進制消息,這里的測試example.proto文件如下

message Message { required string text = 1; } 

接着使用protobuf.js加載上面的example.proto文件,注意同前面的TestProtobuf.proto對比區別下有無package包聲明其protobuf.js加載和構造消息體的不同之處

var ProtoBuf = dcodeIO.ProtoBuf; var Message = ProtoBuf.loadProtoFile("./example.proto").build("Message"); // Connect to our SocketIO server: node server.js var socket = io.connect("http://localhost:3000"); socket.on("connect", function () { log.value += "Connected\n"; }); socket.on("disconnect", function () { log.value += "Disconnected\n"; }); socket.on("message", function (message) { try{ var msg = Message.decode(message); log.value += "Received: " + msg.text + "\n"; }catch(err){ log.value += "Error: " + err + "\n"; } }); function send() { if (socket.connected) { var msg = new Message(text.value); socket.send(msg.toBuffer()); log.value += "Sent: " + msg.text + "\n"; } else { log.value += "Not connected\n"; } } 

使用WebSocket

下面使用Html5 WebSocket API重寫上面SocketIO發送/接收Protobuf二進制的例子,可以看到其實是大同小異的,除了協議不是HTTP而是WebSocket,其API基本類似

// Connect to our server: node server.js var socket = new WebSocket("ws://localhost:8080/ws"); socket.binaryType = "arraybuffer"; // We are talking binary socket.onopen = function() { log.value += "Connected\n"; }; socket.onclose = function() { log.value += "Disconnected\n"; }; socket.onmessage = function(evt) { try { var msg = Message.decode(evt.data); log.value += "Received: "+msg.text+"\n"; } catch (err) { log.value += "Error: "+err+"\n"; } }; function send() { if (socket.readyState == WebSocket.OPEN) { var msg = new Message(text.value); socket.send(msg.toBuffer()); log.value += "Sent: "+msg.text+"\n"; } else { log.value += "Not connected\n"; } } 

后端使用Protobuf發送/接收二進制數據

這里的后端使用NodeJS和Java實現Protobuf二進制數據的發送/接收,且同樣看看區分短連接和長連接的實現

短連接——HTTP

在原生的NodeJS中可以自己編寫代碼開啟一個簡單的HTTP服務器,並自定義實現對HTTP請求的處理,當然你也可以使用一些現成的Web MVC框架例如Express來簡化開發;而在Java中常見的還是使用Tomcat/JBoss這類已經久經沙場的Web容器比較方便,再配合上SpringMVC/Struts2等Web MVC框架的使用話,可以讓Java Web開發人員把精力集中在業務邏輯處理方面;

NodeJS

不得不說基於JavaScript語言的后端開發平台NodeJS確實很強大,它把瀏覽器Ajax這種事件驅動的異步編程模型的寫法從前端照搬到了后端,其核心庫完美的實現了很多底層模塊並提供友好的對外API,令你啟動一個HTTP服務器也就只需要寫幾行代碼的事情,除此之外引入的模塊化機制完美的避開了JS中常見的“命名污染”,還有類似Java中的Maven一樣的依賴包管理工具——NPM,簡直讓你覺得真的是“處處都運行着JavaScript”,Java處處運行的夢想好像要被JavaScript替代了似的

NodeJS使用protobuf.js處理Protobuf

由於NodeJS基於JavaScript語言,所以我們還是和前端的JavaScript代碼一樣使用protobuf.js來處理Protobuf,且使用了前面提到的TestProtobuf.proto

var ProtoBuf = require("protobufjs"); var TestProtobuf = ProtoBuf.loadProtoFile(protobufDir+"TestProtobuf.proto").build("TestProtobuf"), TestProto = TestProtobuf.TestProto; 

NodeJS啟動HTTP服務並接收/發送二進制數據

在NodeJS中真的是就幾句代碼就啟動HTTP服務器了

var http = require("http"); var server = http.createServer(function(request, response){ //處理request和返回response響應 }); server.listen(3000); 

但這是只一個啥事都沒干的HTTP服務器,真正的HTTP服務器至少能提供靜態文件瀏覽服務,在NodeJS上這也需要我們自己去實現,寫個serveStatic方法:其原理是根據請求路徑去讀取磁盤上的文件,如果存在的話讀取成功后返回給前端,不存在就報404錯誤,為了避免每次都從磁盤讀取我們還可以加入緩存

除了處理靜態文件外,我們的重點還是放在NodeJS使用Protobuf發送/接收二進制數據:當我們識別一個來自客戶端的請求參數是二進制數據時(這里是請求方法是POST且包含protobuf關鍵字),我們需要先收集完全部的二進制數據后方可解析,由於網絡的傳輸可能不是一次到位全部傳輸過來,而是一段段(chunk)的過來,所以就有個收集的過程,這里使用了bufferhelper庫簡化收集網絡二進制數據的過程,具體代碼如下

var server = http.createServer(function(request, response){ var filePath = false; if(request.url == "/"){ filePath = "index.html"; }else if(request.method === "POST"){ if(request.url.indexOf("protobuf") != -1){ //BufferHelper參考鏈接 http://www.infoq.com/cn/articles/nodejs-about-buffer/ var bufferHelper = new BufferHelper(); request.on("data", function (chunk) { bufferHelper.concat(chunk); }); request.on("end", function () { var buffer = bufferHelper.toBuffer(); var testProtoData = TestProto.decode(buffer); response.writeHead(200, {"Content-Type": "application/x-protobuf"}); response.end(testProtoData.toBuffer()); }); } return; }else{ filePath = request.url; } var absPath = webRoot+filePath; serveStatic(response, cache, absPath); }); 

可見在收集完二進制數據后的end回調方法中使用了TestProto來解碼二進制,然后再原封不動的轉換為Buffer后通過response的end方法作為響應返回給HTTP客戶端

Java SpringMVC與Protobuf

Java SpringMVC從4.1.6開始使支持Protobuf協議的自動編解碼,所以需要確保pom.xml文件中的Spring核心包以及SpringMVC包的版本都是4.1.6+,當然也需要確保依賴了Protobuf的Java包

<!-- springframework 4.0.7 RELEASE --> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>4.1.7.RELEASE</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-tx</artifactId> <version>4.1.7.RELEASE</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jdbc</artifactId> <version>4.1.7.RELEASE</version> </dependency> <!-- spring mvc --> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-webmvc</artifactId> <version>4.1.7.RELEASE</version> </dependency> <dependency> <groupId>commons-fileupload</groupId> <artifactId>commons-fileupload</artifactId> <version>1.3.1</version> </dependency> <!-- protobuf --> <dependency> <groupId>com.google.code</groupId> <artifactId>protobuf-java</artifactId> <version>2.4.0a</version> </dependency> <dependency> <groupId>com.googlecode.protobuf-java-format</groupId> <artifactId>protobuf-java-format</artifactId> <version>1.2</version> </dependency> 

然后web.xml配置了SpringMVC及其mvc.xml文件位置以及匹配后綴名

<!-- 引入上下文配置文件 --> <context-param> <param-name>contextConfigLocation</param-name> <param-value>classpath:applicationContext.xml</param-value> </context-param> <listener> <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class> </listener> <servlet> <servlet-name>spring-web</servlet-name> <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class> <init-param> <param-name>contextConfigLocation</param-name> <param-value>classpath:spring/mvc.xml</param-value> </init-param> <load-on-startup>1</load-on-startup> </servlet> <servlet-mapping> <servlet-name>spring-web</servlet-name> <url-pattern>*.why</url-pattern> </servlet-mapping> 

關鍵的部分在mvc.xml配置中,這里使用mvc:annotation-driven的配置寫法配置了消息轉換器為ProtobufHttpMessageConverter令SpringMVC自動支持Protobuf的編解碼

<!-- 配置只掃描web下面類文件,即controller和interceptors,只關注mvc的配置,整個應用的配置在applicationContext.xml --> <context:component-scan base-package="com.why.game.web.*" /> <mvc:interceptors> <bean class="com.why.game.web.interceptor.ControllerInterceptor" /> </mvc:interceptors> <mvc:annotation-driven> <mvc:message-converters> <bean class="org.springframework.http.converter.protobuf.ProtobufHttpMessageConverter"/> </mvc:message-converters> </mvc:annotation-driven> 

最后在SpringMVC的Controller中可使用RequestEntity直接操作傳遞過來的TestProto,並使用ResponseEntity把TestProto作為響應返回去給HTTP客戶端

@Controller @RequestMapping("/") public class TestController { @RequestMapping(value="/protobuf") @ResponseBody public ResponseEntity<TestProto> protobuf(RequestEntity<TestProto> requestProto){ TestProto testProto = requestProto.getBody(); String s = new String(testProto.toByteArray()); System.out.println(s); System.out.println(testProto); HttpServiceCaller.printProtoStr(s); return ResponseEntity.ok(testProto); } } 

長連接——SocketIO/WebSocket

這里以SocketIO為例來看看其在NodeJS和Java中的使用,其實WebSocket的使用方法也是大同小異,僅僅是API略微差別,但思想步驟是一樣適用的

NodeJS中的SocketIO

與前面介紹的SocketIO客戶端socket.io-client相對應的NodeJS服務端是socket.io,我們需要在package.json添加dependencies依賴配置

"socket.io" : "~1.4.5"

並使用npm install下載安裝后,就可以直接在NodeJS中使用socket.io庫來構建了SocketIO服務器了,下面實現一個簡單的業務邏輯:把接收到的數據轉換成大寫后再發送回去給客戶端

var ProtoBuf = require("protobufjs"); var socketio = require("socket.io"); // Initialize from .proto file var builder = ProtoBuf.loadProtoFile(path.join(__dirname, "www", "example.proto")), Message = builder.build("Message"); // SocketIO adapter var io = socketio.listen(server); io.set("log level", 1); io.sockets.on("connection", function(socket){ console.log(socket.id+" connecting..."); socket.on("disconnect", function() { console.log("WebSocket disconnected"); }); socket.on("message", function(data) { try { // Decode the Message var msg = Message.decode(data); console.log("Received: "+msg.text); // Transform the text to upper case msg.text = msg.text.toUpperCase(); // Re-encode it and send it back socket.send(msg.toBuffer()); //socket.emit('message', msg.toBuffer()); console.log("Sent: "+msg.text); } catch (err) { console.log("Processing failed:", err); } }); }); 

注意上面的代碼先使用require引入了Protobuf和SocketIO模塊,然后初始化Protobuf的消息體並讓SocketIO啟動監聽,這里SocketIO監聽的server其實就是NodeJS創建的HTTP服務器,因為在NodeJS里面HTTP服務器和SocketIO服務器共用同一個端口;接下來就是Protobuf對接收到的二進制數據進行解碼打印,然后把字母轉換為大寫后再編碼發送出去

NodeJS中的WebSocket

在NodeJS中使用WebSocket最簡便的方式是使用GitHub上名為ws的項目,其號稱可能是NodeJS里面速度最快的WebSocket庫,我們可以在package.json添加dependencies依賴配置

"ws": "~0.4"

並使用npm install下載安裝后,就可以在NodeJS里使用ws庫來構建WebSocket服務器了,實現與上面SokcetIO服務器相同邏輯的代碼如下,可見WebSocket與SocketIO的API是大同小異的

// WebSocket adapter var wss = new ws.Server({server: server}); wss.on("connection", function(socket) { console.log("New WebSocket connection"); socket.on("close", function() { console.log("WebSocket disconnected"); }); socket.on("message", function(data, flags) { if (flags.binary) { try { // Decode the Message var msg = Message.decode(data); console.log("Received: "+msg.text); // Transform the text to upper case msg.text = msg.text.toUpperCase(); // Re-encode it and send it back socket.send(msg.toBuffer()); console.log("Sent: "+msg.text); } catch (err) { console.log("Processing failed:", err); } } else { console.log("Not binary data"); } }); }); 

Java中的SocketIO

在Java中我們使用了GitHub上一個名為netty-socketio的項目,由名字可看出其是在Netty框架基礎上實現的SocketIO協議,並提供了事件驅動注冊監聽器的寫法,當你從NodeJS轉換代碼過來時會發現其寫法大同小異:即NodeJS使用on方法來注冊監聽事件,netty-socketio中使用addEventListener方法來實現;NodeJS使用emit觸發事件,而netty-socketio中使用sendEvent來觸發事件等

首先在pom.xml中加入netty-socketio的依賴以及Protobuf的依賴:

<dependency> <groupId>com.corundumstudio.socketio</groupId> <artifactId>netty-socketio</artifactId> <version>1.7.11-SNAPSHOT</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> <version>1.7.7</version> </dependency> <!-- protobuf --> <dependency> <groupId>com.google.code</groupId> <artifactId>protobuf-java</artifactId> <version>2.4.0a</version> </dependency> <dependency> <groupId>com.googlecode.protobuf-java-format</groupId> <artifactId>protobuf-java-format</artifactId> <version>1.2</version> </dependency> 

接下來就是寫一個實現了ConnectListener和DisconnectListener這2個分別代表連接監聽與斷開監聽的接口的SocketIO服務器類,然后該類內部再使用addEventListener來監聽感興趣的事件,對應上面NodeJS SocketIO服務器邏輯的Java SocketIO服務器類如下

public class SocketIOProtoServer implements ConnectListener, DisconnectListener{ private static final String HOST = "localhost"; private static final int PORT = 3001; private final SocketIOServer server; public SocketIOProtoServer(){ server = new SocketIOServer(config()); } private Configuration config(){ Configuration config = new Configuration(); config.setHostname(HOST); config.setPort(PORT); config.setMaxFramePayloadLength(1024 * 1024); config.setMaxHttpContentLength(1024 * 1024); return config; } public void start(){ server.addConnectListener(this); server.addDisconnectListener(this); server.addEventListener("message", byte[].class, new DataListener<byte[]>() { @Override public void onData(SocketIOClient client, byte[] data, AckRequest ackRequest) { Message message = Message.parse(data); System.out.println("Received: "+message.getText()); // Transform the text to upper case message.setText(message.getText().toUpperCase()); // Re-encode it and send it back client.sendEvent("message", message.toByteArray()); System.out.println("Sent: "+message.getText()); } }); server.start(); System.out.println("\n------ "+this.getClass().getSimpleName()+"start on "+PORT+" ------\n"); } public void stop(){ server.stop(); } @Override public void onConnect(SocketIOClient client) { System.out.println(client.getSessionId()+" connecting..."); } @Override public void onDisconnect(SocketIOClient client) { System.out.println(client.getSessionId()+" disconnecting..."); } public static void main(String[] args){ new SocketIOProtoServer().start(); } } 

Netty-SocketIO除了基於接口實現(例如上面的ConnectListener和DisconnectListener與DataListener這3個接口)完成監聽外,還提供了基於注解的監聽機制(對應上面接口實現的3個注解分別是@OnConnect和@OnDisconnect與@OnEvent),如下基於注解的代碼和上面基於接口實現效果是一樣的,注意添加監聽器部分使用server.addListeners(annotationInstance)即可

public class SocketIOProtoServer{ private static final String HOST = "localhost"; private static final int PORT = 3001; private final SocketIOServer server; public SocketIOProtoServer(){ server = new SocketIOServer(config()); } private Configuration config(){ Configuration config = new Configuration(); config.setHostname(HOST); config.setPort(PORT); config.setMaxFramePayloadLength(1024 * 1024); config.setMaxHttpContentLength(1024 * 1024); return config; } public void start(){ server.addListeners(this); server.start(); System.out.println("\n------ "+this.getClass().getSimpleName()+"start on "+PORT+" ------\n"); } public void stop(){ server.stop(); } @OnConnect public void onConnect(SocketIOClient client) { System.out.println(client.getSessionId()+" connecting..."); } @OnDisconnect public void onDisconnect(SocketIOClient client) { System.out.println(client.getSessionId()+" disconnecting..."); } @OnEvent("message") public void onData(SocketIOClient client, byte[] data, AckRequest ackRequest) { Message message = Message.parse(data); System.out.println("Received: "+message.getText()); // Transform the text to upper case message.setText(message.getText().toUpperCase()); // Re-encode it and send it back client.sendEvent("message", message.toByteArray()); System.out.println("Sent: "+message.getText()); } public static void main(String[] args){ new SocketIOProtoServer().start(); } } 

可見其接收/發送Protobuf二進制的代碼與NodeJS相比其實是相當類似的,然后封裝的Message.java如下

public class Message { private String text; public Message() { } public Message(Example.Message proto){ text = proto.getText(); } public byte[] toByteArray(){ Example.Message.Builder builder = Example.Message.newBuilder(); builder.setText(text); return builder.build().toByteArray(); } public static Message parse(byte[] bytes){ Example.Message proto = null; try { proto = Example.Message.parseFrom(bytes); } catch (InvalidProtocolBufferException ex) { throw new IllegalArgumentException(ex); } return new Message(proto); } public String getText() { return text; } public void setText(String text) { this.text = text; } } 

Java中的WebSocket庫

此處我們基於Netty4下的WebSocket實現包來看看上面SocketIO服務類在Netty中長什么樣,其中大部分代碼均源自Netty自帶的example包里的;首先看看Netty中的WebSocketServer

public final class WebSocketServer { static final boolean SSL = System.getProperty("ssl") != null; static final int PORT = Integer.parseInt(System.getProperty("port", SSL? "8443" : "3000")); public static void main(String[] args) throws Exception { // Configure SSL. final SslContext sslCtx; if (SSL) { SelfSignedCertificate ssc = new SelfSignedCertificate(); sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build(); } else { sslCtx = null; } EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new WebSocketServerInitializer(sslCtx)); Channel ch = b.bind(PORT).sync().channel(); System.out.println("Open your web browser and navigate to " + (SSL? "https" : "http") + "://127.0.0.1:" + PORT + '/'); ch.closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } } 

如下是WebSocketServerInitializer,里面設置了各種HTTP解碼器,然后還有由HTTP協議升級到WebSocket協議的處理器類WebSocketServerProtocolHandler,最后是WebSocket幀處理器類WebSocketFrameHandler

public class WebSocketServerInitializer extends ChannelInitializer<SocketChannel> { private static final String WEBSOCKET_PATH = "/ws"; private final SslContext sslCtx; public WebSocketServerInitializer(SslContext sslCtx) { this.sslCtx = sslCtx; } @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); if (sslCtx != null) { pipeline.addLast(sslCtx.newHandler(ch.alloc())); } pipeline.addLast(new HttpServerCodec()); pipeline.addLast(new HttpObjectAggregator(65536)); pipeline.addLast(new WebSocketServerProtocolHandler(WEBSOCKET_PATH, null, true)); pipeline.addLast(new WebSocketFrameHandler()); } } 

最后看看實現與之前的服務器業務邏輯(把接收到的數據轉換成大寫后再發送回去給客戶端相同的WebSocketFrameHandler類的寫法:

public class WebSocketFrameHandler extends SimpleChannelInboundHandler<WebSocketFrame> { private static final Logger logger = LoggerFactory.getLogger(WebSocketFrameHandler.class); @Override protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception { // ping and pong frames already handled if (frame instanceof TextWebSocketFrame) { // Send the uppercase string back. String request = ((TextWebSocketFrame) frame).text(); logger.info("{} received {}", ctx.channel(), request); ctx.channel().writeAndFlush(new TextWebSocketFrame(request.toUpperCase(Locale.US))); } else if(frame instanceof BinaryWebSocketFrame){ ByteBuf byteBuf = ((BinaryWebSocketFrame) frame).content(); byte[] data = new byte[byteBuf.capacity()]; byteBuf.readBytes(data); Message message = Message.parse(data); System.out.println("Received: "+message.getText()); // Transform the text to upper case message.setText(message.getText().toUpperCase()); // Re-encode it and send it back byte[] bytes = message.toByteArray(); ByteBuf payload = ctx.alloc().buffer(bytes.length); payload.writeBytes(bytes); ctx.channel().writeAndFlush(new BinaryWebSocketFrame(payload)); System.out.println("Sent: "+message.getText()); } else { String message = "unsupported frame type: " + frame.getClass().getName(); throw new UnsupportedOperationException(message); } } } 

可見在使用Message類編解碼的使用方式是一樣的,只是Netty中接收/發送二進制數據需要基於ByteBuf類去轉換為byte[]給Message編解碼;而在SocketIO中是以泛型編程的方式直接聲明接收二進制數據byte[];

這也導致了在Netty里面可以寫一個統一處理WebSocket的Handler,在處理WebSocket幀時可以判定是字符幀(TextWebSocketFrame還是字節幀(BinaryWebSocketFrame,然后分別做處理;但是在SocketIO里面在添加監聽器addEventListener時就決定了處理類型到底是byte[]還是String,不能是一個泛泛的Object對象然后區分處理,除非自己自定義一個泛泛的SocketIOFrame類然后根據什么內部bit位去判斷到底是轉換為byte[]還是String后才分別處理,這就需要看看netty-socketio的源碼實現去了解了。

源碼

參考

(完)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM