python並發


Python中實現並發編程的三種方案:多線程、多進程和異步I/O。並發編程的好處在於可以提升程序的執行效率以及改善用戶體驗;壞處在於並發的程序不容易開發和調試,同時對其他程序來說它並不友好。

  • 多線程:Python中提供了Thread類並輔以Lock、Condition、Event、Semaphore和Barrier。Python中有GIL來防止多個線程同時執行本地字節碼,這個鎖對於CPython是必須的,因為CPython的內存管理並不是線程安全的,因為GIL的存在多線程並不能發揮CPU的多核特性。

    """
    面試題:進程和線程的區別和聯系? 進程 - 操作系統分配內存的基本單位 - 一個進程可以包含一個或多個線程 線程 - 操作系統分配CPU的基本單位 並發編程(concurrent programming) 1. 提升執行性能 - 讓程序中沒有因果關系的部分可以並發的執行 2. 改善用戶體驗 - 讓耗時間的操作不會造成程序的假死 """ import glob import os import threading from PIL import Image PREFIX = 'thumbnails' def generate_thumbnail(infile, size, format='PNG'): """生成指定圖片文件的縮略圖""" file, ext = os.path.splitext(infile) file = file[file.rfind('/') + 1:] outfile = f'{PREFIX}/{file}_{size[0]}_{size[1]}.{ext}' img = Image.open(infile) img.thumbnail(size, Image.ANTIALIAS) img.save(outfile, format) def main(): """主函數""" if not os.path.exists(PREFIX): os.mkdir(PREFIX) for infile in glob.glob('images/*.png'): for size in (32, 64, 128): # 創建並啟動線程 threading.Thread( target=generate_thumbnail, args=(infile, (size, size)) ).start() if __name__ == '__main__': main()

    多個線程競爭資源的情況

    """
    多線程程序如果沒有競爭資源處理起來通常也比較簡單 當多個線程競爭臨界資源的時候如果缺乏必要的保護措施就會導致數據錯亂 說明:臨界資源就是被多個線程競爭的資源 """ import time import threading from concurrent.futures import ThreadPoolExecutor class Account(object): """銀行賬戶""" def __init__(self): self.balance = 0.0 self.lock = threading.Lock() def deposit(self, money): # 通過鎖保護臨界資源 with self.lock: new_balance = self.balance + money time.sleep(0.001) self.balance = new_balance class AddMoneyThread(threading.Thread): """自定義線程類""" def __init__(self, account, money): self.account = account self.money = money # 自定義線程的初始化方法中必須調用父類的初始化方法 super().__init__() def run(self): # 線程啟動之后要執行的操作 self.account.deposit(self.money) def main(): """主函數""" account = Account() # 創建線程池 pool = ThreadPoolExecutor(max_workers=10) futures = [] for _ in range(100): # 創建線程的第1種方式 # threading.Thread( # target=account.deposit, args=(1, ) # ).start() # 創建線程的第2種方式 # AddMoneyThread(account, 1).start() # 創建線程的第3種方式 # 調用線程池中的線程來執行特定的任務 future = pool.submit(account.deposit, 1) futures.append(future) # 關閉線程池 pool.shutdown() for future in futures: future.result() print(account.balance) if __name__ == '__main__': main()

    修改上面的程序,啟動5個線程向賬戶中存錢,5個線程從賬戶中取錢,取錢時如果余額不足就暫停線程進行等待。為了達到上述目標,需要對存錢和取錢的線程進行調度,在余額不足時取錢的線程暫停並釋放鎖,而存錢的線程將錢存入后要通知取錢的線程,使其從暫停狀態被喚醒。可以使用threading模塊的Condition來實現線程調度,該對象也是基於鎖來創建的,代碼如下所示:

    """
    多個線程競爭一個資源 - 保護臨界資源 - 鎖(Lock/RLock) 多個線程競爭多個資源(線程數>資源數) - 信號量(Semaphore) 多個線程的調度 - 暫停線程執行/喚醒等待中的線程 - Condition """ from concurrent.futures import ThreadPoolExecutor from random import randint from time import sleep import threading class Account(): """銀行賬戶""" def __init__(self, balance=0): self.balance = balance lock = threading.Lock() self.condition = threading.Condition(lock) def withdraw(self, money): """取錢""" with self.condition: while money > self.balance: self.condition.wait() new_balance = self.balance - money sleep(0.001) self.balance = new_balance def deposit(self, money): """存錢""" with self.condition: new_balance = self.balance + money sleep(0.001) self.balance = new_balance self.condition.notify_all() def add_money(account): while True: money = randint(5, 10) account.deposit(money) print(threading.current_thread().name, ':', money, '====>', account.balance) sleep(0.5) def sub_money(account): while True: money = randint(10, 30) account.withdraw(money) print(threading.current_thread().name, ':', money, '<====', account.balance) sleep(1) def main(): account = Account() with ThreadPoolExecutor(max_workers=10) as pool: for _ in range(5): pool.submit(add_money, account) pool.submit(sub_money, account) if __name__ == '__main__': main()
  • 多進程:多進程可以有效的解決GIL的問題,實現多進程主要的類是Process,其他輔助的類跟threading模塊中的類似,進程間共享數據可以使用管道、套接字等,在multiprocessing模塊中有一個Queue類,它基於管道和鎖機制提供了多個進程共享的隊列。下面是官方文檔上關於多進程和進程池的一個示例。

    """
    多進程和進程池的使用 多線程因為GIL的存在不能夠發揮CPU的多核特性 對於計算密集型任務應該考慮使用多進程 time python3 example22.py real 0m11.512s user 0m39.319s sys 0m0.169s 使用多進程后實際執行時間為11.512秒,而用戶時間39.319秒約為實際執行時間的4倍 這就證明我們的程序通過多進程使用了CPU的多核特性,而且這台計算機配置了4核的CPU """ import concurrent.futures import math PRIMES = [ 1116281, 1297337, 104395303, 472882027, 533000389, 817504243, 982451653, 112272535095293, 112582705942171, 112272535095293, 115280095190773, 115797848077099, 1099726899285419 ] * 5 def is_prime(n): """判斷素數""" if n % 2 == 0: return False sqrt_n = int(math.floor(math.sqrt(n))) for i in range(3, sqrt_n + 1, 2): if n % i == 0: return False return True def main(): """主函數""" with concurrent.futures.ProcessPoolExecutor() as executor: for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)): print('%d is prime: %s' % (number, prime)) if __name__ == '__main__': main()

    說明:多線程和多進程的比較。

    以下情況需要使用多線程:

    1. 程序需要維護許多共享的狀態(尤其是可變狀態),Python中的列表、字典、集合都是線程安全的,所以使用線程而不是進程維護共享狀態的代價相對較小。
    2. 程序會花費大量時間在I/O操作上,沒有太多並行計算的需求且不需占用太多的內存。

    以下情況需要使用多進程:

    1. 程序執行計算密集型任務(如:字節碼操作、數據處理、科學計算)。
    2. 程序的輸入可以並行的分成塊,並且可以將運算結果合並。
    3. 程序在內存使用方面沒有任何限制且不強依賴於I/O操作(如:讀寫文件、套接字等)。
  • 異步處理:從調度程序的任務隊列中挑選任務,該調度程序以交叉的形式執行這些任務,我們並不能保證任務將以某種順序去執行,因為執行順序取決於隊列中的一項任務是否願意將CPU處理時間讓位給另一項任務。異步任務通常通過多任務協作處理的方式來實現,由於執行時間和順序的不確定,因此需要通過回調式編程或者future對象來獲取任務執行的結果。Python 3通過asyncio模塊和awaitasync關鍵字(在Python 3.7中正式被列為關鍵字)來支持異步處理。

    """
    異步I/O - async / await """ import asyncio def num_generator(m, n): """指定范圍的數字生成器""" yield from range(m, n + 1) async def prime_filter(m, n): """素數過濾器""" primes = [] for i in num_generator(m, n): flag = True for j in range(2, int(i ** 0.5 + 1)): if i % j == 0: flag = False break if flag: print('Prime =>', i) primes.append(i) await asyncio.sleep(0.001) return tuple(primes) async def square_mapper(m, n): """平方映射器""" squares = [] for i in num_generator(m, n): print('Square =>', i * i) squares.append(i * i) await asyncio.sleep(0.001) return squares def main(): """主函數""" loop = asyncio.get_event_loop() future = asyncio.gather(prime_filter(2, 100), square_mapper(1, 100)) future.add_done_callback(lambda x: print(x.result())) loop.run_until_complete(future) loop.close() if __name__ == '__main__': main()

    說明:上面的代碼使用get_event_loop函數獲得系統默認的事件循環,通過gather函數可以獲得一個future對象,future對象的add_done_callback可以添加執行完成時的回調函數,loop對象的run_until_complete方法可以等待通過future對象獲得協程執行結果。

    Python中有一個名為aiohttp的三方庫,它提供了異步的HTTP客戶端和服務器,這個三方庫可以跟asyncio模塊一起工作,並提供了對Future對象的支持。Python 3.6中引入了async和await來定義異步執行的函數以及創建異步上下文,在Python 3.7中它們正式成為了關鍵字。下面的代碼異步的從5個URL中獲取頁面並通過正則表達式的命名捕獲組提取了網站的標題。

    import asyncio
    import re import aiohttp PATTERN = re.compile(r'\<title\>(?P<title>.*)\<\/title\>') async def fetch_page(session, url): async with session.get(url, ssl=False) as resp: return await resp.text() async def show_title(url): async with aiohttp.ClientSession() as session: html = await fetch_page(session, url) print(PATTERN.search(html).group('title')) def main(): urls = ('https://www.python.org/', 'https://git-scm.com/', 'https://www.jd.com/', 'https://www.taobao.com/', 'https://www.douban.com/') loop = asyncio.get_event_loop() tasks = [show_title(url) for url in urls] loop.run_until_complete(asyncio.wait(tasks)) loop.close() if __name__ == '__main__': main()

    說明:異步I/O與多進程的比較。

    當程序不需要真正的並發性或並行性,而是更多的依賴於異步處理和回調時,asyncio就是一種很好的選擇。如果程序中有大量的等待與休眠時,也應該考慮asyncio,它很適合編寫沒有實時數據處理需求的Web應用服務器。

    Python還有很多用於處理並行任務的三方庫,例如:joblib、PyMP等。實際開發中,要提升系統的可擴展性和並發性通常有垂直擴展(增加單個節點的處理能力)和水平擴展(將單個節點變成多個節點)兩種做法。可以通過消息隊列來實現應用程序的解耦合,消息隊列相當於是多線程同步隊列的擴展版本,不同機器上的應用程序相當於就是線程,而共享的分布式消息隊列就是原來程序中的Queue。消息隊列(面向消息的中間件)的最流行和最標准化的實現是AMQP(高級消息隊列協議),AMQP源於金融行業,提供了排隊、路由、可靠傳輸、安全等功能,最著名的實現包括:Apache的ActiveMQ、RabbitMQ等。

    要實現任務的異步化,可以使用名為Celery的三方庫。Celery是Python編寫的分布式任務隊列,它使用分布式消息進行工作,可以基於RabbitMQ或Redis來作為后端的消息代理。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM