https://www.cnblogs.com/yuanchenqi/articles/6755717.html


 

一 進程與線程的概念

1.1 進程

考慮一個場景:瀏覽器,網易雲音樂以及notepad++ 三個軟件只能順序執行是怎樣一種場景呢?另外,假如有兩個程序A和B,程序A在執行到一半的過程中,需要讀取大量的數據輸入(I/O操作),而此時CPU只能靜靜地等待任務A讀取完數據才能繼續執行,這樣就白白浪費了CPU資源。你是不是已經想到在程序A讀取數據的過程中,讓程序B去執行,當程序A讀取完數據之后,讓程序B暫停。聰明,這當然沒問題,但這里有一個關鍵詞:切換。

既然是切換,那么這就涉及到了狀態的保存,狀態的恢復,加上程序A與程序B所需要的系統資源(內存,硬盤,鍵盤等等)是不一樣的。自然而然的就需要有一個東西去記錄程序A和程序B分別需要什么資源,怎樣去識別程序A和程序B等等(比如讀書)。

進程定義:

進程就是一個程序在一個數據集上的一次動態執行過程。進程一般由程序、數據集、進程控制塊三部分組成。我們編寫的程序用來描述進程要完成哪些功能以及如何完成;數據集則是程序在執行過程中所需要使用的資源;進程控制塊用來記錄進程的外部特征,描述進程的執行變化過程,系統可以利用它來控制和管理進程,它是系統感知進程存在的唯一標志。

舉一例說明進程:
想象一位有一手好廚藝的計算機科學家正在為他的女兒烘制生日蛋糕。他有做生日蛋糕的食譜,廚房里有所需的原料:面粉、雞蛋、糖、香草汁等。在這個比喻中,做蛋糕的食譜就是程序(即用適當形式描述的算法)計算機科學家就是處理器(cpu),而做蛋糕的各種原料就是輸入數據。進程就是廚師閱讀食譜、取來各種原料以及烘制蛋糕等一系列動作的總和。現在假設計算機科學家的兒子哭着跑了進來,說他的頭被一只蜜蜂蟄了。計算機科學家就記錄下他照着食譜做到哪兒了(保存進程的當前狀態),然后拿出一本急救手冊,按照其中的指示處理蟄傷。這里,我們看到處理機從一個進程(做蛋糕)切換到另一個高優先級的進程(實施醫療救治),每個進程擁有各自的程序(食譜和急救手冊)。當蜜蜂蟄傷處理完之后,這位計算機科學家又回來做蛋糕,從他離開時的那一步繼續做下去。

1.2 線程

線程的出現是為了降低上下文切換的消耗,提高系統的並發性,並突破一個進程只能干一樣事的缺陷,使到進程內並發成為可能。

假設,一個文本程序,需要接受鍵盤輸入,將內容顯示在屏幕上,還需要保存信息到硬盤中。若只有一個進程,勢必造成同一時間只能干一樣事的尷尬(當保存時,就不能通過鍵盤輸入內容)。若有多個進程,每個進程負責一個任務,進程A負責接收鍵盤輸入的任務,進程B負責將內容顯示在屏幕上的任務,進程C負責保存內容到硬盤中的任務。這里進程A,B,C間的協作涉及到了進程通信問題,而且有共同都需要擁有的東西——-文本內容,不停的切換造成性能上的損失。若有一種機制,可以使任務A,B,C共享資源,這樣上下文切換所需要保存和恢復的內容就少了,同時又可以減少通信所帶來的性能損耗,那就好了。是的,這種機制就是線程。
線程也叫輕量級進程,它是一個基本的CPU執行單元,也是程序執行過程中的最小單元,由線程ID、程序計數器、寄存器集合和堆棧共同組成。線程的引入減小了程序並發執行時的開銷,提高了操作系統的並發性能。線程沒有自己的系統資源。

1.3 進程與線程的關系

進程是計算機中的程序關於某數據集合上的一次運行活動,是系統進行資源分配和調度的基本單位,是操作系統結構的基礎。或者說進程是具有一定獨立功能的程序關於某個數據集合上的一次運行活動,進程是系統進行資源分配和調度的一個獨立單位。
線程則是進程的一個實體,是CPU調度和分派的基本單位,它是比進程更小的能獨立運行的基本單位。

進程和線程的關系:

(1)一個線程只能屬於一個進程,而一個進程可以有多個線程,但至少有一個線程。
(2)資源分配給進程,同一進程的所有線程共享該進程的所有資源。
(3)CPU分給線程,即真正在CPU上運行的是線程。

1.4 並行和並發

並行處理(Parallel Processing)是計算機系統中能同時執行兩個或更多個處理的一種計算方法。並行處理可同時工作於同一程序的不同方面。並行處理的主要目的是節省大型和復雜問題的解決時間。並發處理(concurrency Processing):指一個時間段中有幾個程序都處於已啟動運行到運行完畢之間,且這幾個程序都是在同一個處理機(CPU)上運行,但任一個時刻點上只有一個程序在處理機(CPU)上運行

並發的關鍵是你有處理多個任務的能力,不一定要同時。並行的關鍵是你有同時處理多個任務的能力。所以說,並行是並發的子集

1.5 同步與異步

在計算機領域,同步就是指一個進程在執行某個請求的時候,若該請求需要一段時間才能返回信息,那么這個進程將會一直等待下去,直到收到返回信息才繼續執行下去;異步是指進程不需要一直等下去,而是繼續執行下面的操作,不管其他進程的狀態。當有消息返回時系統會通知進程進行處理,這樣可以提高執行的效率。舉個例子,打電話時就是同步通信,發短息時就是異步通信。

二 threading模塊

2.1 線程對象的創建

2.1.1 Thread類直接創建

復制代碼
import threading import time def countNum(n): # 定義某個線程要運行的函數 print("running on number:%s" %n) time.sleep(3) if __name__ == '__main__': t1 = threading.Thread(target=countNum,args=(23,)) #生成一個線程實例 t2 = threading.Thread(target=countNum,args=(34,)) t1.start() #啟動線程  t2.start() print("ending!")
復制代碼

2.1.2 Thread類繼承式創建

復制代碼
#繼承Thread式創建 import threading import time class MyThread(threading.Thread): def __init__(self,num): threading.Thread.__init__(self) self.num=num def run(self): print("running on number:%s" %self.num) time.sleep(3) t1=MyThread(56) t2=MyThread(78) t1.start() t2.start() print("ending")
復制代碼

2.2 Thread類的實例方法

2.2.1 join()和setDaemon()

復制代碼
# join():在子線程完成運行之前,這個子線程的父線程將一直被阻塞。 # setDaemon(True): ''' 將線程聲明為守護線程,必須在start() 方法調用之前設置,如果不設置為守護線程程序會被無限掛起。 當我們在程序運行中,執行一個主線程,如果主線程又創建一個子線程,主線程和子線程 就分兵兩路,分別運行,那么當主線程完成 想退出時,會檢驗子線程是否完成。如果子線程未完成,則主線程會等待子線程完成后再退出。但是有時候我們需要的是只要主線程 完成了,不管子線程是否完成,都要和主線程一起退出,這時就可以 用setDaemon方法啦''' import threading from time import ctime,sleep import time def Music(name): print ("Begin listening to {name}. {time}".format(name=name,time=ctime())) sleep(3) print("end listening {time}".format(time=ctime())) def Blog(title): print ("Begin recording the {title}. {time}".format(title=title,time=ctime())) sleep(5) print('end recording {time}'.format(time=ctime())) threads = [] t1 = threading.Thread(target=Music,args=('FILL ME',)) t2 = threading.Thread(target=Blog,args=('',)) threads.append(t1) threads.append(t2) if __name__ == '__main__': #t2.setDaemon(True) for t in threads: #t.setDaemon(True) #注意:一定在start之前設置  t.start() #t.join() #t1.join() #t2.join() # 考慮這三種join位置下的結果? print ("all over %s" %ctime())
復制代碼
復制代碼
daemon
A boolean value indicating whether this thread is a daemon thread (True) or not (False). This must be set before start() is called, otherwise RuntimeError is raised. Its initial value is inherited from the creating thread; the main thread is not a daemon thread and therefore all threads created in the main thread default to daemon = False. The entire Python program exits when no alive non-daemon threads are left. 當daemon被設置為True時,如果主線程退出,那么子線程也將跟着退出, 反之,子線程將繼續運行,直到正常退出。
復制代碼

 2.2.2 其它方法

復制代碼
Thread實例對象的方法
# isAlive(): 返回線程是否活動的。 # getName(): 返回線程名。 # setName(): 設置線程名。 threading模塊提供的一些方法: # threading.currentThread(): 返回當前的線程變量。 # threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啟動后、結束前,不包括啟動前和終止后的線程。 # threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。
復制代碼

2.3 GIL(全局解釋器鎖)

復制代碼
'''
定義: In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.) '''
復制代碼

Python中的線程是操作系統的原生線程,Python虛擬機使用一個全局解釋器鎖(Global Interpreter Lock)來互斥線程對Python虛擬機的使用。為了支持多線程機制,一個基本的要求就是需要實現不同線程對共享資源訪問的互斥,所以引入了GIL。
GIL:在一個線程擁有了解釋器的訪問權之后,其他的所有線程都必須等待它釋放解釋器的訪問權,即使這些線程的下一條指令並不會互相影響。
在調用任何Python C API之前,要先獲得GIL
GIL缺點:多處理器退化為單處理器;優點:避免大量的加鎖解鎖操作

2.3.1 GIL的早期設計

Python支持多線程,而解決多線程之間數據完整性和狀態同步的最簡單方法自然就是加鎖。 於是有了GIL這把超級大鎖,而當越來越多的代碼庫開發者接受了這種設定后,他們開始大量依賴這種特性(即默認python內部對象是thread-safe的,無需在實現時考慮額外的內存鎖和同步操作)。慢慢的這種實現方式被發現是蛋疼且低效的。但當大家試圖去拆分和去除GIL的時候,發現大量庫代碼開發者已經重度依賴GIL而非常難以去除了。有多難?做個類比,像MySQL這樣的“小項目”為了把Buffer Pool Mutex這把大鎖拆分成各個小鎖也花了從5.5到5.6再到5.7多個大版為期近5年的時間,並且仍在繼續。MySQL這個背后有公司支持且有固定開發團隊的產品走的如此艱難,那又更何況Python這樣核心開發和代碼貢獻者高度社區化的團隊呢?

2.3.2 GIL的影響

無論你啟多少個線程,你有多少個cpu, Python在執行一個進程的時候會淡定的在同一時刻只允許一個線程運行。
所以,python是無法利用多核CPU實現多線程的。
這樣,python對於計算密集型的任務開多線程的效率甚至不如串行(沒有大量切換),但是,對於IO密集型的任務效率還是有顯著提升的。

計算密集型:

View Code

2.3.3 解決方案

用multiprocessing替代Thread multiprocessing庫的出現很大程度上是為了彌補thread庫因為GIL而低效的缺陷。它完整的復制了一套thread所提供的接口方便遷移。唯一的不同就是它使用了多進程而不是多線程。每個進程有自己的獨立的GIL,因此也不會出現進程之間的GIL爭搶。

View Code

當然multiprocessing也不是萬能良葯。它的引入會增加程序實現時線程間數據通訊和同步的困難。就拿計數器來舉例子,如果我們要多個線程累加同一個變量,對於thread來說,申明一個global變量,用thread.Lock的context包裹住三行就搞定了。而multiprocessing由於進程之間無法看到對方的數據,只能通過在主線程申明一個Queue,put再get或者用share memory的方法。這個額外的實現成本使得本來就非常痛苦的多線程程序編碼,變得更加痛苦了。

總結:因為GIL的存在,只有IO Bound場景下得多線程會得到較好的性能 - 如果對並行計算性能較高的程序可以考慮把核心部分也成C模塊,或者索性用其他語言實現 - GIL在較長一段時間內將會繼續存在,但是會不斷對其進行改進。

所以對於GIL,既然不能反抗,那就學會去享受它吧!

2.4 同步鎖 (Lock)

復制代碼
import time import threading def addNum(): global num #在每個線程中都獲取這個全局變量 #num-=1  temp=num time.sleep(0.1) num =temp-1 # 對此公共變量進行-1操作  num = 100 #設定一個共享變量  thread_list = [] for i in range(100): t = threading.Thread(target=addNum) t.start() thread_list.append(t) for t in thread_list: #等待所有線程執行完畢  t.join() print('Result: ', num)
復制代碼

鎖通常被用來實現對共享資源的同步訪問。為每一個共享資源創建一個Lock對象,當你需要訪問該資源時,調用acquire方法來獲取鎖對象(如果其它線程已經獲得了該鎖,則當前線程需等待其被釋放),待資源訪問完后,再調用release方法釋放鎖:

復制代碼
import threading R=threading.Lock() R.acquire() ''' 對公共數據的操作 ''' R.release()
復制代碼

擴展思考

View Code

2.5 死鎖與遞歸鎖

所謂死鎖: 是指兩個或兩個以上的進程或線程在執行過程中,因爭奪資源而造成的一種互相等待的現象,若無外力作用,它們都將無法推進下去。此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的進程稱為死鎖進程。

復制代碼
import threading import time mutexA = threading.Lock() mutexB = threading.Lock() class MyThread(threading.Thread): def __init__(self): threading.Thread.__init__(self) def run(self): self.fun1() self.fun2() def fun1(self): mutexA.acquire() # 如果鎖被占用,則阻塞在這里,等待鎖的釋放 print ("I am %s , get res: %s---%s" %(self.name, "ResA",time.time())) mutexB.acquire() print ("I am %s , get res: %s---%s" %(self.name, "ResB",time.time())) mutexB.release() mutexA.release() def fun2(self): mutexB.acquire() print ("I am %s , get res: %s---%s" %(self.name, "ResB",time.time())) time.sleep(0.2) mutexA.acquire() print ("I am %s , get res: %s---%s" %(self.name, "ResA",time.time())) mutexA.release() mutexB.release() if __name__ == "__main__": print("start---------------------------%s"%time.time()) for i in range(0, 10): my_thread = MyThread() my_thread.start()
復制代碼

在Python中為了支持在同一線程中多次請求同一資源,python提供了可重入鎖RLock。這個RLock內部維護着一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源可以被多次require。直到一個線程所有的acquire都被release,其他的線程才能獲得資源。上面的例子如果使用RLock代替Lock,則不會發生死鎖:

1
mutex = threading.RLock()

2.6 Event對象

線程的一個關鍵特性是每個線程都是獨立運行且狀態不可預測。如果程序中的其 他線程需要通過判斷某個線程的狀態來確定自己下一步的操作,這時線程同步問題就 會變得非常棘手。為了解決這些問題,我們需要使用threading庫中的Event對象。 對象包含一個可由線程設置的信號標志,它允許線程等待某些事件的發生。在 初始情況下,Event對象中的信號標志被設置為假。如果有線程等待一個Event對象, 而這個Event對象的標志為假,那么這個線程將會被一直阻塞直至該標志為真。一個線程如果將一個Event對象的信號標志設置為真,它將喚醒所有等待這個Event對象的線程。如果一個線程等待一個已經被設置為真的Event對象,那么它將忽略這個事件, 繼續執行

復制代碼
event.isSet():返回event的狀態值;

event.wait():如果 event.isSet()==False將阻塞線程; event.set(): 設置event的狀態值為True,所有阻塞池的線程激活進入就緒狀態, 等待操作系統調度; event.clear():恢復event的狀態值為False。
復制代碼

      

可以考慮一種應用場景(僅僅作為說明),例如,我們有多個線程從Redis隊列中讀取數據來處理,這些線程都要嘗試去連接Redis的服務,一般情況下,如果Redis連接不成功,在各個線程的代碼中,都會去嘗試重新連接。如果我們想要在啟動時確保Redis服務正常,才讓那些工作線程去連接Redis服務器,那么我們就可以采用threading.Event機制來協調各個工作線程的連接操作:主線程中會去嘗試連接Redis服務,如果正常的話,觸發事件,各工作線程會嘗試連接Redis服務。

復制代碼
import threading import time import logging logging.basicConfig(level=logging.DEBUG, format='(%(threadName)-10s) %(message)s',) def worker(event): logging.debug('Waiting for redis ready...') event.wait() logging.debug('redis ready, and connect to redis server and do some work [%s]', time.ctime()) time.sleep(1) def main(): readis_ready = threading.Event() t1 = threading.Thread(target=worker, args=(readis_ready,), name='t1') t1.start() t2 = threading.Thread(target=worker, args=(readis_ready,), name='t2') t2.start() logging.debug('first of all, check redis server, make sure it is OK, and then trigger the redis ready event') time.sleep(3) # simulate the check progress  readis_ready.set() if __name__=="__main__": main()
復制代碼

threading.Event的wait方法還接受一個超時參數,默認情況下如果事件一致沒有發生,wait方法會一直阻塞下去,而加入這個超時參數之后,如果阻塞時間超過這個參數設定的值之后,wait方法會返回。對應於上面的應用場景,如果Redis服務器一致沒有啟動,我們希望子線程能夠打印一些日志來不斷地提醒我們當前沒有一個可以連接的Redis服務,我們就可以通過設置這個超時參數來達成這樣的目的:

def worker(event): while not event.is_set(): logging.debug('Waiting for redis ready...') event.wait(2) logging.debug('redis ready, and connect to redis server and do some work [%s]', time.ctime()) time.sleep(1)

這樣,我們就可以在等待Redis服務啟動的同時,看到工作線程里正在等待的情況。

2.7 Semaphore(信號量)

Semaphore管理一個內置的計數器,
每當調用acquire()時內置計數器-1;
調用release() 時內置計數器+1;
計數器不能小於0;當計數器為0時,acquire()將阻塞線程直到其他線程調用release()。

實例:(同時只有5個線程可以獲得semaphore,即可以限制最大連接數為5):

復制代碼
import threading import time semaphore = threading.Semaphore(5) def func(): if semaphore.acquire(): print (threading.currentThread().getName() + ' get semaphore') time.sleep(2) semaphore.release() for i in range(20): t1 = threading.Thread(target=func) t1.start()
復制代碼

應用:連接池

思考:與Rlock的區別?

2.8 隊列(queue)

queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.

2.8.1 get與put方法

復制代碼
''' 創建一個“隊列”對象 import Queue q = Queue.Queue(maxsize = 10) Queue.Queue類即是一個隊列的同步實現。隊列長度可為無限或者有限。可通過Queue的構造函數的可選參數 maxsize來設定隊列長度。如果maxsize小於1就表示隊列長度無限。 將一個值放入隊列中 q.put(10) 調用隊列對象的put()方法在隊尾插入一個項目。put()有兩個參數,第一個item為必需的,為插入項目的值; 第二個block為可選參數,默認為 1。如果隊列當前為空且block為1,put()方法就使調用線程暫停,直到空出一個數據單元。如果block為0, put方法將引發Full異常。 將一個值從隊列中取出 q.get() 調用隊列對象的get()方法從隊頭刪除並返回一個項目。可選參數為block,默認為True。如果隊列為空且 block為True,get()就使調用線程暫停,直至有項目可用。如果隊列為空且block為False,隊列將引發Empty異常。 '''
復制代碼

2.8.2  join與task_done方法

復制代碼
''' join() 阻塞進程,直到所有任務完成,需要配合另一個方法task_done。 def join(self): with self.all_tasks_done: while self.unfinished_tasks: self.all_tasks_done.wait() task_done() 表示某個任務完成。每一條get語句后需要一條task_done。 import queue q = queue.Queue(5) q.put(10) q.put(20) print(q.get()) q.task_done() print(q.get()) q.task_done() q.join() print("ending!") '''
復制代碼

2.8.3 其他常用方法

復制代碼
''' 此包中的常用方法(q = Queue.Queue()):
q.qsize() 返回隊列的大小 q.empty() 如果隊列為空,返回True,反之False q.full() 如果隊列滿了,返回True,反之False q.full 與 maxsize 大小對應 q.get([block[, timeout]]) 獲取隊列,timeout等待時間 q.get_nowait() 相當q.get(False)非阻塞
q.put(item) 寫入隊列,timeout等待時間 q.put_nowait(item) 相當q.put(item, False) q.task_done() 在完成一項工作之后,q.task_done() 函數向任務已經完成的隊列發送一個信號 q.join() 實際上意味着等到隊列為空,再執行別的操作 '''
復制代碼

2.8.4 其他模式

復制代碼
''' Python Queue模塊有三種隊列及構造函數: 1、Python Queue模塊的FIFO隊列先進先出。 class queue.Queue(maxsize) 2、LIFO類似於堆,即先進后出。 class queue.LifoQueue(maxsize) 3、還有一種是優先級隊列級別越低越先出來。 class queue.PriorityQueue(maxsize) import queue #先進后出 q=queue.LifoQueue() q.put(34) q.put(56) q.put(12) #優先級 q=queue.PriorityQueue() q.put([5,100]) q.put([7,200]) q.put([3,"hello"]) q.put([4,{"name":"alex"}]) while 1: data=q.get() print(data) '''
復制代碼

2.8.5 生產者消費者模型

在線程世界里,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那么生產者就必須等待消費者處理完,才能繼續生產數據。同樣的道理,如果消費者的處理能力大於生產者,那么消費者就必須等待生產者。為了解決這個問題於是引入了生產者和消費者模式。

生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產者生產完數據之后不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列里取,阻塞隊列就相當於一個緩沖區,平衡了生產者和消費者的處理能力。

這就像,在餐廳,廚師做好菜,不需要直接和客戶交流,而是交給前台,而客戶去飯菜也不需要不找廚師,直接去前台領取即可,這也是一個結耦的過程。

復制代碼
import time,random import queue,threading q = queue.Queue() def Producer(name): count = 0 while count <10: print("making........") time.sleep(random.randrange(3)) q.put(count) print('Producer %s has produced %s baozi..' %(name, count)) count +=1 #q.task_done() #q.join() print("ok......") def Consumer(name): count = 0 while count <10: time.sleep(random.randrange(4)) if not q.empty(): data = q.get() #q.task_done() #q.join() print(data) print('\033[32;1mConsumer %s has eat %s baozi...\033[0m' %(name, data)) else: print("-----no baozi anymore----") count +=1 p1 = threading.Thread(target=Producer, args=('A',)) c1 = threading.Thread(target=Consumer, args=('B',)) # c2 = threading.Thread(target=Consumer, args=('C',)) # c3 = threading.Thread(target=Consumer, args=('D',)) p1.start() c1.start() # c2.start() # c3.start()
復制代碼

三 multiprocessing模塊

Multiprocessing is a package that supports spawning processes using an API similar to the threading module. The multiprocessing package offers both local and remote concurrency,effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads. Due to this, the multiprocessing module allows the programmer to fully leverage multiple processors on a given machine. It runs on both Unix and Windows.

由於GIL的存在,python中的多線程其實並不是真正的多線程,如果想要充分地使用多核CPU的資源,在python中大部分情況需要使用多進程。

multiprocessing包是Python中的多進程管理包。與threading.Thread類似,它可以利用multiprocessing.Process對象來創建一個進程。該進程可以運行在Python程序內部編寫的函數。該Process對象與Thread對象的用法相同,也有start(), run(), join()的方法。此外multiprocessing包中也有Lock/Event/Semaphore/Condition類 (這些對象可以像多線程那樣,通過參數傳遞給各個進程),用以同步進程,其用法與threading包中的同名類一致。所以,multiprocessing的很大一部份與threading使用同一套API,只不過換到了多進程的情境。

3.1 python的進程調用

復制代碼
# Process類調用 from multiprocessing import Process import time def f(name): print('hello', name,time.ctime()) time.sleep(1) if __name__ == '__main__': p_list=[] for i in range(3): p = Process(target=f, args=('alvin:%s'%i,)) p_list.append(p) p.start() for i in p_list: p.join() print('end') # 繼承Process類調用 from multiprocessing import Process import time class MyProcess(Process): def __init__(self): super(MyProcess, self).__init__() # self.name = name def run(self): print ('hello', self.name,time.ctime()) time.sleep(1) if __name__ == '__main__': p_list=[] for i in range(3): p = MyProcess() p.start() p_list.append(p) for p in p_list: p.join() print('end')
復制代碼

3.2 process類

構造方法:

Process([group [, target [, name [, args [, kwargs]]]]])

  group: 線程組,目前還沒有實現,庫引用中提示必須是None; 
  target: 要執行的方法; 
  name: 進程名; 
  args/kwargs: 要傳入方法的參數。

實例方法:

  is_alive():返回進程是否在運行。

  join([timeout]):阻塞當前上下文環境的進程程,直到調用此方法的進程終止或到達指定的timeout(可選參數)。

  start():進程准備就緒,等待CPU調度

  run():strat()調用run方法,如果實例進程時未制定傳入target,這star執行t默認run()方法。

  terminate():不管任務是否完成,立即停止工作進程

屬性:

  daemon:和線程的setDeamon功能一樣

  name:進程名字。

  pid:進程號。

復制代碼
from multiprocessing import Process import os import time def info(name): print("name:",name) print('parent process:', os.getppid()) print('process id:', os.getpid()) print("------------------") time.sleep(1) def foo(name): info(name) if __name__ == '__main__': info('main process line') p1 = Process(target=info, args=('alvin',)) p2 = Process(target=foo, args=('egon',)) p1.start() p2.start() p1.join() p2.join() print("ending")
復制代碼

通過tasklist(Win)或者ps -elf |grep(linux)命令檢測每一個進程號(PID)對應的進程名

3.3 進程間通訊 

3.3.1 進程對列Queue

復制代碼
from multiprocessing import Process, Queue import queue def f(q,n): #q.put([123, 456, 'hello']) q.put(n*n+1) print("son process",id(q)) if __name__ == '__main__': q = Queue() #try: q=queue.Queue() print("main process",id(q)) for i in range(3): p = Process(target=f, args=(q,i)) p.start() print(q.get()) print(q.get()) print(q.get())
復制代碼

3.3.2 管道(pipe)

The Pipe() function returns a pair of connection objects connected by a pipe which by default is duplex (two-way). For example:

復制代碼
from multiprocessing import Process, Pipe def f(conn): conn.send([12, {"name":"yuan"}, 'hello']) response=conn.recv() print("response",response) conn.close() if __name__ == '__main__': parent_conn, child_conn = Pipe()  p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) # prints "[42, None, 'hello']" parent_conn.send("兒子你好!") p.join()
復制代碼

Pipe()返回的兩個連接對象代表管道的兩端。 每個連接對象都有send()和recv()方法(等等)。 請注意,如果兩個進程(或線程)嘗試同時讀取或寫入管道的同一端,管道中的數據可能會損壞。

3.3.3 manager

Queue和pipe只是實現了數據交互,並沒實現數據共享,即一個進程去更改另一個進程的數據

A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.

復制代碼
from multiprocessing import Process, Manager def f(d, l,n): d[n] = n d["name"] ="alvin" l.append(n) #print("l",l) if __name__ == '__main__': with Manager() as manager: d = manager.dict() l = manager.list(range(5)) p_list = [] for i in range(10): p = Process(target=f, args=(d,l,i)) p.start() p_list.append(p) for res in p_list: res.join() print(d) print(l)
復制代碼

3.4 進程池

進程池內部維護一個進程序列,當使用時,則去進程池中獲取一個進程,如果進程池序列中沒有可供使用的進進程,那么程序就會等待,直到進程池中有可用進程為止。

復制代碼
from multiprocessing import Pool import time def foo(args): time.sleep(1) print(args) if __name__ == '__main__': p = Pool(5) for i in range(30): p.apply_async(func=foo, args= (i,)) p.close() # 等子進程執行完畢后關閉線程池 # time.sleep(2) # p.terminate() # 立刻關閉線程池 p.join()
復制代碼

進程池內部維護一個進程序列,當使用時,去進程池中獲取一個進程,如果進程池序列中沒有可供使用的進程,那么程序就會等待,直到進程池中有可用進程為止。

進程池中有以下幾個主要方法:

  1. apply:從進程池里取一個進程並執行
  2. apply_async:apply的異步版本
  3. terminate:立刻關閉線程池
  4. join:主進程等待所有子進程執行完畢,必須在close或terminate之后
  5. close:等待所有進程結束后,才關閉線程池

四 協程

協程,又稱微線程,纖程。英文名Coroutine。一句話說明什么是線程:協程是一種用戶態的輕量級線程。

協程擁有自己的寄存器上下文和棧。協程調度切換時,將寄存器上下文和棧保存到其他地方,在切回來的時候,恢復先前保存的寄存器上下文和棧。因此:

協程能保留上一次調用時的狀態(即所有局部狀態的一個特定組合),每次過程重入時,就相當於進入上一次調用的狀態,換種說法:進入上一次離開時所處邏輯流的位置。

4.1 yield與協程

復制代碼
import time """ 傳統的生產者-消費者模型是一個線程寫消息,一個線程取消息,通過鎖機制控制隊列和等待,但一不小心就可能死鎖。 如果改用協程,生產者生產消息后,直接通過yield跳轉到消費者開始執行,待消費者執行完畢后,切換回生產者繼續生產,效率極高。 """ # 注意到consumer函數是一個generator(生成器): # 任何包含yield關鍵字的函數都會自動成為生成器(generator)對象 def consumer(): r = '' while True: # 3、consumer通過yield拿到消息,處理,又通過yield把結果傳回; # yield指令具有return關鍵字的作用。然后函數的堆棧會自動凍結(freeze)在這一行。 # 當函數調用者的下一次利用next()或generator.send()或for-in來再次調用該函數時, # 就會從yield代碼的下一行開始,繼續執行,再返回下一次迭代結果。通過這種方式,迭代器可以實現無限序列和惰性求值。 n = yield r if not n: return print('[CONSUMER] ←← Consuming %s...' % n) time.sleep(1) r = '200 OK' def produce(c): # 1、首先調用c.next()啟動生成器  next(c) n = 0 while n < 5: n = n + 1 print('[PRODUCER] →→ Producing %s...' % n) # 2、然后,一旦生產了東西,通過c.send(n)切換到consumer執行; cr = c.send(n) # 4、produce拿到consumer處理的結果,繼續生產下一條消息; print('[PRODUCER] Consumer return: %s' % cr) # 5、produce決定不生產了,通過c.close()關閉consumer,整個過程結束。  c.close() if __name__=='__main__': # 6、整個流程無鎖,由一個線程執行,produce和consumer協作完成任務,所以稱為“協程”,而非線程的搶占式多任務。 c = consumer() produce(c) ''' result: [PRODUCER] →→ Producing 1... [CONSUMER] ←← Consuming 1... [PRODUCER] Consumer return: 200 OK [PRODUCER] →→ Producing 2... [CONSUMER] ←← Consuming 2... [PRODUCER] Consumer return: 200 OK [PRODUCER] →→ Producing 3... [CONSUMER] ←← Consuming 3... [PRODUCER] Consumer return: 200 OK [PRODUCER] →→ Producing 4... [CONSUMER] ←← Consuming 4... [PRODUCER] Consumer return: 200 OK [PRODUCER] →→ Producing 5... [CONSUMER] ←← Consuming 5... [PRODUCER] Consumer return: 200 OK '''
復制代碼

4.2 greenlet

greenlet機制的主要思想是:生成器函數或者協程函數中的yield語句掛起函數的執行,直到稍后使用next()或send()操作進行恢復為止。可以使用一個調度器循環在一組生成器函數之間協作多個任務。greentlet是python中實現我們所謂的"Coroutine(協程)"的一個基礎庫.

復制代碼
from greenlet import greenlet def test1(): print (12) gr2.switch() print (34) gr2.switch() def test2(): print (56) gr1.switch() print (78) gr1 = greenlet(test1) gr2 = greenlet(test2) gr1.switch()
復制代碼

4.2 基於greenlet的框架

4.2.1 gevent模塊實現協程

Python通過yield提供了對協程的基本支持,但是不完全。而第三方的gevent為Python提供了比較完善的協程支持。

gevent是第三方庫,通過greenlet實現協程,其基本思想是:

當一個greenlet遇到IO操作時,比如訪問網絡,就自動切換到其他的greenlet,等到IO操作完成,再在適當的時候切換回來繼續執行。由於IO操作非常耗時,經常使程序處於等待狀態,有了gevent為我們自動切換協程,就保證總有greenlet在運行,而不是等待IO。

由於切換是在IO操作時自動完成,所以gevent需要修改Python自帶的一些標准庫,這一過程在啟動時通過monkey patch完成:

復制代碼
import gevent import time def foo(): print("running in foo") gevent.sleep(2) print("switch to foo again") def bar(): print("switch to bar") gevent.sleep(5) print("switch to bar again") start=time.time() gevent.joinall( [gevent.spawn(foo), gevent.spawn(bar)] ) print(time.time()-start)
復制代碼

當然,實際代碼里,我們不會用gevent.sleep()去切換協程,而是在執行到IO操作時,gevent自動切換,代碼如下:

復制代碼
from gevent import monkey
monkey.patch_all() import gevent from urllib import request import time def f(url): print('GET: %s' % url) resp = request.urlopen(url) data = resp.read() print('%d bytes received from %s.' % (len(data), url)) start=time.time() gevent.joinall([ gevent.spawn(f, 'https://itk.org/'), gevent.spawn(f, 'https://www.github.com/'), gevent.spawn(f, 'https://zhihu.com/'), ]) # f('https://itk.org/') # f('https://www.github.com/') # f('https://zhihu.com/') print(time.time()-start)
復制代碼

擴展

View Code

eventlet實現協程(了解)

eventlet 是基於 greenlet 實現的面向網絡應用的並發處理框架,提供“線程”池、隊列等與其他 Python 線程、進程模型非常相似的 api,並且提供了對 Python 發行版自帶庫及其他模塊的超輕量並發適應性調整方法,比直接使用 greenlet 要方便得多。

其基本原理是調整 Python 的 socket 調用,當發生阻塞時則切換到其他 greenlet 執行,這樣來保證資源的有效利用。需要注意的是:
eventlet 提供的函數只能對 Python 代碼中的 socket 調用進行處理,而不能對模塊的 C 語言部分的 socket 調用進行修改。對后者這類模塊,仍然需要把調用模塊的代碼封裝在 Python 標准線程調用中,之后利用 eventlet 提供的適配器實現 eventlet 與標准線程之間的協作。
雖然 eventlet 把 api 封裝成了非常類似標准線程庫的形式,但兩者的實際並發執行流程仍然有明顯區別。在沒有出現 I/O 阻塞時,除非顯式聲明,否則當前正在執行的 eventlet 永遠不會把 cpu 交給其他的 eventlet,而標准線程則是無論是否出現阻塞,總是由所有線程一起爭奪運行資源。所有 eventlet 對 I/O 阻塞無關的大運算量耗時操作基本沒有什么幫助。

總結

協程的好處:

無需線程上下文切換的開銷
無需原子操作鎖定及同步的開銷
方便切換控制流,簡化編程模型
高並發+高擴展性+低成本:一個CPU支持上萬的協程都不是問題。所以很適合用於高並發處理。
缺點:

無法利用多核資源:協程的本質是個單線程,它不能同時將 單個CPU 的多個核用上,協程需要和進程配合才能運行在多CPU上.當然我們日常所編寫的絕大部分應用都沒有這個必要,除非是cpu密集型應用。
進行阻塞(Blocking)操作(如IO時)會阻塞掉整個程序

五 IO模型

同步(synchronous) IO和異步(asynchronous) IO,阻塞(blocking) IO和非阻塞(non-blocking)IO分別是什么,到底有什么區別?這個問題其實不同的人給出的答案都可能不同,比如wiki,就認為asynchronous IO和non-blocking IO是一個東西。這其實是因為不同的人的知識背景不同,並且在討論這個問題的時候上下文(context)也不相同。所以,為了更好的回答這個問題,先限定一下本文的上下文。
本文討論的背景是Linux環境下的network IO。 

Stevens在文章中一共比較了五種IO Model:

      •     blocking IO
      •     nonblocking IO
      •     IO multiplexing
      •     signal driven IO
      •     asynchronous IO

由於signal driven IO在實際中並不常用,所以我這只提及剩下的四種IO Model。
再說一下IO發生時涉及的對象和步驟。
對於一個network IO (這里我們以read舉例),它會涉及到兩個系統對象,一個是調用這個IO的process (or thread),另一個就是系統內核(kernel)。當一個read操作發生時,它會經歷兩個階段:

  •  等待數據准備 (Waiting for the data to be ready)
  •  將數據從內核拷貝到進程中 (Copying the data from the kernel to the process)

記住這兩點很重要,因為這些IO Model的區別就是在兩個階段上各有不同的情況。

5.1 blocking IO (阻塞IO)

在linux中,默認情況下所有的socket都是blocking,一個典型的讀操作流程大概是這樣:

當用戶進程調用了recvfrom這個系統調用,kernel就開始了IO的第一個階段:准備數據。對於network io來說,很多時候數據在一開始還沒有到達(比如,還沒有收到一個完整的UDP包),這個時候kernel就要等待足夠的數據到來。而在用戶進程這邊,整個進程會被阻塞。當kernel一直等到數據准備好了,它就會將數據從kernel中拷貝到用戶內存,然后kernel返回結果,用戶進程才解除block的狀態,重新運行起來。
所以,blocking IO的特點就是在IO執行的兩個階段都被block了。

5.2 non-blocking IO(非阻塞IO)

linux下,可以通過設置socket使其變為non-blocking。當對一個non-blocking socket執行讀操作時,流程是這個樣子:

從圖中可以看出,當用戶進程發出read操作時,如果kernel中的數據還沒有准備好,那么它並不會block用戶進程,而是立刻返回一個error。從用戶進程角度講 ,它發起一個read操作后,並不需要等待,而是馬上就得到了一個結果。用戶進程判斷結果是一個error時,它就知道數據還沒有准備好,於是它可以再次發送read操作。一旦kernel中的數據准備好了,並且又再次收到了用戶進程的system call,那么它馬上就將數據拷貝到了用戶內存,然后返回。所以,用戶進程其實是需要不斷的主動詢問kernel數據好了沒有。

 注意:

      在網絡IO時候,非阻塞IO也會進行recvform系統調用,檢查數據是否准備好,與阻塞IO不一樣,”非阻塞將大的整片時間的阻塞分成N多的小的阻塞, 所以進程不斷地有機會 ‘被’ CPU光顧”。即每次recvform系統調用之間,cpu的權限還在進程手中,這段時間是可以做其他事情的,

      也就是說非阻塞的recvform系統調用調用之后,進程並沒有被阻塞,內核馬上返回給進程,如果數據還沒准備好,此時會返回一個error。進程在返回之后,可以干點別的事情,然后再發起recvform系統調用。重復上面的過程,循環往復的進行recvform系統調用。這個過程通常被稱之為輪詢。輪詢檢查內核數據,直到數據准備好,再拷貝數據到進程,進行數據處理。需要注意,拷貝數據整個過程,進程仍然是屬於阻塞的狀態。

View Code

優點:能夠在等待任務完成的時間里干其他活了(包括提交其他任務,也就是 “后台” 可以有多個任務在同時執行)。

缺點:任務完成的響應延遲增大了,因為每過一段時間才去輪詢一次read操作,而任務可能在兩次輪詢之間的任意時間完成。這會導致整體數據吞吐量的降低。

5.3 IO multiplexing(IO多路復用)

      IO multiplexing這個詞可能有點陌生,但是如果我說select,epoll,大概就都能明白了。有些地方也稱這種IO方式為event driven IO。我們都知道,select/epoll的好處就在於單個process就可以同時處理多個網絡連接的IO。它的基本原理就是select/epoll這個function會不斷的輪詢所負責的所有socket,當某個socket有數據到達了,就通知用戶進程。它的流程如圖:

      當用戶進程調用了select,那么整個進程會被block,而同時,kernel會“監視”所有select負責的socket,當任何一個socket中的數據准備好了,select就會返回。這個時候用戶進程再調用read操作,將數據從kernel拷貝到用戶進程。
這個圖和blocking IO的圖其實並沒有太大的不同,事實上,還更差一些。因為這里需要使用兩個system call (select 和 recvfrom),而blocking IO只調用了一個system call (recvfrom)。但是,用select的優勢在於它可以同時處理多個connection。(多說一句。所以,如果處理的連接數不是很高的話,使用select/epoll的web server不一定比使用multi-threading + blocking IO的web server性能更好,可能延遲還更大。select/epoll的優勢並不是對於單個連接能處理得更快,而是在於能處理更多的連接。)
在IO multiplexing Model中,實際中,對於每一個socket,一般都設置成為non-blocking,但是,如上圖所示,整個用戶的process其實是一直被block的。只不過process是被select這個函數block,而不是被socket IO給block。

結論: select的優勢在於可以處理多個連接,不適用於單個連接 

View Code 

思考1:select監聽fd變化的過程

用戶進程創建socket對象,拷貝監聽的fd到內核空間,每一個fd會對應一張系統文件表,內核空間的fd響應到數據后,就會發送信號給用戶進程數據已到;用戶進程再發送系統調用,比如(accept)將內核空間的數據copy到用戶空間,同時作為接受數據端內核空間的數據清除,這樣重新監聽時fd再有新的數據又可以響應到了(發送端因為基於TCP協議所以需要收到應答后才會清除)。

思考2: 上面的示例中,開啟三個客戶端,分別連續向server端發送一個內容(中間server端不回應),結果會怎樣,為什么?

 5.4 Asynchronous I/O(異步IO)

linux下的asynchronous IO其實用得很少。先看一下它的流程:

 

用戶進程發起read操作之后,立刻就可以開始去做其它的事。而另一方面,從kernel的角度,當它受到一個asynchronous read之后,首先它會立刻返回,所以不會對用戶進程產生任何block。然后,kernel會等待數據准備完成,然后將數據拷貝到用戶內存,當這一切都完成之后,kernel會給用戶進程發送一個signal,告訴它read操作完成了。

5.5 IO模型比較分析

      到目前為止,已經將四個IO Model都介紹完了。現在回過頭來回答最初的那幾個問題:blocking和non-blocking的區別在哪,synchronous IO和asynchronous IO的區別在哪。
先回答最簡單的這個:blocking vs non-blocking。前面的介紹中其實已經很明確的說明了這兩者的區別。調用blocking IO會一直block住對應的進程直到操作完成,而non-blocking IO在kernel還准備數據的情況下會立刻返回。

在說明synchronous IO和asynchronous IO的區別之前,需要先給出兩者的定義。Stevens給出的定義(其實是POSIX的定義)是這樣子的:
    A synchronous I/O operation causes the requesting process to be blocked until that I/O operationcompletes;
    An asynchronous I/O operation does not cause the requesting process to be blocked; 
      兩者的區別就在於synchronous IO做”IO operation”的時候會將process阻塞。按照這個定義,之前所述的blocking IO,non-blocking IO,IO multiplexing都屬於synchronous IO。有人可能會說,non-blocking IO並沒有被block啊。這里有個非常“狡猾”的地方,定義中所指的”IO operation”是指真實的IO操作,就是例子中的recvfrom這個system call。non-blocking IO在執行recvfrom這個system call的時候,如果kernel的數據沒有准備好,這時候不會block進程。但是,當kernel中數據准備好的時候,recvfrom會將數據從kernel拷貝到用戶內存中,這個時候進程是被block了,在這段時間內,進程是被block的。而asynchronous IO則不一樣,當進程發起IO 操作之后,就直接返回再也不理睬了,直到kernel發送一個信號,告訴進程說IO完成。在這整個過程中,進程完全沒有被block。

各個IO Model的比較如圖所示:

     

經過上面的介紹,會發現non-blocking IO和asynchronous IO的區別還是很明顯的。在non-blocking IO中,雖然進程大部分時間都不會被block,但是它仍然要求進程去主動的check,並且當數據准備完成以后,也需要進程主動的再次調用recvfrom來將數據拷貝到用戶內存。而asynchronous IO則完全不同。它就像是用戶進程將整個IO操作交給了他人(kernel)完成,然后他人做完后發信號通知。在此期間,用戶進程不需要去檢查IO操作的狀態,也不需要主動的去拷貝數據。

 5.6 selectors模塊

復制代碼
import selectors import socket sel = selectors.DefaultSelector() def accept(sock, mask): conn, addr = sock.accept() # Should be ready print('accepted', conn, 'from', addr) conn.setblocking(False) sel.register(conn, selectors.EVENT_READ, read) def read(conn, mask): data = conn.recv(1000) # Should be ready if data: print('echoing', repr(data), 'to', conn) conn.send(data) # Hope it won't block else: print('closing', conn) sel.unregister(conn) conn.close() sock = socket.socket() sock.bind(('localhost', 1234)) sock.listen(100) sock.setblocking(False) sel.register(sock, selectors.EVENT_READ, accept) while True: events = sel.select() for key, mask in events: callback = key.data callback(key.fileobj, mask)
復制代碼

 

 

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM