Python多進程、多線程和協程簡介


進程和線程

進程是一個執行中的程序。每個進程都擁有自己的地址空間、內存、數據棧以及其他用於跟蹤執行的輔助數據。在單核CPU系統中的多進程,內存中可以有許多程序,但在給定一個時刻只有一個程序在運行;就是說,可能這一秒在運行進程A,下一秒在運行進程B,雖然兩者都在內存中,都沒有真正同時運行。

線程從屬於進程,是程序的實際執行者。一個進程至少包含一個主線程,也可以有更多的子線程。Python可以運行多線程,但和單核CPU多進程一樣,在給定時刻只有一個線程會執行。

Python 提供了多個模塊來支持多線程編程,包括thread、threading 和Queue 模塊等。程序是可以使用thread 和threading 模塊來創建與管理線程;推薦用threading模塊,它更先進,有更好的線程支持。thread 模塊提供了基本的線程和鎖定支持,在Python3 中該模塊被重命名為_thread;threading 模塊提供了更高級別、功能更全面的線程管理。使用Queue 模塊,用戶可以創建一個隊列數據結構,用於在多線程之間進行共享。

 

多進程 

利用Process來創建子進程

可以使用multiprocessing模塊中的Process來創建子進程,該模塊還有更高級的封裝,例如批量啟動進程的進程池(Pool)、用於進程間通信的隊列(Queue)和管道(Pipe)等。

# -*- coding:utf-8 -*-
from multiprocessing import Process
from time import ctime, sleep
def loop(nloop, nsec):
    print("start loop", nloop, "at:", ctime())
    sleep(nsec)
    print("loop", nloop, "done at:", ctime())


if __name__=="__main__":
    p1 = Process(target=loop, args=(1, 4))
    p2 = Process(target=loop, args=(2, 3))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print("finished")

利用進程池

Pool

Pool是用於批量啟動進程的進程池,我們可以使用它來啟動多進程

# -*- coding:utf-8 -*-
from multiprocessing import Pool
from time import ctime, sleep
def loop(nloop, nsec):
    print("start loop", nloop, "at:", ctime())
    sleep(nsec)
    print("loop", nloop, "done at:", ctime())


if __name__=="__main__":
    pool = Pool(processes=3)
    for i, j in zip([1,2],[4,3]):
        # 維持執行的進程總數為processes,當一個進程執行完畢后會添加新的進程進去
        pool.apply_async(loop, args=(i, j))      
    pool.close()
    # 調用join之前,先調用close函數,否則會出錯。執行完close后不會有新的進程加入到pool,join函數等待所有子進程結束
    pool.join()  

ProcessPoolExecutor

從Python3.2開始,標准庫 concurrent.futures 模塊提供了ProcessPoolExecutor (進程池)供我們使用

# -*- coding:utf-8 -*-
from concurrent.futures import ProcessPoolExecutor
from time import ctime, sleep
def loop(nloop, nsec):
    print("start loop", nloop, "at:", ctime())
    sleep(nsec)
    print("loop", nloop, "done at:", ctime())

if __name__=="__main__":
    with ProcessPoolExecutor(max_workers=3) as executor:
        all_task = [executor.submit(loop, i, j) for i, j in zip([1,2],[4,3])]

多線程

利用Thread創建子線程

# -*- coding:utf-8 -*-
import threading
from time import sleep, ctime, time

def loop(nloop, nsec):
    print("start loop", nloop, "at:", ctime())
    sleep(nsec)
    print("loop", nloop, "done at:", ctime())

def main():
    threads = []
    for i, j in zip([1,2],[4,3]):
        t = threading.Thread(target=loop, args=(i, j))
        threads.append(t)
    # 線程開始執行
    for t in threads:
        t.start()
    # 等待所有線程執行完成
    for t in threads:
        t.join()

if __name__ == "__main__":
    start = time()
    main()    
    print("time: ", time()-start)

當所有線程都分配完成之后,通過調用每個線程的start()方法讓它們開始執行,而不是在這之前就會執行。join()方法將等待線程結束,或者在提供了超時時間的情況下,達到超時時間。相比於管理一組鎖(分配、獲取、釋放、檢查鎖狀態等)而言,使用join()方法要比等待鎖釋放的無限循環更加清晰(這也是這種鎖又稱為自旋鎖的原因)。

對於 join()方法而言,其另一個重要方面是其實它根本不需要調用。一旦線程啟動,它們就會一直執行,直到給定的函數完成后退出。如果主線程還有其他事情要去做,而不是等待這些線程完成(例如其他處理或者等待新的客戶端請求),就可以不調用join()。join()方法只有在你需要等待線程完成的時候才是有用的。

我們可以創建一個類繼承threading.Thead,讓這個類更加通用,而不只是針對loop()函數,如果我們有別的函數也可以用這個類來使用多線程。我們需要覆寫Thread的__init__()和run()方法,或者調用父類的__init__()然后覆寫run()方法。

Python官方文檔:https://docs.python.org/3/library/threading.html#thread-objects

# -*- coding:utf-8 -*-
import threading
from time import sleep, ctime
class MyThread(threading.Thread):
    def __init__(self, func, args, name=""):
        threading.Thread.__init__(self)
        self.name = name
        self.func = func
        self.args = args
    def run(self):
        self.func(*self.args)

def loop(nloop, nsec):
    print("start loop", nloop, "at:", ctime())
    sleep(nsec)
    print("loop", nloop, "done at:", ctime())

def main():
    print("starting at:", ctime())
    threads = []
    for i, j in zip([1,2],[4,3]):
        t = MyThread(loop, args=(i, j), name=loop.__name__)
        threads.append(t)
        
    # 線程開始執行
    for t in threads:
        t.start()

    # 等待所有線程執行完成
    for t in threads:
        t.join()
    
    print("all DONE at:", ctime())

if __name__ == "__main__":
    main()

利用線程池

ThreadPool

# -*- coding:utf-8 -*-
from multiprocessing.dummy import Pool as ThreadPool
from time import ctime, sleep
def loop(nloop, nsec):
    print("start loop", nloop, "at:", ctime())
    sleep(nsec)
    print("loop", nloop, "done at:", ctime())


if __name__=="__main__":
    pool = ThreadPool(processes=3)
    for i, j in zip([1,2],[4,3]):
        pool.apply_async(loop, args=(i, j))      
    pool.close()
    pool.join()  

ThreadPoolExecutor

從Python3.2開始,標准庫 concurrent.futures 模塊提供了ThreadPoolExecutor (線程池)供我們使用

# -*- coding:utf-8 -*-
from concurrent.futures import ThreadPoolExecutor
from time import ctime, sleep
def loop(nloop, nsec):
    print("start loop", nloop, "at:", ctime())
    sleep(nsec)
    print("loop", nloop, "done at:", ctime())

if __name__=="__main__":
    with ThreadPoolExecutor(max_workers=3) as executor:
        all_task = [executor.submit(loop, i, j) for i, j in zip([1,2],[4,3])]

協程

協程,英文Coroutines,是一種比線程更加輕量級的存在。正如一個進程可以擁有多個線程一樣,一個線程也可以擁有多個協程。最重要的是,協程不是被操作系統內核所管理,而完全是由程序所控制(也就是在用戶態執行)。

這樣帶來的好處就是性能得到了很大的提升,不會像線程切換那樣消耗資源。

 

引入

帶有yield的函數不再是普通函數,而是生成器。send可以把一個函數的結果傳給另外一個函數,以此實現單線程內程序之間的切換。

# -*- coding:utf-8 -*-
def consumer():
    r = ''
    while True:
        n = yield r
        if not n:
            return
        print('[CONSUMER] Consuming %s...' % n)
        r = '200 OK'

def produce(c):
    c.send(None) # 等價於next(c)
    n = 0
    while n < 5:
        n = n + 1
        print('[PRODUCER] Producing %s...' % n)
        r = c.send(n)
        print('[PRODUCER] Consumer return: %s' % r)
    c.close()

c = consumer()
produce(c)

這里,produce先用 c.send(None) 啟動生成器,consumer() 執行到 yield r 便停下來,並將r返回 給調用它的函數(比如next()或send());這時候consumer()被掛起,produce繼續執行,當運行到 r=c.send(n) 時又讓consumer()執行 ;此時consumer()將r賦值給n,並繼續往下運行,執行print()函數,並將 '200 OK' 賦值給 r ;之后進入下一個while循環,又到了 yield r ,這時就跟前面一樣了,停下了將r返回給調用它的函數,這時produce()里 r=c.send(n);不斷重復上面,直到循環結束,c.close()關閉生成器。

asyncio

asyncio是Python 3.4版本引入的標准庫,直接內置了對異步IO的支持。

asyncio的編程模型就是一個消息循環。我們從asyncio模塊中直接獲取一個EventLoop的引用,然后把需要執行的協程扔到EventLoop中執行,就實現了異步IO。

# -*- coding:utf-8 -*-
import asyncio

@asyncio.coroutine
def hello():
    print("Hello world!")
    # 異步調用asyncio.sleep(1):
    r = yield from asyncio.sleep(1)
    print("Hello again!")

# 獲取EventLoop:
loop = asyncio.get_event_loop()
# 執行coroutine
loop.run_until_complete(hello())
loop.close()

@asyncio.coroutine把一個生成器標記為coroutine類型,然后,我們就把這個coroutine扔到EventLoop中執行。

yield from語法可以讓我們方便地調用另一個生成器。由於asyncio.sleep()也是一個coroutine,所以線程不會等待asyncio.sleep(),而是直接中斷並執行下一個消息循環。當asyncio.sleep()返回時,線程就可以從yield from拿到返回值(此處是None),然后接着執行下一行語句。

把asyncio.sleep(1)看成是一個耗時1秒的IO操作,在此期間,主線程並未等待,而是去執行EventLoop中其他可以執行的coroutine了,因此可以實現並發執行。

我們用asyncio的異步網絡連接來獲取sina、sohu和163的網站首頁:

import asyncio

@asyncio.coroutine
def wget(host):
    print('wget %s...' % host)
    connect = asyncio.open_connection(host, 80)
    reader, writer = yield from connect
    header = 'GET / HTTP/1.0\r\nHost: %s\r\n\r\n' % host
    writer.write(header.encode('utf-8'))
    yield from writer.drain()
    while True:
        line = yield from reader.readline()
        if line == b'\r\n':
            break
        print('%s header > %s' % (host, line.decode('utf-8').rstrip()))
    # Ignore the body, close the socket
    writer.close()

loop = asyncio.get_event_loop()
tasks = [wget(host) for host in ['www.sina.com.cn', 'www.sohu.com', 'www.163.com']]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

async/await

從Python 3.5開始引入了新的語法asyncawait,可以讓代碼更簡潔易讀。asyncio是用來編寫並發的代碼庫。 

# -*- coding:utf-8 -*-
import asyncio

async def slow_operation(n):
    await asyncio.sleep(1)
    print("Slow operation {} completed".format(n))

async def main():
    await asyncio.wait([
        slow_operation(1),
        slow_operation(2),
        slow_operation(3),
    ])

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

 再看一個例子,這里的aiohttp實現了HTTP客戶端和HTTP服務器的功能,對異步操作提供了非常好的支持,有興趣可以閱讀它的官方文檔

import asyncio
import aiohttp

async def download(url):
    print('Fetch:', url)
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            print(url, '--->', resp.status)
            print(url, '--->', resp.cookies)
            print('\n\n', await resp.text())

def main():
    loop = asyncio.get_event_loop()
    urls = [
        'https://www.baidu.com',
        'http://www.sohu.com/',
        'http://www.sina.com.cn/',
        'https://www.taobao.com/',
        'https://www.jd.com/'
    ]
    tasks = [download(url) for url in urls]
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()

if __name__ == '__main__':
    main()

 

參考資料

https://www.cnblogs.com/friendwrite/articles/10414273.html

https://www.jianshu.com/p/a69dec87e646

https://www.cnblogs.com/sui776265233/p/9325996.html

https://www.liaoxuefeng.com/wiki/1016959663602400/1017968846697824

《Python核心編程》

https://www.jianshu.com/p/7be32bf906fb

https://github.com/jackfrued/Python-100-Days/blob/master/Day66-75/69.%E5%B9%B6%E5%8F%91%E4%B8%8B%E8%BD%BD.md

https://docs.python.org/zh-cn/3/library/asyncio.html#module-asyncio


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM