PythonI/O進階學習筆記_10.python的多線程


 content:
1. python的GIL
2. 多線程編程簡單示例
3. 線程間的通信
4. 線程池
5. threadpool Future 源碼分析
 
===========================
 
一. python的GIL
關於python的GIL,有一位博主寫的我覺得真的挺好的,清晰明了通俗易懂。 http://cenalulu.github.io/python/gil-in-python/
在這里就不贅述了,但是注意文章中的試驗結論是基於python2的。python3中已經有所改進所以文中示例未必能得出當時相同的結論。
但是依舊對我們理解GIL很有幫助。
 
那總結下來什么是GIL呢?
global interpreter lock
python前期為了在多線程編程的時候更為簡單,於是誕生了GIL。GIL使得同一時刻只有一個線程在一個cpu上執行字節碼,無法將多個線程映射到多個cpu上。也就使得python無法實現真正意義上的多線程。
 
那是不是有了GIL是不是就絕對安全了?我們編碼的時候就不需要考慮線程安全了?
並不是,GIL釋放的時間可能那時候進程並沒有執行完成。
GIL會在適當的時候釋放,比如在字節碼某特定行數以及特定時間片被釋放,也會在遇到io操作的時候主動釋放。
 
二. 多線程編程簡單示例
想要實現開啟線程執行任務,有兩種方法,直接用Thread進行實例化,或者自己實現繼承Thread的子類。
 
1.通過therad類實例化
這種情況適用於代碼量比較少,邏輯比較簡單的時候
import time
import  threading
def get_detail_html(url):
    print("get detail html start")
    time.sleep(2)
    print("get detail html stop")
 
def get_detail_url(url):
    print("url start")
    time.sleep(2)
    print("url end")
 
if __name__=="__main__":
    thread1= threading.Thread(target=get_detail_html,args=("",))
    thread2= threading.Thread(target=get_detail_url,args=("",))
    start_time=time.time()
    
    # thread1.setDaemon()
    # thread2.setDaemon()
 
    thread1.start()
    thread2.start()
 
    thread1.join()
    thread2.join()
 
    print("lasttime :{}".format(time.time()-start_time))
    pass

 

2.通過繼承Thread來實現多線程(繼承Thread,完成自己的thread子類)
按這種情況來寫的話,我們就需要重載我們的run方法。(注意是run方法而不是start)
import time
import  threading
 
class thread_get_detail_html(threading.Thread):
    def run(self):
        print("get detail html start")
        time.sleep(2)
        print("get detail html stop")
 
class thread_get_detail_url(threading.Thread):
    def run(self):
        print("url start")
        time.sleep(2)
        print("url end")
 
if __name__=="__main__":
    # thread1= threading.Thread(target=get_detail_html,args=("",))
    # thread2= threading.Thread(target=get_detail_url,args=("",))
    thread1=thread_get_detail_html()
    thread2=thread_get_detail_url()
    start_time=time.time()
 
    # thread1.setDaemon()
    # thread2.setDaemon()
 
    thread1.start()
    thread2.start()
 
    thread1.join()
    thread2.join()
 
    print("lasttime :{}".format(time.time()-start_time))
    pass

以上就能發現,啟動了兩個線程分別執行了thread_get_detail_url和thread_get_detail_url。

 
三. 線程間的通信
實際上在二中,是在模擬一個簡易爬蟲的流程。先獲取所有我們要爬取的url,然后再對每個url的html頁面內容進行獲取。那么這就涉及到一個問題了,thread_get_detail_url和thread_get_detail_html之間,需要thread_get_detail_url來的帶一個url列表,而thread_get_detail_html也能獲取這個url列表去進行操作。
這就涉及到線程間的通信了。
python中常用的線程間的這種需求的通信方式有:

- 全局變量
- Queue消息隊列
假設我們現在繼續來完成這個爬蟲的正常邏輯。

1. 線程間的變量傳遞

1.1 全局變量
import time
import  threading
 
detail_url_list=[]
 
def get_detail_html():
    global detail_url_list
    if len(detail_url_list)==0:
        return
    url=detail_url_list.pop()
    print("get detail html start :{}".format(url))
    time.sleep(2)
    print("get detail html stop :{}".format(url))
 
def get_detail_url():
    global  detail_url_list
    print("url start")
    for i in range(20):
        detail_url_list.append("htttp://www.baidu.com/{id}".format(id=i))
    time.sleep(2)
    print("url end")
 
if __name__=="__main__":
    start_time=time.time()
    thread1= threading.Thread(target=get_detail_url)
    thread1.start()
    for i in range(10):
        thread_2=threading.Thread(target=get_detail_html)
        thread_2.start()
    print("lasttime :{}".format(time.time()-start_time))
    pass

實際上,還可以更方便。將變量作為參數傳遞,在方法中就不需要global了。

import time
import  threading
 
detail_url_list=[]
def get_detail_html(detail_url_list):
    if len(detail_url_list)==0:
        return
    url=detail_url_list.pop()
    print("get detail html start :{}".format(url))
    time.sleep(2)
    print("get detail html stop :{}".format(url))
 
def get_detail_url(detail_url_list):
    print("url start")
    for i in range(20):
        detail_url_list.append("htttp://www.baidu.com/{id}".format(id=i))
    time.sleep(2)
    print("url end")
 
if __name__=="__main__":
    start_time=time.time()
    thread1= threading.Thread(target=get_detail_url,args=(detail_url_list,))
    thread1.start()
    for i in range(10):
        thread_2=threading.Thread(target=get_detail_html,args=(detail_url_list,))
        thread_2.start()
 
    print("lasttime :{}".format(time.time()-start_time))
    pass
但是這樣是不能應用於多進程的。
還可以生成一個variables.py文件,直接import這個文件,這種情況變量過多的時候,這種方法比較方便。
但是如果我們直接import變量名,是不能看到其他進程對這個變量的修改的。
 
但是以上的方法都是線程不安全的操作。想要達到我們要的效果,就必須要加鎖。所以除非對鎖足夠了解,知道自己在干嘛,否則並不推薦這種共享變量的方法來進行通信。
 
1.2 queue消息隊列
 
a.queue實現上述
import time
import  threading
from queue import  Queue
 
def get_detail_html(queue):
    url=queue.get()
    print("get detail html start :{}".format(url))
    time.sleep(2)
    print("get detail html stop :{}".format(url))
 
def get_detail_url(queue):
    print("url start")
    for i in range(20):
        queue.put("htttp://www.baidu.com/{id}".format(id=i))
    time.sleep(2)
    print("url end")
 
if __name__=="__main__":
    start_time=time.time()
    url_queue=Queue()
    thread1= threading.Thread(target=get_detail_url,args=(url_queue,))
    thread1.start()
    for i in range(10):
        thread_2=threading.Thread(target=get_detail_html,args=(url_queue,))
        thread_2.start()

 

b.queue是如何實現線程安全的?
我們並不推薦1.1中直接用全局變量的方法,是因為需要我們自己花精力去維護其中的鎖操作才能實現線程安全。而python的Queue是在內部幫我們實現了線程安全的。
queue使用了deque deque是在字節碼的程度上就實現了線程安全的
 
c.queue的其他方法
get_nowait(立即取出一個元素,不等待)(異步)
put_nowait(立即放入一個元素,不等待)(異步)
join: 一直block住,從quque的角度阻塞住線程。調用task_done()函數退出。
 

2.線程間的同步問題

2.1 線程為什么需要同步?同步到底是個啥意思?
這是在多線程中,必須要面對的問題。
例子:我們有個共享變量total,一個方法對total進行加法,一個方法對加完之后的total進行減法。
如果循環對total進行加減的次數比較大的時候,就會比較明顯的發現,每次運行的時候,得到的taotal可能是不一樣的。
import threading
 
total=0
def add():
    global total
    for i in range(100000000):
        total += 1
def desc():
    global  total
    for i in range(100000000):
        total = total - 1
if __name__=="__main__":
    add_total=threading.Thread(target=add)
    desc_total=threading.Thread(target=desc)
    add_total.start()
    desc_total.start()
    add_total.join()
    desc_total.join()
 
    print(total)
為什么不會像我們希望的最后的total為0呢?
從字節碼的角度上看,我們看看簡化后的add和desc的字節碼。
#input 
def add1(a):
    a += 1
 
def desc1(a):
    a -= 1
 
import  dis
print(dis.dis(add1))
print(dis.dis(desc1))
 
#output
22           0 LOAD_FAST                0 (a)
              2 LOAD_CONST               1 (1)
              4 INPLACE_ADD
              6 STORE_FAST               0 (a)
              8 LOAD_CONST               0 (None)
             10 RETURN_VALUE
None
25           0 LOAD_FAST                0 (a)
              2 LOAD_CONST               1 (1)
              4 INPLACE_SUBTRACT
              6 STORE_FAST               0 (a)
              8 LOAD_CONST               0 (None)
             10 RETURN_VALUE
None
從字節碼來看流程為:#1.load a #2.load 1 #3.add  #4.賦值給a
任何一步字節碼都是有可能被切換出去另外一個線程的字節碼去操作a,可能在1線程運行到4字節碼(a和1相加)的時候,開始運行2線程的6字節碼(賦值給a)。
類似的有銀行存取錢、商品庫存等也會有這個問題。
 
2.2 線程如何同步?
用鎖將這段代碼段鎖住,鎖住時,不進行切換。直接運行完這段代碼段。
 
a.Lock和Rlock
threading中有提供lock。
import threading
from threading import  Lock
total=0
lock=Lock()
def add():
    global total
    global lock
    for i in range(1000000):
        lock.acquire()
        total += 1
        lock.release()
def desc():
    global  total
    global lock
    for i in range(1000000):
        lock.acquire()
        total -= 1
        lock.release()
 
if __name__=="__main__":
    add_total=threading.Thread(target=add)
    desc_total=threading.Thread(target=desc)
    add_total.start()
    desc_total.start()
 
    add_total.join()
    desc_total.join()
    print(total)
    pass
注意acquire和release成對存在。運行的時候會發現比不加鎖的時候慢比較多。所以其實鎖的問題也很明顯:鎖會影響性能,鎖會引起死鎖。死鎖里有個非常常見的問題資源競爭是很容易發生的。
那能不能我鎖里套着鎖呢?Lock方法是不可以的,但是threading提供了Rlock可重入鎖。
Rlock在同一個線程里面,可以連續調用多次acquire,但是注意acquire和release也一定是要成對存在的。
from threading import  RLock
 
total=0
lock=RLock()
def add():
    global total
    global lock
    for i in range(1000000):
        lock.acquire()
        lock.acquire()
        total += 1
        lock.release()
        lock.release()
 
3.condition使用以及源碼分析
condition是條件變量,用於復雜的線程間同步。
 
3.1 condition的使用
例子:現有一個需求,要求 天貓精靈和小愛一人一句進行對話。如果我們現用lock來實現是沒辦法做到這邊說完一句,那邊就說一句的。所以有了condition。
在這個例子中,需要用到condition的兩個重要方法 notify()和wait()。notify()用於通知這邊動作完成,wait()用於阻塞住等待消息。
#input
import  threading
 
class XiaoAi(threading.Thread):
    def __init__(self,cond):
        self.cond=cond
        super().__init__(name="小愛")
    def run(self):
        with self.cond:
            print("小愛: 天貓在嗎 我是小愛")
            self.cond.notify()  #小愛print完了,信號發送
            self.cond.wait()  #小愛等待接受信號
            print("小愛: 我們來背詩吧")
            self.cond.notify()
class TianMao(threading.Thread):
    def __init__(self,cond):
        self.cond=cond
        super().__init__(name="天貓")
 
    def run(self):
        with self.cond:
            self.cond.wait()
            print("天貓: 在 我是天貓")
            self.cond.notify()
            self.cond.wait()
            print("天貓: 好啊")
            self.cond.notify()
 
if __name__=="__main__":
    condition=threading.Condition()
    xiaoai=XiaoAi(condition)
    tianmao=TianMao(condition)
 
    tianmao.start()
    xiaoai.start()
 
#output:
小愛: 天貓在嗎 我是小愛
天貓: 在 我是天貓
小愛: 我們來背詩吧
天貓: 好啊
ps:需要注意的是
  • condition必須先with 再調用 notify和wait方法
  • 這么寫的時候,線程的start()順序很重要
 
3.2 Condition源碼分析
condition其實是有兩層鎖的。一把底層鎖,會在線程調用了wait()的時候釋放。
上層鎖會在wait()的時候放入雙端隊列中,在調用notify()的時候被喚醒。
 
a.condition=threading.Condition()
condition初始化的時候申請了一把鎖
 
b.self.cond.wait()
先釋放了condition初始化的時候申請的底層鎖,然后又申請了鎖放入雙端隊列。
 
c. self.cond.notify()
 
4.信號量 semaphore
是可以用來控制線程執行數量的鎖。
 
4.1 semaphore的使用
需求:現在有個文件,對文件可以進行讀和寫,但是寫是互斥的,讀是共享的。並且對讀的共享數也是有控制的。
例:爬蟲。控制爬蟲的並發數。
import threading
import time
 
class HtmlSpider(threading.Thread):
    def __init__(self,url,sem):
        super().__init__()
        self.url=url
        self.sem=sem
 
    def run(self):
        time.sleep(2)
        print("got html text success")
        self.sem.release()
 
class UrlProducer(threading.Thread):
    def __init__(self,sem):
        super().__init__()
        self.sem=sem
    def run(self):
        for i in range(20):
            self.sem.acquire()
            html_test=HtmlSpider("www.baidu.com/{}".format(i),self.sem)
            html_test.start()
 
if __name__=="__main__":
    sem=threading.Semaphore(3)  #設置控制的數量為3
 
    urlproducer=UrlProducer(sem)
    urlproducer.start()

ps:

  • 每acquire一次,數量就會被減少一,release的時候數量會自動回來。
  • 需要注意sem釋放的地方,應該是在HtmlSpider運行完之后進行釋放。
 
4.2 semaphore源碼 
實際上semaphore就是對condition的簡單應用。
 
a.sem=threading.Semaphore(3)
實際上就是在初始化的時候,調用了Condition。
 
b.self.sem.acquire()
我們簡單看這個邏輯就是,如果設置的數用完了,就讓condition進入wait狀態,否則就把數量減一。
 
c.self.sem.release()
release 也是很簡單的數量加一和condition的notify。
 
 
5.除了上述的對Condition的應用,queue模塊中的Queue也對Condition做了更為復雜的應用。特別是queue中的put。
class Queue:
    def __init__(self, maxsize=0):
        self.maxsize = maxsize
        self._init(maxsize)
        。。。
        self.mutex = threading.Lock()
        self.not_empty = threading.Condition(self.mutex)
        self.not_full = threading.Condition(self.mutex)
 
        self.all_tasks_done = threading.Condition(self.mutex)
        self.unfinished_tasks = 0
 
    def put(self, item, block=True, timeout=None):
       
        with self.not_full:
            if self.maxsize > 0:
                if not block:
                    if self._qsize() >= self.maxsize:
                        raise Full
                elif timeout is None:
                    while self._qsize() >= self.maxsize:
                        self.not_full.wait()
                elif timeout < 0:
                    raise ValueError("'timeout' must be a non-negative number")
                else:
                    endtime = time() + timeout
                    while self._qsize() >= self.maxsize:
                        remaining = endtime - time()
                        if remaining <= 0.0:
                            raise Full
                        self.not_full.wait(remaining)
            self._put(item)
            self.unfinished_tasks += 1
            self.not_empty.notify()
    。。。。。。

 

四. 線程池
在前面進行線程間通信的時候,想要多個線程進行並發的時候,需要我們自己去維護鎖。
但是我們現在希望有工具來幫我們對想要線程並發數進行管理。於是有了線程池。
那么為什么明明有了信號量 semaphore 還需要線程池呢?
因為線程池不只是控制了線程數量而已。
 
比如說,現在有需求,在主進程中,我們需要得到某個線程的狀態。
並且線程的狀態不管是退出還是什么,主進程能立刻知道。
futures讓多線程和多進程的接口一致。
 
1.使用線程池
concurrent.futures中有兩個類ThreadPoolExecutor和 ProcessPoolExecutor 分別用於線程池和進程池的創建,基類是futures的Executor類。
使用ThreadPoolExecutor只需要將要執行的函數和要並發的線程數交給它就可以了。
使用線程池來執行線程任務的步驟如下:
a.調用 ThreadPoolExecutor 類的構造器創建一個線程池。
b.定義一個普通函數作為線程任務。
c.調用 ThreadPoolExecutor 對象的 submit() 方法來提交線程任務。submit返回的是Future類(重要)
d. 當不想提交任何任務時,調用 ThreadPoolExecutor 對象的 shutdown() 方法來關閉線程池。
#例:將之前爬蟲模擬的腳本改為線程池用。

from concurrent.futures import  ThreadPoolExecutor
import time

def get_html(times):
    time.sleep(times)
    print("get html page {} successed!".format(times))
    return times

excutor=ThreadPoolExecutor(max_workers=2)
#submit 提交到線程池
#submit的返回很重要,返回的對象Future類可以判斷這個函數的執行狀態等
#submit 是非阻塞的
task1=excutor.submit(get_html,(3))
task2=excutor.submit(get_html,(2))

print(task1.done())  
print(task2.done())

#result 是阻塞的,接受函數的返回
print(task1.result())
print(task2.result())

#output:
False
False
get html page 2 successed!
get html page 3 successed!
3
2
為什么done()輸出的是false呢。因為submit的返回是非阻塞的,沒有等task執行完就返回了task done的狀態。
如果隔幾秒輸出done()的返回又是true了。
 
Future對象常用的其他方法:cancel()
取消該 Future 代表的線程任務。如果該任務正在執行,不可取消,則該方法返回 False;否則,程序會取消該任務,並返回 True。
print(task1.done())
print(task1.cancel())
print(task2.done())

#result 是阻塞的,接受函數的返回
print(task1.result())

print(task2.result())

output:
False
False
False
get html page 2 successed!
get html page 3 successed!
3
2

cancelled():返回 Future 代表的線程任務是否被成功取消。

 
2.獲取所有完成的future的狀態和值as_completed和map
 
2.1 .as_complete
from concurrent.futures import  ThreadPoolExecutor,as_completed
import time
 
def get_html(uid):
    time.sleep(uid)
    url="www.test.com/{}".format(uid)
    print("get url successed: \" {} \"".format(url))
    return uid
excutor=ThreadPoolExecutor(max_workers=2)
uids=[5,2,3]
future_list=[ excutor.submit(get_html,(uid)) for uid in uids]
for future in as_completed(future_list):
    print(future.result())
 
#output:
get url successed: " www.test.com/2 "
2
get url successed: " www.test.com/5 "
5
get url successed: " www.test.com/3 "
3

as_completed():yield 所有完成的futures的所有返回。

那么as_complete是如何做到收集所有完成的異步方法的狀態的呢?

先把所有已經是finish狀態的future返回,
再一直while pending,等待timeout范圍內的future變成finish,把finish的future yield出來。
from concurrent.futures import  ThreadPoolExecutor,as_completed
import time
 
def get_html(uid):
    time.sleep(uid)
    url="www.test.com/{}".format(uid)
    print("get url successed: \" {} \"".format(url))
    return uid
excutor=ThreadPoolExecutor(max_workers=2)
uids=[5,2,3]
future_list=[ excutor.submit(get_html,(uid)) for uid in uids]
for future in as_completed(future_list):
    print(future.result())
 
#output:
get url successed: " www.test.com/2 "
2
get url successed: " www.test.com/5 "
5
get url successed: " www.test.com/3 "
3
 
2.2  通過excutor的map方法 獲取已經完成的future
excutor的map,和map是差不多的,傳遞函數和參數列表,就會多多線程運行參數列表數的線程。
與a中不一樣的是,map返回的順序和url中的順序是一樣的,而a的as_completed是誰先finishi誰就先被yield出來。
而且map返回的就是result,而as_completed返回的是Future。
from concurrent.futures import  ThreadPoolExecutor,as_completed,wait
import time
def get_html(uid):
    time.sleep(uid)
    url="www.test.com/{}".format(uid)
    print("get url successed: \" {} \"".format(url))
    return uid
 
excutor=ThreadPoolExecutor(max_workers=2)
uids=[5,2,3]
 
result_list=excutor.map(get_html,uids)
for result in result_list:
    print(result)
 
#output:
get url successed: " www.test.com/2 "
get url successed: " www.test.com/5 "
5
2
get url successed: " www.test.com/3 "
3

 

3.wait()方法
wait方法用於阻塞,指定某一個task或者一些task執行完成再往下執行。
def wait(fs, timeout=None, return_when=ALL_COMPLETED)
例:如果我想在全部task執行完之后打印"task end"字符串
from concurrent.futures import  ThreadPoolExecutor,as_completed,wait
import time
def get_html(uid):
    time.sleep(uid)
    url="www.test.com/{}".format(uid)
    print("get url successed: \" {} \"".format(url))
    return uid
excutor=ThreadPoolExecutor(max_workers=2)
uids=[5,2,3]
future_list=[ excutor.submit(get_html,(uid)) for uid in uids]
print("task end")
 
#output:
task end
get url successed: " www.test.com/2 "
get url successed: " www.test.com/5 "
get url successed: " www.test.com/3 "

#還沒有執行完,就輸出了 task end。需要加上: wait(future_list)
print("task end") #output: get url successed: " www.test.com/2 " get url successed: " www.test.com/5 " get url successed: " www.test.com/3 " task end
wait還可以指定在什么時候執行完后返回。
 
五. threadpool Future 源碼分析
這段源碼我是真的很想認真通俗淺顯的去分析的。因為在python的多進程和多線程中這個Future的概念是十分常見和重要的。
但是我發現我解釋起來太蒼白了,還沒有直接去看源碼來的通俗易懂。就在這放一小段入口的我自己的理解吧。
看完了上面幾段筆記,肯定會有這些疑惑:
- submit返回的Future對象到底是啥?
    Future是用來表示task的對象的一個類,很多人稱為未來對象,就是這個任務未必現在執行完成了,但是未來是會執行完成的。
    得到了Future對象,能通過其中的屬性和方法得到task的狀態,是否執行完成等。
    在python的多線程、多進程中,很多地方用到了Future概念。
    具體屬性可以去看Class Future中的屬性和方法。
 
- 那么Future這個對象是怎么設計的呢?Future怎么知道task的狀態改變的呢?
在之前的例子里,我們用ThreadPoolExecutor的submit提交所有的task,返回了Future對象。
那么submit對Future對象的哪些屬性進行了哪些處理然后返回,才能讓我們得到它的result的呢?
submit的源碼:最主要的邏輯是注釋了的那幾句。
def submit(self, fn, *args, **kwargs):
    with self._shutdown_lock:
        if self._broken:
            raise BrokenThreadPool(self._broken)

        if self._shutdown:
            raise RuntimeError('cannot schedule new futures after shutdown')
        if _shutdown:
            raise RuntimeError('cannot schedule new futures after'
                               'interpreter shutdown')

        f = _base.Future()    #初始化一個future對象f
        w = _WorkItem(f, fn, args, kwargs) #實際上是這個_WorkItem把(future對象,執行函數,函數需要的參數)放進去的,並且完成函數的執行,並且設置future的result

        self._work_queue.put(w)       #將w這個task放入 _work_queue隊列,會在下面這個方法中,被起的Thread進行調用。
        self._adjust_thread_count()   #調整線程數量,並且初始化線程,開啟線程。Thread方法的參數是self._work_queue。起來的線程中執行的task是上兩步生成的w隊列。
        return f

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM