HBase and MapReduce Integration


Why use MapReduce to access data in HBase?

To speed up analysis and to scale out analysis capacity.

Using MapReduce to analyze HBase data is strictly an offline (batch) analysis scenario.

 

 

Case 1: Transferring data between HBase tables

When writing plain Hadoop MR jobs, we extended the Mapper and Reducer classes; with HBase, the classes to extend are TableMapper and TableReducer instead.
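For reference, these are the relevant declarations from the HBase client API (they are also quoted in the comments of the examples in Case 2 and section 2.3.2):

// org.apache.hadoop.hbase.mapreduce.TableMapper: the map input types are fixed to
// the row key (ImmutableBytesWritable) and the row contents (Result); only the
// output key/value types are left to the user.
public abstract class TableMapper<KEYOUT, VALUEOUT>
        extends Mapper<ImmutableBytesWritable, Result, KEYOUT, VALUEOUT> {
}

// org.apache.hadoop.hbase.mapreduce.TableReducer: the reduce output value must be a
// Writable mutation (typically a Put) that is applied to the target table.
public abstract class TableReducer<KEYIN, VALUEIN, KEYOUT>
        extends Reducer<KEYIN, VALUEIN, KEYOUT, Writable> {
}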

Goal: migrate part of the data in the fruit table into the fruit_mr table via MR.
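The job assumes that the source table fruit (with an info column family holding the name and color columns) and the target table fruit_mr already exist. A minimal sketch of creating the target table with the same-era HBaseAdmin API used later in this article; the class name CreateFruitMrTable is only for illustration:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class CreateFruitMrTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HBaseAdmin admin = new HBaseAdmin(conf);
        // The target table must exist with the same column family ("info") that the mapper copies
        if (!admin.tableExists("fruit_mr")) {
            HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("fruit_mr"));
            desc.addFamily(new HColumnDescriptor("info"));
            admin.createTable(desc);
        }
        admin.close();
    }
}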

Step 1: Build the ReadFruitMapper class, which reads the data from the fruit table

 


import java.io.IOException;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;

public class ReadFruitMapper extends TableMapper<ImmutableBytesWritable, Put> {

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        // Extract the name and color of fruit, i.e. copy each row that is read into a Put object.
        Put put = new Put(key.get());
        // Walk through the cells of the row
        for (Cell cell : value.rawCells()) {
            // Only copy cells of the column family "info"
            if ("info".equals(Bytes.toString(CellUtil.cloneFamily(cell)))) {
                // Copy the "name" column
                if ("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
                    // Add this cell to the Put object
                    put.add(cell);
                // Copy the "color" column
                } else if ("color".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
                    // Add this cell to the Put object
                    put.add(cell);
                }
            }
        }
        // Write each row read from fruit to the context as the map output
        context.write(key, put);
    }
}


 

Step 2: Build the WriteFruitMRReducer class, which writes the rows read from fruit into the fruit_mr table

 


import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;

public class WriteFruitMRReducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {

    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context)
            throws IOException, InterruptedException {
        // Write every row that was read into the fruit_mr table
        for (Put put : values) {
            context.write(NullWritable.get(), put);
        }
    }
}


 

Step 3: Build Fruit2FruitMRJob extends Configured implements Tool, which assembles and runs the Job

   


// Assemble the Job
public int run(String[] args) throws Exception {
    // Get the Configuration
    Configuration conf = this.getConf();
    // Create the Job
    Job job = Job.getInstance(conf, this.getClass().getSimpleName());
    job.setJarByClass(Fruit2FruitMRJob.class);

    // Configure the Job
    Scan scan = new Scan();
    scan.setCacheBlocks(false);
    scan.setCaching(500);

    // Set the Mapper. Note: use the classes from the mapreduce package, not the mapred package (the latter is the old API).
    TableMapReduceUtil.initTableMapperJob(
            "fruit",                      // source table name
            scan,                         // Scan that controls what is read
            ReadFruitMapper.class,        // Mapper class
            ImmutableBytesWritable.class, // Mapper output key type
            Put.class,                    // Mapper output value type
            job                           // the Job to configure
    );

    // Set the Reducer
    TableMapReduceUtil.initTableReducerJob("fruit_mr", WriteFruitMRReducer.class, job);

    // Set the number of reduce tasks, at least 1
    job.setNumReduceTasks(1);

    boolean isSuccess = job.waitForCompletion(true);
    if (!isSuccess) {
        throw new IOException("Job running with error");
    }

    return isSuccess ? 0 : 1;
}


 

Step 4: Call and run the Job from the main method


public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    int status = ToolRunner.run(conf, new Fruit2FruitMRJob(), args);
    System.exit(status);
}
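The run() fragment above and the main() method here belong to a single driver class, declared as public class Fruit2FruitMRJob extends Configured implements Tool. A sketch of the imports that class would need, assuming ReadFruitMapper and WriteFruitMRReducer live in the same package:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;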


 

 

Case 2: Read data from HBase, analyze it, and write the result to HDFS

/**
 * public abstract class TableMapper<KEYOUT, VALUEOUT>
 *         extends Mapper<ImmutableBytesWritable, Result, KEYOUT, VALUEOUT> {
 * }
 *
 * @author duanhaitao@gec.cn
 */
public class HbaseReader {

    public static String flow_fields_import = "flow_fields_import";

    static class HdfsSinkMapper extends TableMapper<Text, NullWritable> {

        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context)
                throws IOException, InterruptedException {
            // The row key is the phone number
            byte[] bytes = key.copyBytes();
            String phone = new String(bytes);
            // Read the url column of the f1 column family
            byte[] urlbytes = value.getValue("f1".getBytes(), "url".getBytes());
            String url = new String(urlbytes);
            context.write(new Text(phone + "\t" + url), NullWritable.get());
        }
    }

    static class HdfsSinkReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

        @Override
        protected void reduce(Text key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "spark01");

        Job job = Job.getInstance(conf);

        job.setJarByClass(HbaseReader.class);

//        job.setMapperClass(HdfsSinkMapper.class);
        Scan scan = new Scan();
        TableMapReduceUtil.initTableMapperJob(flow_fields_import, scan, HdfsSinkMapper.class, Text.class, NullWritable.class, job);
        job.setReducerClass(HdfsSinkReducer.class);

        FileOutputFormat.setOutputPath(job, new Path("c:/hbasetest/output"));

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        job.waitForCompletion(true);
    }
}
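HbaseReader assumes that flow_fields_import already contains rows keyed by phone number with an f1:url column (for instance, produced by the HbaseSinker job in the next section). If you want to hand-populate a test row, a minimal sketch with the same-era client API; the class name, quorum, and values are illustrative:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;

public class FlowFieldsImportSampleRow {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "spark01");
        // Same-era client API (HTable / Put.add) as the rest of this article
        HTable table = new HTable(conf, "flow_fields_import");
        // Row key is a phone number; the url sits in the f1:url column (values are made up)
        Put put = new Put("13926435656".getBytes());
        put.add("f1".getBytes(), "url".getBytes(), "http://www.example.com/index.html".getBytes());
        table.put(put);
        table.close();
    }
}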

 

2.3.2 Reading data from HDFS and writing it into HBase

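HbaseSinker below expects the files under c:/hbasetest/data to be plain text, one record per line, with the phone number and the url separated by a tab (the same layout HbaseReader writes out). An illustrative input line (values are made up):

13926435656	http://www.example.com/index.html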

/**
 * public abstract class TableReducer<KEYIN, VALUEIN, KEYOUT>
 *         extends Reducer<KEYIN, VALUEIN, KEYOUT, Writable> {
 * }
 *
 * @author duanhaitao@gec.cn
 */
public class HbaseSinker {

    public static String flow_fields_import = "flow_fields_import";

    static class HbaseSinkMrMapper extends Mapper<LongWritable, Text, FlowBean, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Each input line is "phone<TAB>url"
            String line = value.toString();
            String[] fields = line.split("\t");
            String phone = fields[0];
            String url = fields[1];

            FlowBean bean = new FlowBean(phone, url);

            context.write(bean, NullWritable.get());
        }
    }

    static class HbaseSinkMrReducer extends TableReducer<FlowBean, NullWritable, ImmutableBytesWritable> {

        @Override
        protected void reduce(FlowBean key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            // The row key is the phone number; the url goes into the f1:url column
            Put put = new Put(key.getPhone().getBytes());
            put.add("f1".getBytes(), "url".getBytes(), key.getUrl().getBytes());

            context.write(new ImmutableBytesWritable(key.getPhone().getBytes()), put);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "spark01");

        // (Re)create the target table with a single column family f1
        HBaseAdmin hBaseAdmin = new HBaseAdmin(conf);

        boolean tableExists = hBaseAdmin.tableExists(flow_fields_import);
        if (tableExists) {
            hBaseAdmin.disableTable(flow_fields_import);
            hBaseAdmin.deleteTable(flow_fields_import);
        }
        HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(flow_fields_import));
        HColumnDescriptor hColumnDescriptor = new HColumnDescriptor("f1".getBytes());
        desc.addFamily(hColumnDescriptor);

        hBaseAdmin.createTable(desc);

        Job job = Job.getInstance(conf);

        job.setJarByClass(HbaseSinker.class);

        job.setMapperClass(HbaseSinkMrMapper.class);
        TableMapReduceUtil.initTableReducerJob(flow_fields_import, HbaseSinkMrReducer.class, job);

        FileInputFormat.setInputPaths(job, new Path("c:/hbasetest/data"));

        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(Mutation.class);

        job.waitForCompletion(true);
    }
}
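HbaseSinker refers to a FlowBean class that the article does not show. A minimal sketch of what it would need, assuming it only carries the phone and url fields: because it is used as the map output key, it has to implement WritableComparable.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

// Hypothetical FlowBean: carries a phone number and a url between mapper and reducer.
public class FlowBean implements WritableComparable<FlowBean> {

    private String phone;
    private String url;

    // No-arg constructor required by the MapReduce framework for deserialization
    public FlowBean() {
    }

    public FlowBean(String phone, String url) {
        this.phone = phone;
        this.url = url;
    }

    public String getPhone() {
        return phone;
    }

    public String getUrl() {
        return url;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phone);
        out.writeUTF(url);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        phone = in.readUTF();
        url = in.readUTF();
    }

    @Override
    public int compareTo(FlowBean o) {
        // Sort by phone, then by url, so keys have a deterministic order
        int cmp = phone.compareTo(o.phone);
        return cmp != 0 ? cmp : url.compareTo(o.url);
    }
}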

 

