yarn是什么?
1、它是一個資源調度及提供作業運行的系統環境平台
資源:cpu、mem等
作業:map task、reduce Task
yarn產生背景?
它是從hadoop2.x版本才引入
1、hadoop1.x版本它是如何資源調度及作業運行機制原理
a、JobTracker(主節點)
(a):接受客戶端的作業提交
(b):交給任務調度器安排任務的執行
(c):通知空閑的TaskTracker去處理
(d): 與TaskTracker保持心跳機制
b、TaskTracker(從節點)
(a):執行map task和reduce task
(b): 與JobTracker保持心跳機制
缺點:
1、單點故障問題
2、負載壓力
3、只能運行mapreduce的程序
引入了yarn機制
1、減少負載壓力
2、主備機制
3、支持不同的程序運行
yarn整體的架構?
yarn主要的核心組件?
resourcemanager
作用:
(1)接受客戶端提交作業
(2)啟動一個app master去處理
資源分配
(3)監控nodemanager
nodemanager
作用:
(1)管理單個節點上的資源
(2)接受resourcemanager發送過來的指令
(3)接受app master發送過來的指令
(4) 啟動Container
app master
(1)運行作業的主控者
(2)獲取切片數據
(3)從resourcemanager審請運行作業資源
(4)監控作業運行的狀態
Container:
它其實就是一個虛擬主機的抽象,分配cpu和內存,主要運行作業
app master
Container
Client
yarn的工作機制(重點)
1、連接運行器平台
根據mapreduce.framework.name變量配置
如果等於yarn:則創建YARNRunner對象
如果等於Local:則創建LocalJobRunner對象
2、如果是yarn平台,對resoucemanager提交作業審請
3、resourcemanager返回一個jobid和數據保存目錄(hdfs://xxx/staging/xxx)
4、客戶端根據返回數據保存目錄路徑,將job.split、job.xml、jar文件提交到hdfs://xxx/staging/xxx目錄
5、提交數據資源之后,客戶端對resouremanager提交任務運行
6、resourcemanager將任務存儲任務隊列
7、resourcemanager發送命令nodemanager處理從任務取出的任務
8、nodemanager往resourcemanageer審請我要創建一個app master
a、在nodemanager創建一個container,再啟動app master
9、app master讀取數據切片處理方案
10、app master往resourcemanager審請運行資源
11、resourcemanager往空閑的nodemanager主機發送指令,要創建Container
12、app master往nodemanger發送運行指令,container運行任務。
如下圖:
是否可以直接從本地idea直接將程序運行到yarn平台?
以wordcount為例:
代碼如下:
package com.gec.demo; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; /* * 作用:體現mapreduce的map階段的實現 * KEYIN:輸入參數key的數據類型 * VALUEIN:輸入參數value的數據類型 * KEYOUT,輸出key的數據類型 * VALUEOUT:輸出value的數據類型 * * 輸入: * map(key,value)=偏移量,行內容 * * 輸出: * map(key,value)=單詞,1 * * 數據類型: * java數據類型: * int-------------->IntWritable * long------------->LongWritable * String----------->Text * 它都實現序列化處理 * * */ public class WcMapTask extends Mapper<LongWritable, Text,Text, IntWritable> { /* *根據拆分輸入數據的鍵值對,調用此方法,有多少個鍵,就觸發多少次map方法 * 參數一:輸入數據的鍵值:行的偏移量 * 參數二:輸入數據的鍵對應的value值:偏移量對應行內容 * */ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line=value.toString(); String words[]=line.split(" "); for (String word : words) { context.write(new Text(word),new IntWritable(1)); } } }
package com.gec.demo; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; /* * 此類:處理reducer階段 * 匯總單詞次數 * KEYIN:輸入數據key的數據類型 * VALUEIN:輸入數據value的數據類型 * KEYOUT:輸出數據key的數據類型 * VALUEOUT:輸出數據value的數據類型 * * * */ public class WcReduceTask extends Reducer<Text, IntWritable,Text,IntWritable> { /* * 第一個參數:單詞數據 * 第二個參數:集合數據類型匯總:單詞的次數 * * */ @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int count=0; for (IntWritable value : values) { count+=value.get(); } context.write(key,new IntWritable(count)); } }
package com.gec.demo; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class WcCombiner extends Reducer<Text, IntWritable,Text,IntWritable> { private IntWritable sum=new IntWritable(); @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int count=0; for (IntWritable value : values) { count+=value.get(); } sum.set(count); context.write(key,sum); } }
package com.gec.demo; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; /** * Hello world! * */ public class App { public static void main( String[] args ) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf=new Configuration(); // conf.set("fs.defaultFS","hdfs://hadoop-001:9000"); // conf.set("mapreduce.framework.name","yarn"); // conf.set("yarn.resourcemanager.hostname","hadoop-002"); conf.set("mapred.jar","D:\\JAVA\\projectsIDEA\\BigdataStudy\\mrwordcountbyyarn\\target\\wordcountbyyarn-1.0-SNAPSHOT.jar"); Job job=Job.getInstance(conf); //設置Driver類 job.setJarByClass(App.class); //設置運行那個map task job.setMapperClass(WcMapTask.class); //設置運行那個reducer task job.setReducerClass(WcReduceTask.class); job.setCombinerClass(WcCombiner.class); //設置map task的輸出key的數據類型 job.setMapOutputKeyClass(Text.class); //設置map task的輸出value的數據類型 job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //指定要處理的數據所在的位置 FileInputFormat.setInputPaths(job, "/wordcount/input/big.txt"); //指定處理完成之后的結果所保存的位置 FileOutputFormat.setOutputPath(job, new Path("/wordcount/output7")); //向yarn集群提交這個job boolean res = job.waitForCompletion(true); System.exit(res?0:1); } }
其中
是因為在resource文件夾中直接添加配置文件
配置文件分別如下:




注意:這里的配置文件要和虛擬機中的配置文件一樣,否則可能會出錯,最好的做法是從虛擬機中直接copy出來