通過BulkLoad的方式快速導入海量數據


摘要

加載數據到HBase的方式有多種,通過HBase API導入或命令行導入或使用第三方(如sqoop)來導入或使用MR來批量導入(耗費磁盤I/O,容易在導入的過程使節點宕機),但是這些方式不是慢就是在導入的過程的占用Region資料導致效率低下,今天要講的就是利用HBase在HDFS存儲原理及MapReduce的特性來快速導入海量的數據
 
HBase數據在HDFS下是如何存儲的?
HBase中每張Table在根目錄(/HBase)下用一個文件夾存儲,Table名為文件夾名,在Table文件夾下每個Region同樣用一個文件夾存儲,每個Region文件夾下的每個列族也用文件夾存儲,而每個列族下存儲的就是一些HFile文件,HFile就是HBase數據在HFDS下存儲格式,其整體目錄結構如下:
/hbase/<tablename>/<encoded-regionname>/<column-family>/<filename>
 
HBase數據寫路徑
                                                                              (圖來自Cloudera)
在put數據時會先將數據的更新操作信息和數據信息寫入WAL,在寫入到WAL后,數據就會被放到MemStore中,當MemStore滿后數據就會被flush到磁盤(即形成HFile文件),在這過程涉及到的flush,split,compaction等操作都容易造成節點不穩定,數據導入慢,耗費資源等問題,在海量數據的導入過程極大的消耗了系統性能, 避免 這些問題最好的方法就是使用BlukLoad的方式來加載數據到HBase中。
 
原理
利用HBase數據按照HFile格式存儲在HDFS的原理,使用Mapreduce直接生成HFile格式文件后,RegionServers再將HFile文件移動到相應的Region目錄下
其流程如下圖:
                                                                      (圖來自Cloudera)
導入過程
1.使用MapReduce生成HFile文件
GenerateHFile類
public class GenerateHFile extends Mapper<LongWritable,
        Text, ImmutableBytesWritable, Put>{
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] items = line.split("\t");

            String ROWKEY = items[1] + items[2] + items[3];
            ImmutableBytesWritable rowkey = new ImmutableBytesWritable(ROWKEY.getBytes());
            Put put = new Put(ROWKEY.getBytes());   //ROWKEY
            put.addColumn("INFO".getBytes(), "URL".getBytes(), items[0].getBytes());
            put.addColumn("INFO".getBytes(), "SP".getBytes(), items[1].getBytes());  //出發點
            put.addColumn("INFO".getBytes(), "EP".getBytes(), items[2].getBytes());  //目的地
            put.addColumn("INFO".getBytes(), "ST".getBytes(), items[3].getBytes());   //出發時間
            put.addColumn("INFO".getBytes(), "PRICE".getBytes(), Bytes.toBytes(Integer.valueOf(items[4])));  //價格
            put.addColumn("INFO".getBytes(), "TRAFFIC".getBytes(), items[5].getBytes());//交通方式
            put.addColumn("INFO".getBytes(), "HOTEL".getBytes(), items[6].getBytes()); //酒店
          
            context.write(rowkey, put); 
        }
}

 

GenerateHFileMain類
public class GenerateHFileMain {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        final String INPUT_PATH= "hdfs://master:9000/INFO/Input";
        final String OUTPUT_PATH= "hdfs://master:9000/HFILE/Output";
        Configuration conf = HBaseConfiguration.create();
        Connection connection = ConnectionFactory.createConnection(conf);
        Table table = connection.getTable(TableName.valueOf("TRAVEL"));
        Job job=Job.getInstance(conf);
        job.getConfiguration().set("mapred.jar", "/home/hadoop/TravelProject/out/artifacts/Travel/Travel.jar");  //預先將程序打包再將jar分發到集群上
        job.setJarByClass(GenerateHFileMain.class);
        job.setMapperClass(GenerateHFile.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);
        job.setOutputFormatClass(HFileOutputFormat2.class);
        HFileOutputFormat2.configureIncrementalLoad(job,table,connection.getRegionLocator(TableName.valueOf("TRAVEL")))
        FileInputFormat.addInputPath(job,new Path(INPUT_PATH));
        FileOutputFormat.setOutputPath(job,new Path(OUTPUT_PATH));
        System.exit(job.waitForCompletion(true)?0:1);
    }
 
注意
1.Mapper的輸出Key類型必須是包含Rowkey的ImmutableBytesWritable格式,Value類型必須為KeyValue或Put類型,當導入的數據有多列時使用Put,只有一個列時使用KeyValue
 
2.job.setMapOutPutValueClass的值決定了job.setReduceClass的值,這里Reduce主要起到了對數據進行排序的作用,當job.setMapOutPutValueClass的值Put.class和KeyValue.class分別對應job.setReduceClass的PutSortReducer和KeyValueSortReducer
 
3.在創建表時對表進行預分區再結合MapReduce的並行計算機制能加快HFile文件的生成,如果對表進行了預分區(Region)就設置Reduce數等於分區數(Region)
 
4.在多列族的情況下需要進行多次的context.write
 
 
2.通過BlukLoad方式加載HFile文件
public class LoadIncrementalHFileToHBase {
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionFactory.createConnection(conf);
        Admin admin = connection.getAdmin();
        Table table = connection.getTable(TableName.valueOf("TRAVEL"));
        LoadIncrementalHFiles load = new LoadIncrementalHFiles(conf);
        load.doBulkLoad(new Path("hdfs://master:9000/HFILE/OutPut"), admin,table,connection.getRegionLocator(TableName.valueOf("TRAVEL")));
    }
}
 
由於BulkLoad是繞過了Write to WAL,Write to MemStore及Flush to disk的過程,所以並不能通過WAL來進行一些復制數據的操作
 
優點:
 
1.導入過程不占用Region資源
 
2.能快速導入海量的數據
 
3.節省內存
 
 
參考文章:
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM