問題描述:現在有n個文本文件,使用MapReduce的方法實現詞頻統計。
附上統計詞頻的關鍵代碼,首先是一個通用的MapReduce模塊:
class MapReduce:
__doc__ = '''提供map_reduce功能'''
@staticmethod
def map_reduce(i, mapper, reducer):
"""
map_reduce方法
:param i: 需要MapReduce的集合
:param mapper: 自定義mapper方法
:param reducer: 自定義reducer方法
:return: 以自定義reducer方法的返回值為元素的一個列表
"""
intermediate = [] # 存放所有的(intermediate_key, intermediate_value)
for (key, value) in i.items():
intermediate.extend(mapper(key, value))
# sorted返回一個排序好的list,因為list中的元素是一個個的tuple,key設定按照tuple中第幾個元素排序
# groupby把迭代器中相鄰的重復元素挑出來放在一起,key設定按照tuple中第幾個元素為關鍵字來挑選重復元素
# 下面的循環中groupby返回的key是intermediate_key,而group是個list,是1個或多個
# 有着相同intermediate_key的(intermediate_key, intermediate_value)
groups = {}
for key, group in itertools.groupby(sorted(intermediate, key=lambda im: im[0]), key=lambda x: x[0]):
groups[key] = [y for x, y in group]
# groups是一個字典,其key為上面說到的intermediate_key,value為所有對應intermediate_key的intermediate_value
# 組成的一個列表
return [reducer(intermediate_key, groups[intermediate_key]) for intermediate_key in groups]
然后需要針對詞頻統計這個實際問題寫好自己的mapper方法和reducer方法:
class WordCount:
__doc__ = '''詞頻統計'''
def mapper(self, input_key, input_value):
"""
詞頻統計的mapper方法
:param input_key: 文件名
:param input_value: 文本內容
:return: 以(詞,1)為元素的一個列表
"""
return [(word, 1) for word in
self.remove_punctuation(input_value.lower()).split()]
def reducer(self, intermediate_key, intermediate_value_list):
"""
詞頻統計的reducer方法
:param intermediate_key: 某個詞
:param intermediate_value_list: 出現記錄列表,如[1,1,1]
:return: (詞,詞頻)
"""
return intermediate_key, sum(intermediate_value_list)
@staticmethod
def remove_punctuation(text):
"""
去掉字符串中的標點符號
:param text: 文本
:return: 去掉標點的文本
"""
return re.sub(u"\p{P}+", "", text)
用3個文本文件進行測試:
text\a.tex:
The quick brown fox jumped over the lazy grey dogs.
text\b.txt:
That's one small step for a man, one giant leap for mankind.
text\c.txt:
Mary had a little lamb,
Its fleece was white as snow;
And everywhere that Mary went,
The lamb was sure to go.
調用如下:
filenames = ["text\\a.txt", "text\\b.txt", "text\\c.txt"]
i = {}
for filename in filenames:
f = open(filename)
i[filename] = f.read()
f.close()
wc = WordCount()
print(MapReduce.map_reduce(i, wc.mapper, wc.reducer))
輸出結果:
[('white', 1), ('little', 1), ('sure', 1), ('snow;', 1), ('went,', 1), ('as', 1), ('lamb,', 1), ('go.', 1), ('lamb', 1), ('its', 1), ('a', 1), ('was', 2), ('to', 1), ('fleece', 1), ('that', 1), ('the', 1), ('mary', 2), ('everywhere', 1), ('had', 1), ('and', 1)]
上面提出的方法只使用了最基本的MapReduce思想,所以不支持大數據量的測試,畢竟各種調度之類的內容沒有考慮到。