HBase 寫優化之 BulkLoad 實現數據快速入庫


在第一次建立Hbase表的時候,我們可能需要往里面一次性導入大量的初始化數據。我們很自然地想到將數據一條條插入到Hbase中,或者通過MR方式等。但是這些方式不是慢就是在導入的過程的占用Region資源導致效率低下,所以很不適合一次性導入大量數據。本文將針對這個問題介紹如何通過Hbase的BulkLoad方法來快速將海量數據導入到Hbase中。

  總的來說,使用 Bulk Load 方式由於利用了 HBase 的數據信息是按照特定格式存儲在 HDFS 里的這一特性,直接在 HDFS 中生成持久化的 HFile 數據格式文件,然后完成巨量數據快速入庫的操作,配合 MapReduce 完成這樣的操作,不占用 Region 資源,不會產生巨量的寫入 I/O,所以需要較少的 CPU 和網絡資源。Bulk Load 的實現原理是通過一個 MapReduce Job 來實現的,通過 Job 直接生成一個 HBase 的內部 HFile 格式文件,用來形成一個特殊的 HBase 數據表,然后直接將數據文件加載到運行的集群中。與使用HBase API相比,使用Bulkload導入數據占用更少的CPU和網絡資源。

實現原理

  Bulkload過程主要包括三部分:

  1、從數據源(通常是文本文件或其他的數據庫)提取數據並上傳到HDFS。抽取數據到HDFS和Hbase並沒有關系,所以大家可以選用自己擅長的方式進行,本文就不介紹了。

  2、利用MapReduce作業處理實現准備的數據 。這一步需要一個MapReduce作業,並且大多數情況下還需要我們自己編寫Map函數,而Reduce函數不需要我們考慮,由HBase提供。該作業需要使用rowkey(行鍵)作為輸出Key;KeyValue、Put或者Delete作為輸出Value。MapReduce作業需要使用HFileOutputFormat2來生成HBase數據文件。為了有效的導入數據,需要配置HFileOutputFormat2使得每一個輸出文件都在一個合適的區域中。為了達到這個目的,MapReduce作業會使用HadoopTotalOrderPartitioner類根據表的key值將輸出分割開來。HFileOutputFormat2的方法configureIncrementalLoad()會自動的完成上面的工作。

  3、告訴RegionServers數據的位置並導入數據。這一步是最簡單的,通常需要使用LoadIncrementalHFiles(更為人所熟知是completebulkload工具),將文件在HDFS上的位置傳遞給它,它就會利用RegionServer將數據導入到相應的區域。

整個過程圖如下:

 

代碼實現

上面我們已經介紹了Hbase的BulkLoad方法的原理,我們需要寫個Mapper和驅動程序,實現如下:

使用MapReduce生成HFile文件

public class IteblogBulkLoadMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>{
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] items = line.split("\t");
  
            ImmutableBytesWritable rowKey = new ImmutableBytesWritable(items[0].getBytes());
            Put put = new Put(Bytes.toBytes(items[0]));   //ROWKEY
            put.addColumn("f1".getBytes(), "url".getBytes(), items[1].getBytes());
            put.addColumn("f1".getBytes(), "name".getBytes(), items[2].getBytes());
            
            context.write(rowkey, put);
        }
}
 

驅動程序

public class IteblogBulkLoadDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        final String SRC_PATH= "hdfs://iteblog:9000/user/iteblog/input";
        final String DESC_PATH= "hdfs://iteblog:9000/user/iteblog/output";
        Configuration conf = HBaseConfiguration.create();
       
        Job job=Job.getInstance(conf);
        job.setJarByClass(IteblogBulkLoadDriver.class);
        job.setMapperClass(IteblogBulkLoadMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);
        job.setOutputFormatClass(HFileOutputFormat2.class);
        HTable table = new HTable(conf,"blog_info");
        HFileOutputFormat2.configureIncrementalLoad(job,table,table.getRegionLocator());
        FileInputFormat.addInputPath(job,new Path(SRC_PATH));
        FileOutputFormat.setOutputPath(job,new Path(DESC_PATH));
          
        System.exit(job.waitForCompletion(true)?0:1);
    }
}
 

通過BlukLoad方式加載HFile文件

public class LoadIncrementalHFileToHBase {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Connection connection = ConnectionFactory.createConnection(configuration);
        LoadIncrementalHFiles loder = new LoadIncrementalHFiles(configuration);
        loder.doBulkLoad(new Path("hdfs://iteblog:9000/user/iteblog/output"),new HTable(conf,"blog_info"));
    }
}
 

由於Hbase的BulkLoad方式是繞過了Write to WAL,Write to MemStore及Flush to disk的過程,所以並不能通過WAL來進行一些復制數據的操作。后面我將會再介紹如何通過Spark來使用Hbase的BulkLoad方式來初始化數據。

BulkLoad的使用案例

  1、首次將原始數據集載入 HBase- 您的初始數據集可能很大,繞過 HBase 寫入路徑可以顯著加速此進程。
  2、遞增負載 - 要定期加載新數據,請使用 BulkLoad 並按照自己的理想時間間隔分批次導入數據。這可以緩解延遲問題,並且有助於您實現服務級別協議 (SLA)。但是,壓縮觸發器就是 RegionServer 上的 HFile 數目。因此,頻繁導入大量 HFile 可能會導致更頻繁地發生大型壓縮,從而對性能產生負面影響。您可以通過以下方法緩解此問題:調整壓縮設置,確保不觸發壓縮即可存在的最大 HFile 文件數很高,並依賴於其他因素,如 Memstore 的大小 觸發壓縮。
  3、數據需要源於其他位置 - 如果當前系統捕獲了您想在 HBase 中包含的數據,且因業務原因需要保持活動狀態,您可從系統中將數據定期批量加載到 HBase 中,以便可以在不影響系統的前提下對其執行操作。

生成HFile程序說明:

①. 最終輸出結果,無論是map還是reduce,輸出部分key和value的類型必須是: < ImmutableBytesWritable, KeyValue>或者< ImmutableBytesWritable, Put>。

②. 最終輸出部分,Value類型是KeyValue 或Put,對應的Sorter分別是KeyValueSortReducer或PutSortReducer。

③. MR例子中job.setOutputFormatClass(HFileOutputFormat.class); HFileOutputFormat只適合一次對單列族組織成HFile文件。好像最新的版本可以多個列族.

④. MR例子中HFileOutputFormat.configureIncrementalLoad(job, table);自動對job進行配置。TotalOrderPartitioner是需要先對key進行整體排序,然后划分到每個reduce中,保證每一個reducer中的的key最小最大值區間范圍,是不會有交集的。因為入庫到HBase的時候,作為一個整體的Region,key是絕對有序的。

⑤. MR例子中最后生成HFile存儲在HDFS上,輸出路徑下的子目錄是各個列族。如果對HFile進行入庫HBase,相當於move HFile到HBase的Region中,HFile子目錄的列族內容沒有了。

      說明: 因為在創建HBase表的時候,默認只有一個Region,只有等到這個Region的大小超過一定的閾值之后,才會進行split, 所以為了利用完全分布式加快生成HFile和導入HBase中以及數據負載均衡,所以需要在創建表的時候預先創建分區,可以查閱相關資料(關於HBase調優的資料), 而進行分區時要利用startKey與endKey進行rowKey區間划分(因為導入HBase中,需要rowKey整體有序),所以在導入之前,自己先寫一個MapReduce的Job求最小與最大的rowKey, 即startKey與endKey.

 

3、說明與注意事項:

(1)HFile方式在所有的加載方案里面是最快的,不過有個前提——數據是第一次導入,表是空的。如果表中已經有了數據。HFile再導入到hbase的表中會觸發split操作。

(2)最終輸出結果,無論是map還是reduce,輸出部分key和value的類型必須是: < ImmutableBytesWritable, KeyValue>或者< ImmutableBytesWritable, Put>。
否則報這樣的錯誤:

java.lang.IllegalArgumentException: Can't read partitions file ... Caused by: java.io.IOException: wrong key class: org.apache.hadoop.io.*** is not class org.apache.hadoop.hbase.io.ImmutableBytesWritable

(3)最終輸出部分,Value類型是KeyValue 或Put,對應的Sorter分別是KeyValueSortReducer或PutSortReducer,這個 SorterReducer 可以不指定,因為源碼中已經做了判斷:

if (KeyValue.class.equals(job.getMapOutputValueClass())) { job.setReducerClass(KeyValueSortReducer.class); } else if (Put.class.equals(job.getMapOutputValueClass())) { job.setReducerClass(PutSortReducer.class); } else { LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass()); }

(4) MR例子中job.setOutputFormatClass(HFileOutputFormat.class); HFileOutputFormat只適合一次對單列族組織成HFile文件,多列簇需要起多個 job,不過新版本的 Hbase 已經解決了這個限制。

(5) MR例子中最后生成HFile存儲在HDFS上,輸出路徑下的子目錄是各個列族。如果對HFile進行入庫HBase,相當於move HFile到HBase的Region中,HFile子目錄的列族內容沒有了。

(6)最后一個 Reduce 沒有 setNumReduceTasks 是因為,該設置由框架根據region個數自動配置的。

(7)下邊配置部分,注釋掉的其實寫不寫都無所謂,因為看源碼就知道configureIncrementalLoad方法已經把固定的配置全配置完了,不固定的部分才需要手動配置。

public class HFileOutput { //job 配置 public static Job configureJob(Configuration conf) throws IOException { Job job = new Job(configuration, "countUnite1"); job.setJarByClass(HFileOutput.class); //job.setNumReduceTasks(2); //job.setOutputKeyClass(ImmutableBytesWritable.class); //job.setOutputValueClass(KeyValue.class); //job.setOutputFormatClass(HFileOutputFormat.class); Scan scan = new Scan(); scan.setCaching(10); scan.addFamily(INPUT_FAMILY); TableMapReduceUtil.initTableMapperJob(inputTable, scan, HFileOutputMapper.class, ImmutableBytesWritable.class, LongWritable.class, job); //這里如果不定義reducer部分,會自動識別定義成KeyValueSortReducer.class 和PutSortReducer.class job.setReducerClass(HFileOutputRedcuer.class); //job.setOutputFormatClass(HFileOutputFormat.class); HFileOutputFormat.configureIncrementalLoad(job, new HTable( configuration, outputTable)); HFileOutputFormat.setOutputPath(job, new Path()); //FileOutputFormat.setOutputPath(job, new Path()); //等同上句 return job; } public static class HFileOutputMapper extends TableMapper<ImmutableBytesWritable, LongWritable> { public void map(ImmutableBytesWritable key, Result values, Context context) throws IOException, InterruptedException { //mapper邏輯部分 context.write(new ImmutableBytesWritable(Bytes()), LongWritable()); } } public static class HFileOutputRedcuer extends Reducer<ImmutableBytesWritable, LongWritable, ImmutableBytesWritable, KeyValue> { public void reduce(ImmutableBytesWritable key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { //reducer邏輯部分 KeyValue kv = new KeyValue(row, OUTPUT_FAMILY, tmp[1].getBytes(), Bytes.toBytes(count)); context.write(key, kv); } } }

4、Refer:

1、Hbase幾種數據入庫(load)方式比較

http://blog.csdn.net/kirayuan/article/details/6371635

2、MapReduce生成HFile入庫到HBase及源碼分析

http://blog.pureisle.net/archives/1950.html

3、MapReduce生成HFile入庫到HBase

http://shitouer.cn/2013/02/hbase-hfile-bulk-load/


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM