一、概述
aiomysql是一個從asyncio(PEP-3156/tulip)框架訪問MySQL數據庫的庫。它依賴並重用PyMySQL的大部分部分。aiomysql試圖成為一個很棒的aiopg庫,並保留相同的api、外觀和感覺。
在內部aimysql是PyMySQL的副本,底層io調用切換到async,基本上是等待並在適當的位置添加async def coroutine。從aiopg移植的sqlalchemy支持。
安裝模塊
pip3 install aiomysql
簡單示例
import asyncio import aiomysql loop = asyncio.get_event_loop() async def test_example(): conn = await aiomysql.connect(host='127.0.0.1', port=3306, user='root', password='', db='mysql', loop=loop) cur = await conn.cursor() await cur.execute("SELECT Host,User FROM user") print(cur.description) r = await cur.fetchall() print(r) await cur.close() conn.close() loop.run_until_complete(test_example())
二、demo演示
環境說明
操作系統:centos 7.6
mysql版本:5.7
數據庫名:test
數據庫默認編碼:utf8mb4
具體表結構以及數據,請參考鏈接:
https://www.cnblogs.com/xiao987334176/p/12721498.html
這里面有2個表
單次執行
執行select和update

#!/usr/bin/env python3 # coding: utf-8 """ mysql 異步版本 """ import traceback import logging import aiomysql import asyncio import time logobj = logging.getLogger('mysql') class Pmysql: def __init__(self): self.coon = None self.pool = None async def initpool(self): try: logobj.debug("will connect mysql~") __pool = await aiomysql.create_pool( minsize=5, # 連接池最小值 maxsize=10, # 連接池最大值 host='192.168.31.230', port=3306, user='root', password='abcd1234', db='test', autocommit=True, # 自動提交模式 ) return __pool except: logobj.error('connect error.', exc_info=True) async def getCurosr(self): conn = await self.pool.acquire() # 返回字典格式 cur = await conn.cursor(aiomysql.DictCursor) return conn, cur async def query(self, query, param=None): """ 查詢操作 :param query: sql語句 :param param: 參數 :return: """ conn, cur = await self.getCurosr() try: await cur.execute(query, param) return await cur.fetchall() except: logobj.error(traceback.format_exc()) finally: if cur: await cur.close() # 釋放掉conn,將連接放回到連接池中 await self.pool.release(conn) async def execute(self, query, param=None): """ 增刪改 操作 :param query: sql語句 :param param: 參數 :return: """ conn, cur = await self.getCurosr() try: await cur.execute(query, param) if cur.rowcount == 0: return False else: return True except: logobj.error(traceback.format_exc()) finally: if cur: await cur.close() # 釋放掉conn,將連接放回到連接池中 await self.pool.release(conn) async def getAmysqlobj(): mysqlobj = Pmysql() pool = await mysqlobj.initpool() mysqlobj.pool = pool return mysqlobj async def test_select(): mysqlobj = await getAmysqlobj() # UPDATE `youku`.`person` SET `psName` = '張三豐' WHERE (`id` = '3'); exeRtn = await mysqlobj.query("select * from users") # print("查詢結果",exeRtn) return exeRtn async def test_update(): mysqlobj = await getAmysqlobj() # UPDATE `youku`.`person` SET `psName` = '張三豐' WHERE (`id` = '3'); exeRtn = await mysqlobj.execute("update users set username='xiao1' where id='1'") # print("exeRtn", exeRtn, type(exeRtn)) if exeRtn: # print('操作成功') return '操作成功' else: # print('操作失敗') return '操作失敗' async def main(): # 調用方 tasks = [test_select(), test_update()] # 把所有任務添加到task中 done, pending = await asyncio.wait(tasks) # 子生成器 for r in done: # done和pending都是一個任務,所以返回結果需要逐個調用result() # print('協程無序返回值:'+r.result()) print(r.result()) if __name__ == '__main__': start = time.time() loop = asyncio.get_event_loop() # 創建一個事件循環對象loop try: loop.run_until_complete(main()) # 完成事件循環,直到最后一個任務結束 finally: loop.close() # 結束事件循環 print('所有IO任務總耗時%.5f秒' % float(time.time() - start))
執行輸出:
操作成功 [{'id': 1, 'username': 'xiao', 'password': '123', 'phone': '12345678910', 'email': '123@qq.com', 'create_time': datetime.datetime(2020, 4, 10, 1, 22, 7)}] 所有IO任務總耗時0.03948秒
批量插入
批量插入使用executemany
插入3萬條數據

#!/usr/bin/env python3 # coding: utf-8 import time import asyncio import aiomysql start = time.time() loop = asyncio.get_event_loop() async def test_example(): conn = await aiomysql.connect(host='192.168.31.230', port=3306, user='root', password='abcd1234', db='test', loop=loop) # create default cursor cursor = await conn.cursor() # execute sql query data = [] for i in range(1,30000): data.append(('xiao%s'%i, '123', '12345678910', '123@qq.com', '2020-04-10 01:22:07'),) stmt = "INSERT INTO users (username,password,phone,email,create_time) VALUES(%s,%s,%s,%s,%s);" await cursor.executemany(stmt, data) await conn.commit() # detach cursor from connection await cursor.close() # close connection conn.close() loop.run_until_complete(test_example()) print('所有IO任務總耗時%.5f秒' % float(time.time() - start))
執行輸出:
所有IO任務總耗時11.96885秒
本文參考鏈接:
https://www.cnblogs.com/ygy1997/p/11753335.html