主要內容:mapreduce整體工作機制介紹;wordcont的編寫(map邏輯 和 reduce邏輯)與提交集群運行;調度平台yarn的快速理解以及yarn集群的安裝與啟動。
1、mapreduce整體工作機制介紹
回顧第HDFS第一天單詞統計實例(HDFS版wordcount):
統計HDFS的/wordcount/input/a.txt文件中的每個單詞出現的次數——wordcount
但是,進一步思考:如果文件又多又大,用上面那個程序有什么弊端?
慢!因為只有一台機器在進行運算處理
從這個簡單的案例中我們收到一些啟發:
1、可以在任何地方運行程序,訪問HDFS上的文件並進行統計運算,並且可以把統計的結果寫回HDFS的結果文件中;
2、如何變得更快?核心思想:讓我們的運算程序並行在多台機器上執行!
3、面向接口編程,整個程序的主框架是通用的,使用業務接口編程,主流程形成通用的框架,寫好之后不需要修改,我們只要按照業務接口,提供具體的業務實現類,即可完成具體的業務操作。
幸運的是,hadoop中已經為我們提供了分布式計算的解決方案,就是mapreduce計算框架,用來在分布式環境下處理數據。
下圖是mapreduce整體工作機制的簡要介紹,后續還會有詳細的介紹。
mapreduce與我們之前自己寫個hdfs版本的wordcount一樣,都是運算程序,而且可以在分布式環境下,並行運行,他的主處理流程同我們的wordcount主處理流程一樣都是成形的計算框架設定好之后就不會改變了,框架中用到的統一的業務接口和業務方法(業務接口數據處理邏輯)需要我們實現並提供給框架。框架按照接口規定方法中設定好的參數要求,想特定類型和形式的數據傳給用戶提供的實現類,用戶接受框架傳入的參數,進行業務處理並將處理結果寫入接口中規定的數據緩存對象中。
mapreduce程序分倆個階段,整個運算流程分為map階段和reduce階段,分別有mapTask和reduceTask程序來實現。mapTask和reduceTask的大致工作機制如下。
mapTask可以在很多機器上運行,具體運行多少個mapTask要看要處理的數據總量有多少,這個過程由程序自動計算,無需我們擔心,計算好之后每個mapTask都會分到自己要處理的數據范圍,術語叫做數據切片;一般來講是這么計算的,賠了保證每個task處理的數據大體差不多,程序會將hdfs中的待處理的文件進行切片划分,默認一個切片(一個maptask要處理的數據范圍)128M大小。假如要處理的文件有:a.txt(200m)b.txt(500m)c.txt(100m)d.txt(120m)如果一個task程序負責一個文件,顯然是不公平不合理的,其實hadoop會按照128m大小為一個單位,對數據進行切片操作:a->2,b->4,c->1,d->1,總數據一共被切分成8個切片或者說8個任務,一個mapTask就處理一個切片或者說任務,一共需要8個maptask,那么就分配8個mapTask,這樣每個mapTask就明確了自己的任務(所有task的處理邏輯都一樣,都是上面提到的用戶提供的業務實現類,只不過是處理的數據范圍不同)和要處理的數據范圍,接下來就是啟動一批mapTask進行作業,當然如果文件很多很大,會需要很多的mapTask,至於一次啟動多少個task以及一台物理機器會運行多少個mapTask,這和你的集群規模以及運行配置有關。mapTask就是一個程序,一台機子上可以啟動過個mapTask,如果你你的集群只有兩台機器負責mapTask運算,理論上每台機器會分啟動4個maptask任務,但是若果機器性能有限,一次最多只能負載3個mapTask,也沒關系,只不過是先運行一批mapTask(3+3=6個)每個task都有自己的任務只執行自己分陪到的任務,運行結束后在啟動剩余的2個mapTask。所以不用擔心機器不夠用,既然任務分的很明確,可以每次運行一批mapTask,分批完成全部的。
maptask啟動后會干什么,這個過程已經在mapreduce中寫死了,每個mapTask會分到部分數據,然后一行一行的去讀數據,每讀一行數據,進行一次處理,具體的處理邏輯有用戶提供的接口實現類來完成(需要用戶提供具體的業務實現類,並且以某種方式通知mapreduce框架去調用哪一個實現類,可以通過配置文件或者參數的形式;mapTask將讀到的數據作為參數傳給業務方法,業務方法將處理的結果傳給mapTask)。
那么這樣還有一個問題,每個mapTask的處理數據范圍和結果都只是整個數據的一個局部,並非全局結果,如何得到全局結果,這就需要mapreduce的第二個階段,reduce階段進行局部數據匯總統計。
reduce階段有reduceTask程序來實現,可以在很多他機器上並行運行。reduceTask數量與mapTask數量沒有關系,reduceTask要整理mapTask產生的數據,就需要統一大家的數據形式,這里統一為key :value鍵值對的形式。mapTask產生的key:value需要傳遞給reduceTask,而且核心思想是同時要確保,相同key的數據必須傳遞給同一個reduceTask,這就需要mapTask和reduceTask之間的數據分發機制,shuffle機制:可以相同的key:value數據一定發給同一個reduceTask程序。
reduce Task 聚合操作具體做什么,聚合操作就是對key相同的一組數據進行處理,具體的聚合邏輯通過接口的方式暴露給用戶,由用戶來指定(同mapTask方式)。
reduce Task處理結果,reduce Task將最后的聚合結果寫入hdfs中,每個reduce Task最終形成一個文件,文件名稱默認是part-r+reduceTask的編號。
總結:
map階段,我們只需要提供具體的業務類,對mapTask讀到的一行數據進行處理
reduce階段,仍然需要我們提供具體的邏輯,對reduce拿到的一組相同key的kv數據進行處理
處理結果的傳遞:無論是map階段還是recude階段,數據處理結果的后續流程無需我們關系,我們只需要將各個階段的數據都交給人家提供好的context對象就好;map階段會將數據存着,將來想方設法地將數據結果傳遞給reduceTask,而且保證同一個key只給同一個reduce,reduce階段會將數據寫入hdfs,只要有一個結果key:value,就會往文件中追加一行。
2、wordcount示例
maptask每次度一行數據都會將數據作為參數傳遞給我們提給的業務接口實現類中的map方法(map(long l,String v ,context)map方法中的參數分別為,該行行首地址的偏移量,該行的數據,緩存對象),在wordcont程序中,map每次拿到maptask傳遞過來一行數據,首先將文本數據切分,形成單詞數據,直接將(word,1)形式的數據寫入context中,以便將來給reduce(context怎么緩存,后續會介紹),為什么里不做統計呢,將單行相同的單詞統計一下?單行數據統計之后,任然是單行的結果,最后還得在recue中統計,避免無意義的中將統計,我們最終只在reduce中進行統計。maptask通過shuffle機制將(word,1)形式的數據發給,reduce同時保證相同的key只發往同一個reduce,這些發過來的相同key的一組數據在reduce這邊落地成文件;文件中的參數如何給reduceTask的處理邏輯中共的reduce方法(reduce(k,value迭代器,context)參數分別是:一組數據的key,改組數據中的key都相同,任意一個都可以;value迭代器,可以不斷的取出下一個值,context對象)。每一組(相同key)數據調用一次reduce邏輯。
2.1、 wordcount程序整體運行流程示意圖
map階段: 將每一行文本數據變成<單詞,1>這樣的kv數據
reduce階段:將相同單詞的一組kv數據進行聚合:累加所有的v
注意點:mapreduce程序中,
map階段的進、出數據,
reduce階段的進、出數據,
類型都應該是實現了HADOOP序列化框架的類型,如:
String對應Text
Integer對應IntWritable
Long對應LongWritable
2.2、 編碼實現
WordcountMapper類開發

import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; /** * KEYIN :是map task讀取到的數據的key的類型,是一行的起始偏移量Long * VALUEIN:是map task讀取到的數據的value的類型,是一行的內容String * * KEYOUT:是用戶的自定義map方法要返回的結果kv數據的key的類型,在wordcount邏輯中,我們需要返回的是單詞String * VALUEOUT:是用戶的自定義map方法要返回的結果kv數據的value的類型,在wordcount邏輯中,我們需要返回的是整數Integer * * * 但是,在mapreduce中,map產生的數據需要傳輸給reduce,需要進行序列化和反序列化,而jdk中的原生序列化機制產生的數據量比較冗余,就會導致數據在mapreduce運行過程中傳輸效率低下 * 所以,hadoop專門設計了自己的序列化機制,那么,mapreduce中傳輸的數據類型就必須實現hadoop自己的序列化接口 * * hadoop為jdk中的常用基本類型Long String Integer Float等數據類型封住了自己的實現了hadoop序列化接口的類型:LongWritable,Text,IntWritable,FloatWritable * * * * * @author ThinkPad * */ public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 切單詞 String line = value.toString(); String[] words = line.split(" "); for(String word:words){ context.write(new Text(word), new IntWritable(1)); } } }
WordcountReducer類開發

import java.io.IOException; import java.util.Iterator; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{ @Override protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException { int count = 0; Iterator<IntWritable> iterator = values.iterator(); while(iterator.hasNext()){ IntWritable value = iterator.next(); count += value.get(); } context.write(key, new IntWritable(count)); } }
到這里map和reduce邏輯都寫完了,接下來就是
1)設置參數告訴mapTask調哪個類,reduceTask調用哪個類 ;
2)然后將代碼提交到集群去運行。
目前為止我們的工程中既有mapTask又有recuceTask,還有我們自己實現的邏輯類,是一個完整的工程,現在想要運行該程序,可不想單機版的程序那樣直接run main函數那么簡單,因為這是一個分布式的程序,它的運行需要依托一個平台,也就是說將來提交工程到集群中去運行的時候,哪些機子啟動mapTask,哪些機子啟動reduceTask等,啟動過程不是一個簡單的事情,而是一個復雜的調度過程,需要一套完整的調度系統或者說平台來進行管理,而hadoop中已經為我們提供了這樣的一個平台(yarn,也是一個集群,是一個分布式系統,同樣有很多服務程序)來完成上述工作。所以我們還需要在集群中安裝該平台,在安裝完之后,還需要寫代碼與該平台交互,將我們的工程jar包和配置參數告知平台,讓平台幫我們運行程序。
2.3、運行mapreduce程序
1、首先,為你的mapreduce程序開發一個提交job到yarn的客戶端類(模板代碼):
描述你的mapreduce程序運行時所需要的一些信息(比如用哪個mapper、reducer、map和reduce輸出的kv類型、jar包所在路徑、reduce task的數量、輸入輸出數據的路徑)
將信息和整個工程的jar包一起交給yarn
2、然后,將整個工程(yarn客戶端類+ mapreduce所有jar和自定義類)打成jar包
3、然后,將jar包上傳到hadoop集群中的任意一台機器上
4、最后,運行jar包中的(YARN客戶端類)
[root@hdp-04 ~]# hadoop jar wc.jar cn.edu360.hadoop.mr.wc.JobSubmitter
JobSubmitter客戶端類開發
細節,操作hdfs的用戶有權限要求,整個工程要打成jar包,因為job會向yarn集群上傳jar包;windows下提交job會有兼容性問題。

import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** * 用於提交mapreduce job的客戶端程序 * 功能: * 1、封裝本次job運行時所需要的必要參數 * 2、跟yarn進行交互,將mapreduce程序成功的啟動、運行 * @author ThinkPad * */ public class JobSubmitter { public static void main(String[] args) throws Exception { // 在代碼中設置JVM系統參數,用於給job對象來獲取訪問HDFS的用戶身份 // 或者通過eclipse圖形化界面來設置 -DHADOOP_USER_NAME=root System.setProperty("HADOOP_USER_NAME", "root") ; Configuration conf = new Configuration(); // 1、設置job運行時要訪問的默認文件系統, map階段要去讀數據,reduce階段要寫數據 conf.set("fs.defaultFS", "hdfs://hdp-01:9000"); // 2、設置job提交到哪去運行:有本地模擬的方式local conf.set("mapreduce.framework.name", "yarn"); conf.set("yarn.resourcemanager.hostname", "hdp-01"); // 3、如果要從windows系統上運行這個job提交客戶端程序,則需要加這個跨平台提交的參數 conf.set("mapreduce.app-submission.cross-platform","true"); // job中還要封裝個多的參數 Job job = Job.getInstance(conf); // 1、封裝參數:jar包所在的位置:因為job客戶端將來要把jar包(整個工程)發給yarn //job.setJar("d:/wc.jar"); job.setJarByClass(JobSubmitter.class);//動態獲取方式 // 2、封裝參數: 本次job所要調用的Mapper實現類、Reducer實現類 job.setMapperClass(WordcountMapper.class); job.setReducerClass(WordcountReducer.class); // 3、封裝參數:本次job的Mapper實現類、Reducer實現類產生的結果數據的key、value類型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); Path output = new Path("/wordcount/output"); FileSystem fs = FileSystem.get(new URI("hdfs://hdp-01:9000"),conf,"root"); if(fs.exists(output)){ fs.delete(output, true); } // 4、封裝參數:本次job要處理的輸入數據集所在路徑、最終結果的輸出路徑 FileInputFormat.setInputPaths(job, new Path("/wordcount/input")); FileOutputFormat.setOutputPath(job, output); // 注意:輸出路徑必須不存在 // 5、封裝參數:想要啟動的reduce task的數量(默認1),map task不需要設定,會根據數據集的大小自動切片計算。 job.setNumReduceTasks(2); // 6、提交job給yarn,等待集群完成,這是一個阻塞式方法 // 返回true表示mapreduce程序正常運行,false表示mapreduce程序運行失敗,可能是中間的某一步。 boolean res = job.waitForCompletion(true);//true便是吧Resource manager(會不斷的反饋信息)反饋回來的信息輸出。 //job.submit()//提交之后直接退出 //控制退出碼 System.exit(res?0:-1); } }
3、yarn快速理解
mapreduce程序應該是在很多機器上並行啟動,而且先執行map task,當眾多的maptask都處理完自己的數據后,還需要啟動眾多的reduce task,這個過程如果用用戶自己手動調度不太現實,需要一個自動化的調度平台——hadoop中就為運行mapreduce之類的分布式運算程序開發了一個自動化調度平台——YARN(yarn,是一個集群,是一個分布式系統,同樣有很多服務程序、包括:mapTask,reduceTask,mrappmaster)。
3.1、yarn的基本概念
yarn是一個分布式程序的運行調度平台,有很多服務程序,會運行在不同的機器上。
yarn中有兩大核心角色:主要起作用的是Resource Manager
1、Resource Manager
接受用戶(job客戶端)提交的分布式計算程序,並為其划分資源
管理、監控各個Node Manager上的資源情況,以便於均衡負載
2、Node Manager
管理它所在機器的運算資源(cpu + 內存)
負責接受Resource Manager分配的任務,創建容器(一個容器默認1G內存大小)、接收jar包,啟動程序、回收資源
比如客戶端提交了一個任務,並通過配置信息,指明需要12個容器運行,Resource Manager接收到客戶端請求,將12個容器的計算任務安排給node manager,然后客戶端去找對應的node manager進行交互,使用容器進行計算。
3.2、安裝yarn集群
yarn集群中有兩個角色:
主節點:Resource Manager 1台
從節點:Node Manager N台
node manager在物理上應該跟data node部署在一起,即與HDFS中的data node重疊在一起
resource manager在物理上應該獨立部署在一台專門的機器上,一般安裝在一台專門的機器上
3.2.1、修改配置文件
yarn-site.xml
<property> <name>yarn.resourcemanager.hostname</name> <value>hdp-04</value> </property> <property> <name>yarn.nodemanager.aux-services</name> <value>mapreduce_shuffle</value> </property>
<!-- 一個node manager內存大小 --> <property> <name>yarn.nodemanager.resource.memory-mb</name> <value>2048</value> </property>
<!-- node manager 的cpu核數 -->
<property>
<name>
yarn.nodemanager.resource.cpu-vcores
</name>
<value>2</value>
</property>
其他的配置可以參考官網
一個運行mapTask reduceTask的容器,默認至少1G內存大小
mrappmaster至少需要1.5G,這個程序 在整個mapreduce程序中之啟動一個實例,它是所有mapTask和reduceTask的主管,且要先與task啟動,運行在某一台nodemanager機器提供的容器上(容器就是運算資源的抽象)。
3.2.2、復制到每一台機器上
3.2.3、啟動yarn集群
逐一啟動
批量啟動
在hdp-04上,修改hadoop的slaves(可以與hdfs共用)文件,列入要啟動nodemanager的機器
然后將hdp-04到所有機器的免密登陸配置好
然后,就可以用腳本啟動yarn集群:
該自動化腳本與hdfs自動化腳本不同,后者可以在任何的機器上執行,均能正常啟動hdfs,而前這只能在resouce manager機器上,因為該腳本啟動resouce manager是不會去看配置文件(雖然有),在哪里敲在哪里啟動,而啟動node manager時會看slaves文件。
sbin/start-yarn.sh #停止: sbin/stop-yarn.sh
3.2.4、檢查yarn的進程
用jps檢查yarn的進程,用web瀏覽器查看yarn的web控制台
啟動完成后,可以在windows上用瀏覽器訪問resourcemanager的web端口8088:
看resource mananger是否認出了所有的node manager節點
上圖中的紅框表示的內存數其實是有參數來配置的,默認值(一台nodemanager是8G內存,而且是8核),與實際值嚴重不同。cup核數是一個虛擬值,假設node manager對應的物理機器的可用內存是2G,核數1,而我們在通過配置參數指定了最大的內存為2G,核數為2核,其實將物理機器的1核算力平均分成了兩份,作為node manager的2核。