hadoop Streaming的使用


1.streaming的作用

Haoop支持用其他語言來編程,需要用到名為Streaming的通用API。

Streaming主要用於編寫簡單,短小的MapReduce程序,可以通過腳本語言編程,開發更快捷,並充分利用非Java庫。

HadoopStreaming使用Unix中的流與程序交互,從stdin輸入數據,從stdout輸出數據。實際上可以用任何命令作為mapper和reducer。數據流示意如下:

cat [intput_file] | [mapper] | sort | [reducer] > [output_file]

2.使用方法

使用如下命令:

hadoop jar contrib/streaming/hadoop-streaming-0.20.203.0.jar  \

            >-input cite75_99.txt  \

           > -output output  \

           >-mapper 'cut -f 2 -d ,'  \

           >-reducer 'uniq'

第一行表示使用的StreamingAPI,位於圖中位置得jar包中

-input和 -output參數用於設置輸入輸出文件或目錄

-mapper和-reducer通過引號中得參數進行設定,分別進行了截取第二列數據,

uniq進行了排序去重。

注意:每行是完全按照字母方式排序,因為Streaming完全采用文本方式處理數據,而不知道其他得數據類型。輸出結果如下:

kqiao@ubuntu:~/hadoop-0.20.203.0$ hadoop fs -cat outputStreaming/part-00000 | head -10
"CITED"    
1    
10000    
100000    
1000006    
1000007    
1000011    
1000017    
1000026    
1000033

...... 

3.用Streaming處理鍵值對

默認情況下,Streaming使用\t分離記錄中得鍵和值,當沒有\t時,整個記錄被視為鍵,值為空白文本。

不同於AttributeMax.py為每個鍵尋找最大值,這次我們試着為每個國家找到專利聲明數的平均值。(Hadoop包含得名為Aggregate包,可以為每個鍵尋找最大值)

(1)Streaming中得mapper通過STDIN讀取一個 分片,並將每一行提取為一個記錄。Mapper可以選擇是把每條記錄翻譯為一個鍵值對,還是一行文本

            此步從 輸入文件到<k1,v1>

  (2) 對於mapper輸出的每一行,Streaming API將之翻譯為用\t分隔的鍵值對,類似於MapReduce中的划分,可以用pationer來處理鍵。最終所有鍵一致的key/value進入相同reducer。

   (3)沒個reducer以鍵為基准排序鍵值對,如同在Java模式中, 相同鍵的鍵值對被結組為一個鍵和一列值。reducer處理這些分組。

    (4)在實踐中,reducer的輸出(STDOUT)被寫入到一個文件中(由-output指定)

  對AverageByAttributeMaper.py:

<<<<<<<<<<<<<<<<<<<Mapper   daima>>>>>>>>>>>>>>>>>>>

  • 無reducer方式運行時:-D mapred.reducer.tasks=0

      輸出由行組成:一個國家代碼  \t   一個計數值      ,           並且其順序與輸入記錄一致

  • 以IdentityReducer方式運行,設置-D mapred.reducer.tasks=1(這種方式只要不設置-reducer選項即可)

執行結束可以看到 雖然每行的內容與上一個相同,但是順序被重排,鍵相同的“結組”在一起。可以根據這些信息考慮自己的reducer設計:

AverageByAttributeReducer.py——將相同鍵的值求和計數,在遇到新的鍵或到文件尾時,計算前一個鍵的平均值並輸出到STDOUT中。

<<<<<<<<<<<<<<<<<<<<<<Reducer  daima>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>

這時可以運行完整的MapReduce作業。會得到真正的平均值文件。

4.通過Aggregate包使用Streaming

  Hadoop有一個成為Aggregate軟件包,極大簡化數據集的匯總統計。尤其在使用Streaming時。

在Streaming中Aggregate包作為Reducer做聚集統計,只需提供一個mapper處理記錄並以特定格式輸出。輸出每行如下:

function:key\tvalue

function為值聚合函數的名稱(由Aggregate包中預定義函數獲得),接着一組 鍵值對,值聚合函數列表如下:

如果要計算每年授權的專利數,考慮編寫MapReduce程序的方法:

可以使mapper的輸出將年設置為key,而value的值恆為1。這樣reducer只需要對所有的1求和即可。使用基於Aggregate包的Streaming來實現:AttributeCount.py

#!/usr/bin/env python
import sys
index = int(sys.argv[1])
for line in sys.stdin:
    fields = line.split(",")
    print "LongValueSum:" + fields[index] + "\t" + "1"

關鍵語句:print  "LongValueSum:" + fields[index] + "\t" + "1"。。按指定格式( function:key\tvalue) 打印到輸出!

運行如下:

hadoop jar contrib/streaming/hadoop-streaming-0.20.203.jar  \
-file  AttributeCount.py    \
-input  apat63_99.txt    \
-output  output   \
-mapper 'AttributeCount.py 1'   \
-reducer aggregate


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM