1、Websocket場景
客戶端和服務器需要以高頻率和低延遲交換事件。 對時間延遲都非常敏感,並且還需要以高頻率交換各種各樣的消息。HTML5規范中的(有 Web TCP 之稱的) WebSocket ,就是一種高效節能的雙向通信機制來保證數據的實時傳輸。
2、運行機制
WebSocket 是 HTML5 一種新的協議。建立在 TCP 之上,實現客戶端和服務端全雙工異步通信
- WebSocket 是一種雙向通信協議,WebSocket 服務器和 Browser/Client Agent 都能主動的向對方發送或接收數據;
- WebSocket 需要類似 TCP 的客戶端和服務器端通過握手連接,連接成功后才能相互通信。
HTTP 請求響應客戶端服務器交互圖

WebSocket 請求響應客戶端服務器交互圖

一旦 WebSocket 連接建立后,后續數據都以幀序列的形式傳輸。在客戶端斷開 WebSocket 連接或 Server 端斷掉連接前,不需要客戶端和服務端重新發起連接請求,這樣保證websocket的性能優勢,實時性優勢明顯
Spring 內嵌的簡單消息代理 和 消息流程圖
Simple Broker
Spring 內置簡單消息代理。這個代理處理來自客戶端的訂閱請求,將它們存儲在內存中,並將消息廣播到具有匹配目標的連接客戶端。
消息流程圖

