
Symptom:
+----------+-------+--------+-----+-----+-----+----+----+------+---------+-------+--------+--------+------------+
|totalCount|January|February|March|April| May|June|July|August|September|October|November|December|totalMileage|
+----------+-------+--------+-----+-----+-----+----+----+------+---------+-------+--------+--------+------------+
| 33808| 0| 0| 0| 0|33798| 0| 0| 0| 0| 0| 0| 0| 79995.0|
+----------+-------+--------+-----+-----+-----+----+----+------+---------+-------+--------+--------+------------+
The table is pre-split into 10 regions.
Judging by the current month's data, the test table holds 33798 rows in total.
HBase also reports a total of 33798 rows.
The strange part: querying HBase through Spark SQL returns 33808.
The SQL statement was: select count(1) from orderData
So the SQL query reports 10 more rows than actually exist — the same number as the pre-split regions.
============================================================
Cause:
The code sets TableInputFormat.SCAN_BATCHSIZE, which configures the batch size of the underlying Scan (via Scan.setBatch()). The documentation for this setting reads:
Set the maximum number of values to return for each call to next()
I had always assumed this controlled how many rows each read returns, but "values" apparently means cells (columns), and enabling it causes an HBase scan to return a row's results in partial pieces.
After commenting this setting out, the program runs correctly.
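To make the cells-vs-rows distinction concrete, here is a small Java sketch (an illustration of the arithmetic only; partialResultSizes is a made-up helper, not an HBase API): with a batch size of 10, one row holding 25 cells comes back as three partial Results.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchSketch {
    // Split one row's cell count into partial "Result" sizes of at most `batch`
    // cells each, mirroring what setBatch(batch) does to a wide row.
    static List<Integer> partialResultSizes(int cells, int batch) {
        List<Integer> sizes = new ArrayList<>();
        for (int remaining = cells; remaining > 0; remaining -= batch) {
            sizes.add(Math.min(remaining, batch));
        }
        return sizes;
    }

    public static void main(String[] args) {
        // One row with 25 cells, batch size 10: three partial Results (10, 10, 5).
        System.out.println(partialResultSizes(25, 10)); // [10, 10, 5]
    }
}
```

Each of those three pieces surfaces as a separate Result object, even though they all belong to a single row.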
Going a step further, let's look at this setting in the HBase client code. Scan has two relevant member variables:

private boolean allowPartialResults = false;
private int batch = -1;

allowPartialResults is clearly the flag that allows partial results to be returned, but what about batch? setBatch() does not set allowPartialResults. However, in the client scanner's getResultsToAddToCache() method, if batch is greater than 0, isBatchSet is true, and then this code runs:

// If the caller has indicated in their scan that they are okay with seeing partial results,
// then simply add all results to the list. Note that since scan batching also returns results
// for a row in pieces we treat batch being set as equivalent to allowing partials. The
// implication of treating batching as equivalent to partial results is that it is possible
// the caller will receive a result back where the number of cells in the result is less than
// the batch size even though it may not be the last group of cells for that row.
if (allowPartials || isBatchSet) {
  addResultsToList(resultsToAddToCache, resultsFromServer, 0,
      (null == resultsFromServer ? 0 : resultsFromServer.length));
  return resultsToAddToCache;
}
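In other words, once batch is set, every partial chunk of a row is cached and surfaced as its own Result. A small sketch of the resulting arithmetic (a simplification; resultCount is a hypothetical helper, not client code): a scan yields sum(ceil(cellsPerRow / batch)) Results rather than one Result per row, which is exactly what inflates a count over Results.

```java
public class OverCountSketch {
    // Number of Result objects a batched scan would emit for rows with the
    // given cell counts: each row contributes ceil(cells / batch) Results.
    static int resultCount(int[] cellsPerRow, int batch) {
        int count = 0;
        for (int cells : cellsPerRow) {
            count += (cells + batch - 1) / batch; // ceil(cells / batch)
        }
        return count;
    }

    public static void main(String[] args) {
        int[] rows = {5, 12, 8};  // three rows; the middle one is wider than the batch
        System.out.println(resultCount(rows, 10));   // 4 Results for 3 rows
        System.out.println(resultCount(rows, 1000)); // 3: batch wider than any row, no splitting
    }
}
```

Since newAPIHadoopRDD turns each (key, Result) pair into one RDD element, count(1) ends up counting these partial Results, not rows.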
The original buggy code, which sets TableInputFormat.SCAN_BATCHSIZE:
lazy val buildScan = {
  val hbaseConf = HBaseConfiguration.create()
  hbaseConf.set("hbase.zookeeper.quorum", GlobalConfigUtils.hbaseQuorem)
  hbaseConf.set(TableInputFormat.INPUT_TABLE, hbaseTableName)
  hbaseConf.set(TableInputFormat.SCAN_COLUMNS, queryColumns)
  hbaseConf.set(TableInputFormat.SCAN_ROW_START, startRowKey)
  hbaseConf.set(TableInputFormat.SCAN_ROW_STOP, endRowKey)
  hbaseConf.set(TableInputFormat.SCAN_BATCHSIZE, "10000") // TODO this caused the inconsistent query results
  hbaseConf.set(TableInputFormat.SCAN_CACHEDROWS, "10000")
  hbaseConf.set(TableInputFormat.SHUFFLE_MAPS, "1000")

  val hbaseRdd = sqlContext.sparkContext.newAPIHadoopRDD(
    hbaseConf,
    classOf[org.apache.hadoop.hbase.mapreduce.TableInputFormat],
    classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
    classOf[org.apache.hadoop.hbase.client.Result]
  )

  val rs: RDD[Row] = hbaseRdd.map(tuple => tuple._2).map(result => {
    val values = new ArrayBuffer[Any]()
    hbaseTableFields.foreach { field =>
      values += Resolver.resolve(field, result)
    }
    Row.fromSeq(values.toSeq)
  })
  rs
}
Fix:
Remove the TableInputFormat.SCAN_BATCHSIZE setting.
Query result after removing it:
+----------+-------+--------+-----+-----+-----+----+----+------+---------+-------+--------+--------+------------+
|totalCount|January|February|March|April| May|June|July|August|September|October|November|December|totalMileage|
+----------+-------+--------+-----+-----+-----+----+----+------+---------+-------+--------+--------+------------+
| 33798| 0| 0| 0| 0|33798| 0| 0| 0| 0| 0| 0| 0| 79995.0|
+----------+-------+--------+-----+-----+-----+----+----+------+---------+-------+--------+--------+------------+
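As a side note, if batching ever had to stay enabled, a row-accurate count would mean de-duplicating by row key rather than counting Result objects. A sketch with made-up keys (hypothetical data, not the real scan output):

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class DedupSketch {
    public static void main(String[] args) {
        // Row key of each partial Result as a batched scan might emit them:
        // row "r2" was split into two partial Results.
        List<String> resultKeys = List.of("r1", "r2", "r2", "r3");
        Set<String> distinctRows = new LinkedHashSet<>(resultKeys);
        System.out.println(resultKeys.size());   // 4 partial Results
        System.out.println(distinctRows.size()); // 3 actual rows
    }
}
```

Dropping the setting is the simpler fix here, since nothing in this job needed partial results.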
Problem solved.
