今天,想談一下Python中的進程和線程。
最近在學習Django的時候,涉及到了多進程和多線程的知識點,所以想着一下把Python中的這塊知識進行總結,所以系統地學習了一遍,將知識梳理如下。
1. 進程和線程的關系
既然談論到進程和線程,當然要老生常談一個問題,那就是什么是進程,什么又是線程呢?
用最簡單的話解釋就是一台電腦能同時運行多個QQ就是進程,每個QQ你打開不同窗口聊天,發圖片,發視頻就是線程。再比如Linux系統中我們通過ps -ef查看所有進程,每個進程都有一個pid,且唯一,其中pid=1的進程是用來回收所有孤兒進程的,一旦kill掉,系統就關閉了。
我們看一下定義:
- 進程是系統進行資源分配和調度的一個獨立單位。
- 線程是進程的一個實體,是CPU調度和分派的基本單位,它是比進程更小的能獨立運行的基本單位。線程自己基本上不擁有系統資源,只擁有一點在運行中必不可少的資源(如程序計數器,一組寄存器和棧),但是它可與同屬一個進程的其他的線程共享進程所擁有的全部資源。
而它們有以下4點主要區別:
- 一個程序至少有一個進程,一個進程至少有一個線程。
- 線程的划分尺度小於進程(資源比進程少),使得多線程程序的並發性高。
- 進程在執行過程中擁有獨立的內存單元,而多個線程共享內存,從而極大地提高了程序的運行效率。
- 線線程不能夠獨立執行,必須依存在進程中。
線程執行開銷小,但不利於資源的管理和保護,而進程正相反。而且同一進程多個線程之間是數據共享的,多個進程之間數據相互獨立,但是可以采取一些方式進行信息交互(比如隊列)。
2. Python中的進程
用過Java的人都知道,Java每個JVM都是一個進程,在其中通過多線程進行開發,我們有兩種方式創建線程——繼承Thread類或者實現Runnable接口,想學習Java多線程詳見:https://www.cnblogs.com/wxd0108/p/5479442.html
當然Java也提供了多進程方式,一般是通過Runtime,詳見:https://www.cnblogs.com/xing901022/p/5568419.html
說的遠了,我們這里談的Python中的多進程和多線程,首先我們說一下多進程。
2.1 多進程創建方式
可以歸納為三種:fork,multiprocessing以及進程池Pool。
(1) fork方式
Python的os模塊封裝了常見的系統調用,其中就包括fork,可以在Python程序中輕松創建子進程:
import os # 注意,fork函數,只在Unix/Linux/Mac上運行,windows不可以
pid = os.fork() if pid == 0: print('哈哈1') else: print('哈哈2')
注意:fork()函數只能在Unix/Linux/Mac上面運行,不可以在Windows上面運行。
說明:
- 程序執行到os.fork()時,操作系統會創建一個新的進程(子進程),然后復制父進程的所有信息到子進程中
- 然后父進程和子進程都會從fork()函數中得到一個返回值,在子進程中這個值一定是0,而父進程中是子進程的 id號
在Unix/Linux操作系統中,提供了一個fork()系統函數,它非常特殊。
普通的函數調用,調用一次,返回一次,但是fork()調用一次,返回兩次,因為操作系統自動把當前進程(稱為父進程)復制了一份(稱為子進程),然后,分別在父進程和子進程內返回。
子進程永遠返回0,而父進程返回子進程的ID。
這樣做的理由是,一個父進程可以fork出很多子進程,所以,父進程要記下每個子進程的ID,而子進程只需要調用getppid()就可以拿到父進程的ID。我們可以通過os.getpid()獲取當前進程ID,通過os.getppid()獲取父進程ID。
那么,父子進程之間的執行有順序嗎?
答案是沒有!這完全取決於操作系統的調度算法。
而多次fork()就會產生一個樹的結構:
(2)multiprocessing方式
如果你打算編寫多進程的服務程序,Unix/Linux無疑是正確的選擇。由於Windows沒有fork調用,難道在Windows上無法用Python編寫多進程的程序?當然可以!由於Python是跨平台的,自然也應該提供一個跨平台的多進程支持。multiprocessing模塊就是跨平台版本的多進程模塊。
#coding=utf-8
from multiprocessing import Process import os # 子進程要執行的代碼
def run_proc(name): print('子進程運行中,name= %s ,pid=%d...' % (name, os.getpid())) import time time.sleep(10) print('子進程已結束') if __name__=='__main__': print('父進程 %d.' % os.getpid()) p = Process(target=run_proc, args=('test',)) print('子進程將要執行') p.start()
從結果我們看出,只要通過start()開啟了子進程之后,主進程會等待子進程執行完才結束!
Process的語法結構如下:
Process([group [, target [, name [, args [, kwargs]]]]]) target:表示這個進程實例所調用對象; args:表示調用對象的位置參數元組; kwargs:表示調用對象的關鍵字參數字典; name:為當前進程實例的別名; group:大多數情況下用不到; Process類常用方法: is_alive():判斷進程實例是否還在執行; join([timeout]):是否等待進程實例執行結束,或等待多少秒; start():啟動進程實例(創建子進程); run():如果沒有給定target參數,對這個對象調用start()方法時,就將執行對象中的run()方法; terminate():不管任務是否完成,立即終止; Process類常用屬性: name:當前進程實例別名,默認為Process-N,N為從1開始遞增的整數; pid:當前進程實例的PID值;
當然我們還可以通過繼承的方式來創建進程。
#coding=utf-8
from multiprocessing import Process import os class MyProcess(Process): # 因為Process類本身也有__init__方法,這個子類相當於重寫了這個方法,
# 但這樣就會帶來一個問題,我們並沒有完全的初始化一個Process類,所以就不能使用從這個類繼承的一些方法和屬性,
# 最好的方法就是將繼承類本身傳遞給Process.__init__方法,完成這些初始化操作
def __init__(self, name): Process.__init__(self) self.name = name def run(self): print('子進程運行中,name= %s ,pid=%d...' % (self.name, os.getpid())) import time time.sleep(10) print('子進程已結束') if __name__=='__main__': print('父進程 %d.' % os.getpid()) p = MyProcess('test') print('子進程將要執行') p.start()
執行效果同上。
(3)Pool方式
當需要創建的子進程數量不多時,可以直接利用multiprocessing中的Process動態成生多個進程,但如果是上百甚至上千個目標,手動的去創建進程的工作量巨大,此時就可以用到multiprocessing模塊提供的Pool方法。
初始化Pool時,可以指定一個最大進程數,當有新的請求提交到Pool中時,如果池還沒有滿,那么就會創建一個新的進程用來執行該請求;但如果池中的進程數已經達到指定的最大值,那么該請求就會等待,直到池中有進程結束,才會創建新的進程來執行。
#coding=utf-8
from multiprocessing import Pool import os, time, random def worker(msg): print("%s開始執行,進程號為%d"%(msg, os.getpid())) time.sleep(1) print "%s執行完畢"%(msg) if __name__ == '__main__': po = Pool(3) # 定義一個進程池,最大進程數3
for i in range(10): # Pool.apply_async(要調用的目標,(傳遞給目標的參數元祖,))
# 每次循環將會用空閑出來的子進程去調用目標
po.apply_async(worker, (i,)) print("----start----") po.close() # 關閉進程池,關閉后po不再接收新的請求
po.join() # 等待po中所有子進程執行完成,必須放在close語句之后
print("-----end-----")
這里有一點需要注意:一定要寫po.join(),否則主進程不會等着子進程執行完再結束的!
執行結果:
----start----
0開始執行,進程號為13404
1開始執行,進程號為13304
2開始執行,進程號為11692
0執行完畢
3開始執行,進程號為13404
2執行完畢
1執行完畢
4開始執行,進程號為11692
5開始執行,進程號為13304
3執行完畢
6開始執行,進程號為13404
4執行完畢
7開始執行,進程號為11692
5執行完畢
8開始執行,進程號為13304
6執行完畢
9開始執行,進程號為13404
7執行完畢
8執行完畢
9執行完畢
-----end-----
如果換成apply,那么結果如下:
0開始執行,進程號為8624
0執行完畢
1開始執行,進程號為12956
1執行完畢
2開始執行,進程號為8168
2執行完畢
3開始執行,進程號為8624
3執行完畢
4開始執行,進程號為12956
4執行完畢
5開始執行,進程號為8168
5執行完畢
6開始執行,進程號為8624
6執行完畢
7開始執行,進程號為12956
7執行完畢
8開始執行,進程號為8168
8執行完畢
9開始執行,進程號為8624
9執行完畢
----start----
-----end-----
看完程序,我們研究一下其函數:
multiprocessing.Pool常用函數解析:
apply_async(func[, args[, kwds]]) :使用非阻塞方式調用func(並行執行,堵塞方式必須等待上一個進程退出才能執行下一個進程),args為傳遞給func的參數列表,kwds為傳遞給func的關鍵字參數列表;
apply(func[, args[, kwds]]):使用阻塞方式調用func
close():關閉Pool,使其不再接受新的任務;
terminate():不管任務是否完成,立即終止;
join():主進程阻塞,等待子進程的退出, 必須在close或terminate之后使用;
實現一個多進程下的文件夾復制功能:
#coding=utf-8
import os from multiprocessing import Pool def copyFileTask(name, oldFolderName, newFolderName): # 完成copy一個文件的功能
fr = open(oldFolderName+"/"+name, 'rb+') fw = open(newFolderName+"/"+name, 'wb+') str = fr.read(1024 * 5) while (str != ''): fw.write(str) str = fr.read(1024 * 5) fr.close() fw.close() def main(): # 獲取要copy的文件夾名字
oldFolderName = raw_input('請輸入文件夾名字:') # 創建一個文件夾
newFolderName = oldFolderName+'-復件'.decode('utf-8').encode('gbk') os.mkdir(newFolderName) #獲取old文件夾里面所有文件的名字
fileNames = os.listdir(oldFolderName) #使用多進程的方式copy原文件夾所有內容到新的文件夾中
pool = Pool(5) for name in fileNames: pool.apply_async(copyFileTask, (name, oldFolderName, newFolderName)) pool.close() pool.join() if __name__ == '__main__': main()
2.2 進程間通信
我們之前已經說過,進程間通信比較麻煩,但是也不是不可以哦,我們這里通過其中的一種message queue的方式進行進程間通信。
初始化Queue()對象時(例如:q=Queue()),若括號中沒有指定最大可接收的消息數量,或數量為負值,那么就代表可接受的消息數量沒有上限(直到內存的盡頭);
- Queue.qsize():返回當前隊列包含的消息數量;
- Queue.empty():如果隊列為空,返回True,反之False ;
- Queue.full():如果隊列滿了,返回True,反之False;
- Queue.get([block[, timeout]]):獲取隊列中的一條消息,然后將其從列隊中移除,block默認值為True;
1)如果block使用默認值,且沒有設置timeout(單位秒),消息列隊如果為空,此時程序將被阻塞(停在讀取狀態),直到從消息列隊讀到消息為止,如果設置了timeout,則會等待timeout秒,若還沒讀取到任何消息,則拋出"Queue.Empty"異常;
2)如果block值為False,消息列隊如果為空,則會立刻拋出"Queue.Empty"異常;
- Queue.get_nowait():相當Queue.get(False);
- Queue.put(item,[block[, timeout]]):將item消息寫入隊列,block默認值為True;
如果block使用默認值,且沒有設置timeout(單位秒),消息列隊如果已經沒有空間可寫入,此時程序將被阻塞(停在寫入狀態),直到從消息列隊騰出空間為止,如果設置了timeout,則會等待timeout秒,若還沒空間,則拋出"Queue.Full"異常;如果block值為False,消息列隊如果沒有空間可寫入,則會立刻拋出"Queue.Full"異常;
- Queue.put_nowait(item):相當Queue.put(item, False);
注意:如果以multiprocessing的方式創建進程,使用multiprocessing.Queue()創建隊列;如果以進程池的方式創建進程,以multiprocessing.Manager()中的Queue()創建隊列,否則會報錯!
下面是一個多進程文件復制實例:
#coding=utf-8
import shutil from multiprocessing import Process, Pool, Manager, Queue import os from time import sleep def read(q): f = open('C:\Users\Think\Pictures\頭像.jpg'.decode('utf-8').encode('gbk'), 'rb+') # for line in f.readlines():
# q.put(line)
# shutil.copy('C:\Users\Think\Pictures\頭像.jpg'.decode('utf-8').encode('gbk'), 'C:\Users\Think\Pictures\頭像2.jpg'.decode('utf-8').encode('gbk'))
str = f.read(5) while (str != ''): q.put(str) str = f.read(5) f.close() def write(q): f = open('C:\Users\Think\Pictures\頭像2.jpg'.decode('utf-8').encode('gbk'), 'ab+') for i in range(q.qsize()): f.write(q.get()) f.close() if __name__ == '__main__': po = Pool(2) q = Manager().Queue() # q = Queue() # 這種方式不適合進程池
po.apply(read, (q,)) po.apply(write, (q,)) po.close() po.join()
3. Python中的線程
3.1 GIL
在介紹Python中的線程之前,先明確一個問題,Python中的多線程是假的多線程! 為什么這么說,我們先明確一個概念,全局解釋器鎖(GIL)。
Python代碼的執行由Python虛擬機(解釋器)來控制。Python在設計之初就考慮要在主循環中,同時只有一個線程在執行,就像單CPU的系統中運行多個進程那樣,內存中可以存放多個程序,但任意時刻,只有一個程序在CPU中運行。同樣地,雖然Python解釋器可以運行多個線程,只有一個線程在解釋器中運行。
對Python虛擬機的訪問由全局解釋器鎖(GIL)來控制,正是這個鎖能保證同時只有一個線程在運行。在多線程環境中,Python虛擬機按照以下方式執行。
1.設置GIL。
2.切換到一個線程去執行。
3.運行。
4.把線程設置為睡眠狀態。
5.解鎖GIL。
6.再次重復以上步驟。
對所有面向I/O的(會調用內建的操作系統C代碼的)程序來說,GIL會在這個I/O調用之前被釋放,以允許其他線程在這個線程等待I/O的時候運行。如果某線程並未使用很多I/O操作,它會在自己的時間片內一直占用處理器和GIL。也就是說,I/O密集型的Python程序比計算密集型的Python程序更能充分利用多線程的好處。
我們都知道,比方我有一個4核的CPU,那么這樣一來,在單位時間內每個核只能跑一個線程,然后時間片輪轉切換。但是Python不一樣,它不管你有幾個核,單位時間多個核只能跑一個線程,然后時間片輪轉。看起來很不可思議?但是這就是GIL搞的鬼。任何Python線程執行前,必須先獲得GIL鎖,然后,每執行100條字節碼,解釋器就自動釋放GIL鎖,讓別的線程有機會執行。這個GIL全局鎖實際上把所有線程的執行代碼都給上了鎖,所以,多線程在Python中只能交替執行,即使100個線程跑在100核CPU上,也只能用到1個核。通常我們用的解釋器是官方實現的CPython,要真正利用多核,除非重寫一個不帶GIL的解釋器。
我們不妨做個試驗:
#coding=utf-8
from multiprocessing import Pool from threading import Thread from multiprocessing import Process def loop(): while True: pass
if __name__ == '__main__': for i in range(3): t = Thread(target=loop) t.start() while True: pass
我的電腦是4核,所以我開了4個線程,看一下CPU資源占有率:
我們發現CPU利用率並沒有占滿,大致相當於單核水平。
而如果我們變成進程呢?
我們改一下代碼:
#coding=utf-8
from multiprocessing import Pool from threading import Thread from multiprocessing import Process def loop(): while True: pass
if __name__ == '__main__': for i in range(3): t = Process(target=loop) t.start() while True: pass
結果直接飆到了100%,說明進程是可以利用多核的!
為了驗證這是Python中的GIL搞得鬼,我試着用Java寫相同的代碼,開啟線程,我們觀察一下:
package com.darrenchan.thread; public class TestThread { public static void main(String[] args) { for (int i = 0; i < 3; i++) { new Thread(new Runnable() { @Override public void run() { while (true) { } } }).start(); } while(true){ } } }
由此可見,Java中的多線程是可以利用多核的,這是真正的多線程!而Python中的多線程只能利用單核,這是假的多線程!
難道就如此?我們沒有辦法在Python中利用多核?當然可以!剛才的多進程算是一種解決方案,還有一種就是調用C語言的鏈接庫。對所有面向I/O的(會調用內建的操作系統C代碼的)程序來說,GIL會在這個I/O調用之前被釋放,以允許其他線程在這個線程等待I/O的時候運行。我們可以把一些 計算密集型任務用C語言編寫,然后把.so鏈接庫內容加載到Python中,因為執行C代碼,GIL鎖會釋放,這樣一來,就可以做到每個核都跑一個線程的目的!
可能有的小伙伴不太理解什么是計算密集型任務,什么是I/O密集型任務?
計算密集型任務的特點是要進行大量的計算,消耗CPU資源,比如計算圓周率、對視頻進行高清解碼等等,全靠CPU的運算能力。這種計算密集型任務雖然也可以用多任務完成,但是任務越多,花在任務切換的時間就越多,CPU執行任務的效率就越低,所以,要最高效地利用CPU,計算密集型任務同時進行的數量應當等於CPU的核心數。
計算密集型任務由於主要消耗CPU資源,因此,代碼運行效率至關重要。Python這樣的腳本語言運行效率很低,完全不適合計算密集型任務。對於計算密集型任務,最好用C語言編寫。
第二種任務的類型是IO密集型,涉及到網絡、磁盤IO的任務都是IO密集型任務,這類任務的特點是CPU消耗很少,任務的大部分時間都在等待IO操作完成(因為IO的速度遠遠低於CPU和內存的速度)。對於IO密集型任務,任務越多,CPU效率越高,但也有一個限度。常見的大部分任務都是IO密集型任務,比如Web應用。
IO密集型任務執行期間,99%的時間都花在IO上,花在CPU上的時間很少,因此,用運行速度極快的C語言替換用Python這樣運行速度極低的腳本語言,完全無法提升運行效率。對於IO密集型任務,最合適的語言就是開發效率最高(代碼量最少)的語言,腳本語言是首選,C語言最差。
綜上,Python多線程相當於單核多線程,多線程有兩個好處:CPU並行,IO並行,單核多線程相當於自斷一臂。所以,在Python中,可以使用多線程,但不要指望能有效利用多核。如果一定要通過多線程利用多核,那只能通過C擴展來實現,不過這樣就失去了Python簡單易用的特點。不過,也不用過於擔心,Python雖然不能利用多線程實現多核任務,但可以通過多進程實現多核任務。多個Python進程有各自獨立的GIL鎖,互不影響。
分享一個視頻教程:https://pan.baidu.com/s/1i4I9je9
分享廖雪峰的博客:廖雪峰博客
分享魏印福的博客:魏印福博客
3.2 多線程創建方式
Python的標准庫提供了兩個模塊:thread和threading,thread是低級模塊,threading是高級模塊,對thread進行了封裝。絕大多數情況下,我們只需要使用threading這個高級模塊。
創建方式很簡單,就和Process差不多,剛才那個例子就說明問題了:
#coding=utf-8
from multiprocessing import Pool from threading import Thread from multiprocessing import Process def loop(): while True: pass
if __name__ == '__main__': for i in range(3): t = Thread(target=loop) t.start() while True: pass
注意:同剛才進程這種創建方式一樣,主線程會等待所有的子線程結束后才結束!
查看線程數量,可以通過
len(threading.enumerate())
查看線程名字,可以通過
threading.currentThread().name
同樣也可以通過繼承的方式創建(重寫run方法):
#coding=utf-8
from multiprocessing import Pool from threading import Thread import threading from multiprocessing import Process class MyThread(Thread): def run(self): while True: pass
if __name__ == '__main__': for i in range(3): t = MyThread() t.start() while True: # print len(threading.enumerate())
pass
多線程的執行順序也是不固定的!
線程的幾種運行狀態:
3.3 進程和線程全局變量共享比較
先看一下進程:
from multiprocessing import Process import time from threading import Thread m = 5
def func1(): global m m += 1
print m def func2(): global m time.sleep(3) print m if __name__ == '__main__': p1 = Process(target=func1) p2 = Process(target=func2) p1.start() p2.start()
執行結果是6和5。
如果是線程呢?
from multiprocessing import Process import time from threading import Thread m = 5
def func1(): global m m += 1
print m def func2(): global m time.sleep(3) print m if __name__ == '__main__': t1 = Thread(target=func1) t2 = Thread(target=func2) t1.start() t2.start()
執行結果是6和6。
由此說明,
- 多進程中,每個進程中所有數據(包括全局變量)都各有擁有一份,互不影響
- 在一個進程內的所有線程共享全局變量,能夠在不適用其他方式的前提下完成多線程之間的數據共享(但是局部變量是自己的)
3.4 同步和互斥鎖
同步就是協同步調,按預定的先后次序進行運行。通過一個最簡單的例子說明:
#coding=utf-8
from threading import Thread import time g_num = 0 def test1(): global g_num for i in range(1000000): g_num += 1
print("---test1---g_num=%d"%g_num) def test2(): global g_num for i in range(1000000): g_num += 1
print("---test2---g_num=%d"%g_num) p1 = Thread(target=test1) p1.start() # time.sleep(3) #取消屏蔽之后 再次運行程序,結果會不一樣,,,為啥呢?
p2 = Thread(target=test2) p2.start() print("---g_num=%d---"%g_num)
加了屏蔽之后,運行結果如下:
---g_num=2880---
---test1---g_num=1374568
---test2---g_num=1376569
不加屏蔽,運行結果如下:
---test1---g_num=1000000
---g_num=1000034---
---test2---g_num=2000000
問題產生的原因就是沒有控制多個線程對同一資源的訪問,對數據造成破壞,使得線程運行的結果不可預期。如果多個線程共同訪問同一片數據,則由於數據訪問的順序不一樣,有可能導致數據結果不一致的問題,這種現象稱為“競態條件”。
解決辦法?上鎖!Java中我們一般通過synchronized關鍵字或者Lock類,可以參考我以前的博客:http://www.cnblogs.com/DarrenChan/p/6528578.html
在Python里面呢?
#創建鎖
mutex = threading.Lock() #鎖定
mutex.acquire([blocking]) #釋放
mutex.release()
其中,鎖定方法acquire可以有一個blocking參數。
如果設定blocking為True,則當前線程會堵塞,直到獲取到這個鎖為止(如果沒有指定,那么默認為True)
如果設定blocking為False,則當前線程不會堵塞
剛才那個例子:
#coding=utf-8
from threading import Thread, Lock import time g_num = 0 def test1(): global g_num for i in range(1000000): if mutex.acquire(): g_num += 1 mutex.release() print("---test1---g_num=%d"%g_num) def test2(): global g_num for i in range(1000000): if mutex.acquire(): g_num += 1 mutex.release() print("---test2---g_num=%d"%g_num) mutex = Lock() p1 = Thread(target=test1) p1.start() p2 = Thread(target=test2) p2.start() print("---g_num=%d---"%g_num)
---g_num=10---
---test1---g_num=1999392
---test2---g_num=2000000
執行結果最后2000000,符合預期。當一個線程調用鎖的acquire()方法獲得鎖時,鎖就進入“locked”狀態。每次只有一個線程可以獲得鎖。如果此時另一個線程試圖獲得這個鎖,該線程就會變為“blocked”狀態,稱為“阻塞”,直到擁有鎖的線程調用鎖的release()方法釋放鎖之后,鎖進入“unlocked”狀態。線程調度程序從處於同步阻塞狀態的線程中選擇一個來獲得鎖,並使得該線程進入運行(running)狀態。
鎖的好處:
- 確保了某段關鍵代碼只能由一個線程從頭到尾完整地執行
鎖的壞處:
- 阻止了多線程並發執行,包含鎖的某段代碼實際上只能以單線程模式執行,效率就大大地下降了
- 由於可以存在多個鎖,不同的線程持有不同的鎖,並試圖獲取對方持有的鎖時,可能會造成死鎖(死鎖可以通過銀行家算法解決)
那,如果我想讓多個線程按照一定順序執行呢?
#coding=utf-8
from threading import Thread,Lock from time import sleep class Task1(Thread): def run(self): while True: if lock1.acquire(): print("------Task 1 -----") sleep(0.5) lock2.release() class Task2(Thread): def run(self): while True: if lock2.acquire(): print("------Task 2 -----") sleep(0.5) lock3.release() class Task3(Thread): def run(self): while True: if lock3.acquire(): print("------Task 3 -----") sleep(0.5) lock1.release() #使用Lock創建出的鎖默認沒有“鎖上”
lock1 = Lock() #創建另外一把鎖,並且“鎖上”
lock2 = Lock() lock2.acquire() #創建另外一把鎖,並且“鎖上”
lock3 = Lock() lock3.acquire() t1 = Task1() t2 = Task2() t3 = Task3() t1.start() t2.start() t3.start()
執行結果如下:
------Task 1 -----
------Task 2 -----
------Task 3 -----
------Task 1 -----
------Task 2 -----
------Task 3 -----
------Task 1 -----
------Task 2 -----
------Task 3 -----
......
3.5 生產者與消費者模式
為什么要使用生產者和消費者模式?
在線程世界里,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那么生產者就必須等待消費者處理完,才能繼續生產數據。同樣的道理,如果消費者的處理能力大於生產者,那么消費者就必須等待生產者。為了解決這個問題於是引入了生產者和消費者模式。
什么是生產者消費者模式?
生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產者生產完數據之后不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列里取,阻塞隊列就相當於一個緩沖區,平衡了生產者和消費者的處理能力。這個阻塞隊列就是用來給生產者和消費者解耦的。縱觀大多數設計模式,都會找一個第三者出來進行解耦。

#encoding=utf-8
import threading import time #python2中
from Queue import Queue #python3中 # from queue import Queue
class Producer(threading.Thread): def run(self): global queue count = 0 while True: if queue.qsize() < 1000: for i in range(100): count = count +1 msg = '生成產品'+str(count) queue.put(msg) print(msg) time.sleep(0.5) class Consumer(threading.Thread): def run(self): global queue while True: if queue.qsize() > 100: for i in range(3): msg = self.name + '消費了 '+queue.get() print(msg) time.sleep(1) if __name__ == '__main__': queue = Queue() for i in range(500): queue.put('初始產品'+str(i)) for i in range(2): p = Producer() p.start() for i in range(5): c = Consumer() c.start()
3.6 ThreadLocal
在多線程環境下,每個線程都有自己的數據。一個線程使用自己的局部變量比使用全局變量好,因為局部變量只有線程自己能看見,不會影響其他線程,而全局變量的修改必須加鎖。
但是局部變量函數調用很麻煩,需要一層一層傳遞參數。
def process_student(name): std = Student(name) # std是局部變量,但是每個函數都要用它,因此必須傳進去:
do_task_1(std) do_task_2(std) def do_task_1(std): do_subtask_1(std) do_subtask_2(std) def do_task_2(std): do_subtask_2(std) do_subtask_2(std)
那我們可以采用全局變量嗎?不行,因為每個線程處理不同對象,不能共享!
我們可以采用全局字典的辦法:
global_dict = {} def std_thread(name): std = Student(name) # 把std放到全局變量global_dict中:
global_dict[threading.current_thread()] = std do_task_1() do_task_2() def do_task_1(): # 不傳入std,而是根據當前線程查找:
std = global_dict[threading.current_thread()] ... def do_task_2(): # 任何函數都可以查找出當前線程的std變量:
std = global_dict[threading.current_thread()] ...
ThreadLocal就是幫我們干了這樣一件事!全局變量local_school就是一個ThreadLocal對象,每個Thread對它都可以讀寫student屬性,但互不影響。你可以把local_school看成全局變量,但每個屬性如local_school.student都是線程的局部變量,可以任意讀寫而互不干擾,也不用管理鎖的問題,ThreadLocal內部會處理。可以理解為全局變量local_school是一個dict,不但可以用local_school.student,還可以綁定其他變量,如local_school.teacher等等。ThreadLocal最常用的地方就是為每個線程綁定一個數據庫連接,HTTP請求,用戶身份信息等,這樣一個線程的所有調用到的處理函數都可以非常方便地訪問這些資源。
#coding=utf-8
import threading # 創建全局ThreadLocal對象:
local_school = threading.local() def process_student(): # 獲取當前線程關聯的student:
std = local_school.student print('Hello, %s (in %s)' % (std, threading.current_thread().name)) def process_thread(name): # 綁定ThreadLocal的student:
local_school.student = name process_student() if __name__ == '__main__': t1 = threading.Thread(target= process_thread, args=('陳馳',), name='Thread-A') t2 = threading.Thread(target= process_thread, args=('劉卓',), name='Thread-B') t1.start() t2.start()
Hello, 陳馳 (in Thread-A)
Hello, 劉卓 (in Thread-B)
3.7 異步
類似於Ajax的回調,python中也可以。
#coding=utf-8
from multiprocessing import Pool import time import os def test111(): print ("---進程池中的進程---pid=%d,ppid=%d--"%(os.getpid(), os.getppid())) for i in range(3): print("----%d---"%i) time.sleep(1) return "hahah"
def test222(args): print("---callback func--pid=%d"%os.getpid()) print("---callback func--args=%s"%args) if __name__ == '__main__': pool = Pool(3) pool.apply_async(func=test111, callback=test222) pool.close() pool.join() time.sleep(5) print("----主進程-pid=%d----" % os.getpid())
在Python下運行,剛開始執行test111里面的內容,等執行完,會通知主進程進行回調函數,執行test222。
---進程池中的進程---pid=8044,ppid=5988--
----0---
----1---
----2---
---callback func--pid=5988
---callback func--args=hahah
----主進程-pid=5988----
4. 總結
首先,要實現多任務,通常我們會設計Master-Worker模式,Master負責分配任務,Worker負責執行任務,因此,多任務環境下,通常是一個Master,多個Worker。
如果用多進程實現Master-Worker,主進程就是Master,其他進程就是Worker。
如果用多線程實現Master-Worker,主線程就是Master,其他線程就是Worker。
多進程模式最大的優點就是穩定性高,因為一個子進程崩潰了,不會影響主進程和其他子進程。(當然主進程掛了所有進程就全掛了,但是Master進程只負責分配任務,掛掉的概率低)著名的Apache最早就是采用多進程模式。
多進程模式的缺點是創建進程的代價大,在Unix/Linux系統下,用fork
調用還行,在Windows下創建進程開銷巨大。另外,操作系統能同時運行的進程數也是有限的,在內存和CPU的限制下,如果有幾千個進程同時運行,操作系統連調度都會成問題。
多線程模式通常比多進程快一點,但是也快不到哪去,而且,多線程模式致命的缺點就是任何一個線程掛掉都可能直接造成整個進程崩潰,因為所有線程共享進程的內存。在Windows上,如果一個線程執行的代碼出了問題,你經常可以看到這樣的提示:“該程序執行了非法操作,即將關閉”,其實往往是某個線程出了問題,但是操作系統會強制結束整個進程。
在Windows下,多線程的效率比多進程要高,所以微軟的IIS服務器默認采用多線程模式。由於多線程存在穩定性的問題,IIS的穩定性就不如Apache。為了緩解這個問題,IIS和Apache現在又有多進程+多線程的混合模式,真是把問題越搞越復雜。
4.1 線程切換
無論是多進程還是多線程,只要數量一多,效率肯定上不去,為什么呢?
我們打個比方,假設你不幸正在准備中考,每天晚上需要做語文、數學、英語、物理、化學這5科的作業,每項作業耗時1小時。
如果你先花1小時做語文作業,做完了,再花1小時做數學作業,這樣,依次全部做完,一共花5小時,這種方式稱為單任務模型,或者批處理任務模型。
假設你打算切換到多任務模型,可以先做1分鍾語文,再切換到數學作業,做1分鍾,再切換到英語,以此類推,只要切換速度足夠快,這種方式就和單核CPU執行多任務是一樣的了,以幼兒園小朋友的眼光來看,你就正在同時寫5科作業。
但是,切換作業是有代價的,比如從語文切到數學,要先收拾桌子上的語文書本、鋼筆(這叫保存現場),然后,打開數學課本、找出圓規直尺(這叫准備新環境),才能開始做數學作業。操作系統在切換進程或者線程時也是一樣的,它需要先保存當前執行的現場環境(CPU寄存器狀態、內存頁等),然后,把新任務的執行環境准備好(恢復上次的寄存器狀態,切換內存頁等),才能開始執行。這個切換過程雖然很快,但是也需要耗費時間。如果有幾千個任務同時進行,操作系統可能就主要忙着切換任務,根本沒有多少時間去執行任務了,這種情況最常見的就是硬盤狂響,點窗口無反應,系統處於假死狀態。
所以,多任務一旦多到一個限度,就會消耗掉系統所有的資源,結果效率急劇下降,所有任務都做不好。
4.2 異步IO
考慮到CPU和IO之間巨大的速度差異,一個任務在執行的過程中大部分時間都在等待IO操作,單進程單線程模型會導致別的任務無法並行執行,因此,我們才需要多進程模型或者多線程模型來支持多任務並發執行。
現代操作系統對IO操作已經做了巨大的改進,最大的特點就是支持異步IO。如果充分利用操作系統提供的異步IO支持,就可以用單進程單線程模型來執行多任務,這種全新的模型稱為事件驅動模型,Nginx就是支持異步IO的Web服務器,它在單核CPU上采用單進程模型就可以高效地支持多任務。在多核CPU上,可以運行多個進程(數量與CPU核心數相同),充分利用多核CPU。由於系統總的進程數量十分有限,因此操作系統調度非常高效。用異步IO編程模型來實現多任務是一個主要的趨勢。
對應到Python語言,單進程的異步編程模型稱為協程,有了協程的支持,就可以基於事件驅動編寫高效的多任務程序。我們會在后面討論如何編寫協程。
參考:廖雪峰博客
《Python核心編程》