I ran into the following situation: we had been backfilling data into HBase for almost a year when we found that one column was bad for a few months and could not be used, so those months of that column had to be wiped out. Looking through the HBase commands, there is no built-in way to delete a single column over a rowkey range, so I had to write it myself. It can be done either with client-side code or with HBase on MR; I went with HBase on MR. The reason is that the client-side approach has to scan every rowkey, check whether it matches the delete condition, and delete it if it does; with this much data that would likely be too slow, and I was also afraid it would simply kill the client. A distributed MR job has neither of those problems.
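For reference, the client-side variant I decided against would look roughly like the sketch below. It is only a sketch: the table name, the f:cl column and the 20180406 cutoff are the ones used in the MR code further down, and the batch size of 1000 is an arbitrary choice. Everything funnels through a single client, which is exactly the bottleneck described above:

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class HbaseDelColClient {

    public static void main(String[] args) throws Exception {
        Configuration configuration = HBaseConfiguration.create();
        // table name taken from the MR examples below
        HTable table = new HTable(configuration, "dt_list_detail_test");

        Scan scan = new Scan();
        scan.addColumn(Bytes.toBytes("f"), Bytes.toBytes("cl"));
        scan.setCaching(500);
        scan.setCacheBlocks(false);

        List<Delete> deletes = new ArrayList<Delete>();
        ResultScanner scanner = table.getScanner(scan);
        try {
            for (Result r : scanner) {
                String rowkey = Bytes.toString(r.getRow());
                // same rule as the MR jobs: drop f:cl for rows dated before 20180406
                if (Integer.parseInt(rowkey.substring(rowkey.length() - 8)) < 20180406) {
                    Delete delete = new Delete(r.getRow());
                    delete.deleteColumn(Bytes.toBytes("f"), Bytes.toBytes("cl"));
                    deletes.add(delete);
                    // flush in batches so the client never holds the whole backlog
                    if (deletes.size() >= 1000) {
                        table.delete(deletes);
                        deletes.clear();
                    }
                }
            }
            if (!deletes.isEmpty()) {
                table.delete(deletes);
            }
        } finally {
            scanner.close();
            table.close();
        }
    }
}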
Notes:
1. The HBase version is HBase 0.98.9.
2. The rowkey format is userid + yyyyMMdd, for example 0000120181103. Here the column f:cl has to be removed from every row dated before 20180406. The code is as follows:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.GenericOptionsParser;
import java.io.IOException;
public class HbaseDelColMr {

    static class DelColMapper extends TableMapper<Text, NullWritable> {

        private Text delKey = new Text();

        // key: the rowkey of the current row
        // result: the row's data
        @Override
        public void map(ImmutableBytesWritable key, Result result, Context context) throws IOException, InterruptedException {
            // get the rowkey
            String rowkey = Bytes.toString(key.get());
            // decide whether this rowkey should be deleted; a rowkey looks like 12556565620180405
            String dateStr = rowkey.substring(rowkey.length() - 8, rowkey.length());
            // everything dated before 20180406 has to be deleted
            if (Integer.parseInt(dateStr) < 20180406) {
                delKey.set(rowkey);
                context.write(delKey, NullWritable.get());
            }
        }
    }

    static class DelColReducer extends TableReducer<Text, NullWritable, ImmutableBytesWritable> {

        @Override
        public void reduce(Text delKey, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            // delKey is the rowkey to delete
            Delete delete = new Delete(Bytes.toBytes(delKey.toString()));
            // specify the column to delete
            delete.deleteColumn(Bytes.toBytes("f"), Bytes.toBytes("cl"));
            context.write(new ImmutableBytesWritable(Bytes.toBytes(delKey.toString())), delete);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.quorum", "zk_1,zk_2,zk_3,zk_4,zk_5");
        configuration.set("hbase.zookeeper.property.clientPort", "2181");
        //configuration.set("hbase.local.dir", "/tmp/hbase-local-dir_test");

        String[] otherArgs = new GenericOptionsParser(configuration, args).getRemainingArgs();
        for (String ar : otherArgs) {
            System.out.println(ar + " ======================================");
        }

        Job job = Job.getInstance(configuration);
        job.setJobName("HbaseDelColMr");
        job.setJarByClass(HbaseDelColMr.class);

        Scan scan = new Scan();
        scan.addColumn(Bytes.toBytes("f"), Bytes.toBytes("cl"));
        scan.setCaching(500);
        scan.setCacheBlocks(false);

        TableMapReduceUtil.initTableMapperJob(
                otherArgs[0],        // input table, e.g. "dt_list_detail_test"
                scan,                // scan instance
                DelColMapper.class,
                Text.class,          // mapper output key type
                NullWritable.class,  // mapper output value type
                job
        );
        TableMapReduceUtil.initTableReducerJob(
                otherArgs[0],        // output table, e.g. "dt_list_detail_test"
                DelColReducer.class,
                job);
        job.setNumReduceTasks(10);

        boolean b = job.waitForCompletion(true);
        if (!b) {
            throw new IOException("Job failed");
        }
    }
}
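One detail about the Delete built in the reducer: in the 0.98 API, deleteColumn(family, qualifier) only marks the most recent version of the cell for deletion, while deleteColumns(family, qualifier) deletes all versions. If the column family keeps a single version the two behave the same, but if f:cl can carry multiple versions the plural form is the safer call:

delete.deleteColumns(Bytes.toBytes("f"), Bytes.toBytes("cl")); // removes every stored version of f:cl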
There is also a more efficient and simpler way: drop the reduce phase entirely, as follows:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.GenericOptionsParser;
import java.io.IOException;
public class HbaseDelColMr2 {

    static class DelColMapper extends TableMapper<ImmutableBytesWritable, Delete> {

        @Override
        public void map(ImmutableBytesWritable key, Result result, Context context) throws IOException, InterruptedException {
            // get the rowkey
            String rowkey = Bytes.toString(key.get());
            // decide whether this rowkey should be deleted; a rowkey looks like 12556565620180405
            String dateStr = rowkey.substring(rowkey.length() - 8, rowkey.length());
            // everything dated before 20180406 has to be deleted
            if (Integer.parseInt(dateStr) < 20180406) {
                // specify the column to delete
                Delete delete = new Delete(Bytes.toBytes(rowkey));
                delete.deleteColumn(Bytes.toBytes("f"), Bytes.toBytes("cl"));
                // needed to verify that with no reduce phase this write goes straight to HBase; confirmed it does
                context.write(key, delete);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.quorum", "zk_1,zk_2,zk_3,zk_4,zk_5");
        configuration.set("hbase.zookeeper.property.clientPort", "2181");
        //configuration.set("hbase.local.dir", "/tmp/hbase-local-dir_test");

        String[] otherArgs = new GenericOptionsParser(configuration, args).getRemainingArgs();
        for (String ar : otherArgs) {
            System.out.println(ar + " ======================================");
        }

        Job job = Job.getInstance(configuration);
        job.setJobName("HbaseDelColMr2");
        job.setJarByClass(HbaseDelColMr2.class);

        Scan scan = new Scan();
        scan.addColumn(Bytes.toBytes("f"), Bytes.toBytes("cl"));
        scan.setCaching(500);
        scan.setCacheBlocks(false);

        TableMapReduceUtil.initTableMapperJob(
                otherArgs[0],   // input table, e.g. "dt_list_detail_test"
                scan,           // scan instance
                DelColMapper.class,
                null,           // no mapper output key type needed; writes go straight to HBase
                null,           // no mapper output value type needed; writes go straight to HBase
                job
        );
        TableMapReduceUtil.initTableReducerJob(
                otherArgs[0],   // output table, e.g. "dt_list_detail_test"
                null,           // no reducer; this call only configures the table output format
                job);
        job.setNumReduceTasks(0);

        boolean b = job.waitForCompletion(true);
        if (!b) {
            throw new IOException("Job failed");
        }
    }
}
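This version works because setNumReduceTasks(0) turns it into a map-only job: whatever the mapper writes with context.write goes straight to the job's output format. The initTableReducerJob call is still needed even with a null reducer class, because it is what configures TableOutputFormat and the output table. Each (rowkey, Delete) pair the mapper emits is therefore applied to HBase directly, which is what the test mentioned in the mapper comment confirmed.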
Package and run it:
export HADOOP_CLASSPATH=`hbase classpath`
yarn jar ./hbaseDeltest.jar xxx.HbaseDelColMr -D mapreduce.job.queuename=xxx dt_list_detail_test
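The export makes the HBase jars visible to the submitting JVM, and because the driver parses its arguments with GenericOptionsParser, the -D mapreduce.job.queuename=xxx option is stripped out and applied to the Configuration, leaving the table name (dt_list_detail_test here) as otherArgs[0].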
That's it; either of the two approaches above will do the job.
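To spot-check the result afterwards, a quick scan from the hbase shell (table and column names as in the examples above) should return no f:cl cells for rowkeys dated before 20180406:

scan 'dt_list_detail_test', {COLUMNS => 'f:cl', LIMIT => 10}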