基於Solr的多表join查詢加速方法


前言

DT時代對平台或商家來說最有價值的就是數據了,在大數據時代數據呈現出數據量大,數據的維度多的特點,用戶會使用多維度隨意組合條件快速召回數據。數據處理業務場景需要實時性,需要能夠快速精准的獲得到需要的數據。之前的通過數據庫的方式來處理數據的方式,由於數據庫的某些固有特性已經很難滿足大數據時代對數據處理的需求。

 

所以,在大數據時代使用hadoop,hive,spark,作為處理離線大數據的補充手段已經大行其道。 以上提到的這些數據處理手段,只能離線數據處理方式,無法實現實時性。Solr作為補充,能夠很好地解決大數據的多維度查詢和數據召回實時性要求。

 

本文通過分析阿里淘寶聚石塔環境中遇到的一個具體需求是如何實現的,通過這個例子,拋磚引玉來體現SORL在數據處理上的優勢。

 

需求說明

阿里聚石塔是銜接淘寶大賣家,軟件開發者和平台提供者這三者的生態圈,阿里通過聚石塔平台,將阿里雲底層的PAAS,IAAS環境提供給第三方開發者,而第三方開發者可以通過自己開發的軟件產品。

 

賣家的交易數據是最有價值的數據,通過交易數據可以衍生出很多產品,例如管理交易的ERP軟件,會員營銷工具CRM,在聚石塔環境中通過大賣家授權,這部分數據可以授權給獨立軟件開發者ISV。

在CRM系統中需要能夠通過設置買家的行為屬性快速過濾出有價值的買家記錄,進行精准會員營銷。 以下是兩個具體需求,首先看兩個線框圖:


以上是賣家需要實時篩選一段時間內購買數量在一個區間之內的買家。

 

再看一個線框圖:

賣家需要實時搜索一個時間段內,消費金額在某個區間之內的買家會員。這里的區間是以天為單位的,時間跨度可長可短。

 

了解了線框圖之后,我們還要再看看對應的數據庫ER圖:

 

表結構相當簡單,只有兩張表,稍微有點經驗的開發工程師就會寫出以下SQL:

Sql代碼   收藏代碼
  1. select  buyer.buyer_id , count(trade.trade_id) as pay_count  
  2. From buyer   
  3. inner join trade on(  
  4. buyer.buyer_id = trade.buyer_id and buyer.seller_id = trade.seller_id)  
  5. where trade.trade_time> ? and trade.trade_time < ? and buyer.seller_id=?  
  6. group by buyer.buyer_id  
  7. having pay_count > =1 AND pay_count <=5  

第二個線框圖會用以下SQL語句來實現:

Sql代碼   收藏代碼
  1. select  buyer.buyer_id , sum(trade.fee) as pay_sum  
  2. From buyer   
  3. inner join trade on(  
  4. buyer.buyer_id = trade.buyer_id and buyer.seller_id = trade.seller_id)  
  5. where trade.trade_time> ? AND trade.trade_time < ? and buyer.seller_id=?  
  6. group by buyer.buyer_id  
  7. having pay_sum > =20 and pay_sum <=100  

以上,兩個SQL語句大同小異,having部分稍有不同, SQL語句並不算復雜,但是在大數據情況下,無法在毫秒級反饋給用戶。另外,假如where部分有其他查詢條件,比如,買家的性別,買家所屬的地區等,就需要數據庫上設置更多的聯合索引,所以這個需求使用SQL語句根本無法實現的。

 

查詢加速

問題已經明確,那么解決的辦法是什么呢?是使用數據的存儲過程?存儲過程底層還是依賴數據庫表的固有特性,無非是提供一些以時間換空間的策略來實現罷了,換湯不換葯,而且各個數據庫產品的存儲過程實現很很大差別,一旦選擇了某一個數據的存儲過程之后以后再要遷移數據到其他數據平台上就非常困難了。

 

這里要向大家隆重介紹搜索引擎Solr。因為,搜索引擎在底層使用倒排索引,這和數據庫有本質區別,倒排索引在數據查詢的性能上天生就比數據的Btree樹好上百倍,具體原因不在這里展開了。雖然某些數據庫也支持了倒排索引例如PG,但畢竟不是通用的解決辦法。一旦添加了這類型的索引會影響數據的寫入吞吐量,因為重建索引非常耗時間。

 

開源JAVA社區中使用最廣泛的應該屬Solr了,筆者所在的團隊就是長期研究將Solr應用到企業級應用場景中,在原生Solr之上做了很多優化和適配,方便企業級用戶使用。

言歸正傳,先講講大致思路,實現的架構圖如下:


 
 

全量數據准備

這里要說明的一點,發送到搜索引擎中的數據是一條寬表數據,所謂寬表數據是將ER關系為1對N的實體,聚合成一條記錄。聚合方式有兩種,一種是向1的維度聚合,比如用戶實體和消費記錄實體,寬表記錄如果是以用戶維度來聚合和話,就會將所有的消費記錄以某個特殊字符作為分割符,聚合成一個字段,作為用戶記錄的一個冗余字段。也可以以消費記錄為維度聚合,將關聯的用戶信息作為一個冗余字段,可想而知這樣的聚合方式用戶數據在索引數據中會有很多重復。

 

打寬表這個環節看似和搜索不怎么相關,但是合理的寬表數據結構能大幅度地提高用戶數據查詢效率。

 

全量流程用Hive來實現的,如果是在阿里雲公有雲環境中可以用ODPS,因ODPS是PAAS服務。

 

增量通道,需要寫一個打寬表操作。因為搜索引擎特有的結構,增量同步更新持續一段時間之后會生成很多索引碎片,所以必須要隔一段時間從數據源重新導出並構建一次全量索引數據。

這里介紹一下上面提交到用戶-消費記錄的寬表結構(簡單起見,去掉了表中和問題域不相關的字段):

 

Buyer表:

買家id 賣家id
Buyer_id Seller_id

 

Trade表:

買家id 賣家id 交易id 交易時間 單筆費用
Buyer_id Seller_id trade_id trade_time Fee

 

聚合寬表結構:

買家id 賣家id dynamic_info(聚合字段)
Buyer_id Seller_id sellerId_date_buyerId_payment_payCount[;sellerId_date_buyerId_payment_payCount]

 

這里需要對dynamic_info 聚合字段詳細說明一下:

sellerId_date_buyerId_payment_payCount 這是一個聚合單元,從左向右依次的含義是:賣家ID,購買的日期(精確到天),買家ID,購買天之內的費用總和,購買天之內的購買次數總和。

 

Dynamic_info字段可以有多個聚合單元組成,每個單元中的date是按天去重的,假如一個用戶在某一天在一家店中有多條購買記錄最終也會聚合成一個單元。給一個聚合字段的實際示例:

 

Sql代碼   收藏代碼
  1. Dynamic_info:9999_20151111_222_345.6_3;9999_20151212_222_627.5_1  

 

這個字段的意義就是,一個id為222的用戶在2015年雙11當天購買了3筆價值345元的商品,在雙12當天在這個商家處又購買了一筆價值627.5元的商品。

 

之所以在Solr上進行快速數據查詢的原因是,Solr的數據源是一個已經聚合好的一份數據,數據庫上執行的join操作會耗費大量IO,在Solr查詢省去了這部分時間。

 

寬表數據從多個分表聚合,數據的語義沒有變化,只是組織形式發生了變化,如果一個SAAS的服務提供上同時為十幾萬個大賣家提供篩選服務,而每個大賣家又積累的交易數據是非常大的,全部加在一起,要將數據進行聚合化操作,有非常大的CPU和IO開銷,好在在雲服務時代有強大的離線計算工具如hadoop,ODPS可以將大數據如同肉面粉一般揉(處理)成任何你想要的結構,分分鍾不在話下。

 

Solr引擎端數據處理

准備好全量源數據,之后就是將其轉化為Lucene的索引文件了,這個過程請查閱Solr Wiki便可,這里不進行闡述。這里要重點描述的是Solr服務端如何響應用戶的查詢請求,返回給用戶需要的查詢結果。

 

處理用戶在時間段內購買量或購買額度進行過濾,需要構建一個QParser的插件,這個插件的作用是遍歷和查參數中匹配的條件項生成命中的DocSet命中結果集。

 

Qparser代碼實現

 

下面是QparserPlugin.java節選:

 

Java代碼   收藏代碼
  1. for (LeafReaderContext leaf : readerContext.leaves()) {  
  2.                 docBase = leaf.docBase;  
  3.                 reader = leaf.reader();  
  4.                 liveDocs = reader.getLiveDocs();  
  5.                 terms = reader.terms("dynamic_info");  
  6.                 termEnum = terms.iterator();  
  7.                 String prefixStart = sellerId + "_" + startTime;  
  8.                 String prefixEnd = sellerId + "_" + endTime;  
  9.                 String termStr = null;  
  10.                 int docid = -1;  
  11.                 if ((termEnum.seekCeil(new BytesRef(prefixStart))) != SeekStatus.END) {  
  12.   
  13.                     do {  
  14.                         Matcher matcher = DYNAMIC_INFO  
  15.                                 .matcher(termStr = termEnum.term()  
  16.                                         .utf8ToString());  
  17.   
  18.                         if (!matcher.matches()) {  
  19.                             continue;  
  20.                         }  
  21.   
  22.                         posting = termEnum.postings(posting);  
  23.                         docid = posting.nextDoc();  
  24.   
  25. if (!(docid != PostingsEnum.NO_MORE_DOCS  
  26.     && (liveDocs == null || (liveDocs != null && liveDocs.get(docid))))) {  
  27.         continue;  
  28. }  
  29.   
  30.                         if ((matcher.group(1) + "_" + matcher.group(2))  
  31.                                 .compareTo(prefixEnd) > 0) {  
  32.                             break;  
  33.                         }  
  34.   
  35.                         addStatis(buyerStatis, docBase, docid, matcher);  
  36.   
  37.                     } while (termEnum.next() != null);  
  38.                 }  
  39.   
  40.             }  

 

以上代碼的執行邏輯是,截取prefixStart和prefixEnd之間的term序列,進行分析如果符合過濾條件就將對應docid插入buyerStatis收集器中。

 

等第一輪數據處理過程中就在對聚合結果進行增量累加,代碼如下:

Java代碼   收藏代碼
  1. private static StaticReduce addStatis(  
  2.             Map<Integer, StaticReduce> buyerStatis, int docBase, int docid,  
  3.             Matcher matcher) {  
  4.         StaticReduce statis = buyerStatis.get(docBase + docid);  
  5.         if (statis == null) {  
  6.             statis = new StaticReduce(docBase + docid, Long.parseLong(matcher  
  7.                     .group(3))/* buyerid */);  
  8.             buyerStatis.put(docBase + docid, statis);  
  9.         }  
  10.   
  11.         if (statis.buyerId != Long.parseLong(matcher.group(3))) {  
  12.             return statis;  
  13.         }  
  14.   
  15.         try {  
  16.             statis.addPayCount(Integer.parseInt(matcher.group(5)));  
  17.         } catch (Exception e) {  
  18.   
  19.         }  
  20.         try {  
  21.             statis.addPayment(Float.parseFloat(matcher.group(4)));  
  22.         } catch (Exception e) {  
  23.   
  24.         }  
  25.         return statis;  
  26.     }  

同時對購買數量,和購買金額進行累加。

 

最后對累加結果進行過濾,符合過濾條件的,將docid插入到bitset中:

Java代碼   收藏代碼
  1. for (StaticReduce statis : buyerStatis.values()) {  
  2.                 // TODO 這里自己判斷是否要收集這條記錄  
  3.         if (statis.payCount > Integer.MAX_VALUE  
  4.                 || statis.paymentSum > 1) {  
  5.             System.out.println("count:" + statis.payCount + ",sum:"  
  6.                         + statis.paymentSum);  
  7.                     bitSet.set(statis.luceneDocId);  
  8.         }  
  9.   
  10. }  
  11.                         BitDocIdSet docIdSet = new BitDocIdSet(bitSet);  
  12.             return new QParser(qstr, localParams, params, req) {  
  13.                 @Override  
  14.                 public Query parse() throws SyntaxError {  
  15.   
  16.                     return bitquery;  
  17.                 }  
  18.             };  

最后將bitSet包裝成BitQuery作為Qparser的parse函數的返回值,返回有solr進一步和其他結果集進行過濾。

 

Solrconfig配置實現

 

需要將以上的QparserPlugin插件注入到solr中,需要在solrconfig中寫以下配置:

Xml代碼   收藏代碼
  1. <queryParser name="timesegstats" class="com.xxx.qp.TimeSegStatsQParserPlugin" >                                    
  2.     <str name="buyerField">buyer_id</str>                                                                                                                
  3.     <str name="compoundField">dynamic_info </str>                                                                            
  4.     <str name="countField">emailSendCount</str>                                                                                                
  5.     <str name="statsFields"></str>                                                                                                                    
  6. </queryParser>   

 

 

Solr查詢語句Q參數設置

 

Sql代碼   收藏代碼
  1. q={!multiqp q.op=AND}seller_id:1441097932588   
  2. AND {!timesegstats sellerId=1441097932588 statsField=buyActivity startTime=20150901 endTime=20150924 startValue=1 endValue=200}  
  3. AND {!timesegstats sellerId=1441097932588 statsField=paycount startTime=20140901 endTime=20150924 startValue=2 endValue=100}    

 

總結

以上是一個用Solr搜索引擎解決數據庫查詢瓶頸的實例,其實搜索引擎的使用場景非常廣泛,不僅可以用在像百度這樣的大規模非結構化的數據查詢,可以定制比較復雜的排序規則。Solr更可以解決像本文講到的數據庫加速的場景,使得原本在數據庫上沒有無法實現的SQL查詢,可以通過Solr搜索引擎上輕松實現。

 

本文講到的需求,也可以使用像hive這樣的離線處理工具來實現,每次處理完成后將結果再導入到mysql中,業務端通過讀取數據庫表中的數據來向用戶展示處理結果。這樣做雖然可行,但是,沒有辦法將處理結果的實時性沒有辦法保證,而且,離線處理結果的數據結構是固化的,沒有辦法做到將處理結果靈活調整。而用Solr做到數據的查詢出口,可以很好地解決以上兩個問題。

 ==================================================================================================================================

有兄弟在留言中提了幾個問題,比較專業,我在這里詳細地回答一下:

1你們用的solr是cloud模式嗎?
是的

2數據量可以有多大?
理論上能找到一個數據分組鍵的話,那可以在一個collection上分出多個share,為了以后擴容方便,分組數目最好是按照2的mie次來分。我之前的公司里有一個100億+數據量的,
現在做的一個索引,分了四組, 每組兩千萬數據,用的是兩核四G的機器,100Gssd磁盤

3 全量的時候,索引會被清空嗎?
 是的全部清空,因為索引的更新或更新操作會對索引文件做標記刪除,這樣歷史文件會越來越大,而且更新過程中會產生子segment,雖然,solr會啟動一個后台線程不斷地去mege子索引,但子索引合並過程中需要大量io和cpu開銷,會影響在線查詢的RT時間,所以必須定期做一次全量構建。
 為了不影響在線索引查詢的性能,構建全量不是在core節點上完成的,是有專門有一個索引build機器,待索引構建完成之后,再將索引文件拷貝會solr core節點上,做一次reload操作,瞬間替換。

4 如果清空,某一時刻用戶搜索啥也搜不到,怎么解決的?
 因為全量構建是需要一定時間,正如你說的,如果這個構建周期比較長的話,當新的全量生效之后,在這個索引構建周期之內的增量數據會丟失掉。為了做到全量構建過程中的增量索引數據不丟失,做到無縫切換,需要對solr的tlog機制作一下改造,適當增長tlog在磁盤中的保存時間,等到reload之后,就以數據庫開始dump的時間作為啟始時間,重新replay一下tlog日志,就能做到不丟數據了


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM