一.需求分析
最近接到一個需求,導入十萬級,甚至可能百萬數據量的記錄了車輛黑名單的Excel文件,借此機會分析下編碼過程;
首先將這個需求拆解,發現有三個比較復雜的問題:
問題一:Excel文件導入后首先要被解析為存放對象的列表,數據量大的情況下可能會導致內存溢出,解析時間過長;
問題二:插入數據庫的時候,數據量大,寫入的時間長
問題三:要對數據庫中的現有數據進項判斷,不僅僅要做插入動作,還要將數據庫的數據與導入的數據對比,判斷是否做更新操作
其中:
問題一和問題三,可以看做同一類,因為主要涉及內存計算導致的性能問題,以及內存占用過大的溢出問題,
關於這兩個問題,現在線上的機器基本上是4核8G的配置集群部署,內存並不是關鍵,我會在另一篇文章中給出我的方案,
今天主要針對問題二,寫入的數據庫的問題給出我的方案,
問題二主要是多次寫入數據庫的問題,顯然,如果有幾十萬條數據,那么是不可能連續寫幾十萬次的,不然要寫到后年馬月才能全部入庫,
解決方案:
這里我主要采用了多線程的寫入方式,十萬條數據,2000條寫一次(可以自己定義),用線程池提交多個線程任務同時寫入,提高性能
二.代碼環境
Springboot2.1.3+POI+PGSQL
controller層代碼
@PostMapping("/upload") public void upload1(MultipartFile file, @Validated UploadReq req) throws Exception { //從數據庫查詢出現有的數據,根據去重的字段分組去構建成一個HashMap,通過containsKey()判斷 //將需要更新的數據放到updateList中 List<User> updateList=new ArrayList<>(); //已取值的行數 int rowNum = 0; //列號 int colNum = 0; //真正有數據的行數 int realRowCount = 0; //得到工作空間 Workbook workbook = null; try { workbook = ExcelUtil.getWorkbookByInputStream(file.getInputStream(), file.getOriginalFilename()); } catch (IOException e) { e.printStackTrace(); } //得到工作表 int numberOfSheets = workbook.getNumberOfSheets(); for (int i = 0; i < numberOfSheets; i++) { Sheet sheet = ExcelUtil.getSheetByWorkbook(workbook, i) realRowCount = sheet.getPhysicalNumberOfRows(); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); List<User> list = new ArrayList<>(); User user = null; for(Row row:sheet) { if(realRowCount == rowNum) { break; } //空行跳過 if(ExcelUtil.isBlankRow(row)) { continue; } if(row.getRowNum() == -1) { continue; }else { //第一行表頭跳過 if(row.getRowNum() == 0) { continue; } } rowNum ++; colNum = 1; user = new User(); ExcelUtil.validCellValue(sheet, row, colNum, "id"); user.setId(Integer.valueOf(ExcelUtil.getCellValue(sheet, row, colNum - 1))); ExcelUtil.validCellValue(sheet, row, ++ colNum, "name"); user.setId(Integer.valueOf(ExcelUtil.getCellValue(sheet, row, colNum - 1))); //判斷是否是已存在的數據,如果是就更新,不是就新增 //updateList.add(user); list.add(user); } //新增的邏輯 userService.saveBatch(list); System.out.println(list); } }
service層代碼
@Service public class UserServiceImpl implements IUserService { @Autowired private UserMapper userMapper; @Override public void saveBatch(List<User> list) throws Exception { //一個線程處理200條數據 int count = 200; //數據集合大小 int listSize = list.size(); //開啟的線程數 int runSize = (listSize / count) + 1; //存放每個線程的執行數據 List<User> newlist = null; //創建一個線程池,數量和開啟線程的數量一樣 //Executors 的寫法 // ExecutorService executor = Executors.newFixedThreadPool(runSize); //ThreadPoolExecutor的寫法 ThreadPoolExecutor executor = new ThreadPoolExecutor(runSize, runSize, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3), new ThreadPoolExecutor.DiscardOldestPolicy()); //創建兩個個計數器 CountDownLatch begin = new CountDownLatch(1); CountDownLatch end = new CountDownLatch(runSize); //循環創建線程 for (int i = 0; i < runSize; i++) { //計算每個線程執行的數據 if ((i + 1) == runSize) { int startIndex = (i * count); int endIndex = list.size(); newlist = list.subList(startIndex, endIndex); } else { int startIndex = (i * count); int endIndex = (i + 1) * count; newlist = list.subList(startIndex, endIndex); } //線程類 ImportThread mythead = new ImportThread(newlist, begin, end,userMapper); //這里執行線程的方式是調用線程池里的executor.execute(mythead)方法。 executor.execute(mythead); } begin.countDown(); end.await(); //執行完關閉線程池 executor.shutdown(); }
線程類
public class ImportThread implements Runnable { public ImportThread() { } UserMapper userMapper; private List<User> list; private CountDownLatch begin; private CountDownLatch end; /** * 方法名: ImportThread * 方法描述: 創建個構造函數初始化 list,和其他用到的參數 * @throws */ public ImportThread(List<User> list, CountDownLatch begin, CountDownLatch end,UserMapper userMapper) { this.list = list; this.begin = begin; this.end = end; this.userMapper=userMapper; } @Override public void run() { try { //執行完讓線程直接進入等待 userMapper.saveBatch(list); begin.await(); } catch (InterruptedException e) { e.printStackTrace(); } finally { //這里要主要了,當一個線程執行完 了計數要減一不然這個線程會被一直掛起 //這個方法就是直接把計數器減一的 end.countDown(); } } }