Python之進程與線程


PS:我們知道現代操作系統比如Mac OS X,UNIX,Linux,Windows等,都是支持“多任務”的操作系統。多任務的實現共有3種方式:多進程模式;多線程模式;多進程+多線程模式。Python既支持多進程又支持多線程,下面我們將會討論如何編寫這兩種多任務程序。

參考原文

  廖雪峰Python進程和線程

多進程

  為了讓Python程序實現多進程(multiprocessing),我們先來了解操作系統在這方面的相關知識。

 fork

  Unix/Linx操作系統提供了一個fork(系統調用,它非常特殊,不同於普通的函數(調用一次,返回一次),fork()調用一次返回兩次。這是因為操作系統自動把當前進程(父進程)復制了一份(子進程),然后分別在父進程和子進程中返回。

  在子進程中永遠返回0,而在父進程中返回子進程的ID。這樣做是因為一個父進程可以fork出很多子進程,所以父進程要記下每個子進程的ID,而子進程只需要調用getppid()就可以拿到父進程的ID。

  在Python中的os模塊中封裝了常見的系統調用,其中就包括了fork,可以在Python程序中輕松創建出子進程:

import os

print('Process (%s) start...' % os.getpid())
# Only works on Unix/Linux/Mac:
pid = os.fork()
if pid == 0:
    print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))
else:
    print('I (%s) just created a child process (%s).' % (os.getpid(), pid))

  結果:

Process (4423) start...
I (4423) just created a child process (4424).
I am child process (4424) and my parent is 4423.
注意:windows沒有fork調用。有了fork調用一個進程在接到新任務時就可以復制出一個子進程來處理新任務。

 multiprocessing

  既然Windows沒有fork調用,那怎么在Windows上用Python編寫多進程的程序?因為Python是跨平台的,其中的multiprocessing模塊就是跨平台版本的多進程模塊。

  在multiprocessing模塊中提供了一個Process類來代表一個進程對象。下面的例子演示了啟動一個子進程並等待其結束:

from multiprocessing import Process
import os

# 子進程要執行的代碼
def run_proc(name):
    print('Run child process %s (%s)...' % (name, os.getpid()))

if __name__=='__main__':
    print('Parent process %s.' % os.getpid())
    p = Process(target=run_proc, args=('test',))
    print('Child process will start.')
    p.start()
    p.join()
    print('Child process end.')

'''
Parent process 20280.
Child process will start.
Run child process test (5772)...
Child process end.
'''
Tips:用multiprocessing創建子進程時,Process類代表一個進程,只需傳入子進程需執行的函數和參數,用start方法啟動子進程,join()方法可以等待子進程結束后再往下運行(通常用於進程間的同步)。

 Pool

  上面的方法都是啟動一個子進程,但是當我們要啟動大量的子進程時,怎么辦呢?可以用進程池的方式批量創建子進程:

from multiprocessing import Pool
import os, time, random

def long_time_task(name):
    print('Run task %s (%s)...' % (name,os.getpid()))
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print('Task %s runs %0.2f seconds.' % (name, (end - start)))

if __name__ == '__main__':
    print('Parent process %s.' % os.getpid())
    p = Pool(4)
    for i in range(5):
        p.apply_async(long_time_task, args=(i,))
    print('Waiting for all subprocess done...')
    p.close()
    p.join()
    print('All subprocesses done.')

'''
Parent process 896.
Waiting for all subprocess done...
Run task 0 (9728)...
Run task 1 (22216)...
Run task 2 (20572)...
Run task 3 (6844)...
Task 0 runs 0.36 seconds.
Run task 4 (9728)...
Task 4 runs 0.43 seconds.
Task 3 runs 0.94 seconds.
Task 1 runs 1.19 seconds.
Task 2 runs 2.72 seconds.
All subprocesses done.
'''
View Code

  注意apply_async是異步的,就是說子進程執行的同時,主進程繼續向下執行。所以“Waiting for all subprocesses done...”先打印出來,close方法意味着不能再添加新的Process了。對Pool對象調用join()方法,會暫停主進程,等待所有的子進程執行完,所以“All subprocesses done.”最后打印。

Tips:task4最后執行,是因為Pool的默認大小是4(CPU的核數),所以最多執行4個進程。當然這是Pool有意設計的限制,並不是操作系統的限制,你也可以自己改變它的默認大小,就可以跑不止4個進程。

 外部子進程

  上面的子進程的代碼實現都是在主進程內部的,然而很多時候,子進程都是一個外部進程,我們需要控制子進程的輸入和輸出。

  subprocess(可以在當前程序中執行其他程序或命令)模塊可以讓我們非常方便地啟動一個外部子進程,然后控制其輸入和輸出

import subprocess

print('$ nslookup www.python.org')
r = subprocess.call(['nslookup', 'www.python.org'])
print('Exit cod:', r)
'''
$ nslookup www.python.org
服務器:  ns.sc.cninfo.net
Address:  61.139.2.69

非權威應答:
名稱:    dualstack.python.map.fastly.net
Addresses:  2a04:4e42:36::223
          151.101.72.223
Aliases:  www.python.org

Exit cod: 0
'''

  上面的運行效果相當於在命令行直接輸入“nslookup www.python.org”(相當於開了一個進程)。

   如果子進程還需要通過手動輸入一些參數,那么可以通過communicate()方法輸入:

import subprocess

print('$ nslookup')
p = subprocess.Popen(['nslookup'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, err = p.communicate(b'set q=mx\npython.org\nexit\n')
print(output.decode('gbk'))
print('Exit code:', p.returncode)

'''
$ nslookup
默認服務器:  ns.sc.cninfo.net
Address:  61.139.2.69

> > 服務器:  ns.sc.cninfo.net
Address:  61.139.2.69

python.org      MX preference = 50, mail exchanger = mail.python.org

mail.python.org internet address = 188.166.95.178
mail.python.org AAAA IPv6 address = 2a03:b0c0:2:d0::71:1
>
Exit code: 0
'''
View Code

  上面的代碼相當於在命令行直接輸入nslookup,然后手動輸入:

set q=mx
python.org
exit

 進程間通信

  Process之間肯定是需要通信的,操作系統提供了很多機制來實現進程間的通信。Python的multiprocessing模塊包裝了底層的機制,提供了Queue(隊列)、Pipes(管道)等多種方式來交換數據。下面我們就以Queue為例,在父進程中創建兩個子進程,一個往Queue里寫數據,一個從Queue中讀數據:

from multiprocessing import Process, Queue
import os, time, random

#寫數據進程執行的代碼:
def write(q):
    print('Process to write: %s' % os.getpid())
    for value in ['A', 'B', 'C']:
        print('Put %s to queue...' % value)
        q.put(value)
        time.sleep(random.random())

#讀數據進程執行的代碼:
def read(q):
    print('Process to read: %s' % os.getpid())
    while True:
        value = q.get(True)
        print('Get %s from queue.' % value)

if __name__=='__main__':
    #父進程創建出Queue,並傳給各個子進程
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    #啟動子進程pw,寫入隊列
    pw.start()
    #啟動子進程pr,讀取隊列
    pr.start()
    #等待pw進程結束
    pw.join()
    #pr進程死循環,無法等待,只能強行終止;
    pr.terminate()

'''
Process to write: 8768
Put A to queue...
Process to read: 19700
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.
'''
View Code

  在Linux/Unix下,multiprocessing模塊分裝了fork調用,而由於Windows沒有fork調用,因此multiprocessing要想模擬出fork的效果。父進程中的所有Python對象都必須通過pickle序列化到子進程中,因此如果multiprocessing在Windows下調用失敗了,要先考慮是不是pickle失敗了。

多線程

  前面已經說過了多個任務既可以用多進程來實現,又可以用多線程來實現。那么多線程與多進程相比,有什么優點呢?線程共享相同的內存空間,不同的線程可以讀取內存中的同一變量(每個進程都有各自獨立的空間)。線程帶來的開銷要比進程小。

  由於線程是操作系統直接支持的執行單元,因此許多高級語言都內置了多線程的支持,Python也不例外,Python中的線程是真正的Posix Thread而不是模擬出來的線程。

  要實現多線程,Python的標准庫提供了兩個模塊:_threadthreading,前者是低級模塊,后者是高級模塊,后者分裝了前者。絕大多數情況下,我們只需要使用threading這個高級的模塊。

  啟動一個線程就是把一個函數傳入並創建Thread實例,然后調用start()執行:

import time, threading

#新線程執行的代碼
def loop():
    print('thread %s is runnging...' % threading.current_thread().name)
    n = 0
    while n < 5:
        n = n + 1
        print('thread %s >>> %s' % (threading.current_thread().name, n))
        time.sleep(1)
    print('thread %s ended.' % threading.current_thread().name)

print('thread %s is running...' % threading.current_thread().name)
t = threading.Thread(target=loop, name='LoopTread')
t.start()
t.join()
print('Thread %s ended.' % threading.current_thread().name)

'''thread MainThread is running...
thread LoopTread is runnging...
thread LoopTread >>> 1
thread LoopTread >>> 2
thread LoopTread >>> 3
thread LoopTread >>> 4
thread LoopTread >>> 5
thread LoopTread ended.
Thread MainThread ended.
'''
View Code

  由於任何進程都會默認啟動一個線程,我們就把這個線程稱為主線程,主線程又可以啟動新的線程。上面的current_thread()函數返回當前線程的實例,主線程的名字就MainThread,子線程的名字是在創建時我們指定的。

  使用多線程還是有風險的,因為在多線程所有變量被所有線程共享,此時可能會出現多個線程同時改變一個變量,導致出現錯誤。為了避免這個錯誤的出現,我們應該加鎖lock

Lock

  我們先不使用lock,來看一個錯誤的實例:

import time, threading

#假定這是你的銀行存款
balance = 0

def change_it(n):#先存后取結果應該為0
    global balance #共享變量
    balance = balance + n
    balance = balance - n

def run_thread(n):
    for n in range(100000):
        change_it(n)

t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance)

  我們啟動了連個線程,先存后取,理論上結果應該為0,但是線程對的調度也是由操作系統決定,所以,當t1 和 t2交替執行,循環次數夠多,結果就不一定是0了。因為高級語言的一條語句在CPU執行時是若干條語句。

  所以如果我們要保證balance的計算正確,就應該就上一把鎖,使該變量同一時刻只能被一個線程操作。在這里我們就可以給change_it()加上一把鎖:

balance = 0
lock = threading.Lock()

def run_thread(n):
    for i in range(100000):
        # 先要獲取鎖:
        lock.acquire()
        try:
            # 放心地改吧:
            change_it(n)
        finally:
            # 改完了一定要釋放鎖:
            lock.release()

  當多個線程同時執行lock.acquire()時,只有一個線程能成功地獲取鎖,然后繼續執行下面的代碼,其他線程就只能等待直到或取到鎖為止。所以獲取到鎖的線程在用完后一定要釋放鎖,否則等待鎖開啟的線程,將永遠等待,所以我們用try...finally來確保鎖一定會被釋放。

Tips:鎖的壞處就是阻止了多線程的並發執行,效率大大地下降了。當不同的線程持有不同的鎖,並試圖獲取對方的鎖時,可能會造成死鎖。

  小結:多線程編程,模型復雜,容易發生沖突,必須加鎖以隔離,同時又要小心死鎖的發生。Python解釋器由於設計時有GIL全局鎖。導致了多線程無法利用多核,這就是模擬出來的並發(線程數量大於處理器數量)。

ThreadLocal

  我們已經知道多線中變量是可以共享的,在多線程的環境下,每個線程都有自己的數據。那么每一個線程應該也可以擁有自己的局部變量,線程使用自己的局部變量比使用全局變量好,因為局部變量只能自己使用,不會影響其他的線程,而使用全局變量的話則必須加鎖。

  那么具體怎么在Python中使用線程的局部變量呢?那就是使用ThreadLocal,先來看一個例子:

  

import threading

#創建全局ThreadLocal對象:
local_school = threading.local()

def process_student():
    #獲取當前線程關聯的student:
    std = local_school.student
    print('Hello, %s (in %s)' % (std, threading.current_thread().name))

def process_thread(name):
    #綁定當前線程關聯的student:
    local_school.student = name
    process_student()

t1 = threading.Thread(target=process_thread, args=('Alice',), name='Thread-A')
t2 = threading.Thread(target=process_thread,args=('Bob',), name='Thread-B')
t1.start()
t2.start()
t1.join()
t2.join()
'''
Hello, Alice (in Thread-A)
Hello, Bob (in Thread-B)
'''

  全局變量local_school就是一個ThreadLocal對象,每個線程對她都可以讀寫student屬性,但互不影響。你可以把local_school看成全局變量,但每個屬性如local_school.student都是線程的局部變量,可以任意讀寫而互不干擾,也不用管理鎖的問題,ThreadLocal內部會處理。

  ThreaLocal最常用的地方就是為每個線程綁定一個數據庫連接,HTTP請求用戶信息身份等。這樣一個線程的所有調用到的處理函數都可以非常方便地訪問這些資源。

Tip:一個ThreadLocal變量雖然是全局變量,但每個線程都只能讀寫自己線程的獨立副本,互不干擾。ThreadLocal解決了參數在一個線程中各個函數之間互相傳遞的問題。

進程VS線程

  前面我們已經介紹了多進程多線程,這是實現多任務最常用的兩種方式。現在,我們來討論下這兩種方式的優缺點

  首先,要實現多任務,通常我們會設計Master-Worker模式,Master負責分配任務,Worker負責執行任務,因此,在多任務環境下,通常是一個Master,多個Worker。

  如果我們用多進程實現Master-Worker,主進程就是Master,其他進程就是Worker。如果用多線程實現Master-Worker,主線程就是Master,其他線程就是Worker。

  其中多進程模式最大的優點就是穩定性高,這是因為一個子進程崩潰了,不會影響主進程和其他子進程(當然主進程crash了,所有的進程就crash了,但是概率很低畢竟Master進程只負責分配任務),著名的Apache最早采用的就是多進程模式。但是多進程的缺點就是創建進程的代價大,在Unix/Linux系統下,用fork調用還行,但是在Windows下創建進程的開銷巨大。另外,操作系統能同時運行的進程也是有限的,在CPU和內存的限制下,如果有幾千個進程同時運行,那么操作系統連調度都會成問題。

  而多線程模式通常比多進程模式快一點,但也快不到哪去。而且,多線程模式致命的缺點就是因為任何一個線程crash了都可能造成整個進程crash,因為所有線程共享進程的內存。

Tips: 在Windows下,多線程的效率比多進程要高,所以微軟的IIS服務器默認采用多線程的模式。由於多線程存在穩定性問題,IIS的穩定性就不如Apache。但是現在為了平衡,IIS和Apache現在又有了多進程+多線程的混合模式。

什么時候采用多任務呢?

  我們需要考慮任務的類型,我們可以把任務分為計算密集型IO密集型

  顧名思義,計算密集型任務的特點就是要進行大量的計算消耗大量的CPU資源,如計算圓周率,對視頻進行高清解碼等,全靠CPU的運算能力。這種計算密集型任務最好不要用多任務完成,因為這樣會切換很多次才能執行完,切換任務花費的時間就很長了,就會導致CPU的效率低下。

Tips:由於計算密集型任務主要消耗CPU資源,因此代碼運行效率就非常重要了。因為Python這樣的腳本語言運行效率很低,所以對於計算密集型任務,最好用C語言編寫。

  再來說IO密集型任務,涉及到網絡磁盤的IO任務都是IO密集型任務,特點是CPU消耗很少,任務的大部分時間都在等待IO操作的完成。對於IO密集型任務,任務越多,CPU效率越高(但還是有一個限度)。

Tips:常見的大部分任務都是IO密集型任務,比如Web應用,對於IO密集型任務,最適合的語言就是開發效率最高(代碼量最少)的語言,所以腳本語言是首選,C語言最差。

異步IO

  考慮到CPU和IO之間巨大的速度差異,單進程單線程模式會導致別的任務無法執行,因此我們才需要多進程或多線程的模型來支持多任務並發。異步文件IO方式中,線程發送一個IO請求到內核,然后繼續處理其他的事情,內核完成IO請求后,將會通知線程IO操作完成了

  如果充分利用操作系統提供的異步IO支持,就可以利用單進程單線程模型來執行多任務,這種全新的模型稱為事件驅動模型。使用異步IO編程模型來實現多任務是一個主要的趨勢。

  在Python中,單進程單線程異步編程模型稱為協程,有了協程的支持就可以基於事件驅動編寫高效多任務程序了。

分布式進程

  Process可以分布到多台機器上,而Thread最多只能分布到同一台機器上上的多個CPU中。

  我們已經知道Python的multiprocessing模塊支持多進程,其中的managers子模塊還支持把多進程分布到多台機器上。

例子:如果我們已經有一個通過Queue通信的多進程程序在同一台機器上運行,現在,由於處理任務的進程任務繁重,希望把服務進程處理任務的進程分布兩台機器上。怎么用分布式進程實現?

  我們先看服務進程,服務進程負責啟動Queue,把Queue注冊到網絡上,然后往Queue里面寫入任務:

import random, time, queue
from multiprocessing.managers import BaseManager

# 發送任務的隊列:
task_queue = queue.Queue()
# 接收結果的隊列:
result_queue = queue.Queue()

# 從BaseManager繼承的QueueManager:
class QueueManager(BaseManager):
    pass

# 把兩個Queue都注冊到網絡上, callable參數關聯了Queue對象:
QueueManager.register('get_task_queue', callable=lambda: task_queue)
QueueManager.register('get_result_queue', callable=lambda: result_queue)
# 綁定端口5000, 設置驗證碼'abc':
manager = QueueManager(address=('', 5000), authkey='abc')
# 啟動Queue:
manager.start()
# 獲得通過網絡訪問的Queue對象:
task = manager.get_task_queue()
result = manager.get_result_queue()
# 放幾個任務進去:
for i in range(10):
    n = random.randint(0, 10000)
    print('Put task %d...' % n)
    task.put(n)
# 從result隊列讀取結果:
print('Try get results...')
for i in range(10):
    r = result.get(timeout=10) #暫停10秒等待分布式進程處理結果並返回
    print('Result: %s' % r)
# 關閉:
manager.shutdown()
print('master exit.')

  接着在另一台機器上啟動任務進程也可以是本機:

# task_worker.py

import time, sys, queue
from multiprocessing.managers import BaseManager

# 創建類似的QueueManager:
class QueueManager(BaseManager):
    pass

# 由於這個QueueManager只從網絡上獲取Queue,所以注冊時只提供名字:
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')

# 連接到服務器,也就是運行task_master.py的機器:
server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)
# 端口和驗證碼注意保持與task_master.py設置的完全一致:
m = QueueManager(address=(server_addr, 5000), authkey='abc')
# 從網絡連接:
m.connect()
# 獲取Queue的對象:
task = m.get_task_queue()
result = m.get_result_queue()
# 從task隊列取任務,並把結果寫入result隊列:
for i in range(10):
    try:
        n = task.get(timeout=1)
        print('run task %d * %d...' % (n, n))
        r = '%d * %d = %d' % (n, n, n*n)
        time.sleep(1)
        result.put(r)
    except Queue.Empty:
        print('task queue is empty.')
# 處理結束:
print('worker exit.')

   注意:先啟動master進程,完成兩個隊列的網上注冊,接着發出請求隊列task,等待result隊列的結果;此時啟動worker進程對task隊列進行操作,然后寫入到result隊列中;master得到響應結果,打印出result

 

Tips:Python的分布式進程的接口簡單封裝良好,適合需要把繁重任務分布到多台機器的環境下。注意Queue的作用是用來傳遞任務和接收結果,每個任務的描述數據量要盡量小。比如發送一個處理日志文件的任務,就不要發送幾百兆的日志文件本身,而是發送日志文件存放的完整路徑,由Worker進程再去共享的磁盤上讀取文件。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM