Python+MapReduce實現矩陣相乘


算法原理

map階段

在map階段,需要做的是進行數據准備。把來自矩陣A的元素aij,標識成p條<key, value>的形式,key="i,k",(其中k=1,2,...,p),value="a:j,aij";把來自矩陣B的元素bij,標識成m條<key, value>形式,key="k,j"(其中k=1,2,...,m),value="b:i,bij"。

經過處理,用於計算cij需要的a、b就轉變為有相同key("i,j")的數據對,通過value中"a:"、"b:"能區分元素是來自矩陣A還是矩陣B,以及具體的位置(在矩陣A的第幾列,在矩陣B的第幾行)。

shuffle階段  

這個階段是Hadoop自動完成的階段,具有相同key的value被分到同一個Iterable中,形成<key,Iterable(value)>對,再傳遞給reduce。

reduce階段

通過map數據預處理和shuffle數據分組兩個階段,reduce階段只需要知道兩件事就行:

<key,Iterable(value)>對經過計算得到的是矩陣C的哪個元素?因為map階段對數據的處理,key(i,j)中的數據對,就是其在矩陣C中的位置,第i行j列。
Iterable中的每個value來自於矩陣A和矩陣B的哪個位置?這個也在map階段進行了標記,對於value(x:y,z),只需要找到y相同的來自不同矩陣(即x分別為a和b)的兩個元素,取z相乘,然后加和即可。

過程如下圖所示:

算法實現

mapper.py

 1 #!/usr/bin/env python3
 2 import sys
 3 
 4 flag = 0       # 0表示輸入A、B矩陣信息,1表示處理A矩陣,2表示處理B矩陣
 5 row_a, col_a, row_b, col_b = 0, 0, 0, 0  # A、B矩陣shape
 6 current_row = 1  # 記錄現在處理矩陣的第幾行
 7 
 8 
 9 def read_input():
10     for lines in sys.stdin:
11         yield lines
12 
13 
14 if __name__ == '__main__':
15     for line in read_input():
16         if line.count('\n') == len(line):    # 去空行
17             pass
18         data = line.strip().split('\t')
19 
20         if flag == 0:
21             flag = 1
22             row_a = int(data[0])
23             col_a = int(data[1])
24             row_b = int(data[2])
25             col_b = int(data[3])
26             if row_a == 0 or row_b == 0 or col_a == 0 or col_b ==0 or col_a != row_b:
27                 print("矩陣輸入錯誤!")
28                 break
29 
30         elif flag == 1:
31             for i in range(col_b):
32                 for j in range(col_a):
33                     print("%s,%s\tA:%s,%s" % (current_row, i+1, j+1, data[j]))
34             current_row += 1
35             if current_row > row_a:
36                 flag = 2
37                 current_row = 1
38 
39         elif flag == 2:
40             for i in range(row_a):
41                 for j in range(col_b):
42                     print("%s,%s\tB:%s,%s" % (i+1, j+1, current_row, data[j]))
43             current_row += 1

reducer.py

這是我一開始所寫的版本。

 1 #!/usr/bin/env python3
 2 import sys
 3 
 4 
 5 last, now = None, None
 6 s = 0.0
 7 count = 0
 8 matrix_a, matrix_b = {}, {}
 9 
10 
11 def read_input():
12     for lines in sys.stdin:
13         yield lines
14 
15 
16 if __name__ == '__main__':
17     for line in read_input():
18         if line.count('\n') == len(line):    # 去空行
19             pass
20         data = line.strip().split('\t')
21         now = data[0]
22         if last is None:
23             last = now
24             count = 0
25         elif last != now:
26             for key in matrix_a:
27                 s += float(matrix_a[key])*float(matrix_b[key])
28             print("%s\t%s" % (last, s))
29             s = 0.0
30             count = 0
31             last = now
32 
33         value1 = data[1][0]
34         value2 = data[1].split(':')[1].split(',')[0]
35         value3 = data[1].split(',')[1]
36         if value1 == 'A':
37             count += 1
38             matrix_a[value2] = value3
39         else:
40             matrix_b[value2] = value3
41 
42     for key in matrix_a:
43         s += float(matrix_a[key])*float(matrix_b[key])
44     print("%s\t%s" % (last, s))

 后來借鑒參考了別人的代碼后,學習了groupby,下面的代碼就簡潔多了。

 1 #!/usr/bin/env python3
 2 import sys
 3 from itertools import groupby
 4 from operator import itemgetter
 5 
 6 
 7 def read_input(splitstr):
 8     for line in sys.stdin:
 9         line = line.strip()
10         if len(line) == 0:
11             continue
12         yield line.split(splitstr)
13 
14 
15 if __name__ == '__main__':
16     data = read_input('\t')
17     lstg = (groupby(data, itemgetter(0)))
18     try:
19         for flag, group in lstg:
20             matrix_a, matrix_b = {}, {}
21             total = 0.0
22             for element, g in group:
23                 matrix = g.split(':')[0]
24                 pos = g.split(':')[1].split(',')[0]
25                 value = g.split(',')[1]
26                 if matrix == 'A':
27                     matrix_a[pos] = value
28                 else:
29                     matrix_b[pos] = value
30             for key in matrix_a:
31                 total += float(matrix_a[key]) * float(matrix_b[key])
32             print("%s\t%s" % (flag, total))
33     except Exception:
34         pass

 

算法運行

執行結果為:

 

 

參考:

[1] 用MapReduce實現矩陣乘法

[2] python版mapreduce矩陣相乘

[3] MapReduce實現矩陣乘法


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM