你知道 Python 怎么異步操作數據庫嗎?(aiomysql、asyncpg、aioredis)


楔子

Python 目前已經進化到了 3.8 版本,對操作數據庫也提供了相應的異步支持。當我們做一個 Web 服務時,性能的瓶頸絕大部分都在數據庫上,如果一個請求從數據庫中讀數據的時候能夠自動切換、去處理其它請求的話,是不是就能提高並發量了呢。

下面我們來看看如何使用 Python 異步操作 MySQL、PostgreSQL 以及 Redis,以上幾個可以說是最常用的數據庫了。至於 SQLServer、Oracle,本人沒有找到相應的異步驅動,有興趣可以自己去探索一下。

而操作數據庫無非就是增刪改查,下面我們來看看如何異步實現它們。

異步操作 MySQL

異步操作 MySQL 的話,需要使用一個 aiomysql,直接 pip install aiomysql 即可。aiomysql 底層依賴於 pymysql,所以 aiomysql 並沒有單獨實現相應的連接驅動,而是在 pymysql 之上進行了封裝。

查詢記錄

下面先來看看如何查詢記錄。

import asyncio
import aiomysql.sa as aio_sa


async def main():
    # 創建一個異步引擎
    engine = await aio_sa.create_engine(host="xx.xxx.xx.xxx",
                                        port=3306,
                                        user="root",
                                        password="root",
                                        db="_hanser",
                                        connect_timeout=10)

    # 通過 engine.acquire() 獲取一個連接
    async with engine.acquire() as conn:
        # 異步執行, 返回一個 <class 'aiomysql.sa.result.ResultProxy'> 對象
        result = await conn.execute("SELECT * FROM girl")
        # 通過 await result.fetchone() 可以獲取滿足條件的第一條記錄, 一個 <class 'aiomysql.sa.result.RowProxy'> 對象
        data = await result.fetchone()

        # 可以將 <class 'aiomysql.sa.result.RowProxy'> 對象想象成一個字典
        print(data.keys())  # KeysView((1, '古明地覺', 16, '地靈殿'))
        print(list(data.keys()))  # ['id', 'name', 'age', 'place']

        print(data.values())  # ValuesView((1, '古明地覺', 16, '地靈殿'))
        print(list(data.values()))  # [1, '古明地覺', 16, '地靈殿']

        print(data.items())  # ItemsView((1, '古明地覺', 16, '地靈殿'))
        print(list(data.items()))  # [('id', 1), ('name', '古明地覺'), ('age', 16), ('place', '地靈殿')]

        # 直接轉成字典也是可以的
        print(dict(data))  # {'id': 1, 'name': '古明地覺', 'age': 16, 'place': '地靈殿'}
    
    # 最后別忘記關閉引擎, 當然你在創建引擎的時候也可以通過 async with aio_sa.create_engine 的方式創建
    # async with 語句結束后會自動執行下面兩行代碼
    engine.close()
    await engine.wait_closed()
    

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

怎么樣,是不是很簡單呢,和同步庫的操作方式其實是類似的。但是很明顯,我們在獲取記錄的時候不會只獲取一條,而是會獲取多條,獲取多條的話使用 await result.fetchall() 即可。

import asyncio
from pprint import pprint
import aiomysql.sa as aio_sa


async def main():
    # 通過異步上下文管理器的方式創建, 會自動幫我們關閉引擎
    async with aio_sa.create_engine(host="xx.xxx.xx.xxx",
                                    port=3306,
                                    user="root",
                                    password="root",
                                    db="_hanser",
                                    connect_timeout=10) as engine:
        async with engine.acquire() as conn:
            result = await conn.execute("SELECT * FROM girl")
            # 此時的 data 是一個列表, 列表里面是 <class 'aiomysql.sa.result.RowProxy'> 對象
            data = await result.fetchall()
            # 將里面的元素轉成字典
            pprint(list(map(dict, data)))
            """
            [{'age': 16, 'id': 1, 'name': '古明地覺', 'place': '地靈殿'},
             {'age': 16, 'id': 2, 'name': '霧雨魔理沙', 'place': '魔法森林'},
             {'age': 400, 'id': 3, 'name': '芙蘭朵露', 'place': '紅魔館'}]
            """


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

除了 fetchone、fetchall 之外,還有一個 fetchmany,可以獲取指定記錄的條數。

import asyncio
from pprint import pprint
import aiomysql.sa as aio_sa


async def main():
    # 通過異步上下文管理器的方式創建, 會自動幫我們關閉引擎
    async with aio_sa.create_engine(host="xx.xxx.xx.xxx",
                                    port=3306,
                                    user="root",
                                    password="root",
                                    db="_hanser",
                                    connect_timeout=10) as engine:
        async with engine.acquire() as conn:
            result = await conn.execute("SELECT * FROM girl")
            # 默認是獲取一條, 得到的仍然是一個列表
            data = await result.fetchmany(2)
            pprint(list(map(dict, data)))
            """
            [{'age': 16, 'id': 1, 'name': '古明地覺', 'place': '地靈殿'},
             {'age': 16, 'id': 2, 'name': '霧雨魔理沙', 'place': '魔法森林'}]
            """


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

以上就是通過 aiomysql 查詢數據庫中的記錄,沒什么難度。但是值得一提的是,await conn.execute 里面除了可以傳遞一個原生的 SQL 語句之外,我們還可以借助 SQLAlchemy。

import asyncio
from pprint import pprint
import aiomysql.sa as aio_sa
from sqlalchemy.sql.selectable import Select
from sqlalchemy import text


async def main():
    async with aio_sa.create_engine(host="xx.xxx.xx.xxx",
                                    port=3306,
                                    user="root",
                                    password="root",
                                    db="_hanser",
                                    connect_timeout=10) as engine:
        async with engine.acquire() as conn:
            sql = Select([text("id, name, place")], whereclause=text("id != 1"), from_obj=text("girl"))
            result = await conn.execute(sql)
            data = await result.fetchall()
            pprint(list(map(dict, data)))
            """
            [{'id': 2, 'name': '霧雨魔理沙', 'place': '魔法森林'},
             {'id': 3, 'name': '芙蘭朵露', 'place': '紅魔館'}]
            """


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

添加記錄

然后是添加記錄,我們同樣可以借助 SQLAlchemy 幫助我們拼接 SQL 語句。

import asyncio
from pprint import pprint
import aiomysql.sa as aio_sa
from sqlalchemy import Table, MetaData, create_engine


async def main():
    async with aio_sa.create_engine(host="xx.xx.xx.xxx",
                                    port=3306,
                                    user="root",
                                    password="root",
                                    db="_hanser",
                                    connect_timeout=10) as engine:
        async with engine.acquire() as conn:
            # 我們還需要創建一個 SQLAlchemy 中的引擎, 然后將表反射出來
            s_engine = create_engine("mysql+pymysql://root:root@xx.xx.xx.xxx:3306/_hanser")
            tbl = Table("girl", MetaData(bind=s_engine), autoload=True)

            insert_sql = tbl.insert().values(
                [{"name": "十六夜咲夜", "age": 17, "place": "紅魔館"},
                 {"name": "琪露諾", "age": 60, "place": "霧之湖"}])

            # 注意: 執行的執行必須開啟一個事務, 否則數據是不會進入到數據庫中的
            async with conn.begin():
                # 同樣會返回一個 <class 'aiomysql.sa.result.ResultProxy'> 對象
                # 盡管我們插入了多條, 但只會返回最后一條的插入信息
                result = await conn.execute(insert_sql)
                # 返回最后一條記錄的自增 id
                print(result.lastrowid)
                # 影響的行數
                print(result.rowcount)
        
        # 重新查詢, 看看記錄是否進入到數據庫中
        async with engine.acquire() as conn:
            data = await (await conn.execute("select * from girl")).fetchall()
            data = list(map(dict, data))
            pprint(data)
            """
            [{'age': 16, 'id': 1, 'name': '古明地覺', 'place': '地靈殿'},
             {'age': 16, 'id': 2, 'name': '霧雨魔理沙', 'place': '魔法森林'},
             {'age': 400, 'id': 3, 'name': '芙蘭朵露', 'place': '紅魔館'},
             {'age': 17, 'id': 16, 'name': '十六夜咲夜', 'place': '紅魔館'},
             {'age': 60, 'id': 17, 'name': '琪露諾', 'place': '霧之湖'}]
            """


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

還是很方便的,但是插入多條記錄的話只會返回插入的最后一條記錄的信息,所以如果你希望獲取每一條的信息,那么就一條一條插入。

修改記錄

修改記錄和添加記錄是類似的,我們來看一下。

import asyncio
from pprint import pprint
import aiomysql.sa as aio_sa
from sqlalchemy import Table, MetaData, create_engine, text


async def main():
    async with aio_sa.create_engine(host="xx.xx.xx.xxx",
                                    port=3306,
                                    user="root",
                                    password="root",
                                    db="_hanser",
                                    connect_timeout=10) as engine:
        async with engine.acquire() as conn:
            s_engine = create_engine("mysql+pymysql://root:root@xx.xx.xx.xxx:3306/_hanser")
            tbl = Table("girl", MetaData(bind=s_engine), autoload=True)
            update_sql = tbl.update().where(text("name = '古明地覺'")).values({"place": "東方地靈殿"})

            # 同樣需要開啟一個事務
            async with conn.begin():
                result = await conn.execute(update_sql)
                print(result.lastrowid)  # 0
                print(result.rowcount)   # 1
		
        # 查詢結果
        async with engine.acquire() as conn:
            data = await (await conn.execute("select * from girl where name = '古明地覺'")).fetchall()
            data = list(map(dict, data))
            pprint(data)
            """
            [{'age': 16, 'id': 1, 'name': '古明地覺', 'place': '東方地靈殿'}]
            """


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

可以看到,記錄被成功的修改了。

刪除記錄

刪除記錄就更簡單了,直接看代碼。

import asyncio
import aiomysql.sa as aio_sa
from sqlalchemy import Table, MetaData, create_engine, text


async def main():
    async with aio_sa.create_engine(host="xx.xx.xx.xxx",
                                    port=3306,
                                    user="root",
                                    password="root",
                                    db="_hanser",
                                    connect_timeout=10) as engine:
        async with engine.acquire() as conn:
            s_engine = create_engine("mysql+pymysql://root:root@xx.xx.xx.xxx:3306/_hanser")
            tbl = Table("girl", MetaData(bind=s_engine), autoload=True)
            update_sql = tbl.delete()  # 全部刪除

            # 同樣需要開啟一個事務
            async with conn.begin():
                result = await conn.execute(update_sql)
                # 返回最后一條記錄的自增 id, 我們之前修改了 id = 0 記錄, 所以它跑到最后了
                print(result.lastrowid)  # 0
                # 受影響的行數
                print(result.rowcount)   # 6


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

此時數據庫中的記錄已經全部被刪除了。

整體來看還是比較簡單的,並且支持的功能也比較全面。

異步操作 PostgreSQL

異步操作 PostgreSQL 的話,我們有兩個選擇,一個是 asyncpg 庫,另一個是 aiopg 庫。

asyncpg 是自己實現了一套連接驅動,而 aiopg 則是對 psycopg2 進行了封裝,個人更推薦 asyncpg,性能和活躍度都比 aiopg 要好。

下面來看看如何使用 asyncpg,首先是安裝,直接 pip install asyncpg 即可。

查詢記錄

首先是查詢記錄。

import asyncio
from pprint import pprint
import asyncpg

async def main():
    # 創建連接數據庫的驅動
    conn = await asyncpg.connect(host="localhost",
                                 port=5432,
                                 user="postgres",
                                 password="zgghyys123",
                                 database="postgres",
                                 timeout=10)
    # 除了上面的方式,還可以使用類似於 SQLAlchemy 的方式創建
    # await asyncpg.connect("postgres://postgres:zgghyys123@localhost:5432/postgres")

    # 調用 await conn.fetchrow 執行 select 語句,獲取滿足條件的單條記錄
    # 調用 await conn.fetch 執行 select 語句,獲取滿足條件的全部記錄
    row1 = await conn.fetchrow("select * from girl")
    row2 = await conn.fetch("select * from girl")

    # 返回的是一個 Record 對象,這個 Record 對象等於將返回的記錄進行了一個封裝
    # 至於怎么用后面會說
    print(row1)  # <Record id=1 name='古明地覺' age=16 place='地靈殿'>
    pprint(row2)
    """
    [<Record id=1 name='古明地覺' age=16 place='地靈殿'>,
     <Record id=2 name='椎名真白' age=16 place='櫻花庄'>,
     <Record id=3 name='古明地戀' age=15 place='地靈殿'>]
    """

    # 關閉連接
    await conn.close()


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

以上我們演示了如何使用 asyncpg 來獲取數據庫中的記錄,我們看到執行 select 語句的話,我們可以使用 conn.fetchrow(query) 來獲取滿足條件的單條記錄,conn.fetch(query) 來獲取滿足條件的所有記錄。

Record 對象

我們說使用 conn.fetchone 查詢得到的是一個 Record 對象,使用 conn.fetch 查詢得到的是多個 Record 對象組成的列表,那么這個 Rcord 對象怎么用呢?

import asyncio
import asyncpg


async def main():
    conn = await asyncpg.connect("postgres://postgres:zgghyys123@localhost:5432/postgres")
    row = await conn.fetchrow("select * from girl")

    print(type(row))  # <class 'asyncpg.Record'>
    print(row)  # <Record id=1 name='古明地覺' age=16 place='地靈殿'>

    # 這個 Record 對象可以想象成一個字典
    # 我們可以將返回的字段名作為 key, 通過字典的方式進行獲取
    print(row["id"], row["name"])  # 1 古明地覺

    # 除此之外,還可以通過 get 獲取,獲取不到的時候會返回默認值
    print(row.get("id"), row.get("name"))  # 1 古明地覺
    print(row.get("xxx"), row.get("xxx", "不存在的字段"))  # None 不存在的字段

    # 除此之外還可以調用 keys、values、items,這個不用我說,都應該知道意味着什么
    # 只不過返回的是一個迭代器
    print(row.keys())  # <tuple_iterator object at 0x000001D6FFDAE610>
    print(row.values())  # <tuple_iterator object at 0x000001D6FFDAE610>
    print(row.items())  # <RecordItemsIterator object at 0x000001D6FFDF20C0>

    # 我們需要轉成列表或者元組
    print(list(row.keys()))  # ['id', 'name', 'age', 'place']
    print(list(row.values()))  # [1, '古明地覺', 16, '地靈殿']
    print(dict(row.items()))  # {'id': 1, 'name': '古明地覺', 'age': 16, 'place': '地靈殿'}
    print(dict(row))  # {'id': 1, 'name': '古明地覺', 'age': 16, 'place': '地靈殿'}

    # 關閉連接
    await conn.close()


if __name__ == '__main__':
    asyncio.run(main())

當然我們也可以借助 SQLAlchemy 幫我們拼接 SQL 語句。

import asyncio
from pprint import pprint
import asyncpg
from sqlalchemy.sql.selectable import Select
from sqlalchemy import text


async def main():
    conn = await asyncpg.connect("postgres://postgres:zgghyys123@localhost:5432/postgres")
    sql = Select([text("id, name, place")], whereclause=text("id != 1"), from_obj=text("girl"))
    # 我們不能直接傳遞一個 Select 對象, 而是需要將其轉成原生的字符串才可以
    rows = await conn.fetch(str(sql))
    pprint(list(map(dict, rows)))  
    """
    [{'id': 2, 'name': '椎名真白', 'place': '櫻花庄'},
     {'id': 3, 'name': '古明地戀', 'place': '地靈殿'}]
    """

    # 關閉連接
    await conn.close()


if __name__ == '__main__':
    asyncio.run(main())

此外,conn.fetch 里面還支持占位符,使用百分號加數字的方式,舉個例子:

import asyncio
from pprint import pprint
import asyncpg

async def main():
    conn = await asyncpg.connect("postgres://postgres:zgghyys123@localhost:5432/postgres")
    rows = await conn.fetch("select * from girl where id != $1", 1)
    pprint(list(map(dict, rows)))
    """
    [{'age': 16, 'id': 2, 'name': '椎名真白', 'place': '櫻花庄'},
     {'age': 15, 'id': 3, 'name': '古明地戀', 'place': '地靈殿'}]
    """

    # 關閉連接
    await conn.close()


if __name__ == '__main__':
    asyncio.run(main())

還是推薦使用 SQLAlchemy 的方式,這樣更加方便一些,就像 aiomysql 一樣。但是對於 asyncpg 而言,實際上接收的是一個原生的 SQL 語句,是一個字符串,因此它不能像 aiomysql 一樣自動識別 Select 對象,我們還需要手動將其轉成字符串。而且這樣還存在一個問題,至於是什么我們下面介紹添加記錄的時候說。

添加記錄

然后是添加記錄,我們看看如何往庫里面添加數據。

import asyncio
from pprint import pprint
import asyncpg
from sqlalchemy.sql.selectable import Select
from sqlalchemy import text


async def main():
    conn = await asyncpg.connect("postgres://postgres:zgghyys123@localhost:5432/postgres")
    # 執行 insert 語句我們可以使用 execute
    row = await conn.execute("insert into girl(name, age, place) values ($1, $2, $3)",
                             '十六夜咲夜', 17, '紅魔館')
    pprint(row)  # INSERT 0 1
    pprint(type(row))  # <class 'str'>

    await conn.close()


if __name__ == '__main__':
    asyncio.run(main())

通過 execute 可以插入單條記錄,同時返回相關信息,但是說實話這個信息沒什么太大用。除了 execute 之外,還有 executemany,用來執行多條插入語句。

import asyncio
import asyncpg

async def main():
    conn = await asyncpg.connect("postgres://postgres:zgghyys123@localhost:5432/postgres")
    # executemany:第一條參數是一個模板,第二條命令是包含多個元組的列表
    # 執行多條記錄的話,返回的結果為 None
    rows = await conn.executemany("insert into girl(name, age, place) values ($1, $2, $3)",
                                  [('十六夜咲夜', 17, '紅魔館'), ('琪露諾', 60, '霧之湖')])
    print(rows)  # None

    # 關閉連接
    await conn.close()


if __name__ == '__main__':
    asyncio.run(main())

注意:如果是執行大量 insert 語句的話,那么 executemany 要比 execute 快很多,但是 executemany 不具備事務功能。

await conn.executemany("insert into girl(id, name) values($1, $2)",
                       [(7, "八意永琳"), (7, "八意永琳"), (8, "zun")])

我們表中的 id 是主鍵,不可以重復,這里插入三條記錄。顯然第二條記錄中的 id 和第一條重復了,執行的時候會報錯。但是第一條 (7, "八意永琳") 是進入到數據庫了的,盡管第二條記錄在插入的時候執行失敗。當然第二條執行失敗,那么第二條之后的也就無法執行了,這一點務必注意。那如果我們想要實現事務的話該怎么做呢?

import asyncio
import asyncpg


async def main():
    conn = await asyncpg.connect("postgres://postgres:zgghyys123@localhost:5432/postgres")

    # 調用 conn.transaction() 會開啟一個事務,當然這里建議使用異步上下文管理
    async with conn.transaction():
        await conn.executemany("insert into girl(id, name) values($1, $2)",
                               [(999, "太田順也"), (999, "zun")]
                               )

    # 關閉連接
    await conn.close()


if __name__ == '__main__':
    asyncio.run(main())

兩條記錄主鍵重復,最終這兩條記錄都沒進入到數據庫中。因此插入記錄的話,個人建議直接開啟一個事務,然后通過 executemany 即可。

因此 fetchrow、fetch 是專門針對 select 語句,而 execute 和 executemant 是針對 select 語句之外的其它語句。

但是問題來了,無論是 execute 還是 executemany,它們都沒有返回插入記錄之后的具體信息。比如:插入一條記錄之后,我們希望返回插入記錄的自增 id,這個時候該怎么做呢?答案是依舊使用 fetch。

import asyncio
import asyncpg

async def main():
    conn = await asyncpg.connect("postgres://postgres:zgghyys123@localhost:5432/postgres")
    async with conn.transaction():
        # 對於 fetch 而言, 我們不能像 executemany 那樣, 傳遞一個包含元組的列表
        # 因此想插入多條記錄的話, 只能先拼接好 SQL 語句, 並且想返回對應的自增 id 的話, 需要在語句結尾加上 returning id
        # 當然這個是數據庫的語法, 否則得到的就是一個 None
        rows = await conn.fetch("insert into girl(name, age, place) values ('太田順也', 43, 'japan'), ('zun', 43, 'japan') returning id")

    print(rows)  # [<Record id=15>, <Record id=16>]
    # 關閉連接
    await conn.close()


if __name__ == '__main__':
    asyncio.run(main())

因此如果只是關注記錄是否插入成功,那么推薦使用 executemany 加事務的方式,這樣我們只需要傳遞一個包含元組的列表即可。如果還需要獲取插入記錄的自增 id,那么需要使用 fetch 加事務的方式(如果是多條 SQL 的話),但是需要事先將 SQL 語句拼接好才行,並且還要使用 PostgreSQL 提供的 returning 字句,否則是不會返回信息的(只能得到一個 None)。

至於 fetchrow 和 execute,它們只針對於單條,因此建議直接使用 fetch 和 executemany 即可。

修改記錄

修改記錄的話,仍然可以使用 executemany 或者 fetch,區別還是我們上面說的那樣。

import asyncio
import asyncpg

async def main():
    conn = await asyncpg.connect("postgres://postgres:zgghyys123@localhost:5432/postgres")
    async with conn.transaction():
        # 修改一條記錄, 返回一個字符串
        row = await conn.execute("update girl set name = $1 where name = $2", 'BAKA⑨', '琪露諾')
        print(row)  # UPDATE 1
        
        # 修改多條記錄, 返回一個 None
        rows = await conn.executemany("update girl set name = $1 where name = $2",
                                      [("古明地盆", "古明地覺"), ("芙蘭朵露斯卡雷特", "芙蘭朵露")])
        print(rows)  # None

    # 關閉連接
    await conn.close()


if __name__ == '__main__':
    asyncio.run(main())

同樣的,我們也可以使用 fetch,搭配 returning 語句。

import asyncio
import asyncpg


async def main():
    conn = await asyncpg.connect("postgres://postgres:zgghyys123@localhost:5432/postgres")
    async with conn.transaction():
        row = await conn.fetch("update girl set name = 'BAKA9' where name = 'BAKA⑨' returning id")
        print(row)  # [<Record id=6>]

    # 關閉連接
    await conn.close()


if __name__ == '__main__':
    asyncio.run(main())

刪除記錄

沒什么可說的了,直接看代碼吧。

import asyncio
import asyncpg


async def main():
    conn = await asyncpg.connect("postgres://postgres:zgghyys123@localhost:5432/postgres")
    async with conn.transaction():
        row = await conn.execute("delete from girl where name in ($1, $2)", "BAKA9", "古明地戀")
        print(row)  # DELETE 2

        rows = await conn.executemany("delete from girl where id = $1",
                                      # 嵌套元組的列表, 即使是一個值也要寫成元組
                                      # 會將每一個元組里面的值和占位符進行逐一匹配
                                      [(2,), (3,)])
        print(rows)  # None

    # 關閉連接
    await conn.close()


if __name__ == '__main__':
    asyncio.run(main())

同理如果想獲取返回的自增 id的話,也是可以的,方面和上面一樣。

import asyncio
import asyncpg


async def main():
    conn = await asyncpg.connect("postgres://postgres:zgghyys123@localhost:5432/postgres")
    async with conn.transaction():
        # 雖然不支持像 executemany 那樣傳遞一個列表, 但是下面這種方式還是可以的
        # 會將元素和占位符逐一替換
        rows = await conn.fetch("delete from girl where name in ($1, $2) returning id", "太田順也", "zun")
        print(rows)  # [<Record id=7>, <Record id=8>, <Record id=9>, <Record id=10>, <Record id=11>, <Record id=12>]
    # 關閉連接
    await conn.close()


if __name__ == '__main__':
    asyncio.run(main())

以上就是常見的增刪改查操作,雖然沒有 SQLAlchemy 那么強大,但是也足夠用,當然 SQLAlchemy 內部提供的一些屬性也是通過執行 SQL 語句獲取的,然后封裝成一個屬性給你。如果需要的話,我們也可以手動實現執行 SQL 來獲取。

連接池

asyncpg 還提供了連接池,需要的話往池子里面去取即可。

import asyncio
import asyncpg


async def main():
    pool = await asyncpg.create_pool(
        "postgres://postgres:zgghyys123@localhost:5432/postgres",
        min_size=10,  # 連接池初始化時默認的最小連接數, 默認為1 0
        max_size=10,  # 連接池的最大連接數, 默認為 10
        max_queries=5000,  # 每個鏈接最大查詢數量, 超過了就換新的連接, 默認 5000
        # 最大不活躍時間, 默認 300.0, 超過這個時間的連接就會被關閉, 傳入 0 的話則永不關閉
        max_inactive_connection_lifetime=300.0
    )
    # 如果還有其它什么特殊參數,也可以直接往里面傳遞,因為設置了 **connect_kwargs
    # 專門用來設置一些數據庫獨有的某些屬性

    # 從池子中取出一個連接
    async with pool.acquire() as conn:
        async with conn.transaction():
            row = await conn.fetchrow("select '100'::int + 200")
            # 我們看到沒有指定名字,隨意返回字段名叫做 ?column?
            # 不要慌,PostgreSQL 中返回的也是這個結果
            print(row)  # <Record ?column?=300>

            # 解決辦法就是起一個別名
            row = await conn.fetchrow("select '100'::int + 200 as result")
            print(row)  # <Record result=300>

    # 我們的連接是從池子里面取出的,上下文結束之后會自動放回到到池子里面


if __name__ == '__main__':
    # 這里就不要使用asyncio.run(main())了
    # 而是創建一個事件循環,然后運行
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

通過以上的例子,我們看到 asyncpg 還是非常好用的。另外值得一提的是,asyncpg 不依賴 psycopg2,asyncpg 是自己獨立實現了連接 PostgreSQL 的一套驅動,底層不需要依賴 psycopg2 這個模塊。

效率對比

我們之所以使用 asyncpg,無非是為了效率,那么 asyncpg 和傳統的 psycopg2 相比,在效率上究竟有多少差距呢?我們來測試一下。

SELECT count(*) FROM interface; -- 8459729

我數據庫中有一張表叫做 interface,是之前工作的時候從對方接口獲取的,我們就用它來進行測試吧。

先使用同步版本的來訪問,看看用多長時間。

import time
from sqlalchemy import create_engine, text

engine = create_engine("postgres://postgres:zgghyys123@localhost:5432/postgres")

with engine.begin() as conn:
    start = time.perf_counter()
    for _ in range(20):
        res = conn.execute(
            text('select * from interface where "ProwlerPersonID" = :arg'),
            {"arg": "c9fcbed8-fa47-481a-9d73-5fd1dd344f19"})
        print(f"滿足條件的記錄有:{len(res.fetchall())}條")
    end = time.perf_counter()
    print("總耗時:", end - start)  # 50.0419027
"""
滿足條件的記錄有:228186條
滿足條件的記錄有:228186條
滿足條件的記錄有:228186條
滿足條件的記錄有:228186條
滿足條件的記錄有:228186條
滿足條件的記錄有:228186條
滿足條件的記錄有:228186條
滿足條件的記錄有:228186條
滿足條件的記錄有:228186條
滿足條件的記錄有:228186條
滿足條件的記錄有:228186條
滿足條件的記錄有:228186條
滿足條件的記錄有:228186條
滿足條件的記錄有:228186條
滿足條件的記錄有:228186條
滿足條件的記錄有:228186條
滿足條件的記錄有:228186條
滿足條件的記錄有:228186條
滿足條件的記錄有:228186條
滿足條件的記錄有:228186條
總耗時: 50.0419027
"""

我們看到記錄有 20 多萬條,所以就不打印記錄了,因為執行的是同一條 SQL,所以結果是一樣的,然后我們看到花了 50 秒鍾。

再來看看使用異步版本用多長時間。

import time
import asyncio
import asyncpg


async def run_sql(conn, query_list):
    result = []
    for query in query_list:
        result.append(await conn.fetch(*query))
    await conn.close()
    return [f"滿足條件的記錄有:{len(_)}條" for _ in result]


async def main():
    async with asyncpg.create_pool("postgres://postgres:zgghyys123@localhost:5432/postgres") as pool:
        query_list = [('select * from interface where "ProwlerPersonID" = $1',
                       "c9fcbed8-fa47-481a-9d73-5fd1dd344f19")
                      for _ in range(20)]

        # 我們要創建5個連接異步訪問
        count = len(query_list) // 5
        # 將 20 個任務分成 5 份
        query_list = [query_list[c * 4: (c + 1) * 4] for c in range(count + 1)]

        tasks = []
        for q in query_list:
            conn = await pool.acquire()
            tasks.append(run_sql(conn, q))
        results = await asyncio.gather(*tasks)
        return results

if __name__ == '__main__':
    start = time.perf_counter()
    loop = asyncio.get_event_loop()
    results = loop.run_until_complete(main())
    end = time.perf_counter()
    for result in results:
        for _ in result:
            print(_)
    print("總耗時:", end - start)
    """
    滿足條件的記錄有:228186條
    滿足條件的記錄有:228186條
    滿足條件的記錄有:228186條
    滿足條件的記錄有:228186條
    滿足條件的記錄有:228186條
    滿足條件的記錄有:228186條
    滿足條件的記錄有:228186條
    滿足條件的記錄有:228186條
    滿足條件的記錄有:228186條
    滿足條件的記錄有:228186條
    滿足條件的記錄有:228186條
    滿足條件的記錄有:228186條
    滿足條件的記錄有:228186條
    滿足條件的記錄有:228186條
    滿足條件的記錄有:228186條
    滿足條件的記錄有:228186條
    滿足條件的記錄有:228186條
    滿足條件的記錄有:228186條
    滿足條件的記錄有:228186條
    滿足條件的記錄有:228186條
    總耗時: 9.8730488
    """

我們看到花了將近十秒鍾,正好是同步版本的五分之一,因為我們使用了連接池中的五個連接。

注意:如果是 SQLAlchemy,即便你給每一個 SQL 開一個連接,如果不使用多線程,只是同步訪問的話,那么耗時還是 50 秒左右,因為它是同步訪問的。

而使用異步的模式訪問的話,每個連接都可以進行異步訪問,那么我們創建的 5 個連接中,如果一個連接阻塞了,會切換到其它的連接去執行。所以耗時為五分之一,不過這里可能有人會覺得困惑,不知道我上面的代碼做了些什么,這里來解釋一下。

async def f1():
    for _ in [耗時協程1, 耗時協程2, 耗時協程3]:
        await _
        
def f2():
    for _ in [耗時函數1, 耗時函數2, 耗時函數3]:
        _()

我們上面的兩段代碼,如果函數和協程里面的代碼做了相同的事情的話,那么這兩個 for 循環耗時基本是一致的。首先函數無需解釋,關鍵是協程為什么會這樣。

我們 await 協程,會等待這個協程完成,對於一個 for 循環來說,不可能說當前循環還沒執行完畢就去執行下一層循環。所以無論是協程還是普通的函數,都要經歷三輪循環,所以它們的耗時是基本一致的。

如果想解決這個問題,那么就要使用 asyncio.gather(*協程列表),如果是 await asyncio.gather(*[耗時協程1, 耗時協程2, 耗時協程3]),那么時間相比普通函數來說,就會有明顯的縮短。因為此時這三個協程是同時發出的。

我們上面使用 asyncio.gather 的目的就在於此,但是問題是我們為什么要創建多個連接、為什么要把 20 個任務分成 5 份呢。

首先對於數據庫來講,一個連接只能同時執行一個 SQL,如果你使用多線程、但是每個線程用的卻是同一個連接的話,你會發現耗時和原來基本沒區別。雖然線程阻塞會自動切換,但是你使用的連接已經被別人用了,所以請求同時只能發一個。如果是 asyncpg 的話,一個連接同時執行多了 SQL,那么會直接報錯。

import asyncio
import asyncpg


async def main():
    async with asyncpg.create_pool("postgres://postgres:zgghyys123@localhost:5432/postgres") as pool:
        async with pool.acquire() as conn:
            # 將20個請求同時發出,理論上可以,但是對於數據庫來說是不同意的
            # 因為我們這里的conn都是同一個連接
            query_list = [conn.fetch('select * from interface where "ProwlerPersonID" = $1',
                                     "c9fcbed8-fa47-481a-9d73-5fd1dd344f19")
                          for _ in range(20)]
            await asyncio.gather(*query_list)


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
"""
  File "asyncpg\protocol\protocol.pyx", line 301, in query
  File "asyncpg\protocol\protocol.pyx", line 664, in asyncpg.protocol.protocol.BaseProtocol._check_state
asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress
"""

閱讀報錯信息,提示我們:無法執行操作,另一個操作已經在執行了。

說明我們的20個請求都是同一個連接發出的,第一個連接在執行的時候,第二個連接理論上應該處於阻塞狀態的,但是這里直接報錯了。但不管怎么樣,都不是我們想要的。

所以我們只能使用上面說的那樣,使用 for 循環里面寫 await 協程 的方式,但是這樣和同步又沒什么區別。因此我們創建 5 個連接,對 5 個連接使用 asyncio.gather,也就是讓這五個連接同時執行。盡管每個連接內部執行的邏輯是同步的,但是這 5 個連接整體是異步的,因為它們彼此沒有關聯,是不同的連接,因此異步耗時為同步的五分之一。

異步操作 Redis

最后來看看如何異步操作 Redis,異步操作 Redis 我們需要使用 aioredis 這個第三方庫。安裝同樣簡單,直接 pip install aioredis 即可。

import asyncio
import aioredis

async def main():
    conn = await aioredis.create_connection("redis://:passwd@localhost:6379")
    # 執行的話, 執行通過 await conn.execute 即可, 就像在命令行里執行 Redis 命令一樣
    data = await conn.execute("set", "name", "kugura_nana", "ex", 10)
    
    # 關閉連接
    await conn.close()
    # 判斷連接是否關閉
    print(conn.closed)  # True
    
asyncio.run(main())

更詳細的用法可以參考官網,Redis 的內容還是比較多的,但都比較簡單。

此外,如果你編寫的服務是一個同步的服務,那么至少在 Redis 方面其實沒太大必要換成異步庫,因為 Redis 本身已經足夠快了。

總結

高並發在現如今是一個主流,任何服務都需要考慮到並發量的問題,如果能夠很輕松地支持並發的話,那么肯定是非常受歡迎的。而 Go 語言之所以這么火,很大一部分原因就是它在語言層面就支持並發,當然其它語言也不甘落后。於是 Python 在 3.5 的時候也引入了原生協程,而緊接着圍繞着相關生態的異步庫自然隨之誕生。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM