多線程、多進程和線程池編程


一.python中的GIL(Global Interpreter Lock)

  詳情:https://www.cnblogs.com/SuKiWX/p/8804974.html

  介紹:

    GIL:全局解釋器鎖(Cpython中才有,Jpython沒有,pypy是去gil的);

    cpython:pyhon中的一個線程對應C語言中的一個線程;

       gil使得同一個時刻只有一個線程在一個cpu上執行字節碼,無法將多個線程映射到多cpu上;
       gil在一些情況下會釋放,是結合字節碼和時間片釋放(Python2和Python3有差別),gil在遇到io操作的時候會主動釋放

#gil會釋放,最后的結果不定,釋放的位置不定
import threading
total=1
def add():
    global total
    for i in range(1000000):
        total+=1

def decs():
    global total
    for i in range(1000000):
        total-=1

thread1=threading.Thread(target=add)
thread2=threading.Thread(target=decs)
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print(total)

 

  注:

    Python GIL其實是功能和性能之間權衡后的產物,它尤其存在的合理性,也有較難改變的客觀因素。從本分的分析中,我們可以做以下一些簡單的總結:

        • 因為GIL的存在,只有IO Bound場景下得多線程會得到較好的性能
        • 如果對並行計算性能較高的程序可以考慮把核心部分也成C模塊,或者索性用其他語言實現
        • GIL在較長一段時間內將會繼續存在,但是會不斷對其進行改進

 

二.python多線程編程

  1.利用Thread實例實現多線程:

    這里子線程默認為非守護線程(主線程運行完,子線程不會退出,繼續運行完)

# 對於io操作,多線程和多進程性能差別不大
# 通過Thread實例化

import time
import threading
def get_detail_html(url):
    print('我獲取詳情內容了')
    time.sleep(2)
    print('我獲取內容完了')
def get_detail_url(url):
    print('我獲取url了')
    time.sleep(2)
    print('我獲取url完了')
if __name__=='__main__':
    thread1=threading.Thread(target=get_detail_html,args=('',))
    thread2=threading.Thread(target=get_detail_url,args=('',))
    start_time=time.time()
    thread1.start()
    thread2.start()
    #時間非常小,是運行代碼的時間差,而不是2秒
    #這樣運行一共有三個線程,主線程和其他兩個子線程(thread1,thread2),而且是並行的,子線程啟動后,主線程仍然往下運行,因此時間不是2秒
    #守護線程(主線程退出,子線程就會kill掉)
    print('last time:{}'.format(time.time()-start_time))

 

 

共三個進程,主線程和兩個子線程

  2.守護線程:(主線程退出,子線程就會被kill掉

import time
import threading
def get_detail_html(url):
    print('我獲取詳情內容了')
    time.sleep(4)
    print('我獲取內容完了')
def get_detail_url(url):
    print('我獲取url了')
    time.sleep(2)
    print('我獲取url完了')
if __name__=='__main__':
    thread1=threading.Thread(target=get_detail_html,args=('',))
    thread2=threading.Thread(target=get_detail_url,args=('',))
    #將線程1設置成守護線程(主線程退出,該線程就會被kill掉),但會等線程2運行完(非守護線程)
    thread1.setDaemon(True)
    start_time=time.time()
    thread1.start()
    thread2.start()
    print('last time:{}'.format(time.time()-start_time))

 

線程1未運行完退出

  3.join():等某個子線程執行完在繼續執行主線程代碼:

import time
import threading
def get_detail_html(url):
    print('我獲取詳情內容了')
    time.sleep(4)
    print('我獲取內容完了')
def get_detail_url(url):
    print('我獲取url了')
    time.sleep(2)
    print('我獲取url完了')
if __name__=='__main__':
    thread1=threading.Thread(target=get_detail_html,args=('',))
    thread2=threading.Thread(target=get_detail_url,args=('',))
    start_time=time.time()
    thread1.start()
    thread2.start()
    #等待兩個線程執行完
    thread1.join()
    thread2.join()
    print('last time:{}'.format(time.time()-start_time))

 

執行時間是線程最大時間(並發執行)

  4.繼承Thread實現多線程(代碼較復雜時):

import time
import threading
class GetDetailHtml(threading.Thread):
    def __init__(self, name):
        super().__init__(name=name)
    def run(self):
        print('我獲取詳情內容了')
        time.sleep(4)
        print('我獲取內容完了')

class GetDetailUrl(threading.Thread):
    def __init__(self,name):
        super().__init__(name=name)
    def run(self):
        print('我獲取url了')
        time.sleep(2)
        print('我獲取url完了')
if __name__=='__main__':
    thread1=GetDetailHtml('get_detail_html')
    thread2=GetDetailUrl('get_detail_url')
    start_time=time.time()
    thread1.start()
    thread2.start()
    #等待兩個線程執行完
    thread1.join()
    thread2.join()
    print('last time:{}'.format(time.time()-start_time))

三.線程間通信-Queue

  1.線程通信方式——共享變量:(全局變量或參數等)

    注:共享變量的方式是線程不安全的操作(不推薦)

 1 import time
 2 import threading
 3 url_lists = []
 4 def get_detail_html():
 5     #可以單獨放在某一個文件管理(注意引入時要引用文件)
 6     global url_lists
 7     url_lists=url_lists
 8     while True:
 9         if len(url_lists):
10             url=url_lists.pop()
11             print('我獲取詳情內容了')
12             time.sleep(4)
13             print('我獲取內容完了')
14 def get_detail_url(url_lists):
15     while True:
16         print('我獲取url了')
17         time.sleep(2)
18         for i in range(20):
19             url_lists.append('url'+str(i))
20         print('我獲取url完了')
21 if __name__ == '__main__':
22     thread_url=threading.Thread(target=get_detail_url,args=(url_lists,))
23     thread_url.start()
24     #開啟十個線程爬取詳情
25     for i in range(10):
26         thread_html=threading.Thread(target=get_detail_html,)
27         thread_html.start()
View Code

 

  2.通過queue的方式進行線程同步:

    注:是線程安全的(Queue本身就是線程安全的【使用了線程鎖的機制】,使用了雙端隊列,deque)

    Queue中的方法:qsize()查看對列大小,empty()判斷隊列是否為空,full()判斷隊列是否滿,滿了的話put方法就會阻塞,等待有空位加入,put()將數據放入隊列,默認是阻塞的(block參數,可以設置成非阻塞,還有timeout等待時間),get()從隊列取數據

 

 1 import time
 2 import threading
 3 from queue import Queue
 4 url_lists = []
 5 def get_detail_html(queue):
 6     while True:
 7         url=queue.get()
 8         print('我獲取詳情內容了')
 9         time.sleep(4)
10         print('我獲取內容完了')
11 def get_detail_url(queue):
12     while True:
13         print('我獲取url了')
14         time.sleep(2)
15         for i in range(20):
16             queue.put('url'+str(i))
17         print('我獲取url完了')
18 if __name__ == '__main__':
19     #設置隊列最大值1000,過大對內存會有很大影響
20     urls_queue=Queue(maxsize=1000)
21     thread_url=threading.Thread(target=get_detail_url,args=(urls_queue,))
22     thread_url.start()
23     #開啟十個線程爬取詳情
24     for i in range(10):
25         thread_html=threading.Thread(target=get_detail_html,args=(urls_queue,))
26         thread_html.start()
27     #執行該方法才能執行退出,和join成對出現
28     urls_queue.task_done()
29     urls_queue.join()
View Code

 

四. 線程同步(Lock、RLock、Semaphores、Condition)

  問題:如例一中的問題,最后的結果不正確且不穩定,是因為在字節碼運行時,線程隨時可能跳轉,導致賦值不正確,因此需要一個鎖,讓某段代碼運行時,另一代碼段不運行

  1.Lock:鎖住的代碼段都只能有一個代碼段運行

    獲取(acquire)和釋放(release)鎖都需要時間:因此用鎖會影響性能;還有可能引起死鎖(互相等待,A和B都需要a,b兩個資源,A獲取了a,B獲取了B,A等待b,B等待a或則未釋放鎖再次獲取);

    產生死鎖的四個條件:互斥條件;不剝奪條件;請求和保持條件;循環等待條件

 1 import threading
 2 from threading import Lock
 3 total=1
 4 lock=Lock()
 5 def add():
 6     global total
 7     for i in range(1000000):
 8         #獲取鎖
 9         lock.acquire()
10         total+=1
11         #釋放鎖,釋放后其他才能獲取
12         lock.release()
13 
14 def decs():
15     global total
16     for i in range(1000000):
17         lock.acquire()
18         total-=1
19         lock.release()
20 thread1=threading.Thread(target=add)
21 thread2=threading.Thread(target=decs)
22 thread1.start()
23 thread2.start()
24 thread1.join()
25 thread2.join()
26 print(total)
View Code

 

  2.RLock(可重入的鎖):在一個線程中可以,可以連續多次acquire(獲取資源),一定要注意acquire的次數要和release的次數一致

 

   3.Condition:條件變量(用於復雜的線程間同步)

    3.1使用鎖進行先后對話:發現先啟動的線程把話先說完(第一個線程啟動后運行完,第二個線程還沒有啟動,或者還未切換到另一個線程)

View Code

    3.2通過condition實現:

     (通過調用with方法(實際是__enter__魔法函數),也可以使用acquire()方法【如self.conf.acquire()】,但記得一定要release()之后才能調用其他函數【wait(),notify()】,還有注意線程啟動順序【先接收方先啟動,否則接收不到】),Condition有兩層鎖,一把地層鎖,會在線程調用了wait()方法時釋放,上面的鎖會在每次調用wait()時分配一把鎖並放入到condition的等待隊列中,等待notify()方法的喚醒

      Condition重要函數:acquire(),release()【都調用了Lock的acquire,release】,wait()【允許等待某個通知在操作,會獲取一把鎖並把Condition中的鎖釋放掉】,notify()【發送通知,釋放鎖】

 1 import threading
 2 
 3 
 4 
 5 class LYQ1(threading.Thread):
 6     def __init__(self, cond):
 7         super().__init__(name="LYQ1")
 8         self.cond = cond
 9 
10     def run(self):
11         with self.cond:
12             print('LYQ1:你好,我是{}'.format(self.name))
13             #輸出后發出通知
14             self.cond.notify()
15             self.cond.wait()
16             print('LYQ1:哈哈')
17             self.cond.notify()
18             self.cond.wait()
19             print('LYQ1:嘿嘿')
20             self.cond.notify()
21 
22 
23 class LYQ2(threading.Thread):
24     def __init__(self, cond):
25         super().__init__(name="LYQ2")
26         self.cond = cond
27 
28     def run(self):
29         with self.cond:
30             #等待通知
31             self.cond.wait()
32             print('LYQ2:你好,我是{}'.format(self.name))
33             self.cond.notify()
34             self.cond.wait()
35             print('LYQ2:好的'.format(self.name))
36             self.cond.notify()
37             self.cond.wait()
38             print('LYQ2:好久不見')
39             self.cond.notify()
40 
41 
42 if __name__ == "__main__":
43     cond = threading.Condition()
44     lyq1 = LYQ1(cond=cond)
45     lyq2 = LYQ2(cond=cond)
46     #注意啟動順序,如果先啟動lyq1,發送通知確沒有接收(lyq2還沒有啟動)
47     lyq2.start()
48     lyq1.start()
49     '''
50     輸出:
51     LYQ1:你好,我是LYQ1
52     LYQ2:你好,我是LYQ2
53     LYQ1:哈哈
54     LYQ2:好的
55     LYQ1:嘿嘿
56     LYQ2:好久不見'''
View Code

   4.Semaphores:(有一個參數value可以控制線程(並發數),調用acquire方法value就會減一,如果減少到為0就會阻塞在那兒等待有空位,調用release()value就會加一)【線程數量過多會影響切換線程的效率】   

    Semaphores內部實質是用Condition完成的,Queue實質也是;

    用來控制進入數量的鎖(如文件寫一般只能一個線程,讀可以允許同時多個線程讀。

 1 # 控制線程的數量
 2 from threading import Semaphore
 3 import threading
 4 import time
 5 
 6 
 7 class UrlProducer(threading.Thread):
 8     def __init__(self, sem):
 9         super().__init__()
10         self.sem = sem
11 
12     def run(self):
13         for i in range(20):
14             #調用acquire方法,Semaphore中的value就會減一(value),如果為0就阻塞在這兒
15             self.sem.acquire()
16             html_get = HtmlGet('url' + str(i),sem)
17             html_get.start()
18 
19 
20 class HtmlGet(threading.Thread):
21     def __init__(self, url,sem):
22         super().__init__()
23         self.url = url
24         self.sem=sem
25 
26     def run(self):
27         time.sleep(2)
28         print('獲取網頁成功')
29         #調用release方法,Semaphore中的value就會加一(value)
30         self.sem.release()
31 
32 
33 if __name__ == '__main__':
34     #允許並發的個數
35     sem=Semaphore(3)
36     urlproducer = UrlProducer(sem)
37     urlproducer.start()
View Code

五.concurrent線程池編碼

  1.為什么需要線程池:

    提供了數量控制,獲取線程的狀態及返回值;當一個線程完成的時候主線程能立即知道;futures能讓多線程和多進程編碼接口一致

  2.ThreadPoolExecutor中重要函數:

    submit():通過submit函數提交執行的函數到線程池,立即返回值(不會阻塞);

    done():done()判斷某個任務是否執行成功;

    result():獲取返回值;

    cance():取消某個任務(還未執行,執行中不能取消);

    wait()讓主線程阻塞等待子線程完成,可以添加參數等待多長時間就不等待了

  3.獲取已經完成的任務:

    as_completed() [from concurrent.futures import ThreadPoolExecutor,as_completed];

    map(屬於ThreadPoolExecutor)  

  

 1 #as_completed會將成功的url的返回值yield出去
 2 from concurrent.futures import ThreadPoolExecutor,as_completed,wait
 3 import time
 4 def get_html(times):
 5     time.sleep(times)
 6     print('get success {}'.format(times))
 7     return times
 8 
 9 excutor=ThreadPoolExecutor(max_workers=2)
10 # #通過submit函數提交執行的函數到線程池,立即返回值(不會阻塞)
11 # ret1=excutor.submit(get_html,(3))
12 # #done()判斷函數是否執行成功
13 # print(ret1.done())
14 # print(ret1.cancel())
15 # ret2=excutor.submit(get_html,(2))
16 # print(ret2.done())
17 # #result()獲取返回值
18 # print(ret1.result())
19 urls=[2,3,4]
20 #獲取已經完成的任務
21 all_task=[excutor.submit(get_html,(url)) for url in urls]
22 #wait(讓主線程阻塞等待子線程完成),可以
23 wait(all_task,return_when='FIRST_COMPLETED')
24 print('haha')
25 # for future in as_completed(all_task):
26 #     data=future.result()
27 #     print(data)
28 # for data in excutor.map(get_html,urls):
29 #     print(data)
View Code

   4:ThreadPoolExecutor源碼簡介:

    submit方法會加一把鎖,創建Future,然后放入WorkItem(執行單元),將執行單元放入執行隊列中,_adjust_thread_count()會比較啟動得線程數量和最大線程數量,如果小於就會啟動一個線程放入_threads里面,_threads里面執行的是_worker()函數,參數為work_que,獲取WorkItem,然后調用里面的run()方法,將值設置到future中去。

Future中的方法,cancel(),done()等等都是判斷狀態,result()是會阻塞的,調用Condition()

六.多進程編程-multiprocessing

  1.和多線程對比:

    ·1.1多進程開銷大,多線程開銷小;

     1.2耗CPU的操作,多進程編程比多線程編程好很多,對於IO操作來說,使用多線程操作比多進程好(線程切換比進程切換性能高)

  2.例:

    1.1對於耗CPU的操作(多進程優於多線程):

 1 from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor,as_completed
 2 import time
 3 
 4 def fib(n):
 5     if n<=2:
 6         return 1
 7     return fib(n-2)+fib(n-1)
 8 if __name__=='__main__':
 9     #代碼要放在這里面,不然可能拋異常
10     with ThreadPoolExecutor(3) as excutor:
11         start_time=time.time()
12         all_task=[excutor.submit(fib,num) for num in range(25,40)]
13         for future in as_completed(all_task):
14             data=future.result()
15             print('結果:'+str(data))
16         print('多線程所需時間:'+str(time.time()-start_time))
17     '''
18     多線程所需時間:72.10901117324829
19     '''
20 
21     with ProcessPoolExecutor(3) as excutor:
22         start_time=time.time()
23         all_task=[excutor.submit(fib,num) for num in range(25,40)]
24         for future in as_completed(all_task):
25             data=future.result()
26             print('結果:'+str(data))
27         print('多進程所需時間:'+str(time.time()-start_time))
28     '''
29     多進程所需時間:43.14996862411499
30     '''
View Code

 

    1.2對於IO操作,多線程由於多進程:    

 1 from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor,as_completed
 2 import time
 3 
 4 def random_sleep(n):
 5     time.sleep(n)
 6     return n
 7 if __name__=='__main__':
 8     #代碼要放在這里面,不然可能拋異常
 9     with ThreadPoolExecutor(3) as excutor:
10         start_time=time.time()
11         all_task=[excutor.submit(random_sleep,num) for num in [2]*30]
12         for future in as_completed(all_task):
13             data=future.result()
14             print('休息:'+str(data)+'')
15         print('多線程所需時間:'+str(time.time()-start_time))
16     '''
17     多線程所需時間:20.010841131210327
18     '''
19 
20     with ProcessPoolExecutor(3) as excutor:
21         start_time=time.time()
22         all_task=[excutor.submit(random_sleep,num) for num in [2]*30]
23         for future in as_completed(all_task):
24             data=future.result()
25             print('休息:'+str(data)+'')
26         print('多進程所需時間:'+str(time.time()-start_time))
27     '''
28     20.755817651748657
29     '''
View Code

  3.進程有趣的例子:

 1 import os
 2 import time
 3 #fork只能用於Linux/Unix
 4 #進程間數據是隔離的,每個進程都有完整的數據,fork()會將父進程代碼復制一遍在運行(fork后的代碼)
 5 pid=os.fork()
 6 print('LYQ')
 7 if pid==0:
 8     print('子進程:{},父進程:{}'.format(os.getpid(),os.getppid()))
 9 else:
10     print('我是父進程:{}'.format(pid))
11     #暫停兩秒,父進程還沒有退出,子進程可以運行完,父進程退出就可以kill掉子進程
12     #不暫停的話,父進程退出,子進程仍然在運行,輸出
13     time.sleep(2)
14 '''
15 暫停的輸出:
16 LYQ
17 我是父進程:8587
18 LYQ
19 子進程:8587,父進程:8578
20 
21 不暫停的輸出:
22 LYQ
23 我是父進程:8587
24 [root@izwz97n253zzwjtudbqt5uz ~]# LYQ
25 子進程:8587,父進程:1
26 '''
View Code

   4.multiprocessing:(比ProcessPoolExecutor更底層【基於multiprocessing實現】,推薦ProcessPoolExecutor更好的設計,和ThreadPoolExecutor相似):

    

 1 import time
 2 import multiprocessing
 3 def get_html(n):
 4     time.sleep(n)
 5     print('sub_process success')
 6     return n
 7 
 8 if __name__=='__main__':
 9     process=multiprocessing.Process(target=get_html,args=(1,))
10     #獲取進程號,沒有start之前為None
11     print(process.pid)
12     process.start()
13     print(process.pid)
14     process.join()
15     print('main_process  success')
View Code

 

  5進程池:

 1 ......
 2 pool=multiprocessing.Pool(3)
 3     #異步提交任務
 4    #  result=pool.apply_async(get_html,args=(2,))
 5    # #關閉不在進入進程池
 6    #  pool.close()
 7    #  pool.join()
 8    #  print(result.get())
 9    #和執行順序一樣
10    for result in pool.imap(get_html,[1,5,3]):
11        print('{} sleep success'.format(result))
12    #和先后完成順序一樣
13    for result in pool.imap_unordered(get_html, [1, 5, 3]):
14        print('{} sleep success'.format(result))
View Code

 

七.進程間通信

  1.共享全局變量在多進程中不適用(會把數據復制到子進程中,數據是獨立的,修改也不會影響),quue中的Queue也不行,需要做一些處理

 1 from multiprocessing import Queue,Process
 2 import time
 3 def producer(queue):
 4      queue.put('a')
 5      time.sleep(2)
 6 
 7 def consumer(queue):
 8     time.sleep(2)
 9     data=queue.get()
10     print(data)
11 
12 if __name__=='__main__':
13     queue=Queue(10)
14     pro_producer=Process(target=producer,args=(queue,))
15     pro_consumer=Process(target=consumer,args=(queue,))
16     pro_producer.start()
17     pro_consumer.start()
18     pro_producer.join()
19     pro_consumer.join()
View Code

 

  2.multiprocessing中的Queue不能用於進程池(需要用到manager):

   queue=Manager().Queue(10)
    
1 from queue import Queue——>用於多線程
2 from multiprocessing import Queue——>用於非進程池的多進程通信
3 from multiprocessing import Manager——>manager.Queue()用於進程池通信
View Code

 

   3.通過Pipe進行進程間通信(管道),pipe只能適用於兩個進程 ,Pipe性能高於queue

 1 from multiprocessing import Pipe
 2 import time
 3 def producer(pipe):
 4      pipe.send('a')
 5      time.sleep(2)
 6 
 7 def consumer(pipe):
 8     time.sleep(2)
 9     data=pipe.recv()
10     print(data)
11 
12 if __name__=='__main__':
13     #通過Pipe進行進程間通信(管道),pipe只能適用於兩個進程
14     recv_pipe,send_pipe=Pipe()
15     queue=Manager().Queue(10)
16     pro_producer=Process(target=producer,args=(send_pipe,))
17     pro_consumer=Process(target=consumer,args=(recv_pipe,))
18     pro_producer.start()
19     pro_consumer.start()
20     pro_producer.join()
21     pro_consumer.join()
View Code

 

  4.進程間共享內存(Manager): 

 1 from multiprocessing import Manager,Process
 2 
 3 def add_data(pro_dict, key, value):
 4     pro_dict[key] = value
 5 
 6 if __name__=='__main__':
 7     #常用的類型都有
 8 
 9     process_dict=Manager().dict()
10     fir=Process(target=add_data,args=(process_dict,'name1','LYQ1'))
11     sed = Process(target=add_data, args=(process_dict, 'name2', 'LYQ2'))
12     fir.start()
13     sed.start()
14     fir.join()
15     sed.join()
16     print(process_dict)
View Code

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM