Spring boot集成WebSocket简单消息代理


1、Websocket场景

  客户端和服务器需要以高频率和低延迟交换事件。 对时间延迟都非常敏感,并且还需要以高频率交换各种各样的消息。HTML5规范中的(有 Web TCP 之称的) WebSocket ,就是一种高效节能的双向通信机制来保证数据的实时传输。

2、运行机制

  WebSocket 是 HTML5 一种新的协议。建立在 TCP 之上,实现客户端和服务端全双工异步通信

  • WebSocket 是一种双向通信协议,WebSocket 服务器和 Browser/Client Agent 都能主动的向对方发送或接收数据;
  • WebSocket 需要类似 TCP 的客户端和服务器端通过握手连接,连接成功后才能相互通信。

  HTTP 请求响应客户端服务器交互图

  

 

 

   WebSocket 请求响应客户端服务器交互图

  

 

 

   一旦 WebSocket 连接建立后,后续数据都以帧序列的形式传输。在客户端断开 WebSocket 连接或 Server 端断掉连接前,不需要客户端和服务端重新发起连接请求,这样保证websocket的性能优势,实时性优势明显

Spring 内嵌的简单消息代理 和 消息流程图

Simple Broker

  Spring 内置简单消息代理。这个代理处理来自客户端的订阅请求,将它们存储在内存中,并将消息广播到具有匹配目标的连接客户端。

消息流程图

  

 

 

 消息通道说明如下:

  • "clientInboundChannel" — 用于传输从webSocket客户端接收的消息
  • "clientOutboundChannel" — 用于传输向webSocket客户端发送的消息
  • "brokerChannel" — 用于传输从服务器端应用程序代码向消息代理发送消息
boot集成websocket
pom.xml依赖
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

RequestMessage: 浏览器向服务端请求的消息

@Data
public class RequestMessage {
    private String name;
}

RespMessage: 服务端返回给浏览器的消息

@Data
public class RespMessage {
    private String message;
}

@Controller类

  • broadcastIndex()方法:使用 @RequestMapping转到的页面
  • broadcast()方法上的注解说明
    • @MessageMapping:指定要接收消息的地址,类似@RequestMapping
    • @SendTo默认消息将被发送到与传入消息相同的目的地,但是目的地前面附加前缀(默认情况下为“/topic”}
@Slf4j
@Controller
public class BroadcastCtl {

    private AtomicInteger counter = new AtomicInteger(0);

    @RequestMapping("/broadcast/index")
    public String broadcastIndex(HttpServletRequest request) {
        log.info("远程的地址:{}", request.getRemoteAddr());
        return "index";
    }

    @MessageMapping("/receive")
    @SendTo("/topic/getResponse")
    public RespMessage broadcast(RequestMessage requestMessage) {
        log.info("接收到消息:{}", JSONObject.toJSONString(requestMessage));
        RespMessage respMessage = new RespMessage();
        String msg = String.format("接收到的消息:%s条", counter.incrementAndGet());
        respMessage.setMessage(msg);
        return respMessage;
    }
}

配置消息代理

  默认情况下使用内置的消息代理。 类上的注解@EnableWebSocketMessageBroker:此注解表示使用STOMP协议来传输基于消息代理的消息,此时可以在@Controller类中使用@MessageMapping

  • 在方法registerStompEndpoints()里addEndpoint方法:添加STOMP协议的端点。这个HTTP URL是供WebSocket或SockJS客户端访问的地址;withSockJS:指定端点使用SockJS协议
  • 在方法configureMessageBroker()里设置简单消息代理,并配置消息的发送的地址符合配置的前缀的消息才发送到这个broker
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketMessageBrokerConfigure implements WebSocketMessageBrokerConfigurer {
    /**
     * 添加STOMP协议的端点。
     * 这个HTTP URL是供WebSocket或SockJS客户端访问的地址;
     * withSockJS:指定端点使用SockJS协议
     *
     * @param registry
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/websocket-simple")
                .setAllowedOrigins("*")
                .withSockJS();
    }

    /**
     * 配置消息代理
     * 启动简单Broker,消息的发送的地址符合配置的前缀来的消息才发送到这个broker
     *
     * @param registry
     */
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.enableSimpleBroker("/topic", "/queue");
    }

}

前端stomp、sockjs的配置

  SockJS sockjs是websocket协议的实现,增加对浏览器不支持websocket的时候的兼容支持 SockJS的支持的传输的协议有3类: WebSocket, HTTP Streaming, and HTTP Long Polling。默认使用websocket,如果浏览器不支持websocket,则使用后两种的方式。 SockJS使用"Get /info"从服务端获取基本信息。然后客户端会决定使用哪种传输方式。如果浏览器使用websocket,则使用websocket。如果不能,则使用Http Streaming,如果还不行,则最后使用 HTTP Long Polling

  引入相关的stomp.js、sockjs.js、jquery.js
<!-- jquery  -->
<script src="/websocket/jquery.js"></script>
<!-- stomp协议的客户端脚本 -->
<script src="/websocket/stomp.js"></script>
<!-- SockJS的客户端脚本 -->
<script src="/websocket/sockjs.js"></script>

  前端访问websocket,重要代码说明如下:

  • var socket = new SockJS('/websocket-simple'):websocket的连接地址,此值等于WebSocketMessageBrokerConfigurer中registry.addEndpoint("/websocket-simple").withSockJS()配置的地址
  • stompClient.subscribe('/topic/getResponse', function(respnose){ … }): 客户端订阅消息的目的地址:此值和BroadcastCtl中的@SendTo("/topic/getResponse")注解的配置的值相同
  • stompClient.send("/receive", {}, JSON.stringify({ 'name': name })): 客户端消息发送的目的地址:服务端使用BroadcastCtl中@MessageMapping("/receive")注解的方法来处理发送过来的消息
<!DOCTYPE html>
<html xmlns:th="http://www.thymeleaf.org">
<head>
    <meta charset="UTF-8">
    <title>Web Socket</title>
</head>
<body>
<div>
    <div>
        <button id="connect" onclick="connect()">连接</button>
        <button id="disconnect" disabled="disabled" onclick="disconnect()">断开连接</button>
    </div>
    <div id="conversation">
        <label>输入你的名字</label><input type="text" id="name"/>
        <button id="sendName" onclick="sendName();">发送</button>
        <p id="response"></p>
    </div>
</div>
<script src="/static/js/jquery.js"></script>
<script src="/static/js/stomp.js"></script>
<script src="/static/js/sockjs.js"></script>
<script type="text/javascript">
    var stompClient=null;

    function setConnected(connected){
        document.getElementById('connect').disabled=connected;
        document.getElementById('disconnect').disabled=!connected;
        document.getElementById('conversation').style.visibility=connected?'visible':'hidden';
        $('#response').html();
    }

    function connect(){
        // websocket的连接地址,此值等于WebSocketMessageBrokerConfigurer中
        // registry.addEndpoint("/websocket-simple").withSockJS()配置的地址
        var socket=new SockJS('/websocket-simple');
        stompClient=Stomp.over(socket);
        stompClient.connect({},function(frame){
            setConnected(true);
            console.log('Connected:'+frame);
            stompClient.subscribe('/topic/getResponse',function(response){
                showResponse(JSON.parse(response.body).message);
            });
        });
    }

    function sendName(){
       var name=$('#name').val();
       // 客户端消息发送的目的:服务端使用BroadcastCtl
       // 中@MessageMapping("/receive")注解的方法来处理发送过来的消息
       stompClient.send('/receive',{},JSON.stringify({'name':name}));
    }

    window.onload = function(){
      if(stompClient !=null){
         stompClient.disconnet();
      }
      setConnected(false);
      console.log('disconnected');
    }

    function showResponse(message){
      var response=$('#response');
      response.html(message+ "\r\n" +response.html());
    }
</script>
</body>
</html>

拦截器HandshakeInterceptor和ChannelInterceptor

  websocket配置拦截器,默认有两种:

  • HandshakeInterceptor:拦截websocket的握手请求。在服务端和客户端在进行握手时会被执行
  • ChannelInterceptor:拦截Message。可以在Message对被在发送到MessageChannel前后查看修改此值,可以在MessageChannel接收MessageChannel对象前后修改此值
    拦截websocket的握手请求。实现接口 HandshakeInterceptor或继承类DefaultHandshakeHandler
   HttpSessionHandshakeInterceptor:关于httpSession的操作,拦截器用来管理握手和握手后的事情,可以通过请求信息,比如token、或者session判用户是否可以连接,这样就能够防范非法用户 OriginHandshakeInterceptor:检查Origin头字段的合法性
        自定义HandshakeInterceptor
@Slf4j
@Component
public class MyHandShakeInterceptor implements HandshakeInterceptor {

    @Override
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
        log.info("{} http协议转换websocket协议前URI:{}", getClass().getCanonicalName(), request.getURI());
        // http协议转换websoket协议进行前,可以在这里通过session信息判断用户登录是否合法
        return true;
    }

    @Override
    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {
        log.info("{}握手成功后...", getClass().getCanonicalName());
    }
}

   ChannelInterceptor

  拦截器中使用StompHeaderAccessor 或 SimpMessageHeaderAccessor访问消息

@Slf4j
@Component
public class MyChannelInterceptorAdapter implements ChannelInterceptor {

    @Autowired
    private SimpMessagingTemplate simpMessagingTemplate;

    @Override
    public boolean preReceive(MessageChannel channel) {
        log.info("{} pre receive", getClass().getCanonicalName());
        return true;
    }

    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        log.info("{} preSend", getClass().getCanonicalName());
        StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
        StompCommand command = accessor.getCommand();
        log.info("{}用户订阅的目的地={}", getClass().getCanonicalName(), accessor.getDestination());
        return message;
    }

    @Override
    public void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex) {
        log.info("{} after Send Completion", getClass().getCanonicalName());
        StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
        StompCommand command = accessor.getCommand();
        if (StompCommand.SUBSCRIBE.equals(command)) {
            log.info("{}订阅消息发送成功", getClass().getCanonicalName());
            simpMessagingTemplate.convertAndSend("/topic/getResponse", "消息发送成功");
        }
        /**
         * 如果用户断开连接
         */
        if (StompCommand.DISCONNECT.equals(command)) {
            log.info("{}用户断开连接成功", getClass().getCanonicalName());
            simpMessagingTemplate.convertAndSend("/topic/getResponse", "{'msg':'用户断开连接成功'}");
        }
    }
}

  在WebSocketMessageBrokerConfigurer中配置拦截器

       在registerStompEndpoints()方法中通过registry.addInterceptors(myHandShakeInterceptor)添加自定义HandShkeInceptor 拦截

       在configureClientInboundChannel()方法中registration.setInterceptors(myChannelInterceptorAdapter)添加ChannelInterceptor拦截器

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketMessageBrokerConfigure implements WebSocketMessageBrokerConfigurer {

    @Autowired
    private HandshakeInterceptor myHandShakeInterceptor;

    private ChannelInterceptor myChannelInterceptorAdapter = new MyChannelInterceptorAdapter();

    /**
     * 添加STOMP协议的端点。
     * 这个HTTP URL是供WebSocket或SockJS客户端访问的地址;
     * withSockJS:指定端点使用SockJS协议
     *
     * @param registry
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        String[] paths = new String[]{"/websocket-simple", "/websocket-single"};
        registry.addEndpoint(paths)
                .setAllowedOrigins("*")
                .addInterceptors(myHandShakeInterceptor)
                .withSockJS();
    }

    /**
     * 配置消息代理
     * 启动简单Broker,消息的发送的地址符合配置的前缀来的消息才发送到这个broker
     *
     * @param registry
     */
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.enableSimpleBroker("/topic", "/queue");
    }

    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        registration.interceptors(myChannelInterceptorAdapter);
    }
}

  @SendTo和@SendToUser用法

  @SendTo会将消息推送到所有订阅此消息的连接,即订阅/发布模式。@SendToUser只将消息推送到特定的一个订阅者,即点对点模式

    @RequestMapping("/broadcast/single")
    public String broadcastSingle(HttpServletRequest request) {
        log.info("远程的地址:{}", request.getRemoteAddr());
        return "single";
    }

    @MessageMapping("/receive-single")
    @SendToUser("/topic/getResponse")
    public RespMessage broadcastSingle(RequestMessage requestMessage) {
        log.info("接收到消息:{}", JSONObject.toJSONString(requestMessage));
        RespMessage respMessage = new RespMessage();
        String msg = String.format("接收到的消息:%s条", counter.incrementAndGet());
        respMessage.setMessage(msg);
        return respMessage;
    }

  single.html页面

<!DOCTYPE html>
<html xmlns:th="http://www.thymeleaf.org">
<head>
    <meta charset="UTF-8">
    <title>Web Socket</title>
</head>
<body>
<div>
    <div>
        <button id="connect" onclick="connect()">连接</button>
        <button id="disconnect" disabled="disabled" onclick="disconnect()">断开连接</button>
    </div>
    <div id="conversation">
        <label>输入你的名字</label><input type="text" id="name"/>
        <button id="sendName" onclick="sendName();">发送</button>
        <p id="response"></p>
    </div>
</div>
<script src="/static/js/jquery.js"></script>
<script src="/static/js/stomp.js"></script>
<script src="/static/js/sockjs.js"></script>
<script type="text/javascript">
    var stompClient=null;

    function setConnected(connected){
        document.getElementById('connect').disabled=connected;
        document.getElementById('disconnect').disabled=!connected;
        document.getElementById('conversation').style.visibility=connected?'visible':'hidden';
        $('#response').html();
    }

    function connect(){
        // websocket的连接地址,此值等于WebSocketMessageBrokerConfigurer中
        // registry.addEndpoint("/websocket-simple").withSockJS()配置的地址
        var socket=new SockJS('/websocket-single');
        stompClient=Stomp.over(socket);
        stompClient.connect({},function(frame){
            setConnected(true);
            console.log('Connected:'+frame);
            stompClient.subscribe('/user/topic/getResponse',function(response){
                showResponse(JSON.parse(response.body).message);
            });
        });
    }

    function disconnect() {
        if (stompClient != null) {
            stompClient.disconnect();
        }
        setConnected(false);
        console.log("Disconnected");
    }

    function sendName(){
       var name=$('#name').val();
       // 客户端消息发送的目的:服务端使用BroadcastCtl
       // 中@MessageMapping("/receive")注解的方法来处理发送过来的消息
       stompClient.send('/receive-single',{},JSON.stringify({'name':name}));
    }

    window.onload = function(){
      if(stompClient !=null){
         stompClient.disconnet();
      }
      setConnected(false);
      console.log('disconnected');
    }

    function showResponse(message){
      var response=$('#response');
      response.html(message+ "\r\n" +response.html());
    }
</script>
</body>
</html>

      参考: 

      https://www.jianshu.com/p/775920c21766   

      https://juejin.im/post/5ac8cd5c6fb9a028dd4e7ba6


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM