python並發、異步—多線程


不是並行,不是真正意義上的並發,可以單核實現並發。進程是資源單位(相當於車間),線程是運行單位(相當於生產線)

io多的項目,多線程更優於多進程

1 threading

  • 開啟線程—函數
from threading import Thread
import time


def t_func(name, n):
    time.sleep(n)
    print("name:", name)


if __name__ == '__main__':
    t = Thread(target=t_func, args=("lynn", 4))
    t1 = Thread(target=t_func, args=("fancy", 1))
    t.start()
    t1.start()
    t.join() # 線程t完全運行完,才繼續往下運行
    print("主")

注意:

target是函數名字,不加()

args是元組,必須按位置,只有一個參數時要加,

join方法,不加join方法,是異步的,加join是把異步變成同步,就是只有該線程完全運行完,才繼續往下運行,不影響其他線程。

  • 開啟線程—類
from threading import Thread
import time

class TClass(Thread):
    def __init__(self, name):
        super().__init__()
        self.name = name

    def run(self):
        time.sleep(1)
        print("name:", self.name)
        return self.name

if __name__ == '__main__':
    t = TClass("lynn")
    t1 = TClass("fancy")
    t.start()
    t1.start()
    t.join()
    print("主")

其他方法

getName()線程的名字

setName()設置線程的名字

isAlive() 返回線程是否活動的

  • 守護線程

主線程所在的進程內,所有的線程運行完畢,停止運行。其實也是線程運行完畢,停止運行

from threading import Thread
import time

class TClass(Thread):
    def __init__(self, n, name):
        super().__init__()
        self.name = name
        self.n = n

    def run(self):
        time.sleep(self.n)
        print("name:", self.name)
        return self.name


if __name__ == '__main__':
    t = TClass(1, name="lynn")
    t1 = TClass(5, name="fancy")
    t.daemon = True
    t.start()
    t1.start()
    print("主")

注意:

​ 是主線程所在的進程內所有的線程運行完畢,停止運行

daemon必須在start方法之前

2 線程數據安全和通信

線程鎖

  • 互斥鎖

用來實現對共享資源的同步訪問,也稱為同步鎖

同一時間只有一個進程對加鎖的數據進行操作。把該部分變成串行,切不運行完,不釋放鎖,會一直阻塞。

from threading import Thread
from threading import Lock
import time


class TClass(Thread):
    def __init__(self, n, name, lock):
        super().__init__()
        self.name = name
        self.n = n
        self.lock = lock

    def run(self):
        with self.lock:
            with open("t_text.txt", "wt", encoding="utf-8")as f:
                f.write(self.name)
            time.sleep(self.n)
            with open("t_text.txt", "rt", encoding="utf-8")as f:
                print("name:", f.read())
            print(self.name)


if __name__ == '__main__':
    lock = Lock()
    t = TClass(1, name="lynn", lock=lock)
    t1 = TClass(5, name="fancy", lock=lock)
    t.daemon = True
    t.start()
    t1.start()
    print("主")

注意:

​ GIL鎖也是互斥鎖,是解釋器級別的互斥鎖

​ 盡量只在修改數據的部分加鎖,因為會把並發轉為串行,會影響效率

  • 死鎖

兩個或兩個以上的線程(進程),在運行過程中兩個線程(進程)互相等待,兩個鎖互相拿着沒釋放,沒有外部原因,會一直阻塞,稱為死鎖現象。

死鎖現象

import time
from threading import Thread
from threading import Lock

a_lock = Lock()
b_lock = Lock()


class DClass(Thread):
    def __init__(self, name):
        super().__init__()
        self.name = name

    def run(self):
        if self.name == "lynn":
            self.a_func()
        if self.name == "fancy":
            self.b_func()

    def a_func(self):
        a_lock.acquire()
        time.sleep(1)
        print("拿到a鎖")
        b_lock.acquire()
        print("拿到b鎖")
        a_lock.release()
        print("釋放a鎖")
        b_lock.release()
        print("釋放b鎖")

    def b_func(self):
        b_lock.acquire()
        time.sleep(1)
        print("拿到b鎖")
        a_lock.acquire()
        print("拿到a鎖")
        b_lock.release()
        print("釋放b鎖")
        a_lock.release()
        print("釋放a鎖")


dc1 = DClass("lynn")
dc2 = DClass("fancy")
dc1.start()
dc2.start()

注意:

​ 線程A(進程) 拿着 鎖a,要拿鎖b釋放鎖a

​ 進程B(進程) 拿着 鎖b,要拿鎖a釋放鎖b

​ 鎖a和鎖b形成死鎖現象

  • 遞歸鎖RLock

解決死鎖現象,針對多個鎖的情況,遞歸鎖可以被單個線程(進程)拿多次,每拿一次做一次標記,釋放一次減去一個標記,標記為0時,才能被其他線程(進程)拿

import time
from threading import Thread
from threading import RLock

r_lock = RLock()

class DClass(Thread):
    def __init__(self, name):
        super().__init__()
        self.name = name

    def run(self):
        if self.name == "lynn":
            self.a_func()
        if self.name == "fancy":
            self.b_func()

    def a_func(self):
        r_lock.acquire()
        print("拿到a鎖")
        r_lock.acquire()
        print("拿到b鎖")
        r_lock.release()
        print("釋放a鎖")
        time.sleep(1)
        r_lock.release()
        print("釋放b鎖")

    def b_func(self):
        r_lock.acquire()
        time.sleep(1)
        print("拿到c鎖")
        r_lock.acquire()
        print("拿到c鎖")
        r_lock.release()
        print("釋放c鎖")
        r_lock.release()
        print("釋放c鎖")


dc1 = DClass("lynn")
dc2 = DClass("fancy")
dc1.start()
dc2.start()

注意:

​ 只有被該線程(進程)全部釋放才能被別的線程拿

線程間的通信

線程時相互獨立的,數據是隔離的

  • Queue

管道:生產者消費者模型

from queue import Queue
from threading import Thread
import time


class SClass(Thread):
    def __init__(self, Q, name):
        super().__init__()
        self.Q = Q
        self.name = name

    def run(self):
        for i in range(100):
            self.Q.put("{}的{}包子".format(self.name, i))

class XClass(Thread):
    def __init__(self, Q):
        super().__init__()
        self.Q = Q
    def run(self):
        while True:
            time.sleep(0.1)
            res = self.Q.get()
            print(res)
            if not res:
                break

if __name__ == '__main__':
    Q = Queue(10)
    st = SClass(Q, "lynn")
    xc = XClass(Q)
    st.start()
    xc.start()
    st.join()
    Q.put(None)

3 ThreadPoolExecutor

支持線程池和進程池,python3.2之后版本

from concurrent.futures import ThreadPoolExecutor
import time


def thread_func(a):
    time.sleep(2)
    print("a")


if __name__ == '__main__':
    with ThreadPoolExecutor(max_workers=5)as t:
        res = t.submit(thread_func, 1)
        # print(res.result()) # 會阻塞
        r1 = t.submit(thread_func)
        print("end")
        # time.sleep(2)
        print(res.done())
        print(res.result())
    print(res.done())

注意:

submit方法,不阻塞,是異步的,第一個參數是方法名,后邊按位置參數傳方法需要的參數

with會等所有的線程運行完畢,才繼續往下運行

done()查看線程的運行狀態,True為運行完畢

result()線程的返回值,會阻塞

  • wait

開啟多個線程

import time
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED


class TClass:
    @staticmethod
    def run():
        time.sleep(10)
        print("ok")


if __name__ == '__main__':
    with ThreadPoolExecutor(max_workers=5)as t:
        t_list = (t.submit(TClass.run), )
        wait(t_list, timeout=0.1, return_when=ALL_COMPLETED)
    print('end')

注意:

​ 第一個參數必須是可迭代對象,最好是元組,里邊的元素是submit()方法提交的數據

timeout超時時間,超過這個時間,該方法的阻塞時間,默認線程運行完才會繼續往下運行

  • as_completed
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


class TClass:
    @staticmethod
    def run(a, b):
        time.sleep(3)
        print("ok", a, b)
        return {"name": "lynn"}


if __name__ == '__main__':
    with ThreadPoolExecutor(max_workers=5)as t:
        t_list = [t.submit(TClass.run, 1, 2) for i in range(5)]
    res_list = as_completed(t_list) # 阻塞
    for i in res_list:
        print(i.result()) # 返回值

注意:

as_completed參數是可迭代對象:列表、元組等,元素是submit方法處理的線程

​ for循環能取到每個線程的結果

  • map
import time
from concurrent.futures import ThreadPoolExecutor


class TClass:
    @staticmethod
    def run(a, b):
        time.sleep(1)
        print('ok', a, b)
        return a, b


if __name__ == '__main__':
    with ThreadPoolExecutor(max_workers=5)as t:
        genera_res = t.map(TClass.run, (1,3), (2,3)) # 結果是生成器
        for i in genera_res:
            print(i) # 線程的返回值

注意:

map參數直接是方法,不用submit

​ 參數以元組方式傳遞,多個參數多個元組,一個元組中多個參數表示調用多次


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM