Hadoop閱讀筆記(一)——強大的MapReduce


 

前言:來園子已經有8個月了,當初入園憑着滿腔熱血和一腦門子沖動,給自己起了個響亮的旗號“大數據 小世界”,頓時有了種世界都是我的,世界都在我手中的趕腳。可是......時光飛逝,歲月如梭~~~隨手一翻自己的博客,可視化已經快占據了半壁江山,思來想去,還是覺得把一直掛在嘴頭,放在心頭的大數據拿出來說說,哦不,是拿過來學學。入園前期寫了有關NutchSolr的自己的一些閱讀體會和一些嘗試,掛着大數據的旗號做着爬蟲的買賣。可是,時間在流失,對於大數據的憧憬從未改變,尤其是Hadoop一直讓我魂牽夢繞,打今兒起,開始着手自己的大數據系列,把別人擠牙膏的時間用在學習上,收拾好時間,收拾好資料,收拾好自己,重返Hadoop。

以下是對於大數據學習的一種預期規划:
主要理論指導材料:Hadoop實戰2
主要手段:敲代碼、結合API理解
預期目標:深入了解Hadoop,能為我所用

 

正文:記得去年還在學校寫小論文的時候,我花了一天的時間,懵懵懂懂的把Hadoop的環境給打起來了,今年出來接觸社會,由於各種原因,自己又搭了幾次偽分布式的環境,每次想學習Hadoop的心態好比每次背單詞,只要一背單詞,總是又從“abandon”開始背起。所以環境這塊就不多說了,網上這樣的帖子早已爛大街(因為Hadoop版本更新很快,目前應該是到2.6版本了,所以博文肯定一直在推陳出新)。用的Ubuntu12.0系統,因為之前一直弄的是0.20.2版本,后來也沒想着換,換也來不及了,0.20.2老朋友,可靠,還選他。倒是現在的Hadoop整個框架已經有所改變,HDFS還在,只是從0.23.0以后就不見了MapReduce的蹤跡,現在好像是重新洗牌編程了YARN,小弟懂得不多,大牛莫怪。。。

  今天是大數據系列第一槍,主要內容分為三個部分:1.先用理論知識壓壓場;2.通過Hadoop的HelloWorld程序講解下自己的疑惑與理解;3.借助專利數據集動手寫個簡單的MapReduce程序。

  1.首先來說說整個Hadoop大家族,然后粗略的了解下HDFS以及MapReduce。

  1.1.hadoop的子項目構成以及相應的配套服務圖:

  

  (1)Core:一系列分布式文件系統和通用I/O的組件和接口(序列化、Java RPC和持久化數據結構)
  (2)Avro:一種提供高效、跨語言RPC的數據序列系統,持久化數據存儲。Avro是Hadoop的一個子項目,由Hadoop的 創始人Doug Cutting(也是Lucene,Nutch等項目的創始人)牽頭開發。
  Avro是一個數據序列化系統,設計用於支持大 批量數據交換的應用。它的主要特點有:支持二進制序列化方式,可以便捷,快速地處理大量數據;動態語言友好,Avro提供的機制使動態語言可以方便地處理 Avro數據。
  (3)MapReduce:分布式數據處理模式和執行環境。
  (4)HDFS:分布式文件系統。
  (5)Pig:一種數據流語言和運行環境,用以檢索非常大的數據集。Pig運行在MapReduce和HDFS的集群上,是對大型數據集進行分析、評估的平台。
Pig是一種編程語言,它簡化了Hadoop常見的工作任務。Pig可加載數據、表達轉換數據以及存儲最終結果。Pig內置的操作使得半結構化數據變得有意義(如日志文件)。同時Pig可擴展使用Java中添加的自定義數據類型並支持數據轉換。
  (6)HBase:一個分布式的、列存儲數據庫。HBase使用HDFS作為底層存儲,同時支持MapReduce的批量式計算和點查詢(隨機讀取)。
  (7)ZooKeeper:一個分布式的,高可用性的協調服務。ZooKeeper提供分布式鎖之類的基本服務用於構建分布式應用。
  (8)Hive:分布式數據倉庫。Hive管理與HDFS總存儲的數據,並提供基於SQL的查詢語言(由運行時引擎翻譯成MapReduce作業)用以查詢數據。
Hive在Hadoop中扮演數據倉庫的角色。Hive添加數據的結構在HDFS(hive superimposes structure on data in HDFS),並允許使用類似於SQL語法進行數據查詢。與Pig一樣,Hive的核心功能是可擴展的。
  (9)Chukwa:分布式數據收集和分析系統。Chukwa運行HDFS中存儲數據的收集器,它使用MapReduce來生成報告。


  1.2HDFS采用了主從(Master/Slave)結構模型,一個HDFS集群是由一個NameNode和若干個DataNode組成的。其中NameNode作為主服務器,管理文件系統的命名空間和客戶端對文件的訪問操作;集群中的DataNode管理存儲的數據。NameNode執行文件系統的命名操作,比如打開、關閉、重命名文件或目錄等,它也負責數據塊到具體DataNode的映射。DataNode負責處理文件系統客戶端的文件讀寫請求,並在NameNode的同意調度下進行數據塊的創建、刪除和復制工作。

  MapReduce框架是由一個單獨運行在主節點的JobTracker和運行在每個集群從節點的TaskTracker共同組成的。主節點負責調度構成一個作業的所有任務,這些任務分布在不同的從節點上。主節點監控它們的執行情況,並且重新執行之前失敗的任務;從節點僅負責由主節點指派的任務。

  Hadoop的MapReduce模型是通過輸入key/value對進行運算得到輸出key/value對。其分為Map過程和Reduce過程。
  Map主要的工作是接收一個key/value對,產生一個中間key/value對,之后MapReduce把集合中所有相同key值的value放在一起並傳遞給Reduce函數。
  Reduce函數接收key和相關的value集合並合並這些value值,得到一個較小的value集合。
  下圖是MapReduce的數據流圖,體現了MapReduce處理大數據集的過程。這個過程就是將大數據分解為成百上千個小數據集,每個(或若干個)數據集分別由集群中的一個節點進行處理並生成的中間結果,然后這些中間結果又由大量的節點合並,形成最終結果。

  

 

  2.以小見大——從Hadoop的HelloWorld說起

  每種語言、框架都有屬於自己的“HelloWorld”,Hadoop也不例外,在下載的Hadoop包中就有example文件夾,里面提供了10+個例子,首當其沖,各種博文中出現頻率最高的當屬WordCount無疑,其算法思想簡單,又能結合大數據集做實驗,能夠很好的體現Hadoop為何物、有何用、如何用的特征。如果能把一個WordCount的每個細節都弄清楚了,基本上也算是掌握了Hadoop的大部分了,下面就來看看WordCount的原生代碼:

 1 package org.apache.hadoop.examples;
 2 
 3 import java.io.IOException;
 4 import java.util.StringTokenizer;
 5 
 6 import org.apache.hadoop.conf.Configuration;
 7 import org.apache.hadoop.fs.Path;
 8 import org.apache.hadoop.io.IntWritable;
 9 import org.apache.hadoop.io.Text;
10 import org.apache.hadoop.mapreduce.Job;
11 import org.apache.hadoop.mapreduce.Mapper;
12 import org.apache.hadoop.mapreduce.Reducer;
13 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
14 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
15 import org.apache.hadoop.util.GenericOptionsParser;
16 
17 public class WordCount {
18 
19   public static class TokenizerMapper 
20        extends Mapper<Object, Text, Text, IntWritable>{
21     
22     private final static IntWritable one = new IntWritable(1);
23     private Text word = new Text();
24       
25     public void map(Object key, Text value, Context context
26                     ) throws IOException, InterruptedException {
27       StringTokenizer itr = new StringTokenizer(value.toString());
28       while (itr.hasMoreTokens()) {
29         word.set(itr.nextToken());
30         context.write(word, one);
31       }
32     }
33   }
34   
35   public static class IntSumReducer 
36        extends Reducer<Text,IntWritable,Text,IntWritable> {
37     private IntWritable result = new IntWritable();
38 
39     public void reduce(Text key, Iterable<IntWritable> values, 
40                        Context context
41                        ) throws IOException, InterruptedException {
42       int sum = 0;
43       for (IntWritable val : values) {
44         sum += val.get();
45       }
46       result.set(sum);
47       context.write(key, result);
48     }
49   }
50 
51   public static void main(String[] args) throws Exception {
52     Configuration conf = new Configuration();
53     String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
54     if (otherArgs.length != 2) {
55       System.err.println("Usage: wordcount <in> <out>");
56       System.exit(2);
57     }
58     Job job = new Job(conf, "word count");
59     job.setJarByClass(WordCount.class);
60     job.setMapperClass(TokenizerMapper.class);
61     job.setCombinerClass(IntSumReducer.class);
62     job.setReducerClass(IntSumReducer.class);
63     job.setOutputKeyClass(Text.class);
64     job.setOutputValueClass(IntWritable.class);
65     job.setInputFormatClass(TextInputFormat.class);
66     job.setInputFormatClass(TextInputFormat.class);
67     FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
68     FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
69     System.exit(job.waitForCompletion(true) ? 0 : 1);
70   }
71 }

  先且不談整個WordCount如何借助HDFS分布式文件系統進行數據存取操作,這里我們看看WordCount的MapReducer如何處理。

  2.1何為MapReduce:

  MapReduce顧名思義,由Map和Reduce兩部分組成,通俗點說,Map用於將數據集分拆到集群中節點運行,而Reduce負責整合聚合最終結果輸出。那Hadoop為什么要廢如此周折又是分又是合,直接通過傳統的手段完成自己的代碼邏輯不是更簡單?是的,沒錯,對於一般操作,傳統手段也能辦到,設置更加簡潔,但是這里討論的背景是大數據,而Hadoop就是應這個背景而出現的。舉個不恰當的例子,原始社會,大家思想很簡單,生活也很簡單,上山砍點柴,自個兒就能背回家取暖,這樣基本能滿足自己的需求了,不會有任何問題。某天有個腦洞大開的先人想到如果能夠用木頭造個房子,大家的生活質量必定有所改善,不怕風吹雨淋了。那么問題來了,對樹木的需求量變大了,憑某個人的力量恐怕很難辦到,所以,他們也弄了個集群,找了很多人,每個人負責背點柴(運行任務),大家團隊協作,共同完成這個在個人面前龐大到難以完成的任務,此外,這個集群還可以隨機添加個體(節點),活量很多的時候老婆、孩子都去幫忙,稍微閑點的時候讓他們回家,不參與集群活動。

  所以說,面對當前日益增長的數據量,傳統單個pc或是服務器已經無法支撐或是成本很高,而Hadoop利用了看似繁雜的手段卻能有效的解決數據量瓶頸問題,它會將一個大數據集切割以Block為單位(如64M),將這些Block分別分配到相對空閑的節點上執行任務操作,經過一系列操作后,會將這些輸出作為Reduce的輸入,經過合並后得到最終的輸出結果,Map和Reduce中的所有輸入輸出都是以<key,value>的形式存在。整個過程就是Map和Reduce扮演的角色。MapReduce的數據變化歷程如下圖所示:

  

  2.2如何定義輸入輸出格式:

  從代碼中可以看出對於輸入文件的格式規范使用的是TextInputFormat,通過萬能的Hadoop API可以發現該類是extends FileInputFormat類,而FileInputFormat是實現了InputFormat接口的。那我們先來看下這個InputFormat是干嘛用的,在API中我們發現,MapReduce是依賴於InputFormat這個接口的,主要用來驗證具體任務的輸入格式;將輸入文件拆分為InputSplits。形象點說就是,當數據傳給Map時,Map會將經過拆分后的分片(InputSplit)送給InputFormat,InputFormat調用getRecordReader方法生成RecordReader,RecordReader在調用createKey和createValue方法創建出大家熟悉的符合Map格式要求的<key,value>鍵值對,所以說,InputFormat是為Map的輸入格式<key,value>服務的。與此相對應的OutputFormat類也是同樣的道理。

  這里,要特地強調一點自己對於整個WordCount是如何將文件輸入並切分以及如何讀取的疑惑以及理解:之前一直在想代碼StringTokenizer itr = new StringTokenizer(value.toString());是如何實現很多條記錄過來,比如一個文件中有二行文本,僅憑StringTokenizer如何完成切分,現在才知道因為有了TextInputFormat的約束,所以之前已經根據TextInputFormat的特性將文件中每行都划分出來,以行為單位向Map輸送數據,所以代碼中的StringTokenizer類只要對制表符或是空格進行分詞就可以了。舉例來說,有兩個文件:

  file1:hello world bye world

  file2:hello hadoop bye hadoop

  經過TextInputFormat格式限定后,就會將文件的每一行作為一條記錄,並將每行記錄轉換為<key,value>的形式,如下:

  file1:

  0  hello world bye world

  file2:

  0  hello hadoop bye hadoop

  這里兩個都是0,是因為兩個文件被分配到不同的Map中了。

 

  3.自己動手使用專利數據統計每條專利被引用的次數

  數據集:從NBER獲得,網址為:http://www.nber.org/patents

   其中包含專利引用數據集cite75_99.txt.

  具體代碼如下,主要是通過cite75_99.txt中的第二個屬性即被引用的屬性,進行計數,生成結果形式為<被引用的專利號,被引用的次數>,舉例來說,cite75_99.txt中的數據形式為:

CITTING CITED
     1   2
2 3
3 2
4 1
5 2

  CITTING表示專利號,CITED表示被引用的專利號,第一行表示專利1引用了專利2,所以從這個表來看,專利1和專利3分別被引用1次,專利2被應用3次。但是因為這個數據集相對來說比較大,有250+M,所以采用MapReduce進行處理。代碼如下:

 1 package org.apache.mapreduce;
 2 
 3 import java.io.IOException;
 4 import java.util.StringTokenizer;
 5 
 6 import org.apache.hadoop.conf.Configuration;
 7 import org.apache.hadoop.fs.Path;
 8 import org.apache.hadoop.io.IntWritable;
 9 import org.apache.hadoop.io.Text;
10 import org.apache.hadoop.mapred.FileInputFormat;
11 import org.apache.hadoop.mapred.FileOutputFormat;
12 import org.apache.hadoop.mapreduce.Job;
13 import org.apache.hadoop.mapreduce.Mapper;
14 import org.apache.hadoop.mapreduce.Reducer;
15 import org.apache.hadoop.util.GenericOptionsParser;
16 
17 public class Test1123 {
18 
19     /**
20      * @param args
21      */
22     public static class MapperClass extends Mapper<Object,Text,Text,IntWritable>{
23         public static final IntWritable one = new IntWritable(1);
24         public  Text text = new Text();
25         
26         public void map(Object key, Text value, Context context){
27             String citedPatent= value.toString().split(",")[1];
28             text.set(citedPatent);
29             try {
30                 context.write(text, one);
31             } catch (IOException e) {
32                 e.printStackTrace();
33             } catch (InterruptedException e) {
34                 e.printStackTrace();
35             }
36         }
37     }
38     
39     public static class ReducerClass extends Reducer<Text, IntWritable, Text, IntWritable>{
40         public IntWritable result = new IntWritable();
41         
42         public void reduce(Text key, Iterable<IntWritable> values, Context context){
43             int sum = 0;
44             for (IntWritable value:values){
45                 sum+=value.get();
46             }
47             result.set(sum);
48             try {
49                 context.write(key, result);
50             } catch (IOException e) {
51                 e.printStackTrace();
52             } catch (InterruptedException e) {
53                 e.printStackTrace();
54             }
55             
56         }
57         
58         
59     }
60     public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
61         Configuration conf = new Configuration();
62         String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
63         if (otherArgs.length != 2) {
64           System.err.println("Usage: wordcount <in> <out>");
65           System.exit(2);
66         }
67         Job job = new Job(conf, "Test1123");
68         
69         job.setJarByClass(Test1123.class);
70         job.setMapperClass(MapperClass.class);
71         job.setCombinerClass(ReducerClass.class);
72         job.setReducerClass(ReducerClass.class);
73         job.setOutputKeyClass(Text.class);
74         job.setOutputValueClass(IntWritable.class);
75         
76         org.apache.hadoop.mapreduce.lib.input.FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
77         org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
78         System.exit(job.waitForCompletion(true) ? 0 : 1);
79         System.out.println("end");
80 
81     }
82 
83 }

  執行過程部分片段如下:

  Step1:

  

  Step2:

  

  Step3:

  

  Step4:

  

  全程耗時三分鍾,覺得跑的還是很easy的,今天就到這吧,后面還需要進行理論充電,歡迎各位大牛指教,如果有用,記得點贊哦^_^

  本文鏈接:《Hadoop閱讀筆記(一)——強大的MapReduce

 

友情贊助

如果你覺得博主的文章對你那么一點小幫助,恰巧你又有想打賞博主的小沖動,那么事不宜遲,趕緊掃一掃,小額地贊助下,攢個奶粉錢,也是讓博主有動力繼續努力,寫出更好的文章^^。

    1. 支付寶                          2. 微信

                      


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM