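In ipyparallel, several engines can work on one task at the same time to speed up processing. ipyparallel abstracts the cluster as a view, of which there are two kinds: direct_view and balanced_view. A direct_view represents all engines by default, although you can also specify which engines it is made up of. A balanced_view presents multiple engines, after load balancing, as a view that behaves like a "single" engine. The basic approach to parallelizing with ipyparallel is to split the data to be processed, distribute the pieces across the engines, and then merge the per-engine results into the final result, much like MapReduce.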
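The split, distribute, merge pattern described above can be tried without a running cluster. The following is a minimal sketch that uses a local thread pool from the Python standard library as a stand-in for the engines. The names `split_into_chunks`, `partial_sum_of_squares` and `parallel_sum_of_squares` are hypothetical, and no ipyparallel calls are involved.

```python
from concurrent.futures import ThreadPoolExecutor


def split_into_chunks(data, n):
    """Split data into n interleaved chunks (hypothetical helper)."""
    return [data[i::n] for i in range(n)]


def partial_sum_of_squares(chunk):
    """The per-worker task: process one chunk independently."""
    return sum(x * x for x in chunk)


def parallel_sum_of_squares(data, n_workers=4):
    # 1. Split the input into one chunk per worker.
    chunks = split_into_chunks(data, n_workers)
    # 2. Distribute: each worker processes its own chunk.
    with ThreadPoolExecutor(max_workers=n_workers) as pool:
        partials = list(pool.map(partial_sum_of_squares, chunks))
    # 3. Merge the partial results into the final answer.
    return sum(partials)


if __name__ == "__main__":
    print(parallel_sum_of_squares(list(range(10))))
```

With ipyparallel the same three steps would run on engines instead of local threads, but the shape of the computation stays the same.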
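Below is a parallel wordcount implemented with ipyparallel. The main idea is as follows. First, read the sentences from a file. Then use the scatter method of dview to split the sentences into n chunks, where n is the number of engines, and send exactly one chunk to each engine. Each engine counts word frequencies in its own chunk, and finally the results from all engines are merged.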
#!/usr/bin/env python
# coding: utf-8
import time
from itertools import repeat

from ipyparallel import Client, Reference


# Serial wordcount over a single string
def wordfreq(text):
    """Return a dictionary of words and word counts in a string."""
    freqs = {}
    for word in text.split():
        lword = word.lower()
        freqs[lword] = freqs.get(lword, 0) + 1
    return freqs


# Print the n most frequent words together with their counts
def print_wordfreq(freqs, n=10):
    """Print the n most common words and counts in the freqs dict."""
    # zip() returns an iterator in Python 3, so use sorted() rather than list.sort()
    items = sorted(zip(freqs.values(), freqs.keys()), reverse=True)
    for count, word in items[:n]:
        print(word, count)


# Per-engine task: process a list of lines and return word -> count pairs
def myword_freq(texts):
    freqs = {}
    for line in texts:  # renamed from "str" to avoid shadowing the builtin
        for word in line.split():
            lword = word.lower()
            freqs[lword] = freqs.get(lword, 0) + 1
    return freqs


# Parallel wordcount: scatter the lines to the engines, run myword_freq
# on every engine, then merge the per-engine word -> count dictionaries
def myPwordfreq(view, lines):
    # Spread the lines evenly across the engines. flatten is left at its
    # default so that an engine holding a single line still receives a list.
    view.scatter('texts', lines)
    ar = view.apply(myword_freq, Reference('texts'))
    freqs_list = ar.get()
    # Merge the partial results (the "reduce" step)
    word_set = set()
    for f in freqs_list:
        word_set.update(f.keys())
    freqs = dict(zip(word_set, repeat(0)))
    for f in freqs_list:
        for word, count in f.items():
            freqs[word] += count
    return freqs


if __name__ == '__main__':
    # Create a Client and View
    rc = Client()
    dview = rc[:]

    # Run the serial version
    print("Serial word frequency count:")
    with open('lines.txt') as f:
        text = f.read()
    tic = time.time()
    freqs = wordfreq(text)
    toc = time.time()
    print_wordfreq(freqs, 10)
    print("Took %.3f s to calculate" % (toc - tic))

    # The parallel version
    print("\nParallel word frequency count:")
    lines = text.splitlines()
    tic = time.time()
    pfreqs = myPwordfreq(dview, lines)
    toc = time.time()
    print_wordfreq(pfreqs)
    print("Took %.3f s to calculate" % (toc - tic))
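To check the split, count and merge logic without starting any engines, the same computation can be dry-run locally. This is only a sketch under stated assumptions: `split_contiguous` is a hypothetical stand-in for the scatter step (it assumes a contiguous, near-even split and is not ipyparallel's own code), and `count_words`, `merge_counts` and `local_wordcount` are illustrative names. The merge uses `collections.Counter` from the standard library in place of the manual loop in the script.

```python
from collections import Counter


def split_contiguous(items, n):
    """Split items into n contiguous chunks whose sizes differ by at most one.

    Hypothetical stand-in for the scatter step; not ipyparallel's own code.
    """
    base, extra = divmod(len(items), n)
    chunks, start = [], 0
    for i in range(n):
        size = base + (1 if i < extra else 0)
        chunks.append(items[start:start + size])
        start += size
    return chunks


def count_words(lines):
    """Count lower-cased words in a list of lines (the per-chunk task)."""
    counts = Counter()
    for line in lines:
        counts.update(word.lower() for word in line.split())
    return counts


def merge_counts(partials):
    """Merge per-chunk counts; Counter.update() adds counts together."""
    total = Counter()
    for partial in partials:
        total.update(partial)
    return total


def local_wordcount(lines, n_chunks):
    """Split, count each chunk, and merge, all in the current process."""
    chunks = split_contiguous(lines, n_chunks)
    partials = [count_words(chunk) for chunk in chunks]
    return merge_counts(partials)


if __name__ == "__main__":
    sample = ["The cat sat", "the dog sat", "A cat ran"]
    print(local_wordcount(sample, 2).most_common(3))
```

Because the merged counts do not depend on how the lines are chunked, comparing `local_wordcount(lines, n)` with `count_words(lines)` is a quick sanity check before moving the work onto a cluster.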
