1. 編寫場景 : 用戶點擊 import 導入按鈕,系統從遠程庫當中 接近10W條數據到本地Mysql數據庫,在此過程中,要反饋前端目前進度是多少,以便顯示對應進度條。
2. 思考解決方案 :
遠程庫導入,假設我們是用的傳統的關系型數據庫,那么要使用類似服務器游標來每次讀取一定規格的數據(比如1次讀取1000條,不可能一次讀10W條,這對內存跟帶寬都是很大負擔),
然后使用多線程(比如fix線程來寫)來寫,然后在此業務代碼之中反饋目前進度。
3. 這里介紹 websocket demo,整個完整實例后續補充專題:
3.1 實現方式,一般來說 ,我們有2種后台實現方式 ,一種使用tomcat的websocket實現,一種使用spring的websocket 。
3.2 tomcat的方式需要tomcat 7.x,JEE7的支持。
3.3 spring與websocket整合需要spring 4.x,並且使用了socketjs,對不支持websocket的瀏覽器可以模擬websocket使用
4. tomcat方式使用wbescoket
這里,我使用了springboot來搭建,而springboot默認是tomcat容器,所以我們直接使用springboot項目即可。(注意,可能瀏覽器難識別h5的websocket,那么使用這種方式的
時候,若瀏覽器版本過低,還是使用spring的基於socket.js的吧。)
4.1 引入pom 文件,添加websocket 依賴
核心是@ServerEndpoint這個注解。這個注解是Javaee標准里的注解,tomcat7以上已經對其進行了實現,如果是用傳統方法使用tomcat發布項目,只要在pom文件中引入javaee標准即可使用。
引入javaee的標准是 (若是springboot可跳過此步驟)
<dependency> <groupId>javax</groupId> <artifactId>javaee-api</artifactId> <version>7.0</version> <scope>provided</scope> </dependency>
引入 websocket依賴 (使用springboot只需要引入webscoket即可,但要先引入start依賴)
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> <version>1.3.5.RELEASE</version> </dependency>
順便說一句,springboot的高級組件會自動引用基礎的組件,像spring-boot-starter-websocket就引入了spring-boot-starter-web和spring-boot-starter,所以不要重復引入。
4.2 使用@ServerEndpoint創立websocket endpoint
首先要注入ServerEndpointExporter,這個bean會自動注冊使用了@ServerEndpoint注解聲明的Websocket endpoint。要注意,如果使用獨立的servlet容器,而不是直接使用springboot的內置容器,就不要注入ServerEndpointExporter,因為它將由容器自己提供和管理。

package root.websocket; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.socket.server.standard.ServerEndpointExporter; /** * @Auther: pccw * @Date: 2018/10/25 10:10 * @Description: */ @Configuration public class WebSocketConfig { @Bean public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); } }
4.3 編寫具體實現類
接下來就是寫websocket的具體實現類,很簡單,直接上代碼:

package root.websocket; import org.springframework.stereotype.Component; import javax.websocket.OnClose; import javax.websocket.OnMessage; import javax.websocket.OnOpen; import javax.websocket.Session; import javax.websocket.server.ServerEndpoint; import java.io.IOException; import java.util.concurrent.CopyOnWriteArraySet; /** * @Auther: pccw * @Date: 2018/10/25 10:11 * @Description: */ @ServerEndpoint(value = "/websocket/dictSocket") @Component public class ImportDictValueSocket { //靜態變量,用來記錄當前在線連接數。應該把它設計成線程安全的。 private static int onlineCount = 0; //concurrent包的線程安全Set,用來存放每個客戶端對應的MyWebSocket對象。 private static CopyOnWriteArraySet<ImportDictValueSocket> webSocketSet = new CopyOnWriteArraySet<ImportDictValueSocket>(); //與某個客戶端的連接會話,需要通過它來給客戶端發送數據 private Session session; /** * 連接建立成功調用的方法*/ @OnOpen public void onOpen(Session session) { this.session = session; webSocketSet.add(this); //加入set中 addOnlineCount(); //在線數加1 System.out.println("有新連接加入!當前在線人數為" + getOnlineCount()); try { sendMessage("當前在線人數"+getOnlineCount()+",當前sessionID:"+this.session.getId()); } catch (IOException e) { System.out.println("IO異常"); } } /** * 連接關閉調用的方法 */ @OnClose public void onClose() { webSocketSet.remove(this); //從set中刪除 subOnlineCount(); //在線數減1 System.out.println("有一連接關閉!當前在線人數為" + getOnlineCount()); } /** * 收到客戶端消息后調用的方法 * * @param message 客戶端發送過來的消息*/ @OnMessage public void onMessage(String message, Session session) throws IOException, InterruptedException { System.out.println("來自客戶端的消息:" + message); for(int i=0;i<5;i++){ // 給當前的session返回進度 session.getBasicRemote().sendText("當前進度為:"+i*20); Thread.sleep(2000); } //群發消息 /* for (ImportDictValueSocket item : webSocketSet) { try { item.sendMessage(message); } catch (IOException e) { e.printStackTrace(); } }*/ } /** * 發生錯誤時調用 @OnError public void onError(Session session, Throwable error) { System.out.println("發生錯誤"); error.printStackTrace(); } **/ public void sendMessage(String message) throws IOException { this.session.getBasicRemote().sendText(message); //this.session.getAsyncRemote().sendText(message); } /** * 群發自定義消息 * */ public static void sendInfo(String message) throws IOException { for (ImportDictValueSocket item : webSocketSet) { try { item.sendMessage(message); } catch (IOException e) { continue; } } } public static synchronized int getOnlineCount() { return onlineCount; } public static synchronized void addOnlineCount() { ImportDictValueSocket.onlineCount++; } public static synchronized void subOnlineCount() { ImportDictValueSocket.onlineCount--; } }
使用springboot的唯一區別是要@Component聲明下,而使用獨立容器是由容器自己管理websocket的,但在springboot中連容器都是spring管理的。
雖然@Component默認是單例模式的,但springboot還是會為每個websocket連接初始化一個bean,所以可以用一個靜態set保存起來。
4.4 前端代碼 (這里我就沒編寫了,我就直接使用 http://www.blue-zero.com/WebSocket/ 在線測試工具完成測試)

<!DOCTYPE HTML> <html> <head> <title>My WebSocket</title> </head> <body> Welcome<br/> <input id="text" type="text" /><button onclick="send()">Send</button> <button onclick="closeWebSocket()">Close</button> <div id="message"> </div> </body> <script type="text/javascript"> var websocket = null; //判斷當前瀏覽器是否支持WebSocket if('WebSocket' in window){ websocket = new WebSocket("ws://localhost:8084/websocket"); } else{ alert('Not support websocket') } //連接發生錯誤的回調方法 websocket.onerror = function(){ setMessageInnerHTML("error"); }; //連接成功建立的回調方法 websocket.onopen = function(event){ setMessageInnerHTML("open"); } //接收到消息的回調方法 websocket.onmessage = function(event){ setMessageInnerHTML(event.data); } //連接關閉的回調方法 websocket.onclose = function(){ setMessageInnerHTML("close"); } //監聽窗口關閉事件,當窗口關閉時,主動去關閉websocket連接,防止連接還沒斷開就關閉窗口,server端會拋異常。 window.onbeforeunload = function(){ websocket.close(); } //將消息顯示在網頁上 function setMessageInnerHTML(innerHTML){ document.getElementById('message').innerHTML += innerHTML + '<br/>'; } //關閉連接 function closeWebSocket(){ websocket.close(); } //發送消息 function send(){ var message = document.getElementById('text').value; websocket.send(message); } </script> </html>
代碼摘抄自 : https://blog.csdn.net/snakeMoving/article/details/78992838
4.5 總結 :
springboot已經做了深度的集成和優化,要注意是否添加了不需要的依賴、配置或聲明。由於很多講解組件使用的文章是和spring集成的,會有一些配置,在使用springboot時,由於springboot已經有了自己的配置,再這些配置有可能導致各種各樣的異常。
4.6 測試結果 :
5. 如何使用spring的方式來實現
這種方式實現我沒有具體的手寫代碼,摘抄自 : https://www.cnblogs.com/interdrp/p/7903736.html
5.1 整合spring
5.2 WebSocketConfig.java
這個類是配置類,所以需要在spring mvc配置文件中加入對這個類的掃描,第一個addHandler是對正常連接的配置,第二個是如果瀏覽器不支持websocket,使用socketjs模擬websocket的連接。

package com.websocket; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.socket.config.annotation.EnableWebSocket; import org.springframework.web.socket.config.annotation.WebSocketConfigurer; import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry; import org.springframework.web.socket.handler.TextWebSocketHandler; @Configuration @EnableWebSocket public class WebSocketConfig implements WebSocketConfigurer { @Override public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { registry.addHandler(chatMessageHandler(),"/webSocketServer").addInterceptors(new ChatHandshakeInterceptor()); registry.addHandler(chatMessageHandler(), "/sockjs/webSocketServer").addInterceptors(new ChatHandshakeInterceptor()).withSockJS(); } @Bean public TextWebSocketHandler chatMessageHandler(){ return new ChatMessageHandler(); } }
ChatHandshakeInterceptor.java
這個類的作用就是在連接成功前和成功后增加一些額外的功能,Constants.java類是一個工具類,兩個常量。

package com.websocket; import java.util.Map; import org.apache.shiro.SecurityUtils; import org.springframework.http.server.ServerHttpRequest; import org.springframework.http.server.ServerHttpResponse; import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor; public class ChatHandshakeInterceptor extends HttpSessionHandshakeInterceptor { @Override public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception { System.out.println("Before Handshake"); /* * if (request instanceof ServletServerHttpRequest) { * ServletServerHttpRequest servletRequest = (ServletServerHttpRequest) * request; HttpSession session = * servletRequest.getServletRequest().getSession(false); if (session != * null) { //使用userName區分WebSocketHandler,以便定向發送消息 String userName = * (String) session.getAttribute(Constants.SESSION_USERNAME); if * (userName==null) { userName="default-system"; } * attributes.put(Constants.WEBSOCKET_USERNAME,userName); * * } } */ //使用userName區分WebSocketHandler,以便定向發送消息(使用shiro獲取session,或是使用上面的方式) String userName = (String) SecurityUtils.getSubject().getSession().getAttribute(Constants.SESSION_USERNAME); if (userName == null) { userName = "default-system"; } attributes.put(Constants.WEBSOCKET_USERNAME, userName); return super.beforeHandshake(request, response, wsHandler, attributes); } @Override public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception ex) { System.out.println("After Handshake"); super.afterHandshake(request, response, wsHandler, ex); } }
ChatMessageHandler.java
這個類是對消息的一些處理,比如是發給一個人,還是發給所有人,並且前端連接時觸發的一些動作

package com.websocket; import java.io.IOException; import java.util.ArrayList; import org.apache.log4j.Logger; import org.springframework.web.socket.CloseStatus; import org.springframework.web.socket.TextMessage; import org.springframework.web.socket.WebSocketSession; import org.springframework.web.socket.handler.TextWebSocketHandler; public class ChatMessageHandler extends TextWebSocketHandler { private static final ArrayList<WebSocketSession> users;// 這個會出現性能問題,最好用Map來存儲,key用userid private static Logger logger = Logger.getLogger(ChatMessageHandler.class); static { users = new ArrayList<WebSocketSession>(); } /** * 連接成功時候,會觸發UI上onopen方法 */ @Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { System.out.println("connect to the websocket success......"); users.add(session); // 這塊會實現自己業務,比如,當用戶登錄后,會把離線消息推送給用戶 // TextMessage returnMessage = new TextMessage("你將收到的離線"); // session.sendMessage(returnMessage); } /** * 在UI在用js調用websocket.send()時候,會調用該方法 */ @Override protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { sendMessageToUsers(message); //super.handleTextMessage(session, message); } /** * 給某個用戶發送消息 * * @param userName * @param message */ public void sendMessageToUser(String userName, TextMessage message) { for (WebSocketSession user : users) { if (user.getAttributes().get(Constants.WEBSOCKET_USERNAME).equals(userName)) { try { if (user.isOpen()) { user.sendMessage(message); } } catch (IOException e) { e.printStackTrace(); } break; } } } /** * 給所有在線用戶發送消息 * * @param message */ public void sendMessageToUsers(TextMessage message) { for (WebSocketSession user : users) { try { if (user.isOpen()) { user.sendMessage(message); } } catch (IOException e) { e.printStackTrace(); } } } @Override public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { if (session.isOpen()) { session.close(); } logger.debug("websocket connection closed......"); users.remove(session); } @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception { logger.debug("websocket connection closed......"); users.remove(session); } @Override public boolean supportsPartialMessages() { return false; } }
spring-mvc.xml
正常的配置文件,同時需要增加對WebSocketConfig.java類的掃描,並且增加
xmlns:websocket="http://www.springframework.org/schema/websocket" http://www.springframework.org/schema/websocket <a target="_blank" href="http://www.springframework.org/schema/websocket/spring-websocket-4.1.xsd">http://www.springframework.org/schema/websocket/spring-websocket-4.1.xsd</a>
客戶端:

<script type="text/javascript" src="http://localhost:8080/Bank/js/sockjs-0.3.min.js"></script> <script> var websocket; if ('WebSocket' in window) { websocket = new WebSocket("ws://" + document.location.host + "/Bank/webSocketServer"); } else if ('MozWebSocket' in window) { websocket = new MozWebSocket("ws://" + document.location.host + "/Bank/webSocketServer"); } else { websocket = new SockJS("http://" + document.location.host + "/Bank/sockjs/webSocketServer"); } websocket.onopen = function(evnt) {}; websocket.onmessage = function(evnt) { $("#test").html("(<font color='red'>" + evnt.data + "</font>)") }; websocket.onerror = function(evnt) {}; websocket.onclose = function(evnt) {} $('#btn').on('click', function() { if (websocket.readyState == websocket.OPEN) { var msg = $('#id').val(); //調用后台handleTextMessage方法 websocket.send(msg); } else { alert("連接失敗!"); } }); </script>
注意導入socketjs時要使用地址全稱,並且連接使用的是http而不是websocket的ws。