Lily HBase Indexer同步HBase二級索引到Solr丟失數據的問題分析



一、問題描述

部分業務需要使用HBase的數據進行多維度分析,我們采用了將部分數據同步到Solr,通過Solr進行多維度查詢返回對應的Rowkey,再從HBase批量獲取數據。因此我們使用了一個比較成熟的方案Lily HBase Indexer來同步二級索引到Solr。但是使用的時候出現了Solr丟失數據的問題。基本上每天Solr都會比HBase少幾千條數據。

二、分析步驟

由於我們使用的是CDH集群,下面所有操作都是基於該環境

2.1 查看日志

到每個節點的/var/log/hbase-solr/var/log/solr查看了日志,都沒發現寫入失敗的記錄

2.2 修改Solr的硬提交

由於日志沒有發現錯誤,猜測是Solr的數據在緩存中沒提交上去。
在solr的collecttion目錄下的conf/solrconfig.xml文件,將Solr的硬提交激活,操作如下

  1. <autoCommit>
  2. <maxTime>${solr.autoCommit.maxTime:60000}</maxTime>
  3. <openSearcher>true</openSearcher>
  4. </autoCommit>

然后保存配置,將修改update 到Solr集群。然后測試仍舊出現上述問題

2.3 尋求StackOverFlow幫助

目前是沒看到問題出在哪里了,因此只能去網上搜索一下具體原因了。網上有這么兩個帖子
hbase-indexer solr numFound different from hbase table rows size

HBase Indexer導致Solr與HBase數據不一致問題解決

他們都提到了修改morphline-hbase-mapper.xml,添加read-row
如下:

重新刷新hbase-indexer配置

這次發現數目對了,但是字段缺了

2.4 修改了read-row="never"后,丟失部分字段

由於設置了read-row之后數據不會再次從HBase中獲取,因此只會讀取WAL。假如修改了部分字段,HBaseIndexer就會提交相應的字段上去。例如
HBase中有name和age兩個字段

  1. put 'HBase_Indexer_Test','001','cf1:name','xiaoming'
  2. put 'HBase_Indexer_Test','002','cf1:name','xiaohua'

此時的數據為

然后執行

  1. put 'HBase_Indexer_Test','001','cf1:age','12'

最后只能看到

說明這種模式只從WAL獲取數據,並且將獲取的數據覆蓋到了Solr里面。

那么這樣看來只能修改HBase indexer的代碼了

2.5 修改代碼

Lily HBase Indexer的代碼是托管在github 上的,如果是單獨安裝的請直接訪問NGDATA的這個工程:http://ngdata.github.io/hbase-indexer/
如果是使用的CDH版本,請訪問:https://github.com/cloudera/hbase-indexer

我這里使用CDH 5.7.0版本進行測試。在releases選項中可以找到對應版本號的包,下載解壓之后可以看到一個Maven工程。可以看到它包含如下模塊

./hbase-indexer-engine/src/main/java/com/ngdata/hbaseindexer/indexer/Indexer.java文件中有一個calculateIndexUpdates方法,其中有如下代碼:

  1. Result result = rowData.toResult();
  2. if(conf.getRowReadMode()==RowReadMode.DYNAMIC){
  3. if(!mapper.containsRequiredData(result)){
  4. result = readRow(rowData);
  5. }
  6. }
  7. boolean rowDeleted = result.isEmpty();
  1. privateResult readRow(RowData rowData)throwsIOException{
  2. TimerContext timerContext = rowReadTimer.time();
  3. try{
  4. HTableInterface table = tablePool.getTable(rowData.getTable());
  5. try{
  6. Getget= mapper.getGet(rowData.getRow());
  7. return table.get(get);
  8. }finally{
  9. table.close();
  10. }
  11. }finally{
  12. timerContext.stop();
  13. }
  14. }

從代碼中可以看出其執行的流程圖如下:

假如我們使用默認的Dynamic模式寫入了大量的數據,那么意味着有部分數據會在WAL生成后一段時間內無法“落地”,那么就可能出現下面的情況:

  • HBase RegionServer 將Put操作先寫WAL (這個時候Put還沒保存到Region);
  • 異步處理的HBase Indexer獲取到這個WAL日志,對數據進行處理,進入了我們上面說的這段條件邏輯代碼,恰巧Result里面沒有一部分Solr索引列,那么需要調用readRow方法從HBase重新讀取數據,這個時候調用HTable.get(Get) 並沒有獲取到數據(Result.isEmpty()為真);
  • HBase RegionServer把Put保存到Region ;
  • 那么對於第二個步驟里面的HBase Indexer,那條記錄將被當成delelet操作,所以在后面的邏輯將其當成solr delete document的操作。所以在Solr中才會出現部分數據丟失和數值不對。

知道了問題在哪里之后,我們嘗試修改他的源碼。由於HBase將預寫日志的內容寫到HBase region中會有一定的滯后性,因此我們可以認為預寫日志中的內容總是最新的數據。假設我們有一條rowkey =001的數據如下:

列名
Rowkey 001
cf1:A a
cf1:B b
cf1:C c

我們將C的值改成D。由於夾雜在很多條數據中,可能日志中拿到了C = 'd',但是HBase中仍舊是'c',我們需要將HBase的數據拿出來,再將預寫日志中的數據覆蓋它,便有了下面的代碼

  1. privateResult readRow(RowData rowData)throwsIOException{
  2. TimerContext timerContext = rowReadTimer.time();
  3. try{
  4. HTableInterface table = tablePool.getTable(rowData.getTable());
  5. try{
  6. Get get = mapper.getGet(rowData.getRow());
  7. return merge(table.get(get), rowData.toResult());
  8. //return table.get(get);
  9. }finally{
  10. table.close();
  11. }
  12. }finally{
  13. timerContext.stop();
  14. }
  15. }
  16. privateResult merge(Result data,Result wal)throwsIOException{
  17. //如果data為空,則直接返回WAL的數據
  18. if(data.isEmpty()){
  19. return wal;
  20. }
  21. /* //如果rowkey不相同,則返回wal的數據
  22. if (!Bytes.toString(data.getRow()).equals(Bytes.toString(wal.getRow()))) {
  23. return wal;
  24. }*/
  25. TreeMap<String,Cell> cellMap =newTreeMap<String,Cell>();
  26. CellScanner dataScanner = data.cellScanner();
  27. CellScanner walScanner = wal.cellScanner();
  28. while(dataScanner.advance()){
  29. Cell cell = dataScanner.current();
  30. String cf =Bytes.toString(CellUtil.cloneFamily(cell));
  31. String cq =Bytes.toString(CellUtil.cloneQualifier(cell));
  32. String key = cf +"->"+ cq;
  33. cellMap.put(key, cell);
  34. }
  35. while(walScanner.advance()){
  36. Cell cell = walScanner.current();
  37. String cf =Bytes.toString(CellUtil.cloneFamily(cell));
  38. String cq =Bytes.toString(CellUtil.cloneQualifier(cell));
  39. String key = cf +"->"+ cq;
  40. cellMap.put(key, cell);
  41. }
  42. ArrayList<Cell> cells =newArrayList<Cell>();
  43. cells.addAll(cellMap.values());
  44. returnResult.create(cells);
  45. }

值得一提的是,HBase返回的result中,列的排序是按照"列族名+列名"的字典排序。比如表中有["cf1:name","cf2:cellphone","cf1:age"] 三個列,那么返回的時候會排列成["cf1:age","cf1:name","cf2:cellphone"]。在創建新的Result對象的時候也必須遵循這樣的規則,因此這里使用了treemap。不要問我為什么,我特么調了一整天才發現這個問題。

2.6 重新打包分發

進入hbase-indexer-engine的工程,執行mvn clean install -DskipTests進行打包,稍等片刻便好了

在target下面有一個hbase-indexer-engine-1.5-cdh5.7.0.jar文件(這里的版本號對應自己的環境),將這個jar文件分發到集群的hbase-indexer的目錄下,CDH版本放在在/opt/cloudera/parcels/CDH/jars/下即可。

然后重啟服務進行測試。

三、結果

數據跑了一天,Solr中對應的條數和HBase的一樣。
因此我們修改的代碼是有效的。

四、思考

上面我們是合並了數據然后全部覆蓋到Solr的,如果HBase存在大量的Update操作,那么勢必每次列數都會和映射到Solr里面的列不一致,因此每次都會從HBase中get一次數據,這樣肯定會影響性能。那么我們能否使用ReadRow.Never模式 + Solr的原子更新的方式來實現呢?


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM