WebSocket協議詳解以及WebSocket小型服務實現的技術總結
WebSocket是什么?(下面簡稱ws)
WebSocket是一種在單個TCP連接上進行全雙工通信的網絡傳輸協議。客戶端與服務端完成一次握手后,兩者之間可以創建持久性的連接,並進行雙向數據傳輸。
ws技術可以解決什么什么樣的業務場景問題?
業務場景
客戶端需要持續監測服務器數據變動的業務場景下,如股票交易、搶單、即時通訊、多人協作服務等。這些業務場景對於消息的即時性有很高的要求,而且還要保證消息的可靠傳遞
WS的技術背景
在ws協議出現之前,對於實時性要求高的業務場景往往采用的是基於HTPP協議的的輪詢技術。也就是每隔一段時間向服務器發送請求,如果有最新的數據就返回給客戶端。這種傳統的模式有很明顯的缺點,即客戶端向服務器不斷的發送請求,然而HTTP請求與回復的過程中包含很多較長的頭部(因為HTTP協議是基於文本的協議),其中真正有效的數據可能只是很小的一部分,會導致消耗很多無效的帶寬資源。而且消息的實時性的指標與輪詢的頻率成正相關,但是頻率越高所消耗的帶寬資源就越高,隨着互聯網用戶規模的龐大,顯然遇到了瓶頸。
對於HTTP輪詢協議的改進,就出現了Comet技術。Comet中普遍采用的HTTP長連接也會消耗服務器資源。本質上是將HTTP連接的超時時間強制延長,來減少帶寬的浪費,但本質上還是HTTP技術,需要反復的發出請求。
ws技術的出現解決了上面的問題。ws通過兼容HTTP協議常用的80和443端口來實現客戶端和服務器的雙向通信,可以繞過大多數防火牆的限制。
ws技術的優點
- 減少控制開銷,消息頭部只有2至10字節,相比於HTTP協議的頭部明顯減少
- 更強的實時性,由於是全雙工的通道,在客戶端和服務可以同時向對方發送消息,相對於HTTP請求需要等待客戶端發起請求服務端才能響應,延遲明顯更少
- 保持連接狀態(有狀態的連接),ws和http協議的不同點就是,http協議是無狀態的協議,業務上為了保證連接的狀態,每次進行通信都會攜帶狀態消息(如身份認證等)。而ws協議是有狀態協議,彼此之間通信無需重復傳遞狀態消息
- 可以傳輸二進制數據,相對HTTP,可以更輕松地處理二進制內容。
ws的的協議細節
ws連接的創建需要客戶端首先發起請求連接,而握手請求使用的是HTTP協議。在客戶端的請求頭上的Upgrade字段是websocket,表明需要升級為ws協議進行通訊。服務器在收到請求后,返回101狀態碼表示服務理解了客戶端的請求,並將通過Upgrade消息頭通知客戶端采用不同的協議來完成這個請求。
典型的握手請求
客戶端HTTP請求
GET /chat HTTP/1.1
Host: server.example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Origin: http://example.com
Sec-WebSocket-Protocol: chat, superchat
Sec-WebSocket-Version: 13
服務端響應
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
Sec-WebSocket-Protocol: chat
字段說明
- Connection必須設置Upgrade,表示客戶端希望連接升級
- Upgrade字段必須設置Websocket,表示希望升級到Websocket協議。
- Sec-WebSocket-Key是隨機的字符串,服務器端會用這些數據來構造出一個SHA-1的信息摘要。把“Sec-WebSocket-Key”加上一個特殊字符串“258EAFA5-E914-47DA-95CA-C5AB0DC85B11”,然后計算SHA-1摘要,之后進行Base64編碼,將結果做為“Sec-WebSocket-Accept”頭的值,返回給客戶端。如此操作,可以盡量避免普通HTTP請求被誤認為Websocket協議。
- Sec-WebSocket-Version 表示支持的Websocket版本。RFC6455要求使用的版本是13,之前草案的版本均應當棄用。
- Origin字段是可選的,通常用來表示在瀏覽器中發起此Websocket連接所在的頁面,類似於Referer。但是,與Referer不同的是,Origin只包含了協議和主機名稱。
- 其他一些定義在HTTP協議中的字段,如Cookie等,也可以在Websocket中使用。
數據幀格式
從左到右,單位是Bit,RFC6455參考,5.2章節
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-------+-+-------------+-------------------------------+
|F|R|R|R| opcode|M| Payload len | Extended payload length |
|I|S|S|S| (4) |A| (7) | (16/64) |
|N|V|V|V| |S| | (if payload len==126/127) |
| |1|2|3| |K| | |
+-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
| Extended payload length continued, if payload len == 127 |
+ - - - - - - - - - - - - - - - +-------------------------------+
| |Masking-key, if MASK set to 1 |
+-------------------------------+-------------------------------+
| Masking-key (continued) | Payload Data |
+-------------------------------- - - - - - - - - - - - - - - - +
: Payload Data continued ... :
+ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
| Payload Data continued ... |
+---------------------------------------------------------------+
FIN:1bit
如果是1代表這是單條消息,沒有后續分片了。而如果是0代表,代表此數據幀是不是一個完整的消息,而是一個消息的分片,並且不是最后一個分片后面還有其他分片
RSV1, RSV2, RSV3: 1 bit each
必須是0,除非客戶端和服務端使用WS擴展時,可以為非0。
Opcode: 4bit
這個為操作碼,表示對后面的有效數據荷載的具體操作,如果未知接收端需要斷開連接
-
%x0:表示連續幀
-
%x1:表示文本幀
-
%x2:表示二進制幀
-
%x3-7:保留用於其他非控制幀
-
%x8:表示連接關閉
-
%x9:表示ping操作
-
%xA:表示pong操作
-
%xB-F:保留用於其他控制幀
Mask: 1bit
是否進行過掩碼,比如客戶端給服務端發送消息,需要進行掩碼操作。而服務端到客戶端不需要
Payload Length: 7 bits, 7+16 bits, or 7+64 bits
“有效載荷數據”的長度(以字節為單位):如果為0-125,則為有效載荷長度。 如果為126,則以下2個字節解釋為16位無符號整數是有效載荷長度。 如果是127,以下8個字節解釋為64位無符號整數(最高有效位必須為0)是有效載荷長度。 多字節長度數量以網絡字節順序表示。 注意在所有情況下,必須使用最小字節數進行編碼長度,例如124字節長的字符串的長度不能編碼為序列126、0、124。有效載荷長度是“擴展數據”的長度+“應用程序數據”。 “擴展數據”的長度可以是零,在這種情況下,有效負載長度是 “應用程序數據”。
Masking-key: 0 or 4 bytes (32bit)
所有從客戶端傳送到服務端的數據幀,數據載荷都進行了掩碼操作,Mask為1,且攜帶了4字節的Masking-key。如果Mask為0,則沒有Masking-key。
Payload data: (x+y) bytes
“有效載荷數據”定義為串聯的“Extension data”與“Application data”。
- Extension data: x bytes
如果沒有協商使用擴展的話,擴展數據數據為0字節。所有的擴展都必須聲明擴展數據的長度,或者可以如何計算出擴展數據的長度。此外,擴展如何使用必須在握手階段就協商好。如果擴展數據存在,那么載荷數據長度必須將擴展數據的長度包含在內。
- Application data: y bytes
任意的應用數據,在擴展數據之后(如果存在擴展數據),占據了數據幀剩余的位置。載荷數據長度 減去 擴展數據長度,就得到應用數據的長度。
spring體系下如何搭建ws服務?
采用SpringBoot搭建WS服務
項目依賴
POM.xml
<!-- 需要的依賴 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
可以看到srping-boot-starter-websocket
包含了spring-boot-starter-web
,所以我們不需要再引入服務器依賴。其中還用到了,spring-message
和spring=websocket
可以看到srping-boot-starter-websocket
默認使用tomcat作為web服務器,所以我們后續可以進行HTTP接口的開發
依賴搞定之后需要,進行ws服務的實現。要實現ws服務器,需要實現一下幾個部分,與上面的ws通訊過程對應
- 握手攔截處理器
- 消息處理器
- Session管理器(可選,簡單應用可以不用session管理器)
- WS配置Bean
握手攔截處理器
/*
握手處理器,用於客戶端的握手請求
需要實現HandshakeInterceptor接口並注冊層spring的一個Bean
*/
@Component
@Slf4j
public class CustomHandshakeInterceptor implements HandshakeInterceptor {
public final static String TOKEN = "token";
public final static String CHANNEL_ID = "channelId";
// 握手前的調用,可在這里進行請求的校驗工作(如權限的校驗)
@Override
public boolean beforeHandshake(
ServerHttpRequest serverHttpRequest,
ServerHttpResponse serverHttpResponse,
WebSocketHandler webSocketHandler,
Map<String, Object> attributes) throws Exception
{
final String queryString = serverHttpRequest.getURI().getQuery();
final Map<String, String> queryMap = mapQueryString(queryString);
if (queryMap.containsKey(TOKEN) && queryMap.containsKey(CHANNEL_ID)) {
// attributes是可以用來綁定一些自定義的數據到當前session上,在session的整個生命周期內都可以獲取到
attributes.put(TOKEN, queryMap.get(TOKEN));
attributes.put(CHANNEL_ID, queryMap.get(CHANNEL_ID));
// 校驗成功返回true,失敗返回false,拒絕連接
return true;
}
return false;
}
private Map<String, String> mapQueryString(String queryString) {
Map<String, String> paramMap = new HashMap<>();
if (StringUtils.isEmpty(queryString)) {
return paramMap;
}
final String[] kvArray = queryString.split("&");
for (String kv : kvArray) {
final String[] kvPair = kv.split("=");
if (kvPair.length != 2) {
continue;
}
String key = kvPair[0];
String value = kvPair[1];
if (StringUtils.isNotEmpty(key) && StringUtils.isNotEmpty(value)) {
paramMap.put(key, value);
}
}
return paramMap;
}
@Override
public void afterHandshake(
ServerHttpRequest serverHttpRequest,
ServerHttpResponse serverHttpResponse,
WebSocketHandler webSocketHandler,
Exception e)
{
// 握手之后調用
}
}
消息處理器
消息處理器是ws服務的核心處理器,客戶端發送來的消息都會進來進行處理
/*
消息處理器,用處接收來自客戶端的請求
需要繼承AbstractWebSocketHandler這個抽象類來實現自己的自定義消息處理器
TextWebSocketHandler是用於處理文本消息處理器,也是AbstractWebSocketHandler的派生類
將CustomWebSocketHandler注冊層spring的一個Bean
*/
Component
@Slf4j
public class CustomWebSocketHandler extends TextWebSocketHandler {
@Resource
private WsSessionManager wsSessionManager;
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
final Object channelId = session.getAttributes().get(CustomHandshakeInterceptor.CHANNEL_ID);
if (Objects.isNull(channelId)) {
throw new RuntimeException("ID獲取異常");
}
final Object wsTmpToken = session.getAttributes().get(CustomHandshakeInterceptor.TOKEN);
if (Objects.isNull(wsTmpToken)) {
throw new RuntimeException("認證token丟失");
}
// 托管session
final boolean addSuccess = wsSessionManager.add(channelId.toString(), session);
if (!addSuccess) {
session.close(CloseStatus.NORMAL.withReason("頻道被占用,請更換頻道"));
}
}
// 來自客戶端的消息在此處理
@Override
public void handleTextMessage(WebSocketSession session, TextMessage message) throws IOException {
log.debug("客戶端消息: {}", message.getPayload());
}
// 在連接斷后后會調用該方法進行回收處理
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
final Object channelId = session.getAttributes().get(CustomHandshakeInterceptor.CHANNEL_ID);
log.info("斷開客戶端連接:channelId={}", channelId);
// 移除session
wsSessionManager.remove(channelId.toString());
log.info("Session移除成功:channelId={}", channelId);
}
}
Session管理器
/*
Session管理器,用於管理session,並注冊層spring的一個Bean
*/
@Component
@Slf4j
public class WsSessionManager {
// 使用ConcurrentHashMap在多線程寫時保證線程安全
private static final ConcurrentHashMap<String, WebSocketSession>
SESSION_POOL = new ConcurrentHashMap<>();;
// 托管連接
public boolean add(String id, WebSocketSession session) {
final WebSocketSession oldSession = get(id);
if (oldSession != null) {
if (oldSession.isOpen()) {
// 同頻道的連接存在並且活躍的狀態的話,托管失敗
return false;
}
// 移除失效的的老連接
remove(id);
}
// 加入Map容器進行托管
SESSION_POOL.put(id, session);
return true;
}
// 獲取連接
public WebSocketSession get(String id) {
return SESSION_POOL.get(id);
}
// 移除連接
public WebSocketSession remove(String id) {
// 從Map容器中移除
final WebSocketSession session = SESSION_POOL.remove(id);
if (session != null && session.isOpen()) {
try {
// 移除后關閉連接
session.close();
}catch (IOException e) {
log.error(String.format("Session關閉異常, channelId=%s", id), e);
}
}
return session;
}
}
配置WS
/*
這是一個配置Bean,實現了WebSocketConfigurer接口
使用@Configuration注解將該類注冊成一個配置類
使用@EnableWebSocket開啟WebSocket自動配置
*/
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
// 注入握手攔截器
@Resource
private CustomHandshakeInterceptor customHandshakeInterceptor;
// 注入消息處理器
@Resource
private CustomWebSocketHandler customWebSocketHandler;
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
// 注冊消息處理器,並使用 "/channel"作為處理器的標識,客戶端連接路徑使用"/channel"就把請求發送給指定的處理器
registry.addHandler(customWebSocketHandler, "/channel")
.addInterceptors(customHandshakeInterceptor) // 注冊攔截器
.setAllowedOrigins("*"); // 允許跨域
}
}
至此基本的ws服務搭建完成。啟動服務,使用客戶端連接使用
提供一個簡單的前端代碼,通過控制台簡單使用
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<meta http-equiv="X-UA-Compatible" content="ie=edge">
<title>Document</title>
</head>
<input type="text" placeholder="token" id="token">
<input type="text" placeholder="channelId" id="channelId">
<input type="button" value="連接" onclick="link()">
<input type="button" value="關閉" onclick="close()">
<body>
</body>
<script>
var ws
var timer
function link() {
// 打開一個 web socket
var token = document.getElementById("token").value
var channelId = document.getElementById("channelId").value
ws = new WebSocket(`ws://localhost:8080/registry?token=${token}&channelId=${channelId}`);
ws.onopen = function () {
console.log("連接完成,可以發送數據");
// 固定頻率發送消息保持連接在線
timer = setInterval(() => {
ws.send(Date.now())
}, 10000)
};
ws.onmessage = function (evt) {
var received_msg = evt.data;
console.log("數據已接收...");
console.log(received_msg)
};
ws.onclose = function () {
// 關閉 websocket
console.log("連接已關閉...");
};
ws.onerror = function (err) {
console.error(err)
}
}
function close() {
if (ws === null || ws === undefined) {
alert("請先連接")
}
ws.close()
if (!timer) {
alert("請先連接")
}
timer.clear()
}
</script>
</html>
實際開發中ws服務設計需要注意的方面
業務層消息的確認機制
雖然WS是基於TCP連接的通訊機制,TCP協議特性能保證傳輸層一定能成功發送數據。但是對於復雜業務和可靠性要求高的業務,最佳的實踐是在業務層進行消息的確認。服務端對每一條消息映射一個唯一的標識,客戶端在收到消息后,需要將該消息的唯一標識返回給服務端。否則服務端進行一定次數的重試推送,從而來保證消息的可靠推送。
心跳機制
對於服務側重在服務端消息推送的業務。那客戶端需要隨時保持對連接的監聽,而長時間沒有數據來往的情況下,不同的客戶端和服務端實現會嘗試關閉連接。所以為了保證服務端的及時推送,客戶端需要和服務端保持一定頻率的心跳連接。
- 可以定時的往服務端發送消息,如果發現連接斷開,則重新發起連接請求
- 可以服務端對客戶端發送ping操作,客戶端響應pong操作來實現心跳
分布式多實例部署的ws服務session的共享問題
由於ws連接的特殊性,即連接是有狀態的。所以一旦連接斷開后狀態就消失了,下次再進行連接時和上一次的連接並不能對應上。所以平常Web開發中常用的基於序列化和反序列化機制的外部緩存對於面向長連接的ws來說是無法實現的。所以需要通過下面的幾種機制來實現
定向分配機制
配備服務的連接注冊中心,將用戶和真實連接的節點進行映射。在需要向某一個指定用戶推送消息時,通過連接注冊中心找到當前處理當前用戶連接的真實節點,然后將消息推送給處理節點,處理節點轉發消息給用戶。
優點:推送精准,避免無效的廣播開銷,架構可以根據業務壓力的增加進行水平的擴展,適用於大型服務架構
缺點:實現復雜,需要獨立開發一個分布式的連接管理中心。生產上需要高可用架構的設計
MQ或總線的廣播機制
通過MQ或Redis的消息訂閱機制,進行消息的廣播。將ws服務節點接入到同一的MQ或者Redis中,訂閱同於個主題或頻道。當需要發送消息的時候,將消息廣播給所有節點,節點收到廣播后會去匹配當前消息的目標連接是否在本節點上。如果在本節點就進行消息推送。不在本節點就自動忽略。
優點:實現簡單,維護方便。架構上只需引入一個MQ或這Redis中間件。適合ws服務節點規模不大的場景
缺點:需要良好的代碼實現,搞不好容易發生廣播風暴,拖垮集群。而且一般一個用戶只會連接再集群中的某一個節點,而將消息廣播給每一個節點,其實是沒必要的。當集群規模擴張到一定程度,當發送一個廣播后,所有節點開始計算,導致集群的計算負載短時間內出現峰值。
總結
實際開發中,如果預測到集群規模不到的情況。可以優先考慮使用廣播機制進行消息廣播。但集群規模很大的情況下,考慮定向分配的架構設計。