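I had been wondering about this for a long time; today I dug into the source code and finally got it figured out.
===Question===
When creating a table through the Java API (see the code below), you can set the memstore size with the setMemStoreFlushSize function.
In the cluster configuration file, you can also set the memstore size through hbase.hregion.memstore.flush.size.
What is the difference between the memstore sizes set in these two places, and how are they related?
★Reference code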
package api;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.regionserver.BloomType;

public class create_table_sample1 {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "192.168.1.80,192.168.1.81,192.168.1.82");
        Connection connection = ConnectionFactory.createConnection(conf);
        Admin admin = connection.getAdmin();

        HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("t1"));
        desc.setMemStoreFlushSize(2097152L); // 2 MB (default 128 MB)
        desc.setMaxFileSize(10485760L);      // 10 MB (default 10 GB)

        HColumnDescriptor family1 = new HColumnDescriptor(constants.COLUMN_FAMILY_DF.getBytes());
        family1.setTimeToLive(2 * 60 * 60 * 24); // time to live, in seconds (2 days)
        family1.setMaxVersions(2);               // maximum number of versions
        desc.addFamily(family1);

        HColumnDescriptor family2 = new HColumnDescriptor(constants.COLUMN_FAMILY_EX.getBytes());
        family2.setTimeToLive(3 * 60 * 60 * 24);   // time to live, in seconds (3 days)
        family2.setMinVersions(2);                 // minimum number of versions
        family2.setMaxVersions(3);                 // maximum number of versions
        family2.setBloomFilterType(BloomType.ROW); // bloom filter type
        desc.addFamily(family2);

        admin.createTable(desc);
        admin.close();
        connection.close();
    }
}
===Answer===
Source location: hbase-1.3.1\hbase-server\src\main\java\org\apache\hadoop\hbase\regionserver\
File: HRegion.java
Function: setHTableSpecificConf
Called from: the constructor of the HRegion class
Function body:
void setHTableSpecificConf() {
  if (this.htableDescriptor == null) return;
  long flushSize = this.htableDescriptor.getMemStoreFlushSize();

  if (flushSize <= 0) {
    flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
        HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE);
  }
  this.memstoreFlushSize = flushSize;
  this.blockingMemStoreSize = this.memstoreFlushSize *
      conf.getLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER,
          HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER);
}
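From the source code above we can draw the following conclusions:
1. An HRegion object (each table is split into many HRegions spread across different HRegionServers) initializes memstoreFlushSize when it is created.
2. The value is determined by the table first: each table can set its own memstoreFlushSize,
either with the MEMSTORE_FLUSHSIZE keyword or with the setMemStoreFlushSize() method of the HTableDescriptor class.
3. If the table does not set it (the descriptor returns a value <= 0), the value is taken from the cluster parameter hbase.hregion.memstore.flush.size.
4. If the cluster parameter is not configured either, the default is 1024*1024*128L, i.e. 128 MB.
So different tables can be given different MemStore sizes; the value is typically specified in the table descriptor when the table is created.
If a table has no setting of its own, the cluster-wide setting applies, which defaults to 128 MB.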
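The fallback order described above can be sketched in plain Java with no HBase dependency. This is only an illustrative model of the logic in setHTableSpecificConf: the class name FlushSizeResolver, the resolve method, and the use of OptionalLong for "cluster parameter not configured" are assumptions made for this sketch, not HBase APIs.

```java
import java.util.OptionalLong;

/** Illustrative model of the flush-size fallback order; not HBase code. */
final class FlushSizeResolver {
    // Same value as the 1024*1024*128L default quoted in the post (128 MB).
    static final long DEFAULT_FLUSH_SIZE = 1024L * 1024L * 128L;

    /**
     * @param tableFlushSize   value from the table descriptor; <= 0 means "not set"
     * @param clusterFlushSize hbase.hregion.memstore.flush.size, empty if not configured
     */
    static long resolve(long tableFlushSize, OptionalLong clusterFlushSize) {
        if (tableFlushSize > 0) {
            return tableFlushSize; // 1. the per-table setting wins
        }
        // 2. otherwise the cluster parameter, 3. otherwise the built-in default
        return clusterFlushSize.orElse(DEFAULT_FLUSH_SIZE);
    }

    public static void main(String[] args) {
        // Table sets 2 MB, cluster sets 64 MB: the table value is used.
        System.out.println(resolve(2097152L, OptionalLong.of(67108864L))); // 2097152
        // Table unset, cluster sets 64 MB: the cluster value is used.
        System.out.println(resolve(-1L, OptionalLong.of(67108864L)));      // 67108864
        // Neither is set: the 128 MB default is used.
        System.out.println(resolve(-1L, OptionalLong.empty()));            // 134217728
    }
}
```

The 64 MB cluster value is an arbitrary example chosen to make the three cases distinguishable.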
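===Further notes===
The setHTableSpecificConf source above also initializes blockingMemStoreSize. What is this value?
As the code shows, it is memstoreFlushSize multiplied by the cluster setting hbase.hregion.memstore.block.multiplier, so it is closely tied to hbase.hregion.memstore.flush.size.
What the parameter does:
When the total MemStore size of an HRegion (summed over all of its Stores) exceeds the flush threshold, a flush request is triggered.
The parameter is a multiplier: it states how many times "hbase.hregion.memstore.flush.size" the total MemStore size of an HRegion may reach at most.
Above that limit, write requests to the HRegion are blocked until a flush has taken place.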
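As a worked example of the multiplication in setHTableSpecificConf, the sketch below computes the blocking threshold for the 2 MB flush size used in the sample table. The multiplier of 4 is an assumed example value (check the hbase.hregion.memstore.block.multiplier setting of your own cluster), and the class and method names are hypothetical.

```java
/** Illustrative arithmetic for blockingMemStoreSize; not HBase code. */
final class BlockingThreshold {
    /** blockingMemStoreSize = memstoreFlushSize * block multiplier. */
    static long blockingSize(long flushSize, long multiplier) {
        return flushSize * multiplier;
    }

    public static void main(String[] args) {
        long flushSize = 2097152L; // 2 MB, as set on table t1 in the post
        long multiplier = 4L;      // assumed example value for the multiplier
        // 2 MB * 4 = 8 MB
        System.out.println(blockingSize(flushSize, multiplier)); // 8388608
    }
}
```

With these numbers, a region of t1 would request a flush once its MemStore passes 2 MB and would start blocking writes once it passes 8 MB.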
The put method in HRegion.java calls checkResources():
@Override
public void put(Put put) throws IOException {
  checkReadOnly();

  // Do a rough check that we have resources to accept a write.  The check is
  // 'rough' in that between the resource check and the call to obtain a
  // read lock, resources may run out.  For now, the thought is that this
  // will be extremely rare; we'll deal with it when it happens.
  checkResources();
  startRegionOperation(Operation.PUT);
  try {
    // All edits for the given row (across all column families) must happen atomically.
    doBatchMutate(put);
  } finally {
    closeRegionOperation(Operation.PUT);
  }
}
The checkResources() method is as follows:
/*
 * Check if resources to support an update.
 *
 * We throw RegionTooBusyException if above memstore limit
 * and expect client to retry using some kind of backoff
 */
private void checkResources() throws RegionTooBusyException {
  // If catalog region, do not impose resource constraints or block updates.
  if (this.getRegionInfo().isMetaRegion()) return;

  if (this.memstoreSize.get() > this.blockingMemStoreSize) {
    blockedRequestsCount.increment();
    requestFlush();
    throw new RegionTooBusyException("Above memstore limit, " +
        "regionName=" + (this.getRegionInfo() == null ? "unknown" :
        this.getRegionInfo().getRegionNameAsString()) +
        ", server=" + (this.getRegionServerServices() == null ? "unknown" :
        this.getRegionServerServices().getServerName()) +
        ", memstoreSize=" + memstoreSize.get() +
        ", blockingMemStoreSize=" + blockingMemStoreSize);
  }
}
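The comment on checkResources() says the client is expected to retry "using some kind of backoff". The sketch below illustrates that idea in plain Java. It is not how the HBase client implements its retries: RegionBusyException is a hypothetical stand-in for RegionTooBusyException so that the example needs no HBase jars, and WriteOp, writeWithBackoff, and the doubling pause are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative retry-with-backoff loop; not the HBase client's retry logic. */
final class BackoffRetry {
    /** Hypothetical stand-in for RegionTooBusyException. */
    static final class RegionBusyException extends Exception {
        RegionBusyException(String msg) { super(msg); }
    }

    /** Hypothetical write operation that a busy region may reject. */
    interface WriteOp {
        void run() throws RegionBusyException;
    }

    /**
     * Runs op, retrying on RegionBusyException with a pause that doubles each time.
     * Returns the pauses (ms) taken before success; rethrows after maxAttempts failures.
     */
    static List<Long> writeWithBackoff(WriteOp op, int maxAttempts, long basePauseMs)
            throws RegionBusyException, InterruptedException {
        List<Long> pauses = new ArrayList<>();
        for (int attempt = 1; ; attempt++) {
            try {
                op.run();
                return pauses;
            } catch (RegionBusyException e) {
                if (attempt >= maxAttempts) {
                    throw e; // give up and surface the last rejection
                }
                long pause = basePauseMs << (attempt - 1); // base, 2*base, 4*base, ...
                pauses.add(pause);
                Thread.sleep(pause); // give the region time to flush
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Simulated region: rejects the first two writes, then accepts the third.
        WriteOp op = () -> {
            if (++calls[0] <= 2) {
                throw new RegionBusyException("Above memstore limit");
            }
        };
        System.out.println(writeWithBackoff(op, 5, 10L)); // [10, 20]
    }
}
```

Growing the pause gives the region server progressively more time to finish the flush that checkResources() requested before the next write arrives.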
--END--