WebSocket
傳統的瀏覽器和服務器之間的交互模式是基於請求/響應的模式,雖然可以使用js發送定時任務讓瀏覽器在服務器中拉取但是弊端很明顯,首先就是不能避免的延遲,其次就是頻繁的請求,讓服務器的壓力驟然提升
WebSocket是H5新增的協議,用於構建瀏覽器和服務器之間的不受限的長連接的通信模式,不再局限於請求/響應式的模型,服務端可以主動推送消息給客戶端,(游戲有某個玩家得獎了的彈幕)基於這個特性我們可以構建我們的實時的通信程序
協議詳情:
websocket建立連接時,是通過瀏覽器發送的HTTP請求,報文如下:
GET ws://localhost:3000/ws/chat HTTP/1.1
Host: localhost
Upgrade: websocket
Connection: Upgrade
Origin: http://localhost:3000
Sec-WebSocket-Key: client-random-string
Sec-WebSocket-Version: 13
- 首先GET請求是以
ws
開頭的 - 其中請求頭中的
Upgrade: websocket Connection: Upgrade
表示嘗試建立WebSocket連接
對於服務端的相應數據
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: server-random-string
其中的101
,表示服務端支持WebSocket協議, 雙方基於Http請求,成功建立起WebSocket連接,雙方之間的通信也不再通過HTTP
JS對WebSocket的封裝對象
對於JS的WebSocket對象來說,它常用 4個回調方法,以及兩個主動方法
方法名 | 作用 |
---|---|
onopen() | 和服務端成功建立連接后回調 |
onmessage(e) | 收到服務端的的消息后回調,e為消息對象 |
onerror() | 鏈接出現異常回調,如服務端關閉 |
onclose() | 客戶端單方面斷開連接時回調 |
send(e) | 主動向服務端推送消息 |
close() | 主動關閉通道 |
再次對WebSocket進行封裝
知道了回調函數回調時機,我們接下來要做的就是在他的整個生命周期的不同回調函數中,添加我們指定的動作就ok了,下面是通過Window定義一個全局的聊天對象CHAT
window.CHAT={
var socket = null;
// 初始化socket
init:function(){
// 判斷當前的瀏覽器是否支持WebSocket
if(window.WebSocket){
// 檢驗當前的webSocket是否存在,以及連接的狀態,如已經連接,直接返回
if(CHAT.socket!=null&&CHAT.socket!=undefined&&CHAT.socket.readyState==WebSocket.OPEN){
return false;
}else{// 實例化 , 第二個ws是我們可以自定義的, 根據后端的路由來
CHAT.socket=new WebSocket("ws://192.168.43.10:9999/ws");
// 初始化WebSocket原生的方法
CHAT.socket.onopen=CHAT.myopen();
CHAT.socket.onmessage=CHAT.mymessage();
CHAT.socket.onerror=CHAT.myerror();
CHAT.socket.onclose=CHAT.myclose();
}
}else{
alert("當前設備不支持WebSocket");
}
}
// 發送聊天消息
chat:function(msg){
// 如果的當前的WebSocket是連接的狀態,直接發送 否則從新連接
if(CHAT.socket.readyState==WebSocket.OPEN&&CHAT.socket!=null&&CHAT.socket!=undefined){
socket.send(msg);
}else{
// 重新連接
CHAT.init();
// 延遲一會,從新發送
setTimeout(1000);
CHAT.send(msg);
}
}
// 當連接建立完成后對調
myopen:function(){
// 拉取連接建立之前的未簽收的消息記錄
// 發送心跳包
}
mymessage:function(msg){
// 因為服務端可以主動的推送消息,我們提前定義和后端統一msg的類型, 如,拉取好友信息的消息,或 聊天的消息
if(msg==聊天內容){
// 發送請求簽收消息,改變請求的狀態
// 將消息緩存到本地
// 將msg 轉換成消息對象, 植入html進行渲染
}else if(msg==拉取好友列表){
// 發送請求更新好友列表
}
}
myerror:function(){
console.log("連接出現異常...");
}
myclose:function(){
console.log("連接關閉...");
}
keepalive: function() {
// 構建對象
var dataContent = new app.DataContent(app.KEEPALIVE, null, null);
// 發送心跳
CHAT.chat(JSON.stringify(dataContent));
// 定時執行函數, 其他操作
// 拉取未讀消息
// 拉取好友信息
}
}
對消息類型的約定
WebSocket對象通過send(msg)
;方法向后端提交數據,常見的數據如下:
- 客戶端發送聊天消息
- 客戶端簽收消息
- 客戶端發送心跳包
- 客戶端請求建立連接
為了使后端接收到不同的類型的數據做出不同的動作, 於是我們約定發送的msg的類型;
// 消息action的枚舉,這個枚舉和后端約定好,統一值
CONNECT: 1, // 第一次(或重連)初始化連接
CHAT: 2, // 聊天消息
SIGNED: 3, // 消息簽收
KEEPALIVE: 4, // 客戶端保持心跳
PULL_FRIEND:5, // 重新拉取好友
// 消息模型的構造函數
ChatMsg: function(senderId, receiverId, msg, msgId){
this.senderId = senderId;
this.receiverId = receiverId;
this.msg = msg;
this.msgId = msgId;
}
// 進一步封裝兩個得到最終版消息模型的構造函數
DataContent: function(action, chatMsg, extand){
this.action = action;
this.chatMsg = chatMsg;
this.extand = extand;
}
如何發送數據?
我們使用js,給發送按鈕綁定點擊事件,一經觸發,從緩存中獲取出我們需要的參數,調用
CHAT.chat(Json.stringify(dataContent));
后端netty會解析dataContent的類型,進一步處理
如何簽收未與服務器連接時好友發送的消息?
-
消息的簽收時機:
之所以會有未簽收的信息,是因為客戶端未與服務端建立WebSocket連接, 當服務端判斷他維護的channel組中沒有接受者的channel時,不會發送數據,而是把數據持久化到數據庫,並且標記flag=未讀, 所以我們簽收信息自然放在客戶端和服務端建立起連接時的回調函數中執行 -
步驟:
- 客戶端通過js請求,拉取全部的和自己相關的flag=未讀的消息實體列表
- 從回調函數數中,把列表中的數據取出,緩存在本地
- 將列表中的數據回顯在html頁面中
- 和后端約定,將該列表中所有的實例的id取出,用逗號分隔拼接成字符串, 以
action=SIGNED
的方式發送給后端,讓其進行簽收
Netty對WebSocket的支持
首先每一個Netty服務端的程序都是神似的,想創建不同的服務端,就得給Netty裝配的pipeline不同的Handler
針對聊天程序,處理String類型的Json信息,我們選取SimpleChannelInboundHandler
, 他是個典型的入站處理器,並且如果我們沒有出來數據,她會幫我們回收 重寫它里面未實現抽象方法,這些抽象方法同樣是回調方法, 當一個新的Channel進來, 它注冊進Selector上的過程中,會回調不同的抽象方法
方法名 | 回調時機 |
---|---|
handlerAdded(ChannelHandlerContext ctx) | Pepiline中的Handler添加完成回調 |
channelRegistered(ChannelHandlerContext ctx) | channel注冊進Selector后回調 |
channelActive(ChannelHandlerContext ctx) | channel處於活動狀態回調 |
channelReadComplete(ChannelHandlerContext ctx) | channel, read結束后回調 |
userEventTriggered(ChannelHandlerContext ctx, Object evt) | 當出現用戶事件時回調,如 讀/寫 |
channelInactive(ChannelHandlerContext ctx) | 客戶端斷開連接時回調 |
channelUnregistered(ChannelHandlerContext ctx) | 客戶端斷開連接后,取消channel的注冊時回調 |
handlerRemoved(ChannelHandlerContext ctx) | 取消channel的注冊后,將channel移除ChannelGroup后回調 |
exceptionCaught(ChannelHandlerContext ctx, Throwable cause) | 出現異常時回調 |
handler的設計編碼
要做到點對點的聊天,前提是服務端擁有全部的channel因為所有數據的讀寫都依賴於它,而 netty為我們提供了ChannelGroup
用來保存所有新添加進來的channel, 此外點對點的聊天,我們需要將用戶信息和它所屬的channel進行一對一的綁定,才可以精准的匹配出兩個channel進而數據交互, 因此添加UserChannel映射類
public class UserChanelRelationship {
private static HashMap<String, Channel> manager = new HashMap<>();
public static void put(String sendId,Channel channel){
manager.put(sendId,channel);
}
public static Channel get(String sendId){
return manager.get(sendId);
}
public static void outPut(){
for (HashMap.Entry<String,Channel> entry:manager.entrySet()){
System.out.println("UserId: "+entry.getKey() + "channelId: "+entry.getValue().id().asLongText());
}
}
}
我們把User和Channel之間的關系以鍵值對的形式存放進Map中,服務端啟動后,程序就會維護這個map, 那么問題來了? 什么時候添加兩者之間的映射關系呢? 看上handler的回調函數,我們選擇 channelRead0()
當我們判斷出 客戶端發送過來的信息是 CONNECT
類型時,添加映射關系
下面是handler的處理編碼
public class MyHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
// 用於管理整個客戶端的 組
public static ChannelGroup users = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame frame) throws Exception {
Channel currentChanenl = channelHandlerContext.channel();
// 1. 獲取客戶端發送的消息
String content = frame.text();
System.out.println(" content: "+content);
// 2. 判斷不同的消息的類型, 根據不同的類型進行不同的處理
// 當建立連接時, 第一次open , 初始化channel,將channel和數據庫中的用戶做一個唯一的關聯
DataContent dataContent = JsonUtils.jsonToPojo(content,DataContent.class);
Integer action = dataContent.getAction();
if (action == MsgActionEnum.CHAT.type) {
// 3. 把聊天記錄保存到數據庫
// 4. 同時標記消息的簽收狀態 [未簽收]
// 5. 從我們的映射中獲取接受方的chanel 發送消息
// 6. 從 chanelGroup中查找 當前的channel是否存在於 group, 只有存在,我們才進行下一步發送
// 6.1 如果沒有接受者用戶channel就不writeAndFlush, 等着用戶上線后,通過js發起請求拉取未接受的信息
// 6.2 如果沒有接受者用戶channel就不writeAndFlush, 可以選擇推送
}else if (action == MsgActionEnum.CONNECT.type){
// 當建立連接時, 第一次open , 初始化channel,將channel和數據庫中的用戶做一個唯一的關聯
String sendId = dataContent.getChatMsg().getSenderId();
UserChanelRelationship.put(sendId,currentChanenl);
}else if(action == MsgActionEnum.SINGNED.type){
// 7. 當用戶沒有上線時,發送消息的人把要發送的消息持久化在數據庫,但是卻沒有把信息寫回到接受者的channel, 把這種消息稱為未簽收的消息
// 8. 簽收消息, 就是修改數據庫中消息的簽收狀態, 我們和前端約定,前端如何簽收消息在上面有提到
String extend = dataContent.getExtand();
// 擴展字段在 signed類型代表 需要被簽收的消息的id, 用逗號分隔
String[] msgIdList = extend.split(",");
List<String> msgIds = new ArrayList<>();
Arrays.asList(msgIdList).forEach(s->{
if (null!=s){
msgIds.add(s);
}
});
if (!msgIds.isEmpty()&&null!=msgIds&&msgIds.size()>0){
// 批量簽收
}
}else if (action == MsgActionEnum.KEEPALIVE.type){
// 6. 心跳類型
System.out.println("收到來自channel 為" +currentChanenl+" 的心跳包... ");
}
}
// handler 添加完成后回調
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
// 獲取鏈接, 並且若想要群發的話,就得往每一個channel中寫數據, 因此我們得在創建連接時, 把channel保存起來
System.err.println("handlerAdded");
users .add(ctx.channel());
}
// 用戶關閉了瀏覽器回調
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
// 斷開連接后, channel會自動移除group
// 我們主動的關閉進行, channel會被移除, 但是我們如果是開啟的飛行模式,不會被移除
System.err.println("客戶端channel被移出: "+ctx.channel().id().asShortText());
users.remove(ctx.channel());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// 發生異常關閉channel, 並從ChannelGroup中移除Channel
ctx.channel().close();
users.remove(ctx.channel());
}
... 其他方法
前后端的心跳維持
雙方建立起WebSocket連接后,服務端需要明確的知道,自己維護的諸多channel中,誰已經掛掉了, 為了提高性能,需要及早把廢棄的channel移除ChanelGroup
客戶端殺掉了進程,或者開啟了飛行模式, 這時服務端是感知不到它維護的channel中已經有一個不能使用了,首先來說,維護一個不能使用的channel會影響性能,而且當這個channel的好友給他發送消息時,服務端認為用戶在線,於是向一個不存在的channel寫入刷新數據,會帶來額外的麻煩
這時我們就需要添加心跳機制,客戶端設置定時任務,每個一段時間就往服務端發送心跳包,心跳包的內容是什么不是重點,它的作用就是告訴服務端自己還active, N多個客戶端都要向服務端發送心跳,這並不會增加服務端的請求,因為這個請求是通過WebSocket的send方法發送過去的,只不過dataContent的類型是 KEEPALIVE , 同樣這是我們提前約定好的(此外,服務端向客戶端發送心跳看起來是沒有必要的)
於是對於后端來說,我們發送的心跳包,會使得當前客戶端對應的channel的channelRead0()方法回調, netty為我們提供了心跳相關的handler, 每一次的chanelRead0()的回調,都是read/write事件, 下面是netty對心跳的支持的實現
/**
* @Author: Changwu
* @Date: 2019/7/2 9:33
* 我們的心跳handler不需要實現handler0方法,我們選擇,直接繼承SimpleInboundHandler的父類
*/
public class HeartHandler extends ChannelInboundHandlerAdapter {
// 我們重寫 EventTrigger 方法
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
// 當出現read/write 讀寫寫空閑時觸發
if(evt instanceof IdleStateEvent){
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state()== IdleState.READER_IDLE){ // 讀空閑
System.out.println(ctx.channel().id().asShortText()+" 讀空閑... ");
}else if (event.state()==IdleState.WRITER_IDLE){
System.out.println(ctx.channel().id().asShortText()+" 寫空閑... ");
}else if (event.state()==IdleState.ALL_IDLE){
System.out.println("channel 讀寫空閑, 准備關閉當前channel , 當前UsersChanel的數量: "+MyHandler.users.size());
Channel channel = ctx.channel();
channel.close();
System.out.println("channel 關閉后, UsersChanel的數量: "+MyHandler.users.size());
}
}
}
Handler我們不再使用SimpleChannelInboundHandler了,因為它當中的方法都是抽象方法,而我們需要回調的函數時機是,每次當有用戶事件時回調, 比如read,write事件, 這些事件可以證明channel還活着,對應的方法是userEventTriggered()
此外, ChannelInboundHandlerAdapter是netty中,適配器模式的體現, 它實現了全都抽象方法,然后他的實現方法中並不是在干活,而是把這個事件往下傳播下去了,現在我們重寫userEventTriggered()
執行的就是我們的邏輯
另外,我們需要在pipeline中添加handler
...
/ 添加netty為我們提供的 檢測空閑的處理器, 每 20 40 60 秒, 會觸發userEventTriggered事件的回調
pipeline.addLast(new IdleStateHandler(10,20,30));
// todo 添加心跳的支持
pipeline.addLast("heartHandler",new HeartHandler());
服務端主動向客戶端推送數據
如, 添加好友的操作中, A向B發送添加好友請求的過程,會經過如下幾步
- A向服務端發送ajax請求,將自己的id, 目標朋友的id持久化到 數據庫,請求friend_request表
- 用戶B上線,通過js,向后端拉取friend_request表中有沒有關於自己的信息,於是服務端把A的請求給B推送過去
- 在B的前端回顯A的請求, B進一步處理這個信息, 此時兩種情況
- B拒絕了A的請求: 后端把friend_request表關於AB的信息清除
- B同意了A的請求: 后端在firend_List表中,將AB雙方的信息都持久化進去, 這時我們可以順勢在后端的方法中,給B推送最新的聯系人信息, 但是這不屬於主動推送,因為這次會話是客戶端主動發起的
但是A卻不知道,B已經同意了,於是需要給A主動的推送數據, 怎么推送呢? 我們需要在上面的UserChannel的關系中,拿出發送者的channel, 然后往回writeAndFlush
內容,這時A就得知B已經同意了,重新加載好友列表