流計算概述
什么是流數據:
數據有靜態數據和流數據。
靜態數據:
很多企業為了支持決策分析而構建的數據倉庫系統,其中存放的大量歷史數據就是靜態數據。技術人員可以利用數據挖掘和OLAP(On-Line Analytical Processing)分析工具從靜態數據中找到對企業有價值的信息。

圖:靜態數據的一般處理流程
流數據:
近年來,在Web應用、網絡監控、傳感監測等領域,興起了一種新的數據密集型應用——流數據,即數據以大量、快速、時變的流形式持續到達。
實例:PM2.5檢測、電子商務網站用戶點擊流
流數據具有如下特征:
數據快速持續到達,潛在大小也許是無窮無盡的。
數據來源眾多,格式復雜。
數據量大,但是不十分關注存儲,一旦經過處理,要么被丟棄,要么被歸檔存儲。
注重數據的整體價值,不過分關注個別數據。
數據順序顛倒,或者不完整,系統無法控制將要處理的新到達的數據元素的順序。
批量計算和實時計算:
對靜態數據和流數據的處理,對應着兩種截然不同的計算模式:批量計算和實時計算。
批量計算:充裕時間處理靜態數據,如Hadoop。
流計算:流數據不適合采用批量計算,因為流數據不適合用傳統的關系模型建模。流數據必須采用實時計算,響應時間為秒級。在大數據時代,數據格式復雜、來源眾多、數據量巨大,對實時計算提出了很大的挑戰。因此,針對流數據的實時計算——流計算,應運而生。

圖:數據的兩種處理模型
流計算的概念:
流計算:實時獲取來自不同數據源的海量數據,經過實時分析處理,獲得有價值的信息。

圖:流計算示意圖
流計算秉承一個基本理念,即數據的價值隨着時間的流逝而降低,如用戶點擊流。因此,當事件出現時就應該立即進行處理,而不是緩存起來進行批量處理。為了及時處理流數據,就需要一個低延遲、可擴展、高可靠的處理引擎.
對於一個流計算系統來說,它應達到如下需求:
高性能
海量式
實時性
分布式
易用性
可靠性
Streaming定義:
Streaming是基於開源Storm,是一個分布式、實時計算框架。
特點:
實時響應,低延時。
數據不存儲,先計算
連續查詢
事件驅動
傳統數據庫計算:數據先存儲,在查詢處理。
流計算與Hadoop:
Hadoop設計的初衷是面向大規模數據的批量處理。
MapReduce是專門面向靜態數據的批量處理的,內部各種實現機制都為批處理做了高度優化,不適合用於處理持續到達的動態數據。
可能會想到一種“變通”的方案來降低批處理的時間延遲——將基於MapReduce的批量處理轉為小批量處理,將輸入數據切成小的片段,每隔一個周期就啟動一次MapReduce作業。但這種方式也無法有效處理流數據。
切分成小片段,可以降低延遲,但是也增加了附加開銷,還要處理片段之間依賴關系。
需要改造MapReduce以支持流式處理。
結論:魚和熊掌不可兼得,Hadoop擅長批處理,不適合流計算。
Streaming在FusionInsight中的位置:

圖:Streaming在FusionInsight中的位置
Streaming是一個實時分布式的實時計算框架,在實時業務匯總有廣泛的應用。
流計算框架:
當前業界誕生了許多專門的流數據實時計算系統來滿足各自需求。
商業級:IBM InfoSphere Streams和IBM StreamBase。
開源流計算框架
Twitter Storm:免費、開源的分布式實時計算系統,可簡單、高效、可靠地處理大量的流數據。
Yahoo! S4(Simple Scalable Streaming System):開源流計算平台,是通用的、分布式的、可擴展的、分區容錯的、可插拔的流式系統。
公司為支持自身業務開發的流計算框架:
Facebook Puma
Dstream(百度)
銀河流數據處理平台(淘寶)
流計算的應用:
流計算是針對流數據的實時計算,可以應用在多種場景中。如百度、淘寶等大型網站中,每天都會產生大量流數據,包括用戶的搜索內容、用戶的瀏覽記錄等數據。采用流計算進行實時數據分析,可以了解每個時刻的流量變化情況,甚至可以分析用戶的實時瀏覽軌跡,從而進行實時個性化內容推薦。但是,並不是每個應用場景都需要用到流計算的。流計算適合於需要處理持續到達的流數據、對數據處理有較高實時性要求的場景。
主要應用於以下幾種場景:
1 .實時分析:如實時日志處理、交通流量分析等。
2. 實時統計:如網站的實時訪問統計、排序等。
3. 實時推薦:如實時的廣告定位、時間營銷等。
流計算處理流程
概述:
傳統的數據處理流程,需要先采集數據並存儲在關系數據庫等數據管理系統中,之后由用戶通過查詢操作和數據管理系統進行交互。
傳統的數據處理流程隱含了兩個前提:
存儲的數據是舊的。存儲的靜態數據是過去某一時刻的快照,這些數據在查詢時可能已不具備時效性了。
需要用戶主動發出查詢來獲取結果。
流計算的處理流程一般包含三個階段:數據實時采集、數據實時計算、實時查詢服務。

圖:流計算處理流程示意圖
數據實時采集:
數據實時采集階段通常采集多個數據源的海量數據,需要保證實時性、低延遲與穩定可靠。
以日志數據為例,由於分布式集群的廣泛應用,數據分散存儲在不同的機器上,因此需要實時匯總來自不同機器上的日志數據。
目前有許多互聯網公司發布的開源分布式日志采集系統均可滿足每秒數百MB的數據采集和傳輸需求,如:
Facebook的Scribe
LinkedIn的Kafka
淘寶的Time Tunnel
基於Hadoop的Chukwa和Flume
數據采集系統的基本架構一般有以下三個部分:

圖:數據采集系統基本框架
Agent:主動采集數據,並把數據推送到Collector部分。
Collector:接收多個Agent的數據,並實現有序、可靠、高性能的轉發。
Store:存儲Collector轉發過來的數據(對於流計算不存儲數據)。
數據實時計算:
數據實時計算階段對采集的數據進行實時的分析和計算,並反饋實時結果。
經流處理系統處理后的數據,可視情況進行存儲,以便之后再進行分析計算。在時效性要求較高的場景中,處理之后的數據也可以直接丟棄。

圖:數據實時計算流程
實時查詢服務:
實時查詢服務:經由流計算框架得出的結果可供用戶進行實時查詢、展示或儲存。
傳統的數據處理流程,用戶需要主動發出查詢才能獲得想要的結果。而在流處理流程中,實時查詢服務可以不斷更新結果,並將用戶所需的結果實時推送給用戶。
雖然通過對傳統的數據處理系統進行定時查詢,也可以實現不斷地更新結果和結果推送,但通過這樣的方式獲取的結果,仍然是根據過去某一時刻的數據得到的結果,與實時結果有着本質的區別。
可見,流處理系統與傳統的數據處理系統有如下不同:
流處理系統處理的是實時的數據,而傳統的數據處理系統處理的是預先存儲好的靜態數據。
用戶通過流處理系統獲取的是實時結果,而通過傳統的數據處理系統,獲取的是過去某一時刻的結果。
流處理系統無需用戶主動發出查詢,實時查詢服務可以主動將實時結果推送給用戶。
開源流計算框架Storm
Storm簡介:
Twitter Storm是一個免費、開源的分布式實時計算系統,Storm對於實時計算的意義類似於Hadoop對於批處理的意義,Storm可以簡單、高效、可靠地處理流數據,並支持多種編程語言。
Storm框架可以方便地與數據庫系統進行整合,從而開發出強大的實時計算系統。
Twitter是全球訪問量最大的社交網站之一,Twitter開發Storm流處理框架也是為了應對其不斷增長的流數據實時處理需求。
Storm的特點:
Storm可用於許多領域中,如實時分析、在線機器學習、持續計算、遠程RPC、數據提取加載轉換等。
Storm具有以下主要特點:
整合性
簡易的API
可擴展性
可靠的消息處理
支持各種編程語言
快速部署
免費、開源
系統架構:

圖:流計算系統架構圖
基本概念:
Storm主要術語包括Streams、Spouts、Bolts、Topology和Stream Groupings.
Topology:Streaming中運行的一個實時應用程序。
Nimbus:負責資源分配和任務調度。
Supervisor:負責接收Nimbus分配的任務,啟動和停止屬於自己管理的worker進程。
Worker:Topology運行時的物理進程。每個Worker是一個JVM進程。
Spout:Storm認為每個Stream都有一個源頭,並把這個源頭抽象為Spout。
在一個Topology中產生源數據流的組件。
通常Spout會從外部數據源(隊列、數據庫等)讀取數據,然后封裝成Tuple形式,發送到Stream中。Spout是一個主動的角色,在接口內部有個nextTuple函數,Storm框架會不停的調用該函數。
Bolt:在一個Topology中接收數據后然后執行處理的組件。
Task:Worker中每一個Spout/Bolt的線程稱為一個Task。
Tuple:Streaming的核心數據結構,是消息傳遞的基本單元,不可變Key-Value對,這些Tuple會以一種分布式的方式進程創建和處理。
Stream:Storm將流數據Stream描述成一個無限的Tuple序列,這些Tuple序列會以分布式的方式並行地創建和處理。即無界的Tuple序列。
Zookeeper:為Streaming服務中各自進程提供分布式的協作服務、主備Nimbus、Supervisor、Worker將自己的信息注冊到Zookeeper中,Nimbus據此感知各個角色的監控狀態。
Topology介紹:

圖:Topology示意圖
Storm將Spouts和Bolts組成的網絡抽象成Topology,它可以被提
交到Storm集群執行。Topology可視為流轉換圖,圖中節點是一個Spout或Bolt,邊則表示Bolt訂閱了哪個Stream。當Spout或者Bolt發送元組時,它會把元組發送到每個訂閱了該Stream的Bolt上進行處理。
Topology里面的每個處理組件(Spout或Bolt)都包含處理邏輯, 而組件之間的連接則表示數據流動的方向。
Topology里面的每一個組件都是並行運行的:
在Topology里面可以指定每個組件的並行度,Storm會在集群里面分配那么多的線程來同時計算。
在Topology的具體實現上,Storm中的Topology定義僅僅是一些Thrift結構體(二進制高性能的通信中間件),支持各種編程語言進行定義。
一個Topology是由一組Spout組件(數據源)和Bolt組件(邏輯處理)通過Stream Groupings進行連接的有向無環圖(DAG)。
業務處理邏輯被封裝進Streaming中的Topology中。
Worker介紹:

圖:Worker Process示意圖
Worker:一個Worker是一個JVM進程,所有的Topology都是在一個或者多個Worker中運行的。Worker啟動后是長期運行的,除非人工停止。Worker進程的個數取決於Topology的設置,且無設置上限,具體可獲得並調度啟動的Worker個數則取決於Supervisor配置的slot個數。
Executor:在一個單獨的Worker進程中會運行一個或多個Executor線程。每個Executor只能運Spout或者Bolt中的一個或多個Task實例。
Task:是最終完成數據處理的實體單元。
Task介紹:

圖:Task示意圖
Topology里面的每一個Component(組件)(Spout/Blot)節點都是並行運行的。在Topology里面,可以指定每個節點的並發度,Streaming則會在集群里分配響應的Task來同時計算,以增強系統的處理能力。
消息分發策略(Stream Groupings):
Groupings:Storm中的Stream Groupings用於告知Topology如何在兩個組件間(如Spout和Bolt之間,或者不同的Bolt之間)進行Tuple的傳送。每一個Spout和Bolt都可以有多個分布式任務,一個任務在什么時候、以什么方式發送Tuple就是由Stream Groupings來決定的。
目前,Storm中的Stream Groupings有如下幾種方式:
ShuffleGrouping:隨機分組,隨機分發Stream中的Tuple,保證每個Bolt的Task接收Tuple數量大致一致。
FieldsGrouping:按照字段分組,保證相同字段的Tuple分配到同一個Task中。
AllGrouping:廣播發送,每一個Task都會收到所有的Tuple。
GlobalGrouping:全局分組,所有的Tuple都發送到同一個Task中。
NonGrouping:不分組,和ShuffleGrouping類似,當前Task的執行會和它的被訂閱者在同一個線程中執行。
DirectGrouping:直接分組,直接指定由某個Task來執行Tuple的處理。

Storm框架設計:
Storm集群采用“Master—Worker”的節點方式:
Master節點運行名為“Nimbus”的后台程序(類似Hadoop中的“JobTracker”),負責在集群范圍內分發代碼、為Worker分配任務和監測故障。
Worker節點運行名為“Supervisor”的后台程序,負責監聽分配給它所在機器的工作,即根據Nimbus分配的任務來決定啟動或停止Worker進程,一個Worker節點上同時運行若干個Worker進程。
Storm使用Zookeeper來作為分布式協調組件,負責Nimbus和多個Supervisor之間的所有協調工作。借助於Zookeeper,若Nimbus進程或Supervisor進程意外終止,重啟時也能讀取、恢復之前的狀態並繼續工作,使得Storm極其穩定。

圖:Storm集群架構示意圖
Nimbus並不直接和Supervisor交換,而是通過Zookeeper進行消息的傳遞。
Storm和Hadoop架構組件功能對應關系:
Storm運行任務的方式與Hadoop類似:Hadoop運行的是MapReduce作業,而Storm運行的是“Topology”。
但兩者的任務大不相同,主要的不同是:MapReduce作業最終會完成計算並結束運行,而Topology將持續處理消息(直到人為終止)。

圖:Storm和Hadoop架構組件功能對應關系
Storm工作流程:

圖:Storm工作流程示意圖
Storm工作流程為:
提交Topology
將任務存儲在Zookeeper中
獲取分配的任務,並啟動Worker
Worker進程執行具體的任務
所有Topology任務的提交必須在Storm客戶端節點上進行,提交后,由Nimbus節點分配給其他Supervisor節點進行處理。
Nimbus節點首先將提交的Topology進行分片,分成一個個Task,分配給相應的Supervisor,並將Task和Supervisor相關
的信息提交到Zookeeper集群上。
Supervisor會去Zookeeper集群上認領自己的Task,通知自己的Worker進程進行Task的處理。
Streaming提供的接口:
REST接口:(Representational State Transfer)表述性狀態轉移接口。
Thrift接口:由Nimbus提供。Thrift是一個基於靜態代碼生成的跨語言的RPC協議棧實現,它可以生成包括C++,Java,Python, Ruby , PHP等主流語言的代碼實現,這些代碼實現了RPC的協議層和傳輸層功能,從而讓用戶可以集中精力與服務的調用和實現。
Streaming的關鍵特性介紹
Nimbus HA:

圖:Nimbus HA架構
使用Zookeeper分布式鎖:
Nimbus HA的實現是使用Zookeeper分布式鎖,通過主備間爭搶模式完成的Leader選舉和主備切換。
主備間元數據同步:
主備Nimbus之間會周期性的同步元數據,保證在發生主備切換后拓撲數據不丟失,業務不受損。
容災能力:

圖:容災示意圖
容災能力:節點失效,自動遷移到正常節點,業務不中斷。
整個過程無需人工干預!
消息可靠性:

在Streaming里面一個Tuple被完全處理的意思是:這個Tuple所派生的所有tuple都被成功處理。如果這個消息在Timeout所指定的時間內沒有成功處理,這個tuple就被認為處理失敗了。
可靠性級別設置:
如果並不要求每個消息必須被處理(允許在處理過程中丟失一些信息),那么可以關閉消息的可靠性處理機制,從而可以獲得較好的性能。關閉消息的可靠性機制一位着系統中的消息數會減半。
有三種方法可以關閉消息的可靠性處理機制:
將參數Config.TOPOLGY_ACKERS設置為0.
Spout發送一個消息時,使用不指定消息message ID的接口進行發送。
Blot發送消息時使用Unanchor方式發送,使Tuple樹不往下延伸,從而關閉派生消息的可靠性。
ACK機制:

圖:Ack機制
一個Spout發送一個Tuple時,會通知Acker一個新的根消息產生了,Acker會創建一個新的Tuple tree,並初始化校驗和為0.
Bolt發送消息時間向Acker發送anchor tuple,刷新tuple tree,並在發送成功后向Acker反饋結果。如果成功則重新刷新校驗和,如果失敗則Acker會立即通知Spout處理失敗。
當Tuple tree被完成吹了(校驗和為0),Acker會通知Spout處理成功。
Spout提供ack()和Fail()接口方法用戶處理Acker的反饋結果,需要用戶實現。一般在fail()方法中實現消息重發邏輯。
Streaming與其他組件:

整合HDFS/HBase等外部組件,將實時結構提供給其他組件,進程離線分析。
Spark Streaming
Spark Streaming設計:
Spark Streaming可整合多種輸入數據源,如Kafka、Flume、
HDFS,甚至是普通的TCP套接字。經處理后的數據可存儲至文件
系統、數據庫,或顯示在儀表盤里。

圖:SPark Streaming支持的輸入、輸出數據源
Spark Streaming的基本原理是將實時輸入數據流以時間片(秒級)為單位進行拆分,然后經Spark引擎以類似批處理的方式處理每個時間片數據。

圖:Spark Streaming執行流程
**Spark Streaming最主要的抽象是DStream(Discretized Stream,離散化數據流),表示連續不斷的數據流。**在內部實現上,Spark Streaming的輸入數據按照時間片(如1秒)分成一段一段的DStream,每一段數據轉換為Spark中的RDD,並且對DStream的操作都最終轉變為對相應的RDD的操作。

圖:DStream操作示意圖
Spark Streaming 與 Storm的對比:
Spark Streaming和Storm最大的區別在於,Spark Streaming無法實現毫秒級的流計算,而Storm可以實現毫秒級響應。
Spark Streaming構建在Spark上,一方面是因為Spark的低延遲執行引擎(100ms+)可以用於實時計算,另一方面,相比於Storm,RDD數據集更容易做高效的容錯處理。
Spark Streaming采用的小批量處理的方式使得它可以同時兼容批量和實時數據處理的邏輯和算法,因此,方便了一些需要歷史數據和實時數據聯合分析的特定應用場合。
Samza技術原理
基本概念:
(1)作業:一個作業(Job)是對一組輸入流進行處理轉化成輸出流的程序。
(2)分區:
Samza的流數據單位既不是Storm中的元組,也不是Spark Streaming中的DStream,而是一條條消息。
Samza中的每個流都被分割成一個或多個分區,對於流里的每一個分區而言,都是一個有序的消息序列,后續到達的消息會根據一定規則被追加到其中一個分區里。
(3)任務:
一個作業會被進一步分割成多個任務(Task)來執行,其中,每個任務負責處理作業中的一個分區。
分區之間沒有定義順序,從而允許每一個任務獨立執行。
YARN調度器負責把任務分發給各個機器,最終,一個工作中的多個任務會被分發到多個機器進行分布式並行處理。
(4)數據流圖:
一個數據流圖是由多個作業構成的,其中,圖中的每個節點表示包含數據的流,每條邊表示數據傳輸。
多個作業串聯起來就完成了流式的數據處理流程。
由於采用了異步的消息訂閱分發機制,不同任務之間可以獨立運行。

圖:數據流圖
Samza的系統架構:
Samza系統架構主要包括:
流數據層(Kafka)
執行層(YARN)
處理層(Samza API)
流處理層和執行層都被設計成可插拔的,開發人員可以使用其他框架來替代YARN和Kafka。

圖:MapReduce批處理架構和Samza流處理架構對比
處理分析過程:

圖:處理分析過程圖
處理分析過程如下:
Samza客戶端需要執行一個Samza作業時,它會向YARN的ResouceManager提交作業請求。
ResouceManager通過與NodeManager溝通為該作業分配容器(包含了CPU、內存等資源)來運行Samza ApplicationMaster。
Samza ApplicationMaster進一步向ResourceManager申請運行任務的容器。
獲得容器后,Samza ApplicationMaster與容器所在的NodeManager溝通,啟動該容器,並在其中運行Samza Task Runner。
Samza Task Runner負責執行具體的Samza任務,完成流數據處理分析。
Storm、Spark Streaming和Samza的應用場景
從編程的靈活性來講,Storm是比較理想的選擇,它使用Apache Thrift,可以用任何編程語言來編寫拓撲結構(Topology)。
當需要在一個集群中把流計算和圖計算、機器學習、SQL查詢分析等進行結合時,可以選擇Spark Streaming,因為,在Spark上可以統一部署Spark SQL,Spark Streaming、MLlib,GraphX等組件,提供便捷的一體化編程模型。
當有大量的狀態需要處理時,比如每個分區都有數十億個元組,則可以選擇Samza。當應用場景需要毫秒級響應時,可以選擇Storm和Samza,因為Spark Streaming無法實現毫秒級的流計算。
