ipyparallel WordCount實現


          i pyparallel 之中,可以利用多個engine同時運行一個任務來加快處理的速度。在ipyparallel之中,集群被抽象為view,包括direct_view和balanced_view。其中,direct_view是所有的engine的抽象,當然也可以自行指定由哪些engine構成,而balanced_view是多個engine經過負載均衡之后,抽象出來的由“單一”engine構成的view。利用ipyparallel並行化的基本思路是將要處理的數據首先進行切分,然后分布到每一個engine上,然后將最終的處理結果合並,得到最終的結果,其思路和mapreduce類似。
         下面是一個ipyparallel的並行化wordcount實現,主要思路是:首先讀取文件中的句子。利用dview的scatter方法將所有的句子切分成n塊發送到每一個engine上,正好每一個engine一個。然后在每一個engine上對切分之后的句子統計詞頻,最后歸並所有engine處理之后的結果。
#!/usr/bin/env python
# coding: utf-8

import time

from itertools import repeat
from ipyparallel import Client, Reference
from urllib import urlretrieve
#對text進行wordcount處理
def wordfreq(text):
    """Return a dictionary of words and word counts in a string."""
    freqs = {}
    for word in text.split():
        lword = word.lower()
        freqs[lword] = freqs.get(lword, 0) + 1
    return freqs
#輸出詞頻前n個的單詞以及其出現的次數
def print_wordfreq(freqs, n=10):
    """Print the n most common words and counts in the freqs dict."""

    words, counts = freqs.keys(), freqs.values()
    items = zip(counts, words)
    items.sort(reverse=True)
    for (count, word) in items[:n]:
        print(word, count)

#自行實現的並行版本的word_freq,對若干行句子進行處理,返回詞,出現次數 鍵值對
def myword_freq(texts):
    freqs = {}
    for str in texts:
        for word in str.split():
            lword = word.lower()
            freqs[lword] = freqs.get(lword, 0) + 1
    return freqs
#自行實現的並行版本的wordfreq,首先將texts[]分散傳送至每一個engine,然后在每一個engine上執行程序myword_freq,返回求出的詞 詞頻鍵值對
def myPwordfreq(view,lines):
    #將文本平均分布在每一個engine上
    view.scatter('texts',lines,flatten=True)
    ar=view.apply(myword_freq,Reference('texts'))
    freqs_list=ar.get()
    #歸並最終的處理結果 reduce it!
    word_set=set()
    for f in freqs_list:
        word_set.update(f.keys())
    freqs=dict(zip(word_set,repeat(0)))
    for f in freqs_list:
        for word,count in f.items():
            freqs[word]+=count
    return freqs

if __name__ == '__main__':
    # Create a Client and View
    rc = Client()

    dview = rc[:]
    # Run the serial version
    print("Serial word frequency count:")
    text = open('lines.txt').read()
    tic = time.time()
    freqs = wordfreq(text)
    toc = time.time()
    print_wordfreq(freqs, 10)
    print("Took %.3f s to calculate"%(toc-tic))
    # The parallel version
    print("\nParallel word frequency count:")
    lines=text.splitlines()
    tic=time.time()
    pfreqs=myPwordfreq(dview,lines)
    toc=time.time()
    print_wordfreq(pfreqs)
    print("Took %.3f s to calculate"%(toc-tic))

 




免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2026 CODEPRJ.COM