If you repost this article, please credit the original link: http://www.cnblogs.com/xczyd/p/5577124.html
When using HBase, customers often complain that writes are too slow, that concurrency won't scale, and so on. In the past, whenever I ran into this, I would go straight to the HBase cluster's load metrics and hunt for performance bottlenecks.
Then a veteran colleague said: hold on, first look at how the user's client code actually accesses the HBase cluster.
So I spent some time reading that code.
What I found was sobering. Client code, whether written by customers or by our own implementation teams, is often riddled with elementary mistakes, for example:
(1) overusing synchronized;
(2) creating connections and never releasing them;
(3) calling an API many times when a single call would do; if that API happens to be an expensive one, you can imagine the resulting performance;
(4) assorted other mischief...
This post therefore starts from HBase's Java API and, through source-code analysis and a simple experiment, works out the most suitable way to call the Java API, mainly for high-concurrency scenarios.
If you are not familiar with HBase's Java API, the official documentation is a good place to start.
Now, on to the main content:
To interact with an HBase cluster through the Java API, you first create an HTable instance, then use its methods to insert, delete, and query.
To create an HTable you first need a Configuration instance, conf, that carries the HBase cluster information; it is typically created like this:
Configuration conf = HBaseConfiguration.create();
// Set the IP and port of the HBase cluster (the ZK quorum)
conf.set("hbase.zookeeper.quorum", "XX.XXX.X.XX");
conf.set("hbase.zookeeper.property.clientPort", "2181");
With conf in hand, HTable instances can be created through either of the following two constructors:
Method one: create the HTable instance directly from conf.
The corresponding constructor is:
public HTable(Configuration conf, final TableName tableName) throws IOException {
  this.tableName = tableName;
  this.cleanupPoolOnClose = this.cleanupConnectionOnClose = true;
  if (conf == null) {
    this.connection = null;
    return;
  }
  this.connection = HConnectionManager.getConnection(conf);
  this.configuration = conf;
  this.pool = getDefaultExecutor(conf);
  this.finishSetup();
}
Note the call to HConnectionManager.getConnection. In this constructor, getConnection takes conf as its input and returns an HConnection instance, connection. If you are familiar with ODBC or JDBC, you know that Java database APIs maintain a similar connection (or connection pool) between the client and the database; for HBase, HConnection plays that role. One difference from a database such as Oracle, though, is that an HConnection does not actually connect to the HBase cluster itself, but to the ZooKeeper (ZK) ensemble that maintains the cluster's key metadata. ZK is not covered here; if you are not familiar with it, simply think of it as a standalone metadata-management service. Returning to getConnection, its implementation is as follows:
public static HConnection getConnection(final Configuration conf) throws IOException {
  HConnectionKey connectionKey = new HConnectionKey(conf);
  synchronized (CONNECTION_INSTANCES) {
    HConnectionImplementation connection = CONNECTION_INSTANCES.get(connectionKey);
    if (connection == null) {
      connection = (HConnectionImplementation) createConnection(conf, true);
      CONNECTION_INSTANCES.put(connectionKey, connection);
    } else if (connection.isClosed()) {
      HConnectionManager.deleteConnection(connectionKey, true);
      connection = (HConnectionImplementation) createConnection(conf, true);
      CONNECTION_INSTANCES.put(connectionKey, connection);
    }
    connection.incCount();
    return connection;
  }
}
Here CONNECTION_INSTANCES is a LinkedHashMap<HConnectionKey, HConnectionImplementation>, where HConnectionImplementation is the concrete implementation of HConnection. Three lines are key. First, an HConnectionKey instance, connectionKey, is built from conf. Second, CONNECTION_INSTANCES is searched for an HConnection instance matching that connectionKey. Third, if none exists (or the cached one is already closed), createConnection is called to create a new HConnection; otherwise the HConnection just found in the map is returned directly.
At the risk of belaboring the point, let's also look at HConnectionKey's constructor and its overridden hashCode, shown below:
HConnectionKey(Configuration conf) {
  Map<String, String> m = new HashMap<String, String>();
  if (conf != null) {
    for (String property : CONNECTION_PROPERTIES) {
      String value = conf.get(property);
      if (value != null) {
        m.put(property, value);
      }
    }
  }
  this.properties = Collections.unmodifiableMap(m);
  try {
    UserProvider provider = UserProvider.instantiate(conf);
    User currentUser = provider.getCurrent();
    if (currentUser != null) {
      username = currentUser.getName();
    }
  } catch (IOException ioe) {
    HConnectionManager.LOG.warn("Error obtaining current user, skipping username in HConnectionKey", ioe);
  }
}

public int hashCode() {
  final int prime = 31;
  int result = 1;
  if (username != null) {
    result = username.hashCode();
  }
  for (String property : CONNECTION_PROPERTIES) {
    String value = properties.get(property);
    if (value != null) {
      result = prime * result + value.hashCode();
    }
  }
  return result;
}
As the code shows, the overridden hashCode is derived from the hashCode of username, combined with the values of the connection-related properties (CONNECTION_PROPERTIES) extracted from conf. The username comes from currentUser, currentUser from provider, and provider from conf. So two Configuration objects with identical contents yield the same username and the same property values, and therefore HConnectionKeys that hash to the same value. Since CONNECTION_INSTANCES is a LinkedHashMap, its get method uses HConnectionKey's hashCode (together with equals) to decide whether a matching entry already exists. In essence, then, getConnection returns a connection object keyed on the contents of conf: for every conf with identical contents, only one connection is ever created.
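To make the caching concrete, here is a minimal sketch, written against the same 0.9x-era client API quoted above (the ZK address is a placeholder, as elsewhere in this post). Both lookups resolve to the same HConnectionKey, so they should return the same cached instance:

Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "XX.XXX.X.XX");
conf.set("hbase.zookeeper.property.clientPort", "2181");

HConnection c1 = HConnectionManager.getConnection(conf);
HConnection c2 = HConnectionManager.getConnection(conf);
// Expected to print true: identical conf contents -> same HConnectionKey -> same cached connection.
// Note that each getConnection call increments the connection's reference count.
System.out.println(c1 == c2);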
Method two: call createConnection to explicitly create an HConnection instance, then pass it in as an argument when constructing the HTable.
The createConnection method and the corresponding HTable constructor are as follows:
public static HConnection createConnection(Configuration conf) throws IOException {
  UserProvider provider = UserProvider.instantiate(conf);
  return createConnection(conf, false, null, provider.getCurrent());
}

static HConnection createConnection(final Configuration conf, final boolean managed,
    final ExecutorService pool, final User user) throws IOException {
  String className = conf.get("hbase.client.connection.impl",
      HConnectionManager.HConnectionImplementation.class.getName());
  Class<?> clazz = null;
  try {
    clazz = Class.forName(className);
  } catch (ClassNotFoundException e) {
    throw new IOException(e);
  }
  try {
    // Default HCM#HCI is not accessible; make it so before invoking.
    Constructor<?> constructor = clazz.getDeclaredConstructor(Configuration.class,
        boolean.class, ExecutorService.class, User.class);
    constructor.setAccessible(true);
    return (HConnection) constructor.newInstance(conf, managed, pool, user);
  } catch (Exception e) {
    throw new IOException(e);
  }
}
public HTable(TableName tableName, HConnection connection) throws IOException {
  this.tableName = tableName;
  this.cleanupPoolOnClose = true;
  this.cleanupConnectionOnClose = false;
  this.connection = connection;
  this.configuration = connection.getConfiguration();
  this.pool = getDefaultExecutor(this.configuration);
  this.finishSetup();
}
As you can see, this way of constructing an HTable creates a brand-new HConnection instance via reflection each time, instead of sharing a single HConnection as method one does.
Note that an HConnection created this way must be released with an explicit close() call once it is no longer needed; otherwise it is easy to end up with problems such as exhausted ports.
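As a minimal sketch of method two with proper cleanup (same 0.9x-era API as the listings above; "test0" and family "col1" are hypothetical names):

HConnection conn = HConnectionManager.createConnection(conf);
try {
    HTable table = new HTable(TableName.valueOf("test0"), conn);
    try {
        Put put = new Put(Bytes.toBytes("row1"));
        put.add(Bytes.toBytes("col1"), Bytes.toBytes("q"), Bytes.toBytes("v"));
        table.put(put);
    } finally {
        table.close(); // cleanupConnectionOnClose is false here, so this does NOT close conn
    }
} finally {
    conn.close(); // must be released explicitly, or sockets/ZK sessions leak
}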
那么,上述兩種方法,在執行插入/刪除/查找的時候,性能如何呢?不妨先從代碼角度分析一下。為了簡便,先分析HTable在執行put(插入)操作時具體做的事情。
HTable的put函數如下:
public void put(final Put put) throws InterruptedIOException, RetriesExhaustedWithDetailsException {
  doPut(put);
  if (autoFlush) {
    flushCommits();
  }
}

private void doPut(Put put) throws InterruptedIOException, RetriesExhaustedWithDetailsException {
  if (ap.hasError()) {
    writeAsyncBuffer.add(put);
    backgroundFlushCommits(true);
  }
  validatePut(put);
  currentWriteBufferSize += put.heapSize();
  writeAsyncBuffer.add(put);
  while (currentWriteBufferSize > writeBufferSize) {
    backgroundFlushCommits(false);
  }
}

private void backgroundFlushCommits(boolean synchronous) throws InterruptedIOException,
    RetriesExhaustedWithDetailsException {
  try {
    do {
      ap.submit(writeAsyncBuffer, true);
    } while (synchronous && !writeAsyncBuffer.isEmpty());
    if (synchronous) {
      ap.waitUntilDone();
    }
    if (ap.hasError()) {
      LOG.debug(tableName + ": One or more of the operations have failed -"
          + " waiting for all operation in progress to finish (successfully or not)");
      while (!writeAsyncBuffer.isEmpty()) {
        ap.submit(writeAsyncBuffer, true);
      }
      ap.waitUntilDone();
      if (!clearBufferOnFail) {
        // if clearBufferOnFailed is not set, we're supposed to keep the failed operation in the
        // write buffer. This is a questionable feature kept here for backward compatibility
        writeAsyncBuffer.addAll(ap.getFailedOperations());
      }
      RetriesExhaustedWithDetailsException e = ap.getErrors();
      ap.clearErrors();
      throw e;
    }
  } finally {
    currentWriteBufferSize = 0;
    for (Row mut : writeAsyncBuffer) {
      if (mut instanceof Mutation) {
        currentWriteBufferSize += ((Mutation) mut).heapSize();
      }
    }
  }
}
The call chain is put -> doPut -> backgroundFlushCommits -> ap.submit, where ap is an instance of AsyncProcess. Following that into AsyncProcess, the relevant code is:
public void submit(List<? extends Row> rows, boolean atLeastOne) throws InterruptedIOException {
  submitLowPriority(rows, atLeastOne, false);
}

public void submitLowPriority(List<? extends Row> rows, boolean atLeastOne,
    boolean isLowPripority) throws InterruptedIOException {
  if (rows.isEmpty()) {
    return;
  }
  // This looks like we are keying by region but HRegionLocation has a comparator that compares
  // on the server portion only (hostname + port) so this Map collects regions by server.
  Map<HRegionLocation, MultiAction<Row>> actionsByServer =
      new HashMap<HRegionLocation, MultiAction<Row>>();
  List<Action<Row>> retainedActions = new ArrayList<Action<Row>>(rows.size());
  long currentTaskCnt = tasksDone.get();
  boolean alreadyLooped = false;
  NonceGenerator ng = this.hConnection.getNonceGenerator();
  do {
    if (alreadyLooped) {
      // if, for whatever reason, we looped, we want to be sure that something has changed.
      waitForNextTaskDone(currentTaskCnt);
      currentTaskCnt = tasksDone.get();
    } else {
      alreadyLooped = true;
    }
    // Wait until there is at least one slot for a new task.
    waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1);
    // Remember the previous decisions about regions or region servers we put in the
    // final multi.
    Map<Long, Boolean> regionIncluded = new HashMap<Long, Boolean>();
    Map<ServerName, Boolean> serverIncluded = new HashMap<ServerName, Boolean>();
    int posInList = -1;
    Iterator<? extends Row> it = rows.iterator();
    while (it.hasNext()) {
      Row r = it.next();
      HRegionLocation loc = findDestLocation(r, posInList);
      if (loc == null) {
        // loc is null if there is an error such as meta not available.
        it.remove();
      } else if (canTakeOperation(loc, regionIncluded, serverIncluded)) {
        Action<Row> action = new Action<Row>(r, ++posInList);
        setNonce(ng, r, action);
        retainedActions.add(action);
        addAction(loc, action, actionsByServer, ng);
        it.remove();
      }
    }
  } while (retainedActions.isEmpty() && atLeastOne && !hasError());
  HConnectionManager.ServerErrorTracker errorsByServer = createServerErrorTracker();
  sendMultiAction(retainedActions, actionsByServer, 1, errorsByServer, isLowPripority);
}

private HRegionLocation findDestLocation(Row row, int posInList) {
  if (row == null) throw new IllegalArgumentException("#" + id + ", row cannot be null");
  HRegionLocation loc = null;
  IOException locationException = null;
  try {
    loc = hConnection.locateRegion(this.tableName, row.getRow());
    if (loc == null) {
      locationException = new IOException("#" + id + ", no location found, aborting submit for"
          + " tableName=" + tableName + " rowkey=" + Arrays.toString(row.getRow()));
    }
  } catch (IOException e) {
    locationException = e;
  }
  if (locationException != null) {
    // There are multiple retries in locateRegion already. No need to add new.
    // We can't continue with this row, hence it's the last retry.
    manageError(posInList, row, false, locationException, null);
    return null;
  }
  return loc;
}
The key mechanism here is asynchronous submission: a put does not necessarily write to HBase immediately. Instead, puts accumulate in a client-side write buffer, and an actual write is issued once the buffered data grows past a threshold (2MB by default, controlled by hbase.client.write.buffer). Note that this buffering only applies when autoFlush is disabled; with the default autoFlush=true, the put method shown above calls flushCommits after every doPut. Either way, those writes presumably still queue up for execution on the server side, a mechanism we won't expand on here. What we can establish is that in the insert/query/delete APIs, the HConnection only serves to locate the right RegionServer; once located, the operation itself is carried out by the client through RPC calls, independently of how many connections the client has created. Moreover, locateRegion only performs RPC on a cache miss; otherwise it fetches the RegionServer info straight from its cache. See the locateRegion source for details, which we also won't expand on here.
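Since the buffering depends on the autoFlush flag, here is a minimal sketch of the relevant client-side knobs (values are illustrative; "test0" and family "col1" are hypothetical names):

HTable table = new HTable(conf, TableName.valueOf("test0"));
table.setAutoFlush(false);                  // stop flushing on every put; buffer locally instead
table.setWriteBufferSize(2 * 1024 * 1024);  // 2MB, the default mentioned above (hbase.client.write.buffer)
for (int i = 0; i < 10000; i++) {
    Put put = new Put(Bytes.toBytes("row" + i));
    put.add(Bytes.toBytes("col1"), Bytes.toBytes("q"), Bytes.toBytes("v" + i));
    table.put(put); // usually just appends to writeAsyncBuffer; flushes once the buffer passes 2MB
}
table.flushCommits(); // push whatever is still sitting in the buffer
table.close();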
That wraps up the code analysis. It shows that creating large numbers of HConnections via createConnection does nothing to improve write performance; on the contrary, by wasting resources it should end up slower than getConnection. How much slower, though, cannot be judged from the code alone.
A simple experiment can put this conclusion to the test:
Server environment: an HBase cluster of four Linux servers, 64GB RAM each, average ping around 5ms. (To be rigorous, CPU core counts and clock speeds, disk rotation speeds, and so on should also be given.)
Client environment: an Ubuntu VM running on a Mac, with 10GB RAM allocated. Its CPU, network, and disk I/O are all considerably slower than a physical machine's, but that does not affect the conclusions.
The experiment code is as follows:
public class HbaseConectionTest {

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "XX.XXX.X.XX");
        conf.set("hbase.zookeeper.property.clientPort", "2181");

        ThreadInfo info = new ThreadInfo();
        info.setTableNamePrefix("test");
        info.setColNames("col1,col2");
        info.setTableCount(1);
        info.setConnStrategy("CREATEWITHCONF"); // CREATEWITHCONF, CREATEWITHCONN
        info.setWriteStrategy("SEPERATE");      // OVERLAP, SEPERATE
        info.setLifeCycle(60000L);

        int threadCount = 100;
        for (int i = 0; i < threadCount; i++) {
            //createTable(tableNamePrefix+i,colNames,conf);
        }
        for (int i = 0; i < threadCount; i++) {
            new Thread(new WriteThread(conf, info, i)).start();
        }
        //HBaseAdmin admin = new HBaseAdmin(conf);
        //System.out.println(admin.tableExists("test"));
    }

    public static void createTable(String tableName, String[] colNames, Configuration conf) {
        System.out.println("start create table " + tableName);
        try {
            HBaseAdmin hBaseAdmin = new HBaseAdmin(conf);
            if (hBaseAdmin.tableExists(tableName)) {
                System.out.println(tableName + " is exist");
                //hBaseAdmin.disableTable(tableName);
                //hBaseAdmin.deleteTable(tableName);
                return;
            }
            HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);
            for (int i = 0; i < colNames.length; i++) {
                tableDescriptor.addFamily(new HColumnDescriptor(colNames[i]));
            }
            hBaseAdmin.createTable(tableDescriptor);
        } catch (Exception ex) {
            ex.printStackTrace();
        }
        System.out.println("end create table " + tableName);
    }
}

// Configuration describing what each worker thread should do
class ThreadInfo {

    private int tableCount;
    String tableNamePrefix;
    String[] colNames;
    // CREATEBYCONF or CREATEBYCONN
    String connStrategy;
    // overlap or seperate
    String writeStrategy;
    long lifeCycle;

    public ThreadInfo() {
    }

    public int getTableCount() { return tableCount; }
    public void setTableCount(int tableCount) { this.tableCount = tableCount; }

    public String getTableNamePrefix() { return tableNamePrefix; }
    public void setTableNamePrefix(String tableNamePrefix) { this.tableNamePrefix = tableNamePrefix; }

    public String[] getColNames() { return colNames; }
    public void setColNames(String[] colNames) { this.colNames = colNames; }
    public void setColNames(String colNames) {
        if (colNames == null) {
            this.colNames = null;
        } else {
            this.colNames = colNames.split(",");
        }
    }

    public String getWriteStrategy() { return writeStrategy; }
    public void setWriteStrategy(String writeStrategy) { this.writeStrategy = writeStrategy; }

    public String getConnStrategy() { return connStrategy; }
    public void setConnStrategy(String connStrategy) { this.connStrategy = connStrategy; }

    public long getLifeCycle() { return lifeCycle; }
    public void setLifeCycle(long lifeCycle) { this.lifeCycle = lifeCycle; }
}

class WriteThread implements Runnable {

    private Configuration conf;
    private ThreadInfo info;
    private int index;

    public WriteThread(Configuration conf, ThreadInfo info, int index) {
        this.conf = conf;
        this.info = info;
        this.index = index;
    }

    @Override
    public void run() {
        String threadName = Thread.currentThread().getName();
        int operationCount = 0;
        HTable[] htables = null;
        HConnection conn = null;
        int tableCount = info.getTableCount();
        String tableNamePrefix = info.getTableNamePrefix();
        String[] colNames = info.getColNames();
        String connStrategy = info.getConnStrategy();
        String writeStrategy = info.getWriteStrategy();
        long lifeCycle = info.getLifeCycle();
        System.out.println(threadName + ": started with index " + index);
        try {
            if (connStrategy.equals("CREATEWITHCONN")) {
                conn = HConnectionManager.createConnection(conf);
                if (writeStrategy.equals("SEPERATE")) {
                    htables = new HTable[1];
                    htables[0] = new HTable(TableName.valueOf(tableNamePrefix + (index % tableCount)), conn);
                } else if (writeStrategy.equals("OVERLAP")) {
                    htables = new HTable[tableCount];
                    for (int i = 0; i < tableCount; i++) {
                        htables[i] = new HTable(TableName.valueOf(tableNamePrefix + i), conn);
                    }
                } else {
                    return;
                }
            } else if (connStrategy.equals("CREATEWITHCONF")) {
                conn = null;
                if (writeStrategy.equals("SEPERATE")) {
                    htables = new HTable[1];
                    htables[0] = new HTable(conf, TableName.valueOf(tableNamePrefix + (index % tableCount)));
                } else if (writeStrategy.equals("OVERLAP")) {
                    htables = new HTable[tableCount];
                    for (int i = 0; i < tableCount; i++) {
                        htables[i] = new HTable(conf, TableName.valueOf(tableNamePrefix + i));
                    }
                } else {
                    return;
                }
            } else {
                return;
            }

            long start = System.currentTimeMillis();
            long end = System.currentTimeMillis();

            // Fetch the column families of each table once, up front:
            // getTableDescriptor() is expensive, so keep it out of the write loop
            Map<HTable, HColumnDescriptor[]> table_columnFamilies =
                    new HashMap<HTable, HColumnDescriptor[]>();
            for (int i = 0; i < htables.length; i++) {
                table_columnFamilies.put(htables[i], htables[i].getTableDescriptor().getColumnFamilies());
            }

            while (end - start <= lifeCycle) {
                HTable table = htables.length == 1 ? htables[0]
                        : htables[(int) (Math.random() * htables.length)];
                long s1 = System.currentTimeMillis();
                double r = Math.random();
                HColumnDescriptor[] columnFamilies = table_columnFamilies.get(table);
                Put put = generatePut(threadName, columnFamilies, colNames, operationCount);
                table.put(put);
                if (r > 0.999) {
                    System.out.println(System.currentTimeMillis() - s1);
                }
                operationCount++;
                end = System.currentTimeMillis();
            }
            if (conn != null) {
                conn.close();
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        }
        System.out.println(threadName + ": ended with operation count:" + operationCount);
    }

    private Put generatePut(String threadName, HColumnDescriptor[] columnFamilies,
            String[] colNames, int operationCount) {
        Put put = new Put(Bytes.toBytes(threadName + "_" + operationCount));
        for (int i = 0; i < columnFamilies.length; i++) {
            String familyName = columnFamilies[i].getNameAsString();
            //System.out.println("familyName:"+familyName);
            for (int j = 0; j < colNames.length; j++) {
                if (familyName.equals(colNames[j])) {
                    String columnName = familyName + (int) (Math.floor(Math.random() * 5 + 10 * j));
                    String val = "" + columnName.hashCode() % 100;
                    put.add(Bytes.toBytes(familyName), Bytes.toBytes(columnName), Bytes.toBytes(val));
                }
            }
        }
        //System.out.println(put.toString());
        return put;
    }
}
In short: first create some HBase tables with two column families each, then spin up a number of threads that each write data for one minute, using either the getConnection strategy (CREATEWITHCONF) or the createConnection strategy (CREATEWITHCONN). How many tables to write, for how long, what to write, and how are all adjustable; for instance, the code above supports either pinning each thread to one fixed table or picking a table at random. Note the part of the code that fetches the column-family information of the target tables up front: getTableDescriptor incurs network overhead, so call it as rarely as possible to avoid unnecessary extra cost (in practice this extra cost is substantial).
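Isolated from the experiment code, the idea looks like this (a sketch; "test0" and family "col1" are hypothetical names): fetch the descriptor once and reuse it, rather than asking for it on every put:

HTable table = new HTable(conf, TableName.valueOf("test0"));
// One descriptor fetch up front...
HColumnDescriptor[] families = table.getTableDescriptor().getColumnFamilies();
for (int i = 0; i < 100000; i++) {
    Put put = new Put(Bytes.toBytes("row" + i));
    // ...instead of calling table.getTableDescriptor() again on every iteration
    for (HColumnDescriptor family : families) {
        put.add(family.getName(), Bytes.toBytes("q"), Bytes.toBytes("v" + i));
    }
    table.put(put);
}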
The detailed results are shown in the table below; exact values vary somewhat due to network fluctuation and similar factors. Overall: at higher concurrency (30 threads or more), getConnection is clearly faster than createConnection; at low concurrency (roughly 5 threads or fewer), createConnection has a slight edge. Furthermore, with getConnection, having all threads write one shared table is clearly faster under high concurrency than having each thread write its own table, while at low concurrency the difference is not obvious; with createConnection, writing one shared table is about as fast as writing separate tables regardless of concurrency, and sometimes a bit slower.
These observations do not fully match the code analysis. The discrepancies are mainly two:
(1) Why does createConnection win when there are few threads? In theory the two should be on par. I have no satisfying explanation for this; it remains an open question.
(2) Why is createConnection so much slower when there are many threads? My guess is that the server-side ZK becomes overloaded maintaining a large number of connections, and even with multiple RegionServers handling the actual writes, performance still degrades.
Both points await further verification. Even so, the recommendation stands: when using the Java API with HBase, and especially in high-concurrency scenarios, create HTable objects via the getConnection route wherever possible, and avoid keeping unnecessary connections around that only waste resources. (A sketch of the recommended pattern follows the results table below.)
thread_count | table_count | conn_strategy | write_strategy | interval | result (puts per thread × threads = total puts) |
1 | 1 | CONF | OVERLAP | 60s | 10000*1=10000 |
5 | 1 | CONF | OVERLAP | 60s | 11000*5=55000 |
10 | 1 | CONF | OVERLAP | 60s | 12000*10=120000 |
30 | 1 | CONF | OVERLAP | 60s | 8300*30=249000 |
60 | 1 | CONF | OVERLAP | 60s | 6000*60=360000 |
100 | 1 | CONF | OVERLAP | 60s | 4700*100=470000 |
1 | 1 | CONN | OVERLAP | 60s | 12000*1=12000 |
5 | 1 | CONN | OVERLAP | 60s | 16000*5=80000 |
10 | 1 | CONN | OVERLAP | 60s | 10000*10=100000 |
30 | 1 | CONN | OVERLAP | 60s | 2500*30=75000 |
60 | 1 | CONN | OVERLAP | 60s | 1200*60=72000 |
100 | 1 | CONN | OVERLAP | 60s | 1000*100=100000 |
5 | 5 | CONF | SEPERATE | 60s | 10600*5=53000 |
10 | 10 | CONF | SEPERATE | 60s | 11900*10=119000 |
30 | 30 | CONF | SEPERATE | 60s | 6900*30=207000 |
60 | 60 | CONF | SEPERATE | 60s | 3650*60=219000 |
100 | 100 | CONF | SEPERATE | 60s | 2500*100=250000 |
5 | 5 | CONN | SEPERATE | 60s | 14000*5=70000 |
10 | 10 | CONN | SEPERATE | 60s | 10500*10=105000 |
30 | 30 | CONN | SEPERATE | 60s | 3250*30=97500 |
60 | 60 | CONN | SEPERATE | 60s | 1450*60=87000 |
100 | 100 | CONN | SEPERATE | 60s | 930*100=93000 |
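To close, here is a minimal sketch of the recommended pattern for high-concurrency writers (same assumptions as above: the 0.9x-era API, a hypothetical table "test0" with family "col1"). Every thread builds its HTable from one shared conf (method one), so all threads reuse the single cached HConnection, and each HTable is closed when its thread is done:

final Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "XX.XXX.X.XX");
conf.set("hbase.zookeeper.property.clientPort", "2181");

Runnable writer = new Runnable() {
    @Override
    public void run() {
        try {
            // Method one: all threads share the cached HConnection behind this conf
            HTable table = new HTable(conf, TableName.valueOf("test0"));
            try {
                Put put = new Put(Bytes.toBytes(Thread.currentThread().getName()));
                put.add(Bytes.toBytes("col1"), Bytes.toBytes("q"), Bytes.toBytes("v"));
                table.put(put);
            } finally {
                table.close(); // releases this table's reference; the shared connection is reference-counted
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
};
for (int i = 0; i < 100; i++) {
    new Thread(writer).start();
}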