HBase 與 MapReduce 整合
phoenix.apache.org
Mapreduce運行3種方式
本地方式運行:
pc環境
1.1、將 Hadoop安裝本地解壓
1.2、配置 Hadoop的環境變量
添加%HADOOP_HOME%
修改%PATH%添加%HADOOP_HOME%/bin;%HADOOP_HOME%/sbin
3、在解壓的 Hadoop的bin目錄下添加 winutils.exe工具
Java工程
2.1、jdk一定要使用自己的jdk、不要使用 eclipse自帶
2.2、根目錄(src目錄下),不要添加任何 Mapreduce的配置文件hdfs-site.xml yarn-site.xml core-site.xml mapred-site.xml
2.3、在代碼當中,通過conf.set方式來進行指定。conf set("fs.defaults","hdfs://nodel:8020");
2.4、修改 Hadoop源碼
3、右鍵run執行
集群運行兩種方式
Java工程
1、根目錄(Src目錄下),添加 Hadoop的配置文件hdfs-site.xm1 yarn-Site.xm1 core-site.xm1 mapped-site.xm1
2、在代碼當中,指定jar包的位置, config.set(" mapped.jar","D:\WR\wc.jar");
3、修改 Hadoop源碼
4、將工程打jar包
5、右鍵run執行
Java工程
根目錄(src目錄下),添加 Hadoop的配置文件hdfs-site.xm1 yarn-Site.xm1 core-site.xm1 mapped-site.xm1
2、將工程打jar包
手動將jar包上傳到集群當中
4、通過 hadoop命令來運行。 hadoop jar jar位置mr代碼入口(例如: hadoop jar/usr/wc.Jar com.sxt.mr.Wcjob)
在代碼當中指定 Hbase所使用的 Zookeeper集群
(注意:如果 hbase搭建的是仍分布式,那么對應的 Zookeeper就是那台偽分布式的服務器
conf.set("hbase.zookeeper.quorum","nodel, node 2, node3")
System.setproperty("HADOOP_USER_NAME,"root");
HBase與MR整合文檔
49. MapReduce掃描緩存
TableMapReduceUtil現在恢復在傳入的Scan對象上設置掃描程序緩存(將結果返回給客戶端之前緩存的行數)的選項。由於HBase 0.95(HBASE-11558)中的錯誤,此功能丟失了,對於HBase 0.98.5和0.96.3是固定的。選擇掃描儀緩存的優先順序如下:
- 在掃描對象上設置的緩存設置。
- 通過配置選項指定的緩存設置
hbase.client.scanner.caching
,可以在hbase-site.xml中手動設置,也可以通過helper方法設置TableMapReduceUtil.setScannerCaching()
。 - 默認值
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING
,設置為100
。
優化緩存設置是客戶端等待結果的時間與客戶端需要接收的結果集數量之間的平衡。如果緩存設置太大,則客戶端可能會等待很長時間,甚至請求可能會超時。如果設置太小,則掃描需要分多次返回結果。如果您將掃描視為鏟子,則較大的緩存設置類似於較大的鏟子,較小的緩存設置等效於進行更多鏟斗以填充鏟斗。
上面提到的優先級列表允許您設置一個合理的默認值,並為特定操作覆蓋它。
有關更多詳細信息,請參見Scan的API文檔。
50.捆綁的HBase MapReduce作業
HBase JAR還可以用作某些捆綁的MapReduce作業的驅動程序。要了解捆綁的MapReduce作業,請運行以下命令。
$ ${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/hbase-mapreduce-VERSION.jar
An example program must be given as the first argument.
Valid program names are:
copytable: Export a table from local cluster to peer cluster
completebulkload: Complete a bulk data load.
export: Write table data to HDFS.
import: Import data written by Export.
importtsv: Import data in TSV format.
rowcounter: Count rows in HBase table
每個有效的程序名稱都是捆綁的MapReduce作業。要運行作業之一,請在以下示例之后對命令建模。
$ ${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/hbase-mapreduce-VERSION.jar rowcounter myTable
51. HBase作為MapReduce作業數據源和數據接收器
HBase可用作MapReduce作業的數據源TableInputFormat和數據接收器TableOutputFormat或MultiTableOutputFormat。編寫MapReduce作業以讀取或寫入HBase時,建議將TableMapper 和/或TableReducer子類化。有關基本用法,請參見無作用傳遞類IdentityTableMapper和IdentityTableReducer。有關更多示例,請參閱RowCounter或查看org.apache.hadoop.hbase.mapreduce.TestTableMapReduce
單元測試。
如果運行使用HBase作為源或接收器的MapReduce作業,則需要在配置中指定源和接收器表及列的名稱。
當您從HBase讀取數據時,會從HBase TableInputFormat
請求區域列表,並創建一個地圖,該地圖可以是map-per-region
或mapreduce.job.maps
,以較小者為准。如果您的工作只有兩張地圖,請加薪mapreduce.job.maps
數量大於區域數量。如果您在每個節點上運行TaskTracer / NodeManager和RegionServer,則地圖將在相鄰的TaskTracker / NodeManager上運行。寫入HBase時,應避免執行Reduce步驟,然后從地圖中寫回HBase。當您的工作不需要MapReduce對地圖發出的數據執行的排序和排序規則時,此方法有效。插入時,HBase會進行“排序”,因此除非有必要,否則就不會進行點雙重排序(並在MapReduce集群周圍進行數據改組)。如果不需要精簡,則地圖可能會在作業結束時發出為報告而處理的記錄計數,或者將精簡數量設置為零並使用TableOutputFormat。如果根據您的情況運行“減少”步驟,
一個新的HBase分區程序HRegionPartitioner可以運行與現有區域數一樣多的reducer。HRegionPartitioner適用於表較大且上傳完成后不會大大改變現有區域數的情況。否則,請使用默認分區程序。
52.在批量導入期間直接寫入HFile
如果要導入到新表中,則可以繞過HBase API並將內容直接寫入文件系統,並格式化為HBase數據文件(HFiles)。您的導入將運行得更快,也許要快一個數量級。有關此機制如何工作的更多信息,請參見批量加載。
53. RowCounter示例
包含的RowCounter MapReduce作業使用TableInputFormat
並計算指定表中的所有行。要運行它,請使用以下命令:
$ ./bin/hadoop jar hbase-X.X.X.jar
這將調用HBase MapReduce驅動程序類。選擇rowcounter
從提供的職位選擇。這會將行計數器使用建議打印到標准輸出。指定表名,要計數的列和輸出目錄。如果您遇到類路徑錯誤,請參見HBase,MapReduce和CLASSPATH。
54.Map-Task Splitting
54.1。默認的HBase MapReduce拆分器
當使用TableInputFormat來在MapReduce作業中獲取HBase表時,其拆分器將為該表的每個區域創建一個映射任務。因此,如果表中有100個區域,則該作業將有100個映射任務-無論在“掃描”中選擇了多少列族。
54.2。定制分離器
對於那些有興趣在實現自定義的分離器,看到法getSplits
中TableInputFormatBase。這就是映射任務分配的邏輯所在。
HBase MapReduce Examples
HBase MapReduce閱讀示例
以下是以只讀方式將HBase用作MapReduce源的示例。具體來說,有一個Mapper實例,但沒有Reducer,並且沒有從Mapper發出任何東西。該工作的定義如下...
Configuration config = HBaseConfiguration.create();
Job job = new Job(config, "ExampleRead");
job.setJarByClass(MyReadJob.class); // class that contains mapper
Scan scan = new Scan();
scan.setCaching(500); // 掃描中的默認值為1,這對MapReduce作業不利
scan.setCacheBlocks(false); // MR工作不要設置為true
// 設置其他scan屬性
...
TableMapReduceUtil.initTableMapperJob(
tableName, // 輸入HBase表名稱
scan, // scan 實例以控制CF和屬性
MyMapper.class, // mapper
null, // mapper output key
null, // mapper output value
job);
job.setOutputFormatClass(NullOutputFormat.class); //因為我們沒有從mapper發出任何東西
boolean b = job.waitForCompletion(true);
if (!b) {
throw new IOException("error with job!");
}
...而mapper實例將擴展TableMapper ...
public static class MyMapper extends TableMapper<Text, Text> {
public void map(ImmutableBytesWritable row, Result value, Context context) throws InterruptedException, IOException {
//處理來自Result實例的行的數據。
}
}
HBase MapReduce讀/寫示例
以下是通過MapReduce將HBase用作源和接收器的示例。此示例將簡單地將數據從一個表復制到另一個表。
Configuration config = HBaseConfiguration.create();
Job job = new Job(config,"ExampleReadWrite");
job.setJarByClass(MyReadWriteJob.class); // class that contains mapper
Scan scan = new Scan();
scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs
scan.setCacheBlocks(false); // don't set to true for MR jobs
// set other scan attrs
TableMapReduceUtil.initTableMapperJob(
sourceTable, // input table
scan, // Scan instance to control CF and attribute selection
MyMapper.class, // mapper class
null, // mapper output key
null, // mapper output value
job);
TableMapReduceUtil.initTableReducerJob(
targetTable, // output table
null, // reducer class
job);
job.setNumReduceTasks(0);
boolean b = job.waitForCompletion(true);
if (!b) {
throw
需要說明TableMapReduceUtil
正在做什么,尤其是對於減速器。TableOutputFormat被用作outputFormat類,並且在配置(例如TableOutputFormat.OUTPUT_TABLE
)上設置了幾個參數,並將reducer輸出鍵設置為ImmutableBytesWritable
,reducer值設置為Writable
。這些可以由程序員在工作和配置上設置,但是TableMapReduceUtil
試圖使事情變得更容易。
以下是示例映射器,它將創建一個Put
與輸入匹配的輸出Result
。注意:這就是CopyTable實用程序的作用。
public static class MyMapper extends TableMapper<ImmutableBytesWritable, Put> {
public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
// 這個例子只是從源表中復制數據..
context.write(row, resultToPut(row,value));
}
private static Put resultToPut(ImmutableBytesWritable key, Result result) throws IOException {
Put put = new Put(key.get());
for (Cell cell : result.listCells()) {
put.add(cell);
}
return put;
}
}
實際上沒有reducer步驟,因此請TableOutputFormat
注意將其發送Put
到目標表。
這只是一個示例,開發人員可以選擇不使用TableOutputFormat
並自己連接到目標表。
具有多表輸出的HBase MapReduce讀/寫示例
TODO:的示例MultiTableOutputFormat
。
HBase MapReduce匯總到HBase示例
以下示例將HBase用作MapReduce源和接收器,並進行匯總。本示例將對一個表中某個值的不同實例的數量進行計數,並將這些匯總計數寫入另一個表中。
Configuration config = HBaseConfiguration.create();
Job job = new Job(config,"ExampleSummary");
job.setJarByClass(MySummaryJob.class); // class that contains mapper and reducer
Scan scan = new Scan();
scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs
scan.setCacheBlocks(false); // don't set to true for MR jobs
// set other scan attrs
TableMapReduceUtil.initTableMapperJob(
sourceTable, // input table
scan, // Scan instance to control CF and attribute selection
MyMapper.class, // mapper class
Text.class, // mapper output key
IntWritable.class, // mapper output value
job);
TableMapReduceUtil.initTableReducerJob(
targetTable, // output table
MyTableReducer.class, // reducer class
job);
job.setNumReduceTasks(1); // at least one, adjust as required
boolean b = job.waitForCompletion(true);
if (!b) {
throw new IOException("error with job!");
}
在此示例映射器中,選擇具有字符串值的列作為要匯總的值。該值用作從映射器發出的鍵,並且一個IntWritable
代表實例計數器。
public static class MyMapper extends TableMapper<Text, IntWritable> {
public static final byte[] CF = "cf".getBytes();
public static final byte[] ATTR1 = "attr1".getBytes();
private final IntWritable ONE = new IntWritable(1);
private Text text = new Text();
public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
String val = new String(value.getValue(CF, ATTR1));
text.set(val); // we can only emit Writables...
context.write(text, ONE);
}
}
In the reducer, the "ones" are counted (just like any other MR example that does this), and then emits a .
public static class MyTableReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {
public static final byte[] CF = "cf".getBytes();
public static final byte[] COUNT = "count".getBytes();
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int i = 0;
for (IntWritable val : values) {
i += val.get();
}
Put put = new Put(Bytes.toBytes(key.toString()));
put.add(CF, COUNT, Bytes.toBytes(i));
context.write(null, put);
}
}
HBase MapReduce摘要文件示例
這與上面的摘要示例非常相似,不同的是,它使用HBase作為MapReduce源,但使用HDFS作為接收器。區別在於作業設置和減速機。映射器保持不變。
onfiguration config = HBaseConfiguration.create();
Job job = new Job(config,"ExampleSummaryToFile");
job.setJarByClass(MySummaryFileJob.class); // class that contains mapper and reducer
Scan scan = new Scan();
scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs
scan.setCacheBlocks(false); // don't set to true for MR jobs
// set other scan attrs
TableMapReduceUtil.initTableMapperJob(
sourceTable, // input table
scan, // Scan instance to control CF and attribute selection
MyMapper.class, // mapper class
Text.class, // mapper output key
IntWritable.class, // mapper output value
job);
job.setReducerClass(MyReducer.class); // reducer class
job.setNumReduceTasks(1); // at least one, adjust as required
FileOutputFormat.setOutputPath(job, new Path("/tmp/mr/mySummaryFile")); // adjust directories as required
boolean b = job.waitForCompletion(true);
if (!b) {
throw new IOException("error with job!");
}
如上所述,在此示例中,先前的Mapper可以保持不變。至於Reducer,它是一個“通用” Reducer,而不是擴展TableMapper和發出Puts。
public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int i = 0;
for (IntWritable val : values) {
i += val.get();
}
context.write(key, new IntWritable(i));
}
}
單詞統計案例(Maven)
pom文件
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
<!--定義hadoop版本-->
<hadoop.version>2.7.5</hadoop.version>
</properties>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-client/0.98.23-hadoop2 -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>0.98.23-hadoop2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-mapreduce -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-mapreduce</artifactId>
<version>2.1.0</version>
</dependency>
<!--hadoop客服端依賴-->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<!--hdfs文件系統依賴-->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
</dependency>
<!--MapReduce相關的依賴-->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
</dependencies>
WCRunner
package icu.shaoyayu.hadoop.hbase.mr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CompoundConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import java.io.IOException;
/**
* @author shaoyayu
* @date 2020/7/15 11:33
* @E_Mail
* @Version 1.0.0
* @readme :
* Hbase與MapReduce結合使用
*/
public class WCRunner {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//配置環境
Configuration conf = new CompoundConfiguration();
conf.set("","hadoopNode02,hadoopNode03,hadoopNode04");
//需要設置存儲的NameNode節點
conf.set("fs.defaultFS", "hdfs://hadoopNode01:8020");
Job job = Job.getInstance(conf);
job.setJarByClass(WCRunner.class);
job.setMapperClass(WCMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//最后參數一定寫false
TableMapReduceUtil.initTableReducerJob("wc", WCReducer.class, job, null, null, null, null, false);
FileInputFormat.addInputPath(job, new Path("/usr/wc"));
// reduce端輸出的key和value的類型
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Put.class);
// job.setOutputFormatClass(cls);
// job.setInputFormatClass(cls);
job.waitForCompletion(true);
}
}
WCMapper
package icu.shaoyayu.hadoop.hbase.mr;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* @author shaoyayu
* @date 2020/7/15 11:43
* @E_Mail
* @Version 1.0.0
* @readme :
*/
public class WCMapper extends Mapper<LongWritable,Text, Text, IntWritable>{
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] splits = value.toString().split(" ");
//第二種切割方法
// new StringTokenizer(value.toString()," ");
for (String string : splits) {
context.write(new Text(string), new IntWritable(1));
}
}
}
WCReducer
package icu.shaoyayu.hadoop.hbase.mr;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import java.io.IOException;
/**
* @author shaoyayu
* @date 2020/7/15 11:45
* @E_Mail
* @Version 1.0.0
* @readme :
*/
public class WCReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable>{
@Override
protected void reduce(Text key, Iterable<IntWritable> iter,
Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable intWritable : iter) {
sum+=intWritable.get();
}
Put put = new Put(key.toString().getBytes());
put.add("cf".getBytes(), "cf".getBytes(), String.valueOf(sum).getBytes());
context.write(null, put);
}
}
源碼分析:TableMapReduceUtil
initTableReducerJob()
TableMapReduceUtil.initTableReducerJob(String table,
Class<? extends TableReducer> reducer, Job job,
Class partitioner, String quorumAddress, String serverClass,
String serverImpl, boolean addDependencyJars)
/ **
*在提交TableReduce作業之前使用此功能。
*將適當地設置JobConf。
*
* @param table輸出表。
* @param reducer要使用的reducer類。
* @param job當前要調整的作業。確保傳遞的作業*具有所有必要的HBase配置。
* @param partitioner要使用的Partitioner。傳遞<code> null </ code>以使用*默認分區程序。
* @param quorumAddress要寫入的遠程集群;
*輸出到<code> hbase-site.xml </ code>中指定的集群的默認值為null。
*將此字符串設置為備用遠程集群的zookeeper集成
*當您需要減少寫入非默認集群的集群時;例如在集群之間復制表時,源將由<code> hbase-site.xml </code>指定,
*並且該參數將具有遠程集群的集合地址。要傳遞的格式特別。
*傳遞<code> hbase.zookeeper.quorum
* hbase.zookeeper.client.port
* zookeeper.znode.parent
* </ code>,例如<code> server,server2,server3:2181:/ hbase </ code>。
* @param serverClass重新定義了hbase.regionserver.class
* @param serverImpl重新定義了hbase.regionserver.impl
* @param addDependencyJars通過分布式緩存(tmpjars)為任何已配置的作業類上載HBase jar和jars。
* @throws IOException當確定區域計數失敗時。
* /
public static void initTableReducerJob(String table,
Class<? extends TableReducer> reducer, Job job,
Class partitioner, String quorumAddress, String serverClass,
String serverImpl, boolean addDependencyJars) throws IOException {
Configuration conf = job.getConfiguration();
HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
//重點在這個地方設置輸出到HBase里面去
job.setOutputFormatClass(TableOutputFormat.class);
if (reducer != null) job.setReducerClass(reducer);
conf.set(TableOutputFormat.OUTPUT_TABLE, table);
conf.setStrings("io.serializations", conf.get("io.serializations"),
MutationSerialization.class.getName(), ResultSerialization.class.getName());
// If passed a quorum/ensemble address, pass it on to TableOutputFormat.
if (quorumAddress != null) {
// Calling this will validate the format
ZKConfig.validateClusterKey(quorumAddress);
conf.set(TableOutputFormat.QUORUM_ADDRESS,quorumAddress);
}
if (serverClass != null && serverImpl != null) {
conf.set(TableOutputFormat.REGION_SERVER_CLASS, serverClass);
conf.set(TableOutputFormat.REGION_SERVER_IMPL, serverImpl);
}
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(Writable.class);
if (partitioner == HRegionPartitioner.class) {
job.setPartitionerClass(HRegionPartitioner.class);
int regions = MetaTableAccessor.getRegionCount(conf, TableName.valueOf(table));
if (job.getNumReduceTasks() > regions) {
job.setNumReduceTasks(regions);
}
} else if (partitioner != null) {
job.setPartitionerClass(partitioner);
}
if (addDependencyJars) {
addDependencyJars(job);
}
initCredentials(job);
}
TableOutputFormat
/ **
*創建一個新的記錄作者。
*
*請注意,基線javadoc給人的印象是只有一個
*每個工作{@link RecordWriter},但在HBase中,如果我們給您一個新的
* RecordWriter每次調用此方法。 完成后,您必須關閉返回的RecordWriter。
*否則將丟失寫入。
*
* @param context當前任務上下文。
* @return新創建的writer實例。
* @throws IOException創建寫入器時失敗。
* @throws InterruptedException作業取消時。
* /
@Override
public RecordWriter<KEY, Mutation> getRecordWriter(TaskAttemptContext context)
throws IOException, InterruptedException {
return new TableRecordWriter();
}
TableRecordWriter
/**
* 將reducer輸出寫入HBase表。
*/
protected class TableRecordWriter
extends RecordWriter<KEY, Mutation> {
private Connection connection;
private BufferedMutator mutator;
/**
* @throws IOException
*
*/
public TableRecordWriter() throws IOException {
String tableName = conf.get(OUTPUT_TABLE);
this.connection = ConnectionFactory.createConnection(conf);
this.mutator = connection.getBufferedMutator(TableName.valueOf(tableName));
LOG.info("Created table instance for " + tableName);
}
/**
* Closes the writer, in this case flush table commits.
*
* @param context The context.
* @throws IOException When closing the writer fails.
* @see RecordWriter#close(TaskAttemptContext)
*/
@Override
public void close(TaskAttemptContext context) throws IOException {
try {
if (mutator != null) {
mutator.close();
}
} finally {
if (connection != null) {
connection.close();
}
}
}
/ **
*將一個鍵/值對寫入表中。
*
* @param鍵鍵。
* @param value值。
* @throws IOException寫入失敗時。
* @請參見RecordWriter#write(Object,Object)
* /
@Override
public void write(KEY key, Mutation value)
throws IOException {
if (!(value instanceof Put) && !(value instanceof Delete)) {
throw new IOException("Pass a Delete or a Put");
}
mutator.mutate(value);
}
}
initTableMapperJob()
/ **
*在提交Multi TableMap作業之前使用它。 它將適當設置
*完成工作。
*
* @param scans要讀取的{@link Scan}對象的列表。
* @param mapper要使用的mapper類。
* @param outputKeyClass輸出鍵的類。
* @param outputValueClass輸出值的類。
* @param job當前要調整的作業。 確保通過的工作正在進行
*所有必需的HBase配置。
* @param addDependencyJars上傳HBase jar和任何
*通過分布式緩存(tmpjars)配置作業類。
* @param initCredentials是否初始化作業的hbase身份驗證憑據
* @throws IOException設置細節時失敗。
* /
public static void initTableMapperJob(List<Scan> scans,
Class<? extends TableMapper> mapper,
Class<?> outputKeyClass,
Class<?> outputValueClass, Job job,
boolean addDependencyJars,
boolean initCredentials) throws IOException {
//設置Hbase為輸入對象
job.setInputFormatClass(MultiTableInputFormat.class);
if (outputValueClass != null) {
job.setMapOutputValueClass(outputValueClass);
}
if (outputKeyClass != null) {
job.setMapOutputKeyClass(outputKeyClass);
}
job.setMapperClass(mapper);
Configuration conf = job.getConfiguration();
HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
List<String> scanStrings = new ArrayList<>();
for (Scan scan : scans) {
scanStrings.add(convertScanToString(scan));
}
job.getConfiguration().setStrings(MultiTableInputFormat.SCANS,
scanStrings.toArray(new String[scanStrings.size()]));
if (addDependencyJars) {
addDependencyJars(job);
}
if (initCredentials) {
initCredentials(job);
}
}
MultiTableInputFormat
在父類MultiTableInputFormatBase
中
/ **
*計算將用作地圖任務輸入的分割。 的
*分割數與表中的區域數匹配。
*
* @param context當前作業上下文。
* @return輸入拆分列表。
* @throws IOException創建拆分列表時失敗。
* @see org.apache.hadoop.mapreduce.InputFormat#getSplits(org.apache.hadoop.mapreduce.JobContext)
* /
@Override
public List<InputSplit> getSplits(JobContext context) throws IOException {
if (scans.isEmpty()) {
throw new IOException("No scans were provided.");
}
Map<TableName, List<Scan>> tableMaps = new HashMap<>();
for (Scan scan : scans) {
byte[] tableNameBytes = scan.getAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME);
if (tableNameBytes == null)
throw new IOException("A scan object did not have a table name");
TableName tableName = TableName.valueOf(tableNameBytes);
List<Scan> scanList = tableMaps.get(tableName);
if (scanList == null) {
scanList = new ArrayList<>();
tableMaps.put(tableName, scanList);
}
scanList.add(scan);
}
List<InputSplit> splits = new ArrayList<>();
Iterator iter = tableMaps.entrySet().iterator();
// Make a single Connection to the Cluster and use it across all tables.
try (Connection conn = ConnectionFactory.createConnection(context.getConfiguration())) {
while (iter.hasNext()) {
Map.Entry<TableName, List<Scan>> entry = (Map.Entry<TableName, List<Scan>>) iter.next();
TableName tableName = entry.getKey();
List<Scan> scanList = entry.getValue();
try (Table table = conn.getTable(tableName);
RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(
regionLocator, conn.getAdmin());
Pair<byte[][], byte[][]> keys = regionLocator.getStartEndKeys();
for (Scan scan : scanList) {
if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
throw new IOException("Expecting at least one region for table : "
+ tableName.getNameAsString());
}
int count = 0;
byte[] startRow = scan.getStartRow();
byte[] stopRow = scan.getStopRow();
for (int i = 0; i < keys.getFirst().length; i++) {
if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
continue;
}
if ((startRow.length == 0 || keys.getSecond()[i].length == 0 ||
Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) &&
(stopRow.length == 0 || Bytes.compareTo(stopRow,
keys.getFirst()[i]) > 0)) {
byte[] splitStart = startRow.length == 0 ||
Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ?
keys.getFirst()[i] : startRow;
byte[] splitStop = (stopRow.length == 0 ||
Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) &&
keys.getSecond()[i].length > 0 ?
keys.getSecond()[i] : stopRow;
HRegionLocation hregionLocation = regionLocator.getRegionLocation(
keys.getFirst()[i], false);
String regionHostname = hregionLocation.getHostname();
HRegionInfo regionInfo = hregionLocation.getRegionInfo();
String encodedRegionName = regionInfo.getEncodedName();
long regionSize = sizeCalculator.getRegionSize(
regionInfo.getRegionName());
TableSplit split = new TableSplit(table.getName(),
scan, splitStart, splitStop, regionHostname,
encodedRegionName, regionSize);
splits.add(split);
if (LOG.isDebugEnabled()) {
LOG.debug("getSplits: split -> " + (count++) + " -> " + split);
}
}
}
}
}
}
}
return splits;
}
默認的切片是RowKey的大小