網絡編程進階:並發編程之多線程


多線程:

在傳統操作系統中,每個進程有一個地址空間,而且默認就有一個控制線程; 進程的作用就是隔離數據。

進程只是用來把資源集中到一起(進程只是一個資源單位,或者說資源集合),而線程才是CPU上的執行單位。(進程必須靠線程去執行)

線程就類似於一條流水線工作的過程;多線程(即多個控制線程)的概念是:在一個進程中存在多個線程,多個線程共享該進程的地址空間;

線程、進程區別: 

1. 同一個進程內的多個線程共享該進程內的地址資源;

2. 創建線程的開銷遠小於創建進程的開銷(創建一個進程需要向操作系統申請新的地址空間)

 

開啟線程的兩種方式(跟開啟進程相似):

方式一:

from threading import Thread
import time

def sayhi(name):
    print("hello %s"%name)
    time.sleep(1)
    print("bye %s"%name)

if __name__ == "__main__":
    t1 = Thread(target=sayhi,args=("neo",))
    t1.start()  # t1.start()也是給操作系統發信號,但是操作系統已經不需要開辟新的內存空間 print("主線程")   #  站在執行的角度看,這個py文件是主線程;站在資源的角度看,這個是主進程

# 運行結果:
# hello neo    # 先打印的是新線程中的內容,因為開辟新線程t1.start()只需要給操作系統發信號,不需要開辟新的內存空間,所以新線程內的任務可以立馬執行;所以說創建線程的開銷遠小於創建進程
# 主線程
# bye neo

 方式二:

from threading import Thread
import time

class MyThread(Thread):
    def __init__(self,name):
        super().__init__()   # Thread子類的__init__()中,必須先寫 super().__init__(),即先繼承父類的__init__()再定義自己的邏輯
        self.name = name

    """
    If the subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) 
    before doing anything else to the thread.
    """

    def run(self):
        print("hello %s"%self.name)
        time.sleep(1)
        print("bye %s"%self.name)

if __name__ == "__main__":
    t1 = MyThread("neo")
    t1.start()
    print("主線程")

# 運行結果:
# hello neo
# 主線程
# bye neo

 查看當前進程的pid也可以利用multiprocessing模塊下的 current_process().pid

 

Thread對象的其他屬性或方法:

from threading import Thread,current_thread,active_count,enumerate
import time

def task():
    print("%s is running"%current_thread().getName())  # current_thread()是返回當前的線程變量; # getName()是返回線程名
    time.sleep(1)
    print("%s is done"%current_thread().getName())

if __name__ == "__main__":
    t = Thread(target=task,name="子線程1")  # 線程之間的地位是平等的,沒有主、子之分; # name= 要是不寫,系統用默認名
    t.start()  # 給操作系統發信號,幾乎是瞬間開啟新的線程

    t.setName("兒子線程1")  # 更改線程名字;  # 這個例子中,task中的第一個print,里面的線程名還是 “子線程1”,time.sleep(1)之后,第二個print的線程名就變成了“兒子線程1”

    print(active_count())  # 返回當前活躍的線程數
    print(enumerate())  # 返回當前所有活躍的線程對象;列表的形式  # active_count() 就相當於 len(enumerate())

    t.join()   #  主線程卡在這一步,等待t這個線程執行結束
    print(t.is_alive())  # 查看線程是否存活;返回bool值

    # print(t.getName())  # 在“主線程”中也可以查看“子線程”名
    print("主線程名字",current_thread().getName())
    current_thread().setName("主線程名")

運行結果:

 

守護線程:無論是進程還是線程,都遵循:守護xx在主xx運行完畢后被銷毀

需要強調的是:運行完畢並非終止運行

  1. 對主進程來說,運行完畢指的是主進程代碼運行完畢;

  2. 對主線程來說,運行完畢指的是主線程所在的進程內所有非守護線程統統運行完畢,主線程才算運行完畢

詳細解釋:

  1. 主進程在其代碼結束后就已經算運行完畢了(守護進程就在此時被回收),然后主進程會一直等非守護的子進程都運行完畢后回收子進程的資源(否則會產生僵屍進程),才會結束;

  2. 主線程在其他非守護線程運行完畢后才算運行完畢(守護線程在此時就會被回收)。因為主線程的結束意味着進程的結束,進程整體的資源都會被回收,而進程必須保證非守護線程都運行完畢后才能結束

from threading import Thread
import time

def foo():
    print(123)
    time.sleep(1)
    print("end123")

def bar():
    print(456)
    time.sleep(2)
    print("end456")

if __name__ == "__main__":
    t1 = Thread(target=foo)
    t2 = Thread(target=bar)

    t1.daemon = True  # 把t1設置成守護線程; # 在start()之前設置
    t1.start()
    t2.start()

    print("main----")

# 運行結果:
# 123
# 456
# main----
# end123
# end456

線程互斥鎖:

from threading import Thread,Lock
import time

n = 100

def task():
    global n  # global n 不需要加鎖
    mutex.acquire()  # 加鎖就是為了局部串行; # 修改共享數據的部分需要加鎖
    temp = n
    time.sleep(0.1)  # 0.1秒足以讓“主線程”中的那100個“子線程”起來
    n = temp - 1
    mutex.release()  # 通過加鎖效率降低

if __name__ == "__main__":
    mutex = Lock()   # threading下面的Lock
    t_list = []
    for i in range(100):
        t = Thread(target=task)
        t_list.append(t)
        t.start()

    for t in t_list:
        t.join()

    print("",n)
    
# 運行結果:
# 主 0

 

GIL:global interpreter lock; 只有CPython才有GIL

GIL是加在CPython解釋器上的互斥鎖,本質也是一把互斥鎖,所有互斥鎖的本質都一樣,都是將並發運行變成串行,以此來控制同一時間內共享的數據只能被一個任務所修改,進而保證數據安全

保護不同的數據安全,就應該加不同的鎖 (GIL保護的是解釋器里關於垃圾回收線程的數據(解釋器級別的數據),保護不了你自己的共享數據;想要保護你自己的共享數據,需要你自己加互斥鎖)

要想了解GIL,需要確定一點: 每次執行python程序,都會產生一個獨立的進程

# 運行一個py文件,啟動的就是一個python解釋器的進程(python.exe的)

# 運行python文件需要經歷3步:
# 1. 先把python解釋器的內容加載到內存
# 2. 把py文件的代碼從硬盤加載到內存
# 3. 把py文件的內容交給Cpython解釋器去執行(py文件里面的代碼只是字符串,並不能運行;執行的是python解釋器里面C給你實現的功能)
# 注: 第3步可以這么理解: 把python解釋器當成一個函數,而你寫的py文件代碼是python解釋器這個函數的參數,然后把py文件的代碼當做參數傳給python解釋器這個函數
# 所以你運行一個py文件,產生的是一個python解釋器的進程

 

GIL與多線程:

有了GIL的存在,同一時刻同一進程中只有一個線程被執行;所以多線程無法利用多核優勢

多進程還是多線程?首先要明白:

  1. CPU不是用來做IO的,而是用來做計算的;

  2. 多CPU,意味着可以有多個核並行完成計算,所以多核提升的是計算性能

  3. 每個CPU一旦遇到I/O阻塞,仍然需要等待,所以多核對I/O操作沒什么用

結論:

  1. 對計算來說,CPU越多越好,但是對於I/O來說,再多的CPU也沒用;

  2. 對運行一個程序來說,隨着CPU的增多執行效率肯定會有所提高(不管提升幅度多大,總會有提高),這是因為一個程序基本不會是純計算或者純I/O,所以我們只能相對的去看一個程序到底是計算密集還是I/O密集型,從而得出用多線程還是多進程

現在的計算機基本上都是多核,python對於計算密集型的任務開多線程的效率並不能帶來多大性能上的提升,甚至不如串行(沒有大量切換),但是,對於IO密集型的任務效率會有顯著提升

 

多線程性能測試:

如果並發的多個任務是計算密集型:多進程效率高

# 計算密集型:多進程效率高

from multiprocessing import Process
from threading import Thread
import time,os

def task():
    res = 0
    for i in range(100000000):
        res *= i

if __name__ == "__main__":
    l = []
    print(os.cpu_count())  # 本機4核
    start = time.time()
    for i in range(4):
        p = Process(target=task)  # 耗時 16.18108034133911秒
        # p = Thread(target=task)    # 耗時 26.78442931175232秒
        l.append(p)
        p.start()

    for i in l:
        i.join()

    stop = time.time()

    print("run time is %s"%(stop-start))

如果並發的多個任務是I/O密集型:多線程效率高

from multiprocessing import Process
from threading import Thread
import threading
import os,time
def work():
    time.sleep(2)
    # print('===>')

if __name__ == '__main__':
    l=[]
    print(os.cpu_count()) #本機為4核
    start=time.time()
    for i in range(400):
        p=Process(target=work) #耗時32s多,大部分時間耗費在創建進程上
        # p=Thread(target=work) #耗時2.0602962970733643 秒
        l.append(p)
        p.start()
    for p in l:
        p.join()
    stop=time.time()
    print('run time is %s' %(stop-start))

應用:多線程用於IO密集型,如socket,爬蟲,web等; 多進程用於計算密集型(多進程能用到多核優勢),如金融分析

 

死鎖與遞歸鎖:

死鎖:指兩個或兩個以上的進程或線程在執行過程中,因爭奪資源而造成的一種互相等待的現象,若無外力作用,他們都將無法推進下去。此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的進程稱為死鎖進程;如下所示:

from threading import Thread,Lock
import time

mutexA = Lock()
mutexB = Lock()

class MyThread(Thread):
    def run(self):
        self.func1()
        self.func2()

    def func1(self):
        mutexA.acquire()
        print("%s 拿到A鎖了"%self.name)  # Thread對象下面自帶有 name這個屬性

        mutexB.acquire()
        print("%s 拿到B鎖了" % self.name)

        mutexB.release()
        mutexA.release()

    def func2(self):
        mutexB.acquire()
        print("%s 拿到B鎖了" % self.name)
        time.sleep(1)

        mutexA.acquire()
        print("%s 拿到A鎖了" % self.name)
        mutexA.release()

        mutexB.release()

if __name__ == "__main__":
    for i in range(5):
        t = MyThread()
        t.start()

# 運行結果:
# Thread-1 拿到A鎖了
# Thread-1 拿到B鎖了
# Thread-1 拿到B鎖了
# Thread-2 拿到A鎖了   # 出現死鎖,整個程序阻塞住

 

遞歸鎖:

解決方法,遞歸鎖:在python中為了支持在同一線程中多次請求同一資源,python提供了可重入鎖RLock;

這個RLock內部維護着一個Lock和一個counter(計數器)變量,counter記錄了acquire的次數,從而使得資源可以被多次acquire;知道一個線程中所有的acquire都被release,其他的線程才能獲得資源;上面的例子如果使用RLock代替Lock,則不會發生死鎖,二者的區別是:遞歸鎖可以連續acquire多次,而互斥鎖只能acquire一次

from threading import Thread,RLock
import time

mutexA = mutexB = RLock()  # 一個線程拿到鎖,counter +1,該線程內又碰到加鎖的情況,則counter持續 +1;這期間所有其他線程都只能等待,等待該線程釋放所有鎖,即counter遞減到0為止

class MyThread(Thread):
    def run(self):
        self.func1()
        self.func2()

    def func1(self):
        mutexA.acquire()
        print("%s 拿到A鎖了"%self.name)

        mutexB.acquire()
        print("%s 拿到B鎖了" % self.name)

        mutexB.release()
        mutexA.release()

    def func2(self):
        mutexB.acquire()
        print("%s 拿到B鎖了" % self.name)
        time.sleep(1)

        mutexA.acquire()
        print("%s 拿到A鎖了" % self.name)
        mutexA.release()

        mutexB.release()

if __name__ == "__main__":
    for i in range(5):
        t = MyThread()
        t.start()

 

信號量:

信號量也是一種鎖,可以指定信號量(例如5),對比互斥鎖同一時間只能有一個任務搶到鎖去執行,信號量同一時間可以有5個任務拿到鎖去執行

from threading import Thread,Semaphore,current_thread
import time

def task():
    with sm:
        print("%s get sm"%current_thread().getName())
        time.sleep(1)

    """
    with sm: 和 with open(file)的用法是一樣的:
    在with sm會先自動 sm.acquire(), with sm內的代碼執行完后會自動 sm.release()
    """

if __name__ == "__main__":
    sm = Semaphore(3)   # 3是指信號量的最大計數量(最多有幾把鎖),即同一時間最有能有幾個線程搶到鎖去執行
    for i in range(10):
        t = Thread(target=task)
        t.start()

"""
Semaphore管理一個內置的計數器,
每當調用acquire()時內置計數器 -1; 調用 release()時內置計數器 +1;
計數器不能小於0,當計數器為0時, acquire()將阻塞線程直到其他線程調用 release()
"""

 

Event事件:

線程的一個關鍵特性是每個線程都是獨立運行且狀態不可預測。如果程序中的其他線程需要通過判斷某個線程的狀態來確定自己下一步的操作,我們就需要使用threading庫中的Event對象;對象包含一個可由線程設置的信號標志,它允許線程等待某些事件的發生。在初始狀況下,Event對象中的信號被設置為False;如果有線程等待一個Event對象,而這個Event對象的標志為False,那么這個線程將會被一直阻塞到該標志為True。一個線程如果將一個Event對象的信號標志設置為True,它將喚醒所有等待這個Event對象的線程。如果一個線程等待一個已經被設置為True的Event對象那么它將忽略這個事件,繼續執行

from threading import Event
event.is_set()  # 返回event的狀態值
event.wait()  # 如果 event.is_set() == False將阻塞線程  # wait(number)是設置最大等待時長(秒)
event.set()  # 設置event的狀態值為True,所有阻塞池的線程激活進入就緒狀態,等待操作系統調度
event.clear()  # 恢復event的狀態值為False

 

from threading import Thread,Event,current_thread
import time

event = Event()  # 實例化一個Event對象

def conn():
    n = 0
    while not event.is_set():   # 這行代碼的含義是:event還沒有被設置成True,即 還沒有經歷 event.set()這一步 # .is_set()也該寫成 .isSet()
        if n == 6:
            print("%s tried to connect too many times"%current_thread().getName())
            return
        print("%s is trying to connect %s times"%(current_thread().getName(),n))
        event.wait(0.5)  # 最多等0.5s
        n += 1
    print("%s is connected"%current_thread().getName())

def check():
    print("%s is checking"%current_thread().getName())
    time.sleep(5)
    event.set()  # 這一步將event設置成了True

if __name__ == "__main__":
    for i in range(3):
        t = Thread(target=conn)
        t.start()
    t = Thread(target=check)
    t.start()

 

定時器(Timer):隔一段時間后執行某任務

from threading import Thread,Timer

def task(name):
    print("hello %s"%name)

timer = Timer(3,task,args=("neo",))  # Timer第一個參數interval傳入時間間隔(單位:秒),意為幾秒之后去執行后面的任務;第二個是函數名;后面傳入函數需要傳入的參數,兩種方式:args和kwargs,args是原則的形式
timer.start()  # 啟動這個定時器

"""
Timer是Thread的子類,Timer實例化后得到是一個線程對象
所以,timer.start()就是開啟了一個新的線程
"""

利用定時器Timer制作一個定時有效的驗證碼 :

from threading import Timer
import random

class Code:

    def __init__(self):
        self.code_cache()   # 實例化的時候就先生成一個驗證碼存放到cache里面

    def code_cache(self):
        self.cache = self.make_code()   # 生成的驗證碼放到緩存里面
        print(self.cache)
        self.timer = Timer(10,self.code_cache)   # 每隔10秒就重新生成一個線程去調用code_cache()方法
        self.timer.start()

    def make_code(self,n=4):
        res = ""
        for i in range(n):
            s1 = str(random.randint(0,9))
            s2 = chr(random.randint(65,90))  # 65-90是A-Z在ASCII碼中對應的位置
            res += random.choice([s1,s2])   # 從s1(數字)和s2(A-Z)中隨機選出一個添加到 res 里面
        return res

    def check(self):
        while True:
            code_ipt = input("輸入您的驗證碼:").strip()   # 如果輸入的驗證碼不正確,則每隔10就生成一個新的驗證碼
            if code_ipt.upper() == self.cache:
                print("驗證碼正確")
                self.timer.cancel()   # 關閉定時器
                return

code = Code()
code.check()

 

線程queue:

 queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.

有三種不同的用法:

class queue.Queue(maxsize=0) # 隊列:先進先出

import queue

q=queue.Queue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())

'''
結果(先進先出):
first
second
third
'''

 class queue.LifoQueue(maxsize=0)  # 堆棧:last in first out

import queue

q=queue.LifoQueue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())

'''
結果(后進先出):
third
second
first
'''

 

class queue.PriorityQueue(maxsize=0)  # 優先級隊列: 存儲數據時可設置優先級的隊列

import queue

q=queue.PriorityQueue()
#put進入一個元組,元組的第一個元素是優先級(通常是數字,也可以是非數字之間的比較),數字越小優先級越高
q.put((20,'a'))
q.put((10,'b'))
q.put((30,'c'))

print(q.get())
print(q.get())
print(q.get())

'''
結果(數字越小優先級越高,優先級高的優先出隊):
(10, 'b')
(20, 'a')
(30, 'c')
'''

 

基於多線程的套接字通信:

服務端:

from socket import *
from threading import Thread

def comm(conn):
    while True:
        try:
            data = conn.recv(1024)
       if not data: break conn.send(data.upper())
except ConnectionResetError: conn.close() break def server(ip,port): server = socket(AF_INET,SOCK_STREAM) server.bind((ip,port)) server.listen(5) while True: conn,client_addr = server.accept() t = Thread(target=comm,args=(conn,)) t.start() server.close() if __name__ == "__main__": server("127.0.0.1",8080)

"""
主線程也是一個線程,所以可以讓主線程去做建鏈接的任務,開啟新的“子線程”去做通信的任務
"""

 

線程池進程池:(池的目的是對數目加以限制)

基於多進程或多線程能實現並發的套接字通信,然而這種實現方式的致命缺陷是:服務端開啟的進程數或線程數都會隨着並發的客戶端數目增多而增多,這會對服務端主機帶來巨大的壓力甚至癱瘓,所以我們必須要對服務端開啟的進程數或線程數加以控制,讓機器在一個自己可以承受的范圍內運行,這就是進程池或線程池的用途,例如進程池就是用來存放進程的池子,本質還是基於多進程,只不過是對開啟進程的數目加上了限制

concurrent.futures模塊提供了高度封裝的異步調用接口
ThreadPoolExecutor # 線程池,提供異步調用
ProcessPoolExecutor  # 進程池,提供異步調用
# 進程池:

from
concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor import os,time,random def task(name): print("name:%s pid:%s run"%(name,os.getpid())) time.sleep(random.randint(1,3)) if __name__ == "__main__": pool = ProcessPoolExecutor(4) # 4表示池子里面最多能放的進程數;如果不寫,默認是CPU數 for i in range(10): pool.submit(task,"neo%s"%i) # pool.submit(func,args,kwargs) 第一個參數寫函數名,后面寫函數需要傳入的參數;# pool.submit()是異步提交任務 # 異步提交任務:提交完任務后立馬就走,不需要管任務有沒有起來,也不需要拿結果 print("") """ 進程池 pool = ProcessPoolExecutor(4) 從始至終最多只會有4個進程;如下圖運行結果的pid所示 """

線程池:

from concurrent.futures import ThreadPoolExecutor
from threading import current_thread
import os,time,random

def task():
    print("thread name:%s pid:%s run"%(current_thread().getName(),os.getpid()))
    time.sleep(random.randint(1,3))

if __name__ == "__main__":
    pool = ThreadPoolExecutor(5)
    for i in range(10):
        pool.submit(task)

    pool.shutdown()  # shutdown()的作用是:相當於pool.close() + pool.join(),即關閉線程池的入口,並且等待池內所有線程執行完畢; # 默認 shutdown(wait=True)

    print("")

 提交任務的兩種方式:

  1. 同步調用:提交完任務后,就在原地等待任務執行完畢,拿到結果,在執行下一行代碼(會導致程序變成串行執行)

  2. 異步調用:提交完任務后,不會等待任務執行完畢

回調函數: 可以為進程池或線程池內的每個進程或線程綁定一個函數,該函數在進程或線程的任務執行完畢后自動觸發,並接收任務執行完后的對象當做參數傳入,該函數稱為回調函數

from concurrent.futures import ThreadPoolExecutor
import time,random

def task1(name):
    print("%s is running"%name)
    time.sleep(random.randint(2,3))
    length = random.randint(5,12)*"#"
    return {"name":name,"length":length}

def task2(obj):
    value_of_task1 = obj.result()  # obj是pool.submit(task1,"neo")執行完后的一個對象,需要利用 result()方法取出對象中的值,即 task1 return出來的值
    name = value_of_task1["name"]
    length = len(value_of_task1["length"])
    print("NAME:%s  LENGTH:%s"%(name,length))

if __name__ == "__main__":
    pool = ThreadPoolExecutor(13)

    pool.submit(task1,"neo").add_done_callback(task2)  # 這就是異步調用函數; # pool.submit(task1,"neo") 異步調用,執行完畢后會自動調用后面的task2函數,task2這個函數有且只能有一個參數,在給task2傳參數時,會把pool.submit(task1,"neo")當成一個對象傳入task2的那個參數中,所以task2中需要利用 result()把這個對象中的值取出來

    pool.submit(task1,"alex").add_done_callback(task2)
    pool.submit(task1,"egon").add_done_callback(task2)

"""
同步調用的方法:
如在 __main__ 中
res = pool.submit(task1,"neo").result()   # 同步調用; 此時提交完任務后程序就在這一步原地等待,直到新開的線程task1執行完畢后再利用 result()得到了task1中的返回值,程序才繼續往下執行
"""

 

模擬爬去數據:

# requests模塊可模擬瀏覽器的行為
# requests.get(url) # 到目標地址去下載一個文件到本地
# requests.get(url).text # 目標網頁的內容


# 模擬爬蟲:異步調用+回調函數的應用
import requests,time
from concurrent.futures import ThreadPoolExecutor

def get(url):
    print("getting %s"%url)
    response = requests.get(url)
    time.sleep(3)  # 模擬網絡IO
    return {"url":url,"content":response.text}  # response.text是所下載網頁中的內容

def parse(res):
    res = res.result()  # 取得 pool.submit(get,url)的結果
    url = res["url"]
    length = len(res["content"])
    print("parse %s res is %s"%(url,length))

if __name__ == "__main__":
    urls = [
        "https://docs.python.org/3/library/",
        "https://www.python.org/",
        "https://pypi.python.org/pypi"
    ]
    # 應該采用異步調用,保證爬取任務能並發執行
    # 爬去任務是IO密集型,因為get任務會遇到網絡IO(下載網頁等),所以應該采用多線程
    # 線程也不能無限增加,需要對線程數量做一個設置,所以應該采用 concurrent.futures 的 ThreadPoolExecutor

    pool = ThreadPoolExecutor(2)  # 提交了3個任務,但線程池最多能容納2個,所以第3個任務會等池子中的某個任務執行完畢后再去執行
    for url in urls:
        pool.submit(get,url).add_done_callback(parse)

線程池的套接字通信:

from socket import *
from concurrent.futures import ThreadPoolExecutor

def comm(conn):
    conn = conn
    while True:
        try:
            data = conn.recv(1024)
            if not data: break
            conn.send(data.upper())
        except ConnectionResetError:
            conn.close()
            break

def server(ip,port):
    server = socket(AF_INET,SOCK_STREAM)
    server.bind((ip,port))
    server.listen(5)

    while True:
        conn,client_addr = server.accept()
        pool.submit(comm,conn)  # 直接把comm放入線程池中就行,不需要回調函數

    server.close()

if __name__ == "__main__":
    pool = ThreadPoolExecutor(2)
    server("127.0.0.1",8081)  # 主線程還是用來干建鏈接的活

# 運行結果:
# 通過線程池控制了線程數,同一時間最多只能有兩個服務端通信

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM