[原創]HBase學習筆記(4)- 數據導入


需要分別從Oracle和文本文件往HBase中導入數據,這里介紹幾種數據導入方案。

1.使用importTSV導入HBase

importTSV支持增量導入。新數據插入,已存在數據則修改。

1.1.首先將待導入文本test_import.txt放到hdfs集群

文本格式如下(從網上找的虛擬話單數據)。逗號分隔,共13個字段,其中第1個字段作為rowkey。

1,12026546272,2013/10/19,20:52,33分18秒,被叫,13727310234,北京市,省際,0,32.28,0.4,全球通商旅88套餐
2,12026546272,2013/10/19,20:23,33分18秒,被叫,13727310234,北京市,省際,0,32.28,0.4,全球通商旅88套餐
3,16072996404,2013/10/19,20:52,10分52秒,主叫,19271253211,北京市,省際,0,2.8,1.9,全球通商旅88套餐
4,10023895821,2013/10/19,20:52,09分20秒,被叫,15115468122,綿陽市,省內,0,45.91,5.26,全球通商旅88套餐
5,13381653644,2013/10/19,20:53,06分00秒,被叫,10991482287,北京市,省際,0,54.79,7.16,全球通商旅88套餐
6,18695195919,2013/10/19,21:37,27分00秒,主叫,14858652217,綿陽市,省內,0,36.27,6.68,全球通商旅88套餐
7,11396010469,2013/10/19,21:37,27分02秒,主叫,12939968466,綿陽市,省內,0,65.63,4.45,全球通商旅88套餐
8,15109754362,2013/10/19,21:37,05分00秒,被叫,14240771580,綿陽市,省內,0,66.86,5.75,全球通商旅88套餐
9,13845944798,2013/10/19,21:37,13分50秒,被叫,13648619896,廣州市,省際,0,60.71,3.39,全球通商旅88套餐
10,17883953443,2013/10/19,21:38,37分54秒,被叫,10110778698,廣州市,省際,0,55.14,1.45,全球通商旅88套餐
11,19643495044,2013/10/19,21:38,49分34秒,主叫,14581482419,廣州市,省際,0,16.84,1.36,全球通商旅88套餐

1.2.在HBase中創建表:create ‘test_import’, ‘cf’

1.3.使用importTSV導入

執行命令:

$hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.separator="," -Dimporttsv.columns=HBASE_ROW_KEY,cf:field1,cf:field2,cf:field3,cf:field4,cf:field5,cf:field6,cf:field7,cf:field8,cf:field9,cf:field10,cf:field11,cf:field12 test_import /test_import.txt

其中:

-Dimporttsv.separator指定分隔符,只支持單字節分隔符。

-Dimporttsv.columns指定導入的列。HBASE_ROW_KEY是關鍵字,導入數據時必須指定rowkey。

其它可選參數:

執行后自動提交MapReduce任務進行導入:

1.4.用scan命令查看hbase中test_import表的內容

一共11條記錄。

1.5.使用get查看記錄

 

2.使用importTSV+bulkload導入HBase

先使用importTSV生成HFile文件,再使用bulkload導入HBase

這種方式對RegionServer更友好一些,加載數據幾乎不占用RegionServer的計算資源,只是在HDFS上移動HFile文件,然后通過HMaster將該RegionServer的一個或多個Region上線。

2.1.生成HFile文件

使用importTSV生成HFile文件。與上一中方法略有不同的是,在importTSV執行時指定-Dimporttsv.bulk.output參數,則是生成HFile文件到指定文件,而不會直接導入HBase表。

$hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.separator=","  -Dimporttsv.columns=HBASE_ROW_KEY,cf:field1,cf:field2,cf:field3,cf:field4,cf:field5,cf:field6,cf:field7,cf:field8,cf:field9,cf:field10,cf:field11,cf:field12 -Dimporttsv.bulk.output=/test_import_outputdir/ test_import /test_import.txt

查看/test_import_outputdir/數據:

2.2.導入HBase表

使用bulkload導入HBase表。執行速度非常快。

執行命令:

$hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /test_import_outputdir/ test_import 

2.3.使用get查看記錄

 

3.支持多字節分隔符導入HBase

importTSV只支持單字節分隔符,不支持多字節分隔符,比如“|@|”如果要實現多字節分隔符,需要自己編寫MapReduce作業生成HFile文件,或者導入HBase。

3.1.數據格式

待導入數據如下。|@|作為分隔符,第1個字段是主鍵。

1|@|12026546272|@|2013/10/19|@|20:52|@|33分18秒|@|被叫|@|13727310234|@|北京市|@|省際|@|0|@|32.28|@|0.4|@|全球通商旅88套餐
2|@|12026546272|@|2013/10/19|@|20:23|@|33分18秒|@|被叫|@|13727310234|@|北京市|@|省際|@|0|@|32.28|@|0.4|@|全球通商旅88套餐
......
......
...... 

3.2.編寫Mapper生成HFile

我們只要使用1個Mapper生成HFile即可,不需要額外寫Reducer。示例代碼如下。

public class HfileGenMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {

    protected void map(LongWritable key, Text value,
                       Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue>.Context context)
            throws IOException, InterruptedException {
        // 第一個字段作為rowkey
        ImmutableBytesWritable rowkey = new ImmutableBytesWritable(
                value.toString().split("\\|@\\|")[0].getBytes());
        List<KeyValue> list = createKeyValue(value.toString());
        Iterator<KeyValue> itor = list.iterator();
        while (itor.hasNext()) {
            KeyValue kv = itor.next();
            if (kv != null) {
                context.write(rowkey, kv);
            }
        }
    }

    // 解析一行記錄,得到多個KeyValue對象。
    private List<KeyValue> createKeyValue(String line) {
        List<KeyValue> list = new ArrayList<KeyValue>();
        String[] fields = line.split("\\|@\\|");
        String rowkey = fields[0];
        String columnFamily = "cf";
        for (int i = 1; i < fields.length; i++) {
            String qualifyName = "field" + String.valueOf(i);
            String value = fields[i];
            KeyValue kv = new KeyValue(rowkey.getBytes(), columnFamily.getBytes(),
                    qualifyName.getBytes(), System.currentTimeMillis(), value.getBytes());
            list.add(kv);
        }
        return list;
    }
}

3.3.創建作業實例

創建job實例,填寫相關配置。指定輸入輸出數據,設置Mapper和Reducer,並提交作業。

public class BulkLoadHFileJob extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        int status = ToolRunner.run(new BulkLoadHFileJob(), args);
        System.exit(status);
    }

    @Override
    public int run(String[] args) throws Exception {
        this.setConf(HBaseConfiguration.create(this.getConf()));
        getConf().set("hbase.zookeeper.property.clientPort", "2181");
        getConf().set("hbase.zookeeper.quorum", "W122PC04VM07,W122PC05VM07,W122PC06VM07");
        String inputPath = "hdfs://cluster1/test_import2.txt";
        String outputPath = "hdfs://cluster1/test_import2_outputdir";

        Job job = Job.getInstance(getConf(), "Hdfs2HFile test");
        job.setJarByClass(BulkLoadHFileJob.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(HfileGenMapper.class);
        job.setReducerClass(KeyValueSortReducer.class);

        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(KeyValue.class);
        job.setOutputFormatClass(HFileOutputFormat2.class);
        job.setPartitionerClass(SimpleTotalOrderPartitioner.class);

        FileInputFormat.addInputPath(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        Connection connection = ConnectionFactory.createConnection(getConf());
        TableName tableName = TableName.valueOf("table_test");
        HFileOutputFormat2.configureIncrementalLoad(job, connection.getTable(tableName), connection.getRegionLocator(tableName));

        return job.waitForCompletion(true) ? 0 : 1;
    }
}

3.4.提交作業

使用hadoop客戶端提交作業。

hadoop jar ./hbasetest.jar com.hbase.test.hdfs2hbase.BulkLoadHFileJob

3.5.導入HBase

使用bulkload將生成的HFile導入HBase,速度非常快。具體操作步驟參考2.2節。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM