Implementing File I/O with Python Coroutines


Preface

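  • A while ago someone asked me out of the blue about coroutines in Python, and my mind went blank on the spot. So I started digging through all sorts of material to fill the gap. I haven't used them in a real project yet, but that still beats knowing nothing.
  • After working through some of that material, I noticed that most of the code online looks like this: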
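import asyncio

async def hello(x):
    print("Hello world!")
    # Asynchronously call asyncio.sleep(1) to simulate a blocking operation.
    # asyncio.sleep() returns None, so r is None.
    r = await asyncio.sleep(1)
    print(x, r)
    print("Hello again!")
    return False

# Get the event loop.
loop = asyncio.get_event_loop()
# Run the coroutines.
# Note: passing bare coroutines to asyncio.wait() was deprecated in Python 3.8
# and removed in 3.11; on newer versions, wrap each one in a task first.
tasks = [hello("First:"), hello("Second:")]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()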
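For readers on a recent Python, here is a minimal sketch of the same example written with asyncio.run() and asyncio.gather(), which avoids the deprecated pattern of passing bare coroutines to asyncio.wait(). This is not from the original tutorials: the main() helper is a name introduced for illustration, and the sleep is shortened to 0.1 seconds to keep the demo quick.

```python
import asyncio


async def hello(x):
    print("Hello world!")
    # Simulate a blocking operation; asyncio.sleep() returns None.
    r = await asyncio.sleep(0.1)
    print(x, r)
    print("Hello again!")
    return False


async def main():
    # gather() schedules both coroutines concurrently and returns
    # their results in the order the coroutines were passed in.
    return await asyncio.gather(hello("First:"), hello("Second:"))


if __name__ == "__main__":
    results = asyncio.run(main())
    print(results)
```

asyncio.run() creates and closes the event loop itself, so there is no explicit get_event_loop() or loop.close() call.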
  • Then the later examples turn into something like this:
import asyncio

async def wget(host):
    print('wget %s...' % host)
    connect = asyncio.open_connection(host, 80)
    reader, writer = await connect
    header = 'GET / HTTP/1.0\r\nHost: %s\r\n\r\n' % host
    writer.write(header.encode('utf-8'))
    await writer.drain()
    while True:
        line = await reader.readline()
        # Stop at the blank line that ends the headers, or at EOF (b'').
        if line in (b'\r\n', b''):
            break
        print('%s header > %s' % (host, line.decode('utf-8').rstrip()))
    # Ignore the body, close the socket
    writer.close()
    await writer.wait_closed()

loop = asyncio.get_event_loop()
tasks = [wget(host) for host in ['www.sina.com.cn', 'www.sohu.com', 'www.163.com']]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
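  • Fine, I'm a beginner. All I knew at this point was that this thing, the coroutine, can do asynchronous network I/O. Then my curiosity kicked in: if asynchronous network I/O works, there's no reason asynchronous file I/O shouldn't, so I went back to searching.
  • After a full day of effort I finally found two approaches that looked workable. A caveat first: I really have only just started learning coroutines, so if anything here is wrong, please point it out.
  • Approach 1: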
import asyncio

async def hello(x):
    print("Hello world!")
    # Attempt: use the built-in open() with "async with" (this fails).
    async with open("2.txt", "r") as f:
        data = await f.readlines()
    print("Hello again!")
    return False

# Get the event loop.
loop = asyncio.get_event_loop()
# Run the coroutines.
tasks = [hello("First:"), hello("Second:")]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
  • The output looks like this:
D:\Users\User\Desktop\python_code_test\my_code\venv\Scripts\python.exe D:/Users/User/Desktop/python_code_test/my_code/asynchronous/02.py
Hello world!
Hello world!
Task exception was never retrieved
future: <Task finished name='Task-3' coro=<hello() done, defined at D:\Users\User\Desktop\python_code_test\my_code\asynchronous\02.py:25> exception=AttributeError('__aenter__')>
Traceback (most recent call last):
  File "D:\Users\User\Desktop\python_code_test\my_code\asynchronous\02.py", line 28, in hello
    async with open("2.txt", "r") as f:
AttributeError: __aenter__
Task exception was never retrieved
future: <Task finished name='Task-2' coro=<hello() done, defined at D:\Users\User\Desktop\python_code_test\my_code\asynchronous\02.py:25> exception=AttributeError('__aenter__')>
Traceback (most recent call last):
  File "D:\Users\User\Desktop\python_code_test\my_code\asynchronous\02.py", line 28, in hello
    async with open("2.txt", "r") as f:
AttributeError: __aenter__

Process finished with exit code 0
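The AttributeError above comes from the async context manager protocol: `async with` looks up `__aenter__` and `__aexit__` on the object, and the file object returned by the built-in open() only has the synchronous `__enter__` and `__exit__`. The sketch below shows what an object would need in order to work with `async with`. AsyncFile, read_lines() and demo() are hypothetical names made up for this illustration; the class pushes the blocking calls onto the default thread pool with loop.run_in_executor(), and it is not the implementation of any particular library.

```python
import asyncio
import os
import tempfile


class AsyncFile:
    """Hypothetical minimal wrapper that satisfies the async-with protocol."""

    def __init__(self, path, mode="r"):
        self._path = path
        self._mode = mode
        self._file = None

    async def __aenter__(self):
        loop = asyncio.get_running_loop()
        # open() blocks, so run it in the default thread pool.
        self._file = await loop.run_in_executor(
            None, open, self._path, self._mode
        )
        return self

    async def __aexit__(self, exc_type, exc, tb):
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, self._file.close)
        return False  # do not swallow exceptions

    async def readlines(self):
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self._file.readlines)


async def read_lines(path):
    async with AsyncFile(path) as f:
        return await f.readlines()


def demo():
    # Create a throwaway file so the example is self-contained.
    with tempfile.NamedTemporaryFile("w", suffix=".txt", delete=False) as tmp:
        tmp.write("line 1\nline 2\n")
        path = tmp.name
    try:
        with open(path) as plain:
            # The built-in file object lacks __aenter__.
            has_aenter = hasattr(plain, "__aenter__")
        lines = asyncio.run(read_lines(path))
    finally:
        os.remove(path)
    return has_aenter, lines


if __name__ == "__main__":
    print(demo())
```

The reads still happen in ordinary blocking calls; they simply run on worker threads so the event loop stays free while it waits.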
  • OK, the object returned by open() has no __aenter__ method. Surely it will work if I just skip the with statement:
import asyncio

async def hello(x):
    print("Hello world!")
    # Attempt: drop "async with" and await the built-in readlines() directly
    # (this fails too).
    # async with open("2.txt", "r") as f:
    #     data = await f.readlines()
    f = open("2.txt", "r")
    data = await f.readlines()
    print("Hello again!")
    return False

# Get the event loop.
loop = asyncio.get_event_loop()
# Run the coroutines.
tasks = [hello("First:"), hello("Second:")]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
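  • The output is as follows (the tracebacks are written to stderr, which is probably why they show up before the "Hello world!" lines here):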
D:\Users\User\Desktop\python_code_test\my_code\venv\Scripts\python.exe D:/Users/User/Desktop/python_code_test/my_code/asynchronous/02.py
Task exception was never retrieved
future: <Task finished name='Task-3' coro=<hello() done, defined at D:\Users\User\Desktop\python_code_test\my_code\asynchronous\02.py:25> exception=TypeError("object list can't be used in 'await' expression")>
Traceback (most recent call last):
  File "D:\Users\User\Desktop\python_code_test\my_code\asynchronous\02.py", line 31, in hello
    data = await f.readlines()
TypeError: object list can't be used in 'await' expression
Task exception was never retrieved
future: <Task finished name='Task-2' coro=<hello() done, defined at D:\Users\User\Desktop\python_code_test\my_code\asynchronous\02.py:25> exception=TypeError("object list can't be used in 'await' expression")>
Traceback (most recent call last):
  File "D:\Users\User\Desktop\python_code_test\my_code\asynchronous\02.py", line 31, in hello
    data = await f.readlines()
TypeError: object list can't be used in 'await' expression
Hello world!
Hello world!

Process finished with exit code 0
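The TypeError above happens because f.readlines() is an ordinary synchronous call: it runs to completion and returns a list, and a list is not awaitable. One standard-library way to get something awaitable from a blocking read is to hand the call to a thread pool with loop.run_in_executor(). The sketch below is an aside and not one of the approaches explored in this post; read_lines_blocking(), read_lines(), read_all() and demo() are names introduced only for illustration.

```python
import asyncio
import inspect
import os
import tempfile


def read_lines_blocking(path):
    # Plain synchronous read; returns a list, which is not awaitable.
    with open(path, "r") as f:
        return f.readlines()


async def read_lines(path):
    loop = asyncio.get_running_loop()
    # run_in_executor() returns a future, which *is* awaitable. The
    # blocking read runs on a worker thread of the default executor.
    return await loop.run_in_executor(None, read_lines_blocking, path)


async def read_all(paths):
    # Read several files concurrently; results keep the input order.
    return await asyncio.gather(*(read_lines(p) for p in paths))


def demo():
    # Create three small throwaway files so the example is self-contained.
    with tempfile.TemporaryDirectory() as tmp:
        paths = []
        for i in (1, 2, 3):
            path = os.path.join(tmp, f"{i}.txt")
            with open(path, "w") as f:
                f.write(f"file {i}\n")
            paths.append(path)
        return asyncio.run(read_all(paths))


if __name__ == "__main__":
    print(inspect.isawaitable(["a list"]))  # a list cannot be awaited
    print(demo())
```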
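  • A list can't be used with the await keyword: f.readlines() is a normal synchronous call that returns a list, not an awaitable. Well... that was awkward. So I went back to searching yet again and found a third-party library, gevent, which looked like it might be able to do asynchronous file I/O: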
import gevent
from gevent import monkey
# monkey.patch_all()      # patches most blocking calls in the standard library to run cooperatively


def fetch(path, name):
    # f = open(path, "r")
    # data = f.read()
    print(path)
    with open(path, "r") as f:
        data = f.read()
    # print(name)
    print(path)
    return data


if __name__ == "__main__":
    g_list = list()
    for url in ["1.txt", "2.txt", "3.txt"]:
        g = gevent.spawn(fetch, url, url)
        g_list.append(g)
    gevent.joinall(g_list)
    for g in g_list:
        print(g.value)
  • Running this code gives:
D:\Users\User\Desktop\python_code_test\my_code\venv\Scripts\python.exe D:/Users/User/Desktop/python_code_test/my_code/asynchronous/05.py
1.txt
1.txt
2.txt
2.txt
3.txt
3.txt
我是文件1
我是文件2
我是文件3

Process finished with exit code 0
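  • To be honest, when I first saw the output above, I thought gevent had already achieved asynchronous file I/O, because both print(path) calls finished before the file contents were printed. I fooled myself here: the contents show up last only because each greenlet returns them and they are printed after joinall(). If I had put print(data) directly between the two print(path) calls, the result would look something like this: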
1.txt
我是文件1
1.txt
2.txt
我是文件2
2.txt
3.txt
我是文件3
3.txt
我是文件1
我是文件2
我是文件3
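  • So this was my misunderstanding. When I went back over this part, something felt off, and after thinking it through I realized I had tricked myself: each greenlet ran to completion before the next one started. Later I found another way to do it, using the third-party library aiofiles.
  • Without gevent, simply doing ordinary blocking reads inside coroutines, the code looks like this: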
import asyncio

async def hello(x):
    print("Hello world!")
    print(x)
    # Ordinary blocking read: there is no await, so nothing yields here.
    with open(x, "r") as f:
        data = f.readlines()
    # r = await hello_wait(x)
    print(data)
    print(x)
    print("Hello again!")
    return False

# Get the event loop.
loop = asyncio.get_event_loop()
# Run the coroutines.
tasks = [hello("1.txt"), hello("2.txt"), hello("3.txt")]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
  • The result:
D:\Users\User\Desktop\python_code_test\my_code\venv\Scripts\python.exe D:/Users/User/Desktop/python_code_test/my_code/asynchronous/02.py
Hello world!
3.txt
['我是文件3']
3.txt
Hello again!
Hello world!
2.txt
['我是文件2']
2.txt
Hello again!
Hello world!
1.txt
['我是文件1']
1.txt
Hello again!

Process finished with exit code 0
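In the output above, each coroutine finishes completely before the next one starts, because a coroutine body with no await never hands control back to the event loop. The sketch below makes that visible without touching any files. The function names are hypothetical, and asyncio.sleep(0) merely stands in for a real awaitable I/O call.

```python
import asyncio


async def blocking_style(name, log):
    log.append(f"{name} start")
    # No await here, so this coroutine never yields to the event loop.
    log.append(f"{name} end")


async def yielding_style(name, log):
    log.append(f"{name} start")
    # Awaiting gives the event loop a chance to run the other tasks.
    await asyncio.sleep(0)
    log.append(f"{name} end")


async def run(worker):
    log = []
    await asyncio.gather(*(worker(n, log) for n in ("a", "b", "c")))
    return log


if __name__ == "__main__":
    print(asyncio.run(run(blocking_style)))
    print(asyncio.run(run(yielding_style)))
```

With blocking_style every start is immediately followed by its own end; with yielding_style all three starts are logged before any end.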
  • The module that does provide asynchronous file I/O is aiofiles. The code:
import asyncio
import aiofiles

async def hello_wait(x):
    n = 0
    print(n)
    while True:
        async with aiofiles.open("2.txt", "a") as f:
            await f.write(f"{n}\n")
        if n > 2:
            break
        n += 1
    async with aiofiles.open("2.txt", "r") as f:
        data = await f.readlines()
    print(x,data)
    return False

async def hello(x):
    print("Hello world!")
    print(x)
    r = await hello_wait(x)
    print(r)
    print("Hello again!")
    return False

# Get the event loop.
loop = asyncio.get_event_loop()
# Run the coroutines.
tasks = [hello("1.txt"), hello("2.txt"), hello("3.txt")]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
  • Output:
D:\Users\User\Desktop\python_code_test\my_code\venv\Scripts\python.exe D:/Users/User/Desktop/python_code_test/my_code/asynchronous/02.py
Hello world!
2.txt
0
Hello world!
3.txt
0
Hello world!
1.txt
0
3.txt ['0\n', '0\n', '0\n', '1\n', '1\n', '1\n', '2\n', '2\n', '3\n', '3\n', '3\n']
False
Hello again!
1.txt ['0\n', '0\n', '0\n', '1\n', '1\n', '1\n', '2\n', '2\n', '3\n', '3\n', '3\n']
False
Hello again!
2.txt ['0\n', '0\n', '0\n', '1\n', '1\n', '1\n', '2\n', '2\n', '3\n', '3\n', '3\n']
False
Hello again!

Process finished with exit code 0
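  • This module provides asynchronous I/O for local files. At the time of writing it supported Python 3.6 and later; see the project's README on GitHub for details.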

Summary

  1. async + await really can do asynchronous file I/O, but you need to install a third-party library: aiofiles.

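  2. I spent this much time on it largely because I got fixated on one narrow question, but it was still worthwhile: at least I now know that aiofiles can do it.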

  3. The documents I referred to:

  4. Although I never got asynchronous file I/O working with gevent, it is still a very powerful module; see these links:

  5. Link to the aiofiles source code:

