Hbase 客戶端Scan
標簽(空格分隔): Hbase
HBase掃描操作Scan
1 介紹
掃描操作的使用和get()方法類似。同樣,和其他函數類似,這里也提供了Scan類。但是由於掃描工作方式類似於迭代器,所以用戶無需調用scan()方法創建實例,只需要調用HTable的getScanner()方法,此方法才是返回真正的掃描器(scanner)實例的同時,用戶也可以使用它迭代獲取數據,Table中的可用的方法如下:
ResultScanner getScanner(Scan scan)
ResultScanner getScanner(byte[] family)
ResultScanner getScanner(byte[] family, byte[] qualifier)
簡單示例:
public static ResultScanner scan(String tableName,String family,String qualifier) {
Table table = null;
try {
table = connection.getTable(TableName.valueOf(tableName));
Scan scan = new Scan();
ResultScanner rs = table.getScanner(scan);
// 一般返回ResultScanner,遍歷即可
// if (rs!=null){
// String row = null;
// String quali = null;
// String value = null;
// for (Result result : rs) {
// row = Bytes.toString(CellUtil.cloneRow(result.getColumnLatestCell(family.getBytes(), qualifier.getBytes())));
// quali =Bytes.toString(CellUtil.cloneQualifier(result.getColumnLatestCell(family.getBytes(), qualifier.getBytes())));
// value =Bytes.toString(CellUtil.cloneValue(result.getColumnLatestCell(family.getBytes(), qualifier.getBytes())));
// System.out.println(row+"-"+quali+"-"+value);
// }
// }
return rs;
} catch (IOException e) {
e.printStackTrace();
} finally {
if (table!=null){
try {
table.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
return null;
}
后兩個為了方便用戶,隱式地幫助用戶創建一個Scan實例,邏輯中最后調用getScanner(Scan scan)方法。Scan類擁有以下構造器:
public Scan()
public Scan(byte [] startRow)
public Scan(byte [] startRow, byte [] stopRow)
public Scan(byte [] startRow, Filter filter)
public Scan(Get get)
public Scan(Scan scan)
用戶可以選擇性的提供startRow參數,來定義掃描讀取HBase表的起始行鍵,即行鍵不是必須指定的。同時可選stopRow參數來限定讀取到何處停止。
其實行包括在內,而終止行不包含在內。一般區間表示為[startRow,stopRow)
掃描操作有一個特點:用戶提供的參數不必精確匹配兩行。掃描會匹配相等或者大於給定的起始行的行鍵。如果沒有顯示地指定起始行,它會從表的起始位置開始獲取數據。即如果沒顯式指定 startRow,StopRow,會執行全表掃描。
當遇到了與設置的終止行相同或者大於終止行的行鍵時,掃描也會終止。如果沒有指定終止鍵,會掃描到表尾。
另一個可選的參數叫做過濾器(filter),可直接指向Filter實例。盡管Scan實例通常由空白構造器構造,但其所有可選參數都有對應的getter方法和setter方法。
創建Scan實例后,用戶可能還要給它增加更多限制條件。這種情況下,用戶仍然可以使用空白 參數的掃描,它可以讀取整個表格,包括所有列族以及它們的所有列。可以用多種方法限制要讀取的數據:
public Scan addFamily(byte [] family) // 方法限制返回數據的列族
public Scan addColumn(byte [] family, byte [] qualifier) // 方法限制返回的列
Scan setTimeRange(long minStamp,long maxStamp) // 設置時間范圍
Scan setTimeStamp(long timestamp) // 設置時間戳
Scan setMaxVersions() // 設置最大版本數
Scan setMaxVersions(int maxVersions) // 設置最大版本數
ResultScanner類
掃描操作不會通過一個RPC請求返回所有匹配的行,而是以行為單位進行返回。很明顯,行的數目很大,可能有上千條甚至更多,同時在一次請求中發送大量數據,會占用大量的系統資源並消耗很長時間。例如影響寫等其他影響。
ResultScanner類把掃描操作轉換為類似的get操作,它將每一行數據封裝成一個Result實例,並將所有的Result實例放入一個迭代器中。ResultScanner的一些方法如下:
Result next()
Result[] next(int nbRows)
void close()
掃描器租約
要確保盡早釋放掃描器對象,一個打開的掃描器會占用不少的服務端資源,累計多了會占用大量的堆空間。當使用完ResultScanner之后調用它的close()方法,同時當把close()方法放到try/finally塊中,以保證其在迭代獲取數據過程中出現異常和錯誤時,仍然能執行close()。
設置掃描器緩存
每一個next()調用都會為每一行數據生成一個單獨的RPC請求,即使使用next(int nbRows)方法也是如此,因為該方法僅僅是在客戶端循環地調用next()方法。很顯然,當單元格數據較少時,這樣做的性能不會很好。因此,如果一次RPC請求可以獲取多行數據,這樣更有意義。這樣的方法可以由掃描器的緩存實現,默認情況下,這個緩存是關閉的。
Scan類中提供了設置緩存的方法如下:
public Scan setCacheBlocks(boolean cacheBlocks) // 設置是否應用緩存塊來進行掃描
public boolean getCacheBlocks() // 查看是否支持塊緩存
public Scan setCaching(int caching) // 設置掃描器的緩存行數
public int getCaching() // 獲取掃描器中的緩存行數
用戶需要少量的RPC請求次數和客戶端以及服務器的內存消耗找到平衡點。很多時候,設置掃描器緩存可以提高性能,不過設置的太高就會產生不良的影響:每次調用next()將會占用更長的時間,因為要獲取更多的文件並傳輸到客戶端,如果返回給客戶端的數據超出了其堆的大小,程序就會終止並拋出OutOfMemoryException異常。
Tip:
當傳輸和處理數據的時間超過配置的掃描器租約時間時,用戶將會收到一個ScannerTimeoutException形式拋出的租約過期錯誤。
/**
* 添加掃描
*/
@Test
public void testScanCacheBatch() throws Exception {
//
Configuration conf = HBaseConfiguration.create();
Connection conn = ConnectionFactory.createConnection(conf);
HTable table = (HTable) conn.getTable(TableName.valueOf("ns1:t2"));
Scan scan = new Scan();
System.out.println(scan.getBatch());
//三行
scan.setCaching(3) ;
//2列
scan.setBatch(2) ;
ResultScanner scanner = table.getScanner(scan);
Iterator<Result> it = scanner.iterator();
while (it.hasNext()) {
Result r = it.next();
outResult(r);
}
scanner.close();
}
private void outResult(Result r){
System.out.println("=========================");
List<Cell> cells = r.listCells();
for(Cell cell : cells){
String rowkey = Bytes.toString(CellUtil.cloneRow(cell));
String f = Bytes.toString(CellUtil.cloneFamily(cell));
String col = Bytes.toString(CellUtil.cloneQualifier(cell));
long ts = cell.getTimestamp();
String value = Bytes.toString(CellUtil.cloneValue(cell));
System.out.println(rowkey+"/"+f+":"+col+"/"+ts + "=" + value);
}
}
cache
在默認情況下,如果你需要從hbase中查詢數據,在獲取結果ResultScanner時,hbase會在你每次調用ResultScanner.next()操作時對返回的每個Row執行一次RPC操作。即使你使用ResultScanner.next(int nbRows)時也只是在客戶端循環調用RsultScanner.next()操作,你可以理解為hbase將執行查詢請求以迭代器的模式設計,在執行next()操作時才會真正的執行查詢操作,而對每個Row都會執行一次RPC操作。
因此顯而易見的就會想如果我對多個Row返回查詢結果才執行一次RPC調用,那么就會減少實際的通訊開銷。這個就是hbase配置屬性“hbase.client.scanner.caching”的由來,設置cache可以在hbase配置文件中顯示靜態的配置,也可以在程序動態的設置。
cache值得設置並不是越大越好,需要做一個平衡。cache的值越大,則查詢的性能就越高,但是與此同時,每一次調用next()操作都需要花費更長的時間,因為獲取的數據更多並且數據量大了傳輸到客戶端需要的時間就越長,一旦你超過了maximum heap the client process 擁有的值,就會報outofmemoryException異常。當傳輸rows數據到客戶端的時候,如果花費時間過長,則會拋出ScannerTimeOutException異常。
batch
在cache的情況下,我們一般討論的是相對比較小的row,那么如果一個Row特別大的時候應該怎么處理呢?要知道cache的值增加,那么在client process 占用的內存就會隨着row的增大而增大。在hbase中同樣為解決這種情況提供了類似的操作:Batch。可以這么理解,cache是面向行的優化處理,batch是面向列的優化處理。它用來控制每次調用next()操作時會返回多少列,比如你設置setBatch(5),那么每一個Result實例就會返回5列,如果你的列數為17的話,那么就會獲得四個Result實例,分別含有5,5,5,2個列。
下面會以表格的形式來幫助理解,假設我們擁有10Row,每個row擁有2個family,每個family擁有10個列。(也就是說每個Row含有20列)。
| 緩存 | 批量處理 | Result 個數 | RPC次數 | 說明 |
|---|---|---|---|---|
| 1 | 1 | 200 | 201 | 每個列都作為一個Result實例返回。最后還多一個RPC確認掃描完成 |
| 200 | 1 | 200 | 2 | 每個Result實例都只包含一列的值,不過它們都被一次RPC請求取回 |
| 2 | 10 | 20 | 11 | 批量參數是一行所包含的列數的一半,所以200列除以10,需要20個result實例。同時需要10次RPC請求取回。 |
| 5 | 100 | 10 | 3 | 對一行來講,這個批量參數實在是太大了,所以一行的20列都被放入到了一個Result實例中。同時緩存為5,所以10個Result實例被兩次RPC請求取回。 |
| 5 | 20 | 10 | 3 | 同上,不過這次的批量值與一行列數正好相同,所以輸出與上面一種情況相同 |
| 10 | 10 | 20 | 3 | 這次把表分成了較小的result實例,但使用了較大的緩存值,所以也是只用了兩次RPC請求就返回了數據 |
要計算一次掃描操作的RPC請求的次數,用戶需要先計算出行數和每行列數的乘積。然后用這個值除以批量大小和每行列數中較小的那個值。最后再用除得的結果除以掃描器緩存值。 用數學公式表示如下:
RPC請求的次數=(行數x每行的列數)/Min(每行的列數,批量大小)/掃描器緩存
