Mapper
package cn.hbase.mapreduce.hb2hdfs;

import java.io.IOException;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;

/**
 * @author Tele
 *
 * Output key: the row key. Output value: the full row (Result) that was read.
 */
public class ReadFruitFromHbMapper extends TableMapper<ImmutableBytesWritable, Result> {

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        // Forward each row unchanged: (row key, Result)
        context.write(key, value);
    }
}
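The two generic parameters on ReadFruitFromHbMapper are the mapper's output types; the input types are fixed by TableMapper itself. Roughly, the HBase class is declared as sketched below (paraphrased from the HBase API, shown only to explain the signature):

// Paraphrase of the HBase API declaration (not part of this project's code):
// TableMapper pins the input key/value of a plain Mapper to HBase's
// (row key, Result) pair; KEYOUT/VALUEOUT are the mapper's output types.
public abstract class TableMapper<KEYOUT, VALUEOUT>
        extends Mapper<ImmutableBytesWritable, Result, KEYOUT, VALUEOUT> {
}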
Reducer
package cn.hbase.mapreduce.hb2hdfs;

import java.io.IOException;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * @author Tele
 */
public class WriteFruit2HdfsReducer extends Reducer<ImmutableBytesWritable, Result, NullWritable, Text> {

    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Result> values, Context context)
            throws IOException, InterruptedException {
        for (Result result : values) {
            CellScanner scanner = result.cellScanner();
            while (scanner.advance()) {
                Cell cell = scanner.current();
                Text text = new Text();

                // Assemble one tab-separated line: row key, column family, qualifier, value
                String row = Bytes.toString(CellUtil.cloneRow(cell)) + "\t";
                String cf = Bytes.toString(CellUtil.cloneFamily(cell)) + "\t";
                String cn = Bytes.toString(CellUtil.cloneQualifier(cell)) + "\t";
                String value = Bytes.toString(CellUtil.cloneValue(cell));

                StringBuilder buffer = new StringBuilder();
                buffer.append(row).append(cf).append(cn).append(value);
                text.set(buffer.toString());

                // Write out one line per cell
                context.write(NullWritable.get(), text);
            }
        }
    }
}
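Since the mapper forwards every row unchanged, the shuffle adds nothing here: the same formatting could be done in the map phase and the reduce step dropped entirely. A minimal sketch of that map-only alternative is below (my own variation, not the original code); the Runner would then pass NullWritable.class and Text.class to initTableMapperJob, skip setReducerClass, and call job.setNumReduceTasks(0).

import java.io.IOException;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;

public class WriteFruit2HdfsMapper extends TableMapper<NullWritable, Text> {

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        CellScanner scanner = value.cellScanner();
        while (scanner.advance()) {
            Cell cell = scanner.current();
            // Same tab-separated layout as the reducer above
            String line = Bytes.toString(CellUtil.cloneRow(cell)) + "\t"
                    + Bytes.toString(CellUtil.cloneFamily(cell)) + "\t"
                    + Bytes.toString(CellUtil.cloneQualifier(cell)) + "\t"
                    + Bytes.toString(CellUtil.cloneValue(cell));
            context.write(NullWritable.get(), new Text(line));
        }
    }
}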
Runner
package cn.hbase.mapreduce.hb2hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * @author Tele
 */
public class FruitRunner extends Configured implements Tool {

    public int run(String[] args) throws Exception {
        System.setProperty("HADOOP_USER_NAME", "tele");

        // Instantiate the job
        Job job = Job.getInstance(this.getConf());

        // Set the jar
        job.setJarByClass(FruitRunner.class);

        // Configure the scan; setCaching controls how many rows are fetched per RPC
        Scan scan = new Scan();
        scan.setCaching(300);

        // Wire up the mapper to read from the "fruit" table
        TableMapReduceUtil.initTableMapperJob("fruit", scan, ReadFruitFromHbMapper.class,
                ImmutableBytesWritable.class, Result.class, job);

        // Wire up the reducer
        job.setReducerClass(WriteFruit2HdfsReducer.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        FileOutputFormat.setOutputPath(job, new Path("/outputfruit"));

        // Number of reduce tasks
        job.setNumReduceTasks(1);

        // Submit and wait for completion
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        int status = ToolRunner.run(conf, new FruitRunner(), args);
        System.exit(status);
    }
}
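One practical detail: FileOutputFormat refuses to start a job whose output directory already exists, so a second run fails until /outputfruit is removed. A small pre-flight check could replace the existing setOutputPath call in run(); the sketch below is my addition (not part of the original code) and assumes it is acceptable to discard previous results. It needs an extra import of org.apache.hadoop.fs.FileSystem.

// Delete the old output directory so re-runs don't fail with FileAlreadyExistsException.
Path output = new Path("/outputfruit");
FileSystem fs = FileSystem.get(this.getConf());
if (fs.exists(output)) {
    fs.delete(output, true); // true = delete recursively
}
FileOutputFormat.setOutputPath(job, output);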