hbase是基於大表的數據庫 ===================================== 隨機訪問和實時讀寫 hbase和hive的區別: hbase:低延遲實時性,不支持分析 hive:高延遲,分析工具 awk '{print $1}' //默認以'\t'分割,截串取第一個成員 hbase原理: ======================== hdfs://mycluster/user/hbase/data //hbase數據 namespace //文件夾 Htable //文件夾 Hregion //區域,文件夾 CF //列族, 文件夾 數據 //文件形式存在(row,cf,col,value) hbase在寫入數據時,先寫入到memstore中,到達一定的值或重啟的時候會實例化為文件 scan 'hbase:meta' ns1:t1,,1522573626932.02aab65df5b31 column=info:regioninfo, timestamp=1522630951985, value={ENCODED => 02aab65df5b31c1145a8b799ced42c92, NAM c1145a8b799ced42c92. E => 'ns1:t1,,1522573626932.02aab65df5b31c1145a8b799ced42c92.', STARTKEY => '', ENDKEY => ''} ns1:t1,,1522573626932.02aab65df5b31 column=info:seqnumDuringOpen, timestamp=1522630951985, value=\x00\x00\x00\x00\x00\x00\x00\x15 c1145a8b799ced42c92. ns1:t1,,1522573626932.02aab65df5b31 column=info:server, timestamp=1522630951985, value=s102:16020 c1145a8b799ced42c92. ns1:t1,,1522573626932.02aab65df5b31 column=info:serverstartcode, timestamp=1522630951985, value=1522629569862 c1145a8b799ced42c92. hbase:meta表存放的是所有表的元數據,即表(區域)和regionserver的映射關系 問題1:hbase:meta表是誰負責管轄? //每個表(包括元數據表),都分別由一個regionserver負責的 問題2:元數據中的元數據,hbase:meta的元數據存放位置? //hbase:meta表的數據信息,由zk負責管轄 普通表: 表體由regionserver負責管理,元數據是由hbase:meta表負責管理 hbase:meta表: 表體由regionserver負責管理,元數據是由zk負責管轄 寫(put)流程: 1、客戶端先聯系zk,找到hbase:meta表的regionserver,在此請求數據 2、通過hbase:meta的數據,找到表(區域)的映射關系,通過regionserver定位到指定表(區域) 3、查詢信息一旦被觸發,會被緩存,以便下次使用 4、用戶向區域服務器(RegionServer)發起put請求時,會將請求交給對應的區域(region)實例來處理。 5、決定數據是否需要寫到由HLog實現的預寫日志(WAL)中。 6、一旦數據被寫入到WAL中,數據會被放到MemStore中 7、memstore如果寫滿,則會被刷寫(flush)到磁盤中,生成新的hfile 區域:region管理 ====================================== 是regionserver的基本管理單位 rowKey是以字節數組進行排序 表切割: split 'ns1:t1','row500' 區域切割: split 'ns1:t1,row500,1522634115455.b88d6163f7e2eced2f2297c7fe938b3b.','row750' 創建表,進行預切割: create 'ns1:t2', 'f1', SPLITS => [ '20', '30', '40'] 移動區域: move '43e14524a984a4bf6a813a4f2a8ac8eb' , 's103,16020,1522634665677' 表切割之后,之前表所在的區域會被切割成兩個新的區域,原區域處於離線狀態,再次啟動hbase時,原區域相關信息會被清空 區域合並 merge_region '28d725266b30bcef636e994f08c9e8d9','976bc7a3b8769184c1a0a5aca1339d99' 緊湊compact: ==================================== 解決flush產生的小文件(hfile),將其進行合並 compact 'ns1:t1' //合並小文件,重啟后生效 compact_rs 's103,16020,1522634665677' //合並regionserver中所有region major_compact //合並小文件,即刻生效 regionname //區域名,eg:ns1:t1,row500,1522634115455.b88d6163f7e2eced2f2297c7fe938b3b. encoded_regionname //編碼區域名,eg:b88d6163f7e2eced2f2297c7fe938b3b 注意不加'.' server_name //regionserver名字, eg:s102,16020,1522634665523 hbase數據寫入流程,代碼分析 ========================================================== 在數據寫入的時候,hbase會初始化一個2M的mutator緩沖區 通過Arrays.asList(m)將Mutation對象,即數據轉換成List writeAsyncBuffer證明,數據寫入是異步寫入 每次寫入數據都會在內部進行自動清理 寫入時將傳來的Mutation對象,即數據放在LinkedList中 put命令時:1、傳入put對象 在傳每個數據時候,都會進行autoflush自動清理,且需要一次rpc通信 2、傳入put集合 將集合傳入並且處理之后,進行一次flush,需要一次rpc通信 10000 關閉自動flush 關閉寫前日志 關閉flush+WAL ---------------------------------------------------------------------- put 38,532ms 3,227ms 10,769ms 2,959ms putBatch 3,507ms 在put中設置關閉自動flush HTable.setAutoFlush(false,false); 在put中關閉WAL寫入 put.setDurability(Durability.SKIP_WAL); WAL ============================= Write ahead log 適用於容災,當機器發生斷電,memstore數據很可能會丟失 WAL:數據寫入到WAL則證明寫入成功,存儲對數據的修改,類似於hadoop的edits文件 如果數據庫崩潰,可以有效地回放日志 ***** 強烈建議用戶不要關閉WAL 實現類:org.apache.hadoop.hbase.regionserver.wal.FSHLog WAL在0.x版本中使用的是SeqFile 新版中不是 memstore: ================================ 1、刷寫到磁盤,閾值是5M <property> <name>hbase.hregion.preclose.flush.size</name> <value>5242880</value> </property> 2、關閉regionserver會使memstore強制刷寫到磁盤 hbase基本命令: ============================ create_namespace 'ns1' //create_namespace 'ns1' drop_namespace 'ns1' //drop_namespace 'ns1' create 'ns1:t1','f1','f2' //create 'ns1:t1','f1','f2' put 'ns1:t1','row1','f1:name','tom' //put 'ns1:t1','row1','f1:name','tom' scan 'ns1:t1' //scan 'ns1:t1' alter 'ns1:t5', 'f3' //添加列族 //alter 'ns1:t5' , 'f3' hbase文件: =========================== 根級文件:/user/hbase WALs //預寫日志,和hadoop中的edits文件類似,作用是容災 //通過以下配置進行周期性滾動 <property> <name>hbase.regionserver.logroll.period</name> <value>3600000</value> <description>毫秒為單位,默認一小時</description> </property> oldWALs //舊的預寫日志,當時間超過1小時,WAL會被回滾到此處 //十分鍾后此文件會被清除,配置文件如下 <property> <name>hbase.master.logcleaner.ttl</name> <value>600000</value> <description>毫秒為單位,默認十分鍾</description> </property> hbase.id //hbase唯一id hbase.version //hbase集群版本信息 corrupt //損壞日志文件 表級文件:/user/hbase/data/ns1/t1 .tabledesc/.tableinfo.0000000001 //串行化后的表描述符desc 'ns1:t1' region級文件: .regioninfo //region的描述,和tableinfo類似 recovered.edits //WAL日志恢復文件 當region足夠大的時候,會自動切分成兩個region,將原文件一分為二 region默認大小是10G ================================================== <property> <name>hbase.hregion.max.filesize</name> <value>10737418240</value> <description> Maximum HStoreFile size. If any one of a column families' HStoreFiles has grown to exceed this value, the hosting HRegion is split in two.</description> </property> 注意:當所有region同時達到閾值時候,會產生切割風暴,嚴重消耗資源 1、設置預切割: create 'ns1:t2', 'f1', SPLITS => [ '20', '30', '40'] 2、設置一個非常大的值,然后手動切割 HFile: ============================= 是hbase的文件格式,以k/v形式存儲,k/v均是字節數組 HFile包括以下內容: 讀取或寫入壓縮塊的存儲空間。 每個塊所指定的I/O操作的壓縮編解碼器 臨時的key存儲 臨時的value存儲 hfile索引,存在於內存,占用空間約為(56+AvgKeySize)*NumBlocks. 性能優化建議 **** 最小塊大小,推薦在 8KB to 1MB之間 順序讀寫推薦大塊,但不便於隨機訪問(因為需要解壓更多的數據) 小塊便於隨機讀寫,但是需要占用更多內存,但是創建起來更慢(因為塊多,每次壓縮都需要flush操作) 由於壓縮緩存,最小塊大小應該在20KB-30KB. 查看hfile: ==================== hbase hfile -a -b -p -v -h -f hdfs://mycluster/user/hbase/data/ns1/t1/77c55f893ecec3bbb2dfd02e3737c0c2/f1/5406345aabc640309be64abd21a2a500 key:row1/f1:age/1522634241444/Put/vlen=1/seqid=22 value:1 Cell: ============================= * 1) row * 2) column family * 3) column qualifier * 4) timestamp * 5) type * 6) MVCC version //multiple version concurrency contorl 多版本並發控制 * 7) value scan操作: ======================= 限定列掃描: HTable.getScanner(Bytes.toBytes("f1"),Bytes.toBytes("name")); //HTable.getScanner(Bytes.toBytes("f1"),Bytes.toBytes("name")); 指定列族掃描: HTable.getScanner(Bytes.toBytes("f1")); //Htable.getScanner(Bytes.toBytes("f1")); 全表掃描 HTable.getScanner(Scanner) //Htable.getScanner(Scanner) 限定row范圍進行掃描: new Scan(Byte[] startKey,Byte[] stopKey) //new Scan(Byte[] startKey,Byte[] stopKey) catch和batch 每次it.next()的時候,都會調用一次rpc,效率很差 避免此問題,引入cache的概念 catch:將指定數量行數進行緩存,到達閾值,通過一個RPC共同發給客戶端 <property> <name>hbase.client.scanner.caching</name> <value>2147483647</value> <description>每次查詢緩存的數量</description> </property> cache 10 1000 10000 1 ------------------------------------------------------------- time 6,423ms 5,434ms 5,831ms 14,339ms batch: 緩存指定的列數,到達閾值,通過一個RPC共同發給客戶端 RPC次數: row x col / min(cache,rowNo) / min(batch, colNo) +1 result次數: row x col / min(batch, colNo) //只和batch有關 hbase過濾器: =================================================== 相當於sql的where子句 使用謂詞下推的原理,通過server端過濾,返回給client數據 hbase過濾器中的比較方法: CompareFilter.CompareOp.EQUAL ====> '=' 使用過濾器: 1、初始化過濾器 //new RowFilter 2、初始化參數:比較方法 //CompareFilter.CompareOp.EQUAL 比較器 // 過濾器: RowFilter //行過濾器 //RowFilter FamilyFilter //列族過濾器 //FamilyFilter QualifierFilter //列過濾器 //QualifierFilter ValueFilter //值過濾器 //ValueFileter SingleColumnValueFilter //單列值過濾器,通過搜索制定列族和制定col的值,返回整行數據 //SingleColumnValueFilter //相當於select * from xxx where id=1; TimeStampFilter //時間戳過濾器 //TimeStampFilter 比較器: BinaryComparator //二進制比較器 //BianryComparator RegexStringComparator //正則比較器, 比較時最好使用EQUAL //RegexStringComparator SubstringComparator //子串比較器, 比較時最好使用EQUAL //SubstringComparator /** * 組合過濾器 * @throws Exception */ @Test public void testCombineFilter() throws Exception { long start = System.currentTimeMillis(); //初始化配置信息 //Configuration conf = new Configuration(); Configuration conf = HBaseConfiguration.create(); //入口點,創建連接 Connection conn = ConnectionFactory.createConnection(conf); //通過getTable方法獲取表的實例 HTable table = (HTable) conn.getTable(TableName.valueOf("ns1:t5")); //HTable table = (HTable) conn.getTable(TableName.valueOf("ns1:t5")); //Scan scan = new Scan(); Scan scan = new Scan(); //行過濾器,使用正則過濾器,過濾出111結尾的rowKey RowFilter filter1 = new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator(".*111")); //RowFilter filter = new RowFilter(CompareFilter.CompareOpEQUAL,new RegexStringComparator(".*111")); //列族過濾器,使用二進制過濾器,過濾出f2列族的數據 FamilyFilter filter2 = new FamilyFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("f2"))); //FamilyFilter filter2 = new FamilyFilter(ComparaFilter.CompareOp.EQUAL,new BinaryComparator(Bytes.toBytes("f2"))); FilterList filterList = new FilterList(); //FilterList filterList = new FilterList(); filterList.addFilter(filter1); //filterList.addFilter(filter1); filterList.addFilter(filter2); //filterList.addFilter(filter2); scan.setFilter(filterList); //scan.setFilter(filterList); //通過掃描器得到結果集 ResultScanner rs = table.getScanner(scan); ResultScanner rs = table.getScanner(scan); //得到迭代器 Iterator<Result> it = rs.iterator(); rs.iterator(); TestFilter.printVal(it); TestFilter.printVal(it); table.close(); //table.close(); System.out.println(System.currentTimeMillis() - start); } public static void printVal(Iterator<Result> it){ while (it.hasNext()){ Result next = it.next(); List<Cell> cells = next.listCells(); //next.listCells(); for (Cell cell : cells) { String val = Bytes.toString(CellUtil.cloneValue(cell)); //Bytes.toString(CellUtil.cloneValue(cell)); String clo = Bytes.toString(CellUtil.cloneQualifier(cell)); //Bytes.toString(CellUtil.cloneQualifier(cell)) String cf = Bytes.toString(CellUtil.cloneFamily(cell)); //Bytes.toString(CellUtil.cloneFamily(cell)); String row = Bytes.toString(CellUtil.cloneRow(cell)); //Bytes.toString(CellUtil.cloneRow(cell)); System.out.println(row+"/"+cf+"/"+clo+"/"+val); } } }