一.python中的GIL(Global Interpreter Lock)
詳情:https://www.cnblogs.com/SuKiWX/p/8804974.html
介紹:
GIL:全局解釋器鎖(Cpython中才有,Jpython沒有,pypy是去gil的);
cpython:pyhon中的一個線程對應C語言中的一個線程;
gil使得同一個時刻只有一個線程在一個cpu上執行字節碼,無法將多個線程映射到多cpu上;
gil在一些情況下會釋放,是結合字節碼和時間片釋放(Python2和Python3有差別),gil在遇到io操作的時候會主動釋放
#gil會釋放,最后的結果不定,釋放的位置不定 import threading total=1 def add(): global total for i in range(1000000): total+=1 def decs(): global total for i in range(1000000): total-=1 thread1=threading.Thread(target=add) thread2=threading.Thread(target=decs) thread1.start() thread2.start() thread1.join() thread2.join() print(total)
注:
Python GIL其實是功能和性能之間權衡后的產物,它尤其存在的合理性,也有較難改變的客觀因素。從本分的分析中,我們可以做以下一些簡單的總結:
-
-
-
- 因為GIL的存在,只有IO Bound場景下得多線程會得到較好的性能
- 如果對並行計算性能較高的程序可以考慮把核心部分也成C模塊,或者索性用其他語言實現
- GIL在較長一段時間內將會繼續存在,但是會不斷對其進行改進
-
-
二.python多線程編程
1.利用Thread實例實現多線程:
這里子線程默認為非守護線程(主線程運行完,子線程不會退出,繼續運行完)
# 對於io操作,多線程和多進程性能差別不大 # 通過Thread實例化 import time import threading def get_detail_html(url): print('我獲取詳情內容了') time.sleep(2) print('我獲取內容完了') def get_detail_url(url): print('我獲取url了') time.sleep(2) print('我獲取url完了') if __name__=='__main__': thread1=threading.Thread(target=get_detail_html,args=('',)) thread2=threading.Thread(target=get_detail_url,args=('',)) start_time=time.time() thread1.start() thread2.start() #時間非常小,是運行代碼的時間差,而不是2秒 #這樣運行一共有三個線程,主線程和其他兩個子線程(thread1,thread2),而且是並行的,子線程啟動后,主線程仍然往下運行,因此時間不是2秒 #守護線程(主線程退出,子線程就會kill掉) print('last time:{}'.format(time.time()-start_time))
共三個進程,主線程和兩個子線程
2.守護線程:(主線程退出,子線程就會被kill掉)
import time import threading def get_detail_html(url): print('我獲取詳情內容了') time.sleep(4) print('我獲取內容完了') def get_detail_url(url): print('我獲取url了') time.sleep(2) print('我獲取url完了') if __name__=='__main__': thread1=threading.Thread(target=get_detail_html,args=('',)) thread2=threading.Thread(target=get_detail_url,args=('',)) #將線程1設置成守護線程(主線程退出,該線程就會被kill掉),但會等線程2運行完(非守護線程) thread1.setDaemon(True) start_time=time.time() thread1.start() thread2.start() print('last time:{}'.format(time.time()-start_time))
線程1未運行完退出
3.join():等某個子線程執行完在繼續執行主線程代碼:
import time import threading def get_detail_html(url): print('我獲取詳情內容了') time.sleep(4) print('我獲取內容完了') def get_detail_url(url): print('我獲取url了') time.sleep(2) print('我獲取url完了') if __name__=='__main__': thread1=threading.Thread(target=get_detail_html,args=('',)) thread2=threading.Thread(target=get_detail_url,args=('',)) start_time=time.time() thread1.start() thread2.start() #等待兩個線程執行完 thread1.join() thread2.join() print('last time:{}'.format(time.time()-start_time))
執行時間是線程最大時間(並發執行)
4.繼承Thread實現多線程(代碼較復雜時):
import time import threading class GetDetailHtml(threading.Thread): def __init__(self, name): super().__init__(name=name) def run(self): print('我獲取詳情內容了') time.sleep(4) print('我獲取內容完了') class GetDetailUrl(threading.Thread): def __init__(self,name): super().__init__(name=name) def run(self): print('我獲取url了') time.sleep(2) print('我獲取url完了') if __name__=='__main__': thread1=GetDetailHtml('get_detail_html') thread2=GetDetailUrl('get_detail_url') start_time=time.time() thread1.start() thread2.start() #等待兩個線程執行完 thread1.join() thread2.join() print('last time:{}'.format(time.time()-start_time))
三.線程間通信-Queue
1.線程通信方式——共享變量:(全局變量或參數等)
注:共享變量的方式是線程不安全的操作(不推薦)

1 import time 2 import threading 3 url_lists = [] 4 def get_detail_html(): 5 #可以單獨放在某一個文件管理(注意引入時要引用文件) 6 global url_lists 7 url_lists=url_lists 8 while True: 9 if len(url_lists): 10 url=url_lists.pop() 11 print('我獲取詳情內容了') 12 time.sleep(4) 13 print('我獲取內容完了') 14 def get_detail_url(url_lists): 15 while True: 16 print('我獲取url了') 17 time.sleep(2) 18 for i in range(20): 19 url_lists.append('url'+str(i)) 20 print('我獲取url完了') 21 if __name__ == '__main__': 22 thread_url=threading.Thread(target=get_detail_url,args=(url_lists,)) 23 thread_url.start() 24 #開啟十個線程爬取詳情 25 for i in range(10): 26 thread_html=threading.Thread(target=get_detail_html,) 27 thread_html.start()
2.通過queue的方式進行線程同步:
注:是線程安全的(Queue本身就是線程安全的【使用了線程鎖的機制】,使用了雙端隊列,deque)
Queue中的方法:qsize()查看對列大小,empty()判斷隊列是否為空,full()判斷隊列是否滿,滿了的話put方法就會阻塞,等待有空位加入,put()將數據放入隊列,默認是阻塞的(block參數,可以設置成非阻塞,還有timeout等待時間),get()從隊列取數據

1 import time 2 import threading 3 from queue import Queue 4 url_lists = [] 5 def get_detail_html(queue): 6 while True: 7 url=queue.get() 8 print('我獲取詳情內容了') 9 time.sleep(4) 10 print('我獲取內容完了') 11 def get_detail_url(queue): 12 while True: 13 print('我獲取url了') 14 time.sleep(2) 15 for i in range(20): 16 queue.put('url'+str(i)) 17 print('我獲取url完了') 18 if __name__ == '__main__': 19 #設置隊列最大值1000,過大對內存會有很大影響 20 urls_queue=Queue(maxsize=1000) 21 thread_url=threading.Thread(target=get_detail_url,args=(urls_queue,)) 22 thread_url.start() 23 #開啟十個線程爬取詳情 24 for i in range(10): 25 thread_html=threading.Thread(target=get_detail_html,args=(urls_queue,)) 26 thread_html.start() 27 #執行該方法才能執行退出,和join成對出現 28 urls_queue.task_done() 29 urls_queue.join()
四. 線程同步(Lock、RLock、Semaphores、Condition)
問題:如例一中的問題,最后的結果不正確且不穩定,是因為在字節碼運行時,線程隨時可能跳轉,導致賦值不正確,因此需要一個鎖,讓某段代碼運行時,另一代碼段不運行
1.Lock:鎖住的代碼段都只能有一個代碼段運行
獲取(acquire)和釋放(release)鎖都需要時間:因此用鎖會影響性能;還有可能引起死鎖(互相等待,A和B都需要a,b兩個資源,A獲取了a,B獲取了B,A等待b,B等待a或則未釋放鎖再次獲取);
產生死鎖的四個條件:互斥條件;不剝奪條件;請求和保持條件;循環等待條件

1 import threading 2 from threading import Lock 3 total=1 4 lock=Lock() 5 def add(): 6 global total 7 for i in range(1000000): 8 #獲取鎖 9 lock.acquire() 10 total+=1 11 #釋放鎖,釋放后其他才能獲取 12 lock.release() 13 14 def decs(): 15 global total 16 for i in range(1000000): 17 lock.acquire() 18 total-=1 19 lock.release() 20 thread1=threading.Thread(target=add) 21 thread2=threading.Thread(target=decs) 22 thread1.start() 23 thread2.start() 24 thread1.join() 25 thread2.join() 26 print(total)
2.RLock(可重入的鎖):在一個線程中可以,可以連續多次acquire(獲取資源),一定要注意acquire的次數要和release的次數一致
3.Condition:條件變量(用於復雜的線程間同步)
3.1使用鎖進行先后對話:發現先啟動的線程把話先說完(第一個線程啟動后運行完,第二個線程還沒有啟動,或者還未切換到另一個線程)

3.2通過condition實現:
(通過調用with方法(實際是__enter__魔法函數),也可以使用acquire()方法【如self.conf.acquire()】,但記得一定要release()之后才能調用其他函數【wait(),notify()】,還有注意線程啟動順序【先接收方先啟動,否則接收不到】),Condition有兩層鎖,一把地層鎖,會在線程調用了wait()方法時釋放,上面的鎖會在每次調用wait()時分配一把鎖並放入到condition的等待隊列中,等待notify()方法的喚醒
Condition重要函數:acquire(),release()【都調用了Lock的acquire,release】,wait()【允許等待某個通知在操作,會獲取一把鎖並把Condition中的鎖釋放掉】,notify()【發送通知,釋放鎖】

1 import threading 2 3 4 5 class LYQ1(threading.Thread): 6 def __init__(self, cond): 7 super().__init__(name="LYQ1") 8 self.cond = cond 9 10 def run(self): 11 with self.cond: 12 print('LYQ1:你好,我是{}'.format(self.name)) 13 #輸出后發出通知 14 self.cond.notify() 15 self.cond.wait() 16 print('LYQ1:哈哈') 17 self.cond.notify() 18 self.cond.wait() 19 print('LYQ1:嘿嘿') 20 self.cond.notify() 21 22 23 class LYQ2(threading.Thread): 24 def __init__(self, cond): 25 super().__init__(name="LYQ2") 26 self.cond = cond 27 28 def run(self): 29 with self.cond: 30 #等待通知 31 self.cond.wait() 32 print('LYQ2:你好,我是{}'.format(self.name)) 33 self.cond.notify() 34 self.cond.wait() 35 print('LYQ2:好的'.format(self.name)) 36 self.cond.notify() 37 self.cond.wait() 38 print('LYQ2:好久不見') 39 self.cond.notify() 40 41 42 if __name__ == "__main__": 43 cond = threading.Condition() 44 lyq1 = LYQ1(cond=cond) 45 lyq2 = LYQ2(cond=cond) 46 #注意啟動順序,如果先啟動lyq1,發送通知確沒有接收(lyq2還沒有啟動) 47 lyq2.start() 48 lyq1.start() 49 ''' 50 輸出: 51 LYQ1:你好,我是LYQ1 52 LYQ2:你好,我是LYQ2 53 LYQ1:哈哈 54 LYQ2:好的 55 LYQ1:嘿嘿 56 LYQ2:好久不見'''
4.Semaphores:(有一個參數value可以控制線程(並發數),調用acquire方法value就會減一,如果減少到為0就會阻塞在那兒等待有空位,調用release()value就會加一)【線程數量過多會影響切換線程的效率】
Semaphores內部實質是用Condition完成的,Queue實質也是;
用來控制進入數量的鎖(如文件寫一般只能一個線程,讀可以允許同時多個線程讀。)

1 # 控制線程的數量 2 from threading import Semaphore 3 import threading 4 import time 5 6 7 class UrlProducer(threading.Thread): 8 def __init__(self, sem): 9 super().__init__() 10 self.sem = sem 11 12 def run(self): 13 for i in range(20): 14 #調用acquire方法,Semaphore中的value就會減一(value),如果為0就阻塞在這兒 15 self.sem.acquire() 16 html_get = HtmlGet('url' + str(i),sem) 17 html_get.start() 18 19 20 class HtmlGet(threading.Thread): 21 def __init__(self, url,sem): 22 super().__init__() 23 self.url = url 24 self.sem=sem 25 26 def run(self): 27 time.sleep(2) 28 print('獲取網頁成功') 29 #調用release方法,Semaphore中的value就會加一(value) 30 self.sem.release() 31 32 33 if __name__ == '__main__': 34 #允許並發的個數 35 sem=Semaphore(3) 36 urlproducer = UrlProducer(sem) 37 urlproducer.start()
五.concurrent線程池編碼
1.為什么需要線程池:
提供了數量控制,獲取線程的狀態及返回值;當一個線程完成的時候主線程能立即知道;futures能讓多線程和多進程編碼接口一致
2.ThreadPoolExecutor中重要函數:
submit():通過submit函數提交執行的函數到線程池,立即返回值(不會阻塞);
done():done()判斷某個任務是否執行成功;
result():獲取返回值;
cance():取消某個任務(還未執行,執行中不能取消);
wait()讓主線程阻塞等待子線程完成,可以添加參數等待多長時間就不等待了
3.獲取已經完成的任務:
as_completed() [from concurrent.futures import ThreadPoolExecutor,as_completed];
map(屬於ThreadPoolExecutor)

1 #as_completed會將成功的url的返回值yield出去 2 from concurrent.futures import ThreadPoolExecutor,as_completed,wait 3 import time 4 def get_html(times): 5 time.sleep(times) 6 print('get success {}'.format(times)) 7 return times 8 9 excutor=ThreadPoolExecutor(max_workers=2) 10 # #通過submit函數提交執行的函數到線程池,立即返回值(不會阻塞) 11 # ret1=excutor.submit(get_html,(3)) 12 # #done()判斷函數是否執行成功 13 # print(ret1.done()) 14 # print(ret1.cancel()) 15 # ret2=excutor.submit(get_html,(2)) 16 # print(ret2.done()) 17 # #result()獲取返回值 18 # print(ret1.result()) 19 urls=[2,3,4] 20 #獲取已經完成的任務 21 all_task=[excutor.submit(get_html,(url)) for url in urls] 22 #wait(讓主線程阻塞等待子線程完成),可以 23 wait(all_task,return_when='FIRST_COMPLETED') 24 print('haha') 25 # for future in as_completed(all_task): 26 # data=future.result() 27 # print(data) 28 # for data in excutor.map(get_html,urls): 29 # print(data)
4:ThreadPoolExecutor源碼簡介:
submit方法會加一把鎖,創建Future,然后放入WorkItem(執行單元),將執行單元放入執行隊列中,_adjust_thread_count()會比較啟動得線程數量和最大線程數量,如果小於就會啟動一個線程放入_threads里面,_threads里面執行的是_worker()函數,參數為work_que,獲取WorkItem,然后調用里面的run()方法,將值設置到future中去。
Future中的方法,cancel(),done()等等都是判斷狀態,result()是會阻塞的,調用Condition()
六.多進程編程-multiprocessing
1.和多線程對比:
·1.1多進程開銷大,多線程開銷小;
1.2耗CPU的操作,多進程編程比多線程編程好很多,對於IO操作來說,使用多線程操作比多進程好(線程切換比進程切換性能高)
2.例:
1.1對於耗CPU的操作(多進程優於多線程):

1 from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor,as_completed 2 import time 3 4 def fib(n): 5 if n<=2: 6 return 1 7 return fib(n-2)+fib(n-1) 8 if __name__=='__main__': 9 #代碼要放在這里面,不然可能拋異常 10 with ThreadPoolExecutor(3) as excutor: 11 start_time=time.time() 12 all_task=[excutor.submit(fib,num) for num in range(25,40)] 13 for future in as_completed(all_task): 14 data=future.result() 15 print('結果:'+str(data)) 16 print('多線程所需時間:'+str(time.time()-start_time)) 17 ''' 18 多線程所需時間:72.10901117324829 19 ''' 20 21 with ProcessPoolExecutor(3) as excutor: 22 start_time=time.time() 23 all_task=[excutor.submit(fib,num) for num in range(25,40)] 24 for future in as_completed(all_task): 25 data=future.result() 26 print('結果:'+str(data)) 27 print('多進程所需時間:'+str(time.time()-start_time)) 28 ''' 29 多進程所需時間:43.14996862411499 30 '''
1.2對於IO操作,多線程由於多進程:

1 from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor,as_completed 2 import time 3 4 def random_sleep(n): 5 time.sleep(n) 6 return n 7 if __name__=='__main__': 8 #代碼要放在這里面,不然可能拋異常 9 with ThreadPoolExecutor(3) as excutor: 10 start_time=time.time() 11 all_task=[excutor.submit(random_sleep,num) for num in [2]*30] 12 for future in as_completed(all_task): 13 data=future.result() 14 print('休息:'+str(data)+'秒') 15 print('多線程所需時間:'+str(time.time()-start_time)) 16 ''' 17 多線程所需時間:20.010841131210327 18 ''' 19 20 with ProcessPoolExecutor(3) as excutor: 21 start_time=time.time() 22 all_task=[excutor.submit(random_sleep,num) for num in [2]*30] 23 for future in as_completed(all_task): 24 data=future.result() 25 print('休息:'+str(data)+'秒') 26 print('多進程所需時間:'+str(time.time()-start_time)) 27 ''' 28 20.755817651748657 29 '''
3.進程有趣的例子:

1 import os 2 import time 3 #fork只能用於Linux/Unix 4 #進程間數據是隔離的,每個進程都有完整的數據,fork()會將父進程代碼復制一遍在運行(fork后的代碼) 5 pid=os.fork() 6 print('LYQ') 7 if pid==0: 8 print('子進程:{},父進程:{}'.format(os.getpid(),os.getppid())) 9 else: 10 print('我是父進程:{}'.format(pid)) 11 #暫停兩秒,父進程還沒有退出,子進程可以運行完,父進程退出就可以kill掉子進程 12 #不暫停的話,父進程退出,子進程仍然在運行,輸出 13 time.sleep(2) 14 ''' 15 暫停的輸出: 16 LYQ 17 我是父進程:8587 18 LYQ 19 子進程:8587,父進程:8578 20 21 不暫停的輸出: 22 LYQ 23 我是父進程:8587 24 [root@izwz97n253zzwjtudbqt5uz ~]# LYQ 25 子進程:8587,父進程:1 26 '''
4.multiprocessing:(比ProcessPoolExecutor更底層【基於multiprocessing實現】,推薦ProcessPoolExecutor更好的設計,和ThreadPoolExecutor相似):

1 import time 2 import multiprocessing 3 def get_html(n): 4 time.sleep(n) 5 print('sub_process success') 6 return n 7 8 if __name__=='__main__': 9 process=multiprocessing.Process(target=get_html,args=(1,)) 10 #獲取進程號,沒有start之前為None 11 print(process.pid) 12 process.start() 13 print(process.pid) 14 process.join() 15 print('main_process success')
5進程池:

1 ...... 2 pool=multiprocessing.Pool(3) 3 #異步提交任務 4 # result=pool.apply_async(get_html,args=(2,)) 5 # #關閉不在進入進程池 6 # pool.close() 7 # pool.join() 8 # print(result.get()) 9 #和執行順序一樣 10 for result in pool.imap(get_html,[1,5,3]): 11 print('{} sleep success'.format(result)) 12 #和先后完成順序一樣 13 for result in pool.imap_unordered(get_html, [1, 5, 3]): 14 print('{} sleep success'.format(result))
七.進程間通信
1.共享全局變量在多進程中不適用(會把數據復制到子進程中,數據是獨立的,修改也不會影響),quue中的Queue也不行,需要做一些處理

1 from multiprocessing import Queue,Process 2 import time 3 def producer(queue): 4 queue.put('a') 5 time.sleep(2) 6 7 def consumer(queue): 8 time.sleep(2) 9 data=queue.get() 10 print(data) 11 12 if __name__=='__main__': 13 queue=Queue(10) 14 pro_producer=Process(target=producer,args=(queue,)) 15 pro_consumer=Process(target=consumer,args=(queue,)) 16 pro_producer.start() 17 pro_consumer.start() 18 pro_producer.join() 19 pro_consumer.join()
2.multiprocessing中的Queue不能用於進程池(需要用到manager):
queue=Manager().Queue(10)

1 from queue import Queue——>用於多線程 2 from multiprocessing import Queue——>用於非進程池的多進程通信 3 from multiprocessing import Manager——>manager.Queue()用於進程池通信
3.通過Pipe進行進程間通信(管道),pipe只能適用於兩個進程 ,Pipe性能高於queue

1 from multiprocessing import Pipe 2 import time 3 def producer(pipe): 4 pipe.send('a') 5 time.sleep(2) 6 7 def consumer(pipe): 8 time.sleep(2) 9 data=pipe.recv() 10 print(data) 11 12 if __name__=='__main__': 13 #通過Pipe進行進程間通信(管道),pipe只能適用於兩個進程 14 recv_pipe,send_pipe=Pipe() 15 queue=Manager().Queue(10) 16 pro_producer=Process(target=producer,args=(send_pipe,)) 17 pro_consumer=Process(target=consumer,args=(recv_pipe,)) 18 pro_producer.start() 19 pro_consumer.start() 20 pro_producer.join() 21 pro_consumer.join()
4.進程間共享內存(Manager):

1 from multiprocessing import Manager,Process 2 3 def add_data(pro_dict, key, value): 4 pro_dict[key] = value 5 6 if __name__=='__main__': 7 #常用的類型都有 8 9 process_dict=Manager().dict() 10 fir=Process(target=add_data,args=(process_dict,'name1','LYQ1')) 11 sed = Process(target=add_data, args=(process_dict, 'name2', 'LYQ2')) 12 fir.start() 13 sed.start() 14 fir.join() 15 sed.join() 16 print(process_dict)