HBase結合MapReduce批量導入(HDFS中的數據導入到HBase)


HBase結合MapReduce批量導入

 1 package hbase;
 2 
 3 import java.text.SimpleDateFormat;
 4 import java.util.Date;
 5 
 6 import org.apache.hadoop.conf.Configuration;
 7 import org.apache.hadoop.hbase.client.Put;
 8 import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
 9 import org.apache.hadoop.hbase.mapreduce.TableReducer;
10 import org.apache.hadoop.hbase.util.Bytes;
11 import org.apache.hadoop.io.LongWritable;
12 import org.apache.hadoop.io.NullWritable;
13 import org.apache.hadoop.io.Text;
14 import org.apache.hadoop.mapreduce.Counter;
15 import org.apache.hadoop.mapreduce.Job;
16 import org.apache.hadoop.mapreduce.Mapper;
17 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
18 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
19 
20 public class BatchImport {
21     static class BatchImportMapper extends
22             Mapper<LongWritable, Text, LongWritable, Text> {
23         SimpleDateFormat dateformat1 = new SimpleDateFormat("yyyyMMddHHmmss");
24         Text v2 = new Text();
25 
26         protected void map(LongWritable key, Text value, Context context)
27                 throws java.io.IOException, InterruptedException {
28             final String[] splited = value.toString().split("\t");
29             try {
30                 final Date date = new Date(Long.parseLong(splited[0].trim()));
31                 final String dateFormat = dateformat1.format(date);
32                 String rowKey = splited[1] + ":" + dateFormat;//設置行鍵:手機號碼+日期時間
33                 v2.set(rowKey + "\t" + value.toString());
34                 context.write(key, v2);
35             } catch (NumberFormatException e) {
36                 final Counter counter = context.getCounter("BatchImport",
37                         "ErrorFormat");
38                 counter.increment(1L);
39                 System.out.println("出錯了" + splited[0] + " " + e.getMessage());
40             }
41         };
42     }
43 
44     static class BatchImportReducer extends
45             TableReducer<LongWritable, Text, NullWritable> {
46         protected void reduce(LongWritable key,
47                 java.lang.Iterable<Text> values, Context context)
48                 throws java.io.IOException, InterruptedException {
49             for (Text text : values) {
50                 final String[] splited = text.toString().split("\t");
51 
52                 final Put put = new Put(Bytes.toBytes(splited[0]));//第一列行鍵
53                 put.add(Bytes.toBytes("cf"), Bytes.toBytes("date"),
54                         Bytes.toBytes(splited[1]));//第二列日期
55                 put.add(Bytes.toBytes("cf"), Bytes.toBytes("msisdn"),
56                         Bytes.toBytes(splited[2]));//第三列手機號碼
57                 // 省略其他字段,調用put.add(....)即可
58                 context.write(NullWritable.get(), put);
59             }
60         };
61     }
62 
63     public static void main(String[] args) throws Exception {
64         final Configuration configuration = new Configuration();
65         // 設置zookeeper
66         configuration.set("hbase.zookeeper.quorum", "hadoop0");
67         // 設置hbase表名稱
68         configuration.set(TableOutputFormat.OUTPUT_TABLE, "wlan_log");//先在shell下創建一個表:create 'wlan_log','cf'
69         // 將該值改大,防止hbase超時退出
70         configuration.set("dfs.socket.timeout", "180000");
71 
72         final Job job = new Job(configuration, "HBaseBatchImport");
73 
74         job.setMapperClass(BatchImportMapper.class);
75         job.setReducerClass(BatchImportReducer.class);
76         // 設置map的輸出,不設置reduce的輸出類型
77         job.setMapOutputKeyClass(LongWritable.class);
78         job.setMapOutputValueClass(Text.class);
79 
80         job.setInputFormatClass(TextInputFormat.class);
81         // 不再設置輸出路徑,而是設置輸出格式類型
82         job.setOutputFormatClass(TableOutputFormat.class);
83 
84         FileInputFormat.setInputPaths(job, "hdfs://hadoop0:9000/input");//將手機上網日志文件上傳到HDFS中的input文件中
85 
86         job.waitForCompletion(true);
87     }
88 }

 

在 Eclipse 中成功運行上面的代碼後,就可以在 HBase shell 中查看結果,例如執行 scan 'wlan_log':

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯繫本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM