threading多線程模塊


基本使用

  Python中提供了threading模塊用來實現線程並發編程,使用方法有兩種,一種是將threading模塊下的Therad類進行實例化的方式實現,一種是通過繼承threading模塊下的Therad類並覆寫run()方法實現。

  官方中文文檔

實例化Therad類創建子線程

  這種方式是最常用的也是推薦使用的方式。先來介紹一個Therad類中的方法,然后再看代碼。

  start():開始線程活動。

  它在一個線程里最多只能被調用一次。它安排對象的 run() 方法在一個獨立的控制進程中調用。如果同一個線程對象中調用這個方法的次數大於一次,會拋出 RuntimeError

  PS:該方法不會立即執行,只是告訴CPU說你可以調度我了,我准備好了,一定要注意不是立即執行!

import threading
import time

print("主線程任務開始處理")


def task(th_name):
    print("子線程任務開始處理,參數:{0}".format(th_name))
    time.sleep(3)  # <-- 這里睡眠了三秒,可以看見主線程繼續往下走了
    print("子線程任務處理完畢")


if __name__ == '__main__':
    
    # ==== 實例化出Thread類並添加子線程任務以及參數 ====

    t1 = threading.Thread(target=task, args=("線程[1]",))  # <-- 參數必須添加逗號。因為是args所以會打散,如果不加逗號則不能進行打散會拋出異常
    t1.start()  # 等待CPU調度..請注意這里不是立即執行
    
    print("主線程任務處理完畢")

# ==== 執行結果 ====
    
"""
主線程任務開始處理
子線程任務開始處理,參數:線程[1]
主線程任務處理完畢
子線程任務處理完畢
"""

  我們可以看見,在進行time.sleep()的時候線程做了一次切換,這是因為該方法是屬於IO操作,所以GIL鎖將執行權限丟給了主線程。還有一點要注意的就是主線程任務處理完畢后不會立馬結束掉,而是等子線程任務處理完畢后才會真正將主線程連同子線程一起kill掉。

  圖示:

image-20200701030622391

自定義類繼承Therad並覆寫run方法

  這種方法並不常見,但是還是要舉例說出來。我們可以看到第一種方法是實例化出了Therad類,並且執行了其start()方法,然后子線程就可以被調度了,其實在內部是通過start()方法調用了Therad類下的run()方法的。

  run():代表線程活動的方法。

  你可以在子類型里重載這個方法。 標准的 run() 方法會對作為 target 參數傳遞給該對象構造器的可調用對象(如果存在)發起調用,並附帶從 argskwargs 參數分別獲取的位置和關鍵字參數。

  那么我們就可以自定義一個類並繼承Therad類,再覆寫run()方法

import threading
import time

print("主線程任務開始處理")


class Threading(threading.Thread):
    """自定義類"""


    def __init__(self, th_name):
        self.th_name = th_name
        super(Threading, self).__init__()

    def run(self):
        print("子線程任務開始處理,參數:{0}".format(self.th_name))
        time.sleep(3)  # <-- 這里睡眠了三秒,可以看見主線程繼續往下走了
        print("子線程任務處理完畢")


if __name__ == '__main__':
    
    t1 = Threading("線程[1]")
    t1.start()  # 等待CPU調度..請注意這里不是立即執行
    
    print("主線程任務處理完畢")

# ==== 執行結果 ====
    
"""
主線程任務開始處理
子線程任務開始處理,參數:線程[1]
主線程任務處理完畢
子線程任務處理完畢
"""

  注意現在依然是主線程任務處理完畢后現在是不會立馬結束掉的,而是等子線程任務處理完畢后才會真正將主線程kill掉。其實原則上這兩種創建線程的方式都一模一樣。

源碼淺析-選讀

  這個源碼淺析非常淺,主要是來看一下基於實例化Therad類創建子線程內部是如何做的。

  那么我們看一下其Thread類的源碼,:

class threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, ***, daemon=None)

  調用這個構造函數時,必需帶有關鍵字參數。參數如下:

  group 應該為 None;為了日后擴展 ThreadGroup 類實現而保留。

  target 是用於 run() 方法調用的可調用對象。默認是 None,表示不需要調用任何方法。

  name 是線程名稱。默認情況下,由 "Thread-N" 格式構成一個唯一的名稱,其中 N 是小的十進制數。

  args 是用於調用目標函數的參數元組。默認是 ()

  kwargs 是用於調用目標函數的關鍵字參數字典。默認是 {}

  如果不是 Nonedaemon 參數將顯式地設置該線程是否為守護模式。 如果是 None (默認值),線程將繼承當前線程的守護模式屬性。

  如果子類型重載了構造函數,它一定要確保在做任何事前,先發起調用基類構造器(Thread.__init__())。

class Thread:

    """注釋被我刪掉了"""
    
    _initialized = False  # 這是一個狀態位,來表示該線程是否被被初始化過
def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, *, daemon=None):
                 
        """注釋被我刪掉了"""             
      
        assert group is None, "group argument must be None for now" #如果不是 None,daemon參數將顯式地設置該線程是否為守護模式。 如果是 None (默認值),線程將繼承當前線程的守護模式屬性。
        if kwargs is None:
            kwargs = {}  # kwargs 是用於調用目標函數的關鍵字參數字典。默認是 {}。
        self._target = target  # 對於第一種調用方式來說,它就是我們的task函數。
        self._name = str(name or _newname())  # 線程名
        self._args = args  # _args是用於調用目標函數的參數元組。默認是 ()。
        self._kwargs = kwargs 
        if daemon is not None: # 判斷其是否為守護線程
            self._daemonic = daemon
        else:
            self._daemonic = current_thread().daemon
        self._ident = None # 這個是線程的編號
        if _HAVE_THREAD_NATIVE_ID:  # 判斷是否具有本地ID
            self._native_id = None
        self._tstate_lock = None  # 鎖定的狀態
        self._started = Event() # 開始
        self._is_stopped = False # 狀態位,是否停止
        self._initialized = True  # 將初始化狀態為改為True
        # Copy of sys.stderr used by self._invoke_excepthook()
        self._stderr = _sys.stderr
        self._invoke_excepthook = _make_invoke_excepthook()
        # For debugging and _after_fork()
        _dangling.add(self)
Thread類的__init__方法

  我們可以看見其__init__方法大多都是做了一些初始化的東西。下面我們來看run()方法,它才是離我們最近的一個方法。

def run(self):

    """注釋被我刪掉了"""
    
    try:
        if self._target:  # 簡單吧,這個方法,就是判斷你有沒有傳入一個函數。即我們定義的task
            self._target(*self._args, **self._kwargs)  # 有的話就立即執行,我們傳入的name其實就放在了_args中。這里將它打散出來了,所以我們的task函數中的第一個參數name能收到。
    finally:
        # Avoid a refcycle if the thread is running a function with
        # an argument that has a member that points to the thread.
        del self._target, self._args, self._kwargs  # 不管處不出錯,都會清理他們。當然,如果有則是執行完成后清理
TCPServer中的run()方法

  好了,其實看到這里就行了。其實我們自定義類的傳參也可以不用覆寫__init__再去調用父類方法初始化進行傳參,我們完全以另一種方式,但是我個人不太推薦。

import threading
import time

print("主線程任務開始運行")

class Threading(threading.Thread):
    """自定義類"""

    def run(self):
        print(self._args)  # ('線程[1]',)
        print(self._kwargs)  # {}

        print("子線程任務開始處理,參數:{0}".format(self._args[0]))
        time.sleep(3)  # <-- 這里睡眠了三秒,可以看見主線程繼續往下走了
        print("子線程任務運行完畢")



if __name__ == '__main__':
    
    t1 = Threading(args=("線程[1]",))
    t1.start()  # 等待CPU調度..請注意這里不是立即執行
    
    
    print("主線程任務處理完畢")

# ==== 執行結果 ====
    
"""
主線程任務開始處理
('線程[1]',)
主線程任務處理完畢
{}
子線程任務開始處理,參數:線程[1]
子線程任務處理完畢
"""
自定義類繼承Therad並覆寫run方法的其他方式參數傳入

threading模塊方法大全

thrading模塊方法大全
方法/屬性名稱 功能描述
threading.active_count() 查看當前進程下一共存活了多少個線程的數量,返回的是一個int值。
threading.current_thread() 獲取當前線程對象。
threading.currentThread() 同上
threading.excepthook(args, /) 處理由 Thread.run() 引發的未捕獲異常。
threading.get_ident() 返回當前線程對象的編號。
threading.get_native_id() 返回當前線程對象的編號。和threading.get_ident()相同。
threading.enumerate() 查看當前進程存活了的所有線程對象,以列表形式返回。
threading.main_thread() 返回主線程對象。
threading.settrace(func) 不太清楚..好像是測試用的。
threading.stack_size([size]) 返回創建線程時使用的堆棧大小。
threading.TIMEOUT_MAX 規定一個全局的所有阻塞函數的最大時間。

線程對象方法大全

線程對象方法大全(即Thread類的實例對象)  
方法/屬性名稱 功能描述
start() 啟動線程,該方法不會立即執行,而是告訴CPU自己准備好了,可以隨時調度,而非立即啟動。
run() 一般是自定義類繼承Thread並覆寫的方法,即線程的詳細任務邏輯。
join(timeout=None) 主線程默認會等待子線程運行結束后再繼續執行,timeout為等待的秒數,如不設置該參數則一直等待。
name 可以通過 = 給該線程設置一個通俗的名字。如直接使用該屬性則返回該線程的默認名字。
getName() 獲取該線程的名字。
setName() 設置該線程的名字。
ident 獲取線程的編號。
native_id 獲取線程的編號,和ident相同。
is_alive() 查看線程是否存活,返回布爾值。
isAlive( ) 同上,但是不推薦使用這種方法。
daemon 查看線程是否為一個守護線程,返回布爾值。默認為False
isDaemon() 查看線程是否為一個守護線程,返回布爾值。默認為False
setDaemon()

設置一個線程為守護線程,參數如果為True則表示該線程被設置為守護線程,默認為False

當主線程運行完畢之后設置為守護線程的子線程便立即結束執行...

常用方法示例

  由於方法太多了,所以這里就只例舉一些非常常用的。

守護線程setDaemon()

  setDaemon() :設置一個線程為守護線程,參數如果為True則表示該線程被設置為守護線程,默認為False。當主線程運行完畢之后設置為守護線程的子線程便立即結束執行...

  我們對比上面的圖,現在子線程是沒有設置為守護線程的:

image-20200701030701850

  當他設置為守護線程之后會是這樣的:

image-20200701031136387

  代碼如下:

import threading
import time

print("主線程任務開始處理")


def task(th_name):
    print("子線程任務開始處理,參數:{0}".format(th_name))
    time.sleep(3)
    print("子線程任務處理完畢")


if __name__ == '__main__':
    
    t1 = threading.Thread(target=task, args=("線程[1]",))
    
    t1.setDaemon(True)  # <-- 設置線程對象t1為守護線程,注意這一步一定要放在start之前。
    
    t1.start()  # 等待CPU調度..請注意這里不是立即執行
    
    print("主線程任務處理完畢")

# ==== 執行結果 ====
    
"""
主線程任務開始處理
子線程任務開始處理,參數:線程[1]
主線程任務處理完畢
"""

線程阻塞join()

  join(timeout=None):主線程默認會等待子線程運行結束后再繼續執行,timeout為等待的秒數,如不設置該參數則一直等待。

  圖示如下:(未設置超時時間)

image-20200701032331066

  代碼如下:

import threading
import time

print("主線程任務開始處理")

def task(th_name):
    print("子線程任務開始處理,參數:{0}".format(th_name))
    time.sleep(3)
    print("子線程任務處理完畢")

if __name__ == '__main__':

    t1 = threading.Thread(target=task,args=("線程[1]",))

    t1.start()  #  等待CPU調度..請注意這里不是立即執行

    t1.join()  # <--- 放在start()下面,死等

    print("主線程任務處理完畢")

# ==== 執行結果 ====

"""
主線程任務開始處理
子線程任務開始處理,參數:線程[1]
子線程任務處理完畢
主線程任務處理完畢
"""

  圖示如下:(設置超時時間)

image-20200701032942587

  代碼如下:

import threading
import time

print("主線程任務開始處理")

def task(th_name):
    print("子線程任務開始處理,參數:{0}".format(th_name))
    time.sleep(3)
    print("子線程任務處理完畢")

if __name__ == '__main__':

    t1 = threading.Thread(target=task,args=("線程[1]",))

    t1.start()  #  等待CPU調度..請注意這里不是立即執行

    t1.join(2)  # <--- 放在start()下面,等2秒后主線程繼續執行

    print("主線程任務處理完畢")

# ==== 執行結果 ====

"""
主線程任務開始處理
子線程任務開始處理,參數:線程[1]
主線程任務處理完畢
子線程任務處理完畢
"""

注意,join()方法可以多次設置!

join()與setDaemon(True)共存

  如果同時設置setDaemon(True)join()方法會怎么樣呢?有兩種情況:

    1.join()方法沒有設置timeout(沒有設置即表示死等)或者timeout的時間比子線程作業時間要長,這代表子線程會死在主線程之前,setDaemon(True)也就沒有了意義,即失效了。

    2.join()設置了timeout並且timeout的時間比子線程作業時間要短,這代表主線程會死在子線程之前,setDaemon(True)生效,子線程會跟着主線程一起死亡。

# ==== 情況一 ====

import threading
import time

print("主線程任務開始處理")

def task(th_name):
    print("子線程任務開始處理,參數:{0}".format(th_name))
    time.sleep(3)
    print("子線程任務處理完畢")

if __name__ == '__main__':

    t1 = threading.Thread(target=task,args=("線程[1]",))

    t1.setDaemon(True)  # <--- 放在start()上面,主線程運行完后會立即終止子線程的運行。但是由於有join(),故不生效。

    t1.start()  #  等待CPU調度..請注意這里不是立即執行

    t1.join()  # <--- 放在start()下面,等2秒后主線程繼續執行

    print("主線程任務處理完畢")

# ==== 執行結果 ====

"""
主線程任務開始處理
子線程任務開始處理,參數:線程[1]
子線程任務處理完畢
主線程任務處理完畢
"""
情況一
# ==== 情況二 ====

import threading
import time

print("主線程任務開始處理")

def task(th_name):
    print("子線程任務開始處理,參數:{0}".format(th_name))
    time.sleep(3)
    print("子線程任務處理完畢")

if __name__ == '__main__':

    t1 = threading.Thread(target=task,args=("線程[1]",))

    t1.setDaemon(True)  # <--- 放在start()上面,主線程運行完后會立即終止子線程的運行。但是由於有join(),故不生效。

    t1.start()  #  等待CPU調度..請注意這里不是立即執行

    t1.join(2)  # <--- 放在start()下面,等2秒后主線程繼續執行

    print("主線程任務處理完畢")

# ==== 執行結果 ====

"""
主線程任務開始處理
子線程任務開始處理,參數:線程[1]
主線程任務處理完畢
"""
情況二

設置與獲取線程名

  我們來看一下如何設置與獲取線程名。

  threading.current_thread() :獲取當前線程對象。

  getName() :獲取該線程的名字。

  setName() :設置該線程的名字。

  name :可以通過 = 給該線程設置一個通俗的名字。如直接使用該屬性則返回該線程的默認名字。

import threading
import time

print("主線程任務開始處理")


def task(th_name):
    print("子線程任務開始處理,參數:{0}".format(th_name))
    obj  =  threading.current_thread()  # 獲取當前線程對象
    print("獲取當前的線程名:{0}".format(obj.getName()))
    print("開始設置線程名")
    obj.setName("yyy")
    print("獲取修改后的線程名:{0}".format(obj.getName()))
    time.sleep(3)  # <-- 這里睡眠了三秒,可以看見主線程繼續往下走了
    print("子線程任務處理完畢")


if __name__ == '__main__':
    # ==== 第一步:實例化出Thread類並添加子線程任務以及參數 ====
    t1 = threading.Thread(target=task, args=("線程[1]",),name="xxx")  # 可以在這里設置,如果不設置則為默認格式:Thread-1 數字是按照線程個數來定的
    t1.start()  # 等待CPU調度..請注意這里不是立即執行

    print("主線程名:",threading.current_thread().name)  # 直接使用屬性 name
    print("主線程任務處理完畢")

# ==== 執行結果 ====

"""
主線程任務開始處理
子線程任務開始處理,參數:線程[1]
獲取當前的線程名:xxx
開始設置線程名
獲取修改后的線程名:yyy
主線程名: MainThread
主線程任務處理完畢
子線程任務處理完畢
"""

多線程的應用場景

  由於GIL鎖的存在,Python中對於I/O操作來說可以使用多線程編程,如果是計算密集型的操作則不應該使用多線程進行處理,因為沒有I/O操作就不能通過I/O切換來執行其他線程,故對於計算密集型的操作來說多線程沒有什么優勢。甚至還可能比普通串行還慢(因為涉及到線程切換,雖然是毫秒級別,但是計算的數值越大這個切換也就越密集,GIL鎖是100個CPU指令切換一次的)

  注意:我們是在Python2版本下進行此次測試,Python3版本確實相差不大,但是,從本質上來說依然是這樣的。

import threading
import time

num = 0
def add():
    global num
    for i in range(10000000): # 一千萬次
        num += 1

def sub():
    global num
    for i in range(10000000):  # 一千萬次
        num -= 1

if __name__ == '__main__':
    start_time = time.time()

    add()
    sub()

    end_time = time.time()
    print("執行時間:",end_time - start_time)
    
# ==== 執行結果 ==== 三次采集

"""
大約在 1.3 - 1.4 秒
"""
計算密集型程序的普通串行運行時間
# coding:utf-8

import threading
import time

num = 0
def add():
    global num
    for i in range(10000000):  # 一千萬次
        num += 1

def sub():
    global num
    for i in range(10000000):  # 一千萬次
        num -= 1

if __name__ == '__main__':
    start_time = time.time()

    t1 = threading.Thread(target=add,)
    t2 = threading.Thread(target=sub,)
    t1.start()
    t2.start()
    t1.join()
    t2.join()

    end_time = time.time()
    print(u"執行時間:",end_time - start_time)

# ==== 執行結果 ==== 三次采集

"""
大約 4 - 5 秒
"""
計算密集型程序的多線程並發運行時間

補充:Timer線程延遲啟動

import threading

def task():
    print(threading.current_thread().getName(),end="")
    print("啟動了...")

if __name__ == '__main__':
    t1 = threading.Timer(3,task)  
    t2 = threading.Timer(3,task)
    t1.start() # 線程任務三秒后准備就緒,可以被CPU調度執行。
    t2.start()
    
# ==== 執行結果 ====

"""
Thread-1啟動了...
Thread-2啟動了...
"""
Timer線程延遲啟動


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM