一 進程與線程的概念
1.1 進程
1.2 線程
1.3 進程與線程的關系
1.4 並行和並發
1.5 同步與異步
二 threading模塊
2.1 線程對象的創建
2.1.1 Thread類直接創建
import threading import time def countNum(n): # 定義某個線程要運行的函數 print("running on number:%s" %n) time.sleep(3) if __name__ == '__main__': t1 = threading.Thread(target=countNum,args=(23,)) #生成一個線程實例 t2 = threading.Thread(target=countNum,args=(34,)) t1.start() #啟動線程 t2.start() print("ending!")
2.1.2 Thread類繼承式創建
#繼承Thread式創建 import threading import time class MyThread(threading.Thread): def __init__(self,num): threading.Thread.__init__(self) self.num=num def run(self): print("running on number:%s" %self.num) time.sleep(3) t1=MyThread(56) t2=MyThread(78) t1.start() t2.start() print("ending")
2.2 Thread類的實例方法
2.2.1 join()和setDaemon()
# join():在子線程完成運行之前,這個子線程的父線程將一直被阻塞。 # setDaemon(True): ''' 將線程聲明為守護線程,必須在start() 方法調用之前設置,如果不設置為守護線程程序會被無限掛起。 當我們在程序運行中,執行一個主線程,如果主線程又創建一個子線程,主線程和子線程 就分兵兩路,分別運行,那么當主線程完成 想退出時,會檢驗子線程是否完成。如果子線程未完成,則主線程會等待子線程完成后再退出。但是有時候我們需要的是只要主線程 完成了,不管子線程是否完成,都要和主線程一起退出,這時就可以 用setDaemon方法啦''' import threading from time import ctime,sleep import time def Music(name): print ("Begin listening to {name}. {time}".format(name=name,time=ctime())) sleep(3) print("end listening {time}".format(time=ctime())) def Blog(title): print ("Begin recording the {title}. {time}".format(title=title,time=ctime())) sleep(5) print('end recording {time}'.format(time=ctime())) threads = [] t1 = threading.Thread(target=Music,args=('FILL ME',)) t2 = threading.Thread(target=Blog,args=('',)) threads.append(t1) threads.append(t2) if __name__ == '__main__': #t2.setDaemon(True) for t in threads: #t.setDaemon(True) #注意:一定在start之前設置 t.start() #t.join() #t1.join() #t2.join() # 考慮這三種join位置下的結果? print ("all over %s" %ctime())

daemon
A boolean value indicating whether this thread is a daemon thread (True) or not (False). This must be set before start() is called, otherwise RuntimeError is raised. Its initial value is inherited from the creating thread; the main thread is not a daemon thread and therefore all threads created in the main thread default to daemon = False. The entire Python program exits when no alive non-daemon threads are left. 當daemon被設置為True時,如果主線程退出,那么子線程也將跟着退出, 反之,子線程將繼續運行,直到正常退出。
2.2.2 其它方法
Thread實例對象的方法
# isAlive(): 返回線程是否活動的。 # getName(): 返回線程名。 # setName(): 設置線程名。 threading模塊提供的一些方法: # threading.currentThread(): 返回當前的線程變量。 # threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啟動后、結束前,不包括啟動前和終止后的線程。 # threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。
2.3 GIL(全局解釋器鎖)
'''
定義: In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.) '''
Python中的線程是操作系統的原生線程,Python虛擬機使用一個全局解釋器鎖(Global Interpreter Lock)來互斥線程對Python虛擬機的使用。為了支持多線程機制,一個基本的要求就是需要實現不同線程對共享資源訪問的互斥,所以引入了GIL。
GIL:在一個線程擁有了解釋器的訪問權之后,其他的所有線程都必須等待它釋放解釋器的訪問權,即使這些線程的下一條指令並不會互相影響。
在調用任何Python C API之前,要先獲得GIL
GIL缺點:多處理器退化為單處理器;優點:避免大量的加鎖解鎖操作
2.3.1 GIL的早期設計
2.3.2 GIL的影響
計算密集型:

2.3.3 解決方案
2.4 同步鎖 (Lock)
import time import threading def addNum(): global num #在每個線程中都獲取這個全局變量 #num-=1 temp=num time.sleep(0.1) num =temp-1 # 對此公共變量進行-1操作 num = 100 #設定一個共享變量 thread_list = [] for i in range(100): t = threading.Thread(target=addNum) t.start() thread_list.append(t) for t in thread_list: #等待所有線程執行完畢 t.join() print('Result: ', num)
鎖通常被用來實現對共享資源的同步訪問。為每一個共享資源創建一個Lock對象,當你需要訪問該資源時,調用acquire方法來獲取鎖對象(如果其它線程已經獲得了該鎖,則當前線程需等待其被釋放),待資源訪問完后,再調用release方法釋放鎖:
import threading R=threading.Lock() R.acquire() ''' 對公共數據的操作 ''' R.release()
擴展思考

2.5 死鎖與遞歸鎖
2.6 Event對象
2.7 Semaphore(信號量)
import threading import time semaphore = threading.Semaphore(5) def func(): if semaphore.acquire(): print (threading.currentThread().getName() + ' get semaphore') time.sleep(2) semaphore.release() for i in range(20): t1 = threading.Thread(target=func) t1.start()
應用:連接池
思考:與Rlock的區別?
2.8 隊列(queue)
queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.
2.8.1 get與put方法
''' 創建一個“隊列”對象 import Queue q = Queue.Queue(maxsize = 10) Queue.Queue類即是一個隊列的同步實現。隊列長度可為無限或者有限。可通過Queue的構造函數的可選參數 maxsize來設定隊列長度。如果maxsize小於1就表示隊列長度無限。 將一個值放入隊列中 q.put(10) 調用隊列對象的put()方法在隊尾插入一個項目。put()有兩個參數,第一個item為必需的,為插入項目的值; 第二個block為可選參數,默認為 1。如果隊列當前為空且block為1,put()方法就使調用線程暫停,直到空出一個數據單元。如果block為0, put方法將引發Full異常。 將一個值從隊列中取出 q.get() 調用隊列對象的get()方法從隊頭刪除並返回一個項目。可選參數為block,默認為True。如果隊列為空且 block為True,get()就使調用線程暫停,直至有項目可用。如果隊列為空且block為False,隊列將引發Empty異常。 '''
2.8.2 join與task_done方法
''' join() 阻塞進程,直到所有任務完成,需要配合另一個方法task_done。 def join(self): with self.all_tasks_done: while self.unfinished_tasks: self.all_tasks_done.wait() task_done() 表示某個任務完成。每一條get語句后需要一條task_done。 import queue q = queue.Queue(5) q.put(10) q.put(20) print(q.get()) q.task_done() print(q.get()) q.task_done() q.join() print("ending!") '''
2.8.3 其他常用方法
''' 此包中的常用方法(q = Queue.Queue()):
q.qsize() 返回隊列的大小 q.empty() 如果隊列為空,返回True,反之False q.full() 如果隊列滿了,返回True,反之False q.full 與 maxsize 大小對應 q.get([block[, timeout]]) 獲取隊列,timeout等待時間 q.get_nowait() 相當q.get(False)非阻塞
q.put(item) 寫入隊列,timeout等待時間 q.put_nowait(item) 相當q.put(item, False) q.task_done() 在完成一項工作之后,q.task_done() 函數向任務已經完成的隊列發送一個信號 q.join() 實際上意味着等到隊列為空,再執行別的操作 '''
2.8.4 其他模式
''' Python Queue模塊有三種隊列及構造函數: 1、Python Queue模塊的FIFO隊列先進先出。 class queue.Queue(maxsize) 2、LIFO類似於堆,即先進后出。 class queue.LifoQueue(maxsize) 3、還有一種是優先級隊列級別越低越先出來。 class queue.PriorityQueue(maxsize) import queue #先進后出 q=queue.LifoQueue() q.put(34) q.put(56) q.put(12) #優先級 q=queue.PriorityQueue() q.put([5,100]) q.put([7,200]) q.put([3,"hello"]) q.put([4,{"name":"alex"}]) while 1: data=q.get() print(data) '''
2.8.5 生產者消費者模型
在線程世界里,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那么生產者就必須等待消費者處理完,才能繼續生產數據。同樣的道理,如果消費者的處理能力大於生產者,那么消費者就必須等待生產者。為了解決這個問題於是引入了生產者和消費者模式。
生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產者生產完數據之后不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列里取,阻塞隊列就相當於一個緩沖區,平衡了生產者和消費者的處理能力。
這就像,在餐廳,廚師做好菜,不需要直接和客戶交流,而是交給前台,而客戶去飯菜也不需要不找廚師,直接去前台領取即可,這也是一個結耦的過程。
import time,random import queue,threading q = queue.Queue() def Producer(name): count = 0 while count <10: print("making........") time.sleep(random.randrange(3)) q.put(count) print('Producer %s has produced %s baozi..' %(name, count)) count +=1 #q.task_done() #q.join() print("ok......") def Consumer(name): count = 0 while count <10: time.sleep(random.randrange(4)) if not q.empty(): data = q.get() #q.task_done() #q.join() print(data) print('\033[32;1mConsumer %s has eat %s baozi...\033[0m' %(name, data)) else: print("-----no baozi anymore----") count +=1 p1 = threading.Thread(target=Producer, args=('A',)) c1 = threading.Thread(target=Consumer, args=('B',)) # c2 = threading.Thread(target=Consumer, args=('C',)) # c3 = threading.Thread(target=Consumer, args=('D',)) p1.start() c1.start() # c2.start() # c3.start()
三 multiprocessing模塊
# Process類調用 from multiprocessing import Process import time def f(name): print('hello', name,time.ctime()) time.sleep(1) if __name__ == '__main__': p_list=[] for i in range(3): p = Process(target=f, args=('alvin:%s'%i,)) p_list.append(p) p.start() for i in p_list: p.join() print('end') # 繼承Process類調用 from multiprocessing import Process import time class MyProcess(Process): def __init__(self): super(MyProcess, self).__init__() # self.name = name def run(self): print ('hello', self.name,time.ctime()) time.sleep(1) if __name__ == '__main__': p_list=[] for i in range(3): p = MyProcess() p.start() p_list.append(p) for p in p_list: p.join() print('end')
3.2 process類
構造方法:
Process([group [, target [, name [, args [, kwargs]]]]])
group: 線程組,目前還沒有實現,庫引用中提示必須是None;
target: 要執行的方法;
name: 進程名;
args/kwargs: 要傳入方法的參數。
實例方法:
is_alive():返回進程是否在運行。
join([timeout]):阻塞當前上下文環境的進程程,直到調用此方法的進程終止或到達指定的timeout(可選參數)。
start():進程准備就緒,等待CPU調度
run():strat()調用run方法,如果實例進程時未制定傳入target,這star執行t默認run()方法。
terminate():不管任務是否完成,立即停止工作進程
屬性:
daemon:和線程的setDeamon功能一樣
name:進程名字。
pid:進程號。
from multiprocessing import Process import os import time def info(name): print("name:",name) print('parent process:', os.getppid()) print('process id:', os.getpid()) print("------------------") time.sleep(1) def foo(name): info(name) if __name__ == '__main__': info('main process line') p1 = Process(target=info, args=('alvin',)) p2 = Process(target=foo, args=('egon',)) p1.start() p2.start() p1.join() p2.join() print("ending")
通過tasklist(Win)或者ps -elf |grep(linux)命令檢測每一個進程號(PID)對應的進程名
3.3 進程間通訊
3.3.1 進程對列Queue
from multiprocessing import Process, Queue import queue def f(q,n): #q.put([123, 456, 'hello']) q.put(n*n+1) print("son process",id(q)) if __name__ == '__main__': q = Queue() #try: q=queue.Queue() print("main process",id(q)) for i in range(3): p = Process(target=f, args=(q,i)) p.start() print(q.get()) print(q.get()) print(q.get())
3.3.2 管道(pipe)
The Pipe()
function returns a pair of connection objects connected by a pipe which by default is duplex (two-way). For example:
from multiprocessing import Process, Pipe def f(conn): conn.send([12, {"name":"yuan"}, 'hello']) response=conn.recv() print("response",response) conn.close() if __name__ == '__main__': parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) # prints "[42, None, 'hello']" parent_conn.send("兒子你好!") p.join()
Pipe()返回的兩個連接對象代表管道的兩端。 每個連接對象都有send()和recv()方法(等等)。 請注意,如果兩個進程(或線程)嘗試同時讀取或寫入管道的同一端,管道中的數據可能會損壞。
3.3.3 manager
Queue和pipe只是實現了數據交互,並沒實現數據共享,即一個進程去更改另一個進程的數據。
A manager object returned by Manager()
controls a server process which holds Python objects and allows other processes to manipulate them using proxies.
from multiprocessing import Process, Manager def f(d, l,n): d[n] = n d["name"] ="alvin" l.append(n) #print("l",l) if __name__ == '__main__': with Manager() as manager: d = manager.dict() l = manager.list(range(5)) p_list = [] for i in range(10): p = Process(target=f, args=(d,l,i)) p.start() p_list.append(p) for res in p_list: res.join() print(d) print(l)
3.4 進程池
進程池內部維護一個進程序列,當使用時,則去進程池中獲取一個進程,如果進程池序列中沒有可供使用的進進程,那么程序就會等待,直到進程池中有可用進程為止。
from multiprocessing import Pool import time def foo(args): time.sleep(1) print(args) if __name__ == '__main__': p = Pool(5) for i in range(30): p.apply_async(func=foo, args= (i,)) p.close() # 等子進程執行完畢后關閉線程池 # time.sleep(2) # p.terminate() # 立刻關閉線程池 p.join()
進程池內部維護一個進程序列,當使用時,去進程池中獲取一個進程,如果進程池序列中沒有可供使用的進程,那么程序就會等待,直到進程池中有可用進程為止。
進程池中有以下幾個主要方法:
- apply:從進程池里取一個進程並執行
- apply_async:apply的異步版本
- terminate:立刻關閉線程池
- join:主進程等待所有子進程執行完畢,必須在close或terminate之后
- close:等待所有進程結束后,才關閉線程池
四 協程
協程,又稱微線程,纖程。英文名Coroutine。一句話說明什么是線程:協程是一種用戶態的輕量級線程。
協程擁有自己的寄存器上下文和棧。協程調度切換時,將寄存器上下文和棧保存到其他地方,在切回來的時候,恢復先前保存的寄存器上下文和棧。因此:
協程能保留上一次調用時的狀態(即所有局部狀態的一個特定組合),每次過程重入時,就相當於進入上一次調用的狀態,換種說法:進入上一次離開時所處邏輯流的位置。
4.1 yield與協程
import time """ 傳統的生產者-消費者模型是一個線程寫消息,一個線程取消息,通過鎖機制控制隊列和等待,但一不小心就可能死鎖。 如果改用協程,生產者生產消息后,直接通過yield跳轉到消費者開始執行,待消費者執行完畢后,切換回生產者繼續生產,效率極高。 """ # 注意到consumer函數是一個generator(生成器): # 任何包含yield關鍵字的函數都會自動成為生成器(generator)對象 def consumer(): r = '' while True: # 3、consumer通過yield拿到消息,處理,又通過yield把結果傳回; # yield指令具有return關鍵字的作用。然后函數的堆棧會自動凍結(freeze)在這一行。 # 當函數調用者的下一次利用next()或generator.send()或for-in來再次調用該函數時, # 就會從yield代碼的下一行開始,繼續執行,再返回下一次迭代結果。通過這種方式,迭代器可以實現無限序列和惰性求值。 n = yield r if not n: return print('[CONSUMER] ←← Consuming %s...' % n) time.sleep(1) r = '200 OK' def produce(c): # 1、首先調用c.next()啟動生成器 next(c) n = 0 while n < 5: n = n + 1 print('[PRODUCER] →→ Producing %s...' % n) # 2、然后,一旦生產了東西,通過c.send(n)切換到consumer執行; cr = c.send(n) # 4、produce拿到consumer處理的結果,繼續生產下一條消息; print('[PRODUCER] Consumer return: %s' % cr) # 5、produce決定不生產了,通過c.close()關閉consumer,整個過程結束。 c.close() if __name__=='__main__': # 6、整個流程無鎖,由一個線程執行,produce和consumer協作完成任務,所以稱為“協程”,而非線程的搶占式多任務。 c = consumer() produce(c) ''' result: [PRODUCER] →→ Producing 1... [CONSUMER] ←← Consuming 1... [PRODUCER] Consumer return: 200 OK [PRODUCER] →→ Producing 2... [CONSUMER] ←← Consuming 2... [PRODUCER] Consumer return: 200 OK [PRODUCER] →→ Producing 3... [CONSUMER] ←← Consuming 3... [PRODUCER] Consumer return: 200 OK [PRODUCER] →→ Producing 4... [CONSUMER] ←← Consuming 4... [PRODUCER] Consumer return: 200 OK [PRODUCER] →→ Producing 5... [CONSUMER] ←← Consuming 5... [PRODUCER] Consumer return: 200 OK '''
4.2 greenlet
from greenlet import greenlet def test1(): print (12) gr2.switch() print (34) gr2.switch() def test2(): print (56) gr1.switch() print (78) gr1 = greenlet(test1) gr2 = greenlet(test2) gr1.switch()