一、由於具有多張寬表且字段較多,每個寬表數據大概為4000萬條,根據業務邏輯拼接別名,並每張寬表的固定字段進行left join 拼接SQL。這樣就能根據每個寬表的主列,根據每個寬表的不同字段關聯出一張新的集合。由於下來要進行分頁查詢,如果要使用SparkSQL進行分頁查詢,需要增加序號列,那么就在剛才的Sql之前增加一句 create table tableName as SELECT ROW_NUMBER() OVER() as id,* from (拼接的SQL) 就可創建一張帶自增序列的,業務需要字段的幾張寬表的關聯集合,方便下來分頁。
for(int i=0;i<ColumnNames.size();i++){ SiCustomerLabelInfoModel Column = ColumnNames.get(i); List<CiMdaSysTable> ciMdaSysTable = ciCustomerJDao.getMdaSysTableName(Column.getColumnName()); String alias = "t_" + ciMdaSysTable.get(0).getTableId(); String aliasColumn = alias + "." + Column.getColumnName(); String aliasTable = ciMdaSysTable.get(0).getTableName() +" "+ alias; if(mainTable == null){ mainTable = aliasTable; } if(ciMdaSysTable.get(0).getUpdateCycle() == 1){ mainTable = aliasTable; } ColumnNameList.add(aliasColumn); tableNameList.add(aliasTable); } String[] keyAlias = mainTable.split(" "); String mainKeyColumn = keyAlias[1] + "." + keyColumn; selectResult.append("select ").append(mainKeyColumn); if(StringUtil.isNotEmpty(mainTable)){ fromTableName.append(" from ").append(mainTable); } Iterator<String> table = tableNameList.iterator(); while(table.hasNext()){ String tableName = table.next(); String[] tableAlias = tableName.split(" "); String[] mainAlias = mainTable.split(" "); String alias = tableAlias[1]; String mAlias = mainAlias[1]; if(!mainTable.equals(tableName)){ fromTableName.append(" left join ").append(tableName).append(" on ").append(mAlias).append(".").append(keyColumn) .append(" = ").append(alias).append(".").append(keyColumn).append(" "); } }
fromTableName.append(" ) a");
Iterator<String> column = ColumnNameList.iterator();
while(column.hasNext()){
String columnName = column.next();
selectResult.append(",").append(columnName);
}
selectResult.append(fromTableName);
Createtable.append("create table ").append(cocDwName).append(" as SELECT ROW_NUMBER() OVER() as id,* from").append(" (").append(selectResult);
二、由於業務場景,需要將4000萬條數據最終寫入10個文件,這里通過聲明線程池pool,使用多線程的方法執行,有些人會擔心那不會數據錯亂嗎,不會。因為后面要用分頁sql,根據循環傳入的 i 的值進行處理。
private ExecutorService pools = Executors.newFixedThreadPool(15); if(result = true){ String queryCount = "select count(*) from "+cocDwName; int count = ciCustomerJDao.getDwTotolCount(queryCount); log.info(""+keyColumn); try { for(int i=0;i<10;i++){ CreateDwFileThread jd = new CreateDwFileThread(jndiName,keyColumn,num,cocDwName,count,sysId,i); Future fu = pools.submit(jd); fus.add(fu); } long start = System.currentTimeMillis(); while (true) { boolean done = true; for (Future f : fus) { if (!f.isDone()) { done = false; break; } } if (!done) { try { Thread.sleep(1000 * 10); } catch (InterruptedException e) { log.error("sleep error", e); e.printStackTrace(); } continue; } else { break; } } log.debug("wait tasks finish cost:" + (System.currentTimeMillis() - start)); }catch(Exception e){ result = false; log.error("error", e); } }
三、根據第一步創建的表中的自增序列ID進行分頁,由於要多線程並發執行,所以不能使用傳統分頁的begin與end,根據步驟二中傳入的 i (這里參數為partNumber)進行處理,根據循環,每條線程執行的開始數據必定以上條數據結束的條數為開始,每次將查詢出來的結果集通過list2File寫入文件。這里還有個while循環,因為分成10份還是有400萬條數據啊,還是覺得大,於是就又分成了10次~就是說每次查詢出40萬條寫入文件,直到新加入400萬條flag返回true退出循環。
while(flag == false){ pager.setPageSize(bufferedRowSize); pager.setPageNum(pageNumber); int begin = (pager.getPageNum() - 1) * pager.getPageSize()+createFileCount*partNumber; int end = begin + pager.getPageSize(); if(end >= createFileCount*(partNumber+1)){ end = createFileCount*(partNumber+1); } StringBuffer sql = new StringBuffer() ; sql.append(" select ").append(columns).append(" from ").append(cocDwName).append(" where id > ").append(begin).append(" and ").append(" id < ").append(end+1); JdbcBaseDao jdbcBaseDao = (JdbcBaseDao) SystemServiceLocator.getInstance().getService("jdbcBaseDao"); String BackjndiName = PropertiesUtils.getProperties("JNDI_CI_BACK"); final String file = fileLocalPath + File.separator + dwName+ "_" + String.valueOf(partNumber)+ ".csv"; Log.info("---------sql;:"+ sql + "-------fileName:"+file);
List<Map<String, Object>> dataList = jdbcBaseDao.getBackSimpleJdbcTemplate().queryForList(sql.toString()); if (dataList.size() > 0) { list2File(dataList, title, columns, file, encode, null, null); pageNumber++; } if(end == createFileCount * partNumber + createFileCount){ flag = true; }
有人會問你為啥不用ResultSet 直接放入400萬條數據 為啥還要分開每40萬條數據再分頁寫~ 我想說 我就是想這么干~ 啊哈哈。。。不過程序中貌似是有問題的 沒有考慮到的情景,所以還在推敲。。(Resultset 查出來400萬條不還是放在內存中,還是有可能內存溢出的,分頁寫大不了通過thriftserver多連接幾次spark嘛~ 不過代碼寫的很爛,還在提高哈~)