Bulk Load-HBase數據導入最佳實踐


一、概述

HBase本身提供了非常多種數據導入的方式,通常有兩種經常使用方式:

1、使用HBase提供的TableOutputFormat,原理是通過一個Mapreduce作業將數據導入HBase

2、還有一種方式就是使用HBase原生Client API

這兩種方式因為須要頻繁的與數據所存儲的RegionServer通信。一次性入庫大量數據時,特別占用資源,所以都不是最有效的。了解過HBase底層原理的應該都知道,HBase在HDFS中是以HFile文件結構存儲的,一個比較高效便捷的方法就是使用 “Bulk Loading”方法直接生成HFile,即HBase提供的HFileOutputFormat類。


二、Bulk Load基本原理

Bulk Load處理由兩個主要步驟組成

1、准備數據文件

Bulk Load的第一步。會執行一個Mapreduce作業,當中使用到了HFileOutputFormat輸出HBase數據文件:StoreFile。

HFileOutputFormat的作用在於使得輸出的HFile文件能夠適應單個region。使用TotalOrderPartitioner類將map輸出結果分區到各個不同的key區間中,每一個key區間都相應着HBase表的region。

2、導入HBase表

第二步使用completebulkload工具將第一步的結果文件依次交給負責文件相應region的RegionServer,並將文件move到region在HDFS上的存儲文件夾中。一旦完畢。將數據開放給clients。

假設在bulk load准備導入或在准備導入與完畢導入的臨界點上發現region的邊界已經改變,completebulkload工具會自己主動split數據文件到新的邊界上。可是這個過程並非最佳實踐,所以用戶在使用時須要最小化准備導入與導入集群間的延時,特別是當其它client在同一時候使用其它工具向同一張表導入數據。


注意:

bulk load的completebulkload步驟。就是簡單的將importtsv或HFileOutputFormat的結果文件導入到某張表中。使用類似下面命令

hadoop jar hbase-VERSION.jar completebulkload [-c /path/to/hbase/config/hbase-site.xml] /user/todd/myoutput mytable

命令會非常快運行完畢。將/user/todd/myoutput下的HFile文件導入到mytable表中。注意:假設目標表不存在。工具會自己主動創建表。


三、生成HFile程序說明:

1、終於輸出結果。不管是map還是reduce,輸出部分key和value的類型必須是: < ImmutableBytesWritable, KeyValue>或者< ImmutableBytesWritable, Put>。
2、終於輸出部分,Value類型是KeyValue 或Put。相應的Sorter各自是KeyValueSortReducer或PutSortReducer。
3、MR樣例中job.setOutputFormatClass(HFileOutputFormat.class); HFileOutputFormat僅僅適合一次對單列族組織成HFile文件。


4、MR樣例中HFileOutputFormat.configureIncrementalLoad(job, table);自己主動對job進行配置。SimpleTotalOrderPartitioner是須要先對key進行總體排序,然后划分到每個reduce中,保證每個reducer中的的key最小最大值區間范圍,是不會有交集的。由於入庫到HBase的時候,作為一個總體的Region,key是絕對有序的。
5、MR樣例中最后生成HFile存儲在HDFS上。輸出路徑下的子文件夾是各個列族。假設對HFile進行入庫HBase。相當於move HFile到HBase的Region中。HFile子文件夾的列族內容沒有了。


四、演示樣例

1、創建表

create 'hfiletable','fm1','fm2'

2、准備原始數據

key1	fm1:col1	value1
key1	fm1:col2	value2
key1	fm2:col1	value3
key4	fm1:col1	value4

3、導入HBase MR

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FsShell;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

public class BulkLoadJob {
    static Logger logger = LoggerFactory.getLogger(BulkLoadJob.class);

    public static class BulkLoadMap extends
            Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            String[] valueStrSplit = value.toString().split("\t");
            String hkey = valueStrSplit[0];
            String family = valueStrSplit[1].split(":")[0];
            String column = valueStrSplit[1].split(":")[1];
            String hvalue = valueStrSplit[2];
            final byte[] rowKey = Bytes.toBytes(hkey);
            final ImmutableBytesWritable HKey = new ImmutableBytesWritable(rowKey);
            Put HPut = new Put(rowKey);
            byte[] cell = Bytes.toBytes(hvalue);
            HPut.add(Bytes.toBytes(family), Bytes.toBytes(column), cell);
            context.write(HKey, HPut);

        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        String inputPath = args[0];
        String outputPath = args[1];
        HTable hTable = null;
        try {
            Job job = Job.getInstance(conf, "ExampleRead");
            job.setJarByClass(BulkLoadJob.class);
            job.setMapperClass(BulkLoadJob.BulkLoadMap.class);
            job.setMapOutputKeyClass(ImmutableBytesWritable.class);
            job.setMapOutputValueClass(Put.class);
            // speculation
            job.setSpeculativeExecution(false);
            job.setReduceSpeculativeExecution(false);
            // in/out format
			job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(HFileOutputFormat2.class);

            FileInputFormat.setInputPaths(job, inputPath);
            FileOutputFormat.setOutputPath(job, new Path(outputPath));

            hTable = new HTable(conf, args[2]);
            HFileOutputFormat2.configureIncrementalLoad(job, hTable);

            if (job.waitForCompletion(true)) {
                FsShell shell = new FsShell(conf);
                try {
                    shell.run(new String[]{"-chmod", "-R", "777", args[1]});
                } catch (Exception e) {
                    logger.error("Couldnt change the file permissions ", e);
                    throw new IOException(e);
                }
                //載入到hbase表
                LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
                loader.doBulkLoad(new Path(outputPath), hTable);
            } else {
                logger.error("loading failed.");
                System.exit(1);
            }

        } catch (IllegalArgumentException e) {
            e.printStackTrace();
        } finally {
            if (hTable != null) {
                hTable.close();
            }
        }
    }
}

4、查看數據

hbase(main):003:0> scan 'hfiletable'
ROW                                                   COLUMN+CELL                                                                                                                                                  
 key2                                                 column=fm1:col1, timestamp=1437794332921, value=value1                                                                                                       
 key2                                                 column=fm1:col2, timestamp=1437794332921, value=value2                                                                                                       
 key2                                                 column=fm2:col1, timestamp=1437794332921, value=value3                                                                                                       
 key3                                                 column=fm1:col1, timestamp=1437794332921, value=value4                                                                                                       
2 row(s) in 0.1910 seconds

五、總結

盡管importtsv工具使用與大多數場景,用戶有時希望自己編程生成數據,或以其它格式導入數據,比方importtsv須要在導入前確定每條數據column維度,一旦我們的數據的維度是依據數據內容本身的。importtsv就無法滿足需求。這時就須要對工具改造。能夠查看ImportTsv.java和HFileOutputFormat的javaDoc。

completebulkload相同能夠編程化實現,能夠查看LoadIncrementalHFiles類。








免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM