之前的文章大量的內容在和大家探討分布式存儲,接下來的章節進入了分布式計算領域。坦白說,個人之前專業的重心側重於存儲,對許多計算的內容理解可能不是和確切,如果文章中的理解有所不妥,願虛心賜教。本篇將和大家聊一聊分布式計算的一個子集:批處理。
批處理系統通常也叫脫機系統,需要大量的輸入數據,運行一個作業來處理它,並產生一些輸出數據。工作通常需要一段較長的時間(從幾分鍾到幾天)。批處理作業通常是周期性地運行的(例如,一天一次)。批處理作業的主要性能度量通常是吞吐量。
1.MapReduce
批處理是我們構建可靠、可擴展和可維護應用程序的重要組成部分。而谷歌在2004年發布的批處理算法:MapReduce,是處理大規模數據集的重要模型,雖然與為數據倉庫專門開發的並行處理系統相比,MapReduce是一種相當低級的編程模型,但它依然對批處理的模型理解有很大的幫助,所以我們以MapReduce作為起點,開啟我們的批處理的計算之旅。
分布式存儲系統與MapReduce
MapReduce是一種相當生硬,野蠻的工具,但卻十分有效。單個MapReduce作業:可以有一個或多個輸入,並生成一個或多個輸出。 MapReduce作業是函數式編程的模型,不會修改輸入,除了生成輸出之外,不會產生任何副作用。輸出文件按順序編寫一次(不修改已寫入文件的任何現有部分)。
MapReduce作業需要讀、寫文件的分布式文件系統。如:HDFS,GFS,GlusterFS,Amazon S3 等等。之后我們使用HDFS作為運行環境,但這些原則適用於任何的分布式存儲系統。HDFS是基於無共享的存儲集群,而共享磁盤存儲由集中式存儲設備實現,通常使用定制硬件和特殊的網絡基礎設施(如光纖通道)。所以HDFS不需要特殊的硬件,只需要由傳統的數據中心網絡連接的計算機。HDFS的守護進程的每台計算機上運行,將允許其他節點訪問存儲在該計算機上文件的數據,而中央服務器NameNode跟蹤哪些文件塊存儲在哪台機器。因此,創建一個大的文件HDFS上,可以使用集群之中的所有計算機。
為了容忍機器和磁盤故障,可以在集群的多台機器上復制文件塊。所以多台機器上的同一數據的幾個副本,當然這里也可以使用糾刪碼技術,可以允許丟失的數據以比完全復制更低的存儲開銷被存儲。糾刪碼技術類似於RAID,它在同一台機器上的多個磁盤上提供冗余。不同之處在於,對糾刪碼的副本進行讀寫時需要額外的編解碼計算。
MapReduce的工作流程
MapReduce與傳統的UNIX命令管道的主要區別在於,MapReduce可以跨越多台計算機並行計算,手動編寫的Mapper和Reducer不需要了解輸入來自何處或輸出的去處,由框架來處理機器之間移動數據的復雜性。
下圖展示了一個MapReduce作業的工作流程,作業的輸入是HDFS的一個目錄,目錄內每個文件塊作為一個單獨的分區,由一個單獨的Map任務處理,每個輸入文件的大小通常是數百兆字節(取決於HDFS的塊大小)。MapReduce的調度器試圖在存儲輸入文件副本塊機器上運行Mapper,只要該機器有足夠內存和CPU資源來運行Map任務。通過這樣的方式節省了在網絡上復制文件塊的帶寬,減少了網絡負載,利用了分布式計算的局部性原理。

應用程序代碼被打包成Jar文件,上傳到分布式存儲系統之上,對應的節點會下載應用程序的Jar文件,然后啟動Map任務並開始讀取輸入文件,每次將一條記錄傳遞給Mapper的回調函數,並輸出Map處理過后的鍵值對。Map的任務的數量取決於輸入文件塊的數量,但是Reduce任務的數量由作業作者配置,為了確保同一個鍵的所有鍵值對都由同一個Reducer處理,框架使用一個散列鍵來確定鍵值對應該對應的Reduce任務。
MapReduce需要對鍵值對進行排序,但數據集可能太大,無法用一台機器上的常規排序算法進行排序。所以,每個Map任務根據散列將鍵值對輸出到對應的Reducer的磁盤分區,並對鍵值對進行排序。每當Mapper完成工作時,MapReduce調度器通知Reducer,它們可以開始從Mapper獲取輸出文件。Reducer從Mapper端獲取對應的輸出的鍵值對文件,並進行歸並排序,保持排序順序,這個過程稱之為Shuffle。
最后,Reducer調用Reduce函數來處理這些有序的鍵值對,並且可以生成任意數量的輸出記錄,並寫入分布式存儲系統。這便是一次完整的MapReduce任務的全過程。
MapReduce作業的鏈式調度
一個MapReduce作業可以解決的問題范圍是有限的。因此,MapReduce的作業需要被鏈接到工作流中,這樣一個作業的輸出就成為下一個作業的輸入。Hadoop的MapReduce框架,可以隱式的通過目錄名來鏈接:第一個MapReduc的作業配置寫輸出到HDFS的指定的目錄,第二個MapReduce作業讀取相同的目錄名作為輸入。從MapReduce的框架來看,它們是兩個獨立的工作。
只有當前一個作業成功完成時,下一個作業的輸入才會被認為是有效的(失敗的MapReduce作業的結果會被丟棄)。所以不同的作業之前會產生依賴關系的有向無環圖,來處理這些依賴關系的工作執行,目前Hadoop有許多對批處理的調度程序,如:Oozie,Azkaban, Luigi, Airflow,等。在一個大型公司之中,許多不同的團隊可能運行不同的工作,它們讀取彼此的輸出,所以通過工具支持管理等復雜的數據流是很重要的。
2.MapReduce作業的業務場景
我們通過一個實例,來具體了解類MapReduce作業的業務場景。如下圖所示:左邊是一個由日志記錄的行為描述,稱為用戶活動,右邊是一個數據庫的用戶用戶表。

數據分析人員的任務可能需要將用戶活動與用戶的信息關聯起來:分析哪些頁面最受年齡組的歡迎。但是用戶活動日志之中,只包含了用戶的ID,而不包含完整的用戶信息。這時候就需要一個Join操作,最簡單的實現思路是逐一檢查用戶活動,並對每個用戶ID來查詢用戶數據庫,顯然,這樣的實現會帶來很糟糕的性能表現。任務吞吐量將受到數據庫服務器往返時間的限制,並且本地緩存的有效性將非常依賴於數據的分布,並行運行海量的查詢可能會超出數據服務器的處理能力。為了在作業過程之中有更大的吞吐量,計算必須(盡可能地)在一台機器上進行。通過網絡上隨機訪問請求要處理的每一條記錄是十分緩慢的。此外,查詢遠程數據庫將意味着批處理作業變得不確定,因為遠程數據庫中的數據隨時可能會更改。
因此,更好的方法是獲取用戶數據庫的副本(使用ETL將數據庫的數據中提取到“數據倉庫”),並將其放入分布式存儲系統之中。這樣,我們可以使用MapReduce這樣的工具來更加有效地處理。如下圖所示:由MapReduce框架按鍵對Mapper輸出進行分區,然后對鍵值對排序時,其效果是所有活動事件和具有相同用戶ID的用戶記錄在同一個Reducer之中並且彼此相鄰。之后,Reducer可以很容易地執行實際的Join邏輯:每個用戶ID都調用一次Reduce函數,輸出活動的URL和用戶的年齡。隨后可以啟動一個新的MapReduce作業來計算每個URL的查看器年齡分布,並按年齡組分組。

接下來,我們來梳理一些業務層面的細節,以及用MapReduce框架的一些細節:
-
業務邏輯分離
在上述的業務場景之中,最重要的就是保證同一個用戶ID的活動需要匯集到同一個Reducer來進行處理,這個就是前文我們聊到Shuffle的功能,所有鍵值相同的鍵值對都會被傳遞到相同的目的地。MapReduce編程模型將計算的通信協作與應用程序邏輯處理分離。這就是MapReduce框架的高明之處,由MapReduce的框架本身處理所有的網絡通信,業務人員專注於應用程序代碼的實現,如果在這個過程之中出現了節點的故障,MapReduce透明的失敗重試來確保應用程序邏輯不受影響。 -
數據分組
數據除了Join場景之外,通過鍵值對對數據進行分組也是數據系統常用的操作:對所有具有相同鍵的記錄都形成一個組,之后對組內的數據進行操作。 現在問題來了?我們怎么樣使用MapReduce來實現這樣的分組操作呢?實現方式也很簡單,通過在Map函數之中對鍵值對進行改造,插入使鍵值對產生預期分組的Key,之后分區和排序將相同的Key匯集到同一個Reducer之中。在MapReduce上實現時,分組和Join看起來非常相似。 -
數據傾斜
如果同一個鍵相關的數據量非常大,對於MapReduce框架來說可能會成為一個挑戰,因為相同鍵會匯集到同一個Reducer進行處理。例如,在社交網絡中,少數名人可能有數以百萬計的追隨者。(第一章我們就舉過這個例子)所以在MapReduce作業之中存在數據傾斜,如何來進行補償呢?在Pig之中,會先運行一個采樣任務來確定哪個鍵是熱的,在作業實際執行時,Mapper會把出現數據傾斜的鍵值對通過隨機選擇分發個指定的多個Reducer。而Hive的傾斜連接優化采用了另一種方法。它需要在表元數據中顯式指定熱鍵,它將與這些鍵相關的記錄存儲在元數據之中,后續對表進行操作時,采用類似於Pig的優化思路。
3.批處理的意義
前文已經討論了MapReduce作業的工作流程,現在我們回到一個問題來:所有處理的結果是什么?為什么我們一開始就要做所有這些工作? 批處理操作的核心是對數據系統之中的數據進行解析,這類操作需要掃描大量的記錄,進行分組和聚合,並輸出到數據庫以報告的形式呈現,通過報告給消費者或分析師進行數據決策。
同樣,批處理適合建立搜索索引。谷歌最初使用MapReduce是為它的搜索引擎構建索引,通過5到10個MapReduce作業的工作流來實現實現的。如果需要執行全文搜索一組文件中,通過批處理過程是一個非常有效的方法:由每個Map任務對數據分區,之后每個Reducer建立分區索引,將索引文件寫入到分布式文件系統。因為通過關鍵字查詢搜索索引是只讀操作,這些索引文件在創建后是不可變的。 如果索引的文檔集發生變化,一個選項是周期性地為整個文檔集重新運行整個索引工作流程,並在完成新索引文件時將以前的索引文件替換為新的索引文件。(如果只是少量文件的變化,則不適用批處理任務進行處理)
批處理的作業的將輸入視為不可變且避免副作用(如向外部數據庫寫入),不僅實現了良好的性能,而且變得更容易維護。如果您在代碼中引入了一個bug,輸出錯誤,可以簡單地回滾到以前版本的代碼並重新運行該作業,並且再次輸出正確的結果。更簡單的解決方案,可以將舊輸出保存在不同的目錄中,然后簡單地進行切換。由於這種易於回滾的特性,功能開發可以比在不能回滾的環境中進行得更快。有利於敏捷的軟件開發。批處理將邏輯處理代碼與配置分離,這里便允許優雅地重用代碼:一個團隊可以專注於實現邏輯處理,而其他團隊可以決定何時何地運行該作業。
小結:
本篇我們梳理了MapReduce的處理框架,並探討了許多批處理作業的特點。除了MapReduce的模型,數據系統中仍然有許多處理數據的計算模型,接下來會和大家來繼續探討數據系統之中的計算模型..............
