在很多人的眼里,Map-Reduce等於Hadoop,沒有Hadoop談Map-Reduce猶如自上談兵,實則不然,Map-Reduce是一種計算模型,只是非常適合在並行的環境下運行,Hadoop是Map-Reduce的一種實現,沒有Hadoop照樣可以跑Map-Reduce程序。python就內置有map()和reduce方法(雖然與hadoop的map-reduce有區別)。
這篇文章主要介紹如何用python在linux的管道進行map-reduce編程,本文寫的所有map-reduce程序都可以原封不動的放在Hadoop下運行,關於用Hadoop Streaming調用python的map-reduce程序,可以參考這篇文章(非常詳細,推薦看看),本文主要通過幾個實例來講解如何用map-reduce來處理具體的問題。
一、python的內置函數map reduce
首先來看看python的兩個內置函數map()和reduce()
1、map
map(function, iterable, ...)
map對迭代器的每個元素調用function函數,在Python2中,將得到的結果組成一個list,而在python3中結果是一個迭代器。function的參數只有一個,即為迭代器的元素,看一個實例:
>>> d = [1,2,3,4,5] >>> m = map(lambda x:x*2,d) >>> print m [2, 4, 6, 8, 10]
這里的map可以理解為一種映射,將迭代器中的元素經過function映射到另一個值。
2、reduce
reduce(function, iterable[, initializer])
reduce可以理解為規約,它對迭代器從左到右累計調用function,function有兩個參數,第一個是之前的累積值,第二個當前規約的元素,實例:
>>> d = [1,2,3,4,5] >>> r = reduce(lambda x,y:x+y,d) >>> r 15
利用reduce定義一個求前n個整數之和的函數
>>> def accuPlus(n): return reduce(lambda x,y:x+y,range(1,n+1)) >>> print accuPlus(10) 55 >>> print accuPlus(500) 125250 >>>
上面介紹的python的兩個內置函數與我接下來要介紹的map-reduce還是有一些區別,適用於分布式下的map-redece主要是以鍵值對的形式處理數據,map階段發射很多鍵值對出去,然后按鍵排序,而reduce則對鍵相同的鍵值對進行處理,也可認為是一種規約。
二、用map-reduce做詞頻統計
詞頻統計可以說是map-reduce的Hello World程序,它簡單明了,卻描述了map-reduce的基本原理,統計中文的詞頻得先要分詞,目前有一些免費的分詞軟件,中科院和哈工大的都還不錯,現在我們有下面這樣一篇已經分好詞的文章(data.txt)需要進行詞頻統計:
雙子座 的 你 是 這樣 的 嗎 1 手機 不 離 身 睡覺 不 關機 2 對待 不同 的 人 有 不同 的 性格 3 從 小 懂得 很多 道理 但 知 行 往往 難以 合 一 4 有 時候 很 神經 有時候 很 鎮靜 5 會 因為 別人 一 句 話 傷心 但 不 會 被 發現 6 很 會 安慰 別人 卻 不 會 安慰 自己 7 會 經常 懷念 從 前
定義map的處理(mapper.py)如下:
1 #!encoding=utf-8 2 3 from sys import stdin 4 5 def mapprint(word): 6 '''定義map函數對元素的處理方式,這里直接打印''' 7 print word 8 9 #對每行進行統計 10 for line in stdin: 11 words = line.strip().split(' ') 12 map(mapprint,words)
mapper.py從標准輸入流中對每一行進行處理,然后將每個單詞打印到標准輸出,輸出之后需要對詞進行排序,才能被reduce處理,利用管道我們來看看mapper.py對data.txt進行處理:
$ cat data.txt |python mapper.py |sort
輸出的部分結果如下:
-
安慰 安慰 被 別人 別人 不 不 不 不 不同 不同 從 從 但 但 道理
接下來我們看看reducer.py,mapper的輸出排序后,相同的元素會出現在一起,我們可以從上往下掃描所有的元素,並維持一個計數器,遇到相同的詞就把計數器加1,遇到不相同的詞,就說明上一個詞已經統計完畢了,可以將這個詞及其詞頻輸出到標准輸出:
1 #!encoding=utf-8 2 from sys import stdin 3 4 last = None 5 count = 0 6 for line in stdin: 7 word = line.strip() 8 if word != last:#遇到不同的詞 9 if last: 10 print '%s\t%d' % (last,count) 11 last = word 12 count = 0 13 count += 1 14 #輸出最后一個詞 15 if last: 16 print '%s\t%d' % (last,count) 17
結合map和reduce,整個詞頻統計執行如下:
$ cat data.txt |python mapper.py | sort |python reducer.py
輸出的詞頻結果:
1 1 2 1 3 1 4 1 5 1 6 1 7 1 安慰 2 被 1 別人 2 不 4 不同 2 從 2 但 2 道理 1 的 4 懂得 1 對待 1 發現 1 關機 1 合 1 很 3 很多 1 話 1 懷念 1 會 5 經常 1 句 1 離 1 嗎 1 難以 1 你 1 前 1 卻 1 人 1 傷心 1 身 1 神經 1 時候 1 是 1 手機 1 雙子座 1 睡覺 1 往往 1 小 1 行 1 性格 1 一 2 因為 1 有 2 有時候 1 這樣 1 鎮靜 1 知 1 自己 1
為什么要把這個問題復雜化,完全可以用一個程序來處理統計詞頻這事,看下面的count.py
1 from collections import defaultdict 2 from sys import stdin 3 4 m = defaultdict(int) 5 for line in stdin: 6 words = line.strip().split(' ') 7 for word in words: 8 m[word] += 1 9 10 for word,count in m.items(): 11 print '%s\t%s' % (word,count)
有兩個理由讓我偏向於使用map-reduce作數據處理:
- 首先map-reduce的思路清晰,它將一個問題分成兩步,map階段我們只需要把詞打印到標准輸出,再經過排序進入reduce階段,reduce對相同的詞進行累加;
- map-reduce的程序可以原封不動的在Hadoop上跑起來,大規模的數據可以完爆count.py,現在的編程應該傾向於人的易理解性與擴展性,而不是在小數據上提升那么一丁點性能。
二、用map-reduce做多表等值連接
假設我們有這樣一份某app的日志文件app.log,包含appId、IMEI、userInfo這三個字段(這里的講述表的等值連接不牽涉到userInfo字段,所以該字段都置為U):
appId IMEI userInfo 8111111 I86733062 U1 8111112 I86733010 U2 8111113 I86733048 U3 8111114 I86733012 U4 8111115 I86733020 U5 8111116 I86733063 U6 8111117 I86733042 U7 8111118 I86733022 U8 8111119 I86733016 U9 8111120 I86733027 U10
現在我們想挖掘更多的用戶信息,得到了一份微博的數據,微博的數據weibo.log包含weiboId、IMEI、weiboInfo(同意我們將weibInfo置為W):
weiboId IMEI weiboInfo 1287680 I86733017 W1 1287681 I86733048 W2 1287682 I86733015 W3 1287683 I86733047 W4 1287684 I86733020 W5 1287685 I86733051 W6 1287686 I86733022 W7 1287687 I86733036 W8
兩個表都有一個共同的字段,即手機的IMEI序列號,通過微博的IMEI與我們app日志文件IMEI合並,就可以找出一些app用戶的微博信息,進行進一步的分析,現在我們需要的是將既在app.log又在weibo.log中的用戶找出來。
合並app.log和weibo.log到aw.log,合並了之后是方便於map的處理,但我們需要標記每條記錄來自於哪個表,我們用A標准來自於app.log,用W標志weibo.log:
$ awk 'NR > 1 {printf "A\t%s\n",$0}' app.log > aw.log
$ awk 'NR > 1 {printf "W\t%s\n",$0}' weibo.log >> aw.log
這樣合並之后的aw.log如下:
A 8111111 I86733062 U1 A 8111112 I86733010 U2 A 8111113 I86733048 U3 A 8111114 I86733012 U4 A 8111115 I86733020 U5 A 8111116 I86733063 U6 A 8111117 I86733042 U7 A 8111118 I86733022 U8 A 8111119 I86733016 U9 A 8111120 I86733027 U10 W 1287680 I86733017 W1 W 1287681 I86733048 W2 W 1287682 I86733015 W3 W 1287683 I86733047 W4 W 1287684 I86733020 W5 W 1287685 I86733051 W6 W 1287686 I86733022 W7 W 1287687 I86733036 W8
現在我們來寫map的mapper.py,map的輸出應該以IMEI作為key,其他字段作為value,這樣經過排序后,才能保證在reduce階段,IMEI相同的app.log記錄和weibo.log記錄能夠連續出現,mapper.py實現如下:
1 #!encoding=utf-8 2 from sys import stdin 3 4 for line in stdin: 5 data = line.strip().split('\t') 6 if len(data) != 4:#過濾掉錯誤行 7 continue 8 #把IMEI放在最前面,以便以IMEI排序 9 print '%s\t%s\t%s\t%s' % (data[2],data[0],data[1],data[3])
mapper輸出並排序下:
$ cat aw.log |python mapper.py |sort -k1 I86733010 A 8111112 U2 I86733012 A 8111114 U4 I86733015 W 1287682 W3 I86733016 A 8111119 U9 I86733017 W 1287680 W1 I86733020 A 8111115 U5 I86733020 W 1287684 W5 I86733022 A 8111118 U8 I86733022 W 1287686 W7 I86733027 A 8111120 U10 I86733036 W 1287687 W8 I86733042 A 8111117 U7 I86733047 W 1287683 W4 I86733048 A 8111113 U3 I86733048 W 1287681 W2 I86733051 W 1287685 W6 I86733062 A 8111111 U1 I86733063 A 8111116 U6
接下來處理reduce了,reduce從流中不斷掃描行,當遇到相同的IMEI,並且一個來自weibo.log一個來自app.log,就把這兩條記錄拼接起來,reducer.py:
1 #!encoding=utf-8 2 from sys import stdin 3 4 wIMEI = None#記錄來自為微博的IMEI 5 weibo = None 6 7 aIMEI = None#記錄來自app的IMEI 8 app = None 9 10 for line in stdin: 11 data = line.strip().split('\t',2) 12 if len(data) != 3:#過濾錯誤的數據行 13 continue 14 if data[1] == 'A': 15 aIMEI = data[0] 16 app = data[2] 17 elif data[1] == 'W': 18 wIMEI = data[0] 19 weibo = data[2] 20 if wIMEI == aIMEI and wIMEI is not None:#兩個IMEI相等時連接兩行 21 print '%s\t%s\t%s' % (wIMEI,app,weibo) 22 aIMEI = wIMEI = None#重置 23 24 if wIMEI == aIMEI and wIMEI is not None:#最后的記錄不要忘記輸出 25 print '%s\t%s\t%s' % (wIMEI,app,weibo)
連接map-reduce,整個等值連接結果如下:
$ cat aw.log |python mapper.py |sort -k1 | python reducer.py I86733020 8111115 U5 1287684 W5 I86733022 8111118 U8 1287686 W7 I86733048 8111113 U3 1287681 W2
上述的map-reduce階段依然可以用一個程序跑,但日志文件往往比較大,幾GB到幾十GB也很正常,但我們的map-reduce總是能伸縮自如,用Hadoop不怕數據量大。
三、用map-reduce做矩陣的迭代運算
我們來看最后一個例子,一個矩陣的一下輪的元素等於其周圍四面八方所有元素以及自己的之和的平均值,比如下面矩陣:
1 2 3 4 5 6 7 8 9
下一輪矩陣為:
(1+2+4+5)/4 (1+2+3+4+5+6)/6 (2+3+5+6)/4
(1+2+4+5+7+8)/6 (1+2+3+4+5+6+7+8+9)/9 (2+3+5+6+8+9)/6
(4+5+7+8)/5 (4+5+6+7+8+9)/6 (5+6+8+9)/4
當矩陣非常大的時候,我們需要把矩陣轉為三元組的形式<x,y,value>,這樣方便map-reduce的處理,否則,矩陣非常大,矩陣的一行就能爆內存。比如上面的矩陣轉化為三元組后變為:
0,0,1 0,1,2 0,2,3 1,0,4 1,1,5 1,2,6 2,0,7 2,1,8 2,2,9
現在我們處理map,先理清map階段需要發射的key和value是什么,map掃描每一行得到的是矩陣的一個元素,而這個元素會參與周圍所有元素的均值計算,因此我們對每一個<x,y,value>輸出的鍵值對是(<xi,yj,>,<x,y,value>),<xi,yj,>是<x,y>周圍的坐標,map輸出后,按xi,yj順序排序,對所有key值相同的元素累加並求平均值,mapper.py:
1 #!encoding=utf-8 2 from sys import stdin 3 4 M = 3#矩陣的行數 5 N = 3#矩陣的列數 6 7 for line in stdin: 8 data = line.strip().split(',') 9 x,y = map(int,data[0:2]) 10 value = float(data[2]) 11 for i in xrange(-1,2): 12 if x + i < 0 or x + i >= M:#超出上下邊界 13 continue 14 for j in xrange(-1,2): 15 if y + j < 0 or y + j >= N:#超出左右邊界 16 continue 17 print '%d,%d\t%f' % (x+i,y+j,value)#為周圍的每個值計算提供貢獻
執行mapper.py,排序時注意一下,字段分割符是制表符'\t',實際上hadoop的默認分割符就是制表符:
$ cat data.txt |python mapper.py |sort -t$'\t'
mapper.py輸出的中間過程比較長,我們看一下部分結果:
0,0 1.000000 0,0 2.000000 0,0 4.000000 0,0 5.000000 0,1 1.000000 0,1 2.000000 0,1 3.000000 0,1 4.000000 0,1 5.000000 0,1 6.000000 0,2 2.000000 0,2 3.000000 0,2 5.000000 0,2 6.000000
接下來寫reducer.py,在reduce階段依舊是掃描每一行,並判斷是否與上一行的key相同,如果相同就累加value,如果不同,則計算平均值,並輸出到標准輸出:
1 from sys import stdin 2 last = None 3 count = 0 4 s = 0 5 for line in stdin: 6 p,v = line.strip().split('\t') 7 value = float(v) 8 if last != p: 9 if last: 10 print '%5s,%10f' % (last,s/count) 11 last = p 12 s = 0 13 count = 0 14 s += value 15 count += 1 16 if last: 17 print '%5s,%10f' % (last,s/count)
連接map-reduce執行:
$ cat data.txt |python mapper.py |sort -t$'\t'|python reducer.py
輸出如下:
0,0, 3.000000 0,1, 3.500000 0,2, 4.000000 1,0, 4.500000 1,1, 5.000000 1,2, 5.500000 2,0, 6.000000 2,1, 6.500000 2,2, 7.000000
其實這里,矩陣連續迭代到max(M,N)+3次之后,矩陣的每個元素都將趨向於整個矩陣的平均值,因為經過max(M,N)+3次迭代后,最左邊的元素已經傳遞到了最右邊,最上面的元素也已經傳遞到了最下邊,作為一個單機版的測試,我寫了迭代測試:
1 from os import system 2 3 lastMatrix = 'data.txt' 4 matrix = None 5 for i in xrange(1,4+3): 6 if matrix != None: 7 lastMatrix = matrix 8 matrix = 'data.'+str(i) 9 print i 10 cmd = 'cat '+lastMatrix+"|python mapper.py |sort -t'\t'|python reducer.py >"+matrix 11 system(cmd) 12 13 system('cat '+matrix)
迭代6次后輸出:
0,0, 4.937500 0,1, 4.953125 0,2, 4.968750 1,0, 4.984375 1,1, 5.000000 1,2, 5.015625 2,0, 5.031250 2,1, 5.046875 2,2, 5.062500
可以看到已經基本接近矩陣均值5了。
四、總結
上面介紹了三個實例,用map-reduce來處理數據,可以看到,按照這種思路寫代碼,不僅思路清晰,而且擴展性強,因為背后有hadoop在支撐。需要注意的是,上面的map-reduce代碼在hadoop中運行時,應該對每一行的處理加上一個try...except,過濾掉那些異常數據,因為海量數據里總是有一些噪聲數據,如果沒有try-except,hadoop雖然在失敗時會重試,但重試多次后任然失敗,那整個任務就會失敗了。
最后,完整的代碼在github上可以查看。
轉載請注明出處:http://www.cnblogs.com/fengfenggirl/