1. 项目背景 : 前几天以及最近的博客,都在完成一个接口,前端点击“导入”按钮,后台完成大数据量导入,并且在导入的过程中持续告知导入情况;
2. 思考分析 : 大数据量导入,假设是10W条,那么如果是使用关系型数据库则可以使用jdbc的fetch游标移动完成工作,并且在插入的时候可以使用多线程加快速度,
在最开始的时候查询出总数据量,除以我们规定的5000次导入一次就得到了子线程应当执行的次数,并返回给前端,之后子线程开始执行,每执行一次,返回一个
状态码给前端,则前端依据此2个环节可推断进度,并能显示对应的进度条效果。
3. 代码实践 :
3.1 pom 加入 websokcet依赖
<!-- 引入websokcet --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> <version>1.3.5.RELEASE</version> </dependency>
3.2 springboot 注入 websocket配置类

package root.websocket; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.socket.server.standard.ServerEndpointExporter; /** * @Date: 2018/10/25 10:10 * @Description: 注入 websocekt配置类 */ @Configuration public class WebSocketConfig { @Bean public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); } }
3.3 springboot 加入自己要对外的websocket类

package root.websocket; import org.apache.ibatis.session.SqlSession; import org.apache.log4j.Logger; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationContext; import org.springframework.stereotype.Component; import root.report.control.dict.TestTwo; import root.report.db.DbFactory; import root.report.service.DictService; import javax.websocket.OnClose; import javax.websocket.OnMessage; import javax.websocket.OnOpen; import javax.websocket.Session; import javax.websocket.server.ServerEndpoint; import java.io.IOException; import java.util.concurrent.CopyOnWriteArraySet; /** * @Date: 2018/10/25 10:11 * @Description: 注入dictSocket接口 */ @ServerEndpoint(value = "/websocket/dictSocket") @Component public class ImportDictValueSocket { private static Logger log = Logger.getLogger(ImportDictValueSocket.class); private DictService dictService; //此处是解决无法注入的关键 private static ApplicationContext applicationContext; public static void setApplicationContext(ApplicationContext applicationContext) { ImportDictValueSocket.applicationContext = applicationContext; } //静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。 private static int onlineCount = 0; //concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。 private static CopyOnWriteArraySet<ImportDictValueSocket> webSocketSet = new CopyOnWriteArraySet<ImportDictValueSocket>(); //与某个客户端的连接会话,需要通过它来给客户端发送数据 private Session session; /** * 连接建立成功调用的方法*/ @OnOpen public void onOpen(Session session) { this.session = session; webSocketSet.add(this); //加入set中 addOnlineCount(); //在线数加1 /*System.out.println("有新连接加入!当前在线人数为" + getOnlineCount()); try { sendMessage("当前在线人数"+getOnlineCount()+",当前sessionID:"+this.session.getId()); } catch (IOException e) { System.out.println("IO异常"); }*/ } /** * 连接关闭调用的方法 */ @OnClose public void onClose() { webSocketSet.remove(this); //从set中删除 subOnlineCount(); //在线数减1 log.info("有一连接关闭!当前在线人数为" + getOnlineCount()); } /** * 收到客户端消息后调用的方法 * * @param message 客户端发送过来的消息*/ @OnMessage public void onMessage(String message, Session session) throws IOException, InterruptedException { log.info("来自客户端的消息:" + message); // 调用 TestTwo的 导入方法,一旦连接成功 , 则 持续通信 try { dictService = applicationContext.getBean(DictService.class); String result = dictService.importFuncDictValueByDictId(session,Integer.parseInt(message)); // 需要加锁与否? synchronized (result){ log.info("结果为"+result); sendMessage(result); // 返回 "over" session.close(); // 关闭掉 } }catch (Exception e){ log.error(e.getMessage()); sendMessage(e.getMessage()); } //群发消息 /* for (ImportDictValueSocket item : webSocketSet) { try { item.sendMessage(message); } catch (IOException e) { e.printStackTrace(); } }*/ } /** * 发生错误时调用 @OnError public void onError(Session session, Throwable error) { System.out.println("发生错误"); error.printStackTrace(); } **/ public void sendMessage(String message) throws IOException { this.session.getBasicRemote().sendText(message); } /** * 群发自定义消息 * */ public static void sendInfo(String message) throws IOException { for (ImportDictValueSocket item : webSocketSet) { try { item.sendMessage(message); } catch (IOException e) { continue; } } } public static synchronized int getOnlineCount() { return onlineCount; } public static synchronized void addOnlineCount() { ImportDictValueSocket.onlineCount++; } public static synchronized void subOnlineCount() { ImportDictValueSocket.onlineCount--; } }
3.3-1 注意到 : 我们在 websocket 当中 无法正常引入 @Service等 业务class,这些必须手动在 websocket生成,或者我们从 容器当中读到,所以我们要在启动类当中做如下处理:
public static void main(String[] args) { ConfigurableApplicationContext configurableApplicationContext = SpringApplication.run(ReportServerApplication.class, args); // 解决websocket 当中无法注入bean的方法 ImportDictValueSocket.setApplicationContext(configurableApplicationContext); }
3.3-2 业务类代码

package root.report.service; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.serializer.SerializerFeature; import org.apache.commons.lang3.StringUtils; import org.apache.ibatis.session.SqlSession; import org.apache.log4j.Logger; import org.dom4j.*; import org.dom4j.io.OutputFormat; import org.dom4j.io.XMLWriter; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import org.xml.sax.SAXException; import root.configure.AppConstants; import root.configure.MybatisCacheConfiguration; import root.report.db.DbFactory; import root.report.util.JsonUtil; import root.report.util.XmlUtil; import javax.websocket.Session; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.math.BigDecimal; import java.sql.*; import java.util.*; import java.util.Date; import java.util.concurrent.*; import java.util.function.Consumer; @Service public class DictService { // 测试使用 fetch 按照指定规格读取数据 , 源表为 test_dict 目标表名为 test_import public void fetchBySql(SqlSession sqlSession,String sourceSql,Consumer<ResultSet> consumer){ // 通过 conn 得到 真正的连接(要解析 数据库名,从而真正的 数据库连接通道) // 而在我们这里则不需要这么复杂,前面过程都由DbFactory 完成了,传递进来的sqlSession 就是一个 连接好的通道 Connection conn = sqlSession.getConnection(); PreparedStatement stm = null; // 开始时间 Long begin = new Date().getTime(); String sql = sourceSql; try { final int countRow = 1000; stm = conn.prepareStatement(sql,ResultSet.TYPE_FORWARD_ONLY,ResultSet.CONCUR_READ_ONLY); // stm.setFetchSize(Integer.MIN_VALUE); // 设置游标 1000 每次读取一千行数据 stm.setFetchSize(countRow); stm.setFetchDirection(ResultSet.FETCH_REVERSE); // ResultSet rs = stm.executeQuery(); // rs.setFetchSize(Integer.MIN_VALUE); consumer.accept(rs); // 把 当前的 rs 传递给消费者 }catch (Exception e){ e.printStackTrace(); } } public String importFuncDictValueByDictId(Session session, int dict_id){ // 先决条件 : 根据 dict_id 得到 sourceSql SqlSession sqlSession = DbFactory.Open(DbFactory.FORM); String dbName = sqlSession.selectOne("dict.getDictDbByDictId",dict_id); if(StringUtils.isBlank(dbName)) return("此DictId所对应的数据库为空,无法操作!"); final int countRow = 1000; final List<Map<String, Object>> list = new ArrayList<>(); // 初始化对应的数据库 SqlSession sourceSqlSession = DbFactory.Open(dbName); if(sourceSqlSession==null) return("数据库无法连接,无法操作!"); String sourceSql = null; try { sourceSql = this.getSqlTemplate("数据字典",String.valueOf(dict_id),false); } catch (Exception e){ log.info(e.getMessage()); return "无法从xml文件得到对应的sql,无法查询!"; } Long begin = new Date().getTime(); Connection conn = sqlSession.getConnection(); final int poolSize = 5; ExecutorService fixedThreadPool = Executors.newFixedThreadPool(poolSize); fetchBySql(sourceSqlSession,sourceSql,rs -> { try { String prefix = "INSERT INTO func_dict_value (dict_id,value_code,value_name) VALUES "; final StringBuffer suffix = new StringBuffer(); // 设置事务为非自动提交 conn.setAutoCommit(false); // 非提交能减少日志的生成,从而加快执行速度 PreparedStatement pst = (PreparedStatement) conn.prepareStatement(""); List<Map> mapList = new ArrayList<>(); if (rs != null) { rs.last(); // 移动到最后面 BigDecimal result = new BigDecimal((double) rs.getRow()/5000).setScale(0, BigDecimal.ROUND_UP); int countSize = result.intValue(); try { session.getBasicRemote().sendText(String.valueOf(countSize)); // 第一次直接发送给websocket 客户端1个 本次执行总数统计. } catch (IOException e) { e.printStackTrace(); } rs.first(); // 移动到最前面 // 坑处1,如果使用了firt的话,那么直接达到了第一条,则不要使用 while(rs.next()) if(rs!=null && StringUtils.isNotBlank(rs.getInt("code")+"")){ Map<String, Object> map = new HashMap<>(); map.put("code",rs.getInt("code")); map.put("name",rs.getString("name")); mapList.add(map); } while (rs.next()) { Map<String, Object> map = new HashMap<>(); map.put("code",rs.getInt("code")); map.put("name",rs.getString("name")); mapList.add(map); if(rs.getRow()%5000==0){ // 执行批量插入操作 List<Map> finalMapList = mapList; final StringBuffer sb = new StringBuffer(); Future<String> stringFuture = fixedThreadPool.submit( new Callable<String>() { @Override public String call() throws Exception { String name = Thread.currentThread().getName(); long threadId = Thread.currentThread().getId(); log.info("thread name: "+name+",id为"+threadId+"执行了一次"); for (Map tempMap : finalMapList) { // 构建SQL后缀 sb.append("(" +dict_id+",'"+ tempMap.get("code") + "'," + "'" + tempMap.get("name") + "'),"); } return "success"; } } ); mapList = new ArrayList<>(); try { String s = stringFuture.get(); if ("success".equals(s)) { String sql = prefix + sb.substring(0, sb.length() - 1); // 构建完整SQL pst.addBatch(sql); // 添加执行SQL pst.executeBatch(); // 执行操作 session.getBasicRemote().sendText("success"); } } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } } // 执行完之后 mapList 是个 不足5000个的,这个时候我们再去执行一次 添加操作 List<Map> finalMapList = mapList; System.out.println("mapList大小为:"+mapList.size()); System.out.println("finalMapList:"+finalMapList.size()); final StringBuffer sb = new StringBuffer(); Future<String> stringFuture = fixedThreadPool.submit( new Callable<String>() { @Override public String call() throws Exception { String name = Thread.currentThread().getName(); long threadId = Thread.currentThread().getId(); log.info("thread name: "+name+"id为"+threadId+"执行了一次"); for (Map tempMap : finalMapList) { // 构建SQL后缀 sb.append("(" +dict_id+",'"+ tempMap.get("code") + "'," + "'" + tempMap.get("name") + "'),"); } return "success"; } } ); try { String s = stringFuture.get(); if ("success".equals(s)) { String sql = prefix + sb.substring(0, sb.length() - 1); // 构建完整SQL pst.addBatch(sql); // 添加执行SQL pst.executeBatch(); // 执行操作 session.getBasicRemote().sendText("success"); } } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } conn.commit(); pst.close(); conn.close(); } Long end = new Date().getTime(); log.info("条数据从远程库导入到本地花费时间 : " + (end - begin) + " ms"); } catch (SQLException e) { // e.printStackTrace(); // 批量执行遇到异常直接 用log打印,不要中断 log.info(e.getMessage()); }finally { fixedThreadPool.shutdown(); // 一定要shutdown 否则线程只是被回收到了线程池 } }); return "over"; } }
3.4 使用在线工具 websocket测试
http://www.blue-zero.com/WebSocket/
测试效果截图 :
数据库效果截图 :