十萬級百萬級數據量的Excel文件導入並寫入數據庫


一.需求分析

  最近接到一個需求,導入十萬級,甚至可能百萬數據量的記錄了車輛黑名單的Excel文件,借此機會分析下編碼過程;

  首先將這個需求拆解,發現有三個比較復雜的問題:

 

  問題一:Excel文件導入后首先要被解析為存放對象的列表,數據量大的情況下可能會導致內存溢出,解析時間過長;

  問題二:插入數據庫的時候,數據量大,寫入的時間長

  問題三:要對數據庫中的現有數據進項判斷,不僅僅要做插入動作,還要將數據庫的數據與導入的數據對比,判斷是否做更新操作

  

  其中:

  問題一和問題三,可以看做同一類,因為主要涉及內存計算導致的性能問題,以及內存占用過大的溢出問題,

  關於這兩個問題,現在線上的機器基本上是4核8G的配置集群部署,內存並不是關鍵,我會在另一篇文章中給出我的方案,

  今天主要針對問題二,寫入的數據庫的問題給出我的方案,

  

  問題二主要是多次寫入數據庫的問題,顯然,如果有幾十萬條數據,那么是不可能連續寫幾十萬次的,不然要寫到后年馬月才能全部入庫,

  解決方案:

    這里我主要采用了多線程的寫入方式,十萬條數據,2000條寫一次(可以自己定義),用線程池提交多個線程任務同時寫入,提高性能

 

二.代碼環境

  Springboot2.1.3+POI+PGSQL

  controller層代碼

    @PostMapping("/upload")
    public void upload1(MultipartFile file, @Validated UploadReq req) throws Exception {
        //從數據庫查詢出現有的數據,根據去重的字段分組去構建成一個HashMap,通過containsKey()判斷
        //將需要更新的數據放到updateList中
        List<User> updateList=new ArrayList<>();

        //已取值的行數
        int rowNum = 0;
        //列號
        int colNum = 0;
        //真正有數據的行數
        int realRowCount = 0;
        //得到工作空間
        Workbook workbook = null;

        try {
            workbook = ExcelUtil.getWorkbookByInputStream(file.getInputStream(), file.getOriginalFilename());
        } catch (IOException e) {
            e.printStackTrace();
        }
        //得到工作表
        int numberOfSheets = workbook.getNumberOfSheets();
        for (int i = 0; i < numberOfSheets; i++) {
            Sheet sheet = ExcelUtil.getSheetByWorkbook(workbook, i)
            realRowCount = sheet.getPhysicalNumberOfRows();
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            List<User> list = new ArrayList<>();
            User user = null;

            for(Row row:sheet) {
                if(realRowCount == rowNum) {
                    break;
                }
                //空行跳過
                if(ExcelUtil.isBlankRow(row)) {
                    continue;
                }
                if(row.getRowNum() == -1) {
                    continue;
                }else {
                    //第一行表頭跳過
                    if(row.getRowNum() == 0) {
                        continue;
                    }
                }
                rowNum ++;
                colNum = 1;
                user = new User();
                ExcelUtil.validCellValue(sheet, row, colNum, "id");
                user.setId(Integer.valueOf(ExcelUtil.getCellValue(sheet, row, colNum - 1)));
                ExcelUtil.validCellValue(sheet, row, ++ colNum, "name");
                user.setId(Integer.valueOf(ExcelUtil.getCellValue(sheet, row, colNum - 1)));
                //判斷是否是已存在的數據,如果是就更新,不是就新增
                //updateList.add(user);
                list.add(user);
                
            }
            
            //新增的邏輯
            userService.saveBatch(list);
            System.out.println(list);
        }
    }

 

  service層代碼

@Service
public class UserServiceImpl implements IUserService {

    @Autowired
    private UserMapper userMapper;


    @Override
    public void saveBatch(List<User> list) throws Exception {
        //一個線程處理200條數據
        int count = 200;
        //數據集合大小
        int listSize = list.size();
        //開啟的線程數
        int runSize = (listSize / count) + 1;
        //存放每個線程的執行數據
        List<User> newlist = null;

        //創建一個線程池,數量和開啟線程的數量一樣
        //Executors 的寫法
        // ExecutorService executor = Executors.newFixedThreadPool(runSize);

        //ThreadPoolExecutor的寫法
        ThreadPoolExecutor executor = new ThreadPoolExecutor(runSize, runSize, 1,
                TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3),
                new ThreadPoolExecutor.DiscardOldestPolicy());

        //創建兩個個計數器
        CountDownLatch begin = new CountDownLatch(1);
        CountDownLatch end = new CountDownLatch(runSize);
        //循環創建線程
        for (int i = 0; i < runSize; i++) {
            //計算每個線程執行的數據
            if ((i + 1) == runSize) {
                int startIndex = (i * count);
                int endIndex = list.size();
                newlist = list.subList(startIndex, endIndex);
            } else {
                int startIndex = (i * count);
                int endIndex = (i + 1) * count;
                newlist = list.subList(startIndex, endIndex);
            }
            //線程類
            ImportThread mythead = new ImportThread(newlist, begin, end,userMapper);
            //這里執行線程的方式是調用線程池里的executor.execute(mythead)方法。
            executor.execute(mythead);
        }
        begin.countDown();
        end.await();
        //執行完關閉線程池
        executor.shutdown();
    }

  線程類

public class ImportThread implements Runnable {


    public ImportThread() {
    }

    UserMapper userMapper;
    private List<User> list;
    private CountDownLatch begin;
    private CountDownLatch end;

    /**
     * 方法名: ImportThread
     * 方法描述: 創建個構造函數初始化 list,和其他用到的參數
     * @throws
     */
    public ImportThread(List<User> list, CountDownLatch begin, CountDownLatch end,UserMapper userMapper) {
        this.list = list;
        this.begin = begin;
        this.end = end;
        this.userMapper=userMapper;
    }

    @Override
    public void run() {
        try {
            //執行完讓線程直接進入等待
            userMapper.saveBatch(list);
            begin.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            //這里要主要了,當一個線程執行完 了計數要減一不然這個線程會被一直掛起
            //這個方法就是直接把計數器減一的
            end.countDown();
        }
    }

}

 

  

  

  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM