Spring boot集成WebSocket簡單消息代理


1、Websocket場景

  客戶端和服務器需要以高頻率和低延遲交換事件。 對時間延遲都非常敏感,並且還需要以高頻率交換各種各樣的消息。HTML5規范中的(有 Web TCP 之稱的) WebSocket ,就是一種高效節能的雙向通信機制來保證數據的實時傳輸。

2、運行機制

  WebSocket 是 HTML5 一種新的協議。建立在 TCP 之上,實現客戶端和服務端全雙工異步通信

  • WebSocket 是一種雙向通信協議,WebSocket 服務器和 Browser/Client Agent 都能主動的向對方發送或接收數據;
  • WebSocket 需要類似 TCP 的客戶端和服務器端通過握手連接,連接成功后才能相互通信。

  HTTP 請求響應客戶端服務器交互圖

  

 

 

   WebSocket 請求響應客戶端服務器交互圖

  

 

 

   一旦 WebSocket 連接建立后,后續數據都以幀序列的形式傳輸。在客戶端斷開 WebSocket 連接或 Server 端斷掉連接前,不需要客戶端和服務端重新發起連接請求,這樣保證websocket的性能優勢,實時性優勢明顯

Spring 內嵌的簡單消息代理 和 消息流程圖

Simple Broker

  Spring 內置簡單消息代理。這個代理處理來自客戶端的訂閱請求,將它們存儲在內存中,並將消息廣播到具有匹配目標的連接客戶端。

消息流程圖

  

 

 

 消息通道說明如下:

  • "clientInboundChannel" — 用於傳輸從webSocket客戶端接收的消息
  • "clientOutboundChannel" — 用於傳輸向webSocket客戶端發送的消息
  • "brokerChannel" — 用於傳輸從服務器端應用程序代碼向消息代理發送消息
boot集成websocket
pom.xml依賴
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

RequestMessage: 瀏覽器向服務端請求的消息

@Data
public class RequestMessage {
    private String name;
}

RespMessage: 服務端返回給瀏覽器的消息

@Data
public class RespMessage {
    private String message;
}

@Controller類

  • broadcastIndex()方法:使用 @RequestMapping轉到的頁面
  • broadcast()方法上的注解說明
    • @MessageMapping:指定要接收消息的地址,類似@RequestMapping
    • @SendTo默認消息將被發送到與傳入消息相同的目的地,但是目的地前面附加前綴(默認情況下為“/topic”}
@Slf4j
@Controller
public class BroadcastCtl {

    private AtomicInteger counter = new AtomicInteger(0);

    @RequestMapping("/broadcast/index")
    public String broadcastIndex(HttpServletRequest request) {
        log.info("遠程的地址:{}", request.getRemoteAddr());
        return "index";
    }

    @MessageMapping("/receive")
    @SendTo("/topic/getResponse")
    public RespMessage broadcast(RequestMessage requestMessage) {
        log.info("接收到消息:{}", JSONObject.toJSONString(requestMessage));
        RespMessage respMessage = new RespMessage();
        String msg = String.format("接收到的消息:%s條", counter.incrementAndGet());
        respMessage.setMessage(msg);
        return respMessage;
    }
}

配置消息代理

  默認情況下使用內置的消息代理。 類上的注解@EnableWebSocketMessageBroker:此注解表示使用STOMP協議來傳輸基於消息代理的消息,此時可以在@Controller類中使用@MessageMapping

  • 在方法registerStompEndpoints()里addEndpoint方法:添加STOMP協議的端點。這個HTTP URL是供WebSocket或SockJS客戶端訪問的地址;withSockJS:指定端點使用SockJS協議
  • 在方法configureMessageBroker()里設置簡單消息代理,並配置消息的發送的地址符合配置的前綴的消息才發送到這個broker
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketMessageBrokerConfigure implements WebSocketMessageBrokerConfigurer {
    /**
     * 添加STOMP協議的端點。
     * 這個HTTP URL是供WebSocket或SockJS客戶端訪問的地址;
     * withSockJS:指定端點使用SockJS協議
     *
     * @param registry
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/websocket-simple")
                .setAllowedOrigins("*")
                .withSockJS();
    }

    /**
     * 配置消息代理
     * 啟動簡單Broker,消息的發送的地址符合配置的前綴來的消息才發送到這個broker
     *
     * @param registry
     */
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.enableSimpleBroker("/topic", "/queue");
    }

}

前端stomp、sockjs的配置

  SockJS sockjs是websocket協議的實現,增加對瀏覽器不支持websocket的時候的兼容支持 SockJS的支持的傳輸的協議有3類: WebSocket, HTTP Streaming, and HTTP Long Polling。默認使用websocket,如果瀏覽器不支持websocket,則使用后兩種的方式。 SockJS使用"Get /info"從服務端獲取基本信息。然后客戶端會決定使用哪種傳輸方式。如果瀏覽器使用websocket,則使用websocket。如果不能,則使用Http Streaming,如果還不行,則最后使用 HTTP Long Polling

  引入相關的stomp.js、sockjs.js、jquery.js
<!-- jquery  -->
<script src="/websocket/jquery.js"></script>
<!-- stomp協議的客戶端腳本 -->
<script src="/websocket/stomp.js"></script>
<!-- SockJS的客戶端腳本 -->
<script src="/websocket/sockjs.js"></script>

  前端訪問websocket,重要代碼說明如下:

  • var socket = new SockJS('/websocket-simple'):websocket的連接地址,此值等於WebSocketMessageBrokerConfigurer中registry.addEndpoint("/websocket-simple").withSockJS()配置的地址
  • stompClient.subscribe('/topic/getResponse', function(respnose){ … }): 客戶端訂閱消息的目的地址:此值和BroadcastCtl中的@SendTo("/topic/getResponse")注解的配置的值相同
  • stompClient.send("/receive", {}, JSON.stringify({ 'name': name })): 客戶端消息發送的目的地址:服務端使用BroadcastCtl中@MessageMapping("/receive")注解的方法來處理發送過來的消息
<!DOCTYPE html>
<html xmlns:th="http://www.thymeleaf.org">
<head>
    <meta charset="UTF-8">
    <title>Web Socket</title>
</head>
<body>
<div>
    <div>
        <button id="connect" onclick="connect()">連接</button>
        <button id="disconnect" disabled="disabled" onclick="disconnect()">斷開連接</button>
    </div>
    <div id="conversation">
        <label>輸入你的名字</label><input type="text" id="name"/>
        <button id="sendName" onclick="sendName();">發送</button>
        <p id="response"></p>
    </div>
</div>
<script src="/static/js/jquery.js"></script>
<script src="/static/js/stomp.js"></script>
<script src="/static/js/sockjs.js"></script>
<script type="text/javascript">
    var stompClient=null;

    function setConnected(connected){
        document.getElementById('connect').disabled=connected;
        document.getElementById('disconnect').disabled=!connected;
        document.getElementById('conversation').style.visibility=connected?'visible':'hidden';
        $('#response').html();
    }

    function connect(){
        // websocket的連接地址,此值等於WebSocketMessageBrokerConfigurer中
        // registry.addEndpoint("/websocket-simple").withSockJS()配置的地址
        var socket=new SockJS('/websocket-simple');
        stompClient=Stomp.over(socket);
        stompClient.connect({},function(frame){
            setConnected(true);
            console.log('Connected:'+frame);
            stompClient.subscribe('/topic/getResponse',function(response){
                showResponse(JSON.parse(response.body).message);
            });
        });
    }

    function sendName(){
       var name=$('#name').val();
       // 客戶端消息發送的目的:服務端使用BroadcastCtl
       // 中@MessageMapping("/receive")注解的方法來處理發送過來的消息
       stompClient.send('/receive',{},JSON.stringify({'name':name}));
    }

    window.onload = function(){
      if(stompClient !=null){
         stompClient.disconnet();
      }
      setConnected(false);
      console.log('disconnected');
    }

    function showResponse(message){
      var response=$('#response');
      response.html(message+ "\r\n" +response.html());
    }
</script>
</body>
</html>

攔截器HandshakeInterceptor和ChannelInterceptor

  websocket配置攔截器,默認有兩種:

  • HandshakeInterceptor:攔截websocket的握手請求。在服務端和客戶端在進行握手時會被執行
  • ChannelInterceptor:攔截Message。可以在Message對被在發送到MessageChannel前后查看修改此值,可以在MessageChannel接收MessageChannel對象前后修改此值
    攔截websocket的握手請求。實現接口 HandshakeInterceptor或繼承類DefaultHandshakeHandler
   HttpSessionHandshakeInterceptor:關於httpSession的操作,攔截器用來管理握手和握手后的事情,可以通過請求信息,比如token、或者session判用戶是否可以連接,這樣就能夠防范非法用戶 OriginHandshakeInterceptor:檢查Origin頭字段的合法性
        自定義HandshakeInterceptor
@Slf4j
@Component
public class MyHandShakeInterceptor implements HandshakeInterceptor {

    @Override
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
        log.info("{} http協議轉換websocket協議前URI:{}", getClass().getCanonicalName(), request.getURI());
        // http協議轉換websoket協議進行前,可以在這里通過session信息判斷用戶登錄是否合法
        return true;
    }

    @Override
    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {
        log.info("{}握手成功后...", getClass().getCanonicalName());
    }
}

   ChannelInterceptor

  攔截器中使用StompHeaderAccessor 或 SimpMessageHeaderAccessor訪問消息

@Slf4j
@Component
public class MyChannelInterceptorAdapter implements ChannelInterceptor {

    @Autowired
    private SimpMessagingTemplate simpMessagingTemplate;

    @Override
    public boolean preReceive(MessageChannel channel) {
        log.info("{} pre receive", getClass().getCanonicalName());
        return true;
    }

    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        log.info("{} preSend", getClass().getCanonicalName());
        StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
        StompCommand command = accessor.getCommand();
        log.info("{}用戶訂閱的目的地={}", getClass().getCanonicalName(), accessor.getDestination());
        return message;
    }

    @Override
    public void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex) {
        log.info("{} after Send Completion", getClass().getCanonicalName());
        StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
        StompCommand command = accessor.getCommand();
        if (StompCommand.SUBSCRIBE.equals(command)) {
            log.info("{}訂閱消息發送成功", getClass().getCanonicalName());
            simpMessagingTemplate.convertAndSend("/topic/getResponse", "消息發送成功");
        }
        /**
         * 如果用戶斷開連接
         */
        if (StompCommand.DISCONNECT.equals(command)) {
            log.info("{}用戶斷開連接成功", getClass().getCanonicalName());
            simpMessagingTemplate.convertAndSend("/topic/getResponse", "{'msg':'用戶斷開連接成功'}");
        }
    }
}

  在WebSocketMessageBrokerConfigurer中配置攔截器

       在registerStompEndpoints()方法中通過registry.addInterceptors(myHandShakeInterceptor)添加自定義HandShkeInceptor 攔截

       在configureClientInboundChannel()方法中registration.setInterceptors(myChannelInterceptorAdapter)添加ChannelInterceptor攔截器

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketMessageBrokerConfigure implements WebSocketMessageBrokerConfigurer {

    @Autowired
    private HandshakeInterceptor myHandShakeInterceptor;

    private ChannelInterceptor myChannelInterceptorAdapter = new MyChannelInterceptorAdapter();

    /**
     * 添加STOMP協議的端點。
     * 這個HTTP URL是供WebSocket或SockJS客戶端訪問的地址;
     * withSockJS:指定端點使用SockJS協議
     *
     * @param registry
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        String[] paths = new String[]{"/websocket-simple", "/websocket-single"};
        registry.addEndpoint(paths)
                .setAllowedOrigins("*")
                .addInterceptors(myHandShakeInterceptor)
                .withSockJS();
    }

    /**
     * 配置消息代理
     * 啟動簡單Broker,消息的發送的地址符合配置的前綴來的消息才發送到這個broker
     *
     * @param registry
     */
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.enableSimpleBroker("/topic", "/queue");
    }

    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        registration.interceptors(myChannelInterceptorAdapter);
    }
}

  @SendTo和@SendToUser用法

  @SendTo會將消息推送到所有訂閱此消息的連接,即訂閱/發布模式。@SendToUser只將消息推送到特定的一個訂閱者,即點對點模式

    @RequestMapping("/broadcast/single")
    public String broadcastSingle(HttpServletRequest request) {
        log.info("遠程的地址:{}", request.getRemoteAddr());
        return "single";
    }

    @MessageMapping("/receive-single")
    @SendToUser("/topic/getResponse")
    public RespMessage broadcastSingle(RequestMessage requestMessage) {
        log.info("接收到消息:{}", JSONObject.toJSONString(requestMessage));
        RespMessage respMessage = new RespMessage();
        String msg = String.format("接收到的消息:%s條", counter.incrementAndGet());
        respMessage.setMessage(msg);
        return respMessage;
    }

  single.html頁面

<!DOCTYPE html>
<html xmlns:th="http://www.thymeleaf.org">
<head>
    <meta charset="UTF-8">
    <title>Web Socket</title>
</head>
<body>
<div>
    <div>
        <button id="connect" onclick="connect()">連接</button>
        <button id="disconnect" disabled="disabled" onclick="disconnect()">斷開連接</button>
    </div>
    <div id="conversation">
        <label>輸入你的名字</label><input type="text" id="name"/>
        <button id="sendName" onclick="sendName();">發送</button>
        <p id="response"></p>
    </div>
</div>
<script src="/static/js/jquery.js"></script>
<script src="/static/js/stomp.js"></script>
<script src="/static/js/sockjs.js"></script>
<script type="text/javascript">
    var stompClient=null;

    function setConnected(connected){
        document.getElementById('connect').disabled=connected;
        document.getElementById('disconnect').disabled=!connected;
        document.getElementById('conversation').style.visibility=connected?'visible':'hidden';
        $('#response').html();
    }

    function connect(){
        // websocket的連接地址,此值等於WebSocketMessageBrokerConfigurer中
        // registry.addEndpoint("/websocket-simple").withSockJS()配置的地址
        var socket=new SockJS('/websocket-single');
        stompClient=Stomp.over(socket);
        stompClient.connect({},function(frame){
            setConnected(true);
            console.log('Connected:'+frame);
            stompClient.subscribe('/user/topic/getResponse',function(response){
                showResponse(JSON.parse(response.body).message);
            });
        });
    }

    function disconnect() {
        if (stompClient != null) {
            stompClient.disconnect();
        }
        setConnected(false);
        console.log("Disconnected");
    }

    function sendName(){
       var name=$('#name').val();
       // 客戶端消息發送的目的:服務端使用BroadcastCtl
       // 中@MessageMapping("/receive")注解的方法來處理發送過來的消息
       stompClient.send('/receive-single',{},JSON.stringify({'name':name}));
    }

    window.onload = function(){
      if(stompClient !=null){
         stompClient.disconnet();
      }
      setConnected(false);
      console.log('disconnected');
    }

    function showResponse(message){
      var response=$('#response');
      response.html(message+ "\r\n" +response.html());
    }
</script>
</body>
</html>

      參考: 

      https://www.jianshu.com/p/775920c21766   

      https://juejin.im/post/5ac8cd5c6fb9a028dd4e7ba6


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM