算法原理
map階段
在map階段,需要做的是進行數據准備。把來自矩陣A的元素aij,標識成p條<key, value>的形式,key="i,k",(其中k=1,2,...,p),value="a:j,aij";把來自矩陣B的元素bij,標識成m條<key, value>形式,key="k,j"(其中k=1,2,...,m),value="b:i,bij"。
經過處理,用於計算cij需要的a、b就轉變為有相同key("i,j")的數據對,通過value中"a:"、"b:"能區分元素是來自矩陣A還是矩陣B,以及具體的位置(在矩陣A的第幾列,在矩陣B的第幾行)。
shuffle階段
這個階段是Hadoop自動完成的階段,具有相同key的value被分到同一個Iterable中,形成<key,Iterable(value)>對,再傳遞給reduce。
reduce階段
通過map數據預處理和shuffle數據分組兩個階段,reduce階段只需要知道兩件事就行:
<key,Iterable(value)>對經過計算得到的是矩陣C的哪個元素?因為map階段對數據的處理,key(i,j)中的數據對,就是其在矩陣C中的位置,第i行j列。
Iterable中的每個value來自於矩陣A和矩陣B的哪個位置?這個也在map階段進行了標記,對於value(x:y,z),只需要找到y相同的來自不同矩陣(即x分別為a和b)的兩個元素,取z相乘,然后加和即可。
過程如下圖所示:
算法實現
mapper.py
1 #!/usr/bin/env python3 2 import sys 3 4 flag = 0 # 0表示輸入A、B矩陣信息,1表示處理A矩陣,2表示處理B矩陣 5 row_a, col_a, row_b, col_b = 0, 0, 0, 0 # A、B矩陣shape 6 current_row = 1 # 記錄現在處理矩陣的第幾行 7 8 9 def read_input(): 10 for lines in sys.stdin: 11 yield lines 12 13 14 if __name__ == '__main__': 15 for line in read_input(): 16 if line.count('\n') == len(line): # 去空行 17 pass 18 data = line.strip().split('\t') 19 20 if flag == 0: 21 flag = 1 22 row_a = int(data[0]) 23 col_a = int(data[1]) 24 row_b = int(data[2]) 25 col_b = int(data[3]) 26 if row_a == 0 or row_b == 0 or col_a == 0 or col_b ==0 or col_a != row_b: 27 print("矩陣輸入錯誤!") 28 break 29 30 elif flag == 1: 31 for i in range(col_b): 32 for j in range(col_a): 33 print("%s,%s\tA:%s,%s" % (current_row, i+1, j+1, data[j])) 34 current_row += 1 35 if current_row > row_a: 36 flag = 2 37 current_row = 1 38 39 elif flag == 2: 40 for i in range(row_a): 41 for j in range(col_b): 42 print("%s,%s\tB:%s,%s" % (i+1, j+1, current_row, data[j])) 43 current_row += 1
reducer.py
這是我一開始所寫的版本。
1 #!/usr/bin/env python3 2 import sys 3 4 5 last, now = None, None 6 s = 0.0 7 count = 0 8 matrix_a, matrix_b = {}, {} 9 10 11 def read_input(): 12 for lines in sys.stdin: 13 yield lines 14 15 16 if __name__ == '__main__': 17 for line in read_input(): 18 if line.count('\n') == len(line): # 去空行 19 pass 20 data = line.strip().split('\t') 21 now = data[0] 22 if last is None: 23 last = now 24 count = 0 25 elif last != now: 26 for key in matrix_a: 27 s += float(matrix_a[key])*float(matrix_b[key]) 28 print("%s\t%s" % (last, s)) 29 s = 0.0 30 count = 0 31 last = now 32 33 value1 = data[1][0] 34 value2 = data[1].split(':')[1].split(',')[0] 35 value3 = data[1].split(',')[1] 36 if value1 == 'A': 37 count += 1 38 matrix_a[value2] = value3 39 else: 40 matrix_b[value2] = value3 41 42 for key in matrix_a: 43 s += float(matrix_a[key])*float(matrix_b[key]) 44 print("%s\t%s" % (last, s))
后來借鑒參考了別人的代碼后,學習了groupby,下面的代碼就簡潔多了。
1 #!/usr/bin/env python3 2 import sys 3 from itertools import groupby 4 from operator import itemgetter 5 6 7 def read_input(splitstr): 8 for line in sys.stdin: 9 line = line.strip() 10 if len(line) == 0: 11 continue 12 yield line.split(splitstr) 13 14 15 if __name__ == '__main__': 16 data = read_input('\t') 17 lstg = (groupby(data, itemgetter(0))) 18 try: 19 for flag, group in lstg: 20 matrix_a, matrix_b = {}, {} 21 total = 0.0 22 for element, g in group: 23 matrix = g.split(':')[0] 24 pos = g.split(':')[1].split(',')[0] 25 value = g.split(',')[1] 26 if matrix == 'A': 27 matrix_a[pos] = value 28 else: 29 matrix_b[pos] = value 30 for key in matrix_a: 31 total += float(matrix_a[key]) * float(matrix_b[key]) 32 print("%s\t%s" % (flag, total)) 33 except Exception: 34 pass
算法運行
執行結果為:
參考:
[1] 用MapReduce實現矩陣乘法
[3] MapReduce實現矩陣乘法