Concurrent Programming with asyncio


I. The Event Loop

   1. Notes:

       The recipe: event loop + callbacks (driving generators, i.e. coroutines) + epoll (I/O multiplexing). asyncio is Python's complete solution for asynchronous programming.

       Frameworks built on this same model: tornado, gevent, twisted (Scrapy, Django Channels). tornado ships its own web server and can be deployed directly (although a real deployment still puts nginx in front); django and flask are deployed with uwsgi or gunicorn plus nginx.
import asyncio
import time

async def get_html(url):
    print('start get url')
    # don't call time.sleep here: it is a blocking function, so with 10
    # concurrent calls you would pay 2 seconds 10 times instead of once
    await asyncio.sleep(2)
    print('end get url')

if __name__ == '__main__':
    start_time = time.time()
    loop = asyncio.get_event_loop()
    tasks = [get_html('www.baidu.com') for i in range(10)]
    loop.run_until_complete(asyncio.wait(tasks))
    print(time.time() - start_time)

  2. How to get a coroutine's return value (similar to a thread pool):

import asyncio
import time
from functools import partial

async def get_html(url):
    print('start get url')
    await asyncio.sleep(2)
    print('end get url')
    return "HAHA"

# the callback receives the task as its last argument; to pass anything
# else, wrap it with partial (a partial function) and put the extra
# arguments in front
def callback(url, future):
    print(url + ' success')
    print('send email')

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    task = loop.create_task(get_html('www.baidu.com'))
    # under the hood this fetches the event loop and calls its create_task
    # method; a thread has only one loop
    # get_future = asyncio.ensure_future(get_html('www.baidu.com')) also works:
    # loop.run_until_complete(get_future)
    # run_until_complete accepts a future, a task (a subclass of future),
    # or an iterable of them
    task.add_done_callback(partial(callback, 'www.baidu.com'))
    loop.run_until_complete(task)
    print(task.result())
 
        

  3. The difference between wait and gather:

    3.1 Basic use of wait:

import asyncio

async def get_html(url):
    print('start get url')
    await asyncio.sleep(2)
    print('end get url')

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    tasks = [get_html('www.baidu.com') for i in range(10)]
    # asyncio.wait works much like the wait used with threads
    loop.run_until_complete(asyncio.wait(tasks))

The coroutine wait is similar to the wait used with threads; it also takes parameters such as timeout and return_when (when to return).
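For example, a minimal sketch (the urls and delays here are made up for illustration) of return_when=asyncio.FIRST_COMPLETED, which makes wait return as soon as any one task finishes:

import asyncio

async def get_html(url, delay):
    await asyncio.sleep(delay)
    return url

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    tasks = [asyncio.ensure_future(get_html('www.baidu.com', d)) for d in (1, 2, 3)]
    # return as soon as the first task finishes; the other two stay pending
    done, pending = loop.run_until_complete(
        asyncio.wait(tasks, timeout=5, return_when=asyncio.FIRST_COMPLETED))
    print(len(done), len(pending))  # 1 2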

 

    3.2 Basic use of gather:

import asyncio

async def get_html(url):
    print('start get url')
    await asyncio.sleep(2)
    print('end get url')

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    tasks = [get_html('www.baidu.com') for i in range(10)]
    # note the * with gather: it unpacks the list into positional arguments
    loop.run_until_complete(asyncio.gather(*tasks))

  3.3 The difference between gather and wait (prefer gather when you don't need fine-grained control):

    gather is higher level: it can group tasks, and it can cancel whole batches of tasks.

 

import asyncio

async def get_html(url):
    print('start get url')
    await asyncio.sleep(2)
    print('end get url')

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    groups1 = [get_html('www.baidu.com') for i in range(10)]
    groups2 = [get_html('www.baidu.com') for i in range(10)]
    # note the *: each list is unpacked into positional arguments
    loop.run_until_complete(asyncio.gather(*groups1, *groups2))
    # grouping works like this as well:
    # groups1 = [get_html('www.baidu.com') for i in range(10)]
    # groups2 = [get_html('www.baidu.com') for i in range(10)]
    # groups1 = asyncio.gather(*groups1)
    # groups2 = asyncio.gather(*groups2)
    # cancelling a whole batch:
    # groups2.cancel()
    # loop.run_until_complete(asyncio.gather(groups1, groups2))

 

II. Nested Coroutines

  1. The run_until_complete() source: it differs little from run_forever(); the main difference is that it stops the loop once the given coroutine has finished, whereas run_forever() never stops on its own.
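As a rough illustration (a sketch, not the actual source), run_until_complete behaves like run_forever plus a done-callback that stops the loop:

import asyncio

async def get_html():
    await asyncio.sleep(1)
    return 'HAHA'

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    task = loop.create_task(get_html())
    # emulate run_until_complete: run_forever never stops on its own,
    # so stop the loop from a done-callback once the task finishes
    task.add_done_callback(lambda fut: loop.stop())
    loop.run_forever()
    print(task.result())  # HAHA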

 

  2. The loop is stored inside the future, and the future is in turn registered with the loop; they hold references to each other.

  3. Cancelling a future (task):
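A minimal cancellation sketch (assuming a long sleep that we interrupt with Ctrl-C):

import asyncio

async def get_html(sleep_times):
    print('waiting')
    await asyncio.sleep(sleep_times)
    print('done after {}s'.format(sleep_times))

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    task = asyncio.ensure_future(get_html(10))
    try:
        loop.run_until_complete(task)
    except KeyboardInterrupt:
        # cancel() only requests cancellation; CancelledError is thrown
        # into the coroutine the next time the loop resumes it
        task.cancel()
        loop.stop()
        # stop() has already been called, so run_forever() runs one final
        # iteration, just enough to process the cancellation
        loop.run_forever()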

    3.1 How a sub-coroutine call works:

      The example from the official docs:
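(Reconstructed below from the asyncio documentation's print_sum/compute example, which the walkthrough that follows steps through:)

import asyncio

async def compute(x, y):
    print("Compute %s + %s ..." % (x, y))
    await asyncio.sleep(1.0)
    return x + y

async def print_sum(x, y):
    result = await compute(x, y)
    print("%s + %s = %s" % (x, y, result))

loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()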

 

      Explanation: await is equivalent to yield from. The loop runs the coroutine print_sum(), print_sum() in turn calls another coroutine, compute(), and run_until_complete registers print_sum with the loop.

      1. The event loop creates a task for print_sum and drives print_sum by stepping that task (the task starts in the pending state);

      2. print_sum immediately delegates into the sub-coroutine, so execution switches to the other coroutine, compute, and print_sum moves to the suspended state;

      3. compute prints first and then calls asyncio.sleep (at which point compute moves to the suspended state), yielding its value directly back to the Task without passing through print_sum. As with yield from, the delegating coroutine print_sum sets up a direct channel between the caller and the sub-generator;

      4. The Task tells the event loop to pause; after one second the event loop wakes things up through the Task (over the channel that bypasses print_sum straight into compute);

      5. compute resumes and finishes, moving to the done state, and raises StopIteration, which the await statement catches; the value 1 + 2 = 3 is extracted and print_sum is reactivated (because the StopIteration was caught inside print_sum). When print_sum finishes it is marked done as well, and the StopIteration it raises is received by the Task.

III. call_soon, call_later, call_at, call_soon_threadsafe

  1. call_soon: accepts a plain function directly, no coroutine needed

import asyncio

def callback(sleep_time):
    print('sleep {} success'.format(sleep_time))

# stop the loop from a callback
def stoploop(loop):
    loop.stop()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    # a plain function can be passed directly, no coroutine needed;
    # call_soon is effectively call_later with a delay of 0 seconds
    loop.call_soon(callback, 2)
    loop.call_soon(stoploop, loop)
    # run_until_complete won't do here (callback is not a coroutine);
    # run_forever picks up the callbacks scheduled with call_soon
    loop.run_forever()

 

  2. call_later: run a callback after a given delay (it actually calls call_at, and the time is not wall-clock time but the loop's internal clock)

import asyncio

def callback(sleep_time):
    print('sleep {} success'.format(sleep_time))

# stop the loop from a callback
def stoploop(loop):
    loop.stop()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.call_later(3, callback, 1)
    loop.call_later(1, callback, 2)
    loop.call_later(1, callback, 2)
    loop.call_later(1, callback, 2)
    loop.call_soon(callback, 4)
    # loop.call_soon(stoploop, loop)
    # run_until_complete won't do here (callback is not a coroutine);
    # run_forever picks up the scheduled callbacks
    loop.run_forever()

 

  3. call_at: run a callback at a specific time (on the loop's clock)

import asyncio

def callback(sleep_time):
    print('sleep {} success'.format(sleep_time))

# stop the loop from a callback
def stoploop(loop):
    loop.stop()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    now = loop.time()
    print(now)
    loop.call_at(now + 3, callback, 1)
    loop.call_at(now + 1, callback, 0.5)
    loop.call_at(now + 1, callback, 2)
    loop.call_at(now + 1, callback, 2)
    # loop.call_soon(stoploop, loop)
    # run_until_complete won't do here (callback is not a coroutine);
    # run_forever picks up the scheduled callbacks
    loop.run_forever()

 

  4. call_soon_threadsafe:

    The thread-safe version of call_soon, useful not only with coroutines but also across threads and processes. It is almost identical to call_soon (the implementation adds a self._write_to_self() call to wake the loop) and is used the same way.
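A minimal sketch (the thread and names are invented for illustration) of scheduling work onto the loop from another thread:

import asyncio
import threading

def callback(name):
    print('called from {}'.format(name))

def worker(loop):
    # from a foreign thread, plain call_soon is not safe;
    # call_soon_threadsafe also wakes the loop via its self-pipe
    # (the extra self._write_to_self() mentioned above)
    loop.call_soon_threadsafe(callback, 'worker thread')
    loop.call_soon_threadsafe(loop.stop)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    threading.Thread(target=worker, args=(loop,)).start()
    loop.run_forever()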

IV. ThreadPoolExecutor + asyncio (combining a thread pool with coroutines)

  1. Using run_in_executor: blocking code is simply handed off to a thread pool, so performance is not especially high, roughly on par with plain multithreading.

  

# integrating blocking I/O into coroutines by using threads
import asyncio
import socket
from urllib.parse import urlparse
from concurrent.futures import ThreadPoolExecutor
import time

def get_url(url):
    # fetch html over a raw socket
    url = urlparse(url)
    host = url.netloc
    path = url.path
    if path == "":
        path = "/"
    # open the socket connection
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client.connect((host, 80))
    # send the request to the server
    client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode("utf8"))
    # read until the server closes the connection
    data = b""
    while True:
        d = client.recv(1024)
        if d:
            data += d
        else:
            break
    # the response string includes the headers; split them off
    data = data.decode('utf8')
    print(data.split('\r\n\r\n')[1])
    client.close()

if __name__ == '__main__':
    start_time = time.time()
    loop = asyncio.get_event_loop()
    executor = ThreadPoolExecutor()
    tasks = []
    for i in range(20):
        task = loop.run_in_executor(executor, get_url, 'http://www.baidu.com')
        tasks.append(task)
    loop.run_until_complete(asyncio.wait(tasks))
    print(time.time() - start_time)

 

V. Simulating an HTTP Request with asyncio

  Note: asyncio currently provides no interface for the HTTP protocol.

# asyncio does not currently provide an http interface
import asyncio
from urllib.parse import urlparse
import time


async def get_url(url):
    # fetch html over a socket
    url = urlparse(url)
    host = url.netloc
    path = url.path
    if path == "":
        path = "/"
    # opening the connection is the slow part; open_connection takes care
    # of making the socket non-blocking and registering it with the loop
    reader, writer = await asyncio.open_connection(host, 80)
    # send the request; register/unregister on the selector is handled internally
    writer.write("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode("utf8"))
    # read the response
    all_lines = []
    # the implementation is fairly involved: reader supports async for
    # via the __anext__ magic method (a coroutine)
    async for line in reader:
        data = line.decode('utf8')
        all_lines.append(data)
    html = '\n'.join(all_lines)
    return html


async def main():
    tasks = []
    for i in range(20):
        url = "http://www.baidu.com/"
        tasks.append(asyncio.ensure_future(get_url(url)))
    for task in asyncio.as_completed(tasks):
        result = await task
        print(result)


if __name__ == '__main__':
    start_time = time.time()
    loop = asyncio.get_event_loop()
    # tasks = [get_url('http://www.baidu.com') for i in range(10)]
    # collecting the results outside, kept as future objects:
    # tasks = [asyncio.ensure_future(get_url('http://www.baidu.com')) for i in range(10)]
    # loop.run_until_complete(asyncio.wait(tasks))
    # for task in tasks:
    #     print(task.result())
    # as_completed prints each result as soon as it finishes
    loop.run_until_complete(main())
    print(time.time() - start_time)

 

VI. Futures and Tasks

  1. future: the future used with coroutines is similar to the future in a thread pool, and its methods largely mirror the thread-pool versions.

The set_result method, however, does not run the done-callbacks immediately the way the thread-pool future does; since everything runs in a single thread, it schedules them on the loop with call_soon.
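A small sketch of this behaviour, using nothing beyond the standard API:

import asyncio

async def waiter(fut):
    # awaiting the future suspends until set_result is called
    print(await fut)  # HAHA

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    fut = loop.create_future()
    # set_result stores the value and uses call_soon to schedule the
    # callbacks that wake up whoever is awaiting the future
    loop.call_soon(fut.set_result, 'HAHA')
    loop.run_until_complete(waiter(fut))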

  

  2. task: a subclass of future, and the bridge between futures and coroutines.

A task begins by running its _step method, which starts the coroutine and converts its return value (carried by the StopIteration exception) into a result, smoothing over the places where coroutines and threads behave differently.
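The StopIteration mechanics can be seen by driving a coroutine by hand, which is roughly what Task._step does (a sketch):

async def get_html():
    return 'HAHA'

if __name__ == '__main__':
    coro = get_html()
    try:
        # step the coroutine manually: its return value travels on
        # StopIteration.value, which Task._step turns into set_result
        coro.send(None)
    except StopIteration as e:
        print(e.value)  # HAHA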

VII. Synchronization and Communication in asyncio

  1. Single-threaded coroutines don't need locks:

import asyncio

total = 0

async def add():
    global total
    for i in range(1000000):
        total += 1


async def decs():
    global total
    for i in range(1000000):
        total -= 1

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    tasks = [add(), decs()]
    loop.run_until_complete(asyncio.wait(tasks))
    print(total)

 

  2. Some situations still need a lock:

asyncio's lock (and its other synchronization primitives)

 

 

import asyncio
import aiohttp
# This Lock does not call into the OS; it is a simple pure-Python
# implementation (note: non-blocking). Queue is non-blocking too; both are
# built on yield from. No Condition is needed here (single-threaded).
# Queue can also throttle; if you only need to pass data around, a plain
# global variable works as well.
from asyncio import Lock

cache = {}
lock = Lock()

async def get_stuff(url):
    # Lock implements the __aenter__/__aexit__ magic methods, so it works
    # with async with (older code used: with await lock)
    async with lock:
        # the explicit form is: await lock.acquire() ... lock.release()
        # (acquire is a coroutine, so it needs await; release is a plain function)
        if url in cache:
            return cache[url]
        # fetch asynchronously
        async with aiohttp.request('GET', url) as resp:
            stuff = await resp.read()
        cache[url] = stuff
        return stuff

async def parse_stuff(url):
    stuff = await get_stuff(url)

async def use_stuff(url):
    stuff = await get_stuff(url)

# without a synchronization mechanism, the same URL would be fetched twice
# (so this is exactly where the lock helps)
url = 'http://www.baidu.com'
tasks = [parse_stuff(url), use_stuff(url)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

 

VIII. A High-Concurrency Crawler with aiohttp

 

# crawler with asyncio: dedupe urls, write to the db (async driver aiomysql)
import aiohttp
import asyncio
import re
import aiomysql
from pyquery import PyQuery

start_url = 'http://www.jobbole.com/'
waiting_urls = []
seen_urls = set()
stopping = False
# cap the number of concurrent requests
sem = asyncio.Semaphore(3)


async def fetch(url, session):
    async with sem:
        await asyncio.sleep(1)
        try:
            async with session.get(url) as resp:
                print('url_status:{}'.format(resp.status))
                if resp.status in [200, 201]:
                    data = await resp.text()
                    return data
        except Exception as e:
            print(e)


def extract_urls(html):
    '''
    parsing involves no io
    '''
    urls = []
    pq = PyQuery(html)
    for link in pq.items('a'):
        url = link.attr('href')
        if url and url.startswith('http') and url not in urls:
            urls.append(url)
            waiting_urls.append(url)
    return urls


async def init_urls(url, session):
    html = await fetch(url, session)
    seen_urls.add(url)
    extract_urls(html)


async def handle_article(url, session, pool):
    '''
    process an article page
    '''
    html = await fetch(url, session)
    seen_urls.add(url)
    extract_urls(html)
    pq = PyQuery(html)
    title = pq('title').text()
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            # note: real code should use parameterized queries
            insert_sql = "insert into Test(title) values('{}')".format(title)
            await cur.execute(insert_sql)


async def consumer(pool):
    async with aiohttp.ClientSession() as session:
        while not stopping:
            if len(waiting_urls) == 0:
                await asyncio.sleep(0.5)
                continue
            url = waiting_urls.pop()
            print('start url:' + url)
            if re.match(r'http://.*?jobbole.com/\d+/', url):
                if url not in seen_urls:
                    asyncio.ensure_future(handle_article(url, session, pool))
                    await asyncio.sleep(30)
            else:
                if url not in seen_urls:
                    asyncio.ensure_future(init_urls(url, session))


async def main(loop):
    # wait until the mysql pool is ready
    pool = await aiomysql.create_pool(host='localhost', port=3306, user='root',
                                      password='112358', db='my_aio', loop=loop,
                                      charset='utf8', autocommit=True)
    async with aiohttp.ClientSession() as session:
        html = await fetch(start_url, session)
        seen_urls.add(start_url)
        extract_urls(html)
    asyncio.ensure_future(consumer(pool))

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    asyncio.ensure_future(main(loop))
    loop.run_forever()

 

