在上一篇博文介紹了MySQL數據庫取得pymysql的使用,參考:https://www.cnblogs.com/minseo/p/15597428.html
本文介紹異步MySQL異步驅動aiomysql的使用
1,安裝異步模塊
如果沒有模塊則先使用pip安裝模塊
pip3 install asyncio pip3 install aiomysql
2,創建MySQL數據庫連接池
和同步方式不一樣的是使用異步不能直接創建數據庫連接conn,需要先創建一個數據庫連接池對象__pool通過這個數據庫連接池對象來創建數據庫連接
數據庫配置信息和介紹pymysql同步使用的數據庫是一樣的
import asyncio,aiomysql,time # 數據庫配置dict db_config = { 'host': 'localhost', 'user': 'www-data', 'password': 'www-data', 'db': 'awesome' } # 創建數據庫連接池協程函數 async def create_pool(**kw): global __pool __pool = await aiomysql.create_pool( host=kw.get('host', 'localhost'), port=kw.get('port', 3306), user=kw['user'], password=kw['password'], db=kw['db'] ) loop=asyncio.get_event_loop() loop.run_until_complete(create_pool(**db_config)) # 在事件循環中運行了協程函數則生成了全局變量__pool是一個連接池對象 <aiomysql.pool.Pool object at 0x00000244AD1724C8> print(__pool) # <aiomysql.pool.Pool object at 0x00000244AD1724C8>
3,創建執行sql語句的協程函數
因為是異步模塊,只能在事件循環中通過await關鍵字調用,使用需要創建執行sql語句的協程函數
在協程函數內使用全局上一步創建的連接池對象來創建連接conn和浮標對象cur,通過浮標對象來執行sql語句,執行方法和pymysql模塊的執行方法是一樣的
cursor.execute(sql,args) sql # 需要執行的sql語句例如'select * from table_name' args # 替換sql語句的格式化字符串,即sql語句可以使用%s代表一個字符串,然后在args中使用對應的變量或參數替換,args為一個list或元組,即是一個有序的序列需要和sql中的%s一一對應 # 例如sql='select * from table_name where id=%s' args=['12345'] # 相當於使用args中的參數替換sql中的%s # select * from table_name where id='12345'
下面分別創建兩個協程函數select execute一個用來執行搜索操作,一個用來執行insert,update,delete等修改操作
# 執行select函數 async def select(sql,args,size=None): with await __pool as conn: cur = await conn.cursor(aiomysql.DictCursor) await cur.execute(sql.replace('?','?s'),args or ()) if size: rs = await cur.fetchmany(size) else: rs = await cur.fetchall() await cur.close() return rs # 執行insert update delete函數 async def execute(sql,args): with await __pool as conn: try: cur = await conn.cursor() await cur.execute(sql.replace('?','%s'),args) affetced = cur.rowcount await conn.commit() await cur.close() except BaseException as e: raise return affetced
4,實踐執行sql語句
實踐執行sql語句前我們首先在本機創建一個數據庫和對應的表用於測試
數據庫對應的主機,用戶名,密碼,庫名,表名如下
host: localhost user: www-data password: www-data db:awesome table_name: users
創建表名的sql語句如下,需要在數據庫中創建好對應的表
CREATE TABLE `users` ( `id` varchar(50) NOT NULL, `email` varchar(50) NOT NULL, `passwd` varchar(50) NOT NULL, `admin` tinyint(1) NOT NULL, `name` varchar(50) NOT NULL, `image` varchar(500) NOT NULL, `created_at` double NOT NULL, PRIMARY KEY (`id`), UNIQUE KEY `idx_email` (`email`), KEY `idx_created_at` (`created_at`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8
創建好的表對應的結構如下
mysql> desc users; +------------+--------------+------+-----+---------+-------+ | Field | Type | Null | Key | Default | Extra | +------------+--------------+------+-----+---------+-------+ | id | varchar(50) | NO | PRI | NULL | | | email | varchar(50) | NO | UNI | NULL | | | passwd | varchar(50) | NO | | NULL | | | admin | tinyint(1) | NO | | NULL | | | name | varchar(50) | NO | | NULL | | | image | varchar(500) | NO | | NULL | | | created_at | double | NO | MUL | NULL | | +------------+--------------+------+-----+---------+-------+ 7 rows in set (2.68 sec)
①執行insert操作
# insert start import time sql = 'insert into `users` (`email`, `passwd`, `admin`, `name`, `image`, `created_at`, `id`) values (?, ?, ?, ?, ?, ?, ?)' args = ['test@qq.com','password',1,'test','about:blank',time.time(),'111111'] async def insert(): await execute(sql,args) loop.run_until_complete(insert()) # insert end
執行方式和pymysql沒有區別,不同的是需要在事件循環中使用關鍵字await調用
執行完畢在MySQL中查看插入的數據
mysql> select * from users; +--------+-------------+----------+-------+------+-------------+------------------+ | id | email | passwd | admin | name | image | created_at | +--------+-------------+----------+-------+------+-------------+------------------+ | 111111 | test@qq.com | password | 1 | test | about:blank | 1637738541.48629 | +--------+-------------+----------+-------+------+-------------+------------------+ 1 row in set (0.00 sec)
②執行update操作
直接在loop事件循環中執行execute協程函數也可以
# update start import time sql = 'update `users` set `email`=?, `passwd`=?, `admin`=?, `name`=?, `image`=?, `created_at`=? where `id`=?' args = ['test2@qq.com','password',1,'test2','about:blank',time.time(),'111111'] loop.run_until_complete(execute(sql,args)) # update end
執行以后把email和name都修改了
③執行delete操作
# delete start sql = 'delete from `users` where `id`=?' args = ['111111'] loop.run_until_complete(execute(sql,args)) # delete end
同樣根據關鍵字id指定的值刪除了這條數據
④執行selete操作
在執行select操作前我們保證數據庫里面至少有一條數據
# select start sql = 'select * from users' args = [] rs = loop.run_until_complete(select(sql,args)) print(rs) # select end
這里直接執行搜索的協程函數select根據函數的定義返回的是所有結果的list,元素是查詢結果的字典
輸出為
[{'id': '111111', 'email': 'test@qq.com', 'passwd': 'password', 'admin': 1, 'name': 'test', 'image': 'about:blank', 'created_at': 1637739212.74493}]
如果結果有多個則使用list的下標取出
補充
同步模塊pymysql和異步模塊aiomysql執行速度對比
假如我們需要往數據庫插入20000條數據,我們分別使用同步模式和異步模式
首先刪除數據庫所有測試數據
delete from users;
同步的代碼
d:/learn-python3/學習腳本/pymysql/use_pymysql.py
import pymysql db_config = { 'host': 'localhost', 'user': 'www-data', 'password': 'www-data', 'db': 'awesome' } # 創建連接,相當於把字典內的鍵值對傳遞 # 相當於執行pymysql.connect(host='localhost',user='www-data',password='www-data',db='awesome') conn = pymysql.connect(**db_config) # 創建游標 cursor = conn.cursor(pymysql.cursors.DictCursor) sql = 'select * from users' args = [] # 執行查詢返回結果數量 # 執行查詢 rs=cursor.execute(sql,args) # 獲取查詢結果 # 獲取查詢的第一條結果,返回一個dict,dict元素是查詢對應的鍵值對 # 如果查詢結果有多條則執行一次,游標移動到下一條數據,在執行一次又返回一條數據 # print(cursor.fetchone()) # print(cursor.fetchone()) # print(cursor.fetchall()) # print(cursor.fetchmany()) # {'id': '111111', 'email': 'test@qq.com', 'passwd': 'password', 'admin': 1, 'name': 'test', 'image': 'about:blank', 'created_at': 1637723578.5734} # 獲取查詢的所有結果,返回一個list,list元素是dict,dict元素是查詢對應的鍵值對 # print(cursor.fetchall()) # [{'id': '111111', 'email': 'test@qq.com', 'passwd': 'password', 'admin': 1, 'name': 'test', 'image': 'about:blank', 'created_at': 1637723578.5734}] # 獲取查詢的前幾條結果,返回一個dict,dict元素是查詢對應的鍵值對 # print(cursor.fetchmany(1)) # [{'id': '111111', 'email': 'test@qq.com', 'passwd': 'password', 'admin': 1, 'name': 'test', 'image': 'about:blank', 'created_at': 1637723578.5734}] # 執行修改操作 import time # # insert start sql = 'insert into `users` (`email`, `passwd`, `admin`, `name`, `image`, `created_at`, `id`) values (?, ?, ?, ?, ?, ?, ?)' args = ['test1@qq.com','password',1,'test','about:blank',time.time(),'1111121'] # 使用replace 把'?'替換成'%s' # rs = cursor.execute(sql.replace('?','%s'),args) # print(cursor.rowcount) # conn.commit() # print(rs) # insert end # update start # sql = 'update `users` set `email`=?, `passwd`=?, `admin`=?, `name`=?, `image`=?, `created_at`=? where `id`=?' # args = ['test2@qq.com','password',1,'test2','about:blank',time.time(),'111111'] # print(cursor.execute(sql.replace('?','%s'),args)) # conn.commit() # update end # delete start # sql = 'delete from `users` where `id`=?' # args = ['111111'] # print(cursor.execute(sql.replace('?','%s'),args)) # conn.commit() # delete end # 寫成函數調用,函數內部使用了數據庫連接對象conn # 可以先定義成全局變量global def select(sql,args,size=None): cursor = conn.cursor(pymysql.cursors.DictCursor) cursor.execute(sql.replace('?','%s'),args or ()) if size: rs = cursor.fetchmany(size) else: rs = cursor.fetchall() cursor.close # logging.info('rows returned: %s' % len(rs)) return rs def execute(sql,args): cursor = conn.cursor(pymysql.cursors.DictCursor) try: cursor.execute(sql.replace('?','%s'),args) # rowcount方法把影響函數返回 rs = cursor.rowcount cursor.close() conn.commit() except: raise return rs start_time = time.time() for n in range(20000): sql = 'insert into `users` (`email`, `passwd`, `admin`, `name`, `image`, `created_at`, `id`) values (?, ?, ?, ?, ?, ?, ?)' email = 'test%s@qq.com' %n args = [email,'password',1,'test','about:blank',time.time(),n] execute(sql,args) end_time = time.time() # 打印開始和結束時間的差 print(end_time - start_time)
我們使用一個循環20000次往數據庫插入數據
執行,插入數據比較多需要等待一段時間輸出
D:\learn-python3\函數式編程>C:/Python37/python.exe d:/learn-python3/學習腳本/pymysql/use_pymysql.py 77.46903562545776
可以在數據庫查詢到這20000條數據,而且這個表的字段created_at存儲了創建這條數據的時間戳,我們可以看到,id越往后的時間戳越往后,說明數據是同步按順序一一插入的
我們按照字段created_at排序查詢
下面我們刪除所有數據使用異步方式插入
異步的代碼如下
d:/learn-python3/學習腳本/aiomysql/use_aiomysql.py
import asyncio,aiomysql,time # 數據庫配置dict db_config = { 'host': 'localhost', 'user': 'www-data', 'password': 'www-data', 'db': 'awesome' } # 創建數據庫連接池協程函數 async def create_pool(**kw): global __pool __pool = await aiomysql.create_pool( host=kw.get('host', 'localhost'), port=kw.get('port', 3306), user=kw['user'], password=kw['password'], db=kw['db'] ) loop=asyncio.get_event_loop() loop.run_until_complete(create_pool(**db_config)) # 在事件循環中運行了協程函數則生成了全局變量__pool是一個連接池對象 <aiomysql.pool.Pool object at 0x00000244AD1724C8> print(__pool) # <aiomysql.pool.Pool object at 0x00000244AD1724C8> # 執行select函數 async def select(sql,args,size=None): with await __pool as conn: cur = await conn.cursor(aiomysql.DictCursor) await cur.execute(sql.replace('?','?s'),args or ()) if size: rs = await cur.fetchmany(size) else: rs = await cur.fetchall() await cur.close() return rs # 執行insert update delete函數 async def execute(sql,args): with await __pool as conn: try: cur = await conn.cursor() await cur.execute(sql.replace('?','%s'),args) affetced = cur.rowcount await conn.commit() await cur.close() except BaseException as e: raise return affetced # insert start # import time # sql = 'insert into `users` (`email`, `passwd`, `admin`, `name`, `image`, `created_at`, `id`) values (?, ?, ?, ?, ?, ?, ?)' # args = ['test@qq.com','password',1,'test','about:blank',time.time(),'111111'] # async def insert(): # await execute(sql,args) # loop.run_until_complete(insert()) # insert end # update start # import time # sql = 'update `users` set `email`=?, `passwd`=?, `admin`=?, `name`=?, `image`=?, `created_at`=? where `id`=?' # args = ['test2@qq.com','password',1,'test2','about:blank',time.time(),'111111'] # loop.run_until_complete(execute(sql,args)) # update end # delete start # sql = 'delete from `users` where `id`=?' # args = ['111111'] # loop.run_until_complete(execute(sql,args)) # delete end # select start # sql = 'select * from users' # args = [] # rs = loop.run_until_complete(select(sql,args)) # print(rs) # select end async def insert1(): for n in range(10000): sql = 'insert into `users` (`email`, `passwd`, `admin`, `name`, `image`, `created_at`, `id`) values (?, ?, ?, ?, ?, ?, ?)' email = 'test%s@qq.com' %n args = [email,'password',1,'test','about:blank',time.time(),n] await execute(sql,args) async def insert2(): for n in range(10001,20001): sql = 'insert into `users` (`email`, `passwd`, `admin`, `name`, `image`, `created_at`, `id`) values (?, ?, ?, ?, ?, ?, ?)' email = 'test%s@qq.com' %n args = [email,'password',1,'test','about:blank',time.time(),n] await execute(sql,args) async def main(): # 需要組合成一個事件才會異步執行即在執行insert1的時候同步執行insert2 await asyncio.gather(insert1(),insert2()) start_time = time.time() loop.run_until_complete(main()) end_time = time.time() print(end_time - start_time)
這里我們定義了兩個協程函數,分別用來執行前10000個數據和后10000個數據的插入,在main()把這兩個協程函數組合成一個事件循環
等待一段時間后執行輸出如下,忽略這個warning,可以看到執行時間明顯比同步時間短
d:/learn-python3/學習腳本/aiomysql/use_aiomysql.py:42: DeprecationWarning: with await pool as conn deprecated, useasync with pool.acquire() as conn instead with await __pool as conn: 39.794615507125854
我們去數據庫查詢一下數據也可以看到id從0開始和id從10001開始幾乎是同時插入的