Use case: there are N files, each containing a single column of primary keys, and each file represents one attribute. That is, if primary key PRI1 appears in file A, then PRI1 has attribute A. This setup is typically used for data filtering, e.g. answering: which primary keys have both attribute A and attribute B?
How should we handle this scenario?
1. Approach
If we set aside the file-based constraint in the problem statement, how would we solve it?
For example, we could import each file into Redis as a hash: the Redis key is the primary key, the hash field is attribute X, and the hash value is 1 (or absent). To make a decision, we simply look up the key and check whether it has the corresponding field.
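As a rough illustration of this idea (not part of the original solution), a minimal sketch using the Jedis client might look like the following; the host, port, keys and field names are all illustrative assumptions:

import redis.clients.jedis.Jedis;

public class RedisAttrDemo {
    public static void main(String[] args) {
        try (Jedis jedis = new Jedis("localhost", 6379)) {
            // loading: primary key 6001 appears in file A and file B,
            // so it gets fields "attr_a" and "attr_b"
            jedis.hset("6001", "attr_a", "1");
            jedis.hset("6001", "attr_b", "1");
            // checking: does 6001 have both attributes?
            boolean hasBoth = jedis.hexists("6001", "attr_a")
                    && jedis.hexists("6001", "attr_b");
            System.out.println(hasBoth); // true
        }
    }
}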
This scheme looks reasonable, but it has two drawbacks: 1. Redis is an in-memory database with limited capacity, so it may not cope with very large datasets; 2. it cannot serve reverse queries, i.e. listing all primary keys that have both attribute A and attribute B is hard to do.
Alternatively, we could use a relational database such as MySQL: import each file into its own single-column table, named after the corresponding attribute, and then run the computation on demand with SQL. (Note that MySQL itself does not support FULL JOIN; the statement below assumes a database that does, such as PostgreSQL, or an emulation via LEFT JOIN / RIGHT JOIN plus UNION.) A reference query:
select COALESCE(ta.id, tb.id) as id,
       case when ta.id is not null then 1 else 0 end as ta_flag,
       case when tb.id is not null then 1 else 0 end as tb_flag
  from table_a as ta
  full join table_b as tb on ta.id = tb.id;
This is arguably a fairly good solution; at this scale the computation is a trivial workload by database standards. To restate the setup: we create many small tables in the database, each holding just one column of primary keys, and do the computation at query time. The problem is that as the number of attributes grows, so does the number of small tables, possibly beyond the database's limits. Demanding an exceptionally capable database for a rather ordinary requirement is not great either.
That problem can also be solved. For instance, we can pivot rows into columns, consolidating the small tables into one wide table and dropping the small ones afterwards, which brings us back within a database's ordinary limits. The consolidation statement is not complicated either; for reference:
create table w_xx as
select COALESCE(ta.id, tb.id) as id,
       case when ta.id is not null then 1 else 0 end as ta_flag,
       case when tb.id is not null then 1 else 0 end as tb_flag
  from table_a as ta
  full join table_b as tb on ta.id = tb.id;
With that, the solution is essentially complete.
2. Row-to-column data join based on files
But what if there is no external storage medium at all? As the title says: work directly on the files and merge them together. It does not look that hard.
If memory is not a concern, we can read each file into a list and convert it into a map, similar to the Redis scheme above. That may not be realistic for large data, and it is simple enough that the implementation is skipped here.
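Although the implementation is skipped above, a minimal in-memory sketch of that idea could look like this, assuming the data fits in memory; the file names are illustrative:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class InMemoryJoinDemo {
    public static void main(String[] args) throws IOException {
        // attribute name -> set of primary keys found in that attribute's file
        Map<String, Set<String>> attrToIds = new HashMap<>();
        attrToIds.put("crowd_a", new HashSet<>(Files.readAllLines(Paths.get("crowd_a.csv"))));
        attrToIds.put("crowd_b", new HashSet<>(Files.readAllLines(Paths.get("crowd_b.csv"))));

        // reverse query: ids having both attributes
        Set<String> both = new HashSet<>(attrToIds.get("crowd_a"));
        both.retainAll(attrToIds.get("crowd_b"));
        System.out.println(both);
    }
}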
Simpler still: if the primary keys stored in each file are already sorted, merging becomes even easier.
The basic idea is to merge files two at a time: read both line by line, compare the values, and write the result to a new file.
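Stripped of the flag bookkeeping that the full implementation below carries, the core of that idea is the classic two-pointer union over sorted inputs. A simplified sketch (single-column inputs, illustrative file names, no duplicate handling):

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;

public class SortedUnionSketch {
    public static void main(String[] args) throws IOException {
        try (BufferedReader a = new BufferedReader(new FileReader("crowd_a.csv"));
             BufferedReader b = new BufferedReader(new FileReader("crowd_b.csv"));
             PrintWriter out = new PrintWriter(new FileWriter("merged.csv"))) {
            String la = a.readLine();
            String lb = b.readLine();
            while (la != null && lb != null) {
                int cmp = la.compareTo(lb);
                if (cmp == 0) {            // id in both files -> flags 1,1
                    out.println(la + ",1,1");
                    la = a.readLine();
                    lb = b.readLine();
                } else if (cmp < 0) {      // id only in A -> flags 1,<empty>
                    out.println(la + ",1,");
                    la = a.readLine();
                } else {                   // id only in B -> flags <empty>,1
                    out.println(lb + ",,1");
                    lb = b.readLine();
                }
            }
            // drain whichever file still has lines
            for (; la != null; la = a.readLine()) out.println(la + ",1,");
            for (; lb != null; lb = b.readLine()) out.println(lb + ",,1");
        }
    }
}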
Also, if we want parallel computation, the fork/join framework covered in the previous article fits this scenario very well.
2.1. The main file row-to-column merge framework
The core algorithm iterates over the files in turn, compares the data, and writes the target file. The implementation is as follows:
// FileJoiner.java
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;

import lombok.extern.slf4j.Slf4j;

/**
 * Utility class for joining files.
 */
@Slf4j
public class FileJoiner {

    /** Column separator of the result files */
    private static final String CSV_RESULT_FILE_SEPARATOR = ",";

    /**
     * Join two files; semantically equivalent to the SQL:
     *   select coalesce(a.id, b.id, c.id...) id,
     *          case when a.id is not null then '1' else '' end f_a,
     *          case when b.id is not null then '1' else '' end f_b,
     *          ...
     *     from a
     *     full join b on a.id = b.id
     *     full join c on a.id = c.id
     *     ...;
     */
    public JoinFileDescriptor joinById(JoinFileDescriptor a, JoinFileDescriptor b)
            throws IOException {
        JoinFileDescriptor mergedDesc = new JoinFileDescriptor();
        if (a.getLineCnt() <= 0 && b.getLineCnt() <= 0) {
            List<FileFieldDesc> fieldDesc = new ArrayList<>();
            // a's fields first, then b's
            fieldDesc.addAll(a.getFieldInfo());
            fieldDesc.addAll(b.getFieldInfo());
            mergedDesc.setFieldInfo(fieldDesc);
            return mergedDesc;
        }
        if (a.getLineCnt() <= 0) {
            List<FileFieldDesc> fieldDesc = new ArrayList<>();
            // b's fields first, then a's: the merged content is simply b's file,
            // a's fields remain unwritten (writeFlag 0) and are padded later
            fieldDesc.addAll(b.getFieldInfo());
            fieldDesc.addAll(a.getFieldInfo());
            mergedDesc.setFieldInfo(fieldDesc);
            mergedDesc.setPath(b.getPath());
            mergedDesc.setLineCnt(b.getLineCnt());
            return mergedDesc;
        }
        if (b.getLineCnt() <= 0) {
            List<FileFieldDesc> fieldDesc = new ArrayList<>();
            // a's fields first, then b's: the merged content is simply a's file
            fieldDesc.addAll(a.getFieldInfo());
            fieldDesc.addAll(b.getFieldInfo());
            mergedDesc.setFieldInfo(fieldDesc);
            mergedDesc.setPath(a.getPath());
            mergedDesc.setLineCnt(a.getLineCnt());
            return mergedDesc;
        }
        // properly merge tables a and b
        String mergedPath = a.getPath() + ".m" + a.getDeep();
        long cnt = -1;
        try (BufferedReader aReader = new BufferedReader(new FileReader(a.getPath()))) {
            try (BufferedReader bReader = new BufferedReader(new FileReader(b.getPath()))) {
                a.setReader(aReader);
                b.setReader(bReader);
                try (OutputStream outputStream = FileUtils.openOutputStream(new File(mergedPath))) {
                    cnt = unionTwoBufferStream(a, b, outputStream);
                }
            }
        }
        mergedDesc.setPath(mergedPath);
        mergedDesc.setLineCnt(cnt);
        mergedDesc.incrDeep();
        // a's fields first, then b's; all flags are now physically written
        List<FileFieldDesc> fieldDesc = new ArrayList<>();
        a.getFieldInfo().forEach(FileFieldDesc::writeOk);
        b.getFieldInfo().forEach(FileFieldDesc::writeOk);
        fieldDesc.addAll(a.getFieldInfo());
        fieldDesc.addAll(b.getFieldInfo());
        mergedDesc.setFieldInfo(fieldDesc);
        return mergedDesc;
    }

    /**
     * Merge multiple files; input order does not matter, since every
     * field position stays addressable.
     *
     * @param fileList         files to merge
     * @param orderedFieldList required output field order
     * @return descriptor of the merged file and its field list
     * @throws Exception on merge failure
     */
    public JoinFileDescriptor joinMultiFile(List<JoinFileDescriptor> fileList,
                                            List<String> orderedFieldList) throws Exception {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        FileJoinFJTask fjTask = new FileJoinFJTask(fileList);
        ForkJoinTask<JoinFileDescriptor> future = forkJoinPool.submit(fjTask);
        JoinFileDescriptor mergedFile = future.get();
        return rewriteFileBySelectField(mergedFile, orderedFieldList);
    }

    /**
     * Rewrite the file content so its columns follow the requested field order.
     *
     * @param originFile    descriptor of the current file
     * @param orderedFields target field order
     * @return descriptor of the rewritten file
     * @throws IOException on write failure
     */
    public JoinFileDescriptor rewriteFileBySelectField(JoinFileDescriptor originFile,
                                                       List<String> orderedFields) throws IOException {
        List<FileFieldDesc> fieldDescList = originFile.getFieldInfo();
        if (checkIfCurrentFileInOrder(fieldDescList, orderedFields)) {
            log.info("file columns already in requested order, no rewrite needed: {}", orderedFields);
            return originFile;
        }
        Map<String, FieldOrderIndicator> indicatorMap =
                composeFieldOrderIndicator(fieldDescList, orderedFields);
        AtomicLong lineCounter = new AtomicLong(0);
        String targetFilePath = originFile.getPath() + ".of";
        try (BufferedReader aReader = new BufferedReader(new FileReader(originFile.getPath()))) {
            try (OutputStream outputStream = FileUtils.openOutputStream(new File(targetFilePath))) {
                String lineData;
                while ((lineData = aReader.readLine()) != null) {
                    String[] cols = StringUtils.splitPreserveAllTokens(
                            lineData, CSV_RESULT_FILE_SEPARATOR);
                    // empty line
                    if (cols.length == 0) {
                        continue;
                    }
                    // id,1,...
                    StringBuilder sb = new StringBuilder(cols[0]);
                    for (String f1 : orderedFields) {
                        sb.append(CSV_RESULT_FILE_SEPARATOR);
                        FieldOrderIndicator fieldDescIndicator = indicatorMap.get(f1);
                        if (fieldDescIndicator == null
                                || (fieldDescIndicator.fieldIndex >= cols.length
                                    && fieldDescIndicator.fieldDesc.getWriteFlag() == 1)) {
                            continue;
                        }
                        sb.append(cols[fieldDescIndicator.fieldIndex]);
                    }
                    writeLine(outputStream, sb.toString(), lineCounter);
                }
            }
        }
        JoinFileDescriptor mergedDesc = new JoinFileDescriptor();
        mergedDesc.setPath(targetFilePath);
        mergedDesc.setLineCnt(lineCounter.get());
        mergedDesc.setFieldInfo(
                orderedFields.stream()
                        .map(r -> FileFieldDesc.newField(r, 1))
                        .collect(Collectors.toList()));
        return mergedDesc;
    }

    /**
     * Build the field-index indicators.
     *
     * @param currentFieldDescList current field layout
     * @param orderedFields        target field order
     * @return e.g. {"a":{"fieldIndex":1, "fieldDesc":{"name":"aaa", "writeFlag":1}}}
     */
    private Map<String, FieldOrderIndicator> composeFieldOrderIndicator(
            List<FileFieldDesc> currentFieldDescList, List<String> orderedFields) {
        Map<String, FieldOrderIndicator> indicatorMap = new HashMap<>(orderedFields.size());
        outer:
        for (String f1 : orderedFields) {
            for (int i = 0; i < currentFieldDescList.size(); i++) {
                FileFieldDesc originField1 = currentFieldDescList.get(i);
                if (f1.equals(originField1.getFieldName())) {
                    // +1 because the id always occupies column 0
                    indicatorMap.put(f1, new FieldOrderIndicator(i + 1, originField1));
                    continue outer;
                }
            }
            indicatorMap.put(f1, null);
        }
        return indicatorMap;
    }

    /**
     * Check whether the file columns already follow the requested order.
     *
     * @param currentFieldDescList current field layout
     * @param orderedFields        expected field order
     * @return true: already in order, no rewrite needed; false: needs rewriting
     */
    private boolean checkIfCurrentFileInOrder(List<FileFieldDesc> currentFieldDescList,
                                              List<String> orderedFields) {
        if (orderedFields.size() != currentFieldDescList.size()) {
            // field counts differ: the requested list cannot be applied, skip rewriting
            return true;
        }
        for (int j = 0; j < orderedFields.size(); j++) {
            String targetFieldName = orderedFields.get(j);
            FileFieldDesc possibleFieldDesc = currentFieldDescList.get(j);
            if (possibleFieldDesc != null
                    && targetFieldName.equals(possibleFieldDesc.getFieldName())
                    && possibleFieldDesc.getWriteFlag() == 1) {
                continue;
            }
            return false;
        }
        return true;
    }

    /**
     * Compute the union of two sorted streams (A ∪ B).
     *
     * The A/B flag columns are appended after the id: 1 means present,
     * empty means absent.
     *   A present and B present -> A,1,1
     *   A present, B absent     -> A,1,
     *   A absent, B present     -> B,,1
     *
     * When A or B has multiple columns, the first column is the join key.
     *   A: 111
     *   B: 111,,1,1
     *   merged: 111,1,,1,1
     *
     * @return number of lines written
     */
    private long unionTwoBufferStream(JoinFileDescriptor a, JoinFileDescriptor b,
                                      OutputStream targetOutputStream) throws IOException {
        String lineDataLeft;
        String lineDataRight;
        AtomicLong lineNumCounter = new AtomicLong(0);
        BufferedReader leftBuffer = a.getReader();
        BufferedReader rightBuffer = b.getReader();
        lineDataRight = rightBuffer.readLine();
        // the primary key is always the first column
        int idIndex = 1;
        String leftId;
        String rightId = getIdColumnValueFromLineData(lineDataRight, idIndex);
        String lastId = null;
        int cmpV;
        while ((lineDataLeft = leftBuffer.readLine()) != null) {
            // the left file drives the iteration, so drain the right file first
            leftId = getIdColumnValueFromLineData(lineDataLeft, idIndex);
            if (lineDataRight != null && (cmpV = leftId.compareTo(rightId)) >= 0) {
                do {
                    if (rightId.equals(lastId)) {
                        // duplicate right id, already written: skip it
                        lineDataRight = rightBuffer.readLine();
                        rightId = getIdColumnValueFromLineData(lineDataRight, idIndex);
                        continue;
                    }
                    // merge the two sides (left participates only on equal ids)
                    writeLine(targetOutputStream,
                            joinLineData(cmpV == 0 ? lineDataLeft : null,
                                    lineDataRight, a.getFieldInfo(), b.getFieldInfo()),
                            lineNumCounter);
                    lastId = rightId;
                    lineDataRight = rightBuffer.readLine();
                    rightId = getIdColumnValueFromLineData(lineDataRight, idIndex);
                } while (lineDataRight != null && (cmpV = leftId.compareTo(rightId)) >= 0);
            }
            // if left equals the id just written, the row already went out
            // together with the right side: skip it
            if (leftId.equals(lastId)) {
                continue;
            }
            writeLine(targetOutputStream,
                    joinLineData(lineDataLeft, null, a.getFieldInfo(), b.getFieldInfo()),
                    lineNumCounter);
            lastId = leftId;
        }
        // drain whatever remains of the right file
        while (lineDataRight != null) {
            rightId = getIdColumnValueFromLineData(lineDataRight, idIndex);
            if (rightId.equals(lastId)) {
                lineDataRight = rightBuffer.readLine();
                continue;
            }
            writeLine(targetOutputStream,
                    joinLineData(null, lineDataRight, a.getFieldInfo(), b.getFieldInfo()),
                    lineNumCounter);
            lastId = rightId;
            lineDataRight = rightBuffer.readLine();
        }
        return lineNumCounter.get();
    }

    /**
     * Join two rows by field order (left first).
     *
     * The last field is the one being appended in this round.
     *
     * @param leftLineData  left row
     * @param rightLineData right row
     * @param leftFields    left field info (may not all be written into the row yet)
     * @param rightFields   right field info (may not all be written into the row yet)
     * @return the joined row
     */
    private String joinLineData(String leftLineData, String rightLineData,
                                List<FileFieldDesc> leftFields, List<FileFieldDesc> rightFields) {
        if (StringUtils.isBlank(leftLineData) && StringUtils.isBlank(rightLineData)) {
            return "";
        }
        int leftEmptyFieldIndex = getFieldEmptyPlaceholderIndex(leftFields);
        int rightEmptyFieldIndex = getFieldEmptyPlaceholderIndex(rightFields);
        // 1. right side only: move its first field to the head, the rest to the tail
        if (StringUtils.isBlank(leftLineData)) {
            return joinFieldByRight(rightLineData, leftFields, rightFields, rightEmptyFieldIndex);
        }
        // 2. left side only
        if (StringUtils.isBlank(rightLineData)) {
            return joinFieldByLeft(leftLineData, leftFields, rightFields, leftEmptyFieldIndex);
        }
        // 3. both sides present
        return joinFieldByLeftRight(leftLineData, rightLineData,
                leftFields, rightFields, leftEmptyFieldIndex, rightEmptyFieldIndex);
    }

    /**
     * Join a row that exists on the right side only.
     *
     * @param rightLineData   right row (may contain unwritten placeholders)
     * @param leftFields      left field list
     * @param rightFields     right field list
     * @param emptyFieldIndex index of the first unwritten field
     * @return the joined row with every field filled in
     */
    private String joinFieldByRight(String rightLineData, List<FileFieldDesc> leftFields,
                                    List<FileFieldDesc> rightFields, int emptyFieldIndex) {
        String[] rightCols = StringUtils.splitPreserveAllTokens(
                rightLineData, CSV_RESULT_FILE_SEPARATOR);
        if (emptyFieldIndex != -1 && rightCols.length != emptyFieldIndex + 1) {
            throw new RuntimeException("field position mismatch: " + rightCols.length
                    + ", first unwritten: " + (emptyFieldIndex + 1));
        }
        // s1. the id column
        StringBuilder lineResultBuilder = new StringBuilder(rightCols[0]);
        // s2. empty columns for the absent left side
        for (int i = 0; i < leftFields.size(); i++) {
            lineResultBuilder.append(CSV_RESULT_FILE_SEPARATOR);
        }
        // s3. the right side's written columns
        for (int i = 1; i < rightCols.length; i++) {
            lineResultBuilder.append(CSV_RESULT_FILE_SEPARATOR).append(rightCols[i]);
        }
        // s4. the right side's unwritten columns; the last one belongs to the
        //     field being appended in this round
        if (rightCols.length < rightFields.size() + 1) {
            if (emptyFieldIndex != -1) {
                for (int i = emptyFieldIndex; i < rightFields.size() - 1; i++) {
                    lineResultBuilder.append(CSV_RESULT_FILE_SEPARATOR);
                }
            }
            // the right side exists, so its flag is 1
            lineResultBuilder.append(CSV_RESULT_FILE_SEPARATOR).append("1");
        }
        return lineResultBuilder.toString();
    }

    /**
     * Join a row that exists on the left side only.
     *
     * @param leftLineData    left row (may contain unwritten placeholders)
     * @param leftFields      left field list
     * @param rightFields     right field list
     * @param emptyFieldIndex index of the first unwritten field
     * @return the joined row with every field filled in
     */
    private String joinFieldByLeft(String leftLineData, List<FileFieldDesc> leftFields,
                                   List<FileFieldDesc> rightFields, int emptyFieldIndex) {
        String[] cols = StringUtils.splitPreserveAllTokens(
                leftLineData, CSV_RESULT_FILE_SEPARATOR);
        if (emptyFieldIndex != -1 && cols.length != emptyFieldIndex + 1) {
            throw new RuntimeException("field position mismatch: " + cols.length
                    + ", first unwritten: " + (emptyFieldIndex + 1));
        }
        // s1. keep the left side's written values verbatim
        StringBuilder lineResultBuilder = new StringBuilder(leftLineData);
        // s2. pad the left side's unwritten columns; the last one gets this
        //     round's flag (compared against leftFields, the left row's own layout)
        if (cols.length < leftFields.size() + 1) {
            if (emptyFieldIndex != -1) {
                for (int i = emptyFieldIndex; i < leftFields.size() - 1; i++) {
                    lineResultBuilder.append(CSV_RESULT_FILE_SEPARATOR);
                }
            }
            // the left side exists, so its flag is 1
            lineResultBuilder.append(CSV_RESULT_FILE_SEPARATOR).append("1");
        }
        // s3. empty columns for the absent right side
        for (int i = 0; i < rightFields.size(); i++) {
            lineResultBuilder.append(CSV_RESULT_FILE_SEPARATOR);
        }
        return lineResultBuilder.toString();
    }

    /**
     * Join a row that exists on both sides.
     *
     * @param leftLineData         left row (may contain unwritten placeholders)
     * @param rightLineData        right row (may contain unwritten placeholders)
     * @param leftFields           left field list
     * @param rightFields          right field list
     * @param leftEmptyFieldIndex  index of the first unwritten left field
     * @param rightEmptyFieldIndex index of the first unwritten right field
     * @return the joined row with every field filled in
     */
    private String joinFieldByLeftRight(String leftLineData, String rightLineData,
                                        List<FileFieldDesc> leftFields, List<FileFieldDesc> rightFields,
                                        int leftEmptyFieldIndex, int rightEmptyFieldIndex) {
        String[] leftCols = StringUtils.splitPreserveAllTokens(
                leftLineData, CSV_RESULT_FILE_SEPARATOR);
        if (leftEmptyFieldIndex != -1 && leftCols.length != leftEmptyFieldIndex + 1) {
            throw new RuntimeException("field position mismatch: " + leftCols.length
                    + ", first unwritten: " + (leftEmptyFieldIndex + 1));
        }
        String[] rightCols = StringUtils.splitPreserveAllTokens(
                rightLineData, CSV_RESULT_FILE_SEPARATOR);
        if (rightEmptyFieldIndex != -1 && rightCols.length != rightEmptyFieldIndex + 1) {
            throw new RuntimeException("field position mismatch: " + rightCols.length
                    + ", first unwritten: " + (rightEmptyFieldIndex + 1));
        }
        // s1. keep the left side's written values verbatim
        StringBuilder lineResultBuilder = new StringBuilder(leftLineData);
        // s2. pad the left side's unwritten columns; the last one gets this round's flag
        if (leftCols.length < leftFields.size() + 1) {
            if (leftEmptyFieldIndex != -1) {
                for (int i = leftEmptyFieldIndex; i < leftFields.size() - 1; i++) {
                    lineResultBuilder.append(CSV_RESULT_FILE_SEPARATOR);
                }
            }
            // the left side exists, so its flag is 1
            lineResultBuilder.append(CSV_RESULT_FILE_SEPARATOR).append("1");
        }
        // s3. the right side's written values, skipping its id column
        for (int i = 1; i < rightCols.length; i++) {
            lineResultBuilder.append(CSV_RESULT_FILE_SEPARATOR).append(rightCols[i]);
        }
        if (rightCols.length < rightFields.size() + 1) {
            if (rightEmptyFieldIndex != -1) {
                for (int i = rightEmptyFieldIndex; i < rightFields.size() - 1; i++) {
                    lineResultBuilder.append(CSV_RESULT_FILE_SEPARATOR);
                }
            }
            // the right side exists, so its flag is 1
            lineResultBuilder.append(CSV_RESULT_FILE_SEPARATOR).append("1");
        }
        return lineResultBuilder.toString();
    }

    /**
     * Find the position of the first field that has not been written yet.
     *
     * @param fieldList all fields
     * @return index of the first unwritten field, or -1 if all are written
     */
    private int getFieldEmptyPlaceholderIndex(List<FileFieldDesc> fieldList) {
        for (int i = 0; i < fieldList.size(); i++) {
            FileFieldDesc f1 = fieldList.get(i);
            if (f1.getWriteFlag() == 0) {
                return i;
            }
        }
        return -1;
    }

    /**
     * Read the id column from a row.
     *
     * @param lineData the row content
     * @param idIndex  1-based index of the id column
     * @return the id value
     */
    private String getIdColumnValueFromLineData(String lineData, int idIndex) {
        if (lineData == null) {
            return null;
        }
        if (idIndex <= 0) {
            log.warn("bad id column index: {}, returning the whole line, please investigate", idIndex);
            return lineData;
        }
        // columns are always separated by ','
        String[] cols = StringUtils.splitPreserveAllTokens(lineData, CSV_RESULT_FILE_SEPARATOR);
        // index out of range: return empty
        if (idIndex > cols.length) {
            log.warn("id column index out of range, please investigate: {} -> {}", lineData, idIndex);
            return "";
        }
        return cols[idIndex - 1];
    }

    /**
     * Write one row to the output stream; the counter ensures no trailing
     * newline is produced.
     */
    private void writeLine(OutputStream outputStream, String lineData, AtomicLong counter)
            throws IOException {
        if (counter.get() > 0) {
            outputStream.write("\n".getBytes());
        }
        outputStream.write(lineData.getBytes());
        counter.incrementAndGet();
    }

    /**
     * Field position indicator.
     */
    private class FieldOrderIndicator {
        int fieldIndex;
        FileFieldDesc fieldDesc;

        FieldOrderIndicator(int fieldIndex, FileFieldDesc fieldDesc) {
            this.fieldIndex = fieldIndex;
            this.fieldDesc = fieldDesc;
        }
    }

    /**
     * Fork/join task that recursively splits the file-join work.
     */
    private static class FileJoinFJTask extends RecursiveTask<JoinFileDescriptor> {
        private static final FileJoiner joiner = new FileJoiner();
        private final List<JoinFileDescriptor> fileList;

        public FileJoinFJTask(List<JoinFileDescriptor> fileList) {
            this.fileList = fileList;
        }

        @Override
        public JoinFileDescriptor compute() {
            int len = fileList.size();
            if (len > 2) {
                int mid = len / 2;
                FileJoinFJTask subTask1 = new FileJoinFJTask(fileList.subList(0, mid));
                subTask1.fork();
                FileJoinFJTask subTask2 = new FileJoinFJTask(fileList.subList(mid, len));
                subTask2.fork();
                JoinFileDescriptor m1 = subTask1.join();
                JoinFileDescriptor m2 = subTask2.join();
                return joinTwoFile(m1, m2);
            }
            if (len == 2) {
                return joinTwoFile(fileList.get(0), fileList.get(1));
            }
            if (len == 1) {
                return fileList.get(0);
            }
            throw new RuntimeException("zero files to merge? -> " + fileList.size());
        }

        /**
         * Merge two sorted files.
         *
         * @param m1 file 1
         * @param m2 file 2
         * @return the merged file
         */
        private JoinFileDescriptor joinTwoFile(JoinFileDescriptor m1, JoinFileDescriptor m2) {
            try {
                return joiner.joinById(m1, m2);
            } catch (IOException e) {
                log.error("failed to merge files, {}, {}", m1, m2, e);
                throw new RuntimeException(e);
            }
        }
    }
}
That is the overall algorithmic framework. Callers can either invoke joinById serially and drive the merging themselves, or call joinMultiFile and let it parallelize internally; afterwards the column order can be fixed up as required. The parallel scheme used here is exactly the fork/join approach discussed in the previous article.
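For illustration, calling code might look roughly like this; the paths, line counts and field names are made up, and the fragment is assumed to run inside a method declaring throws Exception, with java.util.Arrays and java.util.Collections imported:

FileJoiner joiner = new FileJoiner();

// option 1: serial, pairwise merging under the caller's control
JoinFileDescriptor a = new JoinFileDescriptor("/data/crowd_a.csv", 4,
        Collections.singletonList(FileFieldDesc.newField("crowd_a")));
JoinFileDescriptor b = new JoinFileDescriptor("/data/crowd_b.csv", 5,
        Collections.singletonList(FileFieldDesc.newField("crowd_b")));
JoinFileDescriptor merged = joiner.joinById(a, b);

// option 2: parallel fork/join over many files, then fix the column order
// (use fresh descriptors per run, since joinById mutates the field flags)
JoinFileDescriptor all = joiner.joinMultiFile(
        Arrays.asList(a, b /* , more files */),
        Arrays.asList("crowd_a", "crowd_b"));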
2.2. A few helper classes
The computation above relies on a few helper data structures to express the process clearly. Here they are:
// 1. JoinFileDescriptor
import java.io.BufferedReader;
import java.util.List;

/**
 * Describes a file participating in a join.
 */
public class JoinFileDescriptor {

    /** file path */
    private String path;

    /** number of data lines in the file */
    private long lineCnt;

    /** field list, in the order the fields are written into the file */
    private List<FileFieldDesc> fieldInfo;

    /** merge depth; 0 when the file has not been merged yet */
    private int deep;

    public JoinFileDescriptor() {
    }

    public JoinFileDescriptor(String path, int lineCnt, List<FileFieldDesc> fieldInfo) {
        this.path = path;
        this.lineCnt = lineCnt;
        this.fieldInfo = fieldInfo;
    }

    private transient BufferedReader reader;

    public BufferedReader getReader() {
        return reader;
    }

    public void setReader(BufferedReader reader) {
        this.reader = reader;
    }

    public String getPath() {
        return path;
    }

    public void setPath(String path) {
        this.path = path;
    }

    public long getLineCnt() {
        return lineCnt;
    }

    public void setLineCnt(long lineCnt) {
        this.lineCnt = lineCnt;
    }

    public List<FileFieldDesc> getFieldInfo() {
        return fieldInfo;
    }

    public void setFieldInfo(List<FileFieldDesc> fieldInfo) {
        this.fieldInfo = fieldInfo;
    }

    public int getDeep() {
        return deep;
    }

    public void incrDeep() {
        this.deep++;
    }

    @Override
    public String toString() {
        return "JoinFileDescriptor{" +
                "path='" + path + '\'' +
                ", lineCnt=" + lineCnt +
                ", fieldInfo=" + fieldInfo +
                ", deep=" + deep +
                '}';
    }
}

// 2. FileFieldDesc
/**
 * Describes one field (column) of a file.
 */
public class FileFieldDesc {

    /** field name */
    private String fieldName;

    /**
     * Whether the field has actually been written into the file.
     * 1: written, 0: not written (fields declared earlier are written
     * together with a later merge).
     */
    private int writeFlag;

    private FileFieldDesc(String fieldName) {
        this.fieldName = fieldName;
    }

    public static FileFieldDesc newField(String fieldName) {
        return new FileFieldDesc(fieldName);
    }

    public static FileFieldDesc newField(String fieldName, int writeFlag) {
        FileFieldDesc f = new FileFieldDesc(fieldName);
        f.setWriteFlag(writeFlag);
        return f;
    }

    public String getFieldName() {
        return fieldName;
    }

    public void setFieldName(String fieldName) {
        this.fieldName = fieldName;
    }

    public int getWriteFlag() {
        return writeFlag;
    }

    public void setWriteFlag(int writeFlag) {
        this.writeFlag = writeFlag;
    }

    public void writeOk() {
        writeFlag = 1;
    }

    @Override
    public String toString() {
        return "FileFieldDesc{" +
                "fieldName='" + fieldName + '\'' +
                ", writeFlag=" + writeFlag +
                '}';
    }
}
Still fairly simple, right?
2.3. Unit tests
Work without tests is not finished work. A good test suite should cover every computation case and its expected result: how many files are merged, how many lines the output has, what specific rows should contain, and so on. Well, that is left for the reader to complete; a simple test follows.
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.apache.commons.io.FileUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Tests for the file-join utility.
 */
public class FileJoinerTest {

    @Before
    public void setup() {
        // avoid log4j resolution errors
        System.setProperty("catalina.home", "/tmp");
    }

    @Test
    public void testJoinById() throws Exception {
        long startTime = System.currentTimeMillis();
        List<String> resultLines;
        String classpath = this.getClass().getResource("/").getPath();
        JoinFileDescriptor file1 = new JoinFileDescriptor(
                classpath + "file/t0/crowd_a.csv", 4,
                Collections.singletonList(FileFieldDesc.newField("crowd_a")));
        JoinFileDescriptor file2 = new JoinFileDescriptor(
                classpath + "file/t0/crowd_b.csv", 5,
                Collections.singletonList(FileFieldDesc.newField("crowd_b")));
        FileJoiner joiner = new FileJoiner();
        JoinFileDescriptor fileMerged = joiner.joinById(file1, file2);
        resultLines = FileUtils.readLines(new File(fileMerged.getPath()), "utf-8");
        System.out.println("result:" + fileMerged);
        Assert.assertEquals("merged line count incorrect", 6L, fileMerged.getLineCnt());
        Assert.assertEquals("merged row content incorrect", "6001,1,1", resultLines.get(0));
        Assert.assertEquals("merged row content incorrect", "6011,,1", resultLines.get(5));

        JoinFileDescriptor file3 = new JoinFileDescriptor(
                classpath + "file/t0/crowd_c.csv", 5,
                Collections.singletonList(FileFieldDesc.newField("crowd_c")));
        fileMerged = joiner.joinById(fileMerged, file3);
        System.out.println("result3:" + fileMerged);

        JoinFileDescriptor file4 = new JoinFileDescriptor(
                classpath + "file/t0/crowd_d.csv", 4,
                Collections.singletonList(FileFieldDesc.newField("crowd_d")));
        fileMerged = joiner.joinById(fileMerged, file4);
        System.out.println("result4:" + fileMerged);

        JoinFileDescriptor file6 = new JoinFileDescriptor(
                classpath + "file/t0/crowd_f.csv", 4,
                Collections.singletonList(FileFieldDesc.newField("crowd_f")));
        fileMerged = joiner.joinById(fileMerged, file6);
        System.out.println("result5:" + fileMerged);

        JoinFileDescriptor file5 = new JoinFileDescriptor(
                classpath + "file/t0/crowd_e.csv", 4,
                Collections.singletonList(FileFieldDesc.newField("crowd_e")));
        fileMerged = joiner.joinById(fileMerged, file5);
        System.out.println("result6:" + fileMerged);

        fileMerged = joiner.rewriteFileBySelectField(fileMerged,
                Arrays.asList("crowd_a", "crowd_b", "crowd_c",
                        "crowd_d", "crowd_e", "crowd_f"));
        System.out.println("resultFinal:" + fileMerged);
        System.out.println("costTime:" + (System.currentTimeMillis() - startTime) + "ms");
    }

    @Test
    public void testJoinByIdUseForkJoin() throws Exception {
        long startTime = System.currentTimeMillis();
        List<JoinFileDescriptor> sortedFileList = new ArrayList<>();
        String classpath = this.getClass().getResource("/").getPath();
        JoinFileDescriptor file1 = new JoinFileDescriptor(
                classpath + "file/t0/crowd_a.csv", 4,
                Collections.singletonList(FileFieldDesc.newField("crowd_a")));
        sortedFileList.add(file1);
        JoinFileDescriptor file2 = new JoinFileDescriptor(
                classpath + "file/t0/crowd_b.csv", 5,
                Collections.singletonList(FileFieldDesc.newField("crowd_b")));
        sortedFileList.add(file2);
        JoinFileDescriptor file3 = new JoinFileDescriptor(
                classpath + "file/t0/crowd_c.csv", 5,
                Collections.singletonList(FileFieldDesc.newField("crowd_c")));
        sortedFileList.add(file3);
        JoinFileDescriptor file4 = new JoinFileDescriptor(
                classpath + "file/t0/crowd_d.csv", 4,
                Collections.singletonList(FileFieldDesc.newField("crowd_d")));
        sortedFileList.add(file4);
        JoinFileDescriptor file5 = new JoinFileDescriptor(
                classpath + "file/t0/crowd_e.csv", 10,
                Collections.singletonList(FileFieldDesc.newField("crowd_e")));
        sortedFileList.add(file5);
        JoinFileDescriptor file6 = new JoinFileDescriptor(
                classpath + "file/t0/crowd_f.csv", 10,
                Collections.singletonList(FileFieldDesc.newField("crowd_f")));
        sortedFileList.add(file6);
        // shuffle to prove input order does not affect the result
        Collections.shuffle(sortedFileList);
        FileJoiner joiner = new FileJoiner();
        JoinFileDescriptor fileMerged = joiner.joinMultiFile(sortedFileList,
                Arrays.asList("crowd_a", "crowd_b", "crowd_c",
                        "crowd_d", "crowd_e", "crowd_f"));
        System.out.println("fileMerged:" + fileMerged);
        System.out.println("costTime:" + (System.currentTimeMillis() - startTime) + "ms");
    }
}
The parallel test above has no assertions: partly laziness, and partly because asserting on such runs is genuinely hard, which echoes how hard it is to troubleshoot distributed systems. One detail worth noting: to verify the code's stability, the test randomly shuffles the file list, confirming that any input order produces the same final result. In a real application you could instead sort the files by line count, so that small files merge with small files and large with large, avoiding wasted reads over empty positions. That is the benefit of rolling your own: when a tweak comes to mind, you can apply it on the spot.
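The size-based pairing just mentioned is not implemented here, but a plausible way to approximate it is to sort the descriptors by line count before submitting them, so the fork/join splitter tends to pair files of similar size. A sketch (assumes java.util.Comparator is imported; field names are the ones from the test above):

// sort ascending by line count so neighboring entries, which the task
// splitter pairs up, are of similar size
sortedFileList.sort(Comparator.comparingLong(JoinFileDescriptor::getLineCnt));
JoinFileDescriptor result = joiner.joinMultiFile(sortedFileList,
        Arrays.asList("crowd_a", "crowd_b", "crowd_c", "crowd_d", "crowd_e", "crowd_f"));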
Here are a few sample files:
// crowd_a.csv
6001
6002
6003
6009

// crowd_b.csv
6001
6002
6003
6006
6011

// crowd_c.csv
6001
6003
6006
6009

... (e, f, g similar) ...
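For reference, merging crowd_a.csv with crowd_b.csv under the union semantics described above yields the six lines below, which matches the assertions in testJoinById:

6001,1,1
6002,1,1
6003,1,1
6006,,1
6009,1,
6011,,1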
The utility above can be regarded as a file-based equivalent of the SQL semantics shown earlier. It cannot be mentioned in the same breath as the official implementations, but it has its own application scenarios, waiting to be discovered. Offered for your reference. (Who knows, maybe MR would be simpler and more efficient for you.)