Python並發
並發三種層次
個人理解,並發是計算機在邏輯上能處理多任務的能力。一般分類三種類型:
- 異步,異步本質上是單線程的,因為 IO 操作在很多時候會存在阻塞,異步就是在這種阻塞的時候,通過控制權的交換來實現多任務的。即異步本質上是運行過程中的控制權的交換。最典型的例子就是生產者消費者模型。
- 異步這個概念在不同的地方有不同的說法,比如 python 里面叫做協程,內部通過生成器來實現控制權的交換。但是無論怎么稱呼,異步這種並發方式都脫離不了控制權的交換這么一個事實。
- 多進程,進程是一個程序具體的實例,擁有自己獨立的內存單元。
- 多線程,線程依附於進程,共享存儲空間。
- 由於 Python 官方的解釋器 Cython 對多線程有一個全局的鎖(GIL),所以 Cython 中的線程局限性會比較大。這里不多解釋。
這里還有一個概念需要注意,在使用並發的時候弄清楚需要並發的任務是計算密集還是IO密集。
因為異步對於計算密集的任務是無效的。因為異步的本質是 IO 操作過程中阻塞時的控制權交換。在計算密集的任務中是沒有這樣的阻塞的。
協程
前面說了異步的本質是控制權的交換,這里通過一個生產者消費者模型的例子來體會一下這么個過程。
生成者消費者
def consumer(): # 定義消費者,由於有yeild關鍵詞,此消費者為一個生成器
print("[Consumer] Init Consumer ......")
r = "init ok" # 初始化返回結果,並在啟動消費者時,返回給生產者
while True:
n = yield r # 消費者通過yield接收生產者的消息,同時返給其結果
print("[Consumer] conusme n = %s, r = %s" % (n, r))
r = "consume %s OK" % n # 消費者消費結果,下個循環返回給生產者
def produce(c): # 定義生產者,此時的 c 為一個生成器
print("[Producer] Init Producer ......")
r = c.send(None) # 啟動消費者生成器,同時第一次接收返回結果
print("[Producer] Start Consumer, return %s" % r)
n = 0
while n < 5:
n += 1
print("[Producer] While, Producing %s ......" % n)
r = c.send(n) # 向消費者發送消息並准備接收結果。此時會切換到消費者執行
print("[Producer] Consumer return: %s" % r)
c.close() # 關閉消費者生成器
print("[Producer] Close Producer ......")
produce(consumer())
新關鍵字
# 異步IO例子:適配Python3.5,使用async和await關鍵字
async def hello(index): # 通過關鍵字async定義協程
print('Hello world! index=%s, thread=%s' % (index, threading.currentThread()))
await asyncio.sleep(1) # 模擬IO任務
print('Hello again! index=%s, thread=%s' % (index, threading.currentThread()))
loop = asyncio.get_event_loop() # 得到一個事件循環模型
tasks = [hello(1), hello(2)] # 初始化任務列表
loop.run_until_complete(asyncio.wait(tasks)) # 執行任務
loop.close() # 關閉事件循環列表
網絡io
async def get(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
print(url, resp.status)
print(url, await resp.text())
loop = asyncio.get_event_loop() # 得到一個事件循環模型
tasks = [ # 初始化任務列表
get("http://zhushou.360.cn/detail/index/soft_id/3283370"),
get("http://zhushou.360.cn/detail/index/soft_id/3264775"),
get("http://zhushou.360.cn/detail/index/soft_id/705490")
]
loop.run_until_complete(asyncio.wait(tasks)) # 執行任務
loop.close() # 關閉事件循環列表
線/進程
這里以進程的multiprocessing
模塊舉例,線程可以使用multiprocessing.dummy
,所有的API均相同。
例子
import multiprocessing as mp
############## 直接實例化 ############
def func(number):
result = number * 2
p = mp.Process(target=func, args=(3, )) #實例化進程對象
p.start() #運行進程
############ 類封裝 #############
class MyProcess(mp.Process):
def __init__(self, interval):
mp.Process.__init__(self)
# 需要重載的函數
def run(self):
print('I'm running)
p = MyProcess(1)
p.start()
#################################
p.terminal() # 主動結束進程
p.join() #讓主進程等待子進程結束
# 一些常用的屬性
p.pid #獲得進程的id號
p.name #獲得進程名
p.is_alive() #判斷進程是否還存活
p.daemon = True #設置進程隨主進程一起結束
mp.active_children() #獲得當前進程的所有子進程
mp.current_process() #返回正在運行的進程
os.getpid() #獲得當前進程的pid
線程池
from multiprocessing.dummy import Pool as ThreadPool
tasks = list()
def do_task(item):
return item
pool = ThreadPool(3)
################ 原始操作 #######################
for item in items:
pool.apply_async(do_task, (item,)) #添加進程,非阻塞,返回執行結果
pool.apply(do_task, (item,)) #阻塞
############## map操作 #####################3
results = pool.map(do_task, items)
################################
pool.close() #關閉進程池后不會有新的進程被創建
pool.join() #等到結束,必須在close后使用
進程通信
# Lock(鎖)
# 限制對資源的訪問
def func(lock): #使用with
with lock:
print('I got lock')
def func(lock): #不使用with
lock.acquire() #請求鎖
try:
print('I got lock')
finally:
lock.release() #釋放鎖
lock = mp.Lock() #申請鎖
p = mp.Process(target=func, args=(lock,))
p.start()
############################################
# Semaphore(信號量)
# 限制資源的最大連接數
def func(s):
s.aquire() #請求連接
s.release() #斷開連接
s = mp.Semaphore(2) #定義信號量的最大連接數
for i in range(5):
p = mp.Process(target=func, arg=(s))
p.start
############################################
# Event(事件)
# 進程間同步
def func(e):
e.wait() #定義等待時間,默認等待到e.set()為止,阻塞
e.is_set() #判斷消息是否被發出
print('got')
e = mp.Event()
p = mp.Process(target=func, args=(e,))
p.start()
e.set() #發出消息
############################################
# Queue(隊列)
# 多進程之間的數據傳遞
import Queue
Queue.Queue(maxsize = 0) # 先進先出, maxsize小於等於則不限大小
Queue.LifoQueue(maxsize = 0) # 后進先出
Queue.PriorityQueue(maxsize = 0) # 構造一個優先級隊列
#異常
Queue.Empty #當調用非阻塞的get()獲取空隊列的元素時, 引發異常
Queue.Full #當調用非阻塞的put()向滿隊列中添加元素時, 引發異常
# 生存者消費者模型
def produce(q):
try:
data = q.put(data, block=, timeout=)
# 若block為False且隊列已滿,則立即拋出Queue.Full
# 若block為True進程會阻塞timeout指定時間,直到隊列有空間,否則拋出Queue.Full
except:
def cosume(q):
try:
q.get(block=, timeout=) #與上同理
except:
q = mp.Queue()
pro = mp.Process(target=produce, args=(q, ))
cos = mp.Process(target=cosume, args=(q, ))
pro.start()
cos.start()
pro.join()
cos.join()
############################################
# Pipe(管道)
# 多進程之間的數據傳遞
def func1(pipe):
while True:
pipe.send(1)
def func2(pipe):
while True:
pipe.recv() #如果管道內無消息可接受,則會阻塞
pipe = mp.Pipe(duplex=) #參數默認為True即管道的兩邊均可收發
# 返回(conn1, conn2),當參數為False時conn1只能收信息,conn2只能發消息
p1 = mp.Process(target=func1, args=(pipe[0], ))
p2 = mp.Process(target=func2, args=(pipe[1], ))
p1.start()
p2.start()
p1.join()
p2.join()
並發池
新的並發池模塊concurrent.futures
再次封裝了並發操作,可以用於量大但簡單並發操作。
且進程線程通用關鍵字換一下就行。
future對象
from concurrent.futures import ThreadPoolExecutor
import time
def working(message):
time.sleep(2)
return message
pool = ThreadPoolExecutor(max_workers=2) # 創建一個最大可容納2個task的線程池
worker1 = pool.submit(working, ("hello")) # 往線程池里面加入一個task
worker2 = pool.submit(working, ("world")) # 往線程池里面加入一個task
# submit 返回了一個future對象,即未完成的操作,我們可以通過調用函數來查看其狀態
worker1.done() # 判斷task1是否結束
worker1.result() # 查看task1返回的結果
worker2.result() # 查看task2返回的結果
executor對象
import concurrent.futures
items = list() # 任務對象
def do_task(item): # 處理函數
return item
#################### submit #########################
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
futures = {executor.submit(do_task, item): item for item in items}
for future in concurrent.futures.as_completed(futures):
item = futures[future]
result = future.result()
print(item, result)
#################### map #########################
# map跟submit的區別在於submit是無序的,而map是有序的
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
for item, result in zip(items, executor.map(do_task, items)):
print(item, result)
#################### wait #########################
# wait返回一個元組,包含已完成任務的集合和未完成任務的集合。
pool = ThreadPoolExecutor(5)
futures = []
for item in items:
futures.append(pool.submit(do_task, item))
concurrent.futures.wait(futures, timeout=None, return_when='FIRST_COMPLETED')
return_when
參數可選FIRST_COMPLETED
, FIRST_EXCEPTION
和ALL_COMPLETE
ALL_COMPLETE
會阻塞