消息通道說明如下:
- "clientInboundChannel" — 用於傳輸從webSocket客戶端接收的消息
- "clientOutboundChannel" — 用於傳輸向webSocket客戶端發送的消息
- "brokerChannel" — 用於傳輸從服務器端應用程序代碼向消息代理發送消息
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
RequestMessage: 瀏覽器向服務端請求的消息
@Data public class RequestMessage { private String name; }
RespMessage: 服務端返回給瀏覽器的消息
@Data public class RespMessage { private String message; }
@Controller類
- broadcastIndex()方法:使用 @RequestMapping轉到的頁面
- broadcast()方法上的注解說明
- @MessageMapping:指定要接收消息的地址,類似@RequestMapping
- @SendTo默認消息將被發送到與傳入消息相同的目的地,但是目的地前面附加前綴(默認情況下為“/topic”}
@Slf4j @Controller public class BroadcastCtl { private AtomicInteger counter = new AtomicInteger(0); @RequestMapping("/broadcast/index") public String broadcastIndex(HttpServletRequest request) { log.info("遠程的地址:{}", request.getRemoteAddr()); return "index"; } @MessageMapping("/receive") @SendTo("/topic/getResponse") public RespMessage broadcast(RequestMessage requestMessage) { log.info("接收到消息:{}", JSONObject.toJSONString(requestMessage)); RespMessage respMessage = new RespMessage(); String msg = String.format("接收到的消息:%s條", counter.incrementAndGet()); respMessage.setMessage(msg); return respMessage; } }
配置消息代理
默認情況下使用內置的消息代理。 類上的注解@EnableWebSocketMessageBroker:此注解表示使用STOMP協議來傳輸基於消息代理的消息,此時可以在@Controller類中使用@MessageMapping
- 在方法registerStompEndpoints()里addEndpoint方法:添加STOMP協議的端點。這個HTTP URL是供WebSocket或SockJS客戶端訪問的地址;withSockJS:指定端點使用SockJS協議
- 在方法configureMessageBroker()里設置簡單消息代理,並配置消息的發送的地址符合配置的前綴的消息才發送到這個broker
@Configuration @EnableWebSocketMessageBroker public class WebSocketMessageBrokerConfigure implements WebSocketMessageBrokerConfigurer { /** * 添加STOMP協議的端點。 * 這個HTTP URL是供WebSocket或SockJS客戶端訪問的地址; * withSockJS:指定端點使用SockJS協議 * * @param registry */ @Override public void registerStompEndpoints(StompEndpointRegistry registry) { registry.addEndpoint("/websocket-simple") .setAllowedOrigins("*") .withSockJS(); } /** * 配置消息代理 * 啟動簡單Broker,消息的發送的地址符合配置的前綴來的消息才發送到這個broker * * @param registry */ @Override public void configureMessageBroker(MessageBrokerRegistry registry) { registry.enableSimpleBroker("/topic", "/queue"); } }
前端stomp、sockjs的配置
SockJS sockjs是websocket協議的實現,增加對瀏覽器不支持websocket的時候的兼容支持 SockJS的支持的傳輸的協議有3類: WebSocket, HTTP Streaming, and HTTP Long Polling。默認使用websocket,如果瀏覽器不支持websocket,則使用后兩種的方式。 SockJS使用"Get /info"從服務端獲取基本信息。然后客戶端會決定使用哪種傳輸方式。如果瀏覽器使用websocket,則使用websocket。如果不能,則使用Http Streaming,如果還不行,則最后使用 HTTP Long Polling
<!-- jquery --> <script src="/websocket/jquery.js"></script> <!-- stomp協議的客戶端腳本 --> <script src="/websocket/stomp.js"></script> <!-- SockJS的客戶端腳本 --> <script src="/websocket/sockjs.js"></script>
前端訪問websocket,重要代碼說明如下:
- var socket = new SockJS('/websocket-simple'):websocket的連接地址,此值等於WebSocketMessageBrokerConfigurer中registry.addEndpoint("/websocket-simple").withSockJS()配置的地址
- stompClient.subscribe('/topic/getResponse', function(respnose){ … }): 客戶端訂閱消息的目的地址:此值和BroadcastCtl中的@SendTo("/topic/getResponse")注解的配置的值相同
- stompClient.send("/receive", {}, JSON.stringify({ 'name': name })): 客戶端消息發送的目的地址:服務端使用BroadcastCtl中@MessageMapping("/receive")注解的方法來處理發送過來的消息
<!DOCTYPE html> <html xmlns:th="http://www.thymeleaf.org"> <head> <meta charset="UTF-8"> <title>Web Socket</title> </head> <body> <div> <div> <button id="connect" onclick="connect()">連接</button> <button id="disconnect" disabled="disabled" onclick="disconnect()">斷開連接</button> </div> <div id="conversation"> <label>輸入你的名字</label><input type="text" id="name"/> <button id="sendName" onclick="sendName();">發送</button> <p id="response"></p> </div> </div> <script src="/static/js/jquery.js"></script> <script src="/static/js/stomp.js"></script> <script src="/static/js/sockjs.js"></script> <script type="text/javascript"> var stompClient=null; function setConnected(connected){ document.getElementById('connect').disabled=connected; document.getElementById('disconnect').disabled=!connected; document.getElementById('conversation').style.visibility=connected?'visible':'hidden'; $('#response').html(); } function connect(){ // websocket的連接地址,此值等於WebSocketMessageBrokerConfigurer中 // registry.addEndpoint("/websocket-simple").withSockJS()配置的地址 var socket=new SockJS('/websocket-simple'); stompClient=Stomp.over(socket); stompClient.connect({},function(frame){ setConnected(true); console.log('Connected:'+frame); stompClient.subscribe('/topic/getResponse',function(response){ showResponse(JSON.parse(response.body).message); }); }); } function sendName(){ var name=$('#name').val(); // 客戶端消息發送的目的:服務端使用BroadcastCtl // 中@MessageMapping("/receive")注解的方法來處理發送過來的消息 stompClient.send('/receive',{},JSON.stringify({'name':name})); } window.onload = function(){ if(stompClient !=null){ stompClient.disconnet(); } setConnected(false); console.log('disconnected'); } function showResponse(message){ var response=$('#response'); response.html(message+ "\r\n" +response.html()); } </script> </body> </html>
攔截器HandshakeInterceptor和ChannelInterceptor
websocket配置攔截器,默認有兩種:
- HandshakeInterceptor:攔截websocket的握手請求。在服務端和客戶端在進行握手時會被執行
- ChannelInterceptor:攔截Message。可以在Message對被在發送到MessageChannel前后查看修改此值,可以在MessageChannel接收MessageChannel對象前后修改此值
@Slf4j @Component public class MyHandShakeInterceptor implements HandshakeInterceptor { @Override public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception { log.info("{} http協議轉換websocket協議前URI:{}", getClass().getCanonicalName(), request.getURI()); // http協議轉換websoket協議進行前,可以在這里通過session信息判斷用戶登錄是否合法 return true; } @Override public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) { log.info("{}握手成功后...", getClass().getCanonicalName()); } }
ChannelInterceptor
攔截器中使用StompHeaderAccessor 或 SimpMessageHeaderAccessor訪問消息
@Slf4j @Component public class MyChannelInterceptorAdapter implements ChannelInterceptor { @Autowired private SimpMessagingTemplate simpMessagingTemplate; @Override public boolean preReceive(MessageChannel channel) { log.info("{} pre receive", getClass().getCanonicalName()); return true; } @Override public Message<?> preSend(Message<?> message, MessageChannel channel) { log.info("{} preSend", getClass().getCanonicalName()); StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message); StompCommand command = accessor.getCommand(); log.info("{}用戶訂閱的目的地={}", getClass().getCanonicalName(), accessor.getDestination()); return message; } @Override public void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex) { log.info("{} after Send Completion", getClass().getCanonicalName()); StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message); StompCommand command = accessor.getCommand(); if (StompCommand.SUBSCRIBE.equals(command)) { log.info("{}訂閱消息發送成功", getClass().getCanonicalName()); simpMessagingTemplate.convertAndSend("/topic/getResponse", "消息發送成功"); } /** * 如果用戶斷開連接 */ if (StompCommand.DISCONNECT.equals(command)) { log.info("{}用戶斷開連接成功", getClass().getCanonicalName()); simpMessagingTemplate.convertAndSend("/topic/getResponse", "{'msg':'用戶斷開連接成功'}"); } } }
在WebSocketMessageBrokerConfigurer中配置攔截器
在registerStompEndpoints()方法中通過registry.addInterceptors(myHandShakeInterceptor)添加自定義HandShkeInceptor 攔截
在configureClientInboundChannel()方法中registration.setInterceptors(myChannelInterceptorAdapter)添加ChannelInterceptor攔截器
@Configuration @EnableWebSocketMessageBroker public class WebSocketMessageBrokerConfigure implements WebSocketMessageBrokerConfigurer { @Autowired private HandshakeInterceptor myHandShakeInterceptor; private ChannelInterceptor myChannelInterceptorAdapter = new MyChannelInterceptorAdapter(); /** * 添加STOMP協議的端點。 * 這個HTTP URL是供WebSocket或SockJS客戶端訪問的地址; * withSockJS:指定端點使用SockJS協議 * * @param registry */ @Override public void registerStompEndpoints(StompEndpointRegistry registry) { String[] paths = new String[]{"/websocket-simple", "/websocket-single"}; registry.addEndpoint(paths) .setAllowedOrigins("*") .addInterceptors(myHandShakeInterceptor) .withSockJS(); } /** * 配置消息代理 * 啟動簡單Broker,消息的發送的地址符合配置的前綴來的消息才發送到這個broker * * @param registry */ @Override public void configureMessageBroker(MessageBrokerRegistry registry) { registry.enableSimpleBroker("/topic", "/queue"); } @Override public void configureClientInboundChannel(ChannelRegistration registration) { registration.interceptors(myChannelInterceptorAdapter); } }
@SendTo和@SendToUser用法
@SendTo會將消息推送到所有訂閱此消息的連接,即訂閱/發布模式。@SendToUser只將消息推送到特定的一個訂閱者,即點對點模式
@RequestMapping("/broadcast/single")
public String broadcastSingle(HttpServletRequest request) {
log.info("遠程的地址:{}", request.getRemoteAddr());
return "single";
}
@MessageMapping("/receive-single")
@SendToUser("/topic/getResponse")
public RespMessage broadcastSingle(RequestMessage requestMessage) {
log.info("接收到消息:{}", JSONObject.toJSONString(requestMessage));
RespMessage respMessage = new RespMessage();
String msg = String.format("接收到的消息:%s條", counter.incrementAndGet());
respMessage.setMessage(msg);
return respMessage;
}
single.html頁面
<!DOCTYPE html> <html xmlns:th="http://www.thymeleaf.org"> <head> <meta charset="UTF-8"> <title>Web Socket</title> </head> <body> <div> <div> <button id="connect" onclick="connect()">連接</button> <button id="disconnect" disabled="disabled" onclick="disconnect()">斷開連接</button> </div> <div id="conversation"> <label>輸入你的名字</label><input type="text" id="name"/> <button id="sendName" onclick="sendName();">發送</button> <p id="response"></p> </div> </div> <script src="/static/js/jquery.js"></script> <script src="/static/js/stomp.js"></script> <script src="/static/js/sockjs.js"></script> <script type="text/javascript"> var stompClient=null; function setConnected(connected){ document.getElementById('connect').disabled=connected; document.getElementById('disconnect').disabled=!connected; document.getElementById('conversation').style.visibility=connected?'visible':'hidden'; $('#response').html(); } function connect(){ // websocket的連接地址,此值等於WebSocketMessageBrokerConfigurer中 // registry.addEndpoint("/websocket-simple").withSockJS()配置的地址 var socket=new SockJS('/websocket-single'); stompClient=Stomp.over(socket); stompClient.connect({},function(frame){ setConnected(true); console.log('Connected:'+frame); stompClient.subscribe('/user/topic/getResponse',function(response){ showResponse(JSON.parse(response.body).message); }); }); } function disconnect() { if (stompClient != null) { stompClient.disconnect(); } setConnected(false); console.log("Disconnected"); } function sendName(){ var name=$('#name').val(); // 客戶端消息發送的目的:服務端使用BroadcastCtl // 中@MessageMapping("/receive")注解的方法來處理發送過來的消息 stompClient.send('/receive-single',{},JSON.stringify({'name':name})); } window.onload = function(){ if(stompClient !=null){ stompClient.disconnet(); } setConnected(false); console.log('disconnected'); } function showResponse(message){ var response=$('#response'); response.html(message+ "\r\n" +response.html()); } </script> </body> </html>
參考:
