一、概述
HBase本身提供了非常多種數據導入的方式,通常有兩種經常使用方式:
1、使用HBase提供的TableOutputFormat,原理是通過一個Mapreduce作業將數據導入HBase
2、還有一種方式就是使用HBase原生Client API
這兩種方式因為須要頻繁的與數據所存儲的RegionServer通信。一次性入庫大量數據時,特別占用資源,所以都不是最有效的。了解過HBase底層原理的應該都知道,HBase在HDFS中是以HFile文件結構存儲的,一個比較高效便捷的方法就是使用 “Bulk Loading”方法直接生成HFile,即HBase提供的HFileOutputFormat類。
二、Bulk Load基本原理
Bulk Load處理由兩個主要步驟組成
1、准備數據文件
Bulk Load的第一步。會執行一個Mapreduce作業,當中使用到了HFileOutputFormat輸出HBase數據文件:StoreFile。
HFileOutputFormat的作用在於使得輸出的HFile文件能夠適應單個region。使用TotalOrderPartitioner類將map輸出結果分區到各個不同的key區間中,每一個key區間都相應着HBase表的region。
2、導入HBase表
第二步使用completebulkload工具將第一步的結果文件依次交給負責文件相應region的RegionServer,並將文件move到region在HDFS上的存儲文件夾中。一旦完畢。將數據開放給clients。
假設在bulk load准備導入或在准備導入與完畢導入的臨界點上發現region的邊界已經改變,completebulkload工具會自己主動split數據文件到新的邊界上。可是這個過程並非最佳實踐,所以用戶在使用時須要最小化准備導入與導入集群間的延時,特別是當其它client在同一時候使用其它工具向同一張表導入數據。
注意:
bulk load的completebulkload步驟。就是簡單的將importtsv或HFileOutputFormat的結果文件導入到某張表中。使用類似下面命令
hadoop jar hbase-VERSION.jar completebulkload [-c /path/to/hbase/config/hbase-site.xml] /user/todd/myoutput mytable
命令會非常快運行完畢。將/user/todd/myoutput下的HFile文件導入到mytable表中。注意:假設目標表不存在。工具會自己主動創建表。
三、生成HFile程序說明:
1、終於輸出結果。不管是map還是reduce,輸出部分key和value的類型必須是: < ImmutableBytesWritable, KeyValue>或者< ImmutableBytesWritable, Put>。2、終於輸出部分,Value類型是KeyValue 或Put。相應的Sorter各自是KeyValueSortReducer或PutSortReducer。
3、MR樣例中job.setOutputFormatClass(HFileOutputFormat.class); HFileOutputFormat僅僅適合一次對單列族組織成HFile文件。
4、MR樣例中HFileOutputFormat.configureIncrementalLoad(job, table);自己主動對job進行配置。SimpleTotalOrderPartitioner是須要先對key進行總體排序,然后划分到每個reduce中,保證每個reducer中的的key最小最大值區間范圍,是不會有交集的。由於入庫到HBase的時候,作為一個總體的Region,key是絕對有序的。
5、MR樣例中最后生成HFile存儲在HDFS上。輸出路徑下的子文件夾是各個列族。假設對HFile進行入庫HBase。相當於move HFile到HBase的Region中。HFile子文件夾的列族內容沒有了。
四、演示樣例
1、創建表
create 'hfiletable','fm1','fm2'
2、准備原始數據
key1 fm1:col1 value1 key1 fm1:col2 value2 key1 fm2:col1 value3 key4 fm1:col1 value4
3、導入HBase MR
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FsShell; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2; import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; public class BulkLoadJob { static Logger logger = LoggerFactory.getLogger(BulkLoadJob.class); public static class BulkLoadMap extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> { public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] valueStrSplit = value.toString().split("\t"); String hkey = valueStrSplit[0]; String family = valueStrSplit[1].split(":")[0]; String column = valueStrSplit[1].split(":")[1]; String hvalue = valueStrSplit[2]; final byte[] rowKey = Bytes.toBytes(hkey); final ImmutableBytesWritable HKey = new ImmutableBytesWritable(rowKey); Put HPut = new Put(rowKey); byte[] cell = Bytes.toBytes(hvalue); HPut.add(Bytes.toBytes(family), Bytes.toBytes(column), cell); context.write(HKey, HPut); } } public static void main(String[] args) throws Exception { Configuration conf = HBaseConfiguration.create(); String inputPath = args[0]; String outputPath = args[1]; HTable hTable = null; try { Job job = Job.getInstance(conf, "ExampleRead"); job.setJarByClass(BulkLoadJob.class); job.setMapperClass(BulkLoadJob.BulkLoadMap.class); job.setMapOutputKeyClass(ImmutableBytesWritable.class); job.setMapOutputValueClass(Put.class); // speculation job.setSpeculativeExecution(false); job.setReduceSpeculativeExecution(false); // in/out format job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(HFileOutputFormat2.class); FileInputFormat.setInputPaths(job, inputPath); FileOutputFormat.setOutputPath(job, new Path(outputPath)); hTable = new HTable(conf, args[2]); HFileOutputFormat2.configureIncrementalLoad(job, hTable); if (job.waitForCompletion(true)) { FsShell shell = new FsShell(conf); try { shell.run(new String[]{"-chmod", "-R", "777", args[1]}); } catch (Exception e) { logger.error("Couldnt change the file permissions ", e); throw new IOException(e); } //載入到hbase表 LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf); loader.doBulkLoad(new Path(outputPath), hTable); } else { logger.error("loading failed."); System.exit(1); } } catch (IllegalArgumentException e) { e.printStackTrace(); } finally { if (hTable != null) { hTable.close(); } } } }
4、查看數據
hbase(main):003:0> scan 'hfiletable' ROW COLUMN+CELL key2 column=fm1:col1, timestamp=1437794332921, value=value1 key2 column=fm1:col2, timestamp=1437794332921, value=value2 key2 column=fm2:col1, timestamp=1437794332921, value=value3 key3 column=fm1:col1, timestamp=1437794332921, value=value4 2 row(s) in 0.1910 seconds
五、總結
盡管importtsv工具使用與大多數場景,用戶有時希望自己編程生成數據,或以其它格式導入數據,比方importtsv須要在導入前確定每條數據column維度,一旦我們的數據的維度是依據數據內容本身的。importtsv就無法滿足需求。這時就須要對工具改造。能夠查看ImportTsv.java和HFileOutputFormat的javaDoc。
completebulkload相同能夠編程化實現,能夠查看LoadIncrementalHFiles類。