錯誤、調試和測試
程序運行中,可能會遇到BUG、用戶輸入異常數據以及其它環境的異常,這些都需要程序猿進行處理。Python提供了一套內置的異常處理機制,供程序猿使用,同時PDB提供了調試代碼的功能,除此之外,程序猿還應該掌握測試的編寫,確保程序的運行符合預期。
錯誤處理
在一般程序處理中,可以對函數的返回值進行檢查,是否返回了約定的錯誤碼。例如系統程序調用的錯誤碼一般都是-1
,成功返回0
。但是這種方式必須用大量的代碼來判斷是否出錯,所以高級語言內置了try...except...finally
的錯誤機制。
try:
print('try...')
r = 10 / int('2')
print('result:', r)
except ValueError as e:
print('ValueError:', e)
except ZeroDivisionError as e:
print('ZeroDivisionError:', e)
else:
print('no error!')
finally:
print('finally...')
print(‘END’)
當我們認為某些代碼可能會出錯時,就可以用try
來運行這段代碼,如果執行出錯,則后續代碼不會繼續執行,而是直接跳轉至錯誤處理代碼,即except
語句塊,執行完except
后,如果有finally
語句塊,則執行finally
語句塊,至此,執行完畢。如果發生了不同的錯誤類型,可以由不同的except語句塊處理,可以沒有finally
語句塊。
Python的錯誤也是類,所有的錯誤類型都繼承自BaseException
,常見的錯誤類型和繼承關系參考 官方文檔
使用
try...except
捕獲錯誤還有一個巨大的好處,就是可以跨越多層調用,比如函數main()調用foo(),foo()調用bar(),結果bar()出錯了,這時,只要main()捕獲到了,就可以處理。
調用堆棧
如果錯誤沒有被捕獲,它就會一直往上拋,最后被Python解釋器捕獲,打印一個錯誤信息,然后程序退出。出錯並不可怕,可怕的是不知道哪里出錯了。解讀錯誤信息是定位錯誤的關鍵。
記錄錯誤
在C語言中,如果發生錯誤想要記錄,必須自己編寫錯誤記錄的程序。Python內置的logging
模塊可以非常容易地記錄錯誤信息。通過配置,logging還可以把錯誤記錄到日志文件里,方便事后排查。
# err_logging.py
import logging
def foo(s):
return 10 / int(s)
def bar(s):
return foo(s) * 2
def main():
try:
bar('0')
except Exception as e:
logging.exception(e)
main()
print('END')
拋出錯誤
拋出錯誤,首先需要定義一個錯誤 Class,選擇好繼承關系,然后用raise
語句拋出一個錯誤實例。如果可以盡量使用Python內置的錯誤類型,僅在非常必要的時候自己定義錯誤類。
# err_raise.py
class FooError(ValueError):
pass
def foo(s):
n = int(s)
if n==0:
raise FooError('invalid value: %s' % s)
return 10 / n
foo('0')
調試 Debug
調試最簡單的辦法就是print()
,這個方法最簡單,但是在發布的時候需要把所有的調試信息注釋掉。
斷言 assert
凡是用print()
來輔助查看的地方,都可以用斷言(assert
)來替代。
def foo(s):
n = int(s)
assert n != 0, 'n is zero!'
return 10 / n
def main():
foo('0')
assert的意思是,表達式n != 0應該是True,否則,根據程序運行的邏輯,后面的代碼肯定會出錯。如果斷言失敗,assert
語句本身就會拋出AssertionError
。
啟動Python解釋器時可以用-O參數來關閉assert。
logging
使用 logging 不僅可以拋出錯誤,還可以輸出到文件。
import logging
logging.basicConfig(level=logging.INFO)
s = '0'
n = int(s)
logging.info('n = %d' % n)
print(10 / n)
這就是logging的好處,它允許你指定記錄信息的級別,有debug,info,warning,error
等幾個級別,當我們指定level=INFO
時,logging.debug
就不起作用了。同理,指定level=WARNING后,debug和info就不起作用了。這樣一來,你可以放心地輸出不同級別的信息,也不用刪除,最后統一控制輸出哪個級別的信息。
logging的另一個好處是通過簡單的配置,一條語句可以同時輸出到不同的地方,比如console和文件。
pdb
可以在命令行下使用pdb,啟動Python的調試器pdb,讓程序以單步方式運行,可以隨時查看運行狀態。
# err.py
s = '0'
n = int(s)
print(10 / n)
$ python3 -m pdb err.py
> /Users/michael/Github/learn-python3/samples/debug/err.py(2)<module>()
-> s = ‘0'
輸入1
可以查看代碼,輸入n
可以單步執行代碼。使用p
來查看變量,使用q
退出調試。
pdb.set_trace()
這個方法也是用pdb,但是不需要單步執行,我們只需要import pdb,然后,在可能出錯的地方放一個pdb.set_trace(),就可以設置一個斷點。運行代碼,程序會自動在pdb.set_trace()暫停並進入pdb調試環境,可以用命令p查看變量,或者用命令c
繼續運行。
單元測試
單元測試是用來對一個模塊、一個函數或者一個類來進行正確性檢驗的測試工作。
文檔測試
doctest非常有用,不但可以用來測試,還可以直接作為示例代碼。通過某些文檔生成工具,就可以自動把包含doctest的注釋提取出來。用戶看文檔的時候,同時也看到了doctest。
IO編程
IO就是Input / Output ,也就是輸入和輸出。IO編程中,Stream(流)是一個很重要的概念,可以把流想象成一個水管,數據就是水管里的水,但是只能單向流動。
由於計算機各個部件之間的速度不一致,所以處理IO問題時有兩種辦法:同步IO、異步IO。同步和異步的區別就在於是否等待IO執行的結果。
文件讀寫
讀寫文件是最常見的IO操作。Python內置了讀寫文件的函數,用法和C是兼容的。在磁盤上讀寫文件的功能都是由操作系統提供的,現代操作系統不允許普通的程序直接操作磁盤,所以,讀寫文件就是請求操作系統打開一個文件對象(通常稱為文件描述符),然后,通過操作系統提供的接口從這個文件對象中讀取數據(讀文件),或者把數據寫入這個文件對象(寫文件)。
讀文件
try:
f = open('/path/to/file', 'r')
print(f.read())
finally:
if f:
f.close()
with open('/path/to/file', 'r') as f:
print(f.read())
類似於c語言,open函數默認接收一個文件名、一個打開模式參數(r
、w
默認對應文本文件,rb
對應二進制文件)。默認打開的是UTF-8編碼的文件,如果需要打開其它編碼的,需要傳入encoding
參數,如果文本的編碼不一致可能導致讀取出錯,可以傳入錯誤處理參數errors
。read
方法一次將文件的所有內容讀入內存,可以通過參數指定讀入的長度read(size)
,也可以使用readline
方法每次讀入一行,使用readlines
一次讀入所有的行。文件使用后注意要進行關閉。
寫文件
>>> f = open('/Users/michael/test.txt', 'w')
>>> f.write('Hello, world!')
>>> f.close()
with open('/Users/michael/test.txt', 'w') as f:
f.write('Hello, world!’)
寫文件和讀文件是一樣的,唯一區別是調用open()函數時,傳入標識符’w’
或者’wb’
表示寫文本文件或寫二進制文件。當我們寫文件時,操作系統往往不會立刻把數據寫入磁盤,而是放到內存緩存起來,空閑的時候再慢慢寫入。只有調用close()方法時,操作系統才保證把沒有寫入的數據全部寫入磁盤。
StringIO 和 BytesIO
很多時候,數據讀寫不一定是文件,也可以在內存中讀寫。StringIO顧名思義就是在內存中讀寫str。
>>> from io import StringIO
>>> f = StringIO()
>>> f.write('hello')
5
>>> f.write(' ')
1
>>> f.write('world!')
6
>>> print(f.getvalue())
hello world!
>>> from io import StringIO
>>> f = StringIO('Hello!\nHi!\nGoodbye!')
>>> while True:
... s = f.readline()
... if s == '':
... break
... print(s.strip())
...
Hello!
Hi!
Goodbye!
StringIO操作的只能是str,如果要操作二進制數據,就需要使用BytesIO。BytesIO實現了在內存中讀寫bytes。
操作文件和目錄
Python內置的os
模塊也可以直接調用操作系統提供的接口函數。import os模塊后,就可以調用一些系統命令。
>>> import os
>>> os.name # 操作系統類型
'posix'
>>> os.uname()
posix.uname_result(sysname='Darwin', nodename='RousseaudeMacBook-Pro.local', release='15.6.0', version='Darwin Kernel Version 15.6.0: Mon Jan 9 23:07:29 PST 2017; root:xnu-3248.60.11.2.1~1/RELEASE_X86_64', machine='x86_64')
>>> os.environ
environ({'TERM_PROGRAM': 'Apple_Terminal', 'SHELL': '/bin/bash', 'TERM': 'xterm-256color', 'TMPDIR': '/var/folders/95/zrdts1md6j942mpyd7kd875h0000gn/T/', 'Apple_PubSub_Socket_Render': '/private/tmp/com.apple.launchd.fhDfjTsyk6/Render', 'TERM_PROGRAM_VERSION': '361.1', 'OLDPWD': '/Users/rousseau/Projects/python.my', 'TERM_SESSION_ID': '5A1B275C-3BE5-4673-B163-29DFF5C19C77', 'USER': 'rousseau', 'SSH_AUTH_SOCK': '/private/tmp/com.apple.launchd.mLtAPJeOFm/Listeners', '__CF_USER_TEXT_ENCODING': '0x1F5:0x0:0x0', 'PATH': '/usr/local/bin:/usr/bin:/bin:/usr/sbin:/sbin', 'PWD': '/Users/rousseau/Projects/python.my/mypython', 'XPC_FLAGS': '0x0', 'XPC_SERVICE_NAME': '0', 'SHLVL': '1', 'HOME': '/Users/rousseau', 'LOGNAME': 'rousseau', 'LC_CTYPE': 'UTF-8', '_': '/usr/local/bin/python3', '__PYVENV_LAUNCHER__': '/usr/local/bin/python3’})
>>> os.path.abspath('.') # 查看當前目錄的絕對路徑:
'/Users/rousseau/Projects/python.my/mypython’
# 在某個目錄下創建一個新目錄,首先把新目錄的完整路徑表示出來:
>>> os.path.join('/Users/michael', 'testdir')
'/Users/michael/testdir'
# 然后創建一個目錄:
>>> os.mkdir('/Users/michael/testdir')
# 刪掉一個目錄:
>>> os.rmdir('/Users/michael/testdir’)
因為Windows和Unix的路徑表達方式不一樣,所以在處理路徑時,盡量使用Python提供的os.path.join()
和os.path.split()
避免處理發生問題。其它的文件處理函數os.rename
、os.remove
。
序列化
序列號我理解的就是將內存中變量的狀態和值轉換為文本,以方便進行持久化的存儲,也可能不進行存儲,但是序列話之后方便進行傳輸。我們把變量從內存中變成可存儲或傳輸的過程稱之為序列化,在Python中叫pickling,在其他語言中也被稱之為serialization,marshalling,flattening等等,都是一個意思。反過來,把變量內容從序列化的對象重新讀到內存里稱之為反序列化,即unpickling。
Python提供了pickle
模塊來實現序列化。
>>> import pickle
>>> d = dict(name='Bob', age=20, score=88)
>>> pickle.dumps(d)
b'\x80\x03}q\x00(X\x03\x00\x00\x00ageq\x01K\x14X\x05\x00\x00\x00scoreq\x02KXX\x04\x00\x00\x00nameq\x03X\x03\x00\x00\x00Bobq\x04u.’
#也可以將序列化的內容寫入文本
>>> f = open('dump.txt', 'wb')
>>> pickle.dump(d, f)
>>> f.close()
#讀取的過程
>>> f = open('dump.txt', 'rb')
>>> d = pickle.load(f)
>>> f.close()
>>> d
{'age': 20, 'score': 88, 'name': ‘Bob’}
JSON
我們要在不同的編程語言之間傳遞對象,就必須把對象序列化為標准格式,比如XML,但更好的方法是序列化為JSON,因為JSON表示出來就是一個字符串,可以被所有語言讀取,也可以方便地存儲到磁盤或者通過網絡傳輸。JSON不僅是標准格式,並且比XML更快,而且可以直接在Web頁面中讀取,非常方便。
JSON表示的對象就是標准的JavaScript語言的對象,JSON和Python內置的數據類型對應如下:
JSON類型 | Python類型 |
---|---|
{} | dict |
[] | list |
“string” | str |
1234.56 | Int或Float |
true/false | True/False |
null | None |
Python內置的json模塊提供了非常完善的Python對象到JSON格式的轉換。
>>> json_str = '{"age": 20, "score": 88, "name": "Bob"}'
>>> json.loads(json_str)
{'age': 20, 'score': 88, 'name': ‘Bob’}
進程和線程
進程是程序運行的最小單位,線程是進程內部的子任務。多任務的實現模式:多進程、多線程、多進程+多線程。
多進程
Unix/Linux操作系統提供了一個fork()
系統調用,它非常特殊。普通的函數調用,調用一次,返回一次,但是fork()調用一次,返回兩次,因為操作系統自動把當前進程(稱為父進程)復制了一份(稱為子進程),然后,分別在父進程和子進程內返回。子進程永遠返回0,而父進程返回子進程的ID。這樣做的理由是,一個父進程可以fork出很多子進程,所以,父進程要記下每個子進程的ID,而子進程只需要調用getppid()就可以拿到父進程的ID。
import os
print('Process (%s) start...' % os.getpid())
# Only works on Unix/Linux/Mac:
pid = os.fork()
if pid == 0:
print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))
else:
print('I (%s) just created a child process (%s).' % (os.getpid(), pid))
由於Windows環境沒有fork
調用,為了編寫具備跨平台能力的代碼,建議使用Python提供的multiprocessing模塊。
multiprocessing
multiprocessing模塊提供了一個Process類來代表一個進程對象。創建子進程時,只需要傳入一個執行函數和函數的參數,創建一個Process實例,用start()
方法啟動,這樣創建進程比fork()還要簡單。join()
方法可以等待子進程結束后再繼續往下運行,通常用於進程間的同步。
from multiprocessing import Process
import os
# 子進程要執行的代碼
def run_proc(name):
print('Run child process %s (%s)...' % (name, os.getpid()))
if __name__=='__main__':
print('Parent process %s.' % os.getpid())
p = Process(target=run_proc, args=('test',))
print('Child process will start.')
p.start()
p.join()
print('Child process end.’)
Pool
可以使用進程池的方式,創建大量的子進程。對Pool對象調用join()方法會等待所有子進程執行完畢,調用join()之前必須先調用close()
,調用close()之后就不能繼續添加新的Process了。
from multiprocessing import Pool
import os, time, random
def long_time_task(name):
print('Run task %s (%s)...' % (name, os.getpid()))
start = time.time()
time.sleep(random.random() * 3)
end = time.time()
print('Task %s runs %0.2f seconds.' % (name, (end - start)))
if __name__=='__main__':
print('Parent process %s.' % os.getpid())
p = Pool(4)
for i in range(5):
p.apply_async(long_time_task, args=(i,))
print('Waiting for all subprocesses done...')
p.close()
p.join()
print('All subprocesses done.')
進程間通信
Python的multiprocessing模塊包裝了底層的機制,提供了Queue、Pipes等多種方式來交換數據。
from multiprocessing import Process, Queue
import os, time, random
# 寫數據進程執行的代碼:
def write(q):
print('Process to write: %s' % os.getpid())
for value in ['A', 'B', 'C']:
print('Put %s to queue...' % value)
q.put(value)
time.sleep(random.random())
# 讀數據進程執行的代碼:
def read(q):
print('Process to read: %s' % os.getpid())
while True:
value = q.get(True)
print('Get %s from queue.' % value)
if __name__=='__main__':
# 父進程創建Queue,並傳給各個子進程:
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
# 啟動子進程pw,寫入:
pw.start()
# 啟動子進程pr,讀取:
pr.start()
# 等待pw結束:
pw.join()
# pr進程里是死循環,無法等待其結束,只能強行終止:
pr.terminate()
多線程
多任務可以由多進程完成,也可以由一個進程內的多線程完成。一個進程至少有一個線程。由於線程是操作系統直接支持的執行單元,因此,高級語言通常都內置多線程的支持,Python也不例外,並且,Python的線程是真正的Posix Thread,而不是模擬出來的線程。
Python的標准庫提供了兩個模塊:_thread
和threading
,_thread是低級模塊,threading是高級模塊,對_thread進行了封裝。絕大多數情況下,我們只需要使用threading
這個高級模塊。
由於任何進程默認就會啟動一個線程,我們把該線程稱為主線程,主線程又可以啟動新的線程,Python的threading模塊有個current_thread()
函數,它永遠返回當前線程的實例。主線程實例的名字叫MainThread
,子線程的名字在創建時指定,如果不起名字Python就自動給線程命名為Thread-1,Thread-2……
Lock
多線程和多進程最大的不同在於,多進程中,同一個變量,各自有一份拷貝存在於每個進程中,互不影響,而多線程中,所有變量都由所有線程共享,所以,任何一個變量都可以被任何一個線程修改,因此,線程之間共享數據最大的危險在於多個線程同時改一個變量,把內容給改亂了。
要解決上述問題,需要通過加鎖來解決。創建一個鎖就是通過```threading.Lock()```來實現,當多個線程同時執行lock.acquire()時,只有一個線程能成功地獲取鎖,然后繼續執行代碼,其他線程就繼續等待直到獲得鎖為止。
鎖的好處就是確保了某段關鍵代碼只能由一個線程從頭到尾完整地執行,壞處當然也很多,首先是阻止了多線程並發執行,包含鎖的某段代碼實際上只能以單線程模式執行,效率就大大地下降了。其次,由於可以存在多個鎖,不同的線程持有不同的鎖,並試圖獲取對方持有的鎖時,可能會造成死鎖,導致多個線程全部掛起,既不能執行,也無法結束,只能靠操作系統強制終止。
import time, threading
# 假定這是你的銀行存款:
balance = 0
lock = threading.Lock()
def run_thread(n):
for i in range(100000):
# 先要獲取鎖:
lock.acquire()
try:
# 放心地改吧:
change_it(n)
finally:
# 改完了一定要釋放鎖:
lock.release()
def run_thread(n):
for i in range(100000):
change_it(n)
t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance)
多核CPU
Python的線程雖然是真正的線程,但解釋器執行代碼時,有一個GIL鎖:Global Interpreter Lock,任何Python線程執行前,必須先獲得GIL鎖,然后,每執行100條字節碼,解釋器就自動釋放GIL鎖,讓別的線程有機會執行。這個GIL全局鎖實際上把所有線程的執行代碼都給上了鎖,所以,多線程在Python中只能交替執行,即使100個線程跑在100核CPU上,也只能用到1個核。
GIL是Python解釋器設計的歷史遺留問題,通常我們用的解釋器是官方實現的CPython,要真正利用多核,除非重寫一個不帶GIL的解釋器。
ThreadLocal
在多線程環境中,每個線程處理數據最好使用局部變量,但是需要在不同線程間傳遞參數的時候,會變的很麻煩。ThreadLocal
提供了創建與線程名稱關聯的局部變量功能能。ThreadLocal
最常用的地方就是為每個線程綁定一個數據庫連接,HTTP請求,用戶身份信息等,這樣一個線程的所有調用到的處理函數都可以非常方便地訪問這些資源。
import threading
# 創建全局ThreadLocal對象:
local_school = threading.local()
def process_student():
# 獲取當前線程關聯的student:
std = local_school.student
print('Hello, %s (in %s)' % (std, threading.current_thread().name))
def process_thread(name):
# 綁定ThreadLocal的student:
local_school.student = name
process_student()
t1 = threading.Thread(target= process_thread, args=('Alice',), name='Thread-A')
t2 = threading.Thread(target= process_thread, args=('Bob',), name='Thread-B')
t1.start()
t2.start()
t1.join()
t2.join()
進程 VS 線程
要實現多任務,通常我們會設計Master-Worker模式,Master負責分配任務,Worker負責執行任務,因此,多任務環境下,通常是一個Master,多個Worker。
如果用多進程實現Master-Worker,主進程就是Master,其他進程就是Worker。
如果用多線程實現Master-Worker,主線程就是Master,其他線程就是Worker。
多進程模式最大的優點就是穩定性高,因為一個子進程崩潰了,不會影響主進程和其他子進程。(當然主進程掛了所有進程就全掛了,但是Master進程只負責分配任務,掛掉的概率低)著名的Apache最早就是采用多進程模式。
多進程模式的缺點是創建進程的代價大,在Unix/Linux系統下,用fork調用還行,在Windows下創建進程開銷巨大。另外,操作系統能同時運行的進程數也是有限的,在內存和CPU的限制下,如果有幾千個進程同時運行,操作系統連調度都會成問題。
多線程模式通常比多進程快一點,但是也快不到哪去,而且,多線程模式致命的缺點就是任何一個線程掛掉都可能直接造成整個進程崩潰,因為所有線程共享進程的內存。在Windows上,如果一個線程執行的代碼出了問題,你經常可以看到這樣的提示:“該程序執行了非法操作,即將關閉”,其實往往是某個線程出了問題,但是操作系統會強制結束整個進程。
計算密集型 vs. IO密集型
是否采用多任務的第二個考慮是任務的類型。我們可以把任務分為計算密集型和IO密集型。
計算密集型任務的特點是要進行大量的計算,消耗CPU資源,比如計算圓周率、對視頻進行高清解碼等等,全靠CPU的運算能力。這種計算密集型任務雖然也可以用多任務完成,但是任務越多,花在任務切換的時間就越多,CPU執行任務的效率就越低,所以,要最高效地利用CPU,計算密集型任務同時進行的數量應當等於CPU的核心數。
計算密集型任務由於主要消耗CPU資源,因此,代碼運行效率至關重要。Python這樣的腳本語言運行效率很低,完全不適合計算密集型任務。對於計算密集型任務,最好用C語言編寫。
第二種任務的類型是IO密集型,涉及到網絡、磁盤IO的任務都是IO密集型任務,這類任務的特點是CPU消耗很少,任務的大部分時間都在等待IO操作完成(因為IO的速度遠遠低於CPU和內存的速度)。對於IO密集型任務,任務越多,CPU效率越高,但也有一個限度。常見的大部分任務都是IO密集型任務,比如Web應用。
IO密集型任務執行期間,99%的時間都花在IO上,花在CPU上的時間很少,因此,用運行速度極快的C語言替換用Python這樣運行速度極低的腳本語言,完全無法提升運行效率。對於IO密集型任務,最合適的語言就是開發效率最高(代碼量最少)的語言,腳本語言是首選,C語言最差。
異步IO
考慮到CPU和IO之間巨大的速度差異,一個任務在執行的過程中大部分時間都在等待IO操作,單進程單線程模型會導致別的任務無法並行執行,因此,我們才需要多進程模型或者多線程模型來支持多任務並發執行。
現代操作系統對IO操作已經做了巨大的改進,最大的特點就是支持異步IO。如果充分利用操作系統提供的異步IO支持,就可以用單進程單線程模型來執行多任務,這種全新的模型稱為事件驅動模型,Nginx就是支持異步IO的Web服務器,它在單核CPU上采用單進程模型就可以高效地支持多任務。在多核CPU上,可以運行多個進程(數量與CPU核心數相同),充分利用多核CPU。由於系統總的進程數量十分有限,因此操作系統調度非常高效。用異步IO編程模型來實現多任務是一個主要的趨勢。
對應到Python語言,單進程的異步編程模型稱為協程。
分布式進程
Python的multiprocessing模塊不但支持多進程,其中managers子模塊還支持把多進程分布到多台機器上。一個服務進程可以作為調度者,將任務分布到其他多個進程中,依靠網絡通信。由於managers模塊封裝很好,不必了解網絡通信的細節,就可以很容易地編寫分布式多進程程序。
# task_master.py
import random, time, queue
from multiprocessing.managers import BaseManager
# 發送任務的隊列:
task_queue = queue.Queue()
# 接收結果的隊列:
result_queue = queue.Queue()
# 從BaseManager繼承的QueueManager:
class QueueManager(BaseManager):
pass
# 把兩個Queue都注冊到網絡上, callable參數關聯了Queue對象:
QueueManager.register('get_task_queue', callable=lambda: task_queue)
QueueManager.register('get_result_queue', callable=lambda: result_queue)
# 綁定端口5000, 設置驗證碼'abc':
manager = QueueManager(address=('', 5000), authkey=b'abc')
# 啟動Queue:
manager.start()
# 獲得通過網絡訪問的Queue對象:
task = manager.get_task_queue()
result = manager.get_result_queue()
# 放幾個任務進去:
for i in range(10):
n = random.randint(0, 10000)
print('Put task %d...' % n)
task.put(n)
# 從result隊列讀取結果:
print('Try get results...')
for i in range(10):
r = result.get(timeout=10)
print('Result: %s' % r)
# 關閉:
manager.shutdown()
print('master exit.’)
# task_worker.py
import time, sys, queue
from multiprocessing.managers import BaseManager
# 創建類似的QueueManager:
class QueueManager(BaseManager):
pass
# 由於這個QueueManager只從網絡上獲取Queue,所以注冊時只提供名字:
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')
# 連接到服務器,也就是運行task_master.py的機器:
server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)
# 端口和驗證碼注意保持與task_master.py設置的完全一致:
m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
# 從網絡連接:
m.connect()
# 獲取Queue的對象:
task = m.get_task_queue()
result = m.get_result_queue()
# 從task隊列取任務,並把結果寫入result隊列:
for i in range(10):
try:
n = task.get(timeout=1)
print('run task %d * %d...' % (n, n))
r = '%d * %d = %d' % (n, n, n*n)
time.sleep(1)
result.put(r)
except Queue.Empty:
print('task queue is empty.')
# 處理結束:
print('worker exit.’)