webscoket方案調研及實踐
一、使用場景
1、考試管理端需要給特定考試用戶單獨暫停考試、繼續考試、加時、減時的操作,當管理端執行了上述的某個操作,需要實時的通知到正在考試的用戶那里。
2、社交聊天、彈幕、多玩家游戲、協同編輯、股票基金實時報價、體育實況更新、視頻會議/聊天、智能家居等需要高實時的場景
二、方案調研
1、Ajax短輪詢
短輪詢:客戶端定時向服務器發送Ajax請求,服務器接到請求后馬上返回響應信息並關閉連接。
優點:后端程序編寫比較容易。
缺點:請求中有大半是無用,浪費帶寬和服務器資源。
2、long-polling長輪詢
長輪詢:客戶端向服務器發送Ajax請求,服務器接到請求后hold住連接,直到有新消息才返回響應信息並關閉連接,客戶端處理完響應信息后再向服務器發送新的請求。
優點:在無消息的情況下不會頻繁的請求,耗費資源小。
缺點:服務器hold連接會消耗資源,返回數據順序無保證,難於管理維護。
3、iframe長連接
長連接:在頁面里嵌入一個隱蔵iframe,將這個隱蔵iframe的src屬性設為對一個長連接的請求或是采用xhr請求,服務器端就能源源不斷地往客戶端輸入數據。
優點:消息即時到達,不發無用請求;管理起來也相對方便。
缺點:服務器維護一個長連接會增加開銷,不同瀏覽器會有加載問題。
4、XHR-streaming
XHR流:服務端使用分塊傳輸編碼(Chunked transfer encoding)的HTTP傳輸機制進行響應,並且服務器端不終止HTTP響應流,讓HTTP始終處於持久連接狀態,當有數據需要發送給客戶端時再進行寫入數據。
優點:通過XHR-Streaming,可以允許服務端連續地發送消息,無需每次響應后再去建立一個連接。
缺點:XHR-streaming連接的時間越長,瀏覽器會占用過多內存,sockjs默認只允許每個xhr-streaming連接輸出128kb數據,超過這個大小時會關閉輸出流,讓瀏覽器重新發起請求。
5、Websocket
websocket:Webscoket是Web瀏覽器和服務器之間的一種全雙工通信協議.一旦Web客戶端與服務器建立起連接,之后的全部數據通信都通過這個連接進行。通信過程中,可互相發送JSON、XML、HTML或圖片等任意格式的數據。
優點:復用長連接,全雙工通信,支持服務器推送消息
缺點:服務器維護一個長連接會增加開銷,不同瀏覽器會支持程度不一
5.1 實現原理
Websocket是應用層第七層上的一個應用層協議,它必須依賴 HTTP 協議進行一次握手 ,握手成功后,數據就直接從 TCP 通道傳輸,與 HTTP 無關了。
WebSocket 交互以 HTTP 請求開始,該請求使用 HTTP“Upgrade”header頭升級或在本例中切換到 WebSocket 協議
GET /spring-websocket-portfolio/portfolio HTTP/1.1
Host: localhost:8080
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: Uc9l9TMkWGbHFD2qnFHltg==
Sec-WebSocket-Protocol: v10.stomp, v11.stomp
Sec-WebSocket-Version: 13
Origin: http://localhost:8080
http升級成websocket請求返回的是狀態碼101,而不是200。
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: 1qVdfYHU9hPOl4JYYNXF623Gzn0=
Sec-WebSocket-Protocol: v10.stomp
成功握手后,HTTP 升級請求下的 TCP 套接字保持打開狀態,以便客戶端和服務器繼續發送和接收消息。
參考
https://segmentfault.com/a/1190000019697463
三、實現方案(Websocket)
在java層面實現Webocket主要有spring、netty兩個方向,由於我們系統使用的是spring系列,所以采用spring的Websocket實現方案。
在spring的Websocket實現中,又可以細分為3種:
1、基於java原生注解:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
@Configuration
@EnableWebSocket // 開啟websocket
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpoint() {
return new ServerEndpointExporter();
}
}
@ServerEndpoint("/myWs") // 聲明websocket端點
@Component
public class WsServerEndpoint {
private static Map<String, Session> onlineUserCache = new HashMap<>();
/**
* 連接成功
*
* @param session
*/
@OnOpen
public void onOpen(Session session) {
System.out.println("連接成功");
}
/**
* 連接關閉
*
* @param session
*/
@OnClose
public void onClose(Session session) {
System.out.println("連接關閉");
}
/**
* 接收到消息
*
* @param text
*/
@OnMessage
public String onMsg(String text) throws IOException {
return "servet 發送:" + text;
}
}
WsServerEndpoint類下的幾個注解需要注意一下,首先是他們的包都在 javax.websocket 下。並不是 spring 提供的,而 jdk 自帶的,下面是他們的具體作用。
- @ServerEndpoint通過這個 spring boot 就可以知道你暴露出去的 ws 應用的路徑,有點類似我們經常用的@RequestMapping。比如你的啟動端口是8080,而這個注解的值是ws,那我們就可以通過 ws://127.0.0.1:8080/ws 來連接你的應用
- @OnOpen當 websocket 建立連接成功后會觸發這個注解修飾的方法,注意它有一個 Session 參數
- @OnClose當 websocket 建立的連接斷開后會觸發這個注解修飾的方法,注意它有一個 Session 參數
- @OnMessage當客戶端發送消息到服務端時,會觸發這個注解修改的方法,它有一個 String 入參表明客戶端傳入的值
- @OnError當 websocket 建立連接時出現異常會觸發這個注解修飾的方法,注意它有一個 Session 參數
另外一點就是服務端如何發送消息給客戶端,服務端發送消息必須通過上面說的 Session 類,通常是在@OnOpen 方法中,當連接成功后把 session 存入 Map 的 value,key 是與 session 對應的用戶標識,當要發送的時候通過 key 獲得 session 再發送,這里可以通過 session.getBasicRemote().sendText()來對客戶端發送消息
2、spring提供的WebSocket API
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
public class MyHandler extends TextWebSocketHandler {
// 建立連接成功事件
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
Object token = session.getAttributes().get("token");
if (token != null) {
// 用戶連接成功,放入在線用戶緩存
WsSessionManager.add(token.toString(), session);
} else {
throw new RuntimeException("用戶登錄已經失效!");
}
}
// 接收消息事件
@Override
public void handleTextMessage(WebSocketSession session, TextMessage message) {
// ...
}
// 斷開連接時
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
Object token = session.getAttributes().get("token");
if (token != null) {
// 用戶退出,移除緩存
WsSessionManager.remove(token.toString());
}
}
}
MyHandler通過繼承 TextWebSocketHandler 類並覆蓋相應方法,可以對 websocket 的事件進行處理,這里可以同原生注解的那幾個注解連起來看.
- afterConnectionEstablished 方法是在 socket 連接成功后被觸發,同原生注解里的 @OnOpen 功能
- afterConnectionClosed 方法是在 socket 連接關閉后被觸發,同原生注解里的 @OnClose 功能
- handleTextMessage 方法是在客戶端發送信息時觸發,同原生注解里的 @OnMessage 功能、
public class WsSessionManager {
/**
* 保存連接 session 的地方
*/
private static ConcurrentHashMap<String, List<WebSocketSession>> SESSION_POOL = new ConcurrentHashMap<>();
/**
* 添加 session
*
* @param key
*/
public static void add(String key, WebSocketSession session) {
// 添加 session
SESSION_POOL.put(key, session);
}
/**
* 刪除 session,會返回刪除的 session
*
* @param key
* @return
*/
public static WebSocketSession remove(String key) {
// 刪除 session
return SESSION_POOL.remove(key);
}
/**
* 刪除並同步關閉連接
*
* @param key
*/
public static void removeAndClose(String key) {
WebSocketSession session = remove(key);
if (session != null) {
try {
// 關閉連接
session.close();
} catch (IOException e) {
// todo: 關閉出現異常處理
e.printStackTrace();
}
}
}
/**
* 獲得 session
*
* @param key
* @return
*/
public static WebSocketSession get(String key) {
// 獲得 session
return SESSION_POOL.get(key);
}
}
這里簡單通過 ConcurrentHashMap 來實現了一個 session 池,用來保存已經登錄的 web socket 的 session。前面提過,服務端發送消息給客戶端必須要通過這個 session。
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(myHandler(), "/myHandler").setAllowedOrigins("http://mydomain.com");
}
@Bean
public WebSocketHandler myHandler() {
return new MyHandler();
}
}
通過實現 WebSocketConfigurer 類並覆蓋相應的方法進行 websocket 的配置。我們主要覆蓋 registerWebSocketHandlers 這個方法。通過向 WebSocketHandlerRegistry 設置不同參數來進行配置。其中 addHandler 方法添加我們上面的寫的 ws 的 handler 處理類,第二個參數是你暴露出的 ws 路徑。setAllowedOrigins("*") 這個是關閉跨域校驗,方便本地調試,線上推薦打開。
3、基於STOMP消息協議實現
首先需要對SockJS、StompJS以及跟WebSocket三者做簡要的說明。
3.1、SockJS
SockJS是一個JavaScript庫,為了應對許多瀏覽器不支持WebSocket協議的問題,設計了備選SockJS 。SockJS 是 WebSocket 技術的一種模擬。SockJS會盡可能對應 WebSocket API,但如果WebSocket 技術不可用的話,會自動降為輪詢的方式。還提供了心跳檢測的機制。
3.2、StompJS
STOMP—— Simple Text Oriented Message Protocol——面向消息的簡單文本協議。
SockJS 為 WebSocket 提供了 備選方案。 STOMP協議,采用消息訂閱的機制,為瀏覽器 和 server 間的 通信增加適當的消息語義。
3.3、WebSocket、SockJs、STOMP三者關系
WebSocket 是底層協議,SockJS 是WebSocket 的備選方案,是一種兼容實現,而 STOMP 是基於 WebSocket(SockJS)的上層協議。
3.4、代碼實現
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/portfolio").withSockJS(); // 1
}
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.setApplicationDestinationPrefixes("/app"); // 2
config.enableSimpleBroker("/topic", "/queue"); // 3
}
}
- "/portfolio"是 WebSocket(或 SockJS)客戶端需要連接到 WebSocket 握手的端點的 HTTP URL
- 以“/app”開頭的STOMP消息將被路由到@Controller 類中的@MessageMapping 方法中
- 使用內置的消息代理進行訂閱和廣播;將以“/topic”或“/queue”開頭的消息路由到代理
@Controller
public class WSController {
@Autowired
private SimpMessagingTemplate simpMessagingTemplate;
@MessageMapping("/hello")
@SendTo("/topic/hello")
public ResponseMessage hello(RequestMessage requestMessage) {
System.out.println("接收消息:" + requestMessage);
return new ResponseMessage("服務端接收到你發的:" + requestMessage);
}
@GetMapping("/sendMsgByUser")
public @ResponseBody
Object sendMsgByUser(String token, String msg) {
simpMessagingTemplate.convertAndSendToUser(token, "/msg", msg);
return "success";
}
}
通過 @MessageMapping 來暴露節點路徑,有點類似 @RequestMapping。注意這里雖然寫的是 hello ,但是我們客戶端調用的真正地址是 /app/hello。 因為我們在上面的 config 里配置了registry.setApplicationDestinationPrefixes("/app")。
@SendTo這個注解會把返回值的內容發送給訂閱了 /topic/hello 的客戶端,與之類似的還有一個@SendToUser 只不過他是發送給用戶端一對一通信的。這兩個注解一般是應答時響應的,如果服務端主動發送消息可以通過 simpMessagingTemplate類的convertAndSend方法。
3.5、 消息流向
對於客戶端來說,既可以發送指定的消息請求"/app/a",又可以訂閱某一個消息主題"/topic/a"。消息訂閱的會被路由到消息代理SimpleBroker中,指定的消息請求會被@MessageMapper路由到指定的方法中,之后再根據特定的主題訂閱到消息代理SimpleBroker中。最后再通過消息代碼返回消息到對應的客戶端。
4、問題
4.1 Websocket連接鑒權問題
前2種可以通過添加握手過程的攔截器,在進行握手前,通過獲取url傳參,進行鑒權;STOMP實現方案可以通過獲取header中的參數來進行鑒權。
4.2 分布式問題
前2種方案,跟客戶端的交互都需要通過Session進行,並且需要在各個JVM中維護自己的Session池,在分布式環境中,跟消息服務A連接的用戶沒辦法發送消息給跟消息服務B連接的用戶。
而Stomp可以通過外部消息中間件MessageBroker的接入解決分布式問題。
4.3 瀏覽器Websocket協議兼容問題
前2種方案只能通過Websocket協議進行握手,當客戶端所在瀏覽器不支持WebSocket協議時,需要再實現一套輪詢的方案來實現客戶端與服務端的交互問題。
而SockJS可以實現當瀏覽器不支持WebSocket協議時,會自動降為輪詢的方式進行交互。
5、消息推送負載均衡方案
上述不管哪種實現方案,都避不開分布式問題。解決分布式問題一般引入第三方中間件,在這里我們可以引入rocketMq、redis、rabbitMq等。消息推送服務都訂閱到中間件,業務系統通過發布到中間件,進而讓各個連接到消息推送服務的客戶端能夠收到消息。
四、方案實踐
1、方案選型
這里我們選基於STOMP消息協議實現的Websocket方案,原因有如下幾點:
1、前后端采用消息訂閱消費機制,無須在各個JVM中維護各個SESSION池;
2、采用STOMP協議通信,更容易與中間件結合,解決分布式連接問題(瀏覽器A連接服務A,瀏覽器B連接服務B,A沒法跟B通信)
3、前端實現采用的是SockJS,能夠檢測各個瀏覽器能否支持Websocket協議,不支持的話,會自己降級成XHR- streaming、iframe的實現方式
4、通過SockJS發起的Websocket連接,可以在header中添加參數,來實現鑒權,不然只能通過跟在url后的參數進行鑒權
2、具體實現
2.1 引入websocket依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
2.2 添加Websocket消息代理配置
package com.learnfuture.elearning.exam.config;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.ChannelRegistration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
import javax.annotation.Resource;
/**
* @author huangyizeng
* @description Websocket 消息代理配置
* @date 2021/8/22
**/
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Resource
private SocketChanelInterceptor socketChanelInterceptor;
@Override
public void registerStompEndpoints(StompEndpointRegistry registry){
//客戶端連接端點
registry.addEndpoint("/exam/websocket")
.setAllowedOrigins("*")
.withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableSimpleBroker("/topic","/queue/", "/exchange/");
}
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
registration.interceptors(socketChanelInterceptor);
}
}
package com.learnfuture.elearning.exam.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.MessageHeaderAccessor;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* @author huangyizeng
* @description
* @date 2021/8/22
**/
@Slf4j
@Component
public class SocketChanelInterceptor implements ChannelInterceptor {
/**
* 實際消息發送到頻道之前調用
*/
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
StompHeaderAccessor accessor =
MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
//1、判斷是否首次連接
if (accessor != null && StompCommand.CONNECT.equals(accessor.getCommand())) {
List<String> nativeHeader = accessor.getNativeHeader("Authorization");
System.out.println(nativeHeader);
}
String jwtToken = accessor.getFirstNativeHeader("token");
return message;
}
}
WebSocketConfig主要是配置Websocket的開放端點,使能SockJS連接端點,並且配置Spring自帶的消息代理的各個過濾器前綴/topic、/queue、/exchange 等。這里的消息代理是作為瀏覽器的消息代理,是瀏覽器跟websocket服務端的訂閱及消費關系。
接着配置Socket的通道攔截器SocketChanelInterceptor,與Websocket端點建立連接以及后續通過該通道接收消息,都會經過該攔截器。在連接/發送消息的時候,可以通過添加header頭,來實現websocket的鑒權。
2.3 編寫Websocket的業務消費者
/**
* @author huangyizeng
* @description 考試用戶消息消費者
* @date 2021/8/26
**/
@Slf4j
@Component
public class SendToExamUserConsumer{
@Resource
private SimpMessagingTemplate template;
@Value("${rocketmq.consumer.group.message.sendToExamUser}")
private String consumerGroup;
@Value("${rocketmq.name-server}")
private String nameServer;
@Value("${rocketmq.topic.message}")
private String topic;
@Value("${rocketmq.topic.message.tag.sendToExamUser}")
private String selectorExpression;
@PostConstruct
public void init() {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
consumer.setNamesrvAddr(nameServer);
consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
try {
consumer.subscribe(topic, selectorExpression);
} catch (MQClientException e) {
e.printStackTrace();
}
//設置一個Listener,主要進行消息的邏輯處理
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
try {
for (MessageExt messageExt : msgs) {
String messageBody = new String(messageExt.getBody());
JSONObject jsonObject = JSONObject.parseObject(messageBody);
WebsocketMsgResp websocketMsgResp = jsonObject.toJavaObject(WebsocketMsgResp.class);
String destination = "/queue/examUserDetailId_" + websocketMsgResp.getExamUserDetailId();
template.convertAndSend(destination, websocketMsgResp);
log.info("考試用戶消息消費成功, message={}", messageBody);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
log.error("考試用戶消息消費失敗, message={}", msgs, e);
throw new ServiceException(ApiCode.FAILURE, "考試用戶消息消費失敗", e);
}
}
});
// 調用start()方法啟動consumer
try {
consumer.start();
} catch (MQClientException e) {
log.error("SendToExamUserConsumer 啟動失敗", e);
}
}
}
這里主要是定義了一個業務的RocketMQ消費者,消息模式設為廣播模式,這樣當websocket服務集群化部署時,只要有一條消息生產過來,那每一個websocket服務都能夠消費到。避免了分布式下Websocket的消息推送問題。
2.4 js代碼實現
<script src="/js/websocket.js"></script>
<script src="/js/jquery.min.js"></script>
<script src="/js/sockjs.min.js"></script>
<script src="/js/stomp.min.js"></script>
function connect(url) {
var host = window.location.host; // 帶有端口號
userId = GetQueryString("userId");
var socket = new SockJS("http://localhost:9000/api/exam/websocket?access_token=4c819e0f-a0a0-448a-8f79-9b9a538f5837", null, {timeout : 10000});
stompClient = Stomp.over(socket);
stompClient.connect({"Authorization" : "Bearer 5807a5eb-dbf1-4ac7-8c40-8aeaa25ccf65"}, function (frame) {
writeToScreen("connected: " + frame);
stompClient.subscribe("/queue/examUserDetailId_" + userId, function (response) {
writeToScreen(response.body);
});
}, function (error) {
}
)
}
引入相關js包后,新建SockJS對象后,調用stompClient.connect發起websocket服務連接,通過stompClient.subscribe發起跟websocket服務的消息訂閱,后續websocket服務發送到具體topic下的消息,都會發送到對應的瀏覽器那里。
3、技術難點
3.1 網關鑒權
現在系統架構中,所有需要鑒權的接口都會在gateway中進行鑒權,並且通過在header中,添加對應的Authrization屬性來傳參通過OAuth2的鑒權,而websocket的info接口(比如/api/exam/websocket 是端點,那么在發起websocket連接之前,會先發送一個/api/exam/websocket/info接口查看當前websocket服務是否存在)是沒辦法將token放在header進行調用的,只能通過/info?token=xxx 的形式來發起。
而OAuth默認是先從Header中獲取token,接着再從url中的參數獲取token。此時我們只能通過url傳遞token過去,但是OAuth默認是不允許從url進行獲取的,所以需要手動設置成允許。
優先從header中獲取,其次是從參數中獲取。
OAuth2AuthenticationProcessingFilter.doFilter
設置成允許從參數中獲取token
// token轉換器
ServerBearerTokenAuthenticationConverter converter = new ServerBearerTokenAuthenticationConverter();
converter.setAllowUriQueryParameter(true); // 設置成允許從參數中獲取token
3.2 前端聯調問題
3.2.1 WebSocket is closed before the connection is established.
主要是由於前端本地webpack的代理沒有開啟ws 協議的支持。
3.2.2 Error during WebSocket handshake: Unexpected response code: 400
前端經過nginx代理,而nginx 不支持http請求的升級,而Websocket是首先發送一個http請求,然后將該請求升級成Websocket請求,所以需要添加支持配置:
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";