想把知識體系好好補充一下,就開始hadoop系列的文章,好好的把hadoop從頭到尾學習一下。
一:文件IO流程
文件讀流程
1.client打開DistributesFileSystem API(集群文件系統的API) open方法
2.調用API的get塊信息的方法(拿到所有的塊信息)
3.打開FSDataInputStream API(讀取數據的API),一個塊三個副本(三台機器),(就近)找一個的機器去根據塊信息讀取對應的數據
4.一個塊讀取完成之后,在元數據里面找到下一個塊最近的datanode
5.將所有的塊拿過來之后,在客戶端進行拼接成一個完成的文件
6.關閉鏈接(資源)
文件寫流程
1.client打開DistributesFileSystem API(集群文件系統的API) create方法
2.傳入文件的相關信息(文件名稱,文件大小,文件擁有者)返回文件切成幾個塊,哪一個塊放在哪一個文件上。
3.打開FSDataOutputStream API(寫取數據的API),將一個塊寫到一個機器上,這個機器在同步到其他機器上
4.文件總體完成之后在告訴Namenode 寫文件成功
5.關閉鏈接(資源)
MR 過程
1.將每個文件存為不同的block,將block進行切分操作操作(影響map數)
2.有可能有多個maptask線程並發執行,具體執行看代碼怎么去寫。(輸出和輸入必須是鍵值對的形式)
3.將相同的數據shuffle到同一個節點里面去執行reduce。(reduce個數決定於map的輸出)
4.將結果輸出到output
Shuffle 過程
1.Input--map(read in memory )--partation(決定reduce個數)--sort--split to disk---fetch(將一個機器上的map合並成一個文件)
2.fetch(key相同的數據合並成一個文件)--merge --reduce(+1操作)--輸出數據
備注:內存緩存區約為100M,緩存區快滿的時候(split.percent 0.8約80M)需要有一個線程將數據刷到磁盤,這個過程叫溢寫。該線程會對這些數據作排序操作。
c
MR詳細流程(Shuffle過程在其中蘊涵)
1.(map--split階段)將輸入文件進行切割操作,最大塊(64M)成為一個文件,大於的文件要切成兩個。決定map個數
2.(map--map階段)將文件讀取進來,進行自定義的map操作
3.(map--溢寫階段)讀文件進內存快滿了的時候,進行partation(決定reduce個數)、sort(可以執行Combiner map端聚合數值value成為2或者文件)、split to disk 。
4.(map--merge階段)將map輸出的多個文件進行merge操作。(將value寫成一個數組)
5.(reduce--讀取數據階段)將文件讀取進來,進行merge操作(value寫成一個數組)寫到臨時文件里面
6.(reduce--reduce階段)臨時文件里面的數據進行自定義reduce操作
MR Helloworld
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] words = line.split(" "); //將單詞輸出為<單詞,1> for(String word:words){ //相同的單詞分發給相同的reduce context.write(new Text(word),new IntWritable(1)); } } } class WordcountReducer extends Reducer<Text,IntWritable,Text,IntWritable> { @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { /* * key--一組相同單詞kv對的key * */ int count =0; for(IntWritable value:values){ count += value.get(); } context.write(key,new IntWritable(count)); } } public class WordcountDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); conf.set("fs.defaultFS", "file:///"); FileSystem fs= FileSystem.get(conf); String outputPath = "/software/java/data/output/"; fs.delete(new Path(outputPath),true); Job job = Job.getInstance(conf); job.setJarByClass(WordcountDriver.class); job.setMapperClass(WordcountMapper.class); job.setReducerClass(WordcountReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.setInputPaths(job, new Path("/software/java/data/input/")); FileOutputFormat.setOutputPath(job, new Path(outputPath)); //將job配置的參數,以及job所用的java類所在的jar包提交給yarn去運行 //job.submit(); boolean res = job.waitForCompletion(true); } }
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>org.example</groupId> <artifactId>BigData</artifactId> <version>1.0-SNAPSHOT</version> <inceptionYear>2008</inceptionYear> <properties> <scala.version>2.7.0</scala.version> <hadoop.version>2.7.7</hadoop.version> </properties> <dependencies> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>${hadoop.version}</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>${hadoop.version}</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-app</artifactId> <version>${hadoop.version}</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-core</artifactId> <version>${hadoop.version}</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-hs</artifactId> <version>${hadoop.version}</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-examples</artifactId> <version>${hadoop.version}</version> </dependency> </dependencies> <build> <sourceDirectory>src/main/scala</sourceDirectory> <testSourceDirectory>src/test/scala</testSourceDirectory> </build> </project>
日志:在resource下面加上一個log4j.properties文件。這樣可以打印更多的日志
# Set root logger level to DEBUG and its only appender to A1. log4j.rootLogger=DEBUG, A1 # A1 is set to be a ConsoleAppender. log4j.appender.A1=org.apache.log4j.ConsoleAppender # A1 uses PatternLayout. log4j.appender.A1.layout=org.apache.log4j.PatternLayout log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n