HBase:從 HBase 讀取數據並寫入 HDFS


Mapper

 1 package cn.hbase.mapreduce.hb2hdfs;  2 
 3 import java.io.IOException;  4 import org.apache.hadoop.hbase.client.Result;  5 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;  6 import org.apache.hadoop.hbase.mapreduce.TableMapper;  7 
/**
 * Output key: the row key. Output value: the row of data that was read.
 *
 * @author Tele
 */
15 public class ReadFruitFromHbMapper extends TableMapper<ImmutableBytesWritable, Result> { 16 
17  @Override 18     protected void map(ImmutableBytesWritable key, Result value, Context context) 19             throws IOException, InterruptedException { 20  context.write(key, value); 21  } 22 }

Reducer

 1 package cn.hbase.mapreduce.hb2hdfs;  2 
 3 import java.io.IOException;  4 
 5 import org.apache.hadoop.hbase.Cell;  6 import org.apache.hadoop.hbase.CellScanner;  7 import org.apache.hadoop.hbase.CellUtil;  8 import org.apache.hadoop.hbase.client.Result;  9 import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 10 import org.apache.hadoop.hbase.util.Bytes; 11 import org.apache.hadoop.io.NullWritable; 12 import org.apache.hadoop.io.Text; 13 import org.apache.hadoop.mapreduce.Reducer; 14 
15 /**
16  * 17  * @author Tele 18  * 19  */
20 
21 public class WriteFruit2HdfsReducer extends Reducer<ImmutableBytesWritable, Result, NullWritable, Text> { 22  @Override 23     protected void reduce(ImmutableBytesWritable key, Iterable<Result> values, Context context) 24             throws IOException, InterruptedException { 25         for (Result result : values) { 26             CellScanner scanner = result.cellScanner(); 27             while (scanner.advance()) { 28                 Cell cell = scanner.current(); 29                 Text text = new Text(); 30                 // 封裝數據
31                 String row = Bytes.toString(CellUtil.cloneRow(cell)) + "\t"; 32                 String cf = Bytes.toString(CellUtil.cloneFamily(cell)) + "\t"; 33                 String cn = Bytes.toString(CellUtil.cloneQualifier(cell)) + "\t"; 34                 String value = Bytes.toString(CellUtil.cloneValue(cell)) + "\t"; 35 
36                 StringBuffer buffer = new StringBuffer(); 37  buffer.append(row).append(cf).append(cn).append(value); 38  text.set(buffer.toString()); 39 
40                 // 寫出
41  context.write(NullWritable.get(), text); 42  } 43 
44  } 45 
46  } 47 }

Runner

 1 package cn.hbase.mapreduce.hb2hdfs;  2 
 3 import org.apache.hadoop.conf.Configuration;  4 import org.apache.hadoop.conf.Configured;  5 import org.apache.hadoop.fs.Path;  6 import org.apache.hadoop.hbase.HBaseConfiguration;  7 import org.apache.hadoop.hbase.client.Result;  8 import org.apache.hadoop.hbase.client.Scan;  9 import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 10 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; 11 import org.apache.hadoop.io.NullWritable; 12 import org.apache.hadoop.io.Text; 13 import org.apache.hadoop.mapreduce.Job; 14 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 15 import org.apache.hadoop.util.Tool; 16 import org.apache.hadoop.util.ToolRunner; 17 
18 /**
19  * 20  * @author Tele 21  * 22  */
23 
24 public class FruitRunner extends Configured implements Tool { 25 
26     public int run(String[] args) throws Exception { 27 
28         System.setProperty("HADOOP_USER_NAME", "tele"); 29         // 實例化job
30         Job job = Job.getInstance(this.getConf()); 31 
32         // 設置jar
33         job.setJarByClass(FruitRunner.class); 34 
35         // 設置緩存行鍵
36         Scan scan = new Scan(); 37         scan.setCaching(300); 38       
39         // 組裝mapper
40         TableMapReduceUtil.initTableMapperJob("fruit", scan, ReadFruitFromHbMapper.class, ImmutableBytesWritable.class, 41                 Result.class, job); 42         // 組裝reuder
43         job.setReducerClass(WriteFruit2HdfsReducer.class); 44         job.setOutputKeyClass(NullWritable.class); 45         job.setOutputValueClass(Text.class); 46 
47         FileOutputFormat.setOutputPath(job, new Path("/outputfruit")); 48 
49         // reduce個數
50         job.setNumReduceTasks(1); 51 
52         // 提交
53         return job.waitForCompletion(true) ? 0 : 1; 54  } 55 
56     public static void main(String[] args) throws Exception { 57         Configuration conf = HBaseConfiguration.create(); 58         ToolRunner.run(conf, new FruitRunner(), args); 59  } 60 
61 }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM