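Background
This article describes some interesting things I ran into while building the ETL synchronization for a data warehouse, and how I improved the program's runtime efficiency along the way.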
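Relational database:
Early in the project: the game's operational data was fairly light. A Java backend running aggregate queries against the relational database MySQL handled it comfortably. A scheduled job computed the relevant statistics every day, and the operations staff simply queried the results.
Middle and later stages: as more game servers opened and the player count kept growing, the database grew larger and larger, and queries from the operations backend became slower and slower. For an ordinary relational database such as MySQL, a common rule of thumb is that query performance degrades sharply once a single table holds more than about 5 million rows, and we rarely query just one table; multi-table joins are common too. Suppose there are 100 game servers, each with 20 tables, and each table holds 5 million rows. Then: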
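Total rows = 100 × 20 × 5 million = 10 billion. With the table schemas we had at the time, this came to roughly 100 GB of disk space.
Good heavens: no single machine we had could load 100 GB of data into memory at once.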
https://www.zhihu.com/question/19719997
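So, with this in mind, Hive was brought in to solve the problem!
Data warehouse
2. Project Architecture Design
First, a word on how the early architecture of the project was worked out. In terms of data flow, everything ultimately moves from MySQL ---> Hive, and I used JDBC to do it. Why not use one of the following tools instead?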
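- Sqoop: each game server has nearly 80 tables, there are many servers, and there will be more. Every server has exactly the same database and table schema; only the IP address differs. With Sqoop we would have to maintain an ever-growing number of scripts. In addition, we could not rely on Sqoop to handle raw data that itself contains the row and column delimiters defined for the Hive table.
- DataX: Alibaba's open-source data synchronization middleware; I did not study it in detail.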
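Because the JDBC route passes every value through our own code before it reaches the txt file, delimiter characters inside the source data can be neutralized there. A minimal sketch, assuming the Hive table uses the default text-format settings (`\u0001` between fields, newline between rows, `\N` for NULL); `HiveLineFormatter` is a hypothetical name, not a class from the project.

```java
import java.util.List;

/** Hypothetical helper: turns one JDBC row into one line of a Hive text-format file. */
final class HiveLineFormatter {
    static final char FIELD_DELIM = '\u0001'; // Hive's default field delimiter (Ctrl-A)
    static final String NULL_MARKER = "\\N";  // Hive's default NULL representation in text files

    private HiveLineFormatter() {}

    /** Replaces characters that would break the row/field structure with a space. */
    static String sanitize(String value) {
        StringBuilder sb = new StringBuilder(value.length());
        for (int i = 0; i < value.length(); i++) {
            char c = value.charAt(i);
            // Hadoop's default line reader treats both \n and \r as line ends
            if (c == FIELD_DELIM || c == '\n' || c == '\r') {
                sb.append(' ');
            } else {
                sb.append(c);
            }
        }
        return sb.toString();
    }

    /** Joins the column values of one row with the field delimiter. */
    static String toLine(List<?> values) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < values.size(); i++) {
            if (i > 0) {
                sb.append(FIELD_DELIM);
            }
            Object v = values.get(i);
            sb.append(v == null ? NULL_MARKER : sanitize(String.valueOf(v)));
        }
        return sb.toString();
    }
}
```

For example, `toLine(Arrays.asList(42, "hello\nworld", null))` yields `42`, `hello world` and `\N` separated by `\u0001`. Replacing rather than deleting the offending characters keeps adjacent words from running together.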
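1. Global buffer queue
A producer-consumer model with an in-memory buffer in between; the data lands in txt files.
The producer fetches the source data via JDBC and puts it into a fixed-size buffer queue. Meanwhile, the consumer keeps reading from the buffer, extracts each value according to its data type, and writes the records one by one to the corresponding txt file.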
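The first design can be sketched with a single bounded `ArrayBlockingQueue` between one producer thread and one consumer thread. This is a minimal, self-contained illustration, not the project's code: the JDBC reader is replaced by a loop that generates strings, the txt file by any `Writer`, and end-of-data is signalled with a hypothetical poison-pill sentinel.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical sketch of design 1: one global bounded queue between a producer and a consumer. */
final class GlobalQueuePipeline {
    // Sentinel meaning "no more data"; compared by identity, so it cannot collide with a real row
    private static final String POISON_PILL = new String("<EOF>");

    private GlobalQueuePipeline() {}

    /** Produces `rows` fake records, writes them one per line to `out`, returns the number written. */
    static int run(int rows, int capacity, Writer out) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(capacity); // fixed size bounds memory use
        int[] written = new int[1];
        IOException[] failure = new IOException[1];

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < rows; i++) {
                    queue.put("row-" + i); // stands in for a JDBC row; blocks while the queue is full
                }
                queue.put(POISON_PILL);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    String line = queue.take(); // blocks while the queue is empty
                    if (line == POISON_PILL) break;
                    if (failure[0] != null) continue; // after an I/O error keep draining so the producer never blocks
                    try {
                        out.write(line);
                        out.write('\n');
                        written[0]++;
                    } catch (IOException e) {
                        failure[0] = e;
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        try {
            producer.join(); // join() also makes the consumer's writes to written/failure visible here
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the pipeline", e);
        }
        if (failure[0] != null) throw new UncheckedIOException(failure[0]);
        return written[0];
    }
}
```

The bounded capacity is what keeps memory in check when the consumer falls behind: `put` simply blocks the producer until the consumer catches up.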
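Throughput: about 8,000 rows per second.
On the surface this looks great: a pipeline that processes each record as it arrives. In practice, however, consumption was far slower than production, so produced data piled up in the buffer queue; had the queue been unbounded, it would also have consumed a large amount of memory. To increase write throughput, I moved to the next design.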
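2. One buffer queue and one writer per table
Each table gets its own producer-consumer pair, and each consumer initializes its corresponding writer when it starts. The architecture is as follows:
The producer for table1 fetches source data via JDBC and puts it into table1's own fixed-size buffer queue, while table1's consumer keeps reading from that buffer, extracts the values according to their data types, and writes them record by record to the corresponding txt file.
Throughput: about 20,000 rows per second.
This lets producer threads run concurrently, and controlling the number of producer threads greatly improves throughput. The key code of the project follows:
1) Thread pools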
```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * Description: task thread pools
 */
public class DumpExecuteService {
    private static ExecutorService dumpServerWorkerService; // game-server tasks
    private static ExecutorService dumpTableWorkerService;  // table-data tasks
    private static ExecutorService dumpReaderWorkerService; // data-reading tasks
    private static ExecutorService dumpWriterWorkerService; // result-writing tasks

    /**
     * Initializes the task thread pools.
     *
     * @param concurrencyDBCount number of databases processed concurrently
     */
    public static synchronized void startup(int concurrencyDBCount) {
        if (dumpServerWorkerService != null)
            return;
        if (concurrencyDBCount > 2)
            concurrencyDBCount = 2; // at most two database tasks run concurrently
        if (concurrencyDBCount < 1)
            concurrencyDBCount = 1;
        dumpServerWorkerService = Executors.newFixedThreadPool(concurrencyDBCount, new NamedThreadFactory(
                "DumpExecuteService.dumpServerWorkerService" + System.currentTimeMillis()));
        dumpTableWorkerService = Executors.newFixedThreadPool(2,
                new NamedThreadFactory("DumpExecuteService.dumpTableWorkerService" + System.currentTimeMillis()));
        dumpWriterWorkerService = Executors.newFixedThreadPool(8,
                new NamedThreadFactory("DumpExecuteService.dumpWriterWorkerService" + System.currentTimeMillis()));
        dumpReaderWorkerService = Executors.newFixedThreadPool(2,
                new NamedThreadFactory("DumpExecuteService.dumpReaderWorkerService" + System.currentTimeMillis()));
    }

    public static Future<Integer> submitDumpServerWorker(DumpServerWorkerLogic worker) {
        return dumpServerWorkerService.submit(worker);
    }

    public static Future<Integer> submitDumpWriteWorker(DumpWriteWorkerLogic worker) {
        return dumpWriterWorkerService.submit(worker);
    }

    public static Future<Integer> submitDumpReadWorker(DumpReadWorkerLogic worker) {
        return dumpReaderWorkerService.submit(worker);
    }

    public static Future<Integer> submitDumpTableWorker(DumpTableWorkerLogic worker) {
        return dumpTableWorkerService.submit(worker);
    }

    /**
     * Shuts down the thread pools.
     */
    public static synchronized void shutdown() {
        // Shut down the thread pools...
    }
}
```
Note: this class defines four thread pools, each used to run a different kind of task.
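`NamedThreadFactory` is used above but not shown, and the body of `shutdown()` is elided. The following is a hypothetical sketch of both, not the project's code: a `ThreadFactory` that numbers threads under a prefix, and an orderly shutdown helper (`PoolShutdown`, also a hypothetical name) built on the standard `ExecutorService.shutdown()` / `awaitTermination()` / `shutdownNow()` sequence.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical ThreadFactory that names threads "<prefix>-<n>" for readable logs and thread dumps. */
final class NamedThreadFactory implements ThreadFactory {
    private final String prefix;
    private final AtomicInteger counter = new AtomicInteger(1);

    NamedThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r, prefix + "-" + counter.getAndIncrement());
        t.setDaemon(false); // non-daemon: the JVM stays alive until running dump tasks finish
        return t;
    }
}

/** Hypothetical helper for an orderly pool shutdown. */
final class PoolShutdown {
    private PoolShutdown() {}

    /**
     * Stops accepting new tasks and waits up to the timeout; if tasks are still running,
     * interrupts them and waits once more. Returns true if the pool has terminated.
     */
    static boolean shutdownGracefully(ExecutorService pool, long timeout, TimeUnit unit) {
        pool.shutdown();
        try {
            if (pool.awaitTermination(timeout, unit)) {
                return true;
            }
            pool.shutdownNow();
            return pool.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

With the four pools in `DumpExecuteService`, the order would matter: drain the server pool first, then the table pool, then the reader and writer pools, because each outer task still submits work to the inner pools, and a pool that has already been shut down rejects new tasks with `RejectedExecutionException`.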
2) Game-server task
```java
import java.sql.Connection;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 1) Obtains a connection to the game server's log database
 * 2) Processes its tables one at a time
 */
public class DumpServerWorkerLogic extends AbstractLogic implements Callable<Integer> {
    private static Logger logger = LoggerFactory.getLogger(DumpServerWorkerLogic.class);
    private final ServerPO server;            // database (game server)
    private final String startDate;           // start date
    private SourceType sourceType;            // type of data source
    private Map<String, Integer> resultDBMap; // per-table record counts
    private GameType gameType;

    public DumpServerWorkerLogic(ServerPO server, String startDate, SourceType sourceType,
            Map<String, Integer> resultDBMap, GameType gameType) {
        CheckUtil.checkNotNull("DumpServerWorkerLogic.server", server);
        CheckUtil.checkNotNull("DumpServerWorkerLogic.startDate", startDate);
        CheckUtil.checkNotNull("DumpServerWorkerLogic.sourceType", sourceType);
        CheckUtil.checkNotNull("DumpServerWorkerLogic.resultDBMap", resultDBMap);
        CheckUtil.checkNotNull("DumpServerWorkerLogic.gameType", gameType);
        this.server = server;
        this.startDate = startDate;
        this.sourceType = sourceType;
        this.resultDBMap = resultDBMap;
        this.gameType = gameType;
    }

    @Override
    public Integer call() {
        // Get a connection, then the list of tables to export from this database
        Connection conn = null;
        try {
            conn = JdbcUtils.getDbConnection(server);
        } catch (Exception e) {
            throw new GameRuntimeException(e.getMessage(), e);
        }
        List<String> tableNames = null;
        DumpDbInfoBO dumpDbInfoBO = DumpConfig.getDumpDbInfoBO();
        int totalRecordCount = 0;
        try {
            switch (this.sourceType) {
            case GAME_LOG:
                tableNames = JdbcUtils.getAllTableNames(conn);
                break;
            case INFOCENTER:
                // Copy the configured list so that adding "pay_action" does not modify shared config on every run
                tableNames = new ArrayList<String>(dumpDbInfoBO.getIncludeInfoTables());
                tableNames.add("pay_action");
                break;
            case EVENT_LOG:
                tableNames = new ArrayList<String>();
                Date date = DateTimeUtil.string2Date(startDate, "yyyy-MM-dd");
                String sdate = DateTimeUtil.date2String(date, "yyyyMMdd");
                String smonth = DateTimeUtil.date2String(date, "yyyyMM");
                tableNames.add("log_device_startup" + "_" + smonth); // monthly table
                tableNames.add("log_device" + "_" + sdate);          // daily table
                break;
            }
            // Iterate over the tables
            for (String tableName : tableNames) {
                // Skip excluded tables
                if (dumpDbInfoBO.getExcludeTables().contains(tableName))
                    continue;
                DumpTableWorkerLogic tableTask = new DumpTableWorkerLogic(conn, server, tableName, startDate,
                        resultDBMap, gameType, sourceType);
                Future<Integer> tableFuture = DumpExecuteService.submitDumpTableWorker(tableTask);
                // Wait for this table before starting the next: all tables share one connection
                int count = tableFuture.get();
                totalRecordCount += count;
                logger.info(String.format("DumpServerWorkerLogic %s-%s.%s done", startDate,
                        server.getLogDbName(), tableName));
            }
            return totalRecordCount;
        } catch (Exception e) {
            throw new GameRuntimeException(e, "DumpServerWorkerLogic fail. server={%s}, errorMsg={%s} ",
                    server.getId(), e.getMessage());
        } finally {
            JdbcUtils.closeConnection(conn);
        }
    }
}
```
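For `EVENT_LOG`, the table names are derived from the export date: `log_device_startup` is split by month and `log_device` by day. The same derivation can be sketched with `java.time`; `DateTimeUtil` in the original is a project helper, so this sketch substitutes the standard `LocalDate` / `DateTimeFormatter` API inside a hypothetical `EventLogTables` class.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper mirroring the EVENT_LOG branch of DumpServerWorkerLogic. */
final class EventLogTables {
    private static final DateTimeFormatter DAY = DateTimeFormatter.ofPattern("yyyyMMdd");
    private static final DateTimeFormatter MONTH = DateTimeFormatter.ofPattern("yyyyMM");

    private EventLogTables() {}

    /**
     * @param startDate export date in yyyy-MM-dd format, e.g. "2017-03-05"
     * @return the monthly and daily table names for that date (fixed-size list)
     */
    static List<String> tableNames(String startDate) {
        LocalDate date = LocalDate.parse(startDate); // ISO format yyyy-MM-dd
        return Arrays.asList(
                "log_device_startup_" + date.format(MONTH), // one table per month
                "log_device_" + date.format(DAY));          // one table per day
    }
}
```

`Arrays.asList` returns a fixed-size list, so a caller that wants to append further table names should copy it into an `ArrayList` first.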
3) Table task, one per table
```java
import java.sql.Connection;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Description: creates the write task for one table's query results (one per table)
 */
public class DumpTableWorkerLogic implements Callable<Integer> {
    private static Logger logger = LoggerFactory.getLogger(DumpTableWorkerLogic.class);
    private final String tableName;
    private final Connection conn;
    private ServerPO server;
    private String startDate;
    private Map<String, Integer> resultDBMap; // per-table record counts
    private GameType gameType;
    private SourceType sourceType;            // type of data source

    public DumpTableWorkerLogic(Connection conn, ServerPO server, String tableName, String startDate,
            Map<String, Integer> resultDBMap, GameType gameType, SourceType sourceType) {
        CheckUtil.checkNotNull("DumpTableWorkerLogic.conn", conn);
        CheckUtil.checkNotNull("DumpTableWorkerLogic.tableName", tableName);
        CheckUtil.checkNotNull("DumpTableWorkerLogic.server", server);
        CheckUtil.checkNotNull("DumpTableWorkerLogic.startDate", startDate);
        CheckUtil.checkNotNull("DumpTableWorkerLogic.resultDBMap", resultDBMap);
        CheckUtil.checkNotNull("DumpTableWorkerLogic.gameType", gameType);
        CheckUtil.checkNotNull("DumpTableWorkerLogic.sourceType", sourceType);
        this.conn = conn;
        this.tableName = tableName;
        this.server = server;
        this.startDate = startDate;
        this.resultDBMap = resultDBMap;
        this.gameType = gameType;
        this.sourceType = sourceType;
        logger.info("DumpTableWorkerLogic[{}] Reg", tableName);
    }

    @Override
    public Integer call() {
        logger.info("DumpTableWorkerLogic[{}] Start", tableName);
        // Write task: consumes this table's queue and writes the txt file
        DumpWriteWorkerLogic writerWorker = new DumpWriteWorkerLogic(server, tableName, startDate, resultDBMap,
                gameType, sourceType);
        Future<Integer> writeFuture = DumpExecuteService.submitDumpWriteWorker(writerWorker);
        logger.info("DumpTableWorkerLogic[{}] writer submitted", tableName);
        // Read task: queries MySQL page by page and feeds the writer's queue
        DumpReadWorkerLogic readerWorker = new DumpReadWorkerLogic(conn, tableName, writerWorker, startDate);
        Future<Integer> readFuture = DumpExecuteService.submitDumpReadWorker(readerWorker);
        logger.info("DumpTableWorkerLogic[{}] reader submitted", tableName);
        try {
            int writeCount = writeFuture.get();
            // Surface any exception thrown by the reader instead of silently dropping it
            readFuture.get();
            logger.info("DumpTableWorkerLogic[{}] ---{}---{}---{}---exported rows---{}", tableName, startDate,
                    server.getId(), tableName, writeCount);
            return writeCount;
        } catch (Exception e) {
            throw new GameRuntimeException(e, "DumpTableWorkerLogic fail. tableName={%s}, errorMsg={%s} ",
                    tableName, e.getMessage());
        }
    }
}
```
4) Single-table read task
```java
import java.sql.Connection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * MySQL data-reading task
 */
public class DumpReadWorkerLogic implements Callable<Integer> {
    private static Logger logger = LoggerFactory.getLogger(DumpReadWorkerLogic.class);
    private String tableName;
    private final Connection conn;
    private DumpWriteWorkerLogic writerWorker; // task that writes the result data
    private String startDate;                  // export start date
    private static final int LIMIT = 50000;    // maximum rows fetched per SQL query

    public DumpReadWorkerLogic(Connection conn, String tableName, DumpWriteWorkerLogic writerWorker,
            String startDate) {
        CheckUtil.checkNotNull("DumpReadWorkerLogic.conn", conn);
        CheckUtil.checkNotNull("DumpReadWorkerLogic.tableName", tableName);
        CheckUtil.checkNotNull("DumpReadWorkerLogic.startDate", startDate);
        this.conn = conn;
        this.tableName = tableName;
        this.writerWorker = writerWorker;
        this.startDate = startDate;
        logger.info("DumpReadWorkerLogic Reg. tableName={}", this.tableName);
    }

    @Override
    public Integer call() {
        try {
            // Column metadata, used to build the SELECT statement
            List<Map<String, Object>> result = JdbcUtils.queryForList(conn, "show full fields from " + tableName);
            int index = 0;
            String querySql = "";
            int totalCount = 0;
            while (true) {
                int offset = index * LIMIT;
                querySql = DumpLogic.getTableQuerySql(result, tableName, true, startDate)
                        + " limit " + offset + "," + LIMIT;
                // Runs one page query; rows go to writerWorker, the return value is this page's row count
                int row = DumpLogic.query(conn, querySql, writerWorker);
                totalCount += row;
                logger.info("tableName={}, offset={}, index={}, row={}, limit={}", tableName, offset, index, row, LIMIT);
                if (row < LIMIT) // a short page means the table is exhausted
                    break;
                index++;
            }
            logger.info("{}---{}---Read.End", startDate, tableName);
            return totalCount;
        } catch (Exception e) {
            throw new GameRuntimeException(e, "DumpReadWorkerLogic fail. tableName={%s}, errorMsg={%s} ",
                    tableName, e.getMessage());
        } finally {
            // Always signal end-of-data, even if a query failed; otherwise the writer would wait forever
            writerWorker.prepareClose();
        }
    }
}
```
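With `limit offset, 50000`, MySQL still has to read and discard the first `offset` rows for every page, so later pages of a large table get progressively slower. A common alternative is keyset (seek) pagination: remember the last key seen and ask for the rows after it. The sketch below assumes the table has a unique, indexed, increasing integer column `id`; `KeysetPager` and `PageFetcher` are hypothetical names, and in real use `fetchIdsAfter` would run `pageSql` through a `PreparedStatement`.

```java
import java.util.List;
import java.util.function.LongConsumer;

/** Hypothetical sketch of keyset pagination as an alternative to LIMIT offset,count. */
final class KeysetPager {
    /** Returns up to `limit` ids greater than `afterId`, in ascending order. */
    interface PageFetcher {
        List<Long> fetchIdsAfter(long afterId, int limit);
    }

    private KeysetPager() {}

    /** SQL for one page; both placeholders are bound as parameters (last id seen, page size). */
    static String pageSql(String table, String columns) {
        return "SELECT " + columns + " FROM " + table + " WHERE id > ? ORDER BY id LIMIT ?";
    }

    /** Walks the whole table page by page, handing each id to `sink`; returns the row count. */
    static long readAll(PageFetcher fetcher, int limit, LongConsumer sink) {
        long total = 0;
        long lastId = Long.MIN_VALUE; // start below any possible id
        while (true) {
            List<Long> page = fetcher.fetchIdsAfter(lastId, limit);
            for (long id : page) {
                sink.accept(id);
            }
            total += page.size();
            if (page.size() < limit) {
                return total; // short (or empty) page: no more rows
            }
            lastId = page.get(page.size() - 1); // seek past the last id seen
        }
    }
}
```

Each page then becomes an index range scan on `id`, so its cost does not grow with how far into the table the export has progressed. This only works where such a key exists; tables without one would keep the LIMIT approach.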
5) Single-table write task
```java
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Description: MySQL data export task (writes one table's rows to its txt file)
 */
public class DumpWriteWorkerLogic implements Callable<Integer> {
    private static final Logger logger = LoggerFactory.getLogger(DumpWriteWorkerLogic.class);
    private String tableName;                 // table name
    private AtomicBoolean alive;              // true while the reader may still add data
    private BufferedWriter writer;
    private ArrayBlockingQueue<String> queue; // message queue
    private ServerPO server;                  // server
    private String startDate;                 // start date
    private Map<String, Integer> resultDBMap; // row count per server/table for the day; shared by all writers
    private GameType gameType;
    private SourceType sourceType;            // type of data source

    public DumpWriteWorkerLogic(ServerPO server, String tableName, String startDate,
            Map<String, Integer> resultDBMap, GameType gameType, SourceType sourceType) {
        CheckUtil.checkNotNull("DumpWriteWorkerLogic.tableName", tableName);
        CheckUtil.checkNotNull("DumpWriteWorkerLogic.server", server);
        CheckUtil.checkNotNull("DumpWriteWorkerLogic.startDate", startDate);
        CheckUtil.checkNotNull("DumpWriteWorkerLogic.resultDBMap", resultDBMap);
        CheckUtil.checkNotNull("DumpWriteWorkerLogic.gameType", gameType);
        CheckUtil.checkNotNull("DumpWriteWorkerLogic.sourceType", sourceType);
        this.tableName = tableName;
        this.server = server;
        this.startDate = startDate;
        this.queue = new ArrayBlockingQueue<>(65536);
        this.alive = new AtomicBoolean(true);
        this.gameType = gameType;
        this.sourceType = sourceType;
        this.writer = createWriter();
        this.resultDBMap = resultDBMap;
        logger.info("DumpWriteWorkerLogic Reg. tableName={}", this.tableName);
    }

    /**
     * Creates the writer; the file is created if it does not exist.
     *
     * @return a UTF-8 writer with a 5M-char buffer that appends to this table's dump file
     */
    private BufferedWriter createWriter() {
        try {
            File toFile = FileUtils.getFilenameOfDumpTable(sourceType, tableName, startDate, gameType, ".txt");
            if (!toFile.exists()) {
                FileUtils.createFile(sourceType, tableName, startDate, gameType);
            }
            return new BufferedWriter(new OutputStreamWriter(new FileOutputStream(toFile, true),
                    StandardCharsets.UTF_8), 5 * 1024 * 1024);
        } catch (Exception e) {
            throw new GameRuntimeException(e, "DumpWriteWorkerLogic createWriter fail. server={%s}, errorMsg={%s} ",
                    server.getId(), e.getMessage());
        }
    }

    /**
     * Writes one record to the file.
     *
     * @param line one record
     */
    private void writeToFile(String line) {
        try {
            this.writer.write(line + "\n");
        } catch (Exception e) {
            throw new GameRuntimeException(e, "DumpWriteWorkerLogic writeToFile fail. errorMsg={%s} ", e.getMessage());
        }
    }

    /**
     * Puts a record into the message queue; if the queue is full, blocks until the put succeeds.
     *
     * @param line one record
     */
    public void putToWriterQueue(String line) {
        CheckUtil.checkNotNull("DumpWriteWorkerLogic putToWriterQueue", line);
        try {
            queue.put(line);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new GameRuntimeException(e, "DumpWriteWorkerLogic putToWriterQueue fail. errorMsg={%s} ",
                    e.getMessage());
        }
    }

    /**
     * Prepares to close: signals that all data has been produced; the task finishes
     * once it has written whatever is still in the queue.
     */
    public void prepareClose() {
        alive.set(false);
    }

    @Override
    public Integer call() {
        logger.info("DumpWriteWorkerLogic Start. tableName={}", this.tableName);
        try {
            int totalCount = 0;
            while (alive.get() || !queue.isEmpty()) {
                // Wait briefly for data instead of busy-spinning on an empty queue
                String first = queue.poll(100, TimeUnit.MILLISECONDS);
                if (first == null)
                    continue;
                List<String> dataList = new ArrayList<String>();
                dataList.add(first);
                queue.drainTo(dataList); // take everything else already queued
                totalCount += processDataList(dataList);
            }
            logger.info("DumpWriteWorkerLogic ---{}---{}---Writer.End", startDate, tableName);
            return totalCount;
        } catch (Exception exp) {
            throw new GameRuntimeException(exp, "DumpWriteWorkerLogic call() fail. errorMsg={%s} ", exp.getMessage());
        } finally {
            FileUtil.close(this.writer);
        }
    }

    /**
     * Processes data: writes it to the local file and records the count in the map.
     *
     * @param dataList records to write
     * @return number of records written
     */
    private int processDataList(List<String> dataList) {
        int totalCount = 0;
        String key = server.getId() + "#" + tableName + "#" + sourceType.getIndex();
        if (dataList != null && dataList.size() > 0) {
            for (String line : dataList) {
                writeToFile(line); // write line by line
                totalCount++;
            }
            // Record in result_data_record_count: one update per batch
            // (atomic only if resultDBMap is a ConcurrentHashMap, since 8 writer threads share it)
            resultDBMap.merge(key, totalCount, Integer::sum);
        }
        return totalCount;
    }
}
```
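Memory optimization
1. When fetching through JDBC, reading a large table is very slow;
2. The process also takes up a very large amount of memory that GC cannot reclaim. The cause is that, by default, the MySQL JDBC driver loads the entire result set into memory at once, and those rows stay referenced until the result set is closed; if the table being synchronized is very large, this can even exhaust the heap.
The fix is to have JDBC fetch the data page by page rather than all at once, with at most 50,000 rows per page (the LIMIT); see the read task (DumpReadWorkerLogic).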
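Paging with LIMIT is the approach used here. Another option, specific to MySQL Connector/J, is a streaming result set: with a forward-only, read-only statement and a fetch size of `Integer.MIN_VALUE`, the driver hands rows over one at a time instead of buffering the whole result. A minimal sketch with hypothetical names (`StreamingQuery`, `RowHandler`); note the driver's documented caveat that no other query can be issued on the same connection until the streaming result set has been fully read or closed.

```java
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

/** Hypothetical sketch: stream a large MySQL table row by row with Connector/J. */
final class StreamingQuery {
    /** Receives one row at a time; the ResultSet is positioned on the current row. */
    interface RowHandler {
        void onRow(ResultSet rs) throws SQLException;
    }

    private StreamingQuery() {}

    /** Runs `sql` as a streaming query and returns the number of rows handled. */
    static long stream(Connection conn, String sql, RowHandler handler) throws SQLException {
        long rows = 0;
        // Forward-only + read-only + fetch size Integer.MIN_VALUE is Connector/J's streaming signal
        try (Statement stmt = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
            stmt.setFetchSize(Integer.MIN_VALUE);
            try (ResultSet rs = stmt.executeQuery(sql)) {
                while (rs.next()) {
                    handler.onRow(rs); // e.g. format the row and put it on the writer's queue
                    rows++;
                }
            }
        }
        return rows;
    }
}
```

Because `DumpServerWorkerLogic` finishes one table before starting the next on its shared connection, that caveat would likely fit the existing design, though this variant was not part of the project.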
With this architectural optimization, exporting 50 million rows takes about 40 minutes.
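As a quick consistency check, 50 million rows in 40 minutes corresponds to an average of roughly 21,000 rows per second, in line with the roughly 20,000 rows per second measured for the one-queue-per-table design:

$$\frac{5\times10^{7}\ \text{rows}}{40\times60\ \text{s}} = \frac{5\times10^{7}}{2400}\ \text{rows/s} \approx 2.1\times10^{4}\ \text{rows/s}$$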
Note:
Since this article only records the design process of the project, the complete code will be open-sourced later.