Python中使用多進程來實現並行處理的方法小結


進程和線程是計算機軟件領域里很重要的概念,進程和線程有區別,也有着密切的聯系,先來辨析一下這兩個概念:

1.定義

進程是具有一定獨立功能的程序關於某個數據集合上的一次運行活動,進程是系統進行資源分配和調度的一個獨立單位.
線程是進程的一個實體,是CPU調度和分派的基本單位,它是比進程更小的能獨立運行的基本單位.線程自己基本上不擁有系統資源,只擁有一點在運行中必不可少的資源(如程序計數器,一組寄存器和棧),但是它可與同屬一個進程的其他的線程共享進程所擁有的全部資源.

2.關系

一個線程可以創建和撤銷另一個線程;同一個進程中的多個線程之間可以並發執行.

相對進程而言,線程是一個更加接近於執行體的概念,它可以與同進程中的其他線程共享數據,但擁有自己的棧空間,擁有獨立的執行序列。

3.區別

進程和線程的主要差別在於它們是不同的操作系統資源管理方式。進程有獨立的地址空間,一個進程崩潰后,在保護模式下不會對其它進程產生影響,而線程只是一個進程中的不同執行路徑。線程有自己的堆棧和局部變量,但線程之間沒有單獨的地址空間,一個線程死掉就等於整個進程死掉,所以多進程的程序要比多線程的程序健壯,但在進程切換時,耗費資源較大,效率要差一些。但對於一些要求同時進行並且又要共享某些變量的並發操作,只能用線程,不能用進程。

1) 簡而言之,一個程序至少有一個進程,一個進程至少有一個線程.

2) 線程的划分尺度小於進程,使得多線程程序的並發性高。

3) 另外,進程在執行過程中擁有獨立的內存單元,而多個線程共享內存,從而極大地提高了程序的運行效率。

4) 線程在執行過程中與進程還是有區別的。每個獨立的線程有一個程序運行的入口、順序執行序列和程序的出口。但是線程不能夠獨立執行,必須依存在應用程序中,由應用程序提供多個線程執行控制。

5) 從邏輯角度來看,多線程的意義在於一個應用程序中,有多個執行部分可以同時執行。但操作系統並沒有將多個線程看做多個獨立的應用,來實現進程的調度和管理以及資源分配。這就是進程和線程的重要區別。

4.優缺點

線程和進程在使用上各有優缺點:線程執行開銷小,但不利於資源的管理和保護;而進程正相反。同時,線程適合於在SMP機器上運行,而進程則可以跨機器遷移。

這篇文章主要講多進程在Python中的應用

Unix/Linux操作系統提供了一個fork()系統調用,它非常特殊。普通的函數調用,調用一次,返回一次,但是fork()調用一次,返回兩次,因為操作系統自動把當前進程(稱為父進程)復制了一份(稱為子進程),然后,分別在父進程和子進程內返回。

子進程永遠返回0,而父進程返回子進程的ID。這樣做的理由是,一個父進程可以fork出很多子進程,所以,父進程要記下每個子進程的ID,而子進程只需要調用getpid()就可以拿到父進程的ID。

python的os模塊封裝了常見的系統調用,其中就包括fork,可以在Python程序中輕松創建子進程:

1
2
3
4
5
6
7
8
9
import os
 
print ( 'Process (%s) start...' % os.getpid())
# Only works on Unix/Linux/Mac:
pid = os.fork()
if pid = = 0 :
   print ( 'I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))
else :
   print ( 'I (%s) just created a child process (%s).' % (os.getpid(), pid))

運行結果如下:

Process (876) start...
I (876) just created a child process (877).
I am child process (877) and my parent is 876.

由於Windows沒有fork調用,上面的代碼在Windows上無法運行。

有了fork調用,一個進程在接到新任務時就可以復制出一個子進程來處理新任務,常見的Apache服務器就是由父進程監聽端口,每當有新的http請求時,就fork出子進程來處理新的http請求。

multiprocessing

如果你打算編寫多進程的服務程序,Unix/linux無疑是正確的選擇。由於Windows沒有fork調用,難道在Windows上無法用Python編寫多進程的程序?

由於Python是跨平台的,自然也應該提供一個跨平台的多進程支持。multiprocessing模塊就是跨平台版本的多進程模塊。

multiprocessing模塊提供了一個Process類來代表一個進程對象,下面的例子演示了啟動一個子進程並等待其結束:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from multiprocessing import Process
import os
 
# 子進程要執行的代碼
def run_proc(name):
   print ( 'Run child process %s (%s)...' % (name, os.getpid()))
 
if __name__ = = '__main__' :
   print ( 'Parent process %s.' % os.getpid())
   p = Process(target = run_proc, args = ( 'test' ,))
   print ( 'Child process will start.' )
   p.start()
   p.join()
   print ( 'Child process end.' )

創建子進程時,只需要傳入一個執行函數和函數的參數,創建一個Process實例,用start()方法啟動,這樣創建進程比fork()還要簡單。

join()方法可以等待子進程結束后再繼續往下運行,通常用於進程間的同步。

Pool

如果要啟動大量的子進程,可以用進程池的方式批量創建子進程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from multiprocessing import Pool
import os, time, random
 
def long_time_task(name):
   print ( 'Run task %s (%s)...' % (name, os.getpid()))
   start = time.time()
   time.sleep(random.random() * 3 )
   end = time.time()
   print ( 'Task %s runs %0.2f seconds.' % (name, (end - start)))
 
if __name__ = = '__main__' :
   print ( 'Parent process %s.' % os.getpid())
   p = Pool( 4 )
   for i in range ( 5 ):
     p.apply_async(long_time_task, args = (i,))
   print ( 'Waiting for all subprocesses done...' )
   p.close()
   p.join()
   print ( 'All subprocesses done.' )

執行結果如下:

Parent process 669.
Waiting for all subprocesses done...
Run task 0 (671)...
Run task 1 (672)...
Run task 2 (673)...
Run task 3 (674)...
Task 2 runs 0.14 seconds.
Run task 4 (673)...
Task 1 runs 0.27 seconds.
Task 3 runs 0.86 seconds.
Task 0 runs 1.41 seconds.
Task 4 runs 1.91 seconds.
All subprocesses done.

代碼解讀:

對Pool對象調用join()方法會等待所有子進程執行完畢,調用join()之前必須先調用close(),調用close()之后就不能繼續添加新的Process了。

請注意輸出的結果,task 0,1,2,3是立刻執行的,而task 4要等待前面某個task完成后才執行,這是因為Pool的默認大小在我的電腦上是4,因此,最多同時執行4個進程。這是Pool有意設計的限制,並不是操作系統的限制。如果改成:

1
p = Pool( 5 )

就可以同時跑5個進程。

由於Pool的默認大小是CPU的核數,如果你不幸擁有8核CPU,你要提交至少9個子進程才能看到上面的等待效果。

子進程

很多時候,子進程並不是自身,而是一個外部進程。我們創建了子進程后,還需要控制子進程的輸入和輸出。

subprocess模塊可以讓我們非常方便地啟動一個子進程,然后控制其輸入和輸出。

下面的例子演示了如何在Python代碼中運行命令nslookup www.python.org,這和命令行直接運行的效果是一樣的:

1
2
3
4
5
import subprocess
 
print ( '$ nslookup www.python.org' )
r = subprocess.call([ 'nslookup' , 'www.python.org' ])
print ( 'Exit code:' , r)

運行結果:

$ nslookup www.python.org
Server:        192.168.19.4
Address:    192.168.19.4#53
Non-authoritative answer:
www.python.org    canonical name = python.map.fastly.net.
Name:    python.map.fastly.net
Address: 199.27.79.223
Exit code: 0

如果子進程還需要輸入,則可以通過communicate()方法輸入:

1
2
3
4
5
6
7
import subprocess
 
print ( '$ nslookup' )
p = subprocess.Popen([ 'nslookup' ], stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.PIPE)
output, err = p.communicate(b 'set q=mx\npython.org\nexit\n' )
print (output.decode( 'utf-8' ))
print ( 'Exit code:' , p.returncode)

上面的代碼相當於在命令行執行命令nslookup,然后手動輸入:

set q=mx
python.org
exit

進程間通信

Process之間肯定是需要通信的,操作系統提供了很多機制來實現進程間的通信。Python的multiprocessing模塊包裝了底層的機制,提供了Queue、Pipes等多種方式來交換數據。

我們以Queue為例,在父進程中創建兩個子進程,一個往Queue里寫數據,一個從Queue里讀數據:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
from multiprocessing import Process, Queue
import os, time, random
 
# 寫數據進程執行的代碼:
def write(q):
   print ( 'Process to write: %s' % os.getpid())
   for value in [ 'A' , 'B' , 'C' ]:
     print ( 'Put %s to queue...' % value)
     q.put(value)
     time.sleep(random.random())
 
# 讀數據進程執行的代碼:
def read(q):
   print ( 'Process to read: %s' % os.getpid())
   while True :
     value = q.get( True )
     print ( 'Get %s from queue.' % value)
 
if __name__ = = '__main__' :
   # 父進程創建Queue,並傳給各個子進程:
   q = Queue()
   pw = Process(target = write, args = (q,))
   pr = Process(target = read, args = (q,))
   # 啟動子進程pw,寫入:
   pw.start()
   # 啟動子進程pr,讀取:
   pr.start()
   # 等待pw結束:
   pw.join()
   # pr進程里是死循環,無法等待其結束,只能強行終止:
   pr.terminate()

運行結果如下:

Process to write: 50563
Put A to queue...
Process to read: 50564
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.

在Unix/Linux下,multiprocessing模塊封裝了fork()調用,使我們不需要關注fork()的細節。由於Windows沒有fork調用,因此,multiprocessing需要“模擬”出fork的效果,父進程所有Python對象都必須通過pickle序列化再傳到子進程去,所有,如果multiprocessing在Windows下調用失敗了,要先考慮是不是pickle失敗了。

小結

在Unix/Linux下,可以使用fork()調用實現多進程。

要實現跨平台的多進程,可以使用multiprocessing模塊。

進程間通信是通過Queue、Pipes等實現的。

多線程

多任務可以由多進程完成,也可以由一個進程內的多線程完成。進程是由若干線程組成的,一個進程至少有一個線程。

由於線程是操作系統直接支持的執行單元,因此,高級語言通常都內置多線程的支持,Python也不例外,並且,Python的線程是真正的Posix Thread,而不是模擬出來的線程。

Python的標准庫提供了兩個模塊:_thread 和 threading,_thread是低級模塊,threading是高級模塊,對_thread進行了封裝。絕大多數情況下,我們只需要使用threading這個高級模塊。

啟動一個線程就是把一個函數傳入並創建Thread實例,然后調用start()開始執行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import time, threading
 
# 新線程執行的代碼:
def loop():
   print ( 'thread %s is running...' % threading.current_thread().name)
   n = 0
   while n < 5 :
     n = n + 1
     print ( 'thread %s >>> %s' % (threading.current_thread().name, n))
     time.sleep( 1 )
   print ( 'thread %s ended.' % threading.current_thread().name)
 
print ( 'thread %s is running...' % threading.current_thread().name)
t = threading.Thread(target = loop, name = 'LoopThread' )
t.start()
t.join()
print ( 'thread %s ended.' % threading.current_thread().name)
thread MainThread is running...
thread LoopThread is running...
thread LoopThread >>> 1
thread LoopThread >>> 2
thread LoopThread >>> 3
thread LoopThread >>> 4
thread LoopThread >>> 5
thread LoopThread ended.
thread MainThread ended.

由於任何進程默認就會啟動一個線程,我們把該線程稱為主線程,主線程又可以啟動新的線程,Python的threading模塊有個current_thread()函數,它永遠返回當前線程的實例。主線程實例的名字叫MainThread,子線程的名字在創建時指定,我們用LoopThread命名子線程。名字僅僅在打印時用來顯示,完全沒有其他意義,如果不起名字Python就自動給線程命名為Thread-1,Thread-2……

Lock

多線程和多進程最大的不同在於,多進程中,同一個變量,各自有一份拷貝存在於每個進程中,互不影響,而多線程中,所有變量都由所有線程共享,所以,任何一個變量都可以被任何一個線程修改,因此,線程之間共享數據最大的危險在於多個線程同時改一個變量,把內容給改亂了。

來看看多個線程同時操作一個變量怎么把內容給改亂了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import time, threading
# 假定這是你的銀行存款:
balance = 0
def change_it(n):
   # 先存后取,結果應該為0:
   global balance
   balance = balance + n
   balance = balance - n
def run_thread(n):
   for i in range ( 100000 ):
     change_it(n)
t1 = threading.Thread(target = run_thread, args = ( 5 ,))
t2 = threading.Thread(target = run_thread, args = ( 8 ,))
t1.start()
t2.start()
t1.join()
t2.join()
print (balance)

我們定義了一個共享變量balance,初始值為0,並且啟動兩個線程,先存后取,理論上結果應該為0,但是,由於線程的調度是由操作系統決定的,當t1、t2交替執行時,只要循環次數足夠多,balance的結果就不一定是0了。

原因是因為高級語言的一條語句在CPU執行時是若干條語句,即使一個簡單的計算:

1
balance = balance + n

也分兩步:

  1. 計算balance + n,存入臨時變量中;
  2. 將臨時變量的值賦給balance。

也就是可以看成:

1
2
x = balance + n
balance = x

數據錯誤的原因:是因為修改balance需要多條語句,而執行這幾條語句時,線程可能中斷,從而導致多個線程把同一個對象的內容改亂了。

兩個線程同時一存一取,就可能導致余額不對,你肯定不希望你的銀行存款莫名其妙地變成了負數,所以,我們必須確保一個線程在修改balance的時候,別的線程一定不能改。

如果我們要確保balance計算正確,就要給change_it()上一把鎖,當某個線程開始執行change_it()時,我們說,該線程因為獲得了鎖,因此其他線程不能同時執行change_it(),只能等待,直到鎖被釋放后,獲得該鎖以后才能改。由於鎖只有一個,無論多少線程,同一時刻最多只有一個線程持有該鎖,所以,不會造成修改的沖突。創建一個鎖就是通過threading.Lock()來實現:

1
2
3
4
5
6
7
8
9
10
11
12
13
balance = 0
lock = threading.Lock()
 
def run_thread(n):
   for i in range ( 100000 ):
     # 先要獲取鎖:
     lock.acquire()
     try :
       # 放心地改吧:
       change_it(n)
     finally :
       # 改完了一定要釋放鎖:
       lock.release()

當多個線程同時執行lock.acquire()時,只有一個線程能成功地獲取鎖,然后繼續執行代碼,其他線程就繼續等待直到獲得鎖為止。

獲得鎖的線程用完后一定要釋放鎖,否則那些苦苦等待鎖的線程將永遠等待下去,成為死線程。所以我們用try...finally來確保鎖一定會被釋放。

鎖的好處就是確保了某段關鍵代碼只能由一個線程從頭到尾完整地執行,壞處當然也很多,首先是阻止了多線程並發執行,包含鎖的某段代碼實際上只能以單線程模式執行,效率就大大地下降了。其次,由於可以存在多個鎖,不同的線程持有不同的鎖,並試圖獲取對方持有的鎖時,可能會造成死鎖,導致多個線程全部掛起,既不能執行,也無法結束,只能靠操作系統強制終止。

多核CPU

如果你不幸擁有一個多核CPU,你肯定在想,多核應該可以同時執行多個線程。

如果寫一個死循環的話,會出現什么情況呢?

打開Mac OS X的Activity Monitor,或者Windows的Task Manager,都可以監控某個進程的CPU使用率。

我們可以監控到一個死循環線程會100%占用一個CPU。如果有兩個死循環線程,在多核CPU中,可以監控到會占用200%的CPU,也就是占用兩個CPU核心。要想把N核CPU的核心全部跑滿,就必須啟動N個死循環線程。

試試用Python寫個死循環:

1
2
3
4
5
6
7
8
9
10
import threading, multiprocessing
 
def loop():
   x = 0
   while True :
     x = x ^ 1
 
for i in range (multiprocessing.cpu_count()):
   t = threading.Thread(target = loop)
   t.start()

啟動與CPU核心數量相同的N個線程,在4核CPU上可以監控到CPU占用率僅有102%,也就是僅使用了一核。

但是用C、C++或Java來改寫相同的死循環,直接可以把全部核心跑滿,4核就跑到400%,8核就跑到800%,為什么Python不行呢?

因為Python的線程雖然是真正的線程,但解釋器執行代碼時,有一個GIL鎖:Global Interpreter Lock,任何Python線程執行前,必須先獲得GIL鎖,然后,每執行100條字節碼,解釋器就自動釋放GIL鎖,讓別的線程有機會執行。這個GIL全局鎖實際上把所有線程的執行代碼都給上了鎖,所以,多線程在Python中只能交替執行,即使100個線程跑在100核CPU上,也只能用到1個核。

GIL是Python解釋器設計的歷史遺留問題,通常我們用的解釋器是官方實現的CPython,要真正利用多核,除非重寫一個不帶GIL的解釋器。

所以,在Python中,可以使用多線程,但不要指望能有效利用多核。如果一定要通過多線程利用多核,那只能通過C擴展來實現,不過這樣就失去了Python簡單易用的特點。

不過,也不用過於擔心,Python雖然不能利用多線程實現多核任務,但可以通過多進程實現多核任務。多個Python進程有各自獨立的GIL鎖,互不影響。

多線程編程,模型復雜,容易發生沖突,必須用鎖加以隔離,同時,又要小心死鎖的發生。

Python解釋器由於設計時有GIL全局鎖,導致了多線程無法利用多核。

ThreadLocal

在多線程環境下,每個線程都有自己的數據。一個線程使用自己的局部變量比使用全局變量好,因為局部變量只有線程自己能看見,不會影響其他線程,而全局變量的修改必須加鎖。但是局部變量也有問題,就是在函數調用的時候,傳遞起來很麻煩:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import threading
 
# 創建全局ThreadLocal對象:
local_school = threading.local()
 
def process_student():
   # 獲取當前線程關聯的student:
   std = local_school.student
   print ( 'Hello, %s (in %s)' % (std, threading.current_thread().name))
 
def process_thread(name):
   # 綁定ThreadLocal的student:
   local_school.student = name
   process_student()
 
t1 = threading.Thread(target = process_thread, args = ( 'Alice' ,), name = 'Thread-A' )
t2 = threading.Thread(target = process_thread, args = ( 'Bob' ,), name = 'Thread-B' )
t1.start()
t2.start()
t1.join()
t2.join()

全局變量local_school就是一個ThreadLocal對象,每個Thread對它都可以讀寫student屬性,但互不影響。你可以把local_school看成全局變量,但每個屬性如local_school.student都是線程的局部變量,可以任意讀寫而互不干擾,也不用管理鎖的問題,ThreadLocal內部會處理。

可以理解為全局變量local_school是一個dict,不但可以用local_school.student,還可以綁定其他變量,如local_school.teacher等等。

ThreadLocal最常用的地方就是為每個線程綁定一個數據庫連接,HTTP請求,用戶身份信息等,這樣一個線程的所有調用到的處理函數都可以非常方便地訪問這些資源。

一個ThreadLocal變量雖然是全局變量,但每個線程都只能讀寫自己線程的獨立副本,互不干擾。ThreadLocal解決了參數在一個線程中各個函數之間互相傳遞的問題。

進程 vs. 線程

我們介紹了多進程和多線程,這是實現多任務最常用的兩種方式。現在,我們來討論一下這兩種方式的優缺點。

首先,要實現多任務,通常我們會設計Master-Worker模式,Master負責分配任務,Worker負責執行任務,因此,多任務環境下,通常是一個Master,多個Worker。

如果用多進程實現Master-Worker,主進程就是Master,其他進程就是Worker。

如果用多線程實現Master-Worker,主線程就是Master,其他線程就是Worker。

多進程模式最大的優點就是穩定性高,因為一個子進程崩潰了,不會影響主進程和其他子進程。(當然主進程掛了所有進程就全掛了,但是Master進程只負責分配任務,掛掉的概率低)著名的Apache最早就是采用多進程模式。

多進程模式的缺點是創建進程的代價大,在Unix/Linux系統下,用fork調用還行,在Windows下創建進程開銷巨大。另外,操作系統能同時運行的進程數也是有限的,在內存和CPU的限制下,如果有幾千個進程同時運行,操作系統連調度都會成問題。

多線程模式通常比多進程快一點,但是也快不到哪去,而且,多線程模式致命的缺點就是任何一個線程掛掉都可能直接造成整個進程崩潰,因為所有線程共享進程的內存。在Windows上,如果一個線程執行的代碼出了問題,你經常可以看到這樣的提示:“該程序執行了非法操作,即將關閉”,其實往往是某個線程出了問題,但是操作系統會強制結束整個進程。

在Windows下,多線程的效率比多進程要高,所以微軟的IIS服務器默認采用多線程模式。由於多線程存在穩定性的問題,IIS的穩定性就不如Apache。為了緩解這個問題,IIS和Apache現在又有多進程+多線程的混合模式,真是把問題越搞越復雜。

線程切換

無論是多進程還是多線程,只要數量一多,效率肯定上不去,為什么呢?

我們打個比方,假設你不幸正在准備中考,每天晚上需要做語文、數學、英語、物理、化學這5科的作業,每項作業耗時1小時。

如果你先花1小時做語文作業,做完了,再花1小時做數學作業,這樣,依次全部做完,一共花5小時,這種方式稱為單任務模型,或者批處理任務模型。

假設你打算切換到多任務模型,可以先做1分鍾語文,再切換到數學作業,做1分鍾,再切換到英語,以此類推,只要切換速度足夠快,這種方式就和單核CPU執行多任務是一樣的了,以幼兒園小朋友的眼光來看,你就正在同時寫5科作業。

但是,切換作業是有代價的,比如從語文切到數學,要先收拾桌子上的語文書本、鋼筆(這叫保存現場),然后,打開數學課本、找出圓規直尺(這叫准備新環境),才能開始做數學作業。操作系統在切換進程或者線程時也是一樣的,它需要先保存當前執行的現場環境(CPU寄存器狀態、內存頁等),然后,把新任務的執行環境准備好(恢復上次的寄存器狀態,切換內存頁等),才能開始執行。這個切換過程雖然很快,但是也需要耗費時間。如果有幾千個任務同時進行,操作系統可能就主要忙着切換任務,根本沒有多少時間去執行任務了,這種情況最常見的就是硬盤狂響,點窗口無反應,系統處於假死狀態。

所以,多任務一旦多到一個限度,就會消耗掉系統所有的資源,結果效率急劇下降,所有任務都做不好。

計算密集型 vs. IO密集型

是否采用多任務的第二個考慮是任務的類型。我們可以把任務分為計算密集型和IO密集型。

計算密集型任務的特點是要進行大量的計算,消耗CPU資源,比如計算圓周率、對視頻進行高清解碼等等,全靠CPU的運算能力。這種計算密集型任務雖然也可以用多任務完成,但是任務越多,花在任務切換的時間就越多,CPU執行任務的效率就越低,所以,要最高效地利用CPU,計算密集型任務同時進行的數量應當等於CPU的核心數。

計算密集型任務由於主要消耗CPU資源,因此,代碼運行效率至關重要。Python這樣的腳本語言運行效率很低,完全不適合計算密集型任務。對於計算密集型任務,最好用C語言編寫。

第二種任務的類型是IO密集型,涉及到網絡、磁盤IO的任務都是IO密集型任務,這類任務的特點是CPU消耗很少,任務的大部分時間都在等待IO操作完成(因為IO的速度遠遠低於CPU和內存的速度)。對於IO密集型任務,任務越多,CPU效率越高,但也有一個限度。常見的大部分任務都是IO密集型任務,比如Web應用。

IO密集型任務執行期間,99%的時間都花在IO上,花在CPU上的時間很少,因此,用運行速度極快的c語言替換用Python這樣運行速度極低的腳本語言,完全無法提升運行效率。對於IO密集型任務,最合適的語言就是開發效率最高(代碼量最少)的語言,腳本語言是首選,C語言最差。

異步IO

考慮到CPU和IO之間巨大的速度差異,一個任務在執行的過程中大部分時間都在等待IO操作,單進程單線程模型會導致別的任務無法並行執行,因此,我們才需要多進程模型或者多線程模型來支持多任務並發執行。

現代操作系統對IO操作已經做了巨大的改進,最大的特點就是支持異步IO。如果充分利用操作系統提供的異步IO支持,就可以用單進程單線程模型來執行多任務,這種全新的模型稱為事件驅動模型,Nginx就是支持異步IO的Web服務器,它在單核CPU上采用單進程模型就可以高效地支持多任務。在多核CPU上,可以運行多個進程(數量與CPU核心數相同),充分利用多核CPU。由於系統總的進程數量十分有限,因此操作系統調度非常高效。用異步IO編程模型來實現多任務是一個主要的趨勢。

對應到Python語言,單進程的異步編程模型稱為協程,有了協程的支持,就可以基於事件驅動編寫高效的多任務程序。我們會在后面討論如何編寫協程。

分布式進程

在Thread和Process中,應當優選Process,因為Process更穩定,而且,Process可以分布到多台機器上,而Thread最多只能分布到同一台機器的多個CPU上。

Python的multiprocessing模塊不但支持多進程,其中managers子模塊還支持把多進程分布到多台機器上。一個服務進程可以作為調度者,將任務分布到其他多個進程中,依靠網絡通信。由於managers模塊封裝很好,不必了解網絡通信的細節,就可以很容易地編寫分布式多進程程序。

舉個例子:如果我們已經有一個通過Queue通信的多進程程序在同一台機器上運行,現在,由於處理任務的進程任務繁重,希望把發送任務的進程和處理任務的進程分布到兩台機器上。怎么用分布式進程實現?

原有的Queue可以繼續使用,但是,通過managers模塊把Queue通過網絡暴露出去,就可以讓其他機器的進程訪問Queue了。

我們先看服務進程,服務進程負責啟動Queue,把Queue注冊到網絡上,然后往Queue里面寫入任務:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import random, time, queue
from multiprocessing.managers import BaseManager
 
# 發送任務的隊列:
task_queue = queue.Queue()
# 接收結果的隊列:
result_queue = queue.Queue()
 
# 從BaseManager繼承的QueueManager:
class QueueManager(BaseManager):
   pass
 
# 把兩個Queue都注冊到網絡上, callable參數關聯了Queue對象:
QueueManager.register( 'get_task_queue' , callable = lambda : task_queue)
QueueManager.register( 'get_result_queue' , callable = lambda : result_queue)
# 綁定端口5000, 設置驗證碼'abc':
manager = QueueManager(address = (' ', 5000), authkey=b' abc')
# 啟動Queue:
manager.start()
# 獲得通過網絡訪問的Queue對象:
task = manager.get_task_queue()
result = manager.get_result_queue()
# 放幾個任務進去:
for i in range ( 10 ):
   n = random.randint( 0 , 10000 )
   print ( 'Put task %d...' % n)
   task.put(n)
# 從result隊列讀取結果:
print ( 'Try get results...' )
for i in range ( 10 ):
   r = result.get(timeout = 10 )
   print ( 'Result: %s' % r)
# 關閉:
manager.shutdown()
print ( 'master exit.' )

當我們在一台機器上寫多進程程序時,創建的Queue可以直接拿來用,但是,在分布式多進程環境下,添加任務到Queue不可以直接對原始的task_queue進行操作,那樣就繞過了QueueManager的封裝,必須通過manager.get_task_queue()獲得的Queue接口添加。

然后,在另一台機器上啟動任務進程(本機上啟動也可以):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import time, sys, queue
from multiprocessing.managers import BaseManager
 
# 創建類似的QueueManager:
class QueueManager(BaseManager):
   pass
 
# 由於這個QueueManager只從網絡上獲取Queue,所以注冊時只提供名字:
QueueManager.register( 'get_task_queue' )
QueueManager.register( 'get_result_queue' )
 
# 連接到服務器,也就是運行task_master.py的機器:
server_addr = '127.0.0.1'
print ( 'Connect to server %s...' % server_addr)
# 端口和驗證碼注意保持與task_master.py設置的完全一致:
m = QueueManager(address = (server_addr, 5000 ), authkey = b 'abc' )
# 從網絡連接:
m.connect()
# 獲取Queue的對象:
task = m.get_task_queue()
result = m.get_result_queue()
# 從task隊列取任務,並把結果寫入result隊列:
for i in range ( 10 ):
   try :
     n = task.get(timeout = 1 )
     print ( 'run task %d * %d...' % (n, n))
     r = '%d * %d = %d' % (n, n, n * n)
     time.sleep( 1 )
     result.put(r)
   except Queue.Empty:
     print ( 'task queue is empty.' )
# 處理結束:
print ( 'worker exit.' )

任務進程要通過網絡連接到服務進程,所以要指定服務進程的IP。https://www.jb51.net/article/65112.htm

小結

Python的分布式進程接口簡單,封裝良好,適合需要把繁重任務分布到多台機器的環境下。

注意Queue的作用是用來傳遞任務和接收結果,每個任務的描述數據量要盡量小。比如發送一個處理日志文件的任務,就不要發送幾百兆的日志文件本身,而是發送日志文件存放的完整路徑,由Worker進程再去共享的磁盤上讀取文件。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM