websocket實現交互消息demo


1. 編寫場景 :   用戶點擊 import 導入按鈕,系統從遠程庫當中 接近10W條數據到本地Mysql數據庫,在此過程中,要反饋前端目前進度是多少,以便顯示對應進度條。

2. 思考解決方案 : 

  遠程庫導入,假設我們是用的傳統的關系型數據庫,那么要使用類似服務器游標來每次讀取一定規格的數據(比如1次讀取1000條,不可能一次讀10W條,這對內存跟帶寬都是很大負擔),

然后使用多線程(比如fix線程來寫)來寫,然后在此業務代碼之中反饋目前進度。

3. 這里介紹 websocket demo,整個完整實例后續補充專題: 

    3.1 實現方式,一般來說 ,我們有2種后台實現方式 ,一種使用tomcat的websocket實現,一種使用spring的websocket 。

      3.2 tomcat的方式需要tomcat 7.x,JEE7的支持。

      3.3 spring與websocket整合需要spring 4.x,並且使用了socketjs,對不支持websocket的瀏覽器可以模擬websocket使用

4. tomcat方式使用wbescoket

  這里,我使用了springboot來搭建,而springboot默認是tomcat容器,所以我們直接使用springboot項目即可。(注意,可能瀏覽器難識別h5的websocket,那么使用這種方式的

時候,若瀏覽器版本過低,還是使用spring的基於socket.js的吧。)

  4.1 引入pom 文件,添加websocket 依賴 

  核心是@ServerEndpoint這個注解。這個注解是Javaee標准里的注解,tomcat7以上已經對其進行了實現,如果是用傳統方法使用tomcat發布項目,只要在pom文件中引入javaee標准即可使用。

       引入javaee的標准是  (若是springboot可跳過此步驟)

<dependency>
      <groupId>javax</groupId>
      <artifactId>javaee-api</artifactId>
      <version>7.0</version>
      <scope>provided</scope>
    </dependency>

  引入 websocket依賴   (使用springboot只需要引入webscoket即可,但要先引入start依賴)

 <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
            <version>1.3.5.RELEASE</version>
        </dependency>

  順便說一句,springboot的高級組件會自動引用基礎的組件,像spring-boot-starter-websocket就引入了spring-boot-starter-web和spring-boot-starter,所以不要重復引入。

       4.2 使用@ServerEndpoint創立websocket endpoint

        首先要注入ServerEndpointExporter,這個bean會自動注冊使用了@ServerEndpoint注解聲明的Websocket endpoint。要注意,如果使用獨立的servlet容器,而不是直接使用springboot的內置容器,就不要注入ServerEndpointExporter,因為它將由容器自己提供和管理。

  

package root.websocket;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * @Auther: pccw
 * @Date: 2018/10/25 10:10
 * @Description:
 */
@Configuration
public class WebSocketConfig {

    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }

}
View Code

       4.3 編寫具體實現類 

  接下來就是寫websocket的具體實現類,很簡單,直接上代碼:

  

package root.websocket;

import org.springframework.stereotype.Component;

import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.CopyOnWriteArraySet;

/**
 * @Auther: pccw
 * @Date: 2018/10/25 10:11
 * @Description:
 */
@ServerEndpoint(value = "/websocket/dictSocket")
@Component
public class ImportDictValueSocket {

    //靜態變量,用來記錄當前在線連接數。應該把它設計成線程安全的。
    private static int onlineCount = 0;

    //concurrent包的線程安全Set,用來存放每個客戶端對應的MyWebSocket對象。
    private static CopyOnWriteArraySet<ImportDictValueSocket> webSocketSet = new CopyOnWriteArraySet<ImportDictValueSocket>();

    //與某個客戶端的連接會話,需要通過它來給客戶端發送數據
    private Session session;

    /**
     * 連接建立成功調用的方法*/
    @OnOpen
    public void onOpen(Session session) {
        this.session = session;
        webSocketSet.add(this);     //加入set中
        addOnlineCount();           //在線數加1
        System.out.println("有新連接加入!當前在線人數為" + getOnlineCount());
        try {
            sendMessage("當前在線人數"+getOnlineCount()+",當前sessionID:"+this.session.getId());
        } catch (IOException e) {
            System.out.println("IO異常");
        }
    }

    /**
     * 連接關閉調用的方法
     */
    @OnClose
    public void onClose() {
        webSocketSet.remove(this);  //從set中刪除
        subOnlineCount();           //在線數減1
        System.out.println("有一連接關閉!當前在線人數為" + getOnlineCount());
    }

    /**
     * 收到客戶端消息后調用的方法
     *
     * @param message 客戶端發送過來的消息*/
    @OnMessage
    public void onMessage(String message, Session session) throws IOException, InterruptedException {
        System.out.println("來自客戶端的消息:" + message);

        for(int i=0;i<5;i++){
            // 給當前的session返回進度
            session.getBasicRemote().sendText("當前進度為:"+i*20);
            Thread.sleep(2000);
        }


        //群發消息
      /*  for (ImportDictValueSocket item : webSocketSet) {
            try {
                item.sendMessage(message);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }*/
    }

    /**
     * 發生錯誤時調用
     @OnError
     public void onError(Session session, Throwable error) {
     System.out.println("發生錯誤");
     error.printStackTrace();
     }  **/


     public void sendMessage(String message) throws IOException {
        this.session.getBasicRemote().sendText(message);
         //this.session.getAsyncRemote().sendText(message);
     }


     /**
      * 群發自定義消息
      * */
    public static void sendInfo(String message) throws IOException {
        for (ImportDictValueSocket item : webSocketSet) {
            try {
                item.sendMessage(message);
            } catch (IOException e) {
                continue;
            }
        }
    }

    public static synchronized int getOnlineCount() {
        return onlineCount;
    }

    public static synchronized void addOnlineCount() {
        ImportDictValueSocket.onlineCount++;
    }

    public static synchronized void subOnlineCount() {
        ImportDictValueSocket.onlineCount--;
    }
}
實現類

 

  

        使用springboot的唯一區別是要@Component聲明下,而使用獨立容器是由容器自己管理websocket的,但在springboot中連容器都是spring管理的。

雖然@Component默認是單例模式的,但springboot還是會為每個websocket連接初始化一個bean,所以可以用一個靜態set保存起來。

       4.4 前端代碼  (這里我就沒編寫了,我就直接使用 http://www.blue-zero.com/WebSocket/ 在線測試工具完成測試)

       

<!DOCTYPE HTML>
<html>
<head>
    <title>My WebSocket</title>
</head>

<body>
Welcome<br/>
<input id="text" type="text" /><button onclick="send()">Send</button>    <button onclick="closeWebSocket()">Close</button>
<div id="message">
</div>
</body>

<script type="text/javascript">
    var websocket = null;

    //判斷當前瀏覽器是否支持WebSocket
    if('WebSocket' in window){
        websocket = new WebSocket("ws://localhost:8084/websocket");
    }
    else{
        alert('Not support websocket')
    }

    //連接發生錯誤的回調方法
    websocket.onerror = function(){
        setMessageInnerHTML("error");
    };

    //連接成功建立的回調方法
    websocket.onopen = function(event){
        setMessageInnerHTML("open");
    }

    //接收到消息的回調方法
    websocket.onmessage = function(event){
        setMessageInnerHTML(event.data);
    }

    //連接關閉的回調方法
    websocket.onclose = function(){
        setMessageInnerHTML("close");
    }

    //監聽窗口關閉事件,當窗口關閉時,主動去關閉websocket連接,防止連接還沒斷開就關閉窗口,server端會拋異常。
    window.onbeforeunload = function(){
        websocket.close();
    }

    //將消息顯示在網頁上
    function setMessageInnerHTML(innerHTML){
        document.getElementById('message').innerHTML += innerHTML + '<br/>';
    }

    //關閉連接
    function closeWebSocket(){
        websocket.close();
    }

    //發送消息
    function send(){
        var message = document.getElementById('text').value;
        websocket.send(message);
    }
</script>
</html>
View Code

  代碼摘抄自 : https://blog.csdn.net/snakeMoving/article/details/78992838

       4.5 總結 : 

       springboot已經做了深度的集成和優化,要注意是否添加了不需要的依賴、配置或聲明。由於很多講解組件使用的文章是和spring集成的,會有一些配置,在使用springboot時,由於springboot已經有了自己的配置,再這些配置有可能導致各種各樣的異常。

       4.6 測試結果 : 

  

 

 5. 如何使用spring的方式來實現 

  這種方式實現我沒有具體的手寫代碼,摘抄自 : https://www.cnblogs.com/interdrp/p/7903736.html 

  5.1 整合spring

       5.2   WebSocketConfig.java

      這個類是配置類,所以需要在spring mvc配置文件中加入對這個類的掃描,第一個addHandler是對正常連接的配置,第二個是如果瀏覽器不支持websocket,使用socketjs模擬websocket的連接。

      

package com.websocket;  
  
import org.springframework.context.annotation.Bean;  
import org.springframework.context.annotation.Configuration;  
import org.springframework.web.socket.config.annotation.EnableWebSocket;  
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;  
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;  
import org.springframework.web.socket.handler.TextWebSocketHandler;  
  
@Configuration  
@EnableWebSocket  
public class WebSocketConfig implements WebSocketConfigurer {  
    @Override  
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {  
        registry.addHandler(chatMessageHandler(),"/webSocketServer").addInterceptors(new ChatHandshakeInterceptor());  
        registry.addHandler(chatMessageHandler(), "/sockjs/webSocketServer").addInterceptors(new ChatHandshakeInterceptor()).withSockJS();  
    }  
   
    @Bean  
    public TextWebSocketHandler chatMessageHandler(){  
        return new ChatMessageHandler();  
    }  
  
}  
View Code

  

  ChatHandshakeInterceptor.java

  這個類的作用就是在連接成功前和成功后增加一些額外的功能,Constants.java類是一個工具類,兩個常量。

  

package com.websocket;  
  
import java.util.Map;  
import org.apache.shiro.SecurityUtils;  
import org.springframework.http.server.ServerHttpRequest;  
import org.springframework.http.server.ServerHttpResponse;  
import org.springframework.web.socket.WebSocketHandler;  
import org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor;  
  
public class ChatHandshakeInterceptor extends HttpSessionHandshakeInterceptor {  
  
    @Override  
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler,  
            Map<String, Object> attributes) throws Exception {  
        System.out.println("Before Handshake");  
        /* 
         * if (request instanceof ServletServerHttpRequest) { 
         * ServletServerHttpRequest servletRequest = (ServletServerHttpRequest) 
         * request; HttpSession session = 
         * servletRequest.getServletRequest().getSession(false); if (session != 
         * null) { //使用userName區分WebSocketHandler,以便定向發送消息 String userName = 
         * (String) session.getAttribute(Constants.SESSION_USERNAME); if 
         * (userName==null) { userName="default-system"; } 
         * attributes.put(Constants.WEBSOCKET_USERNAME,userName); 
         *  
         * } } 
         */  
  
        //使用userName區分WebSocketHandler,以便定向發送消息(使用shiro獲取session,或是使用上面的方式)  
        String userName = (String) SecurityUtils.getSubject().getSession().getAttribute(Constants.SESSION_USERNAME);  
        if (userName == null) {  
            userName = "default-system";  
        }  
        attributes.put(Constants.WEBSOCKET_USERNAME, userName);  
  
        return super.beforeHandshake(request, response, wsHandler, attributes);  
    }  
  
    @Override  
    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler,  
            Exception ex) {  
        System.out.println("After Handshake");  
        super.afterHandshake(request, response, wsHandler, ex);  
    }  
  
}  
View Code

         

  ChatMessageHandler.java

  這個類是對消息的一些處理,比如是發給一個人,還是發給所有人,並且前端連接時觸發的一些動作

  

package com.websocket;  
  
import java.io.IOException;  
import java.util.ArrayList;  
import org.apache.log4j.Logger;  
import org.springframework.web.socket.CloseStatus;  
import org.springframework.web.socket.TextMessage;  
import org.springframework.web.socket.WebSocketSession;  
import org.springframework.web.socket.handler.TextWebSocketHandler;  
  
public class ChatMessageHandler extends TextWebSocketHandler {  
  
    private static final ArrayList<WebSocketSession> users;// 這個會出現性能問題,最好用Map來存儲,key用userid  
    private static Logger logger = Logger.getLogger(ChatMessageHandler.class);  
  
    static {  
        users = new ArrayList<WebSocketSession>();  
    }  
  
    /** 
     * 連接成功時候,會觸發UI上onopen方法 
     */  
    @Override  
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {  
        System.out.println("connect to the websocket success......");  
        users.add(session);  
        // 這塊會實現自己業務,比如,當用戶登錄后,會把離線消息推送給用戶  
        // TextMessage returnMessage = new TextMessage("你將收到的離線");  
        // session.sendMessage(returnMessage);  
    }  
  
    /** 
     * 在UI在用js調用websocket.send()時候,會調用該方法 
     */  
    @Override  
    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {  
        sendMessageToUsers(message);  
        //super.handleTextMessage(session, message);  
    }  
  
    /** 
     * 給某個用戶發送消息 
     * 
     * @param userName 
     * @param message 
     */  
    public void sendMessageToUser(String userName, TextMessage message) {  
        for (WebSocketSession user : users) {  
            if (user.getAttributes().get(Constants.WEBSOCKET_USERNAME).equals(userName)) {  
                try {  
                    if (user.isOpen()) {  
                        user.sendMessage(message);  
                    }  
                } catch (IOException e) {  
                    e.printStackTrace();  
                }  
                break;  
            }  
        }  
    }  
  
    /** 
     * 給所有在線用戶發送消息 
     * 
     * @param message 
     */  
    public void sendMessageToUsers(TextMessage message) {  
        for (WebSocketSession user : users) {  
            try {  
                if (user.isOpen()) {  
                    user.sendMessage(message);  
                }  
            } catch (IOException e) {  
                e.printStackTrace();  
            }  
        }  
    }  
  
    @Override  
    public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {  
        if (session.isOpen()) {  
            session.close();  
        }  
        logger.debug("websocket connection closed......");  
        users.remove(session);  
    }  
  
    @Override  
    public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {  
        logger.debug("websocket connection closed......");  
        users.remove(session);  
    }  
  
    @Override  
    public boolean supportsPartialMessages() {  
        return false;  
    }  
  
}  
View Code

  

  spring-mvc.xml

  正常的配置文件,同時需要增加對WebSocketConfig.java類的掃描,並且增加

  

xmlns:websocket="http://www.springframework.org/schema/websocket"  
              http://www.springframework.org/schema/websocket   
              <a target="_blank" href="http://www.springframework.org/schema/websocket/spring-websocket-4.1.xsd">http://www.springframework.org/schema/websocket/spring-websocket-4.1.xsd</a>  

  客戶端: 

  

<script type="text/javascript"  
        src="http://localhost:8080/Bank/js/sockjs-0.3.min.js"></script>  
    <script>  
        var websocket;  
      
        if ('WebSocket' in window) {  
            websocket = new WebSocket("ws://" + document.location.host + "/Bank/webSocketServer");  
        } else if ('MozWebSocket' in window) {  
            websocket = new MozWebSocket("ws://" + document.location.host + "/Bank/webSocketServer");  
        } else {  
            websocket = new SockJS("http://" + document.location.host + "/Bank/sockjs/webSocketServer");  
        }  
      
        websocket.onopen = function(evnt) {};  
        websocket.onmessage = function(evnt) {  
            $("#test").html("(<font color='red'>" + evnt.data + "</font>)")  
        };  
      
        websocket.onerror = function(evnt) {};  
        websocket.onclose = function(evnt) {}  
      
        $('#btn').on('click', function() {  
            if (websocket.readyState == websocket.OPEN) {  
                var msg = $('#id').val();  
                //調用后台handleTextMessage方法  
                websocket.send(msg);  
            } else {  
                alert("連接失敗!");  
            }  
        });  
    </script>  
View Code

  注意導入socketjs時要使用地址全稱,並且連接使用的是http而不是websocket的ws。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM