一、什么是Hadoop
Hadoop是一個由Apache基金會所開發的分布式系統基礎架構。
用戶可以在不了解分布式底層細節的情況下,開發分布式程序。充分利用集群的威力進行高速運算和存儲。
Hadoop實現了一個分布式文件系統(Hadoop Distributed File System),簡稱HDFS。HDFS有高容錯性的特點,並且設計用來部署在低廉的(low-cost)硬件上;而且它提供高吞吐量(high throughput)來訪問應用程序的數據,適合那些有着超大數據集(large data set)的應用程序。HDFS放寬了(relax)POSIX的要求,可以以流的形式訪問(streaming access)文件系統中的數據。
Hadoop的框架最核心的設計就是:HDFS和MapReduce。HDFS為海量的數據提供了存儲,則MapReduce為海量的數據提供了計算。
二、MapReduce理論基礎
每個MapReduce job都是Hadoop客戶端想要執行的一個工作單元,它一般由輸入數據、MapReduce程序和配置信息組成,而Hadoop會把每個job分隔成兩類任務(task):map任務和reduce任務。在Hadoop集群中有兩類節點來執行兩類job進程的執行
1.1 大數據處理
任何基礎業務包含了收集、分析、監控、過濾、搜索或組織web內容的公司或組織都面臨着所謂的“大數據”問題:“web規模”處理即海量數據處理的代名詞。社交類網站的興起也使得這些組織面臨着另一個問題:用戶行為數據分析,這涉及到通過日志文件記錄用戶的對web頁面瀏覽、點擊、停留時長等,而后對日志文件中的大量數據進行分析以支持進行合理、正確的商業決策。 那么,大數據處理究竟意味着對多大規模的數據進行處理?一個簡單的例子:Google在2004年平均每天利用MapReduce處理100GB的數據,到2008年平均每天處理的數據已經達到20PB;2009年,Facebook的數據量達到2.5PB,且以每天15TB的速度在增長。PB級別的數據集正變得越來越常見,大數據時代的到來已然是不爭的事實,密集數據處理也正迅速成為現實需求。 大數據問題的處理需要以與傳統數據處理方式所不同的方法去實現,這正是MapReduce思想得以大放光彩的核心所在。MapReduce在實現大數據處理上有着多個基礎理論思想的支撐,然而這些基礎理論甚至實現方法都未必是MapReduce所創,它們只是被MapReduce采用獨特的方式加以利用而已。 (1)向外擴展(Scale out)而非向上擴展(Scale up):大數據的處理更適合采用大量低端商業服務器(scale out)而非少量高端服務器(scale up)。后者正是向上擴展的系統性能提升方式,它通常采用有着SMP架構的主機,然而有着大量的CPU插槽(成百上千個)及大量的共享內存(可以多達數百GB)的高端服務器非常昂貴,但其性能的增長卻非線性上升的,因此性價比很一般。而大量的低端商業服務器價格低廉、易於更換和伸縮等特性有效避免了向上擴展的敝端。 (2)假設故障很常見(Assume failures are common):在數據倉庫架構級別,故障是不可避免且非常普遍的。假設一款服務器出故障的平均概率為1000天1次,那么10000台這種服務器每天出錯的可能性將達到10次。因此,大規模向外擴展的應用場景中,一個設計優良且具有容錯能力的服務必須能有效克服非常普遍的硬件故障所帶來的問題,即故障不能導致用戶應用層面的不一致性或非確定性。MapReduce編程模型能通過一系列機制如任務自動重啟等健壯地應付系統或硬件故障。 (3)將處理移向數據(Move processing to the data):傳統高性能計算應用中,超級計算機一般有着處理節點(processing node)和存儲節點(storage node)兩種角色,它們通過高容量的設備完成互聯。然而,大多數數據密集型的處理工作並不需要多么強大的處理能力,於是把計算與存儲互相分開將使得網絡成為系統性能瓶頸。為了克服計算如此類的問題,MapReduce在其架構中將計算和存儲合並在了一起,並將數據處理工作直接放在數據存儲的位置完成,只不過這需要分布式文件系統予以支撐。 (4)順序處理數據並避免隨機訪問(Process data sequentially and avoid random access):大數據處理通常意味着海量的數量難以全部載入內存,因而必須存儲在磁盤上。然而,機械式磁盤尋道操作的先天性缺陷使得隨機數據訪問成為非常昂貴的操作,因此避免隨機數據訪問並以順序處理為目的完成數據組織成為亟待之需。固態磁盤雖然避免了機械磁盤的某此缺陷,然而其高昂的價格以及並沒有消除的隨機訪問問題仍然無法帶來性能上的飛躍發展。MapReduce則主要設計用來在海量數據集上完成批處理操作,即所有的計算被組織成較長的流式處理操作,以延遲換取較大的吞吐能力。 (5)向程序員隱藏系統級別的細節(Hide system-level details from the application developer) (6)無縫擴展(Seamless scalability)
1.2 MapReduce和大數據問題
海量數據處理的核心思想無非是將一個較大的問題進行“分割包圍、逐個殲滅”。然而其難點和關鍵點在於如何將一個大的問題分分割成多個可以分別在不同的CPU上或不同的主機上進行處理的獨立小問題,而且這些獨立進行處理的小問題所產生的中間結果又該如何合並成最終結果並予以輸出。因此,看似簡單的化整為零的處理思想卻不得不面臨如下的難題: (1) 如何將大問題分割為小任務?進一步地,如何將大問題分解為可以並行處理的小任務? (2) 如何將分解好的小任務派送給分布式系統中的某主機且是較為適合解決此問題的主機上的worker完成處理? (3) 如何保證某worker獲取所需的數據? (4) 如何協調不同worker之間進行同步? (5) 如何將某worker的部分結果共享給其它需要此結果的worker? (6) 如何在出現軟件或硬件故障時仍然能保證上述工作的順利進行? 在傳統的並行或分布式編程模型中,程序員不得不顯式地解決上述的部分甚至是全部問題,而在共享內存編程中,程序員需要顯式地協調對共享數據結構的如互斥鎖的訪問、顯式地通過柵(barrier)等設備解決進程同步問題、並得時刻警惕着程序中可能出現的死鎖或競爭條件。雖然有些編程語言也或多或少地規避了讓程序員面對上述問題,但卻也避免不了將資源分配給各worker的問題。MapReduce的優勢之一便是有效地向程序員隱藏了這些問題。
1.3 函數式編譯語言
MapReduce是一種類似於Lisp或ML的函數式編程語言。函數式編程的核心特性之一是基於高階函數,即能夠接受其它函數作為參數的函數完成編程。MapReduce有兩個常見地內置高階函數map和fold。 如圖所示,給定一個列表,map(接受一個參數)以函數f為其參數並將其應用於列表中的所有元素;fold(接受兩個參數)以函數g和一個初始值作為參數,然后將g應用於初始值和列表中的第一個元素,結果被放置於中間變量中。中間變量和第二個元素將作為g函數下一次應用時的參數,而后如此操作直至將列表中的所有元素處理完畢后,fold會將最終處理結果保存至一個中間變量中。 於是,基於上述過程,我們可以把map視作利用f函數將給定數據集完成形式轉換的操作,同樣地,fold就可以被看作利用g函數完成數據聚合的操作。我們就可以由此得知,各函數式程序在運行時彼此間是隔離的,因此,在map中將f函數應用於列表中每一個元素的操作可以並行進行,進一步地講,它們可以分布於集群中的不同節點上並行執行。然而,受限於數據的本地性,fold操作需要等到列表中的每一個元素都准備停當之后才能進行。幸運地是,現實生活中的應用程序並不要求g函數應用於列表中的所有元素,因此,列表中元素可以被分為多個邏輯組,並將fold操作並行地應用在這些邏輯組上即可。由此,fold操作也可以以並行的方式高效完成。 MapReduce有兩個常見地內置高階函數map和reduce,其map就類似於上述過程中的map操作,reduce對應於上述過程中的fold操作。只不過,MapReduce的執行框架能自行協調map與reduce並將其應用於在商業服務器硬件平台上並行處理海量數據。 更為精確地說,MapReduce有三個相互關聯卻各不相同的概念。首先,MapReduce是一個如上所述的函數式編程語言。其次,MapReduce也是一個運行框架,它能夠協調運行基於MapReduce思想開發的程序。最后,MapReduce還可以被看作編程模型和執行框架的軟件實現,如Google的專有實現和另一個開源實現Hadoop等。
1.4 mapper和reducer
鍵值對兒(Key-value pair)是MapReduce的基礎數據結構。Key和Value可以是基礎類型數據,如整數、浮點數、字符串或未經加工的字節數據,也可以是任意形式的復雜數據類型。程序員可以自行定義所需的數據類型,也可借助於Protocol Buffer、Thrift或Avro提供的便捷方式完成此類工作。 MapReduce算法設計的工作之一就是在給定數據集上定義“鍵-值”數據結構,比如在搜索引擎搜集、存儲網頁類工作中,key可以使用URL來表示,而value則是網頁的內容。而在有些算法中,Key也可以是沒有任何實際意義的數據,其在數據處理過程中可被安全忽略。在MapReduce中,程序員需要基於如下方式定義mapper和reducer: map: (k1,v1)-->[(k2,v20)] reduce: (k2,[v2])-->[(k3,v3)] 其中[...]意味着其可以是一個列表。這些傳遞給MapReduce進行處理的數據存儲於分布式文件上,mapper操作將應用於每一個傳遞過來的鍵-值對並生成一定數量的中間鍵值對(intermediate key-value),而后reduce操作將應用於這些中間鍵值對並輸出最終的鍵值對。然而,mapper操作和reducer操作之間還隱含着一個應用於中間鍵值對的“分組”操作,同一個鍵的鍵值對需要被歸類至同一組中並發送至同一個reducer,而傳送給每個reducer的分組中的鍵值對是基於鍵進行排序后的列表。reducer生成的結果將會保存至分布式文件系統,並存儲為一個或多個以r(即reducer號碼)結尾的文件,但mapper生成的中間鍵值對數據則不會被保存。 在Hadoop中,mapper和reducer是分別由MAP和REDUCE方法實現的對象。每個map任務(接收一個稱作input split的鍵值對列表)都被初始化一個mapper對象,並會由執行框架為每個輸入的鍵值對調用一次其map方法。程序員可以配置啟動的map任務個數,但其真正啟動的數目則由執行框架根據數據的物理分布最終給定。類似地,每個reduce任務由REDUCE方法初始化為一個reduce對象,並會由執行框架為其接受的每個中間鍵值對調用一次REDUCE方法,所不同的是,程序員可以明確限定啟動的reduce任務的個數。 mapper和reducer可以直接在各自接收的數據上執行所需要的操作,然而,當使用到外部資源時,多個mapper或reducer之間可能會產生資源競爭,這勢必導致其性能下降,因此,程序員必須關注其所用資源的競爭條件並加入適當處理。其次,mapper輸出的中間鍵值對與接受的鍵值對可以是不同的數據類型,類似地,reducer輸出的鍵值對與其接收的中間鍵值對也可以是不同的數據類型,這可能會給編程過程及程序運行中的故障排除帶來困難,但這也正是MapReduce強大功能的體現之一。 除了常規的兩階段MapReduce處理流外,其還有一些變化形式。比如將mapper輸出的結果直接保存至磁盤中(每個mapper對應一個文件)的沒有reducer的MapReduce作業,不過僅有reducer而沒有mapper的作業是不允許的。不過,就算用不着reducer處理具體的操作,利用reducer將mapper的輸出結果進行重新分組和排序后進行輸出也能以另一種形式提供的完整MapReduce模式。 MapReduce作業一般是通過HDFS讀取和保存數據,但它也可以使用其它滿足MapReduce應用的數據源或數據存儲,比如Google的MapReduce實現中使用了Bigtable來完成數據的讀入或輸出。BigTable屬於非關系的數據庫,它是一個稀疏的、分布式的、持久化存儲的多維度排序Map,其設計目的是可靠的處理PB級別的數據,並且能夠部署到上千台機器上。在Hadoop中有一個類似的實現HBase可用於為MapReduce提供數據源和數據存儲。
1.5 Hadoop運行框架
MapReduce程序也稱作為MapReduce作業,一般由mapper代碼、reducer代碼以及其配置參數(如從哪兒讀入數據,以及輸出數據的保存位置)組成。准備好的作業可通過JobTracker(作業提交節點)進行提交,然后由運行框架負責完成后續的其它任務。這些任務主要包括如下幾個方面。 (1) 調度 每個MapReduce作業都會划分為多個稱作任務(task)的較小單元,而較大的作業划分的任務數量也可能會超出整個集群可運行的任務數,此時就需要調度器程序維護一個任務隊列並能夠追蹤正在運行態任務的相關進程,以便讓隊列中處於等待狀態的任務派送至某轉為可用狀態的節點運行。此外,調度器還要負責分屬於不同作業的任務協調工作。 對於一個運行中的作業來說,只有所用的map任務都完成以后才能將中間數據分組、排序后發往reduce作業,因此,map階段的完成時間取決於其最慢的一個作業的完成時間。類似的,reduce階段的最后一個任務執行結束,其最終結果才為可用。因此,MapReduce作業完成速度則由兩個階段各自任務中的掉隊者決定,最壞的情況下,這可能會導致作業長時間得不到完成。出於優化執行的角度,Hadoop和Google MapReduce實現了推測執行(Speculative execution)機制,即同一個任務會在不同的主機上啟動多個執行副本,運行框架從其最快執行的任務中取得返回結果。不過,推測執行並不能消除其它的滯后場景,比如中間鍵值對數據的分發速度等。 (2) 數據和代碼的協同工作(data/code co-location) 術語“數據分布”可能會帶來誤導,因為MapReduce盡力保證的機制是將要執行的代碼送至數據所在的節點執行,因為代碼的數據量通常要遠小於要處理的數據本身。當然,MapReduce並不能消除數據傳送,比如在某任務要處理的數據所在的節點已經啟動很多任務時,此任務將不得不在其它可用節點運行。此時,考慮到同一個機架內的服務器有着較充裕的網絡帶寬,一個較優選擇是從數據節點同一個機架內挑選一個節點來執行此任務。 (3) 同步(Synchronization) 異步環境下的一組並發進程因直接制約而互相發送消息而進行互相合作、互相等待,使得各進程按一定的速度執行的過程稱為進程間同步,其可分為進程同步(或者線程同步)和數據同步。就編程方法來說,保持進程間同步的主要方法有內存屏障(Memory barrier),互斥鎖(Mutex),信號量(Semaphore)和鎖(Lock),管程(Monitor),消息(Message),管道(Pipe)等。MapReduce是通過在map階段的進程與reduce階段的進程之間實施隔離來完成進程同步的,即map階段的所有任務都完成后對其產生的中間鍵值對根據鍵完成分組、排序后通過網絡發往各reducer方可開始reduce階段的任務,因此這個過程也稱為“shuffle and sort”。 (4) 錯誤和故障處理(Error and fault handling) MapReduce運行框架本身就是設計用來容易發生故障的商用服務器上了,因此,其必須有着良好的容錯能力。在任何類別的硬件故障發生時,MapReduce運行框架均可自行將運行在相關節點的任務在一個新挑選出的節點上重新啟動。同樣,在任何程序發生故障時,運行框架也要能夠捕獲異常、記錄異常並自動完成從異常中恢復。另外,在一個較大規模的集群中,其它任何超出程序員理解能力的故障發生時,MapReduce運行框架也要能夠安全挺過。
1.6 partitioner和combiner
除了前述的內容中的組成部分,MapReduce還有着另外兩個組件:partiontioner和combiner。 Partitioner負責分割中間鍵值對數據的鍵空間(即前面所謂的“分組”),並將中間分割后的中間鍵值對發往對應的reducer,也即partitioner負責完成為一個中間鍵值對指派一個reducer。最簡單的partitioner實現是將鍵的hash碼對reducer進行取余計算,並將其發往余數對應編號的reducer,這可以盡力保證每個reducer得到的鍵值對數目大體上是相同的。不過,由於partitioner僅考慮鍵而不考慮“值”,因此,發往每個reducer的鍵值對在鍵數目上的近似未必意味着數據量的近似。 Combiner是MapReduce的一種優化機制,它的主要功能是在“shuffle and sort”之前先在本地將中間鍵值對進行聚合,以減少在網絡上發送的中間鍵值對數據量。因此可以把combiner視作在“shuffle and sort”階段之前對mapper的輸出結果所進行聚合操作的“mini-reducer”。在實現中,各combiner之間的操作是隔離的,因此,它不會涉及到其它mapper的數據結果。需要注意的是,就算是某combiner可以有機會處理某鍵相關的所有中間數據,也不能將其視作reducer的替代品,因為combiner輸出的鍵值對類型必須要與mapper輸出的鍵值對類型相同。無論如何,combiner的恰當應用將有機會有效提高作業的性能。
三、分布式文件系統(DFS)
前面我們已經了解了Hadoop中實現的MapReduce是一個編程模型和運行框架,它能夠通過JobTracker接收客戶提交的作業而后將其分割為多個任務后並行運行在多個TaskTracker上。而問題是,這些TaskTracker如何高效獲取所要處理的數據?
在傳統的高性能集群中,計算節點和存儲節點是各自獨立的,它們之間通過高速網絡完成互聯,然而,在面臨海量數據處理的問題時,網絡必然會成為整個系統的性能瓶頸,這就需要引入超高速的網絡如萬兆以太網或Infiniband。然而,對大數場景來講它們屬於“奢侈品”,且昂貴的投入並不能帶來網絡性能的線性提升,因此性價比不高。面對這種問題,MapReduce采取了將計算節點與存儲節點合二為一的集群模型,它利用分布式文件系統將數據存儲於多個節點上,而后讓處理過程在各數據節點本地直接進行,從而極大地降低了數據通過網絡傳送的需求。不過,這里仍然需要說明的是,MapReduce並非依賴於分布式文件系統,只不過運行在非分布式文件系統的MapReduce的諸多高級特性將無用武之地。
事實上,分布式文件系統並非MapReduce帶來的新生事物,只不過,MapReduce站在前人的基礎上將分布式文件系統進行了改造以使得它更能夠適用於在MapReduce中完成海量數據處理。Google為在他們的MapReduce中實現的分布式文件系統為GFS(Google File System),而Hadoop的實現稱作HDFS(Hadoop Distributed File System)。
2.1 HDFS的設計理念
HDFS專為存儲大文件而設計,可運行於普通的商業服務器上,基於流式數據訪問模型完成數據存取。HDFS將所有文件的元數據存儲於名稱節點(NameNode)的內存中,能夠利用分布式特性高效地管理“大”文件(GB級別甚至更大的文件),對於有着海量小文件的應用場景則會給名稱節點帶去巨大壓力並使得其成為系統性能瓶頸。再者,HDFS為MapReduce的計算框架而設計,存儲下來數據主要用於后續的處理分析,其訪問模型為“一次寫入、多次讀取”;因此,數據在HDFS中存儲完成后,僅能在文件尾部附加新數據,而不能對文件進行修改。另外,HDFS專為了高效地傳輸大文件進行了優化,其為了完成此目標,在“低延遲”特性上做出了很大讓步,因此,其不適用於較小訪問延遲的應用。
2.2 HDFS架構
2.2.1 HDFS數據塊
與傳統文件系統一樣,HDFS也在“塊(block)”級別存取文件,所不同的是,傳統文件系統數據塊一般較小(1KB、2KB或4KB等),HDFS的數據塊大小默認為64MB,甚至可以使用128MB或256MB級別的數據塊。HDFS使用塊抽象層管理文件,可以實現將分塊分為多個邏輯部分后分布於多個存儲節點,也能夠有效簡化存儲子系統。而對於存儲節點來說,較大的塊可以減少磁盤的尋道次數,進而提升I/O性能。
2.2.2 名稱節點(NameNode)和數據節點(DataNode)
HDFS集群中節點的工作模型為“master-worker”:其包含一個名稱節點(master)和多個數據節點(worker)。 名稱節點負責管理HDFS的名稱空間,即以樹狀結構組織的目錄及文件的元數據信息,這些信息持久存儲於名稱節點本地磁盤上並保存為名稱空間鏡像(namespace image)和編輯日志(edit log)兩個文件。名稱節點並不存儲數據塊,它僅需要知道每個文件對應數據塊的存儲位置,即真正存儲了數據塊的數據節點。然而,名稱節點並不會持久存儲數據塊所與其存儲位置的對應信息,因為這些信息是在HDFS集群啟動由名稱節點根據各數據節點發來的信息進行重建而來。這個重建過程被稱為HDFS的安全模式。數據節點的主要任務包括根據名稱節點或客戶的要求完成存儲或讀取數據塊,並周期性地將其保存的數據塊相關信息報告給名稱節點。 默認情況下,HDFS會在集群中為每個數據塊存儲三個副本以確保數據的可靠性、可用性及性能表現。在一個大規模集群中,這三個副本一般會保存至不同機架中的數據節點上以應付兩種常見的故障:單數據節點故障和導致某機架上的所有主機離線的網絡故障。另外,如前面MapReduce運行模型中所述,為數據塊保存多個副本也有利於MapReduce在作業執行過程中透明地處理節點故障等,並為MapReduce中作業協同處理以提升性能提供了現實支撐。名稱節點會根據數據節點的周期性報告來檢查每個數據塊的副本數是否符合要求,低於配置個數要求的將會對其進行補足,而多出的將會被丟棄。 HDFS提供了POSIX網絡的訪問接口,所有的數據操作對客戶端程序都是透明的。當客戶端程序需要訪問HDFS中的數據時,它首先基於TCP/IP協議與名稱節點監聽的TCP端口建立連接,接着通過客戶端協議(Client Protocol)發起讀取文件的請求,而后名稱節點根據用戶請求返回相關文件的塊標識符(blockid)及存儲了此數據塊的數據節點。接下來客戶端向對應的數據節點監聽的端口發起請求並取回所需要數據塊。當需要存儲文件並寫數據時,客戶端程序首先會向名稱節點發起名稱空間更新請求,名稱節點檢查用戶的訪問權限及文件是否已經存在,如果沒有問題,名稱空間會挑選一個合適的數據節點分配一個空閑數據塊給客戶端程序。客戶端程序直接將要存儲的數據發往對應的數據節點,在完成存儲后,數據節點將根據名稱節點的指示將數據塊復制多個副本至其它節點。
2.2.3 名稱節點的可用性
由前一節所述的過程可以得知,名稱節點的宕機將會導致HDFS文件系統中的所有數據變為不可用,而如果名稱節點上的名稱空間鏡像文件或編輯日志文件損壞的話,整個HDFS甚至將無從重建,所有數據都會丟失。因此,出於數據可用性、可靠性等目的,必須提供額外的機制以確保此類故障不會發生,Hadoop為此提供了兩種解決方案。 最簡單的方式是將名稱節點上的持久元數據信息實時存儲多個副本於不同的存儲設備中。Hadoop的名稱節點可以通過屬性配置使用多個不同的名稱空間存儲設備,而名稱節點對多個設備的寫入操作是同步的。當名稱節點故障時,可在一台新的物理主機上加載一份可用的名稱空間鏡像副本和編輯日志副本完成名稱空間的重建。然而,根據編輯日志的大小及集群規模,這個重建過程可能需要很長時間。 另一種方式是提供第二名稱節點(Secondary NameNode)。第二名稱節點並不真正扮演名稱節點角色,它的主要任務是周期性地將編輯日志合並至名稱空間鏡像文件中以免編輯日志變得過大。它運行在一個獨立的物理主機上,並需要跟名稱節點同樣大的內存資源來完成文件合並。另外,它還保存一份名稱空間鏡像的副本。然而,根據其工作機制可知,第二名稱節點要滯后於主節點,因此名稱節點故障時,部分數據丟失仍然不可避免。 盡管上述兩種機制可以最大程序上避免數據丟失,但其並不具有高可用的特性,名稱節點依然是一個單點故障,因為其宕機后,所有的數據將不能夠被訪問,進而所有依賴於此HDFS運行的MapReduce作業也將中止。就算是備份了名稱空間鏡像和編輯日志,在一個新的主機上重建名稱節點並完成接收來自各數據節點的塊信息報告也需要很長的時間才能完成。在有些應用環境中,這可能是無法接受的,為此,Hadoop 0.23引入了名稱節點的高可用機制——設置兩個名稱節點工作於“主備”模型,主節點故障時,其所有服務將立即轉移至備用節點。進一步信息請參考官方手冊。 在大規模的HDFS集群中,為了避免名稱節點成為系統瓶頸,在Hadoop 0.23版本中引入了HDFS聯邦(HDFS Federation)機制。HDFS聯邦中,每個名稱節點管理一個由名稱空間元數據和包含了所有塊相關信息的塊池組成名稱空間卷(namespace volume),各名稱節點上的名稱空間卷是互相隔離的,因此,一個名稱節點的損壞並不影響其它名稱節點繼續提供服務。進一步信息請參考官方手冊。
