項目背景:
在這次影像系統中,我們利用大數據平台做的是文件(圖片、視頻等)批次的增刪改查,每個批次都包含多個文件,上傳完成以后要添加文件索引(文件信息及批次信息),由於在Hbase存儲的過程中,每個文件都對應一個文件rowKey,一個批次就會有很多個RoweKey,查詢的下載的時候就必須根據每個文件的rowkey找到對應的文件,如果一個批次有很多個文件的話,就需要查找很多次,這樣是很浪費時間的,一開始沒注意這么多,開發並且完成功能測試后,覺得一切OK,但是作為大數據后台,對效率的要求非常高,在壓力測試的時候出現了問題,並發量上來之后,查詢下載的速度非常慢,TPS總上不去,仔細分析代碼后,發現了問題。
改進之前的部分代碼如下:
public List<FileInfo> batchGetFileMeta (String systemType, String batchNo, String fileName,String versionNo,BufferedOutputStream bw) { List<FileInfo> fileInfoList = new ArrayList<FileInfo>(); FileStoreInfo fileStoreInfo = batchGetFileStoreInfo(systemType,batchNo, versionNo,bw); if(fileStoreInfo == null){ return null; } List<String> fileNameList=batchGetFileNameByBathNO(systemType,batchNo, fileName, versionNo,bw); if(fileNameList == null || fileNameList.size()==0 ){ return null; } if(fileNameList.size()==1 && ("".equals(fileNameList.get(0)))){ fileInfoList.add(null); return fileInfoList; } int hash = batchNo.hashCode(); String rowKey = ""; String fileNName = fileStoreInfo.getFile_N_Name(); String[] fileNNameArray = fileNName.split(Constants.SPLIT); for(int i=0;i<fileNNameArray.length;i++){ for(int j=0;j<fileNameList.size();j++){ String[] fileNInfo = fileNNameArray[i].split(Constants.SPLITF); if(fileNInfo[0].equals(fileNameList.get(j))){ String version = fileNInfo[1]; String versionNow = version; if(versionNow != null && !versionNow.equals("")){ int length2 = versionNow.length(); for (int k=0 ;k<3-length2 ;k++) { versionNow = "0"+versionNow; } } rowKey = hash + "1" +batchNo + versionNow + fileNInfo[0]; FileInfo fileInfo = batchGetFileMetaByIndex(systemType, rowKey, bw); if(fileInfo == null){ return null; } fileInfo.setFileVersionNO(version); fileInfoList.add(fileInfo); } } } return fileInfoList; } public FileInfo batchGetFileMetaByIndex(String systemType, String rowKey, BufferedOutputStream bw) { Map<String,String> fileInfoMaps = new HashMap<String,String>(); fileInfoMaps = HbaseUtil.queryBykey(Constants.HBASE_TAB+systemType, rowKey, Constants.HBASE_FAMILYY_CF1, Constants.HBASE_COLUMN_L); if(fileInfoMaps == null ){ return null; } String fileInfoStr = fileInfoMaps.get("value"); FileInfo fileInfo = new FileInfo(); fileInfo = (FileInfo) Utils.jsonToObj(fileInfoStr,fileInfo); if(fileInfo == null){ return null; } String userdefinede = getUserDefinedE(systemType, rowKey); fileInfo.setUserDefined(userdefinede); return fileInfo; } public Map<String,String > queryBykey(String tableName, String rowKey,String fam, String col) { Map<String, String> result = new HashMap<String, String>(); HTable table=null; try { if(isExistTable(tableName)){ table = new HTable(conf, tableName); Get scan = new Get(rowKey.getBytes()); Result r = table.get(scan); byte[] bs = r.getValue(Bytes.toBytes(fam), Bytes.toBytes(col)); String value = Bytes.toString(bs); result.put("value", value); table.close(); return result; }else{ return null; } } catch (IOException e) { e.printStackTrace(); return null; }
測試結果如下:
雖然時間比較少,但是遠遠不能滿足效率要求。仔細分析上面代碼不難發現:由於業務需要查詢數據的時候要校驗文件信息,所以代碼中出現了循環套循環的情況,如果某批次的文件數量特別多的話那么循環查詢的次數的增長不是一個數量級的,相當大的一個數字,問題的原因在於拼接rowkey,然后拿着rowkey去查詢,循環多少次就查多少次,雖然Hbase查詢速度快,但這樣也是在浪費時間,經過思考和研究HbaseAPI的時候發現,Hbase支持rowkey批量查詢,思路大概是這樣的:
1) 循環文件信息,循環之中得到拼接rowkey的信息
2) 把得到的rowkey放入list中
3) 循環完畢,用List去查Hbase,將得到的信息放入Map返回
4) 獲取Map中的信息
下面是改進之后的代碼:
改進后:
public List<FileInfo> batchGetFileMetaByBathNo(String systemType, String batchNo, String fileName,String versionNo,BufferedOutputStream bw) { List<FileInfo> fileInfoList = new ArrayList<FileInfo>(); FileStoreInfo fileStoreInfo =batchGetFileStoreInfo(systemType, batchNo, versionNo,bw); if(fileStoreInfo == null){ return null; } List<String>fileNameList=batchGetFileNameByBathNO(systemType, batchNo, fileName, versionNo,bw); if(fileNameList == null || fileNameList.size()==0 ){ return null; } if(fileNameList.size()==1 && ("".equals(fileNameList.get(0)))){ fileInfoList.add(null); return fileInfoList; } String rowKey = ""; List<String> rowkeylist=new ArrayList<>(); String fileNName = fileStoreInfo.getFile_N_Name(); String[] fileNNameArray = fileNName.split(Constants.SPLIT); for(int i=0;i<fileNNameArray.length;i++){ for(int j=0;j<fileNameList.size();j++){ String[] fileNInfo = fileNNameArray[i].split(Constants.SPLITF); if(fileNInfo[0].equals(fileNameList.get(j))){ String version = fileNInfo[1]; String versionNow = version; if(versionNow != null && !versionNow.equals("")){ versionNow=PublicMethod.chengeVerNo(versionNow); } rowKey = batchNo.hashCode()+"1"+batchNo+ versionNow + fileNInfo[0]; rowkeylist.add(rowKey); } } } fileInfoList = batchGetFileMetaByIndex(systemType, rowkeylist, bw); return fileInfoList; } public List<FileInfo> batchGetFileMetaByIndex(String systemType, List<String> rowKey, BufferedOutputStream bw) { List<FileInfo> fileInfoList=new ArrayList<>(); List<Map<String,String>>list=HbaseUtil.queryByList(Constants.HBASE_TAB+systemType, rowKey); if(list.size()==0){ return null; } for(Map<String,String> resultMap:list){ FileInfo fileInfo = new FileInfo(); String LValue = resultMap.get(Constants.HBASE_COLUMN_L); String EValue=resultMap.get(Constants.HBASE_COLUMN_E); if(!"".equals(LValue)&&null!=LValue){ fileInfo = (FileInfo) Utils.jsonToObj(LValue,fileInfo); } if(!"".equals(EValue)&&null!=EValue&&fileInfo!=null){ fileInfo.setUserDefined(EValue); } fileInfoList.add(fileInfo); } return fileInfoList; } public List<Map<String, String>> queryByList(String tableName,List<String> rowKeyList){ Connection connection=null; List<Map<String, String>> list=new ArrayList<>(); List<Get> getList=new ArrayList<Get>(); try { connection=ConnectionFactory.createConnection(conf); Table table=connection.getTable(TableName.valueOf(tableName)); for(String rowKey:rowKeyList){ Get get=new Get(Bytes.toBytes(rowKey)); get.addFamily(Bytes.toBytes(Constants.HBASE_FAMILYY_CF1)); getList.add(get); } Result[]results=table.get(getList); for (Result result:results) { Map<String, String> listMap=new HashMap<>(); for(Cell kv:result.rawCells()){ if(Bytes.toString(kv.getQualifier()).equals(Constants.HBASE_COLUMN_E)){ listMap.put(Constants.HBASE_COLUMN_E, Bytes.toString(CellUtil.cloneValue(kv))); }else{ listMap.put(Constants.HBASE_COLUMN_L, Bytes.toString(CellUtil.cloneValue(kv))); } } list.add(listMap); } } catch (IOException e) { e.fillInStackTrace(); }finally{ try { if(connection!=null&&!connection.isClosed()){ connection.close(); } } catch (IOException e) { e.fillInStackTrace(); } } return list; }
Hbase是支持批量查詢的,經過改進之后,從代碼中就可以看出,效率提升了很多,我們對10000條數據進行了測試,發現提升的效率非常明顯,下面是測試圖:
進過優化后,從時間上,我們可以看到,提升的效率非常明顯,這就告訴我們在做項目寫代碼的時候,不要只局限於功能的實現,還要考慮效率上的可行性,從一開始就要做好鋪墊,否則到后期再改是非常麻煩的。