Preface
While pulling HBase data through Spark, I ran into InputFormat. This article is built around InputFormat and touches on the related Spark, MapReduce, and HBase internals along the way.
InputFormat
InputFormat is the data-source format interface provided by MapReduce. Through this interface, all kinds of data sources (file systems, databases, and so on) can be read and fed into a MapReduce computation.
With that concept in place, let's look at the InputFormat source code.
public abstract class InputFormat<K, V> {
  /*
   * Get the split information of the data source; each split is wrapped in an
   * InputSplit and the method returns a List<InputSplit>.
   * Note that these splits are logical partitions.
   * For example, for a file of 100 characters, if each split covers 10 characters,
   * there are 10 splits in total.
   */
  public abstract List<InputSplit> getSplits(JobContext context
                                             ) throws IOException, InterruptedException;

  /*
   * Given one split, get a RecordReader. A RecordReader is really just an enhanced
   * iterator, except that it returns key/value pairs.
   * Note that it takes a single InputSplit, i.e. the iteration happens inside one split.
   */
  public abstract RecordReader<K,V> createRecordReader(InputSplit split,
                                                       TaskAttemptContext context
                                                       ) throws IOException, InterruptedException;
}
That roughly captures what this interface is about: one method says how to define the partitions, the other says how to get data out of a partition. Next, let's make this concrete in a Spark scenario.
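To make those two responsibilities tangible, here is a minimal toy sketch (not from any library; RangeInputFormat and RangeSplit are made-up names) of a custom InputFormat that serves the numbers 0 to 99, ten numbers per split: getSplits() defines the logical partitions, createRecordReader() iterates inside one of them.

import java.io.{DataInput, DataOutput}
import org.apache.hadoop.io.{LongWritable, NullWritable, Writable}
import org.apache.hadoop.mapreduce.{InputFormat, InputSplit, JobContext, RecordReader, TaskAttemptContext}

// A logical partition: just a [start, end) range. It must be Writable so it can be serialized.
class RangeSplit() extends InputSplit with Writable {
  var start: Long = 0L
  var end: Long = 0L
  def this(s: Long, e: Long) = { this(); start = s; end = e }
  override def getLength(): Long = end - start
  override def getLocations(): Array[String] = Array.empty // no locality preference
  override def write(out: DataOutput): Unit = { out.writeLong(start); out.writeLong(end) }
  override def readFields(in: DataInput): Unit = { start = in.readLong(); end = in.readLong() }
}

class RangeInputFormat extends InputFormat[LongWritable, NullWritable] {
  // "How to define partitions": carve 0 until 100 into ten splits of ten numbers each
  override def getSplits(context: JobContext): java.util.List[InputSplit] = {
    val splits = new java.util.ArrayList[InputSplit]()
    (0L until 100L by 10L).foreach(s => splits.add(new RangeSplit(s, s + 10)))
    splits
  }

  // "How to get data from one partition": an iterator over that range only, as key/value pairs
  override def createRecordReader(split: InputSplit, context: TaskAttemptContext)
      : RecordReader[LongWritable, NullWritable] =
    new RecordReader[LongWritable, NullWritable] {
      private var current = 0L
      private var end = 0L
      override def initialize(s: InputSplit, ctx: TaskAttemptContext): Unit = {
        val range = s.asInstanceOf[RangeSplit]
        current = range.start - 1
        end = range.end
      }
      override def nextKeyValue(): Boolean = { current += 1; current < end }
      override def getCurrentKey(): LongWritable = new LongWritable(current)
      override def getCurrentValue(): NullWritable = NullWritable.get()
      override def getProgress(): Float = 0f // progress reporting omitted in this sketch
      override def close(): Unit = ()
    }
}

Handing such a class to sc.newAPIHadoopRDD(new Configuration(), classOf[RangeInputFormat], classOf[LongWritable], classOf[NullWritable]) would yield an RDD with ten partitions.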
TableInputFormat
The Spark Side
When you fetch HBase data through Spark's MapReduce-compatible API, you will always end up with code like this:
// hbaseConfig: an HBaseConfiguration
// TableInputFormat: a subclass of InputFormat that describes the input data source
// ImmutableBytesWritable: the key type of the data source
// Result: the value type of the data source
// If you have written MapReduce jobs, this looks just like a MapReduce job setup,
// except that the output is an RDD, so no output declaration is needed.
val hBaseRDD = sc.newAPIHadoopRDD(hbaseConfig, classOf[TableInputFormat],
  classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
  classOf[org.apache.hadoop.hbase.client.Result])
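For completeness, here is a minimal sketch of how the hbaseConfig above is typically prepared (the ZooKeeper quorum, table name, and row-key range are made-up values):

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

val hbaseConfig = HBaseConfiguration.create()
hbaseConfig.set("hbase.zookeeper.quorum", "zk1,zk2,zk3")    // assumed ZooKeeper quorum
hbaseConfig.set(TableInputFormat.INPUT_TABLE, "test_table") // which table to scan
hbaseConfig.set(TableInputFormat.SCAN_ROW_START, "row-000") // optional: narrow the scan range
hbaseConfig.set(TableInputFormat.SCAN_ROW_STOP, "row-999")

TableInputFormat reads these keys from the Configuration when it builds its Scan.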
So what does the flow actually look like?
First, SparkContext creates an RDD:
new NewHadoopRDD(this, fClass, kClass, vClass, jconf)
...and that's it.
This is Spark's scheduling at work: a job is only really submitted when an action is hit, which I won't go into here. Skipping past that, look straight at NewHadoopRDD. Its two key methods, compute() and getPartitions(), correspond one-to-one to the two InputFormat methods.
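A quick illustration of that laziness (a sketch; the transformations are arbitrary): nothing is read from HBase when the RDD is created, and getSplits()/createRecordReader() only run once an action triggers a job.

// Building or transforming the RDD touches neither HBase nor the InputFormat yet
val rowKeys = hBaseRDD.map { case (key, _) => key.copyBytes() } // copyBytes() because Hadoop RDDs reuse Writable objects
// Only these actions submit jobs, compute the splits, and open the scanners
val total = hBaseRDD.count()
val firstTen = rowKeys.take(10)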
·getPartitions()
override def getPartitions: Array[Partition] = {
  // Instantiate the InputFormat we passed in, i.e. TableInputFormat here
  // (it could be any other InputFormat; this is just the example at hand)
  val inputFormat = inputFormatClass.newInstance
  inputFormat match {
    case configurable: Configurable =>
      configurable.setConf(_conf)
    case _ =>
  }
  val jobContext = new JobContextImpl(_conf, jobId)
  // Get all splits
  val rawSplits = inputFormat.getSplits(jobContext).toArray
  // The number of splits becomes the number of partitions, translated into Spark terms
  val result = new Array[Partition](rawSplits.size)
  for (i <- 0 until rawSplits.size) {
    // Wrap each split in a Partition
    result(i) = new NewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
  }
  result
}
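One practical consequence worth a small sketch: the RDD gets exactly one partition per InputSplit, which for TableInputFormat means roughly one partition per HBase region, so a table with few regions yields little parallelism unless you repartition (the mapping and the value 64 below are arbitrary examples).

// One Spark partition per InputSplit, i.e. (for TableInputFormat) roughly one per region
println(hBaseRDD.getNumPartitions)
// To add parallelism, first map to serializable types (Result/ImmutableBytesWritable do not
// ship well through a shuffle), then repartition
val widened = hBaseRDD
  .map { case (key, result) => (key.copyBytes(), result.rawCells().length) }
  .repartition(64)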
·compute()
Since the full method is long enough to cause discomfort, here are just the key parts:
// Same as before: instantiate the InputFormat
private val format = inputFormatClass.newInstance
format match {
  case configurable: Configurable =>
    configurable.setConf(conf)
  case _ =>
}
// Satisfy all of MapReduce's requirements...
private val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
private val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
private var finished = false
private var reader =
  try {
    // Here is the crucial RecordReader
    val _reader = format.createRecordReader(
      split.serializableHadoopSplit.value, hadoopAttemptContext)
    _reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)
    _reader
  } catch {
    case e: IOException if ignoreCorruptFiles =>
      logWarning(
        s"Skipped the rest content in the corrupted file: ${split.serializableHadoopSplit}", e)
      finished = true
      null
  }

// The familiar hasNext and next
override def hasNext: Boolean = {
  if (!finished && !havePair) {
    try {
      finished = !reader.nextKeyValue
    } catch {
      case e: IOException if ignoreCorruptFiles =>
        logWarning(
          s"Skipped the rest content in the corrupted file: ${split.serializableHadoopSplit}", e)
        finished = true
    }
    if (finished) {
      // Close and release the reader here; close() will also be called when the task
      // completes, but for tasks that read from many files, it helps to release the
      // resources early.
      close()
    }
    havePair = !finished
  }
  !finished
}

override def next(): (K, V) = {
  if (!hasNext) {
    throw new java.util.NoSuchElementException("End of stream")
  }
  havePair = false
  if (!finished) {
    inputMetrics.incRecordsRead(1)
  }
  if (inputMetrics.recordsRead % SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0) {
    updateBytesRead()
  }
  (reader.getCurrentKey, reader.getCurrentValue)
}
A lot of code has been omitted, but what it boils down to is wrapping the RecordReader in an Iterator (couldn't MapReduce just have used Iterator as the interface in the first place?).
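To spell that out, here is a stripped-down sketch of the RecordReader-to-Iterator adaptation the code above performs; this is my own version with the metrics and corrupt-file handling removed.

import org.apache.hadoop.mapreduce.RecordReader

def asIterator[K, V](reader: RecordReader[K, V]): Iterator[(K, V)] = new Iterator[(K, V)] {
  private var finished = false
  private var havePair = false

  override def hasNext: Boolean = {
    if (!finished && !havePair) {
      finished = !reader.nextKeyValue() // advance the reader exactly once per element
      if (finished) reader.close()      // release resources as soon as the split is exhausted
      havePair = !finished
    }
    !finished
  }

  override def next(): (K, V) = {
    if (!hasNext) throw new java.util.NoSuchElementException("End of stream")
    havePair = false
    (reader.getCurrentKey, reader.getCurrentValue)
  }
}

The havePair flag is what makes repeated hasNext calls safe: the reader is only advanced again once the previous pair has actually been consumed by next().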
That is roughly everything Spark does; the rest is HBase's job.
The HBase Side
TableInputFormat is a class HBase ships for MapReduce compatibility, and Spark happily hijacks it for its own use.
Let's go straight to the key code in TableInputFormat.
·getSplits()
RegionSizeCalculator sizeCalculator =
    new RegionSizeCalculator(getRegionLocator(), getAdmin());
TableName tableName = getTable().getName();
Pair<byte[][], byte[][]> keys = getStartEndKeys();
if (keys == null || keys.getFirst() == null ||
    keys.getFirst().length == 0) {
  HRegionLocation regLoc =
      getRegionLocator().getRegionLocation(HConstants.EMPTY_BYTE_ARRAY, false);
  if (null == regLoc) {
    throw new IOException("Expecting at least one region.");
  }
  List<InputSplit> splits = new ArrayList<>(1);
  // One region maps to one split (and hence one partition); here we get that region's size
  long regionSize = sizeCalculator.getRegionSize(regLoc.getRegionInfo().getRegionName());
  // Create the TableSplit, which is the InputSplit implementation for HBase
  TableSplit split = new TableSplit(tableName, scan,
      HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc
      .getHostnamePort().split(Addressing.HOSTNAME_PORT_SEPARATOR)[0], regionSize);
  splits.add(split);
  // (the rest of the method, which builds one TableSplit per region in the multi-region case, is omitted)
·createRecordReader()
final TableRecordReader trr =
    this.tableRecordReader != null ? this.tableRecordReader : new TableRecordReader();
Scan sc = new Scan(this.scan);
sc.setStartRow(tSplit.getStartRow());
sc.setStopRow(tSplit.getEndRow());
trr.setScan(sc);
trr.setTable(getTable());
return new RecordReader<ImmutableBytesWritable, Result>() {

  @Override
  public void close() throws IOException {
    trr.close();
    closeTable();
  }

  @Override
  public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException {
    return trr.getCurrentKey();
  }

  @Override
  public Result getCurrentValue() throws IOException, InterruptedException {
    return trr.getCurrentValue();
  }

  @Override
  public float getProgress() throws IOException, InterruptedException {
    return trr.getProgress();
  }

  @Override
  public void initialize(InputSplit inputsplit, TaskAttemptContext context)
      throws IOException, InterruptedException {
    trr.initialize(inputsplit, context);
  }

  @Override
  public boolean nextKeyValue() throws IOException, InterruptedException {
    return trr.nextKeyValue();
  }
};
This part should be fairly clear: it restricts the Scan to the split's row-key range and then builds a RecordReader that simply delegates everything to a TableRecordReader.
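To close the loop, here is a small sketch of what consuming the resulting key/value pairs usually looks like (the column family "cf" and qualifier "col" are made-up names):

import org.apache.hadoop.hbase.util.Bytes

val values = hBaseRDD.map { case (rowKey, result) =>
  // rowKey is the HBase row key, result is the full row returned by the scan
  val key  = Bytes.toString(rowKey.get(), rowKey.getOffset, rowKey.getLength)
  val cell = result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("col"))
  (key, Option(cell).map(b => Bytes.toString(b)).orNull)
}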
Summary
To stay compatible with MapReduce, Spark exposes APIs like newAPIHadoopRDD() (alongside the older hadoopRDD()), and HBase, also for MapReduce compatibility, provides classes like TableInputFormat. Put together, that is what lets Spark read data from HBase; of course, it is not the only way.
