TableInputFormat分片及分片數據讀取源碼級分析


  我們在MapReduce中TextInputFormat分片和讀取分片數據源碼級分析 這篇中以TextInputFormat為例講解了InputFormat的分片過程以及RecordReader讀取分片數據的過程。接下來咱們分析TableInputFormat的分片信息和數據讀取過程。

  TableInputFormat這是專門處理基於HBase的MapReduce的輸入數據的格式類。我們可以看看繼承結構:(1)public class TableInputFormat extends TableInputFormatBase implements Configurable;(2)public abstract class TableInputFormatBase extends InputFormat<ImmutableBytesWritable, Result>。其中InputFormat是輸入格式的基類。

  TableInputFormat類主要是構造HTable對象和Scan對象,主要在方法setConf(Configuration configuration)構造,代碼如下:

 1 /**
 2    * Sets the configuration. This is used to set the details for the table to
 3    * be scanned.
 4    *
 5    * @param configuration  The configuration to set.
 6    * @see org.apache.hadoop.conf.Configurable#setConf(
 7    *   org.apache.hadoop.conf.Configuration)
 8    */
 9   @Override
10   public void setConf(Configuration configuration) {
11     this.conf = configuration;
12     String tableName = conf.get(INPUT_TABLE);
13     try {
14       setHTable(new HTable(new Configuration(conf), tableName));
15     } catch (Exception e) {
16       LOG.error(StringUtils.stringifyException(e));
17     }
18 
19     Scan scan = null;
20 
21     if (conf.get(SCAN) != null) {
22       try {
23         scan = TableMapReduceUtil.convertStringToScan(conf.get(SCAN));
24       } catch (IOException e) {
25         LOG.error("An error occurred.", e);
26       }
27     } else {
28       try {
29         scan = new Scan();
30 
31         if (conf.get(SCAN_ROW_START) != null) {
32           scan.setStartRow(Bytes.toBytes(conf.get(SCAN_ROW_START)));
33         }
34 
35         if (conf.get(SCAN_ROW_STOP) != null) {
36           scan.setStopRow(Bytes.toBytes(conf.get(SCAN_ROW_STOP)));
37         }
38 
39         if (conf.get(SCAN_COLUMNS) != null) {
40           addColumns(scan, conf.get(SCAN_COLUMNS));
41         }
42 
43         if (conf.get(SCAN_COLUMN_FAMILY) != null) {
44           scan.addFamily(Bytes.toBytes(conf.get(SCAN_COLUMN_FAMILY)));
45         }
46 
47         if (conf.get(SCAN_TIMESTAMP) != null) {
48           scan.setTimeStamp(Long.parseLong(conf.get(SCAN_TIMESTAMP)));
49         }
50 
51         if (conf.get(SCAN_TIMERANGE_START) != null && conf.get(SCAN_TIMERANGE_END) != null) {
52           scan.setTimeRange(
53               Long.parseLong(conf.get(SCAN_TIMERANGE_START)),
54               Long.parseLong(conf.get(SCAN_TIMERANGE_END)));
55         }
56 
57         if (conf.get(SCAN_MAXVERSIONS) != null) {
58           scan.setMaxVersions(Integer.parseInt(conf.get(SCAN_MAXVERSIONS)));
59         }
60 
61         if (conf.get(SCAN_CACHEDROWS) != null) {
62           scan.setCaching(Integer.parseInt(conf.get(SCAN_CACHEDROWS)));
63         }
64 
65         // false by default, full table scans generate too much BC churn
66         scan.setCacheBlocks((conf.getBoolean(SCAN_CACHEBLOCKS, false)));
67       } catch (Exception e) {
68           LOG.error(StringUtils.stringifyException(e));
69       }
70     }
71 
72     setScan(scan);
73   }
View Code

  首先會通過配置信息獲取HBase表名,然后構造一個HTable對象;如果用戶自己的作業中有配置Scan的話,就會解碼Scan字符串轉換為Scan對象;如果用戶沒配置Scan就會創建一個默認的Scan,進行一些基本配置。

  關於TableInputFormatBase,我們重點還是講兩個方法:RecordReader<ImmutableBytesWritable, Result> createRecordReader(InputSplit split, TaskAttemptContext context)方法和List<InputSplit> getSplits(JobContext context)方法,前者是讀取分片信息所指的數據供TableMapper處理,后者是構造HBase表的分片信息。這里的分片信息是TableSplit extends InputSplit implements Writable, Comparable,這個TableSpli維護4個字段,HBase表名:byte [] tableName、scan起始rowkey:byte [] startRow、scan結束rowkey:byte [] endRow、以及該region所在節點:String regionLocation。

  一、先看getSplits方法吧,這里一個split一般對應一個完整region,除非用戶設定的開始和結束rowkey不是region的邊界,代碼如下:

 1   /**
 2    * Calculates the splits that will serve as input for the map tasks. The
 3    * number of splits matches the number of regions in a table.
 4    *
 5    * @param context  The current job context.
 6    * @return The list of input splits.
 7    * @throws IOException When creating the list of splits fails.
 8    * @see org.apache.hadoop.mapreduce.InputFormat#getSplits(
 9    *   org.apache.hadoop.mapreduce.JobContext)
10    */
11   @Override
12   public List<InputSplit> getSplits(JobContext context) throws IOException {
13     if (table == null) {
14         throw new IOException("No table was provided.");
15     }
16     // Get the name server address and the default value is null.
17     this.nameServer =
18       context.getConfiguration().get("hbase.nameserver.address", null);
19     
20     Pair<byte[][], byte[][]> keys = table.getStartEndKeys();//獲取所有Region的開始rowkey和結束rowkey
21     if (keys == null || keys.getFirst() == null ||
22         keys.getFirst().length == 0) {    //
23         //table的第一個region的startKey必須是EMPTY_BYTE_ARRAY,否則輸出FIRST_REGION_STARTKEY_NOT_EMPTY信息 
24       HRegionLocation regLoc = table.getRegionLocation(
25           HConstants.EMPTY_BYTE_ARRAY, false);
26       if (null == regLoc) {    //一個region也沒有
27         throw new IOException("Expecting at least one region.");
28       }
29       List<InputSplit> splits = new ArrayList<InputSplit>(1);
30       //構造一個TableSplit,起始rowkey和結束rowkey都是EMPTY_BYTE_ARRAY
31       InputSplit split = new TableSplit(table.getTableName(),
32           HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc
33               .getHostnamePort().split(Addressing.HOSTNAME_PORT_SEPARATOR)[0]);
34       splits.add(split);
35       return splits;        //返回分片信息
36     }
37     List<InputSplit> splits = new ArrayList<InputSplit>(keys.getFirst().length);
38     for (int i = 0; i < keys.getFirst().length; i++) {    //有多個region
39       if ( !includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {    //這個方法一直返回true
40         continue;
41       }
42       HServerAddress regionServerAddress = 
43         table.getRegionLocation(keys.getFirst()[i]).getServerAddress();
44       InetAddress regionAddress =
45         regionServerAddress.getInetSocketAddress().getAddress();//獲取region所在地址
46       String regionLocation;
47       try {
48         regionLocation = reverseDNS(regionAddress);//將地址轉換為字符串的主機名
49       } catch (NamingException e) {
50         LOG.error("Cannot resolve the host name for " + regionAddress +
51             " because of " + e);
52         regionLocation = regionServerAddress.getHostname();
53       }
54 
55             byte[] startRow = scan.getStartRow();    //獲取scan的開始rowkey
56             byte[] stopRow = scan.getStopRow();    //獲取scan的結束rowkey
57             // determine if the given start an stop key fall into the region
58             //比較用戶設定的rowkey范圍在那些region之中
59       if ((startRow.length == 0 || keys.getSecond()[i].length == 0 ||
60                      Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) &&
61           (stopRow.length == 0 ||
62            Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) {
63         byte[] splitStart = startRow.length == 0 ||
64           Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ?
65             keys.getFirst()[i] : startRow;
66         byte[] splitStop = (stopRow.length == 0 ||
67           Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) &&
68           keys.getSecond()[i].length > 0 ?
69             keys.getSecond()[i] : stopRow;
70         InputSplit split = new TableSplit(table.getTableName(),
71           splitStart, splitStop, regionLocation);    //構造TableSplit
72         splits.add(split);
73         if (LOG.isDebugEnabled()) {
74           LOG.debug("getSplits: split -> " + i + " -> " + split);
75         }
76       }
77     }
78     return splits;
79   }
View Code

   Pair這個結構由兩部分組成:start和end,都是byte[][],這是一個二維byte數組,其中一維是region的順序,二維是start或者end的字符序列。通過getFirst方法就可以獲取所有region的start rowkey,通過getSecond可以獲得所有的end rowkey。

  這里還有一個要注意的就是每個HBase表的第一個region是沒有start rowkey,最后一個region是沒有end  rowkey,這里的“沒有”是指的是table.getStartEndKeys()這個方法獲取的結果,另外大伙也可以從WEB UI中查看指定的HBase表的region信息也可以看到第一個region的start和最后一個region的end並沒有顯示。還有就是如果這個HBase表只有一個region的話,getFirst方法返回是沒有數據的;getSecond也沒數據。

  (1)、getSplits方法中的第一個if語句段是上面說的只有一個region的情況。每個HBase表的第一個region的start rowkey都是EMPTY_BYTE_ARRAY,這是一個長度為0的byte數組。table.getRegionLocation方法會找指定的rowkey所在的region所在信息HRegionLocation。如果只有一個region的話,就先找EMPTY_BYTE_ARRAY所在region信息,如果沒有這樣的信息就是出現了錯誤;如果如果有這樣的信息的話,就構建一個長度為1的InputSplit列表splits,構造一個TableSplit:設定HBase表名,scan起始和結束rowkey都是EMPTY_BYTE_ARRAY,再加上region所在節點,把這個TableSplit加入splits,返回這個splits。

  (2)、接下來就是HBase表有多個region的情況,構建長度為keys.getFirst().length的InputSplit列表,然后遍歷keys.getFirst()獲取每個region的位置信息並將其轉換成String類型(reverseDNS方法從reverseDNSCacheMap中獲取,reverseDNSCacheMap是存儲IPAddress => HostName的映射);然后獲取用戶設定的起始和結束rowkey,並和當前的region的起始和比較結束,如果有rowkey包含這個region就會將這region當做一個InputSplit放入列表中,最后待遍歷完之后返回split列表。判斷當前region是否應該加入InputSplit列表的條件就是循環中的最后一個if語句段,條件是:((startRow.length == 0 || keys.getSecond()[i].length == 0 ||Bytes.compareTo (startRow, keys.getSecond()[i]) < 0) && (stopRow.length == 0 || Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)),分解這個條件為兩部分:A、(startRow.length == 0 || keys.getSecond()[i].length == 0 ||Bytes.compareTo (startRow, keys.getSecond()[i]) < 0)這個是要確定用戶設定的start rowkey是否小於當前region的結束rowkey;B、(stopRow.length == 0 || Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)這個是要確定用戶設定的end rowkey是否大於當前region的開始rowkey,這倆條件必須同時滿足才可以,一旦滿足就要確定這個TableSplit的開始rowkey和結束rowkey了:A、startRow.length == 0 || Bytes.compareTo(keys.getFirst()[i], startRow) >= 0如果為true的話,說明用戶設定的起始rowkey可能是從表的開頭開始或者是當前region的起始rowkey大於用戶設定的則應該將當前region的起始rowkey作為TableSplit的起始rowkey,如果表達式為false的話將用戶設定的起始rowkey作為TableSplit的起始rowkey;B、(stopRow.length == 0 || Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) && keys.getSecond()[i].length > 0如果為true的話,說明首先要確定是否設定的結束rowkey或者當前region的結束rowkey小於用戶設定的結束rowkey,且要保證當前region不是最后一個(keys.getSecond()[i].length > 0),這樣的的TableSplit的結束rowkey就是當前region的結束rowkey,如果為false則將用戶設定的結束rowkey為TableSplit的結束rowkey,為什么要不是最后一個region呢?因為最后一個region的end rowkey的長度始終為0,比較之下會將最后一個region的end rowkey設置給TableSplit,顯然這是不對,能到這里說明這個region應該被分配給一個TableSplit,如果是最后一個region的話,那么這個TableSplit的結束rowkey應該是用戶設定的而非這個region自己的。獲得這個tableSplit的開始和結束rowkey之后就可以封裝這個TableSplit了,並放入InputSplit列表。最終待所有的region遍歷結束之后返回這個InputSplit列表。

  這樣getSplits方法就結束了。

  二、createRecordReader方法,這個方法代碼如下:

 1 /**
 2    * Builds a TableRecordReader. If no TableRecordReader was provided, uses
 3    * the default.
 4    *
 5    * @param split  The split to work with.
 6    * @param context  The current context.
 7    * @return The newly created record reader.
 8    * @throws IOException When creating the reader fails.
 9    * @see org.apache.hadoop.mapreduce.InputFormat#createRecordReader(
10    *   org.apache.hadoop.mapreduce.InputSplit,
11    *   org.apache.hadoop.mapreduce.TaskAttemptContext)
12    */
13   @Override
14   public RecordReader<ImmutableBytesWritable, Result> createRecordReader(
15       InputSplit split, TaskAttemptContext context)
16   throws IOException {
17     if (table == null) {
18       throw new IOException("Cannot create a record reader because of a" +
19           " previous error. Please look at the previous logs lines from" +
20           " the task's full log for more details.");
21     }
22     TableSplit tSplit = (TableSplit) split;
23     TableRecordReader trr = this.tableRecordReader;
24     // if no table record reader was provided use default
25     if (trr == null) {
26       trr = new TableRecordReader();
27     }
28     Scan sc = new Scan(this.scan);
29     sc.setStartRow(tSplit.getStartRow());
30     sc.setStopRow(tSplit.getEndRow());
31     trr.setScan(sc);
32     trr.setHTable(table);
33     try {
34       trr.initialize(tSplit, context);
35     } catch (InterruptedException e) {
36       throw new InterruptedIOException(e.getMessage());
37     }
38     return trr;
39   }
View Code

  這個方法主要是獲取TableSplit,然后構造一個Scan,設定開始和結束rowkey;設定HTablePool;將Scan和HTable傳遞給一個TableRecordReader對象,然后調用initialize(tSplit, context)初始化,最后返回這個TableRecordReader。可以看出TableRecordReader這個是讀取key/value的。TableRecordReader中實際操作數據的是TableRecordReaderImpl,TableRecordReader的nextKeyValue()、getCurrentValue()、initialize、getCurrentKey()方法會調用TableRecordReaderImpl的相應方法。

  TableRecordReaderImpl的initialize方法主要是重新創建一個新的Scan,並將createRecordReader傳過來的賦值給這個新的currentScan,並獲取對應的ResultScanner。

  TableRecordReaderImpl的nextKeyValue()會先創建一個key = new ImmutableBytesWritable()和value = new Result(),這就是我們繼承TableMapper中map方法中的參數類型,然后每次調用該方法通過執行value = this.scanner.next()方法來獲取HBase中的一行數據賦值給value,這里表明如果scanner.next()運行無異常的話key中是沒有數據的(出現異常之后會存儲value對應行的rowkey),只有value有數據。執行了這個方法就可以通過getCurrentValue()、getCurrentKey()方法來獲取value和key了。

 

  上面這些講解了在HBase上使用MapReduce時的分片過程及如何讀取這些分片上的數據的。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM