Insert.java
package cn.ls.insert;
import cn.ls.util.HbaseConn;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
* Created by Administrator on 2016/12/30.
*/
public class Insert {
    public static void main(String[] args) throws IOException {
        /* Command-line variant, kept for reference:
        Connection hbaseConn = new HbaseConn().getHbaseConn();
        HTableInterface table = (HTableInterface) hbaseConn.getTable(TableName.valueOf(args[0]));
        Put put = new Put(Bytes.toBytes(args[1])); // row key
        put.add(Bytes.toBytes(args[2]), Bytes.toBytes(args[3]), Bytes.toBytes(args[4])); // column family, qualifier, value
        table.put(put);
        table.close(); // release resources
        hbaseConn.close();
        */
        /* HBase has no real column concept; column qualifiers are themselves data. */
        // insertOne();
        insertBatch();
    }
    private static void insertBatch() throws IOException {
        // Batch insert: collect the Puts into a list and submit them in one call.
        Connection hbaseConn = new HbaseConn().getHbaseConn();
        HTableInterface table = (HTableInterface) hbaseConn.getTable(TableName.valueOf("testCreate1228"));
        List<Put> list = new ArrayList<Put>();
        for (int i = 0; i < 10; i++) {
            Put put = new Put(Bytes.toBytes("20161130abc" + i));
            put.add(Bytes.toBytes("f1"), Bytes.toBytes("lie1" + i), Bytes.toBytes("20161130abc" + i));
            list.add(put);
        }
        table.put(list);
        table.close();
        hbaseConn.close();
    }
    private static void insertOne() throws IOException {
        // Insert a single row.
        Connection hbaseConn = new HbaseConn().getHbaseConn();
        HTableInterface table = (HTableInterface) hbaseConn.getTable(TableName.valueOf("testCreate1228"));
        Put put = new Put(Bytes.toBytes("20161130abc3"));
        put.add(Bytes.toBytes("f1"), Bytes.toBytes("lie1"), Bytes.toBytes("20161130abc"));
        table.put(put);
        table.close();
        hbaseConn.close();
    }
}
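
Both methods above rely on a small HbaseConn helper that is not shown here. A minimal sketch of what it might look like, assuming it simply wraps ConnectionFactory and that the ZooKeeper quorum is either set explicitly (the "localhost" value below is a placeholder) or picked up from an hbase-site.xml on the classpath:

HbaseConn.java (sketch)
package cn.ls.util;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import java.io.IOException;
/**
 * Hypothetical sketch of the HbaseConn helper used above; the real class is not shown in this post.
 */
public class HbaseConn {
    public Connection getHbaseConn() throws IOException {
        Configuration conf = HBaseConfiguration.create();
        // Placeholder quorum; omit this line if hbase-site.xml is already on the classpath.
        conf.set("hbase.zookeeper.quorum", "localhost");
        return ConnectionFactory.createConnection(conf);
    }
}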
MyBulkload.java
package cn.ls.insert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class MyBulkload {
    public static class MyBulkMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            super.setup(context);
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Input lines are tab-separated (rowkey \t name \t age \t phone);
            // you could also parse a custom format here, e.g. JSON/XML lines.
            String line = value.toString();
            String[] terms = line.split("\t");
            if (terms.length == 4) {
                byte[] rowkey = terms[0].getBytes();
                ImmutableBytesWritable imrowkey = new ImmutableBytesWritable(rowkey);
                // Emit rowkey => KeyValue pairs, one per column: info:name, info:age, info:phone.
                context.write(imrowkey, new KeyValue(rowkey, Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(terms[1])));
                context.write(imrowkey, new KeyValue(rowkey, Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(terms[2])));
                context.write(imrowkey, new KeyValue(rowkey, Bytes.toBytes("info"), Bytes.toBytes("phone"), Bytes.toBytes(terms[3])));
            }
        }
    }
    public static void main(String[] args) throws Exception {
        if (args.length != 3) {
            System.err.println("Usage: MyBulkload <table_name> <data_input_path> <hfile_output_path>");
            System.exit(2);
        }
        String tableName = args[0];
        String inputPath = args[1];
        String outputPath = args[2];
        // The HTable instance is used to read the target table's metadata,
        // including its region key-range boundaries.
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, tableName);
        Job job = Job.getInstance(conf, "MyBulkload");
        job.setMapperClass(MyBulkMapper.class);
        job.setJarByClass(MyBulkload.class);
        job.setInputFormatClass(TextInputFormat.class);
        // The most important configuration call; worth studying in detail.
        HFileOutputFormat.configureIncrementalLoad(job, table);
        FileInputFormat.addInputPath(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
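
HFileOutputFormat.configureIncrementalLoad(job, table) is what turns this into a bulk-load job: it reads the table's region boundaries, configures a TotalOrderPartitioner over them, and sets the sort reducer and output key/value classes so each reducer writes HFiles that fall into exactly one region. The job itself only produces HFiles under <hfile_output_path>; they still have to be handed to the region servers afterwards. A minimal sketch of that final step, assuming the same HBase 0.98/1.x API as the code above (it would replace the System.exit(...) line at the end of main() and needs an extra import of org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles):

        // Hypothetical final step: complete the bulk load once the job succeeds.
        boolean ok = job.waitForCompletion(true);
        if (ok) {
            // Move the generated HFiles into the table's regions.
            LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
            loader.doBulkLoad(new Path(outputPath), table);
        }
        table.close();
        System.exit(ok ? 0 : 1);

Alternatively, the same step can be run from the command line with the LoadIncrementalHFiles (completebulkload) tool shipped with HBase, passing it the HFile output directory and the table name.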