1. 項目背景 : 前幾天以及最近的博客,都在完成一個接口,前端點擊“導入”按鈕,后台完成大數據量導入,並且在導入的過程中持續告知導入情況;
2. 思考分析 : 大數據量導入,假設是10W條,那么如果是使用關系型數據庫則可以使用jdbc的fetch游標移動完成工作,並且在插入的時候可以使用多線程加快速度,
在最開始的時候查詢出總數據量,除以我們規定的5000次導入一次就得到了子線程應當執行的次數,並返回給前端,之后子線程開始執行,每執行一次,返回一個
狀態碼給前端,則前端依據此2個環節可推斷進度,並能顯示對應的進度條效果。
3. 代碼實踐 :
3.1 pom 加入 websokcet依賴
<!-- 引入websokcet --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> <version>1.3.5.RELEASE</version> </dependency>
3.2 springboot 注入 websocket配置類

package root.websocket; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.socket.server.standard.ServerEndpointExporter; /** * @Date: 2018/10/25 10:10 * @Description: 注入 websocekt配置類 */ @Configuration public class WebSocketConfig { @Bean public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); } }
3.3 springboot 加入自己要對外的websocket類

package root.websocket; import org.apache.ibatis.session.SqlSession; import org.apache.log4j.Logger; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationContext; import org.springframework.stereotype.Component; import root.report.control.dict.TestTwo; import root.report.db.DbFactory; import root.report.service.DictService; import javax.websocket.OnClose; import javax.websocket.OnMessage; import javax.websocket.OnOpen; import javax.websocket.Session; import javax.websocket.server.ServerEndpoint; import java.io.IOException; import java.util.concurrent.CopyOnWriteArraySet; /** * @Date: 2018/10/25 10:11 * @Description: 注入dictSocket接口 */ @ServerEndpoint(value = "/websocket/dictSocket") @Component public class ImportDictValueSocket { private static Logger log = Logger.getLogger(ImportDictValueSocket.class); private DictService dictService; //此處是解決無法注入的關鍵 private static ApplicationContext applicationContext; public static void setApplicationContext(ApplicationContext applicationContext) { ImportDictValueSocket.applicationContext = applicationContext; } //靜態變量,用來記錄當前在線連接數。應該把它設計成線程安全的。 private static int onlineCount = 0; //concurrent包的線程安全Set,用來存放每個客戶端對應的MyWebSocket對象。 private static CopyOnWriteArraySet<ImportDictValueSocket> webSocketSet = new CopyOnWriteArraySet<ImportDictValueSocket>(); //與某個客戶端的連接會話,需要通過它來給客戶端發送數據 private Session session; /** * 連接建立成功調用的方法*/ @OnOpen public void onOpen(Session session) { this.session = session; webSocketSet.add(this); //加入set中 addOnlineCount(); //在線數加1 /*System.out.println("有新連接加入!當前在線人數為" + getOnlineCount()); try { sendMessage("當前在線人數"+getOnlineCount()+",當前sessionID:"+this.session.getId()); } catch (IOException e) { System.out.println("IO異常"); }*/ } /** * 連接關閉調用的方法 */ @OnClose public void onClose() { webSocketSet.remove(this); //從set中刪除 subOnlineCount(); //在線數減1 log.info("有一連接關閉!當前在線人數為" + getOnlineCount()); } /** * 收到客戶端消息后調用的方法 * * @param message 客戶端發送過來的消息*/ @OnMessage public void onMessage(String message, Session session) throws IOException, InterruptedException { log.info("來自客戶端的消息:" + message); // 調用 TestTwo的 導入方法,一旦連接成功 , 則 持續通信 try { dictService = applicationContext.getBean(DictService.class); String result = dictService.importFuncDictValueByDictId(session,Integer.parseInt(message)); // 需要加鎖與否? synchronized (result){ log.info("結果為"+result); sendMessage(result); // 返回 "over" session.close(); // 關閉掉 } }catch (Exception e){ log.error(e.getMessage()); sendMessage(e.getMessage()); } //群發消息 /* for (ImportDictValueSocket item : webSocketSet) { try { item.sendMessage(message); } catch (IOException e) { e.printStackTrace(); } }*/ } /** * 發生錯誤時調用 @OnError public void onError(Session session, Throwable error) { System.out.println("發生錯誤"); error.printStackTrace(); } **/ public void sendMessage(String message) throws IOException { this.session.getBasicRemote().sendText(message); } /** * 群發自定義消息 * */ public static void sendInfo(String message) throws IOException { for (ImportDictValueSocket item : webSocketSet) { try { item.sendMessage(message); } catch (IOException e) { continue; } } } public static synchronized int getOnlineCount() { return onlineCount; } public static synchronized void addOnlineCount() { ImportDictValueSocket.onlineCount++; } public static synchronized void subOnlineCount() { ImportDictValueSocket.onlineCount--; } }
3.3-1 注意到 : 我們在 websocket 當中 無法正常引入 @Service等 業務class,這些必須手動在 websocket生成,或者我們從 容器當中讀到,所以我們要在啟動類當中做如下處理:
public static void main(String[] args) { ConfigurableApplicationContext configurableApplicationContext = SpringApplication.run(ReportServerApplication.class, args); // 解決websocket 當中無法注入bean的方法 ImportDictValueSocket.setApplicationContext(configurableApplicationContext); }
3.3-2 業務類代碼

package root.report.service; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.serializer.SerializerFeature; import org.apache.commons.lang3.StringUtils; import org.apache.ibatis.session.SqlSession; import org.apache.log4j.Logger; import org.dom4j.*; import org.dom4j.io.OutputFormat; import org.dom4j.io.XMLWriter; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import org.xml.sax.SAXException; import root.configure.AppConstants; import root.configure.MybatisCacheConfiguration; import root.report.db.DbFactory; import root.report.util.JsonUtil; import root.report.util.XmlUtil; import javax.websocket.Session; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.math.BigDecimal; import java.sql.*; import java.util.*; import java.util.Date; import java.util.concurrent.*; import java.util.function.Consumer; @Service public class DictService { // 測試使用 fetch 按照指定規格讀取數據 , 源表為 test_dict 目標表名為 test_import public void fetchBySql(SqlSession sqlSession,String sourceSql,Consumer<ResultSet> consumer){ // 通過 conn 得到 真正的連接(要解析 數據庫名,從而真正的 數據庫連接通道) // 而在我們這里則不需要這么復雜,前面過程都由DbFactory 完成了,傳遞進來的sqlSession 就是一個 連接好的通道 Connection conn = sqlSession.getConnection(); PreparedStatement stm = null; // 開始時間 Long begin = new Date().getTime(); String sql = sourceSql; try { final int countRow = 1000; stm = conn.prepareStatement(sql,ResultSet.TYPE_FORWARD_ONLY,ResultSet.CONCUR_READ_ONLY); // stm.setFetchSize(Integer.MIN_VALUE); // 設置游標 1000 每次讀取一千行數據 stm.setFetchSize(countRow); stm.setFetchDirection(ResultSet.FETCH_REVERSE); // ResultSet rs = stm.executeQuery(); // rs.setFetchSize(Integer.MIN_VALUE); consumer.accept(rs); // 把 當前的 rs 傳遞給消費者 }catch (Exception e){ e.printStackTrace(); } } public String importFuncDictValueByDictId(Session session, int dict_id){ // 先決條件 : 根據 dict_id 得到 sourceSql SqlSession sqlSession = DbFactory.Open(DbFactory.FORM); String dbName = sqlSession.selectOne("dict.getDictDbByDictId",dict_id); if(StringUtils.isBlank(dbName)) return("此DictId所對應的數據庫為空,無法操作!"); final int countRow = 1000; final List<Map<String, Object>> list = new ArrayList<>(); // 初始化對應的數據庫 SqlSession sourceSqlSession = DbFactory.Open(dbName); if(sourceSqlSession==null) return("數據庫無法連接,無法操作!"); String sourceSql = null; try { sourceSql = this.getSqlTemplate("數據字典",String.valueOf(dict_id),false); } catch (Exception e){ log.info(e.getMessage()); return "無法從xml文件得到對應的sql,無法查詢!"; } Long begin = new Date().getTime(); Connection conn = sqlSession.getConnection(); final int poolSize = 5; ExecutorService fixedThreadPool = Executors.newFixedThreadPool(poolSize); fetchBySql(sourceSqlSession,sourceSql,rs -> { try { String prefix = "INSERT INTO func_dict_value (dict_id,value_code,value_name) VALUES "; final StringBuffer suffix = new StringBuffer(); // 設置事務為非自動提交 conn.setAutoCommit(false); // 非提交能減少日志的生成,從而加快執行速度 PreparedStatement pst = (PreparedStatement) conn.prepareStatement(""); List<Map> mapList = new ArrayList<>(); if (rs != null) { rs.last(); // 移動到最后面 BigDecimal result = new BigDecimal((double) rs.getRow()/5000).setScale(0, BigDecimal.ROUND_UP); int countSize = result.intValue(); try { session.getBasicRemote().sendText(String.valueOf(countSize)); // 第一次直接發送給websocket 客戶端1個 本次執行總數統計. } catch (IOException e) { e.printStackTrace(); } rs.first(); // 移動到最前面 // 坑處1,如果使用了firt的話,那么直接達到了第一條,則不要使用 while(rs.next()) if(rs!=null && StringUtils.isNotBlank(rs.getInt("code")+"")){ Map<String, Object> map = new HashMap<>(); map.put("code",rs.getInt("code")); map.put("name",rs.getString("name")); mapList.add(map); } while (rs.next()) { Map<String, Object> map = new HashMap<>(); map.put("code",rs.getInt("code")); map.put("name",rs.getString("name")); mapList.add(map); if(rs.getRow()%5000==0){ // 執行批量插入操作 List<Map> finalMapList = mapList; final StringBuffer sb = new StringBuffer(); Future<String> stringFuture = fixedThreadPool.submit( new Callable<String>() { @Override public String call() throws Exception { String name = Thread.currentThread().getName(); long threadId = Thread.currentThread().getId(); log.info("thread name: "+name+",id為"+threadId+"執行了一次"); for (Map tempMap : finalMapList) { // 構建SQL后綴 sb.append("(" +dict_id+",'"+ tempMap.get("code") + "'," + "'" + tempMap.get("name") + "'),"); } return "success"; } } ); mapList = new ArrayList<>(); try { String s = stringFuture.get(); if ("success".equals(s)) { String sql = prefix + sb.substring(0, sb.length() - 1); // 構建完整SQL pst.addBatch(sql); // 添加執行SQL pst.executeBatch(); // 執行操作 session.getBasicRemote().sendText("success"); } } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } } // 執行完之后 mapList 是個 不足5000個的,這個時候我們再去執行一次 添加操作 List<Map> finalMapList = mapList; System.out.println("mapList大小為:"+mapList.size()); System.out.println("finalMapList:"+finalMapList.size()); final StringBuffer sb = new StringBuffer(); Future<String> stringFuture = fixedThreadPool.submit( new Callable<String>() { @Override public String call() throws Exception { String name = Thread.currentThread().getName(); long threadId = Thread.currentThread().getId(); log.info("thread name: "+name+"id為"+threadId+"執行了一次"); for (Map tempMap : finalMapList) { // 構建SQL后綴 sb.append("(" +dict_id+",'"+ tempMap.get("code") + "'," + "'" + tempMap.get("name") + "'),"); } return "success"; } } ); try { String s = stringFuture.get(); if ("success".equals(s)) { String sql = prefix + sb.substring(0, sb.length() - 1); // 構建完整SQL pst.addBatch(sql); // 添加執行SQL pst.executeBatch(); // 執行操作 session.getBasicRemote().sendText("success"); } } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } conn.commit(); pst.close(); conn.close(); } Long end = new Date().getTime(); log.info("條數據從遠程庫導入到本地花費時間 : " + (end - begin) + " ms"); } catch (SQLException e) { // e.printStackTrace(); // 批量執行遇到異常直接 用log打印,不要中斷 log.info(e.getMessage()); }finally { fixedThreadPool.shutdown(); // 一定要shutdown 否則線程只是被回收到了線程池 } }); return "over"; } }
3.4 使用在線工具 websocket測試
http://www.blue-zero.com/WebSocket/
測試效果截圖 :
數據庫效果截圖 :