用Map-Reduce的思維處理數據


  在很多人的眼里,Map-Reduce等於Hadoop,沒有Hadoop談Map-Reduce猶如自上談兵,實則不然,Map-Reduce是一種計算模型,只是非常適合在並行的環境下運行,Hadoop是Map-Reduce的一種實現,沒有Hadoop照樣可以跑Map-Reduce程序。python就內置有map()和reduce方法(雖然與hadoop的map-reduce有區別)。

  這篇文章主要介紹如何用python在linux的管道進行map-reduce編程,本文寫的所有map-reduce程序都可以原封不動的放在Hadoop下運行,關於用Hadoop Streaming調用python的map-reduce程序,可以參考這篇文章(非常詳細,推薦看看),本文主要通過幾個實例來講解如何用map-reduce來處理具體的問題。

一、python的內置函數map reduce

  首先來看看python的兩個內置函數map()和reduce()

1、map  

map(function, iterable, ...)

  map對迭代器的每個元素調用function函數,在Python2中,將得到的結果組成一個list,而在python3中結果是一個迭代器。function的參數只有一個,即為迭代器的元素,看一個實例:

>>> d = [1,2,3,4,5]
>>> m = map(lambda x:x*2,d)
>>> print m
[2, 4, 6, 8, 10]

   這里的map可以理解為一種映射,將迭代器中的元素經過function映射到另一個值。

2、reduce

reduce(function, iterable[, initializer])

  reduce可以理解為規約,它對迭代器從左到右累計調用function,function有兩個參數,第一個是之前的累積值,第二個當前規約的元素,實例:

>>> d = [1,2,3,4,5]
>>> r = reduce(lambda x,y:x+y,d)
>>> r
15

  利用reduce定義一個求前n個整數之和的函數 

>>> def accuPlus(n):
    return reduce(lambda x,y:x+y,range(1,n+1))

>>> print accuPlus(10)
55
>>> print accuPlus(500)
125250
>>> 

  上面介紹的python的兩個內置函數與我接下來要介紹的map-reduce還是有一些區別,適用於分布式下的map-redece主要是以鍵值對的形式處理數據,map階段發射很多鍵值對出去,然后按鍵排序,而reduce則對鍵相同的鍵值對進行處理,也可認為是一種規約。

二、用map-reduce做詞頻統計

  詞頻統計可以說是map-reduce的Hello World程序,它簡單明了,卻描述了map-reduce的基本原理,統計中文的詞頻得先要分詞,目前有一些免費的分詞軟件,中科院和哈工大的都還不錯,現在我們有下面這樣一篇已經分好詞的文章(data.txt)需要進行詞頻統計:

雙子座 的 你 是 這樣 的 嗎 1 手機 不 離 身 睡覺 不 關機 2 對待 不同 的 人 有 不同 的 性格 3 從 小 懂得 很多 道理 但 知 行 往往 難以 合 一 4 有 時候 很 神經 有時候 很 鎮靜 5 會 因為 別人 一 句 話 傷心 但 不 會 被 發現 6 很 會 安慰 別人 卻 不 會 安慰 自己 7 會 經常 懷念 從 前

  定義map的處理(mapper.py)如下:

 1 #!encoding=utf-8
 2 
 3 from sys import stdin
 4 
 5 def mapprint(word):
 6     '''定義map函數對元素的處理方式,這里直接打印'''
 7     print word
 8     
 9 #對每行進行統計
10 for line in stdin:
11     words = line.strip().split(' ')
12     map(mapprint,words)

  mapper.py從標准輸入流中對每一行進行處理,然后將每個單詞打印到標准輸出,輸出之后需要對詞進行排序,才能被reduce處理,利用管道我們來看看mapper.py對data.txt進行處理:

$ cat data.txt |python mapper.py |sort

  輸出的部分結果如下:

  • 安慰
    安慰
    被
    別人
    別人
    不
    不
    不
    不
    不同
    不同
    從
    從
    但
    但
    道理

  接下來我們看看reducer.py,mapper的輸出排序后,相同的元素會出現在一起,我們可以從上往下掃描所有的元素,並維持一個計數器,遇到相同的詞就把計數器加1,遇到不相同的詞,就說明上一個詞已經統計完畢了,可以將這個詞及其詞頻輸出到標准輸出:

 1 #!encoding=utf-8
 2 from sys import stdin
 3 
 4 last = None
 5 count = 0
 6 for line in stdin:
 7     word = line.strip()
 8     if word != last:#遇到不同的詞
 9         if last:
10             print '%s\t%d' % (last,count)
11         last = word
12         count = 0
13     count += 1
14 #輸出最后一個詞
15 if last:
16     print '%s\t%d' % (last,count)
17     

  結合map和reduce,整個詞頻統計執行如下:

$ cat data.txt |python mapper.py | sort |python reducer.py

  輸出的詞頻結果:

1    1
2    1
3    1
4    1
5    1
6    1
7    1
安慰    21
別人    24
不同    222
道理    14
懂得    1
對待    1
發現    1
關機    113
很多    11
懷念    15
經常    1111
難以    11111
傷心    11
神經    1
時候    11
手機    1
雙子座    1
睡覺    1
往往    111
性格    12
因為    12
有時候    1
這樣    1
鎮靜    11
自己    1

   為什么要把這個問題復雜化,完全可以用一個程序來處理統計詞頻這事,看下面的count.py

 1 from collections import defaultdict
 2 from sys import stdin
 3 
 4 m = defaultdict(int)
 5 for line in stdin:
 6         words = line.strip().split(' ')
 7         for word in words:
 8                 m[word] += 1
 9 
10 for word,count in m.items():
11         print '%s\t%s' % (word,count)

  有兩個理由讓我偏向於使用map-reduce作數據處理:

  • 首先map-reduce的思路清晰,它將一個問題分成兩步,map階段我們只需要把詞打印到標准輸出,再經過排序進入reduce階段,reduce對相同的詞進行累加;
  • map-reduce的程序可以原封不動的在Hadoop上跑起來,大規模的數據可以完爆count.py,現在的編程應該傾向於人的易理解性與擴展性,而不是在小數據上提升那么一丁點性能。

二、用map-reduce做多表等值連接

  假設我們有這樣一份某app的日志文件app.log,包含appId、IMEI、userInfo這三個字段(這里的講述表的等值連接不牽涉到userInfo字段,所以該字段都置為U):

appId      IMEI         userInfo
8111111    I86733062    U1
8111112    I86733010    U2
8111113    I86733048    U3
8111114    I86733012    U4
8111115    I86733020    U5
8111116    I86733063    U6
8111117    I86733042    U7
8111118    I86733022    U8
8111119    I86733016    U9
8111120    I86733027    U10

  現在我們想挖掘更多的用戶信息,得到了一份微博的數據,微博的數據weibo.log包含weiboId、IMEI、weiboInfo(同意我們將weibInfo置為W):

weiboId    IMEI    weiboInfo
1287680    I86733017    W1
1287681    I86733048    W2
1287682    I86733015    W3
1287683    I86733047    W4
1287684    I86733020    W5
1287685    I86733051    W6
1287686    I86733022    W7
1287687    I86733036    W8

  兩個表都有一個共同的字段,即手機的IMEI序列號,通過微博的IMEI與我們app日志文件IMEI合並,就可以找出一些app用戶的微博信息,進行進一步的分析,現在我們需要的是將既在app.log又在weibo.log中的用戶找出來。

  合並app.log和weibo.log到aw.log,合並了之后是方便於map的處理,但我們需要標記每條記錄來自於哪個表,我們用A標准來自於app.log,用W標志weibo.log:

$ awk 'NR > 1 {printf "A\t%s\n",$0}' app.log > aw.log
$ awk 'NR > 1 {printf "W\t%s\n",$0}' weibo.log >> aw.log

  這樣合並之后的aw.log如下:

A       8111111 I86733062       U1
A       8111112 I86733010       U2
A       8111113 I86733048       U3
A       8111114 I86733012       U4
A       8111115 I86733020       U5
A       8111116 I86733063       U6
A       8111117 I86733042       U7
A       8111118 I86733022       U8
A       8111119 I86733016       U9
A       8111120 I86733027       U10
W       1287680 I86733017       W1
W       1287681 I86733048       W2
W       1287682 I86733015       W3
W       1287683 I86733047       W4
W       1287684 I86733020       W5
W       1287685 I86733051       W6
W       1287686 I86733022       W7
W       1287687 I86733036       W8

  現在我們來寫map的mapper.py,map的輸出應該以IMEI作為key,其他字段作為value,這樣經過排序后,才能保證在reduce階段,IMEI相同的app.log記錄和weibo.log記錄能夠連續出現,mapper.py實現如下:

1 #!encoding=utf-8
2 from sys import stdin
3 
4 for line in stdin:
5         data = line.strip().split('\t')
6         if len(data) != 4:#過濾掉錯誤行
7                 continue
8         #把IMEI放在最前面,以便以IMEI排序
9         print '%s\t%s\t%s\t%s' % (data[2],data[0],data[1],data[3])

  mapper輸出並排序下:

$ cat aw.log |python mapper.py |sort -k1
I86733010       A       8111112 U2
I86733012       A       8111114 U4
I86733015       W       1287682 W3
I86733016       A       8111119 U9
I86733017       W       1287680 W1
I86733020       A       8111115 U5
I86733020       W       1287684 W5
I86733022       A       8111118 U8
I86733022       W       1287686 W7
I86733027       A       8111120 U10
I86733036       W       1287687 W8
I86733042       A       8111117 U7
I86733047       W       1287683 W4
I86733048       A       8111113 U3
I86733048       W       1287681 W2
I86733051       W       1287685 W6
I86733062       A       8111111 U1
I86733063       A       8111116 U6

  接下來處理reduce了,reduce從流中不斷掃描行,當遇到相同的IMEI,並且一個來自weibo.log一個來自app.log,就把這兩條記錄拼接起來,reducer.py:

 1 #!encoding=utf-8
 2 from sys import stdin
 3 
 4 wIMEI = None#記錄來自為微博的IMEI
 5 weibo = None
 6 
 7 aIMEI = None#記錄來自app的IMEI
 8 app = None
 9 
10 for line in stdin:
11     data = line.strip().split('\t',2)
12     if len(data) != 3:#過濾錯誤的數據行
13         continue
14     if data[1] == 'A':
15         aIMEI = data[0]
16         app = data[2]
17     elif data[1] == 'W':
18         wIMEI = data[0]
19         weibo = data[2]
20     if wIMEI == aIMEI and wIMEI is not None:#兩個IMEI相等時連接兩行
21         print '%s\t%s\t%s' % (wIMEI,app,weibo)
22         aIMEI = wIMEI = None#重置
23 
24 if wIMEI == aIMEI and wIMEI is not None:#最后的記錄不要忘記輸出
25     print '%s\t%s\t%s' % (wIMEI,app,weibo)

  連接map-reduce,整個等值連接結果如下:

$ cat aw.log |python mapper.py |sort -k1 | python reducer.py
I86733020       8111115 U5      1287684 W5
I86733022       8111118 U8      1287686 W7
I86733048       8111113 U3      1287681 W2

  上述的map-reduce階段依然可以用一個程序跑,但日志文件往往比較大,幾GB到幾十GB也很正常,但我們的map-reduce總是能伸縮自如,用Hadoop不怕數據量大。

 

 三、用map-reduce做矩陣的迭代運算

  我們來看最后一個例子,一個矩陣的一下輪的元素等於其周圍四面八方所有元素以及自己的之和的平均值,比如下面矩陣:

1    2    3

4    5    6

7    8    9

  下一輪矩陣為:

(1+2+4+5)/4        (1+2+3+4+5+6)/6                (2+3+5+6)/4
(1+2+4+5+7+8)/6   (1+2+3+4+5+6+7+8+9)/9    (2+3+5+6+8+9)/6
(4+5+7+8)/5 (4+5+6+7+8+9)/6 (5+6+8+9)/4

  當矩陣非常大的時候,我們需要把矩陣轉為三元組的形式<x,y,value>,這樣方便map-reduce的處理,否則,矩陣非常大,矩陣的一行就能爆內存。比如上面的矩陣轉化為三元組后變為:

0,0,1
0,1,2
0,2,3
1,0,4
1,1,5
1,2,6
2,0,7
2,1,8
2,2,9

  現在我們處理map,先理清map階段需要發射的key和value是什么,map掃描每一行得到的是矩陣的一個元素,而這個元素會參與周圍所有元素的均值計算,因此我們對每一個<x,y,value>輸出的鍵值對是(<xi,yj,>,<x,y,value>),<xi,yj,>是<x,y>周圍的坐標,map輸出后,按xi,yj順序排序,對所有key值相同的元素累加並求平均值,mapper.py:

 1 #!encoding=utf-8
 2 from sys import stdin
 3 
 4 M = 3#矩陣的行數
 5 N = 3#矩陣的列數
 6 
 7 for line in stdin:
 8     data = line.strip().split(',')
 9     x,y = map(int,data[0:2])
10     value = float(data[2])
11     for i in xrange(-1,2):
12         if x + i < 0 or x + i >= M:#超出上下邊界
13             continue
14         for j in xrange(-1,2):
15             if y + j < 0 or y + j >= N:#超出左右邊界
16                 continue
17             print '%d,%d\t%f' % (x+i,y+j,value)#為周圍的每個值計算提供貢獻

   執行mapper.py,排序時注意一下,字段分割符是制表符'\t',實際上hadoop的默認分割符就是制表符:

$ cat data.txt |python mapper.py |sort -t$'\t'

 mapper.py輸出的中間過程比較長,我們看一下部分結果:

0,0    1.000000
0,0    2.000000
0,0    4.000000
0,0    5.000000
0,1    1.000000
0,1    2.000000
0,1    3.000000
0,1    4.000000
0,1    5.000000
0,1    6.000000
0,2    2.000000
0,2    3.000000
0,2    5.000000
0,2    6.000000

  接下來寫reducer.py,在reduce階段依舊是掃描每一行,並判斷是否與上一行的key相同,如果相同就累加value,如果不同,則計算平均值,並輸出到標准輸出:

 1 from sys import stdin
 2 last = None
 3 count = 0
 4 s = 0
 5 for line in stdin:
 6     p,v = line.strip().split('\t')
 7     value = float(v)
 8     if last != p:
 9         if last:
10             print '%5s,%10f' % (last,s/count)
11         last = p
12         s = 0
13         count = 0
14     s += value
15     count += 1
16 if last:
17       print '%5s,%10f' % (last,s/count)

  連接map-reduce執行:

$ cat data.txt |python mapper.py |sort -t$'\t'|python reducer.py

  輸出如下:  

  0,0,  3.000000
  0,1,  3.500000
  0,2,  4.000000
  1,0,  4.500000
  1,1,  5.000000
  1,2,  5.500000
  2,0,  6.000000
  2,1,  6.500000
  2,2,  7.000000

  其實這里,矩陣連續迭代到max(M,N)+3次之后,矩陣的每個元素都將趨向於整個矩陣的平均值,因為經過max(M,N)+3次迭代后,最左邊的元素已經傳遞到了最右邊,最上面的元素也已經傳遞到了最下邊,作為一個單機版的測試,我寫了迭代測試:

 1 from os import system
 2 
 3 lastMatrix = 'data.txt'
 4 matrix = None
 5 for i in xrange(1,4+3):
 6     if matrix != None:
 7         lastMatrix = matrix
 8     matrix = 'data.'+str(i)
 9     print i
10     cmd = 'cat '+lastMatrix+"|python mapper.py |sort -t'\t'|python reducer.py >"+matrix
11     system(cmd)
12 
13 system('cat '+matrix)

  迭代6次后輸出: 

  0,0,  4.937500
  0,1,  4.953125
  0,2,  4.968750
  1,0,  4.984375
  1,1,  5.000000
  1,2,  5.015625
  2,0,  5.031250
  2,1,  5.046875
  2,2,  5.062500

  可以看到已經基本接近矩陣均值5了。

四、總結

  上面介紹了三個實例,用map-reduce來處理數據,可以看到,按照這種思路寫代碼,不僅思路清晰,而且擴展性強,因為背后有hadoop在支撐。需要注意的是,上面的map-reduce代碼在hadoop中運行時,應該對每一行的處理加上一個try...except,過濾掉那些異常數據,因為海量數據里總是有一些噪聲數據,如果沒有try-except,hadoop雖然在失敗時會重試,但重試多次后任然失敗,那整個任務就會失敗了。

  最后,完整的代碼在github上可以查看。

轉載請注明出處:http://www.cnblogs.com/fengfenggirl/

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM