一、概述
HBase官方提供了基於Mapreduce的批量數據導入工具:Bulk load和ImportTsv。關於Bulk load大家能夠看下我還有一篇博文。
通常HBase用戶會使用HBase API導數,可是假設一次性導入大批量數據,可能占用大量Regionserver資源,影響存儲在該Regionserver上其它表的查詢,本文將會從源代碼上解析ImportTsv數據導入工具。探究怎樣高效導入數據到HBase。
二、ImportTsv介紹
ImportTsv是Hbase提供的一個命令行工具。能夠將存儲在HDFS上的自己定義分隔符(默認\t)的數據文件。通過一條命令方便的導入到HBase表中,對於大數據量導入很有用,當中包括兩種方式將數據導入到HBase表中:
第一種是使用TableOutputformat在reduce中插入數據;
另外一種是先生成HFile格式的文件,再運行一個叫做CompleteBulkLoad的命令,將文件move到HBase表空間文件夾下。同一時候提供給client查詢。
三、源代碼解析
本文基於CDH5 HBase0.98.1,ImportTsv的入口類是org.apache.hadoop.hbase.mapreduce.ImportTsv
String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY); String columns[] = conf.getStrings(COLUMNS_CONF_KEY); if (hfileOutPath != null) { if (!admin.tableExists(tableName)) { LOG.warn(format("Table '%s' does not exist.", tableName)); // TODO: this is backwards. Instead of depending on the existence of a table, // create a sane splits file for HFileOutputFormat based on data sampling. createTable(admin, tableName, columns); } HTable table = new HTable(conf, tableName); job.setReducerClass(PutSortReducer.class); Path outputDir = new Path(hfileOutPath); FileOutputFormat.setOutputPath(job, outputDir); job.setMapOutputKeyClass(ImmutableBytesWritable.class); if (mapperClass.equals(TsvImporterTextMapper.class)) { job.setMapOutputValueClass(Text.class); job.setReducerClass(TextSortReducer.class); } else { job.setMapOutputValueClass(Put.class); job.setCombinerClass(PutCombiner.class); } HFileOutputFormat.configureIncrementalLoad(job, table); } else { if (mapperClass.equals(TsvImporterTextMapper.class)) { usage(TsvImporterTextMapper.class.toString() + " should not be used for non bulkloading case. use " + TsvImporterMapper.class.toString() + " or custom mapper whose value type is Put."); System.exit(-1); } // No reducers. Just write straight to table. Call initTableReducerJob // to set up the TableOutputFormat. TableMapReduceUtil.initTableReducerJob(tableName, null, job); job.setNumReduceTasks(0); }
從ImportTsv.createSubmittableJob方法中推斷參數BULK_OUTPUT_CONF_KEY開始。這步直接影響ImportTsv的Mapreduce作業終於以哪種方式入HBase庫
假設不為空而且用戶沒有自己定義Mapper實現類(參數importtsv.mapper.class)時,則使用PutSortReducer,當中會對Put排序,假設每行記錄有非常多column,則會占用Reducer大量的內存資源進行排序。
Configuration conf = job.getConfiguration(); HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf)); job.setOutputFormatClass(TableOutputFormat.class);假設為空,調用TableMapReduceUtil.initTableReducerJob初始化TableOutputformat的Reducer輸出。此方式不須要使用Reducer,由於直接在mapper的Outputformat中會批量的調用Put API將數據提交到Regionserver上(相當於並行的運行HBase Put API)
四、實戰
1、使用TableOutputformat的Put API上傳數據,非bulk-loading
$ bin/hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.columns=a,b,c <tablename> <hdfs-inputdir>2、使用bulk-loading生成StoreFiles(HFile)
step1、生成Hfile
$ bin/hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.columns=a,b,c -Dimporttsv.bulk.output=hdfs://storefile-outputdir <tablename> <hdfs-data-inputdir>step2、完畢導入
$ bin/hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles <hdfs://storefileoutput> <tablename>
五、總結
在使用ImportTsv時。一定要注意參數importtsv.bulk.output的配置,通常來說使用Bulk output的方式對Regionserver來說更加友好一些,這樣的方式載入數據差點兒不占用Regionserver的計算資源。由於僅僅是在HDFS上移動了HFile文件,然后通知HMaster將該Regionserver的一個或多個region上線。