1. 编写场景 : 用户点击 import 导入按钮,系统从远程库当中 接近10W条数据到本地Mysql数据库,在此过程中,要反馈前端目前进度是多少,以便显示对应进度条。
2. 思考解决方案 :
远程库导入,假设我们是用的传统的关系型数据库,那么要使用类似服务器游标来每次读取一定规格的数据(比如1次读取1000条,不可能一次读10W条,这对内存跟带宽都是很大负担),
然后使用多线程(比如fix线程来写)来写,然后在此业务代码之中反馈目前进度。
3. 这里介绍 websocket demo,整个完整实例后续补充专题:
3.1 实现方式,一般来说 ,我们有2种后台实现方式 ,一种使用tomcat的websocket实现,一种使用spring的websocket 。
3.2 tomcat的方式需要tomcat 7.x,JEE7的支持。
3.3 spring与websocket整合需要spring 4.x,并且使用了socketjs,对不支持websocket的浏览器可以模拟websocket使用
4. tomcat方式使用wbescoket
这里,我使用了springboot来搭建,而springboot默认是tomcat容器,所以我们直接使用springboot项目即可。(注意,可能浏览器难识别h5的websocket,那么使用这种方式的
时候,若浏览器版本过低,还是使用spring的基于socket.js的吧。)
4.1 引入pom 文件,添加websocket 依赖
核心是@ServerEndpoint这个注解。这个注解是Javaee标准里的注解,tomcat7以上已经对其进行了实现,如果是用传统方法使用tomcat发布项目,只要在pom文件中引入javaee标准即可使用。
引入javaee的标准是 (若是springboot可跳过此步骤)
<dependency> <groupId>javax</groupId> <artifactId>javaee-api</artifactId> <version>7.0</version> <scope>provided</scope> </dependency>
引入 websocket依赖 (使用springboot只需要引入webscoket即可,但要先引入start依赖)
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> <version>1.3.5.RELEASE</version> </dependency>
顺便说一句,springboot的高级组件会自动引用基础的组件,像spring-boot-starter-websocket就引入了spring-boot-starter-web和spring-boot-starter,所以不要重复引入。
4.2 使用@ServerEndpoint创立websocket endpoint
首先要注入ServerEndpointExporter,这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint。要注意,如果使用独立的servlet容器,而不是直接使用springboot的内置容器,就不要注入ServerEndpointExporter,因为它将由容器自己提供和管理。

package root.websocket; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.socket.server.standard.ServerEndpointExporter; /** * @Auther: pccw * @Date: 2018/10/25 10:10 * @Description: */ @Configuration public class WebSocketConfig { @Bean public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); } }
4.3 编写具体实现类
接下来就是写websocket的具体实现类,很简单,直接上代码:

package root.websocket; import org.springframework.stereotype.Component; import javax.websocket.OnClose; import javax.websocket.OnMessage; import javax.websocket.OnOpen; import javax.websocket.Session; import javax.websocket.server.ServerEndpoint; import java.io.IOException; import java.util.concurrent.CopyOnWriteArraySet; /** * @Auther: pccw * @Date: 2018/10/25 10:11 * @Description: */ @ServerEndpoint(value = "/websocket/dictSocket") @Component public class ImportDictValueSocket { //静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。 private static int onlineCount = 0; //concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。 private static CopyOnWriteArraySet<ImportDictValueSocket> webSocketSet = new CopyOnWriteArraySet<ImportDictValueSocket>(); //与某个客户端的连接会话,需要通过它来给客户端发送数据 private Session session; /** * 连接建立成功调用的方法*/ @OnOpen public void onOpen(Session session) { this.session = session; webSocketSet.add(this); //加入set中 addOnlineCount(); //在线数加1 System.out.println("有新连接加入!当前在线人数为" + getOnlineCount()); try { sendMessage("当前在线人数"+getOnlineCount()+",当前sessionID:"+this.session.getId()); } catch (IOException e) { System.out.println("IO异常"); } } /** * 连接关闭调用的方法 */ @OnClose public void onClose() { webSocketSet.remove(this); //从set中删除 subOnlineCount(); //在线数减1 System.out.println("有一连接关闭!当前在线人数为" + getOnlineCount()); } /** * 收到客户端消息后调用的方法 * * @param message 客户端发送过来的消息*/ @OnMessage public void onMessage(String message, Session session) throws IOException, InterruptedException { System.out.println("来自客户端的消息:" + message); for(int i=0;i<5;i++){ // 给当前的session返回进度 session.getBasicRemote().sendText("当前进度为:"+i*20); Thread.sleep(2000); } //群发消息 /* for (ImportDictValueSocket item : webSocketSet) { try { item.sendMessage(message); } catch (IOException e) { e.printStackTrace(); } }*/ } /** * 发生错误时调用 @OnError public void onError(Session session, Throwable error) { System.out.println("发生错误"); error.printStackTrace(); } **/ public void sendMessage(String message) throws IOException { this.session.getBasicRemote().sendText(message); //this.session.getAsyncRemote().sendText(message); } /** * 群发自定义消息 * */ public static void sendInfo(String message) throws IOException { for (ImportDictValueSocket item : webSocketSet) { try { item.sendMessage(message); } catch (IOException e) { continue; } } } public static synchronized int getOnlineCount() { return onlineCount; } public static synchronized void addOnlineCount() { ImportDictValueSocket.onlineCount++; } public static synchronized void subOnlineCount() { ImportDictValueSocket.onlineCount--; } }
使用springboot的唯一区别是要@Component声明下,而使用独立容器是由容器自己管理websocket的,但在springboot中连容器都是spring管理的。
虽然@Component默认是单例模式的,但springboot还是会为每个websocket连接初始化一个bean,所以可以用一个静态set保存起来。
4.4 前端代码 (这里我就没编写了,我就直接使用 http://www.blue-zero.com/WebSocket/ 在线测试工具完成测试)

<!DOCTYPE HTML> <html> <head> <title>My WebSocket</title> </head> <body> Welcome<br/> <input id="text" type="text" /><button onclick="send()">Send</button> <button onclick="closeWebSocket()">Close</button> <div id="message"> </div> </body> <script type="text/javascript"> var websocket = null; //判断当前浏览器是否支持WebSocket if('WebSocket' in window){ websocket = new WebSocket("ws://localhost:8084/websocket"); } else{ alert('Not support websocket') } //连接发生错误的回调方法 websocket.onerror = function(){ setMessageInnerHTML("error"); }; //连接成功建立的回调方法 websocket.onopen = function(event){ setMessageInnerHTML("open"); } //接收到消息的回调方法 websocket.onmessage = function(event){ setMessageInnerHTML(event.data); } //连接关闭的回调方法 websocket.onclose = function(){ setMessageInnerHTML("close"); } //监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常。 window.onbeforeunload = function(){ websocket.close(); } //将消息显示在网页上 function setMessageInnerHTML(innerHTML){ document.getElementById('message').innerHTML += innerHTML + '<br/>'; } //关闭连接 function closeWebSocket(){ websocket.close(); } //发送消息 function send(){ var message = document.getElementById('text').value; websocket.send(message); } </script> </html>
代码摘抄自 : https://blog.csdn.net/snakeMoving/article/details/78992838
4.5 总结 :
springboot已经做了深度的集成和优化,要注意是否添加了不需要的依赖、配置或声明。由于很多讲解组件使用的文章是和spring集成的,会有一些配置,在使用springboot时,由于springboot已经有了自己的配置,再这些配置有可能导致各种各样的异常。
4.6 测试结果 :
5. 如何使用spring的方式来实现
这种方式实现我没有具体的手写代码,摘抄自 : https://www.cnblogs.com/interdrp/p/7903736.html
5.1 整合spring
5.2 WebSocketConfig.java
这个类是配置类,所以需要在spring mvc配置文件中加入对这个类的扫描,第一个addHandler是对正常连接的配置,第二个是如果浏览器不支持websocket,使用socketjs模拟websocket的连接。

package com.websocket; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.socket.config.annotation.EnableWebSocket; import org.springframework.web.socket.config.annotation.WebSocketConfigurer; import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry; import org.springframework.web.socket.handler.TextWebSocketHandler; @Configuration @EnableWebSocket public class WebSocketConfig implements WebSocketConfigurer { @Override public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { registry.addHandler(chatMessageHandler(),"/webSocketServer").addInterceptors(new ChatHandshakeInterceptor()); registry.addHandler(chatMessageHandler(), "/sockjs/webSocketServer").addInterceptors(new ChatHandshakeInterceptor()).withSockJS(); } @Bean public TextWebSocketHandler chatMessageHandler(){ return new ChatMessageHandler(); } }
ChatHandshakeInterceptor.java
这个类的作用就是在连接成功前和成功后增加一些额外的功能,Constants.java类是一个工具类,两个常量。

package com.websocket; import java.util.Map; import org.apache.shiro.SecurityUtils; import org.springframework.http.server.ServerHttpRequest; import org.springframework.http.server.ServerHttpResponse; import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor; public class ChatHandshakeInterceptor extends HttpSessionHandshakeInterceptor { @Override public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception { System.out.println("Before Handshake"); /* * if (request instanceof ServletServerHttpRequest) { * ServletServerHttpRequest servletRequest = (ServletServerHttpRequest) * request; HttpSession session = * servletRequest.getServletRequest().getSession(false); if (session != * null) { //使用userName区分WebSocketHandler,以便定向发送消息 String userName = * (String) session.getAttribute(Constants.SESSION_USERNAME); if * (userName==null) { userName="default-system"; } * attributes.put(Constants.WEBSOCKET_USERNAME,userName); * * } } */ //使用userName区分WebSocketHandler,以便定向发送消息(使用shiro获取session,或是使用上面的方式) String userName = (String) SecurityUtils.getSubject().getSession().getAttribute(Constants.SESSION_USERNAME); if (userName == null) { userName = "default-system"; } attributes.put(Constants.WEBSOCKET_USERNAME, userName); return super.beforeHandshake(request, response, wsHandler, attributes); } @Override public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception ex) { System.out.println("After Handshake"); super.afterHandshake(request, response, wsHandler, ex); } }
ChatMessageHandler.java
这个类是对消息的一些处理,比如是发给一个人,还是发给所有人,并且前端连接时触发的一些动作

package com.websocket; import java.io.IOException; import java.util.ArrayList; import org.apache.log4j.Logger; import org.springframework.web.socket.CloseStatus; import org.springframework.web.socket.TextMessage; import org.springframework.web.socket.WebSocketSession; import org.springframework.web.socket.handler.TextWebSocketHandler; public class ChatMessageHandler extends TextWebSocketHandler { private static final ArrayList<WebSocketSession> users;// 这个会出现性能问题,最好用Map来存储,key用userid private static Logger logger = Logger.getLogger(ChatMessageHandler.class); static { users = new ArrayList<WebSocketSession>(); } /** * 连接成功时候,会触发UI上onopen方法 */ @Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { System.out.println("connect to the websocket success......"); users.add(session); // 这块会实现自己业务,比如,当用户登录后,会把离线消息推送给用户 // TextMessage returnMessage = new TextMessage("你将收到的离线"); // session.sendMessage(returnMessage); } /** * 在UI在用js调用websocket.send()时候,会调用该方法 */ @Override protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { sendMessageToUsers(message); //super.handleTextMessage(session, message); } /** * 给某个用户发送消息 * * @param userName * @param message */ public void sendMessageToUser(String userName, TextMessage message) { for (WebSocketSession user : users) { if (user.getAttributes().get(Constants.WEBSOCKET_USERNAME).equals(userName)) { try { if (user.isOpen()) { user.sendMessage(message); } } catch (IOException e) { e.printStackTrace(); } break; } } } /** * 给所有在线用户发送消息 * * @param message */ public void sendMessageToUsers(TextMessage message) { for (WebSocketSession user : users) { try { if (user.isOpen()) { user.sendMessage(message); } } catch (IOException e) { e.printStackTrace(); } } } @Override public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { if (session.isOpen()) { session.close(); } logger.debug("websocket connection closed......"); users.remove(session); } @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception { logger.debug("websocket connection closed......"); users.remove(session); } @Override public boolean supportsPartialMessages() { return false; } }
spring-mvc.xml
正常的配置文件,同时需要增加对WebSocketConfig.java类的扫描,并且增加
xmlns:websocket="http://www.springframework.org/schema/websocket" http://www.springframework.org/schema/websocket <a target="_blank" href="http://www.springframework.org/schema/websocket/spring-websocket-4.1.xsd">http://www.springframework.org/schema/websocket/spring-websocket-4.1.xsd</a>
客户端:

<script type="text/javascript" src="http://localhost:8080/Bank/js/sockjs-0.3.min.js"></script> <script> var websocket; if ('WebSocket' in window) { websocket = new WebSocket("ws://" + document.location.host + "/Bank/webSocketServer"); } else if ('MozWebSocket' in window) { websocket = new MozWebSocket("ws://" + document.location.host + "/Bank/webSocketServer"); } else { websocket = new SockJS("http://" + document.location.host + "/Bank/sockjs/webSocketServer"); } websocket.onopen = function(evnt) {}; websocket.onmessage = function(evnt) { $("#test").html("(<font color='red'>" + evnt.data + "</font>)") }; websocket.onerror = function(evnt) {}; websocket.onclose = function(evnt) {} $('#btn').on('click', function() { if (websocket.readyState == websocket.OPEN) { var msg = $('#id').val(); //调用后台handleTextMessage方法 websocket.send(msg); } else { alert("连接失败!"); } }); </script>
注意导入socketjs时要使用地址全称,并且连接使用的是http而不是websocket的ws。