python並發編程之多線程


1、線程理論知識

   概念:指的是一條流水線的工作過程的總稱,是一個抽象的概念,是CPU基本執行單位。

  進程和線程之間的區別:

    1. 進程僅僅是一個資源單位,其中包含程序運行所需的資源,而線程就相當於車間的流水線,負責執行具代碼。

    2. 每個進程至少包含一個線程,由操作系統自動創建,稱之為主線程

    3. 每個進程可以有任意數量的線程

    4.創建進程的開銷要比創建進程小得多

    5. 同一進程的線程間數據是共享的

    6.線程之間是平等的,沒有子父級關系,同一進程下的各線程的PID相同

    7. 創建線程的代碼可以寫在任意位置,不一定非要在main函數下。

  為什么使用線程:

    提高程序執行效率

2、開啟線程的兩種方式

  和進程類似,但是開啟方式不一定非要建在main函數下。

# 第一種方式,實例化 Thread
# from threading import Thread
#
# def task():
#     print("subthread is running....")
#
# t = Thread(target=task)
# t.start()
# print('main is over....')

# 第二種方式,繼承Thread類

from threading import Thread

class MyThread(Thread):
    def run(self):
        print("subthread is running....")
兩種方式

3、主線程和子線程之間的關系

  1. 主線程任務執行完畢后,主線程會等待所有子線程全部執行完畢后結束

  2. 在同一進程中,所有線程都是平等的,沒有子父級關系

# 驗證主線程代碼執行完后會不會立即結束,
import random
import time
import threading
from threading import Thread
def task(name):
    print("%s is running..." % name)
    time.sleep(random.randint(1, 3))
    print(threading.enumerate())
    print("%s is over....." % name)


t = Thread(target=task, args=('aaa',))
t.start()

print('main over....')
# 驗證主線程代碼執行完后會不會立即結束,

4、驗證線程和進程之間的區別

from threading import Thread
import time

def task():
    global num
    time.sleep(1)
    num -= 1
num = 10
t = Thread(target=task,)
t.start()
t.join()
print(num)
同一進程中線程的數據是可以共享的
from multiprocessing import Process
from threading import Thread
import time

def task():
    pass

def expense(cls):
    """用來測試線程或進程創建開銷"""
    lis = []
    start = time.time()
    for i in range(50):
        p = cls(target=task,)
        p.start()
        lis.append(p)
    for p in lis:
        p.join()
    return time.time()-start
創建線程的開銷要比創建進程小的多

5、線程的安全問題

 1.互斥鎖

  數據共享必然會造成競爭,競爭就會造成數據錯亂問題。

  解決辦法:和進程一樣,加互斥鎖。

from threading import Thread, Lock
import time

num = 10

def task(lock):
    global num
    lock.acquire()
    a = num
    time.sleep(0.5)
    num = a-1
    lock.release()

ts = []
lock = Lock()
for i in range(10):
    t = Thread(target=task,args=(lock,))
    t.start()
    ts.append(t)
for t in ts:
    t.join()
print(num)
加互斥鎖,保證數據安全

  2.死鎖

  死鎖不是一種鎖,而是一種鎖的狀態,

  一般出現死鎖的情況有兩種:

    1. 對同一把鎖多次acquire.(使用RLOCK鎖,代替LOCK)

    2. 兩個或兩個以上的進程或線程在執行過程中,因爭奪資源造成的相互等待現象。(解決辦法:能不加最好不加,要加就只加一把)

from threading import Thread, Lock
import time

def task1(name, locka, lockb):
    locka.acquire()
    print("%s拿到a鎖"%name)
    time.sleep(0.3)
    lockb.acquire()
    print('%s拿到b鎖'%name)
    lockb.release()
    locka.release()
def task2(name, locka, lockb):
    lockb.acquire()
    print("%s拿到b鎖"%name)
    time.sleep(0.3)
    locka.acquire()
    print('%s拿到a鎖'%name)
    locka.release()
    lockb.release()

locka = Lock()
lockb = Lock()
t1 = Thread(target=task1, args=('t1', locka, lockb))
t2 = Thread(target=task2, args=('t2', locka, lockb))
t1.start()
t2.start()
死鎖的第二種情況的示例

  3.可重入鎖

  只能解決同一線程多次執行acquire情況。

  只有一個線程所有的acquire都被釋放,其他線程才能拿到這個鎖。

  也會發生死鎖現象。

from threading import Thread, RLock

lock = RLock()
lock.acquire()
lock.acquire()
lock.acquire()
lock.acquire()

print("over")
lock = RLock()

def task1():
    lock.acquire()
    print('task1')
def task2():
    lock.acquire()
    print('task2')


Thread(target=task1).start()
Thread(target=task2).start()
示例

  4. 信號量

  也是一種鎖,用來控制同一時間,有多少線程可以提供並發訪問,不是用來處理線程安全問題

from threading import Semaphore, Thread
import time
s_lock = Semaphore(3)


def task():
    s_lock.acquire()
    time.sleep(1)
    print("run.....")
    s_lock.release()


for i in range(20):
    t = Thread(target=task)
    t.start()
示例

6、守護線程

  守護線程在所有非守護線程結束后結束。

import threading
from threading import Thread
import time

def task1():
    print('thread-1 is running...')
    time.sleep(3)
    print('thread-1 over....')

def task2():
    print('thread-2 is running...')
    time.sleep(1)
    print('thread-2 over....')

if __name__ == '__main__':
    t1 = Thread(target=task1,)
    t2 = Thread(target=task2,)
    t1.setDaemon(True)
    t1.start()
    t2.start()
    print(t1.ident)
    print(threading.enumerate())
    print("main over...")
示例

7、GIL

  全局解釋器鎖,是一互斥鎖,只有在Cpython解釋器存在。

  為什么需要:因為一個python.exe進行運行只有一份解釋器,如果這個進程開啟的多個線程都要執行代碼,多線程之間就要競爭解釋器,一旦競爭就有可能出現問題。

  帶來的好處:保證了多線程同時訪問解釋器時的數據安全問題。

  帶來的問題:同一時間只有一個線程訪問解釋器,使得多線程無法真正的並發

  出現的原因:默認情況下,一個進程只有一個線程不會是不會出問題,但不要忘了還有GC線程,一旦出現多個線程就可能出現問題,所以當初就簡單粗暴的加上了GIL鎖

  GIL加鎖和解鎖時機:

    加鎖:在調用解釋器時立即加鎖

    解鎖:當前線程遇到IO時釋放,或者當前線程執行超過設定值釋放(py2計算的是執行代碼的行數,py3中計算的是時間)

  解決辦法:使用多進程或使用其他的python解釋器

8、線程池和進程池

  一種容器,本質十一存儲線程或進程的列表

  為什么使用? 因為服務器不能無限開啟線程或進程,所以需要對線程數量加以控制,線程池就是幫我們完成線程/進程的創建、銷毀以及任務分配

  特點:

    線程池在創建時不會開啟線程,

    等到任務提交時,如果沒有空閑線程,並且已存在的線程數量小於最大值,開啟新線程,

    線程開啟后不會關閉,直到進程全部結束為止

  (線程池的建立也要建在main函數下

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
pool= ProcessPoolExecutor(maxsize),創建進程池,maxsize為最大進程個數

res = pool.submit(task, 'a'), 提交任務

res.result(timeout),接收調用的返回值,timeout為超時時間,超時報錯
該函數是阻塞函數,會一直等待任務執行完畢
pool.shutdown(wait),所有任務執行完畢,阻塞函數
wait=True, 等待池內所有任務執行完畢后回收資源才繼續
wait=False,立即返回,並不會等待池內的任務執行完畢
方法和屬性
from concurrent.futures import ThreadPoolExecutor
import time

def task(num):
    time.sleep(0.5)
    print("%s is running....."%num)
    return num**2

pool = ThreadPoolExecutor()
ress = []
for i in range(10):
    res = pool.submit(task, i)
    ress.append(res)

pool.shutdown(wait=False)

for i in ress:
     print(i.result())

print('over')
示例

 9、同步異步阻塞非阻塞

  阻塞和非阻塞都是指程序的運行狀態

    阻塞:當程序執行遇到IO操作,無法繼續執行代碼

    非阻塞:程序執行沒有遇到IO操作,或通過某種方式,使程序遇到了也不會停在原地,還可以繼續執行

  同步異步指的是提交任務的方式

    同步:發起任務后必須原地等待任務執行完成,才可以繼續執行

    異步:發起任務后不用等待任務執行,可以立即執行其他操作

    異步效率高於同步,發起異步任務方式:就是多線程和多進程

  同步和阻塞的不同:阻塞一定使CPU已經切換,同步雖然在等待,但CPU沒有切走,還在當前進程中執行其他任務

10、異步回調

   其實說的是回調函數,給異步任務綁定一個函數,當任務完成時會自動調用該函數。

  優點:不用原地等待,任務結果立即獲取

  線程池或進程池內內的調用回調函數方法add_done_back(), 且回調函數必須有且只有一個參數,就是調用對象本身。

  線程池的回調函數是在子線程內執行,

  進程池的回調函數是在主進程下執行

import requests
from concurrent.futures import ThreadPoolExecutor
import threading


def product_data(url):
    data = requests.get(url)
    return data.text,url


def parser_data(f):
    res = f.result()
    print(len(res[0]), res[1], "當前線程", threading.current_thread())




if __name__ == '__main__':
    urls = ['http://www.baidu.com','https://www.cnblogs.com/ywsun/', 'https://www.processon.com/']
    pool = ThreadPoolExecutor()
    for url in urls:
        f = pool.submit(product_data, url)
        f.add_done_callback(parser_data)
線程池異步調用
import requests
from concurrent.futures import  ProcessPoolExecutor
import os
def product_data(url):
    data = requests.get(url)
    return data.text,url


def parser_data(f):
    res = f.result()
    print(len(res[0]), res[1], ", callback pid", os.getpid() )




if __name__ == '__main__':
    urls = ['http://www.baidu.com','https://www.cnblogs.com/ywsun/', 'https://www.processon.com/']
    pool = ProcessPoolExecutor()
    print('main process', os.getpid())
    for url in urls:
        f = pool.submit(product_data, url)
        f.add_done_callback(parser_data)
進程池異步調用

11、線程隊列

  queue 該模塊下提供了一些常見的數據容器,僅僅是容器,沒有數據共享特點

  Queue,先進先出

  LifoQueue,后進先出

  PriorityQueue,可設置優先級的隊列。插入元組,第一個元素是優先級,可是數字、字母,對應的數值越小優先級越高

import queue


q=queue.PriorityQueue()
#put進入一個元組,元組的第一個元素是優先級(通常是數字,也可以是非數字之間的比較),數字越小優先級越高
q.put((2,'a'))
q.put((1,'b'))
q.put((3,'c'))


print(q.get())
print(q.get())
print(q.get())


# 如果是字符,按照ASCII表來排序
q.put(('a', "sfsja"))
q.put(('b', "sdfsdf"))
q.put(('A', "sdfsdf"))
q.put(('ae', "sdfsdf"))
q.put(('ab', "sdfsdf"))
print(q.get())
print(q.get())
print(q.get())
print(q.get())
print(q.get())
PriorityQueue示例

12、事件

  用於線程間通訊,線程間本就是數據共享,也就是即使沒有事件,也沒有問題

  線程之間,執行流程是完全獨立的,一些時候可能需要知道另一個進程發生了什么,然后采取一些行動。
  方法:
    event.isSet(),返回event的狀態值;
    event.wait(), 如果event.isSet()==False將阻塞進程。
    event.set(), 設置event的狀態Ture,將所有阻塞池的線程激活,進入就緒狀態。等待操作系統調度。
    even.clear(), 恢復event的狀態位False。
from threading import Thread, Event
import time
import random
boot= Event()


def server():
    print('啟動服務器。。。。')
    time.sleep(random.randint(1,3))
    print('服務器運行。。。。。')
    boot.set()

def connect():
    print('開始嘗試連接')
    boot.wait()
    print('連接成功')

t1 = Thread(target=server)
t1.start()

t2 = Thread(target=connect)
t2.start()
View Code

 

 

 

 

 

 

 

 

 

    

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM