在去年的時候,寫過一篇關於websocket的博文:http://www.cnblogs.com/axes/p/3586132.html ,里面主要是借助了nodejs-websocket這個插件,后來還用了socket.io做了些demo,但是,這些都是借助於別人封裝好的插件做出來的,websocket到底是怎么實現的呢自己之前真沒怎么去想過,最近在看朴靈大神的《深入淺出nodejs》時候,看到websocket那一章,看了一下websocket的數據幀的定義,就琢磨着自己用nodejs來實現一下。
客戶端的代碼就不說了,websocket的API還是很簡單的,就通過onmessage、onopen、onclose,以及send方法就可以實現了。
主要說服務端的代碼:
首先是協議的升級,這個比較簡單,就簡述一下:
當在客戶端執行new Websocket("ws://XXX.com/")的時候,客戶端就會發起請求報文進行握手申請,報文中有個很重要的key就是Sec-WebSocket-Key,服務端獲取到key,然后將這個key與字符串258EAFA5-E914-47DA-95CA-C5AB0DC85B11相連,對新的字符串通過sha1安全散列算法計算出結果后,再進行base64編碼,並且將結果放在請求頭的"Sec-WebSocket-Accept"中寫出即可完成握手。然后即可進行數據傳輸
客戶端請求頭截圖:
而服務端的響應則請看代碼:
server.on('upgrade', function (req, socket, upgradeHead) { var key = req.headers['sec-websocket-key']; key = crypto.createHash("sha1").update(key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11").digest("base64"); var headers = [ 'HTTP/1.1 101 Switching Protocols', 'Upgrade: websocket', 'Connection: Upgrade', 'Sec-WebSocket-Accept: ' + key ]; socket.setNoDelay(true); socket.write(headers.join("\r\n") + "\r\n\r\n", 'ascii'); var ws = new WebSocket(socket); webSocketCollector.push(ws); callback(ws); });
upgrade事件其實是http這個模塊的封裝,再往底層就是net模塊的實現,其實都差不多,如果直接用net模塊來實現的話,就是監聽net.createServer返回的server對象的data事件,接收到的第一份數據就是客戶端發來的升級請求報文。
上面那段代碼就完成了websocket的握手,然后就可以開始數據傳輸了。
看數據傳輸之前,先看看websocket數據幀的定義(因為覺得深入淺出nodejs里的幀定義圖最容易理解,所以就貼這張了):
上面的圖中,每一列就是一個字節,一個字節總共是8位,每一位就是一個二進制數,不同位的值會對應不同的意義。
fin:指示這個是消息的最后片段。第一個片段可能也是最后的片段。如果為1即為最后片段。
rsv1、rsv2、rsv3: 各占一個位,用於擴展協商,基本上不怎么需要理,一般都是0
opcode:占四個位,可以表示0~15的十進制,0表示為附加數據幀,1表示為文本數據幀,2表示二進制數據幀,8表示發送一個連接關閉的數據幀,9表示ping,10表示pong,ping和pong都是用於心跳檢測,當一端發送ping時,另一端必須響應pong表示自己仍處於響應狀態。
masked:占一個位,表示是否進行掩碼處理,客戶端發送給服務端時為1,服務端發送給客戶端時為0
payload length:占7位,或者7+16位、或者7+64位。如果第二個字節的后面七個位的十進制值小於或等於125,則直接用這七個位表示數據長度;如果該值為126,說明 125<數據長度<65535(16個位能描述的最大值,也就是16個1的時候),就用第三個字節及第四個字節即16個位來表示;如果該值為127,則說明數據長度已經大於65535,16個位也已經不足以描述數據長度了,就用第三到第十個字節這八個字節來描述數據長度。
masking key:當masked為1的時候才存在,用於對我們需要的數據進行解密。
payload data:我們需要的數據,如果masked為1,該數據會被加密,要通過masking key進行異或運算解密才能獲取到真實數據。
上面的幀定義中,fin位是初學的時候最容易搞混的,fin位看似簡單代表分片的開始和結束,但是有一點要注意的就是,什么情況下才會分片?我剛開始理解的以為是,當發送的數據比較大的時候會進行分片,但是實際操作發現並不是,當我在瀏覽器上發送上萬字節數據的時候,服務端收到的並不是分片數據,而是分塊數據。
什么是分塊數據?其實就是把一段符合websocket數據規范的數據分成多塊傳輸,從而服務端收到的第一塊數據里fin位是1,但是附帶的數據長度卻比palyload length要少很多,在接下來又收到多塊數據,后面收到的數據是沒有上面那些控制頭的,而是純數據。總的來說整個數據可以看成是一整塊,然后分成多塊后發給服務端。其實這個很容易理解,就是socket的分包發送而已。
那什么時候會分片?據我了解,只有當傳輸的數據長度不確定的時候,才會進行分片,比如一邊讀某個數據一邊發送給服務端。如果發送數據長度是確定的,就算數據量很大也只是會進行分塊而不會分片。
引用開濤博客里對分片的解釋:
分片的主要目的是允許當消息開始但不必緩沖該消息時發送一個未知大小的消息。如果消息不能被分片,那么端點將不得不緩沖整個消息以便在首字節發生之前統計出它的長度。對於分片,服務器或中間件可以選擇一個合適大小的緩沖,當緩沖滿時,寫一個片段到網絡。
幀定義解釋完了,就可以根據數據來進行解析了,當有data過來的時候,先獲取需要的數據信息,下面這段代碼將獲取到數據在data里的位置,以及數據長度,masking key以及opcode:
WebSocket.prototype.handleDataStat = function (data) { if (!this.stat) { var dataIndex = 2; //數據索引,因為第一個字節和第二個字節肯定不為數據,所以初始值為2 var secondByte = data[1]; //代表masked位和可能是payloadLength位的第二個字節 var hasMask = secondByte >= 128; //如果大於或等於128,說明masked位為1 secondByte -= hasMask ? 128 : 0; //如果有掩碼,需要將掩碼那一位去掉 var dataLength, maskedData; //如果為126,則后面16位長的數據為數據長度,如果為127,則后面64位長的數據為數據長度 if (secondByte == 126) { dataIndex += 2; dataLength = data.readUInt16BE(2); } else if (secondByte == 127) { dataIndex += 8; dataLength = data.readUInt32BE(2) + data.readUInt32BE(6); } else { dataLength = secondByte; } //如果有掩碼,則獲取32位的二進制masking key,同時更新index if (hasMask) { maskedData = data.slice(dataIndex, dataIndex + 4); dataIndex += 4; } //數據量最大為10kb if (dataLength > 10240) { this.send("Warning : data limit 10kb"); } else { //計算到此處時,dataIndex為數據位的起始位置,dataLength為數據長度,maskedData為二進制的解密數據 this.stat = { index: dataIndex, totalLength: dataLength, length: dataLength, maskedData: maskedData, opcode: parseInt(data[0].toString(16).split("")[1] , 16) //獲取第一個字節的opcode位 }; } } else { this.stat.index = 0; } };
代碼中均有注釋,理解起來應該不難,直接看下一步,獲取到數據信息后,就要對數據進行實際解析了:
經過上面handleDataStat方法的處理,stat中已經有了data的相關數據,先判斷opcode,如果為9說明是客戶端發起的ping心跳檢測,直接返回pong響應,如果為10則為服務端發起的心跳檢測。如果有masking key,則遍歷數據段,對每個字節都與masking key的字節進行異或運算(網上看到一個說法很形象:就是輪流發生X關系),^符號就是進行異或運算啦。如果沒有masking key則直接通過slice方法把數據截取下來。
獲取到數據后,放進datas里保存,因為有可能數據被分塊了,所以再將stat里的長度減去當前數據長度,只有當stat里的長度為0的時候,說明當前幀為最后一幀,然后通過Buffer.concat將所有數據合並,此時再判斷一下opcode,如果opcode為8,則說明客戶端發起了一個關閉請求,而我們獲取到的數據則是關閉原因。如果不為8,則這數據就是我們需要的數據。然后再將stat重置為null,datas數組置空即可。至此,我們的數據解析就完成了。
WebSocket.prototype.dataHandle = function (data) { this.handleDataStat(data); var stat; if (!(stat = this.stat)) return; //如果opcode為9,則發送pong響應,如果opcode為10則置pingtimes為0 if (stat.opcode === 9 || stat.opcode === 10) { (stat.opcode === 9) ? (this.sendPong()) : (this.pingTimes = 0); this.reset(); return; } var result; if (stat.maskedData) { result = new Buffer(data.length-stat.index); for (var i = stat.index, j = 0; i < data.length; i++, j++) { //對每個字節進行異或運算,masked是4個字節,所以%4,借此循環 result[j] = data[i] ^ stat.maskedData[j % 4]; } } else { result = data.slice(stat.index, data.length); } this.datas.push(result); stat.length -= (data.length - stat.index); //當長度為0,說明當前幀為最后幀 if (stat.length == 0) { var buf = Buffer.concat(this.datas, stat.totalLength); if (stat.opcode == 8) { this.close(buf.toString()); } else { this.emit("message", buf.toString()); } this.reset(); } };
完成了客戶端發來的數據解析,還需要一個服務端發數據至客戶端的方法,也就是按照上面所說的幀定義來組裝數據並且發送出去。下面的代碼中基本上每一行都有注釋,應該還是比較容易理解的。
//數據發送 WebSocket.prototype.send = function (message) { if(this.state !== "OPEN") return; message = String(message); var length = Buffer.byteLength(message); // 數據的起始位置,如果數據長度16位也無法描述,則用64位,即8字節,如果16位能描述則用2字節,否則用第二個字節描述 var index = 2 + (length > 65535 ? 8 : (length > 125 ? 2 : 0)); // 定義buffer,長度為描述字節長度 + message長度 var buffer = new Buffer(index + length); // 第一個字節,fin位為1,opcode為1 buffer[0] = 129; // 因為是由服務端發至客戶端,所以無需masked掩碼 if (length > 65535) { buffer[1] = 127; // 長度超過65535的則由8個字節表示,因為4個字節能表達的長度為4294967295,已經完全夠用,因此直接將前面4個字節置0 buffer.writeUInt32BE(0, 2); buffer.writeUInt32BE(length, 6); } else if (length > 125) { buffer[1] = 126; // 長度超過125的話就由2個字節表示 buffer.writeUInt16BE(length, 2); } else { buffer[1] = length; } // 寫入正文 buffer.write(message, index); this.socket.write(buffer); };
除此之外還要實現一個功能,就是心跳檢測:防止服務端長時間不與客戶端交互而導致客戶端關閉連接,所以每隔十秒都會發送一次ping進行心跳檢測
//每隔10秒進行一次心跳檢測,若連續發出三次心跳卻沒收到響應則關閉socket WebSocket.prototype.checkHeartBeat = function () { var that = this; setTimeout(function () { if (that.state !== "OPEN") return; if (that.pingTimes >= 3) { that.close("time out"); return; } //記錄心跳次數 that.pingTimes++; that.sendPing(); that.checkHeartBeat(); }, 10000); }; WebSocket.prototype.sendPing = function () { this.socket.write(new Buffer(['0x89', '0x0'])) }; WebSocket.prototype.sendPong = function () { this.socket.write(new Buffer(['0x8A', '0x0'])) };
最后,在主函數里直接調用,一收到消息就廣播。。。
var server = http.createServer(function(req , res){ router.route(req , res); }).listen(9030); websocket.update(server , function(ws){ ws.on('close' , function(reason){ console.log("socket closed:"+reason); }); ws.on('message' , function(data){ websocket.brocast(data); }); });
至此,整個websocket的實現就完成了,此demo只是大概實現了一下websocket而已,在安全之類方面肯定還是有很多問題,若是真正生產環境中還是用socket.io這類成熟的插件比較好。不過這還是很值得一學的。
附上該demo的github地址:https://github.com/whxaxes/node-test/tree/master/server/websocket
里面的socket.js是該文中引用的代碼完整版。socket_2.js則是后面我在公司內部進行了一次分享時寫的優化版。
如果覺得demo能幫到你,就給個star或者fork唄