python基礎-線程和進程


一、多進程

  1、multiprocessing:提供跨平台的多進程支持

  2、Pool:進程池.

  3、進程間通信:multiprocessing.Queue;multiprocessing.Pipes

二、多線程

  1、Lock:線程鎖.

  2、多核CPU

  3、ThreadLocal:每個線程使用自己的局部變量.提高性能.

三、線程 vs 進程

  1、線程切換:效率,性能問題.

  2、計算密集型 vs. IO密集型

  3、異步IO:協程.

四、分布式進程

------------------

一、多進程

  線程是最小的執行單元,而進程由至少一個線程組成。

  python中os模塊中的fork()可以復制當前進程,並且子進程返回0,父進程返回子進程的PID。那么子進程可以用getppid()得到父進程的PID :

# multiprocessing.py
import os

print 'Process (%s) start...' % os.getpid()
pid = os.fork()      #windows系統下沒有fork()調用,so,只能在posix的系統下執行(Unix,Linux,Mac,BSD...) if pid==0:
    print 'I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid())
else:
    print 'I (%s) just created a child process (%s).' % (os.getpid(), pid)
Process (876) start...
I (876) just created a child process (877).
I am child process (877) and my parent is 876.

  常見的Apache服務就是由父進程監聽端口,然后fork()出子進程來處理http請求.

  1、multiprocessing:提供跨平台的多進程支持

    Python中有multiprocessing模塊提供跨平台的多進程支持:

from multiprocessing import Process
import os

# 子進程要執行的代碼
def run_proc(name):
    print 'Run child process %s (%s)...' % (name, os.getpid())

if __name__=='__main__':
    print 'Parent process %s.' % os.getpid()
    p = Process(target=run_proc, args=('test',))
    print 'Process will start.'
    p.start()    #啟動進程
    p.join()     #等待子進程結束后再繼續往下運行,通常用於進程間的同步。 print 'Process end.'

  2、Pool :進程池.

    要啟動大量的子進程,可以用進程池的方式批量創建子進程:

from multiprocessing import Pool
import os, time, random

def long_time_task(name):
    print 'Run task %s (%s)...' % (name,os.getpid())
    start = time.time()
    time.sleep(random.random() * 3)  #隨機暫停
    end = time.time()
    print 'Task %s runs %0.2f seconds.' % (name,(end-start))

if __name__=='__main__':
    print 'Parent process %s.' % os.getpid()
    p = Pool()      #創建一個進程池. for i in range(5):
        p.apply_async(long_time_task,args=(i,))  #將要執行的函數加入進程池 print 'Waiting for all subprocesses done....'
    p.close()
    p.join()    #等待所有子進程執行完畢,之前必須先調用close(),然后不能添加新的Process.
    print 'All subprocessesdone.'
Parent process 1356.
Waiting for all subprocesses done....
Run task 3 (6296)...
Task 3 runs 0.52 seconds.
Run task 2 (4244)...
Task 2 runs 1.58 seconds.
Run task 0 (5720)...
Task 0 runs 1.48 seconds.
Run task 1 (7692)...
Task 1 runs 2.55 seconds.
Run task 4 (1860)...
Task 4 runs 2.96 seconds.
All subprocessesdone.
[Finished in 3.3s]

  Pool的默認大小是CPU的核數,當然,你也可以改變:

p = Pool(5)

  3、進程間通信:

    multiprocessing模塊包裝了底層的機制,提供了QueuePipes等多種方式來交換數據,以Queue為例:參考:這里

from multiprocessing import Process,Queue
import os, time, random

#寫數據進程執行的代碼:
def write(q):
    for value in ['A','B','C']:
        print 'Put %s to queue...'  % value
        q.put(value)
    time.sleep(random.random())

#讀數據進程執行的代碼:
def read(q):
    while True:
        if q.empty():
            return
        else:
            value = q.get()
            print value,'from Queue'

if __name__=='__main__':
    #父進程創建Queue,並傳給各個子進程:
    q = Queue()
    pw = Process(target=write,args=(q,))
    pr = Process(target=read,args=(q,))
    #啟動子進程pw,寫入:
    pw.start()
    #啟動子進程pr,讀取:
    pr.start()
    #等待pw,pr結束:    
    pw.join()
    pr.terminate()

二、多線程:

   任務可以由多進程完成,也可以由一個進程內的多線程完成.

   python 提供threadtheading兩個線程模塊,thread較底層,threading較方便.大多數情況下使用theading就足夠了.

#theading_test
import time, threading

# 新線程執行的代碼:
def loop():
    print 'thread %s is running...' % threading.current_thread().name #返回當前線程的實例
    n = 0
    while n < 5:
        n = n + 1
        print 'thread %s >>> %s' % (threading.current_thread().name, n)
        time.sleep(1)
    print 'thread %s ended.' % threading.current_thread().name

print 'thread %s is running...' % threading.current_thread().name
t = threading.Thread(target=loop, name='LoopThread')  #在創建線程時指定名字LoopThread,否則默認為Thread-1,Thread-2....
t.start()
t.join()
print 'thread %s ended.' % threading.current_thread().name

  1、Lock

    多進程中,每個進程都有變量的拷貝,而多線程中,共享進程中的變量,所以需要lock來限定變量的修改.

     Threading.Lock()可以實現同一時間只能有一個線程擁有鎖,然后可以修改變量,因為鎖只有一把,其他線程則需要等待該線程釋放鎖后才能進行操作:

balance = 0
lock = threading.Lock()

def run_thread(n):
    for i in range(100000):
        # 先要獲取鎖:
        lock.acquire()   #當多個線程同時執行lock.acquire()時,只有一個線程能成功地獲取鎖,然后繼續執行代碼,其他線程就繼續等待直到獲得鎖為止 try:
            # 放心地改吧:
            change_it(n)
        finally:
            # 改完了一定要釋放鎖:
            lock.release()

    鎖的好處就是確保了某段關鍵代碼只能由一個線程從頭到尾完整地執行.

    但是同時也阻止了多線程並發執行,

    其次,由於可以存在多個鎖,操作不當可能導致多個線程全部掛起(pause),只能又操作系統強制終止.

  2、多核CPU:

    如果你不幸擁有一個多核CPU,你肯定在想,多核應該可以同時執行多個線程。

    如果寫一個死循環的話,會出現什么情況呢?

    打開Mac OS X的Activity Monitor,或者Windows的Task Manager,都可以監控某個進程的CPU使用率。

    我們可以監控到一個死循環線程會100%占用一個CPU。

    如果有兩個死循環線程,在多核CPU中,可以監控到會占用200%的CPU,也就是占用兩個CPU核心。

    要想把N核CPU的核心全部跑滿,就必須啟動N個死循環線程。

    試試用Python寫個死循環:

import threading, multiprocessing

def loop():
    x = 0
    while True:
        x = x ^ 1

for i in range(multiprocessing.cpu_count()):
    t = threading.Thread(target=loop)
    t.start()

    啟動與CPU核心數量相同的N個線程,在4核CPU上可以監控到CPU占用率僅有160%,也就是使用不到兩核。

    即使啟動100個線程,使用率也就170%左右,仍然不到兩核。

    但是用C、C++或Java來改寫相同的死循環,直接可以把全部核心跑滿,4核就跑到400%,8核就跑到800%,為什么Python不行呢?

    因為Python的線程雖然是真正的線程,但解釋器執行代碼時,有一個GIL鎖:Global Interpreter Lock,任何Python線程執行前,必須先獲得GIL鎖,然后,每執行100條字節碼,解釋器就自動釋放GIL鎖,讓別的線程有機會執行。這個GIL全局鎖實際上把所有線程的執行代碼都給上了鎖,所以,多線程在Python中只能交替執行,即使100個線程跑在100核CPU上,也只能用到1個核。

    GIL是Python解釋器設計的歷史遺留問題,通常我們用的解釋器是官方實現的CPython,要真正利用多核,除非重寫一個不帶GIL的解釋器。

    所以,在Python中,可以使用多線程,但不要指望能有效利用多核。如果一定要通過多線程利用多核,那只能通過C擴展來實現,不過這樣就失去了Python簡單易用的特點。

    不過,也不用過於擔心,Python雖然不能利用多線程實現多核任務,但可以通過多進程實現多核任務。多個Python進程有各自獨立的GIL鎖,互不影響。

  3、ThreadLocal:

    每個線程使用自己的局部變量.提高性能.

import threading

# 創建全局ThreadLocal對象:
local_school = threading.local()

def process_student():
    print 'Hello, %s (in %s)' % (local_school.student, threading.current_thread().name)

def process_thread(name):
    # 綁定ThreadLocal的student:
    local_school.student = name
    process_student()

t1 = threading.Thread(target= process_thread, args=('Alice',), name='Thread-A')
t2 = threading.Thread(target= process_thread, args=('Bob',), name='Thread-B')
t1.start()
t2.start()
t1.join()
t2.join()
Hello, Alice (in Thread-A)
Hello, Bob (in Thread-B)
[Finished in 0.1s]

    全局變量local_school就是一個ThreadLocal對象,每個Thread對它都可以讀寫student屬性,但互不影響。可以把local_school看成全局變量,但每個屬性如local_school.student都是線程的局部變量,可以任意讀寫而互不干擾,也不用管理鎖的問題,ThreadLocal內部會處理。

 三、進程 vs. 線程:

    多進程和多線程,是實現多任務最常用的兩種方式。

    首先,要實現多任務,通常會設計Master-Worker模式,Master負責分配任務,Worker負責執行任務,因此,多任務環境下,通常是一個Master,多個Worker

    如果用多進程實現Master-Worker,主進程就是Master,其他進程就是Worker

    如果用多線程實現Master-Worker,主線程就是Master,其他線程就是Worker

    多進程模式最大的優點就是穩定性高,因為一個子進程崩潰了,不會影響主進程和其他子進程。(當然主進程掛了所有進程就全掛了,但是Master進程只負責分配任務,掛掉的概率低)著名的Apache最早就是采用多進程模式。

    多進程模式的缺點是創建進程的代價大,在Unix/Linux系統下,用fork調用還行,在Windows下創建進程開銷巨大。另外,操作系統能同時運行的進程數也是有限的,在內存和CPU的限制下,如果有幾千個進程同時運行,操作系統連調度都會成問題。

    多線程模式通常比多進程快一點,但是也快不到哪去,而且,多線程模式致命的缺點就是任何一個線程掛掉都可能直接造成整個進程崩潰,因為所有線程共享進程的內存。在Windows上,如果一個線程執行的代碼出了問題,經常可以看到這樣的提示:“該程序執行了非法操作,即將關閉”,其實往往是某個線程出了問題,但是操作系統會強制結束整個進程。

    在Windows下,多線程的效率比多進程要高,所以微軟的IIS服務器默認采用多線程模式。由於多線程存在穩定性的問題,IIS的穩定性就不如Apache。為了緩解這個問題,IISApache現在又有多進程+多線程的混合模式。

  1、線程切換:

    無論是多進程還是多線程,只要數量一多,效率肯定上不去,為什么?

    打個比方,假設你不幸正在准備中考,每天晚上需要做語文、數學、英語、物理、化學這5科的作業,每項作業耗時1小時。

    如果你先花1小時做語文作業,做完了,再花1小時做數學作業,這樣,依次全部做完,一共花5小時,這種方式稱為單任務模型,或者批處理任務模型。

    假設你打算切換到多任務模型,可以先做1分鍾語文,再切換到數學作業,做1分鍾,再切換到英語,以此類推,只要切換速度足夠快,這種方式就和單核CPU執行多任務是一樣的了,以幼兒園小朋友的眼光來看,你就正在同時寫5科作業。

    但是,切換作業是有代價的,比如從語文切到數學,要先收拾桌子上的語文書本、鋼筆(這叫保存現場),然后,打開數學課本、找出圓規直尺(這叫准備新環境),才能開始做數學作業。操作系統在切換進程或者線程時也是一樣的,它需要先保存當前執行的現場環境(CPU寄存器狀態、內存頁等),然后,把新任務的執行環境准備好(恢復上次的寄存器狀態,切換內存頁等),才能開始執行。這個切換過程雖然很快,但是也需要耗費時間。如果有幾千個任務同時進行,操作系統可能就主要忙着切換任務,根本沒有多少時間去執行任務了,這種情況最常見的就是硬盤狂響,點窗口無反應,系統處於假死狀態。

    所以,多任務一旦多到一個限度,就會消耗掉系統所有的資源,結果效率急劇下降,所有任務都做不好。

  2、計算密集型 vs. IO密集型:

    是否采用多任務的第二個考慮是任務的類型。可以把任務分為計算密集型和IO密集型。

    計算密集型任務的特點是要進行大量的計算,消耗CPU資源,比如計算圓周率、對視頻進行高清解碼等等,全靠CPU的運算能力。這種計算密集型任務雖然也可以用多任務完成,但是任務越多,花在任務切換的時間就越多,CPU執行任務的效率就越低,所以,要最高效地利用CPU,計算密集型任務同時進行的數量應當等於CPU的核心數。

    計算密集型任務由於主要消耗CPU資源,因此,代碼運行效率至關重要。Python這樣的腳本語言運行效率很低,完全不適合計算密集型任務。對於計算密集型任務,最好用C語言編寫。

    第二種任務的類型是IO密集型,涉及到網絡、磁盤IO的任務都是IO密集型任務,這類任務的特點是CPU消耗很少,任務的大部分時間都在等待IO操作完成(因為IO的速度遠遠低於CPU和內存的速度)。對於IO密集型任務,任務越多,CPU效率越高,但也有一個限度。常見的大部分任務都是IO密集型任務,比如Web應用。

    IO密集型任務執行期間,99%的時間都花在IO上,花在CPU上的時間很少,因此,用運行速度極快的C語言替換用Python這樣運行速度極低的腳本語言,完全無法提升運行效率。對於IO密集型任務,最合適的語言就是開發效率最高(代碼量最少)的語言,腳本語言是首選,C語言最差。

  3、異步IO:

    考慮到CPUIO之間巨大的速度差異,一個任務在執行的過程中大部分時間都在等待IO操作,單進程單線程模型會導致別的任務無法並行執行,因此,我們才需要多進程模型或者多線程模型來支持多任務並發執行。

    現代操作系統對IO操作已經做了巨大的改進,最大的特點就是支持異步IO。如果充分利用操作系統提供的異步IO支持,就可以用單進程單線程模型來執行多任務,這種全新的模型稱為事件驅動模型,Nginx就是支持異步IO的Web服務器,它在單核CPU上采用單進程模型就可以高效地支持多任務。在多核CPU上,可以運行多個進程(數量與CPU核心數相同),充分利用多核CPU。由於系統總的進程數量十分有限,因此操作系統調度非常高效。用異步IO編程模型來實現多任務是一個主要的趨勢。

    對應到Python語言,單進程的異步編程模型稱為協程,有了協程的支持,就可以基於事件驅動編寫高效的多任務程序

四、分布式進程:

  在ThreadProcess中,應當優選Process,因為Process更穩定,而且,Process可以分布到多台機器上,而Thread最多只能分布到同一台機器的多個CPU上。

  Python的multiprocessing模塊不但支持多進程,其中managers子模塊還支持把多進程分布到多台機器上。一個服務進程可以作為調度者,將任務分布到其他多個進程中,依靠網絡通信。由於managers模塊封裝很好,不必了解網絡通信的細節,就可以很容易地編寫分布式多進程程序。

  舉個例子:如果已經有一個通過Queue通信的多進程程序在同一台機器上運行,現在,由於處理任務的進程任務繁重,希望把發送任務的進程和處理任務的進程分布到兩台機器上。怎么用分布式進程實現?

  原有的Queue可以繼續使用,但是,通過managers模塊把Queue通過網絡暴露出去,就可以讓其他機器的進程訪問Queue了。

  先看服務進程,服務進程負責啟動Queue,把Queue注冊到網絡上,然后往Queue里面寫入任務:

# taskmanager.py

import random, time, Queue
from multiprocessing.managers import BaseManager

# 發送任務的隊列:
task_queue = Queue.Queue()
# 接收結果的隊列:
result_queue = Queue.Queue()

# 從BaseManager繼承的QueueManager:
class QueueManager(BaseManager):
    pass

# 把兩個Queue都注冊到網絡上, callable參數關聯了Queue對象:
QueueManager.register('get_task_queue', callable=lambda: task_queue)
QueueManager.register('get_result_queue', callable=lambda: result_queue)
# 綁定端口5000, 設置驗證碼'abc':
manager = QueueManager(address=('', 5000), authkey='abc')
# 啟動Queue:
manager.start()
# 獲得通過網絡訪問的Queue對象:
task = manager.get_task_queue()
result = manager.get_result_queue()
# 放幾個任務進去:
for i in range(10):
    n = random.randint(0, 10000)
    print('Put task %d...' % n)
    task.put(n)
# 從result隊列讀取結果:
print('Try get results...')
for i in range(10):
    r = result.get(timeout=10)
    print('Result: %s' % r)
# 關閉:
manager.shutdown()

    請注意,當在一台機器上寫多進程程序時,創建的Queue可以直接拿來用,但是,在分布式多進程環境下,添加任務到Queue不可以直接對原始的task_queue進行操作,那樣就繞過了QueueManager的封裝,必須通過manager.get_task_queue()獲得的Queue接口添加。

    然后,在另一台機器上啟動任務進程(本機上啟動也可以):

# taskworker.py

import time, sys, Queue
from multiprocessing.managers import BaseManager

# 創建類似的QueueManager:
class QueueManager(BaseManager):
    pass

# 由於這個QueueManager只從網絡上獲取Queue,所以注冊時只提供名字:
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')

# 連接到服務器,也就是運行taskmanager.py的機器:
server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)
# 端口和驗證碼注意保持與taskmanager.py設置的完全一致:
m = QueueManager(address=(server_addr, 5000), authkey='abc')
# 從網絡連接:
m.connect()
# 獲取Queue的對象:
task = m.get_task_queue()
result = m.get_result_queue()
# 從task隊列取任務,並把結果寫入result隊列:
for i in range(10):
    try:
        n = task.get(timeout=1)
        print('run task %d * %d...' % (n, n))
        r = '%d * %d = %d' % (n, n, n*n)
        time.sleep(1)
        result.put(r)
    except Queue.Empty:
        print('task queue is empty.')
# 處理結束:
print('worker exit.')

    任務進程要通過網絡連接到服務進程,所以要指定服務進程的IP。

    現在,可以試試分布式進程的工作效果了。先啟動taskmanager.py服務進程:

$ python taskmanager.py 
Put task 3411...
Put task 1605...
Put task 1398...
Put task 4729...
Put task 5300...
Put task 7471...
Put task 68...
Put task 4219...
Put task 339...
Put task 7866...
Try get results...

    taskmanager進程發送完任務后,開始等待result隊列的結果。現在啟動taskworker.py進程:

$ python taskworker.py 127.0.0.1
Connect to server 127.0.0.1...
run task 3411 * 3411...
run task 1605 * 1605...
run task 1398 * 1398...
run task 4729 * 4729...
run task 5300 * 5300...
run task 7471 * 7471...
run task 68 * 68...
run task 4219 * 4219...
run task 339 * 339...
run task 7866 * 7866...
worker exit.

     taskworker進程結束,在taskmanager進程中會繼續打印出結果:

Result: 3411 * 3411 = 11634921
Result: 1605 * 1605 = 2576025
Result: 1398 * 1398 = 1954404
Result: 4729 * 4729 = 22363441
Result: 5300 * 5300 = 28090000
Result: 7471 * 7471 = 55815841
Result: 68 * 68 = 4624
Result: 4219 * 4219 = 17799961
Result: 339 * 339 = 114921
Result: 7866 * 7866 = 61873956

    Queue對象存儲在哪?注意到taskworker.py中根本沒有創建Queue的代碼,所以,Queue對象存儲在taskmanager.py進程中。

    而Queue之所以能通過網絡訪問,就是通過QueueManager實現的。由於QueueManager管理的不止一個Queue,所以,要給每個Queue的網絡調用接口起個名字,比如get_task_queue

    authkey有什么用?這是為了保證兩台機器正常通信,不被其他機器惡意干擾。如果taskworker.pyauthkeytaskmanager.pyauthkey不一致,肯定連接不上。

    分布式進程接口簡單,封裝良好,適合需要把繁重任務分布到多台機器的環境下。

    注意Queue的作用是用來傳遞任務和接收結果,每個任務的描述數據量要盡量小。比如發送一個處理日志文件的任務,就不要發送幾百兆的日志文件本身,而是發送日志文件存放的完整路徑,由Worker進程再去共享的磁盤上讀取文件。  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM