I. Implementation approach
1. Use MapReduce to read the input and pass each record along (a sample input line is sketched after this list).
2. Write a utility that connects to HBase, creates the table if it does not exist, and inserts the records.
3. Call that HBase utility from the reducer.
4. Submit the job from the main class.
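The original post does not show the input file. Given that the mapper splits each line on commas into five fields, data.txt would hold one record per line in roughly this shape (the field order id, name, age, sex, part and the sample values are my assumption, matching the columns written by HbaseUtils):

1001,Tom,20,male,dev
1002,Lucy,22,female,test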
II. Code
1. The mapper
package com;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Parse each input line and pass it on to the reducer as a User object
public class mapper extends Mapper<LongWritable, Text, Text, User> {

    @Override
    protected void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, Text, User>.Context context)
            throws IOException, InterruptedException {
        String data = value.toString();
        String[] s = data.split(",");
        System.out.println(data);
        // Emit a constant key so that every record reaches the same reduce call
        context.write(new Text("1"), new User(s[0], s[1], s[2], s[3], s[4]));
    }
}
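The mapper, reducer and HbaseUtils all rely on a User bean that the post does not show. Below is a minimal sketch of what it would have to look like: five String fields, a no-arg constructor and getters/setters (needed by BeanUtils.copyProperties), the five-argument constructor used in the mapper, and the Writable interface, because User is the map output value type.

package com;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

// Sketch of the User bean (not included in the original post)
public class User implements Writable {

    private String id;
    private String name;
    private String age;
    private String sex;
    private String part;

    // No-arg constructor required by Hadoop serialization and used in the reducer
    public User() {
    }

    public User(String id, String name, String age, String sex, String part) {
        this.id = id;
        this.name = name;
        this.age = age;
        this.sex = sex;
        this.part = part;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(id);
        out.writeUTF(name);
        out.writeUTF(age);
        out.writeUTF(sex);
        out.writeUTF(part);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        id = in.readUTF();
        name = in.readUTF();
        age = in.readUTF();
        sex = in.readUTF();
        part = in.readUTF();
    }

    // Getters/setters used by HbaseUtils and by BeanUtils.copyProperties
    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public String getAge() { return age; }
    public void setAge(String age) { this.age = age; }
    public String getSex() { return sex; }
    public void setSex(String sex) { this.sex = sex; }
    public String getPart() { return part; }
    public void setPart(String part) { this.part = part; }

    @Override
    public String toString() {
        return id + "," + name + "," + age + "," + sex + "," + part;
    }
}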
2. The HBase utility class
package com;

import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;

public class HbaseUtils {

    public static final String c = "info";   // column family

    // Method called from the reducer: create the table if needed, then insert the records
    public static void insertinfo(String ip, String port, String tableName, List<User> list) throws Exception {
        Connection con = getConnection(ip, port);
        Admin admin = con.getAdmin();
        if (!admin.tableExists(TableName.valueOf(tableName))) {
            createTable(admin, tableName);
        }
        Table table = con.getTable(TableName.valueOf(tableName));
        insertList(table, list);
        table.close();
        admin.close();
        con.close();
    }

    // Insert one Put per User, keyed by the user's id
    private static void insertList(Table table, List<User> list) throws Exception {
        for (User user : list) {
            Put put = new Put(user.getId().getBytes());
            put.addColumn(c.getBytes(), "name".getBytes(), user.getName().getBytes());
            put.addColumn(c.getBytes(), "Age".getBytes(), user.getAge().getBytes());
            put.addColumn(c.getBytes(), "Sex".getBytes(), user.getSex().getBytes());
            put.addColumn(c.getBytes(), "Part".getBytes(), user.getPart().getBytes());
            table.put(put);
        }
    }

    // Create the table with a single column family
    private static void createTable(Admin admin, String tableName) throws Exception {
        HTableDescriptor descriptor = new HTableDescriptor(TableName.valueOf(tableName));
        descriptor.addFamily(new HColumnDescriptor(c));
        admin.createTable(descriptor);
    }

    // Open a connection to HBase via ZooKeeper
    private static Connection getConnection(String ip, String port) throws Exception {
        Configuration configuration = HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.quorum", ip);
        configuration.set("hbase.zookeeper.property.clientPort", port);
        return ConnectionFactory.createConnection(configuration);
    }
}
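insertList sends one Put per record. The HBase client's Table interface also accepts a List<Put> in a single call, so the loop could be batched instead. A sketch of that variant (it would replace insertList inside HbaseUtils and needs java.util.ArrayList imported):

// Sketch: collect all Puts and send them to HBase in one batch
private static void insertList(Table table, List<User> list) throws Exception {
    List<Put> puts = new ArrayList<Put>();
    for (User user : list) {
        Put put = new Put(user.getId().getBytes());
        put.addColumn(c.getBytes(), "name".getBytes(), user.getName().getBytes());
        put.addColumn(c.getBytes(), "Age".getBytes(), user.getAge().getBytes());
        put.addColumn(c.getBytes(), "Sex".getBytes(), user.getSex().getBytes());
        put.addColumn(c.getBytes(), "Part".getBytes(), user.getPart().getBytes());
        puts.add(put);
    }
    table.put(puts);
}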
3. The reducer
package com;

import java.io.IOException;
import java.util.ArrayList;

import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class reducer extends Reducer<Text, User, Text, Text> {

    @Override
    protected void reduce(Text keyin, Iterable<User> value,
            Reducer<Text, User, Text, Text>.Context context)
            throws IOException, InterruptedException {
        ArrayList<User> list = new ArrayList<User>();
        // Copy each value out of the iterator: Hadoop reuses the same User
        // instance on every iteration, so the objects must be cloned
        for (User user : value) {
            User user1 = new User();
            System.out.println(user);
            try {
                BeanUtils.copyProperties(user1, user);
                list.add(user1);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        System.out.println("list+++++++++++++++" + list);
        // Hand the collected records to the HBase utility
        try {
            HbaseUtils.insertinfo("192.168.184.131", "2181", "sw", list);
        } catch (Exception e) {
            e.printStackTrace();
        }
        context.write(new Text("status"), new Text(":success"));
    }
}
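BeanUtils.copyProperties works, but it pulls in commons-beanutils just for a field copy. Since User already has a five-argument constructor (used in the mapper) and getters (used in HbaseUtils), the copy inside the loop could also be written without reflection, as in this sketch:

// Sketch: clone the reused User value without commons-beanutils
User user1 = new User(user.getId(), user.getName(), user.getAge(),
        user.getSex(), user.getPart());
list.add(user1);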
4. The main class
package com;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class main {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Run the job locally against the local file system
        conf.set("mapreduce.framework.name", "local");
        conf.set("fs.defaultFS", "file:///");

        Job wordCountJob = Job.getInstance(conf);

        // Important: specify the jar that contains this job
        wordCountJob.setJarByClass(main.class);

        // Mapper and reducer classes used by this job
        wordCountJob.setMapperClass(mapper.class);
        wordCountJob.setReducerClass(reducer.class);

        // Key/value types of the map output
        wordCountJob.setMapOutputKeyClass(Text.class);
        wordCountJob.setMapOutputValueClass(User.class);

        // Key/value types of the final output
        wordCountJob.setOutputKeyClass(Text.class);
        wordCountJob.setOutputValueClass(Text.class);

        // Input file and output directory
        FileInputFormat.setInputPaths(wordCountJob, "C:\\test\\in6\\data.txt");
        FileOutputFormat.setOutputPath(wordCountJob, new Path("C:\\test\\out6"));

        // Submit the job and wait for it to finish
        wordCountJob.waitForCompletion(true);
    }
}
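Once the job has run, the inserted rows can be checked with scan 'sw' in the HBase shell, or with a small Java scan such as the sketch below (the class name CheckTable is mine; the ZooKeeper address, port, table name, and column family are the ones hard-coded in the reducer and HbaseUtils):

package com;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;

// Sketch: scan the 'sw' table and print each row key with its 'name' column
public class CheckTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "192.168.184.131");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        Connection con = ConnectionFactory.createConnection(conf);
        Table table = con.getTable(TableName.valueOf("sw"));
        ResultScanner scanner = table.getScanner(new Scan());
        for (Result r : scanner) {
            String rowKey = new String(r.getRow());
            String name = new String(r.getValue("info".getBytes(), "name".getBytes()));
            System.out.println(rowKey + " -> " + name);
        }
        scanner.close();
        table.close();
        con.close();
    }
}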