每次執行程序(比如說瀏覽器,音樂播放器)的時候都會完成一定的功能,比如說瀏覽器幫我們打開網頁。 進程就是一個程序在一個數據集上的一次動態執行過程。進程一般由程序、數據集、進程控制塊三部分組成。進程的創建、撤銷和切換的開銷比較大
線程也叫輕量級進程,它是一個基本的CPU執行單元,也是程序執行過程中的最小單元,線程的引入減小了程序並發執行時的開銷。線程沒有自己的系統資源,只擁有在運行時必不可少的資源。但線程可以與同屬與同一進程的其他線程共享進程所擁有的其他資源。線程是屬於進程的,線程運行在進程空間內,同一進程所產生的線程共享同一內存空間,當進程退出時該進程所產生的線程都會被強制退出並清除。
協程,又稱微線程,線程是系統級別的它們由操作系統調度,而協程則是程序級別的由程序根據需要自己調度。在一個線程中會有很多函數,我們把這些函數稱為子程序,在子程序執行過程中可以中斷去執行別的子程序,而別的子程序也可以中斷回來繼續執行之前的子程序,這個過程就稱為協程。也就是說在同一線程內一段代碼在執行過程中會中斷然后跳轉執行別的代碼,接着在之前中斷的地方繼續開始執行,類似與yield操作。協程是一中多任務實現方式,它不需要多個進程或線程就可以實現多任務。
multiprocessing是python的多進程管理包。
threading 模塊建立在 _thread 模塊之上。_thread 模塊以低級、原始的方式來處理和控制線程,而 threading 模塊通過對 thread 進行二次封裝,提供了更方便的 api 來處理線程。
greenlet、gevent(第三方模塊)可以實現協程
python 進程
進程
程序是指令和數據的有序集合,其本身沒有任何運行的含義,是一個靜態的概念。
進程是一個“執行中的程序”,進程的實質是程序的一次執行過程,進程是動態產生,動態消亡的。進程是一個能獨立運行的基本單位,同時也是系統分配資源和調度的獨立單位;進程由程序、數據和進程控制塊三部分組成。由於進程間的相互制約,使進程具有執行的間斷性,即進程按各自獨立的、不可預知的速度向前推進
進程調度
先來先服務(FCFS)調度算法是一種最簡單的調度算法,該算法既可以作業調度,也可以作用域進程調度。FCFS算法比較有利於長作業(進程),而不利於短作業(進程)。由此可知,本算法適合於CPU繁忙型作業,而不利於I/O繁忙型作業(進程)。
短作業(進程)優先調度算法(SJ/PF)是指對短作業或者短進程優先調度的算法,該算法既可以用於作業調度,也可用於進程調度。但其對長作業不利;不能保證緊迫性作業(進程)被及時處理;作業的長短只是被估算出來的。
時間片輪轉(Round Robin,RR)將CPU的處理時間分成固定大小的時間片,如果一個進程在被調度選中之后用完了系統規定的時間片,但又未完成要求的任務,則它自行釋放自己所占有的CPU而排到就緒隊列的末尾,等待下一次調度。同時,進程調度程序又去調度當前就緒隊列中的第一個進程。
進程的並行與並發
並行:並行是指兩者同時執行,比如賽跑,兩個人都在不停的往前跑;(資源夠用,比如三個線程,四核CPU)
並發:並行是指資源有限的情況下,兩者交替輪流使用資源,比如一段路(單核CPU資源)同時只能過一個人,A走一段后,讓給B,B用完繼續給A,交替使用,目的是提高效率。
區別:
並行是從微觀上,也就是在一個精確的時間片刻,有不同的程序在執行,這就要求必須有多個處理器。
並發是從宏觀上,在一個時間段上可以看出是同時執行,比如一個服務器同時處理多個session。
同步/異步
同步:所謂同步就是一個任務的完成需要依賴另外一個任務時,只有等待被依賴的任務完成后,依賴的任務才能算完成,這是一種可靠的任務序列
。要么成功都成功,失敗都失敗,兩個任務的狀態可以保持一致。
異步:所謂異步是不需要等待被依賴的任務完成,只是通知被依賴的任務要完成什么工作,依賴的任務也立即執行,只要自己完成了整個任務就算完成了
。至於被依賴的任務最終是否真正完成,依賴它的任務無法確定,所以它是不可靠的任務序列
。
比如我去銀行辦理業務,可能會有兩種方式: 第一種 :選擇排隊等候; 第二種 :選擇取一個小紙條上面有我的號碼,等到排到我這一號時由櫃台的人通知我輪到我去辦理業務了; 第一種:前者(排隊等候)就是同步等待消息通知,也就是我要一直在等待銀行辦理業務情況; 第二種:后者(等待別人通知)就是異步等待消息通知。在異步消息處理中,等待消息通知者(在這個例子中就是等待辦理業務的人)往往注冊一個回調機制,
在所等待的事件被觸發時由觸發機制(在這里是櫃台的人)通過某種機制(在這里是寫在小紙條上的號碼,喊號)找到等待該事件的人。
阻塞/非阻塞
阻塞和非阻塞這兩個概念與程序(線程)等待消息通知(無所謂同步或者異步)時的狀態有關。也就是說阻塞與非阻塞主要是程序(線程)等待消息通知時的狀態角度來說的
不論是排隊還是使用號碼等待通知,如果在這個等待的過程中,等待者除了等待消息通知之外不能做其它的事情,那么該機制就是阻塞的,表現在程序中,也就是該程序一直阻塞在
該函數調用處不能繼續往下執行。相反,有的人喜歡在銀行辦理這些業務的時候一邊打打電話發發短信一邊等待,這樣的狀態就是非阻塞的,因為他(等待者)沒有阻塞在這個消息
通知上,而是一邊做自己的事情一邊等待。 注意:同步非阻塞形式實際上是效率低下的,想象一下你一邊打着電話一邊還需要抬頭看到底隊伍排到你了沒有。如果把打電話和觀察排隊的位置看成是程序的兩個操作的話,這個
程序需要在這兩種不同的行為之間來回的切換,效率可想而知是低下的;而異步非阻塞形式卻沒有這樣的問題,因為打電話是你(等待者)的事情,而通知你則是櫃台(消息觸發機制)
的事情,程序沒有在兩種不同的操作中來回切換。
同步/異步與阻塞/非阻塞
(1)同步阻塞形式
效率最低。拿上面的舉例來說,就是你專心排隊,什么別的事都不做
(2)異步阻塞形式
如果在銀行等待辦理業務的人采用的是異步的方式去等待消息被觸發(通知),也就是領了一張小紙條,假如在這段時間里他不能離開銀行去做其它的事情,那么很顯然,這個人被阻塞在了這個等待的操作上面;異步操作也可以被阻塞住的,只不過它不是在處理消息時阻塞,而是在等待消息通知書時被阻塞。
(3)同步非阻塞形式
實際上是效率低下的。想象一下你一邊打着電話一邊還需要抬頭看到底隊伍排到你了沒有,如果把打電話和觀察排隊的位置看成是程序的兩個操作的話,這個程序需要在這兩種不同的行為之間來回的切換
,效率可想而知是低下的。
(4)異步非阻塞形式
效率更高,因為打電話是你(等待者)的事情,而通知你則是櫃台(消息觸發機制)的事情,程序沒有在兩種不同的操作中來回切換。
比如說,這個人突然發覺自己煙癮犯了,需要出去抽根煙,於是他告訴大堂經理說,排到我這個號碼的時候麻煩到外面通知我一下,那么他就沒有被阻塞在這個等待的操作上面,自然這個就是異步+非阻塞的方式了。
很多人會把同步和阻塞混淆,是因為很多時候同步操作會以阻塞的形式表現出來
很多人也會把異步和非阻塞混淆,因為異步操作一般都不會在真正的IO操作處被阻塞
。
1、multiprocessing模塊
python中的多線程無法利用多核優勢,如果想要充分的使用CPU資源(os.cpu_count()查看),在python中大部分情況需要使用多進程。Python中提供了multiprocess模塊。multiprocess中幾乎包含了和進程有關的所有子模塊。大致分為四個部分:創建進程部分、進程同步部分、進程池部分、進程之間數據共享。multiprocessing常用組件及功能:
1.1、管理進程模塊:
- Process(用於創建進程模塊)
- Pool(用於創建管理進程池)
- Queue(用於進程通信,資源共享)
- Value,Array(用於進程通信,資源共享)
- Pipe(用於管道通信)
- Manager(用於資源共享)
1.2、同步子進程模塊:
- Condition
- Event
- Lock
- RLock
- Semaphore
2、Array,Value---共享數據
如果你真有需要共享數據, multiprocessing提供了兩種方式。
(1)multiprocessing,Array,Value
數據可以用Value或Array存儲在一個共享內存地圖里,如下:
from multiprocessing importArray,Value,Process def func(a,b): a.value = 3.333333333333333 for i in range(len(b)): b[i] = -b[i] if __name__ == "__main__": num = Value('d',0.0) arr = Array('i',range(11)) c = Process(target=func,args=(num,arr)) d= Process(target=func,args=(num,arr)) c.start() d.start() c.join() d.join() print(num.value) for i in arr: print(i)<br> #輸出: #3.1415927 #[0, -1, -2,-3, -4, -5, -6, -7, -8, -9]
創建num和arr時,“d”和“i”參數由Array模塊使用的typecodes創建:“d”表示一個雙精度的浮點數,“i”表示一個有符號的整數,這些共享對象將被線程安全的處理。
Array(‘i’, range(10))中的‘i’參數:
‘c’: ctypes.c_char ‘u’: ctypes.c_wchar ‘b’: ctypes.c_byte ‘B’: ctypes.c_ubyte
‘h’: ctypes.c_short ‘H’: ctypes.c_ushort ‘i’: ctypes.c_int ‘I’: ctypes.c_uint
‘l’: ctypes.c_long, ‘L’: ctypes.c_ulong ‘f’: ctypes.c_float ‘d’: ctypes.c_double
(2)Manager
由Manager()返回的manager提供list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array類型的支持。
from multiprocessing importProcess,Manager def f(d,l): d["name"] = "zhangyanlin" d["age"] = 18 d["Job"] = "pythoner" l.reverse() if __name__ == "__main__": with Manager() as man: d = man.dict() l = man.list(range(10)) p = Process(target=f,args=(d,l)) p.start() p.join() print(d) print(l) #輸出: #{0.25: None, 1: '1', '2': 2} #[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
3、multiprocess.Process模塊介紹
Process模塊是一個創建進程的模塊,借助這個模塊,就可以完成進程的創建。
Process([group [, target [, name [, args [, kwargs]]]]]) 強調: 1. 需要使用關鍵字的方式來指定參數 2. args指定的為傳給target函數的位置參數,是一個元組形式,必須有逗號
方法介紹:
1 p.start():啟動進程,並調用該子進程中的p.run() 2 p.run():進程啟動時運行的方法,正是它去調用target指定的函數,我們自定義類的類中一定要實現該方法 3 p.terminate():強制終止進程p,不會進行任何清理操作,如果p創建了子進程,該子進程就成了僵屍進程,使用該方法需要特別小心這種情況。如果p還保存了一個鎖那么也將不會被釋放,進而導致死鎖 4 p.is_alive():如果p仍然運行,返回True 5 p.join([timeout]):主線程等待p終止(強調:是主線程處於等的狀態,而p是處於運行的狀態)。timeout是可選的超時時間,需要強調的是,p.join只能join住start開啟的進程,而不能join住run開啟的進程
屬性介紹:
1 p.daemon:默認值為False,如果設為True,代表p為后台運行的守護進程。設定為True后,p不能創建自己的新進程,必須在p.start()之前設置 2 p.name:進程的名稱 3 p.pid:進程的pid 4 p.exitcode:進程在運行時為None、如果為–N,表示被信號N結束(了解即可) 5 p.authkey:進程的身份驗證鍵,默認是由os.urandom()隨機生成的32字符的字符串。這個鍵的用途是為涉及網絡連接的底層進程間通信提供安全性(了解即可)
Process模塊創建並開啟子進程的兩種方式
#方法一; import os from multiprocessing import Process def func1(name): print('hello', name) print("我是子進程: %d;我的父進程id是:%d" % (os.getpid(), os.getppid())) def func2(): print('hello') if __name__ == '__main__': p1 = Process(target=func1, args=('xiaobai',)) # 此處傳參必須是元組數據類型 p1.start() print("我是父進程:%d" % os.getpid()) p2 = Process(target=func2) p2.start() ''' # 執行結果 我是父進程:12612 hello xiaobai 我是子進程: 5760; 我的父進程id是:12612 '''
# 方法二:# 通過繼承Process類的形式開啟進程的方式 import os from multiprocessing import Process class MyProcess(Process): def __init__(self, name): super().__init__() self.name = name def run(self): #固定名字run !!! print(os.getpid()) print('%s 正在和女神聊天' % self.name) if __name__ == '__main__': p1 = MyProcess('xiaobai') p2 = MyProcess('xiaohei') p1.start() # start會自動調用run方法 p2.start() # 說明:如果需要傳參,必須寫入到__init__方法里面,且必須加上super().__init__();因為父類Process里面也有__init__方法。
Process對象的join方法
import time from multiprocessing import Process def func(name): print("hello", name) time.sleep(1) print('我是子進程') if __name__ == '__main__': p = Process(target=func, args=('xiaobai',)) p.start() p.join() # 加上join方法后,父進程就會阻塞等待子進程結束而結束。 print("父進程")
Process開啟多進程
多個進程同事運行(注意,子進程的執行順序不是根據自動順序決定的)
import time from multiprocessing import Process def func(name): print("hello 進程 %d" % name ) time.sleep(1) if __name__ == '__main__': for i in range(10): p = Process(target=func, args=(i,)) p.start()
import time from multiprocessing import Process def func(name): print("hello 進程 %d" % name ) time.sleep(0.1) if __name__ == '__main__': p_lst = [] for i in range(10): p = Process(target=func, args=(i,)) p.start() p_lst.append(p) p.join() print("父進程執行中")
進程之間的數據隔離問題
from multiprocessing import Process n = 100 #在windows系統中把全局變量定義在if __name__ == '__main__'之上就可以了 def work(): global n n = 0 print("子進程內:", n) if __name__ == '__main__': p = Process(target=work) p.start() print("主進程內:", n)
守護進程
主進程創建守護進程,守護進程會隨着主進程的結束而結束。守護進程內無法再開啟子進程,否則拋出異常:AssertionError: daemonic processes are not allowed to have children
import time from multiprocessing import Process def foo(): print(123) time.sleep(1) print("end123") def bar(): print(456) time.sleep(3) print("end456") if __name__ == '__main__': p1=Process(target=foo) p2=Process(target=bar) p1.daemon=True p1.start() p2.start() time.sleep(0.1) print("main-------") #打印該行則主進程代碼結束,則守護進程p1應該被終止. #可能p1執行的打印信息任務會因為主進程打印(main----)被終止.
socket聊天並發實例
from socket import * from multiprocessing import Process server=socket(AF_INET,SOCK_STREAM) server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) server.bind(('127.0.0.1',8080)) server.listen(5) def talk(conn,client_addr): while True: try: msg=conn.recv(1024) if not msg:break conn.send(msg.upper()) except Exception: break if __name__ == '__main__': #windows下start進程一定要寫到這下面 while True: conn,client_addr=server.accept() p=Process(target=talk,args=(conn,client_addr)) p.start() 使用多進程實現socket聊天並發-server
from socket import * client=socket(AF_INET,SOCK_STREAM) client.connect(('127.0.0.1',8080)) while True: msg=input('>>: ').strip() if not msg:continue client.send(msg.encode('utf-8')) msg=client.recv(1024) print(msg.decode('utf-8')) 使用多進程實現socket聊天並發-client
進程同步(鎖)multiprocess.Lock
當多個進程使用同一份數據資源的時候,就會引發數據安全或順序混亂問題。
# 多進程搶占輸出資源 import os import time import random from multiprocessing import Process def work(n): print('%s: %s is running' % (n, os.getpid())) time.sleep(random.random()) print('%s: %s is done' % (n, os.getpid())) if __name__ == '__main__': for i in range(3): p = Process(target=work, args=(i,)) p.start() # 執行結果 """ 0: 14316 is running 1: 9900 is running 2: 10056 is running 1: 9900 is done 2: 10056 is done 0: 14316 is done """
# 使用鎖維護執行順序 import os import time import random from multiprocessing import Process, Lock def work(lock, n): lock.acquire() print('%s: %s is running' % (n, os.getpid())) time.sleep(random.random()) print('%s: %s is done' % (n, os.getpid())) lock.release() if __name__ == '__main__': lock = Lock() for i in range(3): p = Process(target=work, args=(lock, i)) p.start() # 執行結果 """ 0: 15276 is running 0: 15276 is done 1: 6360 is running 1: 6360 is done 2: 14776 is running 2: 14776 is done """
上面這種情況雖然使用加鎖的形式實現了順序的執行,但是程序又重新變成串行了,沒錯,加鎖可以保證多個進程修改同一塊數據時,同一時間只能有一個任務可以進行修改,即串行修改,速度是慢了,但犧牲了速度卻保證了數據的安全性。因此我們最好找尋一種解決方案能夠兼顧:1、效率高(多個進程共享一塊內存的數據)2、幫我們處理好鎖問題,這就是mutiprocessing模塊為我們提供的基於消息的IPC通信機制:隊列和管道
隊列和管道都是將數據存放於內存中,隊列又是基於(管道+鎖)實現的,可以讓我們從復雜的鎖問題中解脫出來,我們應該盡可能使用消息傳遞和隊列,避免處理復雜的同步和鎖問題,而且在進程數目增多時,往往可以獲得更好的可擴展性。
4、進程池(Using a pool of workers)
Pool類描述了一個工作進程池,他有幾種不同的方法讓任務卸載工作進程。
進程池內部維護一個進程序列,當使用時,則去進程池中獲取一個進程,如果進程池序列中沒有可供使用的進進程,那么程序就會等待,直到進程池中有可用進程為止。我們可以用Pool類創建一個進程池, 展開提交的任務給進程池。 例:
#apply from multiprocessing import Pool import time def f1(i): time.sleep(0.5) print(i) return i + 100 if __name__ == "__main__": pool = Pool(5) for i in range(1,31): pool.apply(func=f1,args=(i,)) #apply_async def f1(i): time.sleep(0.5) print(i) return i + 100 def f2(arg): print(arg) if __name__ == "__main__": pool = Pool(5) for i in range(1,31): pool.apply_async(func=f1,args=(i,),callback=f2) pool.close() pool.join()
一個進程池對象可以控制工作進程池的哪些工作可以被提交,它支持超時和回調的異步結果,有一個類似map的實現。
- processes :使用的工作進程的數量,如果processes是None那么使用 os.cpu_count()返回的數量。
- initializer: 如果initializer是None,那么每一個工作進程在開始的時候會調用initializer(*initargs)。
- maxtasksperchild:工作進程退出之前可以完成的任務數,完成后用一個心的工作進程來替代原進程,來讓閑置的資源被釋放。maxtasksperchild默認是None,意味着只要Pool存在工作進程就會一直存活。
- context: 用在制定工作進程啟動時的上下文,一般使用 multiprocessing.Pool() 或者一個context對象的Pool()方法來創建一個池,兩種方法都適當的設置了context
注意:Pool對象的方法只可以被創建pool的進程所調用。
進程池的方法
-
apply(func[, args[, kwds]]) :使用arg和kwds參數調用func函數,結果返回前會一直阻塞,由於這個原因,apply_async()更適合並發執行,另外,func函數僅被pool中的一個進程運行。
-
apply_async(func[, args[, kwds[, callback[, error_callback]]]]) : 是apply()的一個變體,會返回一個結果對象。如果callback被指定,那么callback可以接收一個參數然后被調用,當結果准備好回調時會調用callback,調用失敗時,則用error_callback替換callback。 Callbacks應被立即完成,否則處理結果的線程會被阻塞。
-
close() : 阻止更多的任務提交到pool,待任務完成后,工作進程會退出。
-
terminate() : 不管任務是否完成,立即停止工作進程。在對pool對象進程垃圾回收的時候,會立即調用terminate()。
-
join() : wait工作線程的退出,在調用join()前,必須調用close() or terminate()。這樣是因為被終止的進程需要被父進程調用wait(join等價與wait),否則進程會成為僵屍進程。
-
map(func, iterable[, chunksize])
-
map_async(func, iterable[, chunksize[, callback[, error_callback]]])
-
imap(func, iterable[, chunksize])
-
imap_unordered(func, iterable[, chunksize])
-
starmap(func, iterable[, chunksize])
-
starmap_async(func, iterable[, chunksize[, callback[, error_back]]])
python 線程
1、threading模塊
threading 模塊建立在 _thread 模塊之上。thread 模塊以低級、原始的方式來處理和控制線程,而 threading 模塊通過對 thread 進行二次封裝,提供了更方便的 api 來處理線程。Thread方法:
t.start() : 激活線程,
t.getName() : 獲取線程的名稱
t.setName() : 設置線程的名稱
t.name : 獲取或設置線程的名稱
t.is_alive() : 判斷線程是否為激活狀態
t.isAlive() :判斷線程是否為激活狀態
t.setDaemon() 設置為后台線程或前台線程(默認:False);通過一個布爾值設置線程是否為守護線程,必須在執行start()方法之后才可以使用。如果是后台線程,主線程執行過程中,后台線程也在進行,主線程執行完畢后,后台線程不論成功與否,均停止;如果是前台線程,主線程執行過程中,前台線程也在進行,主線程執行完畢后,等待前台線程也執行完成后,程序停止
t.isDaemon() : 判斷是否為守護線程
t.ident :獲取線程的標識符。線程標識符是一個非零整數,只有在調用了start()方法之后該屬性才有效,否則它只返回None。
t.join() :逐個執行每個線程,執行完畢后繼續往下執行,該方法使得多線程變得無意義
t.run() :線程被cpu調度后自動執行線程對象的run方法
from threading import Thread from threading import currentThread # 獲取當前線程對象的 對象 import time def task(): print('%s is runing' %currentThread().getName()) # 獲取線程名 time.sleep(2) print('%s is down' % currentThread().getName()) if __name__ == '__main__': t = Thread(target=task, name='這里設置子線程初始化名') t.start() t.setName('設置線程名') # !!!! t.join() # 等待子線程運行結束 # currentThread() 等同於 線程對象t 所以獲取線程名也可以t.getName() print('主線程', currentThread().getName()) # 但在主線程內(並沒有線程對象)要獲取線程名必須用 currentThread().getName() t.isAlive() # 線程是否存活! 查看線程對象是否存活
python 線程的兩種開啟方法
#方法1 from threading import Thread # 創建線程的模塊 def task(name): print(name) if __name__ == '__main__': # 開啟線程 參數1:方法名(不要帶括號) 參數2:參數(元祖) 返回對象 p = Thread(target=task, args=('線程1',)) p.start() # 只是給操作系統發送了一個就緒信號,並不是執行。操作系統接收信號后安排cpu運行 print('主') #方法2 - 類的方法 from threading import Thread # 創建線程的模塊 class MyThread(Thread): def __init__(self, name): super().__init__() self.name = name def run(self): # 固定名字run !!!必須用固定名 print(self.name) if __name__ == '__main__': # 必須要這樣啟動 p = MyThread('子線程1') p.start() print('主)
2、線程鎖threading.RLock和threading.Lock
由於線程之間是進行隨機調度,並且每個線程可能只執行n條執行之后,CPU接着執行其他線程。為了保證數據的准確性,引入了鎖的概念。所以可能出現如下問題:
例:假設列表A的所有元素就為0,當一個線程從前向后打印列表的所有元素,另外一個線程則從后向前修改列表的元素為1,那么輸出的時候,列表的元素就會一部分為0,一部分為1,這就導致了數據的不一致。鎖的出現解決了這個問題。
import threading import time globals_num = 0 lock = threading.RLock() def Func(): lock.acquire() # 獲得鎖 global globals_num globals_num += 1 time.sleep(1) print(globals_num) lock.release() # 釋放鎖 for i in range(10): t =threading.Thread(target=Func) t.start()
RLock允許在同一線程中被多次acquire。而Lock卻不允許這種情況。 如果使用RLock,那么acquire和release必須成對出現,即調用了n次acquire,必須調用n次的release才能真正釋放所占用的瑣。
3、threading.Event
python線程的事件用於主線程控制其他線程的執行,事件主要提供了三個方法 set、wait、clear。
事件處理的機制:全局定義了一個“Flag”,如果“Flag”值為 False,那么當程序執行 event.wait 方法時就會阻塞,如果“Flag”值為True,那么event.wait 方法時便不再阻塞。
- clear:將“Flag”設置為False
- set:將“Flag”設置為True
- Event.isSet() :判斷標識位是否為Ture。
import threading def do(event): print('start') event.wait() print('execute') event_obj = threading.Event() for i in range(10): t = threading.Thread(target=do, args=(event_obj,)) t.start() event_obj.clear() inp = input('input:') if inp == 'true': event_obj.set()
當線程執行的時候,如果flag為False,則線程會阻塞,當flag為True的時候,線程不會阻塞。它提供了本地和遠程的並發性。
5、threading.Condition
Condition類實現了一個conditon變量。 這個conditiaon變量允許一個或多個線程等待,直到他們被另一個線程通知。 如果lock參數,被給定一個非空的值,,那么他必須是一個lock或者Rlock對象,它用來做底層鎖。否則,會創建一個新的Rlock對象,用來做底層鎖。
wait(timeout=None) : 等待通知,或者等到設定的超時時間。當調用這wait()方法時,如果調用它的線程沒有得到鎖,那么會拋出一個RuntimeError 異常。 wati()釋放鎖以后,在被調用相同條件的另一個進程用notify() or notify_all() 叫醒之前 會一直阻塞。wait() 還可以指定一個超時時間。
如果有等待的線程,notify()方法會喚醒一個在等待conditon變量的線程。notify_all() 則會喚醒所有在等待conditon變量的線程。 notify()和notify_all()不會釋放鎖,也就是說,線程被喚醒后不會立刻返回他們的wait() 調用。除非線程調用notify()和notify_all()之后放棄了鎖的所有權。
例子: 生產者-消費者模型,
import threading import time def consumer(cond): with cond: print("consumer before wait") cond.wait() print("consumer after wait") def producer(cond): with cond: print("producer before notifyAll") cond.notifyAll() print("producer after notifyAll") condition = threading.Condition() c1 = threading.Thread(name="c1", target=consumer, args=(condition,)) c2 = threading.Thread(name="c2", target=consumer, args=(condition,)) p = threading.Thread(name="p", target=producer, args=(condition,)) c1.start() time.sleep(2) c2.start() time.sleep(2) p.start()
5、queue模塊
Queue 就是對隊列,它是線程安全的,,舉例來說,我們去麥當勞吃飯。飯店里面有廚師職位,前台負責把廚房做好的飯賣給顧客,顧客則去前台領取做好的飯。這里的前台就相當於我們的隊列。形成管道樣,廚師做好飯通過前台傳送給顧客,所謂單向隊列
這個模型也叫生產者-消費者模型。
import queue q = queue.Queue(maxsize=0) # 構造一個先進顯出隊列,maxsize指定隊列長度,為0 時,表示隊列長度無限制。 q.join() # 等到隊列為kong的時候,在執行別的操作 q.qsize() # 返回隊列的大小 (不可靠) q.empty() # 當隊列為空的時候,返回True 否則返回False (不可靠) q.full() # 當隊列滿的時候,返回True,否則返回False (不可靠) q.put(item, block=True, timeout=None) # 將item放入Queue尾部,item必須存在,可以參數block默認為True,表示當隊列滿時,會等待隊列給出可用位置, 為False時為非阻塞,此時如果隊列已滿,會引發queue.Full 異常。 可選參數timeout,表示 會阻塞設置的時間,過后, 如果隊列無法給出放入item的位置,則引發 queue.Full 異常 q.get(block=True, timeout=None) # 移除並返回隊列頭部的一個值,可選參數block默認為True,表示獲取值的時候,如果隊列為空,則阻塞,為False時,不阻塞, 若此時隊列為空,則引發 queue.Empty異常。 可選參數timeout,表示會阻塞設置的時候,過后,如果隊列為空,則引發Empty異常。 q.put_nowait(item) # 等效於 put(item,block=False) q.get_nowait() # 等效於 get(item,block=False)
代碼如下:
#!/usr/bin/env python import Queue import threading message = Queue.Queue(10) def producer(i): while True: message.put(i) def consumer(i): while True: msg = message.get() for i in range(12): t =threading.Thread(target=producer, args=(i,)) t.start() for i in range(10): t =threading.Thread(target=consumer, args=(i,)) t.start()
python 協程
協程存在的意義:對於多線程應用,CPU通過切片的方式來切換線程間的執行,線程切換時需要耗時(保存狀態,下次繼續)。協程,則只使用一個線程,在一個線程中規定某個代碼塊執行順序。協程的適用場景:當程序中存在大量不需要CPU的操作時(IO),適用於協程;
1.通過yield實現協程:
import time def A(): while 1: print('------A-----') time.sleep(0.1) yield() def B(): while 1: print('-------B-----') time.sleep(0.1) next(a) a = A() B()
執行結果:
-------B-----
------A-----
-------B-----
------A-----
-------B-----
------A-----
-------B-----
------A-----
-------B-----
------A-----
···
2.通過greenlet實現協程:
yield能實現協程,不過實現過程不易於理解,greenlet是在這方面做了改進。
from greenlet import greenlet import time def A(): while 1: print('-------A-------') time.sleep(0.5) g2.switch() def B(): while 1: print('-------B-------') time.sleep(0.5) g1.switch() g1 = greenlet(A) #創建協程g1 g2 = greenlet(B) g1.switch() #跳轉至協程g1
執行結果:
-------A-------
-------B-------
-------A-------
-------B-------
-------A-------
···
3.通過gevent實現協程:
greenlet可以實現協程,不過每一次都要人為的去指向下一個該執行的協程,顯得太過麻煩。gevent每次遇到io操作,需要耗時等待時,會自動跳到下一個協程繼續執行。
gevent 是一個第三方庫,可以輕松通過gevent實現協程程,在gevent中用到的主要模式是Greenlet, 它是以C擴展模塊形式接入Python的輕量級協程。gevent會主動識別程序內部的IO操作,當子程序遇到IO后,切換到別的子程序。如果所有的子程序都進入IO,則阻塞。
import gevent def A(): while 1: print('-------A-------') gevent.sleep(1) #用來模擬一個耗時操作,注意不是time模塊中的sleep def B(): while 1: print('-------B-------') gevent.sleep(0.5) #每當碰到耗時操作,會自動跳轉至其他協程 g1 = gevent.spawn(A) # 創建一個協程 g2 = gevent.spawn(B) g1.join() #等待協程執行結束 g2.join()
執行結果:
-------A-------
-------B-------
-------B-------
-------A-------
-------B-------
-------B-------
-------A-------
-------B-------
-------B-------
···
4.協程gevent完成回顯服務器:
import gevent from gevent import monkey,socket monkey.patch_all() #有IO才做時需要這一句 s = socket.socket(2,1) #用的都是gevent模塊中的socket,但用法一樣 s.setsockopt(1,2,1) s.bind(('',8080)) s.listen(1024) def func_accept(): while 1: cs,userinfo = s.accept() print('來了一個客戶'+str(userinfo)) g = gevent.spawn(func_recv,cs) #每當有用戶連接,增加一條協程 def func_recv(cs): while 1: recv_data = cs.recv(1024) print(recv_data) #程誰堵塞了,便會跳轉至其他協程 if len(recv_data) > 0: cs.send(recv_data) else: cs.close() break g1 = gevent.spawn(func_accept) g1.join()