學會使用Python的threading模塊、掌握並發編程基礎


threading模塊

Python中提供了threading模塊來實現線程並發編程,官方文檔如下:

官方文檔

添加子線程

實例化Thread類

使用該方式新增子線程任務是比較常見的,也是推薦使用的。

簡單的代碼示例如下,創建3個子線程並向其添加任務,然后運行並打印它們的線程ID和線程名字:

import threading
import time


def task(params):
    print("sub thread run")
    currentThread = threading.current_thread()
    time.sleep(3)
    print("current subthread id : %s\ncurrent subthread name : %s\ncurrent subthread params : %s" % (
        currentThread.ident, currentThread.name, params))


if __name__ == "__main__":
    print("main thread run")
    for item in range(3):
        subThreadIns = threading.Thread(target=task, args=(item, ))
        subThreadIns.start()
    print("main thread run end")

# main thread run
# sub thread run
# sub thread run
# sub thread run
# main thread run end
# current subthread id : 123145534398464
# current subthread name : Thread-1
# current subthread params : 0
# current subthread id : 123145544908800
# current subthread name : Thread-3
# current subthread params : 2
# current subthread id : 123145539653632
# current subthread name : Thread-2
# current subthread params : 1

❶:返回一個線程對象,注意args的參數必須是一個tuple,否則拋出異常,也就是說單實參必須添加逗號

❷:start()方法是指該線程對象能夠被系統調度了,但不是立即運行該線程,而是等待系統調度后才運行。所以你會看見上面子線程的運行順序是0、2、1,另外一個線程對象只能運行一次該方法,若多次運行則拋出RunTimeError的異常。

❸:獲取當前的線程對象

❹:獲取當前線程對象的編號和名字,以及傳入的參數。當線程啟動時,系統都會分配給它一個隨機的編號和名字

首先上述代碼會先運行主線程,然后會創建3個子線程並運行。

當子線程運行的時候碰到了sleep(3)這種I/O操作時會釋放掉GIL鎖,並將線程執行權交還給了主線程。

然后主線程就運行完畢了,此時主線程並不會被kill掉,而是等待子線程運行結束后才會被kill掉,而子線程則是運行完畢后會被立刻kill掉。

我們可以看見,上面3個任務如果按照串行執行共會花費9.+秒時間,而通過多線程來運行,則僅需花費3.+秒的時間,極大的提升了任務處理效率。

自定義類覆寫run()方法

上面的子線程任務對象是一個全局函數,我們也可以將它作為方法來進行調用。

書寫一個類並繼承Threading類,覆寫run()方法即可:

import threading
import time


class TaskClass(threading.Thread):  # ❶
    def __init__(self, params):
        self.params = params  # ❷
        super(__class__, self).__init__()

    def run(self):
        print("sub thread run")
        currentThread = threading.currentThread()
        time.sleep(3)
        print("current subthread id : %s\ncurrent subthread name : %s\ncurrent subthread params : %s" % (
            currentThread.ident, currentThread.name, self.params))


if __name__ == "__main__":
    print("main thread run")
    for item in range(3):
        subThreadIns = TaskClass(item)
        subThreadIns.start()
    print("main thread run end")

# main thread run
# sub thread run
# sub thread run
# sub thread run
# main thread run end
# current subthread id : 123145495068672
# current subthread name : Thread-1
# current subthread params : 0
# current subthread id : 123145500323840
# current subthread name : Thread-2
# current subthread params : 1
# current subthread id : 123145505579008
# current subthread name : Thread-3
# current subthread params : 2

❶:必須繼承Threading類並調用父類的__init__()方法

❷:傳入的參數

源碼淺析

為什么添加子線程有2種截然不同的方式呢?它們之間有什么區別?這些都可以從源碼中找到答案。

我們從Thread類的實例看起,首先是__init__()方法(threading.py line 738 - 800),它主要做了一些初始化的准備工作:

class Thread:

    _initialized = False

    _exc_info = _sys.exc_info


    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, *, daemon=None):
    
        # 如果group不是None,就會拋出斷言異常
        assert group is None, "group argument must be None for now"
        
        # 如果kwargs是None,則構造一個空字典
        if kwargs is None:
            kwargs = {}
            
        # 傳入的執行任務的函數或者None
        self._target = target
        
        # 線程名字
        self._name = str(name or _newname())
        
        # 任務函數傳入的元組參數
        self._args = args
        # 任務函數傳入的關鍵字參數
        self._kwargs = kwargs
        # 是否是守護線程啟動,如果不是None則以守護線程模式啟動
        if daemon is not None:
            self._daemonic = daemon
        # 如果是None,則繼承當前線程的守護模式
        else:
            self._daemonic = current_thread().daemon
        # 線程編號
        self._ident = None
        # 鎖定狀態,None
        self._tstate_lock = None
        # 一把Event事件鎖
        self._started = Event()
        # 是否停止運行的標志位
        self._is_stopped = False
        # 初始化狀態改為True
        self._initialized = True
 
        self._stderr = _sys.stderr

        _dangling.add(self)

參數釋義:

  • group:應該為None,為了日后擴展ThreadGroup類而保留的
  • target:傳入一個可調用對象,即線程任務task,默認為None,即可以不進行傳入
  • name:線程啟動時將不再由系統分配線程名稱,而是自定義,默認情況下,系統分配的線程名稱會由 "Thread-N" 的格式構成一個唯一的名稱,其中 N 是小的十進制數
  • args:用於調用目標函數的參數元組,默認是()空元組,你必須傳入一個元組
  • kwargs:用於調用目標函數的關鍵字參數字典,默認是None,你必須傳入一個字典
  • daemon:命名關鍵字參數,應當傳入一個布爾值,默認為None,它會指定該線程是否是以守護線程模式啟動,如果為None,該線程將繼承當前線程的守護模式屬性

接下來看start()方法,它是告知系統當前線程完成調度,可隨時啟用的方法(threading.py line 828 - 851):

    def start(self):

        # 如果初始狀態不為True,則拋出異常
        if not self._initialized:
            raise RuntimeError("thread.__init__() not called")

        # 判斷當前線程是否被鎖住,如果被鎖住則拋出異常
        if self._started.is_set():
            raise RuntimeError("threads can only be started once")
        with _active_limbo_lock:
            _limbo[self] = self
        try:
            # 執行引導
            _start_new_thread(self._bootstrap, ())
        except Exception:
            with _active_limbo_lock:
                del _limbo[self]
            raise
    
        self._started.wait()

這里關鍵是看self._bootstrap()方法,該該方法位於(threading.py line 870 - 888),看看它會做什么事情:

    def _bootstrap(self):
        try:
            self._bootstrap_inner()
        except:
            if self._daemonic and _sys is None:
                return
            raise

繼續找self._bootstrap_inner()方法,該該方法位於(threading.py line 901 - 964)。

在該方法的916行時,它會執行run()方法:

    def _bootstrap_inner(self):
            ...
            try:
                # 執行run
                self.run()
            except SystemExit:
                pass
            except:
              ...

如果此時你按照第二種添加子線程的方式,則直接會運行被子類TaskClass覆寫的run()方法。

如果是第一種添加子線程的方式,則還需要往里面看(threading.py line 835 - 868):

 def run(self):

        try:
            # self._target = 我們自己傳遞的可調用對象task
            if self._target:
                self._target(*self._args, **self._kwargs)
        finally:
            del self._target, self._args, self._kwargs

至此可以發現,不管是使用哪一種方式添加子線程,都會運行5個方法。

所以說它們內部實現其實都是一樣的,沒什么特別的,也不要覺得它特別神奇。

threading模塊方法大全

以下是threading模塊提供的類或方法:

類方法 描述 返回值
threading.Thread(target, args, kwargs) 創建並返回一個線程對象 threadObject
threading.Timer(interval, function, args, kwargs) 創建並返回一個延遲啟動的線程對象 threadObject
threading.active_count() 獲取當前進程下存活的線程數量 int
threading.enumerate() 查看當前進程存活了的所有線程對象,以列表形式返回 [threadObject, ...]
threading.main_thread() 獲取主線程對象 threadObject
threading.current_thread() 獲取當前正在執行的線程對象 threadObject
threading.currentThread() 獲取當前正在執行的線程對象 threadObject
threading.get_ident() 獲取當前正在執行的線程對象的編號 int

下面我將使用該代碼對上述功能進行演示:

import threading
import time

class TaskClass(threading.Thread):

    def run(self):
        time.sleep(3)
        pass

if __name__ == "__main__":
    for i in range(3):
        subThreadIns = TaskClass()
        subThreadIns.start()

1)獲取當前進程下存活的線程數量:

print(threading.active_count())

# 4

2)查看當前進程存活了的所有線程對象,以列表形式返回:

print(threading.enumerate())

# [<_MainThread(MainThread, started 4425459136)>, <TaskClass(Thread-1, started 123145449238528)>, <TaskClass(Thread-2, started 123145454493696)>, <TaskClass(Thread-3, started 123145459748864)>]

3)獲取主線程對象:

print(threading.main_thread())

# <_MainThread(MainThread, started 4565407168)>

4)獲取當前正在執行的線程對象:

print(threading.currentThread())

# <_MainThread(MainThread, started 4383299008)>

5)獲取當前正在執行的線程對象的編號:

print(threading.get_ident())

# 4380034496

threadObject方法大全

以下是針對線程對象提供的屬性或者方法:

方法/屬性 描述 返回值
threadObject.start() 通知系統該線程調度完畢,可以隨時進行啟動,一個線程對象只能運行一次該方法,若多次運行則拋出RunTimeError異常 ...
threadObject.join(timeout=None) 主線程默認會等待子線程運行結束后再繼續執行,timeou為等待的秒數,如不設置該參數則一直等待。 ...
threadObject.getName() 獲取線程對象的名字 str
threadObject.setName(name) 設置線程對象的名字 None
threadObject.is_alive() 查看線程對象是否存活 bool
threadObject.isAlive() 查看線程對象是否存活,不推薦使用 bool
threadObject.isDaemon() 查看線程對象是守護線程 bool
threadObject.setDaemon() 設置線程對象為守護線程,主線程運行完畢之后設置為守護線程的子線程便立即結束執行 None
threadObject.ident 獲取線程對象的編號 int
threadObject.name 獲取或者設置線程對象的名字 str or None
daemon 查看線程對象是守護線程 bool

主線程阻塞

默認情況下,當子線程啟動后,主線程會依舊往下運行而不是等待所有的子線程運行完畢后再繼續往下運行。

如圖所示,主線程在運行結束后並不會被理解kill掉,而是所有的子線程運行完畢后主線程才會被kill掉:

image-20210701172414613

我們可以利用threadObject.join(timeout=None)來讓主線程等待子線程運行完畢后再繼續向下運行,timeout為等待的秒數,如不設置該參數則一直等待。

如圖所示,這是沒有設置timeout的示意圖,主線程必須等待所有子線程運行完畢后再接着運行:

image-20210701172435152

代碼示例:

import threading
import time


class TaskClass(threading.Thread):

    def run(self):
        thName = threading.current_thread().name
        print("%s start run" % thName)
        time.sleep(3)
        print("%s run end" % thName)


if __name__ == "__main__":
    print("main thread start run")
    threadLst = []
    
    for i in range(3):
        threadLst.append(TaskClass())
    for ins in threadLst:
        ins.start()  # 開始運行所有子線程
    for ins in threadLst:
        ins.join()   # 讓主線程等待所有子線程運行完畢后再接着運行,注意,設置主線程等待的子線程必須處於活躍狀態
        
    print("main thread carry on run")
    print("main thread run end")
    
# main thread start run
# Thread-1 start run
# Thread-2 start run
# Thread-3 start run
# Thread-1 run end
# Thread-2 run end
# Thread-3 run end
# main thread carry on run
# main thread run end

守護線程

守護線程是指當主線程運行完畢后,子線程是否還要繼續運行。

默認threadObject.setDaemon()為None,也就是False,即當前主線程運行完畢后,子線程依舊可以接着運行。

image-20210701174232233

如果threadObject.setDaemon()為True,則當前主線程運行完畢后,子線程即使沒有運行完畢也會結束運行。

image-20210701174053747

代碼示例:

import threading
import time


class TaskClass(threading.Thread):

    def run(self):
        thName = threading.current_thread().name
        print("%s start run" % thName)
        time.sleep(3)
        print("%s run end" % thName)


if __name__ == "__main__":
    print("main thread start run")
    threadLst = []
    for i in range(3):
        threadLst.append(TaskClass())
    for ins in threadLst:
    
        # 注意,守護線程的設置必須在線程未啟動時設置
        ins.setDaemon(True)
        ins.start()

    print("main thread carry on run")
    print("main thread run end")

# main thread start run
# Thread-1 start run
# Thread-2 start run
# Thread-3 start run
# main thread carry on run
# main thread run end

join()與setDaemon(True)共存

如果同時設置setDaemon(True)與join()方法會怎么樣呢?有兩種情況:

  1. join()方法沒有設置timeout(沒有設置即表示死等)或者timeout的時間比子線程作業時間要長,這代表子線程會死在主線程之前,setDaemon(True)也就沒有了意義,即失效了
  2. join()設置了timeout並且timeout的時間比子線程作業時間要短,這代表主線程會死在子線程之前,setDaemon(True)生效,子線程會跟着主線程一起死亡。

情況一:

import threading
import time


class TaskClass(threading.Thread):

    def run(self):
        thName = threading.current_thread().name
        print("%s start run" % thName)
        time.sleep(3)
        print("%s run end" % thName)


if __name__ == "__main__":
    subThread = TaskClass()
    subThread.setDaemon(True) # 主線程運行完后會立即終止子線程的運行。但是由於有join(),故不生效。
    subThread.start()
    subThread.join() # 主線程必須等待子線程運行結束后再接着運行
    print("main thread run end")

# Thread-1 start run
# Thread-1 run end
# main thread run end

情況2:

import threading
import time


class TaskClass(threading.Thread):

    def run(self):
        thName = threading.current_thread().name
        print("%s start run" % thName)
        time.sleep(3)
        print("%s run end" % thName)


if __name__ == "__main__":
    subThread = TaskClass()
    subThread.setDaemon(True) # 主線程運行完后會立即終止子線程的運行。但是由於有join(),故不生效。
    subThread.start()
    subThread.join(1) # 主線程必須等待子線程運行結束后再接着運行,只等待1s
    print("main thread run end")

# Thread-1 start run
# main thread run end

線程延遲啟動

使用threading模塊中提供的Timer類,可讓子線程延遲啟動,如下所示:

import threading
import time


def task():
    print("sub thread start run")
    time.sleep(3)
    print("sub thread run end")


if __name__ == "__main__":
    print("main thread run")
    t1 = threading.Timer(interval=3, function=task)
    t1.start()  # 3秒后才啟動子線程
    t1.join()
    print("main thread run end")

# main thread run
# sub thread start run
# sub thread run end
# main thread run end

如果要用類的形式,則可以繼承threading.Timer()類,並修改self.function屬性,個人極度不推薦這樣做。

如下所示,在不知道某一個方法怎么使用時扒扒源碼看一看,翻翻官方文檔就大概能了解:

import threading
import time


class TaskClass(threading.Timer):
    def __init__(self, *args, **kwargs):
        # 必須要修改function為你想執行的方法
        super(__class__, self).__init__(*args, **kwargs)
        self.function = self.task

    def task(self, x, y):
        print("sub thread start run")
        time.sleep(3)
        print("parmas %s %s" % (x, y))
        print("sub thread run end")


if __name__ == "__main__":
    # 必須傳入一個None
    t1 = TaskClass(interval=3, function=None, args=(1, 2))
    t1.start()
    t1.join()
    print("main thread run end")

# sub thread start run
# parmas 1 2
# sub thread run end
# main thread run end

多線程編程應用場景

由於GIL鎖的存在,Python中對於I/O操作來說可以使用多線程編程,如果是計算密集型的操作則不應該使用多線程進行處理,因為沒有I/O操作就不能通過I/O切換來執行其他線程,故對於計算密集型的操作來說多線程沒有什么優勢,甚至還可能比普通串行還慢(因為涉及到線程切換,雖然是毫秒級別,但是計算的數值越大這個切換也就越密集,GIL鎖是100個CPU指令切換一次的)

注意:我們是在Python2版本下進行此次測試,Python3版本確實相差不大,但是,從本質上來說依然是這樣的。

計算密集型程序的普通串行運行時間:

import threading
import time

num = 0
def add():
    global num
    for i in range(10000000): # 一千萬次
        num += 1

def sub():
    global num
    for i in range(10000000):  # 一千萬次
        num -= 1

if __name__ == '__main__':
    start_time = time.time()

    add()
    sub()

    end_time = time.time()
    print("執行時間:",end_time - start_time)
    
    
# ==== 執行結果 ==== 三次采集

"""
大約在 1.3 - 1.4 秒
"""

計算密集型程序的多線程並發運行時間:

# coding:utf-8

import threading
import time

num = 0
def add():
    global num
    for i in range(10000000):  # 一千萬次
        num += 1

def sub():
    global num
    for i in range(10000000):  # 一千萬次
        num -= 1

if __name__ == '__main__':
    start_time = time.time()

    t1 = threading.Thread(target=add,)
    t2 = threading.Thread(target=sub,)
    t1.start()
    t2.start()
    t1.join()
    t2.join()

    end_time = time.time()
    print(u"執行時間:",end_time - start_time)

# ==== 執行結果 ==== 三次采集

"""
大約 4 - 5 秒
"""


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM