Hadoop 系列(一)文件讀寫過程及MR過程


最近把自己學習到的知識捋一捋,發現現在除了spark和hive別的沒有能拿的出手的,雖然java也會但是只是限制於能寫東西。
想把知識體系好好補充一下,就開始hadoop系列的文章,好好的把hadoop從頭到尾學習一下。

一:文件IO流程

文件讀流程

    

    1.client打開DistributesFileSystem API(集群文件系統的API)  open方法

    2.調用API的get塊信息的方法(拿到所有的塊信息)

    3.打開FSDataInputStream API(讀取數據的API),一個塊三個副本(三台機器),(就近)找一個的機器去根據塊信息讀取對應的數據

    4.一個塊讀取完成之后,在元數據里面找到下一個塊最近的datanode

    5.將所有的塊拿過來之后,在客戶端進行拼接成一個完成的文件

    6.關閉鏈接(資源)

文件寫流程

 

    1.client打開DistributesFileSystem API(集群文件系統的API)  create方法

    2.傳入文件的相關信息(文件名稱,文件大小,文件擁有者)返回文件切成幾個塊,哪一個塊放在哪一個文件上。

    3.打開FSDataOutputStream API(寫取數據的API),將一個塊寫到一個機器上,這個機器在同步到其他機器上

    4.文件總體完成之后在告訴Namenode 寫文件成功

    5.關閉鏈接(資源)

MR 過程

 

    1.將每個文件存為不同的block,將block進行切分操作操作(影響map數)

    2.有可能有多個maptask線程並發執行,具體執行看代碼怎么去寫。(輸出和輸入必須是鍵值對的形式)

    3.將相同的數據shuffle到同一個節點里面去執行reduce。(reduce個數決定於map的輸出)

    4.將結果輸出到output

Shuffle 過程

    1.Input--map(read in memory )--partation(決定reduce個數)--sort--split to disk---fetch(將一個機器上的map合並成一個文件)

    2.fetch(key相同的數據合並成一個文件)--merge --reduce(+1操作)--輸出數據

 備注:內存緩存區約為100M,緩存區快滿的時候(split.percent 0.8約80M)需要有一個線程將數據刷到磁盤,這個過程叫溢寫。該線程會對這些數據作排序操作。

 c

    MR詳細流程(Shuffle過程在其中蘊涵)

    1.(map--split階段)將輸入文件進行切割操作,最大塊(64M)成為一個文件,大於的文件要切成兩個。決定map個數

    2.(map--map階段)將文件讀取進來,進行自定義的map操作

    3.(map--溢寫階段)讀文件進內存快滿了的時候,進行partation(決定reduce個數)、sort(可以執行Combiner map端聚合數值value成為2或者文件)、split to disk 。

    4.(map--merge階段)將map輸出的多個文件進行merge操作。(將value寫成一個數組)

    5.(reduce--讀取數據階段)將文件讀取進來,進行merge操作(value寫成一個數組)寫到臨時文件里面

    6.(reduce--reduce階段)臨時文件里面的數據進行自定義reduce操作

MR Helloworld

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] words = line.split(" ");
        //將單詞輸出為<單詞,1>
        for(String word:words){
            //相同的單詞分發給相同的reduce
            context.write(new Text(word),new IntWritable(1));
        }
    }
}
class WordcountReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        /*
         * key--一組相同單詞kv對的key
         * */
        int count =0;

        for(IntWritable value:values){
            count += value.get();
        }
        context.write(key,new IntWritable(count));
    }
}
public class WordcountDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "file:///");
        FileSystem fs= FileSystem.get(conf);
        String outputPath = "/software/java/data/output/";
        fs.delete(new Path(outputPath),true);

        Job job = Job.getInstance(conf);
        job.setJarByClass(WordcountDriver.class);
        job.setMapperClass(WordcountMapper.class);
        job.setReducerClass(WordcountReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);


        FileInputFormat.setInputPaths(job, new Path("/software/java/data/input/"));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        //將job配置的參數,以及job所用的java類所在的jar包提交給yarn去運行
        //job.submit();
        boolean res = job.waitForCompletion(true);
    }

}
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>org.example</groupId>
  <artifactId>BigData</artifactId>
  <version>1.0-SNAPSHOT</version>
  <inceptionYear>2008</inceptionYear>
  <properties>
    <scala.version>2.7.0</scala.version>
    <hadoop.version>2.7.7</hadoop.version>
  </properties>


  <dependencies>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-app</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-core</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-hs</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-examples</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
  </dependencies>

  <build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
  </build>
</project>

日志:在resource下面加上一個log4j.properties文件。這樣可以打印更多的日志

# Set root logger level to DEBUG and its only appender to A1.
log4j.rootLogger=DEBUG, A1

# A1 is set to be a ConsoleAppender.
log4j.appender.A1=org.apache.log4j.ConsoleAppender

# A1 uses PatternLayout.
log4j.appender.A1.layout=org.apache.log4j.PatternLayout
log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM