摘要
加載數據到HBase的方式有多種,通過HBase API導入或命令行導入或使用第三方(如sqoop)來導入或使用MR來批量導入(耗費磁盤I/O,容易在導入的過程使節點宕機),但是這些方式不是慢就是在導入的過程的占用Region資料導致效率低下,今天要講的就是利用HBase在HDFS存儲原理及MapReduce的特性來快速導入海量的數據
HBase數據在HDFS下是如何存儲的?
HBase中每張Table在根目錄(/HBase)下用一個文件夾存儲,Table名為文件夾名,在Table文件夾下每個Region同樣用一個文件夾存儲,每個Region文件夾下的每個列族也用文件夾存儲,而每個列族下存儲的就是一些HFile文件,HFile就是HBase數據在HFDS下存儲格式,其整體目錄結構如下:
/hbase/<tablename>/<encoded-regionname>/<column-family>/<filename>
HBase數據寫路徑


(圖來自Cloudera)
在put數據時會先將數據的更新操作信息和數據信息寫入WAL,在寫入到WAL后,數據就會被放到MemStore中,當MemStore滿后數據就會被flush到磁盤(即形成HFile文件),在這過程涉及到的flush,split,compaction等操作都容易造成節點不穩定,數據導入慢,耗費資源等問題,在海量數據的導入過程極大的消耗了系統性能,
避免
這些問題最好的方法就是使用BlukLoad的方式來加載數據到HBase中。
原理
利用HBase數據按照HFile格式存儲在HDFS的原理,使用Mapreduce直接生成HFile格式文件后,RegionServers再將HFile文件移動到相應的Region目錄下
其流程如下圖:


(圖來自Cloudera)
導入過程
1.使用MapReduce生成HFile文件
GenerateHFile類
public class GenerateHFile extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>{ protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] items = line.split("\t"); String ROWKEY = items[1] + items[2] + items[3]; ImmutableBytesWritable rowkey = new ImmutableBytesWritable(ROWKEY.getBytes()); Put put = new Put(ROWKEY.getBytes()); //ROWKEY put.addColumn("INFO".getBytes(), "URL".getBytes(), items[0].getBytes()); put.addColumn("INFO".getBytes(), "SP".getBytes(), items[1].getBytes()); //出發點 put.addColumn("INFO".getBytes(), "EP".getBytes(), items[2].getBytes()); //目的地 put.addColumn("INFO".getBytes(), "ST".getBytes(), items[3].getBytes()); //出發時間 put.addColumn("INFO".getBytes(), "PRICE".getBytes(), Bytes.toBytes(Integer.valueOf(items[4]))); //價格 put.addColumn("INFO".getBytes(), "TRAFFIC".getBytes(), items[5].getBytes());//交通方式 put.addColumn("INFO".getBytes(), "HOTEL".getBytes(), items[6].getBytes()); //酒店 context.write(rowkey, put); } }
GenerateHFileMain類
public class GenerateHFileMain { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { final String INPUT_PATH= "hdfs://master:9000/INFO/Input"; final String OUTPUT_PATH= "hdfs://master:9000/HFILE/Output"; Configuration conf = HBaseConfiguration.create(); Connection connection = ConnectionFactory.createConnection(conf); Table table = connection.getTable(TableName.valueOf("TRAVEL")); Job job=Job.getInstance(conf); job.getConfiguration().set("mapred.jar", "/home/hadoop/TravelProject/out/artifacts/Travel/Travel.jar"); //預先將程序打包再將jar分發到集群上 job.setJarByClass(GenerateHFileMain.class); job.setMapperClass(GenerateHFile.class); job.setMapOutputKeyClass(ImmutableBytesWritable.class); job.setMapOutputValueClass(Put.class); job.setOutputFormatClass(HFileOutputFormat2.class); HFileOutputFormat2.configureIncrementalLoad(job,table,connection.getRegionLocator(TableName.valueOf("TRAVEL"))) FileInputFormat.addInputPath(job,new Path(INPUT_PATH)); FileOutputFormat.setOutputPath(job,new Path(OUTPUT_PATH)); System.exit(job.waitForCompletion(true)?0:1); }
注意
1.Mapper的輸出Key類型必須是包含Rowkey的ImmutableBytesWritable格式,Value類型必須為KeyValue或Put類型,當導入的數據有多列時使用Put,只有一個列時使用KeyValue
2.job.setMapOutPutValueClass的值決定了job.setReduceClass的值,這里Reduce主要起到了對數據進行排序的作用,當job.setMapOutPutValueClass的值Put.class和KeyValue.class分別對應job.setReduceClass的PutSortReducer和KeyValueSortReducer
3.在創建表時對表進行預分區再結合MapReduce的並行計算機制能加快HFile文件的生成,如果對表進行了預分區(Region)就設置Reduce數等於分區數(Region)
4.在多列族的情況下需要進行多次的context.write
2.通過BlukLoad方式加載HFile文件
public class LoadIncrementalHFileToHBase { public static void main(String[] args) throws Exception { Connection connection = ConnectionFactory.createConnection(conf); Admin admin = connection.getAdmin(); Table table = connection.getTable(TableName.valueOf("TRAVEL")); LoadIncrementalHFiles load = new LoadIncrementalHFiles(conf); load.doBulkLoad(new Path("hdfs://master:9000/HFILE/OutPut"), admin,table,connection.getRegionLocator(TableName.valueOf("TRAVEL"))); } }
由於BulkLoad是繞過了Write to WAL,Write to MemStore及Flush to disk的過程,所以並不能通過WAL來進行一些復制數據的操作
優點:
1.導入過程不占用Region資源
2.能快速導入海量的數據
3.節省內存
參考文章: