IPython並行計算工具


 

IPython並行計算工具

 


解決並行計算和分布式計算的問題

  • 運行解釋說明

    • 一直以來Python的並發問題都會被大家所詬病,正是因為全局解釋鎖的存在,導致其不能夠真正的做到並發的執行。所以,我們就需要ipyparallel的存在來幫助我們處理並發計算的問題了。
    • ipyparallel中,可以利用多個engine同時運行一個任務來加快處理的速度。集群被抽象為view,包括direct_viewbalanced_view。其中,direct_view是所有的engine的抽象,當然也可以自行指定由哪些engine構成,而balanced_view是多個engine經過負載均衡之后,抽象出來的由“單一”engine構成的view。利用ipyparallel並行化的基本思路是將要處理的數據首先進行切分,然后分布到每一個engine上,然后將最終的處理結果合並,得到最終的結果,其思路和mapreduce類似。
  • 並行計算分類

    • ipcluster - 單機並行計算
    • ipyparallel - 分布式計算
  • 相關連接地址

  • 安裝方式

 
bash
# 使用pip安裝 $ pip install ipyparallel
  • 配置並行環境
 
bash
# 命令可以簡單的創建一個通用的並行環境profile配置文件 $ ipython profile create --parallel --profile=myprofile

1. 並行計算示例

做一次wordcount的計算測試。

  • 數據來源地址
 
bash
# 使用wget下載 $ wget http://www.gutenberg.org/files/27287/27287-0.txt
  • 不並行的版本
 
python
In [1]: import re In [2]: import io In [3]: from collections import defaultdict In [4]: non_word = re.compile(r'[\W\d]+', re.UNICODE) In [5]: common_words = { ...: 'the','of','and','in','to','a','is','it','that','which','as','on','by', ...: 'be','this','with','are','from','will','at','you','not','for','no','have', ...: 'i','or','if','his','its','they','but','their','one','all','he','when', ...: 'than','so','these','them','may','see','other','was','has','an','there', ...: 'more','we','footnote', 'who', 'had', 'been', 'she', 'do', 'what', ...: 'her', 'him', 'my', 'me', 'would', 'could', 'said', 'am', 'were', 'very', ...: 'your', 'did', 'not', ...: } In [6]: def yield_words(filename): ...: import io ...: with io.open(filename, encoding='latin-1') as f: ...: for line in f: ...: for word in line.split(): ...: word = non_word.sub('', word.lower()) ...: if word and word not in common_words: ...: yield word ...: In [7]: def word_count(filename): ...: word_iterator = yield_words(filename) ...: counts = {} ...: counts = defaultdict(int) ...: while True: ...: try: ...: word = next(word_iterator) ...: except StopIteration: ...: break ...: else: ...: counts[word] += 1 ...: return counts ...: In [8]: %time counts = word_count(filename) CPU times: user 3.32 ms, sys: 1.4 ms, total: 4.72 ms Wall time: 10.9 ms
  • 用 IPython 來跑一下
 
python
# 在terminal輸入如下命令,然后在ipython中就都可使用並行計算 # 指定兩個核心來執行 [escape@loaclhost ~]$ ipcluster start -n 2
  • 先講下 IPython 並行計算的用法
 
python
# import之后才能用%px*的magic In [1]: from IPython.parallel import Client In [2]: rc = Client() # 因為我啟動了2個進程 In [3]: rc.ids Out[3]: [0, 1] # 如果不自動每句都需要: `%px xxx` In [4]: %autopx %autopx enabled # 這里沒autopx的話需要: `%px import os` In [5]: import os # 2個進程的pid In [6]: print os.getpid() [stdout:0] 62638 [stdout:1] 62636 # 在autopx下這個magic不可用 In [7]: %pxconfig --targets 1 [stderr:0] ERROR: Line magic function `%pxconfig` not found. [stderr:1] ERROR: Line magic function `%pxconfig` not found. # 再執行一次就會關閉autopx In [8]: %autopx %autopx disabled # 指定目標對象, 這樣下面執行的代碼就會只在第2個進程下運行 In [10]: %pxconfig --targets 1 # 其實就是執行一段非阻塞的代碼 In [11]: %%px --noblock ....: import time ....: time.sleep(1) ....: os.getpid() ....: Out[11]: <AsyncResult: execute> # 看只返回了第二個進程的pid In [12]: %pxresult Out[1:21]: 62636 # 使用全部的進程, ipython可以細粒度的控制那個engine執行的內容 In [13]: v = rc[:] # 每個進程都導入time模塊 In [14]: with v.sync_imports(): ....: import time ....: importing time on engine(s) In [15]: def f(x): ....: time.sleep(1) ....: return x * x ....: # 同步的執行 In [16]: v.map_sync(f, range(10)) Out[16]: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81] # 異步的執行 In [17]: r = v.map(f, range(10)) # celery的用法 In [18]: r.ready(), r.elapsed Out[18]: (True, 5.87735) # 獲得執行的結果 In [19]: r.get() Out[19]: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
  • 並行版本
 
python
In [20]: def split_text(filename): ....: text = open(filename).read() ....: lines = text.splitlines() ....: nlines = len(lines) ....: n = 10 ....: block = nlines//n ....: for i in range(n): ....: chunk = lines[i*block:(i+1)*(block)] ....: with open('count_file%i.txt' % i, 'w') as f: ....: f.write('\n'.join(chunk)) ....: cwd = os.path.abspath(os.getcwd()) ....: # 不用glob是為了精准 ....: fnames = [ os.path.join(cwd, 'count_file%i.txt' % i) for i in range(n)] ....: return fnames In [21]: from IPython import parallel In [22]: rc = parallel.Client() In [23]: view = rc.load_balanced_view() In [24]: v = rc[:] In [25]: v.push(dict( ....: non_word=non_word, ....: yield_words=yield_words, ....: common_words=common_words ....: )) Out[25]: <AsyncResult: _push> In [26]: fnames = split_text(filename) In [27]: def count_parallel(): .....: pcounts = view.map(word_count, fnames) .....: counts = defaultdict(int) .....: for pcount in pcounts.get(): .....: for k, v in pcount.iteritems(): .....: counts[k] += v .....: return counts, pcounts .....: # 這個時間包含了我再聚合的時間 In [28]: %time counts, pcounts = count_parallel() # 是不是比直接運行少了很多時間 CPU times: user 50.6 ms, sys: 8.82 ms, total: 59.4 ms # 這個時間是 Wall time: 99.6 ms In [29]: pcounts.elapsed, pcounts.serial_time, pcounts.wall_time Out[29]: (0.104384, 0.13980499999999998, 0.104384)

可以看出cpu時間上確實減少了,幾乎一半,但真實時間上卻反而增加到了164ms,用%timeit 查看,發現實際使用時間反而多出了20ms這是因為cpu計算完后還要聚合結果。這個過程也得耗時,也就是說,並行是有額外開銷的。


2. 最簡單的應用

並行就是多個核心同時執行任務了,最簡單的就是執行重復任務,將函數提交到引擎中。

 
python
c = Client() a = lambda :"hi~"
 
python
# 並行計算 %time c[:].apply_sync(a) CPU times: user 22.6 ms, sys: 5.05 ms, total: 27.7 ms Wall time: 35.4 ms ['hi~', 'hi~', 'hi~', 'hi~']
 
python
# 使用列表生成器 %time [a() for i in range(2)] CPU times: user 10 µs, sys: 6 µs, total: 16 µs Wall time: 17.9 µs ['hi~', 'hi~']

看得出,cpython還是相當給力的,在這種小規模計算上並行反而比用列表生成器慢很多。


3. 直接調用 ipyparallel

我們可以通過DirectView直接在ipython中通過Client對象直接的操作多個engine

 
python
from ipyparallel import Client rc = Client() # 查看有多少個engine rc.ids [0, 1, 2, 3] # 使用全部engine dview = rc[:]
 
python
%time map(lambda x:x**2,range(32)) CPU times: user 21 µs, sys: 5 µs, total: 26 µs Wall time: 26.9 µs [0, 1, 4, 9, ..., 900, 961]
 
python
# 並行的map工具 %time dview.map_sync(lambda x:x**2,range(32)) CPU times: user 31.3 ms, sys: 5.12 ms, total: 36.4 ms Wall time: 41.4 ms [0, 1, 4, 9, ..., 900, 961]

看來還是單進程給力哇!


4. 負載均衡 view

並行的一大難題便是負載均衡,直接使用DirectView並沒有這方面優化,可以使用LoadBalancedView來使用負載均衡的view

 
python
lview = rc.load_balanced_view()
 
python
%time lview.map_sync(lambda x:x**2,range(32)) CPU times: user 230 ms, sys: 47.3 ms, total: 277 ms Wall time: 305 ms [0, 1, 4, 9, ..., 900, 961]

 文章作者: Escape
 版權聲明: 本博客所有文章除特別聲明外,均采用 CC BY 4.0 許可協議。轉載請注明來源 Escape !


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM