如何處理大體積 XLSX/CSV/TXT 文件?


在開發過程中,可能會遇到這樣的需求,我們需要從本地的 Excel 或 CSV 等文件中解析出信息,這些信息可能是考勤打卡記錄,可能是日歷信息,也可能是近期賬單流水。但是它們共同的特點是數據多且繁雜,人工錄入的工作量龐大容易出錯,需要花費大量時間。那有沒有什么方法能自動解析文件並獲取有用信息呢?

當這個文件數據量也不是很多的時候,有很多前端工具可供選擇。例如 SheetJS,就提供了從 Excel、CSV 中解析出用信息的很多方法,十分方便。

當數據量只是幾千條的程度的,選擇的余地很多,但是一旦數據量級增加,處理就變得復雜。如果 XLSX/CSV 數據量達到了 100w+ 條,Office、WPS 想打開看一下,都會需要很長的時間。

那又該如何從這樣大體積的 Excel/CSV/TXT 中解析出數據呢?

背景

下面我們通過一個假設的需求,來講述理解整個過程。假設我們需求是從本地 Excel、CSV、TXT(或者其他格式的)文件中解析出數據,並經過清洗后存入本地數據庫文件中。但是這些文件體積可能是 5M、50M、500M 甚至更大。那么在瀏覽器環境下如何上傳?Node 環境下應該如何解析?

首先,我們需要了解的是瀏覽器 Web 頁面如何上傳大體積文件?

Web 頁面如何上傳大體積文件?

Web 頁面一般也是可以上傳大文件的,但是會面臨一個問題。如果要上傳的數據比較大,那么整個上傳過程會比較漫長,再加上上傳過程的不確定因素,一旦失敗,那整個上傳就要從頭再來,耗時很長。

面對這個問題,我們可以通過將大文件分成多份小文件,每一次只上傳一份的方法來解決。這樣即使某個請求失敗了,也無需從頭開始,只要重新上傳失敗的那一份就好了。

如果想要使用這個方法,我們需要滿足以下幾項需求:

  • 大體積文件支持切片上傳
  • 可以斷點續傳
  • 可以得知上傳進度

首先看一下如何進行大文件切割。Web 頁面基本都是通過  來獲取本地文件的。 而通過 input 的 event.target.files 獲取到的 file,其實是一個 File 類的實例,是 Blob 類的子類。

Blob 對象表示一個不可變、原始數據的類文件對象。它的數據可以按文本或二進制的格式進行讀取,也可以轉換成 ReadableStream 來用於數據操作。 簡單理解合一將 Blob  看做二進制容器,表示存放着一個大的二進制文件。Blob 對象有一個很重要的方法:slice(),這里需要注意的是 Blob 對象是不可變的,slice 方法返回的是一個新的 Blob,表示所需要切割的二進制文件。

slice() 方法接受三個參數,起始偏移量,結束偏移量,還有可選的 mime 類型。如果 mime 類型,沒有設置,那么新的 Blob 對象的 mime 類型和父級一樣。而 File 接口基於 Blob,File 對象也包含了slice方法,其結果包含有源 Blob 對象中指定范圍的數據。

看完了切割的方法,我們就可以對二進制文件進行拆分了。拆分示例如下:

function sliceInPiece(file, piece = 1024 * 1024 * 5) {
  let totalSize = file.size; // 文件總大小
  let start = 0; // 每次上傳的開始字節
  let end = start + piece; // 每次上傳的結尾字節
  let chunks = []
  while (start < totalSize) {
    // 根據長度截取每次需要上傳的數據
    // File對象繼承自Blob對象,因此包含slice方法
    let blob = file.slice(start, end); 
    chunks.push(blob)

    start = end;
    end = start + piece;
  }
  return chunks
}

獲得文件切割后的數組后,就可以挨個調用接口上傳至服務端。


let file =  document.querySelector("[name=file]").files[0];

const LENGTH = 1024 * 1024 * 0.1;
let chunks = sliceInPiece(file, LENGTH); // 首先拆分切片

chunks.forEach(chunk=>{
  let fd = new FormData();
  fd.append("file", chunk);
  post('/upload', fd)
})

完成上傳后再至服務端將切片文件拼接成完整文件,讓 FileReader 對象從 Blob 中讀取數據。

當然這里會遇到兩個問題,其一是面對上傳完成的一堆切片文件,服務端要如知道它們的正確順序?其二是如果有多個大體積文件同時上傳,服務端該如何判斷哪個切片屬於哪個文件呢?

前后順序的問題,我們可以通過構造切片的 FormData 時增加參數的方式來處理。比如用參數 ChunkIndex 表示當前切片的順序。

而第二個問題可以通過增加參數比如 sourceFile 等(值可以是當前大體積文件的完整路徑或者更嚴謹用文件的 hash 值)來標記原始文件來源。這樣服務端在獲取到數據時,就可以知道哪些切片來自哪個文件以及切片之間的前后順序。

如果暫時不方便自行構架,也可以考慮使用雲服務,比如又拍雲存儲就支持大文件上傳和斷點續傳的。比如:

斷點續傳

在上傳大文件或移動端上傳文件時,因為網絡質量、傳輸時間過長等原因造成上傳失敗,可以使用斷點續傳。特別地,斷點續傳上傳的圖片不支持預處理。特別地,斷點續傳上傳的文件不能使用其他上傳方式覆蓋,如果需要覆蓋,須先刪除文件。

\

名稱概念

  • 文件分塊:直接切分二進制文件成小塊。分塊大小固定為 1M。最后一個分塊除外。
  • 上傳階段:使用 x-upyun-multi-stage 參數來指示斷點續傳的階段。分為以下三個階段: initate(上傳初始化), upload(上傳中), complete(上傳結束)。各階段依次進行。
  • 分片序號:使用 x-upyun-part-id 參數來指示當前的分片序號,序號從 0 起算。
  • 順序上傳:對於同一個斷點續傳任務,只支持順序上傳。
  • 上傳標識:使用 x-upyun-multi-uuid 參數來唯一標識一次上傳任務, 類型為字符串, 長度為 36 位。
  • 上傳清理:斷點續傳未完成的文件,會保存 24 小時,超過后,文件會被刪除。

可以看到,雲存儲通過分片序號 x-upyun-part-id 和上傳標識 x-upyun-multi-uuid 解決了我們前面提到的兩個問題。這里需要注意的是這兩個數據不是前端自己生成的,而是在初始化上傳后通過 responseHeader 返回的。

前文說的都是使用 Web 頁面要如何上傳大文件。接下來我們來看看 NodeJS 是如何解析、處理這類大體積文件呢?

NodeJS 解析大體積文件

首先需要明確一個概念 NodeJS 里沒有 File 對象,但是有 fs(文件系統) 模塊。fs 模塊支持標准 POSIX 函數建模的方式與文件系統進行交互。\

POSIX 是可移植操作系統接口 Portable Operating System Interface of UNIX 的縮寫。簡單來說 POSIX 就是在不同內核提供的操作系統下提供一個統一的調用接口,比如在 linux 下打開文件和在 widnows 下打開文件。可能內核提供的方式是不同的,但是因為 fs 是支持 POSIX 標准的,因此對程序猿來說無論內核提供的是什么,直接在 Node 里調 fsPromises.open(path, flags[, mode]) 方法就可以使用。

這里簡單用 Vue 舉例說明。Vue 在不同的環境下比如 Web 頁面或 Weex 等等的運行生成頁面元素的方式是不同的。比如在 Web 下的 createElement 是下方這樣:


export function createElement (tagName: string, vnode: VNode): Element {
  const elm = document.createElement(tagName)
  if (tagName !== 'select') {
    return elm
  }
  // false or null will remove the attribute but undefined will not
  if (vnode.data && vnode.data.attrs && vnode.data.attrs.multiple !== undefined) {
    elm.setAttribute('multiple', 'multiple')
  }
  return elm
}

在 Weex 下則是如下情況:

export function createElement (tagName: string): WeexElement {
  return document.createElement(tagName)
}

以上兩種情況下的 createElement 是不一樣的。同理,還有很多其他的創建模塊或者元素的方式也是不同的,但是針對不同平台,Vue 提供了相同的 patch 方法,來進行組件的更新或者創建。

import * as nodeOps from 'web/runtime![]()de-ops'\
import { createPatchFunction } from 'core![]()dom/patch'\
import baseModules from 'core![]()dom/modules/index'\
import platformModules from 'web/runtime/modules/index'\
\
// the directive module should be applied last, after all\
// built-in modules have been applied.\
const modules = platformModules.concat(baseModules)\
\
// nodeops 封裝了一系列DOM操作方法。modules定義了一些模塊的鈎子函數的實現\
export const patch: Function = createPatchFunction({ nodeOps, modules })
import * as nodeOps from 'weex/runtime![]()de-ops'\
import { createPatchFunction } from 'core![]()dom/patch'\
import baseModules from 'core![]()dom/modules/index'\
import platformModules from 'weex/runtime/modules/index'\
\
// the directive module should be applied last, after all\
// built-in modules have been applied.\
const modules = platformModules.concat(baseModules)\
\
export const patch: Function = createPatchFunction({\
  nodeOps,\
  modules,\
  LONG_LIST_THRESHOLD: 10\
})

這樣,無論運行環境的內部實現是否不同,只要調用相同的 patch 方法即可。而 POSIX 的理念是與上面所舉例的情況是相通的。

簡單了解了 POSIX,我們回到 fs 模塊。fs 模塊提供了很多讀取文件的方法,例如:

  • fs.read(fd, buffer, offset, length, position, callback)讀取文件數據。要操作文件,得先打開文件,這個方法的fd,就是調用 fs.open 返回的文件描述符。
  • fs.readFile(path[, options], callback) 異步地讀取文件的全部內容。可以看做是fs.read的進一步封裝。

使用場景如下:

import { readFile } from 'fs';

readFile('/etc/passwd','utf-8', (err, data) => {
  if (err) throw err;
  console.log(data);
});

因為 fs.readFile 函數會緩沖整個文件,如果要讀取的文件體積較小還好,但是如果文件體積較大就會給內存造成壓力。那有沒有對內存壓力較小的方式來讀取文件呢?

有的,我們今天的主角 stream 流登場。

stream

stream 流是用於在 Node.js 中處理流數據的抽象接口。 stream 模塊提供了用於實現流接口的 API。流可以是可讀的、可寫的、或兩者兼而有之。

fs 模塊內有個 fs.createReadStream(path[, options])方法,它返回的是一個可讀流,默認大小為 64k,也就是緩沖 64k。一旦內部讀取緩沖區達到這個閾值,流將暫時停止從底層資源讀取數據,直到消費當前緩沖的數據。

消費數據的方法可以是調 pipe() 方法,也可以被事件直接消費。

// pipe 消費
readable.pipe(writable)

// 或者
// 事件消費
readable.on('data', (chunk) => {
  writable.write(chunk);
});
readable.on('end', () => {
  writable.end();
});

除了可讀流,也有可寫流 fs.createWriteStream(path[, options]), 可以將數據寫入文件中。

好了,所需要的前置知識基本就介紹完畢了,回到正題。假如我們有一個文件夾,里面存放着數十個 XLSX/CSV 文件,且每一個體積都超過了 500M。那該如何從這些文件中讀取信息,並寫入數據庫文件中呢?

批量解析 CSV 文件

假設我們需要解析的文件路徑已經是知道的,可以通過路徑獲取到文件,那么將這些路徑存入一個數組並命名為 needParseArr,我們需要按照順序一個個解析這些  CSV、XLSX 文件信息,並清洗然后寫入數據庫。

首先,是一個個讀的邏輯 (readOneByOne)。

async readOneByOne () {
   try {
    for (let i = 0; i < needParsePathArr.length; i++) {
      const filePath = needParsePathArr[i]
      console.log(`解析到第${i}個文件,文件名:${filePath}`)
      await streamInsertDB(filePath)
    }
  } catch (err) {

  }
}

streamInsertDB 是我們的主要邏輯的入口。

async function streamInsertDB (filePath) {
  return new Promise((resolve, reject) => {
    const ext = path.extname(filePath)
    // 判斷了下文件類型
    if (ext === '.csv') {
      // 解析csv
      parseAndInsertFromCSV(filePath, resolve, reject)
    } else if (ext === '.xlsx') {
      // 自執行函數
      (async function getName () {
        try {
          // 先轉換成csv。也可以不轉換,直接解析xlsx,后文會詳細解釋。
          const csvFileName = await convertXlsx2Csv(filePath)
          // 復用解析csv的邏輯
          parseAndInsertFromCSV(csvFileName, resolve, reject)
        } catch (error) {
          reject(`error: ${error.message || error}`)
        }
      })()
    }
  })
}

parseAndInsertFromCSV 中就是使用我們前面所提到的知識點的主要陣地。 下面簡單介紹一下各個函數:

  • chardet:這個函數的作用是監測 CSV 文件的編碼格式的,畢竟不是每個 CSV 都是 UTF-8 編碼,帶中文的 CSV 編碼類型可能是 GBK 或者 GB18030、GB18031 等等,這種格式不經過處理直接讀取,中文會顯示為亂碼。所以需要執行轉換的函數 iconv 轉換一下。
  • pipe:可以用來建立管道鏈,可以理解為 pipe 的作用就像一個管道,可以對目標流邊讀邊寫,這里我們是一邊解碼一邊重新編碼。
  • insertInBlock:這個函數是獲取到一定數量的數據后(本例中是從 CSV 中解析出 3 萬條左右數據的時候),暫停一下來執行一些操作,比如寫入數據庫或者對里面的數據進行過濾、處理等等,根據實際需要來定。
  • csv:這個函數的作用就是讀出流中的具體數據的。

具體邏輯解釋可以看注釋。

const chardet = require('chardet');
const csv = require('fast-csv'); // 比較快解析csv的速度的工具
const iconv = require('iconv-lite');

const arrayFromParseCSV = []  // 存放解析出來的一行行csv數據的
let count = 0 // 計數
// resolve, reject 是外部函數傳進來的,用以判斷函數執行的狀態,以便正確的進行后續邏輯處理
function parseAndInsertFromCSV (filePath, resolve, reject) {
  const rs = fs.createReadStream(filePath)  // 創建可讀流
  // 這里的防抖和柯里化
  const delayInsert = debounce((isEnd, cb = () => {}) => insertInBlock(isEnd, cb, rs, resolve, reject), 300)
  /// sampleSize: 5120 表示值讀取文件前5120個字節的數據,就可以判斷出文件的編碼類型了,不需要全部讀取
  chardet.detectFile(filePath, { sampleSize: 5120 }).then(encoding => {
    // 如果不是UTF-8編碼,轉換為utf8編碼
    if (encoding !== 'UTF-8') {
      rs.pipe(iconv.decodeStream(encoding))
        .pipe(iconv.encodeStream('UTF-8'))
        .pipe(csv.parse({ header: false, ignoreEmpty: true, trim: true })) // 解析csv
        .on('error', error => {
          reject(`解析csv error: ${error}`)
        })
        .on('data', rows => {
          count++ // 計數,因為我們要分塊讀取和操作
          arrayFromParseCSV.push(rows) // 讀到就推送到數組中
          if (count > 30000) { // 已經讀了30000行,我們就要先把這3w行處理掉,避免占用過多內存。
            rs.pause() // 暫停可讀流
            delayInsert(false) // false 還沒有結束。注意:即使rs.pause, 流的讀取也不是立即暫停的,所以需要防抖。
          }          
        }).on('end', rowCount => {
          console.log(`解析完${filePath}文件一共${rowCount}行`)
          delayInsert(true, () => {
            rs.destroy() // 銷毀流
            resolve('ok') // 一個文件讀取完畢了
          })
        })
    }
  })
}

清洗數據和后續操作的邏輯在 insertInBlock 里。

function insertInBlock (isEnd, cb, filePath, resolve, reject) {
  const arr = doSomethingWithData() // 可能會有一些清洗數據的操作
  // 假如我們后續的需求是將數據寫入數據庫
  const batchInsert = () => {
    batchInsertDatabasePromise().then(() => {
      if (cb && typeof cb === 'function') cb()
      !isEnd && rs.resume() // 這一個片段的數據寫入完畢,可以恢復流繼續讀了
    })
  }
  
  const truely = schemaHasTable() // 比如判斷數據庫中有沒有某個表,有就寫入。沒有先建表再寫入。
  if (truely) { //
     batchInsert()
   } else {
     // 建表或者其他操作,然后再寫入
     doSomething().then(() => batchInsert())
  }
}

這樣,解析和寫入的流程就完成了。雖然很多業務上的代碼進行了簡略,但實現上大體類似這個流程。

批量解析 XLSX 文件

轉化成 CSV?

在前面的代碼實例中,我們利用了利用可寫流 fs.createWriteStream 將 XLSX 文件轉換成 CSV 文件然后復用解析 CSV 。這里需要注意的是,在將數據寫入 CSV 格式文件時,要在最開始寫入 bom 頭 \ufeff。此外也可以用 xlsx-extract 的 convert 函數,將 XLSX 文件轉換成 TSV。


const { XLSX } = require('xlsx-extract')
new XLSX().convert('path/to/file.xlsx', 'path/to/destfile.tsv')
    .on('error', function (err) {
        console.error(err);
    })
    .on('end', function () {
        console.log('written');
    })

可能有人會疑惑,不是 CSV 么,怎么轉換成了 TSV 呢?

其實 tsv 和 CSV 的區別只是字段值的分隔符不同,CSV 用逗號分隔值(Comma-separated values),而 TSVA 用的是制表符分隔值 (Tab-separated values)。前面我們用來快速解析 CSV 文件的 fast-csv 工具是支持選擇制表符\t作為值的分隔標志的。

import { parse } from '@fast-csv/parse';
const stream = parse({ delimiter: '\t' })
    .on('error', error => console.error(error))
    .on('data', row => console.log(row))
    .on('end', (rowCount: number) => console.log(`Parsed ${rowCount} rows`));

直接解析?

那是否可以不轉換成 CSV,直接解析 XLSX 文件呢 ?其實也是可行的。

const { xslx } = require('xlsx-extract') // 流式解析xlsx文件工具
// parser: expat, 需要額外安裝node-expat,可以提高解析速度。
new XLSX().extract(filePath, { sheet_nr: 1, parser: 'expat' })
    .on('row', function (row) {
        // 每一行數據獲取到時都可以觸發
      }).on('error', function (err) {
        // error
     });

但是這種方式有一個缺陷,一旦解析開始,就無法暫停數據讀取的流程。xlsx-extract 封裝了 sax,沒有提供暫停和繼續的方法。

如果我們直接用可讀流去讀取 XLSX 文件會怎么樣呢?

const readStream = fs.createReadableStream('path/to/xlsx.xlsx')

可以看到現在流中數據以 buffer 的形式存在着。但由於 xlsx 格式實際上是一個 zip 存檔的壓縮格式,存放着 XML 結構的文本信息。所以可讀流無法直接使用,需要先解壓縮。

解壓縮可以使用 npm 包 unzipper 。


const unzip = require('unzipper')
const zip = unzip.Parse();
rs.pipe(zip)
  .on('entry', function (entry) {
    console.log('entry ---', entry);
    const fileName = entry.path;
    const { type } = entry; // 'Directory' or 'File'
    const size = entry.vars.uncompressedSize; // There is also compressedSize;
    if (fileName === "this IS the file I'm looking for") {
      entry.pipe(fs.createWriteStream('output/path'));
    } else {
      entry.autodrain();
    }
  })

現在我們已經解壓了文件。

前面提到,xlsx-extract 是 封裝了 sax,而 sax 本身就是用來解析 XML 文本的,那我們這里也可以使用 sax 來對可讀流進行處理。

sax 解析的源碼可以看這里,大致是根據每一個字符來判斷其內容、換行、開始、結束等等,然后觸發對應事件。

const saxStream = require('sax').createStream(false);
saxStream.on('error', function (e) {
  console.error('error!', e);
});
saxStream.on('opentag', function (node) {
  console.log('node ---', node);
});
saxStream.on('text', (text) => console.log('text ---', typeof text, text));

最后將兩者結合起來:

const unzip = require('unzipper');
const saxStream = require('sax').createStream(false);
const zip = unzip.Parse();

saxStream.on('error', function (e) {
  console.error('error!', e);
});
saxStream.on('opentag', function (node) {
  console.log('node ---', node);
});
saxStream.on('text', (text) => {
    console.log('text ---', typeof text, text)
});

rs.pipe(zip)
  .on('entry', function (entry) {
    console.log('entry ---', entry);
    entry.pipe(saxStream)
  })

使用本地的 XLSX 文件測試后,控制台打印出以下信息:

這些信息對應着 XLSX 文檔里的這部分信息。Node 里打印的 ST SI,代表着 xml 的標簽。

這樣,其實我們也拿到了 XLSX 里的數據了,只不過這些數據還需要清洗、匯總、一一對應。同時由於我們是直接在可讀流上操作,自然也可以 pause、resume 流,來實現分塊讀取和其他操作的邏輯。

總結

對體積較小的 XLSX、CSV 文件,基本 SheetJS 就可以滿足各種格式文件的解析需求了,但是一旦文檔體積較大,那么分片、流式讀寫將成為必不可少的方式。

通過前面例子和代碼的分解,我們可以了解這類問題的解決辦法,也可以拓展對類似需求的不同解決思路。一旦我們能對大體積文件的分塊處理有一定的概念和了解,那么在遇到類似問題的時候,就知道實現思路在哪里了。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM