前言:來園子已經有8個月了,當初入園憑着滿腔熱血和一腦門子沖動,給自己起了個響亮的旗號“大數據 小世界”,頓時有了種世界都是我的,世界都在我手中的趕腳。可是......時光飛逝,歲月如梭~~~隨手一翻自己的博客,可視化已經快占據了半壁江山,思來想去,還是覺得把一直掛在嘴頭,放在心頭的大數據拿出來說說,哦不,是拿過來學學。入園前期寫了有關Nutch和Solr的自己的一些閱讀體會和一些嘗試,掛着大數據的旗號做着爬蟲的買賣。可是,時間在流失,對於大數據的憧憬從未改變,尤其是Hadoop一直讓我魂牽夢繞,打今兒起,開始着手自己的大數據系列,把別人擠牙膏的時間用在學習上,收拾好時間,收拾好資料,收拾好自己,重返Hadoop。
以下是對於大數據學習的一種預期規划:
主要理論指導材料:Hadoop實戰2
主要手段:敲代碼、結合API理解
預期目標:深入了解Hadoop,能為我所用
正文:記得去年還在學校寫小論文的時候,我花了一天的時間,懵懵懂懂的把Hadoop的環境給打起來了,今年出來接觸社會,由於各種原因,自己又搭了幾次偽分布式的環境,每次想學習Hadoop的心態好比每次背單詞,只要一背單詞,總是又從“abandon”開始背起。所以環境這塊就不多說了,網上這樣的帖子早已爛大街(因為Hadoop版本更新很快,目前應該是到2.6版本了,所以博文肯定一直在推陳出新)。用的Ubuntu12.0系統,因為之前一直弄的是0.20.2版本,后來也沒想着換,換也來不及了,0.20.2老朋友,可靠,還選他。倒是現在的Hadoop整個框架已經有所改變,HDFS還在,只是從0.23.0以后就不見了MapReduce的蹤跡,現在好像是重新洗牌編程了YARN,小弟懂得不多,大牛莫怪。。。
今天是大數據系列第一槍,主要內容分為三個部分:1.先用理論知識壓壓場;2.通過Hadoop的HelloWorld程序講解下自己的疑惑與理解;3.借助專利數據集動手寫個簡單的MapReduce程序。
1.首先來說說整個Hadoop大家族,然后粗略的了解下HDFS以及MapReduce。
1.1.hadoop的子項目構成以及相應的配套服務圖:

(1)Core:一系列分布式文件系統和通用I/O的組件和接口(序列化、Java RPC和持久化數據結構)
(2)Avro:一種提供高效、跨語言RPC的數據序列系統,持久化數據存儲。Avro是Hadoop的一個子項目,由Hadoop的 創始人Doug Cutting(也是Lucene,Nutch等項目的創始人)牽頭開發。
Avro是一個數據序列化系統,設計用於支持大 批量數據交換的應用。它的主要特點有:支持二進制序列化方式,可以便捷,快速地處理大量數據;動態語言友好,Avro提供的機制使動態語言可以方便地處理 Avro數據。
(3)MapReduce:分布式數據處理模式和執行環境。
(4)HDFS:分布式文件系統。
(5)Pig:一種數據流語言和運行環境,用以檢索非常大的數據集。Pig運行在MapReduce和HDFS的集群上,是對大型數據集進行分析、評估的平台。
Pig是一種編程語言,它簡化了Hadoop常見的工作任務。Pig可加載數據、表達轉換數據以及存儲最終結果。Pig內置的操作使得半結構化數據變得有意義(如日志文件)。同時Pig可擴展使用Java中添加的自定義數據類型並支持數據轉換。
(6)HBase:一個分布式的、列存儲數據庫。HBase使用HDFS作為底層存儲,同時支持MapReduce的批量式計算和點查詢(隨機讀取)。
(7)ZooKeeper:一個分布式的,高可用性的協調服務。ZooKeeper提供分布式鎖之類的基本服務用於構建分布式應用。
(8)Hive:分布式數據倉庫。Hive管理與HDFS總存儲的數據,並提供基於SQL的查詢語言(由運行時引擎翻譯成MapReduce作業)用以查詢數據。
Hive在Hadoop中扮演數據倉庫的角色。Hive添加數據的結構在HDFS(hive superimposes structure on data in HDFS),並允許使用類似於SQL語法進行數據查詢。與Pig一樣,Hive的核心功能是可擴展的。
(9)Chukwa:分布式數據收集和分析系統。Chukwa運行HDFS中存儲數據的收集器,它使用MapReduce來生成報告。
1.2HDFS采用了主從(Master/Slave)結構模型,一個HDFS集群是由一個NameNode和若干個DataNode組成的。其中NameNode作為主服務器,管理文件系統的命名空間和客戶端對文件的訪問操作;集群中的DataNode管理存儲的數據。NameNode執行文件系統的命名操作,比如打開、關閉、重命名文件或目錄等,它也負責數據塊到具體DataNode的映射。DataNode負責處理文件系統客戶端的文件讀寫請求,並在NameNode的同意調度下進行數據塊的創建、刪除和復制工作。
MapReduce框架是由一個單獨運行在主節點的JobTracker和運行在每個集群從節點的TaskTracker共同組成的。主節點負責調度構成一個作業的所有任務,這些任務分布在不同的從節點上。主節點監控它們的執行情況,並且重新執行之前失敗的任務;從節點僅負責由主節點指派的任務。
Hadoop的MapReduce模型是通過輸入key/value對進行運算得到輸出key/value對。其分為Map過程和Reduce過程。
Map主要的工作是接收一個key/value對,產生一個中間key/value對,之后MapReduce把集合中所有相同key值的value放在一起並傳遞給Reduce函數。
Reduce函數接收key和相關的value集合並合並這些value值,得到一個較小的value集合。
下圖是MapReduce的數據流圖,體現了MapReduce處理大數據集的過程。這個過程就是將大數據分解為成百上千個小數據集,每個(或若干個)數據集分別由集群中的一個節點進行處理並生成的中間結果,然后這些中間結果又由大量的節點合並,形成最終結果。

2.以小見大——從Hadoop的HelloWorld說起
每種語言、框架都有屬於自己的“HelloWorld”,Hadoop也不例外,在下載的Hadoop包中就有example文件夾,里面提供了10+個例子,首當其沖,各種博文中出現頻率最高的當屬WordCount無疑,其算法思想簡單,又能結合大數據集做實驗,能夠很好的體現Hadoop為何物、有何用、如何用的特征。如果能把一個WordCount的每個細節都弄清楚了,基本上也算是掌握了Hadoop的大部分了,下面就來看看WordCount的原生代碼:
1 package org.apache.hadoop.examples; 2 3 import java.io.IOException; 4 import java.util.StringTokenizer; 5 6 import org.apache.hadoop.conf.Configuration; 7 import org.apache.hadoop.fs.Path; 8 import org.apache.hadoop.io.IntWritable; 9 import org.apache.hadoop.io.Text; 10 import org.apache.hadoop.mapreduce.Job; 11 import org.apache.hadoop.mapreduce.Mapper; 12 import org.apache.hadoop.mapreduce.Reducer; 13 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 14 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 15 import org.apache.hadoop.util.GenericOptionsParser; 16 17 public class WordCount { 18 19 public static class TokenizerMapper 20 extends Mapper<Object, Text, Text, IntWritable>{ 21 22 private final static IntWritable one = new IntWritable(1); 23 private Text word = new Text(); 24 25 public void map(Object key, Text value, Context context 26 ) throws IOException, InterruptedException { 27 StringTokenizer itr = new StringTokenizer(value.toString()); 28 while (itr.hasMoreTokens()) { 29 word.set(itr.nextToken()); 30 context.write(word, one); 31 } 32 } 33 } 34 35 public static class IntSumReducer 36 extends Reducer<Text,IntWritable,Text,IntWritable> { 37 private IntWritable result = new IntWritable(); 38 39 public void reduce(Text key, Iterable<IntWritable> values, 40 Context context 41 ) throws IOException, InterruptedException { 42 int sum = 0; 43 for (IntWritable val : values) { 44 sum += val.get(); 45 } 46 result.set(sum); 47 context.write(key, result); 48 } 49 } 50 51 public static void main(String[] args) throws Exception { 52 Configuration conf = new Configuration(); 53 String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); 54 if (otherArgs.length != 2) { 55 System.err.println("Usage: wordcount <in> <out>"); 56 System.exit(2); 57 } 58 Job job = new Job(conf, "word count"); 59 job.setJarByClass(WordCount.class); 60 job.setMapperClass(TokenizerMapper.class); 61 job.setCombinerClass(IntSumReducer.class); 62 job.setReducerClass(IntSumReducer.class); 63 job.setOutputKeyClass(Text.class); 64 job.setOutputValueClass(IntWritable.class); 65 job.setInputFormatClass(TextInputFormat.class); 66 job.setInputFormatClass(TextInputFormat.class); 67 FileInputFormat.addInputPath(job, new Path(otherArgs[0])); 68 FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); 69 System.exit(job.waitForCompletion(true) ? 0 : 1); 70 } 71 }
先且不談整個WordCount如何借助HDFS分布式文件系統進行數據存取操作,這里我們看看WordCount的MapReducer如何處理。
2.1何為MapReduce:
MapReduce顧名思義,由Map和Reduce兩部分組成,通俗點說,Map用於將數據集分拆到集群中節點運行,而Reduce負責整合聚合最終結果輸出。那Hadoop為什么要廢如此周折又是分又是合,直接通過傳統的手段完成自己的代碼邏輯不是更簡單?是的,沒錯,對於一般操作,傳統手段也能辦到,設置更加簡潔,但是這里討論的背景是大數據,而Hadoop就是應這個背景而出現的。舉個不恰當的例子,原始社會,大家思想很簡單,生活也很簡單,上山砍點柴,自個兒就能背回家取暖,這樣基本能滿足自己的需求了,不會有任何問題。某天有個腦洞大開的先人想到如果能夠用木頭造個房子,大家的生活質量必定有所改善,不怕風吹雨淋了。那么問題來了,對樹木的需求量變大了,憑某個人的力量恐怕很難辦到,所以,他們也弄了個集群,找了很多人,每個人負責背點柴(運行任務),大家團隊協作,共同完成這個在個人面前龐大到難以完成的任務,此外,這個集群還可以隨機添加個體(節點),活量很多的時候老婆、孩子都去幫忙,稍微閑點的時候讓他們回家,不參與集群活動。
所以說,面對當前日益增長的數據量,傳統單個pc或是服務器已經無法支撐或是成本很高,而Hadoop利用了看似繁雜的手段卻能有效的解決數據量瓶頸問題,它會將一個大數據集切割以Block為單位(如64M),將這些Block分別分配到相對空閑的節點上執行任務操作,經過一系列操作后,會將這些輸出作為Reduce的輸入,經過合並后得到最終的輸出結果,Map和Reduce中的所有輸入輸出都是以<key,value>的形式存在。整個過程就是Map和Reduce扮演的角色。MapReduce的數據變化歷程如下圖所示:

2.2如何定義輸入輸出格式:
從代碼中可以看出對於輸入文件的格式規范使用的是TextInputFormat,通過萬能的Hadoop API可以發現該類是extends FileInputFormat類,而FileInputFormat是實現了InputFormat接口的。那我們先來看下這個InputFormat是干嘛用的,在API中我們發現,MapReduce是依賴於InputFormat這個接口的,主要用來驗證具體任務的輸入格式;將輸入文件拆分為InputSplits。形象點說就是,當數據傳給Map時,Map會將經過拆分后的分片(InputSplit)送給InputFormat,InputFormat調用getRecordReader方法生成RecordReader,RecordReader在調用createKey和createValue方法創建出大家熟悉的符合Map格式要求的<key,value>鍵值對,所以說,InputFormat是為Map的輸入格式<key,value>服務的。與此相對應的OutputFormat類也是同樣的道理。
這里,要特地強調一點自己對於整個WordCount是如何將文件輸入並切分以及如何讀取的疑惑以及理解:之前一直在想代碼StringTokenizer itr = new StringTokenizer(value.toString());是如何實現很多條記錄過來,比如一個文件中有二行文本,僅憑StringTokenizer如何完成切分,現在才知道因為有了TextInputFormat的約束,所以之前已經根據TextInputFormat的特性將文件中每行都划分出來,以行為單位向Map輸送數據,所以代碼中的StringTokenizer類只要對制表符或是空格進行分詞就可以了。舉例來說,有兩個文件:
file1:hello world bye world
file2:hello hadoop bye hadoop
經過TextInputFormat格式限定后,就會將文件的每一行作為一條記錄,並將每行記錄轉換為<key,value>的形式,如下:
file1:
0 hello world bye world
file2:
0 hello hadoop bye hadoop
這里兩個都是0,是因為兩個文件被分配到不同的Map中了。
3.自己動手使用專利數據統計每條專利被引用的次數
數據集:從NBER獲得,網址為:http://www.nber.org/patents
其中包含專利引用數據集cite75_99.txt.
具體代碼如下,主要是通過cite75_99.txt中的第二個屬性即被引用的屬性,進行計數,生成結果形式為<被引用的專利號,被引用的次數>,舉例來說,cite75_99.txt中的數據形式為:
| CITTING | CITED |
| 1 | 2 |
| 2 | 3 |
| 3 | 2 |
| 4 | 1 |
| 5 | 2 |
CITTING表示專利號,CITED表示被引用的專利號,第一行表示專利1引用了專利2,所以從這個表來看,專利1和專利3分別被引用1次,專利2被應用3次。但是因為這個數據集相對來說比較大,有250+M,所以采用MapReduce進行處理。代碼如下:
1 package org.apache.mapreduce; 2 3 import java.io.IOException; 4 import java.util.StringTokenizer; 5 6 import org.apache.hadoop.conf.Configuration; 7 import org.apache.hadoop.fs.Path; 8 import org.apache.hadoop.io.IntWritable; 9 import org.apache.hadoop.io.Text; 10 import org.apache.hadoop.mapred.FileInputFormat; 11 import org.apache.hadoop.mapred.FileOutputFormat; 12 import org.apache.hadoop.mapreduce.Job; 13 import org.apache.hadoop.mapreduce.Mapper; 14 import org.apache.hadoop.mapreduce.Reducer; 15 import org.apache.hadoop.util.GenericOptionsParser; 16 17 public class Test1123 { 18 19 /** 20 * @param args 21 */ 22 public static class MapperClass extends Mapper<Object,Text,Text,IntWritable>{ 23 public static final IntWritable one = new IntWritable(1); 24 public Text text = new Text(); 25 26 public void map(Object key, Text value, Context context){ 27 String citedPatent= value.toString().split(",")[1]; 28 text.set(citedPatent); 29 try { 30 context.write(text, one); 31 } catch (IOException e) { 32 e.printStackTrace(); 33 } catch (InterruptedException e) { 34 e.printStackTrace(); 35 } 36 } 37 } 38 39 public static class ReducerClass extends Reducer<Text, IntWritable, Text, IntWritable>{ 40 public IntWritable result = new IntWritable(); 41 42 public void reduce(Text key, Iterable<IntWritable> values, Context context){ 43 int sum = 0; 44 for (IntWritable value:values){ 45 sum+=value.get(); 46 } 47 result.set(sum); 48 try { 49 context.write(key, result); 50 } catch (IOException e) { 51 e.printStackTrace(); 52 } catch (InterruptedException e) { 53 e.printStackTrace(); 54 } 55 56 } 57 58 59 } 60 public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { 61 Configuration conf = new Configuration(); 62 String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); 63 if (otherArgs.length != 2) { 64 System.err.println("Usage: wordcount <in> <out>"); 65 System.exit(2); 66 } 67 Job job = new Job(conf, "Test1123"); 68 69 job.setJarByClass(Test1123.class); 70 job.setMapperClass(MapperClass.class); 71 job.setCombinerClass(ReducerClass.class); 72 job.setReducerClass(ReducerClass.class); 73 job.setOutputKeyClass(Text.class); 74 job.setOutputValueClass(IntWritable.class); 75 76 org.apache.hadoop.mapreduce.lib.input.FileInputFormat.addInputPath(job, new Path(otherArgs[0])); 77 org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); 78 System.exit(job.waitForCompletion(true) ? 0 : 1); 79 System.out.println("end"); 80 81 } 82 83 }
執行過程部分片段如下:
Step1:

Step2:

Step3:

Step4:

全程耗時三分鍾,覺得跑的還是很easy的,今天就到這吧,后面還需要進行理論充電,歡迎各位大牛指教,如果有用,記得點贊哦^_^
本文鏈接:《Hadoop閱讀筆記(一)——強大的MapReduce》
友情贊助
如果你覺得博主的文章對你那么一點小幫助,恰巧你又有想打賞博主的小沖動,那么事不宜遲,趕緊掃一掃,小額地贊助下,攢個奶粉錢,也是讓博主有動力繼續努力,寫出更好的文章^^。
1. 支付寶 2. 微信

