HBase BulkLoad批量寫入數據實戰


1.概述

在進行數據傳輸中,批量加載數據到HBase集群有多種方式,比如通過HBase API進行批量寫入數據、使用Sqoop工具批量導數到HBase集群、使用MapReduce批量導入等。這些方式,在導入數據的過程中,如果數據量過大,可能耗時會比較嚴重或者占用HBase集群資源較多(如磁盤IO、HBase Handler數等)。今天這篇博客筆者將為大家分享使用HBase BulkLoad的方式來進行海量數據批量寫入到HBase集群。

2.內容

在使用BulkLoad之前,我們先來了解一下HBase的存儲機制。HBase存儲數據其底層使用的是HDFS來作為存儲介質,HBase的每一張表對應的HDFS目錄上的一個文件夾,文件夾名以HBase表進行命名(如果沒有使用命名空間,則默認在default目錄下),在表文件夾下存放在若干個Region命名的文件夾,Region文件夾中的每個列簇也是用文件夾進行存儲的,每個列簇中存儲就是實際的數據,以HFile的形式存在。路徑格式如下:

/hbase/data/default/<tbl_name>/<region_id>/<cf>/<hfile_id>

2.1 實現原理

按照HBase存儲數據按照HFile格式存儲在HDFS的原理,使用MapReduce直接生成HFile格式的數據文件,然后在通過RegionServer將HFile數據文件移動到相應的Region上去。流程如下圖所示:

2.2. 生成HFile文件

HFile文件的生成,可以使用MapReduce來進行實現,將數據源准備好,上傳到HDFS進行存儲,然后在程序中讀取HDFS上的數據源,進行自定義封裝,組裝RowKey,然后將封裝后的數據在回寫到HDFS上,以HFile的形式存儲到HDFS指定的目錄中。實現代碼如下:

 /**
  * Read DataSource from hdfs & Gemerator hfile.
  * 
  * @author smartloli.
  *
  *         Created by Aug 19, 2018
  */public class GemeratorHFile2 {
     static class HFileImportMapper2 extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {

        protected final String CF_KQ = "cf";

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            System.out.println("line : " + line);
            String[] datas = line.split(" ");
            String row = new Date().getTime() + "_" + datas[1];
            ImmutableBytesWritable rowkey = new ImmutableBytesWritable(Bytes.toBytes(row));
            KeyValue kv = new KeyValue(Bytes.toBytes(row), this.CF_KQ.getBytes(), datas[1].getBytes(), datas[2].getBytes());
            context.write(rowkey, kv);
        }
    }

    public static void main(String[] args) {
        if (args.length != 1) {
            System.out.println("<Usage>Please input hbase-site.xml path.</Usage>");
            return;
        }
        Configuration conf = new Configuration();
        conf.addResource(new Path(args[0]));
        conf.set("hbase.fs.tmp.dir", "partitions_" + UUID.randomUUID());
        String tableName = "person";
        String input = "hdfs://nna:9000/tmp/person.txt";
        String output = "hdfs://nna:9000/tmp/pres";
        System.out.println("table : " + tableName);
        HTable table;
        try {
            try {
                FileSystem fs = FileSystem.get(URI.create(output), conf);
                fs.delete(new Path(output), true);
                fs.close();
            } catch (IOException e1) {
                e1.printStackTrace();
            }

            Connection conn = ConnectionFactory.createConnection(conf);
            table = (HTable) conn.getTable(TableName.valueOf(tableName));
            Job job = Job.getInstance(conf);
            job.setJobName("Generate HFile");

            job.setJarByClass(GemeratorHFile2.class);
            job.setInputFormatClass(TextInputFormat.class);
            job.setMapperClass(HFileImportMapper2.class);
            FileInputFormat.setInputPaths(job, input);
            FileOutputFormat.setOutputPath(job, new Path(output));

            HFileOutputFormat2.configureIncrementalLoad(job, table);
            try {
                job.waitForCompletion(true);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ClassNotFoundException e) {
                e.printStackTrace();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

在HDFS目錄/tmp/person.txt中,准備數據源如下:

11 smartloli 100
22 smartloli2 101
33 smartloli3 102

然后,將上述代碼編譯打包成jar,上傳到Hadoop集群進行執行,執行命令如下:

hadoop jar GemeratorHFile2.jar /data/soft/new/apps/hbaseapp/hbase-site.xml

如果在執行命令的過程中,出現找不到類的異常信息,可能是本地沒有加載HBase依賴JAR包,在當前用戶中配置如下環境變量信息:

export HADOOP_CLASSPATH=$HBASE_HOME/lib/*:classpath

然后,執行source命令使配置的內容立即生生效。

2.3. 執行預覽

在成功提交任務后,Linux控制台會打印執行任務進度,也可以到YARN的資源監控界面查看執行進度,結果如下所示:

等待任務的執行,執行完成后,在對應HDFS路徑上會生成相應的HFile數據文件,如下圖所示:

2.4 使用BulkLoad導入到HBase

然后,在使用BulkLoad的方式將生成的HFile文件導入到HBase集群中,這里有2種方式。一種是寫代碼實現導入,另一種是使用HBase命令進行導入。

2.4.1 代碼實現導入

通過LoadIncrementalHFiles類來實現導入,具體代碼如下:

/**
 * Use BulkLoad inport hfile from hdfs to hbase.
 * 
 * @author smartloli.
 *
 * Created by Aug 19, 2018
 */public class BulkLoad2HBase {
 
    public static void main(String[] args) throws Exception {
        if (args.length != 1) {
            System.out.println("<Usage>Please input hbase-site.xml path.</Usage>");
            return;
        }
        String output = "hdfs://cluster1/tmp/pres";
        Configuration conf = new Configuration();
        conf.addResource(new Path(args[0]));
        HTable table = new HTable(conf, "person");
        LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
        loader.doBulkLoad(new Path(output), table);
    }

}

執行上述代碼,運行結果如下:

BulkLoad源碼過程簡述

程序中調用了LoadIncrementalHFiles的doBulkLoad方法進行HFile的移動。其主要流程如下:
1、初始化一個線程池,設置線程的最大數量
2、根據參數獲取是否對HFile的格式進行驗證
3、初始化一個queue,然后遍歷MapReduce輸出的目錄下的所有HFIles文件,為每一個HFile包裝一個LoadQueueItem,並加入到queue中
4、檢查是否有非法的列簇名
5、遍歷隊列,嘗試將HFie加載到一個region中,如果失敗,它將返回需要重試的HFie列表。如果成功,它將返回一個空列表,整個過程是原子性的。
6、從RegionServer中獲取到Region的名稱后,檢查是否可以安全的使用BulkLoad。如果為False,則使用ProtobufUtil的bulkLoadHFile。否則將使用SecureBulkLoadClient的bulkLoadHFile,將HFile Load到HBase目錄下面。
7、如果HFile的BulkLoad失敗了,將會嘗試將失敗的HFile將重新移回原來的位置。

其中需要注意的有:
1、當HFile的數量極大時,檢查HFile的格式將會成為最耗時的階段。可以通過設置hbase.loadincremental.validate.hfile來決定是否對HFile的格式進行檢查(可見HBASE-13985)
2、BulkLoad階段中,采用Callable和Future實現並發,一但BulkLoad失敗,HFile需要重新排隊,然后重試。重試次數可以通過hbase.client.retries.number進行設置,HBase1.2.5中默認為31次。
3、BulkLoad過程結束后,會發現MapReduce輸出目錄下的HFile文件都被移走了,說明全部的HFile都導入成功。如果想要試驗的話,可以先備份一下,免得再跑一邊MapReduce。

Load階段為什么這么慢

1、在Load階段階段中,如果HFile文件過多,會觸發hBase的compact和split操作。因此BulkLoad只是繞過了數據Put到Memstore和MemStoreFlush這個階段。
2、當HFile的數量極大時,檢查HFile的格式將會成為最耗時的階段,可以設置不檢查。

Bulk load的使用還是需要看場景,對於股市數據來說,使用Bulk load的導入效率可能沒有直接寫來得更快,但是其不占用 Region 資源和大量的IO資源,基本上不影響其它業務的運行,還是可以忍受的。

2.4.2 使用HBase命令進行導入

先將生成好的HFile文件遷移到目標集群(即HBase集群所在的HDFS上),然后在使用HBase命令進行導入,執行命令如下:

# 先使用distcp遷移hfile
hadoop distcp -Dmapreduce.job.queuename=queue_1024_01 -update -skipcrccheck -m 10 /tmp/pres hdfs://nns:9000/tmp/pres

# 使用bulkload方式導入數據
hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /tmp/pres person

最后,我們可以到指定的RegionServer節點上查看導入的日志信息,如下所示為導入成功的日志信息:

12018-08-19 16:30:34,969 INFO  [B.defaultRpcServer.handler=7,queue=1,port=16020] regionserver.HStore: Successfully loaded store file hdfs://cluster1/tmp/pres/cf/7b455535f660444695589edf509935e9 into store cf (new location: hdfs://cluster1/hbase/data/default/person/2d7483d4abd6d20acdf16533a3fdf18f/cf/d72c8846327d42e2a00780ac2facf95b_SeqId_4_)

 

2.5 驗證

使用BulkLoad方式導入數據后,可以進入到HBase集群,使用HBase Shell來查看數據是否導入成功,預覽結果如下:

3.總結

本篇博客為了演示實戰效果,將生成HFile文件和使用BulkLoad方式導入HFile到HBase集群的步驟進行了分解,實際情況中,可以將這兩個步驟合並為一個,實現自動化生成與HFile自動導入。如果在執行的過程中出現RpcRetryingCaller的異常,可以到對應RegionServer節點查看日志信息,這里面記錄了出現這種異常的詳細原因。

https://mp.weixin.qq.com/s/0Eej-xzBVq3_Vw1y4tA4Dw


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM