Hadoop yarn工作流程詳解


yarn是什么?
1、它是一個資源調度及提供作業運行的系統環境平台
資源:cpu、mem等
作業:map task、reduce Task


yarn產生背景?
它是從hadoop2.x版本才引入
1、hadoop1.x版本它是如何資源調度及作業運行機制原理
a、JobTracker(主節點)
(a):接受客戶端的作業提交
(b):交給任務調度器安排任務的執行
(c):通知空閑的TaskTracker去處理
(d): 與TaskTracker保持心跳機制


b、TaskTracker(從節點)
(a):執行map task和reduce task
(b): 與JobTracker保持心跳機制

缺點:
1、單點故障問題
2、負載壓力
3、只能運行mapreduce的程序

引入了yarn機制
1、減少負載壓力
2、主備機制
3、支持不同的程序運行

yarn整體的架


yarn主要的核心組件?

 

 


resourcemanager


作用:
(1)接受客戶端提交作業
(2)啟動一個app master去處理
資源分配
(3)監控nodemanager


 

nodemanager


作用:
(1)管理單個節點上的資源
(2)接受resourcemanager發送過來的指令
(3)接受app master發送過來的指令
(4) 啟動Container


 

app master


(1)運行作業的主控者
(2)獲取切片數據
(3)從resourcemanager審請運行作業資源
(4)監控作業運行的狀態


 

Container


它其實就是一個虛擬主機的抽象,分配cpu和內存,主要運行作業

 

app master
Container
Client


 


yarn的工作機制(重點)
1、連接運行器平台
根據mapreduce.framework.name變量配置
如果等於yarn:則創建YARNRunner對象
如果等於Local:則創建LocalJobRunner對象

2、如果是yarn平台,對resoucemanager提交作業審請
3、resourcemanager返回一個jobid和數據保存目錄(hdfs://xxx/staging/xxx)
4、客戶端根據返回數據保存目錄路徑,將job.split、job.xml、jar文件提交到hdfs://xxx/staging/xxx目錄
5、提交數據資源之后,客戶端對resouremanager提交任務運行
6、resourcemanager將任務存儲任務隊列
7、resourcemanager發送命令nodemanager處理從任務取出的任務
8、nodemanager往resourcemanageer審請我要創建一個app master
a、在nodemanager創建一個container,再啟動app master
9、app master讀取數據切片處理方案
10、app master往resourcemanager審請運行資源
11、resourcemanager往空閑的nodemanager主機發送指令,要創建Container
12、app master往nodemanger發送運行指令,container運行任務。

如下圖:

 

 


是否可以直接從本地idea直接將程序運行到yarn平台?

以wordcount為例:

代碼如下:

package com.gec.demo;


import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/*
* 作用:體現mapreduce的map階段的實現
* KEYIN:輸入參數key的數據類型
* VALUEIN:輸入參數value的數據類型
* KEYOUT,輸出key的數據類型
* VALUEOUT:輸出value的數據類型
*
* 輸入:
*      map(key,value)=偏移量,行內容
*
* 輸出:
*      map(key,value)=單詞,1
*
* 數據類型:
* java數據類型:
* int-------------->IntWritable
* long------------->LongWritable
* String----------->Text
* 它都實現序列化處理
*
 * */
public class WcMapTask extends Mapper<LongWritable, Text,Text, IntWritable>
{
    /*
    *根據拆分輸入數據的鍵值對,調用此方法,有多少個鍵,就觸發多少次map方法
    * 參數一:輸入數據的鍵值:行的偏移量
    * 參數二:輸入數據的鍵對應的value值:偏移量對應行內容
    * */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        String line=value.toString();

        String words[]=line.split(" ");

        for (String word : words) {

            context.write(new Text(word),new IntWritable(1));
        }

    }
}
package com.gec.demo;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/*
* 此類:處理reducer階段
*   匯總單詞次數
* KEYIN:輸入數據key的數據類型
* VALUEIN:輸入數據value的數據類型
* KEYOUT:輸出數據key的數據類型
* VALUEOUT:輸出數據value的數據類型
*
*
* */
public class WcReduceTask extends Reducer<Text, IntWritable,Text,IntWritable>
{
    /*
    * 第一個參數:單詞數據
    * 第二個參數:集合數據類型匯總:單詞的次數
    *
    * */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

        int count=0;

        for (IntWritable value : values) {

            count+=value.get();
        }

        context.write(key,new IntWritable(count));

    }
}
package com.gec.demo;


import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WcCombiner extends Reducer<Text, IntWritable,Text,IntWritable> {
    private IntWritable sum=new IntWritable();
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int count=0;

        for (IntWritable value : values) {
            count+=value.get();
        }
        sum.set(count);
        context.write(key,sum);
    }
}
package com.gec.demo;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Hello world!
 *
 */
public class App 
{
    public static void main( String[] args ) throws IOException, ClassNotFoundException, InterruptedException {

        Configuration conf=new Configuration();
//        conf.set("fs.defaultFS","hdfs://hadoop-001:9000");
//        conf.set("mapreduce.framework.name","yarn");
//        conf.set("yarn.resourcemanager.hostname","hadoop-002");
        conf.set("mapred.jar","D:\\JAVA\\projectsIDEA\\BigdataStudy\\mrwordcountbyyarn\\target\\wordcountbyyarn-1.0-SNAPSHOT.jar");
        Job job=Job.getInstance(conf);
        //設置Driver類
        job.setJarByClass(App.class);

        //設置運行那個map task
        job.setMapperClass(WcMapTask.class);
        //設置運行那個reducer task
        job.setReducerClass(WcReduceTask.class);
        job.setCombinerClass(WcCombiner.class);

        //設置map task的輸出key的數據類型
        job.setMapOutputKeyClass(Text.class);
        //設置map task的輸出value的數據類型
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //指定要處理的數據所在的位置
        FileInputFormat.setInputPaths(job, "/wordcount/input/big.txt");
        //指定處理完成之后的結果所保存的位置
        FileOutputFormat.setOutputPath(job, new Path("/wordcount/output7"));
        //向yarn集群提交這個job
        boolean res = job.waitForCompletion(true);
        System.exit(res?0:1);

    }
}

 

其中

 

是因為在resource文件夾中直接添加配置文件

配置文件分別如下:

 

core-site.xml

 

hdfs-site.xml
mapred-site.xml
yarn-site.xml

 

 

 

 注意:這里的配置文件要和虛擬機中的配置文件一樣,否則可能會出錯,最好的做法是從虛擬機中直接copy出來

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM