MapReduce概述
MapReduce是一種分布式計算模型,運行時不會在一台機器上運行.hadoop是分布式的,它是運行在很多的TaskTracker之上的.
在我們的TaskTracker上面跑的是Map或者是Reduce Task任務.
通常我們在部署hadoop taskTracker 的時候,我們的TaskTracker同時還是我們的Datanode節點.datanode和tasktracker總是部署在一起的.
MapReduce執行流程:
為什么要有多個datanode:
因為我們的datanode是專門用來存儲數據的,我們的數據很大,在一個節點上是存不下的,存不下的情況下,我們就把數據存放在多個節點上.
MapReduce:分布式計算模型.把我們的程序代碼分到所有的tasktracker節點上去運行.只處理當前datanode上的數據,datanode和程序代碼都在一台機器上處理,避免了網絡傳輸.
我們的代碼拿到tasktracker上去執行,我們的tasktracker執行的數據來源於datanode,我們的程序就把各個datanode上的數據給處理了.
reduce匯總的是這種map的輸出,map處理的數據來自於datanode,但是map程序處理后的結果不一定放在datanode中,可以放到linux磁盤.reduce處理的數據來自於各個數據處理節點的linux磁盤.reduce處理完之后的輸出放到datanode上.
如果有節點空閑,reduce節點就在空閑節點上運行,如果都跑程序,就隨機一個節點跑reduce
tasktracker處理的任務都是來自於datanode,處理數據是並行的.map處理完之后結果放到linux磁盤上.r educe程序的處理,是把map處理后linux磁盤上的數據都匯總到reduce節點處理,reduce處理完之后,將結果輸出到datanode上.
我們的數據是放在hdfs中,hdfs決定把數據是放在哪個datanode上的,決定的權利不在於我們的處理,而是在於hdfs.到底放在哪個datanode上不需要我們去關心.
datanode有副本,數據在進行存儲的時候,是把數據放到多個datanode上.
並行處理數據,把我們處理問題的應用程序放到各個存放數據的節點上進行處理,處理完之后獲得的是每一個本地的數據,通過redcue把各個本地的數據進行匯總起來,就得到一個最終的結果.reduce可以有多個.
原來集中式的數據處理方式,缺點是海量數據移動到一個數據處理節點上,程序運行的大量時間消耗在網絡傳輸上.串行,性能不好.
把計算程序放到存儲數據的各個節點上並行執行.map程序計算本地節點的數據,並行結束后,會有很多的中間結果,reduce程序是把Map程序運行的中間結果匯總到一起,作為最終結果.
原來的這種處理數據的方式,是把應用程序放到一個地方,然后海量的數據不斷的往這個應用上挪,它的大量時間消耗在網絡傳輸上還有磁盤的io上.程序處理起來並不復雜,因為數據量太大,所以把時間都耗費到這上面了.
我們改進行一下計算方法,把我們的小程序放到各個的數據節點上,map程序就去處理本機的數據,每一個map程序都去處理本機的數據,處理完之后,會得到多個中間結果.
map處理本地操作可以節省網絡傳輸,在本地就可以把數據處理了.map程序適合於計算的本地化.我們的Reduce程序不能實現計算的本地化,因為是匯總map的輸出,map的輸出必然會分布在很多的機器上.
我們的map是放在各個tasktracker上去執行的,就是把各個tasktracker的本地數據給處理掉,處理后會得到一個中間結果,reduce程序就會各個map處理的結果給匯總起來,mapreduce在這里就是這么一個過程,map是處理各個節點的.reduce是匯總map輸出的.
MapReduce是一個分布式計算模型,主要是用來處理海量數據的.
MapReduce原理:
MapReduce計算模型包括Map和Reduce兩個階段,我們用戶只需要處理map階段和reduce階段就行了.
1) map用來處理本機數據,在處理本地的數據時,需要想我的數據存放在本機的什么位置,我要進行什么樣的計算,計算結果我要放在本機的什么位置.這些東西都是由mapreduce框架給我們實現的,數據在哪,我們只需要知道hdfs就行了,數據處理之后的中間結果放在哪,這個也是mapreduce框架給我們做的,我們自己不需要管.
2) reduce是把map輸出的結果給匯總到一起,map輸出的結果在哪,怎樣傳輸到reduce中,我們開發人員也不需要管,我們只需要管數據匯總這一件事就可以了,處理之后的結果,只需要再寫進hdfs中就可以了,別的就不需要管了.
所以我們實現一個分布式計算還是比較簡單的,這里邊我們關心的是我們map處理的數據來自於hdfs,處理之后又會寫出到中間結果,reduce程序又會把我們的中間結果的數據拿過來進行處理.處理完成之后又會把結果寫出到hdfs中,在處理的過程中是在不斷的傳輸數據,數據傳輸的的方式是采用鍵值(key,value)對的形式.鍵值對也就是我們兩個函數的形參,輸入參數.
MapReduce執行流程:
Mapper任務處理的數據位於各個程序上的,處理完之后,會產生一個中間的輸出,Reduce就是專門處理Mapper產生的中間輸出的.reduce處理完之后,就會把結果作為一個中間結果輸出出來.Map任務和Reduce任務到底在那個TaskTracker上去執行,什么樣的tasktracker執行map任務,什么樣的taskTracker去執行Reduce任務,這個事不需要我們去關心,是框架中的JobTracker管理的.Jobtracker它里邊的這個程序來自於客戶的提交.客戶把我們的程序提交給Jobtracker之后,用戶就不需要參與了,JobTracker就會自動把我們的程序分配到TaskTracker上去執行,有的tasktracker上跑map,有的taskTracker上跑reduce.Map程序讀數據來自於hdfs,我們只需要告訴是哪個文件的路徑就可以了,別的不需要我們去管.MapReduce就會把我們的程序自動的運行,把原始的數據處理完產生中間數據,然后在處理,最終就會產生一個最終的結果,用戶看到的其實是最后的reduce輸出結果.
map任務處理完之后產生的數據位於我們各個節點本地的,也就是我們linux磁盤,而不是位於hdfs中.會起多個reduce任務,每個reduce任務會取每個map任務對應的數據,這樣reduce就會把各個map任務的需要的數據給拿到.
map和reduce之間數據分發的過程稱作shuffle過程,shuffle在細節中:map數據產生之后需要進行分區,每個reduce處理的數據就是不同map分區下的數據.reduce就會把所有map分區中的數據處理完之后寫出到磁盤中.
按官方的源碼步驟講會把shuffle歸結為reduce階段,map到reduce數據分發的過程叫做shuffle.
shuffle是把我們map中的數據分發到reduce中去的一個過程.
reduce執行完之后就直接結束了,直接寫出去.不會經過Jobtracker,但是會通知Jobtracker運行結束.
有幾個reduce就有幾個shuffle分發的過程.
map它只做本機的處理,處理完之后,是由reduce做匯總的.會讀取所有map中相同分區中的數據,所以shuffle可以認為是reduce的一部分,因為map執行完之后就已經結束了.
匯總節點是主動去其他節點要數據.reduce這個節點其實是知道各個map的,一些map執行完之后,會把數據寫到本地linux磁盤,那么我們的reduce就會通過http的協議把map端處理后的數據要過來.
JobTracker是管理者,TaskTracker是干活的,TaskTracker分map任務和reduce任務,那么map任務運行完成之后,會告訴JobTracker我寫完了,JobTracker一看map寫完之后,就會在一個TaskTracker起一個Reduce任務,把他們這些執行完畢之后的map任務的地址告訴reduce,reduce就會通過http協議去map那讀取數據.理解這些東西需要有JobTracker做管理,只要是出現他們之間做協調的時候,全部都是JobTracker做協調,管理的.哪個機器承擔reduce任務也是JobTracaker在接到任務之后分配好了的.因為TasktTracker只是工作者,本身沒有思考能力的,只有JobTracker有思考能力.
JobTracker分配的原理:在存儲數據的節點上起map任務,jobTracker怎么會知道哪些節點存放數據呢 這個需要問namenode,namenode會知道哪些Datanode會存放數據.
要處理的文件被划分為多少個block就會有多少個map.JobTracker 沒有存儲任何東西,只是一個管理角色.
map在輸出的時候會確定分成多少個區對應的就會有多少個reduce任務,數據分發的時候就會由shuffle的這個過程進行分發.所以說按道理來講的話,reduce分區的數量應該有map分區的數量來決定的.
map的個數由inputSplit的個數決定的.因為inputSplit的大小默認和block的大小一樣的.
hadoop的一個特點就是高容錯性,JobTracker會監控各個節點的map任務和reduce任務的執行情況,如果有一個map任務宕了,會啟用一個重啟機制,會再重啟一個mapper任務去執行.如果連續宕個三次左右,就不會重啟了.那么這個程序的整個運行就失敗了.會有一定的容錯性在里邊的,這個容錯性是由JobTracker來進行控制的.
map處理其他節點的block,我們用戶是沒法控制的.
有datanode的節點 殺死Tasktracker,我們的程序在運行的時候只能使用其他節點的block了.我們的處理的原始數據,不允許被很多的map任務處理,只允許被一個處理,我們的數據是分配到多個dataNode上的,那么這一個map勢必要讀取其他節點的block.
MapReduce的執行過程:
1.map任務處理:
1.1 讀取hdfs文件為內容,把內容中的每一行解析成一個個的鍵(key)值(value)對.文件總是有行的,鍵是字節的偏移量,值是每一行的內容,每一個鍵值對調用一次map函數.map函數處理輸入的每一行.
1.2 自定義map函數,寫自己的邏輯,對輸入的key,value(把每一行解析出的key,value)處理,轉換成新的key,value輸出.
1.3 對輸出的key,value進行分區.根據業務要求,把map輸出的數據分成多個區..
1.4 對不同分區上的數據,按照key進行排序,分組.相同key的value放到一個集合中.
1.5 把分組后的數據進行歸約.
2.reduce任務處理:
shuffle:把我們map中的數據分發到reduce中去的一個過程,分組還是在map這邊的.
2.1 每個reduce會接收各個map中相同分區中的數據.對多個map任務的輸出,按照不同的分區通過網絡copy到不同reduce節點.shuffle實際指的就是這個過程.
2.2 對多個map任務的輸出進行合並,排序.寫reduce函數自己的邏輯,對輸入的key,value處理,轉換成新的key,value輸出.
2.3 把reduce的輸出保存到新的文件中.
TaskTracker節點上如果跑的是map任務,我們的map任務執行完之后,就會告訴我們的JobTracker執行完畢,把這個數據讓我們的reduce來讀取.讀取的時機是一個map執行完畢之后讓reduce去處理獲取數據.
JobTracker只做管理和通知,數據只在map和reduce之間流動,准確的說,只會在TaskTracker之間流動.
排序是框架內置的.默認就有.分組不是減少網絡開銷,分組不是合並,只是把相同的key的value放到一起,並不會減少數據.
分組是給了同一個map中相同key的value見面的機會.作用是為了在reduce中進行處理.
map函數僅能處理一行,兩行中出現的這個單詞是無法在一個map中處理的.map不能處理位於多行中的相同的單詞.分組是為了兩行中的相同的key的value合並到一起.
在自定義MyMapper類內部定義HashMap處理的是一個block,在map方法內部定義處理的是一行.
在hadoop全局中不會有線程問題,因為hadoop起的是進程,不會有並發問題存在.
為什么hadoop不使用線程?
線程實際指的是在集中式開發下,通過線程,可以讓我們的並發量,處理的吞吐量上升,線程會帶來一個數據競爭的問題.hadoop中MapReduce是通過分布式多進程來實現高吞吐量,在里邊不會通過線程來解決問題,因為它里邊已經有很多的服務器,很多的線程了,沒有必要使用線程.