1. 使用“連接池”
如果每次和Hbase交互時都去新建連接的話,顯然是低效率的,HBase也提供類連接池相關的API。
1.1. HTablePool
早期的API中使用它,但很不幸,現在它已經過時了。在次不再描述。
1.2. HConnection
取代HTablePool的就是現在的HConnection,可以通過它拿到幾乎所有關於HBase的相關操作對象。
private static HConnection connection = null; private static Configuration conf =null; static{ try { conf = HBaseConfiguration.create(); conf.set("hbase.zookeeper.property.clientPort", "2181"); conf.set("hbase.zookeeper.quorum", "Hadoop-master01,Hadoop-slave01,Hadoop-slave02"); connection = HConnectionManager.createConnection(getHBaseConfiguration()); } catch (ZooKeeperConnectionException e) { e.printStackTrace(); } }
2. 讀 優化
2.1. 根據rowkey
如果本操作中只有一個rowkey的話,大可以使用下邊的方式(單個讀):
byte[] rowkey = new byte[]{......}; Get get = new Get(rowkey); Result result = destTable.get(get);
若有多個rowkey的話,可以使用如下方式(批量讀):
List<byte[]> rowList = new ArrayList<byte[]>(); List<Get> gets = new ArrayList<Get>(); for(byte[] row:rowList){ gets.add(new Get(row:)); } Result[] results = destTable.get(gets);
2.2. 使用Scan
Scan scan = new Scan(); ResultScanner resultScanner = srcTable.getScanner(scan);
可以通過設置hbase.client.scanner.caching參數來設置resultScanner從服務器一次抓取的數據條數。默認是一次一條,這樣可以大大的增加結果集游標移動的效率(resultScanner.next())。
設置這個參數的方法有三個:
- HBase的conf配置文件hdfs-site.xml里可以配置
- 表的對象:hTable.setScannerCaching(10000);
- 掃面器對象:scan.setCaching(10000);
另外,還可以通過:
scan.addColumn(Bytes.toBytes("sm"), Bytes.toBytes("ip"));
設置掃描的列,減少不必要的網絡流量,提升讀表效率。
3. 寫 優化
寫數據的操作中每條提交一個put,其中包含了rowkey,還有對於的一列或多列值。
3.1. 寫入單條數據
byte[] row = Bytes.toBytes(...); Put put = new Put(row); put.add(Bytes.toBytes(...), Bytes.toBytes(...), Bytes.toBytes(...)); table.put(put); table.flushCommits();
其中,table.put(put)是把數據提交到HDFS里,執行了table.flushCommits()之后,將會把數據提交到HBase中。
3.2. 寫入多條數據
在寫入多條數據時,就會涉及到數據提交和緩存的問題,具體如下:
- 客戶端維護緩存
使用HTable.setAutoFlush(true)設置客戶端寫入數據時自動維護緩存,當數據達到緩存上限時自動提交數據,這個參數默認是開啟的。設置客戶端自行維護緩存時,可更具需求來設置緩存的大小,HTable.setWriteBufferSize(writeBufferSize)。
但是在實際開發中,並不提倡這種方法。原因是每次table.put(put)去連接hdfs的時間開銷是頻繁的,不適合大吞吐量的批量寫入。
- 手動維護緩存
可以把要寫入的數據先放入本地內存中,然后使用table.put(List<Put>)來提交數據。這樣來減少客戶端和集群的交互次數,提高傳輸的吞吐量。
List<Put> puts = new ArrayList<Put>(); for(int i=0; i<100000; i++){ byte[] rowkey = Bytes.toBytes(RandomStringUtils.random(8,"ABCDESSSSS")); byte[] value = Bytes.toBytes(RandomStringUtils.random(10,"IOJKJHHJNNBGHIKKLM<NH")); Put put = new Put(rowkey); put.add(Bytes.toBytes(FAMILY_CF), Bytes.toBytes("value"), value); puts.add(put); if(i%10000==0){ table.put(puts); table.flushCommits(); puts.clear(); } }
3.3. 自增列
destTable.incrementColumnValue(rowkey, Bytes.toBytes(FAMILY_CF), Bytes.toBytes("testIncrement"),Long.parseLong("1") ,true);
往testIncrement列自增1.在批處理系統中,這種使用方法需要慎用,它每次執行都會提交數據,不能實現這一列的批量提交。