[How to] MapReduce on HBase ----- 簡單二級索引的實現


1.簡介

  MapReduce計算框架是二代hadoop的YARN一部分,能夠提供大數據量的平行批處理。MR只提供了基本的計算方法,之所以能夠使用在不用的數據格式上包括HBase表上是因為特定格式上的數據讀取和寫入都實現了各自的inputformat和outputformat,這樣MR就通過這兩個接口屏蔽了各個數據源的產異性,統一計算框架。本文主要介紹如何讓HBase表作為MR計算框架的輸入和輸出源,並通過實現一個簡歷二級索引的小例子來介紹。

 

2. HBase與MR關系

  HBase和MapReduce,這兩者並沒有直接關系,隸屬於不同的項目。這里講到的MapReduce on HBase是指利用HBase表做為MR計算框架的數據輸入源或者輸出源源,使得能夠利用MR的並行計算能力計算HBase的內部數據。

 

3. 運行環境

  之前所述,HBase和MapReduce沒有直接關系,所以在編程的時候我們需要分別引入MR和HBase包,在運行的時候也要做相關的設置讓HBase的包被MR感知到。

在運行HBase相關的MR任務的時候我們可以將HBase相關包和配置文件拷貝到Hadoop運行目錄中,如hbase-site.xml 拷貝到$HADOOP_HOME/conf再將HBase jars 拷貝到 $HADOOP_HOME/lib,但是並不推薦這樣的做法,因為一會污染hadoop的安裝環境,二還需要重啟hadoop才能起效。

  所以我們可以按如下的推薦做法來運行MR程序

$ HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` ${HADOOP_HOME}/bin/hadoop jar <your jar> <param .......>

   例如在我環境下我利用如下命令提交HBase自帶的樣例程序:

[hadoop@xufeng-3 lib]$ HADOOP_CLASSPATH=`/opt/hadoop/hbase/bin/hbase classpath` hadoop jar hbase-server-1.0.0-cdh5.4.2.jar rowcounter usertable

  以上會將/opt/hadoop/hbase/bin/hbase classpath 下的所有文件拷貝的hdfs上以供后續程序運行時候引用,缺點就是可能只能在安裝有HBase環境的機器上執行。

 

4.簡單二級索引的實現

  下面以一個簡單的二級索引實現為例子講解HBase MR程序的編寫。

  需要注意的是現在HBase包存在兩套MR引用包,分別是org.apache.hadoop.hbase.mapredorg.apache.hadoop.hbase.mapreduce。稱之為舊API和新API。通常社區推薦的是新API,舊API后續版本有被淘汰的計划。

  所謂基於框架的代碼實現之前的博客也有介紹,簡單來說就是:

  1. 書寫固定的框架代碼

  2. 在框架中填充自身業務的代碼邏輯

  固定框架代碼

    一般的MR程序中Mapper方法是必須的,Reducer方法就根據業務需要吧。

    下屬代碼中TableMapReduceUtil為了我們提供了極大的便利,她在內部為我們指定了TableInputFormat和TableOutputFormat等一些列的工作。

/**
 * 利用MR程序簡歷HBase二級索引
 * @author newbeefeng
 *
 */
public class YourAction {

    /**
     * 運行方法
     * @param args
     * @throws IOException
     * @throws ClassNotFoundException
     * @throws InterruptedException
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 創建配置
        Configuration conf = HBaseConfiguration.create();
        
        // 創建job
        Job job = Job.getInstance(conf);
        
        // 設定job名稱
        job.setJobName("名稱");
        
        // 設定任務類(當前類)
        job.setJarByClass(YourAction.class);
        
        // 掃描
        Scan scan = new Scan();
        // 設定caching
        scan.setCaching(1000);
        // 對於mr程序來說必須設定為false
        scan.setCacheBlocks(false);
        
        // 利用TableMapReduceUtil初始化mapper
        TableMapReduceUtil.initTableMapperJob("數據源表名", scan, YourMapper.class, Text.class, Text.class, job);
           
        // 利用TableMapReduceUtil初始化reducer
        TableMapReduceUtil.initTableReducerJob("數據輸出表名", YourReducer.class, job);
        
        // 提交並等待任務運行完畢
        boolean b = job.waitForCompletion(true);
        if (!b) {
          throw new IOException("error with job!");
        }
        
    }
    
}

/**
 * 實現具體的mapper類,這個類定義是必須的,因為mr任務可以沒有reducer但是一定要有mapper
 * 
 * 此類繼承TableMapper,此抽象類幫助我們實現了基本默認實現,用戶只要關心具體的業務即可
 * 
 * 
 * @author newbeefeng
 *
 */
class YourMapper extends TableMapper<Text,Text>
{

    // 實現具體map業務邏輯
    @Override
    protected void map(ImmutableBytesWritable key, Result value,
            Mapper<ImmutableBytesWritable, Result, Text, Text>.Context context)
            throws IOException, InterruptedException {
    }

}

/**
 * 實現具體的reducer類
 * 
 * 此類繼承TableReducer,此抽象類幫助我們實現了基本默認實現,用戶只要關心具體的業務即可
 * 
 * 
 * @author newbeefeng
 *
 */
class YourReducer extends TableReducer<Text, Text, ImmutableBytesWritable>
{

    // 實現具體mreduce業務邏輯
    @Override
    protected void reduce(Text key, Iterable<Text> values,
            Reducer<Text, Text, ImmutableBytesWritable, Mutation>.Context context)
            throws IOException, InterruptedException {
   
    }
    
}

 

  基於框架代碼的簡單二級索引的業務實現

/**
 * 利用MR程序簡歷HBase二級索引
 * @author newbeefeng
 *
 */
public class BatchCreateSecondIndex {

    /**
     * 運行方法
     * @param args
     * @throws IOException
     * @throws ClassNotFoundException
     * @throws InterruptedException
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 創建配置
        Configuration conf = HBaseConfiguration.create();
        
        // 創建job
        Job job = Job.getInstance(conf);
        
        // 設定job名稱
        job.setJobName("mapreduce on HBase for create second index!");
        
        // 設定任務類(當前類)
        job.setJarByClass(BatchCreateSecondIndex.class);
        
        // 掃描
        Scan scan = new Scan();
        // 設定caching
        scan.setCaching(1000);
        // 對於mr程序來說必須設定為false
        scan.setCacheBlocks(false);
        
        // 利用TableMapReduceUtil初始化mapper
        TableMapReduceUtil.initTableMapperJob("mr_secondindex_resouce", scan, IndexMapper.class, Text.class, Text.class, job);
           
        // 利用TableMapReduceUtil初始化reducer
        TableMapReduceUtil.initTableReducerJob("mr_secondindex_result", IndexReducer.class, job);
        
        // 提交並等待任務運行完畢
        boolean b = job.waitForCompletion(true);
        if (!b) {
          throw new IOException("error with job!");
        }
        
    }
    
}

/**
 * 實現具體的mapper類,這個類定義是必須的,因為mr任務可以沒有reducer但是一定要有mapper
 * 
 * 此類繼承TableMapper,此抽象類幫助我們實現了基本默認實現,用戶只要關心具體的業務即可
 * 
 * 
 * @author newbeefeng
 *
 */
class IndexMapper extends TableMapper<Text,Text>
{

    // 實現具體map業務邏輯
    @Override
    protected void map(ImmutableBytesWritable key, Result value,
            Mapper<ImmutableBytesWritable, Result, Text, Text>.Context context)
            throws IOException, InterruptedException {
        Text k = new Text(Bytes.toString(key.get()));
        Text v = new Text(Bytes.toString(value.getValue(Bytes.toBytes("f"), Bytes.toBytes("age"))));
        // 這里其實是直接將每行數據給了reduce,不做任何處理,其實這個二級索引完全可以在map階段完成全部工作
        // 但是為了演示需要,還是寫了reduce
        System.out.println("k = " + k);
        System.out.println("v = " + v);
        context.write(k, v);
    }

}

/**
 * 實現具體的reducer類
 * 
 * 此類繼承TableReducer,此抽象類幫助我們實現了基本默認實現,用戶只要關心具體的業務即可
 * 
 * 
 * @author newbeefeng
 *
 */
class IndexReducer extends TableReducer<Text, Text, ImmutableBytesWritable>
{

    // 實現具體mreduce業務邏輯
    @Override
    protected void reduce(Text key, Iterable<Text> values,
            Reducer<Text, Text, ImmutableBytesWritable, Mutation>.Context context)
            throws IOException, InterruptedException {
        Text value = null;
        
        // 根據map邏輯。values中也只會有一個數據
        for(Text text : values)
        {
            value = text;
        }
        
        // 構造put將數據寫入當job中指定的表中
        Put put = new Put(Bytes.toBytes(key.toString() + "|" + value.toString()));
        put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("age"), Bytes.toBytes(value.toString()));
        System.out.println(put);
        
        // 執行寫入
        context.write(null, put);    
    }
    
}

 

5. 測試

  將上述工程打包后,放入HBase環境機器上執行:

HADOOP_CLASSPATH=`/opt/hadoop/hbase/bin/hbase classpath` hadoop jar Test-MapreduceOnHBase.jar cn.com.newbee.feng.mr.BatchCreateSecondIndex

  測試結果:現在索引表中value列已經在rowkey中:

hbase(main):002:0> scan 'mr_secondindex_resouce'
ROW                                            COLUMN+CELL                                                                                                                           
 lisi                                          column=f:age, timestamp=1469887264781, value=25                                                                                       
 wangwu                                        column=f:age, timestamp=1469887270347, value=30                                                                                       
 zhangsan                                      column=f:age, timestamp=1469887260046, value=20                                                                                       
 zhaoliu                                       column=f:age, timestamp=1469887275702, value=35                                                                                       
4 row(s) in 0.3490 seconds

hbase(main):003:0> scan 'mr_secondindex_result'
ROW                                            COLUMN+CELL                                                                                                                           
 lisi|25                                       column=f:age, timestamp=1469890284944, value=25                                                                                       
 wangwu|30                                     column=f:age, timestamp=1469890284944, value=30                                                                                       
 zhangsan|20                                   column=f:age, timestamp=1469890284944, value=20                                                                                       
 zhaoliu|35                                    column=f:age, timestamp=1469890284944, value=35                                                                                       
4 row(s) in 0.0280 seconds

 

  

6. 總結

  MapReduce on HBase 內部其實還是使用了HBase客戶端插入的方式將數據在MAP階段或者在reduce階段將數據通過API插入到目標表中。HBase為了配合MR計算框架實現了TableInputFormat和TableOutputFormat。並為開發者提供了便利的API去操作如TableMapReduceUtil以及TableMapper和TableReducer等,用戶只要將注意力集中在具體的map和reduce業務上即可。

 

7.參考:

    https://hbase.apache.org/book.html#mapreduce

8.代碼:

  下載地址:https://github.com/xufeng79x/MapreduceOnHBaseTest

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM