1、Websocket场景
客户端和服务器需要以高频率和低延迟交换事件。 对时间延迟都非常敏感,并且还需要以高频率交换各种各样的消息。HTML5规范中的(有 Web TCP 之称的) WebSocket ,就是一种高效节能的双向通信机制来保证数据的实时传输。
2、运行机制
WebSocket 是 HTML5 一种新的协议。建立在 TCP 之上,实现客户端和服务端全双工异步通信
- WebSocket 是一种双向通信协议,WebSocket 服务器和 Browser/Client Agent 都能主动的向对方发送或接收数据;
- WebSocket 需要类似 TCP 的客户端和服务器端通过握手连接,连接成功后才能相互通信。
HTTP 请求响应客户端服务器交互图
WebSocket 请求响应客户端服务器交互图
一旦 WebSocket 连接建立后,后续数据都以帧序列的形式传输。在客户端断开 WebSocket 连接或 Server 端断掉连接前,不需要客户端和服务端重新发起连接请求,这样保证websocket的性能优势,实时性优势明显
Spring 内嵌的简单消息代理 和 消息流程图
Simple Broker
Spring 内置简单消息代理。这个代理处理来自客户端的订阅请求,将它们存储在内存中,并将消息广播到具有匹配目标的连接客户端。
消息流程图
消息通道说明如下:
- "clientInboundChannel" — 用于传输从webSocket客户端接收的消息
- "clientOutboundChannel" — 用于传输向webSocket客户端发送的消息
- "brokerChannel" — 用于传输从服务器端应用程序代码向消息代理发送消息
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency>
RequestMessage: 浏览器向服务端请求的消息
@Data public class RequestMessage { private String name; }
RespMessage: 服务端返回给浏览器的消息
@Data public class RespMessage { private String message; }
@Controller类
- broadcastIndex()方法:使用 @RequestMapping转到的页面
- broadcast()方法上的注解说明
- @MessageMapping:指定要接收消息的地址,类似@RequestMapping
- @SendTo默认消息将被发送到与传入消息相同的目的地,但是目的地前面附加前缀(默认情况下为“/topic”}
@Slf4j @Controller public class BroadcastCtl { private AtomicInteger counter = new AtomicInteger(0); @RequestMapping("/broadcast/index") public String broadcastIndex(HttpServletRequest request) { log.info("远程的地址:{}", request.getRemoteAddr()); return "index"; } @MessageMapping("/receive") @SendTo("/topic/getResponse") public RespMessage broadcast(RequestMessage requestMessage) { log.info("接收到消息:{}", JSONObject.toJSONString(requestMessage)); RespMessage respMessage = new RespMessage(); String msg = String.format("接收到的消息:%s条", counter.incrementAndGet()); respMessage.setMessage(msg); return respMessage; } }
配置消息代理
默认情况下使用内置的消息代理。 类上的注解@EnableWebSocketMessageBroker:此注解表示使用STOMP协议来传输基于消息代理的消息,此时可以在@Controller类中使用@MessageMapping
- 在方法registerStompEndpoints()里addEndpoint方法:添加STOMP协议的端点。这个HTTP URL是供WebSocket或SockJS客户端访问的地址;withSockJS:指定端点使用SockJS协议
- 在方法configureMessageBroker()里设置简单消息代理,并配置消息的发送的地址符合配置的前缀的消息才发送到这个broker
@Configuration @EnableWebSocketMessageBroker public class WebSocketMessageBrokerConfigure implements WebSocketMessageBrokerConfigurer { /** * 添加STOMP协议的端点。 * 这个HTTP URL是供WebSocket或SockJS客户端访问的地址; * withSockJS:指定端点使用SockJS协议 * * @param registry */ @Override public void registerStompEndpoints(StompEndpointRegistry registry) { registry.addEndpoint("/websocket-simple") .setAllowedOrigins("*") .withSockJS(); } /** * 配置消息代理 * 启动简单Broker,消息的发送的地址符合配置的前缀来的消息才发送到这个broker * * @param registry */ @Override public void configureMessageBroker(MessageBrokerRegistry registry) { registry.enableSimpleBroker("/topic", "/queue"); } }
前端stomp、sockjs的配置
SockJS sockjs是websocket协议的实现,增加对浏览器不支持websocket的时候的兼容支持 SockJS的支持的传输的协议有3类: WebSocket, HTTP Streaming, and HTTP Long Polling。默认使用websocket,如果浏览器不支持websocket,则使用后两种的方式。 SockJS使用"Get /info"从服务端获取基本信息。然后客户端会决定使用哪种传输方式。如果浏览器使用websocket,则使用websocket。如果不能,则使用Http Streaming,如果还不行,则最后使用 HTTP Long Polling
<!-- jquery --> <script src="/websocket/jquery.js"></script> <!-- stomp协议的客户端脚本 --> <script src="/websocket/stomp.js"></script> <!-- SockJS的客户端脚本 --> <script src="/websocket/sockjs.js"></script>
前端访问websocket,重要代码说明如下:
- var socket = new SockJS('/websocket-simple'):websocket的连接地址,此值等于WebSocketMessageBrokerConfigurer中registry.addEndpoint("/websocket-simple").withSockJS()配置的地址
- stompClient.subscribe('/topic/getResponse', function(respnose){ … }): 客户端订阅消息的目的地址:此值和BroadcastCtl中的@SendTo("/topic/getResponse")注解的配置的值相同
- stompClient.send("/receive", {}, JSON.stringify({ 'name': name })): 客户端消息发送的目的地址:服务端使用BroadcastCtl中@MessageMapping("/receive")注解的方法来处理发送过来的消息
<!DOCTYPE html> <html xmlns:th="http://www.thymeleaf.org"> <head> <meta charset="UTF-8"> <title>Web Socket</title> </head> <body> <div> <div> <button id="connect" onclick="connect()">连接</button> <button id="disconnect" disabled="disabled" onclick="disconnect()">断开连接</button> </div> <div id="conversation"> <label>输入你的名字</label><input type="text" id="name"/> <button id="sendName" onclick="sendName();">发送</button> <p id="response"></p> </div> </div> <script src="/static/js/jquery.js"></script> <script src="/static/js/stomp.js"></script> <script src="/static/js/sockjs.js"></script> <script type="text/javascript"> var stompClient=null; function setConnected(connected){ document.getElementById('connect').disabled=connected; document.getElementById('disconnect').disabled=!connected; document.getElementById('conversation').style.visibility=connected?'visible':'hidden'; $('#response').html(); } function connect(){ // websocket的连接地址,此值等于WebSocketMessageBrokerConfigurer中 // registry.addEndpoint("/websocket-simple").withSockJS()配置的地址 var socket=new SockJS('/websocket-simple'); stompClient=Stomp.over(socket); stompClient.connect({},function(frame){ setConnected(true); console.log('Connected:'+frame); stompClient.subscribe('/topic/getResponse',function(response){ showResponse(JSON.parse(response.body).message); }); }); } function sendName(){ var name=$('#name').val(); // 客户端消息发送的目的:服务端使用BroadcastCtl // 中@MessageMapping("/receive")注解的方法来处理发送过来的消息 stompClient.send('/receive',{},JSON.stringify({'name':name})); } window.onload = function(){ if(stompClient !=null){ stompClient.disconnet(); } setConnected(false); console.log('disconnected'); } function showResponse(message){ var response=$('#response'); response.html(message+ "\r\n" +response.html()); } </script> </body> </html>
拦截器HandshakeInterceptor和ChannelInterceptor
websocket配置拦截器,默认有两种:
- HandshakeInterceptor:拦截websocket的握手请求。在服务端和客户端在进行握手时会被执行
- ChannelInterceptor:拦截Message。可以在Message对被在发送到MessageChannel前后查看修改此值,可以在MessageChannel接收MessageChannel对象前后修改此值
@Slf4j @Component public class MyHandShakeInterceptor implements HandshakeInterceptor { @Override public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception { log.info("{} http协议转换websocket协议前URI:{}", getClass().getCanonicalName(), request.getURI()); // http协议转换websoket协议进行前,可以在这里通过session信息判断用户登录是否合法 return true; } @Override public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) { log.info("{}握手成功后...", getClass().getCanonicalName()); } }
ChannelInterceptor
拦截器中使用StompHeaderAccessor 或 SimpMessageHeaderAccessor访问消息
@Slf4j @Component public class MyChannelInterceptorAdapter implements ChannelInterceptor { @Autowired private SimpMessagingTemplate simpMessagingTemplate; @Override public boolean preReceive(MessageChannel channel) { log.info("{} pre receive", getClass().getCanonicalName()); return true; } @Override public Message<?> preSend(Message<?> message, MessageChannel channel) { log.info("{} preSend", getClass().getCanonicalName()); StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message); StompCommand command = accessor.getCommand(); log.info("{}用户订阅的目的地={}", getClass().getCanonicalName(), accessor.getDestination()); return message; } @Override public void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex) { log.info("{} after Send Completion", getClass().getCanonicalName()); StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message); StompCommand command = accessor.getCommand(); if (StompCommand.SUBSCRIBE.equals(command)) { log.info("{}订阅消息发送成功", getClass().getCanonicalName()); simpMessagingTemplate.convertAndSend("/topic/getResponse", "消息发送成功"); } /** * 如果用户断开连接 */ if (StompCommand.DISCONNECT.equals(command)) { log.info("{}用户断开连接成功", getClass().getCanonicalName()); simpMessagingTemplate.convertAndSend("/topic/getResponse", "{'msg':'用户断开连接成功'}"); } } }
在WebSocketMessageBrokerConfigurer中配置拦截器
在registerStompEndpoints()方法中通过registry.addInterceptors(myHandShakeInterceptor)添加自定义HandShkeInceptor 拦截
在configureClientInboundChannel()方法中registration.setInterceptors(myChannelInterceptorAdapter)添加ChannelInterceptor拦截器
@Configuration @EnableWebSocketMessageBroker public class WebSocketMessageBrokerConfigure implements WebSocketMessageBrokerConfigurer { @Autowired private HandshakeInterceptor myHandShakeInterceptor; private ChannelInterceptor myChannelInterceptorAdapter = new MyChannelInterceptorAdapter(); /** * 添加STOMP协议的端点。 * 这个HTTP URL是供WebSocket或SockJS客户端访问的地址; * withSockJS:指定端点使用SockJS协议 * * @param registry */ @Override public void registerStompEndpoints(StompEndpointRegistry registry) { String[] paths = new String[]{"/websocket-simple", "/websocket-single"}; registry.addEndpoint(paths) .setAllowedOrigins("*") .addInterceptors(myHandShakeInterceptor) .withSockJS(); } /** * 配置消息代理 * 启动简单Broker,消息的发送的地址符合配置的前缀来的消息才发送到这个broker * * @param registry */ @Override public void configureMessageBroker(MessageBrokerRegistry registry) { registry.enableSimpleBroker("/topic", "/queue"); } @Override public void configureClientInboundChannel(ChannelRegistration registration) { registration.interceptors(myChannelInterceptorAdapter); } }
@SendTo和@SendToUser用法
@SendTo会将消息推送到所有订阅此消息的连接,即订阅/发布模式。@SendToUser只将消息推送到特定的一个订阅者,即点对点模式
@RequestMapping("/broadcast/single") public String broadcastSingle(HttpServletRequest request) { log.info("远程的地址:{}", request.getRemoteAddr()); return "single"; } @MessageMapping("/receive-single") @SendToUser("/topic/getResponse") public RespMessage broadcastSingle(RequestMessage requestMessage) { log.info("接收到消息:{}", JSONObject.toJSONString(requestMessage)); RespMessage respMessage = new RespMessage(); String msg = String.format("接收到的消息:%s条", counter.incrementAndGet()); respMessage.setMessage(msg); return respMessage; }
single.html页面
<!DOCTYPE html> <html xmlns:th="http://www.thymeleaf.org"> <head> <meta charset="UTF-8"> <title>Web Socket</title> </head> <body> <div> <div> <button id="connect" onclick="connect()">连接</button> <button id="disconnect" disabled="disabled" onclick="disconnect()">断开连接</button> </div> <div id="conversation"> <label>输入你的名字</label><input type="text" id="name"/> <button id="sendName" onclick="sendName();">发送</button> <p id="response"></p> </div> </div> <script src="/static/js/jquery.js"></script> <script src="/static/js/stomp.js"></script> <script src="/static/js/sockjs.js"></script> <script type="text/javascript"> var stompClient=null; function setConnected(connected){ document.getElementById('connect').disabled=connected; document.getElementById('disconnect').disabled=!connected; document.getElementById('conversation').style.visibility=connected?'visible':'hidden'; $('#response').html(); } function connect(){ // websocket的连接地址,此值等于WebSocketMessageBrokerConfigurer中 // registry.addEndpoint("/websocket-simple").withSockJS()配置的地址 var socket=new SockJS('/websocket-single'); stompClient=Stomp.over(socket); stompClient.connect({},function(frame){ setConnected(true); console.log('Connected:'+frame); stompClient.subscribe('/user/topic/getResponse',function(response){ showResponse(JSON.parse(response.body).message); }); }); } function disconnect() { if (stompClient != null) { stompClient.disconnect(); } setConnected(false); console.log("Disconnected"); } function sendName(){ var name=$('#name').val(); // 客户端消息发送的目的:服务端使用BroadcastCtl // 中@MessageMapping("/receive")注解的方法来处理发送过来的消息 stompClient.send('/receive-single',{},JSON.stringify({'name':name})); } window.onload = function(){ if(stompClient !=null){ stompClient.disconnet(); } setConnected(false); console.log('disconnected'); } function showResponse(message){ var response=$('#response'); response.html(message+ "\r\n" +response.html()); } </script> </body> </html>
参考: