需要分別從Oracle和文本文件往HBase中導入數據,這里介紹幾種數據導入方案。
1.使用importTSV導入HBase
importTSV支持增量導入。新數據插入,已存在數據則修改。
1.1.首先將待導入文本test_import.txt放到hdfs集群
文本格式如下(從網上找的虛擬話單數據)。逗號分隔,共13個字段,其中第1個字段作為rowkey。
1,12026546272,2013/10/19,20:52,33分18秒,被叫,13727310234,北京市,省際,0,32.28,0.4,全球通商旅88套餐
2,12026546272,2013/10/19,20:23,33分18秒,被叫,13727310234,北京市,省際,0,32.28,0.4,全球通商旅88套餐
3,16072996404,2013/10/19,20:52,10分52秒,主叫,19271253211,北京市,省際,0,2.8,1.9,全球通商旅88套餐
4,10023895821,2013/10/19,20:52,09分20秒,被叫,15115468122,綿陽市,省內,0,45.91,5.26,全球通商旅88套餐
5,13381653644,2013/10/19,20:53,06分00秒,被叫,10991482287,北京市,省際,0,54.79,7.16,全球通商旅88套餐
6,18695195919,2013/10/19,21:37,27分00秒,主叫,14858652217,綿陽市,省內,0,36.27,6.68,全球通商旅88套餐
7,11396010469,2013/10/19,21:37,27分02秒,主叫,12939968466,綿陽市,省內,0,65.63,4.45,全球通商旅88套餐
8,15109754362,2013/10/19,21:37,05分00秒,被叫,14240771580,綿陽市,省內,0,66.86,5.75,全球通商旅88套餐
9,13845944798,2013/10/19,21:37,13分50秒,被叫,13648619896,廣州市,省際,0,60.71,3.39,全球通商旅88套餐
10,17883953443,2013/10/19,21:38,37分54秒,被叫,10110778698,廣州市,省際,0,55.14,1.45,全球通商旅88套餐
11,19643495044,2013/10/19,21:38,49分34秒,主叫,14581482419,廣州市,省際,0,16.84,1.36,全球通商旅88套餐
1.2.在HBase中創建表:create ‘test_import’, ‘cf’
1.3.使用importTSV導入
執行命令:
$hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.separator="," -Dimporttsv.columns=HBASE_ROW_KEY,cf:field1,cf:field2,cf:field3,cf:field4,cf:field5,cf:field6,cf:field7,cf:field8,cf:field9,cf:field10,cf:field11,cf:field12 test_import /test_import.txt
其中:
-Dimporttsv.separator指定分隔符,只支持單字節分隔符。
-Dimporttsv.columns指定導入的列。HBASE_ROW_KEY是關鍵字,導入數據時必須指定rowkey。
其它可選參數:
執行后自動提交MapReduce任務進行導入:
1.4.用scan命令查看hbase中test_import表的內容
一共11條記錄。
1.5.使用get查看記錄
2.使用importTSV+bulkload導入HBase
先使用importTSV生成HFile文件,再使用bulkload導入HBase。
這種方式對RegionServer更友好一些,加載數據幾乎不占用RegionServer的計算資源,只是在HDFS上移動HFile文件,然后通過HMaster將該RegionServer的一個或多個Region上線。
2.1.生成HFile文件
使用importTSV生成HFile文件。與上一中方法略有不同的是,在importTSV執行時指定-Dimporttsv.bulk.output參數,則是生成HFile文件到指定文件,而不會直接導入HBase表。
$hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.separator="," -Dimporttsv.columns=HBASE_ROW_KEY,cf:field1,cf:field2,cf:field3,cf:field4,cf:field5,cf:field6,cf:field7,cf:field8,cf:field9,cf:field10,cf:field11,cf:field12 -Dimporttsv.bulk.output=/test_import_outputdir/ test_import /test_import.txt
查看/test_import_outputdir/數據:
2.2.導入HBase表
使用bulkload導入HBase表。執行速度非常快。
執行命令:
$hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /test_import_outputdir/ test_import
2.3.使用get查看記錄
3.支持多字節分隔符導入HBase
importTSV只支持單字節分隔符,不支持多字節分隔符,比如“|@|”。如果要實現多字節分隔符,需要自己編寫MapReduce作業生成HFile文件,或者導入HBase。
3.1.數據格式
待導入數據如下。|@|作為分隔符,第1個字段是主鍵。
1|@|12026546272|@|2013/10/19|@|20:52|@|33分18秒|@|被叫|@|13727310234|@|北京市|@|省際|@|0|@|32.28|@|0.4|@|全球通商旅88套餐
2|@|12026546272|@|2013/10/19|@|20:23|@|33分18秒|@|被叫|@|13727310234|@|北京市|@|省際|@|0|@|32.28|@|0.4|@|全球通商旅88套餐
......
......
......
3.2.編寫Mapper生成HFile
我們只要使用1個Mapper生成HFile即可,不需要額外寫Reducer。示例代碼如下。
public class HfileGenMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> { protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue>.Context context) throws IOException, InterruptedException { // 第一個字段作為rowkey ImmutableBytesWritable rowkey = new ImmutableBytesWritable( value.toString().split("\\|@\\|")[0].getBytes()); List<KeyValue> list = createKeyValue(value.toString()); Iterator<KeyValue> itor = list.iterator(); while (itor.hasNext()) { KeyValue kv = itor.next(); if (kv != null) { context.write(rowkey, kv); } } } // 解析一行記錄,得到多個KeyValue對象。 private List<KeyValue> createKeyValue(String line) { List<KeyValue> list = new ArrayList<KeyValue>(); String[] fields = line.split("\\|@\\|"); String rowkey = fields[0]; String columnFamily = "cf"; for (int i = 1; i < fields.length; i++) { String qualifyName = "field" + String.valueOf(i); String value = fields[i]; KeyValue kv = new KeyValue(rowkey.getBytes(), columnFamily.getBytes(), qualifyName.getBytes(), System.currentTimeMillis(), value.getBytes()); list.add(kv); } return list; } }
3.3.創建作業實例
創建job實例,填寫相關配置。指定輸入輸出數據,設置Mapper和Reducer,並提交作業。
public class BulkLoadHFileJob extends Configured implements Tool { public static void main(String[] args) throws Exception { int status = ToolRunner.run(new BulkLoadHFileJob(), args); System.exit(status); } @Override public int run(String[] args) throws Exception { this.setConf(HBaseConfiguration.create(this.getConf())); getConf().set("hbase.zookeeper.property.clientPort", "2181"); getConf().set("hbase.zookeeper.quorum", "W122PC04VM07,W122PC05VM07,W122PC06VM07"); String inputPath = "hdfs://cluster1/test_import2.txt"; String outputPath = "hdfs://cluster1/test_import2_outputdir"; Job job = Job.getInstance(getConf(), "Hdfs2HFile test"); job.setJarByClass(BulkLoadHFileJob.class); job.setInputFormatClass(TextInputFormat.class); job.setMapperClass(HfileGenMapper.class); job.setReducerClass(KeyValueSortReducer.class); job.setMapOutputKeyClass(ImmutableBytesWritable.class); job.setMapOutputValueClass(KeyValue.class); job.setOutputFormatClass(HFileOutputFormat2.class); job.setPartitionerClass(SimpleTotalOrderPartitioner.class); FileInputFormat.addInputPath(job, new Path(inputPath)); FileOutputFormat.setOutputPath(job, new Path(outputPath)); Connection connection = ConnectionFactory.createConnection(getConf()); TableName tableName = TableName.valueOf("table_test"); HFileOutputFormat2.configureIncrementalLoad(job, connection.getTable(tableName), connection.getRegionLocator(tableName)); return job.waitForCompletion(true) ? 0 : 1; } }
3.4.提交作業
使用hadoop客戶端提交作業。
hadoop jar ./hbasetest.jar com.hbase.test.hdfs2hbase.BulkLoadHFileJob
3.5.導入HBase
使用bulkload將生成的HFile導入HBase,速度非常快。具體操作步驟參考2.2節。