多任務處理方式之二:多線程


線程的理解

  • 1、操作系統能夠進行運算調度的最小單位,即程序執行的最小單位

  • 2、進程負責程序所必須的資源分配(文本區域、數據區域、堆棧區域)一個進程中也經常需要同時做多件事,即要同時運行多個‘子任務’,這些子任務即線程

    線程是每一個進程中的單一順序控制流 ,其包含在進程中,是進程的實際運作單位(進程是線程的容器)

    ⼀個程序中⾄少要有⼀個進程,⼀個進程中⾄少要有⼀個線程

    線程不能夠獨⽴執⾏,必須依存在進程中

  • 3、線程基本不占用系統資源,其只擁有在運行過程中必不可少的資源(如程序計數器、一組寄存器和棧)

  • 4、同一個進程中的所有線程都共享此進程所擁有的全部資源

    一個進程中的線程共享相同的內存單元/內存地址空間——>可以訪問相同的變量和對象,而且它們從同一堆中分配對象——>通信、數據交換、同步操作

  • 5、線程之間的通信主要通過共享所屬進程的資源

    線程間的通信是在同一地址空間上進行的,所以不需要額外的通信機制,這就使得通信更簡便而且信息傳遞的速度也更快

  • 6、線程的上下文切換很快,資源開銷較少,但是相對於進程而言,不夠安全,在多個線程共同操作進程的某一資源時,可能會丟失數據

  • 7、線程和進程之間的區別


線程的五種狀態

  1. 新狀態:線程對象已經創建,還未調用 start() 方法。
  1. 可運行狀態:當線程有資格運行,但調度程序還沒有把它選定為運行線程時線程所處的狀態。當 start()方法調用時,線程首先進入可運行狀態。在線程運行之后或者從阻塞、等待或睡眠狀態回來后,也返回到可運行狀態。
  1. 運行狀態:線程調度程序從可運行池中選擇一個線程作為當前線程時線程所處的狀態。這也是線程進入運行狀態的唯一方式。
  1. 等待/阻塞/睡眠狀態:這是線程有資格運行時它所處的狀態。實際上這個三狀態組合為一種,其共同點是:線程仍舊是活的(可運行的),但是當前沒有條件運行。但是如果某件事件出現,他可能返回到可運行狀態。
  1. 死亡態:當線程的run()方法完成時就認為它死去。這個線程對象也許是活的,但是,它已經不是一個單獨執行的線程。線程一旦死亡,就不能復生。如果在一個死去的線程上調用 start()方法,會拋出 java.lang.IllegalThreadStateException 異常。

GIL全局解釋器鎖

Python中的多線程可以並發,但不能並行(同一個進程下的多個線程不能被多個cpu同時執行),緣由就是GIL全局解釋器鎖,導致同一時間內只有一個線程在執行

python 文件的執行流程為:

  • 操作系統先將python解釋器和需要執行的py文件由硬盤加載到內存中,開辟一個進程空間
  • 此進程即使得python解釋器首先將py文件中的代碼指令通過編譯器編譯成字節碼
  • 編譯完成的c的字節碼通過虛擬機轉換為機器碼由cpu執行

這個執行流程即是py文件執行進程中的主線程

若Python中的多線程並行,則每個線程都要執行上述過程,從而同一時間需要多個CPU同時執行轉換而來的機器碼,極大限度的提高執行效率。但眾所周知,Python是由荷蘭人吉多·范羅蘇姆 (Guido van Rossum)於1989年聖誕節期間開發的一個新的腳本解釋程序,而雙核cpu是2005年才被普遍應用的,即在當時的條件下,Cpython執行多線程時應用不了多核。故為了避免多個線程並發執行而造成數據的不完整以及線程的不安全,龜叔在python的解釋器中加上了互斥鎖——全局解釋器鎖(GIL鎖),即使得Cpython在所有線程進入解釋器之前加了一個全局解釋器鎖,當執行完當前py文件后才釋放該鎖,這便導致了python中同一時間內只有一個線程在執行

注:若想使得多線程並行,可以用多進程間接實現線程的並行,或者更換解釋器為Pypy、Jpython


線程創建

使用python中的threading模塊中的Thread類創建線程

from threading import Thread

threading模塊提供的Thread類來創建線程對象
from threading import Thread 
import os


def func(num):
    print('當前線程{},所歸屬的進程id號{}'.format(os.getpid(), num))


for i in range(10):
    # 異步創建10個子線程
    t = Thread(target=func, args=(i,))
    t.start()

# 主線程執行任務
print(os.getpid())
自定義類繼承Thread類,每次實例化這個類的時候,就等同於實例化線程對象

這種方法付只需要重寫 threading.Thread 類的 run 方法,然后調用 start() 開啟線程就可以了

from threading import Thread
import time 


class MyThread(Thread):
    def __init__(self, name):
        # 手動調用父類的構造方法
        super().__init__()
        self.name = name

    def run(self):
        time.sleep(1)
        print("當前線程正在執行runing ... ", self.name)


if __name__ == "__main__":
    t = MyThread("機器今天會再次爆炸么?")
    t.start()
    print("主線程執行結束 ... ")

Thread 類中的基本方法

  • t.is_alive() 檢測線程是否仍然存在

  • t.setName() 設置線程名字

  • t.getName() 獲取線程名字

from threading import Thread
import time


def func():
    time.sleep(1)


if __name__ == "__main__":
    t = Thread(target=func)

    t.start()
    print(t , type(t))
    
    print(t.is_alive())  # False
    
    print(t.getName())
    
    t.setName("xboyww")
    print(t.getName())

  • currentThread().ident 查看線程id號
  • enumerate() 返回目前正在運行的線程列表
  • activeCount() 返回目前正在運行的線程數量
from threading import Thread
import time
from threading import currentThread
from threading import enumerate
from threading import activeCount


# 1.currentThread().ident 查看線程id號

def func():
    print("子線程id", currentThread().ident, os.getpid())


if __name__ == "__main__":
    Thread(target=func).start()
    print("主線程id", currentThread().ident, os.getpid())



# 2.enumerate()        返回目前正在運行的線程列表

def func():
    print("子線程id", currentThread().ident, os.getpid())
    time.sleep(0.5)


if __name__ == "__main__":
    for i in range(10):
        Thread(target=func).start()
    lst = enumerate()
    # 子線程10 + 主線程1個 = 11
    print(lst ,len(lst))


    # 3.activeCount()      返回目前正在運行的線程數量
    print(activeCount())

線程池(ThreadPoolExecutor)

默認如果一個線程短時間內可以完成更多的任務,就不會創建額外的新的線程,以節省資源

from concurrent.futures import ThreadPoolExecutor
from threading import current_thread as cthread


def func(i):
    print("thread ... start", cthread().ident, i)
    time.sleep(3)
    print("thread ... end", i)
    return cthread().ident


if __name__ == "__main__":
    lst = []
    setvar = set()
    # (1) 創建線程池對象
    """限制線程池最多創建os.cpu_count() * 5 = 線程數,所有任務全由這幾個線程完成,不會額外創建線程"""
    tp = ThreadPoolExecutor()  # 我的電腦40個線程並發

    # (2) 異步提交任務
    for i in range(100):
        res = tp.submit(func, i)
        lst.append(res)

    # (3) 獲取返回值
    for i in lst:
        setvar.add(i.result())

    # (4) 等待所有子線程執行結束
    tp.shutdown()

    print(len(setvar), setvar)
    print("主線程執行結束 ... ")


守護線程

守護線程 : 等待所有線程全部執行完畢之后,再自己終止,守護的是所有線程

線程對象.setDaemon(True)

from threading import Thread
import time


def func1():
    while True:
        time.sleep(0.5)
        print("我是func1")


def func2():
    print("我是func2 start ... ")
    time.sleep(3)
    print("我是func2 end ... ")


t1 = Thread(target=func1)
t2 = Thread(target=func2)

# 在start調用之前,設置守護線程
t1.setDaemon(True)

t1.start()
t2.start()

print("主線程執行結束 ... ")

同步 & 異步

同步

同步意味着順序、統一的時間軸

  • 場景1:是指完成事務的邏輯,先執行第一個事務,如果阻塞了,會一直等待,直到這個事務完成,再執行第二個事務,協同步調,按預定的先后次序進行運行

  • 場景2:一個任務的完成需要依賴另外一個任務時,只有等待被依賴的任務完成后,依賴的任務才能算完成,這是一種可靠的任務序列

異步

異步則意味着亂序、效率優先的時間軸

  • 處理調用這個事務之后,不會等待這個事務的處理結果,直接處理第二個事務去了,通過狀態、回調來通知調用者處理結果

  • 對於I/O相關的程序來說,異步編程可以大幅度的提高系統的吞吐量,因為在某個I/O操作的讀寫過程中,系統可以先去處理其它的操作(通常是其它的I/O操作)

  • 不確定執行順序


阻塞 & 非阻塞

阻塞

程序中有了IO操作,就會發生阻塞,必須要輸入/輸出一個字符串,否則代碼不往下執行

非阻塞

程序中沒有任何耗時操作,無需等待正常往下執行

同步阻塞 :效率低,cpu利用不充分
異步阻塞 :比如socketserver,可以同時連接多個,但是彼此都有recv
同步非阻塞:沒有類似input的代碼,從上到下執行.默認的正常情況代碼
異步非阻塞:效率是最高的,cpu過度充分,過度發熱, 需液冷


串行 & 並行 & 並發

假設有A、B兩個任務,則串行、並行、並發的區別如圖所示。

串行

A和B兩個任務運行在一個CPU線程上,在A任務執行完之前不可以執行B。即,在整個程序的運行過程中,僅存在一個運行上下文,即一個調用棧一個堆。程序會按順序執行每個指令

並行

並行指兩個或兩個以上任務同一時刻被不同的cpu執行。在多道程序環境下,並行性使多個程序同一時刻可在不同CPU上同時執行。比如,A和B兩個任務可以同時運行在不同的CPU線程上,效率較高,但受限於CPU線程數,如果任務數量超過了CPU線程數,那么每個線程上的任務仍然是順序執行的。

並發

並發指多個線程在宏觀(相對於較長的時間區間而言)上表現為同時執行,而實際上是輪流穿插着執行,並發的實質是一個物理CPU在若干道程序之間多路復用,其目的是提高有限物理資源的運行效率。 並發與並行串行並不是互斥的概念,如果是在一個CPU線程上啟用並發,那么自然就還是串行的,而如果在多個線程上啟用並發,那么程序的執行就可以是既並發

圖示


線程同步

由於一個進程中的多個線程享進程中的資源,所以可能造成多個線程同時修改一個變量的情況(即線程⾮安全),可能造成數據混亂,故需要進⾏同步控制,即線程同步

可以通過延時確定多線程的執行順序,但不推薦。

import threading
import time


def work1(nums):
    nums.append(44)
    print('-----in work1-----', nums)


def work2(nums):
    time.sleep(1)
    # 延時一會保證另一線程執行
    print('-----in work2-----', nums)


g_nums = [11, 22, 33]
t1 = threading.Thread(target=work1, args=(g_nums,))
t1.start()
t2 = threading.Thread(target=work2, args=(g_nums,))
t2.start()

互斥鎖(threading模塊中定義的Lock類)

互斥鎖保證了每次只有⼀個線程操作共享數據,從⽽保證了多線程情況下數據的安全性(原子性),可以實現線程同步

互斥鎖為資源引入一個狀態:鎖定/非鎖定。某個線程要更改共享數據時,先將其鎖定,此時資源的狀態為“鎖定”狀態,其他線程不能更改;直到該線程釋放資源,將資源的狀態變成“非鎖定”,其他的線程才能再次鎖定該資源。互斥鎖保證了每次只有一個線程操作共享數據,從而保證了多線程情況下數據的安全性。

盡量使用一把鎖解決問題,不要互相嵌套,否則容易死鎖

import threading

num = 0


def test1():
    global num
    
    # 調用Lock對象的acquire()方法獲得鎖時,這把鎖進入“locked”狀態
    # 如果此時另一個線程2試圖獲得這個鎖,該線程2就會變為同步阻塞狀態
    if mutex.acquire():
        for i in range(1000):
            num += 1
            
    # 調用Lock對象的release()方法釋放鎖之后,該鎖進入“unlocked”狀態。
    mutex.release()


def test2():
    global num
    
    # 線程調度程序繼續從處於同步阻塞狀態的線程中選擇一個來獲得鎖,並使得該線程進入運行(running)狀態
    if mutex.acquire():
        for i in range(1000):
            num += 1
    mutex.release()


mutex = threading.Lock()
p1 = threading.Thread(target=test1)
p1.start()
p2 = threading.Thread(target=test2)
p2.start()
print(num)
死鎖(只上鎖,不解鎖)

在多個線程間共享多個資源的時候, 如果兩個線程分別占有⼀部分資源並且同時等待對⽅的資源, 就會造成死鎖

在多線程程序中,死鎖問題很大一部分是由於線程同時獲取多個鎖造成的。如一個線程獲取了第一個鎖,然后在獲取第二個鎖的時候發生阻塞,那么這個線程就可能阻塞其他線程的執行,從而導致整個程序假死

import threading
import time


class MyThread1(threading.Thread):
    def run(self):
        # 線程1被 A 鎖——>鎖定
        if mutexA.acquire():
            print(self.name + '---do1---up---')
            time.sleep(1)
            if mutexB.acquire():
                print(self.name + '---do1---down---')
                mutexB.release()
                
        # 線程1被 A 鎖釋放的前提是:線程1 搶到 B 鎖
        mutexA.release()


class MyThread2(threading.Thread):
    def run(self):
        time.sleep(1)
        # 線程2被 B 鎖——>鎖定
        if mutexB.acquire():
            print(self.name + '---do2---up---')
            if mutexA.acquire():
                print(self.name + '---do2---down---')
            	mutexA.release()
                
		# 線程2被 B 鎖釋放的前提是:線程2 搶到 A 鎖
        mutexB.release()


if __name__ == '__main__':
    mutexA = threading.Lock()
    mutexB = threading.Lock()
    t1 = MyThread1()
    t2 = MyThread2()
    t1.start()
    t2.start()
    
    
# Thread-1---do1---up---
# Thread-2---do2---up---
# 程序卡死

# 線程1不釋放A鎖
# 線程2不釋放B鎖

遞歸鎖(threading模塊中定義的RLock類)

用於快速解決項目因死鎖問題不能正常運行的場景,用來處理異常死鎖的

import threading
import time


class MyThread1(threading.Thread):
    def run(self):
        if mutexA.acquire():
            print(self.name + '---do1---up---')
            time.sleep(1)
            if mutexB.acquire():
                print(self.name + '---do1---down---')
                mutexB.release()
        mutexA.release()


class MyThread2(threading.Thread):
    def run(self):
        time.sleep(1)
        if mutexB.acquire():
            print(self.name + '---do2---up---')
            if mutexA.acquire():
                print(self.name + '---do2---down---')
            	mutexA.release()

        mutexB.release()


if __name__ == '__main__':
    mutexA = threading.RLock()
    mutexB = threading.RLock()
    t1 = MyThread1()
    t2 = MyThread2()
    t1.start()
    t2.start()
    
    
# Thread-1---do1---up---
# Thread-1---do1---down---
# Thread-2---do2---up---
# Thread-2---do2---down---

信號量(threading模塊中定義的Semaphore類)

信號量 semaphore:用於控制同一時間內可以操作進程資源的線程數量的一把鎖,簡言之信號量是用來控制線程並發數的一把鎖,也可以實現線程同步

使用場景:在讀寫文件的時候,一般只有一個線程在寫,而讀可以有多個線程同時進行,如果需要限制同時讀文件的線程個數,這時候就可以用到信號量了(如果用互斥鎖,就是限制同一時刻只能有一個線程讀取文件)

import time
import threading


def foo(se):
    se.acquire()
    time.sleep(2)
    print("ok")
    se.release()


if __name__ == "__main__":
    # 設置同一時間內可以有5個線程並發
    se = threading.Semaphore(5)
    
    for i in range(20):
        t1 = threading.Thread(target=foo, args=(se,))
        t1.start()  # 此時可以控制同時進入的線程數

線程隊列(queue模塊)

通過3種類型的隊列來實現線程同步,都實現了鎖原語(可以理解為原⼦操作, 即要么不做, 要么就做完) , 能夠在多線程中直接使⽤

queue.Queue:FIFO(先⼊先出) 隊列 Queue
# 基本使用
from queue import Queue

# put 存
# get 取
# put_nowait 存,超出了隊列長度,報錯
# get_nowait 取,沒數據取不出來,報錯


# linux windows 線程中put_nowait,get_nowait都支持

"""先進先出,后進后出"""
# maxsize為一個整數,表示隊列的最大條目數,可用來限制內存的使用。
# 一旦隊列滿,插入將被阻塞直到隊列中存在空閑空間。如果maxsize小於等於0,隊列大小為無限。maxsize默認為0

q = Queue(maxsize=0)
q.put(1)
q.put(2)
print(q.get())
print(q.get())
# 取不出來,阻塞
# print(q.get())
print(q.get_nowait())


q2 = Queue(3)
q2.put(11)
q2.put(22)
q2.put(33)
# 放不進去了,阻塞
# q2.put(44)
q2.put_nowait(44)
import threading
import time
from queue import Queue


class Pro(threading.Thread):
    def run(self):
        global queue
        count = 0
        while True:
            if queue.qsize() < 1000:
                for i in range(100):
                    count = count + 1
                    msg = '生成產品' + str(count)
                    queue.put(msg)  # 隊列中添加新產品
                    print(msg)
            time.sleep(1)


class Con(threading.Thread):
    def run(self):
        global queue
        while True:
            if queue.qsize() > 100:
                for i in range(3):
                    msg = self.name + '消費了' + queue.get()
                    print(msg)
            time.sleep(1)


if __name__ == "__main__":
    queue = Queue()
    # 創建一個隊列。線程中能用,進程中不能使用
    for i in range(500):  # 創建500個產品放到隊列里
        queue.put('初始產品' + str(i))  # 字符串放進隊列
        for i in range(2):  # 創建了兩個線程
            p = Pro()
            p.start()
        for i in range(5):  # 5個線程
            c = Con()
            c.start()

queue.LifoQueue:LIFO(后⼊先出) 棧 LifoQueue
# LifoQueue 先進后出,后進先出(按照棧的特點設計)

from queue import LifoQueue


lq = LifoQueue(3)
lq.put(11)
lq.put(22)
lq.put(33)
# print(lq.put_nowait(444))

print(lq.get())
print(lq.get())
print(lq.get())
queue.PriorityQueue:(優先級隊列) PriorityQueue
# PriorityQueue 按照優先級順序排序 (默認從小到大排序)

from queue import PriorityQueue


# 如果都是數字,默認從小到大排序
pq = PriorityQueue()
pq.put(13)
pq.put(3)
pq.put(20)
print(pq.get())
print(pq.get())
print(pq.get())

# 如果都是字符串
"""如果是字符串,按照ascii編碼排序"""
pq1 = PriorityQueue()
pq1.put("chinese")
pq1.put("america")
pq1.put("latinos")
pq1.put("blackman")

print(pq1.get())
print(pq1.get())
print(pq1.get())
print(pq1.get())

# 要么全是數字,要么全是字符串,不能混合 error
"""
pq2 = PriorityQueue()
pq2.put(13)
pq2.put("aaa")
pq2.put("擬稿")
"""

pq3 = PriorityQueue()
# 默認按照元組中的第一個元素排序
pq3.put( (20,"wangwen") )
pq3.put( (18,"wangzhen") )
pq3.put( (30,"weiyilin") )
pq3.put( (40,"xiechen") )

print(pq3.get())
print(pq3.get())
print(pq3.get())
print(pq3.get())

生產消費者模式

  • 進程(線程)之間如果直接通信,可能會出現兩個問題

    • 耦合性太強
    • 速率有可能不匹配

    解決方式,找一個緩沖區來中轉數據即生產者——消費者模式


線程異步

通過回調函數可以實現多線程異步執行

回調函數:
把函數當成參數傳遞給另外一個函數
在當前函數執行完畢之后,最后調用一下該參數(函數),這個函數就是回調函數

功能:
打印狀態: a屬性
支付狀態: b屬性
退款狀態: c屬性
轉賬的狀態: d屬性
把想要的相關成員或者相關邏輯寫在自定義的函數中
支付寶接口在正常執行之后,會調用自定義的函數,來執行相應的邏輯
那么這個函數就是回調函數

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from threading import current_thread as cthread
import os, time


def func1(i):
    print("Process start ... ", os.getpid())
    time.sleep(0.5)
    print("Process end ... ", i)
    return "*" * i


def func2(i):
    print("thread start ... ", cthread().ident)
    time.sleep(0.5)
    print("thread end ... ", i)
    return "*" * i


def call_back1(obj):
    print("<==回調函數callback進程號:===>", os.getpid())
    print(obj.result())


def call_back2(obj):
    print("<==回調函數callback線程號:===>", cthread().ident)
    print(obj.result())


# (1) 進程池的回調函數: 由主進程執行調用完成

if __name__ == "__main__":
    p = ProcessPoolExecutor(5)
    for i in range(1, 11):
        res = p.submit(func1, i)
        # 進程對象.add_done_callback(回調函數) 
        '''
        add_done_callback 可以把res本對象和回調函數自動傳遞到函數里來
        '''
        res.add_done_callback(call_back1)
    p.shutdown()
    print("主進程執行結束 ... ", os.getpid())



# (2) 線程池的回調函數: 由當前子線程執行調用完成
if __name__ == "__main__":
    tp = ThreadPoolExecutor(5)
    for i in range(1, 11):
        res = tp.submit(func2, i)
        # 線程對象.add_done_callback(回調函數) 
        '''
        add_done_callback 可以把res本對象和回調函數自動傳遞到函數里來
        '''
        res.add_done_callback(call_back2)
    tp.shutdown()
    print("主線程執行結束 ... ", cthread().ident)
from multiprocessing import Pool
import random
import time


def download(f):
    for i in range(1, 4):
        print(f"{f}下載文件{i}")
        time.sleep(random.randint(1, 3))
    return "下載完成"


def alterUser(msg):
    print(msg)


if __name__ == "__main__":
    p = Pool(3)
    # 當func執行完畢后,return的東西會給到回調函數callback
    p.apply_async(func=download, args=("線程1",), callback=alterUser)
    p.apply_async(func=download, args=("線程2",), callback=alterUser)
    p.apply_async(func=download, args=("線程3",), callback=alterUser)
    p.close()
    p.join()


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM