原文:http://zhuanlan.zhihu.com/dataman/20504669
數據挖掘系列篇(10):大眾點評的實時計算
實時計算是目前在推薦、搜索廣告等場景中經常需要應用的地方,它不像離線計算那樣可以有長時間來准備數據,做數據處理。在實際的應用場景,要考慮到用戶的感受。比如我在城西銀泰搜索附近的商家,這個就需要實時計算距離來支持。還有像雙十一這樣的推薦場景,營銷活動時間也有一天,必須做到實時計算來查看到活動的效果,並及時來調整營銷方案。阿里、百度在實時計算平台做了很多應用。今天看到一篇王新春介紹的大眾點評的實時計算storm的應用,所以來分享下。
實時計算在點評的使用場景
類別一:Dashboard、實時DAU、新激活用戶數、實時交易額等
♦ Dashboard類:北斗(報表平台)、微信(公眾號)和雲圖(流量分析)等
♦ 實時DAU:包括主APP(Android/iPhone/iPad)、團APP、周邊快查、PC、M站
♦ 新激活用戶數:主APP
♦ 實時交易額:閃惠/團購交易額
以報表平台為例,下圖是一張APP UV的實時曲線圖,它以分鍾級別粒度展現了 實時的DAU數據和曲線。

從圖中可以看見一個尖點,這個尖點就是當天push過后帶來的用戶,這樣可以看到實時的運營效率。
類別二:搜索、推薦、安全等
以搜索為例:用戶在點評的每一步有價值的操作(包括:搜索、點擊、瀏覽、購買、收藏等),都將實時、智能地影響搜索結果排序,從而顯著提升用戶搜索體驗、搜索轉化率。
某用戶 搜索“ 火鍋 ”,當他 在搜索結果頁 點擊了“ 重慶高老九火鍋 ”后, 再次刷新搜索結果列表時,該商戶的排序就會提升到頂部 。
再結合其他的一些實時反饋的個性化推薦策略,最終使團購的交易額有了明顯的增加,轉化率提升了2個多點。

插圖2
實時計算在業界的使用場景
場景1:阿里JStorm
♦ 雙11實時交易數據
場景2:360 Storm
♦ 搶票軟件驗證碼自動識別:大家用360瀏覽器在12306上買票的時候,驗證碼自動識別是在Storm上計算完成的。
♦ 網盤圖片縮略圖生成:360網盤的縮略圖也是實時生成出來的,這樣可以節約大量的文件數量和存儲空間。
♦ 實時入侵檢測
♦ 搜索熱詞推薦
場景3:騰訊TDProcess
分布式K/V存儲引擎TDEngine和支持數據流計算的TDProcess,TDProcess是基於Storm的計算引擎,提供了通用的計算模型,如Sum、Count、PV/UV計算和TopK統計等。
場景4:京東Samza
整個業務主要應用訂單處理,實時分析統計出待定區域中訂單各個狀態的量:待定位、待派工、待揀貨、待發貨、待配送、待妥投等。
點評如何構建實時計算平台
點評的實時計算平台是一個端到端的方案,從下面的平台 架構圖,可以看出整體架構是一個比較長的過程,包括了數據源、數據的傳輸通道、計算、存儲和對外服務等。
插圖3
實時計算平台首先解決的問題是,數據怎么獲取,如何拿到那些數據。
我們現在做到了幾乎所有點評線上產生的數據都可以毫秒級拿到,封裝對應的數據輸入源Spout。 通過Blackhole支持日志類實時獲取,包括打點日志、業務Log、Nginx日志等。 整合Puma Client第一時間獲取數據庫數據變更。 整合Swallow獲取應用消息。
Blackhole是我們團隊開發的類Kafka系統,主要目標是批量從業務方拉取日志時做到數據的完整性和一致性,然后也提供了實時的消費能力。 Puma是以MySQL binlog為基礎開發的,這樣可以實時拿到數據庫的update、delete、insert操作。 Swallow是點評的MQ系統 。
通過整合各種傳輸通道,並且封裝相應的Spout,做業務開發的同學就完全不用關心數據怎樣可靠獲取,只需要寫自己的業務邏輯就可以了。
解決了數據和傳輸問題后,計算過程則在Storm中完成。 如果在Storm計算過程中或計算出結果后,需要與外部存儲系統交互,我們也提供了一個data-service服務 ,通過點評的RPC框架提供接口, 用戶不用關心實際Redis/HBase這些系統的細節和部署情況, 以及這個數據到底是在Redis還是HBase中的,我們可以根據SLA來做自動切換; 同時計算的結果也是通過data-service服務,再反饋到線上系統。
就拿剛剛搜索結果的例子,搜索業務在用戶再次搜索的時候會根據userId請求一次data-service,然后拿到這個用戶的最近瀏覽記錄,並重新排序結果,返回給用戶。 這樣的好處就是實時計算業務和線上其他業務完全解耦,實時計算這邊出現問題,不會導致線上業務出現問題。
Storm基礎知識簡單介紹
Apache Storm( Apache Storm )是由Twitter開源的分布式實時計算系統。Storm可以非常容易、可靠地處理無限的數據流。對比Hadoop的批處理,Storm是個實時的、分布式以及具備高容錯的計算系統。Storm可以使用何編程語言進行開發。
Storm的集群表面上看和Hadoop的集群非常像,但是在Hadoop上面運行的是MapReduce的Job,而在Storm上面運行的是Topology。
Storm和Hadoop一個非常關鍵的區別是Hadoop的MapReduce Job最終會結束, 而Storm的Topology會一直運行(除非顯式地殺掉)。
Storm基本概念:
插圖4
Nimbus和Supervisor之間的通訊是依靠ZooKeeper來完成,並且Nimbus進程和Supervisor都是快速失敗(fail-fast)和無狀態的。可以用kill -9來殺死Nimbus和Supervisor進程,然后再重啟它們,它們可以繼續工作。
在Storm中,Spout 是Topology中產生源數據流的組件。通常Spout獲取從Kafka、MQ等的數據,然后調用nextTuple函數,發射數據出去供Bolt消費。
插圖5
圖中的Spout就發射出去了兩條數據流。
而Bolt是在Topology中接受Spout的數據,然后執行處理的組件。Bolt在接收到消息后會調用execute函數,用戶可以在其中執行自己想要的操作。
插圖6
為什么用Storm呢,因為Storm有它的優點:
易用性
只要遵守Topology,Spout, Bolt的編程規范即可開發出一個擴展性極好的應用,像底層RPC,Worker之間冗余,數據分流之類的操作,開發者完全不用考慮。
擴展性
當某一級處理單元速度不夠時,直接配置一下並發數,即可線性擴展性能。
健壯性
當Worker失效或機器出現故障時, 自動分配新的Worker替換失效Worker。
准確性
采用Acker機制,保證數據不丟失。采用事務機制,保證數據准確性。
剛剛介紹了一些Storm的基礎概念和特性,再用一張比較完整的圖來回顧一下整個Storm的體系架構:
插圖7
Storm提交一個作業的時候,是通過Thrift的Client執行相應的命令來完成。
Nimbus針對該Topology建立本地的目錄,Nimbus中的調度器根據Topology的配置計算Task,並把Task分配到不同的Worker上,調度的結果寫入Zookeeper中。
Zookeeper上建立assignments節點,存儲Task和Supervisor中Worker的對應關系。
在Zookeeper上創建workerbeats節點來監控Worker的心跳。
Supervisor去Zookeeper上獲取分配的Tasks信息,啟動一個或者多個Worker來執行。
每個Worker上運行多個Task,Task由Executor來具體執行。Worker根據Topology信息初始化建立Task之間的連接,相同Worker內的Task通過DisrupterQueue來通信,不同Worker間默認采用Netty來通信,然后整個Topology就運行起來了。
如何保證業務運行可靠性
首先Storm自身有很多容錯機制,我們也加了很多監控信息,方便業務同學監控自己的業務狀態。 在Storm上,遇到的一個很基本的問題就是,各個業務是運行的Worker會跑在同一台物理機上。曾經有位同學就在自己的Worker中起了200多個線程來處理json,結果就是這台機器的CPU都被他的Worker吃光了,其他的業務也跟着倒霉。
因此我們也使用CGroup做了每個Worker的資源隔離, 主要限制了CPU和Memory的使用。相對而言JStorm在很多方面要完善一些,JStorm自己就帶資源隔離。 對應監控來說,基本的主機維度的監控在ganglia上可以看見,比如現在集群的運行狀況。
下圖是現在此時的集群的網絡和負載:

插圖8
這些信息並不能保證業務就OK,因此我們將Storm上的很多監控信息和點評的開源監控系統Cat集成在了一起,從Cat上可以看見更多的業務運行狀態信息。

比如在Cat中我可以看見整個集群的TPS,現在已經從30多萬降下來了。 然后我可以設置若干的報警規則, 如:連續N分鍾降低了50%可以報警。 然后也監控了各個業務Topology的TPS、Spout輸入、Storm的可用Slot等的變化。
這個圖就是某個業務的TPS信息, 如果TPS同比或者環比出現問題,也可以報警給業務方。
Storm使用經驗分享
1.使用組件的並行度代替線程池
Storm自身是一個分布式、多線程的框架,對每個Spout和Bolt,我們都可以設置其並發度;它也支持通過rebalance命令來動態調整並發度,把負載分攤到多個Worker上。
如果自己在組件內部采用線程池做一些計算密集型的任務,比如JSON解析,有可能使得某些組件的資源消耗特別高,其他組件又很低,導致Worker之間資源消耗不均衡,這種情況在組件並行度比較低的時候更明顯。
比如某個Bolt設置了1個並行度,但在Bolt中又啟動了線程池,這樣導致的一種后果就是,集群中分配了這個Bolt的Worker進程可能會把機器的資源都給消耗光了,影響到其他Topology在這台機器上的任務的運行。如果真有計算密集型的任務,我們可以把組件的並發度設大,Worker的數量也相應提高,讓計算分配到多個節點上。
為了避免某個Topology的某些組件把整個機器的資源都消耗光的情況,除了不在組件內部啟動線程池來做計算以外,也可以通過CGroup控制每個Worker的資源使用量。
2.不要用DRPC批量處理大數據
RPC提供了應用程序和Storm Topology之間交互的接口,可供其他應用直接調用,使用Storm的並發性來處理數據,然后將結果返回給調用的客戶端。這種方式在數據量不大的情況下,通常不會有問題,而當需要處理批量大數據的時候,問題就比較明顯了。
(1)處理數據的Topology在超時之前可能無法返回計算的結果。
(2)批量處理數據,可能使得集群的負載短暫偏高,處理完畢后,又降低回來,負載均衡性差。
批量處理大數據不是Storm設計的初衷,Storm考慮的 是時效性和批量之間的均衡,更多地看中前者。需要准實時地處理大數據量,可以考慮Spark Stream等批量框架。
3.不要在Spout中處理耗時的操作
Spout中nextTuple方法會發射數據流,在啟用Ack的情況下,fail方法和ack方法會被觸發。
需要明確一點,在Storm中Spout是單線程(JStorm的Spout分了3個線程,分別執行nextTuple方法、fail方法和ack方法)。如果nextTuple方法非常耗時,某個消息被成功執行完畢后,Acker會給Spout發送消息,Spout若無法及時消費,可能造成ACK消息超時后被丟棄,然后Spout反而認為這個消息執行失敗了,造成邏輯錯誤。反之若fail方法或者ack方法的操作耗時較多,則會影響Spout發射數據的量,造成Topology吞吐量降低。
4.注意fieldsGrouping的數據均衡性
fieldsGrouping是根據一個或者多個Field對數據進行分組,不同的目標Task收到不同的數據,而同一個Task收到的數據會相同。
假設某個Bolt根據用戶ID對數據進行fieldsGrouping,如果某一些用戶的數據特別多,而另外一些用戶的數據又比較少,那么就可能使得下一級處理Bolt收到的數據不均衡,整個處理的性能就會受制於某些數據量大的節點。可以加入更多的分組條件或者更換分組策略,使得數據具有均衡性。
5.優先使用localOrShuffleGrouping
localOrShuffleGrouping是指如果目標Bolt中的一個或者多個Task和當前產生數據的Task在同一個Worker進程里面,那么就走內部的線程間通信,將Tuple直接發給在當前Worker進程的目的Task。否則,同shuffleGrouping。
localOrShuffleGrouping的數據傳輸性能優於shuffleGrouping,因為在Worker內部傳輸,只需要通過Disruptor隊列就可以完成,沒有網絡開銷和序列化開銷。因此在數據處理的復雜度不高,而網絡開銷和序列化開銷占主要地位的情況下,可以優先使用localOrShuffleGrouping來代替shuffleGrouping。
6.設置合理的MaxSpoutPending值
在啟用Ack的情況下,Spout中有個RotatingMap用來保存Spout已經發送出去,但還沒有等到Ack結果的消息。RotatingMap的最大個數是有限制的,為p*num-tasks。其中p是topology.max.spout.pending值,也就是MaxSpoutPending(也可以由TopologyBuilder在setSpout通過setMaxSpoutPending方法來設定),num-tasks是Spout的Task數。如果不設置MaxSpoutPending的大小或者設置得太大,可能消耗掉過多的內存導致內存溢出,設置太小則會影響Spout發射Tuple的速度。
7.設置合理的Worker數
Worker數越多,性能越好?先看一張Worker數量和吞吐量對比的曲線(來源於JStorm文檔: jstorm/docs at master · alibaba/jstorm · GitHub 0.9.4.1jstorm性能測試.docx)。

從圖可以看出,在12個Worker的情況下,吞吐量最大,整體性能最優。這是由於一方面,每新增加一個Worker進程,都會將一些原本線程間的內存通信變為進程間的網絡通信,這些進程間的網絡通信還需要進行序列化與反序列化操作,這些降低了吞吐率。
另一方面,每新增加一個Worker進程,都會額外地增加多個線程(Netty發送和接收線程、心跳線程、SystemBolt線程以及其他系統組件對應的線程等),這些線程切換消耗了不少CPU,sys 系統CPU消耗占比增加,在CPU總使用率受限的情況下,降低了業務線程的使用效率。
8.平衡吞吐量和時效性
Storm的數據傳輸默認使用Netty。在數據傳輸性能方面,有如下的參數可以調整:
storm.messaging.netty.server_worker_threads和storm.messaging.netty.client_worker_threads分別為接收消息線程和發送消息線程的數量。
netty.transfer.batch.size是指每次 Netty Client向 Netty Server發送的數據的大小,如果需要發送的Tuple消息大於netty.transfer.batch.size,則Tuple消息會按照netty.transfer.batch.size進行切分,然后多次發送。
storm.messaging.netty.buffer_size為每次批量發送的Tuple序列化之后的TaskMessage消息的大小。
storm.messaging.netty.flush.check.interval.ms表示當有TaskMessage需要發送的時候, Netty Client檢查可以發送數據的頻率。
降低storm.messaging.netty.flush.check.interval.ms的值,可以提高時效性。增加netty.transfer.batch.size和storm.messaging.netty.buffer_size的值,可以提升網絡傳輸的吐吞量,使得網絡的有效載荷提升(減少TCP包的數量,並且TCP包中的有效數據量增加),通常時效性就會降低一些。因此需要根據自身的業務情況,合理在吞吐量和時效性直接的平衡。
除了這些參數,我們怎么找到Storm中性能的瓶頸,可以通過如下的一些途徑來進行:

在Storm的UI中,對每個Topology都提供了相應的統計信息,其中有3個參數對性能來說參考意義比較明顯,包括Execute latency、Process latency和Capacity。
分別看一下這3個參數的含義和作用。
(1)Execute latency:消息的平均處理時間,單位為毫秒。
(2)Process latency:消息從收到到被ack掉所花的時間,單位為毫秒。如果沒有啟用Acker機制,那么Process latency的值為0。
(3)Capacity:計算公式為Capacity = Bolt或者Executor調用execute方法處理的消息數量 * 消息平均執行時間 / 時間區間。這個值越接近1,說明Bolt或者Executor基本一直在調用execute方法,因此並行度不夠,需要擴展這個組件的Executor數量。
為了在Storm中達到高性能,我們在設計和開發Topology的時候,需要注意以下原則:
(1)模塊和模塊之間解耦,模塊之間的層次清晰,每個模塊可以獨立擴展,並且符合流水線的原則。
(2)無狀態設計,無鎖設計,水平擴展支持。
(3)為了達到高的吞吐量,延遲會加大;為了低延遲,吞吐量可能降低,需要在二者之間平衡。
(4)性能的瓶頸永遠在熱點,解決熱點問題。
(5)優化的前提是測量,而不是主觀臆測。收集相關數據,再動手,事半功倍。
關於計算框架的后續想法
目前Hadoop/Hive專注於離線分析業務,每天點評有1.6萬個離線分析任務。Storm專注於實時業務,實時每天會處理100億+條的數據。
在這兩個框架目前有很大的gap,一個是天級別,一個是秒級別,然后有大量的業務是准實時的,比如分鍾級別。因此我們會使用Spark來做中間的補充。
Spark Streaming + Spark SQL也能夠降低很大的開發難度。相對而言,目前Storm的學習和開發成本還是偏高。要做一個10萬+ TPS的業務在Storm上穩定運行,需要對Storm了解比較深入才能做到,不然會發現有這樣或者那樣的問題。
后面,我們計划在的大數據開發者平台上,統一實時計算/准實時計算和離線計算任務的管理和監控。
【Q&A】
【問題1】 Blackhole和Swallow專注點區別是什么?
Blackhole主要專注於日志類型的業務,就像Kafka一樣,日志類型的對可靠性和一致性要求不會那么高,但是需要支持非常大的QPS,比如幾十萬到幾百萬。
【問題2】日志格式是統一定義的吧?能分享一下日志格式嗎?
日志格式是統一的,我們提供了一個基於log4j的日志框架,里面定義好了KV的分隔符。業務把日志輸出到文件,然后通過Blackhole把日志文件讀取,然后在Spout中完成解析,在Blot中就是具體的日志的KV對了,業務就自己去使用。至於格式,很簡單,只要定義好每個KV對的分隔符,然后K和V的分隔符就可以了。
【問題3】S專注在業務上?考慮過事務么,會不會有重復處理造成數據異常的問題?
對於這個問題,首先我們在實際業務中還沒有使用事務。在沒有啟用事務的情況下,需要考慮業務的冪等的問題。如果業務可以冪等,那么重復數據不會有任何問題。因為像Kafka等系統,保證的是at leaset once,數據源就會有重復數據出現。然后啟用事務會對性能也有比較大的影響,這個就自己權衡了
【問題4】APP Client端的數據采集,是否有延遲的問題?
如果是打點數據有延遲,如果一直訪問,延遲很小,1s以內;如果只瀏覽幾次,那么的確可能延遲比較大。client端是以batch發上來,為了省流量。因此有些數據就通過從數據庫那邊拖來,比如用戶收藏了商戶,打點和數據庫都可以拿到,那么就從數據庫拿
【問題5】 系統中的MQ也是用kafka嗎?點評的量級,Kafka的集群數大概是多少?
MQ不是Kafka,是點評基於ActiveMQ修改的,然后消息持久化是在mongodb中。我們用了7台broker支撐了每天2T+的流量
【問題6】根據用戶行為排序,這個會不會影響搜索的性能,是如何解決的?
點評推薦系統就是根據用戶id去redis獲取實時信息,作為score的一個feature。 對搜索影響不大的。作為推薦第一個使用實時數據,效果提升很明顯的。
【問題7】實時計算這里是多大級別的服務器集群呢?
目前,只用1台Nimbus + 9台Supervisor支撐了了20多個業務,峰值的時候大概可以跑到40萬TPS。
【問題8】日志采用寫文件方式,是不是對磁盤io負載高?並發能達到多少?blackhole拉取 這個不能實時吧?
寫文件是寫Page Cache的,因此不會高,可以參考Kafka的文檔。blackhole拉取現在是監聽了文件的變更,因此毫秒內可以知道。
【問題9】 請問點評Storm集群中,共享spout的多個業務的topology划分粒度是怎樣的?
是這樣的,比如流量類型的,后面很多業務會用到流量數據,IP維度的統計,GUID維度的統計,PV統計等,這類會在一個Topology中,因為后續業務只需要使用這個Topology的輸出就可以了。而且流量數據很大,每個業務自己處理,那的確浪費很嚴重。因此這個是共享的,我們也保證他的可用性。其他業務目前我們沒有共享的情況
【問題10】你們的數據抽取會對業務系統有性能影響,而且你們可以做到毫秒級,你們如何降低或消除這些性能影響的?
目前所有的抽取都是旁路的,不是業務的主流程上,因此不會有多大影響。比如業務輸出日志,發送MQ消息等。
【問題11】在最開始的時候您說點評開發了自己的RPC框架。為什么點評要自己開發而不用現有的開源框架呢?
自己開發時候開源還很少。而且不成熟。
【問題12】對於某些數據的采集,是否有采樣策略,如APP Client端的數據采集,還是全量采集?
目前打點數據是全量的,PV MV等都是全量過來的,通過長鏈,小批量+壓縮過來。有一些特殊性的,量又不大的,會走實時發送的通路
【問題13】 除了上面講到的業務點,點評目前還在哪些業務線用到storm計算實時數據?
安全,反作弊,推薦,廣告等都用。
【問題14】各個業務的Spout數據接口是如何定義的。怎么與業務開發人員交互?
比如日志類型的Spout,業務需要知道訂閱那個數據源就可以,其他不管。輸出就是KV對,然后我們有個地方可以去查,這些日志格式是什么含義。
【問題15】 聽過一次騰訊的分享,他們對於storm的使用做了sql接口,點評在做這樣的嘗試么,有沒有可以分享的sql解析工具?
目前沒有使用SQL接口,可以參考esper。
【問題16】Storm使用的那個版本,對JVM做了那些優化,有沒有遇到當cpu90%以上時,出現worker宕掉,然而發生連鎖反應work全掛?
線上版本是0.9.3,0.9.3有幾個bug比較討厭,然后考慮升級0.9.4,同時修改netty server的接收代碼邏輯,在上游數據處理快,下游來不及處理的時候,並且不開ACK情況下目前會導致下游OOM。
cpu 90%沒有遇到worker down的情況,比如今天某個高峰worker就跑到了500%。
【路人補充】這個是0.9.3netty client的一個bug,在0.9.4給修正了,當worker在不同supervisor上遷移時,就可能出現這個問題。
【問題17】在Storm中有沒有應用Esper?
目前沒有。
【問題18】介紹里說實時計算用Storm,分鍾級別計算用Spark;是否一定要嚴格這么划分,有無其他評判標准?比如數據量等
目前沒有嚴格規定,主要是看你對實時性和可靠性的要求。Spark目前在7*24小時的次序運行我們覺得穩定性還差一點。然后Storm的實時性會高一些,Spark略差一些,但是Spark開發成本低,因此業務自己來選。
【問題19】Storm業務配置變更是怎么實現動態更新的?
這個目前配置項都是放在點評基於zk的lion上來完成的,因此可以反推。
【問題20】storm的計算結果存儲都采用的什么介質?
目前我們是用Redis為主,HBase和MySQL為輔,然后部分結果發到MQ。
好了,本次精彩的分享就到這里,后續會繼續上大眾點評的推薦算法和過濾機制。
祝大家周末愉快!
