1.簡介
MapReduce計算框架是二代hadoop的YARN一部分,能夠提供大數據量的平行批處理。MR只提供了基本的計算方法,之所以能夠使用在不用的數據格式上包括HBase表上是因為特定格式上的數據讀取和寫入都實現了各自的inputformat和outputformat,這樣MR就通過這兩個接口屏蔽了各個數據源的產異性,統一計算框架。本文主要介紹如何讓HBase表作為MR計算框架的輸入和輸出源,並通過實現一個簡歷二級索引的小例子來介紹。
2. HBase與MR關系
HBase和MapReduce,這兩者並沒有直接關系,隸屬於不同的項目。這里講到的MapReduce on HBase是指利用HBase表做為MR計算框架的數據輸入源或者輸出源源,使得能夠利用MR的並行計算能力計算HBase的內部數據。
3. 運行環境
之前所述,HBase和MapReduce沒有直接關系,所以在編程的時候我們需要分別引入MR和HBase包,在運行的時候也要做相關的設置讓HBase的包被MR感知到。
在運行HBase相關的MR任務的時候我們可以將HBase相關包和配置文件拷貝到Hadoop運行目錄中,如hbase-site.xml 拷貝到$HADOOP_HOME/conf再將HBase jars 拷貝到 $HADOOP_HOME/lib,但是並不推薦這樣的做法,因為一會污染hadoop的安裝環境,二還需要重啟hadoop才能起效。
所以我們可以按如下的推薦做法來運行MR程序
$ HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` ${HADOOP_HOME}/bin/hadoop jar <your jar> <param .......>
例如在我環境下我利用如下命令提交HBase自帶的樣例程序:
[hadoop@xufeng-3 lib]$ HADOOP_CLASSPATH=`/opt/hadoop/hbase/bin/hbase classpath` hadoop jar hbase-server-1.0.0-cdh5.4.2.jar rowcounter usertable
以上會將/opt/hadoop/hbase/bin/hbase classpath 下的所有文件拷貝的hdfs上以供后續程序運行時候引用,缺點就是可能只能在安裝有HBase環境的機器上執行。
4.簡單二級索引的實現
下面以一個簡單的二級索引實現為例子講解HBase MR程序的編寫。
需要注意的是現在HBase包存在兩套MR引用包,分別是org.apache.hadoop.hbase.mapred和org.apache.hadoop.hbase.mapreduce。稱之為舊API和新API。通常社區推薦的是新API,舊API后續版本有被淘汰的計划。
所謂基於框架的代碼實現之前的博客也有介紹,簡單來說就是:
1. 書寫固定的框架代碼
2. 在框架中填充自身業務的代碼邏輯
固定框架代碼
一般的MR程序中Mapper方法是必須的,Reducer方法就根據業務需要吧。
下屬代碼中TableMapReduceUtil為了我們提供了極大的便利,她在內部為我們指定了TableInputFormat和TableOutputFormat等一些列的工作。
/** * 利用MR程序簡歷HBase二級索引 * @author newbeefeng * */ public class YourAction { /** * 運行方法 * @param args * @throws IOException * @throws ClassNotFoundException * @throws InterruptedException */ public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { // 創建配置 Configuration conf = HBaseConfiguration.create(); // 創建job Job job = Job.getInstance(conf); // 設定job名稱 job.setJobName("名稱"); // 設定任務類(當前類) job.setJarByClass(YourAction.class); // 掃描 Scan scan = new Scan(); // 設定caching scan.setCaching(1000); // 對於mr程序來說必須設定為false scan.setCacheBlocks(false); // 利用TableMapReduceUtil初始化mapper TableMapReduceUtil.initTableMapperJob("數據源表名", scan, YourMapper.class, Text.class, Text.class, job); // 利用TableMapReduceUtil初始化reducer TableMapReduceUtil.initTableReducerJob("數據輸出表名", YourReducer.class, job); // 提交並等待任務運行完畢 boolean b = job.waitForCompletion(true); if (!b) { throw new IOException("error with job!"); } } } /** * 實現具體的mapper類,這個類定義是必須的,因為mr任務可以沒有reducer但是一定要有mapper * * 此類繼承TableMapper,此抽象類幫助我們實現了基本默認實現,用戶只要關心具體的業務即可 * * * @author newbeefeng * */ class YourMapper extends TableMapper<Text,Text> { // 實現具體map業務邏輯 @Override protected void map(ImmutableBytesWritable key, Result value, Mapper<ImmutableBytesWritable, Result, Text, Text>.Context context) throws IOException, InterruptedException { } } /** * 實現具體的reducer類 * * 此類繼承TableReducer,此抽象類幫助我們實現了基本默認實現,用戶只要關心具體的業務即可 * * * @author newbeefeng * */ class YourReducer extends TableReducer<Text, Text, ImmutableBytesWritable> { // 實現具體mreduce業務邏輯 @Override protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, ImmutableBytesWritable, Mutation>.Context context) throws IOException, InterruptedException { } }
基於框架代碼的簡單二級索引的業務實現
/** * 利用MR程序簡歷HBase二級索引 * @author newbeefeng * */ public class BatchCreateSecondIndex { /** * 運行方法 * @param args * @throws IOException * @throws ClassNotFoundException * @throws InterruptedException */ public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { // 創建配置 Configuration conf = HBaseConfiguration.create(); // 創建job Job job = Job.getInstance(conf); // 設定job名稱 job.setJobName("mapreduce on HBase for create second index!"); // 設定任務類(當前類) job.setJarByClass(BatchCreateSecondIndex.class); // 掃描 Scan scan = new Scan(); // 設定caching scan.setCaching(1000); // 對於mr程序來說必須設定為false scan.setCacheBlocks(false); // 利用TableMapReduceUtil初始化mapper TableMapReduceUtil.initTableMapperJob("mr_secondindex_resouce", scan, IndexMapper.class, Text.class, Text.class, job); // 利用TableMapReduceUtil初始化reducer TableMapReduceUtil.initTableReducerJob("mr_secondindex_result", IndexReducer.class, job); // 提交並等待任務運行完畢 boolean b = job.waitForCompletion(true); if (!b) { throw new IOException("error with job!"); } } } /** * 實現具體的mapper類,這個類定義是必須的,因為mr任務可以沒有reducer但是一定要有mapper * * 此類繼承TableMapper,此抽象類幫助我們實現了基本默認實現,用戶只要關心具體的業務即可 * * * @author newbeefeng * */ class IndexMapper extends TableMapper<Text,Text> { // 實現具體map業務邏輯 @Override protected void map(ImmutableBytesWritable key, Result value, Mapper<ImmutableBytesWritable, Result, Text, Text>.Context context) throws IOException, InterruptedException { Text k = new Text(Bytes.toString(key.get())); Text v = new Text(Bytes.toString(value.getValue(Bytes.toBytes("f"), Bytes.toBytes("age")))); // 這里其實是直接將每行數據給了reduce,不做任何處理,其實這個二級索引完全可以在map階段完成全部工作 // 但是為了演示需要,還是寫了reduce System.out.println("k = " + k); System.out.println("v = " + v); context.write(k, v); } } /** * 實現具體的reducer類 * * 此類繼承TableReducer,此抽象類幫助我們實現了基本默認實現,用戶只要關心具體的業務即可 * * * @author newbeefeng * */ class IndexReducer extends TableReducer<Text, Text, ImmutableBytesWritable> { // 實現具體mreduce業務邏輯 @Override protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, ImmutableBytesWritable, Mutation>.Context context) throws IOException, InterruptedException { Text value = null; // 根據map邏輯。values中也只會有一個數據 for(Text text : values) { value = text; } // 構造put將數據寫入當job中指定的表中 Put put = new Put(Bytes.toBytes(key.toString() + "|" + value.toString())); put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("age"), Bytes.toBytes(value.toString())); System.out.println(put); // 執行寫入 context.write(null, put); } }
5. 測試
將上述工程打包后,放入HBase環境機器上執行:
HADOOP_CLASSPATH=`/opt/hadoop/hbase/bin/hbase classpath` hadoop jar Test-MapreduceOnHBase.jar cn.com.newbee.feng.mr.BatchCreateSecondIndex
測試結果:現在索引表中value列已經在rowkey中:
hbase(main):002:0> scan 'mr_secondindex_resouce' ROW COLUMN+CELL lisi column=f:age, timestamp=1469887264781, value=25 wangwu column=f:age, timestamp=1469887270347, value=30 zhangsan column=f:age, timestamp=1469887260046, value=20 zhaoliu column=f:age, timestamp=1469887275702, value=35 4 row(s) in 0.3490 seconds hbase(main):003:0> scan 'mr_secondindex_result' ROW COLUMN+CELL lisi|25 column=f:age, timestamp=1469890284944, value=25 wangwu|30 column=f:age, timestamp=1469890284944, value=30 zhangsan|20 column=f:age, timestamp=1469890284944, value=20 zhaoliu|35 column=f:age, timestamp=1469890284944, value=35 4 row(s) in 0.0280 seconds
6. 總結
MapReduce on HBase 內部其實還是使用了HBase客戶端插入的方式將數據在MAP階段或者在reduce階段將數據通過API插入到目標表中。HBase為了配合MR計算框架實現了TableInputFormat和TableOutputFormat。並為開發者提供了便利的API去操作如TableMapReduceUtil以及TableMapper和TableReducer等,用戶只要將注意力集中在具體的map和reduce業務上即可。
7.參考:
https://hbase.apache.org/book.html#mapreduce
8.代碼:
下載地址:https://github.com/xufeng79x/MapreduceOnHBaseTest