簡介
很久之前就想寫這篇文章了,主要是介紹一下我做數據同步的過程中遇到的一些有意思的內容,和提升效率的過程。
當前在數據處理的過程中,數據同步如同血液一般充滿全過程,如圖:

數據同步開源產品對比:
DataX,是淘寶的開源項目,可惜不支持Postgresql
Sqoop,Apache開源項目,同步過程中字段需要嚴格一致,不方便擴展,不易於二次開發
整體設計思路:
使用生產者消費者模型,中間使用內存,數據不落地,直接插入目標數據

優化過程:
1、插入數據部分:
首先生產者通過Jdbc獲取源數據內容,放入固定大小的緩存隊列,同時消費者不斷的從緩存讀取數據,根據不同的數據類型分別讀取出來,並逐條插入目標數據庫。
速度每秒300條,每分鍾1.8W條。
這樣做表面上看起來非常美好,流水式的處理,來一條處理一下,可是發現插入的速度遠遠趕不上讀取的速度,所以為了提升寫入的速度,決定采用批量處理的方法,事例代碼:
@Override public Boolean call() { long beginTime = System.currentTimeMillis(); this.isRunning.set(true); try { cyclicBarrier.await(); int lineNum = 0; int commitCount = 0; // 緩存數量 List<RowData> tmpRowDataList = new ArrayList<RowData>();// 緩存數組 while (this.isGetDataRunning.get() || this.queue.size() > 0) { // 從隊列獲取一條數據 RowData rowData = this.queue.poll(1, TimeUnit.SECONDS); if (rowData == null) { logger.info("this.isGetDataRunning:" + this.isGetDataRunning + ";this.queue.size():" + this.queue.size()); Thread.sleep(10000); continue; } // 添加到緩存數組 tmpRowDataList.add(rowData); lineNum++; commitCount++; if (commitCount == SyncConstant.INSERT_SIZE) { this.insertContractAch(tmpRowDataList); // 批量寫入 tmpRowDataList.clear(); // 清空緩存 commitCount = 0; } if (lineNum % SyncConstant.LOGGER_SIZE == 0) { logger.info(" commit line: " + lineNum + "; queue size: " + queue.size()); } } this.insertContractAch(tmpRowDataList); // 批量寫入 tmpRowDataList.clear();// 清空緩存 logger.info(" commit line end: " + lineNum); } catch (Exception e) { logger.error(" submit data error" , e); } finally { this.isRunning.set(false); } logger.info(String.format("SubmitDataToDatabase used %s second times", (System.currentTimeMillis() - beginTime) / 1000.00)); return true; } /** * 批量插入數據 * * @param rowDatas * @return */ public int insertContractAch(List<RowData> rowDatas) { final List<RowData> tmpObjects = rowDatas; String sql = SqlService.createInsertPreparedSql(tableMetaData); // 獲取sql try { int[] index = this.jdbcTemplate.batchUpdate(sql, new PreparedStatementSetter(tmpObjects, this.columnMetaDataList)); return index.length; } catch (Exception e) { logger.error(" insertContractAch error: " , e); } return 0; } /** * 處理批量插入的回調類 */ private class PreparedStatementSetter implements BatchPreparedStatementSetter { private List<RowData> rowDatas; private List<ColumnMetaData> columnMetaDataList; /** * 通過構造函數把要插入的數據傳遞進來處理 */ public PreparedStatementSetter(List<RowData> rowDatas, List<ColumnMetaData> columnList) { this.rowDatas = rowDatas; this.columnMetaDataList = columnList; } @Override public void setValues(PreparedStatement ps, int i) throws SQLException { RowData rowData = this.rowDatas.get(i); for (int j = 0; j < rowData.getColumnObjects().length; j++) { // 類型轉換 try { ColumnAdapterService.setParameterValue(ps, j + 1, rowData.getColumnObjects()[j], this.columnMetaDataList.get(j).getType()); } catch (Exception e) { ps.setObject(j + 1, null); } } } }
咱們不是需要講解代碼,所以這里截取了代碼片段,全部的代碼github上有,感興趣的同學可以看看。PreparedStatement的好處,可以參考文章:http://www.cnblogs.com/liqiu/p/3825544.html
由於增加批量插入的功能,終於速度提升到每秒1000條
2、多線程優化
每秒1000條,速度依然不理想,特別是寫的速度跟不上讀取的速度,隊列是滿的,如圖:

所以只能提升消費者的數量,采用了多消費者的模式:

速度提升到每秒3000條。
3、升級讀取方式
這時候觀察,隨着消費者的增加,觀察緩存隊列經常有空的情況,也就是說生產跟不上消費者速度,如果增加生產者的線程,那么也會增加程序的復雜性,因為勢必要將讀取的數據進行分割。所以采用Pgdump的方式直接獲取數據(並不是所有情況都適用,比如數據中有特殊的分隔符與設定的分隔符一樣,或者有分號,單引號之類的)
代碼片段如下:
/** * 將數據放入緩存隊列 */ public void putCopyData() { DataSourceMetaData dataSource = dataSourceService.getDataSource(syncOptions.getSrcDataSourceName()); String copyCommand = this.getCopyCommand(dataSource, querySql); //獲取copy命令 ShellExecuter.execute(copyCommand, queue,columnMetaDatas); } /** * 執行copy的shell命令 * @param dataSource * @param sql * @return */ public String getCopyCommand(DataSourceMetaData dataSource, String sql){ String host = dataSource.getIp(); String user = dataSource.getUserName(); String dataBaseName = dataSource.getDatabaseName(); //String psqlPath = "/Library/PostgreSQL/9.3/bin/psql"; String psqlPath = "/opt/pg93/bin/psql"; String execCopy = psqlPath + " -h " + host + " -U " + user + " " + dataBaseName +" -c \"COPY (" + sql + ") TO STDOUT WITH DELIMITER E'"+ HiveDivideConstant.COPY_COLUMN_DIVIDE+"' CSV NULL AS E'NULL'\" "; // 執行copy命令 LOGGER.info(execCopy); return execCopy; }
意思就是通過執行一個Shell程序,獲取數據,然后讀取進程的輸出流,不斷寫入緩存。這樣生產者的問題基本都解決了,速度完全取決於消費者寫入數據庫的速度了。下面是執行Shell的Java方法代碼:
public static int execute(String shellPath, LinkedBlockingQueue<RowData> queue, List<ColumnMetaData> columnMetaDatas) { int success = -1; Process pid = null; String[] cmd; try { cmd = new String[]{"/bin/sh", "-c", shellPath}; // 執行Shell命令 pid = Runtime.getRuntime().exec(cmd); if (pid != null) { BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(pid.getInputStream()), SyncConstant.SHELL_STREAM_BUFFER_SIZE); try { String line; while ((line = bufferedReader.readLine()) != null) { // LOGGER.info(String.format("shell info output [%s]", line)); String[] columnObjects = line.split(HiveDivideConstant.COPY_COLUMN_DIVIDE.toString(), -1); if (columnObjects.length != columnMetaDatas.size()) { LOGGER.error(" 待同步的表有特殊字符,不能使用copy [{}] ", line); throw new RuntimeException("待同步的表有特殊字符,不能使用copy " + line); } RowData rowData = new RowData(line.split(HiveDivideConstant.COPY_COLUMN_DIVIDE.toString(), -1)); queue.put(rowData); } } catch (Exception ioe) { LOGGER.error(" execute shell error", ioe); } finally { try { if (bufferedReader != null) { bufferedReader.close(); } } catch (Exception e) { LOGGER.error("execute shell, get system.out error", e); } } success = pid.waitFor(); if (success != 0) { LOGGER.error("execute shell error "); } } else { LOGGER.error("there is not pid "); } } catch (Exception ioe) { LOGGER.error("execute shell error", ioe); } finally { if (null != pid) { try { //關閉錯誤輸出流 pid.getErrorStream().close(); } catch (IOException e) { LOGGER.error("close error stream of process fail. ", e); } finally { try { //關閉標准輸入流 pid.getInputStream().close(); } catch (IOException e) { LOGGER.error("close input stream of process fail.", e); } finally { try { pid.getOutputStream().close(); } catch (IOException e) { LOGGER.error(String.format("close output stream of process fail.", e)); } } } } } return success; }
4、內存優化
在上線一段時間之后,發現兩個問題:1、使用Jdbc方式獲取數據,如果這個數據表比較大,那么獲取第一條數據的速度特別慢;2、這個進程還會占用非常大的內存,並且GC不掉。分析原因,是Postgresql的Jdbc獲取數據的時候,會一次將所有數據放入到內存,如果同步的數據表非常大,那么甚至會將內存撐爆。

那么優化的方法是設置使Jdbc不是一次全部將數據拿到內存,而是批次獲取,代碼如下:
con.setAutoCommit(false); //並不是所有數據庫都適用,比如hive就不支持,orcle不需要 stmt.setFetchSize(10000); //每次獲取1萬條記錄
整體設計方案:

現在這個項目已經開源,代碼放在:https://github.com/autumn-star/synchronous
