記一次newApiHadoopRdd查詢數據不一致問題


 

現象:

+----------+-------+--------+-----+-----+-----+----+----+------+---------+-------+--------+--------+------------+
|totalCount|January|February|March|April| May|June|July|August|September|October|November|December|totalMileage|
+----------+-------+--------+-----+-----+-----+----+----+------+---------+-------+--------+--------+------------+
| 33808| 0| 0| 0| 0|33798| 0| 0| 0| 0| 0| 0| 0| 79995.0|
+----------+-------+--------+-----+-----+-----+----+----+------+---------+-------+--------+--------+------------+

當前表預分區10個

按照當月數據看,當前測試表中總數量是:33798

hbase的總數量也是:33798

神奇的地方:使用sparkSQL對接hbase查詢的數量是:33808

當時的sql語句是:select count(1) from orderData

很神奇,因為通過sql查詢后,總數據多了10條

============================================================

原因:

這里設置了hbase SCAN_BATCHSIZE這個值,會設置scan的batchsize。這個設置的文檔是這樣說的:

Set the maximum number of values to return for each call to next()

之前一直以為這里是設置一次讀取多少行,其實values貌似是讀取多少列,並且開啟了這個值會導致hbase scan時返回一行的部分結果;

於是將這個設置注釋掉,程序即可正常運行

 

進一步的,我們從hbase端代碼看看這個設置。hbase的scan會兩個成員變量:

  • private boolean allowPartialResults = false;
  • private int batch = -1;

allowPartialResult這個很明顯就是會返回部分結果的設置,那么這個batch呢?setBatch()時並不會設置allowPartialResult。但是在Scan的getResultsToAddToCache()函數中,如果batch值大於0,會設置isBatch=true。之后會有這段代碼:

// If the caller has indicated in their scan that they are okay with seeing partial results,
// then simply add all results to the list. Note that since scan batching also returns results
// for a row in pieces we treat batch being set as equivalent to allowing partials. The
// implication of treating batching as equivalent to partial results is that it is possible
// the caller will receive a result back where the number of cells in the result is less than
// the batch size even though it may not be the last group of cells for that row.
    if (allowPartials || isBatchSet) {
      addResultsToList(resultsToAddToCache, resultsFromServer, 0,
          (null == resultsFromServer ? 0 : resultsFromServer.length));
      return resultsToAddToCache;
    }

 

之前錯誤代碼:

TableInputFormat.SCAN_BATCHSIZE
lazy val buildScan = {

    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", GlobalConfigUtils.hbaseQuorem)
    hbaseConf.set(TableInputFormat.INPUT_TABLE, hbaseTableName)
    hbaseConf.set(TableInputFormat.SCAN_COLUMNS, queryColumns)
    hbaseConf.set(TableInputFormat.SCAN_ROW_START, startRowKey)
    hbaseConf.set(TableInputFormat.SCAN_ROW_STOP, endRowKey)
    hbaseConf.set(TableInputFormat.SCAN_BATCHSIZE , "10000")//TODO 此處導致查詢數據不一致
    hbaseConf.set(TableInputFormat.SCAN_CACHEDROWS , "10000")
    hbaseConf.set(TableInputFormat.SHUFFLE_MAPS , "1000")

    val hbaseRdd = sqlContext.sparkContext.newAPIHadoopRDD(
      hbaseConf,
      classOf[org.apache.hadoop.hbase.mapreduce.TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result]
    )

    val rs: RDD[Row] = hbaseRdd.map(tuple => tuple._2).map(result => {

      var values = new ArrayBuffer[Any]()
      hbaseTableFields.foreach { field =>
        values += Resolver.resolve(field, result)
      }
      Row.fromSeq(values.toSeq)
    })
    rs
  }

 

解決:

去掉TableInputFormat.SCAN_BATCHSIZE的設置即可

去掉后的查詢結果:

+----------+-------+--------+-----+-----+-----+----+----+------+---------+-------+--------+--------+------------+
|totalCount|January|February|March|April| May|June|July|August|September|October|November|December|totalMileage|
+----------+-------+--------+-----+-----+-----+----+----+------+---------+-------+--------+--------+------------+
| 33798| 0| 0| 0| 0|33798| 0| 0| 0| 0| 0| 0| 0| 79995.0|
+----------+-------+--------+-----+-----+-----+----+----+------+---------+-------+--------+--------+------------+

問題解決~


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM