Python 多線程和線程池


一,前言

  • 進程:是程序,資源集合,進程控制塊組成,是最小的資源單位
    • 特點:就對Python而言,可以實現真正的並行效果
    • 缺點:進程切換很容易消耗cpu資源,進程之間的通信相對線程來說比較麻煩  
  • 線程:是進程中最小的執行單位。
    • 特點無法利用多核,無法實現真正意義上是並行效果。
    • 優點:對於IO密集型的操作可以很好利用IO阻塞的時間

二,GIL(全局解釋器鎖)

  python目前有很多解釋器,目前使用最廣泛的是CPython,還有PYPY和JPython等解釋器,但是使用最廣泛的還是CPython解釋器,而對於全局解釋器鎖來說,就是在CPython上面才有的,它的原理是在解釋器層面加上一把大鎖,保證同一時刻只能有一個python線程在解釋器中執行。

  對於計算密集型的python多線程來說,無法利用到多線程帶來的效果, 在2.7時計算密集型的python多線程執行效率比順序執行的效率還低的多,在python3.5中對這種情況進行了優化,基本能實現這種多線程執行時間和順序執行時間差不多的效果。

  對於I/O密集型的python多線程來說,GIL的影響不是很大,因為I/O密集型的python多線程進程,每個線程在等待I/O的時候,將會釋放GIL資源,供別的線程來搶占。所以對於I/O密集型的python多線程進程來說,還是能比順序執行的效率要高的。

  python的GIL這個東西。。。比較惡心,但是由於CPython解釋器中的很多東西都是依賴這個東西開發的,如果改的話,將是一件浩大的工程。。。所以到現在還是存在這個問題,這也是python最為別人所詬病的地方。。。

三,多線程

  多線程相當於一個並發(concunrrency)系統。並發系統一般同時執行多個任務。如果多個任務可以共享資源,特別是同時

寫入某個變量的時候,就需要解決同步的問題,比如多線程火車售票系統:兩個指令,一個指令檢查票是否賣完,另一個指令,多

個窗口同時賣票,可能出現賣出不存在的票。

  3.1 python實現多線程

  python實現多線程有兩種方式,分別是直接調用和繼承調用,如下實例:

  直接調用:

import threading
import time


# 定義線程運行函數
def mv():
    print('播放=========')
    time.sleep(2)
    print('ending=======')


# 帶參數方式
def play(name):
    print("打游戲======"+name)
    time.sleep(3)
    print("ending======")


if __name__ == '__main__':
    th = threading.Thread(target=mv)
    th2 = threading.Thread(target=play, args=("LOL",))
    th.start()
    th2.start()

  繼承調用:

import threading
import time


class MyThread(threading.Thread):
    def __init__(self, num):
        super(MyThread, self).__init__()
        self.num = num

    def run(self):
        print("play game %s======" % self.num)
        time.sleep(2)
        print("end play")


if __name__ == '__main__':
    mt = MyThread(1)
    mt2 = MyThread(2)
    mt.start()
    mt2.start()  

  可以看到直接調用是導入threading模塊並定義一個函數,之后實例化threading.Thread類的時候,將剛定義的函數名通過target參數傳遞進去,然后調用實例的start()方法啟動一個線程。

  而繼承式調用是創建一個類繼承自threading.Thread類,並在構造方法中調用父類的構造方法,之后重寫run方法,run方法中就是每個線程起來之后執行的內容,就類似於前面通過target參數傳遞進去的函數。之后以這個繼承的類來創建對象,並執行對象的start()方法啟動一個線程。

  從這里可以看出,其實。。。直接調用通過使用target參數把函數帶進類里面之后應該是用這個函數替代了run方法。

  3.2 線程阻塞和守護線程(join和setDaemon)

  join()方法在該線程對象啟動了之后調用線程的join()方法之后,那么主線程將會阻塞在當前位置直到子線程執行完成才繼續往下走,如果所有子線程對象都調用了join()方法,那么主線程將會在等待所有子線程都執行完之后再往下執行。

  setDaemon(True)方法在子線程對象調用start()方法(啟動該線程)之前就調用的話,將會將該子線程設置成守護模式啟動,這是什么意思呢?當子線程還在運行的時候,父線程已經執行完了,如果這個子線程設置是以守護模式啟動的,那么隨着主線程執行完成退出時,子線程立馬也退出,如果沒有設置守護啟動子線程(也就是正常情況下)的話,主線程執行完成之后,進程會等待所有子線程執行完成之后才退出。

  join示例:

import threading
import time


# 定義線程運行函數
def mv():
    print('播放=========')
    time.sleep(2)
    print('ending=======')


# 帶參數方式
def play(name):
    print("打游戲======"+name)
    time.sleep(3)
    print("ending======")


games = ["LOL", "DNF", "PUBG"]

if __name__ == '__main__':
    th = threading.Thread(target=mv)
    th.start()
    # th.join()  # 帶上該代碼,程序(也就是主線程)不會馬上往下執行,回顯執行完th線程
    for i in [threading.Thread(target=play, args=(i, )) for i in games]:
        i.start()

  setDaemon示例:

import threading
import time


# 定義線程運行函數
def mv():
    print('播放=========')
    time.sleep(5)
    print('ending=======')


if __name__ == '__main__':
    th = threading.Thread(target=mv)
    th.setDaemon(True) # 若有改行代碼,程序執行直接退出,不打印出 ending=====,因為主線程執行完畢了直接退出。
    th.start()

  3.3 線程鎖

  互斥鎖的產生是因為前面提到過多線程之間是共享同一塊內存地址的,也就是說多個不同的線程能夠訪問同一個變量中的數據,那么,當多個線程要修改這個變量,會產生什么情況呢?當多個線程修改同一個數據的時候,如果操作的時間夠短的話,能得到我們想要的結果,但是,如果修改數據不是原子性的(這中間的時間太長)的話。。。很有可能造成數據的錯誤覆蓋,從而得到我們不想要的結果。例子如下:

import threading
import time

count = 0


def add_num():
    global count
    tmp = count
    time.sleep(0.001) # 若沒有改代碼,輸出100,若有改代碼輸出是10,11,9等不確定是數
    count = tmp + 1


def run(add_fun):
    global count
    thread_list = []
    for i in range(100):
        t = threading.Thread(target=add_fun)
        t.start()
        thread_list.append(t)
    for j in thread_list:
        j.join()

    print(count)


if __name__ == '__main__':
    run(add_num)

  出現上述情況的原因:是因為,盡管count+=1是非原子操作,但是因為CPU執行的太快了,比較難以復現出多進程的非原子操作導致的進程不安全。經過代替之后,盡管只sleep了0.001秒,但是對於CPU的時間來說是非常長的,會導致這個代碼塊執行到一半,GIL鎖就釋放了。即tmp已經獲取到count的值了,但是還沒有將tmp + 1賦值給count。而此時其他線程如果執行完了count = tmp + 1, 當返回到原來的線程執行時,盡管count的值已經更新了,但是count = tmp + 1是個賦值操作,賦值的結果跟count的更新的值是一樣的。最終導致了我們累加的值有很多丟失。

  互斥鎖

   針對上面的情況,我們就需要引入互斥鎖的這一概念。某個線程要更改共享數據時,先將其鎖定,此時資源的狀態為“鎖定”,其他線程不能更改;直到該線程釋放資源,將資源的狀態變成“非鎖定”,其他的線程才能再次鎖定該資源。互斥鎖保證了每次只有一個線程進行寫入操作,從而保證了多線程情況下數據的正確性。 

import threading
import time

count = 0


def add_num():
    global count
    if lock.acquire():  # 獲得鎖,並返回True
        tmp = count
        time.sleep(0.001)
        count = tmp + 1
        lock.release()  # 執行完釋放鎖


def run(add_fun):
    global count
    thread_list = []
    for i in range(100):
        t = threading.Thread(target=add_fun)
        t.start()
        thread_list.append(t)
    for j in thread_list:
        j.join()

    print(count)


if __name__ == '__main__':
    lock = threading.Lock()
    run(add_num)

  另一種獲得鎖的方式(with):

import threading
import time

count = 0


def add_num():
    global count
    with lock:  # 獲得鎖,並返回True
        tmp = count
        time.sleep(0.001)
        count = tmp + 1


def run(add_fun):
    global count
    thread_list = []
    for i in range(100):
        t = threading.Thread(target=add_fun)
        t.start()
        thread_list.append(t)
    for j in thread_list:
        j.join()

    print(count)


if __name__ == '__main__':
    lock = threading.Lock()
    run(add_num)

  迭代死鎖

import threading
import time


class MyThread(threading.Thread):
    def run(self):
        global num
        time.sleep(1)
        if mutex.acquire():  # 第一次獲得鎖
            num = num+1
            msg = self.name+' set num to '+str(num)
            print(msg)
        mutex.acquire()  # 在鎖未釋放的時候第二次獲得鎖,需要注意的是這里的鎖指的是同一個鎖對象mutex
        mutex.release()
        mutex.release()


num = 0
mutex = threading.Lock()


def test():
    for i in range(5):
        t = MyThread()
        t.start()


if __name__ == '__main__':
  test()  # 無輸出,一直阻塞,因為沒有釋放鎖,又想再次獲得鎖,該鎖已經被拿走了,是空的就會一直等待鎖釋放

  上述代碼中,無輸出,一直阻塞,因為沒有釋放鎖,又想再次獲得鎖,該鎖已經被拿走了,是空的就會一直等待鎖釋放使用的是同一個鎖對象muex,若創建一個新的鎖對象,就不會出現這些情況。

  相互調用鎖

import threading
import time


def fun1():
    print('=====')
    lock1.acquire()
    time.sleep(1)
    print('-------')
    lock2.acquire()  # lock2在另一線程使用,為釋放
    time.sleep(2)
    lock1.release()
    lock2.release()


def fun2():
    print("-------")
    lock2.acquire()
    print("locl2====")
    time.sleep(0.1)
    lock1.acquire()  # lock1在上一線程使用未釋放
    print("lock3=====")
    time.sleep(1)
    lock1.release()
    lock2.release()


lock1 = threading.Lock()
lock2 = threading.Lock()


def run():
    th =threading.Thread(target=fun1)
    th2 =threading.Thread(target=fun2)
    th.start()
    th2.start()


if __name__ == '__main__':
    run()

  像上面這種情況,鎖未釋放就獲取另一把鎖,而另一把鎖已經在使用,同時另一線程反過來調用第一把鎖,從而產生這種相互等待鎖的阻塞情況。

  死鎖解決

  為了支持在同一線程中多次請求同一資源,python提供了“可重入鎖”:threading.RLock。RLock內部維護着一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源可以被多次require。直到一個線程所有的acquire都被release,其他的線程才能獲得資源。這里以例1為例,如果使用RLock代替Lock,則不會發生死鎖:

import threading
import time
class MyThread(threading.Thread):
  def run(self):
    global num
    time.sleep(1)
    if mutex.acquire(1):
      num = num+1
      msg = self.name+' set num to '+str(num)
      print msg
      mutex.acquire()
      mutex.release()
      mutex.release()
num = 0
mutex = threading.RLock()
def test():
  for i in range(5):
    t = MyThread()
    t.start()
if __name__ == '__main__':
  test()

  3.5 信號量

  Semaphore管理一個內置的計數器

  Semaphore與進程池看起來類似,但是是完全不同的概念。

  進程池:Pool(4),最大只能產生四個進程,而且從頭到尾都只是這四個進程,不會產生新的。

  信號量:信號量是產生的一堆進程/線程,即產生了多個任務都去搶那一把鎖

from threading import Thread,Semaphore,currentThread
import time,random
sm = Semaphore(5) #運行的時候有5個人
def task():
    sm.acquire()
    print('\033[42m %s上廁所'%currentThread().getName())
    time.sleep(random.randint(1,3))
    print('\033[31m %s上完廁所走了'%currentThread().getName())
    sm.release()
if __name__ == '__main__':
    for i in range(20):  #開了10個線程 ,這20人都要上廁所
        t = Thread(target=task)
        t.start()

Semaphore舉例

四,線程池

  線程池在系統啟動時即創建大量空閑的線程,程序只要將一個函數提交給線程池,線程池就會啟動一個空閑的線程來執行它。當該函數執行結束后,該線程並不會死亡,而是再次返回到線程池中變成空閑狀態,等待執行下一個函數。
  此外,使用線程池可以有效地控制系統中並發線程的數量。當系統中包含有大量的並發線程時,會導致系統性能急劇下降,甚至導致 Python 解釋器崩潰,而線程池的最大線程數參數可以控制系統中並發線程的數量不超過此數。

  4.1 介紹和基本方法

官網:https://docs.python.org/dev/library/concurrent.futures.html

concurrent.futures模塊提供了高度封裝的異步調用接口
ThreadPoolExecutor:線程池,提供異步調用
Both implement the same interface, which is defined by the abstract Executor class.
1、submit(fn, *args, **kwargs)
異步提交任務

2、map(func, *iterables, timeout=None, chunksize=1) 
取代for循環submit的操作

3、shutdown(wait=True) 
相當於進程池的pool.close()+pool.join()操作
wait=True,等待池內所有任務執行完畢回收完資源后才繼續
wait=False,立即返回,並不會等待池內的任務執行完畢
但不管wait參數為何值,整個程序都會等到所有任務執行完畢
submit和map必須在shutdown之前

4、result(timeout=None)
取得結果

5、add_done_callback(fn)
回調函數

  4.2  線程池的使用

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

import os,time,random
def task(n):
    print('%s is runing' %os.getpid())
    time.sleep(random.randint(1,3))
    return n**2

if __name__ == '__main__':

    executor=ProcessPoolExecutor(max_workers=3)

    futures=[]
    for i in range(11):
        future=executor.submit(task,i)
        futures.append(future)
    executor.shutdown(True)
    print('+++>')
    for future in futures:
        print(future.result())

  4.3 異步調用和同步調用

  1、同步調用:提交完任務后,就在原地等待任務執行完畢,拿到結果,在執行下一行代碼,導致程序是串行

  2、異步調用:提交完任務后,不用原地等待任務執行完畢

  回調函數:可以為進程池或線程池內得每個進程或線程綁定一個函數,該函數在進程或線程的任務執行完畢后自動觸發,並接受任務的返回值當作參數,該函數成為回調函數。

#提交任務的兩種方式
#1、同步調用:提交完任務后,就在原地等待任務執行完畢,拿到結果,再執行下一行代碼,導致程序是串行執行
#
# from concurrent.futures import ThreadPoolExecutor
# import time
# import random
#
# def la(name):
#     print('%s is laing' %name)
#     time.sleep(random.randint(3,5))
#     res=random.randint(7,13)*'#'
#     return {'name':name,'res':res}
#
# def weigh(shit):
#     name=shit['name']
#     size=len(shit['res'])
#     print('%s 拉了 《%s》kg' %(name,size))
#
#
# if __name__ == '__main__':
#     pool=ThreadPoolExecutor(13)
#
#     shit1=pool.submit(la,'alex').result()
#     weigh(shit1)
#
#     shit2=pool.submit(la,'wupeiqi').result()
#     weigh(shit2)
#
#     shit3=pool.submit(la,'yuanhao').result()
#     weigh(shit3)


#2、異步調用:提交完任務后,不地等待任務執行完畢,

from concurrent.futures import ThreadPoolExecutor
import time
import random

def la(name):
    print('%s is laing' %name)
    time.sleep(random.randint(3,5))
    res=random.randint(7,13)*'#'
    return {'name':name,'res':res}


def weigh(shit):
    shit=shit.result()
    name=shit['name']
    size=len(shit['res'])
    print('%s 拉了 《%s》kg' %(name,size))


if __name__ == '__main__':
    pool=ThreadPoolExecutor(13)

    pool.submit(la,'alex').add_done_callback(weigh)

    pool.submit(la,'wupeiqi').add_done_callback(weigh)

    pool.submit(la,'yuanhao').add_done_callback(weigh)

五,總結

  對應IO密集型任務可以通過創建線程池來提高效率,而對於計算密集型則沒必要,計算密集型可以考慮分布式計算。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM