import aiomysql import asyncio
# config配置文件
# mysql
pool: aiomysql.Pool
MYSQL_HOST = "81.71.137.167"
MYSQL_PORT = 3306
MYSQL_USER = 'root'
MYSQL_DB = 'mall_demo02'
MYSQL_PASSWD = 'LgDk%zMG0x!lfc@C'
MYSQL_CONNECTION_MAXSIZE = 2
MYSQL_POOL_RECYCLE = 60
''' 异步连接池 ''' async def get_mysql_pool(): return await aiomysql.create_pool(host=config.MYSQL_HOST, port=config.MYSQL_PORT, user=config.MYSQL_USER, password=config.MYSQL_PASSWD, db=config.MYSQL_DB, loop=asyncio.get_event_loop(), autocommit=False, maxsize=config.MYSQL_CONNECTION_MAXSIZE, pool_recycle=config.MYSQL_POOL_RECYCLE) task = [ asyncio.ensure_future(get_mysql_pool()) ] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(task)) pool = [t.result() for t in task] config.pool = pool[0]
# 使用连接池来操作mysql
async def execute(sql: str, args: Union[tuple, list] = None) -> (int, list):
conn: aiomysql.Connection
cursor: aiomysql.DictCursor
rows: int
res: list
async with config.pool.acquire() as conn:
async with conn.cursor(aiomysql.DictCursor) as cursor:
try:
rows = await cursor.execute(sql, args)
res = await cursor.fetchall()
return rows, res
except Exception as e:
await conn.ping()
rows = await cursor.execute(sql, args)
res = await cursor.fetchall()
return rows, res
async def execute_with_commit(sql: str, args: Union[tuple, list] = None) -> int:
conn: aiomysql.Connection
cursor: aiomysql.Cursor
rows: int
print(sql)
async with config.pool.acquire() as conn:
async with conn.cursor(aiomysql.Cursor) as cursor:
try:
rows = await cursor.execute(sql, args)
await conn.commit()
return rows
except Exception as e:
await conn.ping()
await cursor.execute(sql, args)
await conn.commit()
return conn.affected_rows()