一、說明
多線程這個東西,感覺一直以來都是用一次就要學一次,今天需要將之前寫的腳本改成線程池的形式又學習了一輪。為了以后方便在這直接記下來。
二、多線程實現
2.1 多線程的基本實現

import threading import time import datetime # 該類是自定義的多線程類 # 多己寫多線程時仿造記類實現自己的多線程類即可 class MyThread(threading.Thread): def __init__(self): threading.Thread.__init__(self) # 必須實現函數,run函數被start()函數調用 def run(self): thread_name = threading.current_thread().name print(f"開始線程: {thread_name}") self.print_time() print(f"退出線程: {thread_name}") # 可選函數,此處函數的代碼可移到run函數內部,也可放到MyThread之外,無關緊要 # 線程要做的具體事務函數,我們這里就打印兩輪時間 def print_time(self): count = 2 thread_name = threading.current_thread().name while count: time.sleep(1) print(f"{thread_name}: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S %f')}") count -= 1 # 該類是一個演於調用MyThread的類 # 其實其代碼也完全可以放在if __name__ == "__main__"處 class TestClass(): def __init__(self): pass def main_logic(self): # 創建新線程實例 thread_1 = MyThread() thread_2 = MyThread() # 啟動新線程 thread_1.start() thread_2.start() # thread_1.join()即當前線程(亦即主線程)把時間讓給thread_1,待thread_1運行完再回到當前線程 # thread_2.join()即當前線程(亦即主線程)把時間讓給thread_2,待thread_1運行完再回到當前線程 # join()方法非阻塞 # 如果沒對某個線程使用join()方法,那么當前線程(亦即主線程)不會等待該線程執行完再結束,他會直接結束 # 在多線程的進程中,主線程的地位和其他線程的地位是平等的,不會說主線程退出了就會導致整個進程,進而導致其他線程被迫終止 # 自己把這兩句join()注釋掉再運行一遍,可以更好理解這里的說法 thread_1.join() thread_2.join() print("退出主線程") if __name__ == "__main__": obj = TestClass() obj.main_logic()
運行結果如下:
2.2 多線程間的同步
上一小結的代碼運行可以成功,但在輸出的時候,可以看到是很混亂的:線程1剛打印完自己的名字還沒打印時間線程2就搶着打印自己的名字了。
造成這種結果的原因是,默認的多線程中,各線程之間是沒有感知的。比如線程2並不知道線程1執行到了哪里(反過來亦然),是剛打印完名字還是已經打印完名字和時間還是其他什么狀態,都是不知道的。
所謂同步最主要的就是使用某種方法讓線程之間能在一定程度上知道對方在做什么,至少的目標是在使用共用資源時能告訴對方我正在使用你先不要用,實現這“至少”目標最常用的方法是使用鎖。
Python3中鎖對應的類是threading.Lock(),可通過該類的acquire()方法來獲取鎖,然后通過該類的release()方法來釋放鎖。

import sys import threading import time import datetime # 該類是自定義的多線程類 # 多己寫多線程時仿造記類實現自己的多線程類即可 class MyThread(threading.Thread): def __init__(self,threading_lock): threading.Thread.__init__(self) # 同步添加處2/4:承接傳進來的鎖 self.threading_lock = threading_lock # 注意鎖必須定義在線程類外部,不能如下在線程類內部自己定義鎖 # 因為如果鎖在線程類內部才定義,每個線程都是不同的線程類實例,那么各線程間的鎖變量本質上就不是同一個鎖變量了 # self.threading_lock = threading.Lock() # 必須實現函數,run函數被start()函數調用 def run(self): thread_name = threading.current_thread().name print(f"開始線程: {thread_name}") self.print_time() print(f"退出線程: {thread_name}") # 可選函數,此處函數的代碼可移到run函數內部,也可放到MyThread之外,無關緊要 # 線程要做的具體事務函數,我們這里就打印兩輪時間 def print_time(self): count = 2 thread_name = threading.current_thread().name while count: time.sleep(1) # 同步添加處3/4:要操作共用資源(這里即打印)時獲取鎖 self.threading_lock.acquire() print(f"this is thread : {thread_name}") print(f"now time is : {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S %f')}") # 同步添加處4/4:操作完共用資源(這里即打印)后釋放鎖 self.threading_lock.release() count -= 1 # 該類是一個演於調用MyThread的類 # 其實其代碼也完全可以放在if __name__ == "__main__"處 class TestClass(): def __init__(self): pass def main_logic(self): # 同步添加處1/4:定義一個鎖對象 threading_lock = threading.Lock() # 創建新線程實例 thread_1 = MyThread(threading_lock) thread_2 = MyThread(threading_lock) # 啟動新線程 thread_1.start() thread_2.start() # thread_1.join()即當前線程(亦即主線程)把時間讓給thread_1,待thread_1運行完再回到當前線程 # thread_2.join()即當前線程(亦即主線程)把時間讓給thread_2,待thread_1運行完再回到當前線程 # join()方法非阻塞 # 如果沒對某個線程使用join()方法,那么當前線程(亦即主線程)不會等待該線程執行完再結束,他會直接結束 # 在多線程的進程中,主線程的地位和其他線程的地位是平等的,不會說主線程退出了就會導致整個進程,進而導致其他線程被迫終止 # 自己把這兩句join()注釋掉再運行一遍,可以更好理解這里的說法 thread_1.join() thread_2.join() print("退出主線程") if __name__ == "__main__": obj = TestClass() obj.main_logic()
使用鎖后輸出如下:
2.3 錯誤的鎖使用方式
雖然在上一小節的代碼注釋中有說明,但一是自己踩了坑然后想了半天二是感覺應該還是比較典型的容易犯的問題,所以單獨一節再強調一下。
代碼注釋如下:
總的意思就是線程同步用的鎖必須要來自線程類外部而不能是來自線程類內部的自己實例化。
舉個例子,比如我們有一個類叫Test其內部有一個變量a,Test類有兩個實例化對象test1和test2,那么我們知道test1.a和test2.a並不是一個變量,對test1.a的任何操作都不會影響test2.a。
所以自己print順序混亂,使用了鎖還是混亂時,就要注意是不是自己的鎖是在線程類內部實例化的(不要盯着網上什么print()不是線程安全的、要用sys.stdout.write()之類的說法想半天)。
三、線程池實現
假設我們有100個任務,我們打算分10輪進行,每輪創建10個線程去處理10個任務,這是一個可行的做法,但有些粗放。
每輪創建10個線程,那么就意味着完成這100個任務前后共有100個線程的創建及消毀的動作,而如果我們創建10個線程組成線程池那么就能反復復用這10個線程,完成100個任務的前后總共只有10個線程的創建及消毀的動作。(但要注意整個過程中並不一定是每個線程都處理10個任務了,有的線程可能處理多於10個,有的線程則可能處理少於10個)
減少線程創建及消毀過程中損失的計算資源正是線程池的意義所在。
3.1 由普通單線程類改造成使用線程池的類的方法
有時候我們會遇到這種情況:我們之前已經寫好了一個類,但該類是單線程的我們現在機改成多線程的形式。
我們下邊來看一下如何該動最少的代碼,將該類改成使用線程池的形式;同時有了第二大節的知識,也直接改成線程同步的形式。
假設原始的單線程的代碼形式如下:

import threading import time class TestClass(): def __init__(self): pass def main_logic(self): for i in range(4): self.do_something(i) pass def do_something(self, para): thread_name = threading.current_thread().name print(f"this is thread : {thread_name}") print(f"the parameter value is : {para}") time.sleep(1) pass if __name__ == "__main__": obj = TestClass() obj.main_logic()
執行結果如下:
改造后代碼如下:

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor # 線程池,進程池 import threading import time class TestClass(): def __init__(self): # 線程池+線程同步改造添加代碼處1/5: 定義鎖和線程池 # 我們第二大節中說的是鎖不能在線程類內部實例化,這個是調用類不是線程類,所以可以在這里實例化 self.threadLock = threading.Lock() # 定義2個線程的線程池 self.thread_pool = ThreadPoolExecutor(2) # 定義2個進程的進程池。進程池沒用寫在這里只想表示進程池的用法和線程池基本一樣 # self.process_pool = ProcessPoolExecutor(2) pass def main_logic(self): for i in range(4): # 線程池+線程同步改造添加代碼處3/5: 注釋掉原先直接調的do_something的形式,改成通過添加的中間函數調用的形式 # self.do_something(i) self.call_do_something(i) pass # 線程池+線程同步改造添加代碼處2/5: 添加一個通過線程池調用do_something的中間方法。參數與do_something一致 def call_do_something(self, para): self.thread_pool.submit(self.do_something, para) def do_something(self, para): thread_name = threading.current_thread().name # 線程池+線程同步改造添加代碼處4/5: 獲取鎖 self.threadLock.acquire() print(f"this is thread : {thread_name}") print(f"the parameter value is : {para}") # 線程池+線程同步改造添加代碼處5/5: 釋放鎖 self.threadLock.release() time.sleep(1) pass if __name__ == "__main__": obj = TestClass() obj.main_logic()
執行結果如下:
3.2 處理submit非阻塞導致的所有任務會一次性創建的問題
self.thread_pool.submit()是非阻塞的,提交任務后立即返回,這就導致call_do_something()也會立即返回,進而導致main_logic()中的for會一下執行完。
也就是說,還是假設我們有100個任務用10個線程去處理,那么這100個任務基本就是一下就創建完成。當然線程池自己會組織好這100個任務,慢慢地讓這10個線程去處理。
這種機制在100個任務下還沒什么大的影響,但如果我們有100萬個任務呢,100萬個任務同時創建,100萬個任務的信息堆在內存中,內存消耗就是很大的問題了。
所以當任務量很大、或者任務的參數很大時就要注意和處理submit非阻塞可能會導致的內存消耗問題。代碼在上一小節之上修改如下:

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor # 線程池,進程池 import threading import time class TestClass(): def __init__(self): # 線程池+線程同步改造添加代碼處1/5: 定義鎖和線程池 # 我們第二大節中說的是鎖不能在線程類內部實例化,這個是調用類不是線程類,所以可以在這里實例化 self.threadLock = threading.Lock() # 定義2個線程的線程池 self.thread_pool = ThreadPoolExecutor(2) # 定義2個進程的進程池。進程池沒用寫在這里只想表示進程池的用法和線程池基本一樣 # self.process_pool = ProcessPoolExecutor(2) pass def main_logic(self): for i in range(6): # 線程池+線程同步改造添加代碼處3/5: 注釋掉原先直接調的do_something的形式,改成通過添加的中間函數調用的形式 # self.do_something(i) self.call_do_something(i) pass # 線程池+線程同步改造添加代碼處2/5: 添加一個通過線程池調用do_something的中間方法。參數與do_something一致 def call_do_something(self, para): # 控制未完成任務數添加代碼處1/3:定義一個變量,記錄當前任務列表。規范寫法是在__init__中定義我為代碼信中放在這里 try: self.task_handler_list except: self.task_handler_list = [] # 控制未完成任務數添加代碼處2/3:提交任務時把任務句柄記錄到上邊定義的列表中 task_handler = self.thread_pool.submit(self.do_something, para) self.task_handler_list.append(task_handler) # 控制未完成任務數添加代碼處3/3:當未完成任務數不多於線程的2倍時才允許此任務返回 while True: # 我們可能聽說過使用as_completed()可以獲取執行完成的任務列表,但實際發現as_completed是阻塞的他要等待任務響應他的詢問 # 所以並不推薦使用以下形式來獲取未執行完成的任務列表 # task_handler_list = list(set(task_handler_list) - set(concurrent.futures.as_completed(task_handler_list))) # 將已完成的任務移出列表 for task_handler_tmp in self.task_handler_list: if task_handler_tmp.done(): self.task_handler_list.remove(task_handler_tmp) # 如果未完成的任務已多於線程數的兩倍那么先停一下,先不要再增加任務,因為幾萬個ip一把放到內存中是個很大的消耗 if len(self.task_handler_list) > 2 * 2: print("unfinished task is more than double thread_count, will be wait a seconds.") # 睡眠多久看自己需要,我這設2秒 time.sleep(2) else: return True def do_something(self, para): thread_name = threading.current_thread().name # 線程池+線程同步改造添加代碼處4/5: 獲取鎖 self.threadLock.acquire() print(f"this is thread : {thread_name}") print(f"the parameter value is : {para}") # 線程池+線程同步改造添加代碼處5/5: 釋放鎖 self.threadLock.release() time.sleep(1) pass if __name__ == "__main__": obj = TestClass() obj.main_logic()
運行結果如下,可以看到成功實現在當未完成任務過多時阻止繼續創建新的任務:
3.3 多線程不共享線程函數內的局部變量
多線程共用內存地址空間所以變量是共享的,即變量在一個線程中被修改那么該變量在其他線程中的值也會被修改。
但這句話需要進一步明確,比如在3.1節的代碼中do_something函數有thread_name這個變量,如果在線程1獲取thread_name之后打印thread_name之前,線程2也獲取了thread_name,那么線程1的thread_name值是否會變成線程2的名稱?
答案是不會,多線程雖然共用地址空間,但是不同線程啟動的函數如同在主線程對一個函數進行多次調用一樣,並不放在同一地址,所以線程函數內的局部變量也就不是共享的變量。除了thread_name,para是傳值過來的參數也是線程函數內的局部所以也不共享。
3.4 多線程異常不會被打印更不會導致進程結束(20200326更新)
今天在跑一份線程池代碼時發現進程沒有按預期結束,該代碼有一初始值為0的線程計數器,在線程函數的開頭線程計數器加1,在線程函數的開頭線程計數器減1,在最后線程計數器為0(即所有線程運行完畢)時退出。但該計數器一直不為0。
反復排查之下發現是多線程(線程池)場景中,當一個線程(更准確地應該說是任務)出現異常時,該線程會直接結束,異常也不會被拋出。測試代碼如下:

import ast import time import threading from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor # 線程池,進程池 class TestClass(): def __init__(self): # 線程池+線程同步改造添加代碼處1/5: 定義鎖和線程池 # 我們第二大節中說的是鎖不能在線程類內部實例化,這個是調用類不是線程類,所以可以在這里實例化 self.threadLock = threading.Lock() # 定義2個線程的線程池 self.thread_pool = ThreadPoolExecutor(2) # 定義2個進程的進程池。進程池沒用寫在這里只想表示進程池的用法和線程池基本一樣 # self.process_pool = ProcessPoolExecutor(2) pass def main_logic(self): paras =["{'a':'1'}",{'b','2'},"{'c':'3'}",{'d','4'}] for para in paras: # 線程池+線程同步改造添加代碼處3/5: 注釋掉原先直接調的do_something的形式,改成通過添加的中間函數調用的形式 # self.do_something(i) # 單線程模式 # self.do_something(para) # 多線程(線程池模式) self.call_do_something(para) pass # 線程池+線程同步改造添加代碼處2/5: 添加一個通過線程池調用do_something的中間方法。參數與do_something一致 def call_do_something(self, para): self.thread_pool.submit(self.do_something, para) def do_something(self, para): # thread_name = threading.current_thread().name # 這個的作用是把字符串變為json對象 json_obj = ast.literal_eval(para) # 多線程時,上邊這句異常之后並不會拋出異常,且在其后邊的本該此線程運行的代碼都不會被執行 print(f"{json_obj}") # 線程池+線程同步改造添加代碼處4/5: 獲取鎖 # self.threadLock.acquire() # print(f"this is thread : {thread_name}") # print(f"the parameter value is : {para}") # # 線程池+線程同步改造添加代碼處5/5: 釋放鎖 # self.threadLock.release() # time.sleep(1) pass if __name__ == "__main__": obj = TestClass() obj.main_logic()
當單線程模式時,即注釋self.call_do_something(para)啟用self.do_something(para)時,運行代碼可以看到異常拋出:
當多線程(線程池)模式時,即注釋self.do_something(para)啟用self.call_do_something(para)時,運行代碼可以看到並沒有異常,只是出現異常位置之后的代碼(這里是print)並不會被執行:
我們有理由相信一個庫在單線程模式下不自己處理而是直接拋出異常,那么在多線程之下他應當保持一樣的表現。所以多線程模式下異常雖然沒有被打印也沒有導致進程結束,但異常應當還是拋出了的,即也可以通過try來捕獲。測試結果如下,證實了我們這一猜想。
四、關於Python3並發/並行的一些技術細節討論(20200409更新)
4.1 並發和並行的區別
並發:看似多個任務同時進行,但具體到某一時刻,只有其中一個任務能在CPU運行的狀態。
並行:同一時刻,多個任務能同時在CPU運行的狀態。
說明:單核CPU只能實現並發不能實現並行的,只有多核CPU才可能實現並行。C語言的多線程是並行的,Python的多線程由於GIL的存在只能並發。
4.2 GIL是什么
Python解析器(CPython)不是線程安全的,即當一個線程訪問一個對象時另外的線程仍然可以訪問該對象,如果不加以限制那么就很容易出現數據錯誤(就如我們自己多線程不加鎖去操作同一個變量一樣),由此Python引入了GIL。
GIL,global interpreter lock,全局解析器鎖,只有獲取到了該鎖的線程才能在解析器中執行,顯然同一個時刻只可能有一個線程能獲取到鎖,所以同一個Python解析器中同一時刻只可能有一個線程能夠運行,所以Python多線程只能並發不能並行。
至於為什么C語言不需要類似GIL的東西Python需要GIL,問題似乎是出在Python是解析型、弱類型語言。即凡是解析型、弱類型語言,如Ruby等,現在應該都有這個問題。
4.3 GIL的影響有多大及Python的多線程是不是雞肋
GIL使得同一解析器中仍意時刻只能有一個線程得到運行,即意味着任意時刻一個Python進程只能使用一個CPU核,這完全不能發揮多核計算機的優勢。如果說什么是Python的最大問題,我覺得這就是Python最大的問題。
但我們要明確以下幾點:
第一,只有在多線程狀態下GIL才會產生影響。如果你寫的程序是單線程的,那么GIL對其運行速度沒有影響。
第二,只對計算密集型任務GIL才會產生影響。如果多線程程序中線程更多是在等待IO而不是等待CPU進行計算,那么GIL對其運行速度沒有多大影響。
所以GIL只對多線程且計算密集的程序有影響,但一般而言,除非是進行長時間不間斷的運算或編寫強調並行的服務器,不然我們一般編寫的腳本吃不完一個核的運算能力。
4.4 改進辦法是什么
但不管怎么說GIL使得多核計算機的優勢不能得到充分發揮是個實實在在的問題,你不能一句問題不大就一筆帶過了,萬一我就是要編寫計算密集的程序、就是要並行你到底行還是不行呢。
答案是想讓Python多線程實現並行現在看來是不太行的,但我們可以使用多進程。前邊我們說GIL限制時一直在強調“同一個解析器”,只要我們使用不在同一解析器的進程,那就能實現利用多個核、實現並行的效果。
4.5 多進程的劣勢是什么
那么多人在說GIL問題,這很難讓人想信多進程就能完美繞過GIL帶來的限制,所以,說吧,多進程的劣勢是什么。
多進程確實存在以下兩個比較大的問題:
第一個是讓系統難受的,即多進程相對多線程會消耗更多的系統資源,當然這在所有語言中都存在。
第二個是讓開發者難受的,即不同解析器或者說不同Python進程間的通信是很麻煩的事情。
多進程間通信首先推薦隊列。隊列是最簡單的,而且其內部(multiprocessing.Manager().Queue())已經實現好了跨進進程間的鎖,訪問時不需要加鎖。
其次推薦multiprocessing.connection。multiprocessing.connection基於socket實現,但其與直接的socket的區別是,你通過put()推入的什么object通過get()獲取到的就是什么object,不需要他不需要像tcp那樣自己定界。
共享變量,則可參見“Python3多進程共享變量實現方法”。
4.6 多線程/多進程對目標函數的要求
多線程target的函數或線程池submit到的函數,不能是匿名函數,可以是一級函數、當前類的成員函數、其他類的實例的成員函數;add_done_callback只能是一級函數。
多進程target的函數或進程池submit到的函數,不能是匿名函數、當前類的成員函數,可以是一級函數、其他類的實例的成員函數;add_done_callback只能是一級函數。(一些書說多進程target的函數或進程池submit到的函數只能是一級函數,但實測發現其他類的實例的成員函數是可以的。)
另外要注意,多進程給目標進程函數提交的參數,不能是線程鎖、或者logger、queue.Queue()等自帶線程鎖的變量,不然會導致pickle無法序列化錯誤(TypeError: can't pickle _thread.lock objects)。
對於目標函數是其他類的實例的成員函數的情況,還要要求該類成員變量也不能是線程鎖、或者logger、queue.Queue()等自帶線程鎖的變量,不然也會報一樣的錯誤。
對於這個問題,變通的處理方法是使用進程級對應的類代替,如multiprocessing.Manager().Lock()、multiprocessing.Manager().Queue(),但logger寫同一個文件那似乎就沒有能保證進程安全的寫法了。
參考: