(三)HBase之Bulkload


三、課堂目標

1. 掌握hbase的客戶端API操作

2. 掌握hbase集成MapReduce

3. 掌握hbase集成hive

4. 掌握hbase表的rowkey設計

5. 掌握hbase表的熱點

6. 掌握hbase表的數據備份

7. 掌握hbase二級索引

四、知識要點

1. hbase客戶端API操作

  • 創建Maven工程,添加依賴
 <dependencies>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-common</artifactId>
            <version>1.2.1</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
    </dependencies>

 

  • hbase表的增刪改查操作

具體操作詳細見==《hbase表的增刪改查操作.md》==文檔

1、初始化一個init方法

2、創建一個表

3、修改表屬性

4、put添加數據

5、get查詢單條數據

6、scan批量查詢數據

7、delete刪除表中的列數據

8、刪除表

9、過濾器的使用

  • 過濾器的類型很多,但是考科一分為兩大類--比較過濾器專用過濾器
  • 過濾器的作用是在服務端判斷數據是否滿足條件,然后只將滿足條件的數據返回給客戶端

 

9.1、hbase過濾器的比較運算符

LESS  <

LESS_OR_EQUAL <=

EQUAL =

NOT_EQUAL <>

GREATER_OR_EQUAL >=

GREATER >

9.2、hbase過濾器的比較器(指定比較機制)

BinaryComparator  按字節索引順序比較指定字節數組
BinaryPrefixComparator 跟前面相同,只是比較左端的數據是否相同
NullComparator 判斷給定的是否為空
BitComparator 按位比較
RegexStringComparator 提供一個正則的比較器,僅支持 EQUAL 和非EQUAL
SubstringComparator 判斷提供的子串是否出現在value中。

 

9.3、過濾器使用實戰

9.3.1、針對行鍵的前綴過濾器

  • PrefixFilter
public void testFilter1() throws Exception {

// 針對行鍵的前綴過濾器
  Filter pf = new PrefixFilter(Bytes.toBytes("liu"));//"liu".getBytes()
  testScan(pf);
}

     //定義一個方法,接受一個過濾器,返回結果數據
public void testScan(Filter filter) throws Exception {
        Table table = conn.getTable(TableName.valueOf("t_user_info"));

        Scan scan = new Scan();
        //設置過濾器
        scan.setFilter(filter);

        ResultScanner scanner = table.getScanner(scan);
        Iterator<Result> iter = scanner.iterator();
        //遍歷所有的Result對象,獲取結果
        while (iter.hasNext()) {
            Result result = iter.next();
            List<Cell> cells = result.listCells();
            for (Cell c : cells) {
                //獲取行鍵
                byte[] rowBytes = CellUtil.cloneRow(c);
                //獲取列族
                byte[] familyBytes = CellUtil.cloneFamily(c);
                //獲取列族下的列名稱
                byte[] qualifierBytes = CellUtil.cloneQualifier(c);
                //列字段的值
                byte[] valueBytes = CellUtil.cloneValue(c);

                System.out.print(new String(rowBytes)+" ");
                System.out.print(new String(familyBytes)+":");
                System.out.print(new String(qualifierBytes)+" ");
                System.out.println(new String(valueBytes));
            }
            System.out.println("-----------------------");
        }
        }

 

 

9.3.2 行過濾器

RowFilter

  public void testFilter2() throws Exception {

// 行過濾器  需要一個比較運算符和比較器
RowFilter rf1 = new RowFilter(CompareFilter.CompareOp.LESS, new        BinaryComparator(Bytes.toBytes("user002")));
         testScan(rf1);

         RowFilter rf2 = new RowFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator("01"));//rowkey包含"01"子串的
         testScan(rf2);
} 

 

 

9.3.3 列族過濾器

FamilyFilter

  public void testFilter3() throws Exception {

//針對列族名的過濾器   返回結果中只會包含滿足條件的列族中的數據
        FamilyFilter ff1 = new FamilyFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("base_info")));
        FamilyFilter ff2 = new FamilyFilter(CompareFilter.CompareOp.EQUAL, new BinaryPrefixComparator(Bytes.toBytes("base")));
        testScan(ff2);

}  

 

9.3.4 列名過濾器

QualifierFilter

public void testFilter4() throws Exception {

//針對列名的過濾器 返回結果中只會包含滿足條件的列的數據
    QualifierFilter qf1 = new QualifierFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("password")));
    QualifierFilter qf2 = new QualifierFilter(CompareFilter.CompareOp.EQUAL, new BinaryPrefixComparator(Bytes.toBytes("user")));
    testScan(qf2);
}

 

9.3.5 列值的過濾器

SingleColumnValueFilter

public void testFilter4() throws Exception {
    
//針對指定一個列的value的比較器來過濾
        ByteArrayComparable comparator1 = new RegexStringComparator("^zhang"); //以zhang開頭的
        ByteArrayComparable comparator2 = new SubstringComparator("si");       //包含"si"子串
        SingleColumnValueFilter scvf = new SingleColumnValueFilter("base_info".getBytes(), "username".getBytes(), CompareFilter.CompareOp.EQUAL, comparator2);
        testScan(scvf);

}

 

9.3.6 多個過濾器同時使用

public void testFilter4() throws Exception {
    
//多個過濾器同時使用   select * from t1 where id >10 and age <30
    
//構建一個列族的過濾器            
FamilyFilter cfff1 = new FamilyFilter(CompareFilter.CompareOp.EQUAL, new BinaryPrefixComparator(Bytes.toBytes("base")));

//構建一個列的前綴過濾器
            ColumnPrefixFilter cfff2 = new ColumnPrefixFilter("password".getBytes());

//指定多個過濾器是否同時都要滿足條件
            FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE);

            filterList.addFilter(cfff1);
            filterList.addFilter(cfff2);
            testScan(filterList);
}   

 

 

 

 

 

2 hbase集成MapReduce

HBase表中的數據最終都是存儲在HDFS上,HBase天生的支持MR的操作,我們可以通過MR直接處理HBase表中的數據,並且MR可以將處理后的結果直接存儲到HBase表中。

參考地址:http://hbase.apache.org/book.html#mapreduce

2.1 實戰一

需求

  • ==讀取hbase某張表中的數據,然后把結果寫入到另外一張hbase表==
package com.kaikeba;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

import java.io.IOException;

public class HBaseMR {

    public static class HBaseMapper extends TableMapper<Text,Put>{
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
             //獲取rowkey的字節數組
            byte[] bytes = key.get();
            String rowkey = Bytes.toString(bytes);
            //構建一個put對象
            Put put = new Put(bytes);
            //獲取一行中所有的cell對象
            Cell[] cells = value.rawCells();
            for (Cell cell : cells) {
                  // f1列族
                if("f1".equals(Bytes.toString(CellUtil.cloneFamily(cell)))){
                    // name列名
                     if("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))){
                          put.add(cell);
                     }
                     // age列名
                    if("age".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))){
                        put.add(cell);
                    }
                }
            }
            if(!put.isEmpty()){
              context.write(new Text(rowkey),put);
            }

        }
    }

     public  static  class HbaseReducer extends TableReducer<Text,Put,ImmutableBytesWritable>{
         @Override
         protected void reduce(Text key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
             for (Put put : values) {
                 context.write(null,put);
             }
         }
     }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();

        Scan scan = new Scan();

        Job job = Job.getInstance(conf);
        job.setJarByClass(HBaseMR.class);
        //使用TableMapReduceUtil 工具類來初始化我們的mapper
        TableMapReduceUtil.initTableMapperJob(TableName.valueOf(args[0]),scan,HBaseMapper.class,Text.class,Put.class,job);

        //使用TableMapReduceUtil 工具類來初始化我們的reducer
        TableMapReduceUtil.initTableReducerJob(args[1],HbaseReducer.class,job);

        //設置reduce task個數
         job.setNumReduceTasks(1);

        System.exit(job.waitForCompletion(true) ? 0 : 1);

    }

}

 

打成jar包提交到集群中運行

hadoop jar hbase_java_api-1.0-SNAPSHOT.jar com.kaikeba.HBaseMR t1 t2

2.2 實戰二

需求

  • ==讀取HDFS文件,把內容寫入到HBase表中==

hdfs上數據文件 user.txt

0001 xiaoming 20
0002 xiaowang 30
0003 xiaowu 40

代碼開發

package com.kaikeba;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import java.io.IOException;



public class Hdfs2Hbase {

    public static class HdfsMapper extends Mapper<LongWritable,Text,Text,NullWritable> {

        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            context.write(value,NullWritable.get());
        }
    }

    public static class HBASEReducer extends TableReducer<Text,NullWritable,ImmutableBytesWritable> {

        protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            String[] split = key.toString().split(" ");
            Put put = new Put(Bytes.toBytes(split[0]));
            put.addColumn("f1".getBytes(),"name".getBytes(),split[1].getBytes());
            put.addColumn("f1".getBytes(),"age".getBytes(), split[2].getBytes());
            context.write(new ImmutableBytesWritable(Bytes.toBytes(split[0])),put);
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(Hdfs2Hbase.class);

        job.setInputFormatClass(TextInputFormat.class);
        //輸入文件路徑
        TextInputFormat.addInputPath(job,new Path(args[0]));
        job.setMapperClass(HdfsMapper.class);
        //map端的輸出的key value 類型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        //指定輸出到hbase的表名
        TableMapReduceUtil.initTableReducerJob(args[1],HBASEReducer.class,job);

        //設置reduce個數
        job.setNumReduceTasks(1);

        System.exit(job.waitForCompletion(true)?0:1);
    }
}

 

創建hbase表 t3

create 't3','f1'

打成jar包提交到集群中運行

hadoop jar hbase_java_api-1.0-SNAPSHOT.jar com.kaikeba.Hdfs2Hbase /data/user.txt t3

2.3 實戰三

需求

  • ==通過bulkload的方式批量加載數據到HBase表中==

把hdfs上面的這個路徑/input/user.txt的數據文件,轉換成HFile格式,然后load到user這張表里面中

知識點描述

加載數據到HBase當中去的方式多種多樣,我們可以使用HBase的javaAPI或者使用sqoop將我們的數據寫入或者導入到HBase當中去,但是這些方式不是慢就是在導入的過程的占用Region資料導致效率低下,我們也可以通過MR的程序,將我們的數據直接轉換成HBase的最終存儲格式HFile,然后直接load數據到HBase當中去即可

HBase數據正常寫流程回顧

 

 

 bulkload方式的處理示意圖

 

 

 好處

(1).導入過程不占用Region資源
 
(2).能快速導入海量的數據
 
(3).節省內存

==1、開發生成HFile文件的代碼==

package com.kaikeba;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class HBaseLoad {

    public static class LoadMapper  extends Mapper<LongWritable,Text,ImmutableBytesWritable,Put> {
        @Override
        protected void map(LongWritable key, Text value, Mapper.Context context) throws IOException, InterruptedException {
            String[] split = value.toString().split(" ");
            Put put = new Put(Bytes.toBytes(split[0]));
            put.addColumn("f1".getBytes(),"name".getBytes(),split[1].getBytes());
            put.addColumn("f1".getBytes(),"age".getBytes(), split[2].getBytes());
            context.write(new ImmutableBytesWritable(Bytes.toBytes(split[0])),put);
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            final String INPUT_PATH=  "hdfs://node1:9000/input";
            final String OUTPUT_PATH= "hdfs://node1:9000/output_HFile";
            Configuration conf = HBaseConfiguration.create();

            Connection connection = ConnectionFactory.createConnection(conf);
            Table table = connection.getTable(TableName.valueOf("t4"));
            Job job= Job.getInstance(conf);

            job.setJarByClass(HBaseLoad.class);
            job.setMapperClass(LoadMapper.class);
            job.setMapOutputKeyClass(ImmutableBytesWritable.class);
            job.setMapOutputValueClass(Put.class);
            //指定輸出的類型HFileOutputFormat2
            job.setOutputFormatClass(HFileOutputFormat2.class);

         HFileOutputFormat2.configureIncrementalLoad(job,table,connection.getRegionLocator(TableName.valueOf("t4")));
            FileInputFormat.addInputPath(job,new Path(INPUT_PATH));
            FileOutputFormat.setOutputPath(job,new Path(OUTPUT_PATH));
            System.exit(job.waitForCompletion(true)?0:1);


    }
}

 

==2、打成jar包提交到集群中運行==

hadoop jar hbase_java_api-1.0-SNAPSHOT.jar com.kaikeba.HBaseLoad

==3、觀察HDFS上輸出的結果==

 

 

 ==4、加載HFile文件到hbase表中==

 代碼加載

package com.kaikeba;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

public class LoadData {
    public static void main(String[] args) throws Exception {
        Configuration configuration = HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.quorum", "node1:2181,node2:2181,node3:2181");
    //獲取數據庫連接
    Connection connection =  ConnectionFactory.createConnection(configuration);
    //獲取表的管理器對象
    Admin admin = connection.getAdmin();
    //獲取table對象
    TableName tableName = TableName.valueOf("t4");
    Table table = connection.getTable(tableName);
    //構建LoadIncrementalHFiles加載HFile文件
    LoadIncrementalHFiles load = new LoadIncrementalHFiles(configuration);
    load.doBulkLoad(new Path("hdfs://node1:9000/output_HFile"), admin,table,connection.getRegionLocator(tableName));
 }
}

 

命令加載

命令格式

hadoop jar hbase-server-VERSION.jar completebulkload [-c /path/to/hbase/config/hbase-site.xml] /user/todd/myoutput mytable

先將hbase的jar包添加到hadoop的classpath路徑下

export HBASE_HOME=/opt/bigdata/hbase
export HADOOP_HOME=/opt/bigdata/hadoop
export HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`

命令加載演示

hadoop jar /opt/bigdata/hbase/lib/hbase-server-1.2.1.jar completebulkload /output_HFile t5 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM